e2be51b0aa
CI / secret-scan (push) Has been skipped
CI / dep-audit (push) Has been skipped
CI / sbom-scan (push) Has been skipped
CI / validate-canonical-controls (push) Successful in 16s
CI / detect-changes (push) Successful in 11s
CI / branch-name (push) Has been skipped
CI / guardrail-integrity (push) Has been skipped
CI / loc-budget (push) Failing after 16s
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / nodejs-build (push) Successful in 2m42s
CI / test-go (push) Has been skipped
CI / iace-gt-coverage (push) Has been skipped
CI / test-python-backend (push) Successful in 41s
CI / test-python-document-crawler (push) Has been skipped
CI / test-python-dsms-gateway (push) Has been skipped
P106 — mc_audit_type.py: zentrales Quality-Thema. Klassifiziert pro MC: verifiable / process_internal / doc_internal / ambiguous. Pattern-Match auf check_question + title + fail_criteria (Schulung, AVV abgeschlossen, TOM umgesetzt, DSFA durchgefuehrt, Ausnahmen dokumentieren, kostenfrei zur Verfuegung, opt-out intern ermoeglichen, …). Interne MCs werden in der MC-Auswertung NICHT mehr als FAIL gewertet, sondern als CHECK markiert (audit_status='check'). Sie zaehlen im build_scorecard als skipped (nicht failed) damit der Score realistisch ist. build_internal_checks_block_html() rendert sie als separaten blauen Block 'Pruefungen die wir von aussen NICHT durchfuehren koennen' nach dem MC-Scorecard. Erwartete Wirkung: bei VW 95 FAILs → wahrscheinlich 30-40 echte verifiable_fails + 50-60 internal_checks. GF-Mail wird drastisch realistischer (statt 'Sie haben 95 Verstoesse' → 'Sie haben 35 extern sichtbare Themen + 60 interne Checks, bitte mit DSB klaeren'). P83 — BUILD_SHA in backend/admin/consent-tester Dockerfiles als ARG + ENV. check-rebuild-needed.sh kann jetzt deployed vs local SHA vergleichen + REBUILD REQUIRED melden. P80 v2 — check_replay.py macht jetzt vollstaendigen Replay aller post-fetch Quality-Generatoren: vendor_normalizer (Dedup), audit_quality_checks, cookie_compliance_audit, tcf_vendor_authority, cookie_value_entropy, cookie_network_tracer. Snapshots aus alter Zeit zeigen jetzt im Replay den aktuellen Audit-Stand. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
220 lines
7.5 KiB
Python
220 lines
7.5 KiB
Python
"""
|
|
Master-Control Scorecard — group + summarise MC results.
|
|
|
|
With max_controls=0 (#30 fix) every doc-check now evaluates 75-571 MCs
|
|
per document. Rendering all of them verbatim makes the email + frontend
|
|
unreadable. This module produces three structured artefacts:
|
|
|
|
1. `build_scorecard(check_results)` — per-regulation aggregate (PASS /
|
|
FAIL / SKIP counts + severity histogram + compliance %)
|
|
|
|
2. `top_fails(check_results, n=10)` — top-N failed MCs ranked by
|
|
severity then absence of evidence
|
|
|
|
3. `full_audit_records(check_results, check_id, tenant_id)` — flat
|
|
list ready for SQLite persistence + JSON export
|
|
|
|
The functions are pure — no DB / network — so they're cheap to call
|
|
from inside the route and unit-testable.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from collections import defaultdict
|
|
from datetime import datetime, timezone
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Severity order: CRITICAL > HIGH > MEDIUM > LOW > INFO
|
|
_SEV_RANK = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4}
|
|
|
|
|
|
def build_scorecard(check_results: list[dict]) -> dict:
|
|
"""Aggregate per-regulation pass/fail/skip + severity buckets.
|
|
|
|
Args:
|
|
check_results: list of dicts, each typically a CheckItem-like
|
|
record with keys: id, label, passed, severity, skipped,
|
|
regulation, doc_type.
|
|
|
|
Returns:
|
|
{
|
|
"by_regulation": [
|
|
{"regulation": "DSGVO", "total": 193, "passed": 167,
|
|
"failed": 24, "skipped": 2, "pct": 87,
|
|
"severity": {"HIGH": 22, "MEDIUM": 2}}
|
|
],
|
|
"totals": {"total": 1874, "passed": 1300, "failed": 540,
|
|
"skipped": 34, "pct": 70},
|
|
}
|
|
"""
|
|
buckets: dict[str, dict] = defaultdict(
|
|
lambda: {"total": 0, "passed": 0, "failed": 0, "skipped": 0,
|
|
"severity": defaultdict(int)},
|
|
)
|
|
for r in check_results or []:
|
|
reg = (r.get("regulation") or "—").strip() or "—"
|
|
b = buckets[reg]
|
|
b["total"] += 1
|
|
if r.get("skipped"):
|
|
b["skipped"] += 1
|
|
elif r.get("passed"):
|
|
b["passed"] += 1
|
|
# P106 — interner Check ist KEIN Fail (zaehlt als skipped fuer
|
|
# die Score-Berechnung damit der Score realistisch ist).
|
|
elif r.get("audit_status") == "check":
|
|
b["skipped"] += 1
|
|
b.setdefault("internal_checks", 0)
|
|
b["internal_checks"] += 1
|
|
else:
|
|
b["failed"] += 1
|
|
sev = (r.get("severity") or "MEDIUM").upper()
|
|
b["severity"][sev] += 1
|
|
|
|
rows = []
|
|
grand_total = grand_passed = grand_failed = grand_skipped = 0
|
|
for reg, b in buckets.items():
|
|
# Convert defaultdict for serialisability
|
|
sev_dict = dict(b["severity"])
|
|
active = b["total"] - b["skipped"]
|
|
pct = round(b["passed"] / active * 100) if active else 0
|
|
rows.append({
|
|
"regulation": reg,
|
|
"total": b["total"],
|
|
"passed": b["passed"],
|
|
"failed": b["failed"],
|
|
"skipped": b["skipped"],
|
|
"pct": pct,
|
|
"severity": sev_dict,
|
|
})
|
|
grand_total += b["total"]
|
|
grand_passed += b["passed"]
|
|
grand_failed += b["failed"]
|
|
grand_skipped += b["skipped"]
|
|
|
|
rows.sort(key=lambda r: (-r["failed"], r["regulation"]))
|
|
|
|
grand_active = grand_total - grand_skipped
|
|
grand_pct = round(grand_passed / grand_active * 100) if grand_active else 0
|
|
return {
|
|
"by_regulation": rows,
|
|
"totals": {
|
|
"total": grand_total, "passed": grand_passed,
|
|
"failed": grand_failed, "skipped": grand_skipped,
|
|
"pct": grand_pct,
|
|
},
|
|
}
|
|
|
|
|
|
_DEDUP_KEYWORDS = [
|
|
"einfache sprache", "verstaendliche sprache", "verständliche sprache",
|
|
"klare sprache", "einwilligungstexte", "einwilligungsaufforderung",
|
|
"einwilligungserklaerung", "einwilligungserklärung",
|
|
"mehrdeutige", "verstaendliche form", "verständliche form",
|
|
"fachbegriffe erklaeren", "fachbegriffe erklären",
|
|
]
|
|
|
|
|
|
def _dedup_key(label: str) -> str:
|
|
"""Cluster label to a stable dedup-key: if it contains one of the
|
|
well-known repetitive Sprache/Einwilligungs-Aufforderungs-Concepts,
|
|
collapse them all to that single concept. Otherwise return original."""
|
|
l = (label or "").lower()
|
|
for kw in _DEDUP_KEYWORDS:
|
|
if kw in l:
|
|
return f"_dup:{kw}"
|
|
return label
|
|
|
|
|
|
_CONDITIONAL_MARKERS = ("falls ", "sofern ", "wenn ", "soweit ",
|
|
"bei bedarf", "ggf.", "gegebenenfalls")
|
|
|
|
|
|
def _is_hard_finding(r: dict) -> bool:
|
|
"""Echtes Finding = wir haben einen positiven Treffer im Text der den
|
|
Verstoss belegt. Stille im Text reicht NICHT — das wandert ins MC-Audit
|
|
als "selbst pruefen", nicht ins Email als HIGH-Drohung.
|
|
|
|
Heuristik:
|
|
- matched_text nicht leer = textuelle Evidenz vorhanden → hart
|
|
- konditionales Label ("falls / sofern / wenn") UND matched_text leer
|
|
→ weich (Pre-Condition nicht belegt) → raus aus Top-Fails
|
|
- sonst: hart (klassische Pflichtangaben-Lücke wie "DSB fehlt")
|
|
"""
|
|
mt = (r.get("matched_text") or "").strip()
|
|
if mt:
|
|
return True
|
|
label_low = (r.get("label") or "").lower()
|
|
if any(m in label_low for m in _CONDITIONAL_MARKERS):
|
|
return False
|
|
return True
|
|
|
|
|
|
def top_fails(check_results: list[dict], n: int = 10) -> list[dict]:
|
|
"""Return top-N failing MCs sorted by severity then label.
|
|
|
|
Skipped + passed MCs are excluded. INFO severity is excluded by
|
|
default since those are guidance, not findings. Konditionale MCs
|
|
ohne Negativ-Beleg (P8) werden ebenfalls ausgesteuert — sie
|
|
erscheinen nur noch im MC-Audit als "selbst pruefen".
|
|
|
|
Near-duplicates (multiple MCs that all complain about "einfache
|
|
Sprache" / "Einwilligungsaufforderung" / ...) are collapsed to ONE
|
|
representative entry — sonst dominieren UI-Sprache-Hinweise die
|
|
Top-Liste und echte Lecks gehen unter.
|
|
"""
|
|
fails = [
|
|
r for r in (check_results or [])
|
|
if not r.get("passed") and not r.get("skipped")
|
|
and (r.get("severity") or "").upper() != "INFO"
|
|
and _is_hard_finding(r)
|
|
]
|
|
fails.sort(key=lambda r: (
|
|
_SEV_RANK.get((r.get("severity") or "MEDIUM").upper(), 5),
|
|
r.get("label", ""),
|
|
))
|
|
seen_keys: set[str] = set()
|
|
deduped: list[dict] = []
|
|
for r in fails:
|
|
k = _dedup_key(r.get("label", ""))
|
|
if k in seen_keys:
|
|
continue
|
|
seen_keys.add(k)
|
|
deduped.append(r)
|
|
if len(deduped) >= n:
|
|
break
|
|
return deduped
|
|
|
|
|
|
def full_audit_records(
|
|
check_results: list[dict],
|
|
check_id: str,
|
|
tenant_id: str = "",
|
|
doc_type: str = "",
|
|
) -> list[dict]:
|
|
"""Flatten check results into rows ready for SQLite persistence.
|
|
|
|
Returns one record per MC. Keeps the original fields plus
|
|
check_id + doc_type + tenant_id + ts.
|
|
"""
|
|
ts = datetime.now(timezone.utc).isoformat()
|
|
out: list[dict] = []
|
|
for r in check_results or []:
|
|
out.append({
|
|
"check_id": check_id,
|
|
"tenant_id": tenant_id,
|
|
"doc_type": doc_type,
|
|
"ts": ts,
|
|
"mc_id": r.get("id", ""),
|
|
"label": (r.get("label") or "")[:300],
|
|
"passed": bool(r.get("passed")),
|
|
"skipped": bool(r.get("skipped")),
|
|
"severity": (r.get("severity") or "").upper(),
|
|
"regulation": r.get("regulation") or "",
|
|
"matched_text": (r.get("matched_text") or "")[:500],
|
|
"hint": (r.get("hint") or "")[:500],
|
|
"level": int(r.get("level") or 1),
|
|
})
|
|
return out
|