c867478791
CI / loc-budget (push) Failing after 16s
Build + Deploy / build-admin-compliance (push) Successful in 14s
Build + Deploy / build-backend-compliance (push) Successful in 16s
Build + Deploy / build-ai-sdk (push) Successful in 20s
Build + Deploy / build-developer-portal (push) Successful in 12s
Build + Deploy / build-tts (push) Successful in 15s
Build + Deploy / build-document-crawler (push) Successful in 13s
Build + Deploy / build-dsms-gateway (push) Successful in 13s
Build + Deploy / build-dsms-node (push) Successful in 12s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / test-python-document-crawler (push) Successful in 26s
CI / secret-scan (push) Has been skipped
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 2m49s
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / test-go (push) Successful in 45s
CI / test-python-backend (push) Successful in 38s
CI / test-python-dsms-gateway (push) Successful in 23s
CI / validate-canonical-controls (push) Successful in 15s
Build + Deploy / trigger-orca (push) Successful in 2m23s
Phase 1-2 of the closed quality loop: - GVL cache (consent-tester/services/gvl_cache.py): downloads and caches IAB Global Vendor List with 24h TTL, resolves vendor IDs to names, purposes, policy URLs, retention, country - Vendor extraction (consent_interceptor.py): extract_tcf_vendors() reads __tcfapi after accept phase, resolves via GVL - Scan response: tcf_vendors field added to /scan endpoint - VVT mapper (vendor_vvt_mapper.py): maps TCF vendors to VVT format with purpose labels, Rechtsgrundlage, Drittland detection - Vendor cross-check (banner_cookie_cross_check.py): checks all TCF vendors against DSI text — missing vendors, undocumented transfers - Compliance check integrates Step 3d: TCF vendors vs DSI Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
223 lines
8.5 KiB
Python
223 lines
8.5 KiB
Python
"""
|
|
Consent Interceptor — JS injection for deep consent verification.

Intercepts dataLayer.push (GTM), gtag() (GCM), Storage.setItem before page load.
Provides helpers to collect, read, and analyze intercepted data per phase.
|
|
"""
|
|
|
|
import json
import logging
import re
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Regexes that flag a dataLayer entry as tracking activity. The GTM lifecycle
# events gtm.js / gtm.dom / gtm.load are deliberately NOT matched here.
# Every pattern looks for a double-quoted `"event": "<name>` pair and is
# compiled case-insensitively.
TRACKING_DATALAYER_PATTERNS: list[re.Pattern] = [
    # Click, GA4 and e-commerce style event names.
    re.compile(
        r'"event"\s*:\s*"(gtm\.click|ga4|conversion|purchase|page_view|add_to_cart|begin_checkout)',
        re.IGNORECASE,
    ),
    # Event names prefixed with fb. / facebook. / meta.
    re.compile(
        r'"event"\s*:\s*"(fb|facebook|meta)\.',
        re.IGNORECASE,
    ),
    # Event names starting with one of the listed tool names.
    re.compile(
        r'"event"\s*:\s*"(hotjar|hj\.|clarity|linkedin|tiktok|pinterest|criteo)',
        re.IGNORECASE,
    ),
    # Event names starting with "track" / "tracking".
    re.compile(
        r'"event"\s*:\s*"track(ing)?',
        re.IGNORECASE,
    ),
]
|
|
# Exact dataLayer event names that _is_tracking_event() always treats as
# non-tracking, whatever else the entry contains.
_SAFE_DATALAYER_EVENTS = {
    "gtm.js",
    "gtm.dom",
    "gtm.load",
    "gtm.init",
    "gtm.historyChange",
    "gtm.scrollDepth",
    "optimize.activate",
    "consent_update",
}
|
|
|
|
# Storage key prefixes that indicate tracking. A storage key counts as a hit
# when it starts with any of these strings (see _tracking_storage_keys).
TRACKING_STORAGE_KEYS: list[str] = [
    "_ga",
    "_gid",
    "_gat",
    "_fbp",
    "_fbc",
    "_gcl",
    "amplitude",
    "mixpanel",
    "_hjSession",
    "_hjIncludedInPageviewSample",
    "_hjid",
    "_clck",
    "_clsk",
    "ai_session",
    "ai_user",
    "_pin_unauth",
    "sc_at",
]
|
|
|
|
# JS injected via page.addInitScript() BEFORE page loads.
#
# The script creates three arrays on `window` and fills them from hooks:
#   window.__bp_events          - {ts, data} per element written into dataLayer
#   window.__bp_consent_updates - {ts, action, params} per gtag() call
#   window.__bp_storage_changes - {ts, type, key, valueLen} per Storage.setItem
#
# dataLayer is observed through the Proxy `set` trap only:
#   * Array.prototype.push on a Proxy writes each element through that trap,
#     so an additional push() wrapper would record every element twice.
#   * Only array-index keys are recorded; assigning a property such as
#     `dataLayer.push = fn` must not show up as a (bogus) event.
#   * `window.dataLayer = window.dataLayer || []` hands the existing proxy back
#     to the setter. Array.isArray() is true for a proxied array, so `_wrapped`
#     is used to avoid stacking a second proxy (and duplicate records) on top.
#
# NOTE(review): a page-level `function gtag(){...}` declaration presumably
# replaces the window.gtag hook installed here; such calls are then only
# visible through the dataLayer entries they push - confirm.
INIT_SCRIPT: str = """(() => {
  window.__bp_events = [];
  window.__bp_consent_updates = [];
  window.__bp_storage_changes = [];
  const _safe = v => { try { return JSON.parse(JSON.stringify(v)); } catch(_) { return {}; } };
  const _wrapped = new WeakSet();
  function proxyDL(arr) {
    if (_wrapped.has(arr)) return arr;
    const p = new Proxy(arr, {
      set(t, k, v) {
        t[k] = v;
        if (typeof k === 'string' && /^[0-9]+$/.test(k)) {
          window.__bp_events.push({ts: Date.now(), data: _safe(v)});
        }
        return true;
      }
    });
    _wrapped.add(p);
    return p;
  }
  let _dl = Array.isArray(window.dataLayer) ? proxyDL(window.dataLayer) : window.dataLayer;
  Object.defineProperty(window, 'dataLayer', {
    configurable: true,
    get() { return _dl; },
    set(v) { _dl = Array.isArray(v) ? proxyDL(v) : v; }
  });
  const origGtag = window.gtag;
  window.gtag = function() {
    const a = Array.from(arguments);
    window.__bp_consent_updates.push({ts: Date.now(), action: a[0]||'', params: a.length>1 ? _safe(a.slice(1)) : []});
    if (typeof origGtag === 'function') return origGtag.apply(this, arguments);
  };
  const origSet = Storage.prototype.setItem;
  Storage.prototype.setItem = function(k, v) {
    window.__bp_storage_changes.push({ts: Date.now(), type: this===localStorage?'local':'session', key: String(k), valueLen: String(v).length});
    return origSet.call(this, k, v);
  };
})();"""
|
|
|
|
|
|
async def collect_intercepted_data(page) -> dict:
    """Fetch the arrays recorded by the init script from the page context.

    Evaluates a small function in the page that returns the first 200
    dataLayer events, the first 100 consent updates and the first 200 storage
    changes; an array that is missing on ``window`` is reported as empty.

    Args:
        page: object exposing an awaitable ``evaluate(js_source)`` method.

    Returns:
        Dict with the keys ``datalayer_events``, ``consent_updates`` and
        ``storage_changes``. If the evaluation raises, a warning is logged and
        all three lists are empty.
    """
    reader_js = """() => ({
        datalayer_events: (window.__bp_events || []).slice(0, 200),
        consent_updates: (window.__bp_consent_updates || []).slice(0, 100),
        storage_changes: (window.__bp_storage_changes || []).slice(0, 200),
    })"""
    try:
        snapshot = await page.evaluate(reader_js)
    except Exception as exc:
        logger.warning("collect_intercepted_data failed: %s", exc)
        snapshot = {"datalayer_events": [], "consent_updates": [], "storage_changes": []}
    return snapshot
