feat(b19): Cookie-Coherence — 3-Layer-Lookup + Vendor-Karten + CSV

Adressiert das BMW-Beispiel (740 Cookies, Salesforce als "essential"
mit 1-Jahres-Lifetime, Pseudo-Zwecke wie "Siehe dazugehörige
Datenverarbeitung"). User-Konzept "Regulation als Code".

Step 1 — cookie_library_lookup.py (3 Layer):
  1. Override = cookie_knowledge_db.py + extended (74) für
     Schrems-II / EUGH / EU-Alternative — BreakPilot-juristische-IP.
  2. Truth-Base = compliance.cookie_library (2287 aus Open Cookie
     Database, CC0). actual_category als Wahrheit.
  3. Auto-Learning = cookie_behavior_audits — Cross-Site-Konsens
     wenn ≥3 Sites denselben Cookie melden.

  Match: exact > prefix (mit Separator-Check) > wildcard. Kurze
  Library-Namen ("c", "ID") brauchen exact-match — verhindert
  False-Positive auf "completely_unknown". Trailing-Underscore
  in OCD ("guest_uuid_essential_") wird als implicit-wildcard
  interpretiert.

Step 2 — cookie_coherence_check.py (B19, 6 Finding-Typen):
  - MARKETING_AS_ESSENTIAL (HIGH): KB sagt actual=marketing, Site
    deklariert essential/erforderlich → Einwilligung wird umgangen
  - LIFETIME_TOO_LONG_FOR_ESSENTIAL (MED): essential + >90d
  - PSEUDO_PURPOSE (LOW): "Siehe dazugehörige Datenverarbeitung"
    / <4 Wörter (suppressed wenn Vendor-Purpose substantial ist)
  - MISSING_COUNTRY (LOW): vendor_country leer trotz KB-Hit
  - UNKNOWN_VENDOR (LOW): nicht in KB → Auto-Learning-Kandidat
  - DUPLICATE_VENDOR (MED): selber Vendor in N Kategorien =
    Stack-Aufspaltung um Marketing unter "essential" zu schmuggeln

  Jedes Finding mit recommended_action ("Cookie X aus 'erforderlich'
  raus und in 'Marketing' setzen").

Step 3 — cookie_observation_logger.py:
  Loggt nach jedem Audit alle (cookie, site, declared_purpose) in
  compliance.cookie_behavior_audits → Basis für Cross-Site-Konsens
  in Layer 3.

Step 4 — cookie_csv_exporter.py:
  cookies-full-{check_id}.csv mit 22 Spalten (Name, Vendor decl/KB,
  KB-Layer, Cat decl/KB, Lifetime decl/KB, Purpose, Country decl/KB,
  Opt-Out, 8x FIND_* flags, recommended_action, source_in_audit).
  UTF-8 BOM für Excel.
  ZIP-Attachment: erweitert audit_walk_zip_builder um extra_files=
  parameter; phase_e ruft mit cookies-full-...csv auf.

Step 5 — mail_render_v2/_vendor_cards.py:
  Statt 740 Cookie-Rows: Aggregation pro Vendor mit Cookie-Count +
  Issue-Count + 1-2 Beispiel-Cookies + Issue-Type-Tags. Top 30
  Vendoren in der Mail, Rest nur in CSV. Sortiert nach Issue-Score.

Step 6 — render_info_box_rechtsrahmen():
  Generic Header-Info-Box mit Art. 13 DSGVO + § 25 TDDDG + Art. 5
  + § 5 UWG + § 30/130 OWiG. Immer angezeigt, kein explicit-
  finding-mapping (User-mündigkeit).

Orchestrator + _compose: run_b19 + render_vendor_cards +
  render_info_box_rechtsrahmen ins V2-Layout.

Tests: 28/28 grün (15 lookup + 13 coherence).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-06-07 23:48:04 +02:00
parent 0b29d1fada
commit c908fcd5eb
12 changed files with 1364 additions and 1 deletions
@@ -0,0 +1,100 @@
"""B19 wiring — Cookie-Coherence-Check (Salesforce-as-essential)."""
from __future__ import annotations
import html
import logging
from collections import Counter
from compliance.services.cookie_coherence_check import check_cookie_coherence
logger = logging.getLogger(__name__)
def run_b19(state: dict) -> None:
    """Run the B19 cookie-coherence step and store its results on ``state``.

    First the declared cookies are handed to the observation logger
    (best effort: any failure there is logged and ignored). Then the
    coherence check runs; when it yields findings they are appended to
    ``state["extra_findings"]`` and additionally stored as
    ``state["cookie_coherence_findings"]`` plus a rendered HTML block in
    ``state["cookie_coherence_html"]``. Without findings ``state`` is left
    untouched.
    """
    # Auto-learning input: persist what this site declares. Must never
    # abort the audit, hence the broad except.
    try:
        from compliance.services.cookie_observation_logger import (
            log_observations,
        )
        logger.info("B19 observation-logger: %s", log_observations(state))
    except Exception as exc:
        logger.warning("observation-logger skipped: %s", exc)

    coherence = check_cookie_coherence(state)
    if coherence:
        merged = state.get("extra_findings") or []
        merged.extend(coherence)
        state["extra_findings"] = merged
        state["cookie_coherence_html"] = _render(coherence)
        state["cookie_coherence_findings"] = coherence
        logger.info("B19 cookie-coherence: %d finding(s)", len(coherence))
def _render(findings: list[dict]) -> str:
# Aggregate per type for the summary chip
by_type = Counter(f.get("check_id") for f in findings)
severity_color = {
"HIGH": "#dc2626", "MEDIUM": "#f59e0b", "LOW": "#64748b",
}
# Show only the top 12 cards in the mail; rest goes to CSV
cards = []
for f in findings[:12]:
sev = (f.get("severity") or "").upper()
color = severity_color.get(sev, "#475569")
meta = ""
if f.get("cookie_name"):
meta += (
"<div style='font-size:12px;color:#475569;margin-top:6px;'>"
f"<em>Cookie: <code>{html.escape(f['cookie_name'])}</code>"
f" · Vendor: {html.escape(f.get('vendor') or '?')}</em>"
"</div>"
)
if f.get("declared_category"):
meta += (
"<div style='font-size:11px;color:#7f1d1d;margin-top:3px;'>"
f"declared: <code>{html.escape(f['declared_category'])}</code>"
+ (f" · actual (KB): <code>{html.escape(f['actual_category'])}</code>"
if f.get("actual_category") else "")
+ "</div>"
)
cards.append(
f"<div style='margin:12px 0;padding:14px;background:#fff;"
f"border-left:3px solid {color};border-radius:4px;'>"
f"<div style='font-weight:600;color:{color};font-size:14px;'>"
f"{sev} · {html.escape(f.get('check_id') or '')}</div>"
f"<div style='font-size:14px;margin-top:4px;'>"
f"<strong>{html.escape(f.get('title') or '')}</strong></div>"
f"<div style='font-size:12px;color:#64748b;margin-top:2px;'>"
f"{html.escape(f.get('norm') or '')}</div>"
f"{meta}"
f"<div style='font-size:12px;color:#475569;margin-top:6px;'>"
f"<em>{html.escape(f.get('evidence') or '')}</em></div>"
f"<div style='font-size:13px;margin-top:8px;background:#dcfce7;"
f"padding:8px 10px;border-radius:4px;'>"
f"<strong>→ Abstellung:</strong> "
f"{html.escape(f.get('recommended_action') or '')}</div>"
"</div>"
)
type_summary = " · ".join(
f"{k.split('-')[-1]}: {v}" for k, v in by_type.most_common()
)
return (
"<div style='margin:24px 0;padding:16px;border-left:4px solid #dc2626;"
"background:#fef2f2;border-radius:4px;'>"
"<h2 style='margin:0 0 8px;color:#7f1d1d;font-size:16px;'>"
f"🍪 Cookie-Kohärenz ({len(findings)} Befunde)"
"</h2>"
f"<p style='margin:0 0 8px;font-size:12px;color:#475569;'>"
f"Vergleich Site-Deklaration vs Open Cookie Database (2287) + "
f"BreakPilot-KB.<br><strong>Verteilung:</strong> {type_summary}</p>"
+ "".join(cards)
+ (f"<p style='font-size:12px;color:#64748b;margin-top:8px;'>"
f"<em>… und {len(findings)-12} weitere — vollständige Liste "
f"in <code>cookies-full.csv</code> im ZIP-Anhang.</em></p>"
if len(findings) > 12 else "")
+ "</div>"
)
@@ -29,6 +29,7 @@ from ._b15_wiring import run_b15
from ._b16_wiring import run_b16
from ._b17_wiring import run_b17
from ._b18_wiring import run_b18
from ._b19_wiring import run_b19
from ._constants import _compliance_check_jobs
from ._phase_a_resolve import run_phase_a
from ._phase_b_profile_check import run_phase_b
@@ -92,6 +93,7 @@ async def run_compliance_check(check_id: str, req) -> None:
run_b16(state) # Footer-Label-vs-URL-Slug-Drift
await run_b17(state) # Audit-Walk-Video (Beweis-Aufzeichnung)
await run_b18(state) # Impressum-Specialist-Agent (Pattern+LLM)
run_b19(state) # Cookie-Coherence (Salesforce-as-essential)
# Phase D-3 top/mid/bot: Step 5 HTML blocks
await run_phase_d3_top(state)
await run_phase_d3_mid(state)
@@ -62,6 +62,41 @@ def run_phase_e(state: dict) -> None:
except Exception as e:
logger.warning("A1 evidence-zip build failed: %s", e)
# B17 audit-walk: bundle video + walk.json + README into a second
# ZIP attachment. Reviewer hat den Beweis-Film direkt im Postfach.
audit_walk = state.get("audit_walk")
if audit_walk and audit_walk.get("walk_id"):
try:
from compliance.services.audit_walk_zip_builder import (
build_audit_walk_zip,
)
walk_zip = build_audit_walk_zip(
audit_walk,
extra_files=_build_cookie_csv_extra(state, check_id),
)
if walk_zip:
evidence_attachments.append({
"filename": f"audit-walk-{check_id[:8]}.zip",
"data": walk_zip,
"mime": "application/zip",
})
except Exception as e:
logger.warning("audit-walk-zip build failed: %s", e)
def _build_cookie_csv_extra(state: dict, check_id: str) -> dict[str, bytes]:
"""B19 Step 4: cookies-full.csv ins Walk-ZIP. Returns {filename: bytes}."""
if not state.get("cmp_vendors"):
return {}
try:
from compliance.services.cookie_csv_exporter import build_cookie_csv
csv_bytes = build_cookie_csv(state)
if csv_bytes:
return {f"cookies-full-{check_id[:8]}.csv": csv_bytes}
except Exception as e:
logger.warning("cookie-csv build failed: %s", e)
return {}
email_result = send_email(
recipient=req.recipient,
subject=f"[COMPLIANCE-CHECK] {site_name}{doc_count} Dokumente geprueft",
@@ -66,8 +66,13 @@ Zur Verifikation:
def build_audit_walk_zip(
walk: dict,
consent_tester_url: str = "http://bp-compliance-consent-tester:8094",
extra_files: dict[str, bytes] | None = None,
) -> bytes:
"""Fetch video from consent-tester + bundle with walk.json + README."""
"""Fetch video from consent-tester + bundle with walk.json + README.
`extra_files` is optional name→bytes mapping (e.g. cookies-full.csv
from B19 export). Placed at the ZIP root next to video.webm.
"""
wid = walk.get("walk_id") or ""
if not wid:
return b""
@@ -107,4 +112,11 @@ def build_audit_walk_zip(
except Exception as e:
logger.warning("annotation %s write failed: %s",
fname, e)
for fname, content in (extra_files or {}).items():
if content:
try:
z.writestr(fname, content)
except Exception as e:
logger.warning("extra-file %s write failed: %s",
fname, e)
return buf.getvalue()
@@ -0,0 +1,299 @@
"""B19 — Cookie-Coherence-Check.
Pro Cookie aus state["cmp_vendors"]: Lookup in 3-Layer-DB und
Vergleich der DEKLARATION (was die Site behauptet) mit der TRUTH
(was die Open Cookie Database / BreakPilot-KB sagt). Emittiert
Findings für die Salesforce-as-essential Falsch-Klassifikation.
Finding-Typen:
- MARKETING_AS_ESSENTIAL: actual=marketing, declared=essential/erforderlich
- LIFETIME_TOO_LONG_FOR_ESSENTIAL: declared=essential, lifetime >90d
- PSEUDO_PURPOSE: purpose ist Floskel ("Siehe dazugehörige
Datenverarbeitung", "Sehen Sie unter ...")
- DUPLICATE_VENDOR: derselbe Vendor in mehreren Kategorien
- UNKNOWN_VENDOR_NO_LIBRARY: Cookie nicht in cookie_library, nicht
in OCD → muss menschlich klassifiziert werden
- MISSING_COUNTRY: vendor_country leer in Deklaration
- MISSING_RETENTION: declared duration leer (aktuell kein eigenes
Finding — nur als CSV-Spalte FIND_missing_retention im Export)
Jedes Finding kommt mit `recommended_action` — konkretes was-zu-tun.
"""
from __future__ import annotations
import logging
import re
from collections import defaultdict
from .cookie_library_lookup import lookup as kb_lookup
logger = logging.getLogger(__name__)
# Boilerplate phrases that merely point elsewhere instead of stating a
# purpose. All entries are lower-case; _is_pseudo_purpose tests them as
# substrings of the lower-cased purpose text.
_PSEUDO_PURPOSE_PATTERNS = (
    "siehe dazugehörige datenverarbeitung",
    "siehe dazugehoerige datenverarbeitung",
    "siehe oben",
    "see related",
    "see corresponding",
    "wird unter",
    "see above",
    "see vendor",
    "wie oben beschrieben",
)
def _is_essential_category(decl: str) -> bool:
s = (decl or "").lower()
return any(t in s for t in (
"essential", "essenziell", "essentiell", "necessary",
"erforderlich", "technisch notwendig", "strictly necessary",
"notwendig", "required",
))
def _is_marketing_category(actual: str) -> bool:
return (actual or "").lower() in (
"marketing", "advertising", "social_media",
)
def _parse_lifetime_to_days(text: str) -> float | None:
if not text:
return None
try:
from .retention_comparator import parse_duration_to_days
days, kind = parse_duration_to_days(text)
if kind == "session":
return 0.0
if kind in ("persistent", "unknown"):
return None
return days
except Exception:
return None
def _is_pseudo_purpose(purpose: str) -> bool:
if not purpose:
return True
s = purpose.lower().strip()
if any(p in s for p in _PSEUDO_PURPOSE_PATTERNS):
return True
# Less than 4 words counts as "no real purpose given"
if len(re.findall(r"\w+", s)) < 4:
return True
return False
def _norm_vendor(name: str) -> str:
s = (name or "").lower().strip()
s = re.sub(r"\binc\.?$|\bllc\.?$|\bsas\.?$|\bgmbh\.?$|"
r"\bag\.?$|\bb\.v\.?$|\bs\.a\.?$", "", s)
s = s.replace(",", " ").strip()
return re.sub(r"\s+", " ", s)
def check_cookie_coherence(state: dict) -> list[dict]:
    """Compare every declared cookie with the knowledge base and emit findings.

    Walks ``state["cmp_vendors"]`` (a list of vendor dicts, each with an
    optional ``cookies`` list), looks each cookie name up via ``kb_lookup``
    and appends one finding dict per detected problem:

    * ``COOKIE-COHERENCE-MAE-001`` (HIGH): the knowledge base classifies the
      cookie as marketing, the site declares it as essential.
    * ``COOKIE-COHERENCE-LIFE-001`` (MEDIUM): declared essential with a
      parsed lifetime above 90 days.
    * ``COOKIE-COHERENCE-PURP-001`` (LOW): the cookie's own purpose is
      missing or boilerplate and the vendor-level purpose has fewer than
      six words.
    * ``COOKIE-COHERENCE-CTRY-001`` (LOW): the vendor declares no country
      although the knowledge base knows a category for the cookie.
    * ``COOKIE-COHERENCE-UNK-001`` (LOW): the lookup reports layer
      ``"unknown"``.
    * ``COOKIE-COHERENCE-DUP-001`` (MEDIUM): the same normalised vendor name
      is declared under more than one category (one finding per vendor,
      appended after all cookie findings).

    Args:
        state: Audit state; only ``cmp_vendors`` is read.

    Returns:
        The list of finding dicts, empty when there are no vendors.
    """
    cmp_vendors = state.get("cmp_vendors") or []
    if not cmp_vendors:
        return []

    findings: list[dict] = []
    # Normalised vendor name -> declared categories (duplicate detector).
    vendor_categories: dict[str, set[str]] = defaultdict(set)

    for v in cmp_vendors:
        vendor_name = (v.get("name") or "").strip()
        vendor_country = (v.get("country") or "").strip()
        vendor_category = (v.get("category") or "").strip().lower()
        if vendor_name and vendor_category:
            vendor_categories[_norm_vendor(vendor_name)].add(vendor_category)

        for c in (v.get("cookies") or []):
            cname = (c.get("name") or "").strip()
            if not cname:
                continue
            # A cookie without its own category inherits the vendor's.
            declared_cat = (c.get("category") or vendor_category).lower()
            declared_lifetime = (c.get("duration") or c.get("persistence")
                                 or c.get("expiry") or "").strip()
            declared_days = _parse_lifetime_to_days(declared_lifetime)
            kb = kb_lookup(cname)
            actual = (kb.get("actual_category")
                      or kb.get("consensus_category") or "").lower()
            layer = kb.get("_layer")
            declared_essential = _is_essential_category(declared_cat)

            # FINDING 1: MARKETING-AS-ESSENTIAL
            if (actual and _is_marketing_category(actual)
                    and declared_essential):
                findings.append({
                    "check_id": "COOKIE-COHERENCE-MAE-001",
                    "severity": "HIGH",
                    "severity_reason": "misclassified",
                    "cookie_name": cname,
                    "vendor": vendor_name,
                    "declared_category": declared_cat,
                    "actual_category": actual,
                    "kb_source": layer,
                    "title": (
                        f"Marketing-Cookie '{cname}' ({vendor_name}) "
                        "als technisch notwendig deklariert"
                    ),
                    "norm": (
                        "DSGVO Art. 6 Abs. 1 lit. a + § 25 Abs. 1 TDDDG"
                    ),
                    # The separator before the conclusion was missing, which
                    # glued the quoted category onto the next sentence.
                    "evidence": (
                        f"Open Cookie Database / BreakPilot-KB "
                        f"klassifiziert '{cname}' als '{actual}'. "
                        f"Site deklariert als '{declared_cat}' → "
                        "Einwilligung wird umgangen."
                    ),
                    "recommended_action": (
                        f"Cookie '{cname}' aus Kategorie "
                        f"'{declared_cat}' entfernen und in "
                        f"'Marketing/Werbung' einsortieren. "
                        "Banner-Toggle für diesen Cookie pflichtig."
                    ),
                })

            # FINDING 2: LIFETIME-TOO-LONG-FOR-ESSENTIAL
            if (declared_essential
                    and declared_days is not None
                    and declared_days > 90):
                findings.append({
                    "check_id": "COOKIE-COHERENCE-LIFE-001",
                    "severity": "MEDIUM",
                    "severity_reason": "implausible",
                    "cookie_name": cname,
                    "vendor": vendor_name,
                    "declared_category": declared_cat,
                    "declared_lifetime": declared_lifetime,
                    "lifetime_days": declared_days,
                    "title": (
                        f"Essential-Cookie '{cname}' mit Lifetime "
                        f"{int(declared_days)} Tage — Plausibilität "
                        "fragwürdig"
                    ),
                    "norm": "DSGVO Art. 5 Abs. 1 lit. c (Datenminimierung)",
                    "evidence": (
                        f"Cookie deklariert als '{declared_cat}' "
                        f"({vendor_name}) hat Speicherdauer "
                        f"'{declared_lifetime}'. Echte technisch-"
                        "notwendige Cookies sind typischerweise "
                        "Session-Cookies oder max. 30 Tage."
                    ),
                    "recommended_action": (
                        "Speicherdauer reduzieren (Session oder <30 Tage) "
                        "ODER Kategorie korrekt setzen (functional / "
                        "marketing) wenn Lifetime tatsächlich nötig ist."
                    ),
                })

            # FINDING 3: PSEUDO_PURPOSE
            cookie_purpose = c.get("purpose") or ""
            if _is_pseudo_purpose(cookie_purpose):
                # Suppressed when the vendor-level purpose is substantial
                # (>= 6 words): the cookie then simply inherits it and is
                # not counted a second time.
                vendor_purpose = v.get("purpose")
                vendor_purpose_substantial = bool(
                    vendor_purpose
                    and len(re.findall(r"\w+", vendor_purpose)) >= 6
                )
                if not vendor_purpose_substantial:
                    findings.append({
                        "check_id": "COOKIE-COHERENCE-PURP-001",
                        "severity": "LOW",
                        "severity_reason": "incomplete",
                        "cookie_name": cname,
                        "vendor": vendor_name,
                        "title": (
                            f"Cookie '{cname}' ohne konkreten Zweck — "
                            "nur generischer Verweis / Floskel"
                        ),
                        "norm": "DSGVO Art. 13 Abs. 1 lit. c",
                        "evidence": (
                            f"Zweck: '{cookie_purpose[:120]}'"
                        ),
                        "recommended_action": (
                            f"Konkreten Zweck für '{cname}' angeben "
                            "(was wird damit konkret gespeichert / "
                            "verarbeitet) — nicht nur Vendor-Verweis."
                        ),
                    })

            # FINDING 4: MISSING_COUNTRY
            if not vendor_country and actual:
                findings.append({
                    "check_id": "COOKIE-COHERENCE-CTRY-001",
                    "severity": "LOW",
                    "severity_reason": "missing",
                    "cookie_name": cname,
                    "vendor": vendor_name,
                    "title": (
                        f"Sitzland für '{cname}' ({vendor_name}) fehlt"
                    ),
                    "norm": "DSGVO Art. 13 Abs. 1 lit. f (Drittlandtransfer)",
                    "evidence": "vendor_country leer in Deklaration",
                    "recommended_action": (
                        f"Sitzland von {vendor_name} ergänzen. "
                        f"KB-Hinweis: laut Bibliothek "
                        f"{kb.get('vendor_country') or '?'}"
                    ),
                })

            # FINDING 5: UNKNOWN_VENDOR
            if layer == "unknown":
                findings.append({
                    "check_id": "COOKIE-COHERENCE-UNK-001",
                    "severity": "LOW",
                    "severity_reason": "unknown",
                    "cookie_name": cname,
                    "vendor": vendor_name,
                    "title": (
                        f"Cookie '{cname}' nicht in Open Cookie Database / "
                        "BreakPilot-KB"
                    ),
                    "norm": "Auto-Learning-Kandidat",
                    "evidence": (
                        "Keine Reference-Klassifikation verfügbar. "
                        "Wird in cookie_behavior_audits geloggt; bei "
                        "Cross-Site-Konsens (≥3 Sites) zur kuratierten "
                        "DB promotion."
                    ),
                    "recommended_action": (
                        "Manuell prüfen + ggf. zu BreakPilot-KB hinzufügen."
                    ),
                })

    # FINDING 6: DUPLICATE_VENDOR (across categories)
    for vnorm, cats in vendor_categories.items():
        real_cats = {cat for cat in cats if cat}
        if len(real_cats) > 1:
            findings.append({
                "check_id": "COOKIE-COHERENCE-DUP-001",
                "severity": "MEDIUM",
                "severity_reason": "split_stack",
                "vendor": vnorm,
                "categories": sorted(real_cats),
                "title": (
                    f"Vendor '{vnorm}' in {len(real_cats)} "
                    "Kategorien gleichzeitig deklariert"
                ),
                "norm": "DSGVO Art. 13 Abs. 1 lit. c (Klarheit)",
                "evidence": (
                    f"Vendor erscheint in: "
                    f"{', '.join(sorted(real_cats))}. Aufspaltung "
                    "schmuggelt oft Marketing-Funktionen unter "
                    "'erforderlich'."
                ),
                "recommended_action": (
                    f"Vendor '{vnorm}' auf EINE Kategorie "
                    "konsolidieren (höchste Schutzkategorie wählen — "
                    "wenn Marketing-Funktionen dabei sind: "
                    "vollständig zu Marketing)."
                ),
            })

    if findings:
        logger.info("B19 cookie-coherence: %d finding(s)", len(findings))
    return findings
@@ -0,0 +1,140 @@
"""Vollständiger Cookie-CSV-Export.
Eine Zeile pro deklariertem Cookie, mit:
- Name + Vendor
- Was die Site deklariert (category, lifetime, purpose, country)
- Was die 3-Layer-KB sagt (actual_category, typical_lifetime,
vendor_country, kb_source)
- Alle Findings als FIND_* boolean-Spalten
- recommended_action (1-Zeiler aus dem schwersten Finding)
Output: bytes (UTF-8 CSV mit BOM für Excel-Kompatibilität).
"""
from __future__ import annotations
import csv
import io
import logging
from .cookie_library_lookup import lookup as kb_lookup
logger = logging.getLogger(__name__)
# Header row of the CSV export, in output order. build_cookie_csv writes
# this list first and then one row per cookie whose values follow exactly
# this order, so both must be changed together.
COLUMNS = [
    "cookie_name", "vendor_declared", "kb_vendor", "kb_layer",
    "category_declared", "category_kb",
    "lifetime_declared", "lifetime_kb_typical",
    "purpose_declared",
    "country_declared", "country_kb",
    "optout_kb",
    "FIND_marketing_as_essential",
    "FIND_lifetime_too_long_for_essential",
    "FIND_pseudo_purpose",
    "FIND_missing_country",
    "FIND_missing_retention",
    "FIND_unknown_vendor",
    "FIND_duplicate_vendor",
    "FIND_third_country_no_mechanism",
    "recommended_action",
    "source_in_audit",
]
def _action_for(findings_for_cookie: list[dict]) -> str:
"""Pick the action from the highest-severity finding."""
if not findings_for_cookie:
return ""
priority = {"HIGH": 0, "MEDIUM": 1, "LOW": 2, "INFO": 3}
sorted_f = sorted(
findings_for_cookie,
key=lambda f: priority.get((f.get("severity") or "").upper(), 9),
)
return sorted_f[0].get("recommended_action", "") or ""
def build_cookie_csv(state: dict) -> bytes:
    """Build the full cookie CSV (one row per declared cookie) as bytes.

    Reads ``state["cmp_vendors"]`` for the declarations and
    ``state["cookie_coherence_findings"]`` for the ``FIND_*`` flag columns
    and the recommended action; each cookie is additionally looked up via
    ``kb_lookup`` for the knowledge-base columns. The output is
    semicolon-separated, UTF-8 encoded and starts with a byte order mark
    so that Excel detects the encoding. Column order follows ``COLUMNS``.

    Args:
        state: Audit state dict.

    Returns:
        The encoded CSV including header row; with no cookies only the
        header row is returned.
    """
    cmp_vendors = state.get("cmp_vendors") or []
    coherence_findings = state.get("cookie_coherence_findings") or []

    # Index findings by cookie name for fast lookup.
    # NOTE(review): the same cookie name declared by two vendors shares one
    # entry here — confirm this is acceptable.
    by_cookie: dict[str, list[dict]] = {}
    duplicate_vendors: set[str] = set()
    for f in coherence_findings:
        cname = f.get("cookie_name")
        if cname:
            by_cookie.setdefault(cname, []).append(f)
        if f.get("check_id") == "COOKIE-COHERENCE-DUP-001":
            duplicate_vendors.add((f.get("vendor") or "").lower())

    # Country codes for which no transfer mechanism is demanded.
    no_mechanism_needed = (
        "DE", "EU", "AT", "FR", "NL", "IT", "ES",
        "BE", "CH", "IE", "DK", "FI", "SE", "NO",
    )

    buf = io.StringIO()
    # Byte order mark, written as an explicit escape so it cannot get lost
    # as an invisible character; makes Excel render umlauts correctly.
    buf.write("\ufeff")
    writer = csv.writer(buf, delimiter=";", quoting=csv.QUOTE_MINIMAL)
    writer.writerow(COLUMNS)

    written = 0
    for v in cmp_vendors:
        vendor_name = (v.get("name") or "").strip()
        vendor_src = (v.get("source") or "").strip()
        vendor_country = (v.get("country") or "").strip()
        vendor_category = (v.get("category") or "").strip()
        # NOTE(review): DUP findings appear to carry a normalised vendor name
        # (legal suffix stripped), while this compares the plain lower-cased
        # name — vendors like "X Inc." may not get flagged; verify.
        is_duplicate_vendor = vendor_name.lower() in duplicate_vendors

        for c in (v.get("cookies") or []):
            cname = (c.get("name") or "").strip()
            if not cname:
                continue
            declared_cat = (c.get("category") or vendor_category).strip()
            declared_purpose = (c.get("purpose") or v.get("purpose")
                                or "").strip()
            declared_lifetime = (c.get("duration") or c.get("persistence")
                                 or c.get("expiry") or "").strip()

            kb = kb_lookup(cname)
            kb_vendor = (kb.get("vendor_name") or kb.get("vendor") or "")
            kb_layer = kb.get("_layer") or "unknown"
            kb_category = (kb.get("actual_category")
                           or kb.get("consensus_category") or "")
            kb_country = (kb.get("vendor_country") or "")
            kb_optout = (kb.get("vendor_opt_out_url") or "")
            kb_typical_lifetime = (kb.get("typical_lifetime") or "")
            secs = kb.get("typical_max_age_seconds")
            if not kb_typical_lifetime and secs:
                # Fall back to the numeric max-age, shown in the largest
                # unit that yields at least 1 (days, hours, else minutes).
                days = secs / 86400.0
                if days >= 1:
                    kb_typical_lifetime = f"{int(days)} Tage"
                elif secs >= 3600:
                    kb_typical_lifetime = f"{int(secs / 3600)} h"
                else:
                    kb_typical_lifetime = f"{int(secs / 60)} min"

            f_cookie = by_cookie.get(cname) or []
            check_ids = {fp.get("check_id") for fp in f_cookie}
            third_country_no_mechanism = bool(
                kb_country
                and kb_country.upper() not in no_mechanism_needed
                and not c.get("transfer_mechanism")
            )

            row = [
                cname, vendor_name, kb_vendor, kb_layer,
                declared_cat, kb_category,
                declared_lifetime, kb_typical_lifetime,
                declared_purpose[:300],
                vendor_country, kb_country,
                kb_optout,
                "1" if "COOKIE-COHERENCE-MAE-001" in check_ids else "",
                "1" if "COOKIE-COHERENCE-LIFE-001" in check_ids else "",
                "1" if "COOKIE-COHERENCE-PURP-001" in check_ids else "",
                "1" if "COOKIE-COHERENCE-CTRY-001" in check_ids else "",
                "1" if not declared_lifetime else "",
                "1" if "COOKIE-COHERENCE-UNK-001" in check_ids else "",
                "1" if is_duplicate_vendor else "",
                "1" if third_country_no_mechanism else "",
                _action_for(f_cookie),
                vendor_src,
            ]
            writer.writerow(row)
            written += 1

    logger.info("cookie-csv export: %d rows", written)
    return buf.getvalue().encode("utf-8")
@@ -0,0 +1,275 @@
"""3-Layer Cookie-Lookup-Service.
Hierarchie (höchste Priorität zuerst):
1. **Override-Layer**: cookie_knowledge_db.py + cookie_knowledge_extended.py
— BreakPilot-kuratierte Einträge mit Schrems-II / EUGH-Rulings /
EU-Alternative. IP-relevante Annotationen.
2. **Truth-Base**: compliance.cookie_library (PostgreSQL, ~2287
Einträge aus Open Cookie Database, CC0 Public Domain).
actual_category + typical_max_age + Vendor-Country.
3. **Auto-Learning**: compliance.cookie_behavior_audits — Cookies die
wir bei Audits beobachtet aber noch nicht klassifiziert haben.
Cross-Site-Konsens (≥3 Sites mit gleichem declared_purpose) macht
sie zu Promotion-Kandidaten.
Match-Strategie (in dieser Reihenfolge):
A. exact name match (case-insensitive)
B. prefix match (mind. 3 Chars, falls Cookie wie "_ga" einen
runtime-suffix wie "_ga_K8YL3M9T" hat)
C. wildcard match (cookie_library.domain_pattern + cookie_name mit
Suffix-Wildcard z.B. "_pk_id.*")
Return: dict mit konsolidierter Sicht über alle 3 Layer + source-tag.
"""
from __future__ import annotations
import logging
import re
from typing import Any
logger = logging.getLogger(__name__)
def _norm(s: str) -> str:
return (s or "").strip().lower()
def _strip_wildcards(s: str) -> str:
    """Normalise a cookie name and remove wildcard markers for comparison.

    The name is trimmed and lower-cased via ``_norm``, every ``*`` is
    removed, and trailing ``_``, ``-`` and ``.`` characters are stripped,
    so e.g. ``"_pk_id.*"`` and ``"guest_uuid_essential_"`` reduce to their
    stem.
    """
    out = _norm(s)
    # NOTE(review): as it reads here, the second replace() has an empty
    # search string and an empty replacement, i.e. it changes nothing —
    # possibly a character was lost in this literal; confirm the intent.
    out = out.replace("*", "").replace("", "")
    # All "*" were already removed on the line above, so this ".*"-suffix
    # pattern cannot match any more; the leftover trailing "." is removed
    # by the rstrip below.
    out = re.sub(r"\.\*$", "", out)
    # Trailing separator (_, -, .) is implicit wildcard in the OCD —
    # "guest_uuid_essential_" means "guest_uuid_essential_anything".
    out = out.rstrip("_-.")
    return out.strip()
# Characters that _name_matches accepts directly after a matched prefix;
# any other following character rejects the prefix match.
_SEPARATORS = ("_", "-", ".", "[", ":", "$", "%")
def _name_matches(library_name: str, query_name: str) -> bool:
    """Decide whether a library cookie name matches a queried cookie name.

    Both names are first reduced with ``_strip_wildcards``. Examples:

        lib="_ga"      vs query="_ga_K8YL3M9T"        -> True  (prefix + separator)
        lib="_pk_id.*" vs query="_pk_id.5.7d8"        -> True  (wildcard)
        lib="__cf_bm"  vs query="__cf_bm"             -> True  (exact)
        lib="c"        vs query="completely_unknown"  -> False (no separator)
        lib="ID"       vs query="IDcharger"           -> False (no separator)

    Rules: equal names always match. Otherwise the library name must be
    specific enough (see ``_is_specific_enough``) and be a prefix of the
    query that ends at a separator; the reverse direction (query is a
    prefix of the library name) is accepted under the same conditions for
    the query. This keeps short generic library names ("c", "id", "u")
    from producing false positives.
    """
    lib = _strip_wildcards(library_name)
    query = _strip_wildcards(query_name)
    if not (lib and query):
        return False
    if lib == query:
        return True
    # Short generic names such as "c" or "id" only ever match exactly.
    if not _is_specific_enough(lib):
        return False

    def _prefix_at_boundary(prefix: str, full: str) -> bool:
        # True when `full` starts with `prefix` and the character right
        # after the prefix is a separator (or there is none).
        if not full.startswith(prefix):
            return False
        following = full[len(prefix):len(prefix) + 1]
        return not following or following in _SEPARATORS

    if _prefix_at_boundary(lib, query):
        return True
    return _is_specific_enough(query) and _prefix_at_boundary(query, lib)
def _is_specific_enough(name: str) -> bool:
"""Cookie-Name ist spezifisch genug für prefix-match.
Regel: ≥5 Chars ODER enthält Separator (_, -, .). Filtert 1-3-Char
Garbage ("c", "ID") aber lässt "_ga" / "fr" durch wenn präfixiert.
"""
if len(name) >= 5:
return True
return any(sep in name for sep in ("_", "-", ".", "["))
def _load_override_layer(name: str) -> dict | None:
    """Layer 1: look the cookie up in the curated override knowledge bases.

    An exact hit from ``lookup_cookie_extended`` / ``lookup_cookie`` wins.
    Otherwise both knowledge bases (extended first) are scanned with
    ``_name_matches`` so that names carrying a runtime suffix (e.g.
    ``_ga_K8YL3M9T``) still find the entry of their base name (``_ga``).

    Returns:
        A copy of the entry tagged with ``_layer="override"`` (fuzzy hits
        additionally carry ``_matched_name``), or ``None`` when nothing
        matches or the knowledge-base modules cannot be imported.
    """
    try:
        from .cookie_knowledge_db import KB as KB_DB, lookup_cookie
        from .cookie_knowledge_extended import (
            KB_EXT, lookup_cookie_extended,
        )
    except Exception as exc:
        logger.warning("override-layer load failed: %s", exc)
        return None

    exact = lookup_cookie_extended(name) or lookup_cookie(name)
    if exact:
        return {**exact, "_layer": "override"}

    # Fuzzy pass: first matching entry wins, extended KB before base KB.
    for knowledge_base in (KB_EXT, KB_DB):
        for candidate_name, record in knowledge_base.items():
            if not _name_matches(candidate_name, name):
                continue
            fuzzy = dict(record)
            fuzzy["_layer"] = "override"
            fuzzy["_matched_name"] = candidate_name
            return fuzzy
    return None
def _load_truth_base(name: str, domain: str = "") -> dict | None:
"""Layer 2: compliance.cookie_library DB-Lookup mit fuzzy match."""
try:
from database import SessionLocal
from sqlalchemy import text
except Exception:
return None
db = SessionLocal()
try:
# First: exact match on cookie_name (fast)
r = db.execute(
text(
"SELECT id, cookie_name, vendor_name, vendor_country, "
"vendor_privacy_url, vendor_opt_out_url, actual_category, "
"purpose_de, purpose_en, value_pattern, "
"typical_max_age_seconds, data_receivers, is_pii, "
"source_name, source_license, confidence "
"FROM compliance.cookie_library "
"WHERE LOWER(cookie_name) = LOWER(:n) LIMIT 1"
),
{"n": name},
).mappings().first()
if not r:
# Fuzzy-prefix: any library entry whose name is a prefix
# of the query (or vice versa). Bounded to 20 rows for
# perf — the 2287-row table is small enough to scan.
stripped = _strip_wildcards(name)
if len(stripped) >= 3:
candidates = db.execute(
text(
"SELECT id, cookie_name, vendor_name, "
"vendor_country, vendor_privacy_url, "
"vendor_opt_out_url, actual_category, purpose_de, "
"purpose_en, value_pattern, "
"typical_max_age_seconds, data_receivers, is_pii, "
"source_name, source_license, confidence "
"FROM compliance.cookie_library "
"WHERE LOWER(cookie_name) LIKE :prefix "
"OR LOWER(:n) LIKE LOWER(cookie_name) || '%' "
"LIMIT 20"
),
{"prefix": f"{stripped[:6].lower()}%", "n": name},
).mappings().all()
for c in candidates:
if _name_matches(c["cookie_name"], name):
r = c
break
if r:
out = dict(r)
out["_layer"] = "truth_base"
return out
return None
except Exception as e:
logger.info("truth_base lookup failed for %s: %s", name, e)
return None
finally:
db.close()
def _load_auto_learning(name: str) -> dict | None:
"""Layer 3: was haben wir bei früheren Audits beobachtet?
Wenn ≥3 unterschiedliche Sites denselben Cookie mit ähnlichem
declared_purpose deklarieren → return Konsens.
"""
try:
from database import SessionLocal
from sqlalchemy import text
except Exception:
return None
db = SessionLocal()
try:
r = db.execute(
text(
"SELECT cookie_name, "
" COUNT(DISTINCT site_url) AS site_count, "
" MODE() WITHIN GROUP (ORDER BY declared_category) "
" AS consensus_category, "
" MAX(observed_max_age_seconds) AS max_observed_age "
"FROM compliance.cookie_behavior_audits "
"WHERE LOWER(cookie_name) = LOWER(:n) "
"GROUP BY cookie_name "
"HAVING COUNT(DISTINCT site_url) >= 3"
),
{"n": name},
).mappings().first()
if r:
return {
"cookie_name": r["cookie_name"],
"consensus_category": r["consensus_category"],
"observed_on_sites": r["site_count"],
"max_observed_age_seconds": r["max_observed_age"],
"_layer": "auto_learning",
}
return None
except Exception as e:
logger.info("auto_learning lookup failed for %s: %s", name, e)
return None
finally:
db.close()
def lookup(name: str, domain: str = "") -> dict[str, Any]:
    """Merge all three lookup layers into one record for a cookie name.

    The result always contains ``name``, ``_found`` and ``_layer``.
    Truth-base columns are copied in first; auto-learning only fills
    ``consensus_category`` / ``observed_on_sites`` where still absent; the
    override layer then overwrites its annotation fields and, if present,
    ``vendor_country``. ``_layer`` names the highest-priority layer that
    contributed (``override`` > ``truth_base`` > ``auto_learning``) or
    stays ``"unknown"``.

    NOTE(review): an override hit does not copy any category field, so a
    cookie known only to the override layer ends up with
    ``_layer="override"`` but no ``actual_category`` — confirm intended.
    """
    merged: dict[str, Any] = {
        "name": name, "_found": False, "_layer": "unknown",
    }

    truth_hit = _load_truth_base(name, domain)
    if truth_hit:
        merged.update(truth_hit)
        merged["_found"] = True

    learned = _load_auto_learning(name)
    if learned:
        for key in ("consensus_category", "observed_on_sites"):
            merged.setdefault(key, learned.get(key))
        merged["_found"] = True
        # Only claim the auto-learning layer when the truth base missed.
        if merged.get("_layer") == "unknown":
            merged["_layer"] = "auto_learning"

    curated = _load_override_layer(name)
    if not curated:
        return merged

    # Override wins for all annotation fields; the truth-base
    # actual_category is deliberately left untouched.
    annotation_keys = {
        "schrems_ii_status", "eugh_rulings", "exact_purpose",
        "data_collected", "ip_relevant", "ip_anonymized",
        "tcf_purpose_ids", "iab_vendor_id", "typical_lifetime",
        "reid_risk", "technical_necessity",
        "eu_alternative_cookies", "eu_alternative_vendor", "notes",
    }
    merged.update(
        {key: curated[key] for key in annotation_keys if key in curated}
    )
    # A curated vendor country replaces the truth-base value.
    curated_country = curated.get("vendor_country")
    if curated_country:
        merged["vendor_country"] = curated_country
    merged["_layer"] = "override"
    merged["_found"] = True
    return merged
def lookup_actual_category(name: str) -> str | None:
    """Return just the category that ``lookup`` finds for a cookie name.

    Prefers ``actual_category`` and falls back to ``consensus_category``;
    the result is falsy (normally ``None``) when neither is known.
    """
    record = lookup(name)
    category = record.get("actual_category")
    if category:
        return category
    return record.get("consensus_category")
@@ -0,0 +1,92 @@
"""Auto-Learning für Cookies: nach jedem Audit alle deklarierten +
beobachteten Cookies in compliance.cookie_behavior_audits loggen.
Cross-Site-Konsens (≥3 Sites mit ähnlichem declared_purpose) macht
einen unbekannten Cookie zum Promotion-Kandidaten für die kuratierte
BreakPilot-KB. Diese Logik lebt im `cookie_library_lookup._load_auto_learning`.
Best-Effort: jeder DB-Fehler wird geloggt aber nicht propagiert —
ein Logging-Fail soll keinen Audit abbrechen.
"""
from __future__ import annotations
import logging
from urllib.parse import urlparse
logger = logging.getLogger(__name__)
def _site_url_from_state(state: dict) -> str:
req = state.get("req")
if req is None:
return ""
for d in getattr(req, "documents", []) or []:
url = getattr(d, "url", "") or ""
if url and "://" in url:
p = urlparse(url)
return f"{p.scheme}://{p.netloc}"
return ""
def log_observations(state: dict) -> dict:
    """Persist every declared (cookie, site, category) tuple for auto-learning.

    Inserts one row per named cookie from ``state["cmp_vendors"]`` into
    ``compliance.cookie_behavior_audits``. This is best effort: no error
    is propagated to the caller.

    Each insert runs inside its own SAVEPOINT. Without that, a single
    failing insert leaves the surrounding transaction in an aborted state,
    so all later inserts and the final commit would fail as well.

    Args:
        state: Audit state; reads ``check_id``, ``req`` (for the site URL)
            and ``cmp_vendors``.

    Returns:
        A stats dict. Early exits return ``{"logged": 0, "skipped":
        <reason>}`` with reason ``"no_db"``, ``"no_site_url"`` or
        ``"no_cmp_vendors"``. Otherwise ``{"logged": <rows committed>,
        "skipped": <rows not stored>, "site_url": <origin>}``; when the
        commit itself fails, all rows count as skipped.
    """
    try:
        from database import SessionLocal
        from sqlalchemy import text
    except Exception:
        return {"logged": 0, "skipped": "no_db"}

    check_id = state.get("check_id") or ""
    site_url = _site_url_from_state(state)
    if not site_url:
        return {"logged": 0, "skipped": "no_site_url"}
    cmp_vendors = state.get("cmp_vendors") or []
    if not cmp_vendors:
        return {"logged": 0, "skipped": "no_cmp_vendors"}

    insert_stmt = text(
        "INSERT INTO compliance.cookie_behavior_audits "
        "(check_id, site_url, cookie_name, "
        "cookie_domain, declared_category, "
        "observed_max_age_seconds) "
        "VALUES (:cid, :site, :name, :dom, :cat, :age)"
    )

    db = SessionLocal()
    inserted = 0
    skipped = 0
    try:
        for v in cmp_vendors:
            for c in (v.get("cookies") or []):
                cname = (c.get("name") or "").strip()
                if not cname:
                    skipped += 1
                    continue
                try:
                    params = {
                        "cid": check_id,
                        "site": site_url,
                        "name": cname,
                        # Falls back to the vendor name when no domain
                        # is declared.
                        "dom": (v.get("domain")
                                or v.get("name") or "")[:200],
                        "cat": (c.get("category")
                                or v.get("category") or "").strip()[:50],
                        # No observed max-age is available at this point.
                        "age": None,
                    }
                    # SAVEPOINT: a failing row is rolled back on its own
                    # and does not poison the rows around it.
                    with db.begin_nested():
                        db.execute(insert_stmt, params)
                    inserted += 1
                except Exception as e:
                    logger.info("cookie_observations insert skipped %s: %s",
                                cname, str(e)[:120])
                    skipped += 1
        db.commit()
    except Exception as e:
        logger.warning("cookie_observations commit failed: %s", e)
        # Nothing was persisted, so do not report the rows as logged.
        skipped += inserted
        inserted = 0
        try:
            db.rollback()
        except Exception as rollback_error:
            logger.info("cookie_observations rollback failed: %s",
                        rollback_error)
    finally:
        db.close()
    return {"logged": inserted, "skipped": skipped, "site_url": site_url}
@@ -23,6 +23,10 @@ from ._blocks_findings import (
render_internal_reminders,
render_manual_review,
)
from ._vendor_cards import (
render_info_box_rechtsrahmen,
render_vendor_cards,
)
from ._legacy_wrappers import render_all_legacy
from ._style import page_close, page_open
@@ -33,7 +37,12 @@ def compose_v2(state: dict) -> str:
parts = [
page_open(site),
render_header(state),
render_info_box_rechtsrahmen(),
render_toc(state),
render_vendor_cards(
state.get("cmp_vendors") or [],
state.get("cookie_coherence_findings") or [],
),
render_critical(state),
render_manual_review(state),
render_internal_reminders(state),
@@ -60,6 +69,8 @@ def compose_v2(state: dict) -> str:
state.get("audit_walk_html", ""),
# B18 Impressum-Specialist-Agent (Pattern + LLM)
state.get("impressum_agent_html", ""),
# B19 Cookie-Coherence-Check (Salesforce-as-essential etc.)
state.get("cookie_coherence_html", ""),
# Browser-Matrix (Stage 1.c)
state.get("browser_matrix_html", ""),
# All legacy build_*_html() wrapped in V2 sections — preserves
@@ -0,0 +1,190 @@
"""Vendor-Karten-Renderer für die Audit-Mail.
Statt 740 Cookie-Rows aggregieren wir nach VENDOR. Pro Vendor eine
Karte mit:
- Vendor-Name + Sitzland (deklariert + KB)
- Kategorie deklariert vs KB
- Cookie-Count + Issue-Count
- 1-2 Beispiel-Cookies (bevorzugt solche mit Findings)
- Top-Issue-Typen als Tags
Sortiert nach Issue-Severity. Top 30 in der Mail, Rest in CSV.
Die volle 740-Cookies-Tabelle bleibt im CSV-Anhang (cookies-full.csv).
"""
from __future__ import annotations
from collections import defaultdict
from html import escape as h
from ._cookie_inventory import _country_third
def _issue_tag(check_id: str) -> str:
    """Return the issue-type token of a finding id.

    Trailing numeric segments are dropped first, so
    ``"COOKIE-COHERENCE-MAE-001"`` yields ``"MAE"`` rather than ``"001"``.
    Ids without a numeric suffix keep their last segment.
    """
    parts = [p for p in check_id.split("-") if p]
    while len(parts) > 1 and parts[-1].isdigit():
        parts.pop()
    return parts[-1] if parts else ""


def _build_vendor_summary(cmp_vendors: list[dict],
                          coherence_findings: list[dict]) -> list[dict]:
    """Aggregate cookies by vendor and score each vendor by its findings.

    Vendors are merged case-insensitively on their stripped name; entries
    without a name are grouped under ``"Unbekannt"``. Country and category
    are taken from the first entry seen for a vendor. Findings are matched
    to vendors through their ``vendor`` field, normalised the same way.

    Returns one dict per vendor with the keys ``name``, ``country``,
    ``category``, ``cookie_count``, ``issue_count``, ``issue_score``
    (HIGH=3, MEDIUM=2, LOW=1, anything else 0), ``issue_types`` (sorted
    tags) and ``examples`` (up to two cookies, those with findings first).
    The list is sorted by ``issue_score`` then ``cookie_count``, both
    descending.
    """
    # Findings index per vendor. Keys are stripped AND lower-cased so they
    # line up with the vendor keys built below.
    findings_per_vendor: dict[str, list[dict]] = defaultdict(list)
    for f in coherence_findings:
        vendor_key = (f.get("vendor") or "").strip().lower()
        if vendor_key:
            findings_per_vendor[vendor_key].append(f)

    by_vendor: dict[str, dict] = {}
    for v in cmp_vendors:
        name = (v.get("name") or "").strip() or "Unbekannt"
        entry = by_vendor.setdefault(name.lower(), {
            "name": name,
            "country": (v.get("country") or "").strip(),
            "category": (v.get("category") or "").strip(),
            "cookies": [],
        })
        entry["cookies"].extend(v.get("cookies") or [])

    sev_score = {"HIGH": 3, "MEDIUM": 2, "LOW": 1, "INFO": 0}
    out: list[dict] = []
    for key, e in by_vendor.items():
        fs = findings_per_vendor.get(key, [])
        score = sum(sev_score.get((f.get("severity") or "").upper(), 0)
                    for f in fs)
        # Pick up to 2 example cookies: prefer those WITH findings.
        finding_cookies = {f.get("cookie_name") for f in fs
                           if f.get("cookie_name")}
        examples = [c for c in e["cookies"]
                    if (c.get("name") or "") in finding_cookies][:2]
        for c in e["cookies"]:
            if len(examples) >= 2:
                break
            if c not in examples:
                examples.append(c)
        # Issue types as tags; empty tags are dropped.
        issue_types = sorted({
            tag
            for tag in (_issue_tag(f["check_id"])
                        for f in fs if f.get("check_id"))
            if tag
        })
        out.append({
            "name": e["name"],
            "country": e["country"],
            "category": e["category"],
            "cookie_count": len(e["cookies"]),
            "issue_count": len(fs),
            "issue_score": score,
            "issue_types": issue_types,
            "examples": examples,
        })
    # Sort: issue_score DESC, then cookie_count DESC.
    out.sort(key=lambda r: (-r["issue_score"], -r["cookie_count"]))
    return out
def render_vendor_cards(cmp_vendors: list[dict],
                        coherence_findings: list[dict],
                        top_n: int = 30) -> str:
    """Render the vendor overview section of the audit mail as HTML.

    Vendors are aggregated and ordered by ``_build_vendor_summary``. The
    first *top_n* get a card each showing name, declared country (with a
    "[Drittland]" marker when ``_country_third`` flags it), cookie and
    issue counts, up to four issue tags and the example cookies. If more
    vendors exist, a note points to the CSV attachment.

    Returns an empty string when there is no vendor at all.
    """
    summary = _build_vendor_summary(cmp_vendors, coherence_findings)
    if not summary:
        return ""
    total_vendors = len(summary)
    total_cookies = sum(s["cookie_count"] for s in summary)
    total_issues = sum(s["issue_count"] for s in summary)
    cards = []
    for s in summary[:top_n]:
        # Border/accent colour by accumulated severity score.
        sev_color = ("#dc2626" if s["issue_score"] >= 6 else
                     "#f59e0b" if s["issue_score"] >= 2 else "#64748b")
        # The country is site-declared text and must be escaped like every
        # other vendor-supplied value before it is placed into the markup.
        country_disp = h(s["country"] or "")
        country_tag = ""
        if s["country"]:
            _, is_third, _ = _country_third(s["country"])
            if is_third:
                country_tag = (
                    " <span style='font-size:10px;color:#dc2626;"
                    "font-weight:700;'>[Drittland]</span>"
                )
        issue_chips = "".join(
            f"<span style='display:inline-block;background:#fee2e2;"
            f"color:#7f1d1d;font-size:10px;padding:1px 6px;border-radius:999px;"
            f"margin-right:3px;'>{h(t)}</span>"
            for t in s["issue_types"][:4]
        )
        example_rows = []
        for c in s["examples"]:
            cname = c.get("name") or "?"
            # CMP exports name the lifetime field differently; take the
            # first one that is filled.
            lifetime = (c.get("duration") or c.get("persistence")
                        or c.get("expiry") or "")
            example_rows.append(
                f"<div style='font-size:11px;color:#475569;"
                f"font-family:monospace;'>"
                f"• <code>{h(str(cname))}</code> "
                f"<span style='color:#94a3b8;'>(Lifetime: {h(str(lifetime))})</span>"
                "</div>"
            )
        examples_html = "".join(example_rows)
        cards.append(
            f"<div style='margin:10px 0;padding:12px;background:#fff;"
            f"border-left:3px solid {sev_color};border-radius:4px;'>"
            f"<div style='display:flex;justify-content:space-between;"
            f"align-items:baseline;'>"
            f"<div><strong style='font-size:14px;'>{h(s['name'])}</strong>"
            f" <span style='font-size:11px;color:#64748b;'>"
            f"{country_disp}{country_tag}</span></div>"
            f"<div style='font-size:11px;color:#475569;'>"
            f"{s['cookie_count']} Cookies · "
            f"<strong style='color:{sev_color};'>{s['issue_count']}</strong> "
            f"Issues</div>"
            f"</div>"
            f"<div style='margin-top:4px;'>{issue_chips}</div>"
            f"<div style='margin-top:6px;'>{examples_html}</div>"
            "</div>"
        )
    rest_note = ""
    if len(summary) > top_n:
        rest_note = (
            f"<p style='font-size:12px;color:#64748b;margin-top:8px;'>"
            f"<em>… und {len(summary)-top_n} weitere Vendoren — "
            f"vollständige Liste in <code>cookies-full-*.csv</code> "
            f"im ZIP-Anhang.</em></p>"
        )
    return (
        "<div style='margin:24px 0;padding:16px;border-left:4px solid #0f766e;"
        "background:#f0fdfa;border-radius:4px;'>"
        "<h2 style='margin:0 0 8px;color:#134e4a;font-size:16px;'>"
        f"🏷️ Vendor-Übersicht ({total_vendors} Vendoren · "
        f"{total_cookies} Cookies · {total_issues} Issues)"
        "</h2>"
        "<p style='margin:0 0 8px;font-size:12px;color:#475569;'>"
        "Sortiert nach Issue-Severity. Pro Vendor: 1-2 Beispielcookies + "
        "Issue-Tags. Volle Cookie×Finding-Matrix in CSV."
        "</p>"
        + "".join(cards) + rest_note + "</div>"
    )
def render_info_box_rechtsrahmen() -> str:
    """Return the static HTML info box listing the legal frame of the audit.

    The box takes no input: it is a fixed list of five legal bases, each
    rendered as one ``<li>`` with the norm in bold followed by a short
    explanation.
    """
    # (norm, explanation) pairs in display order.
    legal_bases = (
        ("DSGVO Art. 13 Abs. 1 lit. c",
         "konkrete Zweckangabe pro Cookie / Verarbeitung."),
        ("§ 25 Abs. 1 TDDDG",
         "Einwilligung für jeden nicht-technisch-erforderlichen Cookie."),
        ("DSGVO Art. 5 Abs. 1 lit. c",
         "Datenminimierung (Lifetime + Reichweite)."),
        ("§ 5 UWG",
         "irreführende geschäftliche Handlung "
         "(falsche Kategorisierung als 'erforderlich')."),
        ("§ 30/130 OWiG",
         "persönliche Verantwortung der Geschäftsführung."),
    )
    items = "".join(
        f"<li><strong>{norm}</strong> — {explanation}</li>"
        for norm, explanation in legal_bases
    )
    box_style = (
        "margin:16px 0;padding:14px;border:1px solid #e2e8f0;"
        "background:#f8fafc;border-radius:4px;font-size:12px;"
        "color:#475569;line-height:1.5;"
    )
    return (
        f"<div style='{box_style}'>"
        "<strong style='color:#1e293b;'>Rechtsrahmen dieser Analyse</strong>"
        "<ul style='margin:6px 0 0 18px;padding:0;'>"
        f"{items}"
        "</ul>"
        "</div>"
    )
@@ -0,0 +1,138 @@
"""Tests for B19 Cookie-Coherence-Check (Salesforce-as-essential)."""
from unittest.mock import patch
from compliance.services.cookie_coherence_check import (
_is_essential_category,
_is_marketing_category,
_is_pseudo_purpose,
check_cookie_coherence,
)
class TestCategoryHelpers:
    """Checks for the essential / marketing category classifiers."""

    def test_essential_de(self):
        # German banner labels that must count as essential.
        for label in ("Erforderlich", "technisch notwendig"):
            assert _is_essential_category(label)

    def test_essential_en(self):
        # English banner labels that must count as essential.
        for label in ("Strictly Necessary", "essential"):
            assert _is_essential_category(label)

    def test_not_essential(self):
        for label in ("Marketing", "Analyse"):
            assert not _is_essential_category(label)

    def test_marketing(self):
        for label in ("marketing", "advertising"):
            assert _is_marketing_category(label)
        assert not _is_marketing_category("functional")
class TestPseudoPurpose:
    """Checks for the placeholder-purpose detector."""

    def test_explicit_floskel(self):
        # Known boilerplate phrases, German and English.
        for phrase in ("Siehe dazugehörige Datenverarbeitung", "see above"):
            assert _is_pseudo_purpose(phrase)

    def test_too_short(self):
        two_word_purpose = "Nutzung Cookie"
        assert _is_pseudo_purpose(two_word_purpose)

    def test_real_purpose(self):
        concrete_purpose = (
            "Speichert die anonymisierte Besucher-ID zur "
            "Unterscheidung über mehrere Sessions hinweg."
        )
        assert not _is_pseudo_purpose(concrete_purpose)
class TestCheck:
    """End-to-end checks of ``check_cookie_coherence`` on small states."""

    def _state(self, vendors):
        # Minimal audit state: only the CMP vendor list is supplied.
        return {"cmp_vendors": vendors}

    def test_no_vendors_no_findings(self):
        result = check_cookie_coherence({})
        assert result == []

    def test_marketing_as_essential_high_finding(self):
        # Pinterest _pin_unauth is actual=marketing per KB
        cookie = {
            "name": "_pin_unauth",
            "category": "Erforderlich",
            "purpose": "Speichert technische Nutzerkennung dauerhaft",
            "duration": "1 Jahr",
        }
        vendor = {
            "name": "Pinterest",
            "category": "Erforderlich",
            "cookies": [cookie],
        }
        findings = check_cookie_coherence(self._state([vendor]))
        hits = [f for f in findings
                if f["check_id"] == "COOKIE-COHERENCE-MAE-001"]
        assert len(hits) == 1
        first = hits[0]
        assert first["severity"] == "HIGH"
        assert first["actual_category"] == "marketing"

    def test_essential_with_long_lifetime_finding(self):
        # Even if KB-classified as functional/essential, 1 Jahr in
        # "essential" is implausible.
        cookie = {
            "name": "guest_uuid_essential_abc123",
            "category": "Erforderlich",
            "purpose": "Speichert anonyme Session-Kennung über Browser hinweg",
            "duration": "1 Jahr",
        }
        vendor = {
            "name": "Salesforce",
            "category": "Erforderlich",
            "cookies": [cookie],
        }
        findings = check_cookie_coherence(self._state([vendor]))
        hits = [f for f in findings
                if f["check_id"] == "COOKIE-COHERENCE-LIFE-001"]
        assert len(hits) == 1
        assert hits[0]["severity"] == "MEDIUM"

    def test_pseudo_purpose_finding(self):
        cookie = {
            "name": "completely_made_up_cookie_xyz",
            "category": "functional",
            "purpose": "Siehe dazugehörige Datenverarbeitung",
            "duration": "session",
        }
        vendor = {
            "name": "TestVendor",
            "category": "functional",
            "purpose": "irgendwas",
            "cookies": [cookie],
        }
        findings = check_cookie_coherence(self._state([vendor]))
        hits = [f for f in findings
                if f["check_id"] == "COOKIE-COHERENCE-PURP-001"]
        assert len(hits) == 1

    def test_duplicate_vendor_finding(self):
        # Salesforce in TWO different categories
        real_purpose = "konkreter Zweck Text mit vielen Worten"
        as_essential = {
            "name": "Salesforce",
            "category": "Erforderlich",
            "cookies": [{"name": "a", "purpose": real_purpose}],
        }
        as_marketing = {
            "name": "Salesforce Inc.",
            "category": "Marketing",
            "cookies": [{"name": "b", "purpose": real_purpose}],
        }
        findings = check_cookie_coherence(
            self._state([as_essential, as_marketing])
        )
        hits = [f for f in findings
                if f["check_id"] == "COOKIE-COHERENCE-DUP-001"]
        assert len(hits) == 1

    def test_pseudo_purpose_suppressed_when_vendor_purpose_substantial(self):
        # If vendor-level purpose has substantial text, cookie inheriting
        # "Siehe dazugehörige Datenverarbeitung" is not flagged.
        vendor_purpose = (
            "Salesforce CRM-System verarbeitet personenbezogene Daten "
            "im Auftrag zur Verwaltung der Kundenbeziehung über mehrere "
            "Touchpoints hinweg."
        )
        cookie = {
            "name": "sf_session",
            "category": "functional",
            "purpose": "Siehe dazugehörige Datenverarbeitung",
            "duration": "session",
        }
        vendor = {
            "name": "Salesforce",
            "category": "functional",
            "purpose": vendor_purpose,
            "cookies": [cookie],
        }
        findings = check_cookie_coherence(self._state([vendor]))
        hits = [f for f in findings
                if f["check_id"] == "COOKIE-COHERENCE-PURP-001"]
        assert hits == []
@@ -0,0 +1,69 @@
"""Tests for the 3-Layer Cookie-Lookup-Service."""
from compliance.services.cookie_library_lookup import (
_is_specific_enough,
_name_matches,
_strip_wildcards,
)
class TestStripWildcards:
    """Checks for normalising library cookie names before matching."""

    def test_lowercase(self):
        normalised = _strip_wildcards("_GA")
        assert normalised == "_ga"

    def test_strip_star(self):
        normalised = _strip_wildcards("_ga*")
        assert normalised == "_ga"

    def test_strip_dotstar(self):
        normalised = _strip_wildcards("_pk_id.*")
        assert normalised == "_pk_id"

    def test_strip_trailing_underscore(self):
        # OCD pattern: a trailing "_" is an implicit wildcard.
        normalised = _strip_wildcards("guest_uuid_essential_")
        assert normalised == "guest_uuid_essential"

    def test_strip_trailing_dot(self):
        normalised = _strip_wildcards("_pk_id.")
        assert normalised == "_pk_id"
class TestIsSpecificEnough:
    """Checks for the guard against overly generic library names."""

    def test_long_name(self):
        assert _is_specific_enough("OptanonConsent")

    def test_short_with_separator(self):
        assert _is_specific_enough("_ga")

    def test_short_no_separator_rejected(self):
        # Short names without any separator are too generic to match on.
        for generic in ("c", "ID", "abc"):
            assert not _is_specific_enough(generic)
class TestNameMatches:
    """Checks for matching a library cookie name against a browser cookie."""

    def test_exact(self):
        assert _name_matches("OptanonConsent", "OptanonConsent")

    def test_prefix_with_separator(self):
        # (library name, browser cookie) pairs where the prefix is followed
        # by a separator.
        pairs = (
            ("_ga", "_ga_K8YL3M9T"),
            ("__cf_bm", "__cf_bm_hash"),
        )
        for library_name, browser_name in pairs:
            assert _name_matches(library_name, browser_name)

    def test_short_unspecific_rejected(self):
        # 1-char library entries must not match arbitrary queries
        pairs = (
            ("c", "completely_unknown"),
            ("ID", "IDcharger"),
        )
        for library_name, browser_name in pairs:
            assert not _name_matches(library_name, browser_name)

    def test_prefix_no_separator_rejected(self):
        # Even with longer library, must have separator after prefix
        assert not _name_matches("Compa", "Completely_unknown")

    def test_wildcard_match(self):
        # _pk_id.* matches _pk_id.5.7d8
        assert _name_matches("_pk_id.*", "_pk_id.5.7d8")

    def test_trailing_underscore_match(self):
        # guest_uuid_essential_ matches guest_uuid_essential_xyz
        assert _name_matches("guest_uuid_essential_",
                             "guest_uuid_essential_xyz")

    def test_unrelated(self):
        assert not _name_matches("_ga", "intercom-session")