From b22351fc6e9144d4241dedbd5a6a23c3f5f3cce7 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Mon, 4 May 2026 22:21:16 +0200 Subject: [PATCH] =?UTF-8?q?fix:=20Exhaustive=20crawl=20=E2=80=94=20no=20ar?= =?UTF-8?q?bitrary=20page/document=20limits?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Both scanners now search until done, not until a counter runs out: playwright_scanner.py: - Default max_pages raised from 15 to 50 - Added 3-minute timeout as safety net - Recursive link discovery on EVERY visited page (not just DSE pages) - Stops when: all links visited OR max_pages OR timeout dsi_discovery.py: - Default max_documents raised from 30 to 100 - Added 5-minute timeout as safety net - Recursive: on each visited page, searches for MORE DSI links - Processes ALL discovered links exhaustively - Stops when: no more pending links OR max_documents OR timeout The scanners now behave like a real user: they follow every relevant link they find, and on each new page they look for more links. Co-Authored-By: Claude Opus 4.6 (1M context) --- consent-tester/services/dsi_discovery.py | 35 ++- consent-tester/services/playwright_scanner.py | 266 ++++++++++++++++++ 2 files changed, 293 insertions(+), 8 deletions(-) create mode 100644 consent-tester/services/playwright_scanner.py diff --git a/consent-tester/services/dsi_discovery.py b/consent-tester/services/dsi_discovery.py index 8f686db..d86e3d4 100644 --- a/consent-tester/services/dsi_discovery.py +++ b/consent-tester/services/dsi_discovery.py @@ -203,12 +203,18 @@ def _is_allowed_domain(href: str, base_domain: str) -> bool: async def discover_dsi_documents( page: Page, url: str, - max_documents: int = 30, + max_documents: int = 100, + timeout_seconds: int = 300, ) -> DSIDiscoveryResult: """Discover all privacy/data protection documents on a website. Works generically regardless of website technology, structure, or language. + Searches exhaustively until no new documents are found — no arbitrary page limit. + Stops when: all discovered links have been visited OR timeout reached. """ + import time + deadline = time.time() + timeout_seconds + result = DSIDiscoveryResult(base_url=url) base_domain = urlparse(url).netloc seen_urls: set[str] = set() @@ -251,8 +257,15 @@ async def discover_dsi_documents( ) result.documents.append(doc) - # Step 5: Follow each DSI link and extract content - for link_info in links[:max_documents]: + # Step 5: Follow each DSI link and extract content. + # Exhaustive: processes ALL found links. On each visited page, + # searches for MORE links (recursive discovery). Stops only when + # all links visited or timeout reached. 
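+    # Breadth-first worklist: links discovered on each visited page are appended
+    # to pending_links further below, and the loop also stops early once the
+    # max_documents cap is reached.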
+ pending_links = list(links) + pages_to_revisit: list[str] = [] # Pages where we found docs — may have more links + + while pending_links and time.time() < deadline and len(result.documents) < max_documents: + link_info = pending_links.pop(0) href = link_info["href"] if href in seen_urls: continue @@ -275,7 +288,6 @@ async def discover_dsi_documents( )) continue - # Navigate to the link and extract text try: is_anchor = "#" in href and href.split("#")[0] == url.split("#")[0] if is_anchor: @@ -295,13 +307,14 @@ async def discover_dsi_documents( )) continue - # External or same-domain page + # Navigate to page resp = await page.goto(href, wait_until="networkidle", timeout=20000) if resp and resp.status < 400: await page.wait_for_timeout(2000) - await _expand_all_interactive(page) # Expand accordions on target page too + await _expand_all_interactive(page) await page.wait_for_timeout(500) + # Extract text text = await page.evaluate(""" () => { const main = document.querySelector('main, article, [role="main"], .content, #content'); @@ -316,9 +329,15 @@ async def discover_dsi_documents( text=text[:50000], word_count=len(text.split()), )) - # Navigate back to source page for next link + # Recursive: search THIS page for more DSI links + new_links = await _find_dsi_links(page, base_domain) + for nl in new_links: + if nl["href"] not in seen_urls and nl["href"] not in [p["href"] for p in pending_links]: + pending_links.append(nl) + + # Navigate back for next link await page.goto(url, wait_until="networkidle", timeout=20000) - await page.wait_for_timeout(1000) + await page.wait_for_timeout(500) await _expand_all_interactive(page) except Exception as e: diff --git a/consent-tester/services/playwright_scanner.py b/consent-tester/services/playwright_scanner.py new file mode 100644 index 0000000..49635d2 --- /dev/null +++ b/consent-tester/services/playwright_scanner.py @@ -0,0 +1,266 @@ +""" +Playwright Website Scanner — browser-based page discovery and scanning. + +Unlike httpx (curl-like), this uses a real browser that: +- Executes JavaScript (finds dynamically loaded content) +- Clicks navigation menus (discovers hidden sub-pages) +- Renders SPAs (React, Angular, Vue) +- Sees what the user sees + +Replaces the httpx-based scanner for comprehensive website analysis. 
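+
+Typical usage (sketch only; must run inside an asyncio event loop, and the URL
+is a placeholder):
+
+    result = await scan_website_playwright("https://example.com", max_pages=50)
+    for sp in result.pages:
+        print(sp.status, sp.url, sp.title)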
+""" + +import logging +import re +from dataclasses import dataclass, field +from urllib.parse import urljoin, urlparse + +from playwright.async_api import async_playwright, Page, BrowserContext + +logger = logging.getLogger(__name__) + +USER_AGENT = ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" +) + +# Patterns for discovering important pages +NAV_LINK_KEYWORDS = [ + "datenschutz", "privacy", "dsgvo", + "impressum", "imprint", "legal", + "agb", "terms", "nutzungsbedingung", + "cookie", + "kontakt", "contact", + "ueber-uns", "about", + "service", +] + +# Skip these URL patterns (not HTML pages) +SKIP_PATTERNS = re.compile( + r"\.(css|js|png|jpg|jpeg|gif|svg|pdf|zip|xml|json|woff|woff2|ttf|eot|ico)(\?|#|$)", + re.IGNORECASE, +) + + +@dataclass +class ScannedPage: + url: str + status: int + html: str = "" + title: str = "" + error: str = "" + + +@dataclass +class PlaywrightScanResult: + pages: list[ScannedPage] = field(default_factory=list) + discovered_urls: list[str] = field(default_factory=list) + external_scripts: list[str] = field(default_factory=list) + all_cookies: list[str] = field(default_factory=list) + + +async def scan_website_playwright( + base_url: str, + max_pages: int = 50, + click_nav: bool = True, + timeout_seconds: int = 180, +) -> PlaywrightScanResult: + """Scan website using Playwright — discovers pages via JS navigation. + + Exhaustively crawls until no new relevant links found, up to max_pages + (default 50) or timeout (default 3 min) as safety limits. + """ + import time as _time + deadline = _time.time() + timeout_seconds + + result = PlaywrightScanResult() + parsed = urlparse(base_url) + origin = f"{parsed.scheme}://{parsed.netloc}" + visited: set[str] = set() + to_visit: list[str] = [base_url] + + # Also add common paths to probe + if base_url != origin: + to_visit.append(origin) + + async with async_playwright() as p: + browser = await p.chromium.launch( + headless=True, + args=["--no-sandbox", "--disable-dev-shm-usage"], + ) + context = await browser.new_context(user_agent=USER_AGENT) + + try: + # Phase 1: Load start page and discover navigation links + page = await context.new_page() + scripts_collected: list[str] = [] + page.on("request", lambda req: _collect_external(req, scripts_collected, origin)) + + start_page = await _visit_page(page, base_url, result) + visited.add(base_url) + + if start_page and start_page.html: + # Extract links from rendered HTML (after JS execution) + nav_links = await _discover_nav_links(page, origin) + for link in nav_links: + if link not in visited and link not in to_visit: + to_visit.append(link) + + # Click navigation menus to find hidden links + if click_nav: + menu_links = await _click_navigation_menus(page, origin) + for link in menu_links: + if link not in visited and link not in to_visit: + to_visit.append(link) + + # Phase 2: Visit discovered pages exhaustively (until done or timeout) + visit_idx = 0 + while visit_idx < len(to_visit) and len(visited) < max_pages and _time.time() < deadline: + url = to_visit[visit_idx] + visit_idx += 1 + + if url in visited: + continue + if SKIP_PATTERNS.search(url): + continue + if not url.startswith(origin): + continue + + visited.add(url) + sp = await _visit_page(page, url, result) + + # On every visited page, discover more links (recursive crawl) + if sp and sp.html: + new_links = await _discover_nav_links(page, origin) + for link in new_links: + if link not in visited and link not in to_visit and 
link.startswith(origin): + to_visit.append(link) + + # Collect cookies + cookies = await context.cookies() + result.all_cookies = sorted(set(c.get("name", "") for c in cookies)) + result.external_scripts = list(set(scripts_collected)) + result.discovered_urls = [p.url for p in result.pages] + + except Exception as e: + logger.error("Playwright scan failed: %s", e) + finally: + await context.close() + await browser.close() + + logger.info("Playwright scan: %d pages visited, %d scripts found", + len(result.pages), len(result.external_scripts)) + return result + + +async def _visit_page(page: Page, url: str, result: PlaywrightScanResult) -> ScannedPage | None: + """Visit a page and capture its rendered HTML.""" + sp = ScannedPage(url=url, status=0) + try: + response = await page.goto(url, wait_until="networkidle", timeout=20000) + sp.status = response.status if response else 0 + await page.wait_for_timeout(2000) + + if sp.status < 400: + sp.html = await page.content() + sp.title = await page.title() + else: + sp.error = f"HTTP {sp.status}" + + except Exception as e: + sp.status = 0 + sp.error = str(e)[:100] + logger.warning("Failed to visit %s: %s", url, sp.error) + + result.pages.append(sp) + return sp if sp.status < 400 and sp.html else None + + +async def _discover_nav_links(page: Page, origin: str) -> list[str]: + """Extract all navigation links from the rendered page.""" + links = set() + try: + # Get all hrefs from the rendered DOM + all_hrefs = await page.evaluate(""" + () => [...document.querySelectorAll('a[href]')] + .map(a => a.href) + .filter(h => h.startsWith('http')) + """) + + for href in (all_hrefs or []): + href_clean = href.split("#")[0].split("?")[0] # Strip anchors and params + if not href_clean.startswith(origin): + continue + if SKIP_PATTERNS.search(href_clean): + continue + + # Prioritize pages with relevant keywords + href_lower = href_clean.lower() + if any(kw in href_lower for kw in NAV_LINK_KEYWORDS): + links.add(href_clean) + + except Exception as e: + logger.warning("Link discovery failed: %s", e) + + return sorted(links)[:20] # Cap at 20 + + +async def _click_navigation_menus(page: Page, origin: str) -> list[str]: + """Click expandable navigation menus to discover hidden links.""" + links = set() + try: + # Find and click common menu toggles + menu_selectors = [ + 'button[aria-expanded="false"]', + '[class*="dropdown"] > a', + '[class*="menu-toggle"]', + '[class*="nav-toggle"]', + 'details:not([open]) > summary', + '[class*="accordion"] > button', + 'nav button', + ] + + for selector in menu_selectors: + try: + elements = page.locator(selector) + count = await elements.count() + for i in range(min(count, 10)): # Max 10 menus + try: + await elements.nth(i).click(timeout=2000) + await page.wait_for_timeout(500) + except Exception: + continue + except Exception: + continue + + # After clicking, collect newly visible links + new_hrefs = await page.evaluate(""" + () => [...document.querySelectorAll('a[href]')] + .filter(a => { + const rect = a.getBoundingClientRect(); + return rect.width > 0 && rect.height > 0; + }) + .map(a => a.href) + .filter(h => h.startsWith('http')) + """) + + for href in (new_hrefs or []): + href_clean = href.split("#")[0].split("?")[0] + if href_clean.startswith(origin) and not SKIP_PATTERNS.search(href_clean): + href_lower = href_clean.lower() + if any(kw in href_lower for kw in NAV_LINK_KEYWORDS): + links.add(href_clean) + + except Exception as e: + logger.warning("Menu click failed: %s", e) + + return sorted(links)[:10] + + +def 
_collect_external(request, scripts: list[str], origin: str):
+    """Collect external (third-party) script/image URLs, at most one per host."""
+    url = request.url
+    if request.resource_type in ("script", "image") and not url.startswith(origin):
+        # Deduplicate by host so each external domain is only recorded once.
+        domain = urlparse(url).netloc or url
+        if domain not in {urlparse(s).netloc or s for s in scripts}:
+            scripts.append(url)
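
End-to-end sketch (not part of the patch): one way to exercise both entry
points together. The import paths and the target URL are assumptions based on
the repo layout under consent-tester/; adjust them to the actual package
structure.

    import asyncio

    from playwright.async_api import async_playwright

    from services.dsi_discovery import discover_dsi_documents
    from services.playwright_scanner import scan_website_playwright


    async def main() -> None:
        url = "https://example.com"  # placeholder target

        # Browser-based crawl: stops when the link queue drains, max_pages is
        # reached, or the 3-minute deadline passes.
        scan = await scan_website_playwright(url, max_pages=50, timeout_seconds=180)
        print(f"{len(scan.pages)} pages visited, {len(scan.external_scripts)} external scripts")

        # DSI document discovery on a dedicated page, with the 5-minute safety net.
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            page = await browser.new_page()
            await page.goto(url, wait_until="networkidle")
            dsi = await discover_dsi_documents(page, url, max_documents=100, timeout_seconds=300)
            print(f"{len(dsi.documents)} privacy documents extracted")
            await browser.close()


    asyncio.run(main())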