Browse Source

feat: support valkey

pull/1488/head
phi 2 months ago
parent
commit
79a4321850
  1. 53
      src/socketio/async_redis_manager.py
  2. 53
      src/socketio/redis_manager.py

53
src/socketio/async_redis_manager.py

@ -1,5 +1,6 @@
import asyncio
import pickle
from urllib.parse import urlparse
try: # pragma: no cover
from redis import asyncio as aioredis
@ -12,6 +13,15 @@ except ImportError: # pragma: no cover
aioredis = None
RedisError = None
try: # pragma: no cover
from valkey import asyncio as valkey
from valkey.exceptions import ValkeyError
except ImportError: # pragma: no cover
valkey = None
ValkeyError = None
from .async_pubsub_manager import AsyncPubSubManager
from .redis_manager import parse_redis_sentinel_url
@ -47,38 +57,58 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False, logger=None, redis_options=None):
if aioredis is None:
if aioredis is None and valkey is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" in your virtualenv).')
if not hasattr(aioredis.Redis, 'from_url'):
'(Run "pip install redis" or "pip install valkey" '
'in your virtualenv).')
if aioredis and not hasattr(aioredis.Redis, 'from_url'):
raise RuntimeError('Version 2 of aioredis package is required.')
super().__init__(channel=channel, write_only=write_only, logger=logger)
self.redis_url = url
self.redis_options = redis_options or {}
self._redis_connect()
def _get_redis_module_and_error(self):
parsed_url = urlparse(self.redis_url)
schema = parsed_url.scheme.split('+', 1)[0].lower()
if schema == 'redis':
if aioredis is None or RedisError is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" in your virtualenv).')
return aioredis, RedisError
if schema == 'valkey':
if valkey is None or ValkeyError is None:
raise RuntimeError('Valkey package is not installed '
'(Run "pip install valkey" in your virtualenv).')
return valkey, ValkeyError
error_msg = f'Unsupported Redis URL schema: {schema}'
raise ValueError(error_msg)
def _redis_connect(self):
if not self.redis_url.startswith('redis+sentinel://'):
self.redis = aioredis.Redis.from_url(self.redis_url,
**self.redis_options)
else:
module, _ = self._get_redis_module_and_error()
parsed_url = urlparse(self.redis_url)
if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}:
sentinels, service_name, connection_kwargs = \
parse_redis_sentinel_url(self.redis_url)
kwargs = self.redis_options
kwargs.update(connection_kwargs)
sentinel = aioredis.sentinel.Sentinel(sentinels, **kwargs)
sentinel = module.sentinel.Sentinel(sentinels, **kwargs)
self.redis = sentinel.master_for(service_name or self.channel)
else:
self.redis = module.Redis.from_url(self.redis_url,
**self.redis_options)
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
async def _publish(self, data):
retry = True
_, error = self._get_redis_module_and_error()
while True:
try:
if not retry:
self._redis_connect()
return await self.redis.publish(
self.channel, pickle.dumps(data))
except RedisError as exc:
except error as exc:
if retry:
self._get_logger().error(
'Cannot publish to redis... '
@ -96,6 +126,7 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
async def _redis_listen_with_retries(self):
retry_sleep = 1
connect = False
_, error = self._get_redis_module_and_error()
while True:
try:
if connect:
@ -104,10 +135,10 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
retry_sleep = 1
async for message in self.pubsub.listen():
yield message
except RedisError as exc:
except error as exc:
self._get_logger().error('Cannot receive from redis... '
'retrying in '
'{} secs'.format(retry_sleep),
f'{retry_sleep} secs',
extra={"redis_exception": str(exc)})
connect = True
await asyncio.sleep(retry_sleep)

53
src/socketio/redis_manager.py

@ -5,8 +5,17 @@ from urllib.parse import urlparse
try:
import redis
from redis.exceptions import RedisError
except ImportError:
redis = None
RedisError = None
try:
import valkey
from valkey.exceptions import ValkeyError
except ImportError:
valkey = None
ValkeyError = None
from .pubsub_manager import PubSubManager
@ -18,7 +27,7 @@ def parse_redis_sentinel_url(url):
redis+sentinel://[:password]@host1:port1,host2:port2,.../db/service_name
"""
parsed_url = urlparse(url)
if parsed_url.scheme != 'redis+sentinel':
if parsed_url.scheme not in {'redis+sentinel', 'valkey+sentinel'}:
raise ValueError('Invalid Redis Sentinel URL')
sentinels = []
for host_port in parsed_url.netloc.split('@')[-1].split(','):
@ -71,10 +80,10 @@ class RedisManager(PubSubManager): # pragma: no cover
def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False, logger=None, redis_options=None):
if redis is None:
if redis is None and valkey is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" in your '
'virtualenv).')
'(Run "pip install redis" or "pip install valkey" '
'in your virtualenv).')
super().__init__(channel=channel, write_only=write_only, logger=logger)
self.redis_url = url
self.redis_options = redis_options or {}
@ -95,27 +104,46 @@ class RedisManager(PubSubManager): # pragma: no cover
'Redis requires a monkey patched socket library to work '
'with ' + self.server.async_mode)
def _get_redis_module_and_error(self):
parsed_url = urlparse(self.redis_url)
schema = parsed_url.scheme.split('+', 1)[0].lower()
if schema == 'redis':
if redis is None or RedisError is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" in your virtualenv).')
return redis, RedisError
if schema == 'valkey':
if valkey is None or ValkeyError is None:
raise RuntimeError('Valkey package is not installed '
'(Run "pip install valkey" in your virtualenv).')
return valkey, ValkeyError
error_msg = f'Unsupported Redis URL schema: {schema}'
raise ValueError(error_msg)
def _redis_connect(self):
if not self.redis_url.startswith('redis+sentinel://'):
self.redis = redis.Redis.from_url(self.redis_url,
**self.redis_options)
else:
module, _ = self._get_redis_module_and_error()
parsed_url = urlparse(self.redis_url)
if parsed_url.scheme in {"redis+sentinel", "valkey+sentinel"}:
sentinels, service_name, connection_kwargs = \
parse_redis_sentinel_url(self.redis_url)
kwargs = self.redis_options
kwargs.update(connection_kwargs)
sentinel = redis.sentinel.Sentinel(sentinels, **kwargs)
sentinel = module.sentinel.Sentinel(sentinels, **kwargs)
self.redis = sentinel.master_for(service_name or self.channel)
else:
self.redis = module.Redis.from_url(self.redis_url,
**self.redis_options)
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
def _publish(self, data):
retry = True
_, error = self._get_redis_module_and_error()
while True:
try:
if not retry:
self._redis_connect()
return self.redis.publish(self.channel, pickle.dumps(data))
except redis.exceptions.RedisError as exc:
except error as exc:
if retry:
logger.error(
'Cannot publish to redis... retrying',
@ -132,6 +160,7 @@ class RedisManager(PubSubManager): # pragma: no cover
def _redis_listen_with_retries(self):
retry_sleep = 1
connect = False
_, error = self._get_redis_module_and_error()
while True:
try:
if connect:
@ -139,9 +168,9 @@ class RedisManager(PubSubManager): # pragma: no cover
self.pubsub.subscribe(self.channel)
retry_sleep = 1
yield from self.pubsub.listen()
except redis.exceptions.RedisError as exc:
except error as exc:
logger.error('Cannot receive from redis... '
'retrying in {} secs'.format(retry_sleep),
f'retrying in {retry_sleep} secs',
extra={"redis_exception": str(exc)})
connect = True
time.sleep(retry_sleep)

Loading…
Cancel
Save