# Consent Tester image: FastAPI service that drives headless Chromium via Playwright.
FROM python:3.12-slim-bookworm

WORKDIR /app

# Install Playwright's browsers into a fixed, user-independent directory.
# Without this, `playwright install` (run as root at build time) writes to
# root's per-user cache, while the service below runs as `appuser` and looks
# in its own home directory, so Chromium would not be found at runtime.
# The same variable tells the runtime where to look.
ENV PLAYWRIGHT_BROWSERS_PATH=/ms-playwright

# Install system dependencies for Playwright/Chromium
RUN apt-get update && apt-get install -y --no-install-recommends \
    libnss3 libnspr4 libatk1.0-0 libatk-bridge2.0-0 libcups2 \
    libdrm2 libxkbcommon0 libxcomposite1 libxdamage1 libxfixes3 \
    libxrandr2 libgbm1 libpango-1.0-0 libcairo2 libasound2 \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Make the downloaded browser readable/executable for the unprivileged user.
RUN playwright install chromium \
    && chmod -R a+rX "$PLAYWRIGHT_BROWSERS_PATH"

COPY . .

# Run the service as an unprivileged user.
RUN useradd --create-home appuser
USER appuser

EXPOSE 8094

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8094"]
"""
Consent Tester Service — Playwright-based 3-phase cookie consent test.

Tests what scripts/cookies load BEFORE consent, AFTER rejection, and AFTER acceptance.
Runs as independent microservice on port 8094.
"""

import logging
from dataclasses import asdict
from datetime import datetime, timezone
from urllib.parse import urlsplit

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

from services.consent_scanner import run_consent_test, ConsentTestResult

logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s")
logger = logging.getLogger(__name__)

# Upper bound (seconds) for the per-phase wait. Each scan keeps a browser
# open for the wait in every phase, so an unbounded value would let a single
# request occupy the service for an arbitrary amount of time.
MAX_TIMEOUT_PER_PHASE = 120

app = FastAPI(title="BreakPilot Consent Tester", version="1.0.0")

# NOTE(review): CORS is open to every origin, method and header. Confirm that
# this service is only reachable from trusted networks.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


class ScanRequest(BaseModel):
    """Request body of ``POST /scan``."""

    url: str
    # seconds to wait after page load; bounded so a request cannot stall a scan
    timeout_per_phase: int = Field(default=10, ge=0, le=MAX_TIMEOUT_PER_PHASE)


class ScanResponse(BaseModel):
    """Response body of ``POST /scan``."""

    url: str
    banner_detected: bool
    banner_provider: str
    phases: dict
    summary: dict
    scanned_at: str


def _is_http_url(url: str) -> bool:
    """Return True if *url* is an absolute http(s) URL with a host part."""
    try:
        parts = urlsplit(url)
    except ValueError:
        # urlsplit rejects some malformed inputs (e.g. a broken IPv6 literal).
        return False
    return parts.scheme in ("http", "https") and bool(parts.netloc)


@app.get("/health")
async def health():
    """Liveness probe."""
    return {"status": "healthy", "service": "consent-tester"}


@app.post("/scan", response_model=ScanResponse)
async def scan_consent(req: ScanRequest):
    """Run 3-phase consent test on a URL.

    Raises:
        HTTPException: 422 if ``req.url`` is not an absolute http(s) URL.
            The URL is handed to a real browser, so other schemes
            (``file:``, ``chrome:``, ...) are rejected up front.
    """
    if not _is_http_url(req.url):
        raise HTTPException(
            status_code=422,
            detail="url must be an absolute http(s) URL",
        )

    logger.info("Starting consent test for %s", req.url)
    result = await run_consent_test(req.url, req.timeout_per_phase)

    return ScanResponse(
        url=req.url,
        banner_detected=result.banner_detected,
        banner_provider=result.banner_provider,
        phases={
            "before_consent": {
                "scripts": result.before_scripts,
                "cookies": result.before_cookies,
                "tracking_services": result.before_tracking,
                "violations": [asdict(v) for v in result.before_violations],
            },
            "after_reject": {
                "scripts": result.reject_scripts,
                "cookies": result.reject_cookies,
                "new_tracking": result.reject_new_tracking,
                "violations": [asdict(v) for v in result.reject_violations],
            },
            "after_accept": {
                "scripts": result.accept_scripts,
                "cookies": result.accept_cookies,
                "new_tracking": result.accept_new_tracking,
                "undocumented": result.accept_undocumented,
            },
        },
        summary={
            "critical": sum(1 for v in result.reject_violations if v.severity == "CRITICAL"),
            "high": len(result.before_violations),
            "undocumented": len(result.accept_undocumented),
            "total_violations": len(result.before_violations) + len(result.reject_violations),
        },
        scanned_at=datetime.now(timezone.utc).isoformat(),
    )
