Files
breakpilot-compliance/consent-tester/main.py
T
Benjamin Admin ff796fb480 feat: B12 Chatbot-Cookie-Klassifikation (#19) + Cookie-Matrix scan + safetykon test
#19 Chatbot-Cookie-Klassifikation:
  - chat_providers.json KB mit 11 Providern (iAdvize, Intercom, Tidio,
    Drift, Userlike, Zendesk, LivePerson, HubSpot, Vertex AI, OpenAI,
    Anthropic Claude). Pro Provider: Cookie-Pattern-Regex,
    typical_retention_days, tn_functions vs cp_functions, ai_capable.
  - chatbot_cookie_classification_check.py mit 4 KORRIGIERTEN Checks:
      CHAT-COOKIE-CLASS-001 (MED) — TN deklariert + Vendor-Purpose
        erwähnt Targeting/Analytics/A-B-Tests
      CHAT-COOKIE-CLASS-002 (MED) — Provider hat tn+cp Funktionen,
        Tabelle nennt nur eine Seite → keine Einwilligungs-Differenzierung
      CHAT-COOKIE-PURPOSE-001 (LOW) — Zweck zu generisch (Art. 13
        DSGVO konkret)
      CHAT-COOKIE-RETENTION-001 (HIGH) — deklariert <90d, KB-typisch
        >365d → vermutlich unterdeklariert
    NEU vs vorigem Plan: kein "eigene Banner-Kategorie Chat/AI"-Check —
    gesetzlich nicht vorgeschrieben (Vermischung Zweck-Transparenz vs
    Kategorie-Name). Anwender-Frage berechtigt, Konzept geschärft.
  - _b12_wiring.py + Orchestrator-Wire + V2-Compose-Slot
  - Cookie-Inventar mit [Chat]/[Chat+AI]-Tag pro Cookie-Name (KB-Lookup)
  - Smoke (3 Vendors / 5 Cookies): 9 findings korrekt (3 HIGH RETENTION,
    3 MEDIUM CLASS-001, 4 LOW PURPOSE)

Cookie-Matrix Scan (Browser-Vergleich gegen safetykon.de):
  - consent-tester/services/cookie_behavior_per_browser.py: eigener
    fokussierter Scanner. Pro Browser-Profile: cookies before / after
    reject / after accept in separaten Kontexten. Sequenzielle Runs
    statt parallel (Race-Conditions).
  - routes_cookie_matrix.py POST /scan-cookie-matrix
  - Live-Test safetykon.de: chromium=1, firefox=0, webkit=1, mobile-
    safari=1 nach reject — Firefox setzt KEIN Cookie nach Reject!
    (consent-tester Rebuild brachte playwright install-deps für system-libs)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-06 23:25:20 +02:00

489 lines
17 KiB
Python

"""
Consent Tester Service — Playwright-based 3-phase cookie consent test.
Tests what scripts/cookies load BEFORE consent, AFTER rejection, and AFTER acceptance.
Runs as independent microservice on port 8094.
"""
import logging
from datetime import datetime, timezone
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from services.consent_scanner import run_consent_test, ConsentTestResult
from services.authenticated_scanner import run_authenticated_test, AuthTestResult
from services.playwright_scanner import scan_website_playwright
from services.dsi_discovery import discover_dsi_documents, DSIDiscoveryResult
from services.page_screenshot import (
capture_page_evidence,
capture_page_overlapping_slices,
)
from checks.banner_runner import map_scan_to_checks
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s")
logger = logging.getLogger(__name__)
app = FastAPI(title="BreakPilot Consent Tester", version="1.0.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
class ScanRequest(BaseModel):
url: str
timeout_per_phase: int = 10 # seconds to wait after page load
categories: list[str] = [] # empty = test all categories
class ScanResponse(BaseModel):
url: str
banner_detected: bool
banner_provider: str
phases: dict
summary: dict
scanned_at: str
category_tests: list = []
banner_checks: dict = {}
structured_checks: list = []
completeness_pct: int = 0
correctness_pct: int = 0
tcf_vendors: list = [] # Resolved TCF vendor list from GVL
cmp_payloads: list[dict] = [] # P48: raw CMP JSON-payloads (Usercentrics/OneTrust/...) captured during scan
vendor_details: list[dict] = [] # P50: per-vendor detail-modal-extracts (Beschreibung/Cookies/Opt-Out/Privacy)
cookies_detailed: list[dict] = [] # P59b: full cookie details for behavior-validation (name,value,domain,expires,phase,declared_category)
banner_screenshot_b64: str = "" # P85: base64-PNG des Banners (initial-view)
from routes_matrix import router as matrix_router
from routes_mobile import router as mobile_router
from routes_cookie_matrix import router as cookie_matrix_router
app.include_router(matrix_router)
app.include_router(mobile_router)
app.include_router(cookie_matrix_router)
@app.get("/health")
async def health():
return {"status": "healthy", "service": "consent-tester"}
@app.post("/scan", response_model=ScanResponse)
async def scan_consent(req: ScanRequest):
"""Run 3-phase consent test on a URL."""
logger.info("Starting consent test for %s", req.url)
result = await run_consent_test(req.url, req.timeout_per_phase, req.categories)
# Build raw response dict for structured check mapping
phases = {
"before_consent": {
"scripts": result.before_scripts,
"cookies": result.before_cookies,
"tracking_services": result.before_tracking,
"violations": [v.__dict__ for v in result.before_violations],
},
"after_reject": {
"scripts": result.reject_scripts,
"cookies": result.reject_cookies,
"new_tracking": result.reject_new_tracking,
"violations": [v.__dict__ for v in result.reject_violations],
},
"after_accept": {
"scripts": result.accept_scripts,
"cookies": result.accept_cookies,
"new_tracking": result.accept_new_tracking,
"undocumented": result.accept_undocumented,
},
}
banner_checks_data = {
"has_impressum_link": result.banner_has_impressum_link,
"has_dse_link": result.banner_has_dse_link,
"violations": [v.__dict__ for v in result.banner_text_violations],
}
# Map to L1/L2 hierarchy
raw_for_mapping = {
"banner_detected": result.banner_detected,
"banner_provider": result.banner_provider,
"phases": phases,
"banner_checks": banner_checks_data,
}
check_result = map_scan_to_checks(raw_for_mapping)
return ScanResponse(
url=req.url,
banner_detected=result.banner_detected,
banner_provider=result.banner_provider,
tcf_vendors=result.tcf_vendors,
phases=phases,
summary={
"critical": sum(1 for v in result.reject_violations if v.severity == "CRITICAL"),
"high": len(result.before_violations) + sum(1 for v in result.banner_text_violations if v.severity == "HIGH"),
"undocumented": len(result.accept_undocumented),
"total_violations": len(result.before_violations) + len(result.reject_violations) + len(result.banner_text_violations),
"category_violations": sum(len(ct.violations) for ct in result.category_tests),
"categories_tested": len(result.category_tests),
"banner_text_issues": len(result.banner_text_violations),
},
banner_checks=banner_checks_data,
structured_checks=check_result["structured_checks"],
completeness_pct=check_result["completeness_pct"],
correctness_pct=check_result["correctness_pct"],
scanned_at=datetime.now(timezone.utc).isoformat(),
category_tests=[{
"category": ct.category,
"category_label": ct.category_label,
"tracking_services": ct.tracking_services,
"violations": ct.violations,
"provider_details_visible": getattr(ct, "provider_details_visible", False),
"cookies_set": ct.cookies_set,
} for ct in result.category_tests] if result.category_tests else [],
cmp_payloads=result.cmp_payloads, # P48
vendor_details=result.vendor_details, # P50
cookies_detailed=result.cookies_detailed, # P59b
banner_screenshot_b64=result.banner_screenshot_b64, # P85
)
class AuthScanRequest(BaseModel):
url: str
username: str
password: str
username_selector: str = ""
password_selector: str = ""
submit_selector: str = ""
class AuthCheckInfo(BaseModel):
found: bool = False
text: str = ""
legal_ref: str = ""
class AuthScanResponse(BaseModel):
url: str
authenticated: bool
login_error: str = ""
checks: dict[str, AuthCheckInfo]
findings_count: int
scanned_at: str
LEGAL_REFS = {
"cancel_subscription": "§312k BGB (Kuendigungsbutton)",
"delete_account": "Art. 17 DSGVO (Recht auf Loeschung)",
"export_data": "Art. 20 DSGVO (Datenportabilitaet)",
"consent_settings": "Art. 7 Abs. 3 DSGVO (Widerruf der Einwilligung)",
"profile_visible": "Art. 15 DSGVO (Auskunftsrecht)",
}
@app.post("/authenticated-scan", response_model=AuthScanResponse)
async def authenticated_scan(req: AuthScanRequest):
"""Test post-login functionality. Credentials are destroyed after test."""
logger.info("Starting authenticated test for %s", req.url)
result = await run_authenticated_test(
url=req.url,
username=req.username,
password=req.password,
username_selector=req.username_selector,
password_selector=req.password_selector,
submit_selector=req.submit_selector,
)
checks = {
"cancel_subscription": AuthCheckInfo(
found=result.cancel_subscription.found,
text=result.cancel_subscription.text,
legal_ref=LEGAL_REFS["cancel_subscription"],
),
"delete_account": AuthCheckInfo(
found=result.delete_account.found,
text=result.delete_account.text,
legal_ref=LEGAL_REFS["delete_account"],
),
"export_data": AuthCheckInfo(
found=result.export_data.found,
text=result.export_data.text,
legal_ref=LEGAL_REFS["export_data"],
),
"consent_settings": AuthCheckInfo(
found=result.consent_settings.found,
text=result.consent_settings.text,
legal_ref=LEGAL_REFS["consent_settings"],
),
"profile_visible": AuthCheckInfo(
found=result.profile_visible.found,
text=result.profile_visible.text,
legal_ref=LEGAL_REFS["profile_visible"],
),
}
missing = sum(1 for c in checks.values() if not c.found)
return AuthScanResponse(
url=req.url,
authenticated=result.authenticated,
login_error=result.login_error,
checks=checks,
findings_count=missing,
scanned_at=datetime.now(timezone.utc).isoformat(),
)
# ═══════════════════════════════════════════════════════════════
# PLAYWRIGHT WEBSITE SCAN (Phase 10 — replaces httpx scanner)
# ═══════════════════════════════════════════════════════════════
class WebsiteScanRequest(BaseModel):
url: str
max_pages: int = 15
click_nav: bool = True
class PageInfo(BaseModel):
url: str
status: int
title: str = ""
error: str = ""
class WebsiteScanResponse(BaseModel):
url: str
pages: list[PageInfo]
pages_count: int
external_scripts: list[str]
cookies: list[str]
page_htmls: dict[str, str] # url -> rendered HTML (for backend analysis)
scanned_at: str
@app.post("/website-scan", response_model=WebsiteScanResponse)
async def website_scan(req: WebsiteScanRequest):
"""Scan website using Playwright — discovers pages via JS navigation + menu clicks."""
logger.info("Starting Playwright website scan for %s (max %d pages)", req.url, req.max_pages)
result = await scan_website_playwright(req.url, req.max_pages, req.click_nav)
# Build page HTML map (only successful pages, truncated)
page_htmls = {}
for p in result.pages:
if p.html and p.status < 400:
page_htmls[p.url] = p.html[:50000] # Cap at 50KB per page
return WebsiteScanResponse(
url=req.url,
pages=[PageInfo(url=p.url, status=p.status, title=p.title, error=p.error) for p in result.pages],
pages_count=len(result.pages),
external_scripts=result.external_scripts[:50],
cookies=result.all_cookies,
page_htmls=page_htmls,
scanned_at=datetime.now(timezone.utc).isoformat(),
)
# ═══════════════════════════════════════════════════════════════
# DSI DISCOVERY (finds all privacy + legal documents on a website)
# ═══════════════════════════════════════════════════════════════
class DSIDiscoveryRequest(BaseModel):
url: str
max_documents: int = 30
class DSIDocumentInfo(BaseModel):
title: str
url: str
source_url: str
language: str = ""
doc_type: str = ""
word_count: int = 0
text_preview: str = ""
full_text: str = ""
# D — Tab-getrennte HTML-Tabellen aus dem DOM (z.B. Cookie-Tabellen).
# Pro Tabelle ein Array von Zeilen, jede Zeile Tab-getrennt.
# Backend nutzt das fuer deterministischen Cookie-Tabellen-Parse.
tables: list[list[str]] = []
class DSIDiscoveryResponse(BaseModel):
url: str
documents: list[DSIDocumentInfo]
total_found: int
languages_detected: list[str]
errors: list[str]
scanned_at: str
# Raw CMP payloads captured during navigation (ePaaS, OneTrust, etc.).
# Backend uses these to build the per-vendor compliance table.
cmp_payloads: list[dict] = []
@app.post("/dsi-discovery", response_model=DSIDiscoveryResponse)
async def dsi_discovery(req: DSIDiscoveryRequest):
"""Discover all privacy/data protection documents on a website.
Generically finds DSI, AGB, Nutzungsbedingungen, Widerrufsbelehrung,
Cookie-Richtlinien etc. regardless of website technology or language.
Supports HTML pages, accordions, sidebars, PDFs, cross-domain links.
"""
logger.info("Starting DSI discovery for %s (max %d docs)", req.url, req.max_documents)
from playwright.async_api import async_playwright
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=True,
args=["--no-sandbox", "--disable-dev-shm-usage"],
)
context = await browser.new_context(
user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
)
page = await context.new_page()
try:
result = await discover_dsi_documents(page, req.url, req.max_documents)
finally:
await context.close()
await browser.close()
return DSIDiscoveryResponse(
url=req.url,
documents=[
DSIDocumentInfo(
title=d.title,
url=d.url,
source_url=d.source_url,
language=d.language,
doc_type=d.doc_type,
word_count=d.word_count,
text_preview=d.text[:500] if d.text else "",
full_text=d.text[:200000] if d.text else "",
tables=getattr(d, "tables", []) or [],
)
for d in result.documents
],
total_found=result.total_found,
languages_detected=result.languages_detected,
errors=result.errors,
scanned_at=datetime.now(timezone.utc).isoformat(),
cmp_payloads=result.cmp_payloads,
)
# ── Evidence screenshot (full-page + timestamp) ─────────────────────
class EvidenceRequest(BaseModel):
url: str
check_id: str = ""
class EvidenceResponse(BaseModel):
url: str # final URL after redirects
captured_at: str
width_px: int
height_px: int
accepted_banner: bool
expanded: int
png_b64: str
png_size: int
@app.post("/capture-evidence", response_model=EvidenceResponse)
async def capture_evidence(req: EvidenceRequest):
"""Full-page screenshot with timestamp banner — for legal evidence.
Used by backend to capture the Cookie-Richtlinie + DSE pages so the
audit-mail ZIP-attachment contains the exact rendered DOM at scan time.
"""
import base64 as _b64
logger.info("Capturing evidence screenshot for %s", req.url)
data = await capture_page_evidence(req.url, check_id=req.check_id)
png = data["png_bytes"]
return EvidenceResponse(
url=data["url"],
captured_at=data["captured_at"],
width_px=data["width_px"],
height_px=data["height_px"],
accepted_banner=data["accepted_banner"],
expanded=data["expanded"],
png_b64=_b64.b64encode(png).decode("ascii") if png else "",
png_size=len(png) if png else 0,
)
# ── Evidence slices (overlapping scrolling screenshots) ─────────────
class EvidenceSlicesRequest(BaseModel):
url: str
check_id: str = ""
viewport_h: int = 1024
overlap_px: int = 200
max_slices: int = 40
class EvidenceSliceItem(BaseModel):
idx: int
ts: str
top_y: int
bot_y: int
sha256: str
png_b64: str
png_size: int
class EvidenceSlicesResponse(BaseModel):
url: str
total_height_px: int
width_px: int
accepted_banner: bool
expanded: int
slices: list[EvidenceSliceItem]
@app.post("/capture-evidence-slices", response_model=EvidenceSlicesResponse)
async def capture_evidence_slices(req: EvidenceSlicesRequest):
"""Overlapping viewport-screenshots fuer lueckenlose Beweiskette.
Jede Slice ueberlappt die vorherige um overlap_px Pixel — jeder Cookie
erscheint in mind. einem Bild, an Slice-Grenzen sogar in zwei. Dedup
nach Cookie-Name eliminiert die Doppel im Endresultat.
"""
logger.info("Capturing overlapping evidence slices for %s", req.url)
data = await capture_page_overlapping_slices(
req.url, check_id=req.check_id,
viewport_h=req.viewport_h, overlap_px=req.overlap_px,
max_slices=req.max_slices,
)
return EvidenceSlicesResponse(
url=data["url"],
total_height_px=data["total_height_px"],
width_px=data["width_px"],
accepted_banner=data["accepted_banner"],
expanded=data["expanded"],
slices=[EvidenceSliceItem(**s) for s in data["slices"]],
)
# ── Admin: CMP discoveries (Phase E) ────────────────────────────────
@app.get("/cmp-discoveries")
async def cmp_discoveries(limit: int = 200):
"""List LLM-discovered CMP patterns (Phase E auto-promote log)."""
from services.cmp_discovery_log import list_discoveries
return {"discoveries": list_discoveries(limit=limit)}
@app.delete("/cmp-discoveries/{disc_id}")
async def cmp_discovery_delete(disc_id: int):
"""Delete a discovery + its auto-promoted module (rollback)."""
from services.cmp_discovery_log import delete_discovery
ok = delete_discovery(disc_id)
return {"deleted": ok, "id": disc_id}