#!/usr/bin/env python # set instrument to `True` to accept connections from the official Socket.IO # Admin UI hosted at https://admin.socket.io instrument = True admin_login = { "username": "admin", "password": "python", # change this to a strong secret for production use! } import uvicorn import socketio sio = socketio.AsyncServer( async_mode="asgi", cors_allowed_origins=None if not instrument else [ "http://localhost:5000", "https://admin.socket.io", # edit the allowed origins if necessary ], ) if instrument: sio.instrument(auth=admin_login) app = socketio.ASGIApp( sio, static_files={ "/": "app.html", }, ) background_task_started = False async def background_task(): """Example of how to send server generated events to clients.""" count = 0 while True: await sio.sleep(10) count += 1 await sio.emit("my_response", {"data": "Server generated event"}) @sio.on("my_event") async def test_message(sid, message): await sio.emit("my_response", {"data": message["data"]}, room=sid) @sio.on("my_broadcast_event") async def test_broadcast_message(sid, message): await sio.emit("my_response", {"data": message["data"]}) @sio.on("join") async def join(sid, message): await sio.enter_room(sid, message["room"]) await sio.emit( "my_response", {"data": "Entered room: " + message["room"]}, room=sid ) @sio.on("leave") async def leave(sid, message): await sio.leave_room(sid, message["room"]) await sio.emit("my_response", {"data": "Left room: " + message["room"]}, room=sid) @sio.on("close room") async def close(sid, message): await sio.emit( "my_response", {"data": "Room " + message["room"] + " is closing."}, room=message["room"], ) await sio.close_room(message["room"]) @sio.on("my_room_event") async def send_room_message(sid, message): await sio.emit("my_response", {"data": message["data"]}, room=message["room"]) @sio.on("disconnect request") async def disconnect_request(sid): await sio.disconnect(sid) @sio.on("connect") async def test_connect(sid, environ): global background_task_started if not background_task_started: sio.start_background_task(background_task) background_task_started = True await sio.emit("my_response", {"data": "Connected", "count": 0}, room=sid) @sio.on("disconnect") def test_disconnect(sid, reason): print("Client disconnected, reason:", reason) if __name__ == "__main__": if instrument: print("The server is instrumented for remote administration.") print( "Use the official Socket.IO Admin UI at https://admin.socket.io " "with the following connection details:" ) print(" - Server URL: http://localhost:5000") print(" - Username:", admin_login["username"]) print(" - Password:", admin_login["password"]) print("") uvicorn.run(app, host="127.0.0.1", port=5000)