From 2e6d91f2330910ae0804e3e7bfdd9b30dcdc0241 Mon Sep 17 00:00:00 2001 From: Sillyfrog Date: Sun, 26 Feb 2023 09:04:22 +1000 Subject: [PATCH] Use a persistent ampq connection when publishing --- src/socketio/asyncio_aiopika_manager.py | 37 ++++++++++++++++++++----- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/src/socketio/asyncio_aiopika_manager.py b/src/socketio/asyncio_aiopika_manager.py index eff3f8c..8015e4d 100644 --- a/src/socketio/asyncio_aiopika_manager.py +++ b/src/socketio/asyncio_aiopika_manager.py @@ -44,6 +44,8 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover '(Run "pip install aio_pika" in your ' 'virtualenv).') self.url = url + self._lock = asyncio.Lock() + self._sender_connection_cache = None self.listener_connection = None self.listener_channel = None self.listener_queue = None @@ -52,6 +54,27 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover async def _connection(self): return await aio_pika.connect_robust(self.url) + async def _sender_connection(self): + if self._sender_connection_cache is None: + async with self._lock: + if self._sender_connection_cache is None: + connection = await self._connection() + channel = await connection.channel() + exchange = await channel.declare_exchange( + self.channel, aio_pika.ExchangeType.FANOUT + ) + queue = await channel.declare_queue( + durable=False, arguments={"x-expires": 300000} + ) + self._sender_connection_cache = { + "connection": connection, + "channel": channel, + "exchange": exchange, + "queue": queue, + } + + return self._sender_connection_cache + async def _channel(self, connection): return await connection.channel() @@ -66,13 +89,13 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover return queue async def _publish(self, data): - connection = await self._connection() - channel = await self._channel(connection) - exchange = await self._exchange(channel) - await exchange.publish( - aio_pika.Message(body=pickle.dumps(data), - delivery_mode=aio_pika.DeliveryMode.PERSISTENT), - routing_key='*' + full_connection = await self._sender_connection() + await full_connection["exchange"].publish( + aio_pika.Message( + body=pickle.dumps(data), + delivery_mode=aio_pika.DeliveryMode.PERSISTENT, + ), + routing_key="*", ) async def _listen(self):