feat(consent-tester): Phase B — named CMP library + plugin architecture

cmp_extractor.py refactored to thin coordinator (123 LOC, was 223).
Discovers all CMP modules via cmp_library/_registry.py:load_all() at
import time. Restart consent-tester to pick up new modules.

New cmp_library/ folder:
- _registry.py: auto-discovers all modules with MATCHER + reconstruct()
- epaas.py:     BMW Group ePaaS (extracted from cmp_extractor)
- onetrust.py:  cdn.cookielaw.org Groups/Cookies schema
- cookiebot.py: consent.cookiebot.com Categories schema
- usercentrics.py: api.usercentrics.eu services schema
- didomi.py:    sdk.privacy-center.org notice + vendors + purposes
- trustarc.py:  consent.trustarc.com categories + vendors

Each module:
- MATCHER: re.Pattern matching the CMP JSON endpoint URL
- reconstruct(d: dict) -> str: builds German Markdown cookie-policy text

Phase E (self-improving) will write auto_*.py files into the same folder;
_registry already picks those up via pkgutil.iter_modules.
This commit is contained in:
Benjamin Admin
2026-05-16 22:59:48 +02:00
parent 4f19310130
commit 7e426c31f1
9 changed files with 427 additions and 144 deletions
@@ -0,0 +1 @@
"""Named CMP library — one module per platform, loaded by _registry.py."""
@@ -0,0 +1,55 @@
"""
CMP Library Registry — discovers and registers all CMP modules.
Each CMP module exports two module-level symbols:
- MATCHER: a compiled regex matching the JSON endpoint URL
- reconstruct(data: dict) -> str: builds the cookie-policy text from JSON
The registry auto-discovers:
1. Hand-written modules: epaas, onetrust, cookiebot, usercentrics,
didomi, trustarc
2. Auto-promoted modules: any file matching `auto_*.py` in this folder
(created by Phase E when an LLM successfully discovers a new pattern)
A consent-tester restart picks up new auto_*.py files automatically.
"""
from __future__ import annotations
import importlib
import logging
import pkgutil
from pathlib import Path
from typing import Callable
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# One registry entry per CMP module: (cmp_name, url_pattern, reconstruct_fn).
# The pattern slot is typed loosely as "object"; load_all() only checks that a
# module's MATCHER is not None, not that it is a compiled regex.
Registry = list[tuple[str, "object", Callable[[dict], str]]]
def load_all() -> Registry:
    """Collect ``(name, MATCHER, reconstruct)`` from every CMP module.

    Walks the directory of the ``services.cmp_library`` package, skips modules
    whose name starts with an underscore, and imports the rest.  A module is
    registered only when it exposes a non-``None`` ``MATCHER`` and a callable
    ``reconstruct``; otherwise a warning is logged.  Any exception raised
    while handling a module is logged as a warning and the module is skipped.

    Returns:
        The list of registered entries, in the order the modules were
        discovered.
    """
    import services.cmp_library as pkg  # type: ignore[import-not-found]

    package_dir = Path(pkg.__file__).parent
    discovered: Registry = []
    for info in pkgutil.iter_modules([str(package_dir)]):
        mod_name = info.name
        if mod_name.startswith("_"):
            # Private helpers (such as this registry) are not CMP modules.
            continue
        try:
            mod = importlib.import_module(f"services.cmp_library.{mod_name}")
            url_pattern = getattr(mod, "MATCHER", None)
            builder = getattr(mod, "reconstruct", None)
            if url_pattern is not None and callable(builder):
                discovered.append((mod_name, url_pattern, builder))
                logger.info("CMP loaded: %s", mod_name)
            else:
                logger.warning(
                    "CMP module %s missing MATCHER or reconstruct() — skipped", mod_name,
                )
        except Exception as exc:
            # One broken module must not prevent the others from loading.
            logger.warning("CMP module %s failed to load: %s", mod_name, exc)
    return discovered
@@ -0,0 +1,46 @@
"""Cookiebot (by Usercentrics A/S — separate product from Usercentrics CMP).
URLs (multiple shapes observed):
- consent.cookiebot.com/<id>/cc.js (JSONP-wrapped)
- consent.cookiebot.com/uc.js?... (JSONP)
- consent.cookiebot.com/<id>/cd.js (cookie declaration)
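MATCHER accepts any consent.cookiebot.com URL whose path ends in .json or .js.
The capture pipeline JSON-decodes the response; a JSONP response would first
need its callback wrapper stripped, which this module does not do. In practice
only direct JSON responses are usable, although the pattern also matches .js URLs.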
Schema (cookiedeclaration JSON):
Categories: list with name + cookies (each with name, vendor, expires)
"""
import re
# Cookiebot endpoint pattern: any path on consent.cookiebot.com ending in
# ".json" or ".js", optionally followed by a query string (case-insensitive).
MATCHER = re.compile(
    r"consent\.cookiebot\.com/.*\.(?:json|js)(?:\?|$)",
    flags=re.IGNORECASE,
)
def reconstruct(d: dict) -> str:
    """Build the German Markdown cookie-policy text from Cookiebot JSON.

    Reads ``Categories`` (or ``categories``) and, per category, at most 50
    entries of ``Cookies`` (or ``cookies``).  Each key is looked up in its
    capitalised form first, then in lower case.

    Args:
        d: Decoded JSON document captured from the Cookiebot endpoint.

    Returns:
        Newline-joined Markdown: a fixed heading, one ``##`` section per
        category and one bullet per cookie.
    """
    parts: list[str] = ["# Cookie-Richtlinie (Cookiebot)"]
    cats = d.get("Categories") or d.get("categories") or []
    for cat in cats:
        if not isinstance(cat, dict):
            # Third-party JSON: ignore entries that are not objects.
            continue
        name = cat.get("Name") or cat.get("name") or ""
        desc = cat.get("Description") or cat.get("description") or ""
        parts.append("")
        parts.append(f"## {name}")
        if desc:
            parts.append(str(desc))
        cookies = cat.get("Cookies") or cat.get("cookies") or []
        if not isinstance(cookies, list):
            continue
        for c in cookies[:50]:
            if not isinstance(c, dict):
                continue
            cn = c.get("Name") or c.get("name") or ""
            vendor = c.get("Vendor") or c.get("vendor") or ""
            expires = c.get("Expires") or c.get("expires") or ""
            purpose = c.get("Purpose") or c.get("purpose") or ""
            line = f"- {cn}"
            if vendor:
                line += f" ({vendor})"
            if purpose:
                # Separate the purpose from the name/vendor; without the
                # separator the two ran together in the output.
                line += f" — Zweck: {str(purpose)[:120]}"
            if expires:
                line += f" — Speicherdauer: {expires}"
            parts.append(line)
    return "\n".join(parts)
@@ -0,0 +1,51 @@
"""Didomi CMP.
URLs:
- sdk.privacy-center.org/<id>/notice/<lang>.json
- api.privacy-center.org/v1/notices/...
Schema: app.vendors[], app.purposes[], notice texts
"""
import re
# Didomi endpoint pattern: sdk./api.privacy-center.org, at least one path
# segment, then "/notice/" or "/notices/", ending in ".json" with an optional
# query string (case-insensitive).
_DIDOMI_URL = r"(?:sdk|api)\.privacy-center\.org/.+/notice[s]?/.*\.json(?:\?|$)"
MATCHER = re.compile(_DIDOMI_URL, flags=re.IGNORECASE)
def reconstruct(d: dict) -> str:
    """Render Didomi notice JSON as German Markdown cookie-policy text.

    The ``app`` object is used when present, otherwise the document itself.
    Notice texts (``content``, ``title``, ``subtitle``) come first, followed
    by a "Zwecke" list of all purposes and an "Anbieter" list limited to the
    first 80 vendors.

    Args:
        d: Decoded JSON document captured from the Didomi endpoint.

    Returns:
        The Markdown lines joined with newlines.
    """
    lines: list[str] = ["# Cookie-Richtlinie (Didomi)"]
    app_section = d.get("app", d) or {}
    notice_block = d.get("notice", {}) or app_section.get("notice", {}) or {}

    for field in ("content", "title", "subtitle"):
        text = notice_block.get(field)
        if not text:
            continue
        lines.extend(("", str(text)))

    purpose_list = app_section.get("purposes") or d.get("purposes") or []
    if purpose_list:
        lines.extend(("", "## Zwecke"))
        for purpose in purpose_list:
            label = purpose.get("name") or purpose.get("id") or ""
            detail = purpose.get("description") or ""
            lines.append(f"- {label}: {detail[:200]}")

    vendor_list = app_section.get("vendors") or d.get("vendors") or []
    if vendor_list:
        lines.extend(("", f"## Anbieter ({len(vendor_list)})"))
        for vendor in vendor_list[:80]:
            vendor_name = vendor.get("name") or ""
            seat = vendor.get("country") or ""
            policy_url = vendor.get("policyUrl") or ""
            segments = [f"- {vendor_name}"]
            if seat:
                segments.append(f" — Sitz: {seat}")
            if policy_url:
                segments.append(f" — Datenschutz: {policy_url}")
            lines.append("".join(segments))

    return "\n".join(lines)
@@ -0,0 +1,69 @@
"""BMW Group ePaaS (Enterprise Privacy as a Service).
URL: /epaas/prod/policypage/<tenant>/<config>/<locale>.epaas.json
Schema: policyPageMetadata + categories + providers
"""
import re
# ePaaS endpoint pattern: a path under /epaas/prod/policypage/ ending in
# ".epaas.json", optionally followed by a query string (case-insensitive).
MATCHER = re.compile(
    r"/epaas/prod/policypage/.+\.epaas\.json(\?|$)",
    flags=re.IGNORECASE,
)
# Anything that looks like an HTML tag, and any run of whitespace.
_TAG_RE = re.compile(r"<[^>]+>")
_WS_RE = re.compile(r"\s+")


def _clean(text: str) -> str:
    """Strip HTML tags, decode character references and collapse whitespace.

    Args:
        text: Raw (possibly HTML) text from the ePaaS document.

    Returns:
        Plain text with single spaces and no leading/trailing whitespace.
    """
    import html  # local import: this module's header only imports ``re``

    without_tags = _TAG_RE.sub(" ", text)
    # Decode in one pass.  Chained str.replace() calls would decode twice
    # ("&amp;lt;" -> "&lt;" -> "<") and miss entities such as "&uuml;".
    # "&nbsp;" decodes to U+00A0, which \s matches, so it still collapses
    # to a plain space below.
    decoded = html.unescape(without_tags)
    return _WS_RE.sub(" ", decoded).strip()


def reconstruct(d: dict) -> str:
    """Build the German Markdown cookie-policy text from an ePaaS document.

    Sections, in order: the ``policyPageMetadata`` texts (heading, subHeading,
    prologue, dataController, epilogue), the cookie categories, the provider
    list, and finally the metadata retention fields.

    Args:
        d: Decoded ``*.epaas.json`` document.

    Returns:
        The Markdown lines joined with newlines.
    """
    meta = d.get("policyPageMetadata", {}) or {}
    parts: list[str] = ["# Cookie-Richtlinie"]
    for key in ("heading", "subHeading", "prologue", "dataController", "epilogue"):
        val = meta.get(key)
        if val:
            parts.append("")
            parts.append(_clean(str(val)))

    cats = d.get("categories", []) or []
    if cats:
        parts.append("")
        parts.append("## Cookie-Kategorien")
        for c in cats:
            name = c.get("name") or c.get("id") or ""
            desc = c.get("description") or c.get("descriptionHtml") or ""
            parts.append("")
            parts.append(f"### {name}")
            parts.append(_clean(str(desc)))

    providers = d.get("providers", []) or []
    if providers:
        parts.append("")
        parts.append(f"## Anbieter ({len(providers)})")
        for p in providers:
            name = p.get("name") or p.get("id") or ""
            purpose = (p.get("purpose") or "").strip()
            country = (p.get("country") or "").strip()
            persistence = (p.get("persistencePurposeDescription") or "").strip()
            line = f"- {name}"
            if purpose:
                line += f" — Zweck: {purpose}"
            if country:
                line += f" — Sitz: {country}"
            if persistence:
                line += f" — Speicherdauer: {persistence[:120]}"
            parts.append(line)

    if meta.get("expiresAfter"):
        parts.append("")
        parts.append(f"Speicherdauer: {meta['expiresAfter']}")
    if meta.get("persistencePurposeText"):
        parts.append(_clean(str(meta["persistencePurposeText"])))
    return "\n".join(parts)
@@ -0,0 +1,56 @@
"""OneTrust Cookie Consent.
URL: cdn.cookielaw.org/consent/<id>/<id>.json
OR cdn.cookielaw.org/consent/<id>/<lang>.json
Schema: Groups[] with GroupName, GroupDescription, Cookies[]
"""
import re
# OneTrust endpoint pattern: cdn.cookielaw.org/consent/<segment>/<file>.json
# (case-insensitive).  Unlike a pattern ending in "$", this one is not
# anchored, so anything may follow ".json".
MATCHER = re.compile(
    r"cdn\.cookielaw\.org/consent/[^/]+/[^/]+\.json",
    flags=re.IGNORECASE,
)
# Anything that looks like an HTML tag, and any run of whitespace.
_TAG_RE = re.compile(r"<[^>]+>")
_WS_RE = re.compile(r"\s+")


def _clean(text: str) -> str:
    """Strip HTML tags, decode character references and collapse whitespace.

    Args:
        text: Raw (possibly HTML) text from the OneTrust document.

    Returns:
        Plain text with single spaces and no leading/trailing whitespace.
    """
    import html  # local import: this module's header only imports ``re``

    without_tags = _TAG_RE.sub(" ", text)
    # html.unescape covers every character reference, not just "&nbsp;" and
    # "&amp;".  "&nbsp;" decodes to U+00A0, which \s matches, so it still
    # collapses to a plain space below.
    decoded = html.unescape(without_tags)
    return _WS_RE.sub(" ", decoded).strip()


def reconstruct(d: dict) -> str:
    """Build the German Markdown cookie-policy text from OneTrust JSON.

    Emits the optional preamble fields (``Description``, ``PolicyText``,
    ``PolicyDescription``), then one ``##`` section per entry of ``Groups``
    (or ``groups``) with at most 50 cookie bullets each.

    Args:
        d: Decoded JSON document captured from the OneTrust endpoint.

    Returns:
        The Markdown lines joined with newlines.
    """
    parts: list[str] = ["# Cookie-Richtlinie (OneTrust)"]
    # Optional preamble fields
    for key in ("Description", "PolicyText", "PolicyDescription"):
        val = d.get(key)
        if val:
            parts.append("")
            parts.append(_clean(str(val)))

    groups = d.get("Groups") or d.get("groups") or []
    for g in groups:
        if not isinstance(g, dict):
            # Third-party JSON: ignore entries that are not objects.
            continue
        name = g.get("GroupName") or g.get("name") or ""
        desc = g.get("GroupDescription") or g.get("description") or ""
        parts.append("")
        parts.append(f"## {name}")
        if desc:
            parts.append(_clean(str(desc)))
        cookies = g.get("Cookies") or g.get("cookies") or []
        if not isinstance(cookies, list):
            continue
        for c in cookies[:50]:
            if not isinstance(c, dict):
                continue
            cn = c.get("Name") or c.get("name") or ""
            cp = c.get("Provider") or c.get("provider") or ""
            cd = c.get("description") or c.get("Description") or ""
            ce = c.get("Length") or c.get("expires") or ""
            line = f"- {cn}"
            if cp:
                line += f" ({cp})"
            if cd:
                # Separate the description from the name/provider; without
                # the separator the two ran together in the output.
                line += f" — {str(cd)[:120]}"
            if ce:
                line += f" — Speicherdauer: {ce}"
            parts.append(line)
    return "\n".join(parts)
@@ -0,0 +1,53 @@
"""TrustArc / TRUSTe CMP.
URLs:
- consent.trustarc.com/v2/notice/<id>
- cookie-pref.trustarc.com/...
Schema varies; typically categories[] + vendors[]
"""
import re
# TrustArc endpoint pattern: consent./cookie-pref./tr-cdn.trustarc.com with a
# path ending in ".json" or ".js", optionally followed by a query string
# (case-insensitive).
# NOTE(review): an extension-less URL such as
# consent.trustarc.com/v2/notice/<id> does not match this pattern — confirm
# whether that shape should be captured.
_TRUSTARC_URL = r"(?:consent|cookie-pref|tr-cdn)\.trustarc\.com/.+\.(?:json|js)(?:\?|$)"
MATCHER = re.compile(_TRUSTARC_URL, flags=re.IGNORECASE)
def reconstruct(d: dict) -> str:
    """Build the German Markdown cookie-policy text from TrustArc JSON.

    Emits the optional intro fields (``title``, ``summary``, ``description``,
    ``intro``), one ``##`` section per category with at most 50 cookie
    bullets, and an "Anbieter" list limited to the first 80 vendors.  Keys are
    looked up in lower case first, then capitalised.

    Args:
        d: Decoded JSON document captured from the TrustArc endpoint.

    Returns:
        The Markdown lines joined with newlines.
    """
    parts: list[str] = ["# Cookie-Richtlinie (TrustArc)"]
    for key in ("title", "summary", "description", "intro"):
        v = d.get(key)
        if v:
            parts.append("")
            parts.append(str(v))

    cats = d.get("categories") or d.get("Categories") or []
    for c in cats:
        if not isinstance(c, dict):
            # Third-party JSON: ignore entries that are not objects.
            continue
        name = c.get("name") or c.get("Name") or ""
        desc = c.get("description") or c.get("Description") or ""
        parts.append("")
        parts.append(f"## {name}")
        if desc:
            parts.append(str(desc))
        cookies = c.get("cookies") or c.get("Cookies") or []
        if not isinstance(cookies, list):
            continue
        for ck in cookies[:50]:
            if not isinstance(ck, dict):
                continue
            cn = ck.get("name") or ck.get("Name") or ""
            cp = ck.get("purpose") or ck.get("Purpose") or ""
            ce = ck.get("expires") or ck.get("Expires") or ""
            line = f"- {cn}"
            if cp:
                # Separate the purpose from the cookie name; without the
                # separator the two ran together in the output.
                line += f" — Zweck: {str(cp)[:120]}"
            if ce:
                line += f" — Speicherdauer: {ce}"
            parts.append(line)

    vendors = d.get("vendors") or d.get("Vendors") or []
    if vendors and isinstance(vendors, list):
        parts.append("")
        parts.append(f"## Anbieter ({len(vendors)})")
        for v in vendors[:80]:
            if not isinstance(v, dict):
                continue
            name = v.get("name") or v.get("Name") or ""
            parts.append(f"- {name}")
    return "\n".join(parts)
@@ -0,0 +1,52 @@
"""Usercentrics CMP.
URLs:
- api.usercentrics.eu/settings/<id>/<lang>.json
- app.usercentrics.eu/api/...
Schema: services[] with dataProcessor, dataPurpose, cookieMaxAgeSeconds
"""
import re
# Usercentrics endpoint pattern: api./app.usercentrics.eu with a path ending
# in ".json", optionally followed by a query string (case-insensitive).
MATCHER = re.compile(
    r"(?:api|app)\.usercentrics\.eu/.+\.json(?:\?|$)",
    flags=re.IGNORECASE,
)
_SECONDS_PER_DAY = 86400


def _format_max_age(max_age: object) -> str:
    """Render a ``cookieMaxAgeSeconds`` value as a German duration.

    Integers, floats and numeric strings are treated as seconds and converted
    to whole days; values below one day become ``"< 1 Tag"``.  Anything that
    cannot be read as a number is returned as text without a unit.
    """
    if isinstance(max_age, int):
        seconds = max_age
    else:
        try:
            seconds = int(float(max_age))  # type: ignore[arg-type]
        except (TypeError, ValueError, OverflowError):
            return str(max_age)
    if seconds < _SECONDS_PER_DAY:
        # Floor division would report a misleading "0 Tage" here.
        return "< 1 Tag"
    return f"{seconds // _SECONDS_PER_DAY} Tage"


def reconstruct(d: dict) -> str:
    """Build the German Markdown cookie-policy text from Usercentrics JSON.

    Services are read from ``services``, ``dataProcessingServices`` or, as a
    fallback, ``settings.services``; each becomes a ``##`` section listing
    description, purpose, country, cookie lifetime and retention.  Entries of
    ``categories`` follow as ``## Kategorie: ...`` sections.

    Args:
        d: Decoded JSON document captured from the Usercentrics endpoint.

    Returns:
        The Markdown lines joined with newlines.
    """
    parts: list[str] = ["# Cookie-Richtlinie (Usercentrics)"]
    services = d.get("services") or d.get("dataProcessingServices") or []
    if not services and isinstance(d.get("settings"), dict):
        services = d["settings"].get("services") or []

    for s in services:
        if not isinstance(s, dict):
            # Third-party JSON: ignore entries that are not objects.
            continue
        name = s.get("name") or s.get("dataProcessor") or ""
        purpose = s.get("dataPurpose") or s.get("purpose") or ""
        desc = s.get("description") or ""
        country = s.get("processingCompanyCountry") or s.get("country") or ""
        max_age = s.get("cookieMaxAgeSeconds")
        retention = s.get("retentionPeriodDescription") or ""
        parts.append("")
        parts.append(f"## {name}")
        if desc:
            parts.append(str(desc))
        if purpose:
            parts.append(f"Zweck: {purpose}")
        if country:
            parts.append(f"Sitz: {country}")
        if max_age:
            parts.append(f"Speicherdauer: {_format_max_age(max_age)}")
        if retention:
            parts.append(f"Aufbewahrung: {retention}")

    categories = d.get("categories") or []
    for c in categories:
        if not isinstance(c, dict):
            continue
        name = c.get("name") or ""
        desc = c.get("description") or ""
        parts.append("")
        parts.append(f"## Kategorie: {name}")
        if desc:
            parts.append(str(desc))
    return "\n".join(parts)