"""Phase 2: SSE-Plumbing für den Compliance-Check. Queue + Generator laufen innerhalb eines asyncio.run (sonst bindet asyncio.Queue in Py3.9 an einen ggf. geschlossenen Loop). """ from __future__ import annotations import asyncio from compliance.api.agent_check import _sse def test_format_sse_line(): line = _sse._format_sse({"type": "topic", "topic": "impressum"}) assert line.startswith("data: ") and line.endswith("\n\n") assert '"impressum"' in line def test_emit_is_noop_without_queue(): # Kein new_queue → emit darf nicht crashen (best-effort). _sse.emit("does-not-exist-xyz", {"type": "topic"}) def test_emit_and_event_generator_streams_then_closes(): async def scenario(): cid = "sse-test-gen" _sse.new_queue(cid) _sse.emit(cid, {"type": "topic", "topic": "impressum", "output": {}}) _sse.emit(cid, {"type": "complete", "status": "completed"}) out: list[str] = [] async for line in _sse.event_generator(cid): out.append(line) if len(out) > 12: # safety break return out blob = "".join(asyncio.run(scenario())) assert '"type": "hello"' in blob assert '"topic": "impressum"' in blob assert '"type": "complete"' in blob assert '"type": "stream_close"' in blob