diff --git a/fastapi/dependencies/utils.py b/fastapi/dependencies/utils.py index aceca6a1d3..b41654c675 100644 --- a/fastapi/dependencies/utils.py +++ b/fastapi/dependencies/utils.py @@ -55,7 +55,7 @@ from fastapi.concurrency import ( contextmanager_in_threadpool, ) from fastapi.dependencies.models import Dependant -from fastapi.exceptions import DependencyScopeError +from fastapi.exceptions import DependencyScopeError, FastAPIError from fastapi.logger import logger from fastapi.security.oauth2 import SecurityScopes from fastapi.types import DependencyCacheKey @@ -390,6 +390,204 @@ class ParamDetails: field: ModelField | None +@dataclass(slots=True) +class _AnnotatedBodyDependsMerge: + """Result of ``Annotated[..., marker, Depends(Model)]`` (order-independent).""" + + field_info: FieldInfo + use_annotation: Any + + +class _AnnotatedBodyDependsMerger: + """ + Merge ``Body`` / ``Form`` / ``File`` / ``Query`` with ``Depends(model_cls)`` inside ``Annotated``. + + FastAPI normally keeps only the last marker in ``Annotated[...]``, so + ``Annotated[T, Body(), Depends(Foo)]`` used to ignore ``Body`` and treat ``Foo`` as a + dependency (e.g. query fields). This path combines them into one field (body, form, + multipart, or query): OpenAPI and validation use ``Foo``; the handler parameter's + declared type stays ``T`` (``declared_type``, i.e. the first ``Annotated`` argument). + + Contract: + - Exactly one of ``Body()``, ``Form()``, ``File()``, or ``Query()`` among + FastAPI-specific metadata (not ``Body`` and ``Query`` together). + - Exactly one ``Depends``, whose dependency resolves to a single ``BaseModel`` subclass + (not a callable, not ``Security``). + - No other ``Param`` kinds in the same ``Annotated`` (e.g. ``Path``, ``Header``, …) + besides the single shape marker and ``Depends``. + - Not used on path parameters. + - Order of markers in ``Annotated`` does not matter. + """ + + __slots__ = ( + "param_name", + "declared_type", + "fastapi_specific_annotations", + "value", + "is_path_param", + ) + + def __init__( + self, + *, + param_name: str, + declared_type: Any, + fastapi_specific_annotations: list[Any], + value: Any, + is_path_param: bool, + ) -> None: + self.param_name = param_name + self.declared_type = declared_type + self.fastapi_specific_annotations = fastapi_specific_annotations + self.value = value + self.is_path_param = is_path_param + + def try_build(self) -> _AnnotatedBodyDependsMerge | None: + body_like_markers, query_markers, depends_markers = self._collect_markers() + if body_like_markers and query_markers: + raise FastAPIError( + f"Cannot combine `Query` with `Body`, `Form`, or `File` in `Annotated` for " + f"{self.param_name!r}" + ) + shape_markers = body_like_markers or query_markers + if not (shape_markers and depends_markers): + return None + + self._validate_path_parameter() + self._validate_single_body_like_marker(body_like_markers) + self._validate_single_query_marker(query_markers) + self._validate_single_depends_marker(depends_markers) + shape_marker = shape_markers[0] + self._validate_no_conflicting_param_markers(shape_marker=shape_marker) + dep_marker = self._copy_depends_marker_with_declared_type_fallback( + depends_markers=depends_markers, + ) + self._validate_depends_not_security(dep_marker) + model_cls = self._require_pydantic_model_class(dep_marker) + field_info, merged_annotation = self._build_field_info_and_annotation( + model_cls=model_cls, + shape_marker=shape_marker, + ) + self._apply_signature_default_to_field_info(field_info) + return _AnnotatedBodyDependsMerge( + field_info=field_info, + use_annotation=merged_annotation, + ) + + def _collect_markers(self) -> tuple[list[Any], list[Any], list[Any]]: + body_like = [ + arg + for arg in self.fastapi_specific_annotations + if isinstance(arg, params.Body) + ] + queries = [ + arg + for arg in self.fastapi_specific_annotations + if isinstance(arg, params.Query) + ] + depends = [ + arg + for arg in self.fastapi_specific_annotations + if isinstance(arg, params.Depends) + ] + return body_like, queries, depends + + def _validate_path_parameter(self) -> None: + if self.is_path_param: + raise FastAPIError( + f"Cannot combine `Body`/`Form`/`File`/`Query` with `Depends` in `Annotated` " + f"for path parameter {self.param_name!r}" + ) + + def _validate_single_body_like_marker(self, body_like_markers: list[Any]) -> None: + if len(body_like_markers) > 1: + raise FastAPIError( + f"Cannot specify multiple `Body`, `Form`, or `File` markers for " + f"{self.param_name!r} when using `Depends` in the same `Annotated`" + ) + + def _validate_single_query_marker(self, query_markers: list[Any]) -> None: + if len(query_markers) > 1: + raise FastAPIError( + f"Cannot specify multiple `Query` markers for {self.param_name!r} when " + f"using `Depends` in the same `Annotated`" + ) + + def _validate_single_depends_marker(self, depends_markers: list[Any]) -> None: + if len(depends_markers) > 1: + raise FastAPIError( + f"Cannot specify multiple `Depends` together with `Body`, `Form`, " + f"`File`, or `Query` in `Annotated` for {self.param_name!r}" + ) + + def _conflicting_param_markers(self, *, shape_marker: Any) -> list[Any]: + return [ + arg + for arg in self.fastapi_specific_annotations + if isinstance(arg, params.Param) and arg is not shape_marker + ] + + def _validate_no_conflicting_param_markers(self, *, shape_marker: Any) -> None: + if self._conflicting_param_markers(shape_marker=shape_marker): + raise FastAPIError( + f"Cannot combine `Body`/`Form`/`File`/`Query` with other parameter types " + f"(e.g. `Path`, `Header`) in `Annotated` for {self.param_name!r}" + ) + + def _copy_depends_marker_with_declared_type_fallback( + self, depends_markers: list[Any] + ) -> params.Depends: + dep_marker = cast(params.Depends, copy(depends_markers[0])) + if dep_marker.dependency is None: + dep_marker = dataclasses.replace(dep_marker, dependency=self.declared_type) + return dep_marker + + def _validate_depends_not_security(self, dep_marker: params.Depends) -> None: + if isinstance(dep_marker, params.Security): + raise FastAPIError( + f"Cannot combine `Body`/`Form`/`File`/`Query` with `Security` in `Annotated` " + f"for {self.param_name!r}" + ) + + def _require_pydantic_model_class( + self, dep_marker: params.Depends + ) -> type[BaseModel]: + dependency = dep_marker.dependency + assert dependency is not None + if not isinstance(dependency, type) or not lenient_issubclass( + dependency, BaseModel + ): + raise FastAPIError( + f"When using `Body`, `Form`, `File`, or `Query` together with `Depends` in " + f"`Annotated` for {self.param_name!r}, `Depends` must reference a Pydantic " + f"model class (e.g. `Depends(MyModel)`), not a callable dependency." + ) + return dependency + + def _build_field_info_and_annotation( + self, + model_cls: type[BaseModel], + shape_marker: Any, + ) -> tuple[FieldInfo, Any]: + merged_annotation = Annotated[model_cls, shape_marker] # type: ignore[valid-type] + field_info = copy_field_info( + field_info=shape_marker, + annotation=merged_annotation, + ) + assert field_info.default == Undefined or field_info.default == RequiredParam, ( + f"`{field_info.__class__.__name__}` default value cannot be set in" + f" `Annotated` for {self.param_name!r}. Set the default value with `=` instead." + ) + return field_info, merged_annotation + + def _apply_signature_default_to_field_info(self, field_info: FieldInfo) -> None: + if self.value is not inspect.Signature.empty: + assert not self.is_path_param, "Path parameters cannot have default values" + field_info.default = self.value + else: + field_info.default = RequiredParam + + def analyze_param( *, param_name: str, @@ -428,14 +626,24 @@ def analyze_param( ), ) ] - if fastapi_specific_annotations: - fastapi_annotation: FieldInfo | params.Depends | None = ( - fastapi_specific_annotations[-1] - ) - else: - fastapi_annotation = None - # Set default for Annotated FieldInfo - if isinstance(fastapi_annotation, FieldInfo): + merged_body_depends = False + fastapi_annotation: FieldInfo | params.Depends | None = None + body_depends_merge = _AnnotatedBodyDependsMerger( + param_name=param_name, + declared_type=type_annotation, + fastapi_specific_annotations=fastapi_specific_annotations, + value=value, + is_path_param=is_path_param, + ).try_build() + if body_depends_merge is not None: + field_info = body_depends_merge.field_info + use_annotation = body_depends_merge.use_annotation + depends = None + merged_body_depends = True + elif fastapi_specific_annotations: + fastapi_annotation = fastapi_specific_annotations[-1] + # Set default for Annotated FieldInfo / Depends (single winner unless merged above) + if not merged_body_depends and isinstance(fastapi_annotation, FieldInfo): # Copy `field_info` because we mutate `field_info.default` below. field_info = copy_field_info( field_info=fastapi_annotation, @@ -452,8 +660,7 @@ def analyze_param( field_info.default = value else: field_info.default = RequiredParam - # Get Annotated Depends - elif isinstance(fastapi_annotation, params.Depends): + elif not merged_body_depends and isinstance(fastapi_annotation, params.Depends): depends = fastapi_annotation # Get Depends from default value if isinstance(value, params.Depends):