"""B3 wiring — Cross-doc retention consistency check + HTML block.

Combines three sources of retention truth per cookie:
  - DSI text (state["doc_texts"]["dse"] or "cookie")
  - cookie-table `duration` from cmp_vendors[i]["cookies"][j]
  - actual cookie expiry from banner_result["cookies_detailed"][k]
and produces per-cookie findings + a TH-RETENTION theme summary.

Only renders an HTML block when there are findings to show; the block
is sorted by severity (HIGH first) and shows the top-10 mismatches.
"""

from __future__ import annotations

import html
import logging
import time

from compliance.services.retention_comparator import (
    build_retention_theme_summary,
    compare_retention,
    detect_intra_doc_contradictions,
    extract_retention_claims,
)

logger = logging.getLogger(__name__)

# Sort order for the rendered table; unknown severities sort last.
_SEVERITY_RANK = {"HIGH": 0, "MEDIUM": 1, "LOW": 2}
_UNKNOWN_SEVERITY_RANK = 9

# Text colour of the severity cell.
_SEVERITY_COLOR = {"HIGH": "#dc2626", "MEDIUM": "#f59e0b"}
_DEFAULT_SEVERITY_COLOR = "#64748b"

# Only the worst mismatches are rendered.
_MAX_ROWS = 10

_CELL_STYLE = "padding:4px 8px;border-bottom:1px solid #e2e8f0"
_HEAD_STYLE = "padding:4px 8px;text-align:left;border-bottom:2px solid #e2e8f0"


def _actual_max_age_seconds(cookie: dict) -> float | None:
    """Return the cookie lifetime in seconds, or None for a session cookie.

    Playwright gives us `expires` as a Unix timestamp (seconds since
    epoch). Some sources give `max_age` directly. -1 / 0 means session
    cookie (no expiry) — None signals that.

    A positive `max_age` wins. Otherwise the value is derived from
    `expires` as the time remaining from *now*, so an already expired
    cookie also yields None.
    """
    max_age = cookie.get("max_age")
    if isinstance(max_age, (int, float)) and max_age > 0:
        return float(max_age)

    expires = cookie.get("expires")
    if isinstance(expires, (int, float)) and expires > 0:
        remaining = expires - time.time()
        if remaining > 0:
            return float(remaining)
    return None


def _collect_cookie_records(
    cmp_vendors: list[dict],
) -> tuple[list[dict], list[str]]:
    """Flatten the vendor cookie tables into one record per named cookie.

    Returns ``(records, vendor_names)``. Cookies without a name are
    skipped; vendors without a name contribute records with an empty
    ``vendor`` but are left out of ``vendor_names``.
    """
    records: list[dict] = []
    vendor_names: list[str] = []
    for vendor in cmp_vendors:
        vendor_name = (vendor.get("name") or "").strip()
        if vendor_name:
            vendor_names.append(vendor_name)
        for cookie in vendor.get("cookies") or []:
            cookie_name = (cookie.get("name") or "").strip()
            if not cookie_name:
                continue
            # The duration column goes by several keys; first non-empty wins.
            table_duration = (
                cookie.get("duration")
                or cookie.get("persistence")
                or cookie.get("expiry")
                or ""
            )
            records.append({
                "name": cookie_name,
                "vendor": vendor_name,
                "table_duration": table_duration,
                "actual_max_age": None,
            })
    return records, vendor_names


def _attach_actual_max_age(records: list[dict], banner_result: dict) -> None:
    """Fill ``actual_max_age`` on *records* from ``cookies_detailed``.

    Names are matched case-insensitively. When several detailed cookies
    share a name, the last one in the list is used.
    """
    if not banner_result:
        return

    detailed_by_name: dict[str, dict] = {}
    for cookie in banner_result.get("cookies_detailed") or []:
        key = (cookie.get("name") or "").lower()
        if key:
            detailed_by_name[key] = cookie

    for record in records:
        detailed = detailed_by_name.get(record["name"].lower())
        if detailed is not None:
            record["actual_max_age"] = _actual_max_age_seconds(detailed)


def run_b3(state: dict) -> None:
    """Cross-doc retention check + render HTML. Mutates state in place.

    Reads ``doc_texts``, ``cmp_vendors`` and ``banner_result`` from
    *state*; a missing or None entry is treated as empty.

    Writes:
      - ``retention_intra_doc`` whenever a DSI/cookie text is present
      - ``retention_findings``, ``retention_theme_summary`` and
        ``retention_html`` when at least one named cookie is listed in
        the vendor cookie tables

    Returns without touching *state* when there is no DSI/cookie text.
    """
    doc_texts = state.get("doc_texts") or {}
    cmp_vendors = state.get("cmp_vendors") or []
    banner_result = state.get("banner_result") or {}

    dsi_text = doc_texts.get("dse") or doc_texts.get("cookie") or ""
    if not dsi_text:
        return

    # Intra-doc contradictions are independent of cmp_vendors — run
    # them first so they survive the early-return below.
    state["retention_intra_doc"] = detect_intra_doc_contradictions(dsi_text)

    records, vendor_names = _collect_cookie_records(cmp_vendors)
    if not records:
        return

    _attach_actual_max_age(records, banner_result)

    cookie_names = [record["name"] for record in records]
    claims = extract_retention_claims(dsi_text, cookie_names, vendor_names)

    findings: list[dict] = [
        compare_retention(
            cookie_name=record["name"],
            table_duration=record["table_duration"],
            actual_max_age_seconds=record["actual_max_age"],
            dsi_claims=claims,
            vendor_name=record["vendor"] or None,
        )
        for record in records
    ]
    summary = build_retention_theme_summary(findings)

    state["retention_findings"] = findings
    state["retention_theme_summary"] = summary
    state["retention_html"] = _render_block(summary, findings)

    logger.info(
        "B3 Retention: %d findings, %d passed, %d failed, %d incomplete",
        summary["total"],
        summary["passed"],
        summary["failed"],
        summary["incomplete"],
    )


