feat(b14): widersprüchliche Speicherdauer im selben Doc (GT TH-RETENTION-001)
Erkennt: in derselben DSE / Cookie-Richtlinie nennt der Anbieter für
DIESELBE Datenkategorie mehrere unterschiedliche Speicherdauern.
GT-Anker (Elli): Logfiles "7 Tage" + "30 Tage" im selben DSE → eine
Angabe ist falsch oder veraltet.
Heuristik:
- Satz-Boundary-Scope (kein ±N-Zeichen-Fenster) verhindert
Cross-Category-Leakage
- Pro Satz: Kategorie-Anchor + Retention-Werte beide drin
- Tag-Cluster mit ±20 %-Toleranz: "30 Tage" und "1 Monat" =
1 Cluster; "7 Tage" und "30 Tage" = 2 Cluster → Finding
Kategorien (Phase 1):
- logfile, contact_form, application, newsletter, invoice,
session_cookie
Severity: MEDIUM (DSGVO Art. 5 Abs. 1 lit. a + Art. 13 Abs. 2 lit. a).
Tests: 11/11 grün (Cluster-Logik 5, Check-Pfade 6, inkl. Cross-
Category-Leakage-Regression).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,69 @@
|
||||
"""B14 wiring — Conflicting-Retention-Detector.
|
||||
|
||||
Hängt sich an `state["extra_findings"]` an und rendert einen V2-Block
|
||||
(`retention_conflict_html`).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import html
|
||||
import logging
|
||||
|
||||
from compliance.services.retention_conflict_check import (
|
||||
check_retention_conflicts,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def run_b14(state: dict) -> None:
    """Run the B14 conflicting-retention check and publish its results.

    Calls ``check_retention_conflicts(state)``. When it yields findings they
    are appended to ``state["extra_findings"]`` and the rendered HTML block
    is stored under ``state["retention_conflict_html"]``. When it yields
    nothing, ``state`` is left untouched.
    """
    conflicts = check_retention_conflicts(state)
    if conflicts:
        # A missing or falsy "extra_findings" entry is replaced by a fresh
        # list; an existing list is extended in place.
        collected = state.get("extra_findings") or []
        collected.extend(conflicts)
        state["extra_findings"] = collected
        state["retention_conflict_html"] = _render(conflicts)
        logger.info("B14 retention-conflict: %d finding(s)", len(conflicts))
|
||||
|
||||
|
||||
def _render(findings: list[dict]) -> str:
|
||||
cards = []
|
||||
for f in findings:
|
||||
sev = (f.get("severity") or "").upper()
|
||||
color = "#f59e0b" if sev == "MEDIUM" else "#dc2626"
|
||||
vals = f.get("values_days") or []
|
||||
vals_html = ""
|
||||
if vals:
|
||||
vals_html = (
|
||||
"<div style='font-size:12px;color:#475569;margin-top:6px;'>"
|
||||
f"<em>Werte (Tage): {html.escape(', '.join(str(v) for v in vals))}</em>"
|
||||
"</div>"
|
||||
)
|
||||
cards.append(
|
||||
f"<div style='margin:12px 0;padding:14px;background:#fff;"
|
||||
f"border-left:3px solid {color};border-radius:4px;'>"
|
||||
f"<div style='font-weight:600;color:{color};font-size:14px;'>"
|
||||
f"{sev} · {html.escape(f.get('check_id') or '')}</div>"
|
||||
f"<div style='font-size:14px;margin-top:4px;'>"
|
||||
f"<strong>{html.escape(f.get('title') or '')}</strong></div>"
|
||||
f"<div style='font-size:12px;color:#64748b;margin-top:2px;'>"
|
||||
f"{html.escape(f.get('norm') or '')}</div>"
|
||||
f"{vals_html}"
|
||||
f"<div style='font-size:12px;color:#475569;margin-top:6px;'>"
|
||||
f"<em>{html.escape(f.get('evidence') or '')}</em></div>"
|
||||
f"<div style='font-size:13px;margin-top:8px;background:#dcfce7;"
|
||||
f"padding:8px 10px;border-radius:4px;'>"
|
||||
f"<strong>→ Empfehlung:</strong> "
|
||||
f"{html.escape(f.get('action') or '')}</div>"
|
||||
"</div>"
|
||||
)
|
||||
return (
|
||||
"<div style='margin:24px 0;padding:16px;border-left:4px solid #f59e0b;"
|
||||
"background:#fffbeb;border-radius:4px;'>"
|
||||
"<h2 style='margin:0 0 8px;color:#92400e;font-size:16px;'>"
|
||||
"⏱️ Widersprüchliche Speicherdauer (Doc-intern)"
|
||||
"</h2>"
|
||||
+ "".join(cards) +
|
||||
"</div>"
|
||||
)
|
||||
@@ -24,6 +24,7 @@ from ._b6b7b8_wiring import run_b6b7b8
|
||||
from ._b9b10_wiring import run_b9b10
|
||||
from ._b12_wiring import run_b12
|
||||
from ._b13_wiring import run_b13
|
||||
from ._b14_wiring import run_b14
|
||||
from ._constants import _compliance_check_jobs
|
||||
from ._phase_a_resolve import run_phase_a
|
||||
from ._phase_b_profile_check import run_phase_b
|
||||
@@ -72,6 +73,7 @@ async def run_compliance_check(check_id: str, req) -> None:
|
||||
run_b9b10(state) # Multi-Entity-Impressum + Drittland-Mechanismus
|
||||
run_b12(state) # Chatbot-Cookie-Klassifikation (B11 ist in B9B10)
|
||||
run_b13(state) # Widerrufsbelehrung-Reachability (B2C-Pflicht)
|
||||
run_b14(state) # Widersprüchliche Speicherdauer im selben Doc
|
||||
# Phase D-3 top/mid/bot: Step 5 HTML blocks
|
||||
await run_phase_d3_top(state)
|
||||
await run_phase_d3_mid(state)
|
||||
|
||||
@@ -50,6 +50,8 @@ def compose_v2(state: dict) -> str:
|
||||
state.get("chatbot_cookie_html", ""),
|
||||
# B13 Widerrufsbelehrung-Reachability (B2C-Pflicht)
|
||||
state.get("widerruf_reach_html", ""),
|
||||
# B14 Widersprüchliche Speicherdauer im selben Doc
|
||||
state.get("retention_conflict_html", ""),
|
||||
# Browser-Matrix (Stage 1.c)
|
||||
state.get("browser_matrix_html", ""),
|
||||
# All legacy build_*_html() wrapped in V2 sections — preserves
|
||||
|
||||
@@ -0,0 +1,188 @@
|
||||
"""B14 — Conflicting-Retention-in-Document-Detector.
|
||||
|
||||
Erkennt: in DERSELBEN DSE / Cookie-Richtlinie nennt der Anbieter
|
||||
für DIESELBE Datenkategorie mehrere unterschiedliche Speicherdauern.
|
||||
|
||||
GT-Anker (Elli TH-RETENTION-001):
|
||||
- "Logfiles werden für 7 Tage gespeichert"
|
||||
- "Server-Logs werden 30 Tage aufbewahrt"
|
||||
→ Eine der Angaben ist falsch / veraltet.
|
||||
|
||||
Norm: DSGVO Art. 5 Abs. 1 lit. a (Transparenz) + Art. 13 Abs. 2 lit. a
|
||||
(konkrete Angabe der Speicherdauer).
|
||||
|
||||
Heuristik:
|
||||
1. Kategorie-Anker scannen (Logfile, Kontaktformular, Bewerbung, ...)
|
||||
2. Pro Satz mit Treffer: Retention-Werte aus demselben Satz extrahieren
|
||||
3. Pro Kategorie alle gefundenen Tage-Werte sammeln
|
||||
4. Werte clustern (Toleranz ±20%, mind. 1 Tag)
|
||||
5. ≥2 Cluster → Finding mit Schweregrad MEDIUM
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import re
|
||||
from collections import defaultdict
|
||||
|
||||
from .retention_comparator import parse_duration_to_days
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Each entry: (category_key, anchors_lower)
|
||||
_CATEGORIES: list[tuple[str, tuple[str, ...]]] = [
|
||||
("logfile", (
|
||||
"logfile", "logfiles", "log-datei", "log-dateien", "logdatei",
|
||||
"server-log", "server log", "serverlog",
|
||||
"access-log", "access log", "zugriffslog",
|
||||
"webserver-log", "webserver log",
|
||||
"webserver-protokoll", "server-protokoll",
|
||||
"ip-adressen werden gespeichert", "ip-adresse wird gespeichert",
|
||||
)),
|
||||
("contact_form", (
|
||||
"kontaktformular", "kontakt-anfrage", "kontaktanfrage",
|
||||
"contact form",
|
||||
)),
|
||||
("application", (
|
||||
"bewerbung", "bewerberdat", "applicant",
|
||||
)),
|
||||
("newsletter", (
|
||||
"newsletter-abonnement", "newsletter abonnem",
|
||||
"newsletter-anmeldung",
|
||||
)),
|
||||
("invoice", (
|
||||
"rechnungsdaten", "rechnungs-daten", "rechnungen werden",
|
||||
)),
|
||||
("session_cookie", (
|
||||
"session-cookie", "session cookie", "sitzungs-cookie",
|
||||
"sitzungscookie",
|
||||
)),
|
||||
]
|
||||
|
||||
|
||||
# Find any retention figure: "X Tage / Monate / Jahre / Wochen".
|
||||
_DURATION_PAT = re.compile(
|
||||
r"(\d+(?:[.,]\d+)?\s*(?:tage?|monate?|jahre?|wochen?|"
|
||||
r"days?|months?|years?|weeks?|d|h))",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
_SENTENCE_SPLIT_PAT = re.compile(r"(?<=[.!?])\s+(?=[A-ZÄÖÜ])")
|
||||
|
||||
|
||||
def _extract_durations_in(text: str) -> list[float]:
    """Collect every retention duration in ``text``, expressed in days.

    Each ``_DURATION_PAT`` match is handed to ``parse_duration_to_days``,
    which is unpacked here as a ``(value, kind)`` pair. A value is kept only
    when it is not ``None``, its kind is ``"days"`` and it is positive.
    """
    found: list[float] = []
    for match in _DURATION_PAT.finditer(text):
        value, unit_kind = parse_duration_to_days(match.group(1))
        if value is None or unit_kind != "days":
            continue
        if value > 0:
            found.append(value)
    return found
|
||||
|
||||
|
||||
def _cluster_values(values: list[float],
|
||||
tol_ratio: float = 0.2) -> list[list[float]]:
|
||||
"""Cluster values where any pair within tol_ratio of each other belongs
|
||||
to the same cluster. 7 and 30 days → 2 clusters; 30 and 31 → 1.
|
||||
"""
|
||||
if not values:
|
||||
return []
|
||||
sv = sorted(values)
|
||||
clusters: list[list[float]] = [[sv[0]]]
|
||||
for v in sv[1:]:
|
||||
last = clusters[-1][-1]
|
||||
# Same cluster if within ratio OR within 1 day absolute
|
||||
tol = max(last * tol_ratio, 1.0)
|
||||
if abs(v - last) <= tol:
|
||||
clusters[-1].append(v)
|
||||
else:
|
||||
clusters.append([v])
|
||||
return clusters
|
||||
|
||||
|
||||
def _format_days(days: float) -> str:
|
||||
if days >= 365 and abs(days % 365) < 2:
|
||||
y = round(days / 365)
|
||||
return f"{y} Jahr" if y == 1 else f"{y} Jahre"
|
||||
if days >= 30 and abs(days % 30) < 2:
|
||||
mo = round(days / 30)
|
||||
return f"{mo} Monat" if mo == 1 else f"{mo} Monate"
|
||||
if days >= 7 and abs(days % 7) < 0.5:
|
||||
w = round(days / 7)
|
||||
return f"{w} Woche" if w == 1 else f"{w} Wochen"
|
||||
if days == int(days):
|
||||
return f"{int(days)} Tage"
|
||||
return f"{days:.1f} Tage"
|
||||
|
||||
|
||||
_CATEGORY_LABELS = {
|
||||
"logfile": "Server-Logfiles",
|
||||
"contact_form": "Kontaktformular-Daten",
|
||||
"application": "Bewerberdaten",
|
||||
"newsletter": "Newsletter-Abonnement",
|
||||
"invoice": "Rechnungsdaten",
|
||||
"session_cookie": "Session-Cookies",
|
||||
}
|
||||
|
||||
|
||||
def check_retention_conflicts(state: dict) -> list[dict]:
    """Find data categories with disagreeing retention periods in one document.

    Reads ``state["doc_texts"]`` (missing or falsy is treated as empty) and
    inspects its ``"dse"`` and ``"cookie"`` texts independently. Each text is
    split into sentences; the durations found in a sentence are credited to
    every category whose anchor occurs in that same sentence. A category
    whose collected durations form two or more clusters yields one finding.

    Returns:
        A list of finding dicts (possibly empty), one per conflicting
        category and document type.
    """
    texts = state.get("doc_texts") or {}
    results: list[dict] = []

    for doc_type in ("dse", "cookie"):
        body = texts.get(doc_type) or ""
        if not body:
            continue

        # Sentence-level scope: a duration only counts for a category when
        # the anchor and the duration share a sentence. This keeps a value
        # such as "Kontaktformular ... 6 Monate" from being credited to a
        # "Logfiles 30 Tage" statement a couple of sentences earlier.
        durations_by_cat: dict[str, list[float]] = {}
        for sentence in _SENTENCE_SPLIT_PAT.split(body):
            lowered = sentence.lower()
            for cat_key, anchors in _CATEGORIES:
                if not any(anchor in lowered for anchor in anchors):
                    continue
                durations_by_cat.setdefault(cat_key, []).extend(
                    _extract_durations_in(sentence)
                )

        doc_label = (
            "Datenschutzerklärung" if doc_type == "dse" else "Cookie-Doc"
        )
        for cat_key, days_list in durations_by_cat.items():
            clusters = _cluster_values(days_list)
            if len(clusters) < 2:
                continue

            # Represent each cluster by its smallest value and quote at most
            # the three smallest representatives in the evidence text.
            representatives = sorted(min(cluster) for cluster in clusters)
            samples = [_format_days(v) for v in representatives[:3]]
            label = _CATEGORY_LABELS.get(cat_key, cat_key)

            title = f"Widersprüchliche Speicherdauer für {label} im {doc_label}"
            evidence = (
                f"Genannte Werte: {', '.join(samples)}. "
                "Bei DERSELBEN Datenkategorie dürfen nicht zwei "
                "unterschiedliche Speicherdauern stehen — eine ist "
                "falsch oder veraltet."
            )
            action = (
                f"Speicherdauer für {label} vereinheitlichen: "
                "den korrekten Wert recherchieren und Doppelnennungen "
                "streichen. Bei abgestuften Werten (z.B. Anonymisierung "
                "nach 7 Tagen, Vollöschung nach 30 Tagen) explizit "
                "als Stufen ausweisen."
            )
            results.append({
                "check_id": "RETENTION-CONFLICT-001",
                "severity": "MEDIUM",
                "severity_reason": "inconsistent",
                "category": cat_key,
                "doc_type": doc_type,
                "values_days": sorted({round(d, 1) for d in days_list}),
                "title": title,
                "norm": "DSGVO Art. 5 Abs. 1 lit. a + Art. 13 Abs. 2 lit. a",
                "evidence": evidence,
                "action": action,
            })

    if results:
        logger.info("B14 retention-conflict: %d finding(s)", len(results))
    return results
|
||||
@@ -0,0 +1,86 @@
|
||||
"""Tests for B14 retention-conflict-Detector (GT TH-RETENTION-001)."""
|
||||
|
||||
from compliance.services.retention_conflict_check import (
|
||||
_cluster_values,
|
||||
check_retention_conflicts,
|
||||
)
|
||||
|
||||
|
||||
class TestClusterValues:
    """Unit tests for the ``_cluster_values`` grouping helper."""

    def test_empty(self):
        result = _cluster_values([])
        assert result == []

    def test_single_value(self):
        result = _cluster_values([7])
        assert result == [[7]]

    def test_two_close_values_one_cluster(self):
        # 31 is within the 20 % tolerance of 30, so both share a cluster.
        result = _cluster_values([30, 31])
        assert result == [[30, 31]]

    def test_two_distant_values_two_clusters(self):
        # 7 and 30 are far outside the 20 % tolerance.
        assert len(_cluster_values([7, 30])) == 2

    def test_equivalent_durations_collapse(self):
        # Two identical day counts (e.g. "30 Tage" and a "1 Monat" that was
        # already converted to 30) end up in a single cluster.
        result = _cluster_values([30, 30])
        assert result == [[30, 30]]
|
||||
|
||||
|
||||
class TestCheckRetentionConflicts:
    """End-to-end tests for ``check_retention_conflicts`` on small texts."""

    def test_no_doc_no_findings(self):
        assert check_retention_conflicts({}) == []

    def test_logfile_7_vs_30_finding(self):
        text = (
            "Server-Logfiles werden für 7 Tage gespeichert. "
            "Bei Sicherheitsvorfällen werden die Logfiles bis zu 30 Tage "
            "aufbewahrt."
        )
        findings = check_retention_conflicts({"doc_texts": {"dse": text}})
        assert len(findings) == 1
        f = findings[0]
        assert f["check_id"] == "RETENTION-CONFLICT-001"
        assert f["category"] == "logfile"
        assert f["doc_type"] == "dse"
        assert 7.0 in f["values_days"]
        assert 30.0 in f["values_days"]

    def test_logfile_single_value_no_finding(self):
        text = "Logfiles werden 7 Tage aufbewahrt."
        assert check_retention_conflicts({"doc_texts": {"dse": text}}) == []

    def test_logfile_close_values_no_finding(self):
        # 30 days vs. 1 month for the same category must collapse into one
        # cluster. Both sentences carry the logfile anchor: durations are
        # only credited to a category within the same sentence, so without
        # the anchor the "1 Monat" value would never reach the logfile
        # bucket and this test would pass without exercising the clustering.
        text = (
            "Logfiles werden 30 Tage gespeichert. "
            "Die Aufbewahrungsdauer der Logfiles beträgt 1 Monat."
        )
        # NOTE: assumes parse_duration_to_days('1 Monat') yields about
        # 30 days, as the original note on this test stated — confirm
        # against retention_comparator.
        findings = check_retention_conflicts({"doc_texts": {"dse": text}})
        cf = [f for f in findings if f["category"] == "logfile"]
        assert cf == []

    def test_only_categorisations_with_two_clusters_emit(self):
        # Logfile has two distinct values, contact_form only one, so only
        # logfile may produce a finding.
        text = (
            "Server-Logfiles werden 7 Tage gespeichert. "
            "Außerdem speichern wir Logfiles bis zu 90 Tage. "
            "Kontaktformular-Daten werden 6 Monate aufbewahrt."
        )
        findings = check_retention_conflicts({"doc_texts": {"dse": text}})
        cats = [f["category"] for f in findings]
        assert "logfile" in cats
        assert "contact_form" not in cats

    def test_dse_and_cookie_doc_separately(self):
        text_dse = "Logfiles werden 7 Tage gespeichert. Logfiles 30 Tage."
        text_cookie = "Session-Cookie läuft nach 1 Tag ab."
        findings = check_retention_conflicts({
            "doc_texts": {"dse": text_dse, "cookie": text_cookie}
        })
        # Only the logfile conflict in the DSE; nothing from the cookie doc.
        assert len(findings) == 1
        assert findings[0]["doc_type"] == "dse"
|
||||
Reference in New Issue
Block a user