Files
Benjamin Admin 94057b1536
CI / loc-budget (push) Failing after 19s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 42s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
CI / detect-changes (push) Successful in 11s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 15s
feat(audit): VW-Cookie-Bug-Fix + P101/P102 Cookie-Library-Mismatch-Findings
VW-Bug B1: extract_vendors_via_llm hatte max_text_chars=12000 -> bei
VW-Cookie-Doc (60k chars, 100 Cookies in Tabelle) wurden 80% abgeschnitten,
LLM extrahierte nur 1 Vendor. Fix: max_text_chars=50000, num_predict
6000->16000 fuer mehr Vendor-Output, Ollama-Timeout 120s->420s.

P101 Aggregator-Script (backend-compliance/scripts/cookie_library_enrich.py)
geht alle compliance_check_snapshots durch und extrahiert (cookie_name,
declared_category, observed_sites). Erste Auswertung ueber 8 Snapshots:
101 unique Cookies, 47 in Library, 54 unbekannt, 18 Mismatches.

P102 Cookie-Klassifikations-Pruefung als Mail-Block. Vergleicht
Site-deklarierte Kategorie vs Library + Vendor-Doku. HIGH wenn Library
sagt 'marketing' aber Site als 'essential'/'statistics' deklariert
(faktische Drittland-/Werbe-Verarbeitung versteckt). MEDIUM sonst.
In agent_compliance_check_routes Mail-Komposition + Replay-Pipeline
eingebaut.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 15:47:11 +02:00

238 lines
9.2 KiB
Python

#!/usr/bin/env python3
"""P101 — Cookie-Library Auto-Enrich aus Snapshots.
Geht alle compliance_check_snapshots durch und:
1. Extrahiert unique (cookie_name, vendor_hint) aus Phase-A/B/C-Cookies
2. Sammelt deklarierte Kategorie + Speicherdauer pro Cookie pro Site
3. Vergleicht mit cookie_library (Open-Cookie-Database + DACH)
4. Reportet: new_cookies, kategorie_mismatches, multi_site_inconsistencies
Run im Container:
docker exec bp-compliance-backend python /tmp/enrich.py
"""
from __future__ import annotations
import json
import re
import sys
from collections import defaultdict
from typing import Any
from database import engine
from sqlalchemy import text
def _category_from_text_context(cookie_name: str, doc_text: str) -> str | None:
"""Lookup cookie_name in doc_text + extract deklarierte Kategorie aus
der Tabellen-Zeile darum (innerhalb 200 Zeichen davor/danach)."""
if not doc_text or not cookie_name:
return None
idx = doc_text.find(cookie_name)
if idx < 0:
return None
window = doc_text[max(0, idx - 50):idx + 400].lower()
category_patterns = [
(r"(?:strictly[- ])?(?:notwendig|essential|funktional|funktionscookie|"
r"funktional cookie|technisch notwendig)", "essential"),
(r"(?:tracking|analytics|analyse|statistik|measurement|performance)",
"statistics"),
(r"(?:marketing|werbung|advertising|targeting|drittanbieter)",
"marketing"),
(r"(?:social[- ]?media|share|like|like[- ]?button)", "social_media"),
]
for pat, cat in category_patterns:
if re.search(pat, window):
return cat
return None
def _purpose_text(cookie_name: str, doc_text: str) -> str | None:
"""Extract die Zweck-Beschreibung aus dem Doc-Text (Sätze um den Namen)."""
if not doc_text or not cookie_name:
return None
idx = doc_text.find(cookie_name)
if idx < 0:
return None
after = doc_text[idx + len(cookie_name):idx + len(cookie_name) + 400]
sentences = re.split(r"[.\n]", after)
text_lines = [s.strip() for s in sentences if 30 < len(s.strip()) < 300]
return text_lines[0] if text_lines else None
# Banner-scan phases whose cookie lists are evaluated, in processing order.
_CONSENT_PHASES = ("before_consent", "after_reject", "after_accept")
# Longer "names" are skipped (same limit as before the refactoring).
_MAX_COOKIE_NAME_LEN = 80
# Candidates at or above this confidence are written to cookie_library.
_AUTO_INSERT_MIN_CONFIDENCE = 0.75
# Maximum number of rows printed per mismatch / inconsistency section.
_REPORT_ROW_LIMIT = 30


