8 changed files with 376 additions and 6 deletions
@@ -0,0 +1,107 @@
import asyncio
import pickle

from socketio.asyncio_pubsub_manager import AsyncPubSubManager

try:
    import aio_pika
except ImportError:
    aio_pika = None


class AsyncPubSubAioPikaManager(AsyncPubSubManager):
    """Client manager that uses aio_pika for inter-process messaging under
    asyncio.

    This class implements a client manager backend for event sharing across
    multiple processes, using RabbitMQ.

    To use an aio_pika backend, initialize the :class:`AsyncServer` instance
    as follows::

        url = 'amqp://user:password@hostname:port//'
        server = socketio.AsyncServer(
            client_manager=socketio.AsyncPubSubAioPikaManager(url))

    :param url: The connection URL for the backend messaging queue. Example
                connection URLs are ``'amqp://guest:guest@localhost:5672//'``
                for RabbitMQ.
    :param channel: The channel name on which the server sends and receives
                    notifications. Must be the same in all the servers. With
                    this manager, the channel name is the exchange name in
                    RabbitMQ.
    :param write_only: If set to ``True``, only initialize to emit events. The
                       default of ``False`` initializes the class for emitting
                       and receiving.
    """

    name = 'asyncpubsubaiopika'

    def __init__(self, url='amqp://guest:guest@localhost:5672//',
                 channel='socketio', write_only=False, logger=None):
        if aio_pika is None:
            raise RuntimeError('aio_pika package is not installed '
                               '(Run "pip install aio_pika" in your '
                               'virtualenv).')
        self.url = url
        self.listener_connection = None
        self.listener_channel = None
        self.listener_queue = None
        super().__init__(channel=channel, write_only=write_only, logger=logger)

    async def _connection(self):
        return await aio_pika.connect_robust(self.url)

    async def _channel(self, connection):
        return await connection.channel()

    async def _exchange(self, channel):
        return await channel.declare_exchange(self.channel,
                                              aio_pika.ExchangeType.FANOUT)

    async def _queue(self, channel, exchange):
        queue = await channel.declare_queue(exclusive=True)
        await queue.bind(exchange)
        return queue

    async def _publish(self, data):
        connection = await self._connection()
        channel = await self._channel(connection)
        exchange = await self._exchange(channel)
        await exchange.publish(
            aio_pika.Message(body=pickle.dumps(data),
                             delivery_mode=aio_pika.DeliveryMode.PERSISTENT),
            routing_key='*'
        )
        self._get_logger().info('published %s', data)

    async def _listen(self):
        retry_sleep = 1
        while True:
            try:
                self._get_logger().info('Starting listener')
                if self.listener_connection is None:
                    self.listener_connection = await self._connection()
                    self.listener_channel = await self._channel(
                        self.listener_connection
                    )
                    await self.listener_channel.set_qos(prefetch_count=1)
                    exchange = await self._exchange(self.listener_channel)
                    self.listener_queue = await self._queue(
                        self.listener_channel, exchange
                    )

                async with self.listener_queue.iterator() as queue_iter:
                    async for message in queue_iter:
                        message_body = pickle.loads(message.body)
                        self._get_logger().info('received message %s',
                                                message_body)
                        return message_body
            except Exception:
                self._get_logger().error('Cannot receive from rabbitmq... '
                                         'retrying in '
                                         '{} secs'.format(retry_sleep))
                self.listener_connection = None
                await asyncio.sleep(retry_sleep)
                retry_sleep *= 2
                if retry_sleep > 60:
                    retry_sleep = 60
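For reviewers who want to try the new manager, a minimal usage sketch follows. It assumes a RabbitMQ broker on localhost; the import path and event names are illustrative only, since the final module location is not fixed by this diff::

    import socketio

    # Assumption: the class will be importable from this module once merged.
    from socketio.asyncio_aiopika_manager import AsyncPubSubAioPikaManager

    # Every server process pointed at the same URL and channel shares
    # events through one fanout exchange.
    mgr = AsyncPubSubAioPikaManager('amqp://guest:guest@localhost:5672//')
    sio = socketio.AsyncServer(client_manager=mgr)

    @sio.on('broadcast')
    async def broadcast(sid, data):
        # Reaches clients connected to any of the cooperating processes.
        await sio.emit('notification', data)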
@@ -0,0 +1,106 @@
import asyncio
import pickle

from .asyncio_taskqueue_manager import AsyncTaskQueueManager

try:
    import aio_pika
except ImportError:
    aio_pika = None


class AsyncTaskQueueAioPikaManager(AsyncTaskQueueManager):
    name = 'asynctaskqueueaiopika'

    def __init__(self, url='amqp://guest:guest@localhost:5672//',
                 channel='socketio', write_only=False, logger=None):
        if aio_pika is None:
            raise RuntimeError('aio_pika package is not installed '
                               '(Run "pip install aio_pika" in your '
                               'virtualenv).')
        self.url = url
        self.publisher_connection = None
        self.publisher_channel = None
        self.publisher_exchange = None
        self.listener_connection = None
        self.listener_channel = None
        self.listener_exchange = None
        self.listener_queues = {}
        super().__init__(channel=channel, write_only=write_only, logger=logger)

    async def _connect_publisher(self):
        self.publisher_connection = await aio_pika.connect_robust(self.url)
        self.publisher_channel = await self.publisher_connection.channel()
        self.publisher_exchange = \
            await self.publisher_channel.declare_exchange(
                self.channel, aio_pika.ExchangeType.DIRECT)

    async def _connect_listener(self):
        self.listener_connection = await aio_pika.connect_robust(self.url)
        self.listener_channel = await self.listener_connection.channel()
        await self.listener_channel.set_qos(prefetch_count=1)
        self.listener_exchange = await self.listener_channel.declare_exchange(
            self.channel, aio_pika.ExchangeType.DIRECT)

    def _reset_listener(self):
        self.listener_connection = None
        self.listener_queues = {}

    async def _declare_queue(self, task_queue):
        queue = await self.listener_channel.declare_queue(durable=True)
        await queue.bind(self.listener_exchange, routing_key=task_queue)
        return queue

    async def _publish(self, task_queue, data, retry=True):
        try:
            if self.publisher_connection is None:
                await self._connect_publisher()

            await self.publisher_exchange.publish(
                aio_pika.Message(
                    body=pickle.dumps(data),
                    delivery_mode=aio_pika.DeliveryMode.PERSISTENT
                ),
                routing_key=task_queue
            )
        except (aio_pika.exceptions.AMQPChannelError,
                aio_pika.exceptions.AMQPConnectionError):
            if retry:
                self._get_logger().exception('Error while publishing event, '
                                             'retrying in 1sec')
                self.publisher_connection = None
                await asyncio.sleep(1)
                await self._publish(task_queue, data, retry=False)
            else:
                self._get_logger().exception('Error while publishing event, '
                                             'giving up')
                self.publisher_connection = None
                raise

    async def _listen(self, task_queue):
        retry_sleep = 1
        while True:
            try:
                if self.listener_connection is None:
                    await self._connect_listener()

                if task_queue not in self.listener_queues:
                    self.listener_queues[task_queue] = \
                        await self._declare_queue(task_queue)

                async with self.listener_queues[task_queue].iterator() as \
                        queue_iter:
                    async for message in queue_iter:
                        with message.process():
                            return pickle.loads(message.body)

            except (aio_pika.exceptions.AMQPChannelError,
                    aio_pika.exceptions.AMQPConnectionError):
                self._get_logger().exception('Error in listener for task queue'
                                             ' %s, retrying in %ss',
                                             task_queue, retry_sleep)
                self._reset_listener()
                await asyncio.sleep(retry_sleep)
                retry_sleep *= 2
                if retry_sleep > 60:
                    retry_sleep = 60
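A quick sketch of driving this manager from the publishing side. The queue name and payload are hypothetical, and the import path again assumes the module name used in this diff::

    import asyncio

    # Assumption: import path mirrors the file added in this PR.
    from socketio.asyncio_taskqueue_aiopika_manager import \
        AsyncTaskQueueAioPikaManager

    async def main():
        # write_only=True: this process only publishes work; a worker
        # process would call register_task('workers') to consume it.
        mgr = AsyncTaskQueueAioPikaManager(write_only=True)
        await mgr.emit('process_upload', {'file_id': 42},
                       task_queue='workers')

    asyncio.run(main())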
@@ -0,0 +1,94 @@
import asyncio
import json
import pickle

import six

from socketio.asyncio_manager import AsyncManager


class AsyncTaskQueueManager(AsyncManager):
    name = 'asynctaskqueue'

    def __init__(self, channel='socketio', write_only=False, logger=None):
        super().__init__()
        self.channel = channel
        self.write_only = write_only
        self.logger = logger
        self.tasks = {}

    def register_task(self, task_queue):
        self.tasks[task_queue] = asyncio.create_task(self._task(task_queue))
        self._get_logger().info('Starting async listening task for %s',
                                task_queue)

    def cancel_task(self, task_queue):
        self.tasks[task_queue].cancel()
        self._get_logger().info('Canceled async listening task for %s',
                                task_queue)
        del self.tasks[task_queue]

    async def emit(self, event, data, task_queue, namespace=None, room=None,
                   **kwargs):
        """Emit a message to a task queue.

        Note: this method is a coroutine.
        """
        await self._publish(
            task_queue,
            {
                'event': event, 'data': data,
                'namespace': namespace or '/', 'room': room
            }
        )

    async def _publish(self, task_queue, data):
        """Publish a message on the given task queue.

        This method needs to be implemented by the different subclasses that
        support task queue backends.
        """
        raise NotImplementedError('This method must be implemented in a '
                                  'subclass.')  # pragma: no cover

    async def _listen(self, task_queue):
        """Return the next message published on the given task queue,
        blocking until a message is available.

        This method needs to be implemented by the different subclasses that
        support task queue backends.
        """
        raise NotImplementedError('This method must be implemented in a '
                                  'subclass.')  # pragma: no cover

    async def _handle_emit(self, message):
        await super().emit(message['event'], message['data'],
                           namespace=message.get('namespace'),
                           room=message.get('room'))

    async def _task(self, task_queue):
        while True:
            try:
                message = await self._listen(task_queue)
            except asyncio.CancelledError:
                self._get_logger().debug('Task queue %s canceled', task_queue)
                raise
            except Exception:
                self._get_logger().exception('Unexpected error in task queue '
                                             'listener')
                break
            data = None
            if isinstance(message, dict):
                data = message
            else:
                if isinstance(message, six.binary_type):  # pragma: no cover
                    try:
                        data = pickle.loads(message)
                    except:
                        pass
                if data is None:
                    try:
                        data = json.loads(message)
                    except:
                        pass
            if data:
                await self._handle_emit(data)
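To make the ``_publish``/``_listen`` contract concrete, here is a toy in-memory backend that satisfies it with plain asyncio.Queue objects. It is not part of this PR, purely an illustrative sketch for tests or local demos::

    import asyncio
    from collections import defaultdict

    class InMemoryTaskQueueManager(AsyncTaskQueueManager):
        # Toy backend: each task queue is a local asyncio.Queue, so
        # messages never leave the process. Hypothetical class, for
        # illustration only.
        name = 'asynctaskqueueinmemory'

        def __init__(self, **kwargs):
            super().__init__(**kwargs)
            self._queues = defaultdict(asyncio.Queue)

        async def _publish(self, task_queue, data):
            await self._queues[task_queue].put(data)

        async def _listen(self, task_queue):
            return await self._queues[task_queue].get()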
@@ -0,0 +1 @@
# WIP
@@ -0,0 +1 @@
# WIP
@@ -0,0 +1 @@
# WIP