Files
breakpilot-compliance/consent-tester/services/gvl_cache.py
T
Benjamin Admin c867478791
CI / loc-budget (push) Failing after 16s
Build + Deploy / build-admin-compliance (push) Successful in 14s
Build + Deploy / build-backend-compliance (push) Successful in 16s
Build + Deploy / build-ai-sdk (push) Successful in 20s
Build + Deploy / build-developer-portal (push) Successful in 12s
Build + Deploy / build-tts (push) Successful in 15s
Build + Deploy / build-document-crawler (push) Successful in 13s
Build + Deploy / build-dsms-gateway (push) Successful in 13s
Build + Deploy / build-dsms-node (push) Successful in 12s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / test-python-document-crawler (push) Successful in 26s
CI / secret-scan (push) Has been skipped
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 2m49s
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / test-go (push) Successful in 45s
CI / test-python-backend (push) Successful in 38s
CI / test-python-dsms-gateway (push) Successful in 23s
CI / validate-canonical-controls (push) Successful in 15s
Build + Deploy / trigger-orca (push) Successful in 2m23s
feat(tcf-vendors): GVL cache + vendor extraction + VVT mapping
Phase 1-2 of the closed quality loop:
- GVL cache (consent-tester/services/gvl_cache.py): downloads and caches
  IAB Global Vendor List with 24h TTL, resolves vendor IDs to names,
  purposes, policy URLs, retention, country
- Vendor extraction (consent_interceptor.py): extract_tcf_vendors()
  reads __tcfapi after accept phase, resolves via GVL
- Scan response: tcf_vendors field added to /scan endpoint
- VVT mapper (vendor_vvt_mapper.py): maps TCF vendors to VVT format
  with purpose labels, Rechtsgrundlage, Drittland detection
- Vendor cross-check (banner_cookie_cross_check.py): checks all TCF
  vendors against DSI text — missing vendors, undocumented transfers
- Compliance check integrates Step 3d: TCF vendors vs DSI

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-12 18:18:50 +02:00

148 lines
5.3 KiB
Python

"""
IAB Global Vendor List (GVL) Cache.
Downloads and caches the TCF v2.2 Global Vendor List from the IAB.
Used to resolve TCF vendor IDs to human-readable names, purposes,
policy URLs, retention periods, and country information.
GVL spec: https://github.com/InteractiveAdvertisingBureau/GDPR-Transparency-and-Consent-Framework
"""
import json
import logging
import os
import time
from pathlib import Path
import httpx
logger = logging.getLogger(__name__)

# Endpoint of the IAB vendor list (v3 path) that this module downloads.
GVL_URL = "https://vendor-list.consensu.org/v3/vendor-list.json"

# On-disk cache location; the directory can be overridden with the
# GVL_CACHE_DIR environment variable.
GVL_CACHE_DIR = Path(os.getenv("GVL_CACHE_DIR", "/tmp/gvl_cache"))
GVL_CACHE_FILE = GVL_CACHE_DIR / "vendor-list.json"

# Maximum age of cached data (memory and file) before a refresh is attempted.
GVL_TTL_SECONDS = 86400  # 24 hours

# IAB TCF v2.2 purpose IDs mapped to short German display labels.
# The label text is emitted at runtime and is intentionally kept in German.
TCF_PURPOSES = {
    1: "Speicherung/Zugriff auf Endgeraet",
    2: "Auswahl einfacher Anzeigen",
    3: "Personalisiertes Anzeigenprofil erstellen",
    4: "Personalisierte Anzeigen auswaehlen",
    5: "Personalisiertes Inhaltsprofil erstellen",
    6: "Personalisierte Inhalte auswaehlen",
    7: "Anzeigenleistung messen",
    8: "Inhaltsleistung messen",
    9: "Marktforschung",
    10: "Produkte entwickeln und verbessern",
    # Purpose 11 in TCF v2.2 is "Use limited data to select content".
    # The previous label described Special Feature 2 (actively scanning
    # device characteristics for identification), which is not a purpose.
    11: "Verwendung reduzierter Daten zur Auswahl von Inhalten",
}

