Browse Source

[fix] Convert all instances of assert for safer error handling

pull/13589/head
Tyler Epperson 4 months ago
parent
commit
11b0eff14a
  1. 3
      fastapi/_compat.py
  2. 6
      fastapi/applications.py
  3. 184
      fastapi/dependencies/utils.py
  4. 4
      fastapi/openapi/models.py
  5. 28
      fastapi/openapi/utils.py
  6. 4
      fastapi/params.py
  7. 23
      fastapi/responses.py
  8. 44
      fastapi/routing.py
  9. 4
      fastapi/utils.py

3
fastapi/_compat.py

@ -264,7 +264,8 @@ if PYDANTIC_V2:
origin_type = ( origin_type = (
get_origin(field.field_info.annotation) or field.field_info.annotation get_origin(field.field_info.annotation) or field.field_info.annotation
) )
assert issubclass(origin_type, sequence_types) # type: ignore[arg-type] if not issubclass(origin_type, sequence_types) or not isinstance(origin_type, type):
raise TypeError(f"Field {field.name} is not a supported sequence type")
return sequence_annotation_to_type[origin_type](value) # type: ignore[no-any-return] return sequence_annotation_to_type[origin_type](value) # type: ignore[no-any-return]
def get_missing_field_error(loc: Tuple[str, ...]) -> Dict[str, Any]: def get_missing_field_error(loc: Tuple[str, ...]) -> Dict[str, Any]:

6
fastapi/applications.py

