You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

114 lines
3.0 KiB

#!/usr/bin/env python
# set instrument to `True` to accept connections from the official Socket.IO
# Admin UI hosted at https://admin.socket.io
instrument = True
admin_login = {
"username": "admin",
"password": "python", # change this to a strong secret for production use!
}
import uvicorn
import socketio
sio = socketio.AsyncServer(
async_mode="asgi",
cors_allowed_origins=None
if not instrument
else [
"http://localhost:5000",
"https://admin.socket.io", # edit the allowed origins if necessary
],
)
if instrument:
sio.instrument(auth=admin_login)
app = socketio.ASGIApp(
sio,
static_files={
"/": "app.html",
},
)
background_task_started = False
async def background_task():
"""Example of how to send server generated events to clients."""
count = 0
while True:
await sio.sleep(10)
count += 1
await sio.emit("my_response", {"data": "Server generated event"})
@sio.on("my_event")
async def test_message(sid, message):
await sio.emit("my_response", {"data": message["data"]}, room=sid)
@sio.on("my_broadcast_event")
async def test_broadcast_message(sid, message):
await sio.emit("my_response", {"data": message["data"]})
@sio.on("join")
async def join(sid, message):
await sio.enter_room(sid, message["room"])
await sio.emit(
"my_response", {"data": "Entered room: " + message["room"]}, room=sid
)
@sio.on("leave")
async def leave(sid, message):
await sio.leave_room(sid, message["room"])
await sio.emit("my_response", {"data": "Left room: " + message["room"]}, room=sid)
@sio.on("close room")
async def close(sid, message):
await sio.emit(
"my_response",
{"data": "Room " + message["room"] + " is closing."},
room=message["room"],
)
await sio.close_room(message["room"])
@sio.on("my_room_event")
async def send_room_message(sid, message):
await sio.emit("my_response", {"data": message["data"]}, room=message["room"])
@sio.on("disconnect request")
async def disconnect_request(sid):
await sio.disconnect(sid)
@sio.on("connect")
async def test_connect(sid, environ):
global background_task_started
if not background_task_started:
sio.start_background_task(background_task)
background_task_started = True
await sio.emit("my_response", {"data": "Connected", "count": 0}, room=sid)
@sio.on("disconnect")
def test_disconnect(sid, reason):
print("Client disconnected, reason:", reason)
if __name__ == "__main__":
if instrument:
print("The server is instrumented for remote administration.")
print(
"Use the official Socket.IO Admin UI at https://admin.socket.io "
"with the following connection details:"
)
print(" - Server URL: http://localhost:5000")
print(" - Username:", admin_login["username"])
print(" - Password:", admin_login["password"])
print("")
uvicorn.run(app, host="127.0.0.1", port=5000)