diff --git a/docs/index.rst b/docs/index.rst
index 9d2949b..2f9676a 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -363,11 +363,13 @@ type of installation, each server processes owns the connections to a subset
of the clients. To make broadcasting work in this environment, the servers
communicate with each other through the message queue.
-The message queue service needs to be installed and configured separately. One
-of the options offered by this package is to use
-`Kombu `_ to access the message
-queue, which means that any message queue supported by this package can be
-used. Kombu can be installed with pip::
+Kombu
+~~~~~
+
+One of the messaging options offered by this package to access the message
+queue is `Kombu `_ , which means that
+any message queue supported by this package can be used. Kombu can be installed
+with pip::
pip install kombu
@@ -378,7 +380,8 @@ package for Redis installed as well::
pip install redis
-To configure a Socket.IO server to connect to a message queue, the
+The appropriate message queue service, such as RabbitMQ or Redis, must also be
+installed. To configure a Socket.IO server to connect to a Kombu queue, the
``client_manager`` argument must be passed in the server creation. The
following example instructs the server to connect to a Redis service running
on the same host and on the default port::
@@ -392,27 +395,56 @@ credentials, the configuration is as follows::
mgr = socketio.KombuManager('amqp://')
sio = socketio.Server(client_manager=mgr)
-The URL passed to the ``KombuManager`` constructor is passed directly to
+The URL passed to the :class:`KombuManager` constructor is passed directly to
Kombu's `Connection object
`_, so
the Kombu documentation should be consulted for information on how to
connect to the message queue appropriately.
-If the use of Kombu is not desired, native Redis support is also offered
-through the ``RedisManager`` class. This class takes the same arguments as
-``KombuManager``, but connects directly to a Redis store using the queue's
-pub/sub functionality::
+Note that Kombu currently does not support asyncio, so it cannot be used with
+the :class:`socketio.AsyncServer` class.
+
+Redis
+~~~~~
+
+To use a Redis message queue, the Python package for Redis must also be
+installed::
+
+ # WSGI server
+ pip install redis
+
+ # asyncio server
+ pip install aioredis
+
+Native Redis support is accessed through the :class:`socketio.RedisManager` and
+:class:`socketio.AsyncRedisManager` classes. These classes connect directly to
+the Redis store and use the queue's pub/sub functionality::
+ # WSGI server
mgr = socketio.RedisManager('redis://')
sio = socketio.Server(client_manager=mgr)
-If multiple Socket.IO servers are connected to a message queue, they
+ # asyncio server
+ mgr = socketio.AsyncRedisManager('redis://')
+ sio = socketio.AsyncServer(client_manager=mgr)
+
+Horizontal scaling
+~~~~~~~~~~~~~~~~~~
+
+If multiple Socket.IO servers are connected to the same message queue, they
automatically communicate with each other and manage a combined client list,
-without any need for additional configuration. To have a process other than
-a server connect to the queue to emit a message, the same ``KombuManager``
-and ``RedisManager`` classes can be used as standalone object. In this case,
-the ``write_only`` argument should be set to ``True`` to disable the creation
-of a listening thread. For example::
+without any need for additional configuration. When a load balancer such as
+nginx is used, this provides virtually unlimited scaling capabilities for the
+server.
+
+Emitting from external processes
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+To have a process other than a server connect to the queue to emit a message,
+the same client manager classes can be used as standalone objects. In this
+case, the ``write_only`` argument should be set to ``True`` to disable the
+creation of a listening thread, which only makes sense in a server. For
+example::
# connect to the redis queue through Kombu
external_sio = socketio.KombuManager('redis://', write_only=True)
@@ -420,11 +452,6 @@ of a listening thread. For example::
# emit an event
external_sio.emit('my event', data={'foo': 'bar'}, room='my room')
-Note: when using a third party package to manage a message queue such as Redis
-or RabbitMQ in conjunction with eventlet or gevent, it is necessary to monkey
-patch the Python standard library, so that these packages access coroutine
-friendly library functions and classes.
-
Deployment
----------
@@ -663,3 +690,6 @@ API Reference
.. autoclass:: AsyncManager
:members:
:inherited-members:
+
+.. autoclass:: AsyncRedisManager
+ :members:
diff --git a/examples/aiohttp/app.py b/examples/aiohttp/app.py
index 260636d..c9ac7d5 100755
--- a/examples/aiohttp/app.py
+++ b/examples/aiohttp/app.py
@@ -54,7 +54,7 @@ async def close(sid, message):
await sio.emit('my response',
{'data': 'Room ' + message['room'] + ' is closing.'},
room=message['room'], namespace='/test')
- sio.close_room(message['room'], namespace='/test')
+ await sio.close_room(message['room'], namespace='/test')
@sio.on('my room event', namespace='/test')
diff --git a/socketio/__init__.py b/socketio/__init__.py
index 24220d3..e840e7c 100644
--- a/socketio/__init__.py
+++ b/socketio/__init__.py
@@ -12,10 +12,12 @@ if sys.version_info >= (3, 5): # pragma: no cover
from .asyncio_server import AsyncServer
from .asyncio_manager import AsyncManager
from .asyncio_namespace import AsyncNamespace
+ from .asyncio_redis_manager import AsyncRedisManager
else: # pragma: no cover
AsyncServer = None
AsyncManager = None
AsyncNamespace = None
+ AsyncRedisManager = None
__version__ = '1.6.3'
@@ -23,6 +25,5 @@ __all__ = ['__version__', 'Middleware', 'Server', 'BaseManager',
'PubSubManager', 'KombuManager', 'RedisManager', 'ZmqManager',
'Namespace']
if AsyncServer is not None: # pragma: no cover
- __all__.append('AsyncServer')
- __all__.append('AsyncNamespace')
- __all__.append('AsyncManager')
+ __all__ += ['AsyncServer', 'AsyncNamespace', 'AsyncManager',
+ 'AsyncRedisManager']
diff --git a/socketio/asyncio_manager.py b/socketio/asyncio_manager.py
index 6297f73..0ecbd46 100644
--- a/socketio/asyncio_manager.py
+++ b/socketio/asyncio_manager.py
@@ -25,6 +25,13 @@ class AsyncManager(BaseManager):
namespace, id))
await asyncio.wait(tasks)
+ async def close_room(self, room, namespace):
+ """Remove all participants from a room.
+
+ Note: this method is a coroutine.
+ """
+ return super().close_room(room, namespace)
+
async def trigger_callback(self, sid, namespace, id, data):
"""Invoke an application callback.
diff --git a/socketio/asyncio_namespace.py b/socketio/asyncio_namespace.py
index 6befd37..ba22e4e 100644
--- a/socketio/asyncio_namespace.py
+++ b/socketio/asyncio_namespace.py
@@ -70,6 +70,18 @@ class AsyncNamespace(namespace.Namespace):
namespace=namespace or self.namespace,
callback=callback)
+ async def close_room(self, room, namespace=None):
+ """Close a room.
+
+ The only difference with the :func:`socketio.Server.close_room` method
+ is that when the ``namespace`` argument is not given the namespace
+ associated with the class is used.
+
+ Note: this method is a coroutine.
+ """
+ return await self.server.close_room(
+ room, namespace=namespace or self.namespace)
+
async def disconnect(self, sid, namespace=None):
"""Disconnect a client.
diff --git a/socketio/asyncio_pubsub_manager.py b/socketio/asyncio_pubsub_manager.py
new file mode 100644
index 0000000..8442cd1
--- /dev/null
+++ b/socketio/asyncio_pubsub_manager.py
@@ -0,0 +1,160 @@
+from functools import partial
+import uuid
+
+import json
+import pickle
+import six
+
+from .asyncio_manager import AsyncManager
+
+
+class AsyncPubSubManager(AsyncManager):
+ """Manage a client list attached to a pub/sub backend under asyncio.
+
+ This is a base class that enables multiple servers to share the list of
+ clients, with the servers communicating events through a pub/sub backend.
+ The use of a pub/sub backend also allows any client connected to the
+ backend to emit events addressed to Socket.IO clients.
+
+ The actual backends must be implemented by subclasses, this class only
+ provides a pub/sub generic framework for asyncio applications.
+
+ :param channel: The channel name on which the server sends and receives
+ notifications.
+ """
+ name = 'asyncpubsub'
+
+ def __init__(self, channel='socketio', write_only=False):
+ super().__init__()
+ self.channel = channel
+ self.write_only = write_only
+ self.host_id = uuid.uuid4().hex
+
+ def initialize(self):
+ super().initialize()
+ if not self.write_only:
+ self.thread = self.server.start_background_task(self._thread)
+ self.server.logger.info(self.name + ' backend initialized.')
+
+ async def emit(self, event, data, namespace=None, room=None, skip_sid=None,
+ callback=None, **kwargs):
+ """Emit a message to a single client, a room, or all the clients
+ connected to the namespace.
+
+ This method takes care or propagating the message to all the servers
+ that are connected through the message queue.
+
+ The parameters are the same as in :meth:`.Server.emit`.
+
+ Note: this method is a coroutine.
+ """
+ if kwargs.get('ignore_queue'):
+ return await super().emit(
+ event, data, namespace=namespace, room=room, skip_sid=skip_sid,
+ callback=callback)
+ namespace = namespace or '/'
+ if callback is not None:
+ if self.server is None:
+ raise RuntimeError('Callbacks can only be issued from the '
+ 'context of a server.')
+ if room is None:
+ raise ValueError('Cannot use callback without a room set.')
+ id = self._generate_ack_id(room, namespace, callback)
+ callback = (room, namespace, id)
+ else:
+ callback = None
+ await self._publish({'method': 'emit', 'event': event, 'data': data,
+ 'namespace': namespace, 'room': room,
+ 'skip_sid': skip_sid, 'callback': callback})
+
+ async def close_room(self, room, namespace=None):
+ await self._publish({'method': 'close_room', 'room': room,
+ 'namespace': namespace or '/'})
+
+ async def _publish(self, data):
+ """Publish a message on the Socket.IO channel.
+
+ This method needs to be implemented by the different subclasses that
+ support pub/sub backends.
+ """
+ raise NotImplementedError('This method must be implemented in a '
+ 'subclass.') # pragma: no cover
+
+ async def _listen(self):
+ """Return the next message published on the Socket.IO channel,
+ blocking until a message is available.
+
+ This method needs to be implemented by the different subclasses that
+ support pub/sub backends.
+ """
+ raise NotImplementedError('This method must be implemented in a '
+ 'subclass.') # pragma: no cover
+
+ async def _handle_emit(self, message):
+ # Events with callbacks are very tricky to handle across hosts
+ # Here in the receiving end we set up a local callback that preserves
+ # the callback host and id from the sender
+ remote_callback = message.get('callback')
+ if remote_callback is not None and len(remote_callback) == 3:
+ callback = partial(self._return_callback, self.host_id,
+ *remote_callback)
+ else:
+ callback = None
+ await super().emit(message['event'], message['data'],
+ namespace=message.get('namespace'),
+ room=message.get('room'),
+ skip_sid=message.get('skip_sid'),
+ callback=callback)
+
+ async def _handle_callback(self, message):
+ if self.host_id == message.get('host_id'):
+ try:
+ sid = message['sid']
+ namespace = message['namespace']
+ id = message['id']
+ args = message['args']
+ except KeyError:
+ return
+ await self.trigger_callback(sid, namespace, id, args)
+
+ async def _return_callback(self, host_id, sid, namespace, callback_id,
+ *args):
+ # When an event callback is received, the callback is returned back
+ # the sender, which is identified by the host_id
+ await self._publish({'method': 'callback', 'host_id': host_id,
+ 'sid': sid, 'namespace': namespace,
+ 'id': callback_id, 'args': args})
+
+ async def _handle_close_room(self, message):
+ await super().close_room(
+ room=message.get('room'), namespace=message.get('namespace'))
+
+ async def _thread(self):
+ while True:
+ try:
+ message = await self._listen()
+ except:
+ import traceback
+ traceback.print_exc()
+ break
+ data = None
+ if isinstance(message, dict):
+ data = message
+ else:
+ if isinstance(message, six.binary_type): # pragma: no cover
+ try:
+ data = pickle.loads(message)
+ except:
+ pass
+ if data is None:
+ try:
+ data = json.loads(message)
+ except:
+ pass
+ if data and 'method' in data:
+ if data['method'] == 'emit':
+ await self._handle_emit(data)
+ elif data['method'] == 'callback':
+ await self._handle_callback(data)
+ elif data['method'] == 'close_room':
+ await self._handle_close_room(data)
diff --git a/socketio/asyncio_redis_manager.py b/socketio/asyncio_redis_manager.py
new file mode 100644
index 0000000..7341033
--- /dev/null
+++ b/socketio/asyncio_redis_manager.py
@@ -0,0 +1,78 @@
+import pickle
+from urllib.parse import urlparse
+
+try:
+ import aioredis
+except ImportError:
+ aioredis = None
+
+from .asyncio_pubsub_manager import AsyncPubSubManager
+
+
+def _parse_redis_url(url):
+ p = urlparse(url)
+ if p.scheme != 'redis':
+ raise ValueError('Invalid redis url')
+ if ':' in p.netloc:
+ host, port = p.netloc.split(':')
+ port = int(port)
+ else:
+ host = p.netloc or 'localhost'
+ port = 6379
+ if p.path:
+ db = int(p.path[1:])
+ else:
+ db = 0
+ if not host:
+ raise ValueError('Invalid redis hostname')
+ return host, port, db
+
+
+class AsyncRedisManager(AsyncPubSubManager): # pragma: no cover
+ """Redis based client manager for asyncio servers.
+
+ This class implements a Redis backend for event sharing across multiple
+ processes. Only kept here as one more example of how to build a custom
+ backend, since the kombu backend is perfectly adequate to support a Redis
+ message queue.
+
+ To use a Redis backend, initialize the :class:`Server` instance as
+ follows::
+
+ server = socketio.Server(client_manager=socketio.AsyncRedisManager(
+ 'redis://hostname:port/0'))
+
+ :param url: The connection URL for the Redis server. For a default Redis
+ store running on the same host, use ``redis://``.
+ :param channel: The channel name on which the server sends and receives
+ notifications. Must be the same in all the servers.
+ :param write_only: If set ot ``True``, only initialize to emit events. The
+ default of ``False`` initializes the class for emitting
+ and receiving.
+ """
+ name = 'aioredis'
+
+ def __init__(self, url='redis://localhost:6379/0', channel='socketio',
+ write_only=False):
+ if aioredis is None:
+ raise RuntimeError('Redis package is not installed '
+ '(Run "pip install aioredis" in your '
+ 'virtualenv).')
+ self.host, self.port, self.db = _parse_redis_url(url)
+ self.pub = None
+ self.sub = None
+ super().__init__(channel=channel, write_only=write_only)
+
+ async def _publish(self, data):
+ if self.pub is None:
+ self.pub = await aioredis.create_redis((self.host, self.port),
+ db=self.db)
+ return await self.pub.publish(self.channel, pickle.dumps(data))
+
+ async def _listen(self):
+ if self.sub is None:
+ self.sub = await aioredis.create_redis((self.host, self.port),
+ db=self.db)
+ self.ch = (await self.sub.subscribe(self.channel))[0]
+ while True:
+ return await self.ch.get()
diff --git a/socketio/asyncio_server.py b/socketio/asyncio_server.py
index 4954013..19d0f89 100644
--- a/socketio/asyncio_server.py
+++ b/socketio/asyncio_server.py
@@ -149,6 +149,21 @@ class AsyncServer(server.Server):
await self.emit('message', data, room, skip_sid, namespace, callback,
**kwargs)
+ async def close_room(self, room, namespace=None):
+ """Close a room.
+
+ This function removes all the clients from the given room.
+
+ :param room: Room name.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the default namespace is used.
+
+ Note: this method is a coroutine.
+ """
+ namespace = namespace or '/'
+ self.logger.info('room %s is closing [%s]', room, namespace)
+ await self.manager.close_room(room, namespace)
+
async def disconnect(self, sid, namespace=None):
"""Disconnect a client.
diff --git a/tests/test_asyncio_manager.py b/tests/test_asyncio_manager.py
index b188525..926d854 100644
--- a/tests/test_asyncio_manager.py
+++ b/tests/test_asyncio_manager.py
@@ -195,7 +195,7 @@ class TestAsyncManager(unittest.TestCase):
self.bm.connect('789', '/foo')
self.bm.enter_room('123', '/foo', 'bar')
self.bm.enter_room('123', '/foo', 'bar')
- self.bm.close_room('bar', '/foo')
+ _run(self.bm.close_room('bar', '/foo'))
self.assertNotIn('bar', self.bm.rooms['/foo'])
def test_close_invalid_room(self):
diff --git a/tests/test_asyncio_namespace.py b/tests/test_asyncio_namespace.py
index d7e205d..abb7c10 100644
--- a/tests/test_asyncio_namespace.py
+++ b/tests/test_asyncio_namespace.py
@@ -154,11 +154,13 @@ class TestAsyncNamespace(unittest.TestCase):
def test_close_room(self):
ns = asyncio_namespace.AsyncNamespace('/foo')
- ns._set_server(mock.MagicMock())
- ns.close_room('room')
- ns.server.close_room.assert_called_with('room', namespace='/foo')
- ns.close_room('room', namespace='/bar')
- ns.server.close_room.assert_called_with('room', namespace='/bar')
+ mock_server = mock.MagicMock()
+ mock_server.close_room = AsyncMock()
+ ns._set_server(mock_server)
+ _run(ns.close_room('room'))
+ ns.server.close_room.mock.assert_called_with('room', namespace='/foo')
+ _run(ns.close_room('room', namespace='/bar'))
+ ns.server.close_room.mock.assert_called_with('room', namespace='/bar')
def test_rooms(self):
ns = asyncio_namespace.AsyncNamespace('/foo')
diff --git a/tests/test_asyncio_pubsub_manager.py b/tests/test_asyncio_pubsub_manager.py
new file mode 100644
index 0000000..2f556e6
--- /dev/null
+++ b/tests/test_asyncio_pubsub_manager.py
@@ -0,0 +1,268 @@
+import functools
+import sys
+import unittest
+
+import six
+if six.PY3:
+ from unittest import mock
+else:
+ import mock
+
+if sys.version_info >= (3, 5):
+ import asyncio
+ from asyncio import coroutine
+ from socketio import asyncio_manager
+ from socketio import asyncio_pubsub_manager
+else:
+ # mock coroutine so that Python 2 doesn't complain
+ def coroutine(f):
+ return f
+
+
+def AsyncMock(*args, **kwargs):
+ """Return a mock asynchronous function."""
+ m = mock.MagicMock(*args, **kwargs)
+
+ @coroutine
+ def mock_coro(*args, **kwargs):
+ return m(*args, **kwargs)
+
+ mock_coro.mock = m
+ return mock_coro
+
+
+def _run(coro):
+ """Run the given coroutine."""
+ return asyncio.get_event_loop().run_until_complete(coro)
+
+
+@unittest.skipIf(sys.version_info < (3, 5), 'only for Python 3.5+')
+class TestAsyncPubSubManager(unittest.TestCase):
+ def setUp(self):
+ mock_server = mock.MagicMock()
+ mock_server._emit_internal = AsyncMock()
+ self.pm = asyncio_pubsub_manager.AsyncPubSubManager()
+ self.pm._publish = AsyncMock()
+ self.pm.set_server(mock_server)
+ self.pm.initialize()
+
+ def test_default_init(self):
+ self.assertEqual(self.pm.channel, 'socketio')
+ self.assertEqual(len(self.pm.host_id), 32)
+ self.pm.server.start_background_task.assert_called_once_with(
+ self.pm._thread)
+
+ def test_custom_init(self):
+ pubsub = asyncio_pubsub_manager.AsyncPubSubManager(channel='foo')
+ self.assertEqual(pubsub.channel, 'foo')
+ self.assertEqual(len(pubsub.host_id), 32)
+
+ def test_write_only_init(self):
+ mock_server = mock.MagicMock()
+ pm = asyncio_pubsub_manager.AsyncPubSubManager(write_only=True)
+ pm.set_server(mock_server)
+ pm.initialize()
+ self.assertEqual(pm.channel, 'socketio')
+ self.assertEqual(len(pm.host_id), 32)
+ self.assertEqual(pm.server.start_background_task.call_count, 0)
+
+ def test_emit(self):
+ _run(self.pm.emit('foo', 'bar'))
+ self.pm._publish.mock.assert_called_once_with(
+ {'method': 'emit', 'event': 'foo', 'data': 'bar',
+ 'namespace': '/', 'room': None, 'skip_sid': None,
+ 'callback': None})
+
+ def test_emit_with_namespace(self):
+ _run(self.pm.emit('foo', 'bar', namespace='/baz'))
+ self.pm._publish.mock.assert_called_once_with(
+ {'method': 'emit', 'event': 'foo', 'data': 'bar',
+ 'namespace': '/baz', 'room': None, 'skip_sid': None,
+ 'callback': None})
+
+ def test_emit_with_room(self):
+ _run(self.pm.emit('foo', 'bar', room='baz'))
+ self.pm._publish.mock.assert_called_once_with(
+ {'method': 'emit', 'event': 'foo', 'data': 'bar',
+ 'namespace': '/', 'room': 'baz', 'skip_sid': None,
+ 'callback': None})
+
+ def test_emit_with_skip_sid(self):
+ _run(self.pm.emit('foo', 'bar', skip_sid='baz'))
+ self.pm._publish.mock.assert_called_once_with(
+ {'method': 'emit', 'event': 'foo', 'data': 'bar',
+ 'namespace': '/', 'room': None, 'skip_sid': 'baz',
+ 'callback': None})
+
+ def test_emit_with_callback(self):
+ with mock.patch.object(self.pm, '_generate_ack_id',
+ return_value='123'):
+ _run(self.pm.emit('foo', 'bar', room='baz', callback='cb'))
+ self.pm._publish.mock.assert_called_once_with(
+ {'method': 'emit', 'event': 'foo', 'data': 'bar',
+ 'namespace': '/', 'room': 'baz', 'skip_sid': None,
+ 'callback': ('baz', '/', '123')})
+
+ def test_emit_with_callback_without_server(self):
+ standalone_pm = asyncio_pubsub_manager.AsyncPubSubManager()
+ self.assertRaises(RuntimeError, _run,
+ standalone_pm.emit('foo', 'bar', callback='cb'))
+
+ def test_emit_with_callback_missing_room(self):
+ with mock.patch.object(self.pm, '_generate_ack_id',
+ return_value='123'):
+ self.assertRaises(ValueError, _run,
+ self.pm.emit('foo', 'bar', callback='cb'))
+
+ def test_emit_with_ignore_queue(self):
+ self.pm.connect('123', '/')
+ _run(self.pm.emit('foo', 'bar', room='123', namespace='/',
+ ignore_queue=True))
+ self.pm._publish.mock.assert_not_called()
+ self.pm.server._emit_internal.mock.assert_called_once_with(
+ '123', 'foo', 'bar', '/', None)
+
+ def test_close_room(self):
+ _run(self.pm.close_room('foo'))
+ self.pm._publish.mock.assert_called_once_with(
+ {'method': 'close_room', 'room': 'foo', 'namespace': '/'})
+
+ def test_close_room_with_namespace(self):
+ _run(self.pm.close_room('foo', '/bar'))
+ self.pm._publish.mock.assert_called_once_with(
+ {'method': 'close_room', 'room': 'foo', 'namespace': '/bar'})
+
+ def test_handle_emit(self):
+ with mock.patch.object(asyncio_manager.AsyncManager, 'emit',
+ new=AsyncMock()) as super_emit:
+ _run(self.pm._handle_emit({'event': 'foo', 'data': 'bar'}))
+ super_emit.mock.assert_called_once_with(
+ self.pm, 'foo', 'bar', namespace=None, room=None,
+ skip_sid=None, callback=None)
+
+ def test_handle_emit_with_namespace(self):
+ with mock.patch.object(asyncio_manager.AsyncManager, 'emit',
+ new=AsyncMock()) as super_emit:
+ _run(self.pm._handle_emit({'event': 'foo', 'data': 'bar',
+ 'namespace': '/baz'}))
+ super_emit.mock.assert_called_once_with(
+ self.pm, 'foo', 'bar', namespace='/baz', room=None,
+ skip_sid=None, callback=None)
+
+ def test_handle_emiti_with_room(self):
+ with mock.patch.object(asyncio_manager.AsyncManager, 'emit',
+ new=AsyncMock()) as super_emit:
+ _run(self.pm._handle_emit({'event': 'foo', 'data': 'bar',
+ 'room': 'baz'}))
+ super_emit.mock.assert_called_once_with(
+ self.pm, 'foo', 'bar', namespace=None, room='baz',
+ skip_sid=None, callback=None)
+
+ def test_handle_emit_with_skip_sid(self):
+ with mock.patch.object(asyncio_manager.AsyncManager, 'emit',
+ new=AsyncMock()) as super_emit:
+ _run(self.pm._handle_emit({'event': 'foo', 'data': 'bar',
+ 'skip_sid': '123'}))
+ super_emit.mock.assert_called_once_with(
+ self.pm, 'foo', 'bar', namespace=None, room=None,
+ skip_sid='123', callback=None)
+
+ def test_handle_emit_with_callback(self):
+ host_id = self.pm.host_id
+ with mock.patch.object(asyncio_manager.AsyncManager, 'emit',
+ new=AsyncMock()) as super_emit:
+ _run(self.pm._handle_emit({'event': 'foo', 'data': 'bar',
+ 'namespace': '/baz',
+ 'callback': ('sid', '/baz', 123)}))
+ self.assertEqual(super_emit.mock.call_count, 1)
+ self.assertEqual(super_emit.mock.call_args[0],
+ (self.pm, 'foo', 'bar'))
+ self.assertEqual(super_emit.mock.call_args[1]['namespace'], '/baz')
+ self.assertIsNone(super_emit.mock.call_args[1]['room'])
+ self.assertIsNone(super_emit.mock.call_args[1]['skip_sid'])
+ self.assertIsInstance(super_emit.mock.call_args[1]['callback'],
+ functools.partial)
+ _run(super_emit.mock.call_args[1]['callback']('one', 2, 'three'))
+ self.pm._publish.mock.assert_called_once_with(
+ {'method': 'callback', 'host_id': host_id, 'sid': 'sid',
+ 'namespace': '/baz', 'id': 123, 'args': ('one', 2, 'three')})
+
+ def test_handle_callback(self):
+ host_id = self.pm.host_id
+ with mock.patch.object(self.pm, 'trigger_callback',
+ new=AsyncMock()) as trigger:
+ _run(self.pm._handle_callback({'method': 'callback',
+ 'host_id': host_id, 'sid': 'sid',
+ 'namespace': '/', 'id': 123,
+ 'args': ('one', 2)}))
+ trigger.mock.assert_called_once_with('sid', '/', 123, ('one', 2))
+
+ def test_handle_callback_bad_host_id(self):
+ with mock.patch.object(self.pm, 'trigger_callback',
+ new=AsyncMock()) as trigger:
+ _run(self.pm._handle_callback({'method': 'callback',
+ 'host_id': 'bad', 'sid': 'sid',
+ 'namespace': '/', 'id': 123,
+ 'args': ('one', 2)}))
+ self.assertEqual(trigger.mock.call_count, 0)
+
+ def test_handle_callback_missing_args(self):
+ host_id = self.pm.host_id
+ with mock.patch.object(self.pm, 'trigger_callback',
+ new=AsyncMock()) as trigger:
+ _run(self.pm._handle_callback({'method': 'callback',
+ 'host_id': host_id, 'sid': 'sid',
+ 'namespace': '/', 'id': 123}))
+ _run(self.pm._handle_callback({'method': 'callback',
+ 'host_id': host_id, 'sid': 'sid',
+ 'namespace': '/'}))
+ _run(self.pm._handle_callback({'method': 'callback',
+ 'host_id': host_id, 'sid': 'sid'}))
+ _run(self.pm._handle_callback({'method': 'callback',
+ 'host_id': host_id}))
+ self.assertEqual(trigger.mock.call_count, 0)
+
+ def test_handle_close_room(self):
+ with mock.patch.object(asyncio_manager.AsyncManager, 'close_room',
+ new=AsyncMock()) as super_close_room:
+ _run(self.pm._handle_close_room({'method': 'close_room',
+ 'room': 'foo'}))
+ super_close_room.mock.assert_called_once_with(
+ self.pm, room='foo', namespace=None)
+
+ def test_handle_close_room_with_namespace(self):
+ with mock.patch.object(asyncio_manager.AsyncManager, 'close_room',
+ new=AsyncMock()) as super_close_room:
+ _run(self.pm._handle_close_room({'method': 'close_room',
+ 'room': 'foo',
+ 'namespace': '/bar'}))
+ super_close_room.mock.assert_called_once_with(
+ self.pm, room='foo', namespace='/bar')
+
+ def test_background_thread(self):
+ self.pm._handle_emit = AsyncMock()
+ self.pm._handle_callback = AsyncMock()
+ self.pm._handle_close_room = AsyncMock()
+
+ def messages():
+ import pickle
+ yield {'method': 'emit', 'value': 'foo'}
+ yield {'missing': 'method'}
+ yield '{"method": "callback", "value": "bar"}'
+ yield {'method': 'bogus'}
+ yield pickle.dumps({'method': 'close_room', 'value': 'baz'})
+ yield 'bad json'
+ yield b'bad pickled'
+
+ self.pm._listen = AsyncMock(side_effect=list(messages()))
+ try:
+ _run(self.pm._thread())
+ except StopIteration:
+ pass
+
+ self.pm._handle_emit.mock.assert_called_once_with(
+ {'method': 'emit', 'value': 'foo'})
+ self.pm._handle_callback.mock.assert_called_once_with(
+ {'method': 'callback', 'value': 'bar'})
+ self.pm._handle_close_room.mock.assert_called_once_with(
+ {'method': 'close_room', 'value': 'baz'})
diff --git a/tests/test_asyncio_redis_manager.py b/tests/test_asyncio_redis_manager.py
new file mode 100644
index 0000000..efa9721
--- /dev/null
+++ b/tests/test_asyncio_redis_manager.py
@@ -0,0 +1,43 @@
+import sys
+import unittest
+
+if sys.version_info >= (3, 5):
+ from socketio import asyncio_redis_manager
+
+
+@unittest.skipIf(sys.version_info < (3, 5), 'only for Python 3.5+')
+class TestAsyncRedisManager(unittest.TestCase):
+ def test_default_url(self):
+ self.assertEqual(asyncio_redis_manager._parse_redis_url('redis://'),
+ ('localhost', 6379, 0))
+
+ def test_only_host_url(self):
+ self.assertEqual(
+ asyncio_redis_manager._parse_redis_url('redis://redis.host'),
+ ('redis.host', 6379, 0))
+
+ def test_no_db_url(self):
+ self.assertEqual(
+ asyncio_redis_manager._parse_redis_url('redis://redis.host:123/1'),
+ ('redis.host', 123, 1))
+
+ def test_no_port_url(self):
+ self.assertEqual(
+ asyncio_redis_manager._parse_redis_url('redis://redis.host/1'),
+ ('redis.host', 6379, 1))
+
+ def test_no_host_url(self):
+ self.assertRaises(ValueError, asyncio_redis_manager._parse_redis_url,
+ 'redis://:123/1')
+
+ def test_bad_port_url(self):
+ self.assertRaises(ValueError, asyncio_redis_manager._parse_redis_url,
+ 'redis://localhost:abc/1')
+
+ def test_bad_db_url(self):
+ self.assertRaises(ValueError, asyncio_redis_manager._parse_redis_url,
+ 'redis://localhost:abc/z')
+
+ def test_bad_scheme_url(self):
+ self.assertRaises(ValueError, asyncio_redis_manager._parse_redis_url,
+ 'http://redis.host:123/1')
diff --git a/tests/test_asyncio_server.py b/tests/test_asyncio_server.py
index 07b75b3..a19bb38 100644
--- a/tests/test_asyncio_server.py
+++ b/tests/test_asyncio_server.py
@@ -49,6 +49,7 @@ class TestAsyncServer(unittest.TestCase):
def _get_mock_manager(self):
mgr = mock.MagicMock()
mgr.emit = AsyncMock()
+ mgr.close_room = AsyncMock()
mgr.trigger_callback = AsyncMock()
return mgr
@@ -134,14 +135,14 @@ class TestAsyncServer(unittest.TestCase):
def test_close_room(self, eio):
mgr = self._get_mock_manager()
s = asyncio_server.AsyncServer(client_manager=mgr)
- s.close_room('room', namespace='/foo')
- s.manager.close_room.assert_called_once_with('room', '/foo')
+ _run(s.close_room('room', namespace='/foo'))
+ s.manager.close_room.mock.assert_called_once_with('room', '/foo')
def test_close_room_default_namespace(self, eio):
mgr = self._get_mock_manager()
s = asyncio_server.AsyncServer(client_manager=mgr)
- s.close_room('room')
- s.manager.close_room.assert_called_once_with('room', '/')
+ _run(s.close_room('room'))
+ s.manager.close_room.mock.assert_called_once_with('room', '/')
def test_rooms(self, eio):
mgr = self._get_mock_manager()
diff --git a/tests/test_pubsub_manager.py b/tests/test_pubsub_manager.py
index aa461b6..684dedb 100644
--- a/tests/test_pubsub_manager.py
+++ b/tests/test_pubsub_manager.py
@@ -216,12 +216,11 @@ class TestBaseManager(unittest.TestCase):
yield pickle.dumps({'method': 'close_room', 'value': 'baz'})
yield 'bad json'
yield b'bad pickled'
- raise KeyboardInterrupt
self.pm._listen = mock.MagicMock(side_effect=messages)
try:
self.pm._thread()
- except KeyboardInterrupt:
+ except StopIteration:
pass
self.pm._handle_emit.assert_called_once_with(