"""Decision Trace API — G1 Compliance Execution Layer.

Tracks compliance decisions per control: who decided, when, why,
what evidence supports it, and what's the remediation plan.
"""

import json
import logging
import uuid
from datetime import datetime
from typing import Any, Optional

from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import text

from db.session import SessionLocal

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/v1/decision-traces", tags=["decision-traces"])

# NOTE(review): every handler below is ``async def`` but drives a synchronous
# session (``db.execute`` is never awaited), so a running query occupies the
# event loop. The signatures are kept as they are to preserve the interface.


# ── Request/Response Models ──────────────────────────────────────────

class CreateDecisionRequest(BaseModel):
    """Request body for ``POST /v1/decision-traces``."""

    control_uuid: str  # cast to ``uuid`` in SQL, so it must be a valid UUID
    regulation_id: Optional[str] = None
    obligation_id: Optional[str] = None
    status: str = "not_assessed"
    decision_reason: Optional[str] = None
    decided_by: Optional[str] = None
    fix_strategy: Optional[str] = None
    fix_owner: Optional[str] = None
    fix_target_date: Optional[str] = None
    evidence_ids: list[str] = []  # stored as jsonb
    confidence: float = 0.0
    tenant_id: Optional[str] = None
    project_id: Optional[str] = None
    metadata: dict = {}  # stored as jsonb


class UpdateDecisionRequest(BaseModel):
    """Request body for ``PUT /v1/decision-traces/{trace_id}``.

    Every field is optional; a field left as ``None`` is not touched.
    """

    status: Optional[str] = None
    decision_reason: Optional[str] = None
    decided_by: Optional[str] = None
    fix_strategy: Optional[str] = None
    fix_owner: Optional[str] = None
    fix_target_date: Optional[str] = None
    fix_completed_date: Optional[str] = None
    evidence_ids: Optional[list[str]] = None
    confidence: Optional[float] = None
    metadata: Optional[dict] = None


# ── Internal helpers ─────────────────────────────────────────────────

# Request attributes of UpdateDecisionRequest that map 1:1 onto a column of
# the same name and are bound as plain values.
_PLAIN_UPDATE_COLUMNS = (
    "status",
    "decision_reason",
    "decided_by",
    "fix_strategy",
    "fix_owner",
    "fix_target_date",
    "fix_completed_date",
    "confidence",
)

# Request attributes that are JSON-encoded and cast to jsonb, mirroring how
# create_decision writes the same columns.
_JSON_UPDATE_COLUMNS = ("evidence_ids", "metadata")

# Maximum number of characters of the source text returned by the full trace.
_ORIGINAL_TEXT_PREVIEW_CHARS = 500


def _require_uuid(value: str, field: str) -> str:
    """Return *value* in canonical UUID form, or reject the request.

    The SQL below casts these values with ``CAST(... AS uuid)``; checking
    them here turns a malformed identifier into a client error instead of a
    failing database statement.

    Raises:
        HTTPException: 422 when *value* cannot be parsed as a UUID.
    """
    try:
        return str(uuid.UUID(value))
    except (ValueError, AttributeError, TypeError):
        raise HTTPException(
            status_code=422, detail=f"Invalid UUID for '{field}'"
        ) from None


def _str_or_none(value: Any) -> Optional[str]:
    """Return ``str(value)``, or ``None`` when *value* is falsy."""
    return str(value) if value else None


def _confidence_or_zero(value: Any) -> float:
    """Return *value* as a float, or ``0`` when it is falsy (e.g. NULL)."""
    return float(value) if value else 0


# ── Endpoints ────────────────────────────────────────────────────────

