|
|
@ -80,25 +80,23 @@ multipart_incorrect_install_error = ( |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
def check_file_field(field: ModelField) -> None: |
|
|
|
field_info = field.field_info |
|
|
|
if isinstance(field_info, params.Form): |
|
|
|
def ensure_multipart_is_installed() -> None: |
|
|
|
try: |
|
|
|
# __version__ is available in both multiparts, and can be mocked |
|
|
|
from multipart import __version__ # type: ignore |
|
|
|
|
|
|
|
assert __version__ |
|
|
|
try: |
|
|
|
# __version__ is available in both multiparts, and can be mocked |
|
|
|
from multipart import __version__ # type: ignore |
|
|
|
|
|
|
|
assert __version__ |
|
|
|
try: |
|
|
|
# parse_options_header is only available in the right multipart |
|
|
|
from multipart.multipart import parse_options_header # type: ignore |
|
|
|
|
|
|
|
assert parse_options_header |
|
|
|
except ImportError: |
|
|
|
logger.error(multipart_incorrect_install_error) |
|
|
|
raise RuntimeError(multipart_incorrect_install_error) from None |
|
|
|
# parse_options_header is only available in the right multipart |
|
|
|
from multipart.multipart import parse_options_header # type: ignore |
|
|
|
|
|
|
|
assert parse_options_header |
|
|
|
except ImportError: |
|
|
|
logger.error(multipart_not_installed_error) |
|
|
|
raise RuntimeError(multipart_not_installed_error) from None |
|
|
|
logger.error(multipart_incorrect_install_error) |
|
|
|
raise RuntimeError(multipart_incorrect_install_error) from None |
|
|
|
except ImportError: |
|
|
|
logger.error(multipart_not_installed_error) |
|
|
|
raise RuntimeError(multipart_not_installed_error) from None |
|
|
|
|
|
|
|
|
|
|
|
def get_param_sub_dependant( |
|
|
@ -336,6 +334,7 @@ def analyze_param( |
|
|
|
if annotation is not inspect.Signature.empty: |
|
|
|
use_annotation = annotation |
|
|
|
type_annotation = annotation |
|
|
|
# Extract Annotated info |
|
|
|
if get_origin(use_annotation) is Annotated: |
|
|
|
annotated_args = get_args(annotation) |
|
|
|
type_annotation = annotated_args[0] |
|
|
@ -355,6 +354,7 @@ def analyze_param( |
|
|
|
) |
|
|
|
else: |
|
|
|
fastapi_annotation = None |
|
|
|
# Set default for Annotated FieldInfo |
|
|
|
if isinstance(fastapi_annotation, FieldInfo): |
|
|
|
# Copy `field_info` because we mutate `field_info.default` below. |
|
|
|
field_info = copy_field_info( |
|
|
@ -369,9 +369,10 @@ def analyze_param( |
|
|
|
field_info.default = value |
|
|
|
else: |
|
|
|
field_info.default = Required |
|
|
|
# Get Annotated Depends |
|
|
|
elif isinstance(fastapi_annotation, params.Depends): |
|
|
|
depends = fastapi_annotation |
|
|
|
|
|
|
|
# Get Depends from default value |
|
|
|
if isinstance(value, params.Depends): |
|
|
|
assert depends is None, ( |
|
|
|
"Cannot specify `Depends` in `Annotated` and default value" |
|
|
@ -382,6 +383,7 @@ def analyze_param( |
|
|
|
f" default value together for {param_name!r}" |
|
|
|
) |
|
|
|
depends = value |
|
|
|
# Get FieldInfo from default value |
|
|
|
elif isinstance(value, FieldInfo): |
|
|
|
assert field_info is None, ( |
|
|
|
"Cannot specify FastAPI annotations in `Annotated` and default value" |
|
|
@ -391,11 +393,13 @@ def analyze_param( |
|
|
|
if PYDANTIC_V2: |
|
|
|
field_info.annotation = type_annotation |
|
|
|
|
|
|
|
# Get Depends from type annotation |
|
|
|
if depends is not None and depends.dependency is None: |
|
|
|
# Copy `depends` before mutating it |
|
|
|
depends = copy(depends) |
|
|
|
depends.dependency = type_annotation |
|
|
|
|
|
|
|
# Handle non-param type annotations like Request |
|
|
|
if lenient_issubclass( |
|
|
|
type_annotation, |
|
|
|
( |
|
|
@ -411,6 +415,7 @@ def analyze_param( |
|
|
|
assert ( |
|
|
|
field_info is None |
|
|
|
), f"Cannot specify FastAPI annotation for type {type_annotation!r}" |
|
|
|
# Handle default assignations, neither field_info nor depends was not found in Annotated nor default value |
|
|
|
elif field_info is None and depends is None: |
|
|
|
default_value = value if value is not inspect.Signature.empty else Required |
|
|
|
if is_path_param: |
|
|
@ -428,7 +433,9 @@ def analyze_param( |
|
|
|
field_info = params.Query(annotation=use_annotation, default=default_value) |
|
|
|
|
|
|
|
field = None |
|
|
|
# It's a field_info, not a dependency |
|
|
|
if field_info is not None: |
|
|
|
# Handle field_info.in_ |
|
|
|
if is_path_param: |
|
|
|
assert isinstance(field_info, params.Path), ( |
|
|
|
f"Cannot use `{field_info.__class__.__name__}` for path param" |
|
|
@ -444,6 +451,8 @@ def analyze_param( |
|
|
|
field_info, |
|
|
|
param_name, |
|
|
|
) |
|
|
|
if isinstance(field_info, params.Form): |
|
|
|
ensure_multipart_is_installed() |
|
|
|
if not field_info.alias and getattr(field_info, "convert_underscores", None): |
|
|
|
alias = param_name.replace("_", "-") |
|
|
|
else: |
|
|
@ -786,7 +795,6 @@ def get_body_field(*, dependant: Dependant, name: str) -> Optional[ModelField]: |
|
|
|
embed = getattr(field_info, "embed", None) |
|
|
|
body_param_names_set = {param.name for param in flat_dependant.body_params} |
|
|
|
if len(body_param_names_set) == 1 and not embed: |
|
|
|
check_file_field(first_param) |
|
|
|
return first_param |
|
|
|
# If one field requires to embed, all have to be embedded |
|
|
|
# in case a sub-dependency is evaluated with a single unique body field |
|
|
@ -825,5 +833,4 @@ def get_body_field(*, dependant: Dependant, name: str) -> Optional[ModelField]: |
|
|
|
alias="body", |
|
|
|
field_info=BodyFieldInfo(**BodyFieldInfo_kwargs), |
|
|
|
) |
|
|
|
check_file_field(final_field) |
|
|
|
return final_field |
|
|
|