diff --git a/admin-compliance/Dockerfile b/admin-compliance/Dockerfile index b98962ae..fc0f4688 100644 --- a/admin-compliance/Dockerfile +++ b/admin-compliance/Dockerfile @@ -55,5 +55,9 @@ EXPOSE 3000 # Set hostname ENV HOSTNAME="0.0.0.0" +# P83 — Build-SHA fuer check-rebuild-needed.sh +ARG BUILD_SHA="unknown" +ENV BUILD_SHA=${BUILD_SHA} + # Start the application CMD ["node", "server.js"] diff --git a/backend-compliance/Dockerfile b/backend-compliance/Dockerfile index d54f2aa1..84cc029e 100644 --- a/backend-compliance/Dockerfile +++ b/backend-compliance/Dockerfile @@ -60,5 +60,9 @@ EXPOSE 8002 HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \ CMD curl -f http://127.0.0.1:8002/health || exit 1 +# P83 — Build-SHA fuer check-rebuild-needed.sh +ARG BUILD_SHA="unknown" +ENV BUILD_SHA=${BUILD_SHA} + # Run the application CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8002"] diff --git a/backend-compliance/compliance/api/agent_compliance_check_routes.py b/backend-compliance/compliance/api/agent_compliance_check_routes.py index 5d312061..6f928309 100644 --- a/backend-compliance/compliance/api/agent_compliance_check_routes.py +++ b/backend-compliance/compliance/api/agent_compliance_check_routes.py @@ -1184,6 +1184,22 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): if (not c.passed and not c.skipped and (c.severity or "").upper() in ("CRITICAL", "HIGH")): fails_by_doc.setdefault(r.doc_type, []).append(rec) + # P106 — Audit-Type-Klassifizierung pro MC. Interne Prozess-/ + # Doku-Checks werden NICHT als FAIL gewertet sondern als CHECK + # (manuelle Pruefung beim DSB notwendig). + try: + from compliance.services.mc_audit_type import ( + annotate_mc_results, split_by_audit_type, + ) + annotate_mc_results(all_mc_checks) + mc_split = split_by_audit_type(all_mc_checks) + # Fails-by-doc neu aufbauen: nur noch echte verifiable Fails + fails_by_doc = {} + for r in mc_split.get("verifiable_fails") or []: + fails_by_doc.setdefault("dse", []).append(r) + except Exception as e: + logger.warning("P106 mc_audit_type skipped: %s", e) + mc_split = {"internal_checks": [], "verifiable_fails": all_mc_checks} scorecard = build_scorecard(all_mc_checks) if all_mc_checks else {} # Trend: load previous scorecard for the same tenant + domain so the # email can show delta indicators (A6). @@ -1503,6 +1519,22 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): except Exception as e: logger.warning("industry context skipped: %s", e) + # P106 — Internal-Checks-Block (interne Prozesse / Doku-Pflichten) + internal_checks_html = "" + try: + from compliance.services.mc_audit_type import ( + build_internal_checks_block_html, + ) + ic = (mc_split or {}).get("internal_checks") or [] + if ic: + internal_checks_html = build_internal_checks_block_html(ic) + logger.info( + "P106: %d interne Checks (statt FAIL) im Block", + len(ic), + ) + except Exception as e: + logger.warning("P106 internal_checks_html skipped: %s", e) + # P85 — Banner-Screenshot fuer visuellen Beweis (zwischen # GF-1-Pager und Detail-Bloecken) banner_shot_html = "" @@ -1612,7 +1644,7 @@ async def _run_compliance_check(check_id: str, req: ComplianceCheckRequest): + bench_html + diff_html + critical_html + scope_disclaimer_html + exec_summary_html + cookie_arch_html + summary_html + scanned_html + profile_html - + scorecard_html + redundancy_html + + scorecard_html + internal_checks_html + redundancy_html + industry_ctx_html + banner_shot_html + providers_html + banner_deep_html diff --git a/backend-compliance/compliance/services/check_replay.py b/backend-compliance/compliance/services/check_replay.py index 2447c558..9daf2e3a 100644 --- a/backend-compliance/compliance/services/check_replay.py +++ b/backend-compliance/compliance/services/check_replay.py @@ -86,27 +86,81 @@ def replay_from_snapshot( parts: list[str] = [] # P80 v2 — Quality-Checks aus dem aktuellen Code auf Snapshot-Daten - # anwenden. Wir replayen NICHT die MC-Pipeline (zu schwer ohne - # rag_document_checker re-run), aber alle nachgelagerten Findings- - # Generatoren (audit_quality, cookie_compliance_audit, vendor_normalizer, - # entropy, network-trace) bekommen Snapshot-Daten und liefern den - # aktuellen Stand. + # anwenden. Vollstaendiger Replay aller post-fetch Findings-Generatoren. + cookie_t = doc_texts.get("cookie") or doc_texts.get("dse") or "" + + # Vendor-Normalize (Dedup + Garbage-Filter) + try: + from compliance.services.vendor_normalizer import normalize_vendors + cmp_vendors = normalize_vendors(list(cmp_vendors)) + except Exception as e: + logger.warning("Replay v2: normalizer failed: %s", e) + + # Audit-Quality try: from compliance.services.audit_quality_checks import ( - run_all as run_aq, + run_all as run_aq, build_audit_quality_block_html, ) - cookie_t = doc_texts.get("cookie") or doc_texts.get("dse") or "" aq = run_aq(banner_result, cookie_t, cmp_vendors, doc_entries) if aq: - from compliance.services.audit_quality_checks import ( - build_audit_quality_block_html, - ) aq_html = build_audit_quality_block_html(aq) parts.append(aq_html) section_sizes["audit_quality_v2"] = len(aq_html) except Exception as e: logger.warning("Replay v2: audit_quality failed: %s", e) + # Cookie-Compliance-Audit + try: + from compliance.services.cookie_compliance_audit import ( + audit_cookie_compliance, build_cookie_audit_block_html, + ) + ca = audit_cookie_compliance(db, cookie_t, banner_result) + if ca and (ca.get("declared_count") or ca.get("browser_count")): + ca_html = build_cookie_audit_block_html(ca) + parts.append(ca_html) + section_sizes["cookie_audit_v2"] = len(ca_html) + except Exception as e: + logger.warning("Replay v2: cookie_audit failed: %s", e) + + # TCF Authority + try: + from compliance.services.tcf_vendor_authority import ( + cross_reference_with_tcf, build_tcf_authority_block_html, + ) + tcf = cross_reference_with_tcf(db, cmp_vendors) + if tcf: + tcf_html = build_tcf_authority_block_html(tcf) + parts.append(tcf_html) + section_sizes["tcf_v2"] = len(tcf_html) + except Exception as e: + logger.warning("Replay v2: tcf failed: %s", e) + + # Entropy + Network-Trace + try: + from compliance.services.cookie_value_entropy import ( + check_cookies_for_entropy_mismatch, build_entropy_block_html, + ) + from compliance.services.cookie_network_tracer import ( + trace_cookie_network, build_network_trace_block_html, + ) + cd = (banner_result or {}).get("cookies_detailed") or [] + e1 = check_cookies_for_entropy_mismatch(cd) + if e1: + ent_html = build_entropy_block_html(e1) + parts.append(ent_html) + section_sizes["entropy_v2"] = len(ent_html) + site_url = "" + for entry in (doc_entries or []): + if entry.get("url"): + site_url = entry["url"]; break + net = trace_cookie_network(cd, site_url) + if net: + net_html = build_network_trace_block_html(net) + parts.append(net_html) + section_sizes["network_trace_v2"] = len(net_html) + except Exception as e: + logger.warning("Replay v2: entropy/network failed: %s", e) + # P82: GF-1-Pager zuerst (5-Bullet-Summary) try: from compliance.services.gf_one_pager import build_gf_one_pager_html diff --git a/backend-compliance/compliance/services/mc_audit_type.py b/backend-compliance/compliance/services/mc_audit_type.py new file mode 100644 index 00000000..23f68858 --- /dev/null +++ b/backend-compliance/compliance/services/mc_audit_type.py @@ -0,0 +1,269 @@ +""" +P106 — MC-Audit-Type-Klassifizierung. + +Zentrales Problem: viele Master-Controls pruefen Sachverhalte, die wir +von Aussen GAR NICHT pruefen koennen — z.B. ob das Unternehmen einen +internen Loeschkonzept-Prozess hat oder Schulungen durchgefuehrt wurden. + +Bisher: alle MCs deren Pattern im Text nicht matched → FAIL. +Folge: GF-Mail mit 95 FAILs, davon ~60-70 in Wirklichkeit nur 'unknown'. + +Loesung: pro MC klassifizieren: +* verifiable → Pattern muss im sichtbaren Dokument stehen (Audit moeglich) +* process_internal → interner Prozess des Kunden (Schulung, AVV-Vertrag, …) +* doc_internal → interne Dokumentation (VVT-Eintrag, DSFA-File, …) +* ambiguous → koennte beides sein + +In der MC-Auswertung: +* verifiable + Pattern fehlt → echtes FAIL ❌ +* process_internal → CHECK (Hinweis 'Bitte intern pruefen') ⓘ +* doc_internal → CHECK (Hinweis 'Im VVT/DSFA dokumentiert?') ⓘ +* ambiguous → CHECK mit Warnung +""" + +from __future__ import annotations + +import logging +import re + +logger = logging.getLogger(__name__) + + +# Patterns die auf interne Prozesse hindeuten (NICHT von aussen pruefbar) +_PROCESS_INTERNAL_PATTERNS = [ + # Schulung / Mitarbeiter + r"\bmitarbeiter\b.*schul", + r"\bschulung(en)?\b", + r"\bawareness\b", + r"\bsensibilisier", + # Vertraege intern + r"\bauftragsverarbeitungsvertrag\b", + r"\bAVV\b\s+abgeschlossen", + r"\bvertrag.*abgeschlossen", + r"\bdpa\s+(geschlossen|abgeschlossen|vorhanden)", + r"\bSCC\s+(geschlossen|abgeschlossen|implementiert)", + # Technisch-organisatorische Massnahmen (intern) + r"\btechnisch[-\s]*organisatorische\s+ma(ß|ss)nahmen?\b", + r"\bTOM\s+(umgesetzt|dokumentiert|implementiert)", + r"\bverschluesselung\s+(implementiert|aktiv)", + r"\bpseudonymisierung\s+(implementiert|aktiv)", + r"\bbackup[s]?\s+(eingerichtet|vorhanden)", + r"\bzugriffskontrolle", + r"\b(rollen|berechtigungs)konzept", + # Risikobewertung / DSFA (intern) + r"\bdsfa\s+(durchgefuehrt|erstellt|dokumentiert)", + r"\brisikobewertung\s+(durchgefuehrt|dokumentiert)", + r"\brisikoanalyse", + # Loeschkonzept / Aufbewahrung + r"\bloeschkonzept\s+(umgesetzt|implementiert)", + r"\baufbewahrungsfrist(en)?\s+(eingehalten|definiert)", + r"\bloeschroutinen?\s+(aktiv|implementiert)", + # Meldewege / Vorfallmanagement + r"\bmeldepflicht\s+(eingehalten|umgesetzt)", + r"\bvorfallmanagement", + r"\bincident[\s-]?response", + r"\b72[\s-]?stunden[\s-]?meldung", + # Generische Prozess-Indikatoren + r"\bdokumentiert\s+werden", + r"\bbitte\s+(intern\s+)?dokumentieren", + r"\bin\s+der\s+verfahrens", + r"\bnach\s+innen\s+geh", + r"\bausnahmen\s+(dokumentieren|protokollieren)", + r"\bkostenfrei\s+(zur\s+verfuegung|gewaehren|ermoegli)", + r"\bunentgeltlich\s+(zur\s+verfuegung)", + # Vertragsleistung / Service-Level (intern) + r"\bservice[\s-]?level", + r"\breaktionszeit", + # Auditierung / Aufsicht + r"\binterne(s)?\s+audit", + r"\baufsichtsbehoerde\s+gemeldet", + r"\bbeauftragter\s+(intern|benannt)", + # eCall + Branchen-spezifische interne Pflichten + r"\babschaltung\s+der\s+\w+\s+kostenfrei", + r"\bopt[\s-]?out\s+(intern|im\s+kundenportal)\s+ermoeglichen", +] + +# Patterns die auf interne Dokumentation hindeuten (VVT, DSFA-Datei, …) +_DOC_INTERNAL_PATTERNS = [ + r"\bverzeichnis\s+der\s+verarbeitungstaetigkeiten\b", + r"\bvvt(\s+|\b)", + r"\bdsfa[\s-]?dokument", + r"\bauftragsverarbeitungsverzeichnis", + r"\bsub[\s-]?prozessor[\s-]?liste", + r"\bverarbeitungs[\s-]?register", + r"\binternes\s+register", + r"\baufbewahrungs[\s-]?konzept\b", +] + +# Patterns die auf externe Sichtbarkeit hindeuten → DEFINITIV verifiable +_VERIFIABLE_PATTERNS = [ + r"\bin\s+der\s+(datenschutzerklaerung|dse|cookie[\s-]?richtlinie|impressum|agb)\b", + r"\bauf\s+der\s+website\s+(genannt|sichtbar|angegeben)", + r"\bim\s+banner\s+(genannt|sichtbar)", + r"\bim\s+cookie[\s-]?banner", + r"\bauf\s+der\s+startseite", + r"\bim\s+footer", +] + + +def _matches_any(text: str, patterns: list[str]) -> bool: + tl = text.lower() + for pat in patterns: + try: + if re.search(pat, tl): + return True + except re.error: + continue + return False + + +def classify_mc_audit_type( + title: str | None, + check_question: str | None = None, + fail_criteria: dict | None = None, +) -> str: + """Returns 'verifiable', 'process_internal', 'doc_internal', + or 'ambiguous'.""" + blob = " ".join([title or "", check_question or "", + str(fail_criteria or "")]) + if not blob.strip(): + return "ambiguous" + + is_verifiable_hint = _matches_any(blob, _VERIFIABLE_PATTERNS) + is_process = _matches_any(blob, _PROCESS_INTERNAL_PATTERNS) + is_doc = _matches_any(blob, _DOC_INTERNAL_PATTERNS) + + # Wenn explicit Verifiable-Indikator + kein Process → verifiable + if is_verifiable_hint and not (is_process or is_doc): + return "verifiable" + # Wenn Process oder Doc UND nicht Verifiable → intern + if is_process and not is_verifiable_hint: + return "process_internal" + if is_doc and not is_verifiable_hint: + return "doc_internal" + # Beides → ambiguous, im Zweifel CHECK markieren + if is_process or is_doc: + return "ambiguous" + return "verifiable" + + +def annotate_mc_results(check_results: list[dict]) -> list[dict]: + """In-place: setzt mc_audit_type auf jeden MC-Check und ersetzt + Status 'failed' durch 'check' wenn audit_type != verifiable.""" + if not check_results: + return check_results + n_reclassified = 0 + for r in check_results: + if not isinstance(r, dict): + continue + if not (r.get("id") or "").startswith("mc-"): + continue + if "mc_audit_type" not in r: + r["mc_audit_type"] = classify_mc_audit_type( + r.get("label"), r.get("hint"), r.get("fail_criteria"), + ) + # Wenn FAIL aber audit_type != verifiable → "check" (manuell) + if (not r.get("passed") + and not r.get("skipped") + and r["mc_audit_type"] in ( + "process_internal", "doc_internal", "ambiguous", + )): + r["audit_status"] = "check" # NICHT failed + n_reclassified += 1 + elif r.get("passed"): + r["audit_status"] = "pass" + elif r.get("skipped"): + r["audit_status"] = "skip" + else: + r["audit_status"] = "fail" + if n_reclassified: + logger.info( + "MC-Audit-Type: %d/%d MCs reklassifiziert von FAIL → CHECK " + "(interne Pruefung erforderlich)", + n_reclassified, len(check_results), + ) + return check_results + + +def split_by_audit_type(check_results: list[dict]) -> dict[str, list[dict]]: + """Liefert {verifiable_fails, internal_checks, passes, skips}.""" + out = {"verifiable_fails": [], "internal_checks": [], + "passes": [], "skips": []} + for r in (check_results or []): + if not isinstance(r, dict): + continue + if not (r.get("id") or "").startswith("mc-"): + continue + status = r.get("audit_status") + if status == "pass": + out["passes"].append(r) + elif status == "skip": + out["skips"].append(r) + elif status == "check": + out["internal_checks"].append(r) + elif status == "fail" or (not r.get("passed") and not r.get("skipped")): + out["verifiable_fails"].append(r) + return out + + +def build_internal_checks_block_html( + internal_checks: list[dict], + limit: int = 30, +) -> str: + if not internal_checks: + return "" + by_type: dict[str, list[dict]] = {} + for c in internal_checks: + t = c.get("mc_audit_type", "ambiguous") + by_type.setdefault(t, []).append(c) + + sections: list[str] = [] + labels = { + "process_internal": ("Interne Prozesse — bitte beim DSB pruefen", + "#1e40af"), + "doc_internal": ("Interne Dokumentation — bitte im VVT/DSFA pruefen", + "#5b21b6"), + "ambiguous": ("Unklar ob Audit-Befund oder interne Pruefung", + "#92400e"), + } + for atype, (heading, color) in labels.items(): + items = by_type.get(atype) or [] + if not items: + continue + rows = "".join( + f'
' + 'Diese Anforderungen koennen wir per externem Website-Audit nicht ' + 'als erfuellt oder nicht-erfuellt bewerten — sie betreffen interne ' + 'Prozesse (Schulungen, AVV-Vertraege, TOM-Doku) oder interne ' + 'Dokumentation (VVT, DSFA, Loeschkonzept). Sie sind also kein ' + 'Verstoss, sondern Hinweis-Checks fuer Ihren DSB.
' + + "".join(sections) + + '