from typing import Union from couchbase import LOCKMODE_WAIT from couchbase.bucket import Bucket from couchbase.cluster import Cluster, PasswordAuthenticator from fastapi import FastAPI from pydantic import BaseModel USERPROFILE_DOC_TYPE = "userprofile" def get_bucket(): cluster = Cluster( "couchbase://couchbasehost:8091?fetch_mutation_tokens=1&operation_timeout=30&n1ql_timeout=300" ) authenticator = PasswordAuthenticator("username", "password") cluster.authenticate(authenticator) bucket: Bucket = cluster.open_bucket("bucket_name", lockmode=LOCKMODE_WAIT) bucket.timeout = 30 bucket.n1ql_timeout = 300 return bucket class User(BaseModel): username: str email: Union[str, None] = None full_name: Union[str, None] = None disabled: Union[bool, None] = None class UserInDB(User): type: str = USERPROFILE_DOC_TYPE hashed_password: str def get_user(bucket: Bucket, username: str): doc_id = f"userprofile::{username}" result = bucket.get(doc_id, quiet=True) if not result.value: return None user = UserInDB(**result.value) return user # FastAPI specific code app = FastAPI() @app.get("/users/{username}", response_model=User) def read_user(username: str): bucket = get_bucket() user = get_user(bucket=bucket, username=username) return user