Files
breakpilot-compliance/backend-compliance/compliance/api/agent_check/_sse.py
T
Benjamin Admin 65de90114a feat(agent): SSE — progressive Themen-Tabs (Phase 2)
Der Compliance-Check streamt jetzt progressive Events; der Impressum-Tab
erscheint, sobald das Thema fertig ist, statt am Ende alles auf einmal.
Additiv — das Polling fürs finale Ergebnis bleibt.

- backend: _sse.py (Queue/emit/event_generator) + Endpoint
  /compliance-check/{id}/stream; _update emittiert progress,
  run_agent_outputs emittiert topic (laeuft jetzt frueh nach Phase B),
  Orchestrator emittiert complete/error.
- frontend: SSE-Proxy-Route + EventSource in ComplianceCheckTab merged
  topic-Events in agent_outputs -> Tab erscheint progressiv.
- Tests: backend 5 passed (SSE + agent_outputs); tsc 0 neue Fehler,
  vitest 2 passed, check-loc 0.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-10 19:07:26 +02:00

83 lines
3.1 KiB
Python

"""SSE-Plumbing für den Compliance-Check — pro check_id eine Event-Queue
+ Generator. Spiegelt das Agent-Test-SSE (specialist_agent_routes).
ADDITIV: Das Polling auf GET /compliance-check/{check_id} bleibt die
Wahrheit fürs finale Ergebnis. SSE liefert nur **progressive** Events,
damit sich die Themen-Tabs füllen, sobald ein Thema fertig ist:
- {type:"progress", msg, pct} (aus _update)
- {type:"topic", topic, output} (aus run_agent_outputs, pro Thema)
- {type:"complete", status} (Orchestrator-Ende)
Geht ein Event verloren (Queue voll / kein Client) ist das unkritisch —
der Tab kommt spätestens mit dem finalen Poll.
"""
from __future__ import annotations
import asyncio
import json
import logging
from collections.abc import AsyncGenerator
from ._constants import _compliance_check_jobs
logger = logging.getLogger(__name__)
# In-memory Event-Queues pro check_id. Restart-fragil, aber für einen
# Live-Stream ausreichend (Polling ist der persistente Pfad).
_check_queues: dict[str, "asyncio.Queue[dict]"] = {}
_TERMINAL_JOB_STATES = ("completed", "failed", "skipped_tdm")
def new_queue(check_id: str) -> None:
"""Legt die Event-Queue für einen Check an (in POST /compliance-check)."""
_check_queues[check_id] = asyncio.Queue(maxsize=500)
def emit(check_id: str, event: dict) -> None:
"""Non-blocking best-effort push. Synchron, damit auch das synchrone
_update() emittieren kann."""
q = _check_queues.get(check_id)
if q is None:
return
try:
q.put_nowait(event)
except asyncio.QueueFull:
pass # Client zu langsam — Poll holt den Stand nach
def _format_sse(payload: dict) -> str:
return f"data: {json.dumps(payload, default=str)}\n\n"
async def event_generator(check_id: str) -> AsyncGenerator[str, None]:
"""Draint die Queue bis der Check terminal ist. Heartbeat alle 25s."""
q = _check_queues.get(check_id)
if q is None:
# Check evtl. schon fertig (Queue aufgeräumt) → Client soll pollen.
yield _format_sse({"type": "stream_close", "reason": "no_queue"})
return
yield _format_sse({"type": "hello", "check_id": check_id})
try:
while True:
try:
event = await asyncio.wait_for(q.get(), timeout=25.0)
except asyncio.TimeoutError:
yield _format_sse({"type": "heartbeat"})
job = _compliance_check_jobs.get(check_id) or {}
if job.get("status") in _TERMINAL_JOB_STATES:
yield _format_sse({"type": "complete",
"status": job.get("status")})
yield _format_sse({"type": "stream_close"})
return
continue
yield _format_sse(event)
if event.get("type") in ("complete", "error"):
yield _format_sse({"type": "stream_close"})
return
finally:
# Queue erst nach 5 Min freigeben (späte Reconnects).
asyncio.get_event_loop().call_later(
300, lambda: _check_queues.pop(check_id, None),
)