Files
breakpilot-compliance/consent-tester/main.py
T
Benjamin Admin a349111a01 fix: Raise full_text limit 10K→50K + combine all DSI texts for checks
Two fixes:
1. consent-tester: full_text truncation raised from 10,000 to 50,000 chars
   (IHK Internetangebot has ~50K chars, Beschwerderecht was after 10K cutoff)
2. Backend: dse_text now combines Playwright HTML + ALL DSI discovery texts
   for mandatory content checking. Previously only used first 8K chars from
   one source, missing Verantwortlicher/DSB that were in DSI documents.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-05 16:03:56 +02:00

324 lines
11 KiB
Python

"""
Consent Tester Service — Playwright-based 3-phase cookie consent test.
Tests what scripts/cookies load BEFORE consent, AFTER rejection, and AFTER acceptance.
Runs as independent microservice on port 8094.
"""
import logging
from datetime import datetime, timezone
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from services.consent_scanner import run_consent_test, ConsentTestResult
from services.authenticated_scanner import run_authenticated_test, AuthTestResult
from services.playwright_scanner import scan_website_playwright
from services.dsi_discovery import discover_dsi_documents, DSIDiscoveryResult
# Configure root logging at INFO; records render as "LEVEL:logger-name: message".
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s")
# Module-level logger used by the route handlers in this file.
logger = logging.getLogger(__name__)
# Application object that the route decorators in this module register against.
app = FastAPI(title="BreakPilot Consent Tester", version="1.0.0")
# CORS is fully open: every origin, HTTP method and request header is allowed.
# NOTE(review): this service also exposes an endpoint that receives login
# credentials — confirm that wildcard origins are intended for deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
class ScanRequest(BaseModel):
    """Request body of ``POST /scan``."""

    # Page the consent test is run against.
    url: str
    # Seconds to wait after page load; passed as the second argument to
    # run_consent_test.
    timeout_per_phase: int = 10
class ScanResponse(BaseModel):
    """Response body of ``POST /scan``."""

    # URL that was tested (echoed from the request).
    url: str
    # Whether a consent banner was detected, and which provider it belongs to.
    banner_detected: bool
    banner_provider: str
    # Per-phase findings; the /scan handler fills the keys "before_consent",
    # "after_reject" and "after_accept".
    phases: dict
    # Aggregated counters computed by the /scan handler.
    summary: dict
    # ISO-8601 timestamp (UTC) taken when the response is built.
    scanned_at: str
    # One dict per tested consent category; empty when none were tested.
    category_tests: list = []
    # Banner link checks plus banner-text violations.
    banner_checks: dict = {}
@app.get("/health")
async def health():
    """Health probe: always reports this service as healthy."""
    payload = {"status": "healthy", "service": "consent-tester"}
    return payload
