From 4bf92f42b80341ac0c6b09b605170988fe01d473 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Wed, 29 Apr 2026 16:08:41 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=209=20=E2=80=94=20Authenticated?= =?UTF-8?q?=20Testing=20+=20Legal=20Basis=20Validator=20(lit.=20mapping)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 9: Playwright login + 5 post-login checks: - §312k BGB: Kündigungsbutton (2 Klicks) - Art. 17 DSGVO: Konto löschen - Art. 20 DSGVO: Daten exportieren - Art. 7(3): Einwilligungen widerrufen - Art. 15: Profildaten einsehen Auto-detects login form selectors. Credentials destroyed after test. Legal Basis Validator: Checks 7 common lit-mapping mistakes: - Cookie tracking on lit. f instead of lit. a (Planet49) - Analytics on lit. b (contract overextension) - Klarna without Art. 22 reference - Session recording without consent Integrated into website scan pipeline. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../compliance/api/agent_scan_routes.py | 17 ++ .../services/legal_basis_validator.py | 155 ++++++++++++ consent-tester/main.py | 88 +++++++ .../services/authenticated_scanner.py | 230 ++++++++++++++++++ 4 files changed, 490 insertions(+) create mode 100644 backend-compliance/compliance/services/legal_basis_validator.py create mode 100644 consent-tester/services/authenticated_scanner.py diff --git a/backend-compliance/compliance/api/agent_scan_routes.py b/backend-compliance/compliance/api/agent_scan_routes.py index 6554828..fa671dd 100644 --- a/backend-compliance/compliance/api/agent_scan_routes.py +++ b/backend-compliance/compliance/api/agent_scan_routes.py @@ -21,6 +21,7 @@ from compliance.services.dse_matcher import build_text_references, TextReference from compliance.services.mandatory_content_checker import ( check_mandatory_documents, check_dse_mandatory_content, MandatoryFinding, ) +from compliance.services.legal_basis_validator import validate_legal_bases logger = 
logging.getLogger(__name__) @@ -132,6 +133,22 @@ async def scan_website_endpoint(req: ScanRequest): text=f"{mf.text}" + (f" — {mf.suggestion}" if mf.suggestion else ""), )) + # Step 8b: Validate legal bases (lit. a-f) in DSE + if dse_text: + lit_findings = validate_legal_bases(dse_text) + for lf in lit_findings: + findings.append(ScanFinding( + code=f"LIT-{lf.purpose.upper()}", + severity=lf.severity, + text=lf.text, + text_reference=TextReferenceModel( + found=True, source_url=req.url, + original_text=lf.original_text, + issue="incorrect", correction_type="replace", + correction_text=f"Korrekte Rechtsgrundlage: {lf.correct_basis} ({lf.legal_ref})", + ) if lf.original_text else None, + )) + # Step 9: Generate corrections for pre-launch mode if not is_live and findings: await _add_corrections(findings, dse_text) diff --git a/backend-compliance/compliance/services/legal_basis_validator.py b/backend-compliance/compliance/services/legal_basis_validator.py new file mode 100644 index 0000000..58df91c --- /dev/null +++ b/backend-compliance/compliance/services/legal_basis_validator.py @@ -0,0 +1,155 @@ +""" +Legal Basis Validator — checks if the correct DSGVO legal basis (lit. a-f) +is used for each processing purpose in the privacy policy. + +Common mistakes: +- Cookie tracking on lit. f (legitimate interest) instead of lit. a (consent) +- Marketing emails on lit. f instead of lit. a +- Analytics on lit. b (contract) — incorrect overextension +- Klarna credit check without Art. 22 reference +""" + +import logging +import re +from dataclasses import dataclass + +logger = logging.getLogger(__name__) + + +@dataclass +class LitFinding: + purpose: str + stated_basis: str + correct_basis: str + severity: str + text: str + legal_ref: str + original_text: str = "" + + +# Purpose → correct legal basis mapping +# Based on: DSK Kurzpapiere, Planet49 (EuGH C-673/17), BGH Cookie-Urteil +CORRECT_BASIS: dict[str, dict] = { + "cookie_tracking": { + "correct": "lit. 
a (Einwilligung)", + "wrong_patterns": ["berechtigtes interesse", "lit. f", "lit.f", "legitimate interest"], + "detect_patterns": ["cookie", "tracking", "pixel", "analytics.*cookie"], + "ref": "EuGH C-673/17 (Planet49), §25 TDDDG", + }, + "web_analytics": { + "correct": "lit. a (Einwilligung)", + "wrong_patterns": ["berechtigtes interesse", "lit. f", "lit.f", "vertragserfuellung", "lit. b", "lit.b"], + "detect_patterns": ["google analytics", "webanalyse", "web analytics", "reichweitenmessung", + "nutzungsanalyse", "hotjar", "matomo"], + "ref": "DSK Orientierungshilfe Telemedien, §25 TDDDG", + }, + "marketing_email": { + "correct": "lit. a (Einwilligung)", + "wrong_patterns": ["berechtigtes interesse", "lit. f", "lit.f"], + "detect_patterns": ["newsletter", "marketing.*mail", "werbe.*mail", "werbe.*email", + "marketing.*email", "werbliche.*kommunikation"], + "ref": "Art. 7 DSGVO, §7 UWG (Double Opt-In)", + }, + "remarketing": { + "correct": "lit. a (Einwilligung)", + "wrong_patterns": ["berechtigtes interesse", "lit. f", "lit.f"], + "detect_patterns": ["remarketing", "retargeting", "personalisierte werbung", + "personalized advertising", "custom audience"], + "ref": "§25 TDDDG, EuGH C-673/17", + }, + "credit_check": { + "correct": "lit. b/f + Art. 22 DSGVO Hinweis", + "wrong_patterns": [], # Not about wrong basis, but missing Art. 22 + "detect_patterns": ["bonitaet", "bonität", "kreditprüfung", "kreditpruefung", + "schufa", "auskunftei", "klarna.*rechnung", "ratenzahlung"], + "ref": "Art. 22 DSGVO (automatisierte Einzelentscheidung)", + "must_contain": ["art. 22", "art.22", "automatisierte entscheidung", + "automated decision", "einzelentscheidung"], + }, + "social_media_embed": { + "correct": "lit. a (Einwilligung)", + "wrong_patterns": ["berechtigtes interesse", "lit. 
f", "lit.f"], + "detect_patterns": ["facebook.*plugin", "social.*plugin", "like.*button", + "share.*button", "instagram.*embed", "twitter.*embed"], + "ref": "EuGH C-40/17 (Fashion ID), 2-Klick-Loesung", + }, + "session_recording": { + "correct": "lit. a (Einwilligung)", + "wrong_patterns": ["berechtigtes interesse", "lit. f", "lit.f"], + "detect_patterns": ["session.?recording", "session.?replay", "heatmap", + "mouseflow", "hotjar.*recording", "clarity.*recording", + "fullstory", "lucky orange"], + "ref": "§25 TDDDG, Aufzeichnung von Nutzerverhalten", + }, +} + + +def validate_legal_bases(dse_text: str) -> list[LitFinding]: + """Check if correct legal bases are used in the privacy policy.""" + findings = [] + text_lower = dse_text.lower() + + for purpose_id, rules in CORRECT_BASIS.items(): + # Step 1: Is this purpose mentioned in the DSE? + purpose_found = False + matched_text = "" + for pattern in rules["detect_patterns"]: + match = re.search(pattern, text_lower) + if match: + purpose_found = True + # Extract surrounding context (200 chars) + start = max(0, match.start() - 100) + end = min(len(text_lower), match.end() + 200) + matched_text = dse_text[start:end].strip() + break + + if not purpose_found: + continue + + context_lower = matched_text.lower() + + # Step 2: Check if wrong legal basis is stated + for wrong in rules["wrong_patterns"]: + if wrong in context_lower: + findings.append(LitFinding( + purpose=purpose_id, + stated_basis=wrong, + correct_basis=rules["correct"], + severity="HIGH", + text=f"Falsche Rechtsgrundlage: '{_purpose_label(purpose_id)}' nutzt " + f"'{wrong}' statt '{rules['correct']}'", + legal_ref=rules["ref"], + original_text=matched_text[:300], + )) + break + + # Step 3: Special check — must_contain (e.g., Art. 
22 for credit checks) + if "must_contain" in rules: + has_required = any(req in context_lower for req in rules["must_contain"]) + if not has_required: + findings.append(LitFinding( + purpose=purpose_id, + stated_basis="(fehlt)", + correct_basis=rules["correct"], + severity="HIGH", + text=f"Pflichthinweis fehlt: '{_purpose_label(purpose_id)}' erwaehnt " + f"keine automatisierte Entscheidungsfindung ({rules['ref']})", + legal_ref=rules["ref"], + original_text=matched_text[:300], + )) + + return findings + + +def _purpose_label(purpose_id: str) -> str: + """German label for purpose ID.""" + labels = { + "cookie_tracking": "Cookie-Tracking", + "web_analytics": "Webanalyse", + "marketing_email": "Marketing-Emails/Newsletter", + "remarketing": "Remarketing/Retargeting", + "credit_check": "Bonitaetspruefung", + "social_media_embed": "Social Media Einbindung", + "session_recording": "Session Recording/Heatmaps", + } + return labels.get(purpose_id, purpose_id) diff --git a/consent-tester/main.py b/consent-tester/main.py index 50eae88..01faecd 100644 --- a/consent-tester/main.py +++ b/consent-tester/main.py @@ -13,6 +13,7 @@ from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from services.consent_scanner import run_consent_test, ConsentTestResult +from services.authenticated_scanner import run_authenticated_test, AuthTestResult logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s") logger = logging.getLogger(__name__) @@ -84,3 +85,90 @@ async def scan_consent(req: ScanRequest): }, scanned_at=datetime.now(timezone.utc).isoformat(), ) + + +class AuthScanRequest(BaseModel): + url: str + username: str + password: str + username_selector: str = "" + password_selector: str = "" + submit_selector: str = "" + + +class AuthCheckInfo(BaseModel): + found: bool = False + text: str = "" + legal_ref: str = "" + + +class AuthScanResponse(BaseModel): + url: str + authenticated: bool + login_error: str = "" + checks: dict[str, 
from services.authenticated_scanner import run_authenticated_test, AuthTestResult


class AuthScanRequest(BaseModel):
    """Request body for POST /authenticated-scan."""

    url: str
    username: str
    password: str
    # Optional CSS selectors for the login form; an empty string is passed
    # through unchanged to run_authenticated_test.
    username_selector: str = ""
    password_selector: str = ""
    submit_selector: str = ""


class AuthCheckInfo(BaseModel):
    """Outcome of one post-login check as returned to the client."""

    found: bool = False
    text: str = ""
    legal_ref: str = ""


class AuthScanResponse(BaseModel):
    """Response body of POST /authenticated-scan."""

    url: str
    authenticated: bool
    login_error: str = ""
    checks: dict[str, AuthCheckInfo]
    findings_count: int
    scanned_at: str


# Check name → legal reference. The keys double as the attribute names read
# from the scanner result in authenticated_scan below.
LEGAL_REFS = {
    "cancel_subscription": "§312k BGB (Kuendigungsbutton)",
    "delete_account": "Art. 17 DSGVO (Recht auf Loeschung)",
    "export_data": "Art. 20 DSGVO (Datenportabilitaet)",
    "consent_settings": "Art. 7 Abs. 3 DSGVO (Widerruf der Einwilligung)",
    "profile_visible": "Art. 15 DSGVO (Auskunftsrecht)",
}


@app.post("/authenticated-scan", response_model=AuthScanResponse)
async def authenticated_scan(req: AuthScanRequest):
    """Test post-login functionality. Credentials are destroyed after test.

    Runs the authenticated scanner with the submitted credentials and maps
    each check named in LEGAL_REFS to an AuthCheckInfo.

    ``findings_count`` is the number of checks that were not found. It is 0
    when the login did not succeed: in that case no check ran, so reporting
    every feature as missing would be a false finding. Callers should
    inspect ``authenticated`` / ``login_error`` for that case.
    """
    # Only the URL is logged — never the credentials.
    logger.info("Starting authenticated test for %s", req.url)

    result = await run_authenticated_test(
        url=req.url,
        username=req.username,
        password=req.password,
        username_selector=req.username_selector,
        password_selector=req.password_selector,
        submit_selector=req.submit_selector,
    )

    # One entry per known check; the result object exposes an attribute of
    # the same name carrying .found and .text.
    checks = {}
    for name, legal_ref in LEGAL_REFS.items():
        check = getattr(result, name)
        checks[name] = AuthCheckInfo(
            found=check.found,
            text=check.text,
            legal_ref=legal_ref,
        )

    if result.authenticated:
        missing = sum(1 for c in checks.values() if not c.found)
    else:
        missing = 0

    return AuthScanResponse(
        url=req.url,
        authenticated=result.authenticated,
        login_error=result.login_error,
        checks=checks,
        findings_count=missing,
        scanned_at=datetime.now(timezone.utc).isoformat(),
    )
"""
Authenticated Scanner — tests post-login functionality.

