Files
Benjamin Admin d5bcd0bd5b feat(pipeline): G4 Pre-Deployment Enforcement — CI/CD compliance gate
New table: deployment_checks (verdict, blocking/warning controls, risk score)
New API:
  POST /v1/deployment-checks (SDK asks: "can I deploy?")
  GET /v1/deployment-checks/{id} (check result)
  POST /v1/deployment-checks/{id}/override (manual override with justification)
  GET /v1/deployment-checks/stats (approval/block rate)

Check logic: queries G1 decision_traces + G3 open failures per affected control.
Verdict: approved (0 blocking) or blocked (with fix recommendations).
454 tests pass, 0 regressions.

Block G complete: G1-G4 all implemented.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-06 20:24:45 +02:00

259 lines
8.7 KiB
Python

"""Pre-Deployment Enforcement API — G4.
CI/CD gate: checks if a deployment is safe by evaluating the compliance
status of all affected controls. Blocks deploys with non-compliant controls.
"""
import json
import logging
import uuid
from typing import Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import text
from db.session import SessionLocal
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Every route declared below is registered under the /v1/deployment-checks prefix.
router = APIRouter(prefix="/v1/deployment-checks", tags=["deployment-checks"])
# Risk-score contribution per control severity. Blocking findings add the
# full weight; partially compliant findings add half of it.
SEVERITY_WEIGHT = dict(
    critical=4.0,
    high=3.0,
    medium=2.0,
    low=1.0,
)
# Request body for POST /v1/deployment-checks.
# (Documented with comments rather than a docstring so the generated
# schema description stays exactly as before.)
class DeployCheckRequest(BaseModel):
    # Tenant the check is recorded for; cast to a UUID when stored.
    tenant_id: str
    # Commit being deployed.
    commit_hash: str
    # Optional source branch of the deployment.
    branch: Optional[str] = None
    # Target environment; defaults to production.
    environment: str = "production"
    # Business ids of the controls touched by this deployment
    # (matched against canonical_controls.control_id).
    affected_control_ids: list[str] = []
    # Free-form caller data, stored as JSON alongside the check.
    metadata: dict = {}
# Request body for POST /v1/deployment-checks/{check_id}/override.
class OverrideRequest(BaseModel):
    # Who is overriding the blocked verdict.
    override_by: str
    # Justification stored together with the override.
    override_reason: str
def _evaluate_decisions(db, control_ids: list[str]) -> tuple[list[dict], list[dict], float]:
    """Classify each control by its most recent decision trace.

    Returns ``(blocking, warnings, risk_score)``:

    * ``not_compliant`` / ``under_remediation`` -> blocking, full severity weight
    * ``partially_compliant`` -> warning, half severity weight
    * no decision recorded -> ``not_assessed`` warning, no weight
    * any other status -> ignored
    """
    blocking: list[dict] = []
    warnings: list[dict] = []
    risk_score = 0.0

    for ctrl_id in control_ids:
        row = db.execute(text("""
            SELECT dt.status, dt.decision_reason, dt.fix_strategy,
                   cc.control_id, cc.title, cc.severity
            FROM decision_traces dt
            JOIN canonical_controls cc ON cc.id = dt.control_uuid
            WHERE cc.control_id = :cid
            ORDER BY dt.decided_at DESC NULLS LAST
            LIMIT 1
        """), {"cid": ctrl_id}).fetchone()

        if not row:
            # No decision -> treat as not_assessed (warning, not a blocker).
            warnings.append({
                "control_id": ctrl_id,
                "status": "not_assessed",
                "reason": "No compliance decision recorded",
            })
            continue

        status = row[0]
        severity = row[5] or "medium"
        # Unknown severities are weighted like "medium".
        weight = SEVERITY_WEIGHT.get(severity, 2.0)

        if status in ("not_compliant", "under_remediation"):
            blocking.append({
                "control_id": row[3],
                "title": row[4],
                "status": status,
                "reason": row[1],
                "fix_strategy": row[2],
                "severity": severity,
            })
            risk_score += weight
        elif status == "partially_compliant":
            warnings.append({
                "control_id": row[3],
                "title": row[4],
                "status": status,
                "reason": row[1],
                "severity": severity,
            })
            risk_score += weight * 0.5

    return blocking, warnings, risk_score


