From 57756e3bdcc025e1bc31f67f8a44c0b38fc3747c Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Sat, 9 Jan 2016 18:32:08 -0800 Subject: [PATCH] Add write_only argument to Kombu and Redis manager classes --- docs/index.rst | 40 +++++++++++++++++++++++------------- socketio/kombu_manager.py | 8 ++++++-- socketio/pubsub_manager.py | 6 ++++-- socketio/redis_manager.py | 12 ++++++++--- tests/test_pubsub_manager.py | 8 ++++++++ 5 files changed, 53 insertions(+), 21 deletions(-) diff --git a/docs/index.rst b/docs/index.rst index 41cb375..3edcd4e 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -233,10 +233,11 @@ type of installation, each server processes owns the connections to a subset of the clients. To make broadcasting work in this environment, the servers communicate with each other through the message queue. -The message queue service needs to be installed and configured separately. By -default, the server uses `Kombu `_ -to access the message queue, so any message queue supported by this package -can be used. Kombu can be installed with pip:: +The message queue service needs to be installed and configured separately. One +of the options offered by this package is to use +`Kombu `_ to access the message +queue, which means that any message queue supported by this package can be +used. Kombu can be installed with pip:: pip install kombu @@ -252,29 +253,40 @@ To configure a Socket.IO server to connect to a message queue, the following example instructs the server to connect to a Redis service running on the same host and on the default port:: - redis = socketio.KombuManager('redis://') - sio = socketio.Server(client_manager=redis) + mgr = socketio.KombuManager('redis://') + sio = socketio.Server(client_manager=mgr) For a RabbitMQ queue also running on the local server with default credentials, the configuration is as follows:: - amqp = socketio.KombuManager('amqp://') - sio = socketio.Server(client_manager=amqp) + mgr = socketio.KombuManager('amqp://') + sio = socketio.Server(client_manager=mgr) -The arguments passed to the ``KombuManager`` constructor are passed directly -to Kombu's `Connection object +The URL passed to the ``KombuManager`` constructor is passed directly to +Kombu's `Connection object `_, so the Kombu documentation should be consulted for information on how to connect to the message queue appropriately. +If the use of Kombu is not desired, native Redis support is also offered +through the ``RedisManager`` class. This class takes the same arguments as +``KombuManager``, but connects directly to a Redis store using the queue's +pub/sub functionality:: + + + mgr = socketio.RedisManager('redis://') + sio = socketio.Server(client_manager=mgr) + If multiple Sokcet.IO servers are connected to a message queue, they automatically communicate with each other and manage a combined client list, without any need for additional configuration. To have a process other than -the server connect to the queue to emit a message, the same ``KombuManager`` -class can be used as standalone object. For example:: +a server connect to the queue to emit a message, the same ``KombuManager`` +and ``RedisManager`` classes can be used as standalone object. In this case, +the ``write_only`` argument should be set to ``True`` to disable the creation +of a listening thread. For example:: - # connect to the redis queue - redis = socketio.KombuManager('redis://localhost:6379/') + # connect to the redis queue through Kombu + redis = socketio.KombuManager('redis://', write_only=True) # emit an event redis.emit('my event', data={'foo': 'bar'}, room='my room') diff --git a/socketio/kombu_manager.py b/socketio/kombu_manager.py index 0c911ba..3a069af 100644 --- a/socketio/kombu_manager.py +++ b/socketio/kombu_manager.py @@ -31,11 +31,14 @@ class KombuManager(PubSubManager): # pragma: no cover connection URLs. :param channel: The channel name on which the server sends and receives notifications. Must be the same in all the servers. + :param write_only: If set ot ``True``, only initialize to emit events. The + default of ``False`` initializes the class for emitting + and receiving. """ name = 'kombu' def __init__(self, url='amqp://guest:guest@localhost:5672//', - channel='socketio'): + channel='socketio', write_only=False): if kombu is None: raise RuntimeError('Kombu package is not installed ' '(Run "pip install kombu" in your ' @@ -43,7 +46,8 @@ class KombuManager(PubSubManager): # pragma: no cover self.kombu = kombu.Connection(url) self.exchange = kombu.Exchange(channel, type='fanout', durable=False) self.queue = kombu.Queue(str(uuid.uuid4()), self.exchange) - super(KombuManager, self).__init__(channel=channel) + super(KombuManager, self).__init__(channel=channel, + write_only=write_only) def _publish(self, data): with self.kombu.SimpleQueue(self.queue) as queue: diff --git a/socketio/pubsub_manager.py b/socketio/pubsub_manager.py index c454540..8f34786 100644 --- a/socketio/pubsub_manager.py +++ b/socketio/pubsub_manager.py @@ -24,14 +24,16 @@ class PubSubManager(BaseManager): """ name = 'pubsub' - def __init__(self, channel='socketio'): + def __init__(self, channel='socketio', write_only=False): super(PubSubManager, self).__init__() self.channel = channel + self.write_only = write_only self.host_id = uuid.uuid4().hex def initialize(self, server): super(PubSubManager, self).initialize(server) - self.thread = self.server.start_background_task(self._thread) + if not self.write_only: + self.thread = self.server.start_background_task(self._thread) self.server.logger.info(self.name + ' backend initialized.') def emit(self, event, data, namespace=None, room=None, skip_sid=None, diff --git a/socketio/redis_manager.py b/socketio/redis_manager.py index 900baaf..e11ab97 100644 --- a/socketio/redis_manager.py +++ b/socketio/redis_manager.py @@ -22,20 +22,26 @@ class RedisManager(PubSubManager): # pragma: no cover url = 'redis://hostname:port/0' server = socketio.Server(client_manager=socketio.RedisManager(url)) - :param url: The connection URL for the Redis server. + :param url: The connection URL for the Redis server. For a default Redis + store running on the same host, use ``redis://``. :param channel: The channel name on which the server sends and receives notifications. Must be the same in all the servers. + :param write_only: If set ot ``True``, only initialize to emit events. The + default of ``False`` initializes the class for emitting + and receiving. """ name = 'redis' - def __init__(self, url='redis://localhost:6379/0', channel='socketio'): + def __init__(self, url='redis://localhost:6379/0', channel='socketio', + write_only=False): if redis is None: raise RuntimeError('Redis package is not installed ' '(Run "pip install redis" in your ' 'virtualenv).') self.redis = redis.Redis.from_url(url) self.pubsub = self.redis.pubsub() - super(RedisManager, self).__init__(channel=channel) + super(RedisManager, self).__init__(channel=channel, + write_only=write_only) def _publish(self, data): return self.redis.publish(self.channel, pickle.dumps(data)) diff --git a/tests/test_pubsub_manager.py b/tests/test_pubsub_manager.py index 0345eb2..d67dc5b 100644 --- a/tests/test_pubsub_manager.py +++ b/tests/test_pubsub_manager.py @@ -29,6 +29,14 @@ class TestBaseManager(unittest.TestCase): self.assertEqual(pubsub.channel, 'foo') self.assertEqual(len(pubsub.host_id), 32) + def test_write_only_init(self): + mock_server = mock.MagicMock() + pm = pubsub_manager.PubSubManager(write_only=True) + pm.initialize(mock_server) + self.assertEqual(pm.channel, 'socketio') + self.assertEqual(len(pm.host_id), 32) + self.assertEqual(pm.server.start_background_task.call_count, 0) + def test_emit(self): self.pm.emit('foo', 'bar') self.pm._publish.assert_called_once_with(