diff --git a/backend-compliance/compliance/api/agent_check/_b19_wiring.py b/backend-compliance/compliance/api/agent_check/_b19_wiring.py new file mode 100644 index 00000000..ea9d0be3 --- /dev/null +++ b/backend-compliance/compliance/api/agent_check/_b19_wiring.py @@ -0,0 +1,100 @@ +"""B19 wiring — Cookie-Coherence-Check (Salesforce-as-essential).""" + +from __future__ import annotations + +import html +import logging +from collections import Counter + +from compliance.services.cookie_coherence_check import check_cookie_coherence + +logger = logging.getLogger(__name__) + + +def run_b19(state: dict) -> None: + # Step 3 — Auto-Learning: alle deklarierten Cookies dieser Site + # in cookie_behavior_audits loggen (Cross-Site-Konsens-Basis). + try: + from compliance.services.cookie_observation_logger import ( + log_observations, + ) + stats = log_observations(state) + logger.info("B19 observation-logger: %s", stats) + except Exception as e: + logger.warning("observation-logger skipped: %s", e) + + new = check_cookie_coherence(state) + if not new: + return + extras = state.get("extra_findings") or [] + extras.extend(new) + state["extra_findings"] = extras + state["cookie_coherence_html"] = _render(new) + state["cookie_coherence_findings"] = new + logger.info("B19 cookie-coherence: %d finding(s)", len(new)) + + +def _render(findings: list[dict]) -> str: + # Aggregate per type for the summary chip + by_type = Counter(f.get("check_id") for f in findings) + severity_color = { + "HIGH": "#dc2626", "MEDIUM": "#f59e0b", "LOW": "#64748b", + } + # Show only the top 12 cards in the mail; rest goes to CSV + cards = [] + for f in findings[:12]: + sev = (f.get("severity") or "").upper() + color = severity_color.get(sev, "#475569") + meta = "" + if f.get("cookie_name"): + meta += ( + "
" + f"Cookie: {html.escape(f['cookie_name'])}" + f" · Vendor: {html.escape(f.get('vendor') or '?')}" + "
" + ) + if f.get("declared_category"): + meta += ( + "
" + f"declared: {html.escape(f['declared_category'])}" + + (f" · actual (KB): {html.escape(f['actual_category'])}" + if f.get("actual_category") else "") + + "
" + ) + cards.append( + f"
" + f"
" + f"{sev} · {html.escape(f.get('check_id') or '')}
" + f"
" + f"{html.escape(f.get('title') or '')}
" + f"
" + f"{html.escape(f.get('norm') or '')}
" + f"{meta}" + f"
" + f"{html.escape(f.get('evidence') or '')}
" + f"
" + f"→ Abstellung: " + f"{html.escape(f.get('recommended_action') or '')}
" + "
" + ) + type_summary = " · ".join( + f"{k.split('-')[-1]}: {v}" for k, v in by_type.most_common() + ) + return ( + "
" + "

" + f"🍪 Cookie-Kohärenz ({len(findings)} Befunde)" + "

" + f"

" + f"Vergleich Site-Deklaration vs Open Cookie Database (2287) + " + f"BreakPilot-KB.
Verteilung: {type_summary}

" + + "".join(cards) + + (f"

" + f"… und {len(findings)-12} weitere — vollständige Liste " + f"in cookies-full.csv im ZIP-Anhang.

