02879a2c3a
CI / detect-changes (push) Successful in 7s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / build-sha-integrity (push) Failing after 4s
CI / validate-canonical-controls (push) Successful in 11s
CI / loc-budget (push) Failing after 14s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 2m19s
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 29s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
CI hard-cap 500 LOC. cookie_screenshot_ocr.py war auf 642 gewachsen,
also gesplittet:
- cookie_screenshot_ocr_engines.py (353 LOC, NEU)
OCR-Engine-Funktionen: _slice_screenshot, Vision-LLM (qwen2.5vl),
PaddleOCR, Tesseract, parse_ocr_cookie_table, parse_vision_response,
Konstanten VISION_MODEL/OLLAMA_URL/VISION_PROMPT.
- cookie_screenshot_ocr.py (290 LOC, REWRITE)
Orchestration: capture_cookie_evidence_slices, _ocr_one_slice,
ocr_slices_extract_cookies, capture_cookie_screenshot,
extract_cookies_via_vision, cookies_to_vendor_records.
Re-Exports der Engine-Funktionen für Backward-Kompat.
Einziger externer Importer (_phase_d1_vendors_raw.py) braucht keinen
Code-Change — Public-API stabil.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
291 lines
9.9 KiB
Python
291 lines
9.9 KiB
Python
"""Screenshot-basierte Cookie-Extraktion (Orchestration).
|
|
|
|
Pipeline:
|
|
1. consent-tester macht Full-Page-Screenshot (Banner akzeptiert,
|
|
Accordions ausgeklappt, Timestamp eingebrannt) → PNG b64
|
|
2. Tesseract OCR (lang=deu, psm=4) → Rohtext mit Tabellen-Reihen
|
|
3. parse_ocr_cookie_table(text) → strukturierte Liste
|
|
|
|
Phase-1-Split (2026-06-06): Engine-Funktionen
|
|
(_slice_screenshot / vision-OCR / paddle / tesseract / parse) leben
|
|
jetzt in `cookie_screenshot_ocr_engines.py`. Re-Exports halten die
|
|
Public-API stabil — externe Importer (`_phase_d1_vendors_raw.py`)
|
|
brauchen keinen Code-Change.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64 as _b64
|
|
import logging
|
|
import os
|
|
|
|
import httpx
|
|
|
|
from .cookie_screenshot_ocr_engines import ( # noqa: F401 (re-exports)
|
|
OLLAMA_URL,
|
|
VISION_MODEL,
|
|
VISION_PROMPT,
|
|
_PADDLE_OCR,
|
|
_call_vision_on_slice,
|
|
_slice_screenshot,
|
|
ocr_screenshot_via_paddle,
|
|
ocr_screenshot_via_tesseract,
|
|
ocr_screenshot_via_vision_slices,
|
|
parse_ocr_cookie_table,
|
|
parse_vision_response,
|
|
)
|
|
|
|
# Module-wide logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Base URL of the consent-tester service. The CONSENT_TESTER_URL environment
# variable, when set, overrides the default given here.
CONSENT_TESTER_URL = os.environ.get(
    "CONSENT_TESTER_URL",
    "http://bp-compliance-consent-tester:8094",
)


# Legacy alias: keeps imports of the former private name resolving.
_parse_vision_response = parse_vision_response
|
|
|
|
|
|
def _slice_bounds(
    total_h: int, viewport_h: int, overlap_px: int, max_slices: int,
) -> list[tuple[int, int]]:
    """Return the (top, bottom) pixel rows of each overlapping slice window."""
    # Never advance by less than one row, even if overlap >= viewport.
    step = max(1, viewport_h - overlap_px)
    bounds: list[tuple[int, int]] = []
    top = 0
    while top < total_h and len(bounds) < max_slices:
        bottom = min(top + viewport_h, total_h)
        bounds.append((top, bottom))
        if bottom >= total_h:
            # Page bottom reached: any further window would lie entirely
            # inside this one and contribute no new pixels.
            break
        top += step
    return bounds


async def capture_cookie_evidence_slices(
    cookie_url: str, check_id: str = "",
    viewport_h: int = 1024, overlap_px: int = 200, max_slices: int = 40,
    timeout_s: float = 180.0,
) -> dict:
    """Capture a full-page screenshot and slice it (with overlap) in-memory.

    The consent-tester service is asked (POST ``/capture-evidence``) for one
    full-page PNG, which is then cut locally with PIL into horizontal bands
    of ``viewport_h`` rows; consecutive bands share ``overlap_px`` rows.

    Why not scroll-based slicing in Playwright? VW's cookie page uses
    scroll-snap / fixed-position elements that defeat window.scrollTo —
    all viewport screenshots came back identical (header overlay only).
    A full-page screenshot bypasses scrolling entirely, and slicing the
    PNG bytes locally yields the same overlapping evidence chain.

    Args:
        cookie_url: Page to capture. An empty value short-circuits.
        check_id: Identifier forwarded unchanged to the consent-tester.
        viewport_h: Height of each slice in pixels; must be positive.
        overlap_px: Rows shared between consecutive slices.
        max_slices: Upper bound on the number of slices produced.
        timeout_s: HTTP timeout (seconds) for the capture request.

    Returns:
        On success a dict with ``slices``, ``total_height_px``,
        ``width_px``, ``accepted_banner``, ``expanded``, ``url`` and
        ``captured_at``. Each slice carries ``idx``, ``ts``, ``top_y``,
        ``bot_y``, ``sha256`` (first 16 hex chars of the slice PNG's
        SHA-256), ``png_b64`` and ``png_size``. On failure the dict is
        ``{"slices": [], "error": <message>}``.
    """
    if not cookie_url:
        return {"slices": [], "error": "no url"}
    if viewport_h <= 0:
        # A non-positive window can never produce a usable slice; reject it
        # before spending a (potentially slow) capture request.
        return {"slices": [], "error": "invalid viewport_h"}

    try:
        async with httpx.AsyncClient(timeout=timeout_s) as c:
            r = await c.post(
                f"{CONSENT_TESTER_URL}/capture-evidence",
                json={"url": cookie_url, "check_id": check_id},
                timeout=timeout_s,
            )
            r.raise_for_status()
            data = r.json()
    except Exception as e:
        logger.warning("capture full-page evidence failed: %s", e)
        return {"slices": [], "error": str(e)[:200]}

    if not isinstance(data, dict):
        # Valid JSON but not an object: report it like any other failure
        # instead of letting the ``.get`` calls below raise.
        logger.warning(
            "capture full-page evidence: unexpected payload type %s",
            type(data).__name__,
        )
        return {"slices": [], "error": "unexpected response"}

    png_b64 = data.get("png_b64", "")
    if not png_b64:
        return {"slices": [], "error": data.get("error", "no png")}

    try:
        # Imported lazily inside the try so a missing PIL installation is
        # reported through the error dict as well.
        from PIL import Image
        from io import BytesIO
        import hashlib as _hl

        png = _b64.b64decode(png_b64)
        img = Image.open(BytesIO(png)).convert("RGB")
        w, h = img.size
        captured_at = data.get("captured_at", "")

        slices: list[dict] = []
        windows = _slice_bounds(h, viewport_h, overlap_px, max_slices)
        for idx, (top, bot) in enumerate(windows):
            buf = BytesIO()
            img.crop((0, top, w, bot)).save(buf, format="PNG", optimize=True)
            png_chunk = buf.getvalue()
            slices.append({
                "idx": idx,
                "ts": captured_at,
                "top_y": top, "bot_y": bot,
                "sha256": _hl.sha256(png_chunk).hexdigest()[:16],
                "png_b64": _b64.b64encode(png_chunk).decode("ascii"),
                "png_size": len(png_chunk),
            })

        logger.info(
            "Evidence-slices (PIL-cut): %s → %d slices (image %dx%d, "
            "viewport=%d, overlap=%d)",
            cookie_url, len(slices), w, h, viewport_h, overlap_px,
        )
        return {
            "slices": slices,
            "total_height_px": h,
            "width_px": w,
            "accepted_banner": data.get("accepted_banner"),
            "expanded": data.get("expanded"),
            "url": data.get("url", cookie_url),
            "captured_at": captured_at,
        }
    except Exception as e:
        logger.warning("PIL-slice failed: %s (%s)",
                       str(e) or "(no msg)", type(e).__name__)
        return {"slices": [], "error": str(e)[:200]}
|
|
|
|
|
|
def _ocr_one_slice(s: dict) -> tuple[dict, list[dict]]:
    """OCR one slice with Tesseract and parse its cookie rows.

    Used as the per-item worker for the parallel run over all slices.

    Returns:
        ``(meta, cookies)`` where ``meta`` repeats the slice's ``idx``,
        ``ts``, ``top_y`` and ``bot_y`` and adds ``cookies_found``, and
        ``cookies`` is whatever ``parse_ocr_cookie_table`` produced. A slice
        whose base64 payload cannot be decoded yields zero cookies.
    """
    meta = {key: s.get(key) for key in ("idx", "ts", "top_y", "bot_y")}
    try:
        raw_png = _b64.b64decode(s.get("png_b64", ""))
    except Exception:
        # Undecodable payload: report the slice with no hits rather than
        # raising out of the worker.
        meta["cookies_found"] = 0
        return (meta, [])

    ocr_text = ocr_screenshot_via_tesseract(raw_png)
    rows = parse_ocr_cookie_table(ocr_text)
    meta["cookies_found"] = len(rows)
    return (meta, rows)
|
|
|
|
|
|
def ocr_slices_extract_cookies(
    slices: list[dict], max_workers: int = 4,
) -> tuple[list[dict], dict]:
    """OCR every slice in a thread pool, then merge and dedup the cookies.

    Each slice is handed to ``_ocr_one_slice``; results come back in slice
    order. Cookies are deduplicated by their stripped, lower-cased ``name``
    (first occurrence wins); entries without a name are dropped.

    Author's note carried over from the original: the thread pool is used
    on the premise that the OCR call does not hold the GIL, with a reported
    ~60s sequential vs ~15s parallel for 32 slices at 4 workers.

    Args:
        slices: Slice dicts as consumed by ``_ocr_one_slice``.
        max_workers: Size of the thread pool.

    Returns:
        ``(cookies, stats)`` where ``stats`` holds ``per_slice`` (one
        metadata dict per slice), ``total_raw``, ``total_unique`` and
        ``slices`` (the number of input slices).
    """
    from concurrent.futures import ThreadPoolExecutor

    if not slices:
        empty_stats = {"per_slice": [], "total_raw": 0,
                       "total_unique": 0, "slices": 0}
        return [], empty_stats

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        outcomes = list(pool.map(_ocr_one_slice, slices))

    per_slice = [meta for meta, _ in outcomes]

    # Insertion-ordered dict: keeps the first cookie seen for each name and
    # preserves discovery order across slices.
    first_by_name: dict[str, dict] = {}
    for _, rows in outcomes:
        for cookie in rows:
            key = (cookie.get("name") or "").strip().lower()
            if key and key not in first_by_name:
                first_by_name[key] = cookie
    all_cookies = list(first_by_name.values())

    raw_total = sum(meta["cookies_found"] for meta in per_slice)
    stats = {
        "per_slice": per_slice,
        "total_raw": raw_total,
        "total_unique": len(all_cookies),
        "slices": len(slices),
    }
    logger.info(
        "ocr_slices_extract_cookies (parallel=%d): %d slices → %d raw → %d unique",
        max_workers, stats["slices"], stats["total_raw"],
        stats["total_unique"],
    )
    return all_cookies, stats
|
|
|
|
|
|
async def capture_cookie_screenshot(
    cookie_url: str, check_id: str = "", timeout_s: float = 60.0,
) -> dict:
    """Ask the consent-tester for a full-page screenshot of ``cookie_url``.

    Args:
        cookie_url: Page to capture. An empty value short-circuits.
        check_id: Identifier forwarded unchanged to the consent-tester.
        timeout_s: HTTP timeout (seconds) for the capture request.

    Returns:
        The service's JSON response as-is on success; otherwise
        ``{"png_b64": "", "error": <message>}``.
    """
    if not cookie_url:
        return {"png_b64": "", "error": "no url"}

    try:
        async with httpx.AsyncClient(timeout=timeout_s) as client:
            endpoint = f"{CONSENT_TESTER_URL}/capture-evidence"
            request_body = {"url": cookie_url, "check_id": check_id}
            resp = await client.post(
                endpoint, json=request_body, timeout=timeout_s,
            )
            resp.raise_for_status()
            data = resp.json()

        size = data.get("png_size", 0)
        width = data.get("width_px", 0)
        height = data.get("height_px", 0)
        logger.info(
            "Evidence-Screenshot: %s -> %d bytes (%dx%d, expanded=%d, accepted=%s)",
            cookie_url, size, width, height,
            data.get("expanded", 0), data.get("accepted_banner"),
        )
        return data
    except Exception as e:
        logger.warning("capture_cookie_screenshot failed for %s: %s",
                       cookie_url, e)
        return {"png_b64": "", "error": str(e)[:200]}
|
|
|
|
|
|
async def extract_cookies_via_vision(
    png_b64: str, timeout_s: float = 240.0,
) -> list[dict]:
    """Send the screenshot and extraction prompt to the Ollama vision model.

    Args:
        png_b64: Base64-encoded screenshot. An empty value short-circuits.
        timeout_s: HTTP timeout (seconds) for the chat request.

    Returns:
        The cookies produced by ``parse_vision_response`` from the model's
        reply, or an empty list when there is no image or the call fails.
    """
    if not png_b64:
        return []

    user_message = {
        "role": "user",
        "content": VISION_PROMPT,
        "images": [png_b64],
    }
    payload = {
        "model": VISION_MODEL,
        "stream": False,
        "format": "json",
        "messages": [user_message],
        "options": {"temperature": 0.05, "num_predict": 8000},
    }

    try:
        async with httpx.AsyncClient(timeout=timeout_s) as client:
            resp = await client.post(
                f"{OLLAMA_URL.rstrip('/')}/api/chat",
                json=payload,
            )
            resp.raise_for_status()
            # Tolerate a missing or null "message" / "content" in the reply.
            message = resp.json().get("message") or {}
            content = message.get("content", "") or ""

        cookies = parse_vision_response(content)
        logger.info(
            "Vision-OCR extracted %d cookies (model=%s, response_len=%d)",
            len(cookies), VISION_MODEL, len(content),
        )
        return cookies
    except Exception as e:
        logger.warning(
            "Vision-OCR call failed: %s (%s) model=%s",
            str(e) or "(no msg)", type(e).__name__, VISION_MODEL,
        )
        return []
|
|
|
|
|
|
def cookies_to_vendor_records(
    cookies: list[dict], guess_vendor_fn=None,
) -> list[dict]:
    """Group OCR-extracted cookies into vendor records (cmp_vendors schema).

    The vendor of a cookie is its stripped ``vendor`` field; if that is
    empty and ``guess_vendor_fn`` is given, the function is called with the
    cookie name. Cookies that still have no vendor are collected under
    ``"Unbekannter Anbieter"``.

    Args:
        cookies: Cookie dicts; all fields are optional and read with
            ``.get`` (a missing ``name`` becomes ``""``).
        guess_vendor_fn: Optional callable mapping a cookie name to a vendor
            name. Any exception it raises is treated as "no guess".

    Returns:
        One record per vendor, in order of first appearance. A record's
        ``category`` and ``persistence`` come from the first cookie seen for
        that vendor; its ``cookies`` list holds one entry per input cookie.
    """
    by_vendor: dict[str, dict] = {}
    for c in cookies:
        # Read the name once and tolerantly: a cookie without a "name" key
        # must not abort the whole aggregation.
        cookie_name = c.get("name", "")

        v_name = (c.get("vendor") or "").strip()
        if not v_name and guess_vendor_fn:
            try:
                v_name = guess_vendor_fn(cookie_name) or ""
            except Exception:
                # Best-effort lookup: a failing guesser means "unknown".
                v_name = ""
        if not v_name:
            v_name = "Unbekannter Anbieter"

        record = by_vendor.get(v_name)
        if record is None:
            # Only build the record for a vendor not seen before.
            record = by_vendor[v_name] = {
                "name": v_name,
                "country": "",
                "purpose": "",
                "category": c.get("category", ""),
                "opt_out_url": "",
                "privacy_policy_url": "",
                "persistence": c.get("duration", ""),
                "cookies": [],
                "source": "vision_ocr",
            }

        record["cookies"].append({
            "name": cookie_name,
            "purpose": c.get("purpose", ""),
            "expiry": c.get("duration", ""),
            "is_third_party": True,
            "declared_category": c.get("category", ""),
            "type": c.get("type", ""),
        })
    return list(by_vendor.values())
|