diff --git a/docs/server.rst b/docs/server.rst index f5d8b9f..bd4ee5e 100644 --- a/docs/server.rst +++ b/docs/server.rst @@ -283,7 +283,7 @@ Class-Based Namespaces ---------------------- As an alternative to the decorator-based event handlers, the event handlers -that belong to a namespace can be created as methods of a subclass of +that belong to a namespace can be created as methods of a subclass of :class:`socketio.Namespace`:: class MyCustomNamespace(socketio.Namespace): @@ -322,7 +322,7 @@ the namespace class, then the event is ignored. All event names used in class-based namespaces must use characters that are legal in method names. As a convenience to methods defined in a class-based namespace, the namespace -instance includes versions of several of the methods in the +instance includes versions of several of the methods in the :class:`socketio.Server` and :class:`socketio.AsyncServer` classes that default to the proper namespace when the ``namespace`` argument is not given. @@ -349,7 +349,7 @@ personal room for each client is created and named with the ``sid`` assigned to the connection. The application is then free to create additional rooms and manage which clients are in them using the :func:`socketio.Server.enter_room` and :func:`socketio.Server.leave_room` methods. Clients can be in as many -rooms as needed and can be moved between rooms as often as necessary. +rooms as needed and can be moved between rooms as often as necessary. :: @@ -472,7 +472,7 @@ To use a Redis message queue, a Python Redis client must be installed:: # socketio.AsyncServer class pip install aioredis -The Redis queue is configured through the :class:`socketio.RedisManager` and +The Redis queue is configured through the :class:`socketio.RedisManager` and :class:`socketio.AsyncRedisManager` classes. These classes connect directly to the Redis store and use the queue's pub/sub functionality:: @@ -517,6 +517,22 @@ the correct URL for a given message queue. 
Note that Kombu currently does not support asyncio, so it cannot be used with the :class:`socketio.AsyncServer` class. +AioPika +~~~~~~~ + +If you want to combine a RabbitMQ based manager with asyncio and a +:class:`socketio.AsyncServer` class, you can use the +`AioPika <https://aio-pika.readthedocs.io/en/latest/>`_ based manager. +You need to install aio_pika with pip:: + + pip install aio_pika + +The RabbitMQ queue is configured through the +:class:`socketio.AsyncAioPikaManager`:: + + mgr = socketio.AsyncAioPikaManager('amqp://') + sio = socketio.AsyncServer(client_manager=mgr) + Emitting from external processes ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -528,7 +544,7 @@ example:: # connect to the redis queue as an external process external_sio = socketio.RedisManager('redis://', write_only=True) - + # emit an event external_sio.emit('my event', data={'foo': 'bar'}, room='my room') diff --git a/socketio/__init__.py b/socketio/__init__.py index e60d8e3..b97db6f 100644 --- a/socketio/__init__.py +++ b/socketio/__init__.py @@ -16,6 +16,7 @@ if sys.version_info >= (3, 5): # pragma: no cover from .asyncio_manager import AsyncManager from .asyncio_namespace import AsyncNamespace, AsyncClientNamespace from .asyncio_redis_manager import AsyncRedisManager + from .asyncio_aiopika_manager import AsyncAioPikaManager from .asgi import ASGIApp else: # pragma: no cover AsyncClient = None @@ -23,6 +24,7 @@ else: # pragma: no cover AsyncManager = None AsyncNamespace = None AsyncRedisManager = None + AsyncAioPikaManager = None __version__ = '4.3.1dev' @@ -32,4 +34,4 @@ __all__ = ['__version__', 'Client', 'Server', 'BaseManager', 'PubSubManager', if AsyncServer is not None: # pragma: no cover __all__ += ['AsyncClient', 'AsyncServer', 'AsyncNamespace', 'AsyncClientNamespace', 'AsyncManager', 'AsyncRedisManager', - 'ASGIApp', 'get_tornado_handler'] + 'ASGIApp', 'get_tornado_handler', 'AsyncAioPikaManager'] diff --git a/socketio/asyncio_aiopika_manager.py b/socketio/asyncio_aiopika_manager.py new file mode 100644 index
import asyncio
import pickle

from socketio.asyncio_pubsub_manager import AsyncPubSubManager

try:
    import aio_pika
except ImportError:
    aio_pika = None


