Browse Source

asyncio documentation and various fixes

pull/71/head
Miguel Grinberg 8 years ago
parent
commit
43788db7a7
  1. 155
      docs/index.rst
  2. 4
      socketio/__init__.py
  3. 12
      socketio/asyncio_manager.py
  4. 30
      socketio/asyncio_server.py
  5. 4
      tests/test_asyncio_manager.py
  6. 4
      tests/test_asyncio_server.py

155
docs/index.rst

@ -21,9 +21,10 @@ features:
Socket.IO specification. Socket.IO specification.
- Compatible with Python 2.7 and Python 3.3+. - Compatible with Python 2.7 and Python 3.3+.
- Supports large number of clients even on modest hardware when used with an - Supports large number of clients even on modest hardware when used with an
asynchronous server based on `eventlet <http://eventlet.net/>`_ or asynchronous server based on `asyncio <https://docs.python.org/3/library/asyncio.html>`_,
`gevent <http://gevent.org/>`_. For development and testing, any WSGI `eventlet <http://eventlet.net/>`_ or `gevent <http://gevent.org/>`_. For
complaint multi-threaded server can be used. development and testing, any WSGI complaint multi-threaded server can also be
used.
- Includes a WSGI middleware that integrates Socket.IO traffic with standard - Includes a WSGI middleware that integrates Socket.IO traffic with standard
WSGI applications. WSGI applications.
- Broadcasting of messages to all connected clients, or to subsets of them - Broadcasting of messages to all connected clients, or to subsets of them
@ -55,8 +56,45 @@ The Socket.IO server can be installed with pip::
pip install python-socketio pip install python-socketio
The following is a basic example of a Socket.IO server that uses Flask to The following is a basic example of a Socket.IO server that uses the
deploy the client code to the browser:: `aiohttp <http://aiohttp.readthedocs.io/>`_ framework for asyncio (Python 3.5+
only):
.. code:: python
from aiohttp import web
import socketio
sio = socketio.AsyncServer()
app = web.Application()
sio.attach(app)
async def index(request):
"""Serve the client-side application."""
with open('index.html') as f:
return web.Response(text=f.read(), content_type='text/html')
@sio.on('connect', namespace='/chat')
def connect(sid, environ):
print("connect ", sid)
@sio.on('chat message', namespace='/chat')
async def message(sid, data):
print("message ", data)
await sio.emit('reply', room=sid)
@sio.on('disconnect', namespace='/chat')
def disconnect(sid):
print('disconnect ', sid)
app.router.add_static('/static', 'static')
app.router.add_get('/', index)
if __name__ == '__main__':
web.run_app(app)
And below is a similar example, but using Flask and Eventlet. This example is
compatible with Python 2.7 and 3.3+::
import socketio import socketio
import eventlet import eventlet
@ -107,6 +145,41 @@ them with event handlers. An event is defined simply by a name.
When a connection with a client is broken, the ``disconnect`` event is called, When a connection with a client is broken, the ``disconnect`` event is called,
allowing the application to perform cleanup. allowing the application to perform cleanup.
Server
------
Socket.IO servers are instances of class :class:`socketio.Server`, which can be
combined with a WSGI compliant application using :class:`socketio.Middleware`::
# create a Socket.IO server
sio = socketio.Server()
# wrap WSGI application with socketio's middleware
app = socketio.Middleware(sio, app)
For asyncio based servers, the :class:`socketio.AsyncServer` class provides a
coroutine friendly server::
# create a Socket.IO server
sio = socketio.AsyncServer()
# attach server to application
sio.attach(app)
Event handlers for servers are register using the :func:`socketio.Server.on`
method::
@sio.on('my custom event')
def my_custom_event():
pass
For asyncio servers, event handlers can be regular functions or coroutines::
@sio.on('my custom event')
async def my_custom_event():
await sio.emit('my reply')
Rooms Rooms
----- -----
@ -232,6 +305,22 @@ that belong to a namespace can be created as methods of a subclass of
sio.register_namespace(MyCustomNamespace('/test')) sio.register_namespace(MyCustomNamespace('/test'))
For asyncio based severs, namespaces must inherit from
:class:`socketio.AsyncNamespace`, and can define event handlers as regular
methods or coroutines::
class MyCustomNamespace(socketio.AsyncNamespace):
def on_connect(sid, environ):
pass
def on_disconnect(sid):
pass
async def on_my_event(sid, data):
await self.emit('my_response', data)
sio.register_namespace(MyCustomNamespace('/test'))
When class-based namespaces are used, any events received by the server are When class-based namespaces are used, any events received by the server are
dispatched to a method named as the event name with the ``on_`` prefix. For dispatched to a method named as the event name with the ``on_`` prefix. For
example, event ``my_event`` will be handled by a method named ``on_my_event``. example, event ``my_event`` will be handled by a method named ``on_my_event``.
@ -241,8 +330,8 @@ class-based namespaces must used characters that are legal in method names.
As a convenience to methods defined in a class-based namespace, the namespace As a convenience to methods defined in a class-based namespace, the namespace
instance includes versions of several of the methods in the instance includes versions of several of the methods in the
:class:`socketio.Server` class that default to the proper namespace when the :class:`socketio.Server` and :class:`socketio.AsyncServer` classes that default
``namespace`` argument is not given. to the proper namespace when the ``namespace`` argument is not given.
In the case that an event has a handler in a class-based namespace, and also a In the case that an event has a handler in a class-based namespace, and also a
decorator-based function handler, only the standalone function handler is decorator-based function handler, only the standalone function handler is
@ -344,6 +433,33 @@ Deployment
The following sections describe a variety of deployment strategies for The following sections describe a variety of deployment strategies for
Socket.IO servers. Socket.IO servers.
Aiohttp
~~~~~~~
`Aiohttp <http://aiohttp.readthedocs.io/>`_ is a framework with support for HTTP
and WebSocket, based on asyncio. Support for this framework is limited to Python
3.5 and newer.
Instances of class ``engineio.AsyncServer`` will automatically use aiohttp
for asynchronous operations if the library is installed. To request its use
explicitly, the ``async_mode`` option can be given in the constructor::
sio = socketio.AsyncServer(async_mode='aiohttp')
A server configured for aiohttp must be attached to an existing application::
app = web.Application()
sio.attach(app)
The aiohttp application can define regular routes that will coexist with the
Socket.IO server. A typical pattern is to add routes that serve a client
application and any associated static files.
The aiohttp application is then executed in the usual manner::
if __name__ == '__main__':
web.run_app(app)
Eventlet Eventlet
~~~~~~~~ ~~~~~~~~
@ -385,7 +501,7 @@ database drivers are likely to require it.
Gevent Gevent
~~~~~~ ~~~~~~
`Gevent <http://gevent.org>`_ is another asynchronous framework based on `Gevent <http://gevent.org/>`_ is another asynchronous framework based on
coroutines, very similar to eventlet. An Socket.IO server deployed with coroutines, very similar to eventlet. An Socket.IO server deployed with
gevent has access to the long-polling transport. If project gevent has access to the long-polling transport. If project
`gevent-websocket <https://bitbucket.org/Jeffrey/gevent-websocket/>`_ is `gevent-websocket <https://bitbucket.org/Jeffrey/gevent-websocket/>`_ is
@ -503,8 +619,8 @@ difficult. To deploy a cluster of Socket.IO processes (hosted on one or
multiple servers), the following conditions must be met: multiple servers), the following conditions must be met:
- Each Socket.IO process must be able to handle multiple requests, either by - Each Socket.IO process must be able to handle multiple requests, either by
using eventlet, gevent, or standard threads. Worker processes that only using asyncio, eventlet, gevent, or standard threads. Worker processes that
handle one request at a time are not supported. only handle one request at a time are not supported.
- The load balancer must be configured to always forward requests from a - The load balancer must be configured to always forward requests from a
client to the same worker process. Load balancers call this *sticky client to the same worker process. Load balancers call this *sticky
sessions*, or *session affinity*. sessions*, or *session affinity*.
@ -516,17 +632,36 @@ API Reference
------------- -------------
.. module:: socketio .. module:: socketio
.. autoclass:: Middleware .. autoclass:: Middleware
:members: :members:
.. autoclass:: Server .. autoclass:: Server
:members: :members:
.. autoclass:: AsyncServer
:members:
:inherited-members:
.. autoclass:: Namespace .. autoclass:: Namespace
:members: :members:
.. autoclass:: AsyncNamespace
:members:
:inherited-members:
.. autoclass:: BaseManager .. autoclass:: BaseManager
:members: :members:
.. autoclass:: PubSubManager .. autoclass:: PubSubManager
:members: :members:
.. autoclass:: KombuManager .. autoclass:: KombuManager
:members: :members:
.. autoclass:: RedisManager .. autoclass:: RedisManager
:members: :members:
.. autoclass:: AsyncManager
:members:
:inherited-members:

4
socketio/__init__.py

@ -10,9 +10,12 @@ from .server import Server
from .namespace import Namespace from .namespace import Namespace
if sys.version_info >= (3, 5): # pragma: no cover if sys.version_info >= (3, 5): # pragma: no cover
from .asyncio_server import AsyncServer from .asyncio_server import AsyncServer
from .asyncio_manager import AsyncManager
from .asyncio_namespace import AsyncNamespace from .asyncio_namespace import AsyncNamespace
else: # pragma: no cover else: # pragma: no cover
AsyncServer = None AsyncServer = None
AsyncManager = None
AsyncNamespace = None
__version__ = '1.6.3' __version__ = '1.6.3'
@ -22,3 +25,4 @@ __all__ = ['__version__', 'Middleware', 'Server', 'BaseManager',
if AsyncServer is not None: # pragma: no cover if AsyncServer is not None: # pragma: no cover
__all__.append('AsyncServer') __all__.append('AsyncServer')
__all__.append('AsyncNamespace') __all__.append('AsyncNamespace')
__all__.append('AsyncManager')

12
socketio/asyncio_manager.py

@ -3,12 +3,15 @@ import asyncio
from .base_manager import BaseManager from .base_manager import BaseManager
class AsyncioManager(BaseManager): class AsyncManager(BaseManager):
"""Manage a client list for an asyncio server.""" """Manage a client list for an asyncio server."""
async def emit(self, event, data, namespace, room=None, skip_sid=None, async def emit(self, event, data, namespace, room=None, skip_sid=None,
callback=None, **kwargs): callback=None, **kwargs):
"""Emit a message to a single client, a room, or all the clients """Emit a message to a single client, a room, or all the clients
connected to the namespace.""" connected to the namespace.
Note: this method is a coroutine.
"""
if namespace not in self.rooms or room not in self.rooms[namespace]: if namespace not in self.rooms or room not in self.rooms[namespace]:
return return
tasks = [] tasks = []
@ -23,7 +26,10 @@ class AsyncioManager(BaseManager):
await asyncio.wait(tasks) await asyncio.wait(tasks)
async def trigger_callback(self, sid, namespace, id, data): async def trigger_callback(self, sid, namespace, id, data):
"""Invoke an application callback.""" """Invoke an application callback.
Note: this method is a coroutine.
"""
callback = None callback = None
try: try:
callback = self.callbacks[sid][namespace][id] callback = self.callbacks[sid][namespace][id]

30
socketio/asyncio_server.py

@ -30,12 +30,9 @@ class AsyncServer(server.Server):
:param async_mode: The asynchronous model to use. See the Deployment :param async_mode: The asynchronous model to use. See the Deployment
section in the documentation for a description of the section in the documentation for a description of the
available options. Valid async modes are "threading", available options. Valid async modes are "aiohttp". If
"eventlet", "gevent" and "gevent_uwsgi". If this this argument is not given, an async mode is chosen
argument is not given, "eventlet" is tried first, then based on the installed packages.
"gevent_uwsgi", then "gevent", and finally "threading".
The first async mode that has all its dependencies
installed is then one that is chosen.
:param ping_timeout: The time in seconds that the client waits for the :param ping_timeout: The time in seconds that the client waits for the
server to respond before disconnecting. server to respond before disconnecting.
:param ping_interval: The interval in seconds at which the client pings :param ping_interval: The interval in seconds at which the client pings
@ -58,10 +55,9 @@ class AsyncServer(server.Server):
a logger object to use. To disable logging set to a logger object to use. To disable logging set to
``False``. ``False``.
""" """
def __init__(self, client_manager=None, logger=False, binary=False, def __init__(self, client_manager=None, logger=False, json=None, **kwargs):
json=None, async_handlers=False, **kwargs):
if client_manager is None: if client_manager is None:
client_manager = asyncio_manager.AsyncioManager() client_manager = asyncio_manager.AsyncManager()
super().__init__(client_manager=client_manager, logger=logger, super().__init__(client_manager=client_manager, logger=logger,
binary=False, json=json, **kwargs) binary=False, json=json, **kwargs)
@ -171,23 +167,15 @@ class AsyncServer(server.Server):
await self._trigger_event('disconnect', namespace, sid) await self._trigger_event('disconnect', namespace, sid)
self.manager.disconnect(sid, namespace=namespace) self.manager.disconnect(sid, namespace=namespace)
async def handle_request(self, environ): async def handle_request(self, *args, **kwargs):
"""Handle an HTTP request from the client. """Handle an HTTP request from the client.
This is the entry point of the Socket.IO application, using the same This is the entry point of the Socket.IO application. This function
interface as a WSGI application. For the typical usage, this function returns the HTTP response body to deliver to the client.
is invoked by the :class:`Middleware` instance, but it can be invoked
directly when the middleware is not used.
:param environ: The WSGI environment.
:param start_response: The WSGI ``start_response`` function.
This function returns the HTTP response body to deliver to the client
as a byte sequence.
Note: this method is a coroutine. Note: this method is a coroutine.
""" """
return await self.eio.handle_request(environ) return await self.eio.handle_request(*args, **kwargs)
def start_background_task(self, target, *args, **kwargs): def start_background_task(self, target, *args, **kwargs):
"""Start a background task using the appropriate async model. """Start a background task using the appropriate async model.

4
tests/test_asyncio_manager.py

@ -35,11 +35,11 @@ def _run(coro):
@unittest.skipIf(sys.version_info < (3, 5), 'only for Python 3.5+') @unittest.skipIf(sys.version_info < (3, 5), 'only for Python 3.5+')
class TestAsyncioManager(unittest.TestCase): class TestAsyncManager(unittest.TestCase):
def setUp(self): def setUp(self):
mock_server = mock.MagicMock() mock_server = mock.MagicMock()
mock_server._emit_internal = AsyncMock() mock_server._emit_internal = AsyncMock()
self.bm = asyncio_manager.AsyncioManager() self.bm = asyncio_manager.AsyncManager()
self.bm.set_server(mock_server) self.bm.set_server(mock_server)
self.bm.initialize() self.bm.initialize()

4
tests/test_asyncio_server.py

@ -202,7 +202,7 @@ class TestAsyncServer(unittest.TestCase):
def test_emit_internal_binary(self, eio): def test_emit_internal_binary(self, eio):
eio.return_value.send = AsyncMock() eio.return_value.send = AsyncMock()
s = asyncio_server.AsyncServer(binary=True) s = asyncio_server.AsyncServer()
_run(s._emit_internal('123', u'my event', b'my binary data')) _run(s._emit_internal('123', u'my event', b'my binary data'))
self.assertEqual(s.eio.send.mock.call_count, 2) self.assertEqual(s.eio.send.mock.call_count, 2)
@ -412,7 +412,7 @@ class TestAsyncServer(unittest.TestCase):
def test_handle_event_with_ack_binary(self, eio): def test_handle_event_with_ack_binary(self, eio):
eio.return_value.send = AsyncMock() eio.return_value.send = AsyncMock()
mgr = self._get_mock_manager() mgr = self._get_mock_manager()
s = asyncio_server.AsyncServer(client_manager=mgr, binary=True) s = asyncio_server.AsyncServer(client_manager=mgr)
handler = mock.MagicMock(return_value=b'foo') handler = mock.MagicMock(return_value=b'foo')
s.on('my message', handler) s.on('my message', handler)
_run(s._handle_eio_message('123', '21000["my message","foo"]')) _run(s._handle_eio_message('123', '21000["my message","foo"]'))

Loading…
Cancel
Save