feat: Phase 9 — Authenticated Testing + Legal Basis Validator (lit. mapping)

Phase 9: Playwright login + 5 post-login checks:
- §312k BGB: Kündigungsbutton (2 Klicks)
- Art. 17 DSGVO: Konto löschen
- Art. 20 DSGVO: Daten exportieren
- Art. 7(3) DSGVO: Einwilligungen widerrufen
- Art. 15 DSGVO: Profildaten einsehen
Auto-detects login form selectors. Credentials destroyed after test.

Legal Basis Validator: Checks 7 common lit-mapping mistakes, including:
- Cookie tracking on lit. f instead of lit. a (Planet49)
- Analytics on lit. b (contract overextension)
- Klarna without Art. 22 reference
- Session recording without consent
Integrated into website scan pipeline.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-04-29 16:08:41 +02:00
parent 8336c01c5c
commit 4bf92f42b8
4 changed files with 490 additions and 0 deletions
@@ -21,6 +21,7 @@ from compliance.services.dse_matcher import build_text_references, TextReference
from compliance.services.mandatory_content_checker import (
check_mandatory_documents, check_dse_mandatory_content, MandatoryFinding,
)
from compliance.services.legal_basis_validator import validate_legal_bases
logger = logging.getLogger(__name__)
@@ -132,6 +133,22 @@ async def scan_website_endpoint(req: ScanRequest):
text=f"{mf.text}" + (f"{mf.suggestion}" if mf.suggestion else ""),
))
# Step 8b: Validate legal bases (lit. a-f) in DSE
if dse_text:
lit_findings = validate_legal_bases(dse_text)
for lf in lit_findings:
findings.append(ScanFinding(
code=f"LIT-{lf.purpose.upper()}",
severity=lf.severity,
text=lf.text,
text_reference=TextReferenceModel(
found=True, source_url=req.url,
original_text=lf.original_text,
issue="incorrect", correction_type="replace",
correction_text=f"Korrekte Rechtsgrundlage: {lf.correct_basis} ({lf.legal_ref})",
) if lf.original_text else None,
))
# Step 9: Generate corrections for pre-launch mode
if not is_live and findings:
await _add_corrections(findings, dse_text)
@@ -0,0 +1,155 @@
"""
Legal Basis Validator — checks if the correct DSGVO legal basis (lit. a-f)
is used for each processing purpose in the privacy policy.
Common mistakes:
- Cookie tracking on lit. f (legitimate interest) instead of lit. a (consent)
- Marketing emails on lit. f instead of lit. a
- Analytics on lit. b (contract) — incorrect overextension
- Klarna credit check without Art. 22 reference
"""
import logging
import re
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class LitFinding:
    """A single legal-basis problem found for one processing purpose."""

    # Key of the purpose rule that triggered the finding (e.g. "cookie_tracking").
    purpose: str
    # The wrong basis substring that was found, or "(fehlt)" when a required
    # notice is missing.
    stated_basis: str
    # Human-readable description of the basis that should have been used.
    correct_basis: str
    # Severity label of the finding (the validator emits "HIGH").
    severity: str
    # Ready-to-display message describing the problem.
    text: str
    # Legal source backing the finding (court ruling / statute).
    legal_ref: str
    # Excerpt of the privacy policy around the detected mention; may be empty.
    original_text: str = ""
