pythonasyncioapiasyncfastapiframeworkjsonjson-schemaopenapiopenapi3pydanticpython-typespython3redocreststarletteswaggerswagger-uiuvicornweb
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
95 lines
2.5 KiB
95 lines
2.5 KiB
from typing import Union
|
|
|
|
from fastapi import Depends, FastAPI, HTTPException, status
|
|
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
|
from pydantic import BaseModel
|
|
from typing_extensions import Annotated
|
|
|
|
fake_users_db = {
|
|
"johndoe": {
|
|
"username": "johndoe",
|
|
"full_name": "John Doe",
|
|
"email": "[email protected]",
|
|
"hashed_password": "fakehashedsecret",
|
|
"disabled": False,
|
|
},
|
|
"alice": {
|
|
"username": "alice",
|
|
"full_name": "Alice Wonderson",
|
|
"email": "[email protected]",
|
|
"hashed_password": "fakehashedsecret2",
|
|
"disabled": True,
|
|
},
|
|
}
|
|
|
|
app = FastAPI()
|
|
|
|
|
|
def fake_hash_password(password: str):
|
|
return "fakehashed" + password
|
|
|
|
|
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
|
|
|
|
|
class User(BaseModel):
|
|
username: str
|
|
email: Union[str, None] = None
|
|
full_name: Union[str, None] = None
|
|
disabled: Union[bool, None] = None
|
|
|
|
|
|
class UserInDB(User):
|
|
hashed_password: str
|
|
|
|
|
|
def get_user(db, username: str):
|
|
if username in db:
|
|
user_dict = db[username]
|
|
return UserInDB(**user_dict)
|
|
|
|
|
|
def fake_decode_token(token):
|
|
# This doesn't provide any security at all
|
|
# Check the next version
|
|
user = get_user(fake_users_db, token)
|
|
return user
|
|
|
|
|
|
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
|
|
user = fake_decode_token(token)
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid authentication credentials",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
return user
|
|
|
|
|
|
async def get_current_active_user(
|
|
current_user: Annotated[User, Depends(get_current_user)],
|
|
):
|
|
if current_user.disabled:
|
|
raise HTTPException(status_code=400, detail="Inactive user")
|
|
return current_user
|
|
|
|
|
|
@app.post("/token")
|
|
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
|
|
user_dict = fake_users_db.get(form_data.username)
|
|
if not user_dict:
|
|
raise HTTPException(status_code=400, detail="Incorrect username or password")
|
|
user = UserInDB(**user_dict)
|
|
hashed_password = fake_hash_password(form_data.password)
|
|
if not hashed_password == user.hashed_password:
|
|
raise HTTPException(status_code=400, detail="Incorrect username or password")
|
|
|
|
return {"access_token": user.username, "token_type": "bearer"}
|
|
|
|
|
|
@app.get("/users/me")
|
|
async def read_users_me(
|
|
current_user: Annotated[User, Depends(get_current_active_user)],
|
|
):
|
|
return current_user
|
|
|