From 47620bbebd92a1b388df72a88c0ca35cdb530073 Mon Sep 17 00:00:00 2001
From: Miguel Grinberg <miguelgrinberg50@gmail.com>
Date: Mon, 23 Nov 2015 23:07:48 -0800
Subject: [PATCH] initial implementation of inter-process communication

---
 README.rst                 | 41 ++++++++++++--------
 docs/index.rst             | 33 ++++++++++------
 socketio/__init__.py       |  7 +++-
 socketio/base_manager.py   |  9 +++--
 socketio/kombu_manager.py  | 64 +++++++++++++++++++++++++++++++
 socketio/pubsub_manager.py | 77 ++++++++++++++++++++++++++++++++++++++
 socketio/redis_manager.py  | 60 +++++++++++++++++++++++++++++
 socketio/server.py         | 19 ++++++----
 tests/test_base_manager.py |  7 ++--
 tests/test_server.py       | 10 ++++-
 10 files changed, 284 insertions(+), 43 deletions(-)
 create mode 100644 socketio/kombu_manager.py
 create mode 100644 socketio/pubsub_manager.py
 create mode 100644 socketio/redis_manager.py

diff --git a/README.rst b/README.rst
index 6ee835d..6bae2e4 100644
--- a/README.rst
+++ b/README.rst
@@ -9,20 +9,31 @@ Python implementation of the `Socket.IO`_ realtime server.
 Features
 --------
 
--  Fully compatible with the Javascript `socket.io-client`_ library.
--  Compatible with Python 2.7 and Python 3.3+.
--  Based on `Eventlet`_, enabling large number of clients even on modest
-   hardware.
--  Includes a WSGI middleware that integrates Socket.IO traffic with
-   standard WSGI applications.
--  Uses an event-based architecture implemented with decorators that
-   hides the details of the protocol.
--  Implements HTTP long-polling and WebSocket transports.
--  Supports XHR2 and XHR browsers as clients.
--  Supports text and binary messages.
--  Supports gzip and deflate HTTP compression.
--  Configurable CORS responses to avoid cross-origin problems with
-   browsers.
+- Fully compatible with the 
+  `Javascript <https://github.com/Automattic/socket.io-client>`_,
+  `Swift <https://github.com/socketio/socket.io-client-swift>`_,
+  `C++ <https://github.com/socketio/socket.io-client-cpp>`_ and
+  `Java <https://github.com/socketio/socket.io-client-java>`_ official
+  Socket.IO clients, plus any third party clients that comply with the
+  Socket.IO specification.
+- Compatible with Python 2.7 and Python 3.3+.
+- Supports large number of clients even on modest hardware when used with an
+  asynchronous server based on `eventlet <http://eventlet.net/>`_ or
+  `gevent <http://gevent.org/>`_. For development and testing, any WSGI
+  complaint multi-threaded server can be used.
+- Includes a WSGI middleware that integrates Socket.IO traffic with standard
+  WSGI applications.
+- Broadcasting of messages to all connected clients, or to subsets of them
+  assigned to "rooms".
+- Optional support for multiple servers, connected through a messaging queue
+  such as Redis or RabbitMQ.
+- Event-based architecture implemented with decorators that hides the details
+  of the protocol.
+- Support for HTTP long-polling and WebSocket transports.
+- Support for XHR2 and XHR browsers.
+- Support for text and binary messages.
+- Support for gzip and deflate HTTP compression.
+- Configurable CORS responses, to avoid cross-origin problems with browsers.
 
 Example
 -------
@@ -74,4 +85,4 @@ Resources
 .. _socket.io-client: https://github.com/Automattic/socket.io-client
 .. _Eventlet: http://eventlet.net/
 .. _Documentation: http://pythonhosted.org/python-socketio
-.. _PyPI: https://pypi.python.org/pypi/python-socketio
\ No newline at end of file
+.. _PyPI: https://pypi.python.org/pypi/python-socketio
diff --git a/docs/index.rst b/docs/index.rst
index 9254da6..151e690 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -12,9 +12,13 @@ This project implements an Socket.IO server that can run standalone or
 integrated with a Python WSGI application. The following are some of its
 features:
 
