Browse Source

Added support for gevent and standard threads besides eventlet

pull/5/head
Miguel Grinberg 10 years ago
parent
commit
8e570789aa
  1. 169
      docs/index.rst
  2. 82
      example/app.py
  3. 3
      example/templates/index.html
  4. 2
      setup.py
  5. 15
      socketio/base_manager.py
  6. 36
      socketio/server.py
  7. 8
      tests/test_base_manager.py
  8. 10
      tests/test_server.py

169
docs/index.rst

@ -13,14 +13,18 @@ integrated with a Python WSGI application. The following are some of its
features:
- Fully compatible with the Javascript
`socket.io-client <https://github.com/Automattic/socket.io-client>`_ library.
`socket.io-client <https://github.com/Automattic/socket.io-client>`_ library,
versions 1.3.5 and up.
- Compatible with Python 2.7 and Python 3.3+.
- Based on `Eventlet <http://eventlet.net/>`_, enabling large number of
clients even on modest hardware.
- Supports large number of clients even on modest hardware when used with an
asynchronous server based on `eventlet <http://eventlet.net/>`_ or
`gevent <http://gevent.org/>`_. For development and testing, any WSGI
complaint multi-threaded server can be used.
- Includes a WSGI middleware that integrates Socket.IO traffic with standard
WSGI applications.
- Broadcasting of messages to all or a subset of the connected clients.
- Event-based architecture implemented with decorators that hides the
- Broadcasting of messages to all connected clients, or to subsets of them
assigned to "rooms".
- Uses an event-based architecture implemented with decorators that hides the
details of the protocol.
- Support for HTTP long-polling and WebSocket transports.
- Support for XHR2 and XHR browsers.
@ -95,9 +99,9 @@ Rooms
-----
Because Socket.IO is a bidirectional protocol, the server can send messages to
any connected client at any time. To make it easy to address specific subsets
of clients, the application can put clients into rooms, and then address
messages to all the clients in a room.
any connected client at any time. To make it easy to address groups of clients,
the application can put clients into rooms, and then address messages to all
the clients in a room.
When clients first connect, they are assigned to their own rooms, named with
the session ID (the ``sid`` argument passed to all event handlers). The
@ -185,13 +189,152 @@ as a pathname following the hostname and port. For example, connecting to
*http://example.com:8000/chat* would open a connection to the namespace
*/chat*.
Event handlers and rooms are maintained separately for each namespace, so it is
important that applications that use multiple namespaces specify the correct
namespace when setting up event handlers and rooms, using the ``namespace``
argument available in All the methods in the :class:`socketio.Server` class.
Each namespace is handled independently from the others, with separate event
handlers and rooms. It is important that applications that use multiple
namespaces specify the correct namespace when setting up their event handlers
and rooms, using the optional ``namespace`` argument available in all the
methods in the :class:`socketio.Server` class.
When the ``namespace`` argument is omitted, set to ``None`` or to ``'/'``, the
default namespace is used.
default namespace, representing the physical connection, is used.
Deployment
----------
The following sections describe a variety of deployment strategies for
Socket.IO servers.
Eventlet
~~~~~~~~
`Eventlet <http://eventlet.net/>`_ is a high performance concurrent networking
library for Python 2 and 3 that uses coroutines, enabling code to be written in
the same style used with the blocking standard library functions. An Socket.IO
server deployed with eventlet has access to the long-polling and WebSocket
transports.
Instances of class ``socketio.Server`` will automatically use eventlet for
asynchronous operations if the library is installed. To request its use
explicitly, the ``async_mode`` option can be given in the constructor::
sio = socketio.Server(async_mode='eventlet')
A server configured for eventlet is deployed as a regular WSGI application,
using the provided ``socketio.Middleware``::
app = socketio.Middleware(sio)
import eventlet
eventlet.wsgi.server(eventlet.listen(('', 8000)), app)
An alternative to running the eventlet WSGI server as above is to use
`gunicorn <gunicorn.org>`_, a fully featured pure Python web server. The
command to launch the application under gunicorn is shown below::
$ gunicorn -k eventlet -w 1 module:app
Due to limitations in its load balancing algorithm, gunicorn can only be used
with one worker process, so the ``-w 1`` option is required. Note that a
single eventlet worker can handle a large number of concurrent clients.
Another limitation when using gunicorn is that the WebSocket transport is not
available, because this transport it requires extensions to the WSGI standard.
Note: Eventlet provides a ``monkey_patch()`` function that replaces all the
blocking functions in the standard library with equivalent asynchronous
versions. While python-socketio does not require monkey patching, other
libraries such as database drivers are likely to require it.
Gevent
~~~~~~
`Gevent <http://gevent.org>`_ is another asynchronous framework based on
coroutines, very similar to eventlet. Only the long-polling transport is
currently available when using gevent.
Instances of class ``socketio.Server`` will automatically use gevent for
asynchronous operations if the library is installed and eventlet is not
installed. To request gevent to be selected explicitly, the ``async_mode``
option can be given in the constructor::
eio = socketio.Server(async_mode='gevent')
A server configured for gevent is deployed as a regular WSGI application,
using the provided ``socketio.Middleware``::
app = socketio.Middleware(eio)
from gevent import pywsgi
pywsgi.WSGIServer(('', 8000), app).serve_forever()
An alternative to running the eventlet WSGI server as above is to use
`gunicorn <gunicorn.org>`_, a fully featured pure Python web server. The
command to launch the application under gunicorn is shown below::
$ gunicorn -k gevent -w 1 module:app
Same as with eventlet, due to limitations in its load balancing algorithm,
gunicorn can only be used with one worker process, so the ``-w 1`` option is
required. Note that a single eventlet worker can handle a large number of
concurrent clients.
Note: Gevent provides a ``monkey_patch()`` function that replaces all the
blocking functions in the standard library with equivalent asynchronous
versions. While python-socketio does not require monkey patching, other
libraries such as database drivers are likely to require it.
Standard Threading Library
~~~~~~~~~~~~~~~~~~~~~~~~~~
While not comparable to eventlet and gevent in terms of performance,
the Socket.IO server can also be configured to work with multi-threaded web
servers that use standard Python threads. This is an ideal setup to use with
development servers such as `Werkzeug <http://werkzeug.pocoo.org>`_. Only the
long-polling transport is currently available when using standard threads.
Instances of class ``socketio.Server`` will automatically use the threading
mode if neither eventlet nor gevent are not installed. To request the
threading mode explicitly, the ``async_mode`` option can be given in the
constructor::
sio = socketio.Server(async_mode='threading')
A server configured for threading is deployed as a regular web application,
using any WSGI complaint multi-threaded server. The example below deploys an
Socket.IO application combined with a Flask web application, using Flask's
development web server based on Werkzeug::
sio = socketio.Server(async_mode='threading')
app = Flask(__name__)
app.wsgi_app = socketio.Middleware(sio, app.wsgi_app)
# ... Socket.IO and Flask handler functions ...
if __name__ == '__main__':
app.run(threaded=True)
When using the threading mode, it is important to ensure that the WSGI server
can handle multiple concurrent requests using threads, since a client can have
up to two outstanding requests at any given time. The Werkzeug server is
single-threaded by default, so the ``threaded=True`` option is required.
Multi-process deployments
~~~~~~~~~~~~~~~~~~~~~~~~~
Socket.IO is a stateful protocol, which makes horizontal scaling more
difficult. To deploy a cluster of Socket.IO processes (hosted on one or
multiple servers), the following conditions must be met:
- Each Socket.IO process must be able to handle multiple requests, either by
using eventlet, gevent, or standard threads. Worker processes that only
handle one request at a time are not supported.
- The load balancer must be configured to always forward requests from a
client to the same process. Load balancers call this *sticky sessions*, or
*session affinity*.
A limitation in the current release of the Socket.IO server is that because
the clients are randomly assigned to different server processes, any form of
broadcasting is not supported. A storage backend that enables multiple
processes to share information about clients is currently in development to
address this important limitation.
API Reference
-------------

