From d0e3621192b0cb78a1e869453fd967c5424d70ac Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Sat, 6 Jun 2026 21:19:49 +0200 Subject: [PATCH] feat(audit): V2 mail render + 5 new findings (B4/B5/B6/B7/B8) + LLM-Plausibility-Phase MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mail Render V2 (compliance/services/mail_render_v2/) — 11-Modul-Subpackage, das einen einheitlichen Audit-Mail-Output erzeugt mit: - Header + KPI-Kacheln (Score / Findings / Docs / Vendors) - TOC + Sprung-Links - 3-Bucket-Trennung: Kritische Befunde / Manuelle Prüfung / Interne Reminder - Cookie-Inventar (Name·Vendor·Kategorie·Speicherdauer·Löschfrist·Sitzland·Quelle·Status) - Sofortmaßnahmen-Aggregator ("Sitzland ergänzen für 11 Cookies") - 24 Legacy-Wrappers — alle alten build_*_html in V2-Sections - Scope-Filter: FIN/GOV/MED/INS/EDU/LEG aus Berichten wenn nicht relevant - Hint/Action-Dedup: keine doppelten Sätze pro Card mehr Aktiviert via env MAIL_RENDER_V2=true (Default: legacy renderer). 5 neue deterministische Findings (B4/B5/B6/B7/B8), im Orchestrator nach Phase D-2b und B1/B3 ausgeführt: B4 vendor_consistency_check — Cross-Doc-Provider-Widerspruch (Elli: DSE nennt Vertex AI für Chatbot, /de/cookies nennt Iadvize → HIGH). 6 Service-Types: chatbot/analytics/tag_manager/pixel/cdn/cmp. B5 ai_act_transparency_check — AI Act Art. 50 Transparenzpflicht (Elli: Vertex AI vorhanden ohne Pre-Chat-Disclosure → HIGH). Plus B5-Erweiterung: Rechtsgrundlage Art-6-Abs-1-lit-f bei AI → MED (Einwilligung empfehlen). B6 cross_doc_dpo_check — DPO in DSE genannt, nicht im Impressum (LOW). B7 doc_staleness_check — Datum-Extraktion aus DSE/AGB/Nutzungsbedingungen. Cap: AGB/NB 3y, DSE 2y. Älter als Cap → MEDIUM, älter als doppelter Cap → HIGH (Elli NB Stand 2018 → HIGH). B8 cmp_fingerprint_check — Banner detected, aber CMP-Provider generic (kein Usercentrics/OneTrust/Cookiebot/etc → MED). B3-Erweiterung detect_intra_doc_contradictions — Widersprüchliche Speicherdauer im SELBEN Doc (Elli: Logfile 7d vs 30d → HIGH). 
LLM-Plausibility-Phase (Phase D-2b, finding_plausibility_check.py): - Läuft AFTER MC pipeline, BEFORE D3 render - Prompt mit Beispiel-IDs + 3-Phase-Mapping: exact-ID / position-fallback / fuzzy-tail-match - Stempelt llm_title / llm_severity / llm_recommendation / llm_drop auf jeden FAIL CheckItem - V2-Render zeigt "🤖 LLM-Plausibility:" Box pro Finding wenn gestempelt - KNOWN ISSUE: qwen3:30b-a3b liefert oft empty content auf format='json' + 8000-char-excerpt prompts. Pipeline läuft mit stamped=0 weiter. Task #16. Coverage gegen Elli Ground Truth (zeroclaw/docs/ground-truth/elli_eco_2026-06-06.json, 13 expected findings via WebFetch-Agent-Crawl): - 4/4 HIGH-Findings ✓ (COOKIE-CONSENT-UX-001 + WIDERRUFSBELEHRUNG-001 + VENDOR-CONSISTENCY-001 + AI-ACT-TRANSPARENCY-001) - 4/6 MEDIUM ✓ - 2/3 LOW ✓ - Total: 10/13 = 77% (Sprung von 4/13 = 31%) Restliche 3 Gaps als Task #17: IMPRESSUM-001 (multi-entity USt-IdNr), TRANSFER-001 (Vendor-Mechanismus DPF/SCC), TH-RETENTION-002 (AI-Retention pro Datenkategorie). V2-Mail-Preview in Mailpit: 'v2all@local.test' Subject '[V2 ALL] ELLI'. Backend healthy, B1+B3+B4+B5+B6+B7+B8 alle live im Orchestrator. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../compliance/api/agent_check/_b3_wiring.py | 6 + .../compliance/api/agent_check/_b4_wiring.py | 78 +++ .../compliance/api/agent_check/_b5_wiring.py | 81 ++++ .../api/agent_check/_b6b7b8_wiring.py | 97 ++++ .../api/agent_check/_orchestrator.py | 9 + .../agent_check/_phase_d2b_plausibility.py | 41 ++ .../api/agent_check/_phase_d3_blocks_bot.py | 13 + .../services/ai_act_transparency_check.py | 221 +++++++++ .../services/cmp_fingerprint_check.py | 55 +++ .../services/cross_doc_dpo_check.py | 66 +++ .../services/doc_staleness_check.py | 133 ++++++ .../services/finding_plausibility_check.py | 329 +++++++++++++ .../services/mail_render_v2/__init__.py | 16 + .../services/mail_render_v2/_actions.py | 296 ++++++++++++ .../services/mail_render_v2/_aggregator.py | 248 ++++++++++ .../services/mail_render_v2/_blocks.py | 367 ++++++++++++++ .../mail_render_v2/_blocks_findings.py | 290 ++++++++++++ .../services/mail_render_v2/_compose.py | 64 +++ .../mail_render_v2/_cookie_inventory.py | 267 +++++++++++ .../services/mail_render_v2/_label_norm.py | 113 +++++ .../mail_render_v2/_legacy_wrappers.py | 446 ++++++++++++++++++ .../services/mail_render_v2/_scope_filter.py | 88 ++++ .../services/mail_render_v2/_style.py | 200 ++++++++ .../services/retention_comparator.py | 75 ++- .../services/vendor_consistency_check.py | 192 ++++++++ .../tests/test_mail_render_v2.py | 213 +++++++++ .../ground-truth/elli_eco_2026-06-06.json | 425 +++++++++++++++++ 27 files changed, 4426 insertions(+), 3 deletions(-) create mode 100644 backend-compliance/compliance/api/agent_check/_b4_wiring.py create mode 100644 backend-compliance/compliance/api/agent_check/_b5_wiring.py create mode 100644 backend-compliance/compliance/api/agent_check/_b6b7b8_wiring.py create mode 100644 backend-compliance/compliance/api/agent_check/_phase_d2b_plausibility.py create mode 100644 backend-compliance/compliance/services/ai_act_transparency_check.py create mode 100644 
backend-compliance/compliance/services/cmp_fingerprint_check.py create mode 100644 backend-compliance/compliance/services/cross_doc_dpo_check.py create mode 100644 backend-compliance/compliance/services/doc_staleness_check.py create mode 100644 backend-compliance/compliance/services/finding_plausibility_check.py create mode 100644 backend-compliance/compliance/services/mail_render_v2/__init__.py create mode 100644 backend-compliance/compliance/services/mail_render_v2/_actions.py create mode 100644 backend-compliance/compliance/services/mail_render_v2/_aggregator.py create mode 100644 backend-compliance/compliance/services/mail_render_v2/_blocks.py create mode 100644 backend-compliance/compliance/services/mail_render_v2/_blocks_findings.py create mode 100644 backend-compliance/compliance/services/mail_render_v2/_compose.py create mode 100644 backend-compliance/compliance/services/mail_render_v2/_cookie_inventory.py create mode 100644 backend-compliance/compliance/services/mail_render_v2/_label_norm.py create mode 100644 backend-compliance/compliance/services/mail_render_v2/_legacy_wrappers.py create mode 100644 backend-compliance/compliance/services/mail_render_v2/_scope_filter.py create mode 100644 backend-compliance/compliance/services/mail_render_v2/_style.py create mode 100644 backend-compliance/compliance/services/vendor_consistency_check.py create mode 100644 backend-compliance/tests/test_mail_render_v2.py create mode 100644 zeroclaw/docs/ground-truth/elli_eco_2026-06-06.json diff --git a/backend-compliance/compliance/api/agent_check/_b3_wiring.py b/backend-compliance/compliance/api/agent_check/_b3_wiring.py index 8f6e1a9d..e866990a 100644 --- a/backend-compliance/compliance/api/agent_check/_b3_wiring.py +++ b/backend-compliance/compliance/api/agent_check/_b3_wiring.py @@ -20,6 +20,7 @@ import time from compliance.services.retention_comparator import ( build_retention_theme_summary, compare_retention, + detect_intra_doc_contradictions, 
extract_retention_claims, ) @@ -54,6 +55,11 @@ def run_b3(state: dict) -> None: if not dsi_text: return + # Intra-doc contradictions are independent of cmp_vendors — run + # them first so they survive the early-return below. + intra = detect_intra_doc_contradictions(dsi_text) + state["retention_intra_doc"] = intra + cookie_records: list[dict] = [] cookie_names: list[str] = [] vendor_names: list[str] = [] diff --git a/backend-compliance/compliance/api/agent_check/_b4_wiring.py b/backend-compliance/compliance/api/agent_check/_b4_wiring.py new file mode 100644 index 00000000..d551c041 --- /dev/null +++ b/backend-compliance/compliance/api/agent_check/_b4_wiring.py @@ -0,0 +1,78 @@ +"""B4 wiring — Cross-Doc Vendor-Consistency check + HTML block. + +Activated after B1+B3 in the orchestrator. The check itself is +deterministic (no LLM); it scans DSE + cookie texts for known +service providers per service type and flags every mismatch. + +The mail renderer reads `state["vendor_consistency_findings"]` and +`state["vendor_consistency_html"]` directly — no further wiring. 
+""" + +from __future__ import annotations + +import html +import logging + +from compliance.services.vendor_consistency_check import ( + check_vendor_consistency, +) + +logger = logging.getLogger(__name__) + + +def run_b4(state: dict) -> None: + findings = check_vendor_consistency(state) + state["vendor_consistency_findings"] = findings + if not findings: + return + state["vendor_consistency_html"] = _render(findings) + logger.info( + "B4 Vendor-Consistency: %d findings (HIGH=%d, MEDIUM=%d)", + len(findings), + sum(1 for f in findings if (f.get("severity") or "") == "HIGH"), + sum(1 for f in findings if (f.get("severity") or "") == "MEDIUM"), + ) + + +def _render(findings: list[dict]) -> str: + rows = [] + for f in findings: + sev = (f.get("severity") or "").upper() + color = "#dc2626" if sev == "HIGH" else "#f59e0b" + dse = ", ".join(f.get("dse_providers") or []) or "" + cookie = ", ".join(f.get("cookie_providers") or []) or "" + rows.append( + "" + f"" + f"{html.escape((f.get('service_type') or '').replace('_',' ').title())}" + "" + f"" + f"{dse}" + f"" + f"{cookie}" + f"" + f"{sev} {html.escape(f.get('severity_reason') or '')}" + "" + ) + return ( + "
" + "

" + "VENDOR-CONSISTENCY-001 — Vendor-Konsistenz DSE ↔ Cookies

" + "

" + f"{len(findings)} Provider-Widersprüche zwischen " + "Datenschutzerklärung und Cookie-Seite. Beispiel Elli: " + "DSE = Vertex AI für Chatbot, Cookies-Seite = Iadvize.

" + "" + "" + "" + "" + "" + "" + "" + f"{''.join(rows)}" + "
Service-TypIn DSEAuf Cookies-SeiteSeverity
" + "
" + ) diff --git a/backend-compliance/compliance/api/agent_check/_b5_wiring.py b/backend-compliance/compliance/api/agent_check/_b5_wiring.py new file mode 100644 index 00000000..ffe0d557 --- /dev/null +++ b/backend-compliance/compliance/api/agent_check/_b5_wiring.py @@ -0,0 +1,81 @@ +"""B5 wiring — AI-Act Art. 50 Transparenzpflicht-Check + HTML block. + +Runs after B4 (vendor-consistency). Deterministic detection of +AI-Provider mentions + disclosure-phrase mentions. When an AI is +present but no Art-50-disclosure → HIGH finding; when both present +the renderer flags MEDIUM/manual-review because the LIVE pre-chat +UI hint cannot be verified without a consent-tester DOM scan. +""" + +from __future__ import annotations + +import html +import logging + +from compliance.services.ai_act_transparency_check import ( + check_ai_act_transparency, +) + +logger = logging.getLogger(__name__) + + +def run_b5(state: dict) -> None: + findings = check_ai_act_transparency(state) + state["ai_act_findings"] = findings + if not findings: + return + state["ai_act_html"] = _render(findings) + logger.info( + "B5 AI-Act: %d findings (HIGH=%d, MEDIUM=%d)", + len(findings), + sum(1 for f in findings if (f.get("severity") or "") == "HIGH"), + sum(1 for f in findings if (f.get("severity") or "") == "MEDIUM"), + ) + + +def _render(findings: list[dict]) -> str: + cards = [] + for f in findings: + sev = (f.get("severity") or "").upper() + color = "#dc2626" if sev == "HIGH" else "#f59e0b" + vendors_html = "" + if f.get("ai_vendors"): + chips = "".join( + f"{html.escape(v.get('vendor') or '—')}" + for v in f["ai_vendors"] + ) + vendors_html = ( + "
" + f"Erkannte AI-Vendors: {chips}
" + ) + signals_html = ( + f"
" + f"{html.escape(f.get('detected_signals') or '')}
" + ) + cards.append( + f"
" + f"
" + f"{sev} · {html.escape(f.get('check_id') or '')}
" + f"
" + f"{html.escape(f.get('title') or '')}
" + f"
" + f"{html.escape(f.get('norm') or '')}
" + f"{vendors_html}{signals_html}" + f"
" + f"→ Empfehlung: " + f"{html.escape(f.get('action') or '')}
" + "
" + ) + return ( + "
" + "

" + "🤖 AI-Act Art. 50 — Transparenzpflicht KI-Interaktion" + "

" + + "".join(cards) + + "
" + ) diff --git a/backend-compliance/compliance/api/agent_check/_b6b7b8_wiring.py b/backend-compliance/compliance/api/agent_check/_b6b7b8_wiring.py new file mode 100644 index 00000000..2132d394 --- /dev/null +++ b/backend-compliance/compliance/api/agent_check/_b6b7b8_wiring.py @@ -0,0 +1,97 @@ +"""B6 / B7 / B8 wiring — DPO-cross-doc, doc-staleness, CMP-fingerprint. + +Three small, deterministic checks added after B5. Each writes one or +more findings into `state["extra_findings"]` and a tiny HTML block +into `state["extra_findings_html"]` that the V2 renderer concatenates +between B5 (AI-Act) and the legacy section block. +""" + +from __future__ import annotations + +import html +import logging + +from compliance.services.cmp_fingerprint_check import check_cmp_fingerprint +from compliance.services.cross_doc_dpo_check import check_dpo_cross_doc +from compliance.services.doc_staleness_check import check_staleness + +logger = logging.getLogger(__name__) + + +def run_b6b7b8(state: dict) -> None: + findings: list[dict] = [] + + dpo = check_dpo_cross_doc(state) + if dpo: + findings.append(dpo) + + stale = check_staleness(state) + findings.extend(stale) + + cmp = check_cmp_fingerprint(state) + if cmp: + findings.append(cmp) + + state["extra_findings"] = findings + if findings: + state["extra_findings_html"] = _render(findings) + logger.info( + "B6/B7/B8 extra: %d findings (DPO=%d, staleness=%d, CMP=%d)", + len(findings), 1 if dpo else 0, len(stale), 1 if cmp else 0, + ) + + +def _render(findings: list[dict]) -> str: + cards = [] + for f in findings: + sev = (f.get("severity") or "").upper() + color = "#dc2626" if sev == "HIGH" else ( + "#f59e0b" if sev == "MEDIUM" else "#64748b" + ) + evidence_html = "" + if f.get("evidence_dse"): + evidence_html = ( + "
" + f"In DSE: {html.escape(', '.join(f['evidence_dse']))}" + "
" + ) + if f.get("doc_date"): + evidence_html = ( + "
" + f"Stand: {html.escape(f['doc_date'])} " + f"({f.get('age_years','?')} Jahre alt, Cap " + f"{f.get('threshold_years','?')} Jahre)" + "
" + ) + if f.get("detected_provider"): + evidence_html = ( + "
" + f"Erkannter Provider: " + f"{html.escape(f['detected_provider'])}" + "
" + ) + cards.append( + f"
" + f"
" + f"{sev} · {html.escape(f.get('check_id') or '')}
" + f"
" + f"{html.escape(f.get('title') or '')}
" + f"
" + f"{html.escape(f.get('norm') or '')}
" + f"{evidence_html}" + f"
" + f"→ Empfehlung: " + f"{html.escape(f.get('action') or '')}
" + "
" + ) + return ( + "
" + "

" + "📌 Zusätzliche Cross-Doc-Befunde (DPO / Staleness / CMP-Fingerprint)" + "

" + + "".join(cards) + + "
" + ) diff --git a/backend-compliance/compliance/api/agent_check/_orchestrator.py b/backend-compliance/compliance/api/agent_check/_orchestrator.py index 3fcbb4f1..49e3f0af 100644 --- a/backend-compliance/compliance/api/agent_check/_orchestrator.py +++ b/backend-compliance/compliance/api/agent_check/_orchestrator.py @@ -18,12 +18,16 @@ import logging from ._b1_wiring import run_b1 from ._b3_wiring import run_b3 +from ._b4_wiring import run_b4 +from ._b5_wiring import run_b5 +from ._b6b7b8_wiring import run_b6b7b8 from ._constants import _compliance_check_jobs from ._phase_a_resolve import run_phase_a from ._phase_b_profile_check import run_phase_b from ._phase_c_banner import run_phase_c from ._phase_d1_vendors_raw import run_phase_d1 from ._phase_d2_vendors_finalize import run_phase_d2 +from ._phase_d2b_plausibility import run_phase_d2b from ._phase_d3_blocks_bot import run_phase_d3_bot from ._phase_d3_blocks_mid import run_phase_d3_mid from ._phase_d3_blocks_top import run_phase_d3_top @@ -49,11 +53,16 @@ async def run_compliance_check(check_id: str, req) -> None: # Phase D-1/D-2: Step 5 vendor extraction + finalize await run_phase_d1(state) await run_phase_d2(state) + # D-2b: LLM Plausibility Re-Eval — stamps llm_* on all FAIL checks + await run_phase_d2b(state) # B1 + B3: cross-cutting checks that need the finalized vendor # list + DSI text. Render their own HTML blocks consumed by # phase D-3 bot's full_html composition. await run_b1(state) run_b3(state) + run_b4(state) # Cross-doc vendor-consistency (Elli Vertex↔Iadvize) + run_b5(state) # AI-Act Art. 
50 transparency + run_b6b7b8(state) # DPO-cross-doc + Doc-Staleness + CMP-fingerprint # Phase D-3 top/mid/bot: Step 5 HTML blocks await run_phase_d3_top(state) await run_phase_d3_mid(state) diff --git a/backend-compliance/compliance/api/agent_check/_phase_d2b_plausibility.py b/backend-compliance/compliance/api/agent_check/_phase_d2b_plausibility.py new file mode 100644 index 00000000..0b9d4eda --- /dev/null +++ b/backend-compliance/compliance/api/agent_check/_phase_d2b_plausibility.py @@ -0,0 +1,41 @@ +"""Phase D-2b — LLM Plausibility Re-Eval over all MC findings. + +Runs AFTER vendor finalize and BEFORE D3 HTML blocks. Stamps the +`llm_title` / `llm_severity` / `llm_recommendation` / `llm_drop` +fields onto every FAIL CheckItem. The V2 mail renderer reads these +fields automatically — no further wiring needed. + +Opt-out via env var `PLAUSIBILITY_DISABLED=true` (e.g. for CI runs +where the LLM endpoint isn't reachable). +""" + +from __future__ import annotations + +import logging +import os + +from ._helpers import _update + +logger = logging.getLogger(__name__) + + +async def run_phase_d2b(state: dict) -> None: + """Run the plausibility re-eval over state["results"]. 
Mutates checks.""" + if os.environ.get("PLAUSIBILITY_DISABLED", "false").lower() in ( + "true", "1", "yes", + ): + logger.info("plausibility-check disabled by env") + return + check_id = state["check_id"] + results = state.get("results") or [] + doc_texts = state.get("doc_texts") or {} + if not results: + return + _update(check_id, "LLM-Plausibilitäts-Check über alle Findings...", 94) + try: + from compliance.services.finding_plausibility_check import ( + verify_plausibility, + ) + await verify_plausibility(results, doc_texts) + except Exception as e: + logger.warning("plausibility-phase failed (continuing): %s", e) diff --git a/backend-compliance/compliance/api/agent_check/_phase_d3_blocks_bot.py b/backend-compliance/compliance/api/agent_check/_phase_d3_blocks_bot.py index 49f6ac6d..f894f23a 100644 --- a/backend-compliance/compliance/api/agent_check/_phase_d3_blocks_bot.py +++ b/backend-compliance/compliance/api/agent_check/_phase_d3_blocks_bot.py @@ -217,4 +217,17 @@ async def run_phase_d3_bot(state: dict) -> None: ) state["audit_quality_findings"] = audit_quality_findings + + # MAIL_RENDER_V2 — opt-in unified layout. Default keeps the legacy + # composition so we can A/B compare in Mailpit. + try: + from compliance.services.mail_render_v2._compose import ( + compose_v2, is_v2_enabled, + ) + if is_v2_enabled(): + full_html = compose_v2(state) + logger.info("MAIL_RENDER_V2 active: %d bytes", len(full_html)) + except Exception as e: + logger.warning("MAIL_RENDER_V2 fallback to legacy: %s", e) + state["full_html"] = full_html diff --git a/backend-compliance/compliance/services/ai_act_transparency_check.py b/backend-compliance/compliance/services/ai_act_transparency_check.py new file mode 100644 index 00000000..48c4ca6a --- /dev/null +++ b/backend-compliance/compliance/services/ai_act_transparency_check.py @@ -0,0 +1,221 @@ +"""AI-Act Art. 50 Transparenzpflicht-Check (vereinfacht). + +Art. 
50 AI Act verlangt, dass Nutzer beim Interagieren mit einem +KI-System (Chatbot, Sprachassistent etc.) erkennen können, dass sie +mit einer KI sprechen — es sei denn, das ist offensichtlich aus dem +Kontext heraus. + +Der Check ist heuristisch (kein LLM) und prüft drei Schichten: + + 1. AI-Provider-Detection in DSE und Vendor-Liste + (Vertex AI, OpenAI, Anthropic, etc.) + 2. Disclosure-Text-Detection in DSE / Cookie-Doc + ("KI-System", "Sie chatten mit einer KI", "automatisiert", + "Artificial Intelligence", "Konversations-KI", "GPT", …) + 3. Cross-Check: AI-Provider gefunden + keine Disclosure → HIGH + AI-Provider gefunden + Disclosure vorhanden, aber kein "Sie + interagieren mit einer KI"-Hinweis → MEDIUM (Pre-Chat-Hinweis + vor erstem Input gefordert; kann nur ein consent-tester-DOM-Scan + verifizieren) + +Bekannte Limitation: ohne consent-tester-Erweiterung kann der Check +nicht entscheiden, ob ein Pre-Chat-Hinweis im Live-DOM vor dem +ersten Nutzer-Input erscheint. Wir flaggen das daher als MEDIUM +"manuell verifizieren". +""" + +from __future__ import annotations + +import logging +import re + +logger = logging.getLogger(__name__) + + +# AI-Anbieter / Modelle / Frameworks, die "AI" auslösen. +_AI_KEYWORDS = ( + "vertex ai", "google vertex", "openai", "gpt-3", "gpt-4", "chatgpt", + "anthropic", "claude.ai", "claude-3", "mistral ai", "huggingface", + "hugging face", "stable diffusion", "midjourney", "llama-2", "llama-3", + "qwen", "deepseek", "perplexity ai", "azure openai", "copilot", + "konversations-ki", "konversations ki", + "ai assistant", "ai-assistant", "ki-assistent", "ki assistent", + "intelligenter assistent", + "automatisierter chat", "chatbot", "live-chat", +) + + +# Phrasen, die als Art-50-Disclosure gelten. 
+_DISCLOSURE_PHRASES = ( + "sie chatten mit einer ki", "sie kommunizieren mit einer ki", + "automatisierter chat", "automatisierter assistent", + "ki-gestützter", "ki gestützt", "ki-gestuetzt", + "künstliche intelligenz", "kuenstliche intelligenz", + "artificial intelligence", + "art. 50 ai act", "ai act art. 50", "art. 50 ki-vo", + "ki-verordnung", "ki verordnung", + "automatisiertes system", + "generativ", "generative ki", "generative ai", + "large language model", "llm-", + "machine learning", +) + + +def _has_any(text: str, phrases) -> list[str]: + text_lc = (text or "").lower() + if not text_lc: + return [] + return [p for p in phrases if p in text_lc] + + +def _find_ai_in_vendors(cmp_vendors: list[dict]) -> list[dict]: + """Find vendors whose name or category mentions an AI provider.""" + hits: list[dict] = [] + for v in cmp_vendors or []: + haystack = " ".join([ + (v.get("name") or "").lower(), + (v.get("category") or "").lower(), + (v.get("processing_company") or "").lower(), + ]) + if not haystack.strip(): + continue + matched = [k for k in _AI_KEYWORDS if k in haystack] + if matched: + hits.append({ + "vendor": v.get("name") or "—", + "matched": matched[:3], + }) + return hits + + +def check_ai_act_transparency(state: dict) -> list[dict]: + """Return findings about AI Art. 
50 transparency obligations.""" + doc_texts = state.get("doc_texts") or {} + dse_text = doc_texts.get("dse") or "" + cookie_text = doc_texts.get("cookie") or "" + cmp_vendors = state.get("cmp_vendors") or [] + + if not dse_text and not cookie_text and not cmp_vendors: + return [] + + ai_mentions_dse = _has_any(dse_text, _AI_KEYWORDS) + ai_mentions_cookie = _has_any(cookie_text, _AI_KEYWORDS) + ai_vendors = _find_ai_in_vendors(cmp_vendors) + + has_ai_signal = bool( + ai_mentions_dse or ai_mentions_cookie or ai_vendors + ) + if not has_ai_signal: + return [] + + disc_dse = _has_any(dse_text, _DISCLOSURE_PHRASES) + disc_cookie = _has_any(cookie_text, _DISCLOSURE_PHRASES) + has_disclosure = bool(disc_dse or disc_cookie) + + findings: list[dict] = [] + summary_signals = ( + f"DSE-AI-Hinweise: {len(ai_mentions_dse)} " + f"(z.B. {', '.join(ai_mentions_dse[:3])}); " + f"Cookie-AI-Hinweise: {len(ai_mentions_cookie)}; " + f"AI-Vendors: {len(ai_vendors)}" + ) + + if not has_disclosure: + findings.append({ + "check_id": "AI-ACT-TRANSPARENCY-001", + "severity": "HIGH", + "severity_reason": "missing", + "title": ( + "AI-Act Art. 50 Transparenz-Hinweis fehlt — " + "KI-System eingesetzt, aber keine Nutzer-Erklärung" + ), + "norm": "AI Act Art. 50 Abs. 1 (Transparenz gegenüber Nutzern)", + "detected_signals": summary_signals, + "ai_vendors": ai_vendors, + "ai_keywords_in_dse": ai_mentions_dse[:5], + "ai_keywords_in_cookie": ai_mentions_cookie[:5], + "action": ( + "DSE und Pre-Chat-UI mit ausdrücklichem Hinweis " + "'Sie kommunizieren mit einer KI (System X)' ergänzen. " + "Anbieter offen nennen + Rechtsgrundlage + Speicherdauer." + ), + }) + else: + # AI detected + DSE-Disclosure vorhanden — aber Pre-Chat-Hinweis + # im Live-DOM kann der Check nicht verifizieren. + findings.append({ + "check_id": "AI-ACT-TRANSPARENCY-002", + "severity": "MEDIUM", + "severity_reason": "manual_review_required", + "title": ( + "AI-Act Art. 
50: DSE-Disclosure vorhanden — Pre-Chat-Hinweis " + "im UI manuell verifizieren" + ), + "norm": "AI Act Art. 50 Abs. 1", + "detected_signals": summary_signals, + "ai_vendors": ai_vendors, + "disclosure_in_dse": disc_dse[:3], + "disclosure_in_cookie": disc_cookie[:3], + "action": ( + "Pre-Chat-UI öffnen: vor der ersten Nutzereingabe muss " + "ein klarer Hinweis erscheinen, dass die Konversation " + "mit einer KI geführt wird. Verifizieren ob Banner/Modal " + "vorhanden oder reine Footnote." + ), + }) + + # Zusatzcheck: Wenn AI vorhanden und Rechtsgrundlage = berechtigtes + # Interesse (Art. 6 Abs. 1 lit. f) statt Einwilligung — MEDIUM + if ai_vendors or ai_mentions_dse: + if _legitimate_interest_for_ai(dse_text): + findings.append({ + "check_id": "AI-ACT-RISK-001", + "severity": "MEDIUM", + "severity_reason": "misclassified", + "title": ( + "Rechtsgrundlage 'berechtigtes Interesse' für " + "KI-Verarbeitung — Einwilligung empfehlen" + ), + "norm": "DSGVO Art. 6 Abs. 1 lit. a vs lit. f + AI Act", + "detected_signals": ( + "AI-Provider erkannt; Art. 6 Abs. 1 lit. f als " + "Rechtsgrundlage in DSE genannt" + ), + "ai_vendors": ai_vendors, + "action": ( + "Bei generativer KI (insbesondere mit Drittland-" + "Transfer und Profiling-Verwandtschaft) " + "Rechtsgrundlage auf Einwilligung (Art. 6 Abs. 1 " + "lit. a) umstellen. Interessenabwägung dokumentieren." + ), + }) + + if findings: + logger.info("ai-act-transparency: %d findings", len(findings)) + return findings + + +def _legitimate_interest_for_ai(dse_text: str) -> bool: + """Detect 'Rechtsgrundlage Art. 6 Abs. 1 lit. f' near AI mentions.""" + text_lc = (dse_text or "").lower() + if not text_lc: + return False + # crude proximity check: any of the AI keywords AND lit-f phrase + # within a ~600 char window + import re + lit_f_patterns = ( + "art. 6 abs. 1 lit. f", "artikel 6 abs. 1 lit. f", + "art. 
6 1 f", "berechtigtes interesse", "berechtigten interesses", + ) + for ai_kw in _AI_KEYWORDS: + for pos in range(0, len(text_lc) - 200): + window = text_lc[max(0, pos-300):pos+300] + if ai_kw in window and any(p in window for p in lit_f_patterns): + return True + # don't walk every char; jump to next ai_kw occurrence + idx = text_lc.find(ai_kw, pos) + if idx == -1: + break + pos = idx + 1 + break + return False diff --git a/backend-compliance/compliance/services/cmp_fingerprint_check.py b/backend-compliance/compliance/services/cmp_fingerprint_check.py new file mode 100644 index 00000000..570d9755 --- /dev/null +++ b/backend-compliance/compliance/services/cmp_fingerprint_check.py @@ -0,0 +1,55 @@ +"""B8 — CMP-Provider-Fingerprint-Check. + +Findings wenn: +- Cookie-Banner wurde erkannt (banner_result.detected=True) +- Aber CMP-Provider/Vendor nicht ableitbar (provider in {Generic, "", "?"}) + +Das ist nach EDPB-Taskforce-Methodik MEDIUM: ohne klare CMP-Identität +ist schwer zu beurteilen, welches Consent-Storage-Format greift, ob +TCF unterstützt wird, und wie der DSB mit dem CMP-Anbieter +kommunizieren kann (Audit-Trail / DPA). + +Provider-Detection läuft schon im consent-tester. Hier nur die +Lückenmeldung wenn der Banner zwar steht aber der Anbieter offen +bleibt. +""" + +from __future__ import annotations + +import logging + +logger = logging.getLogger(__name__) + + +_KNOWN_PROVIDERS = ( + "usercentrics", "onetrust", "cookiebot", "cookiepro", + "sourcepoint", "consentmanager", "klaro!", "borlabs", + "iubenda", "didomi", "trustarc", "complianz", +) + + +def check_cmp_fingerprint(state: dict) -> dict | None: + br = state.get("banner_result") or {} + detected = br.get("detected") or br.get("banner_detected") + if not detected: + return None + provider = (br.get("provider") or br.get("banner_provider") or "").lower() + is_known = any(k in provider for k in _KNOWN_PROVIDERS) + if is_known: + return None + # Banner steht, aber CMP-Provider ist generisch oder leer. 
+ finding = { + "check_id": "COOKIE-CONSENT-UX-002", + "severity": "MEDIUM", + "severity_reason": "incomplete", + "title": "Cookie-Banner erkannt, aber CMP-Provider nicht eindeutig", + "norm": "EDPB Cookie Banner Taskforce-Report (Transparenz CMP)", + "detected_provider": provider or "—", + "action": ( + "CMP-Provider in der DSE benennen (Auftragsverarbeiter), " + "Consent-Storage-Format dokumentieren (TCF / proprietär), " + "und Audit-Trail-Zugang für den DSB sicherstellen." + ), + } + logger.info("B8 CMP-fingerprint: detected_provider=%r is generic", provider) + return finding diff --git a/backend-compliance/compliance/services/cross_doc_dpo_check.py b/backend-compliance/compliance/services/cross_doc_dpo_check.py new file mode 100644 index 00000000..55c21c2f --- /dev/null +++ b/backend-compliance/compliance/services/cross_doc_dpo_check.py @@ -0,0 +1,66 @@ +"""B6 — DPO in DSE genannt, im Impressum aber nicht verlinkt. + +Best-Practice-Check nach DSGVO Art. 37 + § 5 TMG-Geist: +wenn die DSE einen Datenschutzbeauftragten benennt, sollte er +auch im Impressum referenziert sein (mind. Verweis "DSB siehe DSE") +— sonst geht die Kontaktmöglichkeit verloren, wenn die DSE separat +publiziert wird. + +Severity LOW (nicht zwingend Pflicht), aber relevant für DSBs. 
+""" + +from __future__ import annotations + +import logging +import re + +logger = logging.getLogger(__name__) + +# Phrasen, die einen DSB / DPO in einem Text als benannt markieren +_DSB_NAMED_PATTERNS = [ + re.compile(r"datenschutzbeauftrag\w+", re.I), + re.compile(r"data\s+protection\s+officer\b", re.I), + re.compile(r"\bdpo\b", re.I), + re.compile(r"privacy@\S+", re.I), + re.compile(r"datenschutz@\S+", re.I), +] + + +def _names_dsb(text: str) -> list[str]: + if not text: + return [] + out: list[str] = [] + for pat in _DSB_NAMED_PATTERNS: + for m in pat.finditer(text): + out.append(m.group(0)) + if len(out) >= 3: + return out + return out + + +def check_dpo_cross_doc(state: dict) -> dict | None: + """Return a finding when DSE names a DPO but Impressum does not.""" + doc_texts = state.get("doc_texts") or {} + dse = doc_texts.get("dse") or "" + imp = doc_texts.get("impressum") or "" + if not dse or not imp: + return None + dse_hits = _names_dsb(dse) + imp_hits = _names_dsb(imp) + if dse_hits and not imp_hits: + finding = { + "check_id": "IMPRESSUM-DPO-001", + "severity": "LOW", + "severity_reason": "incomplete", + "title": "DSB im Impressum nicht verlinkt", + "norm": "DSGVO Art. 37 (Best Practice) + § 5 TMG-Geist", + "evidence_dse": dse_hits[:2], + "action": ( + "Im Impressum den DSB-Kontakt verlinken oder Verweis " + "auf die Datenschutzerklärung ergänzen, damit Betroffene " + "auch über das Impressum den DSB erreichen." + ), + } + logger.info("B6 DPO-cross-doc: DSE has DPO, Impressum doesn't") + return finding + return None diff --git a/backend-compliance/compliance/services/doc_staleness_check.py b/backend-compliance/compliance/services/doc_staleness_check.py new file mode 100644 index 00000000..b7b6f133 --- /dev/null +++ b/backend-compliance/compliance/services/doc_staleness_check.py @@ -0,0 +1,133 @@ +"""B7 — Doc-Staleness: Datum extrahieren + Aktualität bewerten. 
+ +Findings, wenn ein rechtliches Dokument (AGB, Nutzungsbedingungen, +Widerruf, DSE) über N Jahre alt ist. Default-Cap: 3 Jahre für AGB/ +Nutzungsbedingungen (TERMS-STALENESS-001), 2 Jahre für DSE. + +Heuristik für Datumsextraktion: + - "Stand: November 2018" / "Stand November 2018" / "Stand: Dezember 2018" + - "Letzte Aktualisierung: 2018-12-01" + - "Version vom 1.12.2018" + - "Last updated: December 2018" +""" + +from __future__ import annotations + +import logging +import re +from datetime import datetime + +logger = logging.getLogger(__name__) + +_MONTHS_DE = { + "januar": 1, "februar": 2, "märz": 3, "maerz": 3, "april": 4, + "mai": 5, "juni": 6, "juli": 7, "august": 8, "september": 9, + "oktober": 10, "november": 11, "dezember": 12, +} +_MONTHS_EN = { + "january": 1, "february": 2, "march": 3, "april": 4, "may": 5, + "june": 6, "july": 7, "august": 8, "september": 9, "october": 10, + "november": 11, "december": 12, +} + +# Match patterns like "Stand: Dezember 2018" / "Stand November 2018" +_PAT_STAND = re.compile( + r"(?:stand|version|letzte\s+aktualisierung|last\s+updated|" + r"last\s+revised)\s*[:\-]?\s*" + r"(?:vom\s+)?" + r"(?:(?P\d{1,2})[.\-/])?" 
+ r"(?P" + r"januar|februar|m[äa]rz|april|mai|juni|juli|august|september|" + r"oktober|november|dezember|" + r"january|february|march|april|may|june|july|august|september|" + r"october|november|december|" + r"\d{1,2}" + r")" + r"[.\s\-/]+" + r"(?P\d{4})", + re.I, +) + + +_AGE_THRESHOLDS_YEARS = { + "agb": 3, + "nutzungsbedingungen": 3, + "widerruf": 2, + "dse": 2, + "impressum": 5, # less critical + "cookie": 2, +} + + +def _extract_date(text: str) -> tuple[int, int, int] | None: + """Return (year, month, day) of the most recent revision date.""" + if not text: + return None + candidates: list[tuple[int, int, int]] = [] + for m in _PAT_STAND.finditer(text): + try: + year = int(m.group("year")) + mon_str = (m.group("month") or "").lower() + day = int(m.group("day") or 1) + if mon_str.isdigit(): + month = int(mon_str) + else: + month = (_MONTHS_DE.get(mon_str) + or _MONTHS_EN.get(mon_str)) + if not month or not (1 <= month <= 12): + continue + if year < 2000 or year > 2100: + continue + candidates.append((year, month, day)) + except (ValueError, TypeError): + continue + if not candidates: + return None + # newest date wins + candidates.sort(reverse=True) + return candidates[0] + + +def check_staleness(state: dict) -> list[dict]: + """Run staleness check across legal docs.""" + findings: list[dict] = [] + doc_texts = state.get("doc_texts") or {} + today = datetime.utcnow() + for doc_type, text in doc_texts.items(): + threshold_years = _AGE_THRESHOLDS_YEARS.get(doc_type) + if not threshold_years: + continue + date = _extract_date(text) + if not date: + continue + year, month, day = date + try: + doc_date = datetime(year, month, min(day, 28)) + except ValueError: + continue + age_years = (today - doc_date).days / 365.25 + if age_years < threshold_years: + continue + sev = "HIGH" if age_years > threshold_years * 2 else "MEDIUM" + findings.append({ + "check_id": f"DOC-STALENESS-{doc_type.upper()}", + "doc_type": doc_type, + "severity": sev, + "severity_reason": 
"incomplete", + "title": ( + f"{doc_type.title()} ist {int(age_years)} Jahre alt " + f"(Stand {year:04d}-{month:02d})" + ), + "norm": "Sorgfaltspflicht (laufende Anpassung an Rechtsänderungen)", + "doc_date": f"{year:04d}-{month:02d}-{day:02d}", + "age_years": round(age_years, 1), + "threshold_years": threshold_years, + "action": ( + f"{doc_type.title()} überprüfen und an aktuelle " + "Gesetzeslage anpassen (DSGVO-Updates, AI Act, DSA, " + "neue BGH-Rechtsprechung). Stand-Datum aktualisieren." + ), + }) + if findings: + logger.info("B7 staleness: %d findings", len(findings)) + return findings diff --git a/backend-compliance/compliance/services/finding_plausibility_check.py b/backend-compliance/compliance/services/finding_plausibility_check.py new file mode 100644 index 00000000..66693e0f --- /dev/null +++ b/backend-compliance/compliance/services/finding_plausibility_check.py @@ -0,0 +1,329 @@ +"""LLM Plausibility Re-Evaluation for MC findings. + +Why this exists: + MC-DB labels are historic compliance-officer questions ("Dokumentiert + die DSI alle Datenübermittlungen gemäß Art. 49 Abs. 1 Unterabs. 2 + DS-GVO?"). When the deterministic regex+LLM-verify pipeline flags + them as FAIL, the question stays as the title. The reader sees + "we don't know" — unhelpful. + +What this does: + AFTER the MC pipeline finished, run a second LLM pass over EVERY + remaining FAIL with the original doc-text. The LLM: + 1. Reformulates the question as a STATEMENT-OF-TOPIC + ("Drittland-Übermittlungen nach Art. 49 Abs. 1 Unterabs. 2 DS-GVO") + 2. Suggests a plausible severity (or DROP if the finding is bogus) + 3. Produces a CONCRETE recommendation ("Im Abschnitt 'Drittland' + der DSE Mechanismus pro Empfänger ergänzen") + +What this does NOT do: + - Touch the MC-DB. Original label stays in c.label. + - Touch passed/skipped/regulation/matched_text — those are facts. + - Run for non-fails or already-handled checks. 
+ +Stamping schema on each Check (CheckItem dataclass): + llm_title: str — reformulated topic statement + llm_severity: str — suggested severity ("HIGH"|"MED"|"LOW"|"DROP") + llm_recommendation: str — concrete fix recommendation + llm_drop: bool — True if the LLM judged the finding not plausible + llm_plausibility: float — 0..1 confidence (optional) + +The mail-render V2 reads these stamps and renders them next to the +original label (🤖 LLM-Plausibility box). + +Config: + OLLAMA_URL default "http://host.docker.internal:11434" + PLAUSIBILITY_LLM_MODEL default "qwen3:30b-a3b" + PLAUSIBILITY_BATCH_SIZE default 8 + PLAUSIBILITY_TIMEOUT_S default 60.0 +""" + +from __future__ import annotations + +import hashlib +import json +import logging +import os + +import httpx + +logger = logging.getLogger(__name__) + +OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434") +MODEL = os.getenv("PLAUSIBILITY_LLM_MODEL", "qwen3:30b-a3b") +BATCH_SIZE = int(os.getenv("PLAUSIBILITY_BATCH_SIZE", "8")) +TIMEOUT = float(os.getenv("PLAUSIBILITY_TIMEOUT_S", "60.0")) + +# In-memory cache: (input_hash) -> result_dict. Survives one run. +_CACHE: dict[str, dict] = {} + + +def _checksum(check_id: str, label: str, hint: str, + doc_excerpt: str) -> str: + """Stable hash of the LLM input — avoid re-asking on retries.""" + h = hashlib.sha256() + h.update(check_id.encode()) + h.update(b"\x00") + h.update(label.encode()) + h.update(b"\x00") + h.update(hint.encode()) + h.update(b"\x00") + h.update(doc_excerpt[:2000].encode()) + return h.hexdigest()[:16] + + +_SYSTEM_PROMPT = ( + "Du bist Compliance-Plausibilitäts-Auditor für deutsche " + "Datenschutz-Prüfberichte. Für jeden Finding-Eintrag bekommst du " + "die MC-Pflichtfrage, den LLM-Hinweis und einen Ausschnitt aus " + "dem geprüften Dokument.\n\n" + "REGELN — sehr wichtig:\n" + "1. Du gibst für JEDEN Finding-Eintrag im Input GENAU EINEN Output-" + "Eintrag zurück (keine ausgelassen, keine zusätzlichen).\n" + "2. 
Die ID muss BUCHSTABENGENAU vom Input übernommen werden — " + "nicht abgekürzt, nicht umformatiert (Beispiel: \"mc-DATA-3953-A04\" " + "bleibt \"mc-DATA-3953-A04\").\n" + "3. Reihenfolge der Output-Items entspricht der Input-Reihenfolge.\n\n" + "Pro Finding:\n" + "- title: TOPIC-STATEMENT (max 80 Zeichen, ohne Frageton, " + "nennt die Norm wenn sinnvoll). Beispiel: " + "Frage \"Dokumentiert die DSI Drittlandtransfers nach Art. 49?\" " + "→ title \"Drittlandtransfer-Doku Art. 49 DSGVO\".\n" + "- severity: HIGH (klar verletzt), MEDIUM (verletzt, weniger " + "kritisch), LOW (unsicher / manuelle Prüfung), DROP " + "(Auszug zeigt klar dass die Anforderung erfüllt ist).\n" + "- recommendation: KONKRETE Aktion (max 200 Zeichen), nennt " + "WAS und WO. Beispiel: \"Im Abschnitt 'Drittlandtransfer' " + "der DSE pro Empfänger einen Mechanismus nach Art. 49 ergänzen\".\n" + "- drop: true wenn severity=DROP, sonst false.\n\n" + "JSON-Schema (genauso antworten):\n" + "{\"findings\":[" + "{\"id\":\"\",\"title\":\"...\"," + "\"severity\":\"HIGH|MEDIUM|LOW|DROP\"," + "\"recommendation\":\"...\",\"drop\":false}" + "]}\n\n" + "Beispiel-Antwort bei 2 Inputs mit IDs mc-A und mc-B:\n" + "{\"findings\":[{\"id\":\"mc-A\",\"title\":\"Norm X erfüllen\"," + "\"severity\":\"MEDIUM\",\"recommendation\":\"In Abschnitt Y " + "ergänzen: Norm X erfüllt\",\"drop\":false}," + "{\"id\":\"mc-B\",\"title\":\"Norm Z geprüft\",\"severity\":\"DROP\"," + "\"recommendation\":\"Bereits erfüllt — Hinweis im Doc Z3\"," + "\"drop\":true}]}" +) + + +def _build_user_prompt(items: list[dict], doc_title: str, + doc_excerpt: str) -> str: + findings_block = "\n".join( + f'{i+1}. 
ID="{it["id"]}" | FRAGE: {it["label"]} | ' + f'HINT: {it.get("hint", "")[:200]} | SEV_REGEX: {it.get("severity")}' + for i, it in enumerate(items) + ) + return ( + f"DOKUMENT: {doc_title}\n\n" + f"DOKUMENT-AUSZUG (max 4000 Zeichen):\n{doc_excerpt[:4000]}\n\n" + f"FINDINGS ZU BEWERTEN:\n{findings_block}" + ) + + +async def _ask_llm_batch(items: list[dict], doc_title: str, + doc_excerpt: str) -> dict[str, dict]: + """Send a batch of up to BATCH_SIZE findings to the LLM.""" + body = { + "model": MODEL, + "messages": [ + {"role": "system", "content": _SYSTEM_PROMPT}, + {"role": "user", "content": _build_user_prompt( + items, doc_title, doc_excerpt, + )}, + ], + "format": "json", + "stream": False, + "options": {"temperature": 0.0, "seed": 42, "num_predict": 1500}, + } + out: dict[str, dict] = {} + input_ids = [it["id"] for it in items] + try: + async with httpx.AsyncClient(timeout=TIMEOUT) as c: + r = await c.post(f"{OLLAMA_URL}/api/chat", json=body) + r.raise_for_status() + content = (r.json().get("message") or {}).get("content", "") + if not content: + logger.warning("plausibility LLM returned empty content") + return out + try: + data = json.loads(content) + except json.JSONDecodeError as je: + logger.warning( + "plausibility LLM JSON parse failed: %s; raw=%s", + je, content[:300], + ) + return out + llm_findings = data.get("findings") or [] + if not llm_findings: + logger.warning( + "plausibility LLM returned 0 findings for %d input " + "items; raw=%s", len(items), content[:300], + ) + return out + # Phase 1: exact ID match + id_set = set(input_ids) + for entry in llm_findings: + fid = (entry.get("id") or "").strip() + if fid in id_set and fid not in out: + out[fid] = _entry_to_stamp(entry) + # Phase 2: position fallback — for any input item still + # unmapped, use the LLM finding at the same index if it's + # otherwise unclaimed. 
+ if len(out) < len(input_ids): + claimed_indices: set[int] = set() + for idx, entry in enumerate(llm_findings): + fid = (entry.get("id") or "").strip() + if fid in out: + claimed_indices.add(idx) + for idx, input_id in enumerate(input_ids): + if input_id in out: + continue + if idx < len(llm_findings) and idx not in claimed_indices: + out[input_id] = _entry_to_stamp(llm_findings[idx]) + claimed_indices.add(idx) + # Phase 3: fuzzy match by ID-tail + if len(out) < len(input_ids): + unmapped_ids = [i for i in input_ids if i not in out] + used_entries: set[int] = set() + for idx, entry in enumerate(llm_findings): + fid = (entry.get("id") or "").strip().lower() + if not fid or any(entry == out.get(i) for i in unmapped_ids): + continue + if idx in used_entries: + continue + for inp in unmapped_ids: + if inp in out: + continue + if inp[-8:].lower() in fid or fid in inp.lower(): + out[inp] = _entry_to_stamp(entry) + used_entries.add(idx) + break + if not out: + logger.warning( + "plausibility could not map any of %d input IDs; " + "raw=%s", len(input_ids), content[:300], + ) + else: + logger.info( + "plausibility mapped %d/%d findings", len(out), + len(input_ids), + ) + except Exception as e: + logger.warning("plausibility batch failed: %s", e) + return out + + +def _entry_to_stamp(entry: dict) -> dict: + return { + "llm_title": (entry.get("title") or "")[:200], + "llm_severity": (entry.get("severity") or "").upper(), + "llm_recommendation": (entry.get("recommendation") or "")[:400], + "llm_drop": bool(entry.get("drop", False)), + } + + +async def verify_plausibility(results, doc_texts: dict[str, str]) -> None: + """Stamp llm_* fields onto every FAIL CheckItem in results. + + Args: + results: list of DocCheckResult, each with .checks (list of CheckItem) + and .doc_type + doc_texts: doc_type -> source text excerpt for context + """ + if not results: + return + # Gather candidate fails per doc_type so the prompt can scope the + # excerpt correctly. 
+ by_doc: dict[str, list] = {} + by_doc_meta: dict[str, str] = {} + for r in results: + dt = getattr(r, "doc_type", "") + label = getattr(r, "label", "") or dt + for c in getattr(r, "checks", []) or []: + if getattr(c, "passed", True) or getattr(c, "skipped", False): + continue + # MC checks only — skip the structural P-* placement findings + cid = (getattr(c, "id", "") or "").lower() + if not cid.startswith("mc-"): + continue + by_doc.setdefault(dt, []).append(c) + by_doc_meta[dt] = label + + if not by_doc: + return + + total = sum(len(v) for v in by_doc.values()) + logger.info("plausibility-check: %d findings across %d docs", + total, len(by_doc)) + + for dt, checks in by_doc.items(): + doc_title = by_doc_meta.get(dt) or dt + doc_text = doc_texts.get(dt) or "" + if not doc_text: + # Fall back to DSE excerpt when the doc has no own text + doc_text = doc_texts.get("dse") or "" + for i in range(0, len(checks), BATCH_SIZE): + batch = checks[i:i + BATCH_SIZE] + items = [] + for c in batch: + items.append({ + "id": getattr(c, "id", ""), + "label": getattr(c, "label", ""), + "hint": getattr(c, "hint", "") or "", + "severity": getattr(c, "severity", ""), + }) + # Cache lookup per item — skip those already cached. 
+ uncached_items: list[dict] = [] + for it in items: + key = _checksum(it["id"], it["label"], it["hint"], doc_text) + if key in _CACHE: + continue + uncached_items.append(it) + if not uncached_items: + cache_results = {it["id"]: _CACHE[_checksum( + it["id"], it["label"], it["hint"], doc_text, + )] for it in items} + else: + cache_results = await _ask_llm_batch( + uncached_items, doc_title, doc_text, + ) + for it in uncached_items: + rid = it["id"] + if rid in cache_results: + key = _checksum( + it["id"], it["label"], it["hint"], doc_text, + ) + _CACHE[key] = cache_results[rid] + # add cached ones too + for it in items: + if it["id"] not in cache_results: + key = _checksum( + it["id"], it["label"], it["hint"], doc_text, + ) + if key in _CACHE: + cache_results[it["id"]] = _CACHE[key] + # Stamp onto each CheckItem + stamped = 0 + for c in batch: + cid = getattr(c, "id", "") + if cid in cache_results: + res = cache_results[cid] + try: + c.llm_title = res.get("llm_title", "") or "" + sev = res.get("llm_severity", "") or "" + c.llm_severity = sev if sev in ( + "HIGH", "MEDIUM", "LOW", "DROP") else "" + c.llm_recommendation = res.get( + "llm_recommendation", "") or "" + c.llm_drop = bool(res.get("llm_drop", False)) + stamped += 1 + except Exception: + pass + logger.info("plausibility-check %s: batch %d → %d stamped", + dt, len(batch), stamped) diff --git a/backend-compliance/compliance/services/mail_render_v2/__init__.py b/backend-compliance/compliance/services/mail_render_v2/__init__.py new file mode 100644 index 00000000..4fbcf32b --- /dev/null +++ b/backend-compliance/compliance/services/mail_render_v2/__init__.py @@ -0,0 +1,16 @@ +"""Mail Render V2 — unified, consistent layout for the audit mail. + +The original Step-5 HTML composition grew across 27+ render functions, +each with its own inline styles. Result: inconsistent colors, +typography, and card widths. 
V2 fixes that with: + + - `_style.py` ONE place for colors, fonts, spacing helpers + - `_cookie_inventory.py` SINGLE cookie list merged from DSE / table / + live browser, with per-cookie status + - `_blocks.py` Header / TOC / Critical / Per-Doc / + Per-Theme / Caveats / Footer renderers + - `_compose.py` compose_v2(state) → full_html + +Activate via env var `MAIL_RENDER_V2=true`. Default is the legacy +renderer so we can A/B compare in Mailpit. +""" diff --git a/backend-compliance/compliance/services/mail_render_v2/_actions.py b/backend-compliance/compliance/services/mail_render_v2/_actions.py new file mode 100644 index 00000000..44a4c8b0 --- /dev/null +++ b/backend-compliance/compliance/services/mail_render_v2/_actions.py @@ -0,0 +1,296 @@ +"""Mail-V2 Action library — turn findings into 'what to do where'. + +Each finding type maps to a concrete action recommendation. The +mapping is intentionally pattern-matched (not LLM-generated): the +audit is deterministic, so the corrective action must be too. + +Patterns matched by: + - finding `id` prefix (mc-impressum-handelsregister → impressum/HR) + - severity_reason (factually_wrong / missing / misclassified) + - mismatch_type (dsi_under_actual / table_under_actual / ...) + - cookie field name (country / duration / processing_company) + +Fallback: "Manuelle Prüfung beim DSB erforderlich" with finding hint. + +Returns an Action dict: + - title: short imperative ("Sitzland ergänzen") + - target: where to fix ("DSE / Vendor-Liste") + - detail: extended explanation + - aggregation_key: groupBy key for bulk recommendations + ("missing_country" / "long_retention" / ...) 
@dataclass
class Action:
    """One corrective recommendation derived from a finding."""

    # Short imperative headline, e.g. "Sitzland ergänzen".
    title: str
    # Where the fix has to be made, e.g. "DSE / Vendor-Liste".
    target: str
    # Extended explanation shown with the recommendation.
    detail: str
    # Group-by key for bulk recommendations; None when not aggregated.
    aggregation_key: "str | None"
    # Effort estimate: "low" | "med" | "hi".
    effort: str

    def to_dict(self) -> dict:
        """Return the action as a plain dict keyed by field name."""
        as_mapping = asdict(self)
        return as_mapping
def cookie_status_action(status_code: str, cookie_name: str,
                         vendor: str) -> "Action | None":
    """Return the action for a cookie inventory status, or None.

    Handles the status codes UNDOC (set in the browser but not declared),
    ORPH (declared but not set) and MISMATCH (declared values differ from
    the real ones). Any other code yields None. *vendor* is accepted for
    signature parity with the field-level helper and is not used.
    """
    # (status code, title, target, detail, aggregation key, effort)
    catalogue = (
        (
            "UNDOC",
            "Cookie deklarieren oder entfernen",
            "CMP-Config + DSE",
            (f"Cookie '{cookie_name}' wird im Browser gesetzt, ist "
             "aber nicht in DSE/Cookie-Tabelle deklariert. § 25 "
             "TDDDG: entweder Deklaration nachholen oder Cookie "
             "blockieren (CMP-Trigger prüfen)."),
            "undoc_cookies",
            "med",
        ),
        (
            "ORPH",
            "Veraltete Cookie-Angabe entfernen",
            "DSE / Cookie-Tabelle",
            (f"Cookie '{cookie_name}' ist in DSE deklariert, wird "
             "aber im Live-Browser nicht gesetzt. Veraltete Angabe "
             "entfernen, um Transparenz zu wahren."),
            "orphan_cookies",
            "low",
        ),
        (
            "MISMATCH",
            "Cookie-Werte korrigieren",
            "DSE / Cookie-Tabelle",
            (f"Cookie '{cookie_name}': deklarierte Werte weichen von "
             "tatsächlich gesetzten ab. Tabelle anpassen oder "
             "Cookie-Setup korrigieren."),
            "mismatch_cookies",
            "med",
        ),
    )
    for code, title, target, detail, group_key, effort in catalogue:
        if status_code == code:
            return Action(
                title=title,
                target=target,
                detail=detail,
                aggregation_key=group_key,
                effort=effort,
            )
    return None
def reachability_action(rb1: dict) -> "Action | None":
    """Return the action for a failed footer-reachability (B1) finding.

    Returns None when the finding passed or when its ``severity_reason`` is
    none of "missing", "misclassified" or "factually_wrong". All three
    actions share the aggregation key "footer_reachability".
    """
    if rb1.get("passed"):
        return None
    # severity_reason -> (title, target, detail, effort)
    by_reason = (
        (
            "missing",
            "Cookie-Einstellungen-Link im Footer ergänzen",
            "Website-Footer (alle Seiten)",
            ("Art. 7 Abs. 3 DSGVO: Widerruf muss so einfach wie "
             "Erteilung sein. Footer-Link 'Cookie-Einstellungen' "
             "ergänzen, der den CMP direkt öffnet (kein neuer Tab, "
             "kein Zwischendokument)."),
            "low",
        ),
        (
            "misclassified",
            "CMP direkt öffnen statt neuer Tab",
            "Footer-Link-Config",
            ("Bestehender Footer-Link öffnet die CMP nicht direkt. "
             "JavaScript-Trigger umstellen: kein target=_blank, "
             "keine externe Policy-Seite — CMP-Layer direkt öffnen."),
            "low",
        ),
        (
            "factually_wrong",
            "Eigenen CMP statt Browser-Verweis",
            "Footer + CMP",
            ("Nutzer wird auf Browser-Einstellungen verwiesen — das "
             "ist nach LfDI BW kein gleichwertiger Widerruf. Eigenen "
             "CMP-Re-Open-Mechanismus implementieren."),
            "med",
        ),
    )
    why = rb1.get("severity_reason")
    for reason, title, target, detail, effort in by_reason:
        if why == reason:
            return Action(
                title=title,
                target=target,
                detail=detail,
                aggregation_key="footer_reachability",
                effort=effort,
            )
    return None
def derive_generic_action(finding_id: str, label: str,
                          hint: str) -> "Action | None":
    """Pattern-match a generic MC finding ID to an action template.

    The keywords of `_ID_PATTERNS` are searched in the lower-cased finding
    id and label. Umlauts and ß are folded to their ASCII digraphs first, so
    a label such as "Aufsichtsbehörde" matches the keyword
    "aufsichtsbehoerde". The first matching keyword (in `_ID_PATTERNS`
    order) wins.

    Args:
        finding_id: check id, e.g. "mc-impressum-handelsregister".
        label: human-readable check label; may be empty or None.
        hint: free-text hint; appended to the detail (cut to 200 chars).

    Returns:
        The matching template action; otherwise a "manual review" action
        carrying the hint (cut to 400 chars); None when there is neither a
        match nor a hint.
    """
    fold = str.maketrans({"ä": "ae", "ö": "oe", "ü": "ue", "ß": "ss"})
    label = label or ""
    hint = hint or ""
    fid = (finding_id or "").lower()
    haystack = f"{fid} {label.lower()}".translate(fold)
    for kw, (title, target, detail) in _ID_PATTERNS.items():
        if kw in haystack:
            return Action(
                title=title,
                target=target,
                detail=detail + (f" Hinweis: {hint[:200]}" if hint else ""),
                aggregation_key=f"mc_{kw}",
                effort="low",
            )
    if hint:
        return Action(
            title="Manuelle Prüfung beim DSB",
            target=label or "Doc",
            detail=hint[:400],
            aggregation_key=None,
            effort="med",
        )
    return None


def action_for_finding(finding_id: str, severity: str, label: str,
                       hint: str) -> "Action | None":
    """Top-level entry point for MC findings.

    *severity* is accepted for interface stability but does not influence
    the result; the action is derived from id, label and hint only.
    """
    return derive_generic_action(finding_id, label, hint)
+ +Collects per-item actions (cookie-level, MC-level, retention, B1) +and groups them by `aggregation_key` so the mail can show: + + 🛠 Sofortmaßnahmen + • Sitzland ergänzen für 12 Cookies: _ga, _gid, _fbp, … + • Drittlandtransfer absichern für 5 US-Vendors: Google, Meta, … + • Speicherdauer > 13mo bei 3 Cookies (CNIL-Cap): IDE, _gcl_au, … + +This converts individual fix-recommendations into actionable +"do-this-one-thing-fixes-multiple-cookies" bullets that scale. +""" + +from __future__ import annotations + +from ._actions import ( + Action, + cookie_field_missing_action, + cookie_status_action, + reachability_action, + retention_action, + action_for_finding, +) + + +# ── Group-label registry ───────────────────────────────────────── + +GROUP_LABELS: dict[str, dict] = { + "missing_country": { + "label": "Sitzland ergänzen", + "icon": "🌍", + "norm": "Art. 13 Abs. 1 lit. a DSGVO", + }, + "missing_duration": { + "label": "Speicherdauer ergänzen", + "icon": "⏱", + "norm": "Art. 13 Abs. 2 lit. a DSGVO", + }, + "missing_retention": { + "label": "Löschfrist + Rechtsgrundlage angeben", + "icon": "🗑", + "norm": "§ 35 BDSG", + }, + "missing_processing_company": { + "label": "Verantwortliche Stelle nennen", + "icon": "🏢", + "norm": "Art. 13 Abs. 1 DSGVO", + }, + "missing_third_country": { + "label": "Drittlandtransfer absichern", + "icon": "🌐", + "norm": "Art. 44 ff. DSGVO", + }, + "missing_category": { + "label": "Cookie-Kategorie zuordnen", + "icon": "🏷", + "norm": "EDPB Cookie-Sweep", + }, + "undoc_cookies": { + "label": "Undeklarierte Cookies adressieren", + "icon": "❌", + "norm": "§ 25 Abs. 1 TDDDG", + }, + "orphan_cookies": { + "label": "Veraltete Cookie-Angaben entfernen", + "icon": "👻", + "norm": "Art. 5 Abs. 1 lit. a DSGVO (Transparenz)", + }, + "mismatch_cookies": { + "label": "Cookie-Werte mit Realität abgleichen", + "icon": "🔀", + "norm": "Art. 5 Abs. 1 lit. 
d DSGVO", + }, + "dsi_too_short": { + "label": "DSE-Speicherdauer korrigieren (zu kurz angegeben)", + "icon": "📏", + "norm": "Art. 13 Abs. 2 DSGVO", + }, + "table_too_short": { + "label": "Cookie-Tabelle-Speicherdauer korrigieren", + "icon": "📏", + "norm": "Art. 13 Abs. 2 DSGVO", + }, + "dsi_table_mismatch": { + "label": "DSE ↔ Cookie-Tabelle synchronisieren", + "icon": "🔁", + "norm": "Art. 5 Abs. 2 DSGVO Rechenschaftspflicht", + }, + "safari_itp": { + "label": "Safari-ITP-Cap in DSE dokumentieren", + "icon": "🍎", + "norm": "DSGVO Transparenzgebot", + }, + "footer_reachability": { + "label": "Footer-Reachability für Widerruf herstellen", + "icon": "🔗", + "norm": "Art. 7 Abs. 3 DSGVO", + }, +} + + +def _generic_group(key: str | None) -> dict: + if not key: + return {"label": "Manuelle Prüfung", "icon": "🔍", "norm": ""} + if key.startswith("mc_"): + kw = key[3:].replace("_", " ").title() + return {"label": f"{kw} ergänzen", "icon": "📝", + "norm": "MC-Prüfung"} + return {"label": key.replace("_", " ").title(), "icon": "•", "norm": ""} + + +# ── Item types collected ──────────────────────────────────────── + +def _cookie_items(state: dict) -> list[tuple[Action, str]]: + """Yield (action, item_label) for every cookie-level concern. + + item_label is what gets aggregated into the bullet list of names. 
def _retention_items(state: dict) -> "list[tuple[Action, str]]":
    """Collect (action, label) pairs for retention findings that do not match.

    The label is the cookie name, followed by the vendor in parentheses
    when one is given.
    """
    pairs: "list[tuple[Action, str]]" = []
    for finding in state.get("retention_findings") or []:
        if finding.get("matches"):
            continue
        action = retention_action(finding)
        if not action:
            continue
        name = finding.get("cookie_name") or "—"
        vendor = finding.get("vendor_name") or ""
        pairs.append((action, f"{name} ({vendor})" if vendor else name))
    return pairs


def _reachability_items(state: dict) -> "list[tuple[Action, str]]":
    """Return the footer-reachability action as a one-element list, if any."""
    action = reachability_action(state.get("reachability_finding") or {})
    return [(action, "Footer")] if action else []


def _mc_items(state: dict) -> "list[tuple[Action, str]]":
    """Collect (action, document label) pairs for failed MC checks.

    Only checks that neither passed nor were skipped and whose severity is
    CRITICAL, HIGH or MEDIUM are considered.
    """
    relevant = ("CRITICAL", "HIGH", "MEDIUM")
    pairs: "list[tuple[Action, str]]" = []
    for result in state.get("results") or []:
        doc_label = getattr(result, "label", "") or ""
        for check in getattr(result, "checks", []) or []:
            failed = not (getattr(check, "passed", True)
                          or getattr(check, "skipped", False))
            if not failed:
                continue
            severity = (getattr(check, "severity", "") or "").upper()
            if severity not in relevant:
                continue
            action = action_for_finding(
                getattr(check, "id", ""),
                severity,
                getattr(check, "label", ""),
                getattr(check, "hint", "") or "",
            )
            if action:
                pairs.append((action, doc_label))
    return pairs


def collect_actions(state: dict) -> list[dict]:
    """Top-level: collect every item-action across cookie/retention/B1/MC.

    Each returned dict is the action's fields plus ``item``, the label of
    the cookie / document / page element the action refers to.
    """
    cookie_part = _cookie_items(state)
    retention_part = _retention_items(state)
    reachability_part = _reachability_items(state)
    mc_part = _mc_items(state)
    everything = cookie_part + retention_part + reachability_part + mc_part
    return [{**action.to_dict(), "item": label} for action, label in everything]


def group_by_action(state: dict) -> list[dict]:
    """Aggregate item-actions by aggregation_key.

    Returns a list of groups:
        {
          "key":   "missing_country",
          "label": "Sitzland ergänzen",
          "icon":  "🌍",
          "norm":  "Art. 13 Abs. 1 lit. a DSGVO",
          "effort": "low",
          "count": 12,
          "items": ["_ga (Google)", "_gid (Google)", ...],
          "first_detail": "..." (first action.detail in the group),
        }
    Effort and first_detail come from the first action seen for a key;
    items are de-duplicated in first-seen order. Groups are sorted by
    effort (low, med, hi), then by count descending, then by label.
    """
    buckets: "dict[str | None, dict]" = {}
    for entry in collect_actions(state):
        key = entry.get("aggregation_key")
        group = buckets.get(key)
        if group is None:
            group = buckets[key] = {
                "key": key,
                "label": None, "icon": None, "norm": None,
                "effort": entry.get("effort", "med"),
                "items": [], "count": 0,
                "first_detail": entry.get("detail", ""),
            }
        if not group["label"]:
            meta = GROUP_LABELS.get(key or "") or _generic_group(key)
            group["label"] = meta["label"]
            group["icon"] = meta["icon"]
            group["norm"] = meta["norm"]
        name = entry.get("item") or "—"
        if name not in group["items"]:
            group["items"].append(name)
        group["count"] = len(group["items"])

    effort_order = {"low": 0, "med": 1, "hi": 2}

    def sort_key(group: dict):
        # cheap fixes first, then the ones that cover the most items
        return (effort_order.get(group["effort"], 9),
                -group["count"], group["label"] or "")

    return sorted(buckets.values(), key=sort_key)
00000000..88956703 --- /dev/null +++ b/backend-compliance/compliance/services/mail_render_v2/_blocks.py @@ -0,0 +1,367 @@ +"""Mail-V2 section renderers — one function per top-level block. + +Each renderer takes a slice of `state` and returns ready-to-concatenate +HTML using the helpers from `_style`. Every block is full-width, has +the same card shell, and uses the same color palette. + +Finding-bucket renderers (critical / manual / internal) live in +`_blocks_findings.py` to keep this file under the LOC cap. +""" + +from __future__ import annotations + +from html import escape as h + +from ._aggregator import group_by_action +from ._blocks_findings import count_critical, count_internal, count_manual +from ._cookie_inventory import ( + build_cookie_inventory, + inventory_headers, + render_inventory_rows, +) +from ._style import ( + SZ_H3, + SZ_SMALL, + TEXT, + TEXT_MUTED, + card, + chip, + kpi_row, + section, + table, +) + + +# ── Helpers ────────────────────────────────────────────────────── + +def _score_sev(pct: int | None) -> str: + if pct is None: + return "info" + if pct >= 90: + return "pass" + if pct >= 70: + return "info" + if pct >= 40: + return "warn" + return "fail" + + +# ── 1. Header + KPI row ────────────────────────────────────────── + +def render_header(state: dict) -> str: + site = h(state.get("site_name") or "—") + dom = h(state.get("domain") or "") + scorecard = state.get("scorecard") or {} + score_pct = (scorecard.get("totals") or {}).get("pct") + doc_count = state.get("doc_count") or 0 + docs_total = len(state.get("results") or []) + findings = state.get("total_findings") or 0 + vendors = len(state.get("cmp_vendors") or []) + title_html = ( + f'

{site}

' + f'
' + f'{dom} · Compliance-Audit
' + ) + kpis = [ + {"label": "Compliance-Score", + "value": f"{score_pct}%" if score_pct is not None else "—", + "sev": _score_sev(score_pct)}, + {"label": "Findings", "value": str(findings), + "sev": "fail" if findings > 5 else "warn" if findings > 0 else "pass"}, + {"label": "Dokumente", + "value": f"{doc_count}/{docs_total}", "sev": "info"}, + {"label": "Vendors", "value": str(vendors), + "sev": "warn" if vendors > 20 else "info"}, + ] + return title_html + kpi_row(kpis) + + +# ── 2. Table of contents ──────────────────────────────────────── + +def render_toc(state: dict) -> str: + rows = [ + ("#critical", f"Kritische Befunde ({count_critical(state)})"), + ("#manual", f"Manuelle Prüfung ({count_manual(state)})"), + ("#internal", f"Interne Reminder ({count_internal(state)})"), + ("#sofortmassnahmen", "Sofortmaßnahmen"), + ("#per-doc", + f"Pro Dokument ({len(state.get('results') or [])})"), + ("#per-theme", "Pro Thema"), + ("#caveats", + f"Audit-Vorbehalte ({len(state.get('audit_quality_findings') or [])})"), + ("#attach", + f"Anhänge ({1 if state.get('cookie_evidence_slices') else 0})"), + ] + items = "".join( + f'
  • {h(label)}
  • ' + for href, label in rows + ) + return section( + "📋 Inhalt", + f'
      {items}
    ', + ) + + +# ── 4. Per-document blocks ────────────────────────────────────── + +def render_per_doc(state: dict) -> str: + results = state.get("results") or [] + if not results: + return "" + cards = [] + for r in results: + label = h(getattr(r, "label", "") or "—") + url = getattr(r, "url", "") or "" + url_html = (f'{h(url)}') if url else "" + corr = getattr(r, "correctness_pct", 0) or 0 + err = getattr(r, "error", "") or "" + checks = getattr(r, "checks", []) or [] + n_total = len(checks) + n_pass = sum(1 for c in checks if c.passed and not c.skipped) + n_fail = sum(1 for c in checks if not c.passed and not c.skipped) + n_skip = sum(1 for c in checks if c.skipped) + score_sev = _score_sev(corr) + head = ( + f'
    ' + f'
    {label}' + f'
    {url_html}
    ' + f'
    ' + f'{chip(f"{corr}%", score_sev)}
    ' + ) + if err: + body = (f'

    ' + f'{h(err)}

    ') + else: + counts = ( + f'
    ' + f'{n_total} MCs · {n_pass} ✓ · {n_fail} ✗ · {n_skip} ?
    ' + ) + top = [c for c in checks + if not c.passed and not c.skipped][:3] + top_list = "" + if top: + lis = "".join( + f'
  • ' + f'{h(getattr(c, "label", "")[:120])}
  • ' + for c in top + ) + top_list = ( + f'
      {lis}
    ' + ) + body = counts + top_list + cards.append(card(head + body, + sev=score_sev if not err else "info")) + return section(f"📄 4. Pro Dokument ({len(results)})", + "".join(cards), anchor="per-doc") + + +# ── 5. Per-theme blocks ───────────────────────────────────────── + +def render_theme_cookie_banner(state: dict) -> str: + br = state.get("banner_result") or {} + if not br: + return "" + detected = br.get("detected") or br.get("banner_detected") + provider = br.get("provider") or br.get("banner_provider") or "—" + violations = br.get("violations") or len( + (br.get("banner_checks") or {}).get("violations") or []) + body = ( + f'
    Provider: {h(str(provider))} · ' + f'Detected: ' + f'{chip("Ja" if detected else "Nein", "pass" if detected else "fail")} · ' + f'Violations: {violations}
    ' + ) + return card( + f'

    ▶ Cookie-Banner

    ' + + body, + sev="warn" if violations else "pass", + ) + + +def render_theme_cookie_inventory(state: dict) -> str: + rows, summary = build_cookie_inventory(state) + if summary["total"] == 0: + return "" + head = ( + f'

    ' + f'▶ Cookie-Inventar ({summary["total"]})

    ' + f'
    ' + f'{summary["declared"]} deklariert · ' + f'{summary["in_browser"]} im Browser · ' + f'{summary["undoc"]} UNDOC · ' + f'{summary["orph"]} ORPH · ' + f'{summary["ok"]} OK' + f' · {summary["third_country"]} Drittland' + f'
    ' + f'
    ' + f'Fehlende Pflichtangaben — Sitzland: {summary["missing_country"]}' + f' · Speicherdauer: {summary["missing_duration"]}' + f'
    ' + ) + show_rows = render_inventory_rows(rows[:50]) + body = table(inventory_headers(), show_rows) + if len(rows) > 50: + body += ( + f'

    ' + f'… und {len(rows) - 50} weitere

    ' + ) + sev = "fail" if summary["undoc"] else "warn" if summary["orph"] else "pass" + return card(head + body, sev=sev) + + +def render_sofortmassnahmen(state: dict) -> str: + """Aggregated bulk-recommendations: '1 Aktion fixt N Items'.""" + groups = group_by_action(state) + if not groups: + return "" + rows = [] + for g in groups: + items = g["items"] + sample = ", ".join(items[:5]) + more = f" + {len(items) - 5} weitere" if len(items) > 5 else "" + eff_sev = ("pass" if g["effort"] == "low" + else "warn" if g["effort"] == "med" else "fail") + rows.append([ + f'{g.get("icon") or "•"} {h(g["label"])}' + f'
    ' + f'{h(g.get("norm") or "")}
    ', + f'{g["count"]}', + f'
    ' + f'{h(sample)}{h(more)}
    ', + chip(g["effort"].upper(), eff_sev), + ]) + body = table(["Maßnahme", "Anz.", "Betrifft", "Aufwand"], rows) + return section( + f"🛠 Sofortmaßnahmen ({len(groups)})", + '

    ' + 'Eine Aktion behebt mehrere Findings auf einmal — nach Aufwand sortiert.' + '

    ' + body, + sev="warn", + anchor="sofortmassnahmen", + ) + + +def render_theme_retention(state: dict) -> str: + s = state.get("retention_theme_summary") or {} + findings = state.get("retention_findings") or [] + if not s.get("total"): + return "" + head = ( + f'

    ' + f'▶ Speicherdauer-Konsistenz (TH-RETENTION)

    ' + f'
    ' + f'{s["total"]} Cookies · ' + f'{s["passed"]} ✓ · ' + f'{s["failed"]} ✗ · ' + f'{s["incomplete"]} ?' + f'
    ' + ) + fails = [f for f in findings + if not f.get("matches") + and f.get("severity_reason") != "incomplete"][:5] + if not fails: + return card(head, sev="pass") + rows = [] + for f in fails: + sev = (f.get("severity") or "").upper() + sev_key = "fail" if sev == "HIGH" else "warn" + rows.append([ + f'{h(f.get("cookie_name") or "—")}', + h(f.get("vendor_name") or "—"), + h(f.get("mismatch_type") or ""), + chip(sev, sev_key), + ]) + body = table(["Cookie", "Vendor", "Mismatch", "Sev"], rows) + sev = "fail" if s.get("failed", 0) else "warn" + return card(head + body, sev=sev) + + +def render_theme_reachability(state: dict) -> str: + f = state.get("reachability_finding") or {} + if not f: + return "" + passed = f.get("passed") + sev_key = "pass" if passed else ( + "fail" if (f.get("severity") or "").upper() == "HIGH" else "warn") + notes_html = "".join( + f'
  • {h(n)}
  • ' + for n in (f.get("notes") or []) + ) + sub = ( + f'
      ' + f'{notes_html}
    ' if notes_html else "" + ) + head = ( + f'

    ' + f'▶ Mobile Reachability (COOKIE-CONSENT-UX-001)

    ' + f'
    {chip((f.get("severity") or "PASS").upper(), sev_key)} ' + f'{h(f.get("severity_reason") or "ok")}' + f'
    ' + ) + return card(head + sub, sev=sev_key) + + +def render_per_theme(state: dict) -> str: + parts = [ + render_theme_cookie_banner(state), + render_theme_cookie_inventory(state), + render_theme_retention(state), + render_theme_reachability(state), + ] + parts = [p for p in parts if p] + if not parts: + return "" + return section("🎯 5. Pro Thema", "".join(parts), anchor="per-theme") + + +# ── 6. Audit caveats ──────────────────────────────────────────── + +def render_caveats(state: dict) -> str: + fs = state.get("audit_quality_findings") or [] + if not fs: + return "" + items = [] + for f in fs: + sev = (f.get("severity") or "INFO").upper() + sev_key = ("fail" if sev == "HIGH" + else "warn" if sev == "MEDIUM" else "info") + title = h(f.get("title") or f.get("label") or "Vorbehalt") + msg = h(f.get("message") or f.get("hint") or "") + items.append(card( + f'{chip(sev, sev_key)} {title}' + f'
    {msg}
    ', + sev=sev_key, + )) + return section(f"⚠️ 6. Audit-Vorbehalte ({len(fs)})", + "".join(items), sev="warn", anchor="caveats") + + +# ── 7. Attachments ────────────────────────────────────────────── + +def render_attachments(state: dict) -> str: + slices = state.get("cookie_evidence_slices") or [] + if not slices: + return "" + meta = state.get("cookie_evidence_meta") or {} + n = len(slices) + body = ( + f'

    ' + f'Beweis-ZIP evidence-{h(state.get("check_id", "")[:8])}.zip ' + f'mit {n} Slice(s), ' + f'manifest.json + audit_metadata.json (SHA256 pro Slice).

    ' + f'

    ' + f'Quelle: {h(meta.get("url") or "—")}' + f'

    ' + ) + return section("📎 7. Anhänge", body, sev="info", anchor="attach") diff --git a/backend-compliance/compliance/services/mail_render_v2/_blocks_findings.py b/backend-compliance/compliance/services/mail_render_v2/_blocks_findings.py new file mode 100644 index 00000000..7e5b485e --- /dev/null +++ b/backend-compliance/compliance/services/mail_render_v2/_blocks_findings.py @@ -0,0 +1,290 @@ +"""Mail-V2 finding-bucket renderers. + +Separates FAIL items into three buckets — the user's design constraint: + + hard_fail public + evidence → 🔴 Kritische Befunde + manual_review public, no evidence → 🔍 Manuelle Prüfung + internal_reminder internal process → 💼 Reminder (NEVER a fail) + +The MC-DB stays as-is. If the LLM-Plausibility phase has already run +it stamps `c.llm_title` / `c.llm_recommendation` / `c.llm_severity` +onto each check; the renderer picks those up when present, otherwise +falls back to the original MC label verbatim. No question-form +rewriting here — that's the LLM-phase's job. 
+""" + +from __future__ import annotations + +from html import escape as h + +from ._actions import action_for_finding +from ._label_norm import classify_check +from ._scope_filter import ( + filter_out_of_scope, + get_last_drop_stats, +) +from ._style import ( + SZ_SMALL, + TEXT, + TEXT_MUTED, + card, + chip, + section, +) + + +def _strip_qmark(s: str) -> str: + """Normalise a string for dedup comparison.""" + return (s or "").strip().rstrip("?").strip().lower() + + +def _is_dup(a: str, b: str) -> bool: + """True when a and b carry essentially the same content.""" + aa = _strip_qmark(a) + bb = _strip_qmark(b) + if not aa or not bb: + return False + if aa == bb: + return True + short, long = sorted((aa, bb), key=len) + return short and short in long and len(short) > 30 + + +def _collect_three_buckets(state: dict) -> tuple[list[dict], list[dict], + list[dict]]: + """Split all FAIL items into the three buckets.""" + hard: list[dict] = [] + manual: list[dict] = [] + internal: list[dict] = [] + + business_scope = state.get("business_scope") or set() + for r in state.get("results") or []: + # Drop sector-specific MCs that don't apply to this business + scoped = filter_out_of_scope( + getattr(r, "checks", []) or [], business_scope, + ) + for c in scoped: + sev = (getattr(c, "severity", "") or "").upper() + # LLM-plausibility may downgrade — read llm_severity if set + llm_sev = (getattr(c, "llm_severity", "") or "").upper() + effective_sev = llm_sev or sev + if effective_sev not in ("CRITICAL", "HIGH", "MEDIUM"): + continue + if getattr(c, "passed", True) or getattr(c, "skipped", False): + continue + # LLM may flag a finding as not plausible → drop + if getattr(c, "llm_drop", False): + continue + bucket = classify_check(c) + raw_label = getattr(c, "label", "") + llm_title = getattr(c, "llm_title", "") or "" + llm_recommendation = getattr(c, "llm_recommendation", "") or "" + title = (llm_title or raw_label)[:200] + hint = (getattr(c, "hint", "") or "")[:500] + matched = 
(getattr(c, "matched_text", "") or "")[:400] + action = action_for_finding( + getattr(c, "id", ""), effective_sev, raw_label, hint, + ) + entry = { + "sev": effective_sev, + "id": getattr(c, "id", ""), + "title": title, + "raw_label": raw_label, + "hint": hint, + "matched": matched, + "llm_recommendation": llm_recommendation, + "doc": getattr(r, "label", ""), + "reg": getattr(c, "regulation", "") or "", + "action": action.to_dict() if action else None, + } + if bucket == "hard_fail" and effective_sev in ("CRITICAL", "HIGH"): + hard.append(entry) + elif bucket == "internal_reminder": + internal.append(entry) + else: + manual.append(entry) + + # B1 reachability (always hard if HIGH — directly observed) + rb1 = state.get("reachability_finding") or {} + if (rb1.get("severity") or "").upper() == "HIGH" and not rb1.get("passed"): + notes = " · ".join(rb1.get("notes") or []) + hard.append({ + "sev": "HIGH", + "id": rb1.get("check_id", "COOKIE-CONSENT-UX-001"), + "title": "Mobile Consent-Reachability — kein Reopen-Link im Footer", + "raw_label": "Mobile Consent-Reachability", + "hint": notes, + "matched": "Footer-Scan: 0 Reopen-Anchor", + "llm_recommendation": "", + "doc": "Website-Footer", + "reg": "DSGVO Art. 7 Abs. 
3", + "action": {"title": "Cookie-Einstellungen-Link im Footer ergänzen", + "target": "Website-Footer (alle Seiten)", + "detail": ("Footer-Link 'Cookie-Einstellungen' " + "ergänzen, der den CMP direkt öffnet."), + "effort": "low"}, + }) + + # B3 retention HIGH/MED fails (3-source evidence) + for f in (state.get("retention_findings") or []): + sev = (f.get("severity") or "").upper() + if sev not in ("HIGH", "MEDIUM") or f.get("matches"): + continue + cookie = f.get("cookie_name") or "—" + hard.append({ + "sev": sev, + "id": "TH-RETENTION", + "title": f"Speicherdauer-Konflikt für {cookie}", + "raw_label": "Cookie-Speicherdauer-Konsistenz", + "hint": (f"DSI {f.get('dsi_days')}d · Tabelle " + f"{f.get('table_days')}d · " + f"Realität {f.get('actual_days')}d"), + "matched": (f.get("dsi_sentence") or "")[:200], + "llm_recommendation": "", + "doc": "Cookie-Richtlinie", + "reg": "DSGVO Art. 13 Abs. 2 lit.a", + "action": {"title": ("DSE / Cookie-Tabelle korrigieren " + if "dsi" in (f.get("mismatch_type") or "") + else "Cookie-Lifetime reduzieren"), + "target": "DSE + Cookie-Tabelle", + "detail": f"Mismatch-Typ: {f.get('mismatch_type')}", + "effort": "low"}, + }) + + sev_rank = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2} + hard.sort(key=lambda x: (sev_rank.get(x["sev"], 9), x["title"])) + return hard, manual, internal + + +def count_critical(state: dict) -> int: + hard, _, _ = _collect_three_buckets(state) + return len(hard) + + +def count_manual(state: dict) -> int: + _, manual, _ = _collect_three_buckets(state) + return len(manual) + + +def count_internal(state: dict) -> int: + _, _, internal = _collect_three_buckets(state) + return len(internal) + + +def _render_finding_card(it: dict, *, sev_key: str = "fail") -> str: + head = ( + f'{chip(it["sev"], sev_key)}' + f'{h(it["title"])}' + ) + meta = ( + f'
    ' + f'{h(it["id"])} · {h(it["doc"])} · {h(it["reg"])}
    ' + ) + evidence = "" + if it.get("matched"): + evidence = ( + f'
    ' + f'Beobachtet: {h(it["matched"])}
    ' + ) + hint = "" + if it.get("hint") and not _is_dup(it.get("hint"), it.get("title")): + hint = ( + f'
    ' + f'{h(it["hint"])}
    ' + ) + action_html = "" + a = it.get("action") + if a: + # Skip action.target rendering when it duplicates the title + target = "" if _is_dup(a.get("target", ""), it.get("title")) \ + else a.get("target", "") + # Skip action.detail when it duplicates hint or title + detail = a.get("detail", "") + if _is_dup(detail, it.get("hint")) or _is_dup(detail, it.get("title")): + detail = "" + target_html = ( + f' ({h(target)})' + if target else "" + ) + detail_html = ( + f'
    {h(detail)}
    ' + if detail else "" + ) + action_html = ( + f'
    ' + f'→ {h(a["title"])}{target_html}' + f'{detail_html}' + f'
    ' + ) + llm_html = "" + if it.get("llm_recommendation"): + llm_html = ( + f'
    ' + f'🤖 LLM-Plausibility: ' + f'{h(it["llm_recommendation"])}
    ' + ) + return card(head + meta + evidence + hint + action_html + llm_html, + sev=sev_key) + + +def render_critical(state: dict) -> str: + hard, _, _ = _collect_three_buckets(state) + if not hard: + body = ( + '

    ' + 'Keine HIGH/CRITICAL-Befunde mit harter Evidenz im aktuellen Lauf.' + '

    ' + ) + return section("✅ 1. Kritische Befunde", body, sev="pass", + anchor="critical") + cards = [_render_finding_card(it, sev_key="fail") for it in hard] + intro = ('

    ' + 'Findings mit direkt beobachtbarer Evidenz (öffentliche Daten). ' + 'Pro Befund: Was wir geprüft haben · Beobachtung · Was zu tun ist.' + '

    ') + return section(f"🔴 1. Kritische Befunde ({len(hard)})", + intro + "".join(cards), sev="fail", anchor="critical") + + +def render_manual_review(state: dict) -> str: + _, manual, _ = _collect_three_buckets(state) + drop_stats = get_last_drop_stats() + if not manual: + if drop_stats.get("count"): + note = ('

    ' + f'Keine manuell zu prüfenden Punkte. ' + f'Branchen-spezifische MCs ausgefiltert: ' + f'{drop_stats["count"]} ' + f'({", ".join(f"{k}:{v}" for k,v in drop_stats["by_prefix"].items())})' + '

    ') + return section("✅ 2. Manuelle Prüfung", note, sev="pass", + anchor="manual") + return "" + cards = [_render_finding_card(it, sev_key="warn") for it in manual] + intro = ('

    ' + 'Diese Punkte sind öffentlich prüfbar, aber unser Audit konnte ' + 'sie nicht eindeutig feststellen — Hinweis: Original-MC-Frage. ' + 'Empfehlung: manuell beim Mandanten/DSB klären. ' + 'Die LLM-Plausibilitätsprüfung hilft Frage→Aussage zu wandeln ' + '(siehe 🤖-Block pro Finding falls schon gelaufen).

    ') + return section(f"🔍 2. Manuelle Prüfung erforderlich ({len(manual)})", + intro + "".join(cards), sev="warn", anchor="manual") + + +def render_internal_reminders(state: dict) -> str: + _, _, internal = _collect_three_buckets(state) + if not internal: + return "" + cards = [_render_finding_card(it, sev_key="info") for it in internal] + intro = ('

    ' + 'Interne Prozesse (TOM, DSFA, AVV, Löschkonzept, Schulungen, ' + 'Incident-Response, VVT) sind von außen nicht prüfbar. ' + 'Dies sind Reminder — kein Befund über die Website. ' + 'Beim Mandanten die Existenz + Aktualität der Dokumente verifizieren.' + '

    ') + return section(f"💼 3. Interne Prozesse — Reminder ({len(internal)})", + intro + "".join(cards), sev="info", anchor="internal") diff --git a/backend-compliance/compliance/services/mail_render_v2/_compose.py b/backend-compliance/compliance/services/mail_render_v2/_compose.py new file mode 100644 index 00000000..502933bf --- /dev/null +++ b/backend-compliance/compliance/services/mail_render_v2/_compose.py @@ -0,0 +1,64 @@ +"""Mail-V2 compose — single entrypoint that returns the full HTML. + +Call `compose_v2(state)` from the email-dispatch phase when +`MAIL_RENDER_V2=true`. Default remains the legacy compose so we can +A/B in Mailpit. +""" + +from __future__ import annotations + +import os + +from ._blocks import ( + render_attachments, + render_caveats, + render_header, + render_per_doc, + render_per_theme, + render_sofortmassnahmen, + render_toc, +) +from ._blocks_findings import ( + render_critical, + render_internal_reminders, + render_manual_review, +) +from ._legacy_wrappers import render_all_legacy +from ._style import page_close, page_open + + +def compose_v2(state: dict) -> str: + """Build the full audit-mail HTML in the V2 layout.""" + site = state.get("site_name") or "—" + parts = [ + page_open(site), + render_header(state), + render_toc(state), + render_critical(state), + render_manual_review(state), + render_internal_reminders(state), + render_sofortmassnahmen(state), + render_per_doc(state), + render_per_theme(state), + # B4 — Cross-Doc Vendor-Consistency (Elli Vertex↔Iadvize pattern) + state.get("vendor_consistency_html", ""), + # B5 — AI-Act Art. 50 Transparenzpflicht + state.get("ai_act_html", ""), + # B6/B7/B8 — DPO-cross-doc + Doc-Staleness + CMP-fingerprint + state.get("extra_findings_html", ""), + # All legacy build_*_html() wrapped in V2 sections — preserves + # every information block from the old renderer (Exec Summary, + # Banner-Screenshot, VVT, Redundancy, Solutions, Diff, etc.) 
+ render_all_legacy(state), + render_caveats(state), + render_attachments(state), + page_close(state.get("check_id", ""), + os.environ.get("BUILD_SHA", "unknown")), + ] + return "".join(p for p in parts if p) + + +def is_v2_enabled() -> bool: + return os.environ.get("MAIL_RENDER_V2", "false").lower() in ( + "true", "1", "yes", "on", + ) diff --git a/backend-compliance/compliance/services/mail_render_v2/_cookie_inventory.py b/backend-compliance/compliance/services/mail_render_v2/_cookie_inventory.py new file mode 100644 index 00000000..e8e9a17c --- /dev/null +++ b/backend-compliance/compliance/services/mail_render_v2/_cookie_inventory.py @@ -0,0 +1,267 @@ +"""Mail-V2 Cookie-Inventar — single table with per-cookie status + action. + +Merges three sources: + + - declared in DSE / cookie-table (state["cmp_vendors"][i]["cookies"]) + - live in browser (state["banner_result"]["cookies_detailed"]) + - cookie_audit comparison (state["cookie_audit"]: declared/undocumented) + +Status hierarchy per cookie: + + UNDOC — in browser, NOT in declared list HIGH + MISMATCH — declared with different category/duration MED + ORPH — declared, NOT in browser LOW + OK — declared + in browser, values agree PASS + +Per-row fields (each `❌` when not ascertainable): + name, vendor, category, duration, retention_grounds, country, + third_country (bool), processing_company, sources, status, action +""" + +from __future__ import annotations + +from html import escape as h + +from ._style import chip + + +# EU + EWR + CH — no third-country transfer. +EU_EEA_CH = { + "DE", "AT", "BE", "BG", "HR", "CY", "CZ", "DK", "EE", "FI", + "FR", "GR", "HU", "IE", "IT", "LV", "LT", "LU", "MT", "NL", + "PL", "PT", "RO", "SK", "SI", "ES", "SE", + "IS", "LI", "NO", "CH", +} +# Adequacy decisions (limited list — most relevant in cookie context). 
+ADEQUACY = {"US", "UK", "JP", "KR", "IL", "CA", "NZ", "AR", "UY", "AD"} + + +def _norm(s: str | None) -> str: + return (s or "").strip().lower() + + +def _missing(value: str | None) -> bool: + if value is None: + return True + v = str(value).strip() + if not v: + return True + return v.lower() in ("—", "?", "unknown", "n/a", "tbd") + + +def _x_or(value: str | None) -> str: + """Render `❌` when the value is missing, else escape + return.""" + if _missing(value): + return '' + return h(str(value)) + + +def _country_third(country: str | None) -> tuple[str, bool, str | None]: + """Return (display, is_third_country, adequacy_tag). + + is_third_country=True when outside EU/EEA/CH. + adequacy_tag e.g. "DPF" or None. + """ + if _missing(country): + return ("", False, None) + code = (country or "").strip().upper() + # accept "Germany" → "DE" via crude mapping for the most common names + name_map = { + "DEUTSCHLAND": "DE", "GERMANY": "DE", "IRELAND": "IE", "IRLAND": "IE", + "USA": "US", "UNITED STATES": "US", + } + code = name_map.get(code, code) + if code in EU_EEA_CH: + return (code, False, None) + tag = "DPF" if code in ADEQUACY else "RISK" + return (code, True, tag) + + +def _src_chip(in_dse: bool, in_table: bool, in_browser: bool, + in_ocr: bool) -> str: + parts: list[str] = [] + if in_dse: + parts.append("DSE") + if in_table: + parts.append("Tabelle") + if in_ocr: + parts.append("OCR") + if in_browser: + parts.append("Browser") + return " · ".join(parts) if parts else "—" + + +def _build_status(declared: bool, in_browser: bool, + cookie_audit_undeclared: set, + cookie_audit_compliant: set, + name_lc: str) -> tuple[str, str]: + if name_lc in cookie_audit_undeclared or (in_browser and not declared): + return "UNDOC", "fail" + if declared and not in_browser: + return "ORPH", "warn" + if declared and in_browser: + return "OK", "pass" + return "—", "info" + + +def build_cookie_inventory(state: dict) -> tuple[list[dict], dict]: + """Build the merged inventory + summary.""" 
+ cmp_vendors = state.get("cmp_vendors") or [] + banner = state.get("banner_result") or {} + cookies_detailed = banner.get("cookies_detailed") or [] + cookie_audit = state.get("cookie_audit") or {} + + # 1) Declared + declared: dict[str, dict] = {} + for v in cmp_vendors: + vname = (v.get("name") or "").strip() + vcountry = (v.get("country") or "").strip() + vproc = (v.get("processing_company") or "").strip() + vretention = (v.get("persistence") or "").strip() # vendor-level + src = (v.get("source") or "").lower() + in_dse = "dse" in src or "table_crawled" in src + in_table = ("table" in src or "pasted" in src + or "html_table" in src) + in_ocr = "tesseract" in src or "ocr" in src + for c in (v.get("cookies") or []): + cname = (c.get("name") or "").strip() + if not cname: + continue + key = _norm(cname) + entry = declared.setdefault(key, { + "name": cname, + "vendor": vname, + "category": "", + "duration": "", + "retention_grounds": "", + "country": vcountry, + "processing_company": vproc, + "in_dse": False, + "in_table": False, + "in_ocr": False, + }) + entry["category"] = (entry["category"] + or (c.get("category") or "").strip()) + entry["duration"] = (entry["duration"] + or (c.get("duration") + or c.get("persistence") or "").strip()) + # cookie-level overrides if richer + if not entry["country"] and vcountry: + entry["country"] = vcountry + if not entry["processing_company"] and vproc: + entry["processing_company"] = vproc + if not entry["retention_grounds"] and vretention: + entry["retention_grounds"] = vretention + entry["in_dse"] = entry["in_dse"] or in_dse + entry["in_table"] = entry["in_table"] or in_table + entry["in_ocr"] = entry["in_ocr"] or in_ocr + + # 2) Browser + browser: dict[str, dict] = {} + for c in cookies_detailed: + cname = (c.get("name") or "").strip() + if not cname: + continue + browser[_norm(cname)] = c + + # 3) cookie_audit hints + undeclared_set: set = { + _norm((c.get("name") if isinstance(c, dict) else c) or "") + for c in 
(cookie_audit.get("undeclared_in_browser") or []) + } + compliant_set: set = { + _norm((c.get("name") if isinstance(c, dict) else c) or "") + for c in (cookie_audit.get("compliant") or []) + } + + all_keys = set(declared.keys()) | set(browser.keys()) + rows: list[dict] = [] + for key in sorted(all_keys): + d = declared.get(key) or {} + b = browser.get(key) or {} + name = d.get("name") or b.get("name") or key + vendor = (d.get("vendor") + or b.get("domain") or "").strip() or "" + country = d.get("country", "") + country_display, is_third, adq = _country_third(country) + in_browser = key in browser + is_declared = key in declared + status, sev = _build_status( + is_declared, in_browser, undeclared_set, compliant_set, key, + ) + sources = _src_chip( + d.get("in_dse", False), + d.get("in_table", False), + in_browser, + d.get("in_ocr", False), + ) + rows.append({ + "name": name, + "vendor": vendor, + "category": d.get("category", ""), + "duration": d.get("duration", ""), + "retention_grounds": d.get("retention_grounds", ""), + "country": country_display, + "third_country": is_third, + "third_country_tag": adq, + "processing_company": d.get("processing_company", ""), + "sources": sources, + "status_code": status, + "status_sev": sev, + "declared": is_declared, + "in_browser": in_browser, + }) + + order = {"UNDOC": 0, "MISMATCH": 1, "ORPH": 2, "OK": 3, "—": 4} + rows.sort(key=lambda r: (order.get(r["status_code"], 9), + r["name"].lower())) + + summary = { + "total": len(rows), + "ok": sum(1 for r in rows if r["status_code"] == "OK"), + "undoc": sum(1 for r in rows if r["status_code"] == "UNDOC"), + "orph": sum(1 for r in rows if r["status_code"] == "ORPH"), + "mismatch": sum(1 for r in rows if r["status_code"] == "MISMATCH"), + "declared": sum(1 for r in rows if r["declared"]), + "in_browser": sum(1 for r in rows if r["in_browser"]), + "third_country": sum(1 for r in rows if r["third_country"]), + "missing_country": sum(1 for r in rows if _missing(r["country"])), + 
"missing_duration": sum(1 for r in rows if _missing(r["duration"])), + } + return rows, summary + + +def render_inventory_rows(rows: list[dict]) -> list[list[str]]: + """Cell-rows for `_style.table`. + + Columns: Name | Vendor | Kat | Speicherdauer | Löschfrist | + Sitzland | Verantwortlich | Quelle | Status + """ + out: list[list[str]] = [] + for r in rows: + country_html = _x_or(r["country"]) + if r["third_country"]: + tag = r.get("third_country_tag") or "RISK" + tag_color = "#92400e" if tag == "DPF" else "#dc2626" + country_html += ( + f' [{tag}]' + ) + out.append([ + f'{h(r["name"])}', + h(r["vendor"]) if r["vendor"] else + '', + _x_or(r["category"]), + _x_or(r["duration"]), + _x_or(r["retention_grounds"]), + country_html, + _x_or(r["processing_company"]), + h(r["sources"]), + chip(r["status_code"], r["status_sev"]), + ]) + return out + + +def inventory_headers() -> list[str]: + return ["Name", "Vendor", "Kat.", "Speicherdauer", "Löschfrist", + "Sitzland", "Verantwortlich", "Quelle", "Status"] diff --git a/backend-compliance/compliance/services/mail_render_v2/_label_norm.py b/backend-compliance/compliance/services/mail_render_v2/_label_norm.py new file mode 100644 index 00000000..c7df7d34 --- /dev/null +++ b/backend-compliance/compliance/services/mail_render_v2/_label_norm.py @@ -0,0 +1,113 @@ +"""Mail-V2 label normalizer — turn MC questions into statements. + +Historic MC labels read like compliance-officer checklists: + "Dokumentiert die Datenschutzinformation alle Datenübermittlungen + gemäß Art. 49 Abs. 1 Unterabs. 2 DS-GVO?" + +In the audit mail that looks like "we don't know" — unhelpful. +This module rewrites the label as a statement of WHAT WAS CHECKED +so the reader gets a topic, not a question: + "Drittland-Übermittlungen Art. 49 Abs. 1 Unterabs. 2 DS-GVO" + +The transformation is purely textual; the underlying MC stays as is. +""" + +from __future__ import annotations + +import re + +# Question-stem → topic-prefix rewrites, applied in order. 
_REWRITES: list[tuple[re.Pattern, str]] = [
    (re.compile(r"^Dokumentiert\s+die\s+(.+?)\s+(.+?)\?$", re.IGNORECASE),
     r"\2"),
    (re.compile(r"^Werden\s+(.+?)\s+dokumentiert\?$", re.IGNORECASE),
     r"\1 dokumentieren"),
    (re.compile(r"^Wird\s+(.+?)\s+benannt\?$", re.IGNORECASE),
     r"\1 benennen"),
    (re.compile(r"^Ist\s+(.+?)\s+angegeben\?$", re.IGNORECASE),
     r"\1 angeben"),
    # Any definite article may precede the document noun. Accepting only
    # "die" turned "Enthält das Impressum X?" into "X in das", because the
    # article itself was captured as the noun.
    (re.compile(
        r"^Enthält\s+(?:(?:die|das|der|den|dem)\s+)?(.+?)\s+(.+?)\?$",
        re.IGNORECASE),
     r"\2 in \1"),
    (re.compile(r"^Sind\s+(.+?)\s+vorhanden\?$", re.IGNORECASE),
     r"\1 prüfen"),
    (re.compile(r"^Gibt\s+es\s+(.+?)\?$", re.IGNORECASE),
     r"\1 prüfen"),
]


def label_as_statement(label: str) -> str:
    """Rewrite a question-form label as a topic statement.

    The label is stripped; anything that does not end in "?" is returned
    as is. Otherwise the first matching entry of `_REWRITES` is applied.
    If none matches, a leading question verb and the trailing "?" are
    dropped. The first character of the result is upper-cased.

    Args:
        label: The raw MC label; may be empty or None.

    Returns:
        The rewritten statement. A falsy `label` is returned unchanged,
        and the stripped label is returned when a rewrite would leave
        nothing.
    """
    if not label:
        return label
    s = label.strip()
    if not s.endswith("?"):
        return s
    for pat, repl in _REWRITES:
        m = pat.match(s)
        if m:
            # Every pattern is anchored with ^…$, so expanding the match
            # yields the same text as substituting over the whole string.
            out = m.expand(repl).strip()
            # First word capitalised
            return out[:1].upper() + out[1:] if out else s
    # Generic fallback: drop the question mark + leading "Wird/Sind/Ist".
    # The final strip() removes the blank left behind by "… ?".
    s2 = re.sub(r"^\s*(Wird|Sind|Ist|Werden|Gibt es|Enthält|Hat)\s+",
                "", s, flags=re.IGNORECASE).rstrip("?").strip()
    return s2[:1].upper() + s2[1:] if s2 else s


def has_evidence(check) -> bool:
    """Decide whether an MC check has real evidence backing the FAIL.

    A FAIL with non-empty `matched_text` counts as a hard fail. A FAIL
    whose `matched_text` is missing, empty or whitespace-only is treated
    as "could not confirm" and belongs in the manual-review bucket.

    Args:
        check: Any object; only its optional `matched_text` attribute
            is read.

    Returns:
        True when `matched_text` contains non-whitespace characters.
    """
    matched = getattr(check, "matched_text", "") or ""
    return bool(matched.strip())


# Keywords that indicate a check is about an INTERNAL process the
# auditor cannot observe from outside (TOM, DSFA, AVV, training,
# incident response, risk analysis, deletion concept). These are
# never findings — they are reminders that the DPO/DSB must verify
# the document/process exists internally.
+_INTERNAL_KEYWORDS = ( + "tom", "technisch-organisatorische", "technisch organisatorische", + "dsfa", "datenschutz-folgenabschätzung", + "datenschutzfolgenabschätzung", + "schulung", "training", "awareness", + "avv", "auftragsverarbeitungsvertrag", "auftragsverarbeitung", + "incident", "vorfall", "meldepflicht intern", + "risikoanalyse", "risikobewertung", "risk assessment", + "löschkonzept", "löschfristen-konzept", + "vvt", "verzeichnis der verarbeitungstätigkeiten", + "dsb-bestellung", "dsb bestellung", + "verfahrensverzeichnis", "berichtigungskonzept", + "betroffenenrechte-prozess", "dsr-prozess", +) + + +def is_internal_process(check) -> bool: + """Decide whether the MC check is about an internal process.""" + label = (getattr(check, "label", "") or "").lower() + cid = (getattr(check, "id", "") or "").lower() + hint = (getattr(check, "hint", "") or "").lower() + # mc_audit_type module may have annotated the check + audit_type = getattr(check, "audit_type", "") + if audit_type and audit_type in ("internal", "process", "documentation"): + return True + hay = f"{label} {cid} {hint}" + return any(k in hay for k in _INTERNAL_KEYWORDS) + + +def classify_check(check) -> str: + """Return one of: 'hard_fail' | 'manual_review' | 'internal_reminder'. + + Only call on FAIL checks (passed=False, skipped=False). Drives + which bucket the check renders into. + """ + if is_internal_process(check): + return "internal_reminder" + if has_evidence(check): + return "hard_fail" + return "manual_review" diff --git a/backend-compliance/compliance/services/mail_render_v2/_legacy_wrappers.py b/backend-compliance/compliance/services/mail_render_v2/_legacy_wrappers.py new file mode 100644 index 00000000..bc1a6b40 --- /dev/null +++ b/backend-compliance/compliance/services/mail_render_v2/_legacy_wrappers.py @@ -0,0 +1,446 @@ +"""Mail-V2 legacy wrappers — wrap each existing build_*_html() in V2 shell. 
+ +The original step-5 had 24+ render functions, each emitting standalone +HTML with their own styles. V2 keeps all the information by wrapping +each output in a consistent V2 `section()` container with stripe + +palette. The block-level styling normalizes; the inner data tables/ +lists keep their legacy markup so we don't lose detail. + +Each wrapper is defensive: missing data, import errors, or empty +HTML → return "" so the section disappears rather than crashing. +""" + +from __future__ import annotations + +import logging + +from ._style import section + +logger = logging.getLogger(__name__) + + +def _safe_wrap(label: str, anchor: str, html: str, + *, sev: str = "info") -> str: + if not html or not html.strip(): + return "" + return section(label, html, sev=sev, anchor=anchor) + + +# ── Tier 1 (Sales-critical) ────────────────────────────────────── + +def render_executive_summary(state: dict) -> str: + """P82 GF-1-Pager + P1 Exec-Summary combined as 'Executive Summary'.""" + parts: list[str] = [] + req = state.get("req") + try: + from compliance.services.gf_one_pager import build_gf_one_pager_html + html = build_gf_one_pager_html( + site_name=state.get("site_name") or "", + scorecard=state.get("scorecard") or {}, + previous_scorecard=state.get("prev_scorecard"), + banner_result=state.get("banner_result"), + library_mismatch_findings=state.get("mismatches") or [], + scan_context=getattr(req, "scan_context", None) if req else None, + audit_quality_findings=state.get("audit_quality_findings") or [], + ) + if html and html.strip(): + parts.append(html) + except Exception as e: + logger.warning("gf_one_pager wrapper: %s", e) + try: + from compliance.api.agent_doc_check_exec_summary import ( + build_exec_summary_html, + ) + html = build_exec_summary_html( + scorecard=state.get("scorecard") or {}, + previous_scorecard=state.get("prev_scorecard"), + cmp_vendors=state.get("cmp_vendors") or [], + redundancy_report=state.get("redundancy_report"), + 
site_name=state.get("site_name") or "", + ) + if html and html.strip(): + parts.append(html) + except Exception as e: + logger.warning("exec_summary wrapper: %s", e) + return _safe_wrap("💼 Executive Summary", "exec", + "".join(parts), sev="info") + + +def render_banner_screenshot(state: dict) -> str: + """P85 — Banner-Screenshot as visual proof.""" + try: + from compliance.services.banner_screenshot_block import ( + build_banner_screenshot_html, + ) + html = build_banner_screenshot_html(state.get("banner_result")) + return _safe_wrap("📸 Banner-Screenshot", "banner-shot", + html, sev="info") + except Exception as e: + logger.warning("banner_screenshot wrapper: %s", e) + return "" + + +def render_vvt(state: dict) -> str: + """VVT-Tabelle nach Art. 30 DSGVO — Verarbeitungstätigkeiten.""" + try: + from compliance.api.agent_doc_check_extras import ( + build_vvt_table_html, + ) + html = build_vvt_table_html(state.get("cmp_vendors") or []) + return _safe_wrap("📋 VVT — Verarbeitungstätigkeiten (Art. 
30 DSGVO)", + "vvt", html, sev="info") + except Exception as e: + logger.warning("vvt wrapper: %s", e) + return "" + + +def render_redundancy(state: dict) -> str: + """O4 — Vendor-Redundanz + EU-Alternativen + Cost-Savings.""" + try: + from compliance.api.agent_doc_check_redundancy import ( + build_redundancy_html, + ) + html = build_redundancy_html(state.get("redundancy_report")) + return _safe_wrap("💰 Optimierungspotenzial (Redundanz / EU-Alt.)", + "redundancy", html, sev="warn") + except Exception as e: + logger.warning("redundancy wrapper: %s", e) + return "" + + +def render_diff(state: dict) -> str: + """P84 — Diff-Mode: Veränderung seit letztem Lauf.""" + try: + from compliance.services.run_diff import ( + build_diff_block_html, compute_diff, + ) + from database import SessionLocal + db = SessionLocal() + try: + diff = compute_diff( + db, state["check_id"], state.get("domain_for_exec") or "", + state.get("banner_result"), state.get("scorecard"), + ) + html = build_diff_block_html(diff) if diff else "" + finally: + db.close() + return _safe_wrap("📊 Veränderung seit letztem Lauf", + "diff", html, sev="info") + except Exception as e: + logger.warning("diff wrapper: %s", e) + return "" + + +def render_scope_disclaimer(state: dict) -> str: + """P62 — Was wir prüfen, was wir nicht prüfen können.""" + try: + from compliance.api.scope_disclaimer import build_scope_disclaimer_html + html = build_scope_disclaimer_html() + return _safe_wrap("🔍 Prüfumfang & Methodische Hinweise", + "scope", html, sev="info") + except Exception as e: + logger.warning("scope_disclaimer wrapper: %s", e) + return "" + + +# ── Tier 2 (Audit-detail) ───────────────────────────────────────── + +def render_banner_deep(state: dict) -> str: + """Banner-Deep: Phases + Quality-Score + Per-Category-Tracker.""" + try: + from compliance.api.agent_doc_check_banner import ( + build_banner_deep_html, + ) + html = build_banner_deep_html(state.get("banner_result")) + return _safe_wrap("🍪 
Banner-Tiefenanalyse (Phasen + Kategorien)", + "banner-deep", html, sev="info") + except Exception as e: + logger.warning("banner_deep wrapper: %s", e) + return "" + + +def render_cookie_audit(state: dict) -> str: + """Cookie 3-Quellen-Audit (deklariert ↔ Browser ↔ Library).""" + try: + from compliance.services.cookie_compliance_audit import ( + build_cookie_audit_block_html, + ) + html = build_cookie_audit_block_html(state.get("cookie_audit") or {}) + return _safe_wrap("🔬 Cookie-Audit (3-Quellen-Vergleich)", + "cookie-audit", html, sev="warn") + except Exception as e: + logger.warning("cookie_audit wrapper: %s", e) + return "" + + +def render_solutions(state: dict) -> str: + """P73 — LLM-Lösungsvorschläge pro HIGH-Fail.""" + try: + from compliance.services.mc_solution_generator import ( + build_solutions_block_html, + ) + html = build_solutions_block_html(state.get("mc_solutions") or []) + return _safe_wrap("🎯 LLM-Lösungsvorschläge (P73)", + "solutions", html, sev="info") + except Exception as e: + logger.warning("solutions wrapper: %s", e) + return "" + + +def render_cookie_architecture(state: dict) -> str: + """P10 — Cookie-Policy-Architecture (BMW-Pattern, layered separation).""" + try: + from compliance.services.cookie_policy_architecture import ( + build_architecture_html, + ) + html = build_architecture_html(state.get("cookie_architecture") or {}) + return _safe_wrap("🏗 Cookie-Policy-Architektur", + "cookie-arch", html, sev="info") + except Exception as e: + logger.warning("cookie_architecture wrapper: %s", e) + return "" + + +def render_library_mismatch(state: dict) -> str: + """P102 — Cookie-Klassifikations-Pruefung gegen Library.""" + try: + from compliance.services.cookie_library_mismatch import ( + build_mismatch_block_html, + ) + html = build_mismatch_block_html(state.get("mismatches") or []) + return _safe_wrap("⚖️ Cookie-Klassifikation gegen Library (P102)", + "lib-mismatch", html, sev="warn") + except Exception as e: + 
logger.warning("library_mismatch wrapper: %s", e) + return "" + + +def render_banner_consistency(state: dict) -> str: + """P92/P94 — Banner-Konsistenz / CMP-Health.""" + try: + from compliance.services.banner_consistency_checks import ( + build_consistency_block_html, + ) + html = build_consistency_block_html( + state.get("consistency_findings") or []) + return _safe_wrap("🧩 Banner-Konsistenz + CMP-Health", + "banner-consistency", html, sev="warn") + except Exception as e: + logger.warning("banner_consistency wrapper: %s", e) + return "" + + +def render_signals(state: dict) -> str: + """P35/P77/P78 — Save-Label, Cookies-in-DSE, JC-Klausel.""" + try: + from compliance.services.doc_text_signals import ( + build_signals_block_html, + ) + html = build_signals_block_html(state.get("signal_findings") or []) + return _safe_wrap("🚩 Doc-Text-Signale (P35/P77/P78)", + "signals", html, sev="info") + except Exception as e: + logger.warning("signals wrapper: %s", e) + return "" + + +def render_scorecard_regulation(state: dict) -> str: + """MC-Scorecard per Regulation (DSGVO/TDDDG/BGB-Split).""" + try: + from compliance.api.agent_doc_check_scorecard import ( + build_scorecard_html, + ) + html = build_scorecard_html( + state.get("scorecard") or {}, + previous_scorecard=state.get("prev_scorecard"), + ) + return _safe_wrap("📊 Compliance-Scorecard pro Regulation", + "scorecard", html, sev="info") + except Exception as e: + logger.warning("scorecard wrapper: %s", e) + return "" + + +def render_profile_html(state: dict) -> str: + """Erkanntes Geschäftsmodell.""" + try: + from compliance.api.agent_doc_check_report import build_profile_html + html = build_profile_html(state.get("profile")) + return _safe_wrap("🏢 Erkanntes Geschäftsmodell", + "profile", html, sev="info") + except Exception as e: + logger.warning("profile wrapper: %s", e) + return "" + + +def render_input_warnings(state: dict) -> str: + """Doc-Input-Warnings: User Text in falsches Feld gepastet.""" + try: + from 
compliance.services.doc_input_warnings import ( + build_warnings_block_html, + ) + warns = state.get("input_warnings") or [] + html = build_warnings_block_html(warns) if warns else "" + return _safe_wrap("⚠️ Eingabe-Warnungen", + "input-warn", html, sev="warn") + except Exception as e: + logger.warning("input_warnings wrapper: %s", e) + return "" + + +# ── Tier 3 (Cookie-deep + advisory) ─────────────────────────────── + +def render_entropy(state: dict) -> str: + """P103 — Cookie-Value-Entropy.""" + try: + from compliance.services.cookie_value_entropy import ( + build_entropy_block_html, + ) + html = build_entropy_block_html(state.get("entropy_findings") or []) + return _safe_wrap("🎲 Cookie-Entropy-Anomalien (P103)", + "entropy", html, sev="info") + except Exception as e: + logger.warning("entropy wrapper: %s", e) + return "" + + +def render_network_trace(state: dict) -> str: + """P104 — Network-Tracing.""" + try: + from compliance.services.cookie_network_tracer import ( + build_network_trace_block_html, + ) + html = build_network_trace_block_html( + state.get("network_findings") or []) + return _safe_wrap("🌐 Network-Tracing (P104)", + "network", html, sev="info") + except Exception as e: + logger.warning("network_trace wrapper: %s", e) + return "" + + +def render_tcf_authority(state: dict) -> str: + """P105 — IAB TCF Authority Cross-Reference.""" + try: + from compliance.services.tcf_vendor_authority import ( + build_tcf_authority_block_html, + ) + html = build_tcf_authority_block_html( + state.get("tcf_authority_findings") or []) + return _safe_wrap("🆔 IAB TCF Vendor Authority (P105)", + "tcf-auth", html, sev="info") + except Exception as e: + logger.warning("tcf_authority wrapper: %s", e) + return "" + + +def render_jc_avv(state: dict) -> str: + """P71 — JC-vs-AVV Entscheidungsbaum.""" + try: + from compliance.services.jc_avv_decision import ( + build_jc_avv_decision_html, + ) + html = build_jc_avv_decision_html( + (state.get("doc_texts") or {}).get("dse")) + 
return _safe_wrap("⚖️ Joint Controller vs. AVV — Entscheidung (P71)", + "jc-avv", html, sev="info") + except Exception as e: + logger.warning("jc_avv wrapper: %s", e) + return "" + + +def render_industry_context(state: dict) -> str: + """P6/53/55 — Branchen-Kontext + Site-History.""" + try: + from compliance.services.industry_library import ( + build_industry_context_block_html, + ) + ind = None + req = state.get("req") + if req and getattr(req, "scan_context", None): + ind = req.scan_context.get("industry") + html = build_industry_context_block_html( + ind, state.get("site_profile")) + return _safe_wrap("🏭 Branchen-Kontext + Historie", + "industry", html, sev="info") + except Exception as e: + logger.warning("industry_context wrapper: %s", e) + return "" + + +def render_benchmark(state: dict) -> str: + """P86 — Branchen-Benchmark.""" + try: + from compliance.services.industry_benchmark import ( + build_benchmark_html, + ) + html = build_benchmark_html(state.get("benchmark") or {}) + return _safe_wrap("📈 Branchen-Benchmark (P86)", + "bench", html, sev="info") + except Exception as e: + logger.warning("benchmark wrapper: %s", e) + return "" + + +def render_scanned_urls(state: dict) -> str: + """Quellen-Transparenz: welche URLs wurden gecrawlt.""" + try: + from compliance.api.agent_doc_check_report import ( + build_scanned_urls_html, + ) + html = build_scanned_urls_html(state.get("doc_entries") or []) + return _safe_wrap("🔗 Geprüfte URLs (Quellen-Transparenz)", + "scanned-urls", html, sev="info") + except Exception as e: + logger.warning("scanned_urls wrapper: %s", e) + return "" + + +def render_management_summary(state: dict) -> str: + """Konkrete Aufgaben für die Geschäftsführung.""" + try: + from compliance.api.agent_doc_check_report import ( + build_management_summary, + ) + html = build_management_summary(state.get("results") or []) + return _safe_wrap("📝 Management-Zusammenfassung", + "mgmt", html, sev="info") + except Exception as e: + 
logger.warning("management_summary wrapper: %s", e) + return "" + + +# ── Render the whole legacy block region ──────────────────────── + +def render_all_legacy(state: dict) -> str: + """Render every legacy block in the canonical order.""" + return "".join([ + # Tier 1 (Sales) + render_executive_summary(state), + render_diff(state), + render_solutions(state), + render_redundancy(state), + render_vvt(state), + render_banner_screenshot(state), + # Tier 2 (Audit-detail) + render_scorecard_regulation(state), + render_banner_deep(state), + render_banner_consistency(state), + render_cookie_audit(state), + render_cookie_architecture(state), + render_library_mismatch(state), + render_signals(state), + render_profile_html(state), + render_input_warnings(state), + # Tier 3 (advisory) + render_entropy(state), + render_network_trace(state), + render_tcf_authority(state), + render_jc_avv(state), + render_industry_context(state), + render_benchmark(state), + render_scanned_urls(state), + render_management_summary(state), + # Scope-Disclaimer last — footer-ish + render_scope_disclaimer(state), + ]) diff --git a/backend-compliance/compliance/services/mail_render_v2/_scope_filter.py b/backend-compliance/compliance/services/mail_render_v2/_scope_filter.py new file mode 100644 index 00000000..80f64ab2 --- /dev/null +++ b/backend-compliance/compliance/services/mail_render_v2/_scope_filter.py @@ -0,0 +1,88 @@ +"""Mail-V2 scope filter — drop MC findings that don't apply. + +Some MC-DB entries are sector-specific (FIN = financial services, +GOV = public authority, MED = healthcare, INS = insurance, EDU = +education, LEG = legal profession). They have no business surfacing +for a normal B2C company like Elli (energy/EV charging). + +This filter inspects the MC ID prefix and, when the prefix denotes +a sector that doesn't match the detected `business_scope`, drops the +check from the V2 finding renderers. 
+ +The MC pipeline itself is unchanged — MCs are still evaluated; we +just suppress them in the report when out of scope. Set +`KEEP_OOS_MCS=true` in the env to disable the filter (useful for +DSB debug runs). +""" + +from __future__ import annotations + +import os + +# Prefix -> sector token expected in business_scope to KEEP the check. +SECTOR_PREFIXES: dict[str, set[str]] = { + "FIN": {"financial_services", "bank", "bafin", "fintech", + "payment_provider"}, + "GOV": {"public_authority", "government", "behoerde"}, + "MED": {"healthcare", "medical", "pharma", "klinik"}, + "INS": {"insurance", "versicherung"}, + "EDU": {"education", "schule", "hochschule", "university"}, + "LEG": {"legal_profession", "anwaltskammer", "kanzlei"}, + "REL": {"church", "religion", "religious"}, + "POL": {"political_party", "partei"}, +} + +# Cheap counter so the renderer can show "X MCs gefiltert (out of scope)". +_LAST_DROPPED: dict[str, int] = {"count": 0, "by_prefix": {}} + + +def _enabled() -> bool: + return os.environ.get("KEEP_OOS_MCS", "false").lower() not in ( + "true", "1", "yes", "on", + ) + + +def _extract_prefix(check_id: str) -> str | None: + """Return the sector prefix (e.g. 
'FIN') from mc-FIN-814-A03.""" + if not check_id: + return None + parts = check_id.split("-") + # mc-XXX-NNN-AYY → parts = ["mc", "XXX", "NNN", "AYY"] + if len(parts) >= 2 and parts[0].lower() == "mc": + prefix = parts[1].upper() + if prefix in SECTOR_PREFIXES: + return prefix + return None + + +def is_out_of_scope(check, business_scope: set[str] | None) -> bool: + """Decide whether the check is sector-specific AND out of scope.""" + if not _enabled(): + return False + prefix = _extract_prefix(getattr(check, "id", "") or "") + if not prefix: + return False + required = SECTOR_PREFIXES.get(prefix) or set() + scope_lc = {s.lower() for s in (business_scope or set())} + return not (scope_lc & required) + + +def filter_out_of_scope(checks, business_scope: set[str] | None) -> list: + """Return `checks` with out-of-scope items removed; mutates counter.""" + _LAST_DROPPED["count"] = 0 + _LAST_DROPPED["by_prefix"] = {} + out = [] + for c in checks: + if is_out_of_scope(c, business_scope): + _LAST_DROPPED["count"] += 1 + prefix = _extract_prefix(getattr(c, "id", "") or "") or "?" + _LAST_DROPPED["by_prefix"][prefix] = ( + _LAST_DROPPED["by_prefix"].get(prefix, 0) + 1 + ) + continue + out.append(c) + return out + + +def get_last_drop_stats() -> dict: + return dict(_LAST_DROPPED) diff --git a/backend-compliance/compliance/services/mail_render_v2/_style.py b/backend-compliance/compliance/services/mail_render_v2/_style.py new file mode 100644 index 00000000..e0011896 --- /dev/null +++ b/backend-compliance/compliance/services/mail_render_v2/_style.py @@ -0,0 +1,200 @@ +"""Mail-V2 style system — single source of truth for all visual props. + +Email rendering = inline styles only (most clients strip