Reviewer note (free text ahead of the first file header; patch tools skip it):
  * This patch was recovered from a whitespace-collapsed paste. Indentation
    was reconstructed from syntax. Two spots could not be determined and need
    confirmation before applying: whether the persistencePurposeText check in
    _reconstruct_epaas is nested under the expiresAfter check (emitted here
    as a sibling), and the indentation / blank context lines of the
    dsi_discovery.py hunks.
  * _clean_html now decodes entities with html.unescape; the pasted
    replace-chain had its entity literals decoded into no-ops and an
    unterminated string literal.
  * URL placeholders in the module docstring were lost in the paste and are
    restored as {id} / {name}.
  * The index blob hash for cmp_extractor.py is carried over from the
    original patch and no longer matches the modified content.

diff --git a/consent-tester/services/cmp_extractor.py b/consent-tester/services/cmp_extractor.py
new file mode 100644
index 00000000..8b5a19e9
--- /dev/null
+++ b/consent-tester/services/cmp_extractor.py
@@ -0,0 +1,197 @@
+"""
+CMP Extractor — capture Cookie-Policy data from Consent Management Platforms.
+
+Many sites (BMW, Daimler, big enterprise) do NOT render their cookie policy as
+static HTML. Instead, a JS widget loads structured data from a JSON endpoint
+(BMW: ePaaS; OneTrust: /consent/{id}.json; Cookiebot: /uc.js; Usercentrics:
+/settings/{id}.json) and renders it client-side after consent is given.
+
+This module sniffs network responses while Playwright loads the page and, if
+a CMP JSON is captured, reconstructs the cookie policy text. That bypasses the
+"the rendered HTML container is empty" problem entirely.
+
+Currently supported:
+  - ePaaS (BMW Group): policypage/.../{name}.epaas.json
+  - OneTrust (placeholder): cdn.cookielaw.org/consent/{id}/{id}.json
+
+Add more CMPs by extending `_MATCHERS` + a corresponding `_reconstruct_{name}`.
+"""
+
+from __future__ import annotations
+
+import html
+import json
+import logging
+import re
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from playwright.async_api import Page, Response
+
+logger = logging.getLogger(__name__)
+
+
+# URL patterns that identify a CMP policy JSON. Order matters — first match wins.
+_MATCHERS: list[tuple[str, re.Pattern[str]]] = [
+    ("epaas", re.compile(r"/epaas/prod/policypage/[^/]+/[^/]+\.epaas\.json", re.I)),
+    ("onetrust", re.compile(r"cdn\.cookielaw\.org/consent/[^/]+/[^/]+\.json", re.I)),
+]
+
+
+class CMPCapture:
+    """Holds CMP-related JSON payloads captured during navigation."""
+
+    def __init__(self) -> None:
+        self.payloads: list[tuple[str, dict]] = []  # [(cmp_name, parsed_json), ...]
+
+    def attach(self, page: Page) -> None:
+        """Hook the page's response event. Must be called BEFORE page.goto()."""
+        page.on("response", self._on_response)
+
+    async def _on_response(self, response: Response) -> None:
+        """Store the parsed JSON body of a response whose URL matches `_MATCHERS`.
+
+        Non-200 responses and bodies that cannot be parsed as JSON are skipped.
+        Any exception is logged at debug level instead of being raised.
+        """
+        try:
+            url = response.url
+            for cmp_name, pattern in _MATCHERS:
+                if pattern.search(url):
+                    if response.status != 200:
+                        logger.info("CMP %s response %s (%d) — skipped",
+                                    cmp_name, url[:120], response.status)
+                        return
+                    try:
+                        data = await response.json()
+                    except Exception:
+                        body = await response.body()
+                        try:
+                            data = json.loads(body.decode("utf-8", errors="ignore"))
+                        except Exception:
+                            return
+                    self.payloads.append((cmp_name, data))
+                    logger.info("CMP captured: %s (%s, ~%dKB)",
+                                cmp_name, url[:120], len(json.dumps(data)) // 1024)
+                    return
+        except Exception as e:
+            logger.debug("CMP listener error: %s", e)
+
+    def reconstruct_cookie_policy(self) -> str:
+        """Build a single Cookie-Policy text from all captured payloads.
+
+        Returns empty string if nothing was captured or reconstruction fails.
+        """
+        parts: list[str] = []
+        for cmp_name, data in self.payloads:
+            try:
+                if cmp_name == "epaas":
+                    parts.append(_reconstruct_epaas(data))
+                elif cmp_name == "onetrust":
+                    parts.append(_reconstruct_onetrust(data))
+            except Exception as e:
+                logger.warning("CMP %s reconstruction failed: %s", cmp_name, e)
+        return "\n\n".join(p for p in parts if p)
+
+
+def _reconstruct_epaas(d: dict) -> str:
+    """Build a German Cookie-Policy from BMW ePaaS policy JSON.
+
+    Schema (observed 2026-05):
+      - policyPageMetadata: { heading, subHeading, prologue, dataController,
+                              epilogue, persistencePurposeText, expiresAfter, ... }
+      - categories: [ { id, name, description, ... } ]
+      - providers: [ { id, name, purpose, country, persistencePurposeDescription, ... } ]
+    """
+    meta = d.get("policyPageMetadata", {}) or {}
+    parts: list[str] = ["# Cookie-Richtlinie"]
+
+    for key in ("heading", "subHeading", "prologue", "dataController", "epilogue"):
+        val = meta.get(key)
+        if val:
+            parts.append("")
+            parts.append(_clean_html(str(val)))
+
+    cats = d.get("categories", []) or []
+    if cats:
+        parts.append("")
+        parts.append("## Cookie-Kategorien")
+        for c in cats:
+            name = c.get("name") or c.get("id") or ""
+            desc = c.get("description") or c.get("descriptionHtml") or ""
+            parts.append("")
+            parts.append(f"### {name}")
+            parts.append(_clean_html(str(desc)))
+
+    providers = d.get("providers", []) or []
+    if providers:
+        parts.append("")
+        parts.append(f"## Anbieter ({len(providers)})")
+        for p in providers:
+            name = p.get("name") or p.get("id") or ""
+            purpose = (p.get("purpose") or "").strip()
+            country = (p.get("country") or "").strip()
+            persistence = (p.get("persistencePurposeDescription") or "").strip()
+            line = f"- {name}"
+            if purpose:
+                line += f" — Zweck: {purpose}"
+            if country:
+                line += f" — Sitz: {country}"
+            if persistence:
+                line += f" — Speicherdauer: {persistence[:120]}"
+            parts.append(line)
+
+    if meta.get("expiresAfter"):
+        parts.append("")
+        parts.append(f"Speicherdauer: {meta['expiresAfter']}")
+    if meta.get("persistencePurposeText"):
+        parts.append(_clean_html(str(meta["persistencePurposeText"])))
+
+    return "\n".join(parts)
+
+
+def _reconstruct_onetrust(d: dict) -> str:
+    """Build a Cookie-Policy from OneTrust consent JSON.
+
+    Schema varies; common fields: Groups[].GroupName/Description, Cookies[].Name.
+    """
+    parts: list[str] = ["# Cookie-Richtlinie (OneTrust)"]
+    groups = d.get("Groups") or d.get("groups") or []
+    for g in groups:
+        name = g.get("GroupName") or g.get("name") or ""
+        desc = g.get("GroupDescription") or g.get("description") or ""
+        parts.append("")
+        parts.append(f"## {name}")
+        parts.append(_clean_html(str(desc)))
+        cookies = g.get("Cookies") or g.get("cookies") or []
+        for c in cookies[:50]:
+            cn = c.get("Name") or c.get("name") or ""
+            cp = c.get("Provider") or c.get("provider") or ""
+            cd = c.get("description") or c.get("Description") or ""
+            ce = c.get("Length") or c.get("expires") or ""
+            line = f"- {cn}"
+            if cp:
+                line += f" ({cp})"
+            if cd:
+                line += f" — {cd[:120]}"
+            if ce:
+                line += f" — Speicherdauer: {ce}"
+            parts.append(line)
+    return "\n".join(parts)
+
+
+_TAG_RE = re.compile(r"<[^>]+>")
+_WS_RE = re.compile(r"\s+")
+
+
+def _clean_html(text: str) -> str:
+    """Strip HTML tags, decode HTML entities and collapse whitespace.
+
+    Tags are removed first and entities decoded afterwards, so markup that
+    was entity-escaped in the payload survives as literal text. Decoding is
+    delegated to ``html.unescape``, which handles every named and numeric
+    entity in a single pass; a decoded no-break space (U+00A0) is then
+    folded into a plain space by the whitespace pass.
+    """
+    no_tags = _TAG_RE.sub(" ", text)
+    return _WS_RE.sub(" ", html.unescape(no_tags)).strip()
diff --git a/consent-tester/services/dsi_discovery.py b/consent-tester/services/dsi_discovery.py
index bf170f66..304c2425 100644
--- a/consent-tester/services/dsi_discovery.py
+++ b/consent-tester/services/dsi_discovery.py
@@ -24,6 +24,7 @@ from urllib.parse import urlparse, urljoin
 from playwright.async_api import Page
 
 from services.dsi_helpers import goto_resilient, try_dismiss_consent_banner, is_pdf_redirect
+from services.cmp_extractor import CMPCapture
 
 logger = logging.getLogger(__name__)
 
@@ -221,6 +222,11 @@ async def discover_dsi_documents(
     seen_urls: set[str] = set()
     seen_titles: set[str] = set()
 
+    # CMP capture must be wired BEFORE navigation so we catch the JSON requests
+    # that fire as soon as the consent widget initializes (e.g. BMW ePaaS).
+    cmp_capture = CMPCapture()
+    cmp_capture.attach(page)
+
     try:
         # Step 1: Load the page (with networkidle → domcontentloaded fallback)
         await goto_resilient(page, url, timeout=60000)
@@ -302,6 +308,22 @@ async def discover_dsi_documents(
                 self_wc = len(self_text.split())
                 logger.info("Self-extraction via iframe for %s: %d words", url, self_wc)
 
+            # If the rendered DOM is still short, the page is likely a
+            # JS-injected CMP widget (BMW ePaaS, OneTrust Cookie List).
+            # Use the JSON we captured from network responses instead —
+            # that's the structured source the widget would have rendered.
+            # We also prefer CMP data over thin DOM extraction (< 300 words)
+            # because thin DOM = mostly site navigation, not policy.
+            if self_wc < 300 and cmp_capture.payloads:
+                cmp_text = cmp_capture.reconstruct_cookie_policy()
+                cmp_wc = len(cmp_text.split()) if cmp_text else 0
+                if cmp_wc > self_wc:
+                    self_text = cmp_text
+                    self_wc = cmp_wc
+                    logger.info("Self-extraction via CMP capture for %s: %d words "
+                                "(%d CMP payloads)", url, self_wc,
+                                len(cmp_capture.payloads))
+
             if self_wc >= 100:
                 page_title = await page.title() or url
                 result.documents.append(DiscoveredDSI(