|
|
@ -44,6 +44,8 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover |
|
|
|
'(Run "pip install aio_pika" in your ' |
|
|
|
'virtualenv).') |
|
|
|
self.url = url |
|
|
|
self._lock = asyncio.Lock() |
|
|
|
self._sender_connection_cache = None |
|
|
|
self.listener_connection = None |
|
|
|
self.listener_channel = None |
|
|
|
self.listener_queue = None |
|
|
@ -52,6 +54,27 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover |
|
|
|
async def _connection(self): |
|
|
|
return await aio_pika.connect_robust(self.url) |
|
|
|
|
|
|
|
async def _sender_connection(self): |
|
|
|
if self._sender_connection_cache is None: |
|
|
|
async with self._lock: |
|
|
|
if self._sender_connection_cache is None: |
|
|
|
connection = await self._connection() |
|
|
|
channel = await connection.channel() |
|
|
|
exchange = await channel.declare_exchange( |
|
|
|
self.channel, aio_pika.ExchangeType.FANOUT |
|
|
|
) |
|
|
|
queue = await channel.declare_queue( |
|
|
|
durable=False, arguments={"x-expires": 300000} |
|
|
|
) |
|
|
|
self._sender_connection_cache = { |
|
|
|
"connection": connection, |
|
|
|
"channel": channel, |
|
|
|
"exchange": exchange, |
|
|
|
"queue": queue, |
|
|
|
} |
|
|
|
|
|
|
|
return self._sender_connection_cache |
|
|
|
|
|
|
|
async def _channel(self, connection): |
|
|
|
return await connection.channel() |
|
|
|
|
|
|
@ -66,13 +89,13 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover |
|
|
|
return queue |
|
|
|
|
|
|
|
async def _publish(self, data): |
|
|
|
connection = await self._connection() |
|
|
|
channel = await self._channel(connection) |
|
|
|
exchange = await self._exchange(channel) |
|
|
|
await exchange.publish( |
|
|
|
aio_pika.Message(body=pickle.dumps(data), |
|
|
|
delivery_mode=aio_pika.DeliveryMode.PERSISTENT), |
|
|
|
routing_key='*' |
|
|
|
full_connection = await self._sender_connection() |
|
|
|
await full_connection["exchange"].publish( |
|
|
|
aio_pika.Message( |
|
|
|
body=pickle.dumps(data), |
|
|
|
delivery_mode=aio_pika.DeliveryMode.PERSISTENT, |
|
|
|
), |
|
|
|
routing_key="*", |
|
|
|
) |
|
|
|
|
|
|
|
async def _listen(self): |
|
|
|