feat: Deep consent verification — DataLayer, Storage, GCM, TCF

5 verification layers added to the 3-phase banner test:

1. DataLayer/GTM Interception: Proxy on window.dataLayer captures
   all push() events. Distinguishes safe lifecycle events (gtm.js,
   gtm.dom) from tracking events (page_view, conversion, purchase).
   Flags tracking events before consent as violations.

2. localStorage/sessionStorage Monitoring: Intercepts setItem() to
   detect tracking keys (_ga, _fbp, amplitude, mixpanel, etc.)
   written before consent.

3. Google Consent Mode v2 Runtime Verification: Reads actual GCM
   state (analytics_storage, ad_storage) per phase. Verifies
   default=denied before consent, stays denied after reject,
   switches to granted after accept.

4. TCF v2.2 State: Reads __tcfapi('getTCData') if available.
   Verifies consent purpose states match user choice.

5. Cookie Attribute Analysis: Domain (1st vs 3rd party), expires
   (>13 months), secure flag for tracking cookies.

10 new L2 checks with expert hints (EDPB, CNIL, §25 TDDDG).
All interceptor calls wrapped in try/except for graceful fallback.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-10 08:58:44 +02:00
parent 99ef9873ad
commit d2dc0c9fe4
4 changed files with 499 additions and 0 deletions
@@ -0,0 +1,189 @@
"""
Consent Interceptor — JS injection for deep consent verification.
Intercepts dataLayer.push (GTM), gtag() (GCM), Storage.setItem before page load.
Provides helpers to collect, read, and analyze intercepted data per phase.
"""
import json
import logging
import re
logger = logging.getLogger(__name__)
# Regexes searched (case-insensitively) against a serialised dataLayer entry
# to decide whether it is a tracking event. GTM lifecycle events such as
# gtm.js / gtm.dom / gtm.load are intentionally not covered by any pattern.
TRACKING_DATALAYER_PATTERNS: list[re.Pattern] = [
    re.compile(source, flags=re.IGNORECASE)
    for source in (
        # Generic analytics / e-commerce event names.
        r'"event"\s*:\s*"(gtm\.click|ga4|conversion|purchase|page_view|add_to_cart|begin_checkout)',
        # Event names namespaced with fb. / facebook. / meta.
        r'"event"\s*:\s*"(fb|facebook|meta)\.',
        # Event names starting with a vendor identifier.
        r'"event"\s*:\s*"(hotjar|hj\.|clarity|linkedin|tiktok|pinterest|criteo)',
        # Anything whose event name starts with "track" / "tracking".
        r'"event"\s*:\s*"track(ing)?',
    )
]
# Exact event names that _is_tracking_event() never reports as tracking,
# checked before any regex pattern is consulted.
_SAFE_DATALAYER_EVENTS = {
    "consent_update",
    "gtm.dom",
    "gtm.historyChange",
    "gtm.init",
    "gtm.js",
    "gtm.load",
    "gtm.scrollDepth",
    "optimize.activate",
}
# Prefixes of localStorage/sessionStorage keys treated as tracking state.
# A key counts as tracking when it *starts with* one of these entries, so
# "_ga" also covers suffixed variants of that key.
TRACKING_STORAGE_KEYS: list[str] = [
    "_ga",
    "_gid",
    "_gat",
    "_fbp",
    "_fbc",
    "_gcl",
    "amplitude",
    "mixpanel",
    "_hjSession",
    "_hjIncludedInPageviewSample",
    "_hjid",
    "_clck",
    "_clsk",
    "ai_session",
    "ai_user",
    "_pin_unauth",
    "sc_at",
]
# JavaScript registered with page.add_init_script() so it runs before any
# page script. It creates three page-global arrays that are read back later:
#   window.__bp_events           {ts, data}                 one per dataLayer entry
#   window.__bp_consent_updates  {ts, action, params}       one per gtag() command
#   window.__bp_storage_changes  {ts, type, key, valueLen}  one per Storage.setItem
#
# Design notes:
# * dataLayer entries are recorded only in the Proxy `set` trap, and only for
#   array-index keys. Array.prototype.push on a proxy already goes through
#   that trap, so an additional push override would record every entry twice
#   (and assigning the override would itself be recorded as a bogus entry).
# * An array that is already one of our proxies is never wrapped again;
#   `window.dataLayer = window.dataLayer || []` is executed by most tag
#   snippets and would otherwise stack proxies and multiply recordings.
# * window.gtag is deliberately NOT pre-defined. A stub would survive
#   `window.gtag = window.gtag || function(){...}` and swallow the page's own
#   gtag calls. gtag commands are instead recognised on the dataLayer, where
#   the standard gtag snippet pushes its `arguments` object.
# * All bookkeeping is wrapped in try/catch so the interceptor cannot make
#   page code throw.
INIT_SCRIPT: str = """(() => {
    window.__bp_events = [];
    window.__bp_consent_updates = [];
    window.__bp_storage_changes = [];
    const _safe = v => { try { return JSON.parse(JSON.stringify(v)); } catch (_) { return {}; } };
    const _isIndex = k => typeof k === 'string' && /^[0-9]+$/.test(k);
    const _isGtagCall = v =>
        Object.prototype.toString.call(v) === '[object Arguments]' && typeof v[0] === 'string';
    const _record = v => {
        try {
            const ts = Date.now();
            window.__bp_events.push({ts: ts, data: _safe(v)});
            if (_isGtagCall(v)) {
                window.__bp_consent_updates.push({
                    ts: ts, action: v[0],
                    params: v.length > 1 ? _safe(Array.prototype.slice.call(v, 1)) : []
                });
            }
        } catch (_) {}
    };
    const _wrapped = new WeakSet();
    function proxyDL(arr) {
        // Entries present before wrapping were pushed before we could observe them.
        for (const item of arr) _record(item);
        const p = new Proxy(arr, {
            set(t, k, v) {
                const ok = Reflect.set(t, k, v);
                if (ok && _isIndex(k)) _record(v);
                return ok;
            }
        });
        _wrapped.add(p);
        return p;
    }
    const _adopt = v => (Array.isArray(v) && !_wrapped.has(v)) ? proxyDL(v) : v;
    let _dl = _adopt(window.dataLayer);
    Object.defineProperty(window, 'dataLayer', {
        configurable: true,
        get() { return _dl; },
        set(v) { _dl = _adopt(v); }
    });
    if (typeof Storage !== 'undefined') {
        const origSet = Storage.prototype.setItem;
        Storage.prototype.setItem = function(k, v) {
            try {
                window.__bp_storage_changes.push({
                    ts: Date.now(), type: this === localStorage ? 'local' : 'session',
                    key: String(k), valueLen: String(v).length
                });
            } catch (_) {}
            return origSet.apply(this, arguments);
        };
    }
})();"""
async def collect_intercepted_data(page) -> dict:
    """Snapshot the interceptor's page-global arrays.

    Evaluates a small script in ``page`` that copies ``window.__bp_events``,
    ``window.__bp_consent_updates`` and ``window.__bp_storage_changes``
    (truncated to the first 200 / 100 / 200 entries respectively; a missing
    array reads as empty).

    Returns a dict with the keys ``datalayer_events``, ``consent_updates`` and
    ``storage_changes``. If evaluation raises, the failure is logged as a
    warning and the same dict shape with three empty lists is returned.
    """
    # The script text is kept verbatim; only the surrounding Python differs.
    snapshot_js = """() => ({
datalayer_events: (window.__bp_events || []).slice(0, 200),
consent_updates: (window.__bp_consent_updates || []).slice(0, 100),
storage_changes: (window.__bp_storage_changes || []).slice(0, 200),
})"""
    try:
        snapshot = await page.evaluate(snapshot_js)
    except Exception as exc:
        logger.warning("collect_intercepted_data failed: %s", exc)
        snapshot = {"datalayer_events": [], "consent_updates": [], "storage_changes": []}
    return snapshot