# Purpose → correct legal basis mapping
# Based on: DSK Kurzpapiere, Planet49 (EuGH C-673/17), BGH Cookie-Urteil
#
# Per purpose:
#   "correct"         — description of the legal basis that should be stated
#   "wrong_patterns"  — lowercase substrings that indicate a wrong basis
#   "detect_patterns" — lowercase regexes that detect a mention of the purpose
#   "ref"             — legal source cited in the finding
#   "must_contain"    — (optional) lowercase substrings of which at least one
#                       must appear near the mention
#
# German terms are listed in both umlaut and ASCII-transliterated spelling
# where both occur in practice (e.g. "bonität" / "bonitaet").
CORRECT_BASIS: dict[str, dict] = {
    "cookie_tracking": {
        "correct": "lit. a (Einwilligung)",
        "wrong_patterns": ["berechtigtes interesse", "lit. f", "lit.f", "legitimate interest"],
        "detect_patterns": ["cookie", "tracking", "pixel", "analytics.*cookie"],
        "ref": "EuGH C-673/17 (Planet49), §25 TDDDG",
    },
    "web_analytics": {
        "correct": "lit. a (Einwilligung)",
        # "vertragserfüllung" added: the ASCII spelling alone never matches
        # the way the word is normally written in a German privacy policy.
        "wrong_patterns": ["berechtigtes interesse", "lit. f", "lit.f",
                           "vertragserfuellung", "vertragserfüllung", "lit. b", "lit.b"],
        "detect_patterns": ["google analytics", "webanalyse", "web analytics", "reichweitenmessung",
                            "nutzungsanalyse", "hotjar", "matomo"],
        "ref": "DSK Orientierungshilfe Telemedien, §25 TDDDG",
    },
    "marketing_email": {
        "correct": "lit. a (Einwilligung)",
        "wrong_patterns": ["berechtigtes interesse", "lit. f", "lit.f"],
        "detect_patterns": ["newsletter", "marketing.*mail", "werbe.*mail", "werbe.*email",
                            "marketing.*email", "werbliche.*kommunikation"],
        "ref": "Art. 7 DSGVO, §7 UWG (Double Opt-In)",
    },
    "remarketing": {
        "correct": "lit. a (Einwilligung)",
        "wrong_patterns": ["berechtigtes interesse", "lit. f", "lit.f"],
        "detect_patterns": ["remarketing", "retargeting", "personalisierte werbung",
                            "personalized advertising", "custom audience"],
        "ref": "§25 TDDDG, EuGH C-673/17",
    },
    "credit_check": {
        "correct": "lit. b/f + Art. 22 DSGVO Hinweis",
        "wrong_patterns": [],  # Not about wrong basis, but missing Art. 22
        "detect_patterns": ["bonitaet", "bonität", "kreditprüfung", "kreditpruefung",
                            "schufa", "auskunftei", "klarna.*rechnung", "ratenzahlung"],
        "ref": "Art. 22 DSGVO (automatisierte Einzelentscheidung)",
        "must_contain": ["art. 22", "art.22", "automatisierte entscheidung",
                         "automated decision", "einzelentscheidung"],
    },
    "social_media_embed": {
        "correct": "lit. a (Einwilligung)",
        "wrong_patterns": ["berechtigtes interesse", "lit. f", "lit.f"],
        "detect_patterns": ["facebook.*plugin", "social.*plugin", "like.*button",
                            "share.*button", "instagram.*embed", "twitter.*embed"],
        "ref": "EuGH C-40/17 (Fashion ID), 2-Klick-Loesung",
    },
    "session_recording": {
        "correct": "lit. a (Einwilligung)",
        "wrong_patterns": ["berechtigtes interesse", "lit. f", "lit.f"],
        "detect_patterns": ["session.?recording", "session.?replay", "heatmap",
                            "mouseflow", "hotjar.*recording", "clarity.*recording",
                            "fullstory", "lucky orange"],
        "ref": "§25 TDDDG, Aufzeichnung von Nutzerverhalten",
    },
}
def validate_legal_bases(dse_text: str) -> list[LitFinding]:
    """Check whether a privacy policy states the correct legal bases.

    For every purpose in ``CORRECT_BASIS`` the text is searched
    case-insensitively for all mentions (``detect_patterns`` are regexes).
    A window of 100 characters before and 200 characters after each mention
    serves as the context in which the stated legal basis is looked up:

    * If a context contains one of the purpose's ``wrong_patterns``
      (plain lowercase substrings), one finding is reported for the first
      such mention in document order.
    * If the rule defines ``must_contain`` and no context of the purpose
      contains any of those substrings, one "missing notice" finding is
      reported, quoting the first mention.

    Args:
        dse_text: Full text of the privacy policy (Datenschutzerklaerung).

    Returns:
        The findings, grouped in ``CORRECT_BASIS`` order. Empty when
        ``dse_text`` is empty or no problem is detected.
    """
    findings: list[LitFinding] = []
    if not dse_text:
        return findings

    for purpose_id, rules in CORRECT_BASIS.items():
        # Step 1: Locate every mention of this purpose. Matching runs on the
        # original text with IGNORECASE (instead of on a lowercased copy) so
        # that match offsets are always valid for slicing ``dse_text``.
        spans = sorted({
            (max(0, m.start() - 100), min(len(dse_text), m.end() + 200))
            for pattern in rules["detect_patterns"]
            for m in re.finditer(pattern, dse_text, re.IGNORECASE)
        })
        if not spans:
            continue

        snippets = [dse_text[start:end].strip() for start, end in spans]
        contexts = [snippet.lower() for snippet in snippets]

        # Step 2: Report the first mention (document order) whose context
        # states a wrong legal basis. At most one such finding per purpose.
        wrong_hit = next(
            (
                (wrong, snippet)
                for snippet, context in zip(snippets, contexts)
                for wrong in rules["wrong_patterns"]
                if wrong in context
            ),
            None,
        )
        if wrong_hit is not None:
            wrong, snippet = wrong_hit
            findings.append(LitFinding(
                purpose=purpose_id,
                stated_basis=wrong,
                correct_basis=rules["correct"],
                severity="HIGH",
                text=f"Falsche Rechtsgrundlage: '{_purpose_label(purpose_id)}' nutzt "
                     f"'{wrong}' statt '{rules['correct']}'",
                legal_ref=rules["ref"],
                original_text=snippet[:300],
            ))

        # Step 3: Special check — must_contain (e.g., Art. 22 for credit
        # checks). The notice counts as present if it appears near ANY
        # mention of the purpose, not only near the first one.
        if "must_contain" in rules and not any(
            needle in context
            for context in contexts
            for needle in rules["must_contain"]
        ):
            findings.append(LitFinding(
                purpose=purpose_id,
                stated_basis="(fehlt)",
                correct_basis=rules["correct"],
                severity="HIGH",
                text=f"Pflichthinweis fehlt: '{_purpose_label(purpose_id)}' erwaehnt "
                     f"keine automatisierte Entscheidungsfindung ({rules['ref']})",
                legal_ref=rules["ref"],
                original_text=snippets[0][:300],
            ))

    return findings
