Browse Source

Unify redis usage

pull/806/head
Sam Mosleh 4 years ago
parent
commit
7a7da80c5c
  1. 52
      src/socketio/asyncio_redis_manager.py
  2. 51
      src/socketio/redis_manager.py

52
src/socketio/asyncio_redis_manager.py

@ -36,67 +36,33 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
name = 'aioredis' name = 'aioredis'
def __init__(self, url='redis://localhost:6379/0', channel='socketio', def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False, logger=None, redis_options=None): write_only=False, logger=None, redis_options={}):
if aioredis is None: if aioredis is None:
raise RuntimeError('Redis package is not installed ' raise RuntimeError('Redis package is not installed '
'(Run "pip install aioredis" in your ' '(Run "pip install aioredis" in your '
'virtualenv).') 'virtualenv).')
if not hasattr(aioredis.Redis, 'from_url'): if not hasattr(aioredis.Redis, 'from_url'):
raise RuntimeError('Version 2 of aioredis package is required.') raise RuntimeError('Version 2 of aioredis package is required.')
self.redis_url = url self.redis = aioredis.Redis.from_url(url, **redis_options)
self.redis_options = redis_options or {} self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
self._redis_connect()
super().__init__(channel=channel, write_only=write_only, logger=logger) super().__init__(channel=channel, write_only=write_only, logger=logger)
def _redis_connect(self):
self.redis = aioredis.Redis.from_url(self.redis_url,
**self.redis_options)
self.pubsub = self.redis.pubsub()
async def _publish(self, data): async def _publish(self, data):
retry = True return await self.redis.publish(self.channel, pickle.dumps(data))
while True:
try:
if not retry:
self._redis_connect()
return await self.redis.publish(
self.channel, pickle.dumps(data))
except aioredis.exceptions.RedisError:
if retry:
self._get_logger().error('Cannot publish to redis... '
'retrying')
retry = False
else:
self._get_logger().error('Cannot publish to redis... '
'giving up')
break
async def _redis_listen_with_retries(self): async def _listen(self):
retry_sleep = 1 retry_sleep = 1
connect = False
while True: while True:
try: try:
if connect: await self.pubsub.subscribe(self.channel)
self._redis_connect() retry_sleep = 1
await self.pubsub.subscribe(self.channel)
retry_sleep = 1
async for message in self.pubsub.listen(): async for message in self.pubsub.listen():
yield message yield message['data']
except aioredis.exceptions.RedisError: except aioredis.exceptions.RedisError:
self._get_logger().error('Cannot receive from redis... ' self._get_logger().error('Cannot receive from redis... '
'retrying in ' 'retrying in {} secs'.format(retry_sleep))
'{} secs'.format(retry_sleep))
connect = True
await asyncio.sleep(retry_sleep) await asyncio.sleep(retry_sleep)
retry_sleep *= 2 retry_sleep *= 2
if retry_sleep > 60: if retry_sleep > 60:
retry_sleep = 60 retry_sleep = 60
async def _listen(self):
channel = self.channel.encode('utf-8')
await self.pubsub.subscribe(self.channel)
async for message in self._redis_listen_with_retries():
if message['channel'] == channel and \
message['type'] == 'message' and 'data' in message:
yield message['data']
await self.pubsub.unsubscribe(self.channel)

51
src/socketio/redis_manager.py

@ -9,8 +9,6 @@ except ImportError:
from .pubsub_manager import PubSubManager from .pubsub_manager import PubSubManager
logger = logging.getLogger('socketio')
class RedisManager(PubSubManager): # pragma: no cover class RedisManager(PubSubManager): # pragma: no cover
"""Redis based client manager. """Redis based client manager.
@ -40,14 +38,13 @@ class RedisManager(PubSubManager): # pragma: no cover
name = 'redis' name = 'redis'
def __init__(self, url='redis://localhost:6379/0', channel='socketio', def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False, logger=None, redis_options=None): write_only=False, logger=None, redis_options={}):
if redis is None: if redis is None:
raise RuntimeError('Redis package is not installed ' raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" in your ' '(Run "pip install redis" in your '
'virtualenv).') 'virtualenv).')
self.redis_url = url self.redis = redis.Redis.from_url(url, **redis_options)
self.redis_options = redis_options or {} self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
self._redis_connect()
super(RedisManager, self).__init__(channel=channel, super(RedisManager, self).__init__(channel=channel,
write_only=write_only, write_only=write_only,
logger=logger) logger=logger)
@ -67,51 +64,21 @@ class RedisManager(PubSubManager): # pragma: no cover
'Redis requires a monkey patched socket library to work ' 'Redis requires a monkey patched socket library to work '
'with ' + self.server.async_mode) 'with ' + self.server.async_mode)
def _redis_connect(self):
self.redis = redis.Redis.from_url(self.redis_url,
**self.redis_options)
self.pubsub = self.redis.pubsub()
def _publish(self, data): def _publish(self, data):
retry = True return self.redis.publish(self.channel, pickle.dumps(data))
while True:
try:
if not retry:
self._redis_connect()
return self.redis.publish(self.channel, pickle.dumps(data))
except redis.exceptions.RedisError:
if retry:
logger.error('Cannot publish to redis... retrying')
retry = False
else:
logger.error('Cannot publish to redis... giving up')
break
def _redis_listen_with_retries(self): def _listen(self):
retry_sleep = 1 retry_sleep = 1
connect = False
while True: while True:
try: try:
if connect: self.pubsub.subscribe(self.channel)
self._redis_connect() retry_sleep = 1
self.pubsub.subscribe(self.channel)
retry_sleep = 1
for message in self.pubsub.listen(): for message in self.pubsub.listen():
yield message yield message['data']
except redis.exceptions.RedisError: except redis.exceptions.RedisError:
logger.error('Cannot receive from redis... ' self._get_logger().error('Cannot receive from redis... '
'retrying in {} secs'.format(retry_sleep)) 'retrying in {} secs'.format(retry_sleep))
connect = True
time.sleep(retry_sleep) time.sleep(retry_sleep)
retry_sleep *= 2 retry_sleep *= 2
if retry_sleep > 60: if retry_sleep > 60:
retry_sleep = 60 retry_sleep = 60
def _listen(self):
channel = self.channel.encode('utf-8')
self.pubsub.subscribe(self.channel)
for message in self._redis_listen_with_retries():
if message['channel'] == channel and \
message['type'] == 'message' and 'data' in message:
yield message['data']
self.pubsub.unsubscribe(self.channel)

Loading…
Cancel
Save