Browse Source

Retry initial Redis connection (Fixes #1534) (#1536)

pull/1537/head
Miguel Grinberg 6 months ago
committed by GitHub
parent
commit
1e903e173a
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 20
      src/socketio/async_redis_manager.py
  2. 20
      src/socketio/redis_manager.py
  3. 16
      tests/async/test_redis_manager.py
  4. 16
      tests/common/test_redis_manager.py

20
src/socketio/async_redis_manager.py

@ -61,7 +61,9 @@ class AsyncRedisManager(AsyncPubSubManager):
super().__init__(channel=channel, write_only=write_only, logger=logger) super().__init__(channel=channel, write_only=write_only, logger=logger)
self.redis_url = url self.redis_url = url
self.redis_options = redis_options or {} self.redis_options = redis_options or {}
self._redis_connect() self.connected = False
self.redis = None
self.pubsub = None
def _get_redis_module_and_error(self): def _get_redis_module_and_error(self):
parsed_url = urlparse(self.redis_url) parsed_url = urlparse(self.redis_url)
@ -106,23 +108,23 @@ class AsyncRedisManager(AsyncPubSubManager):
self.redis = module.Redis.from_url(self.redis_url, self.redis = module.Redis.from_url(self.redis_url,
**self.redis_options) **self.redis_options)
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
self.connected = True
async def _publish(self, data): # pragma: no cover async def _publish(self, data): # pragma: no cover
retry = True
_, error = self._get_redis_module_and_error() _, error = self._get_redis_module_and_error()
while True: for retries_left in range(1, -1, -1): # 2 attempts
try: try:
if not retry: if not self.connected:
self._redis_connect() self._redis_connect()
return await self.redis.publish( return await self.redis.publish(
self.channel, json.dumps(data)) self.channel, json.dumps(data))
except error as exc: except error as exc:
if retry: if retries_left > 0:
self._get_logger().error( self._get_logger().error(
'Cannot publish to redis... ' 'Cannot publish to redis... '
'retrying', 'retrying',
extra={"redis_exception": str(exc)}) extra={"redis_exception": str(exc)})
retry = False self.connected = False
else: else:
self._get_logger().error( self._get_logger().error(
'Cannot publish to redis... ' 'Cannot publish to redis... '
@ -133,11 +135,10 @@ class AsyncRedisManager(AsyncPubSubManager):
async def _redis_listen_with_retries(self): # pragma: no cover async def _redis_listen_with_retries(self): # pragma: no cover
retry_sleep = 1 retry_sleep = 1
connect = False
_, error = self._get_redis_module_and_error() _, error = self._get_redis_module_and_error()
while True: while True:
try: try:
if connect: if not self.connected:
self._redis_connect() self._redis_connect()
await self.pubsub.subscribe(self.channel) await self.pubsub.subscribe(self.channel)
retry_sleep = 1 retry_sleep = 1
@ -148,7 +149,7 @@ class AsyncRedisManager(AsyncPubSubManager):
'retrying in ' 'retrying in '
f'{retry_sleep} secs', f'{retry_sleep} secs',
extra={"redis_exception": str(exc)}) extra={"redis_exception": str(exc)})
connect = True self.connected = False
await asyncio.sleep(retry_sleep) await asyncio.sleep(retry_sleep)
retry_sleep *= 2 retry_sleep *= 2
if retry_sleep > 60: if retry_sleep > 60:
@ -156,7 +157,6 @@ class AsyncRedisManager(AsyncPubSubManager):
async def _listen(self): # pragma: no cover async def _listen(self): # pragma: no cover
channel = self.channel.encode('utf-8') channel = self.channel.encode('utf-8')
await self.pubsub.subscribe(self.channel)
async for message in self._redis_listen_with_retries(): async for message in self._redis_listen_with_retries():
if message['channel'] == channel and \ if message['channel'] == channel and \
message['type'] == 'message' and 'data' in message: message['type'] == 'message' and 'data' in message:

20
src/socketio/redis_manager.py

@ -83,7 +83,9 @@ class RedisManager(PubSubManager):
super().__init__(channel=channel, write_only=write_only, logger=logger) super().__init__(channel=channel, write_only=write_only, logger=logger)
self.redis_url = url self.redis_url = url
self.redis_options = redis_options or {} self.redis_options = redis_options or {}
self._redis_connect() self.connected = False
self.redis = None
self.pubsub = None
def initialize(self): # pragma: no cover def initialize(self): # pragma: no cover
super().initialize() super().initialize()
@ -143,22 +145,22 @@ class RedisManager(PubSubManager):
self.redis = module.Redis.from_url(self.redis_url, self.redis = module.Redis.from_url(self.redis_url,
**self.redis_options) **self.redis_options)
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
self.connected = True
def _publish(self, data): # pragma: no cover def _publish(self, data): # pragma: no cover
retry = True
_, error = self._get_redis_module_and_error() _, error = self._get_redis_module_and_error()
while True: for retries_left in range(1, -1, -1): # 2 attempts
try: try:
if not retry: if not self.connected:
self._redis_connect() self._redis_connect()
return self.redis.publish(self.channel, json.dumps(data)) return self.redis.publish(self.channel, json.dumps(data))
except error as exc: except error as exc:
if retry: if retries_left > 0:
logger.error( logger.error(
'Cannot publish to redis... retrying', 'Cannot publish to redis... retrying',
extra={"redis_exception": str(exc)} extra={"redis_exception": str(exc)}
) )
retry = False self.connected = False
else: else:
logger.error( logger.error(
'Cannot publish to redis... giving up', 'Cannot publish to redis... giving up',
@ -168,11 +170,10 @@ class RedisManager(PubSubManager):
def _redis_listen_with_retries(self): # pragma: no cover def _redis_listen_with_retries(self): # pragma: no cover
retry_sleep = 1 retry_sleep = 1
connect = False
_, error = self._get_redis_module_and_error() _, error = self._get_redis_module_and_error()
while True: while True:
try: try:
if connect: if not self.connected:
self._redis_connect() self._redis_connect()
self.pubsub.subscribe(self.channel) self.pubsub.subscribe(self.channel)
retry_sleep = 1 retry_sleep = 1
@ -181,7 +182,7 @@ class RedisManager(PubSubManager):
logger.error('Cannot receive from redis... ' logger.error('Cannot receive from redis... '
f'retrying in {retry_sleep} secs', f'retrying in {retry_sleep} secs',
extra={"redis_exception": str(exc)}) extra={"redis_exception": str(exc)})
connect = True self.connected = False
time.sleep(retry_sleep) time.sleep(retry_sleep)
retry_sleep *= 2 retry_sleep *= 2
if retry_sleep > 60: if retry_sleep > 60:
@ -189,7 +190,6 @@ class RedisManager(PubSubManager):
def _listen(self): # pragma: no cover def _listen(self): # pragma: no cover
channel = self.channel.encode('utf-8') channel = self.channel.encode('utf-8')
self.pubsub.subscribe(self.channel)
for message in self._redis_listen_with_retries(): for message in self._redis_listen_with_retries():
if message['channel'] == channel and \ if message['channel'] == channel and \
message['type'] == 'message' and 'data' in message: message['type'] == 'message' and 'data' in message:

16
tests/async/test_redis_manager.py

@ -12,7 +12,7 @@ class TestAsyncRedisManager:
async_redis_manager.aioredis = None async_redis_manager.aioredis = None
with pytest.raises(RuntimeError): with pytest.raises(RuntimeError):
AsyncRedisManager('redis://') AsyncRedisManager('redis://')._redis_connect()
assert AsyncRedisManager('unix:///var/sock/redis.sock') is not None assert AsyncRedisManager('unix:///var/sock/redis.sock') is not None
async_redis_manager.aioredis = saved_redis async_redis_manager.aioredis = saved_redis
@ -22,7 +22,7 @@ class TestAsyncRedisManager:
async_redis_manager.aiovalkey = None async_redis_manager.aiovalkey = None
with pytest.raises(RuntimeError): with pytest.raises(RuntimeError):
AsyncRedisManager('valkey://') AsyncRedisManager('valkey://')._redis_connect()
assert AsyncRedisManager('unix:///var/sock/redis.sock') is not None assert AsyncRedisManager('unix:///var/sock/redis.sock') is not None
async_redis_manager.aiovalkey = saved_valkey async_redis_manager.aiovalkey = saved_valkey
@ -34,18 +34,18 @@ class TestAsyncRedisManager:
async_redis_manager.aiovalkey = None async_redis_manager.aiovalkey = None
with pytest.raises(RuntimeError): with pytest.raises(RuntimeError):
AsyncRedisManager('redis://') AsyncRedisManager('redis://')._redis_connect()
with pytest.raises(RuntimeError): with pytest.raises(RuntimeError):
AsyncRedisManager('valkey://') AsyncRedisManager('valkey://')._redis_connect()
with pytest.raises(RuntimeError): with pytest.raises(RuntimeError):
AsyncRedisManager('unix:///var/sock/redis.sock') AsyncRedisManager('unix:///var/sock/redis.sock')._redis_connect()
async_redis_manager.aioredis = saved_redis async_redis_manager.aioredis = saved_redis
async_redis_manager.aiovalkey = saved_valkey async_redis_manager.aiovalkey = saved_valkey
def test_bad_url(self): def test_bad_url(self):
with pytest.raises(ValueError): with pytest.raises(ValueError):
AsyncRedisManager('http://localhost:6379') AsyncRedisManager('http://localhost:6379')._redis_connect()
def test_redis_connect(self): def test_redis_connect(self):
urls = [ urls = [
@ -72,6 +72,8 @@ class TestAsyncRedisManager:
] ]
for url in urls: for url in urls:
c = AsyncRedisManager(url) c = AsyncRedisManager(url)
assert c.redis is None
c._redis_connect()
assert isinstance(c.redis, redis.asyncio.Redis) assert isinstance(c.redis, redis.asyncio.Redis)
def test_valkey_connect(self): def test_valkey_connect(self):
@ -102,6 +104,8 @@ class TestAsyncRedisManager:
] ]
for url in urls: for url in urls:
c = AsyncRedisManager(url) c = AsyncRedisManager(url)
assert c.redis is None
c._redis_connect()
assert isinstance(c.redis, valkey.asyncio.Valkey) assert isinstance(c.redis, valkey.asyncio.Valkey)
async_redis_manager.aioredis = saved_redis async_redis_manager.aioredis = saved_redis

16
tests/common/test_redis_manager.py

@ -12,7 +12,7 @@ class TestPubSubManager:
redis_manager.redis = None redis_manager.redis = None
with pytest.raises(RuntimeError): with pytest.raises(RuntimeError):
RedisManager('redis://') RedisManager('redis://')._redis_connect()
assert RedisManager('unix:///var/sock/redis.sock') is not None assert RedisManager('unix:///var/sock/redis.sock') is not None
redis_manager.redis = saved_redis redis_manager.redis = saved_redis
@ -22,7 +22,7 @@ class TestPubSubManager:
redis_manager.valkey = None redis_manager.valkey = None
with pytest.raises(RuntimeError): with pytest.raises(RuntimeError):
RedisManager('valkey://') RedisManager('valkey://')._redis_connect()
assert RedisManager('unix:///var/sock/redis.sock') is not None assert RedisManager('unix:///var/sock/redis.sock') is not None
redis_manager.valkey = saved_valkey redis_manager.valkey = saved_valkey
@ -34,18 +34,18 @@ class TestPubSubManager:
redis_manager.valkey = None redis_manager.valkey = None
with pytest.raises(RuntimeError): with pytest.raises(RuntimeError):
RedisManager('redis://') RedisManager('redis://')._redis_connect()
with pytest.raises(RuntimeError): with pytest.raises(RuntimeError):
RedisManager('valkey://') RedisManager('valkey://')._redis_connect()
with pytest.raises(RuntimeError): with pytest.raises(RuntimeError):
RedisManager('unix:///var/sock/redis.sock') RedisManager('unix:///var/sock/redis.sock')._redis_connect()
redis_manager.redis = saved_redis redis_manager.redis = saved_redis
redis_manager.valkey = saved_valkey redis_manager.valkey = saved_valkey
def test_bad_url(self): def test_bad_url(self):
with pytest.raises(ValueError): with pytest.raises(ValueError):
RedisManager('http://localhost:6379') RedisManager('http://localhost:6379')._redis_connect()
def test_redis_connect(self): def test_redis_connect(self):
urls = [ urls = [
@ -72,6 +72,8 @@ class TestPubSubManager:
] ]
for url in urls: for url in urls:
c = RedisManager(url) c = RedisManager(url)
assert c.redis is None
c._redis_connect()
assert isinstance(c.redis, redis.Redis) assert isinstance(c.redis, redis.Redis)
def test_valkey_connect(self): def test_valkey_connect(self):
@ -102,6 +104,8 @@ class TestPubSubManager:
] ]
for url in urls: for url in urls:
c = RedisManager(url) c = RedisManager(url)
assert c.redis is None
c._redis_connect()
assert isinstance(c.redis, valkey.Valkey) assert isinstance(c.redis, valkey.Valkey)
redis_manager.redis = saved_redis redis_manager.redis = saved_redis

Loading…
Cancel
Save