feat(consent+report): P56-P67 Mercedes-Audit-Cycle (Anti-Audit, Phase G Vendors, Cookie-Behavior-Validator + 5 Mail-Polish-Items) [migration-approved]
CI / detect-changes (push) Successful in 11s
CI / branch-name (push) Has been skipped
CI / nodejs-build (push) Successful in 2m19s
CI / test-go (push) Has been skipped
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 16s
CI / loc-budget (push) Failing after 15s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 37s

P56  Anti-Auditing-Detection als constructive Compliance-Finding (Audit-API-
     Empfehlung statt Anklage, weil Mercedes berechtigt Bots blockiert)
P57  Phase G vendor_details Union mit cmp_vendors -> 42 Anbieter sichtbar
P58  Anti-Audit-Detection robuster (Script-Domain-Check + Settings-spezifisch)
P59  Cookie-Behavior-Validator (4 Layer, 3-Tier-Severity: MEDIUM=Kategorie-
     Mismatch / HIGH=Zweck-Mismatch / CRITICAL=beide=Vorsatz-Indiz)
     + Open Cookie Database (CC0) als Library-Seed (2264 Cookies)
P59b Cookie-Behavior in Banner-Check verdrahtet + Mail-Block (BUGFIX:
     SessionLocal selbst oeffnen, db war im Background-Task nicht im Scope)

Mail-Polish nach Mercedes-Review:
P63  Banner-Footer-Links auch im wb7-link/role=link erkennen (Shadow-DOM-
     Walker label-based statt nur <a href>)
P64  Re-Access-Severity: MEDIUM statt HIGH, wenn Footer "Einstellungen" oder
     Mercedes-typisch existiert; OEM-Footer-Detection (wb7-footer)
P65  Text-Truncation: Word-Boundary statt Zeichen-Cut (kein "einfa"-Bruch
     mehr in Sofortmassnahmen)
P66  GF-Aktionen: Service-Zweck vs Cookie-Zweck explizit erklaert
     (haeufige Verwechslung Marketing/GF: "Akamai-Beschreibung" != Cookie-
     Zweck pro DSK-OH 2024)
P67  Stirring-Finding mit "Verlust-Framing"-Erklaerung + Alt-vs-Neutral-
     Beispiel, statt nur EDPB-Fachbegriff

Compliance-Advisor FAQ (admin agent-core/soul):
  + CNIL/EDPB Top-Bussgelder (Google 100M, Meta 60M, Amazon 35M)
  + Deutsche Praezedenz (LG Muenchen Google Fonts, EuGH Planet49, BGH I ZR 7/16)
  + 4 Risiko-Pfade (Bussgeld/Abmahnung/Sammelklage/NOYB) + Berechnungs-Methodik

Document-Generator Templates: AGB-DE (142), Impressum (140), Widerrufs-
formular-Anlage (143), DSR-Process-Dedup (139), Cookie-Library (144).

