Files
breakpilot-compliance/consent-tester/services/audit_walk_recorder.py
T
Benjamin Admin c7d2038ad9 feat(b17): DSMS-CID-Anchor für Audit-Walk-Video (Stufe 3, #7)
Video + walk.json werden nach Aufnahme zu DSMS-IPFS hochgeladen.
Die zurückgegebenen CIDs sind manipulationssichere Audit-Anker —
Reviewer können das Walk-Video Monate später noch verifizieren und
auf Unverändertheit prüfen.

consent-tester:
  - _upload_to_dsms(): Best-Effort-Upload zu /api/v1/documents
    (Bearer-Token, document_type=audit_walk_video|meta). DSMS-Down
    bricht den Walk nicht ab — CID fehlt einfach im result.
  - record_audit_walk(): nach video.webm + walk.json erzeugt, beide
    hochladen. walk.json wird re-written sodass es BEIDE CIDs
    selbstreferenziell enthält.
  - ENV: DSMS_GATEWAY_URL + DSMS_BEARER konfigurierbar.

backend:
  - _b17_wiring._publicize_gateway_url(): DSMS gibt intern
    http://dsms-node:8080/ipfs/{cid} zurück. Für die Audit-Mail
    wird das via env DSMS_PUBLIC_GATEWAY (default
    https://dsms-dev.breakpilot.ai) durch eine extern erreichbare
    URL ersetzt.
  - Render-Block: gelber DSMS-Anchor-Hinweis mit Video-CID +
    walk.json-CID, beide als klickbare Links zur public Gateway.

Real-World-Smoke gegen Elli:
  - Video-CID: QmbdFwtSymPuWGYYdC6eNZ1eEvVLsTYmoRRxEo5L6BXgwt
  - walk.json-CID: QmWaTqwZq4KVd5wYFVAKB12uZtAosPqoG1X4m1azysXYJi
  - DSMS-Upload erfolgreich, gateway_url im response

Tests: 12/12 grün (+2 für DSMS-Anchor-Render-Pfade inkl.
Internal-Host → Public-Gateway-Rewrite).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-07 17:32:34 +02:00

405 lines
14 KiB
Python

"""Playwright audit-walk recorder.

Records a complete site walk in a headless WebKit browser:

1. Go to the homepage and accept the consent banner (best effort).
2. Collect the compliance-relevant footer links (privacy policy, imprint,
   terms, cookies, withdrawal, ...).
3. For each link: navigate and dwell (5 s by default) so the page is
   readable in the recording.
4. Stage 2: expand accordions on each visited page so hidden text ends up
   in the recording (see ``_expand_accordions``).
5. Record a video (Playwright ``record_video_dir``).
6. Write a JSON action index with timestamps plus the SHA-256 of the video
   as tamper protection.
7. Stage 3: upload the video and ``walk.json`` to DSMS and store the
   returned CIDs in ``walk.json`` (see ``_upload_to_dsms``).

Output lands under ``/data/audit-walks/{walk_id}/`` (the root can be
overridden with the ``AUDIT_WALK_DIR`` environment variable):

- ``video.webm`` -- the Playwright recording
- ``walk.json`` -- the action index with timestamps and hash

Duration per walk: roughly 30-60 seconds with 6-8 footer links.

This module started as stage 1 of the suite; stage 2 (accordion expansion)
and stage 3 (DSMS CID anchor) are implemented in this file as well.
"""
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import os
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from uuid import uuid4
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Root directory for walk output (the /data volume mount is defined in
# docker-compose). Override with the AUDIT_WALK_DIR environment variable.
WALK_ROOT = os.environ.get("AUDIT_WALK_DIR", "/data/audit-walks")

# Internal DSMS gateway (no public hostname needed). Set through the
# docker-compose environment; used for the stage-3 anchor upload.
DSMS_GATEWAY_URL = os.environ.get(
    "DSMS_GATEWAY_URL",
    "http://bp-compliance-dsms-gateway:8082",
)
DSMS_BEARER = os.environ.get("DSMS_BEARER", "audit-walk-uploader")

# Lower-case text hints that mark a footer link as a compliance anchor.
# Not every footer link is visited (the videos would get huge), only the
# compliance-relevant ones.
_LINK_HINTS_LC = (
    "impressum",
    "imprint",
    "legal",
    "datenschutz",
    "privacy",
    "cookie",
    "cookies",
    "agb",
    "geschäftsbedingung",
    "geschaeftsbedingung",
    "nutzungsbedingung",
    "terms",
    "widerruf",
    "withdrawal",
    "cancellation",
    "einwilligung",
    "consent",
)

# Button labels of consent-banner "accept" buttons -- a best-effort list.
# Order matters: the phrases are tried from first to last.
_ACCEPT_PHRASES = (
    "alle akzeptieren",
    "alle zulassen",
    "akzeptieren",
    "alles akzeptieren",
    "zustimmen",
    "einverstanden",
    "accept all",
    "accept",
    "agree",
    "allow all",
    "ok",
    "verstanden",
)
def _ts() -> str:
    """Return the current time as a timezone-aware UTC ISO-8601 string."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.isoformat()
async def _upload_to_dsms(
    path: Path, document_type: str, document_id: str,
) -> dict:
    """Upload a single file to DSMS (best effort).

    The file is sent as a multipart upload to
    ``{DSMS_GATEWAY_URL}/api/v1/documents`` with a bearer token.

    Args:
        path: File to upload.
        document_type: Value of the ``document_type`` form field.
        document_id: Value of the ``document_id`` form field.

    Returns:
        ``{"cid", "size", "gateway_url"}`` on success, otherwise
        ``{"error": <non-empty message>}``. This function never raises for
        ordinary failures: a DSMS outage must not abort the walk.
    """
    try:
        # Local import: a missing httpx is reported like any other upload
        # failure instead of breaking the import of this module.
        import httpx

        payload = path.read_bytes()
        async with httpx.AsyncClient(timeout=60.0) as client:
            r = await client.post(
                f"{DSMS_GATEWAY_URL}/api/v1/documents",
                files={"file": (path.name, payload)},
                data={"document_type": document_type,
                      "document_id": document_id},
                headers={"Authorization": f"Bearer {DSMS_BEARER}"},
            )
        if r.status_code in (200, 201):
            data = r.json()
            cid = data.get("cid") if isinstance(data, dict) else None
            if cid:
                return {
                    "cid": cid,
                    "size": data.get("size"),
                    "gateway_url": data.get("gateway_url") or "",
                }
            # A 2xx answer without a CID is useless as an anchor; report it
            # as an error rather than returning a success-shaped dict.
            result = {"error": "DSMS response contained no cid"}
        else:
            result = {"error": f"HTTP {r.status_code}: {r.text[:200]}"}
    except Exception as e:
        # Some exceptions stringify to "", so fall back to the class name
        # to keep the error value truthy for callers.
        result = {"error": str(e)[:200] or type(e).__name__}
    logger.warning("DSMS upload of %s failed: %s", path.name, result["error"])
    return result
def _sha256_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*.

    The file is read in 64 KiB pieces so large recordings do not have to
    fit in memory.
    """
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            piece = stream.read(65536)
            if not piece:
                break
            digest.update(piece)
    return digest.hexdigest()
async def _try_accept_banner(page) -> dict:
    """Best effort: click a consent-banner "accept" button.

    Tries the phrases in ``_ACCEPT_PHRASES`` against button names first,
    then a list of common CMP selectors as fallback.

    Phrases are matched case-insensitively as whole words. A plain
    substring match would let short phrases hit unrelated buttons: "ok" is
    contained in "Cookies ablehnen" and "Cookie settings".

    Args:
        page: Playwright page showing the site.

    Returns:
        Action-event dict with ``timestamp``, ``action`` and ``result``
        (``"clicked"`` or ``"no_button_found"``), plus the matching
        ``phrase`` or ``selector`` when a button was clicked.
    """
    # Local import: the module header does not import ``re``.
    import re

    started = _ts()

    async def _click_first(locator, timeout_ms: int) -> bool:
        """Click the first match of *locator*; return False if none exists."""
        target = locator.first
        if await target.count() == 0:
            return False
        await target.click(timeout=timeout_ms)
        # Give the banner time to disappear before the walk continues.
        await page.wait_for_timeout(1500)
        return True

    for phrase in _ACCEPT_PHRASES:
        # Whole-word pattern; whitespace between words may vary.
        words = r"\s+".join(re.escape(w) for w in phrase.split())
        pattern = re.compile(r"\b" + words + r"\b", re.IGNORECASE)
        try:
            if await _click_first(
                page.get_by_role("button", name=pattern), 3000,
            ):
                return {
                    "timestamp": started, "action": "accept_banner",
                    "result": "clicked", "phrase": phrase,
                }
        except Exception as e:
            # Obstructed or vanished button: try the next phrase.
            logger.debug("accept phrase %r not clickable: %s", phrase, e)
            continue

    # CMP-fallback selectors
    cmp_selectors = (
        "#usercentrics-cmp button",
        ".ot-sdk-container button.banner-actions-container .accept-btn",
        ".cmp-modal button[aria-label*=accept i]",
        "[data-testid=cookie-accept]",
        "[aria-label*=akzeptieren i]",
        "[aria-label*=accept i]",
    )
    for sel in cmp_selectors:
        try:
            if await _click_first(page.locator(sel), 2000):
                return {
                    "timestamp": started, "action": "accept_banner",
                    "result": "clicked", "selector": sel,
                }
        except Exception as e:
            logger.debug("CMP selector %r not clickable: %s", sel, e)
            continue

    return {"timestamp": started, "action": "accept_banner",
            "result": "no_button_found"}
async def _collect_footer_links(page, limit: int = 10) -> list[dict]:
    """Find compliance-relevant anchors inside the page footer.

    Args:
        page: Playwright page that has already been navigated.
        limit: Maximum number of links to return (default 10).

    Returns:
        List of ``{"text", "href"}`` dicts in document order. A link is
        kept when its visible text contains one of ``_LINK_HINTS_LC``
        (case-insensitive). Links are de-duplicated by URL without
        fragment. Returns ``[]`` when the footer query fails.
    """
    try:
        anchors = await page.eval_on_selector_all(
            "footer a[href]",
            "(els) => els.map(a => ({text: (a.innerText||'').trim(), "
            "href: a.href}))",
        )
    except Exception as e:
        logger.warning("footer-anchor query failed: %s", e)
        return []

    # Schemes the browser cannot navigate to as a page; a link such as
    # "mailto:datenschutz@..." would only produce a failed navigation later.
    non_navigable = ("mailto:", "tel:", "sms:", "javascript:")

    seen: set[str] = set()
    out: list[dict] = []
    for a in anchors:
        if len(out) >= limit:
            break
        href = (a.get("href") or "").strip()
        text = (a.get("text") or "").strip()
        if not href or not text:
            continue
        if href.lower().startswith(non_navigable):
            continue
        tl = text.lower()
        if not any(h in tl for h in _LINK_HINTS_LC):
            continue
        # Same document with a different fragment counts as one page.
        key = href.split("#")[0]
        if key in seen:
            continue
        seen.add(key)
        out.append({"text": text[:80], "href": href})
    return out
async def _expand_accordions(page, max_expansions: int = 25) -> dict:
    """Click open collapsed sections on the current page.

    Targets closed ``<details>`` summaries, ``[aria-expanded=false]``
    elements and typical accordion-header patterns.

    Why: privacy policies and cookie tables often hide vendor/purpose
    details behind accordions. A video that only shows the page as loaded
    misses that content; expanding the sections in place captures the
    disclosed text in the recording.

    Args:
        page: Playwright page to operate on.
        max_expansions: Upper bound on successful clicks.

    Returns:
        Event dict with ``timestamp``, ``action``, ``expanded`` (number of
        successful clicks) and ``max``.
    """
    begun = _ts()
    clicked = 0
    # Poor-man's element identity: the first 60 characters of the text.
    already_tried: set[str] = set()

    async def _toggle(handle) -> bool:
        """Click *handle* unless an element with the same text was tried."""
        label = (await handle.inner_text())[:60].strip()
        if label in already_tried:
            return False
        already_tried.add(label)
        # Scrolling is a nicety for the recording; a failure is ignored.
        try:
            await handle.scroll_into_view_if_needed(timeout=2000)
        except Exception:
            pass
        await handle.click(timeout=1500)
        await page.wait_for_timeout(400)
        return True

    for css in (
        "details:not([open]) > summary",
        "[aria-expanded='false']",
        "button.accordion-toggle",
        "button[data-toggle='accordion']",
        ".accordion-header button",
        ".accordion-trigger",
        "[class*=accordion] [class*=trigger]",
    ):
        try:
            handles = await page.query_selector_all(css)
        except Exception:
            continue
        for handle in handles:
            if clicked >= max_expansions:
                break
            try:
                if await _toggle(handle):
                    clicked += 1
            except Exception:
                # Obstructed or detached element: skip it.
                continue
        if clicked >= max_expansions:
            break

    return {
        "timestamp": begun, "action": "expand_accordions",
        "expanded": clicked, "max": max_expansions,
    }
async def _visit_link(
    page, link: dict, dwell_s: float = 5.0,
    expand_accordions: bool = True,
) -> tuple[dict, dict | None]:
    """Visit one footer link and optionally expand its accordions (stage 2).

    Args:
        page: Playwright page used for the walk.
        link: Dict with the keys ``href`` and ``text``.
        dwell_s: Seconds to stay on the page after it has loaded.
        expand_accordions: Whether to expand accordions after a successful
            navigation.

    Returns:
        ``(nav_event, expand_event)``. ``expand_event`` is ``None`` when
        expansion was disabled, the navigation failed, the HTTP status was
        0 or >= 400, or the expansion itself raised.
    """
    began_iso = _ts()
    clock_start = time.monotonic()
    http_status, page_title, failure = 0, "", ""
    try:
        response = await page.goto(
            link["href"], wait_until="domcontentloaded", timeout=20000,
        )
        if response is not None:
            http_status = response.status
        await page.wait_for_timeout(int(dwell_s * 1000))
        try:
            page_title = (await page.title())[:120]
        except Exception:
            pass
    except Exception as exc:
        failure = str(exc)[:200]

    nav_event = {
        "timestamp": began_iso,
        "action": "navigate",
        "url": link["href"],
        "anchor_text": link["text"],
        "status": http_status,
        "title": page_title,
        # Wall-clock time of the whole visit (load + dwell), in seconds.
        "dwell_s": round(time.monotonic() - clock_start, 2),
        "error": failure or None,
    }

    navigation_ok = bool(http_status) and http_status < 400 and not failure
    if not (expand_accordions and navigation_ok):
        return nav_event, None

    expand_event = None
    try:
        expand_event = await _expand_accordions(page)
        # Give the camera a moment to record the expanded state.
        await page.wait_for_timeout(1500)
    except Exception as exc:
        logger.info("expand_accordions failed for %s: %s",
                    link["href"][:60], exc)
    return nav_event, expand_event
async def record_audit_walk(
    url: str, dwell_s: float = 5.0, max_links: int = 8,
) -> dict[str, Any]:
    """Run a full audit walk and record it on video.

    Steps: open *url*, try to accept the consent banner, collect the
    compliance-relevant footer links, visit up to *max_links* of them, then
    write ``video.webm`` and ``walk.json`` below ``WALK_ROOT/<walk_id>/``
    and upload both to DSMS (best effort).

    Args:
        url: Start page of the walk.
        dwell_s: Seconds to stay on each visited footer link.
        max_links: Maximum number of footer links to visit.

    Returns:
        The walk document (also written to ``walk.json``): walk id, URL,
        timestamps, ``error`` (``None`` on success), the action list, video
        metadata and DSMS upload results. If Playwright cannot be imported,
        only ``{"error": ...}`` is returned and nothing is written.
    """
    try:
        from playwright.async_api import async_playwright
    except Exception as e:
        return {"error": f"playwright missing: {e}"}

    walk_id = uuid4().hex[:12]
    out_dir = Path(WALK_ROOT) / walk_id
    out_dir.mkdir(parents=True, exist_ok=True)
    actions: list[dict] = []
    started_at = _ts()
    err = None

    async with async_playwright() as p:
        browser = None
        context = None
        try:
            browser = await p.webkit.launch(headless=True)
            context = await browser.new_context(
                viewport={"width": 1280, "height": 800},
                record_video_dir=str(out_dir),
                record_video_size={"width": 1280, "height": 800},
                locale="de-DE",
            )
            page = await context.new_page()
            actions.append({
                "timestamp": _ts(), "action": "goto",
                "url": url,
            })
            try:
                resp = await page.goto(url, wait_until="domcontentloaded",
                                       timeout=30000)
                actions[-1]["status"] = (resp.status if resp else 0)
            except Exception as e:
                # A failed homepage load is recorded; the walk continues.
                actions[-1]["error"] = str(e)[:200]
            await page.wait_for_timeout(2000)

            accept_event = await _try_accept_banner(page)
            actions.append(accept_event)

            links = await _collect_footer_links(page)
            actions.append({
                "timestamp": _ts(), "action": "discover_footer_links",
                "count": len(links), "links": links[:max_links],
            })
            for link in links[:max_links]:
                nav_ev, expand_ev = await _visit_link(
                    page, link, dwell_s=dwell_s,
                )
                actions.append(nav_ev)
                if expand_ev is not None:
                    actions.append(expand_ev)
        except Exception as e:
            err = f"walk failed: {str(e)[:200]}"
            logger.exception("walk failed")
        finally:
            # Close in `finally`: Playwright writes the video when the
            # context closes, so a walk that failed half-way must still
            # close it or the recording of the failure is lost.
            for closable in (context, browser):
                if closable is None:
                    continue
                try:
                    await closable.close()
                except Exception as e:
                    logger.warning("browser cleanup failed: %s", e)
                    if err is None:
                        err = f"walk failed: {str(e)[:200]}"
    completed_at = _ts()

    # Find produced video file. Playwright writes the .webm with a
    # random name when the context closes; rename it for stability.
    video_meta: dict[str, Any] = {}
    try:
        candidates = sorted(out_dir.glob("*.webm"))
        if candidates:
            src = candidates[0]
            dest = out_dir / "video.webm"
            if src != dest:
                src.rename(dest)
            video_meta = {
                "filename": "video.webm",
                "size_bytes": dest.stat().st_size,
                "sha256": _sha256_file(dest),
            }
    except Exception as e:
        logger.warning("video rename failed: %s", e)

    walk_doc = {
        "walk_id": walk_id,
        "url": url,
        "started_at": started_at,
        "completed_at": completed_at,
        "error": err,
        "engine": "playwright/webkit",
        "viewport": "1280x800",
        "actions": actions,
        "video": video_meta,
    }

    # Stage 3: DSMS CID anchor. Upload the video first so its CID is
    # already part of the walk.json that gets uploaded afterwards.
    video_path = out_dir / "video.webm"
    if video_path.exists():
        video_dsms = await _upload_to_dsms(
            video_path, document_type="audit_walk_video",
            document_id=walk_id,
        )
        walk_doc["video"]["dsms"] = video_dsms

    try:
        walk_json_path = out_dir / "walk.json"
        # Explicit UTF-8: the JSON is written with ensure_ascii=False, so
        # the platform default encoding must not decide the file's bytes.
        walk_json_path.write_text(
            json.dumps(walk_doc, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )
        walk_dsms = await _upload_to_dsms(
            walk_json_path, document_type="audit_walk_meta",
            document_id=walk_id,
        )
        walk_doc["walk_json_dsms"] = walk_dsms
        # Re-write so the on-disk walk.json contains BOTH CIDs. Note: the
        # copy uploaded to DSMS is the version without "walk_json_dsms";
        # the on-disk file differs from it by exactly that key.
        walk_json_path.write_text(
            json.dumps(walk_doc, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )
    except Exception as e:
        logger.warning("walk.json write failed: %s", e)
    return walk_doc
if __name__ == "__main__":
    # Manual smoke run: `python audit_walk_recorder.py [URL]` walks the
    # given URL (or the default site) and prints the walk document.
    import sys

    if len(sys.argv) > 1:
        url = sys.argv[1]
    else:
        url = "https://www.elli.eco/de/startseite"
    out = asyncio.run(record_audit_walk(url))
    print(json.dumps(out, ensure_ascii=False, indent=2))