"""
Consent Tester Service — Playwright-based 3-phase cookie consent test.

Tests what scripts/cookies load BEFORE consent, AFTER rejection, and AFTER acceptance.
Runs as independent microservice on port 8094.

Endpoints defined in this module:
    GET  /health              liveness probe
    POST /scan                3-phase consent test
    POST /authenticated-scan  post-login checks (cancel/delete/export/consent/profile)
    POST /website-scan        Playwright page discovery + rendered HTML capture
    POST /dsi-discovery       discovery of privacy / legal documents
"""

import logging
from datetime import datetime, timezone

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from services.consent_scanner import run_consent_test, ConsentTestResult
from services.authenticated_scanner import run_authenticated_test, AuthTestResult
from services.playwright_scanner import scan_website_playwright
from services.dsi_discovery import discover_dsi_documents, DSIDiscoveryResult

logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s")
logger = logging.getLogger(__name__)

app = FastAPI(title="BreakPilot Consent Tester", version="1.0.0")

# NOTE(review): CORS accepts any origin, method and header. Confirm the
# service is only reachable from trusted callers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Upper bound on the rendered HTML returned per page by /website-scan.
# The limit is applied by slicing a str, so it counts characters, not bytes.
MAX_PAGE_HTML_CHARS = 50000

# Upper bound on the number of external script entries returned by /website-scan.
MAX_EXTERNAL_SCRIPTS = 50

# User agent presented by the browser context used for /dsi-discovery.
_DSI_USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)


class ScanRequest(BaseModel):
    """Request body for POST /scan."""

    url: str
    timeout_per_phase: int = 10  # seconds to wait after page load


class ScanResponse(BaseModel):
    """Response body for POST /scan."""

    url: str
    banner_detected: bool
    banner_provider: str
    phases: dict  # keys: before_consent, after_reject, after_accept
    summary: dict  # aggregated counters, see scan_consent()
    scanned_at: str  # ISO-8601 timestamp in UTC
    category_tests: list = []
    banner_checks: dict = {}


@app.get("/health")
async def health():
    """Return a static liveness payload."""
    return {"status": "healthy", "service": "consent-tester"}


@app.post("/scan", response_model=ScanResponse)
async def scan_consent(req: ScanRequest) -> ScanResponse:
    """Run 3-phase consent test on a URL.

    Delegates to ``run_consent_test`` and reshapes its result into a
    ``ScanResponse``: per-phase scripts/cookies/tracking, a summary of
    counters, banner link checks and per-category test results.
    """
    logger.info("Starting consent test for %s", req.url)

    result = await run_consent_test(req.url, req.timeout_per_phase)

    phases = {
        "before_consent": {
            "scripts": result.before_scripts,
            "cookies": result.before_cookies,
            "tracking_services": result.before_tracking,
            "violations": [v.__dict__ for v in result.before_violations],
        },
        "after_reject": {
            "scripts": result.reject_scripts,
            "cookies": result.reject_cookies,
            "new_tracking": result.reject_new_tracking,
            "violations": [v.__dict__ for v in result.reject_violations],
        },
        "after_accept": {
            "scripts": result.accept_scripts,
            "cookies": result.accept_cookies,
            "new_tracking": result.accept_new_tracking,
            "undocumented": result.accept_undocumented,
        },
    }

    summary = {
        # Only violations found after rejection can count as critical.
        "critical": sum(1 for v in result.reject_violations if v.severity == "CRITICAL"),
        # Every before-consent violation counts as high, regardless of its
        # own severity; banner text violations count only when marked HIGH.
        "high": len(result.before_violations)
        + sum(1 for v in result.banner_text_violations if v.severity == "HIGH"),
        "undocumented": len(result.accept_undocumented),
        # Category violations are reported separately and are not part of the total.
        "total_violations": len(result.before_violations)
        + len(result.reject_violations)
        + len(result.banner_text_violations),
        "category_violations": sum(len(ct.violations) for ct in result.category_tests),
        "categories_tested": len(result.category_tests),
        "banner_text_issues": len(result.banner_text_violations),
    }

    banner_checks = {
        "has_impressum_link": result.banner_has_impressum_link,
        "has_dse_link": result.banner_has_dse_link,
        "violations": [v.__dict__ for v in result.banner_text_violations],
    }

    # An empty category_tests yields an empty list here, so no extra
    # emptiness guard is needed (the summary above already iterated it).
    category_tests = [
        {
            "category": ct.category,
            "category_label": ct.category_label,
            "tracking_services": ct.tracking_services,
            "violations": ct.violations,
        }
        for ct in result.category_tests
    ]

    return ScanResponse(
        url=req.url,
        banner_detected=result.banner_detected,
        banner_provider=result.banner_provider,
        phases=phases,
        summary=summary,
        banner_checks=banner_checks,
        scanned_at=datetime.now(timezone.utc).isoformat(),
        category_tests=category_tests,
    )


