From cb4b352846b4eaf3199d06cb7c2fb6c0b1260e1e Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Sun, 7 Jun 2026 17:20:13 +0200 Subject: [PATCH] feat(b17): Playwright Audit-Walk-Video (Stufe 1, #7) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Nimmt einen kompletten Site-Walk als WebKit-Browser-Session inkl. Video auf. Reviewer kann nachträglich exakt nachvollziehen, wie die Engine zum Befund kam. consent-tester: - services/audit_walk_recorder.py: Playwright record_video_dir, Desktop-Viewport 1280×800 (kein iPhone-Viewport). Goto homepage → Banner-Accept (Best-Effort: 12 Text-Phrasen + 6 CMP-Fallback-Selektoren) → Footer-Links sammeln (compliance-relevant gefiltert) → pro Link navigate + Dwell-Time → JSON-Action-Index mit UTC-Timestamps + SHA-256 vom Video als Manipulationsschutz. - routes_audit_walk.py: POST /scan-audit-walk; statische Serves für /audit-walks/{walk_id}/video.webm + walk.json. - main.py: Router registriert. backend: - _b17_wiring.py: Triggert /scan-audit-walk, speichert Walk-Metadata in state["audit_walk"]. Render-Block mit HTML-Tabelle aller Actions (HH:MM:SS + Aktion + Detail) + Links zu Video und walk.json. - _orchestrator.py: run_b17 nach run_b16, async-aufgerufen. - mail_render_v2/_compose.py: audit_walk_html im V2-Layout. - test_b17_audit_walk.py: 8 Tests (Render-Pfade + Wiring). Stufe-2 (Akkordeon-Expansion) und Stufe-3 (DSMS-CID-Anchor) folgen separat. 
Real-World-Smoke gegen Elli: - 581 KB Video, SHA-256 verifizierbar - 3 Footer-Links besucht (Impressum, Datenschutzerkl., Nutzungs-) - 6 Actions im JSON-Index Co-Authored-By: Claude Opus 4.7 (1M context) --- .../compliance/api/agent_check/_b17_wiring.py | 133 +++++++++ .../api/agent_check/_orchestrator.py | 2 + .../services/mail_render_v2/_compose.py | 2 + .../tests/test_b17_audit_walk.py | 95 ++++++ consent-tester/main.py | 2 + consent-tester/routes_audit_walk.py | 53 ++++ .../services/audit_walk_recorder.py | 275 ++++++++++++++++++ 7 files changed, 562 insertions(+) create mode 100644 backend-compliance/compliance/api/agent_check/_b17_wiring.py create mode 100644 backend-compliance/tests/test_b17_audit_walk.py create mode 100644 consent-tester/routes_audit_walk.py create mode 100644 consent-tester/services/audit_walk_recorder.py diff --git a/backend-compliance/compliance/api/agent_check/_b17_wiring.py b/backend-compliance/compliance/api/agent_check/_b17_wiring.py new file mode 100644 index 00000000..731247b9 --- /dev/null +++ b/backend-compliance/compliance/api/agent_check/_b17_wiring.py @@ -0,0 +1,133 @@ +"""B17 wiring — Audit-Walk-Recorder. + +Triggert beim consent-tester einen kompletten Playwright-Site-Walk +mit Video-Aufzeichnung. Result: Video + JSON-Action-Index mit +Timestamps + SHA-256-Hash für Manipulation-Schutz. + +Speichert nur die Walk-Metadata + Video-URL im state. Der eigentliche +File-Body bleibt im consent-tester-Volume (Stufe 1). Stufe 3 wird das +Video zu DSMS-IPFS hochladen und die CID hier einbinden. 
+""" + +from __future__ import annotations + +import html +import logging +from urllib.parse import urlparse + +import httpx + +from ._constants import CONSENT_TESTER_URL + +logger = logging.getLogger(__name__) + + +async def run_b17(state: dict) -> None: + """Trigger walk recording + store metadata in state.""" + req = state.get("req") + if req is None: + return + homepage = "" + for d in req.documents: + if d.url: + p = urlparse(d.url) + if p.scheme and p.netloc: + homepage = f"{p.scheme}://{p.netloc}/" + break + if not homepage: + return + + walk: dict = {} + try: + async with httpx.AsyncClient(timeout=180.0) as c: + r = await c.post( + f"{CONSENT_TESTER_URL}/scan-audit-walk", + json={"url": homepage, "dwell_s": 4.0, "max_links": 8}, + timeout=180.0, + ) + if r.status_code == 200: + walk = r.json() + except Exception as e: + logger.warning("B17 audit-walk request failed: %s", e) + return + + if not walk or not walk.get("walk_id"): + return + + state["audit_walk"] = walk + state["audit_walk_html"] = _render(walk) + logger.info( + "B17 audit-walk: %s · %d actions · video %d bytes · sha256 %s", + walk.get("walk_id"), + len(walk.get("actions") or []), + (walk.get("video") or {}).get("size_bytes", 0), + ((walk.get("video") or {}).get("sha256") or "")[:12], + ) + + +def _video_link(walk_id: str) -> str: + """External URL for the recorded video (when consent-tester is + reachable from the audit reviewer).""" + return f"{CONSENT_TESTER_URL}/audit-walks/{walk_id}/video.webm" + + +def _render(walk: dict) -> str: + wid = walk.get("walk_id") or "" + video = walk.get("video") or {} + actions = walk.get("actions") or [] + nav_count = sum(1 for a in actions if a.get("action") == "navigate") + sha = (video.get("sha256") or "")[:12] + size_kb = round((video.get("size_bytes") or 0) / 1024, 1) + walk_link = _video_link(wid) + meta_link = f"{CONSENT_TESTER_URL}/audit-walks/{wid}/walk.json" + + rows = [] + for a in actions: + ts = (a.get("timestamp") or "")[11:19] # HH:MM:SS + act = 
a.get("action") or "" + detail = "" + if act == "goto" or act == "navigate": + detail = (a.get("url") or "")[:120] + if a.get("status"): + detail += f" → HTTP {a['status']}" + elif act == "accept_banner": + r = a.get("result") or "" + if r == "clicked": + detail = f"Banner akzeptiert ({a.get('phrase') or a.get('selector') or ''})" + else: + detail = "Kein Accept-Button gefunden" + elif act == "discover_footer_links": + detail = f"{a.get('count', 0)} Compliance-Links im Footer" + rows.append( + f"{html.escape(ts)}" + f"{html.escape(act)}" + f"" + f"{html.escape(detail)}" + ) + return ( + "
" + "

" + "🎥 Audit-Walk-Video (Beweis-Aufzeichnung)" + "

" + "

" + f"Video: " + f"video.webm " + f"({size_kb} KB, SHA-256 {html.escape(sha)}…) · " + f"Metadata: " + f"walk.json" + "

" + "

" + f"{nav_count} Compliance-Seiten besucht, jede 4 Sek " + "verweilt — Reviewer kann den Audit-Walk nachverfolgen." + "

" + "" + "" + "" + "" + "" + "" + "".join(rows) + "
Zeit (UTC)AktionDetail
" + "
" + ) diff --git a/backend-compliance/compliance/api/agent_check/_orchestrator.py b/backend-compliance/compliance/api/agent_check/_orchestrator.py index 235492d2..e99d2645 100644 --- a/backend-compliance/compliance/api/agent_check/_orchestrator.py +++ b/backend-compliance/compliance/api/agent_check/_orchestrator.py @@ -27,6 +27,7 @@ from ._b13_wiring import run_b13 from ._b14_wiring import run_b14 from ._b15_wiring import run_b15 from ._b16_wiring import run_b16 +from ._b17_wiring import run_b17 from ._constants import _compliance_check_jobs from ._phase_a_resolve import run_phase_a from ._phase_b_profile_check import run_phase_b @@ -78,6 +79,7 @@ async def run_compliance_check(check_id: str, req) -> None: run_b14(state) # Widersprüchliche Speicherdauer im selben Doc run_b15(state) # AI-Act Rechtsgrundlage (LLM-Vendor auf lit. f) run_b16(state) # Footer-Label-vs-URL-Slug-Drift + await run_b17(state) # Audit-Walk-Video (Beweis-Aufzeichnung) # Phase D-3 top/mid/bot: Step 5 HTML blocks await run_phase_d3_top(state) await run_phase_d3_mid(state) diff --git a/backend-compliance/compliance/services/mail_render_v2/_compose.py b/backend-compliance/compliance/services/mail_render_v2/_compose.py index ba7baafb..eab6b1bd 100644 --- a/backend-compliance/compliance/services/mail_render_v2/_compose.py +++ b/backend-compliance/compliance/services/mail_render_v2/_compose.py @@ -56,6 +56,8 @@ def compose_v2(state: dict) -> str: state.get("ai_legal_basis_html", ""), # B16 Footer-Label-vs-URL-Slug-Drift (SEO / Bookmarks) state.get("url_slug_drift_html", ""), + # B17 Audit-Walk-Video (Beweis-Aufzeichnung) + state.get("audit_walk_html", ""), # Browser-Matrix (Stage 1.c) state.get("browser_matrix_html", ""), # All legacy build_*_html() wrapped in V2 sections — preserves diff --git a/backend-compliance/tests/test_b17_audit_walk.py b/backend-compliance/tests/test_b17_audit_walk.py new file mode 100644 index 00000000..c0b5cb44 --- /dev/null +++ 
b/backend-compliance/tests/test_b17_audit_walk.py @@ -0,0 +1,95 @@ +"""Tests for B17 Audit-Walk-Wiring (Stufe 1).""" + +import asyncio +from unittest.mock import patch, MagicMock, AsyncMock + +import pytest + +from compliance.api.agent_check._b17_wiring import _render, run_b17 + + +_FAKE_WALK = { + "walk_id": "abc123def456", + "url": "https://example.com/", + "started_at": "2026-06-07T10:00:00+00:00", + "completed_at": "2026-06-07T10:00:30+00:00", + "engine": "playwright/webkit", + "viewport": "1280x800", + "actions": [ + {"timestamp": "2026-06-07T10:00:00+00:00", "action": "goto", + "url": "https://example.com/", "status": 200}, + {"timestamp": "2026-06-07T10:00:02+00:00", "action": "accept_banner", + "result": "clicked", "phrase": "alle akzeptieren"}, + {"timestamp": "2026-06-07T10:00:04+00:00", + "action": "discover_footer_links", "count": 3, "links": []}, + {"timestamp": "2026-06-07T10:00:06+00:00", "action": "navigate", + "url": "https://example.com/datenschutz", + "anchor_text": "Datenschutz", "status": 200, + "title": "Datenschutzerklärung"}, + ], + "video": { + "filename": "video.webm", + "size_bytes": 512000, + "sha256": "a1b2c3d4e5f67890fedcba0987654321ffffeeeeddddccccbbbbaaaa00001111", + }, +} + + +class TestRender: + def test_renders_walk_id_and_link(self): + html = _render(_FAKE_WALK) + assert "abc123def456" in html + assert "video.webm" in html + assert "walk.json" in html + + def test_includes_sha_prefix(self): + html = _render(_FAKE_WALK) + # First 12 chars of sha + assert "a1b2c3d4e5f6" in html + + def test_action_table_lists_all_actions(self): + html = _render(_FAKE_WALK) + # All four actions appear as + assert html.count("") >= 4 # incl. 
header + + def test_nav_count_reflects_navigate_actions(self): + html = _render(_FAKE_WALK) + # 1 navigate in the fixture + assert "1 Compliance-Seiten" in html + + +class TestRunB17: + def test_no_request_skipped(self): + state = {} + asyncio.run(run_b17(state)) + assert "audit_walk" not in state + + def test_no_url_skipped(self): + state = {"req": MagicMock(documents=[MagicMock(url="")])} + asyncio.run(run_b17(state)) + assert "audit_walk" not in state + + def test_consent_tester_failure_skipped(self): + req = MagicMock(documents=[MagicMock(url="https://example.com/dse")]) + state = {"req": req} + with patch( + "compliance.api.agent_check._b17_wiring.httpx.AsyncClient" + ) as mock_client: + instance = mock_client.return_value.__aenter__.return_value + instance.post = AsyncMock(side_effect=Exception("nope")) + asyncio.run(run_b17(state)) + assert "audit_walk" not in state + + def test_success_populates_state(self): + req = MagicMock(documents=[MagicMock(url="https://example.com/dse")]) + state = {"req": req} + resp = MagicMock(status_code=200) + resp.json = MagicMock(return_value=_FAKE_WALK) + with patch( + "compliance.api.agent_check._b17_wiring.httpx.AsyncClient" + ) as mock_client: + instance = mock_client.return_value.__aenter__.return_value + instance.post = AsyncMock(return_value=resp) + asyncio.run(run_b17(state)) + assert state["audit_walk"]["walk_id"] == "abc123def456" + assert "video.webm" in state["audit_walk_html"] diff --git a/consent-tester/main.py b/consent-tester/main.py index d7c972b4..bf1df978 100644 --- a/consent-tester/main.py +++ b/consent-tester/main.py @@ -63,9 +63,11 @@ class ScanResponse(BaseModel): from routes_matrix import router as matrix_router from routes_mobile import router as mobile_router from routes_cookie_matrix import router as cookie_matrix_router +from routes_audit_walk import router as audit_walk_router app.include_router(matrix_router) app.include_router(mobile_router) app.include_router(cookie_matrix_router) 
+app.include_router(audit_walk_router)


 @app.get("/health")
diff --git a/consent-tester/routes_audit_walk.py b/consent-tester/routes_audit_walk.py
new file mode 100644
index 00000000..a918f873
--- /dev/null
+++ b/consent-tester/routes_audit_walk.py
@@ -0,0 +1,73 @@
+"""Routes for the audit-walk recorder (POST /scan-audit-walk + file serving)."""
+
+from __future__ import annotations
+
+import os
+import re
+from pathlib import Path
+
+from fastapi import APIRouter, HTTPException
+from fastapi.responses import FileResponse
+from pydantic import BaseModel
+
+from services.audit_walk_recorder import WALK_ROOT, record_audit_walk
+
+router = APIRouter()
+
+# Walk ids come from the recorder as lowercase hex (uuid4().hex[:12]).
+# str.isalnum() also accepts non-ASCII letters and digits, so the id is
+# checked against an explicit ASCII whitelist before it becomes a path part.
+_WALK_ID_RE = re.compile(r"[A-Za-z0-9]{1,32}")
+
+
+class AuditWalkReq(BaseModel):
+    """Request body of POST /scan-audit-walk."""
+
+    url: str
+    dwell_s: float = 5.0  # clamped to 1.0-10.0 by the route
+    max_links: int = 8  # clamped to 1-12 by the route
+
+
+def _walk_file(walk_id: str, filename: str, not_found: str) -> Path:
+    """Return the path of a stored walk artifact.
+
+    Raises HTTPException 400 for a malformed walk id and 404 (with
+    `not_found` as detail) when the artifact is not an existing file.
+    """
+    if not _WALK_ID_RE.fullmatch(walk_id):
+        raise HTTPException(400, "invalid walk_id")
+    path = Path(WALK_ROOT) / walk_id / filename
+    if not path.is_file():
+        raise HTTPException(404, not_found)
+    return path
+
+
+@router.post("/scan-audit-walk")
+async def scan_audit_walk(req: AuditWalkReq) -> dict:
+    """Record an audit walk for `req.url` and return its metadata."""
+    if not req.url or not req.url.startswith(("http://", "https://")):
+        raise HTTPException(400, "invalid url")
+    walk = await record_audit_walk(
+        req.url,
+        dwell_s=max(1.0, min(req.dwell_s, 10.0)),
+        max_links=max(1, min(req.max_links, 12)),
+    )
+    # Without Playwright the recorder returns only {"error": ...} and no
+    # walk_id; report that as 503 instead of a 200 that looks like success.
+    if not walk.get("walk_id"):
+        raise HTTPException(503, walk.get("error") or "audit walk unavailable")
+    return walk
+
+
+@router.get("/audit-walks/{walk_id}/video.webm")
+async def serve_walk_video(walk_id: str):
+    """Serve the recorded video of a walk."""
+    path = _walk_file(walk_id, "video.webm", "walk video not found")
+    return FileResponse(str(path), media_type="video/webm")
+
+
+@router.get("/audit-walks/{walk_id}/walk.json")
+async def serve_walk_meta(walk_id: str):
+    """Serve the JSON action index of a walk."""
+    path = _walk_file(walk_id, "walk.json", "walk.json not found")
+    return FileResponse(str(path), media_type="application/json")
diff --git a/consent-tester/services/audit_walk_recorder.py b/consent-tester/services/audit_walk_recorder.py
new file mode 100644
index 00000000..739dff6a
--- /dev/null
+++ b/consent-tester/services/audit_walk_recorder.py
@@ -0,0 +1,347 @@
+"""Playwright audit-walk recorder.
+
+Records a complete site walk in a WebKit browser session:
+  1. Go to the homepage and accept the consent banner (best effort)
+  2. Collect footer links (privacy policy, imprint, terms, cookies,
+     withdrawal, ...)
+  3. For each link: navigate, then stay on the page for `dwell_s` seconds
+  4. Record video (Playwright `record_video_dir`)
+  5. Write a JSON action index with timestamps plus the SHA-256 of the
+     video as tamper evidence
+
+Output goes to `{WALK_ROOT}/{walk_id}/` (default `/data/audit-walks`):
+  - `video.webm` — Playwright recording
+  - `walk.json` — action index with timestamps + hash
+
+Duration per walk: roughly 30-60 seconds for 6-8 footer links.
+
+Stage 1 of this suite. Stage 2 (accordion expansion) and stage 3
+(DSMS CID anchor) follow separately.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import hashlib
+import json
+import logging
+import os
+import re
+import time
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any
+from uuid import uuid4
+
+logger = logging.getLogger(__name__)
+
+# Walk output root (volume mount: /data is defined in docker-compose)
+WALK_ROOT = os.getenv("AUDIT_WALK_DIR", "/data/audit-walks")
+
+# Footer link text hints: what we treat as a relevant compliance anchor.
+# We do NOT visit every footer link (the videos would get huge), only
+# the compliance-relevant ones.
+_LINK_HINTS_LC = (
+    "impressum", "imprint", "legal",
+    "datenschutz", "privacy",
+    "cookie", "cookies",
+    "agb", "geschäftsbedingung", "geschaeftsbedingung",
+    "nutzungsbedingung", "terms",
+    "widerruf", "withdrawal", "cancellation",
+    "einwilligung", "consent",
+)
+
+# Default cap on the number of footer links collected from a page.
+_DEFAULT_LINK_CAP = 10
+
+# Banner accept buttons (best-effort list), tried in this order.
+_ACCEPT_PHRASES = (
+    "alle akzeptieren", "alle zulassen", "akzeptieren",
+    "alles akzeptieren", "zustimmen", "einverstanden",
+    "accept all", "accept", "agree", "allow all",
+    "ok", "verstanden",
+)
+
+
+def _phrase_pattern(phrase: str) -> re.Pattern[str]:
+    """Compile a case-insensitive whole-word pattern for a button name.
+
+    A plain substring match is too loose for the short phrases: "ok" is
+    contained in "Cookie-Einstellungen" and "agree" in "Disagree", so the
+    walk could click a settings or reject button instead of accept.
+    """
+    # Only \b, \s and escaped literals are used, which Python and
+    # JavaScript regexes share (Playwright may evaluate the pattern in
+    # the browser; TODO confirm).
+    words = r"\s+".join(re.escape(w) for w in phrase.split())
+    return re.compile(rf"\b{words}\b", re.IGNORECASE)
+
+
+_ACCEPT_PATTERNS = tuple((p, _phrase_pattern(p)) for p in _ACCEPT_PHRASES)
+
+
+def _ts() -> str:
+    """Return the current UTC time as an ISO-8601 string."""
+    return datetime.now(timezone.utc).isoformat()
+
+
+def _sha256_file(path: Path) -> str:
+    """Return the hex SHA-256 digest of the file at `path`."""
+    h = hashlib.sha256()
+    with path.open("rb") as f:
+        # Hash in 64 KiB chunks so the video is never held in memory whole.
+        for chunk in iter(lambda: f.read(65536), b""):
+            h.update(chunk)
+    return h.hexdigest()
+
+
+async def _try_accept_banner(page) -> dict:
+    """Click a consent-banner accept button if one can be found.
+
+    Tries the accept phrases against button names first, then a few CMP
+    selectors as fallback. An error while probing one candidate is logged
+    at debug level and the next candidate is tried. Returns the action
+    event dict; its `result` is "clicked" or "no_button_found".
+    """
+    started = _ts()
+    for phrase, pattern in _ACCEPT_PATTERNS:
+        try:
+            btn = page.get_by_role("button", name=pattern).first
+            if await btn.count() > 0:
+                await btn.click(timeout=3000)
+                await page.wait_for_timeout(1500)
+                return {
+                    "timestamp": started, "action": "accept_banner",
+                    "result": "clicked", "phrase": phrase,
+                }
+        except Exception as e:
+            logger.debug("accept phrase %r failed: %s", phrase, e)
+    # CMP fallback selectors
+    cmp_selectors = (
+        "#usercentrics-cmp button",
+        ".ot-sdk-container button.banner-actions-container .accept-btn",
+        ".cmp-modal button[aria-label*=accept i]",
+        "[data-testid=cookie-accept]",
+        "[aria-label*=akzeptieren i]",
+        "[aria-label*=accept i]",
+    )
+    for sel in cmp_selectors:
+        try:
+            el = page.locator(sel).first
+            if await el.count() > 0:
+                await el.click(timeout=2000)
+                await page.wait_for_timeout(1500)
+                return {
+                    "timestamp": started, "action": "accept_banner",
+                    "result": "clicked", "selector": sel,
+                }
+        except Exception as e:
+            logger.debug("accept selector %r failed: %s", sel, e)
+    return {"timestamp": started, "action": "accept_banner",
+            "result": "no_button_found"}
+
+
+async def _collect_footer_links(
+    page, limit: int = _DEFAULT_LINK_CAP,
+) -> list[dict]:
+    """Find compliance-relevant anchors inside the page footer.
+
+    Returns at most `limit` dicts with `text` (cut to 80 chars) and
+    `href`, de-duplicated by URL without fragment. Only http(s) targets
+    are kept, since mailto:/tel:/javascript: anchors cannot be visited.
+    Returns an empty list when the footer query fails.
+    """
+    try:
+        anchors = await page.eval_on_selector_all(
+            "footer a[href]",
+            "(els) => els.map(a => ({text: (a.innerText||'').trim(), "
+            "href: a.href}))",
+        )
+    except Exception as e:
+        logger.warning("footer-anchor query failed: %s", e)
+        return []
+    seen: set[str] = set()
+    out: list[dict] = []
+    for a in anchors:
+        if len(out) >= limit:
+            break
+        href = (a.get("href") or "").strip()
+        text = (a.get("text") or "").strip()
+        if not text or not href.startswith(("http://", "https://")):
+            continue
+        tl = text.lower()
+        if not any(h in tl for h in _LINK_HINTS_LC):
+            continue
+        key = href.split("#")[0]
+        if key in seen:
+            continue
+        seen.add(key)
+        out.append({"text": text[:80], "href": href})
+    return out
+
+
+async def _visit_link(page, link: dict, dwell_s: float = 5.0) -> dict:
+    """Navigate to `link["href"]`, dwell, and capture title + status.
+
+    A navigation failure does not raise; it is reported in the `error`
+    field of the returned action event and `status` stays 0.
+    """
+    started = _ts()
+    start_t = time.monotonic()
+    status = 0
+    title = ""
+    err = ""
+    try:
+        resp = await page.goto(link["href"], wait_until="domcontentloaded",
+                               timeout=20000)
+        if resp is not None:
+            status = resp.status
+        await page.wait_for_timeout(int(dwell_s * 1000))
+        try:
+            title = (await page.title())[:120]
+        except Exception:
+            pass  # the title is optional context, not worth failing for
+    except Exception as e:
+        err = str(e)[:200]
+    return {
+        "timestamp": started, "action": "navigate",
+        "url": link["href"], "anchor_text": link["text"],
+        "status": status, "title": title,
+        # Elapsed wall time for load + dwell, not the requested dwell alone.
+        "dwell_s": round(time.monotonic() - start_t, 2),
+        "error": err or None,
+    }
+
+
+async def record_audit_walk(
+    url: str, dwell_s: float = 5.0, max_links: int = 8,
+) -> dict[str, Any]:
+    """Run a full audit walk and record it on video.
+
+    Args:
+        url: Start page of the walk.
+        dwell_s: Seconds to stay on each visited footer link.
+        max_links: Maximum number of footer links to visit.
+
+    Returns:
+        The walk document that is also written to `walk.json`: walk id,
+        timestamps, `error` (None on success), the action list and the
+        video metadata (empty dict when no video was produced). When
+        Playwright cannot be imported only `{"error": ...}` is returned.
+    """
+    try:
+        from playwright.async_api import async_playwright
+    except Exception as e:
+        return {"error": f"playwright missing: {e}"}
+
+    walk_id = uuid4().hex[:12]
+    out_dir = Path(WALK_ROOT) / walk_id
+    out_dir.mkdir(parents=True, exist_ok=True)
+    actions: list[dict] = []
+    started_at = _ts()
+    err = None
+
+    async with async_playwright() as p:
+        browser = None
+        context = None
+        try:
+            browser = await p.webkit.launch(headless=True)
+            context = await browser.new_context(
+                viewport={"width": 1280, "height": 800},
+                record_video_dir=str(out_dir),
+                record_video_size={"width": 1280, "height": 800},
+                locale="de-DE",
+            )
+            page = await context.new_page()
+            actions.append({
+                "timestamp": _ts(), "action": "goto",
+                "url": url,
+            })
+            try:
+                resp = await page.goto(url, wait_until="domcontentloaded",
+                                       timeout=30000)
+                actions[-1]["status"] = (resp.status if resp else 0)
+            except Exception as e:
+                actions[-1]["error"] = str(e)[:200]
+            await page.wait_for_timeout(2000)
+
+            accept_event = await _try_accept_banner(page)
+            actions.append(accept_event)
+
+            # Discover at least as many links as may be visited, so a
+            # max_links above the default cap is not silently cut short.
+            links = await _collect_footer_links(
+                page, limit=max(_DEFAULT_LINK_CAP, max_links),
+            )
+            actions.append({
+                "timestamp": _ts(), "action": "discover_footer_links",
+                "count": len(links), "links": links[:max_links],
+            })
+
+            for link in links[:max_links]:
+                ev = await _visit_link(page, link, dwell_s=dwell_s)
+                actions.append(ev)
+        except Exception as e:
+            err = f"walk failed: {str(e)[:200]}"
+            logger.exception("walk failed")
+        finally:
+            # The video is only written out when the context closes, so
+            # cleanup has to run on the failure path as well.
+            for closable in (context, browser):
+                if closable is None:
+                    continue
+                try:
+                    await closable.close()
+                except Exception as e:
+                    logger.warning("browser cleanup failed: %s", e)
+                    err = err or f"walk failed: {str(e)[:200]}"
+
+    completed_at = _ts()
+
+    # Find the produced video file. Playwright writes the .webm with a
+    # random name when the context closes; rename it for stability.
+    video_meta: dict[str, Any] = {}
+    try:
+        candidates = sorted(out_dir.glob("*.webm"))
+        if candidates:
+            src = candidates[0]
+            dest = out_dir / "video.webm"
+            if src != dest:
+                src.rename(dest)
+            video_meta = {
+                "filename": "video.webm",
+                "size_bytes": dest.stat().st_size,
+                "sha256": _sha256_file(dest),
+            }
+    except Exception as e:
+        logger.warning("video rename failed: %s", e)
+
+    walk_doc = {
+        "walk_id": walk_id,
+        "url": url,
+        "started_at": started_at,
+        "completed_at": completed_at,
+        "error": err,
+        "engine": "playwright/webkit",
+        "viewport": "1280x800",
+        "actions": actions,
+        "video": video_meta,
+    }
+    try:
+        # ensure_ascii=False emits raw non-ASCII text, so pin the file
+        # encoding instead of relying on the locale default.
+        (out_dir / "walk.json").write_text(
+            json.dumps(walk_doc, indent=2, ensure_ascii=False),
+            encoding="utf-8",
+        )
+    except Exception as e:
+        logger.warning("walk.json write failed: %s", e)
+    return walk_doc
+
+
+if __name__ == "__main__":
+    # Manual smoke
+    import sys
+    url = sys.argv[1] if len(sys.argv) > 1 else "https://www.elli.eco/de/startseite"
+    out = asyncio.run(record_audit_walk(url))
+    print(json.dumps(out, indent=2, ensure_ascii=False))