diff --git a/consent-tester/services/cmp_extractor.py b/consent-tester/services/cmp_extractor.py index 5077c8b4..d2ca2bcc 100644 --- a/consent-tester/services/cmp_extractor.py +++ b/consent-tester/services/cmp_extractor.py @@ -1,42 +1,35 @@ """ -CMP Extractor — capture Cookie-Policy data from Consent Management Platforms. +CMP Extractor — thin coordinator. -Many sites (BMW, Daimler, big enterprise) do NOT render their cookie policy as -static HTML. Instead, a JS widget loads structured data from a JSON endpoint -(BMW: ePaaS; OneTrust: /consent/.json; Cookiebot: /uc.js; Usercentrics: -/settings/.json) and renders it client-side after consent is given. +Captures CMP (Consent Management Platform) JSON payloads from network responses +during page navigation. Two-stage cascade: -This module sniffs network responses while Playwright loads the page and, if -a CMP JSON is captured, reconstructs the cookie policy text. That bypasses the -"the rendered HTML container is empty" problem entirely. + 1. Named CMP library (services/cmp_library/) — best quality, hand-written + reconstructors per vendor (ePaaS, OneTrust, Cookiebot, Usercentrics, + Didomi, TrustArc, + auto-promoted entries from Phase E). + 2. Generic JSON cookie-policy heuristic (cmp_heuristic.py) — catches + unknown CMPs that still expose a recognizable JSON shape. -Currently supported: - - ePaaS (BMW Group): policypage/.../.epaas.json - - OneTrust (placeholder): cdn.cookielaw.org/consent//.json - -Add more CMPs by extending `_MATCHERS` + a corresponding `_reconstruct_`. +A single CMP page may emit multiple payloads (e.g. consentcontroller + +policypage); all are captured and concatenated by reconstruct_cookie_policy(). 
""" from __future__ import annotations import json import logging -import re -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Callable if TYPE_CHECKING: from playwright.async_api import Page, Response +from services.cmp_library._registry import load_all as _load_registry + logger = logging.getLogger(__name__) - -# URL patterns that identify a CMP policy JSON. Order matters — first match wins. -_MATCHERS: list[tuple[str, re.Pattern[str]]] = [ - # BMW ePaaS: /epaas/prod/policypage///.epaas.json - # Use a tolerant pattern: any number of segments before .epaas.json - ("epaas", re.compile(r"/epaas/prod/policypage/.+\.epaas\.json(\?|$)", re.I)), - ("onetrust", re.compile(r"cdn\.cookielaw\.org/consent/[^/]+/[^/]+\.json", re.I)), -] +# Loaded once at import time. Restart consent-tester to pick up new modules. +_REGISTRY: list[tuple[str, object, Callable[[dict], str]]] = _load_registry() +logger.info("CMP library loaded: %d named matchers", len(_REGISTRY)) class CMPCapture: @@ -51,60 +44,67 @@ class CMPCapture: async def _on_response(self, response: Response) -> None: try: - url = response.url if response.status != 200: return + url = response.url - # 1) Named CMP matchers (highest quality) - for cmp_name, pattern in _MATCHERS: - if pattern.search(url): + # Stage 1: Named CMP matchers (highest quality) + for cmp_name, matcher, _reconstruct in _REGISTRY: + if matcher.search(url): # type: ignore[attr-defined] data = await _parse_json_response(response) if data is None: return self.payloads.append((cmp_name, data)) - logger.info("CMP captured: %s (%s, ~%dKB)", - cmp_name, url[:120], len(json.dumps(data)) // 1024) + logger.info( + "CMP captured: %s (%s, ~%dKB)", + cmp_name, url[:120], len(json.dumps(data)) // 1024, + ) return - # 2) Generic shape-based heuristic for unknown CMPs. - # Only consider JSON responses ≥1KB (skip small config blobs). + # Stage 2: Generic heuristic for unknown CMPs. + # Pre-filter: skip noisy/irrelevant endpoints to avoid spamming. 
content_type = (response.headers.get("content-type") or "").lower() if "json" not in content_type: return - # Cheap pre-filter: skip noisy paths (analytics, fonts, etc.) url_lower = url.lower() if any(skip in url_lower for skip in ( "/api/config", "/beacon", "/track", "/analytics", "/fonts/", "/log/", "/heartbeat", "/.well-known/", + "/intake/", "/collect", "/ping", "/metrics", )): return + data = await _parse_json_response(response) if data is None: return from services.cmp_heuristic import looks_like_cookie_policy if looks_like_cookie_policy(data): self.payloads.append(("_heuristic", data)) - logger.info("CMP captured: _heuristic (%s, ~%dKB)", - url[:120], len(json.dumps(data)) // 1024) + logger.info( + "CMP captured: _heuristic (%s, ~%dKB)", + url[:120], len(json.dumps(data)) // 1024, + ) except Exception as e: logger.debug("CMP listener error: %s", e) def reconstruct_cookie_policy(self) -> str: - """Build a single Cookie-Policy text from all captured payloads. - - Returns empty string if nothing was captured or reconstruction fails. - Named CMPs take precedence over the generic heuristic (richer output). - """ + """Build a single Cookie-Policy text from all captured payloads.""" from services.cmp_heuristic import reconstruct_generic + # Build a quick lookup so we can dispatch by name without re-loading + # the registry on every call. + by_name = {name: fn for name, _matcher, fn in _REGISTRY} + parts: list[str] = [] for cmp_name, data in self.payloads: try: - if cmp_name == "epaas": - parts.append(_reconstruct_epaas(data)) - elif cmp_name == "onetrust": - parts.append(_reconstruct_onetrust(data)) - elif cmp_name == "_heuristic": + if cmp_name == "_heuristic": + parts.append(reconstruct_generic(data)) + elif cmp_name in by_name: + parts.append(by_name[cmp_name](data)) + else: + # Unknown name (perhaps a hot-loaded module that was + # since removed) — fall back to generic. 
parts.append(reconstruct_generic(data)) except Exception as e: logger.warning("CMP %s reconstruction failed: %s", cmp_name, e) @@ -121,103 +121,3 @@ async def _parse_json_response(response: Response) -> dict | None: return json.loads(body.decode("utf-8", errors="ignore")) except Exception: return None - - -def _reconstruct_epaas(d: dict) -> str: - """Build a German Cookie-Policy from BMW ePaaS policy JSON. - - Schema (observed 2026-05): - - policyPageMetadata: { heading, subHeading, prologue, dataController, - epilogue, persistencePurposeText, expiresAfter, ... } - - categories: [ { id, name, description, ... } ] - - providers: [ { id, name, purpose, country, persistencePurposeDescription, ... } ] - """ - meta = d.get("policyPageMetadata", {}) or {} - parts: list[str] = ["# Cookie-Richtlinie"] - - for key in ("heading", "subHeading", "prologue", "dataController", "epilogue"): - val = meta.get(key) - if val: - parts.append("") - parts.append(_clean_html(str(val))) - - cats = d.get("categories", []) or [] - if cats: - parts.append("") - parts.append("## Cookie-Kategorien") - for c in cats: - name = c.get("name") or c.get("id") or "" - desc = c.get("description") or c.get("descriptionHtml") or "" - parts.append("") - parts.append(f"### {name}") - parts.append(_clean_html(str(desc))) - - providers = d.get("providers", []) or [] - if providers: - parts.append("") - parts.append(f"## Anbieter ({len(providers)})") - for p in providers: - name = p.get("name") or p.get("id") or "" - purpose = (p.get("purpose") or "").strip() - country = (p.get("country") or "").strip() - persistence = (p.get("persistencePurposeDescription") or "").strip() - line = f"- {name}" - if purpose: - line += f" — Zweck: {purpose}" - if country: - line += f" — Sitz: {country}" - if persistence: - line += f" — Speicherdauer: {persistence[:120]}" - parts.append(line) - - if meta.get("expiresAfter"): - parts.append("") - parts.append(f"Speicherdauer: {meta['expiresAfter']}") - if 
meta.get("persistencePurposeText"): - parts.append(_clean_html(str(meta["persistencePurposeText"]))) - - return "\n".join(parts) - - -def _reconstruct_onetrust(d: dict) -> str: - """Build a Cookie-Policy from OneTrust consent JSON. - - Schema varies; common fields: Groups[].GroupName/Description, Cookies[].Name. - """ - parts: list[str] = ["# Cookie-Richtlinie (OneTrust)"] - groups = d.get("Groups") or d.get("groups") or [] - for g in groups: - name = g.get("GroupName") or g.get("name") or "" - desc = g.get("GroupDescription") or g.get("description") or "" - parts.append("") - parts.append(f"## {name}") - parts.append(_clean_html(str(desc))) - cookies = g.get("Cookies") or g.get("cookies") or [] - for c in cookies[:50]: - cn = c.get("Name") or c.get("name") or "" - cp = c.get("Provider") or c.get("provider") or "" - cd = c.get("description") or c.get("Description") or "" - ce = c.get("Length") or c.get("expires") or "" - line = f"- {cn}" - if cp: - line += f" ({cp})" - if cd: - line += f" — {cd[:120]}" - if ce: - line += f" — Speicherdauer: {ce}" - parts.append(line) - return "\n".join(parts) - - -_TAG_RE = re.compile(r"<[^>]+>") -_WS_RE = re.compile(r"\s+") - - -def _clean_html(text: str) -> str: - """Strip HTML tags and collapse whitespace.""" - no_tags = _TAG_RE.sub(" ", text) - no_tags = (no_tags - .replace(" ", " ").replace("&", "&") - .replace("<", "<").replace(">", ">") - .replace(""", '"').replace("'", "'")) - return _WS_RE.sub(" ", no_tags).strip() diff --git a/consent-tester/services/cmp_library/__init__.py b/consent-tester/services/cmp_library/__init__.py new file mode 100644 index 00000000..e0a95982 --- /dev/null +++ b/consent-tester/services/cmp_library/__init__.py @@ -0,0 +1 @@ +"""Named CMP library — one module per platform, loaded by _registry.py.""" diff --git a/consent-tester/services/cmp_library/_registry.py b/consent-tester/services/cmp_library/_registry.py new file mode 100644 index 00000000..9ef6e944 --- /dev/null +++ 
# File: consent-tester/services/cmp_library/_registry.py
"""
CMP Library Registry — discovers and registers all CMP modules.

Each CMP module exports two module-level symbols:
  - MATCHER: a compiled regex (anything with a ``search(url)`` method)
    matching the JSON endpoint URL
  - reconstruct(data: dict) -> str: builds the cookie-policy text from JSON

The registry auto-discovers:
  1. Hand-written modules: epaas, onetrust, cookiebot, usercentrics,
     didomi, trustarc
  2. Auto-promoted modules: any file matching `auto_*.py` in this folder
     (created by Phase E when an LLM successfully discovers a new pattern)

Hand-written entries are always listed before auto-promoted ones, so a
consumer that stops at the first matching entry prefers the hand-written
reconstructor.

A consent-tester restart picks up new auto_*.py files automatically.
"""

import importlib
import logging
import pkgutil
from pathlib import Path
from typing import Callable

logger = logging.getLogger(__name__)

# (cmp_name, url_pattern, reconstruct_fn)
Registry = list[tuple[str, "object", Callable[[dict], str]]]

# File-name prefix that marks an auto-promoted module.
_AUTO_PREFIX = "auto_"


def load_all(package: str = "services.cmp_library") -> Registry:
    """Import every module of *package* and collect MATCHER + reconstruct.

    Args:
        package: Dotted name of the package to scan. Defaults to the CMP
            library package, which is what production callers use.

    Returns:
        A list of ``(module_name, MATCHER, reconstruct)`` tuples. Modules
        whose name starts with ``_`` are ignored. Modules that fail to
        import, lack ``MATCHER``/``reconstruct``, or whose ``MATCHER`` has
        no ``search`` method are skipped with a warning. Hand-written
        modules come first (in name order), auto-promoted ones last.
    """
    pkg = importlib.import_module(package)
    pkg_dir = Path(pkg.__file__).parent

    hand_written: Registry = []
    auto_promoted: Registry = []

    for module_info in pkgutil.iter_modules([str(pkg_dir)]):
        name = module_info.name
        if name.startswith("_"):
            continue

        try:
            module = importlib.import_module(f"{package}.{name}")
        except Exception as e:
            # Best effort: one broken module must not take the others down.
            logger.warning("CMP module %s failed to load: %s", name, e)
            continue

        matcher = getattr(module, "MATCHER", None)
        reconstruct = getattr(module, "reconstruct", None)
        if matcher is None or not callable(reconstruct):
            logger.warning(
                "CMP module %s missing MATCHER or reconstruct() — skipped", name,
            )
            continue
        # Consumers call matcher.search(url). Rejecting a non-pattern here
        # (e.g. a plain string) keeps one bad module from raising on every
        # lookup at request time.
        if not callable(getattr(matcher, "search", None)):
            logger.warning(
                "CMP module %s: MATCHER has no search() method — skipped", name,
            )
            continue

        # iter_modules yields names sorted, which would put "auto_*" ahead
        # of every hand-written module; keep the two groups apart so the
        # hand-written ones can be returned first.
        bucket = auto_promoted if name.startswith(_AUTO_PREFIX) else hand_written
        bucket.append((name, matcher, reconstruct))
        logger.info("CMP loaded: %s", name)

    return hand_written + auto_promoted
# File: consent-tester/services/cmp_library/cookiebot.py
"""Cookiebot (by Usercentrics A/S — separate product from Usercentrics CMP).

URLs (multiple shapes observed):
  - consent.cookiebot.com/<id>/cc.js   (JSONP-wrapped)
  - consent.cookiebot.com/uc.js?...    (JSONP)
  - consent.cookiebot.com/<id>/cd.js   (cookie declaration)
MATCHER accepts any .json/.js URL on consent.cookiebot.com. The capture
pipeline JSON-decodes the body; a JSONP response would need its callback
wrapper stripped first, so for now only direct JSON responses are usable.

Schema (cookiedeclaration JSON):
  Categories: list with name + cookies (each with name, vendor, expires)
"""

import re

MATCHER = re.compile(r"consent\.cookiebot\.com/.*\.(?:json|js)(?:\?|$)", re.I)


def _first_text(entry: dict, *keys: str) -> str:
    """Return the first truthy value among *keys* as a string, else ""."""
    for key in keys:
        value = entry.get(key)
        if value:
            return str(value)
    return ""


def _first_list(entry: dict, *keys: str) -> list[dict]:
    """Return the dict items of the first truthy value among *keys*.

    Anything that is not a list yields an empty list, and non-dict items
    are dropped, so callers can use ``.get`` on every element.
    """
    for key in keys:
        value = entry.get(key)
        if value:
            if isinstance(value, list):
                return [item for item in value if isinstance(item, dict)]
            return []
    return []


def reconstruct(d: dict) -> str:
    """Build the cookie-policy text from a Cookiebot declaration payload.

    Reads ``Categories``/``categories``; for each category its name,
    description and up to 50 cookies (name, vendor, purpose, expiry).
    Key lookups try the capitalised spelling first, then lower-case.

    Field values are coerced to ``str`` and malformed list entries are
    skipped, so one odd value does not discard the whole policy.

    Args:
        d: Decoded JSON object of the Cookiebot response.

    Returns:
        Markdown-style text starting with the Cookiebot heading.
    """
    parts: list[str] = ["# Cookie-Richtlinie (Cookiebot)"]

    for cat in _first_list(d, "Categories", "categories"):
        name = _first_text(cat, "Name", "name")
        desc = _first_text(cat, "Description", "description")
        parts.append("")
        parts.append(f"## {name}")
        if desc:
            parts.append(desc)

        # Cap the cookie list per category to keep the text bounded.
        for c in _first_list(cat, "Cookies", "cookies")[:50]:
            cn = _first_text(c, "Name", "name")
            vendor = _first_text(c, "Vendor", "vendor")
            expires = _first_text(c, "Expires", "expires")
            purpose = _first_text(c, "Purpose", "purpose")
            line = f"- {cn}"
            if vendor:
                line += f" ({vendor})"
            if purpose:
                line += f" — {purpose[:120]}"
            if expires:
                line += f" — Speicherdauer: {expires}"
            parts.append(line)

    return "\n".join(parts)
# File: consent-tester/services/cmp_library/didomi.py
"""Didomi CMP.

URLs:
  - sdk.privacy-center.org/<id>/notice/<id>.json
  - api.privacy-center.org/v1/notices/...
Schema: app.vendors[], app.purposes[], notice texts
"""

import re

MATCHER = re.compile(
    r"(?:sdk|api)\.privacy-center\.org/.+/notice[s]?/.*\.json(?:\?|$)", re.I,
)

# Languages tried first when a text field is a {language: text} mapping.
# The output headings are German, so German is preferred.
_PREFERRED_LANGS = ("de", "en")


def _text(value: object) -> str:
    """Render a payload value as text; falsy values become "".

    NOTE(review): some Didomi texts appear to arrive as {language: text}
    dicts rather than plain strings — confirm against real payloads. For
    such dicts the first non-empty string is used, preferring
    ``_PREFERRED_LANGS``.
    """
    if not value:
        return ""
    if isinstance(value, dict):
        candidates = [value.get(lang) for lang in _PREFERRED_LANGS]
        candidates.extend(value.values())
        return next((c for c in candidates if isinstance(c, str) and c), "")
    return str(value)


def _dict_items(value: object) -> list[dict]:
    """Return the dict elements of *value* if it is a list, else []."""
    if not isinstance(value, list):
        return []
    return [item for item in value if isinstance(item, dict)]


def reconstruct(d: dict) -> str:
    """Build the cookie-policy text from a Didomi notice payload.

    Reads the notice texts (``content``, ``title``, ``subtitle``), the
    purposes and up to 80 vendors. ``purposes``/``vendors`` are looked up
    under ``app`` first and at the top level second.

    Non-string field values no longer raise (the description used to be
    sliced directly), and non-dict containers or entries are ignored.

    Args:
        d: Decoded JSON object of the Didomi response.

    Returns:
        Markdown-style text starting with the Didomi heading.
    """
    parts: list[str] = ["# Cookie-Richtlinie (Didomi)"]

    # When there is no "app" key the payload itself is used as the app
    # object.
    app = d.get("app", d) or {}
    if not isinstance(app, dict):
        app = {}
    notice = d.get("notice", {}) or app.get("notice", {}) or {}
    if not isinstance(notice, dict):
        notice = {}

    for key in ("content", "title", "subtitle"):
        text = _text(notice.get(key))
        if text:
            parts.append("")
            parts.append(text)

    purposes = _dict_items(app.get("purposes") or d.get("purposes"))
    if purposes:
        parts.append("")
        parts.append("## Zwecke")
        for p in purposes:
            name = _text(p.get("name") or p.get("id"))
            desc = _text(p.get("description"))
            parts.append(f"- {name}: {desc[:200]}")

    vendors = _dict_items(app.get("vendors") or d.get("vendors"))
    if vendors:
        parts.append("")
        parts.append(f"## Anbieter ({len(vendors)})")
        # The heading shows the full count; only the first 80 are listed.
        for v in vendors[:80]:
            name = _text(v.get("name"))
            country = _text(v.get("country"))
            policy = _text(v.get("policyUrl"))
            line = f"- {name}"
            if country:
                line += f" — Sitz: {country}"
            if policy:
                line += f" — Datenschutz: {policy}"
            parts.append(line)

    return "\n".join(parts)
# File: consent-tester/services/cmp_library/epaas.py
"""BMW Group ePaaS (Enterprise Privacy as a Service).

URL: /epaas/prod/policypage/<path segments>/<name>.epaas.json
Schema: policyPageMetadata + categories + providers
"""

import html
import re

MATCHER = re.compile(r"/epaas/prod/policypage/.+\.epaas\.json(\?|$)", re.I)

_TAG_RE = re.compile(r"<[^>]+>")
_WS_RE = re.compile(r"\s+")


def _clean(text: str) -> str:
    """Strip HTML tags, decode character references, collapse whitespace.

    Tags are removed before decoding so that escaped markup such as
    ``&lt;b&gt;`` survives as literal text. ``html.unescape`` handles every
    named and numeric reference; a decoded non-breaking space is then
    folded into a normal space by the whitespace pass.
    """
    without_tags = _TAG_RE.sub(" ", text)
    return _WS_RE.sub(" ", html.unescape(without_tags)).strip()


def _first_text(entry: dict, *keys: str) -> str:
    """Return the first truthy value among *keys* as a string, else ""."""
    for key in keys:
        value = entry.get(key)
        if value:
            return str(value)
    return ""


def _dict_items(value: object) -> list[dict]:
    """Return the dict elements of *value* if it is a list, else []."""
    if not isinstance(value, list):
        return []
    return [item for item in value if isinstance(item, dict)]


def reconstruct(d: dict) -> str:
    """Build a German cookie-policy text from an ePaaS policy payload.

    Fields read:
      - policyPageMetadata: heading, subHeading, prologue, dataController,
        epilogue, expiresAfter, persistencePurposeText
      - categories[]: name (or id), description (or descriptionHtml)
      - providers[]: name (or id), purpose, country,
        persistencePurposeDescription (first 120 characters)

    Non-string provider fields are coerced to ``str`` before stripping,
    and non-dict list entries are skipped.

    Args:
        d: Decoded JSON object of the ePaaS response.

    Returns:
        Markdown-style text starting with ``# Cookie-Richtlinie``.
    """
    meta = d.get("policyPageMetadata", {}) or {}
    if not isinstance(meta, dict):
        meta = {}
    parts: list[str] = ["# Cookie-Richtlinie"]

    for key in ("heading", "subHeading", "prologue", "dataController", "epilogue"):
        val = meta.get(key)
        if val:
            parts.append("")
            parts.append(_clean(str(val)))

    cats = _dict_items(d.get("categories"))
    if cats:
        parts.append("")
        parts.append("## Cookie-Kategorien")
        for c in cats:
            name = _first_text(c, "name", "id")
            desc = _first_text(c, "description", "descriptionHtml")
            parts.append("")
            parts.append(f"### {name}")
            parts.append(_clean(desc))

    providers = _dict_items(d.get("providers"))
    if providers:
        parts.append("")
        parts.append(f"## Anbieter ({len(providers)})")
        for p in providers:
            name = _first_text(p, "name", "id")
            purpose = _first_text(p, "purpose").strip()
            country = _first_text(p, "country").strip()
            persistence = _first_text(p, "persistencePurposeDescription").strip()
            line = f"- {name}"
            if purpose:
                line += f" — Zweck: {purpose}"
            if country:
                line += f" — Sitz: {country}"
            if persistence:
                line += f" — Speicherdauer: {persistence[:120]}"
            parts.append(line)

    if meta.get("expiresAfter"):
        parts.append("")
        parts.append(f"Speicherdauer: {meta['expiresAfter']}")
    if meta.get("persistencePurposeText"):
        parts.append(_clean(str(meta["persistencePurposeText"])))

    return "\n".join(parts)
# File: consent-tester/services/cmp_library/onetrust.py
"""OneTrust Cookie Consent.

URL: cdn.cookielaw.org/consent/<id>/<name>.json
     (any two path segments after /consent/, the second ending in .json)
Schema: Groups[] with GroupName, GroupDescription, Cookies[]
"""

import html
import re

MATCHER = re.compile(r"cdn\.cookielaw\.org/consent/[^/]+/[^/]+\.json", re.I)

_TAG_RE = re.compile(r"<[^>]+>")
_WS_RE = re.compile(r"\s+")


def _clean(text: str) -> str:
    """Strip HTML tags, decode character references, collapse whitespace.

    Tags are removed before decoding so that escaped markup such as
    ``&lt;b&gt;`` survives as literal text. ``html.unescape`` covers all
    named and numeric references; a decoded non-breaking space is then
    folded into a normal space by the whitespace pass.
    """
    without_tags = _TAG_RE.sub(" ", text)
    return _WS_RE.sub(" ", html.unescape(without_tags)).strip()


def _first_text(entry: dict, *keys: str) -> str:
    """Return the first truthy value among *keys* as a string, else ""."""
    for key in keys:
        value = entry.get(key)
        if value:
            return str(value)
    return ""


def _first_list(entry: dict, *keys: str) -> list[dict]:
    """Return the dict items of the first truthy value among *keys*.

    Anything that is not a list yields an empty list, and non-dict items
    are dropped, so callers can use ``.get`` on every element.
    """
    for key in keys:
        value = entry.get(key)
        if value:
            if isinstance(value, list):
                return [item for item in value if isinstance(item, dict)]
            return []
    return []


def reconstruct(d: dict) -> str:
    """Build the cookie-policy text from a OneTrust consent payload.

    Reads the optional preamble fields (``Description``, ``PolicyText``,
    ``PolicyDescription``) and ``Groups``/``groups``; for each group its
    name, description and up to 50 cookies (name, provider, description,
    lifetime). Preamble and group descriptions are HTML-cleaned.

    Non-string cookie fields are coerced to ``str`` (the description used
    to be sliced directly), and non-dict list entries are skipped.

    Args:
        d: Decoded JSON object of the OneTrust response.

    Returns:
        Markdown-style text starting with the OneTrust heading.
    """
    parts: list[str] = ["# Cookie-Richtlinie (OneTrust)"]

    # Optional preamble fields
    for key in ("Description", "PolicyText", "PolicyDescription"):
        val = d.get(key)
        if val:
            parts.append("")
            parts.append(_clean(str(val)))

    for g in _first_list(d, "Groups", "groups"):
        name = _first_text(g, "GroupName", "name")
        desc = _first_text(g, "GroupDescription", "description")
        parts.append("")
        parts.append(f"## {name}")
        if desc:
            parts.append(_clean(desc))

        # Cap the cookie list per group to keep the text bounded.
        for c in _first_list(g, "Cookies", "cookies")[:50]:
            cn = _first_text(c, "Name", "name")
            cp = _first_text(c, "Provider", "provider")
            cd = _first_text(c, "description", "Description")
            ce = _first_text(c, "Length", "expires")
            line = f"- {cn}"
            if cp:
                line += f" ({cp})"
            if cd:
                line += f" — {cd[:120]}"
            if ce:
                line += f" — Speicherdauer: {ce}"
            parts.append(line)

    return "\n".join(parts)
# File: consent-tester/services/cmp_library/trustarc.py
"""TrustArc / TRUSTe CMP.

URLs:
  - consent.trustarc.com/v2/notice/<id>
  - cookie-pref.trustarc.com/...
Schema varies; typically categories[] + vendors[]
"""

import re

MATCHER = re.compile(
    r"(?:consent|cookie-pref|tr-cdn)\.trustarc\.com/.+\.(?:json|js)(?:\?|$)", re.I,
)


def _first_text(entry: dict, *keys: str) -> str:
    """Return the first truthy value among *keys* as a string, else ""."""
    for key in keys:
        value = entry.get(key)
        if value:
            return str(value)
    return ""


def _first_list(entry: dict, *keys: str) -> list[dict]:
    """Return the dict items of the first truthy value among *keys*.

    Anything that is not a list yields an empty list, and non-dict items
    are dropped, so callers can use ``.get`` on every element.
    """
    for key in keys:
        value = entry.get(key)
        if value:
            if isinstance(value, list):
                return [item for item in value if isinstance(item, dict)]
            return []
    return []


def reconstruct(d: dict) -> str:
    """Build the cookie-policy text from a TrustArc payload.

    Reads the optional intro fields (``title``, ``summary``,
    ``description``, ``intro``), ``categories`` with up to 50 cookies each
    (name, purpose, expiry) and up to 80 ``vendors`` (name only). Key
    lookups try the lower-case spelling first, then the capitalised one.

    Non-string cookie fields are coerced to ``str`` (the purpose used to
    be sliced directly), and non-dict list entries are skipped.

    Args:
        d: Decoded JSON object of the TrustArc response.

    Returns:
        Markdown-style text starting with the TrustArc heading.
    """
    parts: list[str] = ["# Cookie-Richtlinie (TrustArc)"]

    for key in ("title", "summary", "description", "intro"):
        v = d.get(key)
        if v:
            parts.append("")
            parts.append(str(v))

    for c in _first_list(d, "categories", "Categories"):
        name = _first_text(c, "name", "Name")
        desc = _first_text(c, "description", "Description")
        parts.append("")
        parts.append(f"## {name}")
        if desc:
            parts.append(desc)

        # Cap the cookie list per category to keep the text bounded.
        for ck in _first_list(c, "cookies", "Cookies")[:50]:
            cn = _first_text(ck, "name", "Name")
            cp = _first_text(ck, "purpose", "Purpose")
            ce = _first_text(ck, "expires", "Expires")
            line = f"- {cn}"
            if cp:
                line += f" — {cp[:120]}"
            if ce:
                line += f" — Speicherdauer: {ce}"
            parts.append(line)

    vendors = _first_list(d, "vendors", "Vendors")
    if vendors:
        parts.append("")
        parts.append(f"## Anbieter ({len(vendors)})")
        # The heading shows the full count; only the first 80 are listed.
        for v in vendors[:80]:
            parts.append(f"- {_first_text(v, 'name', 'Name')}")

    return "\n".join(parts)
# File: consent-tester/services/cmp_library/usercentrics.py
"""Usercentrics CMP.

URLs:
  - api.usercentrics.eu/settings/<id>/<name>.json
  - app.usercentrics.eu/api/...
Schema: services[] with dataProcessor, dataPurpose, cookieMaxAgeSeconds
"""

import re

MATCHER = re.compile(r"(?:api|app)\.usercentrics\.eu/.+\.json(?:\?|$)", re.I)

_SECONDS_PER_DAY = 86400


def _first_text(entry: dict, *keys: str) -> str:
    """Return the first truthy value among *keys* as a string, else ""."""
    for key in keys:
        value = entry.get(key)
        if value:
            return str(value)
    return ""


def _dict_items(value: object) -> list[dict]:
    """Return the dict elements of *value* if it is a list, else []."""
    if not isinstance(value, list):
        return []
    return [item for item in value if isinstance(item, dict)]


def _format_max_age(raw: object) -> str:
    """Render a ``cookieMaxAgeSeconds`` value as a "Speicherdauer" line.

    Integers are converted to whole days. Numeric strings and floats are
    converted the same way instead of being printed as raw seconds under
    a "Tage" label. Values that are not numeric are shown verbatim without
    a unit, since their unit is unknown.
    """
    if isinstance(raw, int):
        return f"Speicherdauer: {raw // _SECONDS_PER_DAY} Tage"
    try:
        days = int(float(raw) // _SECONDS_PER_DAY)
    except (TypeError, ValueError, OverflowError):
        return f"Speicherdauer: {raw}"
    return f"Speicherdauer: {days} Tage"


def reconstruct(d: dict) -> str:
    """Build the cookie-policy text from a Usercentrics settings payload.

    Services are taken from ``services``, else ``dataProcessingServices``,
    else ``settings.services``. For each service the name, description,
    purpose, country, cookie lifetime and retention text are emitted,
    followed by one section per entry in ``categories``.

    Text fields are coerced to ``str`` so a non-string description cannot
    break the final join, and non-dict list entries are skipped.

    Args:
        d: Decoded JSON object of the Usercentrics response.

    Returns:
        Markdown-style text starting with the Usercentrics heading.
    """
    parts: list[str] = ["# Cookie-Richtlinie (Usercentrics)"]

    services = d.get("services") or d.get("dataProcessingServices") or []
    if not services and isinstance(d.get("settings"), dict):
        services = d["settings"].get("services") or []

    for s in _dict_items(services):
        name = _first_text(s, "name", "dataProcessor")
        purpose = _first_text(s, "dataPurpose", "purpose")
        desc = _first_text(s, "description")
        country = _first_text(s, "processingCompanyCountry", "country")
        max_age = s.get("cookieMaxAgeSeconds")
        retention = _first_text(s, "retentionPeriodDescription")

        parts.append("")
        parts.append(f"## {name}")
        if desc:
            parts.append(desc)
        if purpose:
            parts.append(f"Zweck: {purpose}")
        if country:
            parts.append(f"Sitz: {country}")
        if max_age:
            parts.append(_format_max_age(max_age))
        if retention:
            parts.append(f"Aufbewahrung: {retention}")

    for c in _dict_items(d.get("categories")):
        name = _first_text(c, "name")
        desc = _first_text(c, "description")
        parts.append("")
        parts.append(f"## Kategorie: {name}")
        if desc:
            parts.append(desc)

    return "\n".join(parts)