Architektur: doc_action_mappings.py + banner_dom_walkers.py +
cookie_behavior_validator.py + vendor_detail_extractor.py rausgezogen,
um die 500-LOC-Caps in agent_doc_check_report.py und
banner_text_checker.py einzuhalten.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-21 06:28:25 +02:00
parent badb356740
commit 57c0f940a2
38 changed files with 3656 additions and 116 deletions
+6
View File
@@ -50,6 +50,9 @@ class ScanResponse(BaseModel):
completeness_pct: int = 0
correctness_pct: int = 0
tcf_vendors: list = [] # Resolved TCF vendor list from GVL
cmp_payloads: list[dict] = [] # P48: raw CMP JSON-payloads (Usercentrics/OneTrust/...) captured during scan
vendor_details: list[dict] = [] # P50: per-vendor detail-modal-extracts (Beschreibung/Cookies/Opt-Out/Privacy)
cookies_detailed: list[dict] = [] # P59b: full cookie details for behavior-validation (name,value,domain,expires,phase,declared_category)
@app.get("/health")
@@ -127,6 +130,9 @@ async def scan_consent(req: ScanRequest):
"provider_details_visible": getattr(ct, "provider_details_visible", False),
"cookies_set": ct.cookies_set,
} for ct in result.category_tests] if result.category_tests else [],
cmp_payloads=result.cmp_payloads, # P48
vendor_details=result.vendor_details, # P50
cookies_detailed=result.cookies_detailed, # P59b
)
@@ -383,13 +383,23 @@ async def run_advanced_checks(page, banner_text: str) -> list[Violation]:
]
for pattern, label in stirring_patterns:
if pattern in banner_lower:
# P67: konkrete Erklaerung statt nur Fachbegriff. Marketing/GF
# versteht "Stirring" nicht — aber "Verlust-Framing" versteht
# jeder, und der Vergleich (alt vs neutral) macht es greifbar.
violations.append(Violation(
service="Cookie-Banner",
severity="LOW",
text=f"Emotionale Sprache im Banner: {label}. "
f"Solche Formulierungen koennen als 'Stirring' (emotionale Manipulation) "
f"gewertet werden und die Freiwilligkeit der Einwilligung beeintraechtigen.",
legal_ref="EDPB Guidelines 3/2022 (Deceptive Design: Stirring), Art. 7(4) DSGVO",
text=f"Verlust-Framing im Banner-Text: {label}. "
f"Diese Formulierung suggeriert, dass das Nicht-Zustimmen "
f"eine schlechtere ('nicht-optimale') Nutzung bedeutet — "
f"selbst wenn die Website ohne Cookies technisch genauso "
f"funktioniert. Die EDPB (3/2022) nennt das 'Stirring': "
f"emotionale Hebel statt informierter Entscheidung. "
f"Empfehlung: neutrale Sprache ('Nutzung unserer Website') "
f"statt qualitativer Bewertung ('optimal', 'voll', "
f"'bestmoeglich').",
legal_ref="EDPB Guidelines 3/2022 (Deceptive Design: Stirring), "
"Art. 7(4) DSGVO (Freiwilligkeit)",
))
break # One finding is enough
@@ -17,6 +17,32 @@ class BannerInfo:
reject_selector: str
# P22: Web-Component-CMPs (Banner ist im Shadow-DOM eines custom-element).
# Standard-Selektoren greifen nicht — Detection ueber Tag-Name.
WEB_COMPONENT_CMP_TAGS = [
{
"tag": "cmm-cookie-banner",
"provider": "Mercedes (cmm-cookie-banner)",
# Mercedes-Banner-Buttons im Shadow: "Alle akzeptieren" /
# "Nur technisch notwendige" / "Einstellungen"
"accept_text": "Alle akzeptieren",
"reject_text": "Nur technisch notwendige",
},
{
"tag": "cookie-consent-banner",
"provider": "Generic Web Component (cookie-consent-banner)",
"accept_text": "akzeptieren|accept|zustimmen",
"reject_text": "ablehnen|reject|notwendig",
},
{
"tag": "consent-banner",
"provider": "Generic Web Component (consent-banner)",
"accept_text": "akzeptieren|accept",
"reject_text": "ablehnen|reject",
},
]
# CMP-specific selectors (ordered by market share)
CMP_SELECTORS = [
{
@@ -409,6 +435,23 @@ async def _detect_generic_attr(page: Page) -> BannerInfo | None:
async def detect_banner(page: Page) -> BannerInfo:
"""Detect which CMP is used and return button selectors."""
# P22: Web-Component-CMPs (Mercedes etc.) — direkter Tag-Check.
# Shadow-DOM-Buttons werden via shadow-click:<pattern>-Selektor angesprochen.
for wc in WEB_COMPONENT_CMP_TAGS:
try:
count = await page.evaluate(
"(tag) => document.querySelectorAll(tag).length",
wc["tag"],
)
if count > 0:
return BannerInfo(
detected=True, provider=wc["provider"],
accept_selector=f"shadow-click:{wc['accept_text']}",
reject_selector=f"shadow-click:{wc['reject_text']}",
)
except Exception:
continue
# 1. Try CMP-specific selectors
for cmp in CMP_SELECTORS:
try:
@@ -0,0 +1,108 @@
"""
Browser-side DOM walkers for Web-Component CMPs and OEM design-systems.
Centralizes the JavaScript snippets used by banner_text_checker.py so the
checker file stays under the 500-LOC cap. Each function returns a JS string
that Playwright passes to `page.evaluate()`.
Two walkers:
* SHADOW_BANNER_WALKER_JS pierces shadow DOM (Mercedes cmm-cookie-banner,
BMW cookie-consent-banner, etc.) and extracts banner text + label-based
legal links (P63 recognizes wb7-link/role=link/button, not just
<a href>, since OEM design-systems wrap navigation).
* FOOTER_LABELS_WALKER_JS collects unique footer link labels from any
candidate footer root (footer, [role=contentinfo], wb7-footer, ...) with
a bottom-25%-of-viewport fallback (P64).
"""
from __future__ import annotations
SHADOW_BANNER_WALKER_JS = """() => {
const LEGAL_KW = {
impressum: ['impressum','imprint','legal notice','mentions legales','colophon'],
dse: ['datenschutz','privacy','dsgvo','data protection','politique de confidentialite'],
};
function isLegalLabel(txt) {
const t = (txt||'').toLowerCase();
if (!t || t.length > 60) return null;
for (const k of LEGAL_KW.impressum) if (t.includes(k)) return 'impressum';
for (const k of LEGAL_KW.dse) if (t.includes(k)) return 'dse';
return null;
}
function walk(root, acc) {
if (!root) return;
const all = root.querySelectorAll ? root.querySelectorAll('*') : [];
for (const el of all) {
if (el.shadowRoot) walk(el.shadowRoot, acc);
}
const tags = ['cmm-cookie-banner', 'cookie-consent-banner',
'consent-banner', 'cookie-banner', 'cmp-banner',
'ot-banner', 'usercentrics-banner'];
for (const tag of tags) {
const els = root.querySelectorAll ? root.querySelectorAll(tag) : [];
for (const el of els) {
if (el.shadowRoot) {
const txt = (el.shadowRoot.textContent || '').trim();
if (txt) acc.text += ' ' + txt;
const links = el.shadowRoot.querySelectorAll('a[href]');
for (const a of links) {
acc.links.push({
href: (a.getAttribute('href') || '').toLowerCase(),
text: (a.textContent || '').trim().toLowerCase(),
});
}
const cands = el.shadowRoot.querySelectorAll(
'wb7-link, wb7-button, [role="link"], button, span, a'
);
for (const c of cands) {
const label = (c.textContent || '').trim();
const which = isLegalLabel(label);
if (which) {
const href = (c.getAttribute('href') ||
c.getAttribute('data-href') ||
c.getAttribute('data-uri') || '').toLowerCase();
acc.links.push({
href: href || ('#shadow-' + which),
text: label.toLowerCase(),
});
}
}
}
}
}
}
const acc = { text: '', links: [] };
walk(document, acc);
return acc;
}"""
FOOTER_LABELS_WALKER_JS = """() => {
const out = new Set();
const roots = [
...document.querySelectorAll(
'footer, [role="contentinfo"], ' +
'wb7-footer, wb-footer, b-footer, cmm-footer, ' +
'[class*="footer" i], [id*="footer" i]'
)
];
if (roots.length === 0) {
const viewH = window.innerHeight;
for (const el of document.querySelectorAll('a, button, [role="link"], wb7-link')) {
const r = el.getBoundingClientRect();
if (r.top > viewH * 0.75) roots.push(el.parentElement);
}
}
for (const root of roots) {
if (!root) continue;
const cands = root.querySelectorAll('a, button, [role="link"], wb7-link, wb7-button');
let n = 0;
for (const c of cands) {
if (n++ > 80) break;
const t = (c.textContent || '').trim().toLowerCase();
if (t && t.length < 60) out.add(t);
}
}
return [...out];
}"""
+119 -32
View File
@@ -19,6 +19,10 @@ import logging
from services.script_analyzer import Violation
from services.banner_advanced_checks import run_advanced_checks
from services.banner_dom_walkers import (
SHADOW_BANNER_WALKER_JS,
FOOTER_LABELS_WALKER_JS,
)
logger = logging.getLogger(__name__)
@@ -62,6 +66,21 @@ async def check_banner_text(page) -> dict:
except Exception:
continue
# P28a + P63: Shadow-DOM Web Component CMPs (Mercedes cmm-cookie-banner,
# BMW cookie-consent-banner). Walker pierces shadow tree + extracts
# label-based legal links (wb7-link/button/role=link). See
# banner_dom_walkers.SHADOW_BANNER_WALKER_JS.
if not banner_text or not banner_links:
try:
shadow_data = await page.evaluate(SHADOW_BANNER_WALKER_JS)
if shadow_data and isinstance(shadow_data, dict):
if shadow_data.get("text"):
banner_text = (banner_text + " " + shadow_data["text"]).strip()
if shadow_data.get("links"):
banner_links.extend(shadow_data["links"])
except Exception:
pass
if not banner_text:
return {"violations": violations, "has_impressum": False, "has_dse": False}
@@ -134,17 +153,38 @@ async def check_banner_text(page) -> dict:
))
break
# Check 4: Reject button visible (no hidden reject)
reject_texts = ["ablehnen", "reject", "nur notwendige", "alle ablehnen", "decline"]
has_visible_reject = any(t in banner_lower for t in reject_texts)
if not has_visible_reject:
# P28b Check 4: Reject mechanism present + explicit-labeled?
# HIGH = no reject mechanism at all
# MEDIUM = reject available but not labeled "Ablehnen"/"Reject"
# (e.g. only "Nur technisch Notwendige" — semantically
# a reject but EDPB 5/2020 + DSK-OH 2024 prefer explicit
# labeling so users recognize it as the reject option)
explicit_reject_texts = ["ablehnen", "reject", "alle ablehnen",
"decline", "alles ablehnen"]
implicit_reject_texts = ["nur notwendige", "nur technisch", "nur essenzielle",
"nur essentielle", "notwendige akzeptieren",
"essential only", "only necessary",
"nur erforderliche"]
has_explicit_reject = any(t in banner_lower for t in explicit_reject_texts)
has_implicit_reject = any(t in banner_lower for t in implicit_reject_texts)
if not has_explicit_reject and not has_implicit_reject:
violations.append(Violation(
service="Cookie-Banner",
severity="HIGH",
text="Kein sichtbarer 'Ablehnen'-Button im Banner erkannt. "
text="Kein 'Ablehnen'-Mechanismus im Banner erkannt. "
"Die Ablehnung muss ebenso einfach sein wie die Zustimmung.",
legal_ref="§25 Abs. 1 TDDDG, EDPB Guidelines 05/2020 (Consent)",
))
elif not has_explicit_reject and has_implicit_reject:
violations.append(Violation(
service="Cookie-Banner",
severity="MEDIUM",
text="Reject-Moeglichkeit vorhanden ('Nur technisch Notwendige' o.ae.), "
"aber nicht als 'Ablehnen' beschriftet. Nutzer erkennen 'Ablehnen' "
"schneller als sprachlich umschriebene Varianten. "
"Empfehlung: zusaetzlich 'Ablehnen' als Button-Label.",
legal_ref="EDPB 5/2020 (Consent) + DSK-OH 2024 (Telemedien)",
))
# Check 5: Pre-ticked checkboxes (EuGH Planet49)
try:
@@ -210,7 +250,8 @@ async def check_banner_text(page) -> dict:
accept_btn = None
reject_btn = None
accept_kw = ["akzeptieren", "accept", "zustimmen", "agree", "einverstanden", "ok"]
reject_kw = ["ablehnen", "reject", "notwendige", "decline", "nein"]
reject_kw = ["ablehnen", "reject", "notwendige", "decline", "nein",
"technisch", "essenzielle", "essential", "erforderliche"]
for btn in button_info:
text_lower = btn["text"].lower()
@@ -245,44 +286,90 @@ async def check_banner_text(page) -> dict:
# Check 7: Cookie Wall — does rejecting block the site?
# (This is checked in Phase B — if after reject the page is not navigable)
# Check 8: Re-access to settings (Art. 7(3) — revocation as easy as consent)
# P29 Check 8: Re-access to cookie settings (Art. 7(3) DSGVO).
# Three quality tiers:
# OK = persistent floating cookie icon OR explicit-labeled
# footer link ("Cookie-Einstellungen", "Cookie-Richtlinie",
# "Cookies verwalten", etc.)
# MEDIUM = re-access only via ambiguous label (e.g. "Einstellungen"
# alone — could mean theme/language) OR only via
# cookies.html doc link (not a settings dialog)
# HIGH = no re-access mechanism found at all
try:
settings_accessible = False
settings_selectors = [
'[class*="cookie-settings"]', '[class*="privacy-settings"]',
'a[href*="cookie"]', 'a[href*="datenschutz-einstellungen"]',
'[class*="consent-settings"]', '#ot-sdk-btn',
'.cky-btn-revisit', '#CybotCookiebotDialogBodyButtonDetails',
'[data-testid="uc-footer-link"]',
has_floating_icon = False
floating_selectors = [
".cky-btn-revisit", "#ot-sdk-btn", "#ot-sdk-btn-floating",
"[class*='ot-floating']", "[class*='cookie-floating']",
"[id*='cookiebot-renew']", "[class*='cmp-floating']",
"[id*='cmplz-cookiebanner-status']", ".uc-cookie-settings-trigger",
"[class*='consent-floating']", "[data-testid*='cookie-revisit']",
]
for sel in settings_selectors:
for sel in floating_selectors:
try:
if await page.locator(sel).count() > 0:
settings_accessible = True
has_floating_icon = True
break
except Exception:
continue
# Also check footer for cookie settings link
if not settings_accessible:
footer_text = ""
try:
footer = page.locator("footer").first
if await footer.count() > 0:
footer_text = (await footer.text_content() or "").lower()
except Exception:
pass
if any(kw in footer_text for kw in ["cookie-einstellungen", "cookie settings",
"datenschutz-einstellungen", "privacy settings"]):
settings_accessible = True
# Footer label inspection — distinguish explicit vs ambiguous
# P64: OEM design-systems (Mercedes wb7-footer, BMW b-footer) don't
# use <footer>. Scan via evaluate() with multiple candidate roots
# including page-bottom region as last-ditch fallback.
footer_labels: list[str] = []
try:
footer_labels = await page.evaluate(FOOTER_LABELS_WALKER_JS) or []
except Exception:
pass
if not settings_accessible:
# Explicit, unambiguous cookie/consent labels
explicit_patterns = [
"cookie-einstellungen", "cookie einstellungen",
"cookie-richtlinie", "cookie richtlinie",
"cookie-praeferenzen", "cookie preferences",
"cookies verwalten", "manage cookies",
"datenschutz-einstellungen", "privacy preferences",
"datenschutzeinstellungen", "datenschutz einstellungen",
"cookie consent", "consent settings",
"cookie-banner", "cookies anpassen",
# P64: OEM-typical labels
"tracking-einstellungen", "tracking einstellungen",
"cookie-zustimmung", "consent verwalten",
]
has_explicit_footer = any(
any(p in lbl for p in explicit_patterns)
for lbl in footer_labels
)
# Ambiguous labels — "Einstellungen" alone, generic "Cookies"
ambiguous_patterns = ["einstellungen", "settings", "cookies"]
has_ambiguous_footer = (not has_explicit_footer) and any(
lbl.strip() in ambiguous_patterns
or any(lbl.strip() == p for p in ambiguous_patterns)
for lbl in footer_labels
)
if has_floating_icon or has_explicit_footer:
pass # OK — no violation
elif has_ambiguous_footer:
violations.append(Violation(
service="Cookie-Banner",
severity="MEDIUM",
text="Kein erneuter Zugang zu Cookie-Einstellungen gefunden. "
"Der Widerruf der Einwilligung muss ebenso einfach sein wie "
"die Erteilung (Art. 7 Abs. 3 DSGVO).",
text="Re-Zugang zu Cookie-Einstellungen nur ueber mehrdeutiges "
"Footer-Label (z.B. 'Einstellungen' oder 'Cookies'). "
"Empfehlung: persistenten Cookie-Icon-Button (Floating) "
"oder explizites Footer-Label 'Cookie-Einstellungen', "
"'Cookie-Richtlinie' o.ae. damit Nutzer den Widerruf "
"ohne Suchen finden.",
legal_ref="Art. 7 Abs. 3 DSGVO + EDPB 5/2020 (informierte, "
"leicht widerrufbare Einwilligung)",
))
else:
violations.append(Violation(
service="Cookie-Banner",
severity="HIGH",
text="Kein erneuter Zugang zu Cookie-Einstellungen gefunden "
"(weder Floating-Icon noch Footer-Link). Widerruf muss "
"ebenso einfach sein wie Erteilung.",
legal_ref="Art. 7 Abs. 3 DSGVO (Widerruf so einfach wie Einwilligung)",
))
except Exception:
+90 -1
View File
@@ -214,6 +214,49 @@ async def detect_categories(page: Page, banner: BannerInfo) -> list[CategoryInfo
except Exception:
continue
# P22: Shadow-DOM-Fallback fuer Web-Component-CMPs (Mercedes cmm-cookie-banner).
# Sucht Checkboxes/Switches rekursiv durch alle shadowRoots.
if not categories:
try:
shadow_cats = await page.evaluate("""
() => {
const out = [];
function walk(root, depth) {
if (depth > 6) return;
for (const el of root.querySelectorAll('*')) {
if (el.shadowRoot) {
const sr = el.shadowRoot;
const inputs = sr.querySelectorAll('input[type=checkbox], [role=switch], [role=checkbox]');
for (const i of inputs) {
const lbl = (i.closest('label')?.textContent || i.getAttribute('aria-label') || '').trim();
if (lbl.length > 0) {
out.push({label: lbl.slice(0,60), host: el.tagName.toLowerCase()});
}
}
walk(sr, depth + 1);
}
}
}
walk(document, 0);
return out;
}
""")
for sc in (shadow_cats or []):
text_lower = sc["label"].lower()
for cat_name, keywords in CATEGORY_KEYWORDS.items():
if any(kw in text_lower for kw in keywords):
# Marker selector — toggling per shadow:cat:<label-pattern>
categories.append(CategoryInfo(
name=cat_name,
label=sc["label"][:50],
selector=f"shadow-toggle:{sc['label'][:50]}",
))
break
if categories:
logger.info("P22: %d shadow-DOM categories detected", len(categories))
except Exception as e:
logger.warning("Shadow-DOM category detection failed: %s", e)
# Generic fallback: search for toggle/checkbox elements with category keywords
if not categories:
try:
@@ -266,9 +309,55 @@ async def test_single_category(
scripts: list[str] = []
page.on("request", lambda req: _collect(req, scripts))
await page.goto(url, wait_until="networkidle", timeout=20000)
try:
await page.goto(url, wait_until="networkidle", timeout=20000)
except Exception:
await page.goto(url, wait_until="load", timeout=20000)
await page.wait_for_timeout(2000)
# P22: Shadow-DOM-Toggle fuer Web-Component-CMPs (Mercedes etc.)
if category.selector.startswith("shadow-toggle:"):
label_pat = category.selector[len("shadow-toggle:"):]
try:
await page.evaluate("""(pat) => {
const lbl = pat.toLowerCase();
function walk(root) {
for (const el of root.querySelectorAll('*')) {
if (el.shadowRoot) {
const inputs = el.shadowRoot.querySelectorAll(
'input[type=checkbox], [role=switch], [role=checkbox]');
for (const i of inputs) {
const t = (i.closest('label')?.textContent || i.getAttribute('aria-label') || '').toLowerCase();
if (t.includes(lbl) && !i.checked) { i.click(); return true; }
}
if (walk(el.shadowRoot)) return true;
}
}
return false;
}
walk(document);
}""", label_pat)
await page.wait_for_timeout(500)
# Save via accept-text "Speichern" / "Save" inside shadow
await page.evaluate("""() => {
const SAVE = /speichern|save|bestaetigen|confirm/i;
function walk(root) {
for (const el of root.querySelectorAll('*')) {
if (el.shadowRoot) {
for (const b of el.shadowRoot.querySelectorAll('button, [role=button]')) {
if (SAVE.test(b.textContent || '')) { b.click(); return true; }
}
if (walk(el.shadowRoot)) return true;
}
}
return false;
}
walk(document);
}""")
await page.wait_for_timeout(wait_ms)
except Exception as e:
logger.warning("Shadow-toggle for %s failed: %s", category.name, e)
config = CMP_CATEGORY_CONFIG.get(banner.provider)
if config:
+231 -14
View File
@@ -6,6 +6,7 @@ Phase B: After rejecting consent
Phase C: After accepting consent
"""
import asyncio
import logging
from dataclasses import dataclass, field
@@ -67,6 +68,15 @@ class ConsentTestResult:
deep_verification: dict = field(default_factory=dict)
# TCF vendors (resolved via GVL after accept phase)
tcf_vendors: list = field(default_factory=list)
# P48: CMP-Payloads captured during all phases (Usercentrics, OneTrust, etc.)
# — passed to backend for deterministic vendor extraction.
cmp_payloads: list = field(default_factory=list)
# P50: per-vendor detail-modal-extracts (description, opt-out, cookies etc.)
vendor_details: list = field(default_factory=list)
# P59b: full cookie details per phase (name, value, domain, expires)
# for behavior-validation in backend. Implicit declared_category:
# before/reject phase = essential (site claims), accept = any.
cookies_detailed: list = field(default_factory=list)
async def run_consent_test(
@@ -83,6 +93,13 @@ async def run_consent_test(
wait_ms = wait_secs * 1000
filter_cats = categories or []
# P48: Init CMP-Capture early so it attaches to every page/context.
# CMP JSON-Endpoints (Usercentrics, OneTrust, Cookiebot, ePaaS) are
# fetched once per page load — capture them across all 3 phases so
# the backend can do deterministic vendor extraction without LLM.
from services.cmp_extractor import CMPCapture
cmp_capture = CMPCapture()
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=True,
@@ -91,6 +108,14 @@ async def run_consent_test(
"--disable-dev-shm-usage",
"--disable-blink-features=AutomationControlled",
"--window-size=1920,1080",
# P50c: Mercedes/Akamai Bot Manager crashed renderer
# without these (limits memory pressure + GPU init):
"--disable-gpu",
"--disable-software-rasterizer",
"--disable-background-timer-throttling",
"--disable-renderer-backgrounding",
"--disable-backgrounding-occluded-windows",
"--js-flags=--max-old-space-size=2048",
],
)
@@ -107,10 +132,28 @@ async def run_consent_test(
await page_a.add_init_script(_INTERCEPTOR_INIT)
if HAS_STEALTH:
await stealth_async(page_a)
cmp_capture.attach(page_a) # P48
scripts_a = []
page_a.on("request", lambda req: _collect_script(req, scripts_a))
await page_a.goto(url, wait_until="networkidle", timeout=30000)
# P50c: Mercedes/Akamai SPA never reaches networkidle.
# Use domcontentloaded + short JS-wait + retry on crash.
for _attempt in range(2):
try:
await page_a.goto(url, wait_until="domcontentloaded", timeout=20000)
await page_a.wait_for_timeout(3500)
break
except Exception as _e:
err = str(_e)[:120]
logger.warning("Phase A goto attempt %d failed: %s", _attempt + 1, err)
if "crashed" in err.lower() and _attempt == 0:
await page_a.wait_for_timeout(2000)
continue
try:
await page_a.goto(url, wait_until="load", timeout=20000)
except Exception:
pass
break
await page_a.wait_for_timeout(wait_ms)
# Deep verification: Phase A
@@ -127,7 +170,18 @@ async def run_consent_test(
logger.warning("Phase A deep verification failed: %s", exc)
result.before_scripts = _get_page_scripts(scripts_a)
result.before_cookies = _get_cookie_names(await ctx_a.cookies())
_cookies_a = await ctx_a.cookies()
result.before_cookies = _get_cookie_names(_cookies_a)
# P59b: capture full details — phase = "before" = implicit essential-claim
for ck in _cookies_a:
result.cookies_detailed.append({
"name": ck.get("name", ""),
"value": (ck.get("value") or "")[:200],
"domain": ck.get("domain", ""),
"expires": ck.get("expires"),
"phase": "before",
"declared_category": "essential",
})
result.before_tracking = find_tracking_services(result.before_scripts)
result.before_violations = find_violations_before_consent(result.before_scripts)
@@ -162,10 +216,15 @@ async def run_consent_test(
await page_b.add_init_script(_INTERCEPTOR_INIT)
if HAS_STEALTH:
await stealth_async(page_b)
cmp_capture.attach(page_b) # P48
scripts_b = []
page_b.on("request", lambda req: _collect_script(req, scripts_b))
await page_b.goto(url, wait_until="networkidle", timeout=30000)
try:
await page_b.goto(url, wait_until="domcontentloaded", timeout=20000)
except Exception as _e:
logger.warning("networkidle timeout, fallback to load: %s", str(_e)[:80])
await page_b.goto(url, wait_until="load", timeout=30000)
await page_b.wait_for_timeout(3000)
clicked = await click_button(page_b, banner.reject_selector)
@@ -189,7 +248,21 @@ async def run_consent_test(
logger.warning("Phase B deep verification failed: %s", exc)
result.reject_scripts = _get_page_scripts(scripts_b)
result.reject_cookies = _get_cookie_names(await ctx_b.cookies())
_cookies_b = await ctx_b.cookies()
result.reject_cookies = _get_cookie_names(_cookies_b)
# P59b: after-Reject = site claims these are essential
_before_names = {c.get("name", "") for c in _cookies_a}
for ck in _cookies_b:
if ck.get("name", "") in _before_names:
continue # already captured in 'before'
result.cookies_detailed.append({
"name": ck.get("name", ""),
"value": (ck.get("value") or "")[:200],
"domain": ck.get("domain", ""),
"expires": ck.get("expires"),
"phase": "reject",
"declared_category": "essential",
})
reject_tracking = find_tracking_services(result.reject_scripts)
result.reject_new_tracking = [t for t in reject_tracking if t not in result.before_tracking]
result.reject_violations = find_violations_after_reject(
@@ -210,10 +283,15 @@ async def run_consent_test(
await page_c.add_init_script(_INTERCEPTOR_INIT)
if HAS_STEALTH:
await stealth_async(page_c)
cmp_capture.attach(page_c) # P48
scripts_c = []
page_c.on("request", lambda req: _collect_script(req, scripts_c))
await page_c.goto(url, wait_until="networkidle", timeout=30000)
try:
await page_c.goto(url, wait_until="domcontentloaded", timeout=20000)
except Exception as _e:
logger.warning("networkidle timeout, fallback to load: %s", str(_e)[:80])
await page_c.goto(url, wait_until="load", timeout=30000)
await page_c.wait_for_timeout(3000)
clicked = await click_button(page_c, banner.accept_selector)
@@ -237,7 +315,21 @@ async def run_consent_test(
logger.warning("Phase C deep verification failed: %s", exc)
result.accept_scripts = _get_page_scripts(scripts_c)
result.accept_cookies = _get_cookie_names(await ctx_c.cookies())
_cookies_c = await ctx_c.cookies()
result.accept_cookies = _get_cookie_names(_cookies_c)
# P59b: post-Accept new cookies — declared "any" (consent given)
_seen_names = {c["name"] for c in result.cookies_detailed}
for ck in _cookies_c:
if ck.get("name", "") in _seen_names:
continue
result.cookies_detailed.append({
"name": ck.get("name", ""),
"value": (ck.get("value") or "")[:200],
"domain": ck.get("domain", ""),
"expires": ck.get("expires"),
"phase": "accept",
"declared_category": "", # unclear what category — consent given
})
accept_tracking = find_tracking_services(result.accept_scripts)
result.accept_new_tracking = [t for t in accept_tracking if t not in result.before_tracking]
@@ -263,7 +355,11 @@ async def run_consent_test(
page_cat = await ctx_cat.new_page()
if HAS_STEALTH:
await stealth_async(page_cat)
await page_cat.goto(url, wait_until="networkidle", timeout=20000)
try:
await page_cat.goto(url, wait_until="domcontentloaded", timeout=15000)
except Exception as _e:
logger.warning("networkidle timeout, fallback to load: %s", str(_e)[:80])
await page_cat.goto(url, wait_until="load", timeout=20000)
await page_cat.wait_for_timeout(2000)
detected_cats = await detect_categories(page_cat, banner)
@@ -280,17 +376,42 @@ async def run_consent_test(
)
if detected_cats:
logger.info("Testing %d categories individually", len(detected_cats))
for cat in detected_cats:
# P26: per-category 25s + phase budget 150s. Mercedes
# has 9 categories which would block the /scan well
# beyond the caller's 240s timeout. Skip rather than
# block — banner_quality + cmp_payloads matter more
# than per-cat detail.
import time # asyncio already imported at top (P50c)
phase_deadline = time.monotonic() + 90.0
# Dedup by name (some sites detect same cat 3x via
# shadow-DOM walk; testing each is wasteful)
seen_names: set[str] = set()
unique_cats = [c for c in detected_cats
if not (c.name in seen_names or seen_names.add(c.name))]
logger.info("Testing %d unique categories (budget=90s, per-cat=15s)",
len(unique_cats))
for cat in unique_cats:
if time.monotonic() >= phase_deadline:
logger.warning("Category phase budget exhausted, "
"skipping remaining %d categories",
len(unique_cats) - len(result.category_tests))
break
cat_ctx = await browser.new_context(
user_agent=USER_AGENT,
viewport={"width": 1920, "height": 1080},
locale="de-DE",
timezone_id="Europe/Berlin",
)
cat_result = await test_single_category(cat_ctx, url, cat, banner, wait_ms)
result.category_tests.append(cat_result)
await cat_ctx.close()
try:
cat_result = await asyncio.wait_for(
test_single_category(cat_ctx, url, cat, banner, wait_ms),
timeout=15.0,
)
result.category_tests.append(cat_result)
except asyncio.TimeoutError:
logger.warning("Category '%s' timed out after 15s, skipping", cat.name)
finally:
await cat_ctx.close()
else:
logger.info("No categories detected — skipping per-category tests")
@@ -298,15 +419,111 @@ async def run_consent_test(
except Exception as cat_err:
logger.warning("Category tests failed (non-blocking): %s", cat_err)
# ── P56: Anti-Auditing-Detection (vor Phase G) ─────────
# Marker erfassen → bei aktivem Bot-Block Phase G überspringen
# (TDM-Respekt) UND HIGH-Finding für Transparenz-Verstoss.
try:
from services.vendor_detail_extractor import _detect_anti_audit
anti = await _detect_anti_audit(page_c)
if anti.get("bot_protection"):
result.banner_text_violations.append(Violation(
service="Cookie-Banner",
severity="LOW",
text=f"Hinweis: {anti['bot_protection']} ist aktiv und blockiert "
f"automatisierte Compliance-Audits. Fuer Endnutzer voll "
f"funktional. Empfehlung: Audit-API bereitstellen damit "
f"unabhaengige Pruefer (Aufsichtsbehoerden, DSB) maschinen"
f"lesbar verifizieren koennen — staerkt Vertrauen ohne "
f"Bot-Schutz zu reduzieren.",
legal_ref="Rechenschaftspflicht Art. 5(2) DSGVO, "
"Transparenz-Empfehlung DSK-OH 2024",
))
if anti.get("user_select_none"):
result.banner_text_violations.append(Violation(
service="Cookie-Banner",
severity="MEDIUM",
text="Banner-Settings-Oberflaeche nicht per Maus kopierbar "
"(CSS user-select:none). Endnutzer koennen sich Cookie-Listen "
"+ Anbieter nicht einfach archivieren. Info-Modals pro Vendor "
"sind hingegen kopierbar — bitte gleiches Verhalten auch "
"auf der Uebersichtsseite ermoeglichen.",
legal_ref="Art. 12(1) DSGVO (transparente Information), "
"DSK-OH Telemedien 2024 (Informations-Festhalten)",
))
if anti.get("tdm_meta"):
logger.info("Anti-Audit: TDM opt-out meta-tag detected: %s",
anti["tdm_meta"])
except Exception as e:
logger.debug("Anti-Audit detection skipped: %s", e)
# ── Phase G: Per-Vendor Detail-Extraction (P50) ─────────
# After Accept, re-open banner and click each Info-button
# to capture detail-modal text. Detail-XHRs also captured
# by CMPCapture (still attached). Runs only if Banner was
# detected and an accept_text is known.
if result.banner_detected and banner is not None:
try:
from services.vendor_detail_extractor import (
extract_vendor_details,
)
accept_sel = banner.accept_selector or None
logger.info("Phase G: starting vendor-detail-extract (max 50 vendors)")
vd = await asyncio.wait_for(
extract_vendor_details(
browser, url,
accept_selector=accept_sel,
max_vendors=50,
),
timeout=600.0, # 10min hard cap
)
# Serialise dataclasses to plain dicts for JSON-Response
for v in vd:
result.vendor_details.append({
"name": v.name,
"description": v.description,
"processing_company": v.processing_company,
"address": v.address,
"purposes": v.purposes,
"technologies": v.technologies,
"cookies": v.cookies,
"retention": v.retention,
"opt_out_url": v.opt_out_url,
"privacy_url": v.privacy_url,
"raw_text": v.raw_text,
})
logger.info("Phase G complete: %d vendor-details captured",
len(result.vendor_details))
except asyncio.TimeoutError:
logger.warning("Phase G: hard timeout reached (10min)")
except Exception as vd_err:
logger.warning("Phase G failed (non-blocking): %s", vd_err)
except Exception as e:
logger.error("Consent test failed: %s", e)
finally:
await browser.close()
# P48: collect CMP-payloads captured during all phases. CMPCapture
# stores them as tuples (cmp_name, data). Convert to dicts that
# match the format used by /dsi-discovery so backend can process
# them with extract_vendors_from_payloads(). Dedup by-data not
# by-URL since CMPCapture doesn't store the URL.
seen_keys: set[str] = set()
for cmp_name, data in cmp_capture.payloads:
# Dedup key: cmp_name + length-of-data + first few JSON keys
try:
sig = f"{cmp_name}:{len(str(data))}:{','.join(sorted(list(data.keys())[:5]) if isinstance(data, dict) else [])}"
except Exception:
sig = f"{cmp_name}:{id(data)}"
if sig in seen_keys:
continue
seen_keys.add(sig)
result.cmp_payloads.append({"kind": cmp_name, "data": data})
logger.info(
"Consent test complete: banner=%s, violations_before=%d, violations_reject=%d, categories=%d",
"Consent test complete: banner=%s, violations_before=%d, violations_reject=%d, categories=%d, cmp_payloads=%d",
result.banner_provider, len(result.before_violations), len(result.reject_violations),
len(result.category_tests),
len(result.category_tests), len(result.cmp_payloads),
)
return result
+3 -1
View File
@@ -25,7 +25,9 @@ async def goto_resilient(page: Page, url: str, timeout: int = 60000) -> None:
except PlaywrightTimeout:
logger.debug("networkidle timeout for %s, falling back to domcontentloaded", url)
await page.goto(url, wait_until="domcontentloaded", timeout=timeout)
await page.wait_for_timeout(5000) # extra wait for JS rendering
# P23: Web-Component-Footer (Mercedes wbx, BMW similar) braucht laenger.
# 5s -> 8s damit Vue/Web-Component-Footer-Links sichtbar werden.
await page.wait_for_timeout(8000)
async def try_dismiss_consent_banner(page: Page) -> bool:
@@ -0,0 +1,675 @@
"""
Phase D Per-Vendor Detail Extraction (P50).
After Accept (Phase C) the banner contains every vendor; on most CMPs
(Usercentrics, OneTrust, Cookiebot) each vendor has an Info/Details
icon that opens a modal with Beschreibung, Verarbeitendes Unternehmen,
Zweck, Genutzte Technologien, Cookies, Opt-Out-URL and Privacy-URL.
We open the settings-view of the banner, walk the Shadow-DOM for info
icons, click each one, capture the modal text + the XHR triggered by
the click (which Usercentrics uses to load the detail JSON), and parse
the text into structured fields.
Returns: list[VendorDetail] with raw_text + structured fields.
"""
from __future__ import annotations
import asyncio
import logging
import re
from dataclasses import dataclass, field
from typing import Optional
from playwright.async_api import Browser, Page, TimeoutError as PlaywrightTimeout
logger = logging.getLogger(__name__)
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
@dataclass
class VendorDetail:
name: str = ""
description: str = ""
processing_company: str = ""
address: str = ""
purposes: list[str] = field(default_factory=list)
technologies: list[str] = field(default_factory=list)
cookies: list[str] = field(default_factory=list)
retention: str = ""
opt_out_url: str = ""
privacy_url: str = ""
raw_text: str = ""
# ── Shadow-DOM helper: find info-buttons in Mercedes/Usercentrics/etc.
_FIND_INFO_BUTTONS_JS = r"""
() => {
// Walk all shadow roots and collect "info"/"i"-icon clickables.
// Covers <button>, <div>, <span>, <cmm-icon> Mercedes uses
// <div class="consent-item__information"> as info trigger.
const results = [];
function walk(root) {
if (!root || !root.querySelectorAll) return;
const buttons = root.querySelectorAll(
'button[aria-label*="info" i], button[aria-label*="details" i], ' +
'button[aria-label*="weitere" i], button[title*="info" i], ' +
'button[title*="details" i], button[class*="info" i], ' +
'button[class*="detail" i], [data-testid*="info"], [data-testid*="detail"], ' +
'button > i.material-icons, button[aria-label="i"], svg[aria-label*="info" i], ' +
// P50e: Mercedes uses button.consent-item__icon with
// data-test="toggle-consent-info-modal", aria-label=vendor name.
'button.consent-item__icon, [data-test*="toggle-consent-info"], ' +
'button[class*="info-icon"], button[class*="detail-toggle"]'
);
for (const b of buttons) {
// P50e: priority aria-label IS vendor name for Mercedes
let label = (b.getAttribute('aria-label') || '').trim();
if (!label) {
// Walk up to find a heading/label/consent-item__name span
let el = b;
for (let i = 0; i < 5 && el; i++) {
const parent = el.parentElement || (el.getRootNode && el.getRootNode().host);
if (!parent) break;
const heading = parent.querySelector ? parent.querySelector('.consent-item__name, h1,h2,h3,h4,h5,h6,strong') : null;
if (heading && heading.textContent && heading.textContent.trim().length > 1) {
label = heading.textContent.trim().substring(0, 100);
break;
}
el = parent;
}
}
// Mercedes button is visually-hidden (width=0) still clickable
results.push({label: label});
}
// Recurse into shadow roots
const all = root.querySelectorAll('*');
for (const el of all) {
if (el.shadowRoot) walk(el.shadowRoot);
}
}
walk(document);
return results;
}
"""
_CLICK_INFO_BY_LABEL_JS = r"""
(label) => {
// P50e: prefer direct aria-label match (Mercedes uses it).
function walk(root) {
if (!root || !root.querySelectorAll) return false;
try {
const escaped = label.replace(/"/g, '\\"');
const direct = root.querySelector('button[aria-label="' + escaped + '"]');
if (direct) { direct.click(); return true; }
} catch(e) {}
const buttons = root.querySelectorAll(
'button[aria-label*="info" i], button[aria-label*="details" i], ' +
'button[aria-label*="weitere" i], button[title*="info" i], ' +
'button[title*="details" i], button[class*="info" i], ' +
'button[class*="detail" i], [data-testid*="info"], [data-testid*="detail"], ' +
'button.consent-item__icon, [data-test*="toggle-consent-info"]'
);
for (const b of buttons) {
let el = b;
for (let i = 0; i < 5 && el; i++) {
const parent = el.parentElement || (el.getRootNode && el.getRootNode().host);
if (!parent) break;
const h = parent.querySelector ? parent.querySelector('h1,h2,h3,h4,h5,h6,label,strong,span') : null;
if (h && h.textContent && h.textContent.trim().substring(0, 100) === label) {
b.click();
return true;
}
el = parent;
}
}
const all = root.querySelectorAll('*');
for (const el of all) {
if (el.shadowRoot && walk(el.shadowRoot)) return true;
}
return false;
}
return walk(document);
}
"""
_EXTRACT_MODAL_TEXT_JS = r"""
() => {
// P50d: Find the detail-info container that opened on click.
// Mercedes uses an inline detail-view (not <dialog>), recognisable by
// text markers ("Verarbeitendes Unternehmen", "Beschreibung",
// "Genutzte Technologien"). Walk all shadow roots, find the SMALLEST
// element containing all/most markers that's the detail-box.
const MARKERS = [
'Verarbeitendes Unternehmen', 'Beschreibung des Services',
'Zweck der Daten', 'Genutzte Technologien', 'Gesammelte Daten',
'Datenschutz-Beauftragter', 'processing company',
'data purpose', 'technologies used',
];
let best = null, bestLen = Infinity;
function walk(root) {
if (!root || !root.querySelectorAll) return;
const all = root.querySelectorAll('*');
for (const el of all) {
const txt = (el.textContent || '');
if (txt.length < 80 || txt.length > 5000) continue;
const hits = MARKERS.filter(m => txt.includes(m)).length;
if (hits >= 2 && txt.length < bestLen) {
best = txt;
bestLen = txt.length;
}
if (el.shadowRoot) walk(el.shadowRoot);
}
}
walk(document);
if (best) return best;
// Fallback: open dialog/modal with reasonable size
function findDialog(root) {
if (!root || !root.querySelectorAll) return null;
const sels = ['[role="dialog"]:not([aria-hidden="true"])',
'[class*="modal"]:not([class*="closed"])',
'[class*="dialog"]', '[class*="popup"]',
'[class*="detail-view"]', '[class*="info-panel"]',
'[class*="detail-box"]'];
for (const sel of sels) {
const els = root.querySelectorAll(sel);
for (const el of els) {
const rect = el.getBoundingClientRect();
if (rect.width > 100 && rect.height > 100) {
const text = (el.textContent || '').trim();
if (text.length > 50 && text.length < 8000) return text;
}
}
}
const all = root.querySelectorAll('*');
for (const el of all) {
if (el.shadowRoot) {
const t = findDialog(el.shadowRoot);
if (t) return t;
}
}
return null;
}
return findDialog(document) || '';
}
"""
_CLOSE_MODAL_JS = r"""
() => {
function walk(root) {
if (!root || !root.querySelectorAll) return false;
// Close-button: aria-label, title, X-character, or class
const closes = root.querySelectorAll(
'[aria-label*="schlie" i], [aria-label*="close" i], ' +
'[title*="schlie" i], [title*="close" i], ' +
'[class*="close" i]:not([disabled])'
);
for (const c of closes) {
if (c.getBoundingClientRect().width > 0) {
c.click();
return true;
}
}
const all = root.querySelectorAll('*');
for (const el of all) {
if (el.shadowRoot && walk(el.shadowRoot)) return true;
}
return false;
}
return walk(document);
}
"""
# ── Modal-Text parsing ──────────────────────────────────────────────
_FIELD_PATTERNS = [
("description", r"Beschreibung[\s\S]{0,30}?\n([\s\S]{20,800}?)(?:\n\n|\nVerarbeit|\nZweck|\nGenutzt|\nCookies|\nAdresse|$)"),
("processing_company", r"Verarbeitende[s]?\s+Unternehmen[\s\S]{0,30}?\n([\s\S]{5,300}?)(?:\n\n|\nAdresse|\nZweck|$)"),
("address", r"(?:Adresse|Anschrift)[\s\S]{0,30}?\n([\s\S]{5,300}?)(?:\n\n|\nZweck|\nGenutzt|$)"),
("retention", r"Speicherdauer[\s\S]{0,30}?\n([\s\S]{2,200}?)(?:\n\n|\n[A-Z])"),
("opt_out_url", r"(?:Opt[\-\s]?[Oo]ut|Widerspruch)[\s\S]{0,80}?(https?://[^\s<>\"']+)"),
("privacy_url", r"(?:Datenschutz[\-\s]?(?:erkl|Information)|Privacy)[\s\S]{0,80}?(https?://[^\s<>\"']+)"),
]
def parse_modal_text(text: str) -> dict:
"""Best-effort parse of detail-modal text into structured fields."""
result = {}
for field_name, pat in _FIELD_PATTERNS:
m = re.search(pat, text)
if m:
result[field_name] = m.group(1).strip()
# Purposes / Technologies / Cookies — bullet-list style
purposes_m = re.search(
r"Zweck(?:e)?\s+der\s+Daten[\s\S]{0,80}?(?:\n)([\s\S]{20,500}?)(?:\nGenutzt|\nVerarbeit|\nCookies|\n\n[A-Z])",
text,
)
if purposes_m:
items = [s.strip(" -•*\t") for s in purposes_m.group(1).split("\n") if s.strip()]
result["purposes"] = [s for s in items if 2 < len(s) < 80][:15]
tech_m = re.search(
r"Genutzte\s+Technologien[\s\S]{0,80}?\n([\s\S]{5,500}?)(?:\nCookies|\nGesammelt|\n\n[A-Z]|\nWeb)",
text,
)
if tech_m:
items = [s.strip(" -•*\t") for s in tech_m.group(1).split("\n") if s.strip()]
result["technologies"] = [s for s in items if 2 < len(s) < 80][:10]
cookies_m = re.search(
r"Cookies?\s*(?:Name)?[\s\S]{0,80}?\n([\s\S]{5,1000}?)(?:\n\n[A-Z]|$)",
text,
)
if cookies_m:
items = [s.strip(" -•*\t") for s in cookies_m.group(1).split("\n") if s.strip()]
result["cookies"] = [s for s in items if 2 < len(s) < 100][:30]
return result
async def _try_reopen_banner(page: Page) -> bool:
"""Try to re-open the banner after Accept — floating icon or footer link."""
# 1. Common floating-icon selectors
floating_sels = [
".uc-cookie-settings-trigger", "#ot-sdk-btn", "#ot-sdk-btn-floating",
".cky-btn-revisit", "[class*='cookie-floating']",
"[class*='cmplz-cookiebanner-status']",
"[id*='cookiebot-renew']",
]
for sel in floating_sels:
try:
el = page.locator(sel).first
if await el.count() > 0:
await el.click(timeout=3000)
await page.wait_for_timeout(1500)
return True
except Exception:
continue
# 2. Footer link — generic text search
for txt in ["Cookie-Einstellungen", "Cookie Einstellungen", "Cookie-Richtlinie",
"Cookies", "Einstellungen", "Privatsphäre"]:
try:
l = page.locator(f"footer >> text=/{txt}/i").first
if await l.count() > 0:
await l.click(timeout=3000)
await page.wait_for_timeout(1500)
return True
except Exception:
continue
# 3. Shadow-DOM web-component re-open (Mercedes specific)
try:
clicked = await page.evaluate(r"""() => {
function walk(root) {
if (!root || !root.querySelectorAll) return false;
// Mercedes uses chip / persistent button inside cmm-cookie-banner
const tags = ['cmm-cookie-banner', 'cookie-consent-banner'];
for (const tag of tags) {
const els = root.querySelectorAll(tag);
for (const el of els) {
if (el.shadowRoot) {
const trigger = el.shadowRoot.querySelector(
'[aria-label*="cookie" i], [class*="trigger"], [class*="chip"]'
);
if (trigger) { trigger.click(); return true; }
}
}
}
const all = root.querySelectorAll('*');
for (const el of all) {
if (el.shadowRoot && walk(el.shadowRoot)) return true;
}
return false;
}
return walk(document);
}""")
if clicked:
await page.wait_for_timeout(1500)
return True
except Exception:
pass
return False
async def _expand_all_categories(page: Page) -> int:
"""P50d: After settings-view is open, click category expanders so all
individual vendors with their info-icons become visible.
Mercedes shows 5 category items by default; each expands to a list
of vendors with consent-item__information divs."""
try:
n = await page.evaluate(r"""() => {
let clicked = 0;
function walk(root) {
if (!root || !root.querySelectorAll) return;
// Expander triggers: wb7-button / button with "+" or aria-expanded="false"
const triggers = root.querySelectorAll(
'[aria-expanded="false"], wb7-button[class*="expand" i], ' +
'button[class*="expand" i], [class*="accordion"][aria-expanded="false"], ' +
'[class*="category"] > [role="button"], ' +
'[class*="category-header"], [class*="category__header"]'
);
for (const t of triggers) {
try { t.click(); clicked++; } catch(e) {}
}
const all = root.querySelectorAll('*');
for (const el of all) {
if (el.shadowRoot) walk(el.shadowRoot);
}
}
walk(document);
return clicked;
}""")
if n:
logger.info("Detail-Phase: expanded %d category collapsibles", n)
await page.wait_for_timeout(1500)
return n or 0
except Exception as e:
logger.debug("_expand_all_categories failed: %s", e)
return 0
async def _open_settings_view(page: Page) -> bool:
"""After banner is shown, click 'Einstellungen' to reveal the vendor list
(where consent-item__information info-divs are visible)."""
try:
# Mercedes / cmm-cookie-banner: click "Einstellungen" wb7-button
clicked = await page.evaluate(r"""() => {
function walk(root) {
if (!root || !root.querySelectorAll) return false;
const buttons = root.querySelectorAll(
'button, [role="button"], wb7-button, cmm-button'
);
for (const b of buttons) {
const txt = (b.textContent || '').trim().toLowerCase();
if (txt === 'einstellungen' || txt === 'settings' ||
txt === 'mehr informationen' || txt === 'individuell' ||
txt.includes('cookie-einstellungen') ||
txt.includes('details anzeigen')) {
b.click();
return true;
}
}
const all = root.querySelectorAll('*');
for (const el of all) {
if (el.shadowRoot && walk(el.shadowRoot)) return true;
}
return false;
}
return walk(document);
}""")
if clicked:
await page.wait_for_timeout(2500)
return True
except Exception as e:
logger.debug("open settings-view failed: %s", e)
return False
async def _detect_anti_audit(page: Page) -> dict:
"""P56: Detect anti-auditing measures on the page.
Returns dict with markers:
- bot_protection: name of detected anti-bot tool (or "")
- user_select_none: True if Banner-text has CSS user-select:none
- tdm_meta: noai/notdm meta-tag content if present
Caller decides severity. Bot-protection alone triggers TDM-skip
(§44b UrhG), user-select:none triggers HIGH Transparency-Finding
(Art. 5(1)(a) DSGVO)."""
out = {"bot_protection": "", "user_select_none": False, "tdm_meta": "",
"click_ignored": False}
try:
cookies = await page.context.cookies()
cookie_names = {c.get("name", "") for c in cookies}
if any(n.startswith(("ak_bmsc", "bm_sv", "bm_sz", "_abck")) for n in cookie_names):
out["bot_protection"] = "Akamai Bot Manager"
elif any(n in cookie_names for n in ("__cf_bm", "cf_clearance", "__cfduid")):
out["bot_protection"] = "Cloudflare Bot Management"
elif "datadome" in cookie_names:
out["bot_protection"] = "Datadome"
elif any(n.startswith("_px") for n in cookie_names):
out["bot_protection"] = "PerimeterX"
except Exception:
pass
# P58: also detect via script-domain (Mercedes loads Akamai assets even
# before bot-cookies are set on first visit).
if not out["bot_protection"]:
try:
domains = await page.evaluate(r"""() => {
const out = new Set();
document.querySelectorAll('script[src], link[href], img[src]').forEach(el => {
const src = el.src || el.href || '';
if (src) {
try { out.add(new URL(src).hostname); } catch(e) {}
}
});
return [...out];
}""")
if isinstance(domains, list):
for d in domains:
dl = d.lower()
if any(x in dl for x in (
"akamaihd.net", "akamaized.net", "akamai.net",
"edgekey.net", "edgesuite.net",
)):
out["bot_protection"] = "Akamai (via asset-CDN)"
break
if "cloudflare.com" in dl or dl.endswith(".cloudflare.net"):
out["bot_protection"] = "Cloudflare (via asset-CDN)"
break
if "datadome" in dl:
out["bot_protection"] = "Datadome (via asset)"
break
except Exception:
pass
try:
# Check CSS user-select on banner-text + meta-tag
css_meta = await page.evaluate(r"""() => {
const result = {user_select_none: false, tdm_meta: ''};
// P58: prefer SETTINGS-view sub-containers (where vendor-list lives)
// because Mercedes' banner-body is copy-able, but the settings
// section with the vendor list is not.
const settingsSels = [
'cmm-cookie-settings', '.consent-item', '.consent-label',
'.consent-item__name', '.consent-item__information',
'.uc-settings-list', '.uc-vendor', '.ot-vlst-cntr',
];
const bannerSels = ['cmm-cookie-banner', 'cookie-consent-banner',
'#usercentrics-root', '#onetrust-banner-sdk',
'#CybotCookiebotDialog', '[role="dialog"]'];
function check(root) {
if (!root || !root.querySelectorAll) return;
// 1) Settings sub-containers first much more meaningful
for (const sel of settingsSels) {
const els = root.querySelectorAll(sel);
for (const el of els) {
const v = getComputedStyle(el).userSelect;
if (v === 'none') { result.user_select_none = true; return; }
}
}
// 2) Fall back to banner-body sample
for (const sel of bannerSels) {
const els = root.querySelectorAll(sel);
for (const el of els) {
const target = el.shadowRoot ? el.shadowRoot : el;
const samples = target.querySelectorAll('p, span, div, label');
let noneHits = 0, total = 0;
for (const s of samples) {
total++;
if (getComputedStyle(s).userSelect === 'none') noneHits++;
if (total >= 20) break;
}
// Mark only if MAJORITY of text-elements are user-select:none
if (total > 0 && noneHits / total >= 0.5) {
result.user_select_none = true;
return;
}
}
}
const all = root.querySelectorAll('*');
for (const e of all) { if (e.shadowRoot) check(e.shadowRoot); }
}
check(document);
// Meta-tag
const metas = document.querySelectorAll('meta[name="robots"], meta[name="googlebot"]');
for (const m of metas) {
const c = (m.getAttribute('content') || '').toLowerCase();
if (c.includes('noai') || c.includes('notdm')) {
result.tdm_meta = c.substring(0, 80);
break;
}
}
return result;
}""")
if isinstance(css_meta, dict):
out["user_select_none"] = bool(css_meta.get("user_select_none"))
out["tdm_meta"] = css_meta.get("tdm_meta", "") or ""
except Exception:
pass
return out
async def _is_tdm_protected(page: Page) -> tuple[bool, str]:
"""Convenience wrapper — TDM if bot_protection or tdm_meta present."""
d = await _detect_anti_audit(page)
if d["bot_protection"]:
return True, f"{d['bot_protection']} (cookie marker)"
if d["tdm_meta"]:
return True, f"TDM opt-out meta-tag: {d['tdm_meta']}"
return False, ""
async def extract_vendor_details(
browser: Browser,
url: str,
accept_selector: Optional[str] = None,
max_vendors: int = 50,
per_vendor_timeout: float = 6.0,
) -> list[VendorDetail]:
"""Phase D: open settings-view of banner, click each Info-button, capture modal.
P50f: respect TDM opt-out (Akamai/Cloudflare/Datadome/PerimeterX) skip
Phase G entirely when active anti-bot protection is detected."""
details: list[VendorDetail] = []
ctx = await browser.new_context(
user_agent=USER_AGENT,
viewport={"width": 1920, "height": 1080},
locale="de-DE",
timezone_id="Europe/Berlin",
)
page = await ctx.new_page()
try:
try:
await page.goto(url, wait_until="load", timeout=30000)
except Exception as e:
logger.warning("Detail-Phase: page.goto failed: %s", e)
return details
await page.wait_for_timeout(3500)
# P50f: Respect TDM opt-out (§44b UrhG). If site uses active
# anti-bot protection, do NOT attempt click-through scraping.
tdm_protected, tdm_reason = await _is_tdm_protected(page)
if tdm_protected:
logger.info(
"Detail-Phase: TDM opt-out detected (%s) — skipping vendor "
"detail-extract to respect §44b UrhG", tdm_reason
)
# Emit a sentinel detail entry so caller can flag this in the report
details.append(VendorDetail(
name="__TDM_OPTOUT__",
description=f"Phase G übersprungen — Site nutzt aktive Bot-Detection ({tdm_reason}). TDM-Vorbehalt nach §44b UrhG respektiert.",
))
return details
# Step 1: Fresh context — banner should already be open. Skip
# the Accept step and go directly to 'Einstellungen' (avoids
# closing-then-reopening which Mercedes makes hard).
# Step 2b (P50b): click 'Einstellungen' to reveal vendor list with
# info-icons. Without this Mercedes only shows the initial 3 buttons.
settings_opened = await _open_settings_view(page)
if settings_opened:
logger.info("Detail-Phase: opened settings-view")
else:
# If banner is not open, try to re-open it first then settings
await _try_reopen_banner(page)
await page.wait_for_timeout(1500)
await _open_settings_view(page)
await page.wait_for_timeout(2000)
# Step 2c (P50d): expand all category accordions so each vendor's
# info-icon becomes visible. Mercedes collapses categories by default.
await _expand_all_categories(page)
await page.wait_for_timeout(1000)
# Step 3: collect info-button candidates
btn_infos = await page.evaluate(_FIND_INFO_BUTTONS_JS)
if not isinstance(btn_infos, list):
return details
# Dedup by label
seen_labels: set[str] = set()
unique = []
for b in btn_infos:
lbl = b.get("label", "").strip()
if lbl and lbl not in seen_labels:
seen_labels.add(lbl)
unique.append(b)
logger.info("Detail-Phase: found %d info-button candidates (deduped from %d)",
len(unique), len(btn_infos))
# Step 4: click each, extract modal, close
for i, btn in enumerate(unique[:max_vendors]):
label = btn["label"]
try:
clicked = await asyncio.wait_for(
page.evaluate(_CLICK_INFO_BY_LABEL_JS, label),
timeout=per_vendor_timeout,
)
if not clicked:
continue
await page.wait_for_timeout(1200)
text = await asyncio.wait_for(
page.evaluate(_EXTRACT_MODAL_TEXT_JS),
timeout=per_vendor_timeout,
)
if isinstance(text, str) and len(text) > 50:
fields = parse_modal_text(text)
details.append(VendorDetail(
name=label,
raw_text=text[:3000],
**{k: v for k, v in fields.items()
if k in ("description", "processing_company", "address",
"retention", "opt_out_url", "privacy_url")},
purposes=fields.get("purposes", []),
technologies=fields.get("technologies", []),
cookies=fields.get("cookies", []),
))
# Close modal
try:
await asyncio.wait_for(
page.evaluate(_CLOSE_MODAL_JS),
timeout=2.0,
)
except Exception:
await page.keyboard.press("Escape")
await page.wait_for_timeout(500)
except (asyncio.TimeoutError, PlaywrightTimeout) as e:
logger.warning("Detail-Phase: vendor '%s' timed out", label[:40])
continue
except Exception as e:
logger.warning("Detail-Phase: vendor '%s' failed: %s", label[:40], e)
continue
logger.info("Detail-Phase complete: %d vendors with details", len(details))
finally:
await ctx.close()
return details