""" Consent Scanner — Playwright-based 3-phase cookie consent test. Phase A: Before consent (first visit) Phase B: After rejecting consent Phase C: After accepting consent """ import logging from dataclasses import dataclass, field from playwright.async_api import async_playwright, Page, BrowserContext from services.banner_detector import detect_banner, click_button, BannerInfo from services.script_analyzer import ( classify_scripts, find_tracking_services, find_violations_before_consent, find_violations_after_reject, Violation, ) from services.banner_text_checker import check_banner_text as _check_banner_text logger = logging.getLogger(__name__) USER_AGENT = ( "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ) @dataclass class ConsentTestResult: banner_detected: bool = False banner_provider: str = "" # Phase A: Before consent before_scripts: list[str] = field(default_factory=list) before_cookies: list[str] = field(default_factory=list) before_tracking: list[str] = field(default_factory=list) before_violations: list[Violation] = field(default_factory=list) # Phase B: After reject reject_scripts: list[str] = field(default_factory=list) reject_cookies: list[str] = field(default_factory=list) reject_new_tracking: list[str] = field(default_factory=list) reject_violations: list[Violation] = field(default_factory=list) # Phase C: After accept accept_scripts: list[str] = field(default_factory=list) accept_cookies: list[str] = field(default_factory=list) accept_new_tracking: list[str] = field(default_factory=list) accept_undocumented: list[str] = field(default_factory=list) # Phase D-F: Per-category tests category_tests: list = field(default_factory=list) # list[CategoryTestResult] # Banner text checks banner_text_violations: list[Violation] = field(default_factory=list) banner_has_impressum_link: bool = False banner_has_dse_link: bool = False async def run_consent_test(url: str, wait_secs: int = 10) -> ConsentTestResult: """Run 3-phase consent test on a URL.""" result = ConsentTestResult() wait_ms = wait_secs * 1000 async with async_playwright() as p: browser = await p.chromium.launch( headless=True, args=["--no-sandbox", "--disable-dev-shm-usage"], ) try: # ── Phase A: Before consent ────────────────────────── logger.info("Phase A: First visit (no interaction)") ctx_a = await browser.new_context(user_agent=USER_AGENT) page_a = await ctx_a.new_page() scripts_a = [] page_a.on("request", lambda req: _collect_script(req, scripts_a)) await page_a.goto(url, wait_until="networkidle", timeout=30000) await page_a.wait_for_timeout(wait_ms) result.before_scripts = _get_page_scripts(scripts_a) result.before_cookies = _get_cookie_names(await ctx_a.cookies()) result.before_tracking = find_tracking_services(result.before_scripts) result.before_violations = find_violations_before_consent(result.before_scripts) # Detect banner banner = await detect_banner(page_a) result.banner_detected = banner.detected result.banner_provider = banner.provider # Check banner text for legal issues if banner.detected: banner_violations = await _check_banner_text(page_a) result.banner_text_violations = banner_violations["violations"] result.banner_has_impressum_link = banner_violations["has_impressum"] result.banner_has_dse_link = banner_violations["has_dse"] await ctx_a.close() if not banner.detected: logger.info("No consent banner detected — skipping Phase B/C") await browser.close() return result # ── Phase B: After rejecting ───────────────────────── logger.info("Phase B: Reject consent (%s)", banner.provider) ctx_b = await browser.new_context(user_agent=USER_AGENT) page_b = await ctx_b.new_page() scripts_b = [] page_b.on("request", lambda req: _collect_script(req, scripts_b)) await page_b.goto(url, wait_until="networkidle", timeout=30000) await page_b.wait_for_timeout(3000) clicked = await click_button(page_b, banner.reject_selector) if clicked: logger.info("Reject button clicked, waiting %ds", wait_secs) await page_b.wait_for_timeout(wait_ms) else: logger.warning("Could not click reject button") result.reject_scripts = _get_page_scripts(scripts_b) result.reject_cookies = _get_cookie_names(await ctx_b.cookies()) reject_tracking = find_tracking_services(result.reject_scripts) result.reject_new_tracking = [t for t in reject_tracking if t not in result.before_tracking] result.reject_violations = find_violations_after_reject( result.before_scripts, result.reject_scripts, ) await ctx_b.close() # ── Phase C: After accepting ───────────────────────── logger.info("Phase C: Accept consent (%s)", banner.provider) ctx_c = await browser.new_context(user_agent=USER_AGENT) page_c = await ctx_c.new_page() scripts_c = [] page_c.on("request", lambda req: _collect_script(req, scripts_c)) await page_c.goto(url, wait_until="networkidle", timeout=30000) await page_c.wait_for_timeout(3000) clicked = await click_button(page_c, banner.accept_selector) if clicked: logger.info("Accept button clicked, waiting %ds", wait_secs) await page_c.wait_for_timeout(wait_ms) else: logger.warning("Could not click accept button") result.accept_scripts = _get_page_scripts(scripts_c) result.accept_cookies = _get_cookie_names(await ctx_c.cookies()) accept_tracking = find_tracking_services(result.accept_scripts) result.accept_new_tracking = [t for t in accept_tracking if t not in result.before_tracking] await ctx_c.close() # ── Phase D-F: Per-category tests ──────────────────────── try: from services.category_tester import detect_categories, test_single_category ctx_cat = await browser.new_context(user_agent=USER_AGENT) page_cat = await ctx_cat.new_page() await page_cat.goto(url, wait_until="networkidle", timeout=20000) await page_cat.wait_for_timeout(2000) categories = await detect_categories(page_cat, banner) await page_cat.close() if categories: logger.info("Testing %d categories individually", len(categories)) for cat in categories: cat_ctx = await browser.new_context(user_agent=USER_AGENT) cat_result = await test_single_category(cat_ctx, url, cat, banner, wait_ms) result.category_tests.append(cat_result) await cat_ctx.close() else: logger.info("No categories detected — skipping per-category tests") await ctx_cat.close() except Exception as cat_err: logger.warning("Category tests failed (non-blocking): %s", cat_err) except Exception as e: logger.error("Consent test failed: %s", e) finally: await browser.close() logger.info( "Consent test complete: banner=%s, violations_before=%d, violations_reject=%d, categories=%d", result.banner_provider, len(result.before_violations), len(result.reject_violations), len(result.category_tests), ) return result def _collect_script(request, scripts: list[str]): """Collect script request URLs.""" if request.resource_type in ("script", "image", "xhr", "fetch"): scripts.append(request.url) def _get_page_scripts(collected: list[str]) -> list[str]: """Deduplicate and filter script URLs.""" seen = set() result = [] for url in collected: domain = url.split("/")[2] if "/" in url and len(url.split("/")) > 2 else url if domain not in seen: seen.add(domain) result.append(url) return result[:50] # Cap at 50 def _get_cookie_names(cookies: list[dict]) -> list[str]: """Extract cookie names from Playwright cookie list.""" return sorted(set(c.get("name", "") for c in cookies if c.get("name")))