Files
breakpilot-compliance/backend-compliance/compliance/services/mc_scorecard.py
T
Benjamin Admin 6ed30dae5b feat(agent): MC scorecard + audit drill-down + tenant trend (A1-A6)
Now that all 1874 MCs run per check (Task #30 cap removal), the report
was about to drown in noise. This commit adds the full aggregation /
persistence / drill-down stack so each MC is actionable, not just
counted.

A1 mc_scorecard.py (new):
  build_scorecard(checks)    -> per-regulation PASS/FAIL/SKIP + severity
  top_fails(checks, n)       -> N most severe failed MCs
  full_audit_records(...)    -> flat rows ready for sidecar SQLite

A2 Email rendering:
  agent_doc_check_scorecard.py (new) builds an HTML scorecard table
  (regulation × passed/failed/HIGH/MEDIUM/score) shown at the top of
  the email. agent_doc_check_report._render_document now collapses
  the 500-MC L2 forest into 'X/Y bestanden (Z Fail)' summary plus
  a top-10 fails block per doc — old verbose render is gone.

A3 compliance_audit_log.py (new) — sidecar SQLite at
  /data/compliance_audits.db (separate from compliance Postgres
  schema to comply with the no-new-migrations rule in CLAUDE.md):
    check_runs(check_id, ts, tenant_id, site_name, base_domain,
               doc_count, scorecard json, vvt_summary json)
    mc_results(check_id, doc_type, mc_id, label, passed, skipped,
               severity, regulation, matched_text, hint)
  Route persists every run after the email is sent.
  docker-compose.yml adds compliance-audit volume + env.

A4 backfill_mc_regulation_llm.py (new) — Qwen-tagged backfill for
  the 1636 MCs the regex pass couldn't classify. Batches of 25,
  format=json, output constrained to the canonical regulation list.
  Run manually: docker exec bp-compliance-backend python3 \
                 /app/scripts/backfill_mc_regulation_llm.py [--dry-run]

A5 Admin audit tab — GET /api/compliance/agent/audit/<check_id>
  proxied via /api/sdk/v1/agent/audit/<id>. New page
  /sdk/agent/audit/[checkId] renders scorecard + filterable MC table
  (status / doc_type / regulation, expandable rows with matched_text
  + hint). ComplianceCheckTab now shows 'Voll-Audit oeffnen' link.

A6 Trend per tenant — GET /api/compliance/agent/audit/tenant/<id>
  returns recent runs. Email scorecard shows per-regulation delta
  badges ('(+12%)', '(-3%)') compared with the previous run for the
  same tenant + base_domain. Lookup is one SQLite query.

Plumbing:
  rag_document_checker.py — SELECT now includes 'article'; MC results
    carry 'regulation' + 'article' through to CheckItem.
  agent_doc_check_routes.CheckItem schema gains regulation + article
    fields (defaults '') so old clients still parse.
  agent_compliance_check_routes — response gains 'check_id' so the
    frontend can build the audit link.
2026-05-17 13:45:58 +02:00

152 lines
4.9 KiB
Python

"""
Master-Control Scorecard — group + summarise MC results.
With max_controls=0 (#30 fix) every doc-check now evaluates 75-571 MCs
per document. Rendering all of them verbatim makes the email + frontend
unreadable. This module produces three structured artefacts:
1. `build_scorecard(check_results)` — per-regulation aggregate (PASS /
FAIL / SKIP counts + severity histogram + compliance %)
2. `top_fails(check_results, n=10)` — top-N failed MCs ranked by
severity then absence of evidence
3. `full_audit_records(check_results, check_id, tenant_id)` — flat
list ready for SQLite persistence + JSON export
The functions are pure — no DB / network — so they're cheap to call
from inside the route and unit-testable.
"""
from __future__ import annotations
import logging
from collections import defaultdict
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
# Severity order: CRITICAL > HIGH > MEDIUM > LOW > INFO
_SEV_RANK = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4}
def build_scorecard(check_results: list[dict]) -> dict:
"""Aggregate per-regulation pass/fail/skip + severity buckets.
Args:
check_results: list of dicts, each typically a CheckItem-like
record with keys: id, label, passed, severity, skipped,
regulation, doc_type.
Returns:
{
"by_regulation": [
{"regulation": "DSGVO", "total": 193, "passed": 167,
"failed": 24, "skipped": 2, "pct": 87,
"severity": {"HIGH": 22, "MEDIUM": 2}}
],
"totals": {"total": 1874, "passed": 1300, "failed": 540,
"skipped": 34, "pct": 70},
}
"""
buckets: dict[str, dict] = defaultdict(
lambda: {"total": 0, "passed": 0, "failed": 0, "skipped": 0,
"severity": defaultdict(int)},
)
for r in check_results or []:
reg = (r.get("regulation") or "").strip() or ""
b = buckets[reg]
b["total"] += 1
if r.get("skipped"):
b["skipped"] += 1
elif r.get("passed"):
b["passed"] += 1
else:
b["failed"] += 1
sev = (r.get("severity") or "MEDIUM").upper()
b["severity"][sev] += 1
rows = []
grand_total = grand_passed = grand_failed = grand_skipped = 0
for reg, b in buckets.items():
# Convert defaultdict for serialisability
sev_dict = dict(b["severity"])
active = b["total"] - b["skipped"]
pct = round(b["passed"] / active * 100) if active else 0
rows.append({
"regulation": reg,
"total": b["total"],
"passed": b["passed"],
"failed": b["failed"],
"skipped": b["skipped"],
"pct": pct,
"severity": sev_dict,
})
grand_total += b["total"]
grand_passed += b["passed"]
grand_failed += b["failed"]
grand_skipped += b["skipped"]
rows.sort(key=lambda r: (-r["failed"], r["regulation"]))
grand_active = grand_total - grand_skipped
grand_pct = round(grand_passed / grand_active * 100) if grand_active else 0
return {
"by_regulation": rows,
"totals": {
"total": grand_total, "passed": grand_passed,
"failed": grand_failed, "skipped": grand_skipped,
"pct": grand_pct,
},
}
def top_fails(check_results: list[dict], n: int = 10) -> list[dict]:
"""Return top-N failing MCs sorted by severity then label.
Skipped + passed MCs are excluded. INFO severity is excluded by
default since those are guidance, not findings.
"""
fails = [
r for r in (check_results or [])
if not r.get("passed") and not r.get("skipped")
and (r.get("severity") or "").upper() != "INFO"
]
fails.sort(key=lambda r: (
_SEV_RANK.get((r.get("severity") or "MEDIUM").upper(), 5),
r.get("label", ""),
))
return fails[:n]
def full_audit_records(
check_results: list[dict],
check_id: str,
tenant_id: str = "",
doc_type: str = "",
) -> list[dict]:
"""Flatten check results into rows ready for SQLite persistence.
Returns one record per MC. Keeps the original fields plus
check_id + doc_type + tenant_id + ts.
"""
ts = datetime.now(timezone.utc).isoformat()
out: list[dict] = []
for r in check_results or []:
out.append({
"check_id": check_id,
"tenant_id": tenant_id,
"doc_type": doc_type,
"ts": ts,
"mc_id": r.get("id", ""),
"label": (r.get("label") or "")[:300],
"passed": bool(r.get("passed")),
"skipped": bool(r.get("skipped")),
"severity": (r.get("severity") or "").upper(),
"regulation": r.get("regulation") or "",
"matched_text": (r.get("matched_text") or "")[:500],
"hint": (r.get("hint") or "")[:500],
"level": int(r.get("level") or 1),
})
return out