diff --git a/fastapi/dependencies/models.py b/fastapi/dependencies/models.py index 25ffb0d2da..3744fc7a12 100644 --- a/fastapi/dependencies/models.py +++ b/fastapi/dependencies/models.py @@ -2,8 +2,8 @@ import inspect import sys from collections.abc import Callable from dataclasses import dataclass, field -from functools import cached_property, partial -from typing import Any, Literal +from functools import partial +from typing import Any, Literal, cast from fastapi._compat import ModelField from fastapi.security.base import SecurityBase @@ -18,17 +18,17 @@ else: # pragma: no cover def _unwrapped_call(call: Callable[..., Any] | None) -> Any: if call is None: return call # pragma: no cover - unwrapped = inspect.unwrap(_impartial(call)) + unwrapped = inspect.unwrap(cast(Callable[..., Any], _impartial(call))) return unwrapped -def _impartial(func: Callable[..., Any]) -> Callable[..., Any]: +def _impartial(func: Callable[..., Any] | None) -> Callable[..., Any] | None: while isinstance(func, partial): func = func.func return func -@dataclass +@dataclass(slots=True) class Dependant: path_params: list[ModelField] = field(default_factory=list) query_params: list[ModelField] = field(default_factory=list) @@ -49,145 +49,239 @@ class Dependant: use_cache: bool = True path: str | None = None scope: Literal["function", "request"] | None = None + # Lazy cached fields + _oauth_scopes_cache: list[str] | None = field(default=None, init=False, repr=False) + _cache_key_cache: DependencyCacheKey | None = field( + default=None, init=False, repr=False + ) + _uses_scopes_cache: bool | None = field(default=None, init=False, repr=False) + _is_security_scheme_cache: bool | None = field(default=None, init=False, repr=False) + _security_scheme_cache: SecurityBase | None = field( + default=None, init=False, repr=False + ) + _security_dependencies_cache: list["Dependant"] | None = field( + default=None, init=False, repr=False + ) + _is_gen_callable_cache: bool | None = field(default=None, init=False, repr=False) + _is_async_gen_callable_cache: bool | None = field( + default=None, init=False, repr=False + ) + _is_coroutine_callable_cache: bool | None = field( + default=None, init=False, repr=False + ) + _computed_scope_cache: str | None = field(default=None, init=False, repr=False) - @cached_property + @property def oauth_scopes(self) -> list[str]: - scopes = self.parent_oauth_scopes.copy() if self.parent_oauth_scopes else [] - # This doesn't use a set to preserve order, just in case - for scope in self.own_oauth_scopes or []: - if scope not in scopes: - scopes.append(scope) - return scopes - - @cached_property + if self._oauth_scopes_cache is None: + scopes = self.parent_oauth_scopes.copy() if self.parent_oauth_scopes else [] + # This doesn't use a set to preserve order, just in case + for scope in self.own_oauth_scopes or []: + if scope not in scopes: + scopes.append(scope) + self._oauth_scopes_cache = scopes + + return self._oauth_scopes_cache + + @property def cache_key(self) -> DependencyCacheKey: - scopes_for_cache = ( - tuple(sorted(set(self.oauth_scopes or []))) if self._uses_scopes else () - ) - return ( - self.call, - scopes_for_cache, - self.computed_scope or "", - ) - - @cached_property + if self._cache_key_cache is None: + scopes_for_cache = ( + tuple(sorted(set(self.oauth_scopes or []))) if self._uses_scopes else () + ) + self._cache_key_cache = ( + self.call, + scopes_for_cache, + self.computed_scope or "", + ) + + return self._cache_key_cache + + @property def _uses_scopes(self) -> bool: - if self.own_oauth_scopes: - return True - if self.security_scopes_param_name is not None: - return True - if self._is_security_scheme: - return True - for sub_dep in self.dependencies: - if sub_dep._uses_scopes: - return True - return False - - @cached_property + if self._uses_scopes_cache is None: + if self.own_oauth_scopes: + self._uses_scopes_cache = True + elif self.security_scopes_param_name is not None: + self._uses_scopes_cache = True + elif self._is_security_scheme: + self._uses_scopes_cache = True + + for sub_dep in self.dependencies: + if sub_dep._uses_scopes: + self._uses_scopes_cache = True + break + + if self._uses_scopes_cache is None: + self._uses_scopes_cache = False + + return self._uses_scopes_cache + + @property def _is_security_scheme(self) -> bool: - if self.call is None: - return False # pragma: no cover - unwrapped = _unwrapped_call(self.call) - return isinstance(unwrapped, SecurityBase) + if self._is_security_scheme_cache is None: + if self.call is None: + self._is_security_scheme_cache = False # pragma: no cover + else: + unwrapped = _unwrapped_call(self.call) + self._is_security_scheme_cache = isinstance(unwrapped, SecurityBase) + + return self._is_security_scheme_cache # Mainly to get the type of SecurityBase, but it's the same self.call - @cached_property + @property def _security_scheme(self) -> SecurityBase: - unwrapped = _unwrapped_call(self.call) - assert isinstance(unwrapped, SecurityBase) - return unwrapped + if self._security_scheme_cache is None: + unwrapped = _unwrapped_call(self.call) + assert isinstance(unwrapped, SecurityBase) + self._security_scheme_cache = unwrapped + + return self._security_scheme_cache - @cached_property + @property def _security_dependencies(self) -> list["Dependant"]: - security_deps = [dep for dep in self.dependencies if dep._is_security_scheme] - return security_deps + if self._security_dependencies_cache is None: + security_deps = [ + dep for dep in self.dependencies if dep._is_security_scheme + ] + self._security_dependencies_cache = security_deps - @cached_property + return self._security_dependencies_cache + + @property def is_gen_callable(self) -> bool: - if self.call is None: - return False # pragma: no cover - if inspect.isgeneratorfunction( - _impartial(self.call) - ) or inspect.isgeneratorfunction(_unwrapped_call(self.call)): - return True - if inspect.isclass(_unwrapped_call(self.call)): - return False - dunder_call = getattr(_impartial(self.call), "__call__", None) # noqa: B004 - if dunder_call is None: - return False # pragma: no cover - if inspect.isgeneratorfunction( - _impartial(dunder_call) - ) or inspect.isgeneratorfunction(_unwrapped_call(dunder_call)): - return True - dunder_unwrapped_call = getattr(_unwrapped_call(self.call), "__call__", None) # noqa: B004 - if dunder_unwrapped_call is None: - return False # pragma: no cover - if inspect.isgeneratorfunction( - _impartial(dunder_unwrapped_call) - ) or inspect.isgeneratorfunction(_unwrapped_call(dunder_unwrapped_call)): - return True - return False - - @cached_property + if self._is_gen_callable_cache is None: + if self.call is None: + self._is_gen_callable_cache = False # pragma: no cover + elif inspect.isgeneratorfunction( + _impartial(self.call) + ) or inspect.isgeneratorfunction(_unwrapped_call(self.call)): + self._is_gen_callable_cache = True + elif inspect.isclass(_unwrapped_call(self.call)): + self._is_gen_callable_cache = False + + if self._is_gen_callable_cache is not None: + return self._is_gen_callable_cache + + dunder_call = getattr(_impartial(self.call), "__call__", None) # noqa: B004 + if dunder_call is None: + self._is_gen_callable_cache = False # pragma: no cover + elif inspect.isgeneratorfunction( + _impartial(dunder_call) + ) or inspect.isgeneratorfunction(_unwrapped_call(dunder_call)): + self._is_gen_callable_cache = True + + if self._is_gen_callable_cache is not None: + return self._is_gen_callable_cache + + dunder_unwrapped_call = getattr( # noqa: B004 + _unwrapped_call(self.call), + "__call__", + None, + ) + if dunder_unwrapped_call is None: + self._is_gen_callable_cache = False # pragma: no cover + if inspect.isgeneratorfunction( + _impartial(dunder_unwrapped_call) + ) or inspect.isgeneratorfunction(_unwrapped_call(dunder_unwrapped_call)): + self._is_gen_callable_cache = True + else: + self._is_gen_callable_cache = False + + return self._is_gen_callable_cache + + @property def is_async_gen_callable(self) -> bool: - if self.call is None: - return False # pragma: no cover - if inspect.isasyncgenfunction( - _impartial(self.call) - ) or inspect.isasyncgenfunction(_unwrapped_call(self.call)): - return True - if inspect.isclass(_unwrapped_call(self.call)): - return False - dunder_call = getattr(_impartial(self.call), "__call__", None) # noqa: B004 - if dunder_call is None: - return False # pragma: no cover - if inspect.isasyncgenfunction( - _impartial(dunder_call) - ) or inspect.isasyncgenfunction(_unwrapped_call(dunder_call)): - return True - dunder_unwrapped_call = getattr(_unwrapped_call(self.call), "__call__", None) # noqa: B004 - if dunder_unwrapped_call is None: - return False # pragma: no cover - if inspect.isasyncgenfunction( - _impartial(dunder_unwrapped_call) - ) or inspect.isasyncgenfunction(_unwrapped_call(dunder_unwrapped_call)): - return True - return False - - @cached_property + if self._is_async_gen_callable_cache is None: + if self.call is None: + self._is_async_gen_callable_cache = False # pragma: no cover + elif inspect.isasyncgenfunction( + _impartial(self.call) + ) or inspect.isasyncgenfunction(_unwrapped_call(self.call)): + self._is_async_gen_callable_cache = True + elif inspect.isclass(_unwrapped_call(self.call)): + self._is_async_gen_callable_cache = False + + if self._is_async_gen_callable_cache is not None: + return self._is_async_gen_callable_cache + + dunder_call = getattr(_impartial(self.call), "__call__", None) # noqa: B004 + if dunder_call is None: + self._is_async_gen_callable_cache = False # pragma: no cover + elif inspect.isasyncgenfunction( + _impartial(dunder_call) + ) or inspect.isasyncgenfunction(_unwrapped_call(dunder_call)): + self._is_async_gen_callable_cache = True + + if self._is_async_gen_callable_cache is not None: + return self._is_async_gen_callable_cache + + dunder_unwrapped_call = getattr( # noqa: B004 + _unwrapped_call(self.call), "__call__", None + ) + if dunder_unwrapped_call is None: + self._is_async_gen_callable_cache = False # pragma: no cover + elif inspect.isasyncgenfunction( + _impartial(dunder_unwrapped_call) + ) or inspect.isasyncgenfunction(_unwrapped_call(dunder_unwrapped_call)): + self._is_async_gen_callable_cache = True + else: + self._is_async_gen_callable_cache = False + + return self._is_async_gen_callable_cache + + @property def is_coroutine_callable(self) -> bool: - if self.call is None: - return False # pragma: no cover - if inspect.isroutine(_impartial(self.call)) and iscoroutinefunction( - _impartial(self.call) - ): - return True - if inspect.isroutine(_unwrapped_call(self.call)) and iscoroutinefunction( - _unwrapped_call(self.call) - ): - return True - if inspect.isclass(_unwrapped_call(self.call)): - return False - dunder_call = getattr(_impartial(self.call), "__call__", None) # noqa: B004 - if dunder_call is None: - return False # pragma: no cover - if iscoroutinefunction(_impartial(dunder_call)) or iscoroutinefunction( - _unwrapped_call(dunder_call) - ): - return True - dunder_unwrapped_call = getattr(_unwrapped_call(self.call), "__call__", None) # noqa: B004 - if dunder_unwrapped_call is None: - return False # pragma: no cover - if iscoroutinefunction( - _impartial(dunder_unwrapped_call) - ) or iscoroutinefunction(_unwrapped_call(dunder_unwrapped_call)): - return True - return False - - @cached_property + if self._is_coroutine_callable_cache is None: + if self.call is None: + self._is_coroutine_callable_cache = False # pragma: no cover + elif inspect.isroutine(_impartial(self.call)) and iscoroutinefunction( + _impartial(self.call) + ): + self._is_coroutine_callable_cache = True + elif inspect.isroutine(_unwrapped_call(self.call)) and iscoroutinefunction( + _unwrapped_call(self.call) + ): + self._is_coroutine_callable_cache = True + elif inspect.isclass(_unwrapped_call(self.call)): + self._is_coroutine_callable_cache = False + + if self._is_coroutine_callable_cache is not None: + return self._is_coroutine_callable_cache + + dunder_call = getattr(_impartial(self.call), "__call__", None) # noqa: B004 + if dunder_call is None: + self._is_coroutine_callable_cache = False # pragma: no cover + elif iscoroutinefunction(_impartial(dunder_call)) or iscoroutinefunction( + _unwrapped_call(dunder_call) + ): + self._is_coroutine_callable_cache = True + + if self._is_coroutine_callable_cache is not None: + return self._is_coroutine_callable_cache + + dunder_unwrapped_call = getattr( # noqa: B004 + _unwrapped_call(self.call), "__call__", None + ) + if dunder_unwrapped_call is None: + self._is_coroutine_callable_cache = False # pragma: no cover + elif iscoroutinefunction( + _impartial(dunder_unwrapped_call) + ) or iscoroutinefunction(_unwrapped_call(dunder_unwrapped_call)): + self._is_coroutine_callable_cache = True + else: + self._is_coroutine_callable_cache = False + + return self._is_coroutine_callable_cache + + @property def computed_scope(self) -> str | None: - if self.scope: - return self.scope - if self.is_gen_callable or self.is_async_gen_callable: - return "request" - return None + if self._computed_scope_cache is None: + if self.scope: + self._computed_scope_cache = self.scope + elif self.is_gen_callable or self.is_async_gen_callable: + self._computed_scope_cache = "request" + else: + self._computed_scope_cache = None + + return self._computed_scope_cache