"""B14 — Conflicting-Retention-in-Document-Detector. Erkennt: in DERSELBEN DSE / Cookie-Richtlinie nennt der Anbieter für DIESELBE Datenkategorie mehrere unterschiedliche Speicherdauern. GT-Anker (Elli TH-RETENTION-001): - "Logfiles werden für 7 Tage gespeichert" - "Server-Logs werden 30 Tage aufbewahrt" → Eine der Angaben ist falsch / veraltet. Norm: DSGVO Art. 5 Abs. 1 lit. a (Transparenz) + Art. 13 Abs. 2 lit. a (konkrete Angabe der Speicherdauer). Heuristik: 1. Kategorie-Anker scannen (Logfile, Kontaktformular, Bewerbung, ...) 2. Pro Treffer: ± 300 Zeichen Kontext, Retention-Werte extrahieren 3. Pro Kategorie alle gefundenen Tage-Werte sammeln 4. Werte clustern (Toleranz ±20%, mind. 1 Tag) 5. ≥2 Cluster → Finding mit Schweregrad MEDIUM """ from __future__ import annotations import logging import re from collections import defaultdict from .retention_comparator import parse_duration_to_days logger = logging.getLogger(__name__) # Each entry: (category_key, anchors_lower) _CATEGORIES: list[tuple[str, tuple[str, ...]]] = [ ("logfile", ( "logfile", "logfiles", "log-datei", "log-dateien", "logdatei", "server-log", "server log", "serverlog", "access-log", "access log", "zugriffslog", "webserver-log", "webserver log", "webserver-protokoll", "server-protokoll", "protokolldat", "protokollierung der zugriffe", "zugriffsdat", "zugriffsprotokoll", "ip-adressen werden gespeichert", "ip-adresse wird gespeichert", )), ("contact_form", ( "kontaktformular", "kontakt-anfrage", "kontaktanfrage", "contact form", )), ("application", ( "bewerbung", "bewerberdat", "applicant", )), ("newsletter", ( "newsletter-abonnement", "newsletter abonnem", "newsletter-anmeldung", )), ("invoice", ( "rechnungsdaten", "rechnungs-daten", "rechnungen werden", )), ("session_cookie", ( "session-cookie", "session cookie", "sitzungs-cookie", "sitzungscookie", )), ] # Find any retention figure: "X Tage / Monate / Jahre / Wochen". _DURATION_PAT = re.compile( r"(\d+(?:[.,]\d+)?\s*(?:tage?|monate?|jahre?|wochen?|" r"days?|months?|years?|weeks?|d|h))", re.IGNORECASE, ) _SENTENCE_SPLIT_PAT = re.compile(r"(?<=[.!?])\s+(?=[A-ZÄÖÜ])") def _extract_durations_in(text: str) -> list[float]: """Return all duration values (in days) found in `text`.""" days: list[float] = [] for m in _DURATION_PAT.finditer(text): d, kind = parse_duration_to_days(m.group(1)) if d is not None and kind == "days" and d > 0: days.append(d) return days def _cluster_values(values: list[float], tol_ratio: float = 0.2) -> list[list[float]]: """Cluster values where any pair within tol_ratio of each other belongs to the same cluster. 7 and 30 days → 2 clusters; 30 and 31 → 1. """ if not values: return [] sv = sorted(values) clusters: list[list[float]] = [[sv[0]]] for v in sv[1:]: last = clusters[-1][-1] # Same cluster if within ratio OR within 1 day absolute tol = max(last * tol_ratio, 1.0) if abs(v - last) <= tol: clusters[-1].append(v) else: clusters.append([v]) return clusters def _format_days(days: float) -> str: if days >= 365 and abs(days % 365) < 2: y = round(days / 365) return f"{y} Jahr" if y == 1 else f"{y} Jahre" if days >= 30 and abs(days % 30) < 2: mo = round(days / 30) return f"{mo} Monat" if mo == 1 else f"{mo} Monate" if days >= 7 and abs(days % 7) < 0.5: w = round(days / 7) return f"{w} Woche" if w == 1 else f"{w} Wochen" if days == int(days): return f"{int(days)} Tage" return f"{days:.1f} Tage" _CATEGORY_LABELS = { "logfile": "Server-Logfiles", "contact_form": "Kontaktformular-Daten", "application": "Bewerberdaten", "newsletter": "Newsletter-Abonnement", "invoice": "Rechnungsdaten", "session_cookie": "Session-Cookies", } def check_retention_conflicts(state: dict) -> list[dict]: """Scan DSE + cookie doc for conflicting retention values per category.""" doc_texts = state.get("doc_texts") or {} findings: list[dict] = [] for doc_type in ("dse", "cookie"): text = doc_texts.get(doc_type) or "" if not text: continue # Sentence-level scope: a retention value only counts for a # category when both the anchor AND the duration appear in the # SAME sentence. This prevents cross-category leakage where # "Kontaktformular ... 6 Monate" sits two sentences after # "Logfiles 30 Tage" and gets credited to the wrong category. sentences = _SENTENCE_SPLIT_PAT.split(text) per_cat: dict[str, list[float]] = defaultdict(list) for sent in sentences: sent_lc = sent.lower() for cat_key, anchors in _CATEGORIES: if any(a in sent_lc for a in anchors): per_cat[cat_key].extend(_extract_durations_in(sent)) for cat_key, days_list in per_cat.items(): clusters = _cluster_values(days_list) if len(clusters) < 2: continue # Take min & max cluster center mins = [min(c) for c in clusters] mins.sort() samples = [_format_days(m) for m in mins[:3]] findings.append({ "check_id": "RETENTION-CONFLICT-001", "severity": "MEDIUM", "severity_reason": "inconsistent", "category": cat_key, "doc_type": doc_type, "values_days": sorted(set(round(d, 1) for d in days_list)), "title": ( f"Widersprüchliche Speicherdauer für " f"{_CATEGORY_LABELS.get(cat_key, cat_key)} im " f"{('Datenschutzerklärung' if doc_type == 'dse' else 'Cookie-Doc')}" ), "norm": "DSGVO Art. 5 Abs. 1 lit. a + Art. 13 Abs. 2 lit. a", "evidence": ( f"Genannte Werte: {', '.join(samples)}. " f"Bei DERSELBEN Datenkategorie dürfen nicht zwei " f"unterschiedliche Speicherdauern stehen — eine ist " f"falsch oder veraltet." ), "action": ( f"Speicherdauer für " f"{_CATEGORY_LABELS.get(cat_key, cat_key)} vereinheitlichen: " f"den korrekten Wert recherchieren und Doppelnennungen " f"streichen. Bei abgestuften Werten (z.B. Anonymisierung " f"nach 7 Tagen, Vollöschung nach 30 Tagen) explizit " f"als Stufen ausweisen." ), }) if findings: logger.info("B14 retention-conflict: %d finding(s)", len(findings)) return findings