@router.post("")
async def create_decision(req: CreateDecisionRequest):
    """Record a new compliance decision for a control.

    A fresh UUID is generated for the trace and ``decided_at`` is set to the
    database's ``NOW()``.

    Returns:
        ``{"id": <new trace id>, "status": "created"}``.

    Raises:
        HTTPException: 422 when ``control_uuid`` is not a valid UUID.
    """
    control_uuid = _require_uuid(req.control_uuid, "control_uuid")
    trace_id = str(uuid.uuid4())

    db = SessionLocal()
    try:
        db.execute(
            text("""
                INSERT INTO decision_traces
                    (id, control_uuid, regulation_id, obligation_id,
                     status, decision_reason, decided_by, decided_at,
                     fix_strategy, fix_owner, fix_target_date,
                     evidence_ids, confidence, tenant_id, project_id,
                     metadata)
                VALUES
                    (CAST(:id AS uuid), CAST(:control_uuid AS uuid),
                     :regulation_id, :obligation_id,
                     :status, :decision_reason, :decided_by, NOW(),
                     :fix_strategy, :fix_owner, :fix_target_date,
                     CAST(:evidence_ids AS jsonb), :confidence,
                     :tenant_id, :project_id,
                     CAST(:metadata AS jsonb))
            """),
            {
                "id": trace_id,
                "control_uuid": control_uuid,
                "regulation_id": req.regulation_id,
                "obligation_id": req.obligation_id,
                "status": req.status,
                "decision_reason": req.decision_reason,
                "decided_by": req.decided_by,
                "fix_strategy": req.fix_strategy,
                "fix_owner": req.fix_owner,
                "fix_target_date": req.fix_target_date,
                "evidence_ids": json.dumps(req.evidence_ids),
                "confidence": req.confidence,
                "tenant_id": req.tenant_id,
                "project_id": req.project_id,
                "metadata": json.dumps(req.metadata),
            },
        )
        db.commit()
        return {"id": trace_id, "status": "created"}
    finally:
        db.close()


@router.get("")
async def list_decisions(
    control_uuid: Optional[str] = None,
    status: Optional[str] = None,
    tenant_id: Optional[str] = None,
    limit: int = Query(50, ge=1, le=500),
    offset: int = Query(0, ge=0),
):
    """List decision traces with optional filters.

    Args:
        control_uuid: Only traces for this control (UUID).
        status: Only traces with exactly this status.
        tenant_id: Only traces for this tenant (UUID).
        limit: Page size.
        offset: Number of rows to skip.

    Returns:
        ``{"total": <count matching the filters>, "decisions": [...]}``,
        newest ``decided_at`` first.

    Raises:
        HTTPException: 422 when ``control_uuid`` or ``tenant_id`` is given
            but is not a valid UUID.
    """
    clauses = []
    filter_params: dict = {}
    if control_uuid:
        clauses.append("dt.control_uuid = CAST(:control_uuid AS uuid)")
        filter_params["control_uuid"] = _require_uuid(
            control_uuid, "control_uuid"
        )
    if status:
        clauses.append("dt.status = :status")
        filter_params["status"] = status
    if tenant_id:
        clauses.append("dt.tenant_id = CAST(:tenant_id AS uuid)")
        filter_params["tenant_id"] = _require_uuid(tenant_id, "tenant_id")

    # Only the fixed clause strings above are interpolated into the SQL;
    # all user-supplied values travel as bind parameters.
    where = ("WHERE " + " AND ".join(clauses)) if clauses else ""

    db = SessionLocal()
    try:
        rows = db.execute(
            text(f"""
                SELECT dt.id, dt.control_uuid, cc.control_id, cc.title,
                       dt.status, dt.decision_reason, dt.decided_by,
                       dt.decided_at, dt.fix_strategy, dt.fix_owner,
                       dt.fix_target_date, dt.fix_completed_date,
                       dt.evidence_ids, dt.confidence, dt.regulation_id
                FROM decision_traces dt
                LEFT JOIN canonical_controls cc ON cc.id = dt.control_uuid
                {where}
                ORDER BY dt.decided_at DESC NULLS LAST
                LIMIT :limit OFFSET :offset
            """),
            {**filter_params, "limit": limit, "offset": offset},
        ).fetchall()

        # The count uses the same filters but no paging binds.
        total = db.execute(
            text(f"""
                SELECT count(*)
                FROM decision_traces dt
                {where}
            """),
            filter_params,
        ).scalar()

        return {
            "total": total,
            "decisions": [
                {
                    "id": str(r.id),
                    "control_uuid": str(r.control_uuid),
                    "control_id": r.control_id,
                    "control_title": r.title,
                    "status": r.status,
                    "decision_reason": r.decision_reason,
                    "decided_by": r.decided_by,
                    "decided_at": _str_or_none(r.decided_at),
                    "fix_strategy": r.fix_strategy,
                    "fix_owner": r.fix_owner,
                    "fix_target_date": _str_or_none(r.fix_target_date),
                    "fix_completed_date": _str_or_none(r.fix_completed_date),
                    "evidence_ids": r.evidence_ids,
                    "confidence": _confidence_or_zero(r.confidence),
                    "regulation_id": r.regulation_id,
                }
                for r in rows
            ],
        }
    finally:
        db.close()


