"""
Consent Scanner — Playwright-based 3-phase cookie consent test.

Phase A: Before consent (first visit)
Phase B: After rejecting consent
Phase C: After accepting consent

Optional follow-up phases (only when a banner was detected):
Phase D-F: per-category tests, Phase G: per-vendor detail extraction.
"""

import asyncio
import base64
import logging
import time
from dataclasses import dataclass, field

from playwright.async_api import async_playwright, Page, BrowserContext

try:
    from playwright_stealth import stealth_async
    HAS_STEALTH = True
except ImportError:
    HAS_STEALTH = False

from services.banner_detector import detect_banner, click_button, BannerInfo
from services.script_analyzer import (
    classify_scripts,
    find_tracking_services,
    find_violations_before_consent,
    find_violations_after_reject,
    Violation,
)
from services.banner_text_checker import check_banner_text as _check_banner_text
from services.consent_interceptor import (
    INIT_SCRIPT as _INTERCEPTOR_INIT,
    collect_intercepted_data as _collect_intercepted,
    get_consent_state as _get_consent_state,
    analyze_phase_data as _analyze_phase,
)

logger = logging.getLogger(__name__)

USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)


@dataclass
class ConsentTestResult:
    """Aggregated findings of one consent scan (all phases)."""

    banner_detected: bool = False
    banner_provider: str = ""

    # Phase A: Before consent
    before_scripts: list[str] = field(default_factory=list)
    before_cookies: list[str] = field(default_factory=list)
    before_tracking: list[str] = field(default_factory=list)
    before_violations: list[Violation] = field(default_factory=list)

    # Phase B: After reject
    reject_scripts: list[str] = field(default_factory=list)
    reject_cookies: list[str] = field(default_factory=list)
    reject_new_tracking: list[str] = field(default_factory=list)
    reject_violations: list[Violation] = field(default_factory=list)

    # Phase C: After accept
    accept_scripts: list[str] = field(default_factory=list)
    accept_cookies: list[str] = field(default_factory=list)
    accept_new_tracking: list[str] = field(default_factory=list)
    accept_undocumented: list[str] = field(default_factory=list)

    # Phase D-F: Per-category tests
    category_tests: list = field(default_factory=list)  # list[CategoryTestResult]

    # Banner text checks
    banner_text_violations: list[Violation] = field(default_factory=list)
    banner_has_impressum_link: bool = False
    banner_has_dse_link: bool = False

    # Deep verification (per-phase intercepted data)
    deep_verification: dict = field(default_factory=dict)

    # TCF vendors (resolved via GVL after accept phase)
    tcf_vendors: list = field(default_factory=list)

    # P48: CMP payloads captured during all phases (Usercentrics, OneTrust, etc.)
    # — passed to backend for deterministic vendor extraction.
    cmp_payloads: list = field(default_factory=list)

    # P50: per-vendor detail-modal extracts (description, opt-out, cookies etc.)
    vendor_details: list = field(default_factory=list)

    # P59b: full cookie details per phase (name, value, domain, expires)
    # for behavior validation in backend. Implicit declared_category:
    # before/reject phase = essential (site claims), accept = any.
    cookies_detailed: list = field(default_factory=list)

    # P85: base64-encoded PNG screenshot of the banner before the first click.
    # The backend embeds it in the e-mail as visual proof of what the banner
    # looked like at audit time.
    banner_screenshot_b64: str = ""


