Browse Source

Rewrite chunking to work with intents.

This slows down chunking significantly for bots in a large number of
guilds since it goes down from 75 guilds/request to 1 guild/request.
However the logic was rewritten to fire the chunking request
immediately after receiving the GUILD_CREATE rather than waiting for
all the guilds in the ready stream before doing it.
pull/5849/head
Rapptz 5 years ago
parent
commit
930761e058
  1. 2
      discord/gateway.py
  2. 24
      discord/guild.py
  3. 211
      discord/state.py

2
discord/gateway.py

@ -370,7 +370,7 @@ class DiscordWebSocket:
}
if state._intents is not None:
payload['d']['intents'] = state._intents
payload['d']['intents'] = state._intents.value
await self.call_hooks('before_identify', self.shard_id, initial=self._initial_identify)
await self.send_as_json(payload)

24
discord/guild.py

@ -2045,11 +2045,6 @@ class Guild(Hashable):
This is a websocket operation and can be slow.
.. warning::
Most bots do not need to use this. It's mainly a helper
for bots who have disabled ``guild_subscriptions``.
.. versionadded:: 1.3
Parameters
@ -2059,7 +2054,7 @@ class Guild(Hashable):
requests all members.
limit: :class:`int`
The maximum number of members to send back. This must be
a number between 1 and 1000.
a number between 1 and 100.
cache: :class:`bool`
Whether to cache the members internally. This makes operations
such as :meth:`get_member` work for those that matched.
@ -2073,19 +2068,26 @@ class Guild(Hashable):
-------
asyncio.TimeoutError
The query timed out waiting for the members.
ValueError
Invalid parameters were passed to the function
Returns
--------
List[:class:`Member`]
The list of members that have matched the query.
"""
if user_ids is not None and query is not None:
raise TypeError('Cannot pass both query and user_ids')
if user_ids is None and query is None:
raise TypeError('Must pass either query or user_ids')
if query is None:
if query == '':
raise ValueError('Cannot pass empty query string.')
if user_ids is None:
raise ValueError('Must pass either query or user_ids')
if user_ids is not None and query is not None:
raise ValueError('Cannot pass both query and user_ids')
limit = limit or 5
limit = min(100, limit or 5)
return await self._state.query_members(self, query=query, limit=limit, user_ids=user_ids, cache=cache)
async def change_voice_state(self, *, channel, self_mute=False, self_deaf=False):

211
discord/state.py

@ -25,7 +25,7 @@ DEALINGS IN THE SOFTWARE.
"""
import asyncio
from collections import deque, namedtuple, OrderedDict
from collections import deque, OrderedDict
import copy
import datetime
import itertools
@ -49,20 +49,22 @@ from .channel import *
from .raw_models import *
from .member import Member
from .role import Role
from .enums import ChannelType, try_enum, Status, Enum
from .enums import ChannelType, try_enum, Status
from . import utils
from .flags import Intents
from .embeds import Embed
from .object import Object
from .invite import Invite
class ListenerType(Enum):
chunk = 0
query_members = 1
class ChunkRequest:
__slots__ = ('guild_id', 'nonce', 'future')
def __init__(self, guild_id, future):
self.guild_id = guild_id
self.nonce = os.urandom(16).hex()
self.future = future
Listener = namedtuple('Listener', ('type', 'future', 'predicate'))
log = logging.getLogger(__name__)
ReadyState = namedtuple('ReadyState', ('launch', 'guilds'))
async def logging_coroutine(coroutine, *, info):
try:
@ -100,7 +102,7 @@ class ConnectionState:
self.allowed_mentions = allowed_mentions
# Only disable cache if both fetch_offline and guild_subscriptions are off.
self._cache_members = (self._fetch_offline or self.guild_subscriptions)
self._listeners = []
self._chunk_requests = []
activity = options.get('activity', None)
if activity:
@ -120,7 +122,9 @@ class ConnectionState:
if intents is not None:
if not isinstance(intents, Intents):
raise TypeError('intents parameter must be Intent not %r' % type(intents))
intents = intents.value
if not intents.members and self._fetch_offline:
raise ValueError('Intents.members has be enabled to fetch offline members.')
self._activity = activity
self._status = status
@ -152,34 +156,20 @@ class ConnectionState:
# to reconnect loops which cause mass allocations and deallocations.
gc.collect()
def get_nonce(self):
return os.urandom(16).hex()
def process_listeners(self, listener_type, argument, result):
def process_chunk_requests(self, guild_id, nonce, members):
removed = []
for i, listener in enumerate(self._listeners):
if listener.type != listener_type:
continue
future = listener.future
for i, request in enumerate(self._chunk_requests):
future = request.future
if future.cancelled():
removed.append(i)
continue
try:
passed = listener.predicate(argument)
except Exception as exc:
future.set_exception(exc)
removed.append(i)
else:
if passed:
future.set_result(result)
if request.guild_id == guild_id and request.nonce == nonce:
future.set_result(members)
removed.append(i)
if listener.type == ListenerType.chunk:
break
for index in reversed(removed):
del self._listeners[index]
del self._chunk_requests[index]
def call_handlers(self, key, *args, **kwargs):
try:
@ -313,10 +303,6 @@ class ConnectionState:
self._add_guild(guild)
return guild
def chunks_needed(self, guild):
for _ in range(math.ceil(guild._member_count / 1000)):
yield self.receive_chunk(guild.id)
def _get_guild_channel(self, data):
channel_id = int(data['channel_id'])
try:
@ -333,43 +319,20 @@ class ConnectionState:
ws = self._get_websocket(guild_id) # This is ignored upstream
await ws.request_chunks(guild_id, query=query, limit=limit, nonce=nonce)
async def request_offline_members(self, guilds):
# get all the chunks
chunks = []
for guild in guilds:
chunks.extend(self.chunks_needed(guild))
# we only want to request ~75 guilds per chunk request.
splits = [guilds[i:i + 75] for i in range(0, len(guilds), 75)]
for split in splits:
await self.chunker([g.id for g in split])
# wait for the chunks
if chunks:
try:
await utils.sane_wait_for(chunks, timeout=len(chunks) * 30.0)
except asyncio.TimeoutError:
log.warning('Somehow timed out waiting for chunks.')
else:
log.info('Finished requesting guild member chunks for %d guilds.', len(guilds))
async def query_members(self, guild, query, limit, user_ids, cache):
guild_id = guild.id
ws = self._get_websocket(guild_id)
if ws is None:
raise RuntimeError('Somehow do not have a websocket for this guild_id')
# Limits over 1000 cannot be supported since
# the main use case for this is guild_subscriptions being disabled
# and they don't receive GUILD_MEMBER events which make computing
# member_count impossible. The only way to fix it is by limiting
# the limit parameter to 1 to 1000.
nonce = self.get_nonce()
future = self.receive_member_query(guild_id, nonce)
future = self.loop.create_future()
request = ChunkRequest(guild.id, future)
self._chunk_requests.append(request)
try:
# start the query operation
await ws.request_chunks(guild_id, query=query, limit=limit, user_ids=user_ids, nonce=nonce)
members = await asyncio.wait_for(future, timeout=5.0)
await ws.request_chunks(guild_id, query=query, limit=limit, user_ids=user_ids, nonce=request.nonce)
members = await asyncio.wait_for(future, timeout=30.0)
if cache:
for member in members:
@ -382,26 +345,23 @@ class ConnectionState:
async def _delay_ready(self):
try:
launch = self._ready_state.launch
# only real bots wait for GUILD_CREATE streaming
if self.is_bot:
while True:
# this snippet of code is basically waiting N seconds
# until the last GUILD_CREATE was sent
try:
await asyncio.wait_for(launch.wait(), timeout=self.guild_ready_timeout)
guild = await asyncio.wait_for(self._ready_state.get(), timeout=self.guild_ready_timeout)
except asyncio.TimeoutError:
break
else:
launch.clear()
guilds = next(zip(*self._ready_state.guilds), [])
try:
if self._fetch_offline:
await self.request_offline_members(guilds)
for guild, unavailable in self._ready_state.guilds:
if unavailable is False:
await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0)
except asyncio.TimeoutError:
log.info('Timed out waiting for chunks while launching ready event.')
finally:
if guild.unavailable is False:
self.dispatch('guild_available', guild)
else:
self.dispatch('guild_join', guild)
@ -429,16 +389,13 @@ class ConnectionState:
if self._ready_task is not None:
self._ready_task.cancel()
self._ready_state = ReadyState(launch=asyncio.Event(), guilds=[])
self._ready_state = asyncio.Queue()
self.clear()
self.user = user = ClientUser(state=self, data=data['user'])
self._users[user.id] = user
guilds = self._ready_state.guilds
for guild_data in data['guilds']:
guild = self._add_guild_from_data(guild_data)
if (not self.is_bot and not guild.unavailable) or guild.large:
guilds.append((guild, guild.unavailable))
self._add_guild_from_data(guild_data)
for relationship in data.get('relationships', []):
try:
@ -772,12 +729,16 @@ class ConnectionState:
return self._add_guild_from_data(data)
async def chunk_guild(self, guild):
future = self.loop.create_future()
request = ChunkRequest(guild.id, future)
self._chunk_requests.append(request)
await self.chunker(guild.id, nonce=request.nonce)
await request.future
async def _chunk_and_dispatch(self, guild, unavailable):
chunks = list(self.chunks_needed(guild))
await self.chunker(guild.id)
if chunks:
try:
await utils.sane_wait_for(chunks, timeout=len(chunks))
await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0)
except asyncio.TimeoutError:
log.info('Somehow timed out waiting for chunks.')
@ -794,25 +755,17 @@ class ConnectionState:
guild = self._get_create_guild(data)
# check if it requires chunking
if guild.large:
if unavailable is False:
# check if we're waiting for 'useful' READY
# and if we are, we don't want to dispatch any
# event such as guild_join or guild_available
# because we're still in the 'READY' phase. Or
# so we say.
try:
state = self._ready_state
state.launch.set()
state.guilds.append((guild, unavailable))
# Notify the on_ready state, if any, that this guild is complete.
self._ready_state.put_nowait(guild)
except AttributeError:
# the _ready_state attribute is only there during
# processing of useful READY.
pass
else:
# If we're waiting for the event, put the rest on hold
return
# check if it requires chunking
if guild.large:
# since we're not waiting for 'useful' READY we'll just
# do the chunk request here if wanted
if self._fetch_offline:
@ -929,8 +882,8 @@ class ConnectionState:
if existing is None or existing.joined_at is None:
guild._add_member(member)
self.process_listeners(ListenerType.chunk, guild, len(members))
self.process_listeners(ListenerType.query_members, (guild_id, data.get('nonce')), members)
if data.get('chunk_index', 0) + 1 == data.get('chunk_count'):
self.process_chunk_requests(guild_id, data.get('nonce'), members)
def parse_guild_integrations_update(self, data):
guild = self._get_guild(int(data['guild_id']))
@ -1054,21 +1007,6 @@ class ConnectionState:
def create_message(self, *, channel, data):
return Message(state=self, channel=channel, data=data)
def receive_chunk(self, guild_id):
future = self.loop.create_future()
listener = Listener(ListenerType.chunk, future, lambda s: s.id == guild_id)
self._listeners.append(listener)
return future
def receive_member_query(self, guild_id, nonce):
def predicate(args, *, guild_id=guild_id, nonce=nonce):
return args == (guild_id, nonce)
future = self.loop.create_future()
listener = Listener(ListenerType.query_members, future, predicate)
self._listeners.append(listener)
return future
class AutoShardedConnectionState(ConnectionState):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@ -1091,51 +1029,31 @@ class AutoShardedConnectionState(ConnectionState):
ws = self._get_websocket(guild_id, shard_id=shard_id)
await ws.request_chunks(guild_id, query=query, limit=limit, nonce=nonce)
async def request_offline_members(self, guilds, *, shard_id):
# get all the chunks
chunks = []
for guild in guilds:
chunks.extend(self.chunks_needed(guild))
# we only want to request ~75 guilds per chunk request.
splits = [guilds[i:i + 75] for i in range(0, len(guilds), 75)]
for split in splits:
await self.chunker([g.id for g in split], shard_id=shard_id)
# wait for the chunks
if chunks:
try:
await utils.sane_wait_for(chunks, timeout=len(chunks) * 30.0)
except asyncio.TimeoutError:
log.info('Somehow timed out waiting for chunks.')
else:
log.info('Finished requesting guild member chunks for %d guilds.', len(guilds))
async def _delay_ready(self):
await self.shards_launched.wait()
launch = self._ready_state.launch
processed = []
while True:
# this snippet of code is basically waiting N seconds
# until the last GUILD_CREATE was sent
try:
await asyncio.wait_for(launch.wait(), timeout=self.guild_ready_timeout)
guild = await asyncio.wait_for(self._ready_state.get(), timeout=self.guild_ready_timeout)
except asyncio.TimeoutError:
break
else:
launch.clear()
guilds = sorted(self._ready_state.guilds, key=lambda g: g[0].shard_id)
for shard_id, sub_guilds_info in itertools.groupby(guilds, key=lambda g: g[0].shard_id):
sub_guilds, sub_available = zip(*sub_guilds_info)
try:
if self._fetch_offline:
await self.request_offline_members(sub_guilds, shard_id=shard_id)
for guild, unavailable in zip(sub_guilds, sub_available):
if unavailable is False:
await asyncio.wait_for(self.chunk_guild(guild), timeout=60.0)
except asyncio.TimeoutError:
log.info('Timed out waiting for chunks while launching ready event.')
finally:
processed.append(guild)
if guild.unavailable is False:
self.dispatch('guild_available', guild)
else:
self.dispatch('guild_join', guild)
guilds = sorted(processed, key=lambda g: g.shard_id)
for shard_id, _ in itertools.groupby(guilds, key=lambda g: g.shard_id):
self.dispatch('shard_ready', shard_id)
# remove the state
@ -1155,16 +1073,13 @@ class AutoShardedConnectionState(ConnectionState):
def parse_ready(self, data):
if not hasattr(self, '_ready_state'):
self._ready_state = ReadyState(launch=asyncio.Event(), guilds=[])
self._ready_state = asyncio.Queue()
self.user = user = ClientUser(state=self, data=data['user'])
self._users[user.id] = user
guilds = self._ready_state.guilds
for guild_data in data['guilds']:
guild = self._add_guild_from_data(guild_data)
if guild.large:
guilds.append((guild, guild.unavailable))
self._add_guild_from_data(guild_data)
if self._messages:
self._update_message_references()

Loading…
Cancel
Save