"""
Banner Detector — identifies Consent Management Platforms and their buttons.

Supports 10+ CMPs with specific selectors + generic fallback.
"""

import logging
from dataclasses import dataclass

from playwright.async_api import Page, Locator

logger = logging.getLogger(__name__)


@dataclass
class BannerInfo:
    """Outcome of banner detection plus the selectors used to answer it."""

    detected: bool
    provider: str
    accept_selector: str
    reject_selector: str


# CMP-specific selectors (ordered by market share)
CMP_SELECTORS = [
    {
        "name": "Didomi",
        "detect": "#didomi-host, [class*='didomi']",
        "accept": "#didomi-notice-agree-button",
        "reject": "#didomi-notice-disagree-button, .didomi-components-button--secondary",
    },
    {
        "name": "OneTrust",
        "detect": "#onetrust-banner-sdk, [class*='onetrust']",
        "accept": "#onetrust-accept-btn-handler",
        "reject": "#onetrust-reject-all-handler, .onetrust-close-btn-handler",
    },
    {
        "name": "Cookiebot",
        "detect": "#CybotCookiebotDialog, [class*='CybotCookiebot']",
        "accept": "#CybotCookiebotDialogBodyLevelButtonLevelOptinAllowAll",
        "reject": "#CybotCookiebotDialogBodyButtonDecline",
    },
    {
        "name": "Usercentrics",
        "detect": "#usercentrics-root, [data-testid='uc-banner']",
        "accept": "[data-testid='uc-accept-all-button']",
        "reject": "[data-testid='uc-deny-all-button']",
    },
    {
        "name": "CookieYes",
        "detect": ".cky-consent-container, [class*='cky-']",
        "accept": ".cky-btn-accept",
        "reject": ".cky-btn-reject, .cky-btn-customize",
    },
    {
        "name": "Quantcast",
        "detect": ".qc-cmp2-container, [class*='qc-cmp']",
        "accept": "[class*='qc-cmp2-summary-buttons'] button:first-child",
        "reject": "[class*='qc-cmp2-summary-buttons'] button:last-child",
    },
    {
        "name": "Borlabs",
        "detect": "#BorlabsCookieBox, [class*='BorlabsCookie']",
        "accept": "#BorlabsCookieBox .cookie-accept, [data-cookie-accept]",
        "reject": "#BorlabsCookieBox .cookie-refuse, [data-cookie-refuse]",
    },
    {
        "name": "Consentmanager",
        "detect": "#cmpbox, [class*='cmpbox']",
        "accept": ".cmpboxbtn.cmpboxbtnyes",
        "reject": ".cmpboxbtn.cmpboxbtnno",
    },
    {
        "name": "Klaro",
        "detect": ".klaro, [class*='klaro']",
        "accept": ".klaro .cm-btn-accept",
        "reject": ".klaro .cm-btn-decline",
    },
    {
        "name": "TarteAuCitron",
        "detect": "#tarteaucitronRoot, [class*='tarteaucitron']",
        "accept": "#tarteaucitronPersonalize2",
        "reject": "#tarteaucitronAllDenied2",
    },
]

# Generic fallback patterns (text-based)
GENERIC_ACCEPT_TEXTS = [
    "Alle akzeptieren", "Alles akzeptieren", "Alle Cookies akzeptieren",
    "Accept all", "Accept All Cookies", "Akzeptieren", "Zustimmen",
    "Einverstanden", "Ich stimme zu", "Ja, einverstanden",
]

GENERIC_REJECT_TEXTS = [
    "Nur notwendige", "Nur essentielle", "Ablehnen", "Alle ablehnen",
    "Reject", "Reject all", "Nur erforderliche", "Nur technisch notwendige",
    "Decline", "Nein", "Nicht einverstanden",
]


async def _first_button_selector(page: Page, texts: list[str]) -> str:
    """Return a selector for the first label in *texts* that matches a button.

    The selector that is probed is the selector that is returned, so a
    non-empty result is known to match at least one element on the page.
    Returns "" when no label matches.
    """
    for text in texts:
        selector = f'button:has-text("{text}")'
        try:
            if await page.locator(selector).count() > 0:
                return selector
        except Exception:
            # Best effort: a failing probe must not abort detection.
            logger.debug("Probe for %r failed", selector, exc_info=True)
    return ""


