""" Consent Tester Service — Playwright-based 3-phase cookie consent test. Tests what scripts/cookies load BEFORE consent, AFTER rejection, and AFTER acceptance. Runs as independent microservice on port 8094. """ import logging from datetime import datetime, timezone from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from services.consent_scanner import run_consent_test, ConsentTestResult from services.authenticated_scanner import run_authenticated_test, AuthTestResult logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s") logger = logging.getLogger(__name__) app = FastAPI(title="BreakPilot Consent Tester", version="1.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) class ScanRequest(BaseModel): url: str timeout_per_phase: int = 10 # seconds to wait after page load class ScanResponse(BaseModel): url: str banner_detected: bool banner_provider: str phases: dict summary: dict scanned_at: str @app.get("/health") async def health(): return {"status": "healthy", "service": "consent-tester"} @app.post("/scan", response_model=ScanResponse) async def scan_consent(req: ScanRequest): """Run 3-phase consent test on a URL.""" logger.info("Starting consent test for %s", req.url) result = await run_consent_test(req.url, req.timeout_per_phase) return ScanResponse( url=req.url, banner_detected=result.banner_detected, banner_provider=result.banner_provider, phases={ "before_consent": { "scripts": result.before_scripts, "cookies": result.before_cookies, "tracking_services": result.before_tracking, "violations": [v.__dict__ for v in result.before_violations], }, "after_reject": { "scripts": result.reject_scripts, "cookies": result.reject_cookies, "new_tracking": result.reject_new_tracking, "violations": [v.__dict__ for v in result.reject_violations], }, "after_accept": { "scripts": result.accept_scripts, "cookies": result.accept_cookies, "new_tracking": result.accept_new_tracking, "undocumented": result.accept_undocumented, }, }, summary={ "critical": sum(1 for v in result.reject_violations if v.severity == "CRITICAL"), "high": len(result.before_violations), "undocumented": len(result.accept_undocumented), "total_violations": len(result.before_violations) + len(result.reject_violations), }, scanned_at=datetime.now(timezone.utc).isoformat(), ) class AuthScanRequest(BaseModel): url: str username: str password: str username_selector: str = "" password_selector: str = "" submit_selector: str = "" class AuthCheckInfo(BaseModel): found: bool = False text: str = "" legal_ref: str = "" class AuthScanResponse(BaseModel): url: str authenticated: bool login_error: str = "" checks: dict[str, AuthCheckInfo] findings_count: int scanned_at: str LEGAL_REFS = { "cancel_subscription": "§312k BGB (Kuendigungsbutton)", "delete_account": "Art. 17 DSGVO (Recht auf Loeschung)", "export_data": "Art. 20 DSGVO (Datenportabilitaet)", "consent_settings": "Art. 7 Abs. 3 DSGVO (Widerruf der Einwilligung)", "profile_visible": "Art. 15 DSGVO (Auskunftsrecht)", } @app.post("/authenticated-scan", response_model=AuthScanResponse) async def authenticated_scan(req: AuthScanRequest): """Test post-login functionality. Credentials are destroyed after test.""" logger.info("Starting authenticated test for %s", req.url) result = await run_authenticated_test( url=req.url, username=req.username, password=req.password, username_selector=req.username_selector, password_selector=req.password_selector, submit_selector=req.submit_selector, ) checks = { "cancel_subscription": AuthCheckInfo( found=result.cancel_subscription.found, text=result.cancel_subscription.text, legal_ref=LEGAL_REFS["cancel_subscription"], ), "delete_account": AuthCheckInfo( found=result.delete_account.found, text=result.delete_account.text, legal_ref=LEGAL_REFS["delete_account"], ), "export_data": AuthCheckInfo( found=result.export_data.found, text=result.export_data.text, legal_ref=LEGAL_REFS["export_data"], ), "consent_settings": AuthCheckInfo( found=result.consent_settings.found, text=result.consent_settings.text, legal_ref=LEGAL_REFS["consent_settings"], ), "profile_visible": AuthCheckInfo( found=result.profile_visible.found, text=result.profile_visible.text, legal_ref=LEGAL_REFS["profile_visible"], ), } missing = sum(1 for c in checks.values() if not c.found) return AuthScanResponse( url=req.url, authenticated=result.authenticated, login_error=result.login_error, checks=checks, findings_count=missing, scanned_at=datetime.now(timezone.utc).isoformat(), )