feat(tcf-vendors): GVL cache + vendor extraction + VVT mapping
CI / loc-budget (push) Failing after 16s
Build + Deploy / build-admin-compliance (push) Successful in 14s
Build + Deploy / build-backend-compliance (push) Successful in 16s
Build + Deploy / build-ai-sdk (push) Successful in 20s
Build + Deploy / build-developer-portal (push) Successful in 12s
Build + Deploy / build-tts (push) Successful in 15s
Build + Deploy / build-document-crawler (push) Successful in 13s
Build + Deploy / build-dsms-gateway (push) Successful in 13s
Build + Deploy / build-dsms-node (push) Successful in 12s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / test-python-document-crawler (push) Successful in 26s
CI / secret-scan (push) Has been skipped
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 2m49s
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / test-go (push) Successful in 45s
CI / test-python-backend (push) Successful in 38s
CI / test-python-dsms-gateway (push) Successful in 23s
CI / validate-canonical-controls (push) Successful in 15s
Build + Deploy / trigger-orca (push) Successful in 2m23s
CI / loc-budget (push) Failing after 16s
Build + Deploy / build-admin-compliance (push) Successful in 14s
Build + Deploy / build-backend-compliance (push) Successful in 16s
Build + Deploy / build-ai-sdk (push) Successful in 20s
Build + Deploy / build-developer-portal (push) Successful in 12s
Build + Deploy / build-tts (push) Successful in 15s
Build + Deploy / build-document-crawler (push) Successful in 13s
Build + Deploy / build-dsms-gateway (push) Successful in 13s
Build + Deploy / build-dsms-node (push) Successful in 12s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / test-python-document-crawler (push) Successful in 26s
CI / secret-scan (push) Has been skipped
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 2m49s
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / test-go (push) Successful in 45s
CI / test-python-backend (push) Successful in 38s
CI / test-python-dsms-gateway (push) Successful in 23s
CI / validate-canonical-controls (push) Successful in 15s
Build + Deploy / trigger-orca (push) Successful in 2m23s
Phase 1-2 of the closed quality loop: - GVL cache (consent-tester/services/gvl_cache.py): downloads and caches IAB Global Vendor List with 24h TTL, resolves vendor IDs to names, purposes, policy URLs, retention, country - Vendor extraction (consent_interceptor.py): extract_tcf_vendors() reads __tcfapi after accept phase, resolves via GVL - Scan response: tcf_vendors field added to /scan endpoint - VVT mapper (vendor_vvt_mapper.py): maps TCF vendors to VVT format with purpose labels, Rechtsgrundlage, Drittland detection - Vendor cross-check (banner_cookie_cross_check.py): checks all TCF vendors against DSI text — missing vendors, undocumented transfers - Compliance check integrates Step 3d: TCF vendors vs DSI Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -258,16 +258,29 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
|
||||
banner_result, doc_texts["cookie"],
|
||||
)
|
||||
if cross_findings:
|
||||
# Add cross-check findings to cookie results
|
||||
for r in results:
|
||||
if r.doc_type == "cookie":
|
||||
for cf in cross_findings:
|
||||
r.checks.append(CheckItem(**cf))
|
||||
# Recompute
|
||||
l2 = [c for c in r.checks if c.level == 2 and not c.skipped]
|
||||
l2p = sum(1 for c in l2 if c.passed)
|
||||
r.correctness_pct = round(l2p / len(l2) * 100) if l2 else 0
|
||||
|
||||
# Step 3d: TCF Vendor cross-check against DSI
|
||||
tcf_vendors = banner_result.get("tcf_vendors", []) if banner_result else []
|
||||
vvt_entries: list[dict] = []
|
||||
if tcf_vendors and "dse" in doc_texts:
|
||||
_update(check_id, f"{len(tcf_vendors)} TCF-Verarbeiter vs. DSI abgleichen...")
|
||||
from compliance.services.banner_cookie_cross_check import cross_check_vendors_vs_dsi
|
||||
from compliance.services.vendor_vvt_mapper import map_vendors_to_vvt
|
||||
vendor_findings = cross_check_vendors_vs_dsi(tcf_vendors, doc_texts["dse"])
|
||||
if vendor_findings:
|
||||
for r in results:
|
||||
if r.doc_type == "dse":
|
||||
for vf in vendor_findings:
|
||||
r.checks.append(CheckItem(**vf))
|
||||
vvt_entries = map_vendors_to_vvt(tcf_vendors)
|
||||
|
||||
# Step 4: Extract profile hints from documents
|
||||
_update(check_id, "Profil wird aus Dokumenten extrahiert...")
|
||||
from compliance.services.profile_extractor import extract_profile_from_documents
|
||||
@@ -307,7 +320,9 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
|
||||
"detected": banner_result.get("banner_detected", False) if banner_result else False,
|
||||
"provider": banner_result.get("banner_provider", "") if banner_result else "",
|
||||
"violations": len(banner_result.get("banner_checks", {}).get("violations", [])) if banner_result else 0,
|
||||
"tcf_vendor_count": len(tcf_vendors),
|
||||
} if banner_result else None,
|
||||
"tcf_vendors": vvt_entries if tcf_vendors else [],
|
||||
"total_documents": len(results),
|
||||
"total_findings": total_findings,
|
||||
"email_status": email_result.get("status", "failed"),
|
||||
|
||||
@@ -143,3 +143,83 @@ def cross_check_banner_vs_cookie(
|
||||
logger.info("Cross-check: %d findings (%d services, %d tracking before)",
|
||||
len(findings), len(all_tracking), len(tracking_before))
|
||||
return findings
|
||||
|
||||
|
||||
def cross_check_vendors_vs_dsi(
|
||||
vendors: list[dict],
|
||||
dsi_text: str,
|
||||
) -> list[dict]:
|
||||
"""Cross-check: Are all TCF vendors documented in the DSI?
|
||||
|
||||
Checks per vendor:
|
||||
1. Is the vendor mentioned by name?
|
||||
2. Is third-country transfer documented (if non-EU)?
|
||||
3. Is storage duration mentioned?
|
||||
|
||||
Returns list of CheckItem-compatible dicts.
|
||||
"""
|
||||
findings: list[dict] = []
|
||||
dsi_lower = dsi_text.lower()
|
||||
|
||||
for v in vendors:
|
||||
name = v.get("name", "")
|
||||
name_lower = name.lower()
|
||||
if not name_lower:
|
||||
continue
|
||||
|
||||
# Check if vendor is mentioned in DSI
|
||||
mentioned = any(kw in dsi_lower for kw in [
|
||||
name_lower,
|
||||
name_lower.replace(" ", ""),
|
||||
name_lower.split()[0] if " " in name_lower else name_lower,
|
||||
])
|
||||
|
||||
if not mentioned:
|
||||
findings.append({
|
||||
"id": f"vendor-{v.get('vendor_id', name_lower[:20])}",
|
||||
"label": f"Verarbeiter '{name}' fehlt in DSI",
|
||||
"passed": False,
|
||||
"severity": "HIGH",
|
||||
"level": 2,
|
||||
"parent": None,
|
||||
"skipped": False,
|
||||
"matched_text": "",
|
||||
"hint": (
|
||||
f"Der Cookie-Banner listet '{name}' als Verarbeiter "
|
||||
f"({v.get('zweck_kurz', 'unbekannt')}), aber die DSI "
|
||||
f"erwaehnt diesen Dienst nicht. Art. 13(1)(e) DSGVO "
|
||||
f"verlangt die Benennung aller Empfaenger."
|
||||
),
|
||||
"source": "vendor_cross_check",
|
||||
})
|
||||
|
||||
# Check third-country transfer documentation
|
||||
if v.get("drittland") and mentioned:
|
||||
country = v.get("land", "Drittland")
|
||||
transfer_mentioned = any(kw in dsi_lower for kw in [
|
||||
name_lower + ".*" + "usa",
|
||||
name_lower + ".*" + "drittland",
|
||||
"scc", "standardvertragsklausel", "data privacy framework",
|
||||
"angemessenheitsbeschluss",
|
||||
])
|
||||
if not transfer_mentioned:
|
||||
findings.append({
|
||||
"id": f"vendor-transfer-{v.get('vendor_id', '')}",
|
||||
"label": f"Drittlandtransfer fuer '{name}' nicht dokumentiert",
|
||||
"passed": False,
|
||||
"severity": "MEDIUM",
|
||||
"level": 2,
|
||||
"parent": None,
|
||||
"skipped": False,
|
||||
"matched_text": "",
|
||||
"hint": (
|
||||
f"'{name}' verarbeitet Daten in {country} (ausserhalb EWR). "
|
||||
f"Die DSI muss den Transfermechanismus benennen: "
|
||||
f"SCC (Art. 46(2)(c)) oder DPF (Angemessenheitsbeschluss)."
|
||||
),
|
||||
"source": "vendor_cross_check",
|
||||
})
|
||||
|
||||
logger.info("Vendor cross-check: %d findings for %d vendors",
|
||||
len(findings), len(vendors))
|
||||
return findings
|
||||
|
||||
@@ -0,0 +1,104 @@
|
||||
"""
|
||||
Vendor VVT Mapper — map TCF vendors to VVT entries.
|
||||
|
||||
Converts resolved TCF vendor data (from GVL) into the format
|
||||
needed for the Verarbeitungsverzeichnis (VVT) and for DSI
|
||||
cross-checking.
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# IAB TCF v2.2 Purpose definitions (German)
|
||||
TCF_PURPOSE_LABELS = {
|
||||
1: "Speicherung/Zugriff auf Endgeraet",
|
||||
2: "Auswahl einfacher Anzeigen",
|
||||
3: "Personalisiertes Anzeigenprofil erstellen",
|
||||
4: "Personalisierte Anzeigen auswaehlen",
|
||||
5: "Personalisiertes Inhaltsprofil erstellen",
|
||||
6: "Personalisierte Inhalte auswaehlen",
|
||||
7: "Anzeigenleistung messen",
|
||||
8: "Inhaltsleistung messen",
|
||||
9: "Marktforschung",
|
||||
10: "Produkte entwickeln und verbessern",
|
||||
11: "Geraeteeigenschaften zur Identifizierung nutzen",
|
||||
}
|
||||
|
||||
# Purpose → Banner-Kategorie Mapping
|
||||
PURPOSE_CATEGORY = {
|
||||
1: "necessary",
|
||||
2: "marketing", 3: "marketing", 4: "marketing",
|
||||
5: "marketing", 6: "marketing",
|
||||
7: "statistics", 8: "statistics",
|
||||
9: "statistics", 10: "functional", 11: "functional",
|
||||
}
|
||||
|
||||
# EWR countries
|
||||
_EU_EWR = {
|
||||
"AT", "BE", "BG", "HR", "CY", "CZ", "DK", "EE", "FI", "FR",
|
||||
"DE", "GR", "HU", "IE", "IT", "LV", "LT", "LU", "MT", "NL",
|
||||
"PL", "PT", "RO", "SK", "SI", "ES", "SE", "IS", "LI", "NO",
|
||||
"CH", "GB",
|
||||
}
|
||||
|
||||
|
||||
def tcf_vendor_to_vvt(vendor: dict) -> dict:
|
||||
"""Map a resolved TCF vendor to a VVT entry.
|
||||
|
||||
Args:
|
||||
vendor: Resolved GVL vendor dict with name, purposes, country, etc.
|
||||
|
||||
Returns:
|
||||
VVT-compatible dict with name, zweck, rechtsgrundlage, drittland, etc.
|
||||
"""
|
||||
purposes = vendor.get("purposes", [])
|
||||
country = vendor.get("country")
|
||||
is_eu = vendor.get("is_eu", country in _EU_EWR if country else None)
|
||||
|
||||
# Determine primary category from purposes
|
||||
categories = set()
|
||||
for p in purposes:
|
||||
cat = PURPOSE_CATEGORY.get(p, "functional")
|
||||
categories.add(cat)
|
||||
|
||||
# Rechtsgrundlage depends on category
|
||||
if "marketing" in categories or "statistics" in categories:
|
||||
rechtsgrundlage = "Einwilligung (Art. 6(1)(a) DSGVO, §25 Abs. 1 TDDDG)"
|
||||
else:
|
||||
rechtsgrundlage = "Berechtigtes Interesse (Art. 6(1)(f) DSGVO, §25 Abs. 2 TDDDG)"
|
||||
|
||||
return {
|
||||
"vendor_id": vendor.get("vendor_id"),
|
||||
"name": vendor.get("name", ""),
|
||||
"zweck": [TCF_PURPOSE_LABELS.get(p, f"Zweck {p}") for p in purposes],
|
||||
"zweck_kurz": _summarize_purposes(purposes),
|
||||
"kategorie": sorted(categories)[0] if categories else "functional",
|
||||
"rechtsgrundlage": rechtsgrundlage,
|
||||
"drittland": not is_eu if is_eu is not None else None,
|
||||
"land": country,
|
||||
"transfermechanismus": "SCC/DPF" if (not is_eu and is_eu is not None) else None,
|
||||
"speicherdauer_tage": vendor.get("retention_days"),
|
||||
"policy_url": vendor.get("policy_url", ""),
|
||||
"uses_cookies": vendor.get("uses_cookies", False),
|
||||
}
|
||||
|
||||
|
||||
def map_vendors_to_vvt(vendors: list[dict]) -> list[dict]:
|
||||
"""Map a list of TCF vendors to VVT entries."""
|
||||
return [tcf_vendor_to_vvt(v) for v in vendors]
|
||||
|
||||
|
||||
def _summarize_purposes(purposes: list[int]) -> str:
|
||||
"""Short German summary of purposes."""
|
||||
if not purposes:
|
||||
return "Keine Zwecke angegeben"
|
||||
cats = set(PURPOSE_CATEGORY.get(p, "sonstig") for p in purposes)
|
||||
labels = {
|
||||
"marketing": "Marketing/Werbung",
|
||||
"statistics": "Analyse/Messung",
|
||||
"functional": "Funktional",
|
||||
"necessary": "Technisch notwendig",
|
||||
"sonstig": "Sonstige",
|
||||
}
|
||||
return ", ".join(labels.get(c, c) for c in sorted(cats))
|
||||
@@ -49,6 +49,7 @@ class ScanResponse(BaseModel):
|
||||
structured_checks: list = []
|
||||
completeness_pct: int = 0
|
||||
correctness_pct: int = 0
|
||||
tcf_vendors: list = [] # Resolved TCF vendor list from GVL
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
@@ -102,6 +103,7 @@ async def scan_consent(req: ScanRequest):
|
||||
url=req.url,
|
||||
banner_detected=result.banner_detected,
|
||||
banner_provider=result.banner_provider,
|
||||
tcf_vendors=result.tcf_vendors,
|
||||
phases=phases,
|
||||
summary={
|
||||
"critical": sum(1 for v in result.reject_violations if v.severity == "CRITICAL"),
|
||||
|
||||
@@ -110,6 +110,39 @@ async def get_consent_state(page) -> dict:
|
||||
return {"gcm_state": {}, "tcf_data": None}
|
||||
|
||||
|
||||
async def extract_tcf_vendors(page) -> list[dict]:
|
||||
"""Extract full TCF vendor list from page via __tcfapi + GVL resolution.
|
||||
|
||||
Returns list of resolved vendors with names, purposes, countries, etc.
|
||||
Returns empty list if no TCF API is available on the page.
|
||||
"""
|
||||
state = await get_consent_state(page)
|
||||
tcf = state.get("tcf_data")
|
||||
if not tcf:
|
||||
return []
|
||||
|
||||
vendor_map = tcf.get("vendor", {})
|
||||
consents = vendor_map.get("consents", {})
|
||||
if not consents:
|
||||
return []
|
||||
|
||||
vendor_ids = [int(k) for k, v in consents.items() if v]
|
||||
if not vendor_ids:
|
||||
return []
|
||||
|
||||
try:
|
||||
from .gvl_cache import GVLCache
|
||||
gvl = GVLCache()
|
||||
resolved = await gvl.resolve_vendors(vendor_ids)
|
||||
logger.info("TCF: %d/%d vendors resolved via GVL", len(resolved), len(vendor_ids))
|
||||
return resolved
|
||||
except Exception as e:
|
||||
logger.warning("TCF vendor resolution failed: %s", e)
|
||||
# Fallback: return unresolved IDs
|
||||
return [{"vendor_id": vid, "name": f"Vendor #{vid}", "purposes": []}
|
||||
for vid in vendor_ids[:50]]
|
||||
|
||||
|
||||
# -- Internal helpers --------------------------------------------------------
|
||||
|
||||
def _is_tracking_event(event_data: dict) -> bool:
|
||||
|
||||
@@ -65,6 +65,8 @@ class ConsentTestResult:
|
||||
banner_has_dse_link: bool = False
|
||||
# Deep verification (per-phase intercepted data)
|
||||
deep_verification: dict = field(default_factory=dict)
|
||||
# TCF vendors (resolved via GVL after accept phase)
|
||||
tcf_vendors: list = field(default_factory=list)
|
||||
|
||||
|
||||
async def run_consent_test(
|
||||
@@ -239,6 +241,13 @@ async def run_consent_test(
|
||||
accept_tracking = find_tracking_services(result.accept_scripts)
|
||||
result.accept_new_tracking = [t for t in accept_tracking if t not in result.before_tracking]
|
||||
|
||||
# TCF vendor extraction (after accept, while page is still open)
|
||||
try:
|
||||
from services.consent_interceptor import extract_tcf_vendors
|
||||
result.tcf_vendors = await extract_tcf_vendors(page_c)
|
||||
except Exception as exc:
|
||||
logger.warning("TCF vendor extraction failed: %s", exc)
|
||||
|
||||
await ctx_c.close()
|
||||
|
||||
# ── Phase D-F: Per-category tests ────────────────────────
|
||||
|
||||
@@ -0,0 +1,147 @@
|
||||
"""
|
||||
IAB Global Vendor List (GVL) Cache.
|
||||
|
||||
Downloads and caches the TCF v2.2 Global Vendor List from the IAB.
|
||||
Used to resolve TCF vendor IDs to human-readable names, purposes,
|
||||
policy URLs, retention periods, and country information.
|
||||
|
||||
GVL spec: https://github.com/InteractiveAdvertisingBureau/GDPR-Transparency-and-Consent-Framework
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
GVL_URL = "https://vendor-list.consensu.org/v3/vendor-list.json"
|
||||
GVL_CACHE_DIR = Path(os.getenv("GVL_CACHE_DIR", "/tmp/gvl_cache"))
|
||||
GVL_CACHE_FILE = GVL_CACHE_DIR / "vendor-list.json"
|
||||
GVL_TTL_SECONDS = 86400 # 24 hours
|
||||
|
||||
# IAB TCF v2.2 Purpose definitions (German)
|
||||
TCF_PURPOSES = {
|
||||
1: "Speicherung/Zugriff auf Endgeraet",
|
||||
2: "Auswahl einfacher Anzeigen",
|
||||
3: "Personalisiertes Anzeigenprofil erstellen",
|
||||
4: "Personalisierte Anzeigen auswaehlen",
|
||||
5: "Personalisiertes Inhaltsprofil erstellen",
|
||||
6: "Personalisierte Inhalte auswaehlen",
|
||||
7: "Anzeigenleistung messen",
|
||||
8: "Inhaltsleistung messen",
|
||||
9: "Marktforschung",
|
||||
10: "Produkte entwickeln und verbessern",
|
||||
11: "Geraeteeigenschaften zur Identifizierung nutzen",
|
||||
}
|
||||
|
||||
# EWR countries (for third-country detection)
|
||||
EU_EWR_COUNTRIES = {
|
||||
"AT", "BE", "BG", "HR", "CY", "CZ", "DK", "EE", "FI", "FR",
|
||||
"DE", "GR", "HU", "IE", "IT", "LV", "LT", "LU", "MT", "NL",
|
||||
"PL", "PT", "RO", "SK", "SI", "ES", "SE",
|
||||
"IS", "LI", "NO", # EWR
|
||||
"CH", "GB", # Angemessenheitsbeschluss
|
||||
}
|
||||
|
||||
|
||||
class GVLCache:
|
||||
"""Cache for IAB Global Vendor List with file-based persistence."""
|
||||
|
||||
def __init__(self):
|
||||
self._vendors: dict[int, dict] | None = None
|
||||
self._loaded_at: float = 0
|
||||
|
||||
async def get_vendor(self, vendor_id: int) -> dict | None:
|
||||
"""Get a single vendor by ID."""
|
||||
vendors = await self._ensure_loaded()
|
||||
return vendors.get(vendor_id)
|
||||
|
||||
async def resolve_vendors(self, vendor_ids: list[int]) -> list[dict]:
|
||||
"""Resolve multiple vendor IDs to full vendor dicts."""
|
||||
vendors = await self._ensure_loaded()
|
||||
result = []
|
||||
for vid in vendor_ids:
|
||||
v = vendors.get(vid)
|
||||
if v:
|
||||
result.append(self._normalize(v))
|
||||
return result
|
||||
|
||||
async def _ensure_loaded(self) -> dict[int, dict]:
|
||||
"""Load from cache or download if stale."""
|
||||
now = time.time()
|
||||
if self._vendors and (now - self._loaded_at) < GVL_TTL_SECONDS:
|
||||
return self._vendors
|
||||
|
||||
# Try file cache first
|
||||
if GVL_CACHE_FILE.exists():
|
||||
age = now - GVL_CACHE_FILE.stat().st_mtime
|
||||
if age < GVL_TTL_SECONDS:
|
||||
try:
|
||||
data = json.loads(GVL_CACHE_FILE.read_text())
|
||||
self._vendors = {
|
||||
int(k): v for k, v in data.get("vendors", {}).items()
|
||||
}
|
||||
self._loaded_at = now
|
||||
logger.info("GVL loaded from cache (%d vendors)", len(self._vendors))
|
||||
return self._vendors
|
||||
except Exception as e:
|
||||
logger.warning("GVL cache read failed: %s", e)
|
||||
|
||||
# Download fresh
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=30.0) as client:
|
||||
resp = await client.get(GVL_URL)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
|
||||
self._vendors = {
|
||||
int(k): v for k, v in data.get("vendors", {}).items()
|
||||
}
|
||||
self._loaded_at = now
|
||||
|
||||
# Persist to file
|
||||
GVL_CACHE_DIR.mkdir(parents=True, exist_ok=True)
|
||||
GVL_CACHE_FILE.write_text(json.dumps(data))
|
||||
logger.info("GVL downloaded (%d vendors)", len(self._vendors))
|
||||
except Exception as e:
|
||||
logger.error("GVL download failed: %s", e)
|
||||
self._vendors = self._vendors or {}
|
||||
|
||||
return self._vendors
|
||||
|
||||
def _normalize(self, vendor: dict) -> dict:
|
||||
"""Normalize a GVL vendor entry to a clean dict."""
|
||||
retention = vendor.get("cookieMaxAgeSeconds")
|
||||
country = self._detect_country(vendor)
|
||||
return {
|
||||
"vendor_id": vendor.get("id"),
|
||||
"name": vendor.get("name", ""),
|
||||
"purposes": vendor.get("purposes", []),
|
||||
"leg_int_purposes": vendor.get("legIntPurposes", []),
|
||||
"special_purposes": vendor.get("specialPurposes", []),
|
||||
"features": vendor.get("features", []),
|
||||
"policy_url": vendor.get("policyUrl", ""),
|
||||
"retention_days": (retention // 86400) if retention else None,
|
||||
"uses_cookies": vendor.get("usesCookies", False),
|
||||
"country": country,
|
||||
"is_eu": country in EU_EWR_COUNTRIES if country else None,
|
||||
"purpose_names": [
|
||||
TCF_PURPOSES.get(p, f"Zweck {p}") for p in vendor.get("purposes", [])
|
||||
],
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _detect_country(vendor: dict) -> str | None:
|
||||
"""Detect vendor country from overflow domain or known mappings."""
|
||||
overflow = vendor.get("overflow", {})
|
||||
domain = overflow.get("httpOnlyDomain", "")
|
||||
if domain:
|
||||
tld = domain.rsplit(".", 1)[-1].upper()
|
||||
tld_map = {"COM": "US", "CO": "US", "IO": "US", "DE": "DE",
|
||||
"FR": "FR", "NL": "NL", "SE": "SE", "UK": "GB"}
|
||||
return tld_map.get(tld)
|
||||
return None
|
||||
Reference in New Issue
Block a user