async def detect_banner(page: Page) -> BannerInfo:
    """Detect which CMP is used and return button selectors.

    CMP-specific selectors are tried first, in ``CMP_SELECTORS`` order. If
    none matches, buttons are searched by their label text. The fallback
    only reports a banner when an actual ``<button>`` carries an accept
    label; matching the label anywhere in the page text would report banners
    whose returned selectors match nothing.

    Returns:
        BannerInfo with ``detected=False`` and empty strings when nothing was
        found. For the generic fallback ``provider`` is "Generic" and
        ``reject_selector`` may be "" if no reject button was found.
    """
    # Try CMP-specific selectors first
    for cmp in CMP_SELECTORS:
        try:
            count = await page.locator(cmp["detect"]).count()
        except Exception:
            logger.debug("CMP probe for %s failed", cmp["name"], exc_info=True)
            continue
        if count > 0:
            return BannerInfo(
                detected=True,
                provider=cmp["name"],
                accept_selector=cmp["accept"],
                reject_selector=cmp["reject"],
            )

    # Generic fallback — search for buttons by text
    accept = await _first_button_selector(page, GENERIC_ACCEPT_TEXTS)
    if accept:
        reject = await _first_button_selector(page, GENERIC_REJECT_TEXTS)
        return BannerInfo(
            detected=True,
            provider="Generic",
            accept_selector=accept,
            reject_selector=reject,
        )

    return BannerInfo(detected=False, provider="", accept_selector="", reject_selector="")


async def click_button(page: Page, selector: str, timeout: int = 5000) -> bool:
    """Try to click a consent button. Returns True if clicked successfully.

    Args:
        page: Page that shows the banner.
        selector: Selector of the button; "" means there is nothing to click.
        timeout: Milliseconds to wait for the button to become visible.

    NOTE(review): for comma-separated selectors ``.first`` takes the first
    match of the whole list, which is presumably the first in document order
    rather than the first alternative listed. A fallback such as
    ``.cky-btn-customize`` could then win over the preferred button; confirm.
    """
    if not selector:
        return False
    try:
        locator = page.locator(selector).first
        await locator.wait_for(state="visible", timeout=timeout)
        await locator.click()
        return True
    except Exception:
        # Best effort: the caller only needs to know whether the click happened.
        logger.debug("Could not click %r", selector, exc_info=True)
        return False
"""
Consent Scanner — Playwright-based 3-phase cookie consent test.

Phase A: Before consent (first visit)
Phase B: After rejecting consent
Phase C: After accepting consent
"""

import logging
from dataclasses import dataclass, field

from playwright.async_api import async_playwright, Page, BrowserContext

from services.banner_detector import detect_banner, click_button, BannerInfo
from services.script_analyzer import (
    classify_scripts, find_tracking_services,
    find_violations_before_consent, find_violations_after_reject, Violation,
)

logger = logging.getLogger(__name__)

USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)

# Navigation timeout for every page load (milliseconds).
NAVIGATION_TIMEOUT_MS = 30000
# Pause after page load before the banner button is clicked (milliseconds).
BANNER_SETTLE_MS = 3000


@dataclass
class ConsentTestResult:
    """Observations collected by the three scan phases."""

    banner_detected: bool = False
    banner_provider: str = ""
    # Phase A: Before consent
    before_scripts: list[str] = field(default_factory=list)
    before_cookies: list[str] = field(default_factory=list)
    before_tracking: list[str] = field(default_factory=list)
    before_violations: list[Violation] = field(default_factory=list)
    # Phase B: After reject
    reject_scripts: list[str] = field(default_factory=list)
    reject_cookies: list[str] = field(default_factory=list)
    reject_new_tracking: list[str] = field(default_factory=list)
    reject_violations: list[Violation] = field(default_factory=list)
    # Phase C: After accept
    accept_scripts: list[str] = field(default_factory=list)
    accept_cookies: list[str] = field(default_factory=list)
    accept_new_tracking: list[str] = field(default_factory=list)
    accept_undocumented: list[str] = field(default_factory=list)
    # Message of the exception that aborted the scan; "" when it ran through.
    # Lets callers tell an aborted scan apart from a site without findings.
    error: str = ""