82
example/app.py

@ -1,17 +1,21 @@
# set this to 'threading', 'eventlet', or 'gevent'
async_mode = 'threading'
if async_mode == 'eventlet':
import eventlet
eventlet.monkey_patch()
elif async_mode == 'gevent':
from gevent import monkey
monkey.patch_all()
import time
from threading import Thread
import eventlet
from eventlet import wsgi
from flask import Flask, render_template
from socketio import Server as SocketIOServer
from socketio import Middleware as SocketIOMiddleware
import socketio
socketio = SocketIOServer(logger=True)
sio = socketio.Server(logger=True, async_mode=async_mode)
app = Flask(__name__)
app.debug = True
app.wsgi_app = socketio.Middleware(sio, app.wsgi_app)
app.config['SECRET_KEY'] = 'secret!'
thread = None
@ -22,8 +26,8 @@ def background_thread():
while True:
time.sleep(10)
count += 1
socketio.emit('my response',
{'data': 'Server generated event'}, namespace='/test')
sio.emit('my response', {'data': 'Server generated event'},
namespace='/test')
@app.route('/')
@ -35,64 +39,72 @@ def index():
return render_template('index.html')
@socketio.on('my event', namespace='/test')
@sio.on('my event', namespace='/test')
def test_message(sid, message):
socketio.emit('my response', {'data': message['data']},
room=sid, namespace='/test')
sio.emit('my response', {'data': message['data']}, room=sid,
namespace='/test')
@socketio.on('my broadcast event', namespace='/test')
@sio.on('my broadcast event', namespace='/test')
def test_broadcast_message(sid, message):
socketio.emit('my response', {'data': message['data']},
namespace='/test')
sio.emit('my response', {'data': message['data']}, namespace='/test')
@socketio.on('join', namespace='/test')
@sio.on('join', namespace='/test')
def join(sid, message):
socketio.enter_room(sid, message['room'], namespace='/test')
socketio.emit('my response',
{'data': 'Entered room: ' + message['room']},
sio.enter_room(sid, message['room'], namespace='/test')
sio.emit('my response', {'data': 'Entered room: ' + message['room']},
room=sid, namespace='/test')
@socketio.on('leave', namespace='/test')
@sio.on('leave', namespace='/test')
def leave(sid, message):
socketio.leave_room(sid, message['room'], namespace='/test')
socketio.emit('my response',
{'data': 'Left room: ' + message['room']},
sio.leave_room(sid, message['room'], namespace='/test')
sio.emit('my response', {'data': 'Left room: ' + message['room']},
room=sid, namespace='/test')
@socketio.on('close room', namespace='/test')
@sio.on('close room', namespace='/test')
def close(sid, message):
socketio.emit('my response',
sio.emit('my response',
{'data': 'Room ' + message['room'] + ' is closing.'},
room=message['room'], namespace='/test')
socketio.close_room(message['room'], namespace='/test')
sio.close_room(message['room'], namespace='/test')
@socketio.on('my room event', namespace='/test')
@sio.on('my room event', namespace='/test')
def send_room_message(sid, message):
socketio.emit('my response', {'data': message['data']},
room=message['room'], namespace='/test')
sio.emit('my response', {'data': message['data']}, room=message['room'],
namespace='/test')
@socketio.on('disconnect request', namespace='/test')
@sio.on('disconnect request', namespace='/test')
def disconnect_request(sid):
socketio.disconnect(sid, namespace='/test')
sio.disconnect(sid, namespace='/test')
@socketio.on('connect', namespace='/test')
@sio.on('connect', namespace='/test')
def test_connect(sid, environ):
socketio.emit('my response', {'data': 'Connected', 'count': 0}, room=sid,
sio.emit('my response', {'data': 'Connected', 'count': 0}, room=sid,
namespace='/test')
@socketio.on('disconnect', namespace='/test')
@sio.on('disconnect', namespace='/test')
def test_disconnect(sid):
print('Client disconnected')
if __name__ == '__main__':
app = SocketIOMiddleware(socketio, app)
wsgi.server(eventlet.listen(('', 5000)), app)
if async_mode == 'threading':
# deploy with Werkzeug
app.run(threaded=True)
elif async_mode == 'eventlet':
# deploy with eventlet
import eventlet
eventlet.wsgi.server(eventlet.listen(('', 5000)), app)
elif async_mode == 'gevent':
# deploy with gevent
from gevent import pywsgi
pywsgi.WSGIServer(('', 5000), app).serve_forever()
else:
print('Unknown async_mode: ' + async_mode)

3
example/templates/index.html

@ -12,6 +12,9 @@
socket.on('connect', function() {
socket.emit('my event', {data: 'I\'m connected!'});
});
socket.on('disconnect', function() {
$('#log').append('<br>Disconnected');
});
socket.on('my response', function(msg) {
$('#log').append('<br>Received: ' + msg.data);
});

2
setup.py

@ -26,7 +26,7 @@ setup(
install_requires=[
'six>=1.9.0',
'eventlet>=0.17.4',
'python-engineio>=0.2.0'
'python-engineio>=0.5.0'
],
tests_require=[
'mock',

15
socketio/base_manager.py

@ -31,19 +31,18 @@ class BaseManager(object):
self.enter_room(sid, namespace, None)
self.enter_room(sid, namespace, sid)
def is_connected(self, sid, namespace):
return sid in self.rooms[namespace][None] and \
self.rooms[namespace][None][sid]
def disconnect(self, sid, namespace):
"""Register a client disconnect event."""
if namespace == '/':
namespace_list = list(self.get_namespaces())
else:
namespace_list = [namespace]
for n in namespace_list:
"""Register a client disconnect from a namespace."""
rooms = []
for room_name, room in six.iteritems(self.rooms[n]):
for room_name, room in six.iteritems(self.rooms[namespace]):
if sid in room:
rooms.append(room_name)
for room in rooms:
self.leave_room(sid, n, room)
self.leave_room(sid, namespace, room)
def enter_room(self, sid, namespace, room):
"""Add a client to a room."""

36
socketio/server.py

@ -28,9 +28,15 @@ class Server(object):
``bytes`` values are treated as binary. This option has no
effect on Python 3, where text and binary payloads are
always automatically discovered.
:param kwargs: Connection parameters for the underlying Engine.IO server.
This ``engineio_options`` dictionary can include the following settings:
The Engine.IO configuration supports the following settings:
:param async_mode: The library used for asynchronous operations. Valid
options are "threading", "eventlet" and "gevent". If
this argument is not given, "eventlet" is tried first,
then "gevent", and finally "threading". The websocket
transport is only supported in "eventlet" mode.
:param ping_timeout: The time in seconds that the client waits for the
server to respond before disconnecting.
:param ping_interval: The interval in seconds at which the client pings
@ -49,16 +55,19 @@ class Server(object):
default.
:param cors_credentials: Whether credentials (cookies, authentication) are
allowed in requests to this server.
:param logger: To enable Engine.IO logging set to ``True`` or pass a logger
object to use. To disable logging set to ``False``.
:param engineio_logger: To enable Engine.IO logging set to ``True`` or pass
a logger object to use. To disable logging set to
``False``.
"""
def __init__(self, engineio_options=None, client_manager_class=None,
logger=False, binary=False):
def __init__(self, client_manager_class=None, logger=False, binary=False,
**kwargs):
if client_manager_class is None:
client_manager_class = base_manager.BaseManager
self.manager = client_manager_class(self)
if engineio_options is None:
engineio_options = {}
engineio_options = kwargs
engineio_logger = engineio_options.pop('engineio_logger', None)
if engineio_logger is not None:
engineio_options['logger'] = engineio_logger
self.eio = engineio.Server(**engineio_options)
self.eio.on('connect', self._handle_eio_connect)
self.eio.on('message', self._handle_eio_message)
@ -253,14 +262,11 @@ class Server(object):
:param namespace: The Socket.IO namespace to disconnect. If this
argument is omitted the default namespace is used.
"""
self.logger.info('Disconnecting %s]', sid)
if namespace is None or namespace == '/':
for namespace in self.manager.get_namespaces():
self._send_packet(sid, packet.Packet(packet.DISCONNECT,
namespace=namespace))
else:
namespace = namespace or '/'
self.logger.info('Disconnecting %s [%s]', sid, namespace)
self._send_packet(sid, packet.Packet(packet.DISCONNECT,
namespace=namespace))
self._trigger_event('disconnect', namespace, sid)
self.manager.disconnect(sid, namespace=namespace)
def handle_request(self, environ, start_response):
@ -325,12 +331,12 @@ class Server(object):
else:
namespace_list = [namespace]
for n in namespace_list:
if n != '/':
if n != '/' and self.manager.is_connected(sid, n):
self._trigger_event('disconnect', n, sid)
self.manager.disconnect(sid, n)
if sid in self.callbacks and n in self.callbacks[sid]:
del self.callbacks[sid][n]
if namespace == '/':
if namespace == '/' and self.manager.is_connected(sid, namespace):
self._trigger_event('disconnect', '/', sid)
self.manager.disconnect(sid, '/')
if sid in self.callbacks:

8
tests/test_base_manager.py

@ -40,7 +40,13 @@ class TestBaseManager(unittest.TestCase):
self.bm.connect('123', '/foo')
self.bm.connect('456', '/')
self.bm.connect('456', '/foo')
self.assertTrue(self.bm.is_connected('123', '/'))
self.assertTrue(self.bm.is_connected('123', '/foo'))
self.bm.disconnect('123', '/')
self.assertFalse(self.bm.is_connected('123', '/'))
self.assertTrue(self.bm.is_connected('123', '/foo'))
self.bm.disconnect('123', '/foo')
self.assertFalse(self.bm.is_connected('123', '/foo'))
self.bm._clean_rooms()
self.assertEqual(self.bm.rooms['/'], {None: {'456': True},
'456': {'456': True}})
@ -53,7 +59,9 @@ class TestBaseManager(unittest.TestCase):
self.bm.connect('456', '/')
self.bm.connect('456', '/foo')
self.bm.disconnect('123', '/')
self.bm.disconnect('123', '/foo')
self.bm.disconnect('123', '/')
self.bm.disconnect('123', '/foo')
self.bm._clean_rooms()
self.assertEqual(self.bm.rooms['/'], {None: {'456': True},
'456': {'456': True}})

10
tests/test_server.py

@ -14,7 +14,7 @@ from socketio import server
class TestServer(unittest.TestCase):
def test_create(self, eio):
mgr = mock.MagicMock()
s = server.Server({'foo': 'bar'}, mgr, binary=True)
s = server.Server(mgr, binary=True, foo='bar')
mgr.assert_called_once_with(s)
eio.assert_called_once_with(**{'foo': 'bar'})
self.assertEqual(s.eio.on.call_count, 3)
@ -345,12 +345,10 @@ class TestServer(unittest.TestCase):
self.assertRaises(ValueError, s._handle_eio_message, '123',
'32["foo",2]')
def test_disconnect_all(self, eio):
def test_disconnect(self, eio):
s = server.Server()
s._handle_eio_connect('123', 'environ')
s._handle_eio_message('123', '0/foo')
s.disconnect('123')
s.eio.send.assert_any_call('123', '1/foo', binary=False)
s.eio.send.assert_any_call('123', '1', binary=False)
def test_disconnect_namespace(self, eio):
@ -367,3 +365,7 @@ class TestServer(unittest.TestCase):
self.assertEqual(s.logger.getEffectiveLevel(), logging.INFO)
s = server.Server(logger='foo')
self.assertEqual(s.logger, 'foo')
def test_engineio_logger(self, eio):
server.Server(engineio_logger='foo')
eio.assert_called_once_with(**{'logger': 'foo'})

Loading…
Cancel
Save