You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

66 lines
2.3 KiB

from typing import List, Optional
from fastapi.openapi.models import OAuth2 as OAuth2Model, OAuthFlows as OAuthFlowsModel
from fastapi.security.base import SecurityBase
from pydantic import BaseModel, Schema
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.status import HTTP_403_FORBIDDEN
class OAuth2PasswordRequestData(BaseModel):
    """Parsed payload of an OAuth2 password-flow token request.

    Same fields as ``OAuth2PasswordRequestForm``, except that ``scope`` is
    held as a list of individual scope strings rather than one string.
    """

    # Defaults to the password grant.
    grant_type: str = "password"
    # User credentials (both required).
    username: str
    password: str
    # Individual scope names; None when no scopes were supplied.
    scope: Optional[List[str]] = None
    # Client ID and secret might come from headers
    client_id: Optional[str] = None
    client_secret: Optional[str] = None
class OAuth2PasswordRequestForm(BaseModel):
    """Raw form of an OAuth2 password-flow token request.

    ``scope`` is received as a single whitespace-separated string; call
    :meth:`parse` to obtain an ``OAuth2PasswordRequestData`` whose ``scope``
    is a list of individual scope names.
    """

    # Required, and must be exactly "password". The pattern is anchored on
    # both ends: an unanchored "password" would also accept values that only
    # start with it (e.g. "password_x").
    grant_type: str = Schema(..., regex="^password$")
    username: str
    password: str
    # Whitespace-separated scope names; empty string means "no scopes".
    scope: str = ""
    # Client ID and secret might come from headers
    client_id: Optional[str] = None
    client_secret: Optional[str] = None

    def parse(self) -> OAuth2PasswordRequestData:
        """Return the form as ``OAuth2PasswordRequestData``.

        All fields are copied unchanged except ``scope``, which is split on
        whitespace into a list (an empty string yields an empty list).
        """
        return OAuth2PasswordRequestData(
            grant_type=self.grant_type,
            username=self.username,
            password=self.password,
            scope=self.scope.split(),
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
class OAuth2(SecurityBase):
    """Generic OAuth2 security scheme.

    Stores an ``OAuth2Model`` built from the given flows and, when called
    with a request, hands back the raw ``Authorization`` header value.
    """

    def __init__(
        self, *, flows: OAuthFlowsModel = OAuthFlowsModel(), scheme_name: str = None
    ):
        """Build the scheme model.

        Args:
            flows: OAuth2 flows description wrapped into ``self.model``.
            scheme_name: Name for the scheme; when empty or omitted, the
                name of the concrete class is used instead.
        """
        # Resolve the fallback name first; subclasses get their own class name.
        if not scheme_name:
            scheme_name = self.__class__.__name__
        self.model = OAuth2Model(flows=flows)
        self.scheme_name = scheme_name

    async def __call__(self, request: Request) -> str:
        """Return the request's ``Authorization`` header value, unmodified.

        NOTE: the lookup uses ``headers.get``, so the result is ``None`` when
        the header is absent, despite the ``str`` return annotation.
        """
        header_value = request.headers.get("Authorization")
        return header_value
class OAuth2PasswordBearer(OAuth2):
    """OAuth2 password flow whose dependency result is the bearer token.

    Calling an instance with a request extracts the token from an
    ``Authorization: Bearer <token>`` header, or raises HTTP 403 when no
    usable bearer credentials are present.
    """

    def __init__(
        self,
        tokenUrl: str,
        scheme_name: Optional[str] = None,
        scopes: Optional[dict] = None,
    ):
        """Declare a password flow.

        Args:
            tokenUrl: URL of the token endpoint, stored in the flow model.
            scheme_name: Optional scheme name, forwarded to ``OAuth2``.
            scopes: Scopes mapping for the flow; ``None`` or empty means
                no scopes (an empty dict is stored).
        """
        flows = OAuthFlowsModel(
            password={"tokenUrl": tokenUrl, "scopes": scopes or {}}
        )
        super().__init__(flows=flows, scheme_name=scheme_name)

    async def __call__(self, request: Request) -> str:
        """Return the bearer token carried by the ``Authorization`` header.

        Raises:
            HTTPException: 403 "Not authenticated" when the header is
                missing, uses a scheme other than Bearer, or has no token.
        """
        authorization = request.headers.get("Authorization")
        # Split into "<scheme> <credentials>". A substring test for "Bearer "
        # would accept headers such as "Basic Bearer x", and str.replace would
        # strip every "Bearer " occurrence, corrupting the credentials.
        parts = (authorization or "").split(None, 1)
        # Auth scheme names are case-insensitive, so compare lower-cased.
        if len(parts) != 2 or parts[0].lower() != "bearer":
            raise HTTPException(
                status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
            )
        return parts[1].strip()