Files
breakpilot-compliance/backend-compliance/compliance/services/compliance_audit_log.py
T
Benjamin Admin 6ed30dae5b feat(agent): MC scorecard + audit drill-down + tenant trend (A1-A6)
Now that all 1874 MCs run per check (Task #30 cap removal), the report
was about to drown in noise. This commit adds the full aggregation /
persistence / drill-down stack so each MC is actionable, not just
counted.

A1 mc_scorecard.py (new):
  build_scorecard(checks)    -> per-regulation PASS/FAIL/SKIP + severity
  top_fails(checks, n)       -> N most severe failed MCs
  full_audit_records(...)    -> flat rows ready for sidecar SQLite

A2 Email rendering:
  agent_doc_check_scorecard.py (new) builds an HTML scorecard table
  (regulation × passed/failed/HIGH/MEDIUM/score) shown at the top of
  the email. agent_doc_check_report._render_document now collapses
  the 500-MC L2 forest into 'X/Y bestanden (Z Fail)' summary plus
  a top-10 fails block per doc — old verbose render is gone.

A3 compliance_audit_log.py (new) — sidecar SQLite at
  /data/compliance_audits.db (separate from compliance Postgres
  schema to comply with the no-new-migrations rule in CLAUDE.md):
    check_runs(check_id, ts, tenant_id, site_name, base_domain,
               doc_count, scorecard json, vvt_summary json)
    mc_results(check_id, doc_type, mc_id, label, passed, skipped,
               severity, regulation, matched_text, hint)
  Route persists every run after the email is sent.
  docker-compose.yml adds compliance-audit volume + env.

A4 backfill_mc_regulation_llm.py (new) — Qwen-tagged backfill for
  the 1636 MCs the regex pass couldn't classify. Batches of 25,
  format=json, output constrained to the canonical regulation list.
  Run manually: docker exec bp-compliance-backend python3 \
                 /app/scripts/backfill_mc_regulation_llm.py [--dry-run]

A5 Admin audit tab — GET /api/compliance/agent/audit/<check_id>
  proxied via /api/sdk/v1/agent/audit/<id>. New page
  /sdk/agent/audit/[checkId] renders scorecard + filterable MC table
  (status / doc_type / regulation, expandable rows with matched_text
  + hint). ComplianceCheckTab now shows 'Voll-Audit oeffnen' link.

A6 Trend per tenant — GET /api/compliance/agent/audit/tenant/<id>
  returns recent runs. Email scorecard shows per-regulation delta
  badges ('(+12%)', '(-3%)') compared with the previous run for the
  same tenant + base_domain. Lookup is one SQLite query.

Plumbing:
  rag_document_checker.py — SELECT now includes 'article'; MC results
    carry 'regulation' + 'article' through to CheckItem.
  agent_doc_check_routes.CheckItem schema gains regulation + article
    fields (defaults '') so old clients still parse.
  agent_compliance_check_routes — response gains 'check_id' so the
    frontend can build the audit link.
2026-05-17 13:45:58 +02:00

197 lines
7.1 KiB
Python

"""
Compliance-Check Audit Log — sidecar SQLite persistence.
Every compliance-check run flattens its MC results into rows here so
we have:
- per-tenant history of scorecards (Task A6 trend view)
- drill-down on individual MCs for the admin frontend (Task A5)
- export-ability (DSB receives JSON attachment derived from this)
Sidecar SQLite (`/data/compliance_audits.db`) instead of a new table in
the compliance schema, because the repo policy forbids new migrations
without explicit DB-owner sign-off (see CLAUDE.md guardrails).
"""
from __future__ import annotations
import json
import logging
import os
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
logger = logging.getLogger(__name__)
DB_PATH = os.getenv("COMPLIANCE_AUDIT_DB", "/data/compliance_audits.db")
def _ensure_db() -> None:
    """Create the sidecar SQLite database, its tables and indexes if missing.

    The parent directory of ``DB_PATH`` is created first. Every DDL
    statement uses ``IF NOT EXISTS``, so calling this before each
    read/write is safe.

    Raises:
        OSError: if the parent directory cannot be created.
        sqlite3.Error: if the database cannot be opened or the schema
            cannot be applied.
    """
    Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    try:
        # ``with conn`` only manages the transaction (commit/rollback); it
        # does not close the connection, hence the explicit ``finally``.
        with conn:
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS check_runs (
                    check_id TEXT PRIMARY KEY,
                    ts TEXT NOT NULL,
                    tenant_id TEXT,
                    site_name TEXT,
                    base_domain TEXT,
                    doc_count INTEGER,
                    scorecard TEXT, -- JSON {by_regulation, totals}
                    vvt_summary TEXT -- JSON {total, internal, external, critical}
                );
                CREATE INDEX IF NOT EXISTS idx_runs_tenant ON check_runs(tenant_id, ts);
                CREATE INDEX IF NOT EXISTS idx_runs_domain ON check_runs(base_domain, ts);
                CREATE TABLE IF NOT EXISTS mc_results (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    check_id TEXT NOT NULL,
                    doc_type TEXT,
                    mc_id TEXT,
                    label TEXT,
                    passed INTEGER,
                    skipped INTEGER,
                    severity TEXT,
                    regulation TEXT,
                    matched_text TEXT,
                    hint TEXT
                );
                CREATE INDEX IF NOT EXISTS idx_mc_check ON mc_results(check_id);
                CREATE INDEX IF NOT EXISTS idx_mc_reg ON mc_results(regulation, passed);
            """)
    finally:
        conn.close()
def record_check_run(
    check_id: str,
    tenant_id: str,
    site_name: str,
    base_domain: str,
    doc_count: int,
    scorecard: dict,
    vvt_summary: dict | None = None,
    mc_records: list[dict] | None = None,
) -> None:
    """Persist one check run + all its MC rows. Idempotent on check_id.

    The run row is written with ``INSERT OR REPLACE`` and any MC rows
    previously stored for ``check_id`` are deleted before the new ones are
    inserted, all within a single transaction.

    Persistence is best-effort: any ``Exception`` is logged as a warning
    and swallowed, so a failing audit store never breaks the caller.

    Args:
        check_id: Primary key of the run.
        tenant_id: Tenant the run belongs to.
        site_name: Stored as-is in ``check_runs.site_name``.
        base_domain: Stored as-is in ``check_runs.base_domain``.
        doc_count: Stored as-is in ``check_runs.doc_count``.
        scorecard: JSON-serialisable dict, stored as a JSON string.
        vvt_summary: Optional JSON-serialisable dict; ``None`` is stored
            as ``{}``.
        mc_records: Optional list of flat MC result dicts. Recognised keys:
            ``check_id``, ``doc_type``, ``mc_id``, ``label``, ``passed``,
            ``skipped``, ``severity``, ``regulation``, ``matched_text``,
            ``hint``. ``label`` is cut to 300 characters, ``matched_text``
            and ``hint`` to 500; ``severity`` is upper-cased; ``passed`` /
            ``skipped`` are stored as 0/1.
    """
    try:
        _ensure_db()
        ts = datetime.now(timezone.utc).isoformat()
        records = mc_records or []
        mc_rows = [
            (
                # Fall back to the run's id when the record carries no
                # (or an empty/None) check_id: the column is NOT NULL and
                # a single bad row would otherwise roll back the whole run.
                r.get("check_id") or check_id,
                r.get("doc_type", ""),
                r.get("mc_id", ""),
                (r.get("label") or "")[:300],
                1 if r.get("passed") else 0,
                1 if r.get("skipped") else 0,
                (r.get("severity") or "").upper(),
                r.get("regulation") or "",
                (r.get("matched_text") or "")[:500],
                (r.get("hint") or "")[:500],
            )
            for r in records
        ]
        conn = sqlite3.connect(DB_PATH)
        try:
            # ``with conn`` commits on success and rolls back on error; it
            # does not close the connection, hence the ``finally`` below.
            with conn:
                conn.execute(
                    "INSERT OR REPLACE INTO check_runs "
                    "(check_id, ts, tenant_id, site_name, base_domain, doc_count, "
                    " scorecard, vvt_summary) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                    (
                        check_id, ts, tenant_id, site_name, base_domain, doc_count,
                        json.dumps(scorecard, ensure_ascii=False),
                        json.dumps(vvt_summary or {}, ensure_ascii=False),
                    ),
                )
                # Clear old rows for the same check_id before re-inserting
                # (idempotency).
                conn.execute("DELETE FROM mc_results WHERE check_id=?", (check_id,))
                if mc_rows:
                    conn.executemany(
                        "INSERT INTO mc_results "
                        "(check_id, doc_type, mc_id, label, passed, skipped, "
                        " severity, regulation, matched_text, hint) "
                        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                        mc_rows,
                    )
        finally:
            conn.close()
        logger.info("Audit recorded: check_id=%s mc_rows=%d",
                    check_id, len(mc_rows))
    except Exception as e:
        logger.warning("Audit persistence failed for %s: %s", check_id, e)
# ── Read API (used by the admin endpoints + trend view) ─────────────
def get_check_run(check_id: str) -> dict | None:
    """Return the stored run row for ``check_id``.

    Args:
        check_id: Primary key of the run to load.

    Returns:
        A dict with all ``check_runs`` columns, where ``scorecard`` and
        ``vvt_summary`` are decoded from JSON (empty/NULL becomes ``{}``).
        ``None`` if no such run exists or if the lookup fails; failures are
        logged as a warning rather than raised.
    """
    try:
        _ensure_db()
        conn = sqlite3.connect(DB_PATH)
        try:
            conn.row_factory = sqlite3.Row
            row = conn.execute(
                "SELECT * FROM check_runs WHERE check_id=?", (check_id,),
            ).fetchone()
            # Copy into a plain dict before the connection is closed.
            d = dict(row) if row is not None else None
        finally:
            # The sqlite3 connection context manager does not close the
            # connection, so close it explicitly.
            conn.close()
        if d is None:
            return None
        d["scorecard"] = json.loads(d.get("scorecard") or "{}")
        d["vvt_summary"] = json.loads(d.get("vvt_summary") or "{}")
        return d
    except Exception as e:
        logger.warning("get_check_run failed: %s", e)
        return None
def list_mc_results(
    check_id: str,
    doc_type: str | None = None,
    regulation: str | None = None,
    only_failed: bool = False,
) -> list[dict]:
    """Return the MC rows of one check run, optionally filtered.

    Args:
        check_id: Run whose MC rows are listed.
        doc_type: If truthy, keep only rows with this exact ``doc_type``.
        regulation: If truthy, keep only rows with this exact
            ``regulation``.
        only_failed: If true, keep only rows that neither passed nor were
            skipped.

    Returns:
        A list of dicts (one per ``mc_results`` row) ordered by
        ``severity`` then ``label``. An empty list if nothing matches or if
        the query fails; failures are logged as a warning rather than
        raised.
    """
    try:
        _ensure_db()
        where = ["check_id = ?"]
        params: list = [check_id]
        if doc_type:
            where.append("doc_type = ?")
            params.append(doc_type)
        if regulation:
            where.append("regulation = ?")
            params.append(regulation)
        if only_failed:
            where.append("passed = 0 AND skipped = 0")
        # Only fixed clause fragments are concatenated; all values are bound
        # as parameters.
        # NOTE(review): severity is a TEXT column, so this sort is
        # lexicographic (e.g. "LOW" sorts before "MEDIUM") — confirm that is
        # the intended order for consumers.
        sql = ("SELECT * FROM mc_results WHERE " + " AND ".join(where)
               + " ORDER BY severity, label")
        conn = sqlite3.connect(DB_PATH)
        try:
            conn.row_factory = sqlite3.Row
            return [dict(r) for r in conn.execute(sql, params).fetchall()]
        finally:
            # The sqlite3 connection context manager does not close the
            # connection, so close it explicitly.
            conn.close()
    except Exception as e:
        logger.warning("list_mc_results failed: %s", e)
        return []
def list_runs_for_tenant(
    tenant_id: str,
    base_domain: str | None = None,
    limit: int = 30,
) -> list[dict]:
    """Return the most recent check runs of a tenant, newest first.

    Args:
        tenant_id: Tenant whose runs are listed.
        base_domain: If truthy, keep only runs with this exact
            ``base_domain``.
        limit: Maximum number of rows, passed to SQL ``LIMIT``.

    Returns:
        A list of dicts (one per ``check_runs`` row) ordered by ``ts``
        descending, with ``scorecard`` decoded from JSON (empty/NULL becomes
        ``{}``). An empty list if nothing matches or if the query fails;
        failures are logged as a warning rather than raised.
    """
    try:
        _ensure_db()
        where = ["tenant_id = ?"]
        params: list = [tenant_id]
        if base_domain:
            where.append("base_domain = ?")
            params.append(base_domain)
        sql = ("SELECT * FROM check_runs WHERE " + " AND ".join(where)
               + " ORDER BY ts DESC LIMIT ?")
        params.append(limit)
        conn = sqlite3.connect(DB_PATH)
        try:
            conn.row_factory = sqlite3.Row
            # Copy rows into plain dicts before the connection is closed.
            runs = [dict(r) for r in conn.execute(sql, params).fetchall()]
        finally:
            # The sqlite3 connection context manager does not close the
            # connection, so close it explicitly.
            conn.close()
        # NOTE(review): unlike get_check_run, vvt_summary is left as the raw
        # JSON string here; kept as-is to preserve the return shape.
        for d in runs:
            d["scorecard"] = json.loads(d.get("scorecard") or "{}")
        return runs
    except Exception as e:
        logger.warning("list_runs_for_tenant failed: %s", e)
        return []