feat(audit): VW-Cookie-Bug-Fix + P101/P102 Cookie-Library-Mismatch-Findings
CI / loc-budget (push) Failing after 19s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 42s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
CI / detect-changes (push) Successful in 11s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 15s

VW-Bug B1: extract_vendors_via_llm hatte max_text_chars=12000 -> bei
VW-Cookie-Doc (60k chars, 100 Cookies in Tabelle) wurden 80% abgeschnitten,
LLM extrahierte nur 1 Vendor. Fix: max_text_chars=50000, num_predict
6000->16000 fuer mehr Vendor-Output, Ollama-Timeout 120s->420s.

P101 Aggregator-Script (backend-compliance/scripts/cookie_library_enrich.py)
geht alle compliance_check_snapshots durch und extrahiert (cookie_name,
declared_category, observed_sites). Erste Auswertung ueber 8 Snapshots:
101 unique Cookies, 47 in Library, 54 unbekannt, 18 Mismatches.

P102 Cookie-Klassifikations-Pruefung als Mail-Block. Vergleicht
Site-deklarierte Kategorie vs Library + Vendor-Doku. HIGH wenn Library
sagt 'marketing' aber Site als 'essential'/'statistics' deklariert
(faktische Drittland-/Werbe-Verarbeitung versteckt). MEDIUM sonst.
In agent_compliance_check_routes Mail-Komposition + Replay-Pipeline
eingebaut.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-21 15:47:11 +02:00
parent 9c11b5463c
commit 94057b1536
5 changed files with 467 additions and 7 deletions
@@ -1043,11 +1043,45 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
except Exception as e:
logger.warning("Scope-disclaimer block skipped: %s", e)
# P102: Cookie-Klassifikations-Pruefung (deklariert vs Library)
library_mismatch_html = ""
try:
from compliance.services.cookie_library_mismatch import (
detect_mismatches, build_mismatch_block_html,
)
from database import SessionLocal
cookie_doc_for_check = doc_texts.get("cookie") or doc_texts.get("dse") or ""
all_cookies_seen: list[str] = []
if banner_result:
for ph in (banner_result.get("phases") or {}).values():
if isinstance(ph, dict):
for ck in (ph.get("cookies") or []):
if isinstance(ck, str):
all_cookies_seen.append(ck)
elif isinstance(ck, dict) and ck.get("name"):
all_cookies_seen.append(ck["name"])
if all_cookies_seen and cookie_doc_for_check:
_mm_db = SessionLocal()
try:
mismatches = detect_mismatches(
_mm_db, all_cookies_seen, cookie_doc_for_check,
)
if mismatches:
library_mismatch_html = build_mismatch_block_html(mismatches)
logger.info(
"P102: %d Cookie-Mismatches gefunden", len(mismatches)
)
finally:
_mm_db.close()
except Exception as e:
logger.warning("P102 mismatch detection failed: %s", e)
full_html = (
critical_html + scope_disclaimer_html + exec_summary_html
+ cookie_arch_html + summary_html + scanned_html + profile_html
+ scorecard_html + redundancy_html
+ providers_html + banner_deep_html + vvt_html + report_html
+ providers_html + banner_deep_html + library_mismatch_html
+ vvt_html + report_html
)
# Step 6: Send email — derive site name primarily from entered URL.
@@ -116,6 +116,29 @@ def replay_from_snapshot(
except Exception as e:
logger.warning("Replay: vvt failed: %s", e)
# P102: Cookie-Klassifikations-Pruefung
try:
from compliance.services.cookie_library_mismatch import (
detect_mismatches, build_mismatch_block_html,
)
cookies_seen: list[str] = []
for ph in (banner_result.get("phases") or {}).values():
if isinstance(ph, dict):
for ck in (ph.get("cookies") or []):
if isinstance(ck, str):
cookies_seen.append(ck)
elif isinstance(ck, dict) and ck.get("name"):
cookies_seen.append(ck["name"])
doc_for_check = doc_texts.get("cookie") or doc_texts.get("dse") or ""
if cookies_seen and doc_for_check:
mm = detect_mismatches(db, cookies_seen, doc_for_check)
if mm:
mm_html = build_mismatch_block_html(mm)
parts.append(mm_html)
section_sizes["library_mismatch"] = len(mm_html)
except Exception as e:
logger.warning("Replay: mismatch block failed: %s", e)
full_html = "".join(parts)
result = {
@@ -0,0 +1,157 @@
"""
P102 Cookie-Library-Mismatch-Detection pro Site.
Vergleicht die in einem Lauf erfassten Cookies (mit deklarierter
Kategorie aus dem Cookie-Doc-Text) gegen die Library
(compliance.cookie_library). Liefert Mismatches: deklariert Library.
Genutzt im Mail-Render als neuer Block "Cookie-Klassifikations-Pruefung".
"""
from __future__ import annotations
import logging
import re
from sqlalchemy import text
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
_CATEGORY_PATTERNS = [
(re.compile(r"\b(?:strictly[-\s]?)?(?:notwendig|essential|funktional|"
r"funktionscookie|technisch[- ]?notwendig)\b", re.I),
"essential"),
(re.compile(r"\b(?:tracking|analytics|analyse|statistik|"
r"measurement|performance)\b", re.I),
"statistics"),
(re.compile(r"\b(?:marketing|werbung|advertising|targeting|"
r"drittanbieter[- ]?cookie)\b", re.I),
"marketing"),
(re.compile(r"\b(?:social[-\s]?media|share|like)\b", re.I),
"social_media"),
]
def _category_for(name: str, doc_text: str) -> str | None:
if not doc_text or not name:
return None
idx = doc_text.find(name)
if idx < 0:
return None
window = doc_text[max(0, idx - 50):idx + 400]
for pat, cat in _CATEGORY_PATTERNS:
if pat.search(window):
return cat
return None
def _load_library(db: Session) -> dict[str, dict]:
rows = db.execute(text(
"SELECT cookie_name, actual_category, vendor_name "
"FROM compliance.cookie_library"
)).fetchall()
return {r[0].lower(): {"category": r[1], "vendor": r[2]} for r in rows}
def detect_mismatches(
db: Session,
cookie_names_seen: list[str],
doc_text: str,
) -> list[dict]:
"""Returns list of finding dicts."""
if not cookie_names_seen or not doc_text:
return []
lib = _load_library(db)
findings: list[dict] = []
seen: set[str] = set()
for cname in cookie_names_seen:
cname = (cname or "").strip()
if not cname or cname.lower() in seen:
continue
seen.add(cname.lower())
declared = _category_for(cname, doc_text)
if not declared:
continue
lib_entry = lib.get(cname.lower())
if not lib_entry:
continue
lib_cat = lib_entry["category"]
if lib_cat in (None, "unknown") or lib_cat == declared:
continue
# HIGH wenn Library sagt Marketing aber Site als essential/statistics
# deklariert (faktische Drittland-/Werbe-Verarbeitung versteckt
# als technische/statistische Notwendigkeit). MEDIUM sonst.
severity = "HIGH" if (
lib_cat == "marketing" and declared in ("essential", "statistics")
) else "MEDIUM"
findings.append({
"cookie": cname,
"declared_category": declared,
"library_category": lib_cat,
"library_vendor": lib_entry["vendor"],
"severity": severity,
})
return findings
def build_mismatch_block_html(findings: list[dict]) -> str:
"""Render the mismatch findings as a Mail-Block."""
if not findings:
return ""
n_high = sum(1 for f in findings if f["severity"] == "HIGH")
items: list[str] = []
for f in findings[:25]:
sev_color = "#dc2626" if f["severity"] == "HIGH" else "#d97706"
items.append(
f'<li style="margin-bottom:6px;font-size:11px">'
f'<code style="background:#f1f5f9;padding:1px 4px;border-radius:2px">'
f'{f["cookie"]}</code> '
f'<span style="color:#64748b">— deklariert als</span> '
f'<strong>{f["declared_category"]}</strong>, '
f'<span style="color:#64748b">unsere Bibliothek + verbreitete '
f'Vendor-Doku sagen</span> <strong style="color:{sev_color}">'
f'{f["library_category"]}</strong> '
f'(Vendor: {f["library_vendor"]})'
f'</li>'
)
return (
'<div style="font-family:-apple-system,BlinkMacSystemFont,sans-serif;'
'max-width:760px;margin:0 auto 16px;padding:14px 18px;'
'background:#fffbeb;border:1px solid #fde68a;border-radius:8px">'
'<div style="font-size:11px;color:#92400e;text-transform:uppercase;'
'letter-spacing:1.2px;margin-bottom:4px;font-weight:600">'
'Cookie-Klassifikations-Pruefung</div>'
f'<h3 style="margin:0 0 8px;font-size:14px;color:#1e293b">'
f'{len(findings)} Cookie{"s" if len(findings) != 1 else ""}'
f' mit abweichender Klassifikation gefunden'
f'{f" ({n_high} davon mit erhoehter Bedeutung)" if n_high else ""}'
f'</h3>'
'<p style="margin:0 0 10px;font-size:11px;color:#475569;line-height:1.5">'
'Wir haben die in Ihrer Cookie-Richtlinie deklarierte Kategorie der '
'Cookies mit unserer globalen Bibliothek (~2.300 Cookies aus Open-'
'Cookie-Database + DACH-spezifischen Quellen) und der verbreiteten '
'Vendor-Doku abgeglichen. Bei den folgenden Cookies stimmt die '
'deklarierte Kategorie nicht mit dem typischerweise erwarteten '
'Zweck ueberein. Das ist kein automatischer Verstoss — aber ein '
'Pruefanlass: bei Marketing-Cookies braucht es Einwilligung, bei '
'als "essential" deklarierten nicht. Empfehlung: mit DSB / '
'Marketing-Agentur klaeren ob die Klassifikation korrigiert '
'oder die Einwilligung anders eingeholt werden muss.</p>'
'<ul style="margin:0 0 0 18px;padding:0">'
+ "".join(items) +
'</ul>'
'<p style="margin:8px 0 0;font-size:10px;color:#94a3b8;'
'font-style:italic">Hintergrund: Art. 13(1)(c) DSGVO + EDPB 5/2020 '
'— der angegebene Verarbeitungszweck muss dem tatsaechlichen '
'entsprechen.</p>'
'</div>'
)
@@ -49,13 +49,19 @@ _SYSTEM_PROMPT = (
async def extract_vendors_via_llm(
cookie_text: str,
max_text_chars: int = 12000,
max_text_chars: int = 50000,
) -> list[dict]:
"""Run the Qwen → OVH cascade. Returns vendor records (possibly empty)."""
"""Run the Qwen → OVH cascade. Returns vendor records (possibly empty).
max_text_chars: VW-Cookie-Richtlinie hat ~60k chars mit ~100 Cookies in
der Tabelle. Bei 12k waren wir auf die ersten ~5 Cookies begrenzt und
haben nur 1 Vendor extrahiert. 50k deckt VW/BMW/Mercedes komplett ab
und passt in Qwen3-30b-a3b (128k Context) sowie OVH 120B.
"""
if not cookie_text or len(cookie_text) < 500:
return []
excerpt = cookie_text[:max_text_chars]
user_prompt = f"Cookie-Richtlinie-Text (gekuerzt):\n\n{excerpt}"
user_prompt = f"Cookie-Richtlinie-Text:\n\n{excerpt}"
# Stage 1: local Qwen
content = await _call_ollama(user_prompt)
@@ -82,10 +88,13 @@ async def _call_ollama(user_prompt: str) -> str:
{"role": "user", "content": user_prompt},
],
"stream": False, "format": "json",
"options": {"temperature": 0.05, "num_predict": 6000},
# 16k tokens fuer ~80 Vendors mit je 30 Cookies. War vorher 6k →
# output wurde mittendrin abgeschnitten, JSON unparseable → 0 Vendors.
"options": {"temperature": 0.05, "num_predict": 16000},
}
try:
async with httpx.AsyncClient(timeout=120.0) as client:
# Qwen 30b braucht fuer 16k output ~4-6min auf M4 Pro.
async with httpx.AsyncClient(timeout=420.0) as client:
resp = await client.post(f"{base.rstrip('/')}/api/chat", json=payload)
resp.raise_for_status()
return (resp.json().get("message") or {}).get("content", "")
@@ -109,7 +118,7 @@ async def _call_ovh(user_prompt: str) -> str:
{"role": "system", "content": _SYSTEM_PROMPT},
{"role": "user", "content": user_prompt},
],
"temperature": 0.05, "max_tokens": 6000,
"temperature": 0.05, "max_tokens": 16000,
"response_format": {"type": "json_object"},
}
try:
@@ -0,0 +1,237 @@
#!/usr/bin/env python3
"""P101 — Cookie-Library Auto-Enrich aus Snapshots.
Geht alle compliance_check_snapshots durch und:
1. Extrahiert unique (cookie_name, vendor_hint) aus Phase-A/B/C-Cookies
2. Sammelt deklarierte Kategorie + Speicherdauer pro Cookie pro Site
3. Vergleicht mit cookie_library (Open-Cookie-Database + DACH)
4. Reportet: new_cookies, kategorie_mismatches, multi_site_inconsistencies
Run im Container:
docker exec bp-compliance-backend python /tmp/enrich.py
"""
from __future__ import annotations
import json
import re
import sys
from collections import defaultdict
from typing import Any
from database import engine
from sqlalchemy import text
def _category_from_text_context(cookie_name: str, doc_text: str) -> str | None:
"""Lookup cookie_name in doc_text + extract deklarierte Kategorie aus
der Tabellen-Zeile darum (innerhalb 200 Zeichen davor/danach)."""
if not doc_text or not cookie_name:
return None
idx = doc_text.find(cookie_name)
if idx < 0:
return None
window = doc_text[max(0, idx - 50):idx + 400].lower()
category_patterns = [
(r"(?:strictly[- ])?(?:notwendig|essential|funktional|funktionscookie|"
r"funktional cookie|technisch notwendig)", "essential"),
(r"(?:tracking|analytics|analyse|statistik|measurement|performance)",
"statistics"),
(r"(?:marketing|werbung|advertising|targeting|drittanbieter)",
"marketing"),
(r"(?:social[- ]?media|share|like|like[- ]?button)", "social_media"),
]
for pat, cat in category_patterns:
if re.search(pat, window):
return cat
return None
def _purpose_text(cookie_name: str, doc_text: str) -> str | None:
"""Extract die Zweck-Beschreibung aus dem Doc-Text (Sätze um den Namen)."""
if not doc_text or not cookie_name:
return None
idx = doc_text.find(cookie_name)
if idx < 0:
return None
after = doc_text[idx + len(cookie_name):idx + len(cookie_name) + 400]
sentences = re.split(r"[.\n]", after)
text_lines = [s.strip() for s in sentences if 30 < len(s.strip()) < 300]
return text_lines[0] if text_lines else None
def main() -> int:
with engine.connect() as c:
rows = c.execute(text(
"SELECT id, site_domain, doc_entries, banner_result "
"FROM compliance.compliance_check_snapshots"
)).fetchall()
print(f"Loaded {len(rows)} snapshots", file=sys.stderr)
# cookie_name -> list of observations
observations: dict[str, list[dict]] = defaultdict(list)
for row in rows:
snap_id, domain, doc_entries, banner_result = row
doc_entries = doc_entries or []
banner_result = banner_result or {}
# Build combined doc_text fuer Kategorie-Inference (Cookie-Doc bevorzugt,
# fallback DSE)
doc_text = ""
for e in doc_entries:
if e.get("doc_type") in ("cookie", "dse"):
t = e.get("text") or e.get("full_text") or e.get("text_preview") or ""
if len(t) > len(doc_text):
doc_text = t
phases = (banner_result or {}).get("phases", {})
for phase_name in ("before_consent", "after_reject", "after_accept"):
phase = phases.get(phase_name) or {}
if not isinstance(phase, dict):
continue
cookies = phase.get("cookies") or []
for ck in cookies:
# Snapshots: cookies sind meist string-Listen (Cookie-Namen),
# vereinzelt dicts mit name/domain/max_age.
if isinstance(ck, dict):
cname = (ck.get("name") or "").strip()
cdomain = (ck.get("domain") or "").lstrip(".").lower()
max_age = ck.get("max_age") or ck.get("expires")
else:
cname = str(ck).strip()
cdomain = ""
max_age = None
if not cname or len(cname) > 80:
continue
cat_declared = _category_from_text_context(cname, doc_text)
purpose = _purpose_text(cname, doc_text)
observations[cname].append({
"site": domain,
"phase": phase_name,
"cookie_domain": cdomain,
"max_age": max_age,
"declared_category": cat_declared,
"declared_purpose": (purpose[:150] if purpose else None),
})
print(f"\nUnique cookies observed: {len(observations)}\n")
# Lookup vs cookie_library
with engine.connect() as c:
lib_rows = c.execute(text(
"SELECT cookie_name, actual_category, vendor_name "
"FROM compliance.cookie_library"
)).fetchall()
lib_lookup = {r[0].lower(): {"category": r[1], "vendor": r[2]}
for r in lib_rows}
new_cookies: list[str] = []
mismatches: list[dict] = []
inconsistencies: list[dict] = []
for cname, obs_list in observations.items():
sites = {o["site"] for o in obs_list}
declared_cats = {o["declared_category"] for o in obs_list
if o["declared_category"]}
# 1) Multi-Site Inkonsistenz
if len(declared_cats) > 1:
inconsistencies.append({
"cookie": cname,
"sites": list(sites),
"categories": list(declared_cats),
})
# 2) Library lookup
lib_entry = lib_lookup.get(cname.lower())
if not lib_entry:
new_cookies.append(cname)
continue
# 3) Mismatch declared vs library
for dc in declared_cats:
if dc and lib_entry["category"] != dc and lib_entry["category"] != "unknown":
mismatches.append({
"cookie": cname,
"declared_by_site": dc,
"library_says": lib_entry["category"],
"library_vendor": lib_entry["vendor"],
"sites": list(sites),
})
break
# === Report ===
print("=" * 70)
print(f"AUDIT-REPORT: P101 Cookie-Library Auto-Enrich")
print(f" Snapshots: {len(rows)}")
print(f" Unique cookies observed: {len(observations)}")
print(f" In Library (Open-Cookie-DB + DACH): {len(observations) - len(new_cookies)}")
print(f" NEW (unbekannt): {len(new_cookies)}")
print(f" Mismatches (declared != library): {len(mismatches)}")
print(f" Multi-Site Inkonsistenzen: {len(inconsistencies)}")
print("=" * 70)
print("\n--- TOP-20 NEW COOKIES (Kandidaten fuer Library-Enrich) ---")
enriched_candidates: list[tuple[str, dict]] = []
for cname in new_cookies:
obs = observations[cname]
cats = [o["declared_category"] for o in obs if o["declared_category"]]
primary_cat = cats[0] if cats else None
purpose = next((o["declared_purpose"] for o in obs
if o["declared_purpose"]), None)
sites = sorted({o["site"] for o in obs})
if not primary_cat:
continue # ohne deklarierte Kategorie nicht enrichbar
confidence = min(0.6 + 0.1 * len(sites), 0.95)
enriched_candidates.append((cname, {
"category": primary_cat,
"purpose": purpose,
"sites": sites,
"confidence": confidence,
}))
for cname, info in enriched_candidates[:20]:
print(f" {cname:30s} [{info['category']:12s}] conf={info['confidence']} "
f"sites={info['sites']}")
if info.get("purpose"):
print(f" purpose: {info['purpose'][:100]}")
print(f"\n--- ALLE MISMATCHES ({len(mismatches)}) ---")
for m in mismatches[:30]:
print(f" {m['cookie']:30s} declared={m['declared_by_site']:12s} "
f"library={m['library_says']:12s} "
f"sites={m['sites']}")
print(f"\n--- ALLE INKONSISTENZEN ({len(inconsistencies)}) ---")
for i in inconsistencies[:30]:
print(f" {i['cookie']:30s} cats={i['categories']} sites={i['sites']}")
# Auto-Insert die mit confidence >= 0.75
print(f"\n--- AUTO-INSERTING in cookie_library (confidence>=0.75) ---")
inserted = 0
with engine.begin() as c:
for cname, info in enriched_candidates:
if info["confidence"] < 0.75:
continue
r = c.execute(text("""
INSERT INTO compliance.cookie_library
(cookie_name, domain_pattern, vendor_name,
actual_category, purpose_de,
source_name, source_url, source_license, confidence)
VALUES (:n, '*', 'Mehrere OEMs (BreakPilot-Snapshot)',
:cat, :pd,
'BreakPilot-Auto-Enrich', 'https://breakpilot.ai',
'CC-BY-eigene-Sammlung', :cf)
ON CONFLICT DO NOTHING
"""), dict(
n=cname[:200],
cat=info["category"],
pd=info.get("purpose") or f"Beobachtet bei {len(info['sites'])} OEMs",
cf=info["confidence"],
))
inserted += r.rowcount
print(f" inserted: {inserted}")
return 0
if __name__ == "__main__":
sys.exit(main())