""" B3 — Cross-Doc Retention Consistency Comparator. Compares three sources of truth for cookie storage duration: 1. DSI claim — sentence(s) in the privacy policy mentioning retention ("Die Speicherdauer beträgt 14 Monate", "_ga: 14 Monate", ...). 2. Cookie-table — the `duration` field parsed from the cookie policy table (parse_flat_cookie_text / OCR / vendor-extract). 3. Actual cookie — `Max-Age` / `Expires` from the real Set-Cookie header captured by the consent-tester. Output is a per-cookie finding usable by the audit report: - matches=True → all three sources agree (within tolerance) - matches=False → mismatch with explicit type + severity_reason Severity hierarchy (see project_audit_report_architecture.md): HIGH/factually_wrong : DSI claim is shorter than reality → user is told "X" but tracked for longer HIGH/factually_wrong : table duration is shorter than reality → cookie table understates what is set MEDIUM/misclassified : DSI is shorter than table (internal docs disagree) LOW/incomplete : only one source has data The module is pure (no DB, no network) and meant to be called from the report pipeline after cookies+DSI+HAR have already been collected. """ from __future__ import annotations import logging import re from dataclasses import dataclass logger = logging.getLogger(__name__) # 5% tolerance — Safari ITP, leap years, server clocks etc. _MATCH_TOLERANCE_PCT = 5 # Multipliers in DAYS for the German + English unit vocabulary used in # our cookie tables and policies. _UNIT_DAYS: dict[str, float] = { "sekunden": 1 / 86400, "sekunde": 1 / 86400, "sec": 1 / 86400, "s": 1 / 86400, "minuten": 1 / 1440, "minute": 1 / 1440, "min": 1 / 1440, "stunden": 1 / 24, "stunde": 1 / 24, "h": 1 / 24, "tage": 1, "tag": 1, "d": 1, "day": 1, "days": 1, "wochen": 7, "woche": 7, "week": 7, "weeks": 7, "monate": 30, "monat": 30, "month": 30, "months": 30, "jahre": 365, "jahr": 365, "year": 365, "years": 365, } # Phrases that mean "session" — cookie deleted when browser closes. 
_SESSION_TOKENS = { "session", "sitzung", "sitzungsdauer", "browsersitzung", "browser session", "browsing session", "tab", } # Phrases that mean "persistent without explicit cap". _NO_EXPIRY_TOKENS = { "unbegrenzt", "unbestimmt", "kein ablaufdatum", "no expiry", "persistent", "permanent", } @dataclass class RetentionClaim: """One retention statement found in the DSI text.""" sentence: str days: float | None # None for session/unknown is_session: bool is_persistent: bool context_terms: list[str] # cookie names / provider names mentioned nearby def parse_duration_to_days(text: str) -> tuple[float | None, str]: """Convert a duration phrase to days. Returns (days, kind) where kind ∈ {"days", "session", "persistent", "unknown"}. For "session" / "persistent" days is None — comparisons must handle these as special cases, not as 0 or infinity. """ if text is None: return None, "unknown" s = text.strip().lower() if not s: return None, "unknown" for tok in _SESSION_TOKENS: if tok in s: return None, "session" for tok in _NO_EXPIRY_TOKENS: if tok in s: return None, "persistent" # "14 Monate", "1 Jahr", "24h", "30 Tage", "365 Tage", "30d" m = re.search( r"(?P\d+(?:[.,]\d+)?)\s*(?P" r"sekunden?|sec|s|minuten?|min|stunden?|h|" r"tage?|d(?:ays?)?|wochen?|weeks?|" r"monate?|months?|jahre?|years?)\b", s, ) if not m: return None, "unknown" num = float(m.group("num").replace(",", ".")) unit = m.group("unit") mult = _UNIT_DAYS.get(unit) if mult is None: return None, "unknown" return num * mult, "days" def max_age_to_days(max_age_seconds: int | float | None) -> float | None: """Convert a Set-Cookie Max-Age (in seconds) to days.""" if max_age_seconds is None: return None try: return float(max_age_seconds) / 86400.0 except (TypeError, ValueError): return None # Sentence splitter that respects German legal text style (lots of # semicolons + parentheses but few capitalised abbreviations). 
# Splits after ".", "!" or "?" when whitespace and an uppercase letter
# (including Ä/Ö/Ü) follow.
_SENTENCE_SPLIT = re.compile(r"(?<=[.!?])\s+(?=[A-ZÄÖÜ])")

