Browse Source

Recoonect to redis when connection is lost

Fixes #143
pull/164/head
Miguel Grinberg 7 years ago
parent
commit
af13ef067c
No known key found for this signature in database GPG Key ID: 36848B262DF5F06C
  1. 48
      socketio/asyncio_redis_manager.py
  2. 47
      socketio/redis_manager.py

48
socketio/asyncio_redis_manager.py

@ -1,3 +1,5 @@
import asyncio
import logging
import pickle import pickle
from urllib.parse import urlparse from urllib.parse import urlparse
@ -8,6 +10,8 @@ except ImportError:
from .asyncio_pubsub_manager import AsyncPubSubManager from .asyncio_pubsub_manager import AsyncPubSubManager
logger = logging.getLogger('socketio')
def _parse_redis_url(url): def _parse_redis_url(url):
p = urlparse(url) p = urlparse(url)
@ -59,17 +63,39 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
super().__init__(channel=channel, write_only=write_only) super().__init__(channel=channel, write_only=write_only)
async def _publish(self, data): async def _publish(self, data):
if self.pub is None: retry = True
self.pub = await aioredis.create_redis((self.host, self.port), while True:
db=self.db, try:
password=self.password) if self.pub is None:
return await self.pub.publish(self.channel, pickle.dumps(data)) self.pub = await aioredis.create_redis(
(self.host, self.port), db=self.db,
password=self.password)
return await self.pub.publish(self.channel,
pickle.dumps(data))
except (aioredis.RedisError, OSError):
if retry:
logger.error('Cannot publish to redis... retrying')
self.pub = None
retry = False
else:
logger.error('Cannot publish to redis... giving up')
break
async def _listen(self): async def _listen(self):
if self.sub is None: retry_sleep = 1
self.sub = await aioredis.create_redis((self.host, self.port),
db=self.db,
password=self.password)
self.ch = (await self.sub.subscribe(self.channel))[0]
while True: while True:
return await self.ch.get() try:
if self.sub is None:
self.sub = await aioredis.create_redis(
(self.host, self.port), db=self.db,
password=self.password)
self.ch = (await self.sub.subscribe(self.channel))[0]
return await self.ch.get()
except (aioredis.RedisError, OSError):
logger.error('Cannot receive from redis... '
'retrying in {} secs'.format(retry_sleep))
self.sub = None
await asyncio.sleep(retry_sleep)
retry_sleep *= 2
if retry_sleep > 60:
retry_sleep = 60

47
socketio/redis_manager.py

@ -1,4 +1,6 @@
import logging
import pickle import pickle
import time
try: try:
import redis import redis
@ -7,6 +9,8 @@ except ImportError:
from .pubsub_manager import PubSubManager from .pubsub_manager import PubSubManager
logger = logging.getLogger('socketio')
class RedisManager(PubSubManager): # pragma: no cover class RedisManager(PubSubManager): # pragma: no cover
"""Redis based client manager. """Redis based client manager.
@ -38,8 +42,8 @@ class RedisManager(PubSubManager): # pragma: no cover
raise RuntimeError('Redis package is not installed ' raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" in your ' '(Run "pip install redis" in your '
'virtualenv).') 'virtualenv).')
self.redis = redis.Redis.from_url(url) self.redis_url = url
self.pubsub = self.redis.pubsub() self._redis_connect()
super(RedisManager, self).__init__(channel=channel, super(RedisManager, self).__init__(channel=channel,
write_only=write_only) write_only=write_only)
@ -58,13 +62,48 @@ class RedisManager(PubSubManager): # pragma: no cover
'Redis requires a monkey patched socket library to work ' 'Redis requires a monkey patched socket library to work '
'with ' + self.server.async_mode) 'with ' + self.server.async_mode)
def _redis_connect(self):
self.redis = redis.Redis.from_url(self.redis_url)
self.pubsub = self.redis.pubsub()
def _publish(self, data): def _publish(self, data):
return self.redis.publish(self.channel, pickle.dumps(data)) retry = True
while True:
try:
if not retry:
self._redis_connect()
return self.redis.publish(self.channel, pickle.dumps(data))
except redis.exceptions.ConnectionError:
if retry:
logger.error('Cannot publish to redis... retrying')
retry = False
else:
logger.error('Cannot publish to redis... giving up')
break
def _redis_listen_with_retries(self):
retry_sleep = 1
connect = False
while True:
try:
if connect:
self._redis_connect()
self.pubsub.subscribe(self.channel)
for message in self.pubsub.listen():
yield message
except redis.exceptions.ConnectionError:
logger.error('Cannot receive from redis... '
'retrying in {} secs'.format(retry_sleep))
connect = True
time.sleep(retry_sleep)
retry_sleep *= 2
if retry_sleep > 60:
retry_sleep = 60
def _listen(self): def _listen(self):
channel = self.channel.encode('utf-8') channel = self.channel.encode('utf-8')
self.pubsub.subscribe(self.channel) self.pubsub.subscribe(self.channel)
for message in self.pubsub.listen(): for message in self._redis_listen_with_retries():
if message['channel'] == channel and \ if message['channel'] == channel and \
message['type'] == 'message' and 'data' in message: message['type'] == 'message' and 'data' in message:
yield message['data'] yield message['data']

Loading…
Cancel
Save