class AuthScanRequest(BaseModel):
    """Request body for POST /authenticated-scan."""

    url: str
    username: str
    password: str
    # Optional selectors, passed through unchanged to run_authenticated_test.
    username_selector: str = ""
    password_selector: str = ""
    submit_selector: str = ""


class AuthCheckInfo(BaseModel):
    """Outcome of a single post-login check."""

    found: bool = False
    text: str = ""
    legal_ref: str = ""


class AuthScanResponse(BaseModel):
    """Response body for POST /authenticated-scan."""

    url: str
    authenticated: bool
    login_error: str = ""
    checks: dict[str, AuthCheckInfo]
    findings_count: int  # number of checks whose ``found`` is False
    scanned_at: str  # ISO-8601 timestamp in UTC


# Legal reference attached to each post-login check. The keys double as the
# attribute names read from the authenticated test result, so this mapping
# also defines which checks are reported and in which order.
LEGAL_REFS = {
    "cancel_subscription": "§312k BGB (Kuendigungsbutton)",
    "delete_account": "Art. 17 DSGVO (Recht auf Loeschung)",
    "export_data": "Art. 20 DSGVO (Datenportabilitaet)",
    "consent_settings": "Art. 7 Abs. 3 DSGVO (Widerruf der Einwilligung)",
    "profile_visible": "Art. 15 DSGVO (Auskunftsrecht)",
}


@app.post("/authenticated-scan", response_model=AuthScanResponse)
async def authenticated_scan(req: AuthScanRequest) -> AuthScanResponse:
    """Test post-login functionality. Credentials are destroyed after test.

    NOTE(review): credential destruction is not visible in this handler and
    would have to happen inside ``run_authenticated_test`` — confirm there.
    This handler itself only logs the URL and does not copy the username or
    password into the response.

    ``findings_count`` is the number of checks that were NOT found.
    """
    logger.info("Starting authenticated test for %s", req.url)

    result = await run_authenticated_test(
        url=req.url,
        username=req.username,
        password=req.password,
        username_selector=req.username_selector,
        password_selector=req.password_selector,
        submit_selector=req.submit_selector,
    )

    # One AuthCheckInfo per LEGAL_REFS entry; the result object exposes an
    # attribute of the same name carrying ``found`` and ``text``.
    checks = {}
    for check_name, legal_ref in LEGAL_REFS.items():
        outcome = getattr(result, check_name)
        checks[check_name] = AuthCheckInfo(
            found=outcome.found,
            text=outcome.text,
            legal_ref=legal_ref,
        )

    missing = sum(1 for c in checks.values() if not c.found)

    return AuthScanResponse(
        url=req.url,
        authenticated=result.authenticated,
        login_error=result.login_error,
        checks=checks,
        findings_count=missing,
        scanned_at=datetime.now(timezone.utc).isoformat(),
    )


# ═══════════════════════════════════════════════════════════════
# PLAYWRIGHT WEBSITE SCAN (Phase 10 — replaces httpx scanner)
# ═══════════════════════════════════════════════════════════════

class WebsiteScanRequest(BaseModel):
    """Request body for POST /website-scan."""

    url: str
    max_pages: int = 15
    click_nav: bool = True


class PageInfo(BaseModel):
    """Per-page metadata returned by POST /website-scan."""

    url: str
    status: int
    title: str = ""
    error: str = ""


class WebsiteScanResponse(BaseModel):
    """Response body for POST /website-scan."""

    url: str
    pages: list[PageInfo]
    pages_count: int
    external_scripts: list[str]
    cookies: list[str]
    page_htmls: dict[str, str]  # url -> rendered HTML (for backend analysis)
    scanned_at: str  # ISO-8601 timestamp in UTC