def _purpose_label(purpose_id: str) -> str:
"""German label for purpose ID."""
labels = {
"cookie_tracking": "Cookie-Tracking",
"web_analytics": "Webanalyse",
"marketing_email": "Marketing-Emails/Newsletter",
"remarketing": "Remarketing/Retargeting",
"credit_check": "Bonitaetspruefung",
"social_media_embed": "Social Media Einbindung",
"session_recording": "Session Recording/Heatmaps",
}
return labels.get(purpose_id, purpose_id)
+88
View File
@@ -13,6 +13,7 @@ from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from services.consent_scanner import run_consent_test, ConsentTestResult
from services.authenticated_scanner import run_authenticated_test, AuthTestResult
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s")
logger = logging.getLogger(__name__)
@@ -84,3 +85,90 @@ async def scan_consent(req: ScanRequest):
},
scanned_at=datetime.now(timezone.utc).isoformat(),
)
class AuthScanRequest(BaseModel):
    # Request body of POST /authenticated-scan.
    # Page that contains the login form.
    url: str
    # Login credentials; passed straight through to run_authenticated_test.
    username: str
    password: str
    # Optional selectors for the login form elements. An empty string lets
    # the scanner try its built-in list of common selectors instead.
    username_selector: str = ""
    password_selector: str = ""
    submit_selector: str = ""
class AuthCheckInfo(BaseModel):
    # Result of one post-login check as returned to the API client.
    # True when the scanner found a matching element or text.
    found: bool = False
    # Text reported by the scanner for the match (empty when nothing was found).
    text: str = ""
    # Legal provision the check relates to (taken from LEGAL_REFS).
    legal_ref: str = ""
class AuthScanResponse(BaseModel):
    # Response body of POST /authenticated-scan.
    # URL that was tested (echoed from the request).
    url: str
    # True when the scanner considered the login successful.
    authenticated: bool
    # Error description reported by the scanner; empty when none was set.
    login_error: str = ""
    # Check name -> check result.
    checks: dict[str, AuthCheckInfo]
    # Number of checks whose `found` flag is False.
    findings_count: int
    # ISO-8601 timestamp (UTC) of when the response was created.
    scanned_at: str
