Eugnee
3 weeks ago
committed by
GitHub
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with
25 additions and
12 deletions
-
src/socketio/async_redis_manager.py
-
src/socketio/redis_manager.py
|
|
@ -78,14 +78,19 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover |
|
|
|
self._redis_connect() |
|
|
|
return await self.redis.publish( |
|
|
|
self.channel, pickle.dumps(data)) |
|
|
|
except RedisError: |
|
|
|
except RedisError as exc: |
|
|
|
if retry: |
|
|
|
self._get_logger().error('Cannot publish to redis... ' |
|
|
|
'retrying') |
|
|
|
self._get_logger().error( |
|
|
|
'Cannot publish to redis... ' |
|
|
|
'retrying', |
|
|
|
extra={"redis_exception": str(exc)}) |
|
|
|
retry = False |
|
|
|
else: |
|
|
|
self._get_logger().error('Cannot publish to redis... ' |
|
|
|
'giving up') |
|
|
|
self._get_logger().error( |
|
|
|
'Cannot publish to redis... ' |
|
|
|
'giving up', |
|
|
|
extra={"redis_exception": str(exc)}) |
|
|
|
|
|
|
|
break |
|
|
|
|
|
|
|
async def _redis_listen_with_retries(self): |
|
|
@ -99,10 +104,11 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover |
|
|
|
retry_sleep = 1 |
|
|
|
async for message in self.pubsub.listen(): |
|
|
|
yield message |
|
|
|
except RedisError: |
|
|
|
except RedisError as exc: |
|
|
|
self._get_logger().error('Cannot receive from redis... ' |
|
|
|
'retrying in ' |
|
|
|
'{} secs'.format(retry_sleep)) |
|
|
|
'{} secs'.format(retry_sleep), |
|
|
|
extra={"redis_exception": str(exc)}) |
|
|
|
connect = True |
|
|
|
await asyncio.sleep(retry_sleep) |
|
|
|
retry_sleep *= 2 |
|
|
|
|
|
@ -115,12 +115,18 @@ class RedisManager(PubSubManager): # pragma: no cover |
|
|
|
if not retry: |
|
|
|
self._redis_connect() |
|
|
|
return self.redis.publish(self.channel, pickle.dumps(data)) |
|
|
|
except redis.exceptions.RedisError: |
|
|
|
except redis.exceptions.RedisError as exc: |
|
|
|
if retry: |
|
|
|
logger.error('Cannot publish to redis... retrying') |
|
|
|
logger.error( |
|
|
|
'Cannot publish to redis... retrying', |
|
|
|
extra={"redis_exception": str(exc)} |
|
|
|
) |
|
|
|
retry = False |
|
|
|
else: |
|
|
|
logger.error('Cannot publish to redis... giving up') |
|
|
|
logger.error( |
|
|
|
'Cannot publish to redis... giving up', |
|
|
|
extra={"redis_exception": str(exc)} |
|
|
|
) |
|
|
|
break |
|
|
|
|
|
|
|
def _redis_listen_with_retries(self): |
|
|
@ -133,9 +139,10 @@ class RedisManager(PubSubManager): # pragma: no cover |
|
|
|
self.pubsub.subscribe(self.channel) |
|
|
|
retry_sleep = 1 |
|
|
|
yield from self.pubsub.listen() |
|
|
|
except redis.exceptions.RedisError: |
|
|
|
except redis.exceptions.RedisError as exc: |
|
|
|
logger.error('Cannot receive from redis... ' |
|
|
|
'retrying in {} secs'.format(retry_sleep)) |
|
|
|
'retrying in {} secs'.format(retry_sleep), |
|
|
|
extra={"redis_exception": str(exc)}) |
|
|
|
connect = True |
|
|
|
time.sleep(retry_sleep) |
|
|
|
retry_sleep *= 2 |
|
|
|