class AsyncAioPikaManager(AsyncPubSubManager):
    """Client manager that uses aio_pika for inter-process messaging under
    asyncio.

    This class implements a client manager backend for event sharing across
    multiple processes, using RabbitMQ.

    To use an aio_pika backend, initialize the :class:`AsyncServer` instance
    as follows::

        url = 'amqp://user:password@hostname:port//'
        server = socketio.AsyncServer(
            client_manager=socketio.AsyncAioPikaManager(url))

    :param url: The connection URL for the backend messaging queue. Example
                connection URLs are ``'amqp://guest:guest@localhost:5672//'``
                for RabbitMQ.
    :param channel: The channel name on which the server sends and receives
                    notifications. Must be the same in all the servers.
                    With this manager, the channel name is the exchange name
                    in rabbitmq.
    :param write_only: If set to ``True``, only initialize to emit events. The
                       default of ``False`` initializes the class for emitting
                       and receiving.
    :param logger: The logger instance to use, or ``None`` to use the default.
    """

    name = 'asyncaiopika'

    def __init__(self, url='amqp://guest:guest@localhost:5672//',
                 channel='socketio', write_only=False, logger=None):
        if aio_pika is None:
            raise RuntimeError('aio_pika package is not installed '
                               '(Run "pip install aio_pika" in your '
                               'virtualenv).')
        self.url = url
        # Listener-side state, created lazily inside _listen() and reset to
        # None whenever a receive error forces a reconnect.
        self.listener_connection = None
        self.listener_channel = None
        self.listener_queue = None
        super().__init__(channel=channel, write_only=write_only, logger=logger)

    async def _connection(self):
        """Open a robust (auto-reconnecting) connection to the broker."""
        return await aio_pika.connect_robust(self.url)

    async def _channel(self, connection):
        """Open an AMQP channel on the given connection."""
        return await connection.channel()

    async def _exchange(self, channel):
        """Declare the fanout exchange named after ``self.channel``."""
        return await channel.declare_exchange(self.channel,
                                              aio_pika.ExchangeType.FANOUT)

    async def _queue(self, channel, exchange):
        """Declare an anonymous, non-durable queue bound to the exchange.

        The queue auto-expires after 300s of inactivity so abandoned
        listeners do not accumulate on the broker.
        """
        queue = await channel.declare_queue(durable=False,
                                            arguments={'x-expires': 300000})
        await queue.bind(exchange)
        return queue

    async def _publish(self, data):
        """Pickle ``data`` and fan it out through the exchange.

        A fresh connection is opened for each publish; it is closed in the
        ``finally`` block so that repeated emits do not leak broker
        connections.
        """
        connection = await self._connection()
        try:
            channel = await self._channel(connection)
            exchange = await self._exchange(channel)
            await exchange.publish(
                aio_pika.Message(
                    body=pickle.dumps(data),
                    delivery_mode=aio_pika.DeliveryMode.PERSISTENT),
                routing_key='*'
            )
        finally:
            await connection.close()

    async def _listen(self):
        """Return the next unpickled payload received from the exchange.

        Connection, channel and queue are set up lazily on first use and
        reused across calls. On any error the listener state is discarded
        and the setup is retried with exponential backoff, capped at 60s.
        """
        retry_sleep = 1
        while True:
            try:
                if self.listener_connection is None:
                    self.listener_connection = await self._connection()
                    self.listener_channel = await self._channel(
                        self.listener_connection
                    )
                    # prefetch_count=1: hand messages over one at a time
                    await self.listener_channel.set_qos(prefetch_count=1)
                    exchange = await self._exchange(self.listener_channel)
                    self.listener_queue = await self._queue(
                        self.listener_channel, exchange
                    )
                    # Successful (re)connect: restart the backoff schedule.
                    retry_sleep = 1

                async with self.listener_queue.iterator() as queue_iter:
                    async for message in queue_iter:
                        # NOTE(review): in aio_pika >= 7, message.process()
                        # is an async context manager (``async with``) —
                        # confirm against the pinned aio_pika version.
                        with message.process():
                            return pickle.loads(message.body)
            except Exception:
                self._get_logger().error('Cannot receive from rabbitmq... '
                                         'retrying in '
                                         '{} secs'.format(retry_sleep))
                self.listener_connection = None
                await asyncio.sleep(retry_sleep)
                retry_sleep *= 2
                if retry_sleep > 60:
                    retry_sleep = 60