|
|
@ -57,12 +57,28 @@ from .object import Object |
|
|
|
from .invite import Invite |
|
|
|
|
|
|
|
class ChunkRequest: |
|
|
|
__slots__ = ('guild_id', 'nonce', 'future') |
|
|
|
|
|
|
|
def __init__(self, guild_id, future): |
|
|
|
def __init__(self, guild_id, future, resolver, *, cache=True): |
|
|
|
self.guild_id = guild_id |
|
|
|
self.resolver = resolver |
|
|
|
self.cache = cache |
|
|
|
self.nonce = os.urandom(16).hex() |
|
|
|
self.future = future |
|
|
|
self.buffer = [] # List[Member] |
|
|
|
|
|
|
|
def add_members(self, members): |
|
|
|
self.buffer.extend(members) |
|
|
|
if self.cache: |
|
|
|
guild = self.resolver(self.guild_id) |
|
|
|
if guild is None: |
|
|
|
return |
|
|
|
|
|
|
|
for member in members: |
|
|
|
existing = guild.get_member(member.id) |
|
|
|
if existing is None or existing.joined_at is None: |
|
|
|
guild._add_member(member) |
|
|
|
|
|
|
|
def done(self): |
|
|
|
self.future.set_result(self.buffer) |
|
|
|
|
|
|
|
log = logging.getLogger(__name__) |
|
|
|
|
|
|
@ -156,7 +172,7 @@ class ConnectionState: |
|
|
|
# to reconnect loops which cause mass allocations and deallocations. |
|
|
|
gc.collect() |
|
|
|
|
|
|
|
def process_chunk_requests(self, guild_id, nonce, members): |
|
|
|
def process_chunk_requests(self, guild_id, nonce, members, complete): |
|
|
|
removed = [] |
|
|
|
for i, request in enumerate(self._chunk_requests): |
|
|
|
future = request.future |
|
|
@ -165,8 +181,10 @@ class ConnectionState: |
|
|
|
continue |
|
|
|
|
|
|
|
if request.guild_id == guild_id and request.nonce == nonce: |
|
|
|
future.set_result(members) |
|
|
|
removed.append(i) |
|
|
|
request.add_members(members) |
|
|
|
if complete: |
|
|
|
request.done() |
|
|
|
removed.append(i) |
|
|
|
|
|
|
|
for index in reversed(removed): |
|
|
|
del self._chunk_requests[index] |
|
|
@ -330,19 +348,13 @@ class ConnectionState: |
|
|
|
raise RuntimeError('Somehow do not have a websocket for this guild_id') |
|
|
|
|
|
|
|
future = self.loop.create_future() |
|
|
|
request = ChunkRequest(guild.id, future) |
|
|
|
request = ChunkRequest(guild.id, future, self._get_guild, cache=cache) |
|
|
|
self._chunk_requests.append(request) |
|
|
|
|
|
|
|
try: |
|
|
|
# start the query operation |
|
|
|
await ws.request_chunks(guild_id, query=query, limit=limit, user_ids=user_ids, nonce=request.nonce) |
|
|
|
members = await asyncio.wait_for(future, timeout=30.0) |
|
|
|
|
|
|
|
if cache: |
|
|
|
for member in members: |
|
|
|
guild._add_member(member) |
|
|
|
|
|
|
|
return members |
|
|
|
return await asyncio.wait_for(future, timeout=30.0) |
|
|
|
except asyncio.TimeoutError: |
|
|
|
log.warning('Timed out waiting for chunks with query %r and limit %d for guild_id %d', query, limit, guild_id) |
|
|
|
raise |
|
|
@ -747,9 +759,10 @@ class ConnectionState: |
|
|
|
|
|
|
|
return self._add_guild_from_data(data) |
|
|
|
|
|
|
|
async def chunk_guild(self, guild, *, wait=True): |
|
|
|
async def chunk_guild(self, guild, *, wait=True, cache=None): |
|
|
|
cache = cache or self._cache_members |
|
|
|
future = self.loop.create_future() |
|
|
|
request = ChunkRequest(guild.id, future) |
|
|
|
request = ChunkRequest(guild.id, future, self._get_guild, cache=cache) |
|
|
|
self._chunk_requests.append(request) |
|
|
|
await self.chunker(guild.id, nonce=request.nonce) |
|
|
|
if wait: |
|
|
@ -893,14 +906,8 @@ class ConnectionState: |
|
|
|
guild = self._get_guild(guild_id) |
|
|
|
members = [Member(guild=guild, data=member, state=self) for member in data.get('members', [])] |
|
|
|
log.debug('Processed a chunk for %s members in guild ID %s.', len(members), guild_id) |
|
|
|
if self._cache_members: |
|
|
|
for member in members: |
|
|
|
existing = guild.get_member(member.id) |
|
|
|
if existing is None or existing.joined_at is None: |
|
|
|
guild._add_member(member) |
|
|
|
|
|
|
|
if data.get('chunk_index', 0) + 1 == data.get('chunk_count'): |
|
|
|
self.process_chunk_requests(guild_id, data.get('nonce'), members) |
|
|
|
complete = data.get('chunk_index', 0) + 1 == data.get('chunk_count') |
|
|
|
self.process_chunk_requests(guild_id, data.get('nonce'), members, complete) |
|
|
|
|
|
|
|
def parse_guild_integrations_update(self, data): |
|
|
|
guild = self._get_guild(int(data['guild_id'])) |
|
|
@ -1075,7 +1082,7 @@ class AutoShardedConnectionState(ConnectionState): |
|
|
|
current_bucket.append(future) |
|
|
|
else: |
|
|
|
future = self.loop.create_future() |
|
|
|
future.set_result(True) |
|
|
|
future.set_result([]) |
|
|
|
|
|
|
|
processed.append((guild, future)) |
|
|
|
|
|
|
|