Browse Source

Catch all exceptionsin redis and rabbitmq managers (Fixes #1581)

pull/1583/head
Miguel Grinberg 2 days ago
parent
commit
80bb5c9b07
Failed to extract signature
  1. 29
      src/socketio/async_aiopika_manager.py
  2. 33
      src/socketio/async_redis_manager.py
  3. 17
      src/socketio/kombu_manager.py
  4. 32
      src/socketio/redis_manager.py

29
src/socketio/async_aiopika_manager.py

@ -97,18 +97,20 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover
), routing_key='*', ), routing_key='*',
) )
break break
except aio_pika.AMQPException: except aio_pika.exceptions.ChannelInvalidStateError:
# aio_pika raises this exception when the task is cancelled
raise asyncio.CancelledError()
except Exception as exc:
if retry: if retry:
self._get_logger().error('Cannot publish to rabbitmq... ' self._get_logger().error(
'retrying') 'Cannot publish to rabbitmq... retrying',
extra={"rabbitmq_exception": str(exc)})
retry = False retry = False
else: else:
self._get_logger().error( self._get_logger().error(
'Cannot publish to rabbitmq... giving up') 'Cannot publish to rabbitmq... giving up',
extra={"rabbitmq_exception": str(exc)})
break break
except aio_pika.exceptions.ChannelInvalidStateError:
# aio_pika raises this exception when the task is cancelled
raise asyncio.CancelledError()
async def _listen(self): async def _listen(self):
retry_sleep = 1 retry_sleep = 1
@ -125,12 +127,13 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover
async with message.process(): async with message.process():
yield message.body yield message.body
retry_sleep = 1 retry_sleep = 1
except aio_pika.AMQPException:
self._get_logger().error(
'Cannot receive from rabbitmq... '
'retrying in {} secs'.format(retry_sleep))
await asyncio.sleep(retry_sleep)
retry_sleep = min(retry_sleep * 2, 60)
except aio_pika.exceptions.ChannelInvalidStateError: except aio_pika.exceptions.ChannelInvalidStateError:
# aio_pika raises this exception when the task is cancelled # aio_pika raises this exception when the task is cancelled
raise asyncio.CancelledError() raise asyncio.CancelledError()
except Exception as exc:
self._get_logger().error(
'Cannot receive from rabbotmq... retrying in '
f'{retry_sleep} secs',
extra={"rabbitmq_exception": str(exc)})
await asyncio.sleep(retry_sleep)
retry_sleep = min(retry_sleep * 2, 60)

33
src/socketio/async_redis_manager.py

