Browse Source

Merge branch 'master' into middlewares

pull/45/head
Robert Schindler 9 years ago
parent
commit
96161d1bfc
  1. 2
      socketio/__init__.py
  2. 28
      socketio/kombu_manager.py

2
socketio/__init__.py

@ -8,7 +8,7 @@ from .redis_manager import RedisManager
from .server import Server
from .util import apply_interceptor, ignore_interceptor
__version__ = '1.5.1'
__version__ = '1.6.0'
__all__ = [__version__, Interceptor, Middleware, Namespace, Server,
BaseManager, PubSubManager, KombuManager, RedisManager,

28
socketio/kombu_manager.py

@ -45,22 +45,28 @@ class KombuManager(PubSubManager): # pragma: no cover
'virtualenv).')
super(KombuManager, self).__init__(channel=channel)
self.url = url
self.writer_conn = kombu.Connection(self.url)
self.writer_queue = self._queue(self.writer_conn)
self.producer = self._producer()
def _queue(self, conn=None):
exchange = kombu.Exchange(self.channel, type='fanout', durable=False)
queue = kombu.Queue(str(uuid.uuid4()), exchange)
return queue
def _connection(self):
return kombu.Connection(self.url)
def _exchange(self):
return kombu.Exchange(self.channel, type='fanout', durable=False)
def _queue(self):
queue_name = 'flask-socketio.' + str(uuid.uuid4())
return kombu.Queue(queue_name, self._exchange(),
queue_arguments={'x-expires': 300000})
def _producer(self):
return self._connection().Producer(exchange=self._exchange())
def _publish(self, data):
with self.writer_conn.SimpleQueue(self.writer_queue) as queue:
queue.put(pickle.dumps(data))
self.producer.publish(pickle.dumps(data))
def _listen(self):
reader_conn = kombu.Connection(self.url)
reader_queue = self._queue(reader_conn)
with reader_conn.SimpleQueue(reader_queue) as queue:
reader_queue = self._queue()
with self._connection().SimpleQueue(reader_queue) as queue:
while True:
message = queue.get(block=True)
message.ack()

Loading…
Cancel
Save