feat(pipeline): G1 Decision Trace — compliance decision tracking

New table: decision_traces (status, reason, evidence, fix plan per control)
New API:
  POST/GET/PUT /v1/decision-traces (CRUD for decisions)
  GET /v1/decision-traces/stats (compliance dashboard)
  GET /v1/controls/{id}/full-trace (Regulation→Obligation→Control→Decision→Evidence)

454 tests pass, 0 regressions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-06 18:26:21 +02:00
parent ad24835940
commit 66a70ab31c
3 changed files with 466 additions and 0 deletions
+4
View File
@@ -5,6 +5,8 @@ from api.canonical_control_routes import router as canonical_router
from api.document_compliance_routes import router as document_router
from api.dependency_routes import router as dependency_router
from api.master_control_routes import router as master_control_router
from api.decision_trace_routes import router as decision_trace_router
from api.decision_trace_routes import full_trace_router
router = APIRouter()
router.include_router(generator_router)
@@ -12,3 +14,5 @@ router.include_router(canonical_router)
router.include_router(document_router)
router.include_router(dependency_router)
router.include_router(master_control_router)
router.include_router(decision_trace_router)
router.include_router(full_trace_router)
@@ -0,0 +1,404 @@
"""Decision Trace API — G1 Compliance Execution Layer.
Tracks compliance decisions per control: who decided, when, why,
what evidence supports it, and what's the remediation plan.
"""
import json
import logging
import uuid
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import text
from db.session import SessionLocal
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/v1/decision-traces", tags=["decision-traces"])
# ── Request/Response Models ──────────────────────────────────────────
class CreateDecisionRequest(BaseModel):
control_uuid: str
regulation_id: Optional[str] = None
obligation_id: Optional[str] = None
status: str = "not_assessed"
decision_reason: Optional[str] = None
decided_by: Optional[str] = None
fix_strategy: Optional[str] = None
fix_owner: Optional[str] = None
fix_target_date: Optional[str] = None
evidence_ids: list[str] = []
confidence: float = 0.0
tenant_id: Optional[str] = None
project_id: Optional[str] = None
metadata: dict = {}
class UpdateDecisionRequest(BaseModel):
status: Optional[str] = None
decision_reason: Optional[str] = None
decided_by: Optional[str] = None
fix_strategy: Optional[str] = None
fix_owner: Optional[str] = None
fix_target_date: Optional[str] = None
fix_completed_date: Optional[str] = None
evidence_ids: Optional[list[str]] = None
confidence: Optional[float] = None
metadata: Optional[dict] = None
# ── Endpoints ────────────────────────────────────────────────────────
@router.post("")
async def create_decision(req: CreateDecisionRequest):
"""Record a new compliance decision for a control."""
db = SessionLocal()
try:
trace_id = str(uuid.uuid4())
db.execute(text("""
INSERT INTO decision_traces
(id, control_uuid, regulation_id, obligation_id,
status, decision_reason, decided_by, decided_at,
fix_strategy, fix_owner, fix_target_date,
evidence_ids, confidence, tenant_id, project_id, metadata)
VALUES
(CAST(:id AS uuid), CAST(:control_uuid AS uuid), :regulation_id, :obligation_id,
:status, :decision_reason, :decided_by, NOW(),
:fix_strategy, :fix_owner, :fix_target_date,
CAST(:evidence_ids AS jsonb), :confidence,
:tenant_id, :project_id, CAST(:metadata AS jsonb))
"""), {
"id": trace_id,
"control_uuid": req.control_uuid,
"regulation_id": req.regulation_id,
"obligation_id": req.obligation_id,
"status": req.status,
"decision_reason": req.decision_reason,
"decided_by": req.decided_by,
"fix_strategy": req.fix_strategy,
"fix_owner": req.fix_owner,
"fix_target_date": req.fix_target_date,
"evidence_ids": json.dumps(req.evidence_ids),
"confidence": req.confidence,
"tenant_id": req.tenant_id,
"project_id": req.project_id,
"metadata": json.dumps(req.metadata),
})
db.commit()
return {"id": trace_id, "status": "created"}
finally:
db.close()
@router.get("")
async def list_decisions(
control_uuid: Optional[str] = None,
status: Optional[str] = None,
tenant_id: Optional[str] = None,
limit: int = Query(50, ge=1, le=500),
offset: int = Query(0, ge=0),
):
"""List decision traces with optional filters."""
db = SessionLocal()
try:
clauses = []
params: dict = {"limit": limit, "offset": offset}
if control_uuid:
clauses.append("dt.control_uuid = CAST(:control_uuid AS uuid)")
params["control_uuid"] = control_uuid
if status:
clauses.append("dt.status = :status")
params["status"] = status
if tenant_id:
clauses.append("dt.tenant_id = CAST(:tenant_id AS uuid)")
params["tenant_id"] = tenant_id
where = "WHERE " + " AND ".join(clauses) if clauses else ""
rows = db.execute(text(f"""
SELECT dt.id, dt.control_uuid, cc.control_id, cc.title,
dt.status, dt.decision_reason, dt.decided_by, dt.decided_at,
dt.fix_strategy, dt.fix_owner, dt.fix_target_date, dt.fix_completed_date,
dt.evidence_ids, dt.confidence, dt.regulation_id
FROM decision_traces dt
LEFT JOIN canonical_controls cc ON cc.id = dt.control_uuid
{where}
ORDER BY dt.decided_at DESC NULLS LAST
LIMIT :limit OFFSET :offset
"""), params).fetchall()
total = db.execute(text(f"""
SELECT count(*) FROM decision_traces dt {where}
"""), params).scalar()
return {
"total": total,
"decisions": [
{
"id": str(r[0]),
"control_uuid": str(r[1]),
"control_id": r[2],
"control_title": r[3],
"status": r[4],
"decision_reason": r[5],
"decided_by": r[6],
"decided_at": str(r[7]) if r[7] else None,
"fix_strategy": r[8],
"fix_owner": r[9],
"fix_target_date": str(r[10]) if r[10] else None,
"fix_completed_date": str(r[11]) if r[11] else None,
"evidence_ids": r[12],
"confidence": float(r[13]) if r[13] else 0,
"regulation_id": r[14],
}
for r in rows
],
}
finally:
db.close()
@router.get("/stats")
async def decision_stats(tenant_id: Optional[str] = None):
"""Dashboard statistics for compliance decisions."""
db = SessionLocal()
try:
tenant_filter = ""
params: dict = {}
if tenant_id:
tenant_filter = "WHERE tenant_id = CAST(:tenant_id AS uuid)"
params["tenant_id"] = tenant_id
stats = db.execute(text(f"""
SELECT status, count(*) FROM decision_traces
{tenant_filter}
GROUP BY status
"""), params).fetchall()
total = sum(r[1] for r in stats)
by_status = {r[0]: r[1] for r in stats}
return {
"total_decisions": total,
"by_status": by_status,
"compliance_rate": round(
by_status.get("compliant", 0) / total * 100, 1
) if total > 0 else 0,
"pending_remediation": by_status.get("under_remediation", 0),
"not_assessed": by_status.get("not_assessed", 0),
}
finally:
db.close()
@router.get("/{trace_id}")
async def get_decision(trace_id: str):
"""Get a single decision trace."""
db = SessionLocal()
try:
row = db.execute(text("""
SELECT dt.*, cc.control_id, cc.title, cc.source_citation
FROM decision_traces dt
LEFT JOIN canonical_controls cc ON cc.id = dt.control_uuid
WHERE dt.id = CAST(:id AS uuid)
"""), {"id": trace_id}).fetchone()
if not row:
raise HTTPException(status_code=404, detail="Decision trace not found")
return {
"id": str(row.id),
"control_uuid": str(row.control_uuid),
"control_id": row.control_id,
"control_title": row.title,
"regulation_id": row.regulation_id,
"obligation_id": row.obligation_id,
"status": row.status,
"decision_reason": row.decision_reason,
"decided_by": row.decided_by,
"decided_at": str(row.decided_at) if row.decided_at else None,
"fix_strategy": row.fix_strategy,
"fix_owner": row.fix_owner,
"fix_target_date": str(row.fix_target_date) if row.fix_target_date else None,
"fix_completed_date": str(row.fix_completed_date) if row.fix_completed_date else None,
"evidence_ids": row.evidence_ids,
"confidence": float(row.confidence) if row.confidence else 0,
"source_citation": row.source_citation,
"metadata": row.metadata,
}
finally:
db.close()
@router.put("/{trace_id}")
async def update_decision(trace_id: str, req: UpdateDecisionRequest):
"""Update a decision trace (status, fix progress, evidence)."""
db = SessionLocal()
try:
updates = []
params: dict = {"id": trace_id}
if req.status is not None:
updates.append("status = :status")
params["status"] = req.status
if req.decision_reason is not None:
updates.append("decision_reason = :reason")
params["reason"] = req.decision_reason
if req.decided_by is not None:
updates.append("decided_by = :decided_by")
params["decided_by"] = req.decided_by
if req.fix_strategy is not None:
updates.append("fix_strategy = :fix_strategy")
params["fix_strategy"] = req.fix_strategy
if req.fix_owner is not None:
updates.append("fix_owner = :fix_owner")
params["fix_owner"] = req.fix_owner
if req.fix_target_date is not None:
updates.append("fix_target_date = :fix_target")
params["fix_target"] = req.fix_target_date
if req.fix_completed_date is not None:
updates.append("fix_completed_date = :fix_completed")
params["fix_completed"] = req.fix_completed_date
if req.evidence_ids is not None:
updates.append("evidence_ids = CAST(:evidence AS jsonb)")
params["evidence"] = json.dumps(req.evidence_ids)
if req.confidence is not None:
updates.append("confidence = :confidence")
params["confidence"] = req.confidence
if not updates:
raise HTTPException(status_code=400, detail="No fields to update")
result = db.execute(text(f"""
UPDATE decision_traces SET {', '.join(updates)}
WHERE id = CAST(:id AS uuid)
"""), params)
db.commit()
if result.rowcount == 0:
raise HTTPException(status_code=404, detail="Decision trace not found")
return {"status": "updated", "id": trace_id}
finally:
db.close()
# ── Full Trace Endpoint ──────────────────────────────────────────────
full_trace_router = APIRouter(prefix="/v1/controls", tags=["decision-traces"])
@full_trace_router.get("/{control_id}/full-trace")
async def get_full_trace(control_id: str):
"""Get the complete Decision Trace chain for a control.
Returns: Regulation → Obligation → Control → Master Control → Decision → Evidence
"""
db = SessionLocal()
try:
# 1. Control
ctrl = db.execute(text("""
SELECT id, control_id, title, objective, severity,
source_citation, source_original_text,
verification_method, category,
generation_metadata->>'merge_group_hint' AS merge_hint
FROM canonical_controls
WHERE control_id = :cid
"""), {"cid": control_id}).fetchone()
if not ctrl:
raise HTTPException(status_code=404, detail="Control not found")
# 2. Regulation (from source_citation)
citation = ctrl.source_citation or {}
regulation = {
"source": citation.get("source"),
"article": citation.get("article"),
"paragraph": citation.get("paragraph"),
"source_type": citation.get("source_type"),
"license": citation.get("license"),
}
# 3. Obligation (from parent links)
obligations = db.execute(text("""
SELECT oc.candidate_id, oc.obligation_text, oc.action,
oc.object, oc.normative_strength
FROM obligation_candidates oc
WHERE oc.parent_control_uuid = CAST(:uuid AS uuid)
ORDER BY oc.candidate_id
LIMIT 10
"""), {"uuid": str(ctrl.id)}).fetchall()
# 4. Master Control (if member)
master = db.execute(text("""
SELECT mc.master_control_id, mc.canonical_name, mc.phases_covered
FROM master_control_members mcm
JOIN master_controls mc ON mc.id = mcm.master_control_uuid
WHERE mcm.control_uuid = CAST(:uuid AS uuid)
LIMIT 1
"""), {"uuid": str(ctrl.id)}).fetchone()
# 5. Decision Traces
decisions = db.execute(text("""
SELECT id, status, decision_reason, decided_by, decided_at,
fix_strategy, fix_owner, evidence_ids, confidence
FROM decision_traces
WHERE control_uuid = CAST(:uuid AS uuid)
ORDER BY decided_at DESC NULLS LAST
"""), {"uuid": str(ctrl.id)}).fetchall()
return {
"control": {
"id": ctrl.control_id,
"uuid": str(ctrl.id),
"title": ctrl.title,
"objective": ctrl.objective,
"severity": ctrl.severity,
"category": ctrl.category,
"verification_method": ctrl.verification_method,
},
"regulation": regulation,
"original_text": ctrl.source_original_text[:500] if ctrl.source_original_text else None,
"obligations": [
{
"id": o.candidate_id,
"text": o.obligation_text,
"action": o.action,
"object": o.object,
"strength": o.normative_strength,
}
for o in obligations
],
"master_control": {
"id": master.master_control_id,
"name": master.canonical_name,
"phases": master.phases_covered,
} if master else None,
"decisions": [
{
"id": str(d.id),
"status": d.status,
"reason": d.decision_reason,
"decided_by": d.decided_by,
"decided_at": str(d.decided_at) if d.decided_at else None,
"fix_strategy": d.fix_strategy,
"fix_owner": d.fix_owner,
"evidence_count": len(d.evidence_ids) if d.evidence_ids else 0,
"confidence": float(d.confidence) if d.confidence else 0,
}
for d in decisions
],
"latest_status": decisions[0].status if decisions else "not_assessed",
}
finally:
db.close()
@@ -0,0 +1,58 @@
-- Migration 006: Decision Traces (G1)
-- Schema: compliance
-- Run: ssh macmini "docker exec -i bp-core-postgres psql -U breakpilot -d breakpilot_db" < control-pipeline/migrations/006_decision_traces.sql
SET search_path TO compliance, public;
CREATE TABLE IF NOT EXISTS decision_traces (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
control_uuid UUID NOT NULL,
regulation_id VARCHAR(100),
obligation_id VARCHAR(100),
-- Decision
status VARCHAR(30) NOT NULL DEFAULT 'not_assessed'
CHECK (status IN ('not_assessed', 'compliant', 'partially_compliant',
'not_compliant', 'not_applicable', 'under_remediation')),
decision_reason TEXT,
decided_by VARCHAR(100),
decided_at TIMESTAMPTZ,
-- Fix/Remediation
fix_strategy TEXT,
fix_owner VARCHAR(100),
fix_target_date DATE,
fix_completed_date DATE,
-- Evidence
evidence_ids JSONB DEFAULT '[]',
confidence NUMERIC(3,2) DEFAULT 0.0,
-- Multi-tenant
tenant_id UUID,
project_id UUID,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_dt_control ON decision_traces(control_uuid);
CREATE INDEX IF NOT EXISTS idx_dt_status ON decision_traces(status);
CREATE INDEX IF NOT EXISTS idx_dt_tenant ON decision_traces(tenant_id);
CREATE INDEX IF NOT EXISTS idx_dt_decided_at ON decision_traces(decided_at);
-- Updated-at trigger
CREATE OR REPLACE FUNCTION update_decision_traces_updated_at()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS trg_decision_traces_updated_at ON decision_traces;
CREATE TRIGGER trg_decision_traces_updated_at
BEFORE UPDATE ON decision_traces
FOR EACH ROW
EXECUTE FUNCTION update_decision_traces_updated_at();