Browse Source

initial implementation of inter-process communication

pull/9/head
Miguel Grinberg 9 years ago
parent
commit
47620bbebd
  1. 41
      README.rst
  2. 33
      docs/index.rst
  3. 7
      socketio/__init__.py
  4. 9
      socketio/base_manager.py
  5. 64
      socketio/kombu_manager.py
  6. 77
      socketio/pubsub_manager.py
  7. 60
      socketio/redis_manager.py
  8. 19
      socketio/server.py
  9. 7
      tests/test_base_manager.py
  10. 10
      tests/test_server.py

41
README.rst

@ -9,20 +9,31 @@ Python implementation of the `Socket.IO`_ realtime server.
Features Features
-------- --------
- Fully compatible with the Javascript `socket.io-client`_ library. - Fully compatible with the
- Compatible with Python 2.7 and Python 3.3+. `Javascript <https://github.com/Automattic/socket.io-client>`_,
- Based on `Eventlet`_, enabling large number of clients even on modest `Swift <https://github.com/socketio/socket.io-client-swift>`_,
hardware. `C++ <https://github.com/socketio/socket.io-client-cpp>`_ and
- Includes a WSGI middleware that integrates Socket.IO traffic with `Java <https://github.com/socketio/socket.io-client-java>`_ official
standard WSGI applications. Socket.IO clients, plus any third party clients that comply with the
- Uses an event-based architecture implemented with decorators that Socket.IO specification.
hides the details of the protocol. - Compatible with Python 2.7 and Python 3.3+.
- Implements HTTP long-polling and WebSocket transports. - Supports large number of clients even on modest hardware when used with an
- Supports XHR2 and XHR browsers as clients. asynchronous server based on `eventlet <http://eventlet.net/>`_ or
- Supports text and binary messages. `gevent <http://gevent.org/>`_. For development and testing, any WSGI
- Supports gzip and deflate HTTP compression. complaint multi-threaded server can be used.
- Configurable CORS responses to avoid cross-origin problems with - Includes a WSGI middleware that integrates Socket.IO traffic with standard
browsers. WSGI applications.
- Broadcasting of messages to all connected clients, or to subsets of them
assigned to "rooms".
- Optional support for multiple servers, connected through a messaging queue
such as Redis or RabbitMQ.
- Event-based architecture implemented with decorators that hides the details
of the protocol.
- Support for HTTP long-polling and WebSocket transports.
- Support for XHR2 and XHR browsers.
- Support for text and binary messages.
- Support for gzip and deflate HTTP compression.
- Configurable CORS responses, to avoid cross-origin problems with browsers.
Example Example
------- -------
@ -74,4 +85,4 @@ Resources
.. _socket.io-client: https://github.com/Automattic/socket.io-client .. _socket.io-client: https://github.com/Automattic/socket.io-client
.. _Eventlet: http://eventlet.net/ .. _Eventlet: http://eventlet.net/
.. _Documentation: http://pythonhosted.org/python-socketio .. _Documentation: http://pythonhosted.org/python-socketio
.. _PyPI: https://pypi.python.org/pypi/python-socketio .. _PyPI: https://pypi.python.org/pypi/python-socketio

33
docs/index.rst

