feat: consent-tester microservice — Playwright 3-phase cookie test

New independent service (port 8094) with headless Chromium:
- Phase A: What loads BEFORE any consent interaction
- Phase B: What loads AFTER rejecting consent (CRITICAL if tracking persists)
- Phase C: What loads AFTER accepting (check against cookie policy)
- 10 CMP-specific selectors (Didomi, OneTrust, Cookiebot, Usercentrics, etc.)
- Generic fallback via button text matching
- 18 tracking service patterns for script classification

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-04-29 12:14:41 +02:00
parent 15d1e118ed
commit d105842bf2
7 changed files with 589 additions and 0 deletions
+23
View File
@@ -0,0 +1,23 @@
FROM python:3.12-slim-bookworm

WORKDIR /app

# Keep the Playwright browser binaries in a shared, user-independent location.
# `playwright install` below runs as root; without this setting Chromium would
# be stored under root's home cache directory, where the unprivileged runtime
# user (appuser) does not look, and every browser launch would fail.
ENV PLAYWRIGHT_BROWSERS_PATH=/ms-playwright

# Install system dependencies for Playwright/Chromium
RUN apt-get update && apt-get install -y --no-install-recommends \
    libnss3 libnspr4 libatk1.0-0 libatk-bridge2.0-0 libcups2 \
    libdrm2 libxkbcommon0 libxcomposite1 libxdamage1 libxfixes3 \
    libxrandr2 libgbm1 libpango-1.0-0 libcairo2 libasound2 \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Download Chromium into PLAYWRIGHT_BROWSERS_PATH and make sure the
# non-root runtime user can read and execute it.
RUN playwright install chromium \
    && chmod -R a+rX /ms-playwright

COPY . .

# Run the service as an unprivileged user.
RUN useradd --create-home appuser
USER appuser

EXPOSE 8094
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8094"]
+86
View File
@@ -0,0 +1,86 @@
"""
Consent Tester Service — Playwright-based 3-phase cookie consent test.
Tests what scripts/cookies load BEFORE consent, AFTER rejection, and AFTER acceptance.
Runs as independent microservice on port 8094.
"""
import logging
from datetime import datetime, timezone
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from services.consent_scanner import run_consent_test, ConsentTestResult
# Configure logging once at import time; INFO level so the per-scan progress
# messages emitted by this module are visible.
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s")
logger = logging.getLogger(__name__)
# The ASGI application object served by uvicorn ("main:app").
app = FastAPI(title="BreakPilot Consent Tester", version="1.0.0")
# CORS is fully open: any origin, any method, any header.
# NOTE(review): presumably acceptable because the service is only reachable
# from internal callers — confirm before exposing it publicly.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
class ScanRequest(BaseModel):
    """Request body accepted by ``POST /scan``."""

    # Address of the page to test; handed to the scanner unchanged.
    url: str
    # Forwarded to the scanner as its per-phase wait time.
    timeout_per_phase: int = 10  # seconds to wait after page load
class ScanResponse(BaseModel):
    """Response body returned by ``POST /scan``."""

    # The URL that was requested (echoed back from the request).
    url: str
    # Whether a consent banner was found, and the provider name reported for it.
    banner_detected: bool
    banner_provider: str
    # Per-phase findings, keyed "before_consent", "after_reject", "after_accept".
    phases: dict
    # Aggregate counters: "critical", "high", "undocumented", "total_violations".
    summary: dict
    # ISO-8601 timestamp (UTC) taken when the response is built.
    scanned_at: str
@app.get("/health")
async def health():
    """Liveness probe: report a static "healthy" status for this service."""
    status_report = dict(status="healthy", service="consent-tester")
    return status_report
