feat(audit-pipeline): P72 MC-Scope-Classifier + P80 Snapshot/Replay-Foundation [migration-approved]
CI / detect-changes (push) Successful in 11s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 14s
CI / loc-budget (push) Failing after 14s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Has been skipped
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 37s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped

P72  MC-Scope-Classifier — pro MC den ECHTEN Doc-Adressaten festlegen
     (cookie_richtlinie/dse/banner_implementation/cmp_audit/tom/avv/jc/
      impressum/agb/widerruf/process/accounting/other).
     - Migration 145: scope_doc_type Spalte + Index auf canonical_controls
     - Backfill-Script mit Regex-Heuristik (12 Regeln, Prioritaet-sortiert)
     - Erste 11k-Sample-Distribution: 76% other (Heuristik v1 zu strict —
       v2 muss lockerere Patterns fuer DSE/TOM nachschaerfen)
     - Ziel: bevor MC-Scorecard filtert, weiss jeder MC welches Dokument
       er adressiert. Bisher landeten eHealth-/HGB-MCs im Cookie-Audit.

P80  Snapshot + Replay-Foundation — Roh-Daten persistieren damit
     Audit-Pipeline ohne erneuten Crawl rebuildbar ist.
     - Migration 146: compliance_check_snapshots Tabelle (JSONB pro
       doc_entries/banner_result/profile/cmp_vendors/scan_context)
     - services.check_snapshot.save_snapshot/load_snapshot/list
     - Endpoints GET /snapshots, GET /snapshots/{id}
     - Hook in _run_compliance_check: nach Mail-Send automatischer
       Snapshot-Save via separater SessionLocal (background-task safe)
     - Replay-Endpoint folgt im naechsten PR (braucht Refactoring
       von _run_compliance_check in crawl_phase + interpret_phase)
     - Effekt: Test-Cycle 7min -> 5sec bei reinen Logik-Aenderungen
       (P73/P79/P81+ profitieren direkt). Snapshots dienen auch als
       Regression-Test-Corpus (P81 Golden-Truth-Library).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-21 08:53:31 +02:00