@ -872,8 +872,10 @@ class FastAPI(Starlette):
] = "3.1.0" ] = "3.1.0"
self.openapi_schema: Optional[Dict[str, Any]] = None self.openapi_schema: Optional[Dict[str, Any]] = None
if self.openapi_url: if self.openapi_url:
assert self.title, "A title must be provided for OpenAPI, e.g.: 'My API'" if not self.title:
assert self.version, "A version must be provided for OpenAPI, e.g.: '2.1.0'" raise ValueError("A title must be provided for OpenAPI, e.g.: 'My API'")
if not self.version:
raise ValueError("A version must be provided for OpenAPI, e.g.: '2.1.0'")
# TODO: remove when discarding the openapi_prefix parameter # TODO: remove when discarding the openapi_prefix parameter
if openapi_prefix: if openapi_prefix:
logger.warning( logger.warning(

184
fastapi/dependencies/utils.py

@ -90,29 +90,38 @@ multipart_incorrect_install_error = (
def ensure_multipart_is_installed() -> None: def ensure_multipart_is_installed() -> None:
try: try:
from python_multipart import __version__ from python_multipart import __version__ as python_multipart_version
# Import an attribute that can be mocked/deleted in testing # Import an attribute that can be mocked/deleted in testing
assert __version__ > "0.0.12" if python_multipart_version <= "0.0.12":
except (ImportError, AssertionError): raise RuntimeError(
try: f"Incompatible 'python-multipart' version: {python_multipart_version}. "
# __version__ is available in both multiparts, and can be mocked "Please upgrade to a version > 0.0.12"
from multipart import __version__ # type: ignore[no-redef,import-untyped] )
return
assert __version__ except ImportError:
try: pass # fallback to trying multipart
# parse_options_header is only available in the right multipart except Exception as e:
from multipart.multipart import ( # type: ignore[import-untyped] logger.error("Error checking 'python-multipart' version: %s", e)
parse_options_header, raise
)
assert parse_options_header try:
except ImportError: # __version__ is available in both multiparts, and can be mocked
logger.error(multipart_incorrect_install_error) from multipart import __version__ as multipart_version # type: ignore[no-redef,import-untyped]
raise RuntimeError(multipart_incorrect_install_error) from None except ImportError:
except ImportError: logger.error(multipart_not_installed_error)
logger.error(multipart_not_installed_error) raise RuntimeError(multipart_not_installed_error) from None
raise RuntimeError(multipart_not_installed_error) from None
if not multipart_version:
logger.error(multipart_incorrect_install_error)
raise RuntimeError(multipart_incorrect_install_error)
try:
from multipart.multipart import parse_options_header # type: ignore[import-untyped]
except ImportError:
logger.error(multipart_incorrect_install_error)
raise RuntimeError(multipart_incorrect_install_error) from None
def get_param_sub_dependant( def get_param_sub_dependant(
@ -122,7 +131,9 @@ def get_param_sub_dependant(
path: str, path: str,
security_scopes: Optional[List[str]] = None, security_scopes: Optional[List[str]] = None,
) -> Dependant: ) -> Dependant:
assert depends.dependency if depends.dependency is None:
raise ValueError(f"`depends.dependency` must be set for parameter '{param_name}'")
return get_sub_dependant( return get_sub_dependant(
depends=depends, depends=depends,
dependency=depends.dependency, dependency=depends.dependency,
@ -133,9 +144,9 @@ def get_param_sub_dependant(
def get_parameterless_sub_dependant(*, depends: params.Depends, path: str) -> Dependant: def get_parameterless_sub_dependant(*, depends: params.Depends, path: str) -> Dependant:
assert callable(depends.dependency), ( if not callable(depends.dependency):
"A parameter-less dependency must have a callable dependency" raise TypeError("A parameter-less dependency must have a callable dependency")
)
return get_sub_dependant(depends=depends, dependency=depends.dependency, path=path) return get_sub_dependant(depends=depends, dependency=depends.dependency, path=path)
@ -302,11 +313,14 @@ def get_dependant(
type_annotation=param_details.type_annotation, type_annotation=param_details.type_annotation,
dependant=dependant, dependant=dependant,
): ):
assert param_details.field is None, ( if param_details.field is not None:
f"Cannot specify multiple FastAPI annotations for {param_name!r}" raise ValueError(f"Cannot specify multiple FastAPI annotations for {param_name!r}")
)
continue continue
assert param_details.field is not None
if param_details.field is None:
raise ValueError("Expected param_details.field to be set, but got None")
if isinstance(param_details.field.field_info, params.Body): if isinstance(param_details.field.field_info, params.Body):
dependant.body_params.append(param_details.field) dependant.body_params.append(param_details.field)
else: else:
@ -385,14 +399,17 @@ def analyze_param(
field_info = copy_field_info( field_info = copy_field_info(
field_info=fastapi_annotation, annotation=use_annotation field_info=fastapi_annotation, annotation=use_annotation
) )
assert (
field_info.default is Undefined or field_info.default is RequiredParam if field_info.default is not Undefined and field_info.default is not RequiredParam:
), ( raise ValueError(
f"`{field_info.__class__.__name__}` default value cannot be set in" f"`{field_info.__class__.__name__}` default value cannot be set in "
f" `Annotated` for {param_name!r}. Set the default value with `=` instead." f"`Annotated` for {param_name!r}. Set the default value with `=` instead."
) )
if value is not inspect.Signature.empty: if value is not inspect.Signature.empty:
assert not is_path_param, "Path parameters cannot have default values" if is_path_param:
raise ValueError("Path parameters cannot have default values")
field_info.default = value field_info.default = value
else: else:
field_info.default = RequiredParam field_info.default = RequiredParam
@ -401,21 +418,26 @@ def analyze_param(
depends = fastapi_annotation depends = fastapi_annotation
# Get Depends from default value # Get Depends from default value
if isinstance(value, params.Depends): if isinstance(value, params.Depends):
assert depends is None, ( if depends is not None:
"Cannot specify `Depends` in `Annotated` and default value" raise ValueError(
f" together for {param_name!r}" f"Cannot specify `Depends` in `Annotated` and a default value together for {param_name!r}"
) )
assert field_info is None, (
"Cannot specify a FastAPI annotation in `Annotated` and `Depends` as a" if field_info is not None:
f" default value together for {param_name!r}" raise ValueError(
) f"Cannot specify a FastAPI annotation in `Annotated` and `Depends` as a default value "
f"together for {param_name!r}"
)
depends = value depends = value
# Get FieldInfo from default value # Get FieldInfo from default value
elif isinstance(value, FieldInfo): elif isinstance(value, FieldInfo):
assert field_info is None, ( if field_info is not None:
"Cannot specify FastAPI annotations in `Annotated` and default value" raise ValueError(
f" together for {param_name!r}" f"Cannot specify FastAPI annotations in `Annotated` and a default value "
) f"together for {param_name!r}"
)
field_info = value field_info = value
if PYDANTIC_V2: if PYDANTIC_V2:
field_info.annotation = type_annotation field_info.annotation = type_annotation
@ -438,10 +460,12 @@ def analyze_param(
SecurityScopes, SecurityScopes,
), ),
): ):
assert depends is None, f"Cannot specify `Depends` for type {type_annotation!r}" if depends is not None:
assert field_info is None, ( raise ValueError(f"Cannot specify `Depends` for type {type_annotation!r}")
f"Cannot specify FastAPI annotation for type {type_annotation!r}"
) if field_info is not None:
raise ValueError(f"Cannot specify FastAPI annotation for type {type_annotation!r}")
# Handle default assignations, neither field_info nor depends was not found in Annotated nor default value # Handle default assignations, neither field_info nor depends was not found in Annotated nor default value
elif field_info is None and depends is None: elif field_info is None and depends is None:
default_value = value if value is not inspect.Signature.empty else RequiredParam default_value = value if value is not inspect.Signature.empty else RequiredParam
@ -464,10 +488,11 @@ def analyze_param(
if field_info is not None: if field_info is not None:
# Handle field_info.in_ # Handle field_info.in_
if is_path_param: if is_path_param:
assert isinstance(field_info, params.Path), ( if not isinstance(field_info, params.Path):
f"Cannot use `{field_info.__class__.__name__}` for path param" raise TypeError(
f" {param_name!r}" f"Cannot use `{field_info.__class__.__name__}` for path param {param_name!r}"
) )
elif ( elif (
isinstance(field_info, params.Param) isinstance(field_info, params.Param)
and getattr(field_info, "in_", None) is None and getattr(field_info, "in_", None) is None
@ -494,19 +519,23 @@ def analyze_param(
field_info=field_info, field_info=field_info,
) )
if is_path_param: if is_path_param:
assert is_scalar_field(field=field), ( if not is_scalar_field(field=field):
"Path params must be of one of the supported types" raise TypeError("Path parameters must be of one of the supported scalar types")
)
elif isinstance(field_info, params.Query): elif isinstance(field_info, params.Query):
assert ( if not (
is_scalar_field(field) is_scalar_field(field)
or is_scalar_sequence_field(field) or is_scalar_sequence_field(field)
or ( or (
lenient_issubclass(field.type_, BaseModel) isinstance(field.type_, type) and
# For Pydantic v1 issubclass(field.type_, BaseModel) and
and getattr(field, "shape", 1) == 1 getattr(field, "shape", 1) == 1 # shape check for Pydantic v1
)
):
raise TypeError(
f"Query parameter {field.name!r} must be a supported scalar, sequence, "
"or a non-nested Pydantic model (shape == 1)"
) )
)
return ParamDetails(type_annotation=type_annotation, depends=depends, field=field) return ParamDetails(type_annotation=type_annotation, depends=depends, field=field)
@ -521,9 +550,11 @@ def add_param_to_fields(*, field: ModelField, dependant: Dependant) -> None:
elif field_info_in == params.ParamTypes.header: elif field_info_in == params.ParamTypes.header:
dependant.header_params.append(field) dependant.header_params.append(field)
else: else:
assert field_info_in == params.ParamTypes.cookie, ( if field_info_in != params.ParamTypes.cookie:
f"non-body parameters must be in path, query, header or cookie: {field.name}" raise ValueError(
) f"Non-body parameters must be in path, query, header, or cookie: {field.name}"
)
dependant.cookie_params.append(field) dependant.cookie_params.append(field)
@ -790,9 +821,9 @@ def request_params_to_args(
if single_not_embedded_field: if single_not_embedded_field:
field_info = first_field.field_info field_info = first_field.field_info
assert isinstance(field_info, params.Param), ( if not isinstance(field_info, params.Param):
"Params must be subclasses of Param" raise TypeError("Params must be subclasses of Param")
)
loc: Tuple[str, ...] = (field_info.in_.value,) loc: Tuple[str, ...] = (field_info.in_.value,)
v_, errors_ = _validate_value_with_model_field( v_, errors_ = _validate_value_with_model_field(
field=first_field, value=params_to_process, values=values, loc=loc field=first_field, value=params_to_process, values=values, loc=loc
@ -802,9 +833,11 @@ def request_params_to_args(
for field in fields: for field in fields:
value = _get_multidict_value(field, received_params) value = _get_multidict_value(field, received_params)
field_info = field.field_info field_info = field.field_info
assert isinstance(field_info, params.Param), ( if not isinstance(field_info, params.Param):
"Params must be subclasses of Param" raise TypeError(
) f"Invalid parameter type: expected a subclass of Param, got {type(field_info).__name__}"
)
loc = (field_info.in_.value, field.alias) loc = (field_info.in_.value, field.alias)
v_, errors_ = _validate_value_with_model_field( v_, errors_ = _validate_value_with_model_field(
field=field, value=value, values=values, loc=loc field=field, value=value, values=values, loc=loc
@ -860,7 +893,9 @@ async def _extract_form_body(
and value_is_sequence(value) and value_is_sequence(value)
): ):
# For types # For types
assert isinstance(value, sequence_types) # type: ignore[arg-type] if not isinstance(value, sequence_types):
raise TypeError(f"Expected a sequence type (e.g., list, tuple), got {type(value).__name__}")
results: List[Union[bytes, str]] = [] results: List[Union[bytes, str]] = []
async def process_fn( async def process_fn(
@ -888,7 +923,10 @@ async def request_body_to_args(
) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: ) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
values: Dict[str, Any] = {} values: Dict[str, Any] = {}
errors: List[Dict[str, Any]] = [] errors: List[Dict[str, Any]] = []
assert body_fields, "request_body_to_args() should be called with fields"
if not body_fields:
raise ValueError("request_body_to_args() should be called with at least one field")
single_not_embedded_field = len(body_fields) == 1 and not embed_body_fields single_not_embedded_field = len(body_fields) == 1 and not embed_body_fields
first_field = body_fields[0] first_field = body_fields[0]
body_to_process = received_body body_to_process = received_body

4
fastapi/openapi/models.py

@ -15,9 +15,7 @@ from typing_extensions import Annotated, Literal, TypedDict
from typing_extensions import deprecated as typing_deprecated from typing_extensions import deprecated as typing_deprecated
try: try:
import email_validator import email_validator # noqa: F401
assert email_validator # make autoflake ignore the unused import
from pydantic import EmailStr from pydantic import EmailStr
except ImportError: # pragma: no cover except ImportError: # pragma: no cover

28
fastapi/openapi/utils.py

@ -179,7 +179,10 @@ def get_openapi_operation_request_body(
) -> Optional[Dict[str, Any]]: ) -> Optional[Dict[str, Any]]:
if not body_field: if not body_field:
return None return None
assert isinstance(body_field, ModelField)
if not isinstance(body_field, ModelField):
raise TypeError(f"Expected body_field to be a ModelField, got {type(body_field).__name__}")
body_schema = get_schema_from_model_field( body_schema = get_schema_from_model_field(
field=body_field, field=body_field,
schema_generator=schema_generator, schema_generator=schema_generator,
@ -265,12 +268,18 @@ def get_openapi_path(
path = {} path = {}
security_schemes: Dict[str, Any] = {} security_schemes: Dict[str, Any] = {}
definitions: Dict[str, Any] = {} definitions: Dict[str, Any] = {}
assert route.methods is not None, "Methods must be a list"
if route.methods is None:
raise ValueError("Route methods must be defined as a list of HTTP methods")
if isinstance(route.response_class, DefaultPlaceholder): if isinstance(route.response_class, DefaultPlaceholder):
current_response_class: Type[Response] = route.response_class.value current_response_class: Type[Response] = route.response_class.value
else: else:
current_response_class = route.response_class current_response_class = route.response_class
assert current_response_class, "A response class is needed to generate OpenAPI"
if not current_response_class:
raise ValueError("A response class is needed to generate OpenAPI")
route_response_media_type: Optional[str] = current_response_class.media_type route_response_media_type: Optional[str] = current_response_class.media_type
if route.include_in_schema: if route.include_in_schema:
for method in route.methods: for method in route.methods:
@ -385,9 +394,10 @@ def get_openapi_path(
openapi_response = operation_responses.setdefault( openapi_response = operation_responses.setdefault(
status_code_key, {} status_code_key, {}
) )
assert isinstance(process_response, dict), (
"An additional response must be a dict" if not isinstance(process_response, dict):
) raise TypeError("An additional response must be a dict")
field = route.response_fields.get(additional_status_code) field = route.response_fields.get(additional_status_code)
additional_field_schema: Optional[Dict[str, Any]] = None additional_field_schema: Optional[Dict[str, Any]] = None
if field: if field:
@ -455,9 +465,9 @@ def get_fields_from_routes(
route, routing.APIRoute route, routing.APIRoute
): ):
if route.body_field: if route.body_field:
assert isinstance(route.body_field, ModelField), ( if not isinstance(route.body_field, ModelField):
"A request body must be a Pydantic Field" raise TypeError("A request body must be a Pydantic ModelField instance")
)
body_fields_from_routes.append(route.body_field) body_fields_from_routes.append(route.body_field)
if route.response_field: if route.response_field:
responses_from_routes.append(route.response_field) responses_from_routes.append(route.response_field)

4
fastapi/params.py

@ -186,7 +186,9 @@ class Path(Param):
json_schema_extra: Union[Dict[str, Any], None] = None, json_schema_extra: Union[Dict[str, Any], None] = None,
**extra: Any, **extra: Any,
): ):
assert default is ..., "Path parameters cannot have a default value" if default is ...:
raise ValueError("Path parameters cannot have a default value")
self.in_ = self.in_ self.in_ = self.in_
super().__init__( super().__init__(
default=default, default=default,

23
fastapi/responses.py

@ -1,12 +1,14 @@
from typing import Any from typing import Any
from starlette.responses import FileResponse as FileResponse # noqa from starlette.responses import (
from starlette.responses import HTMLResponse as HTMLResponse # noqa FileResponse,
from starlette.responses import JSONResponse as JSONResponse # noqa HTMLResponse,
from starlette.responses import PlainTextResponse as PlainTextResponse # noqa JSONResponse,
from starlette.responses import RedirectResponse as RedirectResponse # noqa PlainTextResponse,
from starlette.responses import Response as Response # noqa RedirectResponse,
from starlette.responses import StreamingResponse as StreamingResponse # noqa Response,
StreamingResponse,
) # noqa
try: try:
import ujson import ujson
@ -29,7 +31,8 @@ class UJSONResponse(JSONResponse):
""" """
def render(self, content: Any) -> bytes: def render(self, content: Any) -> bytes:
assert ujson is not None, "ujson must be installed to use UJSONResponse" if ujson is None:
raise RuntimeError("ujson must be installed to use UJSONResponse")
return ujson.dumps(content, ensure_ascii=False).encode("utf-8") return ujson.dumps(content, ensure_ascii=False).encode("utf-8")
@ -42,7 +45,9 @@ class ORJSONResponse(JSONResponse):
""" """
def render(self, content: Any) -> bytes: def render(self, content: Any) -> bytes:
assert orjson is not None, "orjson must be installed to use ORJSONResponse" print("Inside the render my boi")
if orjson is None:
raise RuntimeError("orjson must be installed to use ORJSONResponse")
return orjson.dumps( return orjson.dumps(
content, option=orjson.OPT_NON_STR_KEYS | orjson.OPT_SERIALIZE_NUMPY content, option=orjson.OPT_NON_STR_KEYS | orjson.OPT_SERIALIZE_NUMPY
) )

44
fastapi/routing.py

@ -206,7 +206,8 @@ async def run_endpoint_function(
) -> Any: ) -> Any:
# Only called by get_request_handler. Has been split into its own function to # Only called by get_request_handler. Has been split into its own function to
# facilitate profiling endpoints, since inner functions are harder to profile. # facilitate profiling endpoints, since inner functions are harder to profile.
assert dependant.call is not None, "dependant.call must be a function" if dependant.call is None:
raise ValueError("dependant.call must be a callable function")
if is_coroutine: if is_coroutine:
return await dependant.call(**values) return await dependant.call(**values)
@ -229,7 +230,8 @@ def get_request_handler(
dependency_overrides_provider: Optional[Any] = None, dependency_overrides_provider: Optional[Any] = None,
embed_body_fields: bool = False, embed_body_fields: bool = False,
) -> Callable[[Request], Coroutine[Any, Any, Response]]: ) -> Callable[[Request], Coroutine[Any, Any, Response]]:
assert dependant.call is not None, "dependant.call must be a function" if dependant.call is None:
raise ValueError("dependant.call must be a callable function")
is_coroutine = asyncio.iscoroutinefunction(dependant.call) is_coroutine = asyncio.iscoroutinefunction(dependant.call)
is_body_form = body_field and isinstance(body_field.field_info, params.Form) is_body_form = body_field and isinstance(body_field.field_info, params.Form)
if isinstance(response_class, DefaultPlaceholder): if isinstance(response_class, DefaultPlaceholder):
@ -379,7 +381,8 @@ def get_websocket_app(
raise WebSocketRequestValidationError( raise WebSocketRequestValidationError(
_normalize_errors(solved_result.errors) _normalize_errors(solved_result.errors)
) )
assert dependant.call is not None, "dependant.call must be a function" if dependant.call is None:
raise ValueError("dependant.call must be a callable function")
await dependant.call(**solved_result.values) await dependant.call(**solved_result.values)
return app return app
@ -504,9 +507,8 @@ class APIRoute(routing.Route):
status_code = int(status_code) status_code = int(status_code)
self.status_code = status_code self.status_code = status_code
if self.response_model: if self.response_model:
assert is_body_allowed_for_status_code(status_code), ( if not is_body_allowed_for_status_code(status_code):
f"Status code {status_code} must not have a response body" raise ValueError(f"Status code {status_code} must not have a response body")
)
response_name = "Response_" + self.unique_id response_name = "Response_" + self.unique_id
self.response_field = create_model_field( self.response_field = create_model_field(
name=response_name, name=response_name,
@ -534,12 +536,12 @@ class APIRoute(routing.Route):
self.description = self.description.split("\f")[0].strip() self.description = self.description.split("\f")[0].strip()
response_fields = {} response_fields = {}
for additional_status_code, response in self.responses.items(): for additional_status_code, response in self.responses.items():
assert isinstance(response, dict), "An additional response must be a dict" if not isinstance(response, dict):
raise TypeError("An additional response must be a dict")
model = response.get("model") model = response.get("model")
if model: if model:
assert is_body_allowed_for_status_code(additional_status_code), ( if not is_body_allowed_for_status_code(status_code):
f"Status code {additional_status_code} must not have a response body" raise ValueError(f"Status code {status_code} must not have a response body")
)
response_name = f"Response_{additional_status_code}_{self.unique_id}" response_name = f"Response_{additional_status_code}_{self.unique_id}"
response_field = create_model_field( response_field = create_model_field(
name=response_name, type_=model, mode="serialization" name=response_name, type_=model, mode="serialization"
@ -550,7 +552,9 @@ class APIRoute(routing.Route):
else: else:
self.response_fields = {} self.response_fields = {}
assert callable(endpoint), "An endpoint must be a callable" if not callable(endpoint):
raise TypeError("An endpoint must be a callable")
self.dependant = get_dependant(path=self.path_format, call=self.endpoint) self.dependant = get_dependant(path=self.path_format, call=self.endpoint)
for depends in self.dependencies[::-1]: for depends in self.dependencies[::-1]:
self.dependant.dependencies.insert( self.dependant.dependencies.insert(
@ -843,10 +847,11 @@ class APIRouter(routing.Router):
lifespan=lifespan, lifespan=lifespan,
) )
if prefix: if prefix:
assert prefix.startswith("/"), "A path prefix must start with '/'" if not prefix.startswith("/"):
assert not prefix.endswith("/"), ( raise ValueError("A path prefix must start with '/'")
"A path prefix must not end with '/', as the routes will start with '/'" if prefix.endswith("/"):
) raise ValueError("A path prefix must not end with '/', as the routes will start with '/'")
self.prefix = prefix self.prefix = prefix
self.tags: List[Union[str, Enum]] = tags or [] self.tags: List[Union[str, Enum]] = tags or []
self.dependencies = list(dependencies or []) self.dependencies = list(dependencies or [])
@ -1255,10 +1260,11 @@ class APIRouter(routing.Router):
``` ```
""" """
if prefix: if prefix:
assert prefix.startswith("/"), "A path prefix must start with '/'" if not prefix.startswith("/"):
assert not prefix.endswith("/"), ( raise ValueError("A path prefix must start with '/'")
"A path prefix must not end with '/', as the routes will start with '/'" if prefix.endswith("/"):
) raise ValueError("A path prefix must not end with '/', as the routes will start with '/'")
else: else:
for r in router.routes: for r in router.routes:
path = getattr(r, "path") # noqa: B009 path = getattr(r, "path") # noqa: B009

4
fastapi/utils.py

@ -177,9 +177,11 @@ def generate_operation_id_for_path(
def generate_unique_id(route: "APIRoute") -> str: def generate_unique_id(route: "APIRoute") -> str:
if not route.methods:
raise ValueError("Route must have at least one HTTP method")
operation_id = f"{route.name}{route.path_format}" operation_id = f"{route.name}{route.path_format}"
operation_id = re.sub(r"\W", "_", operation_id) operation_id = re.sub(r"\W", "_", operation_id)
assert route.methods
operation_id = f"{operation_id}_{list(route.methods)[0].lower()}" operation_id = f"{operation_id}_{list(route.methods)[0].lower()}"
return operation_id return operation_id

Loading…
Cancel
Save