Files
breakpilot-compliance/consent-tester/services/audit_walk_recorder.py
T
Benjamin Admin 80c4778017 feat(b17): Akkordeon-Expansion im Audit-Walk (Stufe 2, #7)
Nach jedem Compliance-Doc-Aufruf werden alle Akkordeons /
<details> / [aria-expanded=false] / Trigger-Patterns geklickt
und im Video aufgenommen.

  - _expand_accordions(): 7 Selektor-Patterns, max 25 Expansionen
    pro Seite, Dedup nach inner_text (verhindert Endlos-Loops bei
    verschachtelten Strukturen). Scroll-into-view + click + 400ms Wartezeit
    stellen sicher, dass das Klick-Ergebnis im Video erfasst wird.
  - _visit_link(): Returns (nav_event, expand_event) Tuple. Expand
    läuft nur bei HTTP 2xx + ohne nav-error.
  - 1500ms post-expand wait gibt der Kamera Zeit, den finalen
    Zustand mitzuschneiden.

Backend B17 render: "expand_accordions" Action wird als "5
Akkordeon/Details-Sektion(en) entfaltet" gerendert. Bei 0:
"Keine Akkordeons gefunden" (neutraler Hinweis, kein Fehler).

Real-World-Smoke gegen Elli:
  Impressum:        0 Akkordeons (keine)
  Datenschutzerkl: 5 Akkordeons aufgeklappt
  Nutzungsbeding:   0 Akkordeons

Video-Größe verdoppelt sich (581 KB → 1.14 MB) — Reviewer sieht
jetzt den vollen DSE-Vendor-Tabellen-Inhalt im Video.

Tests: 10/10 grün (+2 für Akkordeon-Render-Pfade).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-07 17:23:55 +02:00

347 lines
11 KiB
Python

"""Playwright Audit-Walk-Recorder.
Records a complete site walk in a WebKit browser:
1. Go to the homepage and accept the consent banner (best effort)
2. Collect footer links (privacy policy, imprint, terms, cookie, withdrawal, ...)
3. Per link: navigate, dwell for reading (5 s by default), then expand
accordions / <details> sections in place
4. Record video (Playwright `record_video_dir`)
5. JSON action index with timestamps plus a SHA-256 of the video for
tamper protection
Output lands under `/data/audit-walks/{walk_id}/` (root overridable via
`AUDIT_WALK_DIR`):
- `video.webm` — Playwright recording
- `walk.json` — action index with timestamps + hash
Duration per walk: ~30-60 seconds for 6-8 footer links, plus the time spent
expanding accordions.
Covers stage 1 (walk + video) and stage 2 (accordion expansion) of this
suite. Stage 3 (DSMS CID anchor) follows separately.
"""
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import os
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from uuid import uuid4
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Root directory for walk output; overridable via the AUDIT_WALK_DIR
# environment variable. NOTE(review): the original comment states that /data
# is a volume mount defined in docker-compose — not verifiable from this file.
WALK_ROOT = os.getenv("AUDIT_WALK_DIR", "/data/audit-walks")
# Lower-case text hints used to recognise compliance-relevant footer links.
# Not every footer link is visited (that would produce huge videos), only
# those whose visible text contains one of these substrings.
_LINK_HINTS_LC = (
"impressum", "imprint", "legal",
"datenschutz", "privacy",
"cookie", "cookies",
"agb", "geschäftsbedingung", "geschaeftsbedingung",
"nutzungsbedingung", "terms",
"widerruf", "withdrawal", "cancellation",
"einwilligung", "consent",
)
# Button labels tried when accepting a consent banner — a best-effort list,
# tried in this order (more specific "accept all" wordings come first).
_ACCEPT_PHRASES = (
"alle akzeptieren", "alle zulassen", "akzeptieren",
"alles akzeptieren", "zustimmen", "einverstanden",
"accept all", "accept", "agree", "allow all",
"ok", "verstanden",
)
def _ts() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.isoformat()
def _sha256_file(path: Path) -> str:
    """Return the hex-encoded SHA-256 digest of the file at *path*.

    The file is consumed in 64 KiB pieces so that a large recording is
    never held in memory as a whole.
    """
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            piece = stream.read(65536)
            if not piece:
                # Empty read means end of file.
                break
            digest.update(piece)
    return digest.hexdigest()
async def _try_accept_banner(page) -> dict:
    """Best-effort click on a consent-banner "accept" button.

    Strategy, in order:
      1. Buttons whose accessible name contains one of ``_ACCEPT_PHRASES``
         as whole words (case-insensitive).
      2. A fixed list of CSS selectors for common CMP widgets.

    Args:
        page: Playwright page (async API) currently showing the site.

    Returns:
        An action-event dict with ``timestamp`` (taken when the search
        started), ``action`` = ``"accept_banner"`` and ``result``. On
        success ``result`` is ``"clicked"`` and either ``phrase`` or
        ``selector`` records what matched; otherwise ``result`` is
        ``"no_button_found"``.
    """
    import re  # function-scope import: the module does not import `re`

    started = _ts()

    for phrase in _ACCEPT_PHRASES:
        # Whole-word match instead of a plain substring match: with a
        # substring match the short phrase "ok" also hits buttons such as
        # "Cookie-Einstellungen" or "Book now", which would click the wrong
        # control and still be logged as an accepted banner.
        words = r"\s+".join(re.escape(word) for word in phrase.split())
        name_re = re.compile(rf"\b{words}\b", re.IGNORECASE)
        try:
            btn = page.get_by_role("button", name=name_re).first
            if await btn.count() > 0:
                await btn.click(timeout=3000)
                # Let the banner close before the walk continues.
                await page.wait_for_timeout(1500)
                return {
                    "timestamp": started, "action": "accept_banner",
                    "result": "clicked", "phrase": phrase,
                }
        except Exception as exc:
            # Best effort: an obstructed or detached button must not abort
            # the walk, but leave a trace for debugging.
            logger.debug("accept phrase %r not clickable: %s", phrase, exc)
            continue

    # CMP-fallback selectors
    cmp_selectors = (
        "#usercentrics-cmp button",
        ".ot-sdk-container button.banner-actions-container .accept-btn",
        ".cmp-modal button[aria-label*=accept i]",
        "[data-testid=cookie-accept]",
        "[aria-label*=akzeptieren i]",
        "[aria-label*=accept i]",
    )
    for sel in cmp_selectors:
        try:
            el = page.locator(sel).first
            if await el.count() > 0:
                await el.click(timeout=2000)
                await page.wait_for_timeout(1500)
                return {
                    "timestamp": started, "action": "accept_banner",
                    "result": "clicked", "selector": sel,
                }
        except Exception as exc:
            logger.debug("CMP selector %r not clickable: %s", sel, exc)
            continue

    return {"timestamp": started, "action": "accept_banner",
            "result": "no_button_found"}
async def _collect_footer_links(page, limit: int = 10) -> list[dict]:
    """Find compliance-relevant anchors inside the page footer.

    Args:
        page: Playwright page (async API) to inspect.
        limit: Maximum number of links to return (default 10).

    Returns:
        A list of ``{"text": ..., "href": ...}`` dicts in document order.
        Only ``footer a[href]`` anchors are considered, and only those whose
        visible text contains one of ``_LINK_HINTS_LC``. Links are
        de-duplicated by URL without fragment; ``text`` is cut to 80
        characters. Returns an empty list if the DOM query fails.
    """
    try:
        anchors = await page.eval_on_selector_all(
            "footer a[href]",
            "(els) => els.map(a => ({text: (a.innerText||'').trim(), "
            "href: a.href}))",
        )
    except Exception as e:
        logger.warning("footer-anchor query failed: %s", e)
        return []

    seen: set[str] = set()
    out: list[dict] = []
    for a in anchors:
        if len(out) >= limit:
            break
        href = (a.get("href") or "").strip()
        text = (a.get("text") or "").strip()
        if not href or not text:
            continue
        # `a.href` is already resolved to an absolute URL by the browser.
        # Anything that is not http(s) (javascript:, mailto:, tel:, ...)
        # cannot be navigated to and would only burn a walk slot on an error.
        if not href.lower().startswith(("http://", "https://")):
            continue
        tl = text.lower()
        if not any(h in tl for h in _LINK_HINTS_LC):
            continue
        # Same document with a different #fragment counts as one link.
        key = href.split("#")[0]
        if key in seen:
            continue
        seen.add(key)
        out.append({"text": text[:80], "href": href})
    return out
async def _expand_accordions(page, max_expansions: int = 25) -> dict:
    """Click through <details>, [aria-expanded=false], summary, and
    typical accordion-header patterns. Returns event dict with count.

    Why: privacy policies and cookie tables often hide vendor/purpose
    details behind accordions. A video that only scrolls the page
    misses much of the auditable content. Expanding them in-place
    captures the disclosed text in the recording.

    Args:
        page: Playwright page (async API) showing the document.
        max_expansions: Upper bound on successful clicks per call.

    Returns:
        ``{"timestamp", "action": "expand_accordions", "expanded", "max"}``
        where ``expanded`` counts clicks that did not raise.

    De-duplication works on two levels:
      * element identity (a WeakSet kept on ``window`` in the page), so an
        element matched by several selectors is clicked only once;
      * visible label text (first 60 chars) when it is non-empty, so a
        wrapper and its nested trigger with the same label are not both
        toggled.
    Keying on text alone would merge every icon-only trigger (empty text)
    into a single entry and leave all but the first one collapsed.

    NOTE(review): ``[aria-expanded='false']`` also matches navigation menus
    and dropdowns; clicking such an element may open a menu or navigate.
    """
    started = _ts()
    expanded = 0
    selectors = (
        "details:not([open]) > summary",
        "[aria-expanded='false']",
        "button.accordion-toggle",
        "button[data-toggle='accordion']",
        ".accordion-header button",
        ".accordion-trigger",
        "[class*=accordion] [class*=trigger]",
    )
    # Returns true if the element was already handled in this document,
    # otherwise remembers it and returns false. Adds one non-visual
    # property to `window`; the DOM itself is not modified.
    mark_seen_js = (
        "(el) => {"
        " const seen = window.__auditWalkSeen"
        " || (window.__auditWalkSeen = new WeakSet());"
        " if (seen.has(el)) return true;"
        " seen.add(el);"
        " return false; }"
    )
    try:
        # Start from a clean slate if this page was processed before.
        await page.evaluate(
            "() => { window.__auditWalkSeen = new WeakSet(); }"
        )
    except Exception as exc:
        logger.debug("could not reset accordion dedup set: %s", exc)

    seen_texts: set[str] = set()
    for sel in selectors:
        if expanded >= max_expansions:
            break
        try:
            els = await page.query_selector_all(sel)
        except Exception as exc:
            logger.debug("accordion selector %r failed: %s", sel, exc)
            continue
        for el in els:
            if expanded >= max_expansions:
                break
            try:
                if await el.evaluate(mark_seen_js):
                    continue
                txt = (await el.inner_text())[:60].strip()
                if txt:
                    if txt in seen_texts:
                        continue
                    seen_texts.add(txt)
                # An earlier click may already have expanded this element;
                # clicking again would collapse it.
                if await el.get_attribute("aria-expanded") == "true":
                    continue
                # scroll-into-view + click; ignore obstructed clicks
                try:
                    await el.scroll_into_view_if_needed(timeout=2000)
                except Exception:
                    pass
                await el.click(timeout=1500)
                # Short pause so the opened state is captured on video.
                await page.wait_for_timeout(400)
                expanded += 1
            except Exception as exc:
                # Stale handle, obstructed click, timeout: skip this one.
                logger.debug("accordion click skipped (%s): %s", sel, exc)
                continue
    return {
        "timestamp": started, "action": "expand_accordions",
        "expanded": expanded, "max": max_expansions,
    }
async def _visit_link(
    page, link: dict, dwell_s: float = 5.0,
    expand_accordions: bool = True,
) -> tuple[dict, dict | None]:
    """Open one footer link, dwell on it, and optionally expand accordions.

    Args:
        page: Playwright page (async API) used for the whole walk.
        link: Dict with at least ``href`` and ``text``.
        dwell_s: Seconds to stay on the page after DOMContentLoaded.
        expand_accordions: Run the stage-2 accordion expansion afterwards.

    Returns:
        ``(nav_event, expand_event)``. ``nav_event`` always describes the
        navigation (status, title, elapsed seconds, error or ``None``).
        ``expand_event`` is ``None`` unless expansion was requested and the
        navigation succeeded with a non-zero status below 400.
    """
    started = _ts()
    t0 = time.monotonic()
    http_status, page_title, failure = 0, "", ""

    try:
        response = await page.goto(
            link["href"], wait_until="domcontentloaded", timeout=20000,
        )
        if response is not None:
            http_status = response.status
        await page.wait_for_timeout(int(dwell_s * 1000))
        try:
            page_title = (await page.title())[:120]
        except Exception:
            # Title is informational only; keep the empty default.
            pass
    except Exception as exc:
        failure = str(exc)[:200]

    elapsed = round(time.monotonic() - t0, 2)
    nav_event = {
        "timestamp": started, "action": "navigate",
        "url": link["href"], "anchor_text": link["text"],
        "status": http_status, "title": page_title,
        "dwell_s": elapsed,
        "error": failure or None,
    }

    # Guard: expansion only after a clean navigation with a usable status.
    if not expand_accordions or failure or not http_status or http_status >= 400:
        return nav_event, None

    expand_event = None
    try:
        expand_event = await _expand_accordions(page)
        # Give the camera a moment to record the expanded state
        await page.wait_for_timeout(1500)
    except Exception as exc:
        logger.info("expand_accordions failed for %s: %s",
                    link["href"][:60], exc)
    return nav_event, expand_event
async def record_audit_walk(
    url: str, dwell_s: float = 5.0, max_links: int = 8,
) -> dict[str, Any]:
    """Run a full audit walk + record video. Returns walk metadata.

    Steps: open ``url`` in headless WebKit, try to accept the consent
    banner, collect compliance-relevant footer links, visit up to
    ``max_links`` of them (dwelling ``dwell_s`` seconds each), then store
    the recording as ``video.webm`` and the action log as ``walk.json``
    under ``WALK_ROOT/<walk_id>/``.

    Args:
        url: Start page of the walk.
        dwell_s: Dwell time per visited link, in seconds.
        max_links: Maximum number of footer links to visit.

    Returns:
        The walk document that is also written to ``walk.json`` (keys:
        ``walk_id``, ``url``, ``started_at``, ``completed_at``, ``error``,
        ``engine``, ``viewport``, ``actions``, ``video``). If Playwright
        cannot be imported, returns ``{"error": "playwright missing: ..."}``
        without creating any output.
    """
    try:
        from playwright.async_api import async_playwright
    except Exception as e:
        return {"error": f"playwright missing: {e}"}

    walk_id = uuid4().hex[:12]
    out_dir = Path(WALK_ROOT) / walk_id
    out_dir.mkdir(parents=True, exist_ok=True)

    actions: list[dict] = []
    started_at = _ts()
    err = None

    async with async_playwright() as p:
        browser = None
        context = None
        try:
            browser = await p.webkit.launch(headless=True)
            context = await browser.new_context(
                viewport={"width": 1280, "height": 800},
                record_video_dir=str(out_dir),
                record_video_size={"width": 1280, "height": 800},
                locale="de-DE",
            )
            page = await context.new_page()

            actions.append({
                "timestamp": _ts(), "action": "goto",
                "url": url,
            })
            try:
                resp = await page.goto(url, wait_until="domcontentloaded",
                                       timeout=30000)
                actions[-1]["status"] = (resp.status if resp else 0)
            except Exception as e:
                # A failed homepage load is recorded, not fatal.
                actions[-1]["error"] = str(e)[:200]
            await page.wait_for_timeout(2000)

            accept_event = await _try_accept_banner(page)
            actions.append(accept_event)

            links = await _collect_footer_links(page)
            actions.append({
                "timestamp": _ts(), "action": "discover_footer_links",
                "count": len(links), "links": links[:max_links],
            })

            for link in links[:max_links]:
                nav_ev, expand_ev = await _visit_link(
                    page, link, dwell_s=dwell_s,
                )
                actions.append(nav_ev)
                if expand_ev is not None:
                    actions.append(expand_ev)
        except Exception as e:
            err = f"walk failed: {str(e)[:200]}"
            logger.exception("walk failed")
        finally:
            # Always close, even after a failure mid-walk: the video file is
            # only finalised when the context closes. Context goes first,
            # then the browser.
            for label, resource in (("context", context), ("browser", browser)):
                if resource is None:
                    continue
                try:
                    await resource.close()
                except Exception as e:
                    logger.warning("%s close failed: %s", label, e)
                    if err is None:
                        err = f"walk failed: {str(e)[:200]}"

    completed_at = _ts()

    # Find produced video file. Playwright writes the .webm with a
    # random name when the context closes; rename it for stability.
    video_meta: dict[str, Any] = {}
    try:
        candidates = sorted(out_dir.glob("*.webm"))
        if candidates:
            src = candidates[0]
            dest = out_dir / "video.webm"
            if src != dest:
                src.rename(dest)
            video_meta = {
                "filename": "video.webm",
                "size_bytes": dest.stat().st_size,
                "sha256": _sha256_file(dest),
            }
    except Exception as e:
        logger.warning("video rename failed: %s", e)

    walk_doc = {
        "walk_id": walk_id,
        "url": url,
        "started_at": started_at,
        "completed_at": completed_at,
        "error": err,
        "engine": "playwright/webkit",
        "viewport": "1280x800",
        "actions": actions,
        "video": video_meta,
    }
    try:
        # ensure_ascii=False emits raw non-ASCII characters (e.g. umlauts in
        # anchor texts), so the encoding must not depend on the locale.
        (out_dir / "walk.json").write_text(
            json.dumps(walk_doc, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )
    except Exception as e:
        logger.warning("walk.json write failed: %s", e)
    return walk_doc
if __name__ == "__main__":
    # Manual smoke test: walk the URL given as first CLI argument (or the
    # built-in default) and print the resulting walk document as JSON.
    import sys

    cli_args = sys.argv[1:]
    url = cli_args[0] if cli_args else "https://www.elli.eco/de/startseite"
    out = asyncio.run(record_audit_walk(url))
    print(json.dumps(out, indent=2, ensure_ascii=False))