b16130369a
Stufe 4 — Cookie-Banner-Tour vor dem Accept-Klick:
- audit_walk_banner_tour.tour_cookie_banner(): öffnet Settings
(16 Phrase-Varianten), scrollt vertikal, aktiviert jedes
[role=tab], expandet jedes [aria-expanded=false] / details /
summary + 14 CMP-spezifische Selektoren. Max 35 Klicks,
Best-Effort.
- audit_walk_recorder ruft tour_cookie_banner() VOR
_try_accept_banner auf — Reviewer sieht den vollen Consent-
Katalog im Video (Vendor-Liste, Kategorien, Zwecke).
- Recorder unter 500 LOC (412+155 split).
Stufe 5 — Annotierte Screenshots pro Finding:
- finding_annotator.annotate_url(): WebKit headless, JS-Inject
eines rot-banner-Labels oben + roter Outline um das Element
(Selector oder Text-Match).
- finding_annotator.annotate_findings(): dispatched 3 Cases —
B1 Tap-Target (Anchor markiert mit "Tap-Target X×Y px"),
B16 URL-Slug-Drift (404-Seite mit "/<slug> 404"),
B13 Widerruf (Footer markiert "Widerruf-Link fehlt").
- routes_audit_walk.POST /annotate-findings (consent-tester).
- _b17_wiring ruft annotate-findings nach record_audit_walk und
speichert annotations in walk.annotations.
- audit_walk_zip_builder packt PNGs nach findings/<name>.png ins
ZIP — Reviewer hat Beweis-Bilder im Postfach.
Plausibility Circuit-Breaker:
- Nach 6 consecutive empty batches (PLAUSIBILITY_EMPTY_BUDGET=6)
bricht die ganze Phase ab statt 200 Calls zu warten. Fix für
qwen3-down + große DSE-Sites (BMW: ohne Breaker 21min, mit
Breaker ~3min).
audit_walk_zip_builder fängt walk.annotations ab und legt sie unter
findings/<fname>.png im ZIP-Anhang ab.
V2-Default:
- docker-compose.yml backend-compliance.environment.MAIL_RENDER_V2:
default 'true'. Ohne diesen Override liefert die Engine
weiterhin das alte Legacy-Mail-Layout, in dem die B-Wiring-
Blöcke nicht sichtbar sind.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
227 lines
8.8 KiB
Python
227 lines
8.8 KiB
Python
"""B17 wiring — Audit-Walk-Recorder.
|
|
|
|
Triggert beim consent-tester einen kompletten Playwright-Site-Walk
|
|
mit Video-Aufzeichnung. Result: Video + JSON-Action-Index mit
|
|
Timestamps + SHA-256-Hash für Manipulation-Schutz.
|
|
|
|
Speichert nur die Walk-Metadata + Video-URL im state. Der eigentliche
|
|
File-Body bleibt im consent-tester-Volume (Stufe 1). Stufe 3 wird das
|
|
Video zu DSMS-IPFS hochladen und die CID hier einbinden.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import html
|
|
import logging
|
|
import os
|
|
from urllib.parse import urlparse
|
|
|
|
import httpx
|
|
|
|
from ._constants import CONSENT_TESTER_URL
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Optionaler Override für die öffentliche IPFS-Gateway-URL. DSMS gibt
|
|
# intern http://dsms-node:8080/ipfs/{cid} zurück — für die Mail brauchen
|
|
# Reviewer aber eine extern erreichbare URL.
|
|
DSMS_PUBLIC_GATEWAY = os.environ.get(
|
|
"DSMS_PUBLIC_GATEWAY", "https://dsms-dev.breakpilot.ai",
|
|
)
|
|
|
|
|
|
def _publicize_gateway_url(internal_url: str) -> str:
|
|
"""Replace internal dsms-node host with the public gateway."""
|
|
if not internal_url:
|
|
return ""
|
|
return internal_url.replace(
|
|
"http://dsms-node:8080", DSMS_PUBLIC_GATEWAY,
|
|
).replace(
|
|
"http://bp-compliance-dsms-node:8080", DSMS_PUBLIC_GATEWAY,
|
|
)
|
|
|
|
|
|
async def run_b17(state: dict) -> None:
|
|
"""Trigger walk recording + store metadata in state."""
|
|
req = state.get("req")
|
|
if req is None:
|
|
return
|
|
homepage = ""
|
|
for d in req.documents:
|
|
if d.url:
|
|
p = urlparse(d.url)
|
|
if p.scheme and p.netloc:
|
|
homepage = f"{p.scheme}://{p.netloc}/"
|
|
break
|
|
if not homepage:
|
|
return
|
|
|
|
walk: dict = {}
|
|
try:
|
|
async with httpx.AsyncClient(timeout=180.0) as c:
|
|
r = await c.post(
|
|
f"{CONSENT_TESTER_URL}/scan-audit-walk",
|
|
json={"url": homepage, "dwell_s": 4.0, "max_links": 8},
|
|
timeout=180.0,
|
|
)
|
|
if r.status_code == 200:
|
|
walk = r.json()
|
|
except Exception as e:
|
|
logger.warning("B17 audit-walk request failed: %s", e)
|
|
return
|
|
|
|
if not walk or not walk.get("walk_id"):
|
|
return
|
|
|
|
# Stufe-5: annotierte Screenshots pro Finding. Schickt die
|
|
# gesammelten findings (B1 mobile + B16 slug-drift + B13 widerruf)
|
|
# zum consent-tester der pro Finding ein PNG erzeugt.
|
|
annotations: list[dict] = []
|
|
try:
|
|
findings_for_annot: list[dict] = []
|
|
rf = state.get("reachability_finding")
|
|
if rf and not rf.get("passed", True):
|
|
findings_for_annot.append({
|
|
"check_id": "COOKIE-CONSENT-UX-001",
|
|
"mobile_playwright": rf.get("mobile_playwright") or {},
|
|
})
|
|
for f in (state.get("extra_findings") or []):
|
|
cid = (f.get("check_id") or "").upper()
|
|
if cid in ("URL-SLUG-DRIFT-001", "WIDERRUF-REACH-001"):
|
|
findings_for_annot.append(f)
|
|
if findings_for_annot:
|
|
async with httpx.AsyncClient(timeout=120.0) as c:
|
|
r = await c.post(
|
|
f"{CONSENT_TESTER_URL}/annotate-findings",
|
|
json={"findings": findings_for_annot,
|
|
"home_url": homepage},
|
|
timeout=120.0,
|
|
)
|
|
if r.status_code == 200:
|
|
annotations = (r.json() or {}).get("annotations") or []
|
|
logger.info(
|
|
"B17 annotations: %d Screenshots erzeugt",
|
|
len(annotations),
|
|
)
|
|
except Exception as e:
|
|
logger.warning("annotate-findings request failed: %s", e)
|
|
|
|
walk["annotations"] = annotations
|
|
state["audit_walk"] = walk
|
|
state["audit_walk_html"] = _render(walk)
|
|
logger.info(
|
|
"B17 audit-walk: %s · %d actions · video %d bytes · sha256 %s",
|
|
walk.get("walk_id"),
|
|
len(walk.get("actions") or []),
|
|
(walk.get("video") or {}).get("size_bytes", 0),
|
|
((walk.get("video") or {}).get("sha256") or "")[:12],
|
|
)
|
|
|
|
|
|
def _video_link(walk_id: str) -> str:
|
|
"""External URL for the recorded video (when consent-tester is
|
|
reachable from the audit reviewer)."""
|
|
return f"{CONSENT_TESTER_URL}/audit-walks/{walk_id}/video.webm"
|
|
|
|
|
|
def _render(walk: dict) -> str:
|
|
wid = walk.get("walk_id") or ""
|
|
video = walk.get("video") or {}
|
|
actions = walk.get("actions") or []
|
|
nav_count = sum(1 for a in actions if a.get("action") == "navigate")
|
|
sha = (video.get("sha256") or "")[:12]
|
|
size_kb = round((video.get("size_bytes") or 0) / 1024, 1)
|
|
walk_link = _video_link(wid)
|
|
meta_link = f"{CONSENT_TESTER_URL}/audit-walks/{wid}/walk.json"
|
|
|
|
# Stufe-3 DSMS-Anchor
|
|
video_dsms = (video.get("dsms") or {})
|
|
meta_dsms = (walk.get("walk_json_dsms") or {})
|
|
video_cid = video_dsms.get("cid") or ""
|
|
meta_cid = meta_dsms.get("cid") or ""
|
|
video_gw = _publicize_gateway_url(video_dsms.get("gateway_url") or "")
|
|
meta_gw = _publicize_gateway_url(meta_dsms.get("gateway_url") or "")
|
|
dsms_html = ""
|
|
if video_cid or meta_cid:
|
|
parts = []
|
|
if video_cid:
|
|
link = (f"<a href='{html.escape(video_gw)}' style='color:#0369a1;'>"
|
|
f"<code>{html.escape(video_cid[:20])}…</code></a>"
|
|
if video_gw else
|
|
f"<code>{html.escape(video_cid)}</code>")
|
|
parts.append(f"Video-CID: {link}")
|
|
if meta_cid:
|
|
link = (f"<a href='{html.escape(meta_gw)}' style='color:#0369a1;'>"
|
|
f"<code>{html.escape(meta_cid[:20])}…</code></a>"
|
|
if meta_gw else
|
|
f"<code>{html.escape(meta_cid)}</code>")
|
|
parts.append(f"walk.json-CID: {link}")
|
|
dsms_html = (
|
|
"<p style='margin:0 0 8px;padding:6px 10px;background:#fef3c7;"
|
|
"border-radius:4px;font-size:12px;color:#78350f;'>"
|
|
"<strong>🔒 DSMS-Anchor (manipulationssicher):</strong> "
|
|
+ " · ".join(parts) +
|
|
"</p>"
|
|
)
|
|
|
|
rows = []
|
|
for a in actions:
|
|
ts = (a.get("timestamp") or "")[11:19] # HH:MM:SS
|
|
act = a.get("action") or ""
|
|
detail = ""
|
|
if act == "goto" or act == "navigate":
|
|
detail = (a.get("url") or "")[:120]
|
|
if a.get("status"):
|
|
detail += f" → HTTP {a['status']}"
|
|
elif act == "accept_banner":
|
|
r = a.get("result") or ""
|
|
if r == "clicked":
|
|
detail = f"Banner akzeptiert ({a.get('phrase') or a.get('selector') or ''})"
|
|
else:
|
|
detail = "Kein Accept-Button gefunden"
|
|
elif act == "discover_footer_links":
|
|
detail = f"{a.get('count', 0)} Compliance-Links im Footer"
|
|
elif act == "expand_accordions":
|
|
n = a.get("expanded", 0)
|
|
detail = (f"{n} Akkordeon/Details-Sektion(en) entfaltet"
|
|
if n else "Keine Akkordeons gefunden")
|
|
elif act == "tour_cookie_banner":
|
|
n = a.get("clicks", 0)
|
|
opened = "Settings geöffnet" if a.get("settings_opened") \
|
|
else "kein Settings-Trigger gefunden"
|
|
detail = f"Cookie-Banner-Tour: {n} Klicks ({opened})"
|
|
rows.append(
|
|
f"<tr><td style='padding:4px 8px;font-family:monospace;"
|
|
f"color:#475569;'>{html.escape(ts)}</td>"
|
|
f"<td style='padding:4px 8px;'>{html.escape(act)}</td>"
|
|
f"<td style='padding:4px 8px;color:#475569;'>"
|
|
f"{html.escape(detail)}</td></tr>"
|
|
)
|
|
return (
|
|
"<div style='margin:24px 0;padding:16px;border-left:4px solid #0ea5e9;"
|
|
"background:#f0f9ff;border-radius:4px;'>"
|
|
"<h2 style='margin:0 0 8px;color:#0c4a6e;font-size:16px;'>"
|
|
"🎥 Audit-Walk-Video (Beweis-Aufzeichnung)"
|
|
"</h2>"
|
|
"<p style='margin:0 0 8px;font-size:13px;color:#475569;'>"
|
|
f"<strong>Video:</strong> "
|
|
f"<a href='{html.escape(walk_link)}' style='color:#0369a1;'>video.webm</a> "
|
|
f"({size_kb} KB, SHA-256 <code>{html.escape(sha)}…</code>) · "
|
|
f"<strong>Metadata:</strong> "
|
|
f"<a href='{html.escape(meta_link)}' style='color:#0369a1;'>walk.json</a>"
|
|
"</p>"
|
|
"<p style='margin:0 0 8px;font-size:13px;color:#475569;'>"
|
|
f"{nav_count} Compliance-Seiten besucht, jede 4 Sek "
|
|
"verweilt — Reviewer kann den Audit-Walk nachverfolgen."
|
|
"</p>"
|
|
+ dsms_html +
|
|
"<table style='font-size:12px;width:100%;border-collapse:collapse;"
|
|
"background:#fff;border-radius:4px;'>"
|
|
"<thead><tr style='background:#e0f2fe;'>"
|
|
"<th style='padding:6px 8px;text-align:left;'>Zeit (UTC)</th>"
|
|
"<th style='padding:6px 8px;text-align:left;'>Aktion</th>"
|
|
"<th style='padding:6px 8px;text-align:left;'>Detail</th>"
|
|
"</tr></thead><tbody>" + "".join(rows) + "</tbody></table>"
|
|
"</div>"
|
|
)
|