From 8283483909360d11c41946a5fd067416a662f816 Mon Sep 17 00:00:00 2001
From: Benjamin Admin
Date: Sat, 16 May 2026 22:56:20 +0200
Subject: [PATCH] =?UTF-8?q?feat(consent-tester):=20Phase=20A=20=E2=80=94?=
 =?UTF-8?q?=20generic=20JSON=20cookie-policy=20heuristic?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

New module cmp_heuristic.py with:
- looks_like_cookie_policy(data): shape-based classifier (top-level keys
  cookies/categories/providers/vendors/purposes/cookieList/etc. holding
  at least 2 objects with a name-like field, or IAB TCF v2
  vendors[]+purposes[])
- reconstruct_generic(data): walks JSON, extracts name + description
  fields + standalone prologue/dataController/persistence fields, emits
  flat German Markdown text (max 5000 words, dedup)

cmp_extractor.py wired so that AFTER named CMP matchers (epaas, onetrust)
fail, every JSON response on the page is tested for the heuristic. If
matched, payload is captured as '_heuristic' kind and reconstructed via
the generic walker.

This is Phase A of the 4-stage cascade (B-D follow). Unknown CMPs that
return JSON now work without hand-coding each one.

Pre-filter: skips response URLs containing /api/config, /beacon, /track,
/analytics, /fonts/, /log/, /heartbeat, /.well-known/ to avoid running
the heuristic on every response of a Playwright page load.
---
 consent-tester/services/cmp_extractor.py |  55 +++++--
 consent-tester/services/cmp_heuristic.py | 205 +++++++++++++++++++++++
 2 files changed, 249 insertions(+), 11 deletions(-)
 create mode 100644 consent-tester/services/cmp_heuristic.py

diff --git a/consent-tester/services/cmp_extractor.py b/consent-tester/services/cmp_extractor.py
index 8bc4c2e4..5077c8b4 100644
--- a/consent-tester/services/cmp_extractor.py
+++ b/consent-tester/services/cmp_extractor.py
@@ -52,24 +52,40 @@ class CMPCapture:
     async def _on_response(self, response: Response) -> None:
         try:
             url = response.url
+            if response.status != 200:
+                return
+
+            # 1) Named CMP matchers (highest quality)
             for cmp_name, pattern in _MATCHERS:
                 if pattern.search(url):
-                    if response.status != 200:
-                        logger.info("CMP %s response %s (%d) — skipped",
-                                    cmp_name, url[:120], response.status)
+                    data = await _parse_json_response(response)
+                    if data is None:
                         return
