"""CMP Extractor — capture Cookie-Policy data from Consent Management Platforms.

Many sites (BMW, Daimler, big enterprise) do NOT render their cookie policy as
static HTML. Instead, a JS widget loads structured data from a JSON endpoint
(e.g. BMW ePaaS, OneTrust, Cookiebot, Usercentrics) and renders it client-side
after consent is given.

This module sniffs network responses while Playwright loads the page and, if a
CMP JSON is captured, reconstructs the cookie policy text. That bypasses the
"the rendered HTML container is empty" problem entirely.

Currently supported:
- ePaaS (BMW Group):      /epaas/prod/policypage/<x>/<y>.epaas.json
- OneTrust (placeholder): cdn.cookielaw.org/consent/<x>/<y>.json

Add more CMPs by extending `_MATCHERS`, writing a corresponding
`_reconstruct_<cmp>` function, and dispatching to it in
`CMPCapture.reconstruct_cookie_policy`.
"""
from __future__ import annotations

import html
import json
import logging
import re
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from playwright.async_api import Page, Response

logger = logging.getLogger(__name__)

# URL patterns that identify a CMP policy JSON. Order matters — first match wins.
_MATCHERS: list[tuple[str, re.Pattern[str]]] = [
    ("epaas", re.compile(r"/epaas/prod/policypage/[^/]+/[^/]+\.epaas\.json", re.I)),
    ("onetrust", re.compile(r"cdn\.cookielaw\.org/consent/[^/]+/[^/]+\.json", re.I)),
]

# Output limits that keep the reconstructed text compact.
_MAX_DETAIL_CHARS = 120        # per-provider / per-cookie free-text snippet
_MAX_COOKIES_PER_GROUP = 50    # OneTrust cookies listed per group


class CMPCapture:
    """Holds CMP-related JSON payloads captured during navigation."""

    def __init__(self) -> None:
        # [(cmp_name, parsed_json), ...] in capture order.
        self.payloads: list[tuple[str, dict]] = []

    def attach(self, page: Page) -> None:
        """Hook the page's response event. Must be called BEFORE page.goto()."""
        page.on("response", self._on_response)

    async def _on_response(self, response: Response) -> None:
        """Store the parsed JSON body of a response whose URL matches a CMP pattern.

        Best-effort by design: this runs as an event listener, so every
        failure is logged or swallowed instead of being raised.
        """
        try:
            url = response.url
            for cmp_name, pattern in _MATCHERS:
                if not pattern.search(url):
                    continue
                if response.status != 200:
                    logger.info(
                        "CMP %s response %s (%d) — skipped",
                        cmp_name, url[:120], response.status,
                    )
                    return
                try:
                    data = await response.json()
                except Exception:
                    # Fall back to decoding the raw body ourselves. "utf-8-sig"
                    # drops a leading BOM, which json.loads() rejects in str input.
                    body = await response.body()
                    try:
                        data = json.loads(body.decode("utf-8-sig", errors="ignore"))
                    except ValueError:
                        return
                if not isinstance(data, dict):
                    # The reconstructors only understand JSON objects.
                    logger.debug(
                        "CMP %s payload is %s, not an object — skipped",
                        cmp_name, type(data).__name__,
                    )
                    return
                self.payloads.append((cmp_name, data))
                logger.info(
                    "CMP captured: %s (%s, ~%dKB)",
                    cmp_name, url[:120], len(json.dumps(data)) // 1024,
                )
                return  # first matching pattern wins
        except Exception as e:
            logger.debug("CMP listener error: %s", e)

    def reconstruct_cookie_policy(self) -> str:
        """Build a single Cookie-Policy text from all captured payloads.

        Returns empty string if nothing was captured or reconstruction fails.
        Payloads that fail are logged and skipped; the others are joined with
        a blank line between them.
        """
        parts: list[str] = []
        for cmp_name, data in self.payloads:
            try:
                if cmp_name == "epaas":
                    parts.append(_reconstruct_epaas(data))
                elif cmp_name == "onetrust":
                    parts.append(_reconstruct_onetrust(data))
            except Exception as e:
                logger.warning("CMP %s reconstruction failed: %s", cmp_name, e)
        return "\n\n".join(p for p in parts if p)


def _text(value: object) -> str:
    """Return *value* as a stripped string; falsy values become ""."""
    return str(value).strip() if value else ""


def _dict_items(value: object) -> list[dict]:
    """Return the dict entries of *value* if it is a list, otherwise []."""
    if not isinstance(value, list):
        return []
    return [item for item in value if isinstance(item, dict)]


