""" Consent Tester Service — Playwright-based 3-phase cookie consent test. Tests what scripts/cookies load BEFORE consent, AFTER rejection, and AFTER acceptance. Runs as independent microservice on port 8094. """ import logging from datetime import datetime, timezone from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from services.consent_scanner import run_consent_test, ConsentTestResult from services.authenticated_scanner import run_authenticated_test, AuthTestResult from services.playwright_scanner import scan_website_playwright from services.dsi_discovery import discover_dsi_documents, DSIDiscoveryResult from checks.banner_runner import map_scan_to_checks logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s") logger = logging.getLogger(__name__) app = FastAPI(title="BreakPilot Consent Tester", version="1.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) class ScanRequest(BaseModel): url: str timeout_per_phase: int = 10 # seconds to wait after page load categories: list[str] = [] # empty = test all categories class ScanResponse(BaseModel): url: str banner_detected: bool banner_provider: str phases: dict summary: dict scanned_at: str category_tests: list = [] banner_checks: dict = {} structured_checks: list = [] completeness_pct: int = 0 correctness_pct: int = 0 tcf_vendors: list = [] # Resolved TCF vendor list from GVL cmp_payloads: list[dict] = [] # P48: raw CMP JSON-payloads (Usercentrics/OneTrust/...) captured during scan vendor_details: list[dict] = [] # P50: per-vendor detail-modal-extracts (Beschreibung/Cookies/Opt-Out/Privacy) cookies_detailed: list[dict] = [] # P59b: full cookie details for behavior-validation (name,value,domain,expires,phase,declared_category) banner_screenshot_b64: str = "" # P85: base64-PNG des Banners (initial-view) @app.get("/health") async def health(): return {"status": "healthy", "service": "consent-tester"} @app.post("/scan", response_model=ScanResponse) async def scan_consent(req: ScanRequest): """Run 3-phase consent test on a URL.""" logger.info("Starting consent test for %s", req.url) result = await run_consent_test(req.url, req.timeout_per_phase, req.categories) # Build raw response dict for structured check mapping phases = { "before_consent": { "scripts": result.before_scripts, "cookies": result.before_cookies, "tracking_services": result.before_tracking, "violations": [v.__dict__ for v in result.before_violations], }, "after_reject": { "scripts": result.reject_scripts, "cookies": result.reject_cookies, "new_tracking": result.reject_new_tracking, "violations": [v.__dict__ for v in result.reject_violations], }, "after_accept": { "scripts": result.accept_scripts, "cookies": result.accept_cookies, "new_tracking": result.accept_new_tracking, "undocumented": result.accept_undocumented, }, } banner_checks_data = { "has_impressum_link": result.banner_has_impressum_link, "has_dse_link": result.banner_has_dse_link, "violations": [v.__dict__ for v in result.banner_text_violations], } # Map to L1/L2 hierarchy raw_for_mapping = { "banner_detected": result.banner_detected, "banner_provider": result.banner_provider, "phases": phases, "banner_checks": banner_checks_data, } check_result = map_scan_to_checks(raw_for_mapping) return ScanResponse( url=req.url, banner_detected=result.banner_detected, banner_provider=result.banner_provider, tcf_vendors=result.tcf_vendors, phases=phases, summary={ "critical": sum(1 for v in result.reject_violations if v.severity == "CRITICAL"), "high": len(result.before_violations) + sum(1 for v in result.banner_text_violations if v.severity == "HIGH"), "undocumented": len(result.accept_undocumented), "total_violations": len(result.before_violations) + len(result.reject_violations) + len(result.banner_text_violations), "category_violations": sum(len(ct.violations) for ct in result.category_tests), "categories_tested": len(result.category_tests), "banner_text_issues": len(result.banner_text_violations), }, banner_checks=banner_checks_data, structured_checks=check_result["structured_checks"], completeness_pct=check_result["completeness_pct"], correctness_pct=check_result["correctness_pct"], scanned_at=datetime.now(timezone.utc).isoformat(), category_tests=[{ "category": ct.category, "category_label": ct.category_label, "tracking_services": ct.tracking_services, "violations": ct.violations, "provider_details_visible": getattr(ct, "provider_details_visible", False), "cookies_set": ct.cookies_set, } for ct in result.category_tests] if result.category_tests else [], cmp_payloads=result.cmp_payloads, # P48 vendor_details=result.vendor_details, # P50 cookies_detailed=result.cookies_detailed, # P59b banner_screenshot_b64=result.banner_screenshot_b64, # P85 ) class AuthScanRequest(BaseModel): url: str username: str password: str username_selector: str = "" password_selector: str = "" submit_selector: str = "" class AuthCheckInfo(BaseModel): found: bool = False text: str = "" legal_ref: str = "" class AuthScanResponse(BaseModel): url: str authenticated: bool login_error: str = "" checks: dict[str, AuthCheckInfo] findings_count: int scanned_at: str LEGAL_REFS = { "cancel_subscription": "§312k BGB (Kuendigungsbutton)", "delete_account": "Art. 17 DSGVO (Recht auf Loeschung)", "export_data": "Art. 20 DSGVO (Datenportabilitaet)", "consent_settings": "Art. 7 Abs. 3 DSGVO (Widerruf der Einwilligung)", "profile_visible": "Art. 15 DSGVO (Auskunftsrecht)", } @app.post("/authenticated-scan", response_model=AuthScanResponse) async def authenticated_scan(req: AuthScanRequest): """Test post-login functionality. Credentials are destroyed after test.""" logger.info("Starting authenticated test for %s", req.url) result = await run_authenticated_test( url=req.url, username=req.username, password=req.password, username_selector=req.username_selector, password_selector=req.password_selector, submit_selector=req.submit_selector, ) checks = { "cancel_subscription": AuthCheckInfo( found=result.cancel_subscription.found, text=result.cancel_subscription.text, legal_ref=LEGAL_REFS["cancel_subscription"], ), "delete_account": AuthCheckInfo( found=result.delete_account.found, text=result.delete_account.text, legal_ref=LEGAL_REFS["delete_account"], ), "export_data": AuthCheckInfo( found=result.export_data.found, text=result.export_data.text, legal_ref=LEGAL_REFS["export_data"], ), "consent_settings": AuthCheckInfo( found=result.consent_settings.found, text=result.consent_settings.text, legal_ref=LEGAL_REFS["consent_settings"], ), "profile_visible": AuthCheckInfo( found=result.profile_visible.found, text=result.profile_visible.text, legal_ref=LEGAL_REFS["profile_visible"], ), } missing = sum(1 for c in checks.values() if not c.found) return AuthScanResponse( url=req.url, authenticated=result.authenticated, login_error=result.login_error, checks=checks, findings_count=missing, scanned_at=datetime.now(timezone.utc).isoformat(), ) # ═══════════════════════════════════════════════════════════════ # PLAYWRIGHT WEBSITE SCAN (Phase 10 — replaces httpx scanner) # ═══════════════════════════════════════════════════════════════ class WebsiteScanRequest(BaseModel): url: str max_pages: int = 15 click_nav: bool = True class PageInfo(BaseModel): url: str status: int title: str = "" error: str = "" class WebsiteScanResponse(BaseModel): url: str pages: list[PageInfo] pages_count: int external_scripts: list[str] cookies: list[str] page_htmls: dict[str, str] # url -> rendered HTML (for backend analysis) scanned_at: str @app.post("/website-scan", response_model=WebsiteScanResponse) async def website_scan(req: WebsiteScanRequest): """Scan website using Playwright — discovers pages via JS navigation + menu clicks.""" logger.info("Starting Playwright website scan for %s (max %d pages)", req.url, req.max_pages) result = await scan_website_playwright(req.url, req.max_pages, req.click_nav) # Build page HTML map (only successful pages, truncated) page_htmls = {} for p in result.pages: if p.html and p.status < 400: page_htmls[p.url] = p.html[:50000] # Cap at 50KB per page return WebsiteScanResponse( url=req.url, pages=[PageInfo(url=p.url, status=p.status, title=p.title, error=p.error) for p in result.pages], pages_count=len(result.pages), external_scripts=result.external_scripts[:50], cookies=result.all_cookies, page_htmls=page_htmls, scanned_at=datetime.now(timezone.utc).isoformat(), ) # ═══════════════════════════════════════════════════════════════ # DSI DISCOVERY (finds all privacy + legal documents on a website) # ═══════════════════════════════════════════════════════════════ class DSIDiscoveryRequest(BaseModel): url: str max_documents: int = 30 class DSIDocumentInfo(BaseModel): title: str url: str source_url: str language: str = "" doc_type: str = "" word_count: int = 0 text_preview: str = "" full_text: str = "" # D — Tab-getrennte HTML-Tabellen aus dem DOM (z.B. Cookie-Tabellen). # Pro Tabelle ein Array von Zeilen, jede Zeile Tab-getrennt. # Backend nutzt das fuer deterministischen Cookie-Tabellen-Parse. tables: list[list[str]] = [] class DSIDiscoveryResponse(BaseModel): url: str documents: list[DSIDocumentInfo] total_found: int languages_detected: list[str] errors: list[str] scanned_at: str # Raw CMP payloads captured during navigation (ePaaS, OneTrust, etc.). # Backend uses these to build the per-vendor compliance table. cmp_payloads: list[dict] = [] @app.post("/dsi-discovery", response_model=DSIDiscoveryResponse) async def dsi_discovery(req: DSIDiscoveryRequest): """Discover all privacy/data protection documents on a website. Generically finds DSI, AGB, Nutzungsbedingungen, Widerrufsbelehrung, Cookie-Richtlinien etc. regardless of website technology or language. Supports HTML pages, accordions, sidebars, PDFs, cross-domain links. """ logger.info("Starting DSI discovery for %s (max %d docs)", req.url, req.max_documents) from playwright.async_api import async_playwright async with async_playwright() as p: browser = await p.chromium.launch( headless=True, args=["--no-sandbox", "--disable-dev-shm-usage"], ) context = await browser.new_context( user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", ) page = await context.new_page() try: result = await discover_dsi_documents(page, req.url, req.max_documents) finally: await context.close() await browser.close() return DSIDiscoveryResponse( url=req.url, documents=[ DSIDocumentInfo( title=d.title, url=d.url, source_url=d.source_url, language=d.language, doc_type=d.doc_type, word_count=d.word_count, text_preview=d.text[:500] if d.text else "", full_text=d.text[:200000] if d.text else "", tables=getattr(d, "tables", []) or [], ) for d in result.documents ], total_found=result.total_found, languages_detected=result.languages_detected, errors=result.errors, scanned_at=datetime.now(timezone.utc).isoformat(), cmp_payloads=result.cmp_payloads, ) # ── Admin: CMP discoveries (Phase E) ──────────────────────────────── @app.get("/cmp-discoveries") async def cmp_discoveries(limit: int = 200): """List LLM-discovered CMP patterns (Phase E auto-promote log).""" from services.cmp_discovery_log import list_discoveries return {"discoveries": list_discoveries(limit=limit)} @app.delete("/cmp-discoveries/{disc_id}") async def cmp_discovery_delete(disc_id: int): """Delete a discovery + its auto-promoted module (rollback).""" from services.cmp_discovery_log import delete_discovery ok = delete_discovery(disc_id) return {"deleted": ok, "id": disc_id}