committed by
GitHub
144 changed files with 6222 additions and 1304 deletions
@ -0,0 +1,63 @@ |
|||
# JSON with Bytes as Base64 { #json-with-bytes-as-base64 } |
|||
|
|||
If your app needs to receive and send JSON data, but you need to include binary data in it, you can encode it as base64. |
|||
|
|||
## Base64 vs Files { #base64-vs-files } |
|||
|
|||
Consider first if you can use [Request Files](../tutorial/request-files.md){.internal-link target=_blank} for uploading binary data and [Custom Response - FileResponse](./custom-response.md#fileresponse--fileresponse-){.internal-link target=_blank} for sending binary data, instead of encoding it in JSON. |
|||
|
|||
JSON can only contain UTF-8 encoded strings, so it can't contain raw bytes. |
|||
|
|||
Base64 can encode binary data in strings, but to do it, it needs to use more characters than the original binary data, so it would normally be less efficient than regular files. |
|||
|
|||
Use base64 only if you definitely need to include binary data in JSON, and you can't use files for that. |
|||
|
|||
## Pydantic `bytes` { #pydantic-bytes } |
|||
|
|||
You can declare a Pydantic model with `bytes` fields, and then use `val_json_bytes` in the model config to tell it to use base64 to *validate* input JSON data, as part of that validation it will decode the base64 string into bytes. |
|||
|
|||
{* ../../docs_src/json_base64_bytes/tutorial001_py310.py ln[1:9,29:35] hl[9] *} |
|||
|
|||
If you check the `/docs`, they will show that the field `data` expects base64 encoded bytes: |
|||
|
|||
<div class="screenshot"> |
|||
<img src="/img/tutorial/json-base64-bytes/image01.png"> |
|||
</div> |
|||
|
|||
You could send a request like: |
|||
|
|||
```json |
|||
{ |
|||
"description": "Some data", |
|||
"data": "aGVsbG8=" |
|||
} |
|||
``` |
|||
|
|||
/// tip |
|||
|
|||
`aGVsbG8=` is the base64 encoding of `hello`. |
|||
|
|||
/// |
|||
|
|||
And then Pydantic will decode the base64 string and give you the original bytes in the `data` field of the model. |
|||
|
|||
You will receive a response like: |
|||
|
|||
```json |
|||
{ |
|||
"description": "Some data", |
|||
"content": "hello" |
|||
} |
|||
``` |
|||
|
|||
## Pydantic `bytes` for Output Data { #pydantic-bytes-for-output-data } |
|||
|
|||
You can also use `bytes` fields with `ser_json_bytes` in the model config for output data, and Pydantic will *serialize* the bytes as base64 when generating the JSON response. |
|||
|
|||
{* ../../docs_src/json_base64_bytes/tutorial001_py310.py ln[1:2,12:16,29,38:41] hl[16] *} |
|||
|
|||
## Pydantic `bytes` for Input and Output Data { #pydantic-bytes-for-input-and-output-data } |
|||
|
|||
And of course, you can use the same model configured to use base64 to handle both input (*validate*) with `val_json_bytes` and output (*serialize*) with `ser_json_bytes` when receiving and sending JSON data. |
|||
|
|||
{* ../../docs_src/json_base64_bytes/tutorial001_py310.py ln[1:2,19:26,29,44:46] hl[23:26] *} |
|||
@ -0,0 +1,117 @@ |
|||
# Stream Data { #stream-data } |
|||
|
|||
If you want to stream data that can be structured as JSON, you should [Stream JSON Lines](../tutorial/stream-json-lines.md){.internal-link target=_blank}. |
|||
|
|||
But if you want to **stream pure binary data** or strings, here's how you can do it. |
|||
|
|||
/// info |
|||
|
|||
Added in FastAPI 0.134.0. |
|||
|
|||
/// |
|||
|
|||
## Use Cases { #use-cases } |
|||
|
|||
You could use this if you want to stream pure strings, for example directly from the output of an **AI LLM** service. |
|||
|
|||
You could also use it to stream **large binary files**, where you stream each chunk of data as you read it, without having to read it all in memory at once. |
|||
|
|||
You could also stream **video** or **audio** this way, it could even be generated as you process and send it. |
|||
|
|||
## A `StreamingResponse` with `yield` { #a-streamingresponse-with-yield } |
|||
|
|||
If you declare a `response_class=StreamingResponse` in your *path operation function*, you can use `yield` to send each chunk of data in turn. |
|||
|
|||
{* ../../docs_src/stream_data/tutorial001_py310.py ln[1:23] hl[20,23] *} |
|||
|
|||
FastAPI will give each chunk of data to the `StreamingResponse` as is, it won't try to convert it to JSON or anything similar. |
|||
|
|||
### Non-async *path operation functions* { #non-async-path-operation-functions } |
|||
|
|||
You can also use regular `def` functions (without `async`), and use `yield` the same way. |
|||
|
|||
{* ../../docs_src/stream_data/tutorial001_py310.py ln[26:29] hl[27] *} |
|||
|
|||
### No Annotation { #no-annotation } |
|||
|
|||
You don't really need to declare the return type annotation for streaming binary data. |
|||
|
|||
As FastAPI will not try to convert the data to JSON with Pydantic or serialize it in any way, in this case, the type annotation is only for your editor and tools to use, it won't be used by FastAPI. |
|||
|
|||
{* ../../docs_src/stream_data/tutorial001_py310.py ln[32:35] hl[33] *} |
|||
|
|||
This also means that with `StreamingResponse` you have the **freedom** and **responsibility** to produce and encode the data bytes exactly as you need them to be sent, independent of the type annotations. 🤓 |
|||
|
|||
### Stream Bytes { #stream-bytes } |
|||
|
|||
One of the main use cases would be to stream `bytes` instead of strings, you can of course do it. |
|||
|
|||
{* ../../docs_src/stream_data/tutorial001_py310.py ln[44:47] hl[47] *} |
|||
|
|||
## A Custom `PNGStreamingResponse` { #a-custom-pngstreamingresponse } |
|||
|
|||
In the examples above, the data bytes were streamed, but the response didn't have a `Content-Type` header, so the client didn't know what type of data it was receiving. |
|||
|
|||
You can create a custom sub-class of `StreamingResponse` that sets the `Content-Type` header to the type of data you're streaming. |
|||
|
|||
For example, you can create a `PNGStreamingResponse` that sets the `Content-Type` header to `image/png` using the `media_type` attribute: |
|||
|
|||
{* ../../docs_src/stream_data/tutorial002_py310.py ln[6,19:20] hl[20] *} |
|||
|
|||
Then you can use this new class in `response_class=PNGStreamingResponse` in your *path operation function*: |
|||
|
|||
{* ../../docs_src/stream_data/tutorial002_py310.py ln[23:27] hl[23] *} |
|||
|
|||
### Simulate a File { #simulate-a-file } |
|||
|
|||
In this example, we are simulating a file with `io.BytesIO`, which is a file-like object that lives only in memory, but lets us use the same interface. |
|||
|
|||
For example, we can iterate over it to consume its contents, as we could with a file. |
|||
|
|||
{* ../../docs_src/stream_data/tutorial002_py310.py ln[1:27] hl[3,12:13,25] *} |
|||
|
|||
/// note | Technical Details |
|||
|
|||
The other two variables, `image_base64` and `binary_image`, are an image encoded in Base64, and then converted to bytes, to then pass it to `io.BytesIO`. |
|||
|
|||
Only so that it can live in the same file for this example and you can copy it and run it as is. 🥚 |
|||
|
|||
/// |
|||
|
|||
By using a `with` block, we make sure that the file-like object is closed after the generator function (the function with `yield`) is done. So, after it finishes sending the response. |
|||
|
|||
It wouldn't be that important in this specific example because it's a fake in-memory file (with `io.BytesIO`), but with a real file, it would be important to make sure the file is closed after the work with it is done. |
|||
|
|||
### Files and Async { #files-and-async } |
|||
|
|||
In most cases, file-like objects are not compatible with async and await by default. |
|||
|
|||
For example, they don't have an `await file.read()`, or `async for chunk in file`. |
|||
|
|||
And in many cases, reading them would be a blocking operation (that could block the event loop), because they are read from disk or from the network. |
|||
|
|||
/// info |
|||
|
|||
The example above is actually an exception, because the `io.BytesIO` object is already in memory, so reading it won't block anything. |
|||
|
|||
But in many cases reading a file or a file-like object would block. |
|||
|
|||
/// |
|||
|
|||
To avoid blocking the event loop, you can simply declare the *path operation function* with regular `def` instead of `async def`, that way FastAPI will run it on a threadpool worker, to avoid blocking the main loop. |
|||
|
|||
{* ../../docs_src/stream_data/tutorial002_py310.py ln[30:34] hl[31] *} |
|||
|
|||
/// tip |
|||
|
|||
If you need to call blocking code from inside of an async function, or an async function from inside of a blocking function, you could use <a href="https://asyncer.tiangolo.com" class="external-link" target="_blank">Asyncer</a>, a sibling library to FastAPI. |
|||
|
|||
/// |
|||
|
|||
### `yield from` { #yield-from } |
|||
|
|||
When you are iterating over something, like a file-like object, and then you are doing `yield` for each item, you could also use `yield from` to yield each item directly and skip the `for` loop. |
|||
|
|||
This is not particular to FastAPI, it's just Python, but it's a nice trick to know. 😎 |
|||
|
|||
{* ../../docs_src/stream_data/tutorial002_py310.py ln[37:40] hl[40] *} |
|||
@ -0,0 +1,88 @@ |
|||
# Strict Content-Type Checking { #strict-content-type-checking } |
|||
|
|||
By default, **FastAPI** uses strict `Content-Type` header checking for JSON request bodies, this means that JSON requests **must** include a valid `Content-Type` header (e.g. `application/json`) in order for the body to be parsed as JSON. |
|||
|
|||
## CSRF Risk { #csrf-risk } |
|||
|
|||
This default behavior provides protection against a class of **Cross-Site Request Forgery (CSRF)** attacks in a very specific scenario. |
|||
|
|||
These attacks exploit the fact that browsers allow scripts to send requests without doing any CORS preflight check when they: |
|||
|
|||
* don't have a `Content-Type` header (e.g. using `fetch()` with a `Blob` body) |
|||
* and don't send any authentication credentials. |
|||
|
|||
This type of attack is mainly relevant when: |
|||
|
|||
* the application is running locally (e.g. on `localhost`) or in an internal network |
|||
* and the application doesn't have any authentication, it expects that any request from the same network can be trusted. |
|||
|
|||
## Example Attack { #example-attack } |
|||
|
|||
Imagine you build a way to run a local AI agent. |
|||
|
|||
It provides an API at |
|||
|
|||
``` |
|||
http://localhost:8000/v1/agents/multivac |
|||
``` |
|||
|
|||
There's also a frontend at |
|||
|
|||
``` |
|||
http://localhost:8000 |
|||
``` |
|||
|
|||
/// tip |
|||
|
|||
Note that both have the same host. |
|||
|
|||
/// |
|||
|
|||
Then using the frontend you can make the AI agent do things on your behalf. |
|||
|
|||
As it's running **locally**, and not in the open internet, you decide to **not have any authentication** set up, just trusting the access to the local network. |
|||
|
|||
Then one of your users could install it and run it locally. |
|||
|
|||
Then they could open a malicious website, e.g. something like |
|||
|
|||
``` |
|||
https://evilhackers.example.com |
|||
``` |
|||
|
|||
And that malicious website sends requests using `fetch()` with a `Blob` body to the local API at |
|||
|
|||
``` |
|||
http://localhost:8000/v1/agents/multivac |
|||
``` |
|||
|
|||
Even though the host of the malicious website and the local app is different, the browser won't trigger a CORS preflight request because: |
|||
|
|||
* It's running without any authentication, it doesn't have to send any credentials. |
|||
* The browser thinks it's not sending JSON (because of the missing `Content-Type` header). |
|||
|
|||
Then the malicious website could make the local AI agent send angry messages to the user's ex-boss... or worse. 😅 |
|||
|
|||
## Open Internet { #open-internet } |
|||
|
|||
If your app is in the open internet, you wouldn't "trust the network" and let anyone send privileged requests without authentication. |
|||
|
|||
Attackers could simply run a script to send requests to your API, no need for browser interaction, so you are probably already securing any privileged endpoints. |
|||
|
|||
In that case **this attack / risk doesn't apply to you**. |
|||
|
|||
This risk and attack is mainly relevant when the app runs on the **local network** and that is the **only assumed protection**. |
|||
|
|||
## Allowing Requests Without Content-Type { #allowing-requests-without-content-type } |
|||
|
|||
If you need to support clients that don't send a `Content-Type` header, you can disable strict checking by setting `strict_content_type=False`: |
|||
|
|||
{* ../../docs_src/strict_content_type/tutorial001_py310.py hl[4] *} |
|||
|
|||
With this setting, requests without a `Content-Type` header will have their body parsed as JSON, which is the same behavior as older versions of FastAPI. |
|||
|
|||
/// info |
|||
|
|||
This behavior and configuration was added in FastAPI 0.132.0. |
|||
|
|||
/// |
|||
|
After Width: | Height: | Size: 71 KiB |
@ -0,0 +1,29 @@ |
|||
document.addEventListener("DOMContentLoaded", function () { |
|||
var script = document.createElement("script"); |
|||
script.src = "https://widget.kapa.ai/kapa-widget.bundle.js"; |
|||
script.setAttribute("data-website-id", "91f47f27-b405-4299-bf5f-a1c0ec07b3cc"); |
|||
script.setAttribute("data-project-name", "FastAPI"); |
|||
script.setAttribute("data-project-color", "#009485"); |
|||
script.setAttribute("data-project-logo", "https://fastapi.tiangolo.com/img/favicon.png"); |
|||
script.setAttribute("data-bot-protection-mechanism", "hcaptcha"); |
|||
script.setAttribute("data-button-height", "3rem"); |
|||
script.setAttribute("data-button-width", "3rem"); |
|||
script.setAttribute("data-button-border-radius", "50%"); |
|||
script.setAttribute("data-button-padding", "0"); |
|||
script.setAttribute("data-button-image", "data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='24' height='24' viewBox='0 0 24 24' fill='none' stroke='white' stroke-width='2' stroke-linecap='round' stroke-linejoin='round'%3E%3Cpath d='M12 8V4H8'/%3E%3Crect width='16' height='12' x='4' y='8' rx='2'/%3E%3Cpath d='M2 14h2'/%3E%3Cpath d='M20 14h2'/%3E%3Cpath d='M15 13v2'/%3E%3Cpath d='M9 13v2'/%3E%3C/svg%3E"); |
|||
script.setAttribute("data-button-image-height", "20px"); |
|||
script.setAttribute("data-button-image-width", "20px"); |
|||
script.setAttribute("data-button-text", "Ask AI"); |
|||
script.setAttribute("data-button-text-font-size", "0.5rem"); |
|||
script.setAttribute("data-button-text-font-family", "Roboto, sans-serif"); |
|||
script.setAttribute("data-button-text-color", "#FFFFFF"); |
|||
script.setAttribute("data-modal-border-radius", "0.5rem"); |
|||
script.setAttribute("data-modal-header-bg-color", "#009485"); |
|||
script.setAttribute("data-modal-title", "FastAPI AI Assistant"); |
|||
script.setAttribute("data-modal-title-color", "#FFFFFF"); |
|||
script.setAttribute("data-modal-title-font-family", "Roboto, sans-serif"); |
|||
script.setAttribute("data-modal-example-questions", "How to define a route?,How to validate models?,How to handle responses?,How to deploy FastAPI?"); |
|||
script.setAttribute("data-modal-disclaimer", "AI-generated answers based on FastAPI [documentation](https://fastapi.tiangolo.com/) and [community discussions](https://github.com/fastapi/fastapi/discussions). Always verify important information."); |
|||
script.async = true; |
|||
document.head.appendChild(script); |
|||
}); |
|||
@ -0,0 +1,120 @@ |
|||
# Server-Sent Events (SSE) { #server-sent-events-sse } |
|||
|
|||
You can stream data to the client using **Server-Sent Events** (SSE). |
|||
|
|||
This is similar to [Stream JSON Lines](stream-json-lines.md){.internal-link target=_blank}, but uses the `text/event-stream` format, which is supported natively by browsers with the <a href="https://developer.mozilla.org/en-US/docs/Web/API/EventSource" class="external-link" target="_blank">`EventSource` API</a>. |
|||
|
|||
/// info |
|||
|
|||
Added in FastAPI 0.135.0. |
|||
|
|||
/// |
|||
|
|||
## What are Server-Sent Events? { #what-are-server-sent-events } |
|||
|
|||
SSE is a standard for streaming data from the server to the client over HTTP. |
|||
|
|||
Each event is a small text block with "fields" like `data`, `event`, `id`, and `retry`, separated by blank lines. |
|||
|
|||
It looks like this: |
|||
|
|||
``` |
|||
data: {"name": "Portal Gun", "price": 999.99} |
|||
|
|||
data: {"name": "Plumbus", "price": 32.99} |
|||
|
|||
``` |
|||
|
|||
SSE is commonly used for AI chat streaming, live notifications, logs and observability, and other cases where the server pushes updates to the client. |
|||
|
|||
/// tip |
|||
|
|||
If you want to stream binary data, for example video or audio, check the advanced guide: [Stream Data](../advanced/stream-data.md){.internal-link target=_blank}. |
|||
|
|||
/// |
|||
|
|||
## Stream SSE with FastAPI { #stream-sse-with-fastapi } |
|||
|
|||
To stream SSE with FastAPI, use `yield` in your *path operation function* and set `response_class=EventSourceResponse`. |
|||
|
|||
Import `EventSourceResponse` from `fastapi.sse`: |
|||
|
|||
{* ../../docs_src/server_sent_events/tutorial001_py310.py ln[1:25] hl[4,22] *} |
|||
|
|||
Each yielded item is encoded as JSON and sent in the `data:` field of an SSE event. |
|||
|
|||
If you declare the return type as `AsyncIterable[Item]`, FastAPI will use it to **validate**, **document**, and **serialize** the data using Pydantic. |
|||
|
|||
{* ../../docs_src/server_sent_events/tutorial001_py310.py ln[1:25] hl[10:12,23] *} |
|||
|
|||
/// tip |
|||
|
|||
As Pydantic will serialize it in the **Rust** side, you will get much higher **performance** than if you don't declare a return type. |
|||
|
|||
/// |
|||
|
|||
### Non-async *path operation functions* { #non-async-path-operation-functions } |
|||
|
|||
You can also use regular `def` functions (without `async`), and use `yield` the same way. |
|||
|
|||
FastAPI will make sure it's run correctly so that it doesn't block the event loop. |
|||
|
|||
As in this case the function is not async, the right return type would be `Iterable[Item]`: |
|||
|
|||
{* ../../docs_src/server_sent_events/tutorial001_py310.py ln[28:31] hl[29] *} |
|||
|
|||
### No Return Type { #no-return-type } |
|||
|
|||
You can also omit the return type. FastAPI will use the [`jsonable_encoder`](./encoder.md){.internal-link target=_blank} to convert the data and send it. |
|||
|
|||
{* ../../docs_src/server_sent_events/tutorial001_py310.py ln[34:37] hl[35] *} |
|||
|
|||
## `ServerSentEvent` { #serversentevent } |
|||
|
|||
If you need to set SSE fields like `event`, `id`, `retry`, or `comment`, you can yield `ServerSentEvent` objects instead of plain data. |
|||
|
|||
Import `ServerSentEvent` from `fastapi.sse`: |
|||
|
|||
{* ../../docs_src/server_sent_events/tutorial002_py310.py hl[4,26] *} |
|||
|
|||
The `data` field is always encoded as JSON. You can pass any value that can be serialized as JSON, including Pydantic models. |
|||
|
|||
## Raw Data { #raw-data } |
|||
|
|||
If you need to send data **without** JSON encoding, use `raw_data` instead of `data`. |
|||
|
|||
This is useful for sending pre-formatted text, log lines, or special <dfn title="A value used to indicate a special condition or state">"sentinel"</dfn> values like `[DONE]`. |
|||
|
|||
{* ../../docs_src/server_sent_events/tutorial003_py310.py hl[17] *} |
|||
|
|||
/// note |
|||
|
|||
`data` and `raw_data` are mutually exclusive. You can only set one of them on each `ServerSentEvent`. |
|||
|
|||
/// |
|||
|
|||
## Resuming with `Last-Event-ID` { #resuming-with-last-event-id } |
|||
|
|||
When a browser reconnects after a connection drop, it sends the last received `id` in the `Last-Event-ID` header. |
|||
|
|||
You can read it as a header parameter and use it to resume the stream from where the client left off: |
|||
|
|||
{* ../../docs_src/server_sent_events/tutorial004_py310.py hl[25,27,31] *} |
|||
|
|||
## SSE with POST { #sse-with-post } |
|||
|
|||
SSE works with **any HTTP method**, not just `GET`. |
|||
|
|||
This is useful for protocols like <a href="https://modelcontextprotocol.io" class="external-link" target="_blank">MCP</a> that stream SSE over `POST`: |
|||
|
|||
{* ../../docs_src/server_sent_events/tutorial005_py310.py hl[14] *} |
|||
|
|||
## Technical Details { #technical-details } |
|||
|
|||
FastAPI implements some SSE best practices out of the box. |
|||
|
|||
* Send a **"keep alive" `ping` comment** every 15 seconds when there hasn't been any message, to prevent some proxies from closing the connection, as suggested in the <a href="https://html.spec.whatwg.org/multipage/server-sent-events.html#authoring-notes" class="external-link" target="_blank">HTML specification: Server-Sent Events</a>. |
|||
* Set the `Cache-Control: no-cache` header to **prevent caching** of the stream. |
|||
* Set a special header `X-Accel-Buffering: no` to **prevent buffering** in some proxies like Nginx. |
|||
|
|||
You don't have to do anything about it, it works out of the box. 🤓 |
|||
@ -0,0 +1,111 @@ |
|||
# Stream JSON Lines { #stream-json-lines } |
|||
|
|||
You could have a sequence of data that you would like to send in a "**stream**", you could do it with **JSON Lines**. |
|||
|
|||
/// info |
|||
|
|||
Added in FastAPI 0.134.0. |
|||
|
|||
/// |
|||
|
|||
## What is a Stream? { #what-is-a-stream } |
|||
|
|||
"**Streaming**" data means that your app will start sending data items to the client without waiting for the entire sequence of items to be ready. |
|||
|
|||
So, it will send the first item, the client will receive and start processing it, and you might still be producing the next item. |
|||
|
|||
```mermaid |
|||
sequenceDiagram |
|||
participant App |
|||
participant Client |
|||
|
|||
App->>App: Produce Item 1 |
|||
App->>Client: Send Item 1 |
|||
App->>App: Produce Item 2 |
|||
Client->>Client: Process Item 1 |
|||
App->>Client: Send Item 2 |
|||
App->>App: Produce Item 3 |
|||
Client->>Client: Process Item 2 |
|||
App->>Client: Send Item 3 |
|||
Client->>Client: Process Item 3 |
|||
Note over App: Keeps producing... |
|||
Note over Client: Keeps consuming... |
|||
``` |
|||
|
|||
It could even be an infinite stream, where you keep sending data. |
|||
|
|||
## JSON Lines { #json-lines } |
|||
|
|||
In these cases, it's common to send "**JSON Lines**", which is a format where you send one JSON object per line. |
|||
|
|||
A response would have a content type of `application/jsonl` (instead of `application/json`) and the body would be something like: |
|||
|
|||
```json |
|||
{"name": "Plumbus", "description": "A multi-purpose household device."} |
|||
{"name": "Portal Gun", "description": "A portal opening device."} |
|||
{"name": "Meeseeks Box", "description": "A box that summons a Meeseeks."} |
|||
``` |
|||
|
|||
It's very similar to a JSON array (equivalent of a Python list), but instead of being wrapped in `[]` and having `,` between the items, it has **one JSON object per line**, they are separated by a new line character. |
|||
|
|||
/// info |
|||
|
|||
The important point is that your app will be able to produce each line in turn, while the client consumes the previous lines. |
|||
|
|||
/// |
|||
|
|||
/// note | Technical Details |
|||
|
|||
Because each JSON object will be separated by a new line, they can't contain literal new line characters in their content, but they can contain escaped new lines (`\n`), which is part of the JSON standard. |
|||
|
|||
But normally you won't have to worry about it, it's done automatically, continue reading. 🤓 |
|||
|
|||
/// |
|||
|
|||
## Use Cases { #use-cases } |
|||
|
|||
You could use this to stream data from an **AI LLM** service, from **logs** or **telemetry**, or from other types of data that can be structured in **JSON** items. |
|||
|
|||
/// tip |
|||
|
|||
If you want to stream binary data, for example video or audio, check the advanced guide: [Stream Data](../advanced/stream-data.md). |
|||
|
|||
/// |
|||
|
|||
## Stream JSON Lines with FastAPI { #stream-json-lines-with-fastapi } |
|||
|
|||
To stream JSON Lines with FastAPI you can, instead of using `return` in your *path operation function*, use `yield` to produce each item in turn. |
|||
|
|||
{* ../../docs_src/stream_json_lines/tutorial001_py310.py ln[1:24] hl[24] *} |
|||
|
|||
If each JSON item you want to send back is of type `Item` (a Pydantic model) and it's an async function, you can declare the return type as `AsyncIterable[Item]`: |
|||
|
|||
{* ../../docs_src/stream_json_lines/tutorial001_py310.py ln[1:24] hl[9:11,22] *} |
|||
|
|||
If you declare the return type, FastAPI will use it to **validate** the data, **document** it in OpenAPI, **filter** it, and **serialize** it using Pydantic. |
|||
|
|||
/// tip |
|||
|
|||
As Pydantic will serialize it in the **Rust** side, you will get much higher **performance** than if you don't declare a return type. |
|||
|
|||
/// |
|||
|
|||
### Non-async *path operation functions* { #non-async-path-operation-functions } |
|||
|
|||
You can also use regular `def` functions (without `async`), and use `yield` the same way. |
|||
|
|||
FastAPI will make sure it's run correctly so that it doesn't block the event loop. |
|||
|
|||
As in this case the function is not async, the right return type would be `Iterable[Item]`: |
|||
|
|||
{* ../../docs_src/stream_json_lines/tutorial001_py310.py ln[27:30] hl[28] *} |
|||
|
|||
### No Return Type { #no-return-type } |
|||
|
|||
You can also omit the return type. FastAPI will then use the [`jsonable_encoder`](./encoder.md){.internal-link target=_blank} to convert the data to something that can be serialized to JSON and then send it as JSON Lines. |
|||
|
|||
{* ../../docs_src/stream_json_lines/tutorial001_py310.py ln[33:36] hl[34] *} |
|||
|
|||
## Server-Sent Events (SSE) { #server-sent-events-sse } |
|||
|
|||
FastAPI also has first-class support for Server-Sent Events (SSE), which are quite similar but with a couple of extra details. You can learn about them in the next chapter: [Server-Sent Events (SSE)](server-sent-events.md){.internal-link target=_blank}. 🤓 |
|||
@ -1,9 +1,9 @@ |
|||
from fastapi import FastAPI |
|||
from fastapi.responses import ORJSONResponse |
|||
from fastapi.responses import HTMLResponse |
|||
|
|||
app = FastAPI(default_response_class=ORJSONResponse) |
|||
app = FastAPI(default_response_class=HTMLResponse) |
|||
|
|||
|
|||
@app.get("/items/") |
|||
async def read_items(): |
|||
return [{"item_id": "Foo"}] |
|||
return "<h1>Items</h1><p>This is a list of items.</p>" |
|||
|
|||
@ -0,0 +1,46 @@ |
|||
from fastapi import FastAPI |
|||
from pydantic import BaseModel |
|||
|
|||
|
|||
class DataInput(BaseModel): |
|||
description: str |
|||
data: bytes |
|||
|
|||
model_config = {"val_json_bytes": "base64"} |
|||
|
|||
|
|||
class DataOutput(BaseModel): |
|||
description: str |
|||
data: bytes |
|||
|
|||
model_config = {"ser_json_bytes": "base64"} |
|||
|
|||
|
|||
class DataInputOutput(BaseModel): |
|||
description: str |
|||
data: bytes |
|||
|
|||
model_config = { |
|||
"val_json_bytes": "base64", |
|||
"ser_json_bytes": "base64", |
|||
} |
|||
|
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
@app.post("/data") |
|||
def post_data(body: DataInput): |
|||
content = body.data.decode("utf-8") |
|||
return {"description": body.description, "content": content} |
|||
|
|||
|
|||
@app.get("/data") |
|||
def get_data() -> DataOutput: |
|||
data = "hello".encode("utf-8") |
|||
return DataOutput(description="A plumbus", data=data) |
|||
|
|||
|
|||
@app.post("/data-in-out") |
|||
def post_data_in_out(body: DataInputOutput) -> DataInputOutput: |
|||
return body |
|||
@ -1,2 +0,0 @@ |
|||
def get_items(item_a: str, item_b: int, item_c: float, item_d: bool, item_e: bytes): |
|||
return item_a, item_b, item_c, item_d, item_e |
|||
@ -0,0 +1,43 @@ |
|||
from collections.abc import AsyncIterable, Iterable |
|||
|
|||
from fastapi import FastAPI |
|||
from fastapi.sse import EventSourceResponse |
|||
from pydantic import BaseModel |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
class Item(BaseModel): |
|||
name: str |
|||
description: str | None |
|||
|
|||
|
|||
items = [ |
|||
Item(name="Plumbus", description="A multi-purpose household device."), |
|||
Item(name="Portal Gun", description="A portal opening device."), |
|||
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."), |
|||
] |
|||
|
|||
|
|||
@app.get("/items/stream", response_class=EventSourceResponse) |
|||
async def sse_items() -> AsyncIterable[Item]: |
|||
for item in items: |
|||
yield item |
|||
|
|||
|
|||
@app.get("/items/stream-no-async", response_class=EventSourceResponse) |
|||
def sse_items_no_async() -> Iterable[Item]: |
|||
for item in items: |
|||
yield item |
|||
|
|||
|
|||
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse) |
|||
async def sse_items_no_annotation(): |
|||
for item in items: |
|||
yield item |
|||
|
|||
|
|||
@app.get("/items/stream-no-async-no-annotation", response_class=EventSourceResponse) |
|||
def sse_items_no_async_no_annotation(): |
|||
for item in items: |
|||
yield item |
|||
@ -0,0 +1,26 @@ |
|||
from collections.abc import AsyncIterable |
|||
|
|||
from fastapi import FastAPI |
|||
from fastapi.sse import EventSourceResponse, ServerSentEvent |
|||
from pydantic import BaseModel |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
class Item(BaseModel): |
|||
name: str |
|||
price: float |
|||
|
|||
|
|||
items = [ |
|||
Item(name="Plumbus", price=32.99), |
|||
Item(name="Portal Gun", price=999.99), |
|||
Item(name="Meeseeks Box", price=49.99), |
|||
] |
|||
|
|||
|
|||
@app.get("/items/stream", response_class=EventSourceResponse) |
|||
async def stream_items() -> AsyncIterable[ServerSentEvent]: |
|||
yield ServerSentEvent(comment="stream of item updates") |
|||
for i, item in enumerate(items): |
|||
yield ServerSentEvent(data=item, event="item_update", id=str(i + 1), retry=5000) |
|||
@ -0,0 +1,17 @@ |
|||
from collections.abc import AsyncIterable |
|||
|
|||
from fastapi import FastAPI |
|||
from fastapi.sse import EventSourceResponse, ServerSentEvent |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
@app.get("/logs/stream", response_class=EventSourceResponse) |
|||
async def stream_logs() -> AsyncIterable[ServerSentEvent]: |
|||
logs = [ |
|||
"2025-01-01 INFO Application started", |
|||
"2025-01-01 DEBUG Connected to database", |
|||
"2025-01-01 WARN High memory usage detected", |
|||
] |
|||
for log_line in logs: |
|||
yield ServerSentEvent(raw_data=log_line) |
|||
@ -0,0 +1,31 @@ |
|||
from collections.abc import AsyncIterable |
|||
from typing import Annotated |
|||
|
|||
from fastapi import FastAPI, Header |
|||
from fastapi.sse import EventSourceResponse, ServerSentEvent |
|||
from pydantic import BaseModel |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
class Item(BaseModel): |
|||
name: str |
|||
price: float |
|||
|
|||
|
|||
items = [ |
|||
Item(name="Plumbus", price=32.99), |
|||
Item(name="Portal Gun", price=999.99), |
|||
Item(name="Meeseeks Box", price=49.99), |
|||
] |
|||
|
|||
|
|||
@app.get("/items/stream", response_class=EventSourceResponse) |
|||
async def stream_items( |
|||
last_event_id: Annotated[int | None, Header()] = None, |
|||
) -> AsyncIterable[ServerSentEvent]: |
|||
start = last_event_id + 1 if last_event_id is not None else 0 |
|||
for i, item in enumerate(items): |
|||
if i < start: |
|||
continue |
|||
yield ServerSentEvent(data=item, id=str(i)) |
|||
@ -0,0 +1,19 @@ |
|||
from collections.abc import AsyncIterable |
|||
|
|||
from fastapi import FastAPI |
|||
from fastapi.sse import EventSourceResponse, ServerSentEvent |
|||
from pydantic import BaseModel |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
class Prompt(BaseModel): |
|||
text: str |
|||
|
|||
|
|||
@app.post("/chat/stream", response_class=EventSourceResponse) |
|||
async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]: |
|||
words = prompt.text.split() |
|||
for word in words: |
|||
yield ServerSentEvent(data=word, event="token") |
|||
yield ServerSentEvent(raw_data="[DONE]", event="done") |
|||
@ -0,0 +1,65 @@ |
|||
from collections.abc import AsyncIterable, Iterable |
|||
|
|||
from fastapi import FastAPI |
|||
from fastapi.responses import StreamingResponse |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
message = """ |
|||
Rick: (stumbles in drunkenly, and turns on the lights) Morty! You gotta come on. You got--... you gotta come with me. |
|||
Morty: (rubs his eyes) What, Rick? What's going on? |
|||
Rick: I got a surprise for you, Morty. |
|||
Morty: It's the middle of the night. What are you talking about? |
|||
Rick: (spills alcohol on Morty's bed) Come on, I got a surprise for you. (drags Morty by the ankle) Come on, hurry up. (pulls Morty out of his bed and into the hall) |
|||
Morty: Ow! Ow! You're tugging me too hard! |
|||
Rick: We gotta go, gotta get outta here, come on. Got a surprise for you Morty. |
|||
""" |
|||
|
|||
|
|||
@app.get("/story/stream", response_class=StreamingResponse) |
|||
async def stream_story() -> AsyncIterable[str]: |
|||
for line in message.splitlines(): |
|||
yield line |
|||
|
|||
|
|||
@app.get("/story/stream-no-async", response_class=StreamingResponse) |
|||
def stream_story_no_async() -> Iterable[str]: |
|||
for line in message.splitlines(): |
|||
yield line |
|||
|
|||
|
|||
@app.get("/story/stream-no-annotation", response_class=StreamingResponse) |
|||
async def stream_story_no_annotation(): |
|||
for line in message.splitlines(): |
|||
yield line |
|||
|
|||
|
|||
@app.get("/story/stream-no-async-no-annotation", response_class=StreamingResponse) |
|||
def stream_story_no_async_no_annotation(): |
|||
for line in message.splitlines(): |
|||
yield line |
|||
|
|||
|
|||
@app.get("/story/stream-bytes", response_class=StreamingResponse) |
|||
async def stream_story_bytes() -> AsyncIterable[bytes]: |
|||
for line in message.splitlines(): |
|||
yield line.encode("utf-8") |
|||
|
|||
|
|||
@app.get("/story/stream-no-async-bytes", response_class=StreamingResponse) |
|||
def stream_story_no_async_bytes() -> Iterable[bytes]: |
|||
for line in message.splitlines(): |
|||
yield line.encode("utf-8") |
|||
|
|||
|
|||
@app.get("/story/stream-no-annotation-bytes", response_class=StreamingResponse) |
|||
async def stream_story_no_annotation_bytes(): |
|||
for line in message.splitlines(): |
|||
yield line.encode("utf-8") |
|||
|
|||
|
|||
@app.get("/story/stream-no-async-no-annotation-bytes", response_class=StreamingResponse) |
|||
def stream_story_no_async_no_annotation_bytes(): |
|||
for line in message.splitlines(): |
|||
yield line.encode("utf-8") |
|||
@ -0,0 +1,54 @@ |
|||
import base64 |
|||
from collections.abc import AsyncIterable, Iterable |
|||
from io import BytesIO |
|||
|
|||
from fastapi import FastAPI |
|||
from fastapi.responses import StreamingResponse |
|||
|
|||
image_base64 = "iVBORw0KGgoAAAANSUhEUgAAAB0AAAAdCAYAAABWk2cPAAAAbnpUWHRSYXcgcHJvZmlsZSB0eXBlIGV4aWYAAHjadYzRDYAwCET/mcIRDoq0jGOiJm7g+NJK0vjhS4DjIEfHfZ20DKqSrrWZmyFQV5ctRMOLACxglNCcXk7zVqFzJzF8kV6R5vOJ97yVH78HjfYAtg0ged033ZgAAAoCaVRYdFhNTDpjb20uYWRvYmUueG1wAAAAAAA8P3hwYWNrZXQgYmVnaW49Iu+7vyIgaWQ9Ilc1TTBNcENlaGlIenJlU3pOVGN6a2M5ZCI/Pgo8eDp4bXBtZXRhIHhtbG5zOng9ImFkb2JlOm5zOm1ldGEvIiB4OnhtcHRrPSJYTVAgQ29yZSA0LjQuMC1FeGl2MiI+CiA8cmRmOlJERiB4bWxuczpyZGY9Imh0dHA6Ly93d3cudzMub3JnLzE5OTkvMDIvMjItcmRmLXN5bnRheC1ucyMiPgogIDxyZGY6RGVzY3JpcHRpb24gcmRmOmFib3V0PSIiCiAgICB4bWxuczpleGlmPSJodHRwOi8vbnMuYWRvYmUuY29tL2V4aWYvMS4wLyIKICAgIHhtbG5zOnRpZmY9Imh0dHA6Ly9ucy5hZG9iZS5jb20vdGlmZi8xLjAvIgogICBleGlmOlBpeGVsWERpbWVuc2lvbj0iMjkiCiAgIGV4aWY6UGl4ZWxZRGltZW5zaW9uPSIyOSIKICAgdGlmZjpJbWFnZVdpZHRoPSIyOSIKICAgdGlmZjpJbWFnZUxlbmd0aD0iMjkiCiAgIHRpZmY6T3JpZW50YXRpb249IjEiLz4KIDwvcmRmOlJERj4KPC94OnhtcG1ldGE+CiAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAKICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgIAogICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgCiAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAKICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgIAogICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgCiAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAKICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgIAogICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgCiAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAKICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgIAogICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgCiAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAKICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgIAogICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgCiAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAKICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgIAogICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgCiAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAKICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgICAgIAogICAgICAgICAgICAgICAgICAgICAgICAgICAKPD94cGFja2V0IGVuZD0idyI/PnQkBZAAAAAEc0JJVAgICAh8CGSIAAABoklEQVRIx8VXwY7FIAjE5iXWU+P/f6RHPNW9LIaOoHYP+0yMShVkwNGG1lqjfy4HfaF0oyEEt+oSQqBaa//m9Wd6PlqhhbRMDiEQM3e59FNKw5qZHpnQfuPaW6lazsztvu/eElFj5j63lNLlMz2ttbZtVMu1MTGo5Sujn93gMzOllKiUQjHGB9QxxneZhJ5iwZ1rL2fwenoGeL0q3wVGhBPHMz0PeFccIfASEeWcO8xEROd50q6eAV6s1s5XXoncas1EKqVQznnwUBdJJmm1l3hmmdlOMrGO8Vl5gZ56Y0y8IZF0BuqkQWM4B6HXrRCKa1SEqyzEo7KK59RT/VHDjX3ZvSefeW3CO6O6vsiA1NrwVkxxAcYTCcHyTjZmJd00pugBQoTnzjvn+kzLBh9GtRDjhleZFwbx3kugP3GvFzdkqRlbDYw0u/HxKjuOw2QxZCGL5V5f4l7cd6qsffUa1DcLM9N1XcTMvep5ul1e4jNPtZfWGIkE6dI8MquXg/dS2CGVJQ2ushd5GmlxFdOw+1tRa32MY4zDQ9yaZ60J3/iX+QG4U3qGrFHmswAAAABJRU5ErkJggg==" |
|||
binary_image = base64.b64decode(image_base64) |
|||
|
|||
|
|||
def read_image() -> BytesIO: |
|||
return BytesIO(binary_image) |
|||
|
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
class PNGStreamingResponse(StreamingResponse): |
|||
media_type = "image/png" |
|||
|
|||
|
|||
@app.get("/image/stream", response_class=PNGStreamingResponse) |
|||
async def stream_image() -> AsyncIterable[bytes]: |
|||
with read_image() as image_file: |
|||
for chunk in image_file: |
|||
yield chunk |
|||
|
|||
|
|||
@app.get("/image/stream-no-async", response_class=PNGStreamingResponse) |
|||
def stream_image_no_async() -> Iterable[bytes]: |
|||
with read_image() as image_file: |
|||
for chunk in image_file: |
|||
yield chunk |
|||
|
|||
|
|||
@app.get("/image/stream-no-async-yield-from", response_class=PNGStreamingResponse) |
|||
def stream_image_no_async_yield_from() -> Iterable[bytes]: |
|||
with read_image() as image_file: |
|||
yield from image_file |
|||
|
|||
|
|||
@app.get("/image/stream-no-annotation", response_class=PNGStreamingResponse) |
|||
async def stream_image_no_annotation(): |
|||
with read_image() as image_file: |
|||
for chunk in image_file: |
|||
yield chunk |
|||
|
|||
|
|||
@app.get("/image/stream-no-async-no-annotation", response_class=PNGStreamingResponse) |
|||
def stream_image_no_async_no_annotation(): |
|||
with read_image() as image_file: |
|||
for chunk in image_file: |
|||
yield chunk |
|||
@ -0,0 +1,42 @@ |
|||
from collections.abc import AsyncIterable, Iterable |
|||
|
|||
from fastapi import FastAPI |
|||
from pydantic import BaseModel |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
class Item(BaseModel): |
|||
name: str |
|||
description: str | None |
|||
|
|||
|
|||
items = [ |
|||
Item(name="Plumbus", description="A multi-purpose household device."), |
|||
Item(name="Portal Gun", description="A portal opening device."), |
|||
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."), |
|||
] |
|||
|
|||
|
|||
@app.get("/items/stream") |
|||
async def stream_items() -> AsyncIterable[Item]: |
|||
for item in items: |
|||
yield item |
|||
|
|||
|
|||
@app.get("/items/stream-no-async") |
|||
def stream_items_no_async() -> Iterable[Item]: |
|||
for item in items: |
|||
yield item |
|||
|
|||
|
|||
@app.get("/items/stream-no-annotation") |
|||
async def stream_items_no_annotation(): |
|||
for item in items: |
|||
yield item |
|||
|
|||
|
|||
@app.get("/items/stream-no-async-no-annotation") |
|||
def stream_items_no_async_no_annotation(): |
|||
for item in items: |
|||
yield item |
|||
@ -0,0 +1,14 @@ |
|||
from fastapi import FastAPI |
|||
from pydantic import BaseModel |
|||
|
|||
app = FastAPI(strict_content_type=False) |
|||
|
|||
|
|||
class Item(BaseModel): |
|||
name: str |
|||
price: float |
|||
|
|||
|
|||
@app.post("/items/") |
|||
async def create_item(item: Item): |
|||
return item |
|||
@ -0,0 +1,436 @@ |
|||
--- |
|||
name: fastapi |
|||
description: FastAPI best practices and conventions. Use when working with FastAPI APIs and Pydantic models for them. Keeps FastAPI code clean and up to date with the latest features and patterns, updated with new versions. Write new code or refactor and update old code. |
|||
--- |
|||
|
|||
# FastAPI |
|||
|
|||
Official FastAPI skill to write code with best practices, keeping up to date with new versions and features. |
|||
|
|||
## Use the `fastapi` CLI |
|||
|
|||
Run the development server on localhost with reload: |
|||
|
|||
```bash |
|||
fastapi dev |
|||
``` |
|||
|
|||
|
|||
Run the production server: |
|||
|
|||
```bash |
|||
fastapi run |
|||
``` |
|||
|
|||
### Add an entrypoint in `pyproject.toml` |
|||
|
|||
FastAPI CLI will read the entrypoint in `pyproject.toml` to know where the FastAPI app is declared. |
|||
|
|||
```toml |
|||
[tool.fastapi] |
|||
entrypoint = "my_app.main:app" |
|||
``` |
|||
|
|||
### Use `fastapi` with a path |
|||
|
|||
When adding the entrypoint to `pyproject.toml` is not possible, or the user explicitly asks not to, or it's running an independent small app, you can pass the app file path to the `fastapi` command: |
|||
|
|||
```bash |
|||
fastapi dev my_app/main.py |
|||
``` |
|||
|
|||
Prefer to set the entrypoint in `pyproject.toml` when possible. |
|||
|
|||
## Use `Annotated` |
|||
|
|||
Always prefer the `Annotated` style for parameter and dependency declarations. |
|||
|
|||
It keeps the function signatures working in other contexts, respects the types, allows reusability. |
|||
|
|||
### In Parameter Declarations |
|||
|
|||
Use `Annotated` for parameter declarations, including `Path`, `Query`, `Header`, etc.: |
|||
|
|||
```python |
|||
from typing import Annotated |
|||
|
|||
from fastapi import FastAPI, Path, Query |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
@app.get("/items/{item_id}") |
|||
async def read_item( |
|||
item_id: Annotated[int, Path(ge=1, description="The item ID")], |
|||
q: Annotated[str | None, Query(max_length=50)] = None, |
|||
): |
|||
return {"message": "Hello World"} |
|||
``` |
|||
|
|||
instead of: |
|||
|
|||
```python |
|||
# DO NOT DO THIS |
|||
@app.get("/items/{item_id}") |
|||
async def read_item( |
|||
item_id: int = Path(ge=1, description="The item ID"), |
|||
q: str | None = Query(default=None, max_length=50), |
|||
): |
|||
return {"message": "Hello World"} |
|||
``` |
|||
|
|||
### For Dependencies |
|||
|
|||
Use `Annotated` for dependencies with `Depends()`. |
|||
|
|||
Unless asked not to, create a new type alias for the dependency to allow re-using it. |
|||
|
|||
```python |
|||
from typing import Annotated |
|||
|
|||
from fastapi import Depends, FastAPI |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
def get_current_user(): |
|||
return {"username": "johndoe"} |
|||
|
|||
|
|||
CurrentUserDep = Annotated[dict, Depends(get_current_user)] |
|||
|
|||
|
|||
@app.get("/items/") |
|||
async def read_item(current_user: CurrentUserDep): |
|||
return {"message": "Hello World"} |
|||
``` |
|||
|
|||
instead of: |
|||
|
|||
```python |
|||
# DO NOT DO THIS |
|||
@app.get("/items/") |
|||
async def read_item(current_user: dict = Depends(get_current_user)): |
|||
return {"message": "Hello World"} |
|||
``` |
|||
|
|||
## Do not use Ellipsis for *path operations* or Pydantic models |
|||
|
|||
Do not use `...` as a default value for required parameters, it's not needed and not recommended. |
|||
|
|||
Do this, without Ellipsis (`...`): |
|||
|
|||
```python |
|||
from typing import Annotated |
|||
|
|||
from fastapi import FastAPI, Query |
|||
from pydantic import BaseModel, Field |
|||
|
|||
|
|||
class Item(BaseModel): |
|||
name: str |
|||
description: str | None = None |
|||
price: float = Field(gt=0) |
|||
|
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
@app.post("/items/") |
|||
async def create_item(item: Item, project_id: Annotated[int, Query()]): ... |
|||
``` |
|||
|
|||
instead of this: |
|||
|
|||
```python |
|||
# DO NOT DO THIS |
|||
class Item(BaseModel): |
|||
name: str = ... |
|||
description: str | None = None |
|||
price: float = Field(..., gt=0) |
|||
|
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
@app.post("/items/") |
|||
async def create_item(item: Item, project_id: Annotated[int, Query(...)]): ... |
|||
``` |
|||
|
|||
## Return Type or Response Model |
|||
|
|||
When possible, include a return type. It will be used to validate, filter, document, and serialize the response. |
|||
|
|||
```python |
|||
from fastapi import FastAPI |
|||
from pydantic import BaseModel |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
class Item(BaseModel): |
|||
name: str |
|||
description: str | None = None |
|||
|
|||
|
|||
@app.get("/items/me") |
|||
async def get_item() -> Item: |
|||
return Item(name="Plumbus", description="All-purpose home device") |
|||
``` |
|||
|
|||
**Important**: Return types or response models are what filter data ensuring no sensitive information is exposed. And they are used to serialize data with Pydantic (in Rust), this is the main idea that can increase response performance. |
|||
|
|||
The return type doesn't have to be a Pydantic model, it could be a different type, like a list of integers, or a dict, etc. |
|||
|
|||
### When to use `response_model` instead |
|||
|
|||
If the return type is not the same as the type that you want to use to validate, filter, or serialize, use the `response_model` parameter on the decorator instead. |
|||
|
|||
```python |
|||
from typing import Any |
|||
|
|||
from fastapi import FastAPI |
|||
from pydantic import BaseModel |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
class Item(BaseModel): |
|||
name: str |
|||
description: str | None = None |
|||
|
|||
|
|||
@app.get("/items/me", response_model=Item) |
|||
async def get_item() -> Any: |
|||
return {"name": "Foo", "description": "A very nice Item"} |
|||
``` |
|||
|
|||
This can be particularly useful when filtering data to expose only the public fields and avoid exposing sensitive information. |
|||
|
|||
```python |
|||
from typing import Any |
|||
|
|||
from fastapi import FastAPI |
|||
from pydantic import BaseModel |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
class InternalItem(BaseModel): |
|||
name: str |
|||
description: str | None = None |
|||
secret_key: str |
|||
|
|||
|
|||
class Item(BaseModel): |
|||
name: str |
|||
description: str | None = None |
|||
|
|||
|
|||
@app.get("/items/me", response_model=Item) |
|||
async def get_item() -> Any: |
|||
item = InternalItem( |
|||
name="Foo", description="A very nice Item", secret_key="supersecret" |
|||
) |
|||
return item |
|||
``` |
|||
|
|||
## Performance |
|||
|
|||
Do not use `ORJSONResponse` or `UJSONResponse`, they are deprecated. |
|||
|
|||
Instead, declare a return type or response model. Pydantic will handle the data serialization on the Rust side. |
|||
|
|||
## Including Routers |
|||
|
|||
When declaring routers, prefer to add router level parameters like prefix, tags, etc. to the router itself, instead of in `include_router()`. |
|||
|
|||
Do this: |
|||
|
|||
```python |
|||
from fastapi import APIRouter, FastAPI |
|||
|
|||
app = FastAPI() |
|||
|
|||
router = APIRouter(prefix="/items", tags=["items"]) |
|||
|
|||
|
|||
@router.get("/") |
|||
async def list_items(): |
|||
return [] |
|||
|
|||
|
|||
# In main.py |
|||
app.include_router(router) |
|||
``` |
|||
|
|||
instead of this: |
|||
|
|||
```python |
|||
# DO NOT DO THIS |
|||
from fastapi import APIRouter, FastAPI |
|||
|
|||
app = FastAPI() |
|||
|
|||
router = APIRouter() |
|||
|
|||
|
|||
@router.get("/") |
|||
async def list_items(): |
|||
return [] |
|||
|
|||
|
|||
# In main.py |
|||
app.include_router(router, prefix="/items", tags=["items"]) |
|||
``` |
|||
|
|||
There could be exceptions, but try to follow this convention. |
|||
|
|||
Apply shared dependencies at the router level via `dependencies=[Depends(...)]`. |
|||
|
|||
## Dependency Injection |
|||
|
|||
See [the dependency injection reference](references/dependencies.md) for detailed patterns including `yield` with `scope`, and class dependencies. |
|||
|
|||
Use dependencies when the logic can't be declared in Pydantic validation, depends on external resources, needs cleanup (with `yield`), or is shared across endpoints. |
|||
|
|||
Apply shared dependencies at the router level via `dependencies=[Depends(...)]`. |
|||
|
|||
## Async vs Sync *path operations* |
|||
|
|||
Use `async` *path operations* only when fully certain that the logic called inside is compatible with async and await (it's called with `await`) or that doesn't block. |
|||
|
|||
```python |
|||
from fastapi import FastAPI |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
# Use async def when calling async code |
|||
@app.get("/async-items/") |
|||
async def read_async_items(): |
|||
data = await some_async_library.fetch_items() |
|||
return data |
|||
|
|||
|
|||
# Use plain def when calling blocking/sync code or when in doubt |
|||
@app.get("/items/") |
|||
def read_items(): |
|||
data = some_blocking_library.fetch_items() |
|||
return data |
|||
``` |
|||
|
|||
In case of doubt, or by default, use regular `def` functions, those will be run in a threadpool so they don't block the event loop. |
|||
|
|||
The same rules apply to dependencies. |
|||
|
|||
Make sure blocking code is not run inside of `async` functions. The logic will work, but will damage the performance heavily. |
|||
|
|||
When needing to mix blocking and async code, see Asyncer in [the other tools reference](references/other-tools.md). |
|||
|
|||
## Streaming (JSON Lines, SSE, bytes) |
|||
|
|||
See [the streaming reference](references/streaming.md) for JSON Lines, Server-Sent Events (`EventSourceResponse`, `ServerSentEvent`), and byte streaming (`StreamingResponse`) patterns. |
|||
|
|||
## Tooling |
|||
|
|||
See [the other tools reference](references/other-tools.md) for details on uv, Ruff, ty for package management, linting, type checking, formatting, etc. |
|||
|
|||
## Other Libraries |
|||
|
|||
See [the other tools reference](references/other-tools.md) for details on other libraries: |
|||
|
|||
* Asyncer for handling async and await, concurrency, mixing async and blocking code, prefer it over AnyIO or asyncio. |
|||
* SQLModel for working with SQL databases, prefer it over SQLAlchemy. |
|||
* HTTPX for interacting with HTTP (other APIs), prefer it over Requests. |
|||
|
|||
## Do not use Pydantic RootModels |
|||
|
|||
Do not use Pydantic `RootModel`, instead use regular type annotations with `Annotated` and Pydantic validation utilities. |
|||
|
|||
For example, for a list with validations you could do: |
|||
|
|||
```python |
|||
from typing import Annotated |
|||
|
|||
from fastapi import Body, FastAPI |
|||
from pydantic import Field |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
@app.post("/items/") |
|||
async def create_items(items: Annotated[list[int], Field(min_length=1), Body()]): |
|||
return items |
|||
``` |
|||
|
|||
instead of: |
|||
|
|||
```python |
|||
# DO NOT DO THIS |
|||
from typing import Annotated |
|||
|
|||
from fastapi import FastAPI |
|||
from pydantic import Field, RootModel |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
class ItemList(RootModel[Annotated[list[int], Field(min_length=1)]]): |
|||
pass |
|||
|
|||
|
|||
@app.post("/items/") |
|||
async def create_items(items: ItemList): |
|||
return items |
|||
|
|||
``` |
|||
|
|||
FastAPI supports these type annotations and will create a Pydantic `TypeAdapter` for them, so that types can work as normally and there's no need for the custom logic and types in RootModels. |
|||
|
|||
## Use one HTTP operation per function |
|||
|
|||
Don't mix HTTP operations in a single function, having one function per HTTP operation helps separate concerns and organize the code. |
|||
|
|||
Do this: |
|||
|
|||
```python |
|||
from fastapi import FastAPI |
|||
from pydantic import BaseModel |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
class Item(BaseModel): |
|||
name: str |
|||
|
|||
|
|||
@app.get("/items/") |
|||
async def list_items(): |
|||
return [] |
|||
|
|||
|
|||
@app.post("/items/") |
|||
async def create_item(item: Item): |
|||
return item |
|||
``` |
|||
|
|||
instead of this: |
|||
|
|||
```python |
|||
# DO NOT DO THIS |
|||
from fastapi import FastAPI, Request |
|||
from pydantic import BaseModel |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
class Item(BaseModel): |
|||
name: str |
|||
|
|||
|
|||
@app.api_route("/items/", methods=["GET", "POST"]) |
|||
async def handle_items(request: Request): |
|||
if request.method == "GET": |
|||
return [] |
|||
``` |
|||
@ -0,0 +1,142 @@ |
|||
# Dependency Injection |
|||
|
|||
Use dependencies when: |
|||
|
|||
* They can't be declared in Pydantic validation and require additional logic |
|||
* The logic depends on external resources or could block in any other way |
|||
* Other dependencies need their results (it's a sub-dependency) |
|||
* The logic can be shared by multiple endpoints to do things like error early, authentication, etc. |
|||
* They need to handle cleanup (e.g., DB sessions, file handles), using dependencies with `yield` |
|||
* Their logic needs input data from the request, like headers, query parameters, etc. |
|||
|
|||
## Dependencies with `yield` and `scope` |
|||
|
|||
When using dependencies with `yield`, they can have a `scope` that defines when the exit code is run. |
|||
|
|||
Use the default scope `"request"` to run the exit code after the response is sent back. |
|||
|
|||
```python |
|||
from typing import Annotated |
|||
|
|||
from fastapi import Depends, FastAPI |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
def get_db(): |
|||
db = DBSession() |
|||
try: |
|||
yield db |
|||
finally: |
|||
db.close() |
|||
|
|||
|
|||
DBDep = Annotated[DBSession, Depends(get_db)] |
|||
|
|||
|
|||
@app.get("/items/") |
|||
async def read_items(db: DBDep): |
|||
return db.query(Item).all() |
|||
``` |
|||
|
|||
Use the scope `"function"` when they should run the exit code after the response data is generated but before the response is sent back to the client. |
|||
|
|||
```python |
|||
from typing import Annotated |
|||
|
|||
from fastapi import Depends, FastAPI |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
def get_username(): |
|||
try: |
|||
yield "Rick" |
|||
finally: |
|||
print("Cleanup up before response is sent") |
|||
|
|||
UserNameDep = Annotated[str, Depends(get_username, scope="function")] |
|||
|
|||
@app.get("/users/me") |
|||
def get_user_me(username: UserNameDep): |
|||
return username |
|||
``` |
|||
|
|||
## Class Dependencies |
|||
|
|||
Avoid creating class dependencies when possible. |
|||
|
|||
If a class is needed, instead create a regular function dependency that returns a class instance. |
|||
|
|||
Do this: |
|||
|
|||
```python |
|||
from dataclasses import dataclass |
|||
from typing import Annotated |
|||
|
|||
from fastapi import Depends, FastAPI |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
@dataclass |
|||
class DatabasePaginator: |
|||
offset: int = 0 |
|||
limit: int = 100 |
|||
q: str | None = None |
|||
|
|||
def get_page(self) -> dict: |
|||
# Simulate a page of data |
|||
return { |
|||
"offset": self.offset, |
|||
"limit": self.limit, |
|||
"q": self.q, |
|||
"items": [], |
|||
} |
|||
|
|||
|
|||
def get_db_paginator( |
|||
offset: int = 0, limit: int = 100, q: str | None = None |
|||
) -> DatabasePaginator: |
|||
return DatabasePaginator(offset=offset, limit=limit, q=q) |
|||
|
|||
|
|||
PaginatorDep = Annotated[DatabasePaginator, Depends(get_db_paginator)] |
|||
|
|||
|
|||
@app.get("/items/") |
|||
async def read_items(paginator: PaginatorDep): |
|||
return paginator.get_page() |
|||
``` |
|||
|
|||
instead of this: |
|||
|
|||
```python |
|||
# DO NOT DO THIS |
|||
from typing import Annotated |
|||
|
|||
from fastapi import Depends, FastAPI |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
class DatabasePaginator: |
|||
def __init__(self, offset: int = 0, limit: int = 100, q: str | None = None): |
|||
self.offset = offset |
|||
self.limit = limit |
|||
self.q = q |
|||
|
|||
def get_page(self) -> dict: |
|||
# Simulate a page of data |
|||
return { |
|||
"offset": self.offset, |
|||
"limit": self.limit, |
|||
"q": self.q, |
|||
"items": [], |
|||
} |
|||
|
|||
|
|||
@app.get("/items/") |
|||
async def read_items(paginator: Annotated[DatabasePaginator, Depends()]): |
|||
return paginator.get_page() |
|||
``` |
|||
@ -0,0 +1,76 @@ |
|||
# Other Tools |
|||
|
|||
## uv |
|||
|
|||
If uv is available, use it to manage dependencies. |
|||
|
|||
## Ruff |
|||
|
|||
If Ruff is available, use it to lint and format the code. Consider enabling the FastAPI rules. |
|||
|
|||
## ty |
|||
|
|||
If ty is available, use it to check types. |
|||
|
|||
## Asyncer |
|||
|
|||
When needing to run blocking code inside of async functions, or async code inside of blocking functions, suggest using Asyncer. |
|||
|
|||
Prefer it over AnyIO or asyncio. |
|||
|
|||
Install: |
|||
|
|||
```bash |
|||
uv add asyncer |
|||
``` |
|||
|
|||
Run blocking sync code inside of async with `asyncify()`: |
|||
|
|||
```python |
|||
from asyncer import asyncify |
|||
from fastapi import FastAPI |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
def do_blocking_work(name: str) -> str: |
|||
# Some blocking I/O operation |
|||
return f"Hello {name}" |
|||
|
|||
|
|||
@app.get("/items/") |
|||
async def read_items(): |
|||
result = await asyncify(do_blocking_work)(name="World") |
|||
return {"message": result} |
|||
``` |
|||
|
|||
And run async code inside of blocking sync code with `syncify()`: |
|||
|
|||
```python |
|||
from asyncer import syncify |
|||
from fastapi import FastAPI |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
async def do_async_work(name: str) -> str: |
|||
return f"Hello {name}" |
|||
|
|||
|
|||
@app.get("/items/") |
|||
def read_items(): |
|||
result = syncify(do_async_work)(name="World") |
|||
return {"message": result} |
|||
``` |
|||
|
|||
## SQLModel for SQL databases |
|||
|
|||
When working with SQL databases, prefer using SQLModel as it is integrated with Pydantic and will allow declaring data validation with the same models. |
|||
|
|||
Prefer it over SQLAlchemy. |
|||
|
|||
## HTTPX |
|||
|
|||
Use HTTPX for handling HTTP communication (e.g. with other APIs). It support sync and async usage. |
|||
|
|||
Prefer it over Requests. |
|||
@ -0,0 +1,105 @@ |
|||
# Streaming |
|||
|
|||
## Stream JSON Lines |
|||
|
|||
To stream JSON Lines, declare the return type and use `yield` to return the data. |
|||
|
|||
```python |
|||
@app.get("/items/stream") |
|||
async def stream_items() -> AsyncIterable[Item]: |
|||
for item in items: |
|||
yield item |
|||
``` |
|||
|
|||
## Server-Sent Events (SSE) |
|||
|
|||
To stream Server-Sent Events, use `response_class=EventSourceResponse` and `yield` items from the endpoint. |
|||
|
|||
Plain objects are automatically JSON-serialized as `data:` fields, declare the return type so the serialization is done by Pydantic: |
|||
|
|||
```python |
|||
from collections.abc import AsyncIterable |
|||
|
|||
from fastapi import FastAPI |
|||
from fastapi.sse import EventSourceResponse |
|||
from pydantic import BaseModel |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
class Item(BaseModel): |
|||
name: str |
|||
price: float |
|||
|
|||
|
|||
@app.get("/items/stream", response_class=EventSourceResponse) |
|||
async def stream_items() -> AsyncIterable[Item]: |
|||
yield Item(name="Plumbus", price=32.99) |
|||
yield Item(name="Portal Gun", price=999.99) |
|||
``` |
|||
|
|||
For full control over SSE fields (`event`, `id`, `retry`, `comment`), yield `ServerSentEvent` instances: |
|||
|
|||
```python |
|||
from collections.abc import AsyncIterable |
|||
|
|||
from fastapi import FastAPI |
|||
from fastapi.sse import EventSourceResponse, ServerSentEvent |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
@app.get("/events", response_class=EventSourceResponse) |
|||
async def stream_events() -> AsyncIterable[ServerSentEvent]: |
|||
yield ServerSentEvent(data={"status": "started"}, event="status", id="1") |
|||
yield ServerSentEvent(data={"progress": 50}, event="progress", id="2") |
|||
``` |
|||
|
|||
Use `raw_data` instead of `data` to send pre-formatted strings without JSON encoding: |
|||
|
|||
```python |
|||
yield ServerSentEvent(raw_data="plain text line", event="log") |
|||
``` |
|||
|
|||
## Stream bytes |
|||
|
|||
To stream bytes, declare a `response_class=` of `StreamingResponse` or a sub-class, and use `yield` to return the data. |
|||
|
|||
```python |
|||
from fastapi import FastAPI |
|||
from fastapi.responses import StreamingResponse |
|||
from app.utils import read_image |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
class PNGStreamingResponse(StreamingResponse): |
|||
media_type = "image/png" |
|||
|
|||
@app.get("/image", response_class=PNGStreamingResponse) |
|||
def stream_image_no_async_no_annotation(): |
|||
with read_image() as image_file: |
|||
yield from image_file |
|||
``` |
|||
|
|||
prefer this over returning a `StreamingResponse` directly: |
|||
|
|||
```python |
|||
# DO NOT DO THIS |
|||
|
|||
import anyio |
|||
from fastapi import FastAPI |
|||
from fastapi.responses import StreamingResponse |
|||
from app.utils import read_image |
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
class PNGStreamingResponse(StreamingResponse): |
|||
media_type = "image/png" |
|||
|
|||
|
|||
@app.get("/") |
|||
async def main(): |
|||
return PNGStreamingResponse(read_image()) |
|||
``` |
|||
@ -0,0 +1,222 @@ |
|||
from typing import Annotated, Any |
|||
|
|||
from annotated_doc import Doc |
|||
from pydantic import AfterValidator, BaseModel, Field, model_validator |
|||
from starlette.responses import StreamingResponse |
|||
|
|||
# Canonical SSE event schema matching the OpenAPI 3.2 spec |
|||
# (Section 4.14.4 "Special Considerations for Server-Sent Events") |
|||
_SSE_EVENT_SCHEMA: dict[str, Any] = { |
|||
"type": "object", |
|||
"properties": { |
|||
"data": {"type": "string"}, |
|||
"event": {"type": "string"}, |
|||
"id": {"type": "string"}, |
|||
"retry": {"type": "integer", "minimum": 0}, |
|||
}, |
|||
} |
|||
|
|||
|
|||
class EventSourceResponse(StreamingResponse): |
|||
"""Streaming response with `text/event-stream` media type. |
|||
|
|||
Use as `response_class=EventSourceResponse` on a *path operation* that uses `yield` |
|||
to enable Server Sent Events (SSE) responses. |
|||
|
|||
Works with **any HTTP method** (`GET`, `POST`, etc.), which makes it compatible |
|||
with protocols like MCP that stream SSE over `POST`. |
|||
|
|||
The actual encoding logic lives in the FastAPI routing layer. This class |
|||
serves mainly as a marker and sets the correct `Content-Type`. |
|||
""" |
|||
|
|||
media_type = "text/event-stream" |
|||
|
|||
|
|||
def _check_id_no_null(v: str | None) -> str | None: |
|||
if v is not None and "\0" in v: |
|||
raise ValueError("SSE 'id' must not contain null characters") |
|||
return v |
|||
|
|||
|
|||
class ServerSentEvent(BaseModel): |
|||
"""Represents a single Server-Sent Event. |
|||
|
|||
When `yield`ed from a *path operation function* that uses |
|||
`response_class=EventSourceResponse`, each `ServerSentEvent` is encoded |
|||
into the [SSE wire format](https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream) |
|||
(`text/event-stream`). |
|||
|
|||
If you yield a plain object (dict, Pydantic model, etc.) instead, it is |
|||
automatically JSON-encoded and sent as the `data:` field. |
|||
|
|||
All `data` values **including plain strings** are JSON-serialized. |
|||
|
|||
For example, `data="hello"` produces `data: "hello"` on the wire (with |
|||
quotes). |
|||
""" |
|||
|
|||
data: Annotated[ |
|||
Any, |
|||
Doc( |
|||
""" |
|||
The event payload. |
|||
|
|||
Can be any JSON-serializable value: a Pydantic model, dict, list, |
|||
string, number, etc. It is **always** serialized to JSON: strings |
|||
are quoted (`"hello"` becomes `data: "hello"` on the wire). |
|||
|
|||
Mutually exclusive with `raw_data`. |
|||
""" |
|||
), |
|||
] = None |
|||
raw_data: Annotated[ |
|||
str | None, |
|||
Doc( |
|||
""" |
|||
Raw string to send as the `data:` field **without** JSON encoding. |
|||
|
|||
Use this when you need to send pre-formatted text, HTML fragments, |
|||
CSV lines, or any non-JSON payload. The string is placed directly |
|||
into the `data:` field as-is. |
|||
|
|||
Mutually exclusive with `data`. |
|||
""" |
|||
), |
|||
] = None |
|||
event: Annotated[ |
|||
str | None, |
|||
Doc( |
|||
""" |
|||
Optional event type name. |
|||
|
|||
Maps to `addEventListener(event, ...)` on the browser. When omitted, |
|||
the browser dispatches on the generic `message` event. |
|||
""" |
|||
), |
|||
] = None |
|||
id: Annotated[ |
|||
str | None, |
|||
AfterValidator(_check_id_no_null), |
|||
Doc( |
|||
""" |
|||
Optional event ID. |
|||
|
|||
The browser sends this value back as the `Last-Event-ID` header on |
|||
automatic reconnection. **Must not contain null (`\\0`) characters.** |
|||
""" |
|||
), |
|||
] = None |
|||
retry: Annotated[ |
|||
int | None, |
|||
Field(ge=0), |
|||
Doc( |
|||
""" |
|||
Optional reconnection time in **milliseconds**. |
|||
|
|||
Tells the browser how long to wait before reconnecting after the |
|||
connection is lost. Must be a non-negative integer. |
|||
""" |
|||
), |
|||
] = None |
|||
comment: Annotated[ |
|||
str | None, |
|||
Doc( |
|||
""" |
|||
Optional comment line(s). |
|||
|
|||
Comment lines start with `:` in the SSE wire format and are ignored by |
|||
`EventSource` clients. Useful for keep-alive pings to prevent |
|||
proxy/load-balancer timeouts. |
|||
""" |
|||
), |
|||
] = None |
|||
|
|||
@model_validator(mode="after") |
|||
def _check_data_exclusive(self) -> "ServerSentEvent": |
|||
if self.data is not None and self.raw_data is not None: |
|||
raise ValueError( |
|||
"Cannot set both 'data' and 'raw_data' on the same " |
|||
"ServerSentEvent. Use 'data' for JSON-serialized payloads " |
|||
"or 'raw_data' for pre-formatted strings." |
|||
) |
|||
return self |
|||
|
|||
|
|||
def format_sse_event( |
|||
*, |
|||
data_str: Annotated[ |
|||
str | None, |
|||
Doc( |
|||
""" |
|||
Pre-serialized data string to use as the `data:` field. |
|||
""" |
|||
), |
|||
] = None, |
|||
event: Annotated[ |
|||
str | None, |
|||
Doc( |
|||
""" |
|||
Optional event type name (`event:` field). |
|||
""" |
|||
), |
|||
] = None, |
|||
id: Annotated[ |
|||
str | None, |
|||
Doc( |
|||
""" |
|||
Optional event ID (`id:` field). |
|||
""" |
|||
), |
|||
] = None, |
|||
retry: Annotated[ |
|||
int | None, |
|||
Doc( |
|||
""" |
|||
Optional reconnection time in milliseconds (`retry:` field). |
|||
""" |
|||
), |
|||
] = None, |
|||
comment: Annotated[ |
|||
str | None, |
|||
Doc( |
|||
""" |
|||
Optional comment line(s) (`:` prefix). |
|||
""" |
|||
), |
|||
] = None, |
|||
) -> bytes: |
|||
"""Build SSE wire-format bytes from **pre-serialized** data. |
|||
|
|||
The result always ends with `\n\n` (the event terminator). |
|||
""" |
|||
lines: list[str] = [] |
|||
|
|||
if comment is not None: |
|||
for line in comment.splitlines(): |
|||
lines.append(f": {line}") |
|||
|
|||
if event is not None: |
|||
lines.append(f"event: {event}") |
|||
|
|||
if data_str is not None: |
|||
for line in data_str.splitlines(): |
|||
lines.append(f"data: {line}") |
|||
|
|||
if id is not None: |
|||
lines.append(f"id: {id}") |
|||
|
|||
if retry is not None: |
|||
lines.append(f"retry: {retry}") |
|||
|
|||
lines.append("") |
|||
lines.append("") |
|||
return "\n".join(lines).encode("utf-8") |
|||
|
|||
|
|||
# Keep-alive comment, per the SSE spec recommendation |
|||
KEEPALIVE_COMMENT = b": ping\n\n" |
|||
|
|||
# Seconds between keep-alive pings when a generator is idle. |
|||
# Private but importable so tests can monkeypatch it. |
|||
_PING_INTERVAL: float = 15.0 |
|||
@ -1,40 +0,0 @@ |
|||
import os |
|||
from typing import Any |
|||
|
|||
from pdm.backend.hooks import Context |
|||
|
|||
TIANGOLO_BUILD_PACKAGE = os.getenv("TIANGOLO_BUILD_PACKAGE") |
|||
|
|||
|
|||
def pdm_build_initialize(context: Context) -> None: |
|||
metadata = context.config.metadata |
|||
# Get main version |
|||
version = metadata["version"] |
|||
# Get custom config for the current package, from the env var |
|||
all_configs_config: dict[str, Any] = context.config.data["tool"]["tiangolo"][ |
|||
"_internal-slim-build" |
|||
]["packages"] |
|||
|
|||
if TIANGOLO_BUILD_PACKAGE not in all_configs_config: |
|||
return |
|||
|
|||
config = all_configs_config[TIANGOLO_BUILD_PACKAGE] |
|||
project_config: dict[str, Any] = config["project"] |
|||
# Override main [project] configs with custom configs for this package |
|||
for key, value in project_config.items(): |
|||
metadata[key] = value |
|||
# Get custom build config for the current package |
|||
build_config: dict[str, Any] = ( |
|||
config.get("tool", {}).get("pdm", {}).get("build", {}) |
|||
) |
|||
# Override PDM build config with custom build config for this package |
|||
for key, value in build_config.items(): |
|||
context.config.build_config[key] = value |
|||
# Get main dependencies |
|||
dependencies: list[str] = metadata.get("dependencies", []) |
|||
# Sync versions in dependencies |
|||
new_dependencies = [] |
|||
for dep in dependencies: |
|||
new_dep = f"{dep}>={version}" |
|||
new_dependencies.append(new_dep) |
|||
metadata["dependencies"] = new_dependencies |
|||
@ -0,0 +1,37 @@ |
|||
import subprocess |
|||
import time |
|||
|
|||
import httpx |
|||
from playwright.sync_api import Playwright, sync_playwright |
|||
|
|||
|
|||
# Run playwright codegen to generate the code below, copy paste the sections in run() |
|||
def run(playwright: Playwright) -> None: |
|||
browser = playwright.chromium.launch(headless=False) |
|||
# Update the viewport manually |
|||
context = browser.new_context(viewport={"width": 960, "height": 1080}) |
|||
page = context.new_page() |
|||
page.goto("http://localhost:8000/docs") |
|||
page.get_by_role("button", name="POST /data Post Data").click() |
|||
# Manually add the screenshot |
|||
page.screenshot(path="docs/en/docs/img/tutorial/json-base64-bytes/image01.png") |
|||
|
|||
# --------------------- |
|||
context.close() |
|||
browser.close() |
|||
|
|||
|
|||
process = subprocess.Popen( |
|||
["fastapi", "run", "docs_src/json_base64_bytes/tutorial001_py310.py"] |
|||
) |
|||
try: |
|||
for _ in range(3): |
|||
try: |
|||
response = httpx.get("http://localhost:8000/docs") |
|||
except httpx.ConnectError: |
|||
time.sleep(1) |
|||
break |
|||
with sync_playwright() as playwright: |
|||
run(playwright) |
|||
finally: |
|||
process.terminate() |
|||
@ -0,0 +1,6 @@ |
|||
#!/usr/bin/env bash |
|||
|
|||
set -e |
|||
set -x |
|||
|
|||
bash scripts/test.sh --cov --cov-context=test ${@} |
|||
@ -0,0 +1,73 @@ |
|||
import warnings |
|||
|
|||
import pytest |
|||
from fastapi import FastAPI |
|||
from fastapi.exceptions import FastAPIDeprecationWarning |
|||
from fastapi.responses import ORJSONResponse, UJSONResponse |
|||
from fastapi.testclient import TestClient |
|||
from pydantic import BaseModel |
|||
|
|||
|
|||
class Item(BaseModel): |
|||
name: str |
|||
price: float |
|||
|
|||
|
|||
# ORJSON |
|||
|
|||
|
|||
def _make_orjson_app() -> FastAPI: |
|||
with warnings.catch_warnings(): |
|||
warnings.simplefilter("ignore", FastAPIDeprecationWarning) |
|||
app = FastAPI(default_response_class=ORJSONResponse) |
|||
|
|||
@app.get("/items") |
|||
def get_items() -> Item: |
|||
return Item(name="widget", price=9.99) |
|||
|
|||
return app |
|||
|
|||
|
|||
def test_orjson_response_returns_correct_data(): |
|||
app = _make_orjson_app() |
|||
client = TestClient(app) |
|||
with warnings.catch_warnings(): |
|||
warnings.simplefilter("ignore", FastAPIDeprecationWarning) |
|||
response = client.get("/items") |
|||
assert response.status_code == 200 |
|||
assert response.json() == {"name": "widget", "price": 9.99} |
|||
|
|||
|
|||
def test_orjson_response_emits_deprecation_warning(): |
|||
with pytest.warns(FastAPIDeprecationWarning, match="ORJSONResponse is deprecated"): |
|||
ORJSONResponse(content={"hello": "world"}) |
|||
|
|||
|
|||
# UJSON |
|||
|
|||
|
|||
def _make_ujson_app() -> FastAPI: |
|||
with warnings.catch_warnings(): |
|||
warnings.simplefilter("ignore", FastAPIDeprecationWarning) |
|||
app = FastAPI(default_response_class=UJSONResponse) |
|||
|
|||
@app.get("/items") |
|||
def get_items() -> Item: |
|||
return Item(name="widget", price=9.99) |
|||
|
|||
return app |
|||
|
|||
|
|||
def test_ujson_response_returns_correct_data(): |
|||
app = _make_ujson_app() |
|||
client = TestClient(app) |
|||
with warnings.catch_warnings(): |
|||
warnings.simplefilter("ignore", FastAPIDeprecationWarning) |
|||
response = client.get("/items") |
|||
assert response.status_code == 200 |
|||
assert response.json() == {"name": "widget", "price": 9.99} |
|||
|
|||
|
|||
def test_ujson_response_emits_deprecation_warning(): |
|||
with pytest.warns(FastAPIDeprecationWarning, match="UJSONResponse is deprecated"): |
|||
UJSONResponse(content={"hello": "world"}) |
|||
@ -0,0 +1,51 @@ |
|||
from unittest.mock import patch |
|||
|
|||
from fastapi import FastAPI |
|||
from fastapi.responses import JSONResponse |
|||
from fastapi.testclient import TestClient |
|||
from pydantic import BaseModel |
|||
|
|||
|
|||
class Item(BaseModel): |
|||
name: str |
|||
price: float |
|||
|
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
@app.get("/default") |
|||
def get_default() -> Item: |
|||
return Item(name="widget", price=9.99) |
|||
|
|||
|
|||
@app.get("/explicit", response_class=JSONResponse) |
|||
def get_explicit() -> Item: |
|||
return Item(name="widget", price=9.99) |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
|
|||
def test_default_response_class_skips_json_dumps(): |
|||
"""When no response_class is set, the fast path serializes directly to |
|||
JSON bytes via Pydantic's dump_json and never calls json.dumps.""" |
|||
with patch( |
|||
"starlette.responses.json.dumps", wraps=__import__("json").dumps |
|||
) as mock_dumps: |
|||
response = client.get("/default") |
|||
assert response.status_code == 200 |
|||
assert response.json() == {"name": "widget", "price": 9.99} |
|||
mock_dumps.assert_not_called() |
|||
|
|||
|
|||
def test_explicit_response_class_uses_json_dumps(): |
|||
"""When response_class is explicitly set to JSONResponse, the normal path |
|||
is used and json.dumps is called via JSONResponse.render().""" |
|||
with patch( |
|||
"starlette.responses.json.dumps", wraps=__import__("json").dumps |
|||
) as mock_dumps: |
|||
response = client.get("/explicit") |
|||
assert response.status_code == 200 |
|||
assert response.json() == {"name": "widget", "price": 9.99} |
|||
mock_dumps.assert_called_once() |
|||
@ -0,0 +1,75 @@ |
|||
from fastapi import FastAPI |
|||
from fastapi.testclient import TestClient |
|||
|
|||
|
|||
def test_root_path_does_not_persist_across_requests(): |
|||
app = FastAPI() |
|||
|
|||
@app.get("/") |
|||
def read_root(): # pragma: no cover |
|||
return {"ok": True} |
|||
|
|||
# Attacker request with a spoofed root_path |
|||
attacker_client = TestClient(app, root_path="/evil-api") |
|||
response1 = attacker_client.get("/openapi.json") |
|||
data1 = response1.json() |
|||
assert any(s.get("url") == "/evil-api" for s in data1.get("servers", [])) |
|||
|
|||
# Subsequent legitimate request with no root_path |
|||
clean_client = TestClient(app) |
|||
response2 = clean_client.get("/openapi.json") |
|||
data2 = response2.json() |
|||
servers = [s.get("url") for s in data2.get("servers", [])] |
|||
assert "/evil-api" not in servers |
|||
|
|||
|
|||
def test_multiple_different_root_paths_do_not_accumulate(): |
|||
app = FastAPI() |
|||
|
|||
@app.get("/") |
|||
def read_root(): # pragma: no cover |
|||
return {"ok": True} |
|||
|
|||
for prefix in ["/path-a", "/path-b", "/path-c"]: |
|||
c = TestClient(app, root_path=prefix) |
|||
c.get("/openapi.json") |
|||
|
|||
# A clean request should not have any of them |
|||
clean_client = TestClient(app) |
|||
response = clean_client.get("/openapi.json") |
|||
data = response.json() |
|||
servers = [s.get("url") for s in data.get("servers", [])] |
|||
for prefix in ["/path-a", "/path-b", "/path-c"]: |
|||
assert prefix not in servers, ( |
|||
f"root_path '{prefix}' leaked into clean request: {servers}" |
|||
) |
|||
|
|||
|
|||
def test_legitimate_root_path_still_appears(): |
|||
app = FastAPI() |
|||
|
|||
@app.get("/") |
|||
def read_root(): # pragma: no cover |
|||
return {"ok": True} |
|||
|
|||
client = TestClient(app, root_path="/api/v1") |
|||
response = client.get("/openapi.json") |
|||
data = response.json() |
|||
servers = [s.get("url") for s in data.get("servers", [])] |
|||
assert "/api/v1" in servers |
|||
|
|||
|
|||
def test_configured_servers_not_mutated(): |
|||
configured_servers = [{"url": "https://prod.example.com"}] |
|||
app = FastAPI(servers=configured_servers) |
|||
|
|||
@app.get("/") |
|||
def read_root(): # pragma: no cover |
|||
return {"ok": True} |
|||
|
|||
# Request with a rogue root_path |
|||
attacker_client = TestClient(app, root_path="/evil") |
|||
attacker_client.get("/openapi.json") |
|||
|
|||
# The original servers list must be untouched |
|||
assert configured_servers == [{"url": "https://prod.example.com"}] |
|||
@ -0,0 +1,318 @@ |
|||
import asyncio |
|||
import time |
|||
from collections.abc import AsyncIterable, Iterable |
|||
|
|||
import fastapi.routing |
|||
import pytest |
|||
from fastapi import APIRouter, FastAPI |
|||
from fastapi.responses import EventSourceResponse |
|||
from fastapi.sse import ServerSentEvent |
|||
from fastapi.testclient import TestClient |
|||
from pydantic import BaseModel |
|||
|
|||
|
|||
class Item(BaseModel): |
|||
name: str |
|||
description: str | None = None |
|||
|
|||
|
|||
items = [ |
|||
Item(name="Plumbus", description="A multi-purpose household device."), |
|||
Item(name="Portal Gun", description="A portal opening device."), |
|||
Item(name="Meeseeks Box", description="A box that summons a Meeseeks."), |
|||
] |
|||
|
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
@app.get("/items/stream", response_class=EventSourceResponse) |
|||
async def sse_items() -> AsyncIterable[Item]: |
|||
for item in items: |
|||
yield item |
|||
|
|||
|
|||
@app.get("/items/stream-sync", response_class=EventSourceResponse) |
|||
def sse_items_sync() -> Iterable[Item]: |
|||
yield from items |
|||
|
|||
|
|||
@app.get("/items/stream-no-annotation", response_class=EventSourceResponse) |
|||
async def sse_items_no_annotation(): |
|||
for item in items: |
|||
yield item |
|||
|
|||
|
|||
@app.get("/items/stream-sync-no-annotation", response_class=EventSourceResponse) |
|||
def sse_items_sync_no_annotation(): |
|||
yield from items |
|||
|
|||
|
|||
@app.get("/items/stream-dict", response_class=EventSourceResponse) |
|||
async def sse_items_dict(): |
|||
for item in items: |
|||
yield {"name": item.name, "description": item.description} |
|||
|
|||
|
|||
@app.get("/items/stream-sse-event", response_class=EventSourceResponse) |
|||
async def sse_items_event(): |
|||
yield ServerSentEvent(data="hello", event="greeting", id="1") |
|||
yield ServerSentEvent(data={"key": "value"}, event="json-data", id="2") |
|||
yield ServerSentEvent(comment="just a comment") |
|||
yield ServerSentEvent(data="retry-test", retry=5000) |
|||
|
|||
|
|||
@app.get("/items/stream-mixed", response_class=EventSourceResponse) |
|||
async def sse_items_mixed() -> AsyncIterable[Item]: |
|||
yield items[0] |
|||
yield ServerSentEvent(data="custom-event", event="special") |
|||
yield items[1] |
|||
|
|||
|
|||
@app.get("/items/stream-string", response_class=EventSourceResponse) |
|||
async def sse_items_string(): |
|||
yield ServerSentEvent(data="plain text data") |
|||
|
|||
|
|||
@app.post("/items/stream-post", response_class=EventSourceResponse) |
|||
async def sse_items_post() -> AsyncIterable[Item]: |
|||
for item in items: |
|||
yield item |
|||
|
|||
|
|||
@app.get("/items/stream-raw", response_class=EventSourceResponse) |
|||
async def sse_items_raw(): |
|||
yield ServerSentEvent(raw_data="plain text without quotes") |
|||
yield ServerSentEvent(raw_data="<div>html fragment</div>", event="html") |
|||
yield ServerSentEvent(raw_data="cpu,87.3,1709145600", event="csv") |
|||
|
|||
|
|||
router = APIRouter() |
|||
|
|||
|
|||
@router.get("/events", response_class=EventSourceResponse) |
|||
async def stream_events(): |
|||
yield {"msg": "hello"} |
|||
yield {"msg": "world"} |
|||
|
|||
|
|||
app.include_router(router, prefix="/api") |
|||
|
|||
|
|||
@pytest.fixture(name="client") |
|||
def client_fixture(): |
|||
with TestClient(app) as c: |
|||
yield c |
|||
|
|||
|
|||
def test_async_generator_with_model(client: TestClient): |
|||
response = client.get("/items/stream") |
|||
assert response.status_code == 200 |
|||
assert response.headers["content-type"] == "text/event-stream; charset=utf-8" |
|||
assert response.headers["cache-control"] == "no-cache" |
|||
assert response.headers["x-accel-buffering"] == "no" |
|||
|
|||
lines = response.text.strip().split("\n") |
|||
data_lines = [line for line in lines if line.startswith("data: ")] |
|||
assert len(data_lines) == 3 |
|||
assert '"name":"Plumbus"' in data_lines[0] or '"name": "Plumbus"' in data_lines[0] |
|||
assert ( |
|||
'"name":"Portal Gun"' in data_lines[1] |
|||
or '"name": "Portal Gun"' in data_lines[1] |
|||
) |
|||
assert ( |
|||
'"name":"Meeseeks Box"' in data_lines[2] |
|||
or '"name": "Meeseeks Box"' in data_lines[2] |
|||
) |
|||
|
|||
|
|||
def test_sync_generator_with_model(client: TestClient): |
|||
response = client.get("/items/stream-sync") |
|||
assert response.status_code == 200 |
|||
assert response.headers["content-type"] == "text/event-stream; charset=utf-8" |
|||
|
|||
data_lines = [ |
|||
line for line in response.text.strip().split("\n") if line.startswith("data: ") |
|||
] |
|||
assert len(data_lines) == 3 |
|||
|
|||
|
|||
def test_async_generator_no_annotation(client: TestClient): |
|||
response = client.get("/items/stream-no-annotation") |
|||
assert response.status_code == 200 |
|||
assert response.headers["content-type"] == "text/event-stream; charset=utf-8" |
|||
|
|||
data_lines = [ |
|||
line for line in response.text.strip().split("\n") if line.startswith("data: ") |
|||
] |
|||
assert len(data_lines) == 3 |
|||
|
|||
|
|||
def test_sync_generator_no_annotation(client: TestClient): |
|||
response = client.get("/items/stream-sync-no-annotation") |
|||
assert response.status_code == 200 |
|||
assert response.headers["content-type"] == "text/event-stream; charset=utf-8" |
|||
|
|||
data_lines = [ |
|||
line for line in response.text.strip().split("\n") if line.startswith("data: ") |
|||
] |
|||
assert len(data_lines) == 3 |
|||
|
|||
|
|||
def test_dict_items(client: TestClient): |
|||
response = client.get("/items/stream-dict") |
|||
assert response.status_code == 200 |
|||
data_lines = [ |
|||
line for line in response.text.strip().split("\n") if line.startswith("data: ") |
|||
] |
|||
assert len(data_lines) == 3 |
|||
assert '"name"' in data_lines[0] |
|||
|
|||
|
|||
def test_post_method_sse(client: TestClient): |
|||
"""SSE should work with POST (needed for MCP compatibility).""" |
|||
response = client.post("/items/stream-post") |
|||
assert response.status_code == 200 |
|||
assert response.headers["content-type"] == "text/event-stream; charset=utf-8" |
|||
data_lines = [ |
|||
line for line in response.text.strip().split("\n") if line.startswith("data: ") |
|||
] |
|||
assert len(data_lines) == 3 |
|||
|
|||
|
|||
def test_sse_events_with_fields(client: TestClient): |
|||
response = client.get("/items/stream-sse-event") |
|||
assert response.status_code == 200 |
|||
text = response.text |
|||
|
|||
assert "event: greeting\n" in text |
|||
assert 'data: "hello"\n' in text |
|||
assert "id: 1\n" in text |
|||
|
|||
assert "event: json-data\n" in text |
|||
assert "id: 2\n" in text |
|||
assert 'data: {"key": "value"}\n' in text |
|||
|
|||
assert ": just a comment\n" in text |
|||
|
|||
assert "retry: 5000\n" in text |
|||
assert 'data: "retry-test"\n' in text |
|||
|
|||
|
|||
def test_mixed_plain_and_sse_events(client: TestClient): |
|||
response = client.get("/items/stream-mixed") |
|||
assert response.status_code == 200 |
|||
text = response.text |
|||
|
|||
assert "event: special\n" in text |
|||
assert 'data: "custom-event"\n' in text |
|||
assert '"name"' in text |
|||
|
|||
|
|||
def test_string_data_json_encoded(client: TestClient): |
|||
"""Strings are always JSON-encoded (quoted).""" |
|||
response = client.get("/items/stream-string") |
|||
assert response.status_code == 200 |
|||
assert 'data: "plain text data"\n' in response.text |
|||
|
|||
|
|||
def test_server_sent_event_null_id_rejected(): |
|||
with pytest.raises(ValueError, match="null"): |
|||
ServerSentEvent(data="test", id="has\0null") |
|||
|
|||
|
|||
def test_server_sent_event_negative_retry_rejected(): |
|||
with pytest.raises(ValueError): |
|||
ServerSentEvent(data="test", retry=-1) |
|||
|
|||
|
|||
def test_server_sent_event_float_retry_rejected(): |
|||
with pytest.raises(ValueError): |
|||
ServerSentEvent(data="test", retry=1.5) # type: ignore[arg-type] |
|||
|
|||
|
|||
def test_raw_data_sent_without_json_encoding(client: TestClient): |
|||
"""raw_data is sent as-is, not JSON-encoded.""" |
|||
response = client.get("/items/stream-raw") |
|||
assert response.status_code == 200 |
|||
text = response.text |
|||
|
|||
# raw_data should appear without JSON quotes |
|||
assert "data: plain text without quotes\n" in text |
|||
# Not JSON-quoted |
|||
assert 'data: "plain text without quotes"' not in text |
|||
|
|||
assert "event: html\n" in text |
|||
assert "data: <div>html fragment</div>\n" in text |
|||
|
|||
assert "event: csv\n" in text |
|||
assert "data: cpu,87.3,1709145600\n" in text |
|||
|
|||
|
|||
def test_data_and_raw_data_mutually_exclusive(): |
|||
"""Cannot set both data and raw_data.""" |
|||
with pytest.raises(ValueError, match="Cannot set both"): |
|||
ServerSentEvent(data="json", raw_data="raw") |
|||
|
|||
|
|||
def test_sse_on_router_included_in_app(client: TestClient): |
|||
response = client.get("/api/events") |
|||
assert response.status_code == 200 |
|||
assert response.headers["content-type"] == "text/event-stream; charset=utf-8" |
|||
data_lines = [ |
|||
line for line in response.text.strip().split("\n") if line.startswith("data: ") |
|||
] |
|||
assert len(data_lines) == 2 |
|||
|
|||
|
|||
# Keepalive ping tests |
|||
|
|||
|
|||
keepalive_app = FastAPI() |
|||
|
|||
|
|||
@keepalive_app.get("/slow-async", response_class=EventSourceResponse) |
|||
async def slow_async_stream(): |
|||
yield {"n": 1} |
|||
# Sleep longer than the (monkeypatched) ping interval so a keepalive |
|||
# comment is emitted before the next item. |
|||
await asyncio.sleep(0.3) |
|||
yield {"n": 2} |
|||
|
|||
|
|||
@keepalive_app.get("/slow-sync", response_class=EventSourceResponse) |
|||
def slow_sync_stream(): |
|||
yield {"n": 1} |
|||
time.sleep(0.3) |
|||
yield {"n": 2} |
|||
|
|||
|
|||
def test_keepalive_ping_async(monkeypatch: pytest.MonkeyPatch): |
|||
monkeypatch.setattr(fastapi.routing, "_PING_INTERVAL", 0.05) |
|||
with TestClient(keepalive_app) as c: |
|||
response = c.get("/slow-async") |
|||
assert response.status_code == 200 |
|||
text = response.text |
|||
# The keepalive comment ": ping" should appear between the two data events |
|||
assert ": ping\n" in text |
|||
data_lines = [line for line in text.split("\n") if line.startswith("data: ")] |
|||
assert len(data_lines) == 2 |
|||
|
|||
|
|||
def test_keepalive_ping_sync(monkeypatch: pytest.MonkeyPatch): |
|||
monkeypatch.setattr(fastapi.routing, "_PING_INTERVAL", 0.05) |
|||
with TestClient(keepalive_app) as c: |
|||
response = c.get("/slow-sync") |
|||
assert response.status_code == 200 |
|||
text = response.text |
|||
assert ": ping\n" in text |
|||
data_lines = [line for line in text.split("\n") if line.startswith("data: ")] |
|||
assert len(data_lines) == 2 |
|||
|
|||
|
|||
def test_no_keepalive_when_fast(client: TestClient): |
|||
"""No keepalive comment when items arrive quickly.""" |
|||
response = client.get("/items/stream") |
|||
assert response.status_code == 200 |
|||
# KEEPALIVE_COMMENT is ": ping\n\n". |
|||
assert ": ping\n" not in response.text |
|||
@ -0,0 +1,42 @@ |
|||
import json |
|||
from typing import AsyncIterable, Iterable # noqa: UP035 to test coverage |
|||
|
|||
from fastapi import FastAPI |
|||
from fastapi.testclient import TestClient |
|||
from pydantic import BaseModel |
|||
|
|||
|
|||
class Item(BaseModel): |
|||
name: str |
|||
|
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
@app.get("/items/stream-bare-async") |
|||
async def stream_bare_async() -> AsyncIterable: |
|||
yield {"name": "foo"} |
|||
|
|||
|
|||
@app.get("/items/stream-bare-sync") |
|||
def stream_bare_sync() -> Iterable: |
|||
yield {"name": "bar"} |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
|
|||
def test_stream_bare_async_iterable(): |
|||
response = client.get("/items/stream-bare-async") |
|||
assert response.status_code == 200 |
|||
assert response.headers["content-type"] == "application/jsonl" |
|||
lines = [json.loads(line) for line in response.text.strip().splitlines()] |
|||
assert lines == [{"name": "foo"}] |
|||
|
|||
|
|||
def test_stream_bare_sync_iterable(): |
|||
response = client.get("/items/stream-bare-sync") |
|||
assert response.status_code == 200 |
|||
assert response.headers["content-type"] == "application/jsonl" |
|||
lines = [json.loads(line) for line in response.text.strip().splitlines()] |
|||
assert lines == [{"name": "bar"}] |
|||
@ -0,0 +1,88 @@ |
|||
""" |
|||
Test that async streaming endpoints can be cancelled without hanging. |
|||
|
|||
Ref: https://github.com/fastapi/fastapi/issues/14680 |
|||
""" |
|||
|
|||
from collections.abc import AsyncIterable |
|||
|
|||
import anyio |
|||
import pytest |
|||
from fastapi import FastAPI |
|||
from fastapi.responses import StreamingResponse |
|||
|
|||
pytestmark = [ |
|||
pytest.mark.anyio, |
|||
pytest.mark.filterwarnings("ignore::pytest.PytestUnraisableExceptionWarning"), |
|||
] |
|||
|
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
@app.get("/stream-raw", response_class=StreamingResponse) |
|||
async def stream_raw() -> AsyncIterable[str]: |
|||
"""Async generator with no internal await - would hang without checkpoint.""" |
|||
i = 0 |
|||
while True: |
|||
yield f"item {i}\n" |
|||
i += 1 |
|||
|
|||
|
|||
@app.get("/stream-jsonl") |
|||
async def stream_jsonl() -> AsyncIterable[int]: |
|||
"""JSONL async generator with no internal await.""" |
|||
i = 0 |
|||
while True: |
|||
yield i |
|||
i += 1 |
|||
|
|||
|
|||
async def _run_asgi_and_cancel(app: FastAPI, path: str, timeout: float) -> bool: |
|||
"""Call the ASGI app for *path* and cancel after *timeout* seconds. |
|||
|
|||
Returns `True` if the cancellation was delivered (i.e. it did not hang). |
|||
""" |
|||
chunks: list[bytes] = [] |
|||
|
|||
async def receive(): # type: ignore[no-untyped-def] |
|||
# Simulate a client that never disconnects, rely on cancellation |
|||
await anyio.sleep(float("inf")) |
|||
return {"type": "http.disconnect"} # pragma: no cover |
|||
|
|||
async def send(message: dict) -> None: # type: ignore[type-arg] |
|||
if message["type"] == "http.response.body": |
|||
chunks.append(message.get("body", b"")) |
|||
|
|||
scope = { |
|||
"type": "http", |
|||
"asgi": {"version": "3.0", "spec_version": "2.0"}, |
|||
"http_version": "1.1", |
|||
"method": "GET", |
|||
"path": path, |
|||
"query_string": b"", |
|||
"root_path": "", |
|||
"headers": [], |
|||
"server": ("test", 80), |
|||
} |
|||
|
|||
with anyio.move_on_after(timeout) as cancel_scope: |
|||
await app(scope, receive, send) # type: ignore[arg-type] |
|||
|
|||
# If we got here within the timeout the generator was cancellable. |
|||
# cancel_scope.cancelled_caught is True when move_on_after fired. |
|||
return cancel_scope.cancelled_caught or len(chunks) > 0 |
|||
|
|||
|
|||
async def test_raw_stream_cancellation() -> None: |
|||
"""Raw streaming endpoint should be cancellable within a reasonable time.""" |
|||
cancelled = await _run_asgi_and_cancel(app, "/stream-raw", timeout=3.0) |
|||
# The key assertion: we reached this line at all (didn't hang). |
|||
# cancelled will be True because the infinite generator was interrupted. |
|||
assert cancelled |
|||
|
|||
|
|||
async def test_jsonl_stream_cancellation() -> None: |
|||
"""JSONL streaming endpoint should be cancellable within a reasonable time.""" |
|||
cancelled = await _run_asgi_and_cancel(app, "/stream-jsonl", timeout=3.0) |
|||
assert cancelled |
|||
@ -0,0 +1,40 @@ |
|||
from collections.abc import AsyncIterable, Iterable |
|||
|
|||
import pytest |
|||
from fastapi import FastAPI |
|||
from fastapi.exceptions import ResponseValidationError |
|||
from fastapi.testclient import TestClient |
|||
from pydantic import BaseModel |
|||
|
|||
|
|||
class Item(BaseModel): |
|||
name: str |
|||
price: float |
|||
|
|||
|
|||
app = FastAPI() |
|||
|
|||
|
|||
@app.get("/items/stream-invalid") |
|||
async def stream_items_invalid() -> AsyncIterable[Item]: |
|||
yield {"name": "valid", "price": 1.0} |
|||
yield {"name": "invalid", "price": "not-a-number"} |
|||
|
|||
|
|||
@app.get("/items/stream-invalid-sync") |
|||
def stream_items_invalid_sync() -> Iterable[Item]: |
|||
yield {"name": "valid", "price": 1.0} |
|||
yield {"name": "invalid", "price": "not-a-number"} |
|||
|
|||
|
|||
client = TestClient(app) |
|||
|
|||
|
|||
def test_stream_json_validation_error_async(): |
|||
with pytest.raises(ResponseValidationError): |
|||
client.get("/items/stream-invalid") |
|||
|
|||
|
|||
def test_stream_json_validation_error_sync(): |
|||
with pytest.raises(ResponseValidationError): |
|||
client.get("/items/stream-invalid-sync") |
|||
Some files were not shown because too many files changed in this diff
Loading…
Reference in new issue