|
|
|
|
|
|
async def get_consent_state(page) -> dict:
    """Read current GCM v2 + TCF v2.2 consent state from the page.

    GCM: every ``dataLayer`` entry whose first element is ``'consent'`` has
    the keys of its third element merged, in order, into ``gcm_state`` (later
    entries overwrite earlier ones). All keys of that object are copied, not
    only consent types.

    TCF: if ``window.__tcfapi`` is a function, ``getTCData`` is requested.
    The callback may be invoked asynchronously, so the in-page function
    returns a Promise that resolves as soon as the callback has run, or after
    one second if it never does (``tcf_data`` then stays ``null``).

    Args:
        page: object exposing an awaitable ``evaluate(js_source)`` method that
            waits for a returned Promise.

    Returns:
        ``{"gcm_state": dict, "tcf_data": dict | None}``. If the evaluation
        raises, a warning is logged and the empty state is returned.
    """
    try:
        return await page.evaluate("""() => new Promise((resolve) => {
            const r = {gcm_state: {}, tcf_data: null};
            if (window.dataLayer) {
                for (const e of window.dataLayer) {
                    if (e && e[0] === 'consent') {
                        const p = e[2] || {};
                        for (const [k,v] of Object.entries(p)) r.gcm_state[k] = v;
                    }
                }
            }
            if (typeof window.__tcfapi !== 'function') { resolve(r); return; }
            // Safety net: resolve() is a no-op once the promise is settled.
            setTimeout(() => resolve(r), 1000);
            try { window.__tcfapi('getTCData', 2, (d, ok) => {
                if (ok && d) r.tcf_data = {tcString: d.tcString||'', gdprApplies: d.gdprApplies,
                                           purpose: d.purpose||{}, vendor: d.vendor||{}};
                resolve(r);
            }); } catch(_) { resolve(r); }
        })""")
    except Exception as exc:
        logger.warning("get_consent_state failed: %s", exc)
        return {"gcm_state": {}, "tcf_data": None}
