Browse Source

🐛 Fix internal Pydantic v1 compatibility (warnings) for Python 3.14 and Pydantic 2.12.1 (#14186)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Sebastián Ramírez <[email protected]>
pull/14181/head
Sofie Van Landeghem 2 days ago
committed by GitHub
parent
commit
d8c691f7f0
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 8
      fastapi/_compat/__init__.py
  2. 107
      fastapi/_compat/main.py
  3. 123
      fastapi/_compat/may_v1.py
  4. 10
      fastapi/_compat/shared.py
  5. 34
      fastapi/_compat/v1.py
  6. 4
      fastapi/_compat/v2.py
  7. 23
      fastapi/dependencies/utils.py
  8. 18
      fastapi/encoders.py
  9. 2
      fastapi/temp_pydantic_v1_params.py
  10. 15
      fastapi/utils.py
  11. 12
      tests/test_compat.py
  12. 3
      tests/test_get_model_definitions_formfeed_escape.py
  13. 6
      tests/test_response_model_as_return_annotation.py

8
fastapi/_compat/__init__.py

@ -30,6 +30,10 @@ from .main import serialize_sequence_value as serialize_sequence_value
from .main import (
with_info_plain_validator_function as with_info_plain_validator_function,
)
from .may_v1 import CoreSchema as CoreSchema
from .may_v1 import GetJsonSchemaHandler as GetJsonSchemaHandler
from .may_v1 import JsonSchemaValue as JsonSchemaValue
from .may_v1 import _normalize_errors as _normalize_errors
from .model_field import ModelField as ModelField
from .shared import PYDANTIC_V2 as PYDANTIC_V2
from .shared import PYDANTIC_VERSION_MINOR_TUPLE as PYDANTIC_VERSION_MINOR_TUPLE
@ -44,7 +48,3 @@ from .shared import (
from .shared import lenient_issubclass as lenient_issubclass
from .shared import sequence_types as sequence_types
from .shared import value_is_sequence as value_is_sequence
from .v1 import CoreSchema as CoreSchema
from .v1 import GetJsonSchemaHandler as GetJsonSchemaHandler
from .v1 import JsonSchemaValue as JsonSchemaValue
from .v1 import _normalize_errors as _normalize_errors

107
fastapi/_compat/main.py

@ -1,3 +1,4 @@
import sys
from functools import lru_cache
from typing import (
Any,
@ -8,7 +9,7 @@ from typing import (
Type,
)
from fastapi._compat import v1
from fastapi._compat import may_v1
from fastapi._compat.shared import PYDANTIC_V2, lenient_issubclass
from fastapi.types import ModelNameMap
from pydantic import BaseModel
@ -50,7 +51,9 @@ else:
@lru_cache
def get_cached_model_fields(model: Type[BaseModel]) -> List[ModelField]:
if lenient_issubclass(model, v1.BaseModel):
if lenient_issubclass(model, may_v1.BaseModel):
from fastapi._compat import v1
return v1.get_model_fields(model)
else:
from . import v2
@ -59,7 +62,7 @@ def get_cached_model_fields(model: Type[BaseModel]) -> List[ModelField]:
def _is_undefined(value: object) -> bool:
if isinstance(value, v1.UndefinedType):
if isinstance(value, may_v1.UndefinedType):
return True
elif PYDANTIC_V2:
from . import v2
@ -69,7 +72,9 @@ def _is_undefined(value: object) -> bool:
def _get_model_config(model: BaseModel) -> Any:
if isinstance(model, v1.BaseModel):
if isinstance(model, may_v1.BaseModel):
from fastapi._compat import v1
return v1._get_model_config(model)
elif PYDANTIC_V2:
from . import v2
@ -80,7 +85,9 @@ def _get_model_config(model: BaseModel) -> Any:
def _model_dump(
model: BaseModel, mode: Literal["json", "python"] = "json", **kwargs: Any
) -> Any:
if isinstance(model, v1.BaseModel):
if isinstance(model, may_v1.BaseModel):
from fastapi._compat import v1
return v1._model_dump(model, mode=mode, **kwargs)
elif PYDANTIC_V2:
from . import v2
@ -89,7 +96,7 @@ def _model_dump(
def _is_error_wrapper(exc: Exception) -> bool:
if isinstance(exc, v1.ErrorWrapper):
if isinstance(exc, may_v1.ErrorWrapper):
return True
elif PYDANTIC_V2:
from . import v2
@ -99,7 +106,9 @@ def _is_error_wrapper(exc: Exception) -> bool:
def copy_field_info(*, field_info: FieldInfo, annotation: Any) -> FieldInfo:
if isinstance(field_info, v1.FieldInfo):
if isinstance(field_info, may_v1.FieldInfo):
from fastapi._compat import v1
return v1.copy_field_info(field_info=field_info, annotation=annotation)
else:
assert PYDANTIC_V2
@ -111,7 +120,9 @@ def copy_field_info(*, field_info: FieldInfo, annotation: Any) -> FieldInfo:
def create_body_model(
*, fields: Sequence[ModelField], model_name: str
) -> Type[BaseModel]:
if fields and isinstance(fields[0], v1.ModelField):
if fields and isinstance(fields[0], may_v1.ModelField):
from fastapi._compat import v1
return v1.create_body_model(fields=fields, model_name=model_name)
else:
assert PYDANTIC_V2
@ -123,7 +134,9 @@ def create_body_model(
def get_annotation_from_field_info(
annotation: Any, field_info: FieldInfo, field_name: str
) -> Any:
if isinstance(field_info, v1.FieldInfo):
if isinstance(field_info, may_v1.FieldInfo):
from fastapi._compat import v1
return v1.get_annotation_from_field_info(
annotation=annotation, field_info=field_info, field_name=field_name
)
@ -137,7 +150,9 @@ def get_annotation_from_field_info(
def is_bytes_field(field: ModelField) -> bool:
if isinstance(field, v1.ModelField):
if isinstance(field, may_v1.ModelField):
from fastapi._compat import v1
return v1.is_bytes_field(field)
else:
assert PYDANTIC_V2
@ -147,7 +162,9 @@ def is_bytes_field(field: ModelField) -> bool:
def is_bytes_sequence_field(field: ModelField) -> bool:
if isinstance(field, v1.ModelField):
if isinstance(field, may_v1.ModelField):
from fastapi._compat import v1
return v1.is_bytes_sequence_field(field)
else:
assert PYDANTIC_V2
@ -157,7 +174,9 @@ def is_bytes_sequence_field(field: ModelField) -> bool:
def is_scalar_field(field: ModelField) -> bool:
if isinstance(field, v1.ModelField):
if isinstance(field, may_v1.ModelField):
from fastapi._compat import v1
return v1.is_scalar_field(field)
else:
assert PYDANTIC_V2
@ -167,7 +186,9 @@ def is_scalar_field(field: ModelField) -> bool:
def is_scalar_sequence_field(field: ModelField) -> bool:
if isinstance(field, v1.ModelField):
if isinstance(field, may_v1.ModelField):
from fastapi._compat import v1
return v1.is_scalar_sequence_field(field)
else:
assert PYDANTIC_V2
@ -177,7 +198,9 @@ def is_scalar_sequence_field(field: ModelField) -> bool:
def is_sequence_field(field: ModelField) -> bool:
if isinstance(field, v1.ModelField):
if isinstance(field, may_v1.ModelField):
from fastapi._compat import v1
return v1.is_sequence_field(field)
else:
assert PYDANTIC_V2
@ -187,7 +210,9 @@ def is_sequence_field(field: ModelField) -> bool:
def serialize_sequence_value(*, field: ModelField, value: Any) -> Sequence[Any]:
if isinstance(field, v1.ModelField):
if isinstance(field, may_v1.ModelField):
from fastapi._compat import v1
return v1.serialize_sequence_value(field=field, value=value)
else:
assert PYDANTIC_V2
@ -197,7 +222,9 @@ def serialize_sequence_value(*, field: ModelField, value: Any) -> Sequence[Any]:
def _model_rebuild(model: Type[BaseModel]) -> None:
if lenient_issubclass(model, v1.BaseModel):
if lenient_issubclass(model, may_v1.BaseModel):
from fastapi._compat import v1
v1._model_rebuild(model)
elif PYDANTIC_V2:
from . import v2
@ -206,9 +233,18 @@ def _model_rebuild(model: Type[BaseModel]) -> None:
def get_compat_model_name_map(fields: List[ModelField]) -> ModelNameMap:
v1_model_fields = [field for field in fields if isinstance(field, v1.ModelField)]
v1_flat_models = v1.get_flat_models_from_fields(v1_model_fields, known_models=set()) # type: ignore[attr-defined]
v1_model_fields = [
field for field in fields if isinstance(field, may_v1.ModelField)
]
if v1_model_fields:
from fastapi._compat import v1
v1_flat_models = v1.get_flat_models_from_fields(
v1_model_fields, known_models=set()
)
all_flat_models = v1_flat_models
else:
all_flat_models = set()
if PYDANTIC_V2:
from . import v2
@ -222,6 +258,8 @@ def get_compat_model_name_map(fields: List[ModelField]) -> ModelNameMap:
model_name_map = v2.get_model_name_map(all_flat_models)
return model_name_map
from fastapi._compat import v1
model_name_map = v1.get_model_name_map(all_flat_models)
return model_name_map
@ -232,11 +270,15 @@ def get_definitions(
model_name_map: ModelNameMap,
separate_input_output_schemas: bool = True,
) -> Tuple[
Dict[Tuple[ModelField, Literal["validation", "serialization"]], v1.JsonSchemaValue],
Dict[
Tuple[ModelField, Literal["validation", "serialization"]],
may_v1.JsonSchemaValue,
],
Dict[str, Dict[str, Any]],
]:
v1_fields = [field for field in fields if isinstance(field, v1.ModelField)]
v1_field_maps, v1_definitions = v1.get_definitions(
if sys.version_info < (3, 14):
v1_fields = [field for field in fields if isinstance(field, may_v1.ModelField)]
v1_field_maps, v1_definitions = may_v1.get_definitions(
fields=v1_fields,
model_name_map=model_name_map,
separate_input_output_schemas=separate_input_output_schemas,
@ -256,17 +298,32 @@ def get_definitions(
all_field_maps = {**v1_field_maps, **v2_field_maps}
return all_field_maps, all_definitions
# Pydantic v1 is not supported since Python 3.14
else:
from . import v2
v2_fields = [field for field in fields if isinstance(field, v2.ModelField)]
v2_field_maps, v2_definitions = v2.get_definitions(
fields=v2_fields,
model_name_map=model_name_map,
separate_input_output_schemas=separate_input_output_schemas,
)
return v2_field_maps, v2_definitions
def get_schema_from_model_field(
*,
field: ModelField,
model_name_map: ModelNameMap,
field_mapping: Dict[
Tuple[ModelField, Literal["validation", "serialization"]], v1.JsonSchemaValue
Tuple[ModelField, Literal["validation", "serialization"]],
may_v1.JsonSchemaValue,
],
separate_input_output_schemas: bool = True,
) -> Dict[str, Any]:
if isinstance(field, v1.ModelField):
if isinstance(field, may_v1.ModelField):
from fastapi._compat import v1
return v1.get_schema_from_model_field(
field=field,
model_name_map=model_name_map,
@ -286,7 +343,7 @@ def get_schema_from_model_field(
def _is_model_field(value: Any) -> bool:
if isinstance(value, v1.ModelField):
if isinstance(value, may_v1.ModelField):
return True
elif PYDANTIC_V2:
from . import v2
@ -296,7 +353,7 @@ def _is_model_field(value: Any) -> bool:
def _is_model_class(value: Any) -> bool:
if lenient_issubclass(value, v1.BaseModel):
if lenient_issubclass(value, may_v1.BaseModel):
return True
elif PYDANTIC_V2:
from . import v2

123
fastapi/_compat/may_v1.py

@ -0,0 +1,123 @@
import sys
from typing import Any, Dict, List, Literal, Sequence, Tuple, Type, Union
from fastapi.types import ModelNameMap
if sys.version_info >= (3, 14):
class AnyUrl:
pass
class BaseConfig:
pass
class BaseModel:
pass
class Color:
pass
class CoreSchema:
pass
class ErrorWrapper:
pass
class FieldInfo:
pass
class GetJsonSchemaHandler:
pass
class JsonSchemaValue:
pass
class ModelField:
pass
class NameEmail:
pass
class RequiredParam:
pass
class SecretBytes:
pass
class SecretStr:
pass
class Undefined:
pass
class UndefinedType:
pass
class Url:
pass
from .v2 import ValidationError, create_model
def get_definitions(
*,
fields: List[ModelField],
model_name_map: ModelNameMap,
separate_input_output_schemas: bool = True,
) -> Tuple[
Dict[
Tuple[ModelField, Literal["validation", "serialization"]], JsonSchemaValue
],
Dict[str, Dict[str, Any]],
]:
return {}, {} # pragma: no cover
else:
from .v1 import AnyUrl as AnyUrl
from .v1 import BaseConfig as BaseConfig
from .v1 import BaseModel as BaseModel
from .v1 import Color as Color
from .v1 import CoreSchema as CoreSchema
from .v1 import ErrorWrapper as ErrorWrapper
from .v1 import FieldInfo as FieldInfo
from .v1 import GetJsonSchemaHandler as GetJsonSchemaHandler
from .v1 import JsonSchemaValue as JsonSchemaValue
from .v1 import ModelField as ModelField
from .v1 import NameEmail as NameEmail
from .v1 import RequiredParam as RequiredParam
from .v1 import SecretBytes as SecretBytes
from .v1 import SecretStr as SecretStr
from .v1 import Undefined as Undefined
from .v1 import UndefinedType as UndefinedType
from .v1 import Url as Url
from .v1 import ValidationError, create_model
from .v1 import get_definitions as get_definitions
RequestErrorModel: Type[BaseModel] = create_model("Request")
def _normalize_errors(errors: Sequence[Any]) -> List[Dict[str, Any]]:
use_errors: List[Any] = []
for error in errors:
if isinstance(error, ErrorWrapper):
new_errors = ValidationError( # type: ignore[call-arg]
errors=[error], model=RequestErrorModel
).errors()
use_errors.extend(new_errors)
elif isinstance(error, list):
use_errors.extend(_normalize_errors(error))
else:
use_errors.append(error)
return use_errors
def _regenerate_error_with_loc(
*, errors: Sequence[Any], loc_prefix: Tuple[Union[str, int], ...]
) -> List[Dict[str, Any]]:
updated_loc_errors: List[Any] = [
{**err, "loc": loc_prefix + err.get("loc", ())}
for err in _normalize_errors(errors)
]
return updated_loc_errors

10
fastapi/_compat/shared.py

@ -16,7 +16,7 @@ from typing import (
Union,
)
from fastapi._compat import v1
from fastapi._compat import may_v1
from fastapi.types import UnionType
from pydantic import BaseModel
from pydantic.version import VERSION as PYDANTIC_VERSION
@ -98,7 +98,9 @@ def value_is_sequence(value: Any) -> bool:
def _annotation_is_complex(annotation: Union[Type[Any], None]) -> bool:
return (
lenient_issubclass(annotation, (BaseModel, v1.BaseModel, Mapping, UploadFile))
lenient_issubclass(
annotation, (BaseModel, may_v1.BaseModel, Mapping, UploadFile)
)
or _annotation_is_sequence(annotation)
or is_dataclass(annotation)
)
@ -195,12 +197,12 @@ def is_uploadfile_sequence_annotation(annotation: Any) -> bool:
def annotation_is_pydantic_v1(annotation: Any) -> bool:
if lenient_issubclass(annotation, v1.BaseModel):
if lenient_issubclass(annotation, may_v1.BaseModel):
return True
origin = get_origin(annotation)
if origin is Union or origin is UnionType:
for arg in get_args(annotation):
if lenient_issubclass(arg, v1.BaseModel):
if lenient_issubclass(arg, may_v1.BaseModel):
return True
if field_annotation_is_sequence(annotation):
for sub_annotation in get_args(annotation):

34
fastapi/_compat/v1.py

@ -54,13 +54,15 @@ if not PYDANTIC_V2:
from pydantic.schema import TypeModelSet as TypeModelSet
from pydantic.schema import (
field_schema,
get_flat_models_from_fields,
model_process_schema,
)
from pydantic.schema import (
get_annotation_from_field_info as get_annotation_from_field_info,
)
from pydantic.schema import get_flat_models_from_field as get_flat_models_from_field
from pydantic.schema import (
get_flat_models_from_fields as get_flat_models_from_fields,
)
from pydantic.schema import get_model_name_map as get_model_name_map
from pydantic.types import SecretBytes as SecretBytes
from pydantic.types import SecretStr as SecretStr
@ -99,7 +101,6 @@ else:
from pydantic.v1.schema import TypeModelSet as TypeModelSet
from pydantic.v1.schema import (
field_schema,
get_flat_models_from_fields,
model_process_schema,
)
from pydantic.v1.schema import (
@ -108,6 +109,9 @@ else:
from pydantic.v1.schema import (
get_flat_models_from_field as get_flat_models_from_field,
)
from pydantic.v1.schema import (
get_flat_models_from_fields as get_flat_models_from_fields,
)
from pydantic.v1.schema import get_model_name_map as get_model_name_map
from pydantic.v1.types import ( # type: ignore[assignment]
SecretBytes as SecretBytes,
@ -215,32 +219,6 @@ def is_pv1_scalar_sequence_field(field: ModelField) -> bool:
return False
def _normalize_errors(errors: Sequence[Any]) -> List[Dict[str, Any]]:
use_errors: List[Any] = []
for error in errors:
if isinstance(error, ErrorWrapper):
new_errors = ValidationError( # type: ignore[call-arg]
errors=[error], model=RequestErrorModel
).errors()
use_errors.extend(new_errors)
elif isinstance(error, list):
use_errors.extend(_normalize_errors(error))
else:
use_errors.append(error)
return use_errors
def _regenerate_error_with_loc(
*, errors: Sequence[Any], loc_prefix: Tuple[Union[str, int], ...]
) -> List[Dict[str, Any]]:
updated_loc_errors: List[Any] = [
{**err, "loc": loc_prefix + err.get("loc", ())}
for err in _normalize_errors(errors)
]
return updated_loc_errors
def _model_rebuild(model: Type[BaseModel]) -> None:
model.update_forward_refs()

4
fastapi/_compat/v2.py

@ -15,7 +15,7 @@ from typing import (
cast,
)
from fastapi._compat import shared, v1
from fastapi._compat import may_v1, shared
from fastapi.openapi.constants import REF_TEMPLATE
from fastapi.types import IncEx, ModelNameMap
from pydantic import BaseModel, TypeAdapter, create_model
@ -116,7 +116,7 @@ class ModelField:
None,
)
except ValidationError as exc:
return None, v1._regenerate_error_with_loc(
return None, may_v1._regenerate_error_with_loc(
errors=exc.errors(include_url=False), loc_prefix=loc
)

23
fastapi/dependencies/utils.py

@ -43,9 +43,9 @@ from fastapi._compat import (
is_uploadfile_or_nonable_uploadfile_annotation,
is_uploadfile_sequence_annotation,
lenient_issubclass,
may_v1,
sequence_types,
serialize_sequence_value,
v1,
value_is_sequence,
)
from fastapi._compat.shared import annotation_is_pydantic_v1
@ -380,7 +380,7 @@ def analyze_param(
fastapi_annotations = [
arg
for arg in annotated_args[1:]
if isinstance(arg, (FieldInfo, v1.FieldInfo, params.Depends))
if isinstance(arg, (FieldInfo, may_v1.FieldInfo, params.Depends))
]
fastapi_specific_annotations = [
arg
@ -397,21 +397,21 @@ def analyze_param(
)
]
if fastapi_specific_annotations:
fastapi_annotation: Union[FieldInfo, v1.FieldInfo, params.Depends, None] = (
fastapi_specific_annotations[-1]
)
fastapi_annotation: Union[
FieldInfo, may_v1.FieldInfo, params.Depends, None
] = fastapi_specific_annotations[-1]
else:
fastapi_annotation = None
# Set default for Annotated FieldInfo
if isinstance(fastapi_annotation, (FieldInfo, v1.FieldInfo)):
if isinstance(fastapi_annotation, (FieldInfo, may_v1.FieldInfo)):
# Copy `field_info` because we mutate `field_info.default` below.
field_info = copy_field_info(
field_info=fastapi_annotation, annotation=use_annotation
)
assert field_info.default in {
Undefined,
v1.Undefined,
} or field_info.default in {RequiredParam, v1.RequiredParam}, (
may_v1.Undefined,
} or field_info.default in {RequiredParam, may_v1.RequiredParam}, (
f"`{field_info.__class__.__name__}` default value cannot be set in"
f" `Annotated` for {param_name!r}. Set the default value with `=` instead."
)
@ -435,7 +435,7 @@ def analyze_param(
)
depends = value
# Get FieldInfo from default value
elif isinstance(value, (FieldInfo, v1.FieldInfo)):
elif isinstance(value, (FieldInfo, may_v1.FieldInfo)):
assert field_info is None, (
"Cannot specify FastAPI annotations in `Annotated` and default value"
f" together for {param_name!r}"
@ -524,7 +524,8 @@ def analyze_param(
type_=use_annotation_from_field_info,
default=field_info.default,
alias=alias,
required=field_info.default in (RequiredParam, v1.RequiredParam, Undefined),
required=field_info.default
in (RequiredParam, may_v1.RequiredParam, Undefined),
field_info=field_info,
)
if is_path_param:
@ -741,7 +742,7 @@ def _validate_value_with_model_field(
if _is_error_wrapper(errors_): # type: ignore[arg-type]
return None, [errors_]
elif isinstance(errors_, list):
new_errors = v1._regenerate_error_with_loc(errors=errors_, loc_prefix=())
new_errors = may_v1._regenerate_error_with_loc(errors=errors_, loc_prefix=())
return None, new_errors
else:
return v_, []

18
fastapi/encoders.py

@ -17,7 +17,7 @@ from types import GeneratorType
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
from uuid import UUID
from fastapi._compat import v1
from fastapi._compat import may_v1
from fastapi.types import IncEx
from pydantic import BaseModel
from pydantic.color import Color
@ -59,7 +59,7 @@ def decimal_encoder(dec_value: Decimal) -> Union[int, float]:
ENCODERS_BY_TYPE: Dict[Type[Any], Callable[[Any], Any]] = {
bytes: lambda o: o.decode(),
Color: str,
v1.Color: str,
may_v1.Color: str,
datetime.date: isoformat,
datetime.datetime: isoformat,
datetime.time: isoformat,
@ -76,19 +76,19 @@ ENCODERS_BY_TYPE: Dict[Type[Any], Callable[[Any], Any]] = {
IPv6Interface: str,
IPv6Network: str,
NameEmail: str,
v1.NameEmail: str,
may_v1.NameEmail: str,
Path: str,
Pattern: lambda o: o.pattern,
SecretBytes: str,
v1.SecretBytes: str,
may_v1.SecretBytes: str,
SecretStr: str,
v1.SecretStr: str,
may_v1.SecretStr: str,
set: list,
UUID: str,
Url: str,
v1.Url: str,
may_v1.Url: str,
AnyUrl: str,
v1.AnyUrl: str,
may_v1.AnyUrl: str,
}
@ -220,10 +220,10 @@ def jsonable_encoder(
include = set(include)
if exclude is not None and not isinstance(exclude, (set, dict)):
exclude = set(exclude)
if isinstance(obj, (BaseModel, v1.BaseModel)):
if isinstance(obj, (BaseModel, may_v1.BaseModel)):
# TODO: remove when deprecating Pydantic v1
encoders: Dict[Any, Any] = {}
if isinstance(obj, v1.BaseModel):
if isinstance(obj, may_v1.BaseModel):
encoders = getattr(obj.__config__, "json_encoders", {}) # type: ignore[attr-defined]
if custom_encoder:
encoders = {**encoders, **custom_encoder}

2
fastapi/temp_pydantic_v1_params.py

@ -5,8 +5,8 @@ from fastapi.openapi.models import Example
from fastapi.params import ParamTypes
from typing_extensions import Annotated, deprecated
from ._compat.may_v1 import FieldInfo, Undefined
from ._compat.shared import PYDANTIC_VERSION_MINOR_TUPLE
from ._compat.v1 import FieldInfo, Undefined
_Unset: Any = Undefined

15
fastapi/utils.py

@ -25,7 +25,7 @@ from fastapi._compat import (
Validator,
annotation_is_pydantic_v1,
lenient_issubclass,
v1,
may_v1,
)
from fastapi.datastructures import DefaultPlaceholder, DefaultType
from pydantic import BaseModel
@ -87,8 +87,8 @@ def create_model_field(
) -> ModelField:
class_validators = class_validators or {}
v1_model_config = v1.BaseConfig
v1_field_info = field_info or v1.FieldInfo()
v1_model_config = may_v1.BaseConfig
v1_field_info = field_info or may_v1.FieldInfo()
v1_kwargs = {
"name": name,
"field_info": v1_field_info,
@ -102,9 +102,11 @@ def create_model_field(
if (
annotation_is_pydantic_v1(type_)
or isinstance(field_info, v1.FieldInfo)
or isinstance(field_info, may_v1.FieldInfo)
or version == "1"
):
from fastapi._compat import v1
try:
return v1.ModelField(**v1_kwargs) # type: ignore[no-any-return]
except RuntimeError:
@ -122,6 +124,8 @@ def create_model_field(
raise fastapi.exceptions.FastAPIError(_invalid_args_message) from None
# Pydantic v2 is not installed, but it's not a Pydantic v1 ModelField, it could be
# a Pydantic v1 type, like a constrained int
from fastapi._compat import v1
try:
return v1.ModelField(**v1_kwargs) # type: ignore[no-any-return]
except RuntimeError:
@ -138,6 +142,9 @@ def create_cloned_field(
if isinstance(field, v2.ModelField):
return field
from fastapi._compat import v1
# cloned_types caches already cloned types to support recursive models and improve
# performance by avoiding unnecessary cloning
if cloned_types is None:

12
tests/test_compat.py

@ -7,7 +7,7 @@ from fastapi._compat import (
get_cached_model_fields,
is_scalar_field,
is_uploadfile_sequence_annotation,
v1,
may_v1,
)
from fastapi._compat.shared import is_bytes_sequence_annotation
from fastapi.testclient import TestClient
@ -27,7 +27,10 @@ def test_model_field_default_required():
assert field.default is Undefined
@needs_py_lt_314
def test_v1_plain_validator_function():
from fastapi._compat import v1
# For coverage
def func(v): # pragma: no cover
return v
@ -135,6 +138,8 @@ def test_is_uploadfile_sequence_annotation():
@needs_py_lt_314
def test_is_pv1_scalar_field():
from fastapi._compat import v1
# For coverage
class Model(v1.BaseModel):
foo: Union[str, Dict[str, Any]]
@ -143,8 +148,11 @@ def test_is_pv1_scalar_field():
assert not is_scalar_field(fields[0])
@needs_py_lt_314
def test_get_model_fields_cached():
class Model(v1.BaseModel):
from fastapi._compat import v1
class Model(may_v1.BaseModel):
foo: str
non_cached_fields = v1.get_model_fields(Model)

3
tests/test_get_model_definitions_formfeed_escape.py

@ -5,7 +5,6 @@ import fastapi.openapi.utils
import pydantic.schema
import pytest
from fastapi import FastAPI
from fastapi._compat import v1
from pydantic import BaseModel
from starlette.testclient import TestClient
@ -165,6 +164,8 @@ def test_model_description_escaped_with_formfeed(sort_reversed: bool):
Test `get_model_definitions` with models passed in different order.
"""
from fastapi._compat import v1
all_fields = fastapi.openapi.utils.get_fields_from_routes(app.routes)
flat_models = v1.get_flat_models_from_fields(all_fields, known_models=set())

6
tests/test_response_model_as_return_annotation.py

@ -2,12 +2,13 @@ from typing import List, Union
import pytest
from fastapi import FastAPI
from fastapi._compat import v1
from fastapi.exceptions import FastAPIError, ResponseValidationError
from fastapi.responses import JSONResponse, Response
from fastapi.testclient import TestClient
from pydantic import BaseModel
from tests.utils import needs_pydanticv1
class BaseUser(BaseModel):
name: str
@ -511,7 +512,10 @@ def test_invalid_response_model_field():
# TODO: remove when dropping Pydantic v1 support
@needs_pydanticv1
def test_invalid_response_model_field_pv1():
from fastapi._compat import v1
app = FastAPI()
class Model(v1.BaseModel):

Loading…
Cancel
Save