""" Consent Interceptor — JS injection for deep consent verification. Intercepts dataLayer.push (GTM), gtag() (GCM), Storage.setItem before page load. Provides helpers to collect, read, and analyze intercepted data per phase. """ import logging import re logger = logging.getLogger(__name__) # Tracking event patterns (dataLayer) — NOT gtm.js/gtm.dom/gtm.load TRACKING_DATALAYER_PATTERNS: list[re.Pattern] = [ re.compile(p, re.IGNORECASE) for p in [ r'"event"\s*:\s*"(gtm\.click|ga4|conversion|purchase|page_view|add_to_cart|begin_checkout)', r'"event"\s*:\s*"(fb|facebook|meta)\.', r'"event"\s*:\s*"(hotjar|hj\.|clarity|linkedin|tiktok|pinterest|criteo)', r'"event"\s*:\s*"track(ing)?', ] ] _SAFE_DATALAYER_EVENTS = { "gtm.js", "gtm.dom", "gtm.load", "gtm.init", "gtm.historyChange", "gtm.scrollDepth", "optimize.activate", "consent_update", } # Storage key prefixes that indicate tracking TRACKING_STORAGE_KEYS: list[str] = [ "_ga", "_gid", "_gat", "_fbp", "_fbc", "_gcl", "amplitude", "mixpanel", "_hjSession", "_hjIncludedInPageviewSample", "_hjid", "_clck", "_clsk", "ai_session", "ai_user", "_pin_unauth", "sc_at", ] # JS injected via page.addInitScript() BEFORE page loads INIT_SCRIPT: str = """(() => { window.__bp_events = []; window.__bp_consent_updates = []; window.__bp_storage_changes = []; const _safe = v => { try { return JSON.parse(JSON.stringify(v)); } catch(_) { return {}; } }; function proxyDL(arr) { const p = new Proxy(arr, { set(t, k, v) { t[k] = v; if (k !== 'length') window.__bp_events.push({ts: Date.now(), data: _safe(v)}); return true; } }); const origPush = Array.prototype.push; p.push = function(...a) { for (const i of a) window.__bp_events.push({ts: Date.now(), data: _safe(i)}); return origPush.apply(this, a); }; return p; } let _dl = window.dataLayer ? proxyDL(window.dataLayer) : undefined; Object.defineProperty(window, 'dataLayer', { configurable: true, get() { return _dl; }, set(v) { _dl = Array.isArray(v) ? proxyDL(v) : v; } }); const origGtag = window.gtag; window.gtag = function() { const a = Array.from(arguments); window.__bp_consent_updates.push({ts: Date.now(), action: a[0]||'', params: a.length>1 ? _safe(a.slice(1)) : []}); if (typeof origGtag === 'function') return origGtag.apply(this, arguments); }; const origSet = Storage.prototype.setItem; Storage.prototype.setItem = function(k, v) { window.__bp_storage_changes.push({ts: Date.now(), type: this===localStorage?'local':'session', key: k, valueLen: (v||'').length}); return origSet.call(this, k, v); }; })();""" async def collect_intercepted_data(page) -> dict: """Read back intercepted data arrays from the page context.""" try: return await page.evaluate("""() => ({ datalayer_events: (window.__bp_events || []).slice(0, 200), consent_updates: (window.__bp_consent_updates || []).slice(0, 100), storage_changes: (window.__bp_storage_changes || []).slice(0, 200), })""") except Exception as exc: logger.warning("collect_intercepted_data failed: %s", exc) return {"datalayer_events": [], "consent_updates": [], "storage_changes": []} async def get_consent_state(page) -> dict: """Read current GCM v2 + TCF v2.2 consent state from the page.""" try: return await page.evaluate("""() => { const r = {gcm_state: {}, tcf_data: null}; if (window.dataLayer) { for (const e of window.dataLayer) { if (e && e[0] === 'consent') { const p = e[2] || {}; for (const [k,v] of Object.entries(p)) r.gcm_state[k] = v; } } } if (typeof window.__tcfapi === 'function') { try { window.__tcfapi('getTCData', 2, (d, ok) => { if (ok) r.tcf_data = {tcString: d.tcString||'', gdprApplies: d.gdprApplies, purpose: d.purpose||{}, vendor: d.vendor||{}}; }); } catch(_) {} } return r; }""") except Exception as exc: logger.warning("get_consent_state failed: %s", exc) return {"gcm_state": {}, "tcf_data": None} # -- Internal helpers -------------------------------------------------------- def _is_tracking_event(event_data: dict) -> bool: """True if a dataLayer event dict represents a tracking event.""" if event_data.get("event", "") in _SAFE_DATALAYER_EVENTS: return False s = str(event_data) return any(p.search(s) for p in TRACKING_DATALAYER_PATTERNS) def _tracking_storage_keys(changes: list[dict]) -> list[str]: """Return storage keys matching known tracking prefixes.""" return [ch["key"] for ch in changes if any(ch.get("key", "").startswith(p) for p in TRACKING_STORAGE_KEYS)] def _gcm_all_denied(gcm: dict) -> bool: return not gcm or all(v == "denied" for v in gcm.values()) def _violation(code: str, severity: str, text: str) -> dict: return {"code": code, "severity": severity, "text": text} # -- Public analysis --------------------------------------------------------- def analyze_phase_data( phase_name: str, intercepted: dict, consent_state: dict, ) -> list[dict]: """Analyze one phase and return list of {code, severity, text} violations. phase_name: 'before_consent' | 'after_reject' | 'after_accept' """ violations: list[dict] = [] events = intercepted.get("datalayer_events", []) storage = intercepted.get("storage_changes", []) gcm = consent_state.get("gcm_state", {}) tracking_evts = [e for e in events if _is_tracking_event(e.get("data", {}))] tracking_keys = _tracking_storage_keys(storage) if phase_name == "before_consent": sev = "high" if tracking_evts: violations.append(_violation( "DL_TRACK_BEFORE_CONSENT", sev, f"{len(tracking_evts)} tracking event(s) in dataLayer before consent")) if tracking_keys: violations.append(_violation( "STORAGE_TRACK_BEFORE_CONSENT", sev, f"Tracking storage keys before consent: {', '.join(tracking_keys[:5])}")) if gcm and not _gcm_all_denied(gcm): granted = [k for k, v in gcm.items() if v == "granted"] violations.append(_violation( "GCM_NOT_DENIED_BEFORE_CONSENT", sev, f"GCM granted before consent: {', '.join(granted)}")) elif phase_name == "after_reject": sev = "critical" if tracking_evts: violations.append(_violation( "DL_TRACK_AFTER_REJECT", sev, f"{len(tracking_evts)} tracking event(s) in dataLayer after reject")) if gcm and not _gcm_all_denied(gcm): granted = [k for k, v in gcm.items() if v == "granted"] violations.append(_violation( "GCM_NOT_DENIED_AFTER_REJECT", sev, f"GCM still granted after reject: {', '.join(granted)}")) if tracking_keys: violations.append(_violation( "STORAGE_TRACK_AFTER_REJECT", sev, f"Tracking storage keys after reject: {', '.join(tracking_keys[:5])}")) elif phase_name == "after_accept": logger.info("Phase accept: %d tracking events (expected), GCM=%s", len(tracking_evts), gcm or "none") return violations