|
|
@ -2,12 +2,7 @@ import binascii |
|
|
|
from base64 import b64decode |
|
|
|
from typing import Optional |
|
|
|
|
|
|
|
from fastapi.security.base import SecurityBase |
|
|
|
from fastapi import HTTPException, status, Depends |
|
|
|
from fastapi.security.utils import get_authorization_scheme_param |
|
|
|
from starlette.requests import Request |
|
|
|
from starlette.status import HTTP_401_UNAUTHORIZED |
|
|
|
from typing import Optional, Tuple |
|
|
|
from fastapi import HTTPException |
|
|
|
from fastapi.exceptions import HTTPException |
|
|
|
from fastapi.openapi.models import HTTPBase as HTTPBaseModel |
|
|
|
from fastapi.openapi.models import HTTPBearer as HTTPBearerModel |
|
|
@ -70,17 +65,20 @@ class HTTPAuthorizationCredentials(BaseModel): |
|
|
|
""" |
|
|
|
), |
|
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
class HTTPBasicCredentials(BaseModel): |
|
|
|
username: str |
|
|
|
password: str |
|
|
|
|
|
|
|
|
|
|
|
class HTTPBasic(SecurityBase): |
|
|
|
def __init__( |
|
|
|
self, |
|
|
|
*, |
|
|
|
scheme_name: Optional[str] = None, |
|
|
|
realm: str, # REQUIRED parameter (RFC 7617 compliance) |
|
|
|
auto_error: bool = True |
|
|
|
auto_error: bool = True, |
|
|
|
): |
|
|
|
self.scheme_name = scheme_name or self.__class__.__name__ |
|
|
|
self.realm = realm |
|
|
@ -89,7 +87,7 @@ class HTTPBasic(SecurityBase): |
|
|
|
async def __call__(self, request: Request) -> Optional[HTTPBasicCredentials]: |
|
|
|
authorization_header = request.headers.get("Authorization") |
|
|
|
scheme, credentials = get_authorization_scheme_param(authorization_header) |
|
|
|
|
|
|
|
|
|
|
|
# Handle missing/invalid scheme |
|
|
|
if not authorization_header or scheme.lower() != "basic": |
|
|
|
if self.auto_error: |
|
|
@ -99,7 +97,7 @@ class HTTPBasic(SecurityBase): |
|
|
|
detail="Invalid authentication credentials", |
|
|
|
) |
|
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
# Decode credentials (base64) |
|
|
|
try: |
|
|
|
decoded = base64.b64decode(credentials).decode("utf-8") |
|
|
|