feat(pipeline): G3 Full Decision Memory — compliance lifecycle event stream

New table: decision_events (assessment→decision→fix→verification→failure cycle)
New API:
  POST /v1/decision-events (record lifecycle event)
  GET /v1/decision-events (list with filters)
  GET /v1/decision-events/timeline/{control_id} (full chronological timeline)
  GET /v1/decision-events/stats (failure rate, cycle times)

Each event captures input_state, output_state, actor, evidence.
454 tests pass, 0 regressions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-06 20:16:25 +02:00
parent e82f99b8cb
commit c398e74d5e
3 changed files with 263 additions and 0 deletions
+2
View File
@@ -8,6 +8,7 @@ from api.master_control_routes import router as master_control_router
from api.decision_trace_routes import router as decision_trace_router
from api.decision_trace_routes import full_trace_router
from api.compliance_commit_routes import router as compliance_commit_router
from api.decision_event_routes import router as decision_event_router
router = APIRouter()
router.include_router(generator_router)
@@ -18,3 +19,4 @@ router.include_router(master_control_router)
router.include_router(decision_trace_router)
router.include_router(full_trace_router)
router.include_router(compliance_commit_router)
router.include_router(decision_event_router)
@@ -0,0 +1,224 @@
"""Decision Events API — G3 Full Decision Memory.
Event-stream for each control's compliance lifecycle:
assessment → decision → fix → verification → (failure → new cycle)
"""
import json
import logging
import uuid
from typing import Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import text
from db.session import SessionLocal
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/v1/decision-events", tags=["decision-events"])
class CreateEventRequest(BaseModel):
control_uuid: str
decision_trace_id: Optional[str] = None
tenant_id: Optional[str] = None
event_type: str
input_state: dict = {}
output_state: dict = {}
summary: Optional[str] = None
actor: Optional[str] = None
evidence_ids: list[str] = []
metadata: dict = {}
@router.post("")
async def create_event(req: CreateEventRequest):
"""Record a decision event in the compliance lifecycle."""
db = SessionLocal()
try:
eid = str(uuid.uuid4())
db.execute(text("""
INSERT INTO decision_events
(id, decision_trace_id, control_uuid, tenant_id,
event_type, input_state, output_state,
summary, actor, evidence_ids, metadata)
VALUES
(CAST(:id AS uuid),
CASE WHEN :trace_id IS NOT NULL THEN CAST(:trace_id AS uuid) ELSE NULL END,
CAST(:control_uuid AS uuid),
CASE WHEN :tenant_id IS NOT NULL THEN CAST(:tenant_id AS uuid) ELSE NULL END,
:event_type, CAST(:input AS jsonb), CAST(:output AS jsonb),
:summary, :actor, CAST(:evidence AS jsonb), CAST(:meta AS jsonb))
"""), {
"id": eid,
"trace_id": req.decision_trace_id,
"control_uuid": req.control_uuid,
"tenant_id": req.tenant_id,
"event_type": req.event_type,
"input": json.dumps(req.input_state),
"output": json.dumps(req.output_state),
"summary": req.summary,
"actor": req.actor,
"evidence": json.dumps(req.evidence_ids),
"meta": json.dumps(req.metadata),
})
db.commit()
return {"id": eid, "event_type": req.event_type, "status": "recorded"}
finally:
db.close()
@router.get("")
async def list_events(
control_uuid: Optional[str] = None,
tenant_id: Optional[str] = None,
event_type: Optional[str] = None,
limit: int = Query(100, ge=1, le=1000),
offset: int = Query(0, ge=0),
):
"""List decision events with filters."""
db = SessionLocal()
try:
clauses = []
params: dict = {"limit": limit, "offset": offset}
if control_uuid:
clauses.append("de.control_uuid = CAST(:cuuid AS uuid)")
params["cuuid"] = control_uuid
if tenant_id:
clauses.append("de.tenant_id = CAST(:tid AS uuid)")
params["tid"] = tenant_id
if event_type:
clauses.append("de.event_type = :etype")
params["etype"] = event_type
where = "WHERE " + " AND ".join(clauses) if clauses else ""
rows = db.execute(text(f"""
SELECT de.id, de.control_uuid, cc.control_id,
de.event_type, de.summary, de.actor,
de.input_state, de.output_state,
de.evidence_ids, de.created_at
FROM decision_events de
LEFT JOIN canonical_controls cc ON cc.id = de.control_uuid
{where}
ORDER BY de.created_at DESC
LIMIT :limit OFFSET :offset
"""), params).fetchall()
return {
"total": len(rows),
"events": [
{
"id": str(r[0]),
"control_uuid": str(r[1]),
"control_id": r[2],
"event_type": r[3],
"summary": r[4],
"actor": r[5],
"input_state": r[6],
"output_state": r[7],
"evidence_ids": r[8],
"created_at": str(r[9]),
}
for r in rows
],
}
finally:
db.close()
@router.get("/stats")
async def event_stats(tenant_id: Optional[str] = None):
"""Lifecycle statistics: cycle times, failure rates."""
db = SessionLocal()
try:
tf = ""
params: dict = {}
if tenant_id:
tf = "WHERE tenant_id = CAST(:tid AS uuid)"
params["tid"] = tenant_id
by_type = db.execute(text(f"""
SELECT event_type, count(*) FROM decision_events {tf}
GROUP BY event_type ORDER BY count(*) DESC
"""), params).fetchall()
total = sum(r[1] for r in by_type)
failures = next((r[1] for r in by_type if r[0] == "failure"), 0)
verifications = next((r[1] for r in by_type if r[0] == "verification"), 0)
return {
"total_events": total,
"by_event_type": {r[0]: r[1] for r in by_type},
"failure_rate": round(failures / total * 100, 1) if total > 0 else 0,
"verification_rate": round(verifications / total * 100, 1) if total > 0 else 0,
}
finally:
db.close()
@router.get("/timeline/{control_id}")
async def get_timeline(control_id: str):
"""Full chronological timeline for a control's compliance lifecycle."""
db = SessionLocal()
try:
# Resolve control_id to UUID
ctrl = db.execute(text("""
SELECT id, control_id, title FROM canonical_controls
WHERE control_id = :cid
"""), {"cid": control_id}).fetchone()
if not ctrl:
raise HTTPException(status_code=404, detail="Control not found")
events = db.execute(text("""
SELECT id, event_type, summary, actor,
input_state, output_state, evidence_ids, created_at
FROM decision_events
WHERE control_uuid = CAST(:uuid AS uuid)
ORDER BY created_at ASC
"""), {"uuid": str(ctrl[0])}).fetchall()
# Determine current state from latest event
current_state = "unknown"
if events:
last = events[-1]
output = last[5] or {}
current_state = output.get("status", last[1])
# Calculate avg fix time (assessment → fix_completed)
fix_times = []
assessment_at = None
for e in events:
if e[1] == "assessment":
assessment_at = e[7]
elif e[1] == "fix_completed" and assessment_at:
delta = (e[7] - assessment_at).total_seconds() / 3600
fix_times.append(delta)
assessment_at = None
return {
"control_id": ctrl[1],
"control_title": ctrl[2],
"current_state": current_state,
"total_events": len(events),
"time_to_fix_avg_hours": round(sum(fix_times) / len(fix_times), 1) if fix_times else None,
"events": [
{
"id": str(e[0]),
"type": e[1],
"summary": e[2],
"actor": e[3],
"input_state": e[4],
"output_state": e[5],
"evidence_count": len(e[6]) if e[6] else 0,
"at": str(e[7]),
}
for e in events
],
}
finally:
db.close()
@@ -0,0 +1,37 @@
-- Migration 008: Decision Events / Full Decision Memory (G3)
-- Schema: compliance
-- Run: ssh macmini "docker exec -i bp-core-postgres psql -U breakpilot -d breakpilot_db" < control-pipeline/migrations/008_decision_events.sql
SET search_path TO compliance, public;
CREATE TABLE IF NOT EXISTS decision_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
decision_trace_id UUID REFERENCES decision_traces(id) ON DELETE SET NULL,
control_uuid UUID NOT NULL,
tenant_id UUID,
-- Event type
event_type VARCHAR(30) NOT NULL
CHECK (event_type IN (
'assessment', 'decision', 'fix_planned', 'fix_started',
'fix_completed', 'verification', 'failure', 'exception', 'escalation'
)),
-- State before/after
input_state JSONB DEFAULT '{}',
output_state JSONB DEFAULT '{}',
-- Details
summary TEXT,
actor VARCHAR(200),
evidence_ids JSONB DEFAULT '[]',
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_de_control ON decision_events(control_uuid);
CREATE INDEX IF NOT EXISTS idx_de_trace ON decision_events(decision_trace_id);
CREATE INDEX IF NOT EXISTS idx_de_tenant ON decision_events(tenant_id);
CREATE INDEX IF NOT EXISTS idx_de_type ON decision_events(event_type);
CREATE INDEX IF NOT EXISTS idx_de_created ON decision_events(created_at);