feat(audit): V2 mail render + 5 new findings (B4/B5/B6/B7/B8) + LLM plausibility phase

Mail Render V2 (compliance/services/mail_render_v2/): an 11-module subpackage
that produces a single, unified audit mail output with:
  - Header + KPI tiles (Score / Findings / Docs / Vendors)
  - TOC + jump links
  - 3-bucket split: critical findings / manual review / internal reminders
  - Cookie inventory (name · vendor · category · storage duration · deletion period · country of establishment · source · status)
  - Immediate-actions aggregator ("Sitzland ergänzen für 11 Cookies", i.e. "add country of establishment for 11 cookies")
  - 24 legacy wrappers: all old build_*_html functions in V2 sections
  - Scope filter: FIN/GOV/MED/INS/EDU/LEG dropped from reports when not relevant
  - Hint/action dedup: no more duplicate sentences per card
Activated via env MAIL_RENDER_V2=true (default: legacy renderer).
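
A minimal sketch of the opt-in switch described above. It assumes the flag is read from the process environment and that only the value `true` (case-insensitive) enables V2; the helper names `use_mail_render_v2` and `pick_renderer` are hypothetical and do not come from the commit.

```python
from __future__ import annotations

import os
from collections.abc import Mapping


def use_mail_render_v2(env: Mapping[str, str] | None = None) -> bool:
    """Return True only when MAIL_RENDER_V2 is explicitly set to 'true'."""
    # Assumption: any other value (unset, "1", "yes") keeps the legacy path.
    env = os.environ if env is None else env
    return env.get("MAIL_RENDER_V2", "").strip().lower() == "true"


def pick_renderer(env: Mapping[str, str] | None = None) -> str:
    """Name the renderer to use; the legacy renderer stays the default."""
    return "v2" if use_mail_render_v2(env) else "legacy"
```

Passing the environment as a parameter keeps the switch testable without mutating `os.environ`.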

5 new deterministic findings as Phase D-2b/B4/B5/B6/B7/B8:

  B4 vendor_consistency_check: cross-document provider contradiction
     (Elli: the DSE (privacy policy) names Vertex AI for the chatbot, /de/cookies names Iadvize → HIGH).
     6 service types: chatbot/analytics/tag_manager/pixel/cdn/cmp.

  B5 ai_act_transparency_check: AI Act Art. 50 transparency obligation
     (Elli: Vertex AI present without pre-chat disclosure → HIGH).
     Plus B5 extension: legal basis Art. 6(1)(f) for AI → MED
     (recommend consent).

  B6 cross_doc_dpo_check: DPO named in the DSE but not in the Impressum (LOW).

  B7 doc_staleness_check: date extraction from DSE/AGB/Nutzungsbedingungen
     (privacy policy / terms and conditions / terms of use).
     Cap: AGB/NB 3y, DSE 2y. Older → MEDIUM (Elli NB dated 2018 → HIGH).

  B8 cmp_fingerprint_check: banner detected, but CMP provider is generic
     (no Usercentrics/OneTrust/Cookiebot/etc. → MED).

  B3 extension detect_intra_doc_contradictions: contradictory
     storage durations in the SAME doc (Elli: logfile 7d vs 30d → HIGH).
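
The B7 age caps can be illustrated with a short sketch. It only models the stated rule "older than the cap → MEDIUM"; the escalation to HIGH reported for the 2018 document is not specified in the message and is left out. The names `STALENESS_CAP_YEARS`, `_years_before` and `staleness_severity` are hypothetical.

```python
from __future__ import annotations

from datetime import date

# Age caps in years per document type, as stated in the commit message.
STALENESS_CAP_YEARS = {"AGB": 3, "NB": 3, "DSE": 2}


def _years_before(d: date, years: int) -> date:
    """Return the same calendar day `years` earlier (29 Feb falls back to 28 Feb)."""
    try:
        return d.replace(year=d.year - years)
    except ValueError:
        return d.replace(year=d.year - years, day=28)


def staleness_severity(doc_type: str, doc_date: date, today: date) -> str | None:
    """Return 'MEDIUM' when the document is older than its cap, else None."""
    cap = STALENESS_CAP_YEARS.get(doc_type)
    if cap is None:
        # Assumption: document types without a cap are never flagged.
        return None
    return "MEDIUM" if doc_date < _years_before(today, cap) else None
```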

LLM plausibility phase (Phase D-2b, finding_plausibility_check.py):
  - Runs AFTER the MC pipeline, BEFORE the D3 render
  - Prompt with example IDs + 3-phase mapping: exact-ID / position-fallback /
    fuzzy-tail-match
  - Stamps llm_title / llm_severity / llm_recommendation / llm_drop onto
    every FAIL CheckItem
  - V2 render shows a "🤖 LLM-Plausibility:" box per finding when stamped
  - KNOWN ISSUE: qwen3:30b-a3b often returns empty content for format='json' +
    8000-char-excerpt prompts. The pipeline continues with stamped=0. Task #16.
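
One possible reading of the 3-phase mapping, as a sketch only: the commit message names the phases but not their exact rules. The sketch assumes that the LLM returns a list of dicts with an optional `id`, that position fallback applies only to items without an `id`, and that "fuzzy tail" means a suffix match between IDs. The function name `map_llm_items` is hypothetical.

```python
from __future__ import annotations


def map_llm_items(check_ids: list[str], llm_items: list[dict]) -> dict[str, dict]:
    """Map LLM response items back to check IDs in three passes."""
    mapped: dict[str, dict] = {}
    leftovers: list[tuple[int, dict]] = []

    # Pass 1: exact ID match.
    for pos, item in enumerate(llm_items):
        cid = str(item.get("id") or "")
        if cid in check_ids and cid not in mapped:
            mapped[cid] = item
        else:
            leftovers.append((pos, item))

    # Pass 2: position fallback (assumed: only for items without any ID).
    still_open: list[dict] = []
    for pos, item in leftovers:
        if (not item.get("id") and pos < len(check_ids)
                and check_ids[pos] not in mapped):
            mapped[check_ids[pos]] = item
        else:
            still_open.append(item)

    # Pass 3: fuzzy tail match (one ID is a suffix of the other).
    for item in still_open:
        tail = str(item.get("id") or "").lower()
        if not tail:
            continue
        for cid in check_ids:
            low = cid.lower()
            if cid not in mapped and (low.endswith(tail) or tail.endswith(low)):
                mapped[cid] = item
                break
    return mapped
```

An empty response maps to nothing, which matches the "stamped=0" behaviour noted under the known issue.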

Coverage against the Elli ground truth (zeroclaw/docs/ground-truth/elli_eco_2026-06-06.json,
13 expected findings via WebFetch agent crawl):
  - 4/4 HIGH findings ✓ (COOKIE-CONSENT-UX-001 + WIDERRUFSBELEHRUNG-001 +
    VENDOR-CONSISTENCY-001 + AI-ACT-TRANSPARENCY-001)
  - 4/6 MEDIUM ✓
  - 2/3 LOW ✓
  - Total: 10/13 = 77% (up from 4/13 = 31%)
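
The totals can be rechecked from the per-severity counts. The snippet below is only an arithmetic illustration; the variable names are hypothetical.

```python
# Hits and expected counts per severity, as listed in the coverage summary.
coverage = {"HIGH": (4, 4), "MEDIUM": (4, 6), "LOW": (2, 3)}

hits = sum(found for found, _ in coverage.values())
expected = sum(total for _, total in coverage.values())
pct = round(100 * hits / expected)
summary = f"{hits}/{expected} = {pct}%"

# Previous state quoted in the message: 4 of 13 expected findings.
baseline_pct = round(100 * 4 / 13)
```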

Remaining 3 gaps tracked as Task #17: IMPRESSUM-001 (multi-entity USt-IdNr),
TRANSFER-001 (vendor mechanism DPF/SCC), TH-RETENTION-002 (AI retention
per data category).

V2 mail preview in Mailpit: 'v2all@local.test', subject '[V2 ALL] ELLI'.
Backend healthy; B1+B3+B4+B5+B6+B7+B8 are all live in the orchestrator.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Benjamin Admin
2026-06-06 21:19:49 +02:00
parent c2c8783fee
commit d0e3621192
27 changed files with 4426 additions and 3 deletions
@@ -0,0 +1,16 @@
"""Mail Render V2 — unified, consistent layout for the audit mail.

The original Step-5 HTML composition grew across 27+ render functions,
each with its own inline styles. Result: inconsistent colors,
typography, and card widths. V2 fixes that with:

  - `_style.py`            ONE place for colors, fonts, spacing helpers
  - `_cookie_inventory.py` SINGLE cookie list merged from DSE / table /
                           live browser, with per-cookie status
  - `_blocks.py`           Header / TOC / Critical / Per-Doc /
                           Per-Theme / Caveats / Footer renderers
  - `_compose.py`          compose_v2(state) → full_html

Activate via env var `MAIL_RENDER_V2=true`. Default is the legacy
renderer so we can A/B compare in Mailpit.
"""
@@ -0,0 +1,296 @@
"""Mail-V2 Action library — turn findings into 'what to do where'.

Each finding type maps to a concrete action recommendation. The
mapping is intentionally pattern-matched (not LLM-generated): the
audit is deterministic, so the corrective action must be too.

Patterns matched by:
  - keyword in finding `id` or label
    (mc-impressum-handelsregister → impressum/HR)
  - severity_reason (factually_wrong / missing / misclassified)
  - mismatch_type (dsi_under_actual / table_under_actual / ...)
  - cookie field name (country / duration / processing_company)

Fallback: "Manuelle Prüfung beim DSB" with finding hint.

Returns an Action dict:
  - title: short imperative ("Sitzland ergänzen")
  - target: where to fix ("DSE / Vendor-Liste")
  - detail: extended explanation
  - aggregation_key: groupBy key for bulk recommendations
    ("missing_country" / "long_retention" / ...)
  - effort: "low" | "med" | "hi"
"""
from __future__ import annotations

from dataclasses import asdict, dataclass


@dataclass
class Action:
    title: str
    target: str
    detail: str
    aggregation_key: str | None
    effort: str  # low | med | hi

    def to_dict(self) -> dict:
        return asdict(self)
# ── Field-level actions for cookie inventory ──────────────────────
def cookie_field_missing_action(field: str, cookie_name: str,
vendor: str) -> Action | None:
"""Return action when a cookie field is missing (or unknown)."""
if field == "country":
return Action(
title="Sitzland ergänzen",
target="DSE / Vendor-Tabelle",
detail=(f"Für Cookie '{cookie_name}' (Vendor {vendor or ''}) "
"ist kein Sitzland der verarbeitenden Stelle angegeben. "
"Art. 13 Abs. 1 lit. a DSGVO verlangt die Identität + "
"Anschrift des Verantwortlichen."),
aggregation_key="missing_country",
effort="low",
)
if field == "duration":
return Action(
title="Speicherdauer angeben",
target="DSE / Cookie-Tabelle",
detail=(f"Cookie '{cookie_name}' hat keine deklarierte "
"Speicherdauer. Art. 13 Abs. 2 lit. a DSGVO verlangt "
"die Dauer der Speicherung oder ein Kriterium dafür."),
aggregation_key="missing_duration",
effort="low",
)
if field == "retention_grounds":
return Action(
title="Löschfrist + Rechtsgrundlage angeben",
target="Löschkonzept + DSE",
detail=(f"Für Cookie '{cookie_name}' fehlt eine konkrete "
"Löschfrist. § 35 BDSG + DSK-Standard verlangen ein "
"dokumentiertes Löschkonzept pro Datenkategorie."),
aggregation_key="missing_retention",
effort="med",
)
if field == "processing_company":
return Action(
title="Verantwortliche Stelle nennen",
target="DSE",
detail=(f"Cookie '{cookie_name}' nennt keinen Verantwortlichen "
"(Firma + Adresse). Art. 13 Abs. 1 DSGVO Pflichtangabe."),
aggregation_key="missing_processing_company",
effort="low",
)
if field == "third_country":
return Action(
title="Drittlandtransfer absichern",
target="DSE + AVV-Anhang",
detail=(f"Cookie '{cookie_name}' (Vendor {vendor or ''}) "
"verarbeitet Daten außerhalb EU/EWR. Erforderlich: "
"Angemessenheitsbeschluss, Standardvertragsklauseln "
"oder ausdrückliche Einwilligung (Art. 44 ff. DSGVO)."),
aggregation_key="missing_third_country",
effort="med",
)
if field == "category":
return Action(
title="Kategorie zuordnen",
target="Cookie-Tabelle",
detail=(f"Cookie '{cookie_name}' hat keine Kategorie. EDPB "
"Cookie-Sweep verlangt: technisch notwendig / "
"Statistik / Marketing / Externe Medien."),
aggregation_key="missing_category",
effort="low",
)
return None
# ── Status-level actions (UNDOC / ORPH / MISMATCH) ───────────────
def cookie_status_action(status_code: str, cookie_name: str,
vendor: str) -> Action | None:
if status_code == "UNDOC":
return Action(
title="Cookie deklarieren oder entfernen",
target="CMP-Config + DSE",
detail=(f"Cookie '{cookie_name}' wird im Browser gesetzt, ist "
"aber nicht in DSE/Cookie-Tabelle deklariert. § 25 "
"TDDDG: entweder Deklaration nachholen oder Cookie "
"blockieren (CMP-Trigger prüfen)."),
aggregation_key="undoc_cookies",
effort="med",
)
if status_code == "ORPH":
return Action(
title="Veraltete Cookie-Angabe entfernen",
target="DSE / Cookie-Tabelle",
detail=(f"Cookie '{cookie_name}' ist in DSE deklariert, wird "
"aber im Live-Browser nicht gesetzt. Veraltete Angabe "
"entfernen, um Transparenz zu wahren."),
aggregation_key="orphan_cookies",
effort="low",
)
if status_code == "MISMATCH":
return Action(
title="Cookie-Werte korrigieren",
target="DSE / Cookie-Tabelle",
detail=(f"Cookie '{cookie_name}': deklarierte Werte weichen von "
"tatsächlich gesetzten ab. Tabelle anpassen oder "
"Cookie-Setup korrigieren."),
aggregation_key="mismatch_cookies",
effort="med",
)
return None
# ── Retention-comparison actions ─────────────────────────────────
def retention_action(retention_finding: dict) -> Action | None:
mt = retention_finding.get("mismatch_type")
cookie = retention_finding.get("cookie_name", "")
if mt == "dsi_under_actual":
return Action(
title="DSE-Speicherdauer korrigieren",
target="DSE",
detail=(f"DSE behauptet für '{cookie}' kürzere Speicherdauer als "
"real. Wert in DSE auf reale Dauer anpassen ODER Cookie-"
"Setup auf deklarierte Dauer reduzieren."),
aggregation_key="dsi_too_short",
effort="low",
)
if mt == "table_under_actual":
return Action(
title="Cookie-Tabelle korrigieren",
target="Cookie-Tabelle / CMP",
detail=(f"Cookie-Tabelle behauptet für '{cookie}' kürzere Dauer "
"als real. Wert anpassen oder Cookie-Lifetime reduzieren."),
aggregation_key="table_too_short",
effort="low",
)
if mt == "dsi_vs_table":
return Action(
title="DSE und Cookie-Tabelle synchronisieren",
target="DSE + Cookie-Tabelle",
detail=(f"DSE und Cookie-Tabelle geben unterschiedliche Werte "
f"für '{cookie}' an. Werte abgleichen."),
aggregation_key="dsi_table_mismatch",
effort="low",
)
if mt == "actual_under_table":
return Action(
title="Speicherdauer-Cap dokumentieren (Safari-ITP)",
target="DSE",
detail=(f"Cookie '{cookie}' lebt real kürzer als deklariert — "
"wahrscheinlich Safari ITP 7-Tage-Cap. In DSE ergänzen: "
"'Auf Safari-Geräten kann die Speicherdauer durch ITP "
"verkürzt werden.'"),
aggregation_key="safari_itp",
effort="low",
)
return None
# ── Reachability actions (B1) ────────────────────────────────────
def reachability_action(rb1: dict) -> Action | None:
if rb1.get("passed"):
return None
reason = rb1.get("severity_reason")
if reason == "missing":
return Action(
title="Cookie-Einstellungen-Link im Footer ergänzen",
target="Website-Footer (alle Seiten)",
detail=("Art. 7 Abs. 3 DSGVO: Widerruf muss so einfach wie "
"Erteilung sein. Footer-Link 'Cookie-Einstellungen' "
"ergänzen, der den CMP direkt öffnet (kein neuer Tab, "
"kein Zwischendokument)."),
aggregation_key="footer_reachability",
effort="low",
)
if reason == "misclassified":
return Action(
title="CMP direkt öffnen statt neuer Tab",
target="Footer-Link-Config",
detail=("Bestehender Footer-Link öffnet die CMP nicht direkt. "
"JavaScript-Trigger umstellen: kein target=_blank, "
"keine externe Policy-Seite — CMP-Layer direkt öffnen."),
aggregation_key="footer_reachability",
effort="low",
)
if reason == "factually_wrong":
return Action(
title="Eigenen CMP statt Browser-Verweis",
target="Footer + CMP",
detail=("Nutzer wird auf Browser-Einstellungen verwiesen — das "
"ist nach LfDI BW kein gleichwertiger Widerruf. Eigenen "
"CMP-Re-Open-Mechanismus implementieren."),
aggregation_key="footer_reachability",
effort="med",
)
return None
# ── Generic finding → action ────────────────────────────────────
_ID_PATTERNS = {
"handelsregister": ("HR-Eintrag im Impressum ergänzen",
"Impressum",
"§ 5 Abs. 1 Nr. 4 TMG: Registereintrag mit "
"Registergericht + HR-Nr."),
"ust-id": ("USt-IdNr. ergänzen",
"Impressum",
"§ 5 Abs. 1 Nr. 6 TMG: USt-IdNr. falls vorhanden."),
"vertretungsberechtig": ("Vertretungsberechtigte Person nennen",
"Impressum",
"§ 5 Abs. 1 Nr. 1 TMG"),
"aufsichtsbehoerde": ("Aufsichtsbehörde nennen",
"Impressum",
"§ 5 Abs. 1 Nr. 3 TMG (regulierte Branchen)"),
"berufsordnung": ("Berufsrechtliche Angaben ergänzen",
"Impressum",
"§ 5 Abs. 1 Nr. 5 TMG"),
"dsb": ("DSB benennen",
"DSE",
"Art. 37 ff. DSGVO: Datenschutzbeauftragten benennen + DSE "
"ergänzen."),
"odr": ("OS-Link auf EU-Plattform ergänzen",
"Impressum / AGB",
"Art. 14 EU-VO 524/2013 (B2C-Onlineshop)"),
"widerrufsbelehrung": ("Widerrufsbelehrung anpassen",
"Widerruf-Dokument",
"§ 312g BGB + Art. 246a EGBGB Muster-Widerrufs-"
"belehrung."),
}
def derive_generic_action(finding_id: str, label: str,
                          hint: str) -> Action | None:
    """Pattern-match a generic MC finding ID to an action template."""
    fid = (finding_id or "").lower()
    # Guard against a missing label, mirroring the finding_id handling.
    haystack = f"{fid} {(label or '').lower()}"
    for kw, (title, target, detail) in _ID_PATTERNS.items():
        if kw in haystack:
            return Action(
                title=title,
                target=target,
                detail=detail + (f" Hinweis: {hint[:200]}" if hint else ""),
                aggregation_key=f"mc_{kw}",
                effort="low",
            )
    if hint:
        # No keyword hit: fall back to a manual-review action.
        return Action(
            title="Manuelle Prüfung beim DSB",
            target=label or "Doc",
            detail=hint[:400],
            aggregation_key=None,
            effort="med",
        )
    return None


def action_for_finding(finding_id: str, severity: str, label: str,
                       hint: str) -> Action | None:
    """Top-level entry point for MC findings.

    `severity` is accepted but not used by the current pattern matching.
    """
    return derive_generic_action(finding_id, label, hint)
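
The keyword lookup in `derive_generic_action` can be exercised in isolation. The following is a reduced sketch, not the module itself: it copies two entries from `_ID_PATTERNS`, drops the detail text, and returns plain tuples instead of `Action` objects. The names `PATTERNS` and `match_action` are hypothetical.

```python
from __future__ import annotations

# Two entries copied from _ID_PATTERNS (detail text omitted for brevity).
PATTERNS = {
    "handelsregister": ("HR-Eintrag im Impressum ergänzen", "Impressum"),
    "dsb": ("DSB benennen", "DSE"),
}


def match_action(finding_id: str, label: str,
                 hint: str) -> tuple[str, str, str | None] | None:
    """Return (title, target, aggregation_key) for the first keyword hit."""
    haystack = f"{(finding_id or '').lower()} {(label or '').lower()}"
    for kw, (title, target) in PATTERNS.items():
        if kw in haystack:
            return title, target, f"mc_{kw}"
    if hint:
        # No keyword hit: manual review, with no aggregation key.
        return "Manuelle Prüfung beim DSB", label or "Doc", None
    return None
```

Because the match is a plain substring test on ID plus label, short keywords such as `dsb` or `odr` may also hit labels that merely contain those letters; a stricter variant could match on ID segments instead.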
@@ -0,0 +1,248 @@
"""Mail-V2 Bulk-Recommendation Aggregator.

Collects per-item actions (cookie-level, MC-level, retention, B1)
and groups them by `aggregation_key` so the mail can show:

    🛠 Sofortmaßnahmen
      • Sitzland ergänzen für 12 Cookies: _ga, _gid, _fbp, …
      • Drittlandtransfer absichern für 5 US-Vendors: Google, Meta, …
      • Speicherdauer > 13mo bei 3 Cookies (CNIL-Cap): IDE, _gcl_au, …

This converts individual fix-recommendations into actionable
"do-this-one-thing-fixes-multiple-cookies" bullets that scale.
"""
from __future__ import annotations

from ._actions import (
    Action,
    cookie_field_missing_action,
    cookie_status_action,
    reachability_action,
    retention_action,
    action_for_finding,
)
# ── Group-label registry ─────────────────────────────────────────
GROUP_LABELS: dict[str, dict] = {
"missing_country": {
"label": "Sitzland ergänzen",
"icon": "🌍",
"norm": "Art. 13 Abs. 1 lit. a DSGVO",
},
"missing_duration": {
"label": "Speicherdauer ergänzen",
"icon": "",
"norm": "Art. 13 Abs. 2 lit. a DSGVO",
},
"missing_retention": {
"label": "Löschfrist + Rechtsgrundlage angeben",
"icon": "🗑",
"norm": "§ 35 BDSG",
},
"missing_processing_company": {
"label": "Verantwortliche Stelle nennen",
"icon": "🏢",
"norm": "Art. 13 Abs. 1 DSGVO",
},
"missing_third_country": {
"label": "Drittlandtransfer absichern",
"icon": "🌐",
"norm": "Art. 44 ff. DSGVO",
},
"missing_category": {
"label": "Cookie-Kategorie zuordnen",
"icon": "🏷",
"norm": "EDPB Cookie-Sweep",
},
"undoc_cookies": {
"label": "Undeklarierte Cookies adressieren",
"icon": "",
"norm": "§ 25 Abs. 1 TDDDG",
},
"orphan_cookies": {
"label": "Veraltete Cookie-Angaben entfernen",
"icon": "👻",
"norm": "Art. 5 Abs. 1 lit. a DSGVO (Transparenz)",
},
"mismatch_cookies": {
"label": "Cookie-Werte mit Realität abgleichen",
"icon": "🔀",
"norm": "Art. 5 Abs. 1 lit. d DSGVO",
},
"dsi_too_short": {
"label": "DSE-Speicherdauer korrigieren (zu kurz angegeben)",
"icon": "📏",
"norm": "Art. 13 Abs. 2 DSGVO",
},
"table_too_short": {
"label": "Cookie-Tabelle-Speicherdauer korrigieren",
"icon": "📏",
"norm": "Art. 13 Abs. 2 DSGVO",
},
"dsi_table_mismatch": {
"label": "DSE ↔ Cookie-Tabelle synchronisieren",
"icon": "🔁",
"norm": "Art. 5 Abs. 2 DSGVO Rechenschaftspflicht",
},
"safari_itp": {
"label": "Safari-ITP-Cap in DSE dokumentieren",
"icon": "🍎",
"norm": "DSGVO Transparenzgebot",
},
"footer_reachability": {
"label": "Footer-Reachability für Widerruf herstellen",
"icon": "🔗",
"norm": "Art. 7 Abs. 3 DSGVO",
},
}
def _generic_group(key: str | None) -> dict:
    if not key:
        return {"label": "Manuelle Prüfung", "icon": "🔍", "norm": ""}
    if key.startswith("mc_"):
        kw = key[3:].replace("_", " ").title()
        return {"label": f"{kw} ergänzen", "icon": "📝",
                "norm": "MC-Prüfung"}
    return {"label": key.replace("_", " ").title(), "icon": "", "norm": ""}
# ── Item types collected ────────────────────────────────────────
def _cookie_items(state: dict) -> list[tuple[Action, str]]:
    """Return (action, item_label) for every cookie-level concern.

    item_label is what gets aggregated into the bullet list of names.
    """
    from ._cookie_inventory import build_cookie_inventory

    rows, _ = build_cookie_inventory(state)
    items: list[tuple[Action, str]] = []
    for r in rows:
        name = r.get("name") or ""
        vendor = r.get("vendor") or ""
        label = f"{name}" + (f" ({vendor})" if vendor and vendor != "" else "")
        # Status-level
        st_action = cookie_status_action(r["status_code"], name, vendor)
        if st_action:
            items.append((st_action, label))
        # Field-level
        for field, value in (
            ("country", r.get("country")),
            ("duration", r.get("duration")),
            ("retention_grounds", r.get("retention_grounds")),
            ("processing_company", r.get("processing_company")),
            ("category", r.get("category")),
        ):
            if not value or value in ("", "", ""):
                fa = cookie_field_missing_action(field, name, vendor)
                if fa:
                    items.append((fa, label))
        if r.get("third_country"):
            ta = cookie_field_missing_action("third_country", name, vendor)
            if ta:
                items.append((ta, label))
    return items


def _retention_items(state: dict) -> list[tuple[Action, str]]:
    items: list[tuple[Action, str]] = []
    for f in (state.get("retention_findings") or []):
        if f.get("matches"):
            continue
        a = retention_action(f)
        if a:
            label = (f.get("cookie_name") or "")
            vendor = f.get("vendor_name") or ""
            if vendor:
                label += f" ({vendor})"
            items.append((a, label))
    return items


def _reachability_items(state: dict) -> list[tuple[Action, str]]:
    a = reachability_action(state.get("reachability_finding") or {})
    if not a:
        return []
    return [(a, "Footer")]


def _mc_items(state: dict) -> list[tuple[Action, str]]:
    items: list[tuple[Action, str]] = []
    for r in (state.get("results") or []):
        doc = getattr(r, "label", "") or ""
        for c in getattr(r, "checks", []) or []:
            if getattr(c, "passed", True) or getattr(c, "skipped", False):
                continue
            sev = (getattr(c, "severity", "") or "").upper()
            if sev not in ("CRITICAL", "HIGH", "MEDIUM"):
                continue
            a = action_for_finding(
                getattr(c, "id", ""),
                sev,
                getattr(c, "label", ""),
                getattr(c, "hint", "") or "",
            )
            if a:
                items.append((a, doc))
    return items


def collect_actions(state: dict) -> list[dict]:
    """Top-level: collect every item-action across cookie/retention/B1/MC."""
    raw = (
        _cookie_items(state)
        + _retention_items(state)
        + _reachability_items(state)
        + _mc_items(state)
    )
    out: list[dict] = []
    for action, label in raw:
        out.append({**action.to_dict(), "item": label})
    return out
def group_by_action(state: dict) -> list[dict]:
    """Aggregate item-actions by aggregation_key.

    Returns a list of groups:
        {
            "key": "missing_country",
            "label": "Sitzland ergänzen",
            "icon": "🌍",
            "norm": "Art. 13 Abs. 1 lit. a DSGVO",
            "effort": "low",
            "count": 12,
            "items": ["_ga (Google)", "_gid (Google)", ...],
            "first_detail": "..." (first action.detail in the group),
        }
    sorted by effort (low first), then count desc, then group label.
    """
    actions = collect_actions(state)
    buckets: dict[str | None, dict] = {}
    for a in actions:
        key = a.get("aggregation_key")
        # The first action seen for a key fixes the group's effort and detail.
        bucket = buckets.setdefault(key, {
            "key": key,
            "label": None, "icon": None, "norm": None,
            "effort": a.get("effort", "med"),
            "items": [], "count": 0,
            "first_detail": a.get("detail", ""),
        })
        if not bucket["label"]:
            meta = GROUP_LABELS.get(key or "") or _generic_group(key)
            bucket["label"] = meta["label"]
            bucket["icon"] = meta["icon"]
            bucket["norm"] = meta["norm"]
        item = a.get("item") or ""
        if item not in bucket["items"]:
            bucket["items"].append(item)
        bucket["count"] = len(bucket["items"])
    groups = list(buckets.values())
    # sort: high-impact (effort=low + many items) first
    eff_rank = {"low": 0, "med": 1, "hi": 2}
    groups.sort(key=lambda g: (eff_rank.get(g["effort"], 9),
                               -g["count"], g["label"] or ""))
    return groups
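
The grouping step of `group_by_action` can be tried without the rest of the package. The sketch below is a simplified stand-in: it takes the already collected action dicts as input, skips the label/icon lookup, and keeps only the bucket, dedup, and sort logic. The names `EFFORT_RANK` and `group_actions` are hypothetical.

```python
from __future__ import annotations

EFFORT_RANK = {"low": 0, "med": 1, "hi": 2}


def group_actions(actions: list[dict]) -> list[dict]:
    """Group item-level actions by aggregation_key and de-duplicate items."""
    buckets: dict[str | None, dict] = {}
    for a in actions:
        key = a.get("aggregation_key")
        bucket = buckets.setdefault(
            key, {"key": key, "effort": a.get("effort", "med"), "items": []}
        )
        item = a.get("item") or ""
        if item not in bucket["items"]:
            bucket["items"].append(item)
    groups = list(buckets.values())
    for g in groups:
        g["count"] = len(g["items"])
    # Lowest effort first; within the same effort, the largest group first.
    groups.sort(key=lambda g: (EFFORT_RANK.get(g["effort"], 9), -g["count"]))
    return groups
```

Sorting by effort before count means a one-item "low" group is listed ahead of a large "med" group, which is the behaviour of the original sort key as well.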
@@ -0,0 +1,367 @@
"""Mail-V2 section renderers — one function per top-level block.
Each renderer takes a slice of `state` and returns ready-to-concatenate
HTML using the helpers from `_style`. Every block is full-width, has
the same card shell, and uses the same color palette.
Finding-bucket renderers (critical / manual / internal) live in
`_blocks_findings.py` to keep this file under the LOC cap.
"""
from __future__ import annotations
from html import escape as h
from ._aggregator import group_by_action
from ._blocks_findings import count_critical, count_internal, count_manual
from ._cookie_inventory import (
build_cookie_inventory,
inventory_headers,
render_inventory_rows,
)
from ._style import (
SZ_H3,
SZ_SMALL,
TEXT,
TEXT_MUTED,
card,
chip,
kpi_row,
section,
table,
)
# ── Helpers ──────────────────────────────────────────────────────
def _score_sev(pct: int | None) -> str:
    if pct is None:
        return "info"
    if pct >= 90:
        return "pass"
    if pct >= 70:
        return "info"
    if pct >= 40:
        return "warn"
    return "fail"
# ── 1. Header + KPI row ──────────────────────────────────────────
def render_header(state: dict) -> str:
site = h(state.get("site_name") or "")
dom = h(state.get("domain") or "")
scorecard = state.get("scorecard") or {}
score_pct = (scorecard.get("totals") or {}).get("pct")
doc_count = state.get("doc_count") or 0
docs_total = len(state.get("results") or [])
findings = state.get("total_findings") or 0
vendors = len(state.get("cmp_vendors") or [])
title_html = (
f'<h1 style="font-size:24px;margin:0 0 4px;color:{TEXT};'
f'font-weight:700;">{site}</h1>'
f'<div style="font-size:13px;color:{TEXT_MUTED};margin-bottom:8px;">'
f'{dom} · Compliance-Audit</div>'
)
kpis = [
{"label": "Compliance-Score",
"value": f"{score_pct}%" if score_pct is not None else "",
"sev": _score_sev(score_pct)},
{"label": "Findings", "value": str(findings),
"sev": "fail" if findings > 5 else "warn" if findings > 0 else "pass"},
{"label": "Dokumente",
"value": f"{doc_count}/{docs_total}", "sev": "info"},
{"label": "Vendors", "value": str(vendors),
"sev": "warn" if vendors > 20 else "info"},
]
return title_html + kpi_row(kpis)
# ── 2. Table of contents ────────────────────────────────────────
def render_toc(state: dict) -> str:
rows = [
("#critical", f"Kritische Befunde ({count_critical(state)})"),
("#manual", f"Manuelle Prüfung ({count_manual(state)})"),
("#internal", f"Interne Reminder ({count_internal(state)})"),
("#sofortmassnahmen", "Sofortmaßnahmen"),
("#per-doc",
f"Pro Dokument ({len(state.get('results') or [])})"),
("#per-theme", "Pro Thema"),
("#caveats",
f"Audit-Vorbehalte ({len(state.get('audit_quality_findings') or [])})"),
("#attach",
f"Anhänge ({1 if state.get('cookie_evidence_slices') else 0})"),
]
items = "".join(
f'<li style="margin:6px 0;"><a href="{href}" style="color:#1e40af;'
f'text-decoration:none;">{h(label)}</a></li>'
for href, label in rows
)
return section(
"📋 Inhalt",
f'<ol style="margin:0;padding-left:18px;font-size:14px;">{items}</ol>',
)
# ── 4. Per-document blocks ──────────────────────────────────────
def render_per_doc(state: dict) -> str:
results = state.get("results") or []
if not results:
return ""
cards = []
for r in results:
label = h(getattr(r, "label", "") or "")
url = getattr(r, "url", "") or ""
url_html = (f'<a href="{h(url)}" style="color:#1e40af;font-size:'
f'{SZ_SMALL};">{h(url)}</a>') if url else ""
corr = getattr(r, "correctness_pct", 0) or 0
err = getattr(r, "error", "") or ""
checks = getattr(r, "checks", []) or []
n_total = len(checks)
n_pass = sum(1 for c in checks if c.passed and not c.skipped)
n_fail = sum(1 for c in checks if not c.passed and not c.skipped)
n_skip = sum(1 for c in checks if c.skipped)
score_sev = _score_sev(corr)
head = (
f'<div style="display:flex;justify-content:space-between;'
f'align-items:flex-start;">'
f'<div><span style="font-size:{SZ_H3};font-weight:600;">{label}</span>'
f'<div>{url_html}</div></div>'
f'<div style="text-align:right;">'
f'{chip(f"{corr}%", score_sev)}</div></div>'
)
if err:
body = (f'<p style="margin:8px 0 0;color:{TEXT_MUTED};">'
f'{h(err)}</p>')
else:
counts = (
f'<div style="margin:8px 0;font-size:{SZ_SMALL};'
f'color:{TEXT_MUTED};">'
f'{n_total} MCs · {n_pass} ✓ · {n_fail} ✗ · {n_skip} ?</div>'
)
top = [c for c in checks
if not c.passed and not c.skipped][:3]
top_list = ""
if top:
lis = "".join(
f'<li style="margin:4px 0;">'
f'{h(getattr(c, "label", "")[:120])}</li>'
for c in top
)
top_list = (
f'<ul style="margin:6px 0 0 16px;padding:0;'
f'font-size:13px;color:{TEXT};">{lis}</ul>'
)
body = counts + top_list
cards.append(card(head + body,
sev=score_sev if not err else "info"))
return section(f"📄 4. Pro Dokument ({len(results)})",
"".join(cards), anchor="per-doc")
# ── 5. Per-theme blocks ─────────────────────────────────────────
def render_theme_cookie_banner(state: dict) -> str:
br = state.get("banner_result") or {}
if not br:
return ""
detected = br.get("detected") or br.get("banner_detected")
provider = br.get("provider") or br.get("banner_provider") or ""
violations = br.get("violations") or len(
(br.get("banner_checks") or {}).get("violations") or [])
body = (
f'<div><strong>Provider:</strong> {h(str(provider))} · '
f'<strong>Detected:</strong> '
f'{chip("Ja" if detected else "Nein", "pass" if detected else "fail")} · '
f'<strong>Violations:</strong> {violations}</div>'
)
return card(
f'<h3 style="margin:0 0 6px;font-size:{SZ_H3};">▶ Cookie-Banner</h3>'
+ body,
sev="warn" if violations else "pass",
)
def render_theme_cookie_inventory(state: dict) -> str:
rows, summary = build_cookie_inventory(state)
if summary["total"] == 0:
return ""
head = (
f'<h3 style="margin:0 0 6px;font-size:{SZ_H3};">'
f'▶ Cookie-Inventar ({summary["total"]})</h3>'
f'<div style="font-size:{SZ_SMALL};color:{TEXT_MUTED};'
f'margin-bottom:6px;">'
f'{summary["declared"]} deklariert · '
f'{summary["in_browser"]} im Browser · '
f'<span style="color:#dc2626;">{summary["undoc"]} UNDOC</span> · '
f'<span style="color:#92400e;">{summary["orph"]} ORPH</span> · '
f'<span style="color:#15803d;">{summary["ok"]} OK</span>'
f' · {summary["third_country"]} Drittland'
f'</div>'
f'<div style="font-size:{SZ_SMALL};color:{TEXT_MUTED};'
f'margin-bottom:6px;">'
f'Fehlende Pflichtangaben — Sitzland: {summary["missing_country"]}'
f' · Speicherdauer: {summary["missing_duration"]}'
f'</div>'
)
show_rows = render_inventory_rows(rows[:50])
body = table(inventory_headers(), show_rows)
if len(rows) > 50:
body += (
f'<p style="margin:6px 0 0;font-size:{SZ_SMALL};'
f'color:{TEXT_MUTED};">'
f'… und {len(rows) - 50} weitere</p>'
)
sev = "fail" if summary["undoc"] else "warn" if summary["orph"] else "pass"
return card(head + body, sev=sev)
def render_sofortmassnahmen(state: dict) -> str:
"""Aggregated bulk-recommendations: '1 Aktion fixt N Items'."""
groups = group_by_action(state)
if not groups:
return ""
rows = []
for g in groups:
items = g["items"]
sample = ", ".join(items[:5])
more = f" + {len(items) - 5} weitere" if len(items) > 5 else ""
eff_sev = ("pass" if g["effort"] == "low"
else "warn" if g["effort"] == "med" else "fail")
rows.append([
f'{g.get("icon") or ""} <strong>{h(g["label"])}</strong>'
f'<div style="font-size:11px;color:{TEXT_MUTED};margin-top:2px;">'
f'{h(g.get("norm") or "")}</div>',
f'<strong>{g["count"]}</strong>',
f'<div style="font-size:12px;color:{TEXT};">'
f'{h(sample)}{h(more)}</div>',
chip(g["effort"].upper(), eff_sev),
])
body = table(["Maßnahme", "Anz.", "Betrifft", "Aufwand"], rows)
return section(
f"🛠 Sofortmaßnahmen ({len(groups)})",
'<p style="margin:0 0 8px;color:' + TEXT_MUTED + ';font-size:13px;">'
'Eine Aktion behebt mehrere Findings auf einmal — nach Aufwand sortiert.'
'</p>' + body,
sev="warn",
anchor="sofortmassnahmen",
)
def render_theme_retention(state: dict) -> str:
s = state.get("retention_theme_summary") or {}
findings = state.get("retention_findings") or []
if not s.get("total"):
return ""
head = (
f'<h3 style="margin:0 0 6px;font-size:{SZ_H3};">'
f'▶ Speicherdauer-Konsistenz (TH-RETENTION)</h3>'
f'<div style="font-size:{SZ_SMALL};color:{TEXT_MUTED};'
f'margin-bottom:6px;">'
f'{s["total"]} Cookies · '
f'<span style="color:#15803d;">{s["passed"]} ✓</span> · '
f'<span style="color:#dc2626;">{s["failed"]} ✗</span> · '
f'<span style="color:#64748b;">{s["incomplete"]} ?</span>'
f'</div>'
)
fails = [f for f in findings
if not f.get("matches")
and f.get("severity_reason") != "incomplete"][:5]
if not fails:
return card(head, sev="pass")
rows = []
for f in fails:
sev = (f.get("severity") or "").upper()
sev_key = "fail" if sev == "HIGH" else "warn"
rows.append([
f'<code>{h(f.get("cookie_name") or "")}</code>',
h(f.get("vendor_name") or ""),
h(f.get("mismatch_type") or ""),
chip(sev, sev_key),
])
body = table(["Cookie", "Vendor", "Mismatch", "Sev"], rows)
sev = "fail" if s.get("failed", 0) else "warn"
return card(head + body, sev=sev)
def render_theme_reachability(state: dict) -> str:
f = state.get("reachability_finding") or {}
if not f:
return ""
passed = f.get("passed")
sev_key = "pass" if passed else (
"fail" if (f.get("severity") or "").upper() == "HIGH" else "warn")
notes_html = "".join(
f'<li style="margin:3px 0;">{h(n)}</li>'
for n in (f.get("notes") or [])
)
sub = (
f'<ul style="margin:6px 0 0 16px;font-size:13px;color:{TEXT};">'
f'{notes_html}</ul>' if notes_html else ""
)
head = (
f'<h3 style="margin:0 0 6px;font-size:{SZ_H3};">'
f'▶ Mobile Reachability (COOKIE-CONSENT-UX-001)</h3>'
f'<div>{chip((f.get("severity") or "PASS").upper(), sev_key)} '
f'<span style="margin-left:6px;font-size:{SZ_SMALL};'
f'color:{TEXT_MUTED};">{h(f.get("severity_reason") or "ok")}</span>'
f'</div>'
)
return card(head + sub, sev=sev_key)
def render_per_theme(state: dict) -> str:
parts = [
render_theme_cookie_banner(state),
render_theme_cookie_inventory(state),
render_theme_retention(state),
render_theme_reachability(state),
]
parts = [p for p in parts if p]
if not parts:
return ""
return section("🎯 5. Pro Thema", "".join(parts), anchor="per-theme")
# ── 6. Audit caveats ────────────────────────────────────────────
def render_caveats(state: dict) -> str:
fs = state.get("audit_quality_findings") or []
if not fs:
return ""
items = []
for f in fs:
sev = (f.get("severity") or "INFO").upper()
sev_key = ("fail" if sev == "HIGH"
else "warn" if sev == "MEDIUM" else "info")
title = h(f.get("title") or f.get("label") or "Vorbehalt")
msg = h(f.get("message") or f.get("hint") or "")
items.append(card(
f'<strong>{chip(sev, sev_key)} {title}</strong>'
f'<div style="margin-top:6px;">{msg}</div>',
sev=sev_key,
))
return section(f"⚠️ 6. Audit-Vorbehalte ({len(fs)})",
"".join(items), sev="warn", anchor="caveats")
# ── 7. Attachments ──────────────────────────────────────────────
def render_attachments(state: dict) -> str:
slices = state.get("cookie_evidence_slices") or []
if not slices:
return ""
meta = state.get("cookie_evidence_meta") or {}
n = len(slices)
body = (
f'<p style="margin:0;">'
f'Beweis-ZIP <code>evidence-{h(state.get("check_id", "")[:8])}.zip</code> '
f'mit <strong>{n}</strong> Slice(s), '
f'manifest.json + audit_metadata.json (SHA256 pro Slice).</p>'
f'<p style="margin:6px 0 0;font-size:{SZ_SMALL};color:{TEXT_MUTED};">'
f'Quelle: {h(meta.get("url") or "")}'
f'</p>'
)
return section("📎 7. Anhänge", body, sev="info", anchor="attach")
@@ -0,0 +1,290 @@
"""Mail-V2 finding-bucket renderers.
Separates FAIL items into three buckets — the user's design constraint:
hard_fail public + evidence → 🔴 Kritische Befunde
manual_review public, no evidence → 🔍 Manuelle Prüfung
internal_reminder internal process → 💼 Reminder (NEVER a fail)
The MC-DB stays as-is. If the LLM-Plausibility phase has already run
it stamps `c.llm_title` / `c.llm_recommendation` / `c.llm_severity`
onto each check; the renderer picks those up when present, otherwise
falls back to the original MC label verbatim. No question-form
rewriting here — that's the LLM-phase's job.
"""
from __future__ import annotations
from html import escape as h
from ._actions import action_for_finding
from ._label_norm import classify_check
from ._scope_filter import (
filter_out_of_scope,
get_last_drop_stats,
)
from ._style import (
SZ_SMALL,
TEXT,
TEXT_MUTED,
card,
chip,
section,
)
def _strip_qmark(s: str) -> str:
"""Normalise a string for dedup comparison."""
return (s or "").strip().rstrip("?").strip().lower()
def _is_dup(a: str, b: str) -> bool:
"""True when a and b carry essentially the same content."""
aa = _strip_qmark(a)
bb = _strip_qmark(b)
if not aa or not bb:
return False
if aa == bb:
return True
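# Containment only counts for substantial strings (> 30 chars), so short
# generic phrases do not suppress unrelated hints.
short, long = sorted((aa, bb), key=len)
return len(short) > 30 and short in long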
def _collect_three_buckets(state: dict) -> tuple[list[dict], list[dict],
list[dict]]:
"""Split all FAIL items into the three buckets."""
hard: list[dict] = []
manual: list[dict] = []
internal: list[dict] = []
business_scope = state.get("business_scope") or set()
for r in state.get("results") or []:
# Drop sector-specific MCs that don't apply to this business
scoped = filter_out_of_scope(
getattr(r, "checks", []) or [], business_scope,
)
for c in scoped:
sev = (getattr(c, "severity", "") or "").upper()
# LLM-plausibility may downgrade — read llm_severity if set
llm_sev = (getattr(c, "llm_severity", "") or "").upper()
effective_sev = llm_sev or sev
if effective_sev not in ("CRITICAL", "HIGH", "MEDIUM"):
continue
if getattr(c, "passed", True) or getattr(c, "skipped", False):
continue
# LLM may flag a finding as not plausible → drop
if getattr(c, "llm_drop", False):
continue
bucket = classify_check(c)
raw_label = getattr(c, "label", "")
llm_title = getattr(c, "llm_title", "") or ""
llm_recommendation = getattr(c, "llm_recommendation", "") or ""
title = (llm_title or raw_label)[:200]
hint = (getattr(c, "hint", "") or "")[:500]
matched = (getattr(c, "matched_text", "") or "")[:400]
action = action_for_finding(
getattr(c, "id", ""), effective_sev, raw_label, hint,
)
entry = {
"sev": effective_sev,
"id": getattr(c, "id", ""),
"title": title,
"raw_label": raw_label,
"hint": hint,
"matched": matched,
"llm_recommendation": llm_recommendation,
"doc": getattr(r, "label", ""),
"reg": getattr(c, "regulation", "") or "",
"action": action.to_dict() if action else None,
}
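# MEDIUM findings with evidence are routed to manual review; only
# CRITICAL/HIGH hard fails reach the critical bucket.
if bucket == "hard_fail" and effective_sev in ("CRITICAL", "HIGH"):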
hard.append(entry)
elif bucket == "internal_reminder":
internal.append(entry)
else:
manual.append(entry)
# B1 reachability (always hard if HIGH — directly observed)
rb1 = state.get("reachability_finding") or {}
if (rb1.get("severity") or "").upper() == "HIGH" and not rb1.get("passed"):
notes = " · ".join(rb1.get("notes") or [])
hard.append({
"sev": "HIGH",
"id": rb1.get("check_id", "COOKIE-CONSENT-UX-001"),
"title": "Mobile Consent-Reachability — kein Reopen-Link im Footer",
"raw_label": "Mobile Consent-Reachability",
"hint": notes,
"matched": "Footer-Scan: 0 Reopen-Anchor",
"llm_recommendation": "",
"doc": "Website-Footer",
"reg": "DSGVO Art. 7 Abs. 3",
"action": {"title": "Cookie-Einstellungen-Link im Footer ergänzen",
"target": "Website-Footer (alle Seiten)",
"detail": ("Footer-Link 'Cookie-Einstellungen' "
"ergänzen, der den CMP direkt öffnet."),
"effort": "low"},
})
# B3 retention HIGH/MED fails (3-source evidence)
for f in (state.get("retention_findings") or []):
sev = (f.get("severity") or "").upper()
if sev not in ("HIGH", "MEDIUM") or f.get("matches"):
continue
cookie = f.get("cookie_name") or ""
hard.append({
"sev": sev,
"id": "TH-RETENTION",
"title": f"Speicherdauer-Konflikt für {cookie}",
"raw_label": "Cookie-Speicherdauer-Konsistenz",
"hint": (f"DSI {f.get('dsi_days')}d · Tabelle "
f"{f.get('table_days')}d · "
f"Realität {f.get('actual_days')}d"),
"matched": (f.get("dsi_sentence") or "")[:200],
"llm_recommendation": "",
"doc": "Cookie-Richtlinie",
"reg": "DSGVO Art. 13 Abs. 2 lit.a",
"action": {"title": ("DSE / Cookie-Tabelle korrigieren "
if "dsi" in (f.get("mismatch_type") or "")
else "Cookie-Lifetime reduzieren"),
"target": "DSE + Cookie-Tabelle",
"detail": f"Mismatch-Typ: {f.get('mismatch_type')}",
"effort": "low"},
})
sev_rank = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2}
hard.sort(key=lambda x: (sev_rank.get(x["sev"], 9), x["title"]))
return hard, manual, internal
def count_critical(state: dict) -> int:
hard, _, _ = _collect_three_buckets(state)
return len(hard)
def count_manual(state: dict) -> int:
_, manual, _ = _collect_three_buckets(state)
return len(manual)
def count_internal(state: dict) -> int:
_, _, internal = _collect_three_buckets(state)
return len(internal)
def _render_finding_card(it: dict, *, sev_key: str = "fail") -> str:
head = (
f'{chip(it["sev"], sev_key)}'
f'<span style="margin-left:8px;font-weight:600;">{h(it["title"])}</span>'
)
meta = (
f'<div style="font-size:{SZ_SMALL};color:{TEXT_MUTED};margin:4px 0;">'
f'{h(it["id"])} · {h(it["doc"])} · {h(it["reg"])}</div>'
)
evidence = ""
if it.get("matched"):
evidence = (
f'<div style="font-size:13px;color:{TEXT};margin-top:6px;'
f'background:#fef3c7;padding:6px 8px;border-radius:4px;'
f'border-left:2px solid #f59e0b;">'
f'<strong>Beobachtet:</strong> <em>{h(it["matched"])}</em></div>'
)
hint = ""
if it.get("hint") and not _is_dup(it.get("hint"), it.get("title")):
hint = (
f'<div style="font-size:13px;color:{TEXT_MUTED};margin-top:6px;">'
f'{h(it["hint"])}</div>'
)
action_html = ""
a = it.get("action")
if a:
# Skip action.target rendering when it duplicates the title
target = "" if _is_dup(a.get("target", ""), it.get("title")) \
else a.get("target", "")
# Skip action.detail when it duplicates hint or title
detail = a.get("detail", "")
if _is_dup(detail, it.get("hint")) or _is_dup(detail, it.get("title")):
detail = ""
target_html = (
f' <span style="color:{TEXT_MUTED};">({h(target)})</span>'
if target else ""
)
detail_html = (
f'<div style="margin-top:4px;color:{TEXT};">{h(detail)}</div>'
if detail else ""
)
action_html = (
f'<div style="font-size:13px;margin-top:8px;background:#dcfce7;'
f'padding:8px 10px;border-radius:4px;border-left:2px solid #16a34a;">'
f'<strong>→ {h(a["title"])}</strong>{target_html}'
f'{detail_html}'
f'</div>'
)
llm_html = ""
if it.get("llm_recommendation"):
llm_html = (
f'<div style="font-size:13px;margin-top:8px;background:#dbeafe;'
f'padding:8px 10px;border-radius:4px;border-left:2px solid #3b82f6;">'
f'<strong>🤖 LLM-Plausibility:</strong> '
f'{h(it["llm_recommendation"])}</div>'
)
return card(head + meta + evidence + hint + action_html + llm_html,
sev=sev_key)
def render_critical(state: dict) -> str:
hard, _, _ = _collect_three_buckets(state)
if not hard:
body = (
'<p style="margin:0;color:#15803d;">'
'Keine HIGH/CRITICAL-Befunde mit harter Evidenz im aktuellen Lauf.'
'</p>'
)
return section("✅ 1. Kritische Befunde", body, sev="pass",
anchor="critical")
cards = [_render_finding_card(it, sev_key="fail") for it in hard]
intro = ('<p style="margin:0 0 8px;color:' + TEXT_MUTED + ';font-size:13px;">'
'Findings mit direkt beobachtbarer Evidenz (öffentliche Daten). '
'Pro Befund: Was wir geprüft haben · Beobachtung · Was zu tun ist.'
'</p>')
return section(f"🔴 1. Kritische Befunde ({len(hard)})",
intro + "".join(cards), sev="fail", anchor="critical")
def render_manual_review(state: dict) -> str:
_, manual, _ = _collect_three_buckets(state)
drop_stats = get_last_drop_stats()
if not manual:
if drop_stats.get("count"):
by_prefix = drop_stats.get("by_prefix") or {}
breakdown = ", ".join(f"{k}:{v}" for k, v in by_prefix.items())
note = ('<p style="margin:0;color:#64748b;font-size:13px;">'
'Keine manuell zu prüfenden Punkte. '
'Branchen-spezifische MCs ausgefiltert: '
f'{drop_stats["count"]} ({h(breakdown)})'
'</p>')
return section("✅ 2. Manuelle Prüfung", note, sev="pass",
anchor="manual")
return ""
cards = [_render_finding_card(it, sev_key="warn") for it in manual]
intro = ('<p style="margin:0 0 8px;color:' + TEXT_MUTED + ';font-size:13px;">'
'Diese Punkte sind öffentlich prüfbar, aber unser Audit konnte '
'sie nicht eindeutig feststellen — Hinweis: Original-MC-Frage. '
'Empfehlung: manuell beim Mandanten/DSB klären. '
'Die LLM-Plausibilitätsprüfung hilft Frage→Aussage zu wandeln '
'(siehe 🤖-Block pro Finding falls schon gelaufen).</p>')
return section(f"🔍 2. Manuelle Prüfung erforderlich ({len(manual)})",
intro + "".join(cards), sev="warn", anchor="manual")
def render_internal_reminders(state: dict) -> str:
_, _, internal = _collect_three_buckets(state)
if not internal:
return ""
cards = [_render_finding_card(it, sev_key="info") for it in internal]
intro = ('<p style="margin:0 0 8px;color:' + TEXT_MUTED + ';font-size:13px;">'
'Interne Prozesse (TOM, DSFA, AVV, Löschkonzept, Schulungen, '
'Incident-Response, VVT) sind von außen nicht prüfbar. '
'<strong>Dies sind Reminder — kein Befund über die Website.</strong> '
'Beim Mandanten die Existenz + Aktualität der Dokumente verifizieren.'
'</p>')
return section(f"💼 3. Interne Prozesse — Reminder ({len(internal)})",
intro + "".join(cards), sev="info", anchor="internal")
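The routing rules in `_collect_three_buckets` can be tried in isolation. The following is a minimal sketch, assuming checks are plain objects with `label`, `severity`, `matched_text` and `passed` attributes. `route` and the two-entry `INTERNAL` tuple are hypothetical stand-ins for `classify_check` plus the severity gate, not part of the module, and the `skipped` flag and scope filter are left out for brevity.

```python
from __future__ import annotations

from types import SimpleNamespace

# Hypothetical, shortened stand-in for _INTERNAL_KEYWORDS.
INTERNAL = ("dsfa", "löschkonzept")


def route(check) -> str | None:
    """Return 'hard', 'manual', 'internal', or None when the check is not rendered."""
    sev = (getattr(check, "llm_severity", "")
           or getattr(check, "severity", "") or "").upper()
    if sev not in ("CRITICAL", "HIGH", "MEDIUM"):
        return None
    if getattr(check, "passed", True) or getattr(check, "llm_drop", False):
        return None
    label = (getattr(check, "label", "") or "").lower()
    if any(k in label for k in INTERNAL):
        return "internal"
    has_evidence = bool((getattr(check, "matched_text", "") or "").strip())
    # Evidence alone is not enough: MEDIUM findings stay in manual review.
    if has_evidence and sev in ("CRITICAL", "HIGH"):
        return "hard"
    return "manual"


checks = [
    SimpleNamespace(label="Rechtsgrundlage genannt?", severity="HIGH",
                    matched_text="Art. 6 Abs. 1 lit. f", passed=False),
    SimpleNamespace(label="Widerrufsrecht erwähnt?", severity="MEDIUM",
                    matched_text="Widerruf", passed=False),
    SimpleNamespace(label="DSFA vorhanden?", severity="HIGH",
                    matched_text="", passed=False),
    SimpleNamespace(label="Kontaktdaten angegeben?", severity="LOW",
                    matched_text="", passed=False),
]
routes = [route(c) for c in checks]
```

The second check shows the less obvious case: it has evidence, but as a MEDIUM finding it still lands in manual review.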
@@ -0,0 +1,64 @@
"""Mail-V2 compose — single entrypoint that returns the full HTML.
Call `compose_v2(state)` from the email-dispatch phase when
`MAIL_RENDER_V2=true`. Default remains the legacy compose so we can
A/B in Mailpit.
"""
from __future__ import annotations
import os
from ._blocks import (
render_attachments,
render_caveats,
render_header,
render_per_doc,
render_per_theme,
render_sofortmassnahmen,
render_toc,
)
from ._blocks_findings import (
render_critical,
render_internal_reminders,
render_manual_review,
)
from ._legacy_wrappers import render_all_legacy
from ._style import page_close, page_open
def compose_v2(state: dict) -> str:
"""Build the full audit-mail HTML in the V2 layout."""
site = state.get("site_name") or ""
parts = [
page_open(site),
render_header(state),
render_toc(state),
render_critical(state),
render_manual_review(state),
render_internal_reminders(state),
render_sofortmassnahmen(state),
render_per_doc(state),
render_per_theme(state),
# B4 — Cross-Doc Vendor-Consistency (Elli Vertex↔Iadvize pattern)
state.get("vendor_consistency_html", ""),
# B5 — AI-Act Art. 50 Transparenzpflicht
state.get("ai_act_html", ""),
# B6/B7/B8 — DPO-cross-doc + Doc-Staleness + CMP-fingerprint
state.get("extra_findings_html", ""),
# All legacy build_*_html() wrapped in V2 sections — preserves
# every information block from the old renderer (Exec Summary,
# Banner-Screenshot, VVT, Redundancy, Solutions, Diff, etc.)
render_all_legacy(state),
render_caveats(state),
render_attachments(state),
page_close(state.get("check_id", ""),
os.environ.get("BUILD_SHA", "unknown")),
]
return "".join(p for p in parts if p)
def is_v2_enabled() -> bool:
return os.environ.get("MAIL_RENDER_V2", "false").lower() in (
"true", "1", "yes", "on",
)
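How the dispatch phase might use the flag can be sketched as follows. This is an illustration under assumptions: `compose_mail` and `compose_legacy` are hypothetical names for the call site and the old renderer, and `compose_v2` is reduced to a stub that only keeps the "drop empty parts" join.

```python
import os


def is_v2_enabled() -> bool:
    # Same truthy set as in the module above.
    return os.environ.get("MAIL_RENDER_V2", "false").lower() in (
        "true", "1", "yes", "on",
    )


def compose_legacy(state: dict) -> str:
    """Hypothetical stand-in for the legacy renderer."""
    return "<legacy>"


def compose_v2(state: dict) -> str:
    """Stub: the real function joins the section renderers."""
    parts = ["<v2>", "", state.get("extra_findings_html", "")]
    # Empty sections are skipped, so a missing block leaves no gap.
    return "".join(p for p in parts if p)


def compose_mail(state: dict) -> str:
    """Hypothetical call site in the email-dispatch phase."""
    return compose_v2(state) if is_v2_enabled() else compose_legacy(state)


os.environ["MAIL_RENDER_V2"] = "On"   # matching is case-insensitive
html_v2 = compose_mail({"extra_findings_html": "<b6/>"})
os.environ["MAIL_RENDER_V2"] = "0"    # anything outside the truthy set is off
html_legacy = compose_mail({})
```

Note that the value is lowercased but not stripped, so a value with surrounding whitespace would fall back to the legacy renderer.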
@@ -0,0 +1,267 @@
"""Mail-V2 Cookie-Inventar: single table with per-cookie status.
Merges three sources:
- declared in DSE / cookie-table (state["cmp_vendors"][i]["cookies"])
- live in browser (state["banner_result"]["cookies_detailed"])
- cookie_audit comparison (state["cookie_audit"]: undeclared_in_browser/compliant)
Status hierarchy per cookie:
UNDOC: in browser, NOT in declared list (HIGH)
MISMATCH: declared with different category/duration (MED); reserved, `_build_status` does not emit it yet
ORPH: declared, NOT in browser (LOW)
OK: declared + in browser (PASS); values are not compared yet
Per-row fields (missing values render as `❌`):
name, vendor, category, duration, retention_grounds, country,
third_country (bool), third_country_tag, processing_company, sources,
status_code, status_sev, declared, in_browser
"""
from __future__ import annotations
from html import escape as h
from ._style import chip
# EU + EEA + CH: no third-country transfer.
EU_EEA_CH = {
"DE", "AT", "BE", "BG", "HR", "CY", "CZ", "DK", "EE", "FI",
"FR", "GR", "HU", "IE", "IT", "LV", "LT", "LU", "MT", "NL",
"PL", "PT", "RO", "SK", "SI", "ES", "SE",
"IS", "LI", "NO", "CH",
}
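# Adequacy decisions (limited list, most relevant in cookie context).
# "GB" is the ISO 3166-1 alpha-2 code for the United Kingdom; "UK" is kept
# because source data often uses it. The US entry only holds for
# DPF-certified recipients.
ADEQUACY = {"US", "UK", "GB", "JP", "KR", "IL", "CA", "NZ", "AR", "UY", "AD"}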
def _norm(s: str | None) -> str:
return (s or "").strip().lower()
def _missing(value: str | None) -> bool:
if value is None:
return True
v = str(value).strip()
if not v:
return True
return v.lower() in ("?", "unknown", "n/a", "tbd")
def _x_or(value: str | None) -> str:
"""Render `❌` when the value is missing, else escape + return."""
if _missing(value):
return '<span style="color:#dc2626;font-weight:700;" title="fehlend">❌</span>'
return h(str(value))
def _country_third(country: str | None) -> tuple[str, bool, str | None]:
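"""Return (display, is_third_country, adequacy_tag).
is_third_country=True when outside EU/EEA/CH.
adequacy_tag is "DPF" for any country in ADEQUACY (not only the US),
"RISK" for other third countries, and None inside EU/EEA/CH or when
the country is missing.
"""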
if _missing(country):
return ("", False, None)
code = (country or "").strip().upper()
# accept "Germany" → "DE" via crude mapping for the most common names
name_map = {
"DEUTSCHLAND": "DE", "GERMANY": "DE", "IRELAND": "IE", "IRLAND": "IE",
"USA": "US", "UNITED STATES": "US",
}
code = name_map.get(code, code)
if code in EU_EEA_CH:
return (code, False, None)
tag = "DPF" if code in ADEQUACY else "RISK"
return (code, True, tag)
def _src_chip(in_dse: bool, in_table: bool, in_browser: bool,
in_ocr: bool) -> str:
parts: list[str] = []
if in_dse:
parts.append("DSE")
if in_table:
parts.append("Tabelle")
if in_ocr:
parts.append("OCR")
if in_browser:
parts.append("Browser")
return " · ".join(parts) if parts else ""
def _build_status(declared: bool, in_browser: bool,
cookie_audit_undeclared: set,
cookie_audit_compliant: set,
name_lc: str) -> tuple[str, str]:
if name_lc in cookie_audit_undeclared or (in_browser and not declared):
return "UNDOC", "fail"
if declared and not in_browser:
return "ORPH", "warn"
if declared and in_browser:
return "OK", "pass"
return "", "info"
def build_cookie_inventory(state: dict) -> tuple[list[dict], dict]:
"""Build the merged inventory + summary."""
cmp_vendors = state.get("cmp_vendors") or []
banner = state.get("banner_result") or {}
cookies_detailed = banner.get("cookies_detailed") or []
cookie_audit = state.get("cookie_audit") or {}
# 1) Declared
declared: dict[str, dict] = {}
for v in cmp_vendors:
vname = (v.get("name") or "").strip()
vcountry = (v.get("country") or "").strip()
vproc = (v.get("processing_company") or "").strip()
vretention = (v.get("persistence") or "").strip() # vendor-level
src = (v.get("source") or "").lower()
in_dse = "dse" in src or "table_crawled" in src
in_table = ("table" in src or "pasted" in src
or "html_table" in src)
in_ocr = "tesseract" in src or "ocr" in src
for c in (v.get("cookies") or []):
cname = (c.get("name") or "").strip()
if not cname:
continue
key = _norm(cname)
entry = declared.setdefault(key, {
"name": cname,
"vendor": vname,
"category": "",
"duration": "",
"retention_grounds": "",
"country": vcountry,
"processing_company": vproc,
"in_dse": False,
"in_table": False,
"in_ocr": False,
})
entry["category"] = (entry["category"]
or (c.get("category") or "").strip())
entry["duration"] = (entry["duration"]
or (c.get("duration")
or c.get("persistence") or "").strip())
# backfill vendor-level values where the entry is still empty
if not entry["country"] and vcountry:
entry["country"] = vcountry
if not entry["processing_company"] and vproc:
entry["processing_company"] = vproc
if not entry["retention_grounds"] and vretention:
entry["retention_grounds"] = vretention
entry["in_dse"] = entry["in_dse"] or in_dse
entry["in_table"] = entry["in_table"] or in_table
entry["in_ocr"] = entry["in_ocr"] or in_ocr
# 2) Browser
browser: dict[str, dict] = {}
for c in cookies_detailed:
cname = (c.get("name") or "").strip()
if not cname:
continue
browser[_norm(cname)] = c
# 3) cookie_audit hints
undeclared_set: set = {
_norm((c.get("name") if isinstance(c, dict) else c) or "")
for c in (cookie_audit.get("undeclared_in_browser") or [])
}
compliant_set: set = {
_norm((c.get("name") if isinstance(c, dict) else c) or "")
for c in (cookie_audit.get("compliant") or [])
}
all_keys = set(declared.keys()) | set(browser.keys())
rows: list[dict] = []
for key in sorted(all_keys):
d = declared.get(key) or {}
b = browser.get(key) or {}
name = d.get("name") or b.get("name") or key
vendor = (d.get("vendor") or b.get("domain") or "").strip()
country = d.get("country", "")
country_display, is_third, adq = _country_third(country)
in_browser = key in browser
is_declared = key in declared
status, sev = _build_status(
is_declared, in_browser, undeclared_set, compliant_set, key,
)
sources = _src_chip(
d.get("in_dse", False),
d.get("in_table", False),
in_browser,
d.get("in_ocr", False),
)
rows.append({
"name": name,
"vendor": vendor,
"category": d.get("category", ""),
"duration": d.get("duration", ""),
"retention_grounds": d.get("retention_grounds", ""),
"country": country_display,
"third_country": is_third,
"third_country_tag": adq,
"processing_company": d.get("processing_company", ""),
"sources": sources,
"status_code": status,
"status_sev": sev,
"declared": is_declared,
"in_browser": in_browser,
})
order = {"UNDOC": 0, "MISMATCH": 1, "ORPH": 2, "OK": 3, "": 4}
rows.sort(key=lambda r: (order.get(r["status_code"], 9),
r["name"].lower()))
summary = {
"total": len(rows),
"ok": sum(1 for r in rows if r["status_code"] == "OK"),
"undoc": sum(1 for r in rows if r["status_code"] == "UNDOC"),
"orph": sum(1 for r in rows if r["status_code"] == "ORPH"),
"mismatch": sum(1 for r in rows if r["status_code"] == "MISMATCH"),
"declared": sum(1 for r in rows if r["declared"]),
"in_browser": sum(1 for r in rows if r["in_browser"]),
"third_country": sum(1 for r in rows if r["third_country"]),
"missing_country": sum(1 for r in rows if _missing(r["country"])),
"missing_duration": sum(1 for r in rows if _missing(r["duration"])),
}
return rows, summary
def render_inventory_rows(rows: list[dict]) -> list[list[str]]:
"""Cell-rows for `_style.table`.
Columns: Name | Vendor | Kat | Speicherdauer | Löschfrist |
Sitzland | Verantwortlich | Quelle | Status
"""
out: list[list[str]] = []
for r in rows:
country_html = _x_or(r["country"])
if r["third_country"]:
tag = r.get("third_country_tag") or "RISK"
tag_color = "#92400e" if tag == "DPF" else "#dc2626"
country_html += (
f' <span style="font-size:10px;color:{tag_color};'
f'font-weight:700;">[{tag}]</span>'
)
out.append([
f'<code>{h(r["name"])}</code>',
h(r["vendor"]) if r["vendor"] else
'<span style="color:#dc2626;">❌</span>',
_x_or(r["category"]),
_x_or(r["duration"]),
_x_or(r["retention_grounds"]),
country_html,
_x_or(r["processing_company"]),
h(r["sources"]),
chip(r["status_code"], r["status_sev"]),
])
return out
def inventory_headers() -> list[str]:
return ["Name", "Vendor", "Kat.", "Speicherdauer", "Löschfrist",
"Sitzland", "Verantwortlich", "Quelle", "Status"]
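The status column boils down to a set comparison between declared and live cookie names. A minimal sketch of that idea, assuming names are normalised with `strip().lower()` as in `_norm`; `cookie_status` is a hypothetical helper and ignores the `cookie_audit` hints that `_build_status` also consults.

```python
from __future__ import annotations


def cookie_status(declared: set[str], in_browser: set[str]) -> dict[str, str]:
    """Map each normalised cookie name to UNDOC, ORPH, or OK."""
    status: dict[str, str] = {}
    for name in sorted(declared | in_browser):
        if name in in_browser and name not in declared:
            status[name] = "UNDOC"   # live but undocumented
        elif name in declared and name not in in_browser:
            status[name] = "ORPH"    # documented but never set
        else:
            status[name] = "OK"      # present on both sides
    return status


declared = {n.strip().lower() for n in ["_ga", "Session_ID ", "_fbp"]}
in_browser = {n.strip().lower() for n in ["_ga", "session_id", "_gid"]}
status = cookie_status(declared, in_browser)
```

Normalising before the comparison matters: without it, `Session_ID ` and `session_id` would show up as one orphaned and one undocumented cookie.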
@@ -0,0 +1,113 @@
"""Mail-V2 label normalizer — turn MC questions into statements.
Historic MC labels read like compliance-officer checklists:
"Dokumentiert die Datenschutzinformation alle Datenübermittlungen
gemäß Art. 49 Abs. 1 Unterabs. 2 DS-GVO?"
In the audit mail that looks like "we don't know" — unhelpful.
This module rewrites the label as a statement of WHAT WAS CHECKED
so the reader gets a topic, not a question. For the single-line label
above, the first rewrite rule drops the stem and the document noun:
"Alle Datenübermittlungen gemäß Art. 49 Abs. 1 Unterabs. 2 DS-GVO"
The transformation is purely textual; the underlying MC stays as is.
"""
from __future__ import annotations
import re
# Question-stem → topic-prefix rewrites, applied in order.
_REWRITES: list[tuple[re.Pattern, str]] = [
(re.compile(r"^Dokumentiert\s+die\s+(.+?)\s+(.+?)\?$", re.IGNORECASE),
r"\2"),
(re.compile(r"^Werden\s+(.+?)\s+dokumentiert\?$", re.IGNORECASE),
r"\1 dokumentieren"),
(re.compile(r"^Wird\s+(.+?)\s+benannt\?$", re.IGNORECASE),
r"\1 benennen"),
(re.compile(r"^Ist\s+(.+?)\s+angegeben\?$", re.IGNORECASE),
r"\1 angeben"),
(re.compile(r"^Enthält\s+(?:die\s+)?(.+?)\s+(.+?)\?$", re.IGNORECASE),
r"\2 in \1"),
(re.compile(r"^Sind\s+(.+?)\s+vorhanden\?$", re.IGNORECASE),
r"\1 prüfen"),
(re.compile(r"^Gibt\s+es\s+(.+?)\?$", re.IGNORECASE),
r"\1 prüfen"),
]
def label_as_statement(label: str) -> str:
"""Rewrite a question-form label as a topic statement."""
if not label:
return label
s = label.strip()
if not s.endswith("?"):
return s
for pat, repl in _REWRITES:
m = pat.match(s)
if m:
out = pat.sub(repl, s).strip()
# First word capitalised
return out[:1].upper() + out[1:] if out else s
# Generic fallback: drop the question mark + leading "Wird/Sind/Ist"
s2 = re.sub(r"^\s*(Wird|Sind|Ist|Werden|Gibt es|Enthält|Hat)\s+",
"", s, flags=re.IGNORECASE).rstrip("?")
return s2[:1].upper() + s2[1:] if s2 else s
def has_evidence(check) -> bool:
"""Decide whether an MC check has real evidence backing the FAIL.
A FAIL with non-empty `matched_text` (the regex/LLM did find a
string and judged it insufficient) is a hard fail. A FAIL with
empty matched_text is more like 'we could not confirm' → that
belongs in the manual-review bucket, not in critical findings.
"""
matched = getattr(check, "matched_text", "") or ""
return bool(matched.strip())
# Keywords that indicate a check is about an INTERNAL process the
# auditor cannot observe from outside (TOM, DSFA, AVV, training,
# incident response, risk analysis, deletion concept). These are
# never findings — they are reminders that the DPO/DSB must verify
# the document/process exists internally.
_INTERNAL_KEYWORDS = (
"tom", "technisch-organisatorische", "technisch organisatorische",
"dsfa", "datenschutz-folgenabschätzung",
"datenschutzfolgenabschätzung",
"schulung", "training", "awareness",
"avv", "auftragsverarbeitungsvertrag", "auftragsverarbeitung",
"incident", "vorfall", "meldepflicht intern",
"risikoanalyse", "risikobewertung", "risk assessment",
"löschkonzept", "löschfristen-konzept",
"vvt", "verzeichnis der verarbeitungstätigkeiten",
"dsb-bestellung", "dsb bestellung",
"verfahrensverzeichnis", "berichtigungskonzept",
"betroffenenrechte-prozess", "dsr-prozess",
)
def is_internal_process(check) -> bool:
"""Decide whether the MC check is about an internal process."""
label = (getattr(check, "label", "") or "").lower()
cid = (getattr(check, "id", "") or "").lower()
hint = (getattr(check, "hint", "") or "").lower()
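# mc_audit_type module may have annotated the check
audit_type = getattr(check, "audit_type", "")
if audit_type in ("internal", "process", "documentation"):
return True
hay = f"{label} {cid} {hint}"
# Short acronyms must not start mid-word: a plain substring test lets
# "tom" match "automatisiert" and misfiles those checks as reminders.
return any(
re.search(rf"(?<![a-zäöüß]){re.escape(k)}", hay) if len(k) <= 4
else k in hay
for k in _INTERNAL_KEYWORDS
)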
def classify_check(check) -> str:
"""Return one of: 'hard_fail' | 'manual_review' | 'internal_reminder'.
Only call on FAIL checks (passed=False, skipped=False). Drives
which bucket the check renders into.
"""
if is_internal_process(check):
return "internal_reminder"
if has_evidence(check):
return "hard_fail"
return "manual_review"
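The effect of the first rewrite rule can be checked with a standalone snippet. A minimal sketch, assuming a single-line label; `rewrite` is a hypothetical reduction of `label_as_statement` to that one rule, without the generic fallback.

```python
import re

# First rule from _REWRITES: drop "Dokumentiert die <noun>" and keep the rest.
PATTERN = re.compile(r"^Dokumentiert\s+die\s+(.+?)\s+(.+?)\?$", re.IGNORECASE)


def rewrite(label: str) -> str:
    """Apply the single rule; return the stripped label unchanged on no match."""
    s = label.strip()
    m = PATTERN.match(s)
    if not m:
        return s
    out = m.expand(r"\2").strip()
    # Capitalise the first character only; the rest keeps its casing.
    return out[:1].upper() + out[1:]


label = ("Dokumentiert die Datenschutzinformation alle Datenübermittlungen "
         "gemäß Art. 49 Abs. 1 Unterabs. 2 DS-GVO?")
statement = rewrite(label)
```

Because the first group is non-greedy, it captures exactly one word after "die", so the rule assumes the document noun is a single token.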
@@ -0,0 +1,446 @@
"""Mail-V2 legacy wrappers — wrap each existing build_*_html() in V2 shell.
The original step-5 had 24+ render functions, each emitting standalone
HTML with their own styles. V2 keeps all the information by wrapping
each output in a consistent V2 `section()` container with stripe +
palette. The block-level styling normalizes; the inner data tables/
lists keep their legacy markup so we don't lose detail.
Each wrapper is defensive: missing data, import errors, or empty
HTML → return "" so the section disappears rather than crashing.
"""
from __future__ import annotations
import logging
from ._style import section
logger = logging.getLogger(__name__)
def _safe_wrap(label: str, anchor: str, html: str,
*, sev: str = "info") -> str:
if not html or not html.strip():
return ""
return section(label, html, sev=sev, anchor=anchor)
# ── Tier 1 (Sales-critical) ──────────────────────────────────────
def render_executive_summary(state: dict) -> str:
"""P82 GF-1-Pager + P1 Exec-Summary combined as 'Executive Summary'."""
parts: list[str] = []
req = state.get("req")
try:
from compliance.services.gf_one_pager import build_gf_one_pager_html
html = build_gf_one_pager_html(
site_name=state.get("site_name") or "",
scorecard=state.get("scorecard") or {},
previous_scorecard=state.get("prev_scorecard"),
banner_result=state.get("banner_result"),
library_mismatch_findings=state.get("mismatches") or [],
scan_context=getattr(req, "scan_context", None) if req else None,
audit_quality_findings=state.get("audit_quality_findings") or [],
)
if html and html.strip():
parts.append(html)
except Exception as e:
logger.warning("gf_one_pager wrapper: %s", e)
try:
from compliance.api.agent_doc_check_exec_summary import (
build_exec_summary_html,
)
html = build_exec_summary_html(
scorecard=state.get("scorecard") or {},
previous_scorecard=state.get("prev_scorecard"),
cmp_vendors=state.get("cmp_vendors") or [],
redundancy_report=state.get("redundancy_report"),
site_name=state.get("site_name") or "",
)
if html and html.strip():
parts.append(html)
except Exception as e:
logger.warning("exec_summary wrapper: %s", e)
return _safe_wrap("💼 Executive Summary", "exec",
"".join(parts), sev="info")
def render_banner_screenshot(state: dict) -> str:
"""P85 — Banner-Screenshot as visual proof."""
try:
from compliance.services.banner_screenshot_block import (
build_banner_screenshot_html,
)
html = build_banner_screenshot_html(state.get("banner_result"))
return _safe_wrap("📸 Banner-Screenshot", "banner-shot",
html, sev="info")
except Exception as e:
logger.warning("banner_screenshot wrapper: %s", e)
return ""
def render_vvt(state: dict) -> str:
"""VVT table (records of processing activities) per Art. 30 GDPR."""
try:
from compliance.api.agent_doc_check_extras import (
build_vvt_table_html,
)
html = build_vvt_table_html(state.get("cmp_vendors") or [])
return _safe_wrap("📋 VVT — Verarbeitungstätigkeiten (Art. 30 DSGVO)",
"vvt", html, sev="info")
except Exception as e:
logger.warning("vvt wrapper: %s", e)
return ""
def render_redundancy(state: dict) -> str:
"""O4: vendor redundancy, EU alternatives, and cost savings."""
try:
from compliance.api.agent_doc_check_redundancy import (
build_redundancy_html,
)
html = build_redundancy_html(state.get("redundancy_report"))
return _safe_wrap("💰 Optimierungspotenzial (Redundanz / EU-Alt.)",
"redundancy", html, sev="warn")
except Exception as e:
logger.warning("redundancy wrapper: %s", e)
return ""
def render_diff(state: dict) -> str:
"""P84: diff mode, changes since the previous run."""
try:
from compliance.services.run_diff import (
build_diff_block_html, compute_diff,
)
from database import SessionLocal
db = SessionLocal()
try:
diff = compute_diff(
db, state["check_id"], state.get("domain_for_exec") or "",
state.get("banner_result"), state.get("scorecard"),
)
html = build_diff_block_html(diff) if diff else ""
finally:
db.close()
return _safe_wrap("📊 Veränderung seit letztem Lauf",
"diff", html, sev="info")
except Exception as e:
logger.warning("diff wrapper: %s", e)
return ""
def render_scope_disclaimer(state: dict) -> str:
"""P62: what the audit covers and what it cannot check."""
try:
from compliance.api.scope_disclaimer import build_scope_disclaimer_html
html = build_scope_disclaimer_html()
return _safe_wrap("🔍 Prüfumfang & Methodische Hinweise",
"scope", html, sev="info")
except Exception as e:
logger.warning("scope_disclaimer wrapper: %s", e)
return ""
# ── Tier 2 (Audit-detail) ─────────────────────────────────────────
def render_banner_deep(state: dict) -> str:
"""Banner-Deep: Phases + Quality-Score + Per-Category-Tracker."""
try:
from compliance.api.agent_doc_check_banner import (
build_banner_deep_html,
)
html = build_banner_deep_html(state.get("banner_result"))
return _safe_wrap("🍪 Banner-Tiefenanalyse (Phasen + Kategorien)",
"banner-deep", html, sev="info")
except Exception as e:
logger.warning("banner_deep wrapper: %s", e)
return ""
def render_cookie_audit(state: dict) -> str:
"""Three-source cookie audit (declared ↔ browser ↔ library)."""
try:
from compliance.services.cookie_compliance_audit import (
build_cookie_audit_block_html,
)
html = build_cookie_audit_block_html(state.get("cookie_audit") or {})
return _safe_wrap("🔬 Cookie-Audit (3-Quellen-Vergleich)",
"cookie-audit", html, sev="warn")
except Exception as e:
logger.warning("cookie_audit wrapper: %s", e)
return ""
def render_solutions(state: dict) -> str:
"""P73: LLM-generated remediation proposals per HIGH fail."""
try:
from compliance.services.mc_solution_generator import (
build_solutions_block_html,
)
html = build_solutions_block_html(state.get("mc_solutions") or [])
return _safe_wrap("🎯 LLM-Lösungsvorschläge (P73)",
"solutions", html, sev="info")
except Exception as e:
logger.warning("solutions wrapper: %s", e)
return ""
def render_cookie_architecture(state: dict) -> str:
"""P10 — Cookie-Policy-Architecture (BMW-Pattern, layered separation)."""
try:
from compliance.services.cookie_policy_architecture import (
build_architecture_html,
)
html = build_architecture_html(state.get("cookie_architecture") or {})
return _safe_wrap("🏗 Cookie-Policy-Architektur",
"cookie-arch", html, sev="info")
except Exception as e:
logger.warning("cookie_architecture wrapper: %s", e)
return ""
def render_library_mismatch(state: dict) -> str:
"""P102: check cookie classification against the library."""
try:
from compliance.services.cookie_library_mismatch import (
build_mismatch_block_html,
)
html = build_mismatch_block_html(state.get("mismatches") or [])
return _safe_wrap("⚖️ Cookie-Klassifikation gegen Library (P102)",
"lib-mismatch", html, sev="warn")
except Exception as e:
logger.warning("library_mismatch wrapper: %s", e)
return ""
def render_banner_consistency(state: dict) -> str:
"""P92/P94: banner consistency / CMP health."""
try:
from compliance.services.banner_consistency_checks import (
build_consistency_block_html,
)
html = build_consistency_block_html(
state.get("consistency_findings") or [])
return _safe_wrap("🧩 Banner-Konsistenz + CMP-Health",
"banner-consistency", html, sev="warn")
except Exception as e:
logger.warning("banner_consistency wrapper: %s", e)
return ""
def render_signals(state: dict) -> str:
"""P35/P77/P78 — Save-Label, Cookies-in-DSE, JC-Klausel."""
try:
from compliance.services.doc_text_signals import (
build_signals_block_html,
)
html = build_signals_block_html(state.get("signal_findings") or [])
return _safe_wrap("🚩 Doc-Text-Signale (P35/P77/P78)",
"signals", html, sev="info")
except Exception as e:
logger.warning("signals wrapper: %s", e)
return ""
def render_scorecard_regulation(state: dict) -> str:
"""MC-Scorecard per Regulation (DSGVO/TDDDG/BGB-Split)."""
try:
from compliance.api.agent_doc_check_scorecard import (
build_scorecard_html,
)
html = build_scorecard_html(
state.get("scorecard") or {},
previous_scorecard=state.get("prev_scorecard"),
)
return _safe_wrap("📊 Compliance-Scorecard pro Regulation",
"scorecard", html, sev="info")
except Exception as e:
logger.warning("scorecard wrapper: %s", e)
return ""
def render_profile_html(state: dict) -> str:
"""Detected business model."""
try:
from compliance.api.agent_doc_check_report import build_profile_html
html = build_profile_html(state.get("profile"))
return _safe_wrap("🏢 Erkanntes Geschäftsmodell",
"profile", html, sev="info")
except Exception as e:
logger.warning("profile wrapper: %s", e)
return ""
def render_input_warnings(state: dict) -> str:
"""Doc-input warnings: user pasted text into the wrong field."""
try:
from compliance.services.doc_input_warnings import (
build_warnings_block_html,
)
warns = state.get("input_warnings") or []
html = build_warnings_block_html(warns) if warns else ""
return _safe_wrap("⚠️ Eingabe-Warnungen",
"input-warn", html, sev="warn")
except Exception as e:
logger.warning("input_warnings wrapper: %s", e)
return ""


# ── Tier 3 (Cookie-deep + advisory) ───────────────────────────────
def render_entropy(state: dict) -> str:
    """P103: cookie value entropy."""
    try:
        from compliance.services.cookie_value_entropy import (
            build_entropy_block_html,
        )
        html = build_entropy_block_html(state.get("entropy_findings") or [])
        return _safe_wrap("🎲 Cookie-Entropy-Anomalien (P103)",
                          "entropy", html, sev="info")
    except Exception as e:
        logger.warning("entropy wrapper: %s", e)
        return ""


def render_network_trace(state: dict) -> str:
    """P104: network tracing."""
    try:
        from compliance.services.cookie_network_tracer import (
            build_network_trace_block_html,
        )
        html = build_network_trace_block_html(
            state.get("network_findings") or [])
        return _safe_wrap("🌐 Network-Tracing (P104)",
                          "network", html, sev="info")
    except Exception as e:
        logger.warning("network_trace wrapper: %s", e)
        return ""


def render_tcf_authority(state: dict) -> str:
    """P105: IAB TCF authority cross-reference."""
    try:
        from compliance.services.tcf_vendor_authority import (
            build_tcf_authority_block_html,
        )
        html = build_tcf_authority_block_html(
            state.get("tcf_authority_findings") or [])
        return _safe_wrap("🆔 IAB TCF Vendor Authority (P105)",
                          "tcf-auth", html, sev="info")
    except Exception as e:
        logger.warning("tcf_authority wrapper: %s", e)
        return ""


def render_jc_avv(state: dict) -> str:
    """P71: joint controller (JC) vs. AVV decision tree."""
    try:
        from compliance.services.jc_avv_decision import (
            build_jc_avv_decision_html,
        )
        html = build_jc_avv_decision_html(
            (state.get("doc_texts") or {}).get("dse"))
        return _safe_wrap("⚖️ Joint Controller vs. AVV — Entscheidung (P71)",
                          "jc-avv", html, sev="info")
    except Exception as e:
        logger.warning("jc_avv wrapper: %s", e)
        return ""


def render_industry_context(state: dict) -> str:
    """P6/53/55: industry context + site history."""
    try:
        from compliance.services.industry_library import (
            build_industry_context_block_html,
        )
        ind = None
        req = state.get("req")
        if req and getattr(req, "scan_context", None):
            ind = req.scan_context.get("industry")
        html = build_industry_context_block_html(
            ind, state.get("site_profile"))
        return _safe_wrap("🏭 Branchen-Kontext + Historie",
                          "industry", html, sev="info")
    except Exception as e:
        logger.warning("industry_context wrapper: %s", e)
        return ""


def render_benchmark(state: dict) -> str:
    """P86: industry benchmark."""
    try:
        from compliance.services.industry_benchmark import (
            build_benchmark_html,
        )
        html = build_benchmark_html(state.get("benchmark") or {})
        return _safe_wrap("📈 Branchen-Benchmark (P86)",
                          "bench", html, sev="info")
    except Exception as e:
        logger.warning("benchmark wrapper: %s", e)
        return ""


def render_scanned_urls(state: dict) -> str:
    """Source transparency: which URLs were crawled."""
    try:
        from compliance.api.agent_doc_check_report import (
            build_scanned_urls_html,
        )
        html = build_scanned_urls_html(state.get("doc_entries") or [])
        return _safe_wrap("🔗 Geprüfte URLs (Quellen-Transparenz)",
                          "scanned-urls", html, sev="info")
    except Exception as e:
        logger.warning("scanned_urls wrapper: %s", e)
        return ""


def render_management_summary(state: dict) -> str:
    """Concrete tasks for management."""
    try:
        from compliance.api.agent_doc_check_report import (
            build_management_summary,
        )
        html = build_management_summary(state.get("results") or [])
        return _safe_wrap("📝 Management-Zusammenfassung",
                          "mgmt", html, sev="info")
    except Exception as e:
        logger.warning("management_summary wrapper: %s", e)
        return ""


# ── Render the whole legacy block region ────────────────────────
def render_all_legacy(state: dict) -> str:
    """Render every legacy block in the canonical order."""
    return "".join([
        # Tier 1 (Sales)
        render_executive_summary(state),
        render_diff(state),
        render_solutions(state),
        render_redundancy(state),
        render_vvt(state),
        render_banner_screenshot(state),
        # Tier 2 (Audit-detail)
        render_scorecard_regulation(state),
        render_banner_deep(state),
        render_banner_consistency(state),
        render_cookie_audit(state),
        render_cookie_architecture(state),
        render_library_mismatch(state),
        render_signals(state),
        render_profile_html(state),
        render_input_warnings(state),
        # Tier 3 (advisory)
        render_entropy(state),
        render_network_trace(state),
        render_tcf_authority(state),
        render_jc_avv(state),
        render_industry_context(state),
        render_benchmark(state),
        render_scanned_urls(state),
        render_management_summary(state),
        # Scope disclaimer goes last (footer-ish)
        render_scope_disclaimer(state),
    ])
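

# Illustrative sketch, not part of this commit: every wrapper above repeats
# the same shape (build the block, wrap it, and turn any exception into a
# logged warning plus an empty string). Assuming a hypothetical decorator
# named `legacy_block`, that shape could be written once. Both
# `legacy_block` and `render_benchmark_demo` are made-up names for this
# example, and the demo body stands in for the real lazy import.

```python
import functools
import logging

logger = logging.getLogger(__name__)


def legacy_block(name: str):
    """Hypothetical decorator: log any failure and render an empty block."""
    def decorate(fn):
        @functools.wraps(fn)
        def wrapper(state: dict) -> str:
            try:
                return fn(state)
            except Exception as e:  # broad on purpose: one block must not break the mail
                logger.warning("%s wrapper: %s", name, e)
                return ""
        return wrapper
    return decorate


@legacy_block("benchmark")
def render_benchmark_demo(state: dict) -> str:
    # Stand-in for the lazy import + build_*_html call of a real wrapper.
    return "<p>" + state["benchmark"]["label"] + "</p>"


ok = render_benchmark_demo({"benchmark": {"label": "Top 10%"}})
broken = render_benchmark_demo({})  # KeyError is logged and swallowed
```

# With such a decorator a failing block degrades to "" while the rest of
# the mail still renders, which is the behavior of the wrappers above.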
@@ -0,0 +1,88 @@
"""Mail-V2 scope filter: drop MC findings that don't apply.

Some MC-DB entries are sector-specific (FIN = financial services,
GOV = public authority, MED = healthcare, INS = insurance, EDU =
education, LEG = legal profession, REL = religious bodies, POL =
political parties). They have no business surfacing for a normal B2C
company like Elli (energy/EV charging).

This filter inspects the MC ID prefix and, when the prefix denotes
a sector that doesn't match the detected `business_scope`, drops the
check from the V2 finding renderers.

The MC pipeline itself is unchanged: MCs are still evaluated and are
only suppressed in the report when out of scope. Set
`KEEP_OOS_MCS=true` in the env to disable the filter (useful for
DSB debug runs).
"""
from __future__ import annotations

import os
from typing import Any

# Prefix -> sector token expected in business_scope to KEEP the check.
SECTOR_PREFIXES: dict[str, set[str]] = {
    "FIN": {"financial_services", "bank", "bafin", "fintech",
            "payment_provider"},
    "GOV": {"public_authority", "government", "behoerde"},
    "MED": {"healthcare", "medical", "pharma", "klinik"},
    "INS": {"insurance", "versicherung"},
    "EDU": {"education", "schule", "hochschule", "university"},
    "LEG": {"legal_profession", "anwaltskammer", "kanzlei"},
    "REL": {"church", "religion", "religious"},
    "POL": {"political_party", "partei"},
}

# Cheap counter so the renderer can show "X MCs gefiltert (out of scope)".
# "count" holds an int, "by_prefix" a dict of prefix -> int.
_LAST_DROPPED: dict[str, Any] = {"count": 0, "by_prefix": {}}


def _enabled() -> bool:
    """Return True unless KEEP_OOS_MCS is set to a truthy value."""
    return os.environ.get("KEEP_OOS_MCS", "false").lower() not in (
        "true", "1", "yes", "on",
    )


def _extract_prefix(check_id: str) -> str | None:
    """Return the sector prefix (e.g. 'FIN') from mc-FIN-814-A03."""
    if not check_id:
        return None
    parts = check_id.split("-")
    # mc-XXX-NNN-AYY → parts = ["mc", "XXX", "NNN", "AYY"]
    if len(parts) >= 2 and parts[0].lower() == "mc":
        prefix = parts[1].upper()
        if prefix in SECTOR_PREFIXES:
            return prefix
    return None


def is_out_of_scope(check, business_scope: set[str] | None) -> bool:
    """Decide whether the check is sector-specific AND out of scope."""
    if not _enabled():
        return False
    prefix = _extract_prefix(getattr(check, "id", "") or "")
    if not prefix:
        return False
    required = SECTOR_PREFIXES.get(prefix) or set()
    scope_lc = {s.lower() for s in (business_scope or set())}
    return not (scope_lc & required)


def filter_out_of_scope(checks, business_scope: set[str] | None) -> list:
    """Return `checks` with out-of-scope items removed; mutates counter."""
    _LAST_DROPPED["count"] = 0
    _LAST_DROPPED["by_prefix"] = {}
    out = []
    for c in checks:
        if is_out_of_scope(c, business_scope):
            _LAST_DROPPED["count"] += 1
            prefix = _extract_prefix(getattr(c, "id", "") or "") or "?"
            _LAST_DROPPED["by_prefix"][prefix] = (
                _LAST_DROPPED["by_prefix"].get(prefix, 0) + 1
            )
            continue
        out.append(c)
    return out


def get_last_drop_stats() -> dict:
    """Return a shallow copy of the counters from the last filter run."""
    return dict(_LAST_DROPPED)
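

# Illustrative sketch, not part of this commit: a worked example of the
# prefix-based scope decision described in the module docstring. To run on
# its own it re-declares a reduced prefix table (`DEMO_PREFIXES`) and uses a
# hypothetical `DemoCheck` class; only `mc-FIN-814-A03` comes from the
# source, the other two check IDs are invented for the demo.

```python
from dataclasses import dataclass

# Reduced copy of the prefix table, just enough for the demo.
DEMO_PREFIXES = {
    "FIN": {"financial_services", "bank"},
    "MED": {"healthcare", "medical"},
}


@dataclass
class DemoCheck:
    """Hypothetical stand-in for an MC result; only `id` matters here."""
    id: str


def demo_out_of_scope(check: DemoCheck, scope: set[str]) -> bool:
    parts = check.id.split("-")
    is_mc = len(parts) >= 2 and parts[0].lower() == "mc"
    prefix = parts[1].upper() if is_mc else ""
    required = DEMO_PREFIXES.get(prefix)
    if required is None:  # not a sector-specific check: always keep
        return False
    # Out of scope when no scope token matches the sector's tokens.
    return not ({s.lower() for s in scope} & required)


checks = [
    DemoCheck("mc-FIN-814-A03"),
    DemoCheck("mc-MED-101-A01"),  # invented ID
    DemoCheck("mc-GEN-001-A01"),  # invented ID, no sector prefix
]
kept_energy = [c.id for c in checks if not demo_out_of_scope(c, {"energy"})]
kept_bank = [c.id for c in checks if not demo_out_of_scope(c, {"Bank"})]
```

# For an energy company only the non-sector check survives; for a bank the
# FIN check is kept as well, because scope tokens are compared lowercased.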
@@ -0,0 +1,200 @@
"""Mail-V2 style system: single source of truth for all visual props.

Email rendering = inline styles only (many clients strip <style> tags
or sandbox them). Table-based layouts because flex/grid is unreliable
in Outlook. Font stack = email-safe (no web fonts).

Public helpers:
  - `section(title, body_html, *, sev=None, anchor=None)` →
    standardized full-width card with optional severity stripe + TOC
    anchor
  - `card(body_html, *, sev=None)` → smaller card inside a section
  - `kpi(label, value, sub=None, sev=None)` → single KPI tile (used
    in 4-column header grid)
  - `kpi_row(items)` → evenly-sized row of KPIs
  - `chip(text, sev)` → inline pill for severity / status
  - `table(headers, rows, *, sev_col=None)` → consistent zebra table

`sev` is one of: "pass" | "fail" | "warn" | "info" | None (neutral)

None of the helpers escape their arguments; callers must pass
HTML-safe strings.
"""
from __future__ import annotations

# ── Color palette ─────────────────────────────────────────────────
PAGE_BG = "#f8fafc"
CARD_BG = "#ffffff"
BORDER = "#e2e8f0"
TEXT = "#1e293b"
TEXT_MUTED = "#64748b"
HEADER_BG = "#0f172a"
HEADER_FG = "#f8fafc"
SEV = {
    "pass": {"bg": "#dcfce7", "fg": "#15803d", "stripe": "#16a34a"},
    "fail": {"bg": "#fee2e2", "fg": "#991b1b", "stripe": "#dc2626"},
    "warn": {"bg": "#fef3c7", "fg": "#92400e", "stripe": "#f59e0b"},
    "info": {"bg": "#dbeafe", "fg": "#1e40af", "stripe": "#3b82f6"},
}
NEUTRAL_STRIPE = "#cbd5e1"

# ── Typography ────────────────────────────────────────────────────
FONT = ("-apple-system,BlinkMacSystemFont,'Segoe UI',Roboto,Oxygen,"
        "Ubuntu,sans-serif")
SZ_TITLE = "24px"
SZ_H2 = "18px"
SZ_H3 = "15px"
SZ_BODY = "14px"
SZ_SMALL = "12px"

# ── Layout ────────────────────────────────────────────────────────
MAX_W = "720px"
PAD_SECTION = "20px"
PAD_CARD = "14px"
RADIUS = "6px"


def _stripe(sev: str | None) -> str:
    return SEV[sev]["stripe"] if sev in SEV else NEUTRAL_STRIPE


def section(title: str, body_html: str, *,
            sev: str | None = None, anchor: str | None = None) -> str:
    """Top-level audit card; every report section uses this shell."""
    stripe = _stripe(sev)
    a = f'<a id="{anchor}"></a>' if anchor else ""
    return (
        f'{a}<table role="presentation" cellpadding="0" cellspacing="0" '
        f'border="0" width="100%" style="margin:24px 0;border-collapse:'
        f'separate;border-spacing:0;">'
        f'<tr><td style="background:{CARD_BG};border:1px solid {BORDER};'
        f'border-left:4px solid {stripe};border-radius:{RADIUS};'
        f'padding:{PAD_SECTION};">'
        f'<h2 style="margin:0 0 12px;font-family:{FONT};font-size:{SZ_H2};'
        f'color:{TEXT};font-weight:600;">{title}</h2>'
        f'<div style="font-family:{FONT};font-size:{SZ_BODY};color:{TEXT};'
        f'line-height:1.5;">{body_html}</div>'
        f'</td></tr></table>'
    )


def card(body_html: str, *, sev: str | None = None) -> str:
    """Sub-card inside a section."""
    stripe = _stripe(sev)
    return (
        f'<table role="presentation" cellpadding="0" cellspacing="0" '
        f'border="0" width="100%" style="margin:8px 0;border-collapse:'
        f'separate;border-spacing:0;">'
        f'<tr><td style="background:{CARD_BG};border:1px solid {BORDER};'
        f'border-left:3px solid {stripe};border-radius:{RADIUS};'
        f'padding:{PAD_CARD};font-family:{FONT};font-size:{SZ_BODY};'
        f'color:{TEXT};">'
        f'{body_html}'
        f'</td></tr></table>'
    )


def kpi(label: str, value: str, sub: str | None = None,
        sev: str | None = None) -> str:
    """One KPI tile. Used 4-in-a-row in the header."""
    value_color = SEV[sev]["fg"] if sev in SEV else TEXT
    sub_html = (
        f'<div style="font-family:{FONT};font-size:{SZ_SMALL};'
        f'color:{TEXT_MUTED};margin-top:2px;">{sub}</div>'
        if sub else ""
    )
    return (
        f'<td style="background:{CARD_BG};border:1px solid {BORDER};'
        f'border-radius:{RADIUS};padding:14px;text-align:center;'
        f'width:25%;vertical-align:top;">'
        f'<div style="font-family:{FONT};font-size:{SZ_SMALL};'
        f'color:{TEXT_MUTED};text-transform:uppercase;letter-spacing:.5px;">'
        f'{label}</div>'
        f'<div style="font-family:{FONT};font-size:26px;color:{value_color};'
        f'font-weight:700;margin-top:6px;">{value}</div>'
        f'{sub_html}'
        f'</td>'
    )


def kpi_row(items: list[dict]) -> str:
    """Render a row of 2-4 KPI tiles, equally sized."""
    # Render each tile once, then join the tiles with 8px spacer cells.
    cells = [
        kpi(it["label"], it["value"], it.get("sub"), it.get("sev"))
        for it in items
    ]
    interleaved = '<td style="width:8px;"></td>'.join(cells)
    return (
        f'<table role="presentation" cellpadding="0" cellspacing="0" '
        f'border="0" width="100%" style="margin:12px 0;border-collapse:'
        f'separate;border-spacing:0;"><tr>{interleaved}</tr></table>'
    )


def chip(text: str, sev: str | None = None) -> str:
    """Inline pill for severity / status."""
    pal = SEV.get(sev or "", {"bg": "#f1f5f9", "fg": TEXT_MUTED})
    return (
        f'<span style="display:inline-block;background:{pal["bg"]};'
        f'color:{pal["fg"]};font-family:{FONT};font-size:11px;font-weight:600;'
        f'padding:2px 8px;border-radius:999px;'
        f'text-transform:uppercase;letter-spacing:.3px;">{text}</span>'
    )


def table(headers: list[str], rows: list[list[str]], *,
          sev_col: int | None = None) -> str:
    """Render a consistent zebra table.

    `sev_col` marks the column that already contains a chip(). It is
    currently unused: no cell is escaped here, so every header and cell
    must already be HTML-safe.
    """
    th = "".join(
        f'<th style="text-align:left;padding:8px 10px;font-family:{FONT};'
        f'font-size:{SZ_SMALL};color:{TEXT_MUTED};text-transform:uppercase;'
        f'letter-spacing:.5px;border-bottom:1px solid {BORDER};">{h}</th>'
        for h in headers
    )
    body_rows = []
    for i, r in enumerate(rows):
        # Alternate row backgrounds for the zebra effect.
        bg = "#ffffff" if i % 2 == 0 else "#f8fafc"
        cells = "".join(
            f'<td style="padding:8px 10px;font-family:{FONT};font-size:13px;'
            f'color:{TEXT};border-bottom:1px solid {BORDER};vertical-align:top;">'
            f'{c}</td>' for c in r
        )
        body_rows.append(f'<tr style="background:{bg};">{cells}</tr>')
    body = "".join(body_rows)
    return (
        f'<table role="presentation" cellpadding="0" cellspacing="0" '
        f'border="0" width="100%" style="border-collapse:collapse;'
        f'margin:8px 0;background:{CARD_BG};border:1px solid {BORDER};'
        f'border-radius:{RADIUS};overflow:hidden;">'
        f'<thead><tr>{th}</tr></thead><tbody>{body}</tbody></table>'
    )


def page_open(site_name: str) -> str:
    """Open the page wrapper; `site_name` is currently unused."""
    return (
        f'<div style="background:{PAGE_BG};padding:24px 16px;font-family:{FONT};">'
        f'<div style="max-width:{MAX_W};margin:0 auto;">'
    )


def page_close(check_id: str, build_sha: str) -> str:
    """Render the footer and close both divs opened by page_open()."""
    return (
        f'<div style="margin-top:32px;padding:16px;font-family:{FONT};'
        f'font-size:11px;color:{TEXT_MUTED};text-align:center;">'
        f'BreakPilot Compliance · check_id <code>{check_id}</code> · '
        f'build <code>{build_sha}</code>'
        f'</div>'
        f'</div></div>'
    )
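

# Illustrative sketch, not part of this commit: the helpers above
# interpolate their arguments into HTML verbatim, so text from crawled
# pages (cookie names, vendor names) would need escaping by the caller.
# Assuming a hypothetical helper `safe_text` built on the standard
# library's `html.escape`, and a simplified stand-in `demo_chip` in place
# of the real chip(), the difference looks like this.

```python
from html import escape


def safe_text(value: object) -> str:
    """Hypothetical helper: HTML-escape a value before interpolation."""
    return escape(str(value), quote=True)


def demo_chip(text: str) -> str:
    # Simplified stand-in for chip(); the real helper adds more styles.
    return f'<span style="font-size:11px;">{text}</span>'


raw = 'Vendor <script>alert("x")</script>'
unsafe_html = demo_chip(raw)             # markup from `raw` passes through
safe_html = demo_chip(safe_text(raw))    # markup is rendered as plain text
```

# Values that are already HTML (for example a cell holding a chip) must be
# passed through unescaped, otherwise their markup would show up as text.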