def _decode_json_column(value: Any, default: Any) -> Any:
    """Return a JSON column as a Python object of the same type as ``default``.

    Drivers may hand JSON columns back already decoded or as raw text; text is
    decoded here. Empty, undecodable or wrongly-typed values yield ``default``
    so one malformed snapshot cannot abort the whole run.
    """
    if isinstance(value, (str, bytes, bytearray)):
        try:
            value = json.loads(value)
        except ValueError:  # JSONDecodeError and UnicodeDecodeError subclass it
            return default
    if not value or not isinstance(value, type(default)):
        return default
    return value


def _longest_text_by_type(doc_entries: list) -> tuple[str, str]:
    """Return ``(cookie_text, dse_text)``: the longest text per document type."""
    longest = {"cookie": "", "dse": ""}
    for entry in doc_entries:
        if not isinstance(entry, dict):
            continue
        doc_type = entry.get("doc_type")
        if doc_type not in ("cookie", "dse"):
            continue
        body = (entry.get("text") or entry.get("full_text")
                or entry.get("text_preview") or "")
        if isinstance(body, str) and len(body) > len(longest[doc_type]):
            longest[doc_type] = body
    return longest["cookie"], longest["dse"]


def _parse_cookie(raw: Any) -> tuple[str, str, Any]:
    """Normalise a snapshot cookie entry to ``(name, domain, max_age)``.

    Snapshot cookies are either plain names or dicts with
    ``name``/``domain``/``max_age`` (``expires`` as fallback for ``max_age``).
    """
    if isinstance(raw, dict):
        name = str(raw.get("name") or "").strip()
        domain = str(raw.get("domain") or "").lstrip(".").lower()
        return name, domain, raw.get("max_age") or raw.get("expires")
    return str(raw).strip(), "", None


def _collect_observations(rows: Any) -> dict[str, list[dict]]:
    """Group every cookie seen in the snapshots by cookie name.

    Each observation records site, phase, cookie domain, max_age and the
    category/purpose inferred from the site's documents. The cookie document
    is consulted first; the privacy policy (DSE) is only used when the cookie
    document yields nothing for that cookie.
    """
    observations: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        _snap_id, domain, raw_docs, raw_banner = row
        # A NULL domain would make sorted() over mixed None/str sites fail.
        site = domain or "unknown"
        cookie_text, dse_text = _longest_text_by_type(
            _decode_json_column(raw_docs, []))
        phases = _decode_json_column(raw_banner, {}).get("phases") or {}
        if not isinstance(phases, dict):
            continue

        # The same cookie usually shows up in several phases of one snapshot;
        # the text inference depends only on the name, so compute it once.
        declared: dict[str, tuple[str | None, str | None]] = {}
        for phase_name in _CONSENT_PHASES:
            phase = phases.get(phase_name) or {}
            if not isinstance(phase, dict):
                continue
            cookies = phase.get("cookies") or []
            if not isinstance(cookies, list):
                continue
            for raw in cookies:
                cname, cdomain, max_age = _parse_cookie(raw)
                if not cname or len(cname) > _MAX_COOKIE_NAME_LEN:
                    continue
                if cname not in declared:
                    category = (
                        _category_from_text_context(cname, cookie_text)
                        or _category_from_text_context(cname, dse_text)
                    )
                    purpose = (_purpose_text(cname, cookie_text)
                               or _purpose_text(cname, dse_text))
                    declared[cname] = (category, purpose)
                category, purpose = declared[cname]
                observations[cname].append({
                    "site": site,
                    "phase": phase_name,
                    "cookie_domain": cdomain,
                    "max_age": max_age,
                    "declared_category": category,
                    "declared_purpose": (purpose[:150] if purpose else None),
                })
    return observations


