From 80bb5c9b07e6b53e45f610ec29a24124c539d41b Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Sun, 14 Jun 2026 19:46:52 +0100 Subject: [PATCH] Catch all exceptionsin redis and rabbitmq managers (Fixes #1581) --- src/socketio/async_aiopika_manager.py | 29 ++++++++++++----------- src/socketio/async_redis_manager.py | 33 ++++++++++++--------------- src/socketio/kombu_manager.py | 17 ++++++++------ src/socketio/redis_manager.py | 32 ++++++++++++-------------- 4 files changed, 54 insertions(+), 57 deletions(-) diff --git a/src/socketio/async_aiopika_manager.py b/src/socketio/async_aiopika_manager.py index 626158b..b6d5e33 100644 --- a/src/socketio/async_aiopika_manager.py +++ b/src/socketio/async_aiopika_manager.py @@ -97,18 +97,20 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover ), routing_key='*', ) break - except aio_pika.AMQPException: + except aio_pika.exceptions.ChannelInvalidStateError: + # aio_pika raises this exception when the task is cancelled + raise asyncio.CancelledError() + except Exception as exc: if retry: - self._get_logger().error('Cannot publish to rabbitmq... ' - 'retrying') + self._get_logger().error( + 'Cannot publish to rabbitmq... retrying', + extra={"rabbitmq_exception": str(exc)}) retry = False else: self._get_logger().error( - 'Cannot publish to rabbitmq... giving up') + 'Cannot publish to rabbitmq... giving up', + extra={"rabbitmq_exception": str(exc)}) break - except aio_pika.exceptions.ChannelInvalidStateError: - # aio_pika raises this exception when the task is cancelled - raise asyncio.CancelledError() async def _listen(self): retry_sleep = 1 @@ -125,12 +127,13 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover async with message.process(): yield message.body retry_sleep = 1 - except aio_pika.AMQPException: - self._get_logger().error( - 'Cannot receive from rabbitmq... ' - 'retrying in {} secs'.format(retry_sleep)) - await asyncio.sleep(retry_sleep) - retry_sleep = min(retry_sleep * 2, 60) except aio_pika.exceptions.ChannelInvalidStateError: # aio_pika raises this exception when the task is cancelled raise asyncio.CancelledError() + except Exception as exc: + self._get_logger().error( + 'Cannot receive from rabbotmq... retrying in ' + f'{retry_sleep} secs', + extra={"rabbitmq_exception": str(exc)}) + await asyncio.sleep(retry_sleep) + retry_sleep = min(retry_sleep * 2, 60) diff --git a/src/socketio/async_redis_manager.py b/src/socketio/async_redis_manager.py index 862e610..52b30f4 100644 --- a/src/socketio/async_redis_manager.py +++ b/src/socketio/async_redis_manager.py @@ -75,7 +75,7 @@ class AsyncRedisManager(AsyncPubSubManager): self.redis = None self.pubsub = None - def _get_redis_module_and_error(self): + def _get_redis_module(self): parsed_url = urlparse(self.redis_url) scheme = parsed_url.scheme.split('+', 1)[0].lower() if scheme in ['redis', 'rediss']: @@ -83,13 +83,13 @@ class AsyncRedisManager(AsyncPubSubManager): raise RuntimeError('Redis package is not installed ' '(Run "pip install redis" ' 'in your virtualenv).') - return aioredis, RedisError + return aioredis if scheme in ['valkey', 'valkeys']: if aiovalkey is None or ValkeyError is None: raise RuntimeError('Valkey package is not installed ' '(Run "pip install valkey" ' 'in your virtualenv).') - return aiovalkey, ValkeyError + return aiovalkey if scheme == 'unix': if aioredis is None or RedisError is None: if aiovalkey is None or ValkeyError is None: @@ -98,14 +98,14 @@ class AsyncRedisManager(AsyncPubSubManager): 'or "pip install valkey" ' 'in your virtualenv).') else: - return aiovalkey, ValkeyError + return aiovalkey else: - return aioredis, RedisError + return aioredis error_msg = f'Unsupported Redis URL scheme: {scheme}' raise ValueError(error_msg) def _redis_connect(self): - module, _ = self._get_redis_module_and_error() + module = self._get_redis_module() parsed_url = urlparse(self.redis_url) if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}: sentinels, service_name, connection_kwargs = \ @@ -121,30 +121,25 @@ class AsyncRedisManager(AsyncPubSubManager): self.connected = True async def _publish(self, data): # pragma: no cover - _, error = self._get_redis_module_and_error() for retries_left in range(1, -1, -1): # 2 attempts try: if not self.connected: self._redis_connect() return await self.redis.publish( self.channel, self.json.dumps(data)) - except error as exc: + except Exception as exc: if retries_left > 0: self._get_logger().error( - 'Cannot publish to redis... ' - 'retrying', + 'Cannot publish to redis... retrying', extra={"redis_exception": str(exc)}) self.connected = False else: self._get_logger().error( - 'Cannot publish to redis... ' - 'giving up', + 'Cannot publish to redis... giving up', extra={"redis_exception": str(exc)}) - break async def _redis_listen_with_retries(self): # pragma: no cover - _, error = self._get_redis_module_and_error() retry_sleep = 1 subscribed = False while True: @@ -155,11 +150,11 @@ class AsyncRedisManager(AsyncPubSubManager): retry_sleep = 1 async for message in self.pubsub.listen(): yield message - except error as exc: - self._get_logger().error('Cannot receive from redis... ' - 'retrying in ' - f'{retry_sleep} secs', - extra={"redis_exception": str(exc)}) + except Exception as exc: + self._get_logger().error( + 'Cannot receive from redis... retrying in ' + f'{retry_sleep} secs', + extra={"redis_exception": str(exc)}) subscribed = False await asyncio.sleep(retry_sleep) retry_sleep *= 2 diff --git a/src/socketio/kombu_manager.py b/src/socketio/kombu_manager.py index d17505e..797dac4 100644 --- a/src/socketio/kombu_manager.py +++ b/src/socketio/kombu_manager.py @@ -114,14 +114,16 @@ class KombuManager(PubSubManager): # pragma: no cover self.publisher_connection) producer_publish(self.json.dumps(data)) break - except (OSError, kombu.exceptions.KombuError): + except Exception as exc: if retry: - self._get_logger().error('Cannot publish to rabbitmq... ' - 'retrying') + self._get_logger().error( + 'Cannot publish to rabbitmq... retrying', + extra={"rabbitmq_exception": str(exc)}) retry = False else: self._get_logger().error( - 'Cannot publish to rabbitmq... giving up') + 'Cannot publish to rabbitmq... giving up', + extra={"rabbitmq_exception": str(exc)}) break def _listen(self): @@ -136,9 +138,10 @@ class KombuManager(PubSubManager): # pragma: no cover message.ack() yield message.payload retry_sleep = 1 - except (OSError, kombu.exceptions.KombuError): + except Exception as exc: self._get_logger().error( - 'Cannot receive from rabbitmq... ' - 'retrying in {} secs'.format(retry_sleep)) + 'Cannot receive from rabbotmq... retrying in ' + f'{retry_sleep} secs', + extra={"rabbitmq_exception": str(exc)}) time.sleep(retry_sleep) retry_sleep = min(retry_sleep * 2, 60) diff --git a/src/socketio/redis_manager.py b/src/socketio/redis_manager.py index fb2e040..a2edfac 100644 --- a/src/socketio/redis_manager.py +++ b/src/socketio/redis_manager.py @@ -1,4 +1,3 @@ -import logging import time from urllib.parse import urlparse @@ -18,8 +17,6 @@ except ImportError: # pragma: no cover from .pubsub_manager import PubSubManager -logger = logging.getLogger('socketio') - def parse_redis_sentinel_url(url): """Parse a Redis Sentinel URL with the format: @@ -112,7 +109,7 @@ class RedisManager(PubSubManager): 'Redis requires a monkey patched socket library to work ' 'with ' + self.server.async_mode) - def _get_redis_module_and_error(self): + def _get_redis_module(self): parsed_url = urlparse(self.redis_url) scheme = parsed_url.scheme.split('+', 1)[0].lower() if scheme in ['redis', 'rediss']: @@ -120,13 +117,13 @@ class RedisManager(PubSubManager): raise RuntimeError('Redis package is not installed ' '(Run "pip install redis" ' 'in your virtualenv).') - return redis, RedisError + return redis if scheme in ['valkey', 'valkeys']: if valkey is None or ValkeyError is None: raise RuntimeError('Valkey package is not installed ' '(Run "pip install valkey" ' 'in your virtualenv).') - return valkey, ValkeyError + return valkey if scheme == 'unix': if redis is None or RedisError is None: if valkey is None or ValkeyError is None: @@ -135,14 +132,14 @@ class RedisManager(PubSubManager): 'or "pip install valkey" ' 'in your virtualenv).') else: - return valkey, ValkeyError + return valkey else: - return redis, RedisError + return redis error_msg = f'Unsupported Redis URL scheme: {scheme}' raise ValueError(error_msg) def _redis_connect(self): - module, _ = self._get_redis_module_and_error() + module = self._get_redis_module() parsed_url = urlparse(self.redis_url) if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}: sentinels, service_name, connection_kwargs = \ @@ -158,28 +155,26 @@ class RedisManager(PubSubManager): self.connected = True def _publish(self, data): # pragma: no cover - _, error = self._get_redis_module_and_error() for retries_left in range(1, -1, -1): # 2 attempts try: if not self.connected: self._redis_connect() return self.redis.publish(self.channel, self.json.dumps(data)) - except error as exc: + except Exception as exc: if retries_left > 0: - logger.error( + self._get_logger().error( 'Cannot publish to redis... retrying', extra={"redis_exception": str(exc)} ) self.connected = False else: - logger.error( + self._get_logger().error( 'Cannot publish to redis... giving up', extra={"redis_exception": str(exc)} ) break def _redis_listen_with_retries(self): # pragma: no cover - _, error = self._get_redis_module_and_error() retry_sleep = 1 subscribed = False while True: @@ -189,10 +184,11 @@ class RedisManager(PubSubManager): self.pubsub.subscribe(self.channel) retry_sleep = 1 yield from self.pubsub.listen() - except error as exc: - logger.error('Cannot receive from redis... ' - f'retrying in {retry_sleep} secs', - extra={"redis_exception": str(exc)}) + except Exception as exc: + self._get_logger().error( + 'Cannot receive from redis... ' + f'retrying in {retry_sleep} secs', + extra={"redis_exception": str(exc)}) subscribed = False time.sleep(retry_sleep) retry_sleep *= 2