@app.post("/scan", response_model=ScanResponse)
async def scan_consent(req: ScanRequest):
    """Execute the 3-phase consent test for ``req.url`` and report the findings.

    The scanner result is reshaped into per-phase sections plus a summary
    of violation counters.
    """
    logger.info("Starting consent test for %s", req.url)
    result = await run_consent_test(req.url, req.timeout_per_phase)

    before_violations = result.before_violations
    reject_violations = result.reject_violations
    undocumented = result.accept_undocumented

    # Phase sections; violations are exposed as plain attribute dicts.
    before_section = {
        "scripts": result.before_scripts,
        "cookies": result.before_cookies,
        "tracking_services": result.before_tracking,
        "violations": [vars(v) for v in before_violations],
    }
    reject_section = {
        "scripts": result.reject_scripts,
        "cookies": result.reject_cookies,
        "new_tracking": result.reject_new_tracking,
        "violations": [vars(v) for v in reject_violations],
    }
    accept_section = {
        "scripts": result.accept_scripts,
        "cookies": result.accept_cookies,
        "new_tracking": result.accept_new_tracking,
        "undocumented": undocumented,
    }

    # "critical" counts only reject-phase violations flagged CRITICAL;
    # "high" is the number of before-consent violations.
    critical_count = len([v for v in reject_violations if v.severity == "CRITICAL"])
    summary = {
        "critical": critical_count,
        "high": len(before_violations),
        "undocumented": len(undocumented),
        "total_violations": len(before_violations) + len(reject_violations),
    }

    return ScanResponse(
        url=req.url,
        banner_detected=result.banner_detected,
        banner_provider=result.banner_provider,
        phases={
            "before_consent": before_section,
            "after_reject": reject_section,
            "after_accept": accept_section,
        },
        summary=summary,
        scanned_at=datetime.now(timezone.utc).isoformat(),
    )
+3
View File
@@ -0,0 +1,3 @@
fastapi==0.115.12
uvicorn==0.34.2
playwright==1.52.0
View File
+149
View File
@@ -0,0 +1,149 @@
"""
Banner Detector — identifies Consent Management Platforms and their buttons.
Supports 10+ CMPs with specific selectors + generic fallback.
"""
from dataclasses import dataclass
from playwright.async_api import Page, Locator
@dataclass
class BannerInfo:
    """Outcome of banner detection: whether a banner was found and how to click it."""

    # True when a CMP-specific or generic banner was recognised.
    detected: bool
    # CMP name from CMP_SELECTORS, "Generic" for the text fallback, "" if none.
    provider: str
    # Selectors for the accept / reject buttons; "" when unknown.
    accept_selector: str
    reject_selector: str
