diff --git a/docs/img/tutorial/security/image11.png b/docs/img/tutorial/security/image11.png new file mode 100644 index 000000000..3e1a0ce91 Binary files /dev/null and b/docs/img/tutorial/security/image11.png differ diff --git a/docs/src/security/tutorial004.py b/docs/src/security/tutorial004.py index edbac86f6..bac77a4b1 100644 --- a/docs/src/security/tutorial004.py +++ b/docs/src/security/tutorial004.py @@ -1,7 +1,7 @@ from datetime import datetime, timedelta import jwt -from fastapi import Depends, FastAPI, HTTPException, Security +from fastapi import Depends, FastAPI, HTTPException from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jwt import PyJWTError from passlib.context import CryptContext @@ -12,7 +12,6 @@ from starlette.status import HTTP_403_FORBIDDEN # openssl rand -hex 32 SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" ALGORITHM = "HS256" -TOKEN_SUBJECT = "access" ACCESS_TOKEN_EXPIRE_MINUTES = 30 @@ -32,7 +31,7 @@ class Token(BaseModel): token_type: str -class TokenPayload(BaseModel): +class TokenData(BaseModel): username: str = None @@ -83,20 +82,26 @@ def create_access_token(*, data: dict, expires_delta: timedelta = None): expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) - to_encode.update({"exp": expire, "sub": TOKEN_SUBJECT}) + to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt -async def get_current_user(token: str = Security(oauth2_scheme)): +async def get_current_user(token: str = Depends(oauth2_scheme)): + credentials_exception = HTTPException( + status_code=HTTP_403_FORBIDDEN, detail="Could not validate credentials" + ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) - token_data = TokenPayload(**payload) + username: str = payload.get("sub") + if username is None: + raise credentials_exception + token_data = TokenData(username=username) except PyJWTError: - raise HTTPException( - status_code=HTTP_403_FORBIDDEN, detail="Could not validate credentials" - ) + raise credentials_exception user = get_user(fake_users_db, username=token_data.username) + if user is None: + raise credentials_exception return user @@ -110,15 +115,15 @@ async def get_current_active_user(current_user: User = Depends(get_current_user) async def route_login_access_token(form_data: OAuth2PasswordRequestForm = Depends()): user = authenticate_user(fake_users_db, form_data.username, form_data.password) if not user: - raise HTTPException(status_code=400, detail="Incorrect email or password") + raise HTTPException(status_code=400, detail="Incorrect username or password") access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( - data={"username": form_data.username}, expires_delta=access_token_expires + data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"} -@app.get("/users/me", response_model=User) +@app.get("/users/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_active_user)): return current_user diff --git a/docs/src/security/tutorial005.py b/docs/src/security/tutorial005.py new file mode 100644 index 000000000..f13e2310e --- /dev/null +++ b/docs/src/security/tutorial005.py @@ -0,0 +1,162 @@ +from datetime import datetime, timedelta +from typing import List + +import jwt +from fastapi import Depends, FastAPI, HTTPException, Security +from fastapi.security import ( + OAuth2PasswordBearer, + OAuth2PasswordRequestForm, + SecurityScopes, +) +from jwt import PyJWTError +from passlib.context import CryptContext +from pydantic import BaseModel, ValidationError +from starlette.status import HTTP_403_FORBIDDEN + +# to get a string like this run: +# openssl rand -hex 32 +SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" +ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 30 + + +fake_users_db = { + "johndoe": { + "username": "johndoe", + "full_name": "John Doe", + "email": "johndoe@example.com", + "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", + "disabled": False, + }, + "alice": { + "username": "alice", + "full_name": "Alice Chains", + "email": "alicechains@example.com", + "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm", + "disabled": True, + }, +} + + +class Token(BaseModel): + access_token: str + token_type: str + + +class TokenData(BaseModel): + username: str = None + scopes: List[str] = [] + + +class User(BaseModel): + username: str + email: str = None + full_name: str = None + disabled: bool = None + + +class UserInDB(User): + hashed_password: str + + +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") + +oauth2_scheme = OAuth2PasswordBearer( + tokenUrl="/token", + scopes={"me": "Read information about the current user.", "items": "Read items."}, +) + +app = FastAPI() + + +def verify_password(plain_password, hashed_password): + return pwd_context.verify(plain_password, hashed_password) + + +def get_password_hash(password): + return pwd_context.hash(password) + + +def get_user(db, username: str): + if username in db: + user_dict = db[username] + return UserInDB(**user_dict) + + +def authenticate_user(fake_db, username: str, password: str): + user = get_user(fake_db, username) + if not user: + return False + if not verify_password(password, user.hashed_password): + return False + return user + + +def create_access_token(*, data: dict, expires_delta: timedelta = None): + to_encode = data.copy() + if expires_delta: + expire = datetime.utcnow() + expires_delta + else: + expire = datetime.utcnow() + timedelta(minutes=15) + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + return encoded_jwt + + +async def get_current_user( + security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme) +): + credentials_exception = HTTPException( + status_code=HTTP_403_FORBIDDEN, detail="Could not validate credentials" + ) + try: + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + username: str = payload.get("sub") + if username is None: + raise credentials_exception + token_scopes = payload.get("scopes", []) + token_data = TokenData(scopes=token_scopes, username=username) + except (PyJWTError, ValidationError): + raise credentials_exception + user = get_user(fake_users_db, username=token_data.username) + if user is None: + raise credentials_exception + for scope in security_scopes.scopes: + if scope not in token_data.scopes: + raise HTTPException( + status_code=HTTP_403_FORBIDDEN, detail="Not enough permissions" + ) + return user + + +async def get_current_active_user( + current_user: User = Security(get_current_user, scopes=["me"]) +): + if current_user.disabled: + raise HTTPException(status_code=400, detail="Inactive user") + return current_user + + +@app.post("/token", response_model=Token) +async def route_login_access_token(form_data: OAuth2PasswordRequestForm = Depends()): + user = authenticate_user(fake_users_db, form_data.username, form_data.password) + if not user: + raise HTTPException(status_code=400, detail="Incorrect username or password") + access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) + access_token = create_access_token( + data={"sub": user.username, "scopes": form_data.scopes}, + expires_delta=access_token_expires, + ) + return {"access_token": access_token, "token_type": "bearer"} + + +@app.get("/users/me/", response_model=User) +async def read_users_me(current_user: User = Depends(get_current_active_user)): + return current_user + + +@app.get("/users/me/items/") +async def read_own_items( + current_user: User = Security(get_current_active_user, scopes=["items"]) +): + return [{"item_id": "Foo", "owner": current_user.username}] diff --git a/docs/tutorial/security/oauth2-jwt.md b/docs/tutorial/security/oauth2-jwt.md index a28f4e95f..4958d35f2 100644 --- a/docs/tutorial/security/oauth2-jwt.md +++ b/docs/tutorial/security/oauth2-jwt.md @@ -46,7 +46,7 @@ So, the thief won't be able to try to use that password in another system (as ma PassLib is a great Python package to handle password hashes. -It supports many secure hashing algorithms, and utilities to work with them. +It supports many secure hashing algorithms and utilities to work with them. The recommended algorithm is "Bcrypt". @@ -69,7 +69,7 @@ Import the tools we need from `passlib`. Create a PassLib "context". This is what will be used to hash and verify passwords. !!! tip - The PassLib context also has functionality to use different hashing algorithms, deprecate old ones, but allow verifying them, etc. + The PassLib context also has functionality to use different hashing algorithms, including deprecate old ones only to allow verifying them, etc. For example, you could use it to read and verify passwords generated by another system (like Django) but hash any new passwords with a different algorithm like Bcrypt. @@ -81,7 +81,7 @@ And another utility to verify if a received password matches the hash stored. And another one to authenticate and return a user. -```Python hl_lines="7 50 57 58 61 62 71 72 73 74 75 76 77" +```Python hl_lines="7 39 56 57 60 61 70 71 72 73 74 75 76" {!./src/security/tutorial004.py!} ``` @@ -94,7 +94,7 @@ Import the modules installed. Create a random secret key that will be used to sign the JWT tokens. -To generate a secure random secret, key use the command: +To generate a secure random secret key use the command: ```bash openssl rand -hex 32 @@ -104,15 +104,13 @@ And copy the output to the variable `SECRET_KEY` (don't use the one in the examp Create a variable `ALGORITHM` with the algorithm used to sign the JWT token and set it to `"HS256"`. -And another one for the `TOKEN_SUBJECT`, and set it to, for example, `"access"`. - Create a variable for the expiration of the token. Define a Pydantic Model that will be used in the token endpoint for the response. Create a utility function to generate a new access token. -```Python hl_lines="3 6 13 14 15 16 30 31 32 80 81 82 83 84 85 86 87 88" +```Python hl_lines="3 6 13 14 15 29 30 31 79 80 81 82 83 84 85 86 87" {!./src/security/tutorial004.py!} ``` @@ -124,7 +122,7 @@ Decode the received token, verify it, and return the current user. If the token is invalid, return an HTTP error right away. -```Python hl_lines="91 92 93 94 95 96 97 98 99 100" +```Python hl_lines="90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105" {!./src/security/tutorial004.py!} ``` @@ -134,10 +132,33 @@ Create a `timedelta` with the expiration time of the token. Create a real JWT access token and return it. -```Python hl_lines="114 115 116 117 118" +```Python hl_lines="114 115 116 117 118 119 120 121 122 123" {!./src/security/tutorial004.py!} ``` + +### Technical details about the JWT "subject" `sub` + +The JWT specification says that there's a key `sub`, with the subject of the token. + +It's optional to use it, but that's where you would put the user's identification, so we are using it here. + +JWT might be used for other things apart from identifying a user and allowing him to perform operations directly on your API. + +For example, you could identify a "car" or a "blog post". + +Then you could add permissions about that entity, like "drive" (for the car) or "edit" (for the blog). + +And then, you could give that JWT token to a user (or bot), and he could use it to perform those actions (drive the car, or edit the blog post) without even needing to have an account, just with the JWT token your API generated for that. + +Using these ideas, JWT can be used for way more sophisticate scenarios. + +In those cases, several of those entities could have the same ID, let's say `foo` (a user `foo`, a car `foo`, and a blog post `foo`). + +So, to avoid ID collisions, when creating the JWT token for the user, you could prefix the value of the `sub` key, e.g. with `username:`. + +The important thing to have in mind is that the `sub` key should have a unique identifier across the entire application. + ## Check it Run the server and go to the docs: http://127.0.0.1:8000/docs. @@ -158,7 +179,7 @@ Password: `secret` -Call the endpoint `/users/me`, you will get the response as: +Call the endpoint `/users/me/`, you will get the response as: ```JSON { @@ -180,15 +201,17 @@ If you open the developer tools, you could see how the data sent and received is ## Advanced usage with `scopes` -We didn't use it in this example, but `Security` can receive a parameter `scopes`, as a list of strings. +OAuth2 has the notion of "scopes". + +You can use them to add a specific set of permissions to a JWT token. -It would describe the scopes required for a specific path operation, as different path operations might require different security scopes, even while using the same `OAuth2PasswordBearer` (or any of the other tools). +Then you can give this token to a user directly or a third party, to interact with your API with a set of restrictions. -This only applies to OAuth2, and it might be, more or less, an advanced feature, but it is there, if you need to use it. +You can learn how to use them and how they are integrated into **FastAPI** in the next section. ## Recap -This concludes our tour for the security features of **FastAPI**. +With what you have seen up to now, you can set up a secure **FastAPI** application using standards like OAuth2 and JWT. In almost any framework handling the security becomes a rather complex subject quite quickly. @@ -205,3 +228,7 @@ And you can use directly many well maintained and widely used packages like `pas But it provides you the tools to simplify the process as much as possible without compromising flexibility, robustness or security. And you can use secure, standard protocols like OAuth2 in a relatively simple way. + +In the next (optional) section you can see how to extend this even further, using OAuth2 "scopes", for a more fine-grained permission system following standards. + +OAuth2 with scopes (explained in the next section) is the mechanism used by many big authentication providers, like Facebook, Google, GitHub, Microsoft, Twitter, etc. diff --git a/docs/tutorial/security/oauth2-scopes.md b/docs/tutorial/security/oauth2-scopes.md new file mode 100644 index 000000000..51e28daaf --- /dev/null +++ b/docs/tutorial/security/oauth2-scopes.md @@ -0,0 +1,191 @@ +You can use OAuth2 scopes directly with **FastAPI**, they are integrated to work seamlessly. + +This would allow you to have a more fine-grained permission system, following standards like OAuth2, integrated into your OpenAPI application (and the API docs). + +OAuth2 with scopes is the mechanism used by many big authentication providers, like Facebook, Google, GitHub, Microsoft, Twitter, etc. They use it to provide specific permissions to users and applications. + +Every time you "log in with" Facebook, Google, GitHub, Microsoft, Twitter, that application is using OAuth2 with scopes. + +In this section you will see how to manage authentication and authorization with the same OAuth2 with scopes in your **FastAPI** application. + +!!! warning + This is a more or less advanced section. If you are just starting, you can skip it. + + You don't necessarily need OAuth2 scopes, you can handle authentication and authorization however you want. + + But OAuth2 with scopes can be nicely integrated into your API (with OpenAPI) and your API docs. + + Nevertheless, you still enforce those scopes or any other security/authorization requirement however you need in your code. + + In many cases, OAuth2 with scopes can be an overkill. + + But if you know you need it, or you are curious, keep reading. + +## OAuth2 scopes and OpenAPI + +The OAuth2 specification defines "scopes" as a list of strings separated by spaces. + +The content of each of these strings can have any format, but should not contain spaces. + +These scopes represent "permissions". + +In OpenAPI (e.g. the API docs), you can define "security schemes", the same as you saw in the previous sections. + +When one of these security schemes uses OAuth2, you can also declare and use scopes. + +## Global view + +First, let's quickly see the parts that change from the previous section about OAuth2 and JWT. Now using OAuth2 scopes: + +```Python hl_lines="2 5 9 13 48 66 106 115 116 117 122 123 124 125 126 131 145 158" +{!./src/security/tutorial005.py!} +``` + +Now let's review those changes step by step. + +## OAuth2 Security scheme + +The first change is that now we are declaring the OAuth2 security scheme with two available scopes, `me` and `items`. + +The `scopes` parameter receives a `dict` with each scope as a key and the description as the value: + +```Python hl_lines="64 65 66 67" +{!./src/security/tutorial005.py!} +``` + +Because we are now declaring those scopes,they will show up in the API docs when you log-in/authorize. + +And you will be able to select which scopes you want to give access to: `me` and `items`. + +This is the same mechanism used when you give permissions while logging in with Facebook, Google, GitHub, etc: + + + +## JWT token with scopes + +Now, modify the token *path operation* to return the scopes requested. + +We are still using the same `OAuth2PasswordRequestForm`. It includes a property `scopes` with each scope it received. + +And we return the scopes as part of the JWT token. + +!!! danger + For simplicity, here we are just adding the scopes received directly to the token. + + But in your application, for security, you should make sure you only add the scopes that the user is actually able to have, or the ones you have predefined. + +```Python hl_lines="145" +{!./src/security/tutorial005.py!} +``` + +## Declare scopes in *path operations* and dependencies + +Now we declare that the *path operation* for `/users/me/items/` requires the scope `items`. + +For this, we import and use `Security` from `fastapi`. + +You can use `Security` to declare dependencies (just like `Depends`), but `Security` also receives a parameter `scopes` with a list of scopes (strings). + +In this case, we pass a dependency function `get_current_active_user` to `Security` (the same way we would do with `Depends`). + +But we also pass a `list` of scopes, in this case with just one scope: `items` (it could have more). + +And the dependency function `get_current_active_user` can also declare sub-dependencies, not only with `Depends` but also with `Security`. Declaring its own sub-dependency function (`get_current_user`), and more scope requirements. + +In this case, it requires the scope `me` (it could require more than one scope). + +!!! note + You don't necessarily need to add different scopes in different places. + + We are doing it here to demonstrate how **FastAPI** handles scopes declared at different levels. + +```Python hl_lines="5 131 158" +{!./src/security/tutorial005.py!} +``` + +## Use `SecurityScopes` + +Now update the dependency `get_current_user`. + +This is the one used by the dependencies above. + +Here's were we are declaring the same OAuth2 scheme we created above as a dependency: `oauth2_scheme`. + +Because this dependency function doesn't have any scope requirements itself, we can use `Depends` with `oauth2_scheme`, we don't have to use `Security`. + +We also declare a special parameter of type `SecurityScopes`, imported from `fastapi.security`. + +This `SecurityScopes` class is similar to `Request` (`Request` was used to get the request object directly). + +The parameter `security_scopes` will be of type `SecurityScopes`. It will have a property `scopes` with a list containing all the scopes required by itself and all the dependencies that use this as a sub-dependency. That means, all the "dependants" or all the super-dependencies (the contrary of sub-dependencies). + +We verify that all the scopes required, by this dependency and all the dependants (including *path operations*), are included in the scopes provided in the token received, otherwise raise an `HTTPException`. + +We also check that the token data is validated with the Pydantic model (catching the `ValidationError` exception), and if we get an error reading the JWT token or validating the data with Pydantic, we also raise an `HTTPException`. + +By validating the data with Pydantic we can make sure that we have, for example, exactly a `list` of `str` with the scopes and a `str` with the `username`. Instead of, for example, a `dict`, or something else, as it could break the application at some point later. + + +```Python hl_lines="9 13 106 48 106 115 116 117 122 123" +{!./src/security/tutorial005.py!} +``` + +So, as the other dependency `get_current_active_user` has as a sub-dependency this `get_current_user`, the scope `"me"` declared at `get_current_active_user` will be included in the `security_scopes.scopes` `list` inside of `get_current_user`. + +And as the *path operation* itself also declares a scope `"items"`, it will also be part of this `list` `security_scopes.scopes` in `get_current_user`. + +Here's how the hierarchy of dependencies and scopes looks like: + +* The *path operation* `read_own_items` has: + * Required scopes `["items"]` with the dependency: + * `get_current_active_user`: + * The dependency function `get_current_active_user` has: + * Required scopes `["me"]` with the dependency: + * `get_current_user`: + * The dependency function `get_current_user` has: + * No scopes required by itself. + * A dependency using `oauth2_scheme`. + * A `security_scopes` parameter of type `SecurityScopes`: + * This `security_scopes` parameter has a property `scopes` with a `list` containing all these scopes declared above, so: + * `security_scopes.scopes` will contain `["me", "items"]` + +## More details about `SecurityScopes` + +You can use `SecurityScopes` at any point, and in multiple places, it doesn't have to be at the "root" dependency. + +It will always have the security scopes declared in the current `Security` dependencies and all the super-dependencies/dependants. + +Because the `SecurityScopes` will have all the scopes declared by super-dependencies/dependants, you can use it to verify that a token has the required scopes in a central dependency function, and then declare different scope requirements in different *path operations*. + +## Check it + +If you open the API docs, you can authenticate and specify which scopes you want to authorize. + + + +If you don't select any scope, you will be "authenticated", but when you try to access `/users/me/` or `/users/me/items/` you will get an error saying that you don't have enough permissions. + +And if you select the scope `me` but not the scope `items`, you will be able to access `/users/me/` but not `/users/me/items/`. + +That's what would happen to a third party application that tried to access one of these *path operations* with a token provided by a user, depending on how many permissions the user gave the application. + +## About third party integrations + +In this example we are using the OAuth2 "password" flow. + +This is appropriate when we are logging in to our own application, probably with our own frontend. + +Because we can trust it to receive the `username` and `password`, as we control it. + +But if you are building an OAuth2 application that others would connect to (i.e., if you are building an authentication provider equivalent to Facebook, Google, GitHub, etc.) you should use one of the other flows. + +The most common is the implicit flow. + +The most secure is the code flow, but is more complex to implement as it requires more steps. As it is more cumbersome, many providers end up suggesting the implicit flow. + +!!! note + It's common that each authentication provider names their flows in a different way, to make it part of their brand. + + But in the end, they are implementing the same OAuth2 standard. + +**FastAPI** includes utilities for all these OAuth2 authentication flows in `fastapi.security.oauth2`. diff --git a/fastapi/dependencies/models.py b/fastapi/dependencies/models.py index d7fbf853d..8bba5e369 100644 --- a/fastapi/dependencies/models.py +++ b/fastapi/dependencies/models.py @@ -27,6 +27,8 @@ class Dependant: call: Callable = None, request_param_name: str = None, background_tasks_param_name: str = None, + security_scopes_param_name: str = None, + security_scopes: List[str] = None, ) -> None: self.path_params = path_params or [] self.query_params = query_params or [] @@ -37,5 +39,7 @@ class Dependant: self.security_requirements = security_schemes or [] self.request_param_name = request_param_name self.background_tasks_param_name = background_tasks_param_name + self.security_scopes = security_scopes + self.security_scopes_param_name = security_scopes_param_name self.name = name self.call = call diff --git a/fastapi/dependencies/utils.py b/fastapi/dependencies/utils.py index 86b4ca005..4cf737d67 100644 --- a/fastapi/dependencies/utils.py +++ b/fastapi/dependencies/utils.py @@ -20,6 +20,8 @@ from uuid import UUID from fastapi import params from fastapi.dependencies.models import Dependant, SecurityRequirement from fastapi.security.base import SecurityBase +from fastapi.security.oauth2 import OAuth2, SecurityScopes +from fastapi.security.open_id_connect_url import OpenIdConnect from fastapi.utils import UnconstrainedConfig, get_path_param_names from pydantic import Schema, create_model from pydantic.error_wrappers import ErrorWrapper @@ -46,18 +48,32 @@ param_supported_types = ( ) -def get_sub_dependant(*, param: inspect.Parameter, path: str) -> Dependant: +def get_sub_dependant( + *, param: inspect.Parameter, path: str, security_scopes: List[str] = None +) -> Dependant: depends: params.Depends = param.default if depends.dependency: dependency = depends.dependency else: dependency = param.annotation - sub_dependant = get_dependant(path=path, call=dependency, name=param.name) - if isinstance(depends, params.Security) and isinstance(dependency, SecurityBase): + security_requirement = None + security_scopes = security_scopes or [] + if isinstance(depends, params.Security): + dependency_scopes = depends.scopes + security_scopes.extend(dependency_scopes) + if isinstance(dependency, SecurityBase): + use_scopes = [] + if isinstance(dependency, (OAuth2, OpenIdConnect)): + use_scopes = security_scopes security_requirement = SecurityRequirement( - security_scheme=dependency, scopes=depends.scopes + security_scheme=dependency, scopes=use_scopes ) + sub_dependant = get_dependant( + path=path, call=dependency, name=param.name, security_scopes=security_scopes + ) + if security_requirement: sub_dependant.security_requirements.append(security_requirement) + sub_dependant.security_scopes = security_scopes return sub_dependant @@ -81,7 +97,9 @@ def get_flat_dependant(dependant: Dependant) -> Dependant: return flat_dependant -def get_dependant(*, path: str, call: Callable, name: str = None) -> Dependant: +def get_dependant( + *, path: str, call: Callable, name: str = None, security_scopes: List[str] = None +) -> Dependant: path_param_names = get_path_param_names(path) endpoint_signature = inspect.signature(call) signature_params = endpoint_signature.parameters @@ -89,7 +107,9 @@ def get_dependant(*, path: str, call: Callable, name: str = None) -> Dependant: for param_name in signature_params: param = signature_params[param_name] if isinstance(param.default, params.Depends): - sub_dependant = get_sub_dependant(param=param, path=path) + sub_dependant = get_sub_dependant( + param=param, path=path, security_scopes=security_scopes + ) dependant.dependencies.append(sub_dependant) for param_name in signature_params: param = signature_params[param_name] @@ -138,6 +158,8 @@ def get_dependant(*, path: str, call: Callable, name: str = None) -> Dependant: dependant.request_param_name = param_name elif lenient_issubclass(param.annotation, BackgroundTasks): dependant.background_tasks_param_name = param_name + elif lenient_issubclass(param.annotation, SecurityScopes): + dependant.security_scopes_param_name = param_name elif not isinstance(param.default, params.Depends): add_param_to_body_fields(param=param, dependant=dependant) return dependant @@ -282,6 +304,10 @@ async def solve_dependencies( if background_tasks is None: background_tasks = BackgroundTasks() values[dependant.background_tasks_param_name] = background_tasks + if dependant.security_scopes_param_name: + values[dependant.security_scopes_param_name] = SecurityScopes( + scopes=dependant.security_scopes + ) return values, errors, background_tasks diff --git a/fastapi/security/__init__.py b/fastapi/security/__init__.py index 32d69e74d..de88d8f15 100644 --- a/fastapi/security/__init__.py +++ b/fastapi/security/__init__.py @@ -6,5 +6,10 @@ from .http import ( HTTPBearer, HTTPDigest, ) -from .oauth2 import OAuth2, OAuth2PasswordBearer, OAuth2PasswordRequestForm +from .oauth2 import ( + OAuth2, + OAuth2PasswordBearer, + OAuth2PasswordRequestForm, + SecurityScopes, +) from .open_id_connect_url import OpenIdConnect diff --git a/fastapi/security/oauth2.py b/fastapi/security/oauth2.py index d779bcae1..31ddc946d 100644 --- a/fastapi/security/oauth2.py +++ b/fastapi/security/oauth2.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import List, Optional from fastapi.openapi.models import OAuth2 as OAuth2Model, OAuthFlows as OAuthFlowsModel from fastapi.params import Form @@ -159,3 +159,8 @@ class OAuth2PasswordBearer(OAuth2): else: return None return param + + +class SecurityScopes: + def __init__(self, scopes: List[str] = None): + self.scopes = scopes or [] diff --git a/mkdocs.yml b/mkdocs.yml index 811d3c197..d7cf5f242 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -56,6 +56,7 @@ nav: - Get Current User: 'tutorial/security/get-current-user.md' - Simple OAuth2 with Password and Bearer: 'tutorial/security/simple-oauth2.md' - OAuth2 with Password (and hashing), Bearer with JWT tokens: 'tutorial/security/oauth2-jwt.md' + - OAuth2 scopes: 'tutorial/security/oauth2-scopes.md' - Using the Request Directly: 'tutorial/using-request-directly.md' - SQL (Relational) Databases: 'tutorial/sql-databases.md' - Async SQL (Relational) Databases: 'tutorial/async-sql-databases.md' diff --git a/tests/test_tutorial/test_security/test_tutorial005.py b/tests/test_tutorial/test_security/test_tutorial005.py new file mode 100644 index 000000000..d0f0bdf04 --- /dev/null +++ b/tests/test_tutorial/test_security/test_tutorial005.py @@ -0,0 +1,313 @@ +from starlette.testclient import TestClient + +from security.tutorial005 import ( + app, + create_access_token, + fake_users_db, + get_password_hash, + verify_password, +) + +client = TestClient(app) + +openapi_schema = { + "openapi": "3.0.2", + "info": {"title": "Fast API", "version": "0.1.0"}, + "paths": { + "/token": { + "post": { + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": {"$ref": "#/components/schemas/Token"} + } + }, + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/HTTPValidationError" + } + } + }, + }, + }, + "summary": "Route Login Access Token Post", + "operationId": "route_login_access_token_token_post", + "requestBody": { + "content": { + "application/x-www-form-urlencoded": { + "schema": { + "$ref": "#/components/schemas/Body_route_login_access_token" + } + } + }, + "required": True, + }, + } + }, + "/users/me/": { + "get": { + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": {"$ref": "#/components/schemas/User"} + } + }, + } + }, + "summary": "Read Users Me Get", + "operationId": "read_users_me_users_me__get", + "security": [{"OAuth2PasswordBearer": ["me"]}], + } + }, + "/users/me/items/": { + "get": { + "responses": { + "200": { + "description": "Successful Response", + "content": {"application/json": {"schema": {}}}, + } + }, + "summary": "Read Own Items Get", + "operationId": "read_own_items_users_me_items__get", + "security": [{"OAuth2PasswordBearer": ["items", "me"]}], + } + }, + }, + "components": { + "schemas": { + "Body_route_login_access_token": { + "title": "Body_route_login_access_token", + "required": ["username", "password"], + "type": "object", + "properties": { + "grant_type": { + "title": "Grant_Type", + "pattern": "password", + "type": "string", + }, + "username": {"title": "Username", "type": "string"}, + "password": {"title": "Password", "type": "string"}, + "scope": {"title": "Scope", "type": "string", "default": ""}, + "client_id": {"title": "Client_Id", "type": "string"}, + "client_secret": {"title": "Client_Secret", "type": "string"}, + }, + }, + "Token": { + "title": "Token", + "required": ["access_token", "token_type"], + "type": "object", + "properties": { + "access_token": {"title": "Access_Token", "type": "string"}, + "token_type": {"title": "Token_Type", "type": "string"}, + }, + }, + "User": { + "title": "User", + "required": ["username"], + "type": "object", + "properties": { + "username": {"title": "Username", "type": "string"}, + "email": {"title": "Email", "type": "string"}, + "full_name": {"title": "Full_Name", "type": "string"}, + "disabled": {"title": "Disabled", "type": "boolean"}, + }, + }, + "ValidationError": { + "title": "ValidationError", + "required": ["loc", "msg", "type"], + "type": "object", + "properties": { + "loc": { + "title": "Location", + "type": "array", + "items": {"type": "string"}, + }, + "msg": {"title": "Message", "type": "string"}, + "type": {"title": "Error Type", "type": "string"}, + }, + }, + "HTTPValidationError": { + "title": "HTTPValidationError", + "type": "object", + "properties": { + "detail": { + "title": "Detail", + "type": "array", + "items": {"$ref": "#/components/schemas/ValidationError"}, + } + }, + }, + }, + "securitySchemes": { + "OAuth2PasswordBearer": { + "type": "oauth2", + "flows": { + "password": { + "scopes": { + "me": "Read information about the current user.", + "items": "Read items.", + }, + "tokenUrl": "/token", + } + }, + } + }, + }, +} + + +def get_access_token(username="johndoe", password="secret", scope=None): + data = {"username": username, "password": password} + if scope: + data["scope"] = scope + response = client.post("/token", data=data) + content = response.json() + access_token = content.get("access_token") + return access_token + + +def test_openapi_schema(): + response = client.get("/openapi.json") + assert response.status_code == 200 + assert response.json() == openapi_schema + + +def test_login(): + response = client.post("/token", data={"username": "johndoe", "password": "secret"}) + assert response.status_code == 200 + content = response.json() + assert "access_token" in content + assert content["token_type"] == "bearer" + + +def test_login_incorrect_password(): + response = client.post( + "/token", data={"username": "johndoe", "password": "incorrect"} + ) + assert response.status_code == 400 + assert response.json() == {"detail": "Incorrect username or password"} + + +def test_login_incorrect_username(): + response = client.post("/token", data={"username": "foo", "password": "secret"}) + assert response.status_code == 400 + assert response.json() == {"detail": "Incorrect username or password"} + + +def test_no_token(): + response = client.get("/users/me") + assert response.status_code == 403 + assert response.json() == {"detail": "Not authenticated"} + + +def test_token(): + access_token = get_access_token(scope="me") + response = client.get( + "/users/me", headers={"Authorization": f"Bearer {access_token}"} + ) + print(response.json()) + assert response.status_code == 200 + assert response.json() == { + "username": "johndoe", + "full_name": "John Doe", + "email": "johndoe@example.com", + "disabled": False, + } + + +def test_incorrect_token(): + response = client.get("/users/me", headers={"Authorization": "Bearer nonexistent"}) + assert response.status_code == 403 + assert response.json() == {"detail": "Could not validate credentials"} + + +def test_incorrect_token_type(): + response = client.get( + "/users/me", headers={"Authorization": "Notexistent testtoken"} + ) + assert response.status_code == 403 + assert response.json() == {"detail": "Not authenticated"} + + +def test_verify_password(): + assert verify_password("secret", fake_users_db["johndoe"]["hashed_password"]) + + +def test_get_password_hash(): + assert get_password_hash("secretalice") + + +def test_create_access_token(): + access_token = create_access_token(data={"data": "foo"}) + assert access_token + + +def test_token_no_sub(): + response = client.get( + "/users/me", + headers={ + "Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJkYXRhIjoiZm9vIn0.9ynBhuYb4e6aW3oJr_K_TBgwcMTDpRToQIE25L57rOE" + }, + ) + assert response.status_code == 403 + assert response.json() == {"detail": "Could not validate credentials"} + + +def test_token_no_username(): + response = client.get( + "/users/me", + headers={ + "Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJmb28ifQ.NnExK_dlNAYyzACrXtXDrcWOgGY2JuPbI4eDaHdfK5Y" + }, + ) + assert response.status_code == 403 + assert response.json() == {"detail": "Could not validate credentials"} + + +def test_token_no_scope(): + access_token = get_access_token() + response = client.get( + "/users/me", headers={"Authorization": f"Bearer {access_token}"} + ) + assert response.status_code == 403 + assert response.json() == {"detail": "Not enough permissions"} + + +def test_token_inexistent_user(): + response = client.get( + "/users/me", + headers={ + "Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VybmFtZTpib2IifQ.HcfCW67Uda-0gz54ZWTqmtgJnZeNem0Q757eTa9EZuw" + }, + ) + assert response.status_code == 403 + assert response.json() == {"detail": "Could not validate credentials"} + + +def test_token_inactive_user(): + access_token = get_access_token( + username="alice", password="secretalice", scope="me" + ) + response = client.get( + "/users/me", headers={"Authorization": f"Bearer {access_token}"} + ) + print(response.json()) + assert response.status_code == 400 + assert response.json() == {"detail": "Inactive user"} + + +def test_read_items(): + access_token = get_access_token(scope="me items") + response = client.get( + "/users/me/items/", headers={"Authorization": f"Bearer {access_token}"} + ) + assert response.status_code == 200 + assert response.json() == [{"item_id": "Foo", "owner": "johndoe"}]