def _fmt_days(d: float | None) -> str:
    """Format a duration given in days as a short label.

    None -> "—", below one day -> whole hours ("6h"), below 30 days ->
    whole days ("12d"), below 365 days -> whole 30-day months ("3mo"),
    otherwise years with one decimal ("1.5y"). Values are truncated,
    not rounded, except for the year form.
    """
    if d is None:
        return "—"
    if d < 1:
        return f"{int(d * 24)}h"
    if d < 30:
        return f"{int(d)}d"
    if d < 365:
        return f"{int(d / 30)}mo"
    return f"{d / 365:.1f}y"


def _esc(value: object) -> str:
    """HTML-escape *value*; missing or empty values render as an em dash."""
    return html.escape(str(value)) if value else "—"


def _render_row(finding: dict) -> str:
    """Render one mismatch finding as a table row."""
    severity = (finding.get("severity") or "").upper()
    color = _SEVERITY_COLOR.get(severity, _DEFAULT_SEVERITY_COLOR)
    cookie_name = _esc(finding.get("cookie_name"))
    vendor_name = _esc(finding.get("vendor_name"))
    mismatch_type = _esc(finding.get("mismatch_type"))
    return (
        "<tr>"
        f"<td style='{_CELL_STYLE};font-family:monospace'>"
        f"{cookie_name}</td>"
        f"<td style='{_CELL_STYLE}'>"
        f"{vendor_name}</td>"
        f"<td style='{_CELL_STYLE}'>"
        f"DSI: {_fmt_days(finding.get('dsi_days'))} • "
        f"Tabelle: {_fmt_days(finding.get('table_days'))} • "
        f"Realität: {_fmt_days(finding.get('actual_days'))}</td>"
        f"<td style='{_CELL_STYLE};color:{color};font-weight:600'>"
        # Severity comes from the finding dict, so escape it like the rest.
        f"{html.escape(severity)} ({mismatch_type})</td>"
        "</tr>"
    )


def _render_block(summary: dict, findings: list[dict]) -> str:
    """Render the TH-RETENTION block, or "" when nothing needs showing.

    A finding is shown when its ``matches`` is falsy and its
    ``severity_reason`` is not "incomplete". Shown findings are ordered
    by severity (HIGH, MEDIUM, LOW, then anything else) and, within one
    severity, by descending ``diff_days``; only the first ten are
    rendered. The header line reports the counts taken from *summary*.

    NOTE(review): the reviewed copy of this function had its HTML tags
    stripped (the string literals were empty or broken across lines).
    The element structure and inline styles here are reconstructed from
    the visible text, the four column headings and the per-severity
    colour — confirm them against the intended design.
    """
    if summary["total"] == 0:
        return ""

    mismatches = [
        finding
        for finding in findings
        if not finding.get("matches")
        and finding.get("severity_reason") != "incomplete"
    ]
    if not mismatches:
        return ""  # all OK, no block needed

    # Sort by severity (HIGH first) then diff_days desc
    ranked = sorted(
        mismatches,
        key=lambda finding: (
            _SEVERITY_RANK.get(
                (finding.get("severity") or "").upper(),
                _UNKNOWN_SEVERITY_RANK,
            ),
            -(finding.get("diff_days") or 0),
        ),
    )
    rows = [_render_row(finding) for finding in ranked[:_MAX_ROWS]]

    total = summary["total"]
    passed = summary["passed"]
    failed = summary["failed"]
    incomplete = summary["incomplete"]

    return (
        "<div style='margin:16px 0;padding:12px 16px;"
        "border:1px solid #e2e8f0;border-radius:8px'>"
        "<div style='font-weight:600;margin-bottom:6px'>"
        "TH-RETENTION — Speicherdauer-Konsistenz "
        "(DSI ↔ Cookie-Tabelle ↔ Realität)"
        "</div>"
        "<div style='font-size:13px;margin-bottom:8px'>"
        f"<strong>{total}</strong> Cookies verglichen: "
        f"<span style='color:#16a34a'>{passed} ✓</span> / "
        f"<span style='color:#dc2626'>{failed} ✗</span> / "
        f"<span style='color:#64748b'>{incomplete} ?</span></div>"
        "<table style='width:100%;border-collapse:collapse;font-size:12px'>"
        "<thead><tr>"
        f"<th style='{_HEAD_STYLE}'>Cookie</th>"
        f"<th style='{_HEAD_STYLE}'>Vendor</th>"
        f"<th style='{_HEAD_STYLE}'>Werte</th>"
        f"<th style='{_HEAD_STYLE}'>Mismatch</th>"
        "</tr></thead>"
        f"<tbody>{''.join(rows)}</tbody>"
        "</table>"
        "</div>"
    )