@router.get("/stats")
async def decision_stats(tenant_id: Optional[str] = None):
    """Dashboard statistics for compliance decisions.

    Args:
        tenant_id: Restrict the statistics to this tenant (UUID).

    Returns:
        Total number of decisions, the count per status, the percentage of
        decisions with status ``compliant`` (one decimal, ``0`` when there
        are no decisions), and the counts for ``under_remediation`` and
        ``not_assessed``.

    Raises:
        HTTPException: 422 when ``tenant_id`` is given but is not a valid
            UUID.
    """
    tenant_filter = ""
    params: dict = {}
    if tenant_id:
        tenant_filter = "WHERE tenant_id = CAST(:tenant_id AS uuid)"
        params["tenant_id"] = _require_uuid(tenant_id, "tenant_id")

    db = SessionLocal()
    try:
        stats = db.execute(
            text(f"""
                SELECT status, count(*)
                FROM decision_traces
                {tenant_filter}
                GROUP BY status
            """),
            params,
        ).fetchall()

        by_status = {r[0]: r[1] for r in stats}
        total = sum(by_status.values())
        compliance_rate = (
            round(by_status.get("compliant", 0) / total * 100, 1)
            if total > 0
            else 0
        )

        return {
            "total_decisions": total,
            "by_status": by_status,
            "compliance_rate": compliance_rate,
            "pending_remediation": by_status.get("under_remediation", 0),
            "not_assessed": by_status.get("not_assessed", 0),
        }
    finally:
        db.close()


@router.get("/{trace_id}")
async def get_decision(trace_id: str):
    """Get a single decision trace.

    Returns:
        The trace's columns plus ``control_id``, ``control_title`` and
        ``source_citation`` from the joined ``canonical_controls`` row.

    Raises:
        HTTPException: 422 when ``trace_id`` is not a valid UUID;
            404 when no trace has that id.
    """
    trace_uuid = _require_uuid(trace_id, "trace_id")

    db = SessionLocal()
    try:
        row = db.execute(
            text("""
                SELECT dt.*, cc.control_id, cc.title, cc.source_citation
                FROM decision_traces dt
                LEFT JOIN canonical_controls cc ON cc.id = dt.control_uuid
                WHERE dt.id = CAST(:id AS uuid)
            """),
            {"id": trace_uuid},
        ).fetchone()

        if not row:
            raise HTTPException(
                status_code=404, detail="Decision trace not found"
            )

        return {
            "id": str(row.id),
            "control_uuid": str(row.control_uuid),
            "control_id": row.control_id,
            "control_title": row.title,
            "regulation_id": row.regulation_id,
            "obligation_id": row.obligation_id,
            "status": row.status,
            "decision_reason": row.decision_reason,
            "decided_by": row.decided_by,
            "decided_at": _str_or_none(row.decided_at),
            "fix_strategy": row.fix_strategy,
            "fix_owner": row.fix_owner,
            "fix_target_date": _str_or_none(row.fix_target_date),
            "fix_completed_date": _str_or_none(row.fix_completed_date),
            "evidence_ids": row.evidence_ids,
            "confidence": _confidence_or_zero(row.confidence),
            "source_citation": row.source_citation,
            "metadata": row.metadata,
        }
    finally:
        db.close()


@router.put("/{trace_id}")
async def update_decision(trace_id: str, req: UpdateDecisionRequest):
    """Update a decision trace (status, fix progress, evidence, metadata).

    Only fields that are not ``None`` in the request are written.

    Returns:
        ``{"status": "updated", "id": trace_id}``.

    Raises:
        HTTPException: 400 when the request contains no field to update;
            422 when ``trace_id`` is not a valid UUID;
            404 when no trace has that id.
    """
    assignments = []
    params: dict = {}

    for column in _PLAIN_UPDATE_COLUMNS:
        value = getattr(req, column)
        if value is not None:
            assignments.append(f"{column} = :{column}")
            params[column] = value

    # evidence_ids and metadata are jsonb columns. metadata is part of the
    # request model and was previously never written.
    for column in _JSON_UPDATE_COLUMNS:
        value = getattr(req, column)
        if value is not None:
            assignments.append(f"{column} = CAST(:{column} AS jsonb)")
            params[column] = json.dumps(value)

    if not assignments:
        raise HTTPException(status_code=400, detail="No fields to update")

    params["id"] = _require_uuid(trace_id, "trace_id")

    db = SessionLocal()
    try:
        # Column names come from the module-level tuples above, never from
        # the request, so interpolating them is safe.
        result = db.execute(
            text(f"""
                UPDATE decision_traces
                SET {', '.join(assignments)}
                WHERE id = CAST(:id AS uuid)
            """),
            params,
        )
        db.commit()

        if result.rowcount == 0:
            raise HTTPException(
                status_code=404, detail="Decision trace not found"
            )

        return {"status": "updated", "id": trace_id}
    finally:
        db.close()