def _reconstruct_epaas(d: dict) -> str:
    """Build a German Cookie-Policy from BMW ePaaS policy JSON.

    Schema (observed 2026-05):
    - policyPageMetadata: { heading, subHeading, prologue, dataController,
      epilogue, persistencePurposeText, expiresAfter, ... }
    - categories: [ { id, name, description, ... } ]
    - providers:  [ { id, name, purpose, country,
      persistencePurposeDescription, ... } ]

    Returns "" when the payload contains none of these fields, so that a
    bare heading is never mistaken for a policy.
    """
    meta = d.get("policyPageMetadata")
    if not isinstance(meta, dict):
        meta = {}

    parts: list[str] = ["# Cookie-Richtlinie"]

    for key in ("heading", "subHeading", "prologue", "dataController", "epilogue"):
        val = meta.get(key)
        if val:
            parts.append("")
            parts.append(_clean_html(str(val)))

    cats = _dict_items(d.get("categories"))
    if cats:
        parts.append("")
        parts.append("## Cookie-Kategorien")
        for c in cats:
            name = c.get("name") or c.get("id") or ""
            desc = c.get("description") or c.get("descriptionHtml") or ""
            parts.append("")
            parts.append(f"### {name}")
            parts.append(_clean_html(str(desc)))

    providers = _dict_items(d.get("providers"))
    if providers:
        parts.append("")
        parts.append(f"## Anbieter ({len(providers)})")
        for p in providers:
            name = p.get("name") or p.get("id") or ""
            # Coerce to str first: a non-string value must not abort the payload.
            purpose = _text(p.get("purpose"))
            country = _text(p.get("country"))
            persistence = _text(p.get("persistencePurposeDescription"))
            line = f"- {name}"
            if purpose:
                line += f" — Zweck: {purpose}"
            if country:
                line += f" — Sitz: {country}"
            if persistence:
                line += f" — Speicherdauer: {persistence[:_MAX_DETAIL_CHARS]}"
            parts.append(line)

    if meta.get("expiresAfter"):
        parts.append("")
        parts.append(f"Speicherdauer: {meta['expiresAfter']}")
    if meta.get("persistencePurposeText"):
        parts.append(_clean_html(str(meta["persistencePurposeText"])))

    # Only the title line means nothing usable was found.
    return "\n".join(parts) if len(parts) > 1 else ""


def _reconstruct_onetrust(d: dict) -> str:
    """Build a Cookie-Policy from OneTrust consent JSON.

    Schema varies; common fields: Groups[].GroupName/Description, Cookies[].Name.
    At most `_MAX_COOKIES_PER_GROUP` cookies are listed per group.

    Returns "" when the payload contains no groups.
    """
    parts: list[str] = ["# Cookie-Richtlinie (OneTrust)"]
    groups = _dict_items(d.get("Groups") or d.get("groups"))
    for g in groups:
        name = g.get("GroupName") or g.get("name") or ""
        desc = g.get("GroupDescription") or g.get("description") or ""
        parts.append("")
        parts.append(f"## {name}")
        parts.append(_clean_html(str(desc)))
        cookies = _dict_items(g.get("Cookies") or g.get("cookies"))
        for c in cookies[:_MAX_COOKIES_PER_GROUP]:
            cn = c.get("Name") or c.get("name") or ""
            cp = c.get("Provider") or c.get("provider") or ""
            # str() so that slicing cannot fail on a non-string description.
            cd = str(c.get("description") or c.get("Description") or "")
            ce = c.get("Length") or c.get("expires") or ""
            line = f"- {cn}"
            if cp:
                line += f" ({cp})"
            if cd:
                line += f" — {cd[:_MAX_DETAIL_CHARS]}"
            if ce:
                line += f" — Speicherdauer: {ce}"
            parts.append(line)

    # Only the title line means nothing usable was found.
    return "\n".join(parts) if len(parts) > 1 else ""


_TAG_RE = re.compile(r"<[^>]+>")
_WS_RE = re.compile(r"\s+")


def _clean_html(text: str) -> str:
    """Strip HTML tags, decode character references and collapse whitespace.

    Tags are removed before entities are decoded, so an escaped "&lt;b&gt;"
    survives as the literal text "<b>". Non-breaking spaces produced by
    "&nbsp;" are folded into ordinary spaces by the whitespace collapse.
    """
    no_tags = _TAG_RE.sub(" ", text)
    decoded = html.unescape(no_tags)
    return _WS_RE.sub(" ", decoded).strip()