|
|
@ -6,6 +6,7 @@ import socket |
|
|
|
import time |
|
|
|
from urllib.parse import parse_qs |
|
|
|
from .admin import EventBuffer |
|
|
|
from .exceptions import ConnectionRefusedError |
|
|
|
|
|
|
|
HOSTNAME = socket.gethostname() |
|
|
|
PID = os.getpid() |
|
|
@ -73,8 +74,8 @@ class InstrumentedAsyncServer: |
|
|
|
self.sio.manager.leave_room = self._leave_room |
|
|
|
|
|
|
|
# report emit events |
|
|
|
self.sio.__emit_internal = self.sio._emit_internal |
|
|
|
self.sio._emit_internal = self._emit_internal |
|
|
|
self.sio.manager.__emit = self.sio.manager.emit |
|
|
|
self.sio.manager.emit = self._emit |
|
|
|
|
|
|
|
# report receive events |
|
|
|
self.sio.__handle_event_internal = self.sio._handle_event_internal |
|
|
@ -98,13 +99,19 @@ class InstrumentedAsyncServer: |
|
|
|
|
|
|
|
async def admin_connect(self, sid, environ, client_auth): |
|
|
|
authenticated = True |
|
|
|
if self.auth: |
|
|
|
if asyncio.iscoroutinefunction(self.auth): |
|
|
|
authenticated = await self.auth(client_auth) |
|
|
|
if self.auth != None: |
|
|
|
authenticated = False |
|
|
|
if isinstance(self.auth, dict): |
|
|
|
authenticated = client_auth == self.auth |
|
|
|
elif isinstance(self.auth, list): |
|
|
|
authenticated = client_auth in self.auth |
|
|
|
else: |
|
|
|
authenticated = self.auth(client_auth) |
|
|
|
if not authenticated: |
|
|
|
raise ConnectionRefusedError('Invalid credentials') |
|
|
|
if asyncio.iscoroutinefunction(self.auth): |
|
|
|
authenticated = await self.auth(client_auth) |
|
|
|
else: |
|
|
|
authenticated = self.auth(client_auth) |
|
|
|
if not authenticated: |
|
|
|
raise ConnectionRefusedError('authentication failed') |
|
|
|
|
|
|
|
async def config(sid): |
|
|
|
await self.sio.sleep(0.1) |
|
|
@ -137,7 +144,6 @@ class InstrumentedAsyncServer: |
|
|
|
await self.sio.emit(event, data, to=room_filter, namespace=namespace) |
|
|
|
|
|
|
|
def admin_enter_room(self, _, namespace, room, room_filter=None): |
|
|
|
print(namespace, room, room_filter) |
|
|
|
for sid, _ in self.sio.manager.get_participants( |
|
|
|
namespace, room_filter): |
|
|
|
self.sio.enter_room(sid, room, namespace=namespace) |
|
|
@ -169,13 +175,16 @@ class InstrumentedAsyncServer: |
|
|
|
async def check_for_upgrade(): |
|
|
|
for _ in range(5): |
|
|
|
await self.sio.sleep(5) |
|
|
|
if self.sio.eio._get_socket(eio_sid).upgraded: |
|
|
|
await self.sio.emit('socket_updated', { |
|
|
|
'id': sid, |
|
|
|
'nsp': namespace, |
|
|
|
'transport': 'websocket', |
|
|
|
}, namespace=self.admin_namespace) |
|
|
|
break |
|
|
|
try: |
|
|
|
if self.sio.eio._get_socket(eio_sid).upgraded: |
|
|
|
await self.sio.emit('socket_updated', { |
|
|
|
'id': sid, |
|
|
|
'nsp': namespace, |
|
|
|
'transport': 'websocket', |
|
|
|
}, namespace=self.admin_namespace) |
|
|
|
break |
|
|
|
except KeyError: |
|
|
|
pass |
|
|
|
|
|
|
|
if serialized_socket['transport'] == 'polling': |
|
|
|
self.sio.start_background_task(check_for_upgrade) |
|
|
@ -212,18 +221,24 @@ class InstrumentedAsyncServer: |
|
|
|
))) |
|
|
|
return self.sio.manager.__leave_room(sid, namespace, room) |
|
|
|
|
|
|
|
async def _emit_internal(self, eio_sid, event, data, namespace=None, |
|
|
|
id=None): |
|
|
|
ret = await self.sio.__emit_internal(eio_sid, event, data, |
|
|
|
namespace=namespace, id=id) |
|
|
|
async def _emit(self, event, data, namespace, room=None, skip_sid=None, |
|
|
|
callback=None, **kwargs): |
|
|
|
ret = await self.sio.manager.__emit(event, data, namespace, room=room, |
|
|
|
skip_sid=skip_sid, callback=callback, |
|
|
|
**kwargs) |
|
|
|
if namespace != self.admin_namespace: |
|
|
|
sid = self.sio.manager.sid_from_eio_sid(eio_sid, namespace) |
|
|
|
await self.sio.emit('event_sent', ( |
|
|
|
namespace, |
|
|
|
sid, |
|
|
|
[event] + list(data) if isinstance(data, tuple) else [data], |
|
|
|
datetime.utcnow().isoformat() + 'Z', |
|
|
|
), namespace=self.admin_namespace) |
|
|
|
event_data = [event] + list(data) if isinstance(data, tuple) \ |
|
|
|
else [data] |
|
|
|
if not isinstance(skip_sid, list): |
|
|
|
skip_sid = [skip_sid] |
|
|
|
for sid, _ in self.sio.manager.get_participants(namespace, room): |
|
|
|
if sid not in skip_sid: |
|
|
|
await self.sio.emit('event_sent', ( |
|
|
|
namespace, |
|
|
|
sid, |
|
|
|
event_data, |
|
|
|
datetime.utcnow().isoformat() + 'Z', |
|
|
|
), namespace=self.admin_namespace) |
|
|
|
return ret |
|
|
|
|
|
|
|
async def _handle_event_internal(self, server, sid, eio_sid, data, |
|
|
@ -269,7 +284,7 @@ class InstrumentedAsyncServer: |
|
|
|
async def _wait(ws): |
|
|
|
ret = await ws.__wait() |
|
|
|
self.event_buffer.push('packetsIn') |
|
|
|
self.event_buffer.push('bytesIn', len(ret)) |
|
|
|
self.event_buffer.push('bytesIn', len(ret or '')) |
|
|
|
return ret |
|
|
|
|
|
|
|
ws.__send = ws.send |
|
|
|