diff --git a/src/socketio/async_redis_manager.py b/src/socketio/async_redis_manager.py index b8ac4a0..b55e1fc 100644 --- a/src/socketio/async_redis_manager.py +++ b/src/socketio/async_redis_manager.py @@ -95,16 +95,23 @@ class AsyncRedisManager(AsyncPubSubManager): def _redis_connect(self): module, _ = self._get_redis_module_and_error() parsed_url = urlparse(self.redis_url) + pubsub_defaults = { + "decode_responses": False, # keep channels as bytes + "socket_timeout": None, # block indefinitely on pub/sub + "health_check_interval": 0, # no periodic PINGs on read socket + "retry_on_timeout": False, # Pub/Sub loop handles reconnects + } + kwargs = {**pubsub_defaults, **(self.redis_options or {})} + if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}: sentinels, service_name, connection_kwargs = \ parse_redis_sentinel_url(self.redis_url) - kwargs = self.redis_options - kwargs.update(connection_kwargs) - sentinel = module.sentinel.Sentinel(sentinels, **kwargs) + connection_kwargs.update(kwargs) + sentinel = module.sentinel.Sentinel(sentinels, **connection_kwargs) self.redis = sentinel.master_for(service_name or self.channel) else: self.redis = module.Redis.from_url(self.redis_url, - **self.redis_options) + **kwargs) self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) async def _publish(self, data): # pragma: no cover diff --git a/tests/async/test_redis_manager.py b/tests/async/test_redis_manager.py index 01c0c37..52afb51 100644 --- a/tests/async/test_redis_manager.py +++ b/tests/async/test_redis_manager.py @@ -105,3 +105,40 @@ class TestAsyncRedisManager: assert isinstance(c.redis, valkey.asyncio.Valkey) async_redis_manager.aioredis = saved_redis + + def test_pubsub_defaults_pinned_for_redis(self): + c = AsyncRedisManager('redis://localhost:6379/0') + kw = c.redis.connection_pool.connection_kwargs + assert kw.get('decode_responses') is False + assert kw.get('socket_timeout') is None + assert kw.get('health_check_interval') == 0 + assert kw.get('retry_on_timeout') is False + + def test_pubsub_defaults_pinned_for_valkey(self): + saved_redis = async_redis_manager.aioredis + async_redis_manager.aioredis = None + try: + c = AsyncRedisManager('valkey://localhost:6379/0') + kw = c.redis.connection_pool.connection_kwargs + assert kw.get('decode_responses') is False + assert kw.get('socket_timeout') is None + assert kw.get('health_check_interval') == 0 + assert kw.get('retry_on_timeout') is False + finally: + async_redis_manager.aioredis = saved_redis + + def test_pubsub_defaults_can_be_overridden(self): + c = AsyncRedisManager( + 'redis://localhost:6379/0', + redis_options={ + 'decode_responses': True, + 'socket_timeout': 7, + 'health_check_interval': 13, + 'retry_on_timeout': True, + }, + ) + kw = c.redis.connection_pool.connection_kwargs + assert kw.get('decode_responses') is True + assert kw.get('socket_timeout') == 7 + assert kw.get('health_check_interval') == 13 + assert kw.get('retry_on_timeout') is True