" + if len(findings) > 12 else "") + + "
" + ) diff --git a/backend-compliance/compliance/api/agent_check/_orchestrator.py b/backend-compliance/compliance/api/agent_check/_orchestrator.py index 053a7a11..72e6b08b 100644 --- a/backend-compliance/compliance/api/agent_check/_orchestrator.py +++ b/backend-compliance/compliance/api/agent_check/_orchestrator.py @@ -29,6 +29,7 @@ from ._b15_wiring import run_b15 from ._b16_wiring import run_b16 from ._b17_wiring import run_b17 from ._b18_wiring import run_b18 +from ._b19_wiring import run_b19 from ._constants import _compliance_check_jobs from ._phase_a_resolve import run_phase_a from ._phase_b_profile_check import run_phase_b @@ -92,6 +93,7 @@ async def run_compliance_check(check_id: str, req) -> None: run_b16(state) # Footer-Label-vs-URL-Slug-Drift await run_b17(state) # Audit-Walk-Video (Beweis-Aufzeichnung) await run_b18(state) # Impressum-Specialist-Agent (Pattern+LLM) + run_b19(state) # Cookie-Coherence (Salesforce-as-essential) # Phase D-3 top/mid/bot: Step 5 HTML blocks await run_phase_d3_top(state) await run_phase_d3_mid(state) diff --git a/backend-compliance/compliance/api/agent_check/_phase_e_email.py b/backend-compliance/compliance/api/agent_check/_phase_e_email.py index 466d239a..e7559733 100644 --- a/backend-compliance/compliance/api/agent_check/_phase_e_email.py +++ b/backend-compliance/compliance/api/agent_check/_phase_e_email.py @@ -62,6 +62,41 @@ def run_phase_e(state: dict) -> None: except Exception as e: logger.warning("A1 evidence-zip build failed: %s", e) + # B17 audit-walk: bundle video + walk.json + README into a second + # ZIP attachment. Reviewer hat den Beweis-Film direkt im Postfach. 
+ audit_walk = state.get("audit_walk") + if audit_walk and audit_walk.get("walk_id"): + try: + from compliance.services.audit_walk_zip_builder import ( + build_audit_walk_zip, + ) + walk_zip = build_audit_walk_zip( + audit_walk, + extra_files=_build_cookie_csv_extra(state, check_id), + ) + if walk_zip: + evidence_attachments.append({ + "filename": f"audit-walk-{check_id[:8]}.zip", + "data": walk_zip, + "mime": "application/zip", + }) + except Exception as e: + logger.warning("audit-walk-zip build failed: %s", e) + + +def _build_cookie_csv_extra(state: dict, check_id: str) -> dict[str, bytes]: + """B19 Step 4: cookies-full.csv ins Walk-ZIP. Returns {filename: bytes}.""" + if not state.get("cmp_vendors"): + return {} + try: + from compliance.services.cookie_csv_exporter import build_cookie_csv + csv_bytes = build_cookie_csv(state) + if csv_bytes: + return {f"cookies-full-{check_id[:8]}.csv": csv_bytes} + except Exception as e: + logger.warning("cookie-csv build failed: %s", e) + return {} + email_result = send_email( recipient=req.recipient, subject=f"[COMPLIANCE-CHECK] {site_name} — {doc_count} Dokumente geprueft", diff --git a/backend-compliance/compliance/services/audit_walk_zip_builder.py b/backend-compliance/compliance/services/audit_walk_zip_builder.py index 1300164a..cd8ce142 100644 --- a/backend-compliance/compliance/services/audit_walk_zip_builder.py +++ b/backend-compliance/compliance/services/audit_walk_zip_builder.py @@ -66,8 +66,13 @@ Zur Verifikation: def build_audit_walk_zip( walk: dict, consent_tester_url: str = "http://bp-compliance-consent-tester:8094", + extra_files: dict[str, bytes] | None = None, ) -> bytes: - """Fetch video from consent-tester + bundle with walk.json + README.""" + """Fetch video from consent-tester + bundle with walk.json + README. + + `extra_files` is optional name→bytes mapping (e.g. cookies-full.csv + from B19 export). Placed at the ZIP root next to video.webm. 
+ """ wid = walk.get("walk_id") or "" if not wid: return b"" @@ -107,4 +112,11 @@ def build_audit_walk_zip( except Exception as e: logger.warning("annotation %s write failed: %s", fname, e) + for fname, content in (extra_files or {}).items(): + if content: + try: + z.writestr(fname, content) + except Exception as e: + logger.warning("extra-file %s write failed: %s", + fname, e) return buf.getvalue() diff --git a/backend-compliance/compliance/services/cookie_coherence_check.py b/backend-compliance/compliance/services/cookie_coherence_check.py new file mode 100644 index 00000000..92265979 --- /dev/null +++ b/backend-compliance/compliance/services/cookie_coherence_check.py @@ -0,0 +1,299 @@ +"""B19 — Cookie-Coherence-Check. + +Pro Cookie aus state["cmp_vendors"]: Lookup in 3-Layer-DB und +Vergleich der DEKLARATION (was die Site behauptet) mit der TRUTH +(was die Open Cookie Database / BreakPilot-KB sagt). Emittiert +Findings für die Salesforce-as-essential Falsch-Klassifikation. + +Finding-Typen: + - MARKETING_AS_ESSENTIAL: actual=marketing, declared=essential/functional + - LIFETIME_TOO_LONG_FOR_ESSENTIAL: declared=essential, lifetime >90d + - PSEUDO_PURPOSE: purpose ist Floskel ("Siehe dazugehörige + Datenverarbeitung", "Sehen Sie unter ...") + - DUPLICATE_VENDOR: derselbe Vendor in mehreren Kategorien + - UNKNOWN_VENDOR_NO_LIBRARY: Cookie nicht in cookie_library, nicht + in OCD → muss menschlich klassifiziert werden + - MISSING_COUNTRY: vendor_country leer in Deklaration + - MISSING_RETENTION: declared duration leer + +Jedes Finding kommt mit `recommended_action` — konkretes was-zu-tun. 
+""" + +from __future__ import annotations + +import logging +import re +from collections import defaultdict + +from .cookie_library_lookup import lookup as kb_lookup + +logger = logging.getLogger(__name__) + + +_PSEUDO_PURPOSE_PATTERNS = ( + "siehe dazugehörige datenverarbeitung", + "siehe dazugehoerige datenverarbeitung", + "siehe oben", + "see related", + "see corresponding", + "wird unter", + "see above", + "see vendor", + "wie oben beschrieben", +) + + +def _is_essential_category(decl: str) -> bool: + s = (decl or "").lower() + return any(t in s for t in ( + "essential", "essenziell", "essentiell", "necessary", + "erforderlich", "technisch notwendig", "strictly necessary", + "notwendig", "required", + )) + + +def _is_marketing_category(actual: str) -> bool: + return (actual or "").lower() in ( + "marketing", "advertising", "social_media", + ) + + +def _parse_lifetime_to_days(text: str) -> float | None: + if not text: + return None + try: + from .retention_comparator import parse_duration_to_days + days, kind = parse_duration_to_days(text) + if kind == "session": + return 0.0 + if kind in ("persistent", "unknown"): + return None + return days + except Exception: + return None + + +def _is_pseudo_purpose(purpose: str) -> bool: + if not purpose: + return True + s = purpose.lower().strip() + if any(p in s for p in _PSEUDO_PURPOSE_PATTERNS): + return True + # Less than 4 words counts as "no real purpose given" + if len(re.findall(r"\w+", s)) < 4: + return True + return False + + +def _norm_vendor(name: str) -> str: + s = (name or "").lower().strip() + s = re.sub(r"\binc\.?$|\bllc\.?$|\bsas\.?$|\bgmbh\.?$|" + r"\bag\.?$|\bb\.v\.?$|\bs\.a\.?$", "", s) + s = s.replace(",", " ").strip() + return re.sub(r"\s+", " ", s) + + +def check_cookie_coherence(state: dict) -> list[dict]: + """Iterate cmp_vendors + cookies, emit B19 findings.""" + cmp_vendors = state.get("cmp_vendors") or [] + if not cmp_vendors: + return [] + + findings: list[dict] = [] + # Track vendor → set of 
declared categories (DUPLICATE_VENDOR-Detector) + vendor_categories: dict[str, set[str]] = defaultdict(set) + + for v in cmp_vendors: + vendor_name = (v.get("name") or "").strip() + vendor_country = (v.get("country") or "").strip() + vendor_category = (v.get("category") or "").strip().lower() + if vendor_name and vendor_category: + vendor_categories[_norm_vendor(vendor_name)].add(vendor_category) + + for c in (v.get("cookies") or []): + cname = (c.get("name") or "").strip() + if not cname: + continue + declared_cat = (c.get("category") or vendor_category).lower() + declared_purpose = (c.get("purpose") or v.get("purpose") + or "").strip() + declared_lifetime = (c.get("duration") or c.get("persistence") + or c.get("expiry") or "").strip() + declared_days = _parse_lifetime_to_days(declared_lifetime) + + kb = kb_lookup(cname) + actual = (kb.get("actual_category") + or kb.get("consensus_category") or "").lower() + layer = kb.get("_layer") + + # FINDING 1: MARKETING-AS-ESSENTIAL + if actual and _is_marketing_category(actual): + if _is_essential_category(declared_cat): + findings.append({ + "check_id": "COOKIE-COHERENCE-MAE-001", + "severity": "HIGH", + "severity_reason": "misclassified", + "cookie_name": cname, + "vendor": vendor_name, + "declared_category": declared_cat, + "actual_category": actual, + "kb_source": layer, + "title": ( + f"Marketing-Cookie '{cname}' ({vendor_name}) " + "als technisch notwendig deklariert" + ), + "norm": ( + "DSGVO Art. 6 Abs. 1 lit. a + § 25 Abs. 1 TDDDG" + ), + "evidence": ( + f"Open Cookie Database / BreakPilot-KB " + f"klassifiziert '{cname}' als '{actual}'. " + f"Site deklariert als '{declared_cat}' — " + "Einwilligung wird umgangen." + ), + "recommended_action": ( + f"Cookie '{cname}' aus Kategorie " + f"'{declared_cat}' entfernen und in " + f"'Marketing/Werbung' einsortieren. " + "Banner-Toggle für diesen Cookie pflichtig." 
+ ), + }) + + # FINDING 2: LIFETIME-TOO-LONG-FOR-ESSENTIAL + if (_is_essential_category(declared_cat) + and declared_days is not None + and declared_days > 90): + findings.append({ + "check_id": "COOKIE-COHERENCE-LIFE-001", + "severity": "MEDIUM", + "severity_reason": "implausible", + "cookie_name": cname, + "vendor": vendor_name, + "declared_category": declared_cat, + "declared_lifetime": declared_lifetime, + "lifetime_days": declared_days, + "title": ( + f"Essential-Cookie '{cname}' mit Lifetime " + f"{int(declared_days)} Tage — Plausibilität " + "fragwürdig" + ), + "norm": "DSGVO Art. 5 Abs. 1 lit. c (Datenminimierung)", + "evidence": ( + f"Cookie deklariert als '{declared_cat}' " + f"({vendor_name}) hat Speicherdauer " + f"'{declared_lifetime}'. Echte technisch-" + "notwendige Cookies sind typischerweise " + "Session-Cookies oder max. 30 Tage." + ), + "recommended_action": ( + "Speicherdauer reduzieren (Session oder <30 Tage) " + "ODER Kategorie korrekt setzen (functional / " + "marketing) wenn Lifetime tatsächlich nötig ist." + ), + }) + + # FINDING 3: PSEUDO_PURPOSE + if _is_pseudo_purpose(c.get("purpose") or ""): + # Suppress if vendor-level purpose is substantial AND + # cookie just inherits (we don't double-count). + if not (v.get("purpose") + and len(re.findall(r"\w+", v["purpose"])) >= 6): + findings.append({ + "check_id": "COOKIE-COHERENCE-PURP-001", + "severity": "LOW", + "severity_reason": "incomplete", + "cookie_name": cname, + "vendor": vendor_name, + "title": ( + f"Cookie '{cname}' ohne konkreten Zweck — " + "nur generischer Verweis / Floskel" + ), + "norm": "DSGVO Art. 13 Abs. 1 lit. c", + "evidence": ( + f"Zweck: '{(c.get('purpose') or '')[:120]}'" + ), + "recommended_action": ( + f"Konkreten Zweck für '{cname}' angeben " + "(was wird damit konkret gespeichert / " + "verarbeitet) — nicht nur Vendor-Verweis." 
+ ), + }) + + # FINDING 4: MISSING_COUNTRY + if not vendor_country and actual: + findings.append({ + "check_id": "COOKIE-COHERENCE-CTRY-001", + "severity": "LOW", + "severity_reason": "missing", + "cookie_name": cname, + "vendor": vendor_name, + "title": ( + f"Sitzland für '{cname}' ({vendor_name}) fehlt" + ), + "norm": "DSGVO Art. 13 Abs. 1 lit. f (Drittlandtransfer)", + "evidence": "vendor_country leer in Deklaration", + "recommended_action": ( + f"Sitzland von {vendor_name} ergänzen. " + f"KB-Hinweis: laut Bibliothek " + f"{kb.get('vendor_country') or '?'}" + ), + }) + + # FINDING 5: UNKNOWN_VENDOR + if layer == "unknown": + findings.append({ + "check_id": "COOKIE-COHERENCE-UNK-001", + "severity": "LOW", + "severity_reason": "unknown", + "cookie_name": cname, + "vendor": vendor_name, + "title": ( + f"Cookie '{cname}' nicht in Open Cookie Database / " + "BreakPilot-KB" + ), + "norm": "Auto-Learning-Kandidat", + "evidence": ( + "Keine Reference-Klassifikation verfügbar. " + "Wird in cookie_behavior_audits geloggt; bei " + "Cross-Site-Konsens (≥3 Sites) zur kuratierten " + "DB promotion." + ), + "recommended_action": ( + "Manuell prüfen + ggf. zu BreakPilot-KB hinzufügen." + ), + }) + + # FINDING 6: DUPLICATE_VENDOR (across categories) + for vnorm, cats in vendor_categories.items(): + if len(cats) > 1: + # Filter empty + real_cats = {c for c in cats if c} + if len(real_cats) > 1: + findings.append({ + "check_id": "COOKIE-COHERENCE-DUP-001", + "severity": "MEDIUM", + "severity_reason": "split_stack", + "vendor": vnorm, + "categories": sorted(real_cats), + "title": ( + f"Vendor '{vnorm}' in {len(real_cats)} " + "Kategorien gleichzeitig deklariert" + ), + "norm": "DSGVO Art. 13 Abs. 1 lit. c (Klarheit)", + "evidence": ( + f"Vendor erscheint in: " + f"{', '.join(sorted(real_cats))}. Aufspaltung " + "schmuggelt oft Marketing-Funktionen unter " + "'erforderlich'." 
+ ), + "recommended_action": ( + f"Vendor '{vnorm}' auf EINE Kategorie " + "konsolidieren (höchste Schutzkategorie wählen — " + "wenn Marketing-Funktionen dabei sind: " + "vollständig zu Marketing)." + ), + }) + + if findings: + logger.info("B19 cookie-coherence: %d finding(s)", len(findings)) + return findings diff --git a/backend-compliance/compliance/services/cookie_csv_exporter.py b/backend-compliance/compliance/services/cookie_csv_exporter.py new file mode 100644 index 00000000..8503246b --- /dev/null +++ b/backend-compliance/compliance/services/cookie_csv_exporter.py @@ -0,0 +1,140 @@ +"""Vollständiger Cookie-CSV-Export. + +Eine Zeile pro deklariertem Cookie, mit: + - Name + Vendor + - Was die Site deklariert (category, lifetime, purpose, country) + - Was die 3-Layer-KB sagt (actual_category, typical_lifetime, + vendor_country, kb_source) + - Alle Findings als FIND_* boolean-Spalten + - recommended_action (1-Zeiler aus dem schwersten Finding) + +Output: bytes (UTF-8 CSV mit BOM für Excel-Kompatibilität). 
+""" + +from __future__ import annotations + +import csv +import io +import logging + +from .cookie_library_lookup import lookup as kb_lookup + +logger = logging.getLogger(__name__) + + +COLUMNS = [ + "cookie_name", "vendor_declared", "kb_vendor", "kb_layer", + "category_declared", "category_kb", + "lifetime_declared", "lifetime_kb_typical", + "purpose_declared", + "country_declared", "country_kb", + "optout_kb", + "FIND_marketing_as_essential", + "FIND_lifetime_too_long_for_essential", + "FIND_pseudo_purpose", + "FIND_missing_country", + "FIND_missing_retention", + "FIND_unknown_vendor", + "FIND_duplicate_vendor", + "FIND_third_country_no_mechanism", + "recommended_action", + "source_in_audit", +] + + +def _action_for(findings_for_cookie: list[dict]) -> str: + """Pick the action from the highest-severity finding.""" + if not findings_for_cookie: + return "" + priority = {"HIGH": 0, "MEDIUM": 1, "LOW": 2, "INFO": 3} + sorted_f = sorted( + findings_for_cookie, + key=lambda f: priority.get((f.get("severity") or "").upper(), 9), + ) + return sorted_f[0].get("recommended_action", "") or "" + + +def build_cookie_csv(state: dict) -> bytes: + """Iterate cmp_vendors + cookies, write CSV bytes.""" + cmp_vendors = state.get("cmp_vendors") or [] + coherence_findings = state.get("cookie_coherence_findings") or [] + + # Index findings by cookie_name for fast lookup + by_cookie: dict[str, list[dict]] = {} + duplicate_vendors: set[str] = set() + for f in coherence_findings: + cname = f.get("cookie_name") + if cname: + by_cookie.setdefault(cname, []).append(f) + if f.get("check_id") == "COOKIE-COHERENCE-DUP-001": + duplicate_vendors.add((f.get("vendor") or "").lower()) + + buf = io.StringIO() + # Excel-compatible BOM so Umlauts render correctly + buf.write("") + writer = csv.writer(buf, delimiter=";", quoting=csv.QUOTE_MINIMAL) + writer.writerow(COLUMNS) + + written = 0 + for v in cmp_vendors: + vendor_name = (v.get("name") or "").strip() + vendor_src = (v.get("source") or 
"").strip() + vendor_country = (v.get("country") or "").strip() + vendor_category = (v.get("category") or "").strip() + for c in (v.get("cookies") or []): + cname = (c.get("name") or "").strip() + if not cname: + continue + declared_cat = (c.get("category") or vendor_category).strip() + declared_purpose = (c.get("purpose") or v.get("purpose") or "").strip() + declared_lifetime = (c.get("duration") or c.get("persistence") + or c.get("expiry") or "").strip() + + kb = kb_lookup(cname) + kb_vendor = (kb.get("vendor_name") or kb.get("vendor") or "") + kb_layer = kb.get("_layer") or "unknown" + kb_category = (kb.get("actual_category") + or kb.get("consensus_category") or "") + kb_country = (kb.get("vendor_country") or "") + kb_optout = (kb.get("vendor_opt_out_url") or "") + kb_typical_lifetime = (kb.get("typical_lifetime") or "") + if not kb_typical_lifetime and kb.get("typical_max_age_seconds"): + secs = kb["typical_max_age_seconds"] + if secs: + days = secs / 86400.0 + kb_typical_lifetime = ( + f"{int(days)} Tage" if days >= 1 + else f"{int(secs / 3600)} h" if secs >= 3600 + else f"{int(secs / 60)} min" + ) + + f_cookie = by_cookie.get(cname) or [] + check_ids = {fp.get("check_id") for fp in f_cookie} + + row = [ + cname, vendor_name, kb_vendor, kb_layer, + declared_cat, kb_category, + declared_lifetime, kb_typical_lifetime, + declared_purpose[:300], + vendor_country, kb_country, + kb_optout, + "1" if "COOKIE-COHERENCE-MAE-001" in check_ids else "", + "1" if "COOKIE-COHERENCE-LIFE-001" in check_ids else "", + "1" if "COOKIE-COHERENCE-PURP-001" in check_ids else "", + "1" if "COOKIE-COHERENCE-CTRY-001" in check_ids else "", + "1" if not declared_lifetime else "", + "1" if "COOKIE-COHERENCE-UNK-001" in check_ids else "", + "1" if vendor_name.lower() in duplicate_vendors else "", + "1" if (kb_country + and kb_country.upper() not in + ("DE", "EU", "AT", "FR", "NL", "IT", "ES", + "BE", "CH", "IE", "DK", "FI", "SE", "NO") + and not c.get("transfer_mechanism")) else "", + 
_action_for(f_cookie), + vendor_src, + ] + writer.writerow(row) + written += 1 + + logger.info("cookie-csv export: %d rows", written) + return buf.getvalue().encode("utf-8") diff --git a/backend-compliance/compliance/services/cookie_library_lookup.py b/backend-compliance/compliance/services/cookie_library_lookup.py new file mode 100644 index 00000000..78c4f130 --- /dev/null +++ b/backend-compliance/compliance/services/cookie_library_lookup.py @@ -0,0 +1,275 @@ +"""3-Layer Cookie-Lookup-Service. + +Hierarchie (höchste Priorität zuerst): + 1. **Override-Layer**: cookie_knowledge_db.py + cookie_knowledge_extended.py + — BreakPilot-kuratierte Einträge mit Schrems-II / EUGH-Rulings / + EU-Alternative. IP-relevante Annotationen. + 2. **Truth-Base**: compliance.cookie_library (PostgreSQL, ~2287 + Einträge aus Open Cookie Database, CC0 Public Domain). + actual_category + typical_max_age + Vendor-Country. + 3. **Auto-Learning**: compliance.cookie_behavior_audits — Cookies die + wir bei Audits beobachtet aber noch nicht klassifiziert haben. + Cross-Site-Konsens (≥3 Sites mit gleichem declared_purpose) macht + sie zu Promotion-Kandidaten. + +Match-Strategie (in dieser Reihenfolge): + A. exact name match (case-insensitive) + B. prefix match (mind. 3 Chars, falls Cookie wie "_ga" einen + runtime-suffix wie "_ga_K8YL3M9T" hat) + C. wildcard match (cookie_library.domain_pattern + cookie_name mit + Suffix-Wildcard z.B. "_pk_id.*") + +Return: dict mit konsolidierter Sicht über alle 3 Layer + source-tag. +""" + +from __future__ import annotations + +import logging +import re +from typing import Any + +logger = logging.getLogger(__name__) + + +def _norm(s: str) -> str: + return (s or "").strip().lower() + + +def _strip_wildcards(s: str) -> str: + out = _norm(s) + out = out.replace("*", "").replace("…", "") + out = re.sub(r"\.\*$", "", out) + # Trailing separator (_, -, .) is implicit wildcard in the OCD — + # "guest_uuid_essential_" means "guest_uuid_essential_anything". 
+ out = out.rstrip("_-.") + return out.strip() + + +_SEPARATORS = ("_", "-", ".", "[", ":", "$", "%") + + +def _name_matches(library_name: str, query_name: str) -> bool: + """Match-Rules zwischen einem cookie_library-Eintrag und der Anfrage. + + Beispiele: + lib="_ga" vs query="_ga_K8YL3M9T" → True (prefix + separator) + lib="_pk_id.*" vs query="_pk_id.5.7d8" → True (wildcard) + lib="__cf_bm" vs query="__cf_bm" → True (exact) + lib="c" vs query="completely_unknown" → False (no separator) + lib="ID" vs query="IDcharger" → False (no separator) + + Regel: Prefix-Match ist nur gültig wenn das Trennzeichen nach dem + Prefix in der Query ein Separator ist (oder Query endet). Verhindert + false-positives bei kurzen library-Namen ("c", "id", "u"). + """ + lib = _strip_wildcards(library_name) + q = _strip_wildcards(query_name) + if not lib or not q: + return False + if lib == q: + return True + if not _is_specific_enough(lib): + # Kurze generische Namen wie "c", "id" brauchen exakt-match + return False + if q.startswith(lib): + # Prefix-Match nur wenn nächstes Zeichen ein Separator ist + nxt = q[len(lib):len(lib) + 1] + if not nxt or nxt in _SEPARATORS: + return True + if _is_specific_enough(q) and lib.startswith(q): + nxt = lib[len(q):len(q) + 1] + if not nxt or nxt in _SEPARATORS: + return True + return False + + +def _is_specific_enough(name: str) -> bool: + """Cookie-Name ist spezifisch genug für prefix-match. + + Regel: ≥5 Chars ODER enthält Separator (_, -, .). Filtert 1-3-Char + Garbage ("c", "ID") aber lässt "_ga" / "fr" durch wenn präfixiert. + """ + if len(name) >= 5: + return True + return any(sep in name for sep in ("_", "-", ".", "[")) + + +def _load_override_layer(name: str) -> dict | None: + """Layer 1: BreakPilot-kuratiert (Schrems-II IP). + + Exact-first, then fuzzy across both KBs. Browser-Cookies haben oft + runtime-Suffixes (`_ga_K8YL3M9T`); ohne Fuzzy würden wir die + Schrems-II-Annotationen für `_ga` verfehlen. 
+ """ + try: + from .cookie_knowledge_db import KB as KB_DB, lookup_cookie + from .cookie_knowledge_extended import ( + KB_EXT, lookup_cookie_extended, + ) + except Exception as e: + logger.warning("override-layer load failed: %s", e) + return None + hit = lookup_cookie_extended(name) or lookup_cookie(name) + if hit: + return {**hit, "_layer": "override"} + # Fuzzy: iterate both KBs and apply _name_matches + for kb in (KB_EXT, KB_DB): + for lib_name, entry in kb.items(): + if _name_matches(lib_name, name): + out = dict(entry) + out["_layer"] = "override" + out["_matched_name"] = lib_name + return out + return None + + +def _load_truth_base(name: str, domain: str = "") -> dict | None: + """Layer 2: compliance.cookie_library DB-Lookup mit fuzzy match.""" + try: + from database import SessionLocal + from sqlalchemy import text + except Exception: + return None + db = SessionLocal() + try: + # First: exact match on cookie_name (fast) + r = db.execute( + text( + "SELECT id, cookie_name, vendor_name, vendor_country, " + "vendor_privacy_url, vendor_opt_out_url, actual_category, " + "purpose_de, purpose_en, value_pattern, " + "typical_max_age_seconds, data_receivers, is_pii, " + "source_name, source_license, confidence " + "FROM compliance.cookie_library " + "WHERE LOWER(cookie_name) = LOWER(:n) LIMIT 1" + ), + {"n": name}, + ).mappings().first() + if not r: + # Fuzzy-prefix: any library entry whose name is a prefix + # of the query (or vice versa). Bounded to 20 rows for + # perf — the 2287-row table is small enough to scan. 
+ stripped = _strip_wildcards(name) + if len(stripped) >= 3: + candidates = db.execute( + text( + "SELECT id, cookie_name, vendor_name, " + "vendor_country, vendor_privacy_url, " + "vendor_opt_out_url, actual_category, purpose_de, " + "purpose_en, value_pattern, " + "typical_max_age_seconds, data_receivers, is_pii, " + "source_name, source_license, confidence " + "FROM compliance.cookie_library " + "WHERE LOWER(cookie_name) LIKE :prefix " + "OR LOWER(:n) LIKE LOWER(cookie_name) || '%' " + "LIMIT 20" + ), + {"prefix": f"{stripped[:6].lower()}%", "n": name}, + ).mappings().all() + for c in candidates: + if _name_matches(c["cookie_name"], name): + r = c + break + if r: + out = dict(r) + out["_layer"] = "truth_base" + return out + return None + except Exception as e: + logger.info("truth_base lookup failed for %s: %s", name, e) + return None + finally: + db.close() + + +def _load_auto_learning(name: str) -> dict | None: + """Layer 3: was haben wir bei früheren Audits beobachtet? + + Wenn ≥3 unterschiedliche Sites denselben Cookie mit ähnlichem + declared_purpose deklarieren → return Konsens. 
+ """ + try: + from database import SessionLocal + from sqlalchemy import text + except Exception: + return None + db = SessionLocal() + try: + r = db.execute( + text( + "SELECT cookie_name, " + " COUNT(DISTINCT site_url) AS site_count, " + " MODE() WITHIN GROUP (ORDER BY declared_category) " + " AS consensus_category, " + " MAX(observed_max_age_seconds) AS max_observed_age " + "FROM compliance.cookie_behavior_audits " + "WHERE LOWER(cookie_name) = LOWER(:n) " + "GROUP BY cookie_name " + "HAVING COUNT(DISTINCT site_url) >= 3" + ), + {"n": name}, + ).mappings().first() + if r: + return { + "cookie_name": r["cookie_name"], + "consensus_category": r["consensus_category"], + "observed_on_sites": r["site_count"], + "max_observed_age_seconds": r["max_observed_age"], + "_layer": "auto_learning", + } + return None + except Exception as e: + logger.info("auto_learning lookup failed for %s: %s", name, e) + return None + finally: + db.close() + + +def lookup(name: str, domain: str = "") -> dict[str, Any]: + """3-Layer-Lookup. Returns merged dict with `_layer` showing the + highest-priority source that contributed.""" + out: dict[str, Any] = {"name": name, "_found": False, "_layer": "unknown"} + + truth = _load_truth_base(name, domain) + if truth: + out.update(truth) + out["_found"] = True + + auto = _load_auto_learning(name) + if auto: + out.setdefault("consensus_category", auto.get("consensus_category")) + out.setdefault("observed_on_sites", auto.get("observed_on_sites")) + out["_found"] = True + # If truth_base wasn't a hit, fall back to auto layer + if out.get("_layer") == "unknown": + out["_layer"] = "auto_learning" + + override = _load_override_layer(name) + if override: + # Override wins for ALL annotation fields (schrems_ii, eu_alt, + # eugh_rulings). Truth-base actual_category SURVIVES — override + # is purely additive annotations from BreakPilot research. 
+ annotation_keys = { + "schrems_ii_status", "eugh_rulings", "exact_purpose", + "data_collected", "ip_relevant", "ip_anonymized", + "tcf_purpose_ids", "iab_vendor_id", "typical_lifetime", + "reid_risk", "technical_necessity", + "eu_alternative_cookies", "eu_alternative_vendor", "notes", + } + for k in annotation_keys: + if k in override: + out[k] = override[k] + # Vendor-country override (BreakPilot recheck often more precise) + if override.get("vendor_country"): + out["vendor_country"] = override["vendor_country"] + out["_layer"] = "override" + out["_found"] = True + + return out + + +def lookup_actual_category(name: str) -> str | None: + """Convenience: return only the actual_category from truth-base / + auto-learning. None if unknown.""" + hit = lookup(name) + return hit.get("actual_category") or hit.get("consensus_category") diff --git a/backend-compliance/compliance/services/cookie_observation_logger.py b/backend-compliance/compliance/services/cookie_observation_logger.py new file mode 100644 index 00000000..eedeb83a --- /dev/null +++ b/backend-compliance/compliance/services/cookie_observation_logger.py @@ -0,0 +1,92 @@ +"""Auto-Learning für Cookies: nach jedem Audit alle deklarierten + +beobachteten Cookies in compliance.cookie_behavior_audits loggen. + +Cross-Site-Konsens (≥3 Sites mit ähnlichem declared_purpose) macht +einen unbekannten Cookie zum Promotion-Kandidaten für die kuratierte +BreakPilot-KB. Diese Logik lebt im `cookie_library_lookup._load_auto_learning`. + +Best-Effort: jeder DB-Fehler wird geloggt aber nicht propagiert — +ein Logging-Fail soll keinen Audit abbrechen. 
import logging
from urllib.parse import urlparse

