Files
breakpilot-compliance/consent-tester/services/consent_interceptor.py
T
Benjamin Admin d2dc0c9fe4 feat: Deep consent verification — DataLayer, Storage, GCM, TCF
5 verification layers added to the 3-phase banner test:

1. DataLayer/GTM Interception: Proxy on window.dataLayer captures
   all push() events. Distinguishes safe lifecycle events (gtm.js,
   gtm.dom) from tracking events (page_view, conversion, purchase).
   Flags tracking events before consent as violations.

2. localStorage/sessionStorage Monitoring: Intercepts setItem() to
   detect tracking keys (_ga, _fbp, amplitude, mixpanel, etc.)
   written before consent.

3. Google Consent Mode v2 Runtime Verification: Reads actual GCM
   state (analytics_storage, ad_storage) per phase. Verifies
   default=denied before consent, stays denied after reject,
   switches to granted after accept.

4. TCF v2.2 State: Reads __tcfapi('getTCData') if available.
   Verifies consent purpose states match user choice.

5. Cookie Attribute Analysis: Domain (1st vs 3rd party), expires
   (>13 months), secure flag for tracking cookies.

10 new L2 checks with expert hints (EDPB, CNIL, §25 TDDDG).
All interceptor calls wrapped in try/except for graceful fallback.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-10 08:58:44 +02:00

190 lines
7.4 KiB
Python

