Files
breakpilot-compliance/consent-tester/services/consent_scanner.py
T
Benjamin Admin 8cbb513e2c
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / detect-changes (push) Successful in 11s
CI / branch-name (push) Has been skipped
CI / loc-budget (push) Failing after 16s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 15s
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 38s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
CI / test-go (push) Has been skipped
feat(audit): Phase 1 Quick-Wins (P81 + P85 + P70 + P83) + TCF DELETE/INSERT-Fix
P81 — tests/fixtures/golden_truth/vw_de.json:
GT-Fixture mit must_find_cookies (47 VW-Cookies) + expected_vendors
(Google, Adobe, Trade Desk, ...). Basis fuer kuenftige Regression-Tests.

P85 — banner_screenshot_block.py + consent_scanner.py + main.py:
consent-tester macht beim Banner-Detect einen base64-PNG-Screenshot
(< 1.5MB). Backend rendert ihn als <img src="data:..."> direkt nach
dem GF-1-Pager. Visueller Beweis 'so sah das Banner aus' fuer Dispute
mit Marketing/DSB.

P70 — rag_provenance.py:
classify_finding_provenance() klassifiziert ein Finding als 'rag'
(Norm + Quelle), 'mixed' (Norm ohne Quelle) oder 'heuristic' (eigene
Interpretation). provenance_badge_html() rendert kleine Badges
(✓ RAG / NORM / ⚠ HEURISTIK). Modul ist generisch, kann bei jedem
Finding-Renderer eingeklinkt werden.

P83 — scripts/check-rebuild-needed.sh:
Prueft ob die im Container deployten BUILD_SHA mit local HEAD
uebereinstimmen. Bei Mismatch exit 1 mit 'REBUILD REQUIRED'-Hinweis.
Verhindert das 'alter Code im Container'-Problem das uns mehrfach
erwischt hat (Frontend-Tabs sichtbar, Backend ohne neuen Service).

TCF-Fix — tcf_vendor_authority.py:
cookie_library hat keinen UNIQUE-Index auf cookie_name → ON CONFLICT
war unmoeglich. Loesung: vor Insert DELETE WHERE source_name='iab_tcf_v2'.
Idempotent. + per-Vendor-Commit damit ein Fail die naechsten nicht blockt.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 08:24:46 +02:00

568 lines
27 KiB
Python