-- Fully compatible with the Javascript
-  `socket.io-client <https://github.com/Automattic/socket.io-client>`_ library,
-  versions 1.3.5 and up.
+- Fully compatible with the 
+  `Javascript <https://github.com/Automattic/socket.io-client>`_,
+  `Swift <https://github.com/socketio/socket.io-client-swift>`_,
+  `C++ <https://github.com/socketio/socket.io-client-cpp>`_ and
+  `Java <https://github.com/socketio/socket.io-client-java>`_ official
+  Socket.IO clients, plus any third party clients that comply with the
+  Socket.IO specification.
 - Compatible with Python 2.7 and Python 3.3+.
 - Supports large number of clients even on modest hardware when used with an
   asynchronous server based on `eventlet <http://eventlet.net/>`_ or
@@ -24,20 +28,22 @@ features:
   WSGI applications.
 - Broadcasting of messages to all connected clients, or to subsets of them
   assigned to "rooms".
-- Uses an event-based architecture implemented with decorators that hides the
-  details of the protocol.
+- Optional support for multiple servers, connected through a messaging queue
+  such as Redis or RabbitMQ.
+- Event-based architecture implemented with decorators that hides the details
+  of the protocol.
 - Support for HTTP long-polling and WebSocket transports.
 - Support for XHR2 and XHR browsers.
 - Support for text and binary messages.
 - Support for gzip and deflate HTTP compression.
-- Configurable CORS responses to avoid cross-origin problems with browsers.
+- Configurable CORS responses, to avoid cross-origin problems with browsers.
 
 What is Socket.IO?
 ------------------
 
 Socket.IO is a transport protocol that enables real-time bidirectional
 event-based communication between clients (typically web browsers) and a
-server. The official implementations of the client and server components are
+server. The original implementations of the client and server components are
 written in JavaScript.
 
 Getting Started
@@ -357,13 +363,16 @@ address this important limitation.
 API Reference
 -------------
 
-.. toctree::
-   :maxdepth: 2
-
 .. module:: socketio
-
 .. autoclass:: Middleware
    :members:
-
 .. autoclass:: Server
    :members:
+.. autoclass:: BaseManager
+   :members:
+.. autoclass:: PubSubManager
+   :members:
+.. autoclass:: KombuManager
+   :members:
+.. autoclass:: RedisManager
+   :members:
diff --git a/socketio/__init__.py b/socketio/__init__.py
index 6e95349..be5882f 100644
--- a/socketio/__init__.py
+++ b/socketio/__init__.py
@@ -1,4 +1,9 @@
 from .middleware import Middleware
+from .base_manager import BaseManager
+from .pubsub_manager import PubSubManager
+from .kombu_manager import KombuManager
+from .redis_manager import RedisManager
 from .server import Server
 
