From 66a70ab31c1d0b35d30ae6655124ba96ff9b680c Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Wed, 6 May 2026 18:26:21 +0200 Subject: [PATCH] =?UTF-8?q?feat(pipeline):=20G1=20Decision=20Trace=20?= =?UTF-8?q?=E2=80=94=20compliance=20decision=20tracking?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New table: decision_traces (status, reason, evidence, fix plan per control) New API: POST/GET/PUT /v1/decision-traces (CRUD for decisions) GET /v1/decision-traces/stats (compliance dashboard) GET /v1/controls/{id}/full-trace (Regulation→Obligation→Control→Decision→Evidence) 454 tests pass, 0 regressions. Co-Authored-By: Claude Opus 4.6 (1M context) --- control-pipeline/api/__init__.py | 4 + control-pipeline/api/decision_trace_routes.py | 404 ++++++++++++++++++ .../migrations/006_decision_traces.sql | 58 +++ 3 files changed, 466 insertions(+) create mode 100644 control-pipeline/api/decision_trace_routes.py create mode 100644 control-pipeline/migrations/006_decision_traces.sql diff --git a/control-pipeline/api/__init__.py b/control-pipeline/api/__init__.py index c3087c9..a4c2ec6 100644 --- a/control-pipeline/api/__init__.py +++ b/control-pipeline/api/__init__.py @@ -5,6 +5,8 @@ from api.canonical_control_routes import router as canonical_router from api.document_compliance_routes import router as document_router from api.dependency_routes import router as dependency_router from api.master_control_routes import router as master_control_router +from api.decision_trace_routes import router as decision_trace_router +from api.decision_trace_routes import full_trace_router router = APIRouter() router.include_router(generator_router) @@ -12,3 +14,5 @@ router.include_router(canonical_router) router.include_router(document_router) router.include_router(dependency_router) router.include_router(master_control_router) +router.include_router(decision_trace_router) +router.include_router(full_trace_router) diff --git a/control-pipeline/api/decision_trace_routes.py b/control-pipeline/api/decision_trace_routes.py new file mode 100644 index 0000000..48594cd --- /dev/null +++ b/control-pipeline/api/decision_trace_routes.py @@ -0,0 +1,404 @@ +"""Decision Trace API — G1 Compliance Execution Layer. + +Tracks compliance decisions per control: who decided, when, why, +what evidence supports it, and what's the remediation plan. +""" + +import json +import logging +import uuid +from datetime import datetime +from typing import Optional + +from fastapi import APIRouter, HTTPException, Query +from pydantic import BaseModel +from sqlalchemy import text + +from db.session import SessionLocal + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/v1/decision-traces", tags=["decision-traces"]) + + +# ── Request/Response Models ────────────────────────────────────────── + + +class CreateDecisionRequest(BaseModel): + control_uuid: str + regulation_id: Optional[str] = None + obligation_id: Optional[str] = None + status: str = "not_assessed" + decision_reason: Optional[str] = None + decided_by: Optional[str] = None + fix_strategy: Optional[str] = None + fix_owner: Optional[str] = None + fix_target_date: Optional[str] = None + evidence_ids: list[str] = [] + confidence: float = 0.0 + tenant_id: Optional[str] = None + project_id: Optional[str] = None + metadata: dict = {} + + +class UpdateDecisionRequest(BaseModel): + status: Optional[str] = None + decision_reason: Optional[str] = None + decided_by: Optional[str] = None + fix_strategy: Optional[str] = None + fix_owner: Optional[str] = None + fix_target_date: Optional[str] = None + fix_completed_date: Optional[str] = None + evidence_ids: Optional[list[str]] = None + confidence: Optional[float] = None + metadata: Optional[dict] = None + + +# ── Endpoints ──────────────────────────────────────────────────────── + + +@router.post("") +async def create_decision(req: CreateDecisionRequest): + """Record a new compliance decision for a control.""" + db = SessionLocal() + try: + trace_id = str(uuid.uuid4()) + db.execute(text(""" + INSERT INTO decision_traces + (id, control_uuid, regulation_id, obligation_id, + status, decision_reason, decided_by, decided_at, + fix_strategy, fix_owner, fix_target_date, + evidence_ids, confidence, tenant_id, project_id, metadata) + VALUES + (CAST(:id AS uuid), CAST(:control_uuid AS uuid), :regulation_id, :obligation_id, + :status, :decision_reason, :decided_by, NOW(), + :fix_strategy, :fix_owner, :fix_target_date, + CAST(:evidence_ids AS jsonb), :confidence, + :tenant_id, :project_id, CAST(:metadata AS jsonb)) + """), { + "id": trace_id, + "control_uuid": req.control_uuid, + "regulation_id": req.regulation_id, + "obligation_id": req.obligation_id, + "status": req.status, + "decision_reason": req.decision_reason, + "decided_by": req.decided_by, + "fix_strategy": req.fix_strategy, + "fix_owner": req.fix_owner, + "fix_target_date": req.fix_target_date, + "evidence_ids": json.dumps(req.evidence_ids), + "confidence": req.confidence, + "tenant_id": req.tenant_id, + "project_id": req.project_id, + "metadata": json.dumps(req.metadata), + }) + db.commit() + return {"id": trace_id, "status": "created"} + finally: + db.close() + + +@router.get("") +async def list_decisions( + control_uuid: Optional[str] = None, + status: Optional[str] = None, + tenant_id: Optional[str] = None, + limit: int = Query(50, ge=1, le=500), + offset: int = Query(0, ge=0), +): + """List decision traces with optional filters.""" + db = SessionLocal() + try: + clauses = [] + params: dict = {"limit": limit, "offset": offset} + + if control_uuid: + clauses.append("dt.control_uuid = CAST(:control_uuid AS uuid)") + params["control_uuid"] = control_uuid + if status: + clauses.append("dt.status = :status") + params["status"] = status + if tenant_id: + clauses.append("dt.tenant_id = CAST(:tenant_id AS uuid)") + params["tenant_id"] = tenant_id + + where = "WHERE " + " AND ".join(clauses) if clauses else "" + + rows = db.execute(text(f""" + SELECT dt.id, dt.control_uuid, cc.control_id, cc.title, + dt.status, dt.decision_reason, dt.decided_by, dt.decided_at, + dt.fix_strategy, dt.fix_owner, dt.fix_target_date, dt.fix_completed_date, + dt.evidence_ids, dt.confidence, dt.regulation_id + FROM decision_traces dt + LEFT JOIN canonical_controls cc ON cc.id = dt.control_uuid + {where} + ORDER BY dt.decided_at DESC NULLS LAST + LIMIT :limit OFFSET :offset + """), params).fetchall() + + total = db.execute(text(f""" + SELECT count(*) FROM decision_traces dt {where} + """), params).scalar() + + return { + "total": total, + "decisions": [ + { + "id": str(r[0]), + "control_uuid": str(r[1]), + "control_id": r[2], + "control_title": r[3], + "status": r[4], + "decision_reason": r[5], + "decided_by": r[6], + "decided_at": str(r[7]) if r[7] else None, + "fix_strategy": r[8], + "fix_owner": r[9], + "fix_target_date": str(r[10]) if r[10] else None, + "fix_completed_date": str(r[11]) if r[11] else None, + "evidence_ids": r[12], + "confidence": float(r[13]) if r[13] else 0, + "regulation_id": r[14], + } + for r in rows + ], + } + finally: + db.close() + + +@router.get("/stats") +async def decision_stats(tenant_id: Optional[str] = None): + """Dashboard statistics for compliance decisions.""" + db = SessionLocal() + try: + tenant_filter = "" + params: dict = {} + if tenant_id: + tenant_filter = "WHERE tenant_id = CAST(:tenant_id AS uuid)" + params["tenant_id"] = tenant_id + + stats = db.execute(text(f""" + SELECT status, count(*) FROM decision_traces + {tenant_filter} + GROUP BY status + """), params).fetchall() + + total = sum(r[1] for r in stats) + by_status = {r[0]: r[1] for r in stats} + + return { + "total_decisions": total, + "by_status": by_status, + "compliance_rate": round( + by_status.get("compliant", 0) / total * 100, 1 + ) if total > 0 else 0, + "pending_remediation": by_status.get("under_remediation", 0), + "not_assessed": by_status.get("not_assessed", 0), + } + finally: + db.close() + + +@router.get("/{trace_id}") +async def get_decision(trace_id: str): + """Get a single decision trace.""" + db = SessionLocal() + try: + row = db.execute(text(""" + SELECT dt.*, cc.control_id, cc.title, cc.source_citation + FROM decision_traces dt + LEFT JOIN canonical_controls cc ON cc.id = dt.control_uuid + WHERE dt.id = CAST(:id AS uuid) + """), {"id": trace_id}).fetchone() + + if not row: + raise HTTPException(status_code=404, detail="Decision trace not found") + + return { + "id": str(row.id), + "control_uuid": str(row.control_uuid), + "control_id": row.control_id, + "control_title": row.title, + "regulation_id": row.regulation_id, + "obligation_id": row.obligation_id, + "status": row.status, + "decision_reason": row.decision_reason, + "decided_by": row.decided_by, + "decided_at": str(row.decided_at) if row.decided_at else None, + "fix_strategy": row.fix_strategy, + "fix_owner": row.fix_owner, + "fix_target_date": str(row.fix_target_date) if row.fix_target_date else None, + "fix_completed_date": str(row.fix_completed_date) if row.fix_completed_date else None, + "evidence_ids": row.evidence_ids, + "confidence": float(row.confidence) if row.confidence else 0, + "source_citation": row.source_citation, + "metadata": row.metadata, + } + finally: + db.close() + + +@router.put("/{trace_id}") +async def update_decision(trace_id: str, req: UpdateDecisionRequest): + """Update a decision trace (status, fix progress, evidence).""" + db = SessionLocal() + try: + updates = [] + params: dict = {"id": trace_id} + + if req.status is not None: + updates.append("status = :status") + params["status"] = req.status + if req.decision_reason is not None: + updates.append("decision_reason = :reason") + params["reason"] = req.decision_reason + if req.decided_by is not None: + updates.append("decided_by = :decided_by") + params["decided_by"] = req.decided_by + if req.fix_strategy is not None: + updates.append("fix_strategy = :fix_strategy") + params["fix_strategy"] = req.fix_strategy + if req.fix_owner is not None: + updates.append("fix_owner = :fix_owner") + params["fix_owner"] = req.fix_owner + if req.fix_target_date is not None: + updates.append("fix_target_date = :fix_target") + params["fix_target"] = req.fix_target_date + if req.fix_completed_date is not None: + updates.append("fix_completed_date = :fix_completed") + params["fix_completed"] = req.fix_completed_date + if req.evidence_ids is not None: + updates.append("evidence_ids = CAST(:evidence AS jsonb)") + params["evidence"] = json.dumps(req.evidence_ids) + if req.confidence is not None: + updates.append("confidence = :confidence") + params["confidence"] = req.confidence + + if not updates: + raise HTTPException(status_code=400, detail="No fields to update") + + result = db.execute(text(f""" + UPDATE decision_traces SET {', '.join(updates)} + WHERE id = CAST(:id AS uuid) + """), params) + db.commit() + + if result.rowcount == 0: + raise HTTPException(status_code=404, detail="Decision trace not found") + + return {"status": "updated", "id": trace_id} + finally: + db.close() + + +# ── Full Trace Endpoint ────────────────────────────────────────────── + + +full_trace_router = APIRouter(prefix="/v1/controls", tags=["decision-traces"]) + + +@full_trace_router.get("/{control_id}/full-trace") +async def get_full_trace(control_id: str): + """Get the complete Decision Trace chain for a control. + + Returns: Regulation → Obligation → Control → Master Control → Decision → Evidence + """ + db = SessionLocal() + try: + # 1. Control + ctrl = db.execute(text(""" + SELECT id, control_id, title, objective, severity, + source_citation, source_original_text, + verification_method, category, + generation_metadata->>'merge_group_hint' AS merge_hint + FROM canonical_controls + WHERE control_id = :cid + """), {"cid": control_id}).fetchone() + + if not ctrl: + raise HTTPException(status_code=404, detail="Control not found") + + # 2. Regulation (from source_citation) + citation = ctrl.source_citation or {} + regulation = { + "source": citation.get("source"), + "article": citation.get("article"), + "paragraph": citation.get("paragraph"), + "source_type": citation.get("source_type"), + "license": citation.get("license"), + } + + # 3. Obligation (from parent links) + obligations = db.execute(text(""" + SELECT oc.candidate_id, oc.obligation_text, oc.action, + oc.object, oc.normative_strength + FROM obligation_candidates oc + WHERE oc.parent_control_uuid = CAST(:uuid AS uuid) + ORDER BY oc.candidate_id + LIMIT 10 + """), {"uuid": str(ctrl.id)}).fetchall() + + # 4. Master Control (if member) + master = db.execute(text(""" + SELECT mc.master_control_id, mc.canonical_name, mc.phases_covered + FROM master_control_members mcm + JOIN master_controls mc ON mc.id = mcm.master_control_uuid + WHERE mcm.control_uuid = CAST(:uuid AS uuid) + LIMIT 1 + """), {"uuid": str(ctrl.id)}).fetchone() + + # 5. Decision Traces + decisions = db.execute(text(""" + SELECT id, status, decision_reason, decided_by, decided_at, + fix_strategy, fix_owner, evidence_ids, confidence + FROM decision_traces + WHERE control_uuid = CAST(:uuid AS uuid) + ORDER BY decided_at DESC NULLS LAST + """), {"uuid": str(ctrl.id)}).fetchall() + + return { + "control": { + "id": ctrl.control_id, + "uuid": str(ctrl.id), + "title": ctrl.title, + "objective": ctrl.objective, + "severity": ctrl.severity, + "category": ctrl.category, + "verification_method": ctrl.verification_method, + }, + "regulation": regulation, + "original_text": ctrl.source_original_text[:500] if ctrl.source_original_text else None, + "obligations": [ + { + "id": o.candidate_id, + "text": o.obligation_text, + "action": o.action, + "object": o.object, + "strength": o.normative_strength, + } + for o in obligations + ], + "master_control": { + "id": master.master_control_id, + "name": master.canonical_name, + "phases": master.phases_covered, + } if master else None, + "decisions": [ + { + "id": str(d.id), + "status": d.status, + "reason": d.decision_reason, + "decided_by": d.decided_by, + "decided_at": str(d.decided_at) if d.decided_at else None, + "fix_strategy": d.fix_strategy, + "fix_owner": d.fix_owner, + "evidence_count": len(d.evidence_ids) if d.evidence_ids else 0, + "confidence": float(d.confidence) if d.confidence else 0, + } + for d in decisions + ], + "latest_status": decisions[0].status if decisions else "not_assessed", + } + finally: + db.close() diff --git a/control-pipeline/migrations/006_decision_traces.sql b/control-pipeline/migrations/006_decision_traces.sql new file mode 100644 index 0000000..cae896d --- /dev/null +++ b/control-pipeline/migrations/006_decision_traces.sql @@ -0,0 +1,58 @@ +-- Migration 006: Decision Traces (G1) +-- Schema: compliance +-- Run: ssh macmini "docker exec -i bp-core-postgres psql -U breakpilot -d breakpilot_db" < control-pipeline/migrations/006_decision_traces.sql + +SET search_path TO compliance, public; + +CREATE TABLE IF NOT EXISTS decision_traces ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + control_uuid UUID NOT NULL, + regulation_id VARCHAR(100), + obligation_id VARCHAR(100), + + -- Decision + status VARCHAR(30) NOT NULL DEFAULT 'not_assessed' + CHECK (status IN ('not_assessed', 'compliant', 'partially_compliant', + 'not_compliant', 'not_applicable', 'under_remediation')), + decision_reason TEXT, + decided_by VARCHAR(100), + decided_at TIMESTAMPTZ, + + -- Fix/Remediation + fix_strategy TEXT, + fix_owner VARCHAR(100), + fix_target_date DATE, + fix_completed_date DATE, + + -- Evidence + evidence_ids JSONB DEFAULT '[]', + confidence NUMERIC(3,2) DEFAULT 0.0, + + -- Multi-tenant + tenant_id UUID, + project_id UUID, + metadata JSONB DEFAULT '{}', + + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_dt_control ON decision_traces(control_uuid); +CREATE INDEX IF NOT EXISTS idx_dt_status ON decision_traces(status); +CREATE INDEX IF NOT EXISTS idx_dt_tenant ON decision_traces(tenant_id); +CREATE INDEX IF NOT EXISTS idx_dt_decided_at ON decision_traces(decided_at); + +-- Updated-at trigger +CREATE OR REPLACE FUNCTION update_decision_traces_updated_at() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = NOW(); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS trg_decision_traces_updated_at ON decision_traces; +CREATE TRIGGER trg_decision_traces_updated_at + BEFORE UPDATE ON decision_traces + FOR EACH ROW + EXECUTE FUNCTION update_decision_traces_updated_at();