"""
Consent Scanner — Playwright-based multi-phase cookie consent test.
Phase A: Before consent (first visit)
Phase B: After rejecting consent
Phase C: After accepting consent
Phase D-F: Per-category consent tests
Phase G: Per-vendor detail extraction
"""
import asyncio
import logging
from dataclasses import dataclass, field
from playwright.async_api import async_playwright, Page, BrowserContext
try:
from playwright_stealth import stealth_async
HAS_STEALTH = True
except ImportError:
HAS_STEALTH = False
from services.banner_detector import detect_banner, click_button, BannerInfo
from services.script_analyzer import (
classify_scripts, find_tracking_services,
find_violations_before_consent, find_violations_after_reject, Violation,
)
from services.banner_text_checker import check_banner_text as _check_banner_text
from services.consent_interceptor import (
INIT_SCRIPT as _INTERCEPTOR_INIT,
collect_intercepted_data as _collect_intercepted,
get_consent_state as _get_consent_state,
analyze_phase_data as _analyze_phase,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# User-Agent string passed as `user_agent` to every browser context this
# module creates.
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
@dataclass
class ConsentTestResult:
    """Aggregated outcome of one consent scan, filled phase by phase.

    Every field has an empty default so a partially completed scan still
    yields a usable result object.
    """

    # --- Banner detection -------------------------------------------------
    banner_detected: bool = False
    banner_provider: str = ""

    # --- Phase A: first visit, no interaction ------------------------------
    before_scripts: list[str] = field(default_factory=list)
    before_cookies: list[str] = field(default_factory=list)
    before_tracking: list[str] = field(default_factory=list)
    before_violations: list[Violation] = field(default_factory=list)

    # --- Phase B: after the reject button was clicked ----------------------
    reject_scripts: list[str] = field(default_factory=list)
    reject_cookies: list[str] = field(default_factory=list)
    reject_new_tracking: list[str] = field(default_factory=list)
    reject_violations: list[Violation] = field(default_factory=list)

    # --- Phase C: after the accept button was clicked ----------------------
    accept_scripts: list[str] = field(default_factory=list)
    accept_cookies: list[str] = field(default_factory=list)
    accept_new_tracking: list[str] = field(default_factory=list)
    accept_undocumented: list[str] = field(default_factory=list)

    # --- Phase D-F: one entry per tested consent category ------------------
    category_tests: list = field(default_factory=list)  # list[CategoryTestResult]

    # --- Legal checks on the banner text -----------------------------------
    banner_text_violations: list[Violation] = field(default_factory=list)
    banner_has_impressum_link: bool = False
    banner_has_dse_link: bool = False

    # --- Deep verification: intercepted data, keyed by phase name ----------
    deep_verification: dict = field(default_factory=dict)

    # --- TCF vendors (resolved via GVL after the accept phase) -------------
    tcf_vendors: list = field(default_factory=list)

    # P48: CMP payloads captured during all phases (Usercentrics, OneTrust,
    # etc.), handed to the backend for deterministic vendor extraction.
    cmp_payloads: list[dict] = field(default_factory=list)

    # P50: per-vendor extracts from the detail modals (description, opt-out,
    # cookies, ...), serialised as plain dicts.
    vendor_details: list[dict] = field(default_factory=list)

    # P59b: full cookie records per phase (name, value, domain, expires) for
    # behaviour validation in the backend. The declared category is implicit:
    # cookies seen in the before/reject phases are recorded as "essential"
    # (the site's own claim); cookies first seen after accept carry none.
    cookies_detailed: list[dict] = field(default_factory=list)

    # P85: base64-encoded PNG screenshot of the banner taken before the first
    # click. The backend embeds it as an <img> in the mail as visual evidence
    # of what the banner looked like at audit time.
    banner_screenshot_b64: str = ""
async def _new_context(browser) -> BrowserContext:
    """Create a fresh browser context with the scanner's fixed browser profile."""
    return await browser.new_context(
        user_agent=USER_AGENT,
        viewport={"width": 1920, "height": 1080},
        locale="de-DE",
        timezone_id="Europe/Berlin",
    )


async def _open_instrumented_page(ctx: BrowserContext, cmp_capture, collected: list[str]) -> Page:
    """Open a page with interceptor script, stealth, CMP capture and request log attached."""
    page = await ctx.new_page()
    await page.add_init_script(_INTERCEPTOR_INIT)
    if HAS_STEALTH:
        await stealth_async(page)
    cmp_capture.attach(page)  # P48
    page.on("request", lambda req: _collect_script(req, collected))
    return page


async def _goto_first_visit(page: Page, url: str) -> None:
    """Navigate for Phase A: domcontentloaded, one retry on crash, then a `load` fallback."""
    # P50c: Mercedes/Akamai SPA never reaches networkidle.
    # Use domcontentloaded + short JS-wait + retry on crash.
    for attempt in range(2):
        try:
            await page.goto(url, wait_until="domcontentloaded", timeout=20000)
            await page.wait_for_timeout(3500)
            return
        except Exception as exc:
            err = str(exc)[:120]
            logger.warning("Phase A goto attempt %d failed: %s", attempt + 1, err)
            if "crashed" in err.lower() and attempt == 0:
                # Pause without touching the page: it has just reported a crash,
                # so a page-bound timer could itself raise and abort the scan.
                # NOTE(review): the retry reuses the same page object — confirm
                # that a crashed page can actually navigate again.
                await asyncio.sleep(2.0)
                continue
            try:
                await page.goto(url, wait_until="load", timeout=20000)
            except Exception as fallback_exc:
                # Best effort: the scan continues with whatever did load.
                logger.debug("Phase A load fallback failed: %s", str(fallback_exc)[:120])
            return


async def _goto_with_load_fallback(
    page: Page, url: str, first_timeout: int, fallback_timeout: int,
) -> None:
    """Navigate waiting for domcontentloaded; on failure retry once waiting for `load`."""
    try:
        await page.goto(url, wait_until="domcontentloaded", timeout=first_timeout)
    except Exception as exc:
        logger.warning(
            "domcontentloaded navigation failed, fallback to load: %s", str(exc)[:80],
        )
        await page.goto(url, wait_until="load", timeout=fallback_timeout)


async def _click_consent_button(page: Page, selector, action: str, wait_secs: int) -> None:
    """Click the banner button for *action* ("reject"/"accept") and wait if the click worked."""
    if await click_button(page, selector):
        logger.info("%s button clicked, waiting %ds", action.capitalize(), wait_secs)
        await page.wait_for_timeout(wait_secs * 1000)
    else:
        logger.warning("Could not click %s button", action)


async def _record_deep_verification(
    page: Page, phase_key: str, phase_label: str, result: ConsentTestResult,
) -> None:
    """Store intercepted data, consent state and violations under *phase_key*; never raises."""
    try:
        intercepted = await _collect_intercepted(page)
        consent_state = await _get_consent_state(page)
        violations = _analyze_phase(phase_key, intercepted, consent_state)
        result.deep_verification[phase_key] = {
            "intercepted": intercepted,
            "consent_state": consent_state,
            "violations": violations,
        }
    except Exception as exc:
        logger.warning("Phase %s deep verification failed: %s", phase_label, exc)


def _cookie_detail(cookie: dict, phase: str, declared_category: str) -> dict:
    """Build one P59b cookie record; the value is truncated to 200 characters."""
    return {
        "name": cookie.get("name", ""),
        "value": (cookie.get("value") or "")[:200],
        "domain": cookie.get("domain", ""),
        "expires": cookie.get("expires"),
        "phase": phase,
        "declared_category": declared_category,
    }


async def _capture_banner_screenshot(page: Page) -> str:
    """Return a base64 PNG of the viewport, or "" if it fails or is 1.5 MB or larger (P85)."""
    try:
        import base64 as _b64
        png = await page.screenshot(full_page=False, type="png", timeout=10000)
        if png and len(png) < 1_500_000:  # < 1.5 MB
            logger.info("P85: banner screenshot captured (%d bytes)", len(png))
            return _b64.b64encode(png).decode("ascii")
    except Exception as exc:
        logger.warning("P85: banner screenshot failed: %s", exc)
    return ""


async def _record_anti_audit_findings(page: Page, result: ConsentTestResult) -> None:
    """P56: append anti-auditing findings detected on *page*; never raises.

    *page* must still be open when this is called.
    """
    try:
        from services.vendor_detail_extractor import _detect_anti_audit
        anti = await _detect_anti_audit(page)
        if anti.get("bot_protection"):
            result.banner_text_violations.append(Violation(
                service="Cookie-Banner",
                severity="LOW",
                text=f"Hinweis: {anti['bot_protection']} ist aktiv und blockiert "
                     f"automatisierte Compliance-Audits. Fuer Endnutzer voll "
                     f"funktional. Empfehlung: Audit-API bereitstellen damit "
                     f"unabhaengige Pruefer (Aufsichtsbehoerden, DSB) maschinen"
                     f"lesbar verifizieren koennen — staerkt Vertrauen ohne "
                     f"Bot-Schutz zu reduzieren.",
                legal_ref="Rechenschaftspflicht Art. 5(2) DSGVO, "
                          "Transparenz-Empfehlung DSK-OH 2024",
            ))
        if anti.get("user_select_none"):
            result.banner_text_violations.append(Violation(
                service="Cookie-Banner",
                severity="MEDIUM",
                text="Banner-Settings-Oberflaeche nicht per Maus kopierbar "
                     "(CSS user-select:none). Endnutzer koennen sich Cookie-Listen "
                     "+ Anbieter nicht einfach archivieren. Info-Modals pro Vendor "
                     "sind hingegen kopierbar — bitte gleiches Verhalten auch "
                     "auf der Uebersichtsseite ermoeglichen.",
                legal_ref="Art. 12(1) DSGVO (transparente Information), "
                          "DSK-OH Telemedien 2024 (Informations-Festhalten)",
            ))
        if anti.get("tdm_meta"):
            logger.info("Anti-Audit: TDM opt-out meta-tag detected: %s",
                        anti["tdm_meta"])
    except Exception as e:
        logger.debug("Anti-Audit detection skipped: %s", e)


async def _run_category_tests(
    browser, url: str, banner, filter_cats: list[str], wait_ms: int,
    result: ConsentTestResult,
) -> None:
    """Phase D-F: detect consent categories and test each in its own context; never raises."""
    try:
        import time
        from services.category_tester import detect_categories, test_single_category

        ctx_cat = await _new_context(browser)
        try:
            page_cat = await ctx_cat.new_page()
            if HAS_STEALTH:
                await stealth_async(page_cat)
            await _goto_with_load_fallback(page_cat, url, 15000, 20000)
            await page_cat.wait_for_timeout(2000)
            detected_cats = await detect_categories(page_cat, banner)
            await page_cat.close()
        finally:
            # Only needed for detection; close it even when detection fails.
            await ctx_cat.close()

        # Filter to requested categories if specified
        if filter_cats and detected_cats:
            detected_cats = [c for c in detected_cats if c.name in filter_cats]
            logger.info(
                "Filtered to %d categories (requested: %s)",
                len(detected_cats), filter_cats,
            )
        if not detected_cats:
            logger.info("No categories detected — skipping per-category tests")
            return

        # P26: 15s per category within a 90s phase budget. Sites with many
        # categories would otherwise block the scan well beyond the caller's
        # timeout. Skip rather than block — banner quality and CMP payloads
        # matter more than per-category detail.
        phase_deadline = time.monotonic() + 90.0
        # Dedup by name, keeping the first occurrence (some sites yield the
        # same category several times via the shadow-DOM walk).
        seen_names: set[str] = set()
        unique_cats = []
        for cat in detected_cats:
            if cat.name not in seen_names:
                seen_names.add(cat.name)
                unique_cats.append(cat)
        logger.info("Testing %d unique categories (budget=90s, per-cat=15s)",
                    len(unique_cats))
        for index, cat in enumerate(unique_cats):
            if time.monotonic() >= phase_deadline:
                # `index` categories were attempted (including timed-out ones).
                logger.warning("Category phase budget exhausted, "
                               "skipping remaining %d categories",
                               len(unique_cats) - index)
                break
            cat_ctx = await _new_context(browser)
            try:
                cat_result = await asyncio.wait_for(
                    test_single_category(cat_ctx, url, cat, banner, wait_ms),
                    timeout=15.0,
                )
                result.category_tests.append(cat_result)
            except asyncio.TimeoutError:
                logger.warning("Category '%s' timed out after 15s, skipping", cat.name)
            finally:
                await cat_ctx.close()
    except Exception as cat_err:
        logger.warning("Category tests failed (non-blocking): %s", cat_err)


async def _run_vendor_detail_phase(browser, url: str, banner, result: ConsentTestResult) -> None:
    """Phase G (P50): extract per-vendor detail records as plain dicts; never raises."""
    try:
        from services.vendor_detail_extractor import (
            extract_vendor_details,
        )
        accept_sel = banner.accept_selector or None
        logger.info("Phase G: starting vendor-detail-extract (max 50 vendors)")
        vd = await asyncio.wait_for(
            extract_vendor_details(
                browser, url,
                accept_selector=accept_sel,
                max_vendors=50,
            ),
            timeout=600.0,  # 10min hard cap
        )
        # Serialise the vendor objects to plain dicts for the JSON response.
        for v in vd:
            result.vendor_details.append({
                "name": v.name,
                "description": v.description,
                "processing_company": v.processing_company,
                "address": v.address,
                "purposes": v.purposes,
                "technologies": v.technologies,
                "cookies": v.cookies,
                "retention": v.retention,
                "opt_out_url": v.opt_out_url,
                "privacy_url": v.privacy_url,
                "raw_text": v.raw_text,
            })
        logger.info("Phase G complete: %d vendor-details captured",
                    len(result.vendor_details))
    except asyncio.TimeoutError:
        logger.warning("Phase G: hard timeout reached (10min)")
    except Exception as vd_err:
        logger.warning("Phase G failed (non-blocking): %s", vd_err)


def _dedupe_cmp_payloads(payloads) -> list[dict]:
    """Convert captured (cmp_name, data) pairs into unique {"kind", "data"} dicts.

    The dedup signature is cmp_name + length of the stringified data + the
    first few dict keys, because the capture does not hand over a URL here.
    """
    seen_keys: set[str] = set()
    unique: list[dict] = []
    for cmp_name, data in payloads:
        try:
            sig = f"{cmp_name}:{len(str(data))}:{','.join(sorted(list(data.keys())[:5]) if isinstance(data, dict) else [])}"
        except Exception:
            sig = f"{cmp_name}:{id(data)}"
        if sig in seen_keys:
            continue
        seen_keys.add(sig)
        unique.append({"kind": cmp_name, "data": data})
    return unique


async def run_consent_test(
    url: str, wait_secs: int = 10, categories: list[str] | None = None,
) -> ConsentTestResult:
    """Run the multi-phase consent test on a URL.

    Phases: A (first visit), B (after reject), C (after accept), D-F
    (per-category tests) and G (per-vendor detail extraction). Phases B-G
    only run when Phase A detected a consent banner.

    Args:
        url: Website URL to test.
        wait_secs: Seconds to wait per phase.
        categories: Optional list of category names to test (empty = test all).

    Returns:
        The result object. Exceptions raised while scanning are logged and
        the partially filled result is returned instead of propagating.
    """
    result = ConsentTestResult()
    wait_ms = wait_secs * 1000
    filter_cats = categories or []
    # P48: Init CMP-Capture early so it attaches to every instrumented page.
    # CMP JSON-Endpoints (Usercentrics, OneTrust, Cookiebot, ePaaS) are
    # fetched once per page load — capture them across all 3 phases so
    # the backend can do deterministic vendor extraction without LLM.
    from services.cmp_extractor import CMPCapture
    cmp_capture = CMPCapture()

    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=True,
            args=[
                "--no-sandbox",
                "--disable-dev-shm-usage",
                "--disable-blink-features=AutomationControlled",
                "--window-size=1920,1080",
                # P50c: Mercedes/Akamai Bot Manager crashed renderer
                # without these (limits memory pressure + GPU init):
                "--disable-gpu",
                "--disable-software-rasterizer",
                "--disable-background-timer-throttling",
                "--disable-renderer-backgrounding",
                "--disable-backgrounding-occluded-windows",
                "--js-flags=--max-old-space-size=2048",
            ],
        )
        try:
            # ── Phase A: Before consent ──────────────────────────
            logger.info("Phase A: First visit (no interaction)")
            ctx_a = await _new_context(browser)
            scripts_a: list[str] = []
            page_a = await _open_instrumented_page(ctx_a, cmp_capture, scripts_a)
            await _goto_first_visit(page_a, url)
            await page_a.wait_for_timeout(wait_ms)
            await _record_deep_verification(page_a, "before_consent", "A", result)

            result.before_scripts = _get_page_scripts(scripts_a)
            cookies_a = await ctx_a.cookies()
            result.before_cookies = _get_cookie_names(cookies_a)
            # P59b: capture full details — phase "before" = implicit essential-claim
            result.cookies_detailed.extend(
                _cookie_detail(ck, "before", "essential") for ck in cookies_a
            )
            result.before_tracking = find_tracking_services(result.before_scripts)
            result.before_violations = find_violations_before_consent(result.before_scripts)

            # Detect banner
            banner = await detect_banner(page_a)
            result.banner_detected = banner.detected
            result.banner_provider = banner.provider
            if banner.detected:
                # Check banner text for legal issues
                banner_violations = await _check_banner_text(page_a)
                result.banner_text_violations = banner_violations["violations"]
                result.banner_has_impressum_link = banner_violations["has_impressum"]
                result.banner_has_dse_link = banner_violations["has_dse"]
                # P85: visual evidence for the mail.
                result.banner_screenshot_b64 = await _capture_banner_screenshot(page_a)
            await ctx_a.close()

            if not banner.detected:
                logger.info("No consent banner detected — skipping Phase B/C")
                # Keep the CMP payloads captured in Phase A; the browser is
                # closed by the `finally` clause below.
                result.cmp_payloads.extend(_dedupe_cmp_payloads(cmp_capture.payloads))
                return result

            # ── Phase B: After rejecting ─────────────────────────
            logger.info("Phase B: Reject consent (%s)", banner.provider)
            ctx_b = await _new_context(browser)
            scripts_b: list[str] = []
            page_b = await _open_instrumented_page(ctx_b, cmp_capture, scripts_b)
            await _goto_with_load_fallback(page_b, url, 20000, 30000)
            await page_b.wait_for_timeout(3000)
            await _click_consent_button(page_b, banner.reject_selector, "reject", wait_secs)
            await _record_deep_verification(page_b, "after_reject", "B", result)

            result.reject_scripts = _get_page_scripts(scripts_b)
            cookies_b = await ctx_b.cookies()
            result.reject_cookies = _get_cookie_names(cookies_b)
            # P59b: after-Reject = site claims these are essential. Cookies
            # already recorded in the "before" phase are not repeated.
            before_names = {c.get("name", "") for c in cookies_a}
            result.cookies_detailed.extend(
                _cookie_detail(ck, "reject", "essential")
                for ck in cookies_b
                if ck.get("name", "") not in before_names
            )
            reject_tracking = find_tracking_services(result.reject_scripts)
            result.reject_new_tracking = [
                t for t in reject_tracking if t not in result.before_tracking
            ]
            result.reject_violations = find_violations_after_reject(
                result.before_scripts, result.reject_scripts,
            )
            await ctx_b.close()

            # ── Phase C: After accepting ─────────────────────────
            logger.info("Phase C: Accept consent (%s)", banner.provider)
            ctx_c = await _new_context(browser)
            scripts_c: list[str] = []
            page_c = await _open_instrumented_page(ctx_c, cmp_capture, scripts_c)
            await _goto_with_load_fallback(page_c, url, 20000, 30000)
            await page_c.wait_for_timeout(3000)
            await _click_consent_button(page_c, banner.accept_selector, "accept", wait_secs)
            await _record_deep_verification(page_c, "after_accept", "C", result)

            result.accept_scripts = _get_page_scripts(scripts_c)
            cookies_c = await ctx_c.cookies()
            result.accept_cookies = _get_cookie_names(cookies_c)
            # P59b: post-Accept new cookies — category unclear (consent given)
            seen_names = {c["name"] for c in result.cookies_detailed}
            result.cookies_detailed.extend(
                _cookie_detail(ck, "accept", "")
                for ck in cookies_c
                if ck.get("name", "") not in seen_names
            )
            accept_tracking = find_tracking_services(result.accept_scripts)
            result.accept_new_tracking = [
                t for t in accept_tracking if t not in result.before_tracking
            ]
            # TCF vendor extraction (after accept, while page is still open)
            try:
                from services.consent_interceptor import extract_tcf_vendors
                result.tcf_vendors = await extract_tcf_vendors(page_c)
            except Exception as exc:
                logger.warning("TCF vendor extraction failed: %s", exc)

            # ── P56: Anti-Auditing-Detection (before Phase G) ────
            # Must run before ctx_c is closed: the detection reads page_c and
            # would otherwise fail on a closed page and be skipped.
            # NOTE(review): the original comment said Phase G should be
            # skipped while a bot block is active; no such skip exists —
            # confirm the intended behaviour.
            await _record_anti_audit_findings(page_c, result)
            await ctx_c.close()

            # ── Phase D-F: Per-category tests ────────────────────
            await _run_category_tests(browser, url, banner, filter_cats, wait_ms, result)

            # ── Phase G: Per-Vendor Detail-Extraction (P50) ──────
            # Re-opens the banner and clicks each info button to capture the
            # detail-modal text. Runs only if a banner was detected.
            if result.banner_detected and banner is not None:
                await _run_vendor_detail_phase(browser, url, banner, result)
        except Exception as e:
            logger.error("Consent test failed: %s", e, exc_info=True)
        finally:
            await browser.close()

    # P48: collect CMP-payloads captured during all phases, converted to
    # the {"kind", "data"} dict format the backend processes.
    result.cmp_payloads.extend(_dedupe_cmp_payloads(cmp_capture.payloads))
    logger.info(
        "Consent test complete: banner=%s, violations_before=%d, violations_reject=%d, categories=%d, cmp_payloads=%d",
        result.banner_provider, len(result.before_violations), len(result.reject_violations),
        len(result.category_tests), len(result.cmp_payloads),
    )
    return result
def _collect_script(request, scripts: list[str]):
"""Collect script request URLs."""
if request.resource_type in ("script", "image", "xhr", "fetch"):
scripts.append(request.url)
def _get_page_scripts(collected: list[str]) -> list[str]:
"""Deduplicate and filter script URLs."""
seen = set()
result = []
for url in collected:
domain = url.split("/")[2] if "/" in url and len(url.split("/")) > 2 else url
if domain not in seen:
seen.add(domain)
result.append(url)
return result[:50] # Cap at 50
def _get_cookie_names(cookies: list[dict]) -> list[str]:
"""Extract cookie names from Playwright cookie list."""
return sorted(set(c.get("name", "") for c in cookies if c.get("name")))