# Legal provision cited for each post-login check. The keys are the check
# names used in the `checks` mapping of the /authenticated-scan response.
LEGAL_REFS = {
    "cancel_subscription": "§312k BGB (Kuendigungsbutton)",
    "delete_account": "Art. 17 DSGVO (Recht auf Loeschung)",
    "export_data": "Art. 20 DSGVO (Datenportabilitaet)",
    "consent_settings": "Art. 7 Abs. 3 DSGVO (Widerruf der Einwilligung)",
    "profile_visible": "Art. 15 DSGVO (Auskunftsrecht)",
}
@app.post("/authenticated-scan", response_model=AuthScanResponse)
async def authenticated_scan(req: AuthScanRequest):
    """Test post-login functionality. Credentials are destroyed after test.

    Runs the authenticated scanner against ``req.url`` and converts its
    per-check results into the API response. ``findings_count`` is the
    number of checks that were not found.
    """
    logger.info("Starting authenticated test for %s", req.url)

    outcome = await run_authenticated_test(
        url=req.url,
        username=req.username,
        password=req.password,
        username_selector=req.username_selector,
        password_selector=req.password_selector,
        submit_selector=req.submit_selector,
    )

    # Each check name is both an attribute of the scanner result and a key
    # of LEGAL_REFS; the order here defines the order in the response.
    check_names = (
        "cancel_subscription",
        "delete_account",
        "export_data",
        "consent_settings",
        "profile_visible",
    )
    checks = {}
    for check_name in check_names:
        check = getattr(outcome, check_name)
        checks[check_name] = AuthCheckInfo(
            found=check.found,
            text=check.text,
            legal_ref=LEGAL_REFS[check_name],
        )

    not_found = [info for info in checks.values() if not info.found]

    return AuthScanResponse(
        url=req.url,
        authenticated=outcome.authenticated,
        login_error=outcome.login_error,
        checks=checks,
        findings_count=len(not_found),
        scanned_at=datetime.now(timezone.utc).isoformat(),
    )
@@ -0,0 +1,230 @@
"""
Authenticated Scanner — tests post-login functionality.
Checks §312k BGB (cancellation), Art. 17 (deletion), Art. 20 (export),
Art. 7(3) (consent withdrawal), Art. 15 (data access).
Credentials are NEVER stored, logged, or transmitted beyond the browser context.
"""
import logging
from dataclasses import dataclass, field
from playwright.async_api import async_playwright, Page
logger = logging.getLogger(__name__)
# Desktop Chrome (macOS) user-agent string; run_authenticated_test passes it
# to the Playwright browser context it creates.
USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
@dataclass
class CheckResult:
    """Outcome of a single post-login check."""

    # True once any search strategy located the feature.
    found: bool = False
    # Selector that matched, when the hit came from the link/button search.
    selector: str = ""
    # Matched text (truncated), the matching pattern, or a navigation note.
    text: str = ""
    # Not populated by the code in this module; defaults to 0.
    clicks_needed: int = 0
    # Not populated by the code in this module; defaults to empty bytes.
    screenshot: bytes = b""
@dataclass
class AuthTestResult:
    """Aggregated result of one authenticated test run."""

    # Set to True after the login step succeeded.
    authenticated: bool = False
    # Error description: login failure message or text of a raised exception.
    login_error: str = ""

    # One CheckResult per post-login check; each instance gets its own
    # default object via default_factory.
    cancel_subscription: CheckResult = field(default_factory=CheckResult)  # §312k BGB
    delete_account: CheckResult = field(default_factory=CheckResult)  # Art. 17 DSGVO
    export_data: CheckResult = field(default_factory=CheckResult)  # Art. 20 DSGVO
    consent_settings: CheckResult = field(default_factory=CheckResult)  # Art. 7(3) DSGVO
    profile_visible: CheckResult = field(default_factory=CheckResult)  # Art. 15 DSGVO
# Search patterns for each check (DE + EN).
# Each list is handed to _check_patterns by run_authenticated_test; the first
# pattern that is found marks the check as fulfilled.

