'
f'| '
- f'{name}{flag_str} | '
+ f'{name}{risk_badge}{flag_str}'
f'{category} | '
f'{country} | '
f''
diff --git a/backend-compliance/compliance/api/agent_doc_check_redundancy.py b/backend-compliance/compliance/api/agent_doc_check_redundancy.py
index 6776e3b2..dcabcc87 100644
--- a/backend-compliance/compliance/api/agent_doc_check_redundancy.py
+++ b/backend-compliance/compliance/api/agent_doc_check_redundancy.py
@@ -28,9 +28,10 @@ def build_redundancy_html(report: dict | None) -> str:
pct = s.get("estimated_saving_pct") or "n/a"
parts = [
- '',
+ ' ',
' '
'Optimierungspotenzial: Redundanzen + EU-Alternativen',
f' '
diff --git a/backend-compliance/compliance/api/agent_doc_check_report.py b/backend-compliance/compliance/api/agent_doc_check_report.py
index 43f355d0..fa1aefab 100644
--- a/backend-compliance/compliance/api/agent_doc_check_report.py
+++ b/backend-compliance/compliance/api/agent_doc_check_report.py
@@ -134,7 +134,9 @@ def build_management_summary(results: list[DocCheckResult]) -> str:
ok = [r for r in results if r.completeness_pct == 100 and not r.error]
fixable = [r for r in results if 0 < r.completeness_pct < 100 and not r.error]
critical = [r for r in results if r.completeness_pct == 0 and not r.error]
- errors = [r for r in results if r.error]
+ not_applicable = [r for r in results if r.error
+ and r.error.startswith("Nicht anwendbar")]
+ errors = [r for r in results if r.error and r not in not_applicable]
html = [
' '
- 'Alle Dokumente sind vollstaendig. Keine dringenden Massnahmen noetig.'
+ f' '
+ f'Alle Dokumente sind vollstaendig. Keine dringenden Massnahmen noetig.'
+ f'{na_note} '
)
else:
html.append(
f' '
f'{len(ok)} von {total} Dokumenten sind vollstaendig. '
f'{len(fixable)} brauchen Korrekturen'
- f'{f", {len(critical)} fehlen oder sind unbrauchbar" if critical else ""}. '
+ f'{f", {len(critical)} fehlen oder sind unbrauchbar" if critical else ""}.'
+ f'{na_note}'
)
# Concrete actions
@@ -279,10 +288,13 @@ def _render_document(html: list[str], r: DocCheckResult, doc_text: str = "") ->
r.error.startswith("Nicht eingereicht")
or r.error.startswith("Auf der Website nicht gefunden")
)
+ is_not_applicable = bool(r.error) and r.error.startswith("Nicht anwendbar")
if is_missing:
status_label = ("NICHT GEFUNDEN"
if r.error.startswith("Auf der Website")
else "NICHT EINGEREICHT")
+ elif is_not_applicable:
+ status_label = "NICHT ANWENDBAR"
elif r.error:
status_label = "FEHLER"
@@ -330,6 +342,13 @@ def _render_document(html: list[str], r: DocCheckResult, doc_text: str = "") ->
'background:#fafafa;border-top:1px solid #f3f4f6">'
+ body_msg + ' '
)
+ elif is_not_applicable:
+ html.append(
+ ' '
+ + r.error + ' '
+ )
elif r.error:
html.append(f' {r.error} ')
else:
diff --git a/backend-compliance/compliance/api/agent_doc_check_scorecard.py b/backend-compliance/compliance/api/agent_doc_check_scorecard.py
index 5b7c9083..b32854e1 100644
--- a/backend-compliance/compliance/api/agent_doc_check_scorecard.py
+++ b/backend-compliance/compliance/api/agent_doc_check_scorecard.py
@@ -44,7 +44,7 @@ def build_scorecard_html(
trend_str = _delta_badge(overall_pct, prev_total_pct) if prev_total_pct is not None else ""
head = (
- ' '
' '
diff --git a/backend-compliance/compliance/api/agent_findings_routes.py b/backend-compliance/compliance/api/agent_findings_routes.py
new file mode 100644
index 00000000..1ef51c45
--- /dev/null
+++ b/backend-compliance/compliance/api/agent_findings_routes.py
@@ -0,0 +1,104 @@
+"""
+Voll-Audit Findings Router — unified view across all 4 finding sources.
+
+Endpoint:
+ GET /api/compliance/agent/findings/{check_id}
+ ?source=mc|pflichtangabe|vendor|redundanz|all
+ &severity=CRITICAL|HIGH|MEDIUM|LOW|INFO|all
+ &doc_type=impressum|dse|cookie|...|all
+ &status=failed|passed|skipped|na|info|all
+ &q=
+ &limit=
+
+Liefert summary + filtered findings list. Frontend rendert daraus den
+Voll-Audit-Tab unter /sdk/agent/audit/.
+"""
+
+from __future__ import annotations
+
+import logging
+from urllib.parse import urlparse
+from fastapi import APIRouter, HTTPException, Query
+
+from compliance.services.unified_findings_store import (
+ findings_summary,
+ list_findings,
+)
+from compliance.services.compliance_audit_log import get_check_run
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/compliance/agent", tags=["agent"])
+
+
+def _normalize_domain(d: str) -> str:
+    """Return the lower-cased network location of *d* without a leading ``www.``.
+
+    Input without a scheme is parsed as if it were an ``https://`` URL. An
+    empty value yields an empty string.
+    """
+    if not d:
+        return ""
+    target = d if "://" in d else "https://" + d
+    authority = urlparse(target).netloc.lower()
+    return authority.removeprefix("www.")
+
+
+@router.get("/findings/{check_id}")
+def get_findings(
+    check_id: str,
+    source: str | None = Query(None, description="mc|pflichtangabe|vendor|redundanz|all"),
+    severity: str | None = Query(None, description="CRITICAL|HIGH|MEDIUM|LOW|INFO|all"),
+    doc_type: str | None = Query(None),
+    status: str | None = Query(None, description="failed|passed|skipped|na|info|all"),
+    q: str | None = Query(None, description="freitext-suche label/vendor"),
+    limit: int = Query(1000, ge=1, le=5000),
+    expected_domain: str | None = Query(
+        None, description="Hard-Assertion: Run muss zu dieser Domain gehoeren (Cross-Tenant-Schutz)",
+    ),
+) -> dict:
+    """Return the filtered findings and summary counters of one check run.
+
+    When ``expected_domain`` is supplied, the run is loaded via
+    ``get_check_run`` and the request is rejected unless the run exists and
+    its normalized ``base_domain`` equals the normalized ``expected_domain``.
+    Without ``expected_domain`` no domain check takes place.
+
+    Returns:
+        A dict with ``found``, ``check_id``, ``summary``, the echoed
+        ``filter``, ``count`` and ``findings``. If loading fails, the
+        exception is logged and a dict with ``found`` False and an ``error``
+        text (at most 200 characters) is returned instead of raising.
+
+    Raises:
+        HTTPException: 403 when the domain assertion fails.
+    """
+    # Optional domain assertion (P7 follow-up): keeps a frontend from reading
+    # a check_id that belongs to another tenant's domain.
+    if expected_domain:
+        run = get_check_run(check_id)
+        actual = _normalize_domain((run or {}).get("base_domain") or "")
+        wanted = _normalize_domain(expected_domain)
+        if not run or actual != wanted:
+            raise HTTPException(
+                status_code=403,
+                detail=(
+                    f"Cross-tenant access blocked: check_id {check_id} "
+                    f"gehoert zu Domain '{actual or '?'}', angefragt: "
+                    f"'{wanted}'"
+                ),
+            )
+
+    # Echo of the effective filter; unset values are reported as "all"/"".
+    applied_filter = {
+        "source": source or "all",
+        "severity": severity or "all",
+        "doc_type": doc_type or "all",
+        "status": status or "all",
+        "q": q or "",
+        "limit": limit,
+    }
+    try:
+        summary = findings_summary(check_id)
+        matches = list_findings(
+            check_id=check_id,
+            source_type=source,
+            severity=severity,
+            doc_type=doc_type,
+            status=status,
+            q=q,
+            limit=limit,
+        )
+        # Built inside the try so a malformed summary is reported the same
+        # way as a failed lookup.
+        return {
+            "found": summary.get("total", 0) > 0,
+            "check_id": check_id,
+            "summary": summary,
+            "filter": applied_filter,
+            "count": len(matches),
+            "findings": matches,
+        }
+    except Exception as e:
+        logger.exception("get_findings failed for %s", check_id)
+        return {
+            "found": False,
+            "check_id": check_id,
+            "error": str(e)[:200],
+            "summary": {},
+            "count": 0,
+            "findings": [],
+        }
diff --git a/backend-compliance/compliance/api/saving_scan_routes.py b/backend-compliance/compliance/api/saving_scan_routes.py
new file mode 100644
index 00000000..34b207b5
--- /dev/null
+++ b/backend-compliance/compliance/api/saving_scan_routes.py
@@ -0,0 +1,196 @@
+"""
+Saving-Scan-Funnel Endpoint — Marketing-Lead → Compliance-Check.
+
+Externes Form (https://breakpilot.ai/savings-scan) postet hier:
+ POST /api/compliance/agent/saving-scan/start
+ Body: {"url": "...", "email": "..."}
+
+Server-side:
+ 1. Validierung URL + Email (E-Mail-Regex, URL-Schema).
+ 2. Rate-Limit: max 1 vollstaendiger Scan / Domain / 24h
+ (saving_scan_allowed aus compliance_user_agent).
+ 3. Lead persistieren (saving_scan_leads in Sidecar-SQLite) — fuer
+ spaeteren Report-Versand + Sales-Follow-Up.
+ 4. Compliance-Check starten mit Auto-Discovery (DocumentInput leer
+ ausser Homepage). Der bestehende Worker laeuft TDM-Check, dann
+ Discovery, dann Pruefung.
+ 5. check_id zurueck — Frontend pollt /compliance-check/.
+"""
+
+from __future__ import annotations
+
+import logging
+import os
+import re
+import sqlite3
+import uuid as _uuid
+from datetime import datetime, timezone
+from pathlib import Path
+
+import asyncio
+from fastapi import APIRouter, HTTPException
+from pydantic import BaseModel, Field
+
+from compliance.services.compliance_user_agent import (
+ base_domain_of, saving_scan_allowed,
+)
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/compliance/agent", tags=["agent"])
+
+DB_PATH = os.getenv("COMPLIANCE_AUDIT_DB", "/data/compliance_audits.db")
+
+_EMAIL_RE = re.compile(r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$")
+_URL_RE = re.compile(r"^https?://[A-Za-z0-9.-]+(/.*)?$")
+
+
+class SavingScanRequest(BaseModel):
+ """Request body of the saving-scan start endpoint: target URL, e-mail, consent flag."""
+ url: str = Field(..., min_length=4, max_length=400)
+ email: str = Field(..., min_length=5, max_length=200)
+ # NOTE(review): the default is True, so a request that omits ``consent``
+ # passes the consent check in start_saving_scan — confirm this is intended.
+ consent: bool = Field(
+ True, description="Marketing-Consent fuer Sales-Follow-Up — "
+ "muss True sein laut Form-Checkbox.",
+ )
+
+
+class SavingScanResponse(BaseModel):
+ """Response of the saving-scan start endpoint."""
+ # Identifier of the started check run.
+ check_id: str
+ # Job state; start_saving_scan always sends "running".
+ status: str
+ # Optional human-readable note; empty by default.
+ message: str = ""
+
+
+def _ensure_leads_table() -> None:
+    """Create the ``saving_scan_leads`` table and its two indexes if missing.
+
+    Also creates the parent directory of ``DB_PATH`` when it does not exist.
+    """
+    from contextlib import closing
+
+    Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True)
+    # sqlite3's own context manager only commits or rolls back; closing()
+    # additionally releases the connection so repeated calls do not pile up
+    # open handles.
+    with closing(sqlite3.connect(DB_PATH)) as conn, conn:
+        conn.executescript("""
+            CREATE TABLE IF NOT EXISTS saving_scan_leads (
+                id INTEGER PRIMARY KEY AUTOINCREMENT,
+                ts TEXT NOT NULL,
+                email TEXT NOT NULL,
+                url TEXT NOT NULL,
+                base_domain TEXT NOT NULL,
+                check_id TEXT,
+                consent INTEGER NOT NULL,
+                source TEXT
+            );
+            CREATE INDEX IF NOT EXISTS idx_leads_domain ON saving_scan_leads(base_domain, ts);
+            CREATE INDEX IF NOT EXISTS idx_leads_email ON saving_scan_leads(email, ts);
+        """)
+
+
+def _persist_lead(email: str, url: str, check_id: str, consent: bool) -> None:
+    """Store one funnel lead in ``saving_scan_leads`` (best effort).
+
+    The e-mail is lower-cased and stripped, the base domain is derived from
+    *url*, and the row is stamped with the current UTC time in ISO format.
+    Any failure is logged as a warning and never propagates to the caller.
+    """
+    from contextlib import closing
+
+    try:
+        _ensure_leads_table()
+        # closing() releases the connection; the inner ``conn`` context
+        # commits on success and rolls back on error.
+        with closing(sqlite3.connect(DB_PATH)) as conn, conn:
+            conn.execute(
+                "INSERT INTO saving_scan_leads "
+                "(ts, email, url, base_domain, check_id, consent, source) "
+                "VALUES (?, ?, ?, ?, ?, ?, ?)",
+                (
+                    datetime.now(timezone.utc).isoformat(),
+                    email.lower().strip(),
+                    url,
+                    base_domain_of(url),
+                    check_id,
+                    1 if consent else 0,
+                    "saving_scan_form",
+                ),
+            )
+    except Exception as e:
+        # A lost lead must not abort the scan that is about to start.
+        logger.warning("persist lead failed: %s", e)
+
+
+def _normalize_url(url: str) -> str:
+    """Reduce *url* to its site root ``scheme://netloc/``.
+
+    Path, query and fragment are dropped; input without a scheme is treated
+    as ``https``.
+    """
+    from urllib.parse import urlparse
+
+    candidate = url if "://" in url else "https://" + url
+    parts = urlparse(candidate)
+    return "{}://{}/".format(parts.scheme, parts.netloc)
+
+
+@router.post("/saving-scan/start", response_model=SavingScanResponse)
+async def start_saving_scan(req: SavingScanRequest) -> SavingScanResponse:
+    """Start a compliance check for a lead submitted via the marketing funnel.
+
+    Validates e-mail, URL and consent, applies the per-domain limit reported
+    by ``saving_scan_allowed``, registers a job entry, stores the lead and
+    schedules ``_run_compliance_check`` as a background task.
+
+    Raises:
+        HTTPException: 400 for an invalid e-mail or URL, missing consent or
+            an undeterminable domain; 429 while the domain is rate-limited.
+    """
+    if _EMAIL_RE.match(req.email) is None:
+        raise HTTPException(400, "Ungueltige E-Mail-Adresse.")
+    if _URL_RE.match(req.url) is None:
+        raise HTTPException(400, "URL muss mit http:// oder https:// beginnen.")
+    if not req.consent:
+        raise HTTPException(400, "Marketing-Consent erforderlich.")
+
+    domain = base_domain_of(req.url)
+    if not domain:
+        raise HTTPException(400, "Konnte Domain nicht ermitteln.")
+
+    allowed, wait_s = saving_scan_allowed(req.url)
+    if not allowed:
+        hours, remainder = divmod(wait_s, 3600)
+        minutes = remainder // 60
+        raise HTTPException(
+            429,
+            f"Fuer '{domain}' wurde in den letzten 24h bereits ein Scan "
+            f"durchgefuehrt. Bitte in {hours}h {minutes}min "
+            f"erneut versuchen.",
+        )
+
+    # Imported here because a module-level import would be circular.
+    from compliance.api.agent_compliance_check_routes import (
+        DocumentInput,
+        ComplianceCheckRequest,
+        _run_compliance_check,
+        _compliance_check_jobs,
+    )
+
+    root_url = _normalize_url(req.url)
+    check_id = str(_uuid.uuid4())[:8]
+    job_entry = {
+        "status": "running",
+        "progress": "Saving-Scan gestartet — Auto-Discovery laeuft...",
+        "progress_pct": 0,
+        "result": None,
+        "error": "",
+    }
+    _compliance_check_jobs[check_id] = job_entry
+
+    # A single "other" document pointing at the site root makes the worker
+    # discover the remaining documents on its own.
+    documents = [DocumentInput(doc_type="other", url=root_url)]
+    recipient = req.email.lower().strip()
+    check_request = ComplianceCheckRequest(documents=documents, recipient=recipient)
+
+    _persist_lead(req.email, req.url, check_id, req.consent)
+    # NOTE(review): the returned Task is not stored anywhere; asyncio keeps
+    # only weak references to tasks — confirm the job cannot be collected
+    # before it finishes.
+    asyncio.create_task(_run_compliance_check(check_id, check_request))
+
+    masked_email = req.email[:3] + "***"
+    logger.info("saving-scan start: check_id=%s domain=%s email=%s",
+                check_id, domain, masked_email)
+    return SavingScanResponse(
+        check_id=check_id,
+        status="running",
+        message=f"Scan gestartet fuer {domain}. Bericht in ~3-5 Minuten.",
+    )
+
+
+@router.get("/saving-scan/lead-count")
+def saving_scan_lead_count() -> dict:
+    """Return lead statistics for the sales dashboard.
+
+    Returns:
+        ``total_leads``, ``last_24h`` and the ten most frequent
+        ``top_domains``. On any failure, a dict with a single ``error`` key
+        holding the exception text truncated to 200 characters.
+    """
+    from contextlib import closing
+
+    try:
+        _ensure_leads_table()
+        # sqlite3's context manager would leave the connection open;
+        # closing() releases it once the three read-only queries are done.
+        with closing(sqlite3.connect(DB_PATH)) as conn:
+            total = conn.execute(
+                "SELECT COUNT(*) FROM saving_scan_leads",
+            ).fetchone()[0]
+            # ``ts`` is stored as ISO-8601 text ("...T...+00:00") while
+            # datetime('now', ...) yields "YYYY-MM-DD HH:MM:SS". Comparing
+            # the raw strings counts every lead from the cutoff calendar day,
+            # so ``ts`` is normalised through datetime() first.
+            last_24h = conn.execute(
+                "SELECT COUNT(*) FROM saving_scan_leads "
+                "WHERE datetime(ts) > datetime('now', '-1 day')",
+            ).fetchone()[0]
+            top_domains = conn.execute(
+                "SELECT base_domain, COUNT(*) AS n FROM saving_scan_leads "
+                "GROUP BY base_domain ORDER BY n DESC LIMIT 10",
+            ).fetchall()
+        return {
+            "total_leads": total,
+            "last_24h": last_24h,
+            "top_domains": [{"domain": d, "scans": n} for d, n in top_domains],
+        }
+    except Exception as e:
+        # Diagnostic endpoint: report the problem instead of failing.
+        return {"error": str(e)[:200]}
diff --git a/backend-compliance/compliance/services/benchmark_k_anonymity.py b/backend-compliance/compliance/services/benchmark_k_anonymity.py
new file mode 100644
index 00000000..6835b6e7
--- /dev/null
+++ b/backend-compliance/compliance/services/benchmark_k_anonymity.py
@@ -0,0 +1,149 @@
+"""
+k-Anonymitaets-Helper fuer Branchen-Benchmarks (P6-Vorbereitung).
+
+Vor jeder Veroeffentlichung von Benchmark-Aussagen pruefen, ob die
+zugrundeliegende Stichprobe gross genug ist, dass keine Re-Identifikation
+einzelner Hersteller moeglich wird.
+
+Default k=5: jede publizierbare Aussage muss auf mindestens 5 verschiedenen
+Datensubjekten (z.B. OEM-Sites) beruhen. Bei OEM-Markt mit ~30 Spielern
+ist k=5 das Minimum, um "ein deutscher Premium-Hersteller mit X Modellen"
+auszuschliessen.
+
+Memory: feedback_oem_data_legal.md + project_legal_contracts_2026_07.md.
+
+Verwendung:
+ from compliance.services.benchmark_k_anonymity import (
+ enforce_k_anonymity, quantize_value, KAnonymityError,
+ )
+
+ rows = [...] # pro Hersteller 1 Row
+ safe_groups = enforce_k_anonymity(rows, group_keys=["segment", "country"])
+ # safe_groups: nur Gruppen mit count >= 5 zurueck
+"""
+
+from __future__ import annotations
+
+from collections.abc import Iterable
+from typing import Any
+
+DEFAULT_K = 5
+
+
+class KAnonymityError(RuntimeError):
+    """Raised when a sample is too small to back a publishable statement."""
+
+
+def assert_min_sample(n: int, k: int = DEFAULT_K, context: str = "") -> None:
+    """Raise ``KAnonymityError`` when the sample size *n* is below *k*.
+
+    A non-empty *context* is appended to the error message.
+    """
+    if n < k:
+        detail = f" — Kontext: {context}" if context else ""
+        raise KAnonymityError(
+            f"Stichprobe zu klein fuer Publikation: n={n} < k={k}{detail}"
+        )
+
+
+def quantize_value(value: float | int, step: int = 5) -> int:
+    """Round *value* down to a multiple of *step* (generalisation).
+
+        quantize_value(67, 5)  -> 65
+        quantize_value(83, 10) -> 80
+
+    Coarsening numbers this way avoids publishing exact values. A *step* of
+    zero or less disables bucketing and returns ``int(value)``.
+    """
+    if step > 0:
+        return step * int(value // step)
+    return int(value)
+
+
+def quantize_range(value: float | int, step: int = 10) -> str:
+    """Return the bucket containing *value* as a label such as ``'60-70%'``."""
+    lower = quantize_value(value, step)
+    upper = lower + step
+    return f"{lower}-{upper}%"
+
+
+def group_and_count(
+    rows: Iterable[dict],
+    keys: list[str],
+) -> dict[tuple, int]:
+    """Count rows per combination of the values stored under *keys*.
+
+    A key missing from a row contributes an empty string to the combination.
+    """
+    tally: dict[tuple, int] = {}
+    for row in rows:
+        signature = tuple(row.get(name, "") for name in keys)
+        tally.setdefault(signature, 0)
+        tally[signature] += 1
+    return tally
+
+
+def enforce_k_anonymity(
+    rows: list[dict],
+    group_keys: list[str],
+    k: int = DEFAULT_K,
+) -> list[dict]:
+    """Keep only rows whose group (by *group_keys*) has at least *k* members.
+
+    Returns:
+        The surviving rows in their original order; rows that belong to
+        smaller groups are suppressed (dropped).
+    """
+    sizes = group_and_count(rows, group_keys)
+    kept: list[dict] = []
+    for row in rows:
+        signature = tuple(row.get(key, "") for key in group_keys)
+        # Every signature was counted above, so the lookup cannot miss.
+        if sizes[signature] >= k:
+            kept.append(row)
+    return kept
+
+
+def summarize_benchmark(
+    rows: list[dict],
+    group_keys: list[str],
+    measure_key: str,
+    k: int = DEFAULT_K,
+    quantize_step: int = 5,
+) -> list[dict]:
+    """Build publishable aggregate rows for a benchmark.
+
+    Rows are grouped by *group_keys*; rows whose *measure_key* is ``None`` or
+    absent are ignored. Groups with fewer than *k* measured values are left
+    out. Each remaining group reports its size ``n``, the quantised mean and
+    a mean range whose width is twice *quantize_step*.
+
+    Example:
+        rows = [{"segment": "premium", "consent_score": 84}, ...]
+        summarize_benchmark(rows, ["segment"], "consent_score")
+        -> [{"segment": "premium", "n": 8, "mean_quantized": 80, ...}, ...]
+
+    Returns:
+        The aggregate rows sorted by ``n``, largest first.
+    """
+    samples: dict[tuple, list[float]] = {}
+    for row in rows:
+        measure = row.get(measure_key)
+        if measure is None:
+            continue
+        signature = tuple(row.get(name, "") for name in group_keys)
+        samples.setdefault(signature, []).append(float(measure))
+
+    summary: list[dict] = []
+    for signature, values in samples.items():
+        n = len(values)
+        if n < k:
+            continue
+        mean = sum(values) / n
+        entry: dict[str, Any] = dict(zip(group_keys, signature))
+        entry["n"] = n
+        entry["mean_quantized"] = quantize_value(mean, quantize_step)
+        entry["mean_range"] = quantize_range(mean, quantize_step * 2)
+        summary.append(entry)
+    return sorted(summary, key=lambda e: e["n"], reverse=True)
+
+
+def safe_to_publish(
+    statement: str,
+    sample_size: int,
+    k: int = DEFAULT_K,
+) -> tuple[bool, str]:
+    """Check whether a marketing/press statement rests on a large enough sample.
+
+    Returns:
+        ``(ok, message)``. When ``ok`` is False the statement must not be
+        published; the message quotes its first 60 characters.
+    """
+    if sample_size < k:
+        excerpt = statement[:60]
+        verdict = (
+            f'Aussage NICHT publizierbar: "{excerpt}…" '
+            f'(n={sample_size} < k={k}). Risiko: Re-Identifikation '
+            f'einzelner Hersteller moeglich.'
+        )
+        return False, verdict
+    return True, f"OK (n={sample_size}, k={k})"
diff --git a/backend-compliance/compliance/services/business_profiler.py b/backend-compliance/compliance/services/business_profiler.py
index 2f511ec1..cf127614 100644
--- a/backend-compliance/compliance/services/business_profiler.py
+++ b/backend-compliance/compliance/services/business_profiler.py
@@ -28,6 +28,12 @@ class BusinessProfile:
needs_odr: bool = False # Online-Streitbeilegung
detected_services: list[str] = field(default_factory=list)
confidence: float = 0.0
+ # Wenn True: die Site selbst schliesst KEINEN Direktkauf-Vertrag
+ # (typisch OEM-Konfigurator-Sites BMW/Audi/Mercedes — Vertrag laeuft
+ # ueber den Vertragshaendler, nicht die Hersteller-Webseite).
+ # Konsequenz: AGB/Widerruf/Nutzungsbedingungen sind NICHT PFLICHT
+ # auf der Website, sondern werden beim Haendler ausgehaendigt.
+ no_direct_sales: bool = False
# ── Keyword lists ────────────────────────────────────────────────────
@@ -319,4 +325,49 @@ async def detect_business_profile(documents: dict[str, str]) -> BusinessProfile:
"steuerberater": "finance", "architekt": "craft"}
profile.industry = prof_map.get(profile.regulated_profession_type, "unknown")
+ # ── no_direct_sales (OEM-Konfigurator-Pattern) ───────────────
+ # Hersteller-Sites die nur konfigurieren + zu Vertragshaendlern
+ # weiterleiten (BMW/Audi/Mercedes/VW/Porsche) schliessen KEINEN
+ # Direkt-Kaufvertrag. AGB/Widerruf/Nutzungsbedingungen sind dort
+ # nicht Pflicht — werden beim Haendler ausgehaendigt.
+ profile.no_direct_sales = _detect_no_direct_sales(full_text)
+
return profile
+
+
+# Indikatoren: Site verweist primaer auf Vertragshaendler/Niederlassungen
+# statt einen eigenen Checkout-Vertragsabschluss zu bieten.
+_NO_DIRECT_SALES_POSITIVE = [
+ "vertragshaendler", "vertragshändler", "vertragspartner",
+ "vertragswerkstatt", "haendlersuche", "händlersuche",
+ "niederlassung", "vertretung", "autorisierter haendler",
+ "autorisierter händler", "ihr haendler vor ort",
+ "ihr händler vor ort", "haendler in ihrer naehe",
+ "händler in ihrer nähe", "probefahrt vereinbaren",
+ "anfrage an haendler", "anfrage an händler",
+ "konfigurator", "fahrzeug konfigurieren",
+ "ihre individuelle anfrage",
+ # OEM-Markennamen — sind Hersteller-Marken die ueblicherweise via
+ # Haendler vertreiben.
+ "bmw vertriebs", "audi vertriebs", "mercedes-benz vertriebs",
+ "volkswagen vertriebs", "porsche zentrum",
+]
+
+# Indikatoren GEGEN no_direct_sales: echte Online-Shop-Funktionen.
+_DIRECT_SALES_NEGATIVE = [
+ "in den warenkorb", "warenkorb hinzu", "zur kasse",
+ "jetzt kaufen", "kostenpflichtig bestellen",
+ "zahlungspflichtig bestellen", "sofort-kauf",
+ "online bestellen", "lieferadresse", "rechnungsadresse",
+]
+
+
+def _detect_no_direct_sales(full_text: str) -> bool:
+    """Heuristic: detect sites that refer buyers to dealers instead of selling.
+
+    Counts how many dealer keywords and how many shop keywords occur in the
+    lower-cased text.
+    """
+    haystack = full_text.lower()
+    dealer_hits = len([kw for kw in _NO_DIRECT_SALES_POSITIVE if kw in haystack])
+    shop_hits = len([kw for kw in _DIRECT_SALES_NEGATIVE if kw in haystack])
+    # At least three dealer indicators AND more dealer than shop indicators.
+    # Avoids false positives for shops that merely offer a dealer search as
+    # a store finder.
+    return dealer_hits >= 3 and dealer_hits > shop_hits
diff --git a/backend-compliance/compliance/services/compliance_user_agent.py b/backend-compliance/compliance/services/compliance_user_agent.py
new file mode 100644
index 00000000..ff6da062
--- /dev/null
+++ b/backend-compliance/compliance/services/compliance_user_agent.py
@@ -0,0 +1,141 @@
+"""
+Zentraler User-Agent-Provider + Domain-Rate-Limiter fuer alle Crawls.
+
+UA-Switch ist Trigger-gebunden an Firmengruendung:
+ - aktuell (Vor-Gruendung): generischer Headless-Chrome-UA
+ - nach Gruendung: env BREAKPILOT_BRANDED_UA=1 setzen
+ -> "BreakPilot-Compliance-Scanner/1.0 (+https://...)"
+
+Memory: project_legal_contracts_2026_07.md (Punkt 0).
+
+Rate-Limit:
+ - Default 1 req/sec/Domain, max 2 concurrent pro Domain.
+ - Saving-Scan-Funnel separat: max 1 vollstaendiger Run / Domain / 24h.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import os
+import time
+from collections import defaultdict
+from urllib.parse import urlparse
+
+
+_BRANDED_UA = (
+ "BreakPilot-Compliance-Scanner/1.0 "
+ "(+https://breakpilot.ai/scanner)"
+)
+_NEUTRAL_UA = (
+ "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
+ "(KHTML, like Gecko) HeadlessChrome/120.0.0.0 Safari/537.36"
+)
+
+
+def crawler_user_agent() -> str:
+    """Return the User-Agent string for outgoing crawls.
+
+    The branded UA is used when the environment variable
+    ``BREAKPILOT_BRANDED_UA`` is ``1``, ``true`` or ``yes`` (case-insensitive,
+    surrounding whitespace ignored); otherwise the neutral UA is returned.
+    """
+    flag = os.getenv("BREAKPILOT_BRANDED_UA") or ""
+    use_branded = flag.strip().lower() in ("1", "true", "yes")
+    return _BRANDED_UA if use_branded else _NEUTRAL_UA
+
+
+def default_request_headers() -> dict:
+    """Return the header set (User-Agent, Accept, Accept-Language) for HTTP calls."""
+    headers = {"User-Agent": crawler_user_agent()}
+    headers["Accept"] = "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"
+    headers["Accept-Language"] = "de-DE,de;q=0.9,en;q=0.8"
+    return headers
+
+
+def base_domain_of(url_or_host: str) -> str:
+    """Return the lower-cased host of a URL or bare host without a leading ``www.``.
+
+    Input without a scheme is parsed as if it were an ``https://`` URL. An
+    empty value yields an empty string. If parsing produces no host, the
+    (scheme-prefixed) input is returned unchanged.
+    """
+    if not url_or_host:
+        return ""
+    if "://" not in url_or_host:
+        url_or_host = "https://" + url_or_host
+    netloc = urlparse(url_or_host).netloc.lower()
+    # Strip only a *leading* "www.": str.replace would also rewrite hosts such
+    # as "awww.example.com" or "static.www.example.com" and thereby merge
+    # unrelated domains under one rate-limit key.
+    return netloc.removeprefix("www.") or url_or_host
+
+
+# --- per-Domain Rate-Limit ----------------------------------------------
+
+_MIN_INTERVAL_S = 1.0 # 1 req/sec/Domain
+_MAX_CONCURRENT_PER_DOMAIN = 2
+
+_last_request_at: dict[str, float] = defaultdict(float)
+_semaphores: dict[str, asyncio.Semaphore] = {}
+_locks_lock = asyncio.Lock()
+
+
+async def _get_semaphore(domain: str) -> asyncio.Semaphore:
+    """Return the semaphore registered for *domain*, creating it on first use."""
+    async with _locks_lock:
+        if _semaphores.get(domain) is None:
+            _semaphores[domain] = asyncio.Semaphore(_MAX_CONCURRENT_PER_DOMAIN)
+        return _semaphores[domain]
+
+
+class DomainRateLimiter:
+    """Async context manager: wait for the domain's turn and hold a slot.
+
+        async with DomainRateLimiter(url):
+            resp = await client.get(url)
+
+    Entering acquires the per-domain semaphore and sleeps until the minimum
+    interval since the previous request to that domain has passed; leaving
+    releases the semaphore.
+    """
+
+    def __init__(self, url_or_domain: str):
+        self.domain = base_domain_of(url_or_domain)
+
+    async def __aenter__(self):
+        sem = await _get_semaphore(self.domain)
+        await sem.acquire()
+        try:
+            now = time.monotonic()
+            # Reserve the next send slot *before* sleeping. Otherwise two
+            # tasks holding the two concurrent slots read the same timestamp,
+            # sleep equally long and fire at the same instant.
+            slot = max(now, _last_request_at[self.domain] + _MIN_INTERVAL_S)
+            _last_request_at[self.domain] = slot
+            if slot > now:
+                await asyncio.sleep(slot - now)
+        except BaseException:
+            # __aexit__ does not run when __aenter__ fails (e.g. the sleep is
+            # cancelled), so the slot has to be handed back here.
+            sem.release()
+            raise
+        self._sem = sem
+        return self
+
+    async def __aexit__(self, exc_type, exc, tb):
+        self._sem.release()
+        # Never suppress exceptions raised inside the ``async with`` body.
+        return False
+
+
+# --- per-Domain "1 full run / 24h" (Saving-Scan) -----------------------
+
+_DB_PATH = os.getenv("COMPLIANCE_AUDIT_DB", "/data/compliance_audits.db")
+_SAVING_SCAN_INTERVAL_S = 24 * 3600
+
+
+def saving_scan_allowed(domain_or_url: str) -> tuple[bool, int]:
+    """Report whether a new saving scan may start for the given domain.
+
+    Reads the newest ``ts`` stored in ``check_runs`` for the base domain and
+    compares its age with the 24h interval.
+
+    Returns:
+        ``(allowed, seconds_until_allowed)``. The check fails open: an empty
+        domain, no previous run, or any error while reading or parsing the
+        timestamp yields ``(True, 0)``.
+    """
+    import sqlite3
+    from contextlib import closing
+    from datetime import datetime
+
+    domain = base_domain_of(domain_or_url)
+    if not domain:
+        return True, 0
+    try:
+        # closing() releases the connection; sqlite3's own context manager
+        # would only end the transaction and leave the handle open.
+        with closing(sqlite3.connect(_DB_PATH)) as conn:
+            row = conn.execute(
+                "SELECT MAX(ts) FROM check_runs WHERE base_domain=?",
+                (domain,),
+            ).fetchone()
+        last = row[0] if row else None
+        if not last:
+            return True, 0
+        # NOTE(review): a timestamp without UTC offset is interpreted as
+        # local time by .timestamp() — confirm how check_runs.ts is written.
+        elapsed = time.time() - datetime.fromisoformat(last).timestamp()
+        if elapsed >= _SAVING_SCAN_INTERVAL_S:
+            return True, 0
+        return False, int(_SAVING_SCAN_INTERVAL_S - elapsed)
+    except Exception:
+        # Fail open (original behaviour kept): a missing table or an
+        # unparsable timestamp allows the scan instead of blocking it.
+        return True, 0
diff --git a/backend-compliance/compliance/services/cookie_function_classifier.py b/backend-compliance/compliance/services/cookie_function_classifier.py
index 172580ca..a4911a27 100644
--- a/backend-compliance/compliance/services/cookie_function_classifier.py
+++ b/backend-compliance/compliance/services/cookie_function_classifier.py
@@ -129,20 +129,29 @@ def classify_cookie(cookie_name: str) -> tuple[str, str]:
def annotate_vendor_cookies(vendor: dict) -> dict:
- """Enrich a vendor record with functional_role per cookie."""
+ """Enrich a vendor record with functional_role + KB knowledge per cookie."""
+ from compliance.services.cookie_knowledge import (
+ lookup_cookie, summarize_compliance_risk,
+ )
cookies = vendor.get("cookies") or []
annotated = []
role_counts: dict[str, int] = {}
for c in cookies:
role, impact = classify_cookie(c.get("name", ""))
- annotated.append({**c, "functional_role": role, "blocking_impact": impact})
+ knowledge = lookup_cookie(c.get("name", ""))
+ entry = {**c, "functional_role": role, "blocking_impact": impact}
+ if knowledge:
+ entry["knowledge"] = knowledge
+ annotated.append(entry)
role_counts[role] = role_counts.get(role, 0) + 1
- return {
+ out = {
**vendor,
"cookies": annotated,
"role_distribution": role_counts,
"role_labels": {r: _FUNCTIONAL_LABEL.get(r, r) for r in role_counts},
}
+ out["compliance_risk"] = summarize_compliance_risk(out)
+ return out
def aggregate_cookie_purposes(vendors: Iterable[dict]) -> dict:
diff --git a/backend-compliance/compliance/services/cookie_knowledge.py b/backend-compliance/compliance/services/cookie_knowledge.py
new file mode 100644
index 00000000..dbf032d5
--- /dev/null
+++ b/backend-compliance/compliance/services/cookie_knowledge.py
@@ -0,0 +1,106 @@
+"""
+Cookie-Knowledge Facade — vereint die Basis-KB (cookie_knowledge_db) mit
+der Erweiterung (cookie_knowledge_extended) hinter einer einzigen API.
+
+Caller sollten von hier importieren statt von einer der beiden Sub-DBs.
+
+ from compliance.services.cookie_knowledge import (
+ lookup_cookie,
+ enrich_vendor_with_knowledge,
+ summarize_compliance_risk,
+ compliance_risk_label,
+ )
+
+Lookup-Reihenfolge: Extended (kuratiert, juenger) vor Base. Dadurch
+koennen wir Eintraege ueberschreiben ohne die Base zu touchen.
+"""
+
+from __future__ import annotations
+
+from compliance.services.cookie_knowledge_db import (
+ CookieKnowledge,
+ lookup_cookie as _lookup_base,
+)
+from compliance.services.cookie_knowledge_extended import (
+ KB_EXT,
+ lookup_cookie_extended,
+)
+
+
+def lookup_cookie(name: str) -> CookieKnowledge | None:
+    """Resolve a cookie name to its knowledge entry; extended KB wins over base."""
+    curated = lookup_cookie_extended(name)
+    return curated if curated else _lookup_base(name)
+
+
+def enrich_vendor_with_knowledge(vendor: dict) -> dict:
+    """Return a copy of *vendor* with per-cookie knowledge and a risk summary.
+
+    Cookies that resolve in the knowledge base get a ``knowledge`` entry;
+    unknown cookies are passed through unchanged. The summary is stored
+    under ``compliance_risk``.
+    """
+
+    def _with_knowledge(cookie: dict) -> dict:
+        info = lookup_cookie(cookie.get("name", ""))
+        return {**cookie, "knowledge": info} if info else cookie
+
+    result = dict(vendor)
+    result["cookies"] = [_with_knowledge(c) for c in vendor.get("cookies") or []]
+    result["compliance_risk"] = summarize_compliance_risk(result)
+    return result
+
+
+def summarize_compliance_risk(vendor: dict) -> dict:
+    """Aggregate re-identification risk and Schrems-II exposure over all cookies.
+
+    Each cookie's ``knowledge`` entry is used when present; otherwise the
+    cookie name is looked up. Cookies without knowledge are not counted.
+
+    Returns:
+        The risk distribution, the counts of high-risk, Schrems-II-affected
+        and strictly necessary cookies, the number of classified cookies and
+        the derived ``label``.
+    """
+    distribution = {"high": 0, "medium": 0, "low": 0}
+    schrems_hits = 0
+    necessary = 0
+    known = 0
+    for cookie in vendor.get("cookies") or []:
+        info = cookie.get("knowledge") or lookup_cookie(cookie.get("name", ""))
+        if not info:
+            continue
+        known += 1
+        level = (info.get("reid_risk") or "low").lower()
+        distribution[level] = distribution.get(level, 0) + 1
+        # Substring test, as before: "us" anywhere in the country value or
+        # "schrems" anywhere in the status text counts as affected.
+        touches_us = "us" in (info.get("vendor_country") or "").lower()
+        if touches_us or "schrems" in (info.get("schrems_ii_status") or "").lower():
+            schrems_hits += 1
+        if info.get("technical_necessity") == "full":
+            necessary += 1
+
+    label_input = {
+        "high_risk_cookie_count": distribution["high"],
+        "schrems_ii_affected_cookies": schrems_hits,
+        "total_classified": known,
+    }
+    return {
+        "reid_risk_distribution": distribution,
+        "high_risk_cookie_count": distribution["high"],
+        "schrems_ii_affected_cookies": schrems_hits,
+        "strictly_necessary_cookies": necessary,
+        "total_classified": known,
+        "label": compliance_risk_label(label_input),
+    }
+
+
+def compliance_risk_label(summary: dict) -> str:
+    """Map a risk summary to a compact badge.
+
+    Returns one of ``'kritisch'``, ``'hoch'``, ``'mittel'``, ``'gering'`` or
+    ``'unklar'``. ``'unklar'`` is used when the summary is empty or no cookie
+    was classified; otherwise the badge depends on the number of high-risk
+    and Schrems-II-affected cookies.
+    """
+    if not summary or not summary.get("total_classified"):
+        return "unklar"
+    high = summary.get("high_risk_cookie_count", 0)
+    schrems = summary.get("schrems_ii_affected_cookies", 0)
+    # Checked from most to least severe; the first matching rule wins.
+    if high >= 3 and schrems >= 2:
+        return "kritisch"
+    if high >= 2 or (high >= 1 and schrems >= 1):
+        return "hoch"
+    if high >= 1 or schrems >= 1:
+        return "mittel"
+    return "gering"
+
+
+def kb_size() -> dict:
+    """Return size diagnostics of both knowledge bases for admin/health use."""
+    from compliance.services.cookie_knowledge_db import KB as _KB_BASE
+
+    base, extended = set(_KB_BASE), set(KB_EXT)
+    return {
+        "base_entries": len(base),
+        "extended_entries": len(extended),
+        "extended_overrides_base": len(base & extended),
+        "total_unique": len(base | extended),
+    }
diff --git a/backend-compliance/compliance/services/cookie_knowledge_extended.py b/backend-compliance/compliance/services/cookie_knowledge_extended.py
new file mode 100644
index 00000000..bb825ff7
--- /dev/null
+++ b/backend-compliance/compliance/services/cookie_knowledge_extended.py
@@ -0,0 +1,497 @@
+"""
+Cookie-Knowledge Erweiterung — Adobe, Meta erweitert, Microsoft, LinkedIn,
+TikTok, Salesforce/HubSpot/Marketo, Hotjar/Mouseflow/FullStory, Live-Chat,
+Cloudflare/Akamai, Payment, CMP-eigene Cookies, EU-Analytics.
+
+Hinweis zu Rechten: Eintraege enthalten ausschliesslich Identitaetsfelder
+(Cookie-Name, Anbieter, Sitzland) + EIGENE Knappformulierungen + Verweise
+auf oeffentliche EuGH-/CNIL-/EDPB-Quellen. KEINE 1:1-Kopien aus OneTrust,
+Cookiepedia oder Vendor-eigenen Beschreibungstexten.
+
+Quellen-Pointer: IAB TCF v2.2 Vendor List, CNIL Cookies & Trackers
+Guidelines 2024, EDPB Guidelines 2/2023, EuGH-Rechtsprechung (Schrems II,
+Planet49), DSK-Orientierungshilfen 2021/2024.
+"""
+
+from __future__ import annotations
+
+from compliance.services.cookie_knowledge_db import CookieKnowledge
+
+
+# Shared vendor-level fields (vendor, country, Schrems-II note, rulings).
+# KB_EXT entries spread these in via ``**`` and may override single keys
+# afterwards (e.g. a more specific "vendor" string).
+_ADOBE_BASE = {
+ "vendor": "Adobe Inc.", "vendor_country": "US",
+ "schrems_ii_status": "Drittlandtransfer US. Mit DPF (2023) wieder "
+ "zulaessig; EU-Datenresidenz-Option in Adobe "
+ "Experience Platform verfuegbar.",
+ "eugh_rulings": [
+ "EuGH C-311/18 (Schrems II)",
+ "EDPB Recommendations 01/2020 — Supplementary Measures",
+ ],
+}
+
+# Shared fields for Meta cookies.
+_META_BASE = {
+ "vendor": "Meta Platforms Ireland Ltd.", "vendor_country": "IE",
+ "schrems_ii_status": "Verarbeitung in IE + US-Transfer. DPC Ireland "
+ "Bussgeld 2023 (€1,2 Mrd) wegen unzureichender "
+ "Schutzmassnahmen — DPF deckt seit 2023.",
+ "eugh_rulings": [
+ "EuGH C-311/18 (Schrems II)",
+ "DPC Ireland 2023 — Meta 1,2 Mrd. EUR",
+ ],
+}
+
+# Shared fields for Microsoft cookies.
+_MICROSOFT_BASE = {
+ "vendor": "Microsoft Corp.", "vendor_country": "US",
+ "schrems_ii_status": "DPF-zertifiziert; EU Data Boundary fuer Azure/365 "
+ "seit 2024 verfuegbar.",
+ "eugh_rulings": ["EuGH C-311/18 (Schrems II)"],
+}
+
+# Shared fields for LinkedIn cookies.
+_LINKEDIN_BASE = {
+ "vendor": "LinkedIn Ireland Unlimited Co.", "vendor_country": "IE",
+ "schrems_ii_status": "Microsoft-Konzern, EU-Hauptsitz IE, Transfer US.",
+ "eugh_rulings": ["EuGH C-311/18 (Schrems II)"],
+}
+
+
+KB_EXT: dict[str, CookieKnowledge] = {
+
+ # --- Adobe Experience Cloud --------------------------------------
+ # AMCV_, s_cc, s_sq leben in Base-KB.
+ "demdex": {
+ **_ADOBE_BASE,
+ "vendor": "Adobe Inc. (Audience Manager)",
+ "exact_purpose": "Adobe Audience Manager DMP — Cross-Site-Profil "
+ "fuer Zielgruppen-Segmentierung.",
+ "data_collected": ["dpuuid", "segments"],
+ "ip_relevant": True,
+ "tcf_purpose_ids": [4, 9, 10],
+ "typical_lifetime": "180 Tage",
+ "reid_risk": "high", "technical_necessity": "none",
+ },
+
+ # --- Meta erweitert -----------------------------------------------
+ # fr, _fbc leben in Base-KB.
+ "datr": {
+ **_META_BASE,
+ "exact_purpose": "Facebook Browser-Identifier — Anti-Abuse/Bot-Schutz.",
+ "data_collected": ["browser_fingerprint_id"],
+ "ip_relevant": True,
+ "typical_lifetime": "2 Jahre",
+ "reid_risk": "high", "technical_necessity": "partial",
+ "notes": "Wird auch ohne Consent gesetzt; Meta argumentiert "
+ "Sicherheit. Trotzdem von DSK 2024 kritisch bewertet.",
+ },
+ # --- Microsoft / Bing ---------------------------------------------
+ # MUID lebt in Base-KB.
+ "MSCC": {
+ **_MICROSOFT_BASE,
+ "exact_purpose": "Microsoft Site Consent — Consent-Status-Speicherung "
+ "fuer Microsoft-eigene Properties.",
+ "data_collected": ["consent_string"],
+ "typical_lifetime": "1 Jahr",
+ "reid_risk": "low", "technical_necessity": "full",
+ "notes": "Strictly necessary nach §25(2) TDDDG.",
+ },
+ "ai_session": {
+ **_MICROSOFT_BASE,
+ "vendor": "Microsoft Corp. (Application Insights)",
+ "exact_purpose": "Azure Application Insights — Session-Tracking fuer "
+ "Telemetry.",
+ "data_collected": ["session_id"],
+ "typical_lifetime": "30 Minuten",
+ "reid_risk": "medium", "technical_necessity": "partial",
+ },
+
+ # --- LinkedIn ------------------------------------------------------
+ "li_at": {
+ **_LINKEDIN_BASE,
+ "exact_purpose": "LinkedIn-Authentifizierung — Login-Session.",
+ "data_collected": ["auth_token"],
+ "typical_lifetime": "1 Jahr",
+ "reid_risk": "high", "technical_necessity": "full",
+ "notes": "Nur fuer eingeloggte Nutzer; auf externer Site = "
+ "Insight Tag (siehe li_sugr).",
+ },
+ "li_sugr": {
+ **_LINKEDIN_BASE,
+ "exact_purpose": "LinkedIn Insight Tag — Browser-ID fuer "
+ "Conversion-Tracking + Werbe-Targeting.",
+ "data_collected": ["browser_id"],
+ "ip_relevant": True,
+ "tcf_purpose_ids": [7, 9, 10],
+ "typical_lifetime": "90 Tage",
+ "reid_risk": "high", "technical_necessity": "none",
+ },
+ # bcookie, lidc leben in Base-KB.
+
+ # --- TikTok --------------------------------------------------------
+ "_ttp": {
+ "vendor": "TikTok Pte. Ltd.", "vendor_country": "SG/CN",
+ "exact_purpose": "TikTok Pixel — User-ID fuer Conversion-Tracking + "
+ "Werbeoptimierung.",
+ "data_collected": ["pixel_id", "browser_id"],
+ "ip_relevant": True,
+ "tcf_purpose_ids": [7, 9, 10],
+ "typical_lifetime": "13 Monate",
+ "reid_risk": "high", "technical_necessity": "none",
+ "schrems_ii_status": "Drittlandtransfer in Drittstaaten ohne "
+ "Angemessenheitsbeschluss. CNIL 2023 — "
+ "TikTok 5 Mio EUR Bussgeld.",
+ "eugh_rulings": [
+ "CNIL SAN-2022-027 — TikTok 5 Mio EUR",
+ "Italienische DPA 2024 — TikTok 10 Mio EUR",
+ ],
+ },
+ "ttwid": {
+ "vendor": "TikTok Pte. Ltd.", "vendor_country": "SG/CN",
+ "exact_purpose": "TikTok Web-Identifier — eindeutige Browser-ID auch "
+ "ohne Login.",
+ "data_collected": ["ttwid"],
+ "typical_lifetime": "1 Jahr",
+ "reid_risk": "high", "technical_necessity": "none",
+ "schrems_ii_status": "Wie _ttp.",
+ },
+
+ # --- HubSpot / Marketo / Salesforce ------------------------------
+ "hubspotutk": {
+ "vendor": "HubSpot Inc.", "vendor_country": "US",
+ "exact_purpose": "HubSpot User-Token — Cross-Visit-Identitaet fuer "
+ "Lead-Tracking.",
+ "data_collected": ["user_token"],
+ "ip_relevant": True,
+ "tcf_purpose_ids": [7, 8],
+ "typical_lifetime": "6 Monate",
+ "reid_risk": "high", "technical_necessity": "none",
+ "schrems_ii_status": "DPF-zertifiziert.",
+ },
+ "__hssc": {
+ "vendor": "HubSpot Inc.", "vendor_country": "US",
+ "exact_purpose": "HubSpot Session-Tracking — Pageviews innerhalb "
+ "einer Session.",
+ "data_collected": ["session_count"],
+ "typical_lifetime": "30 Minuten",
+ "reid_risk": "low", "technical_necessity": "none",
+ },
+ "_mkto_trk": {
+ "vendor": "Adobe Inc. (Marketo)", "vendor_country": "US",
+ "exact_purpose": "Marketo Munchkin-Tracker — Lead-Identifikation "
+ "fuer Marketing-Automation.",
+ "data_collected": ["munchkin_id", "session_id"],
+ "ip_relevant": True,
+ "typical_lifetime": "2 Jahre",
+ "reid_risk": "high", "technical_necessity": "none",
+ "schrems_ii_status": _ADOBE_BASE["schrems_ii_status"],
+ },
+ "BrowserId_sec": {
+ "vendor": "Salesforce.com Inc.", "vendor_country": "US",
+ "exact_purpose": "Salesforce Marketing Cloud Browser-Token — "
+ "Cross-Visit-Identifikation.",
+ "data_collected": ["browser_id"],
+ "typical_lifetime": "1 Jahr",
+ "reid_risk": "medium", "technical_necessity": "none",
+ "schrems_ii_status": "DPF-zertifiziert.",
+ },
+
+ # --- Session-Recording / Heatmaps ---------------------------------
+ "_hjSessionUser_": {
+ "vendor": "Hotjar Ltd.", "vendor_country": "MT",
+ "exact_purpose": "Hotjar User-ID — Cross-Visit-Identifikation fuer "
+ "Session-Recording + Heatmaps.",
+ "data_collected": ["user_id"],
+ "ip_relevant": True,
+ "typical_lifetime": "1 Jahr",
+ "reid_risk": "high", "technical_necessity": "none",
+ "schrems_ii_status": "EU (Malta) — kein Drittland. Aber: parent "
+ "Contentsquare (FR) hostet teilweise in US.",
+ "notes": "Suffix ``. Pattern-Match noetig. "
+ "DSGVO-Aufzeichnung = Einwilligung pflichtig.",
+ "eu_alternative_vendor": "Mouseflow / Smartlook (CZ)",
+ },
+ "_hjSession_": {
+ "vendor": "Hotjar Ltd.", "vendor_country": "MT",
+ "exact_purpose": "Hotjar Session-Token — eindeutige Session-ID "
+ "innerhalb 30min Inaktivitaet.",
+ "data_collected": ["session_id"],
+ "typical_lifetime": "30 Minuten",
+ "reid_risk": "medium", "technical_necessity": "none",
+ },
+ "fs_uid": {
+ "vendor": "FullStory Inc.", "vendor_country": "US",
+ "exact_purpose": "FullStory User-ID — Cross-Visit-Identifikation "
+ "fuer Session-Replay.",
+ "data_collected": ["user_id"],
+ "ip_relevant": True,
+ "typical_lifetime": "1 Jahr",
+ "reid_risk": "high", "technical_necessity": "none",
+ "schrems_ii_status": "DPF-zertifiziert. EU-Region verfuegbar (opt-in).",
+ },
+ "mf_user": {
+ "vendor": "Mouseflow Aps", "vendor_country": "DK",
+ "exact_purpose": "Mouseflow User-ID — Cross-Visit-Identifikation fuer "
+ "Heatmap + Recording.",
+ "data_collected": ["user_id"],
+ "typical_lifetime": "1 Jahr",
+ "reid_risk": "medium", "technical_necessity": "none",
+ "schrems_ii_status": "EU (DK) — kein Drittland.",
+ },
+
+ # --- Live-Chat ----------------------------------------------------
+ "intercom-id-": {
+ "vendor": "Intercom Inc.", "vendor_country": "US",
+ "exact_purpose": "Intercom Visitor-ID — Wiedererkennung anonymer "
+ "Besucher fuer Chat-History.",
+ "data_collected": ["visitor_id"],
+ "typical_lifetime": "9 Monate",
+ "reid_risk": "medium", "technical_necessity": "partial",
+ "schrems_ii_status": "DPF-zertifiziert; EU-Datenresidenz optional.",
+ "notes": "Suffix ``. Pattern-Match noetig.",
+ },
+ "driftt_aid": {
+ "vendor": "Salesforce.com Inc. (Drift)", "vendor_country": "US",
+ "exact_purpose": "Drift Anonymous-Visitor-ID fuer Chat-Personalisierung.",
+ "data_collected": ["visitor_id"],
+ "typical_lifetime": "2 Jahre",
+ "reid_risk": "medium", "technical_necessity": "partial",
+ },
+ "__zlcmid": {
+ "vendor": "Zendesk Inc.", "vendor_country": "US",
+ "exact_purpose": "Zendesk Chat Visitor-ID fuer Session-Tracking.",
+ "data_collected": ["chat_visitor_id"],
+ "typical_lifetime": "1 Jahr",
+ "reid_risk": "medium", "technical_necessity": "partial",
+ "schrems_ii_status": "DPF-zertifiziert; EU-Datacenter optional.",
+ },
+
+ # --- CDN / Sicherheit (strictly necessary) -----------------------
+ # __cf_bm, cf_clearance leben in Base-KB.
+ "AKA_A2": {
+ "vendor": "Akamai Technologies Inc.", "vendor_country": "US",
+ "exact_purpose": "Akamai Adaptive Acceleration — geroutete Best-Path-"
+ "Optimierung.",
+ "data_collected": ["a2_route"],
+ "typical_lifetime": "1 Stunde",
+ "reid_risk": "low", "technical_necessity": "full",
+ },
+
+ # --- Payment (strictly necessary fuer Checkout) ------------------
+ "__stripe_mid": {
+ "vendor": "Stripe Payments Europe Ltd.", "vendor_country": "IE",
+ "exact_purpose": "Stripe Fraud-Detection Merchant-ID — Risiko-Scoring "
+ "fuer Zahlungs-Authentifizierung.",
+ "data_collected": ["merchant_visitor_id"],
+ "ip_relevant": True,
+ "typical_lifetime": "1 Jahr",
+ "reid_risk": "low", "technical_necessity": "full",
+ "schrems_ii_status": "EU (IE) — kein Drittland.",
+ "notes": "Strictly necessary nach §25(2) TDDDG fuer Zahlungsabwicklung.",
+ },
+ "__stripe_sid": {
+ "vendor": "Stripe Payments Europe Ltd.", "vendor_country": "IE",
+ "exact_purpose": "Stripe Session-ID — temporaere Zahlungs-Session.",
+ "data_collected": ["session_id"],
+ "typical_lifetime": "30 Minuten",
+ "reid_risk": "low", "technical_necessity": "full",
+ },
+
+ # --- CMP-eigene Cookies (strictly necessary) ---------------------
+ "CookieConsent": {
+ "vendor": "Cybot A/S (Cookiebot)", "vendor_country": "DK",
+ "exact_purpose": "Cookiebot Consent-Speicherung — gewaehlte "
+ "Kategorien + Zeitstempel.",
+ "data_collected": ["consent_categories", "consent_timestamp"],
+ "typical_lifetime": "1 Jahr",
+ "reid_risk": "low", "technical_necessity": "full",
+ "schrems_ii_status": "EU (DK). Wenn EU-Cloud, kein Drittland.",
+ },
+ "OptanonConsent": {
+ "vendor": "OneTrust LLC", "vendor_country": "US",
+ "exact_purpose": "OneTrust Consent-Speicherung — Kategorien + "
+ "Vendor-Liste + Zeitstempel.",
+ "data_collected": ["consent_categories", "consent_string"],
+ "typical_lifetime": "1 Jahr",
+ "reid_risk": "low", "technical_necessity": "full",
+ "schrems_ii_status": "DPF-zertifiziert; EU-Cloud optional.",
+ },
+ "OptanonAlertBoxClosed": {
+ "vendor": "OneTrust LLC", "vendor_country": "US",
+ "exact_purpose": "OneTrust UI-Flag — verhindert Re-Display des "
+ "Banners nach Schliessung.",
+ "data_collected": ["closed_timestamp"],
+ "typical_lifetime": "1 Jahr",
+ "reid_risk": "low", "technical_necessity": "full",
+ },
+ "usercentrics-uuid": {
+ "vendor": "Usercentrics GmbH", "vendor_country": "DE",
+ "exact_purpose": "Usercentrics Consent-Speicherung — UUID-basiert.",
+ "data_collected": ["consent_uuid", "consent_settings"],
+ "typical_lifetime": "1 Jahr",
+ "reid_risk": "low", "technical_necessity": "full",
+ "schrems_ii_status": "DE — kein Drittland.",
+ },
+
+ # --- Weitere Social / Werbeplattformen ---------------------------
+ # _pin_unauth lebt in Base-KB.
+ "_scid": {
+ "vendor": "Snap Group Ltd.", "vendor_country": "GB/US",
+ "exact_purpose": "Snapchat Pixel — Conversion-Tracking fuer "
+ "Snap Ads.",
+ "data_collected": ["snap_visitor_id"],
+ "ip_relevant": True,
+ "tcf_purpose_ids": [7, 9, 10],
+ "typical_lifetime": "1 Jahr",
+ "reid_risk": "high", "technical_necessity": "none",
+ "schrems_ii_status": "Drittlandtransfer; UK seit 2021 mit "
+ "Angemessenheitsbeschluss.",
+ },
+ "guest_id": {
+ "vendor": "X Corp. (Twitter)", "vendor_country": "US",
+ "exact_purpose": "X/Twitter Guest-Identifier — Tracking nicht "
+ "eingeloggter Besucher inkl. Embeds.",
+ "data_collected": ["guest_id"],
+ "ip_relevant": True,
+ "tcf_purpose_ids": [4, 9, 10],
+ "typical_lifetime": "2 Jahre",
+ "reid_risk": "high", "technical_necessity": "none",
+ "schrems_ii_status": "DPF-Status unklar seit Eigentuemerwechsel 2022. "
+ "Erhoehtes Risiko, EDPB beobachtet.",
+ },
+ "VISITOR_INFO1_LIVE": {
+ "vendor": "Google Ireland Ltd. (YouTube)", "vendor_country": "IE",
+ "exact_purpose": "YouTube Embed Visitor-ID — Bandbreiten-Optimierung "
+ "+ Empfehlungsalgorithmus.",
+ "data_collected": ["youtube_visitor_id"],
+ "ip_relevant": True,
+ "tcf_purpose_ids": [8, 10],
+ "typical_lifetime": "6 Monate",
+ "reid_risk": "high", "technical_necessity": "none",
+ "notes": "YouTube-NoCookie-Domain (youtube-nocookie.com) reduziert "
+ "Tracking — DSGVO-konformer.",
+ },
+ "vuid": {
+ "vendor": "Vimeo Inc.", "vendor_country": "US",
+ "exact_purpose": "Vimeo User-Identifier — Wiedererkennung "
+ "wiederkehrender Besucher fuer Statistik.",
+ "data_collected": ["vimeo_user_id"],
+ "typical_lifetime": "2 Jahre",
+ "reid_risk": "medium", "technical_necessity": "none",
+ "schrems_ii_status": "DPF-zertifiziert.",
+ },
+
+ # --- Marketing-Automation / Email --------------------------------
+ "__kla_id": {
+ "vendor": "Klaviyo Inc.", "vendor_country": "US",
+ "exact_purpose": "Klaviyo Visitor-Tracking — fuer E-Mail-Marketing-"
+ "Attribution.",
+ "data_collected": ["klaviyo_id"],
+ "ip_relevant": True,
+ "typical_lifetime": "2 Jahre",
+ "reid_risk": "high", "technical_necessity": "none",
+ "schrems_ii_status": "DPF-zertifiziert.",
+ },
+ "_mcid": {
+ "vendor": "Intuit Mailchimp", "vendor_country": "US",
+ "exact_purpose": "Mailchimp Email-Click-Tracking — Verknuepft "
+ "Pageviews mit gesendeter Kampagne.",
+ "data_collected": ["mc_email_id"],
+ "typical_lifetime": "1 Jahr",
+ "reid_risk": "high", "technical_necessity": "none",
+ "schrems_ii_status": "DPF-zertifiziert.",
+ },
+
+ # --- Product-Analytics / CDP -------------------------------------
+ "mp_": {
+ "vendor": "Mixpanel Inc.", "vendor_country": "US",
+ "exact_purpose": "Mixpanel Distinct-ID + Properties — "
+ "Pseudonyme Event-Analytics.",
+ "data_collected": ["distinct_id", "properties"],
+ "typical_lifetime": "1 Jahr",
+ "reid_risk": "high", "technical_necessity": "none",
+ "schrems_ii_status": "DPF-zertifiziert; EU-Residency optional.",
+ "notes": "Suffix `_mixpanel`. Pattern-Match noetig.",
+ },
+ "ajs_anonymous_id": {
+ "vendor": "Twilio Inc. (Segment)", "vendor_country": "US",
+ "exact_purpose": "Segment Anonymous-ID — Cross-Device-Identitaet "
+ "vor Login.",
+ "data_collected": ["anonymous_id"],
+ "typical_lifetime": "1 Jahr",
+ "reid_risk": "high", "technical_necessity": "none",
+ "schrems_ii_status": "DPF-zertifiziert; EU-Datenresidenz optional.",
+ },
+ "AMP_": {
+ "vendor": "Amplitude Inc.", "vendor_country": "US",
+ "exact_purpose": "Amplitude Device-ID — Cross-Session-Identitaet "
+ "fuer Product-Analytics.",
+ "data_collected": ["device_id", "session_id"],
+ "typical_lifetime": "1 Jahr",
+ "reid_risk": "high", "technical_necessity": "none",
+ "schrems_ii_status": "DPF-zertifiziert.",
+ "notes": "Suffix ``. Pattern-Match noetig.",
+ },
+
+ # --- A/B-Testing -------------------------------------------------
+ "optimizelyEndUserId": {
+ "vendor": "Optimizely Inc.", "vendor_country": "US",
+ "exact_purpose": "Optimizely End-User-ID — konsistente "
+ "Experiment-Zuteilung pro Besucher.",
+ "data_collected": ["end_user_id", "variation_assignments"],
+ "typical_lifetime": "6 Monate",
+ "reid_risk": "medium", "technical_necessity": "none",
+ "schrems_ii_status": "DPF-zertifiziert.",
+ },
+
+ # --- RUM / Monitoring (oft strictly necessary diskutiert) --------
+ "_dd_s": {
+ "vendor": "Datadog Inc.", "vendor_country": "US",
+ "exact_purpose": "Datadog RUM Session-Tracking — Performance- "
+ "Monitoring + Fehler-Telemetrie.",
+ "data_collected": ["session_id", "session_type"],
+ "typical_lifetime": "15 Minuten",
+ "reid_risk": "low", "technical_necessity": "partial",
+ "schrems_ii_status": "EU-Region (Frankfurt) verfuegbar.",
+ "notes": "Bei reiner Server-/Fehler-Telemetrie ohne Cross-Site-"
+ "Tracking Argument fuer berechtigtes Interesse moeglich.",
+ },
+
+ # --- EU-Analytics-Alternativen -----------------------------------
+ "_pk_ref": {
+ "vendor": "InnoCraft Ltd. (Matomo)", "vendor_country": "NZ",
+ "exact_purpose": "Matomo Referrer-Tracking — Quelle des Besuchs.",
+ "data_collected": ["referrer", "campaign"],
+ "typical_lifetime": "6 Monate",
+ "reid_risk": "low", "technical_necessity": "none",
+ "schrems_ii_status": "NZ hat Angemessenheitsbeschluss (2012). "
+ "Bei On-Premise-Hosting kein Transfer.",
+ "notes": "Self-Hosting empfohlen — dann zeroes Drittland.",
+ },
+ "_pk_cvar": {
+ "vendor": "InnoCraft Ltd. (Matomo)", "vendor_country": "NZ",
+ "exact_purpose": "Matomo Custom-Variables — pro Visit konfigurierbar.",
+ "data_collected": ["custom_vars"],
+ "typical_lifetime": "30 Minuten",
+ "reid_risk": "low", "technical_necessity": "none",
+ },
+}
+
+
+# (regex, KB_EXT key) pairs for cookie names that carry a dynamic suffix.
+# lookup_cookie_extended tries them in list order with re.search and
+# returns the KB_EXT entry of the first pattern that matches.
+_EXT_PATTERNS: list[tuple[str, str]] = [
+ (r"^_hjSessionUser_", "_hjSessionUser_"),
+ (r"^_hjSession_", "_hjSession_"),
+ (r"^intercom-id-", "intercom-id-"),
+ (r"^mp_", "mp_"),
+ (r"^AMP_", "AMP_"),
+]
+
+
+def lookup_cookie_extended(name: str) -> CookieKnowledge | None:
+    """Resolve a cookie name against the extension knowledge base KB_EXT.
+
+    Resolution order: exact key, then the first matching regex from
+    _EXT_PATTERNS, then the part of the name before the first "." as an
+    exact key.
+
+    Returns:
+        The matching KB_EXT entry, or None for an empty name or when
+        nothing matches.
+    """
+    import re
+
+    if not name:
+        return None
+    if name in KB_EXT:
+        return KB_EXT[name]
+    for regex, kb_key in _EXT_PATTERNS:
+        if re.search(regex, name):
+            return KB_EXT.get(kb_key)
+    # Fallback: strip everything from the first dot onwards.
+    prefix, dot, _rest = name.partition(".")
+    if dot and prefix in KB_EXT:
+        return KB_EXT[prefix]
+    return None
diff --git a/backend-compliance/compliance/services/tdm_reservation_check.py b/backend-compliance/compliance/services/tdm_reservation_check.py
new file mode 100644
index 00000000..49304353
--- /dev/null
+++ b/backend-compliance/compliance/services/tdm_reservation_check.py
@@ -0,0 +1,242 @@
+"""
+TDM-Reservation-Check (§ 44b UrhG / EU CDSM Art. 4).
+
+Prueft pro Domain ob ein maschinenlesbarer Nutzungsvorbehalt fuer
+Text-and-Data-Mining gesetzt ist. Quellen:
+ 1. robots.txt — User-agent: * Disallow: / (oder spezifisch fuer uns)
+ 2. /ai.txt — neuer OpenAI-Standard
+ 3. HTTP-Header `tdm-reservation: 1` auf Homepage
+ 4. HTML <meta name="tdm-reservation" content="1"> auf Homepage
+ 5. HTML <meta name="robots" content="noai, noimageai"> Tags
+
+Status-Interpretation:
+ status=allowed -> kein Vorbehalt, crawlbar
+ status=reserved -> expliziter Vorbehalt, NICHT crawlen
+ status=denied -> robots.txt-Zugriff aktiv blockiert (403/401)
+ => konservativ: NICHT crawlen
+ status=unknown -> Server-Error (500/timeout/DNS) auf robots.txt
+ => crawlbar, aber 24h-Recheck markiert
+
+Cache via sidecar SQLite (gleiche DB wie compliance_audit_log), 24h TTL.
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sqlite3
+import time
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Literal
+from urllib.parse import urlparse
+
+import httpx
+
+logger = logging.getLogger(__name__)
+
+# SQLite file holding the TDM cache; overridable via COMPLIANCE_AUDIT_DB.
+DB_PATH = os.getenv("COMPLIANCE_AUDIT_DB", "/data/compliance_audits.db")
+# Cached verdicts older than this (24 hours) are ignored by _cache_get.
+CACHE_TTL_SECONDS = 24 * 3600
+
+# Possible verdicts of a TDM probe.
+Status = Literal["allowed", "reserved", "denied", "unknown"]
+
+# User-Agent header sent with every probe request in check_tdm_reservation.
+_DEFAULT_UA = (
+ "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
+ "(KHTML, like Gecko) HeadlessChrome/120.0.0.0 Safari/537.36"
+)
+
+
+def _ensure_cache_table() -> None:
+    """Create the TDM cache table and its timestamp index if they are missing.
+
+    Also creates the parent directory of DB_PATH. Errors from the file
+    system or SQLite propagate to the caller.
+    """
+    from contextlib import closing
+
+    Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True)
+    # sqlite3's own context manager only commits or rolls back; closing()
+    # is what actually releases the connection handle.
+    with closing(sqlite3.connect(DB_PATH)) as conn, conn:
+        conn.executescript("""
+            CREATE TABLE IF NOT EXISTS tdm_reservation_cache (
+                domain TEXT PRIMARY KEY,
+                ts TEXT NOT NULL,
+                status TEXT NOT NULL,
+                signals TEXT NOT NULL -- JSON list[dict]
+            );
+            CREATE INDEX IF NOT EXISTS idx_tdm_ts ON tdm_reservation_cache(ts);
+        """)
+
+
+def _cache_get(domain: str) -> dict | None:
+    """Return the cached probe result for *domain* if it is still fresh.
+
+    Best-effort: any failure (missing DB, bad row, unparsable timestamp)
+    is logged at debug level and reported as a cache miss.
+
+    Returns:
+        Dict with ``domain``, ``status``, ``signals``, ``cached=True`` and
+        ``ts``, or None when there is no row, the row is older than
+        CACHE_TTL_SECONDS, or the lookup failed.
+    """
+    from contextlib import closing
+
+    try:
+        _ensure_cache_table()
+        # closing() releases the handle; sqlite3's own context manager
+        # would leave the connection open after the block.
+        with closing(sqlite3.connect(DB_PATH)) as conn:
+            conn.row_factory = sqlite3.Row
+            row = conn.execute(
+                "SELECT * FROM tdm_reservation_cache WHERE domain=?", (domain,),
+            ).fetchone()
+            if not row:
+                return None
+            ts = datetime.fromisoformat(row["ts"]).timestamp()
+            if time.time() - ts > CACHE_TTL_SECONDS:
+                return None
+            return {
+                "domain": domain,
+                "status": row["status"],
+                "signals": json.loads(row["signals"]),
+                "cached": True,
+                "ts": row["ts"],
+            }
+    except Exception as e:
+        logger.debug("tdm cache_get failed for %s: %s", domain, e)
+        return None
+
+
+def _cache_put(domain: str, status: Status, signals: list[dict]) -> None:
+    """Store (or overwrite) the probe result for *domain* with a UTC timestamp.
+
+    Best-effort: failures are logged as warnings and never raised, so a
+    broken cache cannot fail the probe itself.
+    """
+    from contextlib import closing
+
+    try:
+        _ensure_cache_table()
+        # closing() releases the handle; sqlite3's own context manager
+        # would leave the connection open after the block.
+        with closing(sqlite3.connect(DB_PATH)) as conn:
+            conn.execute(
+                "INSERT OR REPLACE INTO tdm_reservation_cache "
+                "(domain, ts, status, signals) VALUES (?, ?, ?, ?)",
+                (
+                    domain,
+                    datetime.now(timezone.utc).isoformat(),
+                    status,
+                    json.dumps(signals, ensure_ascii=False),
+                ),
+            )
+            conn.commit()
+    except Exception as e:
+        logger.warning("tdm cache_put failed for %s: %s", domain, e)
+
+
+def _base_domain(url_or_domain: str) -> str:
+    """Normalise a URL or bare host to a lowercase netloc without leading "www.".
+
+    Args:
+        url_or_domain: Full URL or a host name; "https://" is assumed when
+            no scheme is given.
+
+    Returns:
+        The lowercased netloc with one leading "www." removed, or "" for
+        empty input.
+    """
+    if not url_or_domain:
+        return ""
+    if "://" not in url_or_domain:
+        url_or_domain = "https://" + url_or_domain
+    netloc = urlparse(url_or_domain).netloc.lower()
+    # Strip only a leading "www." label; str.replace would also eat "www."
+    # inside other labels (e.g. "awww.example.com" -> "aexample.com").
+    if netloc.startswith("www."):
+        netloc = netloc[len("www."):]
+    return netloc
+
+
+async def _fetch_status(client: httpx.AsyncClient, url: str) -> tuple[int, str, dict]:
+    """GET *url* and return ``(status_code, body, headers)`` without raising.
+
+    The body is the decoded response text truncated to its first 16384
+    characters, or "" when the response has no content. Any exception is
+    logged at debug level and reported as ``(0, "", {})``.
+    """
+    try:
+        response = await client.get(url)
+        text = ""
+        if response.content:
+            text = response.text[:16384]
+        outcome = (response.status_code, text, dict(response.headers))
+    except Exception as e:
+        logger.debug("tdm fetch %s failed: %s", url, e)
+        outcome = (0, "", {})
+    return outcome
+
+
+def _robots_disallows_us(body: str) -> bool:
+    """Return True if robots.txt has ``Disallow: /`` for a relevant UA group.
+
+    Relevant user agents are the wildcard ``*`` and a fixed list of AI/TDM
+    crawler names. Parsing rules:
+
+    * A truly blank line ends the current group.
+    * A comment-only line is skipped and does NOT end the group.
+    * A ``User-agent`` line that follows an Allow/Disallow rule starts a
+      new group; consecutive ``User-agent`` lines share one group.
+    * An empty ``Disallow:`` value means "allow everything" and is ignored.
+
+    Args:
+        body: Raw robots.txt text; empty text yields False.
+    """
+    if not body:
+        return False
+    relevant_agents = {"*", "claudebot", "anthropic-ai", "gptbot",
+                       "google-extended", "ccbot", "breakpilot"}
+    group_agents: list[str] = []
+    group_has_rules = False
+    for raw in body.splitlines():
+        if not raw.strip():
+            # Blank line: group separator.
+            group_agents = []
+            group_has_rules = False
+            continue
+        line = raw.split("#", 1)[0].strip()
+        if not line or ":" not in line:
+            # Comment-only or malformed line: ignore, keep the group open.
+            continue
+        key, val = (s.strip().lower() for s in line.split(":", 1))
+        if key == "user-agent":
+            if group_has_rules:
+                # Rules were already seen, so this UA opens a new group.
+                group_agents = []
+                group_has_rules = False
+            group_agents.append(val)
+        elif key in ("allow", "disallow"):
+            group_has_rules = True
+            if (key == "disallow" and val == "/"
+                    and any(ua in relevant_agents for ua in group_agents)):
+                return True
+    return False
+
+
+def _meta_has_reservation(body: str) -> bool:
+    """Heuristically detect a TDM/AI opt-out marker in homepage HTML.
+
+    Case-insensitive substring search for a ``tdm-reservation`` meta tag
+    with content "1" or for a ``noai`` / ``noimageai`` content value. No
+    HTML parsing is done, so attribute order and quoting must match the
+    markers below.
+    """
+    haystack = body.lower()
+    markers = (
+        'name="tdm-reservation" content="1"',
+        "name='tdm-reservation' content='1'",
+        '"noai"',
+        '"noimageai"',
+        "content=\"noai",
+        "content='noai",
+    )
+    for marker in markers:
+        if marker in haystack:
+            return True
+    return False
+
+
+async def check_tdm_reservation(domain_or_url: str) -> dict:
+ """Probe a domain for machine-readable TDM reservations.
+
+ Probes run in this order and later ones are skipped as soon as the
+ status is no longer "allowed": robots.txt (https, then http if the
+ https request itself failed), /ai.txt, then the homepage
+ (``tdm-reservation`` response header, HTML meta markers). All requests
+ go to the ``www.`` host of the normalised domain. The verdict is
+ written to the cache before returning.
+
+ Returns:
+ A dict with ``domain``, ``status``, ``signals`` (list of dicts with
+ ``src`` plus ``status_code``/``scheme``/``detail`` where available),
+ ``cached`` and ``ts``. For an empty domain the dict has no ``ts`` and
+ nothing is cached; a cache hit is returned as produced by _cache_get.
+ """
+ domain = _base_domain(domain_or_url)
+ if not domain:
+ return {"domain": "", "status": "unknown", "signals": [], "cached": False}
+
+ cached = _cache_get(domain)
+ if cached:
+ return cached
+
+ signals: list[dict] = []
+ status: Status = "allowed"
+
+ headers = {"User-Agent": _DEFAULT_UA, "Accept": "*/*"}
+ async with httpx.AsyncClient(
+ timeout=12.0, follow_redirects=True, headers=headers,
+ ) as client:
+ # Step 1: robots.txt. Code 0 means the request itself failed; only
+ # in that case (and only for https) the http variant is tried.
+ for scheme in ("https", "http"):
+ r_code, r_body, _ = await _fetch_status(
+ client, f"{scheme}://www.{domain}/robots.txt",
+ )
+ if r_code == 0 and scheme == "https":
+ continue
+ signals.append({"src": "robots.txt", "status_code": r_code,
+ "scheme": scheme})
+ if r_code in (401, 403):
+ # Access to robots.txt is actively refused.
+ status = "denied"
+ elif r_code == 200 and _robots_disallows_us(r_body):
+ status = "reserved"
+ signals[-1]["detail"] = "Disallow: / for relevant UA group"
+ elif r_code not in (200, 404):
+ # Anything else (including a failed http request) is inconclusive.
+ status = "unknown"
+ # Exactly one robots.txt answer is evaluated.
+ break
+
+ # Step 2: the mere presence of /ai.txt (HTTP 200) counts as a
+ # reservation; its content is not inspected.
+ if status == "allowed":
+ ai_code, _, _ = await _fetch_status(
+ client, f"https://www.{domain}/ai.txt",
+ )
+ if ai_code == 200:
+ status = "reserved"
+ signals.append({"src": "ai.txt", "status_code": 200,
+ "detail": "ai.txt present"})
+
+ # Step 3: homepage response header first, HTML meta markers second.
+ if status == "allowed":
+ h_code, h_body, h_hdrs = await _fetch_status(
+ client, f"https://www.{domain}/",
+ )
+ if h_code == 200:
+ if h_hdrs.get("tdm-reservation") == "1":
+ status = "reserved"
+ signals.append({"src": "http-header",
+ "detail": "tdm-reservation: 1"})
+ elif _meta_has_reservation(h_body):
+ status = "reserved"
+ signals.append({"src": "html-meta",
+ "detail": "noai/tdm-reservation meta"})
+
+ # Every verdict is cached, including "unknown" and "denied".
+ _cache_put(domain, status, signals)
+ return {
+ "domain": domain,
+ "status": status,
+ "signals": signals,
+ "cached": False,
+ "ts": datetime.now(timezone.utc).isoformat(),
+ }
+
+
+def is_crawl_allowed(result: dict) -> bool:
+    """Tell whether a probe result permits crawling.
+
+    Only the statuses "allowed" and "unknown" are crawlable; a missing or
+    empty status is treated as "unknown".
+    """
+    status = result.get("status")
+    if not status:
+        status = "unknown"
+    return status in ("allowed", "unknown")
diff --git a/backend-compliance/compliance/services/unified_findings_collector.py b/backend-compliance/compliance/services/unified_findings_collector.py
new file mode 100644
index 00000000..a909e615
--- /dev/null
+++ b/backend-compliance/compliance/services/unified_findings_collector.py
@@ -0,0 +1,277 @@
+"""
+Aggregator: Doc-Check-Results + cmp_vendors + redundancy_report
+ -> einheitliche Finding-Records fuer unified_findings_store.
+
+Speichert nur ABGELEITETE/normalisierte Findings (siehe Memory
+'feedback_oem_data_legal.md'): keine rohen CMP-Cookie-Texte, keine
+1:1-Spiegelung fremder Vendor-Listen — nur eigene Risk-/Status-Bewertung.
+
+Hook:
+ from compliance.services.unified_findings_collector import collect
+ from compliance.services.unified_findings_store import record_findings
+ findings = collect(check_id, results, cmp_vendors, redundancy_report, doc_texts)
+ record_findings(check_id, findings)
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import Any
+
+logger = logging.getLogger(__name__)
+
+
+# Fallback severity per source_type; _from_doc_check looks it up when a
+# check carries no severity of its own.
+_SEVERITY_DEFAULT = {
+ "mc": "MEDIUM",
+ "pflichtangabe": "MEDIUM",
+ "vendor": "MEDIUM",
+ "redundanz": "LOW",
+}
+
+# Maps a cmp_vendor flag to (default severity, German short label).
+# _from_vendors also passes the flag itself to _safe_recipe as recipe key.
+_VENDOR_FLAG_SEVERITY = {
+ "no_cookies_listed": ("HIGH", "Cookie-Auflistung fehlt"),
+ "no_country": ("MEDIUM", "Sitzland des Anbieters fehlt"),
+ "no_privacy_url": ("HIGH", "Datenschutzerklaerung des Anbieters fehlt"),
+ "broken_privacy_url": ("HIGH", "Datenschutz-URL nicht erreichbar"),
+ "no_opt_out_url": ("MEDIUM", "Widerspruchs-/Opt-Out-Link fehlt"),
+ "broken_opt_out": ("MEDIUM", "Opt-Out-Link nicht erreichbar"),
+ "no_name": ("HIGH", "Anbieter-Name fehlt"),
+ "no_purpose": ("HIGH", "Verarbeitungszweck fehlt"),
+ "cookies_no_expiry": ("LOW", "Cookie-Speicherdauer fehlt"),
+ "cookies_no_names": ("LOW", "Cookie-Namen fehlen"),
+}
+
+
+def _safe_recipe(key: str) -> dict:
+    """Best-effort lookup of the action recipe for *key*.
+
+    The recipes module is optional and therefore imported lazily.
+
+    Returns:
+        A dict copy of the recipe, or {} when the module is missing, no
+        recipe matches, or the lookup raises. Never raises itself.
+    """
+    try:
+        from compliance.services.finding_action_recipes import recipe_for
+        found = recipe_for(key)
+        return dict(found) if found else {}
+    except ImportError:
+        # Optional module not installed: expected, nothing to report.
+        return {}
+    except Exception:
+        # Still best-effort, but leave a trace instead of hiding real bugs.
+        logger.debug("recipe lookup failed for %r", key, exc_info=True)
+        return {}
+
+
+def _safe_anchor(label: str, doc_text: str, doc_id: str) -> dict:
+    """Best-effort lookup of where *label* is anchored inside *doc_text*.
+
+    The locator module is imported lazily. An empty label or empty document
+    text short-circuits without importing anything.
+
+    Returns:
+        The locator's result, or {} when input is empty, the module is
+        missing, nothing was found, or the locator raises. Never raises.
+    """
+    if not label or not doc_text:
+        return {}
+    try:
+        from compliance.services.doc_anchor_locator import locate_anchor
+        found = locate_anchor(label, doc_text, doc_id)
+        return found or {}
+    except ImportError:
+        # Optional module not installed: expected, nothing to report.
+        return {}
+    except Exception:
+        # Still best-effort, but leave a trace instead of hiding real bugs.
+        logger.debug("anchor lookup failed for %r in %r", label, doc_id,
+                     exc_info=True)
+        return {}
+
+
+def _from_doc_check(
+    check_id: str,
+    r: Any,
+    doc_text: str,
+) -> list[dict]:
+    """Translate a single DocCheckResult into unified-finding rows.
+
+    A result carrying an error yields exactly one row: status ``na`` with
+    severity ``INFO`` when the error text starts with "Nicht anwendbar",
+    otherwise status ``failed`` with severity ``HIGH``. Without an error,
+    every entry of ``r.checks`` becomes one row; only failed checks get an
+    action recipe and a document anchor, passed/skipped rows stay lean.
+    """
+    error = r.error
+    if error:
+        if error.startswith("Nicht anwendbar"):
+            return [{
+                "source_type": "pflichtangabe",
+                "doc_type": r.doc_type,
+                "severity": "INFO",
+                "status": "na",
+                "regulation": "",
+                "label": f"{r.label}: {error}",
+                "hint": error,
+                "action_recipe": {},
+                "payload": {"scenario": r.scenario},
+            }]
+        return [{
+            "source_type": "pflichtangabe",
+            "doc_type": r.doc_type,
+            "severity": "HIGH",
+            "status": "failed",
+            "regulation": "",
+            "label": f"{r.label}: Dokument nicht erreichbar",
+            "hint": error[:400],
+            "action_recipe": {},
+            "payload": {},
+        }]
+
+    rows: list[dict] = []
+    for check in (r.checks or []):
+        # Master-control checks are recognised by their "mc-" id prefix.
+        source = "mc" if (check.id or "").startswith("mc-") else "pflichtangabe"
+        if check.passed:
+            status = "passed"
+        else:
+            status = "skipped" if check.skipped else "failed"
+        severity = (check.severity or _SEVERITY_DEFAULT[source]).upper()
+
+        recipe: dict = {}
+        anchor: dict = {}
+        if status == "failed":
+            # Recipe lookup tries the label first, then the check id.
+            recipe = _safe_recipe(check.label or "") or _safe_recipe(check.id or "")
+            anchor = _safe_anchor(check.label or "", doc_text, r.doc_type)
+
+        payload = {
+            "mc_id": check.id,
+            "level": check.level,
+            "parent": check.parent,
+            "matched_text": (check.matched_text or "")[:300],
+            "article": check.article or "",
+            "anchor_method": anchor.get("method"),
+            "anchor_position": anchor.get("position_hint"),
+        }
+        rows.append({
+            "source_type": source,
+            "doc_type": r.doc_type,
+            "severity": severity,
+            "status": status,
+            "regulation": check.regulation or "",
+            "label": check.label or "",
+            "hint": check.hint or "",
+            "action_recipe": recipe,
+            "anchor_excerpt": (anchor.get("anchor_phrase") or "")[:800],
+            "anchor_conf": _conf_to_score(anchor),
+            "payload": payload,
+        })
+    return rows
+
+
+def _conf_to_score(anchor: dict) -> float:
+    """Read the anchor's ``score`` as a float.
+
+    Returns 0.0 for an empty anchor, a missing/falsy score, or a score
+    that cannot be converted to float.
+    """
+    raw_score = anchor.get("score") if anchor else None
+    try:
+        return float(raw_score or 0.0)
+    except (TypeError, ValueError):
+        return 0.0
+
+
+def _from_vendors(check_id: str, vendors: list[dict]) -> list[dict]:
+    """Emit one failed ``vendor`` finding per compliance flag of each vendor.
+
+    Flags are read from ``compliance_flags`` (falling back to ``flags``).
+    Flags without an entry in _VENDOR_FLAG_SEVERITY get severity ``LOW``
+    and a label derived from the flag name. Vendors without flags
+    produce no rows.
+    """
+    rows: list[dict] = []
+    for vendor in vendors or []:
+        vendor_name = (vendor.get("name") or vendor.get("vendor_name")
+                       or "Unbekannter Anbieter")
+        vendor_country = vendor.get("country") or ""
+        risk_info = vendor.get("compliance_risk") or {}
+        flags = vendor.get("compliance_flags") or vendor.get("flags") or []
+        for flag in flags:
+            fallback = ("LOW", flag.replace("_", " ").title())
+            severity, short_label = _VENDOR_FLAG_SEVERITY.get(flag, fallback)
+            row = {
+                "source_type": "vendor",
+                "doc_type": "-",
+                "severity": severity,
+                "status": "failed",
+                "regulation": "DSGVO",
+                "label": f"{vendor_name} — {short_label}",
+                "hint": _vendor_hint(flag, vendor_name),
+                "action_recipe": _safe_recipe(flag),
+                "vendor_name": vendor_name,
+                "category": (vendor.get("category") or "")[:64],
+                "payload": {
+                    "flag": flag,
+                    "country": vendor_country,
+                    "compliance_score": vendor.get("compliance_score"),
+                    "category": vendor.get("category"),
+                    "risk_label": risk_info.get("label"),
+                    "high_risk_cookies": risk_info.get("high_risk_cookie_count"),
+                    "schrems_ii_cookies": risk_info.get("schrems_ii_affected_cookies"),
+                },
+            }
+            rows.append(row)
+    return rows
+
+
+def _vendor_hint(flag: str, name: str) -> str:
+    """Return the German explanatory hint for a vendor compliance flag.
+
+    Covers every flag defined in _VENDOR_FLAG_SEVERITY, including the two
+    cookie-detail flags that previously fell through to the generic text.
+    Unknown flags yield ``"Flag: <flag>"``.
+
+    Args:
+        flag: Compliance flag key, e.g. ``"no_privacy_url"``.
+        name: Vendor display name inserted into the hint.
+    """
+    # Build only the hint that is needed instead of formatting all of them.
+    if flag == "no_cookies_listed":
+        return (f"Bei '{name}' sind keine Cookies dokumentiert — DSK-Orientierungshilfe "
+                "verlangt Name + Zweck + Speicherdauer pro Cookie.")
+    if flag == "no_country":
+        return (f"Sitzland von '{name}' fehlt — bei Drittland-Anbieter "
+                "Art. 44 ff. DSGVO erforderlich.")
+    if flag == "no_privacy_url":
+        return f"Link zur Datenschutzerklaerung von '{name}' fehlt — Art. 13 Abs. 1 lit. e."
+    if flag == "broken_privacy_url":
+        return f"Privacy-URL von '{name}' nicht erreichbar (404/Timeout)."
+    if flag == "no_opt_out_url":
+        return f"Opt-Out/Widerspruchs-Link fuer '{name}' fehlt — Art. 21 DSGVO."
+    if flag == "broken_opt_out":
+        return f"Opt-Out-Link von '{name}' nicht erreichbar."
+    if flag == "no_name":
+        return "Anbieter ohne Name erfasst — Art. 13 Abs. 1 lit. a."
+    if flag == "no_purpose":
+        return f"Verarbeitungszweck fuer '{name}' fehlt — Art. 13 Abs. 1 lit. c."
+    if flag == "cookies_no_expiry":
+        return (f"Speicherdauer der Cookies von '{name}' fehlt — "
+                "Art. 13 Abs. 2 lit. a DSGVO.")
+    if flag == "cookies_no_names":
+        return (f"Cookie-Namen von '{name}' fehlen — DSK-Orientierungshilfe "
+                "verlangt Name + Zweck + Speicherdauer pro Cookie.")
+    return f"Flag: {flag}"
+
+
+def _from_redundancies(check_id: str, report: dict | None) -> list[dict]:
+    """Turn each redundancy category of *report* into one finding row.
+
+    Every row has status ``info`` and severity ``LOW``. The hint lists at
+    most six vendors (plus a "+N weitere" suffix) and the suggested EU
+    tool when present.
+
+    Args:
+        check_id: Id of the compliance check (not stored in the rows).
+        report: Redundancy report with a ``redundancies`` list, or None.
+
+    Returns:
+        List of finding dicts; empty when the report is missing or has no
+        redundancies.
+    """
+    if not report:
+        return []
+    out: list[dict] = []
+    for r in (report.get("redundancies") or []):
+        cat = r.get("category_label") or r.get("category") or "Unbekannt"
+        vendors = r.get("vendors") or []
+
+        # Normalise the saving range: tolerate a missing value, a single
+        # number or a one-element list instead of raising on index access
+        # (which would make collect() drop all redundancy findings).
+        raw_saving = r.get("estimated_saving_year_eur")
+        if isinstance(raw_saving, (list, tuple)):
+            if len(raw_saving) >= 2:
+                saving_low, saving_high = raw_saving[0], raw_saving[1]
+            elif len(raw_saving) == 1:
+                saving_low = saving_high = raw_saving[0]
+            else:
+                saving_low = saving_high = 0
+        elif raw_saving:
+            saving_low = saving_high = raw_saving
+        else:
+            saving_low = saving_high = 0
+
+        # str() keeps the join safe if a vendor entry is not a plain string.
+        hint = f"Anbieter: {', '.join(str(v) for v in vendors[:6])}"
+        if len(vendors) > 6:
+            hint += f" (+{len(vendors)-6} weitere)"
+        if r.get("suggested_eu_tool"):
+            hint += f" · EU-Empfehlung: {r['suggested_eu_tool']}"
+
+        out.append({
+            "source_type": "redundanz",
+            "doc_type": "-",
+            "severity": "LOW",
+            "status": "info",
+            "regulation": "Cost-Optimization",
+            "label": f"Mehrfach-Anbieter in '{cat}' ({len(vendors)} Tools)",
+            "hint": hint,
+            "action_recipe": {
+                "what": "Konsolidierung auf 1 Tool pro Kategorie pruefen.",
+                "why": (r.get("consolidation_hint") or
+                        "Mehrfach-Lizenzen + Vertrags-Overhead reduzieren."),
+                "fix_text": "Migrations-Plan zu einem Anbieter erarbeiten; "
+                            "Vertraege ueberlappend kuendigen.",
+            },
+            "category": cat,
+            "payload": {
+                "vendors": vendors[:20],
+                "saving_year_eur_low": saving_low,
+                "saving_year_eur_high": saving_high,
+                "suggested_eu_tool": r.get("suggested_eu_tool"),
+                "caveats": (r.get("caveats") or [])[:4],
+            },
+        })
+    return out
+
+
+def collect(
+    check_id: str,
+    results: list[Any],
+    cmp_vendors: list[dict] | None,
+    redundancy_report: dict | None,
+    doc_texts: dict[str, str] | None = None,
+) -> list[dict]:
+    """Gather doc-check, vendor and redundancy findings into one flat list.
+
+    Each source is converted independently: a failure in one doc result,
+    in the vendor pass or in the redundancy pass is logged as a warning
+    and does not abort the others. The returned list is ready for
+    record_findings().
+
+    Args:
+        check_id: Id of the compliance check, used for logging.
+        results: Doc-check results; each is matched to its text via
+            ``doc_type``.
+        cmp_vendors: Vendor dicts from the CMP scan, or None.
+        redundancy_report: Redundancy report dict, or None.
+        doc_texts: Optional mapping doc_type -> document text.
+    """
+    findings: list[dict] = []
+    text_by_doc_type = doc_texts or {}
+
+    for result in (results or []):
+        try:
+            doc_text = text_by_doc_type.get(result.doc_type, "")
+            findings.extend(_from_doc_check(check_id, result, doc_text))
+        except Exception as e:
+            logger.warning("collect: doc result %s failed: %s",
+                           getattr(result, "doc_type", "?"), e)
+
+    try:
+        vendor_rows = _from_vendors(check_id, cmp_vendors or [])
+        findings.extend(vendor_rows)
+    except Exception as e:
+        logger.warning("collect: vendors failed: %s", e)
+
+    try:
+        redundancy_rows = _from_redundancies(check_id, redundancy_report)
+        findings.extend(redundancy_rows)
+    except Exception as e:
+        logger.warning("collect: redundancies failed: %s", e)
+
+    logger.info("collect: check=%s total_findings=%d", check_id, len(findings))
+    return findings
diff --git a/backend-compliance/compliance/services/unified_findings_store.py b/backend-compliance/compliance/services/unified_findings_store.py
new file mode 100644
index 00000000..726a26a7
--- /dev/null
+++ b/backend-compliance/compliance/services/unified_findings_store.py
@@ -0,0 +1,190 @@
+"""
+Unified-Findings sidecar store.
+
+A compliance check produces findings from 4 sources today:
+ - Master-Controls (mc_results table — already persisted)
+ - Pflichtangaben (L1/L2 doc checks, e.g. Impressum-Vollstaendigkeit)
+ - Vendor scans (per cmp_vendor: missing privacy url, no opt-out, ...)
+ - Redundancies (multi-vendor in same category)
+
+Previously the DSB had to look in 4 different blocks of the email to
+find everything. This store flattens all of them into ONE searchable
+table so the /audit/ frontend can show a unified list with
+source / severity / status / doc_type filters.
+
+Sidecar SQLite (same DB as compliance_audit_log) — no Postgres
+migration needed.
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sqlite3
+from pathlib import Path
+
+logger = logging.getLogger(__name__)
+
+DB_PATH = os.getenv("COMPLIANCE_AUDIT_DB", "/data/compliance_audits.db")
+
+
+def _ensure_table() -> None:
+    """Create the unified_findings table and its indexes if they are missing.
+
+    Also creates the parent directory of DB_PATH. Every statement uses
+    IF NOT EXISTS, so calling this before each read/write is harmless.
+    """
+    Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True)
+    # sqlite3's connection context manager only commits / rolls back; it does
+    # NOT close the connection, so close it explicitly.
+    conn = sqlite3.connect(DB_PATH)
+    try:
+        with conn:
+            conn.executescript("""
+                CREATE TABLE IF NOT EXISTS unified_findings (
+                    id INTEGER PRIMARY KEY AUTOINCREMENT,
+                    check_id TEXT NOT NULL,
+                    source_type TEXT NOT NULL, -- mc|pflichtangabe|vendor|redundanz
+                    doc_type TEXT, -- impressum|dse|cookie|... or '-' for vendor/redundanz
+                    severity TEXT, -- CRITICAL|HIGH|MEDIUM|LOW|INFO
+                    status TEXT, -- failed|passed|skipped|na|info
+                    regulation TEXT,
+                    label TEXT,
+                    hint TEXT,
+                    action_recipe TEXT, -- JSON {what,why,fix_text,where,example}
+                    anchor_excerpt TEXT,
+                    anchor_conf REAL,
+                    vendor_name TEXT,
+                    category TEXT,
+                    payload TEXT -- JSON extras (matched_text, cookies count, ...)
+                );
+                CREATE INDEX IF NOT EXISTS idx_uf_check ON unified_findings(check_id);
+                CREATE INDEX IF NOT EXISTS idx_uf_source ON unified_findings(check_id, source_type);
+                CREATE INDEX IF NOT EXISTS idx_uf_status ON unified_findings(check_id, status);
+                CREATE INDEX IF NOT EXISTS idx_uf_severity ON unified_findings(check_id, severity);
+            """)
+    finally:
+        conn.close()
+
+
+def record_findings(check_id: str, findings: list[dict]) -> int:
+    """Replace all stored findings of one check with the given list.
+
+    Existing rows for ``check_id`` are deleted and the new ones inserted in
+    a single transaction, so repeated calls with the same check_id are
+    idempotent. Missing fields fall back to defaults (source_type "mc",
+    severity "MEDIUM", status "failed", empty strings otherwise) and text
+    fields are truncated to fixed maximum lengths; ``action_recipe`` and
+    ``payload`` are stored as JSON.
+
+    Args:
+        check_id: Check the findings belong to; an empty value is a no-op.
+        findings: Finding dicts; an empty list just clears the check's rows.
+
+    Returns:
+        Number of rows inserted. 0 when check_id is empty, when there are no
+        findings, or when any error occurred (logged as a warning; the
+        transaction is rolled back in that case).
+    """
+    if not check_id:
+        return 0
+    try:
+        _ensure_table()
+        rows = [
+            (
+                check_id,
+                (f.get("source_type") or "mc")[:24],
+                (f.get("doc_type") or "")[:32],
+                (f.get("severity") or "MEDIUM").upper()[:16],
+                (f.get("status") or "failed")[:16],
+                (f.get("regulation") or "")[:64],
+                (f.get("label") or "")[:400],
+                (f.get("hint") or "")[:1200],
+                json.dumps(f.get("action_recipe") or {}, ensure_ascii=False),
+                (f.get("anchor_excerpt") or "")[:800],
+                float(f.get("anchor_conf") or 0.0),
+                (f.get("vendor_name") or "")[:160],
+                (f.get("category") or "")[:64],
+                json.dumps(f.get("payload") or {}, ensure_ascii=False),
+            )
+            for f in (findings or [])
+        ]
+        # The connection context manager commits / rolls back but does not
+        # close the connection, hence the explicit close in ``finally``.
+        conn = sqlite3.connect(DB_PATH)
+        try:
+            with conn:  # DELETE + INSERT commit together or not at all
+                conn.execute(
+                    "DELETE FROM unified_findings WHERE check_id=?", (check_id,),
+                )
+                if rows:
+                    conn.executemany(
+                        "INSERT INTO unified_findings "
+                        "(check_id, source_type, doc_type, severity, status, regulation, "
+                        " label, hint, action_recipe, anchor_excerpt, anchor_conf, "
+                        " vendor_name, category, payload) "
+                        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+                        rows,
+                    )
+        finally:
+            conn.close()
+        if not rows:
+            return 0
+        # Log the source types exactly as stored (row[1]). Reading the raw
+        # dicts here could mix None and str, making sorted() raise after the
+        # rows were already committed and wrongly reporting 0.
+        logger.info(
+            "unified_findings: %s rows=%d sources=%s",
+            check_id, len(rows),
+            sorted({row[1] for row in rows}),
+        )
+        return len(rows)
+    except Exception as e:
+        logger.warning("record_findings failed for %s: %s", check_id, e)
+        return 0
+
+
+def list_findings(
+    check_id: str,
+    source_type: str | None = None,
+    severity: str | None = None,
+    doc_type: str | None = None,
+    status: str | None = None,
+    q: str | None = None,
+    limit: int = 1000,
+) -> list[dict]:
+    """Return the findings of one check, optionally filtered.
+
+    Args:
+        check_id: Check whose findings are listed.
+        source_type, severity, doc_type, status: Exact-match filters; None,
+            an empty value or "all" disables the filter. ``severity`` is
+            upper-cased before comparison.
+        q: Substring searched in label OR vendor_name (case-insensitive).
+            ``%`` and ``_`` are matched literally, not as wildcards.
+        limit: Maximum number of rows returned.
+
+    Returns:
+        Row dicts ordered by severity (CRITICAL, HIGH, MEDIUM, LOW, then
+        everything else), source_type and label, with ``action_recipe`` and
+        ``payload`` decoded from JSON. An empty list on any error (logged
+        as a warning).
+    """
+    try:
+        _ensure_table()
+        where = ["check_id = ?"]
+        params: list = [check_id]
+        if source_type and source_type != "all":
+            where.append("source_type = ?")
+            params.append(source_type)
+        if severity and severity != "all":
+            where.append("severity = ?")
+            params.append(severity.upper())
+        if doc_type and doc_type != "all":
+            where.append("doc_type = ?")
+            params.append(doc_type)
+        if status and status != "all":
+            where.append("status = ?")
+            params.append(status)
+        if q:
+            # Escape LIKE metacharacters so user input such as "100%" or
+            # "a_b" is searched literally instead of acting as a wildcard.
+            escaped = (
+                q.lower()
+                .replace("\\", "\\\\")
+                .replace("%", "\\%")
+                .replace("_", "\\_")
+            )
+            where.append(
+                "(LOWER(label) LIKE ? ESCAPE '\\' "
+                "OR LOWER(vendor_name) LIKE ? ESCAPE '\\')"
+            )
+            needle = f"%{escaped}%"
+            params.extend([needle, needle])
+        sql = ("SELECT * FROM unified_findings WHERE " + " AND ".join(where) +
+               " ORDER BY CASE severity "
+               " WHEN 'CRITICAL' THEN 0 WHEN 'HIGH' THEN 1 "
+               " WHEN 'MEDIUM' THEN 2 WHEN 'LOW' THEN 3 "
+               " ELSE 4 END, source_type, label LIMIT ?")
+        params.append(int(limit))
+        # sqlite3's context manager does not close the connection; do it
+        # explicitly once the rows are fetched.
+        conn = sqlite3.connect(DB_PATH)
+        try:
+            conn.row_factory = sqlite3.Row
+            rows = conn.execute(sql, params).fetchall()
+        finally:
+            conn.close()
+        out = []
+        for r in rows:
+            d = dict(r)
+            d["action_recipe"] = json.loads(d.get("action_recipe") or "{}")
+            d["payload"] = json.loads(d.get("payload") or "{}")
+            out.append(d)
+        return out
+    except Exception as e:
+        logger.warning("list_findings failed: %s", e)
+        return []
+
+
+def findings_summary(check_id: str) -> dict:
+    """Return aggregate counts for the filter UI.
+
+    The result has the keys ``total``, ``by_source``, ``by_severity``,
+    ``by_status`` and ``by_doc_type``; each ``by_*`` value maps a column
+    value to its row count for ``check_id``. NULL or empty column values
+    are reported together under the key "-". On error the summary built so
+    far is returned and a warning is logged.
+    """
+    out = {
+        "total": 0,
+        "by_source": {},
+        "by_severity": {},
+        "by_status": {},
+        "by_doc_type": {},
+    }
+    # Column name -> key in the returned summary. The column names are fixed
+    # constants, so interpolating them into the SQL below is safe.
+    buckets = {
+        "source_type": "by_source",
+        "severity": "by_severity",
+        "status": "by_status",
+        "doc_type": "by_doc_type",
+    }
+    try:
+        _ensure_table()
+        # sqlite3's context manager does not close the connection; close it
+        # explicitly when done.
+        conn = sqlite3.connect(DB_PATH)
+        try:
+            conn.row_factory = sqlite3.Row
+            for col, bucket in buckets.items():
+                rows = conn.execute(
+                    f"SELECT {col} AS k, COUNT(*) AS n FROM unified_findings "
+                    f"WHERE check_id=? GROUP BY {col}",
+                    (check_id,),
+                ).fetchall()
+                counts: dict = {}
+                for r in rows:
+                    # NULL and '' both land on "-": add instead of overwrite.
+                    key = r["k"] or "-"
+                    counts[key] = counts.get(key, 0) + r["n"]
+                out[bucket] = counts
+                out["total"] = max(out["total"], sum(r["n"] for r in rows))
+        finally:
+            conn.close()
+        return out
+    except Exception as e:
+        logger.warning("findings_summary failed: %s", e)
+        return out
diff --git a/backend-compliance/main.py b/backend-compliance/main.py
index a2f66d6e..0130c83f 100644
--- a/backend-compliance/main.py
+++ b/backend-compliance/main.py
@@ -50,6 +50,8 @@ from compliance.api.agent_recurring_routes import router as agent_recurring_rout
from compliance.api.agent_compare_routes import router as agent_compare_router
from compliance.api.agent_doc_check_routes import router as agent_doc_check_router
from compliance.api.agent_compliance_check_routes import router as agent_compliance_check_router
+from compliance.api.agent_findings_routes import router as agent_findings_router
+from compliance.api.saving_scan_routes import router as saving_scan_router
from compliance.api.agent_migration_routes import router as agent_migration_router
from compliance.api.vendor_assessment_routes import router as vendor_assessment_router
from compliance.api.cra_routes import router as cra_router
@@ -157,6 +159,8 @@ app.include_router(agent_recurring_router, prefix="/api")
app.include_router(agent_compare_router, prefix="/api")
app.include_router(agent_doc_check_router, prefix="/api")
app.include_router(agent_compliance_check_router, prefix="/api")
+app.include_router(agent_findings_router, prefix="/api")
+app.include_router(saving_scan_router, prefix="/api")
app.include_router(agent_migration_router, prefix="/api")
# Vendor Contract Assessment
diff --git a/backend-compliance/tests/test_saving_scan_routes.py b/backend-compliance/tests/test_saving_scan_routes.py
new file mode 100644
index 00000000..c5c1e664
--- /dev/null
+++ b/backend-compliance/tests/test_saving_scan_routes.py
@@ -0,0 +1,116 @@
+"""
+Tests for the saving-scan funnel endpoint.
+
+Focus: input validation + lead persistence + rate-limit error path.
+The actual compliance check is mocked — we only verify the route layer.
+"""
+
+import os
+import sys
+from unittest.mock import patch
+
+import pytest
+from fastapi import FastAPI
+from fastapi.testclient import TestClient
+
+# Make the parent directory of this tests folder importable so the
+# `compliance` package import below resolves.
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
+
+# Use a temp SQLite for the sidecar
+# The variable is set before the router import below; presumably the
+# imported modules read it at import time — confirm for saving_scan_routes.
+os.environ["COMPLIANCE_AUDIT_DB"] = "/tmp/test_saving_scan.db"
+# Start from an empty database: drop a file left over from an earlier run.
+if os.path.exists("/tmp/test_saving_scan.db"):
+ os.remove("/tmp/test_saving_scan.db")
+
+from compliance.api.saving_scan_routes import router # noqa: E402
+
+# Minimal app that mounts only the router under test, plus a shared client
+# used by all test classes in this module.
+app = FastAPI()
+app.include_router(router, prefix="/api")
+client = TestClient(app)
+
+
+class TestStartSavingScanValidation:
+    """Input validation of POST /api/compliance/agent/saving-scan/start."""
+
+    def test_missing_email_returns_422(self):
+        # No "email" field at all -> rejected with 422.
+        payload = {"url": "https://example.de"}
+        response = client.post("/api/compliance/agent/saving-scan/start",
+                               json=payload)
+        assert response.status_code == 422
+
+    def test_invalid_email_returns_400(self):
+        payload = {"url": "https://example.de", "email": "kein-email",
+                   "consent": True}
+        with patch("compliance.api.saving_scan_routes.asyncio.create_task"):
+            response = client.post(
+                "/api/compliance/agent/saving-scan/start", json=payload,
+            )
+        assert response.status_code == 400
+        assert "E-Mail" in response.json()["detail"]
+
+    def test_invalid_url_returns_400(self):
+        payload = {"url": "ftp://wrong.de", "email": "u@x.de",
+                   "consent": True}
+        with patch("compliance.api.saving_scan_routes.asyncio.create_task"):
+            response = client.post(
+                "/api/compliance/agent/saving-scan/start", json=payload,
+            )
+        assert response.status_code == 400
+
+    def test_consent_required(self):
+        payload = {"url": "https://example.de", "email": "u@x.de",
+                   "consent": False}
+        with patch("compliance.api.saving_scan_routes.asyncio.create_task"):
+            response = client.post(
+                "/api/compliance/agent/saving-scan/start", json=payload,
+            )
+        assert response.status_code == 400
+        assert "Consent" in response.json()["detail"]
+
+
+def _patch_check_runner():
+    """Install a stub for the lazily imported compliance-check worker module.
+
+    Registers a fake ``compliance.api.agent_compliance_check_routes`` in
+    ``sys.modules`` exposing DocumentInput, ComplianceCheckRequest, a no-op
+    async ``_run_compliance_check`` and an empty ``_compliance_check_jobs``
+    dict. Per the original note this avoids loading smtp_sender (Py3.10+).
+
+    NOTE(review): the sys.modules entry is never restored, so the stub
+    stays in place for whatever runs later in the same process.
+    """
+    import sys
+    import types
+
+    module_name = "compliance.api.agent_compliance_check_routes"
+
+    class _DocInput:
+        def __init__(self, doc_type="other", url=""):
+            self.doc_type = doc_type
+            self.url = url
+
+    class _Req:
+        def __init__(self, **kw):
+            # Accept arbitrary request fields as plain attributes.
+            vars(self).update(kw)
+
+    async def _runner(*_a, **_kw):
+        return None
+
+    stub = types.ModuleType(module_name)
+    stub.DocumentInput = _DocInput
+    stub.ComplianceCheckRequest = _Req
+    stub._run_compliance_check = _runner
+    stub._compliance_check_jobs = {}
+    sys.modules[module_name] = stub
+
+
+class TestStartSavingScanSuccess:
+    """Happy path of the saving-scan start endpoint with a stubbed worker."""
+
+    def test_valid_request_starts_check(self):
+        _patch_check_runner()
+        payload = {"url": "https://example-newdomain.de",
+                   "email": "user@example.de", "consent": True}
+        response = client.post(
+            "/api/compliance/agent/saving-scan/start", json=payload,
+        )
+        assert response.status_code == 200, response.text
+        body = response.json()
+        assert "check_id" in body
+        assert body["status"] == "running"
+        assert "example-newdomain.de" in body["message"]
+
+
+class TestLeadCount:
+    """The lead-count endpoint reflects a previously submitted scan request."""
+
+    def test_lead_count_after_submit(self):
+        _patch_check_runner()
+        # Submit one lead first; its response is irrelevant for this test.
+        lead_payload = {"url": "https://abc-leadtest.de",
+                        "email": "lead@x.de", "consent": True}
+        client.post("/api/compliance/agent/saving-scan/start",
+                    json=lead_payload)
+
+        response = client.get("/api/compliance/agent/saving-scan/lead-count")
+        assert response.status_code == 200
+        stats = response.json()
+        assert stats["total_leads"] >= 1
+        assert "abc-leadtest.de" in str(stats["top_domains"])
|