Browse Source

fix: reject None from security dependencies when annotation is non-optional

When a security scheme (OAuth2PasswordBearer, HTTPBearer, APIKeyHeader,
etc.) has auto_error=False, it returns None on missing or invalid
credentials. Previously, FastAPI injected this None into the endpoint
parameter regardless of its type annotation, causing either a 500 crash
(if the handler called methods on the value) or a silent auth bypass
(if the handler did a truthy check).

Now, when a security scheme dependency returns None and the endpoint
parameter is annotated as non-optional (e.g. `str` instead of
`str | None`), FastAPI returns a 422 validation error with a message
explaining the mismatch. Parameters annotated as optional continue to
receive None as before.

The check only applies to direct security scheme dependencies, not to
intermediate functions in dependency chains.
pull/15414/head
Ismail Pelaseyed 2 months ago
parent
commit
da5f00610d
  1. 1
      fastapi/dependencies/models.py
  2. 35
      fastapi/dependencies/utils.py
  3. 264
      tests/test_security_auto_error_none_type_safety.py

1
fastapi/dependencies/models.py

@ -49,6 +49,7 @@ class Dependant:
use_cache: bool = True use_cache: bool = True
path: str | None = None path: str | None = None
scope: Literal["function", "request"] | None = None scope: Literal["function", "request"] | None = None
param_annotation: Any = field(default=inspect.Parameter.empty)
@cached_property @cached_property
def oauth_scopes(self) -> list[str]: def oauth_scopes(self) -> list[str]:

35
fastapi/dependencies/utils.py

