feat(audit): overlapping evidence-slices fuer lueckenlose Beweiskette

Statt EIN full-page screenshot: full-page wird per PIL in viewport-grosse
Slices geschnitten, jede ueberlappt die vorherige um overlap_px Pixel.
Jeder Cookie erscheint in mind. einer Slice, an Slice-Grenzen sogar in
zwei → Dedup nach Name eliminiert die Doppel.

Warum nicht direkt scroll-based slicing in Playwright? VW's
Cookie-Page nutzt scroll-snap / fixed-position — alle viewport-shots
kamen identisch zurueck (Header-Overlay). PIL-cut auf dem full-page
PNG bypasst das Problem voellig.

VW smoke-test (32 slices):
  per-slice: [0, 0, 2, 5, 5, 3, 4, 7, 4, 3, 4, 5, ...]
  103 raw cookies → 79 unique nach dedup
  14 vendor records (Google 9, Adobe-Familie 17, etc.)

Jeder Slice traegt den Capture-Timestamp der Seite + eigenen SHA256
(auf 16 Hex-Zeichen gekuerzt) → ZIP-Anhang fuer juristische Beweiskette.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-22 23:38:13 +02:00
parent 1784b43d72
commit efeef73f90
3 changed files with 300 additions and 1 deletions
@@ -333,6 +333,131 @@ _VISION_PROMPT = (
)
def _evidence_slice_bounds(
    height: int, viewport_h: int, overlap_px: int, max_slices: int,
) -> tuple[list[tuple[int, int]], bool]:
    """Plan the (top, bottom) pixel rows of the overlapping evidence slices.

    Returns ``(bounds, truncated)``. ``truncated`` is True when the planned
    slices stop before the bottom of the image (``max_slices`` was hit).
    """
    # Clamp the step into [1, viewport_h]: an overlap >= viewport must still
    # advance, and a negative overlap must not open gaps between slices.
    step = min(viewport_h, max(1, viewport_h - overlap_px))
    bounds: list[tuple[int, int]] = []
    top = 0
    while top < height and len(bounds) < max_slices:
        bottom = min(top + viewport_h, height)
        bounds.append((top, bottom))
        if bottom >= height:
            # Bottom reached. A further slice would lie entirely inside this
            # one and only add duplicate OCR work.
            break
        top += step
    covered = bounds[-1][1] if bounds else 0
    return bounds, covered < height


async def capture_cookie_evidence_slices(
    cookie_url: str, check_id: str = "",
    viewport_h: int = 1024, overlap_px: int = 200, max_slices: int = 40,
    timeout_s: float = 180.0,
) -> dict:
    """Capture a full-page screenshot and slice it (with overlap) in-memory.

    Why not scroll-based slicing in Playwright? VW's cookie-page uses
    scroll-snap / fixed-position elements that defeat window.scrollTo —
    all viewport screenshots came back identical (header overlay only).
    A full-page screenshot bypasses scrolling entirely, and we slice the
    PNG bytes locally via PIL to get the same overlapping evidence chain.

    Args:
        cookie_url: Page to capture; an empty value returns an error dict.
        check_id: Forwarded unchanged to the ``/capture-evidence`` endpoint.
        viewport_h: Height of each slice in pixels (must be > 0).
        overlap_px: Rows each slice shares with the previous one.
        max_slices: Upper bound on the number of slices produced.
        timeout_s: HTTP timeout for the capture request, in seconds.

    Returns:
        On success a dict with ``slices`` (each entry: ``idx``, ``ts``,
        ``top_y``, ``bot_y``, ``sha256``, ``png_b64``, ``png_size``),
        ``total_height_px``, ``width_px``, ``accepted_banner``,
        ``expanded``, ``url``, ``captured_at`` and ``truncated`` (True when
        ``max_slices`` was reached before the bottom of the page).
        On failure ``{"slices": [], "error": <non-empty message>}``.
        This function does not raise for capture or slicing errors.
    """
    if not cookie_url:
        return {"slices": [], "error": "no url"}
    if viewport_h <= 0:
        # A non-positive viewport cannot produce a valid crop box.
        return {"slices": [], "error": "invalid viewport_h"}
    try:
        async with httpx.AsyncClient(timeout=timeout_s) as c:
            r = await c.post(
                f"{CONSENT_TESTER_URL}/capture-evidence",
                json={"url": cookie_url, "check_id": check_id},
                timeout=timeout_s,
            )
            r.raise_for_status()
            data = r.json()
    except Exception as e:
        logger.warning("capture full-page evidence failed: %s", e)
        # Some exceptions (e.g. timeouts) stringify to ""; fall back to the
        # type name so callers never see a falsy error value.
        return {"slices": [], "error": (str(e) or type(e).__name__)[:200]}
    if not isinstance(data, dict):
        return {"slices": [], "error": "unexpected response"}
    png_b64 = data.get("png_b64", "")
    if not png_b64:
        return {"slices": [], "error": data.get("error") or "no png"}
    try:
        # Imports stay inside the try so a missing Pillow install is
        # reported as an error dict instead of raising.
        from PIL import Image
        from io import BytesIO
        import base64 as _b64
        import hashlib as _hl

        png = _b64.b64decode(png_b64)
        with Image.open(BytesIO(png)) as src:
            img = src.convert("RGB")
        w, h = img.size
        bounds, truncated = _evidence_slice_bounds(
            h, viewport_h, overlap_px, max_slices,
        )
        # All slices are cut from one screenshot, so they share its timestamp.
        captured_at = data.get("captured_at", "")
        slices: list[dict] = []
        for idx, (top, bot) in enumerate(bounds):
            buf = BytesIO()
            img.crop((0, top, w, bot)).save(buf, format="PNG", optimize=True)
            png_chunk = buf.getvalue()
            slices.append({
                "idx": idx,
                "ts": captured_at,
                "top_y": top, "bot_y": bot,
                # NOTE(review): only the first 16 hex chars (64 bits) of the
                # digest are kept; confirm this is sufficient for the
                # evidence chain before relying on it.
                "sha256": _hl.sha256(png_chunk).hexdigest()[:16],
                "png_b64": _b64.b64encode(png_chunk).decode("ascii"),
                "png_size": len(png_chunk),
            })
        logger.info(
            "Evidence-slices (PIL-cut): %s → %d slices (image %dx%d, "
            "viewport=%d, overlap=%d)",
            cookie_url, len(slices), w, h, viewport_h, overlap_px,
        )
        if truncated:
            logger.warning(
                "Evidence-slices truncated: %s covers %d of %d px "
                "(max_slices=%d)",
                cookie_url, slices[-1]["bot_y"] if slices else 0, h,
                max_slices,
            )
        return {
            "slices": slices,
            "total_height_px": h,
            "width_px": w,
            "accepted_banner": data.get("accepted_banner"),
            "expanded": data.get("expanded"),
            "url": data.get("url", cookie_url),
            "captured_at": captured_at,
            "truncated": truncated,
        }
    except Exception as e:
        logger.warning("PIL-slice failed: %s (%s)",
                       str(e) or "(no msg)", type(e).__name__)
        return {"slices": [], "error": (str(e) or type(e).__name__)[:200]}
