|
|
@ -69,11 +69,11 @@ class EventItem: |
|
|
|
return hash(self.type) |
|
|
|
|
|
|
|
class Shard: |
|
|
|
def __init__(self, ws, client): |
|
|
|
def __init__(self, ws, client, queue_put): |
|
|
|
self.ws = ws |
|
|
|
self._client = client |
|
|
|
self._dispatch = client.dispatch |
|
|
|
self._queue = client._queue |
|
|
|
self._queue_put = queue_put |
|
|
|
self.loop = self._client.loop |
|
|
|
self._disconnect = False |
|
|
|
self._reconnect = client._reconnect |
|
|
@ -111,7 +111,7 @@ class Shard: |
|
|
|
self._dispatch('disconnect') |
|
|
|
self._dispatch('shard_disconnect', self.id) |
|
|
|
if not self._reconnect: |
|
|
|
self._queue.put_nowait(EventItem(EventType.close, self, e)) |
|
|
|
self._queue_put(EventItem(EventType.close, self, e)) |
|
|
|
return |
|
|
|
|
|
|
|
if self._client.is_closed(): |
|
|
@ -120,18 +120,18 @@ class Shard: |
|
|
|
if isinstance(e, OSError) and e.errno in (54, 10054): |
|
|
|
# If we get Connection reset by peer then always try to RESUME the connection. |
|
|
|
exc = ReconnectWebSocket(self.id, resume=True) |
|
|
|
self._queue.put_nowait(EventItem(EventType.resume, self, exc)) |
|
|
|
self._queue_put(EventItem(EventType.resume, self, exc)) |
|
|
|
return |
|
|
|
|
|
|
|
if isinstance(e, ConnectionClosed): |
|
|
|
if e.code != 1000: |
|
|
|
self._queue.put_nowait(EventItem(EventType.close, self, e)) |
|
|
|
self._queue_put(EventItem(EventType.close, self, e)) |
|
|
|
return |
|
|
|
|
|
|
|
retry = self._backoff.delay() |
|
|
|
log.error('Attempting a reconnect for shard ID %s in %.2fs', self.id, retry, exc_info=e) |
|
|
|
await asyncio.sleep(retry) |
|
|
|
self._queue.put_nowait(EventItem(EventType.reconnect, self, e)) |
|
|
|
self._queue_put(EventItem(EventType.reconnect, self, e)) |
|
|
|
|
|
|
|
async def worker(self): |
|
|
|
while not self._client.is_closed(): |
|
|
@ -139,7 +139,7 @@ class Shard: |
|
|
|
await self.ws.poll_event() |
|
|
|
except ReconnectWebSocket as e: |
|
|
|
etype = EventType.resume if e.resume else EventType.identify |
|
|
|
self._queue.put_nowait(EventItem(etype, self, e)) |
|
|
|
self._queue_put(EventItem(etype, self, e)) |
|
|
|
break |
|
|
|
except self._handled_exceptions as e: |
|
|
|
await self._handle_disconnect(e) |
|
|
@ -147,7 +147,7 @@ class Shard: |
|
|
|
except asyncio.CancelledError: |
|
|
|
break |
|
|
|
except Exception as e: |
|
|
|
self._queue.put_nowait(EventItem(EventType.terminate, self, e)) |
|
|
|
self._queue_put(EventItem(EventType.terminate, self, e)) |
|
|
|
break |
|
|
|
|
|
|
|
async def reidentify(self, exc): |
|
|
@ -164,7 +164,7 @@ class Shard: |
|
|
|
except asyncio.CancelledError: |
|
|
|
return |
|
|
|
except Exception as e: |
|
|
|
self._queue.put_nowait(EventItem(EventType.terminate, self, e)) |
|
|
|
self._queue_put(EventItem(EventType.terminate, self, e)) |
|
|
|
else: |
|
|
|
self.launch() |
|
|
|
|
|
|
@ -178,7 +178,7 @@ class Shard: |
|
|
|
except asyncio.CancelledError: |
|
|
|
return |
|
|
|
except Exception as e: |
|
|
|
self._queue.put_nowait(EventItem(EventType.terminate, self, e)) |
|
|
|
self._queue_put(EventItem(EventType.terminate, self, e)) |
|
|
|
else: |
|
|
|
self.launch() |
|
|
|
|
|
|
@ -373,7 +373,7 @@ class AutoShardedClient(Client): |
|
|
|
return await self.launch_shard(gateway, shard_id) |
|
|
|
|
|
|
|
# keep reading the shard while others connect |
|
|
|
self.__shards[shard_id] = ret = Shard(ws, self) |
|
|
|
self.__shards[shard_id] = ret = Shard(ws, self, self.__queue.put_nowait) |
|
|
|
ret.launch() |
|
|
|
|
|
|
|
async def launch_shards(self): |
|
|
|