async def run_consent_test(
    url: str,
    wait_secs: int = 10,
    categories: list[str] | None = None,
) -> ConsentTestResult:
    """Run 3-phase consent test on a URL.

    Args:
        url: Website URL to test.
        wait_secs: Seconds to wait per phase.
        categories: Optional list of category names to test (empty = test all).

    Returns:
        A ConsentTestResult. Failures inside the phases are logged and
        leave the corresponding fields at their defaults; they do not raise.
        When no banner is detected, phases B-G are skipped but CMP payloads
        captured during Phase A are still returned.
    """
    result = ConsentTestResult()
    wait_ms = wait_secs * 1000
    filter_cats = categories or []

    # P48: Init CMP-Capture early so it attaches to every page/context.
    # CMP JSON endpoints (Usercentrics, OneTrust, Cookiebot, ePaaS) are
    # fetched once per page load — capture them across all 3 phases so
    # the backend can do deterministic vendor extraction without LLM.
    from services.cmp_extractor import CMPCapture
    cmp_capture = CMPCapture()

    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=True,
            args=[
                "--no-sandbox",
                "--disable-dev-shm-usage",
                "--disable-blink-features=AutomationControlled",
                "--window-size=1920,1080",
                # P50c: Mercedes/Akamai Bot Manager crashed renderer
                # without these (limits memory pressure + GPU init):
                "--disable-gpu",
                "--disable-software-rasterizer",
                "--disable-background-timer-throttling",
                "--disable-renderer-backgrounding",
                "--disable-backgrounding-occluded-windows",
                "--js-flags=--max-old-space-size=2048",
            ],
        )
        try:
            banner = await _phase_before_consent(
                browser, url, wait_ms, cmp_capture, result,
            )
            if banner.detected:
                await _phase_reject(
                    browser, url, wait_secs, cmp_capture, banner, result,
                )
                await _phase_accept(
                    browser, url, wait_secs, cmp_capture, banner, result,
                )
                await _phase_categories(
                    browser, url, wait_ms, filter_cats, banner, result,
                )
                await _phase_vendor_details(browser, url, banner, result)
            else:
                # No early return here: payloads captured in Phase A must
                # still be collected below, and the browser is closed
                # exactly once by the finally clause.
                logger.info("No consent banner detected — skipping Phase B/C")
        except Exception as e:
            logger.error("Consent test failed: %s", e)
        finally:
            await browser.close()

    # P48: collect CMP payloads captured during all phases. CMPCapture
    # stores them as tuples (cmp_name, data). Convert to dicts that
    # match the format used by /dsi-discovery so backend can process
    # them with extract_vendors_from_payloads().
    result.cmp_payloads.extend(_dedupe_cmp_payloads(cmp_capture.payloads))

    logger.info(
        "Consent test complete: banner=%s, violations_before=%d, violations_reject=%d, categories=%d, cmp_payloads=%d",
        result.banner_provider,
        len(result.before_violations),
        len(result.reject_violations),
        len(result.category_tests),
        len(result.cmp_payloads),
    )
    return result


# ── Shared helpers ────────────────────────────────────────────────


async def _new_context(browser) -> BrowserContext:
    """Create a fresh browser context with the scanner's fixed fingerprint."""
    return await browser.new_context(
        user_agent=USER_AGENT,
        viewport={"width": 1920, "height": 1080},
        locale="de-DE",
        timezone_id="Europe/Berlin",
    )


async def _open_instrumented_page(
    ctx: BrowserContext, cmp_capture, sink: list[str],
) -> Page:
    """Open a page with interceptor, stealth, CMP capture and request logging.

    Request URLs accepted by ``_collect_script`` are appended to ``sink``.
    """
    page = await ctx.new_page()
    await page.add_init_script(_INTERCEPTOR_INIT)
    if HAS_STEALTH:
        await stealth_async(page)
    cmp_capture.attach(page)  # P48
    page.on("request", lambda req: _collect_script(req, sink))
    return page


async def _goto_first_visit(page: Page, url: str) -> None:
    """Navigate for Phase A, tolerating renderer crashes and slow SPAs.

    P50c: Mercedes/Akamai SPA never reaches networkidle, so this uses
    domcontentloaded + a short JS wait, retries once after a renderer
    crash, and otherwise falls back to a single best-effort "load" goto.
    """
    for attempt in range(2):
        try:
            await page.goto(url, wait_until="domcontentloaded", timeout=20000)
            await page.wait_for_timeout(3500)
            return
        except Exception as exc:
            err = str(exc)[:120]
            logger.warning("Phase A goto attempt %d failed: %s", attempt + 1, err)
            if "crashed" in err.lower() and attempt == 0:
                await page.wait_for_timeout(2000)
                continue
            try:
                await page.goto(url, wait_until="load", timeout=20000)
            except Exception as fallback_exc:
                # Best effort: analysis continues on whatever did load.
                logger.warning(
                    "Phase A fallback goto failed: %s", str(fallback_exc)[:120],
                )
            return