@ -12,9 +12,13 @@ This project implements an Socket.IO server that can run standalone or
integrated with a Python WSGI application. The following are some of its integrated with a Python WSGI application. The following are some of its
features: features:
- Fully compatible with the Javascript - Fully compatible with the
`socket.io-client <https://github.com/Automattic/socket.io-client>`_ library, `Javascript <https://github.com/Automattic/socket.io-client>`_,
versions 1.3.5 and up. `Swift <https://github.com/socketio/socket.io-client-swift>`_,
`C++ <https://github.com/socketio/socket.io-client-cpp>`_ and
`Java <https://github.com/socketio/socket.io-client-java>`_ official
Socket.IO clients, plus any third party clients that comply with the
Socket.IO specification.
- Compatible with Python 2.7 and Python 3.3+. - Compatible with Python 2.7 and Python 3.3+.
- Supports large number of clients even on modest hardware when used with an - Supports large number of clients even on modest hardware when used with an
asynchronous server based on `eventlet <http://eventlet.net/>`_ or asynchronous server based on `eventlet <http://eventlet.net/>`_ or
@ -24,20 +28,22 @@ features:
WSGI applications. WSGI applications.
- Broadcasting of messages to all connected clients, or to subsets of them - Broadcasting of messages to all connected clients, or to subsets of them
assigned to "rooms". assigned to "rooms".
- Uses an event-based architecture implemented with decorators that hides the - Optional support for multiple servers, connected through a messaging queue
details of the protocol. such as Redis or RabbitMQ.
- Event-based architecture implemented with decorators that hides the details
of the protocol.
- Support for HTTP long-polling and WebSocket transports. - Support for HTTP long-polling and WebSocket transports.
- Support for XHR2 and XHR browsers. - Support for XHR2 and XHR browsers.
- Support for text and binary messages. - Support for text and binary messages.
- Support for gzip and deflate HTTP compression. - Support for gzip and deflate HTTP compression.
- Configurable CORS responses to avoid cross-origin problems with browsers. - Configurable CORS responses, to avoid cross-origin problems with browsers.
What is Socket.IO? What is Socket.IO?
------------------ ------------------
Socket.IO is a transport protocol that enables real-time bidirectional Socket.IO is a transport protocol that enables real-time bidirectional
event-based communication between clients (typically web browsers) and a event-based communication between clients (typically web browsers) and a
server. The official implementations of the client and server components are server. The original implementations of the client and server components are
written in JavaScript. written in JavaScript.
Getting Started Getting Started
@ -357,13 +363,16 @@ address this important limitation.
API Reference API Reference
------------- -------------
.. toctree::
:maxdepth: 2
.. module:: socketio .. module:: socketio
.. autoclass:: Middleware .. autoclass:: Middleware
:members: :members:
.. autoclass:: Server .. autoclass:: Server
:members: :members:
.. autoclass:: BaseManager
:members:
.. autoclass:: PubSubManager
:members:
.. autoclass:: KombuManager
:members:
.. autoclass:: RedisManager
:members:

7
socketio/__init__.py

@ -1,4 +1,9 @@
from .middleware import Middleware from .middleware import Middleware
from .base_manager import BaseManager
from .pubsub_manager import PubSubManager
from .kombu_manager import KombuManager
from .redis_manager import RedisManager
from .server import Server from .server import Server
__all__ = [Middleware, Server] __all__ = [Middleware, Server, BaseManager, PubSubManager, KombuManager,
RedisManager]

9
socketio/base_manager.py

@ -12,12 +12,15 @@ class BaseManager(object):
services. More sophisticated storage backends can be implemented by services. More sophisticated storage backends can be implemented by
subclasses. subclasses.
""" """
def __init__(self, server): def __init__(self):
self.server = server self.server = None
self.rooms = {} self.rooms = {}
self.pending_removals = [] self.pending_removals = []
self.callbacks = {} self.callbacks = {}
def initialize(self, server):
self.server = server
def get_namespaces(self): def get_namespaces(self):
"""Return an iterable with the active namespace names.""" """Return an iterable with the active namespace names."""
return six.iterkeys(self.rooms) return six.iterkeys(self.rooms)
@ -69,7 +72,7 @@ class BaseManager(object):
except KeyError: except KeyError:
pass pass
def close_room(self, namespace, room): def close_room(self, room, namespace):
"""Remove all participants from a room.""" """Remove all participants from a room."""
try: try:
for sid in self.get_participants(namespace, room): for sid in self.get_participants(namespace, room):

64
socketio/kombu_manager.py