@app.post("/scan", response_model=ScanResponse)
async def scan_consent(req: ScanRequest):
    """Run 3-phase consent test on a URL.

    Delegates the test to ``run_consent_test`` and reshapes its result into
    a ``ScanResponse``:

    * ``phases`` holds scripts, cookies and tracking data for the
      before-consent, after-reject and after-accept phases.
    * ``summary`` holds counters derived from the violation lists.
    * ``banner_checks`` holds the banner link flags and text violations.

    Args:
        req: Target URL and the per-phase wait time in seconds.

    Returns:
        The populated ``ScanResponse``; ``scanned_at`` is the current UTC
        time in ISO-8601 form.
    """
    logger.info("Starting consent test for %s", req.url)
    result = await run_consent_test(req.url, req.timeout_per_phase)

    # Normalise once so the summary counters and the serialised list agree.
    # Previously only the list was guarded against a missing/empty value,
    # while the summary iterated result.category_tests directly and would
    # have raised first if it were None.
    category_tests = result.category_tests or []

    return ScanResponse(
        url=req.url,
        banner_detected=result.banner_detected,
        banner_provider=result.banner_provider,
        phases={
            "before_consent": {
                "scripts": result.before_scripts,
                "cookies": result.before_cookies,
                "tracking_services": result.before_tracking,
                "violations": [v.__dict__ for v in result.before_violations],
            },
            "after_reject": {
                "scripts": result.reject_scripts,
                "cookies": result.reject_cookies,
                "new_tracking": result.reject_new_tracking,
                "violations": [v.__dict__ for v in result.reject_violations],
            },
            "after_accept": {
                "scripts": result.accept_scripts,
                "cookies": result.accept_cookies,
                "new_tracking": result.accept_new_tracking,
                "undocumented": result.accept_undocumented,
            },
        },
        summary={
            # Only after-reject violations with severity CRITICAL.
            "critical": sum(1 for v in result.reject_violations if v.severity == "CRITICAL"),
            # Every before-consent violation (regardless of its severity)
            # plus banner-text violations with severity HIGH.
            "high": len(result.before_violations) + sum(1 for v in result.banner_text_violations if v.severity == "HIGH"),
            "undocumented": len(result.accept_undocumented),
            # Category violations are reported separately and are not part
            # of this total.
            "total_violations": len(result.before_violations) + len(result.reject_violations) + len(result.banner_text_violations),
            "category_violations": sum(len(ct.violations) for ct in category_tests),
            "categories_tested": len(category_tests),
            "banner_text_issues": len(result.banner_text_violations),
        },
        banner_checks={
            "has_impressum_link": result.banner_has_impressum_link,
            "has_dse_link": result.banner_has_dse_link,
            "violations": [v.__dict__ for v in result.banner_text_violations],
        },
        scanned_at=datetime.now(timezone.utc).isoformat(),
        category_tests=[
            {
                "category": ct.category,
                "category_label": ct.category_label,
                "tracking_services": ct.tracking_services,
                "violations": ct.violations,
            }
            for ct in category_tests
        ],
    )
class AuthScanRequest(BaseModel):
    """Request body of ``POST /authenticated-scan``."""

    # Page the authenticated test is run against.
    url: str
    # Login credentials, forwarded unchanged to run_authenticated_test.
    username: str
    password: str
    # Optional selectors for the login form elements; they default to "".
    # NOTE(review): presumably an empty selector lets the scanner locate the
    # element itself — confirm against run_authenticated_test.
    username_selector: str = ""
    password_selector: str = ""
    submit_selector: str = ""
class AuthCheckInfo(BaseModel):
    """Outcome of one post-login check, paired with its legal reference."""

    # True when the scanner found the checked feature.
    found: bool = False
    # Text reported by the scanner for this check.
    text: str = ""
    # Legal provision the check relates to (taken from LEGAL_REFS).
    legal_ref: str = ""
class AuthScanResponse(BaseModel):
    """Response body of ``POST /authenticated-scan``."""

    # URL that was tested (echoed from the request).
    url: str
    # Whether the scanner reported a successful login.
    authenticated: bool
    # Login error message from the scanner; "" by default.
    login_error: str = ""
    # Check name -> outcome of that check.
    checks: dict[str, AuthCheckInfo]
    # Number of checks whose ``found`` flag is False.
    findings_count: int
    # ISO-8601 timestamp (UTC) taken when the response is built.
    scanned_at: str