# Country codes that are NOT treated as third countries by the vendor
# normalization. Despite the name, this also contains two non-EEA
# countries that are listed here because of an adequacy decision.
EU_EWR_COUNTRIES = {
    # EU member states
    "AT", "BE", "BG", "HR", "CY", "CZ", "DK", "EE", "FI", "FR",
    "DE", "GR", "HU", "IE", "IT", "LV", "LT", "LU", "MT", "NL",
    "PL", "PT", "RO", "SK", "SI", "ES", "SE",
    # EEA (non-EU)
    "IS", "LI", "NO",
    # Adequacy decision
    "CH", "GB",
}
class GVLCache:
"""Cache for IAB Global Vendor List with file-based persistence."""
def __init__(self):
self._vendors: dict[int, dict] | None = None
self._loaded_at: float = 0
async def get_vendor(self, vendor_id: int) -> dict | None:
"""Get a single vendor by ID."""
vendors = await self._ensure_loaded()
return vendors.get(vendor_id)
async def resolve_vendors(self, vendor_ids: list[int]) -> list[dict]:
"""Resolve multiple vendor IDs to full vendor dicts."""
vendors = await self._ensure_loaded()
result = []
for vid in vendor_ids:
v = vendors.get(vid)
if v:
result.append(self._normalize(v))
return result
async def _ensure_loaded(self) -> dict[int, dict]:
"""Load from cache or download if stale."""
now = time.time()
if self._vendors and (now - self._loaded_at) < GVL_TTL_SECONDS:
return self._vendors
# Try file cache first
if GVL_CACHE_FILE.exists():
age = now - GVL_CACHE_FILE.stat().st_mtime
if age < GVL_TTL_SECONDS:
try:
data = json.loads(GVL_CACHE_FILE.read_text())
self._vendors = {
int(k): v for k, v in data.get("vendors", {}).items()
}
self._loaded_at = now
logger.info("GVL loaded from cache (%d vendors)", len(self._vendors))
return self._vendors
except Exception as e:
logger.warning("GVL cache read failed: %s", e)
# Download fresh
try:
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.get(GVL_URL)
resp.raise_for_status()
data = resp.json()
self._vendors = {
int(k): v for k, v in data.get("vendors", {}).items()
}
self._loaded_at = now
# Persist to file
GVL_CACHE_DIR.mkdir(parents=True, exist_ok=True)
GVL_CACHE_FILE.write_text(json.dumps(data))
logger.info("GVL downloaded (%d vendors)", len(self._vendors))
except Exception as e:
logger.error("GVL download failed: %s", e)
self._vendors = self._vendors or {}
return self._vendors
def _normalize(self, vendor: dict) -> dict:
"""Normalize a GVL vendor entry to a clean dict."""
retention = vendor.get("cookieMaxAgeSeconds")
country = self._detect_country(vendor)
return {
"vendor_id": vendor.get("id"),
"name": vendor.get("name", ""),
"purposes": vendor.get("purposes", []),
"leg_int_purposes": vendor.get("legIntPurposes", []),
"special_purposes": vendor.get("specialPurposes", []),
"features": vendor.get("features", []),
"policy_url": vendor.get("policyUrl", ""),
"retention_days": (retention // 86400) if retention else None,
"uses_cookies": vendor.get("usesCookies", False),
"country": country,
"is_eu": country in EU_EWR_COUNTRIES if country else None,
"purpose_names": [
TCF_PURPOSES.get(p, f"Zweck {p}") for p in vendor.get("purposes", [])
],
}
@staticmethod
def _detect_country(vendor: dict) -> str | None:
"""Detect vendor country from overflow domain or known mappings."""
overflow = vendor.get("overflow", {})
domain = overflow.get("httpOnlyDomain", "")
if domain:
tld = domain.rsplit(".", 1)[-1].upper()
tld_map = {"COM": "US", "CO": "US", "IO": "US", "DE": "DE",
"FR": "FR", "NL": "NL", "SE": "SE", "UK": "GB"}
return tld_map.get(tld)
return None