# ── backend-compliance/compliance/services/cookie_screenshot_ocr.py ──
# Overlapping evidence slices for a gap-free chain of evidence: ONE
# full-page screenshot is cut into viewport-sized PNG slices, each one
# overlapping its predecessor by `overlap_px` pixels.  Every cookie row
# therefore shows up in at least one slice (twice at slice borders);
# de-duplication by cookie name removes the doubles afterwards.


def _slice_spans(
    total_h: int, viewport_h: int, overlap_px: int, max_slices: int,
) -> tuple[list[tuple[int, int]], bool]:
    """Plan the vertical ``(top, bottom)`` pixel spans of the slices.

    Args:
        total_h: Height of the full-page image in pixels.
        viewport_h: Height of one slice in pixels; must be positive.
        overlap_px: Pixels each slice shares with the previous one.
            Negative values are treated as 0, because a negative overlap
            would leave unphotographed gaps between slices.
        max_slices: Upper bound on the number of spans returned.

    Returns:
        ``(spans, truncated)``.  ``truncated`` is True when ``max_slices``
        was exhausted before the bottom of the image was covered.

    Raises:
        ValueError: If ``viewport_h`` is not positive.
    """
    if viewport_h <= 0:
        raise ValueError(f"viewport_h must be positive, got {viewport_h}")
    # A step of at least 1 guarantees progress even if overlap >= viewport.
    step = max(1, viewport_h - max(0, overlap_px))
    spans: list[tuple[int, int]] = []
    top = 0
    while top < total_h and len(spans) < max_slices:
        bottom = min(top + viewport_h, total_h)
        spans.append((top, bottom))
        if bottom >= total_h:
            # Bottom reached.  Any further slice would lie completely
            # inside this one: wasted OCR time and inflated raw counts.
            return spans, False
        top += step
    # Only reachable with an empty image or an exhausted slice budget.
    return spans, total_h > 0


async def capture_cookie_evidence_slices(
    cookie_url: str, check_id: str = "",
    viewport_h: int = 1024, overlap_px: int = 200, max_slices: int = 40,
    timeout_s: float = 180.0,
) -> dict:
    """Capture a full-page screenshot and slice it (with overlap) in-memory.

    Why not scroll-based slicing in Playwright?  VW's cookie page uses
    scroll-snap / fixed-position elements that defeat window.scrollTo —
    all viewport screenshots came back identical (header overlay only).
    A full-page screenshot bypasses scrolling entirely, and the PNG is
    sliced locally via PIL to get the same overlapping evidence chain.

    Args:
        cookie_url: Page to capture.  An empty value short-circuits.
        check_id: Opaque id forwarded to the consent-tester service.
        viewport_h: Height of each slice in pixels.
        overlap_px: Pixels each slice shares with its predecessor.
        max_slices: Hard cap on the number of slices.
        timeout_s: HTTP timeout for the capture request.

    Returns:
        On success a dict with ``slices`` (each: ``idx``, ``ts``,
        ``top_y``, ``bot_y``, ``sha256`` (first 16 hex chars),
        ``sha256_full``, ``png_b64``, ``png_size``), ``total_height_px``,
        ``width_px``, ``truncated``, ``accepted_banner``, ``expanded``,
        ``url`` and ``captured_at``.  On any failure
        ``{"slices": [], "error": <message>}``; this function does not
        raise for capture or slicing errors.
    """
    if not cookie_url:
        return {"slices": [], "error": "no url"}
    try:
        async with httpx.AsyncClient(timeout=timeout_s) as c:
            r = await c.post(
                f"{CONSENT_TESTER_URL}/capture-evidence",
                json={"url": cookie_url, "check_id": check_id},
                timeout=timeout_s,
            )
            r.raise_for_status()
            data = r.json()
    except Exception as e:
        logger.warning("capture full-page evidence failed: %s", e)
        return {"slices": [], "error": str(e)[:200]}

    png_b64 = data.get("png_b64", "")
    if not png_b64:
        return {"slices": [], "error": data.get("error", "no png")}

    try:
        import base64 as _b64
        import hashlib as _hl
        from io import BytesIO

        from PIL import Image

        # All slices stem from one capture, so they share its timestamp.
        captured_at = data.get("captured_at", "")
        img = Image.open(BytesIO(_b64.b64decode(png_b64))).convert("RGB")
        w, h = img.size
        spans, truncated = _slice_spans(h, viewport_h, overlap_px, max_slices)

        slices: list[dict] = []
        for idx, (top, bot) in enumerate(spans):
            buf = BytesIO()
            img.crop((0, top, w, bot)).save(buf, format="PNG", optimize=True)
            png_chunk = buf.getvalue()
            digest = _hl.sha256(png_chunk).hexdigest()
            slices.append({
                "idx": idx,
                "ts": captured_at,
                "top_y": top, "bot_y": bot,
                # Short form kept for existing consumers; the full digest
                # is what makes the slice verifiable as evidence.
                "sha256": digest[:16],
                "sha256_full": digest,
                "png_b64": _b64.b64encode(png_chunk).decode("ascii"),
                "png_size": len(png_chunk),
            })
        logger.info(
            "Evidence-slices (PIL-cut): %s → %d slices (image %dx%d, "
            "viewport=%d, overlap=%d)",
            cookie_url, len(slices), w, h, viewport_h, overlap_px,
        )
        if truncated:
            logger.warning(
                "Evidence-slices truncated at max_slices=%d for %s "
                "(image height %d not fully covered)",
                max_slices, cookie_url, h,
            )
        return {
            "slices": slices,
            "total_height_px": h,
            "width_px": w,
            "truncated": truncated,
            "accepted_banner": data.get("accepted_banner"),
            "expanded": data.get("expanded"),
            "url": data.get("url", cookie_url),
            "captured_at": captured_at,
        }
    except Exception as e:
        logger.warning("PIL-slice failed: %s (%s)",
                       str(e) or "(no msg)", type(e).__name__)
        return {"slices": [], "error": str(e)[:200]}


