Files
breakpilot-core/control-pipeline/api/compliance_commit_routes.py
T
Benjamin Admin e82f99b8cb feat(pipeline): G2 Compliance Commit Ledger — code↔control audit trail
New table: compliance_commits (commit hash, affected controls, risk level)
New API:
  POST /v1/compliance-commits (SDK registers commit + impact)
  GET /v1/compliance-commits (list with filters)
  GET /v1/compliance-commits/by-control/{id} (all commits for a control)
  GET /v1/compliance-commits/stats (dashboard)
  GET /v1/compliance-commits/{id} (detail)

GIN index on affected_control_ids for fast @> containment queries.
454 tests pass, 0 regressions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-06 19:17:45 +02:00

256 lines
8.3 KiB
Python

"""Compliance Commit Ledger API — G2.
Tracks code commits and their compliance impact. The SDK reports each commit
together with the controls it affects, building an audit trail that maps
code changes to compliance controls.
"""
import json
import logging
import uuid
from typing import Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import text
from db.session import SessionLocal
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/v1/compliance-commits", tags=["compliance-commits"])
class CreateCommitRequest(BaseModel):
    """Request body for registering a commit in the compliance ledger.

    Only ``tenant_id`` and ``commit_hash`` are required; every other field
    carries a default and may be omitted by the caller.
    """

    # --- Ownership ---------------------------------------------------------
    # tenant_id is cast to ``uuid`` by the INSERT statement in this module.
    tenant_id: str
    project_id: Optional[str] = None

    # --- Commit identity and VCS metadata ----------------------------------
    commit_hash: str
    commit_message: Optional[str] = None
    commit_author: Optional[str] = None
    # Passed through to the database as a string; no parsing happens here.
    commit_date: Optional[str] = None
    branch: Optional[str] = None
    repo_url: Optional[str] = None

    # --- Compliance impact -------------------------------------------------
    # Both lists are serialised with json.dumps and stored as jsonb.
    affected_control_ids: list[str] = []
    affected_files: list[str] = []
    # Free-form string; this model does not restrict the allowed values.
    risk_level: str = "low"
    analysis_summary: Optional[str] = None
    # Arbitrary JSON-serialisable mapping, stored as jsonb.
    analysis_metadata: dict = {}
@router.post("")
async def register_commit(req: CreateCommitRequest):
    """Register a code commit together with its compliance impact.

    Inserts one row into ``compliance_commits`` under a freshly generated
    UUID. The list and dict fields of the request are serialised to JSON
    and cast to ``jsonb`` by the INSERT statement.

    Args:
        req: Commit metadata and analysis results supplied by the caller.

    Returns:
        A dict with the new row ``id``, the fixed status ``"registered"``,
        the number of affected controls and the echoed ``risk_level``.

    Raises:
        HTTPException: 422 if ``req.tenant_id`` is not a valid UUID.
    """
    # Reject a malformed tenant id up front. Without this check the
    # CAST(:tenant_id AS uuid) below fails inside the database and the
    # client receives an opaque 500 instead of a validation error.
    try:
        tenant_uuid = str(uuid.UUID(req.tenant_id))
    except ValueError:
        raise HTTPException(
            status_code=422, detail="tenant_id must be a valid UUID"
        ) from None

    cid = str(uuid.uuid4())
    values = {
        "id": cid,
        "tenant_id": tenant_uuid,
        "project_id": req.project_id,
        "commit_hash": req.commit_hash,
        "commit_message": req.commit_message,
        "commit_author": req.commit_author,
        "commit_date": req.commit_date,
        "branch": req.branch,
        "repo_url": req.repo_url,
        "control_ids": json.dumps(req.affected_control_ids),
        "files": json.dumps(req.affected_files),
        "risk_level": req.risk_level,
        "analysis_summary": req.analysis_summary,
        "metadata": json.dumps(req.analysis_metadata),
    }

    db = SessionLocal()
    try:
        db.execute(text("""
            INSERT INTO compliance_commits
                (id, tenant_id, project_id, commit_hash, commit_message,
                 commit_author, commit_date, branch, repo_url,
                 affected_control_ids, affected_files,
                 risk_level, analysis_summary, analysis_metadata)
            VALUES
                (CAST(:id AS uuid), CAST(:tenant_id AS uuid), :project_id,
                 :commit_hash, :commit_message, :commit_author,
                 :commit_date, :branch, :repo_url,
                 CAST(:control_ids AS jsonb), CAST(:files AS jsonb),
                 :risk_level, :analysis_summary, CAST(:metadata AS jsonb))
        """), values)
        db.commit()
        return {
            "id": cid,
            "status": "registered",
            "affected_controls": len(req.affected_control_ids),
            "risk_level": req.risk_level,
        }
    finally:
        db.close()
