Browse Source

asyncio support

pull/31/merge
Miguel Grinberg 8 years ago
parent
commit
53d10d9f32
  1. 2
      .gitignore
  2. 4
      .travis.yml
  3. 0
      examples/aiohttp/app.html
  4. 88
      examples/aiohttp/app.py
  5. 64
      examples/aiohttp/latency.html
  6. 25
      examples/aiohttp/latency.py
  7. 0
      examples/aiohttp/static/style.css
  8. 0
      examples/wsgi/app.py
  9. 0
      examples/wsgi/latency.py
  10. 0
      examples/wsgi/requirements.txt
  11. 4
      examples/wsgi/static/style.css
  12. 91
      examples/wsgi/templates/index.html
  13. 0
      examples/wsgi/templates/latency.html
  14. 13
      socketio/__init__.py
  15. 42
      socketio/asyncio_manager.py
  16. 342
      socketio/asyncio_server.py
  17. 8
      socketio/base_manager.py
  18. 4
      socketio/pubsub_manager.py
  19. 8
      socketio/server.py
  20. 3
      tests/test_base_manager.py
  21. 6
      tests/test_pubsub_manager.py

2
.gitignore

@ -40,5 +40,7 @@ venv*
.eggs
.ropeproject
.idea
.vscode
tags
htmlcov
*.swp

4
.travis.yml

@ -1,8 +1,6 @@
language: python
matrix:
include:
- python: 2.7
env: TOXENV=flake8
- python: 3.6
env: TOXENV=flake8
- python: 2.7
@ -15,7 +13,7 @@ matrix:
env: TOXENV=py36
- python: pypy
env: TOXENV=pypy
- python: 2.7
- python: 3.6
env: TOXENV=docs
install:
- pip install tox

0
examples/templates/index.html → examples/aiohttp/app.html

88
examples/aiohttp/app.py

@ -0,0 +1,88 @@
import asyncio
from aiohttp import web
import socketio
sio = socketio.AsyncServer(async_mode='aiohttp')
app = web.Application()
sio.attach(app)
async def background_task():
"""Example of how to send server generated events to clients."""
count = 0
while True:
await asyncio.sleep(10)
count += 1
await sio.emit('my response', {'data': 'Server generated event'},
namespace='/test')
async def index(request):
with open('app.html') as f:
return web.Response(text=f.read(), content_type='text/html')
@sio.on('my event', namespace='/test')
async def test_message(sid, message):
await sio.emit('my response', {'data': message['data']}, room=sid,
namespace='/test')
@sio.on('my broadcast event', namespace='/test')
async def test_broadcast_message(sid, message):
await sio.emit('my response', {'data': message['data']}, namespace='/test')
@sio.on('join', namespace='/test')
async def join(sid, message):
sio.enter_room(sid, message['room'], namespace='/test')
await sio.emit('my response', {'data': 'Entered room: ' + message['room']},
room=sid, namespace='/test')
@sio.on('leave', namespace='/test')
async def leave(sid, message):
sio.leave_room(sid, message['room'], namespace='/test')
await sio.emit('my response', {'data': 'Left room: ' + message['room']},
room=sid, namespace='/test')
@sio.on('close room', namespace='/test')
async def close(sid, message):
await sio.emit('my response',
{'data': 'Room ' + message['room'] + ' is closing.'},
room=message['room'], namespace='/test')
sio.close_room(message['room'], namespace='/test')
@sio.on('my room event', namespace='/test')
async def send_room_message(sid, message):
await sio.emit('my response', {'data': message['data']},
room=message['room'], namespace='/test')
@sio.on('disconnect request', namespace='/test')
async def disconnect_request(sid):
await sio.disconnect(sid, namespace='/test')
@sio.on('connect', namespace='/test')
async def test_connect(sid, environ):
await sio.emit('my response', {'data': 'Connected', 'count': 0}, room=sid,
namespace='/test')
@sio.on('disconnect', namespace='/test')
def test_disconnect(sid):
print('Client disconnected')
app.router.add_static('/static', 'static')
app.router.add_get('/', index)
if __name__ == '__main__':
asyncio.ensure_future(background_task())
web.run_app(app)

