Browse Source

Merge e6f376dacb into 460f8d2cc8

pull/15077/merge
Alexander Rauhut 13 hours ago
committed by GitHub
parent
commit
f21f53d469
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 6
      fastapi/routing.py
  2. 142
      tests/test_sse.py
  3. 33
      tests/test_stream_bare_type.py

6
fastapi/routing.py

@ -840,10 +840,11 @@ class APIRoute(routing.Route):
generate_unique_id_function: Callable[["APIRoute"], str]
| DefaultPlaceholder = Default(generate_unique_id),
strict_content_type: bool | DefaultPlaceholder = Default(True),
stream_item_type: Any | None = None,
) -> None:
self.path = path
self.endpoint = endpoint
self.stream_item_type: Any | None = None
self.stream_item_type: Any | None = stream_item_type
if isinstance(response_model, DefaultPlaceholder):
return_annotation = get_typed_return_annotation(endpoint)
if lenient_issubclass(return_annotation, Response):
@ -1364,6 +1365,7 @@ class APIRouter(routing.Router):
generate_unique_id_function: Callable[[APIRoute], str]
| DefaultPlaceholder = Default(generate_unique_id),
strict_content_type: bool | DefaultPlaceholder = Default(True),
stream_item_type: Any | None = None,
) -> None:
route_class = route_class_override or self.route_class
responses = responses or {}
@ -1413,6 +1415,7 @@ class APIRouter(routing.Router):
strict_content_type=get_value_or_default(
strict_content_type, self.strict_content_type
),
stream_item_type=stream_item_type,
)
self.routes.append(route)
@ -1793,6 +1796,7 @@ class APIRouter(routing.Router):
router.strict_content_type,
self.strict_content_type,
),
stream_item_type=route.stream_item_type,
)
elif isinstance(route, routing.Route):
methods = list(route.methods or [])

142
tests/test_sse.py

@ -64,7 +64,8 @@ async def sse_items_event():
@app.get("/items/stream-mixed", response_class=EventSourceResponse)
async def sse_items_mixed() -> AsyncIterable[Item]:
yield items[0]
for item in items:
yield item
yield ServerSentEvent(data="custom-event", event="special")
yield items[1]
@ -96,6 +97,12 @@ async def stream_events():
yield {"msg": "world"}
@router.get("/events-typed", response_class=EventSourceResponse)
async def stream_events_typed() -> AsyncIterable[Item]:
for item in items:
yield item
app.include_router(router, prefix="/api")
@ -274,6 +281,45 @@ def test_sse_on_router_included_in_app(client: TestClient):
assert len(data_lines) == 2
def test_sse_router_typed_stream(client: TestClient):
response = client.get("/api/events-typed")
assert response.status_code == 200
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
data_lines = [
line for line in response.text.strip().split("\n") if line.startswith("data: ")
]
assert len(data_lines) == 3
def test_sse_router_typed_openapi_schema(client: TestClient):
"""Typed SSE endpoint on a router should preserve itemSchema with contentSchema."""
response = client.get("/openapi.json")
assert response.status_code == 200
paths = response.json()["paths"]
sse_response = paths["/api/events-typed"]["get"]["responses"]["200"]
assert sse_response == {
"description": "Successful Response",
"content": {
"text/event-stream": {
"itemSchema": {
"type": "object",
"properties": {
"data": {
"type": "string",
"contentMediaType": "application/json",
"contentSchema": {"$ref": "#/components/schemas/Item"},
},
"event": {"type": "string"},
"id": {"type": "string"},
"retry": {"type": "integer", "minimum": 0},
},
"required": ["data"],
}
}
},
}
# Keepalive ping tests
@ -325,3 +371,97 @@ def test_no_keepalive_when_fast(client: TestClient):
assert response.status_code == 200
# KEEPALIVE_COMMENT is ": ping\n\n".
assert ": ping\n" not in response.text
# default_response_class tests
sse_schema_response = {
"description": "Successful Response",
"content": {
"text/event-stream": {
"itemSchema": {
"type": "object",
"properties": {
"data": {
"type": "string",
"contentMediaType": "application/json",
"contentSchema": {"$ref": "#/components/schemas/Item"},
},
"event": {"type": "string"},
"id": {"type": "string"},
"retry": {"type": "integer", "minimum": 0},
},
"required": ["data"],
}
}
},
}
# default_response_class on app
default_app_app = FastAPI(default_response_class=EventSourceResponse)
default_app_router = APIRouter()
@default_app_router.get("/stream")
async def default_app_stream() -> AsyncIterable[Item]:
for item in items:
yield item
default_app_app.include_router(default_app_router, prefix="/api")
def test_default_response_class_on_app_stream():
with TestClient(default_app_app) as client:
response = client.get("/api/stream")
assert response.status_code == 200
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
data_lines = [
line for line in response.text.strip().split("\n") if line.startswith("data: ")
]
assert len(data_lines) == 3
def test_default_response_class_on_app_openapi_schema():
assert (
default_app_app.openapi()["paths"]["/api/stream"]["get"]["responses"]["200"]
== sse_schema_response
)
# default_response_class on parent router
default_parent_app = FastAPI()
parent_router = APIRouter(default_response_class=EventSourceResponse)
child_router = APIRouter()
@child_router.get("/stream")
async def default_parent_stream() -> AsyncIterable[Item]:
for item in items:
yield item
parent_router.include_router(child_router)
default_parent_app.include_router(parent_router, prefix="/api")
def test_default_response_class_on_parent_router_stream():
with TestClient(default_parent_app) as client:
response = client.get("/api/stream")
assert response.status_code == 200
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
data_lines = [
line for line in response.text.strip().split("\n") if line.startswith("data: ")
]
assert len(data_lines) == 3
def test_default_response_class_on_parent_router_openapi_schema():
assert (
default_parent_app.openapi()["paths"]["/api/stream"]["get"]["responses"]["200"]
== sse_schema_response
)

33
tests/test_stream_bare_type.py

@ -1,7 +1,7 @@
import json
from typing import AsyncIterable, Iterable # noqa: UP035 to test coverage
from fastapi import FastAPI
from fastapi import APIRouter, FastAPI
from fastapi.testclient import TestClient
from pydantic import BaseModel
@ -23,6 +23,16 @@ def stream_bare_sync() -> Iterable:
yield {"name": "bar"}
router = APIRouter()
@router.get("/events-jsonl")
async def stream_events_jsonl() -> AsyncIterable[Item]:
yield Item(name="foo")
app.include_router(router, prefix="/api")
client = TestClient(app)
@ -40,3 +50,24 @@ def test_stream_bare_sync_iterable():
assert response.headers["content-type"] == "application/jsonl"
lines = [json.loads(line) for line in response.text.strip().splitlines()]
assert lines == [{"name": "bar"}]
def test_jsonl_router_typed_stream():
response = client.get("/api/events-jsonl")
assert response.status_code == 200
assert response.headers["content-type"] == "application/jsonl"
lines = [json.loads(line) for line in response.text.strip().splitlines()]
assert lines == [{"name": "foo"}]
def test_jsonl_router_typed_openapi_schema():
response = client.get("/openapi.json")
assert response.status_code == 200
paths = response.json()["paths"]
jsonl_response = paths["/api/events-jsonl"]["get"]["responses"]["200"]
assert jsonl_response == {
"description": "Successful Response",
"content": {
"application/jsonl": {"itemSchema": {"$ref": "#/components/schemas/Item"}}
},
}

Loading…
Cancel
Save