# Legal provision attached to each post-login check, keyed by check name.
LEGAL_REFS: dict[str, str] = {
    # Cancellation button for subscriptions.
    "cancel_subscription": "§312k BGB (Kuendigungsbutton)",
    # Right to erasure.
    "delete_account": "Art. 17 DSGVO (Recht auf Loeschung)",
    # Data portability.
    "export_data": "Art. 20 DSGVO (Datenportabilitaet)",
    # Withdrawal of consent.
    "consent_settings": "Art. 7 Abs. 3 DSGVO (Widerruf der Einwilligung)",
    # Right of access.
    "profile_visible": "Art. 15 DSGVO (Auskunftsrecht)",
}
@app.post("/authenticated-scan", response_model=AuthScanResponse)
async def authenticated_scan(req: AuthScanRequest):
    """Run the post-login checks for a site and report which ones are missing.

    The credentials and optional form selectors are handed to
    ``run_authenticated_test``; this handler only logs the URL and does not
    store the credentials itself.
    NOTE(review): the original docstring states that credentials are
    destroyed after the test — that happens, if at all, inside the scanner
    and is not visible here; confirm.

    Args:
        req: Target URL, credentials and optional login-form selectors.

    Returns:
        An ``AuthScanResponse`` with one ``AuthCheckInfo`` per check;
        ``findings_count`` is the number of checks that were not found.
    """
    logger.info("Starting authenticated test for %s", req.url)
    result = await run_authenticated_test(
        url=req.url,
        username=req.username,
        password=req.password,
        username_selector=req.username_selector,
        password_selector=req.password_selector,
        submit_selector=req.submit_selector,
    )

    # Each check name is both an attribute on the scanner result and a key
    # in LEGAL_REFS; the tuple order fixes the order of the response dict.
    check_names = (
        "cancel_subscription",
        "delete_account",
        "export_data",
        "consent_settings",
        "profile_visible",
    )
    checks = {}
    for name in check_names:
        outcome = getattr(result, name)
        checks[name] = AuthCheckInfo(
            found=outcome.found,
            text=outcome.text,
            legal_ref=LEGAL_REFS[name],
        )

    # A finding is a check whose feature was not found.
    missing = len([info for info in checks.values() if not info.found])

    return AuthScanResponse(
        url=req.url,
        authenticated=result.authenticated,
        login_error=result.login_error,
        checks=checks,
        findings_count=missing,
        scanned_at=datetime.now(timezone.utc).isoformat(),
    )
# ═══════════════════════════════════════════════════════════════
# PLAYWRIGHT WEBSITE SCAN (Phase 10 — replaces httpx scanner)
# ═══════════════════════════════════════════════════════════════
class WebsiteScanRequest(BaseModel):
    """Request body of ``POST /website-scan``."""

    # Start URL of the scan.
    url: str
    # Page limit handed to scan_website_playwright.
    max_pages: int = 15
    # Flag handed to scan_website_playwright; the endpoint describes it as
    # discovering pages via menu clicks.
    click_nav: bool = True
class PageInfo(BaseModel):
    """Metadata of one page visited during a website scan."""

    # Address of the page.
    url: str
    # Status code reported by the scanner; values below 400 count as
    # successful when the HTML map is built.
    status: int
    # Page title; "" by default.
    title: str = ""
    # Error text reported by the scanner; "" by default.
    error: str = ""
class WebsiteScanResponse(BaseModel):
    """Response body of ``POST /website-scan``."""

    # Start URL (echoed from the request).
    url: str
    # Every page the scanner returned, including failed ones.
    pages: list[PageInfo]
    # Number of entries in ``pages``.
    pages_count: int
    # External script entries collected by the scanner (the handler caps
    # the list at 50 items).
    external_scripts: list[str]
    # Cookies collected by the scanner.
    cookies: list[str]
    # url -> rendered HTML (for backend analysis); successful pages only.
    page_htmls: dict[str, str]
    # ISO-8601 timestamp (UTC) taken when the response is built.
    scanned_at: str
@app.post("/website-scan", response_model=WebsiteScanResponse)
async def website_scan(req: WebsiteScanRequest):
    """Scan a website with Playwright and return page metadata plus HTML.

    The crawl itself is done by ``scan_website_playwright``; this handler
    only shapes the result into a ``WebsiteScanResponse``.

    Args:
        req: Start URL, page limit and navigation-click flag.

    Returns:
        Metadata for every page the scanner returned, the first 50 external
        scripts, all cookies, and truncated HTML of the successful pages.
    """
    logger.info("Starting Playwright website scan for %s (max %d pages)", req.url, req.max_pages)
    result = await scan_website_playwright(req.url, req.max_pages, req.click_nav)

    # HTML is kept only for pages that have content and a status below 400,
    # and is cut to the first 50,000 characters per page.
    page_htmls = {
        page.url: page.html[:50000]
        for page in result.pages
        if page.html and page.status < 400
    }

    page_infos = [
        PageInfo(url=page.url, status=page.status, title=page.title, error=page.error)
        for page in result.pages
    ]

    return WebsiteScanResponse(
        url=req.url,
        pages=page_infos,
        pages_count=len(result.pages),
        external_scripts=result.external_scripts[:50],
        cookies=result.all_cookies,
        page_htmls=page_htmls,
        scanned_at=datetime.now(timezone.utc).isoformat(),
    )
