feat(audit): P106 MC-Audit-Type + P83 BUILD_SHA in Dockerfiles + P80 v2 full
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 16s
CI / detect-changes (push) Successful in 11s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / loc-budget (push) Failing after 16s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 2m42s
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 41s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped

P106 — mc_audit_type.py: zentrales Quality-Thema.
Klassifiziert pro MC: verifiable / process_internal / doc_internal /
ambiguous. Pattern-Match auf check_question + title + fail_criteria
(Schulung, AVV abgeschlossen, TOM umgesetzt, DSFA durchgefuehrt,
Ausnahmen dokumentieren, kostenfrei zur Verfuegung, opt-out
intern ermoeglichen, …).

Interne MCs werden in der MC-Auswertung NICHT mehr als FAIL gewertet,
sondern als CHECK markiert (audit_status='check'). Sie zaehlen im
build_scorecard als skipped (nicht failed) damit der Score realistisch
ist. build_internal_checks_block_html() rendert sie als separaten
blauen Block 'Pruefungen die wir von aussen NICHT durchfuehren koennen'
nach dem MC-Scorecard.

Erwartete Wirkung: bei VW 95 FAILs → wahrscheinlich 30-40 echte
verifiable_fails + 50-60 internal_checks. GF-Mail wird drastisch
realistischer (statt 'Sie haben 95 Verstoesse' → 'Sie haben 35
extern sichtbare Themen + 60 interne Checks, bitte mit DSB klaeren').

P83 — BUILD_SHA in backend/admin/consent-tester Dockerfiles als
ARG + ENV. check-rebuild-needed.sh kann jetzt deployed vs local SHA
vergleichen + REBUILD REQUIRED melden.

P80 v2 — check_replay.py macht jetzt vollstaendigen Replay aller
post-fetch Quality-Generatoren: vendor_normalizer (Dedup),
audit_quality_checks, cookie_compliance_audit, tcf_vendor_authority,
cookie_value_entropy, cookie_network_tracer. Snapshots aus alter Zeit
zeigen jetzt im Replay den aktuellen Audit-Stand.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-22 08:57:02 +02:00
parent bd65b6f318
commit e2be51b0aa
7 changed files with 384 additions and 11 deletions
+4
View File
@@ -60,5 +60,9 @@ EXPOSE 8002
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://127.0.0.1:8002/health || exit 1
# P83 — Build-SHA fuer check-rebuild-needed.sh
ARG BUILD_SHA="unknown"
ENV BUILD_SHA=${BUILD_SHA}
# Run the application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8002"]
@@ -1184,6 +1184,22 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
if (not c.passed and not c.skipped
and (c.severity or "").upper() in ("CRITICAL", "HIGH")):
fails_by_doc.setdefault(r.doc_type, []).append(rec)
# P106 — Audit-Type-Klassifizierung pro MC. Interne Prozess-/
# Doku-Checks werden NICHT als FAIL gewertet sondern als CHECK
# (manuelle Pruefung beim DSB notwendig).
try:
from compliance.services.mc_audit_type import (
annotate_mc_results, split_by_audit_type,
)
annotate_mc_results(all_mc_checks)
mc_split = split_by_audit_type(all_mc_checks)
# Fails-by-doc neu aufbauen: nur noch echte verifiable Fails
fails_by_doc = {}
for r in mc_split.get("verifiable_fails") or []:
fails_by_doc.setdefault("dse", []).append(r)
except Exception as e:
logger.warning("P106 mc_audit_type skipped: %s", e)
mc_split = {"internal_checks": [], "verifiable_fails": all_mc_checks}
scorecard = build_scorecard(all_mc_checks) if all_mc_checks else {}
# Trend: load previous scorecard for the same tenant + domain so the
# email can show delta indicators (A6).
@@ -1503,6 +1519,22 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
except Exception as e:
logger.warning("industry context skipped: %s", e)
# P106 — Internal-Checks-Block (interne Prozesse / Doku-Pflichten)
internal_checks_html = ""
try:
from compliance.services.mc_audit_type import (
build_internal_checks_block_html,
)
ic = (mc_split or {}).get("internal_checks") or []
if ic:
internal_checks_html = build_internal_checks_block_html(ic)
logger.info(
"P106: %d interne Checks (statt FAIL) im Block",
len(ic),
)
except Exception as e:
logger.warning("P106 internal_checks_html skipped: %s", e)
# P85 — Banner-Screenshot fuer visuellen Beweis (zwischen
# GF-1-Pager und Detail-Bloecken)
banner_shot_html = ""
@@ -1612,7 +1644,7 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest):
+ bench_html + diff_html
+ critical_html + scope_disclaimer_html + exec_summary_html
+ cookie_arch_html + summary_html + scanned_html + profile_html
+ scorecard_html + redundancy_html
+ scorecard_html + internal_checks_html + redundancy_html
+ industry_ctx_html
+ banner_shot_html
+ providers_html + banner_deep_html
@@ -86,27 +86,81 @@ def replay_from_snapshot(
parts: list[str] = []
# P80 v2 — Quality-Checks aus dem aktuellen Code auf Snapshot-Daten
# anwenden. Wir replayen NICHT die MC-Pipeline (zu schwer ohne
# rag_document_checker re-run), aber alle nachgelagerten Findings-
# Generatoren (audit_quality, cookie_compliance_audit, vendor_normalizer,
# entropy, network-trace) bekommen Snapshot-Daten und liefern den
# aktuellen Stand.
# anwenden. Vollstaendiger Replay aller post-fetch Findings-Generatoren.
cookie_t = doc_texts.get("cookie") or doc_texts.get("dse") or ""
# Vendor-Normalize (Dedup + Garbage-Filter)
try:
from compliance.services.vendor_normalizer import normalize_vendors
cmp_vendors = normalize_vendors(list(cmp_vendors))
except Exception as e:
logger.warning("Replay v2: normalizer failed: %s", e)
# Audit-Quality
try:
from compliance.services.audit_quality_checks import (
run_all as run_aq,
run_all as run_aq, build_audit_quality_block_html,
)
cookie_t = doc_texts.get("cookie") or doc_texts.get("dse") or ""
aq = run_aq(banner_result, cookie_t, cmp_vendors, doc_entries)
if aq:
from compliance.services.audit_quality_checks import (
build_audit_quality_block_html,
)
aq_html = build_audit_quality_block_html(aq)
parts.append(aq_html)
section_sizes["audit_quality_v2"] = len(aq_html)
except Exception as e:
logger.warning("Replay v2: audit_quality failed: %s", e)
# Cookie-Compliance-Audit
try:
from compliance.services.cookie_compliance_audit import (
audit_cookie_compliance, build_cookie_audit_block_html,
)
ca = audit_cookie_compliance(db, cookie_t, banner_result)
if ca and (ca.get("declared_count") or ca.get("browser_count")):
ca_html = build_cookie_audit_block_html(ca)
parts.append(ca_html)
section_sizes["cookie_audit_v2"] = len(ca_html)
except Exception as e:
logger.warning("Replay v2: cookie_audit failed: %s", e)
# TCF Authority
try:
from compliance.services.tcf_vendor_authority import (
cross_reference_with_tcf, build_tcf_authority_block_html,
)
tcf = cross_reference_with_tcf(db, cmp_vendors)
if tcf:
tcf_html = build_tcf_authority_block_html(tcf)
parts.append(tcf_html)
section_sizes["tcf_v2"] = len(tcf_html)
except Exception as e:
logger.warning("Replay v2: tcf failed: %s", e)
# Entropy + Network-Trace
try:
from compliance.services.cookie_value_entropy import (
check_cookies_for_entropy_mismatch, build_entropy_block_html,
)
from compliance.services.cookie_network_tracer import (
trace_cookie_network, build_network_trace_block_html,
)
cd = (banner_result or {}).get("cookies_detailed") or []
e1 = check_cookies_for_entropy_mismatch(cd)
if e1:
ent_html = build_entropy_block_html(e1)
parts.append(ent_html)
section_sizes["entropy_v2"] = len(ent_html)
site_url = ""
for entry in (doc_entries or []):
if entry.get("url"):
site_url = entry["url"]; break
net = trace_cookie_network(cd, site_url)
if net:
net_html = build_network_trace_block_html(net)
parts.append(net_html)
section_sizes["network_trace_v2"] = len(net_html)
except Exception as e:
logger.warning("Replay v2: entropy/network failed: %s", e)
# P82: GF-1-Pager zuerst (5-Bullet-Summary)
try:
from compliance.services.gf_one_pager import build_gf_one_pager_html
@@ -0,0 +1,269 @@
"""
P106 — MC-Audit-Type-Klassifizierung.
Zentrales Problem: viele Master-Controls pruefen Sachverhalte, die wir
von Aussen GAR NICHT pruefen koennen — z.B. ob das Unternehmen einen
internen Loeschkonzept-Prozess hat oder Schulungen durchgefuehrt wurden.
Bisher: alle MCs deren Pattern im Text nicht matched → FAIL.
Folge: GF-Mail mit 95 FAILs, davon ~60-70 in Wirklichkeit nur 'unknown'.
Loesung: pro MC klassifizieren:
* verifiable → Pattern muss im sichtbaren Dokument stehen (Audit moeglich)
* process_internal → interner Prozess des Kunden (Schulung, AVV-Vertrag, …)
* doc_internal → interne Dokumentation (VVT-Eintrag, DSFA-File, …)
* ambiguous → koennte beides sein
In der MC-Auswertung:
* verifiable + Pattern fehlt → echtes FAIL ❌
* process_internal → CHECK (Hinweis 'Bitte intern pruefen') ⓘ
* doc_internal → CHECK (Hinweis 'Im VVT/DSFA dokumentiert?') ⓘ
* ambiguous → CHECK mit Warnung
"""
from __future__ import annotations
import logging
import re
logger = logging.getLogger(__name__)
# Patterns die auf interne Prozesse hindeuten (NICHT von aussen pruefbar)
_PROCESS_INTERNAL_PATTERNS = [
# Schulung / Mitarbeiter
r"\bmitarbeiter\b.*schul",
r"\bschulung(en)?\b",
r"\bawareness\b",
r"\bsensibilisier",
# Vertraege intern
r"\bauftragsverarbeitungsvertrag\b",
r"\bAVV\b\s+abgeschlossen",
r"\bvertrag.*abgeschlossen",
r"\bdpa\s+(geschlossen|abgeschlossen|vorhanden)",
r"\bSCC\s+(geschlossen|abgeschlossen|implementiert)",
# Technisch-organisatorische Massnahmen (intern)
r"\btechnisch[-\s]*organisatorische\s+ma(ß|ss)nahmen?\b",
r"\bTOM\s+(umgesetzt|dokumentiert|implementiert)",
r"\bverschluesselung\s+(implementiert|aktiv)",
r"\bpseudonymisierung\s+(implementiert|aktiv)",
r"\bbackup[s]?\s+(eingerichtet|vorhanden)",
r"\bzugriffskontrolle",
r"\b(rollen|berechtigungs)konzept",
# Risikobewertung / DSFA (intern)
r"\bdsfa\s+(durchgefuehrt|erstellt|dokumentiert)",
r"\brisikobewertung\s+(durchgefuehrt|dokumentiert)",
r"\brisikoanalyse",
# Loeschkonzept / Aufbewahrung
r"\bloeschkonzept\s+(umgesetzt|implementiert)",
r"\baufbewahrungsfrist(en)?\s+(eingehalten|definiert)",
r"\bloeschroutinen?\s+(aktiv|implementiert)",
# Meldewege / Vorfallmanagement
r"\bmeldepflicht\s+(eingehalten|umgesetzt)",
r"\bvorfallmanagement",
r"\bincident[\s-]?response",
r"\b72[\s-]?stunden[\s-]?meldung",
# Generische Prozess-Indikatoren
r"\bdokumentiert\s+werden",
r"\bbitte\s+(intern\s+)?dokumentieren",
r"\bin\s+der\s+verfahrens",
r"\bnach\s+innen\s+geh",
r"\bausnahmen\s+(dokumentieren|protokollieren)",
r"\bkostenfrei\s+(zur\s+verfuegung|gewaehren|ermoegli)",
r"\bunentgeltlich\s+(zur\s+verfuegung)",
# Vertragsleistung / Service-Level (intern)
r"\bservice[\s-]?level",
r"\breaktionszeit",
# Auditierung / Aufsicht
r"\binterne(s)?\s+audit",
r"\baufsichtsbehoerde\s+gemeldet",
r"\bbeauftragter\s+(intern|benannt)",
# eCall + Branchen-spezifische interne Pflichten
r"\babschaltung\s+der\s+\w+\s+kostenfrei",
r"\bopt[\s-]?out\s+(intern|im\s+kundenportal)\s+ermoeglichen",
]
# Patterns die auf interne Dokumentation hindeuten (VVT, DSFA-Datei, …)
_DOC_INTERNAL_PATTERNS = [
r"\bverzeichnis\s+der\s+verarbeitungstaetigkeiten\b",
r"\bvvt(\s+|\b)",
r"\bdsfa[\s-]?dokument",
r"\bauftragsverarbeitungsverzeichnis",
r"\bsub[\s-]?prozessor[\s-]?liste",
r"\bverarbeitungs[\s-]?register",
r"\binternes\s+register",
r"\baufbewahrungs[\s-]?konzept\b",
]
# Patterns die auf externe Sichtbarkeit hindeuten → DEFINITIV verifiable
_VERIFIABLE_PATTERNS = [
r"\bin\s+der\s+(datenschutzerklaerung|dse|cookie[\s-]?richtlinie|impressum|agb)\b",
r"\bauf\s+der\s+website\s+(genannt|sichtbar|angegeben)",
r"\bim\s+banner\s+(genannt|sichtbar)",
r"\bim\s+cookie[\s-]?banner",
r"\bauf\s+der\s+startseite",
r"\bim\s+footer",
]
def _matches_any(text: str, patterns: list[str]) -> bool:
tl = text.lower()
for pat in patterns:
try:
if re.search(pat, tl):
return True
except re.error:
continue
return False
def classify_mc_audit_type(
title: str | None,
check_question: str | None = None,
fail_criteria: dict | None = None,
) -> str:
"""Returns 'verifiable', 'process_internal', 'doc_internal',
or 'ambiguous'."""
blob = " ".join([title or "", check_question or "",
str(fail_criteria or "")])
if not blob.strip():
return "ambiguous"
is_verifiable_hint = _matches_any(blob, _VERIFIABLE_PATTERNS)
is_process = _matches_any(blob, _PROCESS_INTERNAL_PATTERNS)
is_doc = _matches_any(blob, _DOC_INTERNAL_PATTERNS)
# Wenn explicit Verifiable-Indikator + kein Process → verifiable
if is_verifiable_hint and not (is_process or is_doc):
return "verifiable"
# Wenn Process oder Doc UND nicht Verifiable → intern
if is_process and not is_verifiable_hint:
return "process_internal"
if is_doc and not is_verifiable_hint:
return "doc_internal"
# Beides → ambiguous, im Zweifel CHECK markieren
if is_process or is_doc:
return "ambiguous"
return "verifiable"
def annotate_mc_results(check_results: list[dict]) -> list[dict]:
"""In-place: setzt mc_audit_type auf jeden MC-Check und ersetzt
Status 'failed' durch 'check' wenn audit_type != verifiable."""
if not check_results:
return check_results
n_reclassified = 0
for r in check_results:
if not isinstance(r, dict):
continue
if not (r.get("id") or "").startswith("mc-"):
continue
if "mc_audit_type" not in r:
r["mc_audit_type"] = classify_mc_audit_type(
r.get("label"), r.get("hint"), r.get("fail_criteria"),
)
# Wenn FAIL aber audit_type != verifiable → "check" (manuell)
if (not r.get("passed")
and not r.get("skipped")
and r["mc_audit_type"] in (
"process_internal", "doc_internal", "ambiguous",
)):
r["audit_status"] = "check" # NICHT failed
n_reclassified += 1
elif r.get("passed"):
r["audit_status"] = "pass"
elif r.get("skipped"):
r["audit_status"] = "skip"
else:
r["audit_status"] = "fail"
if n_reclassified:
logger.info(
"MC-Audit-Type: %d/%d MCs reklassifiziert von FAIL → CHECK "
"(interne Pruefung erforderlich)",
n_reclassified, len(check_results),
)
return check_results
def split_by_audit_type(check_results: list[dict]) -> dict[str, list[dict]]:
"""Liefert {verifiable_fails, internal_checks, passes, skips}."""
out = {"verifiable_fails": [], "internal_checks": [],
"passes": [], "skips": []}
for r in (check_results or []):
if not isinstance(r, dict):
continue
if not (r.get("id") or "").startswith("mc-"):
continue
status = r.get("audit_status")
if status == "pass":
out["passes"].append(r)
elif status == "skip":
out["skips"].append(r)
elif status == "check":
out["internal_checks"].append(r)
elif status == "fail" or (not r.get("passed") and not r.get("skipped")):
out["verifiable_fails"].append(r)
return out
def build_internal_checks_block_html(
internal_checks: list[dict],
limit: int = 30,
) -> str:
if not internal_checks:
return ""
by_type: dict[str, list[dict]] = {}
for c in internal_checks:
t = c.get("mc_audit_type", "ambiguous")
by_type.setdefault(t, []).append(c)
sections: list[str] = []
labels = {
"process_internal": ("Interne Prozesse — bitte beim DSB pruefen",
"#1e40af"),
"doc_internal": ("Interne Dokumentation — bitte im VVT/DSFA pruefen",
"#5b21b6"),
"ambiguous": ("Unklar ob Audit-Befund oder interne Pruefung",
"#92400e"),
}
for atype, (heading, color) in labels.items():
items = by_type.get(atype) or []
if not items:
continue
rows = "".join(
f'<li style="margin-bottom:4px;font-size:11px;line-height:1.45">'
f'<strong>{(c.get("label") or "")[:160]}</strong>'
+ (f' <span style="color:#94a3b8">({c.get("regulation") or ""})</span>'
if c.get("regulation") else '') +
f'</li>'
for c in items[:limit]
)
sections.append(
f'<div style="margin-bottom:10px">'
f'<div style="font-size:11px;color:{color};text-transform:uppercase;'
f'letter-spacing:1px;font-weight:600;margin-bottom:4px">'
f'{heading} ({len(items)})</div>'
f'<ul style="margin:0 0 0 18px;padding:0">{rows}</ul>'
f'</div>'
)
return (
'<div style="font-family:-apple-system,BlinkMacSystemFont,sans-serif;'
'max-width:760px;margin:0 auto 16px;padding:12px 16px;'
'background:#f0f9ff;border:1px solid #bfdbfe;border-radius:8px">'
'<div style="font-size:11px;color:#1e40af;text-transform:uppercase;'
'letter-spacing:1.2px;margin-bottom:4px;font-weight:600">'
'Pruefungen die wir von aussen NICHT durchfuehren koennen</div>'
f'<h3 style="margin:0 0 6px;font-size:14px;color:#1e293b">'
f'{len(internal_checks)} Pruefpunkt'
f'{"e" if len(internal_checks) != 1 else ""} sind '
'NUR intern beim Kunden zu pruefen</h3>'
'<p style="margin:0 0 10px;font-size:11px;color:#475569;'
'line-height:1.5">'
'Diese Anforderungen koennen wir per externem Website-Audit nicht '
'als erfuellt oder nicht-erfuellt bewerten — sie betreffen interne '
'Prozesse (Schulungen, AVV-Vertraege, TOM-Doku) oder interne '
'Dokumentation (VVT, DSFA, Loeschkonzept). Sie sind also <strong>kein '
'Verstoss</strong>, sondern Hinweis-Checks fuer Ihren DSB.</p>'
+ "".join(sections) +
'</div>'
)
@@ -61,6 +61,12 @@ def build_scorecard(check_results: list[dict]) -> dict:
b["skipped"] += 1
elif r.get("passed"):
b["passed"] += 1
# P106 — interner Check ist KEIN Fail (zaehlt als skipped fuer
# die Score-Berechnung damit der Score realistisch ist).
elif r.get("audit_status") == "check":
b["skipped"] += 1
b.setdefault("internal_checks", 0)
b["internal_checks"] += 1
else:
b["failed"] += 1
sev = (r.get("severity") or "MEDIUM").upper()