|
|
@ -10,21 +10,6 @@ except ImportError: |
|
|
|
from .asyncio_pubsub_manager import AsyncPubSubManager |
|
|
|
|
|
|
|
|
|
|
|
def _parse_redis_url(url): |
|
|
|
p = urlparse(url) |
|
|
|
if p.scheme not in {'redis', 'rediss'}: |
|
|
|
raise ValueError('Invalid redis url') |
|
|
|
ssl = p.scheme == 'rediss' |
|
|
|
host = p.hostname or 'localhost' |
|
|
|
port = p.port or 6379 |
|
|
|
password = p.password |
|
|
|
if p.path: |
|
|
|
db = int(p.path[1:]) |
|
|
|
else: |
|
|
|
db = 0 |
|
|
|
return host, port, password, db, ssl |
|
|
|
|
|
|
|
|
|
|
|
class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover |
|
|
|
"""Redis based client manager for asyncio servers. |
|
|
|
|
|
|
@ -51,58 +36,41 @@ class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover |
|
|
|
name = 'aioredis' |
|
|
|
|
|
|
|
def __init__(self, url='redis://localhost:6379/0', channel='socketio', |
|
|
|
write_only=False, logger=None): |
|
|
|
write_only=False, logger=None, redis_options={}): |
|
|
|
if aioredis is None: |
|
|
|
raise RuntimeError('Redis package is not installed ' |
|
|
|
'(Run "pip install aioredis" in your ' |
|
|
|
'virtualenv).') |
|
|
|
( |
|
|
|
self.host, self.port, self.password, self.db, self.ssl |
|
|
|
) = _parse_redis_url(url) |
|
|
|
self.pub = None |
|
|
|
self.sub = None |
|
|
|
self.redis = aioredis.from_url(url, **redis_options) |
|
|
|
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) |
|
|
|
super().__init__(channel=channel, write_only=write_only, logger=logger) |
|
|
|
|
|
|
|
async def _publish(self, data): |
|
|
|
retry = True |
|
|
|
while True: |
|
|
|
try: |
|
|
|
if self.pub is None: |
|
|
|
self.pub = await aioredis.create_redis( |
|
|
|
(self.host, self.port), db=self.db, |
|
|
|
password=self.password, ssl=self.ssl |
|
|
|
) |
|
|
|
return await self.pub.publish(self.channel, |
|
|
|
pickle.dumps(data)) |
|
|
|
except (aioredis.RedisError, OSError): |
|
|
|
return await self.redis.publish(self.channel, pickle.dumps(data)) |
|
|
|
except redis.exceptions.RedisError: |
|
|
|
if retry: |
|
|
|
self._get_logger().error('Cannot publish to redis... ' |
|
|
|
'retrying') |
|
|
|
self.pub = None |
|
|
|
self._get_logger().error('Cannot publish to redis... retrying') |
|
|
|
retry = False |
|
|
|
else: |
|
|
|
self._get_logger().error('Cannot publish to redis... ' |
|
|
|
'giving up') |
|
|
|
self._get_logger().error('Cannot publish to redis... giving up') |
|
|
|
break |
|
|
|
|
|
|
|
async def _listen(self): |
|
|
|
retry_sleep = 1 |
|
|
|
while True: |
|
|
|
try: |
|
|
|
if self.sub is None: |
|
|
|
self.sub = await aioredis.create_redis( |
|
|
|
(self.host, self.port), db=self.db, |
|
|
|
password=self.password, ssl=self.ssl |
|
|
|
) |
|
|
|
self.ch = (await self.sub.subscribe(self.channel))[0] |
|
|
|
await self.pubsub.subscribe(self.channel) |
|
|
|
retry_sleep = 1 |
|
|
|
return await self.ch.get() |
|
|
|
except (aioredis.RedisError, OSError): |
|
|
|
async for message in self.pubsub.listen(): |
|
|
|
yield message['data'] |
|
|
|
except aioredis.exceptions.RedisError: |
|
|
|
self._get_logger().error('Cannot receive from redis... ' |
|
|
|
'retrying in ' |
|
|
|
'{} secs'.format(retry_sleep)) |
|
|
|
self.sub = None |
|
|
|
'retrying in {} secs'.format(retry_sleep)) |
|
|
|
await asyncio.sleep(retry_sleep) |
|
|
|
retry_sleep *= 2 |
|
|
|
if retry_sleep > 60: |
|
|
|
retry_sleep = 60 |
|
|
|
await self.pubsub.unsubscribe(self.channel) |
|
|
|