Zawwar Sami 2 weeks ago
committed by GitHub
parent
commit
d00125af27
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 10
      fastapi/sse.py
  2. 29
      tests/test_sse.py

10
fastapi/sse.py

@ -156,6 +156,12 @@ class ServerSentEvent(BaseModel):
return self return self
def _split_sse_lines(value: str) -> list[str]:
# Split on SSE-spec line terminators only (\n, \r\n, \r), preserving
# trailing empty strings.
return value.replace("\r\n", "\n").replace("\r", "\n").split("\n")
def format_sse_event( def format_sse_event(
*, *,
data_str: Annotated[ data_str: Annotated[
@ -206,14 +212,14 @@ def format_sse_event(
lines: list[str] = [] lines: list[str] = []
if comment is not None: if comment is not None:
for line in comment.splitlines(): for line in _split_sse_lines(comment):
lines.append(f": {line}") lines.append(f": {line}")
if event is not None: if event is not None:
lines.append(f"event: {event}") lines.append(f"event: {event}")
if data_str is not None: if data_str is not None:
for line in data_str.splitlines(): for line in _split_sse_lines(data_str):
lines.append(f"data: {line}") lines.append(f"data: {line}")
if id is not None: if id is not None:

29
tests/test_sse.py

@ -6,7 +6,7 @@ import fastapi.routing
import pytest import pytest
from fastapi import APIRouter, FastAPI from fastapi import APIRouter, FastAPI
from fastapi.responses import EventSourceResponse from fastapi.responses import EventSourceResponse
from fastapi.sse import ServerSentEvent from fastapi.sse import ServerSentEvent, format_sse_event
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
from pydantic import BaseModel from pydantic import BaseModel
@ -325,3 +325,30 @@ def test_no_keepalive_when_fast(client: TestClient):
assert response.status_code == 200 assert response.status_code == 200
# KEEPALIVE_COMMENT is ": ping\n\n". # KEEPALIVE_COMMENT is ": ping\n\n".
assert ": ping\n" not in response.text assert ": ping\n" not in response.text
@pytest.mark.parametrize(
("data", "expected_result"),
[
("Hello\n", b"data: Hello\ndata: \n\n"),
("Hello\n\n", b"data: Hello\ndata: \ndata: \n\n"),
("\n", b"data: \ndata: \n\n"),
("Hello\r\nWorld", b"data: Hello\ndata: World\n\n"),
("Hello\rWorld", b"data: Hello\ndata: World\n\n"),
("A\u2028B", "data: A\u2028B\n\n".encode()),
("A\vB", b"data: A\x0bB\n\n"),
],
)
def test_format_sse_event_splitlines_behavior_in_data(
data: str, expected_result: bytes
) -> None:
assert format_sse_event(data_str=data) == expected_result
def test_format_sse_event_splitlines_behavior_in_comment():
assert format_sse_event(comment="hi\n") == b": hi\n: \n\n"
def test_format_sse_event_keeps_empty_data_line():
payload = format_sse_event(data_str="")
assert payload == b"data: \n\n"

Loading…
Cancel
Save