# CMP-specific selectors (ordered by market share)
# Each entry describes one Consent Management Platform with three CSS selectors:
#   "detect"            — matched against the page to decide whether this CMP is present
#   "accept" / "reject" — the buttons to click in the accept and reject phases
# detect_banner() walks this list in order and returns the first entry whose
# "detect" selector matches at least one element.
# NOTE(review): some "reject" values list a second alternative (e.g. CookieYes
# `.cky-btn-customize`, OneTrust `.onetrust-close-btn-handler`) that may not be
# a true "reject all" control — confirm against the CMPs' current markup.
CMP_SELECTORS = [
{
"name": "Didomi",
"detect": "#didomi-host, [class*='didomi']",
"accept": "#didomi-notice-agree-button",
"reject": "#didomi-notice-disagree-button, .didomi-components-button--secondary",
},
{
"name": "OneTrust",
"detect": "#onetrust-banner-sdk, [class*='onetrust']",
"accept": "#onetrust-accept-btn-handler",
"reject": "#onetrust-reject-all-handler, .onetrust-close-btn-handler",
},
{
"name": "Cookiebot",
"detect": "#CybotCookiebotDialog, [class*='CybotCookiebot']",
"accept": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
"reject": "#CybotCookiebotDialogBodyButtonDecline",
},
{
"name": "Usercentrics",
"detect": "#usercentrics-root, [data-testid='uc-banner']",
"accept": "[data-testid='uc-accept-all-button']",
"reject": "[data-testid='uc-deny-all-button']",
},
{
"name": "CookieYes",
"detect": ".cky-consent-container, [class*='cky-']",
"accept": ".cky-btn-accept",
"reject": ".cky-btn-reject, .cky-btn-customize",
},
{
"name": "Quantcast",
"detect": ".qc-cmp2-container, [class*='qc-cmp']",
"accept": "[class*='qc-cmp2-summary-buttons'] button:first-child",
"reject": "[class*='qc-cmp2-summary-buttons'] button:last-child",
},
{
"name": "Borlabs",
"detect": "#BorlabsCookieBox, [class*='BorlabsCookie']",
"accept": "#BorlabsCookieBox .cookie-accept, [data-cookie-accept]",
"reject": "#BorlabsCookieBox .cookie-refuse, [data-cookie-refuse]",
},
{
"name": "Consentmanager",
"detect": "#cmpbox, [class*='cmpbox']",
"accept": ".cmpboxbtn.cmpboxbtnyes",
"reject": ".cmpboxbtn.cmpboxbtnno",
},
{
"name": "Klaro",
"detect": ".klaro, [class*='klaro']",
"accept": ".klaro .cm-btn-accept",
"reject": ".klaro .cm-btn-decline",
},
{
"name": "TarteAuCitron",
"detect": "#tarteaucitronRoot, [class*='tarteaucitron']",
"accept": "#tarteaucitronPersonalize2",
"reject": "#tarteaucitronAllDenied2",
},
]
# Generic fallback patterns (text-based)
# Used by detect_banner() only when no entry of CMP_SELECTORS matched.
# The lists are tried in order, so earlier (more specific) phrases win.
# German and English button labels for "accept all".
GENERIC_ACCEPT_TEXTS = [
"Alle akzeptieren", "Alles akzeptieren", "Alle Cookies akzeptieren",
"Accept all", "Accept All Cookies", "Akzeptieren", "Zustimmen",
"Einverstanden", "Ich stimme zu", "Ja, einverstanden",
]
# German and English button labels for "reject" / "only necessary".
GENERIC_REJECT_TEXTS = [
"Nur notwendige", "Nur essentielle", "Ablehnen", "Alle ablehnen",
"Reject", "Reject all", "Nur erforderliche", "Nur technisch notwendige",
"Decline", "Nein", "Nicht einverstanden",
]
async def detect_banner(page: Page) -> BannerInfo:
    """Detect which CMP is used and return button selectors.

    Detection runs in three steps, stopping at the first hit:

    1. CMP-specific: the first entry of ``CMP_SELECTORS`` whose ``detect``
       selector matches at least one element.
    2. Generic, clickable: the first accept text that appears inside a
       ``<button>``, ``[role='button']`` or ``<a>`` element. The returned
       selector targets exactly the element kind that matched, so the later
       click can actually find it.
    3. Generic, text only (last resort): the first accept text that appears
       anywhere on the page, paired with a ``button:has-text(...)`` selector.

    Args:
        page: A loaded Playwright page.

    Returns:
        A ``BannerInfo``; ``detected`` is False and all strings are empty when
        nothing matched. ``reject_selector`` may be "" for generic banners
        that expose no recognisable reject control.
    """
    # Try CMP-specific selectors first
    for cmp in CMP_SELECTORS:
        try:
            count = await page.locator(cmp["detect"]).count()
        except Exception:
            # Best effort: a failing probe for one CMP must not stop the others.
            continue
        if count > 0:
            return BannerInfo(
                detected=True,
                provider=cmp["name"],
                accept_selector=cmp["accept"],
                reject_selector=cmp["reject"],
            )

    # Generic fallback — prefer texts that sit inside something clickable.
    accept = await _first_clickable_selector(page, GENERIC_ACCEPT_TEXTS)
    if accept:
        reject = await _first_clickable_selector(page, GENERIC_REJECT_TEXTS)
    else:
        # Last resort: any element containing the text counts as a banner hint.
        accept = await _first_text_selector(page, GENERIC_ACCEPT_TEXTS)
        reject = await _first_text_selector(page, GENERIC_REJECT_TEXTS) if accept else ""

    if accept:
        return BannerInfo(
            detected=True,
            provider="Generic",
            accept_selector=accept,
            reject_selector=reject,
        )
    return BannerInfo(detected=False, provider="", accept_selector="", reject_selector="")


async def _first_clickable_selector(page: Page, texts: list[str]) -> str:
    """Return a selector for the first text found inside a clickable element, else ""."""
    # Real buttons are preferred over ARIA buttons, and those over links.
    for text in texts:
        for element in ("button", "[role='button']", "a"):
            selector = f'{element}:has-text("{text}")'
            try:
                if await page.locator(selector).count() > 0:
                    return selector
            except Exception:
                continue
    return ""


