|
|
@ -63,6 +63,7 @@ class ConnectionState: |
|
|
|
self.syncer = syncer |
|
|
|
self.is_bot = None |
|
|
|
self.shard_count = None |
|
|
|
self._fetch_offline = options.get('fetch_offline_members', True) |
|
|
|
self._listeners = [] |
|
|
|
self.clear() |
|
|
|
|
|
|
@ -197,16 +198,7 @@ class ConnectionState: |
|
|
|
yield self.receive_chunk(guild.id) |
|
|
|
|
|
|
|
@asyncio.coroutine |
|
|
|
def _delay_ready(self): |
|
|
|
launch = self._ready_state.launch |
|
|
|
while not launch.is_set(): |
|
|
|
# this snippet of code is basically waiting 2 seconds |
|
|
|
# until the last GUILD_CREATE was sent |
|
|
|
launch.set() |
|
|
|
yield from asyncio.sleep(2, loop=self.loop) |
|
|
|
|
|
|
|
guilds = self._ready_state.guilds |
|
|
|
|
|
|
|
def request_offline_members(self, guilds): |
|
|
|
# get all the chunks |
|
|
|
chunks = [] |
|
|
|
for guild in guilds: |
|
|
@ -224,6 +216,22 @@ class ConnectionState: |
|
|
|
except asyncio.TimeoutError: |
|
|
|
log.info('Somehow timed out waiting for chunks.') |
|
|
|
|
|
|
|
@asyncio.coroutine |
|
|
|
def _delay_ready(self): |
|
|
|
launch = self._ready_state.launch |
|
|
|
|
|
|
|
# only real bots wait for GUILD_CREATE streaming |
|
|
|
if self.is_bot: |
|
|
|
while not launch.is_set(): |
|
|
|
# this snippet of code is basically waiting 2 seconds |
|
|
|
# until the last GUILD_CREATE was sent |
|
|
|
launch.set() |
|
|
|
yield from asyncio.sleep(2, loop=self.loop) |
|
|
|
|
|
|
|
guilds = self._ready_state.guilds |
|
|
|
if self._fetch_offline: |
|
|
|
yield from self.request_offline_members(guilds) |
|
|
|
|
|
|
|
# remove the state |
|
|
|
try: |
|
|
|
del self._ready_state |
|
|
@ -260,6 +268,7 @@ class ConnectionState: |
|
|
|
factory, _ = _channel_factory(pm['type']) |
|
|
|
self._add_private_channel(factory(me=self.user, data=pm, state=self)) |
|
|
|
|
|
|
|
self.dispatch('connect') |
|
|
|
compat.create_task(self._delay_ready(), loop=self.loop) |
|
|
|
|
|
|
|
def parse_resumed(self, data): |
|
|
@ -477,8 +486,8 @@ class ConnectionState: |
|
|
|
|
|
|
|
@asyncio.coroutine |
|
|
|
def _chunk_and_dispatch(self, guild, unavailable): |
|
|
|
yield from self.chunker(guild) |
|
|
|
chunks = list(self.chunks_needed(guild)) |
|
|
|
yield from self.chunker(guild) |
|
|
|
if chunks: |
|
|
|
try: |
|
|
|
yield from asyncio.wait(chunks, timeout=len(chunks), loop=self.loop) |
|
|
@ -518,9 +527,10 @@ class ConnectionState: |
|
|
|
return |
|
|
|
|
|
|
|
# since we're not waiting for 'useful' READY we'll just |
|
|
|
# do the chunk request here |
|
|
|
compat.create_task(self._chunk_and_dispatch(guild, unavailable), loop=self.loop) |
|
|
|
return |
|
|
|
# do the chunk request here if wanted |
|
|
|
if self._fetch_offline: |
|
|
|
compat.create_task(self._chunk_and_dispatch(guild, unavailable), loop=self.loop) |
|
|
|
return |
|
|
|
|
|
|
|
# Dispatch available if newly available |
|
|
|
if unavailable == False: |
|
|
@ -740,6 +750,25 @@ class AutoShardedConnectionState(ConnectionState): |
|
|
|
self._ready_state = ReadyState(launch=asyncio.Event(), guilds=[]) |
|
|
|
self._ready_task = None |
|
|
|
|
|
|
|
@asyncio.coroutine |
|
|
|
def request_offline_members(self, guilds, *, shard_id): |
|
|
|
# get all the chunks |
|
|
|
chunks = [] |
|
|
|
for guild in guilds: |
|
|
|
chunks.extend(self.chunks_needed(guild)) |
|
|
|
|
|
|
|
# we only want to request ~75 guilds per chunk request. |
|
|
|
splits = [guilds[i:i + 75] for i in range(0, len(guilds), 75)] |
|
|
|
for split in splits: |
|
|
|
yield from self.chunker(split, shard_id=shard_id) |
|
|
|
|
|
|
|
# wait for the chunks |
|
|
|
if chunks: |
|
|
|
try: |
|
|
|
yield from asyncio.wait(chunks, timeout=len(chunks) * 30.0, loop=self.loop) |
|
|
|
except asyncio.TimeoutError: |
|
|
|
log.info('Somehow timed out waiting for chunks.') |
|
|
|
|
|
|
|
@asyncio.coroutine |
|
|
|
def _delay_ready(self): |
|
|
|
launch = self._ready_state.launch |
|
|
@ -749,30 +778,14 @@ class AutoShardedConnectionState(ConnectionState): |
|
|
|
launch.set() |
|
|
|
yield from asyncio.sleep(2.0 * self.shard_count, loop=self.loop) |
|
|
|
|
|
|
|
guilds = sorted(self._ready_state.guilds, key=lambda g: g.shard_id) |
|
|
|
|
|
|
|
# we only want to request ~75 guilds per chunk request. |
|
|
|
# we also want to split the chunks per shard_id |
|
|
|
for shard_id, sub_guilds in itertools.groupby(guilds, key=lambda g: g.shard_id): |
|
|
|
sub_guilds = list(sub_guilds) |
|
|
|
|
|
|
|
# split chunks by shard ID |
|
|
|
chunks = [] |
|
|
|
for guild in sub_guilds: |
|
|
|
chunks.extend(self.chunks_needed(guild)) |
|
|
|
if self._fetch_offline: |
|
|
|
guilds = sorted(self._ready_state.guilds, key=lambda g: g.shard_id) |
|
|
|
|
|
|
|
splits = [sub_guilds[i:i + 75] for i in range(0, len(sub_guilds), 75)] |
|
|
|
for split in splits: |
|
|
|
yield from self.chunker(split, shard_id=shard_id) |
|
|
|
|
|
|
|
# wait for the chunks |
|
|
|
if chunks: |
|
|
|
try: |
|
|
|
yield from asyncio.wait(chunks, timeout=len(chunks) * 30.0, loop=self.loop) |
|
|
|
except asyncio.TimeoutError: |
|
|
|
log.info('Somehow timed out waiting for chunks for %s shard_id' % shard_id) |
|
|
|
|
|
|
|
self.dispatch('shard_ready', shard_id) |
|
|
|
for shard_id, sub_guilds in itertools.groupby(guilds, key=lambda g: g.shard_id): |
|
|
|
sub_guilds = list(sub_guilds) |
|
|
|
yield from self.request_offline_members(sub_guilds, shard_id=shard_id) |
|
|
|
self.dispatch('shard_ready', shard_id) |
|
|
|
|
|
|
|
# remove the state |
|
|
|
try: |
|
|
@ -782,6 +795,9 @@ class AutoShardedConnectionState(ConnectionState): |
|
|
|
|
|
|
|
# regular users cannot shard so we won't worry about it here. |
|
|
|
|
|
|
|
# clear the current task |
|
|
|
self._ready_task = None |
|
|
|
|
|
|
|
# dispatch the event |
|
|
|
self.dispatch('ready') |
|
|
|
|
|
|
@ -801,5 +817,6 @@ class AutoShardedConnectionState(ConnectionState): |
|
|
|
factory, _ = _channel_factory(pm['type']) |
|
|
|
self._add_private_channel(factory(me=self.user, data=pm, state=self)) |
|
|
|
|
|
|
|
self.dispatch('connect') |
|
|
|
if self._ready_task is None: |
|
|
|
self._ready_task = compat.create_task(self._delay_ready(), loop=self.loop) |
|
|
|