def ocr_slices_extract_cookies(
    slices: list[dict],
) -> tuple[list[dict], dict]:
    """Run Tesseract on each slice, parse the text and dedup by cookie name.

    De-duplication is case-insensitive on the stripped ``name`` field and
    keeps the first occurrence; entries without a name are dropped.

    Args:
        slices: Slice dicts carrying at least ``png_b64``; ``idx``, ``ts``,
            ``top_y`` and ``bot_y`` are copied into the stats when present.

    Returns:
        ``(cookies, stats)`` where ``stats`` has ``per_slice``
        (``idx``, ``ts``, ``top_y``, ``bot_y``, ``cookies_found`` for every
        slice that was OCR'd), ``total_raw``, ``total_unique``, ``slices``
        (number of input slices) and ``skipped`` (slices whose payload was
        missing or could not be decoded).
    """
    import base64 as _b64

    per_slice: list[dict] = []
    all_cookies: list[dict] = []
    seen_names: set[str] = set()
    skipped = 0
    for s in slices:
        try:
            png = _b64.b64decode(s.get("png_b64", ""))
        except Exception as e:
            # Best effort: one corrupt slice must not abort the whole run,
            # but it has to be visible in logs and stats.
            skipped += 1
            logger.warning("slice skipped, base64 decode failed: %s", e)
            continue
        if not png:
            skipped += 1
            logger.warning("slice skipped, empty png payload")
            continue
        text = ocr_screenshot_via_tesseract(png)
        chunk = parse_ocr_cookie_table(text)
        per_slice.append({
            "idx": s.get("idx"), "ts": s.get("ts"),
            "top_y": s.get("top_y"), "bot_y": s.get("bot_y"),
            "cookies_found": len(chunk),
        })
        for c in chunk:
            nl = (c.get("name") or "").strip().lower()
            if not nl or nl in seen_names:
                continue
            seen_names.add(nl)
            all_cookies.append(c)
    stats = {
        "per_slice": per_slice,
        "total_raw": sum(p["cookies_found"] for p in per_slice),
        "total_unique": len(all_cookies),
        "slices": len(slices),
        "skipped": skipped,
    }
    logger.info(
        "ocr_slices_extract_cookies: %d slices → %d raw → %d unique cookies",
        stats["slices"], stats["total_raw"], stats["total_unique"],
    )
    return all_cookies, stats
# ── consent-tester/main.py ──────────────────────────────────────────
from pydantic import Field

from services.page_screenshot import (
    capture_page_evidence,
    capture_page_overlapping_slices,
)


# ── Evidence slices (overlapping scrolling screenshots) ─────────────

class EvidenceSlicesRequest(BaseModel):
    """Request body for ``POST /capture-evidence-slices``."""

    url: str
    check_id: str = ""
    # The bounds keep one HTTP request from tying up a browser with an
    # absurd viewport or an unbounded number of screenshots.
    viewport_h: int = Field(1024, gt=0, le=8192)
    overlap_px: int = Field(200, ge=0)
    max_slices: int = Field(40, ge=1, le=200)


