Files
Benjamin Admin c398e74d5e feat(pipeline): G3 Full Decision Memory — compliance lifecycle event stream
New table: decision_events (assessment→decision→fix→verification→failure cycle)
New API:
  POST /v1/decision-events (record lifecycle event)
  GET /v1/decision-events (list with filters)
  GET /v1/decision-events/timeline/{control_id} (full chronological timeline)
  GET /v1/decision-events/stats (failure rate, cycle times)

Each event captures input_state, output_state, actor, evidence.
454 tests pass, 0 regressions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-06 20:16:25 +02:00

225 lines
7.4 KiB
Python

"""Decision Events API — G3 Full Decision Memory.
Event-stream for each control's compliance lifecycle:
assessment → decision → fix → verification → (failure → new cycle)
"""
import json
import logging
import uuid
from typing import Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import text
from db.session import SessionLocal
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/v1/decision-events", tags=["decision-events"])
class CreateEventRequest(BaseModel):
control_uuid: str
decision_trace_id: Optional[str] = None
tenant_id: Optional[str] = None
event_type: str
input_state: dict = {}
output_state: dict = {}
summary: Optional[str] = None
actor: Optional[str] = None
evidence_ids: list[str] = []
metadata: dict = {}
@router.post("")
async def create_event(req: CreateEventRequest):
"""Record a decision event in the compliance lifecycle."""
db = SessionLocal()
try:
eid = str(uuid.uuid4())
db.execute(text("""
INSERT INTO decision_events
(id, decision_trace_id, control_uuid, tenant_id,
event_type, input_state, output_state,
summary, actor, evidence_ids, metadata)
VALUES
(CAST(:id AS uuid),
CASE WHEN :trace_id IS NOT NULL THEN CAST(:trace_id AS uuid) ELSE NULL END,
CAST(:control_uuid AS uuid),
CASE WHEN :tenant_id IS NOT NULL THEN CAST(:tenant_id AS uuid) ELSE NULL END,
:event_type, CAST(:input AS jsonb), CAST(:output AS jsonb),
:summary, :actor, CAST(:evidence AS jsonb), CAST(:meta AS jsonb))
"""), {
"id": eid,
"trace_id": req.decision_trace_id,
"control_uuid": req.control_uuid,
"tenant_id": req.tenant_id,
"event_type": req.event_type,
"input": json.dumps(req.input_state),
"output": json.dumps(req.output_state),
"summary": req.summary,
"actor": req.actor,
"evidence": json.dumps(req.evidence_ids),
"meta": json.dumps(req.metadata),
})
db.commit()
return {"id": eid, "event_type": req.event_type, "status": "recorded"}
finally:
db.close()
@router.get("")
async def list_events(
control_uuid: Optional[str] = None,
tenant_id: Optional[str] = None,
event_type: Optional[str] = None,
limit: int = Query(100, ge=1, le=1000),
offset: int = Query(0, ge=0),
):
"""List decision events with filters."""
db = SessionLocal()
try:
clauses = []
params: dict = {"limit": limit, "offset": offset}
if control_uuid:
clauses.append("de.control_uuid = CAST(:cuuid AS uuid)")
params["cuuid"] = control_uuid
if tenant_id:
clauses.append("de.tenant_id = CAST(:tid AS uuid)")
params["tid"] = tenant_id
if event_type:
clauses.append("de.event_type = :etype")
params["etype"] = event_type
where = "WHERE " + " AND ".join(clauses) if clauses else ""
rows = db.execute(text(f"""
SELECT de.id, de.control_uuid, cc.control_id,
de.event_type, de.summary, de.actor,
de.input_state, de.output_state,
de.evidence_ids, de.created_at
FROM decision_events de
LEFT JOIN canonical_controls cc ON cc.id = de.control_uuid
{where}
ORDER BY de.created_at DESC
LIMIT :limit OFFSET :offset
"""), params).fetchall()
return {
"total": len(rows),
"events": [
{
"id": str(r[0]),
"control_uuid": str(r[1]),
"control_id": r[2],
"event_type": r[3],
"summary": r[4],
"actor": r[5],
"input_state": r[6],
"output_state": r[7],
"evidence_ids": r[8],
"created_at": str(r[9]),
}
for r in rows
],
}
finally:
db.close()
@router.get("/stats")
async def event_stats(tenant_id: Optional[str] = None):
"""Lifecycle statistics: cycle times, failure rates."""
db = SessionLocal()
try:
tf = ""
params: dict = {}
if tenant_id:
tf = "WHERE tenant_id = CAST(:tid AS uuid)"
params["tid"] = tenant_id
by_type = db.execute(text(f"""
SELECT event_type, count(*) FROM decision_events {tf}
GROUP BY event_type ORDER BY count(*) DESC
"""), params).fetchall()
total = sum(r[1] for r in by_type)
failures = next((r[1] for r in by_type if r[0] == "failure"), 0)
verifications = next((r[1] for r in by_type if r[0] == "verification"), 0)
return {
"total_events": total,
"by_event_type": {r[0]: r[1] for r in by_type},
"failure_rate": round(failures / total * 100, 1) if total > 0 else 0,
"verification_rate": round(verifications / total * 100, 1) if total > 0 else 0,
}
finally:
db.close()
@router.get("/timeline/{control_id}")
async def get_timeline(control_id: str):
"""Full chronological timeline for a control's compliance lifecycle."""
db = SessionLocal()
try:
# Resolve control_id to UUID
ctrl = db.execute(text("""
SELECT id, control_id, title FROM canonical_controls
WHERE control_id = :cid
"""), {"cid": control_id}).fetchone()
if not ctrl:
raise HTTPException(status_code=404, detail="Control not found")
events = db.execute(text("""
SELECT id, event_type, summary, actor,
input_state, output_state, evidence_ids, created_at
FROM decision_events
WHERE control_uuid = CAST(:uuid AS uuid)
ORDER BY created_at ASC
"""), {"uuid": str(ctrl[0])}).fetchall()
# Determine current state from latest event
current_state = "unknown"
if events:
last = events[-1]
output = last[5] or {}
current_state = output.get("status", last[1])
# Calculate avg fix time (assessment → fix_completed)
fix_times = []
assessment_at = None
for e in events:
if e[1] == "assessment":
assessment_at = e[7]
elif e[1] == "fix_completed" and assessment_at:
delta = (e[7] - assessment_at).total_seconds() / 3600
fix_times.append(delta)
assessment_at = None
return {
"control_id": ctrl[1],
"control_title": ctrl[2],
"current_state": current_state,
"total_events": len(events),
"time_to_fix_avg_hours": round(sum(fix_times) / len(fix_times), 1) if fix_times else None,
"events": [
{
"id": str(e[0]),
"type": e[1],
"summary": e[2],
"actor": e[3],
"input_state": e[4],
"output_state": e[5],
"evidence_count": len(e[6]) if e[6] else 0,
"at": str(e[7]),
}
for e in events
],
}
finally:
db.close()