diff --git a/docs/server.rst b/docs/server.rst index 3f169e9..4b580cb 100644 --- a/docs/server.rst +++ b/docs/server.rst @@ -283,7 +283,7 @@ Class-Based Namespaces ---------------------- As an alternative to the decorator-based event handlers, the event handlers -that belong to a namespace can be created as methods of a subclass of +that belong to a namespace can be created as methods of a subclass of :class:`socketio.Namespace`:: class MyCustomNamespace(socketio.Namespace): @@ -322,7 +322,7 @@ the namespace class, then the event is ignored. All event names used in class-based namespaces must use characters that are legal in method names. As a convenience to methods defined in a class-based namespace, the namespace -instance includes versions of several of the methods in the +instance includes versions of several of the methods in the :class:`socketio.Server` and :class:`socketio.AsyncServer` classes that default to the proper namespace when the ``namespace`` argument is not given. @@ -349,7 +349,7 @@ personal room for each client is created and named with the ``sid`` assigned to the connection. The application is then free to create additional rooms and manage which clients are in them using the :func:`socketio.Server.enter_room` and :func:`socketio.Server.leave_room` methods. Clients can be in as many -rooms as needed and can be moved between rooms as often as necessary. +rooms as needed and can be moved between rooms as often as necessary. :: @@ -472,7 +472,7 @@ To use a Redis message queue, a Python Redis client must be installed:: # socketio.AsyncServer class pip install aioredis -The Redis queue is configured through the :class:`socketio.RedisManager` and +The Redis queue is configured through the :class:`socketio.RedisManager` and :class:`socketio.AsyncRedisManager` classes. These classes connect directly to the Redis store and use the queue's pub/sub functionality:: @@ -517,6 +517,60 @@ the correct URL for a given message queue. Note that Kombu currently does not support asyncio, so it cannot be used with the :class:`socketio.AsyncServer` class. +AioPika as a pubsub +~~~~~~~~~~~~~~~~~~~ + +If you want to combine a RabbitMQ based manager with asyncio and a +:class:`socketio.AsyncServer` class, you can use the +`AioPika `_ based manager. +You need to install aio_pika with pip:: + + pip install aio_pika + +The RabbitMQ queue is configured through the +:class:`socketio.AsyncPubSubAioPikaManager`:: + + mgr = socketio.AsyncPubSubAioPikaManager('amqp://') + sio = socketio.AsyncServer(client_manager=mgr) + +AioPika as task queue +~~~~~~~~~~~~~~~~~~~~~ + +Using a pubsub manager does not guarantee message delivery, in the case where a +client you want to deliver a message to is disconnected during the publication +of the event. When this client reconnects, the message was already published, +so it will miss it. +Using a task queue by client to publish this message will solve this use case. +This comes with a limitation: you have to publish the message once for each +client you want to deliver it to, which is inconvenient if you need to dispatch +messages to multiple clients at once. + +You can use RabbitMQ as a task queue with +:class:`socketio.AsyncTaskQueueAioPikaManager`:: + + mgr = socketio.AsyncTaskQueueAioPikaManager('amqp://') + sio = socketio.AsyncServer(client_manager=mgr) + +Then, you can register tasks to this manager to start listening for events to +emit to a specific client, for example:: + + @sio.event + async def connect(sid, environ): + mgr.register_task('some token to uniquely identify your client') + + @sio.event + async def disconnect(sid): + mgr.cancel_task('some token to uniquely identify your client') + +You can them emit events to this client:: + + mgr.emit('event', data='data', + task_queue='some token to uniquely identify your client', + room='room', namespace='namespace') + +The event will be emitted even if the client is currently disconnected but +reconnects later. + Emitting from external processes ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -528,7 +582,7 @@ example:: # connect to the redis queue as an external process external_sio = socketio.RedisManager('redis://', write_only=True) - + # emit an event external_sio.emit('my event', data={'foo': 'bar'}, room='my room') diff --git a/socketio/__init__.py b/socketio/__init__.py index b5849b8..2eb419d 100644 --- a/socketio/__init__.py +++ b/socketio/__init__.py @@ -16,6 +16,9 @@ if sys.version_info >= (3, 5): # pragma: no cover from .asyncio_manager import AsyncManager from .asyncio_namespace import AsyncNamespace, AsyncClientNamespace from .asyncio_redis_manager import AsyncRedisManager + from .asyncio_pubsub_aiopika_manager import AsyncPubSubAioPikaManager + from .asyncio_taskqueue_aiopika_manager \ + import AsyncTaskQueueAioPikaManager from .asgi import ASGIApp else: # pragma: no cover AsyncClient = None @@ -23,6 +26,8 @@ else: # pragma: no cover AsyncManager = None AsyncNamespace = None AsyncRedisManager = None + AsyncPubSubAioPikaManager = None + AsyncTaskQueueAioPikaManager = None __version__ = '4.2.1dev' @@ -32,4 +37,5 @@ __all__ = ['__version__', 'Client', 'Server', 'BaseManager', 'PubSubManager', if AsyncServer is not None: # pragma: no cover __all__ += ['AsyncClient', 'AsyncServer', 'AsyncNamespace', 'AsyncClientNamespace', 'AsyncManager', 'AsyncRedisManager', - 'ASGIApp', 'get_tornado_handler'] + 'ASGIApp', 'get_tornado_handler', 'AsyncPubSubAioPikaManager', + 'AsyncTaskQueueAioPikaManager'] diff --git a/socketio/asyncio_pubsub_aiopika_manager.py b/socketio/asyncio_pubsub_aiopika_manager.py new file mode 100644 index 0000000..a7c68c5 --- /dev/null +++ b/socketio/asyncio_pubsub_aiopika_manager.py @@ -0,0 +1,107 @@ +import asyncio +import pickle + +from socketio.asyncio_pubsub_manager import AsyncPubSubManager + +try: + import aio_pika +except ImportError: + aio_pika = None + + +class AsyncPubSubAioPikaManager(AsyncPubSubManager): + """Client manager that uses aio_pika for inter-process messaging under + asyncio. + + This class implements a client manager backend for event sharing across + multiple processes, using RabbitMQ + + To use a aio_pika backend, initialize the :class:`Server` instance as + follows:: + + url = 'amqp://user:password@hostname:port//' + server = socketio.Server(client_manager=socketio.AsyncPikaManager(url)) + + :param url: The connection URL for the backend messaging queue. Example + connection URLs are ``'amqp://guest:guest@localhost:5672//'`` + for RabbitMQ. + :param channel: The channel name on which the server sends and receives + notifications. Must be the same in all the servers. + With this manager, the channel name is the exchange name + in rabbitmq + :param write_only: If set ot ``True``, only initialize to emit events. The + default of ``False`` initializes the class for emitting + and receiving. + """ + + name = 'asyncpubsubaiopika' + + def __init__(self, url='amqp://guest:guest@localhost:5672//', + channel='socketio', write_only=False, logger=None): + if aio_pika is None: + raise RuntimeError('aio_pika package is not installed ' + '(Run "pip install aio_pika" in your ' + 'virtualenv).') + self.url = url + self.listener_connection = None + self.listener_channel = None + self.listener_queue = None + super().__init__(channel=channel, write_only=write_only, logger=logger) + + async def _connection(self): + return await aio_pika.connect_robust(self.url) + + async def _channel(self, connection): + return await connection.channel() + + async def _exchange(self, channel): + return await channel.declare_exchange(self.channel, + aio_pika.ExchangeType.FANOUT) + + async def _queue(self, channel, exchange): + queue = await channel.declare_queue(exclusive=True) + await queue.bind(exchange) + return queue + + async def _publish(self, data): + connection = await self._connection() + channel = await self._channel(connection) + exchange = await self._exchange(channel) + await exchange.publish( + aio_pika.Message(body=pickle.dumps(data), + delivery_mode=aio_pika.DeliveryMode.PERSISTENT), + routing_key='*' + ) + self._get_logger().info('published %s', data) + + async def _listen(self): + retry_sleep = 1 + while True: + try: + self._get_logger().info('Starting listener') + if self.listener_connection is None: + self.listener_connection = await self._connection() + self.listener_channel = await self._channel( + self.listener_connection + ) + await self.listener_channel.set_qos(prefetch_count=1) + exchange = await self._exchange(self.listener_channel) + self.listener_queue = await self._queue( + self.listener_channel, exchange + ) + + async with self.listener_queue.iterator() as queue_iter: + async for message in queue_iter: + message_body = pickle.loads(message.body) + self._get_logger().info('received message %s', + message_body) + return message_body + except Exception: + self._get_logger().error('Cannot receive from rabbitmq... ' + 'retrying in ' + '{} secs'.format(retry_sleep)) + self.listener_connection = None + await asyncio.sleep(retry_sleep) + retry_sleep *= 2 + if retry_sleep > 60: + retry_sleep = 60 diff --git a/socketio/asyncio_taskqueue_aiopika_manager.py b/socketio/asyncio_taskqueue_aiopika_manager.py new file mode 100644 index 0000000..f4b36dc --- /dev/null +++ b/socketio/asyncio_taskqueue_aiopika_manager.py @@ -0,0 +1,106 @@ +import asyncio +import pickle + +from .asyncio_taskqueue_manager import AsyncTaskQueueManager + +try: + import aio_pika +except ImportError: + aio_pika = None + + +class AsyncTaskQueueAioPikaManager(AsyncTaskQueueManager): + name = 'asynctaskqueueaiopika' + + def __init__(self, url='amqp://guest:guest@localhost:5672//', + channel='socketio', write_only=False, logger=None): + if aio_pika is None: + raise RuntimeError('aio_pika package is not installed ' + '(Run "pip install aio_pika" in your ' + 'virtualenv).') + self.url = url + self.publisher_connection = None + self.publisher_channel = None + self.publisher_exchange = None + self.listener_connection = None + self.listener_channel = None + self.listener_exchange = None + self.listener_queues = {} + super().__init__(channel=channel, write_only=write_only, logger=logger) + + async def _connect_publisher(self): + self.publisher_connection = await aio_pika.connect_robust(self.url) + self.publisher_channel = await self.publisher_connection.channel() + self.publisher_exchange = \ + await self.publisher_channel.declare_exchange( + self.channel, aio_pika.ExchangeType.DIRECT) + + async def _connect_listener(self): + self.listener_connection = await aio_pika.connect_robust(self.url) + self.listener_channel = await self.listener_connection.channel() + await self.listener_channel.set_qos(prefetch_count=1) + self.listener_exchange = await self.listener_channel.declare_exchange( + self.channel, aio_pika.ExchangeType.DIRECT) + + def _reset_listener(self): + self.listener_connection = None + self.listener_queues = {} + + async def _declare_queue(self, task_queue): + queue = await self.listener_channel.declare_queue(durable=True) + await queue.bind(self.listener_exchange, routing_key=task_queue) + return queue + + async def _publish(self, task_queue, data, retry=True): + try: + if self.publisher_connection is None: + await self._connect_publisher() + + await self.publisher_exchange.publish( + aio_pika.Message( + body=pickle.dumps(data), + delivery_mode=aio_pika.DeliveryMode.PERSISTENT + ), + routing_key=task_queue + ) + except (aio_pika.exceptions.AMQPChannelError, + aio_pika.exceptions.AMQPConnectionError): + if retry: + self._get_logger().exception('Error while publishing event, ' + 'retrying in 1sec') + self.publisher_connection = None + await asyncio.sleep(1) + await self._publish(task_queue, data, retry=False) + else: + self._get_logger().exception('Error while publishing event, ' + 'giving up') + self.publisher_connection = None + raise + + async def _listen(self, task_queue): + retry_sleep = 1 + while True: + try: + if self.listener_connection is None: + await self._connect_listener() + + if task_queue not in self.listener_queues: + self.listener_queues[task_queue] = \ + await self._declare_queue(task_queue) + + async with self.listener_queues[task_queue].iterator() as \ + queue_iter: + async for message in queue_iter: + with message.process(): + return pickle.loads(message.body) + + except (aio_pika.exceptions.AMQPChannelError, + aio_pika.exceptions.AMQPConnectionError): + self._get_logger().exception('Error in listener for task queue' + ' %s, retrying in %ss', + task_queue, retry_sleep) + self._reset_listener() + await asyncio.sleep(retry_sleep) + retry_sleep *= 2 + if retry_sleep > 60: + retry_sleep = 60 diff --git a/socketio/asyncio_taskqueue_manager.py b/socketio/asyncio_taskqueue_manager.py new file mode 100644 index 0000000..df73c68 --- /dev/null +++ b/socketio/asyncio_taskqueue_manager.py @@ -0,0 +1,94 @@ +import asyncio +import json +import pickle + +import six +from socketio.asyncio_manager import AsyncManager + + +class AsyncTaskQueueManager(AsyncManager): + name = 'asynctaskqueue' + + def __init__(self, channel='socketio', write_only=False, logger=None): + super().__init__() + self.channel = channel + self.write_only = write_only + self.logger = logger + self.tasks = {} + + def register_task(self, task_queue): + self.tasks[task_queue] = asyncio.create_task(self._task(task_queue)) + self._get_logger().info('Starting async listening task for %s', + task_queue) + + def cancel_task(self, task_queue): + self.tasks[task_queue].cancel() + self._get_logger().info('Canceled async listening task for %s', + task_queue) + del self.tasks[task_queue] + + async def emit(self, event, data, task_queue, namespace=None, room=None, + **kwargs): + """Emit a message to a task queue + + Note: this method is a coroutine. + """ + await self._publish( + task_queue, + { + 'event': event, 'data': data, + 'namespace': namespace or '/', 'room': room + } + ) + + async def _publish(self, task_queue, data): + """Publish a message on the Socket.IO channel. + + This method needs to be implemented by the different subclasses that + support task queue backends. + """ + raise NotImplementedError('This method must be implemented in a ' + 'subclass.') # pragma: no cover + + async def _listen(self, task_queue): + """Return the next message published on the Socket.IO channel, + blocking until a message is available. + + This method needs to be implemented by the different subclasses that + support task queue backends. + """ + raise NotImplementedError('This method must be implemented in a ' + 'subclass.') # pragma: no cover + + async def _handle_emit(self, message): + await super().emit(message['event'], message['data'], + namespace=message.get('namespace'), + room=message.get('room')) + + async def _task(self, task_queue): + while True: + try: + message = await self._listen(task_queue) + except asyncio.CancelledError: + self._get_logger().debug('Task queue %s canceled', task_queue) + raise + except Exception: + self._get_logger().exception('Unexpected error in task queue ' + 'listener') + break + data = None + if isinstance(message, dict): + data = message + else: + if isinstance(message, six.binary_type): # pragma: no cover + try: + data = pickle.loads(message) + except: + pass + if data is None: + try: + data = json.loads(message) + except: + pass + if data: + await self._handle_emit(data) diff --git a/tests/asyncio/test_asyncio_pubsub_aiopika_manager.py b/tests/asyncio/test_asyncio_pubsub_aiopika_manager.py new file mode 100644 index 0000000..85e6ff1 --- /dev/null +++ b/tests/asyncio/test_asyncio_pubsub_aiopika_manager.py @@ -0,0 +1 @@ +# WIP diff --git a/tests/asyncio/test_asyncio_taskqueue_aiopika_manager.py b/tests/asyncio/test_asyncio_taskqueue_aiopika_manager.py new file mode 100644 index 0000000..85e6ff1 --- /dev/null +++ b/tests/asyncio/test_asyncio_taskqueue_aiopika_manager.py @@ -0,0 +1 @@ +# WIP diff --git a/tests/asyncio/test_asyncio_taskqueue_manager.py b/tests/asyncio/test_asyncio_taskqueue_manager.py new file mode 100644 index 0000000..85e6ff1 --- /dev/null +++ b/tests/asyncio/test_asyncio_taskqueue_manager.py @@ -0,0 +1 @@ +# WIP