import asyncio
import hmac
import secrets
from functools import wraps
from random import randint
from time import time

from fastapi.exceptions import HTTPException
from fastapi.requests import Request
from fastapi.responses import Response

from logger import logger
from utils import md5hash


class NotValidCode(Exception):
    """Raised by ``AuthManager.accept_code`` for an unknown or expired code."""


class AuthManager:
    """Cookie-based request authentication plus a store of one-time codes.

    A client is considered authenticated when it presents two cookies:
    ``NUM`` (the number as text) and ``NUM_SECURED`` (``md5hash`` of that text
    concatenated with the configured salt).

    One-time codes are kept in ``code_store``, a dict mapping the integer code
    to ``{"code": ..., "num": ..., "ts": ...}`` where ``ts`` is the ``time()``
    at which the code was issued.
    """

    # Cookie names.
    NUM = "NUM"
    NUM_SECURED = "NUM_SECURED"
    SECRET_KEY = "SECRET_KEY"  # TODO: not referenced anywhere in this class yet
    # Seconds a one-time code stays valid after it was issued.
    MAX_CODE_LIFE = 180
    # Inclusive range one-time codes are drawn from (four decimal digits).
    CODE_MIN = 1000
    CODE_MAX = 9999

    def __init__(self, args):
        """Read ``web_salt`` and ``web_auth_enable`` from *args*.

        Args:
            args: Object exposing ``web_salt`` (concatenated with the number
                text before hashing) and ``web_auth_enable`` (when falsy,
                ``authRequest`` lets every request through).
        """
        self.salt = args.web_salt
        self.enable = args.web_auth_enable
        self.code_store = {}

    def _signature(self, num_text):
        """Return the value expected in the ``NUM_SECURED`` cookie for *num_text*."""
        # NOTE(review): salted MD5 is a weak signature scheme; switching to an
        # HMAC would invalidate cookies already issued, so it is only flagged.
        return md5hash(num_text + self.salt)

    def _cookie_is_valid(self, num_text, provided):
        """Return True when *provided* is the signature of *num_text*."""
        expected = self._signature(num_text)
        # Constant-time comparison: the cookie comes from the client. Both
        # sides are encoded so non-ASCII cookie text cannot raise TypeError.
        return hmac.compare_digest(
            expected.encode("utf-8"), provided.encode("utf-8")
        )

    def _is_expired(self, entry, now):
        """Return True when *entry* was issued more than MAX_CODE_LIFE ago."""
        return now - entry["ts"] > self.MAX_CODE_LIFE

    def _purge_expired(self):
        """Delete every expired entry from ``code_store``."""
        now = time()
        # Iterate over a snapshot so entries can be removed while looping.
        for code, entry in list(self.code_store.items()):
            if self._is_expired(entry, now):
                logger.info(f"Code {code} is ended")
                self.code_store.pop(code, None)

    async def storeCleaner(self):
        """Purge expired codes once per second until the task is cancelled.

        Cancellation is absorbed: the coroutine returns normally instead of
        propagating ``CancelledError``. Errors raised by a sweep are logged
        and the loop keeps running.
        """
        logger.info("Code store cleaner working...")
        while True:
            try:
                self._purge_expired()
            except Exception as exc:
                logger.error(f"Cannot check code store: {exc!r}")
            # Sleep outside the sweep's try block so a failing sweep cannot
            # turn this loop into a busy spin that starves the event loop.
            try:
                await asyncio.sleep(1)
            except asyncio.CancelledError:
                return

    def authRequest(self, method=None):  # TODO: cookie or secret_key
        """Build a decorator that requires valid auth cookies on an endpoint.

        The decorated coroutine must receive the request as the keyword
        argument ``request``.

        Args:
            method: Currently unused; kept for interface compatibility.

        Returns:
            A decorator for async endpoint functions.

        Raises (from the wrapped endpoint):
            HTTPException: 500 when the ``request`` keyword argument is
                missing, 401 when the cookies are absent or do not match.
        """

        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                if not self.enable:
                    logger.info("AUTH IS DISABLE")
                    return await func(*args, **kwargs)

                request: Request = kwargs.get("request", None)
                if request is None:
                    raise HTTPException(
                        status_code=500,
                        detail="Authed endpoint need request arg, but is missing",
                    )

                num_text = request.cookies.get(self.NUM)
                provided = request.cookies.get(self.NUM_SECURED)
                # Empty cookies (as written by setAuth(clear=True)) are
                # rejected here.
                if num_text and provided and self._cookie_is_valid(num_text, provided):
                    return await func(*args, **kwargs)
                raise HTTPException(status_code=401)

            return wrapper

        return decorator

    def setAuth(self, response: Response, num: int, clear=False):
        """Write (or blank out) the auth cookies on *response*.

        Args:
            response: Response to set the cookies on.
            num: Number to store in ``NUM`` and sign into ``NUM_SECURED``.
                Ignored when *clear* is true.
            clear: When true, both cookies are set to the empty string.

        Returns:
            The same *response* object.
        """
        if clear:
            response.set_cookie(self.NUM, "")
            response.set_cookie(self.NUM_SECURED, "")
        else:
            num_text = str(num)
            response.set_cookie(self.NUM, num_text)
            response.set_cookie(self.NUM_SECURED, self._signature(num_text))
        return response

    def request_auth(self, num: int):
        """Issue a fresh one-time code for *num* and remember it.

        Args:
            num: Value returned later by ``accept_code`` for this code.

        Returns:
            An integer in ``[CODE_MIN, CODE_MAX]`` that is not currently in
            ``code_store``.

        Raises:
            RuntimeError: When every code in the range is already pending
                (the retry loop below could otherwise never terminate).
        """
        span = self.CODE_MAX - self.CODE_MIN + 1
        if len(self.code_store) >= span:
            raise RuntimeError("No free auth codes available")

        # `secrets` rather than `random`: the code acts as a credential.
        code = secrets.randbelow(span) + self.CODE_MIN
        while code in self.code_store:
            code = secrets.randbelow(span) + self.CODE_MIN

        self.code_store[code] = {
            "code": code,
            "num": num,
            "ts": time(),
        }
        return code

    def accept_code(self, code: int):
        """Consume *code* and return the number it was issued for.

        The code is removed from the store whether or not it is still valid,
        so each code can be presented at most once.

        Raises:
            NotValidCode: When *code* is not in the store, or was issued more
                than ``MAX_CODE_LIFE`` seconds ago (the periodic cleaner may
                not have removed it yet, or may not be running).
        """
        try:
            entry = self.code_store.pop(code)
        except KeyError:
            raise NotValidCode() from None
        if self._is_expired(entry, time()):
            raise NotValidCode()
        return entry["num"]