# Contract/subscription cancellation (§312k BGB)
CANCEL_PATTERNS = [
    "kündigen", "kuendigen", "vertrag beenden", "abo beenden",
    "mitgliedschaft kündigen", "cancel subscription", "unsubscribe",
    "cancel membership", "vertrag kündigen",
]
# Account deletion (Art. 17 DSGVO)
DELETE_PATTERNS = [
    "konto löschen", "konto loeschen", "account löschen", "delete account",
    "account deaktivieren", "profil löschen", "remove account",
]
# Data export (Art. 20 DSGVO)
EXPORT_PATTERNS = [
    "daten exportieren", "daten herunterladen", "export data", "download data",
    "meine daten", "datenauskunft", "data download", "daten anfordern",
]
# Consent settings / withdrawal (Art. 7(3) DSGVO)
CONSENT_PATTERNS = [
    "einwilligung", "einstellungen", "datenschutz-einstellungen",
    "consent", "privacy settings", "cookie-einstellungen",
    "werbeeinstellungen", "marketing preferences",
]
# Profile / personal data visibility (Art. 15 DSGVO)
PROFILE_PATTERNS = [
    "profil", "mein konto", "kontodaten", "persönliche daten",
    "profile", "my account", "account settings", "personal data",
]
async def run_authenticated_test(
    url: str,
    username: str,
    password: str,
    username_selector: str = "",
    password_selector: str = "",
    submit_selector: str = "",
) -> AuthTestResult:
    """Run authenticated area test. Credentials are destroyed after test.

    Opens ``url`` in a fresh headless Chromium context, attempts to log in
    and, if the login succeeded, runs the five post-login checks on the page
    that is shown afterwards.

    Args:
        url: Page that contains the login form.
        username: Login name / e-mail to fill in.
        password: Password to fill in.
        username_selector: Selector of the username input; empty = auto-detect.
        password_selector: Selector of the password input; empty = auto-detect.
        submit_selector: Selector of the submit button; empty = auto-detect.

    Returns:
        The collected result. When the login fails, or an exception is raised
        after the page was created, ``login_error`` is set and any checks that
        did not run keep their default (not found) state.
    """
    result = AuthTestResult()
    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=True,
            args=["--no-sandbox", "--disable-dev-shm-usage"],
        )
        context = await browser.new_context(user_agent=USER_AGENT)
        page = await context.new_page()
        try:
            # Step 1: Login
            await page.goto(url, wait_until="networkidle", timeout=30000)
            await page.wait_for_timeout(2000)
            login_ok = await _try_login(
                page, username, password,
                username_selector, password_selector, submit_selector,
            )
            if not login_ok:
                result.login_error = "Login fehlgeschlagen — Formular nicht gefunden oder Credentials falsch"
                # No explicit close here: the finally block below performs
                # the cleanup exactly once for every exit path.
                return result
            result.authenticated = True
            # Give the post-login page time to render before inspecting it.
            await page.wait_for_timeout(3000)

            # Step 2: Check cancellation (§312k BGB)
            result.cancel_subscription = await _check_patterns(page, CANCEL_PATTERNS, "cancel")
            logger.info("Cancel check: found=%s", result.cancel_subscription.found)
            # Step 3: Check delete account (Art. 17 DSGVO)
            result.delete_account = await _check_patterns(page, DELETE_PATTERNS, "delete")
            # Step 4: Check data export (Art. 20 DSGVO)
            result.export_data = await _check_patterns(page, EXPORT_PATTERNS, "export")
            # Step 5: Check consent settings (Art. 7(3) DSGVO)
            result.consent_settings = await _check_patterns(page, CONSENT_PATTERNS, "consent")
            # Step 6: Check profile visibility (Art. 15 DSGVO)
            result.profile_visible = await _check_patterns(page, PROFILE_PATTERNS, "profile")
        except Exception as e:
            logger.error("Authenticated test failed: %s", e)
            result.login_error = str(e)
        finally:
            # CRITICAL: Destroy context — wipes all credentials, cookies, session.
            # Nested so the browser is closed even if closing the context fails.
            try:
                await context.close()
            finally:
                await browser.close()
    return result
async def _first_matching_selector(page: Page, candidates: list[str]) -> str:
    """Return the first selector in *candidates* that matches an element, else ""."""
    for sel in candidates:
        if await page.locator(sel).count() > 0:
            return sel
    return ""