async def _first_text_selector(page: Page, texts: list[str]) -> str:
    """Return ``button:has-text(...)`` for the first text present anywhere on the page, else ""."""
    for text in texts:
        try:
            if await page.get_by_text(text, exact=False).count() > 0:
                return f'button:has-text("{text}")'
        except Exception:
            continue
    return ""
async def click_button(page: Page, selector: str, timeout: int = 5000) -> bool:
    """Click the first element matching *selector* once it becomes visible.

    Args:
        page: Playwright page to act on.
        selector: Selector of the consent button; an empty string means
            "no button known" and is never clicked.
        timeout: Maximum time in milliseconds to wait for visibility.

    Returns:
        True if the element became visible and the click went through;
        False for an empty selector or on any failure (best effort — errors
        are swallowed on purpose so a missing button does not abort the scan).
    """
    clicked = False
    if selector:
        try:
            target = page.locator(selector).first
            await target.wait_for(state="visible", timeout=timeout)
            await target.click()
        except Exception:
            clicked = False
        else:
            clicked = True
    return clicked
+171
View File
@@ -0,0 +1,171 @@
"""
Consent Scanner — Playwright-based 3-phase cookie consent test.
Phase A: Before consent (first visit)
Phase B: After rejecting consent
Phase C: After accepting consent
"""
import logging
from dataclasses import dataclass, field
from playwright.async_api import async_playwright, Page, BrowserContext
from services.banner_detector import detect_banner, click_button, BannerInfo
from services.script_analyzer import (
classify_scripts, find_tracking_services,
find_violations_before_consent, find_violations_after_reject, Violation,
)
logger = logging.getLogger(__name__)
# User-Agent header sent by every browser context the scanner opens;
# the string identifies the client as desktop Chrome 120 on macOS.
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
@dataclass
class ConsentTestResult:
    """Findings of one 3-phase consent test; every field starts empty."""

    # Banner detection outcome (taken from the Phase A page).
    banner_detected: bool = False
    banner_provider: str = ""
    # Phase A: Before consent
    before_scripts: list[str] = field(default_factory=list)  # request URLs, de-duplicated and capped
    before_cookies: list[str] = field(default_factory=list)  # sorted cookie names
    before_tracking: list[str] = field(default_factory=list)  # consent-requiring services seen
    before_violations: list[Violation] = field(default_factory=list)
    # Phase B: After reject
    reject_scripts: list[str] = field(default_factory=list)
    reject_cookies: list[str] = field(default_factory=list)
    reject_new_tracking: list[str] = field(default_factory=list)  # services not already seen in Phase A
    reject_violations: list[Violation] = field(default_factory=list)
    # Phase C: After accept
    accept_scripts: list[str] = field(default_factory=list)
    accept_cookies: list[str] = field(default_factory=list)
    accept_new_tracking: list[str] = field(default_factory=list)  # services not already seen in Phase A
    # NOTE(review): nothing in the visible scanner code assigns this field,
    # so it appears to stay empty — confirm whether a policy check is pending.
    accept_undocumented: list[str] = field(default_factory=list)