64
examples/aiohttp/latency.html

@ -0,0 +1,64 @@
<!doctype html>
<html>
<head>
<title>Socket.IO Latency</title>
<link rel="stylesheet" href="/static/style.css" />
</head>
<body>
<h1>Socket.IO Latency <span id="latency"></span></h1>
<h2 id="transport">(connecting)</h2>
<canvas id="chart" height="200"></canvas>
<script src="//cdnjs.cloudflare.com/ajax/libs/jquery/2.1.4/jquery.js"></script>
<script src="//cdnjs.cloudflare.com/ajax/libs/smoothie/1.27.0/smoothie.js"></script>
<script src="//cdnjs.cloudflare.com/ajax/libs/socket.io/1.3.6/socket.io.min.js"></script>
<script>
// socket
var socket = io.connect('http://' + document.domain + ':' + location.port);
var char = $('chart').get(0);
socket.on('connect', function() {
if (chart.getContext) {
render();
window.onresize = render;
}
send();
});
socket.on('pong_from_server', function() {
var latency = new Date - last;
$('#latency').text(latency + 'ms');
if (time)
time.append(+new Date, latency);
setTimeout(send, 100);
});
socket.on('disconnect', function() {
if (smoothie)
smoothie.stop();
$('#transport').text('(disconnected)');
});
var last;
function send() {
last = new Date;
socket.emit('ping_from_client');
$('#transport').text(socket.io.engine.transport.name);
}
// chart
var smoothie;
var time;
function render() {
if (smoothie)
smoothie.stop();
chart.width = document.body.clientWidth;
smoothie = new SmoothieChart();
smoothie.streamTo(chart, 1000);
time = new TimeSeries();
smoothie.addTimeSeries(time, {
strokeStyle: 'rgb(255, 0, 0)',
fillStyle: 'rgba(255, 0, 0, 0.4)',
lineWidth: 2
});
}
</script>
</body>
</html>

25
examples/aiohttp/latency.py

@ -0,0 +1,25 @@
from aiohttp import web
import socketio
sio = socketio.AsyncServer(async_mode='aiohttp')
app = web.Application()
sio.attach(app)
async def index(request):
with open('latency.html') as f:
return web.Response(text=f.read(), content_type='text/html')
@sio.on('ping_from_client')
async def ping(sid):
await sio.emit('pong_from_server', room=sid)
app.router.add_static('/static', 'static')
app.router.add_get('/', index)
if __name__ == '__main__':
web.run_app(app)

0
examples/static/style.css → examples/aiohttp/static/style.css

0
examples/app.py → examples/wsgi/app.py

0
examples/latency.py → examples/wsgi/latency.py

0
examples/requirements.txt → examples/wsgi/requirements.txt

4
examples/wsgi/static/style.css

