6 changed files with 252 additions and 0 deletions
After Width: | Height: | Size: 94 KiB |
After Width: | Height: | Size: 105 KiB |
After Width: | Height: | Size: 67 KiB |
@ -0,0 +1,115 @@ |
|||
# OpenID Connect (OIDC) with JWT Access Tokens |
|||
|
|||
For this tutorial we will be using OpenID Connect (OIDC) as an *authentication* layer that builds on top of the OAuth2 *authorization* layer. |
|||
|
|||
We will be using the Swagger UI to serve the OpenID Connect authentication flow. The **FastAPI** (default) router will implement a OAuth2 resource server that validates the JWT access tokens and grant access to the router's endpoints. |
|||
|
|||
We will use a custom claim to grant permission to endpoints for users with specific roles which are represented as a claim in the access token (there is no standard defined what the claim name is, so it has a configurable or *custom* name). Typically, the authorization server exposes the user's group membership in a specific claim in the JWT Access Token, which defaults to 'groups' in the tutorial. |
|||
|
|||
Note that the Swagger UI mirrors the OIDC main flow of the frontend that would be used in a production environment. Therefore the OIDC security scheme, as represented in the openAPI definition, is separate from the OAuth2 scheme. |
|||
|
|||
# Configure Requirements |
|||
|
|||
First, you will need to select an OpenID provider if you do not have one already. There are ones that offer free trials or free tiers to experiment with [here](https://identitymanagementinstitute.org/identity-and-access-management-vendor-list/). |
|||
|
|||
|
|||
## Setup OpenID provder |
|||
|
|||
First, we will need to configure an Applicaton (i.e. Relying Party in OpenID-speak) in the OpenID provider. This application allows the **FastAPI** client that logs in to the OpenID Connect provider: |
|||
!!! check "Step 1 - Create Application" |
|||
* Create an Application of type SPA |
|||
* Select Authorization Code, Refresh Token, Require PKCE |
|||
* Configure sign-in redirect URIs: `http://localhost:8080/docs/oauth2-redirect` |
|||
* Configure sign-out redirect URIs: `http://localhost:8080/docs/` |
|||
* *Write down the client id* |
|||
|
|||
Then, we will select an authorization server to verify user identities and issue tokens for secure authentication and authorization of login requests: |
|||
!!! check "Step 2 - Configure authorization server to return a custom claim" |
|||
* Select/create a custom authorization server for the abovementioned application |
|||
* Create a custom claim with the name "`groups`". |
|||
* Map the values to the groups of which the authenticated user is member of |
|||
* *Write down issuer URL* |
|||
* *Write down audience* |
|||
|
|||
Finally, we will need to create a user and a group named "`Foo`" to |
|||
!!! check "Step 3 - Create a user and group" |
|||
* Create a group called "`Foo`" |
|||
* Create a user |
|||
* Assign the "`Foo`"` group to the user |
|||
* Assign the application of step 1 to the user |
|||
* *Write down user/password as you will need to authenticate with it later* |
|||
|
|||
## Configure your **FastAPI** Application |
|||
|
|||
We assume a running pip environment with **FastAPI** installed (see [here](../../index.md#installation)). |
|||
|
|||
This example contains a `AccessTokenValidator` that validates the JWT access tokens using the jwks url that is part of the oidc well known configuration. It requires a Python JavaScript Object Signing and Encryprion (JOSE) library, a HTTP client to fetch keysets and some cache utilities. |
|||
|
|||
|
|||
!!! check "Step 4 - Install AccessTokenValidator Dependencies" |
|||
```console |
|||
pip install jose cachetools types-cachetools httpx |
|||
``` |
|||
|
|||
You need to fill in the values in the .env file that you wrote down from the previous steps: |
|||
!!! check "Step 5 - Configure **FastAPI** environment" |
|||
``` |
|||
client_id = "Client Id of Step 1 here" |
|||
issuer = "Issuer URL of Step 2 here" |
|||
audience = "Audience of Step 2 here" |
|||
``` |
|||
|
|||
This was the final step of the configuration. |
|||
|
|||
# Running the **FastAPI** Application |
|||
|
|||
Finally we come to the actual **FastAPI** code: |
|||
|
|||
=== "Python 3.9+" |
|||
|
|||
```Python hl_lines="112-124 127-129 134" |
|||
{!> ../../../docs_src/security/tutorial008_an_py39.py!} |
|||
``` |
|||
|
|||
!!! check "some small tweaks necessary?" |
|||
* line 118, set usePkceWithAuthorizationCodeGrant if you require PKCE authentication (configured when you set up your application) |
|||
* line 116, add additional scopes to "openid" if your authorization requires this |
|||
|
|||
|
|||
If you save this file as `main.py`, you can run the app [as normal](../../index.md#run-it), for instance: |
|||
```bash |
|||
uvicorn main:app --port 8080 --reload |
|||
``` |
|||
(*If you do not specify the correct port defined in Step 1, the authentication flow will fail*) |
|||
|
|||
|
|||
# Test the **FastAPI** Application |
|||
|
|||
When the application is running, you can then point your browser to the [Interactive API Docs](../../index.md#interactive-api-docs): |
|||
`http://localhost:8080/docs/` |
|||
|
|||
Authenticate first in the Swagger UI using the 'Authorize' button at the top and scroll to the topmost authentication flow named **'OpenIdConnect (OAuth2, authorization_code with PKCE)'**: |
|||
|
|||
<img src="/img/tutorial/security/image13.png"> |
|||
|
|||
Then press the 'Authorize' button. |
|||
|
|||
When successfully authenticated, you will see that your session is 'authorized': |
|||
|
|||
<img src="/img/tutorial/security/image14.png"> |
|||
|
|||
Press the 'Close' button to close this screen. |
|||
|
|||
Then execute the /hello endpoint with your user if part of the "`Foo`" group: |
|||
|
|||
<img src="/img/tutorial/security/image15.png"> |
|||
|
|||
If you see "Hi!" as a response, your user was successfully authenticated and had the "`Foo`" role in the claim as required by the /hello endpoint. |
|||
|
|||
To understand the code step by step, it will help if you step through the code using a [Debugger](../debugging.md#run-your-code-with-your-debugger). |
|||
|
|||
Good luck! |
|||
|
|||
# Appendix - References |
|||
|
|||
* OIDC Terminology: https://openid.net/specs/openid-connect-core-1_0.html#Terminology |
@ -0,0 +1,136 @@ |
|||
from typing import Annotated, Any, Dict, Optional |
|||
|
|||
import httpx |
|||
from cachetools import TTLCache |
|||
from fastapi import Depends, FastAPI, HTTPException, Security |
|||
from fastapi.security import ( |
|||
HTTPAuthorizationCredentials, |
|||
HTTPBearer, |
|||
OpenIdConnect, |
|||
SecurityScopes, |
|||
) |
|||
from jose import JWTError, jwt |
|||
from pydantic import Field |
|||
from pydantic_settings import BaseSettings |
|||
from starlette.requests import Request |
|||
from starlette.status import HTTP_400_BAD_REQUEST, HTTP_403_FORBIDDEN |
|||
|
|||
|
|||
class AccessTokenCredentials(HTTPAuthorizationCredentials): |
|||
token: Dict[str, Any] |
|||
|
|||
|
|||
class AccessTokenValidator(HTTPBearer): |
|||
"""Generic HTTPBearer Validator that validates JWT tokens given the JWKS provided at jwks_url.""" |
|||
|
|||
def __init__( |
|||
self, |
|||
*, |
|||
jwks_url: str, |
|||
audience: str, |
|||
issuer: str, |
|||
expire_seconds: int = 3600, |
|||
roles_claim: str = "groups", |
|||
scheme_name: Optional[str] = None, |
|||
description: Optional[str] = None, |
|||
): |
|||
super().__init__(scheme_name=scheme_name, description=description) |
|||
self.uri = jwks_url |
|||
self.audience = audience |
|||
self.issuer = issuer |
|||
self.roles_claim = roles_claim |
|||
self.keyset_cache: TTLCache[str, str] = TTLCache(16, expire_seconds) |
|||
|
|||
async def get_jwt_keyset(self) -> str: |
|||
"""Retrieves keyset when expired/not cached yet.""" |
|||
result: Optional[str] = self.keyset_cache.get(self.uri) |
|||
if result is None: |
|||
async with httpx.AsyncClient() as client: |
|||
response = await client.get(self.uri) |
|||
result = self.keyset_cache[self.uri] = response.text |
|||
return result |
|||
|
|||
async def __call__(self, request: Request, security_scopes: SecurityScopes) -> AccessTokenCredentials: # type: ignore |
|||
"""Validates the JWT Access Token. If security_scopes are given, they are validated against the roles_claim in the Access Token.""" |
|||
# 1. Unpack bearer token |
|||
unverified_token = await super().__call__(request) |
|||
if not unverified_token: |
|||
raise HTTPException(HTTP_400_BAD_REQUEST, "Invalid Access Token") |
|||
access_token = unverified_token.credentials |
|||
try: |
|||
# 2. Get keyset from authorization server so that we can validate the JWT Access Token |
|||
keyset = await self.get_jwt_keyset() |
|||
# 3. Perform validation |
|||
verified_token = jwt.decode( |
|||
token=access_token, |
|||
key=keyset, |
|||
audience=self.audience, |
|||
issuer=self.issuer, |
|||
) |
|||
except JWTError: |
|||
raise HTTPException( |
|||
status_code=HTTP_400_BAD_REQUEST, |
|||
detail="Unsupported authorization code", |
|||
) |
|||
|
|||
# 4. if security scopes are present, validate them |
|||
if security_scopes and security_scopes.scopes: |
|||
# 4.1 the roles_claim must be present in the access token |
|||
scopes = verified_token.get(self.roles_claim) |
|||
if scopes is None: |
|||
raise HTTPException( |
|||
status_code=HTTP_400_BAD_REQUEST, detail="Unsupported Access Token" |
|||
) |
|||
# 4.2 all required roles in the roles_claim must be present |
|||
if not set(security_scopes.scopes).issubset(set(scopes)): |
|||
raise HTTPException( |
|||
status_code=HTTP_403_FORBIDDEN, detail="Not Authorized" |
|||
) |
|||
|
|||
return AccessTokenCredentials( |
|||
scheme=self.scheme_name, credentials=access_token, token=verified_token |
|||
) |
|||
|
|||
|
|||
class Settings(BaseSettings): |
|||
"""Settings wil be read from an .env file""" |
|||
|
|||
issuer: str = Field(default=...) |
|||
audience: str = Field(default=...) |
|||
client_id: str = Field(default=...) |
|||
|
|||
class Config: |
|||
env_file = ".env" |
|||
|
|||
|
|||
settings = Settings() |
|||
|
|||
# Standard OIDC URLs |
|||
oidc_url = f"{settings.issuer}/.well-known/openid-configuration" |
|||
jwks_url = f"{settings.issuer}/v1/keys" |
|||
|
|||
openid_connect = OpenIdConnect(openIdConnectUrl=oidc_url) |
|||
|
|||
swagger_ui_init_oauth = { |
|||
"clientId": settings.client_id, |
|||
"scopes": ["openid"], # fill in additional scopes when necessary |
|||
"appName": "Test Application", |
|||
"usePkceWithAuthorizationCodeGrant": True, |
|||
} |
|||
|
|||
# The openid_connect security scheme is given as a dependency so that you can authenticate using the swagger UI |
|||
app = FastAPI( |
|||
swagger_ui_init_oauth=swagger_ui_init_oauth, dependencies=[Depends(openid_connect)] |
|||
) |
|||
|
|||
# the tokenvalidator is used for all endpoints that need to be authorized |
|||
oauth2 = AccessTokenValidator( |
|||
jwks_url=jwks_url, audience=settings.audience, issuer=settings.issuer |
|||
) |
|||
|
|||
|
|||
@app.get("/hello") |
|||
async def hello( |
|||
token: Annotated[AccessTokenCredentials, Security(oauth2, scopes=["Foo"])] |
|||
) -> str: |
|||
return "Hi!" |
Loading…
Reference in new issue