@ -0,0 +1,64 @@
import json
import pickle
import six
try:
import kombu
except ImportError:
kombu = None
from .pubsub_manager import PubSubManager
class KombuManager(PubSubManager):
"""Client manager that uses kombu for inter-process messaging.
This class implements a client manager backend for event sharing across
multiple processes, using RabbitMQ, Redis or any other messaging mechanism
supported by `kombu <http://kombu.readthedocs.org/en/latest/>`_.
To use a kombu backend, initialize the :class:`Server` instance as
follows::
url = 'amqp://user:password@hostname:port//'
server = socketio.Server(client_manager=socketio.KombuManager(url))
:param url: The connection URL for the backend messaging queue. Example
connection URLs are ``'amqp://guest:guest@localhost:5672//'``
and ``'redis://localhost:6379/'`` for RabbitMQ and Redis
respectively. Consult the `kombu documentation
<http://kombu.readthedocs.org/en/latest/userguide\
/connections.html#urls>`_ for more on how to construct
connection URLs.
:param channel: The channel name on which the server sends and receives
notifications. Must be the same in all the servers.
"""
name = 'kombu'
def __init__(self, url='amqp://guest:guest@localhost:5672//',
channel='socketio'):
if kombu is None:
raise RuntimeError('Kombu package is not installed '
'(Run "pip install kombu" in your '
'virtualenv).')
self.kombu = kombu.Connection(url)
self.queue = self.kombu.SimpleQueue(channel)
super(KombuManager, self).__init__(channel=channel)
def _publish(self, data):
return self.queue.put(pickle.dumps(data))
def _listen(self):
listen_queue = self.kombu.SimpleQueue(self.channel)
while True:
message = listen_queue.get(block=True)
message.ack()
data = None
if isinstance(message.payload, six.binary_type):
try:
data = pickle.loads(message.payload)
except pickle.PickleError:
pass
if data is None:
data = json.loads(message.payload)
yield data

77
socketio/pubsub_manager.py

@ -0,0 +1,77 @@
from .base_manager import BaseManager
class PubSubManager(BaseManager):
"""Manage a client list attached to a pub/sub backend.
This is a base class that enables multiple servers to share the list of
clients, with the servers communicating events through a pub/sub backend.
The use of a pub/sub backend also allows any client connected to the
backend to emit events addressed to Socket.IO clients.
The actual backends must be implemented by subclasses, this class only
provides a pub/sub generic framework.
:param channel: The channel name on which the server sends and receives
notifications.
"""
def __init__(self, channel='socketio'):
super(PubSubManager, self).__init__()
self.channel = channel
def initialize(self, server):
super(PubSubManager, self).initialize(server)
self.thread = self.server.start_background_task(self._thread)
self.server.logger.info(self.name + ' backend initialized.')
def emit(self, event, data, namespace=None, room=None, skip_sid=None,
callback=None):
"""Emit a message to a single client, a room, or all the clients
connected to the namespace.
This method takes care or propagating the message to all the servers
that are connected through the message queue.
The parameters are the same as in :meth:`.Server.emit`.
"""
self._publish({'method': 'emit', 'event': event, 'data': data,
'namespace': namespace or '/', 'room': room,
'skip_sid': skip_sid, 'callback': callback})
def close_room(self, room, namespace=None):
self._publish({'method': 'close_room', 'room': room,
'namespace': namespace or '/'})
def _publish(self, data):
"""Publish a message on the Socket.IO channel.
This method needs to be implemented by the different subclasses that
support pub/sub backends.
"""
raise NotImplementedError('This method must be implemented in a '
'subclass.')
def _listen(self):
"""Return the next message published on the Socket.IO channel,
blocking until a message is available.
This method needs to be implemented by the different subclasses that
support pub/sub backends.
"""
raise NotImplementedError('This method must be implemented in a '
'subclass.')
def _thread(self):
for message in self._listen():
if 'method' in message:
if message['method'] == 'emit':
super(PubSubManager, self).emit(
message['event'], message['data'],
namespace=message.get('namespace'),
room=message.get('room'),
skip_sid=message.get('skip_sid'),
callback=message.get('callback'))
elif message['method'] == 'close_room':
super(PubSubManager, self).close_room(
room=message.get('room'),
namespace=message.get('namespace'))