@ -75,7 +75,7 @@ class AsyncRedisManager(AsyncPubSubManager):
self.redis = None self.redis = None
self.pubsub = None self.pubsub = None
def _get_redis_module_and_error(self): def _get_redis_module(self):
parsed_url = urlparse(self.redis_url) parsed_url = urlparse(self.redis_url)
scheme = parsed_url.scheme.split('+', 1)[0].lower() scheme = parsed_url.scheme.split('+', 1)[0].lower()
if scheme in ['redis', 'rediss']: if scheme in ['redis', 'rediss']:
@ -83,13 +83,13 @@ class AsyncRedisManager(AsyncPubSubManager):
raise RuntimeError('Redis package is not installed ' raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" ' '(Run "pip install redis" '
'in your virtualenv).') 'in your virtualenv).')
return aioredis, RedisError return aioredis
if scheme in ['valkey', 'valkeys']: if scheme in ['valkey', 'valkeys']:
if aiovalkey is None or ValkeyError is None: if aiovalkey is None or ValkeyError is None:
raise RuntimeError('Valkey package is not installed ' raise RuntimeError('Valkey package is not installed '
'(Run "pip install valkey" ' '(Run "pip install valkey" '
'in your virtualenv).') 'in your virtualenv).')
return aiovalkey, ValkeyError return aiovalkey
if scheme == 'unix': if scheme == 'unix':
if aioredis is None or RedisError is None: if aioredis is None or RedisError is None:
if aiovalkey is None or ValkeyError is None: if aiovalkey is None or ValkeyError is None:
@ -98,14 +98,14 @@ class AsyncRedisManager(AsyncPubSubManager):
'or "pip install valkey" ' 'or "pip install valkey" '
'in your virtualenv).') 'in your virtualenv).')
else: else:
return aiovalkey, ValkeyError return aiovalkey
else: else:
return aioredis, RedisError return aioredis
error_msg = f'Unsupported Redis URL scheme: {scheme}' error_msg = f'Unsupported Redis URL scheme: {scheme}'
raise ValueError(error_msg) raise ValueError(error_msg)
def _redis_connect(self): def _redis_connect(self):
module, _ = self._get_redis_module_and_error() module = self._get_redis_module()
parsed_url = urlparse(self.redis_url) parsed_url = urlparse(self.redis_url)
if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}: if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}:
sentinels, service_name, connection_kwargs = \ sentinels, service_name, connection_kwargs = \
@ -121,30 +121,25 @@ class AsyncRedisManager(AsyncPubSubManager):
self.connected = True self.connected = True
async def _publish(self, data): # pragma: no cover async def _publish(self, data): # pragma: no cover
_, error = self._get_redis_module_and_error()
for retries_left in range(1, -1, -1): # 2 attempts for retries_left in range(1, -1, -1): # 2 attempts
try: try:
if not self.connected: if not self.connected:
self._redis_connect() self._redis_connect()
return await self.redis.publish( return await self.redis.publish(
self.channel, self.json.dumps(data)) self.channel, self.json.dumps(data))
except error as exc: except Exception as exc:
if retries_left > 0: if retries_left > 0:
self._get_logger().error( self._get_logger().error(
'Cannot publish to redis... ' 'Cannot publish to redis... retrying',
'retrying',
extra={"redis_exception": str(exc)}) extra={"redis_exception": str(exc)})
self.connected = False self.connected = False
else: else:
self._get_logger().error( self._get_logger().error(
'Cannot publish to redis... ' 'Cannot publish to redis... giving up',
'giving up',
extra={"redis_exception": str(exc)}) extra={"redis_exception": str(exc)})
break break
async def _redis_listen_with_retries(self): # pragma: no cover async def _redis_listen_with_retries(self): # pragma: no cover
_, error = self._get_redis_module_and_error()
retry_sleep = 1 retry_sleep = 1
subscribed = False subscribed = False
while True: while True:
@ -155,11 +150,11 @@ class AsyncRedisManager(AsyncPubSubManager):
retry_sleep = 1 retry_sleep = 1
async for message in self.pubsub.listen(): async for message in self.pubsub.listen():
yield message yield message
except error as exc: except Exception as exc:
self._get_logger().error('Cannot receive from redis... ' self._get_logger().error(
'retrying in ' 'Cannot receive from redis... retrying in '
f'{retry_sleep} secs', f'{retry_sleep} secs',
extra={"redis_exception": str(exc)}) extra={"redis_exception": str(exc)})
subscribed = False subscribed = False
await asyncio.sleep(retry_sleep) await asyncio.sleep(retry_sleep)
retry_sleep *= 2 retry_sleep *= 2

17
src/socketio/kombu_manager.py

@ -114,14 +114,16 @@ class KombuManager(PubSubManager): # pragma: no cover
self.publisher_connection) self.publisher_connection)
producer_publish(self.json.dumps(data)) producer_publish(self.json.dumps(data))
break break
except (OSError, kombu.exceptions.KombuError): except Exception as exc:
if retry: if retry:
self._get_logger().error('Cannot publish to rabbitmq... ' self._get_logger().error(
'retrying') 'Cannot publish to rabbitmq... retrying',
extra={"rabbitmq_exception": str(exc)})
retry = False retry = False
else: else:
self._get_logger().error( self._get_logger().error(
'Cannot publish to rabbitmq... giving up') 'Cannot publish to rabbitmq... giving up',
extra={"rabbitmq_exception": str(exc)})
break break
def _listen(self): def _listen(self):
@ -136,9 +138,10 @@ class KombuManager(PubSubManager): # pragma: no cover
message.ack() message.ack()
yield message.payload yield message.payload
retry_sleep = 1 retry_sleep = 1
except (OSError, kombu.exceptions.KombuError): except Exception as exc:
self._get_logger().error( self._get_logger().error(
'Cannot receive from rabbitmq... ' 'Cannot receive from rabbotmq... retrying in '
'retrying in {} secs'.format(retry_sleep)) f'{retry_sleep} secs',
extra={"rabbitmq_exception": str(exc)})
time.sleep(retry_sleep) time.sleep(retry_sleep)
retry_sleep = min(retry_sleep * 2, 60) retry_sleep = min(retry_sleep * 2, 60)

32
src/socketio/redis_manager.py