|
|
|
|
|
|
async def extract_tcf_vendors(page) -> list[dict]:
    """Extract full TCF vendor list from page via __tcfapi + GVL resolution.

    Reads ``tcf_data.vendor.consents`` from :func:`get_consent_state`, keeps
    the IDs whose consent flag is truthy and passes them to
    ``GVLCache().resolve_vendors``.

    The consent map comes from the scanned page, so it is treated as
    untrusted: a missing or non-dict ``vendor``/``consents`` yields an empty
    result and keys that are not integers are skipped instead of raising.

    Args:
        page: page object accepted by :func:`get_consent_state`.

    Returns:
        The list returned by the GVL resolution. If importing or calling the
        GVL cache fails, a warning is logged and placeholder entries
        (``vendor_id``, ``name`` = ``"Vendor #<id>"``, empty ``purposes``) are
        returned for at most the first 50 IDs. Returns an empty list if no
        TCF data or no consented vendor is available.
    """
    state = await get_consent_state(page)
    tcf = state.get("tcf_data")
    if not isinstance(tcf, dict):
        return []

    vendor_map = tcf.get("vendor")
    consents = vendor_map.get("consents") if isinstance(vendor_map, dict) else None
    if not isinstance(consents, dict) or not consents:
        return []

    vendor_ids: list[int] = []
    for raw_id, consented in consents.items():
        if not consented:
            continue
        try:
            vendor_ids.append(int(raw_id))
        except (TypeError, ValueError):
            # Malformed key supplied by the page; skip it, keep the rest.
            logger.debug("TCF: ignoring non-numeric vendor id %r", raw_id)
    if not vendor_ids:
        return []

    try:
        from .gvl_cache import GVLCache
        gvl = GVLCache()
        resolved = await gvl.resolve_vendors(vendor_ids)
        logger.info("TCF: %d/%d vendors resolved via GVL", len(resolved), len(vendor_ids))
        return resolved
    except Exception as e:
        # Best effort: import and lookup failures both degrade to placeholders.
        logger.warning("TCF vendor resolution failed: %s", e)
        # Fallback: return unresolved IDs
        return [{"vendor_id": vid, "name": f"Vendor #{vid}", "purposes": []}
                for vid in vendor_ids[:50]]
|
|
|
|
|
|
# -- Internal helpers --------------------------------------------------------
|
|
|
|
def _is_tracking_event(event_data: dict) -> bool:
    """True if a dataLayer event dict represents a tracking event.

    An entry whose ``event`` value is listed in ``_SAFE_DATALAYER_EVENTS`` is
    never tracking. Otherwise the entry is serialized as JSON and searched
    with ``TRACKING_DATALAYER_PATTERNS``.

    JSON is required here: the patterns look for a double-quoted
    ``"event": "..."`` pair, whereas ``str(dict)`` produces a single-quoted
    Python repr that none of them can match.

    Args:
        event_data: the recorded dataLayer entry. Anything that is not a dict
            is treated as non-tracking.

    Returns:
        ``True`` if any tracking pattern matches the serialized entry.
    """
    if not isinstance(event_data, dict):
        return False
    event_name = event_data.get("event", "")
    # isinstance guard: an unhashable value would raise on the set lookup.
    if isinstance(event_name, str) and event_name in _SAFE_DATALAYER_EVENTS:
        return False
    try:
        serialized = json.dumps(event_data, default=str)
    except (TypeError, ValueError):
        # Not JSON-serializable (e.g. non-string keys); fall back to the repr.
        serialized = str(event_data)
    return any(p.search(serialized) for p in TRACKING_DATALAYER_PATTERNS)
|
|
|
|
|
|
def _tracking_storage_keys(changes: list[dict]) -> list[str]:
    """Return storage keys matching known tracking prefixes.

    Args:
        changes: recorded storage changes; each entry is expected to be a
            dict with a ``key`` item.

    Returns:
        The ``key`` of every change whose key starts with one of
        ``TRACKING_STORAGE_KEYS``, in input order and including repeats.
        Entries that are not dicts, or whose key is missing or not a string,
        are skipped instead of raising.
    """
    # str.startswith accepts a tuple, so one call covers all prefixes.
    prefixes = tuple(TRACKING_STORAGE_KEYS)
    matches: list[str] = []
    for change in changes:
        key = change.get("key") if isinstance(change, dict) else None
        if isinstance(key, str) and key.startswith(prefixes):
            matches.append(key)
    return matches
