From df142a63a39457f9e1eb291e25d78221fb5568bc Mon Sep 17 00:00:00 2001 From: Salim Aboubacar Date: Fri, 26 Jul 2019 19:28:17 +0200 Subject: [PATCH] Harmonize settings with kombu manager --- socketio/asyncio_aiopika_manager.py | 13 +++++-------- tests/asyncio/test_asyncio_aiopika_manager.py | 1 - 2 files changed, 5 insertions(+), 9 deletions(-) delete mode 100644 tests/asyncio/test_asyncio_aiopika_manager.py diff --git a/socketio/asyncio_aiopika_manager.py b/socketio/asyncio_aiopika_manager.py index 7c9e43e..63d0454 100644 --- a/socketio/asyncio_aiopika_manager.py +++ b/socketio/asyncio_aiopika_manager.py @@ -20,7 +20,7 @@ class AsyncAioPikaManager(AsyncPubSubManager): follows:: url = 'amqp://user:password@hostname:port//' - server = socketio.Server(client_manager=socketio.AsyncPikaManager(url)) + server = socketio.Server(client_manager=socketio.AsyncAioPikaManager(url)) :param url: The connection URL for the backend messaging queue. Example connection URLs are ``'amqp://guest:guest@localhost:5672//'`` @@ -59,7 +59,8 @@ class AsyncAioPikaManager(AsyncPubSubManager): aio_pika.ExchangeType.FANOUT) async def _queue(self, channel, exchange): - queue = await channel.declare_queue(exclusive=True) + queue = await channel.declare_queue(durable=False, + arguments={'x-expires': 300000}) await queue.bind(exchange) return queue @@ -72,13 +73,11 @@ class AsyncAioPikaManager(AsyncPubSubManager): delivery_mode=aio_pika.DeliveryMode.PERSISTENT), routing_key='*' ) - self._get_logger().info('published %s', data) async def _listen(self): retry_sleep = 1 while True: try: - self._get_logger().info('Starting listener') if self.listener_connection is None: self.listener_connection = await self._connection() self.listener_channel = await self._channel( @@ -92,10 +91,8 @@ class AsyncAioPikaManager(AsyncPubSubManager): async with self.listener_queue.iterator() as queue_iter: async for message in queue_iter: - message_body = pickle.loads(message.body) - self._get_logger().info('received message %s', - message_body) - return message_body + with message.process(): + return pickle.loads(message.body) except Exception: self._get_logger().error('Cannot receive from rabbitmq... ' 'retrying in ' diff --git a/tests/asyncio/test_asyncio_aiopika_manager.py b/tests/asyncio/test_asyncio_aiopika_manager.py deleted file mode 100644 index 85e6ff1..0000000 --- a/tests/asyncio/test_asyncio_aiopika_manager.py +++ /dev/null @@ -1 +0,0 @@ -# WIP