From 6aad774fc1b48cbb51ddb931609e7b7b4273e74d Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Sun, 7 Jun 2026 00:12:00 +0200 Subject: [PATCH] =?UTF-8?q?feat(b14):=20widerspr=C3=BCchliche=20Speicherda?= =?UTF-8?q?uer=20im=20selben=20Doc=20(GT=20TH-RETENTION-001)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Erkennt: in derselben DSE / Cookie-Richtlinie nennt der Anbieter für DIESELBE Datenkategorie mehrere unterschiedliche Speicherdauern. GT-Anker (Elli): Logfiles "7 Tage" + "30 Tage" im selben DSE → eine Angabe ist falsch oder veraltet. Heuristik: - Satz-Boundary-Scope (kein ±N-Zeichen-Fenster) verhindert Cross-Category-Leakage - Pro Satz: Kategorie-Anchor + Retention-Werte beide drin - Tag-Cluster mit ±20 %-Toleranz: "30 Tage" und "1 Monat" = 1 Cluster; "7 Tage" und "30 Tage" = 2 Cluster → Finding Kategorien (Phase 1): - logfile, contact_form, application, newsletter, invoice, session_cookie Severity: MEDIUM (DSGVO Art. 5 Abs. 1 lit. a + Art. 13 Abs. 2 lit. a). Tests: 11/11 grün (Cluster-Logik 5, Check-Pfade 6, inkl. Cross- Category-Leakage-Regression). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../compliance/api/agent_check/_b14_wiring.py | 69 +++++++ .../api/agent_check/_orchestrator.py | 2 + .../services/mail_render_v2/_compose.py | 2 + .../services/retention_conflict_check.py | 188 ++++++++++++++++++ .../tests/test_retention_conflict_check.py | 86 ++++++++ 5 files changed, 347 insertions(+) create mode 100644 backend-compliance/compliance/api/agent_check/_b14_wiring.py create mode 100644 backend-compliance/compliance/services/retention_conflict_check.py create mode 100644 backend-compliance/tests/test_retention_conflict_check.py diff --git a/backend-compliance/compliance/api/agent_check/_b14_wiring.py b/backend-compliance/compliance/api/agent_check/_b14_wiring.py new file mode 100644 index 00000000..fd00742a --- /dev/null +++ b/backend-compliance/compliance/api/agent_check/_b14_wiring.py @@ -0,0 +1,69 @@ +"""B14 wiring — Conflicting-Retention-Detector. + +Hängt sich an `state["extra_findings"]` an und rendert einen V2-Block +(`retention_conflict_html`). +""" + +from __future__ import annotations + +import html +import logging + +from compliance.services.retention_conflict_check import ( + check_retention_conflicts, +) + +logger = logging.getLogger(__name__) + + +def run_b14(state: dict) -> None: + new = check_retention_conflicts(state) + if not new: + return + extras = state.get("extra_findings") or [] + extras.extend(new) + state["extra_findings"] = extras + state["retention_conflict_html"] = _render(new) + logger.info("B14 retention-conflict: %d finding(s)", len(new)) + + +def _render(findings: list[dict]) -> str: + cards = [] + for f in findings: + sev = (f.get("severity") or "").upper() + color = "#f59e0b" if sev == "MEDIUM" else "#dc2626" + vals = f.get("values_days") or [] + vals_html = "" + if vals: + vals_html = ( + "
" + f"Werte (Tage): {html.escape(', '.join(str(v) for v in vals))}" + "
" + ) + cards.append( + f"
" + f"
" + f"{sev} · {html.escape(f.get('check_id') or '')}
" + f"
" + f"{html.escape(f.get('title') or '')}
" + f"
" + f"{html.escape(f.get('norm') or '')}
" + f"{vals_html}" + f"
" + f"{html.escape(f.get('evidence') or '')}
" + f"
" + f"→ Empfehlung: " + f"{html.escape(f.get('action') or '')}
" + "
" + ) + return ( + "
" + "

" + "⏱️ Widersprüchliche Speicherdauer (Doc-intern)" + "

" + + "".join(cards) + + "
" + ) diff --git a/backend-compliance/compliance/api/agent_check/_orchestrator.py b/backend-compliance/compliance/api/agent_check/_orchestrator.py index bdde8681..126d33b8 100644 --- a/backend-compliance/compliance/api/agent_check/_orchestrator.py +++ b/backend-compliance/compliance/api/agent_check/_orchestrator.py @@ -24,6 +24,7 @@ from ._b6b7b8_wiring import run_b6b7b8 from ._b9b10_wiring import run_b9b10 from ._b12_wiring import run_b12 from ._b13_wiring import run_b13 +from ._b14_wiring import run_b14 from ._constants import _compliance_check_jobs from ._phase_a_resolve import run_phase_a from ._phase_b_profile_check import run_phase_b @@ -72,6 +73,7 @@ async def run_compliance_check(check_id: str, req) -> None: run_b9b10(state) # Multi-Entity-Impressum + Drittland-Mechanismus run_b12(state) # Chatbot-Cookie-Klassifikation (B11 ist in B9B10) run_b13(state) # Widerrufsbelehrung-Reachability (B2C-Pflicht) + run_b14(state) # Widersprüchliche Speicherdauer im selben Doc # Phase D-3 top/mid/bot: Step 5 HTML blocks await run_phase_d3_top(state) await run_phase_d3_mid(state) diff --git a/backend-compliance/compliance/services/mail_render_v2/_compose.py b/backend-compliance/compliance/services/mail_render_v2/_compose.py index ca1a2d62..9839315a 100644 --- a/backend-compliance/compliance/services/mail_render_v2/_compose.py +++ b/backend-compliance/compliance/services/mail_render_v2/_compose.py @@ -50,6 +50,8 @@ def compose_v2(state: dict) -> str: state.get("chatbot_cookie_html", ""), # B13 Widerrufsbelehrung-Reachability (B2C-Pflicht) state.get("widerruf_reach_html", ""), + # B14 Widersprüchliche Speicherdauer im selben Doc + state.get("retention_conflict_html", ""), # Browser-Matrix (Stage 1.c) state.get("browser_matrix_html", ""), # All legacy build_*_html() wrapped in V2 sections — preserves diff --git a/backend-compliance/compliance/services/retention_conflict_check.py b/backend-compliance/compliance/services/retention_conflict_check.py new file mode 100644 index 
00000000..2e603396 --- /dev/null +++ b/backend-compliance/compliance/services/retention_conflict_check.py @@ -0,0 +1,188 @@ +"""B14 — Conflicting-Retention-in-Document-Detector. + +Erkennt: in DERSELBEN DSE / Cookie-Richtlinie nennt der Anbieter +für DIESELBE Datenkategorie mehrere unterschiedliche Speicherdauern. + +GT-Anker (Elli TH-RETENTION-001): + - "Logfiles werden für 7 Tage gespeichert" + - "Server-Logs werden 30 Tage aufbewahrt" + → Eine der Angaben ist falsch / veraltet. + +Norm: DSGVO Art. 5 Abs. 1 lit. a (Transparenz) + Art. 13 Abs. 2 lit. a +(konkrete Angabe der Speicherdauer). + +Heuristik: + 1. Kategorie-Anker scannen (Logfile, Kontaktformular, Bewerbung, ...) + 2. Pro Satz mit Anker: Retention-Werte desselben Satzes extrahieren (kein ±N-Zeichen-Fenster) + 3. Pro Kategorie alle gefundenen Tage-Werte sammeln + 4. Werte clustern (Toleranz ±20%, mind. 1 Tag) + 5. ≥2 Cluster → Finding mit Schweregrad MEDIUM +""" + +from __future__ import annotations + +import logging +import re +from collections import defaultdict + +from .retention_comparator import parse_duration_to_days + +logger = logging.getLogger(__name__) + +# Each entry: (category_key, anchors_lower) +_CATEGORIES: list[tuple[str, tuple[str, ...]]] = [ + ("logfile", ( + "logfile", "logfiles", "log-datei", "log-dateien", "logdatei", + "server-log", "server log", "serverlog", + "access-log", "access log", "zugriffslog", + "webserver-log", "webserver log", + "webserver-protokoll", "server-protokoll", + "ip-adressen werden gespeichert", "ip-adresse wird gespeichert", + )), + ("contact_form", ( + "kontaktformular", "kontakt-anfrage", "kontaktanfrage", + "contact form", + )), + ("application", ( + "bewerbung", "bewerberdat", "applicant", + )), + ("newsletter", ( + "newsletter-abonnement", "newsletter abonnem", + "newsletter-anmeldung", + )), + ("invoice", ( + "rechnungsdaten", "rechnungs-daten", "rechnungen werden", + )), + ("session_cookie", ( + "session-cookie", "session cookie", "sitzungs-cookie", + "sitzungscookie", + )), +] 
# Matches one retention figure such as "7 Tage", "12 Monaten", "1,5 Jahre",
# "30 days" or "24 h".  Group 1 (number + unit stem) is the text handed to
# `parse_duration_to_days`.
#
# The trailing part is a guard, not part of the capture:
#   * `(?:(?<=e)n)?` swallows the German dative "n" ("Tagen", "Monaten",
#     "Jahren") so the stem in group 1 stays exactly what it was before.
#   * `(?![^\W\d_])` rejects the match when another letter follows.  Without
#     it the one-letter units `d` / `h` (and the stems) match the beginning
#     of ordinary words: "Art. 17 DSGVO" -> "17 D", "2018 die" -> "2018 d",
#     "10 Tagesordnungspunkte" -> "10 Tage".
_DURATION_PAT = re.compile(
    r"(\d+(?:[.,]\d+)?\s*(?:tage?|monate?|jahre?|wochen?|"
    r"days?|months?|years?|weeks?|d|h))"
    r"(?:(?<=e)n)?(?![^\W\d_])",
    re.IGNORECASE,
)


# Sentence boundary: whitespace that follows ".", "!" or "?" and precedes an
# upper-case letter (including German umlauts).
_SENTENCE_SPLIT_PAT = re.compile(r"(?<=[.!?])\s+(?=[A-ZÄÖÜ])")


def _extract_durations_in(text: str) -> list[float]:
    """Return every duration found in ``text``, converted to days.

    Each regex hit is passed to ``parse_duration_to_days``; the result is
    unpacked as ``(value, kind)`` and kept only when ``value`` is a positive
    number and ``kind`` equals ``"days"``.
    """
    days: list[float] = []
    for m in _DURATION_PAT.finditer(text):
        d, kind = parse_duration_to_days(m.group(1))
        if d is not None and kind == "days" and d > 0:
            days.append(d)
    return days


def _cluster_values(values: list[float],
                    tol_ratio: float = 0.2) -> list[list[float]]:
    """Group day values into clusters of "practically the same" duration.

    The values are sorted ascending and walked once; a value joins the
    current cluster when its distance to the cluster's last (largest) member
    is at most ``max(last * tol_ratio, 1.0)``, otherwise it opens a new
    cluster.  7 and 30 days -> 2 clusters; 30 and 31 -> 1.

    Args:
        values: Durations in days, in any order.
        tol_ratio: Relative tolerance measured against the previous value.

    Returns:
        Clusters in ascending order, each itself ascending; ``[]`` for empty
        input.
    """
    if not values:
        return []
    sv = sorted(values)
    clusters: list[list[float]] = [[sv[0]]]
    for v in sv[1:]:
        last = clusters[-1][-1]
        # Same cluster if within ratio OR within 1 day absolute
        tol = max(last * tol_ratio, 1.0)
        if abs(v - last) <= tol:
            clusters[-1].append(v)
        else:
            clusters.append([v])
    return clusters


def _format_days(days: float) -> str:
    """Render a day count as a short German label for the report.

    Values at (or slightly above) a whole number of years, months (30 days)
    or weeks are shown in that unit; everything else is shown in days, with
    the correct singular ("1 Tag") and one decimal for fractional values.
    """
    if days >= 365 and abs(days % 365) < 2:
        y = round(days / 365)
        return f"{y} Jahr" if y == 1 else f"{y} Jahre"
    if days >= 30 and abs(days % 30) < 2:
        mo = round(days / 30)
        return f"{mo} Monat" if mo == 1 else f"{mo} Monate"
    if days >= 7 and abs(days % 7) < 0.5:
        w = round(days / 7)
        return f"{w} Woche" if w == 1 else f"{w} Wochen"
    if days == int(days):
        whole = int(days)
        # Singular form: "1 Tag", not "1 Tage".
        return f"{whole} Tag" if whole == 1 else f"{whole} Tage"
    return f"{days:.1f} Tage"


# Human-readable (German) label per category key, used in title and action.
_CATEGORY_LABELS = {
    "logfile": "Server-Logfiles",
    "contact_form": "Kontaktformular-Daten",
    "application": "Bewerberdaten",
    "newsletter": "Newsletter-Abonnement",
    "invoice": "Rechnungsdaten",
    "session_cookie": "Session-Cookies",
}


def check_retention_conflicts(state: dict) -> list[dict]:
    """Scan DSE + cookie doc for conflicting retention values per category.

    Reads ``state["doc_texts"]["dse"]`` and ``state["doc_texts"]["cookie"]``
    (missing or empty entries are skipped).  Each document is evaluated on
    its own: durations are collected per category and clustered, and a
    category with two or more clusters produces one finding.

    Args:
        state: Pipeline state; only the ``"doc_texts"`` key is read.

    Returns:
        A list of finding dicts (``check_id``, ``severity``, ``category``,
        ``doc_type``, ``values_days``, ``title``, ``norm``, ``evidence``,
        ``action``, ...); empty when nothing conflicts.
    """
    doc_texts = state.get("doc_texts") or {}
    findings: list[dict] = []
    for doc_type in ("dse", "cookie"):
        text = doc_texts.get(doc_type) or ""
        if not text:
            continue
        # Sentence-level scope: a retention value only counts for a
        # category when both the anchor AND the duration appear in the
        # SAME sentence. This prevents cross-category leakage where
        # "Kontaktformular ... 6 Monate" sits two sentences after
        # "Logfiles 30 Tage" and gets credited to the wrong category.
        # NOTE(review): a single sentence that names two categories still
        # credits all of its durations to both of them.
        sentences = _SENTENCE_SPLIT_PAT.split(text)
        per_cat: dict[str, list[float]] = defaultdict(list)
        for sent in sentences:
            sent_lc = sent.lower()
            for cat_key, anchors in _CATEGORIES:
                if any(a in sent_lc for a in anchors):
                    per_cat[cat_key].extend(_extract_durations_in(sent))

        # German needs a different preposition/article per document noun.
        doc_phrase = (
            "in der Datenschutzerklärung" if doc_type == "dse"
            else "im Cookie-Doc"
        )
        for cat_key, days_list in per_cat.items():
            clusters = _cluster_values(days_list)
            if len(clusters) < 2:
                continue
            # Smallest value of each cluster; clusters arrive in ascending
            # order, so this list is already sorted. Show at most three.
            mins = [min(c) for c in clusters]
            samples = [_format_days(m) for m in mins[:3]]
            label = _CATEGORY_LABELS.get(cat_key, cat_key)
            findings.append({
                "check_id": "RETENTION-CONFLICT-001",
                "severity": "MEDIUM",
                "severity_reason": "inconsistent",
                "category": cat_key,
                "doc_type": doc_type,
                "values_days": sorted(set(round(d, 1) for d in days_list)),
                "title": (
                    f"Widersprüchliche Speicherdauer für "
                    f"{label} {doc_phrase}"
                ),
                "norm": "DSGVO Art. 5 Abs. 1 lit. a + Art. 13 Abs. 2 lit. a",
                "evidence": (
                    f"Genannte Werte: {', '.join(samples)}. "
                    f"Bei DERSELBEN Datenkategorie dürfen nicht zwei "
                    f"unterschiedliche Speicherdauern stehen — eine ist "
                    f"falsch oder veraltet."
                ),
                "action": (
                    f"Speicherdauer für "
                    f"{label} vereinheitlichen: "
                    f"den korrekten Wert recherchieren und Doppelnennungen "
                    f"streichen. Bei abgestuften Werten (z.B. Anonymisierung "
                    f"nach 7 Tagen, Vollöschung nach 30 Tagen) explizit "
                    f"als Stufen ausweisen."
                ),
            })
    if findings:
        logger.info("B14 retention-conflict: %d finding(s)", len(findings))
    return findings
+ ) + findings = check_retention_conflicts({"doc_texts": {"dse": text}}) + assert len(findings) == 1 + f = findings[0] + assert f["check_id"] == "RETENTION-CONFLICT-001" + assert f["category"] == "logfile" + assert f["doc_type"] == "dse" + assert 7.0 in f["values_days"] + assert 30.0 in f["values_days"] + + def test_logfile_single_value_no_finding(self): + text = "Logfiles werden 7 Tage aufbewahrt." + assert check_retention_conflicts({"doc_texts": {"dse": text}}) == [] + + def test_logfile_close_values_no_finding(self): + # 30 days vs ~1 Monat — same cluster + text = ( + "Logfiles werden 30 Tage gespeichert. " + "Die Aufbewahrungsdauer beträgt 1 Monat." + ) + # NOTE: parse_duration_to_days('1 Monat') → 30 days; same cluster. + findings = check_retention_conflicts({"doc_texts": {"dse": text}}) + # Either no finding (preferred) or zero because clusters collapse. + cf = [f for f in findings if f["category"] == "logfile"] + assert cf == [] + + def test_only_categorisations_with_two_clusters_emit(self): + # Logfile two values + contact_form single → only logfile fires. + text = ( + "Server-Logfiles werden 7 Tage gespeichert. " + "Außerdem speichern wir Logfiles bis zu 90 Tage. " + "Kontaktformular-Daten werden 6 Monate aufbewahrt." + ) + findings = check_retention_conflicts({"doc_texts": {"dse": text}}) + cats = [f["category"] for f in findings] + assert "logfile" in cats + assert "contact_form" not in cats + + def test_dse_and_cookie_doc_separately(self): + text_dse = "Logfiles werden 7 Tage gespeichert. Logfiles 30 Tage." + text_cookie = "Session-Cookie läuft nach 1 Tag ab." + findings = check_retention_conflicts({ + "doc_texts": {"dse": text_dse, "cookie": text_cookie} + }) + # Only logfile conflict in dse, nothing in cookie. + assert len(findings) == 1 + assert findings[0]["doc_type"] == "dse"