def ocr_slices_extract_cookies(
    slices: list[dict],
) -> tuple[list[dict], dict]:
    """Run Tesseract on each slice + parse + dedup by cookie name.

    Each slice's ``png_b64`` payload is decoded, OCR'd via
    ``ocr_screenshot_via_tesseract`` and parsed with
    ``parse_ocr_cookie_table``. Cookies are deduplicated on their
    stripped, lower-cased ``name``; the first occurrence wins, so a cookie
    that shows up in two overlapping slices is reported once.

    Slices whose payload is missing, empty or not valid base64 are skipped
    (best effort), logged, and counted in ``stats["skipped"]``.

    Returns (cookies, stats) where stats has:
        per_slice: [{idx, ts, top_y, bot_y, cookies_found}] for every slice
            that was actually OCR'd
        total_raw, total_unique
        slices: number of input slices (including skipped ones)
        skipped: number of slices without a usable PNG payload
    """
    import base64 as _b64

    per_slice: list[dict] = []
    all_cookies: list[dict] = []
    seen_names: set[str] = set()
    skipped = 0
    for pos, s in enumerate(slices):
        try:
            png = _b64.b64decode(s.get("png_b64") or "")
        except Exception as e:
            # Best effort: one broken slice must not abort the whole run,
            # but it should be visible in the logs and the stats.
            logger.warning(
                "ocr_slices_extract_cookies: slice #%d skipped, "
                "undecodable png_b64 (%s)", pos, type(e).__name__,
            )
            skipped += 1
            continue
        if not png:
            # Nothing to OCR — do not hand zero bytes to Tesseract.
            logger.warning(
                "ocr_slices_extract_cookies: slice #%d skipped, "
                "empty png_b64", pos,
            )
            skipped += 1
            continue
        text = ocr_screenshot_via_tesseract(png)
        chunk = parse_ocr_cookie_table(text)
        per_slice.append({
            "idx": s.get("idx"), "ts": s.get("ts"),
            "top_y": s.get("top_y"), "bot_y": s.get("bot_y"),
            "cookies_found": len(chunk),
        })
        for c in chunk:
            nl = (c.get("name") or "").strip().lower()
            if not nl or nl in seen_names:
                continue
            seen_names.add(nl)
            all_cookies.append(c)
    stats = {
        "per_slice": per_slice,
        "total_raw": sum(p["cookies_found"] for p in per_slice),
        "total_unique": len(all_cookies),
        "slices": len(slices),
        "skipped": skipped,
    }
    logger.info(
        "ocr_slices_extract_cookies: %d slices → %d raw → %d unique cookies",
        stats["slices"], stats["total_raw"], stats["total_unique"],
    )
    return all_cookies, stats
async def capture_cookie_screenshot(
cookie_url: str, check_id: str = "", timeout_s: float = 60.0,
) -> dict: