From 53d10d9f3204a86b8c48ffca347857044215d009 Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Fri, 10 Feb 2017 10:11:02 -0800 Subject: [PATCH] asyncio support --- .gitignore | 2 + .travis.yml | 4 +- .../index.html => aiohttp/app.html} | 0 examples/aiohttp/app.py | 88 +++++ examples/aiohttp/latency.html | 64 ++++ examples/aiohttp/latency.py | 25 ++ examples/{ => aiohttp}/static/style.css | 0 examples/{ => wsgi}/app.py | 0 examples/{ => wsgi}/latency.py | 0 examples/{ => wsgi}/requirements.txt | 0 examples/wsgi/static/style.css | 4 + examples/wsgi/templates/index.html | 91 +++++ examples/{ => wsgi}/templates/latency.html | 0 socketio/__init__.py | 13 +- socketio/asyncio_manager.py | 42 +++ socketio/asyncio_server.py | 342 ++++++++++++++++++ socketio/base_manager.py | 8 +- socketio/pubsub_manager.py | 4 +- socketio/server.py | 8 +- tests/test_base_manager.py | 3 +- tests/test_pubsub_manager.py | 6 +- 21 files changed, 691 insertions(+), 13 deletions(-) rename examples/{templates/index.html => aiohttp/app.html} (100%) create mode 100755 examples/aiohttp/app.py create mode 100755 examples/aiohttp/latency.html create mode 100755 examples/aiohttp/latency.py rename examples/{ => aiohttp}/static/style.css (100%) rename examples/{ => wsgi}/app.py (100%) rename examples/{ => wsgi}/latency.py (100%) rename examples/{ => wsgi}/requirements.txt (100%) create mode 100644 examples/wsgi/static/style.css create mode 100755 examples/wsgi/templates/index.html rename examples/{ => wsgi}/templates/latency.html (100%) create mode 100644 socketio/asyncio_manager.py create mode 100644 socketio/asyncio_server.py diff --git a/.gitignore b/.gitignore index d619cd7..30526d0 100644 --- a/.gitignore +++ b/.gitignore @@ -40,5 +40,7 @@ venv* .eggs .ropeproject .idea +.vscode +tags htmlcov *.swp diff --git a/.travis.yml b/.travis.yml index 84e59ea..cd3a253 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,6 @@ language: python matrix: include: - - python: 2.7 - env: TOXENV=flake8 - python: 3.6 env: TOXENV=flake8 - python: 2.7 @@ -15,7 +13,7 @@ matrix: env: TOXENV=py36 - python: pypy env: TOXENV=pypy - - python: 2.7 + - python: 3.6 env: TOXENV=docs install: - pip install tox diff --git a/examples/templates/index.html b/examples/aiohttp/app.html similarity index 100% rename from examples/templates/index.html rename to examples/aiohttp/app.html diff --git a/examples/aiohttp/app.py b/examples/aiohttp/app.py new file mode 100755 index 0000000..da6397f --- /dev/null +++ b/examples/aiohttp/app.py @@ -0,0 +1,88 @@ +import asyncio + +from aiohttp import web + +import socketio + +sio = socketio.AsyncServer(async_mode='aiohttp') +app = web.Application() +sio.attach(app) + + +async def background_task(): + """Example of how to send server generated events to clients.""" + count = 0 + while True: + await asyncio.sleep(10) + count += 1 + await sio.emit('my response', {'data': 'Server generated event'}, + namespace='/test') + + +async def index(request): + with open('app.html') as f: + return web.Response(text=f.read(), content_type='text/html') + + +@sio.on('my event', namespace='/test') +async def test_message(sid, message): + await sio.emit('my response', {'data': message['data']}, room=sid, + namespace='/test') + + +@sio.on('my broadcast event', namespace='/test') +async def test_broadcast_message(sid, message): + await sio.emit('my response', {'data': message['data']}, namespace='/test') + + +@sio.on('join', namespace='/test') +async def join(sid, message): + sio.enter_room(sid, message['room'], namespace='/test') + await sio.emit('my response', {'data': 'Entered room: ' + message['room']}, + room=sid, namespace='/test') + + +@sio.on('leave', namespace='/test') +async def leave(sid, message): + sio.leave_room(sid, message['room'], namespace='/test') + await sio.emit('my response', {'data': 'Left room: ' + message['room']}, + room=sid, namespace='/test') + + +@sio.on('close room', namespace='/test') +async def close(sid, message): + await sio.emit('my response', + {'data': 'Room ' + message['room'] + ' is closing.'}, + room=message['room'], namespace='/test') + sio.close_room(message['room'], namespace='/test') + + +@sio.on('my room event', namespace='/test') +async def send_room_message(sid, message): + await sio.emit('my response', {'data': message['data']}, + room=message['room'], namespace='/test') + + +@sio.on('disconnect request', namespace='/test') +async def disconnect_request(sid): + await sio.disconnect(sid, namespace='/test') + + +@sio.on('connect', namespace='/test') +async def test_connect(sid, environ): + await sio.emit('my response', {'data': 'Connected', 'count': 0}, room=sid, + namespace='/test') + + +@sio.on('disconnect', namespace='/test') +def test_disconnect(sid): + print('Client disconnected') + + +app.router.add_static('/static', 'static') +app.router.add_get('/', index) + + +if __name__ == '__main__': + asyncio.ensure_future(background_task()) + web.run_app(app) diff --git a/examples/aiohttp/latency.html b/examples/aiohttp/latency.html new file mode 100755 index 0000000..769e9ef --- /dev/null +++ b/examples/aiohttp/latency.html @@ -0,0 +1,64 @@ + + + + Socket.IO Latency + + + +

