From 7a7da80c5c4c8bd7a0bc6de54dcbd14e2d6310e0 Mon Sep 17 00:00:00 2001 From: Sam Mosleh Date: Wed, 20 Oct 2021 09:50:07 +0330 Subject: [PATCH] Unify redis usage --- src/socketio/asyncio_redis_manager.py | 52 +++++---------------------- src/socketio/redis_manager.py | 51 +++++--------------------- 2 files changed, 18 insertions(+), 85 deletions(-) diff --git a/src/socketio/asyncio_redis_manager.py b/src/socketio/asyncio_redis_manager.py index 41a62c6..23f5cae 100644 --- a/src/socketio/asyncio_redis_manager.py +++ b/src/socketio/asyncio_redis_manager.py @@ -36,67 +36,33 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover name = 'aioredis' def __init__(self, url='redis://localhost:6379/0', channel='socketio', - write_only=False, logger=None, redis_options=None): + write_only=False, logger=None, redis_options={}): if aioredis is None: raise RuntimeError('Redis package is not installed ' '(Run "pip install aioredis" in your ' 'virtualenv).') if not hasattr(aioredis.Redis, 'from_url'): raise RuntimeError('Version 2 of aioredis package is required.') - self.redis_url = url - self.redis_options = redis_options or {} - self._redis_connect() + self.redis = aioredis.Redis.from_url(url, **redis_options) + self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) super().__init__(channel=channel, write_only=write_only, logger=logger) - def _redis_connect(self): - self.redis = aioredis.Redis.from_url(self.redis_url, - **self.redis_options) - self.pubsub = self.redis.pubsub() - async def _publish(self, data): - retry = True - while True: - try: - if not retry: - self._redis_connect() - return await self.redis.publish( - self.channel, pickle.dumps(data)) - except aioredis.exceptions.RedisError: - if retry: - self._get_logger().error('Cannot publish to redis... ' - 'retrying') - retry = False - else: - self._get_logger().error('Cannot publish to redis... ' - 'giving up') - break + return await self.redis.publish(self.channel, pickle.dumps(data)) - async def _redis_listen_with_retries(self): + async def _listen(self): retry_sleep = 1 - connect = False while True: try: - if connect: - self._redis_connect() - await self.pubsub.subscribe(self.channel) - retry_sleep = 1 + await self.pubsub.subscribe(self.channel) + retry_sleep = 1 async for message in self.pubsub.listen(): - yield message + yield message['data'] except aioredis.exceptions.RedisError: self._get_logger().error('Cannot receive from redis... ' - 'retrying in ' - '{} secs'.format(retry_sleep)) - connect = True + 'retrying in {} secs'.format(retry_sleep)) await asyncio.sleep(retry_sleep) retry_sleep *= 2 if retry_sleep > 60: retry_sleep = 60 - async def _listen(self): - channel = self.channel.encode('utf-8') - await self.pubsub.subscribe(self.channel) - async for message in self._redis_listen_with_retries(): - if message['channel'] == channel and \ - message['type'] == 'message' and 'data' in message: - yield message['data'] - await self.pubsub.unsubscribe(self.channel) diff --git a/src/socketio/redis_manager.py b/src/socketio/redis_manager.py index 6ac0601..836dc5b 100644 --- a/src/socketio/redis_manager.py +++ b/src/socketio/redis_manager.py @@ -9,8 +9,6 @@ except ImportError: from .pubsub_manager import PubSubManager -logger = logging.getLogger('socketio') - class RedisManager(PubSubManager): # pragma: no cover """Redis based client manager. @@ -40,14 +38,13 @@ class RedisManager(PubSubManager): # pragma: no cover name = 'redis' def __init__(self, url='redis://localhost:6379/0', channel='socketio', - write_only=False, logger=None, redis_options=None): + write_only=False, logger=None, redis_options={}): if redis is None: raise RuntimeError('Redis package is not installed ' '(Run "pip install redis" in your ' 'virtualenv).') - self.redis_url = url - self.redis_options = redis_options or {} - self._redis_connect() + self.redis = redis.Redis.from_url(url, **redis_options) + self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) super(RedisManager, self).__init__(channel=channel, write_only=write_only, logger=logger) @@ -67,51 +64,21 @@ class RedisManager(PubSubManager): # pragma: no cover 'Redis requires a monkey patched socket library to work ' 'with ' + self.server.async_mode) - def _redis_connect(self): - self.redis = redis.Redis.from_url(self.redis_url, - **self.redis_options) - self.pubsub = self.redis.pubsub() - def _publish(self, data): - retry = True - while True: - try: - if not retry: - self._redis_connect() - return self.redis.publish(self.channel, pickle.dumps(data)) - except redis.exceptions.RedisError: - if retry: - logger.error('Cannot publish to redis... retrying') - retry = False - else: - logger.error('Cannot publish to redis... giving up') - break + return self.redis.publish(self.channel, pickle.dumps(data)) - def _redis_listen_with_retries(self): + def _listen(self): retry_sleep = 1 - connect = False while True: try: - if connect: - self._redis_connect() - self.pubsub.subscribe(self.channel) - retry_sleep = 1 + self.pubsub.subscribe(self.channel) + retry_sleep = 1 for message in self.pubsub.listen(): - yield message + yield message['data'] except redis.exceptions.RedisError: - logger.error('Cannot receive from redis... ' + self._get_logger().error('Cannot receive from redis... ' 'retrying in {} secs'.format(retry_sleep)) - connect = True time.sleep(retry_sleep) retry_sleep *= 2 if retry_sleep > 60: retry_sleep = 60 - - def _listen(self): - channel = self.channel.encode('utf-8') - self.pubsub.subscribe(self.channel) - for message in self._redis_listen_with_retries(): - if message['channel'] == channel and \ - message['type'] == 'message' and 'data' in message: - yield message['data'] - self.pubsub.unsubscribe(self.channel)