async def run_consent_test(url: str, wait_secs: int = 10) -> ConsentTestResult:
    """Run 3-phase consent test on a URL.

    Each phase loads the page in its own fresh browser context:

    * Phase A records what loads without any consent interaction and
      detects the banner.
    * Phase B clicks the reject button and records what loads afterwards.
    * Phase C clicks the accept button and records what loads afterwards.

    Phases B and C are skipped when no banner is detected.

    Args:
        url: Page to test.
        wait_secs: Seconds to wait after page load (Phase A) and after a
            successful button click (Phases B and C).

    Returns:
        The collected findings. The test is best effort: if a phase raises,
        the error is logged with its traceback and whatever was collected up
        to that point is returned.
    """
    result = ConsentTestResult()
    wait_ms = wait_secs * 1000

    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=True,
            args=["--no-sandbox", "--disable-dev-shm-usage"],
        )
        try:
            # ── Phase A: Before consent ──────────────────────────
            logger.info("Phase A: First visit (no interaction)")
            ctx_a, page_a, requests_a = await _open_fresh_page(browser, url)
            await page_a.wait_for_timeout(wait_ms)

            result.before_scripts = _get_page_scripts(requests_a)
            result.before_cookies = _get_cookie_names(await ctx_a.cookies())
            result.before_tracking = find_tracking_services(result.before_scripts)
            result.before_violations = find_violations_before_consent(result.before_scripts)

            # Detect banner
            banner = await detect_banner(page_a)
            result.banner_detected = banner.detected
            result.banner_provider = banner.provider
            await ctx_a.close()

            if not banner.detected:
                logger.info("No consent banner detected — skipping Phase B/C")
                # The browser is closed by the finally clause below.
                return result

            # ── Phase B: After rejecting ─────────────────────────
            logger.info("Phase B: Reject consent (%s)", banner.provider)
            ctx_b, page_b, requests_b = await _open_fresh_page(browser, url)
            await page_b.wait_for_timeout(3000)

            rejected = await click_button(page_b, banner.reject_selector)
            if rejected:
                logger.info("Reject button clicked, waiting %ds", wait_secs)
                await page_b.wait_for_timeout(wait_ms)
            else:
                logger.warning("Could not click reject button")

            result.reject_scripts = _get_page_scripts(requests_b)
            result.reject_cookies = _get_cookie_names(await ctx_b.cookies())
            reject_tracking = find_tracking_services(result.reject_scripts)
            result.reject_new_tracking = [t for t in reject_tracking if t not in result.before_tracking]
            if rejected:
                # Only a rejection that actually happened can be violated.
                # Without a successful click this phase is just a second
                # "before consent" load, and flagging it would be a false
                # positive.
                result.reject_violations = find_violations_after_reject(
                    result.before_scripts, result.reject_scripts,
                )
            await ctx_b.close()

            # ── Phase C: After accepting ─────────────────────────
            logger.info("Phase C: Accept consent (%s)", banner.provider)
            ctx_c, page_c, requests_c = await _open_fresh_page(browser, url)
            await page_c.wait_for_timeout(3000)

            accepted = await click_button(page_c, banner.accept_selector)
            if accepted:
                logger.info("Accept button clicked, waiting %ds", wait_secs)
                await page_c.wait_for_timeout(wait_ms)
            else:
                logger.warning("Could not click accept button")

            result.accept_scripts = _get_page_scripts(requests_c)
            result.accept_cookies = _get_cookie_names(await ctx_c.cookies())
            accept_tracking = find_tracking_services(result.accept_scripts)
            result.accept_new_tracking = [t for t in accept_tracking if t not in result.before_tracking]
            await ctx_c.close()
        except Exception as e:
            # Best effort: keep the partial result, but record the traceback.
            logger.exception("Consent test failed: %s", e)
        finally:
            # Closing the browser also disposes any context left open by an error.
            await browser.close()

    logger.info(
        "Consent test complete: banner=%s, violations_before=%d, violations_reject=%d",
        result.banner_provider, len(result.before_violations), len(result.reject_violations),
    )
    return result


async def _open_fresh_page(browser, url: str) -> tuple[BrowserContext, Page, list[str]]:
    """Load *url* in a new browser context and return (context, page, recorded request URLs)."""
    ctx = await browser.new_context(user_agent=USER_AGENT)
    page = await ctx.new_page()
    collected: list[str] = []
    # Register the listener before navigating so no request is missed.
    page.on("request", lambda req: _collect_script(req, collected))
    await page.goto(url, wait_until="networkidle", timeout=30000)
    return ctx, page, collected
