diff --git a/backend-compliance/compliance/api/agent_check/_b3_wiring.py b/backend-compliance/compliance/api/agent_check/_b3_wiring.py index 8f6e1a9d..e866990a 100644 --- a/backend-compliance/compliance/api/agent_check/_b3_wiring.py +++ b/backend-compliance/compliance/api/agent_check/_b3_wiring.py @@ -20,6 +20,7 @@ import time from compliance.services.retention_comparator import ( build_retention_theme_summary, compare_retention, + detect_intra_doc_contradictions, extract_retention_claims, ) @@ -54,6 +55,11 @@ def run_b3(state: dict) -> None: if not dsi_text: return + # Intra-doc contradictions are independent of cmp_vendors — run + # them first so they survive the early-return below. + intra = detect_intra_doc_contradictions(dsi_text) + state["retention_intra_doc"] = intra + cookie_records: list[dict] = [] cookie_names: list[str] = [] vendor_names: list[str] = [] diff --git a/backend-compliance/compliance/api/agent_check/_b4_wiring.py b/backend-compliance/compliance/api/agent_check/_b4_wiring.py new file mode 100644 index 00000000..d551c041 --- /dev/null +++ b/backend-compliance/compliance/api/agent_check/_b4_wiring.py @@ -0,0 +1,78 @@ +"""B4 wiring — Cross-Doc Vendor-Consistency check + HTML block. + +Activated after B1+B3 in the orchestrator. The check itself is +deterministic (no LLM); it scans DSE + cookie texts for known +service providers per service type and flags every mismatch. + +The mail renderer reads `state["vendor_consistency_findings"]` and +`state["vendor_consistency_html"]` directly — no further wiring. 
+""" + +from __future__ import annotations + +import html +import logging + +from compliance.services.vendor_consistency_check import ( + check_vendor_consistency, +) + +logger = logging.getLogger(__name__) + + +def run_b4(state: dict) -> None: + findings = check_vendor_consistency(state) + state["vendor_consistency_findings"] = findings + if not findings: + return + state["vendor_consistency_html"] = _render(findings) + logger.info( + "B4 Vendor-Consistency: %d findings (HIGH=%d, MEDIUM=%d)", + len(findings), + sum(1 for f in findings if (f.get("severity") or "") == "HIGH"), + sum(1 for f in findings if (f.get("severity") or "") == "MEDIUM"), + ) + + +def _render(findings: list[dict]) -> str: + rows = [] + for f in findings: + sev = (f.get("severity") or "").upper() + color = "#dc2626" if sev == "HIGH" else "#f59e0b" + dse = ", ".join(f.get("dse_providers") or []) or "–" + cookie = ", ".join(f.get("cookie_providers") or []) or "–" + rows.append( + "
" + f"{len(findings)} Provider-Widersprüche zwischen " + "Datenschutzerklärung und Cookie-Seite. Beispiel Elli: " + "DSE = Vertex AI für Chatbot, Cookies-Seite = Iadvize.
" + "| Service-Typ | " + "In DSE | " + "Auf Cookies-Seite | " + "Severity | " + "
|---|
' + f'{h(err)}
') + else: + counts = ( + f'' + f'… und {len(rows) - 50} weitere
' + ) + sev = "fail" if summary["undoc"] else "warn" if summary["orph"] else "pass" + return card(head + body, sev=sev) + + +def render_sofortmassnahmen(state: dict) -> str: + """Aggregated bulk-recommendations: '1 Aktion fixt N Items'.""" + groups = group_by_action(state) + if not groups: + return "" + rows = [] + for g in groups: + items = g["items"] + sample = ", ".join(items[:5]) + more = f" + {len(items) - 5} weitere" if len(items) > 5 else "" + eff_sev = ("pass" if g["effort"] == "low" + else "warn" if g["effort"] == "med" else "fail") + rows.append([ + f'{g.get("icon") or "•"} {h(g["label"])}' + f'' + 'Eine Aktion behebt mehrere Findings auf einmal — nach Aufwand sortiert.' + '
' + body, + sev="warn", + anchor="sofortmassnahmen", + ) + + +def render_theme_retention(state: dict) -> str: + s = state.get("retention_theme_summary") or {} + findings = state.get("retention_findings") or [] + if not s.get("total"): + return "" + head = ( + f'{h(f.get("cookie_name") or "—")}',
+ h(f.get("vendor_name") or "—"),
+ h(f.get("mismatch_type") or ""),
+ chip(sev, sev_key),
+ ])
+ body = table(["Cookie", "Vendor", "Mismatch", "Sev"], rows)
+ sev = "fail" if s.get("failed", 0) else "warn"
+ return card(head + body, sev=sev)
+
+
+def render_theme_reachability(state: dict) -> str:
+ f = state.get("reachability_finding") or {}
+ if not f:
+ return ""
+ passed = f.get("passed")
+ sev_key = "pass" if passed else (
+ "fail" if (f.get("severity") or "").upper() == "HIGH" else "warn")
+ notes_html = "".join(
+ f''
+ f'Beweis-ZIP evidence-{h(state.get("check_id", "")[:8])}.zip '
+ f'mit {n} Slice(s), '
+ f'manifest.json + audit_metadata.json (SHA256 pro Slice).
' + f'Quelle: {h(meta.get("url") or "—")}' + f'
' + ) + return section("📎 7. Anhänge", body, sev="info", anchor="attach") diff --git a/backend-compliance/compliance/services/mail_render_v2/_blocks_findings.py b/backend-compliance/compliance/services/mail_render_v2/_blocks_findings.py new file mode 100644 index 00000000..7e5b485e --- /dev/null +++ b/backend-compliance/compliance/services/mail_render_v2/_blocks_findings.py @@ -0,0 +1,290 @@ +"""Mail-V2 finding-bucket renderers. + +Separates FAIL items into three buckets — the user's design constraint: + + hard_fail public + evidence → 🔴 Kritische Befunde + manual_review public, no evidence → 🔍 Manuelle Prüfung + internal_reminder internal process → 💼 Reminder (NEVER a fail) + +The MC-DB stays as-is. If the LLM-Plausibility phase has already run +it stamps `c.llm_title` / `c.llm_recommendation` / `c.llm_severity` +onto each check; the renderer picks those up when present, otherwise +falls back to the original MC label verbatim. No question-form +rewriting here — that's the LLM-phase's job. 
+""" + +from __future__ import annotations + +from html import escape as h + +from ._actions import action_for_finding +from ._label_norm import classify_check +from ._scope_filter import ( + filter_out_of_scope, + get_last_drop_stats, +) +from ._style import ( + SZ_SMALL, + TEXT, + TEXT_MUTED, + card, + chip, + section, +) + + +def _strip_qmark(s: str) -> str: + """Normalise a string for dedup comparison.""" + return (s or "").strip().rstrip("?").strip().lower() + + +def _is_dup(a: str, b: str) -> bool: + """True when a and b carry essentially the same content.""" + aa = _strip_qmark(a) + bb = _strip_qmark(b) + if not aa or not bb: + return False + if aa == bb: + return True + short, long = sorted((aa, bb), key=len) + return short and short in long and len(short) > 30 + + +def _collect_three_buckets(state: dict) -> tuple[list[dict], list[dict], + list[dict]]: + """Split all FAIL items into the three buckets.""" + hard: list[dict] = [] + manual: list[dict] = [] + internal: list[dict] = [] + + business_scope = state.get("business_scope") or set() + for r in state.get("results") or []: + # Drop sector-specific MCs that don't apply to this business + scoped = filter_out_of_scope( + getattr(r, "checks", []) or [], business_scope, + ) + for c in scoped: + sev = (getattr(c, "severity", "") or "").upper() + # LLM-plausibility may downgrade — read llm_severity if set + llm_sev = (getattr(c, "llm_severity", "") or "").upper() + effective_sev = llm_sev or sev + if effective_sev not in ("CRITICAL", "HIGH", "MEDIUM"): + continue + if getattr(c, "passed", True) or getattr(c, "skipped", False): + continue + # LLM may flag a finding as not plausible → drop + if getattr(c, "llm_drop", False): + continue + bucket = classify_check(c) + raw_label = getattr(c, "label", "") + llm_title = getattr(c, "llm_title", "") or "" + llm_recommendation = getattr(c, "llm_recommendation", "") or "" + title = (llm_title or raw_label)[:200] + hint = (getattr(c, "hint", "") or "")[:500] + matched = 
(getattr(c, "matched_text", "") or "")[:400] + action = action_for_finding( + getattr(c, "id", ""), effective_sev, raw_label, hint, + ) + entry = { + "sev": effective_sev, + "id": getattr(c, "id", ""), + "title": title, + "raw_label": raw_label, + "hint": hint, + "matched": matched, + "llm_recommendation": llm_recommendation, + "doc": getattr(r, "label", ""), + "reg": getattr(c, "regulation", "") or "", + "action": action.to_dict() if action else None, + } + if bucket == "hard_fail" and effective_sev in ("CRITICAL", "HIGH"): + hard.append(entry) + elif bucket == "internal_reminder": + internal.append(entry) + else: + manual.append(entry) + + # B1 reachability (always hard if HIGH — directly observed) + rb1 = state.get("reachability_finding") or {} + if (rb1.get("severity") or "").upper() == "HIGH" and not rb1.get("passed"): + notes = " · ".join(rb1.get("notes") or []) + hard.append({ + "sev": "HIGH", + "id": rb1.get("check_id", "COOKIE-CONSENT-UX-001"), + "title": "Mobile Consent-Reachability — kein Reopen-Link im Footer", + "raw_label": "Mobile Consent-Reachability", + "hint": notes, + "matched": "Footer-Scan: 0 Reopen-Anchor", + "llm_recommendation": "", + "doc": "Website-Footer", + "reg": "DSGVO Art. 7 Abs. 
3", + "action": {"title": "Cookie-Einstellungen-Link im Footer ergänzen", + "target": "Website-Footer (alle Seiten)", + "detail": ("Footer-Link 'Cookie-Einstellungen' " + "ergänzen, der den CMP direkt öffnet."), + "effort": "low"}, + }) + + # B3 retention HIGH/MED fails (3-source evidence) + for f in (state.get("retention_findings") or []): + sev = (f.get("severity") or "").upper() + if sev not in ("HIGH", "MEDIUM") or f.get("matches"): + continue + cookie = f.get("cookie_name") or "—" + hard.append({ + "sev": sev, + "id": "TH-RETENTION", + "title": f"Speicherdauer-Konflikt für {cookie}", + "raw_label": "Cookie-Speicherdauer-Konsistenz", + "hint": (f"DSI {f.get('dsi_days')}d · Tabelle " + f"{f.get('table_days')}d · " + f"Realität {f.get('actual_days')}d"), + "matched": (f.get("dsi_sentence") or "")[:200], + "llm_recommendation": "", + "doc": "Cookie-Richtlinie", + "reg": "DSGVO Art. 13 Abs. 2 lit.a", + "action": {"title": ("DSE / Cookie-Tabelle korrigieren " + if "dsi" in (f.get("mismatch_type") or "") + else "Cookie-Lifetime reduzieren"), + "target": "DSE + Cookie-Tabelle", + "detail": f"Mismatch-Typ: {f.get('mismatch_type')}", + "effort": "low"}, + }) + + sev_rank = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2} + hard.sort(key=lambda x: (sev_rank.get(x["sev"], 9), x["title"])) + return hard, manual, internal + + +def count_critical(state: dict) -> int: + hard, _, _ = _collect_three_buckets(state) + return len(hard) + + +def count_manual(state: dict) -> int: + _, manual, _ = _collect_three_buckets(state) + return len(manual) + + +def count_internal(state: dict) -> int: + _, _, internal = _collect_three_buckets(state) + return len(internal) + + +def _render_finding_card(it: dict, *, sev_key: str = "fail") -> str: + head = ( + f'{chip(it["sev"], sev_key)}' + f'{h(it["title"])}' + ) + meta = ( + f'' + 'Keine HIGH/CRITICAL-Befunde mit harter Evidenz im aktuellen Lauf.' + '
' + ) + return section("✅ 1. Kritische Befunde", body, sev="pass", + anchor="critical") + cards = [_render_finding_card(it, sev_key="fail") for it in hard] + intro = ('' + 'Findings mit direkt beobachtbarer Evidenz (öffentliche Daten). ' + 'Pro Befund: Was wir geprüft haben · Beobachtung · Was zu tun ist.' + '
') + return section(f"🔴 1. Kritische Befunde ({len(hard)})", + intro + "".join(cards), sev="fail", anchor="critical") + + +def render_manual_review(state: dict) -> str: + _, manual, _ = _collect_three_buckets(state) + drop_stats = get_last_drop_stats() + if not manual: + if drop_stats.get("count"): + note = ('' + f'Keine manuell zu prüfenden Punkte. ' + f'Branchen-spezifische MCs ausgefiltert: ' + f'{drop_stats["count"]} ' + f'({", ".join(f"{k}:{v}" for k,v in drop_stats["by_prefix"].items())})' + '
') + return section("✅ 2. Manuelle Prüfung", note, sev="pass", + anchor="manual") + return "" + cards = [_render_finding_card(it, sev_key="warn") for it in manual] + intro = ('' + 'Diese Punkte sind öffentlich prüfbar, aber unser Audit konnte ' + 'sie nicht eindeutig feststellen — Hinweis: Original-MC-Frage. ' + 'Empfehlung: manuell beim Mandanten/DSB klären. ' + 'Die LLM-Plausibilitätsprüfung hilft Frage→Aussage zu wandeln ' + '(siehe 🤖-Block pro Finding falls schon gelaufen).
') + return section(f"🔍 2. Manuelle Prüfung erforderlich ({len(manual)})", + intro + "".join(cards), sev="warn", anchor="manual") + + +def render_internal_reminders(state: dict) -> str: + _, _, internal = _collect_three_buckets(state) + if not internal: + return "" + cards = [_render_finding_card(it, sev_key="info") for it in internal] + intro = ('' + 'Interne Prozesse (TOM, DSFA, AVV, Löschkonzept, Schulungen, ' + 'Incident-Response, VVT) sind von außen nicht prüfbar. ' + 'Dies sind Reminder — kein Befund über die Website. ' + 'Beim Mandanten die Existenz + Aktualität der Dokumente verifizieren.' + '
') + return section(f"💼 3. Interne Prozesse — Reminder ({len(internal)})", + intro + "".join(cards), sev="info", anchor="internal") diff --git a/backend-compliance/compliance/services/mail_render_v2/_compose.py b/backend-compliance/compliance/services/mail_render_v2/_compose.py new file mode 100644 index 00000000..502933bf --- /dev/null +++ b/backend-compliance/compliance/services/mail_render_v2/_compose.py @@ -0,0 +1,64 @@ +"""Mail-V2 compose — single entrypoint that returns the full HTML. + +Call `compose_v2(state)` from the email-dispatch phase when +`MAIL_RENDER_V2=true`. Default remains the legacy compose so we can +A/B in Mailpit. +""" + +from __future__ import annotations + +import os + +from ._blocks import ( + render_attachments, + render_caveats, + render_header, + render_per_doc, + render_per_theme, + render_sofortmassnahmen, + render_toc, +) +from ._blocks_findings import ( + render_critical, + render_internal_reminders, + render_manual_review, +) +from ._legacy_wrappers import render_all_legacy +from ._style import page_close, page_open + + +def compose_v2(state: dict) -> str: + """Build the full audit-mail HTML in the V2 layout.""" + site = state.get("site_name") or "—" + parts = [ + page_open(site), + render_header(state), + render_toc(state), + render_critical(state), + render_manual_review(state), + render_internal_reminders(state), + render_sofortmassnahmen(state), + render_per_doc(state), + render_per_theme(state), + # B4 — Cross-Doc Vendor-Consistency (Elli Vertex↔Iadvize pattern) + state.get("vendor_consistency_html", ""), + # B5 — AI-Act Art. 50 Transparenzpflicht + state.get("ai_act_html", ""), + # B6/B7/B8 — DPO-cross-doc + Doc-Staleness + CMP-fingerprint + state.get("extra_findings_html", ""), + # All legacy build_*_html() wrapped in V2 sections — preserves + # every information block from the old renderer (Exec Summary, + # Banner-Screenshot, VVT, Redundancy, Solutions, Diff, etc.) 
+ render_all_legacy(state), + render_caveats(state), + render_attachments(state), + page_close(state.get("check_id", ""), + os.environ.get("BUILD_SHA", "unknown")), + ] + return "".join(p for p in parts if p) + + +def is_v2_enabled() -> bool: + return os.environ.get("MAIL_RENDER_V2", "false").lower() in ( + "true", "1", "yes", "on", + ) diff --git a/backend-compliance/compliance/services/mail_render_v2/_cookie_inventory.py b/backend-compliance/compliance/services/mail_render_v2/_cookie_inventory.py new file mode 100644 index 00000000..e8e9a17c --- /dev/null +++ b/backend-compliance/compliance/services/mail_render_v2/_cookie_inventory.py @@ -0,0 +1,267 @@ +"""Mail-V2 Cookie-Inventar — single table with per-cookie status + action. + +Merges three sources: + + - declared in DSE / cookie-table (state["cmp_vendors"][i]["cookies"]) + - live in browser (state["banner_result"]["cookies_detailed"]) + - cookie_audit comparison (state["cookie_audit"]: declared/undocumented) + +Status hierarchy per cookie: + + UNDOC — in browser, NOT in declared list HIGH + MISMATCH — declared with different category/duration MED + ORPH — declared, NOT in browser LOW + OK — declared + in browser, values agree PASS + +Per-row fields (each `❌` when not ascertainable): + name, vendor, category, duration, retention_grounds, country, + third_country (bool), processing_company, sources, status, action +""" + +from __future__ import annotations + +from html import escape as h + +from ._style import chip + + +# EU + EWR + CH — no third-country transfer. +EU_EEA_CH = { + "DE", "AT", "BE", "BG", "HR", "CY", "CZ", "DK", "EE", "FI", + "FR", "GR", "HU", "IE", "IT", "LV", "LT", "LU", "MT", "NL", + "PL", "PT", "RO", "SK", "SI", "ES", "SE", + "IS", "LI", "NO", "CH", +} +# Adequacy decisions (limited list — most relevant in cookie context). 
+ADEQUACY = {"US", "UK", "JP", "KR", "IL", "CA", "NZ", "AR", "UY", "AD"} + + +def _norm(s: str | None) -> str: + return (s or "").strip().lower() + + +def _missing(value: str | None) -> bool: + if value is None: + return True + v = str(value).strip() + if not v: + return True + return v.lower() in ("—", "?", "unknown", "n/a", "tbd") + + +def _x_or(value: str | None) -> str: + """Render `❌` when the value is missing, else escape + return.""" + if _missing(value): + return '❌' + return h(str(value)) + + +def _country_third(country: str | None) -> tuple[str, bool, str | None]: + """Return (display, is_third_country, adequacy_tag). + + is_third_country=True when outside EU/EEA/CH. + adequacy_tag e.g. "DPF" or None. + """ + if _missing(country): + return ("", False, None) + code = (country or "").strip().upper() + # accept "Germany" → "DE" via crude mapping for the most common names + name_map = { + "DEUTSCHLAND": "DE", "GERMANY": "DE", "IRELAND": "IE", "IRLAND": "IE", + "USA": "US", "UNITED STATES": "US", + } + code = name_map.get(code, code) + if code in EU_EEA_CH: + return (code, False, None) + tag = "DPF" if code in ADEQUACY else "RISK" + return (code, True, tag) + + +def _src_chip(in_dse: bool, in_table: bool, in_browser: bool, + in_ocr: bool) -> str: + parts: list[str] = [] + if in_dse: + parts.append("DSE") + if in_table: + parts.append("Tabelle") + if in_ocr: + parts.append("OCR") + if in_browser: + parts.append("Browser") + return " · ".join(parts) if parts else "—" + + +def _build_status(declared: bool, in_browser: bool, + cookie_audit_undeclared: set, + cookie_audit_compliant: set, + name_lc: str) -> tuple[str, str]: + if name_lc in cookie_audit_undeclared or (in_browser and not declared): + return "UNDOC", "fail" + if declared and not in_browser: + return "ORPH", "warn" + if declared and in_browser: + return "OK", "pass" + return "—", "info" + + +def build_cookie_inventory(state: dict) -> tuple[list[dict], dict]: + """Build the merged inventory + summary.""" 
+ cmp_vendors = state.get("cmp_vendors") or [] + banner = state.get("banner_result") or {} + cookies_detailed = banner.get("cookies_detailed") or [] + cookie_audit = state.get("cookie_audit") or {} + + # 1) Declared + declared: dict[str, dict] = {} + for v in cmp_vendors: + vname = (v.get("name") or "").strip() + vcountry = (v.get("country") or "").strip() + vproc = (v.get("processing_company") or "").strip() + vretention = (v.get("persistence") or "").strip() # vendor-level + src = (v.get("source") or "").lower() + in_dse = "dse" in src or "table_crawled" in src + in_table = ("table" in src or "pasted" in src + or "html_table" in src) + in_ocr = "tesseract" in src or "ocr" in src + for c in (v.get("cookies") or []): + cname = (c.get("name") or "").strip() + if not cname: + continue + key = _norm(cname) + entry = declared.setdefault(key, { + "name": cname, + "vendor": vname, + "category": "", + "duration": "", + "retention_grounds": "", + "country": vcountry, + "processing_company": vproc, + "in_dse": False, + "in_table": False, + "in_ocr": False, + }) + entry["category"] = (entry["category"] + or (c.get("category") or "").strip()) + entry["duration"] = (entry["duration"] + or (c.get("duration") + or c.get("persistence") or "").strip()) + # cookie-level overrides if richer + if not entry["country"] and vcountry: + entry["country"] = vcountry + if not entry["processing_company"] and vproc: + entry["processing_company"] = vproc + if not entry["retention_grounds"] and vretention: + entry["retention_grounds"] = vretention + entry["in_dse"] = entry["in_dse"] or in_dse + entry["in_table"] = entry["in_table"] or in_table + entry["in_ocr"] = entry["in_ocr"] or in_ocr + + # 2) Browser + browser: dict[str, dict] = {} + for c in cookies_detailed: + cname = (c.get("name") or "").strip() + if not cname: + continue + browser[_norm(cname)] = c + + # 3) cookie_audit hints + undeclared_set: set = { + _norm((c.get("name") if isinstance(c, dict) else c) or "") + for c in 
(cookie_audit.get("undeclared_in_browser") or []) + } + compliant_set: set = { + _norm((c.get("name") if isinstance(c, dict) else c) or "") + for c in (cookie_audit.get("compliant") or []) + } + + all_keys = set(declared.keys()) | set(browser.keys()) + rows: list[dict] = [] + for key in sorted(all_keys): + d = declared.get(key) or {} + b = browser.get(key) or {} + name = d.get("name") or b.get("name") or key + vendor = (d.get("vendor") + or b.get("domain") or "").strip() or "" + country = d.get("country", "") + country_display, is_third, adq = _country_third(country) + in_browser = key in browser + is_declared = key in declared + status, sev = _build_status( + is_declared, in_browser, undeclared_set, compliant_set, key, + ) + sources = _src_chip( + d.get("in_dse", False), + d.get("in_table", False), + in_browser, + d.get("in_ocr", False), + ) + rows.append({ + "name": name, + "vendor": vendor, + "category": d.get("category", ""), + "duration": d.get("duration", ""), + "retention_grounds": d.get("retention_grounds", ""), + "country": country_display, + "third_country": is_third, + "third_country_tag": adq, + "processing_company": d.get("processing_company", ""), + "sources": sources, + "status_code": status, + "status_sev": sev, + "declared": is_declared, + "in_browser": in_browser, + }) + + order = {"UNDOC": 0, "MISMATCH": 1, "ORPH": 2, "OK": 3, "—": 4} + rows.sort(key=lambda r: (order.get(r["status_code"], 9), + r["name"].lower())) + + summary = { + "total": len(rows), + "ok": sum(1 for r in rows if r["status_code"] == "OK"), + "undoc": sum(1 for r in rows if r["status_code"] == "UNDOC"), + "orph": sum(1 for r in rows if r["status_code"] == "ORPH"), + "mismatch": sum(1 for r in rows if r["status_code"] == "MISMATCH"), + "declared": sum(1 for r in rows if r["declared"]), + "in_browser": sum(1 for r in rows if r["in_browser"]), + "third_country": sum(1 for r in rows if r["third_country"]), + "missing_country": sum(1 for r in rows if _missing(r["country"])), + 
"missing_duration": sum(1 for r in rows if _missing(r["duration"])), + } + return rows, summary + + +def render_inventory_rows(rows: list[dict]) -> list[list[str]]: + """Cell-rows for `_style.table`. + + Columns: Name | Vendor | Kat | Speicherdauer | Löschfrist | + Sitzland | Verantwortlich | Quelle | Status + """ + out: list[list[str]] = [] + for r in rows: + country_html = _x_or(r["country"]) + if r["third_country"]: + tag = r.get("third_country_tag") or "RISK" + tag_color = "#92400e" if tag == "DPF" else "#dc2626" + country_html += ( + f' [{tag}]' + ) + out.append([ + f'{h(r["name"])}',
+ h(r["vendor"]) if r["vendor"] else
+ '❌',
+ _x_or(r["category"]),
+ _x_or(r["duration"]),
+ _x_or(r["retention_grounds"]),
+ country_html,
+ _x_or(r["processing_company"]),
+ h(r["sources"]),
+ chip(r["status_code"], r["status_sev"]),
+ ])
+ return out
+
+
+def inventory_headers() -> list[str]:  # titles, same order as the row cells
+    return ["Name", "Vendor", "Kat.", "Speicherdauer", "Löschfrist",
+            "Sitzland", "Verantwortlich", "Quelle", "Status"]
diff --git a/backend-compliance/compliance/services/mail_render_v2/_label_norm.py b/backend-compliance/compliance/services/mail_render_v2/_label_norm.py
new file mode 100644
index 00000000..c7df7d34
--- /dev/null
+++ b/backend-compliance/compliance/services/mail_render_v2/_label_norm.py
@@ -0,0 +1,113 @@
+"""Mail-V2 label normalizer — turn MC questions into statements.
+
+Historic MC labels read like compliance-officer checklists:
+ "Dokumentiert die Datenschutzinformation alle Datenübermittlungen
+ gemäß Art. 49 Abs. 1 Unterabs. 2 DS-GVO?"
+
+In the audit mail that looks like "we don't know" — unhelpful.
+This module rewrites the label as a statement of WHAT WAS CHECKED
+so the reader gets a topic, not a question:
+ "Drittland-Übermittlungen Art. 49 Abs. 1 Unterabs. 2 DS-GVO"
+
+The transformation is purely textual; the underlying MC stays as is.
+"""
+
+from __future__ import annotations
+
+import re
+
+# Ordered rewrite table of (compiled question pattern, topic template)
+# pairs. label_as_statement() walks it top to bottom and uses the first
+# pattern that matches. Every pattern is case-insensitive and anchored
+# to the whole label, including the trailing "?"; the templates refer
+# to the captured groups via \1 / \2.
+_REWRITES: list[tuple[re.Pattern, str]] = [
+    (re.compile(stem, re.IGNORECASE), topic)
+    for stem, topic in (
+        (r"^Dokumentiert\s+die\s+(.+?)\s+(.+?)\?$", r"\2"),
+        (r"^Werden\s+(.+?)\s+dokumentiert\?$", r"\1 dokumentieren"),
+        (r"^Wird\s+(.+?)\s+benannt\?$", r"\1 benennen"),
+        (r"^Ist\s+(.+?)\s+angegeben\?$", r"\1 angeben"),
+        (r"^Enthält\s+(?:die\s+)?(.+?)\s+(.+?)\?$", r"\2 in \1"),
+        (r"^Sind\s+(.+?)\s+vorhanden\?$", r"\1 prüfen"),
+        (r"^Gibt\s+es\s+(.+?)\?$", r"\1 prüfen"),
+    )
+]
+
+
+def label_as_statement(label: str) -> str:
+    """Turn a question-style MC label into a topic phrase (text only)."""
+    if not label:
+        return label
+    text = label.strip()
+    if text[-1:] != "?":
+        return text
+    for pattern, template in _REWRITES:
+        if pattern.match(text) is None:
+            continue
+        topic = pattern.sub(template, text).strip()
+        # Capitalise the first character only; keep the rest untouched.
+        return topic[:1].upper() + topic[1:] if topic else text
+    # No known stem: cut the leading verb and the question mark instead.
+    rest = re.sub(r"^\s*(Wird|Sind|Ist|Werden|Gibt es|Enthält|Hat)\s+",
+                  "", text, flags=re.IGNORECASE).rstrip("?")
+    return rest[:1].upper() + rest[1:] if rest else text
+
+
+def has_evidence(check) -> bool:
+    """Tell whether a failed MC check has matched text to back it up.
+
+    A FAIL whose `matched_text` is non-blank is treated as a hard fail
+    by classify_check(); a missing, None or whitespace-only value means
+    the FAIL could not be confirmed and belongs in the manual-review
+    bucket rather than in the critical findings.
+    """
+    snippet = getattr(check, "matched_text", None)
+    return bool(snippet and snippet.strip())
+
+
+# Keywords for INTERNAL processes the auditor cannot observe from outside
+# (TOM, DSFA, AVV, training, incident response, risk analysis, deletion
+# concept). Such checks are never findings — they are reminders that the
+# DPO/DSB must verify the document/process exists internally.
+_INTERNAL_KEYWORDS = (
+    "tom", "technisch-organisatorische", "technisch organisatorische",
+    "dsfa", "datenschutz-folgenabschätzung", "datenschutzfolgenabschätzung",
+    "schulung", "training", "awareness",
+    "avv", "auftragsverarbeitungsvertrag", "auftragsverarbeitung",
+    "incident", "vorfall", "meldepflicht intern",
+    "risikoanalyse", "risikobewertung", "risk assessment",
+    "löschkonzept", "löschfristen-konzept",
+    "vvt", "verzeichnis der verarbeitungstätigkeiten",
+    "dsb-bestellung", "dsb bestellung", "verfahrensverzeichnis",
+    "berichtigungskonzept", "betroffenenrechte-prozess", "dsr-prozess",
+)
+# Acronyms (<= 4 chars) must stand alone, optionally with a plural "s";
+# as plain substrings "tom" would also hit "automatisiert" or "custom".
+_INTERNAL_ACRONYM_RE = re.compile(
+    r"(?<![^\W_])(?:%s)s?(?![^\W_])"
+    % "|".join(re.escape(k) for k in _INTERNAL_KEYWORDS if len(k) <= 4))
+
+
+def is_internal_process(check) -> bool:
+    """True if audit_type or label/id/hint mark an internal process."""
+    audit_type = getattr(check, "audit_type", "")
+    if audit_type and audit_type in ("internal", "process", "documentation"):
+        return True
+    hay = " ".join((getattr(check, a, "") or "").lower()
+                   for a in ("label", "id", "hint"))
+    return bool(_INTERNAL_ACRONYM_RE.search(hay)) or any(
+        k in hay for k in _INTERNAL_KEYWORDS if len(k) > 4)
+
+
+def classify_check(check) -> str:
+    """Choose the render bucket for a failed check.
+
+    Returns 'internal_reminder', 'hard_fail' or 'manual_review'; the
+    internal-process test runs first, then the evidence test. Meant for
+    FAIL checks only (passed=False, skipped=False) — the function does
+    not verify that itself.
+    """
+    if is_internal_process(check):
+        return "internal_reminder"
+    return "hard_fail" if has_evidence(check) else "manual_review"
diff --git a/backend-compliance/compliance/services/mail_render_v2/_legacy_wrappers.py b/backend-compliance/compliance/services/mail_render_v2/_legacy_wrappers.py
new file mode 100644
index 00000000..bc1a6b40
--- /dev/null
+++ b/backend-compliance/compliance/services/mail_render_v2/_legacy_wrappers.py
@@ -0,0 +1,446 @@
+"""Mail-V2 legacy wrappers — wrap each existing build_*_html() in V2 shell.
+
+The original step-5 had 24+ render functions, each emitting standalone
+HTML with their own styles. V2 keeps all the information by wrapping
+each output in a consistent V2 `section()` container with stripe +
+palette. The block-level styling normalizes; the inner data tables/
+lists keep their legacy markup so we don't lose detail.
+
+Each wrapper is defensive: missing data, import errors, or empty
+HTML → return "" so the section disappears rather than crashing.
+"""
+
+from __future__ import annotations
+
+import logging
+
+from ._style import section
+
+logger = logging.getLogger(__name__)
+
+
+def _safe_wrap(label: str, anchor: str, html: str,
+               *, sev: str = "info") -> str:
+    """Wrap non-blank legacy HTML in a V2 section, else return an empty str."""
+    has_content = bool(html) and bool(html.strip())
+    return section(label, html, sev=sev, anchor=anchor) if has_content else ""
+
+
+# ── Tier 1 (Sales-critical) ──────────────────────────────────────
+
+def render_executive_summary(state: dict) -> str:
+    """Join GF one-pager (P82) and exec summary (P1) HTML into one section."""
+    parts: list[str] = []
+    req = state.get("req")
+    try:
+        from compliance.services.gf_one_pager import build_gf_one_pager_html
+        html = build_gf_one_pager_html(
+            site_name=state.get("site_name") or "",
+            scorecard=state.get("scorecard") or {},
+            previous_scorecard=state.get("prev_scorecard"),
+            banner_result=state.get("banner_result"),
+            library_mismatch_findings=state.get("mismatches") or [],
+            scan_context=getattr(req, "scan_context", None) if req else None,
+            audit_quality_findings=state.get("audit_quality_findings") or [],
+        )
+        if html and html.strip():
+            parts.append(html)
+    except Exception as e:  # one part failing must not drop the other
+        logger.warning("gf_one_pager wrapper: %s", e)
+    try:
+        from compliance.api.agent_doc_check_exec_summary import (
+            build_exec_summary_html,
+        )
+        html = build_exec_summary_html(
+            scorecard=state.get("scorecard") or {},
+            previous_scorecard=state.get("prev_scorecard"),
+            cmp_vendors=state.get("cmp_vendors") or [],
+            redundancy_report=state.get("redundancy_report"),
+            site_name=state.get("site_name") or "",
+        )
+        if html and html.strip():
+            parts.append(html)
+    except Exception as e:  # same best-effort handling as above
+        logger.warning("exec_summary wrapper: %s", e)
+    return _safe_wrap("💼 Executive Summary", "exec",
+                      "".join(parts), sev="info")
+
+
+def render_banner_screenshot(state: dict) -> str:
+    """Wrap the banner screenshot block (P85) in a V2 section."""
+    try:
+        from compliance.services.banner_screenshot_block import (
+            build_banner_screenshot_html,
+        )
+        html = build_banner_screenshot_html(state.get("banner_result"))
+        return _safe_wrap("📸 Banner-Screenshot", "banner-shot",
+                          html, sev="info")
+    except Exception as e:  # defensive: drop the section instead of failing
+        logger.warning("banner_screenshot wrapper: %s", e)
+        return ""
+
+
+def render_vvt(state: dict) -> str:
+    """Wrap the Art. 30 GDPR records-of-processing (VVT) table section."""
+    try:
+        from compliance.api.agent_doc_check_extras import (
+            build_vvt_table_html,
+        )
+        html = build_vvt_table_html(state.get("cmp_vendors") or [])
+        return _safe_wrap("📋 VVT — Verarbeitungstätigkeiten (Art. 30 DSGVO)",
+                          "vvt", html, sev="info")
+    except Exception as e:  # defensive: drop the section instead of failing
+        logger.warning("vvt wrapper: %s", e)
+        return ""
+
+
+def render_redundancy(state: dict) -> str:
+    """Wrap vendor redundancy, EU alternatives and cost savings (O4)."""
+    try:
+        from compliance.api.agent_doc_check_redundancy import (
+            build_redundancy_html,
+        )
+        html = build_redundancy_html(state.get("redundancy_report"))
+        return _safe_wrap("💰 Optimierungspotenzial (Redundanz / EU-Alt.)",
+                          "redundancy", html, sev="warn")
+    except Exception as e:  # defensive: drop the section instead of failing
+        logger.warning("redundancy wrapper: %s", e)
+        return ""
+
+
+def render_diff(state: dict) -> str:
+    """Wrap the diff against the previous run (P84) in a V2 section."""
+    try:
+        from compliance.services.run_diff import (
+            build_diff_block_html, compute_diff,
+        )
+        from database import SessionLocal
+        db = SessionLocal()
+        try:
+            diff = compute_diff(
+                db, state["check_id"], state.get("domain_for_exec") or "",
+                state.get("banner_result"), state.get("scorecard"),
+            )
+            html = build_diff_block_html(diff) if diff else ""
+        finally:  # always close the DB session, even if the diff fails
+            db.close()
+        return _safe_wrap("📊 Veränderung seit letztem Lauf",
+                          "diff", html, sev="info")
+    except Exception as e:  # also covers a missing state["check_id"]
+        logger.warning("diff wrapper: %s", e)
+        return ""
+
+
+def render_scope_disclaimer(state: dict) -> str:
+ """P62 — Was wir prüfen, was wir nicht prüfen können."""
+ try:
+ from compliance.api.scope_disclaimer import build_scope_disclaimer_html
+ html = build_scope_disclaimer_html()
+ return _safe_wrap("🔍 Prüfumfang & Methodische Hinweise",
+ "scope", html, sev="info")
+ except Exception as e:
+ logger.warning("scope_disclaimer wrapper: %s", e)
+ return ""
+
+
+# ── Tier 2 (Audit-detail) ─────────────────────────────────────────
+
+def render_banner_deep(state: dict) -> str:
+ """Banner-Deep: Phases + Quality-Score + Per-Category-Tracker."""
+ try:
+ from compliance.api.agent_doc_check_banner import (
+ build_banner_deep_html,
+ )
+ html = build_banner_deep_html(state.get("banner_result"))
+ return _safe_wrap("🍪 Banner-Tiefenanalyse (Phasen + Kategorien)",
+ "banner-deep", html, sev="info")
+ except Exception as e:
+ logger.warning("banner_deep wrapper: %s", e)
+ return ""
+
+
+def render_cookie_audit(state: dict) -> str:
+ """Cookie 3-Quellen-Audit (deklariert ↔ Browser ↔ Library)."""
+ try:
+ from compliance.services.cookie_compliance_audit import (
+ build_cookie_audit_block_html,
+ )
+ html = build_cookie_audit_block_html(state.get("cookie_audit") or {})
+ return _safe_wrap("🔬 Cookie-Audit (3-Quellen-Vergleich)",
+ "cookie-audit", html, sev="warn")
+ except Exception as e:
+ logger.warning("cookie_audit wrapper: %s", e)
+ return ""
+
+
+def render_solutions(state: dict) -> str:
+ """P73 — LLM-Lösungsvorschläge pro HIGH-Fail."""
+ try:
+ from compliance.services.mc_solution_generator import (
+ build_solutions_block_html,
+ )
+ html = build_solutions_block_html(state.get("mc_solutions") or [])
+ return _safe_wrap("🎯 LLM-Lösungsvorschläge (P73)",
+ "solutions", html, sev="info")
+ except Exception as e:
+ logger.warning("solutions wrapper: %s", e)
+ return ""
+
+
+def render_cookie_architecture(state: dict) -> str:
+ """P10 — Cookie-Policy-Architecture (BMW-Pattern, layered separation)."""
+ try:
+ from compliance.services.cookie_policy_architecture import (
+ build_architecture_html,
+ )
+ html = build_architecture_html(state.get("cookie_architecture") or {})
+ return _safe_wrap("🏗 Cookie-Policy-Architektur",
+ "cookie-arch", html, sev="info")
+ except Exception as e:
+ logger.warning("cookie_architecture wrapper: %s", e)
+ return ""
+
+
+def render_library_mismatch(state: dict) -> str:
+ """P102 — Cookie-Klassifikations-Pruefung gegen Library."""
+ try:
+ from compliance.services.cookie_library_mismatch import (
+ build_mismatch_block_html,
+ )
+ html = build_mismatch_block_html(state.get("mismatches") or [])
+ return _safe_wrap("⚖️ Cookie-Klassifikation gegen Library (P102)",
+ "lib-mismatch", html, sev="warn")
+ except Exception as e:
+ logger.warning("library_mismatch wrapper: %s", e)
+ return ""
+
+
+def render_banner_consistency(state: dict) -> str:
+ """P92/P94 — Banner-Konsistenz / CMP-Health."""
+ try:
+ from compliance.services.banner_consistency_checks import (
+ build_consistency_block_html,
+ )
+ html = build_consistency_block_html(
+ state.get("consistency_findings") or [])
+ return _safe_wrap("🧩 Banner-Konsistenz + CMP-Health",
+ "banner-consistency", html, sev="warn")
+ except Exception as e:
+ logger.warning("banner_consistency wrapper: %s", e)
+ return ""
+
+
+def render_signals(state: dict) -> str:
+ """P35/P77/P78 — Save-Label, Cookies-in-DSE, JC-Klausel."""
+ try:
+ from compliance.services.doc_text_signals import (
+ build_signals_block_html,
+ )
+ html = build_signals_block_html(state.get("signal_findings") or [])
+ return _safe_wrap("🚩 Doc-Text-Signale (P35/P77/P78)",
+ "signals", html, sev="info")
+ except Exception as e:
+ logger.warning("signals wrapper: %s", e)
+ return ""
+
+
+def render_scorecard_regulation(state: dict) -> str:
+ """MC-Scorecard per Regulation (DSGVO/TDDDG/BGB-Split)."""
+ try:
+ from compliance.api.agent_doc_check_scorecard import (
+ build_scorecard_html,
+ )
+ html = build_scorecard_html(
+ state.get("scorecard") or {},
+ previous_scorecard=state.get("prev_scorecard"),
+ )
+ return _safe_wrap("📊 Compliance-Scorecard pro Regulation",
+ "scorecard", html, sev="info")
+ except Exception as e:
+ logger.warning("scorecard wrapper: %s", e)
+ return ""
+
+
+def render_profile_html(state: dict) -> str:
+ """Erkanntes Geschäftsmodell."""
+ try:
+ from compliance.api.agent_doc_check_report import build_profile_html
+ html = build_profile_html(state.get("profile"))
+ return _safe_wrap("🏢 Erkanntes Geschäftsmodell",
+ "profile", html, sev="info")
+ except Exception as e:
+ logger.warning("profile wrapper: %s", e)
+ return ""
+
+
+def render_input_warnings(state: dict) -> str:
+ """Doc-Input-Warnings: User Text in falsches Feld gepastet."""
+ try:
+ from compliance.services.doc_input_warnings import (
+ build_warnings_block_html,
+ )
+ warns = state.get("input_warnings") or []
+ html = build_warnings_block_html(warns) if warns else ""
+ return _safe_wrap("⚠️ Eingabe-Warnungen",
+ "input-warn", html, sev="warn")
+ except Exception as e:
+ logger.warning("input_warnings wrapper: %s", e)
+ return ""
+
+
+# ── Tier 3 (Cookie-deep + advisory) ───────────────────────────────
+
+def render_entropy(state: dict) -> str:
+ """P103 — Cookie-Value-Entropy."""
+ try:
+ from compliance.services.cookie_value_entropy import (
+ build_entropy_block_html,
+ )
+ html = build_entropy_block_html(state.get("entropy_findings") or [])
+ return _safe_wrap("🎲 Cookie-Entropy-Anomalien (P103)",
+ "entropy", html, sev="info")
+ except Exception as e:
+ logger.warning("entropy wrapper: %s", e)
+ return ""
+
+
+def render_network_trace(state: dict) -> str:
+ """P104 — Network-Tracing."""
+ try:
+ from compliance.services.cookie_network_tracer import (
+ build_network_trace_block_html,
+ )
+ html = build_network_trace_block_html(
+ state.get("network_findings") or [])
+ return _safe_wrap("🌐 Network-Tracing (P104)",
+ "network", html, sev="info")
+ except Exception as e:
+ logger.warning("network_trace wrapper: %s", e)
+ return ""
+
+
+def render_tcf_authority(state: dict) -> str:
+ """P105 — IAB TCF Authority Cross-Reference."""
+ try:
+ from compliance.services.tcf_vendor_authority import (
+ build_tcf_authority_block_html,
+ )
+ html = build_tcf_authority_block_html(
+ state.get("tcf_authority_findings") or [])
+ return _safe_wrap("🆔 IAB TCF Vendor Authority (P105)",
+ "tcf-auth", html, sev="info")
+ except Exception as e:
+ logger.warning("tcf_authority wrapper: %s", e)
+ return ""
+
+
+def render_jc_avv(state: dict) -> str:
+ """P71 — JC-vs-AVV Entscheidungsbaum."""
+ try:
+ from compliance.services.jc_avv_decision import (
+ build_jc_avv_decision_html,
+ )
+ html = build_jc_avv_decision_html(
+ (state.get("doc_texts") or {}).get("dse"))
+ return _safe_wrap("⚖️ Joint Controller vs. AVV — Entscheidung (P71)",
+ "jc-avv", html, sev="info")
+ except Exception as e:
+ logger.warning("jc_avv wrapper: %s", e)
+ return ""
+
+
+def render_industry_context(state: dict) -> str:
+ """P6/53/55 — Branchen-Kontext + Site-History."""
+ try:
+ from compliance.services.industry_library import (
+ build_industry_context_block_html,
+ )
+ ind = None
+ req = state.get("req")
+ if req and getattr(req, "scan_context", None):
+ ind = req.scan_context.get("industry")
+ html = build_industry_context_block_html(
+ ind, state.get("site_profile"))
+ return _safe_wrap("🏭 Branchen-Kontext + Historie",
+ "industry", html, sev="info")
+ except Exception as e:
+ logger.warning("industry_context wrapper: %s", e)
+ return ""
+
+
+def render_benchmark(state: dict) -> str:
+ """P86 — Branchen-Benchmark."""
+ try:
+ from compliance.services.industry_benchmark import (
+ build_benchmark_html,
+ )
+ html = build_benchmark_html(state.get("benchmark") or {})
+ return _safe_wrap("📈 Branchen-Benchmark (P86)",
+ "bench", html, sev="info")
+ except Exception as e:
+ logger.warning("benchmark wrapper: %s", e)
+ return ""
+
+
+def render_scanned_urls(state: dict) -> str:
+ """Quellen-Transparenz: welche URLs wurden gecrawlt."""
+ try:
+ from compliance.api.agent_doc_check_report import (
+ build_scanned_urls_html,
+ )
+ html = build_scanned_urls_html(state.get("doc_entries") or [])
+ return _safe_wrap("🔗 Geprüfte URLs (Quellen-Transparenz)",
+ "scanned-urls", html, sev="info")
+ except Exception as e:
+ logger.warning("scanned_urls wrapper: %s", e)
+ return ""
+
+
+def render_management_summary(state: dict) -> str:
+ """Konkrete Aufgaben für die Geschäftsführung."""
+ try:
+ from compliance.api.agent_doc_check_report import (
+ build_management_summary,
+ )
+ html = build_management_summary(state.get("results") or [])
+ return _safe_wrap("📝 Management-Zusammenfassung",
+ "mgmt", html, sev="info")
+ except Exception as e:
+ logger.warning("management_summary wrapper: %s", e)
+ return ""
+
+
+# ── Render the whole legacy block region ────────────────────────
+
+def render_all_legacy(state: dict) -> str:
+ """Render every legacy block in the canonical order."""
+ return "".join([
+ # Tier 1 (Sales)
+ render_executive_summary(state),
+ render_diff(state),
+ render_solutions(state),
+ render_redundancy(state),
+ render_vvt(state),
+ render_banner_screenshot(state),
+ # Tier 2 (Audit-detail)
+ render_scorecard_regulation(state),
+ render_banner_deep(state),
+ render_banner_consistency(state),
+ render_cookie_audit(state),
+ render_cookie_architecture(state),
+ render_library_mismatch(state),
+ render_signals(state),
+ render_profile_html(state),
+ render_input_warnings(state),
+ # Tier 3 (advisory)
+ render_entropy(state),
+ render_network_trace(state),
+ render_tcf_authority(state),
+ render_jc_avv(state),
+ render_industry_context(state),
+ render_benchmark(state),
+ render_scanned_urls(state),
+ render_management_summary(state),
+ # Scope-Disclaimer last — footer-ish
+ render_scope_disclaimer(state),
+ ])
diff --git a/backend-compliance/compliance/services/mail_render_v2/_scope_filter.py b/backend-compliance/compliance/services/mail_render_v2/_scope_filter.py
new file mode 100644
index 00000000..80f64ab2
--- /dev/null
+++ b/backend-compliance/compliance/services/mail_render_v2/_scope_filter.py
@@ -0,0 +1,88 @@
+"""Mail-V2 scope filter — drop MC findings that don't apply.
+
+Some MC-DB entries are sector-specific (FIN = financial services, GOV =
+public authority, MED = healthcare, INS = insurance, EDU = education,
+LEG = legal profession, REL = religion, POL = political party). They must
+not surface for a normal B2C company like Elli (energy/EV charging).
+
+This filter inspects the MC ID prefix and, when the prefix denotes
+a sector that doesn't match the detected `business_scope`, drops the
+check from the V2 finding renderers.
+
+The MC pipeline itself is unchanged — MCs are still evaluated; we
+just suppress them in the report when out of scope. Set
+`KEEP_OOS_MCS=true` in the env to disable the filter (useful for
+DSB debug runs).
+"""
+
+from __future__ import annotations
+
+import os
+
+# Sector prefix -> tokens; a check is KEPT if business_scope has any of them.
+SECTOR_PREFIXES: dict[str, set[str]] = {
+    "FIN": {"financial_services", "bank", "bafin", "fintech",
+            "payment_provider"},
+    "GOV": {"public_authority", "government", "behoerde"},
+    "MED": {"healthcare", "medical", "pharma", "klinik"},
+    "INS": {"insurance", "versicherung"},
+    "EDU": {"education", "schule", "hochschule", "university"},
+    "LEG": {"legal_profession", "anwaltskammer", "kanzlei"},
+    "REL": {"church", "religion", "religious"},
+    "POL": {"political_party", "partei"},
+}
+
+# Drop stats for the renderer; NOTE: "by_prefix" holds a dict, not an int.
+_LAST_DROPPED: dict[str, int] = {"count": 0, "by_prefix": {}}
+
+
+def _enabled() -> bool:
+ return os.environ.get("KEEP_OOS_MCS", "false").lower() not in (
+ "true", "1", "yes", "on",
+ )
+
+
+def _extract_prefix(check_id: str) -> str | None:
+ """Return the sector prefix (e.g. 'FIN') from mc-FIN-814-A03."""
+ if not check_id:
+ return None
+ parts = check_id.split("-")
+ # mc-XXX-NNN-AYY → parts = ["mc", "XXX", "NNN", "AYY"]
+ if len(parts) >= 2 and parts[0].lower() == "mc":
+ prefix = parts[1].upper()
+ if prefix in SECTOR_PREFIXES:
+ return prefix
+ return None
+
+
+def is_out_of_scope(check, business_scope: set[str] | None) -> bool:
+ """Decide whether the check is sector-specific AND out of scope."""
+ if not _enabled():
+ return False
+ prefix = _extract_prefix(getattr(check, "id", "") or "")
+ if not prefix:
+ return False
+ required = SECTOR_PREFIXES.get(prefix) or set()
+ scope_lc = {s.lower() for s in (business_scope or set())}
+ return not (scope_lc & required)
+
+
+def filter_out_of_scope(checks, business_scope: set[str] | None) -> list:
+ """Return `checks` with out-of-scope items removed; mutates counter."""
+ _LAST_DROPPED["count"] = 0
+ _LAST_DROPPED["by_prefix"] = {}
+ out = []
+ for c in checks:
+ if is_out_of_scope(c, business_scope):
+ _LAST_DROPPED["count"] += 1
+ prefix = _extract_prefix(getattr(c, "id", "") or "") or "?"
+ _LAST_DROPPED["by_prefix"][prefix] = (
+ _LAST_DROPPED["by_prefix"].get(prefix, 0) + 1
+ )
+ continue
+ out.append(c)
+ return out
+
+
+def get_last_drop_stats() -> dict:
+ return dict(_LAST_DROPPED)
diff --git a/backend-compliance/compliance/services/mail_render_v2/_style.py b/backend-compliance/compliance/services/mail_render_v2/_style.py
new file mode 100644
index 00000000..e0011896
--- /dev/null
+++ b/backend-compliance/compliance/services/mail_render_v2/_style.py
@@ -0,0 +1,200 @@
+"""Mail-V2 style system — single source of truth for all visual props.
+
+Email rendering = inline styles only (most clients strip