Browse Source

concurrency fine tuning

pull/1164/head
Miguel Grinberg 2 years ago
parent
commit
9cc4177442
Failed to extract signature
  1. 17
      src/socketio/admin.py
  2. 20
      src/socketio/async_admin.py
  3. 2
      tests/async/test_admin.py
  4. 2
      tests/common/test_admin.py

17
src/socketio/admin.py

@ -51,14 +51,13 @@ class InstrumentedServer:
self.mode = mode
self.event_buffer = EventBuffer()
# task that emits "server_stats" every 2 seconds
self.stop_stats_event = None
self.stats_task = None
# monkey-patch the server to report metrics to the admin UI
self.instrument()
# start thread that emits "server_stats" every 2 seconds
self.stop_stats_event = sio.eio.create_event()
self.stats_task = self.sio.start_background_task(
self._emit_server_stats)
def instrument(self):
self.sio.on('connect', self.admin_connect,
namespace=self.admin_namespace)
@ -141,7 +140,8 @@ class InstrumentedServer:
from engineio.socket import Socket
Socket.handle_post_request = Socket.__handle_post_request
Socket._websocket_handler = Socket.__websocket_handler
Socket.send_ping = Socket.__send_ping
if self.mode == 'development':
Socket._send_ping = Socket.__send_ping
def admin_connect(self, sid, environ, client_auth):
if self.auth:
@ -294,6 +294,11 @@ class InstrumentedServer:
return ret
def _handle_eio_connect(self, eio_sid, environ):
if self.stop_stats_event is None:
self.stop_stats_event = self.sio.eio.create_event()
self.stats_task = self.sio.start_background_task(
self._emit_server_stats)
self.event_buffer.push('rawConnection')
return self.sio._handle_eio_connect(eio_sid, environ)

20
src/socketio/async_admin.py

@ -32,14 +32,13 @@ class InstrumentedAsyncServer:
self.admin_queue = []
self.event_buffer = EventBuffer()
# task that emits "server_stats" every 2 seconds
self.stop_stats_event = None
self.stats_task = None
# monkey-patch the server to report metrics to the admin UI
self.instrument()
# start thread that emits "server_stats" every 2 seconds
self.stop_stats_event = sio.eio.create_event()
self.stats_task = self.sio.start_background_task(
self._emit_server_stats)
def instrument(self):
self.sio.on('connect', self.admin_connect,
namespace=self.admin_namespace)
@ -122,7 +121,8 @@ class InstrumentedAsyncServer:
from engineio.async_socket import AsyncSocket
AsyncSocket.handle_post_request = AsyncSocket.__handle_post_request
AsyncSocket._websocket_handler = AsyncSocket.__websocket_handler
AsyncSocket.schedule_ping = AsyncSocket.__schedule_ping
if self.mode == 'development':
AsyncSocket._send_ping = AsyncSocket.__send_ping
async def admin_connect(self, sid, environ, client_auth):
authenticated = True
@ -165,6 +165,9 @@ class InstrumentedAsyncServer:
namespace=self.admin_namespace)
self.sio.start_background_task(config, sid)
self.stop_stats_event = self.sio.eio.create_event()
self.stats_task = self.sio.start_background_task(
self._emit_server_stats)
async def admin_emit(self, _, namespace, room_filter, event, *data):
await self.sio.emit(event, data, to=room_filter, namespace=namespace)
@ -280,6 +283,11 @@ class InstrumentedAsyncServer:
return ret
async def _handle_eio_connect(self, eio_sid, environ):
if self.stop_stats_event is None:
self.stop_stats_event = self.sio.eio.create_event()
self.stats_task = self.sio.start_background_task(
self._emit_server_stats)
self.event_buffer.push('rawConnection')
return await self.sio._handle_eio_connect(eio_sid, environ)

2
tests/async/test_admin.py

@ -24,7 +24,6 @@ def with_instrumented_server(auth=False, **ikwargs):
@wraps(f)
def wrapped(self, *args, **kwargs):
sio = socketio.AsyncServer(async_mode='asgi')
instrumented_server = sio.instrument(auth=auth, **ikwargs)
@sio.event
async def enter_room(sid, data):
@ -42,6 +41,7 @@ def with_instrumented_server(auth=False, **ikwargs):
await instrumented_server.shutdown()
await sio.shutdown()
instrumented_server = sio.instrument(auth=auth, **ikwargs)
server = SocketIOWebServer(sio, on_shutdown=shutdown)
server.start()

2
tests/common/test_admin.py

@ -20,7 +20,6 @@ def with_instrumented_server(auth=False, **ikwargs):
@wraps(f)
def wrapped(self, *args, **kwargs):
sio = socketio.Server(async_mode='threading')
instrumented_server = sio.instrument(auth=auth, **ikwargs)
@sio.event
def enter_room(sid, data):
@ -34,6 +33,7 @@ def with_instrumented_server(auth=False, **ikwargs):
def connect(sid, environ, auth):
pass
instrumented_server = sio.instrument(auth=auth, **ikwargs)
server = SocketIOWebServer(sio)
server.start()

Loading…
Cancel
Save