diff --git a/docs/index.rst b/docs/index.rst index 026bef1..2f39b6b 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -560,6 +560,29 @@ functions in the standard library with equivalent asynchronous versions. While python-socketio does not require monkey patching, other libraries such as database drivers are likely to require it. +Gevent with uWSGI +~~~~~~~~~~~~~~~~~ + +When using the uWSGI server in combination with gevent, the Socket.IO server +can take advantage of uWSGI's native WebSocket support. + +Instances of class ``socketio.Server`` will automatically use this option for +asynchronous operations if both gevent and uWSGI are installed and eventlet is +not installed. To request this asynchoronous mode explicitly, the +``async_mode`` option can be given in the constructor:: + + # gevent with uWSGI + sio = socketio.Server(async_mode='gevent_uwsgi') + +A complete explanation of the configuration and usage of the uWSGI server is +beyond the scope of this documentation. The uWSGI server is a fairly complex +package that provides a large and comprehensive set of options. It must be +compiled with WebSocket and SSL support for the WebSocket transport to be +available. As way of an introduction, the following command starts a uWSGI +server for the ``latency.py`` example on port 5000:: + + $ uwsgi --http :5000 --gevent 1000 --http-websockets --master --wsgi-file latency.py --callable app + Standard Threading Library ~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/examples/app.py b/examples/app.py index 5f3f63e..7f4fe4a 100755 --- a/examples/app.py +++ b/examples/app.py @@ -1,5 +1,6 @@ -# set async_mode to 'threading', 'eventlet' or 'gevent' to force a mode -# else, the best mode is selected automatically from what's installed +# set async_mode to 'threading', 'eventlet', 'gevent' or 'gevent_uwsgi' to +# force a mode else, the best mode is selected automatically from what's +# installed async_mode = None import time @@ -108,5 +109,9 @@ if __name__ == '__main__': handler_class=WebSocketHandler).serve_forever() else: pywsgi.WSGIServer(('', 5000), app).serve_forever() + elif sio.async_mode == 'gevent_uwsgi': + print('Start the application through the uwsgi server. Example:') + print('uwsgi --http :5000 --gevent 1000 --http-websockets --master ' + '--wsgi-file app.py --callable app') else: print('Unknown async_mode: ' + sio.async_mode) diff --git a/examples/latency.py b/examples/latency.py index e43a2f3..45252f9 100755 --- a/examples/latency.py +++ b/examples/latency.py @@ -1,11 +1,11 @@ -from flask import Flask, render_template +# set async_mode to 'threading', 'eventlet', 'gevent' or 'gevent_uwsgi' to +# force a mode else, the best mode is selected automatically from what's +# installed +async_mode = None +from flask import Flask, render_template import socketio -# set async_mode to 'threading', 'eventlet' or 'gevent' to force a mode -# else, the best mode is selected automatically from what's installed -async_mode = None - sio = socketio.Server(async_mode=async_mode) app = Flask(__name__) app.wsgi_app = socketio.Middleware(sio, app.wsgi_app) @@ -43,5 +43,9 @@ if __name__ == '__main__': handler_class=WebSocketHandler).serve_forever() else: pywsgi.WSGIServer(('', 5000), app).serve_forever() + elif sio.async_mode == 'gevent_uwsgi': + print('Start the application through the uwsgi server. Example:') + print('uwsgi --http :5000 --gevent 1000 --http-websockets --master ' + '--wsgi-file latency.py --callable app') else: print('Unknown async_mode: ' + sio.async_mode) diff --git a/setup.py b/setup.py index cc620d0..15478a3 100755 --- a/setup.py +++ b/setup.py @@ -4,15 +4,19 @@ python-socketio Socket.IO server. """ +import re from setuptools import setup +with open('socketio/__init__.py', 'r') as f: + version = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]', + f.read(), re.MULTILINE).group(1) with open('README.rst', 'r') as f: long_description = f.read() setup( name='python-socketio', - version='1.4.4', + version=version, url='http://github.com/miguelgrinberg/python-socketio/', license='MIT', author='Miguel Grinberg', @@ -25,7 +29,7 @@ setup( platforms='any', install_requires=[ 'six>=1.9.0', - 'python-engineio>=0.8.0' + 'python-engineio>=1.0.0' ], tests_require=[ 'mock', diff --git a/socketio/__init__.py b/socketio/__init__.py index b4b0362..e4cda4c 100644 --- a/socketio/__init__.py +++ b/socketio/__init__.py @@ -8,6 +8,8 @@ from .redis_manager import RedisManager from .server import Server from .util import apply_interceptor, ignore_interceptor -__all__ = [Interceptor, Middleware, Namespace, Server, BaseManager, - PubSubManager, KombuManager, RedisManager, +__version__ = '1.5.1' + +__all__ = [__version__, Interceptor, Middleware, Namespace, Server, + BaseManager, PubSubManager, KombuManager, RedisManager, apply_interceptor, ignore_interceptor] diff --git a/socketio/base_manager.py b/socketio/base_manager.py index cdbaa80..5b98990 100644 --- a/socketio/base_manager.py +++ b/socketio/base_manager.py @@ -16,6 +16,7 @@ class BaseManager(object): self.server = None self.rooms = {} self.callbacks = {} + self.pending_disconnect = {} def initialize(self, server): self.server = server @@ -35,11 +36,26 @@ class BaseManager(object): self.enter_room(sid, namespace, sid) def is_connected(self, sid, namespace): + if namespace in self.pending_disconnect and \ + sid in self.pending_disconnect[namespace]: + # the client is in the process of being disconnected + return False try: return self.rooms[namespace][None][sid] except KeyError: pass + def pre_disconnect(self, sid, namespace): + """Put the client in the to-be-disconnected list. + + This allows the client data structures to be present while the + disconnect handler is invoked, but still recognize the fact that the + client is soon going away. + """ + if namespace not in self.pending_disconnect: + self.pending_disconnect[namespace] = [] + self.pending_disconnect[namespace].append(sid) + def disconnect(self, sid, namespace): """Register a client disconnect from a namespace.""" rooms = [] @@ -52,6 +68,11 @@ class BaseManager(object): del self.callbacks[sid][namespace] if len(self.callbacks[sid]) == 0: del self.callbacks[sid] + if namespace in self.pending_disconnect and \ + sid in self.pending_disconnect[namespace]: + self.pending_disconnect[namespace].remove(sid) + if len(self.pending_disconnect[namespace]) == 0: + del self.pending_disconnect[namespace] def enter_room(self, sid, namespace, room): """Add a client to a room.""" diff --git a/socketio/server.py b/socketio/server.py index bbd08f4..3e69e31 100644 --- a/socketio/server.py +++ b/socketio/server.py @@ -31,15 +31,21 @@ class Server(object): packets. Custom json modules must have ``dumps`` and ``loads`` functions that are compatible with the standard library versions. + :param async_handlers: If set to ``True``, event handlers are executed in + separate threads. To run handlers synchronously, + set to ``False``. The default is ``False``. :param kwargs: Connection parameters for the underlying Engine.IO server. The Engine.IO configuration supports the following settings: - :param async_mode: The library used for asynchronous operations. Valid - options are "threading", "eventlet" and "gevent". If - this argument is not given, "eventlet" is tried first, - then "gevent", and finally "threading". The websocket - transport is only supported in "eventlet" mode. + :param async_mode: The asynchronous model to use. See the Deployment + section in the documentation for a description of the + available options. Valid async modes are "threading", + "eventlet", "gevent" and "gevent_uwsgi". If this + argument is not given, "eventlet" is tried first, then + "gevent_uwsgi", then "gevent", and finally "threading". + The first async mode that has all its dependencies + installed is then one that is chosen. :param ping_timeout: The time in seconds that the client waits for the server to respond before disconnecting. :param ping_interval: The interval in seconds at which the client pings @@ -63,7 +69,7 @@ class Server(object): ``False``. """ def __init__(self, client_manager=None, logger=False, binary=False, - json=None, **kwargs): + json=None, async_handlers=False, **kwargs): engineio_options = kwargs engineio_logger = engineio_options.pop('engineio_logger', None) if engineio_logger is not None: @@ -71,6 +77,7 @@ class Server(object): if json is not None: packet.Packet.json = json engineio_options['json'] = json + engineio_options['async_handlers'] = False self.eio = engineio.Server(**engineio_options) self.eio.on('connect', self._handle_eio_connect) self.eio.on('message', self._handle_eio_message) @@ -101,6 +108,8 @@ class Server(object): self.manager = client_manager self.manager_initialized = False + self.async_handlers = async_handlers + self.async_mode = self.eio.async_mode def on(self, event, handler=None, namespace=None): @@ -293,11 +302,13 @@ class Server(object): argument is omitted the default namespace is used. """ namespace = namespace or '/' - self.logger.info('Disconnecting %s [%s]', sid, namespace) - self._send_packet(sid, packet.Packet(packet.DISCONNECT, - namespace=namespace)) - self._trigger_event('disconnect', namespace, sid) - self.manager.disconnect(sid, namespace=namespace) + if self.manager.is_connected(sid, namespace=namespace): + self.logger.info('Disconnecting %s [%s]', sid, namespace) + self.manager.pre_disconnect(sid, namespace=namespace) + self._send_packet(sid, packet.Packet(packet.DISCONNECT, + namespace=namespace)) + self._trigger_event('disconnect', namespace, sid) + self.manager.disconnect(sid, namespace=namespace) def transport(self, sid): """Return the name of the transport used by the client. @@ -418,7 +429,14 @@ class Server(object): namespace = namespace or '/' self.logger.info('received event "%s" from %s [%s]', data[0], sid, namespace) - r = self._trigger_event(data[0], namespace, sid, *data[1:]) + if self.async_handlers: + self.start_background_task(self._handle_event_internal, self, sid, + data, namespace, id) + else: + self._handle_event_internal(self, sid, data, namespace, id) + + def _handle_event_internal(self, server, sid, data, namespace, id): + r = server._trigger_event(data[0], namespace, sid, *data[1:]) if id is not None: # send ACK packet with the response returned by the handler # tuples are expanded as multiple arguments @@ -432,10 +450,10 @@ class Server(object): binary = False # pragma: nocover else: binary = None - self._send_packet(sid, packet.Packet(packet.ACK, - namespace=namespace, - id=id, data=data, - binary=binary)) + server._send_packet(sid, packet.Packet(packet.ACK, + namespace=namespace, + id=id, data=data, + binary=binary)) def _handle_ack(self, sid, namespace, id, data): """Handle ACK packets from the client.""" diff --git a/tests/test_base_manager.py b/tests/test_base_manager.py index b916ddc..2a801be 100644 --- a/tests/test_base_manager.py +++ b/tests/test_base_manager.py @@ -24,6 +24,20 @@ class TestBaseManager(unittest.TestCase): self.assertEqual(self.bm.rooms['/foo'], {None: {'123': True}, '123': {'123': True}}) + def test_pre_disconnect(self): + self.bm.connect('123', '/foo') + self.bm.connect('456', '/foo') + self.bm.pre_disconnect('123', '/foo') + self.assertEqual(self.bm.pending_disconnect, {'/foo': ['123']}) + self.assertFalse(self.bm.is_connected('123', '/foo')) + self.bm.pre_disconnect('456', '/foo') + self.assertEqual(self.bm.pending_disconnect, {'/foo': ['123', '456']}) + self.assertFalse(self.bm.is_connected('456', '/foo')) + self.bm.disconnect('123', '/foo') + self.assertEqual(self.bm.pending_disconnect, {'/foo': ['456']}) + self.bm.disconnect('456', '/foo') + self.assertEqual(self.bm.pending_disconnect, {}) + def test_disconnect(self): self.bm.connect('123', '/foo') self.bm.connect('456', '/foo') diff --git a/tests/test_server.py b/tests/test_server.py index b10a8cd..ec3baaf 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -23,13 +23,15 @@ class TestServer(unittest.TestCase): def test_create(self, eio): mgr = mock.MagicMock() - s = server.Server(client_manager=mgr, binary=True, foo='bar') + s = server.Server(client_manager=mgr, binary=True, + async_handlers=True, foo='bar') s.handle_request({}, None) s.handle_request({}, None) - eio.assert_called_once_with(**{'foo': 'bar'}) + eio.assert_called_once_with(**{'foo': 'bar', 'async_handlers': False}) self.assertEqual(s.manager, mgr) self.assertEqual(s.eio.on.call_count, 3) self.assertEqual(s.binary, True) + self.assertEqual(s.async_handlers, True) self.assertEqual(mgr.initialize.call_count, 1) def test_on_event(self, eio): @@ -486,6 +488,23 @@ class TestServer(unittest.TestCase): s.disconnect('123', namespace='/foo') s.eio.send.assert_any_call('123', '1/foo', binary=False) + def test_disconnect_twice(self, eio): + s = server.Server() + s._handle_eio_connect('123', 'environ') + s.disconnect('123') + calls = s.eio.send.call_count + s.disconnect('123') + self.assertEqual(calls, s.eio.send.call_count) + + def test_disconnect_twice_namespace(self, eio): + s = server.Server() + s._handle_eio_connect('123', 'environ') + s._handle_eio_message('123', '0/foo') + s.disconnect('123', namespace='/foo') + calls = s.eio.send.call_count + s.disconnect('123', namespace='/foo') + self.assertEqual(calls, s.eio.send.call_count) + def test_namespace_handler(self, eio): result = {} @@ -527,6 +546,8 @@ class TestServer(unittest.TestCase): self.assertRaises(ValueError, s.register_namespace, 123) self.assertRaises(ValueError, s.register_namespace, Dummy) self.assertRaises(ValueError, s.register_namespace, Dummy()) + self.assertRaises(ValueError, s.register_namespace, + namespace.Namespace) def test_logger(self, eio): s = server.Server(logger=False) @@ -543,7 +564,8 @@ class TestServer(unittest.TestCase): def test_engineio_logger(self, eio): server.Server(engineio_logger='foo') - eio.assert_called_once_with(**{'logger': 'foo'}) + eio.assert_called_once_with(**{'logger': 'foo', + 'async_handlers': False}) def test_custom_json(self, eio): # Warning: this test cannot run in parallel with other tests, as it @@ -559,7 +581,8 @@ class TestServer(unittest.TestCase): return '+++ decoded +++' server.Server(json=CustomJSON) - eio.assert_called_once_with(**{'json': CustomJSON}) + eio.assert_called_once_with(**{'json': CustomJSON, + 'async_handlers': False}) pkt = packet.Packet(packet_type=packet.EVENT, data={six.text_type('foo'): six.text_type('bar')}) @@ -570,6 +593,13 @@ class TestServer(unittest.TestCase): # restore the default JSON module packet.Packet.json = json + def test_async_handlers(self, eio): + s = server.Server(async_handlers=True) + s._handle_eio_message('123', '2["my message","a","b","c"]') + s.eio.start_background_task.assert_called_once_with( + s._handle_event_internal, s, '123', ['my message', 'a', 'b', 'c'], + '/', None) + def test_start_background_task(self, eio): s = server.Server() s.start_background_task('foo', 'bar', baz='baz')