feat(consent-tester): Phase A — generic JSON cookie-policy heuristic

New module cmp_heuristic.py with:
- looks_like_cookie_policy(data): shape-based classifier (top-level keys
  cookies/categories/providers/vendors/purposes/cookieList/etc. + at
  least 2 name+description objects, or IAB TCF v2 vendors[]+purposes[])
- reconstruct_generic(data): walks JSON, extracts name + description
  fields + standalone prologue/dataController/persistence fields,
  emits flat German Markdown text (max 5000 words, dedup)

cmp_extractor.py wired so that AFTER named CMP matchers (epaas,
onetrust) fail, every JSON response on the page is tested for the
heuristic. If matched, payload is captured as '_heuristic' kind and
reconstructed via the generic walker.

This is Phase A of the 4-stage cascade (B-D follow). Unknown CMPs that
return JSON now work without hand-coding each one.

Pre-filter: skips response URLs containing /api/config, /beacon, /track,
/analytics, /fonts/, /log/, /heartbeat, /.well-known/ to avoid
running the heuristic on every response during a Playwright load.
This commit is contained in:
Benjamin Admin
2026-05-16 22:56:20 +02:00
parent 9814b56f2f
commit 8283483909
2 changed files with 235 additions and 11 deletions
+44 -11
View File
@@ -52,24 +52,40 @@ class CMPCapture:
async def _on_response(self, response: Response) -> None: async def _on_response(self, response: Response) -> None:
try: try:
url = response.url url = response.url
if response.status != 200:
return
# 1) Named CMP matchers (highest quality)
for cmp_name, pattern in _MATCHERS: for cmp_name, pattern in _MATCHERS:
if pattern.search(url): if pattern.search(url):
if response.status != 200: data = await _parse_json_response(response)
logger.info("CMP %s response %s (%d) — skipped", if data is None:
cmp_name, url[:120], response.status)
return
try:
data = await response.json()
except Exception:
body = await response.body()
try:
data = json.loads(body.decode("utf-8", errors="ignore"))
except Exception:
return return
self.payloads.append((cmp_name, data)) self.payloads.append((cmp_name, data))
logger.info("CMP captured: %s (%s, ~%dKB)", logger.info("CMP captured: %s (%s, ~%dKB)",
cmp_name, url[:120], len(json.dumps(data)) // 1024) cmp_name, url[:120], len(json.dumps(data)) // 1024)
return return
# 2) Generic shape-based heuristic for unknown CMPs.
# Only responses whose Content-Type mentions JSON are considered (no size check is applied).
content_type = (response.headers.get("content-type") or "").lower()
if "json" not in content_type:
return
# Cheap pre-filter: skip noisy paths (analytics, fonts, etc.)
url_lower = url.lower()
if any(skip in url_lower for skip in (
"/api/config", "/beacon", "/track", "/analytics",
"/fonts/", "/log/", "/heartbeat", "/.well-known/",
)):
return
data = await _parse_json_response(response)
if data is None:
return
from services.cmp_heuristic import looks_like_cookie_policy
if looks_like_cookie_policy(data):
self.payloads.append(("_heuristic", data))
logger.info("CMP captured: _heuristic (%s, ~%dKB)",
url[:120], len(json.dumps(data)) // 1024)
except Exception as e: except Exception as e:
logger.debug("CMP listener error: %s", e) logger.debug("CMP listener error: %s", e)
@@ -77,7 +93,10 @@ class CMPCapture:
"""Build a single Cookie-Policy text from all captured payloads. """Build a single Cookie-Policy text from all captured payloads.
Returns empty string if nothing was captured or reconstruction fails. Returns empty string if nothing was captured or reconstruction fails.
Named CMPs take precedence over the generic heuristic (richer output).
""" """
from services.cmp_heuristic import reconstruct_generic
parts: list[str] = [] parts: list[str] = []
for cmp_name, data in self.payloads: for cmp_name, data in self.payloads:
try: try:
@@ -85,11 +104,25 @@ class CMPCapture:
parts.append(_reconstruct_epaas(data)) parts.append(_reconstruct_epaas(data))
elif cmp_name == "onetrust": elif cmp_name == "onetrust":
parts.append(_reconstruct_onetrust(data)) parts.append(_reconstruct_onetrust(data))
elif cmp_name == "_heuristic":
parts.append(reconstruct_generic(data))
except Exception as e: except Exception as e:
logger.warning("CMP %s reconstruction failed: %s", cmp_name, e) logger.warning("CMP %s reconstruction failed: %s", cmp_name, e)
return "\n\n".join(p for p in parts if p) return "\n\n".join(p for p in parts if p)
async def _parse_json_response(response: Response) -> dict | None:
    """Best-effort JSON parse of a Playwright ``Response`` body.

    ``response.json()`` is tried first. If it raises for any reason, the raw
    body is fetched and decoded manually with ``utf-8-sig`` so that a leading
    UTF-8 byte-order mark is dropped (``json.loads`` rejects text that starts
    with a BOM); undecodable bytes are ignored.

    Args:
        response: The response whose body should be parsed.

    Returns:
        The parsed JSON value, or ``None`` when the body cannot be fetched or
        parsed. The value is whatever the document's top level is, so callers
        must check the type before treating it as a dict.
    """
    try:
        return await response.json()
    except Exception:
        # Deliberately broad: this helper must never break the response
        # listener, so any failure falls through to the manual decode.
        try:
            raw = await response.body()
            return json.loads(raw.decode("utf-8-sig", errors="ignore"))
        except Exception:
            return None
def _reconstruct_epaas(d: dict) -> str: def _reconstruct_epaas(d: dict) -> str:
"""Build a German Cookie-Policy from BMW ePaaS policy JSON. """Build a German Cookie-Policy from BMW ePaaS policy JSON.
+191
View File
@@ -0,0 +1,191 @@
"""
Generic Cookie-Policy JSON heuristic.
When a CMP we don't know yet returns a JSON payload, we can still recognize
"this JSON describes a cookie policy" by its shape. This module:
1. `looks_like_cookie_policy(data)` — fast shape-based classifier
2. `reconstruct_generic(data)` — walks the JSON, extracts every name/
description/purpose/expiry field and emits a flat German Markdown text
The point: Phase A makes unknown CMPs work without hand-coding each one.
The named library (Phase B) still takes priority because it produces nicer
text, but the heuristic catches everything else.
"""
from __future__ import annotations

import html
import logging
import re
from typing import Any
logger = logging.getLogger(__name__)
# ── Shape classifier ────────────────────────────────────────────────

# Lower-cased keys whose presence suggests "this JSON is a cookie policy".
# At least one of them must appear at top level or one nesting level down.
_SHAPE_KEYS = {
    "cookies", "categories", "providers", "vendors", "purposes",
    "cookielist", "cookiegroups", "consentcategories",
    "cookiedeclaration", "groupedcookies", "groups",
    "policy", "policypage", "policypagemetadata",
}

# Lower-cased field names that can carry the name of a category-like or
# vendor-like object. Order matters to readers that take the first match.
_OBJECT_NAME_FIELDS = ("name", "title", "label", "displayname",
                       "categoryname", "groupname", "vendorname",
                       "cookiename", "providername")

# Lower-cased field names that can carry the description of such an object.
_OBJECT_DESC_FIELDS = ("description", "desc", "purpose", "zweck",
                       "explanation", "info", "details",
                       "groupdescription", "categorydescription",
                       "vendordescription", "providerdescription",
                       "descriptionhtml", "descriptiontext")


def looks_like_cookie_policy(data: Any) -> bool:
    """Return True when the shape of `data` suggests a cookie-policy payload.

    Any one of the following is enough:
      a) `data` itself, or a dict stored directly under one of its keys, has
         one of `_SHAPE_KEYS` (case-insensitive) holding a list in which at
         least two entries are dicts with both a name field and a
         description field.
      b) Vendor-list shape: top-level `vendors` and `purposes` are each a
         list or an ID-keyed dict with at least two entries.

    Anything that is not a dict is rejected.
    """
    if not isinstance(data, dict):
        return False

    # Direct match on the payload itself.
    if _has_cookie_policy_shape(data):
        return True

    # One level of wrapping, e.g. {"data": {...}}.
    if any(isinstance(inner, dict) and _has_cookie_policy_shape(inner)
           for inner in data.values()):
        return True

    # Vendor-list shape. Both containers are accepted because vendor lists
    # are published either as arrays or as objects keyed by numeric ID.
    vendors = data.get("vendors")
    purposes = data.get("purposes")
    return (isinstance(vendors, (list, dict)) and len(vendors) >= 2
            and isinstance(purposes, (list, dict)) and len(purposes) >= 2)


def _has_cookie_policy_shape(d: dict) -> bool:
    """Return True when a shape key of `d` holds >= 2 name+description dicts.

    Keys are compared case-insensitively against `_SHAPE_KEYS`. Only list
    values with at least two elements are inspected.
    """
    by_lower = {key.lower(): key for key in d}
    for low_key in _SHAPE_KEYS & by_lower.keys():
        entries = d[by_lower[low_key]]
        if not isinstance(entries, list) or len(entries) < 2:
            continue
        well_formed = sum(1 for entry in entries if _is_described_entry(entry))
        if well_formed >= 2:
            return True
    return False


def _is_described_entry(entry: Any) -> bool:
    """Return True when `entry` is a dict with a name AND a description key."""
    if not isinstance(entry, dict):
        return False
    # Lower-case the keys once per entry rather than once per candidate field.
    fields = {key.lower() for key in entry}
    return (not fields.isdisjoint(_OBJECT_NAME_FIELDS)
            and not fields.isdisjoint(_OBJECT_DESC_FIELDS))
# ── Reconstruction ───────────────────────────────────────────────────


def reconstruct_generic(data: Any, max_words: int = 5000) -> str:
    """Walk the JSON structure and emit a flat German Markdown text.

    `_walk` collects heading and text lines; blank lines and
    case-insensitive duplicates (translations, repeated values) are then
    dropped and the remaining lines are joined with newlines.

    Args:
        data: Parsed JSON payload of any shape.
        max_words: Upper bound on whitespace-separated words in the result,
            to avoid pathological documents.

    Returns:
        The reconstructed text. When the word limit is hit, output stops
        after `max_words` words but the line structure of what is kept is
        preserved, so headings stay on their own lines.
    """
    lines: list[str] = ["# Cookie-Richtlinie"]
    _walk(data, lines, depth=0, max_depth=6)

    seen: set[str] = set()
    kept: list[str] = []
    budget = max_words
    for line in lines:
        key = line.strip().lower()
        if not key or key in seen:
            continue
        seen.add(key)
        words = line.split()
        if len(words) > budget:
            # Keep only the part of this line that still fits, then stop.
            if budget > 0:
                kept.append(" ".join(words[:budget]))
            break
        kept.append(line)
        budget -= len(words)
    return "\n".join(kept)
def _walk(node: Any, out: list[str], depth: int, max_depth: int) -> None:
    """Append heading and text lines for `node` and its children to `out`.

    Dicts contribute a `## <name>` heading (preceded by an empty line) when
    one of `_OBJECT_NAME_FIELDS` holds a non-empty string, then the first
    non-empty `_OBJECT_DESC_FIELDS` value, then any of a fixed set of
    standalone text fields. All dict values and list items are visited
    recursively; every nesting step, lists included, counts towards
    `max_depth`. Other node types contribute nothing.

    Args:
        node: Current JSON value.
        out: Output list, mutated in place.
        depth: Nesting depth of `node`.
        max_depth: Nodes deeper than this are ignored.
    """
    if depth > max_depth:
        return

    if isinstance(node, list):
        for item in node:
            _walk(item, out, depth + 1, max_depth)
        return

    if not isinstance(node, dict):
        return

    name = _first_field(node, _OBJECT_NAME_FIELDS)
    desc = _first_field(node, _OBJECT_DESC_FIELDS)
    if name:
        out.append("")
        out.append(f"## {_clean(name)}")
    if desc:
        out.append(_clean(desc))

    # Common standalone fields
    for key in ("prologue", "epilogue", "subheading", "datacontroller",
                "expiresafter", "persistencedescription",
                "persistencepurposetext", "persistencepurposedescription"):
        val = _first_field(node, (key,))
        if val:
            out.append(_clean(val))

    # NOTE: vendorname/providername are members of _OBJECT_NAME_FIELDS, so
    # vendor/provider entries already get a heading above. A separate
    # "- <provider>" bullet guarded by "no name found" could never fire and
    # has been removed.

    for child in node.values():
        _walk(child, out, depth + 1, max_depth)
def _first_field(d: dict, field_names: tuple[str, ...]) -> str:
    """Look up `field_names` in order, ignoring key case, and return a hit.

    A hit is a value that is a string with non-whitespace content; it is
    returned unmodified. Returns "" when no candidate yields one.
    """
    original_key_for = {key.lower(): key for key in d.keys()}
    for wanted in field_names:
        original_key = original_key_for.get(wanted)
        if not original_key:
            continue
        candidate = d[original_key]
        if isinstance(candidate, str) and candidate.strip():
            return candidate
    return ""
# Compiled once at import time; both patterns are applied to every emitted line.
_TAG_RE = re.compile(r"<[^>]+>")
_WHITESPACE_RE = re.compile(r"\s+")


def _clean(text: str) -> str:
    """Strip HTML tags, decode character references, collapse whitespace.

    Tags are replaced by a space first. Character references are then
    decoded in a single pass with `html.unescape`, so named entities such as
    `&ouml;` are resolved and an escaped reference like `&amp;lt;` yields the
    literal text `&lt;` rather than being decoded twice. Finally every run of
    whitespace (including the non-breaking space from `&nbsp;`) becomes one
    space and the result is stripped.
    """
    without_tags = _TAG_RE.sub(" ", text)
    decoded = html.unescape(without_tags)
    return _WHITESPACE_RE.sub(" ", decoded).strip()