async def _try_login(
    page: Page, username: str, password: str,
    user_sel: str, pass_sel: str, submit_sel: str,
) -> bool:
    """Attempt to fill and submit the login form on the current page.

    Selectors passed as empty strings are auto-detected from a list of common
    candidates. Without a submit button the form is submitted by pressing
    Enter in the password field.

    Args:
        page: Page that currently shows the login form.
        username: Value for the username / e-mail input.
        password: Value for the password input.
        user_sel: Selector of the username input, or "" to auto-detect.
        pass_sel: Selector of the password input, or "" to auto-detect.
        submit_sel: Selector of the submit control, or "" to auto-detect.

    Returns:
        True when no password input is present five seconds after submitting;
        False when the form could not be located, a password input is still
        present, or any step raised.
    """
    try:
        # Auto-detect selectors if not provided
        if not user_sel:
            user_sel = await _first_matching_selector(page, [
                'input[type="email"]', 'input[name="email"]', 'input[name="username"]',
                'input[name="login"]', 'input[id="email"]', 'input[id="username"]',
            ])
        if not pass_sel:
            pass_sel = await _first_matching_selector(page, [
                'input[type="password"]', 'input[name="password"]', 'input[id="password"]',
            ])
        if not submit_sel:
            submit_sel = await _first_matching_selector(page, [
                'button[type="submit"]', 'input[type="submit"]',
                'button:has-text("Anmelden")', 'button:has-text("Login")',
                'button:has-text("Sign in")', 'button:has-text("Einloggen")',
            ])
        if not user_sel or not pass_sel:
            return False

        await page.fill(user_sel, username)
        await page.fill(pass_sel, password)
        if submit_sel:
            await page.click(submit_sel)
        else:
            await page.press(pass_sel, "Enter")
        await page.wait_for_timeout(5000)

        # Check if login succeeded (URL changed or login form disappeared)
        still_on_login = await page.locator('input[type="password"]').count() > 0
        return not still_on_login
    except Exception as e:
        # The exception text comes from the browser automation layer and may
        # echo arguments of the failed call (e.g. the value given to fill()).
        # Scrub the credentials before logging so they never reach the logs.
        detail = str(e)
        for secret in (password, username):
            if secret:
                detail = detail.replace(secret, "***")
        logger.warning("Login attempt failed: %s", detail)
        return False
async def _check_patterns(page: Page, patterns: list[str], check_name: str) -> CheckResult:
    """Search the current page for any of *patterns*.

    Three strategies are tried in order, and the first hit is returned:

    1. text anywhere on the page,
    2. links / buttons containing the pattern, or an ``href`` containing the
       pattern with spaces replaced by hyphens,
    3. the text of common navigation containers.

    Lookups are best-effort: a failing lookup is logged at debug level and
    the search continues with the next candidate.

    Args:
        page: Page to inspect (expected to be the post-login page).
        patterns: Search terms for this check.
        check_name: Short name of the check, used in log messages.

    Returns:
        A ``CheckResult`` with ``found=True`` and details of the first hit,
        or a default (not found) result.
    """
    result = CheckResult()

    # Check current page text
    for pattern in patterns:
        try:
            locator = page.get_by_text(pattern, exact=False)
            if await locator.count() > 0:
                text = await locator.first.text_content()
                result.found = True
                result.text = (text or "").strip()[:100]
                return result
        except Exception as e:
            logger.debug("%s check: text lookup for %r failed: %s", check_name, pattern, e)

    # Check links/buttons
    for pattern in patterns:
        selectors = [
            f'a:has-text("{pattern}")',
            f'button:has-text("{pattern}")',
            f'[href*="{pattern.replace(" ", "-")}"]',
        ]
        for sel in selectors:
            # One try per selector so a failing selector does not skip the
            # remaining selectors of the same pattern.
            try:
                if await page.locator(sel).count() > 0:
                    result.found = True
                    result.selector = sel
                    result.text = pattern
                    return result
            except Exception as e:
                logger.debug("%s check: selector %r failed: %s", check_name, sel, e)

    # Check navigation menus (common locations for account management)
    for nav_sel in ['nav', '[role="navigation"]', '.sidebar', '.account-menu', '#account']:
        try:
            nav = page.locator(nav_sel)
            if await nav.count() > 0:
                nav_text = (await nav.first.text_content() or "").lower()
                for pattern in patterns:
                    if pattern.lower() in nav_text:
                        result.found = True
                        result.text = f"In Navigation: {pattern}"
                        return result
        except Exception as e:
            logger.debug("%s check: navigation lookup %r failed: %s", check_name, nav_sel, e)

    return result