66a70ab31c
New table: decision_traces (status, reason, evidence, fix plan per control)
New API:
POST/GET/PUT /v1/decision-traces (CRUD for decisions)
GET /v1/decision-traces/stats (compliance dashboard)
GET /v1/controls/{id}/full-trace (Regulation→Obligation→Control→Decision→Evidence)
454 tests pass, 0 regressions.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
405 lines
15 KiB
Python
405 lines
15 KiB
Python
"""Decision Trace API — G1 Compliance Execution Layer.
|
|
|
|
Tracks compliance decisions per control: who decided, when, why,
what evidence supports it, and what's the remediation plan.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import uuid
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, HTTPException, Query
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import text
|
|
|
|
from db.session import SessionLocal
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Router for the decision-trace endpoints; every route declared on it is
# served under the /v1/decision-traces prefix.
router = APIRouter(prefix="/v1/decision-traces", tags=["decision-traces"])
|
|
|
|
|
|
# ── Request/Response Models ──────────────────────────────────────────
|
|
|
|
|
|
class CreateDecisionRequest(BaseModel):
    """Request body for recording a new decision trace.

    Only ``control_uuid`` is required; every other field carries a default.
    """

    # --- What the decision is about -------------------------------------
    # UUID of the control, as a string (the INSERT casts it to ``uuid``).
    control_uuid: str
    regulation_id: Optional[str] = None
    obligation_id: Optional[str] = None

    # --- The decision itself --------------------------------------------
    status: str = "not_assessed"
    decision_reason: Optional[str] = None
    decided_by: Optional[str] = None

    # --- Remediation plan -----------------------------------------------
    fix_strategy: Optional[str] = None
    fix_owner: Optional[str] = None
    fix_target_date: Optional[str] = None

    # --- Supporting evidence --------------------------------------------
    # Serialised with json.dumps and stored as jsonb by the create endpoint.
    evidence_ids: list[str] = []
    confidence: float = 0.0

    # --- Scoping --------------------------------------------------------
    tenant_id: Optional[str] = None
    project_id: Optional[str] = None

    # Free-form extra data; serialised with json.dumps and stored as jsonb.
    metadata: dict = {}
|
|
|
|
|
|
class UpdateDecisionRequest(BaseModel):
    """Partial-update body for an existing decision trace.

    Every field is optional. A field left as ``None`` means "leave the
    stored value unchanged".
    """

    # --- The decision itself --------------------------------------------
    status: Optional[str] = None
    decision_reason: Optional[str] = None
    decided_by: Optional[str] = None

    # --- Remediation progress -------------------------------------------
    fix_strategy: Optional[str] = None
    fix_owner: Optional[str] = None
    fix_target_date: Optional[str] = None
    fix_completed_date: Optional[str] = None

    # --- Supporting evidence --------------------------------------------
    evidence_ids: Optional[list[str]] = None
    confidence: Optional[float] = None

    # Free-form extra data.
    metadata: Optional[dict] = None
|
|
|
|
|
|
# ── Endpoints ────────────────────────────────────────────────────────
|
|
|
|
|
|
@router.post("")
async def create_decision(req: CreateDecisionRequest):
    """Record a new compliance decision for a control.

    A fresh UUID is generated for the trace and ``decided_at`` is set to the
    database's ``NOW()``.

    Args:
        req: Decision payload. ``control_uuid`` must be a valid UUID string.

    Returns:
        ``{"id": <new trace id>, "status": "created"}``.

    Raises:
        HTTPException: 400 if ``control_uuid`` is not a valid UUID.

    NOTE(review): the session calls below are synchronous (never awaited)
    inside an ``async def`` handler, so they run on the event loop — confirm
    this is intended.
    """
    # Validate up front: a malformed value would otherwise only fail inside
    # CAST(:control_uuid AS uuid) and surface as an unhandled database error.
    try:
        control_uuid = str(uuid.UUID(req.control_uuid))
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid control_uuid") from None

    trace_id = str(uuid.uuid4())
    params = {
        "id": trace_id,
        "control_uuid": control_uuid,
        "regulation_id": req.regulation_id,
        "obligation_id": req.obligation_id,
        "status": req.status,
        "decision_reason": req.decision_reason,
        "decided_by": req.decided_by,
        "fix_strategy": req.fix_strategy,
        "fix_owner": req.fix_owner,
        "fix_target_date": req.fix_target_date,
        # The two jsonb columns receive JSON text that the statement casts.
        "evidence_ids": json.dumps(req.evidence_ids),
        "confidence": req.confidence,
        "tenant_id": req.tenant_id,
        "project_id": req.project_id,
        "metadata": json.dumps(req.metadata),
    }

    db = SessionLocal()
    try:
        db.execute(text("""
            INSERT INTO decision_traces
                (id, control_uuid, regulation_id, obligation_id,
                 status, decision_reason, decided_by, decided_at,
                 fix_strategy, fix_owner, fix_target_date,
                 evidence_ids, confidence, tenant_id, project_id, metadata)
            VALUES
                (CAST(:id AS uuid), CAST(:control_uuid AS uuid), :regulation_id, :obligation_id,
                 :status, :decision_reason, :decided_by, NOW(),
                 :fix_strategy, :fix_owner, :fix_target_date,
                 CAST(:evidence_ids AS jsonb), :confidence,
                 :tenant_id, :project_id, CAST(:metadata AS jsonb))
        """), params)
        db.commit()
        return {"id": trace_id, "status": "created"}
    finally:
        db.close()
|
|
|
|
|
|
@router.get("")
async def list_decisions(
    control_uuid: Optional[str] = None,
    status: Optional[str] = None,
    tenant_id: Optional[str] = None,
    limit: int = Query(50, ge=1, le=500),
    offset: int = Query(0, ge=0),
):
    """List decision traces with optional filters.

    Args:
        control_uuid: Only traces for this control (UUID string).
        status: Only traces with exactly this status.
        tenant_id: Only traces for this tenant (UUID string).
        limit: Page size, 1-500.
        offset: Number of rows to skip.

    Returns:
        ``{"total": <count of matching traces>, "decisions": [...]}`` with the
        page ordered by ``decided_at`` descending (NULLs last).

    Raises:
        HTTPException: 400 if ``control_uuid`` or ``tenant_id`` is given but
            is not a valid UUID.
    """
    # Both UUID filters are CAST to uuid in SQL; validate them here so a
    # malformed value yields a 400 instead of an unhandled database error.
    uuid_filters: dict = {}
    for name, raw in (("control_uuid", control_uuid), ("tenant_id", tenant_id)):
        if not raw:
            continue
        try:
            uuid_filters[name] = str(uuid.UUID(raw))
        except ValueError:
            raise HTTPException(status_code=400, detail=f"Invalid {name}") from None

    clauses = []
    filter_params: dict = {}
    if "control_uuid" in uuid_filters:
        clauses.append("dt.control_uuid = CAST(:control_uuid AS uuid)")
        filter_params["control_uuid"] = uuid_filters["control_uuid"]
    if status:
        clauses.append("dt.status = :status")
        filter_params["status"] = status
    if "tenant_id" in uuid_filters:
        clauses.append("dt.tenant_id = CAST(:tenant_id AS uuid)")
        filter_params["tenant_id"] = uuid_filters["tenant_id"]

    # Only fixed SQL fragments are interpolated; all values go through binds.
    where = "WHERE " + " AND ".join(clauses) if clauses else ""

    db = SessionLocal()
    try:
        rows = db.execute(text(f"""
            SELECT dt.id, dt.control_uuid, cc.control_id, cc.title,
                   dt.status, dt.decision_reason, dt.decided_by, dt.decided_at,
                   dt.fix_strategy, dt.fix_owner, dt.fix_target_date, dt.fix_completed_date,
                   dt.evidence_ids, dt.confidence, dt.regulation_id
            FROM decision_traces dt
            LEFT JOIN canonical_controls cc ON cc.id = dt.control_uuid
            {where}
            ORDER BY dt.decided_at DESC NULLS LAST
            LIMIT :limit OFFSET :offset
        """), {**filter_params, "limit": limit, "offset": offset}).fetchall()

        # The total ignores paging, so it needs only the filter binds.
        total = db.execute(text(f"""
            SELECT count(*) FROM decision_traces dt {where}
        """), filter_params).scalar()

        return {
            "total": total,
            "decisions": [
                {
                    "id": str(r[0]),
                    "control_uuid": str(r[1]),
                    "control_id": r[2],
                    "control_title": r[3],
                    "status": r[4],
                    "decision_reason": r[5],
                    "decided_by": r[6],
                    "decided_at": str(r[7]) if r[7] else None,
                    "fix_strategy": r[8],
                    "fix_owner": r[9],
                    "fix_target_date": str(r[10]) if r[10] else None,
                    "fix_completed_date": str(r[11]) if r[11] else None,
                    "evidence_ids": r[12],
                    "confidence": float(r[13]) if r[13] else 0,
                    "regulation_id": r[14],
                }
                for r in rows
            ],
        }
    finally:
        db.close()
|
|
|
|
|
|
# Declared before the "/{trace_id}" route so that a request for "/stats" is
# matched here rather than being captured as a trace id.
@router.get("/stats")
async def decision_stats(tenant_id: Optional[str] = None):
    """Dashboard statistics for compliance decisions.

    Args:
        tenant_id: If given, restrict the counts to this tenant (UUID string).

    Returns:
        A dict with ``total_decisions``, ``by_status`` (status -> count),
        ``compliance_rate`` (percentage of "compliant" decisions rounded to
        one decimal, or 0 when there are no decisions),
        ``pending_remediation`` and ``not_assessed``.

    Raises:
        HTTPException: 400 if ``tenant_id`` is given but is not a valid UUID.
    """
    tenant_filter = ""
    params: dict = {}
    if tenant_id:
        # Validate here: the value is CAST to uuid in SQL and a malformed one
        # would otherwise surface as an unhandled database error.
        try:
            params["tenant_id"] = str(uuid.UUID(tenant_id))
        except ValueError:
            raise HTTPException(status_code=400, detail="Invalid tenant_id") from None
        tenant_filter = "WHERE tenant_id = CAST(:tenant_id AS uuid)"

    db = SessionLocal()
    try:
        stats = db.execute(text(f"""
            SELECT status, count(*) FROM decision_traces
            {tenant_filter}
            GROUP BY status
        """), params).fetchall()

        by_status = {r[0]: r[1] for r in stats}
        total = sum(r[1] for r in stats)

        return {
            "total_decisions": total,
            "by_status": by_status,
            "compliance_rate": round(
                by_status.get("compliant", 0) / total * 100, 1
            ) if total > 0 else 0,
            "pending_remediation": by_status.get("under_remediation", 0),
            "not_assessed": by_status.get("not_assessed", 0),
        }
    finally:
        db.close()
|
|
|
|
|
|
@router.get("/{trace_id}")
async def get_decision(trace_id: str):
    """Get a single decision trace.

    The trace is joined with ``canonical_controls`` to add the control's id,
    title and source citation.

    Args:
        trace_id: UUID of the decision trace, as a string.

    Returns:
        A dict describing the trace and its control.

    Raises:
        HTTPException: 400 if ``trace_id`` is not a valid UUID; 404 if no
            trace with that id exists.
    """
    # Validate before querying: a malformed id would otherwise fail inside
    # CAST(:id AS uuid) and surface as an unhandled database error.
    try:
        trace_uuid = str(uuid.UUID(trace_id))
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid trace id") from None

    db = SessionLocal()
    try:
        row = db.execute(text("""
            SELECT dt.*, cc.control_id, cc.title, cc.source_citation
            FROM decision_traces dt
            LEFT JOIN canonical_controls cc ON cc.id = dt.control_uuid
            WHERE dt.id = CAST(:id AS uuid)
        """), {"id": trace_uuid}).fetchone()

        if row is None:
            raise HTTPException(status_code=404, detail="Decision trace not found")

        return {
            "id": str(row.id),
            "control_uuid": str(row.control_uuid),
            "control_id": row.control_id,
            "control_title": row.title,
            "regulation_id": row.regulation_id,
            "obligation_id": row.obligation_id,
            "status": row.status,
            "decision_reason": row.decision_reason,
            "decided_by": row.decided_by,
            "decided_at": str(row.decided_at) if row.decided_at else None,
            "fix_strategy": row.fix_strategy,
            "fix_owner": row.fix_owner,
            "fix_target_date": str(row.fix_target_date) if row.fix_target_date else None,
            "fix_completed_date": str(row.fix_completed_date) if row.fix_completed_date else None,
            "evidence_ids": row.evidence_ids,
            "confidence": float(row.confidence) if row.confidence else 0,
            "source_citation": row.source_citation,
            "metadata": row.metadata,
        }
    finally:
        db.close()
|
|
|
|
|
|
@router.put("/{trace_id}")
async def update_decision(trace_id: str, req: UpdateDecisionRequest):
    """Update a decision trace (status, fix progress, evidence, metadata).

    Only fields that are not ``None`` in the request are written; all other
    columns keep their stored values.

    Args:
        trace_id: UUID of the decision trace, as a string.
        req: Partial update payload.

    Returns:
        ``{"status": "updated", "id": trace_id}``.

    Raises:
        HTTPException: 400 if ``trace_id`` is not a valid UUID or the request
            sets no fields; 404 if no trace with that id exists.
    """
    # Validate before querying: a malformed id would otherwise fail inside
    # CAST(:id AS uuid) and surface as an unhandled database error.
    try:
        trace_uuid = str(uuid.UUID(trace_id))
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid trace id") from None

    # (SET fragment, bind name, value) for every updatable column. The jsonb
    # columns are sent as JSON text and cast in SQL.
    candidates = [
        ("status = :status", "status", req.status),
        ("decision_reason = :reason", "reason", req.decision_reason),
        ("decided_by = :decided_by", "decided_by", req.decided_by),
        ("fix_strategy = :fix_strategy", "fix_strategy", req.fix_strategy),
        ("fix_owner = :fix_owner", "fix_owner", req.fix_owner),
        ("fix_target_date = :fix_target", "fix_target", req.fix_target_date),
        ("fix_completed_date = :fix_completed", "fix_completed", req.fix_completed_date),
        (
            "evidence_ids = CAST(:evidence AS jsonb)",
            "evidence",
            None if req.evidence_ids is None else json.dumps(req.evidence_ids),
        ),
        ("confidence = :confidence", "confidence", req.confidence),
        # Previously the request's metadata field was accepted but never
        # written; persist it like the other jsonb column.
        (
            "metadata = CAST(:metadata AS jsonb)",
            "metadata",
            None if req.metadata is None else json.dumps(req.metadata),
        ),
    ]
    chosen = [(fragment, name, value) for fragment, name, value in candidates if value is not None]

    if not chosen:
        raise HTTPException(status_code=400, detail="No fields to update")

    updates = [fragment for fragment, _, _ in chosen]
    params: dict = {"id": trace_uuid}
    params.update({name: value for _, name, value in chosen})

    db = SessionLocal()
    try:
        # Only the fixed fragments above are interpolated; values are bound.
        result = db.execute(text(f"""
            UPDATE decision_traces SET {', '.join(updates)}
            WHERE id = CAST(:id AS uuid)
        """), params)
        db.commit()

        if result.rowcount == 0:
            raise HTTPException(status_code=404, detail="Decision trace not found")

        return {"status": "updated", "id": trace_id}
    finally:
        db.close()
|
|
|
|
|
|
# ── Full Trace Endpoint ──────────────────────────────────────────────
|
|
|
|
|
|
# Separate router because the full-trace endpoint is served under
# /v1/controls rather than /v1/decision-traces.
full_trace_router = APIRouter(prefix="/v1/controls", tags=["decision-traces"])
|
|
|
|
|
|
@full_trace_router.get("/{control_id}/full-trace")
async def get_full_trace(control_id: str):
    """Assemble the complete Decision Trace chain for one control.

    The control is looked up by its ``control_id``. Its UUID is then used to
    collect linked obligations (at most 10), the master control it belongs
    to (if any), and all of its decision traces, newest first.

    Returns: Regulation → Obligation → Control → Master Control → Decision → Evidence

    Raises:
        HTTPException: 404 if no control has the given ``control_id``.
    """
    session = SessionLocal()
    try:
        # 1. Control
        control = session.execute(text("""
            SELECT id, control_id, title, objective, severity,
                   source_citation, source_original_text,
                   verification_method, category,
                   generation_metadata->>'merge_group_hint' AS merge_hint
            FROM canonical_controls
            WHERE control_id = :cid
        """), {"cid": control_id}).fetchone()

        if not control:
            raise HTTPException(status_code=404, detail="Control not found")

        # Every remaining query is keyed on the control's UUID.
        by_uuid = {"uuid": str(control.id)}

        # 2. Regulation (from source_citation)
        # NOTE(review): assumes source_citation arrives as a dict — confirm.
        citation = control.source_citation or {}
        regulation = {
            key: citation.get(key)
            for key in ("source", "article", "paragraph", "source_type", "license")
        }

        # 3. Obligation (from parent links)
        obligation_rows = session.execute(text("""
            SELECT oc.candidate_id, oc.obligation_text, oc.action,
                   oc.object, oc.normative_strength
            FROM obligation_candidates oc
            WHERE oc.parent_control_uuid = CAST(:uuid AS uuid)
            ORDER BY oc.candidate_id
            LIMIT 10
        """), by_uuid).fetchall()

        # 4. Master Control (if member)
        master_row = session.execute(text("""
            SELECT mc.master_control_id, mc.canonical_name, mc.phases_covered
            FROM master_control_members mcm
            JOIN master_controls mc ON mc.id = mcm.master_control_uuid
            WHERE mcm.control_uuid = CAST(:uuid AS uuid)
            LIMIT 1
        """), by_uuid).fetchone()

        # 5. Decision Traces
        decision_rows = session.execute(text("""
            SELECT id, status, decision_reason, decided_by, decided_at,
                   fix_strategy, fix_owner, evidence_ids, confidence
            FROM decision_traces
            WHERE control_uuid = CAST(:uuid AS uuid)
            ORDER BY decided_at DESC NULLS LAST
        """), by_uuid).fetchall()

        control_payload = {
            "id": control.control_id,
            "uuid": str(control.id),
            "title": control.title,
            "objective": control.objective,
            "severity": control.severity,
            "category": control.category,
            "verification_method": control.verification_method,
        }

        # Only the first 500 characters of the source text are returned.
        source_text = control.source_original_text
        excerpt = source_text[:500] if source_text else None

        obligation_payload = [
            {
                "id": ob.candidate_id,
                "text": ob.obligation_text,
                "action": ob.action,
                "object": ob.object,
                "strength": ob.normative_strength,
            }
            for ob in obligation_rows
        ]

        if master_row:
            master_payload = {
                "id": master_row.master_control_id,
                "name": master_row.canonical_name,
                "phases": master_row.phases_covered,
            }
        else:
            master_payload = None

        decision_payload = [
            {
                "id": str(dec.id),
                "status": dec.status,
                "reason": dec.decision_reason,
                "decided_by": dec.decided_by,
                "decided_at": str(dec.decided_at) if dec.decided_at else None,
                "fix_strategy": dec.fix_strategy,
                "fix_owner": dec.fix_owner,
                "evidence_count": len(dec.evidence_ids) if dec.evidence_ids else 0,
                "confidence": float(dec.confidence) if dec.confidence else 0,
            }
            for dec in decision_rows
        ]

        # Rows are ordered newest first, so the first one holds the latest status.
        latest_status = decision_rows[0].status if decision_rows else "not_assessed"

        return {
            "control": control_payload,
            "regulation": regulation,
            "original_text": excerpt,
            "obligations": obligation_payload,
            "master_control": master_payload,
            "decisions": decision_payload,
            "latest_status": latest_status,
        }
    finally:
        session.close()
|