Browse Source

Restore support for rediss:// URLs, and add valkeys://

pull/1505/merge
Miguel Grinberg 2 weeks ago
parent
commit
6e2d0de12b
Failed to extract signature
  1. 47
      src/socketio/async_redis_manager.py
  2. 42
      src/socketio/redis_manager.py
  3. 107
      tests/async/test_redis_manager.py
  4. 104
      tests/common/test_redis_manager.py
  5. 2
      tox.ini

47
src/socketio/async_redis_manager.py

@ -1,7 +1,7 @@
import asyncio
from urllib.parse import urlparse
try: # pragma: no cover
try:
from redis import asyncio as aioredis
from redis.exceptions import RedisError
except ImportError: # pragma: no cover
@ -12,11 +12,11 @@ except ImportError: # pragma: no cover
aioredis = None
RedisError = None
try: # pragma: no cover
from valkey import asyncio as valkey
try:
from valkey import asyncio as aiovalkey
from valkey.exceptions import ValkeyError
except ImportError: # pragma: no cover
valkey = None
aiovalkey = None
ValkeyError = None
from engineio import json
@ -24,7 +24,7 @@ from .async_pubsub_manager import AsyncPubSubManager
from .redis_manager import parse_redis_sentinel_url
class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
class AsyncRedisManager(AsyncPubSubManager):
"""Redis based client manager for asyncio servers.
This class implements a Redis backend for event sharing across multiple
@ -55,12 +55,8 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False, logger=None, redis_options=None):
if aioredis is None and valkey is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" or '
'"pip install valkey" '
'in your virtualenv).')
if aioredis and not hasattr(aioredis.Redis, 'from_url'):
if aioredis and \
not hasattr(aioredis.Redis, 'from_url'): # pragma: no cover
raise RuntimeError('Version 2 of aioredis package is required.')
super().__init__(channel=channel, write_only=write_only, logger=logger)
self.redis_url = url
@ -69,20 +65,31 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
def _get_redis_module_and_error(self):
parsed_url = urlparse(self.redis_url)
schema = parsed_url.scheme.split('+', 1)[0].lower()
if schema in ['redis', 'unix']:
scheme = parsed_url.scheme.split('+', 1)[0].lower()
if scheme in ['redis', 'rediss']:
if aioredis is None or RedisError is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" '
'in your virtualenv).')
return aioredis, RedisError
if schema == 'valkey':
if valkey is None or ValkeyError is None:
if scheme in ['valkey', 'valkeys']:
if aiovalkey is None or ValkeyError is None:
raise RuntimeError('Valkey package is not installed '
'(Run "pip install valkey" '
'in your virtualenv).')
return valkey, ValkeyError
error_msg = f'Unsupported Redis URL schema: {schema}'
return aiovalkey, ValkeyError
if scheme == 'unix':
if aioredis is None or RedisError is None:
if aiovalkey is None or ValkeyError is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" '
'or "pip install valkey" '
'in your virtualenv).')
else:
return aiovalkey, ValkeyError
else:
return aioredis, RedisError
error_msg = f'Unsupported Redis URL scheme: {scheme}'
raise ValueError(error_msg)
def _redis_connect(self):
@ -100,7 +107,7 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
**self.redis_options)
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
async def _publish(self, data):
async def _publish(self, data): # pragma: no cover
retry = True
_, error = self._get_redis_module_and_error()
while True:
@ -124,7 +131,7 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
break
async def _redis_listen_with_retries(self):
async def _redis_listen_with_retries(self): # pragma: no cover
retry_sleep = 1
connect = False
_, error = self._get_redis_module_and_error()
@ -147,7 +154,7 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
if retry_sleep > 60:
retry_sleep = 60
async def _listen(self):
async def _listen(self): # pragma: no cover
channel = self.channel.encode('utf-8')
await self.pubsub.subscribe(self.channel)
async for message in self._redis_listen_with_retries():

42
src/socketio/redis_manager.py

@ -2,17 +2,17 @@ import logging
import time
from urllib.parse import urlparse
try: # pragma: no cover
try:
import redis
from redis.exceptions import RedisError
except ImportError:
except ImportError: # pragma: no cover
redis = None
RedisError = None
try: # pragma: no cover
try:
import valkey
from valkey.exceptions import ValkeyError
except ImportError:
except ImportError: # pragma: no cover
valkey = None
ValkeyError = None
@ -48,7 +48,7 @@ def parse_redis_sentinel_url(url):
return sentinels, service_name, kwargs
class RedisManager(PubSubManager): # pragma: no cover
class RedisManager(PubSubManager):
"""Redis based client manager.
This class implements a Redis backend for event sharing across multiple
@ -80,17 +80,12 @@ class RedisManager(PubSubManager): # pragma: no cover
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False, logger=None, redis_options=None):
if redis is None and valkey is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" '
'or "pip install valkey" '
'in your virtualenv).')
super().__init__(channel=channel, write_only=write_only, logger=logger)
self.redis_url = url
self.redis_options = redis_options or {}
self._redis_connect()
def initialize(self):
def initialize(self): # pragma: no cover
super().initialize()
monkey_patched = True
@ -107,20 +102,31 @@ class RedisManager(PubSubManager): # pragma: no cover
def _get_redis_module_and_error(self):
parsed_url = urlparse(self.redis_url)
schema = parsed_url.scheme.split('+', 1)[0].lower()
if schema in ['redis', 'unix']:
scheme = parsed_url.scheme.split('+', 1)[0].lower()
if scheme in ['redis', 'rediss']:
if redis is None or RedisError is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" '
'in your virtualenv).')
return redis, RedisError
if schema == 'valkey':
if scheme in ['valkey', 'valkeys']:
if valkey is None or ValkeyError is None:
raise RuntimeError('Valkey package is not installed '
'(Run "pip install valkey" '
'in your virtualenv).')
return valkey, ValkeyError
error_msg = f'Unsupported Redis URL schema: {schema}'
if scheme == 'unix':
if redis is None or RedisError is None:
if valkey is None or ValkeyError is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" '
'or "pip install valkey" '
'in your virtualenv).')
else:
return valkey, ValkeyError
else:
return redis, RedisError
error_msg = f'Unsupported Redis URL scheme: {scheme}'
raise ValueError(error_msg)
def _redis_connect(self):
@ -138,7 +144,7 @@ class RedisManager(PubSubManager): # pragma: no cover
**self.redis_options)
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
def _publish(self, data):
def _publish(self, data): # pragma: no cover
retry = True
_, error = self._get_redis_module_and_error()
while True:
@ -160,7 +166,7 @@ class RedisManager(PubSubManager): # pragma: no cover
)
break
def _redis_listen_with_retries(self):
def _redis_listen_with_retries(self): # pragma: no cover
retry_sleep = 1
connect = False
_, error = self._get_redis_module_and_error()
@ -181,7 +187,7 @@ class RedisManager(PubSubManager): # pragma: no cover
if retry_sleep > 60:
retry_sleep = 60
def _listen(self):
def _listen(self): # pragma: no cover
channel = self.channel.encode('utf-8')
self.pubsub.subscribe(self.channel)
for message in self._redis_listen_with_retries():

