Files
breakpilot-core/control-pipeline/api/decision_trace_routes.py
T
Benjamin Admin 66a70ab31c feat(pipeline): G1 Decision Trace — compliance decision tracking
New table: decision_traces (status, reason, evidence, fix plan per control)
New API:
  POST/GET/PUT /v1/decision-traces (CRUD for decisions)
  GET /v1/decision-traces/stats (compliance dashboard)
  GET /v1/controls/{id}/full-trace (Regulation→Obligation→Control→Decision→Evidence)

454 tests pass, 0 regressions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-06 18:26:21 +02:00

405 lines
15 KiB
Python

"""Decision Trace API — G1 Compliance Execution Layer.
Tracks compliance decisions per control: who decided, when, why,
what evidence supports it, and what's the remediation plan.
"""
import json
import logging
import uuid
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import text
from db.session import SessionLocal
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/v1/decision-traces", tags=["decision-traces"])
# ── Request/Response Models ──────────────────────────────────────────
class CreateDecisionRequest(BaseModel):
control_uuid: str
regulation_id: Optional[str] = None
obligation_id: Optional[str] = None
status: str = "not_assessed"
decision_reason: Optional[str] = None
decided_by: Optional[str] = None
fix_strategy: Optional[str] = None
fix_owner: Optional[str] = None
fix_target_date: Optional[str] = None
evidence_ids: list[str] = []
confidence: float = 0.0
tenant_id: Optional[str] = None
project_id: Optional[str] = None
metadata: dict = {}
class UpdateDecisionRequest(BaseModel):
status: Optional[str] = None
decision_reason: Optional[str] = None
decided_by: Optional[str] = None
fix_strategy: Optional[str] = None
fix_owner: Optional[str] = None
fix_target_date: Optional[str] = None
fix_completed_date: Optional[str] = None
evidence_ids: Optional[list[str]] = None
confidence: Optional[float] = None
metadata: Optional[dict] = None
# ── Endpoints ────────────────────────────────────────────────────────
@router.post("")
async def create_decision(req: CreateDecisionRequest):
"""Record a new compliance decision for a control."""
db = SessionLocal()
try:
trace_id = str(uuid.uuid4())
db.execute(text("""
INSERT INTO decision_traces
(id, control_uuid, regulation_id, obligation_id,
status, decision_reason, decided_by, decided_at,
fix_strategy, fix_owner, fix_target_date,
evidence_ids, confidence, tenant_id, project_id, metadata)
VALUES
(CAST(:id AS uuid), CAST(:control_uuid AS uuid), :regulation_id, :obligation_id,
:status, :decision_reason, :decided_by, NOW(),
:fix_strategy, :fix_owner, :fix_target_date,
CAST(:evidence_ids AS jsonb), :confidence,
:tenant_id, :project_id, CAST(:metadata AS jsonb))
"""), {
"id": trace_id,
"control_uuid": req.control_uuid,
"regulation_id": req.regulation_id,
"obligation_id": req.obligation_id,
"status": req.status,
"decision_reason": req.decision_reason,
"decided_by": req.decided_by,
"fix_strategy": req.fix_strategy,
"fix_owner": req.fix_owner,
"fix_target_date": req.fix_target_date,
"evidence_ids": json.dumps(req.evidence_ids),
"confidence": req.confidence,
"tenant_id": req.tenant_id,
"project_id": req.project_id,
"metadata": json.dumps(req.metadata),
})
db.commit()
return {"id": trace_id, "status": "created"}
finally:
db.close()
@router.get("")
async def list_decisions(
control_uuid: Optional[str] = None,
status: Optional[str] = None,
tenant_id: Optional[str] = None,
limit: int = Query(50, ge=1, le=500),
offset: int = Query(0, ge=0),
):
"""List decision traces with optional filters."""
db = SessionLocal()
try:
clauses = []
params: dict = {"limit": limit, "offset": offset}
if control_uuid:
clauses.append("dt.control_uuid = CAST(:control_uuid AS uuid)")
params["control_uuid"] = control_uuid
if status:
clauses.append("dt.status = :status")
params["status"] = status
if tenant_id:
clauses.append("dt.tenant_id = CAST(:tenant_id AS uuid)")
params["tenant_id"] = tenant_id
where = "WHERE " + " AND ".join(clauses) if clauses else ""
rows = db.execute(text(f"""
SELECT dt.id, dt.control_uuid, cc.control_id, cc.title,
dt.status, dt.decision_reason, dt.decided_by, dt.decided_at,
dt.fix_strategy, dt.fix_owner, dt.fix_target_date, dt.fix_completed_date,
dt.evidence_ids, dt.confidence, dt.regulation_id
FROM decision_traces dt
LEFT JOIN canonical_controls cc ON cc.id = dt.control_uuid
{where}
ORDER BY dt.decided_at DESC NULLS LAST
LIMIT :limit OFFSET :offset
"""), params).fetchall()
total = db.execute(text(f"""
SELECT count(*) FROM decision_traces dt {where}
"""), params).scalar()
return {
"total": total,
"decisions": [
{
"id": str(r[0]),
"control_uuid": str(r[1]),
"control_id": r[2],
"control_title": r[3],
"status": r[4],
"decision_reason": r[5],
"decided_by": r[6],
"decided_at": str(r[7]) if r[7] else None,
"fix_strategy": r[8],
"fix_owner": r[9],
"fix_target_date": str(r[10]) if r[10] else None,
"fix_completed_date": str(r[11]) if r[11] else None,
"evidence_ids": r[12],
"confidence": float(r[13]) if r[13] else 0,
"regulation_id": r[14],
}
for r in rows
],
}
finally:
db.close()
@router.get("/stats")
async def decision_stats(tenant_id: Optional[str] = None):
"""Dashboard statistics for compliance decisions."""
db = SessionLocal()
try:
tenant_filter = ""
params: dict = {}
if tenant_id:
tenant_filter = "WHERE tenant_id = CAST(:tenant_id AS uuid)"
params["tenant_id"] = tenant_id
stats = db.execute(text(f"""
SELECT status, count(*) FROM decision_traces
{tenant_filter}
GROUP BY status
"""), params).fetchall()
total = sum(r[1] for r in stats)
by_status = {r[0]: r[1] for r in stats}
return {
"total_decisions": total,
"by_status": by_status,
"compliance_rate": round(
by_status.get("compliant", 0) / total * 100, 1
) if total > 0 else 0,
"pending_remediation": by_status.get("under_remediation", 0),
"not_assessed": by_status.get("not_assessed", 0),
}
finally:
db.close()
@router.get("/{trace_id}")
async def get_decision(trace_id: str):
"""Get a single decision trace."""
db = SessionLocal()
try:
row = db.execute(text("""
SELECT dt.*, cc.control_id, cc.title, cc.source_citation
FROM decision_traces dt
LEFT JOIN canonical_controls cc ON cc.id = dt.control_uuid
WHERE dt.id = CAST(:id AS uuid)
"""), {"id": trace_id}).fetchone()
if not row:
raise HTTPException(status_code=404, detail="Decision trace not found")
return {
"id": str(row.id),
"control_uuid": str(row.control_uuid),
"control_id": row.control_id,
"control_title": row.title,
"regulation_id": row.regulation_id,
"obligation_id": row.obligation_id,
"status": row.status,
"decision_reason": row.decision_reason,
"decided_by": row.decided_by,
"decided_at": str(row.decided_at) if row.decided_at else None,
"fix_strategy": row.fix_strategy,
"fix_owner": row.fix_owner,
"fix_target_date": str(row.fix_target_date) if row.fix_target_date else None,
"fix_completed_date": str(row.fix_completed_date) if row.fix_completed_date else None,
"evidence_ids": row.evidence_ids,
"confidence": float(row.confidence) if row.confidence else 0,
"source_citation": row.source_citation,
"metadata": row.metadata,
}
finally:
db.close()
@router.put("/{trace_id}")
async def update_decision(trace_id: str, req: UpdateDecisionRequest):
"""Update a decision trace (status, fix progress, evidence)."""
db = SessionLocal()
try:
updates = []
params: dict = {"id": trace_id}
if req.status is not None:
updates.append("status = :status")
params["status"] = req.status
if req.decision_reason is not None:
updates.append("decision_reason = :reason")
params["reason"] = req.decision_reason
if req.decided_by is not None:
updates.append("decided_by = :decided_by")
params["decided_by"] = req.decided_by
if req.fix_strategy is not None:
updates.append("fix_strategy = :fix_strategy")
params["fix_strategy"] = req.fix_strategy
if req.fix_owner is not None:
updates.append("fix_owner = :fix_owner")
params["fix_owner"] = req.fix_owner
if req.fix_target_date is not None:
updates.append("fix_target_date = :fix_target")
params["fix_target"] = req.fix_target_date
if req.fix_completed_date is not None:
updates.append("fix_completed_date = :fix_completed")
params["fix_completed"] = req.fix_completed_date
if req.evidence_ids is not None:
updates.append("evidence_ids = CAST(:evidence AS jsonb)")
params["evidence"] = json.dumps(req.evidence_ids)
if req.confidence is not None:
updates.append("confidence = :confidence")
params["confidence"] = req.confidence
if not updates:
raise HTTPException(status_code=400, detail="No fields to update")
result = db.execute(text(f"""
UPDATE decision_traces SET {', '.join(updates)}
WHERE id = CAST(:id AS uuid)
"""), params)
db.commit()
if result.rowcount == 0:
raise HTTPException(status_code=404, detail="Decision trace not found")
return {"status": "updated", "id": trace_id}
finally:
db.close()
# ── Full Trace Endpoint ──────────────────────────────────────────────
full_trace_router = APIRouter(prefix="/v1/controls", tags=["decision-traces"])
@full_trace_router.get("/{control_id}/full-trace")
async def get_full_trace(control_id: str):
"""Get the complete Decision Trace chain for a control.
Returns: Regulation → Obligation → Control → Master Control → Decision → Evidence
"""
db = SessionLocal()
try:
# 1. Control
ctrl = db.execute(text("""
SELECT id, control_id, title, objective, severity,
source_citation, source_original_text,
verification_method, category,
generation_metadata->>'merge_group_hint' AS merge_hint
FROM canonical_controls
WHERE control_id = :cid
"""), {"cid": control_id}).fetchone()
if not ctrl:
raise HTTPException(status_code=404, detail="Control not found")
# 2. Regulation (from source_citation)
citation = ctrl.source_citation or {}
regulation = {
"source": citation.get("source"),
"article": citation.get("article"),
"paragraph": citation.get("paragraph"),
"source_type": citation.get("source_type"),
"license": citation.get("license"),
}
# 3. Obligation (from parent links)
obligations = db.execute(text("""
SELECT oc.candidate_id, oc.obligation_text, oc.action,
oc.object, oc.normative_strength
FROM obligation_candidates oc
WHERE oc.parent_control_uuid = CAST(:uuid AS uuid)
ORDER BY oc.candidate_id
LIMIT 10
"""), {"uuid": str(ctrl.id)}).fetchall()
# 4. Master Control (if member)
master = db.execute(text("""
SELECT mc.master_control_id, mc.canonical_name, mc.phases_covered
FROM master_control_members mcm
JOIN master_controls mc ON mc.id = mcm.master_control_uuid
WHERE mcm.control_uuid = CAST(:uuid AS uuid)
LIMIT 1
"""), {"uuid": str(ctrl.id)}).fetchone()
# 5. Decision Traces
decisions = db.execute(text("""
SELECT id, status, decision_reason, decided_by, decided_at,
fix_strategy, fix_owner, evidence_ids, confidence
FROM decision_traces
WHERE control_uuid = CAST(:uuid AS uuid)
ORDER BY decided_at DESC NULLS LAST
"""), {"uuid": str(ctrl.id)}).fetchall()
return {
"control": {
"id": ctrl.control_id,
"uuid": str(ctrl.id),
"title": ctrl.title,
"objective": ctrl.objective,
"severity": ctrl.severity,
"category": ctrl.category,
"verification_method": ctrl.verification_method,
},
"regulation": regulation,
"original_text": ctrl.source_original_text[:500] if ctrl.source_original_text else None,
"obligations": [
{
"id": o.candidate_id,
"text": o.obligation_text,
"action": o.action,
"object": o.object,
"strength": o.normative_strength,
}
for o in obligations
],
"master_control": {
"id": master.master_control_id,
"name": master.canonical_name,
"phases": master.phases_covered,
} if master else None,
"decisions": [
{
"id": str(d.id),
"status": d.status,
"reason": d.decision_reason,
"decided_by": d.decided_by,
"decided_at": str(d.decided_at) if d.decided_at else None,
"fix_strategy": d.fix_strategy,
"fix_owner": d.fix_owner,
"evidence_count": len(d.evidence_ids) if d.evidence_ids else 0,
"confidence": float(d.confidence) if d.confidence else 0,
}
for d in decisions
],
"latest_status": decisions[0].status if decisions else "not_assessed",
}
finally:
db.close()