from enum import Enum from pathlib import PurePath from types import GeneratorType from typing import Any, Callable, Dict, List, Set, Tuple, Union from fastapi.logger import logger from fastapi.utils import PYDANTIC_1 from pydantic import BaseModel from pydantic.json import ENCODERS_BY_TYPE SetIntStr = Set[Union[int, str]] DictIntStrAny = Dict[Union[int, str], Any] def generate_encoders_by_class_tuples( type_encoder_map: Dict[Any, Callable] ) -> Dict[Callable, Tuple]: encoders_by_classes: Dict[Callable, List] = {} for type_, encoder in type_encoder_map.items(): encoders_by_classes.setdefault(encoder, []).append(type_) encoders_by_class_tuples: Dict[Callable, Tuple] = {} for encoder, classes in encoders_by_classes.items(): encoders_by_class_tuples[encoder] = tuple(classes) return encoders_by_class_tuples encoders_by_class_tuples = generate_encoders_by_class_tuples(ENCODERS_BY_TYPE) def jsonable_encoder( obj: Any, include: Union[SetIntStr, DictIntStrAny] = None, exclude: Union[SetIntStr, DictIntStrAny] = set(), by_alias: bool = True, skip_defaults: bool = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = False, custom_encoder: dict = {}, sqlalchemy_safe: bool = True, ) -> Any: if skip_defaults is not None: logger.warning( # pragma: nocover "skip_defaults in jsonable_encoder has been deprecated in favor of " "exclude_unset to keep in line with Pydantic v1, support for it will be " "removed soon." ) if include is not None and not isinstance(include, set): include = set(include) if exclude is not None and not isinstance(exclude, set): exclude = set(exclude) if isinstance(obj, BaseModel): encoder = getattr(obj.Config, "json_encoders", {}) if custom_encoder: encoder.update(custom_encoder) if PYDANTIC_1: obj_dict = obj.dict( include=include, exclude=exclude, by_alias=by_alias, exclude_unset=bool(exclude_unset or skip_defaults), exclude_none=exclude_none, exclude_defaults=exclude_defaults, ) else: # pragma: nocover if exclude_defaults: raise ValueError("Cannot use exclude_defaults") obj_dict = obj.dict( include=include, exclude=exclude, by_alias=by_alias, skip_defaults=bool(exclude_unset or skip_defaults), ) if "__root__" in obj_dict: obj_dict = obj_dict["__root__"] return jsonable_encoder( obj_dict, exclude_none=exclude_none, exclude_defaults=exclude_defaults, custom_encoder=encoder, sqlalchemy_safe=sqlalchemy_safe, ) if isinstance(obj, Enum): return obj.value if isinstance(obj, PurePath): return str(obj) if isinstance(obj, (str, int, float, type(None))): return obj if isinstance(obj, dict): encoded_dict = {} for key, value in obj.items(): if ( ( not sqlalchemy_safe or (not isinstance(key, str)) or (not key.startswith("_sa")) ) and (value is not None or not exclude_none) and ((include and key in include) or key not in exclude) ): encoded_key = jsonable_encoder( key, by_alias=by_alias, exclude_unset=exclude_unset, exclude_none=exclude_none, custom_encoder=custom_encoder, sqlalchemy_safe=sqlalchemy_safe, ) encoded_value = jsonable_encoder( value, by_alias=by_alias, exclude_unset=exclude_unset, exclude_none=exclude_none, custom_encoder=custom_encoder, sqlalchemy_safe=sqlalchemy_safe, ) encoded_dict[encoded_key] = encoded_value return encoded_dict if isinstance(obj, (list, set, frozenset, GeneratorType, tuple)): encoded_list = [] for item in obj: encoded_list.append( jsonable_encoder( item, include=include, exclude=exclude, by_alias=by_alias, exclude_unset=exclude_unset, exclude_defaults=exclude_defaults, exclude_none=exclude_none, custom_encoder=custom_encoder, sqlalchemy_safe=sqlalchemy_safe, ) ) return encoded_list if custom_encoder: if type(obj) in custom_encoder: return custom_encoder[type(obj)](obj) else: for encoder_type, encoder in custom_encoder.items(): if isinstance(obj, encoder_type): return encoder(obj) if type(obj) in ENCODERS_BY_TYPE: return ENCODERS_BY_TYPE[type(obj)](obj) for encoder, classes_tuple in encoders_by_class_tuples.items(): if isinstance(obj, classes_tuple): return encoder(obj) errors: List[Exception] = [] try: data = dict(obj) except Exception as e: errors.append(e) try: data = vars(obj) except Exception as e: errors.append(e) raise ValueError(errors) return jsonable_encoder( data, by_alias=by_alias, exclude_unset=exclude_unset, exclude_defaults=exclude_defaults, exclude_none=exclude_none, custom_encoder=custom_encoder, sqlalchemy_safe=sqlalchemy_safe, )