diff --git a/socketio/asyncio_aiopika_manager.py b/socketio/asyncio_aiopika_manager.py index 905057d..2c5b083 100644 --- a/socketio/asyncio_aiopika_manager.py +++ b/socketio/asyncio_aiopika_manager.py @@ -46,30 +46,32 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover self.url = url self.listener_connection = None self.listener_channel = None + self.listener_exchange = None self.listener_queue = None super().__init__(channel=channel, write_only=write_only, logger=logger) - async def _connection(self): - return await aio_pika.connect_robust(self.url) + async def _initialize(self): + """ + initialization aio_pika connection, channel, exchange and queue - async def _channel(self, connection): - return await connection.channel() + """ + self.listener_connection = await aio_pika.connect_robust(url=self.url) + self.listener_channel = await self.listener_connection.channel() + self.listener_exchange = await self.listener_channel.declare_exchange(self.channel, + aio_pika.ExchangeType.FANOUT) - async def _exchange(self, channel): - return await channel.declare_exchange(self.channel, - aio_pika.ExchangeType.FANOUT) + if not self.write_only: + await self.listener_channel.set_qos(prefetch_count=1) + self.listener_queue = await self.listener_channel.declare_queue(durable=False) + await self.listener_queue.bind(self.listener_exchange) + AsyncPubSubManager.initialize(self) - async def _queue(self, channel, exchange): - queue = await channel.declare_queue(durable=False, - arguments={'x-expires': 300000}) - await queue.bind(exchange) - return queue + def initialize(self): + loop = asyncio.get_event_loop() + loop.create_task(self._initialize()) async def _publish(self, data): - connection = await self._connection() - channel = await self._channel(connection) - exchange = await self._exchange(channel) - await exchange.publish( + await self.listener_exchange.publish( aio_pika.Message(body=pickle.dumps(data), delivery_mode=aio_pika.DeliveryMode.PERSISTENT), routing_key='*' @@ -80,16 +82,7 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover while True: try: if self.listener_connection is None: - self.listener_connection = await self._connection() - self.listener_channel = await self._channel( - self.listener_connection - ) - await self.listener_channel.set_qos(prefetch_count=1) - exchange = await self._exchange(self.listener_channel) - self.listener_queue = await self._queue( - self.listener_channel, exchange - ) - retry_sleep = 1 + await self._initialize() async with self.listener_queue.iterator() as queue_iter: async for message in queue_iter: