From cedc5de15d896e0f2b378277fe9d3b3129c1db27 Mon Sep 17 00:00:00 2001
From: Benjamin Admin
Date: Wed, 29 Apr 2026 19:16:50 +0200
Subject: [PATCH] =?UTF-8?q?feat:=20Phase=2010=20=E2=80=94=20Playwright=20w?=
 =?UTF-8?q?ebsite=20scanner=20replaces=20httpx?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

New /website-scan endpoint in the consent-tester service:
- Real browser renders JavaScript (finds dynamically loaded content)
- Clicks navigation menus (discovers hidden sub-pages like the IHK DSB page)
- Follows links within the DSE to find regional privacy policies
- Collects rendered HTML for each page (after JS execution)

Backend integration:
- agent_scan_routes tries Playwright first, falls back to httpx
- DSE text and HTML extracted from Playwright-rendered pages
- Service detection runs on rendered HTML (catches JS-loaded scripts)

Also fixes:
- GA regex: G-[A-Z0-9]{8,12} prevents CSS class false positives
- etracker added to service registry
- External page scanning blocked (same-domain only)
- CSS/JS/image files excluded from page list

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 .../compliance/api/agent_scan_routes.py       |  66 ++++-
 consent-tester/main.py                        |  52 ++++
 consent-tester/services/playwright_scanner.py | 255 ++++++++++++++++++
 3 files changed, 367 insertions(+), 6 deletions(-)
 create mode 100644 consent-tester/services/playwright_scanner.py

diff --git a/backend-compliance/compliance/api/agent_scan_routes.py b/backend-compliance/compliance/api/agent_scan_routes.py
index fa671dd..d75adb3 100644
--- a/backend-compliance/compliance/api/agent_scan_routes.py
+++ b/backend-compliance/compliance/api/agent_scan_routes.py
@@ -98,19 +98,73 @@ async def scan_website_endpoint(req: ScanRequest):
     """Deep website scan: multi-page crawl + SOLL/IST service comparison."""
     is_live = req.mode == "post_launch"
 
-    # Step 1: Scan website (5-10 pages)
-    scan = await scan_website(req.url)
+    # Step 1: Scan website — try Playwright first (JS-rendered), fall back to httpx
+    playwright_htmls: dict[str, str] = {}
+    try:
+        async with httpx.AsyncClient(timeout=120.0) as pw_client:
+            pw_resp = await pw_client.post(
+                "http://bp-compliance-consent-tester:8094/website-scan",
+                json={"url": req.url, "max_pages": 15, "click_nav": True},
+            )
+            if pw_resp.status_code == 200:
+                pw_data = pw_resp.json()
+                playwright_htmls = pw_data.get("page_htmls", {})
+                logger.info("Playwright scan: %d pages, %d scripts",
+                            pw_data.get("pages_count", 0), len(pw_data.get("external_scripts", [])))
+    except Exception as e:
+        logger.warning("Playwright scanner unavailable, falling back to httpx: %s", e)
+
+    # Use Playwright results if available, otherwise fall back to the httpx scanner
+    if playwright_htmls:
+        # Build a ScanResult from the Playwright data
+        from compliance.services.website_scanner import ScanResult, _detect_services, _detect_ai_mentions
+        scan = ScanResult()
+        scan.pages_scanned = list(playwright_htmls.keys())
+        # Run the same detectors as the httpx scanner, but on rendered HTML
+        for page_url, html in playwright_htmls.items():
+            _detect_services(html, page_url, scan)
+            _detect_ai_mentions(html, page_url, scan)
+        # Deduplicate detected services by id
+        seen = set()
+        unique = []
+        for svc in scan.detected_services:
+            if svc.id not in seen:
+                seen.add(svc.id)
+                unique.append(svc)
+        scan.detected_services = unique
+        scan.chatbot_detected = any(s.category == "chatbot" for s in scan.detected_services)
+        if scan.chatbot_detected:
+            scan.chatbot_provider = next(
"chatbot") + else: + scan = await scan_website(req.url) + logger.info("Scanned %d pages, found %d services", len(scan.pages_scanned), len(scan.detected_services)) - # Step 2: Fetch privacy policy text for SOLL extraction - dse_text = await _fetch_dse_text(req.url, scan.pages_scanned) + # Step 2: Fetch privacy policy text (from Playwright HTMLs or httpx) + dse_text = "" + for page_url, html in playwright_htmls.items(): + if re.search(r"datenschutz|privacy|dsgvo", page_url, re.IGNORECASE): + import re as _re + clean = _re.sub(r"<(script|style)[^>]*>.*?", "", html, flags=_re.DOTALL | _re.IGNORECASE) + clean = _re.sub(r"<[^>]+>", " ", clean) + clean = _re.sub(r"\s+", " ", clean).strip() + dse_text = clean[:4000] + break + if not dse_text: + dse_text = await _fetch_dse_text(req.url, scan.pages_scanned) # Step 3: Extract services mentioned in DSE via LLM dse_services = await extract_dse_services(dse_text) if dse_text else [] logger.info("DSE mentions %d services", len(dse_services)) - # Step 4: Parse DSE into structured sections - dse_html = await _fetch_dse_html(req.url, scan.pages_scanned) + # Step 4: Parse DSE into structured sections (prefer Playwright HTML) + dse_html = "" + for page_url, html in playwright_htmls.items(): + if re.search(r"datenschutz|privacy|dsgvo", page_url, re.IGNORECASE): + dse_html = html + break + if not dse_html: + dse_html = await _fetch_dse_html(req.url, scan.pages_scanned) dse_sections = parse_dse(dse_html, req.url) if dse_html else [] logger.info("Parsed %d DSE sections", len(dse_sections)) diff --git a/consent-tester/main.py b/consent-tester/main.py index 01faecd..f06d3cc 100644 --- a/consent-tester/main.py +++ b/consent-tester/main.py @@ -14,6 +14,7 @@ from pydantic import BaseModel from services.consent_scanner import run_consent_test, ConsentTestResult from services.authenticated_scanner import run_authenticated_test, AuthTestResult +from services.playwright_scanner import scan_website_playwright logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s") logger = logging.getLogger(__name__) @@ -172,3 +173,54 @@ async def authenticated_scan(req: AuthScanRequest): findings_count=missing, scanned_at=datetime.now(timezone.utc).isoformat(), ) + + +# ═══════════════════════════════════════════════════════════════ +# PLAYWRIGHT WEBSITE SCAN (Phase 10 — replaces httpx scanner) +# ═══════════════════════════════════════════════════════════════ + +class WebsiteScanRequest(BaseModel): + url: str + max_pages: int = 15 + click_nav: bool = True + + +class PageInfo(BaseModel): + url: str + status: int + title: str = "" + error: str = "" + + +class WebsiteScanResponse(BaseModel): + url: str + pages: list[PageInfo] + pages_count: int + external_scripts: list[str] + cookies: list[str] + page_htmls: dict[str, str] # url -> rendered HTML (for backend analysis) + scanned_at: str + + +@app.post("/website-scan", response_model=WebsiteScanResponse) +async def website_scan(req: WebsiteScanRequest): + """Scan website using Playwright — discovers pages via JS navigation + menu clicks.""" + logger.info("Starting Playwright website scan for %s (max %d pages)", req.url, req.max_pages) + + result = await scan_website_playwright(req.url, req.max_pages, req.click_nav) + + # Build page HTML map (only successful pages, truncated) + page_htmls = {} + for p in result.pages: + if p.html and p.status < 400: + page_htmls[p.url] = p.html[:50000] # Cap at 50KB per page + + return WebsiteScanResponse( + url=req.url, + pages=[PageInfo(url=p.url, status=p.status, title=p.title, 
+               for p in result.pages],
+        pages_count=len(result.pages),
+        external_scripts=result.external_scripts[:50],
+        cookies=result.all_cookies,
+        page_htmls=page_htmls,
+        scanned_at=datetime.now(timezone.utc).isoformat(),
+    )

diff --git a/consent-tester/services/playwright_scanner.py b/consent-tester/services/playwright_scanner.py
new file mode 100644
index 0000000..5fdbd5c
--- /dev/null
+++ b/consent-tester/services/playwright_scanner.py
@@ -0,0 +1,255 @@
+"""
+Playwright Website Scanner — browser-based page discovery and scanning.
+
+Unlike httpx (curl-like), this uses a real browser that:
+- Executes JavaScript (finds dynamically loaded content)
+- Clicks navigation menus (discovers hidden sub-pages)
+- Renders SPAs (React, Angular, Vue)
+- Sees what the user sees
+
+Replaces the httpx-based scanner for comprehensive website analysis.
+"""
+
+import logging
+import re
+from dataclasses import dataclass, field
+from urllib.parse import urlparse
+
+from playwright.async_api import async_playwright, Page
+
+logger = logging.getLogger(__name__)
+
+USER_AGENT = (
+    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
+    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
+)
+
+# Keywords for recognizing legally relevant pages during link discovery
+NAV_LINK_KEYWORDS = [
+    "datenschutz", "privacy", "dsgvo",
+    "impressum", "imprint", "legal",
+    "agb", "terms", "nutzungsbedingung",
+    "cookie",
+    "kontakt", "contact",
+    "ueber-uns", "about",
+    "service",
+]
+
+# Skip these URL patterns (static assets, not HTML pages)
+SKIP_PATTERNS = re.compile(
+    r"\.(css|js|png|jpg|jpeg|gif|svg|pdf|zip|xml|json|woff|woff2|ttf|eot|ico)(\?|#|$)",
+    re.IGNORECASE,
+)
+
+
+@dataclass
+class ScannedPage:
+    url: str
+    status: int
+    html: str = ""
+    title: str = ""
+    error: str = ""
+
+
+@dataclass
+class PlaywrightScanResult:
+    pages: list[ScannedPage] = field(default_factory=list)
+    discovered_urls: list[str] = field(default_factory=list)
+    external_scripts: list[str] = field(default_factory=list)
+    all_cookies: list[str] = field(default_factory=list)
+
+
+async def scan_website_playwright(
+    base_url: str,
+    max_pages: int = 15,
+    click_nav: bool = True,
+) -> PlaywrightScanResult:
+    """Scan a website using Playwright — discovers pages via JS navigation."""
+    result = PlaywrightScanResult()
+    parsed = urlparse(base_url)
+    origin = f"{parsed.scheme}://{parsed.netloc}"
+    visited: set[str] = set()
+    to_visit: list[str] = [base_url]
+
+    # Also probe the site root if the start URL is a sub-page
+    if base_url != origin:
+        to_visit.append(origin)
+
+    async with async_playwright() as p:
+        browser = await p.chromium.launch(
+            headless=True,
+            args=["--no-sandbox", "--disable-dev-shm-usage"],
+        )
+        context = await browser.new_context(user_agent=USER_AGENT)
+
+        try:
+            # Phase 1: Load start page and discover navigation links
+            page = await context.new_page()
+            scripts_collected: list[str] = []
+            page.on("request", lambda req: _collect_external(req, scripts_collected, origin))
+
+            start_page = await _visit_page(page, base_url, result)
+            visited.add(base_url)
+
+            if start_page and start_page.html:
+                # Extract links from rendered HTML (after JS execution)
+                nav_links = await _discover_nav_links(page, origin)
+                for link in nav_links:
+                    if link not in visited and link not in to_visit:
+                        to_visit.append(link)
+
+                # Click navigation menus to find hidden links
+                if click_nav:
+                    menu_links = await _click_navigation_menus(page, origin)
+                    for link in menu_links:
+                        if link not in visited and link not in to_visit:
+                            to_visit.append(link)
+
+            # Phase 2: Visit queued pages (up to max_pages). Pop from the live
+            # queue so DSE links appended mid-loop are visited as well.
+            while to_visit and len(visited) < max_pages:
+                url = to_visit.pop(0)
+                if url in visited:
+                    continue
+                if SKIP_PATTERNS.search(url):
+                    continue
+                if not url.startswith(origin):
+                    continue
+
+                visited.add(url)
+                sp = await _visit_page(page, url, result)
+
+                # On successfully loaded DSE pages, discover additional links
+                # (e.g. regional privacy policies)
+                if sp and re.search(r"datenschutz|privacy|dsgvo", page.url, re.IGNORECASE):
+                    dse_links = await _discover_nav_links(page, origin)
+                    for link in dse_links:
+                        if link not in visited and link not in to_visit and link.startswith(origin):
+                            to_visit.append(link)
+
+            # Collect cookies set during the scan
+            cookies = await context.cookies()
+            result.all_cookies = sorted(set(c.get("name", "") for c in cookies))
+            result.external_scripts = list(set(scripts_collected))
+            result.discovered_urls = [pg.url for pg in result.pages]
+
+        except Exception as e:
+            logger.error("Playwright scan failed: %s", e)
+        finally:
+            await context.close()
+            await browser.close()
+
+    logger.info("Playwright scan: %d pages visited, %d scripts found",
+                len(result.pages), len(result.external_scripts))
+    return result
+
+
+async def _visit_page(page: Page, url: str, result: PlaywrightScanResult) -> ScannedPage | None:
+    """Visit a page and capture its rendered HTML; returns None on failure."""
+    sp = ScannedPage(url=url, status=0)
+    try:
+        response = await page.goto(url, wait_until="networkidle", timeout=20000)
+        sp.status = response.status if response else 0
+        # Give late-loading widgets a moment to render
+        await page.wait_for_timeout(2000)
+
+        if sp.status < 400:
+            sp.html = await page.content()
+            sp.title = await page.title()
+        else:
+            sp.error = f"HTTP {sp.status}"
+
+    except Exception as e:
+        sp.status = 0
+        sp.error = str(e)[:100]
+        logger.warning("Failed to visit %s: %s", url, sp.error)
+
+    result.pages.append(sp)
+    return sp if sp.status < 400 and sp.html else None
+
+
+async def _discover_nav_links(page: Page, origin: str) -> list[str]:
+    """Extract relevant same-origin links from the rendered page."""
+    links = set()
+    try:
+        # Get all hrefs from the rendered DOM
+        all_hrefs = await page.evaluate("""
+            () => [...document.querySelectorAll('a[href]')]
+                .map(a => a.href)
+                .filter(h => h.startsWith('http'))
+        """)
+
+        for href in (all_hrefs or []):
+            href_clean = href.split("#")[0].split("?")[0]  # Strip anchors and params
+            if not href_clean.startswith(origin):
+                continue
+            if SKIP_PATTERNS.search(href_clean):
+                continue
+
+            # Keep only pages with legally relevant keywords
+            href_lower = href_clean.lower()
+            if any(kw in href_lower for kw in NAV_LINK_KEYWORDS):
+                links.add(href_clean)
+
+    except Exception as e:
+        logger.warning("Link discovery failed: %s", e)
+
+    return sorted(links)[:20]  # Cap at 20
+
+
+async def _click_navigation_menus(page: Page, origin: str) -> list[str]:
+    """Click expandable navigation menus to discover hidden links."""
+    links = set()
+    try:
+        # Find and click common menu toggles
+        menu_selectors = [
+            'button[aria-expanded="false"]',
+            '[class*="dropdown"] > a',
+            '[class*="menu-toggle"]',
+            '[class*="nav-toggle"]',
+            'details:not([open]) > summary',
+            '[class*="accordion"] > button',
+            'nav button',
+        ]
+
+        for selector in menu_selectors:
+            try:
+                elements = page.locator(selector)
+                count = await elements.count()
+                for i in range(min(count, 10)):  # Max 10 menus per selector
+                    try:
+                        await elements.nth(i).click(timeout=2000)
+                        await page.wait_for_timeout(500)
+                    except Exception:
+                        continue
+            except Exception:
+                continue
+
+        # After clicking, collect newly visible links
+        new_hrefs = await page.evaluate("""
+            () => [...document.querySelectorAll('a[href]')]
+                .filter(a => {
+                    const rect = a.getBoundingClientRect();
+                    return rect.width > 0 && rect.height > 0;
+                })
+                .map(a => a.href)
+                .filter(h => h.startsWith('http'))
+        """)
+
+        for href in (new_hrefs or []):
+            href_clean = href.split("#")[0].split("?")[0]
+            if href_clean.startswith(origin) and not SKIP_PATTERNS.search(href_clean):
+                href_lower = href_clean.lower()
+                if any(kw in href_lower for kw in NAV_LINK_KEYWORDS):
+                    links.add(href_clean)
+
+    except Exception as e:
+        logger.warning("Menu click failed: %s", e)
+
+    return sorted(links)[:10]
+
+
+def _collect_external(request, scripts: list[str], origin: str):
+    """Collect external script/image request URLs, one per external domain."""
+    url = request.url
+    if request.resource_type in ("script", "image") and not url.startswith(origin):
+        domain = urlparse(url).netloc
+        if domain not in {urlparse(s).netloc for s in scripts}:
+            scripts.append(url)
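
Smoke test (a sketch, not part of the commit): one way to exercise the new
endpoint from inside the compose network. The hostname and port are the ones
agent_scan_routes.py calls above; the target URL is a placeholder.

    import asyncio
    import httpx

    async def main() -> None:
        # Generous timeout: the scanner drives a real browser across up to 15 pages
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(
                "http://bp-compliance-consent-tester:8094/website-scan",
                json={"url": "https://example.com", "max_pages": 15, "click_nav": True},
            )
            resp.raise_for_status()
            data = resp.json()
            print(f"{data['pages_count']} pages, {len(data['external_scripts'])} external scripts")
            for pg in data["pages"]:
                print(pg["status"], pg["url"], pg["error"])

    asyncio.run(main())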
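The tightened GA regex from the commit message can be checked in isolation
(also a sketch: the service-registry diff is not included in this patch, so
the surrounding pattern context is assumed).

    import re

    # The bounded quantifier {8,12} requires a measurement-ID-length token
    # after "G-", so short CSS-style class names such as "G-4" no longer match
    GA_PATTERN = re.compile(r"G-[A-Z0-9]{8,12}")

    assert GA_PATTERN.search("gtag('config', 'G-ABC123XYZ9')")  # GA4 measurement ID
    assert not GA_PATTERN.search('<div class="G-4 col">')       # CSS utility class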