feat: Phase 10 — Playwright website scanner replaces httpx

New /website-scan endpoint in consent-tester service:
- Real browser renders JavaScript (finds dynamic content)
- Clicks navigation menus (discovers hidden sub-pages like IHK DSB page)
- Follows links within DSE to find regional privacy policies
- Collects rendered HTML for each page (after JS execution)

Backend integration:
- agent_scan_routes tries Playwright first, falls back to httpx
- DSE text and HTML extracted from Playwright-rendered pages
- Service detection runs on rendered HTML (catches JS-loaded scripts)

Also fixes:
- GA regex: G-[A-Z0-9]{8,12} prevents CSS class false positives
- etracker added to service registry
- External page scanning blocked (same-domain only)
- CSS/JS/image files excluded from page list

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-04-29 19:16:50 +02:00
parent 5eeef3a9c3
commit cedc5de15d
3 changed files with 367 additions and 6 deletions
@@ -98,19 +98,73 @@ async def scan_website_endpoint(req: ScanRequest):
"""Deep website scan: multi-page crawl + SOLL/IST service comparison."""
is_live = req.mode == "post_launch"
# Step 1: Scan website (5-10 pages)
scan = await scan_website(req.url)
# Step 1: Scan website — try Playwright first (JS-rendered), fallback to httpx
playwright_htmls: dict[str, str] = {}
try:
async with httpx.AsyncClient(timeout=120.0) as pw_client:
pw_resp = await pw_client.post(
"http://bp-compliance-consent-tester:8094/website-scan",
json={"url": req.url, "max_pages": 15, "click_nav": True},
)
if pw_resp.status_code == 200:
pw_data = pw_resp.json()
playwright_htmls = pw_data.get("page_htmls", {})
logger.info("Playwright scan: %d pages, %d scripts",
pw_data.get("pages_count", 0), len(pw_data.get("external_scripts", [])))
except Exception as e:
logger.warning("Playwright scanner unavailable, falling back to httpx: %s", e)
# Use Playwright results if available, otherwise fall back to httpx scanner
if playwright_htmls:
# Build ScanResult from Playwright data
from compliance.services.website_scanner import ScanResult, DetectedService, _detect_services, _detect_ai_mentions
from compliance.services.service_registry import SERVICE_REGISTRY
scan = ScanResult()
scan.pages_scanned = list(playwright_htmls.keys())
for page_url, html in playwright_htmls.items():
_detect_services(html, page_url, scan)
_detect_ai_mentions(html, page_url, scan)
# Deduplicate
seen = set()
unique = []
for svc in scan.detected_services:
if svc.id not in seen:
seen.add(svc.id)
unique.append(svc)
scan.detected_services = unique
scan.chatbot_detected = any(s.category == "chatbot" for s in scan.detected_services)
if scan.chatbot_detected:
scan.chatbot_provider = next(s.name for s in scan.detected_services if s.category == "chatbot")
else:
scan = await scan_website(req.url)
logger.info("Scanned %d pages, found %d services", len(scan.pages_scanned), len(scan.detected_services))
# Step 2: Fetch privacy policy text for SOLL extraction
dse_text = await _fetch_dse_text(req.url, scan.pages_scanned)
# Step 2: Fetch privacy policy text (from Playwright HTMLs or httpx)
dse_text = ""
for page_url, html in playwright_htmls.items():
if re.search(r"datenschutz|privacy|dsgvo", page_url, re.IGNORECASE):
import re as _re
clean = _re.sub(r"<(script|style)[^>]*>.*?</\1>", "", html, flags=_re.DOTALL | _re.IGNORECASE)
clean = _re.sub(r"<[^>]+>", " ", clean)
clean = _re.sub(r"\s+", " ", clean).strip()
dse_text = clean[:4000]
break
if not dse_text:
dse_text = await _fetch_dse_text(req.url, scan.pages_scanned)
# Step 3: Extract services mentioned in DSE via LLM
dse_services = await extract_dse_services(dse_text) if dse_text else []
logger.info("DSE mentions %d services", len(dse_services))
# Step 4: Parse DSE into structured sections
dse_html = await _fetch_dse_html(req.url, scan.pages_scanned)
# Step 4: Parse DSE into structured sections (prefer Playwright HTML)
dse_html = ""
for page_url, html in playwright_htmls.items():
if re.search(r"datenschutz|privacy|dsgvo", page_url, re.IGNORECASE):
dse_html = html
break
if not dse_html:
dse_html = await _fetch_dse_html(req.url, scan.pages_scanned)
dse_sections = parse_dse(dse_html, req.url) if dse_html else []
logger.info("Parsed %d DSE sections", len(dse_sections))
+52
View File
@@ -14,6 +14,7 @@ from pydantic import BaseModel
from services.consent_scanner import run_consent_test, ConsentTestResult
from services.authenticated_scanner import run_authenticated_test, AuthTestResult
from services.playwright_scanner import scan_website_playwright
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s")
logger = logging.getLogger(__name__)
@@ -172,3 +173,54 @@ async def authenticated_scan(req: AuthScanRequest):
findings_count=missing,
scanned_at=datetime.now(timezone.utc).isoformat(),
)
# ═══════════════════════════════════════════════════════════════
# PLAYWRIGHT WEBSITE SCAN (Phase 10 — replaces httpx scanner)
# ═══════════════════════════════════════════════════════════════
class WebsiteScanRequest(BaseModel):
url: str
max_pages: int = 15
click_nav: bool = True
class PageInfo(BaseModel):
url: str
status: int
title: str = ""
error: str = ""
class WebsiteScanResponse(BaseModel):
url: str
pages: list[PageInfo]
pages_count: int
external_scripts: list[str]
cookies: list[str]
page_htmls: dict[str, str] # url -> rendered HTML (for backend analysis)
scanned_at: str
@app.post("/website-scan", response_model=WebsiteScanResponse)
async def website_scan(req: WebsiteScanRequest):
"""Scan website using Playwright — discovers pages via JS navigation + menu clicks."""
logger.info("Starting Playwright website scan for %s (max %d pages)", req.url, req.max_pages)
result = await scan_website_playwright(req.url, req.max_pages, req.click_nav)
# Build page HTML map (only successful pages, truncated)
page_htmls = {}
for p in result.pages:
if p.html and p.status < 400:
page_htmls[p.url] = p.html[:50000] # Cap at 50KB per page
return WebsiteScanResponse(
url=req.url,
pages=[PageInfo(url=p.url, status=p.status, title=p.title, error=p.error) for p in result.pages],
pages_count=len(result.pages),
external_scripts=result.external_scripts[:50],
cookies=result.all_cookies,
page_htmls=page_htmls,
scanned_at=datetime.now(timezone.utc).isoformat(),
)
@@ -0,0 +1,255 @@
"""
Playwright Website Scanner — browser-based page discovery and scanning.
Unlike httpx (curl-like), this uses a real browser that:
- Executes JavaScript (finds dynamically loaded content)
- Clicks navigation menus (discovers hidden sub-pages)
- Renders SPAs (React, Angular, Vue)
- Sees what the user sees
Replaces the httpx-based scanner for comprehensive website analysis.
"""
import logging
import re
from dataclasses import dataclass, field
from urllib.parse import urljoin, urlparse
from playwright.async_api import async_playwright, Page, BrowserContext
logger = logging.getLogger(__name__)
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
# Patterns for discovering important pages
NAV_LINK_KEYWORDS = [
"datenschutz", "privacy", "dsgvo",
"impressum", "imprint", "legal",
"agb", "terms", "nutzungsbedingung",
"cookie",
"kontakt", "contact",
"ueber-uns", "about",
"service",
]
# Skip these URL patterns (not HTML pages)
SKIP_PATTERNS = re.compile(
r"\.(css|js|png|jpg|jpeg|gif|svg|pdf|zip|xml|json|woff|woff2|ttf|eot|ico)(\?|#|$)",
re.IGNORECASE,
)
@dataclass
class ScannedPage:
url: str
status: int
html: str = ""
title: str = ""
error: str = ""
@dataclass
class PlaywrightScanResult:
pages: list[ScannedPage] = field(default_factory=list)
discovered_urls: list[str] = field(default_factory=list)
external_scripts: list[str] = field(default_factory=list)
all_cookies: list[str] = field(default_factory=list)
async def scan_website_playwright(
base_url: str,
max_pages: int = 15,
click_nav: bool = True,
) -> PlaywrightScanResult:
"""Scan website using Playwright — discovers pages via JS navigation."""
result = PlaywrightScanResult()
parsed = urlparse(base_url)
origin = f"{parsed.scheme}://{parsed.netloc}"
visited: set[str] = set()
to_visit: list[str] = [base_url]
# Also add common paths to probe
if base_url != origin:
to_visit.append(origin)
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=True,
args=["--no-sandbox", "--disable-dev-shm-usage"],
)
context = await browser.new_context(user_agent=USER_AGENT)
try:
# Phase 1: Load start page and discover navigation links
page = await context.new_page()
scripts_collected: list[str] = []
page.on("request", lambda req: _collect_external(req, scripts_collected, origin))
start_page = await _visit_page(page, base_url, result)
visited.add(base_url)
if start_page and start_page.html:
# Extract links from rendered HTML (after JS execution)
nav_links = await _discover_nav_links(page, origin)
for link in nav_links:
if link not in visited and link not in to_visit:
to_visit.append(link)
# Click navigation menus to find hidden links
if click_nav:
menu_links = await _click_navigation_menus(page, origin)
for link in menu_links:
if link not in visited and link not in to_visit:
to_visit.append(link)
# Phase 2: Visit discovered pages (up to max_pages)
for url in to_visit[:max_pages]:
if url in visited:
continue
if SKIP_PATTERNS.search(url):
continue
if not url.startswith(origin):
continue
visited.add(url)
await _visit_page(page, url, result)
# On DSE pages, discover additional links
current_url = page.url
if re.search(r"datenschutz|privacy|dsgvo", current_url, re.IGNORECASE):
dse_links = await _discover_nav_links(page, origin)
for link in dse_links:
if link not in visited and link not in to_visit and link.startswith(origin):
to_visit.append(link)
# Collect cookies
cookies = await context.cookies()
result.all_cookies = sorted(set(c.get("name", "") for c in cookies))
result.external_scripts = list(set(scripts_collected))
result.discovered_urls = [p.url for p in result.pages]
except Exception as e:
logger.error("Playwright scan failed: %s", e)
finally:
await context.close()
await browser.close()
logger.info("Playwright scan: %d pages visited, %d scripts found",
len(result.pages), len(result.external_scripts))
return result
async def _visit_page(page: Page, url: str, result: PlaywrightScanResult) -> ScannedPage | None:
"""Visit a page and capture its rendered HTML."""
sp = ScannedPage(url=url, status=0)
try:
response = await page.goto(url, wait_until="networkidle", timeout=20000)
sp.status = response.status if response else 0
await page.wait_for_timeout(2000)
if sp.status < 400:
sp.html = await page.content()
sp.title = await page.title()
else:
sp.error = f"HTTP {sp.status}"
except Exception as e:
sp.status = 0
sp.error = str(e)[:100]
logger.warning("Failed to visit %s: %s", url, sp.error)
result.pages.append(sp)
return sp if sp.status < 400 and sp.html else None
async def _discover_nav_links(page: Page, origin: str) -> list[str]:
"""Extract all navigation links from the rendered page."""
links = set()
try:
# Get all <a> hrefs from the rendered DOM
all_hrefs = await page.evaluate("""
() => [...document.querySelectorAll('a[href]')]
.map(a => a.href)
.filter(h => h.startsWith('http'))
""")
for href in (all_hrefs or []):
href_clean = href.split("#")[0].split("?")[0] # Strip anchors and params
if not href_clean.startswith(origin):
continue
if SKIP_PATTERNS.search(href_clean):
continue
# Prioritize pages with relevant keywords
href_lower = href_clean.lower()
if any(kw in href_lower for kw in NAV_LINK_KEYWORDS):
links.add(href_clean)
except Exception as e:
logger.warning("Link discovery failed: %s", e)
return sorted(links)[:20] # Cap at 20
async def _click_navigation_menus(page: Page, origin: str) -> list[str]:
"""Click expandable navigation menus to discover hidden links."""
links = set()
try:
# Find and click common menu toggles
menu_selectors = [
'button[aria-expanded="false"]',
'[class*="dropdown"] > a',
'[class*="menu-toggle"]',
'[class*="nav-toggle"]',
'details:not([open]) > summary',
'[class*="accordion"] > button',
'nav button',
]
for selector in menu_selectors:
try:
elements = page.locator(selector)
count = await elements.count()
for i in range(min(count, 10)): # Max 10 menus
try:
await elements.nth(i).click(timeout=2000)
await page.wait_for_timeout(500)
except Exception:
continue
except Exception:
continue
# After clicking, collect newly visible links
new_hrefs = await page.evaluate("""
() => [...document.querySelectorAll('a[href]')]
.filter(a => {
const rect = a.getBoundingClientRect();
return rect.width > 0 && rect.height > 0;
})
.map(a => a.href)
.filter(h => h.startsWith('http'))
""")
for href in (new_hrefs or []):
href_clean = href.split("#")[0].split("?")[0]
if href_clean.startswith(origin) and not SKIP_PATTERNS.search(href_clean):
href_lower = href_clean.lower()
if any(kw in href_lower for kw in NAV_LINK_KEYWORDS):
links.add(href_clean)
except Exception as e:
logger.warning("Menu click failed: %s", e)
return sorted(links)[:10]
def _collect_external(request, scripts: list[str], origin: str):
"""Collect external script/resource URLs."""
url = request.url
if request.resource_type in ("script", "image") and not url.startswith(origin):
domain = url.split("/")[2] if len(url.split("/")) > 2 else url
if domain not in [s.split("/")[2] if len(s.split("/")) > 2 else s for s in scripts]:
scripts.append(url)