11 changed files with 650 additions and 79 deletions
@ -1,4 +1,9 @@ |
|||
from .middleware import Middleware |
|||
from .base_manager import BaseManager |
|||
from .pubsub_manager import PubSubManager |
|||
from .kombu_manager import KombuManager |
|||
from .redis_manager import RedisManager |
|||
from .server import Server |
|||
|
|||
__all__ = [Middleware, Server] |
|||
__all__ = [Middleware, Server, BaseManager, PubSubManager, KombuManager, |
|||
RedisManager] |
|||
|
@ -0,0 +1,54 @@ |
|||
import pickle |
|||
|
|||
try: |
|||
import kombu |
|||
except ImportError: |
|||
kombu = None |
|||
|
|||
from .pubsub_manager import PubSubManager |
|||
|
|||
|
|||
class KombuManager(PubSubManager): # pragma: no cover |
|||
"""Client manager that uses kombu for inter-process messaging. |
|||
|
|||
This class implements a client manager backend for event sharing across |
|||
multiple processes, using RabbitMQ, Redis or any other messaging mechanism |
|||
supported by `kombu <http://kombu.readthedocs.org/en/latest/>`_. |
|||
|
|||
To use a kombu backend, initialize the :class:`Server` instance as |
|||
follows:: |
|||
|
|||
url = 'amqp://user:password@hostname:port//' |
|||
server = socketio.Server(client_manager=socketio.KombuManager(url)) |
|||
|
|||
:param url: The connection URL for the backend messaging queue. Example |
|||
connection URLs are ``'amqp://guest:guest@localhost:5672//'`` |
|||
and ``'redis://localhost:6379/'`` for RabbitMQ and Redis |
|||
respectively. Consult the `kombu documentation |
|||
<http://kombu.readthedocs.org/en/latest/userguide\ |
|||
/connections.html#urls>`_ for more on how to construct |
|||
connection URLs. |
|||
:param channel: The channel name on which the server sends and receives |
|||
notifications. Must be the same in all the servers. |
|||
""" |
|||
name = 'kombu' |
|||
|
|||
def __init__(self, url='amqp://guest:guest@localhost:5672//', |
|||
channel='socketio'): |
|||
if kombu is None: |
|||
raise RuntimeError('Kombu package is not installed ' |
|||
'(Run "pip install kombu" in your ' |
|||
'virtualenv).') |
|||
self.kombu = kombu.Connection(url) |
|||
self.queue = self.kombu.SimpleQueue(channel) |
|||
super(KombuManager, self).__init__(channel=channel) |
|||
|
|||
def _publish(self, data): |
|||
return self.queue.put(pickle.dumps(data)) |
|||
|
|||
def _listen(self): |
|||
listen_queue = self.kombu.SimpleQueue(self.channel) |
|||
while True: |
|||
message = listen_queue.get(block=True) |
|||
message.ack() |
|||
yield message.payload |
@ -0,0 +1,145 @@ |
|||
from functools import partial |
|||
import uuid |
|||
|
|||
import json |
|||
import pickle |
|||
import six |
|||
|
|||
from .base_manager import BaseManager |
|||
|
|||
|
|||
class PubSubManager(BaseManager): |
|||
"""Manage a client list attached to a pub/sub backend. |
|||
|
|||
This is a base class that enables multiple servers to share the list of |
|||
clients, with the servers communicating events through a pub/sub backend. |
|||
The use of a pub/sub backend also allows any client connected to the |
|||
backend to emit events addressed to Socket.IO clients. |
|||
|
|||
The actual backends must be implemented by subclasses, this class only |
|||
provides a pub/sub generic framework. |
|||
|
|||
:param channel: The channel name on which the server sends and receives |
|||
notifications. |
|||
""" |
|||
name = 'pubsub' |
|||
|
|||
def __init__(self, channel='socketio'): |
|||
super(PubSubManager, self).__init__() |
|||
self.channel = channel |
|||
self.host_id = uuid.uuid4().hex |
|||
|
|||
def initialize(self, server): |
|||
super(PubSubManager, self).initialize(server) |
|||
self.thread = self.server.start_background_task(self._thread) |
|||
self.server.logger.info(self.name + ' backend initialized.') |
|||
|
|||
def emit(self, event, data, namespace=None, room=None, skip_sid=None, |
|||
callback=None): |
|||
"""Emit a message to a single client, a room, or all the clients |
|||
connected to the namespace. |
|||
|
|||
This method takes care or propagating the message to all the servers |
|||
that are connected through the message queue. |
|||
|
|||
The parameters are the same as in :meth:`.Server.emit`. |
|||
""" |
|||
namespace = namespace or '/' |
|||
if callback is not None: |
|||
if self.server is None: |
|||
raise RuntimeError('Callbacks can only be issued from the ' |
|||
'context of a server.') |
|||
if room is None: |
|||
raise ValueError('Cannot use callback without a room set.') |
|||
id = self._generate_ack_id(room, namespace, callback) |
|||
callback = (room, namespace, id) |
|||
else: |
|||
callback = None |
|||
self._publish({'method': 'emit', 'event': event, 'data': data, |
|||
'namespace': namespace, 'room': room, |
|||
'skip_sid': skip_sid, 'callback': callback}) |
|||
|
|||
def close_room(self, room, namespace=None): |
|||
self._publish({'method': 'close_room', 'room': room, |
|||
'namespace': namespace or '/'}) |
|||
|
|||
def _publish(self, data): |
|||
"""Publish a message on the Socket.IO channel. |
|||
|
|||
This method needs to be implemented by the different subclasses that |
|||
support pub/sub backends. |
|||
""" |
|||
raise NotImplementedError('This method must be implemented in a ' |
|||
'subclass.') # pragma: no cover |
|||
|
|||
def _listen(self): |
|||
"""Return the next message published on the Socket.IO channel, |
|||
blocking until a message is available. |
|||
|
|||
This method needs to be implemented by the different subclasses that |
|||
support pub/sub backends. |
|||
""" |
|||
raise NotImplementedError('This method must be implemented in a ' |
|||
'subclass.') # pragma: no cover |
|||
|
|||
def _handle_emit(self, message): |
|||
# Events with callbacks are very tricky to handle across hosts |
|||
# Here in the receiving end we set up a local callback that preserves |
|||
# the callback host and id from the sender |
|||
remote_callback = message.get('callback') |
|||
if remote_callback is not None and len(remote_callback) == 3: |
|||
callback = partial(self._return_callback, self.host_id, |
|||
*remote_callback) |
|||
else: |
|||
callback = None |
|||
super(PubSubManager, self).emit(message['event'], message['data'], |
|||
namespace=message.get('namespace'), |
|||
room=message.get('room'), |
|||
skip_sid=message.get('skip_sid'), |
|||
callback=callback) |
|||
|
|||
def _handle_callback(self, message): |
|||
if self.host_id == message.get('host_id'): |
|||
try: |
|||
sid = message['sid'] |
|||
namespace = message['namespace'] |
|||
id = message['id'] |
|||
args = message['args'] |
|||
except KeyError: |
|||
return |
|||
self.trigger_callback(sid, namespace, id, args) |
|||
|
|||
def _return_callback(self, host_id, sid, namespace, callback_id, *args): |
|||
# When an event callback is received, the callback is returned back |
|||
# the sender, which is identified by the host_id |
|||
self._publish({'method': 'callback', 'host_id': host_id, |
|||
'sid': sid, 'namespace': namespace, 'id': callback_id, |
|||
'args': args}) |
|||
|
|||
def _handle_close_room(self, message): |
|||
super(PubSubManager, self).close_room( |
|||
room=message.get('room'), namespace=message.get('namespace')) |
|||
|
|||
def _thread(self): |
|||
for message in self._listen(): |
|||
data = None |
|||
if isinstance(message, dict): |
|||
data = message |
|||
else: |
|||
if isinstance(message, six.binary_type): # pragma: no cover |
|||
try: |
|||
data = pickle.loads(message) |
|||
except: |
|||
pass |
|||
if data is None: |
|||
try: |
|||
data = json.loads(message) |
|||
except: |
|||
pass |
|||
if data and 'method' in data: |
|||
if data['method'] == 'emit': |
|||
self._handle_emit(data) |
|||
elif data['method'] == 'callback': |
|||
self._handle_callback(data) |
|||
elif data['method'] == 'close_room': |
|||
self._handle_close_room(data) |
@ -0,0 +1,50 @@ |
|||
import pickle |
|||
|
|||
try: |
|||
import redis |
|||
except ImportError: |
|||
redis = None |
|||
|
|||
from .pubsub_manager import PubSubManager |
|||
|
|||
|
|||
class RedisManager(PubSubManager): # pragma: no cover |
|||
"""Redis based client manager. |
|||
|
|||
This class implements a Redis backend for event sharing across multiple |
|||
processes. Only kept here as one more example of how to build a custom |
|||
backend, since the kombu backend is perfectly adequate to support a Redis |
|||
message queue. |
|||
|
|||
To use a Redis backend, initialize the :class:`Server` instance as |
|||
follows:: |
|||
|
|||
url = 'redis://hostname:port/0' |
|||
server = socketio.Server(client_manager=socketio.RedisManager(url)) |
|||
|
|||
:param url: The connection URL for the Redis server. |
|||
:param channel: The channel name on which the server sends and receives |
|||
notifications. Must be the same in all the servers. |
|||
""" |
|||
name = 'redis' |
|||
|
|||
def __init__(self, url='redis://localhost:6379/0', channel='socketio'): |
|||
if redis is None: |
|||
raise RuntimeError('Redis package is not installed ' |
|||
'(Run "pip install redis" in your ' |
|||
'virtualenv).') |
|||
self.redis = redis.Redis.from_url(url) |
|||
self.pubsub = self.redis.pubsub() |
|||
super(RedisManager, self).__init__(channel=channel) |
|||
|
|||
def _publish(self, data): |
|||
return self.redis.publish(self.channel, pickle.dumps(data)) |
|||
|
|||
def _listen(self): |
|||
channel = self.channel.encode('utf-8') |
|||
self.pubsub.subscribe(self.channel) |
|||
for message in self.pubsub.listen(): |
|||
if message['channel'] == channel and \ |
|||
message['type'] == 'message' and 'data' in message: |
|||
yield message['data'] |
|||
self.pubsub.unsubscribe(self.channel) |
@ -0,0 +1,214 @@ |
|||
import functools |
|||
import unittest |
|||
|
|||
import six |
|||
if six.PY3: |
|||
from unittest import mock |
|||
else: |
|||
import mock |
|||
|
|||
from socketio import base_manager |
|||
from socketio import pubsub_manager |
|||
|
|||
|
|||
class TestBaseManager(unittest.TestCase): |
|||
def setUp(self): |
|||
mock_server = mock.MagicMock() |
|||
self.pm = pubsub_manager.PubSubManager() |
|||
self.pm._publish = mock.MagicMock() |
|||
self.pm.initialize(mock_server) |
|||
|
|||
def test_default_init(self): |
|||
self.assertEqual(self.pm.channel, 'socketio') |
|||
self.assertEqual(len(self.pm.host_id), 32) |
|||
self.pm.server.start_background_task.assert_called_once_with( |
|||
self.pm._thread) |
|||
|
|||
def test_custom_init(self): |
|||
pubsub = pubsub_manager.PubSubManager(channel='foo') |
|||
self.assertEqual(pubsub.channel, 'foo') |
|||
self.assertEqual(len(pubsub.host_id), 32) |
|||
|
|||
def test_emit(self): |
|||
self.pm.emit('foo', 'bar') |
|||
self.pm._publish.assert_called_once_with( |
|||
{'method': 'emit', 'event': 'foo', 'data': 'bar', |
|||
'namespace': '/', 'room': None, 'skip_sid': None, |
|||
'callback': None}) |
|||
|
|||
def test_emit_with_namespace(self): |
|||
self.pm.emit('foo', 'bar', namespace='/baz') |
|||
self.pm._publish.assert_called_once_with( |
|||
{'method': 'emit', 'event': 'foo', 'data': 'bar', |
|||
'namespace': '/baz', 'room': None, 'skip_sid': None, |
|||
'callback': None}) |
|||
|
|||
def test_emit_with_room(self): |
|||
self.pm.emit('foo', 'bar', room='baz') |
|||
self.pm._publish.assert_called_once_with( |
|||
{'method': 'emit', 'event': 'foo', 'data': 'bar', |
|||
'namespace': '/', 'room': 'baz', 'skip_sid': None, |
|||
'callback': None}) |
|||
|
|||
def test_emit_with_skip_sid(self): |
|||
self.pm.emit('foo', 'bar', skip_sid='baz') |
|||
self.pm._publish.assert_called_once_with( |
|||
{'method': 'emit', 'event': 'foo', 'data': 'bar', |
|||
'namespace': '/', 'room': None, 'skip_sid': 'baz', |
|||
'callback': None}) |
|||
|
|||
def test_emit_with_callback(self): |
|||
with mock.patch.object(self.pm, '_generate_ack_id', |
|||
return_value='123'): |
|||
self.pm.emit('foo', 'bar', room='baz', callback='cb') |
|||
self.pm._publish.assert_called_once_with( |
|||
{'method': 'emit', 'event': 'foo', 'data': 'bar', |
|||
'namespace': '/', 'room': 'baz', 'skip_sid': None, |
|||
'callback': ('baz', '/', '123')}) |
|||
|
|||
def test_emit_with_callback_without_server(self): |
|||
standalone_pm = pubsub_manager.PubSubManager() |
|||
self.assertRaises(RuntimeError, standalone_pm.emit, 'foo', 'bar', |
|||
callback='cb') |
|||
|
|||
def test_emit_with_callback_missing_room(self): |
|||
with mock.patch.object(self.pm, '_generate_ack_id', |
|||
return_value='123'): |
|||
self.assertRaises(ValueError, self.pm.emit, 'foo', 'bar', |
|||
callback='cb') |
|||
|
|||
def test_close_room(self): |
|||
self.pm.close_room('foo') |
|||
self.pm._publish.assert_called_once_with( |
|||
{'method': 'close_room', 'room': 'foo', 'namespace': '/'}) |
|||
|
|||
def test_close_room_with_namespace(self): |
|||
self.pm.close_room('foo', '/bar') |
|||
self.pm._publish.assert_called_once_with( |
|||
{'method': 'close_room', 'room': 'foo', 'namespace': '/bar'}) |
|||
|
|||
def test_handle_emit(self): |
|||
with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit: |
|||
self.pm._handle_emit({'event': 'foo', 'data': 'bar'}) |
|||
super_emit.assert_called_once_with('foo', 'bar', namespace=None, |
|||
room=None, skip_sid=None, |
|||
callback=None) |
|||
|
|||
def test_handle_emit_with_namespace(self): |
|||
with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit: |
|||
self.pm._handle_emit({'event': 'foo', 'data': 'bar', |
|||
'namespace': '/baz'}) |
|||
super_emit.assert_called_once_with('foo', 'bar', namespace='/baz', |
|||
room=None, skip_sid=None, |
|||
callback=None) |
|||
|
|||
def test_handle_emiti_with_room(self): |
|||
with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit: |
|||
self.pm._handle_emit({'event': 'foo', 'data': 'bar', |
|||
'room': 'baz'}) |
|||
super_emit.assert_called_once_with('foo', 'bar', namespace=None, |
|||
room='baz', skip_sid=None, |
|||
callback=None) |
|||
|
|||
def test_handle_emit_with_skip_sid(self): |
|||
with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit: |
|||
self.pm._handle_emit({'event': 'foo', 'data': 'bar', |
|||
'skip_sid': '123'}) |
|||
super_emit.assert_called_once_with('foo', 'bar', namespace=None, |
|||
room=None, skip_sid='123', |
|||
callback=None) |
|||
|
|||
def test_handle_emit_with_callback(self): |
|||
host_id = self.pm.host_id |
|||
with mock.patch.object(base_manager.BaseManager, 'emit') as super_emit: |
|||
self.pm._handle_emit({'event': 'foo', 'data': 'bar', |
|||
'namespace': '/baz', |
|||
'callback': ('sid', '/baz', 123)}) |
|||
self.assertEqual(super_emit.call_count, 1) |
|||
self.assertEqual(super_emit.call_args[0], ('foo', 'bar')) |
|||
self.assertEqual(super_emit.call_args[1]['namespace'], '/baz') |
|||
self.assertIsNone(super_emit.call_args[1]['room']) |
|||
self.assertIsNone(super_emit.call_args[1]['skip_sid']) |
|||
self.assertIsInstance(super_emit.call_args[1]['callback'], |
|||
functools.partial) |
|||
super_emit.call_args[1]['callback']('one', 2, 'three') |
|||
self.pm._publish.assert_called_once_with( |
|||
{'method': 'callback', 'host_id': host_id, 'sid': 'sid', |
|||
'namespace': '/baz', 'id': 123, 'args': ('one', 2, 'three')}) |
|||
|
|||
def test_handle_callback(self): |
|||
host_id = self.pm.host_id |
|||
with mock.patch.object(self.pm, 'trigger_callback') as trigger: |
|||
self.pm._handle_callback({'method': 'callback', |
|||
'host_id': host_id, 'sid': 'sid', |
|||
'namespace': '/', 'id': 123, |
|||
'args': ('one', 2)}) |
|||
trigger.assert_called_once_with('sid', '/', 123, ('one', 2)) |
|||
|
|||
def test_handle_callback_bad_host_id(self): |
|||
with mock.patch.object(self.pm, 'trigger_callback') as trigger: |
|||
self.pm._handle_callback({'method': 'callback', |
|||
'host_id': 'bad', 'sid': 'sid', |
|||
'namespace': '/', 'id': 123, |
|||
'args': ('one', 2)}) |
|||
self.assertEqual(trigger.call_count, 0) |
|||
|
|||
def test_handle_callback_missing_args(self): |
|||
host_id = self.pm.host_id |
|||
with mock.patch.object(self.pm, 'trigger_callback') as trigger: |
|||
self.pm._handle_callback({'method': 'callback', |
|||
'host_id': host_id, 'sid': 'sid', |
|||
'namespace': '/', 'id': 123}) |
|||
self.pm._handle_callback({'method': 'callback', |
|||
'host_id': host_id, 'sid': 'sid', |
|||
'namespace': '/'}) |
|||
self.pm._handle_callback({'method': 'callback', |
|||
'host_id': host_id, 'sid': 'sid'}) |
|||
self.pm._handle_callback({'method': 'callback', |
|||
'host_id': host_id}) |
|||
self.assertEqual(trigger.call_count, 0) |
|||
|
|||
def test_handle_close_room(self): |
|||
with mock.patch.object(base_manager.BaseManager, 'close_room') \ |
|||
as super_close_room: |
|||
self.pm._handle_close_room({'method': 'close_room', |
|||
'room': 'foo'}) |
|||
super_close_room.assert_called_once_with(room='foo', |
|||
namespace=None) |
|||
|
|||
def test_handle_close_room_with_namespace(self): |
|||
with mock.patch.object(base_manager.BaseManager, 'close_room') \ |
|||
as super_close_room: |
|||
self.pm._handle_close_room({'method': 'close_room', |
|||
'room': 'foo', 'namespace': '/bar'}) |
|||
super_close_room.assert_called_once_with(room='foo', |
|||
namespace='/bar') |
|||
|
|||
def test_background_thread(self): |
|||
self.pm._handle_emit = mock.MagicMock() |
|||
self.pm._handle_callback = mock.MagicMock() |
|||
self.pm._handle_close_room = mock.MagicMock() |
|||
|
|||
def messages(): |
|||
import pickle |
|||
yield {'method': 'emit', 'value': 'foo'} |
|||
yield {'missing': 'method'} |
|||
yield '{"method": "callback", "value": "bar"}' |
|||
yield {'method': 'bogus'} |
|||
yield pickle.dumps({'method': 'close_room', 'value': 'baz'}) |
|||
yield 'bad json' |
|||
yield b'bad pickled' |
|||
raise KeyboardInterrupt |
|||
|
|||
self.pm._listen = mock.MagicMock(side_effect=messages) |
|||
try: |
|||
self.pm._thread() |
|||
except KeyboardInterrupt: |
|||
pass |
|||
|
|||
self.pm._handle_emit.assert_called_once_with( |
|||
{'method': 'emit', 'value': 'foo'}) |
|||
self.pm._handle_callback.assert_called_once_with( |
|||
{'method': 'callback', 'value': 'bar'}) |
|||
self.pm._handle_close_room.assert_called_once_with( |
|||
{'method': 'close_room', 'value': 'baz'}) |
Loading…
Reference in new issue