# ── Full Trace Endpoint ──────────────────────────────────────────────

full_trace_router = APIRouter(prefix="/v1/controls", tags=["decision-traces"])


@full_trace_router.get("/{control_id}/full-trace")
async def get_full_trace(control_id: str):
    """Get the complete Decision Trace chain for a control.

    Returns:
        Regulation → Obligation → Control → Master Control → Decision → Evidence

        ``latest_status`` is the status of the most recently decided trace,
        or ``"not_assessed"`` when the control has no traces.

    Raises:
        HTTPException: 404 when no control has the given ``control_id``.
    """
    db = SessionLocal()
    try:
        # 1. Control (looked up by its control_id, not its UUID)
        ctrl = db.execute(
            text("""
                SELECT id, control_id, title, objective, severity,
                       source_citation, source_original_text,
                       verification_method, category,
                       generation_metadata->>'merge_group_hint' AS merge_hint
                FROM canonical_controls
                WHERE control_id = :cid
            """),
            {"cid": control_id},
        ).fetchone()

        if not ctrl:
            raise HTTPException(status_code=404, detail="Control not found")

        by_control_uuid = {"uuid": str(ctrl.id)}

        # 2. Regulation (from source_citation)
        citation = ctrl.source_citation or {}
        regulation = {
            "source": citation.get("source"),
            "article": citation.get("article"),
            "paragraph": citation.get("paragraph"),
            "source_type": citation.get("source_type"),
            "license": citation.get("license"),
        }

        # 3. Obligation (from parent links), at most 10
        obligations = db.execute(
            text("""
                SELECT oc.candidate_id, oc.obligation_text, oc.action,
                       oc.object, oc.normative_strength
                FROM obligation_candidates oc
                WHERE oc.parent_control_uuid = CAST(:uuid AS uuid)
                ORDER BY oc.candidate_id
                LIMIT 10
            """),
            by_control_uuid,
        ).fetchall()

        # 4. Master Control (if member)
        master = db.execute(
            text("""
                SELECT mc.master_control_id, mc.canonical_name,
                       mc.phases_covered
                FROM master_control_members mcm
                JOIN master_controls mc ON mc.id = mcm.master_control_uuid
                WHERE mcm.control_uuid = CAST(:uuid AS uuid)
                LIMIT 1
            """),
            by_control_uuid,
        ).fetchone()

        # 5. Decision Traces, newest first
        decisions = db.execute(
            text("""
                SELECT id, status, decision_reason, decided_by, decided_at,
                       fix_strategy, fix_owner, evidence_ids, confidence
                FROM decision_traces
                WHERE control_uuid = CAST(:uuid AS uuid)
                ORDER BY decided_at DESC NULLS LAST
            """),
            by_control_uuid,
        ).fetchall()

        original_text = (
            ctrl.source_original_text[:_ORIGINAL_TEXT_PREVIEW_CHARS]
            if ctrl.source_original_text
            else None
        )
        master_control = (
            {
                "id": master.master_control_id,
                "name": master.canonical_name,
                "phases": master.phases_covered,
            }
            if master
            else None
        )

        return {
            "control": {
                "id": ctrl.control_id,
                "uuid": str(ctrl.id),
                "title": ctrl.title,
                "objective": ctrl.objective,
                "severity": ctrl.severity,
                "category": ctrl.category,
                "verification_method": ctrl.verification_method,
            },
            "regulation": regulation,
            "original_text": original_text,
            "obligations": [
                {
                    "id": o.candidate_id,
                    "text": o.obligation_text,
                    "action": o.action,
                    "object": o.object,
                    "strength": o.normative_strength,
                }
                for o in obligations
            ],
            "master_control": master_control,
            "decisions": [
                {
                    "id": str(d.id),
                    "status": d.status,
                    "reason": d.decision_reason,
                    "decided_by": d.decided_by,
                    "decided_at": _str_or_none(d.decided_at),
                    "fix_strategy": d.fix_strategy,
                    "fix_owner": d.fix_owner,
                    "evidence_count": (
                        len(d.evidence_ids) if d.evidence_ids else 0
                    ),
                    "confidence": _confidence_or_zero(d.confidence),
                }
                for d in decisions
            ],
            "latest_status": (
                decisions[0].status if decisions else "not_assessed"
            ),
        }
    finally:
        db.close()