Files
breakpilot-compliance/backend-compliance/compliance/api/snapshot_check_routes.py
T
Benjamin Admin 85a8a1d545 feat(browser-matrix): Cross-Browser-Befunde + Browser-Default-Einordnung (Phase 4)
- browser_cross_finding: deterministische Sicht ueber die Matrix (keine 2.
  Engine, kein LLM). Findet Inkonsistenzen ZWISCHEN Browsern (Cookies vor
  Consent / Ablehnen nicht universell respektiert / Banner-Links fehlend) und
  ordnet ein: Safari-ITP / Brave-Shields / Firefox-ETP maskieren Verstoesse
  clientseitig → strenge Engine "sauber" ist KEIN Compliance-Beleg, massgeblich
  sind die nachgiebigen (Chrome/Edge). Coverage-Hinweis fuer nicht verfuegbare
  Browser. Je Befund Titel/Detail/Severity/affected/Massnahme.
- snapshot_check_routes: cross_findings frisch in run + GET (nicht persistiert).
- BrowserBehaviorView: "Cross-Browser-Befunde"-Block ueber der Tabelle.
- Tests: test_browser_cross_finding (6).

Offen (Folge-Task): Borlabs-Consent-Historie-Live-Erkennung (braucht
consent-tester-Storage-Scan).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-12 23:22:57 +02:00