def _compare_with_library(
    observations: dict[str, list[dict]],
    lib_lookup: dict[str, dict],
) -> tuple[list[str], list[dict], list[dict]]:
    """Classify observed cookies against the library.

    Returns ``(new_cookies, mismatches, inconsistencies)``. Site and category
    lists are sorted so repeated runs produce identical reports.
    """
    new_cookies: list[str] = []
    mismatches: list[dict] = []
    inconsistencies: list[dict] = []
    for cname, obs_list in observations.items():
        sites = sorted({o["site"] for o in obs_list})
        declared_cats = sorted({o["declared_category"] for o in obs_list
                                if o["declared_category"]})
        # 1) The same cookie is declared under different categories.
        if len(declared_cats) > 1:
            inconsistencies.append({
                "cookie": cname,
                "sites": sites,
                "categories": declared_cats,
            })
        # 2) Library lookup (library keys are lower-cased).
        lib_entry = lib_lookup.get(cname.lower())
        if not lib_entry:
            new_cookies.append(cname)
            continue
        # 3) Declared vs. library category. A missing (NULL) library category
        #    carries no information, exactly like the explicit "unknown".
        lib_cat = lib_entry["category"]
        if not lib_cat or lib_cat == "unknown":
            continue
        conflicting = next((dc for dc in declared_cats if dc != lib_cat), None)
        if conflicting is not None:
            mismatches.append({
                "cookie": cname,
                "declared_by_site": conflicting,
                "library_says": lib_cat,
                "library_vendor": lib_entry["vendor"],
                "sites": sites,
            })
    return new_cookies, mismatches, inconsistencies


def _build_candidates(
    new_cookies: list[str],
    observations: dict[str, list[dict]],
) -> list[tuple[str, dict]]:
    """Build library-enrichment candidates, highest confidence first.

    Cookies without any declared category are skipped. Confidence grows with
    the number of distinct sites: 0.6 + 0.1 per site, capped at 0.95.
    """
    candidates: list[tuple[str, dict]] = []
    for cname in new_cookies:
        obs = observations[cname]
        # NOTE(review): the first declared category wins even when sites
        # disagree; consider excluding such cookies from the auto-insert.
        primary_cat = next((o["declared_category"] for o in obs
                            if o["declared_category"]), None)
        if not primary_cat:
            continue  # not enrichable without a declared category
        purpose = next((o["declared_purpose"] for o in obs
                        if o["declared_purpose"]), None)
        sites = sorted({o["site"] for o in obs})
        # round() removes float noise such as 0.9000000000000001.
        confidence = round(min(0.6 + 0.1 * len(sites), 0.95), 2)
        candidates.append((cname, {
            "category": primary_cat,
            "purpose": purpose,
            "sites": sites,
            "confidence": confidence,
        }))
    # Stable sort: ties keep observation order, so "TOP-20" is really ranked.
    candidates.sort(key=lambda item: -item[1]["confidence"])
    return candidates


def _print_report(
    snapshot_count: int,
    observations: dict[str, list[dict]],
    new_cookies: list[str],
    mismatches: list[dict],
    inconsistencies: list[dict],
    candidates: list[tuple[str, dict]],
) -> None:
    """Print the audit summary and the detail sections to stdout."""
    print("=" * 70)
    print("AUDIT-REPORT: P101 Cookie-Library Auto-Enrich")
    print(f" Snapshots: {snapshot_count}")
    print(f" Unique cookies observed: {len(observations)}")
    print(f" In Library (Open-Cookie-DB + DACH): {len(observations) - len(new_cookies)}")
    print(f" NEW (unbekannt): {len(new_cookies)}")
    print(f" Mismatches (declared != library): {len(mismatches)}")
    print(f" Multi-Site Inkonsistenzen: {len(inconsistencies)}")
    print("=" * 70)

    print("\n--- TOP-20 NEW COOKIES (Kandidaten fuer Library-Enrich) ---")
    for cname, info in candidates[:20]:
        print(f" {cname:30s} [{info['category']:12s}] conf={info['confidence']} "
              f"sites={info['sites']}")
        if info.get("purpose"):
            print(f" purpose: {info['purpose'][:100]}")

    print(f"\n--- ALLE MISMATCHES ({len(mismatches)}) ---")
    for m in mismatches[:_REPORT_ROW_LIMIT]:
        print(f" {m['cookie']:30s} declared={m['declared_by_site']:12s} "
              f"library={str(m['library_says']):12s} "
              f"sites={m['sites']}")
    if len(mismatches) > _REPORT_ROW_LIMIT:
        # Make the truncation visible instead of silently dropping rows.
        print(f" ... und {len(mismatches) - _REPORT_ROW_LIMIT} weitere")

    print(f"\n--- ALLE INKONSISTENZEN ({len(inconsistencies)}) ---")
    for i in inconsistencies[:_REPORT_ROW_LIMIT]:
        print(f" {i['cookie']:30s} cats={i['categories']} sites={i['sites']}")
    if len(inconsistencies) > _REPORT_ROW_LIMIT:
        print(f" ... und {len(inconsistencies) - _REPORT_ROW_LIMIT} weitere")


