diff --git a/docs/server.rst b/docs/server.rst
index 4b580cb..21d5019 100644
--- a/docs/server.rst
+++ b/docs/server.rst
@@ -517,8 +517,8 @@ the correct URL for a given message queue.
 Note that Kombu currently does not support asyncio, so it cannot be used with
 the :class:`socketio.AsyncServer` class.
 
-AioPika as a pubsub
-~~~~~~~~~~~~~~~~~~~
+AioPika
+~~~~~~~
 
 If you want to combine a RabbitMQ based manager with asyncio and a
 :class:`socketio.AsyncServer` class, you can use the
@@ -533,44 +533,6 @@ The RabbitMQ queue is configured through the
-    mgr = socketio.AsyncPubSubAioPikaManager('amqp://')
+    mgr = socketio.AsyncAioPikaManager('amqp://')
     sio = socketio.AsyncServer(client_manager=mgr)
 
-AioPika as task queue
-~~~~~~~~~~~~~~~~~~~~~
-
-Using a pubsub manager does not guarantee message delivery, in the case where a
-client you want to deliver a message to is disconnected during the publication
-of the event. When this client reconnects, the message was already published,
-so it will miss it.
-Using a task queue by client to publish this message will solve this use case.
-This comes with a limitation: you have to publish the message once for each
-client you want to deliver it to, which is inconvenient if you need to dispatch
-messages to multiple clients at once.
-
-You can use RabbitMQ as a task queue with
-:class:`socketio.AsyncTaskQueueAioPikaManager`::
-
-    mgr = socketio.AsyncTaskQueueAioPikaManager('amqp://')
-    sio = socketio.AsyncServer(client_manager=mgr)
-
-Then, you can register tasks to this manager to start listening for events to
-emit to a specific client, for example::
-
-    @sio.event
-    async def connect(sid, environ):
-        mgr.register_task('some token to uniquely identify your client')
-
-    @sio.event
-    async def disconnect(sid):
-        mgr.cancel_task('some token to uniquely identify your client')
-
-You can them emit events to this client::
-
-    mgr.emit('event', data='data',
-             task_queue='some token to uniquely identify your client',
-             room='room', namespace='namespace')
-
-The event will be emitted even if the client is currently disconnected but
-reconnects later.
-
 Emitting from external processes
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
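A minimal sketch of the pattern the retained docs describe, assuming the renamed :class:`socketio.AsyncAioPikaManager` introduced by this patch and a RabbitMQ broker on localhost; the event name, payload and room are illustrative only::

    import asyncio
    import socketio

    async def main():
        # write_only=True: this auxiliary process only publishes events and
        # does not accept Socket.IO connections itself.
        external_sio = socketio.AsyncAioPikaManager('amqp://', write_only=True)
        await external_sio.emit('my_event', data={'foo': 'bar'},
                                room='my_room', namespace='/')

    asyncio.run(main())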
diff --git a/socketio/__init__.py b/socketio/__init__.py
index 2eb419d..cd560a1 100644
--- a/socketio/__init__.py
+++ b/socketio/__init__.py
@@ -16,9 +16,7 @@ if sys.version_info >= (3, 5):  # pragma: no cover
     from .asyncio_manager import AsyncManager
     from .asyncio_namespace import AsyncNamespace, AsyncClientNamespace
     from .asyncio_redis_manager import AsyncRedisManager
-    from .asyncio_pubsub_aiopika_manager import AsyncPubSubAioPikaManager
-    from .asyncio_taskqueue_aiopika_manager \
-        import AsyncTaskQueueAioPikaManager
+    from .asyncio_aiopika_manager import AsyncAioPikaManager
     from .asgi import ASGIApp
 else:  # pragma: no cover
     AsyncClient = None
@@ -26,8 +24,7 @@ else:  # pragma: no cover
     AsyncManager = None
     AsyncNamespace = None
     AsyncRedisManager = None
-    AsyncPubSubAioPikaManager = None
-    AsyncTaskQueueAioPikaManager = None
+    AsyncAioPikaManager = None
 
 __version__ = '4.2.1dev'
 
@@ -37,5 +34,4 @@ __all__ = ['__version__', 'Client', 'Server', 'BaseManager', 'PubSubManager',
 if AsyncServer is not None:  # pragma: no cover
     __all__ += ['AsyncClient', 'AsyncServer', 'AsyncNamespace',
                 'AsyncClientNamespace', 'AsyncManager', 'AsyncRedisManager',
-                'ASGIApp', 'get_tornado_handler', 'AsyncPubSubAioPikaManager',
-                'AsyncTaskQueueAioPikaManager']
+                'ASGIApp', 'get_tornado_handler', 'AsyncAioPikaManager']
diff --git a/socketio/asyncio_pubsub_aiopika_manager.py b/socketio/asyncio_aiopika_manager.py
similarity index 98%
rename from socketio/asyncio_pubsub_aiopika_manager.py
rename to socketio/asyncio_aiopika_manager.py
index a7c68c5..7c9e43e 100644
--- a/socketio/asyncio_pubsub_aiopika_manager.py
+++ b/socketio/asyncio_aiopika_manager.py
@@ -9,7 +9,7 @@ except ImportError:
     aio_pika = None
 
 
-class AsyncPubSubAioPikaManager(AsyncPubSubManager):
+class AsyncAioPikaManager(AsyncPubSubManager):
     """Client manager that uses aio_pika for inter-process messaging under
     asyncio.
 
@@ -34,7 +34,7 @@ class AsyncPubSubAioPikaManager(AsyncPubSubManager):
                        and receiving.
     """
 
-    name = 'asyncpubsubaiopika'
+    name = 'asyncaiopika'
 
     def __init__(self, url='amqp://guest:guest@localhost:5672//',
                  channel='socketio', write_only=False, logger=None):
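For reference, a sketch of the server-side wiring with the renamed class, assuming the default RabbitMQ URL shown in the signature above; servers that should exchange events need to share the same ``channel``::

    import socketio

    # All servers publishing to the same channel form one Socket.IO cluster.
    mgr = socketio.AsyncAioPikaManager('amqp://guest:guest@localhost:5672//',
                                       channel='socketio')
    sio = socketio.AsyncServer(client_manager=mgr)
    app = socketio.ASGIApp(sio)  # run with any ASGI server, e.g. uvicorn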
""" - name = 'asyncpubsubaiopika' + name = 'asyncaiopika' def __init__(self, url='amqp://guest:guest@localhost:5672//', channel='socketio', write_only=False, logger=None): diff --git a/socketio/asyncio_taskqueue_aiopika_manager.py b/socketio/asyncio_taskqueue_aiopika_manager.py deleted file mode 100644 index f4b36dc..0000000 --- a/socketio/asyncio_taskqueue_aiopika_manager.py +++ /dev/null @@ -1,106 +0,0 @@ -import asyncio -import pickle - -from .asyncio_taskqueue_manager import AsyncTaskQueueManager - -try: - import aio_pika -except ImportError: - aio_pika = None - - -class AsyncTaskQueueAioPikaManager(AsyncTaskQueueManager): - name = 'asynctaskqueueaiopika' - - def __init__(self, url='amqp://guest:guest@localhost:5672//', - channel='socketio', write_only=False, logger=None): - if aio_pika is None: - raise RuntimeError('aio_pika package is not installed ' - '(Run "pip install aio_pika" in your ' - 'virtualenv).') - self.url = url - self.publisher_connection = None - self.publisher_channel = None - self.publisher_exchange = None - self.listener_connection = None - self.listener_channel = None - self.listener_exchange = None - self.listener_queues = {} - super().__init__(channel=channel, write_only=write_only, logger=logger) - - async def _connect_publisher(self): - self.publisher_connection = await aio_pika.connect_robust(self.url) - self.publisher_channel = await self.publisher_connection.channel() - self.publisher_exchange = \ - await self.publisher_channel.declare_exchange( - self.channel, aio_pika.ExchangeType.DIRECT) - - async def _connect_listener(self): - self.listener_connection = await aio_pika.connect_robust(self.url) - self.listener_channel = await self.listener_connection.channel() - await self.listener_channel.set_qos(prefetch_count=1) - self.listener_exchange = await self.listener_channel.declare_exchange( - self.channel, aio_pika.ExchangeType.DIRECT) - - def _reset_listener(self): - self.listener_connection = None - self.listener_queues = {} - - async def _declare_queue(self, task_queue): - queue = await self.listener_channel.declare_queue(durable=True) - await queue.bind(self.listener_exchange, routing_key=task_queue) - return queue - - async def _publish(self, task_queue, data, retry=True): - try: - if self.publisher_connection is None: - await self._connect_publisher() - - await self.publisher_exchange.publish( - aio_pika.Message( - body=pickle.dumps(data), - delivery_mode=aio_pika.DeliveryMode.PERSISTENT - ), - routing_key=task_queue - ) - except (aio_pika.exceptions.AMQPChannelError, - aio_pika.exceptions.AMQPConnectionError): - if retry: - self._get_logger().exception('Error while publishing event, ' - 'retrying in 1sec') - self.publisher_connection = None - await asyncio.sleep(1) - await self._publish(task_queue, data, retry=False) - else: - self._get_logger().exception('Error while publishing event, ' - 'giving up') - self.publisher_connection = None - raise - - async def _listen(self, task_queue): - retry_sleep = 1 - while True: - try: - if self.listener_connection is None: - await self._connect_listener() - - if task_queue not in self.listener_queues: - self.listener_queues[task_queue] = \ - await self._declare_queue(task_queue) - - async with self.listener_queues[task_queue].iterator() as \ - queue_iter: - async for message in queue_iter: - with message.process(): - return pickle.loads(message.body) - - except (aio_pika.exceptions.AMQPChannelError, - aio_pika.exceptions.AMQPConnectionError): - self._get_logger().exception('Error in listener for task queue' - ' %s, 
diff --git a/socketio/asyncio_taskqueue_manager.py b/socketio/asyncio_taskqueue_manager.py
deleted file mode 100644
index df73c68..0000000
--- a/socketio/asyncio_taskqueue_manager.py
+++ /dev/null
@@ -1,94 +0,0 @@
-import asyncio
-import json
-import pickle
-
-import six
-from socketio.asyncio_manager import AsyncManager
-
-
-class AsyncTaskQueueManager(AsyncManager):
-    name = 'asynctaskqueue'
-
-    def __init__(self, channel='socketio', write_only=False, logger=None):
-        super().__init__()
-        self.channel = channel
-        self.write_only = write_only
-        self.logger = logger
-        self.tasks = {}
-
-    def register_task(self, task_queue):
-        self.tasks[task_queue] = asyncio.create_task(self._task(task_queue))
-        self._get_logger().info('Starting async listening task for %s',
-                                task_queue)
-
-    def cancel_task(self, task_queue):
-        self.tasks[task_queue].cancel()
-        self._get_logger().info('Canceled async listening task for %s',
-                                task_queue)
-        del self.tasks[task_queue]
-
-    async def emit(self, event, data, task_queue, namespace=None, room=None,
-                   **kwargs):
-        """Emit a message to a task queue
-
-        Note: this method is a coroutine.
-        """
-        await self._publish(
-            task_queue,
-            {
-                'event': event, 'data': data,
-                'namespace': namespace or '/', 'room': room
-            }
-        )
-
-    async def _publish(self, task_queue, data):
-        """Publish a message on the Socket.IO channel.
-
-        This method needs to be implemented by the different subclasses that
-        support task queue backends.
-        """
-        raise NotImplementedError('This method must be implemented in a '
-                                  'subclass.')  # pragma: no cover
-
-    async def _listen(self, task_queue):
-        """Return the next message published on the Socket.IO channel,
-        blocking until a message is available.
-
-        This method needs to be implemented by the different subclasses that
-        support task queue backends.
-        """
-        raise NotImplementedError('This method must be implemented in a '
-                                  'subclass.')  # pragma: no cover
-
-    async def _handle_emit(self, message):
-        await super().emit(message['event'], message['data'],
-                           namespace=message.get('namespace'),
-                           room=message.get('room'))
-
-    async def _task(self, task_queue):
-        while True:
-            try:
-                message = await self._listen(task_queue)
-            except asyncio.CancelledError:
-                self._get_logger().debug('Task queue %s canceled', task_queue)
-                raise
-            except Exception:
-                self._get_logger().exception('Unexpected error in task queue '
-                                             'listener')
-                break
-            data = None
-            if isinstance(message, dict):
-                data = message
-            else:
-                if isinstance(message, six.binary_type):  # pragma: no cover
-                    try:
-                        data = pickle.loads(message)
-                    except:
-                        pass
-                if data is None:
-                    try:
-                        data = json.loads(message)
-                    except:
-                        pass
-            if data:
-                await self._handle_emit(data)
- """ - raise NotImplementedError('This method must be implemented in a ' - 'subclass.') # pragma: no cover - - async def _handle_emit(self, message): - await super().emit(message['event'], message['data'], - namespace=message.get('namespace'), - room=message.get('room')) - - async def _task(self, task_queue): - while True: - try: - message = await self._listen(task_queue) - except asyncio.CancelledError: - self._get_logger().debug('Task queue %s canceled', task_queue) - raise - except Exception: - self._get_logger().exception('Unexpected error in task queue ' - 'listener') - break - data = None - if isinstance(message, dict): - data = message - else: - if isinstance(message, six.binary_type): # pragma: no cover - try: - data = pickle.loads(message) - except: - pass - if data is None: - try: - data = json.loads(message) - except: - pass - if data: - await self._handle_emit(data) diff --git a/tests/asyncio/test_asyncio_pubsub_aiopika_manager.py b/tests/asyncio/test_asyncio_aiopika_manager.py similarity index 100% rename from tests/asyncio/test_asyncio_pubsub_aiopika_manager.py rename to tests/asyncio/test_asyncio_aiopika_manager.py diff --git a/tests/asyncio/test_asyncio_taskqueue_aiopika_manager.py b/tests/asyncio/test_asyncio_taskqueue_aiopika_manager.py deleted file mode 100644 index 85e6ff1..0000000 --- a/tests/asyncio/test_asyncio_taskqueue_aiopika_manager.py +++ /dev/null @@ -1 +0,0 @@ -# WIP diff --git a/tests/asyncio/test_asyncio_taskqueue_manager.py b/tests/asyncio/test_asyncio_taskqueue_manager.py deleted file mode 100644 index 85e6ff1..0000000 --- a/tests/asyncio/test_asyncio_taskqueue_manager.py +++ /dev/null @@ -1 +0,0 @@ -# WIP