diff --git a/src/socketio/asyncio_aiopika_manager.py b/src/socketio/asyncio_aiopika_manager.py index eff3f8c..9c99903 100644 --- a/src/socketio/asyncio_aiopika_manager.py +++ b/src/socketio/asyncio_aiopika_manager.py @@ -47,17 +47,28 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover self.listener_connection = None self.listener_channel = None self.listener_queue = None + + self.aiopika_connection = None + self.aiopika_channel = None + self.aiopika_exchange = None + super().__init__(channel=channel, write_only=write_only, logger=logger) async def _connection(self): - return await aio_pika.connect_robust(self.url) + if (self.aiopika_connection is None or self.aiopika_connection.is_closed is False): + return await aio_pika.connect_robust(self.url) + return self.aiopika_connection async def _channel(self, connection): - return await connection.channel() + if self.aiopika_channel is None: + return await connection.channel() + return self.aiopika_channel async def _exchange(self, channel): - return await channel.declare_exchange(self.channel, + if self.aiopika_exchange is None: + return await channel.declare_exchange(self.channel, aio_pika.ExchangeType.FANOUT) + return self.aiopika_exchange async def _queue(self, channel, exchange): queue = await channel.declare_queue(durable=False, @@ -66,10 +77,10 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover return queue async def _publish(self, data): - connection = await self._connection() - channel = await self._channel(connection) - exchange = await self._exchange(channel) - await exchange.publish( + self.aiopika_connection = await self._connection() + self.aiopika_channel = await self._channel(self.aiopika_connection) + self.aiopika_exchange = await self._exchange(self.aiopika_channel) + await self.aiopika_exchange.publish( aio_pika.Message(body=pickle.dumps(data), delivery_mode=aio_pika.DeliveryMode.PERSISTENT), routing_key='*'