def _auto_insert(candidates: list[tuple[str, dict]]) -> int:
    """Insert sufficiently confident candidates; return the inserted count.

    All inserts run in one transaction. Conflicting rows are left untouched
    (``ON CONFLICT DO NOTHING``).
    """
    insert_stmt = text("""
        INSERT INTO compliance.cookie_library
        (cookie_name, domain_pattern, vendor_name,
        actual_category, purpose_de,
        source_name, source_url, source_license, confidence)
        VALUES (:n, '*', 'Mehrere OEMs (BreakPilot-Snapshot)',
        :cat, :pd,
        'BreakPilot-Auto-Enrich', 'https://breakpilot.ai',
        'CC-BY-eigene-Sammlung', :cf)
        ON CONFLICT DO NOTHING
    """)
    inserted = 0
    with engine.begin() as conn:
        for cname, info in candidates:
            if info["confidence"] < _AUTO_INSERT_MIN_CONFIDENCE:
                continue
            result = conn.execute(insert_stmt, {
                "n": cname[:200],
                "cat": info["category"],
                "pd": info.get("purpose")
                or f"Beobachtet bei {len(info['sites'])} OEMs",
                "cf": info["confidence"],
            })
            # Some drivers report -1 when the row count is unavailable.
            inserted += max(result.rowcount, 0)
    return inserted


def main() -> int:
    """Run the P101 cookie-library audit and auto-enrichment.

    Steps:
      1. Load all compliance_check_snapshots and collect every cookie seen in
         the before_consent / after_reject / after_accept phases, together
         with the category and purpose inferred from the site's documents.
      2. Compare against compliance.cookie_library and derive new cookies,
         category mismatches and multi-site inconsistencies.
      3. Print the report to stdout (the snapshot count goes to stderr).
      4. Insert new cookies with confidence >= 0.75 into cookie_library.

    Returns:
        0 as the process exit code; database errors propagate as exceptions.
    """
    with engine.connect() as conn:
        rows = conn.execute(text(
            "SELECT id, site_domain, doc_entries, banner_result "
            "FROM compliance.compliance_check_snapshots"
        )).fetchall()
    print(f"Loaded {len(rows)} snapshots", file=sys.stderr)

    observations = _collect_observations(rows)
    print(f"\nUnique cookies observed: {len(observations)}\n")

    with engine.connect() as conn:
        lib_rows = conn.execute(text(
            "SELECT cookie_name, actual_category, vendor_name "
            "FROM compliance.cookie_library"
        )).fetchall()
    # Rows without a cookie name cannot be looked up and are skipped.
    lib_lookup = {
        name.lower(): {"category": category, "vendor": vendor}
        for name, category, vendor in lib_rows
        if name
    }

    new_cookies, mismatches, inconsistencies = _compare_with_library(
        observations, lib_lookup)
    candidates = _build_candidates(new_cookies, observations)
    _print_report(len(rows), observations, new_cookies,
                  mismatches, inconsistencies, candidates)

    print("\n--- AUTO-INSERTING in cookie_library (confidence>=0.75) ---")
    inserted = _auto_insert(candidates)
    print(f" inserted: {inserted}")
    return 0
if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the exit status.
    raise SystemExit(main())