From cc9027586f045b6aa96e832bb287971762fb339d Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Sun, 18 Sep 2016 14:32:49 -0700 Subject: [PATCH 1/3] add a TTL option to Kombu queues when RabbitMQ is used --- socketio/kombu_manager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/socketio/kombu_manager.py b/socketio/kombu_manager.py index ee1a772..7c0b06b 100644 --- a/socketio/kombu_manager.py +++ b/socketio/kombu_manager.py @@ -50,7 +50,8 @@ class KombuManager(PubSubManager): # pragma: no cover def _queue(self, conn=None): exchange = kombu.Exchange(self.channel, type='fanout', durable=False) - queue = kombu.Queue(str(uuid.uuid4()), exchange) + queue = kombu.Queue(str(uuid.uuid4()), exchange, + queue_arguments={'x-expires': 300000}) return queue def _publish(self, data): From 052fd937453dc098761dba10c7240c39bdb5d750 Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Sat, 24 Sep 2016 23:51:56 -0700 Subject: [PATCH 2/3] some improvements and optimizations to KombuManager class --- socketio/kombu_manager.py | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/socketio/kombu_manager.py b/socketio/kombu_manager.py index 7c0b06b..3895e8a 100644 --- a/socketio/kombu_manager.py +++ b/socketio/kombu_manager.py @@ -45,23 +45,28 @@ class KombuManager(PubSubManager): # pragma: no cover 'virtualenv).') super(KombuManager, self).__init__(channel=channel) self.url = url - self.writer_conn = kombu.Connection(self.url) - self.writer_queue = self._queue(self.writer_conn) + self.producer = self._producer() - def _queue(self, conn=None): - exchange = kombu.Exchange(self.channel, type='fanout', durable=False) - queue = kombu.Queue(str(uuid.uuid4()), exchange, - queue_arguments={'x-expires': 300000}) - return queue + def _connection(self): + return kombu.Connection(self.url) + + def _exchange(self): + return kombu.Exchange(self.channel, type='fanout', durable=False) + + def _queue(self): + queue_name = 'flask-socketio.' + str(uuid.uuid4()) + return kombu.Queue(queue_name, self._exchange(), + queue_arguments={'x-expires': 300000}) + + def _producer(self): + return self._connection().Producer(exchange=self._exchange()) def _publish(self, data): - with self.writer_conn.SimpleQueue(self.writer_queue) as queue: - queue.put(pickle.dumps(data)) + self.producer.publish(pickle.dumps(data)) def _listen(self): - reader_conn = kombu.Connection(self.url) - reader_queue = self._queue(reader_conn) - with reader_conn.SimpleQueue(reader_queue) as queue: + reader_queue = self._queue() + with self._connection().SimpleQueue(reader_queue) as queue: while True: message = queue.get(block=True) message.ack() From 9a7dc7f9dec94cf5393b14c84e7c522629fd5620 Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Sat, 24 Sep 2016 23:54:02 -0700 Subject: [PATCH 3/3] Release 1.6.0 --- socketio/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/socketio/__init__.py b/socketio/__init__.py index 96c46af..11b9f6c 100644 --- a/socketio/__init__.py +++ b/socketio/__init__.py @@ -6,7 +6,7 @@ from .redis_manager import RedisManager from .server import Server from .namespace import Namespace -__version__ = '1.5.1' +__version__ = '1.6.0' __all__ = [__version__, Middleware, Server, BaseManager, PubSubManager, KombuManager, RedisManager, Namespace]