938f9a6c51
BMW ePaaS URLs use 3 segments between /policypage/ and .epaas.json: /epaas/prod/policypage/<tenant>/<config-hash>/<locale>.epaas.json The old pattern only matched 2 segments. Switch to a tolerant pattern that matches any path before .epaas.json (anchored at .epaas.json end).
191 lines
7.2 KiB
Python
191 lines
7.2 KiB
Python
"""
|
|
CMP Extractor — capture Cookie-Policy data from Consent Management Platforms.
|
|
|
|
Many sites (BMW, Daimler, big enterprise) do NOT render their cookie policy as
|
|
static HTML. Instead, a JS widget loads structured data from a JSON endpoint
|
|
(BMW: ePaaS; OneTrust: /consent/<id>.json; Cookiebot: /uc.js; Usercentrics:
|
|
/settings/<id>.json) and renders it client-side after consent is given.
|
|
|
|
This module sniffs network responses while Playwright loads the page and, if
|
|
a CMP JSON is captured, reconstructs the cookie policy text. That bypasses the
|
|
"the rendered HTML container is empty" problem entirely.
|
|
|
|
Currently supported:
|
|
- ePaaS (BMW Group): policypage/.../<locale>.epaas.json
|
|
- OneTrust (placeholder): cdn.cookielaw.org/consent/<id>/<id>.json
|
|
|
|
Add more CMPs by extending `_MATCHERS` + a corresponding `_reconstruct_<cmp>`.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import html
import json
import logging
import re
from typing import TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
    # Needed for annotations only; the guard keeps Playwright from being
    # imported at runtime by this module.
    from playwright.async_api import Page, Response

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# URL patterns that identify a CMP policy JSON. Order matters — first match wins.
|
|
_MATCHERS: list[tuple[str, re.Pattern[str]]] = [
|
|
# BMW ePaaS: /epaas/prod/policypage/<tenant>/<config>/<locale>.epaas.json
|
|
# Use a tolerant pattern: any number of segments before .epaas.json
|
|
("epaas", re.compile(r"/epaas/prod/policypage/.+\.epaas\.json(\?|$)", re.I)),
|
|
("onetrust", re.compile(r"cdn\.cookielaw\.org/consent/[^/]+/[^/]+\.json", re.I)),
|
|
]
|
|
|
|
|
|
class CMPCapture:
    """Collect CMP policy JSON payloads observed while a page loads.

    Call :meth:`attach` before navigating, then
    :meth:`reconstruct_cookie_policy` once loading has finished.
    """

    def __init__(self) -> None:
        # Captured payloads in arrival order: [(cmp_name, parsed_json), ...]
        self.payloads: list[tuple[str, dict]] = []

    def attach(self, page: Page) -> None:
        """Hook the page's response event. Must be called BEFORE page.goto()."""
        page.on("response", self._on_response)

    async def _on_response(self, response: Response) -> None:
        """Store the JSON body of *response* if its URL matches a known CMP.

        Only the first matching entry of ``_MATCHERS`` is considered. Non-200
        responses, bodies that are not valid JSON, and JSON documents whose
        top level is not an object are skipped and logged. No exception is
        allowed to escape this listener; unexpected errors are logged at
        debug level instead.
        """
        try:
            url = response.url
            cmp_name = next(
                (name for name, pattern in _MATCHERS if pattern.search(url)),
                None,
            )
            if cmp_name is None:
                return

            if response.status != 200:
                logger.info("CMP %s response %s (%d) — skipped",
                            cmp_name, url[:120], response.status)
                return

            try:
                data = await response.json()
            except Exception:
                # Fall back to decoding the raw bytes ourselves. "utf-8-sig"
                # drops a leading BOM (json.loads rejects a str that starts
                # with one); errors="ignore" drops undecodable bytes.
                raw = await response.body()
                try:
                    data = json.loads(raw.decode("utf-8-sig", errors="ignore"))
                except ValueError as exc:
                    logger.debug("CMP %s response %s is not valid JSON: %s",
                                 cmp_name, url[:120], exc)
                    return

            # The reconstruct helpers call .get() on the payload, so anything
            # other than a JSON object could never be turned into text.
            if not isinstance(data, dict):
                logger.info("CMP %s response %s is not a JSON object (%s) — skipped",
                            cmp_name, url[:120], type(data).__name__)
                return

            self.payloads.append((cmp_name, data))
            logger.info("CMP captured: %s (%s, ~%dKB)",
                        cmp_name, url[:120], len(json.dumps(data)) // 1024)
        except Exception as e:
            logger.debug("CMP listener error: %s", e)

    def reconstruct_cookie_policy(self) -> str:
        """Build a single Cookie-Policy text from all captured payloads.

        Each payload is rendered by the helper for its CMP; payloads with an
        unknown CMP name are ignored. A payload whose reconstruction raises is
        logged and skipped, so one bad payload does not discard the others.
        Non-empty results are joined with a blank line between them.

        Returns empty string if nothing was captured or reconstruction fails.
        """
        sections: list[str] = []
        for cmp_name, data in self.payloads:
            try:
                if cmp_name == "epaas":
                    text = _reconstruct_epaas(data)
                elif cmp_name == "onetrust":
                    text = _reconstruct_onetrust(data)
                else:
                    continue
            except Exception as e:
                logger.warning("CMP %s reconstruction failed: %s", cmp_name, e)
                continue
            if text:
                sections.append(text)
        return "\n\n".join(sections)
|
|
|
|
|
|
def _reconstruct_epaas(d: dict) -> str:
|
|
"""Build a German Cookie-Policy from BMW ePaaS policy JSON.
|
|
|
|
Schema (observed 2026-05):
|
|
- policyPageMetadata: { heading, subHeading, prologue, dataController,
|
|
epilogue, persistencePurposeText, expiresAfter, ... }
|
|
- categories: [ { id, name, description, ... } ]
|
|
- providers: [ { id, name, purpose, country, persistencePurposeDescription, ... } ]
|
|
"""
|
|
meta = d.get("policyPageMetadata", {}) or {}
|
|
parts: list[str] = ["# Cookie-Richtlinie"]
|
|
|
|
for key in ("heading", "subHeading", "prologue", "dataController", "epilogue"):
|
|
val = meta.get(key)
|
|
if val:
|
|
parts.append("")
|
|
parts.append(_clean_html(str(val)))
|
|
|
|
cats = d.get("categories", []) or []
|
|
if cats:
|
|
parts.append("")
|
|
parts.append("## Cookie-Kategorien")
|
|
for c in cats:
|
|
name = c.get("name") or c.get("id") or ""
|
|
desc = c.get("description") or c.get("descriptionHtml") or ""
|
|
parts.append("")
|
|
parts.append(f"### {name}")
|
|
parts.append(_clean_html(str(desc)))
|
|
|
|
providers = d.get("providers", []) or []
|
|
if providers:
|
|
parts.append("")
|
|
parts.append(f"## Anbieter ({len(providers)})")
|
|
for p in providers:
|
|
name = p.get("name") or p.get("id") or ""
|
|
purpose = (p.get("purpose") or "").strip()
|
|
country = (p.get("country") or "").strip()
|
|
persistence = (p.get("persistencePurposeDescription") or "").strip()
|
|
line = f"- {name}"
|
|
if purpose:
|
|
line += f" — Zweck: {purpose}"
|
|
if country:
|
|
line += f" — Sitz: {country}"
|
|
if persistence:
|
|
line += f" — Speicherdauer: {persistence[:120]}"
|
|
parts.append(line)
|
|
|
|
if meta.get("expiresAfter"):
|
|
parts.append("")
|
|
parts.append(f"Speicherdauer: {meta['expiresAfter']}")
|
|
if meta.get("persistencePurposeText"):
|
|
parts.append(_clean_html(str(meta["persistencePurposeText"])))
|
|
|
|
return "\n".join(parts)
|
|
|
|
|
|
def _reconstruct_onetrust(d: dict) -> str:
|
|
"""Build a Cookie-Policy from OneTrust consent JSON.
|
|
|
|
Schema varies; common fields: Groups[].GroupName/Description, Cookies[].Name.
|
|
"""
|
|
parts: list[str] = ["# Cookie-Richtlinie (OneTrust)"]
|
|
groups = d.get("Groups") or d.get("groups") or []
|
|
for g in groups:
|
|
name = g.get("GroupName") or g.get("name") or ""
|
|
desc = g.get("GroupDescription") or g.get("description") or ""
|
|
parts.append("")
|
|
parts.append(f"## {name}")
|
|
parts.append(_clean_html(str(desc)))
|
|
cookies = g.get("Cookies") or g.get("cookies") or []
|
|
for c in cookies[:50]:
|
|
cn = c.get("Name") or c.get("name") or ""
|
|
cp = c.get("Provider") or c.get("provider") or ""
|
|
cd = c.get("description") or c.get("Description") or ""
|
|
ce = c.get("Length") or c.get("expires") or ""
|
|
line = f"- {cn}"
|
|
if cp:
|
|
line += f" ({cp})"
|
|
if cd:
|
|
line += f" — {cd[:120]}"
|
|
if ce:
|
|
line += f" — Speicherdauer: {ce}"
|
|
parts.append(line)
|
|
return "\n".join(parts)
|
|
|
|
|
|
_TAG_RE = re.compile(r"<[^>]+>")
|
|
_WS_RE = re.compile(r"\s+")
|
|
|
|
|
|
def _clean_html(text: str) -> str:
|
|
"""Strip HTML tags and collapse whitespace."""
|
|
no_tags = _TAG_RE.sub(" ", text)
|
|
no_tags = (no_tags
|
|
.replace(" ", " ").replace("&", "&")
|
|
.replace("<", "<").replace(">", ">")
|
|
.replace(""", '"').replace("'", "'"))
|
|
return _WS_RE.sub(" ", no_tags).strip()
|