diff --git a/fastapi/sse.py b/fastapi/sse.py index 1e2bd86171..642ff77340 100644 --- a/fastapi/sse.py +++ b/fastapi/sse.py @@ -156,6 +156,12 @@ class ServerSentEvent(BaseModel): return self +def _split_sse_lines(value: str) -> list[str]: + # Split on SSE-spec line terminators only (\n, \r\n, \r), preserving + # trailing empty strings. + return value.replace("\r\n", "\n").replace("\r", "\n").split("\n") + + def format_sse_event( *, data_str: Annotated[ @@ -206,14 +212,14 @@ def format_sse_event( lines: list[str] = [] if comment is not None: - for line in comment.splitlines(): + for line in _split_sse_lines(comment): lines.append(f": {line}") if event is not None: lines.append(f"event: {event}") if data_str is not None: - for line in data_str.splitlines(): + for line in _split_sse_lines(data_str): lines.append(f"data: {line}") if id is not None: diff --git a/tests/test_sse.py b/tests/test_sse.py index 86a67f8f9f..c70c5b8d5c 100644 --- a/tests/test_sse.py +++ b/tests/test_sse.py @@ -6,7 +6,7 @@ import fastapi.routing import pytest from fastapi import APIRouter, FastAPI from fastapi.responses import EventSourceResponse -from fastapi.sse import ServerSentEvent +from fastapi.sse import ServerSentEvent, format_sse_event from fastapi.testclient import TestClient from pydantic import BaseModel @@ -325,3 +325,30 @@ def test_no_keepalive_when_fast(client: TestClient): assert response.status_code == 200 # KEEPALIVE_COMMENT is ": ping\n\n". assert ": ping\n" not in response.text + + +@pytest.mark.parametrize( + ("data", "expected_result"), + [ + ("Hello\n", b"data: Hello\ndata: \n\n"), + ("Hello\n\n", b"data: Hello\ndata: \ndata: \n\n"), + ("\n", b"data: \ndata: \n\n"), + ("Hello\r\nWorld", b"data: Hello\ndata: World\n\n"), + ("Hello\rWorld", b"data: Hello\ndata: World\n\n"), + ("A\u2028B", "data: A\u2028B\n\n".encode()), + ("A\vB", b"data: A\x0bB\n\n"), + ], +) +def test_format_sse_event_splitlines_behavior_in_data( + data: str, expected_result: bytes +) -> None: + assert format_sse_event(data_str=data) == expected_result + + +def test_format_sse_event_splitlines_behavior_in_comment(): + assert format_sse_event(comment="hi\n") == b": hi\n: \n\n" + + +def test_format_sse_event_keeps_empty_data_line(): + payload = format_sse_event(data_str="") + assert payload == b"data: \n\n"