Browse Source

Add support for valkey in the Redis client managers (#1488)

pull/1499/head
phi-friday 3 weeks ago
committed by GitHub
parent
commit
36a89226a2
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 54
      src/socketio/async_redis_manager.py
  2. 58
      src/socketio/redis_manager.py
  3. 14
      tests/common/test_redis_manager.py

54
src/socketio/async_redis_manager.py

@ -1,5 +1,6 @@
import asyncio
import pickle
from urllib.parse import urlparse
try: # pragma: no cover
from redis import asyncio as aioredis
@ -12,6 +13,13 @@ except ImportError: # pragma: no cover
aioredis = None
RedisError = None
try: # pragma: no cover
from valkey import asyncio as valkey
from valkey.exceptions import ValkeyError
except ImportError: # pragma: no cover
valkey = None
ValkeyError = None
from .async_pubsub_manager import AsyncPubSubManager
from .redis_manager import parse_redis_sentinel_url
@ -47,38 +55,61 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False, logger=None, redis_options=None):
if aioredis is None:
if aioredis is None and valkey is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" in your virtualenv).')
if not hasattr(aioredis.Redis, 'from_url'):
'(Run "pip install redis" or '
'"pip install valkey" '
'in your virtualenv).')
if aioredis and not hasattr(aioredis.Redis, 'from_url'):
raise RuntimeError('Version 2 of aioredis package is required.')
super().__init__(channel=channel, write_only=write_only, logger=logger)
self.redis_url = url
self.redis_options = redis_options or {}
self._redis_connect()
def _get_redis_module_and_error(self):
parsed_url = urlparse(self.redis_url)
schema = parsed_url.scheme.split('+', 1)[0].lower()
if schema == 'redis':
if aioredis is None or RedisError is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" '
'in your virtualenv).')
return aioredis, RedisError
if schema == 'valkey':
if valkey is None or ValkeyError is None:
raise RuntimeError('Valkey package is not installed '
'(Run "pip install valkey" '
'in your virtualenv).')
return valkey, ValkeyError
error_msg = f'Unsupported Redis URL schema: {schema}'
raise ValueError(error_msg)
def _redis_connect(self):
if not self.redis_url.startswith('redis+sentinel://'):
self.redis = aioredis.Redis.from_url(self.redis_url,
**self.redis_options)
else:
module, _ = self._get_redis_module_and_error()
parsed_url = urlparse(self.redis_url)
if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}:
sentinels, service_name, connection_kwargs = \
parse_redis_sentinel_url(self.redis_url)
kwargs = self.redis_options
kwargs.update(connection_kwargs)
sentinel = aioredis.sentinel.Sentinel(sentinels, **kwargs)
sentinel = module.sentinel.Sentinel(sentinels, **kwargs)
self.redis = sentinel.master_for(service_name or self.channel)
else:
self.redis = module.Redis.from_url(self.redis_url,
**self.redis_options)
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
async def _publish(self, data):
retry = True
_, error = self._get_redis_module_and_error()
while True:
try:
if not retry:
self._redis_connect()
return await self.redis.publish(
self.channel, pickle.dumps(data))
except RedisError as exc:
except error as exc:
if retry:
self._get_logger().error(
'Cannot publish to redis... '
@ -96,6 +127,7 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
async def _redis_listen_with_retries(self):
retry_sleep = 1
connect = False
_, error = self._get_redis_module_and_error()
while True:
try:
if connect:
@ -104,10 +136,10 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
retry_sleep = 1
async for message in self.pubsub.listen():
yield message
except RedisError as exc:
except error as exc:
self._get_logger().error('Cannot receive from redis... '
'retrying in '
'{} secs'.format(retry_sleep),
f'{retry_sleep} secs',
extra={"redis_exception": str(exc)})
connect = True
await asyncio.sleep(retry_sleep)

58
src/socketio/redis_manager.py

@ -3,10 +3,19 @@ import pickle
import time
from urllib.parse import urlparse
try:
try: # pragma: no cover
import redis
from redis.exceptions import RedisError
except ImportError:
redis = None
RedisError = None
try: # pragma: no cover
import valkey
from valkey.exceptions import ValkeyError
except ImportError:
valkey = None
ValkeyError = None
from .pubsub_manager import PubSubManager
@ -18,7 +27,7 @@ def parse_redis_sentinel_url(url):
redis+sentinel://[:password]@host1:port1,host2:port2,.../db/service_name
"""
parsed_url = urlparse(url)
if parsed_url.scheme != 'redis+sentinel':
if parsed_url.scheme not in {'redis+sentinel', 'valkey+sentinel'}:
raise ValueError('Invalid Redis Sentinel URL')
sentinels = []
for host_port in parsed_url.netloc.split('@')[-1].split(','):
@ -71,10 +80,11 @@ class RedisManager(PubSubManager): # pragma: no cover
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False, logger=None, redis_options=None):
if redis is None:
if redis is None and valkey is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" in your '
'virtualenv).')
'(Run "pip install redis" '
'or "pip install valkey" '
'in your virtualenv).')
super().__init__(channel=channel, write_only=write_only, logger=logger)
self.redis_url = url
self.redis_options = redis_options or {}
@ -95,27 +105,48 @@ class RedisManager(PubSubManager): # pragma: no cover
'Redis requires a monkey patched socket library to work '
'with ' + self.server.async_mode)
def _get_redis_module_and_error(self):
parsed_url = urlparse(self.redis_url)
schema = parsed_url.scheme.split('+', 1)[0].lower()
if schema == 'redis':
if redis is None or RedisError is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" '
'in your virtualenv).')
return redis, RedisError
if schema == 'valkey':
if valkey is None or ValkeyError is None:
raise RuntimeError('Valkey package is not installed '
'(Run "pip install valkey" '
'in your virtualenv).')
return valkey, ValkeyError
error_msg = f'Unsupported Redis URL schema: {schema}'
raise ValueError(error_msg)
def _redis_connect(self):
if not self.redis_url.startswith('redis+sentinel://'):
self.redis = redis.Redis.from_url(self.redis_url,
**self.redis_options)
else:
module, _ = self._get_redis_module_and_error()
parsed_url = urlparse(self.redis_url)
if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}:
sentinels, service_name, connection_kwargs = \
parse_redis_sentinel_url(self.redis_url)
kwargs = self.redis_options
kwargs.update(connection_kwargs)
sentinel = redis.sentinel.Sentinel(sentinels, **kwargs)
sentinel = module.sentinel.Sentinel(sentinels, **kwargs)
self.redis = sentinel.master_for(service_name or self.channel)
else:
self.redis = module.Redis.from_url(self.redis_url,
**self.redis_options)
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
def _publish(self, data):
retry = True
_, error = self._get_redis_module_and_error()
while True:
try:
if not retry:
self._redis_connect()
return self.redis.publish(self.channel, pickle.dumps(data))
except redis.exceptions.RedisError as exc:
except error as exc:
if retry:
logger.error(
'Cannot publish to redis... retrying',
@ -132,6 +163,7 @@ class RedisManager(PubSubManager): # pragma: no cover
def _redis_listen_with_retries(self):
retry_sleep = 1
connect = False
_, error = self._get_redis_module_and_error()
while True:
try:
if connect:
@ -139,9 +171,9 @@ class RedisManager(PubSubManager): # pragma: no cover
self.pubsub.subscribe(self.channel)
retry_sleep = 1
yield from self.pubsub.listen()
except redis.exceptions.RedisError as exc:
except error as exc:
logger.error('Cannot receive from redis... '
'retrying in {} secs'.format(retry_sleep),
f'retrying in {retry_sleep} secs',
extra={"redis_exception": str(exc)})
connect = True
time.sleep(retry_sleep)

14
tests/common/test_redis_manager.py

@ -4,33 +4,35 @@ from socketio.redis_manager import parse_redis_sentinel_url
class TestPubSubManager:
def test_sentinel_url_parser(self):
@pytest.mark.parametrize('rtype', ['redis', 'valkey'])
def test_sentinel_url_parser(self, rtype):
with pytest.raises(ValueError):
parse_redis_sentinel_url('redis://localhost:6379/0')
parse_redis_sentinel_url(f'{rtype}://localhost:6379/0')
assert parse_redis_sentinel_url(
'redis+sentinel://localhost:6379'
f'{rtype}+sentinel://localhost:6379'
) == (
[('localhost', 6379)],
None,
{}
)
assert parse_redis_sentinel_url(
'redis+sentinel://192.168.0.1:6379,192.168.0.2:6379/'
f'{rtype}+sentinel://192.168.0.1:6379,192.168.0.2:6379/'
) == (
[('192.168.0.1', 6379), ('192.168.0.2', 6379)],
None,
{}
)
assert parse_redis_sentinel_url(
'redis+sentinel://h1:6379,h2:6379/0'
f'{rtype}+sentinel://h1:6379,h2:6379/0'
) == (
[('h1', 6379), ('h2', 6379)],
None,
{'db': 0}
)
assert parse_redis_sentinel_url(
'redis+sentinel://user:password@h1:6379,h2:6379,h1:6380/0/myredis'
f'{rtype}+sentinel://'
'user:password@h1:6379,h2:6379,h1:6380/0/myredis'
) == (
[('h1', 6379), ('h2', 6379), ('h1', 6380)],
'myredis',

Loading…
Cancel
Save