|
|
|
@ -101,26 +101,26 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover |
|
|
|
raise asyncio.CancelledError() |
|
|
|
|
|
|
|
async def _listen(self): |
|
|
|
async with (await self._connection()) as connection: |
|
|
|
channel = await self._channel(connection) |
|
|
|
await channel.set_qos(prefetch_count=1) |
|
|
|
exchange = await self._exchange(channel) |
|
|
|
queue = await self._queue(channel, exchange) |
|
|
|
|
|
|
|
retry_sleep = 1 |
|
|
|
while True: |
|
|
|
try: |
|
|
|
retry_sleep = 1 |
|
|
|
while True: |
|
|
|
try: |
|
|
|
async with (await self._connection()) as connection: |
|
|
|
channel = await self._channel(connection) |
|
|
|
await channel.set_qos(prefetch_count=1) |
|
|
|
exchange = await self._exchange(channel) |
|
|
|
queue = await self._queue(channel, exchange) |
|
|
|
|
|
|
|
async with queue.iterator() as queue_iter: |
|
|
|
async for message in queue_iter: |
|
|
|
async with message.process(): |
|
|
|
yield message.body |
|
|
|
retry_sleep = 1 |
|
|
|
except aio_pika.AMQPException: |
|
|
|
self._get_logger().error( |
|
|
|
'Cannot receive from rabbitmq... ' |
|
|
|
'retrying in {} secs'.format(retry_sleep)) |
|
|
|
await asyncio.sleep(retry_sleep) |
|
|
|
retry_sleep = min(retry_sleep * 2, 60) |
|
|
|
except aio_pika.exceptions.ChannelInvalidStateError: |
|
|
|
# aio_pika raises this exception when the task is cancelled |
|
|
|
raise asyncio.CancelledError() |
|
|
|
except aio_pika.AMQPException: |
|
|
|
self._get_logger().error( |
|
|
|
'Cannot receive from rabbitmq... ' |
|
|
|
'retrying in {} secs'.format(retry_sleep)) |
|
|
|
await asyncio.sleep(retry_sleep) |
|
|
|
retry_sleep = min(retry_sleep * 2, 60) |
|
|
|
except aio_pika.exceptions.ChannelInvalidStateError: |
|
|
|
# aio_pika raises this exception when the task is cancelled |
|
|
|
raise asyncio.CancelledError() |
|
|
|
|