async def _goto_with_fallback(
    page: Page, url: str, primary_timeout: int, fallback_timeout: int,
) -> None:
    """Navigate with domcontentloaded; on failure retry once with "load".

    An exception from the fallback navigation propagates to the caller.
    """
    try:
        await page.goto(url, wait_until="domcontentloaded", timeout=primary_timeout)
    except Exception as exc:
        logger.warning(
            "domcontentloaded navigation failed, fallback to load: %s",
            str(exc)[:80],
        )
        await page.goto(url, wait_until="load", timeout=fallback_timeout)


async def _click_and_settle(
    page: Page, selector, action: str, wait_secs: int,
) -> None:
    """Click the banner button and, if the click worked, wait ``wait_secs``.

    ``action`` is "reject" or "accept" and is only used for log messages.
    """
    clicked = await click_button(page, selector)
    if clicked:
        logger.info("%s button clicked, waiting %ds", action.capitalize(), wait_secs)
        await page.wait_for_timeout(wait_secs * 1000)
    else:
        logger.warning("Could not click %s button", action)


async def _deep_verify(
    page: Page, phase_key: str, phase_label: str, result: ConsentTestResult,
) -> None:
    """Store intercepted data, consent state and violations for one phase.

    Non-blocking: any failure is logged and the phase entry is left unset.
    """
    try:
        intercepted = await _collect_intercepted(page)
        consent_state = await _get_consent_state(page)
        violations = _analyze_phase(phase_key, intercepted, consent_state)
        result.deep_verification[phase_key] = {
            "intercepted": intercepted,
            "consent_state": consent_state,
            "violations": violations,
        }
    except Exception as exc:
        logger.warning("Phase %s deep verification failed: %s", phase_label, exc)


def _record_cookies(
    result: ConsentTestResult,
    cookies: list[dict],
    phase: str,
    declared_category: str,
    *,
    skip_known: bool = True,
) -> None:
    """Append P59b cookie details for one phase to ``result.cookies_detailed``.

    With ``skip_known`` a cookie is skipped when its name was already
    recorded by an earlier phase. Names are not deduplicated within the
    phase itself.
    """
    known = {c["name"] for c in result.cookies_detailed} if skip_known else set()
    for ck in cookies:
        if ck.get("name", "") in known:
            continue
        result.cookies_detailed.append({
            "name": ck.get("name", ""),
            "value": (ck.get("value") or "")[:200],
            "domain": ck.get("domain", ""),
            "expires": ck.get("expires"),
            "phase": phase,
            "declared_category": declared_category,
        })


def _dedupe_cmp_payloads(payloads) -> list[dict]:
    """Convert captured ``(cmp_name, data)`` tuples to deduplicated dicts.

    Dedup is by data rather than URL (the tuples carry no URL): the key
    is cmp_name + length of ``str(data)`` + up to five top-level keys.
    """
    seen_keys: set[str] = set()
    unique: list[dict] = []
    for cmp_name, data in payloads:
        try:
            keys = sorted(list(data.keys())[:5]) if isinstance(data, dict) else []
            sig = f"{cmp_name}:{len(str(data))}:{','.join(keys)}"
        except Exception:
            # e.g. non-string keys: fall back to object identity.
            sig = f"{cmp_name}:{id(data)}"
        if sig in seen_keys:
            continue
        seen_keys.add(sig)
        unique.append({"kind": cmp_name, "data": data})
    return unique


# ── Phases ────────────────────────────────────────────────────────


