breakpilot-compliance/consent-tester/main.py
Benjamin Admin 37093ff9e3 feat: Browser-Matrix C2 + B11 AI-Retention + Impressum-Specialist-Agent + B1 Mobile Playwright
Task #15 Stage 1.c-e, Browser-Matrix backend integration:
  - _phase_c2_browser_matrix.py: calls consent-tester /scan-matrix when
    env BROWSER_MATRIX=true, fills state["browser_matrix"] +
    state["browser_aggregate"] + state["browser_matrix_html"]
  - V2 mail block: 🌐 Browser-Matrix table (Profile · Score ·
    Sub-Scores PC/RR/BD · Rating) with worst-of header
  - Orchestrator calls run_phase_c2 after run_phase_c
  KNOWN: Stage 1.b (consent_scanner browser_profile param) remains
    deferred (file is in loc-exception, hook patch refused).
    The Stage 1.a shim runs in the consent-tester; all profiles currently
    run on Chromium, real engine diversity arrives with 1.b.

Task #17 TH-RETENTION-002 as B11 ai_retention_granularity_check:
  - Detects AI-provider context (vertex/openai/anthropic/etc)
  - Within a ±800-char window: checks for ≥2 data categories from the
    standard list (text inputs/IP/device/session/error log/timestamp)
  - If there is 1 blanket retention period + ≥2 categories but no
    per-category differential → LOW
  - Smoke: Elli mock DSE (privacy policy) hits LOW "AI-Speicherdauer pauschal"
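
The B11 heuristic above can be sketched roughly as follows. This is a minimal illustration, not the repository's implementation: the function name `check_ai_retention_granularity_sketch`, every regex, and the keyword lists are assumptions. Only the ±800-character window, the ≥2-category threshold, and the LOW severity come from the commit message. "No per-category differential" is approximated here as "exactly one distinct retention period in the window".

```python
import re

# Assumed patterns; the real check's provider and category lists are not shown.
AI_PROVIDER_RE = re.compile(r"\b(vertex|openai|anthropic)\b", re.IGNORECASE)
CATEGORY_PATTERNS = {
    "text_input": re.compile(r"texteingabe", re.IGNORECASE),
    "ip": re.compile(r"\bip[- ]adresse", re.IGNORECASE),
    "device": re.compile(r"ger(ä|ae)t", re.IGNORECASE),
    "session": re.compile(r"session|sitzung", re.IGNORECASE),
    "error_log": re.compile(r"fehlerprotokoll", re.IGNORECASE),
    "timestamp": re.compile(r"zeitstempel", re.IGNORECASE),
}
# Matches a retention period such as "30 Tage" or "6 Monate".
RETENTION_RE = re.compile(
    r"\b\d+\s*(tage?n?|wochen?|monate?n?|jahre?n?)\b", re.IGNORECASE
)
WINDOW = 800  # characters on each side of the provider mention


def check_ai_retention_granularity_sketch(text: str) -> list:
    """Return at most one LOW finding for a blanket AI retention period."""
    for match in AI_PROVIDER_RE.finditer(text):
        start = max(0, match.start() - WINDOW)
        window = text[start : match.end() + WINDOW]
        categories = [
            name for name, pat in CATEGORY_PATTERNS.items() if pat.search(window)
        ]
        periods = {m.group(0).lower() for m in RETENTION_RE.finditer(window)}
        # One period covering two or more categories = no differentiation.
        if len(categories) >= 2 and len(periods) == 1:
            return [{
                "check": "ai_retention_granularity_check",
                "severity": "LOW",
                "categories": categories,
                "retention": next(iter(periods)),
            }]
    return []
```

A policy that states "30 Tage" once for text inputs, IP address and timestamps would be flagged; one that gives different periods per category would not.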

Task #18 Specialist-Agents Phase-1 prototype:
  - compliance/services/specialist_agents/__init__.py with architecture docs
  - impressum_agent.py: 9 mandatory disclosures under § 5 TMG + § 1 DL-InfoV
    as a pattern registry (name, email, phone, HR [commercial register],
    USt-IdNr [VAT ID], authorized representative, supervisory authority,
    professional details, OS link)
  - business_scope-aware (OS link only for ecommerce, supervisory authority
    only for regulated_profession/financial/insurance)
  - Phase 1 is pattern-match only (no LLM) and demonstrates the
    interface. Phase 2 replaces the patterns with system prompt + KB.
  - Smoke: a minimal Impressum correctly triggers 4 findings
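
A scope-aware pattern registry of the kind described above might look like the sketch below. It is an assumption-laden illustration: the `Requirement` dataclass, the `check_impressum_sketch` function, the regexes, and the reduced set of five entries are all hypothetical. Only the idea of a registry, the `business_scope` values, and the scoping rules for OS link and supervisory authority come from the commit message.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Requirement:
    key: str
    pattern: "re.Pattern"
    # None means the requirement applies to every business scope.
    scopes: Optional[frozenset] = None


# Illustrative subset with assumed patterns (the real registry has 9 entries).
REGISTRY = [
    Requirement("email", re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")),
    Requirement("phone", re.compile(r"(tel|telefon)\.?:?\s*\+?[\d\s/()-]{6,}", re.I)),
    Requirement("vat_id", re.compile(r"\bDE\s?\d{9}\b")),
    Requirement(
        "os_link",
        re.compile(r"ec\.europa\.eu/consumers/odr", re.I),
        frozenset({"ecommerce"}),
    ),
    Requirement(
        "supervisory_authority",
        re.compile(r"aufsichtsbeh(ö|oe)rde", re.I),
        frozenset({"regulated_profession", "financial", "insurance"}),
    ),
]


def check_impressum_sketch(text: str, business_scope: str) -> list:
    """Return the keys of in-scope requirements that the text does not satisfy."""
    missing = []
    for req in REGISTRY:
        if req.scopes is not None and business_scope not in req.scopes:
            continue  # requirement does not apply to this kind of business
        if not req.pattern.search(text):
            missing.append(req.key)
    return missing
```

Keeping the scope on the registry entry rather than in the matching loop means a Phase-2 replacement of the patterns could reuse the same scoping data unchanged.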

Task #7 B1 Playwright mobile verification:
  - consent-tester/services/mobile_reachability_scanner.py: real
    WebKit launch + p.devices['iPhone 15'] preset + de-DE locale +
    Europe/Berlin timezone
  - Footer anchor search via locator("footer >> text=/.../i") for
    13 reopen phrases
  - Tap-target bounding-box measurement (Apple HIG / WCAG ≥44x44)
  - Click behavior: DOM modal snapshot before/after, detects CMP open
  - Output: has_anchor, anchor_text, tap_target_px, click_opens_cmp,
    engine_meta, screenshot_b64 (footer crop if no anchor)
  - consent-tester/routes_mobile.py POST /scan-mobile-reachability
  - Backend _b1_wiring extended: calls the mobile endpoint first,
    falls back to static HTTP fetch. Mobile data enriches
    finding.mobile_playwright + severity bump on
    tap-target<44 / click-doesnt-open-CMP.
  KNOWN: WebKit system libs have been added to the Dockerfile (Stage 1.a
    commit) but only take effect after a CI/CD rebuild of the consent-tester.
    Until then B1 falls back cleanly to the static fetch.
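
The severity bump described for `_b1_wiring` could be expressed along these lines. This is only a sketch under stated assumptions: the helper names, the four-step severity ladder, the one-step bump, and the `(width, height)` shape of `tap_target_px` are all hypothetical. The field names `has_anchor`, `tap_target_px`, `click_opens_cmp`, and `mobile_playwright` and the 44 px threshold are taken from the commit message.

```python
from typing import Optional

SEVERITIES = ["LOW", "MEDIUM", "HIGH", "CRITICAL"]  # assumed ordering
MIN_TAP_PX = 44  # Apple HIG / WCAG minimum named in the commit message


def bump_severity(severity: str) -> str:
    """Move one step up the assumed ladder, saturating at the top."""
    idx = SEVERITIES.index(severity)
    return SEVERITIES[min(idx + 1, len(SEVERITIES) - 1)]


def enrich_with_mobile(finding: dict, mobile: Optional[dict]) -> dict:
    """Attach mobile scan data to a copy of the finding and bump if needed.

    `mobile` is None when the mobile endpoint was unavailable and the
    static HTTP fetch was used instead; the finding is then left as is.
    """
    enriched = dict(finding)
    if mobile is None:
        return enriched
    enriched["mobile_playwright"] = mobile
    if not mobile.get("has_anchor"):
        return enriched
    size = mobile.get("tap_target_px")  # assumed (width, height) in CSS px
    too_small = size is not None and min(size) < MIN_TAP_PX
    does_not_open = not mobile.get("click_opens_cmp", False)
    if too_small or does_not_open:
        enriched["severity"] = bump_severity(enriched["severity"])
    return enriched
```

Returning a copy keeps the static-fetch result intact, so a caller can compare the finding before and after mobile enrichment.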

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-06 22:20:25 +02:00

487 lines
17 KiB
Python

"""
Consent Tester Service — Playwright-based 3-phase cookie consent test.
Tests what scripts/cookies load BEFORE consent, AFTER rejection, and AFTER acceptance.
Runs as independent microservice on port 8094.
"""
import logging
from datetime import datetime, timezone
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from services.consent_scanner import run_consent_test, ConsentTestResult
from services.authenticated_scanner import run_authenticated_test, AuthTestResult
from services.playwright_scanner import scan_website_playwright
from services.dsi_discovery import discover_dsi_documents, DSIDiscoveryResult
from services.page_screenshot import (
    capture_page_evidence,
    capture_page_overlapping_slices,
)
from checks.banner_runner import map_scan_to_checks
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s")
logger = logging.getLogger(__name__)
app = FastAPI(title="BreakPilot Consent Tester", version="1.0.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
class ScanRequest(BaseModel):
    url: str
    timeout_per_phase: int = 10  # seconds to wait after page load
    categories: list[str] = []  # empty = test all categories


class ScanResponse(BaseModel):
    url: str
    banner_detected: bool
    banner_provider: str
    phases: dict
    summary: dict
    scanned_at: str
    category_tests: list = []
    banner_checks: dict = {}
    structured_checks: list = []
    completeness_pct: int = 0
    correctness_pct: int = 0
    tcf_vendors: list = []  # Resolved TCF vendor list from GVL
    cmp_payloads: list[dict] = []  # P48: raw CMP JSON payloads (Usercentrics/OneTrust/...) captured during scan
    vendor_details: list[dict] = []  # P50: per-vendor detail-modal extracts (description/cookies/opt-out/privacy)
    cookies_detailed: list[dict] = []  # P59b: full cookie details for behavior validation (name, value, domain, expires, phase, declared_category)
    banner_screenshot_b64: str = ""  # P85: base64 PNG of the banner (initial view)
from routes_matrix import router as matrix_router
from routes_mobile import router as mobile_router
app.include_router(matrix_router)
app.include_router(mobile_router)
@app.get("/health")
async def health():
    return {"status": "healthy", "service": "consent-tester"}
@app.post("/scan", response_model=ScanResponse)
async def scan_consent(req: ScanRequest):
    """Run 3-phase consent test on a URL."""
    logger.info("Starting consent test for %s", req.url)
    result = await run_consent_test(req.url, req.timeout_per_phase, req.categories)

    # Build raw response dict for structured check mapping
    phases = {
        "before_consent": {
            "scripts": result.before_scripts,
            "cookies": result.before_cookies,
            "tracking_services": result.before_tracking,
            "violations": [v.__dict__ for v in result.before_violations],
        },
        "after_reject": {
            "scripts": result.reject_scripts,
            "cookies": result.reject_cookies,
            "new_tracking": result.reject_new_tracking,
            "violations": [v.__dict__ for v in result.reject_violations],
        },
        "after_accept": {
            "scripts": result.accept_scripts,
            "cookies": result.accept_cookies,
            "new_tracking": result.accept_new_tracking,
            "undocumented": result.accept_undocumented,
        },
    }
    banner_checks_data = {
        "has_impressum_link": result.banner_has_impressum_link,
        "has_dse_link": result.banner_has_dse_link,
        "violations": [v.__dict__ for v in result.banner_text_violations],
    }

    # Map to L1/L2 hierarchy
    raw_for_mapping = {
        "banner_detected": result.banner_detected,
        "banner_provider": result.banner_provider,
        "phases": phases,
        "banner_checks": banner_checks_data,
    }
    check_result = map_scan_to_checks(raw_for_mapping)

    return ScanResponse(
        url=req.url,
        banner_detected=result.banner_detected,
        banner_provider=result.banner_provider,
        tcf_vendors=result.tcf_vendors,
        phases=phases,
        summary={
            "critical": sum(1 for v in result.reject_violations if v.severity == "CRITICAL"),
            "high": len(result.before_violations) + sum(1 for v in result.banner_text_violations if v.severity == "HIGH"),
            "undocumented": len(result.accept_undocumented),
            "total_violations": len(result.before_violations) + len(result.reject_violations) + len(result.banner_text_violations),
            "category_violations": sum(len(ct.violations) for ct in result.category_tests),
            "categories_tested": len(result.category_tests),
            "banner_text_issues": len(result.banner_text_violations),
        },
        banner_checks=banner_checks_data,
        structured_checks=check_result["structured_checks"],
        completeness_pct=check_result["completeness_pct"],
        correctness_pct=check_result["correctness_pct"],
        scanned_at=datetime.now(timezone.utc).isoformat(),
        category_tests=[{
            "category": ct.category,
            "category_label": ct.category_label,
            "tracking_services": ct.tracking_services,
            "violations": ct.violations,
            "provider_details_visible": getattr(ct, "provider_details_visible", False),
            "cookies_set": ct.cookies_set,
        } for ct in result.category_tests] if result.category_tests else [],
        cmp_payloads=result.cmp_payloads,  # P48
        vendor_details=result.vendor_details,  # P50
        cookies_detailed=result.cookies_detailed,  # P59b
        banner_screenshot_b64=result.banner_screenshot_b64,  # P85
    )
class AuthScanRequest(BaseModel):
    url: str
    username: str
    password: str
    username_selector: str = ""
    password_selector: str = ""
    submit_selector: str = ""


class AuthCheckInfo(BaseModel):
    found: bool = False
    text: str = ""
    legal_ref: str = ""


class AuthScanResponse(BaseModel):
    url: str
    authenticated: bool
    login_error: str = ""
    checks: dict[str, AuthCheckInfo]
    findings_count: int
    scanned_at: str


LEGAL_REFS = {
    "cancel_subscription": "§312k BGB (Kuendigungsbutton)",
    "delete_account": "Art. 17 DSGVO (Recht auf Loeschung)",
    "export_data": "Art. 20 DSGVO (Datenportabilitaet)",
    "consent_settings": "Art. 7 Abs. 3 DSGVO (Widerruf der Einwilligung)",
    "profile_visible": "Art. 15 DSGVO (Auskunftsrecht)",
}
@app.post("/authenticated-scan", response_model=AuthScanResponse)
async def authenticated_scan(req: AuthScanRequest):
    """Test post-login functionality. Credentials are destroyed after test."""
    logger.info("Starting authenticated test for %s", req.url)
    result = await run_authenticated_test(
        url=req.url,
        username=req.username,
        password=req.password,
        username_selector=req.username_selector,
        password_selector=req.password_selector,
        submit_selector=req.submit_selector,
    )
    checks = {
        "cancel_subscription": AuthCheckInfo(
            found=result.cancel_subscription.found,
            text=result.cancel_subscription.text,
            legal_ref=LEGAL_REFS["cancel_subscription"],
        ),
        "delete_account": AuthCheckInfo(
            found=result.delete_account.found,
            text=result.delete_account.text,
            legal_ref=LEGAL_REFS["delete_account"],
        ),
        "export_data": AuthCheckInfo(
            found=result.export_data.found,
            text=result.export_data.text,
            legal_ref=LEGAL_REFS["export_data"],
        ),
        "consent_settings": AuthCheckInfo(
            found=result.consent_settings.found,
            text=result.consent_settings.text,
            legal_ref=LEGAL_REFS["consent_settings"],
        ),
        "profile_visible": AuthCheckInfo(
            found=result.profile_visible.found,
            text=result.profile_visible.text,
            legal_ref=LEGAL_REFS["profile_visible"],
        ),
    }
    # Every check that was not found counts as one finding.
    missing = sum(1 for c in checks.values() if not c.found)
    return AuthScanResponse(
        url=req.url,
        authenticated=result.authenticated,
        login_error=result.login_error,
        checks=checks,
        findings_count=missing,
        scanned_at=datetime.now(timezone.utc).isoformat(),
    )
# ═══════════════════════════════════════════════════════════════
# PLAYWRIGHT WEBSITE SCAN (Phase 10 — replaces httpx scanner)
# ═══════════════════════════════════════════════════════════════
class WebsiteScanRequest(BaseModel):
    url: str
    max_pages: int = 15
    click_nav: bool = True


class PageInfo(BaseModel):
    url: str
    status: int
    title: str = ""
    error: str = ""


class WebsiteScanResponse(BaseModel):
    url: str
    pages: list[PageInfo]
    pages_count: int
    external_scripts: list[str]
    cookies: list[str]
    page_htmls: dict[str, str]  # url -> rendered HTML (for backend analysis)
    scanned_at: str


@app.post("/website-scan", response_model=WebsiteScanResponse)
async def website_scan(req: WebsiteScanRequest):
    """Scan website using Playwright: discovers pages via JS navigation + menu clicks."""
    logger.info("Starting Playwright website scan for %s (max %d pages)", req.url, req.max_pages)
    result = await scan_website_playwright(req.url, req.max_pages, req.click_nav)

    # Build page HTML map (only successful pages, truncated)
    page_htmls = {}
    for p in result.pages:
        if p.html and p.status < 400:
            page_htmls[p.url] = p.html[:50000]  # Cap at 50,000 characters (~50 KB) per page

    return WebsiteScanResponse(
        url=req.url,
        pages=[PageInfo(url=p.url, status=p.status, title=p.title, error=p.error) for p in result.pages],
        pages_count=len(result.pages),
        external_scripts=result.external_scripts[:50],
        cookies=result.all_cookies,
        page_htmls=page_htmls,
        scanned_at=datetime.now(timezone.utc).isoformat(),
    )
# ═══════════════════════════════════════════════════════════════
# DSI DISCOVERY (finds all privacy + legal documents on a website)
# ═══════════════════════════════════════════════════════════════
class DSIDiscoveryRequest(BaseModel):
    url: str
    max_documents: int = 30


class DSIDocumentInfo(BaseModel):
    title: str
    url: str
    source_url: str
    language: str = ""
    doc_type: str = ""
    word_count: int = 0
    text_preview: str = ""
    full_text: str = ""
    # D: tab-separated HTML tables from the DOM (e.g. cookie tables).
    # One array of rows per table, each row tab-separated.
    # The backend uses this for deterministic cookie-table parsing.
    tables: list[list[str]] = []


class DSIDiscoveryResponse(BaseModel):
    url: str
    documents: list[DSIDocumentInfo]
    total_found: int
    languages_detected: list[str]
    errors: list[str]
    scanned_at: str
    # Raw CMP payloads captured during navigation (ePaaS, OneTrust, etc.).
    # Backend uses these to build the per-vendor compliance table.
    cmp_payloads: list[dict] = []
@app.post("/dsi-discovery", response_model=DSIDiscoveryResponse)
async def dsi_discovery(req: DSIDiscoveryRequest):
    """Discover all privacy/data protection documents on a website.

    Generically finds DSI, AGB, Nutzungsbedingungen, Widerrufsbelehrung,
    Cookie-Richtlinien etc. regardless of website technology or language.
    Supports HTML pages, accordions, sidebars, PDFs, cross-domain links.
    """
    logger.info("Starting DSI discovery for %s (max %d docs)", req.url, req.max_documents)
    from playwright.async_api import async_playwright

    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=True,
            args=["--no-sandbox", "--disable-dev-shm-usage"],
        )
        context = await browser.new_context(
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                       "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        )
        page = await context.new_page()
        try:
            result = await discover_dsi_documents(page, req.url, req.max_documents)
        finally:
            # Always release the browser, even if discovery raises.
            await context.close()
            await browser.close()

    return DSIDiscoveryResponse(
        url=req.url,
        documents=[
            DSIDocumentInfo(
                title=d.title,
                url=d.url,
                source_url=d.source_url,
                language=d.language,
                doc_type=d.doc_type,
                word_count=d.word_count,
                text_preview=d.text[:500] if d.text else "",
                full_text=d.text[:200000] if d.text else "",
                tables=getattr(d, "tables", []) or [],
            )
            for d in result.documents
        ],
        total_found=result.total_found,
        languages_detected=result.languages_detected,
        errors=result.errors,
        scanned_at=datetime.now(timezone.utc).isoformat(),
        cmp_payloads=result.cmp_payloads,
    )
# ── Evidence screenshot (full-page + timestamp) ─────────────────────
class EvidenceRequest(BaseModel):
    url: str
    check_id: str = ""


class EvidenceResponse(BaseModel):
    url: str  # final URL after redirects
    captured_at: str
    width_px: int
    height_px: int
    accepted_banner: bool
    expanded: int
    png_b64: str
    png_size: int


@app.post("/capture-evidence", response_model=EvidenceResponse)
async def capture_evidence(req: EvidenceRequest):
    """Full-page screenshot with timestamp banner, for legal evidence.

    Used by backend to capture the Cookie-Richtlinie + DSE pages so the
    audit-mail ZIP-attachment contains the exact rendered DOM at scan time.
    """
    import base64 as _b64

    logger.info("Capturing evidence screenshot for %s", req.url)
    data = await capture_page_evidence(req.url, check_id=req.check_id)
    png = data["png_bytes"]
    return EvidenceResponse(
        url=data["url"],
        captured_at=data["captured_at"],
        width_px=data["width_px"],
        height_px=data["height_px"],
        accepted_banner=data["accepted_banner"],
        expanded=data["expanded"],
        png_b64=_b64.b64encode(png).decode("ascii") if png else "",
        png_size=len(png) if png else 0,
    )
# ── Evidence slices (overlapping scrolling screenshots) ─────────────
class EvidenceSlicesRequest(BaseModel):
    url: str
    check_id: str = ""
    viewport_h: int = 1024
    overlap_px: int = 200
    max_slices: int = 40


class EvidenceSliceItem(BaseModel):
    idx: int
    ts: str
    top_y: int
    bot_y: int
    sha256: str
    png_b64: str
    png_size: int


class EvidenceSlicesResponse(BaseModel):
    url: str
    total_height_px: int
    width_px: int
    accepted_banner: bool
    expanded: int
    slices: list[EvidenceSliceItem]


@app.post("/capture-evidence-slices", response_model=EvidenceSlicesResponse)
async def capture_evidence_slices(req: EvidenceSlicesRequest):
    """Overlapping viewport screenshots for a gapless chain of evidence.

    Each slice overlaps the previous one by overlap_px pixels, so every
    cookie appears in at least one image, and at slice boundaries even in
    two. Dedup by cookie name eliminates the duplicates in the final result.
    """
    logger.info("Capturing overlapping evidence slices for %s", req.url)
    data = await capture_page_overlapping_slices(
        req.url, check_id=req.check_id,
        viewport_h=req.viewport_h, overlap_px=req.overlap_px,
        max_slices=req.max_slices,
    )
    return EvidenceSlicesResponse(
        url=data["url"],
        total_height_px=data["total_height_px"],
        width_px=data["width_px"],
        accepted_banner=data["accepted_banner"],
        expanded=data["expanded"],
        slices=[EvidenceSliceItem(**s) for s in data["slices"]],
    )
# ── Admin: CMP discoveries (Phase E) ────────────────────────────────
@app.get("/cmp-discoveries")
async def cmp_discoveries(limit: int = 200):
    """List LLM-discovered CMP patterns (Phase E auto-promote log)."""
    from services.cmp_discovery_log import list_discoveries
    return {"discoveries": list_discoveries(limit=limit)}


@app.delete("/cmp-discoveries/{disc_id}")
async def cmp_discovery_delete(disc_id: int):
    """Delete a discovery + its auto-promoted module (rollback)."""
    from services.cmp_discovery_log import delete_discovery
    ok = delete_discovery(disc_id)
    return {"deleted": ok, "id": disc_id}
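
The overlap rule stated in the `capture_evidence_slices` docstring can be illustrated with a small offset calculation. This is a sketch only: `slice_offsets` is a hypothetical helper, and how `capture_page_overlapping_slices` actually scrolls is not visible in this file. The defaults mirror `EvidenceSlicesRequest`, and the tuples correspond to the `top_y` / `bot_y` fields of `EvidenceSliceItem`.

```python
def slice_offsets(total_height_px, viewport_h=1024, overlap_px=200, max_slices=40):
    """Return (top_y, bot_y) pairs so that consecutive slices share overlap_px."""
    if not 0 <= overlap_px < viewport_h:
        raise ValueError("overlap_px must be >= 0 and smaller than viewport_h")
    step = viewport_h - overlap_px  # how far each new slice advances
    offsets = []
    top = 0
    while len(offsets) < max_slices:
        bottom = min(top + viewport_h, total_height_px)
        offsets.append((top, bottom))
        if bottom >= total_height_px:
            break  # the page bottom has been reached
        top += step
    return offsets
```

With the defaults, a 2000 px page yields three slices whose neighbours share a 200 px band, which is why content at a slice boundary shows up twice and needs the dedup step mentioned in the docstring.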