107
tests/async/test_redis_manager.py

@ -0,0 +1,107 @@
import pytest
import redis
import valkey
from socketio import async_redis_manager
from socketio.async_redis_manager import AsyncRedisManager
class TestAsyncRedisManager:
def test_redis_not_installed(self):
saved_redis = async_redis_manager.aioredis
async_redis_manager.aioredis = None
with pytest.raises(RuntimeError):
AsyncRedisManager('redis://')
assert AsyncRedisManager('unix:///var/sock/redis.sock') is not None
async_redis_manager.aioredis = saved_redis
def test_valkey_not_installed(self):
saved_valkey = async_redis_manager.aiovalkey
async_redis_manager.aiovalkey = None
with pytest.raises(RuntimeError):
AsyncRedisManager('valkey://')
assert AsyncRedisManager('unix:///var/sock/redis.sock') is not None
async_redis_manager.aiovalkey = saved_valkey
def test_redis_valkey_not_installed(self):
saved_redis = async_redis_manager.aioredis
async_redis_manager.aioredis = None
saved_valkey = async_redis_manager.aiovalkey
async_redis_manager.aiovalkey = None
with pytest.raises(RuntimeError):
AsyncRedisManager('redis://')
with pytest.raises(RuntimeError):
AsyncRedisManager('valkey://')
with pytest.raises(RuntimeError):
AsyncRedisManager('unix:///var/sock/redis.sock')
async_redis_manager.aioredis = saved_redis
async_redis_manager.aiovalkey = saved_valkey
def test_bad_url(self):
with pytest.raises(ValueError):
AsyncRedisManager('http://localhost:6379')
def test_redis_connect(self):
urls = [
'redis://localhost:6379',
'redis://localhost:6379/0',
'redis://:password@localhost:6379',
'redis://:password@localhost:6379/0',
'redis://user:password@localhost:6379',
'redis://user:password@localhost:6379/0',
'rediss://localhost:6379',
'rediss://localhost:6379/0',
'rediss://:password@localhost:6379',
'rediss://:password@localhost:6379/0',
'rediss://user:password@localhost:6379',
'rediss://user:password@localhost:6379/0',
'unix:///var/sock/redis.sock',
'unix:///var/sock/redis.sock?db=0',
'unix://user@/var/sock/redis.sock',
'unix://user@/var/sock/redis.sock?db=0',
'redis+sentinel://192.168.0.1:6379,192.168.0.2:6379/'
]
for url in urls:
c = AsyncRedisManager(url)
assert isinstance(c.redis, redis.asyncio.Redis)
def test_valkey_connect(self):
saved_redis = async_redis_manager.aioredis
async_redis_manager.aioredis = None
urls = [
'valkey://localhost:6379',
'valkey://localhost:6379/0',
'valkey://:password@localhost:6379',
'valkey://:password@localhost:6379/0',
'valkey://user:password@localhost:6379',
'valkey://user:password@localhost:6379/0',
'valkeys://localhost:6379',
'valkeys://localhost:6379/0',
'valkeys://:password@localhost:6379',
'valkeys://:password@localhost:6379/0',
'valkeys://user:password@localhost:6379',
'valkeys://user:password@localhost:6379/0',
'unix:///var/sock/redis.sock',
'unix:///var/sock/redis.sock?db=0',
'unix://user@/var/sock/redis.sock',
'unix://user@/var/sock/redis.sock?db=0',
'valkey+sentinel://192.168.0.1:6379,192.168.0.2:6379/'
]
for url in urls:
c = AsyncRedisManager(url)
assert isinstance(c.redis, valkey.asyncio.Valkey)
async_redis_manager.aioredis = saved_redis

