feat(agent): SSE — progressive Themen-Tabs (Phase 2)

Der Compliance-Check streamt jetzt progressive Events; der Impressum-Tab
erscheint, sobald das Thema fertig ist, statt am Ende alles auf einmal.
Additiv — das Polling fürs finale Ergebnis bleibt.

- backend: _sse.py (Queue/emit/event_generator) + Endpoint
  /compliance-check/{id}/stream; _update emittiert progress,
  run_agent_outputs emittiert topic (laeuft jetzt frueh nach Phase B),
  Orchestrator emittiert complete/error.
- frontend: SSE-Proxy-Route + EventSource in ComplianceCheckTab merged
  topic-Events in agent_outputs -> Tab erscheint progressiv.
- Tests: backend 5 passed (SSE + agent_outputs); tsc 0 neue Fehler,
  vitest 2 passed, check-loc 0.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-10 19:07:26 +02:00
parent e21984e0ad
commit 65de90114a
8 changed files with 246 additions and 5 deletions
@@ -0,0 +1,41 @@
/**
* Compliance-Check SSE-Proxy
* GET /api/sdk/v1/agent/compliance-check/{check_id}/stream
* → backend /api/compliance/agent/compliance-check/{check_id}/stream
*
* Reicht den text/event-stream-Body unmodifiziert durch (progressive
* topic-/progress-Events fürs Frontend). Additiv zum Polling.
*/
import { NextRequest, NextResponse } from 'next/server'
const BACKEND_URL =
process.env.BACKEND_API_URL || process.env.BACKEND_URL ||
'http://backend-compliance:8002'
export async function GET(
_request: NextRequest,
{ params }: { params: Promise<{ check_id: string }> },
) {
const { check_id } = await params
try {
const response = await fetch(
`${BACKEND_URL}/api/compliance/agent/compliance-check/${check_id}/stream`,
{ signal: AbortSignal.timeout(1_800_000) }, // 30 min
)
return new NextResponse(response.body, {
status: response.status,
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no',
},
})
} catch {
return NextResponse.json(
{ error: 'SSE-Stream zum Backend fehlgeschlagen' },
{ status: 503 },
)
}
}
@@ -1,6 +1,6 @@
'use client'
import React, { useState, useCallback } from 'react'
import React, { useState, useCallback, useRef } from 'react'
import { ComplianceResultTabs } from './ComplianceResultTabs'
import { DocumentRow } from './DocumentRow'
import { PreScanWizard, useScanContext, isContextComplete } from './PreScanWizard'
@@ -35,6 +35,9 @@ export function ComplianceCheckTab() {
if (typeof window === 'undefined') return []
try { return JSON.parse(localStorage.getItem(STORAGE_KEY_HISTORY) || '[]') } catch { return [] }
})
// SSE: progressive Themen-Tabs (additiv zum Polling).
const esRef = useRef<EventSource | null>(null)
React.useEffect(() => () => { try { esRef.current?.close() } catch { /* noop */ } }, [])
// Persist URLs and texts (not loading/error state)
React.useEffect(() => {
@@ -117,6 +120,38 @@ export function ComplianceCheckTab() {
reader.readAsText(file)
}, [updateDoc])
// SSE: füllt agent_outputs progressiv, sobald ein Thema fertig ist.
// Das Polling unten liefert weiterhin das finale Gesamtergebnis.
const openTopicStream = useCallback((checkId: string) => {
try { esRef.current?.close() } catch { /* noop */ }
const partial: any = { results: [], agent_outputs: {} }
const es = new EventSource(
`/api/sdk/v1/agent/compliance-check/${checkId}/stream`,
)
esRef.current = es
es.onmessage = (ev) => {
try {
const data = JSON.parse(ev.data)
if (data.type === 'topic' && data.topic && data.output) {
partial.agent_outputs = {
...partial.agent_outputs, [data.topic]: data.output,
}
setResults((prev: any) =>
(prev && Array.isArray(prev.results) && prev.results.length > 0)
? prev // finales Ergebnis schon da → behalten
: { ...partial },
)
} else if (data.type === 'progress') {
if (data.msg) setProgress(data.msg)
if (typeof data.pct === 'number') setProgressPct(data.pct)
} else if (data.type === 'complete' || data.type === 'stream_close') {
try { es.close() } catch { /* noop */ }
}
} catch { /* noop */ }
}
es.onerror = () => { try { es.close() } catch { /* noop */ } }
}, [])
const filledCount = Object.values(docs).filter(d => d.url.trim() || d.text.trim()).length
const handleSubmit = async () => {
@@ -157,6 +192,7 @@ export function ComplianceCheckTab() {
if (!check_id) throw new Error('Keine Check-ID erhalten')
setActiveCheckId(check_id)
localStorage.setItem(STORAGE_KEY_CHECK_ID, check_id)
openTopicStream(check_id)
// Poll for results (max 25 min = 500 polls x 3s)
let attempts = 0
@@ -201,6 +237,7 @@ export function ComplianceCheckTab() {
setError(e instanceof Error ? e.message : 'Unbekannter Fehler')
setProgress('')
setProgressPct(0)
try { esRef.current?.close() } catch { /* noop */ }
} finally {
setLoading(false)
}
@@ -17,6 +17,8 @@ import logging
from compliance.services.specialist_agents import REGISTRY, AgentInput
from ._sse import emit
logger = logging.getLogger(__name__)
# topic key (matches state["doc_texts"]) -> registered agent_id
@@ -43,7 +45,9 @@ def _derive_scope(profile_dict: dict) -> list[str]:
async def run_agent_outputs(state: dict) -> None:
"""Für jedes Topic mit registriertem v3-Agent + ausreichend Text:
Agent laufen lassen und den strukturierten AgentOutput ablegen."""
Agent laufen lassen, AgentOutput ablegen + als SSE topic-Event
emittieren (Tab füllt sich progressiv)."""
check_id = state.get("check_id", "")
doc_texts = state.get("doc_texts") or {}
profile_dict = state.get("profile_dict") or {}
req = state.get("req")
@@ -75,6 +79,8 @@ async def run_agent_outputs(state: dict) -> None:
origin_domain=origin_domain,
))
outputs[topic] = out.model_dump(mode="json")
emit(check_id, {"type": "topic", "topic": topic,
"output": outputs[topic]})
logger.info(
"agent_outputs[%s]: %d findings, confidence %.2f",
topic, len(out.findings), out.confidence,
@@ -16,6 +16,7 @@ from ._constants import (
_DOC_TYPE_LABELS,
_compliance_check_jobs,
)
from ._sse import emit
logger = logging.getLogger(__name__)
@@ -26,6 +27,8 @@ def _update(check_id: str, msg: str, pct: int | None = None) -> None:
job["progress"] = msg
if pct is not None:
job["progress_pct"] = max(0, min(100, int(pct)))
emit(check_id, {"type": "progress", "msg": msg,
"pct": job.get("progress_pct", 0)})
def _doc_type_label(doc_type: str) -> str:
@@ -34,6 +34,7 @@ from ._b19_wiring import run_b19
from ._b20_wiring import run_b20
from ._b22_wiring import run_b22
from ._constants import _compliance_check_jobs
from ._sse import emit
from ._phase_a_resolve import run_phase_a
from ._phase_b_profile_check import run_phase_b
from ._phase_c_banner import run_phase_c
@@ -71,6 +72,10 @@ async def run_compliance_check(check_id: str, req) -> None:
logger.warning("chatbot-policy enrichment skipped: %s", e)
# Phase B: Step 2 (profile detect) + Step 3 (per-doc checks)
await run_phase_b(state)
# Strukturierter v3-AgentOutput pro Thema — früh (Impressum-Text +
# Profil liegen vor) → SSE topic-Event, Tab erscheint progressiv,
# während Banner/Vendor/B-Wirings noch laufen. Additiv zu B18.
await run_agent_outputs(state)
# Phase C: Step 3b-d (banner + cross-check + TCF) + Step 4
await run_phase_c(state)
# Phase C-2: optional browser-matrix scan (env BROWSER_MATRIX=true)
@@ -96,9 +101,6 @@ async def run_compliance_check(check_id: str, req) -> None:
run_b16(state) # Footer-Label-vs-URL-Slug-Drift
await run_b17(state) # Audit-Walk-Video (Beweis-Aufzeichnung)
await run_b18(state) # Impressum-Specialist-Agent (Pattern+LLM)
# Strukturierter v3-AgentOutput pro Thema → standardisierte
# Ergebnis-Tabs im Frontend (additiv zu B18-HTML).
await run_agent_outputs(state)
run_b19(state) # Cookie-Coherence (Salesforce-as-essential)
await run_b20(state) # Legacy-URL-Discovery (Sitemap+Wayback)
run_b22(state) # Cross-Domain-Legal-Doc-Hosting (Elli/LogPay)
@@ -110,8 +112,10 @@ async def run_compliance_check(check_id: str, req) -> None:
run_phase_e(state)
# Phase F: Step 7 persist + audit log + unified findings
run_phase_f(state)
emit(check_id, {"type": "complete", "status": "completed"})
except Exception as e:
logger.error("Compliance check %s failed: %s",
check_id, e, exc_info=True)
_compliance_check_jobs[check_id]["status"] = "failed"
_compliance_check_jobs[check_id]["error"] = str(e)[:500]
emit(check_id, {"type": "error", "error": str(e)[:300]})
@@ -0,0 +1,82 @@
"""SSE-Plumbing für den Compliance-Check — pro check_id eine Event-Queue
+ Generator. Spiegelt das Agent-Test-SSE (specialist_agent_routes).
ADDITIV: Das Polling auf GET /compliance-check/{check_id} bleibt die
Wahrheit fürs finale Ergebnis. SSE liefert nur **progressive** Events,
damit sich die Themen-Tabs füllen, sobald ein Thema fertig ist:
- {type:"progress", msg, pct} (aus _update)
- {type:"topic", topic, output} (aus run_agent_outputs, pro Thema)
- {type:"complete", status} (Orchestrator-Ende)
Geht ein Event verloren (Queue voll / kein Client) ist das unkritisch —
der Tab kommt spätestens mit dem finalen Poll.
"""
from __future__ import annotations
import asyncio
import json
import logging
from collections.abc import AsyncGenerator
from ._constants import _compliance_check_jobs
logger = logging.getLogger(__name__)
# In-memory Event-Queues pro check_id. Restart-fragil, aber für einen
# Live-Stream ausreichend (Polling ist der persistente Pfad).
_check_queues: dict[str, "asyncio.Queue[dict]"] = {}
_TERMINAL_JOB_STATES = ("completed", "failed", "skipped_tdm")
def new_queue(check_id: str) -> None:
"""Legt die Event-Queue für einen Check an (in POST /compliance-check)."""
_check_queues[check_id] = asyncio.Queue(maxsize=500)
def emit(check_id: str, event: dict) -> None:
"""Non-blocking best-effort push. Synchron, damit auch das synchrone
_update() emittieren kann."""
q = _check_queues.get(check_id)
if q is None:
return
try:
q.put_nowait(event)
except asyncio.QueueFull:
pass # Client zu langsam — Poll holt den Stand nach
def _format_sse(payload: dict) -> str:
return f"data: {json.dumps(payload, default=str)}\n\n"
async def event_generator(check_id: str) -> AsyncGenerator[str, None]:
"""Draint die Queue bis der Check terminal ist. Heartbeat alle 25s."""
q = _check_queues.get(check_id)
if q is None:
# Check evtl. schon fertig (Queue aufgeräumt) → Client soll pollen.
yield _format_sse({"type": "stream_close", "reason": "no_queue"})
return
yield _format_sse({"type": "hello", "check_id": check_id})
try:
while True:
try:
event = await asyncio.wait_for(q.get(), timeout=25.0)
except asyncio.TimeoutError:
yield _format_sse({"type": "heartbeat"})
job = _compliance_check_jobs.get(check_id) or {}
if job.get("status") in _TERMINAL_JOB_STATES:
yield _format_sse({"type": "complete",
"status": job.get("status")})
yield _format_sse({"type": "stream_close"})
return
continue
yield _format_sse(event)
if event.get("type") in ("complete", "error"):
yield _format_sse({"type": "stream_close"})
return
finally:
# Queue erst nach 5 Min freigeben (späte Reconnects).
asyncio.get_event_loop().call_later(
300, lambda: _check_queues.pop(check_id, None),
)
@@ -30,6 +30,7 @@ import uuid as _uuid
import httpx
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
# ── Re-exports: external callers import these from THIS module ──────
from .agent_check._constants import ( # noqa: F401
@@ -63,6 +64,7 @@ from .agent_check._schemas import (
ExtractTextRequest,
)
from .agent_check._single_check import _check_single # noqa: F401
from .agent_check._sse import event_generator, new_queue
logger = logging.getLogger(__name__)
@@ -137,6 +139,7 @@ async def start_compliance_check(req: ComplianceCheckRequest):
"result": None,
"error": "",
}
new_queue(check_id) # SSE: progressive topic-Events fürs Frontend
asyncio.create_task(_run_compliance_check(check_id, req))
return ComplianceCheckStartResponse(check_id=check_id, status="running")
@@ -157,6 +160,21 @@ async def get_compliance_check_status(check_id: str):
)
@router.get("/compliance-check/{check_id}/stream")
async def stream_compliance_check(check_id: str) -> StreamingResponse:
"""SSE-Stream: progressive Events (progress/topic/complete) eines
laufenden Checks. Additiv zum Polling auf /compliance-check/{check_id}."""
return StreamingResponse(
event_generator(check_id),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # nginx: nicht puffern
"Connection": "keep-alive",
},
)
# ── P80: Snapshot + Replay ───────────────────────────────────────────
@router.get("/snapshots")
@@ -0,0 +1,50 @@
"""Phase 2: SSE-Plumbing für den Compliance-Check.
Deckt emit (Queue-Push), _format_sse (SSE-Zeilenformat) und den
event_generator (hello → Events → stream_close bei 'complete') ab.
"""
from __future__ import annotations
import asyncio
from compliance.api.agent_check import _sse
def test_emit_pushes_and_format():
cid = "sse-test-1"
_sse.new_queue(cid)
_sse.emit(cid, {"type": "topic", "topic": "impressum", "output": {"x": 1}})
q = _sse._check_queues[cid]
assert q.qsize() == 1
ev = q.get_nowait()
assert ev["type"] == "topic" and ev["topic"] == "impressum"
line = _sse._format_sse(ev)
assert line.startswith("data: ") and line.endswith("\n\n")
assert '"impressum"' in line
def test_emit_is_noop_without_queue():
# Kein new_queue → emit darf nicht crashen (best-effort).
_sse.emit("does-not-exist-xyz", {"type": "topic"})
def test_event_generator_streams_topic_then_closes_on_complete():
cid = "sse-test-gen"
_sse.new_queue(cid)
_sse.emit(cid, {"type": "topic", "topic": "impressum", "output": {}})
_sse.emit(cid, {"type": "complete", "status": "completed"})
async def collect():
out = []
async for line in _sse.event_generator(cid):
out.append(line)
if len(out) > 12: # safety
break
return out
blob = "".join(asyncio.run(collect()))
assert '"type": "hello"' in blob
assert '"topic": "impressum"' in blob
assert '"type": "complete"' in blob
assert '"type": "stream_close"' in blob