def _collect_script(request, scripts: list[str]):
"""Collect script request URLs."""
if request.resource_type in ("script", "image", "xhr", "fetch"):
scripts.append(request.url)
def _get_page_scripts(collected: list[str]) -> list[str]:
"""Deduplicate and filter script URLs."""
seen = set()
result = []
for url in collected:
domain = url.split("/")[2] if "/" in url and len(url.split("/")) > 2 else url
if domain not in seen:
seen.add(domain)
result.append(url)
return result[:50] # Cap at 50
def _get_cookie_names(cookies: list[dict]) -> list[str]:
"""Extract cookie names from Playwright cookie list."""
return sorted(set(c.get("name", "") for c in cookies if c.get("name")))
+157
View File
@@ -0,0 +1,157 @@
"""
Script Analyzer — classifies detected scripts and cookies against known services.
"""
import re
from dataclasses import dataclass
# Known third-party services, keyed by a regular expression that is searched
# (case-insensitively) in each request URL. The analysis functions in this
# module stop at the first matching entry, so order matters.
# Each value holds:
#   "name"             — display name of the service
#   "requires_consent" — whether loading it counts as tracking needing consent
#   "legal_ref"        — legal basis quoted in violation reports
SERVICE_PATTERNS: dict[str, dict] = {
    # The measurement-ID alternatives (UA-…, G-…) are anchored on a word
    # boundary and matched case-sensitively via the scoped (?-i:…) group.
    # Otherwise ordinary asset names such as "img-large.jpg" or
    # "loading-spinner.js" would be reported as Google Analytics.
    r"google.?analytics|gtag|\b(?-i:UA-\d{4,}-\d+|G-[A-Z0-9]{8,})\b": {
        "name": "Google Analytics", "requires_consent": True,
        "legal_ref": "§25 TDDDG, Art. 44-49 DSGVO",
    },
    r"googletagmanager|gtm\.js": {
        "name": "Google Tag Manager", "requires_consent": True,
        "legal_ref": "§25 TDDDG",
    },
    r"facebook\.net|fbevents|fbq": {
        "name": "Meta/Facebook Pixel", "requires_consent": True,
        "legal_ref": "§25 TDDDG, Art. 44-49 DSGVO",
    },
    r"hotjar\.com|_hjSettings": {
        "name": "Hotjar", "requires_consent": True,
        "legal_ref": "§25 TDDDG (Session Recording)",
    },
    r"clarity\.ms": {
        "name": "Microsoft Clarity", "requires_consent": True,
        "legal_ref": "§25 TDDDG (Session Replay)",
    },
    r"tiktok\.com/i18n|analytics\.tiktok": {
        "name": "TikTok Pixel", "requires_consent": True,
        "legal_ref": "§25 TDDDG, Drittlandtransfer China",
    },
    r"linkedin\.com/insight|snap\.licdn": {
        "name": "LinkedIn Insight", "requires_consent": True,
        "legal_ref": "§25 TDDDG, Art. 44-49 DSGVO",
    },
    r"pinterest\.com/ct|pinimg\.com/ct": {
        "name": "Pinterest Tag", "requires_consent": True,
        "legal_ref": "§25 TDDDG",
    },
    r"criteo\.com|criteo\.net": {
        "name": "Criteo", "requires_consent": True,
        "legal_ref": "§25 TDDDG",
    },
    r"doubleclick\.net|googlesyndication": {
        "name": "Google Ads/DoubleClick", "requires_consent": True,
        "legal_ref": "§25 TDDDG, Art. 44-49 DSGVO",
    },
    r"fonts\.googleapis\.com|fonts\.gstatic": {
        "name": "Google Fonts", "requires_consent": True,
        "legal_ref": "LG Muenchen I, Az. 3 O 17493/20",
    },
    r"recaptcha|grecaptcha": {
        "name": "Google reCAPTCHA", "requires_consent": True,
        "legal_ref": "§25 TDDDG",
    },
    r"youtube\.com/embed|ytimg": {
        "name": "YouTube", "requires_consent": True,
        "legal_ref": "§25 TDDDG, Art. 44-49 DSGVO",
    },
    r"maps\.googleapis|maps\.google": {
        "name": "Google Maps", "requires_consent": True,
        "legal_ref": "§25 TDDDG",
    },
    r"intercom\.io|intercomcdn": {
        "name": "Intercom", "requires_consent": True,
        "legal_ref": "Art. 44-49 DSGVO",
    },
    r"zendesk\.com|zdassets": {
        "name": "Zendesk", "requires_consent": True,
        "legal_ref": "Art. 44-49 DSGVO",
    },
    # The remaining entries are recognised but do not require consent.
    r"sentry\.io|sentry-cdn": {
        "name": "Sentry", "requires_consent": False,
        "legal_ref": "Berechtigtes Interesse (Error Tracking)",
    },
    r"cdn\.cloudflare\.com": {
        "name": "Cloudflare CDN", "requires_consent": False,
        "legal_ref": "Berechtigtes Interesse (CDN)",
    },
    r"didomi|cookiebot|onetrust|usercentrics|consentmanager": {
        "name": "Consent Management", "requires_consent": False,
        "legal_ref": "Notwendig (CMP)",
    },
}
@dataclass
class Violation:
    """One finding: a consent-requiring service that loaded when it should not have."""

    # Display name of the service, taken from SERVICE_PATTERNS.
    service: str
    severity: str  # "HIGH", "CRITICAL"
    # Human-readable description of the finding.
    text: str
    # Legal basis quoted for the finding.
    legal_ref: str
def classify_scripts(scripts: list[str]) -> list[str]:
    """Map request URLs to the names of the known services they belong to.

    Each URL is attributed to the first entry of ``SERVICE_PATTERNS`` whose
    pattern matches it (case-insensitive); URLs matching nothing are ignored.

    Returns:
        The distinct service names, sorted alphabetically.
    """
    def first_service(url: str):
        for pattern, meta in SERVICE_PATTERNS.items():
            if re.search(pattern, url, re.IGNORECASE):
                return meta["name"]
        return None

    names = {first_service(url) for url in scripts}
    names.discard(None)
    return sorted(names)
def find_tracking_services(scripts: list[str]) -> list[str]:
    """Return the names of consent-requiring services seen in *scripts*.

    Every URL contributes at most one service: the first consent-requiring
    entry of ``SERVICE_PATTERNS`` whose pattern matches it (case-insensitive).
    Entries that do not require consent never count.

    Returns:
        The distinct service names, sorted alphabetically.
    """
    found = set()
    for url in scripts:
        for pattern, meta in SERVICE_PATTERNS.items():
            if not meta["requires_consent"]:
                continue
            if re.search(pattern, url, re.IGNORECASE):
                found.add(meta["name"])
                break
    return sorted(found)
def find_violations_before_consent(scripts: list[str]) -> list[Violation]:
    """Report consent-requiring services that loaded before any consent (HIGH).

    Each URL is attributed to the first consent-requiring entry of
    ``SERVICE_PATTERNS`` that matches it. Every service is reported once,
    in the order it was first encountered.
    """
    by_service: dict[str, Violation] = {}
    for url in scripts:
        hit = next(
            (
                meta
                for pattern, meta in SERVICE_PATTERNS.items()
                if meta["requires_consent"] and re.search(pattern, url, re.IGNORECASE)
            ),
            None,
        )
        if hit is None:
            continue
        name = hit["name"]
        if name in by_service:
            continue
        by_service[name] = Violation(
            service=name,
            severity="HIGH",
            text=f"{name} laedt OHNE vorherige Einwilligung",
            legal_ref=hit["legal_ref"],
        )
    return list(by_service.values())
def find_violations_after_reject(
    before_scripts: list[str], after_scripts: list[str],
) -> list[Violation]:
    """Report tracking services that keep loading after rejection (CRITICAL).

    A service is reported when it is found both in *before_scripts* and in
    *after_scripts*. Its legal reference is taken from the first
    ``SERVICE_PATTERNS`` entry carrying that name, extended by the ePrivacy
    article.

    NOTE(review): services that appear only after the rejection are not
    reported here — confirm that this is intended.
    """
    already_before = find_tracking_services(before_scripts)
    violations = []
    for service in find_tracking_services(after_scripts):
        if service not in already_before:
            continue
        # Was already loading before AND still loads after reject = CRITICAL
        meta = next(
            (entry for entry in SERVICE_PATTERNS.values() if entry["name"] == service),
            None,
        )
        if meta is None:
            continue
        violations.append(Violation(
            service=service, severity="CRITICAL",
            text=f"{service} laedt TROTZ Ablehnung — moegliches Dark Pattern",
            legal_ref=meta["legal_ref"] + ", Art. 5(3) ePrivacy",
        ))
    return violations