committed by
GitHub
23 changed files with 1810 additions and 102 deletions
@ -0,0 +1,405 @@ |
|||
from datetime import datetime |
|||
import functools |
|||
import os |
|||
import socket |
|||
import time |
|||
from urllib.parse import parse_qs |
|||
from .exceptions import ConnectionRefusedError |
|||
|
|||
HOSTNAME = socket.gethostname() |
|||
PID = os.getpid() |
|||
|
|||
|
|||
class EventBuffer: |
|||
def __init__(self): |
|||
self.buffer = {} |
|||
|
|||
def push(self, type, count=1): |
|||
timestamp = int(time.time()) * 1000 |
|||
key = '{};{}'.format(timestamp, type) |
|||
if key not in self.buffer: |
|||
self.buffer[key] = { |
|||
'timestamp': timestamp, |
|||
'type': type, |
|||
'count': count, |
|||
} |
|||
else: |
|||
self.buffer[key]['count'] += count |
|||
|
|||
def get_and_clear(self): |
|||
buffer = self.buffer |
|||
self.buffer = {} |
|||
return [value for value in buffer.values()] |
|||
|
|||
|
|||
class InstrumentedServer: |
|||
def __init__(self, sio, auth=None, mode='development', read_only=False, |
|||
server_id=None, namespace='/admin', server_stats_interval=2): |
|||
"""Instrument the Socket.IO server for monitoring with the `Socket.IO |
|||
Admin UI <https://socket.io/docs/v4/admin-ui/>`_. |
|||
""" |
|||
if auth is None: |
|||
raise ValueError('auth must be specified') |
|||
self.sio = sio |
|||
self.auth = auth |
|||
self.admin_namespace = namespace |
|||
self.read_only = read_only |
|||
self.server_id = server_id or ( |
|||
self.sio.manager.host_id if hasattr(self.sio.manager, 'host_id') |
|||
else HOSTNAME |
|||
) |
|||
self.mode = mode |
|||
self.server_stats_interval = server_stats_interval |
|||
self.event_buffer = EventBuffer() |
|||
|
|||
# task that emits "server_stats" every 2 seconds |
|||
self.stop_stats_event = None |
|||
self.stats_task = None |
|||
|
|||
# monkey-patch the server to report metrics to the admin UI |
|||
self.instrument() |
|||
|
|||
def instrument(self): |
|||
self.sio.on('connect', self.admin_connect, |
|||
namespace=self.admin_namespace) |
|||
|
|||
if self.mode == 'development': |
|||
if not self.read_only: # pragma: no branch |
|||
self.sio.on('emit', self.admin_emit, |
|||
namespace=self.admin_namespace) |
|||
self.sio.on('join', self.admin_enter_room, |
|||
namespace=self.admin_namespace) |
|||
self.sio.on('leave', self.admin_leave_room, |
|||
namespace=self.admin_namespace) |
|||
self.sio.on('_disconnect', self.admin_disconnect, |
|||
namespace=self.admin_namespace) |
|||
|
|||
# track socket connection times |
|||
self.sio.manager._timestamps = {} |
|||
|
|||
# report socket.io connections |
|||
self.sio.manager.__connect = self.sio.manager.connect |
|||
self.sio.manager.connect = self._connect |
|||
|
|||
# report socket.io disconnection |
|||
self.sio.manager.__disconnect = self.sio.manager.disconnect |
|||
self.sio.manager.disconnect = self._disconnect |
|||
|
|||
# report join rooms |
|||
self.sio.manager.__basic_enter_room = \ |
|||
self.sio.manager.basic_enter_room |
|||
self.sio.manager.basic_enter_room = self._basic_enter_room |
|||
|
|||
# report leave rooms |
|||
self.sio.manager.__basic_leave_room = \ |
|||
self.sio.manager.basic_leave_room |
|||
self.sio.manager.basic_leave_room = self._basic_leave_room |
|||
|
|||
# report emit events |
|||
self.sio.manager.__emit = self.sio.manager.emit |
|||
self.sio.manager.emit = self._emit |
|||
|
|||
# report receive events |
|||
self.sio.__handle_event_internal = self.sio._handle_event_internal |
|||
self.sio._handle_event_internal = self._handle_event_internal |
|||
|
|||
# report engine.io connections |
|||
self.sio.eio.on('connect', self._handle_eio_connect) |
|||
self.sio.eio.on('disconnect', self._handle_eio_disconnect) |
|||
|
|||
# report polling packets |
|||
from engineio.socket import Socket |
|||
self.sio.eio.__ok = self.sio.eio._ok |
|||
self.sio.eio._ok = self._eio_http_response |
|||
Socket.__handle_post_request = Socket.handle_post_request |
|||
Socket.handle_post_request = functools.partialmethod( |
|||
self.__class__._eio_handle_post_request, self) |
|||
|
|||
# report websocket packets |
|||
Socket.__websocket_handler = Socket._websocket_handler |
|||
Socket._websocket_handler = functools.partialmethod( |
|||
self.__class__._eio_websocket_handler, self) |
|||
|
|||
# report connected sockets with each ping |
|||
if self.mode == 'development': |
|||
Socket.__send_ping = Socket._send_ping |
|||
Socket._send_ping = functools.partialmethod( |
|||
self.__class__._eio_send_ping, self) |
|||
|
|||
def uninstrument(self): # pragma: no cover |
|||
if self.mode == 'development': |
|||
self.sio.manager.connect = self.sio.manager.__connect |
|||
self.sio.manager.disconnect = self.sio.manager.__disconnect |
|||
self.sio.manager.basic_enter_room = \ |
|||
self.sio.manager.__basic_enter_room |
|||
self.sio.manager.basic_leave_room = \ |
|||
self.sio.manager.__basic_leave_room |
|||
self.sio.manager.emit = self.sio.manager.__emit |
|||
self.sio._handle_event_internal = self.sio.__handle_event_internal |
|||
self.sio.eio._ok = self.sio.eio.__ok |
|||
|
|||
from engineio.socket import Socket |
|||
Socket.handle_post_request = Socket.__handle_post_request |
|||
Socket._websocket_handler = Socket.__websocket_handler |
|||
if self.mode == 'development': |
|||
Socket._send_ping = Socket.__send_ping |
|||
|
|||
def admin_connect(self, sid, environ, client_auth): |
|||
if self.auth: |
|||
authenticated = False |
|||
if isinstance(self.auth, dict): |
|||
authenticated = client_auth == self.auth |
|||
elif isinstance(self.auth, list): |
|||
authenticated = client_auth in self.auth |
|||
else: |
|||
authenticated = self.auth(client_auth) |
|||
if not authenticated: |
|||
raise ConnectionRefusedError('authentication failed') |
|||
|
|||
def config(sid): |
|||
self.sio.sleep(0.1) |
|||
|
|||
# supported features |
|||
features = ['AGGREGATED_EVENTS'] |
|||
if not self.read_only: |
|||
features += ['EMIT', 'JOIN', 'LEAVE', 'DISCONNECT', 'MJOIN', |
|||
'MLEAVE', 'MDISCONNECT'] |
|||
if self.mode == 'development': |
|||
features.append('ALL_EVENTS') |
|||
self.sio.emit('config', {'supportedFeatures': features}, |
|||
to=sid, namespace=self.admin_namespace) |
|||
|
|||
# send current sockets |
|||
if self.mode == 'development': |
|||
all_sockets = [] |
|||
for nsp in self.sio.manager.get_namespaces(): |
|||
for sid, eio_sid in self.sio.manager.get_participants( |
|||
nsp, None): |
|||
all_sockets.append( |
|||
self.serialize_socket(sid, nsp, eio_sid)) |
|||
self.sio.emit('all_sockets', all_sockets, to=sid, |
|||
namespace=self.admin_namespace) |
|||
|
|||
self.sio.start_background_task(config, sid) |
|||
|
|||
def admin_emit(self, _, namespace, room_filter, event, *data): |
|||
self.sio.emit(event, data, to=room_filter, namespace=namespace) |
|||
|
|||
def admin_enter_room(self, _, namespace, room, room_filter=None): |
|||
for sid, _ in self.sio.manager.get_participants( |
|||
namespace, room_filter): |
|||
self.sio.enter_room(sid, room, namespace=namespace) |
|||
|
|||
def admin_leave_room(self, _, namespace, room, room_filter=None): |
|||
for sid, _ in self.sio.manager.get_participants( |
|||
namespace, room_filter): |
|||
self.sio.leave_room(sid, room, namespace=namespace) |
|||
|
|||
def admin_disconnect(self, _, namespace, close, room_filter=None): |
|||
for sid, _ in self.sio.manager.get_participants( |
|||
namespace, room_filter): |
|||
self.sio.disconnect(sid, namespace=namespace) |
|||
|
|||
def shutdown(self): |
|||
if self.stats_task: # pragma: no branch |
|||
self.stop_stats_event.set() |
|||
self.stats_task.join() |
|||
|
|||
def _connect(self, eio_sid, namespace): |
|||
sid = self.sio.manager.__connect(eio_sid, namespace) |
|||
t = time.time() |
|||
self.sio.manager._timestamps[sid] = t |
|||
serialized_socket = self.serialize_socket(sid, namespace, eio_sid) |
|||
self.sio.emit('socket_connected', ( |
|||
serialized_socket, |
|||
datetime.utcfromtimestamp(t).isoformat() + 'Z', |
|||
), namespace=self.admin_namespace) |
|||
return sid |
|||
|
|||
def _disconnect(self, sid, namespace, **kwargs): |
|||
del self.sio.manager._timestamps[sid] |
|||
self.sio.emit('socket_disconnected', ( |
|||
namespace, |
|||
sid, |
|||
'N/A', |
|||
datetime.utcnow().isoformat() + 'Z', |
|||
), namespace=self.admin_namespace) |
|||
return self.sio.manager.__disconnect(sid, namespace, **kwargs) |
|||
|
|||
def _check_for_upgrade(self, eio_sid, sid, namespace): # pragma: no cover |
|||
for _ in range(5): |
|||
self.sio.sleep(5) |
|||
try: |
|||
if self.sio.eio._get_socket(eio_sid).upgraded: |
|||
self.sio.emit('socket_updated', { |
|||
'id': sid, |
|||
'nsp': namespace, |
|||
'transport': 'websocket', |
|||
}, namespace=self.admin_namespace) |
|||
break |
|||
except KeyError: |
|||
pass |
|||
|
|||
def _basic_enter_room(self, sid, namespace, room, eio_sid=None): |
|||
ret = self.sio.manager.__basic_enter_room(sid, namespace, room, |
|||
eio_sid) |
|||
if room: |
|||
self.sio.emit('room_joined', ( |
|||
namespace, |
|||
room, |
|||
sid, |
|||
datetime.utcnow().isoformat() + 'Z', |
|||
), namespace=self.admin_namespace) |
|||
return ret |
|||
|
|||
def _basic_leave_room(self, sid, namespace, room): |
|||
if room: |
|||
self.sio.emit('room_left', ( |
|||
namespace, |
|||
room, |
|||
sid, |
|||
datetime.utcnow().isoformat() + 'Z', |
|||
), namespace=self.admin_namespace) |
|||
return self.sio.manager.__basic_leave_room(sid, namespace, room) |
|||
|
|||
def _emit(self, event, data, namespace, room=None, skip_sid=None, |
|||
callback=None, **kwargs): |
|||
ret = self.sio.manager.__emit(event, data, namespace, room=room, |
|||
skip_sid=skip_sid, callback=callback, |
|||
**kwargs) |
|||
if namespace != self.admin_namespace: |
|||
event_data = [event] + list(data) if isinstance(data, tuple) \ |
|||
else [data] |
|||
if not isinstance(skip_sid, list): # pragma: no branch |
|||
skip_sid = [skip_sid] |
|||
for sid, _ in self.sio.manager.get_participants(namespace, room): |
|||
if sid not in skip_sid: |
|||
self.sio.emit('event_sent', ( |
|||
namespace, |
|||
sid, |
|||
event_data, |
|||
datetime.utcnow().isoformat() + 'Z', |
|||
), namespace=self.admin_namespace) |
|||
return ret |
|||
|
|||
def _handle_event_internal(self, server, sid, eio_sid, data, namespace, |
|||
id): |
|||
ret = self.sio.__handle_event_internal(server, sid, eio_sid, data, |
|||
namespace, id) |
|||
self.sio.emit('event_received', ( |
|||
namespace, |
|||
sid, |
|||
data, |
|||
datetime.utcnow().isoformat() + 'Z', |
|||
), namespace=self.admin_namespace) |
|||
return ret |
|||
|
|||
def _handle_eio_connect(self, eio_sid, environ): |
|||
if self.stop_stats_event is None: |
|||
self.stop_stats_event = self.sio.eio.create_event() |
|||
self.stats_task = self.sio.start_background_task( |
|||
self._emit_server_stats) |
|||
|
|||
self.event_buffer.push('rawConnection') |
|||
return self.sio._handle_eio_connect(eio_sid, environ) |
|||
|
|||
def _handle_eio_disconnect(self, eio_sid): |
|||
self.event_buffer.push('rawDisconnection') |
|||
return self.sio._handle_eio_disconnect(eio_sid) |
|||
|
|||
def _eio_http_response(self, packets=None, headers=None, jsonp_index=None): |
|||
ret = self.sio.eio.__ok(packets=packets, headers=headers, |
|||
jsonp_index=jsonp_index) |
|||
self.event_buffer.push('packetsOut') |
|||
self.event_buffer.push('bytesOut', len(ret['response'])) |
|||
return ret |
|||
|
|||
def _eio_handle_post_request(socket, self, environ): |
|||
ret = socket.__handle_post_request(environ) |
|||
self.event_buffer.push('packetsIn') |
|||
self.event_buffer.push( |
|||
'bytesIn', int(environ.get('CONTENT_LENGTH', 0))) |
|||
return ret |
|||
|
|||
def _eio_websocket_handler(socket, self, ws): |
|||
def _send(ws, data, *args, **kwargs): |
|||
self.event_buffer.push('packetsOut') |
|||
self.event_buffer.push('bytesOut', len(data)) |
|||
return ws.__send(data, *args, **kwargs) |
|||
|
|||
def _wait(ws): |
|||
ret = ws.__wait() |
|||
self.event_buffer.push('packetsIn') |
|||
self.event_buffer.push('bytesIn', len(ret or '')) |
|||
return ret |
|||
|
|||
ws.__send = ws.send |
|||
ws.send = functools.partial(_send, ws) |
|||
ws.__wait = ws.wait |
|||
ws.wait = functools.partial(_wait, ws) |
|||
return socket.__websocket_handler(ws) |
|||
|
|||
def _eio_send_ping(socket, self): # pragma: no cover |
|||
eio_sid = socket.sid |
|||
t = time.time() |
|||
for namespace in self.sio.manager.get_namespaces(): |
|||
sid = self.sio.manager.sid_from_eio_sid(eio_sid, namespace) |
|||
if sid: |
|||
serialized_socket = self.serialize_socket(sid, namespace, |
|||
eio_sid) |
|||
self.sio.emit('socket_connected', ( |
|||
serialized_socket, |
|||
datetime.utcfromtimestamp(t).isoformat() + 'Z', |
|||
), namespace=self.admin_namespace) |
|||
return socket.__send_ping() |
|||
|
|||
def _emit_server_stats(self): |
|||
start_time = time.time() |
|||
namespaces = list(self.sio.handlers.keys()) |
|||
namespaces.sort() |
|||
while not self.stop_stats_event.is_set(): |
|||
self.sio.sleep(self.server_stats_interval) |
|||
self.sio.emit('server_stats', { |
|||
'serverId': self.server_id, |
|||
'hostname': HOSTNAME, |
|||
'pid': PID, |
|||
'uptime': time.time() - start_time, |
|||
'clientsCount': len(self.sio.eio.sockets), |
|||
'pollingClientsCount': len( |
|||
[s for s in self.sio.eio.sockets.values() |
|||
if not s.upgraded]), |
|||
'aggregatedEvents': self.event_buffer.get_and_clear(), |
|||
'namespaces': [{ |
|||
'name': nsp, |
|||
'socketsCount': len(self.sio.manager.rooms.get( |
|||
nsp, {None: []}).get(None, [])) |
|||
} for nsp in namespaces], |
|||
}, namespace=self.admin_namespace) |
|||
|
|||
def serialize_socket(self, sid, namespace, eio_sid=None): |
|||
if eio_sid is None: # pragma: no cover |
|||
eio_sid = self.sio.manager.eio_sid_from_sid(sid) |
|||
socket = self.sio.eio._get_socket(eio_sid) |
|||
environ = self.sio.environ.get(eio_sid, {}) |
|||
tm = self.sio.manager._timestamps[sid] if sid in \ |
|||
self.sio.manager._timestamps else 0 |
|||
return { |
|||
'id': sid, |
|||
'clientId': eio_sid, |
|||
'transport': 'websocket' if socket.upgraded else 'polling', |
|||
'nsp': namespace, |
|||
'data': {}, |
|||
'handshake': { |
|||
'address': environ.get('REMOTE_ADDR', ''), |
|||
'headers': {k[5:].lower(): v for k, v in environ.items() |
|||
if k.startswith('HTTP_')}, |
|||
'query': {k: v[0] if len(v) == 1 else v for k, v in parse_qs( |
|||
environ.get('QUERY_STRING', '')).items()}, |
|||
'secure': environ.get('wsgi.url_scheme', '') == 'https', |
|||
'url': environ.get('PATH_INFO', ''), |
|||
'issued': tm * 1000, |
|||
'time': datetime.utcfromtimestamp(tm).isoformat() + 'Z' |
|||
if tm else '', |
|||
}, |
|||
'rooms': self.sio.manager.get_rooms(sid, namespace), |
|||
} |
@ -0,0 +1,398 @@ |
|||
import asyncio |
|||
from datetime import datetime |
|||
import functools |
|||
import os |
|||
import socket |
|||
import time |
|||
from urllib.parse import parse_qs |
|||
from .admin import EventBuffer |
|||
from .exceptions import ConnectionRefusedError |
|||
|
|||
HOSTNAME = socket.gethostname() |
|||
PID = os.getpid() |
|||
|
|||
|
|||
class InstrumentedAsyncServer: |
|||
def __init__(self, sio, auth=None, namespace='/admin', read_only=False, |
|||
server_id=None, mode='development', server_stats_interval=2): |
|||
"""Instrument the Socket.IO server for monitoring with the `Socket.IO |
|||
Admin UI <https://socket.io/docs/v4/admin-ui/>`_. |
|||
""" |
|||
if auth is None: |
|||
raise ValueError('auth must be specified') |
|||
self.sio = sio |
|||
self.auth = auth |
|||
self.admin_namespace = namespace |
|||
self.read_only = read_only |
|||
self.server_id = server_id or ( |
|||
self.sio.manager.host_id if hasattr(self.sio.manager, 'host_id') |
|||
else HOSTNAME |
|||
) |
|||
self.mode = mode |
|||
self.server_stats_interval = server_stats_interval |
|||
self.admin_queue = [] |
|||
self.event_buffer = EventBuffer() |
|||
|
|||
# task that emits "server_stats" every 2 seconds |
|||
self.stop_stats_event = None |
|||
self.stats_task = None |
|||
|
|||
# monkey-patch the server to report metrics to the admin UI |
|||
self.instrument() |
|||
|
|||
def instrument(self): |
|||
self.sio.on('connect', self.admin_connect, |
|||
namespace=self.admin_namespace) |
|||
|
|||
if self.mode == 'development': |
|||
if not self.read_only: # pragma: no branch |
|||
self.sio.on('emit', self.admin_emit, |
|||
namespace=self.admin_namespace) |
|||
self.sio.on('join', self.admin_enter_room, |
|||
namespace=self.admin_namespace) |
|||
self.sio.on('leave', self.admin_leave_room, |
|||
namespace=self.admin_namespace) |
|||
self.sio.on('_disconnect', self.admin_disconnect, |
|||
namespace=self.admin_namespace) |
|||
|
|||
# track socket connection times |
|||
self.sio.manager._timestamps = {} |
|||
|
|||
# report socket.io connections |
|||
self.sio.manager.__connect = self.sio.manager.connect |
|||
self.sio.manager.connect = self._connect |
|||
|
|||
# report socket.io disconnection |
|||
self.sio.manager.__disconnect = self.sio.manager.disconnect |
|||
self.sio.manager.disconnect = self._disconnect |
|||
|
|||
# report join rooms |
|||
self.sio.manager.__basic_enter_room = \ |
|||
self.sio.manager.basic_enter_room |
|||
self.sio.manager.basic_enter_room = self._basic_enter_room |
|||
|
|||
# report leave rooms |
|||
self.sio.manager.__basic_leave_room = \ |
|||
self.sio.manager.basic_leave_room |
|||
self.sio.manager.basic_leave_room = self._basic_leave_room |
|||
|
|||
# report emit events |
|||
self.sio.manager.__emit = self.sio.manager.emit |
|||
self.sio.manager.emit = self._emit |
|||
|
|||
# report receive events |
|||
self.sio.__handle_event_internal = self.sio._handle_event_internal |
|||
self.sio._handle_event_internal = self._handle_event_internal |
|||
|
|||
# report engine.io connections |
|||
self.sio.eio.on('connect', self._handle_eio_connect) |
|||
self.sio.eio.on('disconnect', self._handle_eio_disconnect) |
|||
|
|||
# report polling packets |
|||
from engineio.async_socket import AsyncSocket |
|||
self.sio.eio.__ok = self.sio.eio._ok |
|||
self.sio.eio._ok = self._eio_http_response |
|||
AsyncSocket.__handle_post_request = AsyncSocket.handle_post_request |
|||
AsyncSocket.handle_post_request = functools.partialmethod( |
|||
self.__class__._eio_handle_post_request, self) |
|||
|
|||
# report websocket packets |
|||
AsyncSocket.__websocket_handler = AsyncSocket._websocket_handler |
|||
AsyncSocket._websocket_handler = functools.partialmethod( |
|||
self.__class__._eio_websocket_handler, self) |
|||
|
|||
# report connected sockets with each ping |
|||
if self.mode == 'development': |
|||
AsyncSocket.__send_ping = AsyncSocket._send_ping |
|||
AsyncSocket._send_ping = functools.partialmethod( |
|||
self.__class__._eio_send_ping, self) |
|||
|
|||
def uninstrument(self): # pragma: no cover |
|||
if self.mode == 'development': |
|||
self.sio.manager.connect = self.sio.manager.__connect |
|||
self.sio.manager.disconnect = self.sio.manager.__disconnect |
|||
self.sio.manager.basic_enter_room = \ |
|||
self.sio.manager.__basic_enter_room |
|||
self.sio.manager.basic_leave_room = \ |
|||
self.sio.manager.__basic_leave_room |
|||
self.sio.manager.emit = self.sio.manager.__emit |
|||
self.sio._handle_event_internal = self.sio.__handle_event_internal |
|||
self.sio.eio._ok = self.sio.eio.__ok |
|||
|
|||
from engineio.async_socket import AsyncSocket |
|||
AsyncSocket.handle_post_request = AsyncSocket.__handle_post_request |
|||
AsyncSocket._websocket_handler = AsyncSocket.__websocket_handler |
|||
if self.mode == 'development': |
|||
AsyncSocket._send_ping = AsyncSocket.__send_ping |
|||
|
|||
async def admin_connect(self, sid, environ, client_auth): |
|||
authenticated = True |
|||
if self.auth: |
|||
authenticated = False |
|||
if isinstance(self.auth, dict): |
|||
authenticated = client_auth == self.auth |
|||
elif isinstance(self.auth, list): |
|||
authenticated = client_auth in self.auth |
|||
else: |
|||
if asyncio.iscoroutinefunction(self.auth): |
|||
authenticated = await self.auth(client_auth) |
|||
else: |
|||
authenticated = self.auth(client_auth) |
|||
if not authenticated: |
|||
raise ConnectionRefusedError('authentication failed') |
|||
|
|||
async def config(sid): |
|||
await self.sio.sleep(0.1) |
|||
|
|||
# supported features |
|||
features = ['AGGREGATED_EVENTS'] |
|||
if not self.read_only: |
|||
features += ['EMIT', 'JOIN', 'LEAVE', 'DISCONNECT', 'MJOIN', |
|||
'MLEAVE', 'MDISCONNECT'] |
|||
if self.mode == 'development': |
|||
features.append('ALL_EVENTS') |
|||
await self.sio.emit('config', {'supportedFeatures': features}, |
|||
to=sid, namespace=self.admin_namespace) |
|||
|
|||
# send current sockets |
|||
if self.mode == 'development': |
|||
all_sockets = [] |
|||
for nsp in self.sio.manager.get_namespaces(): |
|||
for sid, eio_sid in self.sio.manager.get_participants( |
|||
nsp, None): |
|||
all_sockets.append( |
|||
self.serialize_socket(sid, nsp, eio_sid)) |
|||
await self.sio.emit('all_sockets', all_sockets, to=sid, |
|||
namespace=self.admin_namespace) |
|||
|
|||
self.sio.start_background_task(config, sid) |
|||
self.stop_stats_event = self.sio.eio.create_event() |
|||
self.stats_task = self.sio.start_background_task( |
|||
self._emit_server_stats) |
|||
|
|||
async def admin_emit(self, _, namespace, room_filter, event, *data): |
|||
await self.sio.emit(event, data, to=room_filter, namespace=namespace) |
|||
|
|||
async def admin_enter_room(self, _, namespace, room, room_filter=None): |
|||
for sid, _ in self.sio.manager.get_participants( |
|||
namespace, room_filter): |
|||
await self.sio.enter_room(sid, room, namespace=namespace) |
|||
|
|||
async def admin_leave_room(self, _, namespace, room, room_filter=None): |
|||
for sid, _ in self.sio.manager.get_participants( |
|||
namespace, room_filter): |
|||
await self.sio.leave_room(sid, room, namespace=namespace) |
|||
|
|||
async def admin_disconnect(self, _, namespace, close, room_filter=None): |
|||
for sid, _ in self.sio.manager.get_participants( |
|||
namespace, room_filter): |
|||
await self.sio.disconnect(sid, namespace=namespace) |
|||
|
|||
async def shutdown(self): |
|||
if self.stats_task: # pragma: no branch |
|||
self.stop_stats_event.set() |
|||
await asyncio.gather(self.stats_task) |
|||
|
|||
async def _connect(self, eio_sid, namespace): |
|||
sid = await self.sio.manager.__connect(eio_sid, namespace) |
|||
t = time.time() |
|||
self.sio.manager._timestamps[sid] = t |
|||
serialized_socket = self.serialize_socket(sid, namespace, eio_sid) |
|||
await self.sio.emit('socket_connected', ( |
|||
serialized_socket, |
|||
datetime.utcfromtimestamp(t).isoformat() + 'Z', |
|||
), namespace=self.admin_namespace) |
|||
return sid |
|||
|
|||
async def _disconnect(self, sid, namespace, **kwargs): |
|||
del self.sio.manager._timestamps[sid] |
|||
await self.sio.emit('socket_disconnected', ( |
|||
namespace, |
|||
sid, |
|||
'N/A', |
|||
datetime.utcnow().isoformat() + 'Z', |
|||
), namespace=self.admin_namespace) |
|||
return await self.sio.manager.__disconnect(sid, namespace, **kwargs) |
|||
|
|||
async def _check_for_upgrade(self, eio_sid, sid, |
|||
namespace): # pragma: no cover |
|||
for _ in range(5): |
|||
await self.sio.sleep(5) |
|||
try: |
|||
if self.sio.eio._get_socket(eio_sid).upgraded: |
|||
await self.sio.emit('socket_updated', { |
|||
'id': sid, |
|||
'nsp': namespace, |
|||
'transport': 'websocket', |
|||
}, namespace=self.admin_namespace) |
|||
break |
|||
except KeyError: |
|||
pass |
|||
|
|||
def _basic_enter_room(self, sid, namespace, room, eio_sid=None): |
|||
ret = self.sio.manager.__basic_enter_room(sid, namespace, room, |
|||
eio_sid) |
|||
if room: |
|||
self.admin_queue.append(('room_joined', ( |
|||
namespace, |
|||
room, |
|||
sid, |
|||
datetime.utcnow().isoformat() + 'Z', |
|||
))) |
|||
return ret |
|||
|
|||
def _basic_leave_room(self, sid, namespace, room): |
|||
if room: |
|||
self.admin_queue.append(('room_left', ( |
|||
namespace, |
|||
room, |
|||
sid, |
|||
datetime.utcnow().isoformat() + 'Z', |
|||
))) |
|||
return self.sio.manager.__basic_leave_room(sid, namespace, room) |
|||
|
|||
async def _emit(self, event, data, namespace, room=None, skip_sid=None, |
|||
callback=None, **kwargs): |
|||
ret = await self.sio.manager.__emit( |
|||
event, data, namespace, room=room, skip_sid=skip_sid, |
|||
callback=callback, **kwargs) |
|||
if namespace != self.admin_namespace: |
|||
event_data = [event] + list(data) if isinstance(data, tuple) \ |
|||
else [data] |
|||
if not isinstance(skip_sid, list): # pragma: no branch |
|||
skip_sid = [skip_sid] |
|||
for sid, _ in self.sio.manager.get_participants(namespace, room): |
|||
if sid not in skip_sid: |
|||
await self.sio.emit('event_sent', ( |
|||
namespace, |
|||
sid, |
|||
event_data, |
|||
datetime.utcnow().isoformat() + 'Z', |
|||
), namespace=self.admin_namespace) |
|||
return ret |
|||
|
|||
async def _handle_event_internal(self, server, sid, eio_sid, data, |
|||
namespace, id): |
|||
ret = await self.sio.__handle_event_internal(server, sid, eio_sid, |
|||
data, namespace, id) |
|||
await self.sio.emit('event_received', ( |
|||
namespace, |
|||
sid, |
|||
data, |
|||
datetime.utcnow().isoformat() + 'Z', |
|||
), namespace=self.admin_namespace) |
|||
return ret |
|||
|
|||
async def _handle_eio_connect(self, eio_sid, environ): |
|||
if self.stop_stats_event is None: |
|||
self.stop_stats_event = self.sio.eio.create_event() |
|||
self.stats_task = self.sio.start_background_task( |
|||
self._emit_server_stats) |
|||
|
|||
self.event_buffer.push('rawConnection') |
|||
return await self.sio._handle_eio_connect(eio_sid, environ) |
|||
|
|||
async def _handle_eio_disconnect(self, eio_sid): |
|||
self.event_buffer.push('rawDisconnection') |
|||
return await self.sio._handle_eio_disconnect(eio_sid) |
|||
|
|||
def _eio_http_response(self, packets=None, headers=None, jsonp_index=None): |
|||
ret = self.sio.eio.__ok(packets=packets, headers=headers, |
|||
jsonp_index=jsonp_index) |
|||
self.event_buffer.push('packetsOut') |
|||
self.event_buffer.push('bytesOut', len(ret['response'])) |
|||
return ret |
|||
|
|||
async def _eio_handle_post_request(socket, self, environ): |
|||
ret = await socket.__handle_post_request(environ) |
|||
self.event_buffer.push('packetsIn') |
|||
self.event_buffer.push( |
|||
'bytesIn', int(environ.get('CONTENT_LENGTH', 0))) |
|||
return ret |
|||
|
|||
async def _eio_websocket_handler(socket, self, ws): |
|||
async def _send(ws, data): |
|||
self.event_buffer.push('packetsOut') |
|||
self.event_buffer.push('bytesOut', len(data)) |
|||
return await ws.__send(data) |
|||
|
|||
async def _wait(ws): |
|||
ret = await ws.__wait() |
|||
self.event_buffer.push('packetsIn') |
|||
self.event_buffer.push('bytesIn', len(ret or '')) |
|||
return ret |
|||
|
|||
ws.__send = ws.send |
|||
ws.send = functools.partial(_send, ws) |
|||
ws.__wait = ws.wait |
|||
ws.wait = functools.partial(_wait, ws) |
|||
return await socket.__websocket_handler(ws) |
|||
|
|||
async def _eio_send_ping(socket, self): # pragma: no cover |
|||
eio_sid = socket.sid |
|||
t = time.time() |
|||
for namespace in self.sio.manager.get_namespaces(): |
|||
sid = self.sio.manager.sid_from_eio_sid(eio_sid, namespace) |
|||
if sid: |
|||
serialized_socket = self.serialize_socket(sid, namespace, |
|||
eio_sid) |
|||
await self.sio.emit('socket_connected', ( |
|||
serialized_socket, |
|||
datetime.utcfromtimestamp(t).isoformat() + 'Z', |
|||
), namespace=self.admin_namespace) |
|||
return await socket.__send_ping() |
|||
|
|||
async def _emit_server_stats(self): |
|||
start_time = time.time() |
|||
namespaces = list(self.sio.handlers.keys()) |
|||
namespaces.sort() |
|||
while not self.stop_stats_event.is_set(): |
|||
await self.sio.sleep(self.server_stats_interval) |
|||
await self.sio.emit('server_stats', { |
|||
'serverId': self.server_id, |
|||
'hostname': HOSTNAME, |
|||
'pid': PID, |
|||
'uptime': time.time() - start_time, |
|||
'clientsCount': len(self.sio.eio.sockets), |
|||
'pollingClientsCount': len( |
|||
[s for s in self.sio.eio.sockets.values() |
|||
if not s.upgraded]), |
|||
'aggregatedEvents': self.event_buffer.get_and_clear(), |
|||
'namespaces': [{ |
|||
'name': nsp, |
|||
'socketsCount': len(self.sio.manager.rooms.get( |
|||
nsp, {None: []}).get(None, [])) |
|||
} for nsp in namespaces], |
|||
}, namespace=self.admin_namespace) |
|||
while self.admin_queue: |
|||
event, args = self.admin_queue.pop(0) |
|||
await self.sio.emit(event, args, |
|||
namespace=self.admin_namespace) |
|||
|
|||
def serialize_socket(self, sid, namespace, eio_sid=None): |
|||
if eio_sid is None: # pragma: no cover |
|||
eio_sid = self.sio.manager.eio_sid_from_sid(sid) |
|||
socket = self.sio.eio._get_socket(eio_sid) |
|||
environ = self.sio.environ.get(eio_sid, {}) |
|||
tm = self.sio.manager._timestamps[sid] if sid in \ |
|||
self.sio.manager._timestamps else 0 |
|||
return { |
|||
'id': sid, |
|||
'clientId': eio_sid, |
|||
'transport': 'websocket' if socket.upgraded else 'polling', |
|||
'nsp': namespace, |
|||
'data': {}, |
|||
'handshake': { |
|||
'address': environ.get('REMOTE_ADDR', ''), |
|||
'headers': {k[5:].lower(): v for k, v in environ.items() |
|||
if k.startswith('HTTP_')}, |
|||
'query': {k: v[0] if len(v) == 1 else v for k, v in parse_qs( |
|||
environ.get('QUERY_STRING', '')).items()}, |
|||
'secure': environ.get('wsgi.url_scheme', '') == 'https', |
|||
'url': environ.get('PATH_INFO', ''), |
|||
'issued': tm * 1000, |
|||
'time': datetime.utcfromtimestamp(tm).isoformat() + 'Z' |
|||
if tm else '', |
|||
}, |
|||
'rooms': self.sio.manager.get_rooms(sid, namespace), |
|||
} |
@ -0,0 +1,311 @@ |
|||
from functools import wraps |
|||
import threading |
|||
import time |
|||
from unittest import mock |
|||
import unittest |
|||
import pytest |
|||
try: |
|||
from engineio.async_socket import AsyncSocket as EngineIOSocket |
|||
except ImportError: |
|||
from engineio.asyncio_socket import AsyncSocket as EngineIOSocket |
|||
import socketio |
|||
from socketio.exceptions import ConnectionError |
|||
from tests.asyncio_web_server import SocketIOWebServer |
|||
from .helpers import AsyncMock |
|||
|
|||
|
|||
def with_instrumented_server(auth=False, **ikwargs): |
|||
"""This decorator can be applied to test functions or methods so that they |
|||
run with a Socket.IO server that has been instrumented for the official |
|||
Admin UI project. The arguments passed to the decorator are passed directly |
|||
to the ``instrument()`` method of the server. |
|||
""" |
|||
def decorator(f): |
|||
@wraps(f) |
|||
def wrapped(self, *args, **kwargs): |
|||
sio = socketio.AsyncServer(async_mode='asgi') |
|||
|
|||
@sio.event |
|||
async def enter_room(sid, data): |
|||
await sio.enter_room(sid, data) |
|||
|
|||
@sio.event |
|||
async def emit(sid, event): |
|||
await sio.emit(event, skip_sid=sid) |
|||
|
|||
@sio.event(namespace='/foo') |
|||
def connect(sid, environ, auth): |
|||
pass |
|||
|
|||
async def shutdown(): |
|||
await instrumented_server.shutdown() |
|||
await sio.shutdown() |
|||
|
|||
if 'server_stats_interval' not in ikwargs: |
|||
ikwargs['server_stats_interval'] = 0.25 |
|||
|
|||
instrumented_server = sio.instrument(auth=auth, **ikwargs) |
|||
server = SocketIOWebServer(sio, on_shutdown=shutdown) |
|||
server.start() |
|||
|
|||
# import logging |
|||
# logging.getLogger('engineio.client').setLevel(logging.DEBUG) |
|||
# logging.getLogger('socketio.client').setLevel(logging.DEBUG) |
|||
|
|||
original_schedule_ping = EngineIOSocket.schedule_ping |
|||
EngineIOSocket.schedule_ping = mock.MagicMock() |
|||
|
|||
try: |
|||
ret = f(self, instrumented_server, *args, **kwargs) |
|||
finally: |
|||
server.stop() |
|||
instrumented_server.uninstrument() |
|||
|
|||
EngineIOSocket.schedule_ping = original_schedule_ping |
|||
|
|||
# import logging |
|||
# logging.getLogger('engineio.client').setLevel(logging.NOTSET) |
|||
# logging.getLogger('socketio.client').setLevel(logging.NOTSET) |
|||
|
|||
return ret |
|||
return wrapped |
|||
return decorator |
|||
|
|||
|
|||
def _custom_auth(auth): |
|||
return auth == {'foo': 'bar'} |
|||
|
|||
|
|||
async def _async_custom_auth(auth): |
|||
return auth == {'foo': 'bar'} |
|||
|
|||
|
|||
class TestAsyncAdmin(unittest.TestCase): |
|||
def setUp(self): |
|||
print('threads at start:', threading.enumerate()) |
|||
self.thread_count = threading.active_count() |
|||
|
|||
def tearDown(self): |
|||
print('threads at end:', threading.enumerate()) |
|||
assert self.thread_count == threading.active_count() |
|||
|
|||
def test_missing_auth(self): |
|||
sio = socketio.AsyncServer(async_mode='asgi') |
|||
with pytest.raises(ValueError): |
|||
sio.instrument() |
|||
|
|||
@with_instrumented_server(auth=False) |
|||
def test_admin_connect_with_no_auth(self, isvr): |
|||
with socketio.SimpleClient() as admin_client: |
|||
admin_client.connect('http://localhost:8900', namespace='/admin') |
|||
with socketio.SimpleClient() as admin_client: |
|||
admin_client.connect('http://localhost:8900', namespace='/admin', |
|||
auth={'foo': 'bar'}) |
|||
|
|||
@with_instrumented_server(auth={'foo': 'bar'}) |
|||
def test_admin_connect_with_dict_auth(self, isvr): |
|||
with socketio.SimpleClient() as admin_client: |
|||
admin_client.connect('http://localhost:8900', namespace='/admin', |
|||
auth={'foo': 'bar'}) |
|||
with socketio.SimpleClient() as admin_client: |
|||
with pytest.raises(ConnectionError): |
|||
admin_client.connect( |
|||
'http://localhost:8900', namespace='/admin', |
|||
auth={'foo': 'baz'}) |
|||
with socketio.SimpleClient() as admin_client: |
|||
with pytest.raises(ConnectionError): |
|||
admin_client.connect( |
|||
'http://localhost:8900', namespace='/admin') |
|||
|
|||
@with_instrumented_server(auth=[{'foo': 'bar'}, |
|||
{'u': 'admin', 'p': 'secret'}]) |
|||
def test_admin_connect_with_list_auth(self, isvr): |
|||
with socketio.SimpleClient() as admin_client: |
|||
admin_client.connect('http://localhost:8900', namespace='/admin', |
|||
auth={'foo': 'bar'}) |
|||
with socketio.SimpleClient() as admin_client: |
|||
admin_client.connect('http://localhost:8900', namespace='/admin', |
|||
auth={'u': 'admin', 'p': 'secret'}) |
|||
with socketio.SimpleClient() as admin_client: |
|||
with pytest.raises(ConnectionError): |
|||
admin_client.connect('http://localhost:8900', |
|||
namespace='/admin', auth={'foo': 'baz'}) |
|||
with socketio.SimpleClient() as admin_client: |
|||
with pytest.raises(ConnectionError): |
|||
admin_client.connect('http://localhost:8900', |
|||
namespace='/admin') |
|||
|
|||
@with_instrumented_server(auth=_custom_auth) |
|||
def test_admin_connect_with_function_auth(self, isvr): |
|||
with socketio.SimpleClient() as admin_client: |
|||
admin_client.connect('http://localhost:8900', namespace='/admin', |
|||
auth={'foo': 'bar'}) |
|||
with socketio.SimpleClient() as admin_client: |
|||
with pytest.raises(ConnectionError): |
|||
admin_client.connect('http://localhost:8900', |
|||
namespace='/admin', auth={'foo': 'baz'}) |
|||
with socketio.SimpleClient() as admin_client: |
|||
with pytest.raises(ConnectionError): |
|||
admin_client.connect('http://localhost:8900', |
|||
namespace='/admin') |
|||
|
|||
@with_instrumented_server(auth=_async_custom_auth) |
|||
def test_admin_connect_with_async_function_auth(self, isvr): |
|||
with socketio.SimpleClient() as admin_client: |
|||
admin_client.connect('http://localhost:8900', namespace='/admin', |
|||
auth={'foo': 'bar'}) |
|||
with socketio.SimpleClient() as admin_client: |
|||
with pytest.raises(ConnectionError): |
|||
admin_client.connect('http://localhost:8900', |
|||
namespace='/admin', auth={'foo': 'baz'}) |
|||
with socketio.SimpleClient() as admin_client: |
|||
with pytest.raises(ConnectionError): |
|||
admin_client.connect('http://localhost:8900', |
|||
namespace='/admin') |
|||
|
|||
@with_instrumented_server() |
|||
def test_admin_connect_only_admin(self, isvr): |
|||
with socketio.SimpleClient() as admin_client: |
|||
admin_client.connect('http://localhost:8900', namespace='/admin') |
|||
sid = admin_client.sid |
|||
expected = ['config', 'all_sockets', 'server_stats'] |
|||
events = {} |
|||
while expected: |
|||
data = admin_client.receive(timeout=5) |
|||
if data[0] in expected: |
|||
events[data[0]] = data[1] |
|||
expected.remove(data[0]) |
|||
|
|||
assert 'supportedFeatures' in events['config'] |
|||
assert 'ALL_EVENTS' in events['config']['supportedFeatures'] |
|||
assert 'AGGREGATED_EVENTS' in events['config']['supportedFeatures'] |
|||
assert 'EMIT' in events['config']['supportedFeatures'] |
|||
assert len(events['all_sockets']) == 1 |
|||
assert events['all_sockets'][0]['id'] == sid |
|||
assert events['all_sockets'][0]['rooms'] == [sid] |
|||
assert events['server_stats']['clientsCount'] == 1 |
|||
assert events['server_stats']['pollingClientsCount'] == 0 |
|||
assert len(events['server_stats']['namespaces']) == 3 |
|||
assert {'name': '/', 'socketsCount': 0} in \ |
|||
events['server_stats']['namespaces'] |
|||
assert {'name': '/foo', 'socketsCount': 0} in \ |
|||
events['server_stats']['namespaces'] |
|||
assert {'name': '/admin', 'socketsCount': 1} in \ |
|||
events['server_stats']['namespaces'] |
|||
|
|||
@with_instrumented_server() |
|||
def test_admin_connect_with_others(self, isvr): |
|||
with socketio.SimpleClient() as client1, \ |
|||
socketio.SimpleClient() as client2, \ |
|||
socketio.SimpleClient() as client3, \ |
|||
socketio.SimpleClient() as admin_client: |
|||
client1.connect('http://localhost:8900') |
|||
client1.emit('enter_room', 'room') |
|||
sid1 = client1.sid |
|||
|
|||
saved_check_for_upgrade = isvr._check_for_upgrade |
|||
isvr._check_for_upgrade = AsyncMock() |
|||
client2.connect('http://localhost:8900', namespace='/foo', |
|||
transports=['polling']) |
|||
sid2 = client2.sid |
|||
isvr._check_for_upgrade = saved_check_for_upgrade |
|||
|
|||
client3.connect('http://localhost:8900', namespace='/admin') |
|||
sid3 = client3.sid |
|||
|
|||
admin_client.connect('http://localhost:8900', namespace='/admin') |
|||
sid = admin_client.sid |
|||
expected = ['config', 'all_sockets', 'server_stats'] |
|||
events = {} |
|||
while expected: |
|||
data = admin_client.receive(timeout=5) |
|||
if data[0] in expected: |
|||
events[data[0]] = data[1] |
|||
expected.remove(data[0]) |
|||
|
|||
assert 'supportedFeatures' in events['config'] |
|||
assert 'ALL_EVENTS' in events['config']['supportedFeatures'] |
|||
assert 'AGGREGATED_EVENTS' in events['config']['supportedFeatures'] |
|||
assert 'EMIT' in events['config']['supportedFeatures'] |
|||
assert len(events['all_sockets']) == 4 |
|||
assert events['server_stats']['clientsCount'] == 4 |
|||
assert events['server_stats']['pollingClientsCount'] == 1 |
|||
assert len(events['server_stats']['namespaces']) == 3 |
|||
assert {'name': '/', 'socketsCount': 1} in \ |
|||
events['server_stats']['namespaces'] |
|||
assert {'name': '/foo', 'socketsCount': 1} in \ |
|||
events['server_stats']['namespaces'] |
|||
assert {'name': '/admin', 'socketsCount': 2} in \ |
|||
events['server_stats']['namespaces'] |
|||
|
|||
for socket in events['all_sockets']: |
|||
if socket['id'] == sid: |
|||
assert socket['rooms'] == [sid] |
|||
elif socket['id'] == sid1: |
|||
assert socket['rooms'] == [sid1, 'room'] |
|||
elif socket['id'] == sid2: |
|||
assert socket['rooms'] == [sid2] |
|||
elif socket['id'] == sid3: |
|||
assert socket['rooms'] == [sid3] |
|||
|
|||
@with_instrumented_server(mode='production', read_only=True) |
|||
def test_admin_connect_production(self, isvr): |
|||
with socketio.SimpleClient() as admin_client: |
|||
admin_client.connect('http://localhost:8900', namespace='/admin') |
|||
expected = ['config', 'server_stats'] |
|||
events = {} |
|||
while expected: |
|||
data = admin_client.receive(timeout=5) |
|||
if data[0] in expected: |
|||
events[data[0]] = data[1] |
|||
expected.remove(data[0]) |
|||
|
|||
assert 'supportedFeatures' in events['config'] |
|||
assert 'ALL_EVENTS' not in events['config']['supportedFeatures'] |
|||
assert 'AGGREGATED_EVENTS' in events['config']['supportedFeatures'] |
|||
assert 'EMIT' not in events['config']['supportedFeatures'] |
|||
assert events['server_stats']['clientsCount'] == 1 |
|||
assert events['server_stats']['pollingClientsCount'] == 0 |
|||
assert len(events['server_stats']['namespaces']) == 3 |
|||
assert {'name': '/', 'socketsCount': 0} in \ |
|||
events['server_stats']['namespaces'] |
|||
assert {'name': '/foo', 'socketsCount': 0} in \ |
|||
events['server_stats']['namespaces'] |
|||
assert {'name': '/admin', 'socketsCount': 1} in \ |
|||
events['server_stats']['namespaces'] |
|||
|
|||
@with_instrumented_server() |
|||
def test_admin_features(self, isvr): |
|||
with socketio.SimpleClient() as client1, \ |
|||
socketio.SimpleClient() as client2, \ |
|||
socketio.SimpleClient() as admin_client: |
|||
client1.connect('http://localhost:8900') |
|||
client2.connect('http://localhost:8900') |
|||
admin_client.connect('http://localhost:8900', namespace='/admin') |
|||
|
|||
# emit from admin |
|||
admin_client.emit( |
|||
'emit', ('/', client1.sid, 'foo', {'bar': 'baz'}, 'extra')) |
|||
data = client1.receive(timeout=5) |
|||
assert data == ['foo', {'bar': 'baz'}, 'extra'] |
|||
|
|||
# emit from regular client |
|||
client1.emit('emit', 'foo') |
|||
data = client2.receive(timeout=5) |
|||
assert data == ['foo'] |
|||
|
|||
# join and leave |
|||
admin_client.emit('join', ('/', 'room', client1.sid)) |
|||
admin_client.emit( |
|||
'emit', ('/', 'room', 'foo', {'bar': 'baz'})) |
|||
data = client1.receive(timeout=5) |
|||
assert data == ['foo', {'bar': 'baz'}] |
|||
admin_client.emit('leave', ('/', 'room')) |
|||
|
|||
# disconnect |
|||
admin_client.emit('_disconnect', ('/', False, client1.sid)) |
|||
for _ in range(10): |
|||
if not client1.connected: |
|||
break |
|||
time.sleep(0.2) |
|||
assert not client1.connected |
@ -0,0 +1,57 @@ |
|||
import requests |
|||
import threading |
|||
import time |
|||
import uvicorn |
|||
import socketio |
|||
|
|||
|
|||
class SocketIOWebServer: |
|||
"""A simple web server used for running Socket.IO servers in tests. |
|||
|
|||
:param sio: a Socket.IO server instance. |
|||
|
|||
Note 1: This class is not production-ready and is intended for testing. |
|||
Note 2: This class only supports the "asgi" async_mode. |
|||
""" |
|||
def __init__(self, sio, on_shutdown=None): |
|||
if sio.async_mode != 'asgi': |
|||
raise ValueError('The async_mode must be "asgi"') |
|||
|
|||
async def http_app(scope, receive, send): |
|||
await send({'type': 'http.response.start', |
|||
'status': 200, |
|||
'headers': [('Content-Type', 'text/plain')]}) |
|||
await send({'type': 'http.response.body', |
|||
'body': b'OK'}) |
|||
|
|||
self.sio = sio |
|||
self.app = socketio.ASGIApp(sio, http_app, on_shutdown=on_shutdown) |
|||
self.httpd = None |
|||
self.thread = None |
|||
|
|||
def start(self, port=8900): |
|||
"""Start the web server. |
|||
|
|||
:param port: the port to listen on. Defaults to 8900. |
|||
|
|||
The server is started in a background thread. |
|||
""" |
|||
self.httpd = uvicorn.Server(config=uvicorn.Config(self.app, port=port)) |
|||
self.thread = threading.Thread(target=self.httpd.run) |
|||
self.thread.start() |
|||
|
|||
# wait for the server to start |
|||
while True: |
|||
try: |
|||
r = requests.get(f'http://localhost:{port}/') |
|||
r.raise_for_status() |
|||
if r.text == 'OK': |
|||
break |
|||
except: |
|||
time.sleep(0.1) |
|||
|
|||
def stop(self): |
|||
"""Stop the web server.""" |
|||
self.httpd.should_exit = True |
|||
self.thread.join() |
|||
self.thread = None |
@ -0,0 +1,286 @@ |
|||
from functools import wraps |
|||
import threading |
|||
import time |
|||
from unittest import mock |
|||
import unittest |
|||
import pytest |
|||
from engineio.socket import Socket as EngineIOSocket |
|||
import socketio |
|||
from socketio.exceptions import ConnectionError |
|||
from tests.web_server import SocketIOWebServer |
|||
|
|||
|
|||
def with_instrumented_server(auth=False, **ikwargs): |
|||
"""This decorator can be applied to test functions or methods so that they |
|||
run with a Socket.IO server that has been instrumented for the official |
|||
Admin UI project. The arguments passed to the decorator are passed directly |
|||
to the ``instrument()`` method of the server. |
|||
""" |
|||
def decorator(f): |
|||
@wraps(f) |
|||
def wrapped(self, *args, **kwargs): |
|||
sio = socketio.Server(async_mode='threading') |
|||
|
|||
@sio.event |
|||
def enter_room(sid, data): |
|||
sio.enter_room(sid, data) |
|||
|
|||
@sio.event |
|||
def emit(sid, event): |
|||
sio.emit(event, skip_sid=sid) |
|||
|
|||
@sio.event(namespace='/foo') |
|||
def connect(sid, environ, auth): |
|||
pass |
|||
|
|||
if 'server_stats_interval' not in ikwargs: |
|||
ikwargs['server_stats_interval'] = 0.25 |
|||
|
|||
instrumented_server = sio.instrument(auth=auth, **ikwargs) |
|||
server = SocketIOWebServer(sio) |
|||
server.start() |
|||
|
|||
# import logging |
|||
# logging.getLogger('engineio.client').setLevel(logging.DEBUG) |
|||
# logging.getLogger('socketio.client').setLevel(logging.DEBUG) |
|||
|
|||
original_schedule_ping = EngineIOSocket.schedule_ping |
|||
EngineIOSocket.schedule_ping = mock.MagicMock() |
|||
|
|||
try: |
|||
ret = f(self, instrumented_server, *args, **kwargs) |
|||
finally: |
|||
server.stop() |
|||
instrumented_server.shutdown() |
|||
instrumented_server.uninstrument() |
|||
|
|||
EngineIOSocket.schedule_ping = original_schedule_ping |
|||
|
|||
# import logging |
|||
# logging.getLogger('engineio.client').setLevel(logging.NOTSET) |
|||
# logging.getLogger('socketio.client').setLevel(logging.NOTSET) |
|||
|
|||
return ret |
|||
return wrapped |
|||
return decorator |
|||
|
|||
|
|||
def _custom_auth(auth): |
|||
return auth == {'foo': 'bar'} |
|||
|
|||
|
|||
class TestAdmin(unittest.TestCase): |
|||
def setUp(self): |
|||
print('threads at start:', threading.enumerate()) |
|||
self.thread_count = threading.active_count() |
|||
|
|||
def tearDown(self): |
|||
print('threads at end:', threading.enumerate()) |
|||
assert self.thread_count == threading.active_count() |
|||
|
|||
def test_missing_auth(self): |
|||
sio = socketio.Server(async_mode='threading') |
|||
with pytest.raises(ValueError): |
|||
sio.instrument() |
|||
|
|||
@with_instrumented_server(auth=False) |
|||
def test_admin_connect_with_no_auth(self, isvr): |
|||
with socketio.SimpleClient() as admin_client: |
|||
admin_client.connect('http://localhost:8900', namespace='/admin') |
|||
with socketio.SimpleClient() as admin_client: |
|||
admin_client.connect('http://localhost:8900', namespace='/admin', |
|||
auth={'foo': 'bar'}) |
|||
|
|||
@with_instrumented_server(auth={'foo': 'bar'}) |
|||
def test_admin_connect_with_dict_auth(self, isvr): |
|||
with socketio.SimpleClient() as admin_client: |
|||
admin_client.connect('http://localhost:8900', namespace='/admin', |
|||
auth={'foo': 'bar'}) |
|||
with socketio.SimpleClient() as admin_client: |
|||
with pytest.raises(ConnectionError): |
|||
admin_client.connect( |
|||
'http://localhost:8900', namespace='/admin', |
|||
auth={'foo': 'baz'}) |
|||
with socketio.SimpleClient() as admin_client: |
|||
with pytest.raises(ConnectionError): |
|||
admin_client.connect( |
|||
'http://localhost:8900', namespace='/admin') |
|||
|
|||
@with_instrumented_server(auth=[{'foo': 'bar'}, |
|||
{'u': 'admin', 'p': 'secret'}]) |
|||
def test_admin_connect_with_list_auth(self, isvr): |
|||
with socketio.SimpleClient() as admin_client: |
|||
admin_client.connect('http://localhost:8900', namespace='/admin', |
|||
auth={'foo': 'bar'}) |
|||
with socketio.SimpleClient() as admin_client: |
|||
admin_client.connect('http://localhost:8900', namespace='/admin', |
|||
auth={'u': 'admin', 'p': 'secret'}) |
|||
with socketio.SimpleClient() as admin_client: |
|||
with pytest.raises(ConnectionError): |
|||
admin_client.connect('http://localhost:8900', |
|||
namespace='/admin', auth={'foo': 'baz'}) |
|||
with socketio.SimpleClient() as admin_client: |
|||
with pytest.raises(ConnectionError): |
|||
admin_client.connect('http://localhost:8900', |
|||
namespace='/admin') |
|||
|
|||
@with_instrumented_server(auth=_custom_auth) |
|||
def test_admin_connect_with_function_auth(self, isvr): |
|||
with socketio.SimpleClient() as admin_client: |
|||
admin_client.connect('http://localhost:8900', namespace='/admin', |
|||
auth={'foo': 'bar'}) |
|||
with socketio.SimpleClient() as admin_client: |
|||
with pytest.raises(ConnectionError): |
|||
admin_client.connect('http://localhost:8900', |
|||
namespace='/admin', auth={'foo': 'baz'}) |
|||
with socketio.SimpleClient() as admin_client: |
|||
with pytest.raises(ConnectionError): |
|||
admin_client.connect('http://localhost:8900', |
|||
namespace='/admin') |
|||
|
|||
@with_instrumented_server() |
|||
def test_admin_connect_only_admin(self, isvr): |
|||
with socketio.SimpleClient() as admin_client: |
|||
admin_client.connect('http://localhost:8900', namespace='/admin') |
|||
sid = admin_client.sid |
|||
expected = ['config', 'all_sockets', 'server_stats'] |
|||
events = {} |
|||
while expected: |
|||
data = admin_client.receive(timeout=5) |
|||
if data[0] in expected: |
|||
events[data[0]] = data[1] |
|||
expected.remove(data[0]) |
|||
|
|||
assert 'supportedFeatures' in events['config'] |
|||
assert 'ALL_EVENTS' in events['config']['supportedFeatures'] |
|||
assert 'AGGREGATED_EVENTS' in events['config']['supportedFeatures'] |
|||
assert 'EMIT' in events['config']['supportedFeatures'] |
|||
assert len(events['all_sockets']) == 1 |
|||
assert events['all_sockets'][0]['id'] == sid |
|||
assert events['all_sockets'][0]['rooms'] == [sid] |
|||
assert events['server_stats']['clientsCount'] == 1 |
|||
assert events['server_stats']['pollingClientsCount'] == 0 |
|||
assert len(events['server_stats']['namespaces']) == 3 |
|||
assert {'name': '/', 'socketsCount': 0} in \ |
|||
events['server_stats']['namespaces'] |
|||
assert {'name': '/foo', 'socketsCount': 0} in \ |
|||
events['server_stats']['namespaces'] |
|||
assert {'name': '/admin', 'socketsCount': 1} in \ |
|||
events['server_stats']['namespaces'] |
|||
|
|||
@with_instrumented_server() |
|||
def test_admin_connect_with_others(self, isvr): |
|||
with socketio.SimpleClient() as client1, \ |
|||
socketio.SimpleClient() as client2, \ |
|||
socketio.SimpleClient() as client3, \ |
|||
socketio.SimpleClient() as admin_client: |
|||
client1.connect('http://localhost:8900') |
|||
client1.emit('enter_room', 'room') |
|||
sid1 = client1.sid |
|||
|
|||
saved_check_for_upgrade = isvr._check_for_upgrade |
|||
isvr._check_for_upgrade = mock.MagicMock() |
|||
client2.connect('http://localhost:8900', namespace='/foo', |
|||
transports=['polling']) |
|||
sid2 = client2.sid |
|||
isvr._check_for_upgrade = saved_check_for_upgrade |
|||
|
|||
client3.connect('http://localhost:8900', namespace='/admin') |
|||
sid3 = client3.sid |
|||
|
|||
admin_client.connect('http://localhost:8900', namespace='/admin') |
|||
sid = admin_client.sid |
|||
expected = ['config', 'all_sockets', 'server_stats'] |
|||
events = {} |
|||
while expected: |
|||
data = admin_client.receive(timeout=5) |
|||
if data[0] in expected: |
|||
events[data[0]] = data[1] |
|||
expected.remove(data[0]) |
|||
|
|||
assert 'supportedFeatures' in events['config'] |
|||
assert 'ALL_EVENTS' in events['config']['supportedFeatures'] |
|||
assert 'AGGREGATED_EVENTS' in events['config']['supportedFeatures'] |
|||
assert 'EMIT' in events['config']['supportedFeatures'] |
|||
assert len(events['all_sockets']) == 4 |
|||
assert events['server_stats']['clientsCount'] == 4 |
|||
assert events['server_stats']['pollingClientsCount'] == 1 |
|||
assert len(events['server_stats']['namespaces']) == 3 |
|||
assert {'name': '/', 'socketsCount': 1} in \ |
|||
events['server_stats']['namespaces'] |
|||
assert {'name': '/foo', 'socketsCount': 1} in \ |
|||
events['server_stats']['namespaces'] |
|||
assert {'name': '/admin', 'socketsCount': 2} in \ |
|||
events['server_stats']['namespaces'] |
|||
|
|||
for socket in events['all_sockets']: |
|||
if socket['id'] == sid: |
|||
assert socket['rooms'] == [sid] |
|||
elif socket['id'] == sid1: |
|||
assert socket['rooms'] == [sid1, 'room'] |
|||
elif socket['id'] == sid2: |
|||
assert socket['rooms'] == [sid2] |
|||
elif socket['id'] == sid3: |
|||
assert socket['rooms'] == [sid3] |
|||
|
|||
@with_instrumented_server(mode='production', read_only=True) |
|||
def test_admin_connect_production(self, isvr): |
|||
with socketio.SimpleClient() as admin_client: |
|||
admin_client.connect('http://localhost:8900', namespace='/admin') |
|||
expected = ['config', 'server_stats'] |
|||
events = {} |
|||
while expected: |
|||
data = admin_client.receive(timeout=5) |
|||
if data[0] in expected: |
|||
events[data[0]] = data[1] |
|||
expected.remove(data[0]) |
|||
|
|||
assert 'supportedFeatures' in events['config'] |
|||
assert 'ALL_EVENTS' not in events['config']['supportedFeatures'] |
|||
assert 'AGGREGATED_EVENTS' in events['config']['supportedFeatures'] |
|||
assert 'EMIT' not in events['config']['supportedFeatures'] |
|||
assert events['server_stats']['clientsCount'] == 1 |
|||
assert events['server_stats']['pollingClientsCount'] == 0 |
|||
assert len(events['server_stats']['namespaces']) == 3 |
|||
assert {'name': '/', 'socketsCount': 0} in \ |
|||
events['server_stats']['namespaces'] |
|||
assert {'name': '/foo', 'socketsCount': 0} in \ |
|||
events['server_stats']['namespaces'] |
|||
assert {'name': '/admin', 'socketsCount': 1} in \ |
|||
events['server_stats']['namespaces'] |
|||
|
|||
@with_instrumented_server() |
|||
def test_admin_features(self, isvr): |
|||
with socketio.SimpleClient() as client1, \ |
|||
socketio.SimpleClient() as client2, \ |
|||
socketio.SimpleClient() as admin_client: |
|||
client1.connect('http://localhost:8900') |
|||
client2.connect('http://localhost:8900') |
|||
admin_client.connect('http://localhost:8900', namespace='/admin') |
|||
|
|||
# emit from admin |
|||
admin_client.emit( |
|||
'emit', ('/', client1.sid, 'foo', {'bar': 'baz'}, 'extra')) |
|||
data = client1.receive(timeout=5) |
|||
assert data == ['foo', {'bar': 'baz'}, 'extra'] |
|||
|
|||
# emit from regular client |
|||
client1.emit('emit', 'foo') |
|||
data = client2.receive(timeout=5) |
|||
assert data == ['foo'] |
|||
|
|||
# join and leave |
|||
admin_client.emit('join', ('/', 'room', client1.sid)) |
|||
admin_client.emit( |
|||
'emit', ('/', 'room', 'foo', {'bar': 'baz'})) |
|||
data = client1.receive(timeout=5) |
|||
assert data == ['foo', {'bar': 'baz'}] |
|||
admin_client.emit('leave', ('/', 'room')) |
|||
|
|||
# disconnect |
|||
admin_client.emit('_disconnect', ('/', False, client1.sid)) |
|||
for _ in range(10): |
|||
if not client1.connected: |
|||
break |
|||
time.sleep(0.2) |
|||
assert not client1.connected |
@ -0,0 +1,81 @@ |
|||
import threading |
|||
import time |
|||
from socketserver import ThreadingMixIn |
|||
from wsgiref.simple_server import make_server, WSGIServer, WSGIRequestHandler |
|||
import requests |
|||
import socketio |
|||
|
|||
|
|||
class SocketIOWebServer: |
|||
"""A simple web server used for running Socket.IO servers in tests. |
|||
|
|||
:param sio: a Socket.IO server instance. |
|||
|
|||
Note 1: This class is not production-ready and is intended for testing. |
|||
Note 2: This class only supports the "threading" async_mode, with WebSocket |
|||
support provided by the simple-websocket package. |
|||
""" |
|||
def __init__(self, sio): |
|||
if sio.async_mode != 'threading': |
|||
raise ValueError('The async_mode must be "threading"') |
|||
|
|||
def http_app(environ, start_response): |
|||
start_response('200 OK', [('Content-Type', 'text/plain')]) |
|||
return [b'OK'] |
|||
|
|||
self.sio = sio |
|||
self.app = socketio.WSGIApp(sio, http_app) |
|||
self.httpd = None |
|||
self.thread = None |
|||
|
|||
def start(self, port=8900): |
|||
"""Start the web server. |
|||
|
|||
:param port: the port to listen on. Defaults to 8900. |
|||
|
|||
The server is started in a background thread. |
|||
""" |
|||
class ThreadingWSGIServer(ThreadingMixIn, WSGIServer): |
|||
pass |
|||
|
|||
class WebSocketRequestHandler(WSGIRequestHandler): |
|||
def get_environ(self): |
|||
env = super().get_environ() |
|||
|
|||
# pass the raw socket to the WSGI app so that it can be used |
|||
# by WebSocket connections (hack copied from gunicorn) |
|||
env['gunicorn.socket'] = self.connection |
|||
return env |
|||
|
|||
self.httpd = make_server('', port, self._app_wrapper, |
|||
ThreadingWSGIServer, WebSocketRequestHandler) |
|||
self.thread = threading.Thread(target=self.httpd.serve_forever) |
|||
self.thread.start() |
|||
|
|||
# wait for the server to start |
|||
while True: |
|||
try: |
|||
r = requests.get(f'http://localhost:{port}/') |
|||
r.raise_for_status() |
|||
if r.text == 'OK': |
|||
break |
|||
except: |
|||
time.sleep(0.1) |
|||
|
|||
def stop(self): |
|||
"""Stop the web server.""" |
|||
self.sio.shutdown() |
|||
self.httpd.shutdown() |
|||
self.httpd.server_close() |
|||
self.thread.join() |
|||
self.httpd = None |
|||
self.thread = None |
|||
|
|||
def _app_wrapper(self, environ, start_response): |
|||
try: |
|||
return self.app(environ, start_response) |
|||
except StopIteration: |
|||
# end the WebSocket request without sending a response |
|||
# (this is a hack that was copied from gunicorn's threaded worker) |
|||
start_response('200 OK', []) |
|||
return [] |
Loading…
Reference in new issue