You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

53 lines
1.3 KiB

from typing import Union

from couchbase import LOCKMODE_WAIT
from couchbase.bucket import Bucket
from couchbase.cluster import Cluster, PasswordAuthenticator
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
# Value stored in the ``type`` field of user-profile documents; used as the
# default for ``UserInDB.type``.
USERPROFILE_DOC_TYPE = "userprofile"
def get_bucket():
    """Connect to the Couchbase cluster and return the opened bucket.

    A new ``Cluster`` is created, authenticated and opened on every call.

    NOTE(review): the host, credentials and bucket name are hard-coded
    literals here; presumably placeholders to be replaced by configuration.

    Returns:
        The ``Bucket`` opened with ``lockmode=LOCKMODE_WAIT`` and with its
        ``timeout`` and ``n1ql_timeout`` attributes set.
    """
    couchbase_cluster = Cluster(
        "couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300"
    )
    couchbase_cluster.authenticate(PasswordAuthenticator("username", "password"))

    opened: Bucket = couchbase_cluster.open_bucket(
        "bucket_name", lockmode=LOCKMODE_WAIT
    )
    # Same values as the operation_timeout / n1ql_timeout query parameters in
    # the connection string above, set again on the bucket object itself.
    opened.timeout = 30
    opened.n1ql_timeout = 300
    return opened
class User(BaseModel):
    """User fields; also the ``response_model`` of the ``/users/{username}`` route."""

    # Required; the only field without a default.
    username: str
    # The remaining fields are optional and default to None.
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None
class UserInDB(User):
    """``User`` extended with the fields kept in the stored document."""

    # Document type marker; defaults to USERPROFILE_DOC_TYPE.
    type: str = USERPROFILE_DOC_TYPE
    # Required. Not declared on ``User``, which is the API response model.
    hashed_password: str
def get_user(bucket: Bucket, username: str):
    """Fetch the user-profile document for *username* from *bucket*.

    The document key is ``userprofile::<username>``.

    Args:
        bucket: Bucket to read the document from.
        username: Username used to build the document key.

    Returns:
        A ``UserInDB`` built from the document's value, or ``None`` when the
        fetched result has no (or an empty) value.
    """
    # quiet=True presumably makes a missing key yield an empty result instead
    # of raising -- confirm against the Couchbase SDK documentation.
    fetched = bucket.get(f"userprofile::{username}", quiet=True)
    payload = fetched.value
    if payload:
        return UserInDB(**payload)
    return None
# FastAPI specific code: the application instance on which the routes below
# are registered.
app = FastAPI()
@app.get("/users/{username}", response_model=User)
def read_user(username: str):
    """Return the profile of *username* as a ``User`` response.

    Args:
        username: Path parameter identifying the user to look up.

    Returns:
        The user returned by ``get_user``; the route's ``response_model`` is
        ``User``.

    Raises:
        HTTPException: With status 404 when no user document is found.
    """
    bucket = get_bucket()
    user = get_user(bucket=bucket, username=username)
    if user is None:
        # Returning None would fail validation against response_model=User and
        # surface as a server error; report the missing user explicitly.
        raise HTTPException(status_code=404, detail="User not found")
    return user