"""
Consent Interceptor — JS injection for deep consent verification.
Intercepts dataLayer.push (GTM), gtag() (GCM), Storage.setItem before page load.
Provides helpers to collect, read, and analyze intercepted data per phase.
"""
import json
import logging
import re
logger = logging.getLogger(__name__)
# Case-insensitive regexes that mark a serialized dataLayer entry as a
# tracking event. They are written against JSON text (double-quoted keys and
# values) and deliberately do not cover gtm.js / gtm.dom / gtm.load.
TRACKING_DATALAYER_PATTERNS: list[re.Pattern] = [
    re.compile(
        r'"event"\s*:\s*"(gtm\.click|ga4|conversion|purchase|page_view|add_to_cart|begin_checkout)',
        flags=re.IGNORECASE,
    ),
    re.compile(
        r'"event"\s*:\s*"(fb|facebook|meta)\.',
        flags=re.IGNORECASE,
    ),
    re.compile(
        r'"event"\s*:\s*"(hotjar|hj\.|clarity|linkedin|tiktok|pinterest|criteo)',
        flags=re.IGNORECASE,
    ),
    re.compile(
        r'"event"\s*:\s*"track(ing)?',
        flags=re.IGNORECASE,
    ),
]
# dataLayer "event" names that are never reported as tracking, even if a
# tracking pattern would otherwise match the entry.
_SAFE_DATALAYER_EVENTS = {
    "gtm.js",
    "gtm.dom",
    "gtm.load",
    "gtm.init",
    "gtm.historyChange",
    "gtm.scrollDepth",
    "optimize.activate",
    "consent_update",
}
# Prefixes of localStorage/sessionStorage keys that are treated as tracking.
# A written key counts as tracking when it starts with any of these.
TRACKING_STORAGE_KEYS: list[str] = [
    "_ga",
    "_gid",
    "_gat",
    "_fbp",
    "_fbc",
    "_gcl",
    "amplitude",
    "mixpanel",
    "_hjSession",
    "_hjIncludedInPageviewSample",
    "_hjid",
    "_clck",
    "_clsk",
    "ai_session",
    "ai_user",
    "_pin_unauth",
    "sc_at",
]
# JS injected via page.addInitScript() BEFORE page loads.
#
# The script fills three page-side arrays that the Python helpers read back:
#   window.__bp_events           one {ts, data} entry per item added to dataLayer
#   window.__bp_consent_updates  one {ts, action, params} entry per gtag() call
#   window.__bp_storage_changes  one {ts, type, key, valueLen} entry per setItem()
#
# Each dataLayer item is recorded exactly once:
#   * push() is wrapped on the array itself (not assigned through the proxy,
#     which would fire the set trap and log the function as an event);
#   * the proxy set trap only records index writes at or past the current end
#     and stays silent while the push wrapper is running;
#   * an array (or its proxy) that was already wrapped is reused, so the
#     common `window.dataLayer = window.dataLayer || []` snippet does not
#     stack a second proxy on top of the first.
# Storage recording is wrapped in try/catch so the interceptor can never make
# the page's own setItem() call throw.
INIT_SCRIPT: str = """(() => {
  window.__bp_events = [];
  window.__bp_consent_updates = [];
  window.__bp_storage_changes = [];
  const _safe = v => { try { return JSON.parse(JSON.stringify(v)); } catch(_) { return {}; } };
  const _record = v => { window.__bp_events.push({ts: Date.now(), data: _safe(v)}); };
  const _isIndex = k => typeof k === 'string' && /^[0-9]+$/.test(k);
  // Maps every wrapped array and every proxy to its proxy, to avoid wrapping twice.
  const _known = new WeakMap();
  function proxyDL(arr) {
    // True while the push wrapper runs, so the set trap does not record the item again.
    let inPush = false;
    const p = new Proxy(arr, {
      set(t, k, v) {
        // Only a write at or past the current end is a new entry; shift and splice moves are not.
        const appended = !inPush && _isIndex(k) && Number(k) >= t.length;
        t[k] = v;
        if (appended) _record(v);
        return true;
      }
    });
    const origPush = Array.prototype.push;
    // Defined on the target directly: assigning through the proxy would fire the set trap.
    Object.defineProperty(arr, 'push', {
      configurable: true, writable: true, enumerable: false,
      value: function(...a) {
        for (const i of a) _record(i);
        inPush = true;
        try { return origPush.apply(this, a); } finally { inPush = false; }
      }
    });
    _known.set(arr, p);
    _known.set(p, p);
    return p;
  }
  let _dl = Array.isArray(window.dataLayer) ? proxyDL(window.dataLayer) : window.dataLayer;
  Object.defineProperty(window, 'dataLayer', {
    configurable: true,
    get() { return _dl; },
    set(v) { _dl = Array.isArray(v) ? (_known.get(v) || proxyDL(v)) : v; }
  });
  const origGtag = window.gtag;
  window.gtag = function() {
    const a = Array.from(arguments);
    window.__bp_consent_updates.push({ts: Date.now(), action: a[0]||'', params: a.length>1 ? _safe(a.slice(1)) : []});
    if (typeof origGtag === 'function') return origGtag.apply(this, arguments);
  };
  const origSet = Storage.prototype.setItem;
  Storage.prototype.setItem = function(k, v) {
    // Recording is best-effort and must never make the page's own setItem call throw.
    try {
      window.__bp_storage_changes.push({ts: Date.now(), type: this===localStorage?'local':'session', key: String(k), valueLen: String(v).length});
    } catch(_) {}
    return origSet.call(this, k, v);
  };
})();"""
async def collect_intercepted_data(page) -> dict:
    """Fetch the arrays filled by the init script from the page context.

    Returns a dict with ``datalayer_events`` (first 200 entries),
    ``consent_updates`` (first 100) and ``storage_changes`` (first 200).
    If the page evaluation raises, the failure is logged as a warning and
    all three lists come back empty.
    """
    snapshot_js = """() => ({
datalayer_events: (window.__bp_events || []).slice(0, 200),
consent_updates: (window.__bp_consent_updates || []).slice(0, 100),
storage_changes: (window.__bp_storage_changes || []).slice(0, 200),
})"""
    try:
        captured = await page.evaluate(snapshot_js)
    except Exception as exc:
        logger.warning("collect_intercepted_data failed: %s", exc)
        captured = {"datalayer_events": [], "consent_updates": [], "storage_changes": []}
    return captured
