8 changed files with 7 additions and 251 deletions
@ -1,106 +0,0 @@ |
|||
import asyncio |
|||
import pickle |
|||
|
|||
from .asyncio_taskqueue_manager import AsyncTaskQueueManager |
|||
|
|||
try: |
|||
import aio_pika |
|||
except ImportError: |
|||
aio_pika = None |
|||
|
|||
|
|||
class AsyncTaskQueueAioPikaManager(AsyncTaskQueueManager): |
|||
name = 'asynctaskqueueaiopika' |
|||
|
|||
def __init__(self, url='amqp://guest:guest@localhost:5672//', |
|||
channel='socketio', write_only=False, logger=None): |
|||
if aio_pika is None: |
|||
raise RuntimeError('aio_pika package is not installed ' |
|||
'(Run "pip install aio_pika" in your ' |
|||
'virtualenv).') |
|||
self.url = url |
|||
self.publisher_connection = None |
|||
self.publisher_channel = None |
|||
self.publisher_exchange = None |
|||
self.listener_connection = None |
|||
self.listener_channel = None |
|||
self.listener_exchange = None |
|||
self.listener_queues = {} |
|||
super().__init__(channel=channel, write_only=write_only, logger=logger) |
|||
|
|||
async def _connect_publisher(self): |
|||
self.publisher_connection = await aio_pika.connect_robust(self.url) |
|||
self.publisher_channel = await self.publisher_connection.channel() |
|||
self.publisher_exchange = \ |
|||
await self.publisher_channel.declare_exchange( |
|||
self.channel, aio_pika.ExchangeType.DIRECT) |
|||
|
|||
async def _connect_listener(self): |
|||
self.listener_connection = await aio_pika.connect_robust(self.url) |
|||
self.listener_channel = await self.listener_connection.channel() |
|||
await self.listener_channel.set_qos(prefetch_count=1) |
|||
self.listener_exchange = await self.listener_channel.declare_exchange( |
|||
self.channel, aio_pika.ExchangeType.DIRECT) |
|||
|
|||
def _reset_listener(self): |
|||
self.listener_connection = None |
|||
self.listener_queues = {} |
|||
|
|||
async def _declare_queue(self, task_queue): |
|||
queue = await self.listener_channel.declare_queue(durable=True) |
|||
await queue.bind(self.listener_exchange, routing_key=task_queue) |
|||
return queue |
|||
|
|||
async def _publish(self, task_queue, data, retry=True): |
|||
try: |
|||
if self.publisher_connection is None: |
|||
await self._connect_publisher() |
|||
|
|||
await self.publisher_exchange.publish( |
|||
aio_pika.Message( |
|||
body=pickle.dumps(data), |
|||
delivery_mode=aio_pika.DeliveryMode.PERSISTENT |
|||
), |
|||
routing_key=task_queue |
|||
) |
|||
except (aio_pika.exceptions.AMQPChannelError, |
|||
aio_pika.exceptions.AMQPConnectionError): |
|||
if retry: |
|||
self._get_logger().exception('Error while publishing event, ' |
|||
'retrying in 1sec') |
|||
self.publisher_connection = None |
|||
await asyncio.sleep(1) |
|||
await self._publish(task_queue, data, retry=False) |
|||
else: |
|||
self._get_logger().exception('Error while publishing event, ' |
|||
'giving up') |
|||
self.publisher_connection = None |
|||
raise |
|||
|
|||
async def _listen(self, task_queue): |
|||
retry_sleep = 1 |
|||
while True: |
|||
try: |
|||
if self.listener_connection is None: |
|||
await self._connect_listener() |
|||
|
|||
if task_queue not in self.listener_queues: |
|||
self.listener_queues[task_queue] = \ |
|||
await self._declare_queue(task_queue) |
|||
|
|||
async with self.listener_queues[task_queue].iterator() as \ |
|||
queue_iter: |
|||
async for message in queue_iter: |
|||
with message.process(): |
|||
return pickle.loads(message.body) |
|||
|
|||
except (aio_pika.exceptions.AMQPChannelError, |
|||
aio_pika.exceptions.AMQPConnectionError): |
|||
self._get_logger().exception('Error in listener for task queue' |
|||
' %s, retrying in %ss', |
|||
task_queue, retry_sleep) |
|||
self._reset_listener() |
|||
await asyncio.sleep(retry_sleep) |
|||
retry_sleep *= 2 |
|||
if retry_sleep > 60: |
|||
retry_sleep = 60 |
@ -1,94 +0,0 @@ |
|||
import asyncio |
|||
import json |
|||
import pickle |
|||
|
|||
import six |
|||
from socketio.asyncio_manager import AsyncManager |
|||
|
|||
|
|||
class AsyncTaskQueueManager(AsyncManager): |
|||
name = 'asynctaskqueue' |
|||
|
|||
def __init__(self, channel='socketio', write_only=False, logger=None): |
|||
super().__init__() |
|||
self.channel = channel |
|||
self.write_only = write_only |
|||
self.logger = logger |
|||
self.tasks = {} |
|||
|
|||
def register_task(self, task_queue): |
|||
self.tasks[task_queue] = asyncio.create_task(self._task(task_queue)) |
|||
self._get_logger().info('Starting async listening task for %s', |
|||
task_queue) |
|||
|
|||
def cancel_task(self, task_queue): |
|||
self.tasks[task_queue].cancel() |
|||
self._get_logger().info('Canceled async listening task for %s', |
|||
task_queue) |
|||
del self.tasks[task_queue] |
|||
|
|||
async def emit(self, event, data, task_queue, namespace=None, room=None, |
|||
**kwargs): |
|||
"""Emit a message to a task queue |
|||
|
|||
Note: this method is a coroutine. |
|||
""" |
|||
await self._publish( |
|||
task_queue, |
|||
{ |
|||
'event': event, 'data': data, |
|||
'namespace': namespace or '/', 'room': room |
|||
} |
|||
) |
|||
|
|||
async def _publish(self, task_queue, data): |
|||
"""Publish a message on the Socket.IO channel. |
|||
|
|||
This method needs to be implemented by the different subclasses that |
|||
support task queue backends. |
|||
""" |
|||
raise NotImplementedError('This method must be implemented in a ' |
|||
'subclass.') # pragma: no cover |
|||
|
|||
async def _listen(self, task_queue): |
|||
"""Return the next message published on the Socket.IO channel, |
|||
blocking until a message is available. |
|||
|
|||
This method needs to be implemented by the different subclasses that |
|||
support task queue backends. |
|||
""" |
|||
raise NotImplementedError('This method must be implemented in a ' |
|||
'subclass.') # pragma: no cover |
|||
|
|||
async def _handle_emit(self, message): |
|||
await super().emit(message['event'], message['data'], |
|||
namespace=message.get('namespace'), |
|||
room=message.get('room')) |
|||
|
|||
async def _task(self, task_queue): |
|||
while True: |
|||
try: |
|||
message = await self._listen(task_queue) |
|||
except asyncio.CancelledError: |
|||
self._get_logger().debug('Task queue %s canceled', task_queue) |
|||
raise |
|||
except Exception: |
|||
self._get_logger().exception('Unexpected error in task queue ' |
|||
'listener') |
|||
break |
|||
data = None |
|||
if isinstance(message, dict): |
|||
data = message |
|||
else: |
|||
if isinstance(message, six.binary_type): # pragma: no cover |
|||
try: |
|||
data = pickle.loads(message) |
|||
except: |
|||
pass |
|||
if data is None: |
|||
try: |
|||
data = json.loads(message) |
|||
except: |
|||
pass |
|||
if data: |
|||
await self._handle_emit(data) |
@ -1 +0,0 @@ |
|||
# WIP |
@ -1 +0,0 @@ |
|||
# WIP |
Loading…
Reference in new issue