From c52e93b4a328d98a968bfbdec0cfd598b73ee913 Mon Sep 17 00:00:00 2001 From: Gritty_dev <101377478+codomposer@users.noreply.github.com> Date: Thu, 30 Oct 2025 16:21:35 -0400 Subject: [PATCH] fix recreate binding on failure (#1516) --- src/socketio/async_aiopika_manager.py | 36 +++++++++++++-------------- src/socketio/kombu_manager.py | 2 +- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/src/socketio/async_aiopika_manager.py b/src/socketio/async_aiopika_manager.py index 60be887..34171c4 100644 --- a/src/socketio/async_aiopika_manager.py +++ b/src/socketio/async_aiopika_manager.py @@ -101,26 +101,26 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover raise asyncio.CancelledError() async def _listen(self): - async with (await self._connection()) as connection: - channel = await self._channel(connection) - await channel.set_qos(prefetch_count=1) - exchange = await self._exchange(channel) - queue = await self._queue(channel, exchange) - - retry_sleep = 1 - while True: - try: + retry_sleep = 1 + while True: + try: + async with (await self._connection()) as connection: + channel = await self._channel(connection) + await channel.set_qos(prefetch_count=1) + exchange = await self._exchange(channel) + queue = await self._queue(channel, exchange) + async with queue.iterator() as queue_iter: async for message in queue_iter: async with message.process(): yield message.body retry_sleep = 1 - except aio_pika.AMQPException: - self._get_logger().error( - 'Cannot receive from rabbitmq... ' - 'retrying in {} secs'.format(retry_sleep)) - await asyncio.sleep(retry_sleep) - retry_sleep = min(retry_sleep * 2, 60) - except aio_pika.exceptions.ChannelInvalidStateError: - # aio_pika raises this exception when the task is cancelled - raise asyncio.CancelledError() + except aio_pika.AMQPException: + self._get_logger().error( + 'Cannot receive from rabbitmq... ' + 'retrying in {} secs'.format(retry_sleep)) + await asyncio.sleep(retry_sleep) + retry_sleep = min(retry_sleep * 2, 60) + except aio_pika.exceptions.ChannelInvalidStateError: + # aio_pika raises this exception when the task is cancelled + raise asyncio.CancelledError() diff --git a/src/socketio/kombu_manager.py b/src/socketio/kombu_manager.py index bc84bbb..ae9d139 100644 --- a/src/socketio/kombu_manager.py +++ b/src/socketio/kombu_manager.py @@ -115,10 +115,10 @@ class KombuManager(PubSubManager): # pragma: no cover break def _listen(self): - reader_queue = self._queue() retry_sleep = 1 while True: try: + reader_queue = self._queue() with self._connection() as connection: with connection.SimpleQueue(reader_queue) as queue: while True: