Browse Source

Add OAuth2 scopes with SecurityScopes, upgrade Security (#141)

*  Upgrade OAuth2 Security with scopes handling

* 📝 Update Security tutorial with OAuth2 and JWT

*  Add tutorial code for OAuth2 with scopes (and JWT)

*  Add tests for tutorial/OAuth2 with scopes

* 🐛 Fix security_scopes type declaration

*  Add docs and tests for SecurityScopes
pull/148/head
Sebastián Ramírez 6 years ago
committed by GitHub
parent
commit
7391056daf
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. BIN
      docs/img/tutorial/security/image11.png
  2. 29
      docs/src/security/tutorial004.py
  3. 162
      docs/src/security/tutorial005.py
  4. 55
      docs/tutorial/security/oauth2-jwt.md
  5. 191
      docs/tutorial/security/oauth2-scopes.md
  6. 4
      fastapi/dependencies/models.py
  7. 38
      fastapi/dependencies/utils.py
  8. 7
      fastapi/security/__init__.py
  9. 7
      fastapi/security/oauth2.py
  10. 1
      mkdocs.yml
  11. 313
      tests/test_tutorial/test_security/test_tutorial005.py

BIN
docs/img/tutorial/security/image11.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 79 KiB

29
docs/src/security/tutorial004.py

@ -1,7 +1,7 @@
from datetime import datetime, timedelta
import jwt
from fastapi import Depends, FastAPI, HTTPException, Security
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt import PyJWTError
from passlib.context import CryptContext
@ -12,7 +12,6 @@ from starlette.status import HTTP_403_FORBIDDEN
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
TOKEN_SUBJECT = "access"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
@ -32,7 +31,7 @@ class Token(BaseModel):
token_type: str
class TokenPayload(BaseModel):
class TokenData(BaseModel):
username: str = None
@ -83,20 +82,26 @@ def create_access_token(*, data: dict, expires_delta: timedelta = None):
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire, "sub": TOKEN_SUBJECT})
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Security(oauth2_scheme)):
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Could not validate credentials"
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
token_data = TokenPayload(**payload)
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except PyJWTError:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Could not validate credentials"
)
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
@ -110,15 +115,15 @@ async def get_current_active_user(current_user: User = Depends(get_current_user)
async def route_login_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=400, detail="Incorrect email or password")
raise HTTPException(status_code=400, detail="Incorrect username or password")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"username": form_data.username}, expires_delta=access_token_expires
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me", response_model=User)
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user

162
docs/src/security/tutorial005.py

@ -0,0 +1,162 @@
from datetime import datetime, timedelta
from typing import List
import jwt
from fastapi import Depends, FastAPI, HTTPException, Security
from fastapi.security import (
OAuth2PasswordBearer,
OAuth2PasswordRequestForm,
SecurityScopes,
)
from jwt import PyJWTError
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError
from starlette.status import HTTP_403_FORBIDDEN
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "[email protected]",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Chains",
"email": "[email protected]",
"hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
"disabled": True,
},
}
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str = None
scopes: List[str] = []
class User(BaseModel):
username: str
email: str = None
full_name: str = None
disabled: bool = None
class UserInDB(User):
hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="/token",
scopes={"me": "Read information about the current user.", "items": "Read items."},
)
app = FastAPI()
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(*, data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(
security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
credentials_exception = HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Could not validate credentials"
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_scopes = payload.get("scopes", [])
token_data = TokenData(scopes=token_scopes, username=username)
except (PyJWTError, ValidationError):
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
for scope in security_scopes.scopes:
if scope not in token_data.scopes:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not enough permissions"
)
return user
async def get_current_active_user(
current_user: User = Security(get_current_user, scopes=["me"])
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token", response_model=Token)
async def route_login_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username, "scopes": form_data.scopes},
expires_delta=access_token_expires,
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
@app.get("/users/me/items/")
async def read_own_items(
current_user: User = Security(get_current_active_user, scopes=["items"])
):
return [{"item_id": "Foo", "owner": current_user.username}]

55
docs/tutorial/security/oauth2-jwt.md

@ -46,7 +46,7 @@ So, the thief won't be able to try to use that password in another system (as ma
PassLib is a great Python package to handle password hashes.
It supports many secure hashing algorithms, and utilities to work with them.
It supports many secure hashing algorithms and utilities to work with them.
The recommended algorithm is "Bcrypt".
@ -69,7 +69,7 @@ Import the tools we need from `passlib`.
Create a PassLib "context". This is what will be used to hash and verify passwords.
!!! tip
The PassLib context also has functionality to use different hashing algorithms, deprecate old ones, but allow verifying them, etc.
The PassLib context also has functionality to use different hashing algorithms, including deprecate old ones only to allow verifying them, etc.
For example, you could use it to read and verify passwords generated by another system (like Django) but hash any new passwords with a different algorithm like Bcrypt.
@ -81,7 +81,7 @@ And another utility to verify if a received password matches the hash stored.
And another one to authenticate and return a user.
```Python hl_lines="7 50 57 58 61 62 71 72 73 74 75 76 77"
```Python hl_lines="7 39 56 57 60 61 70 71 72 73 74 75 76"
{!./src/security/tutorial004.py!}
```
@ -94,7 +94,7 @@ Import the modules installed.
Create a random secret key that will be used to sign the JWT tokens.
To generate a secure random secret, key use the command:
To generate a secure random secret key use the command:
```bash
openssl rand -hex 32
@ -104,15 +104,13 @@ And copy the output to the variable `SECRET_KEY` (don't use the one in the examp
Create a variable `ALGORITHM` with the algorithm used to sign the JWT token and set it to `"HS256"`.
And another one for the `TOKEN_SUBJECT`, and set it to, for example, `"access"`.
Create a variable for the expiration of the token.
Define a Pydantic Model that will be used in the token endpoint for the response.
Create a utility function to generate a new access token.
```Python hl_lines="3 6 13 14 15 16 30 31 32 80 81 82 83 84 85 86 87 88"
```Python hl_lines="3 6 13 14 15 29 30 31 79 80 81 82 83 84 85 86 87"
{!./src/security/tutorial004.py!}
```
@ -124,7 +122,7 @@ Decode the received token, verify it, and return the current user.
If the token is invalid, return an HTTP error right away.
```Python hl_lines="91 92 93 94 95 96 97 98 99 100"
```Python hl_lines="90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105"
{!./src/security/tutorial004.py!}
```
@ -134,10 +132,33 @@ Create a `timedelta` with the expiration time of the token.
Create a real JWT access token and return it.
```Python hl_lines="114 115 116 117 118"
```Python hl_lines="114 115 116 117 118 119 120 121 122 123"
{!./src/security/tutorial004.py!}
```
### Technical details about the JWT "subject" `sub`
The JWT specification says that there's a key `sub`, with the subject of the token.
It's optional to use it, but that's where you would put the user's identification, so we are using it here.
JWT might be used for other things apart from identifying a user and allowing him to perform operations directly on your API.
For example, you could identify a "car" or a "blog post".
Then you could add permissions about that entity, like "drive" (for the car) or "edit" (for the blog).
And then, you could give that JWT token to a user (or bot), and he could use it to perform those actions (drive the car, or edit the blog post) without even needing to have an account, just with the JWT token your API generated for that.
Using these ideas, JWT can be used for way more sophisticate scenarios.
In those cases, several of those entities could have the same ID, let's say `foo` (a user `foo`, a car `foo`, and a blog post `foo`).
So, to avoid ID collisions, when creating the JWT token for the user, you could prefix the value of the `sub` key, e.g. with `username:`.
The important thing to have in mind is that the `sub` key should have a unique identifier across the entire application.
## Check it
Run the server and go to the docs: <a href="http://127.0.0.1:8000/docs" target="_blank">http://127.0.0.1:8000/docs</a>.
@ -158,7 +179,7 @@ Password: `secret`
<img src="/img/tutorial/security/image08.png">
Call the endpoint `/users/me`, you will get the response as:
Call the endpoint `/users/me/`, you will get the response as:
```JSON
{
@ -180,15 +201,17 @@ If you open the developer tools, you could see how the data sent and received is
## Advanced usage with `scopes`
We didn't use it in this example, but `Security` can receive a parameter `scopes`, as a list of strings.
OAuth2 has the notion of "scopes".
You can use them to add a specific set of permissions to a JWT token.
It would describe the scopes required for a specific path operation, as different path operations might require different security scopes, even while using the same `OAuth2PasswordBearer` (or any of the other tools).
Then you can give this token to a user directly or a third party, to interact with your API with a set of restrictions.
This only applies to OAuth2, and it might be, more or less, an advanced feature, but it is there, if you need to use it.
You can learn how to use them and how they are integrated into **FastAPI** in the next section.
## Recap
This concludes our tour for the security features of **FastAPI**.
With what you have seen up to now, you can set up a secure **FastAPI** application using standards like OAuth2 and JWT.
In almost any framework handling the security becomes a rather complex subject quite quickly.
@ -205,3 +228,7 @@ And you can use directly many well maintained and widely used packages like `pas
But it provides you the tools to simplify the process as much as possible without compromising flexibility, robustness or security.
And you can use secure, standard protocols like OAuth2 in a relatively simple way.
In the next (optional) section you can see how to extend this even further, using OAuth2 "scopes", for a more fine-grained permission system following standards.
OAuth2 with scopes (explained in the next section) is the mechanism used by many big authentication providers, like Facebook, Google, GitHub, Microsoft, Twitter, etc.

191
docs/tutorial/security/oauth2-scopes.md

@ -0,0 +1,191 @@
You can use OAuth2 scopes directly with **FastAPI**, they are integrated to work seamlessly.
This would allow you to have a more fine-grained permission system, following standards like OAuth2, integrated into your OpenAPI application (and the API docs).
OAuth2 with scopes is the mechanism used by many big authentication providers, like Facebook, Google, GitHub, Microsoft, Twitter, etc. They use it to provide specific permissions to users and applications.
Every time you "log in with" Facebook, Google, GitHub, Microsoft, Twitter, that application is using OAuth2 with scopes.
In this section you will see how to manage authentication and authorization with the same OAuth2 with scopes in your **FastAPI** application.
!!! warning
This is a more or less advanced section. If you are just starting, you can skip it.
You don't necessarily need OAuth2 scopes, you can handle authentication and authorization however you want.
But OAuth2 with scopes can be nicely integrated into your API (with OpenAPI) and your API docs.
Nevertheless, you still enforce those scopes or any other security/authorization requirement however you need in your code.
In many cases, OAuth2 with scopes can be an overkill.
But if you know you need it, or you are curious, keep reading.
## OAuth2 scopes and OpenAPI
The OAuth2 specification defines "scopes" as a list of strings separated by spaces.
The content of each of these strings can have any format, but should not contain spaces.
These scopes represent "permissions".
In OpenAPI (e.g. the API docs), you can define "security schemes", the same as you saw in the previous sections.
When one of these security schemes uses OAuth2, you can also declare and use scopes.
## Global view
First, let's quickly see the parts that change from the previous section about OAuth2 and JWT. Now using OAuth2 scopes:
```Python hl_lines="2 5 9 13 48 66 106 115 116 117 122 123 124 125 126 131 145 158"
{!./src/security/tutorial005.py!}
```
Now let's review those changes step by step.
## OAuth2 Security scheme
The first change is that now we are declaring the OAuth2 security scheme with two available scopes, `me` and `items`.
The `scopes` parameter receives a `dict` with each scope as a key and the description as the value:
```Python hl_lines="64 65 66 67"
{!./src/security/tutorial005.py!}
```
Because we are now declaring those scopes,they will show up in the API docs when you log-in/authorize.
And you will be able to select which scopes you want to give access to: `me` and `items`.
This is the same mechanism used when you give permissions while logging in with Facebook, Google, GitHub, etc:
<img src="/img/tutorial/security/image11.png">
## JWT token with scopes
Now, modify the token *path operation* to return the scopes requested.
We are still using the same `OAuth2PasswordRequestForm`. It includes a property `scopes` with each scope it received.
And we return the scopes as part of the JWT token.
!!! danger
For simplicity, here we are just adding the scopes received directly to the token.
But in your application, for security, you should make sure you only add the scopes that the user is actually able to have, or the ones you have predefined.
```Python hl_lines="145"
{!./src/security/tutorial005.py!}
```
## Declare scopes in *path operations* and dependencies
Now we declare that the *path operation* for `/users/me/items/` requires the scope `items`.
For this, we import and use `Security` from `fastapi`.
You can use `Security` to declare dependencies (just like `Depends`), but `Security` also receives a parameter `scopes` with a list of scopes (strings).
In this case, we pass a dependency function `get_current_active_user` to `Security` (the same way we would do with `Depends`).
But we also pass a `list` of scopes, in this case with just one scope: `items` (it could have more).
And the dependency function `get_current_active_user` can also declare sub-dependencies, not only with `Depends` but also with `Security`. Declaring its own sub-dependency function (`get_current_user`), and more scope requirements.
In this case, it requires the scope `me` (it could require more than one scope).
!!! note
You don't necessarily need to add different scopes in different places.
We are doing it here to demonstrate how **FastAPI** handles scopes declared at different levels.
```Python hl_lines="5 131 158"
{!./src/security/tutorial005.py!}
```
## Use `SecurityScopes`
Now update the dependency `get_current_user`.
This is the one used by the dependencies above.
Here's were we are declaring the same OAuth2 scheme we created above as a dependency: `oauth2_scheme`.
Because this dependency function doesn't have any scope requirements itself, we can use `Depends` with `oauth2_scheme`, we don't have to use `Security`.
We also declare a special parameter of type `SecurityScopes`, imported from `fastapi.security`.
This `SecurityScopes` class is similar to `Request` (`Request` was used to get the request object directly).
The parameter `security_scopes` will be of type `SecurityScopes`. It will have a property `scopes` with a list containing all the scopes required by itself and all the dependencies that use this as a sub-dependency. That means, all the "dependants" or all the super-dependencies (the contrary of sub-dependencies).
We verify that all the scopes required, by this dependency and all the dependants (including *path operations*), are included in the scopes provided in the token received, otherwise raise an `HTTPException`.
We also check that the token data is validated with the Pydantic model (catching the `ValidationError` exception), and if we get an error reading the JWT token or validating the data with Pydantic, we also raise an `HTTPException`.
By validating the data with Pydantic we can make sure that we have, for example, exactly a `list` of `str` with the scopes and a `str` with the `username`. Instead of, for example, a `dict`, or something else, as it could break the application at some point later.
```Python hl_lines="9 13 106 48 106 115 116 117 122 123"
{!./src/security/tutorial005.py!}
```
So, as the other dependency `get_current_active_user` has as a sub-dependency this `get_current_user`, the scope `"me"` declared at `get_current_active_user` will be included in the `security_scopes.scopes` `list` inside of `get_current_user`.
And as the *path operation* itself also declares a scope `"items"`, it will also be part of this `list` `security_scopes.scopes` in `get_current_user`.
Here's how the hierarchy of dependencies and scopes looks like:
* The *path operation* `read_own_items` has:
* Required scopes `["items"]` with the dependency:
* `get_current_active_user`:
* The dependency function `get_current_active_user` has:
* Required scopes `["me"]` with the dependency:
* `get_current_user`:
* The dependency function `get_current_user` has:
* No scopes required by itself.
* A dependency using `oauth2_scheme`.
* A `security_scopes` parameter of type `SecurityScopes`:
* This `security_scopes` parameter has a property `scopes` with a `list` containing all these scopes declared above, so:
* `security_scopes.scopes` will contain `["me", "items"]`
## More details about `SecurityScopes`
You can use `SecurityScopes` at any point, and in multiple places, it doesn't have to be at the "root" dependency.
It will always have the security scopes declared in the current `Security` dependencies and all the super-dependencies/dependants.
Because the `SecurityScopes` will have all the scopes declared by super-dependencies/dependants, you can use it to verify that a token has the required scopes in a central dependency function, and then declare different scope requirements in different *path operations*.
## Check it
If you open the API docs, you can authenticate and specify which scopes you want to authorize.
<img src="/img/tutorial/security/image11.png">
If you don't select any scope, you will be "authenticated", but when you try to access `/users/me/` or `/users/me/items/` you will get an error saying that you don't have enough permissions.
And if you select the scope `me` but not the scope `items`, you will be able to access `/users/me/` but not `/users/me/items/`.
That's what would happen to a third party application that tried to access one of these *path operations* with a token provided by a user, depending on how many permissions the user gave the application.
## About third party integrations
In this example we are using the OAuth2 "password" flow.
This is appropriate when we are logging in to our own application, probably with our own frontend.
Because we can trust it to receive the `username` and `password`, as we control it.
But if you are building an OAuth2 application that others would connect to (i.e., if you are building an authentication provider equivalent to Facebook, Google, GitHub, etc.) you should use one of the other flows.
The most common is the implicit flow.
The most secure is the code flow, but is more complex to implement as it requires more steps. As it is more cumbersome, many providers end up suggesting the implicit flow.
!!! note
It's common that each authentication provider names their flows in a different way, to make it part of their brand.
But in the end, they are implementing the same OAuth2 standard.
**FastAPI** includes utilities for all these OAuth2 authentication flows in `fastapi.security.oauth2`.

4
fastapi/dependencies/models.py

@ -27,6 +27,8 @@ class Dependant:
call: Callable = None,
request_param_name: str = None,
background_tasks_param_name: str = None,
security_scopes_param_name: str = None,
security_scopes: List[str] = None,
) -> None:
self.path_params = path_params or []
self.query_params = query_params or []
@ -37,5 +39,7 @@ class Dependant:
self.security_requirements = security_schemes or []
self.request_param_name = request_param_name
self.background_tasks_param_name = background_tasks_param_name
self.security_scopes = security_scopes
self.security_scopes_param_name = security_scopes_param_name
self.name = name
self.call = call

38
fastapi/dependencies/utils.py

@ -20,6 +20,8 @@ from uuid import UUID
from fastapi import params
from fastapi.dependencies.models import Dependant, SecurityRequirement
from fastapi.security.base import SecurityBase
from fastapi.security.oauth2 import OAuth2, SecurityScopes
from fastapi.security.open_id_connect_url import OpenIdConnect
from fastapi.utils import UnconstrainedConfig, get_path_param_names
from pydantic import Schema, create_model
from pydantic.error_wrappers import ErrorWrapper
@ -46,18 +48,32 @@ param_supported_types = (
)
def get_sub_dependant(*, param: inspect.Parameter, path: str) -> Dependant:
def get_sub_dependant(
*, param: inspect.Parameter, path: str, security_scopes: List[str] = None
) -> Dependant:
depends: params.Depends = param.default
if depends.dependency:
dependency = depends.dependency
else:
dependency = param.annotation
sub_dependant = get_dependant(path=path, call=dependency, name=param.name)
if isinstance(depends, params.Security) and isinstance(dependency, SecurityBase):
security_requirement = None
security_scopes = security_scopes or []
if isinstance(depends, params.Security):
dependency_scopes = depends.scopes
security_scopes.extend(dependency_scopes)
if isinstance(dependency, SecurityBase):
use_scopes = []
if isinstance(dependency, (OAuth2, OpenIdConnect)):
use_scopes = security_scopes
security_requirement = SecurityRequirement(
security_scheme=dependency, scopes=depends.scopes
security_scheme=dependency, scopes=use_scopes
)
sub_dependant = get_dependant(
path=path, call=dependency, name=param.name, security_scopes=security_scopes
)
if security_requirement:
sub_dependant.security_requirements.append(security_requirement)
sub_dependant.security_scopes = security_scopes
return sub_dependant
@ -81,7 +97,9 @@ def get_flat_dependant(dependant: Dependant) -> Dependant:
return flat_dependant
def get_dependant(*, path: str, call: Callable, name: str = None) -> Dependant:
def get_dependant(
*, path: str, call: Callable, name: str = None, security_scopes: List[str] = None
) -> Dependant:
path_param_names = get_path_param_names(path)
endpoint_signature = inspect.signature(call)
signature_params = endpoint_signature.parameters
@ -89,7 +107,9 @@ def get_dependant(*, path: str, call: Callable, name: str = None) -> Dependant:
for param_name in signature_params:
param = signature_params[param_name]
if isinstance(param.default, params.Depends):
sub_dependant = get_sub_dependant(param=param, path=path)
sub_dependant = get_sub_dependant(
param=param, path=path, security_scopes=security_scopes
)
dependant.dependencies.append(sub_dependant)
for param_name in signature_params:
param = signature_params[param_name]
@ -138,6 +158,8 @@ def get_dependant(*, path: str, call: Callable, name: str = None) -> Dependant:
dependant.request_param_name = param_name
elif lenient_issubclass(param.annotation, BackgroundTasks):
dependant.background_tasks_param_name = param_name
elif lenient_issubclass(param.annotation, SecurityScopes):
dependant.security_scopes_param_name = param_name
elif not isinstance(param.default, params.Depends):
add_param_to_body_fields(param=param, dependant=dependant)
return dependant
@ -282,6 +304,10 @@ async def solve_dependencies(
if background_tasks is None:
background_tasks = BackgroundTasks()
values[dependant.background_tasks_param_name] = background_tasks
if dependant.security_scopes_param_name:
values[dependant.security_scopes_param_name] = SecurityScopes(
scopes=dependant.security_scopes
)
return values, errors, background_tasks

7
fastapi/security/__init__.py

@ -6,5 +6,10 @@ from .http import (
HTTPBearer,
HTTPDigest,
)
from .oauth2 import OAuth2, OAuth2PasswordBearer, OAuth2PasswordRequestForm
from .oauth2 import (
OAuth2,
OAuth2PasswordBearer,
OAuth2PasswordRequestForm,
SecurityScopes,
)
from .open_id_connect_url import OpenIdConnect

7
fastapi/security/oauth2.py

@ -1,4 +1,4 @@
from typing import Optional
from typing import List, Optional
from fastapi.openapi.models import OAuth2 as OAuth2Model, OAuthFlows as OAuthFlowsModel
from fastapi.params import Form
@ -159,3 +159,8 @@ class OAuth2PasswordBearer(OAuth2):
else:
return None
return param
class SecurityScopes:
def __init__(self, scopes: List[str] = None):
self.scopes = scopes or []

1
mkdocs.yml

@ -56,6 +56,7 @@ nav:
- Get Current User: 'tutorial/security/get-current-user.md'
- Simple OAuth2 with Password and Bearer: 'tutorial/security/simple-oauth2.md'
- OAuth2 with Password (and hashing), Bearer with JWT tokens: 'tutorial/security/oauth2-jwt.md'
- OAuth2 scopes: 'tutorial/security/oauth2-scopes.md'
- Using the Request Directly: 'tutorial/using-request-directly.md'
- SQL (Relational) Databases: 'tutorial/sql-databases.md'
- Async SQL (Relational) Databases: 'tutorial/async-sql-databases.md'

313
tests/test_tutorial/test_security/test_tutorial005.py

@ -0,0 +1,313 @@
from starlette.testclient import TestClient
from security.tutorial005 import (
app,
create_access_token,
fake_users_db,
get_password_hash,
verify_password,
)
client = TestClient(app)
openapi_schema = {
"openapi": "3.0.2",
"info": {"title": "Fast API", "version": "0.1.0"},
"paths": {
"/token": {
"post": {
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/Token"}
}
},
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/HTTPValidationError"
}
}
},
},
},
"summary": "Route Login Access Token Post",
"operationId": "route_login_access_token_token_post",
"requestBody": {
"content": {
"application/x-www-form-urlencoded": {
"schema": {
"$ref": "#/components/schemas/Body_route_login_access_token"
}
}
},
"required": True,
},
}
},
"/users/me/": {
"get": {
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/User"}
}
},
}
},
"summary": "Read Users Me Get",
"operationId": "read_users_me_users_me__get",
"security": [{"OAuth2PasswordBearer": ["me"]}],
}
},
"/users/me/items/": {
"get": {
"responses": {
"200": {
"description": "Successful Response",
"content": {"application/json": {"schema": {}}},
}
},
"summary": "Read Own Items Get",
"operationId": "read_own_items_users_me_items__get",
"security": [{"OAuth2PasswordBearer": ["items", "me"]}],
}
},
},
"components": {
"schemas": {
"Body_route_login_access_token": {
"title": "Body_route_login_access_token",
"required": ["username", "password"],
"type": "object",
"properties": {
"grant_type": {
"title": "Grant_Type",
"pattern": "password",
"type": "string",
},
"username": {"title": "Username", "type": "string"},
"password": {"title": "Password", "type": "string"},
"scope": {"title": "Scope", "type": "string", "default": ""},
"client_id": {"title": "Client_Id", "type": "string"},
"client_secret": {"title": "Client_Secret", "type": "string"},
},
},
"Token": {
"title": "Token",
"required": ["access_token", "token_type"],
"type": "object",
"properties": {
"access_token": {"title": "Access_Token", "type": "string"},
"token_type": {"title": "Token_Type", "type": "string"},
},
},
"User": {
"title": "User",
"required": ["username"],
"type": "object",
"properties": {
"username": {"title": "Username", "type": "string"},
"email": {"title": "Email", "type": "string"},
"full_name": {"title": "Full_Name", "type": "string"},
"disabled": {"title": "Disabled", "type": "boolean"},
},
},
"ValidationError": {
"title": "ValidationError",
"required": ["loc", "msg", "type"],
"type": "object",
"properties": {
"loc": {
"title": "Location",
"type": "array",
"items": {"type": "string"},
},
"msg": {"title": "Message", "type": "string"},
"type": {"title": "Error Type", "type": "string"},
},
},
"HTTPValidationError": {
"title": "HTTPValidationError",
"type": "object",
"properties": {
"detail": {
"title": "Detail",
"type": "array",
"items": {"$ref": "#/components/schemas/ValidationError"},
}
},
},
},
"securitySchemes": {
"OAuth2PasswordBearer": {
"type": "oauth2",
"flows": {
"password": {
"scopes": {
"me": "Read information about the current user.",
"items": "Read items.",
},
"tokenUrl": "/token",
}
},
}
},
},
}
def get_access_token(username="johndoe", password="secret", scope=None):
data = {"username": username, "password": password}
if scope:
data["scope"] = scope
response = client.post("/token", data=data)
content = response.json()
access_token = content.get("access_token")
return access_token
def test_openapi_schema():
response = client.get("/openapi.json")
assert response.status_code == 200
assert response.json() == openapi_schema
def test_login():
response = client.post("/token", data={"username": "johndoe", "password": "secret"})
assert response.status_code == 200
content = response.json()
assert "access_token" in content
assert content["token_type"] == "bearer"
def test_login_incorrect_password():
response = client.post(
"/token", data={"username": "johndoe", "password": "incorrect"}
)
assert response.status_code == 400
assert response.json() == {"detail": "Incorrect username or password"}
def test_login_incorrect_username():
response = client.post("/token", data={"username": "foo", "password": "secret"})
assert response.status_code == 400
assert response.json() == {"detail": "Incorrect username or password"}
def test_no_token():
response = client.get("/users/me")
assert response.status_code == 403
assert response.json() == {"detail": "Not authenticated"}
def test_token():
access_token = get_access_token(scope="me")
response = client.get(
"/users/me", headers={"Authorization": f"Bearer {access_token}"}
)
print(response.json())
assert response.status_code == 200
assert response.json() == {
"username": "johndoe",
"full_name": "John Doe",
"email": "[email protected]",
"disabled": False,
}
def test_incorrect_token():
response = client.get("/users/me", headers={"Authorization": "Bearer nonexistent"})
assert response.status_code == 403
assert response.json() == {"detail": "Could not validate credentials"}
def test_incorrect_token_type():
response = client.get(
"/users/me", headers={"Authorization": "Notexistent testtoken"}
)
assert response.status_code == 403
assert response.json() == {"detail": "Not authenticated"}
def test_verify_password():
assert verify_password("secret", fake_users_db["johndoe"]["hashed_password"])
def test_get_password_hash():
assert get_password_hash("secretalice")
def test_create_access_token():
access_token = create_access_token(data={"data": "foo"})
assert access_token
def test_token_no_sub():
response = client.get(
"/users/me",
headers={
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJkYXRhIjoiZm9vIn0.9ynBhuYb4e6aW3oJr_K_TBgwcMTDpRToQIE25L57rOE"
},
)
assert response.status_code == 403
assert response.json() == {"detail": "Could not validate credentials"}
def test_token_no_username():
response = client.get(
"/users/me",
headers={
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJmb28ifQ.NnExK_dlNAYyzACrXtXDrcWOgGY2JuPbI4eDaHdfK5Y"
},
)
assert response.status_code == 403
assert response.json() == {"detail": "Could not validate credentials"}
def test_token_no_scope():
access_token = get_access_token()
response = client.get(
"/users/me", headers={"Authorization": f"Bearer {access_token}"}
)
assert response.status_code == 403
assert response.json() == {"detail": "Not enough permissions"}
def test_token_inexistent_user():
response = client.get(
"/users/me",
headers={
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VybmFtZTpib2IifQ.HcfCW67Uda-0gz54ZWTqmtgJnZeNem0Q757eTa9EZuw"
},
)
assert response.status_code == 403
assert response.json() == {"detail": "Could not validate credentials"}
def test_token_inactive_user():
access_token = get_access_token(
username="alice", password="secretalice", scope="me"
)
response = client.get(
"/users/me", headers={"Authorization": f"Bearer {access_token}"}
)
print(response.json())
assert response.status_code == 400
assert response.json() == {"detail": "Inactive user"}
def test_read_items():
access_token = get_access_token(scope="me items")
response = client.get(
"/users/me/items/", headers={"Authorization": f"Bearer {access_token}"}
)
assert response.status_code == 200
assert response.json() == [{"item_id": "Foo", "owner": "johndoe"}]
Loading…
Cancel
Save