Files
breakpilot-compliance/consent-tester/main.py
T
Benjamin Admin 686834cea0
Build + Deploy / build-ai-sdk (push) Failing after 36s
Build + Deploy / build-developer-portal (push) Successful in 8s
Build + Deploy / build-tts (push) Successful in 7s
Build + Deploy / build-document-crawler (push) Successful in 7s
Build + Deploy / build-admin-compliance (push) Successful in 8s
Build + Deploy / build-backend-compliance (push) Successful in 8s
CI / nodejs-build (push) Successful in 3m14s
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / test-go (push) Failing after 46s
CI / test-python-backend (push) Successful in 43s
CI / test-python-document-crawler (push) Successful in 29s
CI / test-python-dsms-gateway (push) Successful in 30s
CI / validate-canonical-controls (push) Successful in 16s
Build + Deploy / build-dsms-gateway (push) Successful in 8s
Build + Deploy / build-dsms-node (push) Successful in 8s
CI / branch-name (push) Has been skipped
Build + Deploy / trigger-orca (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / loc-budget (push) Failing after 17s
CI / secret-scan (push) Has been skipped
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
feat: 4 remaining tasks — EU institutions, banner integration, JS-sites, Caritas fixes
1. EU Institution Checks (Verordnung 2018/1725):
   - New doc_type "eu_institution" with 9 L1 + 15 L2 checks
   - Both German + English patterns (EU institutions are multilingual)
   - Auto-detection via "2018/1725", "EDSB", "EDPS" keywords
   - Correct article references (Art. 15 instead of 13, Art. 5 instead of 6)

2. Banner Check Integration:
   - banner_runner.py maps scan results to 36 L1/L2 structured checks
   - BannerCheckTab shows hierarchical ChecklistView with hints
   - 3-phase summary (cookies/scripts before/after consent)
   - /scan endpoint now includes structured_checks in response

3. JS-heavy Website Fixes (dm, Zalando, HWK):
   - dsi_helpers.py: goto_resilient (networkidle→domcontentloaded fallback)
   - try_dismiss_consent_banner before text extraction
   - PDF redirect detection (dm.de redirects to GCS PDF)

4. Caritas False Positive Fixes:
   - Phone regex allows parentheses: +49 (0)761 → now matches
   - "Recht auf Widerspruch" (3 words) + §23 KDG → matches Art. 21
   - Church authorities: "Katholisches Datenschutzzentrum" recognized

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-08 01:10:10 +02:00

344 lines
12 KiB
Python

"""
Consent Tester Service — Playwright-based 3-phase cookie consent test.
Tests what scripts/cookies load BEFORE consent, AFTER rejection, and AFTER acceptance.
Runs as independent microservice on port 8094.
"""
import logging
from datetime import datetime, timezone
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from services.consent_scanner import run_consent_test, ConsentTestResult
from services.authenticated_scanner import run_authenticated_test, AuthTestResult
from services.playwright_scanner import scan_website_playwright
from services.dsi_discovery import discover_dsi_documents, DSIDiscoveryResult
from checks.banner_runner import map_scan_to_checks
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s")
logger = logging.getLogger(__name__)
app = FastAPI(title="BreakPilot Consent Tester", version="1.0.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
class ScanRequest(BaseModel):
url: str
timeout_per_phase: int = 10 # seconds to wait after page load
class ScanResponse(BaseModel):
url: str
banner_detected: bool
banner_provider: str
phases: dict
summary: dict
scanned_at: str
category_tests: list = []
banner_checks: dict = {}
structured_checks: list = []
completeness_pct: int = 0
correctness_pct: int = 0
@app.get("/health")
async def health():
return {"status": "healthy", "service": "consent-tester"}
@app.post("/scan", response_model=ScanResponse)
async def scan_consent(req: ScanRequest):
"""Run 3-phase consent test on a URL."""
logger.info("Starting consent test for %s", req.url)
result = await run_consent_test(req.url, req.timeout_per_phase)
# Build raw response dict for structured check mapping
phases = {
"before_consent": {
"scripts": result.before_scripts,
"cookies": result.before_cookies,
"tracking_services": result.before_tracking,
"violations": [v.__dict__ for v in result.before_violations],
},
"after_reject": {
"scripts": result.reject_scripts,
"cookies": result.reject_cookies,
"new_tracking": result.reject_new_tracking,
"violations": [v.__dict__ for v in result.reject_violations],
},
"after_accept": {
"scripts": result.accept_scripts,
"cookies": result.accept_cookies,
"new_tracking": result.accept_new_tracking,
"undocumented": result.accept_undocumented,
},
}
banner_checks_data = {
"has_impressum_link": result.banner_has_impressum_link,
"has_dse_link": result.banner_has_dse_link,
"violations": [v.__dict__ for v in result.banner_text_violations],
}
# Map to L1/L2 hierarchy
raw_for_mapping = {
"banner_detected": result.banner_detected,
"banner_provider": result.banner_provider,
"phases": phases,
"banner_checks": banner_checks_data,
}
check_result = map_scan_to_checks(raw_for_mapping)
return ScanResponse(
url=req.url,
banner_detected=result.banner_detected,
banner_provider=result.banner_provider,
phases=phases,
summary={
"critical": sum(1 for v in result.reject_violations if v.severity == "CRITICAL"),
"high": len(result.before_violations) + sum(1 for v in result.banner_text_violations if v.severity == "HIGH"),
"undocumented": len(result.accept_undocumented),
"total_violations": len(result.before_violations) + len(result.reject_violations) + len(result.banner_text_violations),
"category_violations": sum(len(ct.violations) for ct in result.category_tests),
"categories_tested": len(result.category_tests),
"banner_text_issues": len(result.banner_text_violations),
},
banner_checks=banner_checks_data,
structured_checks=check_result["structured_checks"],
completeness_pct=check_result["completeness_pct"],
correctness_pct=check_result["correctness_pct"],
scanned_at=datetime.now(timezone.utc).isoformat(),
category_tests=[{
"category": ct.category,
"category_label": ct.category_label,
"tracking_services": ct.tracking_services,
"violations": ct.violations,
} for ct in result.category_tests] if result.category_tests else [],
)
class AuthScanRequest(BaseModel):
url: str
username: str
password: str
username_selector: str = ""
password_selector: str = ""
submit_selector: str = ""
class AuthCheckInfo(BaseModel):
found: bool = False
text: str = ""
legal_ref: str = ""
class AuthScanResponse(BaseModel):
url: str
authenticated: bool
login_error: str = ""
checks: dict[str, AuthCheckInfo]
findings_count: int
scanned_at: str
LEGAL_REFS = {
"cancel_subscription": "§312k BGB (Kuendigungsbutton)",
"delete_account": "Art. 17 DSGVO (Recht auf Loeschung)",
"export_data": "Art. 20 DSGVO (Datenportabilitaet)",
"consent_settings": "Art. 7 Abs. 3 DSGVO (Widerruf der Einwilligung)",
"profile_visible": "Art. 15 DSGVO (Auskunftsrecht)",
}
@app.post("/authenticated-scan", response_model=AuthScanResponse)
async def authenticated_scan(req: AuthScanRequest):
"""Test post-login functionality. Credentials are destroyed after test."""
logger.info("Starting authenticated test for %s", req.url)
result = await run_authenticated_test(
url=req.url,
username=req.username,
password=req.password,
username_selector=req.username_selector,
password_selector=req.password_selector,
submit_selector=req.submit_selector,
)
checks = {
"cancel_subscription": AuthCheckInfo(
found=result.cancel_subscription.found,
text=result.cancel_subscription.text,
legal_ref=LEGAL_REFS["cancel_subscription"],
),
"delete_account": AuthCheckInfo(
found=result.delete_account.found,
text=result.delete_account.text,
legal_ref=LEGAL_REFS["delete_account"],
),
"export_data": AuthCheckInfo(
found=result.export_data.found,
text=result.export_data.text,
legal_ref=LEGAL_REFS["export_data"],
),
"consent_settings": AuthCheckInfo(
found=result.consent_settings.found,
text=result.consent_settings.text,
legal_ref=LEGAL_REFS["consent_settings"],
),
"profile_visible": AuthCheckInfo(
found=result.profile_visible.found,
text=result.profile_visible.text,
legal_ref=LEGAL_REFS["profile_visible"],
),
}
missing = sum(1 for c in checks.values() if not c.found)
return AuthScanResponse(
url=req.url,
authenticated=result.authenticated,
login_error=result.login_error,
checks=checks,
findings_count=missing,
scanned_at=datetime.now(timezone.utc).isoformat(),
)
# ═══════════════════════════════════════════════════════════════
# PLAYWRIGHT WEBSITE SCAN (Phase 10 — replaces httpx scanner)
# ═══════════════════════════════════════════════════════════════
class WebsiteScanRequest(BaseModel):
url: str
max_pages: int = 15
click_nav: bool = True
class PageInfo(BaseModel):
url: str
status: int
title: str = ""
error: str = ""
class WebsiteScanResponse(BaseModel):
url: str
pages: list[PageInfo]
pages_count: int
external_scripts: list[str]
cookies: list[str]
page_htmls: dict[str, str] # url -> rendered HTML (for backend analysis)
scanned_at: str
@app.post("/website-scan", response_model=WebsiteScanResponse)
async def website_scan(req: WebsiteScanRequest):
"""Scan website using Playwright — discovers pages via JS navigation + menu clicks."""
logger.info("Starting Playwright website scan for %s (max %d pages)", req.url, req.max_pages)
result = await scan_website_playwright(req.url, req.max_pages, req.click_nav)
# Build page HTML map (only successful pages, truncated)
page_htmls = {}
for p in result.pages:
if p.html and p.status < 400:
page_htmls[p.url] = p.html[:50000] # Cap at 50KB per page
return WebsiteScanResponse(
url=req.url,
pages=[PageInfo(url=p.url, status=p.status, title=p.title, error=p.error) for p in result.pages],
pages_count=len(result.pages),
external_scripts=result.external_scripts[:50],
cookies=result.all_cookies,
page_htmls=page_htmls,
scanned_at=datetime.now(timezone.utc).isoformat(),
)
# ═══════════════════════════════════════════════════════════════
# DSI DISCOVERY (finds all privacy + legal documents on a website)
# ═══════════════════════════════════════════════════════════════
class DSIDiscoveryRequest(BaseModel):
url: str
max_documents: int = 30
class DSIDocumentInfo(BaseModel):
title: str
url: str
source_url: str
language: str = ""
doc_type: str = ""
word_count: int = 0
text_preview: str = ""
full_text: str = ""
class DSIDiscoveryResponse(BaseModel):
url: str
documents: list[DSIDocumentInfo]
total_found: int
languages_detected: list[str]
errors: list[str]
scanned_at: str
@app.post("/dsi-discovery", response_model=DSIDiscoveryResponse)
async def dsi_discovery(req: DSIDiscoveryRequest):
"""Discover all privacy/data protection documents on a website.
Generically finds DSI, AGB, Nutzungsbedingungen, Widerrufsbelehrung,
Cookie-Richtlinien etc. regardless of website technology or language.
Supports HTML pages, accordions, sidebars, PDFs, cross-domain links.
"""
logger.info("Starting DSI discovery for %s (max %d docs)", req.url, req.max_documents)
from playwright.async_api import async_playwright
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=True,
args=["--no-sandbox", "--disable-dev-shm-usage"],
)
context = await browser.new_context(
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
)
page = await context.new_page()
try:
result = await discover_dsi_documents(page, req.url, req.max_documents)
finally:
await context.close()
await browser.close()
return DSIDiscoveryResponse(
url=req.url,
documents=[
DSIDocumentInfo(
title=d.title,
url=d.url,
source_url=d.source_url,
language=d.language,
doc_type=d.doc_type,
word_count=d.word_count,
text_preview=d.text[:500] if d.text else "",
full_text=d.text[:50000] if d.text else "",
)
for d in result.documents
],
total_found=result.total_found,
languages_detected=result.languages_detected,
errors=result.errors,
scanned_at=datetime.now(timezone.utc).isoformat(),
)