logger = logging.getLogger(__name__)


def _site_url_from_state(state: dict) -> str:
    """Return ``scheme://netloc`` of the first absolute document URL.

    Reads ``state["req"].documents`` and uses the first entry whose ``url``
    attribute contains ``"://"``. Returns an empty string when there is no
    request, no documents, or no absolute URL.
    """
    req = state.get("req")
    if req is None:
        return ""
    for doc in getattr(req, "documents", []) or []:
        url = getattr(doc, "url", "") or ""
        if url and "://" in url:
            parsed = urlparse(url)
            return f"{parsed.scheme}://{parsed.netloc}"
    return ""


def log_observations(state: dict) -> dict:
    """Persist every declared (cookie, site) pair into cookie_behavior_audits.

    Best effort: database problems are logged and never propagated, so a
    logging failure cannot abort the surrounding audit.

    Args:
        state: Audit state; reads ``check_id``, ``req`` (for the site URL)
            and ``cmp_vendors`` (vendor dicts with a ``cookies`` list).

    Returns:
        A stats dict for logging. Early exits return
        ``{"logged": 0, "skipped": <reason>}``; otherwise
        ``{"logged": n, "skipped": m, "site_url": ...}``. If the final commit
        fails, ``logged`` is 0 because nothing was persisted.
    """
    try:
        from database import SessionLocal
        from sqlalchemy import text
    except Exception:
        # Deliberately broad: importing the DB layer may fail for reasons
        # other than ImportError (e.g. configuration), and this is optional.
        return {"logged": 0, "skipped": "no_db"}

    check_id = state.get("check_id") or ""
    site_url = _site_url_from_state(state)
    if not site_url:
        return {"logged": 0, "skipped": "no_site_url"}

    cmp_vendors = state.get("cmp_vendors") or []
    if not cmp_vendors:
        return {"logged": 0, "skipped": "no_cmp_vendors"}

    # Loop-invariant statement, built once.
    insert_stmt = text(
        "INSERT INTO compliance.cookie_behavior_audits "
        "(check_id, site_url, cookie_name, "
        "cookie_domain, declared_category, "
        "observed_max_age_seconds) "
        "VALUES (:cid, :site, :name, :dom, :cat, :age)"
    )

    db = SessionLocal()
    inserted = 0
    skipped = 0
    try:
        for v in cmp_vendors:
            for c in (v.get("cookies") or []):
                cname = (c.get("name") or "").strip()
                if not cname:
                    skipped += 1
                    continue
                try:
                    params = {
                        "cid": check_id,
                        "site": site_url,
                        "name": cname,
                        "dom": (v.get("domain")
                                or v.get("name") or "")[:200],
                        "cat": (c.get("category")
                                or v.get("category") or "").strip()[:50],
                        "age": None,
                    }
                    # SAVEPOINT per row: on databases that abort the whole
                    # transaction after a failed statement (e.g. PostgreSQL)
                    # a single bad row would otherwise poison every later
                    # insert and the final commit.
                    with db.begin_nested():
                        db.execute(insert_stmt, params)
                    inserted += 1
                except Exception as e:
                    logger.info("cookie_observations insert skipped %s: %s",
                                cname, str(e)[:120])
                    skipped += 1
        db.commit()
    except Exception as e:
        logger.warning("cookie_observations commit failed: %s", e)
        try:
            db.rollback()
        except Exception as rollback_err:
            logger.warning("cookie_observations rollback failed: %s",
                           rollback_err)
        # Nothing was persisted, so do not report the rows as logged.
        skipped += inserted
        inserted = 0
    finally:
        db.close()

    return {"logged": inserted, "skipped": skipped, "site_url": site_url}
