From 9cc4177442943e454974f83e911e3ccfebd3cb74 Mon Sep 17 00:00:00 2001 From: Miguel Grinberg Date: Sat, 14 Oct 2023 20:23:48 +0100 Subject: [PATCH] concurrency fine tuning --- src/socketio/admin.py | 17 +++++++++++------ src/socketio/async_admin.py | 20 ++++++++++++++------ tests/async/test_admin.py | 2 +- tests/common/test_admin.py | 2 +- 4 files changed, 27 insertions(+), 14 deletions(-) diff --git a/src/socketio/admin.py b/src/socketio/admin.py index eb14475..2973404 100644 --- a/src/socketio/admin.py +++ b/src/socketio/admin.py @@ -51,14 +51,13 @@ class InstrumentedServer: self.mode = mode self.event_buffer = EventBuffer() + # task that emits "server_stats" every 2 seconds + self.stop_stats_event = None + self.stats_task = None + # monkey-patch the server to report metrics to the admin UI self.instrument() - # start thread that emits "server_stats" every 2 seconds - self.stop_stats_event = sio.eio.create_event() - self.stats_task = self.sio.start_background_task( - self._emit_server_stats) - def instrument(self): self.sio.on('connect', self.admin_connect, namespace=self.admin_namespace) @@ -141,7 +140,8 @@ class InstrumentedServer: from engineio.socket import Socket Socket.handle_post_request = Socket.__handle_post_request Socket._websocket_handler = Socket.__websocket_handler - Socket.send_ping = Socket.__send_ping + if self.mode == 'development': + Socket._send_ping = Socket.__send_ping def admin_connect(self, sid, environ, client_auth): if self.auth: @@ -294,6 +294,11 @@ class InstrumentedServer: return ret def _handle_eio_connect(self, eio_sid, environ): + if self.stop_stats_event is None: + self.stop_stats_event = self.sio.eio.create_event() + self.stats_task = self.sio.start_background_task( + self._emit_server_stats) + self.event_buffer.push('rawConnection') return self.sio._handle_eio_connect(eio_sid, environ) diff --git a/src/socketio/async_admin.py b/src/socketio/async_admin.py index 5cb638e..b8bb0ed 100644 --- a/src/socketio/async_admin.py +++ b/src/socketio/async_admin.py @@ -32,14 +32,13 @@ class InstrumentedAsyncServer: self.admin_queue = [] self.event_buffer = EventBuffer() + # task that emits "server_stats" every 2 seconds + self.stop_stats_event = None + self.stats_task = None + # monkey-patch the server to report metrics to the admin UI self.instrument() - # start thread that emits "server_stats" every 2 seconds - self.stop_stats_event = sio.eio.create_event() - self.stats_task = self.sio.start_background_task( - self._emit_server_stats) - def instrument(self): self.sio.on('connect', self.admin_connect, namespace=self.admin_namespace) @@ -122,7 +121,8 @@ class InstrumentedAsyncServer: from engineio.async_socket import AsyncSocket AsyncSocket.handle_post_request = AsyncSocket.__handle_post_request AsyncSocket._websocket_handler = AsyncSocket.__websocket_handler - AsyncSocket.schedule_ping = AsyncSocket.__schedule_ping + if self.mode == 'development': + AsyncSocket._send_ping = AsyncSocket.__send_ping async def admin_connect(self, sid, environ, client_auth): authenticated = True @@ -165,6 +165,9 @@ class InstrumentedAsyncServer: namespace=self.admin_namespace) self.sio.start_background_task(config, sid) + self.stop_stats_event = self.sio.eio.create_event() + self.stats_task = self.sio.start_background_task( + self._emit_server_stats) async def admin_emit(self, _, namespace, room_filter, event, *data): await self.sio.emit(event, data, to=room_filter, namespace=namespace) @@ -280,6 +283,11 @@ class InstrumentedAsyncServer: return ret async def _handle_eio_connect(self, eio_sid, environ): + if self.stop_stats_event is None: + self.stop_stats_event = self.sio.eio.create_event() + self.stats_task = self.sio.start_background_task( + self._emit_server_stats) + self.event_buffer.push('rawConnection') return await self.sio._handle_eio_connect(eio_sid, environ) diff --git a/tests/async/test_admin.py b/tests/async/test_admin.py index 2822fd1..cdc93ba 100644 --- a/tests/async/test_admin.py +++ b/tests/async/test_admin.py @@ -24,7 +24,6 @@ def with_instrumented_server(auth=False, **ikwargs): @wraps(f) def wrapped(self, *args, **kwargs): sio = socketio.AsyncServer(async_mode='asgi') - instrumented_server = sio.instrument(auth=auth, **ikwargs) @sio.event async def enter_room(sid, data): @@ -42,6 +41,7 @@ def with_instrumented_server(auth=False, **ikwargs): await instrumented_server.shutdown() await sio.shutdown() + instrumented_server = sio.instrument(auth=auth, **ikwargs) server = SocketIOWebServer(sio, on_shutdown=shutdown) server.start() diff --git a/tests/common/test_admin.py b/tests/common/test_admin.py index 037b547..e65d3cf 100644 --- a/tests/common/test_admin.py +++ b/tests/common/test_admin.py @@ -20,7 +20,6 @@ def with_instrumented_server(auth=False, **ikwargs): @wraps(f) def wrapped(self, *args, **kwargs): sio = socketio.Server(async_mode='threading') - instrumented_server = sio.instrument(auth=auth, **ikwargs) @sio.event def enter_room(sid, data): @@ -34,6 +33,7 @@ def with_instrumented_server(auth=False, **ikwargs): def connect(sid, environ, auth): pass + instrumented_server = sio.instrument(auth=auth, **ikwargs) server = SocketIOWebServer(sio) server.start()