|
@ -20,7 +20,7 @@ class AsyncAioPikaManager(AsyncPubSubManager): |
|
|
follows:: |
|
|
follows:: |
|
|
|
|
|
|
|
|
url = 'amqp://user:password@hostname:port//' |
|
|
url = 'amqp://user:password@hostname:port//' |
|
|
server = socketio.Server(client_manager=socketio.AsyncPikaManager(url)) |
|
|
server = socketio.Server(client_manager=socketio.AsyncAioPikaManager(url)) |
|
|
|
|
|
|
|
|
:param url: The connection URL for the backend messaging queue. Example |
|
|
:param url: The connection URL for the backend messaging queue. Example |
|
|
connection URLs are ``'amqp://guest:guest@localhost:5672//'`` |
|
|
connection URLs are ``'amqp://guest:guest@localhost:5672//'`` |
|
@ -59,7 +59,8 @@ class AsyncAioPikaManager(AsyncPubSubManager): |
|
|
aio_pika.ExchangeType.FANOUT) |
|
|
aio_pika.ExchangeType.FANOUT) |
|
|
|
|
|
|
|
|
async def _queue(self, channel, exchange): |
|
|
async def _queue(self, channel, exchange): |
|
|
queue = await channel.declare_queue(exclusive=True) |
|
|
queue = await channel.declare_queue(durable=False, |
|
|
|
|
|
arguments={'x-expires': 300000}) |
|
|
await queue.bind(exchange) |
|
|
await queue.bind(exchange) |
|
|
return queue |
|
|
return queue |
|
|
|
|
|
|
|
@ -72,13 +73,11 @@ class AsyncAioPikaManager(AsyncPubSubManager): |
|
|
delivery_mode=aio_pika.DeliveryMode.PERSISTENT), |
|
|
delivery_mode=aio_pika.DeliveryMode.PERSISTENT), |
|
|
routing_key='*' |
|
|
routing_key='*' |
|
|
) |
|
|
) |
|
|
self._get_logger().info('published %s', data) |
|
|
|
|
|
|
|
|
|
|
|
async def _listen(self): |
|
|
async def _listen(self): |
|
|
retry_sleep = 1 |
|
|
retry_sleep = 1 |
|
|
while True: |
|
|
while True: |
|
|
try: |
|
|
try: |
|
|
self._get_logger().info('Starting listener') |
|
|
|
|
|
if self.listener_connection is None: |
|
|
if self.listener_connection is None: |
|
|
self.listener_connection = await self._connection() |
|
|
self.listener_connection = await self._connection() |
|
|
self.listener_channel = await self._channel( |
|
|
self.listener_channel = await self._channel( |
|
@ -92,10 +91,8 @@ class AsyncAioPikaManager(AsyncPubSubManager): |
|
|
|
|
|
|
|
|
async with self.listener_queue.iterator() as queue_iter: |
|
|
async with self.listener_queue.iterator() as queue_iter: |
|
|
async for message in queue_iter: |
|
|
async for message in queue_iter: |
|
|
message_body = pickle.loads(message.body) |
|
|
with message.process(): |
|
|
self._get_logger().info('received message %s', |
|
|
return pickle.loads(message.body) |
|
|
message_body) |
|
|
|
|
|
return message_body |
|
|
|
|
|
except Exception: |
|
|
except Exception: |
|
|
self._get_logger().error('Cannot receive from rabbitmq... ' |
|
|
self._get_logger().error('Cannot receive from rabbitmq... ' |
|
|
'retrying in ' |
|
|
'retrying in ' |
|
|