Browse Source

Merge 392170d116 into 6df50d40fe

pull/13589/merge
Tyler Epperson 3 days ago
committed by GitHub
parent
commit
a47279dadf
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 5
      fastapi/_compat.py
  2. 8
      fastapi/applications.py
  3. 159
      fastapi/dependencies/utils.py
  4. 4
      fastapi/openapi/models.py
  5. 30
      fastapi/openapi/utils.py
  6. 4
      fastapi/params.py
  7. 23
      fastapi/responses.py
  8. 38
      fastapi/routing.py
  9. 4
      fastapi/utils.py

5
fastapi/_compat.py

@ -269,7 +269,10 @@ if PYDANTIC_V2:
origin_type = ( origin_type = (
get_origin(field.field_info.annotation) or field.field_info.annotation get_origin(field.field_info.annotation) or field.field_info.annotation
) )
assert issubclass(origin_type, sequence_types) # type: ignore[arg-type] if not issubclass(origin_type, sequence_types) or not isinstance(
origin_type, type
):
raise TypeError(f"Field {field.name} is not a supported sequence type")
return sequence_annotation_to_type[origin_type](value) # type: ignore[no-any-return] return sequence_annotation_to_type[origin_type](value) # type: ignore[no-any-return]
def get_missing_field_error(loc: Tuple[str, ...]) -> Dict[str, Any]: def get_missing_field_error(loc: Tuple[str, ...]) -> Dict[str, Any]:

8
fastapi/applications.py