@app.post("/website-scan", response_model=WebsiteScanResponse)
async def website_scan(req: WebsiteScanRequest) -> WebsiteScanResponse:
    """Scan website using Playwright — discovers pages via JS navigation + menu clicks.

    The response lists every scanned page, but ``page_htmls`` only contains
    pages that returned HTML with a status below 400, each truncated to
    ``MAX_PAGE_HTML_CHARS`` characters. ``external_scripts`` is truncated to
    the first ``MAX_EXTERNAL_SCRIPTS`` entries.
    """
    logger.info("Starting Playwright website scan for %s (max %d pages)", req.url, req.max_pages)

    result = await scan_website_playwright(req.url, req.max_pages, req.click_nav)

    # Build page HTML map (only successful pages, truncated). If the same URL
    # appears more than once, the last occurrence wins.
    page_htmls = {
        p.url: p.html[:MAX_PAGE_HTML_CHARS]
        for p in result.pages
        if p.html and p.status < 400
    }

    return WebsiteScanResponse(
        url=req.url,
        pages=[
            PageInfo(url=p.url, status=p.status, title=p.title, error=p.error)
            for p in result.pages
        ],
        pages_count=len(result.pages),
        external_scripts=result.external_scripts[:MAX_EXTERNAL_SCRIPTS],
        cookies=result.all_cookies,
        page_htmls=page_htmls,
        scanned_at=datetime.now(timezone.utc).isoformat(),
    )


# ═══════════════════════════════════════════════════════════════
# DSI DISCOVERY (finds all privacy + legal documents on a website)
# ═══════════════════════════════════════════════════════════════

class DSIDiscoveryRequest(BaseModel):
    """Request body for POST /dsi-discovery."""

    url: str
    max_documents: int = 30


class DSIDocumentInfo(BaseModel):
    """One discovered document as returned by POST /dsi-discovery."""

    title: str
    url: str
    source_url: str
    language: str = ""
    doc_type: str = ""
    word_count: int = 0
    text_preview: str = ""  # first 500 characters of the document text


class DSIDiscoveryResponse(BaseModel):
    """Response body for POST /dsi-discovery."""

    url: str
    documents: list[DSIDocumentInfo]
    total_found: int
    languages_detected: list[str]
    errors: list[str]
    scanned_at: str  # ISO-8601 timestamp in UTC


@app.post("/dsi-discovery", response_model=DSIDiscoveryResponse)
async def dsi_discovery(req: DSIDiscoveryRequest) -> DSIDiscoveryResponse:
    """Discover all privacy/data protection documents on a website.

    Generically finds DSI, AGB, Nutzungsbedingungen, Widerrufsbelehrung,
    Cookie-Richtlinien etc. regardless of website technology or language.
    Supports HTML pages, accordions, sidebars, PDFs, cross-domain links.

    A headless Chromium instance is launched per request and handed to
    ``discover_dsi_documents``. The context and browser are closed on every
    exit path, including failures while creating the context or the page.
    """
    logger.info("Starting DSI discovery for %s (max %d docs)", req.url, req.max_documents)

    # Imported per call, as in the original handler, so importing this
    # module does not itself import Playwright here.
    from playwright.async_api import async_playwright

    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=True,
            args=["--no-sandbox", "--disable-dev-shm-usage"],
        )
        try:
            context = await browser.new_context(user_agent=_DSI_USER_AGENT)
            try:
                page = await context.new_page()
                result = await discover_dsi_documents(page, req.url, req.max_documents)
            finally:
                # Close the context first; the outer finally still closes
                # the browser if this raises.
                await context.close()
        finally:
            await browser.close()

    return DSIDiscoveryResponse(
        url=req.url,
        documents=[
            DSIDocumentInfo(
                title=d.title,
                url=d.url,
                source_url=d.source_url,
                language=d.language,
                doc_type=d.doc_type,
                word_count=d.word_count,
                text_preview=d.text[:500] if d.text else "",
            )
            for d in result.documents
        ],
        total_found=result.total_found,
        languages_detected=result.languages_detected,
        errors=result.errors,
        scanned_at=datetime.now(timezone.utc).isoformat(),
    )