async def get_consent_state(page, tcf_timeout_ms: int = 1000) -> dict:
    """Read current GCM v2 + TCF v2.2 consent state from the page.

    GCM: every ``window.dataLayer`` entry whose element 0 is ``'consent'`` has
    its element 2 merged into ``gcm_state``; later entries override earlier
    ones. All parameters of the entry are copied, not only consent types.

    TCF: if ``window.__tcfapi`` is a function, ``getTCData`` (version 2) is
    requested and the evaluation waits for its callback. The callback may be
    invoked asynchronously (for example when the call is queued until the CMP
    has loaded), so returning right after the call could miss the data. The
    wait is bounded by ``tcf_timeout_ms`` so a callback that never fires
    cannot stall the test run.

    Args:
        page: Browser page object exposing an awaitable ``evaluate(js, arg)``.
        tcf_timeout_ms: Maximum time in milliseconds to wait for the
            ``getTCData`` callback.

    Returns:
        ``{"gcm_state": dict, "tcf_data": dict | None}``. On any evaluation
        error a warning is logged and the empty default is returned.
    """
    read_state_js = """async (timeoutMs) => {
    const r = {gcm_state: {}, tcf_data: null};
    if (window.dataLayer) {
        for (const e of window.dataLayer) {
            if (e && e[0] === 'consent') {
                const p = e[2] || {};
                for (const [k,v] of Object.entries(p)) r.gcm_state[k] = v;
            }
        }
    }
    if (typeof window.__tcfapi === 'function') {
        await new Promise(resolve => {
            const timer = setTimeout(resolve, timeoutMs);
            const done = () => { clearTimeout(timer); resolve(); };
            try {
                window.__tcfapi('getTCData', 2, (d, ok) => {
                    if (ok && d) r.tcf_data = {tcString: d.tcString||'', gdprApplies: d.gdprApplies,
                        purpose: d.purpose||{}, vendor: d.vendor||{}};
                    done();
                });
            } catch(_) { done(); }
        });
    }
    return r;
}"""
    try:
        return await page.evaluate(read_state_js, tcf_timeout_ms)
    except Exception as exc:
        logger.warning("get_consent_state failed: %s", exc)
        return {"gcm_state": {}, "tcf_data": None}
# -- Internal helpers --------------------------------------------------------
def _is_tracking_event(event_data: dict) -> bool:
    """Return True if a dataLayer entry represents a tracking event.

    An entry whose ``event`` value is listed in ``_SAFE_DATALAYER_EVENTS`` is
    never a tracking event. Otherwise the entry is serialized and tested
    against ``TRACKING_DATALAYER_PATTERNS``.

    The patterns are written against JSON text (``"event": "page_view"``), so
    the entry is serialized with ``json.dumps``. ``str()`` on a dict produces
    the single-quoted repr (``{'event': 'page_view'}``), which those patterns
    never match.

    Args:
        event_data: The intercepted payload. Normally a dict, but any
            JSON-like value (list, string, None) is tolerated.

    Returns:
        True when any tracking pattern matches the serialized payload.
    """
    if isinstance(event_data, dict):
        event_name = event_data.get("event", "")
        # Only strings can be safe-listed; this also avoids hashing a list/dict.
        if isinstance(event_name, str) and event_name in _SAFE_DATALAYER_EVENTS:
            return False
    try:
        serialized = json.dumps(event_data, default=str)
    except (TypeError, ValueError):
        # Non-string dict keys or circular data: fall back to the repr.
        serialized = str(event_data)
    return any(p.search(serialized) for p in TRACKING_DATALAYER_PATTERNS)
def _tracking_storage_keys(changes: list[dict]) -> list[str]:
    """Return storage keys matching known tracking prefixes.

    Args:
        changes: Intercepted storage writes, each a dict with a ``key`` entry.

    Returns:
        The keys, in input order and including repeats, that start with any
        prefix in ``TRACKING_STORAGE_KEYS``. Entries without a string key
        (missing, null or numeric) are skipped rather than raising.
    """
    prefixes = tuple(TRACKING_STORAGE_KEYS)
    matched: list[str] = []
    for change in changes:
        key = change.get("key")
        # str.startswith accepts a tuple, so one call covers every prefix.
        if isinstance(key, str) and key.startswith(prefixes):
            matched.append(key)
    return matched
