Browse Source

some improvements and optimizations to KombuManager class

pull/56/head
Miguel Grinberg 9 years ago
parent
commit
052fd93745
  1. 29
      socketio/kombu_manager.py

29
socketio/kombu_manager.py

@ -45,23 +45,28 @@ class KombuManager(PubSubManager): # pragma: no cover
'virtualenv).')
super(KombuManager, self).__init__(channel=channel)
self.url = url
self.writer_conn = kombu.Connection(self.url)
self.writer_queue = self._queue(self.writer_conn)
self.producer = self._producer()
def _queue(self, conn=None):
exchange = kombu.Exchange(self.channel, type='fanout', durable=False)
queue = kombu.Queue(str(uuid.uuid4()), exchange,
queue_arguments={'x-expires': 300000})
return queue
def _connection(self):
return kombu.Connection(self.url)
def _exchange(self):
return kombu.Exchange(self.channel, type='fanout', durable=False)
def _queue(self):
queue_name = 'flask-socketio.' + str(uuid.uuid4())
return kombu.Queue(queue_name, self._exchange(),
queue_arguments={'x-expires': 300000})
def _producer(self):
return self._connection().Producer(exchange=self._exchange())
def _publish(self, data):
with self.writer_conn.SimpleQueue(self.writer_queue) as queue:
queue.put(pickle.dumps(data))
self.producer.publish(pickle.dumps(data))
def _listen(self):
reader_conn = kombu.Connection(self.url)
reader_queue = self._queue(reader_conn)
with reader_conn.SimpleQueue(reader_queue) as queue:
reader_queue = self._queue()
with self._connection().SimpleQueue(reader_queue) as queue:
while True:
message = queue.get(block=True)
message.ack()

Loading…
Cancel
Save