async def get_consent_state(page, tcf_timeout_ms: int = 1000) -> dict:
    """Read the current Google Consent Mode and TCF consent state from ``page``.

    Args:
        page: Page object exposing an awaitable ``evaluate(expression, arg)``.
        tcf_timeout_ms: How long to wait for the ``__tcfapi`` callback before
            giving up and reporting ``tcf_data`` as ``None``.

    Returns:
        ``{"gcm_state": {...}, "tcf_data": {...} | None}``.

        * ``gcm_state`` maps consent type to ``"granted"``/``"denied"``. It is
          built by replaying every dataLayer entry whose first element is
          ``'consent'`` in order, so later commands overwrite earlier ones.
          Only ``granted``/``denied`` values are kept; other parameters of a
          consent command (for example ``wait_for_update`` or ``region``) are
          not consent states and would otherwise be mistaken for
          "not denied" by downstream checks.
          NOTE(review): region-scoped defaults are merged flat, without
          evaluating which region applies -- confirm whether that matters
          for the sites under test.
        * ``tcf_data`` holds ``tcString``, ``gdprApplies``, ``purpose`` and
          ``vendor`` from ``__tcfapi('getTCData')`` when the API exists and
          reports success, else ``None``.

        If evaluation raises, a warning is logged and the empty state
        ``{"gcm_state": {}, "tcf_data": None}`` is returned.
    """
    try:
        # The function is async and awaits the __tcfapi callback: a CMP may
        # invoke the callback after the call returns, in which case a
        # synchronous `return r` would always report tcf_data as null.
        return await page.evaluate("""async (tcfTimeoutMs) => {
            const r = {gcm_state: {}, tcf_data: null};
            const dl = window.dataLayer;
            if (dl && typeof dl[Symbol.iterator] === 'function') {
                for (const e of dl) {
                    if (!e || e[0] !== 'consent') continue;
                    const p = e[2];
                    if (!p || typeof p !== 'object') continue;
                    for (const [k, v] of Object.entries(p)) {
                        if (v === 'granted' || v === 'denied') r.gcm_state[k] = v;
                    }
                }
            }
            if (typeof window.__tcfapi === 'function') {
                r.tcf_data = await new Promise(resolve => {
                    const timer = setTimeout(() => resolve(null), tcfTimeoutMs);
                    try {
                        window.__tcfapi('getTCData', 2, (d, ok) => {
                            clearTimeout(timer);
                            resolve(ok && d ? {
                                tcString: d.tcString || '', gdprApplies: d.gdprApplies,
                                purpose: d.purpose || {}, vendor: d.vendor || {}
                            } : null);
                        });
                    } catch (_) {
                        clearTimeout(timer);
                        resolve(null);
                    }
                });
            }
            return r;
        }""", tcf_timeout_ms)
    except Exception as exc:
        logger.warning("get_consent_state failed: %s", exc)
        return {"gcm_state": {}, "tcf_data": None}