async def _phase_before_consent(
    browser, url: str, wait_ms: int, cmp_capture, result: ConsentTestResult,
):
    """Phase A: first visit without interaction.

    Fills the ``before_*`` fields, detects the banner and, if one is
    present, runs the banner text checks and captures the screenshot.

    Returns:
        The object returned by ``detect_banner``.
    """
    logger.info("Phase A: First visit (no interaction)")
    ctx = await _new_context(browser)
    try:
        scripts: list[str] = []
        page = await _open_instrumented_page(ctx, cmp_capture, scripts)
        await _goto_first_visit(page, url)
        await page.wait_for_timeout(wait_ms)

        await _deep_verify(page, "before_consent", "A", result)

        result.before_scripts = _get_page_scripts(scripts)
        cookies = await ctx.cookies()
        result.before_cookies = _get_cookie_names(cookies)
        # P59b: capture full details — phase "before" = implicit essential claim
        _record_cookies(result, cookies, "before", "essential", skip_known=False)
        result.before_tracking = find_tracking_services(result.before_scripts)
        result.before_violations = find_violations_before_consent(result.before_scripts)

        banner = await detect_banner(page)
        result.banner_detected = banner.detected
        result.banner_provider = banner.provider
        if banner.detected:
            await _inspect_banner(page, result)
        return banner
    finally:
        await ctx.close()


async def _inspect_banner(page: Page, result: ConsentTestResult) -> None:
    """Check banner text for legal issues and capture the P85 screenshot."""
    banner_violations = await _check_banner_text(page)
    result.banner_text_violations = banner_violations["violations"]
    result.banner_has_impressum_link = banner_violations["has_impressum"]
    result.banner_has_dse_link = banner_violations["has_dse"]

    # P85 — visual proof for the e-mail.
    try:
        png = await page.screenshot(full_page=False, type="png", timeout=10000)
        if png and len(png) < 1_500_000:  # < 1.5 MB
            result.banner_screenshot_b64 = base64.b64encode(png).decode("ascii")
            logger.info("P85: banner screenshot captured (%d bytes)", len(png))
    except Exception as exc:
        logger.warning("P85: banner screenshot failed: %s", exc)


async def _phase_reject(
    browser, url: str, wait_secs: int, cmp_capture, banner,
    result: ConsentTestResult,
) -> None:
    """Phase B: fresh context, click reject, fill the ``reject_*`` fields."""
    logger.info("Phase B: Reject consent (%s)", banner.provider)
    ctx = await _new_context(browser)
    try:
        scripts: list[str] = []
        page = await _open_instrumented_page(ctx, cmp_capture, scripts)
        await _goto_with_fallback(page, url, 20000, 30000)
        await page.wait_for_timeout(3000)
        await _click_and_settle(page, banner.reject_selector, "reject", wait_secs)

        await _deep_verify(page, "after_reject", "B", result)

        result.reject_scripts = _get_page_scripts(scripts)
        cookies = await ctx.cookies()
        result.reject_cookies = _get_cookie_names(cookies)
        # P59b: after reject = site claims these are essential
        _record_cookies(result, cookies, "reject", "essential")

        reject_tracking = find_tracking_services(result.reject_scripts)
        result.reject_new_tracking = [
            t for t in reject_tracking if t not in result.before_tracking
        ]
        result.reject_violations = find_violations_after_reject(
            result.before_scripts, result.reject_scripts,
        )
    finally:
        await ctx.close()


async def _phase_accept(
    browser, url: str, wait_secs: int, cmp_capture, banner,
    result: ConsentTestResult,
) -> None:
    """Phase C: fresh context, click accept, fill the ``accept_*`` fields.

    Also runs TCF vendor extraction and the P56 anti-audit detection,
    both of which need the still-open accept page.
    """
    logger.info("Phase C: Accept consent (%s)", banner.provider)
    ctx = await _new_context(browser)
    try:
        scripts: list[str] = []
        page = await _open_instrumented_page(ctx, cmp_capture, scripts)
        await _goto_with_fallback(page, url, 20000, 30000)
        await page.wait_for_timeout(3000)
        await _click_and_settle(page, banner.accept_selector, "accept", wait_secs)

        await _deep_verify(page, "after_accept", "C", result)

        result.accept_scripts = _get_page_scripts(scripts)
        cookies = await ctx.cookies()
        result.accept_cookies = _get_cookie_names(cookies)
        # P59b: post-accept new cookies — category unclear (consent given)
        _record_cookies(result, cookies, "accept", "")

        accept_tracking = find_tracking_services(result.accept_scripts)
        result.accept_new_tracking = [
            t for t in accept_tracking if t not in result.before_tracking
        ]

        # TCF vendor extraction (after accept, while page is still open)
        try:
            from services.consent_interceptor import extract_tcf_vendors
            result.tcf_vendors = await extract_tcf_vendors(page)
        except Exception as exc:
            logger.warning("TCF vendor extraction failed: %s", exc)

        # Must run before the context is closed: closing the context
        # closes its pages, after which the page can no longer be used.
        await _detect_anti_audit_findings(page, result)
    finally:
        await ctx.close()


