#!/usr/bin/env python3
"""P101 - cookie library auto-enrichment from compliance snapshots.

Walks every row of compliance.compliance_check_snapshots and:
  1. Collects the cookie names recorded for the before_consent,
     after_reject and after_accept banner phases.
  2. Looks up the declared category and purpose of each cookie in the
     site's cookie document (falling back to the "dse" document).
  3. Compares the result with compliance.cookie_library
     (Open-Cookie-Database + DACH).
  4. Reports new cookies, category mismatches and cookies that different
     observations declare with different categories.
  5. Inserts new cookies whose confidence reaches the auto-insert
     threshold into compliance.cookie_library.

Run inside the container:
    docker exec bp-compliance-backend python /tmp/enrich.py
"""

from __future__ import annotations

import json
import re
import sys
from collections import Counter, defaultdict
from typing import Any, Callable

from database import engine
from sqlalchemy import text

# Banner phases whose cookie lists are evaluated.
_PHASES = ("before_consent", "after_reject", "after_accept")
# Document types used for category inference, most preferred first.
_DOC_TYPE_PRIORITY = ("cookie", "dse")
# Longer "names" are skipped as implausible cookie names.
_MAX_COOKIE_NAME_LEN = 80
_AUTO_INSERT_MIN_CONFIDENCE = 0.75
_TOP_CANDIDATES = 20
_REPORT_LIMIT = 30

# Checked in this order; the first pattern found in the window wins.
_CATEGORY_PATTERNS = tuple(
    (re.compile(pattern), category)
    for pattern, category in (
        (r"(?:strictly[- ])?(?:notwendig|essential|funktional|funktionscookie|"
         r"funktional cookie|technisch notwendig)", "essential"),
        (r"(?:tracking|analytics|analyse|statistik|measurement|performance)",
         "statistics"),
        (r"(?:marketing|werbung|advertising|targeting|drittanbieter)",
         "marketing"),
        (r"(?:social[- ]?media|share|like|like[- ]?button)", "social_media"),
    )
)


def _category_from_text_context(cookie_name: str, doc_text: str) -> str | None:
    """Infer the declared category of *cookie_name* from *doc_text*.

    Only the first occurrence of the name is considered. The text from 50
    characters before it to 400 characters after its start is lower-cased
    and searched for category keywords.

    Returns "essential", "statistics", "marketing" or "social_media", or
    None when the name does not occur or no keyword is found.
    """
    if not doc_text or not cookie_name:
        return None
    idx = doc_text.find(cookie_name)
    if idx < 0:
        return None
    window = doc_text[max(0, idx - 50):idx + 400].lower()
    for pattern, category in _CATEGORY_PATTERNS:
        if pattern.search(window):
            return category
    return None


def _purpose_text(cookie_name: str, doc_text: str) -> str | None:
    """Extract a purpose description for *cookie_name* from *doc_text*.

    The 400 characters following the first occurrence of the name are
    split at "." and newlines; the first fragment whose stripped length is
    between 31 and 299 characters is returned, or None if there is none.
    """
    if not doc_text or not cookie_name:
        return None
    idx = doc_text.find(cookie_name)
    if idx < 0:
        return None
    start = idx + len(cookie_name)
    fragments = (part.strip() for part in re.split(r"[.\n]", doc_text[start:start + 400]))
    return next((part for part in fragments if 30 < len(part) < 300), None)


def _as_json(value: Any, expected: type, label: str) -> Any:
    """Return *value* as an instance of *expected* (list or dict).

    Values that arrive as text are decoded with json.loads. Empty, NULL,
    undecodable or wrongly-typed values yield an empty *expected* instance
    so that one bad snapshot cannot abort the whole run.
    """
    if not value:
        return expected()
    if isinstance(value, (str, bytes, bytearray)):
        try:
            value = json.loads(value)
        except ValueError as exc:
            print(f"WARN: {label}: invalid JSON ignored ({exc})", file=sys.stderr)
            return expected()
    return value if isinstance(value, expected) else expected()


def _doc_texts_by_priority(doc_entries: list) -> list[str]:
    """Return the longest text per document type, cookie document first."""
    longest: dict[str, str] = {}
    for entry in doc_entries:
        if not isinstance(entry, dict):
            continue
        doc_type = entry.get("doc_type")
        if doc_type not in _DOC_TYPE_PRIORITY:
            continue
        body = entry.get("text") or entry.get("full_text") or entry.get("text_preview") or ""
        if isinstance(body, str) and len(body) > len(longest.get(doc_type, "")):
            longest[doc_type] = body
    return [longest[doc_type] for doc_type in _DOC_TYPE_PRIORITY if doc_type in longest]


def _first_hit(
    extract: Callable[[str, str], str | None],
    cookie_name: str,
    doc_texts: list[str],
) -> str | None:
    """Apply *extract* to each text in order and return the first result."""
    for doc_text in doc_texts:
        hit = extract(cookie_name, doc_text)
        if hit:
            return hit
    return None