@ -872,8 +872,12 @@ class FastAPI(Starlette):
] = "3.1.0" ] = "3.1.0"
self.openapi_schema: Optional[Dict[str, Any]] = None self.openapi_schema: Optional[Dict[str, Any]] = None
if self.openapi_url: if self.openapi_url:
assert self.title, "A title must be provided for OpenAPI, e.g.: 'My API'" if not self.title:
assert self.version, "A version must be provided for OpenAPI, e.g.: '2.1.0'" raise ValueError("A title must be provided for OpenAPI, e.g.: 'My API'")
if not self.version:
raise ValueError(
"A version must be provided for OpenAPI, e.g.: '2.1.0'"
)
# TODO: remove when discarding the openapi_prefix parameter # TODO: remove when discarding the openapi_prefix parameter
if openapi_prefix: if openapi_prefix:
logger.warning( logger.warning(

159
fastapi/dependencies/utils.py

@ -90,29 +90,42 @@ multipart_incorrect_install_error = (
def ensure_multipart_is_installed() -> None: def ensure_multipart_is_installed() -> None:
try: try:
from python_multipart import __version__ from python_multipart import __version__ as python_multipart_version
# Import an attribute that can be mocked/deleted in testing # Import an attribute that can be mocked/deleted in testing
assert __version__ > "0.0.12" if python_multipart_version <= "0.0.12":
except (ImportError, AssertionError): raise RuntimeError(
f"Incompatible 'python-multipart' version: {python_multipart_version}. "
"Please upgrade to a version > 0.0.12"
)
return
except ImportError:
pass # fallback to trying multipart
except Exception as e:
logger.error("Error checking 'python-multipart' version: %s", e)
raise
try: try:
# __version__ is available in both multiparts, and can be mocked # __version__ is available in both multiparts, and can be mocked
from multipart import __version__ # type: ignore[no-redef,import-untyped] from multipart import (
__version__ as multipart_version, # type: ignore[no-redef,import-untyped]
)
except ImportError:
logger.error(multipart_not_installed_error)
raise RuntimeError(multipart_not_installed_error) from None
if not multipart_version:
logger.error(multipart_incorrect_install_error)
raise RuntimeError(multipart_incorrect_install_error)
assert __version__
try: try:
# parse_options_header is only available in the right multipart from multipart.multipart import (
from multipart.multipart import ( # type: ignore[import-untyped] parse_options_header, # type: ignore[import-untyped]
parse_options_header,
) )
assert parse_options_header
except ImportError: except ImportError:
logger.error(multipart_incorrect_install_error) logger.error(multipart_incorrect_install_error)
raise RuntimeError(multipart_incorrect_install_error) from None raise RuntimeError(multipart_incorrect_install_error) from None
except ImportError:
logger.error(multipart_not_installed_error)
raise RuntimeError(multipart_not_installed_error) from None
def get_param_sub_dependant( def get_param_sub_dependant(
@ -122,7 +135,11 @@ def get_param_sub_dependant(
path: str, path: str,
security_scopes: Optional[List[str]] = None, security_scopes: Optional[List[str]] = None,
) -> Dependant: ) -> Dependant:
assert depends.dependency if depends.dependency is None:
raise ValueError(
f"`depends.dependency` must be set for parameter '{param_name}'"
)
return get_sub_dependant( return get_sub_dependant(
depends=depends, depends=depends,
dependency=depends.dependency, dependency=depends.dependency,
@ -133,9 +150,9 @@ def get_param_sub_dependant(
def get_parameterless_sub_dependant(*, depends: params.Depends, path: str) -> Dependant: def get_parameterless_sub_dependant(*, depends: params.Depends, path: str) -> Dependant:
assert callable(depends.dependency), ( if not callable(depends.dependency):
"A parameter-less dependency must have a callable dependency" raise TypeError("A parameter-less dependency must have a callable dependency")
)
return get_sub_dependant(depends=depends, dependency=depends.dependency, path=path) return get_sub_dependant(depends=depends, dependency=depends.dependency, path=path)
@ -302,11 +319,16 @@ def get_dependant(
type_annotation=param_details.type_annotation, type_annotation=param_details.type_annotation,
dependant=dependant, dependant=dependant,
): ):
assert param_details.field is None, ( if param_details.field is not None:
raise ValueError(
f"Cannot specify multiple FastAPI annotations for {param_name!r}" f"Cannot specify multiple FastAPI annotations for {param_name!r}"
) )
continue continue
assert param_details.field is not None
if param_details.field is None:
raise ValueError("Expected param_details.field to be set, but got None")
if isinstance(param_details.field.field_info, params.Body): if isinstance(param_details.field.field_info, params.Body):
dependant.body_params.append(param_details.field) dependant.body_params.append(param_details.field)
else: else:
@ -385,14 +407,20 @@ def analyze_param(
field_info = copy_field_info( field_info = copy_field_info(
field_info=fastapi_annotation, annotation=use_annotation field_info=fastapi_annotation, annotation=use_annotation
) )
assert (
field_info.default is Undefined or field_info.default is RequiredParam if (
), ( field_info.default is not Undefined
and field_info.default is not RequiredParam
):
raise ValueError(
f"`{field_info.__class__.__name__}` default value cannot be set in " f"`{field_info.__class__.__name__}` default value cannot be set in "
f"`Annotated` for {param_name!r}. Set the default value with `=` instead." f"`Annotated` for {param_name!r}. Set the default value with `=` instead."
) )
if value is not inspect.Signature.empty: if value is not inspect.Signature.empty:
assert not is_path_param, "Path parameters cannot have default values" if is_path_param:
raise ValueError("Path parameters cannot have default values")
field_info.default = value field_info.default = value
else: else:
field_info.default = RequiredParam field_info.default = RequiredParam
@ -401,21 +429,26 @@ def analyze_param(
depends = fastapi_annotation depends = fastapi_annotation
# Get Depends from default value # Get Depends from default value
if isinstance(value, params.Depends): if isinstance(value, params.Depends):
assert depends is None, ( if depends is not None:
"Cannot specify `Depends` in `Annotated` and default value" raise ValueError(
f" together for {param_name!r}" f"Cannot specify `Depends` in `Annotated` and a default value together for {param_name!r}"
) )
assert field_info is None, (
"Cannot specify a FastAPI annotation in `Annotated` and `Depends` as a" if field_info is not None:
f" default value together for {param_name!r}" raise ValueError(
f"Cannot specify a FastAPI annotation in `Annotated` and `Depends` as a default value "
f"together for {param_name!r}"
) )
depends = value depends = value
# Get FieldInfo from default value # Get FieldInfo from default value
elif isinstance(value, FieldInfo): elif isinstance(value, FieldInfo):
assert field_info is None, ( if field_info is not None:
"Cannot specify FastAPI annotations in `Annotated` and default value" raise ValueError(
f"Cannot specify FastAPI annotations in `Annotated` and a default value "
f"together for {param_name!r}" f"together for {param_name!r}"
) )
field_info = value field_info = value
if PYDANTIC_V2: if PYDANTIC_V2:
field_info.annotation = type_annotation field_info.annotation = type_annotation
@ -438,10 +471,14 @@ def analyze_param(
SecurityScopes, SecurityScopes,
), ),
): ):
assert depends is None, f"Cannot specify `Depends` for type {type_annotation!r}" if depends is not None:
assert field_info is None, ( raise ValueError(f"Cannot specify `Depends` for type {type_annotation!r}")
if field_info is not None:
raise ValueError(
f"Cannot specify FastAPI annotation for type {type_annotation!r}" f"Cannot specify FastAPI annotation for type {type_annotation!r}"
) )
# Handle default assignations, neither field_info nor depends was not found in Annotated nor default value # Handle default assignations, neither field_info nor depends was not found in Annotated nor default value
elif field_info is None and depends is None: elif field_info is None and depends is None:
default_value = value if value is not inspect.Signature.empty else RequiredParam default_value = value if value is not inspect.Signature.empty else RequiredParam
@ -464,10 +501,11 @@ def analyze_param(
if field_info is not None: if field_info is not None:
# Handle field_info.in_ # Handle field_info.in_
if is_path_param: if is_path_param:
assert isinstance(field_info, params.Path), ( if not isinstance(field_info, params.Path):
f"Cannot use `{field_info.__class__.__name__}` for path param" raise TypeError(
f" {param_name!r}" f"Cannot use `{field_info.__class__.__name__}` for path param {param_name!r}"
) )
elif ( elif (
isinstance(field_info, params.Param) isinstance(field_info, params.Param)
and getattr(field_info, "in_", None) is None and getattr(field_info, "in_", None) is None
@ -494,18 +532,24 @@ def analyze_param(
field_info=field_info, field_info=field_info,
) )
if is_path_param: if is_path_param:
assert is_scalar_field(field=field), ( if not is_scalar_field(field=field):
"Path params must be of one of the supported types" raise TypeError(
"Path parameters must be of one of the supported scalar types"
) )
elif isinstance(field_info, params.Query): elif isinstance(field_info, params.Query):
assert ( if not (
is_scalar_field(field) is_scalar_field(field)
or is_scalar_sequence_field(field) or is_scalar_sequence_field(field)
or ( or (
lenient_issubclass(field.type_, BaseModel) isinstance(field.type_, type)
# For Pydantic v1 and issubclass(field.type_, BaseModel)
and getattr(field, "shape", 1) == 1 and getattr(field, "shape", 1) == 1 # shape check for Pydantic v1
) )
):
raise TypeError(
f"Query parameter {field.name!r} must be a supported scalar, sequence, "
"or a non-nested Pydantic model (shape == 1)"
) )
return ParamDetails(type_annotation=type_annotation, depends=depends, field=field) return ParamDetails(type_annotation=type_annotation, depends=depends, field=field)
@ -521,9 +565,11 @@ def add_param_to_fields(*, field: ModelField, dependant: Dependant) -> None:
elif field_info_in == params.ParamTypes.header: elif field_info_in == params.ParamTypes.header:
dependant.header_params.append(field) dependant.header_params.append(field)
else: else:
assert field_info_in == params.ParamTypes.cookie, ( if field_info_in != params.ParamTypes.cookie:
f"non-body parameters must be in path, query, header or cookie: {field.name}" raise ValueError(
f"Non-body parameters must be in path, query, header, or cookie: {field.name}"
) )
dependant.cookie_params.append(field) dependant.cookie_params.append(field)
@ -790,9 +836,9 @@ def request_params_to_args(
if single_not_embedded_field: if single_not_embedded_field:
field_info = first_field.field_info field_info = first_field.field_info
assert isinstance(field_info, params.Param), ( if not isinstance(field_info, params.Param):
"Params must be subclasses of Param" raise TypeError("Params must be subclasses of Param")
)
loc: Tuple[str, ...] = (field_info.in_.value,) loc: Tuple[str, ...] = (field_info.in_.value,)
v_, errors_ = _validate_value_with_model_field( v_, errors_ = _validate_value_with_model_field(
field=first_field, value=params_to_process, values=values, loc=loc field=first_field, value=params_to_process, values=values, loc=loc
@ -802,9 +848,11 @@ def request_params_to_args(
for field in fields: for field in fields:
value = _get_multidict_value(field, received_params) value = _get_multidict_value(field, received_params)
field_info = field.field_info field_info = field.field_info
assert isinstance(field_info, params.Param), ( if not isinstance(field_info, params.Param):
"Params must be subclasses of Param" raise TypeError(
f"Invalid parameter type: expected a subclass of Param, got {type(field_info).__name__}"
) )
loc = (field_info.in_.value, field.alias) loc = (field_info.in_.value, field.alias)
v_, errors_ = _validate_value_with_model_field( v_, errors_ = _validate_value_with_model_field(
field=field, value=value, values=values, loc=loc field=field, value=value, values=values, loc=loc
@ -881,7 +929,11 @@ async def _extract_form_body(
and value_is_sequence(value) and value_is_sequence(value)
): ):
# For types # For types
assert isinstance(value, sequence_types) # type: ignore[arg-type] if not isinstance(value, sequence_types):
raise TypeError(
f"Expected a sequence type (e.g., list, tuple), got {type(value).__name__}"
)
results: List[Union[bytes, str]] = [] results: List[Union[bytes, str]] = []
async def process_fn( async def process_fn(
@ -909,7 +961,12 @@ async def request_body_to_args(
) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: ) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
values: Dict[str, Any] = {} values: Dict[str, Any] = {}
errors: List[Dict[str, Any]] = [] errors: List[Dict[str, Any]] = []
assert body_fields, "request_body_to_args() should be called with fields"
if not body_fields:
raise ValueError(
"request_body_to_args() should be called with at least one field"
)
single_not_embedded_field = len(body_fields) == 1 and not embed_body_fields single_not_embedded_field = len(body_fields) == 1 and not embed_body_fields
first_field = body_fields[0] first_field = body_fields[0]
body_to_process = received_body body_to_process = received_body

4
fastapi/openapi/models.py

@ -15,9 +15,7 @@ from typing_extensions import Annotated, Literal, TypedDict
from typing_extensions import deprecated as typing_deprecated from typing_extensions import deprecated as typing_deprecated
try: try:
import email_validator import email_validator # noqa: F401
assert email_validator # make autoflake ignore the unused import
from pydantic import EmailStr from pydantic import EmailStr
except ImportError: # pragma: no cover except ImportError: # pragma: no cover

30
fastapi/openapi/utils.py

@ -179,7 +179,12 @@ def get_openapi_operation_request_body(
) -> Optional[Dict[str, Any]]: ) -> Optional[Dict[str, Any]]:
if not body_field: if not body_field:
return None return None
assert isinstance(body_field, ModelField)
if not isinstance(body_field, ModelField):
raise TypeError(
f"Expected body_field to be a ModelField, got {type(body_field).__name__}"
)
body_schema = get_schema_from_model_field( body_schema = get_schema_from_model_field(
field=body_field, field=body_field,
schema_generator=schema_generator, schema_generator=schema_generator,
@ -265,12 +270,18 @@ def get_openapi_path(
path = {} path = {}
security_schemes: Dict[str, Any] = {} security_schemes: Dict[str, Any] = {}
definitions: Dict[str, Any] = {} definitions: Dict[str, Any] = {}
assert route.methods is not None, "Methods must be a list"
if route.methods is None:
raise ValueError("Route methods must be defined as a list of HTTP methods")
if isinstance(route.response_class, DefaultPlaceholder): if isinstance(route.response_class, DefaultPlaceholder):
current_response_class: Type[Response] = route.response_class.value current_response_class: Type[Response] = route.response_class.value
else: else:
current_response_class = route.response_class current_response_class = route.response_class
assert current_response_class, "A response class is needed to generate OpenAPI"
if not current_response_class:
raise ValueError("A response class is needed to generate OpenAPI")
route_response_media_type: Optional[str] = current_response_class.media_type route_response_media_type: Optional[str] = current_response_class.media_type
if route.include_in_schema: if route.include_in_schema:
for method in route.methods: for method in route.methods:
@ -385,9 +396,10 @@ def get_openapi_path(
openapi_response = operation_responses.setdefault( openapi_response = operation_responses.setdefault(
status_code_key, {} status_code_key, {}
) )
assert isinstance(process_response, dict), (
"An additional response must be a dict" if not isinstance(process_response, dict):
) raise TypeError("An additional response must be a dict")
field = route.response_fields.get(additional_status_code) field = route.response_fields.get(additional_status_code)
additional_field_schema: Optional[Dict[str, Any]] = None additional_field_schema: Optional[Dict[str, Any]] = None
if field: if field:
@ -455,9 +467,11 @@ def get_fields_from_routes(
route, routing.APIRoute route, routing.APIRoute
): ):
if route.body_field: if route.body_field:
assert isinstance(route.body_field, ModelField), ( if not isinstance(route.body_field, ModelField):
"A request body must be a Pydantic Field" raise TypeError(
"A request body must be a Pydantic ModelField instance"
) )
body_fields_from_routes.append(route.body_field) body_fields_from_routes.append(route.body_field)
if route.response_field: if route.response_field:
responses_from_routes.append(route.response_field) responses_from_routes.append(route.response_field)

4
fastapi/params.py

@ -186,7 +186,9 @@ class Path(Param):
json_schema_extra: Union[Dict[str, Any], None] = None, json_schema_extra: Union[Dict[str, Any], None] = None,
**extra: Any, **extra: Any,
): ):
assert default is ..., "Path parameters cannot have a default value" if default is ...:
raise ValueError("Path parameters cannot have a default value")
self.in_ = self.in_ self.in_ = self.in_
super().__init__( super().__init__(
default=default, default=default,

23
fastapi/responses.py

@ -1,12 +1,14 @@
from typing import Any from typing import Any
from starlette.responses import FileResponse as FileResponse # noqa from starlette.responses import ( # noqa: F401
from starlette.responses import HTMLResponse as HTMLResponse # noqa FileResponse, # noqa: F401
from starlette.responses import JSONResponse as JSONResponse # noqa HTMLResponse, # noqa: F401
from starlette.responses import PlainTextResponse as PlainTextResponse # noqa JSONResponse, # noqa: F401
from starlette.responses import RedirectResponse as RedirectResponse # noqa PlainTextResponse, # noqa: F401
from starlette.responses import Response as Response # noqa RedirectResponse, # noqa: F401
from starlette.responses import StreamingResponse as StreamingResponse # noqa Response, # noqa: F401
StreamingResponse, # noqa: F401
)
try: try:
import ujson import ujson
@ -29,7 +31,8 @@ class UJSONResponse(JSONResponse):
""" """
def render(self, content: Any) -> bytes: def render(self, content: Any) -> bytes:
assert ujson is not None, "ujson must be installed to use UJSONResponse" if ujson is None:
raise RuntimeError("ujson must be installed to use UJSONResponse")
return ujson.dumps(content, ensure_ascii=False).encode("utf-8") return ujson.dumps(content, ensure_ascii=False).encode("utf-8")
@ -42,7 +45,9 @@ class ORJSONResponse(JSONResponse):
""" """
def render(self, content: Any) -> bytes: def render(self, content: Any) -> bytes:
assert orjson is not None, "orjson must be installed to use ORJSONResponse" print("Inside the render my boi")
if orjson is None:
raise RuntimeError("orjson must be installed to use ORJSONResponse")
return orjson.dumps( return orjson.dumps(
content, option=orjson.OPT_NON_STR_KEYS | orjson.OPT_SERIALIZE_NUMPY content, option=orjson.OPT_NON_STR_KEYS | orjson.OPT_SERIALIZE_NUMPY
) )

38
fastapi/routing.py

@ -207,7 +207,8 @@ async def run_endpoint_function(
) -> Any: ) -> Any:
# Only called by get_request_handler. Has been split into its own function to # Only called by get_request_handler. Has been split into its own function to
# facilitate profiling endpoints, since inner functions are harder to profile. # facilitate profiling endpoints, since inner functions are harder to profile.
assert dependant.call is not None, "dependant.call must be a function" if dependant.call is None:
raise ValueError("dependant.call must be a callable function")
if is_coroutine: if is_coroutine:
return await dependant.call(**values) return await dependant.call(**values)
@ -230,7 +231,8 @@ def get_request_handler(
dependency_overrides_provider: Optional[Any] = None, dependency_overrides_provider: Optional[Any] = None,
embed_body_fields: bool = False, embed_body_fields: bool = False,
) -> Callable[[Request], Coroutine[Any, Any, Response]]: ) -> Callable[[Request], Coroutine[Any, Any, Response]]:
assert dependant.call is not None, "dependant.call must be a function" if dependant.call is None:
raise ValueError("dependant.call must be a callable function")
is_coroutine = asyncio.iscoroutinefunction(dependant.call) is_coroutine = asyncio.iscoroutinefunction(dependant.call)
is_body_form = body_field and isinstance(body_field.field_info, params.Form) is_body_form = body_field and isinstance(body_field.field_info, params.Form)
if isinstance(response_class, DefaultPlaceholder): if isinstance(response_class, DefaultPlaceholder):
@ -380,7 +382,8 @@ def get_websocket_app(
raise WebSocketRequestValidationError( raise WebSocketRequestValidationError(
_normalize_errors(solved_result.errors) _normalize_errors(solved_result.errors)
) )
assert dependant.call is not None, "dependant.call must be a function" if dependant.call is None:
raise ValueError("dependant.call must be a callable function")
await dependant.call(**solved_result.values) await dependant.call(**solved_result.values)
return app return app
@ -505,7 +508,8 @@ class APIRoute(routing.Route):
status_code = int(status_code) status_code = int(status_code)
self.status_code = status_code self.status_code = status_code
if self.response_model: if self.response_model:
assert is_body_allowed_for_status_code(status_code), ( if not is_body_allowed_for_status_code(status_code):
raise ValueError(
f"Status code {status_code} must not have a response body" f"Status code {status_code} must not have a response body"
) )
response_name = "Response_" + self.unique_id response_name = "Response_" + self.unique_id
@ -535,11 +539,13 @@ class APIRoute(routing.Route):
self.description = self.description.split("\f")[0].strip() self.description = self.description.split("\f")[0].strip()
response_fields = {} response_fields = {}
for additional_status_code, response in self.responses.items(): for additional_status_code, response in self.responses.items():
assert isinstance(response, dict), "An additional response must be a dict" if not isinstance(response, dict):
raise TypeError("An additional response must be a dict")
model = response.get("model") model = response.get("model")
if model: if model:
assert is_body_allowed_for_status_code(additional_status_code), ( if not is_body_allowed_for_status_code(status_code):
f"Status code {additional_status_code} must not have a response body" raise ValueError(
f"Status code {status_code} must not have a response body"
) )
response_name = f"Response_{additional_status_code}_{self.unique_id}" response_name = f"Response_{additional_status_code}_{self.unique_id}"
response_field = create_model_field( response_field = create_model_field(
@ -551,7 +557,9 @@ class APIRoute(routing.Route):
else: else:
self.response_fields = {} self.response_fields = {}
assert callable(endpoint), "An endpoint must be a callable" if not callable(endpoint):
raise TypeError("An endpoint must be a callable")
self.dependant = get_dependant(path=self.path_format, call=self.endpoint) self.dependant = get_dependant(path=self.path_format, call=self.endpoint)
for depends in self.dependencies[::-1]: for depends in self.dependencies[::-1]:
self.dependant.dependencies.insert( self.dependant.dependencies.insert(
@ -844,10 +852,13 @@ class APIRouter(routing.Router):
lifespan=lifespan, lifespan=lifespan,
) )
if prefix: if prefix:
assert prefix.startswith("/"), "A path prefix must start with '/'" if not prefix.startswith("/"):
assert not prefix.endswith("/"), ( raise ValueError("A path prefix must start with '/'")
if prefix.endswith("/"):
raise ValueError(
"A path prefix must not end with '/', as the routes will start with '/'" "A path prefix must not end with '/', as the routes will start with '/'"
) )
self.prefix = prefix self.prefix = prefix
self.tags: List[Union[str, Enum]] = tags or [] self.tags: List[Union[str, Enum]] = tags or []
self.dependencies = list(dependencies or []) self.dependencies = list(dependencies or [])
@ -1256,10 +1267,13 @@ class APIRouter(routing.Router):
``` ```
""" """
if prefix: if prefix:
assert prefix.startswith("/"), "A path prefix must start with '/'" if not prefix.startswith("/"):
assert not prefix.endswith("/"), ( raise ValueError("A path prefix must start with '/'")
if prefix.endswith("/"):
raise ValueError(
"A path prefix must not end with '/', as the routes will start with '/'" "A path prefix must not end with '/', as the routes will start with '/'"
) )
else: else:
for r in router.routes: for r in router.routes:
path = getattr(r, "path") # noqa: B009 path = getattr(r, "path") # noqa: B009

4
fastapi/utils.py

@ -177,9 +177,11 @@ def generate_operation_id_for_path(
def generate_unique_id(route: "APIRoute") -> str: def generate_unique_id(route: "APIRoute") -> str:
if not route.methods:
raise ValueError("Route must have at least one HTTP method")
operation_id = f"{route.name}{route.path_format}" operation_id = f"{route.name}{route.path_format}"
operation_id = re.sub(r"\W", "_", operation_id) operation_id = re.sub(r"\W", "_", operation_id)
assert route.methods
operation_id = f"{operation_id}_{list(route.methods)[0].lower()}" operation_id = f"{operation_id}_{list(route.methods)[0].lower()}"
return operation_id return operation_id

Loading…
Cancel
Save