class EvidenceSliceItem(BaseModel):
    """One viewport screenshot of the evidence chain."""

    idx: int
    ts: str
    top_y: int
    bot_y: int
    sha256: str
    png_b64: str
    png_size: int


class EvidenceSlicesResponse(BaseModel):
    """Response body for ``POST /capture-evidence-slices``."""

    url: str
    total_height_px: int
    width_px: int
    accepted_banner: bool
    expanded: int
    # True when the slices do not cover the page down to its bottom.
    truncated: bool = False
    slices: list[EvidenceSliceItem]


@app.post("/capture-evidence-slices", response_model=EvidenceSlicesResponse)
async def capture_evidence_slices(req: EvidenceSlicesRequest):
    """Overlapping viewport screenshots for a gap-free chain of evidence.

    Each slice overlaps the previous one by ``overlap_px`` pixels, so every
    cookie appears in at least one image and, at slice borders, in two.
    De-duplication by cookie name removes the doubles in the final result.
    """
    logger.info("Capturing overlapping evidence slices for %s", req.url)
    data = await capture_page_overlapping_slices(
        req.url, check_id=req.check_id,
        viewport_h=req.viewport_h, overlap_px=req.overlap_px,
        max_slices=req.max_slices,
    )
    return EvidenceSlicesResponse(
        url=data["url"],
        total_height_px=data["total_height_px"],
        width_px=data["width_px"],
        accepted_banner=data["accepted_banner"],
        expanded=data["expanded"],
        truncated=data.get("truncated", False),
        slices=[EvidenceSliceItem(**s) for s in data["slices"]],
    )


# ── consent-tester/services/page_screenshot.py ──────────────────────

def _plan_scroll_tops(
    total_h: int, viewport_h: int, overlap_px: int, max_slices: int,
) -> tuple[list[int], bool]:
    """Plan the scroll offsets at which viewport screenshots are taken.

    The browser cannot scroll past ``total_h - viewport_h``; requesting a
    larger offset is clamped.  The plan therefore ends with exactly one
    bottom-aligned position instead of several offsets that would all
    render the same final viewport.

    Args:
        total_h: Scroll height of the document in pixels.
        viewport_h: Viewport height in pixels.
        overlap_px: Pixels consecutive slices share; negative values are
            treated as 0 so no gaps can appear.
        max_slices: Upper bound on the number of offsets returned.

    Returns:
        ``(tops, truncated)``.  ``truncated`` is True when ``max_slices``
        ran out before the bottom-aligned offset was reached.
    """
    if total_h <= 0 or viewport_h <= 0:
        return [], False
    step = max(1, viewport_h - max(0, overlap_px))
    max_top = max(0, total_h - viewport_h)
    tops: list[int] = []
    top = 0
    while len(tops) < max_slices:
        if top >= max_top:
            tops.append(max_top)
            return tops, False
        tops.append(top)
        top += step
    return tops, True