def _issue_type_tag(check_id: str) -> str:
    """Return the issue-type segment of *check_id*.

    ``"COOKIE-COHERENCE-MAE-001"`` -> ``"MAE"``. Trailing purely numeric
    segments (sequence numbers) are dropped before the last segment is taken.
    """
    parts = [p for p in check_id.split("-") if p]
    while len(parts) > 1 and parts[-1].isdigit():
        parts.pop()
    return parts[-1] if parts else ""


def _build_vendor_summary(cmp_vendors: list[dict],
                          coherence_findings: list[dict]) -> list[dict]:
    """Aggregate cookies by vendor and rank vendors by issue severity.

    Args:
        cmp_vendors: Vendor dicts; reads ``name``, ``country``, ``category``
            and ``cookies``. Entries with the same case-insensitive name are
            merged; country/category come from the first entry seen.
        coherence_findings: Finding dicts; reads ``vendor``, ``cookie_name``,
            ``severity`` and ``check_id``.

    Returns:
        One dict per vendor with the keys ``name``, ``country``, ``category``,
        ``cookie_count``, ``issue_count``, ``issue_score``, ``issue_types``
        and ``examples`` (at most two cookies), sorted by ``issue_score``
        descending, then ``cookie_count`` descending.
    """
    sev_score = {"HIGH": 3, "MEDIUM": 2, "LOW": 1, "INFO": 0}

    # Index findings by normalised vendor name. Strip as well as lower-case,
    # because the vendor keys below are built from the stripped name.
    findings_per_vendor: dict[str, list[dict]] = {}
    for f in coherence_findings:
        vendor_key = (f.get("vendor") or "").strip().lower()
        if vendor_key:
            findings_per_vendor.setdefault(vendor_key, []).append(f)

    by_vendor: dict[str, dict] = {}
    for v in cmp_vendors:
        name = (v.get("name") or "").strip() or "Unbekannt"
        entry = by_vendor.setdefault(name.lower(), {
            "name": name,
            "country": (v.get("country") or "").strip(),
            "category": (v.get("category") or "").strip(),
            "cookies": [],
        })
        entry["cookies"].extend(v.get("cookies") or [])

    out: list[dict] = []
    for key, e in by_vendor.items():
        fs = findings_per_vendor.get(key, [])
        cookies = e["cookies"]
        score = sum(sev_score.get((f.get("severity") or "").upper(), 0)
                    for f in fs)

        # Up to two example cookies: those with findings first, then fill up
        # with the remaining cookies in declaration order.
        flagged = {f.get("cookie_name") for f in fs if f.get("cookie_name")}
        examples = [c for c in cookies
                    if (c.get("name") or "") in flagged][:2]
        for c in cookies:
            if len(examples) >= 2:
                break
            if c not in examples:
                examples.append(c)

        # Tags are the type segment of the check id (MAE, LIFE, ...); taking
        # the last "-" segment would yield the sequence number ("001").
        issue_types = sorted({
            _issue_type_tag(f["check_id"])
            for f in fs
            if f.get("check_id")
        })

        out.append({
            "name": e["name"],
            "country": e["country"],
            "category": e["category"],
            "cookie_count": len(cookies),
            "issue_count": len(fs),
            "issue_score": score,
            "issue_types": issue_types,
            "examples": examples,
        })

    # Sort: issue_score DESC, then cookie_count DESC
    out.sort(key=lambda r: (-r["issue_score"], -r["cookie_count"]))
    return out