async def _detect_anti_audit_findings(page: Page, result: ConsentTestResult) -> None:
    """P56: record anti-auditing markers as banner text violations.

    Non-blocking: failures are logged at debug level.
    NOTE(review): an earlier comment said an active bot block should skip
    Phase G and raise a HIGH finding; the code records a LOW finding and
    does not skip Phase G — confirm which is intended.
    """
    try:
        from services.vendor_detail_extractor import _detect_anti_audit
        anti = await _detect_anti_audit(page)
        if anti.get("bot_protection"):
            result.banner_text_violations.append(Violation(
                service="Cookie-Banner",
                severity="LOW",
                text=f"Hinweis: {anti['bot_protection']} ist aktiv und blockiert "
                     f"automatisierte Compliance-Audits. Fuer Endnutzer voll "
                     f"funktional. Empfehlung: Audit-API bereitstellen damit "
                     f"unabhaengige Pruefer (Aufsichtsbehoerden, DSB) maschinen"
                     f"lesbar verifizieren koennen — staerkt Vertrauen ohne "
                     f"Bot-Schutz zu reduzieren.",
                legal_ref="Rechenschaftspflicht Art. 5(2) DSGVO, "
                          "Transparenz-Empfehlung DSK-OH 2024",
            ))
        if anti.get("user_select_none"):
            result.banner_text_violations.append(Violation(
                service="Cookie-Banner",
                severity="MEDIUM",
                text="Banner-Settings-Oberflaeche nicht per Maus kopierbar "
                     "(CSS user-select:none). Endnutzer koennen sich Cookie-Listen "
                     "+ Anbieter nicht einfach archivieren. Info-Modals pro Vendor "
                     "sind hingegen kopierbar — bitte gleiches Verhalten auch "
                     "auf der Uebersichtsseite ermoeglichen.",
                legal_ref="Art. 12(1) DSGVO (transparente Information), "
                          "DSK-OH Telemedien 2024 (Informations-Festhalten)",
            ))
        if anti.get("tdm_meta"):
            logger.info("Anti-Audit: TDM opt-out meta-tag detected: %s",
                        anti["tdm_meta"])
    except Exception as e:
        logger.debug("Anti-Audit detection skipped: %s", e)