async def run_consent_test(url: str, wait_secs: int = 10) -> ConsentTestResult:
    """Run 3-phase consent test on a URL.

    Every phase loads *url* in a fresh browser context. Phases B and C run
    only if Phase A detected a consent banner.

    Args:
        url: Page to scan.
        wait_secs: Seconds to observe the page in each phase; negative
            values are treated as 0.

    Returns:
        The collected result. Errors are not raised: whatever was collected
        up to the failure is returned and ``error`` carries the message.
    """
    result = ConsentTestResult()
    wait_secs = max(wait_secs, 0)

    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=True,
            args=["--no-sandbox", "--disable-dev-shm-usage"],
        )

        try:
            banner = await _phase_before_consent(browser, url, wait_secs, result)
            if banner.detected:
                await _phase_after_reject(browser, url, wait_secs, banner, result)
                await _phase_after_accept(browser, url, wait_secs, banner, result)
            else:
                logger.info("No consent banner detected — skipping Phase B/C")
        except Exception as e:
            # Keep the partial result, but record the failure with traceback.
            logger.exception("Consent test failed: %s", e)
            result.error = str(e)
        finally:
            # Single place where the browser is closed, on every path.
            await browser.close()

    logger.info(
        "Consent test complete: banner=%s, violations_before=%d, violations_reject=%d",
        result.banner_provider, len(result.before_violations), len(result.reject_violations),
    )
    return result


async def _visit(browser, url: str, collected: list[str]) -> tuple[BrowserContext, Page]:
    """Open *url* in a fresh context, recording request URLs into *collected*.

    The caller owns the returned context and must close it. If loading the
    page fails, the context is closed here before the error propagates.
    """
    ctx = await browser.new_context(user_agent=USER_AGENT)
    try:
        page = await ctx.new_page()
        page.on("request", lambda req: _collect_script(req, collected))
        await page.goto(url, wait_until="networkidle", timeout=NAVIGATION_TIMEOUT_MS)
    except Exception:
        await ctx.close()
        raise
    return ctx, page


async def _phase_before_consent(
    browser, url: str, wait_secs: int, result: ConsentTestResult,
) -> BannerInfo:
    """Phase A: record what loads without interaction and detect the banner."""
    logger.info("Phase A: First visit (no interaction)")
    collected: list[str] = []
    ctx, page = await _visit(browser, url, collected)
    try:
        await page.wait_for_timeout(wait_secs * 1000)

        result.before_scripts = _get_page_scripts(collected)
        result.before_cookies = _get_cookie_names(await ctx.cookies())
        result.before_tracking = find_tracking_services(result.before_scripts)
        result.before_violations = find_violations_before_consent(result.before_scripts)

        # Detect banner
        banner = await detect_banner(page)
    finally:
        await ctx.close()

    result.banner_detected = banner.detected
    result.banner_provider = banner.provider
    return banner


async def _visit_and_click(
    browser, url: str, selector: str, action: str, wait_secs: int,
) -> tuple[list[str], list[str], bool]:
    """Load *url*, click the banner button and observe the page.

    Args:
        selector: Selector of the button to click.
        action: "reject" or "accept"; used in log messages only.
        wait_secs: Seconds to observe the page after a successful click.

    Returns:
        ``(scripts, cookie_names, clicked)`` for this visit.
    """
    collected: list[str] = []
    ctx, page = await _visit(browser, url, collected)
    try:
        await page.wait_for_timeout(BANNER_SETTLE_MS)

        clicked = await click_button(page, selector)
        if clicked:
            logger.info("%s button clicked, waiting %ds", action.capitalize(), wait_secs)
            await page.wait_for_timeout(wait_secs * 1000)
        else:
            logger.warning("Could not click %s button", action)

        return _get_page_scripts(collected), _get_cookie_names(await ctx.cookies()), clicked
    finally:
        await ctx.close()


async def _phase_after_reject(
    browser, url: str, wait_secs: int, banner: BannerInfo, result: ConsentTestResult,
) -> None:
    """Phase B: reject consent and record what loads in that visit."""
    logger.info("Phase B: Reject consent (%s)", banner.provider)
    scripts, cookies, clicked = await _visit_and_click(
        browser, url, banner.reject_selector, "reject", wait_secs,
    )

    result.reject_scripts = scripts
    result.reject_cookies = cookies
    reject_tracking = find_tracking_services(scripts)
    result.reject_new_tracking = [t for t in reject_tracking if t not in result.before_tracking]

    # "Loads despite rejection" can only be claimed if consent was actually
    # rejected. Without a successful click this visit equals a first visit.
    if clicked:
        result.reject_violations = find_violations_after_reject(
            result.before_scripts, scripts,
        )


