diff --git a/fastapi/dependencies/utils.py b/fastapi/dependencies/utils.py index f35b2d0ba..a45a8fe09 100644 --- a/fastapi/dependencies/utils.py +++ b/fastapi/dependencies/utils.py @@ -24,6 +24,7 @@ from fastapi.concurrency import ( contextmanager_in_threadpool, ) from fastapi.dependencies.models import Dependant, SecurityRequirement +from fastapi.logger import logger from fastapi.security.base import SecurityBase from fastapi.security.oauth2 import OAuth2, SecurityScopes from fastapi.security.open_id_connect_url import OpenIdConnect @@ -96,6 +97,42 @@ sequence_shape_to_type = { } +multipart_not_installed_error = ( + 'Form data requires "python-multipart" to be installed. \n' + 'You can install "python-multipart" with: \n\n' + "pip install python-multipart\n" +) +multipart_incorrect_install_error = ( + 'Form data requires "python-multipart" to be installed. ' + 'It seems you installed "multipart" instead. \n' + 'You can remove "multipart" with: \n\n' + "pip uninstall multipart\n\n" + 'And then install "python-multipart" with: \n\n' + "pip install python-multipart\n" +) + + +def check_file_field(field: ModelField) -> None: + field_info = get_field_info(field) + if isinstance(field_info, params.Form): + try: + # __version__ is available in both multiparts, and can be mocked + from multipart import __version__ + + assert __version__ + try: + # parse_options_header is only available in the right multlipart + from multipart.multipart import parse_options_header + + assert parse_options_header + except ImportError: + logger.error(multipart_incorrect_install_error) + raise RuntimeError(multipart_incorrect_install_error) + except ImportError: + logger.error(multipart_not_installed_error) + raise RuntimeError(multipart_not_installed_error) + + def get_param_sub_dependant( *, param: inspect.Parameter, path: str, security_scopes: Optional[List[str]] = None ) -> Dependant: @@ -733,9 +770,8 @@ def get_schema_compatible_field(*, field: ModelField) -> ModelField: default=field.default, required=field.required, alias=field.alias, - field_info=field.field_info if PYDANTIC_1 else field.schema, # type: ignore + field_info=get_field_info(field), ) - return out_field @@ -748,7 +784,9 @@ def get_body_field(*, dependant: Dependant, name: str) -> Optional[ModelField]: embed = getattr(field_info, "embed", None) body_param_names_set = {param.name for param in flat_dependant.body_params} if len(body_param_names_set) == 1 and not embed: - return get_schema_compatible_field(field=first_param) + final_field = get_schema_compatible_field(field=first_param) + check_file_field(final_field) + return final_field # If one field requires to embed, all have to be embedded # in case a sub-dependency is evaluated with a single unique body field # That is combined (embedded) with other body fields @@ -779,10 +817,12 @@ def get_body_field(*, dependant: Dependant, name: str) -> Optional[ModelField]: ] if len(set(body_param_media_types)) == 1: BodyFieldInfo_kwargs["media_type"] = body_param_media_types[0] - return create_response_field( + final_field = create_response_field( name="body", type_=BodyModel, required=required, alias="body", field_info=BodyFieldInfo(**BodyFieldInfo_kwargs), ) + check_file_field(final_field) + return final_field diff --git a/tests/test_multipart_installation.py b/tests/test_multipart_installation.py new file mode 100644 index 000000000..c134332d3 --- /dev/null +++ b/tests/test_multipart_installation.py @@ -0,0 +1,106 @@ +import pytest +from fastapi import FastAPI, File, Form, UploadFile +from fastapi.dependencies.utils import ( + multipart_incorrect_install_error, + multipart_not_installed_error, +) + + +def test_incorrect_multipart_installed_form(monkeypatch): + monkeypatch.delattr("multipart.multipart.parse_options_header", raising=False) + with pytest.raises(RuntimeError, match=multipart_incorrect_install_error): + app = FastAPI() + + @app.post("/") + async def root(username: str = Form(...)): + return username # pragma: nocover + + +def test_incorrect_multipart_installed_file_upload(monkeypatch): + monkeypatch.delattr("multipart.multipart.parse_options_header", raising=False) + with pytest.raises(RuntimeError, match=multipart_incorrect_install_error): + app = FastAPI() + + @app.post("/") + async def root(f: UploadFile = File(...)): + return f # pragma: nocover + + +def test_incorrect_multipart_installed_file_bytes(monkeypatch): + monkeypatch.delattr("multipart.multipart.parse_options_header", raising=False) + with pytest.raises(RuntimeError, match=multipart_incorrect_install_error): + app = FastAPI() + + @app.post("/") + async def root(f: bytes = File(...)): + return f # pragma: nocover + + +def test_incorrect_multipart_installed_multi_form(monkeypatch): + monkeypatch.delattr("multipart.multipart.parse_options_header", raising=False) + with pytest.raises(RuntimeError, match=multipart_incorrect_install_error): + app = FastAPI() + + @app.post("/") + async def root(username: str = Form(...), pasword: str = Form(...)): + return username # pragma: nocover + + +def test_incorrect_multipart_installed_form_file(monkeypatch): + monkeypatch.delattr("multipart.multipart.parse_options_header", raising=False) + with pytest.raises(RuntimeError, match=multipart_incorrect_install_error): + app = FastAPI() + + @app.post("/") + async def root(username: str = Form(...), f: UploadFile = File(...)): + return username # pragma: nocover + + +def test_no_multipart_installed(monkeypatch): + monkeypatch.delattr("multipart.__version__", raising=False) + with pytest.raises(RuntimeError, match=multipart_not_installed_error): + app = FastAPI() + + @app.post("/") + async def root(username: str = Form(...)): + return username # pragma: nocover + + +def test_no_multipart_installed_file(monkeypatch): + monkeypatch.delattr("multipart.__version__", raising=False) + with pytest.raises(RuntimeError, match=multipart_not_installed_error): + app = FastAPI() + + @app.post("/") + async def root(f: UploadFile = File(...)): + return f # pragma: nocover + + +def test_no_multipart_installed_file_bytes(monkeypatch): + monkeypatch.delattr("multipart.__version__", raising=False) + with pytest.raises(RuntimeError, match=multipart_not_installed_error): + app = FastAPI() + + @app.post("/") + async def root(f: bytes = File(...)): + return f # pragma: nocover + + +def test_no_multipart_installed_multi_form(monkeypatch): + monkeypatch.delattr("multipart.__version__", raising=False) + with pytest.raises(RuntimeError, match=multipart_not_installed_error): + app = FastAPI() + + @app.post("/") + async def root(username: str = Form(...), password: str = Form(...)): + return username # pragma: nocover + + +def test_no_multipart_installed_form_file(monkeypatch): + monkeypatch.delattr("multipart.__version__", raising=False) + with pytest.raises(RuntimeError, match=multipart_not_installed_error): + app = FastAPI() + + @app.post("/") + async def root(username: str = Form(...), f: UploadFile = File(...)): + return username # pragma: nocover