218 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Snapshot-getriebene Doc-Check-Endpoints (kein Re-Crawl).
Cookie-Library-Abgleich + v3-Doc-Agenten (Impressum/DSE/AGB …) laufen auf den
gespeicherten Snapshot-Texten. Ausgelagert aus agent_compliance_check_routes.py
(LOC-Budget). Gleicher Router-Prefix → identische Pfade, keine Contract-Änderung.
"""
from __future__ import annotations
import logging
from typing import List, Optional
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
logger = logging.getLogger(__name__)
# All routes declared on this router are mounted under /compliance/agent.
router = APIRouter(prefix="/compliance/agent", tags=["agent-snapshots"])
# Base URL of the consent-tester service. Defined locally, as in
# agent_doc_check_routes / vendor_assessment_routes, to avoid an import cycle.
CONSENT_TESTER_URL = "http://bp-compliance-consent-tester:8094"
# Browser-matrix profile set: 4 default engines (all architectures) plus 3 real
# browsers (amd64-only, prod only). On arm64 dev the 3 extras fail fast at
# launch → error row in the matrix (frontend shows "nicht verfügbar", i.e.
# "not available").
_BROWSER_MATRIX_PROFILES = [
    "chromium-headed-de", "firefox-headed-de", "webkit-headed-de",
    "iphone-mobile-safari-de",
    "brave-default-de", "chrome-channel-desktop-de", "edge-channel-desktop-de",
]
class BrowserBehaviorRunRequest(BaseModel):
    """Optional request body for the on-demand browser-matrix run endpoint."""
    # Profiles to scan. When omitted or empty, the run endpoint falls back to
    # a copy of _BROWSER_MATRIX_PROFILES.
    browser_profiles: Optional[List[str]] = None
    # Forwarded unchanged to the consent tester as "timeout_per_phase".
    # NOTE(review): unit is not visible here — presumably seconds; confirm.
    timeout_per_phase: int = 10
def _snapshot_scan_url(snap: dict) -> str:
"""Scanbare Homepage-URL aus dem Snapshot ableiten (doc_entry-Origin,
sonst https://<site_domain>)."""
from urllib.parse import urlparse
for e in snap.get("doc_entries") or []:
u = (e.get("url") or "").strip()
if u:
p = urlparse(u)
if p.scheme and p.netloc:
return f"{p.scheme}://{p.netloc}"
dom = (snap.get("site_domain") or "").strip()
return f"https://{dom}" if dom and dom != "unknown" else ""
async def _run_doc_agent(snapshot_id: str, doc_type: str, agent_id: str) -> dict:
    """Run a registered v3 doc agent on a stored snapshot text (no re-crawl).

    Shared by the impressum / dse / agb endpoints: loads the snapshot, builds
    the agent input for ``doc_type`` and evaluates the agent registered under
    ``agent_id``.

    Returns:
        The agent output dumped in JSON mode, after it has been passed through
        ``reconcile_doc_findings``. When no agent input can be built for
        ``doc_type``, a placeholder result with empty lists and confidence 0.0.

    Raises:
        HTTPException: 404 when no snapshot is found for ``snapshot_id``.
    """
    from database import SessionLocal
    from compliance.services.check_snapshot import load_snapshot
    from compliance.services.specialist_agents import REGISTRY, AgentInput
    from compliance.api.agent_check._agent_outputs import doc_input_from_snapshot

    session = SessionLocal()
    try:
        snapshot = load_snapshot(session, snapshot_id)
        if not snapshot:
            raise HTTPException(status_code=404, detail="snapshot not found")

        input_kwargs = doc_input_from_snapshot(snapshot, doc_type)
        if not input_kwargs:
            return {
                "findings": [],
                "recommendations": [],
                "mc_coverage": [],
                "notes": f"kein {doc_type}-Text im Snapshot",
                "confidence": 0.0,
            }

        agent = REGISTRY.get(agent_id)
        agent_output = await agent.evaluate(AgentInput(**input_kwargs))
        result = agent_output.model_dump(mode="json")

        # B: cross-doc reconciliation — obligations that are fulfilled in a
        # DIFFERENT document (e.g. § 36 VSBG / OS link in AGB/legal) should not
        # be shown as findings. Conservative allowlist in cross_doc_reconcile.
        from compliance.services.cross_doc_reconcile import reconcile_doc_findings

        # Collect (doc_type, text) for every other document that has text.
        other_docs = []
        for entry in snapshot.get("doc_entries") or []:
            if entry.get("doc_type") == doc_type:
                continue
            body = entry.get("text") or entry.get("content")
            if body:
                other_docs.append((entry.get("doc_type"), body))

        # Called for its effect on `result`; its return value is not used.
        reconcile_doc_findings(result, agent_id, other_docs)
        return result
    finally:
        session.close()
@router.get("/snapshots/{snapshot_id}/cookie-check")
async def snapshot_cookie_check(snapshot_id: str):
    """Per-cookie check of the snapshot's vendors against the cookie library.

    Works on the stored snapshot only. The response is the dict produced by
    ``analyze_cookies``, extended with ``storage_inventory``,
    ``declaration_diff`` and ``drift``; a storage-transparency finding, if
    one is produced, is placed first in ``findings``.

    Raises:
        HTTPException: 404 when no snapshot is found for ``snapshot_id``.
    """
    from database import SessionLocal
    from compliance.services.check_snapshot import load_snapshot
    from compliance.services.cookie_library_check import analyze_cookies, load_big_library
    from compliance.services.cookie_storage_inventory import (
        build_storage_inventory,
        storage_transparency_finding,
        dedupe_vendor_cookies,
    )
    from compliance.services.cookie_compliance_audit import audit_cookie_compliance

    session = SessionLocal()
    try:
        snapshot = load_snapshot(session, snapshot_id)
        if not snapshot:
            raise HTTPException(status_code=404, detail="snapshot not found")

        # Consent phases duplicate cookies → deduplicate per vendor by name.
        vendors = dedupe_vendor_cookies(snapshot.get("cmp_vendors") or [])

        cookie_names = []
        for vendor in vendors:
            cookie_names.extend(
                cookie.get("name", "") for cookie in vendor.get("cookies") or []
            )

        library = load_big_library(session, cookie_names)
        report = analyze_cookies(vendors, library)

        inventory = build_storage_inventory(vendors)
        transparency = storage_transparency_finding(inventory)
        if transparency:
            # Put the transparency finding first and refresh the summary count.
            report["findings"].insert(0, transparency)
            report["summary"]["findings"] = len(report["findings"])
        report["storage_inventory"] = inventory

        # Declaration-vs-library diff (only the matched subset) + funnel.
        from compliance.services.cookie_declaration_diff import build_declaration_diff

        report["declaration_diff"] = build_declaration_diff(report)

        # ② Documentation drift: cookie policy (text) vs. browser reality.
        # The first cookie-policy entry supplies the text, even if it is empty.
        cookie_policy_text = ""
        for entry in snapshot.get("doc_entries") or []:
            if entry.get("doc_type") in ("cookie", "cookie_richtlinie", "cookies"):
                cookie_policy_text = entry.get("text") or entry.get("content") or ""
                break

        report["drift"] = audit_cookie_compliance(
            session, cookie_policy_text, snapshot.get("banner_result")
        )
        return report
    finally:
        session.close()
@router.get("/snapshots/{snapshot_id}/impressum-check")
async def snapshot_impressum_check(snapshot_id: str):
    """Impressum analysis (v3 ImpressumAgent) on the stored snapshot text."""
    result = await _run_doc_agent(
        snapshot_id=snapshot_id, doc_type="impressum", agent_id="impressum"
    )
    return result
@router.get("/snapshots/{snapshot_id}/dse-check")
async def snapshot_dse_check(snapshot_id: str):
    """DSE analysis (curated DSEAgent, Art. 13/14) on the stored snapshot text."""
    result = await _run_doc_agent(
        snapshot_id=snapshot_id, doc_type="dse", agent_id="dse"
    )
    return result
@router.get("/snapshots/{snapshot_id}/agb-check")
async def snapshot_agb_check(snapshot_id: str):
    """AGB analysis (curated AGBAgent, §§ 305 ff. BGB) on the stored snapshot text."""
    result = await _run_doc_agent(
        snapshot_id=snapshot_id, doc_type="agb", agent_id="agb"
    )
    return result
@router.post("/snapshots/{snapshot_id}/browser-behavior/run")
async def snapshot_browser_behavior_run(
    snapshot_id: str, req: Optional[BrowserBehaviorRunRequest] = None,
):
    """On demand: run the browser-behaviour matrix LIVE and persist the result.

    Browser behaviour can only be measured live, so this re-crawls the site
    once per engine via the consent tester's ``/scan-matrix`` endpoint and
    stores the result through ``update_browser_matrix`` (per the original
    author's note: migration-free, under ``banner_result.browser_matrix``).
    Expensive (several browsers × 3 phases), so it is deliberately triggered
    by button only, not on every check.

    Args:
        snapshot_id: Snapshot whose site is scanned.
        req: Optional overrides; without it the default profile set and a
            ``timeout_per_phase`` of 10 are used.

    Returns:
        The matrix dict from the consent tester plus a freshly derived
        ``cross_findings`` entry (added after persisting, so not stored).

    Raises:
        HTTPException: 404 when the snapshot is missing, 422 when no scannable
            URL can be derived, 502 when the consent-tester call fails or
            returns something other than a JSON object.
    """
    import httpx
    from database import SessionLocal
    from compliance.services.check_snapshot import (
        load_snapshot, update_browser_matrix,
    )

    # NOTE(review): the DB session stays open for the whole scan (up to the
    # 360 s client timeout below) — confirm this is fine for the pool.
    db = SessionLocal()
    try:
        snap = load_snapshot(db, snapshot_id)
        if not snap:
            raise HTTPException(status_code=404, detail="snapshot not found")
        url = _snapshot_scan_url(snap)
        if not url:
            raise HTTPException(
                status_code=422, detail="keine scanbare URL im Snapshot")
        # An absent body or an empty profile list selects the default set.
        profiles = (req.browser_profiles if req and req.browser_profiles
                    else list(_BROWSER_MATRIX_PROFILES))
        payload = {
            "url": url, "browser_profiles": profiles,
            "timeout_per_phase": req.timeout_per_phase if req else 10,
        }
        try:
            async with httpx.AsyncClient(timeout=360.0) as client:
                r = await client.post(
                    f"{CONSENT_TESTER_URL}/scan-matrix", json=payload)
                r.raise_for_status()
                matrix = r.json()
            # Valid JSON need not be an object. Reject anything else here so
            # it is neither persisted nor crashes the cross-finding step.
            if not isinstance(matrix, dict):
                raise ValueError(
                    "unexpected JSON payload of type "
                    f"{type(matrix).__name__}")
        except Exception as e:
            # Boundary to an external service: map every failure to 502.
            logger.warning("browser-matrix scan failed for %s: %s",
                           snapshot_id, e)
            raise HTTPException(
                status_code=502,
                detail=f"consent-tester /scan-matrix fehlgeschlagen: {e}",
            ) from e
        update_browser_matrix(db, snapshot_id, matrix)
        # Derive the cross-browser findings fresh (deterministic view, not
        # persisted → the GET endpoint recomputes them identically).
        from compliance.services.browser_cross_finding import build_cross_findings
        matrix["cross_findings"] = build_cross_findings(matrix)
        return matrix
    finally:
        db.close()
@router.get("/snapshots/{snapshot_id}/browser-behavior")
async def snapshot_browser_behavior(snapshot_id: str):
    """Return the persisted browser matrix (no re-crawl).

    The response is ``{"browser_matrix": <stored value>}``. A non-empty matrix
    gets a freshly computed ``cross_findings`` entry; a falsy stored value is
    returned unchanged (per the original note, ``browser_matrix`` is null as
    long as the on-demand run has never been triggered).
    """
    from database import SessionLocal
    from compliance.services.check_snapshot import load_browser_matrix
    from compliance.services.browser_cross_finding import build_cross_findings

    session = SessionLocal()
    try:
        stored = load_browser_matrix(session, snapshot_id)
        if not stored:
            return {"browser_matrix": stored}
        # Cross-browser findings are not persisted; recompute them on each read.
        stored["cross_findings"] = build_cross_findings(stored)
        return {"browser_matrix": stored}
    finally:
        session.close()