diff --git a/discord/gateway.py b/discord/gateway.py index e7274520b..ce64c5018 100644 --- a/discord/gateway.py +++ b/discord/gateway.py @@ -370,7 +370,7 @@ class DiscordWebSocket: } if state._intents is not None: - payload['d']['intents'] = state._intents + payload['d']['intents'] = state._intents.value await self.call_hooks('before_identify', self.shard_id, initial=self._initial_identify) await self.send_as_json(payload) diff --git a/discord/guild.py b/discord/guild.py index 0bf94a280..c15b78cf4 100644 --- a/discord/guild.py +++ b/discord/guild.py @@ -2045,11 +2045,6 @@ class Guild(Hashable): This is a websocket operation and can be slow. - .. warning:: - - Most bots do not need to use this. It's mainly a helper - for bots who have disabled ``guild_subscriptions``. - .. versionadded:: 1.3 Parameters @@ -2059,7 +2054,7 @@ class Guild(Hashable): requests all members. limit: :class:`int` The maximum number of members to send back. This must be - a number between 1 and 1000. + a number between 1 and 100. cache: :class:`bool` Whether to cache the members internally. This makes operations such as :meth:`get_member` work for those that matched. @@ -2073,19 +2068,26 @@ class Guild(Hashable): ------- asyncio.TimeoutError The query timed out waiting for the members. + ValueError + Invalid parameters were passed to the function Returns -------- List[:class:`Member`] The list of members that have matched the query. """ - if user_ids is not None and query is not None: - raise TypeError('Cannot pass both query and user_ids') - if user_ids is None and query is None: - raise TypeError('Must pass either query or user_ids') + if query is None: + if query == '': + raise ValueError('Cannot pass empty query string.') + + if user_ids is None: + raise ValueError('Must pass either query or user_ids') + + if user_ids is not None and query is not None: + raise ValueError('Cannot pass both query and user_ids') - limit = limit or 5 + limit = min(100, limit or 5) return await self._state.query_members(self, query=query, limit=limit, user_ids=user_ids, cache=cache) async def change_voice_state(self, *, channel, self_mute=False, self_deaf=False): diff --git a/discord/state.py b/discord/state.py index 61f66457a..bcbc70b04 100644 --- a/discord/state.py +++ b/discord/state.py @@ -25,7 +25,7 @@ DEALINGS IN THE SOFTWARE. """ import asyncio -from collections import deque, namedtuple, OrderedDict +from collections import deque, OrderedDict import copy import datetime import itertools @@ -49,20 +49,22 @@ from .channel import * from .raw_models import * from .member import Member from .role import Role -from .enums import ChannelType, try_enum, Status, Enum +from .enums import ChannelType, try_enum, Status from . import utils from .flags import Intents from .embeds import Embed from .object import Object from .invite import Invite -class ListenerType(Enum): - chunk = 0 - query_members = 1 +class ChunkRequest: + __slots__ = ('guild_id', 'nonce', 'future') + + def __init__(self, guild_id, future): + self.guild_id = guild_id + self.nonce = os.urandom(16).hex() + self.future = future -Listener = namedtuple('Listener', ('type', 'future', 'predicate')) log = logging.getLogger(__name__) -ReadyState = namedtuple('ReadyState', ('launch', 'guilds')) async def logging_coroutine(coroutine, *, info): try: @@ -100,7 +102,7 @@ class ConnectionState: self.allowed_mentions = allowed_mentions # Only disable cache if both fetch_offline and guild_subscriptions are off. self._cache_members = (self._fetch_offline or self.guild_subscriptions) - self._listeners = [] + self._chunk_requests = [] activity = options.get('activity', None) if activity: @@ -120,7 +122,9 @@ class ConnectionState: if intents is not None: if not isinstance(intents, Intents): raise TypeError('intents parameter must be Intent not %r' % type(intents)) - intents = intents.value + + if not intents.members and self._fetch_offline: + raise ValueError('Intents.members has be enabled to fetch offline members.') self._activity = activity self._status = status @@ -152,34 +156,20 @@ class ConnectionState: # to reconnect loops which cause mass allocations and deallocations. gc.collect() - def get_nonce(self): - return os.urandom(16).hex() - - def process_listeners(self, listener_type, argument, result): + def process_chunk_requests(self, guild_id, nonce, members): removed = [] - for i, listener in enumerate(self._listeners): - if listener.type != listener_type: - continue - - future = listener.future + for i, request in enumerate(self._chunk_requests): + future = request.future if future.cancelled(): removed.append(i) continue - try: - passed = listener.predicate(argument) - except Exception as exc: - future.set_exception(exc) + if request.guild_id == guild_id and request.nonce == nonce: + future.set_result(members) removed.append(i) - else: - if passed: - future.set_result(result) - removed.append(i) - if listener.type == ListenerType.chunk: - break for index in reversed(removed): - del self._listeners[index] + del self._chunk_requests[index] def call_handlers(self, key, *args, **kwargs): try: @@ -313,10 +303,6 @@ class ConnectionState: self._add_guild(guild) return guild - def chunks_needed(self, guild): - for _ in range(math.ceil(guild._member_count / 1000)): - yield self.receive_chunk(guild.id) - def _get_guild_channel(self, data): channel_id = int(data['channel_id']) try: @@ -333,43 +319,20 @@ class ConnectionState: ws = self._get_websocket(guild_id) # This is ignored upstream await ws.request_chunks(guild_id, query=query, limit=limit, nonce=nonce) - async def request_offline_members(self, guilds): - # get all the chunks - chunks = [] - for guild in guilds: - chunks.extend(self.chunks_needed(guild)) - - # we only want to request ~75 guilds per chunk request. - splits = [guilds[i:i + 75] for i in range(0, len(guilds), 75)] - for split in splits: - await self.chunker([g.id for g in split]) - - # wait for the chunks - if chunks: - try: - await utils.sane_wait_for(chunks, timeout=len(chunks) * 30.0) - except asyncio.TimeoutError: - log.warning('Somehow timed out waiting for chunks.') - else: - log.info('Finished requesting guild member chunks for %d guilds.', len(guilds)) - async def query_members(self, guild, query, limit, user_ids, cache): guild_id = guild.id ws = self._get_websocket(guild_id) if ws is None: raise RuntimeError('Somehow do not have a websocket for this guild_id') - # Limits over 1000 cannot be supported since - # the main use case for this is guild_subscriptions being disabled - # and they don't receive GUILD_MEMBER events which make computing - # member_count impossible. The only way to fix it is by limiting - # the limit parameter to 1 to 1000. - nonce = self.get_nonce() - future = self.receive_member_query(guild_id, nonce) + future = self.loop.create_future() + request = ChunkRequest(guild.id, future) + self._chunk_requests.append(request) + try: # start the query operation - await ws.request_chunks(guild_id, query=query, limit=limit, user_ids=user_ids, nonce=nonce) - members = await asyncio.wait_for(future, timeout=5.0) + await ws.request_chunks(guild_id, query=query, limit=limit, user_ids=user_ids, nonce=request.nonce) + members = await asyncio.wait_for(future, timeout=30.0) if cache: for member in members: @@ -382,29 +345,26 @@ class ConnectionState: async def _delay_ready(self): try: - launch = self._ready_state.launch - # only real bots wait for GUILD_CREATE streaming if self.is_bot: while True: # this snippet of code is basically waiting N seconds # until the last GUILD_CREATE was sent try: - await asyncio.wait_for(launch.wait(), timeout=self.guild_ready_timeout) + guild = await asyncio.wait_for(self._ready_state.get(), timeout=self.guild_ready_timeout) except asyncio.TimeoutError: break else: - launch.clear() - - guilds = next(zip(*self._ready_state.guilds), []) - if self._fetch_offline: - await self.request_offline_members(guilds) - - for guild, unavailable in self._ready_state.guilds: - if unavailable is False: - self.dispatch('guild_available', guild) - else: - self.dispatch('guild_join', guild) + try: + if self._fetch_offline: + await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0) + except asyncio.TimeoutError: + log.info('Timed out waiting for chunks while launching ready event.') + finally: + if guild.unavailable is False: + self.dispatch('guild_available', guild) + else: + self.dispatch('guild_join', guild) # remove the state try: @@ -429,16 +389,13 @@ class ConnectionState: if self._ready_task is not None: self._ready_task.cancel() - self._ready_state = ReadyState(launch=asyncio.Event(), guilds=[]) + self._ready_state = asyncio.Queue() self.clear() self.user = user = ClientUser(state=self, data=data['user']) self._users[user.id] = user - guilds = self._ready_state.guilds for guild_data in data['guilds']: - guild = self._add_guild_from_data(guild_data) - if (not self.is_bot and not guild.unavailable) or guild.large: - guilds.append((guild, guild.unavailable)) + self._add_guild_from_data(guild_data) for relationship in data.get('relationships', []): try: @@ -772,14 +729,18 @@ class ConnectionState: return self._add_guild_from_data(data) + async def chunk_guild(self, guild): + future = self.loop.create_future() + request = ChunkRequest(guild.id, future) + self._chunk_requests.append(request) + await self.chunker(guild.id, nonce=request.nonce) + await request.future + async def _chunk_and_dispatch(self, guild, unavailable): - chunks = list(self.chunks_needed(guild)) - await self.chunker(guild.id) - if chunks: - try: - await utils.sane_wait_for(chunks, timeout=len(chunks)) - except asyncio.TimeoutError: - log.info('Somehow timed out waiting for chunks.') + try: + await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0) + except asyncio.TimeoutError: + log.info('Somehow timed out waiting for chunks.') if unavailable is False: self.dispatch('guild_available', guild) @@ -794,25 +755,17 @@ class ConnectionState: guild = self._get_create_guild(data) + try: + # Notify the on_ready state, if any, that this guild is complete. + self._ready_state.put_nowait(guild) + except AttributeError: + pass + else: + # If we're waiting for the event, put the rest on hold + return + # check if it requires chunking if guild.large: - if unavailable is False: - # check if we're waiting for 'useful' READY - # and if we are, we don't want to dispatch any - # event such as guild_join or guild_available - # because we're still in the 'READY' phase. Or - # so we say. - try: - state = self._ready_state - state.launch.set() - state.guilds.append((guild, unavailable)) - except AttributeError: - # the _ready_state attribute is only there during - # processing of useful READY. - pass - else: - return - # since we're not waiting for 'useful' READY we'll just # do the chunk request here if wanted if self._fetch_offline: @@ -929,8 +882,8 @@ class ConnectionState: if existing is None or existing.joined_at is None: guild._add_member(member) - self.process_listeners(ListenerType.chunk, guild, len(members)) - self.process_listeners(ListenerType.query_members, (guild_id, data.get('nonce')), members) + if data.get('chunk_index', 0) + 1 == data.get('chunk_count'): + self.process_chunk_requests(guild_id, data.get('nonce'), members) def parse_guild_integrations_update(self, data): guild = self._get_guild(int(data['guild_id'])) @@ -1054,21 +1007,6 @@ class ConnectionState: def create_message(self, *, channel, data): return Message(state=self, channel=channel, data=data) - def receive_chunk(self, guild_id): - future = self.loop.create_future() - listener = Listener(ListenerType.chunk, future, lambda s: s.id == guild_id) - self._listeners.append(listener) - return future - - def receive_member_query(self, guild_id, nonce): - def predicate(args, *, guild_id=guild_id, nonce=nonce): - return args == (guild_id, nonce) - - future = self.loop.create_future() - listener = Listener(ListenerType.query_members, future, predicate) - self._listeners.append(listener) - return future - class AutoShardedConnectionState(ConnectionState): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -1091,51 +1029,31 @@ class AutoShardedConnectionState(ConnectionState): ws = self._get_websocket(guild_id, shard_id=shard_id) await ws.request_chunks(guild_id, query=query, limit=limit, nonce=nonce) - async def request_offline_members(self, guilds, *, shard_id): - # get all the chunks - chunks = [] - for guild in guilds: - chunks.extend(self.chunks_needed(guild)) - - # we only want to request ~75 guilds per chunk request. - splits = [guilds[i:i + 75] for i in range(0, len(guilds), 75)] - for split in splits: - await self.chunker([g.id for g in split], shard_id=shard_id) - - # wait for the chunks - if chunks: - try: - await utils.sane_wait_for(chunks, timeout=len(chunks) * 30.0) - except asyncio.TimeoutError: - log.info('Somehow timed out waiting for chunks.') - else: - log.info('Finished requesting guild member chunks for %d guilds.', len(guilds)) - async def _delay_ready(self): await self.shards_launched.wait() - launch = self._ready_state.launch + processed = [] while True: # this snippet of code is basically waiting N seconds # until the last GUILD_CREATE was sent try: - await asyncio.wait_for(launch.wait(), timeout=self.guild_ready_timeout) + guild = await asyncio.wait_for(self._ready_state.get(), timeout=self.guild_ready_timeout) except asyncio.TimeoutError: break else: - launch.clear() - - guilds = sorted(self._ready_state.guilds, key=lambda g: g[0].shard_id) - - for shard_id, sub_guilds_info in itertools.groupby(guilds, key=lambda g: g[0].shard_id): - sub_guilds, sub_available = zip(*sub_guilds_info) - if self._fetch_offline: - await self.request_offline_members(sub_guilds, shard_id=shard_id) + try: + if self._fetch_offline: + await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0) + except asyncio.TimeoutError: + log.info('Timed out waiting for chunks while launching ready event.') + finally: + processed.append(guild) + if guild.unavailable is False: + self.dispatch('guild_available', guild) + else: + self.dispatch('guild_join', guild) - for guild, unavailable in zip(sub_guilds, sub_available): - if unavailable is False: - self.dispatch('guild_available', guild) - else: - self.dispatch('guild_join', guild) + guilds = sorted(processed, key=lambda g: g.shard_id) + for shard_id, _ in itertools.groupby(guilds, key=lambda g: g.shard_id): self.dispatch('shard_ready', shard_id) # remove the state @@ -1155,16 +1073,13 @@ class AutoShardedConnectionState(ConnectionState): def parse_ready(self, data): if not hasattr(self, '_ready_state'): - self._ready_state = ReadyState(launch=asyncio.Event(), guilds=[]) + self._ready_state = asyncio.Queue() self.user = user = ClientUser(state=self, data=data['user']) self._users[user.id] = user - guilds = self._ready_state.guilds for guild_data in data['guilds']: - guild = self._add_guild_from_data(guild_data) - if guild.large: - guilds.append((guild, guild.unavailable)) + self._add_guild_from_data(guild_data) if self._messages: self._update_message_references()