60
socketio/redis_manager.py

@ -0,0 +1,60 @@
import json
import pickle
import six
try:
import redis
except ImportError:
redis = None
from .pubsub_manager import PubSubManager
class RedisManager(PubSubManager):
"""Redis based client manager.
This class implements a Redis backend for event sharing across multiple
processes. Only kept here as one more example of how to build a custom
backend, since the kombu backend is perfectly adequate to support a Redis
message queue.
To use a Redis backend, initialize the :class:`Server` instance as
follows::
url = 'redis://hostname:port/0'
server = socketio.Server(client_manager=socketio.RedisManager(url))
:param url: The connection URL for the Redis server.
:param channel: The channel name on which the server sends and receives
notifications. Must be the same in all the servers.
"""
name = 'redis'
def __init__(self, url='redis://localhost:6379/0', channel='socketio'):
if redis is None:
raise RuntimeError('Redis package is not installed '
'(Run "pip install redis" in your '
'virtualenv).')
self.redis = redis.Redis.from_url(url)
self.pubsub = self.redis.pubsub()
super(RedisManager, self).__init__(channel=channel)
def _publish(self, data):
return self.redis.publish(self.channel, pickle.dumps(data))
def _listen(self):
channel = self.channel.encode('utf-8')
self.pubsub.subscribe(self.channel)
for message in self.pubsub.listen():
if message['channel'] == channel and \
message['type'] == 'message' and 'data' in message:
data = None
if isinstance(message['data'], six.binary_type):
try:
data = pickle.loads(message['data'])
except pickle.PickleError:
pass
if data is None:
data = json.loads(message['data'])
yield data
self.pubsub.unsubscribe(self.channel)

19
socketio/server.py

@ -14,9 +14,9 @@ class Server(object):
for websocket and long-polling transports. for websocket and long-polling transports.
:param client_manager: The client manager instance that will manage the :param client_manager: The client manager instance that will manage the
client list. By default the client list is stored client list. When this is omitted, the client list
in an in-memory structure, which prevents the use is stored in an in-memory structure, so the use of
of multiple worker processes. multiple connected servers is not possible.
:param logger: To enable logging set to ``True`` or pass a logger object to :param logger: To enable logging set to ``True`` or pass a logger object to
use. To disable logging set to ``False``. use. To disable logging set to ``False``.
:param binary: ``True`` to support binary payloads, ``False`` to treat all :param binary: ``True`` to support binary payloads, ``False`` to treat all
@ -62,9 +62,6 @@ class Server(object):
""" """
def __init__(self, client_manager=None, logger=False, binary=False, def __init__(self, client_manager=None, logger=False, binary=False,
json=None, **kwargs): json=None, **kwargs):
if client_manager is None:
client_manager = base_manager.BaseManager(self)
self.manager = client_manager
engineio_options = kwargs engineio_options = kwargs
engineio_logger = engineio_options.pop('engineio_logger', None) engineio_logger = engineio_options.pop('engineio_logger', None)
if engineio_logger is not None: if engineio_logger is not None:
@ -97,6 +94,11 @@ class Server(object):
self.logger.setLevel(logging.ERROR) self.logger.setLevel(logging.ERROR)
self.logger.addHandler(logging.StreamHandler()) self.logger.addHandler(logging.StreamHandler())
if client_manager is None:
client_manager = base_manager.BaseManager()
client_manager.initialize(self)
self.manager = client_manager
def on(self, event, handler=None, namespace=None): def on(self, event, handler=None, namespace=None):
"""Register an event handler. """Register an event handler.
@ -248,7 +250,7 @@ class Server(object):
""" """
namespace = namespace or '/' namespace = namespace or '/'
self.logger.info('room %s is closing [%s]', room, namespace) self.logger.info('room %s is closing [%s]', room, namespace)
self.manager.close_room(namespace, room) self.manager.close_room(room, namespace)
def rooms(self, sid, namespace=None): def rooms(self, sid, namespace=None):
"""Return the rooms a client is in. """Return the rooms a client is in.
@ -300,6 +302,9 @@ class Server(object):
""" """
return self.eio.handle_request(environ, start_response) return self.eio.handle_request(environ, start_response)
def start_background_task(self, target, *args, **kwargs):
self.eio.start_background_task(target, *args, **kwargs)
def _emit_internal(self, sid, event, data, namespace=None, id=None): def _emit_internal(self, sid, event, data, namespace=None, id=None):
"""Send a message to a client.""" """Send a message to a client."""
if six.PY2 and not self.binary: if six.PY2 and not self.binary:

7
tests/test_base_manager.py

@ -12,7 +12,8 @@ from socketio import base_manager
class TestBaseManager(unittest.TestCase): class TestBaseManager(unittest.TestCase):
def setUp(self): def setUp(self):
mock_server = mock.MagicMock() mock_server = mock.MagicMock()
self.bm = base_manager.BaseManager(mock_server) self.bm = base_manager.BaseManager()
self.bm.initialize(mock_server)
def test_connect(self): def test_connect(self):
self.bm.connect('123', '/foo') self.bm.connect('123', '/foo')
@ -142,11 +143,11 @@ class TestBaseManager(unittest.TestCase):
self.bm.connect('789', '/foo') self.bm.connect('789', '/foo')
self.bm.enter_room('123', '/foo', 'bar') self.bm.enter_room('123', '/foo', 'bar')
self.bm.enter_room('123', '/foo', 'bar') self.bm.enter_room('123', '/foo', 'bar')
self.bm.close_room('/foo', 'bar') self.bm.close_room('bar', '/foo')
self.assertNotIn('bar', self.bm.rooms['/foo']) self.assertNotIn('bar', self.bm.rooms['/foo'])
def test_close_invalid_room(self): def test_close_invalid_room(self):
self.bm.close_room('/foo', 'bar') self.bm.close_room('bar', '/foo')
def test_rooms(self): def test_rooms(self):
self.bm.connect('123', '/foo') self.bm.connect('123', '/foo')

10
tests/test_server.py

@ -92,13 +92,13 @@ class TestServer(unittest.TestCase):
mgr = mock.MagicMock() mgr = mock.MagicMock()
s = server.Server(client_manager=mgr) s = server.Server(client_manager=mgr)
s.close_room('room', namespace='/foo') s.close_room('room', namespace='/foo')
s.manager.close_room.assert_called_once_with('/foo', 'room') s.manager.close_room.assert_called_once_with('room', '/foo')
def test_close_room_default_namespace(self, eio): def test_close_room_default_namespace(self, eio):
mgr = mock.MagicMock() mgr = mock.MagicMock()
s = server.Server(client_manager=mgr) s = server.Server(client_manager=mgr)
s.close_room('room') s.close_room('room')
s.manager.close_room.assert_called_once_with('/', 'room') s.manager.close_room.assert_called_once_with('room', '/')
def test_rooms(self, eio): def test_rooms(self, eio):
mgr = mock.MagicMock() mgr = mock.MagicMock()
@ -397,3 +397,9 @@ class TestServer(unittest.TestCase):
# restore the default JSON module # restore the default JSON module
packet.Packet.json = json packet.Packet.json = json
def test_start_background_task(self, eio):
s = server.Server()
s.start_background_task('foo', 'bar', baz='baz')
s.eio.start_background_task.assert_called_once_with('foo', 'bar',
baz='baz')

Loading…
Cancel
Save