def _sorted_sites(obs_list: list[dict]) -> list:
    """Return the distinct sites of *obs_list* in a stable order."""
    # key=str keeps the sort from failing should a site_domain be NULL.
    return sorted({obs["site"] for obs in obs_list}, key=str)


def _collect_observations(rows: Any) -> dict[str, list[dict]]:
    """Build a mapping cookie name -> list of per-snapshot, per-phase observations.

    *rows* are (id, site_domain, doc_entries, banner_result) tuples.
    """
    observations: dict[str, list[dict]] = defaultdict(list)
    for snap_id, domain, raw_entries, raw_banner in rows:
        doc_entries = _as_json(raw_entries, list, f"snapshot {snap_id} doc_entries")
        banner_result = _as_json(raw_banner, dict, f"snapshot {snap_id} banner_result")
        phases = banner_result.get("phases")
        if not isinstance(phases, dict):
            continue
        doc_texts = _doc_texts_by_priority(doc_entries)
        # The same cookie usually shows up in several phases; look it up once.
        declared: dict[str, tuple[str | None, str | None]] = {}

        for phase_name in _PHASES:
            phase = phases.get(phase_name)
            if not isinstance(phase, dict):
                continue
            for ck in phase.get("cookies") or []:
                # Cookies are mostly plain name strings, occasionally dicts
                # with name/domain/max_age.
                if ck is None:
                    continue
                if isinstance(ck, dict):
                    cname = str(ck.get("name") or "").strip()
                    cdomain = str(ck.get("domain") or "").lstrip(".").lower()
                    max_age = ck.get("max_age") or ck.get("expires")
                else:
                    cname = str(ck).strip()
                    cdomain = ""
                    max_age = None
                if not cname or len(cname) > _MAX_COOKIE_NAME_LEN:
                    continue

                if cname not in declared:
                    purpose = _first_hit(_purpose_text, cname, doc_texts)
                    declared[cname] = (
                        _first_hit(_category_from_text_context, cname, doc_texts),
                        purpose[:150] if purpose else None,
                    )
                cat_declared, purpose = declared[cname]
                observations[cname].append({
                    "site": domain,
                    "phase": phase_name,
                    "cookie_domain": cdomain,
                    "max_age": max_age,
                    "declared_category": cat_declared,
                    "declared_purpose": purpose,
                })
    return observations


def _compare_with_library(
    observations: dict[str, list[dict]],
    lib_lookup: dict[str, dict],
) -> tuple[list[str], list[dict], list[dict]]:
    """Classify observed cookies against the library.

    Returns (new_cookies, mismatches, inconsistencies).
    """
    new_cookies: list[str] = []
    mismatches: list[dict] = []
    inconsistencies: list[dict] = []

    for cname, obs_list in observations.items():
        sites = _sorted_sites(obs_list)
        # Sorted so that the report does not depend on set iteration order.
        declared_cats = sorted(
            {obs["declared_category"] for obs in obs_list if obs["declared_category"]}
        )

        # 1) Same cookie declared with different categories.
        if len(declared_cats) > 1:
            inconsistencies.append({
                "cookie": cname,
                "sites": sites,
                "categories": declared_cats,
            })

        # 2) Library lookup (case-insensitive on the cookie name).
        lib_entry = lib_lookup.get(cname.lower())
        if not lib_entry:
            new_cookies.append(cname)
            continue

        # 3) Declared vs. library category. A NULL/empty library category
        #    is treated like "unknown": nothing to compare against.
        lib_cat = lib_entry["category"]
        if not lib_cat or lib_cat == "unknown":
            continue
        conflicting = next((cat for cat in declared_cats if cat != lib_cat), None)
        if conflicting:
            mismatches.append({
                "cookie": cname,
                "declared_by_site": conflicting,
                "library_says": lib_cat,
                "library_vendor": lib_entry["vendor"],
                "sites": sites,
            })
    return new_cookies, mismatches, inconsistencies


def _build_candidates(
    new_cookies: list[str],
    observations: dict[str, list[dict]],
) -> list[tuple[str, dict]]:
    """Return enrichment candidates, cookies seen on the most sites first.

    Cookies without any declared category are not enrichable and are
    dropped. Confidence is 0.6 plus 0.1 per site, capped at 0.95.
    """
    candidates: list[tuple[str, dict]] = []
    for cname in new_cookies:
        obs_list = observations[cname]
        # One vote per (site, category) so that a cookie seen in several
        # phases of the same site does not count more than once.
        ballots = dict.fromkeys((obs["site"], obs["declared_category"]) for obs in obs_list)
        votes = Counter(cat for _site, cat in ballots if cat)
        if not votes:
            continue
        sites = _sorted_sites(obs_list)
        purpose = next(
            (obs["declared_purpose"] for obs in obs_list if obs["declared_purpose"]),
            None,
        )
        candidates.append((cname, {
            # Ties resolve to the category encountered first.
            "category": votes.most_common(1)[0][0],
            "purpose": purpose,
            "sites": sites,
            # Rounded to avoid float artefacts such as 0.9000000000000001.
            "confidence": round(min(0.6 + 0.1 * len(sites), 0.95), 2),
        }))
    # Stable sort: equally ranked cookies keep their first-seen order.
    candidates.sort(key=lambda item: len(item[1]["sites"]), reverse=True)
    return candidates


