75174273f4
VW & andere unbekannte CMPs liefern 603-Wort-Bug: kein Named-Matcher greift, generische Heuristik filtert oder size_kb < 5 → cmp_cookie_text bleibt leer → Backend faellt auf 603-Wort DOM-Navigation zurueck. Neuer INFO-Log fuer jede JSON-Response >=3KB die als CMP-Kandidat ueberlebt, aber Heuristik ODER Size-Schwelle nicht passt. Top-Keys + URL + Size — beim naechsten VW-Run sofort sichtbar, welcher Endpoint ein Named-Pattern braucht. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
147 lines
5.9 KiB
Python
147 lines
5.9 KiB
Python
"""
|
|
CMP Extractor — thin coordinator.

Captures CMP (Consent Management Platform) JSON payloads from network
responses during page navigation. Two-stage cascade:

  1. Named CMP library (services/cmp_library/) — best quality, hand-written
     reconstructors per vendor (ePaaS, OneTrust, Cookiebot, Usercentrics,
     Didomi, TrustArc, + auto-promoted entries from Phase E).
  2. Generic JSON cookie-policy heuristic (cmp_heuristic.py) — catches
     unknown CMPs that still expose a recognizable JSON shape.

A single CMP page may emit multiple payloads (e.g. consentcontroller +
policypage); all of them are captured, and reconstruct_cookie_policy()
concatenates them into one text.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from typing import TYPE_CHECKING, Callable
|
|
|
|
if TYPE_CHECKING:
|
|
from playwright.async_api import Page, Response
|
|
|
|
from services.cmp_library._registry import load_all as _load_registry
|
|
|
|
logger = logging.getLogger(__name__)

# The registry of named matchers is built exactly once, while this module is
# being imported. The consent-tester process has to be restarted before newly
# added matcher modules are picked up.
_REGISTRY: list[tuple[str, object, Callable[[dict], str]]] = _load_registry()
logger.info("CMP library loaded: %d named matchers", len(_REGISTRY))
|
|
|
|
|
|
class CMPCapture:
    """Collects CMP-related JSON payloads seen on the network during navigation.

    Every HTTP 200 response is first checked against the named matchers in
    ``_REGISTRY``; responses that no named matcher claims are then run through
    the generic cookie-policy heuristic. Hits are stored in ``payloads`` in
    capture order and turned into text by ``reconstruct_cookie_policy()``.
    """

    # URL fragments that exclude a response from the generic heuristic
    # (Stage 2). Matching is a plain substring test on the lower-cased URL.
    _NOISY_URL_FRAGMENTS = (
        "/api/config", "/beacon", "/track", "/analytics",
        "/fonts/", "/log/", "/heartbeat", "/.well-known/",
        "/intake/", "/collect", "/ping", "/metrics",
        "/login", "/auth", "/user", "/session", "/cart", "/checkout",
        "/search", "/recommendation", "/flyout", "/menu", "/nav",
        "/translation", "/i18n", "/locale", "/feature-flag",
    )

    def __init__(self) -> None:
        # One (cmp_name, parsed_json) pair per captured response; generic
        # heuristic hits are stored under the name "_heuristic".
        self.payloads: list[tuple[str, dict]] = []

    def attach(self, page: Page) -> None:
        """Register the response listener on ``page``.

        Must be called BEFORE page.goto().
        """
        page.on("response", self._on_response)

    async def _on_response(self, response: Response) -> None:
        """Inspect a single response and capture it if it looks like CMP data.

        Only HTTP 200 responses are considered. Any exception raised while
        processing is logged at DEBUG level and otherwise swallowed.
        """
        try:
            if response.status != 200:
                return
            url = response.url

            # Stage 1: named CMP matchers (highest quality). The first
            # matcher whose pattern hits the URL claims the response, whether
            # or not its body turns out to be parseable JSON.
            for cmp_name, matcher, _reconstruct in _REGISTRY:
                if not matcher.search(url):  # type: ignore[attr-defined]
                    continue
                payload = await _parse_json_response(response)
                if payload is not None:
                    self.payloads.append((cmp_name, payload))
                    approx_kb = len(json.dumps(payload)) // 1024
                    logger.info(
                        "CMP captured: %s (%s, ~%dKB)",
                        cmp_name, url[:120], approx_kb,
                    )
                return

            # Stage 2: generic heuristic for unknown CMPs.
            # Pre-filter: only JSON responses, and none of the noisy or
            # irrelevant endpoints, to avoid spamming.
            content_type = response.headers.get("content-type") or ""
            if "json" not in content_type.lower():
                return
            lowered_url = url.lower()
            if any(fragment in lowered_url for fragment in self._NOISY_URL_FRAGMENTS):
                return

            payload = await _parse_json_response(response)
            if payload is None:
                return
            try:
                size_kb = len(json.dumps(payload)) // 1024
            except Exception:
                size_kb = 0

            from services.cmp_heuristic import looks_like_cookie_policy

            matched = looks_like_cookie_policy(payload)
            if matched and size_kb >= 5:
                self.payloads.append(("_heuristic", payload))
                logger.info(
                    "CMP captured: _heuristic (%s, ~%dKB)",
                    url[:120], size_kb,
                )
                return
            if size_kb < 3:
                return

            # Phase-0 diagnostic log: a JSON response that survived as a CMP
            # candidate but was rejected by the heuristic OR by the size
            # threshold. On the next VW/BMW/... run this shows which
            # endpoints are being missed, so a named pattern can be added
            # quickly without guessing.
            if isinstance(payload, dict):
                keyed = payload
            elif isinstance(payload, list) and payload and isinstance(payload[0], dict):
                keyed = payload[0]
            else:
                keyed = {}
            top_keys = list(keyed)[:8]
            logger.info(
                "CMP candidate skipped: url=%s size=%dKB heuristic=%s "
                "top_keys=%s",
                url[:120], size_kb, matched, top_keys,
            )
        except Exception as e:
            logger.debug("CMP listener error: %s", e)

    def reconstruct_cookie_policy(self) -> str:
        """Return one Cookie-Policy text assembled from every captured payload.

        Each payload is rendered by the reconstructor registered under its CMP
        name; heuristic hits and names that are not in the registry use the
        generic reconstructor. A payload whose reconstruction raises is logged
        as a warning and skipped. Non-empty results are joined with a blank
        line between them.
        """
        from services.cmp_heuristic import reconstruct_generic

        # Name -> reconstructor lookup, so dispatch does not have to reload
        # the registry on every call.
        reconstructors = {name: fn for name, _matcher, fn in _REGISTRY}

        parts: list[str] = []
        for cmp_name, data in self.payloads:
            if cmp_name == "_heuristic":
                rebuild = reconstruct_generic
            else:
                # Unknown name (perhaps a hot-loaded module that was since
                # removed) — fall back to generic.
                rebuild = reconstructors.get(cmp_name, reconstruct_generic)
            try:
                parts.append(rebuild(data))
            except Exception as e:
                logger.warning("CMP %s reconstruction failed: %s", cmp_name, e)
        return "\n\n".join(filter(None, parts))
|
|
|
|
|
|
async def _parse_json_response(response: Response) -> dict | None:
|
|
"""Best-effort JSON parse from a Playwright Response."""
|
|
try:
|
|
return await response.json()
|
|
except Exception:
|
|
try:
|
|
body = await response.body()
|
|
return json.loads(body.decode("utf-8", errors="ignore"))
|
|
except Exception:
|
|
return None
|