Files
breakpilot-compliance/backend-compliance/compliance/api/snapshot_check_routes.py
T
Benjamin Admin 0f6cdc93fd fix(snapshot): Cookie-Dedup + schneller Impressum-Tab + Tabellen-Zahl
- Cookies werden je Vendor nach Name dedupliziert (Consent-Phasen-Dubletten;
  BMW 2196 → ~772) — in cookie-check + get_snapshot, behebt aufgeblähte
  Kachel-/Finding-Zahlen.
- Impressum-Snapshot-Check überspringt den ~40s-LLM-Schritt (context skip_llm)
  → Tab lädt sofort statt leer zu bleiben.
- Vendor-Tabelle zeigt nur die Cookie-Zahl (kein 'Cookies'-Wort je Zeile).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-11 19:54:15 +02:00

113 lines
4.8 KiB
Python

"""Snapshot-getriebene Doc-Check-Endpoints (kein Re-Crawl).
Cookie-Library-Abgleich + v3-Doc-Agenten (Impressum/DSE/AGB …) laufen auf den
gespeicherten Snapshot-Texten. Ausgelagert aus agent_compliance_check_routes.py
(LOC-Budget). Gleicher Router-Prefix → identische Pfade, keine Contract-Änderung.
"""
from __future__ import annotations
import logging
from fastapi import APIRouter, HTTPException
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router for the snapshot-driven check endpoints; every route declared on it
# is served under the "/compliance/agent" prefix.
router = APIRouter(prefix="/compliance/agent", tags=["agent-snapshots"])
async def _run_doc_agent(snapshot_id: str, doc_type: str, agent_id: str) -> dict:
    """Run a registered v3 doc agent against a stored snapshot (no re-crawl).

    Shared by the impressum/dse/agb endpoints of this module.

    Args:
        snapshot_id: Identifier handed to ``load_snapshot``.
        doc_type: Document type used to build the agent input; entries of
            this type are excluded from the cross-document reconciliation.
        agent_id: Key of the agent to look up in ``REGISTRY``.

    Returns:
        The agent output as produced by ``model_dump(mode="json")``, after
        ``reconcile_doc_findings`` has been applied to it. When no agent
        input can be built for ``doc_type``, an empty placeholder result
        with ``confidence`` 0.0 is returned instead.

    Raises:
        HTTPException: 404 if the snapshot does not exist; 500 if no agent
            is registered under ``agent_id``.
    """
    from database import SessionLocal
    from compliance.services.check_snapshot import load_snapshot
    from compliance.services.specialist_agents import REGISTRY, AgentInput
    from compliance.api.agent_check._agent_outputs import doc_input_from_snapshot

    db = SessionLocal()
    try:
        snap = load_snapshot(db, snapshot_id)
        if not snap:
            raise HTTPException(status_code=404, detail="snapshot not found")

        agent_input = doc_input_from_snapshot(snap, doc_type)
        if not agent_input:
            return {"findings": [], "recommendations": [], "mc_coverage": [],
                    "notes": f"kein {doc_type}-Text im Snapshot", "confidence": 0.0}

        # Fail with an explicit, logged error instead of an AttributeError on
        # None when the lookup yields nothing for this id.
        agent = REGISTRY.get(agent_id)
        if agent is None:
            logger.error("doc agent %r is not registered", agent_id)
            raise HTTPException(
                status_code=500,
                detail=f"doc agent not registered: {agent_id}",
            )

        # NOTE(review): the DB session stays open while the agent is awaited.
        # If load_snapshot returns plain detached data, the session could be
        # released before this call -- confirm before changing.
        out = await agent.evaluate(AgentInput(**agent_input))
        result = out.model_dump(mode="json")

        # Cross-doc reconciliation: obligations that are fulfilled in ANOTHER
        # document (e.g. § 36 VSBG / OS link in the AGB/legal text) should not
        # be shown as findings. The conservative allowlist lives in
        # cross_doc_reconcile.
        from compliance.services.cross_doc_reconcile import reconcile_doc_findings

        other = []
        for entry in snap.get("doc_entries") or []:
            if entry.get("doc_type") == doc_type:
                continue
            text = entry.get("text") or entry.get("content")
            if text:  # entries without any text are skipped
                other.append((entry.get("doc_type"), text))

        # Called for its effect on ``result``; its return value is not used.
        reconcile_doc_findings(result, agent_id, other)
        return result
    finally:
        db.close()