def _find_open_failures(db, control_ids: list[str]) -> list:
    """Return ``(control_id, summary)`` rows for unresolved failure events.

    A failure counts as open when it is at most 30 days old and no later
    ``verification`` event exists for the same control.
    """
    if not control_ids:
        return []

    # The ids come straight from the request body, so they are passed as bind
    # parameters. Only the generated placeholder names (:cid_0, :cid_1, ...)
    # are interpolated into the SQL text, never the values themselves.
    params = {f"cid_{i}": cid for i, cid in enumerate(control_ids)}
    placeholders = ", ".join(f":{name}" for name in params)

    return db.execute(text(f"""
        SELECT cc.control_id, de.summary
        FROM decision_events de
        JOIN canonical_controls cc ON cc.id = de.control_uuid
        WHERE cc.control_id IN ({placeholders})
          AND de.event_type = 'failure'
          AND de.created_at > NOW() - interval '30 days'
          AND NOT EXISTS (
              SELECT 1 FROM decision_events de2
              WHERE de2.control_uuid = de.control_uuid
                AND de2.event_type = 'verification'
                AND de2.created_at > de.created_at
          )
    """), params).fetchall()


@router.post("")
async def check_deployment(req: DeployCheckRequest):
    """Check if a deployment is safe. Returns verdict: approved/blocked.

    Evaluates the latest compliance decision and any open failure event for
    every affected control, stores the result in ``deployment_checks`` and
    returns it.

    Returns:
        dict with ``id``, ``verdict`` ("approved" when nothing blocks,
        otherwise "blocked"), ``risk_score``, ``blocking_controls``,
        ``warning_controls`` and a human-readable ``summary``.

    Raises:
        HTTPException: 422 when ``tenant_id`` is not a valid UUID.
    """
    # Validate up front: otherwise a malformed tenant id only fails at the
    # INSERT (CAST ... AS uuid) as an unhandled database error, after all
    # lookups have already run.
    try:
        tenant_id = str(uuid.UUID(req.tenant_id))
    except ValueError as exc:
        raise HTTPException(
            status_code=422, detail="tenant_id must be a valid UUID"
        ) from exc

    # De-duplicate while preserving order so a repeated id cannot produce
    # duplicate findings or be counted twice in the risk score.
    control_ids = list(dict.fromkeys(req.affected_control_ids))

    # NOTE(review): the session is used synchronously inside an async route,
    # which presumably blocks the event loop during queries — confirm.
    db = SessionLocal()
    try:
        check_id = str(uuid.uuid4())

        blocking, warnings, risk_score = _evaluate_decisions(db, control_ids)

        # Also check for open failure events (G3). A control that is already
        # blocking is not added a second time.
        already_blocking = {entry["control_id"] for entry in blocking}
        for control_id, failure_summary in _find_open_failures(db, control_ids):
            if control_id in already_blocking:
                continue
            blocking.append({
                "control_id": control_id,
                "status": "open_failure",
                "reason": failure_summary or "Unresolved failure event",
                "severity": "high",
            })
            already_blocking.add(control_id)
            risk_score += SEVERITY_WEIGHT["high"]

        verdict = "approved" if not blocking else "blocked"
        summary = (
            f"{len(blocking)} blocking, {len(warnings)} warnings. "
            + ("Deploy approved." if verdict == "approved"
               else f"Fix {', '.join(b['control_id'] for b in blocking)} before deploying.")
        )

        # Store check result (the affected ids are stored as submitted).
        db.execute(text("""
            INSERT INTO deployment_checks
                (id, tenant_id, commit_hash, branch, environment,
                 verdict, affected_control_ids, blocking_controls,
                 warning_controls, risk_score, summary, metadata)
            VALUES
                (CAST(:id AS uuid), CAST(:tid AS uuid), :hash, :branch, :env,
                 :verdict, CAST(:affected AS jsonb), CAST(:blocking AS jsonb),
                 CAST(:warnings AS jsonb), :risk, :summary, CAST(:meta AS jsonb))
        """), {
            "id": check_id,
            "tid": tenant_id,
            "hash": req.commit_hash,
            "branch": req.branch,
            "env": req.environment,
            "verdict": verdict,
            "affected": json.dumps(req.affected_control_ids),
            "blocking": json.dumps(blocking),
            "warnings": json.dumps(warnings),
            "risk": risk_score,
            "summary": summary,
            "meta": json.dumps(req.metadata),
        })
        db.commit()

        return {
            "id": check_id,
            "verdict": verdict,
            "risk_score": risk_score,
            "blocking_controls": blocking,
            "warning_controls": warnings,
            "summary": summary,
        }
    finally:
        db.close()
