breakpilot-compliance/backend-compliance/compliance/services/audit_walk_zip_builder.py
Benjamin Admin c908fcd5eb feat(b19): Cookie-Coherence - 3-Layer-Lookup + vendor cards + CSV
Addresses the BMW example (740 cookies, Salesforce declared as
"essential" with a 1-year lifetime, pseudo-purposes such as "Siehe
dazugehörige Datenverarbeitung", i.e. "see associated data processing").
User concept: "Regulation as Code".

Step 1 - cookie_library_lookup.py (3 layers):
  1. Override = cookie_knowledge_db.py + extended (74) for
     Schrems II / CJEU / EU alternative; this is BreakPilot's legal IP.
  2. Truth base = compliance.cookie_library (2287 entries from the Open
     Cookie Database, CC0). actual_category is treated as ground truth.
  3. Auto-learning = cookie_behavior_audits: cross-site consensus
     once ≥3 sites report the same cookie.

  Match order: exact > prefix (with separator check) > wildcard. Short
  library names ("c", "ID") require an exact match, which prevents a
  false positive on "completely_unknown". A trailing underscore
  in OCD ("guest_uuid_essential_") is interpreted as an implicit
  wildcard.
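
  The match order above could be sketched roughly as follows. This is an
  illustration only, not the code in cookie_library_lookup.py: the
  function name match_cookie, the separator set and the minimum prefix
  length of 3 are assumptions, and explicit wildcards are assumed to be
  written with "*".

```python
from __future__ import annotations

import fnmatch

SEPARATORS = "_-."   # assumed separator characters
MIN_PREFIX_LEN = 3   # assumed cutoff: shorter library names match only exactly


def match_cookie(name: str, library: dict[str, str]) -> tuple[str, str] | None:
    """Return (library_key, match_kind), or None if nothing matches."""
    # 1. An exact hit always wins, even for very short names like "c".
    if name in library:
        return name, "exact"

    keys = sorted(library, key=len, reverse=True)  # try the longest key first

    # 2. Prefix hit: the character right after the key must be a separator.
    for key in keys:
        if "*" in key or key.endswith("_") or len(key) < MIN_PREFIX_LEN:
            continue
        if (name.startswith(key) and len(name) > len(key)
                and name[len(key)] in SEPARATORS):
            return key, "prefix"

    # 3. Wildcard hit: explicit "*" patterns, plus keys with a trailing
    #    underscore, which are treated as an implicit "key*".
    for key in keys:
        pattern = key + "*" if key.endswith("_") else key
        if "*" in pattern and fnmatch.fnmatchcase(name, pattern):
            return key, "wildcard"
    return None
```

  With a library containing "c", "_ga" and "guest_uuid_essential_", the
  name "completely_unknown" stays unmatched, "_ga_ABC123" is a prefix hit
  and "_gat" is not, because "t" is not a separator.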

Step 2 - cookie_coherence_check.py (B19, 6 finding types):
  - MARKETING_AS_ESSENTIAL (HIGH): KB says actual=marketing, site
    declares essential/erforderlich → consent is bypassed
  - LIFETIME_TOO_LONG_FOR_ESSENTIAL (MED): essential + >90d
  - PSEUDO_PURPOSE (LOW): "Siehe dazugehörige Datenverarbeitung"
    / <4 words (suppressed when the vendor purpose is substantial)
  - MISSING_COUNTRY (LOW): vendor_country empty despite a KB hit
  - UNKNOWN_VENDOR (LOW): not in KB → auto-learning candidate
  - DUPLICATE_VENDOR (MED): same vendor in N categories =
    stack splitting to smuggle marketing in under "essential"

  Every finding carries a recommended_action (e.g. "Cookie X aus
  'erforderlich' raus und in 'Marketing' setzen": move cookie X out of
  'required' and into 'Marketing').
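
  As a rough sketch of how three of these finding types could be derived
  for a single cookie (not the actual cookie_coherence_check.py): the
  Finding dataclass, the check_cookie signature and the English action
  texts are hypothetical, and the vendor-purpose suppression for
  PSEUDO_PURPOSE is left out.

```python
from __future__ import annotations

from dataclasses import dataclass

ESSENTIAL_LABELS = {"essential", "erforderlich"}
MAX_ESSENTIAL_LIFETIME_DAYS = 90
PSEUDO_PHRASES = ("siehe dazugehörige datenverarbeitung",)
MIN_PURPOSE_WORDS = 4


@dataclass
class Finding:  # hypothetical container, for illustration only
    type: str
    severity: str
    recommended_action: str


def check_cookie(name: str, declared_category: str, kb_category: str | None,
                 lifetime_days: float | None, purpose: str) -> list[Finding]:
    """Compare what the site declares with what the knowledge base says."""
    findings: list[Finding] = []
    essential = declared_category.strip().lower() in ESSENTIAL_LABELS

    # Declared essential, but the KB classifies the cookie as marketing.
    if essential and kb_category == "marketing":
        findings.append(Finding(
            "MARKETING_AS_ESSENTIAL", "HIGH",
            f"Move cookie {name} out of 'essential' and into 'marketing'."))

    # Essential cookies that live longer than 90 days.
    if (essential and lifetime_days is not None
            and lifetime_days > MAX_ESSENTIAL_LIFETIME_DAYS):
        findings.append(Finding(
            "LIFETIME_TOO_LONG_FOR_ESSENTIAL", "MED",
            f"Shorten the lifetime of cookie {name} or reclassify it."))

    # Placeholder phrase or fewer than 4 words counts as a pseudo-purpose.
    text = purpose.strip().lower()
    if any(p in text for p in PSEUDO_PHRASES) or len(text.split()) < MIN_PURPOSE_WORDS:
        findings.append(Finding(
            "PSEUDO_PURPOSE", "LOW",
            f"State a concrete purpose for cookie {name}."))
    return findings
```

  A cookie declared "erforderlich" with a KB category of marketing, a
  365-day lifetime and the placeholder purpose would produce all three
  findings in this sketch.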

Step 3 - cookie_observation_logger.py:
  After each audit, logs every (cookie, site, declared_purpose) tuple to
  compliance.cookie_behavior_audits → basis for the cross-site consensus
  in layer 3.
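
  One possible reading of the cross-site consensus, sketched in memory
  instead of against compliance.cookie_behavior_audits. The commit only
  states the ≥3-sites threshold; that "consensus" means agreement on the
  declared purpose, and the function name cross_site_consensus, are
  assumptions.

```python
from __future__ import annotations

from collections import defaultdict
from typing import Iterable

MIN_SITES = 3  # threshold named in the commit message


def cross_site_consensus(
    observations: Iterable[tuple[str, str, str]],
) -> dict[str, str]:
    """Map cookie -> purpose that at least MIN_SITES distinct sites agree on."""
    # Collect the distinct sites behind every (cookie, purpose) pair.
    sites: dict[tuple[str, str], set[str]] = defaultdict(set)
    for cookie, site, purpose in observations:
        sites[(cookie, purpose.strip().lower())].add(site)

    # Per cookie, keep the purpose backed by the most sites (if >= MIN_SITES).
    best: dict[str, tuple[str, int]] = {}
    for (cookie, purpose), seen in sites.items():
        count = len(seen)
        if count >= MIN_SITES and count > best.get(cookie, ("", 0))[1]:
            best[cookie] = (purpose, count)
    return {cookie: purpose for cookie, (purpose, _) in best.items()}
```

  Repeated audits of the same site count once here, so a single site
  cannot create a consensus on its own.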

Step 4 - cookie_csv_exporter.py:
  cookies-full-{check_id}.csv with 21 columns (name, vendor decl/KB,
  cat decl/KB, lifetime decl/KB, country, opt-out, 8x FIND_* flags,
  recommended_action). UTF-8 BOM for Excel.
  ZIP attachment: extends audit_walk_zip_builder with an extra_files=
  parameter; phase_e calls it with cookies-full-...csv.
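
  A minimal sketch of the BOM-prefixed export and the hand-over via
  extra_files. The function name export_cookies_csv and the column names
  are hypothetical (the real file has 21 columns); Python's "utf-8-sig"
  codec is what prepends the BOM here.

```python
from __future__ import annotations

import csv
import io


def export_cookies_csv(rows: list[dict], columns: list[str]) -> bytes:
    """Serialize rows to CSV bytes with a UTF-8 BOM so Excel detects UTF-8."""
    buf = io.StringIO(newline="")  # let the csv module control line endings
    writer = csv.DictWriter(buf, fieldnames=columns, extrasaction="ignore")
    writer.writeheader()
    writer.writerows(rows)  # missing keys are written as empty cells
    return buf.getvalue().encode("utf-8-sig")


# Hypothetical usage with the ZIP builder from this file:
#   csv_bytes = export_cookies_csv(rows, columns)
#   zip_bytes = build_audit_walk_zip(
#       walk, extra_files={f"cookies-full-{check_id}.csv": csv_bytes})
```

  Passing bytes rather than a file path keeps the ZIP builder independent
  of where the CSV was produced.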

Step 5 - mail_render_v2/_vendor_cards.py:
  Instead of 740 cookie rows: aggregation per vendor with cookie count +
  issue count + 1-2 example cookies + issue-type tags. Top 30
  vendors in the mail, the rest only in the CSV. Sorted by issue score.
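
  The per-vendor aggregation could look roughly like this. The input
  shape (dicts with "name", "vendor", "findings"), the function name
  build_vendor_cards and the use of the plain issue count as the score
  are assumptions; the commit does not say how the issue score is
  computed.

```python
from __future__ import annotations

from collections import defaultdict


def build_vendor_cards(cookies: list[dict], top_n: int = 30,
                       max_examples: int = 2) -> list[dict]:
    """Collapse cookie rows into one card per vendor, worst vendors first."""
    by_vendor: dict[str, list[dict]] = defaultdict(list)
    for cookie in cookies:
        by_vendor[cookie.get("vendor") or "unknown"].append(cookie)

    cards = []
    for vendor, items in by_vendor.items():
        issues = [f for c in items for f in c.get("findings") or []]
        cards.append({
            "vendor": vendor,
            "cookie_count": len(items),
            "issue_count": len(issues),
            "examples": [c["name"] for c in items[:max_examples]],
            "issue_tags": sorted(set(issues)),
        })

    # Assumed score: number of issues; ties are broken by vendor name.
    cards.sort(key=lambda card: (-card["issue_count"], card["vendor"]))
    return cards[:top_n]
```

  Vendors beyond top_n are simply cut off here; per the commit message
  they remain available in the CSV.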

Step 6 - render_info_box_rechtsrahmen():
  Generic header info box with Art. 13 GDPR + § 25 TDDDG + Art. 5
  + § 5 UWG + § 30/130 OWiG. Always shown, with no explicit
  finding mapping (left to the user's own judgment).

Orchestrator + _compose: run_b19 + render_vendor_cards +
  render_info_box_rechtsrahmen wired into the V2 layout.

Tests: 28/28 green (15 lookup + 13 coherence).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-07 23:48:04 +02:00

123 lines
4.2 KiB
Python
"""Bundle the audit-walk-video + metadata into a ZIP for email attachment.

The backend has no direct access to the consent-tester volume, so we
fetch the video via HTTP from the consent-tester (stage 1 endpoint).

The DSMS CIDs are in the walk dict and are additionally written to
README.txt so that the recipient can also verify the original via an
IPFS gateway.

Output: bytes (ZIP stream), ready for SMTP attachment.
"""
from __future__ import annotations

import io
import json
import logging
import zipfile

import httpx

logger = logging.getLogger(__name__)


def _readme(walk: dict) -> str:
    """Render the German-language README.txt that accompanies the bundle."""
    wid = walk.get("walk_id") or "?"
    url = walk.get("url") or "?"
    started = walk.get("started_at") or "?"
    completed = walk.get("completed_at") or "?"
    video = walk.get("video") or {}
    sha = video.get("sha256") or "?"
    size = video.get("size_bytes") or 0
    video_cid = (video.get("dsms") or {}).get("cid") or ""
    meta_cid = (walk.get("walk_json_dsms") or {}).get("cid") or ""
    # Number of page navigations recorded during the walk.
    nav = sum(1 for a in walk.get("actions") or []
              if a.get("action") == "navigate")
    # Total number of accordion/details sections that were expanded.
    accs = sum((a.get("expanded") or 0) for a in walk.get("actions") or []
               if a.get("action") == "expand_accordions")
    return f"""BreakPilot Compliance — Audit-Walk-Beweis-Paket
Walk-ID: {wid}
Site: {url}
Aufgenommen: {started} - {completed}
Engine: Playwright WebKit (Mobile-Viewport 1280×800)
Inhalt dieses Pakets:
- video.webm {size:,} Bytes, SHA-256 {sha[:32]}
- walk.json Action-Index mit UTC-Timestamps pro Schritt
- README.txt diese Datei
Walk-Statistik:
- {nav} Compliance-Seiten besucht (Datenschutz, Impressum, AGB, ...)
- {accs} Akkordeon-/Details-Sektionen automatisch entfaltet
DSMS-Anker (IPFS, manipulationssicher):
Video: {video_cid}
walk.json: {meta_cid}
Zur Verifikation:
1. Lade das Original via https://dsms-dev.breakpilot.ai/ipfs/<CID>
2. Vergleiche SHA-256 mit obigem Hash
3. Öffne video.webm in einem modernen Browser (VLC / Chrome)
4. Lies walk.json um die Klick-Sequenz nachzuvollziehen
"""


def build_audit_walk_zip(
    walk: dict,
    consent_tester_url: str = "http://bp-compliance-consent-tester:8094",
    extra_files: dict[str, bytes] | None = None,
) -> bytes:
    """Fetch video from consent-tester + bundle with walk.json + README.

    `extra_files` is an optional name→bytes mapping (e.g. cookies-full.csv
    from the B19 export). Entries are placed at the ZIP root next to
    video.webm.
    """
    wid = walk.get("walk_id") or ""
    if not wid:
        return b""

    # Pull video binary from consent-tester (stage 1 endpoint).
    # A failed fetch is logged and the ZIP is built without the video.
    video_bytes = b""
    try:
        with httpx.Client(timeout=60.0) as c:
            r = c.get(f"{consent_tester_url}/audit-walks/{wid}/video.webm")
            if r.status_code == 200:
                video_bytes = r.content
    except Exception as e:
        logger.warning("audit-walk video fetch failed: %s", e)

    walk_json_bytes = json.dumps(walk, indent=2, ensure_ascii=False).encode(
        "utf-8",
    )
    readme_bytes = _readme(walk).encode("utf-8")

    # Annotated screenshots per finding (stage 5)
    import base64
    annotations = walk.get("annotations") or []

    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as z:
        if video_bytes:
            z.writestr("video.webm", video_bytes)
        z.writestr("walk.json", walk_json_bytes)
        z.writestr("README.txt", readme_bytes)
        for a in annotations:
            fname = a.get("filename") or ""
            b64 = a.get("png_b64") or ""
            if not fname or not b64:
                continue
            try:
                z.writestr(f"findings/{fname}", base64.b64decode(b64))
            except Exception as e:
                logger.warning("annotation %s write failed: %s",
                               fname, e)
        # Optional extra attachments; empty payloads are skipped.
        for fname, content in (extra_files or {}).items():
            if content:
                try:
                    z.writestr(fname, content)
                except Exception as e:
                    logger.warning("extra-file %s write failed: %s",
                                   fname, e)
    return buf.getvalue()
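
On the receiving side, the verification steps listed in README.txt could be automated along these lines. This is a sketch under assumptions: verify_bundle is a hypothetical helper that is not part of this module, and it assumes that `video.sha256` in walk.json is the hex SHA-256 digest of video.webm.

```python
from __future__ import annotations

import hashlib
import io
import json
import zipfile


def verify_bundle(zip_bytes: bytes) -> dict:
    """List the ZIP entries and check video.webm against walk.json's hash."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as z:
        names = sorted(z.namelist())
        walk = json.loads(z.read("walk.json").decode("utf-8"))
        expected = ((walk.get("video") or {}).get("sha256") or "").lower()

        # None means "could not check" (no video or no recorded hash).
        video_ok = None
        if "video.webm" in names and expected:
            actual = hashlib.sha256(z.read("video.webm")).hexdigest()
            video_ok = actual == expected
    return {"names": names, "video_ok": video_ok}
```

Because README.txt prints only the first 32 hex characters of the hash, comparing against the full value stored in walk.json is the stricter check.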