Browse Source

correct kombu implementation of a fanout queue

pull/14/head
Miguel Grinberg 9 years ago
parent
commit
73fd499033
  1. 17
      socketio/kombu_manager.py

17
socketio/kombu_manager.py

@ -1,4 +1,5 @@
import pickle import pickle
import uuid
try: try:
import kombu import kombu
@ -40,15 +41,17 @@ class KombuManager(PubSubManager): # pragma: no cover
'(Run "pip install kombu" in your ' '(Run "pip install kombu" in your '
'virtualenv).') 'virtualenv).')
self.kombu = kombu.Connection(url) self.kombu = kombu.Connection(url)
self.queue = self.kombu.SimpleQueue(channel) self.exchange = kombu.Exchange(channel, type='fanout', durable=False)
self.queue = kombu.Queue(str(uuid.uuid4()), self.exchange)
super(KombuManager, self).__init__(channel=channel) super(KombuManager, self).__init__(channel=channel)
def _publish(self, data): def _publish(self, data):
return self.queue.put(pickle.dumps(data)) with self.kombu.SimpleQueue(self.queue) as queue:
queue.put(pickle.dumps(data))
def _listen(self): def _listen(self):
listen_queue = self.kombu.SimpleQueue(self.channel) with self.kombu.SimpleQueue(self.queue) as queue:
while True: while True:
message = listen_queue.get(block=True) message = queue.get(block=True)
message.ack() message.ack()
yield message.payload yield message.payload

Loading…
Cancel
Save