# -- Internal helpers --------------------------------------------------------
def _is_tracking_event(event_data: dict) -> bool:
    """Return True if a captured dataLayer entry represents a tracking event.

    Args:
        event_data: The JSON-decoded ``data`` of one intercepted dataLayer
            entry. Anything that is not a dict is treated as non-tracking.

    The entry is serialised with ``json.dumps`` before matching because
    TRACKING_DATALAYER_PATTERNS look for double-quoted ``"event": "..."``
    pairs; ``str(dict)`` yields single-quoted repr text those patterns can
    never match.

    Entries produced by ``gtag('event', name, ...)`` are stored as a
    serialised ``arguments`` object (``{"0": "event", "1": name, ...}``) and
    have no ``event`` key; for those the name at index ``"1"`` is used.

    An event whose name is listed in _SAFE_DATALAYER_EVENTS is never tracking.
    """
    if not isinstance(event_data, dict):
        return False

    name = event_data.get("event")
    gtag_style = name is None and event_data.get("0") == "event"
    if gtag_style:
        name = event_data.get("1")

    # isinstance guard: an unhashable name (list/dict) would make `in` raise.
    if isinstance(name, str) and name in _SAFE_DATALAYER_EVENTS:
        return False

    try:
        serialised = json.dumps(event_data)
    except (TypeError, ValueError):
        # Not JSON-representable, so it cannot have come from the page as-is.
        return False
    if gtag_style and isinstance(name, str):
        # Give the patterns the same `"event": "<name>"` shape they expect.
        serialised = json.dumps({"event": name}) + serialised

    return any(p.search(serialised) for p in TRACKING_DATALAYER_PATTERNS)
def _tracking_storage_keys(changes: list[dict]) -> list[str]:
    """Return the distinct storage keys that start with a tracking prefix.

    Args:
        changes: Intercepted ``Storage.setItem`` records; each is expected to
            be a dict with a ``key`` entry.

    Returns:
        Matching keys in first-seen order, each listed once. Repeated writes
        to the same key would otherwise fill the violation text with copies
        of one name.

    Records that are not dicts or have no key are skipped. Non-string keys
    are converted with ``str()`` (the page may pass a number to ``setItem``)
    instead of raising ``AttributeError`` on ``startswith``.
    """
    prefixes = tuple(TRACKING_STORAGE_KEYS)
    matches: dict[str, None] = {}  # dict used as an insertion-ordered set
    for change in changes:
        if not isinstance(change, dict):
            continue
        key = change.get("key")
        if key is None:
            continue
        key = str(key)
        if key.startswith(prefixes):
            matches.setdefault(key)
    return list(matches)
