rhysrevans3 2 days ago
committed by GitHub
parent
commit
b4e3d39a8f
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 3
      docs/en/docs/reference/security/index.md
  2. 2
      docs/en/docs/release-notes.md
  3. 1
      fastapi/security/__init__.py
  4. 99
      fastapi/security/oauth2.py
  5. 72
      tests/test_security_oauth2_authorization_client_credentials.py
  6. 77
      tests/test_security_oauth2_authorization_client_credentials_description.py

3
docs/en/docs/reference/security/index.md

@ -21,6 +21,7 @@ from fastapi.security import (
OAuth2,
OAuth2AuthorizationCodeBearer,
OAuth2PasswordBearer,
OAuth2ClientCredentials,
OAuth2PasswordRequestForm,
OAuth2PasswordRequestFormStrict,
OpenIdConnect,
@ -58,6 +59,8 @@ from fastapi.security import (
::: fastapi.security.OAuth2PasswordBearer
::: fastapi.security.OAuth2ClientCredentials
## OAuth2 Password Form
::: fastapi.security.OAuth2PasswordRequestForm

2
docs/en/docs/release-notes.md

@ -5,6 +5,8 @@ hide:
# Release Notes
* Implement `OAuth2ClientCredentials` class. PR [#11560](https://github.com/tiangolo/fastapi/pull/11560) by [@rhysrevans3](https://github.com/rhysrevans3).
## Latest Changes
### Docs

1
fastapi/security/__init__.py

@ -8,6 +8,7 @@ from .http import HTTPBearer as HTTPBearer
from .http import HTTPDigest as HTTPDigest
from .oauth2 import OAuth2 as OAuth2
from .oauth2 import OAuth2AuthorizationCodeBearer as OAuth2AuthorizationCodeBearer
from .oauth2 import OAuth2ClientCredentials as OAuth2ClientCredentials
from .oauth2 import OAuth2PasswordBearer as OAuth2PasswordBearer
from .oauth2 import OAuth2PasswordRequestForm as OAuth2PasswordRequestForm
from .oauth2 import OAuth2PasswordRequestFormStrict as OAuth2PasswordRequestFormStrict

99
fastapi/security/oauth2.py

@ -595,6 +595,105 @@ class OAuth2AuthorizationCodeBearer(OAuth2):
return param
class OAuth2ClientCredentials(OAuth2):
"""
OAuth2 flow for authentication using a bearer token obtained with an OAuth2 client
credentials flow. An instance of it would be used as a dependency.
"""
def __init__(
self,
tokenUrl: Annotated[
str,
Doc(
"""
The URL to obtain the OAuth2 token.
"""
),
],
scheme_name: Annotated[
Optional[str],
Doc(
"""
Security scheme name.
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
"""
),
] = None,
scopes: Annotated[
Optional[Dict[str, str]],
Doc(
"""
The OAuth2 scopes that would be required by the *"path" operations* that
use this dependency.
"""
),
] = None,
description: Annotated[
Optional[str],
Doc(
"""
Security scheme description.
It will be included in the generated OpenAPI (e.g. visible at `/docs`).
"""
),
] = None,
auto_error: Annotated[
bool,
Doc(
"""
By default, if no HTTP Authorization header is provided, required for
OAuth2 authentication, it will automatically cancel the request and
send the client an error.
If `auto_error` is set to `False`, when the HTTP Authorization header
is not available, instead of erroring out, the dependency result will
be `None`.
This is useful when you want to have optional authentication.
It is also useful when you want to have authentication that can be
provided in one of multiple optional ways (for example, with OAuth2
or in a cookie).
"""
),
] = True,
):
if not scopes:
scopes = {}
flows = OAuthFlowsModel(
clientCredentials=cast(
Any,
{
"tokenUrl": tokenUrl,
"scopes": scopes,
},
)
)
super().__init__(
flows=flows,
scheme_name=scheme_name,
description=description,
auto_error=auto_error,
)
async def __call__(self, request: Request) -> Optional[str]:
authorization = request.headers.get("Authorization")
scheme, param = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "bearer":
if self.auto_error:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
else:
return None # pragma: nocover
return param
class SecurityScopes:
"""
This is a special class that you can define in a parameter in a dependency to

72
tests/test_security_oauth2_authorization_client_credentials.py

@ -0,0 +1,72 @@
from typing import Optional
from fastapi import FastAPI, Security
from fastapi.security import OAuth2ClientCredentials
from fastapi.testclient import TestClient
app = FastAPI()
oauth2_scheme = OAuth2ClientCredentials(tokenUrl="token", auto_error=True)
@app.get("/items/")
async def read_items(token: Optional[str] = Security(oauth2_scheme)):
return {"token": token}
client = TestClient(app)
def test_no_token():
response = client.get("/items")
assert response.status_code == 401, response.text
assert response.json() == {"detail": "Not authenticated"}
def test_incorrect_token():
response = client.get("/items", headers={"Authorization": "Non-existent testtoken"})
assert response.status_code == 401, response.text
assert response.json() == {"detail": "Not authenticated"}
def test_token():
response = client.get("/items", headers={"Authorization": "Bearer testtoken"})
assert response.status_code == 200, response.text
assert response.json() == {"token": "testtoken"}
def test_openapi_schema():
response = client.get("/openapi.json")
assert response.status_code == 200, response.text
assert response.json() == {
"openapi": "3.1.0",
"info": {"title": "FastAPI", "version": "0.1.0"},
"paths": {
"/items/": {
"get": {
"responses": {
"200": {
"description": "Successful Response",
"content": {"application/json": {"schema": {}}},
}
},
"summary": "Read Items",
"operationId": "read_items_items__get",
"security": [{"OAuth2ClientCredentials": []}],
}
}
},
"components": {
"securitySchemes": {
"OAuth2ClientCredentials": {
"type": "oauth2",
"flows": {
"clientCredentials": {
"tokenUrl": "token",
"scopes": {},
}
},
}
}
},
}

77
tests/test_security_oauth2_authorization_client_credentials_description.py

@ -0,0 +1,77 @@
from typing import Optional
from fastapi import FastAPI, Security
from fastapi.security import OAuth2ClientCredentials
from fastapi.testclient import TestClient
app = FastAPI()
oauth2_scheme = OAuth2ClientCredentials(
tokenUrl="token",
description="OAuth2 Client Credentials Flow",
auto_error=True,
)
@app.get("/items/")
async def read_items(token: Optional[str] = Security(oauth2_scheme)):
return {"token": token}
client = TestClient(app)
def test_no_token():
response = client.get("/items")
assert response.status_code == 401, response.text
assert response.json() == {"detail": "Not authenticated"}
def test_incorrect_token():
response = client.get("/items", headers={"Authorization": "Non-existent testtoken"})
assert response.status_code == 401, response.text
assert response.json() == {"detail": "Not authenticated"}
def test_token():
response = client.get("/items", headers={"Authorization": "Bearer testtoken"})
assert response.status_code == 200, response.text
assert response.json() == {"token": "testtoken"}
def test_openapi_schema():
response = client.get("/openapi.json")
assert response.status_code == 200, response.text
assert response.json() == {
"openapi": "3.1.0",
"info": {"title": "FastAPI", "version": "0.1.0"},
"paths": {
"/items/": {
"get": {
"responses": {
"200": {
"description": "Successful Response",
"content": {"application/json": {"schema": {}}},
}
},
"summary": "Read Items",
"operationId": "read_items_items__get",
"security": [{"OAuth2ClientCredentials": []}],
}
}
},
"components": {
"securitySchemes": {
"OAuth2ClientCredentials": {
"type": "oauth2",
"flows": {
"clientCredentials": {
"tokenUrl": "token",
"scopes": {},
}
},
"description": "OAuth2 Client Credentials Flow",
}
}
},
}
Loading…
Cancel
Save