94057b1536
CI / loc-budget (push) Failing after 19s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 42s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
CI / detect-changes (push) Successful in 11s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 15s
VW-Bug B1: extract_vendors_via_llm hatte max_text_chars=12000 -> bei VW-Cookie-Doc (60k chars, 100 Cookies in Tabelle) wurden 80% abgeschnitten, LLM extrahierte nur 1 Vendor. Fix: max_text_chars=50000, num_predict 6000->16000 fuer mehr Vendor-Output, Ollama-Timeout 120s->420s. P101 Aggregator-Script (backend-compliance/scripts/cookie_library_enrich.py) geht alle compliance_check_snapshots durch und extrahiert (cookie_name, declared_category, observed_sites). Erste Auswertung ueber 8 Snapshots: 101 unique Cookies, 47 in Library, 54 unbekannt, 18 Mismatches. P102 Cookie-Klassifikations-Pruefung als Mail-Block. Vergleicht Site-deklarierte Kategorie vs Library + Vendor-Doku. HIGH wenn Library sagt 'marketing' aber Site als 'essential'/'statistics' deklariert (faktische Drittland-/Werbe-Verarbeitung versteckt). MEDIUM sonst. In agent_compliance_check_routes Mail-Komposition + Replay-Pipeline eingebaut. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
238 lines
9.2 KiB
Python
238 lines
9.2 KiB
Python
#!/usr/bin/env python3
|
|
"""P101 — Cookie-Library Auto-Enrich aus Snapshots.
|
|
|
|
Geht alle compliance_check_snapshots durch und:
|
|
1. Extrahiert unique (cookie_name, vendor_hint) aus Phase-A/B/C-Cookies
|
|
2. Sammelt deklarierte Kategorie + Speicherdauer pro Cookie pro Site
|
|
3. Vergleicht mit cookie_library (Open-Cookie-Database + DACH)
|
|
4. Reportet: new_cookies, kategorie_mismatches, multi_site_inconsistencies
|
|
|
|
Run im Container:
|
|
docker exec bp-compliance-backend python /tmp/enrich.py
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
import sys
|
|
from collections import defaultdict
|
|
from typing import Any
|
|
|
|
from database import engine
|
|
from sqlalchemy import text
|
|
|
|
|
|
def _category_from_text_context(cookie_name: str, doc_text: str) -> str | None:
|
|
"""Lookup cookie_name in doc_text + extract deklarierte Kategorie aus
|
|
der Tabellen-Zeile darum (innerhalb 200 Zeichen davor/danach)."""
|
|
if not doc_text or not cookie_name:
|
|
return None
|
|
idx = doc_text.find(cookie_name)
|
|
if idx < 0:
|
|
return None
|
|
window = doc_text[max(0, idx - 50):idx + 400].lower()
|
|
category_patterns = [
|
|
(r"(?:strictly[- ])?(?:notwendig|essential|funktional|funktionscookie|"
|
|
r"funktional cookie|technisch notwendig)", "essential"),
|
|
(r"(?:tracking|analytics|analyse|statistik|measurement|performance)",
|
|
"statistics"),
|
|
(r"(?:marketing|werbung|advertising|targeting|drittanbieter)",
|
|
"marketing"),
|
|
(r"(?:social[- ]?media|share|like|like[- ]?button)", "social_media"),
|
|
]
|
|
for pat, cat in category_patterns:
|
|
if re.search(pat, window):
|
|
return cat
|
|
return None
|
|
|
|
|
|
def _purpose_text(cookie_name: str, doc_text: str) -> str | None:
|
|
"""Extract die Zweck-Beschreibung aus dem Doc-Text (Sätze um den Namen)."""
|
|
if not doc_text or not cookie_name:
|
|
return None
|
|
idx = doc_text.find(cookie_name)
|
|
if idx < 0:
|
|
return None
|
|
after = doc_text[idx + len(cookie_name):idx + len(cookie_name) + 400]
|
|
sentences = re.split(r"[.\n]", after)
|
|
text_lines = [s.strip() for s in sentences if 30 < len(s.strip()) < 300]
|
|
return text_lines[0] if text_lines else None
|
|
|
|
|
|
def _decode_json(value: Any, default: Any) -> Any:
    """Return ``value`` as a Python object, decoding JSON text if necessary.

    Some drivers hand JSON columns back as already-decoded objects, others as
    text. ``None``, undecodable text and falsy values yield ``default``.
    """
    if isinstance(value, (str, bytes, bytearray)):
        try:
            value = json.loads(value)
        except ValueError:  # JSONDecodeError and UnicodeDecodeError
            return default
    return value or default


def _collect_observations(rows: Any) -> dict[str, list[dict]]:
    """Group every cookie seen in the snapshot rows by cookie name.

    Each row is expected to unpack into ``(id, site_domain, doc_entries,
    banner_result)``. For every cookie listed in the ``before_consent``,
    ``after_reject`` and ``after_accept`` phases one observation dict is
    recorded, carrying the category and purpose inferred from the snapshot's
    document text.
    """
    observations: dict[str, list[dict]] = defaultdict(list)

    for _snap_id, domain, raw_entries, raw_banner in rows:
        doc_entries = _decode_json(raw_entries, [])
        if not isinstance(doc_entries, list):
            doc_entries = []
        banner_result = _decode_json(raw_banner, {})
        if not isinstance(banner_result, dict):
            banner_result = {}

        # Document text used for category inference: the longest text among
        # the "cookie" and "dse" entries.
        # NOTE(review): the original comment said "cookie doc preferred, DSE
        # as fallback", but the selection has always been by length only.
        doc_text = ""
        for entry in doc_entries:
            if not isinstance(entry, dict):
                continue
            if entry.get("doc_type") not in ("cookie", "dse"):
                continue
            candidate = (entry.get("text") or entry.get("full_text")
                         or entry.get("text_preview") or "")
            if isinstance(candidate, str) and len(candidate) > len(doc_text):
                doc_text = candidate

        # "phases" may be present but null; treat that like a missing key.
        phases = banner_result.get("phases") or {}
        if not isinstance(phases, dict):
            continue

        # The same cookie usually shows up in several phases of one snapshot;
        # the text lookups depend only on (name, doc_text), so do them once.
        context_cache: dict[str, tuple] = {}

        for phase_name in ("before_consent", "after_reject", "after_accept"):
            phase = phases.get(phase_name) or {}
            if not isinstance(phase, dict):
                continue
            for ck in phase.get("cookies") or []:
                # Cookies are mostly plain name strings, occasionally dicts
                # with name/domain/max_age.
                if isinstance(ck, dict):
                    cname = str(ck.get("name") or "").strip()
                    cdomain = str(ck.get("domain") or "").lstrip(".").lower()
                    max_age = ck.get("max_age") or ck.get("expires")
                else:
                    cname = str(ck).strip()
                    cdomain = ""
                    max_age = None
                if not cname or len(cname) > 80:
                    continue

                if cname not in context_cache:
                    purpose = _purpose_text(cname, doc_text)
                    context_cache[cname] = (
                        _category_from_text_context(cname, doc_text),
                        purpose[:150] if purpose else None,
                    )
                cat_declared, purpose_declared = context_cache[cname]

                observations[cname].append({
                    "site": domain,
                    "phase": phase_name,
                    "cookie_domain": cdomain,
                    "max_age": max_age,
                    "declared_category": cat_declared,
                    "declared_purpose": purpose_declared,
                })

    return observations


def _classify_observations(
    observations: dict[str, list[dict]],
    lib_lookup: dict[str, dict],
) -> tuple[list[str], list[dict], list[dict]]:
    """Compare observations with the library.

    Returns ``(new_cookies, mismatches, inconsistencies)``:
    cookies absent from the library, cookies whose declared category differs
    from the library category, and cookies declared with more than one
    category across observations.
    """
    new_cookies: list[str] = []
    mismatches: list[dict] = []
    inconsistencies: list[dict] = []

    for cname, obs_list in observations.items():
        # Sorted (via str, so a NULL domain cannot break ordering) to keep the
        # report deterministic between runs.
        sites = sorted({o["site"] for o in obs_list}, key=str)
        declared_cats = sorted({o["declared_category"] for o in obs_list
                                if o["declared_category"]})

        # 1) Conflicting declarations for the same cookie name.
        if len(declared_cats) > 1:
            inconsistencies.append({
                "cookie": cname,
                "sites": sites,
                "categories": declared_cats,
            })

        # 2) Library lookup (case-insensitive on the cookie name).
        lib_entry = lib_lookup.get(cname.lower())
        if not lib_entry:
            new_cookies.append(cname)
            continue

        # 3) Declared vs. library category. A library entry without a usable
        # category cannot contradict anything.
        lib_cat = lib_entry["category"]
        if lib_cat in (None, "", "unknown"):
            continue
        for dc in declared_cats:
            if dc != lib_cat:
                mismatches.append({
                    "cookie": cname,
                    "declared_by_site": dc,
                    "library_says": lib_cat,
                    "library_vendor": lib_entry["vendor"],
                    "sites": sites,
                })
                break  # one mismatch entry per cookie is enough

    return new_cookies, mismatches, inconsistencies


def _build_candidates(
    new_cookies: list[str],
    observations: dict[str, list[dict]],
) -> list[tuple[str, dict]]:
    """Turn unknown cookies with a declared category into insert candidates.

    The candidate category is the one declared by the most sites (ties go to
    the category seen first). Confidence is ``0.6 + 0.1 * number_of_sites``,
    capped at 0.95. The result is ordered by descending confidence, then name.
    """
    candidates: list[tuple[str, dict]] = []

    for cname in new_cookies:
        obs = observations[cname]

        # One vote per (site, category) so a site observed in three phases
        # does not count three times.
        votes: dict[str, int] = {}
        counted: set = set()
        for o in obs:
            cat = o["declared_category"]
            if not cat or (o["site"], cat) in counted:
                continue
            counted.add((o["site"], cat))
            votes[cat] = votes.get(cat, 0) + 1
        if not votes:
            continue  # without a declared category there is nothing to enrich
        # max() returns the first maximal key in insertion order.
        primary_cat = max(votes, key=votes.get)

        purpose = next((o["declared_purpose"] for o in obs
                        if o["declared_purpose"]), None)
        sites = sorted({o["site"] for o in obs}, key=str)
        # NOTE(review): confidence counts every site the cookie was seen on,
        # including sites that declared no or a different category.
        # round() removes binary float noise such as 0.9000000000000001.
        confidence = round(min(0.6 + 0.1 * len(sites), 0.95), 2)
        candidates.append((cname, {
            "category": primary_cat,
            "purpose": purpose,
            "sites": sites,
            "confidence": confidence,
        }))

    candidates.sort(key=lambda item: (-item[1]["confidence"], item[0]))
    return candidates


def main() -> int:
    """Audit observed cookies against ``cookie_library`` and enrich it.

    Steps:
      1. Load all rows of ``compliance.compliance_check_snapshots``.
      2. Collect cookie observations and the category/purpose each site
         declares for them in its documents.
      3. Compare with ``compliance.cookie_library`` and print a report
         (unknown cookies, category mismatches, inconsistent declarations).
      4. WRITE: insert unknown cookies whose confidence is >= 0.75 into
         ``compliance.cookie_library`` (``ON CONFLICT DO NOTHING``).

    The snapshot count goes to stderr, the report to stdout.

    Returns:
        Process exit status; always 0 when no exception is raised.
    """
    top_n = 20          # candidates shown in the report
    detail_limit = 30   # mismatch / inconsistency lines shown in the report
    auto_insert_min_confidence = 0.75

    with engine.connect() as c:
        rows = c.execute(text(
            "SELECT id, site_domain, doc_entries, banner_result "
            "FROM compliance.compliance_check_snapshots"
        )).fetchall()
    print(f"Loaded {len(rows)} snapshots", file=sys.stderr)

    observations = _collect_observations(rows)
    print(f"\nUnique cookies observed: {len(observations)}\n")

    # Lookup vs cookie_library
    with engine.connect() as c:
        lib_rows = c.execute(text(
            "SELECT cookie_name, actual_category, vendor_name "
            "FROM compliance.cookie_library"
        )).fetchall()
    # Rows without a cookie name cannot be looked up; skip them instead of
    # failing on None.lower().
    lib_lookup = {
        name.lower(): {"category": category, "vendor": vendor}
        for name, category, vendor in lib_rows
        if name
    }

    new_cookies, mismatches, inconsistencies = _classify_observations(
        observations, lib_lookup)

    # === Report ===
    print("=" * 70)
    print("AUDIT-REPORT: P101 Cookie-Library Auto-Enrich")
    print(f" Snapshots: {len(rows)}")
    print(f" Unique cookies observed: {len(observations)}")
    print(f" In Library (Open-Cookie-DB + DACH): {len(observations) - len(new_cookies)}")
    print(f" NEW (unbekannt): {len(new_cookies)}")
    print(f" Mismatches (declared != library): {len(mismatches)}")
    print(f" Multi-Site Inkonsistenzen: {len(inconsistencies)}")
    print("=" * 70)

    print("\n--- TOP-20 NEW COOKIES (Kandidaten fuer Library-Enrich) ---")
    enriched_candidates = _build_candidates(new_cookies, observations)
    for cname, info in enriched_candidates[:top_n]:
        print(f" {cname:30s} [{info['category']:12s}] conf={info['confidence']} "
              f"sites={info['sites']}")
        if info.get("purpose"):
            print(f" purpose: {info['purpose'][:100]}")

    print(f"\n--- ALLE MISMATCHES ({len(mismatches)}) ---")
    for m in mismatches[:detail_limit]:
        print(f" {m['cookie']:30s} declared={m['declared_by_site']:12s} "
              f"library={m['library_says']:12s} "
              f"sites={m['sites']}")
    if len(mismatches) > detail_limit:
        # Make the cap visible instead of silently dropping entries.
        print(f" ... (+{len(mismatches) - detail_limit} weitere)")

    print(f"\n--- ALLE INKONSISTENZEN ({len(inconsistencies)}) ---")
    for i in inconsistencies[:detail_limit]:
        print(f" {i['cookie']:30s} cats={i['categories']} sites={i['sites']}")
    if len(inconsistencies) > detail_limit:
        print(f" ... (+{len(inconsistencies) - detail_limit} weitere)")

    # Auto-insert candidates with confidence >= 0.75
    print("\n--- AUTO-INSERTING in cookie_library (confidence>=0.75) ---")
    inserted = 0
    with engine.begin() as c:  # one transaction; commits on success
        for cname, info in enriched_candidates:
            if info["confidence"] < auto_insert_min_confidence:
                continue
            r = c.execute(text("""
                INSERT INTO compliance.cookie_library
                    (cookie_name, domain_pattern, vendor_name,
                     actual_category, purpose_de,
                     source_name, source_url, source_license, confidence)
                VALUES (:n, '*', 'Mehrere OEMs (BreakPilot-Snapshot)',
                        :cat, :pd,
                        'BreakPilot-Auto-Enrich', 'https://breakpilot.ai',
                        'CC-BY-eigene-Sammlung', :cf)
                ON CONFLICT DO NOTHING
            """), dict(
                n=cname[:200],
                cat=info["category"],
                pd=info.get("purpose") or f"Beobachtet bei {len(info['sites'])} OEMs",
                cf=info["confidence"],
            ))
            # rowcount is 0 when the conflict clause skipped the row; some
            # drivers report -1 for "unknown", which must not reduce the total.
            inserted += max(r.rowcount, 0)
    print(f" inserted: {inserted}")
    return 0
|
|
|
|
|
|
# Script entry point: hand main()'s return value to the interpreter as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|