d5bcd0bd5b
New table: deployment_checks (verdict, blocking/warning controls, risk score)
New API:
POST /v1/deployment-checks (SDK asks: "can I deploy?")
GET /v1/deployment-checks/{id} (check result)
POST /v1/deployment-checks/{id}/override (manual override with justification)
GET /v1/deployment-checks/stats (approval/block rate)
Check logic: queries G1 decision_traces + G3 open failures per affected control.
Verdict: approved (0 blocking) or blocked (with fix recommendations).
454 tests pass, 0 regressions.
Block G complete: G1-G4 all implemented.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
259 lines
8.7 KiB
Python
259 lines
8.7 KiB
Python
"""Pre-Deployment Enforcement API — G4.
|
|
|
|
CI/CD gate: checks if a deployment is safe by evaluating the compliance
|
|
status of all affected controls. Blocks deploys with non-compliant controls.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import uuid
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, HTTPException, Query
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import text
|
|
|
|
from db.session import SessionLocal
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(prefix="/v1/deployment-checks", tags=["deployment-checks"])
|
|
|
|
SEVERITY_WEIGHT = {
|
|
"critical": 4.0,
|
|
"high": 3.0,
|
|
"medium": 2.0,
|
|
"low": 1.0,
|
|
}
|
|
|
|
|
|
class DeployCheckRequest(BaseModel):
|
|
tenant_id: str
|
|
commit_hash: str
|
|
branch: Optional[str] = None
|
|
environment: str = "production"
|
|
affected_control_ids: list[str] = []
|
|
metadata: dict = {}
|
|
|
|
|
|
class OverrideRequest(BaseModel):
|
|
override_by: str
|
|
override_reason: str
|
|
|
|
|
|
@router.post("")
|
|
async def check_deployment(req: DeployCheckRequest):
|
|
"""Check if a deployment is safe. Returns verdict: approved/blocked."""
|
|
db = SessionLocal()
|
|
try:
|
|
check_id = str(uuid.uuid4())
|
|
|
|
blocking = []
|
|
warnings = []
|
|
risk_score = 0.0
|
|
|
|
if req.affected_control_ids:
|
|
# Look up latest decision status for each affected control
|
|
for ctrl_id in req.affected_control_ids:
|
|
row = db.execute(text("""
|
|
SELECT dt.status, dt.decision_reason, dt.fix_strategy,
|
|
cc.control_id, cc.title, cc.severity
|
|
FROM decision_traces dt
|
|
JOIN canonical_controls cc ON cc.id = dt.control_uuid
|
|
WHERE cc.control_id = :cid
|
|
ORDER BY dt.decided_at DESC NULLS LAST
|
|
LIMIT 1
|
|
"""), {"cid": ctrl_id}).fetchone()
|
|
|
|
if not row:
|
|
# No decision → treat as not_assessed (warning)
|
|
warnings.append({
|
|
"control_id": ctrl_id,
|
|
"status": "not_assessed",
|
|
"reason": "No compliance decision recorded",
|
|
})
|
|
continue
|
|
|
|
status = row[0]
|
|
severity = row[5] or "medium"
|
|
weight = SEVERITY_WEIGHT.get(severity, 2.0)
|
|
|
|
if status in ("not_compliant", "under_remediation"):
|
|
blocking.append({
|
|
"control_id": row[3],
|
|
"title": row[4],
|
|
"status": status,
|
|
"reason": row[1],
|
|
"fix_strategy": row[2],
|
|
"severity": severity,
|
|
})
|
|
risk_score += weight
|
|
elif status == "partially_compliant":
|
|
warnings.append({
|
|
"control_id": row[3],
|
|
"title": row[4],
|
|
"status": status,
|
|
"reason": row[1],
|
|
"severity": severity,
|
|
})
|
|
risk_score += weight * 0.5
|
|
|
|
# Also check for open failure events (G3)
|
|
if req.affected_control_ids:
|
|
placeholders = ",".join(["'%s'" % c for c in req.affected_control_ids])
|
|
open_failures = db.execute(text(f"""
|
|
SELECT cc.control_id, de.summary
|
|
FROM decision_events de
|
|
JOIN canonical_controls cc ON cc.id = de.control_uuid
|
|
WHERE cc.control_id IN ({placeholders})
|
|
AND de.event_type = 'failure'
|
|
AND de.created_at > NOW() - interval '30 days'
|
|
AND NOT EXISTS (
|
|
SELECT 1 FROM decision_events de2
|
|
WHERE de2.control_uuid = de.control_uuid
|
|
AND de2.event_type = 'verification'
|
|
AND de2.created_at > de.created_at
|
|
)
|
|
""")).fetchall()
|
|
|
|
for f in open_failures:
|
|
if not any(b["control_id"] == f[0] for b in blocking):
|
|
blocking.append({
|
|
"control_id": f[0],
|
|
"status": "open_failure",
|
|
"reason": f[1] or "Unresolved failure event",
|
|
"severity": "high",
|
|
})
|
|
risk_score += 3.0
|
|
|
|
verdict = "approved" if not blocking else "blocked"
|
|
summary = (
|
|
f"{len(blocking)} blocking, {len(warnings)} warnings. "
|
|
+ ("Deploy approved." if verdict == "approved"
|
|
else f"Fix {', '.join(b['control_id'] for b in blocking)} before deploying.")
|
|
)
|
|
|
|
# Store check result
|
|
db.execute(text("""
|
|
INSERT INTO deployment_checks
|
|
(id, tenant_id, commit_hash, branch, environment,
|
|
verdict, affected_control_ids, blocking_controls,
|
|
warning_controls, risk_score, summary, metadata)
|
|
VALUES
|
|
(CAST(:id AS uuid), CAST(:tid AS uuid), :hash, :branch, :env,
|
|
:verdict, CAST(:affected AS jsonb), CAST(:blocking AS jsonb),
|
|
CAST(:warnings AS jsonb), :risk, :summary, CAST(:meta AS jsonb))
|
|
"""), {
|
|
"id": check_id,
|
|
"tid": req.tenant_id,
|
|
"hash": req.commit_hash,
|
|
"branch": req.branch,
|
|
"env": req.environment,
|
|
"verdict": verdict,
|
|
"affected": json.dumps(req.affected_control_ids),
|
|
"blocking": json.dumps(blocking),
|
|
"warnings": json.dumps(warnings),
|
|
"risk": risk_score,
|
|
"summary": summary,
|
|
"meta": json.dumps(req.metadata),
|
|
})
|
|
db.commit()
|
|
|
|
return {
|
|
"id": check_id,
|
|
"verdict": verdict,
|
|
"risk_score": risk_score,
|
|
"blocking_controls": blocking,
|
|
"warning_controls": warnings,
|
|
"summary": summary,
|
|
}
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@router.get("/stats")
|
|
async def check_stats(tenant_id: Optional[str] = None):
|
|
"""Deployment check statistics."""
|
|
db = SessionLocal()
|
|
try:
|
|
tf = ""
|
|
params: dict = {}
|
|
if tenant_id:
|
|
tf = "WHERE tenant_id = CAST(:tid AS uuid)"
|
|
params["tid"] = tenant_id
|
|
|
|
by_verdict = db.execute(text(f"""
|
|
SELECT verdict, count(*) FROM deployment_checks {tf}
|
|
GROUP BY verdict
|
|
"""), params).fetchall()
|
|
|
|
total = sum(r[1] for r in by_verdict)
|
|
verdicts = {r[0]: r[1] for r in by_verdict}
|
|
|
|
return {
|
|
"total_checks": total,
|
|
"by_verdict": verdicts,
|
|
"approval_rate": round(
|
|
verdicts.get("approved", 0) / total * 100, 1
|
|
) if total > 0 else 0,
|
|
"override_count": verdicts.get("override", 0),
|
|
}
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@router.post("/{check_id}/override")
|
|
async def override_check(check_id: str, req: OverrideRequest):
|
|
"""Override a blocked deployment (with justification)."""
|
|
db = SessionLocal()
|
|
try:
|
|
result = db.execute(text("""
|
|
UPDATE deployment_checks
|
|
SET verdict = 'override', override_by = :by, override_reason = :reason
|
|
WHERE id = CAST(:id AS uuid) AND verdict = 'blocked'
|
|
"""), {
|
|
"id": check_id,
|
|
"by": req.override_by,
|
|
"reason": req.override_reason,
|
|
})
|
|
db.commit()
|
|
|
|
if result.rowcount == 0:
|
|
raise HTTPException(status_code=404, detail="Check not found or not blocked")
|
|
|
|
return {"id": check_id, "verdict": "override", "override_by": req.override_by}
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@router.get("/{check_id}")
|
|
async def get_check(check_id: str):
|
|
"""Get details of a deployment check."""
|
|
db = SessionLocal()
|
|
try:
|
|
row = db.execute(text("""
|
|
SELECT * FROM deployment_checks WHERE id = CAST(:id AS uuid)
|
|
"""), {"id": check_id}).fetchone()
|
|
|
|
if not row:
|
|
raise HTTPException(status_code=404, detail="Check not found")
|
|
|
|
return {
|
|
"id": str(row.id),
|
|
"tenant_id": str(row.tenant_id),
|
|
"commit_hash": row.commit_hash,
|
|
"branch": row.branch,
|
|
"environment": row.environment,
|
|
"verdict": row.verdict,
|
|
"affected_control_ids": row.affected_control_ids,
|
|
"blocking_controls": row.blocking_controls,
|
|
"warning_controls": row.warning_controls,
|
|
"risk_score": float(row.risk_score),
|
|
"override_by": row.override_by,
|
|
"override_reason": row.override_reason,
|
|
"summary": row.summary,
|
|
"created_at": str(row.created_at),
|
|
}
|
|
finally:
|
|
db.close()
|