diff --git a/socketio/asyncio_redis_manager.py b/socketio/asyncio_redis_manager.py index 7ecc34c..7cb2a53 100644 --- a/socketio/asyncio_redis_manager.py +++ b/socketio/asyncio_redis_manager.py @@ -1,3 +1,5 @@ +import asyncio +import logging import pickle from urllib.parse import urlparse @@ -8,6 +10,8 @@ except ImportError: from .asyncio_pubsub_manager import AsyncPubSubManager +logger = logging.getLogger('socketio') + def _parse_redis_url(url): p = urlparse(url) @@ -59,17 +63,39 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover super().__init__(channel=channel, write_only=write_only) async def _publish(self, data): - if self.pub is None: - self.pub = await aioredis.create_redis((self.host, self.port), - db=self.db, - password=self.password) - return await self.pub.publish(self.channel, pickle.dumps(data)) + retry = True + while True: + try: + if self.pub is None: + self.pub = await aioredis.create_redis( + (self.host, self.port), db=self.db, + password=self.password) + return await self.pub.publish(self.channel, + pickle.dumps(data)) + except (aioredis.RedisError, OSError): + if retry: + logger.error('Cannot publish to redis... retrying') + self.pub = None + retry = False + else: + logger.error('Cannot publish to redis... giving up') + break async def _listen(self): - if self.sub is None: - self.sub = await aioredis.create_redis((self.host, self.port), - db=self.db, - password=self.password) - self.ch = (await self.sub.subscribe(self.channel))[0] + retry_sleep = 1 while True: - return await self.ch.get() + try: + if self.sub is None: + self.sub = await aioredis.create_redis( + (self.host, self.port), db=self.db, + password=self.password) + self.ch = (await self.sub.subscribe(self.channel))[0] + return await self.ch.get() + except (aioredis.RedisError, OSError): + logger.error('Cannot receive from redis... ' + 'retrying in {} secs'.format(retry_sleep)) + self.sub = None + await asyncio.sleep(retry_sleep) + retry_sleep *= 2 + if retry_sleep > 60: + retry_sleep = 60 diff --git a/socketio/redis_manager.py b/socketio/redis_manager.py index 30f7978..9a6f499 100644 --- a/socketio/redis_manager.py +++ b/socketio/redis_manager.py @@ -1,4 +1,6 @@ +import logging import pickle +import time try: import redis @@ -7,6 +9,8 @@ except ImportError: from .pubsub_manager import PubSubManager +logger = logging.getLogger('socketio') + class RedisManager(PubSubManager): # pragma: no cover """Redis based client manager. @@ -38,8 +42,8 @@ class RedisManager(PubSubManager): # pragma: no cover raise RuntimeError('Redis package is not installed ' '(Run "pip install redis" in your ' 'virtualenv).') - self.redis = redis.Redis.from_url(url) - self.pubsub = self.redis.pubsub() + self.redis_url = url + self._redis_connect() super(RedisManager, self).__init__(channel=channel, write_only=write_only) @@ -58,13 +62,48 @@ class RedisManager(PubSubManager): # pragma: no cover 'Redis requires a monkey patched socket library to work ' 'with ' + self.server.async_mode) + def _redis_connect(self): + self.redis = redis.Redis.from_url(self.redis_url) + self.pubsub = self.redis.pubsub() + def _publish(self, data): - return self.redis.publish(self.channel, pickle.dumps(data)) + retry = True + while True: + try: + if not retry: + self._redis_connect() + return self.redis.publish(self.channel, pickle.dumps(data)) + except redis.exceptions.ConnectionError: + if retry: + logger.error('Cannot publish to redis... retrying') + retry = False + else: + logger.error('Cannot publish to redis... giving up') + break + + def _redis_listen_with_retries(self): + retry_sleep = 1 + connect = False + while True: + try: + if connect: + self._redis_connect() + self.pubsub.subscribe(self.channel) + for message in self.pubsub.listen(): + yield message + except redis.exceptions.ConnectionError: + logger.error('Cannot receive from redis... ' + 'retrying in {} secs'.format(retry_sleep)) + connect = True + time.sleep(retry_sleep) + retry_sleep *= 2 + if retry_sleep > 60: + retry_sleep = 60 def _listen(self): channel = self.channel.encode('utf-8') self.pubsub.subscribe(self.channel) - for message in self.pubsub.listen(): + for message in self._redis_listen_with_retries(): if message['channel'] == channel and \ message['type'] == 'message' and 'data' in message: yield message['data']