-__all__ = [Middleware, Server]
+__all__ = [Middleware, Server, BaseManager, PubSubManager, KombuManager,
+           RedisManager]
diff --git a/socketio/base_manager.py b/socketio/base_manager.py
index 7d13f1f..19e22f7 100644
--- a/socketio/base_manager.py
+++ b/socketio/base_manager.py
@@ -12,12 +12,15 @@ class BaseManager(object):
     services. More sophisticated storage backends can be implemented by
     subclasses.
     """
-    def __init__(self, server):
-        self.server = server
+    def __init__(self):
+        self.server = None
         self.rooms = {}
         self.pending_removals = []
         self.callbacks = {}
 
+    def initialize(self, server):
+        self.server = server
+
     def get_namespaces(self):
         """Return an iterable with the active namespace names."""
         return six.iterkeys(self.rooms)
@@ -69,7 +72,7 @@ class BaseManager(object):
         except KeyError:
             pass
 
-    def close_room(self, namespace, room):
+    def close_room(self, room, namespace):
         """Remove all participants from a room."""
         try:
             for sid in self.get_participants(namespace, room):
diff --git a/socketio/kombu_manager.py b/socketio/kombu_manager.py
new file mode 100644
index 0000000..837f054
--- /dev/null
+++ b/socketio/kombu_manager.py
@@ -0,0 +1,64 @@
+import json
+import pickle
+
+import six
+try:
+    import kombu
+except ImportError:
+    kombu = None
+
+from .pubsub_manager import PubSubManager
+
+
+class KombuManager(PubSubManager):
+    """Client manager that uses kombu for inter-process messaging.
+
+    This class implements a client manager backend for event sharing across
+    multiple processes, using RabbitMQ, Redis or any other messaging mechanism
+    supported by `kombu <http://kombu.readthedocs.org/en/latest/>`_.
+
+    To use a kombu backend, initialize the :class:`Server` instance as
+    follows::
+
+        url = 'amqp://user:password@hostname:port//'
+        server = socketio.Server(client_manager=socketio.KombuManager(url))
+
+    :param url: The connection URL for the backend messaging queue. Example
+                connection URLs are ``'amqp://guest:guest@localhost:5672//'``
+                and ``'redis://localhost:6379/'`` for RabbitMQ and Redis
+                respectively. Consult the `kombu documentation
+                <http://kombu.readthedocs.org/en/latest/userguide\
+                /connections.html#urls>`_ for more on how to construct
+                connection URLs.
+    :param channel: The channel name on which the server sends and receives
+                    notifications. Must be the same in all the servers.
+    """
+    name = 'kombu'
+
+    def __init__(self, url='amqp://guest:guest@localhost:5672//',
+                 channel='socketio'):
+        if kombu is None:
+            raise RuntimeError('Kombu package is not installed '
+                               '(Run "pip install kombu" in your '
+                               'virtualenv).')
+        self.kombu = kombu.Connection(url)
+        self.queue = self.kombu.SimpleQueue(channel)
+        super(KombuManager, self).__init__(channel=channel)
+
+    def _publish(self, data):
+        return self.queue.put(pickle.dumps(data))
+
+    def _listen(self):
+        listen_queue = self.kombu.SimpleQueue(self.channel)
+        while True:
+            message = listen_queue.get(block=True)
+            message.ack()
+            data = None
+            if isinstance(message.payload, six.binary_type):
+                try:
+                    data = pickle.loads(message.payload)
+                except pickle.PickleError:
+                    pass
+            if data is None:
+                data = json.loads(message.payload)
+            yield data
diff --git a/socketio/pubsub_manager.py b/socketio/pubsub_manager.py
new file mode 100644
index 0000000..01a03ec
--- /dev/null
+++ b/socketio/pubsub_manager.py
@@ -0,0 +1,77 @@
+from .base_manager import BaseManager
+
+
+class PubSubManager(BaseManager):
+    """Manage a client list attached to a pub/sub backend.
+
+    This is a base class that enables multiple servers to share the list of
+    clients, with the servers communicating events through a pub/sub backend.
+    The use of a pub/sub backend also allows any client connected to the
+    backend to emit events addressed to Socket.IO clients.
+
+    The actual backends must be implemented by subclasses, this class only
+    provides a pub/sub generic framework.
+
+    :param channel: The channel name on which the server sends and receives
+                    notifications.
+    """
+    def __init__(self, channel='socketio'):
+        super(PubSubManager, self).__init__()
+        self.channel = channel
+
+    def initialize(self, server):
+        super(PubSubManager, self).initialize(server)
+        self.thread = self.server.start_background_task(self._thread)
+        self.server.logger.info(self.name + ' backend initialized.')
+
+    def emit(self, event, data, namespace=None, room=None, skip_sid=None,
+             callback=None):
+        """Emit a message to a single client, a room, or all the clients
+        connected to the namespace.
+
+        This method takes care or propagating the message to all the servers
+        that are connected through the message queue.
+
+        The parameters are the same as in :meth:`.Server.emit`.
+        """
+        self._publish({'method': 'emit', 'event': event, 'data': data,
+                       'namespace': namespace or '/', 'room': room,
+                       'skip_sid': skip_sid, 'callback': callback})
+
+    def close_room(self, room, namespace=None):
+        self._publish({'method': 'close_room', 'room': room,
+                       'namespace': namespace or '/'})
+
+    def _publish(self, data):
+        """Publish a message on the Socket.IO channel.
+
+        This method needs to be implemented by the different subclasses that
+        support pub/sub backends.
+        """
+        raise NotImplementedError('This method must be implemented in a '
+                                  'subclass.')
+
+    def _listen(self):
+        """Return the next message published on the Socket.IO channel,
+        blocking until a message is available.
+
+        This method needs to be implemented by the different subclasses that
+        support pub/sub backends.
+        """
+        raise NotImplementedError('This method must be implemented in a '
+                                  'subclass.')
+
+    def _thread(self):
+        for message in self._listen():
+            if 'method' in message:
+                if message['method'] == 'emit':
+                    super(PubSubManager, self).emit(
+                        message['event'], message['data'],
+                        namespace=message.get('namespace'),
+                        room=message.get('room'),
+                        skip_sid=message.get('skip_sid'),
+                        callback=message.get('callback'))
+                elif message['method'] == 'close_room':
+                    super(PubSubManager, self).close_room(
+                        room=message.get('room'),
+                        namespace=message.get('namespace'))
diff --git a/socketio/redis_manager.py b/socketio/redis_manager.py
new file mode 100644
index 0000000..735bbcf
--- /dev/null
+++ b/socketio/redis_manager.py
@@ -0,0 +1,60 @@
+import json
+import pickle
+
+import six
+try:
+    import redis
+except ImportError:
+    redis = None
+
+from .pubsub_manager import PubSubManager
+
+
+class RedisManager(PubSubManager):
+    """Redis based client manager.
+
+    This class implements a Redis backend for event sharing across multiple
+    processes. Only kept here as one more example of how to build a custom
+    backend, since the kombu backend is perfectly adequate to support a Redis
+    message queue.
+
+    To use a Redis backend, initialize the :class:`Server` instance as
+    follows::
+
+        url = 'redis://hostname:port/0'
+        server = socketio.Server(client_manager=socketio.RedisManager(url))
+
+    :param url: The connection URL for the Redis server.
+    :param channel: The channel name on which the server sends and receives
+                    notifications. Must be the same in all the servers.
+    """
+    name = 'redis'
+
+    def __init__(self, url='redis://localhost:6379/0', channel='socketio'):
+        if redis is None:
+            raise RuntimeError('Redis package is not installed '
+                               '(Run "pip install redis" in your '
+                               'virtualenv).')
+        self.redis = redis.Redis.from_url(url)
+        self.pubsub = self.redis.pubsub()
+        super(RedisManager, self).__init__(channel=channel)
+
+    def _publish(self, data):
+        return self.redis.publish(self.channel, pickle.dumps(data))
+
+    def _listen(self):
+        channel = self.channel.encode('utf-8')
+        self.pubsub.subscribe(self.channel)
+        for message in self.pubsub.listen():
+            if message['channel'] == channel and \
+                    message['type'] == 'message' and 'data' in message:
+                data = None
+                if isinstance(message['data'], six.binary_type):
+                    try:
+                        data = pickle.loads(message['data'])
+                    except pickle.PickleError:
+                        pass
+                if data is None:
+                    data = json.loads(message['data'])
+                yield data
+        self.pubsub.unsubscribe(self.channel)
diff --git a/socketio/server.py b/socketio/server.py
index dcb1bae..0f22f98 100644
--- a/socketio/server.py
+++ b/socketio/server.py
@@ -14,9 +14,9 @@ class Server(object):
     for websocket and long-polling transports.
 
     :param client_manager: The client manager instance that will manage the
-                           client list. By default the client list is stored
-                           in an in-memory structure, which prevents the use
-                           of multiple worker processes.
+                           client list. When this is omitted, the client list
+                           is stored in an in-memory structure, so the use of
+                           multiple connected servers is not possible.
     :param logger: To enable logging set to ``True`` or pass a logger object to
                    use. To disable logging set to ``False``.
     :param binary: ``True`` to support binary payloads, ``False`` to treat all
@@ -62,9 +62,6 @@ class Server(object):
     """
     def __init__(self, client_manager=None, logger=False, binary=False,
                  json=None, **kwargs):
