Browse Source

Fix admin instrumentation to support disconnect reasons (Fixes #1423)

pull/1430/head
Miguel Grinberg 5 months ago
parent
commit
b75fd31625
Failed to extract signature
  1. 82
      src/socketio/admin.py
  2. 82
      src/socketio/async_admin.py

82
src/socketio/admin.py

@ -77,13 +77,9 @@ class InstrumentedServer:
# track socket connection times # track socket connection times
self.sio.manager._timestamps = {} self.sio.manager._timestamps = {}
# report socket.io connections # report socket.io connections, disconnections and received events
self.sio.manager.__connect = self.sio.manager.connect self.sio.__trigger_event = self.sio._trigger_event
self.sio.manager.connect = self._connect self.sio._trigger_event = self._trigger_event
# report socket.io disconnection
self.sio.manager.__disconnect = self.sio.manager.disconnect
self.sio.manager.disconnect = self._disconnect
# report join rooms # report join rooms
self.sio.manager.__basic_enter_room = \ self.sio.manager.__basic_enter_room = \
@ -99,10 +95,6 @@ class InstrumentedServer:
self.sio.manager.__emit = self.sio.manager.emit self.sio.manager.__emit = self.sio.manager.emit
self.sio.manager.emit = self._emit self.sio.manager.emit = self._emit
# report receive events
self.sio.__handle_event_internal = self.sio._handle_event_internal
self.sio._handle_event_internal = self._handle_event_internal
# report engine.io connections # report engine.io connections
self.sio.eio.on('connect', self._handle_eio_connect) self.sio.eio.on('connect', self._handle_eio_connect)
self.sio.eio.on('disconnect', self._handle_eio_disconnect) self.sio.eio.on('disconnect', self._handle_eio_disconnect)
@ -128,14 +120,12 @@ class InstrumentedServer:
def uninstrument(self): # pragma: no cover def uninstrument(self): # pragma: no cover
if self.mode == 'development': if self.mode == 'development':
self.sio.manager.connect = self.sio.manager.__connect self.sio._trigger_event = self.sio.__trigger_event
self.sio.manager.disconnect = self.sio.manager.__disconnect
self.sio.manager.basic_enter_room = \ self.sio.manager.basic_enter_room = \
self.sio.manager.__basic_enter_room self.sio.manager.__basic_enter_room
self.sio.manager.basic_leave_room = \ self.sio.manager.basic_leave_room = \
self.sio.manager.__basic_leave_room self.sio.manager.__basic_leave_room
self.sio.manager.emit = self.sio.manager.__emit self.sio.manager.emit = self.sio.manager.__emit
self.sio._handle_event_internal = self.sio.__handle_event_internal
self.sio.eio._ok = self.sio.eio.__ok self.sio.eio._ok = self.sio.eio.__ok
from engineio.socket import Socket from engineio.socket import Socket
@ -205,26 +195,34 @@ class InstrumentedServer:
self.stop_stats_event.set() self.stop_stats_event.set()
self.stats_task.join() self.stats_task.join()
def _connect(self, eio_sid, namespace): def _trigger_event(self, event, namespace, *args):
sid = self.sio.manager.__connect(eio_sid, namespace)
t = time.time() t = time.time()
self.sio.manager._timestamps[sid] = t sid = args[0]
serialized_socket = self.serialize_socket(sid, namespace, eio_sid) if event == 'connect':
self.sio.emit('socket_connected', ( eio_sid = self.sio.manager.eio_sid_from_sid(sid, namespace)
serialized_socket, self.sio.manager._timestamps[sid] = t
datetime.utcfromtimestamp(t).isoformat() + 'Z', serialized_socket = self.serialize_socket(sid, namespace, eio_sid)
), namespace=self.admin_namespace) self.sio.emit('socket_connected', (
return sid serialized_socket,
datetime.utcfromtimestamp(t).isoformat() + 'Z',
def _disconnect(self, sid, namespace, **kwargs): ), namespace=self.admin_namespace)
del self.sio.manager._timestamps[sid] elif event == 'disconnect':
self.sio.emit('socket_disconnected', ( del self.sio.manager._timestamps[sid]
namespace, reason = args[1]
sid, self.sio.emit('socket_disconnected', (
'N/A', namespace,
datetime.utcnow().isoformat() + 'Z', sid,
), namespace=self.admin_namespace) reason,
return self.sio.manager.__disconnect(sid, namespace, **kwargs) datetime.utcfromtimestamp(t).isoformat() + 'Z',
), namespace=self.admin_namespace)
else:
self.sio.emit('event_received', (
namespace,
sid,
(event, *args[1:]),
datetime.utcfromtimestamp(t).isoformat() + 'Z',
), namespace=self.admin_namespace)
return self.sio.__trigger_event(event, namespace, *args)
def _check_for_upgrade(self, eio_sid, sid, namespace): # pragma: no cover def _check_for_upgrade(self, eio_sid, sid, namespace): # pragma: no cover
for _ in range(5): for _ in range(5):
@ -269,7 +267,7 @@ class InstrumentedServer:
**kwargs) **kwargs)
if namespace != self.admin_namespace: if namespace != self.admin_namespace:
event_data = [event] + list(data) if isinstance(data, tuple) \ event_data = [event] + list(data) if isinstance(data, tuple) \
else [data] else [event, data]
if not isinstance(skip_sid, list): # pragma: no branch if not isinstance(skip_sid, list): # pragma: no branch
skip_sid = [skip_sid] skip_sid = [skip_sid]
for sid, _ in self.sio.manager.get_participants(namespace, room): for sid, _ in self.sio.manager.get_participants(namespace, room):
@ -282,18 +280,6 @@ class InstrumentedServer:
), namespace=self.admin_namespace) ), namespace=self.admin_namespace)
return ret return ret
def _handle_event_internal(self, server, sid, eio_sid, data, namespace,
id):
ret = self.sio.__handle_event_internal(server, sid, eio_sid, data,
namespace, id)
self.sio.emit('event_received', (
namespace,
sid,
data,
datetime.utcnow().isoformat() + 'Z',
), namespace=self.admin_namespace)
return ret
def _handle_eio_connect(self, eio_sid, environ): def _handle_eio_connect(self, eio_sid, environ):
if self.stop_stats_event is None: if self.stop_stats_event is None:
self.stop_stats_event = self.sio.eio.create_event() self.stop_stats_event = self.sio.eio.create_event()
@ -303,9 +289,9 @@ class InstrumentedServer:
self.event_buffer.push('rawConnection') self.event_buffer.push('rawConnection')
return self.sio._handle_eio_connect(eio_sid, environ) return self.sio._handle_eio_connect(eio_sid, environ)
def _handle_eio_disconnect(self, eio_sid): def _handle_eio_disconnect(self, eio_sid, reason):
self.event_buffer.push('rawDisconnection') self.event_buffer.push('rawDisconnection')
return self.sio._handle_eio_disconnect(eio_sid) return self.sio._handle_eio_disconnect(eio_sid, reason)
def _eio_http_response(self, packets=None, headers=None, jsonp_index=None): def _eio_http_response(self, packets=None, headers=None, jsonp_index=None):
ret = self.sio.eio.__ok(packets=packets, headers=headers, ret = self.sio.eio.__ok(packets=packets, headers=headers,

82
src/socketio/async_admin.py

@ -58,13 +58,9 @@ class InstrumentedAsyncServer:
# track socket connection times # track socket connection times
self.sio.manager._timestamps = {} self.sio.manager._timestamps = {}
# report socket.io connections # report socket.io connections, disconnections and received events
self.sio.manager.__connect = self.sio.manager.connect self.sio.__trigger_event = self.sio._trigger_event
self.sio.manager.connect = self._connect self.sio._trigger_event = self._trigger_event
# report socket.io disconnection
self.sio.manager.__disconnect = self.sio.manager.disconnect
self.sio.manager.disconnect = self._disconnect
# report join rooms # report join rooms
self.sio.manager.__basic_enter_room = \ self.sio.manager.__basic_enter_room = \
@ -80,10 +76,6 @@ class InstrumentedAsyncServer:
self.sio.manager.__emit = self.sio.manager.emit self.sio.manager.__emit = self.sio.manager.emit
self.sio.manager.emit = self._emit self.sio.manager.emit = self._emit
# report receive events
self.sio.__handle_event_internal = self.sio._handle_event_internal
self.sio._handle_event_internal = self._handle_event_internal
# report engine.io connections # report engine.io connections
self.sio.eio.on('connect', self._handle_eio_connect) self.sio.eio.on('connect', self._handle_eio_connect)
self.sio.eio.on('disconnect', self._handle_eio_disconnect) self.sio.eio.on('disconnect', self._handle_eio_disconnect)
@ -109,14 +101,12 @@ class InstrumentedAsyncServer:
def uninstrument(self): # pragma: no cover def uninstrument(self): # pragma: no cover
if self.mode == 'development': if self.mode == 'development':
self.sio.manager.connect = self.sio.manager.__connect self.sio._trigger_event = self.sio.__trigger_event
self.sio.manager.disconnect = self.sio.manager.__disconnect
self.sio.manager.basic_enter_room = \ self.sio.manager.basic_enter_room = \
self.sio.manager.__basic_enter_room self.sio.manager.__basic_enter_room
self.sio.manager.basic_leave_room = \ self.sio.manager.basic_leave_room = \
self.sio.manager.__basic_leave_room self.sio.manager.__basic_leave_room
self.sio.manager.emit = self.sio.manager.__emit self.sio.manager.emit = self.sio.manager.__emit
self.sio._handle_event_internal = self.sio.__handle_event_internal
self.sio.eio._ok = self.sio.eio.__ok self.sio.eio._ok = self.sio.eio.__ok
from engineio.async_socket import AsyncSocket from engineio.async_socket import AsyncSocket
@ -193,26 +183,34 @@ class InstrumentedAsyncServer:
self.stop_stats_event.set() self.stop_stats_event.set()
await asyncio.gather(self.stats_task) await asyncio.gather(self.stats_task)
async def _connect(self, eio_sid, namespace): async def _trigger_event(self, event, namespace, *args):
sid = await self.sio.manager.__connect(eio_sid, namespace)
t = time.time() t = time.time()
self.sio.manager._timestamps[sid] = t sid = args[0]
serialized_socket = self.serialize_socket(sid, namespace, eio_sid) if event == 'connect':
await self.sio.emit('socket_connected', ( eio_sid = self.sio.manager.eio_sid_from_sid(sid, namespace)
serialized_socket, self.sio.manager._timestamps[sid] = t
datetime.utcfromtimestamp(t).isoformat() + 'Z', serialized_socket = self.serialize_socket(sid, namespace, eio_sid)
), namespace=self.admin_namespace) await self.sio.emit('socket_connected', (
return sid serialized_socket,
datetime.utcfromtimestamp(t).isoformat() + 'Z',
async def _disconnect(self, sid, namespace, **kwargs): ), namespace=self.admin_namespace)
del self.sio.manager._timestamps[sid] elif event == 'disconnect':
await self.sio.emit('socket_disconnected', ( del self.sio.manager._timestamps[sid]
namespace, reason = args[1]
sid, await self.sio.emit('socket_disconnected', (
'N/A', namespace,
datetime.utcnow().isoformat() + 'Z', sid,
), namespace=self.admin_namespace) reason,
return await self.sio.manager.__disconnect(sid, namespace, **kwargs) datetime.utcfromtimestamp(t).isoformat() + 'Z',
), namespace=self.admin_namespace)
else:
await self.sio.emit('event_received', (
namespace,
sid,
(event, *args[1:]),
datetime.utcfromtimestamp(t).isoformat() + 'Z',
), namespace=self.admin_namespace)
return await self.sio.__trigger_event(event, namespace, *args)
async def _check_for_upgrade(self, eio_sid, sid, async def _check_for_upgrade(self, eio_sid, sid,
namespace): # pragma: no cover namespace): # pragma: no cover
@ -258,7 +256,7 @@ class InstrumentedAsyncServer:
callback=callback, **kwargs) callback=callback, **kwargs)
if namespace != self.admin_namespace: if namespace != self.admin_namespace:
event_data = [event] + list(data) if isinstance(data, tuple) \ event_data = [event] + list(data) if isinstance(data, tuple) \
else [data] else [event, data]
if not isinstance(skip_sid, list): # pragma: no branch if not isinstance(skip_sid, list): # pragma: no branch
skip_sid = [skip_sid] skip_sid = [skip_sid]
for sid, _ in self.sio.manager.get_participants(namespace, room): for sid, _ in self.sio.manager.get_participants(namespace, room):
@ -271,18 +269,6 @@ class InstrumentedAsyncServer:
), namespace=self.admin_namespace) ), namespace=self.admin_namespace)
return ret return ret
async def _handle_event_internal(self, server, sid, eio_sid, data,
namespace, id):
ret = await self.sio.__handle_event_internal(server, sid, eio_sid,
data, namespace, id)
await self.sio.emit('event_received', (
namespace,
sid,
data,
datetime.utcnow().isoformat() + 'Z',
), namespace=self.admin_namespace)
return ret
async def _handle_eio_connect(self, eio_sid, environ): async def _handle_eio_connect(self, eio_sid, environ):
if self.stop_stats_event is None: if self.stop_stats_event is None:
self.stop_stats_event = self.sio.eio.create_event() self.stop_stats_event = self.sio.eio.create_event()
@ -292,9 +278,9 @@ class InstrumentedAsyncServer:
self.event_buffer.push('rawConnection') self.event_buffer.push('rawConnection')
return await self.sio._handle_eio_connect(eio_sid, environ) return await self.sio._handle_eio_connect(eio_sid, environ)
async def _handle_eio_disconnect(self, eio_sid): async def _handle_eio_disconnect(self, eio_sid, reason):
self.event_buffer.push('rawDisconnection') self.event_buffer.push('rawDisconnection')
return await self.sio._handle_eio_disconnect(eio_sid) return await self.sio._handle_eio_disconnect(eio_sid, reason)
def _eio_http_response(self, packets=None, headers=None, jsonp_index=None): def _eio_http_response(self, packets=None, headers=None, jsonp_index=None):
ret = self.sio.eio.__ok(packets=packets, headers=headers, ret = self.sio.eio.__ok(packets=packets, headers=headers,

Loading…
Cancel
Save