diff --git a/src/socketio/asyncio_aiopika_manager.py b/src/socketio/asyncio_aiopika_manager.py index c6d7137..92171e0 100644 --- a/src/socketio/asyncio_aiopika_manager.py +++ b/src/socketio/asyncio_aiopika_manager.py @@ -86,7 +86,7 @@ class AsyncAioPikaManager(AsyncPubSubManager): # pragma: no cover delivery_mode=aio_pika.DeliveryMode.PERSISTENT ), routing_key='*', ) - return + break except aio_pika.AMQPException: if retry: self._get_logger().error('Cannot publish to rabbitmq... ' diff --git a/src/socketio/kombu_manager.py b/src/socketio/kombu_manager.py index 61eebd0..7350c15 100644 --- a/src/socketio/kombu_manager.py +++ b/src/socketio/kombu_manager.py @@ -1,4 +1,5 @@ import pickle +import time import uuid try: @@ -61,7 +62,7 @@ class KombuManager(PubSubManager): # pragma: no cover self.exchange_options = exchange_options or {} self.queue_options = queue_options or {} self.producer_options = producer_options or {} - self.producer = self._producer() + self.publisher_connection = self._connection() def initialize(self): super(KombuManager, self).initialize() @@ -92,31 +93,44 @@ class KombuManager(PubSubManager): # pragma: no cover options.update(self.queue_options) return kombu.Queue(queue_name, self._exchange(), **options) - def _producer(self): - return self._connection().Producer(exchange=self._exchange(), - **self.producer_options) - - def __error_callback(self, exception, interval): - self._get_logger().exception('Sleeping {}s'.format(interval)) + def _producer_publish(self, connection): + producer = connection.Producer(exchange=self._exchange(), + **self.producer_options) + return connection.ensure(producer, producer.publish) def _publish(self, data): - connection = self._connection() - publish = connection.ensure(self.producer, self.producer.publish, - errback=self.__error_callback) - publish(pickle.dumps(data)) + retry = True + while True: + try: + producer_publish = self._producer_publish( + self.publisher_connection) + producer_publish(pickle.dumps(data)) + break + except (OSError, kombu.exceptions.KombuError): + if retry: + self._get_logger().error('Cannot publish to rabbitmq... ' + 'retrying') + retry = False + else: + self._get_logger().error( + 'Cannot publish to rabbitmq... giving up') + break def _listen(self): reader_queue = self._queue() - + retry_sleep = 1 while True: - connection = self._connection().ensure_connection( - errback=self.__error_callback) try: - with connection.SimpleQueue(reader_queue) as queue: - while True: - message = queue.get(block=True) - message.ack() - yield message.payload - except connection.connection_errors: - self._get_logger().exception("Connection error " - "while reading from queue") + with self._connection() as connection: + with connection.SimpleQueue(reader_queue) as queue: + while True: + message = queue.get(block=True) + message.ack() + yield message.payload + retry_sleep = 1 + except (OSError, kombu.exceptions.KombuError): + self._get_logger().error( + 'Cannot receive from rabbitmq... ' + 'retrying in {} secs'.format(retry_sleep)) + time.sleep(retry_sleep) + retry_sleep = min(retry_sleep * 2, 60)