-        if client_manager is None:
-            client_manager = base_manager.BaseManager(self)
-        self.manager = client_manager
         engineio_options = kwargs
         engineio_logger = engineio_options.pop('engineio_logger', None)
         if engineio_logger is not None:
@@ -97,6 +94,11 @@ class Server(object):
                     self.logger.setLevel(logging.ERROR)
                 self.logger.addHandler(logging.StreamHandler())
 
+        if client_manager is None:
+            client_manager = base_manager.BaseManager()
+        client_manager.initialize(self)
+        self.manager = client_manager
+
     def on(self, event, handler=None, namespace=None):
         """Register an event handler.
 
@@ -248,7 +250,7 @@ class Server(object):
         """
         namespace = namespace or '/'
         self.logger.info('room %s is closing [%s]', room, namespace)
-        self.manager.close_room(namespace, room)
+        self.manager.close_room(room, namespace)
 
     def rooms(self, sid, namespace=None):
         """Return the rooms a client is in.
@@ -300,6 +302,9 @@ class Server(object):
         """
         return self.eio.handle_request(environ, start_response)
 
+    def start_background_task(self, target, *args, **kwargs):
+        self.eio.start_background_task(target, *args, **kwargs)
+
     def _emit_internal(self, sid, event, data, namespace=None, id=None):
         """Send a message to a client."""
         if six.PY2 and not self.binary:
diff --git a/tests/test_base_manager.py b/tests/test_base_manager.py
index 1cdfa6b..0badf69 100644
--- a/tests/test_base_manager.py
+++ b/tests/test_base_manager.py
@@ -12,7 +12,8 @@ from socketio import base_manager
 class TestBaseManager(unittest.TestCase):
     def setUp(self):
         mock_server = mock.MagicMock()
-        self.bm = base_manager.BaseManager(mock_server)
+        self.bm = base_manager.BaseManager()
+        self.bm.initialize(mock_server)
 
     def test_connect(self):
         self.bm.connect('123', '/foo')
@@ -142,11 +143,11 @@ class TestBaseManager(unittest.TestCase):
         self.bm.connect('789', '/foo')
         self.bm.enter_room('123', '/foo', 'bar')
         self.bm.enter_room('123', '/foo', 'bar')
-        self.bm.close_room('/foo', 'bar')
+        self.bm.close_room('bar', '/foo')
         self.assertNotIn('bar', self.bm.rooms['/foo'])
 
     def test_close_invalid_room(self):
-        self.bm.close_room('/foo', 'bar')
+        self.bm.close_room('bar', '/foo')
 
     def test_rooms(self):
         self.bm.connect('123', '/foo')
diff --git a/tests/test_server.py b/tests/test_server.py
index 142e207..c7a7a6d 100644
--- a/tests/test_server.py
+++ b/tests/test_server.py
@@ -92,13 +92,13 @@ class TestServer(unittest.TestCase):
         mgr = mock.MagicMock()
         s = server.Server(client_manager=mgr)
         s.close_room('room', namespace='/foo')
-        s.manager.close_room.assert_called_once_with('/foo', 'room')
+        s.manager.close_room.assert_called_once_with('room', '/foo')
 
     def test_close_room_default_namespace(self, eio):
         mgr = mock.MagicMock()
         s = server.Server(client_manager=mgr)
         s.close_room('room')
-        s.manager.close_room.assert_called_once_with('/', 'room')
+        s.manager.close_room.assert_called_once_with('room', '/')
 
     def test_rooms(self, eio):
         mgr = mock.MagicMock()
@@ -397,3 +397,9 @@ class TestServer(unittest.TestCase):
 
         # restore the default JSON module
         packet.Packet.json = json
+
+    def test_start_background_task(self, eio):
+        s = server.Server()
+        s.start_background_task('foo', 'bar', baz='baz')
+        s.eio.start_background_task.assert_called_once_with('foo', 'bar',
+                                                            baz='baz')