@ -1,4 +1,3 @@
import logging
import time import time
from urllib.parse import urlparse from urllib.parse import urlparse
@ -18,8 +17,6 @@ except ImportError: # pragma: no cover
from .pubsub_manager import PubSubManager from .pubsub_manager import PubSubManager
logger = logging.getLogger('socketio')
def parse_redis_sentinel_url(url): def parse_redis_sentinel_url(url):
"""Parse a Redis Sentinel URL with the format: """Parse a Redis Sentinel URL with the format:
@ -112,7 +109,7 @@ class RedisManager(PubSubManager):
'Redis requires a monkey patched socket library to work ' 'Redis requires a monkey patched socket library to work '
'with ' + self.server.async_mode) 'with ' + self.server.async_mode)
def _get_redis_module_and_error(self): def _get_redis_module(self):
parsed_url = urlparse(self.redis_url) parsed_url = urlparse(self.redis_url)
scheme = parsed_url.scheme.split('+', 1)[0].lower() scheme = parsed_url.scheme.split('+', 1)[0].lower()
if scheme in ['redis', 'rediss']: if scheme in ['redis', 'rediss']:
@ -120,13 +117,13 @@ class RedisManager(PubSubManager):
raise RuntimeError('Redis package is not installed ' raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" ' '(Run "pip install redis" '
'in your virtualenv).') 'in your virtualenv).')
return redis, RedisError return redis
if scheme in ['valkey', 'valkeys']: if scheme in ['valkey', 'valkeys']:
if valkey is None or ValkeyError is None: if valkey is None or ValkeyError is None:
raise RuntimeError('Valkey package is not installed ' raise RuntimeError('Valkey package is not installed '
'(Run "pip install valkey" ' '(Run "pip install valkey" '
'in your virtualenv).') 'in your virtualenv).')
return valkey, ValkeyError return valkey
if scheme == 'unix': if scheme == 'unix':
if redis is None or RedisError is None: if redis is None or RedisError is None:
if valkey is None or ValkeyError is None: if valkey is None or ValkeyError is None:
@ -135,14 +132,14 @@ class RedisManager(PubSubManager):
'or "pip install valkey" ' 'or "pip install valkey" '
'in your virtualenv).') 'in your virtualenv).')
else: else:
return valkey, ValkeyError return valkey
else: else:
return redis, RedisError return redis
error_msg = f'Unsupported Redis URL scheme: {scheme}' error_msg = f'Unsupported Redis URL scheme: {scheme}'
raise ValueError(error_msg) raise ValueError(error_msg)
def _redis_connect(self): def _redis_connect(self):
module, _ = self._get_redis_module_and_error() module = self._get_redis_module()
parsed_url = urlparse(self.redis_url) parsed_url = urlparse(self.redis_url)
if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}: if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}:
sentinels, service_name, connection_kwargs = \ sentinels, service_name, connection_kwargs = \
@ -158,28 +155,26 @@ class RedisManager(PubSubManager):
self.connected = True self.connected = True
def _publish(self, data): # pragma: no cover def _publish(self, data): # pragma: no cover
_, error = self._get_redis_module_and_error()
for retries_left in range(1, -1, -1): # 2 attempts for retries_left in range(1, -1, -1): # 2 attempts
try: try:
if not self.connected: if not self.connected:
self._redis_connect() self._redis_connect()
return self.redis.publish(self.channel, self.json.dumps(data)) return self.redis.publish(self.channel, self.json.dumps(data))
except error as exc: except Exception as exc:
if retries_left > 0: if retries_left > 0:
logger.error( self._get_logger().error(
'Cannot publish to redis... retrying', 'Cannot publish to redis... retrying',
extra={"redis_exception": str(exc)} extra={"redis_exception": str(exc)}
) )
self.connected = False self.connected = False
else: else:
logger.error( self._get_logger().error(
'Cannot publish to redis... giving up', 'Cannot publish to redis... giving up',
extra={"redis_exception": str(exc)} extra={"redis_exception": str(exc)}
) )
break break
def _redis_listen_with_retries(self): # pragma: no cover def _redis_listen_with_retries(self): # pragma: no cover
_, error = self._get_redis_module_and_error()
retry_sleep = 1 retry_sleep = 1
subscribed = False subscribed = False
while True: while True:
@ -189,10 +184,11 @@ class RedisManager(PubSubManager):
self.pubsub.subscribe(self.channel) self.pubsub.subscribe(self.channel)
retry_sleep = 1 retry_sleep = 1
yield from self.pubsub.listen() yield from self.pubsub.listen()
except error as exc: except Exception as exc:
logger.error('Cannot receive from redis... ' self._get_logger().error(
f'retrying in {retry_sleep} secs', 'Cannot receive from redis... '
extra={"redis_exception": str(exc)}) f'retrying in {retry_sleep} secs',
extra={"redis_exception": str(exc)})
subscribed = False subscribed = False
time.sleep(retry_sleep) time.sleep(retry_sleep)
retry_sleep *= 2 retry_sleep *= 2

Loading…
Cancel
Save