""" Website Scanner — scans multiple pages of a website for third-party services, chatbots, tracking, AI indicators, and compares against privacy policy. Used by the Compliance Agent for SOLL/IST analysis. """ import logging import re from dataclasses import dataclass, field from urllib.parse import urljoin, urlparse import httpx logger = logging.getLogger(__name__) USER_AGENT = "BreakPilot-Compliance-Agent/1.0" @dataclass class DetectedService: id: str name: str category: str # "tracking", "chatbot", "cdn", "payment", "marketing", "other" provider: str country: str eu_adequate: bool requires_consent: bool legal_ref: str found_on: str = "" # URL where detected @dataclass class ScanResult: pages_scanned: list[str] = field(default_factory=list) detected_services: list[DetectedService] = field(default_factory=list) ai_mentions: list[str] = field(default_factory=list) chatbot_detected: bool = False chatbot_provider: str = "" missing_pages: dict = field(default_factory=dict) # url -> status_code # ── Service Registry (imported from master) ────────────────────────────────── from compliance.services.service_registry import SERVICE_REGISTRY # noqa: E402 AI_TEXT_PATTERNS = [ r"k(?:ue|ü)nstliche.?intelligenz", r"artificial.?intelligence", r"machine.?learning", r"maschinelles.?lernen", r"KI.?gest(?:ue|ü)tzt", r"AI.?powered", r"chatgpt|openai", r"deep.?learning", r"neural.?net", r"automatisierte.?entscheidung", ] FOOTER_LINK_PATTERNS = [ (r'href="([^"]*(?:impressum|imprint|legal-notice)[^"]*)"', "impressum"), (r'href="([^"]*(?:datenschutz|privacy|dsgvo)[^"]*)"', "datenschutz"), (r'href="([^"]*(?:agb|terms|nutzungsbedingungen)[^"]*)"', "agb"), (r'href="([^"]*(?:cookie)[^"]*)"', "cookies"), ] async def scan_website(base_url: str) -> ScanResult: """Scan a website: start page + footer links for services and AI indicators.""" result = ScanResult() parsed = urlparse(base_url) origin = f"{parsed.scheme}://{parsed.netloc}" async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client: # 1. Fetch start page start_html = await _fetch_page(client, origin, result) if not start_html: return result # 2. Discover footer links page_urls = {origin} page_urls.add(base_url) # Also scan the provided URL for pattern, _ in FOOTER_LINK_PATTERNS: for match in re.finditer(pattern, start_html, re.IGNORECASE): href = match.group(1) if href.startswith("/"): href = urljoin(origin, href) if href.startswith(origin): page_urls.add(href) # 3. Scan all pages in PARALLEL (max 10) import asyncio other_urls = [u for u in list(page_urls)[:10] if u != origin] fetch_tasks = [_fetch_page(client, u, result) for u in other_urls] other_htmls = await asyncio.gather(*fetch_tasks, return_exceptions=True) # Process start page _detect_services(start_html, origin, result) _detect_ai_mentions(start_html, origin, result) # Process other pages for url, html in zip(other_urls, other_htmls): if isinstance(html, str) and html: _detect_services(html, url, result) _detect_ai_mentions(html, url, result) # Deduplicate services seen = set() unique = [] for svc in result.detected_services: if svc.id not in seen: seen.add(svc.id) unique.append(svc) result.detected_services = unique result.chatbot_detected = any(s.category == "chatbot" for s in result.detected_services) if result.chatbot_detected: result.chatbot_provider = next( s.name for s in result.detected_services if s.category == "chatbot" ) return result async def _fetch_page( client: httpx.AsyncClient, url: str, result: ScanResult, ) -> str: """Fetch a single page. Returns HTML or empty string on failure.""" try: resp = await client.get(url, headers={"User-Agent": USER_AGENT}) result.pages_scanned.append(url) if resp.status_code >= 400: result.missing_pages[url] = resp.status_code return "" return resp.text except Exception as e: logger.warning("Failed to fetch %s: %s", url, e) return "" def _detect_services(html: str, url: str, result: ScanResult) -> None: """Detect third-party services in HTML.""" for pattern, meta in SERVICE_REGISTRY.items(): if re.search(pattern, html, re.IGNORECASE): result.detected_services.append(DetectedService( found_on=url, **meta, )) def _detect_ai_mentions(html: str, url: str, result: ScanResult) -> None: """Detect AI/ML text mentions in page content.""" # Strip scripts/styles first for text-only search clean = re.sub(r"<(script|style)[^>]*>.*?", "", html, flags=re.DOTALL | re.IGNORECASE) clean = re.sub(r"<[^>]+>", " ", clean) for pattern in AI_TEXT_PATTERNS: match = re.search(pattern, clean, re.IGNORECASE) if match: context = clean[max(0, match.start() - 40):match.end() + 40].strip() result.ai_mentions.append(f"{url}: ...{context}...")