" + f"• {h(cname)} " + f"(Lifetime: {h(str(lifetime))})" + "
" + ) + + cards.append( + f"
" + f"
" + f"
{h(s['name'])}" + f" " + f"{country_disp}{country_tag}
" + f"
" + f"{s['cookie_count']} Cookies · " + f"{s['issue_count']} " + f"Issues
" + f"
" + f"
{issue_chips}
" + f"
{examples_html}
" + "
" + ) + + rest_note = "" + if len(summary) > top_n: + rest_note = ( + f"

" + f"… und {len(summary)-top_n} weitere Vendoren — " + f"vollständige Liste in cookies-full-*.csv " + f"im ZIP-Anhang.

" + ) + + return ( + "
" + "

" + f"🏷️ Vendor-Übersicht ({total_vendors} Vendoren · " + f"{total_cookies} Cookies · {total_issues} Issues)" + "

" + "

" + "Sortiert nach Issue-Severity. Pro Vendor: 1-2 Beispielcookies + " + "Issue-Tags. Volle Cookie×Finding-Matrix in CSV." + "

" + + "".join(cards) + rest_note + "
" + ) + + +def render_info_box_rechtsrahmen() -> str: + """Generic legal-frame info box. Always shown in V2 mail header.""" + return ( + "
" + "Rechtsrahmen dieser Analyse" + "" + "
" + ) diff --git a/backend-compliance/tests/test_cookie_coherence_check.py b/backend-compliance/tests/test_cookie_coherence_check.py new file mode 100644 index 00000000..33f3d4e8 --- /dev/null +++ b/backend-compliance/tests/test_cookie_coherence_check.py @@ -0,0 +1,138 @@ +"""Tests for B19 Cookie-Coherence-Check (Salesforce-as-essential).""" + +from unittest.mock import patch + +from compliance.services.cookie_coherence_check import ( + _is_essential_category, + _is_marketing_category, + _is_pseudo_purpose, + check_cookie_coherence, +) + + +class TestCategoryHelpers: + def test_essential_de(self): + assert _is_essential_category("Erforderlich") + assert _is_essential_category("technisch notwendig") + + def test_essential_en(self): + assert _is_essential_category("Strictly Necessary") + assert _is_essential_category("essential") + + def test_not_essential(self): + assert not _is_essential_category("Marketing") + assert not _is_essential_category("Analyse") + + def test_marketing(self): + assert _is_marketing_category("marketing") + assert _is_marketing_category("advertising") + assert not _is_marketing_category("functional") + + +class TestPseudoPurpose: + def test_explicit_floskel(self): + assert _is_pseudo_purpose("Siehe dazugehörige Datenverarbeitung") + assert _is_pseudo_purpose("see above") + + def test_too_short(self): + assert _is_pseudo_purpose("Nutzung Cookie") + + def test_real_purpose(self): + assert not _is_pseudo_purpose( + "Speichert die anonymisierte Besucher-ID zur " + "Unterscheidung über mehrere Sessions hinweg." 
+ ) + + +class TestCheck: + def _state(self, vendors): + return {"cmp_vendors": vendors} + + def test_no_vendors_no_findings(self): + assert check_cookie_coherence({}) == [] + + def test_marketing_as_essential_high_finding(self): + # Pinterest _pin_unauth is actual=marketing per KB + state = self._state([{ + "name": "Pinterest", + "category": "Erforderlich", + "cookies": [{ + "name": "_pin_unauth", + "category": "Erforderlich", + "purpose": "Speichert technische Nutzerkennung dauerhaft", + "duration": "1 Jahr", + }], + }]) + findings = check_cookie_coherence(state) + mae = [f for f in findings if f["check_id"] == "COOKIE-COHERENCE-MAE-001"] + assert len(mae) == 1 + assert mae[0]["severity"] == "HIGH" + assert mae[0]["actual_category"] == "marketing" + + def test_essential_with_long_lifetime_finding(self): + # Even if KB-classified as functional/essential, 1 Jahr in + # "essential" is implausible. + state = self._state([{ + "name": "Salesforce", + "category": "Erforderlich", + "cookies": [{ + "name": "guest_uuid_essential_abc123", + "category": "Erforderlich", + "purpose": "Speichert anonyme Session-Kennung über Browser hinweg", + "duration": "1 Jahr", + }], + }]) + findings = check_cookie_coherence(state) + life = [f for f in findings if f["check_id"] == "COOKIE-COHERENCE-LIFE-001"] + assert len(life) == 1 + assert life[0]["severity"] == "MEDIUM" + + def test_pseudo_purpose_finding(self): + state = self._state([{ + "name": "TestVendor", + "category": "functional", + "purpose": "irgendwas", + "cookies": [{ + "name": "completely_made_up_cookie_xyz", + "category": "functional", + "purpose": "Siehe dazugehörige Datenverarbeitung", + "duration": "session", + }], + }]) + findings = check_cookie_coherence(state) + purp = [f for f in findings if f["check_id"] == "COOKIE-COHERENCE-PURP-001"] + assert len(purp) == 1 + + def test_duplicate_vendor_finding(self): + # Salesforce in TWO different categories + state = self._state([ + {"name": "Salesforce", "category": 
"Erforderlich", + "cookies": [{"name": "a", "purpose": "konkreter Zweck Text mit vielen Worten"}]}, + {"name": "Salesforce Inc.", "category": "Marketing", + "cookies": [{"name": "b", "purpose": "konkreter Zweck Text mit vielen Worten"}]}, + ]) + findings = check_cookie_coherence(state) + dup = [f for f in findings if f["check_id"] == "COOKIE-COHERENCE-DUP-001"] + assert len(dup) == 1 + + def test_pseudo_purpose_suppressed_when_vendor_purpose_substantial(self): + # If vendor-level purpose has substantial text, cookie inheriting + # "Siehe dazugehörige Datenverarbeitung" is not flagged. + state = self._state([{ + "name": "Salesforce", + "category": "functional", + "purpose": ( + "Salesforce CRM-System verarbeitet personenbezogene Daten " + "im Auftrag zur Verwaltung der Kundenbeziehung über mehrere " + "Touchpoints hinweg." + ), + "cookies": [{ + "name": "sf_session", + "category": "functional", + "purpose": "Siehe dazugehörige Datenverarbeitung", + "duration": "session", + }], + }]) + findings = check_cookie_coherence(state) + purp = [f for f in findings if f["check_id"] == "COOKIE-COHERENCE-PURP-001"] + assert purp == [] diff --git a/backend-compliance/tests/test_cookie_library_lookup.py b/backend-compliance/tests/test_cookie_library_lookup.py new file mode 100644 index 00000000..22668552 --- /dev/null +++ b/backend-compliance/tests/test_cookie_library_lookup.py @@ -0,0 +1,69 @@ +"""Tests for the 3-Layer Cookie-Lookup-Service.""" + +from compliance.services.cookie_library_lookup import ( + _is_specific_enough, + _name_matches, + _strip_wildcards, +) + + +class TestStripWildcards: + def test_lowercase(self): + assert _strip_wildcards("_GA") == "_ga" + + def test_strip_star(self): + assert _strip_wildcards("_ga*") == "_ga" + + def test_strip_dotstar(self): + assert _strip_wildcards("_pk_id.*") == "_pk_id" + + def test_strip_trailing_underscore(self): + # OCD-Pattern: trailing _ is implicit wildcard + assert _strip_wildcards("guest_uuid_essential_") == 
"guest_uuid_essential" + + def test_strip_trailing_dot(self): + assert _strip_wildcards("_pk_id.") == "_pk_id" + + +class TestIsSpecificEnough: + def test_long_name(self): + assert _is_specific_enough("OptanonConsent") + + def test_short_with_separator(self): + assert _is_specific_enough("_ga") + + def test_short_no_separator_rejected(self): + assert not _is_specific_enough("c") + assert not _is_specific_enough("ID") + assert not _is_specific_enough("abc") + + +class TestNameMatches: + def test_exact(self): + assert _name_matches("OptanonConsent", "OptanonConsent") + + def test_prefix_with_separator(self): + # _ga library + browser _ga_K8YL3M9T + assert _name_matches("_ga", "_ga_K8YL3M9T") + # __cf_bm library + browser __cf_bm_hash + assert _name_matches("__cf_bm", "__cf_bm_hash") + + def test_short_unspecific_rejected(self): + # 1-char library entries must not match arbitrary queries + assert not _name_matches("c", "completely_unknown") + assert not _name_matches("ID", "IDcharger") + + def test_prefix_no_separator_rejected(self): + # Even with longer library, must have separator after prefix + assert not _name_matches("Compa", "Completely_unknown") + + def test_wildcard_match(self): + # _pk_id.* matches _pk_id.5.7d8 + assert _name_matches("_pk_id.*", "_pk_id.5.7d8") + + def test_trailing_underscore_match(self): + # guest_uuid_essential_ matches guest_uuid_essential_xyz + assert _name_matches("guest_uuid_essential_", "guest_uuid_essential_xyz") + + def test_unrelated(self): + assert not _name_matches("_ga", "intercom-session")