104
tests/common/test_redis_manager.py

@ -1,9 +1,111 @@
import pytest
import redis
import valkey
from socketio.redis_manager import parse_redis_sentinel_url
from socketio import redis_manager
from socketio.redis_manager import RedisManager, parse_redis_sentinel_url
class TestPubSubManager:
def test_redis_not_installed(self):
saved_redis = redis_manager.redis
redis_manager.redis = None
with pytest.raises(RuntimeError):
RedisManager('redis://')
assert RedisManager('unix:///var/sock/redis.sock') is not None
redis_manager.redis = saved_redis
def test_valkey_not_installed(self):
saved_valkey = redis_manager.valkey
redis_manager.valkey = None
with pytest.raises(RuntimeError):
RedisManager('valkey://')
assert RedisManager('unix:///var/sock/redis.sock') is not None
redis_manager.valkey = saved_valkey
def test_redis_valkey_not_installed(self):
saved_redis = redis_manager.redis
redis_manager.redis = None
saved_valkey = redis_manager.valkey
redis_manager.valkey = None
with pytest.raises(RuntimeError):
RedisManager('redis://')
with pytest.raises(RuntimeError):
RedisManager('valkey://')
with pytest.raises(RuntimeError):
RedisManager('unix:///var/sock/redis.sock')
redis_manager.redis = saved_redis
redis_manager.valkey = saved_valkey
def test_bad_url(self):
with pytest.raises(ValueError):
RedisManager('http://localhost:6379')
def test_redis_connect(self):
urls = [
'redis://localhost:6379',
'redis://localhost:6379/0',
'redis://:password@localhost:6379',
'redis://:password@localhost:6379/0',
'redis://user:password@localhost:6379',
'redis://user:password@localhost:6379/0',
'rediss://localhost:6379',
'rediss://localhost:6379/0',
'rediss://:password@localhost:6379',
'rediss://:password@localhost:6379/0',
'rediss://user:password@localhost:6379',
'rediss://user:password@localhost:6379/0',
'unix:///var/sock/redis.sock',
'unix:///var/sock/redis.sock?db=0',
'unix://user@/var/sock/redis.sock',
'unix://user@/var/sock/redis.sock?db=0',
'redis+sentinel://192.168.0.1:6379,192.168.0.2:6379/'
]
for url in urls:
c = RedisManager(url)
assert isinstance(c.redis, redis.Redis)
def test_valkey_connect(self):
saved_redis = redis_manager.redis
redis_manager.redis = None
urls = [
'valkey://localhost:6379',
'valkey://localhost:6379/0',
'valkey://:password@localhost:6379',
'valkey://:password@localhost:6379/0',
'valkey://user:password@localhost:6379',
'valkey://user:password@localhost:6379/0',
'valkeys://localhost:6379',
'valkeys://localhost:6379/0',
'valkeys://:password@localhost:6379',
'valkeys://:password@localhost:6379/0',
'valkeys://user:password@localhost:6379',
'valkeys://user:password@localhost:6379/0',
'unix:///var/sock/redis.sock',
'unix:///var/sock/redis.sock?db=0',
'unix://user@/var/sock/redis.sock',
'unix://user@/var/sock/redis.sock?db=0',
'valkey+sentinel://192.168.0.1:6379,192.168.0.2:6379/'
]
for url in urls:
c = RedisManager(url)
assert isinstance(c.redis, valkey.Valkey)
redis_manager.redis = saved_redis
@pytest.mark.parametrize('rtype', ['redis', 'valkey'])
def test_sentinel_url_parser(self, rtype):
with pytest.raises(ValueError):

2
tox.ini

@ -23,6 +23,8 @@ deps=
websocket-client
aiohttp
msgpack
redis
valkey
pytest
pytest-asyncio
pytest-timeout

Loading…
Cancel
Save