"""3-Layer Cookie-Lookup-Service. Hierarchie (höchste Priorität zuerst): 1. **Override-Layer**: cookie_knowledge_db.py + cookie_knowledge_extended.py — BreakPilot-kuratierte Einträge mit Schrems-II / EUGH-Rulings / EU-Alternative. IP-relevante Annotationen. 2. **Truth-Base**: compliance.cookie_library (PostgreSQL, ~2287 Einträge aus Open Cookie Database, CC0 Public Domain). actual_category + typical_max_age + Vendor-Country. 3. **Auto-Learning**: compliance.cookie_behavior_audits — Cookies die wir bei Audits beobachtet aber noch nicht klassifiziert haben. Cross-Site-Konsens (≥3 Sites mit gleichem declared_purpose) macht sie zu Promotion-Kandidaten. Match-Strategie (in dieser Reihenfolge): A. exact name match (case-insensitive) B. prefix match (mind. 3 Chars, falls Cookie wie "_ga" einen runtime-suffix wie "_ga_K8YL3M9T" hat) C. wildcard match (cookie_library.domain_pattern + cookie_name mit Suffix-Wildcard z.B. "_pk_id.*") Return: dict mit konsolidierter Sicht über alle 3 Layer + source-tag. """ from __future__ import annotations import logging import re from typing import Any logger = logging.getLogger(__name__) def _norm(s: str) -> str: return (s or "").strip().lower() def _strip_wildcards(s: str) -> str: out = _norm(s) out = out.replace("*", "").replace("…", "") out = re.sub(r"\.\*$", "", out) # Trailing separator (_, -, .) is implicit wildcard in the OCD — # "guest_uuid_essential_" means "guest_uuid_essential_anything". out = out.rstrip("_-.") return out.strip() _SEPARATORS = ("_", "-", ".", "[", ":", "$", "%") def _name_matches(library_name: str, query_name: str) -> bool: """Match-Rules zwischen einem cookie_library-Eintrag und der Anfrage. Beispiele: lib="_ga" vs query="_ga_K8YL3M9T" → True (prefix + separator) lib="_pk_id.*" vs query="_pk_id.5.7d8" → True (wildcard) lib="__cf_bm" vs query="__cf_bm" → True (exact) lib="c" vs query="completely_unknown" → False (no separator) lib="ID" vs query="IDcharger" → False (no separator) Regel: Prefix-Match ist nur gültig wenn das Trennzeichen nach dem Prefix in der Query ein Separator ist (oder Query endet). Verhindert false-positives bei kurzen library-Namen ("c", "id", "u"). """ lib = _strip_wildcards(library_name) q = _strip_wildcards(query_name) if not lib or not q: return False if lib == q: return True if not _is_specific_enough(lib): # Kurze generische Namen wie "c", "id" brauchen exakt-match return False if q.startswith(lib): # Prefix-Match nur wenn nächstes Zeichen ein Separator ist nxt = q[len(lib):len(lib) + 1] if not nxt or nxt in _SEPARATORS: return True if _is_specific_enough(q) and lib.startswith(q): nxt = lib[len(q):len(q) + 1] if not nxt or nxt in _SEPARATORS: return True return False def _is_specific_enough(name: str) -> bool: """Cookie-Name ist spezifisch genug für prefix-match. Regel: ≥5 Chars ODER enthält Separator (_, -, .). Filtert 1-3-Char Garbage ("c", "ID") aber lässt "_ga" / "fr" durch wenn präfixiert. """ if len(name) >= 5: return True return any(sep in name for sep in ("_", "-", ".", "[")) def _load_override_layer(name: str) -> dict | None: """Layer 1: BreakPilot-kuratiert (Schrems-II IP). Exact-first, then fuzzy across both KBs. Browser-Cookies haben oft runtime-Suffixes (`_ga_K8YL3M9T`); ohne Fuzzy würden wir die Schrems-II-Annotationen für `_ga` verfehlen. """ try: from .cookie_knowledge_db import KB as KB_DB, lookup_cookie from .cookie_knowledge_extended import ( KB_EXT, lookup_cookie_extended, ) except Exception as e: logger.warning("override-layer load failed: %s", e) return None hit = lookup_cookie_extended(name) or lookup_cookie(name) if hit: return {**hit, "_layer": "override"} # Fuzzy: iterate both KBs and apply _name_matches for kb in (KB_EXT, KB_DB): for lib_name, entry in kb.items(): if _name_matches(lib_name, name): out = dict(entry) out["_layer"] = "override" out["_matched_name"] = lib_name return out return None def _load_truth_base(name: str, domain: str = "") -> dict | None: """Layer 2: compliance.cookie_library DB-Lookup mit fuzzy match.""" try: from database import SessionLocal from sqlalchemy import text except Exception: return None db = SessionLocal() try: # First: exact match on cookie_name (fast) r = db.execute( text( "SELECT id, cookie_name, vendor_name, vendor_country, " "vendor_privacy_url, vendor_opt_out_url, actual_category, " "purpose_de, purpose_en, value_pattern, " "typical_max_age_seconds, data_receivers, is_pii, " "source_name, source_license, confidence " "FROM compliance.cookie_library " "WHERE LOWER(cookie_name) = LOWER(:n) LIMIT 1" ), {"n": name}, ).mappings().first() if not r: # Fuzzy-prefix: any library entry whose name is a prefix # of the query (or vice versa). Bounded to 20 rows for # perf — the 2287-row table is small enough to scan. stripped = _strip_wildcards(name) if len(stripped) >= 3: candidates = db.execute( text( "SELECT id, cookie_name, vendor_name, " "vendor_country, vendor_privacy_url, " "vendor_opt_out_url, actual_category, purpose_de, " "purpose_en, value_pattern, " "typical_max_age_seconds, data_receivers, is_pii, " "source_name, source_license, confidence " "FROM compliance.cookie_library " "WHERE LOWER(cookie_name) LIKE :prefix " "OR LOWER(:n) LIKE LOWER(cookie_name) || '%' " "LIMIT 20" ), {"prefix": f"{stripped[:6].lower()}%", "n": name}, ).mappings().all() for c in candidates: if _name_matches(c["cookie_name"], name): r = c break if r: out = dict(r) out["_layer"] = "truth_base" return out return None except Exception as e: logger.info("truth_base lookup failed for %s: %s", name, e) return None finally: db.close() def _load_auto_learning(name: str) -> dict | None: """Layer 3: was haben wir bei früheren Audits beobachtet? Wenn ≥3 unterschiedliche Sites denselben Cookie mit ähnlichem declared_purpose deklarieren → return Konsens. """ try: from database import SessionLocal from sqlalchemy import text except Exception: return None db = SessionLocal() try: r = db.execute( text( "SELECT cookie_name, " " COUNT(DISTINCT site_url) AS site_count, " " MODE() WITHIN GROUP (ORDER BY declared_category) " " AS consensus_category, " " MAX(observed_max_age_seconds) AS max_observed_age " "FROM compliance.cookie_behavior_audits " "WHERE LOWER(cookie_name) = LOWER(:n) " "GROUP BY cookie_name " "HAVING COUNT(*) >= 1" ), {"n": name}, ).mappings().first() if r: return { "cookie_name": r["cookie_name"], "consensus_category": r["consensus_category"], "observed_on_sites": r["site_count"], "max_observed_age_seconds": r["max_observed_age"], "_layer": "auto_learning", } return None except Exception as e: logger.info("auto_learning lookup failed for %s: %s", name, e) return None finally: db.close() def lookup(name: str, domain: str = "") -> dict[str, Any]: """3-Layer-Lookup. Returns merged dict with `_layer` showing the highest-priority source that contributed.""" out: dict[str, Any] = {"name": name, "_found": False, "_layer": "unknown"} truth = _load_truth_base(name, domain) if truth: out.update(truth) out["_found"] = True auto = _load_auto_learning(name) if auto: out.setdefault("consensus_category", auto.get("consensus_category")) out.setdefault("observed_on_sites", auto.get("observed_on_sites")) out["_found"] = True # If truth_base wasn't a hit, fall back to auto layer if out.get("_layer") == "unknown": out["_layer"] = "auto_learning" override = _load_override_layer(name) if override: # Override wins for ALL annotation fields (schrems_ii, eu_alt, # eugh_rulings). Truth-base actual_category SURVIVES — override # is purely additive annotations from BreakPilot research. annotation_keys = { "schrems_ii_status", "eugh_rulings", "exact_purpose", "data_collected", "ip_relevant", "ip_anonymized", "tcf_purpose_ids", "iab_vendor_id", "typical_lifetime", "reid_risk", "technical_necessity", "eu_alternative_cookies", "eu_alternative_vendor", "notes", } for k in annotation_keys: if k in override: out[k] = override[k] # Vendor-country override (BreakPilot recheck often more precise) if override.get("vendor_country"): out["vendor_country"] = override["vendor_country"] out["_layer"] = "override" out["_found"] = True return out def lookup_actual_category(name: str) -> str | None: """Convenience: return only the actual_category from truth-base / auto-learning. None if unknown.""" hit = lookup(name) return hit.get("actual_category") or hit.get("consensus_category")