|
|
|
@ -25,13 +25,13 @@ from fastapi._compat import ( |
|
|
|
create_body_model, |
|
|
|
evaluate_forwardref, |
|
|
|
field_annotation_is_scalar, |
|
|
|
field_annotation_is_scalar_sequence, |
|
|
|
field_annotation_is_sequence, |
|
|
|
get_cached_model_fields, |
|
|
|
get_missing_field_error, |
|
|
|
is_bytes_field, |
|
|
|
is_bytes_sequence_field, |
|
|
|
is_bytes_or_nonable_bytes_annotation, |
|
|
|
is_bytes_sequence_annotation, |
|
|
|
is_scalar_field, |
|
|
|
is_scalar_sequence_field, |
|
|
|
is_sequence_field, |
|
|
|
is_uploadfile_or_nonable_uploadfile_annotation, |
|
|
|
is_uploadfile_sequence_annotation, |
|
|
|
lenient_issubclass, |
|
|
|
@ -182,8 +182,10 @@ def _get_flat_fields_from_params(fields: list[ModelField]) -> list[ModelField]: |
|
|
|
if not fields: |
|
|
|
return fields |
|
|
|
first_field = fields[0] |
|
|
|
if len(fields) == 1 and lenient_issubclass(first_field.type_, BaseModel): |
|
|
|
fields_to_extract = get_cached_model_fields(first_field.type_) |
|
|
|
if len(fields) == 1 and lenient_issubclass( |
|
|
|
first_field.field_info.annotation, BaseModel |
|
|
|
): |
|
|
|
fields_to_extract = get_cached_model_fields(first_field.field_info.annotation) |
|
|
|
return fields_to_extract |
|
|
|
return fields |
|
|
|
|
|
|
|
@ -512,7 +514,6 @@ def analyze_param( |
|
|
|
type_=use_annotation_from_field_info, |
|
|
|
default=field_info.default, |
|
|
|
alias=alias, |
|
|
|
required=field_info.default in (RequiredParam, Undefined), |
|
|
|
field_info=field_info, |
|
|
|
) |
|
|
|
if is_path_param: |
|
|
|
@ -522,12 +523,8 @@ def analyze_param( |
|
|
|
elif isinstance(field_info, params.Query): |
|
|
|
assert ( |
|
|
|
is_scalar_field(field) |
|
|
|
or is_scalar_sequence_field(field) |
|
|
|
or ( |
|
|
|
lenient_issubclass(field.type_, BaseModel) |
|
|
|
# For Pydantic v1 |
|
|
|
and getattr(field, "shape", 1) == 1 |
|
|
|
) |
|
|
|
or field_annotation_is_scalar_sequence(field.field_info.annotation) |
|
|
|
or lenient_issubclass(field.field_info.annotation, BaseModel) |
|
|
|
), f"Query parameter {param_name!r} must be one of the supported types" |
|
|
|
|
|
|
|
return ParamDetails(type_annotation=type_annotation, depends=depends, field=field) |
|
|
|
@ -713,7 +710,7 @@ def _validate_value_with_model_field( |
|
|
|
*, field: ModelField, value: Any, values: dict[str, Any], loc: tuple[str, ...] |
|
|
|
) -> tuple[Any, list[Any]]: |
|
|
|
if value is None: |
|
|
|
if field.required: |
|
|
|
if field.field_info.is_required(): |
|
|
|
return None, [get_missing_field_error(loc=loc)] |
|
|
|
else: |
|
|
|
return deepcopy(field.default), [] |
|
|
|
@ -730,7 +727,7 @@ def _get_multidict_value( |
|
|
|
alias = alias or get_validation_alias(field) |
|
|
|
if ( |
|
|
|
(not _is_json_field(field)) |
|
|
|
and is_sequence_field(field) |
|
|
|
and field_annotation_is_sequence(field.field_info.annotation) |
|
|
|
and isinstance(values, (ImmutableMultiDict, Headers)) |
|
|
|
): |
|
|
|
value = values.getlist(alias) |
|
|
|
@ -743,9 +740,12 @@ def _get_multidict_value( |
|
|
|
and isinstance(value, str) # For type checks |
|
|
|
and value == "" |
|
|
|
) |
|
|
|
or (is_sequence_field(field) and len(value) == 0) |
|
|
|
or ( |
|
|
|
field_annotation_is_sequence(field.field_info.annotation) |
|
|
|
and len(value) == 0 |
|
|
|
) |
|
|
|
): |
|
|
|
if field.required: |
|
|
|
if field.field_info.is_required(): |
|
|
|
return |
|
|
|
else: |
|
|
|
return deepcopy(field.default) |
|
|
|
@ -766,8 +766,10 @@ def request_params_to_args( |
|
|
|
fields_to_extract = fields |
|
|
|
single_not_embedded_field = False |
|
|
|
default_convert_underscores = True |
|
|
|
if len(fields) == 1 and lenient_issubclass(first_field.type_, BaseModel): |
|
|
|
fields_to_extract = get_cached_model_fields(first_field.type_) |
|
|
|
if len(fields) == 1 and lenient_issubclass( |
|
|
|
first_field.field_info.annotation, BaseModel |
|
|
|
): |
|
|
|
fields_to_extract = get_cached_model_fields(first_field.field_info.annotation) |
|
|
|
single_not_embedded_field = True |
|
|
|
# If headers are in a Pydantic model, the way to disable convert_underscores |
|
|
|
# would be with Header(convert_underscores=False) at the Pydantic model level |
|
|
|
@ -871,8 +873,8 @@ def _should_embed_body_fields(fields: list[ModelField]) -> bool: |
|
|
|
# otherwise it has to be embedded, so that the key value pair can be extracted |
|
|
|
if ( |
|
|
|
isinstance(first_field.field_info, params.Form) |
|
|
|
and not lenient_issubclass(first_field.type_, BaseModel) |
|
|
|
and not is_union_of_base_models(first_field.type_) |
|
|
|
and not lenient_issubclass(first_field.field_info.annotation, BaseModel) |
|
|
|
and not is_union_of_base_models(first_field.field_info.annotation) |
|
|
|
): |
|
|
|
return True |
|
|
|
return False |
|
|
|
@ -889,12 +891,12 @@ async def _extract_form_body( |
|
|
|
field_info = field.field_info |
|
|
|
if ( |
|
|
|
isinstance(field_info, params.File) |
|
|
|
and is_bytes_field(field) |
|
|
|
and is_bytes_or_nonable_bytes_annotation(field.field_info.annotation) |
|
|
|
and isinstance(value, UploadFile) |
|
|
|
): |
|
|
|
value = await value.read() |
|
|
|
elif ( |
|
|
|
is_bytes_sequence_field(field) |
|
|
|
is_bytes_sequence_annotation(field.field_info.annotation) |
|
|
|
and isinstance(field_info, params.File) |
|
|
|
and value_is_sequence(value) |
|
|
|
): |
|
|
|
@ -941,10 +943,10 @@ async def request_body_to_args( |
|
|
|
|
|
|
|
if ( |
|
|
|
single_not_embedded_field |
|
|
|
and lenient_issubclass(first_field.type_, BaseModel) |
|
|
|
and lenient_issubclass(first_field.field_info.annotation, BaseModel) |
|
|
|
and isinstance(received_body, FormData) |
|
|
|
): |
|
|
|
fields_to_extract = get_cached_model_fields(first_field.type_) |
|
|
|
fields_to_extract = get_cached_model_fields(first_field.field_info.annotation) |
|
|
|
|
|
|
|
if isinstance(received_body, FormData): |
|
|
|
body_to_process = await _extract_form_body(fields_to_extract, received_body) |
|
|
|
@ -997,7 +999,9 @@ def get_body_field( |
|
|
|
BodyModel = create_body_model( |
|
|
|
fields=flat_dependant.body_params, model_name=model_name |
|
|
|
) |
|
|
|
required = any(True for f in flat_dependant.body_params if f.required) |
|
|
|
required = any( |
|
|
|
True for f in flat_dependant.body_params if f.field_info.is_required() |
|
|
|
) |
|
|
|
BodyFieldInfo_kwargs: dict[str, Any] = { |
|
|
|
"annotation": BodyModel, |
|
|
|
"alias": "body", |
|
|
|
@ -1021,7 +1025,6 @@ def get_body_field( |
|
|
|
final_field = create_model_field( |
|
|
|
name="body", |
|
|
|
type_=BodyModel, |
|
|
|
required=required, |
|
|
|
alias="body", |
|
|
|
field_info=BodyFieldInfo(**BodyFieldInfo_kwargs), |
|
|
|
) |
|
|
|
|