@ -1,6 +1,7 @@
import dataclasses import dataclasses
import inspect import inspect
import sys import sys
import types
from collections.abc import ( from collections.abc import (
AsyncGenerator, AsyncGenerator,
AsyncIterable, AsyncIterable,
@ -91,6 +92,19 @@ multipart_incorrect_install_error = (
) )
def _annotation_allows_none(annotation: Any) -> bool:
if annotation is inspect.Parameter.empty or annotation is Any:
return True
if annotation is type(None) or annotation is None:
return True
origin = get_origin(annotation)
if origin is Annotated:
return _annotation_allows_none(get_args(annotation)[0])
if origin is Union or origin is types.UnionType:
return type(None) in get_args(annotation)
return False
def ensure_multipart_is_installed() -> None: def ensure_multipart_is_installed() -> None:
try: try:
from python_multipart import __version__ from python_multipart import __version__
@ -340,6 +354,7 @@ def get_dependant(
use_cache=param_details.depends.use_cache, use_cache=param_details.depends.use_cache,
scope=param_details.depends.scope, scope=param_details.depends.scope,
) )
sub_dependant.param_annotation = param.annotation
dependant.dependencies.append(sub_dependant) dependant.dependencies.append(sub_dependant)
continue continue
if add_non_field_param_to_dependency( if add_non_field_param_to_dependency(
@ -679,6 +694,26 @@ async def solve_dependencies(
else: else:
solved = await run_in_threadpool(call, **solved_result.values) solved = await run_in_threadpool(call, **solved_result.values)
if sub_dependant.name is not None: if sub_dependant.name is not None:
if (
solved is None
and sub_dependant._is_security_scheme
and sub_dependant.param_annotation is not inspect.Parameter.empty
and not _annotation_allows_none(sub_dependant.param_annotation)
):
errors.append(
{
"type": "missing",
"loc": ("dependency", sub_dependant.name),
"msg": (
f"Dependency returned None for parameter "
f"'{sub_dependant.name}' which is annotated as "
f"non-optional. Use 'Optional[...]' or '... | None' "
f"if the dependency can return None (e.g. when using "
f"auto_error=False)."
),
}
)
continue
values[sub_dependant.name] = solved values[sub_dependant.name] = solved
if sub_dependant.cache_key not in dependency_cache: if sub_dependant.cache_key not in dependency_cache:
dependency_cache[sub_dependant.cache_key] = solved dependency_cache[sub_dependant.cache_key] = solved

264
tests/test_security_auto_error_none_type_safety.py

@ -0,0 +1,264 @@
from typing import Annotated
from fastapi import Depends, FastAPI
from fastapi.security import (
APIKeyCookie,
APIKeyHeader,
APIKeyQuery,
HTTPAuthorizationCredentials,
HTTPBasic,
HTTPBasicCredentials,
HTTPBearer,
OAuth2PasswordBearer,
)
from fastapi.testclient import TestClient
from inline_snapshot import snapshot
def test_oauth2_non_optional_no_auth():
app = FastAPI()
oauth2 = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
@app.get("/me")
def get_me(token: str = Depends(oauth2)):
return {"token": token}
client = TestClient(app)
resp = client.get("/me")
assert resp.status_code == 422
assert resp.json() == snapshot(
{
"detail": [
{
"type": "missing",
"loc": ["dependency", "token"],
"msg": "Dependency returned None for parameter 'token' which is annotated as non-optional. Use 'Optional[...]' or '... | None' if the dependency can return None (e.g. when using auto_error=False).",
}
]
}
)
def test_oauth2_optional_no_auth():
app = FastAPI()
oauth2 = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
@app.get("/me")
def get_me(token: str | None = Depends(oauth2)):
return {"token": token}
client = TestClient(app)
resp = client.get("/me")
assert resp.status_code == 200
assert resp.json() == {"token": None}
def test_oauth2_non_optional_with_auth():
app = FastAPI()
oauth2 = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
@app.get("/me")
def get_me(token: str = Depends(oauth2)):
return {"token": token}
client = TestClient(app)
resp = client.get("/me", headers={"Authorization": "Bearer valid"})
assert resp.status_code == 200
assert resp.json() == {"token": "valid"}
def test_http_bearer_non_optional_no_auth():
app = FastAPI()
bearer = HTTPBearer(auto_error=False)
@app.get("/profile")
def get_profile(creds: HTTPAuthorizationCredentials = Depends(bearer)):
return {"scheme": creds.scheme}
client = TestClient(app)
resp = client.get("/profile")
assert resp.status_code == 422
def test_http_bearer_optional_no_auth():
app = FastAPI()
bearer = HTTPBearer(auto_error=False)
@app.get("/profile")
def get_profile(creds: HTTPAuthorizationCredentials | None = Depends(bearer)):
if creds is None:
return {"status": "anonymous"}
return {"scheme": creds.scheme}
client = TestClient(app)
resp = client.get("/profile")
assert resp.status_code == 200
assert resp.json() == {"status": "anonymous"}
def test_api_key_header_non_optional_no_key():
app = FastAPI()
api_key = APIKeyHeader(name="X-API-Key", auto_error=False)
@app.get("/data")
def get_data(key: str = Depends(api_key)):
return {"key": key}
client = TestClient(app)
resp = client.get("/data")
assert resp.status_code == 422
def test_api_key_query_non_optional_no_key():
app = FastAPI()
api_key = APIKeyQuery(name="api_key", auto_error=False)
@app.get("/data")
def get_data(key: str = Depends(api_key)):
return {"key": key}
client = TestClient(app)
resp = client.get("/data")
assert resp.status_code == 422
def test_api_key_cookie_non_optional_no_key():
app = FastAPI()
api_key = APIKeyCookie(name="session", auto_error=False)
@app.get("/data")
def get_data(key: str = Depends(api_key)):
return {"key": key}
client = TestClient(app)
resp = client.get("/data")
assert resp.status_code == 422
def test_http_basic_non_optional_no_auth():
app = FastAPI()
basic = HTTPBasic(auto_error=False)
@app.get("/data")
def get_data(creds: HTTPBasicCredentials = Depends(basic)):
return {"user": creds.username}
client = TestClient(app)
resp = client.get("/data")
assert resp.status_code == 422
def test_annotated_syntax_non_optional():
app = FastAPI()
api_key = APIKeyHeader(name="X-API-Key", auto_error=False)
@app.get("/data")
def get_data(key: Annotated[str, Depends(api_key)]):
return {"key": key}
client = TestClient(app)
resp = client.get("/data")
assert resp.status_code == 422
resp2 = client.get("/data", headers={"X-API-Key": "secret"})
assert resp2.status_code == 200
assert resp2.json() == {"key": "secret"}
def test_annotated_syntax_optional():
app = FastAPI()
api_key = APIKeyHeader(name="X-API-Key", auto_error=False)
@app.get("/data")
def get_data(key: Annotated[str | None, Depends(api_key)]):
return {"key": key}
client = TestClient(app)
resp = client.get("/data")
assert resp.status_code == 200
assert resp.json() == {"key": None}
def test_any_annotation_allows_none():
"""Any annotation should allow None (no type constraint)."""
from typing import Any
app = FastAPI()
oauth2 = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
@app.get("/me")
def get_me(token: Any = Depends(oauth2)):
return {"token": token}
client = TestClient(app)
resp = client.get("/me")
assert resp.status_code == 200
assert resp.json() == {"token": None}
def test_none_type_annotation_allows_none():
app = FastAPI()
oauth2 = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
@app.get("/me")
def get_me(token: None = Depends(oauth2)):
return {"token": token}
client = TestClient(app)
resp = client.get("/me")
assert resp.status_code == 200
assert resp.json() == {"token": None}
def test_annotated_non_optional_inner_blocked():
"""Annotated[str, Depends(...)] should still be blocked when str is non-optional."""
from typing import Annotated
app = FastAPI()
api_key = APIKeyHeader(name="X-Key", auto_error=False)
@app.get("/data")
def get_data(key: Annotated[str, Depends(api_key)]):
return {"key": key}
client = TestClient(app)
resp = client.get("/data")
assert resp.status_code == 422
def test_annotated_optional_inner_allowed():
"""Annotated[str | None, Depends(...)] should allow None."""
from typing import Annotated
app = FastAPI()
api_key = APIKeyHeader(name="X-Key", auto_error=False)
@app.get("/data")
def get_data(key: Annotated[str | None, Depends(api_key)]):
return {"key": key}
client = TestClient(app)
resp = client.get("/data")
assert resp.status_code == 200
assert resp.json() == {"key": None}
def test_chain_through_intermediate_not_blocked():
app = FastAPI()
oauth2 = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
def get_user(token: str | None = Depends(oauth2)):
if token is None:
return None
return {"name": "alice"}
@app.get("/data")
def get_data(user: dict = Depends(get_user)):
if user is None:
return {"msg": "anonymous"}
return user
client = TestClient(app)
resp = client.get("/data")
assert resp.status_code == 200
assert resp.json() == {"msg": "anonymous"}
Loading…
Cancel
Save