"""Screenshot-basierte Cookie-Extraktion mit Tesseract-OCR. Pipeline: 1. consent-tester macht Full-Page-Screenshot (Banner akzeptiert, Accordions ausgeklappt, Timestamp eingebrannt) → PNG b64 2. Tesseract OCR (lang=deu, psm=4) → Rohtext mit Tabellen-Reihen 3. _parse_ocr_cookie_table(text) → strukturierte Liste {name, category, purpose, duration, type, vendor} Funktioniert site-unabhaengig — egal welches CMP, egal welche Sprache (Tesseract kann viele), egal welches DOM-Layout. Timestamp im Bild = Beweis was wir zum Scan-Zeitpunkt wirklich gesehen haben. """ from __future__ import annotations import base64 as _b64 import json import logging import os import re import httpx logger = logging.getLogger(__name__) CONSENT_TESTER_URL = os.getenv( "CONSENT_TESTER_URL", "http://bp-compliance-consent-tester:8094" ) VISION_MODEL = os.getenv("COOKIE_VISION_MODEL", "qwen2.5vl:32b") OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434") def _slice_screenshot(png_bytes: bytes, slice_h: int = 1500, max_slices: int = 25) -> list[str]: """Cut a tall full-page screenshot into 1280×slice_h slices and return each as base64-encoded PNG. Vision models choke on 25k-tall images (resampled down to ~1024 → unreadable text); slicing keeps DPI.""" if not png_bytes: return [] try: from PIL import Image from io import BytesIO except ImportError: return [] img = Image.open(BytesIO(png_bytes)).convert("RGB") w, h = img.size n = min((h + slice_h - 1) // slice_h, max_slices) out: list[str] = [] for i in range(n): top = i * slice_h bot = min((i + 1) * slice_h, h) chunk = img.crop((0, top, w, bot)) buf = BytesIO() chunk.save(buf, format="PNG", optimize=True) out.append(_b64.b64encode(buf.getvalue()).decode("ascii")) return out async def _call_vision_on_slice(b64_png: str, timeout_s: float = 240.0) -> str: """Ask the vision model to dump all cookie-row text from one slice as raw text (NOT JSON). We parse it downstream with parse_flat regex.""" prompt = ( "Du siehst einen Bildausschnitt einer Cookie-Richtlinien-Tabelle. " "Liste ALLE Tabellen-Zeilen wortwoertlich auf, eine Zeile pro " "Cookie. Jede Zeile soll enthalten: Cookie-Name, Kategorie, " "Zweck, Speicherdauer, Art (Permanent/Session). " "Format: ' | | | | '. " "KEINE Cookies erfinden, nur was im Bild steht. Nur die Tabellen-" "Zeilen, keine Erklaerungen." ) payload = { "model": VISION_MODEL, "stream": False, "messages": [{ "role": "user", "content": prompt, "images": [b64_png], }], "options": {"temperature": 0.05, "num_predict": 4000}, } try: async with httpx.AsyncClient(timeout=timeout_s) as c: r = await c.post(f"{OLLAMA_URL.rstrip('/')}/api/chat", json=payload) r.raise_for_status() return (r.json().get("message") or {}).get("content", "") or "" except Exception as e: logger.debug("vision slice failed: %s", e) return "" async def ocr_screenshot_via_vision_slices(png_bytes: bytes, max_slices: int = 20) -> str: """Slice + vision-OCR each slice + concatenate. Returns raw text that can be fed to parse_flat_cookie_text.""" slices = _slice_screenshot(png_bytes, slice_h=1500, max_slices=max_slices) if not slices: return "" logger.info("Vision-slicing: %d slices → vision-OCR (model=%s)", len(slices), VISION_MODEL) import asyncio as _aio # Run slices SEQUENTIALLY: ollama is single-GPU and loading the same # model for parallel requests causes OOM + thrashing on Mac Mini. parts: list[str] = [] for i, s in enumerate(slices): txt = await _call_vision_on_slice(s) if txt: parts.append(txt) logger.info("Vision-slice %d/%d: %d chars", i + 1, len(slices), len(txt)) full = "\n".join(parts) logger.info("Vision-OCR slicing total: %d chars from %d slices", len(full), len(slices)) return full def ocr_screenshot_via_paddle(png_bytes: bytes) -> str: """Run PaddleOCR over the full-page screenshot, returning the concatenated text. Deterministic, no LLM halluzination. Splits tall screenshots into 1280x3000 slices so OCR works in chunks without OOM on large pages (VW cookie-page is ~25k px tall). """ if not png_bytes: return "" try: from PIL import Image from io import BytesIO from paddleocr import PaddleOCR except ImportError as e: logger.warning("PaddleOCR / PIL not available: %s", e) return "" try: img = Image.open(BytesIO(png_bytes)).convert("RGB") except Exception as e: logger.warning("PIL open failed: %s", e) return "" w, h = img.size slice_h = 3000 n_slices = (h + slice_h - 1) // slice_h logger.info("PaddleOCR: %dx%d screenshot → %d slices of %d high", w, h, n_slices, slice_h) # Global OCR instance reused — initial init is ~10s. global _PADDLE_OCR if "_PADDLE_OCR" not in globals() or _PADDLE_OCR is None: try: _PADDLE_OCR = PaddleOCR(use_angle_cls=False, lang="german", show_log=False) except Exception as e: logger.warning("PaddleOCR init failed: %s", e) return "" parts: list[str] = [] import numpy as np for i in range(n_slices): top = i * slice_h bot = min((i + 1) * slice_h, h) crop = img.crop((0, top, w, bot)) arr = np.array(crop) try: result = _PADDLE_OCR.ocr(arr, cls=False) except Exception as e: logger.warning("PaddleOCR slice %d failed: %s", i, e) continue # PaddleOCR returns list-of-lines where each line is # [bbox, (text, conf)] — variable nesting depending on version. if not result: continue for page in result: if not page: continue for line in page: if not line: continue try: if isinstance(line, list) and len(line) >= 2: txt = line[1][0] if isinstance(line[1], (list, tuple)) else str(line[1]) else: txt = str(line) if txt: parts.append(txt) except Exception: continue full_text = "\n".join(parts) logger.info("PaddleOCR: extracted %d lines / %d chars from %d slices", len(parts), len(full_text), n_slices) return full_text _PADDLE_OCR = None # ── Tesseract-based parser ──────────────────────────────────────────── def ocr_screenshot_via_tesseract(png_bytes: bytes, lang: str = "deu", psm: int = 4) -> str: """Run Tesseract OCR on a full-page screenshot. Returns normalized text where multi-newline paragraphs are collapsed but blank lines preserved (helps anchor-based parsing). psm=4 means single column of text of variable sizes (cookie-tables). """ if not png_bytes: return "" try: import pytesseract from PIL import Image from io import BytesIO import re as _re except ImportError as e: logger.warning("tesseract/PIL not available: %s", e) return "" try: img = Image.open(BytesIO(png_bytes)).convert("RGB") raw = pytesseract.image_to_string(img, lang=lang, config=f"--psm {psm}") # Collapse intra-paragraph newlines so OCR cells flow on one line. norm = _re.sub(r"[ \t]+", " ", raw) norm = _re.sub(r"\n(?!\s*\n)", " ", norm) norm = _re.sub(r"\s{2,}", " ", norm) logger.info( "Tesseract OCR: %d chars / %d words (image %dx%d)", len(norm), len(norm.split()), img.size[0], img.size[1], ) return norm except Exception as e: logger.warning("Tesseract OCR failed: %s (%s)", str(e) or "(no msg)", type(e).__name__) return "" # Kategorie-Anchor-Tokens that ALWAYS follow the Cookie-Name in the # typical column layout: [NAME] [KATEGORIE] [ZWECK] [DAUER] [ART] _CATEGORY_ANCHORS = ( r"Funktionscookie", r"Trackingcookie", r"Tracking Cookies?", r"Session Cookies?", r"Funktional", r"Marketing", r"Analytics", r"Necessary", r"Werbung", r"Personalisierung", r"Statistik", r"Notwendig", r"Erforderlich", ) _CATEGORY_PATTERN = "(?:" + "|".join(_CATEGORY_ANCHORS) + r")(?:\s*\([^)]*\))?" # Cookie-Name: alphanum + underscore + dash + dot. Wir erlauben optional # einen Suffix-Underscore (Spalten-Umbruch bei VW: `VWD6_ENSIGHTEN_PRIVACY_` # als Name-Fragment). Mind. 3, max. 60 chars. _COOKIE_NAME_RE = ( r"(?:[A-Za-z][\w\-.]{2,60}|[A-Za-z][\w\-.]{2,60}<[^>]+>)" ) def parse_ocr_cookie_table(text: str) -> list[dict]: """Extract cookie-records from Tesseract-OCR text using anchor-based pattern: . Returns list of {name, category, purpose, duration, type}. Vendor is NOT inferred here — caller maps via _guess_vendor. KEINE Cookie-Namens-Korrektur — `awsalb` bleibt `awsalb`, nicht `awesome`. Falsche Korrektur waere ein Compliance-Verlust. """ if not text or len(text) < 200: return [] import re as _re # Pattern: capture name + anchor category, then up to 250 chars # forward to grab duration + type tokens. pattern = _re.compile( rf"(?P{_COOKIE_NAME_RE})\s+" rf"(?P{_CATEGORY_PATTERN})" rf"(?P[^A-Z]{{0,300}}?)" rf"(?:(?P\d+(?:[.,]\s*)?\s*(?:Tage|Jahre?|Monate?|Minuten|Stunden|Sekunden)\.?)?\s*" rf"(?PPermanent/Protokoll|Session\s*Cookie|Persistent\s*Cookie|Persistent\s*cookie))?", _re.IGNORECASE | _re.DOTALL, ) seen_names: set[str] = set() out: list[dict] = [] for m in pattern.finditer(text): name = (m.group("name") or "").strip() # Filter obvious garbage (UI strings, navigation, common words) if not name or len(name) < 3: continue nl = name.lower() if nl in seen_names: continue # Reject common non-cookie words. Cookie-Namen sind technische IDs: # haben oft Unterstrich/Bindestrich/Camel-Case oder sind kurze IDs. if nl in ("name", "art", "zweck", "dauer", "kategorie", "anbieter", "cookie", "cookies", "name des cookies", "this", "dieser", "diese", "alle", "und", "von", "der", "die", "das", "ein", "eine", "session", "permanent", "category"): continue # Cookie-Namen sollen kein reines Lower-Word sein OHNE _ oder - # (z.B. "verwendet" wuerde sonst matchen) has_marker = any(c in name for c in "_-.<>") is_caps = name.upper() == name and len(name) >= 3 is_camel = any(c.isupper() for c in name[1:]) and any(c.islower() for c in name) if not (has_marker or is_caps or is_camel): # Lowercase word ohne Marker → vermutlich kein Cookie-Name continue seen_names.add(nl) out.append({ "name": name[:80], "category": (m.group("category") or "").strip()[:60], "purpose": (m.group("rest") or "").strip()[:200], "duration": (m.group("duration") or "").strip()[:60], "type": (m.group("type") or "").strip()[:30], "vendor": "", }) logger.info("parse_ocr_cookie_table: %d unique cookies extracted", len(out)) return out _VISION_PROMPT = ( "Du analysierst einen Screenshot einer Cookie-Richtlinie. Auf der Seite " "ist eine Tabelle mit Cookies aufgelistet. Spalten sind ueblicherweise: " "Name des Cookies, Kategorie (z.B. 'Funktional', 'Marketing', " "'Analytics'), Verwendungszweck, Speicherdauer, Art des Cookies " "(z.B. 'Permanent', 'Session').\n\n" "Extrahiere ALLE Cookies aus dem Bild. Wenn die Tabelle abgeschnitten " "ist, extrahiere alles was sichtbar ist. KEINE Cookies erfinden, KEINE " "Halluzinationen.\n\n" "Antworte als reines JSON-Objekt im Format:\n" '{"cookies": [\n' ' {"name": "", "category": "", ' '"purpose": "", ' '"duration": "", ' '"type": "", ' '"vendor": ""}\n' "]}\n\n" "Nur JSON, kein Erklaerungstext, keine Code-Fences." ) async def capture_cookie_evidence_slices( cookie_url: str, check_id: str = "", viewport_h: int = 1024, overlap_px: int = 200, max_slices: int = 40, timeout_s: float = 180.0, ) -> dict: """Capture a full-page screenshot and slice it (with overlap) in-memory. Why not scroll-based slicing in Playwright? VW's cookie-page uses scroll-snap / fixed-position elements that defeat window.scrollTo — all viewport screenshots came back identical (header overlay only). A full-page screenshot bypasses scrolling entirely, and we slice the PNG bytes locally via PIL to get the same overlapping evidence chain. """ if not cookie_url: return {"slices": [], "error": "no url"} try: async with httpx.AsyncClient(timeout=timeout_s) as c: r = await c.post( f"{CONSENT_TESTER_URL}/capture-evidence", json={"url": cookie_url, "check_id": check_id}, timeout=timeout_s, ) r.raise_for_status() data = r.json() except Exception as e: logger.warning("capture full-page evidence failed: %s", e) return {"slices": [], "error": str(e)[:200]} png_b64 = data.get("png_b64", "") if not png_b64: return {"slices": [], "error": data.get("error", "no png")} try: from PIL import Image from io import BytesIO import hashlib as _hl png = _b64.b64decode(png_b64) img = Image.open(BytesIO(png)).convert("RGB") w, h = img.size step = max(1, viewport_h - overlap_px) slices: list[dict] = [] idx = 0 y = 0 while y < h and idx < max_slices: top = y bot = min(y + viewport_h, h) chunk = img.crop((0, top, w, bot)) buf = BytesIO() chunk.save(buf, format="PNG", optimize=True) png_chunk = buf.getvalue() slices.append({ "idx": idx, "ts": data.get("captured_at", ""), "top_y": top, "bot_y": bot, "sha256": _hl.sha256(png_chunk).hexdigest()[:16], "png_b64": _b64.b64encode(png_chunk).decode("ascii"), "png_size": len(png_chunk), }) y += step idx += 1 logger.info( "Evidence-slices (PIL-cut): %s → %d slices (image %dx%d, " "viewport=%d, overlap=%d)", cookie_url, len(slices), w, h, viewport_h, overlap_px, ) return { "slices": slices, "total_height_px": h, "width_px": w, "accepted_banner": data.get("accepted_banner"), "expanded": data.get("expanded"), "url": data.get("url", cookie_url), "captured_at": data.get("captured_at", ""), } except Exception as e: logger.warning("PIL-slice failed: %s (%s)", str(e) or "(no msg)", type(e).__name__) return {"slices": [], "error": str(e)[:200]} def _ocr_one_slice(s: dict) -> tuple[dict, list[dict]]: """Helper for parallel execution: tesseract + parse for one slice. Returns (slice_metadata_summary, cookies).""" import base64 as _b64 try: png = _b64.b64decode(s.get("png_b64", "")) except Exception: return ({"idx": s.get("idx"), "ts": s.get("ts"), "top_y": s.get("top_y"), "bot_y": s.get("bot_y"), "cookies_found": 0}, []) text = ocr_screenshot_via_tesseract(png) chunk = parse_ocr_cookie_table(text) return ({"idx": s.get("idx"), "ts": s.get("ts"), "top_y": s.get("top_y"), "bot_y": s.get("bot_y"), "cookies_found": len(chunk)}, chunk) def ocr_slices_extract_cookies( slices: list[dict], max_workers: int = 4, ) -> tuple[list[dict], dict]: """Run Tesseract on each slice IN PARALLEL + parse + dedup by name. Tesseract releases the GIL during its C-level OCR, so a ThreadPoolExecutor with 4 workers yields ~4x speedup on multi-core machines (M4 Pro has plenty). Sequential 32 slices = ~60s, parallel ~15s. Returns (cookies, stats) where stats has: per_slice: [{idx, cookies_found, ts, top_y, bot_y}] total_raw, total_unique, slices """ from concurrent.futures import ThreadPoolExecutor if not slices: return [], {"per_slice": [], "total_raw": 0, "total_unique": 0, "slices": 0} # Keep slice order so the per-slice report is sequential. with ThreadPoolExecutor(max_workers=max_workers) as ex: results = list(ex.map(_ocr_one_slice, slices)) per_slice: list[dict] = [r[0] for r in results] all_cookies: list[dict] = [] seen_names: set[str] = set() for _, chunk in results: for c in chunk: nl = (c.get("name") or "").strip().lower() if not nl or nl in seen_names: continue seen_names.add(nl) all_cookies.append(c) stats = { "per_slice": per_slice, "total_raw": sum(p["cookies_found"] for p in per_slice), "total_unique": len(all_cookies), "slices": len(slices), } logger.info( "ocr_slices_extract_cookies (parallel=%d): %d slices → %d raw → %d unique", max_workers, stats["slices"], stats["total_raw"], stats["total_unique"], ) return all_cookies, stats async def capture_cookie_screenshot( cookie_url: str, check_id: str = "", timeout_s: float = 60.0, ) -> dict: """Trigger consent-tester to capture full-page screenshot of cookie URL. Returns dict with png_b64, captured_at, url, width_px, height_px etc. Empty png_b64 on error. """ if not cookie_url: return {"png_b64": "", "error": "no url"} try: async with httpx.AsyncClient(timeout=timeout_s) as c: r = await c.post( f"{CONSENT_TESTER_URL}/capture-evidence", json={"url": cookie_url, "check_id": check_id}, timeout=timeout_s, ) r.raise_for_status() data = r.json() logger.info( "Evidence-Screenshot: %s -> %d bytes (%dx%d, expanded=%d, accepted=%s)", cookie_url, data.get("png_size", 0), data.get("width_px", 0), data.get("height_px", 0), data.get("expanded", 0), data.get("accepted_banner"), ) return data except Exception as e: logger.warning("capture_cookie_screenshot failed for %s: %s", cookie_url, e) return {"png_b64": "", "error": str(e)[:200]} async def extract_cookies_via_vision( png_b64: str, timeout_s: float = 240.0, ) -> list[dict]: """Call Ollama llama3.2-vision with the screenshot + extraction prompt. Returns list of {name, category, purpose, duration, type, vendor}. Empty list on failure. """ if not png_b64: return [] payload = { "model": VISION_MODEL, "stream": False, "format": "json", "messages": [{ "role": "user", "content": _VISION_PROMPT, "images": [png_b64], }], "options": { "temperature": 0.05, "num_predict": 8000, }, } try: async with httpx.AsyncClient(timeout=timeout_s) as c: r = await c.post( f"{OLLAMA_URL.rstrip('/')}/api/chat", json=payload, ) r.raise_for_status() content = (r.json().get("message") or {}).get("content", "") or "" cookies = _parse_vision_response(content) logger.info( "Vision-OCR extracted %d cookies (model=%s, response_len=%d)", len(cookies), VISION_MODEL, len(content), ) return cookies except Exception as e: logger.warning( "Vision-OCR call failed: %s (%s) model=%s", str(e) or "(no msg)", type(e).__name__, VISION_MODEL, ) return [] def _parse_vision_response(content: str) -> list[dict]: """Be lenient: code fences, leading prose, partial JSON.""" if not content: return [] txt = content.strip() if txt.startswith("```"): lines = txt.split("\n") if lines and lines[-1].strip().startswith("```"): txt = "\n".join(lines[1:-1]) else: txt = "\n".join(lines[1:]) a, b = txt.find("{"), txt.rfind("}") if not (0 <= a < b): return [] try: obj = json.loads(txt[a:b + 1]) except json.JSONDecodeError: return [] if not isinstance(obj, dict): return [] arr = obj.get("cookies") or obj.get("Cookies") or [] if not isinstance(arr, list): return [] out: list[dict] = [] for item in arr[:300]: # cap to sanity if not isinstance(item, dict): continue name = (item.get("name") or "").strip() if not name or len(name) < 2 or len(name) > 80: continue # Strip obvious garbage if re.fullmatch(r"[\s\-_.]+", name): continue out.append({ "name": name[:80], "category": (item.get("category") or "").strip()[:60], "purpose": (item.get("purpose") or "").strip()[:200], "duration": (item.get("duration") or "").strip()[:60], "type": (item.get("type") or "").strip()[:30], "vendor": (item.get("vendor") or "").strip()[:80], }) return out def cookies_to_vendor_records( cookies: list[dict], guess_vendor_fn=None, ) -> list[dict]: """Aggregate OCR-extracted cookies into vendor records compatible with cmp_vendors-schema. guess_vendor_fn: optional callable name → vendor. Each cookie's vendor field is used; if empty, we fall back to guess_vendor_fn (e.g. _guess_vendor from cookies_table_parser). """ by_vendor: dict[str, dict] = {} for c in cookies: v_name = (c.get("vendor") or "").strip() if not v_name and guess_vendor_fn: try: v_name = guess_vendor_fn(c["name"]) or "" except Exception: v_name = "" if not v_name: v_name = "Unbekannter Anbieter" v = by_vendor.setdefault(v_name, { "name": v_name, "country": "", "purpose": "", "category": c.get("category", ""), "opt_out_url": "", "privacy_policy_url": "", "persistence": c.get("duration", ""), "cookies": [], "source": "vision_ocr", }) v["cookies"].append({ "name": c["name"], "purpose": c.get("purpose", ""), "expiry": c.get("duration", ""), "is_third_party": True, "declared_category": c.get("category", ""), "type": c.get("type", ""), }) return list(by_vendor.values())