# Quick anchor terms for retention sentences.
_RETENTION_ANCHORS = (
    "speicherdauer", "speicherfrist", "speicher",
    "aufbewahrungsdauer", "aufbewahrungsfrist", "aufbewahr",
    "löschfrist", "löschung", "gelöscht",
    "gespeichert für", "wird gespeichert", "wird für", "werden für",
    "in der regel", "bis zu",
    "retention", "expires", "expiration", "lifetime",
    "gültigkeit", "laufzeit",
)

# "<number> <unit>" in lower-cased text. The optional trailing "n" covers
# the German dative plural ("nach 30 Tagen", "14 Monaten", "2 Jahren"),
# which is the usual form in running policy text.
_RETENTION_UNIT_RE = re.compile(
    r"\b\d[\d.,]*\s*("
    r"sekunden?|minuten?|stunden?|tage?n?|wochen?|"
    r"monate?n?|jahre?n?|sec|min|h|d|"
    r"seconds?|minutes?|hours?|"
    r"weeks?|months?|years?|days?)\b"
)


def _looks_like_retention(sentence: str) -> bool:
    """Return True if *sentence* contains an anchor term and a duration."""
    s = sentence.lower()
    if not any(anchor in s for anchor in _RETENTION_ANCHORS):
        return False
    # Need a unit token nearby — otherwise it's metadata not duration.
    return _RETENTION_UNIT_RE.search(s) is not None


def extract_retention_claims(
    dsi_text: str,
    cookie_names: list[str] | None = None,
    vendor_names: list[str] | None = None,
) -> list[RetentionClaim]:
    """Find sentences in the DSI that state a retention period.

    cookie_names / vendor_names attach themselves to a sentence when
    they are mentioned in it (case-insensitive substring match); the
    comparator uses this to prefer the most specific claim available
    for a given cookie.

    Returns one RetentionClaim per matching sentence, in document order.
    The stored sentence is truncated to 400 characters.
    """
    if not dsi_text:
        return []
    cookie_names = cookie_names or []
    vendor_names = vendor_names or []

    claims: list[RetentionClaim] = []
    for raw in _SENTENCE_SPLIT.split(dsi_text):
        # Keep original case for the sentence so it can be cited
        # verbatim in the audit report.
        s = raw.strip()
        if not s or not _looks_like_retention(s):
            continue
        days, kind = parse_duration_to_days(s)
        lower = s.lower()
        # Cookie names first, then vendor names.
        contexts = [n for n in cookie_names if n and n.lower() in lower]
        contexts += [v for v in vendor_names if v and v.lower() in lower]
        claims.append(RetentionClaim(
            sentence=s[:400],
            days=days,
            is_session=(kind == "session"),
            is_persistent=(kind == "persistent"),
            context_terms=contexts,
        ))
    return claims


def _best_dsi_claim(
    claims: list[RetentionClaim],
    cookie_name: str,
    vendor_name: str | None,
) -> RetentionClaim | None:
    """Pick the most specific DSI claim for a given cookie.

    Priority: claim that mentions the cookie name > claim that mentions
    the vendor > generic (no context). Within a tier, the first claim
    with a day value wins, so that a claim that cannot be compared does
    not shadow a usable one.

    Returns None when every claim is tied to some other cookie or
    vendor: applying such a claim here would attribute a foreign
    retention statement to this cookie.
    """
    if not claims:
        return None

    tiers = []
    if cookie_name:
        tiers.append([c for c in claims if cookie_name in c.context_terms])
    if vendor_name:
        tiers.append([c for c in claims if vendor_name in c.context_terms])
    tiers.append([c for c in claims if not c.context_terms])

    for tier in tiers:
        if not tier:
            continue
        for claim in tier:
            if claim.days is not None:
                return claim
        return tier[0]
    return None


def _within_tolerance(a: float, b: float) -> bool:
    """Return True if a and b differ by at most the match tolerance.

    The tolerance is _MATCH_TOLERANCE_PCT percent of the larger
    magnitude.
    """
    if a == 0 and b == 0:
        return True
    base = max(abs(a), abs(b))
    return abs(a - b) <= base * (_MATCH_TOLERANCE_PCT / 100.0)


