From c867478791ba071b691d2fea147af89f6925789b Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Tue, 12 May 2026 18:18:50 +0200 Subject: [PATCH] feat(tcf-vendors): GVL cache + vendor extraction + VVT mapping MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 1-2 of the closed quality loop: - GVL cache (consent-tester/services/gvl_cache.py): downloads and caches IAB Global Vendor List with 24h TTL, resolves vendor IDs to names, purposes, policy URLs, retention, country - Vendor extraction (consent_interceptor.py): extract_tcf_vendors() reads __tcfapi after accept phase, resolves via GVL - Scan response: tcf_vendors field added to /scan endpoint - VVT mapper (vendor_vvt_mapper.py): maps TCF vendors to VVT format with purpose labels, Rechtsgrundlage, Drittland detection - Vendor cross-check (banner_cookie_cross_check.py): checks all TCF vendors against DSI text — missing vendors, undocumented transfers - Compliance check integrates Step 3d: TCF vendors vs DSI Co-Authored-By: Claude Opus 4.6 (1M context) --- .../api/agent_compliance_check_routes.py | 19 ++- .../services/banner_cookie_cross_check.py | 80 ++++++++++ .../compliance/services/vendor_vvt_mapper.py | 104 +++++++++++++ consent-tester/main.py | 2 + .../services/consent_interceptor.py | 33 ++++ consent-tester/services/consent_scanner.py | 9 ++ consent-tester/services/gvl_cache.py | 147 ++++++++++++++++++ 7 files changed, 392 insertions(+), 2 deletions(-) create mode 100644 backend-compliance/compliance/services/vendor_vvt_mapper.py create mode 100644 consent-tester/services/gvl_cache.py diff --git a/backend-compliance/compliance/api/agent_compliance_check_routes.py b/backend-compliance/compliance/api/agent_compliance_check_routes.py index 98e4e8a..c6e73ae 100644 --- a/backend-compliance/compliance/api/agent_compliance_check_routes.py +++ b/backend-compliance/compliance/api/agent_compliance_check_routes.py @@ -258,16 +258,29 @@ async def 
_run_compliance_check(check_id: str, req: ComplianceCheckRequest): banner_result, doc_texts["cookie"], ) if cross_findings: - # Add cross-check findings to cookie results for r in results: if r.doc_type == "cookie": for cf in cross_findings: r.checks.append(CheckItem(**cf)) - # Recompute l2 = [c for c in r.checks if c.level == 2 and not c.skipped] l2p = sum(1 for c in l2 if c.passed) r.correctness_pct = round(l2p / len(l2) * 100) if l2 else 0 + # Step 3d: TCF Vendor cross-check against DSI + tcf_vendors = banner_result.get("tcf_vendors", []) if banner_result else [] + vvt_entries: list[dict] = [] + if tcf_vendors and "dse" in doc_texts: + _update(check_id, f"{len(tcf_vendors)} TCF-Verarbeiter vs. DSI abgleichen...") + from compliance.services.banner_cookie_cross_check import cross_check_vendors_vs_dsi + from compliance.services.vendor_vvt_mapper import map_vendors_to_vvt + vendor_findings = cross_check_vendors_vs_dsi(tcf_vendors, doc_texts["dse"]) + if vendor_findings: + for r in results: + if r.doc_type == "dse": + for vf in vendor_findings: + r.checks.append(CheckItem(**vf)) + vvt_entries = map_vendors_to_vvt(tcf_vendors) + # Step 4: Extract profile hints from documents _update(check_id, "Profil wird aus Dokumenten extrahiert...") from compliance.services.profile_extractor import extract_profile_from_documents @@ -307,7 +320,9 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): "detected": banner_result.get("banner_detected", False) if banner_result else False, "provider": banner_result.get("banner_provider", "") if banner_result else "", "violations": len(banner_result.get("banner_checks", {}).get("violations", [])) if banner_result else 0, + "tcf_vendor_count": len(tcf_vendors), } if banner_result else None, + "tcf_vendors": vvt_entries if tcf_vendors else [], "total_documents": len(results), "total_findings": total_findings, "email_status": email_result.get("status", "failed"), diff --git 
a/backend-compliance/compliance/services/banner_cookie_cross_check.py b/backend-compliance/compliance/services/banner_cookie_cross_check.py
index 30f45a1..d1f8b9b 100644
--- a/backend-compliance/compliance/services/banner_cookie_cross_check.py
+++ b/backend-compliance/compliance/services/banner_cookie_cross_check.py
@@ -143,3 +143,117 @@ def cross_check_banner_vs_cookie(
     logger.info("Cross-check: %d findings (%d services, %d tracking before)",
                 len(findings), len(all_tracking), len(tracking_before))
     return findings
+
+
+def cross_check_vendors_vs_dsi(
+    vendors: list[dict],
+    dsi_text: str,
+) -> list[dict]:
+    """Cross-check: are all TCF vendors documented in the DSI?
+
+    Accepts GVL-normalized vendor dicts (``country`` / ``is_eu`` /
+    ``purpose_names``) as well as VVT entries (``land`` /
+    ``drittland`` / ``zweck_kurz``).
+
+    Checks per vendor:
+    1. Is the vendor mentioned by name?
+    2. Is third-country transfer documented (if non-EU)?
+
+    Args:
+        vendors: Resolved TCF vendor dicts.
+        dsi_text: Text of the privacy policy (DSI); may be empty.
+
+    Returns:
+        List of CheckItem-compatible dicts, one per failed check.
+    """
+    import re  # local import: only needed for the transfer check
+
+    findings: list[dict] = []
+    dsi_lower = (dsi_text or "").lower()
+    # Transfer-mechanism wording that counts regardless of vendor.
+    transfer_keywords = (
+        "scc", "standardvertragsklausel", "data privacy framework",
+        "angemessenheitsbeschluss",
+    )
+
+    for v in vendors:
+        name = (v.get("name") or "").strip()
+        name_lower = name.lower()
+        if not name_lower:
+            continue
+        vendor_key = v.get("vendor_id") or name_lower[:20]
+
+        # Name variants: full name, name without spaces, and the
+        # first word -- the latter only when long enough not to
+        # match filler words such as "the".
+        variants = {name_lower, name_lower.replace(" ", "")}
+        first_word = name_lower.split()[0]
+        if len(first_word) >= 4:
+            variants.add(first_word)
+        mentioned = any(kw in dsi_lower for kw in variants)
+
+        if not mentioned:
+            purpose = (
+                v.get("zweck_kurz")
+                or ", ".join(v.get("purpose_names") or [])
+                or "unbekannt"
+            )
+            findings.append({
+                "id": f"vendor-{vendor_key}",
+                "label": f"Verarbeiter '{name}' fehlt in DSI",
+                "passed": False,
+                "severity": "HIGH",
+                "level": 2,
+                "parent": None,
+                "skipped": False,
+                "matched_text": "",
+                "hint": (
+                    f"Der Cookie-Banner listet '{name}' als Verarbeiter "
+                    f"({purpose}), aber die DSI "
+                    f"erwaehnt diesen Dienst nicht. Art. 13(1)(e) DSGVO "
+                    f"verlangt die Benennung aller Empfaenger."
+                ),
+                "source": "vendor_cross_check",
+            })
+            continue
+
+        # Third-country flag: explicit VVT key first, otherwise
+        # derived from the GVL-style ``is_eu`` flag.
+        third_country = v.get("drittland")
+        if third_country is None and v.get("is_eu") is not None:
+            third_country = not v["is_eu"]
+        if not third_country:
+            continue
+
+        country = v.get("land") or v.get("country") or "Drittland"
+        # Documented if the vendor name is followed later in the text
+        # by a third-country hint, or a transfer mechanism is named.
+        named_transfer = re.search(
+            re.escape(name_lower) + r".*?(?:usa|drittland)",
+            dsi_lower,
+            re.DOTALL,
+        )
+        transfer_mentioned = named_transfer is not None or any(
+            kw in dsi_lower for kw in transfer_keywords
+        )
+        if not transfer_mentioned:
+            findings.append({
+                "id": f"vendor-transfer-{vendor_key}",
+                "label": f"Drittlandtransfer fuer '{name}' nicht dokumentiert",
+                "passed": False,
+                "severity": "MEDIUM",
+                "level": 2,
+                "parent": None,
+                "skipped": False,
+                "matched_text": "",
+                "hint": (
+                    f"'{name}' verarbeitet Daten in {country} (ausserhalb EWR). "
+                    f"Die DSI muss den Transfermechanismus benennen: "
+                    f"SCC (Art. 46(2)(c)) oder DPF (Angemessenheitsbeschluss)."
+                ),
+                "source": "vendor_cross_check",
+            })
+
+    logger.info("Vendor cross-check: %d findings for %d vendors",
+                len(findings), len(vendors))
+    return findings
diff --git a/backend-compliance/compliance/services/vendor_vvt_mapper.py b/backend-compliance/compliance/services/vendor_vvt_mapper.py
new file mode 100644
index 0000000..d0237ea
--- /dev/null
+++ b/backend-compliance/compliance/services/vendor_vvt_mapper.py
@@ -0,0 +1,115 @@
+"""
+Vendor VVT Mapper — map TCF vendors to VVT entries.
+
+Converts resolved TCF vendor data (from GVL) into the format
+needed for the Verarbeitungsverzeichnis (VVT) and for DSI
+cross-checking.
+"""
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+# IAB TCF v2.2 Purpose definitions (German)
+TCF_PURPOSE_LABELS = {
+    1: "Speicherung/Zugriff auf Endgeraet",
+    2: "Auswahl einfacher Anzeigen",
+    3: "Personalisiertes Anzeigenprofil erstellen",
+    4: "Personalisierte Anzeigen auswaehlen",
+    5: "Personalisiertes Inhaltsprofil erstellen",
+    6: "Personalisierte Inhalte auswaehlen",
+    7: "Anzeigenleistung messen",
+    8: "Inhaltsleistung messen",
+    9: "Marktforschung",
+    10: "Produkte entwickeln und verbessern",
+    11: "Geraeteeigenschaften zur Identifizierung nutzen",
+}
+
+# Purpose → banner category mapping
+PURPOSE_CATEGORY = {
+    1: "necessary",
+    2: "marketing", 3: "marketing", 4: "marketing",
+    5: "marketing", 6: "marketing",
+    7: "statistics", 8: "statistics",
+    9: "statistics", 10: "functional", 11: "functional",
+}
+
+# Order in which a vendor's primary category is chosen
+# (most consent-relevant first).
+_CATEGORY_PRIORITY = ("marketing", "statistics", "functional", "necessary")
+
+# EU/EEA member states plus CH and GB (adequacy decisions)
+_EU_EWR = {
+    "AT", "BE", "BG", "HR", "CY", "CZ", "DK", "EE", "FI", "FR",
+    "DE", "GR", "HU", "IE", "IT", "LV", "LT", "LU", "MT", "NL",
+    "PL", "PT", "RO", "SK", "SI", "ES", "SE", "IS", "LI", "NO",
+    "CH", "GB",
+}
+
+
+def tcf_vendor_to_vvt(vendor: dict) -> dict:
+    """Map a resolved TCF vendor to a VVT entry.
+
+    Args:
+        vendor: Resolved GVL vendor dict with name, purposes, country, etc.
+
+    Returns:
+        VVT-compatible dict with name, zweck, rechtsgrundlage, drittland,
+        etc. ``drittland`` and ``transfermechanismus`` are None when it is
+        unknown whether the vendor sits inside the EU/EEA.
+    """
+    purposes = vendor.get("purposes") or []
+    country = vendor.get("country")
+    is_eu = vendor.get("is_eu", country in _EU_EWR if country else None)
+
+    categories = {PURPOSE_CATEGORY.get(p, "functional") for p in purposes}
+    # Primary category: the most consent-relevant one present, not the
+    # alphabetically first (which would report "functional" for a vendor
+    # that also does marketing).
+    kategorie = next(
+        (c for c in _CATEGORY_PRIORITY if c in categories), "functional"
+    )
+
+    # Legal basis: the GVL "purposes" list holds the purposes a vendor
+    # requests consent for (legitimate-interest purposes are listed
+    # separately), so any declared purpose means consent.
+    if purposes:
+        rechtsgrundlage = "Einwilligung (Art. 6(1)(a) DSGVO, §25 Abs. 1 TDDDG)"
+    else:
+        rechtsgrundlage = "Berechtigtes Interesse (Art. 6(1)(f) DSGVO, §25 Abs. 2 TDDDG)"
+
+    third_country = None if is_eu is None else not is_eu
+    return {
+        "vendor_id": vendor.get("vendor_id"),
+        "name": vendor.get("name", ""),
+        "zweck": [TCF_PURPOSE_LABELS.get(p, f"Zweck {p}") for p in purposes],
+        "zweck_kurz": _summarize_purposes(purposes),
+        "kategorie": kategorie,
+        "rechtsgrundlage": rechtsgrundlage,
+        "drittland": third_country,
+        "land": country,
+        "transfermechanismus": "SCC/DPF" if third_country else None,
+        "speicherdauer_tage": vendor.get("retention_days"),
+        "policy_url": vendor.get("policy_url", ""),
+        "uses_cookies": vendor.get("uses_cookies", False),
+    }
+
+
+def map_vendors_to_vvt(vendors: list[dict]) -> list[dict]:
+    """Map a list of TCF vendors to VVT entries."""
+    return [tcf_vendor_to_vvt(v) for v in vendors]
+
+
+def _summarize_purposes(purposes: list[int]) -> str:
+    """Short German summary of purposes."""
+    if not purposes:
+        return "Keine Zwecke angegeben"
+    cats = {PURPOSE_CATEGORY.get(p, "sonstig") for p in purposes}
+    labels = {
+        "marketing": "Marketing/Werbung",
+        "statistics": "Analyse/Messung",
+        "functional": "Funktional",
+        "necessary": "Technisch notwendig",
+        "sonstig": "Sonstige",
+    }
+    return ", ".join(labels.get(c, c) for c in sorted(cats))
diff --git a/consent-tester/main.py b/consent-tester/main.py
index 91c3772..08071d4 100644
--- a/consent-tester/main.py
+++ b/consent-tester/main.py
@@ -49,6 +49,7 @@ class ScanResponse(BaseModel):
     structured_checks: list = []
     completeness_pct: int = 0
     correctness_pct: int = 0
+    tcf_vendors: list = []  # Resolved TCF vendor list from GVL
 
 
 @app.get("/health")
@@ -102,6 +103,7 @@ async def scan_consent(req: ScanRequest):
         url=req.url,
         banner_detected=result.banner_detected,
         banner_provider=result.banner_provider,
+        tcf_vendors=result.tcf_vendors,
         phases=phases,
         summary={
             "critical": sum(1 for v in result.reject_violations if v.severity == "CRITICAL"),
diff --git a/consent-tester/services/consent_interceptor.py
b/consent-tester/services/consent_interceptor.py
index 2583528..2dd28db 100644
--- a/consent-tester/services/consent_interceptor.py
+++ b/consent-tester/services/consent_interceptor.py
@@ -110,6 +110,57 @@ async def get_consent_state(page) -> dict:
         return {"gcm_state": {}, "tcf_data": None}
 
 
+# Process-wide GVL cache, created lazily on first use, so the in-memory
+# TTL inside GVLCache is reused across scans instead of reset per call.
+_gvl_cache = None
+
+
+async def extract_tcf_vendors(page) -> list[dict]:
+    """Extract the consented TCF vendors from the page and resolve them.
+
+    Reads the TCF data returned by ``get_consent_state`` and resolves every
+    vendor ID with a truthy consent flag via the GVL cache.
+
+    Args:
+        page: Browser page object that is passed on to ``get_consent_state``.
+
+    Returns:
+        Resolved vendor dicts (name, purposes, country, ...). If the GVL
+        yields nothing or resolution fails, placeholder dicts for at most
+        50 vendor IDs. Empty list if the page exposes no TCF consents.
+    """
+    global _gvl_cache
+
+    state = await get_consent_state(page)
+    tcf = state.get("tcf_data")
+    if not tcf:
+        return []
+
+    # "vendor" / "consents" may be missing or null in the TCF payload.
+    consents = (tcf.get("vendor") or {}).get("consents") or {}
+    # Skip keys that are not plain vendor numbers instead of raising.
+    vendor_ids = [int(k) for k, v in consents.items()
+                  if v and str(k).isdecimal()]
+    if not vendor_ids:
+        return []
+
+    resolved: list[dict] = []
+    try:
+        from .gvl_cache import GVLCache
+        if _gvl_cache is None:
+            _gvl_cache = GVLCache()
+        resolved = await _gvl_cache.resolve_vendors(vendor_ids)
+        logger.info("TCF: %d/%d vendors resolved via GVL", len(resolved), len(vendor_ids))
+    except Exception as e:
+        logger.warning("TCF vendor resolution failed: %s", e)
+
+    if resolved:
+        return resolved
+    # Fallback (GVL unavailable or empty): return unresolved IDs
+    return [{"vendor_id": vid, "name": f"Vendor #{vid}", "purposes": []}
+            for vid in vendor_ids[:50]]
+
+
 # -- Internal helpers --------------------------------------------------------
 
 def _is_tracking_event(event_data: dict) -> bool:
diff --git a/consent-tester/services/consent_scanner.py b/consent-tester/services/consent_scanner.py
index 7ce9622..761caed 100644
--- a/consent-tester/services/consent_scanner.py
+++ b/consent-tester/services/consent_scanner.py
@@ -65,6 +65,8 @@ class ConsentTestResult:
     banner_has_dse_link: bool = False
     # Deep verification (per-phase intercepted data)
     deep_verification: dict = field(default_factory=dict)
+    # TCF vendors (resolved via GVL after accept phase)
+    tcf_vendors: list = field(default_factory=list)
 
 
 async def run_consent_test(
@@ -239,6 +241,13 @@ async
def run_consent_test(
         accept_tracking = find_tracking_services(result.accept_scripts)
         result.accept_new_tracking = [t for t in accept_tracking if t not in result.before_tracking]
 
+        # TCF vendor extraction (after accept, while page is still open)
+        try:
+            from services.consent_interceptor import extract_tcf_vendors
+            result.tcf_vendors = await extract_tcf_vendors(page_c)
+        except Exception as exc:
+            logger.warning("TCF vendor extraction failed: %s", exc)
+
         await ctx_c.close()
 
         # ── Phase D-F: Per-category tests ────────────────────────
diff --git a/consent-tester/services/gvl_cache.py b/consent-tester/services/gvl_cache.py
new file mode 100644
index 0000000..9b2b3c5
--- /dev/null
+++ b/consent-tester/services/gvl_cache.py
@@ -0,0 +1,192 @@
+"""
+IAB Global Vendor List (GVL) Cache.
+
+Downloads and caches the TCF v2.2 Global Vendor List from the IAB.
+Used to resolve TCF vendor IDs to human-readable names, purposes,
+policy URLs, retention periods, and country information.
+
+GVL spec: https://github.com/InteractiveAdvertisingBureau/GDPR-Transparency-and-Consent-Framework
+"""
+
+import json
+import logging
+import os
+import time
+from pathlib import Path
+
+import httpx
+
+logger = logging.getLogger(__name__)
+
+GVL_URL = "https://vendor-list.consensu.org/v3/vendor-list.json"
+GVL_CACHE_DIR = Path(os.getenv("GVL_CACHE_DIR", "/tmp/gvl_cache"))
+GVL_CACHE_FILE = GVL_CACHE_DIR / "vendor-list.json"
+GVL_TTL_SECONDS = 86400  # 24 hours
+
+# IAB TCF v2.2 Purpose definitions (German)
+TCF_PURPOSES = {
+    1: "Speicherung/Zugriff auf Endgeraet",
+    2: "Auswahl einfacher Anzeigen",
+    3: "Personalisiertes Anzeigenprofil erstellen",
+    4: "Personalisierte Anzeigen auswaehlen",
+    5: "Personalisiertes Inhaltsprofil erstellen",
+    6: "Personalisierte Inhalte auswaehlen",
+    7: "Anzeigenleistung messen",
+    8: "Inhaltsleistung messen",
+    9: "Marktforschung",
+    10: "Produkte entwickeln und verbessern",
+    11: "Geraeteeigenschaften zur Identifizierung nutzen",
+}
+
+# EU/EEA countries plus adequacy-decision countries (for third-country detection)
+EU_EWR_COUNTRIES = {
+    "AT", "BE", "BG", "HR", "CY", "CZ", "DK", "EE", "FI", "FR",
+    "DE", "GR", "HU", "IE", "IT", "LV", "LT", "LU", "MT", "NL",
+    "PL", "PT", "RO", "SK", "SI", "ES", "SE",
+    "IS", "LI", "NO",  # EEA
+    "CH", "GB",  # adequacy decision
+}
+
+
+class GVLCache:
+    """Cache for IAB Global Vendor List with file-based persistence.
+
+    The parsed list is kept in memory per instance and mirrored to
+    ``GVL_CACHE_FILE``; both are considered fresh for ``GVL_TTL_SECONDS``.
+    """
+
+    def __init__(self):
+        self._vendors: dict[int, dict] | None = None
+        self._loaded_at: float = 0
+
+    async def get_vendor(self, vendor_id: int) -> dict | None:
+        """Get a single raw GVL vendor entry by ID (None if unknown)."""
+        vendors = await self._ensure_loaded()
+        return vendors.get(vendor_id)
+
+    async def resolve_vendors(self, vendor_ids: list[int]) -> list[dict]:
+        """Resolve vendor IDs to normalized vendor dicts.
+
+        IDs that are not in the vendor list are silently skipped.
+        """
+        vendors = await self._ensure_loaded()
+        found = (vendors.get(vid) for vid in vendor_ids)
+        return [self._normalize(v) for v in found if v]
+
+    async def _ensure_loaded(self) -> dict[int, dict]:
+        """Return the vendor map, loading it from file or network if stale.
+
+        Order: fresh in-memory copy, fresh cache file, download. If the
+        download fails, an outdated in-memory copy or cache file is used
+        rather than returning nothing; only then an empty dict.
+        """
+        now = time.time()
+        if self._vendors and (now - self._loaded_at) < GVL_TTL_SECONDS:
+            return self._vendors
+
+        # Try file cache first
+        cached = self._read_cache_file(max_age=GVL_TTL_SECONDS)
+        if cached:
+            self._vendors = cached
+            self._loaded_at = now
+            logger.info("GVL loaded from cache (%d vendors)", len(cached))
+            return cached
+
+        # Download fresh
+        try:
+            async with httpx.AsyncClient(timeout=30.0) as client:
+                resp = await client.get(GVL_URL)
+                resp.raise_for_status()
+                data = resp.json()
+            vendors = self._parse_vendors(data)
+        except Exception as e:
+            logger.error("GVL download failed: %s", e)
+            # Stale data beats no data; _loaded_at stays untouched so
+            # the next call retries the download.
+            self._vendors = (
+                self._vendors
+                or self._read_cache_file(max_age=None)
+                or {}
+            )
+            return self._vendors
+
+        self._vendors = vendors
+        self._loaded_at = now
+        logger.info("GVL downloaded (%d vendors)", len(vendors))
+
+        # Persist to file; a write failure must not discard the download.
+        try:
+            GVL_CACHE_DIR.mkdir(parents=True, exist_ok=True)
+            GVL_CACHE_FILE.write_text(json.dumps(data), encoding="utf-8")
+        except OSError as e:
+            logger.warning("GVL cache write failed: %s", e)
+        return self._vendors
+
+    @staticmethod
+    def _parse_vendors(data: dict) -> dict[int, dict]:
+        """Extract the ``vendors`` object of a GVL document, keyed by int ID."""
+        return {int(k): v for k, v in (data.get("vendors") or {}).items()}
+
+    @classmethod
+    def _read_cache_file(cls, max_age: float | None) -> dict[int, dict] | None:
+        """Read the on-disk GVL copy.
+
+        Returns None if the file is missing, unreadable, or older than
+        ``max_age`` seconds (``None`` disables the age check).
+        """
+        try:
+            if max_age is not None:
+                age = time.time() - GVL_CACHE_FILE.stat().st_mtime
+                if age >= max_age:
+                    return None
+            data = json.loads(GVL_CACHE_FILE.read_text(encoding="utf-8"))
+            return cls._parse_vendors(data)
+        except FileNotFoundError:
+            return None
+        except Exception as e:
+            logger.warning("GVL cache read failed: %s", e)
+            return None
+
+    def _normalize(self, vendor: dict) -> dict:
+        """Normalize a GVL vendor entry to a clean dict."""
+        purposes = vendor.get("purposes") or []
+        retention = vendor.get("cookieMaxAgeSeconds")
+        if isinstance(retention, (int, float)):
+            # Zero/negative max-age yields 0 days instead of a negative
+            # or missing value.
+            retention_days = max(0, int(retention // 86400))
+        else:
+            retention_days = None
+        country = self._detect_country(vendor)
+        return {
+            "vendor_id": vendor.get("id"),
+            "name": vendor.get("name", ""),
+            "purposes": purposes,
+            "leg_int_purposes": vendor.get("legIntPurposes", []),
+            "special_purposes": vendor.get("specialPurposes", []),
+            "features": vendor.get("features", []),
+            "policy_url": vendor.get("policyUrl", ""),
+            "retention_days": retention_days,
+            "uses_cookies": vendor.get("usesCookies", False),
+            "country": country,
+            "is_eu": country in EU_EWR_COUNTRIES if country else None,
+            "purpose_names": [TCF_PURPOSES.get(p, f"Zweck {p}") for p in purposes],
+        }
+
+    @staticmethod
+    def _detect_country(vendor: dict) -> str | None:
+        """Guess the vendor country from the TLD of its overflow domain.
+
+        Only country-code TLDs of the countries in ``EU_EWR_COUNTRIES``
+        (plus ``.uk`` and ``.us``) are mapped. Generic or vanity TLDs
+        such as ``.com``, ``.co`` or ``.io`` say nothing about where a
+        company is established, so they yield None (unknown) instead of
+        flagging the vendor as a US third-country recipient.
+        """
+        domain = (vendor.get("overflow") or {}).get("httpOnlyDomain") or ""
+        tld = domain.strip().rstrip(".").rsplit(".", 1)[-1].upper()
+        if tld == "UK":
+            return "GB"
+        if tld == "US" or tld in EU_EWR_COUNTRIES:
+            return tld
+        return None