def _gcm_all_denied(gcm: dict) -> bool:
    """Return True if no entry of the GCM state is ``"granted"``.

    The state may carry values that are not consent states at all, such as
    the ``wait_for_update`` (number) and ``region`` (list) parameters of a
    consent command. Those are ignored; comparing every value against
    ``"denied"`` would report them as not denied.

    Args:
        gcm: Mapping of GCM parameter name to value; may be empty or None.

    Returns:
        True for an empty/missing state or when no value equals ``"granted"``.
    """
    if not gcm:
        return True
    return all(value != "granted" for value in gcm.values())
def _violation(code: str, severity: str, text: str) -> dict:
    """Build one violation record with the keys ``code``, ``severity`` and ``text``."""
    return dict(code=code, severity=severity, text=text)
# -- Public analysis ---------------------------------------------------------
def analyze_phase_data(
    phase_name: str, intercepted: dict, consent_state: dict,
) -> list[dict]:
    """Analyze one phase and return list of {code, severity, text} violations.

    Args:
        phase_name: 'before_consent' | 'after_reject' | 'after_accept'.
            Any other value is logged as a warning and yields no violations.
        intercepted: Dict with ``datalayer_events`` (entries of the form
            ``{"ts": ..., "data": ...}``) and ``storage_changes`` lists.
            Missing or None values are treated as empty.
        consent_state: Dict with a ``gcm_state`` mapping; missing or None is
            treated as empty.

    Returns:
        Violations in a fixed order per phase. ``before_consent`` uses
        severity "high", ``after_reject`` uses "critical". ``after_accept``
        only logs and never produces violations.
    """
    events = intercepted.get("datalayer_events") or []
    storage = intercepted.get("storage_changes") or []
    gcm = consent_state.get("gcm_state") or {}

    # Payloads that are not dicts (a pushed string, number or null) carry no
    # "event" field and are skipped instead of being handed to the matcher.
    tracking_evts = [
        e for e in events
        if isinstance(e, dict)
        and isinstance(e.get("data"), dict)
        and _is_tracking_event(e["data"])
    ]
    # The same key is often written many times; list each one once (order
    # kept) so the five keys shown in the message are five different keys.
    tracking_keys = list(dict.fromkeys(_tracking_storage_keys(storage)))
    # Only entries explicitly set to "granted" are violations. The state can
    # also hold non-consent parameters (e.g. a numeric wait_for_update), which
    # differ from "denied" without granting anything.
    granted = [k for k, v in gcm.items() if v == "granted"]

    violations: list[dict] = []
    if phase_name == "before_consent":
        sev = "high"
        if tracking_evts:
            violations.append(_violation(
                "DL_TRACK_BEFORE_CONSENT", sev,
                f"{len(tracking_evts)} tracking event(s) in dataLayer before consent"))
        if tracking_keys:
            violations.append(_violation(
                "STORAGE_TRACK_BEFORE_CONSENT", sev,
                f"Tracking storage keys before consent: {', '.join(tracking_keys[:5])}"))
        if granted:
            violations.append(_violation(
                "GCM_NOT_DENIED_BEFORE_CONSENT", sev,
                f"GCM granted before consent: {', '.join(granted)}"))
    elif phase_name == "after_reject":
        sev = "critical"
        if tracking_evts:
            violations.append(_violation(
                "DL_TRACK_AFTER_REJECT", sev,
                f"{len(tracking_evts)} tracking event(s) in dataLayer after reject"))
        if granted:
            violations.append(_violation(
                "GCM_NOT_DENIED_AFTER_REJECT", sev,
                f"GCM still granted after reject: {', '.join(granted)}"))
        if tracking_keys:
            violations.append(_violation(
                "STORAGE_TRACK_AFTER_REJECT", sev,
                f"Tracking storage keys after reject: {', '.join(tracking_keys[:5])}"))
    elif phase_name == "after_accept":
        logger.info("Phase accept: %d tracking events (expected), GCM=%s",
                    len(tracking_evts), gcm or "none")
    else:
        # A misspelled phase would otherwise look like a clean result.
        logger.warning("analyze_phase_data: unknown phase %r", phase_name)
    return violations