@router.get("/stats")
async def check_stats(tenant_id: Optional[str] = None):
    """Deployment check statistics.

    Counts stored deployment checks per verdict, optionally restricted to one
    tenant.

    Returns:
        dict with ``total_checks``, ``by_verdict`` (verdict -> count),
        ``approval_rate`` (percentage of "approved" checks rounded to one
        decimal, 0 when there are no checks) and ``override_count``.

    Raises:
        HTTPException: 422 when ``tenant_id`` is given but is not a valid UUID.
    """
    where_clause = ""
    params: dict = {}
    if tenant_id:
        # Reject malformed ids here; otherwise CAST(:tid AS uuid) fails in the
        # database and surfaces as an unhandled server error.
        try:
            params["tid"] = str(uuid.UUID(tenant_id))
        except ValueError as exc:
            raise HTTPException(
                status_code=422, detail="tenant_id must be a valid UUID"
            ) from exc
        where_clause = "WHERE tenant_id = CAST(:tid AS uuid)"

    db = SessionLocal()
    try:
        # Only the fixed WHERE fragment above is interpolated; the tenant id
        # itself is always a bind parameter.
        by_verdict = db.execute(text(f"""
            SELECT verdict, count(*) FROM deployment_checks {where_clause}
            GROUP BY verdict
        """), params).fetchall()

        verdicts = {r[0]: r[1] for r in by_verdict}
        total = sum(verdicts.values())

        return {
            "total_checks": total,
            "by_verdict": verdicts,
            "approval_rate": round(
                verdicts.get("approved", 0) / total * 100, 1
            ) if total > 0 else 0,
            "override_count": verdicts.get("override", 0),
        }
    finally:
        db.close()
@router.post("/{check_id}/override")
async def override_check(check_id: str, req: OverrideRequest):
    """Override a blocked deployment (with justification).

    Sets the verdict of a check to "override" and records who overrode it and
    why. Only checks whose current verdict is "blocked" are updated.

    Returns:
        dict with the check ``id``, the new ``verdict`` and ``override_by``.

    Raises:
        HTTPException: 404 when ``check_id`` is not a valid UUID, does not
            exist, or the check is not currently blocked.
    """
    # A malformed id cannot match any row; answer 404 instead of letting
    # CAST(:id AS uuid) fail in the database as an unhandled server error.
    try:
        normalized_id = str(uuid.UUID(check_id))
    except ValueError as exc:
        raise HTTPException(
            status_code=404, detail="Check not found or not blocked"
        ) from exc

    db = SessionLocal()
    try:
        result = db.execute(text("""
            UPDATE deployment_checks
            SET verdict = 'override', override_by = :by, override_reason = :reason
            WHERE id = CAST(:id AS uuid) AND verdict = 'blocked'
        """), {
            "id": normalized_id,
            "by": req.override_by,
            "reason": req.override_reason,
        })
        db.commit()

        # Zero affected rows means the check is missing or was not blocked.
        if result.rowcount == 0:
            raise HTTPException(status_code=404, detail="Check not found or not blocked")

        return {"id": check_id, "verdict": "override", "override_by": req.override_by}
    finally:
        db.close()
@router.get("/{check_id}")
async def get_check(check_id: str):
    """Get details of a deployment check.

    Returns:
        dict with the stored check: ids, commit/branch/environment, verdict,
        affected/blocking/warning controls, risk score, override details,
        summary and creation timestamp (as a string).

    Raises:
        HTTPException: 404 when ``check_id`` is not a valid UUID or no check
            with that id exists.
    """
    # A malformed id cannot match any row; answer 404 instead of letting
    # CAST(:id AS uuid) fail in the database as an unhandled server error.
    try:
        normalized_id = str(uuid.UUID(check_id))
    except ValueError as exc:
        raise HTTPException(status_code=404, detail="Check not found") from exc

    db = SessionLocal()
    try:
        row = db.execute(text("""
            SELECT * FROM deployment_checks WHERE id = CAST(:id AS uuid)
        """), {"id": normalized_id}).fetchone()

        if not row:
            raise HTTPException(status_code=404, detail="Check not found")

        return {
            "id": str(row.id),
            "tenant_id": str(row.tenant_id),
            "commit_hash": row.commit_hash,
            "branch": row.branch,
            "environment": row.environment,
            "verdict": row.verdict,
            "affected_control_ids": row.affected_control_ids,
            "blocking_controls": row.blocking_controls,
            "warning_controls": row.warning_controls,
            "risk_score": float(row.risk_score),
            "override_by": row.override_by,
            "override_reason": row.override_reason,
            "summary": row.summary,
            "created_at": str(row.created_at),
        }
    finally:
        db.close()