Browse Source

Add support for Redis Sentinel

pull/1448/head
Miguel Grinberg 1 week ago
parent
commit
d3efeb4efc
Failed to extract signature
  1. 18
      src/socketio/async_redis_manager.py
  2. 47
      src/socketio/redis_manager.py
  3. 24
      tests/common/test_redis_manager.py

18
src/socketio/async_redis_manager.py

@ -13,6 +13,7 @@ except ImportError: # pragma: no cover
RedisError = None
from .async_pubsub_manager import AsyncPubSubManager
from .redis_manager import parse_redis_sentinel_url
class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
@ -29,15 +30,18 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
client_manager=socketio.AsyncRedisManager(url))
:param url: The connection URL for the Redis server. For a default Redis
store running on the same host, use ``redis://``. To use an
SSL connection, use ``rediss://``.
store running on the same host, use ``redis://``. To use a
TLS connection, use ``rediss://``. To use Redis Sentinel, use
``redis+sentinel://`` with a comma-separated list of hosts
and the service name after the db in the URL path. Example:
``redis+sentinel://user:pw@host1:1234,host2:2345/0/myredis``.
:param channel: The channel name on which the server sends and receives
notifications. Must be the same in all the servers.
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
:param redis_options: additional keyword arguments to be passed to
``aioredis.from_url()``.
``Redis.from_url()`` or ``Sentinel()``.
"""
name = 'aioredis'
@ -54,8 +58,16 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
super().__init__(channel=channel, write_only=write_only, logger=logger)
def _redis_connect(self):
if not self.redis_url.startswith('redis+sentinel://'):
self.redis = aioredis.Redis.from_url(self.redis_url,
**self.redis_options)
else:
sentinels, service_name, connection_kwargs = \
parse_redis_sentinel_url(self.redis_url)
kwargs = self.redis_options
kwargs.update(connection_kwargs)
sentinel = aioredis.sentinel.Sentinel(sentinels, **kwargs)
self.redis = sentinel.master_for(service_name or self.channel)
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
async def _publish(self, data):

47
src/socketio/redis_manager.py

@ -1,6 +1,7 @@
import logging
import pickle
import time
from urllib.parse import urlparse
try:
import redis
@ -12,6 +13,35 @@ from .pubsub_manager import PubSubManager
logger = logging.getLogger('socketio')
def parse_redis_sentinel_url(url):
"""Parse a Redis Sentinel URL with the format:
redis+sentinel://[:password]@host1:port1,host2:port2,.../db/service_name
"""
parsed_url = urlparse(url)
if parsed_url.scheme != 'redis+sentinel':
raise ValueError('Invalid Redis Sentinel URL')
sentinels = []
for host_port in parsed_url.netloc.split('@')[-1].split(','):
host, port = host_port.rsplit(':', 1)
sentinels.append((host, int(port)))
kwargs = {}
if parsed_url.username:
kwargs['username'] = parsed_url.username
if parsed_url.password:
kwargs['password'] = parsed_url.password
if parsed_url.query:
for key, value in urlparse.parse_qs(parsed_url.query).items():
kwargs[key] = value[0]
service_name = None
if parsed_url.path:
parts = parsed_url.path.split('/')
if len(parts) >= 2:
kwargs['db'] = int(parts[1])
if len(parts) >= 3:
service_name = parts[2]
return sentinels, service_name, kwargs
class RedisManager(PubSubManager): # pragma: no cover
"""Redis based client manager.
@ -27,15 +57,18 @@ class RedisManager(PubSubManager): # pragma: no cover
server = socketio.Server(client_manager=socketio.RedisManager(url))
:param url: The connection URL for the Redis server. For a default Redis
store running on the same host, use ``redis://``. To use an
SSL connection, use ``rediss://``.
store running on the same host, use ``redis://``. To use a
TLS connection, use ``rediss://``. To use Redis Sentinel, use
``redis+sentinel://`` with a comma-separated list of hosts
and the service name after the db in the URL path. Example:
``redis+sentinel://user:pw@host1:1234,host2:2345/0/myredis``.
:param channel: The channel name on which the server sends and receives
notifications. Must be the same in all the servers.
:param write_only: If set to ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
:param redis_options: additional keyword arguments to be passed to
``Redis.from_url()``.
``Redis.from_url()`` or ``Sentinel()``.
"""
name = 'redis'
@ -66,8 +99,16 @@ class RedisManager(PubSubManager): # pragma: no cover
'with ' + self.server.async_mode)
def _redis_connect(self):
if not self.redis_url.startswith('redis+sentinel://'):
self.redis = redis.Redis.from_url(self.redis_url,
**self.redis_options)
else:
sentinels, service_name, connection_kwargs = \
parse_redis_sentinel_url(self.redis_url)
kwargs = self.redis_options
kwargs.update(connection_kwargs)
sentinel = redis.sentinel.Sentinel(sentinels, **kwargs)
self.redis = sentinel.master_for(service_name or self.channel)
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
def _publish(self, data):

24
tests/common/test_redis_manager.py

@ -0,0 +1,24 @@
import pytest
from socketio.redis_manager import parse_redis_sentinel_url
class TestPubSubManager:
def test_sentinel_url_parser(self):
with pytest.raises(ValueError):
parse_redis_sentinel_url('redis://localhost:6379/0')
assert parse_redis_sentinel_url(
'redis+sentinel://h1:6379,h2:6379'
) == (
[('h1', 6379), ('h2', 6379)],
None,
{}
)
assert parse_redis_sentinel_url(
'redis+sentinel://user:password@h1:6379,h2:6379,h1:6380/0/myredis'
) == (
[('h1', 6379), ('h2', 6379), ('h1', 6380)],
'myredis',
{'username': 'user', 'password': 'password', 'db': 0}
)
Loading…
Cancel
Save