def compare_retention(
    cookie_name: str,
    table_duration: str | None,
    actual_max_age_seconds: int | float | None,
    dsi_claims: list[RetentionClaim] | None = None,
    vendor_name: str | None = None,
) -> dict:
    """Per-cookie three-way retention comparison.

    Returns a finding dict suitable for the audit-report aggregator
    (theme = TH-RETENTION). Output schema is stable — extending it must
    be additive so existing tests stay green.

    Checks run in order and the first one that fires decides the
    finding:

    1. dsi_under_actual   (HIGH / factually_wrong)
    2. table_under_actual (HIGH / factually_wrong)
    3. dsi_vs_table       (MEDIUM / misclassified)
    4. actual_under_table (LOW / incomplete)

    With fewer than two sources that carry a day value, the finding
    keeps matches=True and is marked LOW / incomplete.
    """
    table_days, table_kind = parse_duration_to_days(table_duration or "")
    actual_days = max_age_to_days(actual_max_age_seconds)
    dsi_claim = _best_dsi_claim(dsi_claims or [], cookie_name, vendor_name)
    dsi_days = dsi_claim.days if dsi_claim else None

    out: dict = {
        "cookie_name": cookie_name,
        "vendor_name": vendor_name,
        "table_duration_raw": table_duration,
        "table_days": table_days,
        "table_kind": table_kind,
        "actual_max_age_seconds": actual_max_age_seconds,
        "actual_days": actual_days,
        "dsi_days": dsi_days,
        "dsi_sentence": dsi_claim.sentence if dsi_claim else None,
        # Copy so that later edits to the finding cannot alter the claim.
        "dsi_context_terms": list(dsi_claim.context_terms) if dsi_claim else [],
        "matches": True,
        "mismatch_type": None,
        "severity_reason": None,
        "severity": None,
        "diff_days": None,
        "notes": [],
    }

    known = sum(v is not None for v in (table_days, actual_days, dsi_days))
    if known <= 1:
        out["severity_reason"] = "incomplete"
        out["severity"] = "LOW"
        out["notes"].append("only_one_source_has_data")
        return out

    # Pairwise "both present and outside tolerance" flags.
    dsi_off_actual = (
        dsi_days is not None and actual_days is not None
        and not _within_tolerance(dsi_days, actual_days)
    )
    table_off_actual = (
        table_days is not None and actual_days is not None
        and not _within_tolerance(table_days, actual_days)
    )
    dsi_off_table = (
        dsi_days is not None and table_days is not None
        and not _within_tolerance(dsi_days, table_days)
    )

    if dsi_off_actual and dsi_days < actual_days:
        # Highest-severity check first: DSI claim is shorter than the
        # cookie actually lives — user was misled.
        out.update(
            matches=False,
            mismatch_type="dsi_under_actual",
            severity_reason="factually_wrong",
            severity="HIGH",
            diff_days=actual_days - dsi_days,
        )
    elif table_off_actual and table_days < actual_days:
        # Cookie table understates reality — second highest.
        out.update(
            matches=False,
            mismatch_type="table_under_actual",
            severity_reason="factually_wrong",
            severity="HIGH",
            diff_days=actual_days - table_days,
        )
    elif dsi_off_table:
        # Internal disagreement DSI vs. table (less severe — both are
        # documentation, neither contradicts the live cookie).
        out.update(
            matches=False,
            mismatch_type="dsi_vs_table",
            severity_reason="misclassified",
            severity="MEDIUM",
            diff_days=abs(dsi_days - table_days),
        )
    elif table_off_actual and table_days > actual_days:
        # Catch over-declaration too — table says "2 years" but cookie
        # lives 7 days (Safari ITP). Less severe but worth flagging.
        out.update(
            matches=False,
            mismatch_type="actual_under_table",
            severity_reason="incomplete",
            severity="LOW",
            diff_days=table_days - actual_days,
        )
        out["notes"].append("possible_safari_itp_cap")

    return out


