feat(b17): Stufe 4 banner-tour + Stufe 5 annotierte Screenshots + V2-default
Stufe 4 — Cookie-Banner-Tour vor dem Accept-Klick:
- audit_walk_banner_tour.tour_cookie_banner(): öffnet Settings
(16 Phrase-Varianten), scrollt vertikal, aktiviert jedes
[role=tab], expandet jedes [aria-expanded=false] / details /
summary + 14 CMP-spezifische Selektoren. Max 35 Klicks,
Best-Effort.
- audit_walk_recorder ruft tour_cookie_banner() VOR
_try_accept_banner auf — Reviewer sieht den vollen Consent-
Katalog im Video (Vendor-Liste, Kategorien, Zwecke).
- Recorder unter 500 LOC (412+155 split).
Stufe 5 — Annotierte Screenshots pro Finding:
- finding_annotator.annotate_url(): WebKit headless, JS-Inject
eines rot-banner-Labels oben + roter Outline um das Element
(Selector oder Text-Match).
- finding_annotator.annotate_findings(): dispatched 3 Cases —
B1 Tap-Target (Anchor markiert mit "Tap-Target X×Y px"),
B16 URL-Slug-Drift (404-Seite mit "/<slug> 404"),
B13 Widerruf (Footer markiert "Widerruf-Link fehlt").
- routes_audit_walk.POST /annotate-findings (consent-tester).
- _b17_wiring ruft annotate-findings nach record_audit_walk und
speichert annotations in walk.annotations.
- audit_walk_zip_builder packt PNGs nach findings/<name>.png ins
ZIP — Reviewer hat Beweis-Bilder im Postfach.
Plausibility Circuit-Breaker:
- Nach 6 consecutive empty batches (PLAUSIBILITY_EMPTY_BUDGET=6)
bricht die ganze Phase ab statt 200 Calls zu warten. Fix für
qwen3-down + große DSE-Sites (BMW: ohne Breaker 21min, mit
Breaker ~3min).
audit_walk_zip_builder fängt walk.annotations ab und legt sie unter
findings/<fname>.png im ZIP-Anhang ab.
V2-Default:
- docker-compose.yml backend-compliance.environment.MAIL_RENDER_V2:
default 'true'. Ohne diesen Override liefert die Engine
weiterhin das alte Legacy-Mail-Layout, in dem die B-Wiring-
Blöcke nicht sichtbar sind.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -73,6 +73,40 @@ async def run_b17(state: dict) -> None:
|
||||
if not walk or not walk.get("walk_id"):
|
||||
return
|
||||
|
||||
# Stufe-5: annotierte Screenshots pro Finding. Schickt die
|
||||
# gesammelten findings (B1 mobile + B16 slug-drift + B13 widerruf)
|
||||
# zum consent-tester der pro Finding ein PNG erzeugt.
|
||||
annotations: list[dict] = []
|
||||
try:
|
||||
findings_for_annot: list[dict] = []
|
||||
rf = state.get("reachability_finding")
|
||||
if rf and not rf.get("passed", True):
|
||||
findings_for_annot.append({
|
||||
"check_id": "COOKIE-CONSENT-UX-001",
|
||||
"mobile_playwright": rf.get("mobile_playwright") or {},
|
||||
})
|
||||
for f in (state.get("extra_findings") or []):
|
||||
cid = (f.get("check_id") or "").upper()
|
||||
if cid in ("URL-SLUG-DRIFT-001", "WIDERRUF-REACH-001"):
|
||||
findings_for_annot.append(f)
|
||||
if findings_for_annot:
|
||||
async with httpx.AsyncClient(timeout=120.0) as c:
|
||||
r = await c.post(
|
||||
f"{CONSENT_TESTER_URL}/annotate-findings",
|
||||
json={"findings": findings_for_annot,
|
||||
"home_url": homepage},
|
||||
timeout=120.0,
|
||||
)
|
||||
if r.status_code == 200:
|
||||
annotations = (r.json() or {}).get("annotations") or []
|
||||
logger.info(
|
||||
"B17 annotations: %d Screenshots erzeugt",
|
||||
len(annotations),
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("annotate-findings request failed: %s", e)
|
||||
|
||||
walk["annotations"] = annotations
|
||||
state["audit_walk"] = walk
|
||||
state["audit_walk_html"] = _render(walk)
|
||||
logger.info(
|
||||
@@ -151,6 +185,11 @@ def _render(walk: dict) -> str:
|
||||
n = a.get("expanded", 0)
|
||||
detail = (f"{n} Akkordeon/Details-Sektion(en) entfaltet"
|
||||
if n else "Keine Akkordeons gefunden")
|
||||
elif act == "tour_cookie_banner":
|
||||
n = a.get("clicks", 0)
|
||||
opened = "Settings geöffnet" if a.get("settings_opened") \
|
||||
else "kein Settings-Trigger gefunden"
|
||||
detail = f"Cookie-Banner-Tour: {n} Klicks ({opened})"
|
||||
rows.append(
|
||||
f"<tr><td style='padding:4px 8px;font-family:monospace;"
|
||||
f"color:#475569;'>{html.escape(ts)}</td>"
|
||||
|
||||
@@ -0,0 +1,110 @@
|
||||
"""Bundle the audit-walk-video + metadata into a ZIP for email attachment.
|
||||
|
||||
Backend hat kein direkten Zugriff auf das consent-tester-Volume, also
|
||||
laden wir das Video via HTTP vom consent-tester (Stufe-1-Endpoint).
|
||||
DSMS-CIDs sind im walk dict + werden zusätzlich in README.txt
|
||||
geschrieben, sodass der Empfänger das Original auch via IPFS-Gateway
|
||||
verifizieren kann.
|
||||
|
||||
Output: bytes (ZIP-stream) — ready für SMTP-attachment.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import json
|
||||
import logging
|
||||
import zipfile
|
||||
|
||||
import httpx
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _readme(walk: dict) -> str:
|
||||
wid = walk.get("walk_id") or "?"
|
||||
url = walk.get("url") or "?"
|
||||
started = walk.get("started_at") or "?"
|
||||
completed = walk.get("completed_at") or "?"
|
||||
video = walk.get("video") or {}
|
||||
sha = video.get("sha256") or "?"
|
||||
size = video.get("size_bytes") or 0
|
||||
video_cid = (video.get("dsms") or {}).get("cid") or "—"
|
||||
meta_cid = (walk.get("walk_json_dsms") or {}).get("cid") or "—"
|
||||
nav = sum(1 for a in walk.get("actions") or []
|
||||
if a.get("action") == "navigate")
|
||||
accs = sum((a.get("expanded") or 0) for a in walk.get("actions") or []
|
||||
if a.get("action") == "expand_accordions")
|
||||
return f"""BreakPilot Compliance — Audit-Walk-Beweis-Paket
|
||||
|
||||
Walk-ID: {wid}
|
||||
Site: {url}
|
||||
Aufgenommen: {started} → {completed}
|
||||
Engine: Playwright WebKit (Mobile-Viewport 1280×800)
|
||||
|
||||
Inhalt dieses Pakets:
|
||||
- video.webm {size:,} Bytes, SHA-256 {sha[:32]}…
|
||||
- walk.json Action-Index mit UTC-Timestamps pro Schritt
|
||||
- README.txt diese Datei
|
||||
|
||||
Walk-Statistik:
|
||||
- {nav} Compliance-Seiten besucht (Datenschutz, Impressum, AGB, ...)
|
||||
- {accs} Akkordeon-/Details-Sektionen automatisch entfaltet
|
||||
|
||||
DSMS-Anker (IPFS, manipulationssicher):
|
||||
Video: {video_cid}
|
||||
walk.json: {meta_cid}
|
||||
|
||||
Zur Verifikation:
|
||||
1. Lade das Original via https://dsms-dev.breakpilot.ai/ipfs/<CID>
|
||||
2. Vergleiche SHA-256 mit obigem Hash
|
||||
3. Öffne video.webm in einem modernen Browser (VLC / Chrome)
|
||||
4. Lies walk.json um die Klick-Sequenz nachzuvollziehen
|
||||
"""
|
||||
|
||||
|
||||
def build_audit_walk_zip(
|
||||
walk: dict,
|
||||
consent_tester_url: str = "http://bp-compliance-consent-tester:8094",
|
||||
) -> bytes:
|
||||
"""Fetch video from consent-tester + bundle with walk.json + README."""
|
||||
wid = walk.get("walk_id") or ""
|
||||
if not wid:
|
||||
return b""
|
||||
|
||||
# Pull video binary from consent-tester (Stufe 1 endpoint)
|
||||
video_bytes = b""
|
||||
try:
|
||||
with httpx.Client(timeout=60.0) as c:
|
||||
r = c.get(f"{consent_tester_url}/audit-walks/{wid}/video.webm")
|
||||
if r.status_code == 200:
|
||||
video_bytes = r.content
|
||||
except Exception as e:
|
||||
logger.warning("audit-walk video fetch failed: %s", e)
|
||||
|
||||
walk_json_bytes = json.dumps(walk, indent=2, ensure_ascii=False).encode(
|
||||
"utf-8",
|
||||
)
|
||||
readme_bytes = _readme(walk).encode("utf-8")
|
||||
|
||||
# Annotierte Screenshots pro Finding (Stufe 5)
|
||||
import base64
|
||||
annotations = walk.get("annotations") or []
|
||||
|
||||
buf = io.BytesIO()
|
||||
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as z:
|
||||
if video_bytes:
|
||||
z.writestr("video.webm", video_bytes)
|
||||
z.writestr("walk.json", walk_json_bytes)
|
||||
z.writestr("README.txt", readme_bytes)
|
||||
for a in annotations:
|
||||
fname = a.get("filename") or ""
|
||||
b64 = a.get("png_b64") or ""
|
||||
if not fname or not b64:
|
||||
continue
|
||||
try:
|
||||
z.writestr(f"findings/{fname}", base64.b64decode(b64))
|
||||
except Exception as e:
|
||||
logger.warning("annotation %s write failed: %s",
|
||||
fname, e)
|
||||
return buf.getvalue()
|
||||
@@ -333,13 +333,27 @@ async def verify_plausibility(results, doc_texts: dict[str, str]) -> None:
|
||||
logger.info("plausibility-check: %d findings across %d docs",
|
||||
total, len(by_doc))
|
||||
|
||||
# Circuit-Breaker gegen Ollama-Total-Down: nach N consecutive
|
||||
# batches mit 0 stamped → ganze Phase abbrechen (statt 200 calls
|
||||
# warten). Wert konservativ: 6 consecutive empties = qwen3 ist
|
||||
# offensichtlich nicht in der Lage zu antworten.
|
||||
consecutive_empty_budget = int(
|
||||
os.getenv("PLAUSIBILITY_EMPTY_BUDGET", "6"),
|
||||
)
|
||||
consecutive_empty = 0
|
||||
breaker_tripped = False
|
||||
|
||||
for dt, checks in by_doc.items():
|
||||
if breaker_tripped:
|
||||
break
|
||||
doc_title = by_doc_meta.get(dt) or dt
|
||||
doc_text = doc_texts.get(dt) or ""
|
||||
if not doc_text:
|
||||
# Fall back to DSE excerpt when the doc has no own text
|
||||
doc_text = doc_texts.get("dse") or ""
|
||||
for i in range(0, len(checks), BATCH_SIZE):
|
||||
if breaker_tripped:
|
||||
break
|
||||
batch = checks[i:i + BATCH_SIZE]
|
||||
items = []
|
||||
for c in batch:
|
||||
@@ -396,5 +410,19 @@ async def verify_plausibility(results, doc_texts: dict[str, str]) -> None:
|
||||
stamped += 1
|
||||
except Exception:
|
||||
pass
|
||||
# Circuit-Breaker: stamped=0 zählt als consecutive_empty.
|
||||
# Ausnahme: wenn ALLE items aus dem _CACHE kamen, ist 0 OK
|
||||
# (kein neuer LLM-Call gemacht).
|
||||
if uncached_items and stamped == 0:
|
||||
consecutive_empty += 1
|
||||
if consecutive_empty >= consecutive_empty_budget:
|
||||
logger.warning(
|
||||
"plausibility circuit-breaker tripped after "
|
||||
"%d consecutive empty batches — aborting phase",
|
||||
consecutive_empty,
|
||||
)
|
||||
breaker_tripped = True
|
||||
elif stamped > 0:
|
||||
consecutive_empty = 0
|
||||
logger.info("plausibility-check %s: batch %d → %d stamped",
|
||||
dt, len(batch), stamped)
|
||||
|
||||
@@ -10,6 +10,7 @@ from fastapi.responses import FileResponse
|
||||
from pydantic import BaseModel
|
||||
|
||||
from services.audit_walk_recorder import WALK_ROOT, record_audit_walk
|
||||
from services.finding_annotator import annotate_findings
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@@ -20,6 +21,11 @@ class AuditWalkReq(BaseModel):
|
||||
max_links: int = 8
|
||||
|
||||
|
||||
class AnnotateReq(BaseModel):
|
||||
findings: list[dict]
|
||||
home_url: str
|
||||
|
||||
|
||||
@router.post("/scan-audit-walk")
|
||||
async def scan_audit_walk(req: AuditWalkReq) -> dict:
|
||||
if not req.url or not req.url.startswith(("http://", "https://")):
|
||||
@@ -51,3 +57,12 @@ async def serve_walk_meta(walk_id: str):
|
||||
if not path.exists():
|
||||
raise HTTPException(404, "walk.json not found")
|
||||
return FileResponse(str(path), media_type="application/json")
|
||||
|
||||
|
||||
@router.post("/annotate-findings")
|
||||
async def annotate_findings_route(req: AnnotateReq) -> dict:
|
||||
"""Produce annotated screenshots per finding."""
|
||||
if not req.home_url.startswith(("http://", "https://")):
|
||||
raise HTTPException(400, "invalid home_url")
|
||||
out = await annotate_findings(req.findings, req.home_url)
|
||||
return {"annotations": out, "count": len(out)}
|
||||
|
||||
@@ -0,0 +1,155 @@
|
||||
"""Cookie-Banner-Tour vor dem Accept-Klick.
|
||||
|
||||
Öffnet im Banner Settings/Mehr-Optionen, scrollt vertikal, aktiviert
|
||||
jeden Tab, expandet jedes Akkordeon — sodass das Walk-Video den
|
||||
vollen Consent-Katalog (Vendoren, Zwecke, Speicherdauern) zeigt.
|
||||
|
||||
Wird vom audit_walk_recorder vor `_try_accept_banner` aufgerufen.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Tour-Trigger: Öffne im Banner zuerst die Detail-Ansicht /
|
||||
# Einstellungen — DORT findet sich die Vendor-Liste und die Kategorie-
|
||||
# Klappmenüs.
|
||||
_TOUR_OPEN_PHRASES = (
|
||||
"einstellungen", "individuelle einstellungen",
|
||||
"cookie-einstellungen", "anpassen", "details", "details anzeigen",
|
||||
"mehr optionen", "auswahl anpassen", "verwalten",
|
||||
"konfigurieren", "manage", "customize", "manage cookies",
|
||||
"manage preferences", "settings", "show purposes",
|
||||
"show vendors", "show details", "purposes", "vendors",
|
||||
)
|
||||
|
||||
# Banner-Akkordeon/Tab-Pattern: was wir IM Banner aufklappen wollen.
|
||||
_BANNER_EXPAND_SELECTORS = (
|
||||
"[role=tab]",
|
||||
"[role=tablist] [role=tab]",
|
||||
"[role=tabpanel] button",
|
||||
"[aria-expanded='false']",
|
||||
"button[data-toggle='collapse']",
|
||||
"summary",
|
||||
"details:not([open]) > summary",
|
||||
"[class*=accordion] button",
|
||||
"[class*=expand] button",
|
||||
".purposes-list button",
|
||||
".vendor-list button",
|
||||
"[data-testid*='expand']",
|
||||
"[data-testid*='vendor']",
|
||||
"[data-testid*='purpose']",
|
||||
)
|
||||
|
||||
|
||||
def _ts() -> str:
|
||||
return datetime.now(timezone.utc).isoformat()
|
||||
|
||||
|
||||
async def tour_cookie_banner(page, max_clicks: int = 35,
|
||||
dwell_ms: int = 700) -> dict:
|
||||
"""BEFORE accepting the banner: open Settings, scroll through it,
|
||||
expand every category / vendor / accordion / tab inside.
|
||||
|
||||
Strategie:
|
||||
1. Suche Buttons mit Tour-Phrases ("Einstellungen", "Vendor" etc.)
|
||||
— klicken erste Detail-Ansicht.
|
||||
2. Scrolle Banner vertikal (sodass mehr Inhalte rendern).
|
||||
3. Aktiviere jedes [role=tab] nacheinander.
|
||||
4. Expand jedes [aria-expanded=false] / details / summary im
|
||||
Banner-Container.
|
||||
|
||||
Alles im Video aufgezeichnet — Reviewer sieht den vollen Vendor-
|
||||
und Zweck-Katalog. Best-Effort; jeder Klick mit try/except.
|
||||
"""
|
||||
started = _ts()
|
||||
clicks = 0
|
||||
notes: list[str] = []
|
||||
|
||||
# Phase 1: open "Settings" / "Mehr Optionen"
|
||||
settings_opened = False
|
||||
for phrase in _TOUR_OPEN_PHRASES:
|
||||
try:
|
||||
btn = page.get_by_role("button", name=phrase, exact=False).first
|
||||
if await btn.count() > 0:
|
||||
await btn.scroll_into_view_if_needed(timeout=1500)
|
||||
await btn.click(timeout=2000)
|
||||
await page.wait_for_timeout(1500)
|
||||
notes.append(f"opened: {phrase}")
|
||||
settings_opened = True
|
||||
clicks += 1
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
if not settings_opened:
|
||||
try:
|
||||
link = page.get_by_role("link", name=phrase,
|
||||
exact=False).first
|
||||
if await link.count() > 0:
|
||||
await link.scroll_into_view_if_needed(timeout=1500)
|
||||
await link.click(timeout=2000)
|
||||
await page.wait_for_timeout(1500)
|
||||
notes.append(f"opened (link): {phrase}")
|
||||
settings_opened = True
|
||||
clicks += 1
|
||||
break
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
# Phase 2: scroll the banner / dialog to expose more content
|
||||
try:
|
||||
await page.mouse.wheel(0, 600)
|
||||
await page.wait_for_timeout(500)
|
||||
await page.mouse.wheel(0, 600)
|
||||
await page.wait_for_timeout(500)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Phase 3: activate every [role=tab] in any role=tablist
|
||||
try:
|
||||
tabs = await page.query_selector_all("[role=tab]")
|
||||
for tab in tabs:
|
||||
if clicks >= max_clicks:
|
||||
break
|
||||
try:
|
||||
await tab.scroll_into_view_if_needed(timeout=1000)
|
||||
await tab.click(timeout=1200)
|
||||
await page.wait_for_timeout(dwell_ms)
|
||||
clicks += 1
|
||||
except Exception:
|
||||
continue
|
||||
if tabs:
|
||||
notes.append(f"tabs activated: {len(tabs)}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Phase 4: expand every aria-expanded=false / details / summary
|
||||
for sel in _BANNER_EXPAND_SELECTORS:
|
||||
if clicks >= max_clicks:
|
||||
break
|
||||
try:
|
||||
els = await page.query_selector_all(sel)
|
||||
except Exception:
|
||||
continue
|
||||
for el in els:
|
||||
if clicks >= max_clicks:
|
||||
break
|
||||
try:
|
||||
await el.scroll_into_view_if_needed(timeout=800)
|
||||
await el.click(timeout=1000)
|
||||
await page.wait_for_timeout(dwell_ms)
|
||||
clicks += 1
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
return {
|
||||
"timestamp": started,
|
||||
"action": "tour_cookie_banner",
|
||||
"clicks": clicks,
|
||||
"settings_opened": settings_opened,
|
||||
"notes": notes,
|
||||
}
|
||||
@@ -64,6 +64,8 @@ _ACCEPT_PHRASES = (
|
||||
"ok", "verstanden",
|
||||
)
|
||||
|
||||
from .audit_walk_banner_tour import tour_cookie_banner
|
||||
|
||||
|
||||
def _ts() -> str:
|
||||
return datetime.now(timezone.utc).isoformat()
|
||||
@@ -311,6 +313,12 @@ async def record_audit_walk(
|
||||
actions[-1]["error"] = str(e)[:200]
|
||||
await page.wait_for_timeout(2000)
|
||||
|
||||
# NEU (Stufe 4): Banner-Tour VOR Accept — Vendor-Liste,
|
||||
# Klappmenüs, Tabs durchklicken sodass Reviewer den
|
||||
# vollen Consent-Inhalt im Video sieht.
|
||||
tour_event = await tour_cookie_banner(page)
|
||||
actions.append(tour_event)
|
||||
|
||||
accept_event = await _try_accept_banner(page)
|
||||
actions.append(accept_event)
|
||||
|
||||
|
||||
@@ -0,0 +1,183 @@
|
||||
"""Finding-Annotator — pro Befund ein PNG mit Marker.
|
||||
|
||||
Erzeugt für die Audit-Mail einen visuellen Beweis pro Finding:
|
||||
roter Rand um das verletzende Element + Label-Banner darüber.
|
||||
|
||||
Aktuell unterstützte Annotation-Typen:
|
||||
- "tap_target_too_small" B1 Mobile-Reachability — markiert den
|
||||
Footer-Anchor mit Größenanforderung
|
||||
- "slug_404" B16 URL-Slug-Drift — Screenshot des
|
||||
404-Slugs mit "Standard-Slug 404"
|
||||
- "missing_link_in_footer" B13 Widerruf — Footer-Screenshot mit
|
||||
"→ Widerruf-Link erwartet, fehlt"
|
||||
|
||||
Output: bytes (PNG). Best-Effort: Fehler beim Rendern werfen nicht,
|
||||
sondern returnen leere bytes (caller dropt das Asset).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Banner-Style für Label oben im Bild (rot mit weißer Schrift).
|
||||
_LABEL_JS = """
|
||||
(label) => {
|
||||
const div = document.createElement('div');
|
||||
div.style.cssText = `
|
||||
position:fixed;top:0;left:0;right:0;z-index:2147483647;
|
||||
background:#dc2626;color:#fff;padding:14px 24px;
|
||||
font:bold 18px/1.2 -apple-system,sans-serif;
|
||||
box-shadow:0 4px 12px rgba(0,0,0,0.4);
|
||||
border-bottom:3px solid #7f1d1d;text-align:center;
|
||||
`;
|
||||
div.textContent = label;
|
||||
document.body.appendChild(div);
|
||||
}
|
||||
"""
|
||||
|
||||
# JS zum Markieren eines Elements: roter Rand + 0.4-Schatten innen.
|
||||
_MARK_JS = """
|
||||
(args) => {
|
||||
const sel = args.selector;
|
||||
const text = args.text;
|
||||
let el = null;
|
||||
if (sel) {
|
||||
try { el = document.querySelector(sel); } catch(e) {}
|
||||
}
|
||||
if (!el && text) {
|
||||
const all = document.querySelectorAll('a, button, [role=button]');
|
||||
for (const c of all) {
|
||||
if ((c.innerText || '').trim().toLowerCase()
|
||||
.includes(text.toLowerCase())) {
|
||||
el = c; break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!el) return false;
|
||||
el.scrollIntoView({behavior:'instant', block:'center'});
|
||||
el.style.outline = '4px solid #dc2626';
|
||||
el.style.outlineOffset = '4px';
|
||||
el.style.boxShadow = 'inset 0 0 0 9999px rgba(220,38,38,0.15)';
|
||||
return true;
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
async def annotate_url(
|
||||
url: str,
|
||||
label: str,
|
||||
selector: str | None = None,
|
||||
text_match: str | None = None,
|
||||
viewport_w: int = 1280,
|
||||
viewport_h: int = 800,
|
||||
timeout_s: float = 20.0,
|
||||
) -> bytes:
|
||||
"""Open `url`, mark element by selector or text-match, save PNG."""
|
||||
try:
|
||||
from playwright.async_api import async_playwright
|
||||
except Exception as e:
|
||||
logger.warning("playwright unavailable: %s", e)
|
||||
return b""
|
||||
|
||||
async with async_playwright() as p:
|
||||
try:
|
||||
browser = await p.webkit.launch(headless=True)
|
||||
context = await browser.new_context(
|
||||
viewport={"width": viewport_w, "height": viewport_h},
|
||||
locale="de-DE",
|
||||
)
|
||||
page = await context.new_page()
|
||||
try:
|
||||
await page.goto(url, wait_until="domcontentloaded",
|
||||
timeout=timeout_s * 1000)
|
||||
except Exception as e:
|
||||
logger.info("annotate_url goto failed for %s: %s",
|
||||
url, e)
|
||||
await page.wait_for_timeout(1500)
|
||||
await page.evaluate(_LABEL_JS, label)
|
||||
if selector or text_match:
|
||||
try:
|
||||
await page.evaluate(
|
||||
_MARK_JS,
|
||||
{"selector": selector or "",
|
||||
"text": text_match or ""},
|
||||
)
|
||||
except Exception as e:
|
||||
logger.info("annotate mark failed: %s", e)
|
||||
await page.wait_for_timeout(500)
|
||||
png = await page.screenshot(full_page=False, type="png")
|
||||
await context.close()
|
||||
await browser.close()
|
||||
return png
|
||||
except Exception as e:
|
||||
logger.warning("annotate_url failed: %s", e)
|
||||
return b""
|
||||
|
||||
|
||||
async def annotate_findings(
|
||||
findings: list[dict],
|
||||
home_url: str,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Per finding produce a PNG. Returns list of {filename, png_b64}.
|
||||
|
||||
Supported finding shapes:
|
||||
- {check_id: 'COOKIE-CONSENT-UX-001', mobile_playwright:
|
||||
{anchor_text: '...'}}
|
||||
- {check_id: 'URL-SLUG-DRIFT-001', alt_slugs_404: [...],
|
||||
doc_type: '...', source_url: '...'} (Stufe-1: nimmt source_url
|
||||
aus state, hier optional)
|
||||
- {check_id: 'WIDERRUF-REACH-001'} → Footer-Screenshot der
|
||||
Homepage mit "→ Widerruf-Link fehlt"
|
||||
|
||||
Skip silently if a finding doesn't match a supported shape.
|
||||
"""
|
||||
out: list[dict[str, Any]] = []
|
||||
for f in findings:
|
||||
cid = (f.get("check_id") or "").upper()
|
||||
png = b""
|
||||
fname = ""
|
||||
if cid == "COOKIE-CONSENT-UX-001":
|
||||
mob = f.get("mobile_playwright") or {}
|
||||
anchor_text = mob.get("anchor_text") or ""
|
||||
if anchor_text:
|
||||
tp = mob.get("tap_target_px") or {}
|
||||
label = (
|
||||
f"B1 — Tap-Target {tp.get('w','?')}×{tp.get('h','?')} "
|
||||
"px (Anforderung ≥ 44×44)"
|
||||
)
|
||||
png = await annotate_url(
|
||||
home_url, label, text_match=anchor_text,
|
||||
)
|
||||
fname = f"finding-tap-target.png"
|
||||
elif cid == "URL-SLUG-DRIFT-001":
|
||||
alts = f.get("alt_slugs_404") or []
|
||||
if alts:
|
||||
# Pick first 404-slug + screenshot the 404 page
|
||||
doc_type = f.get("doc_type") or "x"
|
||||
from urllib.parse import urlparse
|
||||
p = urlparse(home_url)
|
||||
origin = f"{p.scheme}://{p.netloc}"
|
||||
slug_url = f"{origin}/{alts[0]}"
|
||||
label = (
|
||||
f"B16 — Standard-Slug 404 ({doc_type}): /{alts[0]}"
|
||||
)
|
||||
png = await annotate_url(slug_url, label)
|
||||
fname = f"finding-slug-404-{doc_type}.png"
|
||||
elif cid == "WIDERRUF-REACH-001":
|
||||
label = "B13 — Widerrufsbelehrung im Footer erwartet, fehlt"
|
||||
png = await annotate_url(
|
||||
home_url, label, selector="footer",
|
||||
)
|
||||
fname = "finding-widerruf-missing.png"
|
||||
|
||||
if png and fname:
|
||||
out.append({
|
||||
"filename": fname,
|
||||
"png_b64": base64.b64encode(png).decode("ascii"),
|
||||
})
|
||||
return out
|
||||
@@ -117,6 +117,8 @@ services:
|
||||
DECOMPOSITION_LLM_MODEL: ${DECOMPOSITION_LLM_MODEL:-claude-haiku-4-5-20251001}
|
||||
SMTP_HOST: ${SMTP_HOST:-bp-core-mailpit}
|
||||
SMTP_PORT: ${SMTP_PORT:-1025}
|
||||
# P125: V2-Mail-Layout default aktiv (B-Wirings sichtbar)
|
||||
MAIL_RENDER_V2: ${MAIL_RENDER_V2:-true}
|
||||
SMTP_USERNAME: ${SMTP_USERNAME:-}
|
||||
SMTP_PASSWORD: ${SMTP_PASSWORD:-}
|
||||
SMTP_FROM_NAME: ${SMTP_FROM_NAME:-BreakPilot Compliance}
|
||||
|
||||
Reference in New Issue
Block a user