@router.get("")
async def list_commits(
    tenant_id: Optional[str] = None,
    control_id: Optional[str] = None,
    risk_level: Optional[str] = None,
    branch: Optional[str] = None,
    since: Optional[str] = None,
    limit: int = Query(50, ge=1, le=500),
    offset: int = Query(0, ge=0),
):
    """List compliance commits, newest first, with optional filters.

    Every filter is optional; an omitted or empty value adds no condition.
    All supplied filters are combined with AND.

    Args:
        tenant_id: Restrict to one tenant (must be a UUID).
        control_id: Restrict to commits whose ``affected_control_ids``
            contains this control id.
        risk_level: Exact match on ``risk_level``.
        branch: Exact match on ``branch``.
        since: Lower bound (inclusive) on ``commit_date``; cast to
            ``timestamptz`` by the database.
        limit: Page size (1-500).
        offset: Number of rows to skip.

    Returns:
        A dict with ``total`` (count of all rows matching the filters,
        ignoring pagination) and ``commits`` (the requested page).

    Raises:
        HTTPException: 422 if ``tenant_id`` is given but is not a valid UUID.
    """
    clauses = []
    filters: dict = {}

    if tenant_id:
        # Validate here so a malformed id yields 422 rather than a database
        # cast error that would surface as a 500.
        try:
            filters["tenant_id"] = str(uuid.UUID(tenant_id))
        except ValueError:
            raise HTTPException(
                status_code=422, detail="tenant_id must be a valid UUID"
            ) from None
        clauses.append("tenant_id = CAST(:tenant_id AS uuid)")
    if control_id:
        # jsonb containment against a one-element array.
        clauses.append("affected_control_ids @> CAST(:cid_json AS jsonb)")
        filters["cid_json"] = json.dumps([control_id])
    if risk_level:
        clauses.append("risk_level = :risk")
        filters["risk"] = risk_level
    if branch:
        clauses.append("branch = :branch")
        filters["branch"] = branch
    if since:
        clauses.append("commit_date >= CAST(:since AS timestamptz)")
        filters["since"] = since

    # Only fixed SQL fragments are interpolated; all values are bound.
    where = ("WHERE " + " AND ".join(clauses)) if clauses else ""

    db = SessionLocal()
    try:
        # The trailing "id" makes the ordering total, so rows sharing a
        # commit_date (or having none) cannot repeat or vanish between pages.
        rows = db.execute(text(f"""
            SELECT id, commit_hash, commit_message, commit_author, commit_date,
                   branch, affected_control_ids, affected_files, risk_level
            FROM compliance_commits
            {where}
            ORDER BY commit_date DESC NULLS LAST, id
            LIMIT :limit OFFSET :offset
        """), {**filters, "limit": limit, "offset": offset}).fetchall()
        # The count uses the same WHERE clause but no pagination binds.
        total = db.execute(text(f"""
            SELECT count(*) FROM compliance_commits {where}
        """), filters).scalar()
        return {
            "total": total,
            "commits": [
                {
                    "id": str(r[0]),
                    "commit_hash": r[1],
                    "message": r[2],
                    "author": r[3],
                    "date": str(r[4]) if r[4] else None,
                    "branch": r[5],
                    "affected_control_ids": r[6],
                    "affected_files": r[7],
                    "risk_level": r[8],
                }
                for r in rows
            ],
        }
    finally:
        db.close()
@router.get("/stats")
async def commit_stats(tenant_id: Optional[str] = None):
    """Return dashboard statistics for compliance commits.

    Args:
        tenant_id: Optional tenant (UUID) to restrict the statistics to.
            An omitted or empty value aggregates over all tenants.

    Returns:
        A dict with ``total_commits``, ``last_7_days`` (commits whose
        ``commit_date`` lies within the last seven days) and
        ``by_risk_level`` (mapping of risk level to commit count).

    Raises:
        HTTPException: 422 if ``tenant_id`` is given but is not a valid UUID.
    """
    where = ""
    params: dict = {}
    if tenant_id:
        # Validate here so a malformed id yields 422 rather than a database
        # cast error that would surface as a 500.
        try:
            params["tid"] = str(uuid.UUID(tenant_id))
        except ValueError:
            raise HTTPException(
                status_code=422, detail="tenant_id must be a valid UUID"
            ) from None
        where = "WHERE tenant_id = CAST(:tid AS uuid)"

    db = SessionLocal()
    try:
        # One grouped query yields both the per-risk totals and the recent
        # counts, so all figures come from the same snapshot and no
        # WHERE/AND splicing is needed for the seven-day condition.
        rows = db.execute(text(f"""
            SELECT risk_level,
                   count(*) AS total,
                   count(*) FILTER (
                       WHERE commit_date > NOW() - interval '7 days'
                   ) AS recent
            FROM compliance_commits
            {where}
            GROUP BY risk_level
        """), params).fetchall()
        return {
            "total_commits": sum(r[1] for r in rows),
            "last_7_days": sum(r[2] for r in rows),
            "by_risk_level": {r[0]: r[1] for r in rows},
        }
    finally:
        db.close()
@router.get("/by-control/{control_id}")
async def commits_by_control(
    control_id: str,
    limit: int = Query(50, ge=1, le=200),
    tenant_id: Optional[str] = None,
):
    """Return the commits that affect a specific control, newest first.

    Args:
        control_id: Control id to look for in ``affected_control_ids``.
        limit: Maximum number of commits to return (1-200).
        tenant_id: Optional tenant (UUID) to restrict the result to. An
            omitted or empty value applies no tenant filter.

    Returns:
        A dict with the echoed ``control_id``, ``total_commits`` (number of
        all matching commits, not just those returned) and ``commits`` (at
        most ``limit`` entries).

    Raises:
        HTTPException: 422 if ``tenant_id`` is given but is not a valid UUID.
    """
    # jsonb containment against a one-element array.
    clauses = ["affected_control_ids @> CAST(:cid_json AS jsonb)"]
    filters: dict = {"cid_json": json.dumps([control_id])}
    if tenant_id:
        # Validate here so a malformed id yields 422 rather than a database
        # cast error that would surface as a 500.
        try:
            filters["tenant_id"] = str(uuid.UUID(tenant_id))
        except ValueError:
            raise HTTPException(
                status_code=422, detail="tenant_id must be a valid UUID"
            ) from None
        clauses.append("tenant_id = CAST(:tenant_id AS uuid)")
    # Only fixed SQL fragments are interpolated; all values are bound.
    where = "WHERE " + " AND ".join(clauses)

    db = SessionLocal()
    try:
        # The trailing "id" makes the ordering total, so the selection of
        # rows under LIMIT is deterministic when commit dates tie.
        rows = db.execute(text(f"""
            SELECT id, commit_hash, commit_message, commit_author, commit_date,
                   branch, repo_url, affected_files, risk_level
            FROM compliance_commits
            {where}
            ORDER BY commit_date DESC NULLS LAST, id
            LIMIT :limit
        """), {**filters, "limit": limit}).fetchall()

        if len(rows) < limit:
            # A short page already contains every match.
            total = len(rows)
        else:
            # The page is full, so more rows may exist; count them so that
            # total_commits is not silently capped at ``limit``.
            total = db.execute(text(f"""
                SELECT count(*) FROM compliance_commits {where}
            """), filters).scalar()

        return {
            "control_id": control_id,
            "total_commits": total,
            "commits": [
                {
                    "id": str(r[0]),
                    "commit_hash": r[1],
                    "message": r[2],
                    "author": r[3],
                    "date": str(r[4]) if r[4] else None,
                    "branch": r[5],
                    "repo_url": r[6],
                    "affected_files": r[7],
                    "risk_level": r[8],
                }
                for r in rows
            ],
        }
    finally:
        db.close()
@router.get("/{commit_id}")
async def get_commit(commit_id: str):
    """Return the full record of a single compliance commit.

    Args:
        commit_id: Primary key (UUID) of the ledger entry.

    Returns:
        A dict with every field of the row; UUID and timestamp columns are
        rendered as strings, and the jsonb columns are returned as-is.

    Raises:
        HTTPException: 422 if ``commit_id`` is not a valid UUID, 404 if no
            row with that id exists.
    """
    # Validate the path parameter before touching the database. Without this
    # check the CAST(:id AS uuid) fails inside the database and the client
    # receives an opaque 500 instead of a validation error.
    try:
        row_id = str(uuid.UUID(commit_id))
    except ValueError:
        raise HTTPException(
            status_code=422, detail="commit_id must be a valid UUID"
        ) from None

    db = SessionLocal()
    try:
        row = db.execute(text("""
            SELECT * FROM compliance_commits WHERE id = CAST(:id AS uuid)
        """), {"id": row_id}).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Commit not found")
        return {
            "id": str(row.id),
            "tenant_id": str(row.tenant_id),
            "project_id": str(row.project_id) if row.project_id else None,
            "commit_hash": row.commit_hash,
            "commit_message": row.commit_message,
            "commit_author": row.commit_author,
            "commit_date": str(row.commit_date) if row.commit_date else None,
            "branch": row.branch,
            "repo_url": row.repo_url,
            "affected_control_ids": row.affected_control_ids,
            "affected_files": row.affected_files,
            "risk_level": row.risk_level,
            "analysis_summary": row.analysis_summary,
            "analysis_metadata": row.analysis_metadata,
        }
    finally:
        db.close()