def detect_intra_doc_contradictions(
    dsi_text: str,
) -> list[dict]:
    """Find sentences in the SAME doc that claim different retention
    values for what looks like the same data category.

    Catches the Elli pattern: "Logfiles werden 7 Tage gespeichert" +
    "Logfiles werden 30 Tage aufbewahrt" → contradiction in one DSE.

    Heuristic: group retention-bearing sentences by the first matching
    category-anchor keyword (logfile / chat / cookie / session /
    nutzungsdaten) and report when ≥2 different day-values (rounded to
    one decimal) exist for the same group. Severity is HIGH when the
    spread between the smallest and largest value is more than three
    times the smallest value, otherwise MEDIUM.
    """
    if not dsi_text:
        return []
    claims = extract_retention_claims(dsi_text)
    if len(claims) < 2:
        return []

    anchors = (
        ("logfile", ("logfile", "log-file", "log file", "server-log")),
        ("chat", ("chat", "chatverlauf", "konversation")),
        ("cookie", ("cookie",)),
        ("session", ("session", "sitzung")),
        ("nutzungsdaten", ("nutzungsdaten", "usage data")),
    )

    by_group: dict[str, list[RetentionClaim]] = {}
    for cl in claims:
        if cl.days is None:
            continue
        sentence_lc = cl.sentence.lower()
        for group, kws in anchors:
            if any(k in sentence_lc for k in kws):
                by_group.setdefault(group, []).append(cl)
                # A sentence belongs to the first group it matches only.
                break

    findings: list[dict] = []
    for group, group_claims in by_group.items():
        days_set = {round(c.days, 1) for c in group_claims if c.days}
        if len(days_set) < 2:
            continue
        values = sorted(days_set)
        delta = values[-1] - values[0]
        sev = "HIGH" if delta > values[0] * 3 else "MEDIUM"
        findings.append({
            "check_id": "TH-RETENTION-INTRA-001",
            "category": group,
            "severity": sev,
            "severity_reason": "factually_wrong",
            "values_days": values,
            "claims": [c.sentence[:200] for c in group_claims[:3]],
            "title": (
                f"Speicherdauer-Widerspruch in DSE für '{group}': "
                f"{values} Tage"
            ),
            "norm": "DSGVO Art. 5 Abs. 1 lit. a (Transparenz)",
            "action": (
                f"In der DSE einheitlichen Wert für '{group}' angeben. "
                "Aktuell mindestens zwei verschiedene Werte genannt — "
                "ein Mandant kann die Frist nicht eindeutig erkennen."
            ),
        })
    return findings


def build_retention_theme_summary(
    findings: list[dict],
) -> dict:
    """Aggregate per-cookie findings into the per-theme block used by
    the report (theme = TH-RETENTION).

    Every finding lands in exactly one bucket:

    - failed     — matches is falsy (a mismatch was observed, whatever
                   its severity_reason)
    - incomplete — matches is truthy but severity_reason is
                   "incomplete" (not enough data to verify)
    - passed     — matches is truthy and verified

    ``pct`` is the rounded share of passed findings. ``top_fails`` holds
    up to ten "factually_wrong" mismatches, largest diff_days first.
    """
    total = len(findings)
    # A finding with matches=False is an observed mismatch and counts as
    # failed even when its severity_reason is "incomplete" (the
    # over-declaration case).
    failed = sum(1 for f in findings if not f.get("matches"))
    # Incomplete findings keep matches=True (we did not observe a
    # mismatch), but they don't count as a verified pass either.
    incomplete = sum(
        1 for f in findings
        if f.get("matches") and f.get("severity_reason") == "incomplete"
    )
    passed = total - failed - incomplete

    by_severity: dict[str, int] = {}
    by_type: dict[str, int] = {}
    for f in findings:
        sev = f.get("severity")
        if sev:
            by_severity[sev] = by_severity.get(sev, 0) + 1
        mt = f.get("mismatch_type")
        if mt:
            by_type[mt] = by_type.get(mt, 0) + 1

    return {
        "theme_id": "TH-RETENTION",
        "total": total,
        "passed": passed,
        "failed": failed,
        "incomplete": incomplete,
        "pct": int(round(100 * passed / total)) if total else 0,
        "by_severity": by_severity,
        "by_mismatch_type": by_type,
        "top_fails": sorted(
            (f for f in findings
             if not f.get("matches")
             and f.get("severity_reason") == "factually_wrong"),
            key=lambda f: -(f.get("diff_days") or 0),
        )[:10],
    }