From 65de90114ac570376800615f5c755db1274ca0c2 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Wed, 10 Jun 2026 19:07:26 +0200 Subject: [PATCH] =?UTF-8?q?feat(agent):=20SSE=20=E2=80=94=20progressive=20?= =?UTF-8?q?Themen-Tabs=20(Phase=202)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Der Compliance-Check streamt jetzt progressive Events; der Impressum-Tab erscheint, sobald das Thema fertig ist, statt am Ende alles auf einmal. Additiv — das Polling fürs finale Ergebnis bleibt. - backend: _sse.py (Queue/emit/event_generator) + Endpoint /compliance-check/{id}/stream; _update emittiert progress, run_agent_outputs emittiert topic (laeuft jetzt frueh nach Phase B), Orchestrator emittiert complete/error. - frontend: SSE-Proxy-Route + EventSource in ComplianceCheckTab merged topic-Events in agent_outputs -> Tab erscheint progressiv. - Tests: backend 5 passed (SSE + agent_outputs); tsc 0 neue Fehler, vitest 2 passed, check-loc 0. Co-Authored-By: Claude Opus 4.7 --- .../[check_id]/stream/route.ts | 41 ++++++++++ .../agent/_components/ComplianceCheckTab.tsx | 39 ++++++++- .../api/agent_check/_agent_outputs.py | 8 +- .../compliance/api/agent_check/_helpers.py | 3 + .../api/agent_check/_orchestrator.py | 10 ++- .../compliance/api/agent_check/_sse.py | 82 +++++++++++++++++++ .../api/agent_compliance_check_routes.py | 18 ++++ .../tests/test_sse_compliance_check.py | 50 +++++++++++ 8 files changed, 246 insertions(+), 5 deletions(-) create mode 100644 admin-compliance/app/api/sdk/v1/agent/compliance-check/[check_id]/stream/route.ts create mode 100644 backend-compliance/compliance/api/agent_check/_sse.py create mode 100644 backend-compliance/compliance/tests/test_sse_compliance_check.py diff --git a/admin-compliance/app/api/sdk/v1/agent/compliance-check/[check_id]/stream/route.ts b/admin-compliance/app/api/sdk/v1/agent/compliance-check/[check_id]/stream/route.ts new file mode 100644 index 00000000..3e0beb3e --- /dev/null +++ b/admin-compliance/app/api/sdk/v1/agent/compliance-check/[check_id]/stream/route.ts @@ -0,0 +1,41 @@ +/** + * Compliance-Check SSE-Proxy + * GET /api/sdk/v1/agent/compliance-check/{check_id}/stream + * → backend /api/compliance/agent/compliance-check/{check_id}/stream + * + * Reicht den text/event-stream-Body unmodifiziert durch (progressive + * topic-/progress-Events fürs Frontend). Additiv zum Polling. + */ + +import { NextRequest, NextResponse } from 'next/server' + +const BACKEND_URL = + process.env.BACKEND_API_URL || process.env.BACKEND_URL || + 'http://backend-compliance:8002' + +export async function GET( + _request: NextRequest, + { params }: { params: Promise<{ check_id: string }> }, +) { + const { check_id } = await params + try { + const response = await fetch( + `${BACKEND_URL}/api/compliance/agent/compliance-check/${check_id}/stream`, + { signal: AbortSignal.timeout(1_800_000) }, // 30 min + ) + return new NextResponse(response.body, { + status: response.status, + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + 'X-Accel-Buffering': 'no', + }, + }) + } catch { + return NextResponse.json( + { error: 'SSE-Stream zum Backend fehlgeschlagen' }, + { status: 503 }, + ) + } +} diff --git a/admin-compliance/app/sdk/agent/_components/ComplianceCheckTab.tsx b/admin-compliance/app/sdk/agent/_components/ComplianceCheckTab.tsx index c8d8e6f7..63844945 100644 --- a/admin-compliance/app/sdk/agent/_components/ComplianceCheckTab.tsx +++ b/admin-compliance/app/sdk/agent/_components/ComplianceCheckTab.tsx @@ -1,6 +1,6 @@ 'use client' -import React, { useState, useCallback } from 'react' +import React, { useState, useCallback, useRef } from 'react' import { ComplianceResultTabs } from './ComplianceResultTabs' import { DocumentRow } from './DocumentRow' import { PreScanWizard, useScanContext, isContextComplete } from './PreScanWizard' @@ -35,6 +35,9 @@ export function ComplianceCheckTab() { if (typeof window === 'undefined') return [] try { return JSON.parse(localStorage.getItem(STORAGE_KEY_HISTORY) || '[]') } catch { return [] } }) + // SSE: progressive Themen-Tabs (additiv zum Polling). + const esRef = useRef(null) + React.useEffect(() => () => { try { esRef.current?.close() } catch { /* noop */ } }, []) // Persist URLs and texts (not loading/error state) React.useEffect(() => { @@ -117,6 +120,38 @@ export function ComplianceCheckTab() { reader.readAsText(file) }, [updateDoc]) + // SSE: füllt agent_outputs progressiv, sobald ein Thema fertig ist. + // Das Polling unten liefert weiterhin das finale Gesamtergebnis. + const openTopicStream = useCallback((checkId: string) => { + try { esRef.current?.close() } catch { /* noop */ } + const partial: any = { results: [], agent_outputs: {} } + const es = new EventSource( + `/api/sdk/v1/agent/compliance-check/${checkId}/stream`, + ) + esRef.current = es + es.onmessage = (ev) => { + try { + const data = JSON.parse(ev.data) + if (data.type === 'topic' && data.topic && data.output) { + partial.agent_outputs = { + ...partial.agent_outputs, [data.topic]: data.output, + } + setResults((prev: any) => + (prev && Array.isArray(prev.results) && prev.results.length > 0) + ? prev // finales Ergebnis schon da → behalten + : { ...partial }, + ) + } else if (data.type === 'progress') { + if (data.msg) setProgress(data.msg) + if (typeof data.pct === 'number') setProgressPct(data.pct) + } else if (data.type === 'complete' || data.type === 'stream_close') { + try { es.close() } catch { /* noop */ } + } + } catch { /* noop */ } + } + es.onerror = () => { try { es.close() } catch { /* noop */ } } + }, []) + const filledCount = Object.values(docs).filter(d => d.url.trim() || d.text.trim()).length const handleSubmit = async () => { @@ -157,6 +192,7 @@ export function ComplianceCheckTab() { if (!check_id) throw new Error('Keine Check-ID erhalten') setActiveCheckId(check_id) localStorage.setItem(STORAGE_KEY_CHECK_ID, check_id) + openTopicStream(check_id) // Poll for results (max 25 min = 500 polls x 3s) let attempts = 0 @@ -201,6 +237,7 @@ export function ComplianceCheckTab() { setError(e instanceof Error ? e.message : 'Unbekannter Fehler') setProgress('') setProgressPct(0) + try { esRef.current?.close() } catch { /* noop */ } } finally { setLoading(false) } diff --git a/backend-compliance/compliance/api/agent_check/_agent_outputs.py b/backend-compliance/compliance/api/agent_check/_agent_outputs.py index 45dba02b..1fd82d1b 100644 --- a/backend-compliance/compliance/api/agent_check/_agent_outputs.py +++ b/backend-compliance/compliance/api/agent_check/_agent_outputs.py @@ -17,6 +17,8 @@ import logging from compliance.services.specialist_agents import REGISTRY, AgentInput +from ._sse import emit + logger = logging.getLogger(__name__) # topic key (matches state["doc_texts"]) -> registered agent_id @@ -43,7 +45,9 @@ def _derive_scope(profile_dict: dict) -> list[str]: async def run_agent_outputs(state: dict) -> None: """Für jedes Topic mit registriertem v3-Agent + ausreichend Text: - Agent laufen lassen und den strukturierten AgentOutput ablegen.""" + Agent laufen lassen, AgentOutput ablegen + als SSE topic-Event + emittieren (Tab füllt sich progressiv).""" + check_id = state.get("check_id", "") doc_texts = state.get("doc_texts") or {} profile_dict = state.get("profile_dict") or {} req = state.get("req") @@ -75,6 +79,8 @@ async def run_agent_outputs(state: dict) -> None: origin_domain=origin_domain, )) outputs[topic] = out.model_dump(mode="json") + emit(check_id, {"type": "topic", "topic": topic, + "output": outputs[topic]}) logger.info( "agent_outputs[%s]: %d findings, confidence %.2f", topic, len(out.findings), out.confidence, diff --git a/backend-compliance/compliance/api/agent_check/_helpers.py b/backend-compliance/compliance/api/agent_check/_helpers.py index 4c8d5d28..eec40136 100644 --- a/backend-compliance/compliance/api/agent_check/_helpers.py +++ b/backend-compliance/compliance/api/agent_check/_helpers.py @@ -16,6 +16,7 @@ from ._constants import ( _DOC_TYPE_LABELS, _compliance_check_jobs, ) +from ._sse import emit logger = logging.getLogger(__name__) @@ -26,6 +27,8 @@ def _update(check_id: str, msg: str, pct: int | None = None) -> None: job["progress"] = msg if pct is not None: job["progress_pct"] = max(0, min(100, int(pct))) + emit(check_id, {"type": "progress", "msg": msg, + "pct": job.get("progress_pct", 0)}) def _doc_type_label(doc_type: str) -> str: diff --git a/backend-compliance/compliance/api/agent_check/_orchestrator.py b/backend-compliance/compliance/api/agent_check/_orchestrator.py index bb007897..65003588 100644 --- a/backend-compliance/compliance/api/agent_check/_orchestrator.py +++ b/backend-compliance/compliance/api/agent_check/_orchestrator.py @@ -34,6 +34,7 @@ from ._b19_wiring import run_b19 from ._b20_wiring import run_b20 from ._b22_wiring import run_b22 from ._constants import _compliance_check_jobs +from ._sse import emit from ._phase_a_resolve import run_phase_a from ._phase_b_profile_check import run_phase_b from ._phase_c_banner import run_phase_c @@ -71,6 +72,10 @@ async def run_compliance_check(check_id: str, req) -> None: logger.warning("chatbot-policy enrichment skipped: %s", e) # Phase B: Step 2 (profile detect) + Step 3 (per-doc checks) await run_phase_b(state) + # Strukturierter v3-AgentOutput pro Thema — früh (Impressum-Text + + # Profil liegen vor) → SSE topic-Event, Tab erscheint progressiv, + # während Banner/Vendor/B-Wirings noch laufen. Additiv zu B18. + await run_agent_outputs(state) # Phase C: Step 3b-d (banner + cross-check + TCF) + Step 4 await run_phase_c(state) # Phase C-2: optional browser-matrix scan (env BROWSER_MATRIX=true) @@ -96,9 +101,6 @@ async def run_compliance_check(check_id: str, req) -> None: run_b16(state) # Footer-Label-vs-URL-Slug-Drift await run_b17(state) # Audit-Walk-Video (Beweis-Aufzeichnung) await run_b18(state) # Impressum-Specialist-Agent (Pattern+LLM) - # Strukturierter v3-AgentOutput pro Thema → standardisierte - # Ergebnis-Tabs im Frontend (additiv zu B18-HTML). - await run_agent_outputs(state) run_b19(state) # Cookie-Coherence (Salesforce-as-essential) await run_b20(state) # Legacy-URL-Discovery (Sitemap+Wayback) run_b22(state) # Cross-Domain-Legal-Doc-Hosting (Elli/LogPay) @@ -110,8 +112,10 @@ async def run_compliance_check(check_id: str, req) -> None: run_phase_e(state) # Phase F: Step 7 persist + audit log + unified findings run_phase_f(state) + emit(check_id, {"type": "complete", "status": "completed"}) except Exception as e: logger.error("Compliance check %s failed: %s", check_id, e, exc_info=True) _compliance_check_jobs[check_id]["status"] = "failed" _compliance_check_jobs[check_id]["error"] = str(e)[:500] + emit(check_id, {"type": "error", "error": str(e)[:300]}) diff --git a/backend-compliance/compliance/api/agent_check/_sse.py b/backend-compliance/compliance/api/agent_check/_sse.py new file mode 100644 index 00000000..9091974f --- /dev/null +++ b/backend-compliance/compliance/api/agent_check/_sse.py @@ -0,0 +1,82 @@ +"""SSE-Plumbing für den Compliance-Check — pro check_id eine Event-Queue ++ Generator. Spiegelt das Agent-Test-SSE (specialist_agent_routes). + +ADDITIV: Das Polling auf GET /compliance-check/{check_id} bleibt die +Wahrheit fürs finale Ergebnis. SSE liefert nur **progressive** Events, +damit sich die Themen-Tabs füllen, sobald ein Thema fertig ist: + - {type:"progress", msg, pct} (aus _update) + - {type:"topic", topic, output} (aus run_agent_outputs, pro Thema) + - {type:"complete", status} (Orchestrator-Ende) +Geht ein Event verloren (Queue voll / kein Client) ist das unkritisch — +der Tab kommt spätestens mit dem finalen Poll. +""" + +from __future__ import annotations + +import asyncio +import json +import logging +from collections.abc import AsyncGenerator + +from ._constants import _compliance_check_jobs + +logger = logging.getLogger(__name__) + +# In-memory Event-Queues pro check_id. Restart-fragil, aber für einen +# Live-Stream ausreichend (Polling ist der persistente Pfad). +_check_queues: dict[str, "asyncio.Queue[dict]"] = {} + +_TERMINAL_JOB_STATES = ("completed", "failed", "skipped_tdm") + + +def new_queue(check_id: str) -> None: + """Legt die Event-Queue für einen Check an (in POST /compliance-check).""" + _check_queues[check_id] = asyncio.Queue(maxsize=500) + + +def emit(check_id: str, event: dict) -> None: + """Non-blocking best-effort push. Synchron, damit auch das synchrone + _update() emittieren kann.""" + q = _check_queues.get(check_id) + if q is None: + return + try: + q.put_nowait(event) + except asyncio.QueueFull: + pass # Client zu langsam — Poll holt den Stand nach + + +def _format_sse(payload: dict) -> str: + return f"data: {json.dumps(payload, default=str)}\n\n" + + +async def event_generator(check_id: str) -> AsyncGenerator[str, None]: + """Draint die Queue bis der Check terminal ist. Heartbeat alle 25s.""" + q = _check_queues.get(check_id) + if q is None: + # Check evtl. schon fertig (Queue aufgeräumt) → Client soll pollen. + yield _format_sse({"type": "stream_close", "reason": "no_queue"}) + return + yield _format_sse({"type": "hello", "check_id": check_id}) + try: + while True: + try: + event = await asyncio.wait_for(q.get(), timeout=25.0) + except asyncio.TimeoutError: + yield _format_sse({"type": "heartbeat"}) + job = _compliance_check_jobs.get(check_id) or {} + if job.get("status") in _TERMINAL_JOB_STATES: + yield _format_sse({"type": "complete", + "status": job.get("status")}) + yield _format_sse({"type": "stream_close"}) + return + continue + yield _format_sse(event) + if event.get("type") in ("complete", "error"): + yield _format_sse({"type": "stream_close"}) + return + finally: + # Queue erst nach 5 Min freigeben (späte Reconnects). + asyncio.get_event_loop().call_later( + 300, lambda: _check_queues.pop(check_id, None), + ) diff --git a/backend-compliance/compliance/api/agent_compliance_check_routes.py b/backend-compliance/compliance/api/agent_compliance_check_routes.py index 324326cc..6cdeacf5 100644 --- a/backend-compliance/compliance/api/agent_compliance_check_routes.py +++ b/backend-compliance/compliance/api/agent_compliance_check_routes.py @@ -30,6 +30,7 @@ import uuid as _uuid import httpx from fastapi import APIRouter +from fastapi.responses import StreamingResponse # ── Re-exports: external callers import these from THIS module ────── from .agent_check._constants import ( # noqa: F401 @@ -63,6 +64,7 @@ from .agent_check._schemas import ( ExtractTextRequest, ) from .agent_check._single_check import _check_single # noqa: F401 +from .agent_check._sse import event_generator, new_queue logger = logging.getLogger(__name__) @@ -137,6 +139,7 @@ async def start_compliance_check(req: ComplianceCheckRequest): "result": None, "error": "", } + new_queue(check_id) # SSE: progressive topic-Events fürs Frontend asyncio.create_task(_run_compliance_check(check_id, req)) return ComplianceCheckStartResponse(check_id=check_id, status="running") @@ -157,6 +160,21 @@ async def get_compliance_check_status(check_id: str): ) +@router.get("/compliance-check/{check_id}/stream") +async def stream_compliance_check(check_id: str) -> StreamingResponse: + """SSE-Stream: progressive Events (progress/topic/complete) eines + laufenden Checks. Additiv zum Polling auf /compliance-check/{check_id}.""" + return StreamingResponse( + event_generator(check_id), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", # nginx: nicht puffern + "Connection": "keep-alive", + }, + ) + + # ── P80: Snapshot + Replay ─────────────────────────────────────────── @router.get("/snapshots") diff --git a/backend-compliance/compliance/tests/test_sse_compliance_check.py b/backend-compliance/compliance/tests/test_sse_compliance_check.py new file mode 100644 index 00000000..e1c046c6 --- /dev/null +++ b/backend-compliance/compliance/tests/test_sse_compliance_check.py @@ -0,0 +1,50 @@ +"""Phase 2: SSE-Plumbing für den Compliance-Check. + +Deckt emit (Queue-Push), _format_sse (SSE-Zeilenformat) und den +event_generator (hello → Events → stream_close bei 'complete') ab. +""" + +from __future__ import annotations + +import asyncio + +from compliance.api.agent_check import _sse + + +def test_emit_pushes_and_format(): + cid = "sse-test-1" + _sse.new_queue(cid) + _sse.emit(cid, {"type": "topic", "topic": "impressum", "output": {"x": 1}}) + q = _sse._check_queues[cid] + assert q.qsize() == 1 + ev = q.get_nowait() + assert ev["type"] == "topic" and ev["topic"] == "impressum" + line = _sse._format_sse(ev) + assert line.startswith("data: ") and line.endswith("\n\n") + assert '"impressum"' in line + + +def test_emit_is_noop_without_queue(): + # Kein new_queue → emit darf nicht crashen (best-effort). + _sse.emit("does-not-exist-xyz", {"type": "topic"}) + + +def test_event_generator_streams_topic_then_closes_on_complete(): + cid = "sse-test-gen" + _sse.new_queue(cid) + _sse.emit(cid, {"type": "topic", "topic": "impressum", "output": {}}) + _sse.emit(cid, {"type": "complete", "status": "completed"}) + + async def collect(): + out = [] + async for line in _sse.event_generator(cid): + out.append(line) + if len(out) > 12: # safety + break + return out + + blob = "".join(asyncio.run(collect())) + assert '"type": "hello"' in blob + assert '"topic": "impressum"' in blob + assert '"type": "complete"' in blob + assert '"type": "stream_close"' in blob