"""Small FastAPI backend that proxies A2S and RCON queries to game servers."""

import argparse
import dataclasses
import logging
import os
import traceback
from time import time
from typing import Optional, Tuple

from a2s import info as VALVE_SERVER_INFO
from a2s import players as VALVE_SERVER_PLAYERS
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import JSONResponse, Response
from pydantic import BaseModel

# Alternative RCON client, kept for reference:
# from rcon.source import rcon as VALVE_SERVER_RCON
from custom_rcon import rcon as VALVE_SERVER_RCON
from RCONPlayerModel import *

_logger = logging.getLogger(__name__)


def _record_to_dict(record):
    """Convert a single a2s result record into a plain ``dict``.

    NOTE(review): assumes a2s records are either dataclass instances or
    iterables of ``(field, value)`` pairs, depending on the installed a2s
    version -- confirm against the pinned release.
    """
    if dataclasses.is_dataclass(record) and not isinstance(record, type):
        return dataclasses.asdict(record)
    return dict(record)


class A2S_request(BaseModel):
    """Request body naming the target server as a ``"host:port"`` string."""

    server: Optional[str] = None

    @property
    def address(self) -> str:
        """Host part: everything before the first ``:`` of ``server``."""
        return self.server.split(":")[0]

    @property
    def port(self) -> int:
        """Port part: the second ``:``-separated field of ``server`` as an int.

        Raises ``IndexError`` when ``server`` has no ``:`` and ``ValueError``
        when the port field is not numeric.
        """
        return int(self.server.split(":")[1])

    @property
    def tuple_address(self) -> Tuple[str, int]:
        """``(address, port)`` tuple, the form passed to the a2s query calls."""
        return (self.address, self.port)


class RCON_Request(A2S_request):
    """Request body for RCON endpoints: server plus password and command."""

    password: Optional[str] = None
    command: Optional[str] = None

    @property
    def fulled_data(self) -> bool:
        """``True`` only when server, password and command are all non-empty."""
        return bool(self.server and self.password and self.command)


class EndpointFilter(logging.Filter):
    """Logging filter that drops every record whose message mentions ``/api``."""

    def filter(self, record: logging.LogRecord) -> bool:
        return "/api" not in record.getMessage()


class SourceBackend:
    """Owns the FastAPI application and registers its ``/api`` routes."""

    app: Optional[FastAPI] = None

    def __init__(self):
        self.app = FastAPI()
        self.setup_routes()

    def run(self, host="0.0.0.0", port=45353):
        """Serve the application with uvicorn on ``host``:``port``.

        The uvicorn access logger gets an ``EndpointFilter`` so that access
        log lines containing ``/api`` are suppressed.
        """
        import uvicorn

        logging.getLogger("uvicorn.access").addFilter(EndpointFilter())
        uvicorn.run(self.app, host=host, port=port)

    def setup_routes(self):
        """Register all HTTP endpoints on ``self.app``."""
        app = self.app

        @app.get("/api/ping")
        async def get_ping():
            """Liveness probe returning the current Unix time in seconds."""
            return {"pong": int(time())}

        @app.post("/api/rcon")
        async def execute_rcon(request: RCON_Request):
            """Run ``request.command`` over RCON and return its output.

            Responds 409 when server, password or command is missing and 400
            when the RCON call fails.
            """
            if not request.fulled_data:
                return Response(status_code=409)
            try:
                return await VALVE_SERVER_RCON(
                    request.command,
                    host=request.address,
                    port=request.port,
                    passwd=request.password,
                )
            except Exception:
                # Only the server is logged: the request also carries the
                # RCON password, which must not end up in the logs.
                _logger.exception("RCON command failed for server %r", request.server)
                return Response(status_code=400)

        @app.post("/api/a2s/info")
        async def get_a2s_info(request: A2S_request):
            """Return the A2S info record of the server as a JSON object.

            Responds 409 when the server (or its host part) is missing and
            400 when the address cannot be parsed or the query fails.
            """
            # Check ``server`` first: ``address`` would raise on ``None``.
            if not request.server or not request.address:
                return Response(status_code=409)
            try:
                # The a2s call is invoked synchronously elsewhere in this
                # file, so run it in the thread pool to keep the event loop
                # free while the query is in flight.
                server_info = await run_in_threadpool(
                    VALVE_SERVER_INFO, request.tuple_address
                )
                return JSONResponse(content=_record_to_dict(server_info))
            except Exception:
                _logger.exception("A2S info query failed for server %r", request.server)
                return Response(status_code=400)

        @app.post("/api/a2s/players")
        async def get_a2s_players(request: A2S_request):
            """Return the A2S player list as a JSON array of objects.

            Responds 409 when the server (or its host part) is missing and
            400 when the address cannot be parsed or the query fails.
            """
            if not request.server or not request.address:
                return Response(status_code=409)
            try:
                roster = await run_in_threadpool(
                    VALVE_SERVER_PLAYERS, request.tuple_address
                )
                # Convert each player record individually; calling dict() on
                # the whole list cannot produce a meaningful mapping.
                return JSONResponse(
                    content=[_record_to_dict(player) for player in roster]
                )
            except Exception:
                _logger.exception(
                    "A2S players query failed for server %r", request.server
                )
                return Response(status_code=400)

        @app.post("/api/players")
        async def get_players(request: RCON_Request):
            """Return players parsed from RCON ``status`` with A2S scores added.

            Responds 403 without a password, 409 without a server, 400 when
            the A2S query fails or the ``status`` output has no player table,
            and 500 when the ``status`` output cannot be decoded.
            """
            if request.password is None:
                return Response(status_code=403)
            if not request.server:
                return Response(status_code=409)

            try:
                a2s_players = await run_in_threadpool(
                    VALVE_SERVER_PLAYERS, request.tuple_address
                )
            except Exception:
                _logger.exception(
                    "A2S players query failed for server %r", request.server
                )
                return Response(status_code=400)

            try:
                status_lines: str = await VALVE_SERVER_RCON(
                    "status",
                    host=request.address,
                    port=request.port,
                    passwd=request.password,
                )
            except UnicodeDecodeError as err:
                _logger.error(
                    "Cannot execute 'status' on %s:%s, error: %s",
                    request.address,
                    request.port,
                    err,
                )
                return Response(status_code=500)

            # The player table starts at the "# userid" header line.
            header_at = (
                status_lines.find("# userid") if isinstance(status_lines, str) else -1
            )
            if header_at == -1:
                return Response(status_code=400)

            players: list[dict] = []
            # [1:] skips the table header line itself.
            for line in status_lines[header_at:].split("\n")[1:]:
                if not line:
                    continue
                try:
                    players.append(RCONPlayer(line).__dict__)
                except (BotPlayer, ValueError, IndexError):
                    # Bots and rows that do not parse are deliberately left out.
                    pass
                except Exception:
                    _logger.exception("Unexpected error parsing status line %r", line)

            # Index the first RCON row for every name so each A2S player is
            # matched in O(1) instead of rescanning the whole list.
            first_by_name: dict = {}
            for rcon_player in players:
                first_by_name.setdefault(rcon_player.get("name"), rcon_player)

            for a2s_player in a2s_players:
                matched = first_by_name.get(a2s_player.name)
                if matched is not None:
                    matched["score"] = a2s_player.score

            return JSONResponse(content=players)


if __name__ == "__main__":
    SourceBackend().run(port=8082)