@router.get("/snapshots/{snapshot_id}/cookie-check")
async def snapshot_cookie_check(snapshot_id: str):
    """Check the cookies of a stored snapshot's vendors against the cookie library.

    The response is the ``analyze_cookies`` result, extended with a
    ``storage_inventory`` entry and a ``drift`` entry (cookie-policy text vs.
    the stored banner result). A storage-transparency finding, if one is
    produced, is placed first in ``findings`` and the summary count is
    refreshed.

    Raises:
        HTTPException: 404 if the snapshot does not exist.
    """
    from database import SessionLocal
    from compliance.services.check_snapshot import load_snapshot
    from compliance.services.cookie_library_check import (
        analyze_cookies, load_big_library,
    )
    from compliance.services.cookie_storage_inventory import (
        build_storage_inventory, storage_transparency_finding,
        dedupe_vendor_cookies,
    )
    from compliance.services.cookie_compliance_audit import (
        audit_cookie_compliance,
    )

    # NOTE(review): the calls below look synchronous although this handler is
    # declared async -- confirm they do not block the event loop noticeably.
    session = SessionLocal()
    try:
        snapshot = load_snapshot(session, snapshot_id)
        if not snapshot:
            raise HTTPException(status_code=404, detail="snapshot not found")

        # Consent phases duplicate cookies, so de-duplicate by name per vendor.
        raw_vendors = snapshot.get("cmp_vendors") or []
        deduped_vendors = dedupe_vendor_cookies(raw_vendors)

        cookie_names = []
        for vendor in deduped_vendors:
            for cookie in vendor.get("cookies") or []:
                cookie_names.append(cookie.get("name", ""))

        library = load_big_library(session, cookie_names)
        report = analyze_cookies(deduped_vendors, library)

        inventory = build_storage_inventory(deduped_vendors)
        transparency = storage_transparency_finding(inventory)
        if transparency:
            findings = report["findings"]
            findings.insert(0, transparency)
            report["summary"]["findings"] = len(findings)
        report["storage_inventory"] = inventory

        # Documentation drift: cookie policy (text) vs. what the browser saw.
        # The first cookie-policy entry wins, even if its text is empty.
        policy_text = ""
        for entry in snapshot.get("doc_entries") or []:
            if entry.get("doc_type") in ("cookie", "cookie_richtlinie", "cookies"):
                policy_text = entry.get("text") or entry.get("content") or ""
                break

        banner = snapshot.get("banner_result")
        report["drift"] = audit_cookie_compliance(session, policy_text, banner)
        return report
    finally:
        session.close()
@router.get("/snapshots/{snapshot_id}/impressum-check")
async def snapshot_impressum_check(snapshot_id: str):
    """Analyse the stored Impressum text of a snapshot with the "impressum" agent."""
    report = await _run_doc_agent(snapshot_id, "impressum", "impressum")
    return report
@router.get("/snapshots/{snapshot_id}/dse-check")
async def snapshot_dse_check(snapshot_id: str):
    """Analyse the stored DSE (privacy policy) text of a snapshot with the "dse" agent."""
    report = await _run_doc_agent(snapshot_id, "dse", "dse")
    return report
@router.get("/snapshots/{snapshot_id}/agb-check")
async def snapshot_agb_check(snapshot_id: str):
    """Analyse the stored AGB (terms and conditions) text of a snapshot with the "agb" agent."""
    report = await _run_doc_agent(snapshot_id, "agb", "agb")
    return report