async def _phase_after_accept(
    browser, url: str, wait_secs: int, banner: BannerInfo, result: ConsentTestResult,
) -> None:
    """Phase C: accept consent and record what loads in that visit."""
    logger.info("Phase C: Accept consent (%s)", banner.provider)
    scripts, cookies, _clicked = await _visit_and_click(
        browser, url, banner.accept_selector, "accept", wait_secs,
    )

    result.accept_scripts = scripts
    result.accept_cookies = cookies
    accept_tracking = find_tracking_services(scripts)
    result.accept_new_tracking = [t for t in accept_tracking if t not in result.before_tracking]


def _collect_script(request, scripts: list[str]):
    """Collect script request URLs.

    Besides scripts this also records image, xhr and fetch requests.
    """
    if request.resource_type in ("script", "image", "xhr", "fetch"):
        scripts.append(request.url)


def _get_page_scripts(collected: list[str]) -> list[str]:
    """Deduplicate and filter script URLs.

    Keeps the first URL seen per host (third "/"-separated part of the URL;
    the whole string if there is none) and caps the result at 50 entries.

    NOTE(review): later URLs of an already seen host are dropped, so checks
    that depend on the URL path only ever see the first URL per host.
    Confirm this is intended.
    """
    seen = set()
    result = []
    for url in collected:
        parts = url.split("/")
        domain = parts[2] if len(parts) > 2 else url
        if domain not in seen:
            seen.add(domain)
            result.append(url)
    return result[:50]  # Cap at 50


def _get_cookie_names(cookies: list[dict]) -> list[str]:
    """Extract cookie names from Playwright cookie list (sorted, unique, non-empty)."""
    return sorted({c["name"] for c in cookies if c.get("name")})
"""
Script Analyzer — classifies detected scripts and cookies against known services.
"""

import re
from dataclasses import dataclass
from typing import Optional

# Regex (searched case-insensitively in the request URL) -> service metadata.
# Order matters: the first matching entry wins.
SERVICE_PATTERNS: dict[str, dict] = {
    # The ID alternatives are anchored to an `id=`/`tid=` query parameter and
    # `gtag` to non-alphanumeric neighbours. Bare `gtag|UA-\d|G-\w{5}` also
    # matches ordinary asset names such as "img-header.png", "blogtags.js"
    # or "aqua-1.png" when searched case-insensitively.
    r"google.?analytics|(?<![a-z0-9])gtag(?![a-z0-9])"
    r"|[?&](?:id|tid)=(?:UA-\d+-\d+|G-[A-Z0-9]{5,})": {
        "name": "Google Analytics", "requires_consent": True,
        "legal_ref": "§25 TDDDG, Art. 44-49 DSGVO",
    },
    r"googletagmanager|gtm\.js": {
        "name": "Google Tag Manager", "requires_consent": True,
        "legal_ref": "§25 TDDDG",
    },
    r"facebook\.net|fbevents|fbq": {
        "name": "Meta/Facebook Pixel", "requires_consent": True,
        "legal_ref": "§25 TDDDG, Art. 44-49 DSGVO",
    },
    r"hotjar\.com|_hjSettings": {
        "name": "Hotjar", "requires_consent": True,
        "legal_ref": "§25 TDDDG (Session Recording)",
    },
    r"clarity\.ms": {
        "name": "Microsoft Clarity", "requires_consent": True,
        "legal_ref": "§25 TDDDG (Session Replay)",
    },
    r"tiktok\.com/i18n|analytics\.tiktok": {
        "name": "TikTok Pixel", "requires_consent": True,
        "legal_ref": "§25 TDDDG, Drittlandtransfer China",
    },
    r"linkedin\.com/insight|snap\.licdn": {
        "name": "LinkedIn Insight", "requires_consent": True,
        "legal_ref": "§25 TDDDG, Art. 44-49 DSGVO",
    },
    r"pinterest\.com/ct|pinimg\.com/ct": {
        "name": "Pinterest Tag", "requires_consent": True,
        "legal_ref": "§25 TDDDG",
    },
    r"criteo\.com|criteo\.net": {
        "name": "Criteo", "requires_consent": True,
        "legal_ref": "§25 TDDDG",
    },
    r"doubleclick\.net|googlesyndication": {
        "name": "Google Ads/DoubleClick", "requires_consent": True,
        "legal_ref": "§25 TDDDG, Art. 44-49 DSGVO",
    },
    r"fonts\.googleapis\.com|fonts\.gstatic": {
        "name": "Google Fonts", "requires_consent": True,
        "legal_ref": "LG Muenchen I, Az. 3 O 17493/20",
    },
    r"recaptcha|grecaptcha": {
        "name": "Google reCAPTCHA", "requires_consent": True,
        "legal_ref": "§25 TDDDG",
    },
    r"youtube\.com/embed|ytimg": {
        "name": "YouTube", "requires_consent": True,
        "legal_ref": "§25 TDDDG, Art. 44-49 DSGVO",
    },
    r"maps\.googleapis|maps\.google": {
        "name": "Google Maps", "requires_consent": True,
        "legal_ref": "§25 TDDDG",
    },
    r"intercom\.io|intercomcdn": {
        "name": "Intercom", "requires_consent": True,
        "legal_ref": "Art. 44-49 DSGVO",
    },
    r"zendesk\.com|zdassets": {
        "name": "Zendesk", "requires_consent": True,
        "legal_ref": "Art. 44-49 DSGVO",
    },
    r"sentry\.io|sentry-cdn": {
        "name": "Sentry", "requires_consent": False,
        "legal_ref": "Berechtigtes Interesse (Error Tracking)",
    },
    r"cdn\.cloudflare\.com": {
        "name": "Cloudflare CDN", "requires_consent": False,
        "legal_ref": "Berechtigtes Interesse (CDN)",
    },
    r"didomi|cookiebot|onetrust|usercentrics|consentmanager": {
        "name": "Consent Management", "requires_consent": False,
        "legal_ref": "Notwendig (CMP)",
    },
}