def _print_truncation_note(total: int, shown: int) -> None:
    """Tell the reader when a report section was cut off."""
    if total > shown:
        print(f" ... (+{total - shown} weitere)")


def _print_report(
    snapshot_count: int,
    observations: dict[str, list[dict]],
    new_cookies: list[str],
    mismatches: list[dict],
    inconsistencies: list[dict],
    candidates: list[tuple[str, dict]],
) -> None:
    """Print the audit report to stdout."""
    print("=" * 70)
    print("AUDIT-REPORT: P101 Cookie-Library Auto-Enrich")
    print(f" Snapshots: {snapshot_count}")
    print(f" Unique cookies observed: {len(observations)}")
    print(f" In Library (Open-Cookie-DB + DACH): {len(observations) - len(new_cookies)}")
    print(f" NEW (unbekannt): {len(new_cookies)}")
    print(f" Mismatches (declared != library): {len(mismatches)}")
    print(f" Multi-Site Inkonsistenzen: {len(inconsistencies)}")
    print("=" * 70)

    print(f"\n--- TOP-{_TOP_CANDIDATES} NEW COOKIES (Kandidaten fuer Library-Enrich) ---")
    for cname, info in candidates[:_TOP_CANDIDATES]:
        print(f" {cname:30s} [{info['category']:12s}] conf={info['confidence']:.2f} "
              f"sites={info['sites']}")
        if info.get("purpose"):
            print(f" purpose: {info['purpose'][:100]}")

    print(f"\n--- ALLE MISMATCHES ({len(mismatches)}) ---")
    for m in mismatches[:_REPORT_LIMIT]:
        print(f" {m['cookie']:30s} declared={m['declared_by_site']:12s} "
              f"library={m['library_says']:12s} "
              f"sites={m['sites']}")
    _print_truncation_note(len(mismatches), _REPORT_LIMIT)

    print(f"\n--- ALLE INKONSISTENZEN ({len(inconsistencies)}) ---")
    for i in inconsistencies[:_REPORT_LIMIT]:
        print(f" {i['cookie']:30s} cats={i['categories']} sites={i['sites']}")
    _print_truncation_note(len(inconsistencies), _REPORT_LIMIT)


def _insert_candidates(candidates: list[tuple[str, dict]]) -> int:
    """Insert sufficiently confident candidates; return the rows inserted."""
    inserted = 0
    with engine.begin() as c:
        for cname, info in candidates:
            if info["confidence"] < _AUTO_INSERT_MIN_CONFIDENCE:
                continue
            r = c.execute(text("""
                INSERT INTO compliance.cookie_library
                    (cookie_name, domain_pattern, vendor_name, actual_category,
                     purpose_de, source_name, source_url, source_license,
                     confidence)
                VALUES
                    (:n, '*', 'Mehrere OEMs (BreakPilot-Snapshot)', :cat, :pd,
                     'BreakPilot-Auto-Enrich', 'https://breakpilot.ai',
                     'CC-BY-eigene-Sammlung', :cf)
                ON CONFLICT DO NOTHING
            """), dict(
                n=cname[:200],
                cat=info["category"],
                pd=info.get("purpose") or f"Beobachtet bei {len(info['sites'])} OEMs",
                cf=info["confidence"],
            ))
            # Some drivers report -1 when the row count is unavailable.
            inserted += max(r.rowcount, 0)
    return inserted


def main() -> int:
    """Run the audit, print the report and auto-insert confident candidates.

    Returns the process exit code (always 0).
    """
    with engine.connect() as c:
        rows = c.execute(text(
            "SELECT id, site_domain, doc_entries, banner_result "
            "FROM compliance.compliance_check_snapshots"
        )).fetchall()
    print(f"Loaded {len(rows)} snapshots", file=sys.stderr)

    observations = _collect_observations(rows)
    print(f"\nUnique cookies observed: {len(observations)}\n")

    # Lookup vs. cookie_library.
    with engine.connect() as c:
        lib_rows = c.execute(text(
            "SELECT cookie_name, actual_category, vendor_name "
            "FROM compliance.cookie_library"
        )).fetchall()
    lib_lookup = {
        r[0].lower(): {"category": r[1], "vendor": r[2]}
        for r in lib_rows
        if r[0]
    }

    new_cookies, mismatches, inconsistencies = _compare_with_library(observations, lib_lookup)
    candidates = _build_candidates(new_cookies, observations)
    _print_report(len(rows), observations, new_cookies, mismatches, inconsistencies, candidates)

    print(f"\n--- AUTO-INSERTING in cookie_library (confidence>={_AUTO_INSERT_MIN_CONFIDENCE}) ---")
    inserted = _insert_candidates(candidates)
    print(f" inserted: {inserted}")
    return 0


if __name__ == "__main__":
    sys.exit(main())