Socket.IO Latency

+

(connecting)

+ + + + + + + + diff --git a/examples/aiohttp/latency.py b/examples/aiohttp/latency.py new file mode 100755 index 0000000..fa196e5 --- /dev/null +++ b/examples/aiohttp/latency.py @@ -0,0 +1,25 @@ +from aiohttp import web + +import socketio + +sio = socketio.AsyncServer(async_mode='aiohttp') +app = web.Application() +sio.attach(app) + + +async def index(request): + with open('latency.html') as f: + return web.Response(text=f.read(), content_type='text/html') + + +@sio.on('ping_from_client') +async def ping(sid): + await sio.emit('pong_from_server', room=sid) + + +app.router.add_static('/static', 'static') +app.router.add_get('/', index) + + +if __name__ == '__main__': + web.run_app(app) diff --git a/examples/static/style.css b/examples/aiohttp/static/style.css similarity index 100% rename from examples/static/style.css rename to examples/aiohttp/static/style.css diff --git a/examples/app.py b/examples/wsgi/app.py similarity index 100% rename from examples/app.py rename to examples/wsgi/app.py diff --git a/examples/latency.py b/examples/wsgi/latency.py similarity index 100% rename from examples/latency.py rename to examples/wsgi/latency.py diff --git a/examples/requirements.txt b/examples/wsgi/requirements.txt similarity index 100% rename from examples/requirements.txt rename to examples/wsgi/requirements.txt diff --git a/examples/wsgi/static/style.css b/examples/wsgi/static/style.css new file mode 100644 index 0000000..d20bcad --- /dev/null +++ b/examples/wsgi/static/style.css @@ -0,0 +1,4 @@ +body { margin: 0; padding: 0; font-family: Helvetica Neue; } +h1 { margin: 100px 100px 10px; } +h2 { color: #999; margin: 0 100px 30px; font-weight: normal; } +#latency { color: red; } diff --git a/examples/wsgi/templates/index.html b/examples/wsgi/templates/index.html new file mode 100755 index 0000000..1668814 --- /dev/null +++ b/examples/wsgi/templates/index.html @@ -0,0 +1,91 @@ + + + + Flask-SocketIO Test + + + + + +

Flask-SocketIO Test

+

Send:

+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ + + +
+
+ + +
+
+ +
+

Receive:

+

+ + diff --git a/examples/templates/latency.html b/examples/wsgi/templates/latency.html similarity index 100% rename from examples/templates/latency.html rename to examples/wsgi/templates/latency.html diff --git a/socketio/__init__.py b/socketio/__init__.py index d4935a0..942604d 100644 --- a/socketio/__init__.py +++ b/socketio/__init__.py @@ -1,3 +1,5 @@ +import sys + from .middleware import Middleware from .base_manager import BaseManager from .pubsub_manager import PubSubManager @@ -6,8 +8,15 @@ from .redis_manager import RedisManager from .zmq_manager import ZmqManager from .server import Server from .namespace import Namespace +if sys.version_info >= (3, 5): # pragma: no cover + from .asyncio_server import AsyncServer +else: # pragma: no cover + AsyncServer = None __version__ = '1.6.3' -__all__ = [__version__, Middleware, Server, BaseManager, PubSubManager, - KombuManager, RedisManager, ZmqManager, Namespace] +__all__ = ['__version__', 'Middleware', 'Server', 'BaseManager', + 'PubSubManager', 'KombuManager', 'RedisManager', 'ZmqManager', + 'Namespace'] +if AsyncServer is not None: # pragma: no cover + __all__.append('AsyncServer') diff --git a/socketio/asyncio_manager.py b/socketio/asyncio_manager.py new file mode 100644 index 0000000..eb3b7cb --- /dev/null +++ b/socketio/asyncio_manager.py @@ -0,0 +1,42 @@ +import asyncio + +from .base_manager import BaseManager + + +class AsyncioManager(BaseManager): + """Manage a client list for an asyncio server.""" + async def emit(self, event, data, namespace, room=None, skip_sid=None, + callback=None, **kwargs): + """Emit a message to a single client, a room, or all the clients + connected to the namespace.""" + if namespace not in self.rooms or room not in self.rooms[namespace]: + return + tasks = [] + for sid in self.get_participants(namespace, room): + if sid != skip_sid: + if callback is not None: + id = self._generate_ack_id(sid, namespace, callback) + else: + id = None + tasks.append(self.server._emit_internal(sid, event, data, + namespace, id)) + await asyncio.wait(tasks) + + async def trigger_callback(self, sid, namespace, id, data): + """Invoke an application callback.""" + callback = None + try: + callback = self.callbacks[sid][namespace][id] + except KeyError: + # if we get an unknown callback we just ignore it + self.server.logger.warning('Unknown callback received, ignoring.') + else: + del self.callbacks[sid][namespace][id] + if callback is not None: + if asyncio.iscoroutinefunction(callback): + try: + await callback(*data) + except asyncio.CancelledError: # pragma: no cover + pass + else: + callback(*data) diff --git a/socketio/asyncio_server.py b/socketio/asyncio_server.py new file mode 100644 index 0000000..c3dc312 --- /dev/null +++ b/socketio/asyncio_server.py @@ -0,0 +1,342 @@ +import asyncio + +import engineio + +from . import asyncio_manager +from . import packet +from . import server + + +class AsyncServer(server.Server): + """A Socket.IO server for asyncio. + + This class implements a fully compliant Socket.IO web server with support + for websocket and long-polling transports, compatible with the asyncio + framework on Python 3.5 or newer. + + :param client_manager: The client manager instance that will manage the + client list. When this is omitted, the client list + is stored in an in-memory structure, so the use of + multiple connected servers is not possible. + :param logger: To enable logging set to ``True`` or pass a logger object to + use. To disable logging set to ``False``. + :param json: An alternative json module to use for encoding and decoding + packets. Custom json modules must have ``dumps`` and ``loads`` + functions that are compatible with the standard library + versions. + :param kwargs: Connection parameters for the underlying Engine.IO server. + + The Engine.IO configuration supports the following settings: + + :param async_mode: The asynchronous model to use. See the Deployment + section in the documentation for a description of the + available options. Valid async modes are "threading", + "eventlet", "gevent" and "gevent_uwsgi". If this + argument is not given, "eventlet" is tried first, then + "gevent_uwsgi", then "gevent", and finally "threading". + The first async mode that has all its dependencies + installed is then one that is chosen. + :param ping_timeout: The time in seconds that the client waits for the + server to respond before disconnecting. + :param ping_interval: The interval in seconds at which the client pings + the server. + :param max_http_buffer_size: The maximum size of a message when using the + polling transport. + :param allow_upgrades: Whether to allow transport upgrades or not. + :param http_compression: Whether to compress packages when using the + polling transport. + :param compression_threshold: Only compress messages when their byte size + is greater than this value. + :param cookie: Name of the HTTP cookie that contains the client session + id. If set to ``None``, a cookie is not sent to the client. + :param cors_allowed_origins: List of origins that are allowed to connect + to this server. All origins are allowed by + default. + :param cors_credentials: Whether credentials (cookies, authentication) are + allowed in requests to this server. + :param engineio_logger: To enable Engine.IO logging set to ``True`` or pass + a logger object to use. To disable logging set to + ``False``. + """ + def __init__(self, client_manager=None, logger=False, binary=False, + json=None, async_handlers=False, **kwargs): + if client_manager is None: + client_manager = asyncio_manager.AsyncioManager() + super().__init__(client_manager=client_manager, logger=logger, + binary=False, json=json, **kwargs) + + def is_asyncio_based(self): + return True + + def attach(self, app, socketio_path='socket.io'): + """Attach the Socket.IO server to an application.""" + self.eio.attach(app, socketio_path) + + async def emit(self, event, data=None, room=None, skip_sid=None, + namespace=None, callback=None, **kwargs): + """Emit a custom event to one or more connected clients. + + :param event: The event name. It can be any string. The event names + ``'connect'``, ``'message'`` and ``'disconnect'`` are + reserved and should not be used. + :param data: The data to send to the client or clients. Data can be of + type ``str``, ``bytes``, ``list`` or ``dict``. If a + ``list`` or ``dict``, the data will be serialized as JSON. + :param room: The recipient of the message. This can be set to the + session ID of a client to address that client's room, or + to any custom room created by the application, If this + argument is omitted the event is broadcasted to all + connected clients. + :param skip_sid: The session ID of a client to skip when broadcasting + to a room or to all clients. This can be used to + prevent a message from being sent to the sender. + :param namespace: The Socket.IO namespace for the event. If this + argument is omitted the event is emitted to the + default namespace. + :param callback: If given, this function will be called to acknowledge + the the client has received the message. The arguments + that will be passed to the function are those provided + by the client. Callback functions can only be used + when addressing an individual client. + :param ignore_queue: Only used when a message queue is configured. If + set to ``True``, the event is emitted to the + clients directly, without going through the queue. + This is more efficient, but only works when a + single server process is used. It is recommended + to always leave this parameter with its default + value of ``False``. + + Note: this method is asynchronous. + """ + namespace = namespace or '/' + self.logger.info('emitting event "%s" to %s [%s]', event, + room or 'all', namespace) + await self.manager.emit(event, data, namespace, room, skip_sid, + callback, **kwargs) + + async def send(self, data, room=None, skip_sid=None, namespace=None, + callback=None, **kwargs): + """Send a message to one or more connected clients. + + This function emits an event with the name ``'message'``. Use + :func:`emit` to issue custom event names. + + :param data: The data to send to the client or clients. Data can be of + type ``str``, ``bytes``, ``list`` or ``dict``. If a + ``list`` or ``dict``, the data will be serialized as JSON. + :param room: The recipient of the message. This can be set to the + session ID of a client to address that client's room, or + to any custom room created by the application, If this + argument is omitted the event is broadcasted to all + connected clients. + :param skip_sid: The session ID of a client to skip when broadcasting + to a room or to all clients. This can be used to + prevent a message from being sent to the sender. + :param namespace: The Socket.IO namespace for the event. If this + argument is omitted the event is emitted to the + default namespace. + :param callback: If given, this function will be called to acknowledge + the the client has received the message. The arguments + that will be passed to the function are those provided + by the client. Callback functions can only be used + when addressing an individual client. + :param ignore_queue: Only used when a message queue is configured. If + set to ``True``, the event is emitted to the + clients directly, without going through the queue. + This is more efficient, but only works when a + single server process is used. It is recommended + to always leave this parameter with its default + value of ``False``. + + Note: this method is asynchronous. + """ + await self.emit('message', data, room, skip_sid, namespace, callback, + **kwargs) + + async def disconnect(self, sid, namespace=None): + """Disconnect a client. + + :param sid: Session ID of the client. + :param namespace: The Socket.IO namespace to disconnect. If this + argument is omitted the default namespace is used. + + Note: this method is asynchronous. + """ + namespace = namespace or '/' + if self.manager.is_connected(sid, namespace=namespace): + self.logger.info('Disconnecting %s [%s]', sid, namespace) + self.manager.pre_disconnect(sid, namespace=namespace) + await self._send_packet(sid, packet.Packet(packet.DISCONNECT, + namespace=namespace)) + await self._trigger_event('disconnect', namespace, sid) + self.manager.disconnect(sid, namespace=namespace) + + async def handle_request(self, environ): + """Handle an HTTP request from the client. + + This is the entry point of the Socket.IO application, using the same + interface as a WSGI application. For the typical usage, this function + is invoked by the :class:`Middleware` instance, but it can be invoked + directly when the middleware is not used. + + :param environ: The WSGI environment. + :param start_response: The WSGI ``start_response`` function. + + This function returns the HTTP response body to deliver to the client + as a byte sequence. + + Note: this method is asynchronous. + """ + if not self.manager_initialized: + self.manager_initialized = True + self.manager.initialize() + return await self.eio.handle_request(environ) + + def start_background_task(self, target, *args, **kwargs): + raise RuntimeError('Not implemented, use asyncio.') + + def sleep(self, seconds=0): + raise RuntimeError('Not implemented, use asyncio.') + + async def _emit_internal(self, sid, event, data, namespace=None, id=None): + """Send a message to a client.""" + # tuples are expanded to multiple arguments, everything else is sent + # as a single argument + if isinstance(data, tuple): + data = list(data) + else: + data = [data] + await self._send_packet(sid, packet.Packet( + packet.EVENT, namespace=namespace, data=[event] + data, id=id, + binary=None)) + + async def _send_packet(self, sid, pkt): + """Send a Socket.IO packet to a client.""" + encoded_packet = pkt.encode() + if isinstance(encoded_packet, list): + binary = False + for ep in encoded_packet: + await self.eio.send(sid, ep, binary=binary) + binary = True + else: + await self.eio.send(sid, encoded_packet, binary=False) + + async def _handle_connect(self, sid, namespace): + """Handle a client connection request.""" + namespace = namespace or '/' + self.manager.connect(sid, namespace) + if await self._trigger_event('connect', namespace, sid, + self.environ[sid]) is False: + self.manager.disconnect(sid, namespace) + await self._send_packet(sid, packet.Packet(packet.ERROR, + namespace=namespace)) + return False + else: + await self._send_packet(sid, packet.Packet(packet.CONNECT, + namespace=namespace)) + + async def _handle_disconnect(self, sid, namespace): + """Handle a client disconnect.""" + namespace = namespace or '/' + if namespace == '/': + namespace_list = list(self.manager.get_namespaces()) + else: + namespace_list = [namespace] + for n in namespace_list: + if n != '/' and self.manager.is_connected(sid, n): + await self._trigger_event('disconnect', n, sid) + self.manager.disconnect(sid, n) + if namespace == '/' and self.manager.is_connected(sid, namespace): + await self._trigger_event('disconnect', '/', sid) + self.manager.disconnect(sid, '/') + if sid in self.environ: + del self.environ[sid] + + async def _handle_event(self, sid, namespace, id, data): + """Handle an incoming client event.""" + namespace = namespace or '/' + self.logger.info('received event "%s" from %s [%s]', data[0], sid, + namespace) + await self._handle_event_internal(self, sid, data, namespace, id) + + async def _handle_event_internal(self, server, sid, data, namespace, id): + r = await server._trigger_event(data[0], namespace, sid, *data[1:]) + if id is not None: + # send ACK packet with the response returned by the handler + # tuples are expanded as multiple arguments + if r is None: + data = [] + elif isinstance(r, tuple): + data = list(r) + else: + data = [r] + await server._send_packet(sid, packet.Packet(packet.ACK, + namespace=namespace, + id=id, data=data, + binary=None)) + + async def _handle_ack(self, sid, namespace, id, data): + """Handle ACK packets from the client.""" + namespace = namespace or '/' + self.logger.info('received ack from %s [%s]', sid, namespace) + await self.manager.trigger_callback(sid, namespace, id, data) + + async def _trigger_event(self, event, namespace, *args): + """Invoke an application event handler.""" + # first see if we have an explicit handler for the event + if namespace in self.handlers and event in self.handlers[namespace]: + if asyncio.iscoroutinefunction(self.handlers[namespace][event]): + try: + ret = await self.handlers[namespace][event](*args) + except asyncio.CancelledError: # pragma: no cover + pass + else: + ret = self.handlers[namespace][event](*args) + return ret + + # or else, forward the event to a namepsace handler if one exists + elif namespace in self.namespace_handlers: + return await self.namespace_handlers[namespace].trigger_event( + event, *args) + + async def _handle_eio_connect(self, sid, environ): + """Handle the Engine.IO connection event.""" + self.environ[sid] = environ + return await self._handle_connect(sid, '/') + + async def _handle_eio_message(self, sid, data): + """Dispatch Engine.IO messages.""" + if sid in self._binary_packet: + pkt = self._binary_packet[sid] + if pkt.add_attachment(data): + del self._binary_packet[sid] + if pkt.packet_type == packet.BINARY_EVENT: + await self._handle_event(sid, pkt.namespace, pkt.id, + pkt.data) + else: + await self._handle_ack(sid, pkt.namespace, pkt.id, + pkt.data) + else: + pkt = packet.Packet(encoded_packet=data) + if pkt.packet_type == packet.CONNECT: + await self._handle_connect(sid, pkt.namespace) + elif pkt.packet_type == packet.DISCONNECT: + await self._handle_disconnect(sid, pkt.namespace) + elif pkt.packet_type == packet.EVENT: + await self._handle_event(sid, pkt.namespace, pkt.id, pkt.data) + elif pkt.packet_type == packet.ACK: + await self._handle_ack(sid, pkt.namespace, pkt.id, pkt.data) + elif pkt.packet_type == packet.BINARY_EVENT or \ + pkt.packet_type == packet.BINARY_ACK: + self._binary_packet[sid] = pkt + elif pkt.packet_type == packet.ERROR: + raise ValueError('Unexpected ERROR packet.') + else: + raise ValueError('Unknown packet type.') + + async def _handle_eio_disconnect(self, sid): + """Handle Engine.IO disconnect event.""" + await self._handle_disconnect(sid, '/') + + def _engineio_server_class(self): + return engineio.AsyncServer diff --git a/socketio/base_manager.py b/socketio/base_manager.py index 198fabc..283db13 100644 --- a/socketio/base_manager.py +++ b/socketio/base_manager.py @@ -18,9 +18,15 @@ class BaseManager(object): self.callbacks = {} self.pending_disconnect = {} - def initialize(self, server): + def set_server(self, server): self.server = server + def initialize(self): + """Invoked before the first request is received. Subclasses can add + their initialization code here. + """ + pass + def get_namespaces(self): """Return an iterable with the active namespace names.""" return six.iterkeys(self.rooms) diff --git a/socketio/pubsub_manager.py b/socketio/pubsub_manager.py index 5de89b6..4810728 100644 --- a/socketio/pubsub_manager.py +++ b/socketio/pubsub_manager.py @@ -30,8 +30,8 @@ class PubSubManager(BaseManager): self.write_only = write_only self.host_id = uuid.uuid4().hex - def initialize(self, server): - super(PubSubManager, self).initialize(server) + def initialize(self): + super(PubSubManager, self).initialize() if not self.write_only: self.thread = self.server.start_background_task(self._thread) self.server.logger.info(self.name + ' backend initialized.') diff --git a/socketio/server.py b/socketio/server.py index 211eff8..7975ec5 100644 --- a/socketio/server.py +++ b/socketio/server.py @@ -79,7 +79,7 @@ class Server(object): packet.Packet.json = json engineio_options['json'] = json engineio_options['async_handlers'] = False - self.eio = engineio.Server(**engineio_options) + self.eio = self._engineio_server_class()(**engineio_options) self.eio.on('connect', self._handle_eio_connect) self.eio.on('message', self._handle_eio_message) self.eio.on('disconnect', self._handle_eio_disconnect) @@ -106,6 +106,7 @@ class Server(object): if client_manager is None: client_manager = base_manager.BaseManager() self.manager = client_manager + self.manager.set_server(self) self.manager_initialized = False self.async_handlers = async_handlers @@ -346,7 +347,7 @@ class Server(object): """ if not self.manager_initialized: self.manager_initialized = True - self.manager.initialize(self) + self.manager.initialize() return self.eio.handle_request(environ, start_response) def start_background_task(self, target, *args, **kwargs): @@ -518,3 +519,6 @@ class Server(object): def _handle_eio_disconnect(self, sid): """Handle Engine.IO disconnect event.""" self._handle_disconnect(sid, '/') + + def _engineio_server_class(self): + return engineio.Server diff --git a/tests/test_base_manager.py b/tests/test_base_manager.py index 2a801be..55141d4 100644 --- a/tests/test_base_manager.py +++ b/tests/test_base_manager.py @@ -13,7 +13,8 @@ class TestBaseManager(unittest.TestCase): def setUp(self): mock_server = mock.MagicMock() self.bm = base_manager.BaseManager() - self.bm.initialize(mock_server) + self.bm.set_server(mock_server) + self.bm.initialize() def test_connect(self): self.bm.connect('123', '/foo') diff --git a/tests/test_pubsub_manager.py b/tests/test_pubsub_manager.py index d4cca4f..aa461b6 100644 --- a/tests/test_pubsub_manager.py +++ b/tests/test_pubsub_manager.py @@ -16,7 +16,8 @@ class TestBaseManager(unittest.TestCase): mock_server = mock.MagicMock() self.pm = pubsub_manager.PubSubManager() self.pm._publish = mock.MagicMock() - self.pm.initialize(mock_server) + self.pm.set_server(mock_server) + self.pm.initialize() def test_default_init(self): self.assertEqual(self.pm.channel, 'socketio') @@ -32,7 +33,8 @@ class TestBaseManager(unittest.TestCase): def test_write_only_init(self): mock_server = mock.MagicMock() pm = pubsub_manager.PubSubManager(write_only=True) - pm.initialize(mock_server) + pm.set_server(mock_server) + pm.initialize() self.assertEqual(pm.channel, 'socketio') self.assertEqual(len(pm.host_id), 32) self.assertEqual(pm.server.start_background_task.call_count, 0)