async def _phase_categories(
    browser, url: str, wait_ms: int, filter_cats: list[str], banner,
    result: ConsentTestResult,
) -> None:
    """Phase D-F: detect consent categories and test each one in isolation.

    Non-blocking: any failure is logged and the phase is abandoned.
    """
    try:
        from services.category_tester import detect_categories, test_single_category

        ctx_cat = await _new_context(browser)
        try:
            page_cat = await ctx_cat.new_page()
            if HAS_STEALTH:
                await stealth_async(page_cat)
            await _goto_with_fallback(page_cat, url, 15000, 20000)
            await page_cat.wait_for_timeout(2000)

            detected_cats = await detect_categories(page_cat, banner)
            await page_cat.close()

            # Filter to requested categories if specified
            if filter_cats and detected_cats:
                detected_cats = [c for c in detected_cats if c.name in filter_cats]
                logger.info(
                    "Filtered to %d categories (requested: %s)",
                    len(detected_cats), filter_cats,
                )

            if not detected_cats:
                logger.info("No categories detected — skipping per-category tests")
                return

            # P26: per-category 15s + phase budget 90s. Mercedes has
            # 9 categories which would block the /scan well beyond the
            # caller's 240s timeout. Skip rather than block —
            # banner_quality + cmp_payloads matter more than per-cat detail.
            phase_deadline = time.monotonic() + 90.0

            # Dedup by name (some sites detect same cat 3x via
            # shadow-DOM walk; testing each is wasteful)
            seen_names: set[str] = set()
            unique_cats = []
            for c in detected_cats:
                if c.name not in seen_names:
                    seen_names.add(c.name)
                    unique_cats.append(c)
            logger.info("Testing %d unique categories (budget=90s, per-cat=15s)",
                        len(unique_cats))

            for idx, cat in enumerate(unique_cats):
                if time.monotonic() >= phase_deadline:
                    # idx categories were attempted (tested or timed out),
                    # so the remainder is counted from the loop position.
                    logger.warning("Category phase budget exhausted, "
                                   "skipping remaining %d categories",
                                   len(unique_cats) - idx)
                    break
                cat_ctx = await _new_context(browser)
                try:
                    cat_result = await asyncio.wait_for(
                        test_single_category(cat_ctx, url, cat, banner, wait_ms),
                        timeout=15.0,
                    )
                    result.category_tests.append(cat_result)
                except asyncio.TimeoutError:
                    logger.warning("Category '%s' timed out after 15s, skipping",
                                   cat.name)
                finally:
                    await cat_ctx.close()
        finally:
            await ctx_cat.close()
    except Exception as cat_err:
        logger.warning("Category tests failed (non-blocking): %s", cat_err)


async def _phase_vendor_details(
    browser, url: str, banner, result: ConsentTestResult,
) -> None:
    """Phase G (P50): per-vendor detail extraction.

    Delegates to ``extract_vendor_details`` with the banner's accept
    selector and stores each returned vendor as a plain dict.
    Non-blocking: timeout and other failures are logged.
    NOTE(review): the 600s cap here exceeds the 240s caller timeout
    mentioned for the category phase — confirm the intended budget.
    """
    try:
        from services.vendor_detail_extractor import (
            extract_vendor_details,
        )
        accept_sel = banner.accept_selector or None
        logger.info("Phase G: starting vendor-detail-extract (max 50 vendors)")
        vd = await asyncio.wait_for(
            extract_vendor_details(
                browser, url,
                accept_selector=accept_sel,
                max_vendors=50,
            ),
            timeout=600.0,  # 10min hard cap
        )
        # Serialise dataclasses to plain dicts for JSON response
        for v in vd:
            result.vendor_details.append({
                "name": v.name,
                "description": v.description,
                "processing_company": v.processing_company,
                "address": v.address,
                "purposes": v.purposes,
                "technologies": v.technologies,
                "cookies": v.cookies,
                "retention": v.retention,
                "opt_out_url": v.opt_out_url,
                "privacy_url": v.privacy_url,
                "raw_text": v.raw_text,
            })
        logger.info("Phase G complete: %d vendor-details captured",
                    len(result.vendor_details))
    except asyncio.TimeoutError:
        logger.warning("Phase G: hard timeout reached (10min)")
    except Exception as vd_err:
        logger.warning("Phase G failed (non-blocking): %s", vd_err)


# ── Request / cookie utilities ────────────────────────────────────


def _collect_script(request, scripts: list[str]):
    """Append the request URL if it is a script, image, xhr or fetch request."""
    if request.resource_type in ("script", "image", "xhr", "fetch"):
        scripts.append(request.url)


def _get_page_scripts(collected: list[str]) -> list[str]:
    """Return the first URL seen per host, in order, capped at 50 entries.

    The host is the third "/"-separated segment of the URL; URLs with
    fewer segments are keyed by the whole string.
    """
    seen = set()
    result = []
    for url in collected:
        parts = url.split("/")
        domain = parts[2] if len(parts) > 2 else url
        if domain not in seen:
            seen.add(domain)
            result.append(url)
    return result[:50]  # Cap at 50


def _get_cookie_names(cookies: list[dict]) -> list[str]:
    """Return the sorted, unique, non-empty cookie names."""
    return sorted({c["name"] for c in cookies if c.get("name")})