|
@ -46,30 +46,32 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover |
|
|
self.url = url |
|
|
self.url = url |
|
|
self.listener_connection = None |
|
|
self.listener_connection = None |
|
|
self.listener_channel = None |
|
|
self.listener_channel = None |
|
|
|
|
|
self.listener_exchange = None |
|
|
self.listener_queue = None |
|
|
self.listener_queue = None |
|
|
super().__init__(channel=channel, write_only=write_only, logger=logger) |
|
|
super().__init__(channel=channel, write_only=write_only, logger=logger) |
|
|
|
|
|
|
|
|
async def _connection(self): |
|
|
async def _initialize(self): |
|
|
return await aio_pika.connect_robust(self.url) |
|
|
""" |
|
|
|
|
|
initialization aio_pika connection, channel, exchange and queue |
|
|
|
|
|
|
|
|
async def _channel(self, connection): |
|
|
""" |
|
|
return await connection.channel() |
|
|
self.listener_connection = await aio_pika.connect_robust(url=self.url) |
|
|
|
|
|
self.listener_channel = await self.listener_connection.channel() |
|
|
|
|
|
self.listener_exchange = await self.listener_channel.declare_exchange(self.channel, |
|
|
|
|
|
aio_pika.ExchangeType.FANOUT) |
|
|
|
|
|
|
|
|
async def _exchange(self, channel): |
|
|
if not self.write_only: |
|
|
return await channel.declare_exchange(self.channel, |
|
|
await self.listener_channel.set_qos(prefetch_count=1) |
|
|
aio_pika.ExchangeType.FANOUT) |
|
|
self.listener_queue = await self.listener_channel.declare_queue(durable=False) |
|
|
|
|
|
await self.listener_queue.bind(self.listener_exchange) |
|
|
|
|
|
AsyncPubSubManager.initialize(self) |
|
|
|
|
|
|
|
|
async def _queue(self, channel, exchange): |
|
|
def initialize(self): |
|
|
queue = await channel.declare_queue(durable=False, |
|
|
loop = asyncio.get_event_loop() |
|
|
arguments={'x-expires': 300000}) |
|
|
loop.create_task(self._initialize()) |
|
|
await queue.bind(exchange) |
|
|
|
|
|
return queue |
|
|
|
|
|
|
|
|
|
|
|
async def _publish(self, data): |
|
|
async def _publish(self, data): |
|
|
connection = await self._connection() |
|
|
await self.listener_exchange.publish( |
|
|
channel = await self._channel(connection) |
|
|
|
|
|
exchange = await self._exchange(channel) |
|
|
|
|
|
await exchange.publish( |
|
|
|
|
|
aio_pika.Message(body=pickle.dumps(data), |
|
|
aio_pika.Message(body=pickle.dumps(data), |
|
|
delivery_mode=aio_pika.DeliveryMode.PERSISTENT), |
|
|
delivery_mode=aio_pika.DeliveryMode.PERSISTENT), |
|
|
routing_key='*' |
|
|
routing_key='*' |
|
@ -80,16 +82,7 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover |
|
|
while True: |
|
|
while True: |
|
|
try: |
|
|
try: |
|
|
if self.listener_connection is None: |
|
|
if self.listener_connection is None: |
|
|
self.listener_connection = await self._connection() |
|
|
await self._initialize() |
|
|
self.listener_channel = await self._channel( |
|
|
|
|
|
self.listener_connection |
|
|
|
|
|
) |
|
|
|
|
|
await self.listener_channel.set_qos(prefetch_count=1) |
|
|
|
|
|
exchange = await self._exchange(self.listener_channel) |
|
|
|
|
|
self.listener_queue = await self._queue( |
|
|
|
|
|
self.listener_channel, exchange |
|
|
|
|
|
) |
|
|
|
|
|
retry_sleep = 1 |
|
|
|
|
|
|
|
|
|
|
|
async with self.listener_queue.iterator() as queue_iter: |
|
|
async with self.listener_queue.iterator() as queue_iter: |
|
|
async for message in queue_iter: |
|
|
async for message in queue_iter: |
|
|