diff --git a/src/socketio/async_redis_manager.py b/src/socketio/async_redis_manager.py index b099d9e..b8ac4a0 100644 --- a/src/socketio/async_redis_manager.py +++ b/src/socketio/async_redis_manager.py @@ -1,7 +1,7 @@ import asyncio from urllib.parse import urlparse -try: # pragma: no cover +try: from redis import asyncio as aioredis from redis.exceptions import RedisError except ImportError: # pragma: no cover @@ -12,11 +12,11 @@ except ImportError: # pragma: no cover aioredis = None RedisError = None -try: # pragma: no cover - from valkey import asyncio as valkey +try: + from valkey import asyncio as aiovalkey from valkey.exceptions import ValkeyError except ImportError: # pragma: no cover - valkey = None + aiovalkey = None ValkeyError = None from engineio import json @@ -24,7 +24,7 @@ from .async_pubsub_manager import AsyncPubSubManager from .redis_manager import parse_redis_sentinel_url -class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover +class AsyncRedisManager(AsyncPubSubManager): """Redis based client manager for asyncio servers. This class implements a Redis backend for event sharing across multiple @@ -55,12 +55,8 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover def __init__(self, url='redis://localhost:6379/0', channel='socketio', write_only=False, logger=None, redis_options=None): - if aioredis is None and valkey is None: - raise RuntimeError('Redis package is not installed ' - '(Run "pip install redis" or ' - '"pip install valkey" ' - 'in your virtualenv).') - if aioredis and not hasattr(aioredis.Redis, 'from_url'): + if aioredis and \ + not hasattr(aioredis.Redis, 'from_url'): # pragma: no cover raise RuntimeError('Version 2 of aioredis package is required.') super().__init__(channel=channel, write_only=write_only, logger=logger) self.redis_url = url @@ -69,20 +65,31 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover def _get_redis_module_and_error(self): parsed_url = urlparse(self.redis_url) - schema = parsed_url.scheme.split('+', 1)[0].lower() - if schema in ['redis', 'unix']: + scheme = parsed_url.scheme.split('+', 1)[0].lower() + if scheme in ['redis', 'rediss']: if aioredis is None or RedisError is None: raise RuntimeError('Redis package is not installed ' '(Run "pip install redis" ' 'in your virtualenv).') return aioredis, RedisError - if schema == 'valkey': - if valkey is None or ValkeyError is None: + if scheme in ['valkey', 'valkeys']: + if aiovalkey is None or ValkeyError is None: raise RuntimeError('Valkey package is not installed ' '(Run "pip install valkey" ' 'in your virtualenv).') - return valkey, ValkeyError - error_msg = f'Unsupported Redis URL schema: {schema}' + return aiovalkey, ValkeyError + if scheme == 'unix': + if aioredis is None or RedisError is None: + if aiovalkey is None or ValkeyError is None: + raise RuntimeError('Redis package is not installed ' + '(Run "pip install redis" ' + 'or "pip install valkey" ' + 'in your virtualenv).') + else: + return aiovalkey, ValkeyError + else: + return aioredis, RedisError + error_msg = f'Unsupported Redis URL scheme: {scheme}' raise ValueError(error_msg) def _redis_connect(self): @@ -100,7 +107,7 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover **self.redis_options) self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) - async def _publish(self, data): + async def _publish(self, data): # pragma: no cover retry = True _, error = self._get_redis_module_and_error() while True: @@ -124,7 +131,7 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover break - async def _redis_listen_with_retries(self): + async def _redis_listen_with_retries(self): # pragma: no cover retry_sleep = 1 connect = False _, error = self._get_redis_module_and_error() @@ -147,7 +154,7 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover if retry_sleep > 60: retry_sleep = 60 - async def _listen(self): + async def _listen(self): # pragma: no cover channel = self.channel.encode('utf-8') await self.pubsub.subscribe(self.channel) async for message in self._redis_listen_with_retries(): diff --git a/src/socketio/redis_manager.py b/src/socketio/redis_manager.py index 13d2022..5e58ef4 100644 --- a/src/socketio/redis_manager.py +++ b/src/socketio/redis_manager.py @@ -2,17 +2,17 @@ import logging import time from urllib.parse import urlparse -try: # pragma: no cover +try: import redis from redis.exceptions import RedisError -except ImportError: +except ImportError: # pragma: no cover redis = None RedisError = None -try: # pragma: no cover +try: import valkey from valkey.exceptions import ValkeyError -except ImportError: +except ImportError: # pragma: no cover valkey = None ValkeyError = None @@ -48,7 +48,7 @@ def parse_redis_sentinel_url(url): return sentinels, service_name, kwargs -class RedisManager(PubSubManager): # pragma: no cover +class RedisManager(PubSubManager): """Redis based client manager. This class implements a Redis backend for event sharing across multiple @@ -80,17 +80,12 @@ class RedisManager(PubSubManager): # pragma: no cover def __init__(self, url='redis://localhost:6379/0', channel='socketio', write_only=False, logger=None, redis_options=None): - if redis is None and valkey is None: - raise RuntimeError('Redis package is not installed ' - '(Run "pip install redis" ' - 'or "pip install valkey" ' - 'in your virtualenv).') super().__init__(channel=channel, write_only=write_only, logger=logger) self.redis_url = url self.redis_options = redis_options or {} self._redis_connect() - def initialize(self): + def initialize(self): # pragma: no cover super().initialize() monkey_patched = True @@ -107,20 +102,31 @@ class RedisManager(PubSubManager): # pragma: no cover def _get_redis_module_and_error(self): parsed_url = urlparse(self.redis_url) - schema = parsed_url.scheme.split('+', 1)[0].lower() - if schema in ['redis', 'unix']: + scheme = parsed_url.scheme.split('+', 1)[0].lower() + if scheme in ['redis', 'rediss']: if redis is None or RedisError is None: raise RuntimeError('Redis package is not installed ' '(Run "pip install redis" ' 'in your virtualenv).') return redis, RedisError - if schema == 'valkey': + if scheme in ['valkey', 'valkeys']: if valkey is None or ValkeyError is None: raise RuntimeError('Valkey package is not installed ' '(Run "pip install valkey" ' 'in your virtualenv).') return valkey, ValkeyError - error_msg = f'Unsupported Redis URL schema: {schema}' + if scheme == 'unix': + if redis is None or RedisError is None: + if valkey is None or ValkeyError is None: + raise RuntimeError('Redis package is not installed ' + '(Run "pip install redis" ' + 'or "pip install valkey" ' + 'in your virtualenv).') + else: + return valkey, ValkeyError + else: + return redis, RedisError + error_msg = f'Unsupported Redis URL scheme: {scheme}' raise ValueError(error_msg) def _redis_connect(self): @@ -138,7 +144,7 @@ class RedisManager(PubSubManager): # pragma: no cover **self.redis_options) self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) - def _publish(self, data): + def _publish(self, data): # pragma: no cover retry = True _, error = self._get_redis_module_and_error() while True: @@ -160,7 +166,7 @@ class RedisManager(PubSubManager): # pragma: no cover ) break - def _redis_listen_with_retries(self): + def _redis_listen_with_retries(self): # pragma: no cover retry_sleep = 1 connect = False _, error = self._get_redis_module_and_error() @@ -181,7 +187,7 @@ class RedisManager(PubSubManager): # pragma: no cover if retry_sleep > 60: retry_sleep = 60 - def _listen(self): + def _listen(self): # pragma: no cover channel = self.channel.encode('utf-8') self.pubsub.subscribe(self.channel) for message in self._redis_listen_with_retries(): diff --git a/tests/async/test_redis_manager.py b/tests/async/test_redis_manager.py new file mode 100644 index 0000000..01c0c37 --- /dev/null +++ b/tests/async/test_redis_manager.py @@ -0,0 +1,107 @@ +import pytest +import redis +import valkey + +from socketio import async_redis_manager +from socketio.async_redis_manager import AsyncRedisManager + + +class TestAsyncRedisManager: + def test_redis_not_installed(self): + saved_redis = async_redis_manager.aioredis + async_redis_manager.aioredis = None + + with pytest.raises(RuntimeError): + AsyncRedisManager('redis://') + assert AsyncRedisManager('unix:///var/sock/redis.sock') is not None + + async_redis_manager.aioredis = saved_redis + + def test_valkey_not_installed(self): + saved_valkey = async_redis_manager.aiovalkey + async_redis_manager.aiovalkey = None + + with pytest.raises(RuntimeError): + AsyncRedisManager('valkey://') + assert AsyncRedisManager('unix:///var/sock/redis.sock') is not None + + async_redis_manager.aiovalkey = saved_valkey + + def test_redis_valkey_not_installed(self): + saved_redis = async_redis_manager.aioredis + async_redis_manager.aioredis = None + saved_valkey = async_redis_manager.aiovalkey + async_redis_manager.aiovalkey = None + + with pytest.raises(RuntimeError): + AsyncRedisManager('redis://') + with pytest.raises(RuntimeError): + AsyncRedisManager('valkey://') + with pytest.raises(RuntimeError): + AsyncRedisManager('unix:///var/sock/redis.sock') + + async_redis_manager.aioredis = saved_redis + async_redis_manager.aiovalkey = saved_valkey + + def test_bad_url(self): + with pytest.raises(ValueError): + AsyncRedisManager('http://localhost:6379') + + def test_redis_connect(self): + urls = [ + 'redis://localhost:6379', + 'redis://localhost:6379/0', + 'redis://:password@localhost:6379', + 'redis://:password@localhost:6379/0', + 'redis://user:password@localhost:6379', + 'redis://user:password@localhost:6379/0', + + 'rediss://localhost:6379', + 'rediss://localhost:6379/0', + 'rediss://:password@localhost:6379', + 'rediss://:password@localhost:6379/0', + 'rediss://user:password@localhost:6379', + 'rediss://user:password@localhost:6379/0', + + 'unix:///var/sock/redis.sock', + 'unix:///var/sock/redis.sock?db=0', + 'unix://user@/var/sock/redis.sock', + 'unix://user@/var/sock/redis.sock?db=0', + + 'redis+sentinel://192.168.0.1:6379,192.168.0.2:6379/' + ] + for url in urls: + c = AsyncRedisManager(url) + assert isinstance(c.redis, redis.asyncio.Redis) + + def test_valkey_connect(self): + saved_redis = async_redis_manager.aioredis + async_redis_manager.aioredis = None + + urls = [ + 'valkey://localhost:6379', + 'valkey://localhost:6379/0', + 'valkey://:password@localhost:6379', + 'valkey://:password@localhost:6379/0', + 'valkey://user:password@localhost:6379', + 'valkey://user:password@localhost:6379/0', + + 'valkeys://localhost:6379', + 'valkeys://localhost:6379/0', + 'valkeys://:password@localhost:6379', + 'valkeys://:password@localhost:6379/0', + 'valkeys://user:password@localhost:6379', + 'valkeys://user:password@localhost:6379/0', + + 'unix:///var/sock/redis.sock', + 'unix:///var/sock/redis.sock?db=0', + 'unix://user@/var/sock/redis.sock', + 'unix://user@/var/sock/redis.sock?db=0', + + 'valkey+sentinel://192.168.0.1:6379,192.168.0.2:6379/' + ] + for url in urls: + c = AsyncRedisManager(url) + assert isinstance(c.redis, valkey.asyncio.Valkey) + + async_redis_manager.aioredis = saved_redis diff --git a/tests/common/test_redis_manager.py b/tests/common/test_redis_manager.py index 48dfb4a..3beadf3 100644 --- a/tests/common/test_redis_manager.py +++ b/tests/common/test_redis_manager.py @@ -1,9 +1,111 @@ import pytest +import redis +import valkey -from socketio.redis_manager import parse_redis_sentinel_url +from socketio import redis_manager +from socketio.redis_manager import RedisManager, parse_redis_sentinel_url class TestPubSubManager: + def test_redis_not_installed(self): + saved_redis = redis_manager.redis + redis_manager.redis = None + + with pytest.raises(RuntimeError): + RedisManager('redis://') + assert RedisManager('unix:///var/sock/redis.sock') is not None + + redis_manager.redis = saved_redis + + def test_valkey_not_installed(self): + saved_valkey = redis_manager.valkey + redis_manager.valkey = None + + with pytest.raises(RuntimeError): + RedisManager('valkey://') + assert RedisManager('unix:///var/sock/redis.sock') is not None + + redis_manager.valkey = saved_valkey + + def test_redis_valkey_not_installed(self): + saved_redis = redis_manager.redis + redis_manager.redis = None + saved_valkey = redis_manager.valkey + redis_manager.valkey = None + + with pytest.raises(RuntimeError): + RedisManager('redis://') + with pytest.raises(RuntimeError): + RedisManager('valkey://') + with pytest.raises(RuntimeError): + RedisManager('unix:///var/sock/redis.sock') + + redis_manager.redis = saved_redis + redis_manager.valkey = saved_valkey + + def test_bad_url(self): + with pytest.raises(ValueError): + RedisManager('http://localhost:6379') + + def test_redis_connect(self): + urls = [ + 'redis://localhost:6379', + 'redis://localhost:6379/0', + 'redis://:password@localhost:6379', + 'redis://:password@localhost:6379/0', + 'redis://user:password@localhost:6379', + 'redis://user:password@localhost:6379/0', + + 'rediss://localhost:6379', + 'rediss://localhost:6379/0', + 'rediss://:password@localhost:6379', + 'rediss://:password@localhost:6379/0', + 'rediss://user:password@localhost:6379', + 'rediss://user:password@localhost:6379/0', + + 'unix:///var/sock/redis.sock', + 'unix:///var/sock/redis.sock?db=0', + 'unix://user@/var/sock/redis.sock', + 'unix://user@/var/sock/redis.sock?db=0', + + 'redis+sentinel://192.168.0.1:6379,192.168.0.2:6379/' + ] + for url in urls: + c = RedisManager(url) + assert isinstance(c.redis, redis.Redis) + + def test_valkey_connect(self): + saved_redis = redis_manager.redis + redis_manager.redis = None + + urls = [ + 'valkey://localhost:6379', + 'valkey://localhost:6379/0', + 'valkey://:password@localhost:6379', + 'valkey://:password@localhost:6379/0', + 'valkey://user:password@localhost:6379', + 'valkey://user:password@localhost:6379/0', + + 'valkeys://localhost:6379', + 'valkeys://localhost:6379/0', + 'valkeys://:password@localhost:6379', + 'valkeys://:password@localhost:6379/0', + 'valkeys://user:password@localhost:6379', + 'valkeys://user:password@localhost:6379/0', + + 'unix:///var/sock/redis.sock', + 'unix:///var/sock/redis.sock?db=0', + 'unix://user@/var/sock/redis.sock', + 'unix://user@/var/sock/redis.sock?db=0', + + 'valkey+sentinel://192.168.0.1:6379,192.168.0.2:6379/' + ] + for url in urls: + c = RedisManager(url) + assert isinstance(c.redis, valkey.Valkey) + + redis_manager.redis = saved_redis + @pytest.mark.parametrize('rtype', ['redis', 'valkey']) def test_sentinel_url_parser(self, rtype): with pytest.raises(ValueError): diff --git a/tox.ini b/tox.ini index fc0116c..bb83059 100644 --- a/tox.ini +++ b/tox.ini @@ -23,6 +23,8 @@ deps= websocket-client aiohttp msgpack + redis + valkey pytest pytest-asyncio pytest-timeout