@ -0,0 +1,4 @@
body { margin: 0; padding: 0; font-family: Helvetica Neue; }
h1 { margin: 100px 100px 10px; }
h2 { color: #999; margin: 0 100px 30px; font-weight: normal; }
#latency { color: red; }

91
examples/wsgi/templates/index.html

@ -0,0 +1,91 @@
<!DOCTYPE HTML>
<html>
<head>
<title>Flask-SocketIO Test</title>
<script type="text/javascript" src="//code.jquery.com/jquery-2.1.4.min.js"></script>
<script type="text/javascript" src="//cdnjs.cloudflare.com/ajax/libs/socket.io/1.3.5/socket.io.min.js"></script>
<script type="text/javascript" charset="utf-8">
$(document).ready(function(){
namespace = '/test';
var socket = io.connect('http://' + document.domain + ':' + location.port + namespace);
socket.on('connect', function() {
socket.emit('my event', {data: 'I\'m connected!'});
});
socket.on('disconnect', function() {
$('#log').append('<br>Disconnected');
});
socket.on('my response', function(msg) {
$('#log').append('<br>Received: ' + msg.data);
});
// event handler for server sent data
// the data is displayed in the "Received" section of the page
// handlers for the different forms in the page
// these send data to the server in a variety of ways
$('form#emit').submit(function(event) {
socket.emit('my event', {data: $('#emit_data').val()});
return false;
});
$('form#broadcast').submit(function(event) {
socket.emit('my broadcast event', {data: $('#broadcast_data').val()});
return false;
});
$('form#join').submit(function(event) {
socket.emit('join', {room: $('#join_room').val()});
return false;
});
$('form#leave').submit(function(event) {
socket.emit('leave', {room: $('#leave_room').val()});
return false;
});
$('form#send_room').submit(function(event) {
socket.emit('my room event', {room: $('#room_name').val(), data: $('#room_data').val()});
return false;
});
$('form#close').submit(function(event) {
socket.emit('close room', {room: $('#close_room').val()});
return false;
});
$('form#disconnect').submit(function(event) {
socket.emit('disconnect request');
return false;
});
});
</script>
</head>
<body>
<h1>Flask-SocketIO Test</h1>
<h2>Send:</h2>
<form id="emit" method="POST" action='#'>
<input type="text" name="emit_data" id="emit_data" placeholder="Message">
<input type="submit" value="Echo">
</form>
<form id="broadcast" method="POST" action='#'>
<input type="text" name="broadcast_data" id="broadcast_data" placeholder="Message">
<input type="submit" value="Broadcast">
</form>
<form id="join" method="POST" action='#'>
<input type="text" name="join_room" id="join_room" placeholder="Room Name">
<input type="submit" value="Join Room">
</form>
<form id="leave" method="POST" action='#'>
<input type="text" name="leave_room" id="leave_room" placeholder="Room Name">
<input type="submit" value="Leave Room">
</form>
<form id="send_room" method="POST" action='#'>
<input type="text" name="room_name" id="room_name" placeholder="Room Name">
<input type="text" name="room_data" id="room_data" placeholder="Message">
<input type="submit" value="Send to Room">
</form>
<form id="close" method="POST" action="#">
<input type="text" name="close_room" id="close_room" placeholder="Room Name">
<input type="submit" value="Close Room">
</form>
<form id="disconnect" method="POST" action="#">
<input type="submit" value="Disconnect">
</form>
<h2>Receive:</h2>
<div><p id="log"></p></div>
</body>
</html>

0
examples/templates/latency.html → examples/wsgi/templates/latency.html

13
socketio/__init__.py

@ -1,3 +1,5 @@
import sys
from .middleware import Middleware
from .base_manager import BaseManager
from .pubsub_manager import PubSubManager
@ -6,8 +8,15 @@ from .redis_manager import RedisManager
from .zmq_manager import ZmqManager
from .server import Server
from .namespace import Namespace
if sys.version_info >= (3, 5): # pragma: no cover
from .asyncio_server import AsyncServer
else: # pragma: no cover
AsyncServer = None
__version__ = '1.6.3'
__all__ = [__version__, Middleware, Server, BaseManager, PubSubManager,
KombuManager, RedisManager, ZmqManager, Namespace]
__all__ = ['__version__', 'Middleware', 'Server', 'BaseManager',
'PubSubManager', 'KombuManager', 'RedisManager', 'ZmqManager',
'Namespace']
if AsyncServer is not None: # pragma: no cover
__all__.append('AsyncServer')

42
socketio/asyncio_manager.py

@ -0,0 +1,42 @@
import asyncio
from .base_manager import BaseManager
class AsyncioManager(BaseManager):
"""Manage a client list for an asyncio server."""
async def emit(self, event, data, namespace, room=None, skip_sid=None,
callback=None, **kwargs):
"""Emit a message to a single client, a room, or all the clients
connected to the namespace."""
if namespace not in self.rooms or room not in self.rooms[namespace]:
return
tasks = []
for sid in self.get_participants(namespace, room):
if sid != skip_sid:
if callback is not None:
id = self._generate_ack_id(sid, namespace, callback)
else:
id = None
tasks.append(self.server._emit_internal(sid, event, data,
namespace, id))
await asyncio.wait(tasks)
async def trigger_callback(self, sid, namespace, id, data):
"""Invoke an application callback."""
callback = None
try:
callback = self.callbacks[sid][namespace][id]
except KeyError:
# if we get an unknown callback we just ignore it
self.server.logger.warning('Unknown callback received, ignoring.')
else:
del self.callbacks[sid][namespace][id]
if callback is not None:
if asyncio.iscoroutinefunction(callback):
try:
await callback(*data)
except asyncio.CancelledError: # pragma: no cover
pass
else:
callback(*data)

342
socketio/asyncio_server.py

@ -0,0 +1,342 @@
import asyncio
import engineio
from . import asyncio_manager
from . import packet
from . import server
class AsyncServer(server.Server):
"""A Socket.IO server for asyncio.
This class implements a fully compliant Socket.IO web server with support
for websocket and long-polling transports, compatible with the asyncio
framework on Python 3.5 or newer.
:param client_manager: The client manager instance that will manage the
client list. When this is omitted, the client list
is stored in an in-memory structure, so the use of
multiple connected servers is not possible.
:param logger: To enable logging set to ``True`` or pass a logger object to
use. To disable logging set to ``False``.
:param json: An alternative json module to use for encoding and decoding
packets. Custom json modules must have ``dumps`` and ``loads``
functions that are compatible with the standard library
versions.
:param kwargs: Connection parameters for the underlying Engine.IO server.
The Engine.IO configuration supports the following settings:
:param async_mode: The asynchronous model to use. See the Deployment
section in the documentation for a description of the
available options. Valid async modes are "threading",
"eventlet", "gevent" and "gevent_uwsgi". If this
argument is not given, "eventlet" is tried first, then
"gevent_uwsgi", then "gevent", and finally "threading".
The first async mode that has all its dependencies
installed is then one that is chosen.
:param ping_timeout: The time in seconds that the client waits for the
server to respond before disconnecting.
:param ping_interval: The interval in seconds at which the client pings
the server.
:param max_http_buffer_size: The maximum size of a message when using the
polling transport.
:param allow_upgrades: Whether to allow transport upgrades or not.
:param http_compression: Whether to compress packages when using the
polling transport.
:param compression_threshold: Only compress messages when their byte size
is greater than this value.
:param cookie: Name of the HTTP cookie that contains the client session
id. If set to ``None``, a cookie is not sent to the client.
:param cors_allowed_origins: List of origins that are allowed to connect
to this server. All origins are allowed by
default.
:param cors_credentials: Whether credentials (cookies, authentication) are
allowed in requests to this server.
:param engineio_logger: To enable Engine.IO logging set to ``True`` or pass
a logger object to use. To disable logging set to
``False``.
"""
def __init__(self, client_manager=None, logger=False, binary=False,
json=None, async_handlers=False, **kwargs):
if client_manager is None:
client_manager = asyncio_manager.AsyncioManager()
super().__init__(client_manager=client_manager, logger=logger,
binary=False, json=json, **kwargs)
def is_asyncio_based(self):
return True
def attach(self, app, socketio_path='socket.io'):
"""Attach the Socket.IO server to an application."""
self.eio.attach(app, socketio_path)
async def emit(self, event, data=None, room=None, skip_sid=None,
namespace=None, callback=None, **kwargs):
"""Emit a custom event to one or more connected clients.
:param event: The event name. It can be any string. The event names
``'connect'``, ``'message'`` and ``'disconnect'`` are
reserved and should not be used.
:param data: The data to send to the client or clients. Data can be of
type ``str``, ``bytes``, ``list`` or ``dict``. If a
``list`` or ``dict``, the data will be serialized as JSON.
:param room: The recipient of the message. This can be set to the
session ID of a client to address that client's room, or
to any custom room created by the application, If this
argument is omitted the event is broadcasted to all
connected clients.
:param skip_sid: The session ID of a client to skip when broadcasting
to a room or to all clients. This can be used to
prevent a message from being sent to the sender.
:param namespace: The Socket.IO namespace for the event. If this
argument is omitted the event is emitted to the
default namespace.
:param callback: If given, this function will be called to acknowledge
the the client has received the message. The arguments
that will be passed to the function are those provided
by the client. Callback functions can only be used
when addressing an individual client.
:param ignore_queue: Only used when a message queue is configured. If
set to ``True``, the event is emitted to the
clients directly, without going through the queue.
This is more efficient, but only works when a
single server process is used. It is recommended
to always leave this parameter with its default
value of ``False``.
Note: this method is asynchronous.
"""
namespace = namespace or '/'
self.logger.info('emitting event "%s" to %s [%s]', event,
room or 'all', namespace)
await self.manager.emit(event, data, namespace, room, skip_sid,
callback, **kwargs)
async def send(self, data, room=None, skip_sid=None, namespace=None,
callback=None, **kwargs):
"""Send a message to one or more connected clients.
This function emits an event with the name ``'message'``. Use
:func:`emit` to issue custom event names.
:param data: The data to send to the client or clients. Data can be of
type ``str``, ``bytes``, ``list`` or ``dict``. If a
``list`` or ``dict``, the data will be serialized as JSON.
:param room: The recipient of the message. This can be set to the
session ID of a client to address that client's room, or
to any custom room created by the application, If this
argument is omitted the event is broadcasted to all
connected clients.
:param skip_sid: The session ID of a client to skip when broadcasting
to a room or to all clients. This can be used to
prevent a message from being sent to the sender.
:param namespace: The Socket.IO namespace for the event. If this
argument is omitted the event is emitted to the
default namespace.
:param callback: If given, this function will be called to acknowledge
the the client has received the message. The arguments
that will be passed to the function are those provided
by the client. Callback functions can only be used
when addressing an individual client.
:param ignore_queue: Only used when a message queue is configured. If
set to ``True``, the event is emitted to the
clients directly, without going through the queue.
This is more efficient, but only works when a
single server process is used. It is recommended
to always leave this parameter with its default
value of ``False``.
Note: this method is asynchronous.
"""
await self.emit('message', data, room, skip_sid, namespace, callback,
**kwargs)
async def disconnect(self, sid, namespace=None):
"""Disconnect a client.
:param sid: Session ID of the client.
:param namespace: The Socket.IO namespace to disconnect. If this
argument is omitted the default namespace is used.
Note: this method is asynchronous.
"""
namespace = namespace or '/'
if self.manager.is_connected(sid, namespace=namespace):
self.logger.info('Disconnecting %s [%s]', sid, namespace)
self.manager.pre_disconnect(sid, namespace=namespace)
await self._send_packet(sid, packet.Packet(packet.DISCONNECT,
namespace=namespace))
await self._trigger_event('disconnect', namespace, sid)
self.manager.disconnect(sid, namespace=namespace)
async def handle_request(self, environ):
"""Handle an HTTP request from the client.
This is the entry point of the Socket.IO application, using the same
interface as a WSGI application. For the typical usage, this function
is invoked by the :class:`Middleware` instance, but it can be invoked
directly when the middleware is not used.
:param environ: The WSGI environment.
:param start_response: The WSGI ``start_response`` function.
This function returns the HTTP response body to deliver to the client
as a byte sequence.
Note: this method is asynchronous.
"""
if not self.manager_initialized:
self.manager_initialized = True
self.manager.initialize()
return await self.eio.handle_request(environ)
def start_background_task(self, target, *args, **kwargs):
raise RuntimeError('Not implemented, use asyncio.')
def sleep(self, seconds=0):
raise RuntimeError('Not implemented, use asyncio.')
async def _emit_internal(self, sid, event, data, namespace=None, id=None):
"""Send a message to a client."""
# tuples are expanded to multiple arguments, everything else is sent
# as a single argument
if isinstance(data, tuple):
data = list(data)
else:
data = [data]
await self._send_packet(sid, packet.Packet(
packet.EVENT, namespace=namespace, data=[event] + data, id=id,
binary=None))
async def _send_packet(self, sid, pkt):
"""Send a Socket.IO packet to a client."""
encoded_packet = pkt.encode()
if isinstance(encoded_packet, list):
binary = False
for ep in encoded_packet:
await self.eio.send(sid, ep, binary=binary)
binary = True
else:
await self.eio.send(sid, encoded_packet, binary=False)
async def _handle_connect(self, sid, namespace):
"""Handle a client connection request."""
namespace = namespace or '/'
self.manager.connect(sid, namespace)
if await self._trigger_event('connect', namespace, sid,
self.environ[sid]) is False:
self.manager.disconnect(sid, namespace)
await self._send_packet(sid, packet.Packet(packet.ERROR,
namespace=namespace))
return False
else:
await self._send_packet(sid, packet.Packet(packet.CONNECT,
namespace=namespace))
async def _handle_disconnect(self, sid, namespace):
"""Handle a client disconnect."""
namespace = namespace or '/'
if namespace == '/':
namespace_list = list(self.manager.get_namespaces())
else:
namespace_list = [namespace]
for n in namespace_list:
if n != '/' and self.manager.is_connected(sid, n):
await self._trigger_event('disconnect', n, sid)
self.manager.disconnect(sid, n)
if namespace == '/' and self.manager.is_connected(sid, namespace):
await self._trigger_event('disconnect', '/', sid)
self.manager.disconnect(sid, '/')
if sid in self.environ:
del self.environ[sid]
async def _handle_event(self, sid, namespace, id, data):
"""Handle an incoming client event."""
namespace = namespace or '/'
self.logger.info('received event "%s" from %s [%s]', data[0], sid,
namespace)
await self._handle_event_internal(self, sid, data, namespace, id)
async def _handle_event_internal(self, server, sid, data, namespace, id):
r = await server._trigger_event(data[0], namespace, sid, *data[1:])
if id is not None:
# send ACK packet with the response returned by the handler
# tuples are expanded as multiple arguments
if r is None:
data = []
elif isinstance(r, tuple):
data = list(r)
else:
data = [r]
await server._send_packet(sid, packet.Packet(packet.ACK,
namespace=namespace,
id=id, data=data,
binary=None))
async def _handle_ack(self, sid, namespace, id, data):
"""Handle ACK packets from the client."""
namespace = namespace or '/'
self.logger.info('received ack from %s [%s]', sid, namespace)
await self.manager.trigger_callback(sid, namespace, id, data)
async def _trigger_event(self, event, namespace, *args):
"""Invoke an application event handler."""
# first see if we have an explicit handler for the event
if namespace in self.handlers and event in self.handlers[namespace]:
if asyncio.iscoroutinefunction(self.handlers[namespace][event]):
try:
ret = await self.handlers[namespace][event](*args)
except asyncio.CancelledError: # pragma: no cover
pass
else:
ret = self.handlers[namespace][event](*args)
return ret
# or else, forward the event to a namepsace handler if one exists
elif namespace in self.namespace_handlers:
return await self.namespace_handlers[namespace].trigger_event(
event, *args)
async def _handle_eio_connect(self, sid, environ):
"""Handle the Engine.IO connection event."""
self.environ[sid] = environ
return await self._handle_connect(sid, '/')
async def _handle_eio_message(self, sid, data):
"""Dispatch Engine.IO messages."""
if sid in self._binary_packet:
pkt = self._binary_packet[sid]
if pkt.add_attachment(data):
del self._binary_packet[sid]
if pkt.packet_type == packet.BINARY_EVENT:
await self._handle_event(sid, pkt.namespace, pkt.id,
pkt.data)
else:
await self._handle_ack(sid, pkt.namespace, pkt.id,
pkt.data)
else:
pkt = packet.Packet(encoded_packet=data)
if pkt.packet_type == packet.CONNECT:
await self._handle_connect(sid, pkt.namespace)
elif pkt.packet_type == packet.DISCONNECT:
await self._handle_disconnect(sid, pkt.namespace)
elif pkt.packet_type == packet.EVENT:
await self._handle_event(sid, pkt.namespace, pkt.id, pkt.data)
elif pkt.packet_type == packet.ACK:
await self._handle_ack(sid, pkt.namespace, pkt.id, pkt.data)
elif pkt.packet_type == packet.BINARY_EVENT or \
pkt.packet_type == packet.BINARY_ACK:
self._binary_packet[sid] = pkt
elif pkt.packet_type == packet.ERROR:
raise ValueError('Unexpected ERROR packet.')
else:
raise ValueError('Unknown packet type.')
async def _handle_eio_disconnect(self, sid):
"""Handle Engine.IO disconnect event."""
await self._handle_disconnect(sid, '/')
def _engineio_server_class(self):
return engineio.AsyncServer

8
socketio/base_manager.py

@ -18,9 +18,15 @@ class BaseManager(object):
self.callbacks = {}
self.pending_disconnect = {}
def initialize(self, server):
def set_server(self, server):
self.server = server
def initialize(self):
"""Invoked before the first request is received. Subclasses can add
their initialization code here.
"""
pass
def get_namespaces(self):
"""Return an iterable with the active namespace names."""
return six.iterkeys(self.rooms)

4
socketio/pubsub_manager.py

@ -30,8 +30,8 @@ class PubSubManager(BaseManager):
self.write_only = write_only
self.host_id = uuid.uuid4().hex
def initialize(self, server):
super(PubSubManager, self).initialize(server)
def initialize(self):
super(PubSubManager, self).initialize()
if not self.write_only:
self.thread = self.server.start_background_task(self._thread)
self.server.logger.info(self.name + ' backend initialized.')

8
socketio/server.py

@ -79,7 +79,7 @@ class Server(object):
packet.Packet.json = json
engineio_options['json'] = json
engineio_options['async_handlers'] = False
self.eio = engineio.Server(**engineio_options)
self.eio = self._engineio_server_class()(**engineio_options)
self.eio.on('connect', self._handle_eio_connect)
self.eio.on('message', self._handle_eio_message)
self.eio.on('disconnect', self._handle_eio_disconnect)
@ -106,6 +106,7 @@ class Server(object):
if client_manager is None:
client_manager = base_manager.BaseManager()
self.manager = client_manager
self.manager.set_server(self)
self.manager_initialized = False
self.async_handlers = async_handlers
@ -346,7 +347,7 @@ class Server(object):
"""
if not self.manager_initialized:
self.manager_initialized = True
self.manager.initialize(self)
self.manager.initialize()
return self.eio.handle_request(environ, start_response)
def start_background_task(self, target, *args, **kwargs):
@ -518,3 +519,6 @@ class Server(object):
def _handle_eio_disconnect(self, sid):
"""Handle Engine.IO disconnect event."""
self._handle_disconnect(sid, '/')
def _engineio_server_class(self):
return engineio.Server

3
tests/test_base_manager.py

@ -13,7 +13,8 @@ class TestBaseManager(unittest.TestCase):
def setUp(self):
mock_server = mock.MagicMock()
self.bm = base_manager.BaseManager()
self.bm.initialize(mock_server)
self.bm.set_server(mock_server)
self.bm.initialize()
def test_connect(self):
self.bm.connect('123', '/foo')

6
tests/test_pubsub_manager.py

@ -16,7 +16,8 @@ class TestBaseManager(unittest.TestCase):
mock_server = mock.MagicMock()
self.pm = pubsub_manager.PubSubManager()
self.pm._publish = mock.MagicMock()
self.pm.initialize(mock_server)
self.pm.set_server(mock_server)
self.pm.initialize()
def test_default_init(self):
self.assertEqual(self.pm.channel, 'socketio')
@ -32,7 +33,8 @@ class TestBaseManager(unittest.TestCase):
def test_write_only_init(self):
mock_server = mock.MagicMock()
pm = pubsub_manager.PubSubManager(write_only=True)
pm.initialize(mock_server)
pm.set_server(mock_server)
pm.initialize()
self.assertEqual(pm.channel, 'socketio')
self.assertEqual(len(pm.host_id), 32)
self.assertEqual(pm.server.start_background_task.call_count, 0)

Loading…
Cancel
Save