def _gcm_all_denied(gcm: dict) -> bool:
    """Return True when no consent type in ``gcm`` is ``"granted"``.

    An empty or missing state counts as denied. Values other than
    ``"granted"`` are ignored rather than required to equal ``"denied"``:
    a consent command may carry non-consent parameters (for example
    ``wait_for_update: 500``), and those must not turn a fully denied state
    into a reported violation.
    """
    if not gcm:
        return True
    return not any(value == "granted" for value in gcm.values())
def _violation(code: str, severity: str, text: str) -> dict:
    """Bundle one finding into a ``{"code", "severity", "text"}`` record."""
    return dict(code=code, severity=severity, text=text)
# -- Public analysis ---------------------------------------------------------
def analyze_phase_data(
    phase_name: str, intercepted: dict, consent_state: dict,
) -> list[dict]:
    """Analyze one test phase and return its consent violations.

    Args:
        phase_name: ``'before_consent'``, ``'after_reject'`` or
            ``'after_accept'``. Any other value is logged and yields no
            violations.
        intercepted: Result of ``collect_intercepted_data`` (keys
            ``datalayer_events`` and ``storage_changes`` are read).
        consent_state: Result of ``get_consent_state`` (key ``gcm_state`` is
            read).

    Returns:
        A list of ``{code, severity, text}`` dicts. ``before_consent``
        findings have severity ``high``, ``after_reject`` findings
        ``critical``. ``after_accept`` only logs and never reports.

    Malformed entries (non-dict records, non-dict event data, non-string
    storage keys) are ignored so one odd record cannot abort the whole
    analysis. A GCM violation is reported only when at least one consent
    type is actually ``"granted"``.

    NOTE(review): the intercepted arrays appear to cover the whole page
    lifetime, so ``after_reject`` may include entries recorded before the
    reject click -- confirm against the caller.
    """
    events = intercepted.get("datalayer_events") or []
    storage = intercepted.get("storage_changes") or []
    gcm = consent_state.get("gcm_state") or {}

    tracking_evts = [
        e for e in events
        if isinstance(e, dict)
        and isinstance(e.get("data"), dict)
        and _is_tracking_event(e["data"])
    ]
    usable_changes = [
        ch for ch in storage
        if isinstance(ch, dict) and isinstance(ch.get("key"), str)
    ]
    # dict.fromkeys drops repeats while keeping first-seen order.
    tracking_keys = list(dict.fromkeys(_tracking_storage_keys(usable_changes)))
    granted = (
        [] if _gcm_all_denied(gcm)
        else [k for k, v in gcm.items() if v == "granted"]
    )

    violations: list[dict] = []

    if phase_name == "before_consent":
        sev = "high"
        if tracking_evts:
            violations.append(_violation(
                "DL_TRACK_BEFORE_CONSENT", sev,
                f"{len(tracking_evts)} tracking event(s) in dataLayer before consent"))
        if tracking_keys:
            violations.append(_violation(
                "STORAGE_TRACK_BEFORE_CONSENT", sev,
                f"Tracking storage keys before consent: {', '.join(tracking_keys[:5])}"))
        if granted:
            violations.append(_violation(
                "GCM_NOT_DENIED_BEFORE_CONSENT", sev,
                f"GCM granted before consent: {', '.join(granted)}"))
    elif phase_name == "after_reject":
        sev = "critical"
        if tracking_evts:
            violations.append(_violation(
                "DL_TRACK_AFTER_REJECT", sev,
                f"{len(tracking_evts)} tracking event(s) in dataLayer after reject"))
        if granted:
            violations.append(_violation(
                "GCM_NOT_DENIED_AFTER_REJECT", sev,
                f"GCM still granted after reject: {', '.join(granted)}"))
        if tracking_keys:
            violations.append(_violation(
                "STORAGE_TRACK_AFTER_REJECT", sev,
                f"Tracking storage keys after reject: {', '.join(tracking_keys[:5])}"))
    elif phase_name == "after_accept":
        # Tracking is expected once the user accepted; record it for context only.
        logger.info("Phase accept: %d tracking events (expected), GCM=%s",
                    len(tracking_evts), gcm or "none")
    else:
        logger.warning("analyze_phase_data: unknown phase %r, no checks applied", phase_name)

    return violations
