Browse Source

Add write_only argument to Kombu and Redis manager classes

pull/14/head
Miguel Grinberg 9 years ago
parent
commit
57756e3bdc
  1. 40
      docs/index.rst
  2. 8
      socketio/kombu_manager.py
  3. 6
      socketio/pubsub_manager.py
  4. 12
      socketio/redis_manager.py
  5. 8
      tests/test_pubsub_manager.py

40
docs/index.rst

@ -233,10 +233,11 @@ type of installation, each server processes owns the connections to a subset
of the clients. To make broadcasting work in this environment, the servers of the clients. To make broadcasting work in this environment, the servers
communicate with each other through the message queue. communicate with each other through the message queue.
The message queue service needs to be installed and configured separately. By The message queue service needs to be installed and configured separately. One
default, the server uses `Kombu <http://kombu.readthedocs.org/en/latest/>`_ of the options offered by this package is to use
to access the message queue, so any message queue supported by this package `Kombu <http://kombu.readthedocs.org/en/latest/>`_ to access the message
can be used. Kombu can be installed with pip:: queue, which means that any message queue supported by this package can be
used. Kombu can be installed with pip::
pip install kombu pip install kombu
@ -252,29 +253,40 @@ To configure a Socket.IO server to connect to a message queue, the
following example instructs the server to connect to a Redis service running following example instructs the server to connect to a Redis service running
on the same host and on the default port:: on the same host and on the default port::
redis = socketio.KombuManager('redis://') mgr = socketio.KombuManager('redis://')
sio = socketio.Server(client_manager=redis) sio = socketio.Server(client_manager=mgr)
For a RabbitMQ queue also running on the local server with default For a RabbitMQ queue also running on the local server with default
credentials, the configuration is as follows:: credentials, the configuration is as follows::
amqp = socketio.KombuManager('amqp://') mgr = socketio.KombuManager('amqp://')
sio = socketio.Server(client_manager=amqp) sio = socketio.Server(client_manager=mgr)
The arguments passed to the ``KombuManager`` constructor are passed directly The URL passed to the ``KombuManager`` constructor is passed directly to
to Kombu's `Connection object Kombu's `Connection object
<http://kombu.readthedocs.org/en/latest/userguide/connections.html>`_, so <http://kombu.readthedocs.org/en/latest/userguide/connections.html>`_, so
the Kombu documentation should be consulted for information on how to the Kombu documentation should be consulted for information on how to
connect to the message queue appropriately. connect to the message queue appropriately.
If the use of Kombu is not desired, native Redis support is also offered
through the ``RedisManager`` class. This class takes the same arguments as
``KombuManager``, but connects directly to a Redis store using the queue's
pub/sub functionality::
mgr = socketio.RedisManager('redis://')
sio = socketio.Server(client_manager=mgr)
If multiple Sokcet.IO servers are connected to a message queue, they If multiple Sokcet.IO servers are connected to a message queue, they
automatically communicate with each other and manage a combined client list, automatically communicate with each other and manage a combined client list,
without any need for additional configuration. To have a process other than without any need for additional configuration. To have a process other than
the server connect to the queue to emit a message, the same ``KombuManager`` a server connect to the queue to emit a message, the same ``KombuManager``
class can be used as standalone object. For example:: and ``RedisManager`` classes can be used as standalone object. In this case,
the ``write_only`` argument should be set to ``True`` to disable the creation
of a listening thread. For example::
# connect to the redis queue # connect to the redis queue through Kombu
redis = socketio.KombuManager('redis://localhost:6379/') redis = socketio.KombuManager('redis://', write_only=True)
# emit an event # emit an event
redis.emit('my event', data={'foo': 'bar'}, room='my room') redis.emit('my event', data={'foo': 'bar'}, room='my room')

8
socketio/kombu_manager.py

