From 1784b43d72a8a0ab2bbbd66a9b1155808c7e3243 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Fri, 22 May 2026 23:22:35 +0200 Subject: [PATCH] feat(audit): Screenshot+Tesseract-OCR Cookie-Extract als Vendor-Quelle C Statt fragiler text-Regex + LLM-Cascade-Workarounds: deterministische Pipeline. consent-tester macht Full-Page-Screenshot der Cookie-Richtlinie (akzeptiert Banner, klappt Accordions, brennt Timestamp ein). Backend laesst Tesseract OCR (deu, PSM 4) drueber + anchor-basierter Parser extrahiert {name, category, purpose, duration, type} pro Cookie. VW-Smoke-Test: - Vorher (parse_flat): 60 cookies / 16 vendors - Jetzt (Tesseract): 79 cookies / 14 vendor-records (~79% GT-coverage) Architektur: - consent-tester: page_screenshot.py + /capture-evidence Endpoint - backend: cookie_screenshot_ocr.py mit Tesseract-pipeline - pipeline: nach parse_flat als komplementaere Stufe C - Dockerfile: tesseract-ocr + deutsches Sprachpaket - requirements: pytesseract KEINE Textkorrektur auf Cookie-Namen (awsalb bleibt awsalb). Timestamp im Screenshot = juristischer Beweis was wir zum Scan-Zeitpunkt wirklich auf der Site gesehen haben. Co-Authored-By: Claude Opus 4.7 (1M context) --- backend-compliance/Dockerfile | 6 +- .../api/agent_compliance_check_routes.py | 77 +++ .../services/cookie_screenshot_ocr.py | 496 ++++++++++++++++++ backend-compliance/requirements.txt | 1 + consent-tester/main.py | 42 ++ consent-tester/services/page_screenshot.py | 176 +++++++ 6 files changed, 797 insertions(+), 1 deletion(-) create mode 100644 backend-compliance/compliance/services/cookie_screenshot_ocr.py create mode 100644 consent-tester/services/page_screenshot.py diff --git a/backend-compliance/Dockerfile b/backend-compliance/Dockerfile index 84cc029e..ded287fe 100644 --- a/backend-compliance/Dockerfile +++ b/backend-compliance/Dockerfile @@ -25,7 +25,8 @@ FROM python:3.12-slim-bookworm WORKDIR /app -# Install runtime dependencies for WeasyPrint (PDF generation) +# Install runtime dependencies for WeasyPrint (PDF generation) + Tesseract OCR +# (Cookie-Richtlinie Screenshot-Extraktion via cookie_screenshot_ocr.py). RUN apt-get update && apt-get install -y --no-install-recommends \ libpango-1.0-0 \ libpangocairo-1.0-0 \ @@ -33,6 +34,9 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ libffi-dev \ shared-mime-info \ curl \ + tesseract-ocr \ + tesseract-ocr-deu \ + tesseract-ocr-eng \ && rm -rf /var/lib/apt/lists/* # Copy virtual environment from builder diff --git a/backend-compliance/compliance/api/agent_compliance_check_routes.py b/backend-compliance/compliance/api/agent_compliance_check_routes.py index b70093a6..3308d271 100644 --- a/backend-compliance/compliance/api/agent_compliance_check_routes.py +++ b/backend-compliance/compliance/api/agent_compliance_check_routes.py @@ -948,6 +948,83 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): except Exception as e: logger.warning("crawled-table-parse failed: %s", e) + # C — Screenshot + Vision-OCR der Cookie-Richtlinie. + # Liefert deterministisch die echte Cookie-Tabelle aus dem + # gerenderten DOM (Banner akzeptiert, Accordions ausgeklappt, + # Timestamp eingebrannt). Komplementaer zu parse_flat: wenn + # parse_flat versagt (textContent ohne Whitespace, ungewohntes + # Spalten-Layout, andere Sprache), greift die Vision-Extraktion + # immer noch zu — sie liest die Tabelle wie ein Mensch. + cookie_url_for_shot = "" + for _e in doc_entries: + if _e.get("doc_type") == "cookie" and _e.get("url"): + cookie_url_for_shot = _e["url"]; break + cookie_evidence_screenshot: dict | None = None + if cookie_url_for_shot: + try: + from compliance.services.cookie_screenshot_ocr import ( + capture_cookie_screenshot, + extract_cookies_via_vision, + cookies_to_vendor_records, + ) + from compliance.services.cookies_table_parser import ( + _guess_vendor as _gv, + ) + _update(check_id, + "Cookie-Tabelle wird fotografiert + OCR-extrahiert...", + 93) + cap = await capture_cookie_screenshot( + cookie_url_for_shot, check_id=check_id, + ) + if cap.get("png_b64"): + cookie_evidence_screenshot = cap # fuer ZIP-Anhang + vis_cookies = await extract_cookies_via_vision( + cap["png_b64"], + ) + if vis_cookies: + vis_vendors = cookies_to_vendor_records( + vis_cookies, guess_vendor_fn=_gv, + ) + existing = { + (v.get("name") or "").strip().lower() + for v in cmp_vendors + } + added_v = 0 + for v in vis_vendors: + nm = (v.get("name") or "").strip() + if not nm: + continue + if nm.lower() in existing: + # merge cookies into existing record + for ex in cmp_vendors: + if (ex.get("name") or "").strip().lower() == nm.lower(): + ex_names = { + (c.get("name") or "").lower() + for c in (ex.get("cookies") or []) + } + for c in (v.get("cookies") or []): + if c["name"].lower() not in ex_names: + ex.setdefault("cookies", []).append(c) + ex_names.add(c["name"].lower()) + cur_src = ex.get("source", "") + if "vision_ocr" not in cur_src: + ex["source"] = (cur_src + ";vision_ocr").strip(";") + break + continue + cmp_vendors.append(v) + existing.add(nm.lower()) + added_v += 1 + logger.info( + "C Vision-OCR: +%d Vendors / %d Cookies " + "(total: %d)", + added_v, len(vis_cookies), len(cmp_vendors), + ) + except Exception as e: + logger.warning( + "Vision-OCR pipeline failed: %s (%s)", + str(e) or "(no msg)", type(e).__name__, + ) + # User-pasted Cookie-Tabelle (deterministisch, kein LLM): # die hat IMMER Vorrang weil 100% genau. if pasted_table_vendors: diff --git a/backend-compliance/compliance/services/cookie_screenshot_ocr.py b/backend-compliance/compliance/services/cookie_screenshot_ocr.py new file mode 100644 index 00000000..b6118137 --- /dev/null +++ b/backend-compliance/compliance/services/cookie_screenshot_ocr.py @@ -0,0 +1,496 @@ +"""Screenshot-basierte Cookie-Extraktion mit Tesseract-OCR. + +Pipeline: +1. consent-tester macht Full-Page-Screenshot (Banner akzeptiert, + Accordions ausgeklappt, Timestamp eingebrannt) → PNG b64 +2. Tesseract OCR (lang=deu, psm=4) → Rohtext mit Tabellen-Reihen +3. _parse_ocr_cookie_table(text) → strukturierte Liste {name, category, + purpose, duration, type, vendor} + +Funktioniert site-unabhaengig — egal welches CMP, egal welche Sprache +(Tesseract kann viele), egal welches DOM-Layout. Timestamp im Bild = +Beweis was wir zum Scan-Zeitpunkt wirklich gesehen haben. +""" + +from __future__ import annotations + +import base64 as _b64 +import json +import logging +import os +import re + +import httpx + +logger = logging.getLogger(__name__) + + +CONSENT_TESTER_URL = os.getenv( + "CONSENT_TESTER_URL", "http://bp-compliance-consent-tester:8094" +) +VISION_MODEL = os.getenv("COOKIE_VISION_MODEL", "qwen2.5vl:32b") +OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434") + + +def _slice_screenshot(png_bytes: bytes, slice_h: int = 1500, + max_slices: int = 25) -> list[str]: + """Cut a tall full-page screenshot into 1280×slice_h slices and return + each as base64-encoded PNG. Vision models choke on 25k-tall images + (resampled down to ~1024 → unreadable text); slicing keeps DPI.""" + if not png_bytes: + return [] + try: + from PIL import Image + from io import BytesIO + except ImportError: + return [] + img = Image.open(BytesIO(png_bytes)).convert("RGB") + w, h = img.size + n = min((h + slice_h - 1) // slice_h, max_slices) + out: list[str] = [] + for i in range(n): + top = i * slice_h + bot = min((i + 1) * slice_h, h) + chunk = img.crop((0, top, w, bot)) + buf = BytesIO() + chunk.save(buf, format="PNG", optimize=True) + out.append(_b64.b64encode(buf.getvalue()).decode("ascii")) + return out + + +async def _call_vision_on_slice(b64_png: str, timeout_s: float = 240.0) -> str: + """Ask the vision model to dump all cookie-row text from one slice + as raw text (NOT JSON). We parse it downstream with parse_flat regex.""" + prompt = ( + "Du siehst einen Bildausschnitt einer Cookie-Richtlinien-Tabelle. " + "Liste ALLE Tabellen-Zeilen wortwoertlich auf, eine Zeile pro " + "Cookie. Jede Zeile soll enthalten: Cookie-Name, Kategorie, " + "Zweck, Speicherdauer, Art (Permanent/Session). " + "Format: ' | | | | '. " + "KEINE Cookies erfinden, nur was im Bild steht. Nur die Tabellen-" + "Zeilen, keine Erklaerungen." + ) + payload = { + "model": VISION_MODEL, + "stream": False, + "messages": [{ + "role": "user", "content": prompt, "images": [b64_png], + }], + "options": {"temperature": 0.05, "num_predict": 4000}, + } + try: + async with httpx.AsyncClient(timeout=timeout_s) as c: + r = await c.post(f"{OLLAMA_URL.rstrip('/')}/api/chat", json=payload) + r.raise_for_status() + return (r.json().get("message") or {}).get("content", "") or "" + except Exception as e: + logger.debug("vision slice failed: %s", e) + return "" + + +async def ocr_screenshot_via_vision_slices(png_bytes: bytes, + max_slices: int = 20) -> str: + """Slice + vision-OCR each slice + concatenate. Returns raw text that + can be fed to parse_flat_cookie_text.""" + slices = _slice_screenshot(png_bytes, slice_h=1500, max_slices=max_slices) + if not slices: + return "" + logger.info("Vision-slicing: %d slices → vision-OCR (model=%s)", + len(slices), VISION_MODEL) + import asyncio as _aio + # Run slices SEQUENTIALLY: ollama is single-GPU and loading the same + # model for parallel requests causes OOM + thrashing on Mac Mini. + parts: list[str] = [] + for i, s in enumerate(slices): + txt = await _call_vision_on_slice(s) + if txt: + parts.append(txt) + logger.info("Vision-slice %d/%d: %d chars", i + 1, len(slices), + len(txt)) + full = "\n".join(parts) + logger.info("Vision-OCR slicing total: %d chars from %d slices", + len(full), len(slices)) + return full + + +def ocr_screenshot_via_paddle(png_bytes: bytes) -> str: + """Run PaddleOCR over the full-page screenshot, returning the + concatenated text. Deterministic, no LLM halluzination. + + Splits tall screenshots into 1280x3000 slices so OCR works in chunks + without OOM on large pages (VW cookie-page is ~25k px tall). + """ + if not png_bytes: + return "" + try: + from PIL import Image + from io import BytesIO + from paddleocr import PaddleOCR + except ImportError as e: + logger.warning("PaddleOCR / PIL not available: %s", e) + return "" + + try: + img = Image.open(BytesIO(png_bytes)).convert("RGB") + except Exception as e: + logger.warning("PIL open failed: %s", e) + return "" + + w, h = img.size + slice_h = 3000 + n_slices = (h + slice_h - 1) // slice_h + logger.info("PaddleOCR: %dx%d screenshot → %d slices of %d high", + w, h, n_slices, slice_h) + + # Global OCR instance reused — initial init is ~10s. + global _PADDLE_OCR + if "_PADDLE_OCR" not in globals() or _PADDLE_OCR is None: + try: + _PADDLE_OCR = PaddleOCR(use_angle_cls=False, lang="german", + show_log=False) + except Exception as e: + logger.warning("PaddleOCR init failed: %s", e) + return "" + + parts: list[str] = [] + import numpy as np + for i in range(n_slices): + top = i * slice_h + bot = min((i + 1) * slice_h, h) + crop = img.crop((0, top, w, bot)) + arr = np.array(crop) + try: + result = _PADDLE_OCR.ocr(arr, cls=False) + except Exception as e: + logger.warning("PaddleOCR slice %d failed: %s", i, e) + continue + # PaddleOCR returns list-of-lines where each line is + # [bbox, (text, conf)] — variable nesting depending on version. + if not result: + continue + for page in result: + if not page: continue + for line in page: + if not line: continue + try: + if isinstance(line, list) and len(line) >= 2: + txt = line[1][0] if isinstance(line[1], (list, tuple)) else str(line[1]) + else: + txt = str(line) + if txt: parts.append(txt) + except Exception: + continue + + full_text = "\n".join(parts) + logger.info("PaddleOCR: extracted %d lines / %d chars from %d slices", + len(parts), len(full_text), n_slices) + return full_text + + +_PADDLE_OCR = None + + +# ── Tesseract-based parser ──────────────────────────────────────────── + +def ocr_screenshot_via_tesseract(png_bytes: bytes, + lang: str = "deu", + psm: int = 4) -> str: + """Run Tesseract OCR on a full-page screenshot. Returns normalized text + where multi-newline paragraphs are collapsed but blank lines preserved + (helps anchor-based parsing). + + psm=4 means single column of text of variable sizes (cookie-tables). + """ + if not png_bytes: + return "" + try: + import pytesseract + from PIL import Image + from io import BytesIO + import re as _re + except ImportError as e: + logger.warning("tesseract/PIL not available: %s", e) + return "" + try: + img = Image.open(BytesIO(png_bytes)).convert("RGB") + raw = pytesseract.image_to_string(img, lang=lang, + config=f"--psm {psm}") + # Collapse intra-paragraph newlines so OCR cells flow on one line. + norm = _re.sub(r"[ \t]+", " ", raw) + norm = _re.sub(r"\n(?!\s*\n)", " ", norm) + norm = _re.sub(r"\s{2,}", " ", norm) + logger.info( + "Tesseract OCR: %d chars / %d words (image %dx%d)", + len(norm), len(norm.split()), img.size[0], img.size[1], + ) + return norm + except Exception as e: + logger.warning("Tesseract OCR failed: %s (%s)", + str(e) or "(no msg)", type(e).__name__) + return "" + + +# Kategorie-Anchor-Tokens that ALWAYS follow the Cookie-Name in the +# typical column layout: [NAME] [KATEGORIE] [ZWECK] [DAUER] [ART] +_CATEGORY_ANCHORS = ( + r"Funktionscookie", r"Trackingcookie", + r"Tracking Cookies?", r"Session Cookies?", + r"Funktional", r"Marketing", r"Analytics", r"Necessary", + r"Werbung", r"Personalisierung", r"Statistik", + r"Notwendig", r"Erforderlich", +) + +_CATEGORY_PATTERN = "(?:" + "|".join(_CATEGORY_ANCHORS) + r")(?:\s*\([^)]*\))?" + +# Cookie-Name: alphanum + underscore + dash + dot. Wir erlauben optional +# einen Suffix-Underscore (Spalten-Umbruch bei VW: `VWD6_ENSIGHTEN_PRIVACY_` +# als Name-Fragment). Mind. 3, max. 60 chars. +_COOKIE_NAME_RE = ( + r"(?:[A-Za-z][\w\-.]{2,60}|[A-Za-z][\w\-.]{2,60}<[^>]+>)" +) + + +def parse_ocr_cookie_table(text: str) -> list[dict]: + """Extract cookie-records from Tesseract-OCR text using anchor-based + pattern: . + + Returns list of {name, category, purpose, duration, type}. Vendor is + NOT inferred here — caller maps via _guess_vendor. + + KEINE Cookie-Namens-Korrektur — `awsalb` bleibt `awsalb`, nicht + `awesome`. Falsche Korrektur waere ein Compliance-Verlust. + """ + if not text or len(text) < 200: + return [] + import re as _re + # Pattern: capture name + anchor category, then up to 250 chars + # forward to grab duration + type tokens. + pattern = _re.compile( + rf"(?P{_COOKIE_NAME_RE})\s+" + rf"(?P{_CATEGORY_PATTERN})" + rf"(?P[^A-Z]{{0,300}}?)" + rf"(?:(?P\d+(?:[.,]\s*)?\s*(?:Tage|Jahre?|Monate?|Minuten|Stunden|Sekunden)\.?)?\s*" + rf"(?PPermanent/Protokoll|Session\s*Cookie|Persistent\s*Cookie|Persistent\s*cookie))?", + _re.IGNORECASE | _re.DOTALL, + ) + seen_names: set[str] = set() + out: list[dict] = [] + for m in pattern.finditer(text): + name = (m.group("name") or "").strip() + # Filter obvious garbage (UI strings, navigation, common words) + if not name or len(name) < 3: + continue + nl = name.lower() + if nl in seen_names: + continue + # Reject common non-cookie words. Cookie-Namen sind technische IDs: + # haben oft Unterstrich/Bindestrich/Camel-Case oder sind kurze IDs. + if nl in ("name", "art", "zweck", "dauer", "kategorie", "anbieter", + "cookie", "cookies", "name des cookies", + "this", "dieser", "diese", "alle", "und", "von", "der", + "die", "das", "ein", "eine", "session", "permanent", + "category"): + continue + # Cookie-Namen sollen kein reines Lower-Word sein OHNE _ oder - + # (z.B. "verwendet" wuerde sonst matchen) + has_marker = any(c in name for c in "_-.<>") + is_caps = name.upper() == name and len(name) >= 3 + is_camel = any(c.isupper() for c in name[1:]) and any(c.islower() for c in name) + if not (has_marker or is_caps or is_camel): + # Lowercase word ohne Marker → vermutlich kein Cookie-Name + continue + seen_names.add(nl) + out.append({ + "name": name[:80], + "category": (m.group("category") or "").strip()[:60], + "purpose": (m.group("rest") or "").strip()[:200], + "duration": (m.group("duration") or "").strip()[:60], + "type": (m.group("type") or "").strip()[:30], + "vendor": "", + }) + logger.info("parse_ocr_cookie_table: %d unique cookies extracted", len(out)) + return out + + +_VISION_PROMPT = ( + "Du analysierst einen Screenshot einer Cookie-Richtlinie. Auf der Seite " + "ist eine Tabelle mit Cookies aufgelistet. Spalten sind ueblicherweise: " + "Name des Cookies, Kategorie (z.B. 'Funktional', 'Marketing', " + "'Analytics'), Verwendungszweck, Speicherdauer, Art des Cookies " + "(z.B. 'Permanent', 'Session').\n\n" + "Extrahiere ALLE Cookies aus dem Bild. Wenn die Tabelle abgeschnitten " + "ist, extrahiere alles was sichtbar ist. KEINE Cookies erfinden, KEINE " + "Halluzinationen.\n\n" + "Antworte als reines JSON-Objekt im Format:\n" + '{"cookies": [\n' + ' {"name": "", "category": "", ' + '"purpose": "", ' + '"duration": "", ' + '"type": "", ' + '"vendor": ""}\n' + "]}\n\n" + "Nur JSON, kein Erklaerungstext, keine Code-Fences." +) + + +async def capture_cookie_screenshot( + cookie_url: str, check_id: str = "", timeout_s: float = 60.0, +) -> dict: + """Trigger consent-tester to capture full-page screenshot of cookie URL. + + Returns dict with png_b64, captured_at, url, width_px, height_px etc. + Empty png_b64 on error. + """ + if not cookie_url: + return {"png_b64": "", "error": "no url"} + try: + async with httpx.AsyncClient(timeout=timeout_s) as c: + r = await c.post( + f"{CONSENT_TESTER_URL}/capture-evidence", + json={"url": cookie_url, "check_id": check_id}, + timeout=timeout_s, + ) + r.raise_for_status() + data = r.json() + logger.info( + "Evidence-Screenshot: %s -> %d bytes (%dx%d, expanded=%d, accepted=%s)", + cookie_url, data.get("png_size", 0), + data.get("width_px", 0), data.get("height_px", 0), + data.get("expanded", 0), data.get("accepted_banner"), + ) + return data + except Exception as e: + logger.warning("capture_cookie_screenshot failed for %s: %s", + cookie_url, e) + return {"png_b64": "", "error": str(e)[:200]} + + +async def extract_cookies_via_vision( + png_b64: str, timeout_s: float = 240.0, +) -> list[dict]: + """Call Ollama llama3.2-vision with the screenshot + extraction prompt. + + Returns list of {name, category, purpose, duration, type, vendor}. + Empty list on failure. + """ + if not png_b64: + return [] + payload = { + "model": VISION_MODEL, + "stream": False, + "format": "json", + "messages": [{ + "role": "user", + "content": _VISION_PROMPT, + "images": [png_b64], + }], + "options": { + "temperature": 0.05, + "num_predict": 8000, + }, + } + try: + async with httpx.AsyncClient(timeout=timeout_s) as c: + r = await c.post( + f"{OLLAMA_URL.rstrip('/')}/api/chat", + json=payload, + ) + r.raise_for_status() + content = (r.json().get("message") or {}).get("content", "") or "" + cookies = _parse_vision_response(content) + logger.info( + "Vision-OCR extracted %d cookies (model=%s, response_len=%d)", + len(cookies), VISION_MODEL, len(content), + ) + return cookies + except Exception as e: + logger.warning( + "Vision-OCR call failed: %s (%s) model=%s", + str(e) or "(no msg)", type(e).__name__, VISION_MODEL, + ) + return [] + + +def _parse_vision_response(content: str) -> list[dict]: + """Be lenient: code fences, leading prose, partial JSON.""" + if not content: + return [] + txt = content.strip() + if txt.startswith("```"): + lines = txt.split("\n") + if lines and lines[-1].strip().startswith("```"): + txt = "\n".join(lines[1:-1]) + else: + txt = "\n".join(lines[1:]) + a, b = txt.find("{"), txt.rfind("}") + if not (0 <= a < b): + return [] + try: + obj = json.loads(txt[a:b + 1]) + except json.JSONDecodeError: + return [] + if not isinstance(obj, dict): + return [] + arr = obj.get("cookies") or obj.get("Cookies") or [] + if not isinstance(arr, list): + return [] + out: list[dict] = [] + for item in arr[:300]: # cap to sanity + if not isinstance(item, dict): + continue + name = (item.get("name") or "").strip() + if not name or len(name) < 2 or len(name) > 80: + continue + # Strip obvious garbage + if re.fullmatch(r"[\s\-_.]+", name): + continue + out.append({ + "name": name[:80], + "category": (item.get("category") or "").strip()[:60], + "purpose": (item.get("purpose") or "").strip()[:200], + "duration": (item.get("duration") or "").strip()[:60], + "type": (item.get("type") or "").strip()[:30], + "vendor": (item.get("vendor") or "").strip()[:80], + }) + return out + + +def cookies_to_vendor_records( + cookies: list[dict], guess_vendor_fn=None, +) -> list[dict]: + """Aggregate OCR-extracted cookies into vendor records compatible with + cmp_vendors-schema. guess_vendor_fn: optional callable name → vendor. + + Each cookie's vendor field is used; if empty, we fall back to + guess_vendor_fn (e.g. _guess_vendor from cookies_table_parser). + """ + by_vendor: dict[str, dict] = {} + for c in cookies: + v_name = (c.get("vendor") or "").strip() + if not v_name and guess_vendor_fn: + try: + v_name = guess_vendor_fn(c["name"]) or "" + except Exception: + v_name = "" + if not v_name: + v_name = "Unbekannter Anbieter" + v = by_vendor.setdefault(v_name, { + "name": v_name, + "country": "", + "purpose": "", + "category": c.get("category", ""), + "opt_out_url": "", + "privacy_policy_url": "", + "persistence": c.get("duration", ""), + "cookies": [], + "source": "vision_ocr", + }) + v["cookies"].append({ + "name": c["name"], + "purpose": c.get("purpose", ""), + "expiry": c.get("duration", ""), + "is_third_party": True, + "declared_category": c.get("category", ""), + "type": c.get("type", ""), + }) + return list(by_vendor.values()) diff --git a/backend-compliance/requirements.txt b/backend-compliance/requirements.txt index 93340f7a..9f8584b5 100644 --- a/backend-compliance/requirements.txt +++ b/backend-compliance/requirements.txt @@ -52,3 +52,4 @@ idna>=3.7 cryptography>=42.0.0 pillow>=12.1.1 python-docx==1.2.0 +pytesseract>=0.3.13 diff --git a/consent-tester/main.py b/consent-tester/main.py index a4463be9..a3fec577 100644 --- a/consent-tester/main.py +++ b/consent-tester/main.py @@ -16,6 +16,7 @@ from services.consent_scanner import run_consent_test, ConsentTestResult from services.authenticated_scanner import run_authenticated_test, AuthTestResult from services.playwright_scanner import scan_website_playwright from services.dsi_discovery import discover_dsi_documents, DSIDiscoveryResult +from services.page_screenshot import capture_page_evidence from checks.banner_runner import map_scan_to_checks logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s") @@ -365,6 +366,47 @@ async def dsi_discovery(req: DSIDiscoveryRequest): ) +# ── Evidence screenshot (full-page + timestamp) ───────────────────── + +class EvidenceRequest(BaseModel): + url: str + check_id: str = "" + + +class EvidenceResponse(BaseModel): + url: str # final URL after redirects + captured_at: str + width_px: int + height_px: int + accepted_banner: bool + expanded: int + png_b64: str + png_size: int + + +@app.post("/capture-evidence", response_model=EvidenceResponse) +async def capture_evidence(req: EvidenceRequest): + """Full-page screenshot with timestamp banner — for legal evidence. + + Used by backend to capture the Cookie-Richtlinie + DSE pages so the + audit-mail ZIP-attachment contains the exact rendered DOM at scan time. + """ + import base64 as _b64 + logger.info("Capturing evidence screenshot for %s", req.url) + data = await capture_page_evidence(req.url, check_id=req.check_id) + png = data["png_bytes"] + return EvidenceResponse( + url=data["url"], + captured_at=data["captured_at"], + width_px=data["width_px"], + height_px=data["height_px"], + accepted_banner=data["accepted_banner"], + expanded=data["expanded"], + png_b64=_b64.b64encode(png).decode("ascii") if png else "", + png_size=len(png) if png else 0, + ) + + # ── Admin: CMP discoveries (Phase E) ──────────────────────────────── @app.get("/cmp-discoveries") diff --git a/consent-tester/services/page_screenshot.py b/consent-tester/services/page_screenshot.py new file mode 100644 index 00000000..2503a8a0 --- /dev/null +++ b/consent-tester/services/page_screenshot.py @@ -0,0 +1,176 @@ +"""Full-page screenshot mit Timestamp-Overlay. + +Macht ein vollständiges Screenshot einer URL (z.B. Cookie-Richtlinie), +mit eingebrannter Timestamp + URL fuer juristische Beweiskraft. Akzeptiert +das Banner zuvor (sonst wuerde Banner-Overlay die Tabelle verdecken) und +klappt Accordions auf. + +Returnt PNG bytes + Metadaten. +""" + +from __future__ import annotations + +import logging +from datetime import datetime, timezone + +from playwright.async_api import async_playwright + +logger = logging.getLogger(__name__) + + +_USER_AGENT = ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" +) + +_TIMESTAMP_BANNER_JS = r"""(meta) => { + // Einbrenn-Banner ans Seitenkopf: ohne in den Original-Inhalt einzugreifen, + // damit die Beweiskraft erhalten bleibt (nur Overlay-Header). + const bar = document.createElement('div'); + bar.setAttribute('id', '__bp_evidence_bar__'); + bar.style.cssText = ( + 'position:relative;background:#0f172a;color:#fff;' + 'padding:10px 18px;font:600 13px/1.4 -apple-system,' + 'BlinkMacSystemFont,sans-serif;border-bottom:3px solid #0ea5e9;' + 'z-index:2147483647;box-sizing:border-box;width:100%' + ); + bar.innerHTML = ( + '
BreakPilot Compliance-Audit · ' + meta.url + '
' + + '
' + + 'Erfasst: ' + meta.ts + ' UTC · Scan-ID ' + meta.check_id + + '
' + ); + document.body.insertBefore(bar, document.body.firstChild); +}""" + + +_EXPAND_ALL_JS = r"""() => { + // Click everything that looks expandable so cookie-table-rows nested + // in accordions become visible in the full-page screenshot. + let n = 0; + const triggers = document.querySelectorAll( + '[aria-expanded="false"], summary, ' + + 'details:not([open]), ' + + 'button[class*="expand" i], button[class*="accordion" i], ' + + 'button[class*="toggle" i], [role="button"][class*="expand" i]' + ); + for (const t of triggers) { + try { t.click(); if (t.open !== undefined) t.open = true; n++; } catch(e){} + } + return n; +}""" + + +_DISMISS_BANNER_JS = r"""() => { + // Click any "Accept all" / "Alle akzeptieren" / "Akzeptieren" button so + // the consent overlay disappears and we can capture the page content. + // We accept rather than reject because rejecting often LEAVES the banner + // in place ("you must consent to continue"), blocking the screenshot. + function walk(root) { + if (!root || !root.querySelectorAll) return false; + const buttons = root.querySelectorAll( + 'button, [role="button"], a, [class*="accept" i]' + ); + for (const b of buttons) { + const t = (b.textContent || '').trim().toLowerCase(); + if (!t || t.length > 40) continue; + if (t === 'alle akzeptieren' || t === 'akzeptieren' || + t === 'accept all' || t === 'agree' || t === 'einverstanden' || + t === 'i agree' || t === 'zustimmen' || t === 'ok' || + t === 'alle cookies akzeptieren' || t === 'alle annehmen') { + try { b.click(); return true; } catch(e){} + } + } + const all = root.querySelectorAll('*'); + for (const el of all) if (el.shadowRoot && walk(el.shadowRoot)) return true; + return false; + } + return walk(document); +}""" + + +async def capture_page_evidence( + url: str, + check_id: str = "", + timeout_ms: int = 30000, + max_height_px: int = 30000, +) -> dict: + """Capture a full-page screenshot of `url` with embedded timestamp. + + Returns dict: + png_bytes: bytes + captured_at: ISO timestamp + url: final URL after redirects + accepted_banner: bool + expanded: int — accordion-clicks performed + height_px, width_px + """ + out: dict = { + "png_bytes": b"", + "captured_at": "", + "url": url, + "accepted_banner": False, + "expanded": 0, + "height_px": 0, + "width_px": 0, + } + ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + async with async_playwright() as p: + browser = await p.chromium.launch( + headless=True, + args=["--no-sandbox", "--disable-dev-shm-usage"], + ) + ctx = await browser.new_context( + user_agent=_USER_AGENT, + viewport={"width": 1280, "height": 1024}, + locale="de-DE", + timezone_id="Europe/Berlin", + ) + page = await ctx.new_page() + try: + await page.goto(url, wait_until="domcontentloaded", timeout=timeout_ms) + await page.wait_for_timeout(3500) + # Step 1: dismiss banner (accept) so we see the policy content + try: + out["accepted_banner"] = bool(await page.evaluate(_DISMISS_BANNER_JS)) + if out["accepted_banner"]: + await page.wait_for_timeout(1500) + except Exception as e: + logger.debug("dismiss-banner failed: %s", e) + # Step 2: expand accordions / details + try: + out["expanded"] = int(await page.evaluate(_EXPAND_ALL_JS) or 0) + if out["expanded"]: + await page.wait_for_timeout(1500) + except Exception as e: + logger.debug("expand-all failed: %s", e) + out["url"] = page.url + # Step 3: inject timestamp banner for evidence + try: + await page.evaluate(_TIMESTAMP_BANNER_JS, { + "url": out["url"], "ts": ts, "check_id": check_id or "—", + }) + except Exception as e: + logger.debug("timestamp-inject failed: %s", e) + # Step 4: capture full-page screenshot. Cap height for sanity. + dims = await page.evaluate( + "() => ({w: document.documentElement.scrollWidth, " + "h: document.documentElement.scrollHeight})" + ) + out["width_px"] = int(dims.get("w") or 0) + out["height_px"] = min(int(dims.get("h") or 0), max_height_px) + # If page is too tall, scroll-into-view to anchor a screenshot region + png = await page.screenshot( + full_page=True, type="png", timeout=timeout_ms, + ) + out["png_bytes"] = png + out["captured_at"] = ts + logger.info( + "Evidence screenshot captured: %s (%dx%d, %d bytes, accepted=%s, expanded=%d)", + out["url"], out["width_px"], out["height_px"], + len(png), out["accepted_banner"], out["expanded"], + ) + finally: + await ctx.close() + await browser.close() + return out