@dataclass
class Violation:
    """One finding: a consent-requiring service that loaded when it should not."""

    service: str
    severity: str  # "HIGH", "CRITICAL"
    text: str
    legal_ref: str


def _match_service(url: str, *, consent_only: bool = False) -> Optional[dict]:
    """Return the metadata of the first ``SERVICE_PATTERNS`` entry matching *url*.

    Args:
        url: Request URL to classify.
        consent_only: If True, entries with ``requires_consent=False`` are
            skipped, so a later consent-requiring entry can still match.

    Returns:
        The metadata dict, or None if nothing matches.
    """
    for pattern, meta in SERVICE_PATTERNS.items():
        if consent_only and not meta["requires_consent"]:
            continue
        if re.search(pattern, url, re.IGNORECASE):
            return meta
    return None


def classify_scripts(scripts: list[str]) -> list[str]:
    """Classify script URLs into known service names (sorted, unique)."""
    matches = (_match_service(script) for script in scripts)
    return sorted({meta["name"] for meta in matches if meta is not None})


def find_tracking_services(scripts: list[str]) -> list[str]:
    """Find services that require consent (sorted, unique names)."""
    matches = (_match_service(script, consent_only=True) for script in scripts)
    return sorted({meta["name"] for meta in matches if meta is not None})


def find_violations_before_consent(scripts: list[str]) -> list[Violation]:
    """Find tracking scripts that load without consent (HIGH).

    Returns one violation per service, in order of first appearance.
    """
    violations = []
    seen = set()
    for script in scripts:
        meta = _match_service(script, consent_only=True)
        if meta is None or meta["name"] in seen:
            continue
        name = meta["name"]
        seen.add(name)
        violations.append(Violation(
            service=name, severity="HIGH",
            text=f"{name} laedt OHNE vorherige Einwilligung",
            legal_ref=meta["legal_ref"],
        ))
    return violations


def find_violations_after_reject(
    before_scripts: list[str], after_scripts: list[str],
) -> list[Violation]:
    """Find tracking scripts that still load after rejection (CRITICAL).

    A service is reported only if it is found in both *before_scripts* and
    *after_scripts*. Violations are ordered by service name.
    """
    before_tracking = set(find_tracking_services(before_scripts))
    violations = []

    for service in find_tracking_services(after_scripts):
        if service not in before_tracking:
            continue
        # Was already loading before AND still loads after reject = CRITICAL
        meta = next(
            (m for m in SERVICE_PATTERNS.values() if m["name"] == service), None,
        )
        if meta is None:
            continue
        violations.append(Violation(
            service=service, severity="CRITICAL",
            text=f"{service} laedt TROTZ Ablehnung — moegliches Dark Pattern",
            legal_ref=meta["legal_ref"] + ", Art. 5(3) ePrivacy",
        ))

    return violations