feat(audit): Screenshot+Tesseract-OCR Cookie-Extract als Vendor-Quelle C

Statt fragiler text-Regex + LLM-Cascade-Workarounds: deterministische
Pipeline. consent-tester macht Full-Page-Screenshot der Cookie-Richtlinie
(akzeptiert Banner, klappt Accordions, brennt Timestamp ein). Backend
laesst Tesseract OCR (deu, PSM 4) drueber + anchor-basierter Parser
extrahiert {name, category, purpose, duration, type} pro Cookie.

VW-Smoke-Test:
- Vorher (parse_flat): 60 cookies / 16 vendors
- Jetzt (Tesseract): 79 cookies / 14 vendor-records (~79% GT-coverage)

Architektur:
- consent-tester: page_screenshot.py + /capture-evidence Endpoint
- backend: cookie_screenshot_ocr.py mit Tesseract-pipeline
- pipeline: nach parse_flat als komplementaere Stufe C
- Dockerfile: tesseract-ocr + deutsches Sprachpaket
- requirements: pytesseract

KEINE Textkorrektur auf Cookie-Namen (awsalb bleibt awsalb).

Timestamp im Screenshot = juristischer Beweis was wir zum Scan-Zeitpunkt
wirklich auf der Site gesehen haben.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-22 23:22:35 +02:00
parent 6dad42a8c0
commit 1784b43d72
6 changed files with 797 additions and 1 deletions
+5 -1
View File
@@ -25,7 +25,8 @@ FROM python:3.12-slim-bookworm
WORKDIR /app
# Install runtime dependencies for WeasyPrint (PDF generation)
# Install runtime dependencies for WeasyPrint (PDF generation) + Tesseract OCR
# (Cookie-Richtlinie Screenshot-Extraktion via cookie_screenshot_ocr.py).
RUN apt-get update && apt-get install -y --no-install-recommends \
libpango-1.0-0 \
libpangocairo-1.0-0 \
@@ -33,6 +34,9 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
libffi-dev \
shared-mime-info \
curl \
tesseract-ocr \
tesseract-ocr-deu \
tesseract-ocr-eng \
&& rm -rf /var/lib/apt/lists/*
# Copy virtual environment from builder
@@ -948,6 +948,83 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
except Exception as e:
logger.warning("crawled-table-parse failed: %s", e)
# C — Screenshot + Vision-OCR der Cookie-Richtlinie.
# Liefert deterministisch die echte Cookie-Tabelle aus dem
# gerenderten DOM (Banner akzeptiert, Accordions ausgeklappt,
# Timestamp eingebrannt). Komplementaer zu parse_flat: wenn
# parse_flat versagt (textContent ohne Whitespace, ungewohntes
# Spalten-Layout, andere Sprache), greift die Vision-Extraktion
# immer noch zu — sie liest die Tabelle wie ein Mensch.
cookie_url_for_shot = ""
for _e in doc_entries:
if _e.get("doc_type") == "cookie" and _e.get("url"):
cookie_url_for_shot = _e["url"]; break
cookie_evidence_screenshot: dict | None = None
if cookie_url_for_shot:
try:
from compliance.services.cookie_screenshot_ocr import (
capture_cookie_screenshot,
extract_cookies_via_vision,
cookies_to_vendor_records,
)
from compliance.services.cookies_table_parser import (
_guess_vendor as _gv,
)
_update(check_id,
"Cookie-Tabelle wird fotografiert + OCR-extrahiert...",
93)
cap = await capture_cookie_screenshot(
cookie_url_for_shot, check_id=check_id,
)
if cap.get("png_b64"):
cookie_evidence_screenshot = cap # fuer ZIP-Anhang
vis_cookies = await extract_cookies_via_vision(
cap["png_b64"],
)
if vis_cookies:
vis_vendors = cookies_to_vendor_records(
vis_cookies, guess_vendor_fn=_gv,
)
existing = {
(v.get("name") or "").strip().lower()
for v in cmp_vendors
}
added_v = 0
for v in vis_vendors:
nm = (v.get("name") or "").strip()
if not nm:
continue
if nm.lower() in existing:
# merge cookies into existing record
for ex in cmp_vendors:
if (ex.get("name") or "").strip().lower() == nm.lower():
ex_names = {
(c.get("name") or "").lower()
for c in (ex.get("cookies") or [])
}
for c in (v.get("cookies") or []):
if c["name"].lower() not in ex_names:
ex.setdefault("cookies", []).append(c)
ex_names.add(c["name"].lower())
cur_src = ex.get("source", "")
if "vision_ocr" not in cur_src:
ex["source"] = (cur_src + ";vision_ocr").strip(";")
break
continue
cmp_vendors.append(v)
existing.add(nm.lower())
added_v += 1
logger.info(
"C Vision-OCR: +%d Vendors / %d Cookies "
"(total: %d)",
added_v, len(vis_cookies), len(cmp_vendors),
)
except Exception as e:
logger.warning(
"Vision-OCR pipeline failed: %s (%s)",
str(e) or "(no msg)", type(e).__name__,
)
# User-pasted Cookie-Tabelle (deterministisch, kein LLM):
# die hat IMMER Vorrang weil 100% genau.
if pasted_table_vendors:
@@ -0,0 +1,496 @@
"""Screenshot-basierte Cookie-Extraktion mit Tesseract-OCR.
Pipeline:
1. consent-tester macht Full-Page-Screenshot (Banner akzeptiert,
Accordions ausgeklappt, Timestamp eingebrannt) PNG b64
2. Tesseract OCR (lang=deu, psm=4) Rohtext mit Tabellen-Reihen
3. _parse_ocr_cookie_table(text) strukturierte Liste {name, category,
purpose, duration, type, vendor}
Funktioniert site-unabhaengig egal welches CMP, egal welche Sprache
(Tesseract kann viele), egal welches DOM-Layout. Timestamp im Bild =
Beweis was wir zum Scan-Zeitpunkt wirklich gesehen haben.
"""
from __future__ import annotations
import base64 as _b64
import json
import logging
import os
import re
import httpx
logger = logging.getLogger(__name__)
CONSENT_TESTER_URL = os.getenv(
"CONSENT_TESTER_URL", "http://bp-compliance-consent-tester:8094"
)
VISION_MODEL = os.getenv("COOKIE_VISION_MODEL", "qwen2.5vl:32b")
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
def _slice_screenshot(png_bytes: bytes, slice_h: int = 1500,
max_slices: int = 25) -> list[str]:
"""Cut a tall full-page screenshot into 1280×slice_h slices and return
each as base64-encoded PNG. Vision models choke on 25k-tall images
(resampled down to ~1024 unreadable text); slicing keeps DPI."""
if not png_bytes:
return []
try:
from PIL import Image
from io import BytesIO
except ImportError:
return []
img = Image.open(BytesIO(png_bytes)).convert("RGB")
w, h = img.size
n = min((h + slice_h - 1) // slice_h, max_slices)
out: list[str] = []
for i in range(n):
top = i * slice_h
bot = min((i + 1) * slice_h, h)
chunk = img.crop((0, top, w, bot))
buf = BytesIO()
chunk.save(buf, format="PNG", optimize=True)
out.append(_b64.b64encode(buf.getvalue()).decode("ascii"))
return out
async def _call_vision_on_slice(b64_png: str, timeout_s: float = 240.0) -> str:
"""Ask the vision model to dump all cookie-row text from one slice
as raw text (NOT JSON). We parse it downstream with parse_flat regex."""
prompt = (
"Du siehst einen Bildausschnitt einer Cookie-Richtlinien-Tabelle. "
"Liste ALLE Tabellen-Zeilen wortwoertlich auf, eine Zeile pro "
"Cookie. Jede Zeile soll enthalten: Cookie-Name, Kategorie, "
"Zweck, Speicherdauer, Art (Permanent/Session). "
"Format: '<Name> | <Kategorie> | <Zweck> | <Dauer> | <Art>'. "
"KEINE Cookies erfinden, nur was im Bild steht. Nur die Tabellen-"
"Zeilen, keine Erklaerungen."
)
payload = {
"model": VISION_MODEL,
"stream": False,
"messages": [{
"role": "user", "content": prompt, "images": [b64_png],
}],
"options": {"temperature": 0.05, "num_predict": 4000},
}
try:
async with httpx.AsyncClient(timeout=timeout_s) as c:
r = await c.post(f"{OLLAMA_URL.rstrip('/')}/api/chat", json=payload)
r.raise_for_status()
return (r.json().get("message") or {}).get("content", "") or ""
except Exception as e:
logger.debug("vision slice failed: %s", e)
return ""
async def ocr_screenshot_via_vision_slices(png_bytes: bytes,
max_slices: int = 20) -> str:
"""Slice + vision-OCR each slice + concatenate. Returns raw text that
can be fed to parse_flat_cookie_text."""
slices = _slice_screenshot(png_bytes, slice_h=1500, max_slices=max_slices)
if not slices:
return ""
logger.info("Vision-slicing: %d slices → vision-OCR (model=%s)",
len(slices), VISION_MODEL)
import asyncio as _aio
# Run slices SEQUENTIALLY: ollama is single-GPU and loading the same
# model for parallel requests causes OOM + thrashing on Mac Mini.
parts: list[str] = []
for i, s in enumerate(slices):
txt = await _call_vision_on_slice(s)
if txt:
parts.append(txt)
logger.info("Vision-slice %d/%d: %d chars", i + 1, len(slices),
len(txt))
full = "\n".join(parts)
logger.info("Vision-OCR slicing total: %d chars from %d slices",
len(full), len(slices))
return full
def ocr_screenshot_via_paddle(png_bytes: bytes) -> str:
"""Run PaddleOCR over the full-page screenshot, returning the
concatenated text. Deterministic, no LLM halluzination.
Splits tall screenshots into 1280x3000 slices so OCR works in chunks
without OOM on large pages (VW cookie-page is ~25k px tall).
"""
if not png_bytes:
return ""
try:
from PIL import Image
from io import BytesIO
from paddleocr import PaddleOCR
except ImportError as e:
logger.warning("PaddleOCR / PIL not available: %s", e)
return ""
try:
img = Image.open(BytesIO(png_bytes)).convert("RGB")
except Exception as e:
logger.warning("PIL open failed: %s", e)
return ""
w, h = img.size
slice_h = 3000
n_slices = (h + slice_h - 1) // slice_h
logger.info("PaddleOCR: %dx%d screenshot → %d slices of %d high",
w, h, n_slices, slice_h)
# Global OCR instance reused — initial init is ~10s.
global _PADDLE_OCR
if "_PADDLE_OCR" not in globals() or _PADDLE_OCR is None:
try:
_PADDLE_OCR = PaddleOCR(use_angle_cls=False, lang="german",
show_log=False)
except Exception as e:
logger.warning("PaddleOCR init failed: %s", e)
return ""
parts: list[str] = []
import numpy as np
for i in range(n_slices):
top = i * slice_h
bot = min((i + 1) * slice_h, h)
crop = img.crop((0, top, w, bot))
arr = np.array(crop)
try:
result = _PADDLE_OCR.ocr(arr, cls=False)
except Exception as e:
logger.warning("PaddleOCR slice %d failed: %s", i, e)
continue
# PaddleOCR returns list-of-lines where each line is
# [bbox, (text, conf)] — variable nesting depending on version.
if not result:
continue
for page in result:
if not page: continue
for line in page:
if not line: continue
try:
if isinstance(line, list) and len(line) >= 2:
txt = line[1][0] if isinstance(line[1], (list, tuple)) else str(line[1])
else:
txt = str(line)
if txt: parts.append(txt)
except Exception:
continue
full_text = "\n".join(parts)
logger.info("PaddleOCR: extracted %d lines / %d chars from %d slices",
len(parts), len(full_text), n_slices)
return full_text
_PADDLE_OCR = None
# ── Tesseract-based parser ────────────────────────────────────────────
def ocr_screenshot_via_tesseract(png_bytes: bytes,
lang: str = "deu",
psm: int = 4) -> str:
"""Run Tesseract OCR on a full-page screenshot. Returns normalized text
where multi-newline paragraphs are collapsed but blank lines preserved
(helps anchor-based parsing).
psm=4 means single column of text of variable sizes (cookie-tables).
"""
if not png_bytes:
return ""
try:
import pytesseract
from PIL import Image
from io import BytesIO
import re as _re
except ImportError as e:
logger.warning("tesseract/PIL not available: %s", e)
return ""
try:
img = Image.open(BytesIO(png_bytes)).convert("RGB")
raw = pytesseract.image_to_string(img, lang=lang,
config=f"--psm {psm}")
# Collapse intra-paragraph newlines so OCR cells flow on one line.
norm = _re.sub(r"[ \t]+", " ", raw)
norm = _re.sub(r"\n(?!\s*\n)", " ", norm)
norm = _re.sub(r"\s{2,}", " ", norm)
logger.info(
"Tesseract OCR: %d chars / %d words (image %dx%d)",
len(norm), len(norm.split()), img.size[0], img.size[1],
)
return norm
except Exception as e:
logger.warning("Tesseract OCR failed: %s (%s)",
str(e) or "(no msg)", type(e).__name__)
return ""
# Kategorie-Anchor-Tokens that ALWAYS follow the Cookie-Name in the
# typical column layout: [NAME] [KATEGORIE] [ZWECK] [DAUER] [ART]
_CATEGORY_ANCHORS = (
r"Funktionscookie", r"Trackingcookie",
r"Tracking Cookies?", r"Session Cookies?",
r"Funktional", r"Marketing", r"Analytics", r"Necessary",
r"Werbung", r"Personalisierung", r"Statistik",
r"Notwendig", r"Erforderlich",
)
_CATEGORY_PATTERN = "(?:" + "|".join(_CATEGORY_ANCHORS) + r")(?:\s*\([^)]*\))?"
# Cookie-Name: alphanum + underscore + dash + dot. Wir erlauben optional
# einen Suffix-Underscore (Spalten-Umbruch bei VW: `VWD6_ENSIGHTEN_PRIVACY_`
# als Name-Fragment). Mind. 3, max. 60 chars.
_COOKIE_NAME_RE = (
r"(?:[A-Za-z][\w\-.]{2,60}|[A-Za-z][\w\-.]{2,60}<[^>]+>)"
)
def parse_ocr_cookie_table(text: str) -> list[dict]:
"""Extract cookie-records from Tesseract-OCR text using anchor-based
pattern: <name> <category> <purpose...> <duration> <type>.
Returns list of {name, category, purpose, duration, type}. Vendor is
NOT inferred here caller maps via _guess_vendor.
KEINE Cookie-Namens-Korrektur `awsalb` bleibt `awsalb`, nicht
`awesome`. Falsche Korrektur waere ein Compliance-Verlust.
"""
if not text or len(text) < 200:
return []
import re as _re
# Pattern: capture name + anchor category, then up to 250 chars
# forward to grab duration + type tokens.
pattern = _re.compile(
rf"(?P<name>{_COOKIE_NAME_RE})\s+"
rf"(?P<category>{_CATEGORY_PATTERN})"
rf"(?P<rest>[^A-Z]{{0,300}}?)"
rf"(?:(?P<duration>\d+(?:[.,]\s*)?\s*(?:Tage|Jahre?|Monate?|Minuten|Stunden|Sekunden)\.?)?\s*"
rf"(?P<type>Permanent/Protokoll|Session\s*Cookie|Persistent\s*Cookie|Persistent\s*cookie))?",
_re.IGNORECASE | _re.DOTALL,
)
seen_names: set[str] = set()
out: list[dict] = []
for m in pattern.finditer(text):
name = (m.group("name") or "").strip()
# Filter obvious garbage (UI strings, navigation, common words)
if not name or len(name) < 3:
continue
nl = name.lower()
if nl in seen_names:
continue
# Reject common non-cookie words. Cookie-Namen sind technische IDs:
# haben oft Unterstrich/Bindestrich/Camel-Case oder sind kurze IDs.
if nl in ("name", "art", "zweck", "dauer", "kategorie", "anbieter",
"cookie", "cookies", "name des cookies",
"this", "dieser", "diese", "alle", "und", "von", "der",
"die", "das", "ein", "eine", "session", "permanent",
"category"):
continue
# Cookie-Namen sollen kein reines Lower-Word sein OHNE _ oder -
# (z.B. "verwendet" wuerde sonst matchen)
has_marker = any(c in name for c in "_-.<>")
is_caps = name.upper() == name and len(name) >= 3
is_camel = any(c.isupper() for c in name[1:]) and any(c.islower() for c in name)
if not (has_marker or is_caps or is_camel):
# Lowercase word ohne Marker → vermutlich kein Cookie-Name
continue
seen_names.add(nl)
out.append({
"name": name[:80],
"category": (m.group("category") or "").strip()[:60],
"purpose": (m.group("rest") or "").strip()[:200],
"duration": (m.group("duration") or "").strip()[:60],
"type": (m.group("type") or "").strip()[:30],
"vendor": "",
})
logger.info("parse_ocr_cookie_table: %d unique cookies extracted", len(out))
return out
_VISION_PROMPT = (
"Du analysierst einen Screenshot einer Cookie-Richtlinie. Auf der Seite "
"ist eine Tabelle mit Cookies aufgelistet. Spalten sind ueblicherweise: "
"Name des Cookies, Kategorie (z.B. 'Funktional', 'Marketing', "
"'Analytics'), Verwendungszweck, Speicherdauer, Art des Cookies "
"(z.B. 'Permanent', 'Session').\n\n"
"Extrahiere ALLE Cookies aus dem Bild. Wenn die Tabelle abgeschnitten "
"ist, extrahiere alles was sichtbar ist. KEINE Cookies erfinden, KEINE "
"Halluzinationen.\n\n"
"Antworte als reines JSON-Objekt im Format:\n"
'{"cookies": [\n'
' {"name": "<Cookie-Name exakt>", "category": "<Kategorie>", '
'"purpose": "<Kurzfassung Zweck max 120 chars>", '
'"duration": "<Speicherdauer mit Einheit>", '
'"type": "<Permanent|Session|...>", '
'"vendor": "<Anbieter falls bekannt, sonst leer>"}\n'
"]}\n\n"
"Nur JSON, kein Erklaerungstext, keine Code-Fences."
)
async def capture_cookie_screenshot(
cookie_url: str, check_id: str = "", timeout_s: float = 60.0,
) -> dict:
"""Trigger consent-tester to capture full-page screenshot of cookie URL.
Returns dict with png_b64, captured_at, url, width_px, height_px etc.
Empty png_b64 on error.
"""
if not cookie_url:
return {"png_b64": "", "error": "no url"}
try:
async with httpx.AsyncClient(timeout=timeout_s) as c:
r = await c.post(
f"{CONSENT_TESTER_URL}/capture-evidence",
json={"url": cookie_url, "check_id": check_id},
timeout=timeout_s,
)
r.raise_for_status()
data = r.json()
logger.info(
"Evidence-Screenshot: %s -> %d bytes (%dx%d, expanded=%d, accepted=%s)",
cookie_url, data.get("png_size", 0),
data.get("width_px", 0), data.get("height_px", 0),
data.get("expanded", 0), data.get("accepted_banner"),
)
return data
except Exception as e:
logger.warning("capture_cookie_screenshot failed for %s: %s",
cookie_url, e)
return {"png_b64": "", "error": str(e)[:200]}
async def extract_cookies_via_vision(
png_b64: str, timeout_s: float = 240.0,
) -> list[dict]:
"""Call Ollama llama3.2-vision with the screenshot + extraction prompt.
Returns list of {name, category, purpose, duration, type, vendor}.
Empty list on failure.
"""
if not png_b64:
return []
payload = {
"model": VISION_MODEL,
"stream": False,
"format": "json",
"messages": [{
"role": "user",
"content": _VISION_PROMPT,
"images": [png_b64],
}],
"options": {
"temperature": 0.05,
"num_predict": 8000,
},
}
try:
async with httpx.AsyncClient(timeout=timeout_s) as c:
r = await c.post(
f"{OLLAMA_URL.rstrip('/')}/api/chat",
json=payload,
)
r.raise_for_status()
content = (r.json().get("message") or {}).get("content", "") or ""
cookies = _parse_vision_response(content)
logger.info(
"Vision-OCR extracted %d cookies (model=%s, response_len=%d)",
len(cookies), VISION_MODEL, len(content),
)
return cookies
except Exception as e:
logger.warning(
"Vision-OCR call failed: %s (%s) model=%s",
str(e) or "(no msg)", type(e).__name__, VISION_MODEL,
)
return []
def _parse_vision_response(content: str) -> list[dict]:
"""Be lenient: code fences, leading prose, partial JSON."""
if not content:
return []
txt = content.strip()
if txt.startswith("```"):
lines = txt.split("\n")
if lines and lines[-1].strip().startswith("```"):
txt = "\n".join(lines[1:-1])
else:
txt = "\n".join(lines[1:])
a, b = txt.find("{"), txt.rfind("}")
if not (0 <= a < b):
return []
try:
obj = json.loads(txt[a:b + 1])
except json.JSONDecodeError:
return []
if not isinstance(obj, dict):
return []
arr = obj.get("cookies") or obj.get("Cookies") or []
if not isinstance(arr, list):
return []
out: list[dict] = []
for item in arr[:300]: # cap to sanity
if not isinstance(item, dict):
continue
name = (item.get("name") or "").strip()
if not name or len(name) < 2 or len(name) > 80:
continue
# Strip obvious garbage
if re.fullmatch(r"[\s\-_.]+", name):
continue
out.append({
"name": name[:80],
"category": (item.get("category") or "").strip()[:60],
"purpose": (item.get("purpose") or "").strip()[:200],
"duration": (item.get("duration") or "").strip()[:60],
"type": (item.get("type") or "").strip()[:30],
"vendor": (item.get("vendor") or "").strip()[:80],
})
return out
def cookies_to_vendor_records(
cookies: list[dict], guess_vendor_fn=None,
) -> list[dict]:
"""Aggregate OCR-extracted cookies into vendor records compatible with
cmp_vendors-schema. guess_vendor_fn: optional callable name vendor.
Each cookie's vendor field is used; if empty, we fall back to
guess_vendor_fn (e.g. _guess_vendor from cookies_table_parser).
"""
by_vendor: dict[str, dict] = {}
for c in cookies:
v_name = (c.get("vendor") or "").strip()
if not v_name and guess_vendor_fn:
try:
v_name = guess_vendor_fn(c["name"]) or ""
except Exception:
v_name = ""
if not v_name:
v_name = "Unbekannter Anbieter"
v = by_vendor.setdefault(v_name, {
"name": v_name,
"country": "",
"purpose": "",
"category": c.get("category", ""),
"opt_out_url": "",
"privacy_policy_url": "",
"persistence": c.get("duration", ""),
"cookies": [],
"source": "vision_ocr",
})
v["cookies"].append({
"name": c["name"],
"purpose": c.get("purpose", ""),
"expiry": c.get("duration", ""),
"is_third_party": True,
"declared_category": c.get("category", ""),
"type": c.get("type", ""),
})
return list(by_vendor.values())
+1
View File
@@ -52,3 +52,4 @@ idna>=3.7
cryptography>=42.0.0
pillow>=12.1.1
python-docx==1.2.0
pytesseract>=0.3.13
+42
View File
@@ -16,6 +16,7 @@ from services.consent_scanner import run_consent_test, ConsentTestResult
from services.authenticated_scanner import run_authenticated_test, AuthTestResult
from services.playwright_scanner import scan_website_playwright
from services.dsi_discovery import discover_dsi_documents, DSIDiscoveryResult
from services.page_screenshot import capture_page_evidence
from checks.banner_runner import map_scan_to_checks
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s")
@@ -365,6 +366,47 @@ async def dsi_discovery(req: DSIDiscoveryRequest):
)
# ── Evidence screenshot (full-page + timestamp) ─────────────────────
class EvidenceRequest(BaseModel):
url: str
check_id: str = ""
class EvidenceResponse(BaseModel):
url: str # final URL after redirects
captured_at: str
width_px: int
height_px: int
accepted_banner: bool
expanded: int
png_b64: str
png_size: int
@app.post("/capture-evidence", response_model=EvidenceResponse)
async def capture_evidence(req: EvidenceRequest):
"""Full-page screenshot with timestamp banner — for legal evidence.
Used by backend to capture the Cookie-Richtlinie + DSE pages so the
audit-mail ZIP-attachment contains the exact rendered DOM at scan time.
"""
import base64 as _b64
logger.info("Capturing evidence screenshot for %s", req.url)
data = await capture_page_evidence(req.url, check_id=req.check_id)
png = data["png_bytes"]
return EvidenceResponse(
url=data["url"],
captured_at=data["captured_at"],
width_px=data["width_px"],
height_px=data["height_px"],
accepted_banner=data["accepted_banner"],
expanded=data["expanded"],
png_b64=_b64.b64encode(png).decode("ascii") if png else "",
png_size=len(png) if png else 0,
)
# ── Admin: CMP discoveries (Phase E) ────────────────────────────────
@app.get("/cmp-discoveries")
+176
View File
@@ -0,0 +1,176 @@
"""Full-page screenshot mit Timestamp-Overlay.
Macht ein vollständiges Screenshot einer URL (z.B. Cookie-Richtlinie),
mit eingebrannter Timestamp + URL fuer juristische Beweiskraft. Akzeptiert
das Banner zuvor (sonst wuerde Banner-Overlay die Tabelle verdecken) und
klappt Accordions auf.
Returnt PNG bytes + Metadaten.
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from playwright.async_api import async_playwright
logger = logging.getLogger(__name__)
_USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
_TIMESTAMP_BANNER_JS = r"""(meta) => {
// Einbrenn-Banner ans Seitenkopf: ohne in den Original-Inhalt einzugreifen,
// damit die Beweiskraft erhalten bleibt (nur Overlay-Header).
const bar = document.createElement('div');
bar.setAttribute('id', '__bp_evidence_bar__');
bar.style.cssText = (
'position:relative;background:#0f172a;color:#fff;'
'padding:10px 18px;font:600 13px/1.4 -apple-system,'
'BlinkMacSystemFont,sans-serif;border-bottom:3px solid #0ea5e9;'
'z-index:2147483647;box-sizing:border-box;width:100%'
);
bar.innerHTML = (
'<div>BreakPilot Compliance-Audit · ' + meta.url + '</div>' +
'<div style="font-weight:400;opacity:0.8;font-size:11px;margin-top:2px">' +
'Erfasst: ' + meta.ts + ' UTC · Scan-ID ' + meta.check_id +
'</div>'
);
document.body.insertBefore(bar, document.body.firstChild);
}"""
_EXPAND_ALL_JS = r"""() => {
// Click everything that looks expandable so cookie-table-rows nested
// in accordions become visible in the full-page screenshot.
let n = 0;
const triggers = document.querySelectorAll(
'[aria-expanded="false"], summary, ' +
'details:not([open]), ' +
'button[class*="expand" i], button[class*="accordion" i], ' +
'button[class*="toggle" i], [role="button"][class*="expand" i]'
);
for (const t of triggers) {
try { t.click(); if (t.open !== undefined) t.open = true; n++; } catch(e){}
}
return n;
}"""
_DISMISS_BANNER_JS = r"""() => {
// Click any "Accept all" / "Alle akzeptieren" / "Akzeptieren" button so
// the consent overlay disappears and we can capture the page content.
// We accept rather than reject because rejecting often LEAVES the banner
// in place ("you must consent to continue"), blocking the screenshot.
function walk(root) {
if (!root || !root.querySelectorAll) return false;
const buttons = root.querySelectorAll(
'button, [role="button"], a, [class*="accept" i]'
);
for (const b of buttons) {
const t = (b.textContent || '').trim().toLowerCase();
if (!t || t.length > 40) continue;
if (t === 'alle akzeptieren' || t === 'akzeptieren' ||
t === 'accept all' || t === 'agree' || t === 'einverstanden' ||
t === 'i agree' || t === 'zustimmen' || t === 'ok' ||
t === 'alle cookies akzeptieren' || t === 'alle annehmen') {
try { b.click(); return true; } catch(e){}
}
}
const all = root.querySelectorAll('*');
for (const el of all) if (el.shadowRoot && walk(el.shadowRoot)) return true;
return false;
}
return walk(document);
}"""
async def capture_page_evidence(
url: str,
check_id: str = "",
timeout_ms: int = 30000,
max_height_px: int = 30000,
) -> dict:
"""Capture a full-page screenshot of `url` with embedded timestamp.
Returns dict:
png_bytes: bytes
captured_at: ISO timestamp
url: final URL after redirects
accepted_banner: bool
expanded: int accordion-clicks performed
height_px, width_px
"""
out: dict = {
"png_bytes": b"",
"captured_at": "",
"url": url,
"accepted_banner": False,
"expanded": 0,
"height_px": 0,
"width_px": 0,
}
ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=True,
args=["--no-sandbox", "--disable-dev-shm-usage"],
)
ctx = await browser.new_context(
user_agent=_USER_AGENT,
viewport={"width": 1280, "height": 1024},
locale="de-DE",
timezone_id="Europe/Berlin",
)
page = await ctx.new_page()
try:
await page.goto(url, wait_until="domcontentloaded", timeout=timeout_ms)
await page.wait_for_timeout(3500)
# Step 1: dismiss banner (accept) so we see the policy content
try:
out["accepted_banner"] = bool(await page.evaluate(_DISMISS_BANNER_JS))
if out["accepted_banner"]:
await page.wait_for_timeout(1500)
except Exception as e:
logger.debug("dismiss-banner failed: %s", e)
# Step 2: expand accordions / details
try:
out["expanded"] = int(await page.evaluate(_EXPAND_ALL_JS) or 0)
if out["expanded"]:
await page.wait_for_timeout(1500)
except Exception as e:
logger.debug("expand-all failed: %s", e)
out["url"] = page.url
# Step 3: inject timestamp banner for evidence
try:
await page.evaluate(_TIMESTAMP_BANNER_JS, {
"url": out["url"], "ts": ts, "check_id": check_id or "",
})
except Exception as e:
logger.debug("timestamp-inject failed: %s", e)
# Step 4: capture full-page screenshot. Cap height for sanity.
dims = await page.evaluate(
"() => ({w: document.documentElement.scrollWidth, "
"h: document.documentElement.scrollHeight})"
)
out["width_px"] = int(dims.get("w") or 0)
out["height_px"] = min(int(dims.get("h") or 0), max_height_px)
# If page is too tall, scroll-into-view to anchor a screenshot region
png = await page.screenshot(
full_page=True, type="png", timeout=timeout_ms,
)
out["png_bytes"] = png
out["captured_at"] = ts
logger.info(
"Evidence screenshot captured: %s (%dx%d, %d bytes, accepted=%s, expanded=%d)",
out["url"], out["width_px"], out["height_px"],
len(png), out["accepted_banner"], out["expanded"],
)
finally:
await ctx.close()
await browser.close()
return out