From 47620bbebd92a1b388df72a88c0ca35cdb530073 Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Mon, 23 Nov 2015 23:07:48 -0800 Subject: [PATCH 1/6] initial implementation of inter-process communication --- README.rst | 41 ++++++++++++-------- docs/index.rst | 33 ++++++++++------ socketio/__init__.py | 7 +++- socketio/base_manager.py | 9 +++-- socketio/kombu_manager.py | 64 +++++++++++++++++++++++++++++++ socketio/pubsub_manager.py | 77 ++++++++++++++++++++++++++++++++++++++ socketio/redis_manager.py | 60 +++++++++++++++++++++++++++++ socketio/server.py | 19 ++++++---- tests/test_base_manager.py | 7 ++-- tests/test_server.py | 10 ++++- 10 files changed, 284 insertions(+), 43 deletions(-) create mode 100644 socketio/kombu_manager.py create mode 100644 socketio/pubsub_manager.py create mode 100644 socketio/redis_manager.py diff --git a/README.rst b/README.rst index 6ee835d..6bae2e4 100644 --- a/README.rst +++ b/README.rst @@ -9,20 +9,31 @@ Python implementation of the `Socket.IO`_ realtime server. Features -------- -- Fully compatible with the Javascript `socket.io-client`_ library. -- Compatible with Python 2.7 and Python 3.3+. -- Based on `Eventlet`_, enabling large number of clients even on modest - hardware. -- Includes a WSGI middleware that integrates Socket.IO traffic with - standard WSGI applications. -- Uses an event-based architecture implemented with decorators that - hides the details of the protocol. -- Implements HTTP long-polling and WebSocket transports. -- Supports XHR2 and XHR browsers as clients. -- Supports text and binary messages. -- Supports gzip and deflate HTTP compression. -- Configurable CORS responses to avoid cross-origin problems with - browsers. +- Fully compatible with the + `Javascript `_, + `Swift `_, + `C++ `_ and + `Java `_ official + Socket.IO clients, plus any third party clients that comply with the + Socket.IO specification. +- Compatible with Python 2.7 and Python 3.3+. +- Supports large number of clients even on modest hardware when used with an + asynchronous server based on `eventlet `_ or + `gevent `_. For development and testing, any WSGI + complaint multi-threaded server can be used. +- Includes a WSGI middleware that integrates Socket.IO traffic with standard + WSGI applications. +- Broadcasting of messages to all connected clients, or to subsets of them + assigned to "rooms". +- Optional support for multiple servers, connected through a messaging queue + such as Redis or RabbitMQ. +- Event-based architecture implemented with decorators that hides the details + of the protocol. +- Support for HTTP long-polling and WebSocket transports. +- Support for XHR2 and XHR browsers. +- Support for text and binary messages. +- Support for gzip and deflate HTTP compression. +- Configurable CORS responses, to avoid cross-origin problems with browsers. Example ------- @@ -74,4 +85,4 @@ Resources .. _socket.io-client: https://github.com/Automattic/socket.io-client .. _Eventlet: http://eventlet.net/ .. _Documentation: http://pythonhosted.org/python-socketio -.. _PyPI: https://pypi.python.org/pypi/python-socketio \ No newline at end of file +.. _PyPI: https://pypi.python.org/pypi/python-socketio diff --git a/docs/index.rst b/docs/index.rst index 9254da6..151e690 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -12,9 +12,13 @@ This project implements an Socket.IO server that can run standalone or integrated with a Python WSGI application. The following are some of its features: -- Fully compatible with the Javascript - `socket.io-client `_ library, - versions 1.3.5 and up. +- Fully compatible with the + `Javascript `_, + `Swift `_, + `C++ `_ and + `Java `_ official + Socket.IO clients, plus any third party clients that comply with the + Socket.IO specification. - Compatible with Python 2.7 and Python 3.3+. - Supports large number of clients even on modest hardware when used with an asynchronous server based on `eventlet `_ or @@ -24,20 +28,22 @@ features: WSGI applications. - Broadcasting of messages to all connected clients, or to subsets of them assigned to "rooms". -- Uses an event-based architecture implemented with decorators that hides the - details of the protocol. +- Optional support for multiple servers, connected through a messaging queue + such as Redis or RabbitMQ. +- Event-based architecture implemented with decorators that hides the details + of the protocol. - Support for HTTP long-polling and WebSocket transports. - Support for XHR2 and XHR browsers. - Support for text and binary messages. - Support for gzip and deflate HTTP compression. -- Configurable CORS responses to avoid cross-origin problems with browsers. +- Configurable CORS responses, to avoid cross-origin problems with browsers. What is Socket.IO? ------------------ Socket.IO is a transport protocol that enables real-time bidirectional event-based communication between clients (typically web browsers) and a -server. The official implementations of the client and server components are +server. The original implementations of the client and server components are written in JavaScript. Getting Started @@ -357,13 +363,16 @@ address this important limitation. API Reference ------------- -.. toctree:: - :maxdepth: 2 - .. module:: socketio - .. autoclass:: Middleware :members: - .. autoclass:: Server :members: +.. autoclass:: BaseManager + :members: +.. autoclass:: PubSubManager + :members: +.. autoclass:: KombuManager + :members: +.. autoclass:: RedisManager + :members: diff --git a/socketio/__init__.py b/socketio/__init__.py index 6e95349..be5882f 100644 --- a/socketio/__init__.py +++ b/socketio/__init__.py @@ -1,4 +1,9 @@ from .middleware import Middleware +from .base_manager import BaseManager +from .pubsub_manager import PubSubManager +from .kombu_manager import KombuManager +from .redis_manager import RedisManager from .server import Server -__all__ = [Middleware, Server] +__all__ = [Middleware, Server, BaseManager, PubSubManager, KombuManager, + RedisManager] diff --git a/socketio/base_manager.py b/socketio/base_manager.py index 7d13f1f..19e22f7 100644 --- a/socketio/base_manager.py +++ b/socketio/base_manager.py @@ -12,12 +12,15 @@ class BaseManager(object): services. More sophisticated storage backends can be implemented by subclasses. """ - def __init__(self, server): - self.server = server + def __init__(self): + self.server = None self.rooms = {} self.pending_removals = [] self.callbacks = {} + def initialize(self, server): + self.server = server + def get_namespaces(self): """Return an iterable with the active namespace names.""" return six.iterkeys(self.rooms) @@ -69,7 +72,7 @@ class BaseManager(object): except KeyError: pass - def close_room(self, namespace, room): + def close_room(self, room, namespace): """Remove all participants from a room.""" try: for sid in self.get_participants(namespace, room): diff --git a/socketio/kombu_manager.py b/socketio/kombu_manager.py new file mode 100644 index 0000000..837f054 --- /dev/null +++ b/socketio/kombu_manager.py @@ -0,0 +1,64 @@ +import json +import pickle + +import six +try: + import kombu +except ImportError: + kombu = None + +from .pubsub_manager import PubSubManager + + +class KombuManager(PubSubManager): + """Client manager that uses kombu for inter-process messaging. + + This class implements a client manager backend for event sharing across + multiple processes, using RabbitMQ, Redis or any other messaging mechanism + supported by `kombu `_. + + To use a kombu backend, initialize the :class:`Server` instance as + follows:: + + url = 'amqp://user:password@hostname:port//' + server = socketio.Server(client_manager=socketio.KombuManager(url)) + + :param url: The connection URL for the backend messaging queue. Example + connection URLs are ``'amqp://guest:guest@localhost:5672//'`` + and ``'redis://localhost:6379/'`` for RabbitMQ and Redis + respectively. Consult the `kombu documentation + `_ for more on how to construct + connection URLs. + :param channel: The channel name on which the server sends and receives + notifications. Must be the same in all the servers. + """ + name = 'kombu' + + def __init__(self, url='amqp://guest:guest@localhost:5672//', + channel='socketio'): + if kombu is None: + raise RuntimeError('Kombu package is not installed ' + '(Run "pip install kombu" in your ' + 'virtualenv).') + self.kombu = kombu.Connection(url) + self.queue = self.kombu.SimpleQueue(channel) + super(KombuManager, self).__init__(channel=channel) + + def _publish(self, data): + return self.queue.put(pickle.dumps(data)) + + def _listen(self): + listen_queue = self.kombu.SimpleQueue(self.channel) + while True: + message = listen_queue.get(block=True) + message.ack() + data = None + if isinstance(message.payload, six.binary_type): + try: + data = pickle.loads(message.payload) + except pickle.PickleError: + pass + if data is None: + data = json.loads(message.payload) + yield data diff --git a/socketio/pubsub_manager.py b/socketio/pubsub_manager.py new file mode 100644 index 0000000..01a03ec --- /dev/null +++ b/socketio/pubsub_manager.py @@ -0,0 +1,77 @@ +from .base_manager import BaseManager + + +class PubSubManager(BaseManager): + """Manage a client list attached to a pub/sub backend. + + This is a base class that enables multiple servers to share the list of + clients, with the servers communicating events through a pub/sub backend. + The use of a pub/sub backend also allows any client connected to the + backend to emit events addressed to Socket.IO clients. + + The actual backends must be implemented by subclasses, this class only + provides a pub/sub generic framework. + + :param channel: The channel name on which the server sends and receives + notifications. + """ + def __init__(self, channel='socketio'): + super(PubSubManager, self).__init__() + self.channel = channel + + def initialize(self, server): + super(PubSubManager, self).initialize(server) + self.thread = self.server.start_background_task(self._thread) + self.server.logger.info(self.name + ' backend initialized.') + + def emit(self, event, data, namespace=None, room=None, skip_sid=None, + callback=None): + """Emit a message to a single client, a room, or all the clients + connected to the namespace. + + This method takes care or propagating the message to all the servers + that are connected through the message queue. + + The parameters are the same as in :meth:`.Server.emit`. + """ + self._publish({'method': 'emit', 'event': event, 'data': data, + 'namespace': namespace or '/', 'room': room, + 'skip_sid': skip_sid, 'callback': callback}) + + def close_room(self, room, namespace=None): + self._publish({'method': 'close_room', 'room': room, + 'namespace': namespace or '/'}) + + def _publish(self, data): + """Publish a message on the Socket.IO channel. + + This method needs to be implemented by the different subclasses that + support pub/sub backends. + """ + raise NotImplementedError('This method must be implemented in a ' + 'subclass.') + + def _listen(self): + """Return the next message published on the Socket.IO channel, + blocking until a message is available. + + This method needs to be implemented by the different subclasses that + support pub/sub backends. + """ + raise NotImplementedError('This method must be implemented in a ' + 'subclass.') + + def _thread(self): + for message in self._listen(): + if 'method' in message: + if message['method'] == 'emit': + super(PubSubManager, self).emit( + message['event'], message['data'], + namespace=message.get('namespace'), + room=message.get('room'), + skip_sid=message.get('skip_sid'), + callback=message.get('callback')) + elif message['method'] == 'close_room': + super(PubSubManager, self).close_room( + room=message.get('room'), + namespace=message.get('namespace')) diff --git a/socketio/redis_manager.py b/socketio/redis_manager.py new file mode 100644 index 0000000..735bbcf --- /dev/null +++ b/socketio/redis_manager.py @@ -0,0 +1,60 @@ +import json +import pickle + +import six +try: + import redis +except ImportError: + redis = None + +from .pubsub_manager import PubSubManager + + +class RedisManager(PubSubManager): + """Redis based client manager. + + This class implements a Redis backend for event sharing across multiple + processes. Only kept here as one more example of how to build a custom + backend, since the kombu backend is perfectly adequate to support a Redis + message queue. + + To use a Redis backend, initialize the :class:`Server` instance as + follows:: + + url = 'redis://hostname:port/0' + server = socketio.Server(client_manager=socketio.RedisManager(url)) + + :param url: The connection URL for the Redis server. + :param channel: The channel name on which the server sends and receives + notifications. Must be the same in all the servers. + """ + name = 'redis' + + def __init__(self, url='redis://localhost:6379/0', channel='socketio'): + if redis is None: + raise RuntimeError('Redis package is not installed ' + '(Run "pip install redis" in your ' + 'virtualenv).') + self.redis = redis.Redis.from_url(url) + self.pubsub = self.redis.pubsub() + super(RedisManager, self).__init__(channel=channel) + + def _publish(self, data): + return self.redis.publish(self.channel, pickle.dumps(data)) + + def _listen(self): + channel = self.channel.encode('utf-8') + self.pubsub.subscribe(self.channel) + for message in self.pubsub.listen(): + if message['channel'] == channel and \ + message['type'] == 'message' and 'data' in message: + data = None + if isinstance(message['data'], six.binary_type): + try: + data = pickle.loads(message['data']) + except pickle.PickleError: + pass + if data is None: + data = json.loads(message['data']) + yield data + self.pubsub.unsubscribe(self.channel) diff --git a/socketio/server.py b/socketio/server.py index dcb1bae..0f22f98 100644 --- a/socketio/server.py +++ b/socketio/server.py @@ -14,9 +14,9 @@ class Server(object): for websocket and long-polling transports. :param client_manager: The client manager instance that will manage the - client list. By default the client list is stored - in an in-memory structure, which prevents the use - of multiple worker processes. + client list. When this is omitted, the client list + is stored in an in-memory structure, so the use of + multiple connected servers is not possible. :param logger: To enable logging set to ``True`` or pass a logger object to use. To disable logging set to ``False``. :param binary: ``True`` to support binary payloads, ``False`` to treat all @@ -62,9 +62,6 @@ class Server(object): """ def __init__(self, client_manager=None, logger=False, binary=False, json=None, **kwargs): - if client_manager is None: - client_manager = base_manager.BaseManager(self) - self.manager = client_manager engineio_options = kwargs engineio_logger = engineio_options.pop('engineio_logger', None) if engineio_logger is not None: @@ -97,6 +94,11 @@ class Server(object): self.logger.setLevel(logging.ERROR) self.logger.addHandler(logging.StreamHandler()) + if client_manager is None: + client_manager = base_manager.BaseManager() + client_manager.initialize(self) + self.manager = client_manager + def on(self, event, handler=None, namespace=None): """Register an event handler. @@ -248,7 +250,7 @@ class Server(object): """ namespace = namespace or '/' self.logger.info('room %s is closing [%s]', room, namespace) - self.manager.close_room(namespace, room) + self.manager.close_room(room, namespace) def rooms(self, sid, namespace=None): """Return the rooms a client is in. @@ -300,6 +302,9 @@ class Server(object): """ return self.eio.handle_request(environ, start_response) + def start_background_task(self, target, *args, **kwargs): + self.eio.start_background_task(target, *args, **kwargs) + def _emit_internal(self, sid, event, data, namespace=None, id=None): """Send a message to a client.""" if six.PY2 and not self.binary: diff --git a/tests/test_base_manager.py b/tests/test_base_manager.py index 1cdfa6b..0badf69 100644 --- a/tests/test_base_manager.py +++ b/tests/test_base_manager.py @@ -12,7 +12,8 @@ from socketio import base_manager class TestBaseManager(unittest.TestCase): def setUp(self): mock_server = mock.MagicMock() - self.bm = base_manager.BaseManager(mock_server) + self.bm = base_manager.BaseManager() + self.bm.initialize(mock_server) def test_connect(self): self.bm.connect('123', '/foo') @@ -142,11 +143,11 @@ class TestBaseManager(unittest.TestCase): self.bm.connect('789', '/foo') self.bm.enter_room('123', '/foo', 'bar') self.bm.enter_room('123', '/foo', 'bar') - self.bm.close_room('/foo', 'bar') + self.bm.close_room('bar', '/foo') self.assertNotIn('bar', self.bm.rooms['/foo']) def test_close_invalid_room(self): - self.bm.close_room('/foo', 'bar') + self.bm.close_room('bar', '/foo') def test_rooms(self): self.bm.connect('123', '/foo') diff --git a/tests/test_server.py b/tests/test_server.py index 142e207..c7a7a6d 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -92,13 +92,13 @@ class TestServer(unittest.TestCase): mgr = mock.MagicMock() s = server.Server(client_manager=mgr) s.close_room('room', namespace='/foo') - s.manager.close_room.assert_called_once_with('/foo', 'room') + s.manager.close_room.assert_called_once_with('room', '/foo') def test_close_room_default_namespace(self, eio): mgr = mock.MagicMock() s = server.Server(client_manager=mgr) s.close_room('room') - s.manager.close_room.assert_called_once_with('/', 'room') + s.manager.close_room.assert_called_once_with('room', '/') def test_rooms(self, eio): mgr = mock.MagicMock() @@ -397,3 +397,9 @@ class TestServer(unittest.TestCase): # restore the default JSON module packet.Packet.json = json + + def test_start_background_task(self, eio): + s = server.Server() + s.start_background_task('foo', 'bar', baz='baz') + s.eio.start_background_task.assert_called_once_with('foo', 'bar', + baz='baz') From 63f5ed3429f96afd8f3a7cd82f281c2e5db93de1 Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Sun, 6 Dec 2015 19:11:20 -0800 Subject: [PATCH 2/6] Support for callbacks across servers --- socketio/base_manager.py | 10 ++++-- socketio/kombu_manager.py | 10 ++++-- socketio/pubsub_manager.py | 63 ++++++++++++++++++++++++++++++++------ socketio/redis_manager.py | 10 ++++-- tests/test_base_manager.py | 11 +++---- tests/test_server.py | 4 ++- 6 files changed, 82 insertions(+), 26 deletions(-) diff --git a/socketio/base_manager.py b/socketio/base_manager.py index 19e22f7..e0f35e0 100644 --- a/socketio/base_manager.py +++ b/socketio/base_manager.py @@ -104,12 +104,16 @@ class BaseManager(object): def trigger_callback(self, sid, namespace, id, data): """Invoke an application callback.""" + callback = None try: callback = self.callbacks[sid][namespace][id] except KeyError: - raise ValueError('Unknown callback') - del self.callbacks[sid][namespace][id] - callback(*data) + # if we get an unknown callback we just ignore it + self.server.logger.warning('Unknown callback received, ignoring.') + else: + del self.callbacks[sid][namespace][id] + if callback is not None: + callback(*data) def _generate_ack_id(self, sid, namespace, callback): """Generate a unique identifier for an ACK packet.""" diff --git a/socketio/kombu_manager.py b/socketio/kombu_manager.py index 837f054..c5e7298 100644 --- a/socketio/kombu_manager.py +++ b/socketio/kombu_manager.py @@ -57,8 +57,12 @@ class KombuManager(PubSubManager): if isinstance(message.payload, six.binary_type): try: data = pickle.loads(message.payload) - except pickle.PickleError: + except: pass if data is None: - data = json.loads(message.payload) - yield data + try: + data = json.loads(message.payload) + except: + pass + if data: + yield data diff --git a/socketio/pubsub_manager.py b/socketio/pubsub_manager.py index 01a03ec..bb21e0e 100644 --- a/socketio/pubsub_manager.py +++ b/socketio/pubsub_manager.py @@ -1,3 +1,6 @@ +from functools import partial +import uuid + from .base_manager import BaseManager @@ -18,6 +21,7 @@ class PubSubManager(BaseManager): def __init__(self, channel='socketio'): super(PubSubManager, self).__init__() self.channel = channel + self.host_id = uuid.uuid4().hex def initialize(self, server): super(PubSubManager, self).initialize(server) @@ -34,8 +38,14 @@ class PubSubManager(BaseManager): The parameters are the same as in :meth:`.Server.emit`. """ + namespace = namespace or '/' + if callback is not None: + id = self._generate_ack_id(room, namespace, callback) + callback = (room, namespace, id) + else: + callback = None self._publish({'method': 'emit', 'event': event, 'data': data, - 'namespace': namespace or '/', 'room': room, + 'namespace': namespace, 'room': room, 'skip_sid': skip_sid, 'callback': callback}) def close_room(self, room, namespace=None): @@ -61,17 +71,50 @@ class PubSubManager(BaseManager): raise NotImplementedError('This method must be implemented in a ' 'subclass.') + def _handle_emit(self, message): + # Events with callbacks are very tricky to handle across hosts + # Here in the receiving end we set up a local callback that preserves + # the callback host and id from the sender + remote_callback = message.get('callback') + if remote_callback is not None and len(remote_callback) == 3: + callback = partial(self._return_callback, self.host_id, + *remote_callback) + else: + callback = None + super(PubSubManager, self).emit(message['event'], message['data'], + namespace=message.get('namespace'), + room=message.get('room'), + skip_sid=message.get('skip_sid'), + callback=callback) + + def _handle_callback(self, message): + if self.host_id == message.get('host_id'): + try: + sid = message['sid'] + namespace = message['namespace'] + id = message['id'] + args = message['args'] + except KeyError: + return + self.trigger_callback(sid, namespace, id, args) + + def _return_callback(self, host_id, sid, namespace, callback_id, *args): + # When an event callback is received, the callback is returned back + # the sender, which is identified by the host_id + self._publish({'method': 'callback', 'host_id': host_id, + 'sid': sid, 'namespace': namespace, 'id': callback_id, + 'args': args}) + + def _handle_close_room(self, message): + super(PubSubManager, self).close_room( + room=message.get('room'), namespace=message.get('namespace')) + def _thread(self): for message in self._listen(): if 'method' in message: if message['method'] == 'emit': - super(PubSubManager, self).emit( - message['event'], message['data'], - namespace=message.get('namespace'), - room=message.get('room'), - skip_sid=message.get('skip_sid'), - callback=message.get('callback')) + self._handle_emit(message) + elif message['method'] == 'callback': + self._handle_callback(message) elif message['method'] == 'close_room': - super(PubSubManager, self).close_room( - room=message.get('room'), - namespace=message.get('namespace')) + self._handle_close_room(message) diff --git a/socketio/redis_manager.py b/socketio/redis_manager.py index 735bbcf..9004fae 100644 --- a/socketio/redis_manager.py +++ b/socketio/redis_manager.py @@ -52,9 +52,13 @@ class RedisManager(PubSubManager): if isinstance(message['data'], six.binary_type): try: data = pickle.loads(message['data']) - except pickle.PickleError: + except: pass if data is None: - data = json.loads(message['data']) - yield data + try: + data = json.loads(message['data']) + except: + pass + if data: + yield data self.pubsub.unsubscribe(self.channel) diff --git a/tests/test_base_manager.py b/tests/test_base_manager.py index 0badf69..8ff9369 100644 --- a/tests/test_base_manager.py +++ b/tests/test_base_manager.py @@ -104,12 +104,11 @@ class TestBaseManager(unittest.TestCase): self.bm.connect('123', '/') cb = mock.MagicMock() id = self.bm._generate_ack_id('123', '/', cb) - self.assertRaises(ValueError, self.bm.trigger_callback, - '124', '/', id, ['foo']) - self.assertRaises(ValueError, self.bm.trigger_callback, - '123', '/foo', id, ['foo']) - self.assertRaises(ValueError, self.bm.trigger_callback, - '123', '/', id + 1, ['foo']) + + # these should not raise an exception + self.bm.trigger_callback('124', '/', id, ['foo']) + self.bm.trigger_callback('123', '/foo', id, ['foo']) + self.bm.trigger_callback('123', '/', id + 1, ['foo']) self.assertEqual(cb.call_count, 0) def test_get_namespaces(self): diff --git a/tests/test_server.py b/tests/test_server.py index c7a7a6d..6f2c7ca 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -274,7 +274,9 @@ class TestServer(unittest.TestCase): s._handle_eio_message('123', '61-1["my message","a",' '{"_placeholder":true,"num":0}]') self.assertEqual(s._attachment_count, 1) - self.assertRaises(ValueError, s._handle_eio_message, '123', b'foo') + # the following call should not raise an exception in spite of the + # callback id being invalid + s._handle_eio_message('123', b'foo') def test_handle_event_with_ack(self, eio): s = server.Server() From 71142aa2db1434ab799edebeaa97020c6ec21089 Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Mon, 7 Dec 2015 00:06:10 -0800 Subject: [PATCH 3/6] pubsub unit tests --- socketio/kombu_manager.py | 18 +-- socketio/pubsub_manager.py | 40 +++++-- socketio/redis_manager.py | 18 +-- tests/test_pubsub_manager.py | 209 +++++++++++++++++++++++++++++++++++ 4 files changed, 244 insertions(+), 41 deletions(-) create mode 100644 tests/test_pubsub_manager.py diff --git a/socketio/kombu_manager.py b/socketio/kombu_manager.py index c5e7298..05d256d 100644 --- a/socketio/kombu_manager.py +++ b/socketio/kombu_manager.py @@ -1,7 +1,5 @@ -import json import pickle -import six try: import kombu except ImportError: @@ -10,7 +8,7 @@ except ImportError: from .pubsub_manager import PubSubManager -class KombuManager(PubSubManager): +class KombuManager(PubSubManager): # pragma: no cover """Client manager that uses kombu for inter-process messaging. This class implements a client manager backend for event sharing across @@ -53,16 +51,4 @@ class KombuManager(PubSubManager): while True: message = listen_queue.get(block=True) message.ack() - data = None - if isinstance(message.payload, six.binary_type): - try: - data = pickle.loads(message.payload) - except: - pass - if data is None: - try: - data = json.loads(message.payload) - except: - pass - if data: - yield data + yield message.payload diff --git a/socketio/pubsub_manager.py b/socketio/pubsub_manager.py index bb21e0e..48ca09f 100644 --- a/socketio/pubsub_manager.py +++ b/socketio/pubsub_manager.py @@ -1,6 +1,10 @@ from functools import partial import uuid +import json +import pickle +import six + from .base_manager import BaseManager @@ -18,6 +22,8 @@ class PubSubManager(BaseManager): :param channel: The channel name on which the server sends and receives notifications. """ + name = 'pubsub' + def __init__(self, channel='socketio'): super(PubSubManager, self).__init__() self.channel = channel @@ -40,6 +46,8 @@ class PubSubManager(BaseManager): """ namespace = namespace or '/' if callback is not None: + if room is None: + raise ValueError('Cannot use callback without a room set.') id = self._generate_ack_id(room, namespace, callback) callback = (room, namespace, id) else: @@ -59,7 +67,7 @@ class PubSubManager(BaseManager): support pub/sub backends. """ raise NotImplementedError('This method must be implemented in a ' - 'subclass.') + 'subclass.') # pragma: no cover def _listen(self): """Return the next message published on the Socket.IO channel, @@ -69,7 +77,7 @@ class PubSubManager(BaseManager): support pub/sub backends. """ raise NotImplementedError('This method must be implemented in a ' - 'subclass.') + 'subclass.') # pragma: no cover def _handle_emit(self, message): # Events with callbacks are very tricky to handle across hosts @@ -111,10 +119,24 @@ class PubSubManager(BaseManager): def _thread(self): for message in self._listen(): - if 'method' in message: - if message['method'] == 'emit': - self._handle_emit(message) - elif message['method'] == 'callback': - self._handle_callback(message) - elif message['method'] == 'close_room': - self._handle_close_room(message) + data = None + if isinstance(message, dict): + data = message + else: + if isinstance(message, six.binary_type): # pragma: no cover + try: + data = pickle.loads(message) + except: + pass + if data is None: + try: + data = json.loads(message) + except: + pass + if data and 'method' in data: + if data['method'] == 'emit': + self._handle_emit(data) + elif data['method'] == 'callback': + self._handle_callback(data) + elif data['method'] == 'close_room': + self._handle_close_room(data) diff --git a/socketio/redis_manager.py b/socketio/redis_manager.py index 9004fae..900baaf 100644 --- a/socketio/redis_manager.py +++ b/socketio/redis_manager.py @@ -1,7 +1,5 @@ -import json import pickle -import six try: import redis except ImportError: @@ -10,7 +8,7 @@ except ImportError: from .pubsub_manager import PubSubManager -class RedisManager(PubSubManager): +class RedisManager(PubSubManager): # pragma: no cover """Redis based client manager. This class implements a Redis backend for event sharing across multiple @@ -48,17 +46,5 @@ class RedisManager(PubSubManager): for message in self.pubsub.listen(): if message['channel'] == channel and \ message['type'] == 'message' and 'data' in message: - data = None - if isinstance(message['data'], six.binary_type): - try: - data = pickle.loads(message['data']) - except: - pass - if data is None: - try: - data = json.loads(message['data']) - except: - pass - if data: - yield data + yield message['data'] self.pubsub.unsubscribe(self.channel) diff --git a/tests/test_pubsub_manager.py b/tests/test_pubsub_manager.py new file mode 100644 index 0000000..9fab1d7 --- /dev/null +++ b/tests/test_pubsub_manager.py @@ -0,0 +1,209 @@ +import functools +import unittest + +import six +if six.PY3: + from unittest import mock +else: + import mock + +from socketio import base_manager +from socketio import pubsub_manager + + +class TestBaseManager(unittest.TestCase): + def setUp(self): + mock_server = mock.MagicMock() + self.pm = pubsub_manager.PubSubManager() + self.pm._publish = mock.MagicMock() + self.pm.initialize(mock_server) + + def test_default_init(self): + self.assertEqual(self.pm.channel, 'socketio') + self.assertEqual(len(self.pm.host_id), 32) + self.pm.server.start_background_task.assert_called_once_with( + self.pm._thread) + + def test_custom_init(self): + pubsub = pubsub_manager.PubSubManager(channel='foo') + self.assertEqual(pubsub.channel, 'foo') + self.assertEqual(len(pubsub.host_id), 32) + + def test_emit(self): + self.pm.emit('foo', 'bar') + self.pm._publish.assert_called_once_with( + {'method': 'emit', 'event': 'foo', 'data': 'bar', + 'namespace': '/', 'room': None, 'skip_sid': None, + 'callback': None}) + + def test_emit_with_namespace(self): + self.pm.emit('foo', 'bar', namespace='/baz') + self.pm._publish.assert_called_once_with( + {'method': 'emit', 'event': 'foo', 'data': 'bar', + 'namespace': '/baz', 'room': None, 'skip_sid': None, + 'callback': None}) + + def test_emit_with_room(self): + self.pm.emit('foo', 'bar', room='baz') + self.pm._publish.assert_called_once_with( + {'method': 'emit', 'event': 'foo', 'data': 'bar', + 'namespace': '/', 'room': 'baz', 'skip_sid': None, + 'callback': None}) + + def test_emit_with_skip_sid(self): + self.pm.emit('foo', 'bar', skip_sid='baz') + self.pm._publish.assert_called_once_with( + {'method': 'emit', 'event': 'foo', 'data': 'bar', + 'namespace': '/', 'room': None, 'skip_sid': 'baz', + 'callback': None}) + + def test_emit_with_callback(self): + with mock.patch.object(self.pm, '_generate_ack_id', + return_value='123'): + self.pm.emit('foo', 'bar', room='baz', callback='cb') + self.pm._publish.assert_called_once_with( + {'method': 'emit', 'event': 'foo', 'data': 'bar', + 'namespace': '/', 'room': 'baz', 'skip_sid': None, + 'callback': ('baz', '/', '123')}) + + def test_emit_with_callback_missing_room(self): + with mock.patch.object(self.pm, '_generate_ack_id', + return_value='123'): + self.assertRaises(ValueError, self.pm.emit, 'foo', 'bar', + callback='cb') + + def test_close_room(self): + self.pm.close_room('foo') + self.pm._publish.assert_called_once_with( + {'method': 'close_room', 'room': 'foo', 'namespace': '/'}) + + def test_close_room_with_namespace(self): + self.pm.close_room('foo', '/bar') + self.pm._publish.assert_called_once_with( + {'method': 'close_room', 'room': 'foo', 'namespace': '/bar'}) + + def test_handle_emit(self): + with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit: + self.pm._handle_emit({'event': 'foo', 'data': 'bar'}) + super_emit.assert_called_once_with('foo', 'bar', namespace=None, + room=None, skip_sid=None, + callback=None) + + def test_handle_emit_with_namespace(self): + with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit: + self.pm._handle_emit({'event': 'foo', 'data': 'bar', + 'namespace': '/baz'}) + super_emit.assert_called_once_with('foo', 'bar', namespace='/baz', + room=None, skip_sid=None, + callback=None) + + def test_handle_emiti_with_room(self): + with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit: + self.pm._handle_emit({'event': 'foo', 'data': 'bar', + 'room': 'baz'}) + super_emit.assert_called_once_with('foo', 'bar', namespace=None, + room='baz', skip_sid=None, + callback=None) + + def test_handle_emit_with_skip_sid(self): + with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit: + self.pm._handle_emit({'event': 'foo', 'data': 'bar', + 'skip_sid': '123'}) + super_emit.assert_called_once_with('foo', 'bar', namespace=None, + room=None, skip_sid='123', + callback=None) + + def test_handle_emit_with_callback(self): + host_id = self.pm.host_id + with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit: + self.pm._handle_emit({'event': 'foo', 'data': 'bar', + 'namespace': '/baz', + 'callback': ('sid', '/baz', 123)}) + self.assertEqual(super_emit.call_count, 1) + self.assertEqual(super_emit.call_args[0], ('foo', 'bar')) + self.assertEqual(super_emit.call_args[1]['namespace'], '/baz') + self.assertIsNone(super_emit.call_args[1]['room']) + self.assertIsNone(super_emit.call_args[1]['skip_sid']) + self.assertIsInstance(super_emit.call_args[1]['callback'], + functools.partial) + super_emit.call_args[1]['callback']('one', 2, 'three') + self.pm._publish.assert_called_once_with( + {'method': 'callback', 'host_id': host_id, 'sid': 'sid', + 'namespace': '/baz', 'id': 123, 'args': ('one', 2, 'three')}) + + def test_handle_callback(self): + host_id = self.pm.host_id + with mock.patch.object(self.pm, 'trigger_callback') as trigger: + self.pm._handle_callback({'method': 'callback', + 'host_id': host_id, 'sid': 'sid', + 'namespace': '/', 'id': 123, + 'args': ('one', 2)}) + trigger.assert_called_once_with('sid', '/', 123, ('one', 2)) + + def test_handle_callback_bad_host_id(self): + with mock.patch.object(self.pm, 'trigger_callback') as trigger: + self.pm._handle_callback({'method': 'callback', + 'host_id': 'bad', 'sid': 'sid', + 'namespace': '/', 'id': 123, + 'args': ('one', 2)}) + self.assertEqual(trigger.call_count, 0) + + def test_handle_callback_missing_args(self): + host_id = self.pm.host_id + with mock.patch.object(self.pm, 'trigger_callback') as trigger: + self.pm._handle_callback({'method': 'callback', + 'host_id': host_id, 'sid': 'sid', + 'namespace': '/', 'id': 123}) + self.pm._handle_callback({'method': 'callback', + 'host_id': host_id, 'sid': 'sid', + 'namespace': '/'}) + self.pm._handle_callback({'method': 'callback', + 'host_id': host_id, 'sid': 'sid'}) + self.pm._handle_callback({'method': 'callback', + 'host_id': host_id}) + self.assertEqual(trigger.call_count, 0) + + def test_handle_close_room(self): + with mock.patch.object(base_manager.BaseManager, 'close_room') \ + as super_close_room: + self.pm._handle_close_room({'method': 'close_room', + 'room': 'foo'}) + super_close_room.assert_called_once_with(room='foo', + namespace=None) + + def test_handle_close_room_with_namespace(self): + with mock.patch.object(base_manager.BaseManager, 'close_room') \ + as super_close_room: + self.pm._handle_close_room({'method': 'close_room', + 'room': 'foo', 'namespace': '/bar'}) + super_close_room.assert_called_once_with(room='foo', + namespace='/bar') + + def test_background_thread(self): + self.pm._handle_emit = mock.MagicMock() + self.pm._handle_callback = mock.MagicMock() + self.pm._handle_close_room = mock.MagicMock() + + def messages(): + import pickle + yield {'method': 'emit', 'value': 'foo'} + yield {'missing': 'method'} + yield '{"method": "callback", "value": "bar"}' + yield {'method': 'bogus'} + yield pickle.dumps({'method': 'close_room', 'value': 'baz'}) + yield 'bad json' + yield b'bad pickled' + raise KeyboardInterrupt + + self.pm._listen = mock.MagicMock(side_effect=messages) + try: + self.pm._thread() + except KeyboardInterrupt: + pass + + self.pm._handle_emit.assert_called_once_with( + {'method': 'emit', 'value': 'foo'}) + self.pm._handle_callback.assert_called_once_with( + {'method': 'callback', 'value': 'bar'}) + self.pm._handle_close_room.assert_called_once_with( + {'method': 'close_room', 'value': 'baz'}) From 8ee4cf7e7557aeb02079c8402029eec5b68c79b6 Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Sun, 13 Dec 2015 00:27:56 -0800 Subject: [PATCH 4/6] message queue documentation --- docs/index.rst | 112 +++++++++++++++++++++++++++++++++++++------------ 1 file changed, 85 insertions(+), 27 deletions(-) diff --git a/docs/index.rst b/docs/index.rst index 151e690..acb66a2 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -8,7 +8,7 @@ socketio documentation :ref:`genindex` | :ref:`modindex` | :ref:`search` -This project implements an Socket.IO server that can run standalone or +This project implements a Socket.IO server that can run standalone or integrated with a Python WSGI application. The following are some of its features: @@ -49,6 +49,10 @@ written in JavaScript. Getting Started --------------- +The Socket.IO server can be installed with pip:: + + pip install python-socketio + The following is a basic example of a Socket.IO server that uses Flask to deploy the client code to the browser:: @@ -106,8 +110,8 @@ Rooms Because Socket.IO is a bidirectional protocol, the server can send messages to any connected client at any time. To make it easy to address groups of clients, -the application can put clients into rooms, and then address messages to all -the clients in a room. +the application can put clients into rooms, and then address messages to the +entire room. When clients first connect, they are assigned to their own rooms, named with the session ID (the ``sid`` argument passed to all event handlers). The @@ -204,6 +208,65 @@ methods in the :class:`socketio.Server` class. When the ``namespace`` argument is omitted, set to ``None`` or to ``'/'``, the default namespace, representing the physical connection, is used. +Using a Message Queue +--------------------- + +The Socket.IO server owns the socket connections to all the clients, so it is +the only process that can emit events to them. A common need of larger +applications is to emit events to clients from a different process, like a +a `Celery `_ worker, or any other auxiliary +process that works in conjunction with the server. + +To enable these other processes to emit events, the server can be configured +to listen for events to emit to clients on a message queue such as +`Redis `_ or `RabbitMQ `_. +Processes that need to emit events to client then post these events to the +queue. + +Another situation in which the use of a message queue is necessary is with +high traffic applications that work with large number of clients. To support +these clients, it may be necessary to horizontally scale the Socket.IO +server by splitting the client list among multiple server processes. For this +type of installation, the server processes communicate with each other through +ta message queue. + +The message queue service needs to be installed and configured separately. By +default, the server uses `Kombu `_ +to read and write to the queue, so any message queue supported by this package +can be used. Kombu can be installed with pip:: + + pip install kombu + +To configure a Socket.IO server to connect to a message queue, the +``client_manager`` argument must be passed in the server creation. The +following example instructs the server to connect to a Redis service running +on the same host and on the default port:: + + redis = socketio.KombuManager('redis://localhost:6379/') + sio = socketio.Server(client_manager=redis) + +For a RabbitMQ queue also running on the local server, the configuration is +as follows:: + + amqp = socketio.KombuManager('amqp://guest:guest@localhost:5672//') + sio = socketio.Server(client_manager=amqp) + +The arguments passed to the ``KombuManager`` constructor are passed directly +to Kombu's `Connection object +`_. + +If multiple Sokcet.IO servers are connected to a message queue, they +automatically communicate with each other and manage a combine client list, +without any need for additional configuration. To have a process other than +the server connect to the queue to emit a message, the same ``KombuManager`` +class can be used. For example:: + + # connect to the redis queue + redis = socketio.KombuManager('redis://localhost:6379/') + + # emit an event + redis.emit('my event', data={'foo': 'bar'}, room='my room') + Deployment ---------- @@ -239,16 +302,14 @@ command to launch the application under gunicorn is shown below:: $ gunicorn -k eventlet -w 1 module:app Due to limitations in its load balancing algorithm, gunicorn can only be used -with one worker process, so the ``-w 1`` option is required. Note that a -single eventlet worker can handle a large number of concurrent clients. - -Another limitation when using gunicorn is that the WebSocket transport is not -available, because this transport it requires extensions to the WSGI standard. +with one worker process, so the ``-w`` option cannot be set to a value higher +than 1. A single eventlet worker can handle a large number of concurrent +clients, each handled by a greenlet. -Note: Eventlet provides a ``monkey_patch()`` function that replaces all the -blocking functions in the standard library with equivalent asynchronous -versions. While python-socketio does not require monkey patching, other -libraries such as database drivers are likely to require it. +Eventlet provides a ``monkey_patch()`` function that replaces all the blocking +functions in the standard library with equivalent asynchronous versions. While +python-socketio does not require monkey patching, other libraries such as +database drivers are likely to require it. Gevent ~~~~~~ @@ -293,14 +354,14 @@ Or to include WebSocket:: $ gunicorn -k geventwebsocket.gunicorn.workers.GeventWebSocketWorker -w 1 module: app Same as with eventlet, due to limitations in its load balancing algorithm, -gunicorn can only be used with one worker process, so the ``-w 1`` option is -required. Note that a single eventlet worker can handle a large number of -concurrent clients. +gunicorn can only be used with one worker process, so the ``-w`` option cannot +be higher than 1. A single gevent worker can handle a large number of +concurrent clients through the use of greenlets. -Note: Gevent provides a ``monkey_patch()`` function that replaces all the -blocking functions in the standard library with equivalent asynchronous -versions. While python-socketio does not require monkey patching, other -libraries such as database drivers are likely to require it. +Gevent provides a ``monkey_patch()`` function that replaces all the blocking +functions in the standard library with equivalent asynchronous versions. While +python-socketio does not require monkey patching, other libraries such as +database drivers are likely to require it. Standard Threading Library ~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -351,14 +412,11 @@ multiple servers), the following conditions must be met: using eventlet, gevent, or standard threads. Worker processes that only handle one request at a time are not supported. - The load balancer must be configured to always forward requests from a - client to the same process. Load balancers call this *sticky sessions*, or - *session affinity*. - -A limitation in the current release of the Socket.IO server is that because -the clients are randomly assigned to different server processes, any form of -broadcasting is not supported. A storage backend that enables multiple -processes to share information about clients is currently in development to -address this important limitation. + client to the same worker process. Load balancers call this *sticky + sessions*, or *session affinity*. +- The worker processes communicate with each other through a message queue, + which must be installed and configured. See the section on using message + queues above for instructions. API Reference ------------- From 6ae89688d72ec7f570a0b04a1bbf0483865bb5b5 Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Sun, 13 Dec 2015 16:35:52 -0800 Subject: [PATCH 5/6] do not allow callbacks outside of a server context --- socketio/pubsub_manager.py | 3 +++ tests/test_pubsub_manager.py | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/socketio/pubsub_manager.py b/socketio/pubsub_manager.py index 48ca09f..c454540 100644 --- a/socketio/pubsub_manager.py +++ b/socketio/pubsub_manager.py @@ -46,6 +46,9 @@ class PubSubManager(BaseManager): """ namespace = namespace or '/' if callback is not None: + if self.server is None: + raise RuntimeError('Callbacks can only be issued from the ' + 'context of a server.') if room is None: raise ValueError('Cannot use callback without a room set.') id = self._generate_ack_id(room, namespace, callback) diff --git a/tests/test_pubsub_manager.py b/tests/test_pubsub_manager.py index 9fab1d7..0345eb2 100644 --- a/tests/test_pubsub_manager.py +++ b/tests/test_pubsub_manager.py @@ -66,6 +66,11 @@ class TestBaseManager(unittest.TestCase): 'namespace': '/', 'room': 'baz', 'skip_sid': None, 'callback': ('baz', '/', '123')}) + def test_emit_with_callback_without_server(self): + standalone_pm = pubsub_manager.PubSubManager() + self.assertRaises(RuntimeError, standalone_pm.emit, 'foo', 'bar', + callback='cb') + def test_emit_with_callback_missing_room(self): with mock.patch.object(self.pm, '_generate_ack_id', return_value='123'): From e56ff807f4faabb4e7ef339fda79de5b8a201b26 Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Sun, 13 Dec 2015 16:36:06 -0800 Subject: [PATCH 6/6] documentation updates --- README.rst | 2 ++ docs/index.rst | 15 +++++++++------ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/README.rst b/README.rst index 6bae2e4..faaf4ec 100644 --- a/README.rst +++ b/README.rst @@ -27,6 +27,8 @@ Features assigned to "rooms". - Optional support for multiple servers, connected through a messaging queue such as Redis or RabbitMQ. +- Send messages to clients from external processes, such as Celery workers or + auxiliary scripts. - Event-based architecture implemented with decorators that hides the details of the protocol. - Support for HTTP long-polling and WebSocket transports. diff --git a/docs/index.rst b/docs/index.rst index acb66a2..dbb0e63 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -30,6 +30,8 @@ features: assigned to "rooms". - Optional support for multiple servers, connected through a messaging queue such as Redis or RabbitMQ. +- Send messages to clients from external processes, such as Celery workers or + auxiliary scripts. - Event-based architecture implemented with decorators that hides the details of the protocol. - Support for HTTP long-polling and WebSocket transports. @@ -212,13 +214,14 @@ Using a Message Queue --------------------- The Socket.IO server owns the socket connections to all the clients, so it is -the only process that can emit events to them. A common need of larger -applications is to emit events to clients from a different process, like a -a `Celery `_ worker, or any other auxiliary -process that works in conjunction with the server. +the only process that can emit events to them. Unfortunately this becomes a +limitation for many applications, as a common need is to emit events to +clients from a different process, like a +`Celery `_ worker, or any other auxiliary +process or script that works in conjunction with the server. To enable these other processes to emit events, the server can be configured -to listen for events to emit to clients on a message queue such as +to listen for externally issued events on a message queue such as `Redis `_ or `RabbitMQ `_. Processes that need to emit events to client then post these events to the queue. @@ -232,7 +235,7 @@ ta message queue. The message queue service needs to be installed and configured separately. By default, the server uses `Kombu `_ -to read and write to the queue, so any message queue supported by this package +to access the message queue, so any message queue supported by this package can be used. Kombu can be installed with pip:: pip install kombu