diff --git a/docs/index.rst b/docs/index.rst index 30120d0..c9c42e6 100755 --- a/docs/index.rst +++ b/docs/index.rst @@ -13,14 +13,18 @@ integrated with a Python WSGI application. The following are some of its features: - Fully compatible with the Javascript - `socket.io-client `_ library. + `socket.io-client `_ library, + versions 1.3.5 and up. - Compatible with Python 2.7 and Python 3.3+. -- Based on `Eventlet `_, enabling large number of - clients even on modest hardware. +- Supports large number of clients even on modest hardware when used with an + asynchronous server based on `eventlet `_ or + `gevent `_. For development and testing, any WSGI + complaint multi-threaded server can be used. - Includes a WSGI middleware that integrates Socket.IO traffic with standard WSGI applications. -- Broadcasting of messages to all or a subset of the connected clients. -- Event-based architecture implemented with decorators that hides the +- Broadcasting of messages to all connected clients, or to subsets of them + assigned to "rooms". +- Uses an event-based architecture implemented with decorators that hides the details of the protocol. - Support for HTTP long-polling and WebSocket transports. - Support for XHR2 and XHR browsers. @@ -95,9 +99,9 @@ Rooms ----- Because Socket.IO is a bidirectional protocol, the server can send messages to -any connected client at any time. To make it easy to address specific subsets -of clients, the application can put clients into rooms, and then address -messages to all the clients in a room. +any connected client at any time. To make it easy to address groups of clients, +the application can put clients into rooms, and then address messages to all +the clients in a room. When clients first connect, they are assigned to their own rooms, named with the session ID (the ``sid`` argument passed to all event handlers). The @@ -185,13 +189,152 @@ as a pathname following the hostname and port. For example, connecting to *http://example.com:8000/chat* would open a connection to the namespace */chat*. -Event handlers and rooms are maintained separately for each namespace, so it is -important that applications that use multiple namespaces specify the correct -namespace when setting up event handlers and rooms, using the ``namespace`` -argument available in All the methods in the :class:`socketio.Server` class. +Each namespace is handled independently from the others, with separate event +handlers and rooms. It is important that applications that use multiple +namespaces specify the correct namespace when setting up their event handlers +and rooms, using the optional ``namespace`` argument available in all the +methods in the :class:`socketio.Server` class. When the ``namespace`` argument is omitted, set to ``None`` or to ``'/'``, the -default namespace is used. +default namespace, representing the physical connection, is used. + +Deployment +---------- + +The following sections describe a variety of deployment strategies for +Socket.IO servers. + +Eventlet +~~~~~~~~ + +`Eventlet `_ is a high performance concurrent networking +library for Python 2 and 3 that uses coroutines, enabling code to be written in +the same style used with the blocking standard library functions. An Socket.IO +server deployed with eventlet has access to the long-polling and WebSocket +transports. + +Instances of class ``socketio.Server`` will automatically use eventlet for +asynchronous operations if the library is installed. To request its use +explicitly, the ``async_mode`` option can be given in the constructor:: + + sio = socketio.Server(async_mode='eventlet') + +A server configured for eventlet is deployed as a regular WSGI application, +using the provided ``socketio.Middleware``:: + + app = socketio.Middleware(sio) + import eventlet + eventlet.wsgi.server(eventlet.listen(('', 8000)), app) + +An alternative to running the eventlet WSGI server as above is to use +`gunicorn `_, a fully featured pure Python web server. The +command to launch the application under gunicorn is shown below:: + + $ gunicorn -k eventlet -w 1 module:app + +Due to limitations in its load balancing algorithm, gunicorn can only be used +with one worker process, so the ``-w 1`` option is required. Note that a +single eventlet worker can handle a large number of concurrent clients. + +Another limitation when using gunicorn is that the WebSocket transport is not +available, because this transport it requires extensions to the WSGI standard. + +Note: Eventlet provides a ``monkey_patch()`` function that replaces all the +blocking functions in the standard library with equivalent asynchronous +versions. While python-socketio does not require monkey patching, other +libraries such as database drivers are likely to require it. + +Gevent +~~~~~~ + +`Gevent `_ is another asynchronous framework based on +coroutines, very similar to eventlet. Only the long-polling transport is +currently available when using gevent. + +Instances of class ``socketio.Server`` will automatically use gevent for +asynchronous operations if the library is installed and eventlet is not +installed. To request gevent to be selected explicitly, the ``async_mode`` +option can be given in the constructor:: + + eio = socketio.Server(async_mode='gevent') + +A server configured for gevent is deployed as a regular WSGI application, +using the provided ``socketio.Middleware``:: + + app = socketio.Middleware(eio) + from gevent import pywsgi + pywsgi.WSGIServer(('', 8000), app).serve_forever() + +An alternative to running the eventlet WSGI server as above is to use +`gunicorn `_, a fully featured pure Python web server. The +command to launch the application under gunicorn is shown below:: + + $ gunicorn -k gevent -w 1 module:app + +Same as with eventlet, due to limitations in its load balancing algorithm, +gunicorn can only be used with one worker process, so the ``-w 1`` option is +required. Note that a single eventlet worker can handle a large number of +concurrent clients. + +Note: Gevent provides a ``monkey_patch()`` function that replaces all the +blocking functions in the standard library with equivalent asynchronous +versions. While python-socketio does not require monkey patching, other +libraries such as database drivers are likely to require it. + +Standard Threading Library +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +While not comparable to eventlet and gevent in terms of performance, +the Socket.IO server can also be configured to work with multi-threaded web +servers that use standard Python threads. This is an ideal setup to use with +development servers such as `Werkzeug `_. Only the +long-polling transport is currently available when using standard threads. + +Instances of class ``socketio.Server`` will automatically use the threading +mode if neither eventlet nor gevent are not installed. To request the +threading mode explicitly, the ``async_mode`` option can be given in the +constructor:: + + sio = socketio.Server(async_mode='threading') + +A server configured for threading is deployed as a regular web application, +using any WSGI complaint multi-threaded server. The example below deploys an +Socket.IO application combined with a Flask web application, using Flask's +development web server based on Werkzeug:: + + sio = socketio.Server(async_mode='threading') + app = Flask(__name__) + app.wsgi_app = socketio.Middleware(sio, app.wsgi_app) + + # ... Socket.IO and Flask handler functions ... + + if __name__ == '__main__': + app.run(threaded=True) + +When using the threading mode, it is important to ensure that the WSGI server +can handle multiple concurrent requests using threads, since a client can have +up to two outstanding requests at any given time. The Werkzeug server is +single-threaded by default, so the ``threaded=True`` option is required. + +Multi-process deployments +~~~~~~~~~~~~~~~~~~~~~~~~~ + +Socket.IO is a stateful protocol, which makes horizontal scaling more +difficult. To deploy a cluster of Socket.IO processes (hosted on one or +multiple servers), the following conditions must be met: + +- Each Socket.IO process must be able to handle multiple requests, either by + using eventlet, gevent, or standard threads. Worker processes that only + handle one request at a time are not supported. +- The load balancer must be configured to always forward requests from a + client to the same process. Load balancers call this *sticky sessions*, or + *session affinity*. + +A limitation in the current release of the Socket.IO server is that because +the clients are randomly assigned to different server processes, any form of +broadcasting is not supported. A storage backend that enables multiple +processes to share information about clients is currently in development to +address this important limitation. API Reference ------------- diff --git a/example/app.py b/example/app.py index 74a4279..801ca21 100755 --- a/example/app.py +++ b/example/app.py @@ -1,17 +1,21 @@ -import eventlet -eventlet.monkey_patch() +# set this to 'threading', 'eventlet', or 'gevent' +async_mode = 'threading' + +if async_mode == 'eventlet': + import eventlet + eventlet.monkey_patch() +elif async_mode == 'gevent': + from gevent import monkey + monkey.patch_all() import time from threading import Thread -import eventlet -from eventlet import wsgi from flask import Flask, render_template -from socketio import Server as SocketIOServer -from socketio import Middleware as SocketIOMiddleware +import socketio -socketio = SocketIOServer(logger=True) +sio = socketio.Server(logger=True, async_mode=async_mode) app = Flask(__name__) -app.debug = True +app.wsgi_app = socketio.Middleware(sio, app.wsgi_app) app.config['SECRET_KEY'] = 'secret!' thread = None @@ -22,8 +26,8 @@ def background_thread(): while True: time.sleep(10) count += 1 - socketio.emit('my response', - {'data': 'Server generated event'}, namespace='/test') + sio.emit('my response', {'data': 'Server generated event'}, + namespace='/test') @app.route('/') @@ -35,64 +39,72 @@ def index(): return render_template('index.html') -@socketio.on('my event', namespace='/test') +@sio.on('my event', namespace='/test') def test_message(sid, message): - socketio.emit('my response', {'data': message['data']}, - room=sid, namespace='/test') + sio.emit('my response', {'data': message['data']}, room=sid, + namespace='/test') -@socketio.on('my broadcast event', namespace='/test') +@sio.on('my broadcast event', namespace='/test') def test_broadcast_message(sid, message): - socketio.emit('my response', {'data': message['data']}, - namespace='/test') + sio.emit('my response', {'data': message['data']}, namespace='/test') -@socketio.on('join', namespace='/test') +@sio.on('join', namespace='/test') def join(sid, message): - socketio.enter_room(sid, message['room'], namespace='/test') - socketio.emit('my response', - {'data': 'Entered room: ' + message['room']}, - room=sid, namespace='/test') + sio.enter_room(sid, message['room'], namespace='/test') + sio.emit('my response', {'data': 'Entered room: ' + message['room']}, + room=sid, namespace='/test') -@socketio.on('leave', namespace='/test') +@sio.on('leave', namespace='/test') def leave(sid, message): - socketio.leave_room(sid, message['room'], namespace='/test') - socketio.emit('my response', - {'data': 'Left room: ' + message['room']}, - room=sid, namespace='/test') + sio.leave_room(sid, message['room'], namespace='/test') + sio.emit('my response', {'data': 'Left room: ' + message['room']}, + room=sid, namespace='/test') -@socketio.on('close room', namespace='/test') +@sio.on('close room', namespace='/test') def close(sid, message): - socketio.emit('my response', - {'data': 'Room ' + message['room'] + ' is closing.'}, - room=message['room'], namespace='/test') - socketio.close_room(message['room'], namespace='/test') + sio.emit('my response', + {'data': 'Room ' + message['room'] + ' is closing.'}, + room=message['room'], namespace='/test') + sio.close_room(message['room'], namespace='/test') -@socketio.on('my room event', namespace='/test') +@sio.on('my room event', namespace='/test') def send_room_message(sid, message): - socketio.emit('my response', {'data': message['data']}, - room=message['room'], namespace='/test') + sio.emit('my response', {'data': message['data']}, room=message['room'], + namespace='/test') -@socketio.on('disconnect request', namespace='/test') +@sio.on('disconnect request', namespace='/test') def disconnect_request(sid): - socketio.disconnect(sid, namespace='/test') + sio.disconnect(sid, namespace='/test') -@socketio.on('connect', namespace='/test') +@sio.on('connect', namespace='/test') def test_connect(sid, environ): - socketio.emit('my response', {'data': 'Connected', 'count': 0}, room=sid, - namespace='/test') + sio.emit('my response', {'data': 'Connected', 'count': 0}, room=sid, + namespace='/test') -@socketio.on('disconnect', namespace='/test') +@sio.on('disconnect', namespace='/test') def test_disconnect(sid): print('Client disconnected') if __name__ == '__main__': - app = SocketIOMiddleware(socketio, app) - wsgi.server(eventlet.listen(('', 5000)), app) + if async_mode == 'threading': + # deploy with Werkzeug + app.run(threaded=True) + elif async_mode == 'eventlet': + # deploy with eventlet + import eventlet + eventlet.wsgi.server(eventlet.listen(('', 5000)), app) + elif async_mode == 'gevent': + # deploy with gevent + from gevent import pywsgi + pywsgi.WSGIServer(('', 5000), app).serve_forever() + else: + print('Unknown async_mode: ' + async_mode) diff --git a/example/templates/index.html b/example/templates/index.html index 5c37340..402bf40 100755 --- a/example/templates/index.html +++ b/example/templates/index.html @@ -12,6 +12,9 @@ socket.on('connect', function() { socket.emit('my event', {data: 'I\'m connected!'}); }); + socket.on('disconnect', function() { + $('#log').append('
Disconnected'); + }); socket.on('my response', function(msg) { $('#log').append('
Received: ' + msg.data); }); diff --git a/setup.py b/setup.py index 0e16970..4c07924 100755 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ setup( install_requires=[ 'six>=1.9.0', 'eventlet>=0.17.4', - 'python-engineio>=0.2.0' + 'python-engineio>=0.5.0' ], tests_require=[ 'mock', diff --git a/socketio/base_manager.py b/socketio/base_manager.py index 8ba7925..fa83d5b 100755 --- a/socketio/base_manager.py +++ b/socketio/base_manager.py @@ -31,19 +31,18 @@ class BaseManager(object): self.enter_room(sid, namespace, None) self.enter_room(sid, namespace, sid) + def is_connected(self, sid, namespace): + return sid in self.rooms[namespace][None] and \ + self.rooms[namespace][None][sid] + def disconnect(self, sid, namespace): - """Register a client disconnect event.""" - if namespace == '/': - namespace_list = list(self.get_namespaces()) - else: - namespace_list = [namespace] - for n in namespace_list: - rooms = [] - for room_name, room in six.iteritems(self.rooms[n]): - if sid in room: - rooms.append(room_name) - for room in rooms: - self.leave_room(sid, n, room) + """Register a client disconnect from a namespace.""" + rooms = [] + for room_name, room in six.iteritems(self.rooms[namespace]): + if sid in room: + rooms.append(room_name) + for room in rooms: + self.leave_room(sid, namespace, room) def enter_room(self, sid, namespace, room): """Add a client to a room.""" diff --git a/socketio/server.py b/socketio/server.py index 7fe7c1f..1b57b5a 100755 --- a/socketio/server.py +++ b/socketio/server.py @@ -28,9 +28,15 @@ class Server(object): ``bytes`` values are treated as binary. This option has no effect on Python 3, where text and binary payloads are always automatically discovered. + :param kwargs: Connection parameters for the underlying Engine.IO server. - This ``engineio_options`` dictionary can include the following settings: + The Engine.IO configuration supports the following settings: + :param async_mode: The library used for asynchronous operations. Valid + options are "threading", "eventlet" and "gevent". If + this argument is not given, "eventlet" is tried first, + then "gevent", and finally "threading". The websocket + transport is only supported in "eventlet" mode. :param ping_timeout: The time in seconds that the client waits for the server to respond before disconnecting. :param ping_interval: The interval in seconds at which the client pings @@ -49,16 +55,19 @@ class Server(object): default. :param cors_credentials: Whether credentials (cookies, authentication) are allowed in requests to this server. - :param logger: To enable Engine.IO logging set to ``True`` or pass a logger - object to use. To disable logging set to ``False``. + :param engineio_logger: To enable Engine.IO logging set to ``True`` or pass + a logger object to use. To disable logging set to + ``False``. """ - def __init__(self, engineio_options=None, client_manager_class=None, - logger=False, binary=False): + def __init__(self, client_manager_class=None, logger=False, binary=False, + **kwargs): if client_manager_class is None: client_manager_class = base_manager.BaseManager self.manager = client_manager_class(self) - if engineio_options is None: - engineio_options = {} + engineio_options = kwargs + engineio_logger = engineio_options.pop('engineio_logger', None) + if engineio_logger is not None: + engineio_options['logger'] = engineio_logger self.eio = engineio.Server(**engineio_options) self.eio.on('connect', self._handle_eio_connect) self.eio.on('message', self._handle_eio_message) @@ -253,14 +262,11 @@ class Server(object): :param namespace: The Socket.IO namespace to disconnect. If this argument is omitted the default namespace is used. """ - self.logger.info('Disconnecting %s]', sid) - if namespace is None or namespace == '/': - for namespace in self.manager.get_namespaces(): - self._send_packet(sid, packet.Packet(packet.DISCONNECT, - namespace=namespace)) - else: - self._send_packet(sid, packet.Packet(packet.DISCONNECT, - namespace=namespace)) + namespace = namespace or '/' + self.logger.info('Disconnecting %s [%s]', sid, namespace) + self._send_packet(sid, packet.Packet(packet.DISCONNECT, + namespace=namespace)) + self._trigger_event('disconnect', namespace, sid) self.manager.disconnect(sid, namespace=namespace) def handle_request(self, environ, start_response): @@ -325,12 +331,12 @@ class Server(object): else: namespace_list = [namespace] for n in namespace_list: - if n != '/': + if n != '/' and self.manager.is_connected(sid, n): self._trigger_event('disconnect', n, sid) self.manager.disconnect(sid, n) if sid in self.callbacks and n in self.callbacks[sid]: del self.callbacks[sid][n] - if namespace == '/': + if namespace == '/' and self.manager.is_connected(sid, namespace): self._trigger_event('disconnect', '/', sid) self.manager.disconnect(sid, '/') if sid in self.callbacks: diff --git a/tests/test_base_manager.py b/tests/test_base_manager.py index 2ef5161..ffb1f43 100755 --- a/tests/test_base_manager.py +++ b/tests/test_base_manager.py @@ -40,7 +40,13 @@ class TestBaseManager(unittest.TestCase): self.bm.connect('123', '/foo') self.bm.connect('456', '/') self.bm.connect('456', '/foo') + self.assertTrue(self.bm.is_connected('123', '/')) + self.assertTrue(self.bm.is_connected('123', '/foo')) self.bm.disconnect('123', '/') + self.assertFalse(self.bm.is_connected('123', '/')) + self.assertTrue(self.bm.is_connected('123', '/foo')) + self.bm.disconnect('123', '/foo') + self.assertFalse(self.bm.is_connected('123', '/foo')) self.bm._clean_rooms() self.assertEqual(self.bm.rooms['/'], {None: {'456': True}, '456': {'456': True}}) @@ -53,7 +59,9 @@ class TestBaseManager(unittest.TestCase): self.bm.connect('456', '/') self.bm.connect('456', '/foo') self.bm.disconnect('123', '/') + self.bm.disconnect('123', '/foo') self.bm.disconnect('123', '/') + self.bm.disconnect('123', '/foo') self.bm._clean_rooms() self.assertEqual(self.bm.rooms['/'], {None: {'456': True}, '456': {'456': True}}) diff --git a/tests/test_server.py b/tests/test_server.py index 460fabe..34ac61a 100755 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -14,7 +14,7 @@ from socketio import server class TestServer(unittest.TestCase): def test_create(self, eio): mgr = mock.MagicMock() - s = server.Server({'foo': 'bar'}, mgr, binary=True) + s = server.Server(mgr, binary=True, foo='bar') mgr.assert_called_once_with(s) eio.assert_called_once_with(**{'foo': 'bar'}) self.assertEqual(s.eio.on.call_count, 3) @@ -345,12 +345,10 @@ class TestServer(unittest.TestCase): self.assertRaises(ValueError, s._handle_eio_message, '123', '32["foo",2]') - def test_disconnect_all(self, eio): + def test_disconnect(self, eio): s = server.Server() s._handle_eio_connect('123', 'environ') - s._handle_eio_message('123', '0/foo') s.disconnect('123') - s.eio.send.assert_any_call('123', '1/foo', binary=False) s.eio.send.assert_any_call('123', '1', binary=False) def test_disconnect_namespace(self, eio): @@ -367,3 +365,7 @@ class TestServer(unittest.TestCase): self.assertEqual(s.logger.getEffectiveLevel(), logging.INFO) s = server.Server(logger='foo') self.assertEqual(s.logger, 'foo') + + def test_engineio_logger(self, eio): + server.Server(engineio_logger='foo') + eio.assert_called_once_with(**{'logger': 'foo'})