#!/usr/bin/env python
"""Example Socket.IO server running as an ASGI application under uvicorn.

The module builds an ``AsyncServer``, wraps it in an ``ASGIApp`` that maps
``/`` to ``app.html``, and registers handlers for a handful of demo events.
Every handler replies to clients with a ``my_response`` event.
"""
import uvicorn

import socketio

# set instrument to `True` to accept connections from the official Socket.IO
# Admin UI hosted at https://admin.socket.io
instrument = True
admin_login = {
    'username': 'admin',
    'password': 'python',  # change this to a strong secret for production use!
}

sio = socketio.AsyncServer(
    async_mode='asgi',
    # CORS origins are only configured explicitly when the admin UI is enabled.
    cors_allowed_origins=[
        'http://localhost:5000',
        'https://admin.socket.io',  # edit the allowed origins if necessary
    ] if instrument else None,
)
if instrument:
    sio.instrument(auth=admin_login)

app = socketio.ASGIApp(sio, static_files={
    '/': 'app.html',
})

# Set to True by the first 'connect' handler call so that the background
# task is started only once per process.
background_task_started = False


async def background_task():
    """Example of how to send server generated events to clients.

    Loops forever: sleeps 10 seconds via ``sio.sleep`` and then emits a
    ``my_response`` event without a ``room`` argument.
    """
    while True:
        await sio.sleep(10)
        await sio.emit('my_response', {'data': 'Server generated event'})


@sio.on('my_event')
async def test_message(sid, message):
    """Echo ``message['data']`` back to the sender only (``room=sid``)."""
    await sio.emit('my_response', {'data': message['data']}, room=sid)


@sio.on('my_broadcast_event')
async def test_broadcast_message(sid, message):
    """Emit ``message['data']`` as ``my_response`` with no room restriction."""
    await sio.emit('my_response', {'data': message['data']})


@sio.on('join')
async def join(sid, message):
    """Add the sender to ``message['room']`` and confirm it to the sender."""
    await sio.enter_room(sid, message['room'])
    await sio.emit('my_response', {'data': 'Entered room: ' + message['room']},
                   room=sid)


@sio.on('leave')
async def leave(sid, message):
    """Remove the sender from ``message['room']`` and confirm it to the sender."""
    await sio.leave_room(sid, message['room'])
    await sio.emit('my_response', {'data': 'Left room: ' + message['room']},
                   room=sid)


@sio.on('close room')
async def close(sid, message):
    """Announce to ``message['room']`` that it is closing, then close it."""
    # The notice is emitted before close_room() is called.
    await sio.emit('my_response',
                   {'data': 'Room ' + message['room'] + ' is closing.'},
                   room=message['room'])
    await sio.close_room(message['room'])


@sio.on('my_room_event')
async def send_room_message(sid, message):
    """Emit ``message['data']`` to the room named by ``message['room']``."""
    await sio.emit('my_response', {'data': message['data']},
                   room=message['room'])


@sio.on('disconnect request')
async def disconnect_request(sid):
    """Disconnect the client that sent the request."""
    await sio.disconnect(sid)


@sio.on('connect')
async def test_connect(sid, environ):
    """Start the background task on the first connection and greet the client."""
    global background_task_started
    if not background_task_started:
        sio.start_background_task(background_task)
        background_task_started = True
    await sio.emit('my_response', {'data': 'Connected', 'count': 0}, room=sid)


@sio.on('disconnect')
def test_disconnect(sid, reason):
    """Print the reason reported for a client's disconnection."""
    print('Client disconnected, reason:', reason)


if __name__ == '__main__':
    if instrument:
        print('The server is instrumented for remote administration.')
        print(
            'Use the official Socket.IO Admin UI at https://admin.socket.io '
            'with the following connection details:'
        )
        print(' - Server URL: http://localhost:5000')
        print(' - Username:', admin_login['username'])
        print(' - Password:', admin_login['password'])
        print('')
    uvicorn.run(app, host='127.0.0.1', port=5000)