Browse Source
Reset message queue sleep timer upon reconnect
pull/657/head
Ed Serzo
4 years ago
committed by
Miguel Grinberg
No known key found for this signature in database
GPG Key ID: 36848B262DF5F06C
3 changed files with
3 additions and
0 deletions
-
socketio/asyncio_aiopika_manager.py
-
socketio/asyncio_redis_manager.py
-
socketio/redis_manager.py
|
@ -89,6 +89,7 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover |
|
|
self.listener_queue = await self._queue( |
|
|
self.listener_queue = await self._queue( |
|
|
self.listener_channel, exchange |
|
|
self.listener_channel, exchange |
|
|
) |
|
|
) |
|
|
|
|
|
retry_sleep = 1 |
|
|
|
|
|
|
|
|
async with self.listener_queue.iterator() as queue_iter: |
|
|
async with self.listener_queue.iterator() as queue_iter: |
|
|
async for message in queue_iter: |
|
|
async for message in queue_iter: |
|
|
|
@ -95,6 +95,7 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover |
|
|
password=self.password, ssl=self.ssl |
|
|
password=self.password, ssl=self.ssl |
|
|
) |
|
|
) |
|
|
self.ch = (await self.sub.subscribe(self.channel))[0] |
|
|
self.ch = (await self.sub.subscribe(self.channel))[0] |
|
|
|
|
|
retry_sleep = 1 |
|
|
return await self.ch.get() |
|
|
return await self.ch.get() |
|
|
except (aioredis.RedisError, OSError): |
|
|
except (aioredis.RedisError, OSError): |
|
|
self._get_logger().error('Cannot receive from redis... ' |
|
|
self._get_logger().error('Cannot receive from redis... ' |
|
|
|
@ -94,6 +94,7 @@ class RedisManager(PubSubManager): # pragma: no cover |
|
|
if connect: |
|
|
if connect: |
|
|
self._redis_connect() |
|
|
self._redis_connect() |
|
|
self.pubsub.subscribe(self.channel) |
|
|
self.pubsub.subscribe(self.channel) |
|
|
|
|
|
retry_sleep = 1 |
|
|
for message in self.pubsub.listen(): |
|
|
for message in self.pubsub.listen(): |
|
|
yield message |
|
|
yield message |
|
|
except redis.exceptions.ConnectionError: |
|
|
except redis.exceptions.ConnectionError: |
|
|