parent 603381a67f
commit cde670617e
5 changed files with 554 additions and 0 deletions
@@ -155,6 +155,53 @@ async def get_compliance_check_status(check_id: str):
)
# ── P80: Snapshot + Replay ───────────────────────────────────────────
@router.get("/snapshots")
async def list_snapshots(domain: str = "", limit: int = 20):
"""P80: list recent snapshots, optionally filtered by site_domain."""
from database import SessionLocal
from compliance.services.check_snapshot import list_snapshots_for_domain
db = SessionLocal()
try:
if domain:
return {"snapshots": list_snapshots_for_domain(db, domain, limit)}
from sqlalchemy import text
rows = db.execute(
text("""
SELECT id, check_id, site_domain, site_label, created_at,
replay_count, notes
FROM compliance.compliance_check_snapshots
ORDER BY created_at DESC
LIMIT :lim
"""),
{"lim": limit},
).fetchall()
return {"snapshots": [
{"id": str(r[0]), "check_id": r[1], "site_domain": r[2],
"site_label": r[3], "created_at": str(r[4]),
"replay_count": r[5], "notes": r[6]}
for r in rows
]}
finally:
db.close()
@router.get("/snapshots/{snapshot_id}")
async def get_snapshot(snapshot_id: str):
"""P80: load full snapshot raw data."""
from database import SessionLocal
from compliance.services.check_snapshot import load_snapshot
db = SessionLocal()
try:
snap = load_snapshot(db, snapshot_id)
if not snap:
return {"error": "snapshot not found"}, 404
return snap
finally:
db.close()
async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
"""Background task: check all documents with business-profile context."""
try:
@@ -1028,6 +1075,29 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
_compliance_check_jobs[check_id]["progress"] = "Fertig"
_compliance_check_jobs[check_id]["progress_pct"] = 100
# P80: persist raw scan data so we can replay audit pipeline
# without re-crawling (7min -> 5sec test cycle).
try:
from database import SessionLocal
from compliance.services.check_snapshot import save_snapshot
snap_db = SessionLocal()
try:
save_snapshot(
snap_db,
check_id=check_id,
doc_entries=doc_entries,
banner_result=banner_result,
profile=profile,
cmp_vendors=cmp_vendors,
scan_context=None, # P79 will fill this
site_label=site_name,
notes=f"recipient={req.recipient}",
)
finally:
snap_db.close()
except Exception as snap_err:
logger.warning("P80 snapshot save skipped: %s", snap_err)
# Persist to sidecar SQLite audit log — enables /audit endpoints
# (A5 admin tab) and trend view (A6). Best-effort; failures here
# do not affect the user-facing response.
@@ -0,0 +1,179 @@
"""
P80 Snapshot + Replay-Helper.
Persistiert die Roh-Daten eines Compliance-Check-Laufs (DSE-Text,
Banner-HTML, Cookies, CMP-Vendors, Profile), damit die Audit-Pipeline
spaeter ohne erneuten Browser-Crawl die Mail-Render-/MC-Scoring-Logik
neu laufen kann.
Use Cases:
* Logik-Iteration (MC-Filter P72, Mail-Layout, Action-Recipes) ohne
7min Re-Crawl.
* Regression-Test: Golden-Truth-Library (P81).
* Diff-Mode: "was hat sich seit letztem Snapshot geaendert" (P84).
"""
from __future__ import annotations
import json
import logging
from typing import Any
from urllib.parse import urlparse
from sqlalchemy import text
from sqlalchemy.orm import Session
logger = logging.getLogger(__name__)
def _to_jsonb(obj: Any) -> str:
"""Serialize to JSON-string for psycopg2 JSONB insertion."""
return json.dumps(obj, default=str, ensure_ascii=False)
def _derive_site_domain(doc_entries: list[dict]) -> str:
for e in doc_entries or []:
url = (e.get("url") or "").strip()
if url:
try:
netloc = urlparse(url).netloc.lower().replace("www.", "")
if netloc:
return netloc
except Exception:
continue
return "unknown"
def save_snapshot(
db: Session,
check_id: str,
doc_entries: list[dict],
banner_result: dict | None,
profile: Any,
cmp_vendors: list[dict] | None = None,
scan_context: dict | None = None,
site_label: str | None = None,
notes: str | None = None,
) -> str | None:
"""Persist scan raw data. Returns snapshot UUID on success."""
try:
profile_dict: dict = {}
if profile is not None:
if hasattr(profile, "__dict__"):
profile_dict = {k: v for k, v in profile.__dict__.items()
if not k.startswith("_")}
elif isinstance(profile, dict):
profile_dict = profile
domain = _derive_site_domain(doc_entries or [])
result = db.execute(
text("""
INSERT INTO compliance.compliance_check_snapshots
(check_id, site_domain, site_label,
doc_entries, banner_result, profile,
scan_context, cmp_vendors, notes)
VALUES (:cid, :dom, :lbl,
CAST(:de AS JSONB), CAST(:br AS JSONB), CAST(:pr AS JSONB),
CAST(:sc AS JSONB), CAST(:cv AS JSONB), :nt)
RETURNING id
"""),
{
"cid": check_id,
"dom": domain,
"lbl": site_label,
"de": _to_jsonb(doc_entries or []),
"br": _to_jsonb(banner_result) if banner_result else None,
"pr": _to_jsonb(profile_dict) if profile_dict else None,
"sc": _to_jsonb(scan_context) if scan_context else None,
"cv": _to_jsonb(cmp_vendors) if cmp_vendors else None,
"nt": notes,
},
)
snapshot_id = str(result.fetchone()[0])
db.commit()
logger.info(
"P80: snapshot saved id=%s check=%s domain=%s docs=%d",
snapshot_id, check_id, domain, len(doc_entries or []),
)
return snapshot_id
except Exception as e:
logger.warning("P80 snapshot save failed for %s: %s", check_id, e)
try:
db.rollback()
except Exception:
pass
return None
def load_snapshot(db: Session, snapshot_id: str) -> dict | None:
"""Load a snapshot by UUID. Returns dict with all fields or None."""
try:
row = db.execute(
text("""
SELECT id, check_id, site_domain, site_label,
doc_entries, banner_result, profile,
scan_context, cmp_vendors, created_at,
replay_count, notes
FROM compliance.compliance_check_snapshots
WHERE id = CAST(:sid AS uuid)
"""),
{"sid": snapshot_id},
).fetchone()
if not row:
return None
db.execute(
text("""
UPDATE compliance.compliance_check_snapshots
SET replay_count = replay_count + 1,
last_replay_at = now()
WHERE id = CAST(:sid AS uuid)
"""),
{"sid": snapshot_id},
)
db.commit()
return {
"id": str(row[0]),
"check_id": row[1],
"site_domain": row[2],
"site_label": row[3],
"doc_entries": row[4] or [],
"banner_result": row[5],
"profile": row[6] or {},
"scan_context": row[7] or {},
"cmp_vendors": row[8] or [],
"created_at": str(row[9]),
"replay_count": row[10],
"notes": row[11],
}
except Exception as e:
logger.warning("P80 snapshot load failed for %s: %s", snapshot_id, e)
return None
def list_snapshots_for_domain(db: Session, domain: str, limit: int = 20) -> list[dict]:
"""List recent snapshots for a domain (for diff-mode P84)."""
try:
rows = db.execute(
text("""
SELECT id, check_id, site_domain, created_at, replay_count, notes
FROM compliance.compliance_check_snapshots
WHERE site_domain = :dom
ORDER BY created_at DESC
LIMIT :lim
"""),
{"dom": domain.lower().replace("www.", ""), "lim": limit},
).fetchall()
return [
{
"id": str(r[0]),
"check_id": r[1],
"site_domain": r[2],
"created_at": str(r[3]),
"replay_count": r[4],
"notes": r[5],
}
for r in rows
]
except Exception as e:
logger.warning("P80 list_snapshots failed for %s: %s", domain, e)
return []
@@ -0,0 +1,52 @@
-- P72: scope_doc_type fuer canonical_controls
--
-- Erlaubt zu unterscheiden welcher Dokument-Typ der eigentliche Adressat
-- eines MC ist. Bisher landete jeder MC in jedem Doc-Audit was zu Noise
-- fuehrt (z.B. "elektronische Gesundheitsdaten-Transmission" landet im
-- Cookie-Richtlinie-Audit eines Autobauers).
--
-- Werte:
-- cookie_richtlinie — Pflichtangaben Cookie-RL nach DSK-OH 2024
-- dse — Pflichtangaben Datenschutzerklaerung Art. 13/14
-- banner_implementation — Banner-UI-Anforderungen (nicht Text)
-- z.B. "keine pre-ticked Checkboxes"
-- cmp_audit — Consent-Management-Plattform-Audit-Trail
-- z.B. "jede Einwilligung mit Zeitstempel speichern"
-- tom — Technisch-organisatorische Massnahmen
-- z.B. "verschluesselte Backups"
-- avv — Auftragsverarbeitungsvertrag-Inhalt
-- jc — Joint-Controller-Vereinbarung Art. 26
-- impressum — §5 TMG / §18 MStV
-- agb — Allgemeine Geschaeftsbedingungen
-- widerruf — Widerrufsbelehrung
-- process — Prozess-Anforderung (nicht textbasiert,
-- kann nicht durch Text-Einfuegung erfuellt werden)
-- accounting — Rechnungsstellung (UStG, HGB) — nicht Compliance
-- other — Faellt keiner Kategorie zu (Default)
--
-- NULL = noch nicht klassifiziert (Backfill-Skript setzt Wert).
DO $$
BEGIN
IF EXISTS (
SELECT 1 FROM information_schema.tables
WHERE table_name = 'canonical_controls'
AND table_schema = 'compliance'
) THEN
ALTER TABLE compliance.canonical_controls
ADD COLUMN IF NOT EXISTS scope_doc_type VARCHAR(40) DEFAULT NULL
CHECK (scope_doc_type IS NULL OR scope_doc_type IN (
'cookie_richtlinie', 'dse', 'banner_implementation',
'cmp_audit', 'tom', 'avv', 'jc',
'impressum', 'agb', 'widerruf',
'process', 'accounting', 'other'
));
CREATE INDEX IF NOT EXISTS idx_cc_scope_doc_type
ON compliance.canonical_controls(scope_doc_type);
COMMENT ON COLUMN compliance.canonical_controls.scope_doc_type IS
'P72: Doc-Type Adressat. NULL = nicht klassifiziert. Findings nur '
'beim passenden Doc-Type anzeigen, sonst Noise.';
END IF;
END $$;
@@ -0,0 +1,40 @@
-- P80: Compliance-Check Snapshots fuer Replay-Mode
--
-- Persistiert die Roh-Daten eines Scans (DSE-Text, Banner-HTML, Cookies,
-- CMP-Vendors, Profile) damit die Audit-Pipeline ohne erneuten Crawl
-- nur die Interpretations-Logik (MC-Scorecard, Mail-Render) neu laufen
-- kann. Test-Cycle 7min -> 5-10sec bei reinen Logik-Aenderungen.
DO $$
BEGIN
CREATE TABLE IF NOT EXISTS compliance.compliance_check_snapshots (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
check_id VARCHAR(36) NOT NULL,
site_domain VARCHAR(255) NOT NULL,
site_label VARCHAR(255),
-- Roh-Daten als JSONB (alles was sich pro Lauf NICHT aendert)
doc_entries JSONB NOT NULL, -- [{doc_type, url, full_text, cmp_payloads, ...}]
banner_result JSONB, -- {phases, cookies_detailed, cmp_vendors, ...}
profile JSONB, -- {business_type, industry, no_direct_sales, ...}
scan_context JSONB, -- P79: User-Pre-Scan-Felder
cmp_vendors JSONB, -- vendor-list (post-Phase G)
-- Meta
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
replay_count INTEGER NOT NULL DEFAULT 0,
last_replay_at TIMESTAMP WITH TIME ZONE,
notes TEXT
);
CREATE INDEX IF NOT EXISTS idx_snapshots_check_id
ON compliance.compliance_check_snapshots(check_id);
CREATE INDEX IF NOT EXISTS idx_snapshots_domain
ON compliance.compliance_check_snapshots(site_domain);
CREATE INDEX IF NOT EXISTS idx_snapshots_created
ON compliance.compliance_check_snapshots(created_at DESC);
COMMENT ON TABLE compliance.compliance_check_snapshots IS
'P80 Replay-Mode: persistierte Roh-Daten eines Scans. Ermoeglicht '
'Audit-Pipeline ohne erneuten Browser-Crawl neu zu laufen.';
END $$;
@@ -0,0 +1,213 @@
#!/usr/bin/env python3
"""P72 — Backfill scope_doc_type fuer compliance.canonical_controls.
Heuristik: pro MC schauen Title/Objective/Tags/verification_method an und
klassifizieren nach dem ECHTEN Adressaten. Default: 'other'.
Ziel: 60-80% der heutigen MC-HIGH-Noise verschwindet aus Cookie/DSE-Audit
und landet beim richtigen Adressaten (Impressum, AGB, TOM, Banner-Impl).
"""
from __future__ import annotations
import os
import re
import sys
from typing import Pattern
import psycopg2
# ---------------------------------------------------------------------------
# Klassifizierungs-Regeln (Reihenfolge = Prioritaet, erste Treffer gewinnt).
# Tuple: (scope_doc_type, regex_pattern_lower)
# ---------------------------------------------------------------------------
RULES: list[tuple[str, Pattern]] = [
# Banner-Implementierung (UI, nicht Text) — hoechste Prio
(
"banner_implementation",
re.compile(
r"\b(banner|cookie[-\s]?wall|pre[-\s]?ticked|"
r"vorausgewaehlt(e|en)?\s*checkbox|"
r"browser[-\s]?(default|standard|einstellung)|"
r"dark[-\s]?pattern|reject.{0,20}button|ablehn.{0,20}button|"
r"einwilligung.{0,30}aktive.{0,20}handlung|"
r"floating.{0,20}icon)"
),
),
# CMP-Audit-Trail
(
"cmp_audit",
re.compile(
r"\b(consent[-\s]?(log|trail|audit)|"
r"konsent[-\s]?trag(er|er-?id)|"
r"einwilligungs(nachweis|log|trail|protokoll)|"
r"datensaetze?.{0,30}einwilligung|"
r"zeitstempel.{0,30}einwilligung|"
r"cmp[-\s]?audit)"
),
),
# AVV (Art. 28)
(
"avv",
re.compile(
r"\b(art\.?\s*28|auftragsverarbeit|adv|avv|"
r"data[-\s]?processing[-\s]?agreement|dpa)"
),
),
# JC (Art. 26)
(
"jc",
re.compile(
r"\b(art\.?\s*26|joint[-\s]?controller|"
r"gemeinsam(e|er)\s*verantwortlich|"
r"konzern.{0,40}(verantwortlich|verarbeit))"
),
),
# Impressum (§5 TMG / §18 MStV)
(
"impressum",
re.compile(
r"\b((paragraph|§)\s*5\s*(tmg|ddg)|"
r"§\s*18\s*mstv|"
r"impressum|anbieterkennzeichnung|"
r"geschaeftsbrief|firma.{0,20}kaufmann|"
r"vollstaendige.{0,20}geschaeftsadresse|"
r"identitaet.{0,20}leistungserbringer|"
r"postalische?.{0,30}adresse)"
),
),
# AGB
(
"agb",
re.compile(
r"\b(agb|allgemeine\s*geschaeftsbedingungen|"
r"vertragsbedingungen|"
r"§\s*305.{0,5}(bgb)?)"
),
),
# Widerruf
(
"widerruf",
re.compile(
r"\b(widerrufsbelehrung|widerrufsrecht|"
r"14.{0,10}tage.{0,10}frist|"
r"musterwiderruf)"
),
),
# Accounting (UStG/Rechnungsstellung — NICHT Compliance-Audit)
(
"accounting",
re.compile(
r"\b((rechnung|invoice).{0,30}(angeben|enthalten|fuehren)|"
r"§\s*14\s*ustg|umsatzsteueridentifikation\s+nummer.{0,30}rechnung|"
r"buchhaltung|"
r"steuernummer.{0,30}rechnung)"
),
),
# TOM (Art. 32 + technische Sicherheit)
(
"tom",
re.compile(
r"\b(art\.?\s*32|verschluesselung|backup|"
r"pseudonymisier|anonymisier|"
r"zugriffskontrolle|berechtigungskonzept|"
r"penetrationstest|security[-\s]?incident|"
r"intrusion[-\s]?detection|firewall|"
r"tom|technisch[-\s]?organisatorische)"
),
),
# Cookie-Richtlinie (vor DSE pruefen, weil enger)
(
"cookie_richtlinie",
re.compile(
r"\b(cookie[-\s]?richtlinie|cookie[-\s]?(policy|liste|tabelle)|"
r"§\s*25\s*(tddg|tdddg|ttdsg)|"
r"cookie.{0,30}(zweck|speicherdauer|drittland|anbieter))"
),
),
# DSE (Art. 13/14 — breit, daher spaet)
(
"dse",
re.compile(
r"\b(art\.?\s*1[34]|datenschutzerklaerung|"
r"datenschutzhinweis|datenschutzinformation|"
r"informationspflicht|"
r"empfaenger(\s*oder\s*empfaengerkategorien)?|"
r"drittland.{0,30}(transfer|uebermittlung)|"
r"verantwortlich(er|en)\s+benennen|"
r"betroffenenrecht|art\.?\s*1[5-9]|art\.?\s*2[0-2])"
),
),
# Process (nicht text-basiert, kann nicht durch Text-Einfuegung erfuellt werden)
(
"process",
re.compile(
r"\b(prozess|verfahren|workflow|"
r"durchfuehren|umsetzen|implementieren|"
r"schulung|mitarbeiterunterweis|"
r"regelmaessig.{0,30}pruefen|"
r"kontinuierlich|laufend|"
r"datenpannenmeldung|art\.?\s*33|"
r"loeschkonzept.{0,30}umsetz)"
),
),
]
def classify(title: str, objective: str, tags: str | None = None) -> str:
"""Apply rules in order, return first match. Default: 'other'."""
text = " ".join(
s.lower() for s in (title or "", objective or "", tags or "") if s
)
for scope, pattern in RULES:
if pattern.search(text):
return scope
return "other"
def main() -> int:
dsn = os.environ.get("DATABASE_URL")
if not dsn:
print("DATABASE_URL missing", file=sys.stderr)
return 1
conn = psycopg2.connect(dsn)
cur = conn.cursor()
cur.execute("""
SELECT id, title, objective, tags
FROM compliance.canonical_controls
WHERE scope_doc_type IS NULL
AND merged_into_uuid IS NULL
""")
rows = cur.fetchall()
print(f"Backfilling {len(rows)} unscoped MCs...", file=sys.stderr)
from collections import Counter
stats = Counter()
batch = []
for row_id, title, objective, tags in rows:
scope = classify(title or "", objective or "", tags)
stats[scope] += 1
batch.append((scope, row_id))
if len(batch) >= 1000:
cur.executemany(
"UPDATE compliance.canonical_controls SET scope_doc_type=%s WHERE id=%s",
batch,
)
conn.commit()
print(f" committed {sum(stats.values())} so far", file=sys.stderr)
batch = []
if batch:
cur.executemany(
"UPDATE compliance.canonical_controls SET scope_doc_type=%s WHERE id=%s",
batch,
)
conn.commit()
print("\n=== Distribution ===")
for scope, n in sorted(stats.items(), key=lambda x: -x[1]):
print(f" {scope:25s} {n:>6} ({100*n/max(1,len(rows)):.1f}%)")
return 0
if __name__ == "__main__":
sys.exit(main())