async def capture_page_overlapping_slices(
    url: str,
    check_id: str = "",
    viewport_h: int = 1024,
    overlap_px: int = 200,
    timeout_ms: int = 30000,
    max_slices: int = 40,
) -> dict:
    """Scroll the page in overlapping steps and screenshot each viewport.

    Each step overlaps the previous one by ``overlap_px`` pixels, so every
    cookie appears in at least one image and, at slice borders, in two.
    De-duplicating the OCR output by cookie name removes the doubles.

    Compared with a single ``full_page=True`` screenshot:
      - the overlap documents that no region was skipped,
      - OCR per slice is faster and can run in parallel,
      - every slice has its own timestamp and sequence number.

    ``top_y`` / ``bot_y`` are taken from the scroll offset the browser
    actually reports after scrolling, not from the requested one, so the
    coordinates match the pixels in the image.

    NOTE(review): pages with scroll-snap or fixed overlays may not scroll
    (or may show the same overlay in every viewport).  A page that stops
    scrolling is detected and reported via ``truncated``; an overlay that
    hides the scrolled content cannot be detected here.

    Args:
        url: Page to capture.
        check_id: Id rendered into the injected timestamp banner.
        viewport_h: Browser viewport height in pixels.
        overlap_px: Overlap between consecutive slices in pixels.
        timeout_ms: Timeout for navigation and for each screenshot.
        max_slices: Hard cap on the number of screenshots.

    Returns:
        dict with ``slices`` (each: ``idx``, ``ts``, ``top_y``, ``bot_y``,
        ``sha256`` (first 16 hex chars), ``png_b64``, ``png_size``),
        ``total_height_px``, ``width_px``, ``url`` (final, after
        redirects), ``accepted_banner``, ``expanded`` and ``truncated``.

    Raises:
        Whatever Playwright raises for launch, navigation or screenshot
        failures; only the banner/expand/timestamp steps are best-effort.
    """
    import base64 as _b64
    import hashlib

    out: dict = {
        "slices": [],
        "total_height_px": 0,
        "width_px": 0,
        "url": url,
        "accepted_banner": False,
        "expanded": 0,
        "truncated": False,
    }
    ts_base = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=True, args=["--no-sandbox", "--disable-dev-shm-usage"],
        )
        ctx = None
        # The try starts right after launch so the browser is closed even
        # when creating the context or the page fails.
        try:
            ctx = await browser.new_context(
                user_agent=_USER_AGENT,
                viewport={"width": 1280, "height": viewport_h},
                locale="de-DE", timezone_id="Europe/Berlin",
            )
            page = await ctx.new_page()
            await page.goto(url, wait_until="domcontentloaded", timeout=timeout_ms)
            await page.wait_for_timeout(3500)

            # Best-effort page preparation: a failure here must not abort
            # the capture, but it is logged so it can be diagnosed.
            try:
                out["accepted_banner"] = bool(await page.evaluate(_DISMISS_BANNER_JS))
                if out["accepted_banner"]:
                    await page.wait_for_timeout(1500)
            except Exception as e:
                logger.debug("banner dismissal failed for %s: %s", url, e)
            try:
                out["expanded"] = int(await page.evaluate(_EXPAND_ALL_JS) or 0)
                if out["expanded"]:
                    await page.wait_for_timeout(1500)
            except Exception as e:
                logger.debug("expand-all failed for %s: %s", url, e)
            out["url"] = page.url
            # Inject timestamp banner so the FIRST slice carries it.
            try:
                await page.evaluate(_TIMESTAMP_BANNER_JS, {
                    "url": out["url"], "ts": ts_base, "check_id": check_id or "—",
                })
            except Exception as e:
                logger.debug("timestamp banner injection failed for %s: %s", url, e)
            await page.wait_for_timeout(500)

            # Measure total scroll height + width.
            dims = await page.evaluate(
                "() => ({w: document.documentElement.scrollWidth, "
                "h: document.documentElement.scrollHeight})"
            )
            total_h = int(dims.get("h") or 0)
            out["total_height_px"] = total_h
            out["width_px"] = int(dims.get("w") or 0)

            tops, out["truncated"] = _plan_scroll_tops(
                total_h, viewport_h, overlap_px, max_slices,
            )
            prev_top = -1
            for idx, target in enumerate(tops):
                # Scroll to position. Wait for any lazy content to render.
                await page.evaluate("(y) => window.scrollTo(0, y)", target)
                await page.wait_for_timeout(400)
                try:
                    actual = int(await page.evaluate(
                        "() => Math.round(window.scrollY)"
                    ))
                except Exception as e:
                    logger.debug("scrollY read failed for %s: %s", url, e)
                    actual = target
                if actual <= prev_top:
                    # The page did not move: another screenshot would be a
                    # duplicate labelled with coordinates it does not show.
                    logger.warning(
                        "Page stopped scrolling at y=%d (requested %d) for %s",
                        actual, target, out["url"],
                    )
                    out["truncated"] = True
                    break
                png = await page.screenshot(
                    full_page=False, type="png", timeout=timeout_ms,
                )
                ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
                out["slices"].append({
                    "idx": idx,
                    "ts": ts,
                    "top_y": actual,
                    "bot_y": min(actual + viewport_h, total_h),
                    "sha256": hashlib.sha256(png).hexdigest()[:16],
                    "png_b64": _b64.b64encode(png).decode("ascii"),
                    "png_size": len(png),
                })
                prev_top = actual
            logger.info(
                "Overlapping screenshots: %d slices for %s (total_h=%d, "
                "viewport=%d, overlap=%d)",
                len(out["slices"]), out["url"], total_h, viewport_h, overlap_px,
            )
        finally:
            if ctx is not None:
                await ctx.close()
            await browser.close()
    return out