-                    try:
-                        data = await response.json()
-                    except Exception:
-                        body = await response.body()
-                        try:
-                            data = json.loads(body.decode("utf-8", errors="ignore"))
-                        except Exception:
-                            return
                     self.payloads.append((cmp_name, data))
                     logger.info("CMP captured: %s (%s, ~%dKB)",
                                 cmp_name, url[:120], len(json.dumps(data)) // 1024)
                     return
+
+            # 2) Generic shape-based heuristic for unknown CMPs.
+            # Only responses served with a JSON content type are inspected.
+            content_type = (response.headers.get("content-type") or "").lower()
+            if "json" not in content_type:
+                return
+            # Cheap pre-filter: skip noisy paths (analytics, fonts, etc.)
+            url_lower = url.lower()
+            if any(skip in url_lower for skip in (
+                "/api/config", "/beacon", "/track", "/analytics",
+                "/fonts/", "/log/", "/heartbeat", "/.well-known/",
+            )):
+                return
+            data = await _parse_json_response(response)
+            if data is None:
+                return
+            from services.cmp_heuristic import looks_like_cookie_policy
+            if looks_like_cookie_policy(data):
+                self.payloads.append(("_heuristic", data))
+                logger.info("CMP captured: _heuristic (%s, ~%dKB)",
+                            url[:120], len(json.dumps(data)) // 1024)
         except Exception as e:
             logger.debug("CMP listener error: %s", e)
 
@@ -77,7 +93,10 @@ class CMPCapture:
         """Build a single Cookie-Policy text from all captured payloads.
 
         Returns empty string if nothing was captured or reconstruction fails.
+        Named CMPs take precedence over the generic heuristic (richer output).
         """
+        from services.cmp_heuristic import reconstruct_generic
+
         parts: list[str] = []
         for cmp_name, data in self.payloads:
             try:
@@ -85,11 +104,25 @@ class CMPCapture:
                     parts.append(_reconstruct_epaas(data))
                 elif cmp_name == "onetrust":
                     parts.append(_reconstruct_onetrust(data))
+                elif cmp_name == "_heuristic":
+                    parts.append(reconstruct_generic(data))
             except Exception as e:
                 logger.warning("CMP %s reconstruction failed: %s", cmp_name, e)
         return "\n\n".join(p for p in parts if p)
 
 
+async def _parse_json_response(response: Response) -> dict | None:
+    """Best-effort JSON parse from a Playwright Response."""
+    try:
+        return await response.json()
+    except Exception:
+        try:
+            body = await response.body()
+            return json.loads(body.decode("utf-8", errors="ignore"))
+        except Exception:
+            return None
+
+
 def _reconstruct_epaas(d: dict) -> str:
     """Build a German Cookie-Policy from BMW ePaaS policy JSON.
 
diff --git a/consent-tester/services/cmp_heuristic.py b/consent-tester/services/cmp_heuristic.py
new file mode 100644
index 00000000..4b98feab
--- /dev/null
+++ b/consent-tester/services/cmp_heuristic.py
@@ -0,0 +1,205 @@
+"""
+Generic Cookie-Policy JSON heuristic.
+
+When a CMP we don't know yet returns a JSON payload, we can still recognize
+"this JSON describes a cookie policy" by its shape. This module:
+
+1. `looks_like_cookie_policy(data)` — fast shape-based classifier
+2. `reconstruct_generic(data)` — walks the JSON, extracts every name/
+   description/purpose/expiry field and emits a flat German Markdown text
+
+The point: Phase A makes unknown CMPs work without hand-coding each one.
+The named library (Phase B) still takes priority because it produces nicer
+text, but the heuristic catches everything else.
+"""
+
+from __future__ import annotations
+
+import html
+import logging
+import re
+from typing import Any
+
+logger = logging.getLogger(__name__)
+
+
+# ── Shape classifier ────────────────────────────────────────────────
+
+# Keys whose presence strongly suggests "this JSON is a cookie policy".
+# We require at least ONE of these at top-level OR within first nesting.
+_SHAPE_KEYS = {
+    "cookies", "categories", "providers", "vendors", "purposes",
+    "cookielist", "cookiegroups", "consentcategories",
+    "cookiedeclaration", "groupedcookies", "groups",
+    "policy", "policypage", "policypagemetadata",
+}
+
+# Field names that mark a "category-like" or "vendor-like" object.
+_OBJECT_NAME_FIELDS = ("name", "title", "label", "displayname",
+                       "categoryname", "groupname", "vendorname",
+                       "cookiename", "providername")
+_OBJECT_DESC_FIELDS = ("description", "desc", "purpose", "zweck",
+                       "explanation", "info", "details",
+                       "groupdescription", "categorydescription",
+                       "vendordescription", "providerdescription",
+                       "descriptionhtml", "descriptiontext")
+
+# Pre-compiled patterns used by `_clean`.
+_TAG_RE = re.compile(r"<[^>]+>")
+_WS_RE = re.compile(r"\s+")
+
+
+def looks_like_cookie_policy(data: Any) -> bool:
+    """True when `data` shape strongly suggests a CMP cookie-policy payload.
+
+    Heuristic (any one is enough):
+      a) Top-level or first-nesting has one of `_SHAPE_KEYS` AND that key's
+         value is a list holding at least two dicts with a name-like field
+      b) IAB TCF v2 shape: top-level has `vendors` AND `purposes`, each a
+         list or an ID-keyed object with at least two entries
+    """
+    if not isinstance(data, dict):
+        return False
+
+    # Direct top-level match
+    if _has_cookie_policy_shape(data):
+        return True
+
+    # First nesting (some CMPs wrap in {"data": {...}} or similar)
+    for v in data.values():
+        if isinstance(v, dict) and _has_cookie_policy_shape(v):
+            return True
+
+    # IAB TCF v2 shape
+    return (_tcf_entry_count(data.get("vendors")) >= 2
+            and _tcf_entry_count(data.get("purposes")) >= 2)
+
+
+def _tcf_entry_count(value: Any) -> int:
+    """Count the entries of a TCF `vendors`/`purposes` collection.
+
+    The IAB Global Vendor List keys both collections by ID (JSON objects),
+    other CMPs ship plain lists. Lists count every item; objects count only
+    dict values so that flat id -> bool consent maps do not qualify.
+    """
+    if isinstance(value, list):
+        return len(value)
+    if isinstance(value, dict):
+        return sum(1 for entry in value.values() if isinstance(entry, dict))
+    return 0
+
+
+def _has_cookie_policy_shape(d: dict) -> bool:
+    """True when a `_SHAPE_KEYS` entry of `d` holds a list of named objects.
+
+    Keys are compared case-insensitively. A key qualifies when its value is
+    a list in which at least two dict entries carry one of
+    `_OBJECT_NAME_FIELDS`; description fields are not required.
+    """
+    lower_keys = {k.lower(): k for k in d}
+    matched = _SHAPE_KEYS & set(lower_keys)
+    if not matched:
+        return False
+
+    for low_key in matched:
+        val = d[lower_keys[low_key]]
+        if not isinstance(val, list) or len(val) < 2:
+            continue
+        well_formed = sum(
+            1 for entry in val
+            if isinstance(entry, dict)
+            and any(k.lower() in _OBJECT_NAME_FIELDS for k in entry)
+        )
+        if well_formed >= 2:
+            return True
+    return False
+
+
+# ── Reconstruction ───────────────────────────────────────────────────
+
+def reconstruct_generic(data: Any, max_words: int = 5000) -> str:
+    """Walk the JSON structure, extract names/descriptions/purposes, and emit
+    a flat German Markdown text suitable for the compliance regex checker.
+
+    Lines are de-duplicated case-insensitively (first occurrence wins) and
+    the output is limited to `max_words` words to avoid pathological
+    documents. Truncation keeps the line structure intact.
+    """
+    parts: list[str] = ["# Cookie-Richtlinie"]
+    _walk(data, parts, depth=0, max_depth=6)
+
+    # Strip duplicates that often slip in (translations, repeated values).
+    # The empty separator entries emitted by `_walk` are dropped here too.
+    seen: set[str] = set()
+    lines: list[str] = []
+    budget = max_words
+    for p in parts:
+        key = p.strip().lower()
+        if not key or key in seen:
+            continue
+        seen.add(key)
+        words = p.split()
+        if len(words) > budget:
+            # Word limit reached: keep what still fits of this line, stop.
+            if budget > 0:
+                lines.append(" ".join(words[:budget]))
+            break
+        lines.append(p)
+        budget -= len(words)
+
+    return "\n".join(lines)
+
+
+def _walk(node: Any, out: list[str], depth: int, max_depth: int) -> None:
+    """Append heading/description lines for `node` and its children to `out`.
+
+    Nodes deeper than `max_depth` are skipped; scalars are ignored.
+    """
+    if depth > max_depth:
+        return
+
+    if isinstance(node, dict):
+        # Emit name as heading, followed by the description when present.
+        # vendorname/providername are part of _OBJECT_NAME_FIELDS, so
+        # provider entries get their heading here as well.
+        name = _first_field(node, _OBJECT_NAME_FIELDS)
+        desc = _first_field(node, _OBJECT_DESC_FIELDS)
+        if name:
+            out.append("")
+            out.append(f"## {_clean(name)}")
+        if desc:
+            out.append(_clean(desc))
+
+        # Common standalone fields
+        for key in ("prologue", "epilogue", "subheading", "datacontroller",
+                    "expiresafter", "persistencedescription",
+                    "persistencepurposetext", "persistencepurposedescription"):
+            val = _first_field(node, (key,))
+            if val:
+                out.append(_clean(val))
+
+        # Recurse into all values
+        for v in node.values():
+            _walk(v, out, depth + 1, max_depth)
+
+    elif isinstance(node, list):
+        for item in node:
+            _walk(item, out, depth + 1, max_depth)
+
+
+def _first_field(d: dict, field_names: tuple[str, ...]) -> str:
+    """Return first non-empty string value matching any of field_names (case-insensitive)."""
+    lower_map = {k.lower(): k for k in d}
+    for f in field_names:
+        actual_key = lower_map.get(f)
+        if actual_key is not None:
+            v = d[actual_key]
+            if isinstance(v, str) and v.strip():
+                return v
+    return ""
+
+
+def _clean(text: str) -> str:
+    """Strip HTML tags, decode HTML entities and collapse whitespace."""
+    no_tags = _TAG_RE.sub(" ", text)
+    return _WS_RE.sub(" ", html.unescape(no_tags)).strip()