diff --git a/socketio/kombu_manager.py b/socketio/kombu_manager.py index 7c0b06b..3895e8a 100644 --- a/socketio/kombu_manager.py +++ b/socketio/kombu_manager.py @@ -45,23 +45,28 @@ class KombuManager(PubSubManager): # pragma: no cover 'virtualenv).') super(KombuManager, self).__init__(channel=channel) self.url = url - self.writer_conn = kombu.Connection(self.url) - self.writer_queue = self._queue(self.writer_conn) + self.producer = self._producer() - def _queue(self, conn=None): - exchange = kombu.Exchange(self.channel, type='fanout', durable=False) - queue = kombu.Queue(str(uuid.uuid4()), exchange, - queue_arguments={'x-expires': 300000}) - return queue + def _connection(self): + return kombu.Connection(self.url) + + def _exchange(self): + return kombu.Exchange(self.channel, type='fanout', durable=False) + + def _queue(self): + queue_name = 'flask-socketio.' + str(uuid.uuid4()) + return kombu.Queue(queue_name, self._exchange(), + queue_arguments={'x-expires': 300000}) + + def _producer(self): + return self._connection().Producer(exchange=self._exchange()) def _publish(self, data): - with self.writer_conn.SimpleQueue(self.writer_queue) as queue: - queue.put(pickle.dumps(data)) + self.producer.publish(pickle.dumps(data)) def _listen(self): - reader_conn = kombu.Connection(self.url) - reader_queue = self._queue(reader_conn) - with reader_conn.SimpleQueue(reader_queue) as queue: + reader_queue = self._queue() + with self._connection().SimpleQueue(reader_queue) as queue: while True: message = queue.get(block=True) message.ack()