diff --git a/backend-compliance/compliance/api/agent_check/_b22_wiring.py b/backend-compliance/compliance/api/agent_check/_b22_wiring.py new file mode 100644 index 00000000..bf818b7d --- /dev/null +++ b/backend-compliance/compliance/api/agent_check/_b22_wiring.py @@ -0,0 +1,59 @@ +"""B22 wiring — Cross-Domain-Legal-Doc-Detector.""" + +from __future__ import annotations + +import html +import logging + +from compliance.services.cross_domain_doc_check import check_cross_domain_docs + +logger = logging.getLogger(__name__) + + +def run_b22(state: dict) -> None: + new = check_cross_domain_docs(state) + if not new: + return + extras = state.get("extra_findings") or [] + extras.extend(new) + state["extra_findings"] = extras + state["cross_domain_doc_html"] = _render(new) + logger.info("B22 cross-domain: %d finding(s)", len(new)) + + +def _render(findings: list[dict]) -> str: + cards = [] + for f in findings: + sev = (f.get("severity") or "").upper() + color = ("#dc2626" if sev == "HIGH" + else "#f59e0b" if sev == "MEDIUM" else "#64748b") + cards.append( + f"
" + f"
" + f"{sev} · {html.escape(f.get('check_id') or '')}
" + f"
" + f"{html.escape(f.get('title') or '')}
" + f"
" + f"{html.escape(f.get('norm') or '')}
" + f"
" + f"{html.escape(f.get('evidence') or '')}
" + f"
" + f"→ Empfehlung: " + f"{html.escape(f.get('recommended_action') or '')}
" + "
" + ) + return ( + "
" + f"

" + f"🌐 Vertragsdoc auf Fremd-Domain ({len(findings)} Fall(e))" + "

" + "

" + "Vertragsrelevante Dokumente liegen auf einer anderen Second-Level-" + "Domain als die Site. AVV-Pflicht + URL-Stabilitäts-Risiko." + "

" + + "".join(cards) + + "
" + ) diff --git a/backend-compliance/compliance/api/agent_check/_orchestrator.py b/backend-compliance/compliance/api/agent_check/_orchestrator.py index 0c051fc2..48d58805 100644 --- a/backend-compliance/compliance/api/agent_check/_orchestrator.py +++ b/backend-compliance/compliance/api/agent_check/_orchestrator.py @@ -31,6 +31,7 @@ from ._b17_wiring import run_b17 from ._b18_wiring import run_b18 from ._b19_wiring import run_b19 from ._b20_wiring import run_b20 +from ._b22_wiring import run_b22 from ._constants import _compliance_check_jobs from ._phase_a_resolve import run_phase_a from ._phase_b_profile_check import run_phase_b @@ -96,6 +97,7 @@ async def run_compliance_check(check_id: str, req) -> None: await run_b18(state) # Impressum-Specialist-Agent (Pattern+LLM) run_b19(state) # Cookie-Coherence (Salesforce-as-essential) await run_b20(state) # Legacy-URL-Discovery (Sitemap+Wayback) + run_b22(state) # Cross-Domain-Legal-Doc-Hosting (Elli/LogPay) # Phase D-3 top/mid/bot: Step 5 HTML blocks await run_phase_d3_top(state) await run_phase_d3_mid(state) diff --git a/backend-compliance/compliance/services/cross_domain_doc_check.py b/backend-compliance/compliance/services/cross_domain_doc_check.py new file mode 100644 index 00000000..773812a6 --- /dev/null +++ b/backend-compliance/compliance/services/cross_domain_doc_check.py @@ -0,0 +1,151 @@ +"""B22 — Cross-Domain-Legal-Doc-Detector. + +Erkennt: vertragsrelevante Dokumente (AGB, DSE, Widerrufsbelehrung, +Nutzungsbedingungen) liegen auf einer anderen Second-Level-Domain als +die Site selbst. Beispiel Elli/LogPay: AGB von Elli (elli.eco) liegt +auf docs.logpay.de. + +Norm-Argument: + - DSGVO Art. 28: das Hosten von Vertragsdokumenten durch einen + Dritten ist Auftragsverarbeitung — AVV-Pflicht. + - DSGVO Art. 13 Abs. 1 lit. e: Empfänger / Auftragsverarbeiter + müssen in der DSE benannt sein. 
+  - Vertragsrechtlich: AGB-Verbindlichkeit wackelig wenn der
+    Dokumenten-Host wechselt — was passiert wenn der externe Host
+    den Pfad ändert (Cool-URLs-Problem § 312i BGB).
+
+Severity:
+  - HIGH bei AGB / Widerrufsbelehrung (vertragsrelevant)
+  - MEDIUM bei DSE / Nutzungsbedingungen
+  - INFO bei Cookie-Policy / Impressum (eher Best-Practice)
+"""
+
+from __future__ import annotations
+
+import logging
+from urllib.parse import urlparse
+
+logger = logging.getLogger(__name__)
+
+
+_COMPOUND_TLDS = {
+    "co.uk", "co.jp", "co.nz", "co.kr", "co.za", "co.in",
+    "com.au", "com.br", "com.mx", "com.tr", "com.sg",
+}
+
+
+_SEVERITY_BY_DOC = {
+    "agb": "HIGH",
+    "widerruf": "HIGH",
+    "dse": "MEDIUM",
+    "nutzungsbedingungen": "MEDIUM",
+    "cookie": "INFO",
+    "impressum": "INFO",
+    "social_media": "INFO",
+}
+
+
+def _sld(host: str) -> str:
+    """Return the second-level label of *host*; compound-TLD aware."""
+    host = (host or "").lower()
+    if host.startswith("www."):
+        host = host[4:]  # prefix only: lstrip("www.") ate "web.de" -> "eb.de"
+    parts = host.split(".")
+    if len(parts) < 2:
+        return host
+    if len(parts) >= 3 and ".".join(parts[-2:]) in _COMPOUND_TLDS:
+        return parts[-3]
+    return parts[-2]
+
+
+def _site_origin_sld(state: dict) -> str:
+    """Find the primary site SLD by counting most common host in
+    submitted URLs."""
+    counter: dict[str, int] = {}
+    for e in (state.get("doc_entries") or []):
+        url = (e.get("url") or "").strip()
+        if not url or "://" not in url:
+            continue
+        # Skip auto-discovered docs (they may already be cross-domain
+        # by design — we want the USER's stated origin).
+        if e.get("auto_discovered"):
+            continue
+        try:
+            host = urlparse(url).netloc
+            sld = _sld(host)
+            if sld:
+                counter[sld] = counter.get(sld, 0) + 1
+        except Exception:
+            continue
+    if not counter:
+        # Fallback: use any URL
+        for e in (state.get("doc_entries") or []):
+            url = (e.get("url") or "").strip()
+            if url and "://" in url:
+                return _sld(urlparse(url).netloc)
+        return ""
+    return max(counter, key=counter.get)
+
+
+def check_cross_domain_docs(state: dict) -> list[dict]:
+    """Return one finding dict per ``doc_entries`` URL whose SLD differs
+    from the primary site SLD; ``[]`` when no primary can be derived."""
+    primary = _site_origin_sld(state)
+    if not primary:
+        return []
+    findings: list[dict] = []
+    for e in (state.get("doc_entries") or []):
+        url = (e.get("url") or "").strip()
+        doc_type = (e.get("doc_type") or "").lower()
+        if not url or "://" not in url:
+            continue
+        try:
+            host = urlparse(url).netloc
+            url_sld = _sld(host)
+        except Exception:
+            continue
+        if not url_sld or url_sld == primary:
+            continue
+        # Cross-Domain detected
+        severity = _SEVERITY_BY_DOC.get(doc_type, "MEDIUM")
+        doc_label = {
+            "agb": "Allgemeine Geschäftsbedingungen",
+            "widerruf": "Widerrufsbelehrung",
+            "dse": "Datenschutzerklärung",
+            "nutzungsbedingungen": "Nutzungsbedingungen",
+            "cookie": "Cookie-Richtlinie",
+            "impressum": "Impressum",
+            "social_media": "Social-Media-Hinweise",
+        }.get(doc_type, doc_type.upper())
+        findings.append({
+            "check_id": "CROSS-DOMAIN-DOC-001",
+            "severity": severity,
+            "severity_reason": "third_party_hosted",
+            "doc_type": doc_type,
+            "site_sld": primary,
+            "host_sld": url_sld,
+            "url": url,
+            "title": (
+                f"{doc_label} liegt auf Drittanbieter-Domain "
+                f"({host}) statt {primary}"
+            ),
+            "norm": (
+                "DSGVO Art. 28 (AVV) + Art. 13 Abs. 1 lit. e (Empfänger) + "
+                "§ 312i BGB (Cool-URLs / Vertragspflicht)"
+            ),
+            "evidence": (
+                f"Site-Origin: {primary} · "
+                f"Dokument gehostet auf: {host} · "
+                f"URL: {url[:120]}"
+            ),
+            "recommended_action": (
+                f"Entweder das Dokument auf eigene Domain ({primary}) "
+                f"migrieren ODER (a) den externen Host {host} als "
+                "Auftragsverarbeiter in der DSE benennen, (b) AVV "
+                "abschließen, (c) sicherstellen dass URL-Stabilität "
+                "vertraglich garantiert ist (§ 312i BGB Cool-URL-Pflicht)."
+            ),
+        })
+    if findings:
+        logger.info("B22 cross-domain: %d finding(s)", len(findings))
+    return findings
diff --git a/backend-compliance/compliance/services/mail_render_v2/_compose.py b/backend-compliance/compliance/services/mail_render_v2/_compose.py
index c1f95537..27b65032 100644
--- a/backend-compliance/compliance/services/mail_render_v2/_compose.py
+++ b/backend-compliance/compliance/services/mail_render_v2/_compose.py
@@ -27,59 +27,85 @@ from ._vendor_cards import (
     render_info_box_rechtsrahmen,
     render_vendor_cards,
 )
+from ._executive_summary import collapsible, render_executive_summary
 from ._legacy_wrappers import render_all_legacy
 from ._style import page_close, page_open
 
 
 def compose_v2(state: dict) -> str:
-    """Build the full audit-mail HTML in the V2 layout."""
+    """Build the full audit-mail HTML in the V2 layout.
+
+    Struktur:
+      1. Header (Site-Name + Datum)
+      2. Executive Summary (Compliance-Score + Top-3 + Cookie-Stats)
+      3. Critical Findings (immer offen, max 5)
+      4. Alle anderen Sektionen als
-Akkordeons (kollabiert) + 5. Caveats + Attachments + Page-Close + """ site = state.get("site_name") or "—" parts = [ page_open(site), render_header(state), - render_info_box_rechtsrahmen(), - render_toc(state), - render_vendor_cards( - state.get("cmp_vendors") or [], - state.get("cookie_coherence_findings") or [], - ), + render_executive_summary(state), + # IMMER OFFEN: kritische Findings + Sofortmaßnahmen render_critical(state), - render_manual_review(state), - render_internal_reminders(state), render_sofortmassnahmen(state), - render_per_doc(state), - render_per_theme(state), - # B4 — Cross-Doc Vendor-Consistency (Elli Vertex↔Iadvize pattern) - state.get("vendor_consistency_html", ""), - # B5 — AI-Act Art. 50 Transparenzpflicht - state.get("ai_act_html", ""), - # B6/B7/B8/B9/B10 — DPO + Staleness + CMP + MultiEntity + Transfer - state.get("extra_findings_html", ""), - # B12 Chatbot-Cookie-Klassifikation - state.get("chatbot_cookie_html", ""), - # B13 Widerrufsbelehrung-Reachability (B2C-Pflicht) - state.get("widerruf_reach_html", ""), - # B14 Widersprüchliche Speicherdauer im selben Doc - state.get("retention_conflict_html", ""), - # B15 AI-Act Rechtsgrundlage (LLM-Vendor auf lit. f) - state.get("ai_legal_basis_html", ""), - # B16 Footer-Label-vs-URL-Slug-Drift (SEO / Bookmarks) - state.get("url_slug_drift_html", ""), - # B17 Audit-Walk-Video (Beweis-Aufzeichnung) - state.get("audit_walk_html", ""), - # B18 Impressum-Specialist-Agent (Pattern + LLM) - state.get("impressum_agent_html", ""), - # B19 Cookie-Coherence-Check (Salesforce-as-essential etc.) 
- state.get("cookie_coherence_html", ""), - # B20 Legacy-URL-Discovery + Multi-Version-DSE-Vergleich - state.get("multi_version_dse_html", ""), - state.get("legacy_url_html", ""), - # Browser-Matrix (Stage 1.c) - state.get("browser_matrix_html", ""), - # All legacy build_*_html() wrapped in V2 sections — preserves - # every information block from the old renderer (Exec Summary, - # Banner-Screenshot, VVT, Redundancy, Solutions, Diff, etc.) - render_all_legacy(state), + + # AKKORDEON-Sektionen (kollabiert, Reviewer öffnet selektiv) + collapsible("🍪 Cookie-Inventar (alle deklarierten + im Browser)", + state.get("cookie_inventory_html", "") + + _render_per_theme_inventory_only(state)), + collapsible("🏷️ Vendor-Übersicht (aggregiert nach Anbieter)", + render_vendor_cards( + state.get("cmp_vendors") or [], + state.get("cookie_coherence_findings") or [], + )), + collapsible("🍪 Cookie-Kohärenz (Salesforce-Pattern, Pseudo-Zwecke)", + state.get("cookie_coherence_html", "")), + collapsible("💬 Chatbot-Cookie-Klassifikation", + state.get("chatbot_cookie_html", "")), + collapsible("📜 Widerrufsbelehrung-Reachability (B2C)", + state.get("widerruf_reach_html", "")), + collapsible("⏱️ Widersprüchliche Speicherdauer", + state.get("retention_conflict_html", "")), + collapsible("🤖 AI-Act Rechtsgrundlage (LLM-Vendor)", + state.get("ai_legal_basis_html", "")), + collapsible("🔗 URL-Slug-Drift (SEO / Bookmarks)", + state.get("url_slug_drift_html", "")), + collapsible("🎥 Audit-Walk-Video (Beweis-Aufzeichnung)", + state.get("audit_walk_html", "")), + collapsible("🤖 Impressum-Agent (Pattern + LLM)", + state.get("impressum_agent_html", "")), + collapsible("📑 Mehrere DSE-Versionen erkannt", + state.get("multi_version_dse_html", "")), + collapsible("🗂️ Legacy-URL-Inventar", + state.get("legacy_url_html", "")), + collapsible("🌐 Vertragsdoc auf Fremd-Domain (Cross-Domain)", + state.get("cross_domain_doc_html", "")), + collapsible("🔍 Cross-Doc Vendor-Konsistenz", + 
state.get("vendor_consistency_html", "")), + collapsible("⚖️ AI-Act Art. 50 Transparenzpflicht", + state.get("ai_act_html", "")), + collapsible("📌 Cross-Doc-Befunde (DPO, Staleness, CMP, Transfer)", + state.get("extra_findings_html", "")), + collapsible("🌐 Browser-Matrix (per-Browser-Verhalten)", + state.get("browser_matrix_html", "")), + collapsible("📋 Manuell zu prüfen", + render_manual_review(state)), + collapsible("🔧 Interne Erinnerungen", + render_internal_reminders(state)), + collapsible("📄 Per-Dokument-Befunde", + render_per_doc(state)), + collapsible("🧩 Per-Thema-Übersicht (Sub-Sektionen)", + render_per_theme(state)), + collapsible("📚 Rechtsrahmen-Info (Art. 13 DSGVO, § 25 TDDDG, …)", + render_info_box_rechtsrahmen()), + collapsible("📑 Inhaltsverzeichnis (alt)", + render_toc(state)), + collapsible("🗃️ Vollständige Legacy-Blöcke (Banner-Screenshot, " + "VVT, Redundancy, Solutions, Diff)", + render_all_legacy(state)), + render_caveats(state), render_attachments(state), page_close(state.get("check_id", ""), @@ -88,6 +114,17 @@ def compose_v2(state: dict) -> str: return "".join(p for p in parts if p) +def _render_per_theme_inventory_only(state: dict) -> str: + """Extrahiert nur die Cookie-Inventar-Tabelle aus per_theme (die + 742er-Tabelle). per_theme rendert sonst ALL themes — wir wollen + hier nur das Inventory-Theme.""" + try: + from ._blocks import render_theme_cookie_inventory + return render_theme_cookie_inventory(state) + except Exception: + return "" + + def is_v2_enabled() -> bool: return os.environ.get("MAIL_RENDER_V2", "false").lower() in ( "true", "1", "yes", "on", diff --git a/backend-compliance/compliance/services/mail_render_v2/_executive_summary.py b/backend-compliance/compliance/services/mail_render_v2/_executive_summary.py new file mode 100644 index 00000000..e2ee43f3 --- /dev/null +++ b/backend-compliance/compliance/services/mail_render_v2/_executive_summary.py @@ -0,0 +1,175 @@ +"""Executive Summary für die V2-Audit-Mail. 
+ +Sitzt ganz oben in der Mail. Reviewer sieht in ≤ 15 Zeilen: + - Compliance-Score (gross + farbig) + - Top-3-Findings nach Severity + - Cookie-Statistik (deklariert / Browser / Drittland) + - Saving-Indikation (1 Zahl) + - Verteilung der Findings-Typen + +Alles danach (B-Blocks, Per-Doc, Per-Theme) wird in +`
` kollabiert ausgespielt. +""" + +from __future__ import annotations + +from collections import Counter +from html import escape as h + + +def _scorecard_html(state: dict) -> str: + sc = state.get("scorecard") or {} + score = sc.get("compliance_pct") + if score is None: + score = sc.get("completeness_pct", 0) + score_int = int(score) if score is not None else 0 + color = ("#15803d" if score_int >= 80 + else "#f59e0b" if score_int >= 50 else "#dc2626") + label = ("GUT" if score_int >= 80 + else "VERBESSERUNGSBEDARF" if score_int >= 50 else "KRITISCH") + return ( + f"
" + f"
Compliance-Score
" + f"
{score_int}%
" + f"
" + f"{label}
" + ) + + +def _findings_top_severity_html(state: dict, top_n: int = 3) -> str: + extras = state.get("extra_findings") or [] + # Filter to HIGH/MEDIUM, take top_n + high_med = [ + f for f in extras + if (f.get("severity") or "").upper() in ("HIGH", "MEDIUM") + ] + high_med.sort( + key=lambda f: 0 if (f.get("severity") or "").upper() == "HIGH" else 1, + ) + if not high_med: + return ( + "

" + "Keine HIGH/MEDIUM Findings — siehe Detail-Sektionen " + "für Hinweise.

" + ) + rows = [] + for f in high_med[:top_n]: + sev = (f.get("severity") or "").upper() + color = "#dc2626" if sev == "HIGH" else "#f59e0b" + title = (f.get("title") or "")[:120] + norm = (f.get("norm") or "")[:80] + rows.append( + f"
" + f"
{sev}
" + f"
" + f"{h(title)}
" + f"
" + f"{h(norm)}
" + ) + return "".join(rows) + + +def _cookie_stats_html(state: dict) -> str: + cmp_vendors = state.get("cmp_vendors") or [] + declared = sum(len(v.get("cookies") or []) for v in cmp_vendors) + banner = state.get("banner_result") or {} + in_browser = len(banner.get("cookies_detailed") or []) + third_country = sum( + 1 for v in cmp_vendors + if (v.get("country") or "").upper() not in ("DE", "AT", "BE", "FR", + "NL", "IT", "ES", "IE", "DK", "FI", "SE", "PT", "PL", "CZ", + "CH", "NO", "LI", "IS", "") + ) + return ( + f"
" + f"
" + f"
" + f"{declared}
" + f"
" + f"Cookies deklariert
" + f"
" + f"
" + f"{in_browser}
" + f"
" + f"Im Browser gesetzt
" + f"
" + f"
0 else '#64748b'};'>" + f"{third_country}
" + f"
" + f"Vendoren Drittland
" + f"
" + ) + + +def _findings_distribution_html(state: dict) -> str: + extras = state.get("extra_findings") or [] + if not extras: + return "" + by_sev = Counter( + (f.get("severity") or "").upper() for f in extras + ) + parts = [] + for sev, color in (("HIGH", "#dc2626"), ("MEDIUM", "#f59e0b"), + ("LOW", "#64748b"), ("INFO", "#94a3b8")): + n = by_sev.get(sev, 0) + if n > 0: + parts.append( + f"{n} {sev}" + ) + return " · ".join(parts) + + +def render_executive_summary(state: dict) -> str: + """Top-of-mail TL;DR. Should fit on 1 screen.""" + return ( + "
" + "

" + "📊 Executive Summary" + "

" + "
" + + _scorecard_html(state) + + f"
" + f"
" + f"Top Befunde · {_findings_distribution_html(state)}" + f"
" + + _findings_top_severity_html(state, top_n=3) + + "
" + "
" + + _cookie_stats_html(state) + + "

" + "Details siehe Akkordeons unten — alle Sektionen " + "klappbar.

" + "
" + ) + + +def collapsible(title: str, body: str, *, open_default: bool = False) -> str: + """Wrap any HTML block in a
/ accordion.""" + if not body: + return "" + open_attr = " open" if open_default else "" + return ( + f"" + f"{h(title)}" + f"
{body}
" + "
" + ) diff --git a/backend-compliance/tests/test_cross_domain_doc_check.py b/backend-compliance/tests/test_cross_domain_doc_check.py new file mode 100644 index 00000000..78b44a28 --- /dev/null +++ b/backend-compliance/tests/test_cross_domain_doc_check.py @@ -0,0 +1,88 @@ +"""Tests for B22 Cross-Domain-Legal-Doc-Detector.""" + +from compliance.services.cross_domain_doc_check import ( + _site_origin_sld, + _sld, + check_cross_domain_docs, +) + + +class TestSld: + def test_simple(self): + assert _sld("www.bmw.de") == "bmw" + + def test_compound_tld(self): + assert _sld("docs.example.co.uk") == "example" + + def test_no_www(self): + assert _sld("elli.eco") == "elli" + + +class TestPrimaryDetection: + def test_majority_wins(self): + state = {"doc_entries": [ + {"url": "https://elli.eco/de/impressum"}, + {"url": "https://elli.eco/de/datenschutz"}, + {"url": "https://docs.logpay.de/_docs/agb.pdf"}, + ]} + assert _site_origin_sld(state) == "elli" + + def test_auto_discovered_excluded(self): + # discovery results don't influence primary detection + state = {"doc_entries": [ + {"url": "https://elli.eco/de/impressum", "auto_discovered": False}, + {"url": "https://discovered.tld/foo", "auto_discovered": True}, + ]} + assert _site_origin_sld(state) == "elli" + + +class TestCheck: + def test_elli_logpay_pattern(self): + state = {"doc_entries": [ + {"doc_type": "dse", "url": "https://www.elli.eco/de/datenschutz"}, + {"doc_type": "impressum", + "url": "https://www.elli.eco/de/impressum"}, + {"doc_type": "agb", + "url": "https://docs.logpay.de/_docs/de/" + "allgemeine_geschaeftsbedingungen_de_EM.pdf"}, + ]} + findings = check_cross_domain_docs(state) + assert len(findings) == 1 + f = findings[0] + assert f["check_id"] == "CROSS-DOMAIN-DOC-001" + assert f["severity"] == "HIGH" # AGB is HIGH + assert f["doc_type"] == "agb" + assert f["site_sld"] == "elli" + assert f["host_sld"] == "logpay" + + def test_same_subdomain_no_finding(self): + # docs.bmw.de is same SLD as www.bmw.de — no 
finding + state = {"doc_entries": [ + {"doc_type": "dse", + "url": "https://www.bmw.de/de/datenschutz.html"}, + {"doc_type": "agb", + "url": "https://docs.bmw.de/agb.pdf"}, + ]} + findings = check_cross_domain_docs(state) + assert findings == [] + + def test_no_primary_no_finding(self): + # No URLs at all + state = {"doc_entries": []} + assert check_cross_domain_docs(state) == [] + + def test_severity_per_doc_type(self): + state = {"doc_entries": [ + {"doc_type": "agb", "url": "https://acme.de/x"}, + {"doc_type": "dse", + "url": "https://docs.thirdparty.com/agb"}, + {"doc_type": "impressum", + "url": "https://www.other.com/impressum"}, + ]} + findings = check_cross_domain_docs(state) + sev_by_doc = {f["doc_type"]: f["severity"] for f in findings} + # agb is on primary (acme.de) — no finding + # dse on thirdparty.com → MEDIUM + # impressum on other.com → INFO + assert sev_by_doc.get("dse") == "MEDIUM" + assert sev_by_doc.get("impressum") == "INFO"