Files
breakpilot-compliance/consent-tester/services/cmp_extractor.py
T
Benjamin Admin 189918b043 fix(cmp): stricter heuristic + only replace DOM when CMP is strictly larger
Two bugs observed in BMW test run:

1. Generic JSON heuristic captured /de-de/login/bmw/api/flyout/data (4KB,
   user login fly-out data) and reconstruct_generic produced 56 words of
   noise. The CMP-prefer logic then 'replaced' the 185-word imprint DOM
   extraction with those 56 words because self_wc(185) < 300 — even
   though cmp_wc(56) < self_wc(185).

2. The strict prefilter list was too short. Login/auth/cart endpoints
   often have category-shaped JSON without being cookie policies.

Fixes:
- dsi_discovery: replace DOM with CMP only when cmp_wc > self_wc AND
  meets one of the existing conditions. Tiny captures can no longer
  silently destroy a bigger DOM extraction.
- cmp_extractor: skip non-cookie URLs (/login, /auth, /user, /session,
  /cart, /checkout, /search, /flyout, /menu, /nav, /translation, /i18n,
  /locale, /feature-flag).
- cmp_extractor: require ≥5KB payload size — real CMP policies are
  always larger (BMW ePaaS is ~393KB). Tiny matches drop out before
  reconstruction.
2026-05-17 10:50:19 +02:00

135 lines
5.3 KiB
Python

"""
CMP Extractor — thin coordinator.
Captures CMP (Consent Management Platform) JSON payloads from network responses
during page navigation. Two-stage cascade:
1. Named CMP library (services/cmp_library/) — best quality, hand-written
reconstructors per vendor (ePaaS, OneTrust, Cookiebot, Usercentrics,
Didomi, TrustArc, + auto-promoted entries from Phase E).
2. Generic JSON cookie-policy heuristic (cmp_heuristic.py) — catches
unknown CMPs that still expose a recognizable JSON shape.
A single CMP page may emit multiple payloads (e.g. consentcontroller +
policypage); all are captured and concatenated by reconstruct_cookie_policy().
"""
from __future__ import annotations
import json
import logging
from typing import TYPE_CHECKING, Callable
if TYPE_CHECKING:
from playwright.async_api import Page, Response
from services.cmp_library._registry import load_all as _load_registry
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Loaded once at import time. Restart consent-tester to pick up new modules.
# Each entry is a (cmp_name, matcher, reconstruct) tuple: the matcher's
# .search(url) is used to recognise a CMP endpoint, and reconstruct(data)
# turns the parsed JSON payload into policy text.
_REGISTRY: list[tuple[str, object, Callable[[dict], str]]] = _load_registry()
logger.info("CMP library loaded: %d named matchers", len(_REGISTRY))
class CMPCapture:
    """Collects CMP-related JSON payloads observed on a page's responses.

    Every capture is stored in ``payloads`` as a ``(cmp_name, parsed_json)``
    tuple, in the order it was seen. Captures made by the generic heuristic
    use the name ``"_heuristic"``.
    """

    def __init__(self) -> None:
        # [(cmp_name, parsed_json), ...]
        self.payloads: list[tuple[str, dict]] = []

    def attach(self, page: Page) -> None:
        """Subscribe to the page's "response" event.

        Must be called BEFORE page.goto().
        """
        page.on("response", self._on_response)

    async def _on_response(self, response: Response) -> None:
        """Examine one response and record it when it looks like CMP data.

        Only responses with status 200 are considered. Named registry
        matchers are tried first; the generic heuristic runs only when no
        named matcher claims the URL. Any exception raised along the way is
        logged at debug level and never propagates to the caller.
        """
        try:
            if response.status == 200:
                target = response.url
                claimed = await self._capture_named(response, target)
                if not claimed:
                    await self._capture_heuristic(response, target)
        except Exception as err:
            logger.debug("CMP listener error: %s", err)

    async def _capture_named(self, response: Response, url: str) -> bool:
        """Stage 1: try the named CMP matchers against *url*.

        Returns True when a matcher claimed the URL (even if the body could
        not be parsed as JSON), so the caller skips the heuristic stage.
        """
        # First registry entry whose matcher hits the URL wins.
        matched = next(
            (
                name
                for name, pattern, _fn in _REGISTRY
                if pattern.search(url)  # type: ignore[attr-defined]
            ),
            None,
        )
        if matched is None:
            return False
        payload = await _parse_json_response(response)
        if payload is not None:
            self.payloads.append((matched, payload))
            logger.info(
                "CMP captured: %s (%s, ~%dKB)",
                matched, url[:120], len(json.dumps(payload)) // 1024,
            )
        return True

    async def _capture_heuristic(self, response: Response, url: str) -> None:
        """Stage 2: generic cookie-policy heuristic for unknown CMPs."""
        # Only JSON responses are candidates.
        mime = (response.headers.get("content-type") or "").lower()
        if "json" not in mime:
            return
        # Pre-filter: drop noisy/irrelevant endpoints by URL substring.
        haystack = url.lower()
        for fragment in (
            "/api/config", "/beacon", "/track", "/analytics",
            "/fonts/", "/log/", "/heartbeat", "/.well-known/",
            "/intake/", "/collect", "/ping", "/metrics",
            "/login", "/auth", "/user", "/session", "/cart", "/checkout",
            "/search", "/recommendation", "/flyout", "/menu", "/nav",
            "/translation", "/i18n", "/locale", "/feature-flag",
        ):
            if fragment in haystack:
                return
        payload = await _parse_json_response(response)
        if payload is None:
            return
        # Payloads whose re-serialised size is under 5KB are dropped before
        # the heuristic runs; a size that cannot be computed counts as 0.
        try:
            approx_kb = len(json.dumps(payload)) // 1024
        except Exception:
            approx_kb = 0
        if approx_kb < 5:
            return
        from services.cmp_heuristic import looks_like_cookie_policy
        if not looks_like_cookie_policy(payload):
            return
        self.payloads.append(("_heuristic", payload))
        logger.info(
            "CMP captured: _heuristic (%s, ~%dKB)",
            url[:120], approx_kb,
        )

    def reconstruct_cookie_policy(self) -> str:
        """Render all captured payloads into one Cookie-Policy text.

        Each payload is passed to the reconstructor registered under its
        name; "_heuristic" captures and names missing from the registry go
        through the generic reconstructor. A payload whose reconstruction
        raises is logged as a warning and skipped. Non-empty results are
        joined with a blank line between them.
        """
        from services.cmp_heuristic import reconstruct_generic
        # Name -> reconstructor map, built once per call.
        reconstructors = {name: fn for name, _matcher, fn in _REGISTRY}
        texts: list[str] = []
        for cmp_name, data in self.payloads:
            try:
                if cmp_name == "_heuristic":
                    build = reconstruct_generic
                else:
                    # Unknown name — fall back to generic.
                    build = reconstructors.get(cmp_name, reconstruct_generic)
                texts.append(build(data))
            except Exception as err:
                logger.warning("CMP %s reconstruction failed: %s", cmp_name, err)
        return "\n\n".join(filter(None, texts))
async def _parse_json_response(response: Response) -> dict | None:
    """Best-effort JSON parse from a Playwright Response.

    First asks the response to parse itself via ``response.json()``. If that
    raises, the raw body is fetched and decoded manually: undecodable bytes
    are ignored and a leading UTF-8 byte-order mark is stripped, because
    ``json.loads`` rejects text that starts with a BOM.

    Args:
        response: Object exposing awaitable ``json()`` and ``body()`` methods.

    Returns:
        The parsed JSON value, or ``None`` when the body cannot be read or
        is not valid JSON. Never raises.
    """
    try:
        return await response.json()
    except Exception:
        # Fall through to the manual decode path below.
        try:
            body = await response.body()
            # "utf-8-sig" behaves like "utf-8" but drops a leading BOM, which
            # would otherwise make json.loads fail on valid JSON.
            return json.loads(body.decode("utf-8-sig", errors="ignore"))
        except Exception:
            return None