Files
breakpilot-compliance/backend-compliance/compliance/services/check_replay.py
T
Benjamin Admin e2be51b0aa
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 16s
CI / detect-changes (push) Successful in 11s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / loc-budget (push) Failing after 16s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 2m42s
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 41s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
feat(audit): P106 MC-Audit-Type + P83 BUILD_SHA in Dockerfiles + P80 v2 full
P106 — mc_audit_type.py: zentrales Quality-Thema.
Klassifiziert pro MC: verifiable / process_internal / doc_internal /
ambiguous. Pattern-Match auf check_question + title + fail_criteria
(Schulung, AVV abgeschlossen, TOM umgesetzt, DSFA durchgefuehrt,
Ausnahmen dokumentieren, kostenfrei zur Verfuegung, opt-out
intern ermoeglichen, …).

Interne MCs werden in der MC-Auswertung NICHT mehr als FAIL gewertet,
sondern als CHECK markiert (audit_status='check'). Sie zaehlen im
build_scorecard als skipped (nicht failed) damit der Score realistisch
ist. build_internal_checks_block_html() rendert sie als separaten
blauen Block 'Pruefungen die wir von aussen NICHT durchfuehren koennen'
nach dem MC-Scorecard.

Erwartete Wirkung: bei VW 95 FAILs → wahrscheinlich 30-40 echte
verifiable_fails + 50-60 internal_checks. GF-Mail wird drastisch
realistischer (statt 'Sie haben 95 Verstoesse' → 'Sie haben 35
extern sichtbare Themen + 60 interne Checks, bitte mit DSB klaeren').

P83 — BUILD_SHA in backend/admin/consent-tester Dockerfiles als
ARG + ENV. check-rebuild-needed.sh kann jetzt deployed vs local SHA
vergleichen + REBUILD REQUIRED melden.

P80 v2 — check_replay.py macht jetzt vollstaendigen Replay aller
post-fetch Quality-Generatoren: vendor_normalizer (Dedup),
audit_quality_checks, cookie_compliance_audit, tcf_vendor_authority,
cookie_value_entropy, cookie_network_tracer. Snapshots aus alter Zeit
zeigen jetzt im Replay den aktuellen Audit-Stand.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 08:57:02 +02:00

298 lines
11 KiB
Python

"""
P80 — Replay-Pipeline (v1 Mail-Render + v2 Quality-Check-Replay).
Lädt einen persistierten Snapshot und rendert die Audit-Mail mit dem
AKTUELLEN Mail-Render-Code neu. Nutzbar fuer:
* Mail-Layout-Aenderungen (P63-P67, P82 1-Pager, P84 Diff-Mode) testen
* Action-Recipes anpassen
* Disclaimer-Text iterieren
* Pattern-Notice-Logik tunen
Seit v2 zusaetzlich enthalten: Replay der post-fetch Quality-Generatoren
(vendor_normalizer, audit_quality_checks, cookie_compliance_audit,
tcf_vendor_authority, cookie_value_entropy, cookie_network_tracer).
Weiterhin NICHT enthalten (kommt spaeter):
* MC-Scorecard re-run mit aktuellem scope_doc_type-Filter (P72) —
erfordert MC-Pipeline-Refactoring aus _run_compliance_check
* Vendor-Redundancy-Analyse re-run
Effekt v1: 7min Re-Scan -> 2-5 Sek fuer Mail-Layout-Iterationen.
Effekt MC-Re-Run (spaeter): auch fuer MC-Filter-Tests.
"""
from __future__ import annotations
import logging
from typing import Any
from sqlalchemy.orm import Session
from compliance.services.check_snapshot import load_snapshot
logger = logging.getLogger(__name__)
def _add_section(
    parts: list[str],
    section_sizes: dict[str, int],
    key: str,
    html: str,
) -> None:
    """Append one rendered HTML section and record its length under ``key``."""
    parts.append(html)
    section_sizes[key] = len(html)


def _collect_doc_texts(doc_entries: list) -> dict[str, str]:
    """Map each doc_type to its stripped text.

    The text is read from "text", falling back to "full_text" and then
    "text_preview". Entries that are not dicts, have no doc_type or have no
    text are skipped. A later entry with the same doc_type overwrites an
    earlier one.
    """
    doc_texts: dict[str, str] = {}
    for entry in doc_entries:
        if not isinstance(entry, dict):
            continue
        doc_type = entry.get("doc_type", "")
        text = (
            entry.get("text")
            or entry.get("full_text")
            or entry.get("text_preview")
            or ""
        ).strip()
        if doc_type and text:
            doc_texts[doc_type] = text
    return doc_texts


def _dict_to_result(d: dict) -> Any:
    """Build a minimal stand-in result object from a snapshot doc entry.

    The object only carries the attributes set below; ``checks`` is always an
    empty list because the entry is not re-evaluated here.
    """
    return type("R", (), {
        "doc_type": d.get("doc_type", "other"),
        "label": d.get("doc_type", "Dokument"),
        "completeness_pct": d.get("completeness_pct", 0),
        "correctness_pct": d.get("correctness_pct"),
        "checks": [],
        "error": d.get("error", ""),
    })()


def _collect_cookie_names(banner_result: dict) -> list[str]:
    """Collect cookie names from every dict-valued phase in ``banner_result``.

    A cookie may be given as a plain string or as a dict with a "name" key.
    """
    names: list[str] = []
    for phase in (banner_result.get("phases") or {}).values():
        if not isinstance(phase, dict):
            continue
        for cookie in phase.get("cookies") or []:
            if isinstance(cookie, str):
                names.append(cookie)
            elif isinstance(cookie, dict) and cookie.get("name"):
                names.append(cookie["name"])
    return names


def _first_doc_url(doc_entries: list) -> str:
    """Return the first non-empty "url" among the doc entries, else ""."""
    return next(
        (
            entry["url"]
            for entry in doc_entries
            if isinstance(entry, dict) and entry.get("url")
        ),
        "",
    )


def replay_from_snapshot(
    db: Session,
    snapshot_id: str,
    recipient: str | None = None,
    dry_run: bool = False,
) -> dict:
    """Re-render the audit mail HTML from a persisted snapshot.

    Every section is rendered best-effort: if a section's import or render
    call raises, a warning is logged and the remaining sections still run.

    Args:
        db: SQLAlchemy session, passed to the snapshot loader and to the
            checks that take a session.
        snapshot_id: Identifier of the snapshot to replay.
        recipient: Email recipient. If None/empty, no mail is sent.
        dry_run: If True, render the HTML but do not send mail.

    Returns:
        ``{"error": "snapshot not found", "snapshot_id": ...}`` when the
        snapshot cannot be loaded. Otherwise a dict with the keys
        "snapshot_id", "check_id", "site_domain", "html_size", "sections"
        (section name -> HTML length), "mail_sent", "preview" (first 500
        characters plus "..." when truncated) and "full_html". When a send is
        attempted, "mail_status" is added on success of the call, or
        "mail_send_error" (truncated to 200 characters) if it raised.
    """
    snap = load_snapshot(db, snapshot_id)
    if not snap:
        return {"error": "snapshot not found", "snapshot_id": snapshot_id}

    doc_entries = snap.get("doc_entries") or []
    banner_result = snap.get("banner_result") or {}
    cmp_vendors = snap.get("cmp_vendors") or []
    site_label = snap.get("site_label") or snap.get("site_domain")

    # Rebuild the doc_type -> text mapping that the mail render takes as input.
    doc_texts = _collect_doc_texts(doc_entries)
    # Cookie policy text, falling back to the "dse" document; shared by the
    # quality, consistency and mismatch checks below.
    cookie_text = doc_texts.get("cookie") or doc_texts.get("dse") or ""

    # Minimal result objects, just enough for the critical-findings render.
    results = [_dict_to_result(e) for e in doc_entries if isinstance(e, dict)]

    section_sizes: dict[str, int] = {}
    parts: list[str] = []

    # --- P80 v2: re-apply the current quality checks to the snapshot data ---

    # Vendor normalization; on failure the raw snapshot vendors are kept.
    try:
        from compliance.services.vendor_normalizer import normalize_vendors
        cmp_vendors = normalize_vendors(list(cmp_vendors))
    except Exception as exc:
        logger.warning("Replay v2: normalizer failed: %s", exc, exc_info=True)

    # Audit quality
    try:
        from compliance.services.audit_quality_checks import (
            run_all as run_aq, build_audit_quality_block_html,
        )
        aq = run_aq(banner_result, cookie_text, cmp_vendors, doc_entries)
        if aq:
            _add_section(
                parts, section_sizes, "audit_quality_v2",
                build_audit_quality_block_html(aq),
            )
    except Exception as exc:
        logger.warning("Replay v2: audit_quality failed: %s", exc, exc_info=True)

    # Cookie compliance audit
    try:
        from compliance.services.cookie_compliance_audit import (
            audit_cookie_compliance, build_cookie_audit_block_html,
        )
        ca = audit_cookie_compliance(db, cookie_text, banner_result)
        if ca and (ca.get("declared_count") or ca.get("browser_count")):
            _add_section(
                parts, section_sizes, "cookie_audit_v2",
                build_cookie_audit_block_html(ca),
            )
    except Exception as exc:
        logger.warning("Replay v2: cookie_audit failed: %s", exc, exc_info=True)

    # TCF authority
    try:
        from compliance.services.tcf_vendor_authority import (
            cross_reference_with_tcf, build_tcf_authority_block_html,
        )
        tcf = cross_reference_with_tcf(db, cmp_vendors)
        if tcf:
            _add_section(
                parts, section_sizes, "tcf_v2",
                build_tcf_authority_block_html(tcf),
            )
    except Exception as exc:
        logger.warning("Replay v2: tcf failed: %s", exc, exc_info=True)

    # Entropy + network trace share one try block: a failure in the entropy
    # part also skips the network trace.
    try:
        from compliance.services.cookie_value_entropy import (
            check_cookies_for_entropy_mismatch, build_entropy_block_html,
        )
        from compliance.services.cookie_network_tracer import (
            trace_cookie_network, build_network_trace_block_html,
        )
        cookies_detailed = banner_result.get("cookies_detailed") or []
        entropy = check_cookies_for_entropy_mismatch(cookies_detailed)
        if entropy:
            _add_section(
                parts, section_sizes, "entropy_v2",
                build_entropy_block_html(entropy),
            )
        net = trace_cookie_network(cookies_detailed, _first_doc_url(doc_entries))
        if net:
            _add_section(
                parts, section_sizes, "network_trace_v2",
                build_network_trace_block_html(net),
            )
    except Exception as exc:
        logger.warning(
            "Replay v2: entropy/network failed: %s", exc, exc_info=True,
        )

    # --- Mail sections ---

    # P82: GF one-pager (5-bullet summary).
    # NOTE(review): the original comment said this block comes "first", but
    # the v2 blocks above are appended before it. Order kept as-is — confirm
    # whether the one-pager should lead the mail.
    try:
        from compliance.services.gf_one_pager import build_gf_one_pager_html
        gf_html = build_gf_one_pager_html(
            site_name=site_label or "",
            scorecard=None,  # the snapshot contains no MC scorecard
            banner_result=banner_result,
            # NOTE(review): the original comment said this gets filled
            # further down, but nothing in this function does so.
            library_mismatch_findings=None,
            scan_context=snap.get("scan_context"),
        )
        _add_section(parts, section_sizes, "gf_one_pager", gf_html)
    except Exception as exc:
        logger.warning("Replay: GF-1-pager failed: %s", exc, exc_info=True)

    try:
        from compliance.api.agent_doc_check_critical import build_critical_findings_html
        critical_html = build_critical_findings_html(banner_result, None, results) or ""
        _add_section(parts, section_sizes, "critical", critical_html)
    except Exception as exc:
        logger.warning("Replay: critical-block failed: %s", exc, exc_info=True)

    try:
        from compliance.api.scope_disclaimer import build_scope_disclaimer_html
        _add_section(
            parts, section_sizes, "disclaimer", build_scope_disclaimer_html(),
        )
    except Exception as exc:
        logger.warning("Replay: disclaimer failed: %s", exc, exc_info=True)

    try:
        from compliance.api.agent_doc_check_banner import build_banner_deep_html
        banner_html = build_banner_deep_html(banner_result) or ""
        _add_section(parts, section_sizes, "banner", banner_html)
    except Exception as exc:
        logger.warning("Replay: banner-block failed: %s", exc, exc_info=True)

    try:
        from compliance.api.vvt_table_renderer import build_vvt_table_html
        vvt_html = build_vvt_table_html(cmp_vendors) or ""
        _add_section(parts, section_sizes, "vvt", vvt_html)
    except Exception as exc:
        logger.warning("Replay: vvt failed: %s", exc, exc_info=True)

    # P35 + P77 + P78 + P36: text signals (save label, cookies in DSE,
    # JC clause, social embeds)
    try:
        from compliance.services.doc_text_signals import (
            run_all as run_signal_checks,
            build_signals_block_html,
        )
        cookie_doc_missing = not bool(doc_texts.get("cookie"))
        sig_findings = run_signal_checks(
            banner_result, doc_texts, cookie_doc_missing,
        )
        if sig_findings:
            _add_section(
                parts, section_sizes, "signals",
                build_signals_block_html(sig_findings),
            )
    except Exception as exc:
        logger.warning("Replay: signals block failed: %s", exc, exc_info=True)

    # P92 + P94: banner consistency
    try:
        from compliance.services.banner_consistency_checks import (
            run_all as run_consistency_checks,
            build_consistency_block_html,
        )
        cons = run_consistency_checks(
            banner_result, cookie_text, cmp_vendors,
            doc_texts=doc_texts,
        )
        if cons:
            _add_section(
                parts, section_sizes, "consistency",
                build_consistency_block_html(cons),
            )
    except Exception as exc:
        logger.warning("Replay: consistency block failed: %s", exc, exc_info=True)

    # P102: cookie classification check; needs both observed cookie names
    # and a cookie/DSE text to compare against.
    try:
        from compliance.services.cookie_library_mismatch import (
            detect_mismatches, build_mismatch_block_html,
        )
        cookies_seen = _collect_cookie_names(banner_result)
        if cookies_seen and cookie_text:
            mm = detect_mismatches(db, cookies_seen, cookie_text)
            if mm:
                _add_section(
                    parts, section_sizes, "library_mismatch",
                    build_mismatch_block_html(mm),
                )
    except Exception as exc:
        logger.warning("Replay: mismatch block failed: %s", exc, exc_info=True)

    full_html = "".join(parts)
    if len(full_html) > 500:
        preview = full_html[:500] + "..."
    else:
        preview = full_html

    result = {
        "snapshot_id": snapshot_id,
        "check_id": snap.get("check_id"),
        "site_domain": snap.get("site_domain"),
        "html_size": len(full_html),
        "sections": section_sizes,
        "mail_sent": False,
        "preview": preview,
        "full_html": full_html,  # P88 PDF export needs the full HTML.
    }

    if recipient and not dry_run:
        try:
            from compliance.services.smtp_sender import send_email
            # Avoid the literal text "None" in the subject when the snapshot
            # has neither site_label nor site_domain.
            subject_site = site_label or "unknown"
            email_res = send_email(
                recipient=recipient,
                subject=f"[REPLAY] {subject_site} (Snapshot {snapshot_id[:8]})",
                body_html=full_html,
            )
            result["mail_sent"] = (email_res.get("status") == "sent")
            result["mail_status"] = email_res.get("status")
        except Exception as exc:
            logger.warning("Replay: mail send failed: %s", exc, exc_info=True)
            result["mail_send_error"] = str(exc)[:200]

    return result