@@ -23,6 +23,12 @@ from services.script_analyzer import (
find_violations_before_consent, find_violations_after_reject, Violation,
)
from services.banner_text_checker import check_banner_text as _check_banner_text
from services.consent_interceptor import (
INIT_SCRIPT as _INTERCEPTOR_INIT,
collect_intercepted_data as _collect_intercepted,
get_consent_state as _get_consent_state,
analyze_phase_data as _analyze_phase,
)
logger = logging.getLogger(__name__)
@@ -57,6 +63,8 @@ class ConsentTestResult:
banner_text_violations: list[Violation] = field(default_factory=list)
banner_has_impressum_link: bool = False
banner_has_dse_link: bool = False
# Deep verification (per-phase intercepted data)
deep_verification: dict = field(default_factory=dict)
async def run_consent_test(
@@ -94,6 +102,7 @@ async def run_consent_test(
timezone_id="Europe/Berlin",
)
page_a = await ctx_a.new_page()
await page_a.add_init_script(_INTERCEPTOR_INIT)
if HAS_STEALTH:
await stealth_async(page_a)
scripts_a = []
@@ -102,6 +111,19 @@ async def run_consent_test(
await page_a.goto(url, wait_until="networkidle", timeout=30000)
await page_a.wait_for_timeout(wait_ms)
# Deep verification: Phase A
try:
intercepted_a = await _collect_intercepted(page_a)
consent_state_a = await _get_consent_state(page_a)
deep_violations_a = _analyze_phase("before_consent", intercepted_a, consent_state_a)
result.deep_verification["before_consent"] = {
"intercepted": intercepted_a,
"consent_state": consent_state_a,
"violations": deep_violations_a,
}
except Exception as exc:
logger.warning("Phase A deep verification failed: %s", exc)
result.before_scripts = _get_page_scripts(scripts_a)
result.before_cookies = _get_cookie_names(await ctx_a.cookies())
result.before_tracking = find_tracking_services(result.before_scripts)
@@ -135,6 +157,7 @@ async def run_consent_test(
timezone_id="Europe/Berlin",
)
page_b = await ctx_b.new_page()
await page_b.add_init_script(_INTERCEPTOR_INIT)
if HAS_STEALTH:
await stealth_async(page_b)
scripts_b = []
@@ -150,6 +173,19 @@ async def run_consent_test(
else:
logger.warning("Could not click reject button")
# Deep verification: Phase B
try:
intercepted_b = await _collect_intercepted(page_b)
consent_state_b = await _get_consent_state(page_b)
deep_violations_b = _analyze_phase("after_reject", intercepted_b, consent_state_b)
result.deep_verification["after_reject"] = {
"intercepted": intercepted_b,
"consent_state": consent_state_b,
"violations": deep_violations_b,
}
except Exception as exc:
logger.warning("Phase B deep verification failed: %s", exc)
result.reject_scripts = _get_page_scripts(scripts_b)
result.reject_cookies = _get_cookie_names(await ctx_b.cookies())
reject_tracking = find_tracking_services(result.reject_scripts)
@@ -169,6 +205,7 @@ async def run_consent_test(
timezone_id="Europe/Berlin",
)
page_c = await ctx_c.new_page()
await page_c.add_init_script(_INTERCEPTOR_INIT)
if HAS_STEALTH:
await stealth_async(page_c)
scripts_c = []
@@ -184,6 +221,19 @@ async def run_consent_test(
else:
logger.warning("Could not click accept button")
# Deep verification: Phase C
try:
intercepted_c = await _collect_intercepted(page_c)
consent_state_c = await _get_consent_state(page_c)
deep_violations_c = _analyze_phase("after_accept", intercepted_c, consent_state_c)
result.deep_verification["after_accept"] = {
"intercepted": intercepted_c,
"consent_state": consent_state_c,
"violations": deep_violations_c,
}
except Exception as exc:
logger.warning("Phase C deep verification failed: %s", exc)
result.accept_scripts = _get_page_scripts(scripts_c)
result.accept_cookies = _get_cookie_names(await ctx_c.cookies())
accept_tracking = find_tracking_services(result.accept_scripts)