diff --git a/socketio/kombu_manager.py b/socketio/kombu_manager.py index 4394a6d..4eb9ee4 100644 --- a/socketio/kombu_manager.py +++ b/socketio/kombu_manager.py @@ -34,11 +34,21 @@ class KombuManager(PubSubManager): # pragma: no cover :param write_only: If set ot ``True``, only initialize to emit events. The default of ``False`` initializes the class for emitting and receiving. + :param connection_options: additional keyword arguments to be passed to + ``kombu.Connection()``. + :param exchange_options: additional keyword arguments to be passed to + ``kombu.Exchange()``. + :param queue_options: additional keyword arguments to be passed to + ``kombu.Queue()``. + :param producer_options: additional keyword arguments to be passed to + ``kombu.Producer()``. """ name = 'kombu' def __init__(self, url='amqp://guest:guest@localhost:5672//', - channel='socketio', write_only=False, logger=None): + channel='socketio', write_only=False, logger=None, + connection_options=None, exchange_options=None, + queue_options=None, producer_options=None): if kombu is None: raise RuntimeError('Kombu package is not installed ' '(Run "pip install kombu" in your ' @@ -47,6 +57,10 @@ class KombuManager(PubSubManager): # pragma: no cover write_only=write_only, logger=logger) self.url = url + self.connection_options = connection_options or {} + self.exchange_options = exchange_options or {} + self.queue_options = queue_options or {} + self.producer_options = producer_options or {} self.producer = self._producer() def initialize(self): @@ -65,19 +79,22 @@ class KombuManager(PubSubManager): # pragma: no cover 'with ' + self.server.async_mode) def _connection(self): - return kombu.Connection(self.url) + return kombu.Connection(self.url, **self.connection_options) def _exchange(self): - return kombu.Exchange(self.channel, type='fanout', durable=False) + options = {'type': 'fanout', 'durable': False} + options.update(self.exchange_options) + return kombu.Exchange(self.channel, **options) def _queue(self): queue_name = 'flask-socketio.' + str(uuid.uuid4()) - return kombu.Queue(queue_name, self._exchange(), - durable=False, - queue_arguments={'x-expires': 300000}) + options = {'durable': False, 'queue_arguments': {'x-expires': 300000}} + options.update(self.queue_options) + return kombu.Queue(queue_name, self._exchange(), **options) def _producer(self): - return self._connection().Producer(exchange=self._exchange()) + return self._connection().Producer(exchange=self._exchange(), + **self.producer_options) def __error_callback(self, exception, interval): self._get_logger().exception('Sleeping {}s'.format(interval)) diff --git a/socketio/redis_manager.py b/socketio/redis_manager.py index 69be586..ad38334 100644 --- a/socketio/redis_manager.py +++ b/socketio/redis_manager.py @@ -33,16 +33,19 @@ class RedisManager(PubSubManager): # pragma: no cover :param write_only: If set ot ``True``, only initialize to emit events. The default of ``False`` initializes the class for emitting and receiving. + :param redis_options: additional keyword arguments to be passed to + ``Redis.from_url()``. """ name = 'redis' def __init__(self, url='redis://localhost:6379/0', channel='socketio', - write_only=False, logger=None): + write_only=False, logger=None, redis_options=None): if redis is None: raise RuntimeError('Redis package is not installed ' '(Run "pip install redis" in your ' 'virtualenv).') self.redis_url = url + self.redis_options = redis_options or {} self._redis_connect() super(RedisManager, self).__init__(channel=channel, write_only=write_only, @@ -64,7 +67,8 @@ class RedisManager(PubSubManager): # pragma: no cover 'with ' + self.server.async_mode) def _redis_connect(self): - self.redis = redis.Redis.from_url(self.redis_url) + self.redis = redis.Redis.from_url(self.redis_url, + **self.redis_options) self.pubsub = self.redis.pubsub() def _publish(self, data):