# ═══════════════════════════════════════════════════════════════
# DSI DISCOVERY (finds all privacy + legal documents on a website)
# ═══════════════════════════════════════════════════════════════
class DSIDiscoveryRequest(BaseModel):
    """Request body of ``POST /dsi-discovery``."""

    # Website on which documents are searched.
    url: str
    # Document limit handed to discover_dsi_documents.
    max_documents: int = 30
class DSIDocumentInfo(BaseModel):
    """One document found by DSI discovery."""

    # Title and address of the document as reported by the discovery service.
    title: str
    url: str
    # NOTE(review): presumably the page on which the link to the document
    # was found — confirm against the discovery service.
    source_url: str
    # Language and document type as reported by the discovery service.
    language: str = ""
    doc_type: str = ""
    # Word count as reported by the discovery service.
    word_count: int = 0
    # First 500 characters of the document text.
    text_preview: str = ""
    # Document text, cut to the first 50,000 characters.
    full_text: str = ""
class DSIDiscoveryResponse(BaseModel):
    """Response body of ``POST /dsi-discovery``."""

    # Website that was searched (echoed from the request).
    url: str
    # Documents returned by the discovery service.
    documents: list[DSIDocumentInfo]
    # ``total_found`` value reported by the discovery service.
    total_found: int
    # Languages reported by the discovery service.
    languages_detected: list[str]
    # Error messages reported by the discovery service.
    errors: list[str]
    # ISO-8601 timestamp (UTC) taken when the response is built.
    scanned_at: str
@app.post("/dsi-discovery", response_model=DSIDiscoveryResponse)
async def dsi_discovery(req: DSIDiscoveryRequest):
    """Discover all privacy/data protection documents on a website.

    Generically finds DSI, AGB, Nutzungsbedingungen, Widerrufsbelehrung,
    Cookie-Richtlinien etc. regardless of website technology or language.
    Supports HTML pages, accordions, sidebars, PDFs, cross-domain links.

    The discovery logic lives in ``discover_dsi_documents``; this handler
    starts a headless Chromium, hands a fresh page to that function and
    converts its result into a ``DSIDiscoveryResponse``.

    Args:
        req: Website URL and the maximum number of documents to collect.

    Returns:
        One ``DSIDocumentInfo`` per discovered document. ``text_preview``
        is the first 500 characters of the text and ``full_text`` the first
        50,000 characters; both are "" when a document has no text.
    """
    logger.info("Starting DSI discovery for %s (max %d docs)", req.url, req.max_documents)
    # Imported locally, as in the original handler.
    from playwright.async_api import async_playwright

    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=True,
            args=["--no-sandbox", "--disable-dev-shm-usage"],
        )
        # Nested try/finally: the browser is closed even when creating the
        # context or page fails, and even when closing the context raises.
        try:
            context = await browser.new_context(
                user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            )
            try:
                page = await context.new_page()
                result = await discover_dsi_documents(page, req.url, req.max_documents)
            finally:
                await context.close()
        finally:
            await browser.close()

    return DSIDiscoveryResponse(
        url=req.url,
        documents=[
            DSIDocumentInfo(
                title=d.title,
                url=d.url,
                source_url=d.source_url,
                language=d.language,
                doc_type=d.doc_type,
                word_count=d.word_count,
                # Short excerpt of the text.
                text_preview=d.text[:500] if d.text else "",
                # Capped at 50,000 characters to bound the response size.
                full_text=d.text[:50000] if d.text else "",
            )
            for d in result.documents
        ],
        total_found=result.total_found,
        languages_detected=result.languages_detected,
        errors=result.errors,
        scanned_at=datetime.now(timezone.utc).isoformat(),
    )