|
|
|
|
|
|
def _gcm_all_denied(gcm: dict) -> bool:
    """Return True if no consent signal in ``gcm`` deviates from "denied".

    Only string values are treated as consent signals. The collected state
    can also carry non-consent settings of the consent command (for example
    a numeric ``wait_for_update`` or a ``region`` list); those are ignored so
    they cannot make a fully denied state look like a violation.

    Args:
        gcm: mapping of consent key to value; may be empty or ``None``.

    Returns:
        ``True`` for an empty state, or when every string value equals
        ``"denied"``; ``False`` otherwise.
    """
    return not gcm or all(
        value == "denied" for value in gcm.values() if isinstance(value, str)
    )
|
|
|
|
|
|
def _violation(code: str, severity: str, text: str) -> dict:
    """Build one violation record with the keys ``code``, ``severity``, ``text``."""
    return dict(code=code, severity=severity, text=text)
|
|
|
|
|
|
# -- Public analysis ---------------------------------------------------------
|
|
|
|
def analyze_phase_data(
    phase_name: str, intercepted: dict, consent_state: dict,
) -> list[dict]:
    """Analyze one phase and return list of {code, severity, text} violations.

    phase_name: 'before_consent' | 'after_reject' | 'after_accept'

    Checks per phase:
      * ``before_consent`` (severity ``high``): tracking dataLayer events,
        tracking storage keys, GCM signals that are not all "denied".
      * ``after_reject`` (severity ``critical``): the same three checks.
      * ``after_accept``: nothing is reported; the event count is only logged.
      * any other name: a warning is logged and no violation is returned.

    Args:
        phase_name: which scan phase the data belongs to.
        intercepted: dict with ``datalayer_events`` and ``storage_changes``
            lists, as produced by ``collect_intercepted_data``.
        consent_state: dict with a ``gcm_state`` mapping, as produced by
            ``get_consent_state``.

    Returns:
        List of violation dicts; empty if nothing was found.
    """
    violations: list[dict] = []
    events = intercepted.get("datalayer_events") or []
    storage = intercepted.get("storage_changes") or []
    gcm = consent_state.get("gcm_state", {})

    # Recorded payloads are arbitrary page data: only dict payloads can be a
    # dataLayer event, everything else (null, strings, lists) is skipped.
    tracking_evts = [
        e for e in events
        if isinstance(e, dict)
        and isinstance(e.get("data"), dict)
        and _is_tracking_event(e["data"])
    ]
    tracking_keys = _tracking_storage_keys(storage)

    # Only string values are consent signals. gcm_state may also hold
    # non-consent settings (numbers, lists) that must not count as
    # "not denied" and trigger a violation with an empty granted list.
    consent_signals = (
        {k: v for k, v in gcm.items() if isinstance(v, str)}
        if isinstance(gcm, dict) else {}
    )
    gcm_not_denied = bool(consent_signals) and not _gcm_all_denied(consent_signals)
    granted = [k for k, v in consent_signals.items() if v == "granted"]

    if phase_name == "before_consent":
        sev = "high"
        if tracking_evts:
            violations.append(_violation(
                "DL_TRACK_BEFORE_CONSENT", sev,
                f"{len(tracking_evts)} tracking event(s) in dataLayer before consent"))
        if tracking_keys:
            violations.append(_violation(
                "STORAGE_TRACK_BEFORE_CONSENT", sev,
                f"Tracking storage keys before consent: {', '.join(tracking_keys[:5])}"))
        if gcm_not_denied:
            violations.append(_violation(
                "GCM_NOT_DENIED_BEFORE_CONSENT", sev,
                f"GCM granted before consent: {', '.join(granted)}"))

    elif phase_name == "after_reject":
        sev = "critical"
        if tracking_evts:
            violations.append(_violation(
                "DL_TRACK_AFTER_REJECT", sev,
                f"{len(tracking_evts)} tracking event(s) in dataLayer after reject"))
        if gcm_not_denied:
            violations.append(_violation(
                "GCM_NOT_DENIED_AFTER_REJECT", sev,
                f"GCM still granted after reject: {', '.join(granted)}"))
        if tracking_keys:
            violations.append(_violation(
                "STORAGE_TRACK_AFTER_REJECT", sev,
                f"Tracking storage keys after reject: {', '.join(tracking_keys[:5])}"))

    elif phase_name == "after_accept":
        logger.info("Phase accept: %d tracking events (expected), GCM=%s",
                    len(tracking_evts), gcm or "none")

    else:
        # A misspelled phase would otherwise look like a clean result.
        logger.warning("analyze_phase_data: unknown phase %r", phase_name)

    return violations
|