Checks §312k BGB (cancellation), Art. 17 (deletion), Art. 20 (export),
Art. 7(3) (consent withdrawal), Art. 15 (data access).

Credentials are NEVER stored, logged, or transmitted beyond the browser context.
"""

import logging
from dataclasses import dataclass, field

from playwright.async_api import async_playwright, Page

logger = logging.getLogger(__name__)

USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)


@dataclass
class CheckResult:
    """Outcome of one post-login check."""

    found: bool = False
    selector: str = ""       # selector that matched (link/button probe only)
    text: str = ""           # matched text or the pattern that matched
    clicks_needed: int = 0   # not populated by the checks in this module
    screenshot: bytes = b""  # not populated by the checks in this module


@dataclass
class AuthTestResult:
    """Aggregated result of one authenticated test run."""

    authenticated: bool = False
    login_error: str = ""
    cancel_subscription: CheckResult = field(default_factory=CheckResult)
    delete_account: CheckResult = field(default_factory=CheckResult)
    export_data: CheckResult = field(default_factory=CheckResult)
    consent_settings: CheckResult = field(default_factory=CheckResult)
    profile_visible: CheckResult = field(default_factory=CheckResult)


# Search patterns for each check (DE + EN)
CANCEL_PATTERNS = [
    "kündigen", "kuendigen", "vertrag beenden", "abo beenden",
    "mitgliedschaft kündigen", "cancel subscription", "unsubscribe",
    "cancel membership", "vertrag kündigen",
]

DELETE_PATTERNS = [
    "konto löschen", "konto loeschen", "account löschen", "delete account",
    "account deaktivieren", "profil löschen", "remove account",
]

EXPORT_PATTERNS = [
    "daten exportieren", "daten herunterladen", "export data", "download data",
    "meine daten", "datenauskunft", "data download", "daten anfordern",
]

CONSENT_PATTERNS = [
    "einwilligung", "einstellungen", "datenschutz-einstellungen",
    "consent", "privacy settings", "cookie-einstellungen",
    "werbeeinstellungen", "marketing preferences",
]

PROFILE_PATTERNS = [
    "profil", "mein konto", "kontodaten", "persönliche daten",
    "profile", "my account", "account settings", "personal data",
]


def _redact(message: str, *secrets: str) -> str:
    """Return *message* with every non-empty value of *secrets* masked.

    Defensive measure: browser-automation error messages may embed call
    details, and this module promises that credentials are never logged or
    returned to the caller.
    """
    for secret in secrets:
        if secret:
            message = message.replace(secret, "***")
    return message


async def run_authenticated_test(
    url: str,
    username: str,
    password: str,
    username_selector: str = "",
    password_selector: str = "",
    submit_selector: str = "",
) -> AuthTestResult:
    """Run authenticated area test. Credentials are destroyed after test.

    Opens *url* in a fresh headless Chromium context, attempts to log in and,
    if that succeeds, runs the five pattern checks on the page shown after
    login.

    Args:
        url: Page that contains the login form.
        username: Value typed into the username/e-mail field.
        password: Value typed into the password field.
        username_selector: CSS selector of the username field; auto-detected
            when empty.
        password_selector: CSS selector of the password field; auto-detected
            when empty.
        submit_selector: CSS selector of the submit control; auto-detected
            when empty.

    Returns:
        An AuthTestResult. ``authenticated`` is False and ``login_error`` is
        set when the login did not succeed. ``login_error`` is also set (with
        credentials masked) when any step raised.
    """
    result = AuthTestResult()

    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=True,
            args=["--no-sandbox", "--disable-dev-shm-usage"],
        )
        try:
            context = await browser.new_context(user_agent=USER_AGENT)
            try:
                page = await context.new_page()

                # Step 1: Login
                await page.goto(url, wait_until="networkidle", timeout=30000)
                await page.wait_for_timeout(2000)

                login_ok = await _try_login(
                    page, username, password,
                    username_selector, password_selector, submit_selector,
                )

                if not login_ok:
                    result.login_error = "Login fehlgeschlagen — Formular nicht gefunden oder Credentials falsch"
                    # Cleanup happens exactly once in the finally blocks below.
                    return result

                result.authenticated = True
                await page.wait_for_timeout(3000)

                # Step 2: Check cancellation (§312k BGB)
                result.cancel_subscription = await _check_patterns(page, CANCEL_PATTERNS, "cancel")
                logger.info("Cancel check: found=%s", result.cancel_subscription.found)

                # Step 3: Check delete account (Art. 17 DSGVO)
                result.delete_account = await _check_patterns(page, DELETE_PATTERNS, "delete")

                # Step 4: Check data export (Art. 20 DSGVO)
                result.export_data = await _check_patterns(page, EXPORT_PATTERNS, "export")

                # Step 5: Check consent settings (Art. 7(3) DSGVO)
                result.consent_settings = await _check_patterns(page, CONSENT_PATTERNS, "consent")

                # Step 6: Check profile visibility (Art. 15 DSGVO)
                result.profile_visible = await _check_patterns(page, PROFILE_PATTERNS, "profile")

            except Exception as e:
                # Top-level boundary of the scan: report instead of crashing,
                # but never let credentials reach the log or the response.
                message = _redact(str(e), username, password)
                logger.error("Authenticated test failed: %s", message)
                result.login_error = message
            finally:
                # CRITICAL: Destroy context — wipes all credentials, cookies, session
                await context.close()
        finally:
            # Runs even when creating the context failed.
            await browser.close()

    return result


async def _try_login(
    page: Page, username: str, password: str,
    user_sel: str, pass_sel: str, submit_sel: str,
) -> bool:
    """Attempt to fill and submit the login form on *page*.

    Empty selectors are auto-detected from a list of common candidates.
    Returns False when no username or password field is found, when any
    browser call raises, or when a password field is still present after
    submitting (taken as "still on the login form").
    """
    try:
        # Auto-detect selectors if not provided
        if not user_sel:
            for sel in ['input[type="email"]', 'input[name="email"]', 'input[name="username"]',
                        'input[name="login"]', 'input[id="email"]', 'input[id="username"]']:
                if await page.locator(sel).count() > 0:
                    user_sel = sel
                    break
        if not pass_sel:
            for sel in ['input[type="password"]', 'input[name="password"]', 'input[id="password"]']:
                if await page.locator(sel).count() > 0:
                    pass_sel = sel
                    break
        if not submit_sel:
            for sel in ['button[type="submit"]', 'input[type="submit"]',
                        'button:has-text("Anmelden")', 'button:has-text("Login")',
                        'button:has-text("Sign in")', 'button:has-text("Einloggen")']:
                if await page.locator(sel).count() > 0:
                    submit_sel = sel
                    break

        if not user_sel or not pass_sel:
            return False

        await page.fill(user_sel, username)
        await page.fill(pass_sel, password)

        if submit_sel:
            await page.click(submit_sel)
        else:
            # No submit control found: submit the form via the keyboard.
            await page.press(pass_sel, "Enter")

        await page.wait_for_timeout(5000)

        # Check if login succeeded (URL changed or login form disappeared)
        still_on_login = await page.locator('input[type="password"]').count() > 0
        return not still_on_login

    except Exception as e:
        # Mask the credentials before the message reaches the log.
        logger.warning("Login attempt failed: %s", _redact(str(e), username, password))
        return False


async def _check_patterns(page: Page, patterns: list[str], check_name: str) -> CheckResult:
    """Search current page and navigation for patterns.

    Three probes are tried in order and the first hit wins:
    1. any element whose text contains a pattern,
    2. a link or button with that text, or an href containing the pattern
       with spaces replaced by dashes,
    3. the text of common navigation containers.

    Browser errors in a single probe are logged at debug level (tagged with
    *check_name*) and the next probe is tried; the search is best-effort.
    """
    result = CheckResult()

    # Check current page text
    for pattern in patterns:
        try:
            locator = page.get_by_text(pattern, exact=False)
            if await locator.count() > 0:
                text = await locator.first.text_content()
                result.found = True
                result.text = (text or "").strip()[:100]
                return result
        except Exception as exc:
            logger.debug("%s check: text lookup for %r failed: %s", check_name, pattern, exc)

    # Check links/buttons
    for pattern in patterns:
        for sel in [f'a:has-text("{pattern}")', f'button:has-text("{pattern}")',
                    f'[href*="{pattern.replace(" ", "-")}"]']:
            # try per selector so one failing selector does not skip the others
            try:
                if await page.locator(sel).count() > 0:
                    result.found = True
                    result.selector = sel
                    result.text = pattern
                    return result
            except Exception as exc:
                logger.debug("%s check: selector %r failed: %s", check_name, sel, exc)

    # Check navigation menus (common locations for account management)
    for nav_sel in ['nav', '[role="navigation"]', '.sidebar', '.account-menu', '#account']:
        try:
            nav = page.locator(nav_sel)
            if await nav.count() > 0:
                nav_text = (await nav.first.text_content() or "").lower()
                for pattern in patterns:
                    if pattern.lower() in nav_text:
                        result.found = True
                        result.text = f"In Navigation: {pattern}"
                        return result
        except Exception as exc:
            logger.debug("%s check: navigation probe %r failed: %s", check_name, nav_sel, exc)

    return result