|
|
@ -140,24 +140,55 @@ _log = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
|
|
|
class ChunkRequest: |
|
|
|
__slots__ = ( |
|
|
|
'guild_id', |
|
|
|
'resolver', |
|
|
|
'loop', |
|
|
|
'limit', |
|
|
|
'remaining', |
|
|
|
'cache', |
|
|
|
'oneshot', |
|
|
|
'nonce', |
|
|
|
'buffer', |
|
|
|
'last_buffer', |
|
|
|
'waiters', |
|
|
|
) |
|
|
|
|
|
|
|
def __init__( |
|
|
|
self, |
|
|
|
guild_id: int, |
|
|
|
loop: asyncio.AbstractEventLoop, |
|
|
|
resolver: Callable[[int], Any], |
|
|
|
*, |
|
|
|
limit: Optional[int] = None, |
|
|
|
cache: bool = True, |
|
|
|
oneshot: bool = True, |
|
|
|
) -> None: |
|
|
|
self.guild_id: int = guild_id |
|
|
|
self.resolver: Callable[[int], Any] = resolver |
|
|
|
self.loop: asyncio.AbstractEventLoop = loop |
|
|
|
self.limit: Optional[int] = limit |
|
|
|
self.remaining: int = limit or 0 |
|
|
|
self.cache: bool = cache |
|
|
|
self.oneshot: bool = oneshot |
|
|
|
self.nonce: str = str(utils.time_snowflake(utils.utcnow())) |
|
|
|
self.buffer: List[Member] = [] |
|
|
|
self.last_buffer: Optional[List[Member]] = None |
|
|
|
self.waiters: List[asyncio.Future[List[Member]]] = [] |
|
|
|
|
|
|
|
def add_members(self, members: List[Member]) -> None: |
|
|
|
unique_members = set(members) |
|
|
|
if self.limit is not None: |
|
|
|
if self.remaining <= 0: |
|
|
|
return |
|
|
|
|
|
|
|
members = list(unique_members)[: self.remaining] |
|
|
|
self.remaining -= len(unique_members) |
|
|
|
else: |
|
|
|
members = list(unique_members) |
|
|
|
|
|
|
|
self.buffer.extend(members) |
|
|
|
|
|
|
|
if self.cache: |
|
|
|
guild = self.resolver(self.guild_id) |
|
|
|
if guild is None: |
|
|
@ -166,6 +197,9 @@ class ChunkRequest: |
|
|
|
for member in members: |
|
|
|
guild._add_member(member) |
|
|
|
|
|
|
|
if not self.oneshot: |
|
|
|
self.last_buffer = members |
|
|
|
|
|
|
|
async def wait(self) -> List[Member]: |
|
|
|
future = self.loop.create_future() |
|
|
|
self.waiters.append(future) |
|
|
@ -180,12 +214,28 @@ class ChunkRequest: |
|
|
|
return future |
|
|
|
|
|
|
|
def done(self) -> None: |
|
|
|
result = self.buffer if self.oneshot else self.last_buffer or self.buffer |
|
|
|
for future in self.waiters: |
|
|
|
if not future.done(): |
|
|
|
future.set_result(self.buffer) |
|
|
|
future.set_result(result) |
|
|
|
|
|
|
|
|
|
|
|
class MemberSidebar: |
|
|
|
__slots__ = ( |
|
|
|
'guild', |
|
|
|
'channels', |
|
|
|
'chunk', |
|
|
|
'delay', |
|
|
|
'cache', |
|
|
|
'loop', |
|
|
|
'safe_override', |
|
|
|
'ranges', |
|
|
|
'subscribing', |
|
|
|
'buffer', |
|
|
|
'exception', |
|
|
|
'waiters', |
|
|
|
) |
|
|
|
|
|
|
|
def __init__( |
|
|
|
self, |
|
|
|
guild: Guild, |
|
|
@ -301,6 +351,7 @@ class MemberSidebar: |
|
|
|
return list(ret) |
|
|
|
|
|
|
|
def add_members(self, members: List[Member]) -> None: |
|
|
|
members = list(set(members)) |
|
|
|
self.buffer.extend(members) |
|
|
|
if self.cache: |
|
|
|
guild = self.guild |
|
|
@ -653,7 +704,8 @@ class ConnectionState: |
|
|
|
request.add_members(members) |
|
|
|
if complete: |
|
|
|
request.done() |
|
|
|
removed.append(key) |
|
|
|
if request.oneshot: |
|
|
|
removed.append(key) |
|
|
|
|
|
|
|
for key in removed: |
|
|
|
del self._chunk_requests[key] |
|
|
@ -905,9 +957,18 @@ class ConnectionState: |
|
|
|
return self.ws.request_lazy_guild(guild_id, typing=typing, activities=activities, threads=threads) |
|
|
|
|
|
|
|
def chunker( |
|
|
|
self, guild_id: int, query: str = '', limit: int = 0, presences: bool = True, *, nonce: Optional[str] = None |
|
|
|
self, |
|
|
|
guild_id: int, |
|
|
|
query: Optional[str] = '', |
|
|
|
limit: int = 0, |
|
|
|
presences: bool = True, |
|
|
|
*, |
|
|
|
user_ids: Optional[List[Snowflake]] = None, |
|
|
|
nonce: Optional[str] = None, |
|
|
|
): |
|
|
|
return self.ws.request_chunks([guild_id], query=query, limit=limit, presences=presences, nonce=nonce) |
|
|
|
return self.ws.request_chunks( |
|
|
|
[guild_id], query=query, limit=limit, presences=presences, user_ids=user_ids, nonce=nonce |
|
|
|
) |
|
|
|
|
|
|
|
async def query_members( |
|
|
|
self, |
|
|
@ -923,14 +984,51 @@ class ConnectionState: |
|
|
|
self._chunk_requests[request.nonce] = request |
|
|
|
|
|
|
|
try: |
|
|
|
await self.ws.request_chunks( |
|
|
|
[guild_id], query=query, limit=limit, user_ids=user_ids, presences=presences, nonce=request.nonce |
|
|
|
await self.chunker( |
|
|
|
guild_id, query=query, limit=limit, presences=presences, user_ids=user_ids, nonce=request.nonce |
|
|
|
) |
|
|
|
return await asyncio.wait_for(request.wait(), timeout=30.0) |
|
|
|
except asyncio.TimeoutError: |
|
|
|
_log.warning('Timed out waiting for chunks with query %r and limit %d for guild_id %d.', query, limit, guild_id) |
|
|
|
raise |
|
|
|
|
|
|
|
async def search_recent_members( |
|
|
|
self, |
|
|
|
guild: Guild, |
|
|
|
query: str = '', |
|
|
|
limit: Optional[int] = None, |
|
|
|
cache: bool = False, |
|
|
|
) -> List[Member]: |
|
|
|
guild_id = guild.id |
|
|
|
request = ChunkRequest(guild.id, self.loop, self._get_guild, limit=limit, cache=cache, oneshot=False) |
|
|
|
self._chunk_requests[request.nonce] = request |
|
|
|
|
|
|
|
# Unlike query members, this OP is paginated |
|
|
|
old_continuation_token = None |
|
|
|
continuation_token = None |
|
|
|
while True: |
|
|
|
try: |
|
|
|
await self.ws.search_recent_members(guild_id, query=query, nonce=request.nonce, after=continuation_token) |
|
|
|
returned = await asyncio.wait_for(request.wait(), timeout=30.0) |
|
|
|
except asyncio.TimeoutError: |
|
|
|
_log.warning( |
|
|
|
'Timed out waiting for search chunks with query %r and limit %d for guild_id %d.', query, limit, guild_id |
|
|
|
) |
|
|
|
raise |
|
|
|
|
|
|
|
if (limit is not None and request.remaining < 1) or len(returned) < 1: |
|
|
|
break |
|
|
|
|
|
|
|
# Sort the members by joined_at timestamp and grab the oldest one |
|
|
|
request.buffer.sort(key=lambda m: m.joined_at or utils.utcnow()) |
|
|
|
old_continuation_token = continuation_token |
|
|
|
continuation_token = request.buffer[0].id |
|
|
|
if continuation_token == old_continuation_token: |
|
|
|
break |
|
|
|
|
|
|
|
self._chunk_requests.pop(request.nonce, None) |
|
|
|
return list(set(request.buffer)) |
|
|
|
|
|
|
|
async def _delay_ready(self) -> None: |
|
|
|
try: |
|
|
|
states = [] |
|
|
|