From 73fd49903387725b47498625005a6a1b13ff6948 Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Wed, 30 Dec 2015 14:29:47 -0800 Subject: [PATCH] correct kombu implementation of a fanout queue --- socketio/kombu_manager.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/socketio/kombu_manager.py b/socketio/kombu_manager.py index 05d256d..0c911ba 100644 --- a/socketio/kombu_manager.py +++ b/socketio/kombu_manager.py @@ -1,4 +1,5 @@ import pickle +import uuid try: import kombu @@ -40,15 +41,17 @@ class KombuManager(PubSubManager): # pragma: no cover '(Run "pip install kombu" in your ' 'virtualenv).') self.kombu = kombu.Connection(url) - self.queue = self.kombu.SimpleQueue(channel) + self.exchange = kombu.Exchange(channel, type='fanout', durable=False) + self.queue = kombu.Queue(str(uuid.uuid4()), self.exchange) super(KombuManager, self).__init__(channel=channel) def _publish(self, data): - return self.queue.put(pickle.dumps(data)) + with self.kombu.SimpleQueue(self.queue) as queue: + queue.put(pickle.dumps(data)) def _listen(self): - listen_queue = self.kombu.SimpleQueue(self.channel) - while True: - message = listen_queue.get(block=True) - message.ack() - yield message.payload + with self.kombu.SimpleQueue(self.queue) as queue: + while True: + message = queue.get(block=True) + message.ack() + yield message.payload