feat(agents): SSE-Endpoint + Agent-Test-Tab (5-URL parallel)
CI / detect-changes (push) Successful in 7s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Failing after 4s
CI / validate-canonical-controls (push) Successful in 12s
CI / loc-budget (push) Successful in 14s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 2m24s
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 29s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped

Backend:
- specialist_agent_routes.py: GET /agents, POST /test/start (run_id),
  GET /test/stream/{run_id} (SSE), GET /run/{run_id}/result,
  GET /run/{run_id}/artifacts, GET /run/{run_id}/artifact/{path},
  DELETE /run/{run_id}, GET /runs.
- Per-URL async orchestrator: text fetch via consent-tester
  dsi-discovery → agent.evaluate() → vault.put_json + stream events.
- Tests: 7/7 grün.

Frontend:
- /api/sdk/v1/specialist-agent proxy mit SSE-passthrough.
- AgentTestTab.tsx: Agent-Wähler + 5 URL-Slots + Live-Events +
  Speedometer (OK/N-A/HIGH/MEDIUM/LOW) + Findings + Recommendations +
  Eskalations-Log + Artefakt-Link pro Slot.
- Neuer Tab "Agent-Test" in /sdk/agent.

User-Wunsch 2026-06-08: pro Agent isoliert testen, 5 URLs gleichzeitig,
Live-Updates statt Polling-Wartespiel.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-08 17:47:05 +02:00
parent f4357a2e9b
commit 3ae4e60c9d
6 changed files with 1064 additions and 1 deletions
@@ -74,6 +74,7 @@ _ROUTER_MODULES = [
"founding_wizard_routes",
"licenses_routes",
"template_rule_routes",
"specialist_agent_routes",
]
_loaded_count = 0
@@ -0,0 +1,378 @@
"""SSE-Endpoint für den Agent-Test-Harness.
User-Vorgabe 2026-06-08: pro Agent isoliert testen mit z.B. 5 URLs
gleichzeitig. Live-Stream der Events ins Frontend.
Endpoints:
GET /specialist-agent/agents
POST /specialist-agent/test/start { agent_id, urls }
GET /specialist-agent/test/stream/{run_id} → SSE-Stream
GET /specialist-agent/run/{run_id}/artifacts
GET /specialist-agent/run/{run_id}/artifact/{relpath}
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import uuid
from collections.abc import AsyncGenerator
from typing import Any
import httpx
from fastapi import APIRouter, HTTPException
from fastapi.responses import FileResponse, StreamingResponse
from pydantic import BaseModel, Field
from compliance.services.specialist_agents import REGISTRY, AgentInput
from compliance.services.specialist_agents._evidence_vault import (
EvidenceVault,
delete_run as vault_delete_run,
list_runs as vault_list_runs,
)
logger = logging.getLogger(__name__)
CONSENT_TESTER_URL = os.environ.get(
"CONSENT_TESTER_URL",
"http://bp-compliance-consent-tester:8094",
)
router = APIRouter(prefix="/specialist-agent", tags=["specialist-agent"])
# In-memory event-queues pro run_id. Restart-fragil aber für ein
# Live-Test-Tool ausreichend (keine Persistenz nötig).
_run_queues: dict[str, asyncio.Queue] = {}
_run_states: dict[str, dict[str, Any]] = {}
class TestStartRequest(BaseModel):
agent_id: str
urls: list[str] = Field(default_factory=list, max_length=10)
raw_texts: list[str] = Field(default_factory=list, max_length=10)
business_scope: list[str] = Field(default_factory=list)
company_name: str = ""
origin_domain: str = ""
class TestStartResponse(BaseModel):
run_id: str
agent_id: str
slot_count: int
@router.get("/agents")
async def list_agents() -> dict[str, Any]:
"""Liefert die registrierten Specialist-Agenten."""
return {"agents": REGISTRY.list_agents()}
@router.post("/test/start", response_model=TestStartResponse)
async def start_test(req: TestStartRequest) -> TestStartResponse:
"""Startet einen Multi-URL-Test gegen einen Agent.
Liefert eine run_id zurück. Der Frontend-Client öffnet danach
einen SSE-Stream auf /test/stream/{run_id} um Events zu empfangen.
"""
agent = REGISTRY.get(req.agent_id)
if agent is None:
raise HTTPException(404, f"agent '{req.agent_id}' nicht registriert")
slots = max(len(req.urls), len(req.raw_texts))
if slots == 0:
raise HTTPException(400, "urls oder raw_texts dürfen nicht leer sein")
run_id = uuid.uuid4().hex[:16]
queue: asyncio.Queue = asyncio.Queue(maxsize=500)
_run_queues[run_id] = queue
_run_states[run_id] = {
"agent_id": req.agent_id,
"started": False,
"finished": False,
"slot_count": slots,
"results": {},
}
vault = EvidenceVault(agent.agent_id, agent.agent_version,
run_id=run_id)
asyncio.create_task(_run_test_orchestrator(run_id, req, vault))
return TestStartResponse(
run_id=run_id,
agent_id=req.agent_id,
slot_count=slots,
)
@router.get("/test/stream/{run_id}")
async def stream_test(run_id: str) -> StreamingResponse:
"""SSE-Stream der Events für einen laufenden Test."""
if run_id not in _run_queues:
raise HTTPException(404, "run_id unbekannt")
return StreamingResponse(
_event_generator(run_id),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # nginx
"Connection": "keep-alive",
},
)
async def _event_generator(run_id: str) -> AsyncGenerator[str, None]:
"""Reads events from the queue until the run is finished."""
queue = _run_queues[run_id]
# Initial hello
yield _format_sse({"type": "hello", "run_id": run_id})
try:
while True:
try:
event = await asyncio.wait_for(queue.get(), timeout=30.0)
except asyncio.TimeoutError:
# heartbeat
yield _format_sse({"type": "heartbeat"})
if _run_states.get(run_id, {}).get("finished"):
yield _format_sse({"type": "stream_close"})
return
continue
yield _format_sse(event)
if event.get("type") in ("run_complete", "run_error"):
yield _format_sse({"type": "stream_close"})
return
finally:
# Defer cleanup: keep state for 5 min so late GETs can read
# results from _run_states. The queue can be released earlier.
asyncio.get_event_loop().call_later(
300, lambda: _run_queues.pop(run_id, None),
)
def _format_sse(payload: dict) -> str:
"""SSE event line format."""
return f"data: {json.dumps(payload, default=str)}\n\n"
async def _emit(run_id: str, event: dict) -> None:
q = _run_queues.get(run_id)
if q is None:
return
try:
await q.put(event)
except Exception:
pass
async def _run_test_orchestrator(
run_id: str,
req: TestStartRequest,
vault: EvidenceVault,
) -> None:
"""Kernlogik: pro URL / raw_text parallel den Agent feuern."""
agent = REGISTRY.get(req.agent_id)
if agent is None:
await _emit(run_id, {"type": "run_error",
"error": "agent gone"})
return
_run_states[run_id]["started"] = True
await _emit(run_id, {
"type": "run_started",
"agent_id": agent.agent_id,
"agent_version": agent.agent_version,
"slot_count": _run_states[run_id]["slot_count"],
})
slot_jobs: list[asyncio.Task] = []
# URLs first, then raw_texts. Slots numbered url1, url2, …, text1, …
for i, url in enumerate(req.urls, start=1):
slot = f"url{i}"
slot_jobs.append(asyncio.create_task(
_process_slot(run_id, slot, agent, url, "", req, vault),
))
for j, raw in enumerate(req.raw_texts, start=1):
slot = f"text{j}"
slot_jobs.append(asyncio.create_task(
_process_slot(run_id, slot, agent, "", raw, req, vault),
))
try:
await asyncio.gather(*slot_jobs, return_exceptions=True)
finally:
manifest = vault.finalize()
_run_states[run_id]["finished"] = True
await _emit(run_id, {
"type": "run_complete",
"vault_url": vault.url(),
"manifest_asset_count": len(manifest.get("assets") or []),
})
async def _process_slot(
run_id: str,
slot: str,
agent,
url: str,
raw_text: str,
req: TestStartRequest,
vault: EvidenceVault,
) -> None:
"""Holt den Text (URL oder raw), ruft Agent, vault-speichert Output."""
label = url or f"text-slot-{slot}"
await _emit(run_id, {"type": "slot_started", "slot": slot,
"label": label})
text = raw_text
fetch_err = ""
if url and not raw_text:
await _emit(run_id, {"type": "slot_fetching",
"slot": slot, "url": url})
text, fetch_err = await _fetch_text(url)
if fetch_err:
await _emit(run_id, {
"type": "slot_fetch_error",
"slot": slot,
"error": fetch_err,
})
if text:
vault.put_bytes("raw", slot, "source.txt",
text.encode("utf-8"),
mime="text/plain")
await _emit(run_id, {
"type": "slot_text_ready",
"slot": slot,
"char_count": len(text),
})
agent_input = AgentInput(
doc_type=agent.doc_type,
text=text,
url=url,
business_scope=req.business_scope,
company_name=req.company_name,
origin_domain=req.origin_domain,
)
await _emit(run_id, {"type": "slot_agent_running", "slot": slot})
try:
output = await agent.evaluate(agent_input)
except Exception as e:
logger.exception("agent crashed slot=%s", slot)
await _emit(run_id, {
"type": "slot_agent_error", "slot": slot,
"error": f"{type(e).__name__}: {str(e)[:160]}",
})
return
# Persist findings as JSON in vault
vault.put_json("finding", slot, "output.json",
json.loads(output.model_dump_json()))
# Update state for later /artifacts query
_run_states[run_id]["results"][slot] = json.loads(
output.model_dump_json(),
)
# Stream finding-emitted events
for f in output.findings:
await _emit(run_id, {
"type": "finding",
"slot": slot,
"check_id": f.check_id,
"severity": f.severity,
"title": f.title,
"field_id": f.field_id,
})
for esc in output.escalation_log:
await _emit(run_id, {
"type": "escalation",
"slot": slot,
"stage": esc.stage,
"model": esc.model,
"success": esc.success,
"duration_ms": esc.duration_ms,
})
await _emit(run_id, {
"type": "slot_complete",
"slot": slot,
"duration_ms": output.duration_ms,
"mc_total": output.mc_total,
"mc_ok": output.mc_ok,
"mc_na": output.mc_na,
"mc_high": output.mc_high,
"mc_medium": output.mc_medium,
"mc_low": output.mc_low,
"findings_count": len(output.findings),
"recommendations_count": len(output.recommendations),
"confidence": output.confidence,
})
async def _fetch_text(url: str) -> tuple[str, str]:
"""Nutzt den consent-tester DSI-Discovery für Volltext."""
try:
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(
f"{CONSENT_TESTER_URL}/dsi-discovery",
json={"url": url, "max_documents": 5},
timeout=120.0,
)
if resp.status_code != 200:
return "", f"HTTP {resp.status_code}"
data = resp.json()
docs = data.get("documents", []) or []
if not docs:
return "", "no documents discovered"
texts: list[str] = []
for doc in docs:
t = (doc.get("full_text", "") or
doc.get("text_preview", "") or "")
if t and len(t) > 50:
texts.append(t)
return "\n\n".join(texts), ""
except Exception as e:
return "", f"{type(e).__name__}: {str(e)[:160]}"
# ── Run / Vault Queries ──────────────────────────────────────────────
@router.get("/run/{run_id}/result")
async def get_run_result(run_id: str) -> dict[str, Any]:
"""Komplette Ergebnisse eines Runs (für Frontend-Refresh)."""
state = _run_states.get(run_id)
if state is None:
raise HTTPException(404, "run unbekannt")
return {
"run_id": run_id,
"agent_id": state["agent_id"],
"finished": state["finished"],
"results": state["results"],
"vault_url": f"/api/v1/specialist-agent/run/{run_id}/artifacts",
}
@router.get("/run/{run_id}/artifacts")
async def list_run_artifacts(run_id: str) -> dict[str, Any]:
"""Listet die Assets eines Runs."""
vault = EvidenceVault("?", "?", run_id=run_id)
return {
"run_id": run_id,
"manifest": vault._manifest,
}
@router.get("/run/{run_id}/artifact/{path:path}")
async def get_run_artifact(run_id: str, path: str):
"""Liefert ein einzelnes Artefakt aus dem Vault."""
vault = EvidenceVault("?", "?", run_id=run_id)
p = vault.asset_path(path)
if p is None:
raise HTTPException(404, "asset not found")
return FileResponse(str(p))
@router.delete("/run/{run_id}")
async def delete_run(run_id: str) -> dict[str, bool]:
"""DSR Art. 17: löscht den ganzen Run + Vault."""
deleted_vault = vault_delete_run(run_id)
_run_queues.pop(run_id, None)
_run_states.pop(run_id, None)
return {"deleted": deleted_vault}
@router.get("/runs")
async def list_runs(limit: int = 20) -> dict[str, Any]:
"""Listet die letzten Runs im Vault."""
return {"runs": vault_list_runs(limit)}
@@ -0,0 +1,129 @@
"""Tests für SSE-Endpoints des Specialist-Agent-Test-Harness."""
from __future__ import annotations
import asyncio
import json
from unittest.mock import AsyncMock, patch
import pytest
from fastapi.testclient import TestClient
@pytest.fixture
def app(tmp_path, monkeypatch):
monkeypatch.setenv("EVIDENCE_VAULT_ROOT", str(tmp_path / "vault"))
from fastapi import FastAPI
from compliance.api.specialist_agent_routes import router
app = FastAPI()
app.include_router(router, prefix="/api/v1")
return app
@pytest.fixture
def client(app):
return TestClient(app)
def test_list_agents(client):
r = client.get("/api/v1/specialist-agent/agents")
assert r.status_code == 200
data = r.json()
agent_ids = {a["agent_id"] for a in data["agents"]}
assert "impressum" in agent_ids
assert "cookie_policy" in agent_ids
def test_start_test_invalid_agent(client):
r = client.post("/api/v1/specialist-agent/test/start",
json={"agent_id": "ghost",
"raw_texts": ["test"]})
assert r.status_code == 404
def test_start_test_no_input(client):
r = client.post("/api/v1/specialist-agent/test/start",
json={"agent_id": "impressum"})
assert r.status_code == 400
def test_start_test_with_raw_text(client):
r = client.post("/api/v1/specialist-agent/test/start",
json={"agent_id": "impressum",
"raw_texts": ["Tesla Germany GmbH "
"Berlin Email: x@y.com "
"HRB 123 Charlottenburg"]})
assert r.status_code == 200
data = r.json()
assert data["agent_id"] == "impressum"
assert data["slot_count"] == 1
assert data["run_id"]
def test_stream_unknown_run(client):
r = client.get("/api/v1/specialist-agent/test/stream/ghost")
assert r.status_code == 404
def test_run_result_after_text_input(client, monkeypatch):
# Skip LLM
async def _no_cascade(*a, **kw): return None, []
monkeypatch.setattr(
"compliance.services.specialist_agents.impressum.agent.cascade",
_no_cascade,
)
r = client.post("/api/v1/specialist-agent/test/start",
json={"agent_id": "impressum",
"raw_texts": [
"Tesla Germany GmbH\nLudwig-Prandtl-Strasse 25\n"
"12526 Berlin\nDeutschland\nEmail: x@y.com\n"
"Tel: +49 89 1250 16 800\n"
"Management: Elon Musk\n"
"HRB 218904 B Charlottenburg",
]})
run_id = r.json()["run_id"]
# Give async task time to finish (small text → fast)
for _ in range(40):
rr = client.get(
f"/api/v1/specialist-agent/run/{run_id}/result",
)
if rr.json().get("finished"):
break
import time; time.sleep(0.05)
res = client.get(f"/api/v1/specialist-agent/run/{run_id}/result")
body = res.json()
assert body["finished"]
assert "text1" in body["results"]
out = body["results"]["text1"]
field_ids = {f["field_id"] for f in out["findings"]}
# Tesla pattern: German-label fehlt + USt fehlt
assert "vertretungsberechtigte_label_korrekt" in field_ids
def test_artifacts_listing(client, monkeypatch):
async def _no_cascade(*a, **kw): return None, []
monkeypatch.setattr(
"compliance.services.specialist_agents.impressum.agent.cascade",
_no_cascade,
)
r = client.post("/api/v1/specialist-agent/test/start",
json={"agent_id": "impressum",
"raw_texts": ["Tesla Germany GmbH "
"Berlin Email: x@y.com "
"HRB 123 Charlottenburg"]})
run_id = r.json()["run_id"]
for _ in range(40):
rr = client.get(
f"/api/v1/specialist-agent/run/{run_id}/result",
)
if rr.json().get("finished"):
break
import time; time.sleep(0.05)
arts = client.get(
f"/api/v1/specialist-agent/run/{run_id}/artifacts",
)
assert arts.status_code == 200
manifest = arts.json()["manifest"]
kinds = {a["kind"] for a in manifest["assets"]}
assert "finding" in kinds
assert "raw" in kinds