+
+
+
+
+
+
+
+
diff --git a/examples/aiohttp/latency.py b/examples/aiohttp/latency.py
new file mode 100755
index 0000000..fa196e5
--- /dev/null
+++ b/examples/aiohttp/latency.py
@@ -0,0 +1,25 @@
+from aiohttp import web
+
+import socketio
+
+sio = socketio.AsyncServer(async_mode='aiohttp')
+app = web.Application()
+sio.attach(app)
+
+
+async def index(request):
+ with open('latency.html') as f:
+ return web.Response(text=f.read(), content_type='text/html')
+
+
+@sio.on('ping_from_client')
+async def ping(sid):
+ await sio.emit('pong_from_server', room=sid)
+
+
+app.router.add_static('/static', 'static')
+app.router.add_get('/', index)
+
+
+if __name__ == '__main__':
+ web.run_app(app)
diff --git a/examples/static/style.css b/examples/aiohttp/static/style.css
similarity index 100%
rename from examples/static/style.css
rename to examples/aiohttp/static/style.css
diff --git a/examples/app.py b/examples/wsgi/app.py
similarity index 100%
rename from examples/app.py
rename to examples/wsgi/app.py
diff --git a/examples/latency.py b/examples/wsgi/latency.py
similarity index 100%
rename from examples/latency.py
rename to examples/wsgi/latency.py
diff --git a/examples/requirements.txt b/examples/wsgi/requirements.txt
similarity index 100%
rename from examples/requirements.txt
rename to examples/wsgi/requirements.txt
diff --git a/examples/wsgi/static/style.css b/examples/wsgi/static/style.css
new file mode 100644
index 0000000..d20bcad
--- /dev/null
+++ b/examples/wsgi/static/style.css
@@ -0,0 +1,4 @@
+body { margin: 0; padding: 0; font-family: Helvetica Neue; }
+h1 { margin: 100px 100px 10px; }
+h2 { color: #999; margin: 0 100px 30px; font-weight: normal; }
+#latency { color: red; }
diff --git a/examples/wsgi/templates/index.html b/examples/wsgi/templates/index.html
new file mode 100755
index 0000000..1668814
--- /dev/null
+++ b/examples/wsgi/templates/index.html
@@ -0,0 +1,91 @@
+
+
+
+ Flask-SocketIO Test
+
+
+
+
+
+
Flask-SocketIO Test
+
Send:
+
+
+
+
+
+
+
+
Receive:
+
+
+
diff --git a/examples/templates/latency.html b/examples/wsgi/templates/latency.html
similarity index 100%
rename from examples/templates/latency.html
rename to examples/wsgi/templates/latency.html
diff --git a/socketio/__init__.py b/socketio/__init__.py
index d4935a0..942604d 100644
--- a/socketio/__init__.py
+++ b/socketio/__init__.py
@@ -1,3 +1,5 @@
+import sys
+
from .middleware import Middleware
from .base_manager import BaseManager
from .pubsub_manager import PubSubManager
@@ -6,8 +8,15 @@ from .redis_manager import RedisManager
from .zmq_manager import ZmqManager
from .server import Server
from .namespace import Namespace
+if sys.version_info >= (3, 5): # pragma: no cover
+ from .asyncio_server import AsyncServer
+else: # pragma: no cover
+ AsyncServer = None
__version__ = '1.6.3'
-__all__ = [__version__, Middleware, Server, BaseManager, PubSubManager,
- KombuManager, RedisManager, ZmqManager, Namespace]
+__all__ = ['__version__', 'Middleware', 'Server', 'BaseManager',
+ 'PubSubManager', 'KombuManager', 'RedisManager', 'ZmqManager',
+ 'Namespace']
+if AsyncServer is not None: # pragma: no cover
+ __all__.append('AsyncServer')
diff --git a/socketio/asyncio_manager.py b/socketio/asyncio_manager.py
new file mode 100644
index 0000000..eb3b7cb
--- /dev/null
+++ b/socketio/asyncio_manager.py
@@ -0,0 +1,42 @@
+import asyncio
+
+from .base_manager import BaseManager
+
+
+class AsyncioManager(BaseManager):
+ """Manage a client list for an asyncio server."""
+ async def emit(self, event, data, namespace, room=None, skip_sid=None,
+ callback=None, **kwargs):
+ """Emit a message to a single client, a room, or all the clients
+ connected to the namespace."""
+ if namespace not in self.rooms or room not in self.rooms[namespace]:
+ return
+ tasks = []
+ for sid in self.get_participants(namespace, room):
+ if sid != skip_sid:
+ if callback is not None:
+ id = self._generate_ack_id(sid, namespace, callback)
+ else:
+ id = None
+ tasks.append(self.server._emit_internal(sid, event, data,
+ namespace, id))
+ await asyncio.wait(tasks)
+
+ async def trigger_callback(self, sid, namespace, id, data):
+ """Invoke an application callback."""
+ callback = None
+ try:
+ callback = self.callbacks[sid][namespace][id]
+ except KeyError:
+ # if we get an unknown callback we just ignore it
+ self.server.logger.warning('Unknown callback received, ignoring.')
+ else:
+ del self.callbacks[sid][namespace][id]
+ if callback is not None:
+ if asyncio.iscoroutinefunction(callback):
+ try:
+ await callback(*data)
+ except asyncio.CancelledError: # pragma: no cover
+ pass
+ else:
+ callback(*data)
diff --git a/socketio/asyncio_server.py b/socketio/asyncio_server.py
new file mode 100644
index 0000000..c3dc312
--- /dev/null
+++ b/socketio/asyncio_server.py
@@ -0,0 +1,342 @@
+import asyncio
+
+import engineio
+
+from . import asyncio_manager
+from . import packet
+from . import server
+
+
+class AsyncServer(server.Server):
+ """A Socket.IO server for asyncio.
+
+ This class implements a fully compliant Socket.IO web server with support
+ for websocket and long-polling transports, compatible with the asyncio
+ framework on Python 3.5 or newer.
+
+ :param client_manager: The client manager instance that will manage the
+ client list. When this is omitted, the client list
+ is stored in an in-memory structure, so the use of
+ multiple connected servers is not possible.
+ :param logger: To enable logging set to ``True`` or pass a logger object to
+ use. To disable logging set to ``False``.
+ :param json: An alternative json module to use for encoding and decoding
+ packets. Custom json modules must have ``dumps`` and ``loads``
+ functions that are compatible with the standard library
+ versions.
+ :param kwargs: Connection parameters for the underlying Engine.IO server.
+
+ The Engine.IO configuration supports the following settings:
+
+ :param async_mode: The asynchronous model to use. See the Deployment
+ section in the documentation for a description of the
+ available options. Valid async modes are "threading",
+ "eventlet", "gevent" and "gevent_uwsgi". If this
+ argument is not given, "eventlet" is tried first, then
+ "gevent_uwsgi", then "gevent", and finally "threading".
+ The first async mode that has all its dependencies
+ installed is then one that is chosen.
+ :param ping_timeout: The time in seconds that the client waits for the
+ server to respond before disconnecting.
+ :param ping_interval: The interval in seconds at which the client pings
+ the server.
+ :param max_http_buffer_size: The maximum size of a message when using the
+ polling transport.
+ :param allow_upgrades: Whether to allow transport upgrades or not.
+ :param http_compression: Whether to compress packages when using the
+ polling transport.
+ :param compression_threshold: Only compress messages when their byte size
+ is greater than this value.
+ :param cookie: Name of the HTTP cookie that contains the client session
+ id. If set to ``None``, a cookie is not sent to the client.
+ :param cors_allowed_origins: List of origins that are allowed to connect
+ to this server. All origins are allowed by
+ default.
+ :param cors_credentials: Whether credentials (cookies, authentication) are
+ allowed in requests to this server.
+ :param engineio_logger: To enable Engine.IO logging set to ``True`` or pass
+ a logger object to use. To disable logging set to
+ ``False``.
+ """
+ def __init__(self, client_manager=None, logger=False, binary=False,
+ json=None, async_handlers=False, **kwargs):
+ if client_manager is None:
+ client_manager = asyncio_manager.AsyncioManager()
+ super().__init__(client_manager=client_manager, logger=logger,
+ binary=False, json=json, **kwargs)
+
+ def is_asyncio_based(self):
+ return True
+
+ def attach(self, app, socketio_path='socket.io'):
+ """Attach the Socket.IO server to an application."""
+ self.eio.attach(app, socketio_path)
+
+ async def emit(self, event, data=None, room=None, skip_sid=None,
+ namespace=None, callback=None, **kwargs):
+ """Emit a custom event to one or more connected clients.
+
+ :param event: The event name. It can be any string. The event names
+ ``'connect'``, ``'message'`` and ``'disconnect'`` are
+ reserved and should not be used.
+ :param data: The data to send to the client or clients. Data can be of
+ type ``str``, ``bytes``, ``list`` or ``dict``. If a
+ ``list`` or ``dict``, the data will be serialized as JSON.
+ :param room: The recipient of the message. This can be set to the
+ session ID of a client to address that client's room, or
+ to any custom room created by the application, If this
+ argument is omitted the event is broadcasted to all
+ connected clients.
+ :param skip_sid: The session ID of a client to skip when broadcasting
+ to a room or to all clients. This can be used to
+ prevent a message from being sent to the sender.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the event is emitted to the
+ default namespace.
+ :param callback: If given, this function will be called to acknowledge
+ the the client has received the message. The arguments
+ that will be passed to the function are those provided
+ by the client. Callback functions can only be used
+ when addressing an individual client.
+ :param ignore_queue: Only used when a message queue is configured. If
+ set to ``True``, the event is emitted to the
+ clients directly, without going through the queue.
+ This is more efficient, but only works when a
+ single server process is used. It is recommended
+ to always leave this parameter with its default
+ value of ``False``.
+
+ Note: this method is asynchronous.
+ """
+ namespace = namespace or '/'
+ self.logger.info('emitting event "%s" to %s [%s]', event,
+ room or 'all', namespace)
+ await self.manager.emit(event, data, namespace, room, skip_sid,
+ callback, **kwargs)
+
+ async def send(self, data, room=None, skip_sid=None, namespace=None,
+ callback=None, **kwargs):
+ """Send a message to one or more connected clients.
+
+ This function emits an event with the name ``'message'``. Use
+ :func:`emit` to issue custom event names.
+
+ :param data: The data to send to the client or clients. Data can be of
+ type ``str``, ``bytes``, ``list`` or ``dict``. If a
+ ``list`` or ``dict``, the data will be serialized as JSON.
+ :param room: The recipient of the message. This can be set to the
+ session ID of a client to address that client's room, or
+ to any custom room created by the application, If this
+ argument is omitted the event is broadcasted to all
+ connected clients.
+ :param skip_sid: The session ID of a client to skip when broadcasting
+ to a room or to all clients. This can be used to
+ prevent a message from being sent to the sender.
+ :param namespace: The Socket.IO namespace for the event. If this
+ argument is omitted the event is emitted to the
+ default namespace.
+ :param callback: If given, this function will be called to acknowledge
+ the the client has received the message. The arguments
+ that will be passed to the function are those provided
+ by the client. Callback functions can only be used
+ when addressing an individual client.
+ :param ignore_queue: Only used when a message queue is configured. If
+ set to ``True``, the event is emitted to the
+ clients directly, without going through the queue.
+ This is more efficient, but only works when a
+ single server process is used. It is recommended
+ to always leave this parameter with its default
+ value of ``False``.
+
+ Note: this method is asynchronous.
+ """
+ await self.emit('message', data, room, skip_sid, namespace, callback,
+ **kwargs)
+
+ async def disconnect(self, sid, namespace=None):
+ """Disconnect a client.
+
+ :param sid: Session ID of the client.
+ :param namespace: The Socket.IO namespace to disconnect. If this
+ argument is omitted the default namespace is used.
+
+ Note: this method is asynchronous.
+ """
+ namespace = namespace or '/'
+ if self.manager.is_connected(sid, namespace=namespace):
+ self.logger.info('Disconnecting %s [%s]', sid, namespace)
+ self.manager.pre_disconnect(sid, namespace=namespace)
+ await self._send_packet(sid, packet.Packet(packet.DISCONNECT,
+ namespace=namespace))
+ await self._trigger_event('disconnect', namespace, sid)
+ self.manager.disconnect(sid, namespace=namespace)
+
+ async def handle_request(self, environ):
+ """Handle an HTTP request from the client.
+
+ This is the entry point of the Socket.IO application, using the same
+ interface as a WSGI application. For the typical usage, this function
+ is invoked by the :class:`Middleware` instance, but it can be invoked
+ directly when the middleware is not used.
+
+ :param environ: The WSGI environment.
+ :param start_response: The WSGI ``start_response`` function.
+
+ This function returns the HTTP response body to deliver to the client
+ as a byte sequence.
+
+ Note: this method is asynchronous.
+ """
+ if not self.manager_initialized:
+ self.manager_initialized = True
+ self.manager.initialize()
+ return await self.eio.handle_request(environ)
+
+ def start_background_task(self, target, *args, **kwargs):
+ raise RuntimeError('Not implemented, use asyncio.')
+
+ def sleep(self, seconds=0):
+ raise RuntimeError('Not implemented, use asyncio.')
+
+ async def _emit_internal(self, sid, event, data, namespace=None, id=None):
+ """Send a message to a client."""
+ # tuples are expanded to multiple arguments, everything else is sent
+ # as a single argument
+ if isinstance(data, tuple):
+ data = list(data)
+ else:
+ data = [data]
+ await self._send_packet(sid, packet.Packet(
+ packet.EVENT, namespace=namespace, data=[event] + data, id=id,
+ binary=None))
+
+ async def _send_packet(self, sid, pkt):
+ """Send a Socket.IO packet to a client."""
+ encoded_packet = pkt.encode()
+ if isinstance(encoded_packet, list):
+ binary = False
+ for ep in encoded_packet:
+ await self.eio.send(sid, ep, binary=binary)
+ binary = True
+ else:
+ await self.eio.send(sid, encoded_packet, binary=False)
+
+ async def _handle_connect(self, sid, namespace):
+ """Handle a client connection request."""
+ namespace = namespace or '/'
+ self.manager.connect(sid, namespace)
+ if await self._trigger_event('connect', namespace, sid,
+ self.environ[sid]) is False:
+ self.manager.disconnect(sid, namespace)
+ await self._send_packet(sid, packet.Packet(packet.ERROR,
+ namespace=namespace))
+ return False
+ else:
+ await self._send_packet(sid, packet.Packet(packet.CONNECT,
+ namespace=namespace))
+
+ async def _handle_disconnect(self, sid, namespace):
+ """Handle a client disconnect."""
+ namespace = namespace or '/'
+ if namespace == '/':
+ namespace_list = list(self.manager.get_namespaces())
+ else:
+ namespace_list = [namespace]
+ for n in namespace_list:
+ if n != '/' and self.manager.is_connected(sid, n):
+ await self._trigger_event('disconnect', n, sid)
+ self.manager.disconnect(sid, n)
+ if namespace == '/' and self.manager.is_connected(sid, namespace):
+ await self._trigger_event('disconnect', '/', sid)
+ self.manager.disconnect(sid, '/')
+ if sid in self.environ:
+ del self.environ[sid]
+
+ async def _handle_event(self, sid, namespace, id, data):
+ """Handle an incoming client event."""
+ namespace = namespace or '/'
+ self.logger.info('received event "%s" from %s [%s]', data[0], sid,
+ namespace)
+ await self._handle_event_internal(self, sid, data, namespace, id)
+
+ async def _handle_event_internal(self, server, sid, data, namespace, id):
+ r = await server._trigger_event(data[0], namespace, sid, *data[1:])
+ if id is not None:
+ # send ACK packet with the response returned by the handler
+ # tuples are expanded as multiple arguments
+ if r is None:
+ data = []
+ elif isinstance(r, tuple):
+ data = list(r)
+ else:
+ data = [r]
+ await server._send_packet(sid, packet.Packet(packet.ACK,
+ namespace=namespace,
+ id=id, data=data,
+ binary=None))
+
+ async def _handle_ack(self, sid, namespace, id, data):
+ """Handle ACK packets from the client."""
+ namespace = namespace or '/'
+ self.logger.info('received ack from %s [%s]', sid, namespace)
+ await self.manager.trigger_callback(sid, namespace, id, data)
+
+ async def _trigger_event(self, event, namespace, *args):
+ """Invoke an application event handler."""
+ # first see if we have an explicit handler for the event
+ if namespace in self.handlers and event in self.handlers[namespace]:
+ if asyncio.iscoroutinefunction(self.handlers[namespace][event]):
+ try:
+ ret = await self.handlers[namespace][event](*args)
+ except asyncio.CancelledError: # pragma: no cover
+ pass
+ else:
+ ret = self.handlers[namespace][event](*args)
+ return ret
+
+ # or else, forward the event to a namepsace handler if one exists
+ elif namespace in self.namespace_handlers:
+ return await self.namespace_handlers[namespace].trigger_event(
+ event, *args)
+
+ async def _handle_eio_connect(self, sid, environ):
+ """Handle the Engine.IO connection event."""
+ self.environ[sid] = environ
+ return await self._handle_connect(sid, '/')
+
+ async def _handle_eio_message(self, sid, data):
+ """Dispatch Engine.IO messages."""
+ if sid in self._binary_packet:
+ pkt = self._binary_packet[sid]
+ if pkt.add_attachment(data):
+ del self._binary_packet[sid]
+ if pkt.packet_type == packet.BINARY_EVENT:
+ await self._handle_event(sid, pkt.namespace, pkt.id,
+ pkt.data)
+ else:
+ await self._handle_ack(sid, pkt.namespace, pkt.id,
+ pkt.data)
+ else:
+ pkt = packet.Packet(encoded_packet=data)
+ if pkt.packet_type == packet.CONNECT:
+ await self._handle_connect(sid, pkt.namespace)
+ elif pkt.packet_type == packet.DISCONNECT:
+ await self._handle_disconnect(sid, pkt.namespace)
+ elif pkt.packet_type == packet.EVENT:
+ await self._handle_event(sid, pkt.namespace, pkt.id, pkt.data)
+ elif pkt.packet_type == packet.ACK:
+ await self._handle_ack(sid, pkt.namespace, pkt.id, pkt.data)
+ elif pkt.packet_type == packet.BINARY_EVENT or \
+ pkt.packet_type == packet.BINARY_ACK:
+ self._binary_packet[sid] = pkt
+ elif pkt.packet_type == packet.ERROR:
+ raise ValueError('Unexpected ERROR packet.')
+ else:
+ raise ValueError('Unknown packet type.')
+
+ async def _handle_eio_disconnect(self, sid):
+ """Handle Engine.IO disconnect event."""
+ await self._handle_disconnect(sid, '/')
+
+ def _engineio_server_class(self):
+ return engineio.AsyncServer
diff --git a/socketio/base_manager.py b/socketio/base_manager.py
index 198fabc..283db13 100644
--- a/socketio/base_manager.py
+++ b/socketio/base_manager.py
@@ -18,9 +18,15 @@ class BaseManager(object):
self.callbacks = {}
self.pending_disconnect = {}
- def initialize(self, server):
+ def set_server(self, server):
self.server = server
+ def initialize(self):
+ """Invoked before the first request is received. Subclasses can add
+ their initialization code here.
+ """
+ pass
+
def get_namespaces(self):
"""Return an iterable with the active namespace names."""
return six.iterkeys(self.rooms)
diff --git a/socketio/pubsub_manager.py b/socketio/pubsub_manager.py
index 5de89b6..4810728 100644
--- a/socketio/pubsub_manager.py
+++ b/socketio/pubsub_manager.py
@@ -30,8 +30,8 @@ class PubSubManager(BaseManager):
self.write_only = write_only
self.host_id = uuid.uuid4().hex
- def initialize(self, server):
- super(PubSubManager, self).initialize(server)
+ def initialize(self):
+ super(PubSubManager, self).initialize()
if not self.write_only:
self.thread = self.server.start_background_task(self._thread)
self.server.logger.info(self.name + ' backend initialized.')
diff --git a/socketio/server.py b/socketio/server.py
index 211eff8..7975ec5 100644
--- a/socketio/server.py
+++ b/socketio/server.py
@@ -79,7 +79,7 @@ class Server(object):
packet.Packet.json = json
engineio_options['json'] = json
engineio_options['async_handlers'] = False
- self.eio = engineio.Server(**engineio_options)
+ self.eio = self._engineio_server_class()(**engineio_options)
self.eio.on('connect', self._handle_eio_connect)
self.eio.on('message', self._handle_eio_message)
self.eio.on('disconnect', self._handle_eio_disconnect)
@@ -106,6 +106,7 @@ class Server(object):
if client_manager is None:
client_manager = base_manager.BaseManager()
self.manager = client_manager
+ self.manager.set_server(self)
self.manager_initialized = False
self.async_handlers = async_handlers
@@ -346,7 +347,7 @@ class Server(object):
"""
if not self.manager_initialized:
self.manager_initialized = True
- self.manager.initialize(self)
+ self.manager.initialize()
return self.eio.handle_request(environ, start_response)
def start_background_task(self, target, *args, **kwargs):
@@ -518,3 +519,6 @@ class Server(object):
def _handle_eio_disconnect(self, sid):
"""Handle Engine.IO disconnect event."""
self._handle_disconnect(sid, '/')
+
+ def _engineio_server_class(self):
+ return engineio.Server
diff --git a/tests/test_base_manager.py b/tests/test_base_manager.py
index 2a801be..55141d4 100644
--- a/tests/test_base_manager.py
+++ b/tests/test_base_manager.py
@@ -13,7 +13,8 @@ class TestBaseManager(unittest.TestCase):
def setUp(self):
mock_server = mock.MagicMock()
self.bm = base_manager.BaseManager()
- self.bm.initialize(mock_server)
+ self.bm.set_server(mock_server)
+ self.bm.initialize()
def test_connect(self):
self.bm.connect('123', '/foo')
diff --git a/tests/test_pubsub_manager.py b/tests/test_pubsub_manager.py
index d4cca4f..aa461b6 100644
--- a/tests/test_pubsub_manager.py
+++ b/tests/test_pubsub_manager.py
@@ -16,7 +16,8 @@ class TestBaseManager(unittest.TestCase):
mock_server = mock.MagicMock()
self.pm = pubsub_manager.PubSubManager()
self.pm._publish = mock.MagicMock()
- self.pm.initialize(mock_server)
+ self.pm.set_server(mock_server)
+ self.pm.initialize()
def test_default_init(self):
self.assertEqual(self.pm.channel, 'socketio')
@@ -32,7 +33,8 @@ class TestBaseManager(unittest.TestCase):
def test_write_only_init(self):
mock_server = mock.MagicMock()
pm = pubsub_manager.PubSubManager(write_only=True)
- pm.initialize(mock_server)
+ pm.set_server(mock_server)
+ pm.initialize()
self.assertEqual(pm.channel, 'socketio')
self.assertEqual(len(pm.host_id), 32)
self.assertEqual(pm.server.start_background_task.call_count, 0)