d720db07dd
Firmen-tauglicher Bericht aus den Snapshot-Modulergebnissen (kein Re-Crawl, kein LLM): Einleitung, Testumfang+Methodik, Management-Summary (4-Status), Detail- befunde je Modul, Maßnahmen, Rechtlicher Hinweis. Co-Pilot-Tonalität, Tracking- statt Cookie-Rohzahl, Norm nur referenziert (kein Normtext). - audit_report.py: assemble_report (pur) + render_markdown + render_pdf (reportlab) - snapshot_check_routes: GET /report (struktur+md) + GET /report.pdf - Frontend: AuditReportTab + Proxys (report, report/pdf) + "Bericht"-Tab - Tests: 5 Assembler (compliance/tests → CI-geprüft) + 1 Vitest Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
289 lines
12 KiB
Python
289 lines
12 KiB
Python
"""Snapshot-getriebene Doc-Check-Endpoints (kein Re-Crawl).
|
||
|
||
Cookie-Library-Abgleich + v3-Doc-Agenten (Impressum/DSE/AGB …) laufen auf den
|
||
gespeicherten Snapshot-Texten. Ausgelagert aus agent_compliance_check_routes.py
|
||
(LOC-Budget). Gleicher Router-Prefix → identische Pfade, keine Contract-Änderung.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
from typing import List, Optional
|
||
|
||
from fastapi import APIRouter, HTTPException
|
||
from pydantic import BaseModel
|
||
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Every route declared on this router gets the "/compliance/agent" prefix and
# the "agent-snapshots" tag.
router = APIRouter(prefix="/compliance/agent", tags=["agent-snapshots"])
|
||
|
||
# Defined locally, mirroring agent_doc_check_routes/vendor_assessment_routes
# (avoids an import cycle).
CONSENT_TESTER_URL = "http://bp-compliance-consent-tester:8094"

# Profile set for the browser matrix: 4 default engines (all architectures)
# plus 3 real browsers (amd64-only, prod only). On arm64 dev machines the 3
# extras fail fast at launch → an error row in the matrix (the frontend then
# shows "nicht verfügbar").
_BROWSER_MATRIX_PROFILES = [
    # Default engines.
    "chromium-headed-de",
    "firefox-headed-de",
    "webkit-headed-de",
    "iphone-mobile-safari-de",
    # Real browsers.
    "brave-default-de",
    "chrome-channel-desktop-de",
    "edge-channel-desktop-de",
]
|
||
|
||
|
||
class BrowserBehaviorRunRequest(BaseModel):
    """Optional request body of the on-demand browser-behaviour run."""

    # Profiles to scan. When None or empty, the run endpoint falls back to
    # _BROWSER_MATRIX_PROFILES.
    browser_profiles: Optional[List[str]] = None
    # Forwarded unchanged to the consent-tester as "timeout_per_phase".
    # NOTE(review): the unit is not visible in this file — presumably seconds; confirm.
    timeout_per_phase: int = 10
|
||
|
||
|
||
def _snapshot_scan_url(snap: dict) -> str:
    """Derive a scannable homepage URL from a snapshot.

    The origin (``scheme://netloc``) of the first ``doc_entries`` URL that has
    both a scheme and a host wins. Otherwise ``https://<site_domain>`` is
    used, unless the domain is empty or the placeholder ``"unknown"``, in
    which case an empty string is returned.
    """
    from urllib.parse import urlparse

    for entry in snap.get("doc_entries") or []:
        raw_url = (entry.get("url") or "").strip()
        if not raw_url:
            continue
        parsed = urlparse(raw_url)
        # Relative or otherwise host-less URLs cannot serve as a scan target.
        if parsed.scheme and parsed.netloc:
            return f"{parsed.scheme}://{parsed.netloc}"

    domain = (snap.get("site_domain") or "").strip()
    if not domain or domain == "unknown":
        return ""
    return f"https://{domain}"
|
||
|
||
|
||
async def _run_doc_agent(snapshot_id: str, doc_type: str, agent_id: str) -> dict:
    """Evaluate a registered v3 doc agent on one stored document type.

    Loads the snapshot, builds the agent input for ``doc_type`` and runs the
    agent registered under ``agent_id``. Shared by the impressum/dse/agb
    endpoints; it only works on stored snapshot data (no re-crawl).

    Returns:
        The agent output dumped in JSON mode, after it has been handed to
        ``reconcile_doc_findings``. When no agent input can be built for
        ``doc_type``, an empty result carrying an explanatory ``notes`` field.

    Raises:
        HTTPException: 404 when the snapshot does not exist.
    """
    from database import SessionLocal
    from compliance.services.check_snapshot import load_snapshot
    from compliance.services.specialist_agents import REGISTRY, AgentInput
    from compliance.api.agent_check._agent_outputs import doc_input_from_snapshot

    session = SessionLocal()
    try:
        snapshot = load_snapshot(session, snapshot_id)
        if not snapshot:
            raise HTTPException(status_code=404, detail="snapshot not found")

        input_kwargs = doc_input_from_snapshot(snapshot, doc_type)
        if not input_kwargs:
            empty_result = {
                "findings": [],
                "recommendations": [],
                "mc_coverage": [],
                "notes": f"kein {doc_type}-Text im Snapshot",
                "confidence": 0.0,
            }
            return empty_result

        agent = REGISTRY.get(agent_id)
        agent_output = await agent.evaluate(AgentInput(**input_kwargs))
        result = agent_output.model_dump(mode="json")

        # Cross-doc reconciliation: obligations that are fulfilled in ANOTHER
        # document (e.g. § 36 VSBG / ODR link in the AGB/legal page) should not
        # show up as findings. The conservative allowlist lives in
        # cross_doc_reconcile.
        from compliance.services.cross_doc_reconcile import reconcile_doc_findings

        other_docs = []
        for entry in snapshot.get("doc_entries") or []:
            if entry.get("doc_type") == doc_type:
                continue
            body = entry.get("text") or entry.get("content")
            if body:
                other_docs.append((entry.get("doc_type"), body))

        # Called for its effect on `result`; the return value is not used.
        reconcile_doc_findings(result, agent_id, other_docs)
        return result
    finally:
        session.close()
|
||
|
||
|
||
@router.get("/snapshots/{snapshot_id}/cookie-check")
async def snapshot_cookie_check(snapshot_id: str):
    """Per-cookie comparison of the snapshot's vendors against cookie_knowledge_db.

    On top of the library analysis the result carries a storage inventory
    (``storage_inventory``), a declaration-vs-library diff
    (``declaration_diff``) and a documentation-drift audit (``drift``) of the
    stored cookie-policy text against the stored banner result.

    Raises:
        HTTPException: 404 when the snapshot does not exist.
    """
    from database import SessionLocal
    from compliance.services.check_snapshot import load_snapshot
    from compliance.services.cookie_library_check import (
        analyze_cookies,
        load_big_library,
    )
    from compliance.services.cookie_storage_inventory import (
        build_storage_inventory,
        storage_transparency_finding,
        dedupe_vendor_cookies,
    )
    from compliance.services.cookie_compliance_audit import (
        audit_cookie_compliance,
    )

    session = SessionLocal()
    try:
        snapshot = load_snapshot(session, snapshot_id)
        if not snapshot:
            raise HTTPException(status_code=404, detail="snapshot not found")

        # Consent phases duplicate cookies → dedupe per vendor by name.
        deduped_vendors = dedupe_vendor_cookies(snapshot.get("cmp_vendors") or [])
        cookie_names = []
        for vendor in deduped_vendors:
            for cookie in vendor.get("cookies") or []:
                cookie_names.append(cookie.get("name", ""))

        library = load_big_library(session, cookie_names)
        result = analyze_cookies(deduped_vendors, library)

        inventory = build_storage_inventory(deduped_vendors)
        transparency = storage_transparency_finding(inventory)
        if transparency:
            # Put the transparency finding first and keep the count in sync.
            result["findings"].insert(0, transparency)
            result["summary"]["findings"] = len(result["findings"])
        result["storage_inventory"] = inventory

        # Declaration-vs-library diff (only the matched subset) + funnel.
        from compliance.services.cookie_declaration_diff import (
            build_declaration_diff,
        )

        result["declaration_diff"] = build_declaration_diff(result)

        # Documentation drift: cookie policy (text) vs. browser reality.
        # The first cookie-policy entry wins, even if its text is empty.
        cookie_text = ""
        for entry in snapshot.get("doc_entries") or []:
            if entry.get("doc_type") in ("cookie", "cookie_richtlinie", "cookies"):
                cookie_text = entry.get("text") or entry.get("content") or ""
                break

        result["drift"] = audit_cookie_compliance(
            session, cookie_text, snapshot.get("banner_result"))
        return result
    finally:
        session.close()
|
||
|
||
|
||
@router.get("/snapshots/{snapshot_id}/impressum-check")
async def snapshot_impressum_check(snapshot_id: str):
    """Impressum analysis (v3 ImpressumAgent) on the stored snapshot text."""
    result = await _run_doc_agent(
        snapshot_id, doc_type="impressum", agent_id="impressum")
    return result
|
||
|
||
|
||
@router.get("/snapshots/{snapshot_id}/dse-check")
async def snapshot_dse_check(snapshot_id: str):
    """DSE analysis (curated DSEAgent, Art. 13/14) on the stored snapshot text."""
    result = await _run_doc_agent(snapshot_id, doc_type="dse", agent_id="dse")
    return result
|
||
|
||
|
||
@router.get("/snapshots/{snapshot_id}/agb-check")
async def snapshot_agb_check(snapshot_id: str):
    """AGB analysis (curated AGBAgent, §§ 305 ff. BGB) on the stored snapshot text."""
    result = await _run_doc_agent(snapshot_id, doc_type="agb", agent_id="agb")
    return result
|
||
|
||
|
||
@router.post("/snapshots/{snapshot_id}/browser-behavior/run")
async def snapshot_browser_behavior_run(
    snapshot_id: str, req: Optional[BrowserBehaviorRunRequest] = None,
):
    """Run the browser-behaviour matrix LIVE and persist it on the snapshot.

    On-demand only: the site is re-crawled once per engine (browser behaviour
    can only be measured live) and the result is stored migration-free in the
    snapshot (banner_result.browser_matrix). Expensive (several browsers × 3
    phases) → deliberately triggered by a button, not on every check.

    Args:
        snapshot_id: Snapshot whose site is scanned.
        req: Optional overrides. Without a body (or with an empty profile
            list) the default profile set and a per-phase timeout of 10 are
            sent to the consent-tester.

    Returns:
        The matrix returned by the consent-tester, extended by freshly
        derived ``cross_findings``.

    Raises:
        HTTPException: 404 when the snapshot does not exist, 422 when no
            scannable URL can be derived from it, 502 when the consent-tester
            call fails or does not return a JSON object.
    """
    import httpx
    from database import SessionLocal
    from compliance.services.check_snapshot import (
        load_snapshot, update_browser_matrix,
    )

    # Read the snapshot and release the session right away: the scan below can
    # take minutes, and an open session should not be held idle across it.
    db = SessionLocal()
    try:
        snap = load_snapshot(db, snapshot_id)
    finally:
        db.close()
    if not snap:
        raise HTTPException(status_code=404, detail="snapshot not found")
    url = _snapshot_scan_url(snap)
    if not url:
        raise HTTPException(
            status_code=422, detail="keine scanbare URL im Snapshot")

    profiles = (req.browser_profiles if req and req.browser_profiles
                else list(_BROWSER_MATRIX_PROFILES))
    payload = {
        "url": url, "browser_profiles": profiles,
        "timeout_per_phase": req.timeout_per_phase if req else 10,
    }
    try:
        async with httpx.AsyncClient(timeout=360.0) as client:
            r = await client.post(
                f"{CONSENT_TESTER_URL}/scan-matrix", json=payload)
            r.raise_for_status()
            matrix = r.json()
        # Anything but a JSON object cannot take "cross_findings" below and
        # must not be persisted as a matrix.
        if not isinstance(matrix, dict):
            raise ValueError("unexpected non-object JSON response")
    except Exception as e:
        # Boundary to an external service: every failure becomes a 502, with
        # the original exception kept as the cause.
        logger.warning("browser-matrix scan failed for %s: %s",
                       snapshot_id, e)
        raise HTTPException(
            status_code=502,
            detail=f"consent-tester /scan-matrix fehlgeschlagen: {e}") from e

    # Fresh, short-lived session just for the write.
    db = SessionLocal()
    try:
        update_browser_matrix(db, snapshot_id, matrix)
    finally:
        db.close()

    # Derive the cross-browser findings fresh (deterministic view, not
    # persisted → the GET endpoint recomputes them identically).
    from compliance.services.browser_cross_finding import build_cross_findings
    matrix["cross_findings"] = build_cross_findings(matrix)
    return matrix
|
||
|
||
|
||
@router.get("/snapshots/{snapshot_id}/browser-behavior")
async def snapshot_browser_behavior(snapshot_id: str):
    """Return the persisted browser matrix (no re-crawl).

    ``browser_matrix`` is null as long as the on-demand run has never been
    triggered. A stored matrix gets its ``cross_findings`` recomputed on every
    call.
    """
    from database import SessionLocal
    from compliance.services.check_snapshot import load_browser_matrix
    from compliance.services.browser_cross_finding import build_cross_findings

    session = SessionLocal()
    try:
        stored = load_browser_matrix(session, snapshot_id)
        if stored:
            stored["cross_findings"] = build_cross_findings(stored)
        response = {"browser_matrix": stored}
        return response
    finally:
        session.close()
|
||
|
||
|
||
async def _gather_report(snapshot_id: str):
    """Collect every module result of a snapshot for the audit report.

    Works on stored data only (no re-crawl). Modules run best-effort: a
    failing module is logged and simply left out of the result.

    Returns:
        A ``(meta, modules)`` tuple. ``meta`` holds the snapshot's label,
        domain, creation time, check id and scan context; ``modules`` maps
        module names ("cookie", "impressum", "dse", "agb", "browser") to
        their results.

    Raises:
        HTTPException: 404 when the snapshot does not exist.
    """
    from database import SessionLocal
    from compliance.services.check_snapshot import (
        load_snapshot, load_browser_matrix,
    )
    from compliance.services.browser_cross_finding import build_cross_findings

    session = SessionLocal()
    try:
        snapshot = load_snapshot(session, snapshot_id)
        if not snapshot:
            raise HTTPException(status_code=404, detail="snapshot not found")
        meta = {
            key: snapshot.get(key)
            for key in ("site_label", "site_domain", "created_at",
                        "check_id", "scan_context")
        }
        matrix = load_browser_matrix(session, snapshot_id)
    finally:
        # The session is released before the modules run; they open their own.
        session.close()

    entries = snapshot.get("doc_entries") or []

    def _has_text(wanted: str) -> bool:
        # A document counts as present only with more than 100 characters.
        for entry in entries:
            if entry.get("doc_type") != wanted:
                continue
            if len(entry.get("text") or entry.get("content") or "") > 100:
                return True
        return False

    modules: dict = {}

    if snapshot.get("cmp_vendors"):
        try:
            modules["cookie"] = await snapshot_cookie_check(snapshot_id)
        except Exception as exc:
            logger.warning("report cookie failed: %s", exc)

    # Doc type and agent id are identical for all three doc agents.
    for doc_type in ("impressum", "dse", "agb"):
        if not _has_text(doc_type):
            continue
        try:
            modules[doc_type] = await _run_doc_agent(
                snapshot_id, doc_type, doc_type)
        except Exception as exc:
            logger.warning("report %s failed: %s", doc_type, exc)

    if matrix:
        modules["browser"] = {
            "browser_matrix": matrix,
            "cross_findings": build_cross_findings(matrix),
        }

    return meta, modules
|
||
|
||
|
||
@router.get("/snapshots/{snapshot_id}/report")
async def snapshot_report(snapshot_id: str):
    """Deterministic audit text report (structured + Markdown).

    Built from the snapshot's module results — no re-crawl, no LLM.
    """
    from compliance.services.audit_report import assemble_report, render_markdown

    meta, modules = await _gather_report(snapshot_id)
    structured = assemble_report(meta, modules)
    markdown = render_markdown(structured)
    return {"report": structured, "markdown": markdown}
|
||
|
||
|
||
@router.get("/snapshots/{snapshot_id}/report.pdf")
async def snapshot_report_pdf(snapshot_id: str):
    """Print-ready PDF of the audit report (reportlab).

    The download is named ``audit-<site_domain>.pdf``; without a usable
    domain it is named ``audit-report.pdf``.
    """
    import re

    from fastapi import Response
    from compliance.services.audit_report import assemble_report, render_pdf

    meta, modules = await _gather_report(snapshot_id)
    pdf = render_pdf(assemble_report(meta, modules))
    # The domain comes from stored scan data and ends up inside a quoted header
    # value. Anything outside a conservative ASCII set (quotes, slashes,
    # whitespace, CR/LF, non-ASCII IDN characters) would break the
    # quoted-string or the header encoding, so it is replaced by "_".
    dom = re.sub(r"[^A-Za-z0-9._-]+", "_", meta.get("site_domain") or "")
    dom = dom or "report"
    return Response(
        content=pdf, media_type="application/pdf",
        headers={"Content-Disposition": f'attachment; filename="audit-{dom}.pdf"'})
|