@ -31,11 +31,14 @@ class KombuManager(PubSubManager): # pragma: no cover
connection URLs. connection URLs.
:param channel: The channel name on which the server sends and receives :param channel: The channel name on which the server sends and receives
notifications. Must be the same in all the servers. notifications. Must be the same in all the servers.
:param write_only: If set ot ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
""" """
name = 'kombu' name = 'kombu'
def __init__(self, url='amqp://guest:guest@localhost:5672//', def __init__(self, url='amqp://guest:guest@localhost:5672//',
channel='socketio'): channel='socketio', write_only=False):
if kombu is None: if kombu is None:
raise RuntimeError('Kombu package is not installed ' raise RuntimeError('Kombu package is not installed '
'(Run "pip install kombu" in your ' '(Run "pip install kombu" in your '
@ -43,7 +46,8 @@ class KombuManager(PubSubManager): # pragma: no cover
self.kombu = kombu.Connection(url) self.kombu = kombu.Connection(url)
self.exchange = kombu.Exchange(channel, type='fanout', durable=False) self.exchange = kombu.Exchange(channel, type='fanout', durable=False)
self.queue = kombu.Queue(str(uuid.uuid4()), self.exchange) self.queue = kombu.Queue(str(uuid.uuid4()), self.exchange)
super(KombuManager, self).__init__(channel=channel) super(KombuManager, self).__init__(channel=channel,
write_only=write_only)
def _publish(self, data): def _publish(self, data):
with self.kombu.SimpleQueue(self.queue) as queue: with self.kombu.SimpleQueue(self.queue) as queue:

6
socketio/pubsub_manager.py

@ -24,14 +24,16 @@ class PubSubManager(BaseManager):
""" """
name = 'pubsub' name = 'pubsub'
def __init__(self, channel='socketio'): def __init__(self, channel='socketio', write_only=False):
super(PubSubManager, self).__init__() super(PubSubManager, self).__init__()
self.channel = channel self.channel = channel
self.write_only = write_only
self.host_id = uuid.uuid4().hex self.host_id = uuid.uuid4().hex
def initialize(self, server): def initialize(self, server):
super(PubSubManager, self).initialize(server) super(PubSubManager, self).initialize(server)
self.thread = self.server.start_background_task(self._thread) if not self.write_only:
self.thread = self.server.start_background_task(self._thread)
self.server.logger.info(self.name + ' backend initialized.') self.server.logger.info(self.name + ' backend initialized.')
def emit(self, event, data, namespace=None, room=None, skip_sid=None, def emit(self, event, data, namespace=None, room=None, skip_sid=None,

12
socketio/redis_manager.py

@ -22,20 +22,26 @@ class RedisManager(PubSubManager): # pragma: no cover
url = 'redis://hostname:port/0' url = 'redis://hostname:port/0'
server = socketio.Server(client_manager=socketio.RedisManager(url)) server = socketio.Server(client_manager=socketio.RedisManager(url))
:param url: The connection URL for the Redis server. :param url: The connection URL for the Redis server. For a default Redis
store running on the same host, use ``redis://``.
:param channel: The channel name on which the server sends and receives :param channel: The channel name on which the server sends and receives
notifications. Must be the same in all the servers. notifications. Must be the same in all the servers.
:param write_only: If set ot ``True``, only initialize to emit events. The
default of ``False`` initializes the class for emitting
and receiving.
""" """
name = 'redis' name = 'redis'
def __init__(self, url='redis://localhost:6379/0', channel='socketio'): def __init__(self, url='redis://localhost:6379/0', channel='socketio',
write_only=False):
if redis is None: if redis is None:
raise RuntimeError('Redis package is not installed ' raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" in your ' '(Run "pip install redis" in your '
'virtualenv).') 'virtualenv).')
self.redis = redis.Redis.from_url(url) self.redis = redis.Redis.from_url(url)
self.pubsub = self.redis.pubsub() self.pubsub = self.redis.pubsub()
super(RedisManager, self).__init__(channel=channel) super(RedisManager, self).__init__(channel=channel,
write_only=write_only)
def _publish(self, data): def _publish(self, data):
return self.redis.publish(self.channel, pickle.dumps(data)) return self.redis.publish(self.channel, pickle.dumps(data))

8
tests/test_pubsub_manager.py

@ -29,6 +29,14 @@ class TestBaseManager(unittest.TestCase):
self.assertEqual(pubsub.channel, 'foo') self.assertEqual(pubsub.channel, 'foo')
self.assertEqual(len(pubsub.host_id), 32) self.assertEqual(len(pubsub.host_id), 32)
def test_write_only_init(self):
mock_server = mock.MagicMock()
pm = pubsub_manager.PubSubManager(write_only=True)
pm.initialize(mock_server)
self.assertEqual(pm.channel, 'socketio')
self.assertEqual(len(pm.host_id), 32)
self.assertEqual(pm.server.start_background_task.call_count, 0)
def test_emit(self): def test_emit(self):
self.pm.emit('foo', 'bar') self.pm.emit('foo', 'bar')
self.pm._publish.assert_called_once_with( self.pm._publish.assert_called_once_with(

Loading…
Cancel
Save