diff --git a/README.rst b/README.rst index 6ee835d..6bae2e4 100644 --- a/README.rst +++ b/README.rst @@ -9,20 +9,31 @@ Python implementation of the `Socket.IO`_ realtime server. Features -------- -- Fully compatible with the Javascript `socket.io-client`_ library. -- Compatible with Python 2.7 and Python 3.3+. -- Based on `Eventlet`_, enabling large number of clients even on modest - hardware. -- Includes a WSGI middleware that integrates Socket.IO traffic with - standard WSGI applications. -- Uses an event-based architecture implemented with decorators that - hides the details of the protocol. -- Implements HTTP long-polling and WebSocket transports. -- Supports XHR2 and XHR browsers as clients. -- Supports text and binary messages. -- Supports gzip and deflate HTTP compression. -- Configurable CORS responses to avoid cross-origin problems with - browsers. +- Fully compatible with the + `Javascript `_, + `Swift `_, + `C++ `_ and + `Java `_ official + Socket.IO clients, plus any third party clients that comply with the + Socket.IO specification. +- Compatible with Python 2.7 and Python 3.3+. +- Supports large number of clients even on modest hardware when used with an + asynchronous server based on `eventlet `_ or + `gevent `_. For development and testing, any WSGI + complaint multi-threaded server can be used. +- Includes a WSGI middleware that integrates Socket.IO traffic with standard + WSGI applications. +- Broadcasting of messages to all connected clients, or to subsets of them + assigned to "rooms". +- Optional support for multiple servers, connected through a messaging queue + such as Redis or RabbitMQ. +- Event-based architecture implemented with decorators that hides the details + of the protocol. +- Support for HTTP long-polling and WebSocket transports. +- Support for XHR2 and XHR browsers. +- Support for text and binary messages. +- Support for gzip and deflate HTTP compression. +- Configurable CORS responses, to avoid cross-origin problems with browsers. Example ------- @@ -74,4 +85,4 @@ Resources .. _socket.io-client: https://github.com/Automattic/socket.io-client .. _Eventlet: http://eventlet.net/ .. _Documentation: http://pythonhosted.org/python-socketio -.. _PyPI: https://pypi.python.org/pypi/python-socketio \ No newline at end of file +.. _PyPI: https://pypi.python.org/pypi/python-socketio diff --git a/docs/index.rst b/docs/index.rst index 9254da6..151e690 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -12,9 +12,13 @@ This project implements an Socket.IO server that can run standalone or integrated with a Python WSGI application. The following are some of its features: -- Fully compatible with the Javascript - `socket.io-client `_ library, - versions 1.3.5 and up. +- Fully compatible with the + `Javascript `_, + `Swift `_, + `C++ `_ and + `Java `_ official + Socket.IO clients, plus any third party clients that comply with the + Socket.IO specification. - Compatible with Python 2.7 and Python 3.3+. - Supports large number of clients even on modest hardware when used with an asynchronous server based on `eventlet `_ or @@ -24,20 +28,22 @@ features: WSGI applications. - Broadcasting of messages to all connected clients, or to subsets of them assigned to "rooms". -- Uses an event-based architecture implemented with decorators that hides the - details of the protocol. +- Optional support for multiple servers, connected through a messaging queue + such as Redis or RabbitMQ. +- Event-based architecture implemented with decorators that hides the details + of the protocol. - Support for HTTP long-polling and WebSocket transports. - Support for XHR2 and XHR browsers. - Support for text and binary messages. - Support for gzip and deflate HTTP compression. -- Configurable CORS responses to avoid cross-origin problems with browsers. +- Configurable CORS responses, to avoid cross-origin problems with browsers. What is Socket.IO? ------------------ Socket.IO is a transport protocol that enables real-time bidirectional event-based communication between clients (typically web browsers) and a -server. The official implementations of the client and server components are +server. The original implementations of the client and server components are written in JavaScript. Getting Started @@ -357,13 +363,16 @@ address this important limitation. API Reference ------------- -.. toctree:: - :maxdepth: 2 - .. module:: socketio - .. autoclass:: Middleware :members: - .. autoclass:: Server :members: +.. autoclass:: BaseManager + :members: +.. autoclass:: PubSubManager + :members: +.. autoclass:: KombuManager + :members: +.. autoclass:: RedisManager + :members: diff --git a/socketio/__init__.py b/socketio/__init__.py index 6e95349..be5882f 100644 --- a/socketio/__init__.py +++ b/socketio/__init__.py @@ -1,4 +1,9 @@ from .middleware import Middleware +from .base_manager import BaseManager +from .pubsub_manager import PubSubManager +from .kombu_manager import KombuManager +from .redis_manager import RedisManager from .server import Server -__all__ = [Middleware, Server] +__all__ = [Middleware, Server, BaseManager, PubSubManager, KombuManager, + RedisManager] diff --git a/socketio/base_manager.py b/socketio/base_manager.py index 7d13f1f..19e22f7 100644 --- a/socketio/base_manager.py +++ b/socketio/base_manager.py @@ -12,12 +12,15 @@ class BaseManager(object): services. More sophisticated storage backends can be implemented by subclasses. """ - def __init__(self, server): - self.server = server + def __init__(self): + self.server = None self.rooms = {} self.pending_removals = [] self.callbacks = {} + def initialize(self, server): + self.server = server + def get_namespaces(self): """Return an iterable with the active namespace names.""" return six.iterkeys(self.rooms) @@ -69,7 +72,7 @@ class BaseManager(object): except KeyError: pass - def close_room(self, namespace, room): + def close_room(self, room, namespace): """Remove all participants from a room.""" try: for sid in self.get_participants(namespace, room): diff --git a/socketio/kombu_manager.py b/socketio/kombu_manager.py new file mode 100644 index 0000000..837f054 --- /dev/null +++ b/socketio/kombu_manager.py @@ -0,0 +1,64 @@ +import json +import pickle + +import six +try: + import kombu +except ImportError: + kombu = None + +from .pubsub_manager import PubSubManager + + +class KombuManager(PubSubManager): + """Client manager that uses kombu for inter-process messaging. + + This class implements a client manager backend for event sharing across + multiple processes, using RabbitMQ, Redis or any other messaging mechanism + supported by `kombu `_. + + To use a kombu backend, initialize the :class:`Server` instance as + follows:: + + url = 'amqp://user:password@hostname:port//' + server = socketio.Server(client_manager=socketio.KombuManager(url)) + + :param url: The connection URL for the backend messaging queue. Example + connection URLs are ``'amqp://guest:guest@localhost:5672//'`` + and ``'redis://localhost:6379/'`` for RabbitMQ and Redis + respectively. Consult the `kombu documentation + `_ for more on how to construct + connection URLs. + :param channel: The channel name on which the server sends and receives + notifications. Must be the same in all the servers. + """ + name = 'kombu' + + def __init__(self, url='amqp://guest:guest@localhost:5672//', + channel='socketio'): + if kombu is None: + raise RuntimeError('Kombu package is not installed ' + '(Run "pip install kombu" in your ' + 'virtualenv).') + self.kombu = kombu.Connection(url) + self.queue = self.kombu.SimpleQueue(channel) + super(KombuManager, self).__init__(channel=channel) + + def _publish(self, data): + return self.queue.put(pickle.dumps(data)) + + def _listen(self): + listen_queue = self.kombu.SimpleQueue(self.channel) + while True: + message = listen_queue.get(block=True) + message.ack() + data = None + if isinstance(message.payload, six.binary_type): + try: + data = pickle.loads(message.payload) + except pickle.PickleError: + pass + if data is None: + data = json.loads(message.payload) + yield data diff --git a/socketio/pubsub_manager.py b/socketio/pubsub_manager.py new file mode 100644 index 0000000..01a03ec --- /dev/null +++ b/socketio/pubsub_manager.py @@ -0,0 +1,77 @@ +from .base_manager import BaseManager + + +class PubSubManager(BaseManager): + """Manage a client list attached to a pub/sub backend. + + This is a base class that enables multiple servers to share the list of + clients, with the servers communicating events through a pub/sub backend. + The use of a pub/sub backend also allows any client connected to the + backend to emit events addressed to Socket.IO clients. + + The actual backends must be implemented by subclasses, this class only + provides a pub/sub generic framework. + + :param channel: The channel name on which the server sends and receives + notifications. + """ + def __init__(self, channel='socketio'): + super(PubSubManager, self).__init__() + self.channel = channel + + def initialize(self, server): + super(PubSubManager, self).initialize(server) + self.thread = self.server.start_background_task(self._thread) + self.server.logger.info(self.name + ' backend initialized.') + + def emit(self, event, data, namespace=None, room=None, skip_sid=None, + callback=None): + """Emit a message to a single client, a room, or all the clients + connected to the namespace. + + This method takes care or propagating the message to all the servers + that are connected through the message queue. + + The parameters are the same as in :meth:`.Server.emit`. + """ + self._publish({'method': 'emit', 'event': event, 'data': data, + 'namespace': namespace or '/', 'room': room, + 'skip_sid': skip_sid, 'callback': callback}) + + def close_room(self, room, namespace=None): + self._publish({'method': 'close_room', 'room': room, + 'namespace': namespace or '/'}) + + def _publish(self, data): + """Publish a message on the Socket.IO channel. + + This method needs to be implemented by the different subclasses that + support pub/sub backends. + """ + raise NotImplementedError('This method must be implemented in a ' + 'subclass.') + + def _listen(self): + """Return the next message published on the Socket.IO channel, + blocking until a message is available. + + This method needs to be implemented by the different subclasses that + support pub/sub backends. + """ + raise NotImplementedError('This method must be implemented in a ' + 'subclass.') + + def _thread(self): + for message in self._listen(): + if 'method' in message: + if message['method'] == 'emit': + super(PubSubManager, self).emit( + message['event'], message['data'], + namespace=message.get('namespace'), + room=message.get('room'), + skip_sid=message.get('skip_sid'), + callback=message.get('callback')) + elif message['method'] == 'close_room': + super(PubSubManager, self).close_room( + room=message.get('room'), + namespace=message.get('namespace')) diff --git a/socketio/redis_manager.py b/socketio/redis_manager.py new file mode 100644 index 0000000..735bbcf --- /dev/null +++ b/socketio/redis_manager.py @@ -0,0 +1,60 @@ +import json +import pickle + +import six +try: + import redis +except ImportError: + redis = None + +from .pubsub_manager import PubSubManager + + +class RedisManager(PubSubManager): + """Redis based client manager. + + This class implements a Redis backend for event sharing across multiple + processes. Only kept here as one more example of how to build a custom + backend, since the kombu backend is perfectly adequate to support a Redis + message queue. + + To use a Redis backend, initialize the :class:`Server` instance as + follows:: + + url = 'redis://hostname:port/0' + server = socketio.Server(client_manager=socketio.RedisManager(url)) + + :param url: The connection URL for the Redis server. + :param channel: The channel name on which the server sends and receives + notifications. Must be the same in all the servers. + """ + name = 'redis' + + def __init__(self, url='redis://localhost:6379/0', channel='socketio'): + if redis is None: + raise RuntimeError('Redis package is not installed ' + '(Run "pip install redis" in your ' + 'virtualenv).') + self.redis = redis.Redis.from_url(url) + self.pubsub = self.redis.pubsub() + super(RedisManager, self).__init__(channel=channel) + + def _publish(self, data): + return self.redis.publish(self.channel, pickle.dumps(data)) + + def _listen(self): + channel = self.channel.encode('utf-8') + self.pubsub.subscribe(self.channel) + for message in self.pubsub.listen(): + if message['channel'] == channel and \ + message['type'] == 'message' and 'data' in message: + data = None + if isinstance(message['data'], six.binary_type): + try: + data = pickle.loads(message['data']) + except pickle.PickleError: + pass + if data is None: + data = json.loads(message['data']) + yield data + self.pubsub.unsubscribe(self.channel) diff --git a/socketio/server.py b/socketio/server.py index dcb1bae..0f22f98 100644 --- a/socketio/server.py +++ b/socketio/server.py @@ -14,9 +14,9 @@ class Server(object): for websocket and long-polling transports. :param client_manager: The client manager instance that will manage the - client list. By default the client list is stored - in an in-memory structure, which prevents the use - of multiple worker processes. + client list. When this is omitted, the client list + is stored in an in-memory structure, so the use of + multiple connected servers is not possible. :param logger: To enable logging set to ``True`` or pass a logger object to use. To disable logging set to ``False``. :param binary: ``True`` to support binary payloads, ``False`` to treat all @@ -62,9 +62,6 @@ class Server(object): """ def __init__(self, client_manager=None, logger=False, binary=False, json=None, **kwargs): - if client_manager is None: - client_manager = base_manager.BaseManager(self) - self.manager = client_manager engineio_options = kwargs engineio_logger = engineio_options.pop('engineio_logger', None) if engineio_logger is not None: @@ -97,6 +94,11 @@ class Server(object): self.logger.setLevel(logging.ERROR) self.logger.addHandler(logging.StreamHandler()) + if client_manager is None: + client_manager = base_manager.BaseManager() + client_manager.initialize(self) + self.manager = client_manager + def on(self, event, handler=None, namespace=None): """Register an event handler. @@ -248,7 +250,7 @@ class Server(object): """ namespace = namespace or '/' self.logger.info('room %s is closing [%s]', room, namespace) - self.manager.close_room(namespace, room) + self.manager.close_room(room, namespace) def rooms(self, sid, namespace=None): """Return the rooms a client is in. @@ -300,6 +302,9 @@ class Server(object): """ return self.eio.handle_request(environ, start_response) + def start_background_task(self, target, *args, **kwargs): + self.eio.start_background_task(target, *args, **kwargs) + def _emit_internal(self, sid, event, data, namespace=None, id=None): """Send a message to a client.""" if six.PY2 and not self.binary: diff --git a/tests/test_base_manager.py b/tests/test_base_manager.py index 1cdfa6b..0badf69 100644 --- a/tests/test_base_manager.py +++ b/tests/test_base_manager.py @@ -12,7 +12,8 @@ from socketio import base_manager class TestBaseManager(unittest.TestCase): def setUp(self): mock_server = mock.MagicMock() - self.bm = base_manager.BaseManager(mock_server) + self.bm = base_manager.BaseManager() + self.bm.initialize(mock_server) def test_connect(self): self.bm.connect('123', '/foo') @@ -142,11 +143,11 @@ class TestBaseManager(unittest.TestCase): self.bm.connect('789', '/foo') self.bm.enter_room('123', '/foo', 'bar') self.bm.enter_room('123', '/foo', 'bar') - self.bm.close_room('/foo', 'bar') + self.bm.close_room('bar', '/foo') self.assertNotIn('bar', self.bm.rooms['/foo']) def test_close_invalid_room(self): - self.bm.close_room('/foo', 'bar') + self.bm.close_room('bar', '/foo') def test_rooms(self): self.bm.connect('123', '/foo') diff --git a/tests/test_server.py b/tests/test_server.py index 142e207..c7a7a6d 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -92,13 +92,13 @@ class TestServer(unittest.TestCase): mgr = mock.MagicMock() s = server.Server(client_manager=mgr) s.close_room('room', namespace='/foo') - s.manager.close_room.assert_called_once_with('/foo', 'room') + s.manager.close_room.assert_called_once_with('room', '/foo') def test_close_room_default_namespace(self, eio): mgr = mock.MagicMock() s = server.Server(client_manager=mgr) s.close_room('room') - s.manager.close_room.assert_called_once_with('/', 'room') + s.manager.close_room.assert_called_once_with('room', '/') def test_rooms(self, eio): mgr = mock.MagicMock() @@ -397,3 +397,9 @@ class TestServer(unittest.TestCase): # restore the default JSON module packet.Packet.json = json + + def test_start_background_task(self, eio): + s = server.Server() + s.start_background_task('foo', 'bar', baz='baz') + s.eio.start_background_task.assert_called_once_with('foo', 'bar', + baz='baz')