Files
breakpilot-compliance/backend-compliance/compliance/api/canonical_control_routes.py
Benjamin Admin c87f07c99a
All checks were successful
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 39s
CI/CD / test-python-backend-compliance (push) Successful in 39s
CI/CD / test-python-document-crawler (push) Successful in 30s
CI/CD / test-python-dsms-gateway (push) Successful in 20s
CI/CD / validate-canonical-controls (push) Successful in 12s
CI/CD / deploy-hetzner (push) Successful in 1m37s
feat: seed 10 canonical controls + CRUD endpoints + frontend editor
- Migration 045: Seed 10 controls (AUTH, NET, SUP, LOG, WEB, DATA, CRYP, REL)
  with 39 open-source anchors into the database
- Backend: POST/PUT/DELETE endpoints for canonical controls CRUD
- Frontend proxy: PUT and DELETE methods added to canonical route
- Frontend: Control Library with create/edit/delete UI, full form with
  open anchor management, scope, requirements, evidence, test procedures

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-13 00:28:21 +01:00

515 lines
18 KiB
Python

"""
FastAPI routes for the Canonical Control Library.
Independently authored security controls anchored in open-source frameworks
(OWASP, NIST, ENISA). No proprietary nomenclature.
Endpoints:
GET /v1/canonical/frameworks — All frameworks
GET /v1/canonical/frameworks/{framework_id} — Framework details
GET /v1/canonical/frameworks/{framework_id}/controls — Controls of a framework
GET /v1/canonical/controls — All controls (filterable)
GET /v1/canonical/controls/{control_id} — Single control
POST /v1/canonical/controls — Create a control
PUT /v1/canonical/controls/{control_id} — Update a control
DELETE /v1/canonical/controls/{control_id} — Delete a control
GET /v1/canonical/sources — Source registry
GET /v1/canonical/licenses — License matrix
POST /v1/canonical/controls/{control_id}/similarity-check — Too-close check
"""
from __future__ import annotations
import logging
from typing import Any, Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import text
from database import SessionLocal
from compliance.services.license_gate import get_license_matrix, get_source_permissions
from compliance.services.similarity_detector import check_similarity
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/v1/canonical", tags=["canonical-controls"])
# =============================================================================
# RESPONSE MODELS
# =============================================================================
class FrameworkResponse(BaseModel):
    """Schema of a single control-framework record.

    The field names match the keys emitted by ``list_frameworks`` and
    ``get_framework`` in this module.

    NOTE(review): no route visible in this module attaches this model as a
    ``response_model``; the handlers return plain dicts.
    """

    # Stringified primary key (handlers emit ``str(row.id)``).
    id: str
    # Public identifier used in URLs (``/frameworks/{framework_id}``).
    framework_id: str
    name: str
    version: str
    description: Optional[str] = None
    owner: Optional[str] = None
    policy_version: Optional[str] = None
    release_state: str
    # Handlers emit these via ``.isoformat()`` and fall back to None when the
    # database value is falsy, although the fields are declared as plain str.
    created_at: str
    updated_at: str
class ControlResponse(BaseModel):
    """Schema of a single canonical control.

    The field names match the keys of the dict built by ``_control_row``.

    NOTE(review): no route visible in this module attaches this model as a
    ``response_model``; the handlers return plain dicts.
    """

    # Stringified primary key of the control row.
    id: str
    # Stringified ``framework_id`` column of the control row
    # (``_control_row`` emits ``str(r.framework_id)``).
    framework_id: str
    # Human-readable identifier, e.g. 'AUTH-001'.
    control_id: str
    title: str
    objective: str
    rationale: str
    # JSON-valued columns, passed through as returned by the database driver.
    scope: dict
    requirements: list
    test_procedure: list
    evidence: list
    severity: str
    risk_score: Optional[float] = None
    implementation_effort: Optional[str] = None
    evidence_confidence: Optional[float] = None
    open_anchors: list
    release_state: str
    tags: list
    # ISO-formatted timestamps (``_control_row`` emits None when the database
    # value is falsy, although the fields are declared as plain str).
    created_at: str
    updated_at: str
class ControlCreateRequest(BaseModel):
    """Request body for ``POST /v1/canonical/controls``.

    ``create_control`` additionally validates ``control_id`` (DOMAIN-NNN
    pattern), ``severity`` (low/medium/high/critical) and ``risk_score``
    (0..10) before inserting.
    """

    framework_id: str # e.g. 'bp_security_v1'
    control_id: str # e.g. 'AUTH-003'
    title: str
    objective: str
    rationale: str
    # JSON-valued fields; ``create_control`` serialises them with json.dumps.
    # NOTE(review): mutable literals as defaults rely on pydantic copying
    # defaults per instance — confirm for the pinned pydantic version.
    scope: dict = {}
    requirements: list = []
    test_procedure: list = []
    evidence: list = []
    severity: str = "medium"
    risk_score: Optional[float] = None
    implementation_effort: Optional[str] = None
    evidence_confidence: Optional[float] = None
    open_anchors: list = []
    # NOTE(review): release_state is not validated by create_control.
    release_state: str = "draft"
    tags: list = []
class ControlUpdateRequest(BaseModel):
    """Request body for ``PUT /v1/canonical/controls/{control_id}``.

    Every field is optional. ``update_control`` drops fields whose value is
    None (``exclude_none=True``), so a None value means "leave unchanged" and
    a column cannot be reset to NULL through this model.
    ``framework_id`` and ``control_id`` are not part of the model and hence
    cannot be changed by an update.
    """

    title: Optional[str] = None
    objective: Optional[str] = None
    rationale: Optional[str] = None
    # JSON-valued fields; ``update_control`` serialises them with json.dumps.
    scope: Optional[dict] = None
    requirements: Optional[list] = None
    test_procedure: Optional[list] = None
    evidence: Optional[list] = None
    # Validated by update_control: low/medium/high/critical.
    severity: Optional[str] = None
    # Validated by update_control: 0..10.
    risk_score: Optional[float] = None
    implementation_effort: Optional[str] = None
    evidence_confidence: Optional[float] = None
    open_anchors: Optional[list] = None
    release_state: Optional[str] = None
    tags: Optional[list] = None
class SimilarityCheckRequest(BaseModel):
    """Request body for the similarity-check endpoint.

    Both texts are handed unchanged to ``check_similarity`` by the
    ``similarity_check`` handler.
    """

    source_text: str
    candidate_text: str
class SimilarityCheckResponse(BaseModel):
    """Schema of the similarity metrics reported by ``similarity_check``.

    NOTE(review): the handler's response dict also carries a ``control_id``
    key that this model does not declare, and no route visible in this module
    attaches the model as a ``response_model``.
    """

    max_exact_run: int
    token_overlap: float
    ngram_jaccard: float
    embedding_cosine: float
    lcs_ratio: float
    status: str
    details: dict
# =============================================================================
# HELPERS
# =============================================================================
def _row_to_dict(row, columns: list[str]) -> dict[str, Any]:
"""Generic row → dict converter."""
return {col: (getattr(row, col).isoformat() if hasattr(getattr(row, col, None), 'isoformat') else getattr(row, col)) for col in columns}
# =============================================================================
# FRAMEWORKS
# =============================================================================
@router.get("/frameworks")
async def list_frameworks():
    """Return every registered control framework, ordered by name.

    Each entry is a plain dict; the primary key is stringified and the
    timestamps are ISO-formatted (None when the stored value is falsy).
    """
    statement = text("""
        SELECT id, framework_id, name, version, description,
               owner, policy_version, release_state,
               created_at, updated_at
        FROM canonical_control_frameworks
        ORDER BY name
    """)

    def _serialize(rec) -> dict:
        created = rec.created_at.isoformat() if rec.created_at else None
        updated = rec.updated_at.isoformat() if rec.updated_at else None
        return {
            "id": str(rec.id),
            "framework_id": rec.framework_id,
            "name": rec.name,
            "version": rec.version,
            "description": rec.description,
            "owner": rec.owner,
            "policy_version": rec.policy_version,
            "release_state": rec.release_state,
            "created_at": created,
            "updated_at": updated,
        }

    with SessionLocal() as session:
        records = session.execute(statement).fetchall()
    return list(map(_serialize, records))
@router.get("/frameworks/{framework_id}")
async def get_framework(framework_id: str):
    """Return the framework whose public ``framework_id`` matches.

    Raises:
        HTTPException: 404 when no such framework exists.
    """
    lookup = text("""
        SELECT id, framework_id, name, version, description,
               owner, policy_version, release_state,
               created_at, updated_at
        FROM canonical_control_frameworks
        WHERE framework_id = :fid
    """)
    with SessionLocal() as session:
        found = session.execute(lookup, {"fid": framework_id}).fetchone()

    if not found:
        raise HTTPException(status_code=404, detail="Framework not found")

    def _iso(stamp):
        # Falsy timestamps are reported as None rather than formatted.
        return stamp.isoformat() if stamp else None

    return {
        "id": str(found.id),
        "framework_id": found.framework_id,
        "name": found.name,
        "version": found.version,
        "description": found.description,
        "owner": found.owner,
        "policy_version": found.policy_version,
        "release_state": found.release_state,
        "created_at": _iso(found.created_at),
        "updated_at": _iso(found.updated_at),
    }
@router.get("/frameworks/{framework_id}/controls")
async def list_framework_controls(
    framework_id: str,
    severity: Optional[str] = Query(None),
    release_state: Optional[str] = Query(None),
):
    """Return the controls of one framework, ordered by ``control_id``.

    The public ``framework_id`` from the URL is first resolved to the
    framework's primary key, which is what ``canonical_controls.framework_id``
    is compared against. ``severity`` and ``release_state`` are optional
    exact-match filters.

    Raises:
        HTTPException: 404 when the framework does not exist.
    """
    with SessionLocal() as session:
        framework = session.execute(
            text("SELECT id FROM canonical_control_frameworks WHERE framework_id = :fid"),
            {"fid": framework_id},
        ).fetchone()
        if not framework:
            raise HTTPException(status_code=404, detail="Framework not found")

        sql = """
            SELECT id, framework_id, control_id, title, objective, rationale,
                   scope, requirements, test_procedure, evidence,
                   severity, risk_score, implementation_effort,
                   evidence_confidence, open_anchors, release_state, tags,
                   created_at, updated_at
            FROM canonical_controls
            WHERE framework_id = :fw_id
        """
        bind: dict[str, Any] = {"fw_id": str(framework.id)}

        # (SQL fragment, bind name, value) — a fragment is added only when its
        # value is truthy.
        optional_filters = (
            (" AND severity = :sev", "sev", severity),
            (" AND release_state = :rs", "rs", release_state),
        )
        for fragment, bind_name, value in optional_filters:
            if value:
                sql += fragment
                bind[bind_name] = value
        sql += " ORDER BY control_id"

        records = session.execute(text(sql), bind).fetchall()
    return [_control_row(rec) for rec in records]
# =============================================================================
# CONTROLS
# =============================================================================
@router.get("/controls")
async def list_controls(
    severity: Optional[str] = Query(None),
    domain: Optional[str] = Query(None),
    release_state: Optional[str] = Query(None),
):
    """Return all canonical controls, ordered by ``control_id``.

    Optional filters:
        severity: exact match on the severity column.
        domain: upper-cased and matched as a prefix of ``control_id``.
        release_state: exact match on the release_state column.
    """
    sql = """
        SELECT id, framework_id, control_id, title, objective, rationale,
               scope, requirements, test_procedure, evidence,
               severity, risk_score, implementation_effort,
               evidence_confidence, open_anchors, release_state, tags,
               created_at, updated_at
        FROM canonical_controls
        WHERE 1=1
    """
    bind: dict[str, Any] = {}

    # (SQL fragment, bind name, value) — a fragment is added only when its
    # value is truthy. The domain is upper-cased before binding.
    optional_filters = (
        (" AND severity = :sev", "sev", severity),
        (" AND LEFT(control_id, LENGTH(:dom)) = :dom", "dom", domain.upper() if domain else None),
        (" AND release_state = :rs", "rs", release_state),
    )
    for fragment, bind_name, value in optional_filters:
        if value:
            sql += fragment
            bind[bind_name] = value
    sql += " ORDER BY control_id"

    with SessionLocal() as session:
        records = session.execute(text(sql), bind).fetchall()
    return [_control_row(rec) for rec in records]
@router.get("/controls/{control_id}")
async def get_control(control_id: str):
    """Return one canonical control looked up by ``control_id`` (e.g. AUTH-001).

    The identifier from the URL is upper-cased before the lookup.

    Raises:
        HTTPException: 404 when no control matches.
    """
    lookup = text("""
        SELECT id, framework_id, control_id, title, objective, rationale,
               scope, requirements, test_procedure, evidence,
               severity, risk_score, implementation_effort,
               evidence_confidence, open_anchors, release_state, tags,
               created_at, updated_at
        FROM canonical_controls
        WHERE control_id = :cid
    """)
    wanted = control_id.upper()
    with SessionLocal() as session:
        found = session.execute(lookup, {"cid": wanted}).fetchone()

    if found:
        return _control_row(found)
    raise HTTPException(status_code=404, detail="Control not found")
# =============================================================================
# CONTROL CRUD (CREATE / UPDATE / DELETE)
# =============================================================================
@router.post("/controls", status_code=201)
async def create_control(body: ControlCreateRequest):
    """Create a new canonical control.

    Validates the payload, resolves the framework by its public
    ``framework_id``, rejects a duplicate ``control_id`` within that
    framework, inserts the row and returns it in API shape.

    Raises:
        HTTPException: 400 for an invalid ``control_id``, ``severity`` or
            ``risk_score``; 404 when the framework does not exist; 409 when
            the framework already contains this ``control_id``.
    """
    import json as _json
    import re

    # fullmatch instead of match(...$): "$" also matches before a trailing
    # newline, which would let "AUTH-001\n" through.
    if not re.fullmatch(r"[A-Z]{2,6}-[0-9]{3}", body.control_id):
        raise HTTPException(status_code=400, detail="control_id must match DOMAIN-NNN (e.g. AUTH-001)")
    if body.severity not in ("low", "medium", "high", "critical"):
        raise HTTPException(status_code=400, detail="severity must be low/medium/high/critical")
    if body.risk_score is not None and not (0 <= body.risk_score <= 10):
        raise HTTPException(status_code=400, detail="risk_score must be 0..10")

    with SessionLocal() as db:
        # Resolve the public framework identifier to the framework's primary key.
        fw = db.execute(
            text("SELECT id FROM canonical_control_frameworks WHERE framework_id = :fid"),
            {"fid": body.framework_id},
        ).fetchone()
        if not fw:
            raise HTTPException(status_code=404, detail=f"Framework '{body.framework_id}' not found")

        # Reject a second control with the same identifier in this framework.
        existing = db.execute(
            text("SELECT id FROM canonical_controls WHERE framework_id = :fid AND control_id = :cid"),
            {"fid": str(fw.id), "cid": body.control_id},
        ).fetchone()
        if existing:
            raise HTTPException(status_code=409, detail=f"Control '{body.control_id}' already exists")

        # JSON columns use CAST(:name AS jsonb) rather than :name::jsonb:
        # text() does not recognise a placeholder that is directly followed
        # by ':' as a bind parameter, so the PostgreSQL "::" shorthand breaks
        # parameter binding.
        row = db.execute(
            text("""
                INSERT INTO canonical_controls (
                    framework_id, control_id, title, objective, rationale,
                    scope, requirements, test_procedure, evidence,
                    severity, risk_score, implementation_effort, evidence_confidence,
                    open_anchors, release_state, tags
                ) VALUES (
                    :fw_id, :cid, :title, :objective, :rationale,
                    CAST(:scope AS jsonb), CAST(:requirements AS jsonb),
                    CAST(:test_procedure AS jsonb), CAST(:evidence AS jsonb),
                    :severity, :risk_score, :effort, :confidence,
                    CAST(:anchors AS jsonb), :release_state, CAST(:tags AS jsonb)
                )
                RETURNING id, framework_id, control_id, title, objective, rationale,
                          scope, requirements, test_procedure, evidence,
                          severity, risk_score, implementation_effort,
                          evidence_confidence, open_anchors, release_state, tags,
                          created_at, updated_at
            """),
            {
                "fw_id": str(fw.id),
                "cid": body.control_id,
                "title": body.title,
                "objective": body.objective,
                "rationale": body.rationale,
                "scope": _json.dumps(body.scope),
                "requirements": _json.dumps(body.requirements),
                "test_procedure": _json.dumps(body.test_procedure),
                "evidence": _json.dumps(body.evidence),
                "severity": body.severity,
                "risk_score": body.risk_score,
                "effort": body.implementation_effort,
                "confidence": body.evidence_confidence,
                "anchors": _json.dumps(body.open_anchors),
                "release_state": body.release_state,
                "tags": _json.dumps(body.tags),
            },
        ).fetchone()
        db.commit()
    return _control_row(row)
@router.put("/controls/{control_id}")
async def update_control(control_id: str, body: ControlUpdateRequest):
    """Partially update an existing canonical control.

    Only fields that are present and not None in the request body are
    written; ``updated_at`` is always refreshed. The ``control_id`` from the
    URL is upper-cased before matching.

    Raises:
        HTTPException: 400 when the body carries no fields or an invalid
            ``severity`` / ``risk_score``; 404 when no control matches.
    """
    import json as _json

    # None-valued fields are dropped, so None means "leave unchanged".
    updates = body.dict(exclude_none=True)
    if not updates:
        raise HTTPException(status_code=400, detail="No fields to update")
    if "severity" in updates and updates["severity"] not in ("low", "medium", "high", "critical"):
        raise HTTPException(status_code=400, detail="severity must be low/medium/high/critical")
    if "risk_score" in updates and not (0 <= updates["risk_score"] <= 10):
        raise HTTPException(status_code=400, detail="risk_score must be 0..10")

    # Build the SET clause. Column names come from the keys of the request
    # model's dict (not from free-form client input), and every model field
    # is named like its column, so the key is interpolated directly.
    set_parts = []
    params: dict[str, Any] = {"cid": control_id.upper()}
    json_fields = {"scope", "requirements", "test_procedure", "evidence", "open_anchors", "tags"}
    for key, val in updates.items():
        if key in json_fields:
            # CAST(... AS jsonb) instead of "::jsonb": text() does not
            # recognise a placeholder directly followed by ':' as a bind
            # parameter.
            set_parts.append(f"{key} = CAST(:{key} AS jsonb)")
            params[key] = _json.dumps(val)
        else:
            set_parts.append(f"{key} = :{key}")
            params[key] = val
    set_parts.append("updated_at = NOW()")

    with SessionLocal() as db:
        row = db.execute(
            text(f"""
                UPDATE canonical_controls
                SET {', '.join(set_parts)}
                WHERE control_id = :cid
                RETURNING id, framework_id, control_id, title, objective, rationale,
                          scope, requirements, test_procedure, evidence,
                          severity, risk_score, implementation_effort,
                          evidence_confidence, open_anchors, release_state, tags,
                          created_at, updated_at
            """),
            params,
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Control not found")
        db.commit()
    return _control_row(row)
@router.delete("/controls/{control_id}", status_code=204)
async def delete_control(control_id: str):
    """Delete the canonical control matching ``control_id`` (upper-cased).

    Raises:
        HTTPException: 404 when the DELETE affected no rows.
    """
    target = control_id.upper()
    removal = text("DELETE FROM canonical_controls WHERE control_id = :cid")
    with SessionLocal() as session:
        outcome = session.execute(removal, {"cid": target})
        if outcome.rowcount != 0:
            session.commit()
            return None
        raise HTTPException(status_code=404, detail="Control not found")
# =============================================================================
# SIMILARITY CHECK
# =============================================================================
@router.post("/controls/{control_id}/similarity-check")
async def similarity_check(control_id: str, body: SimilarityCheckRequest):
    """Run the too-close detector on the submitted source/candidate texts.

    The ``control_id`` from the URL is only echoed back (upper-cased); the
    metrics are copied from the report returned by ``check_similarity``.
    """
    report = await check_similarity(body.source_text, body.candidate_text)

    metric_names = (
        "max_exact_run",
        "token_overlap",
        "ngram_jaccard",
        "embedding_cosine",
        "lcs_ratio",
        "status",
        "details",
    )
    payload = {"control_id": control_id.upper()}
    for metric in metric_names:
        payload[metric] = getattr(report, metric)
    return payload
# =============================================================================
# SOURCES & LICENSES
# =============================================================================
@router.get("/sources")
async def list_sources():
    """Return whatever ``get_source_permissions`` reports for a fresh session."""
    with SessionLocal() as session:
        permissions = get_source_permissions(session)
    return permissions
@router.get("/licenses")
async def list_licenses():
    """Return whatever ``get_license_matrix`` reports for a fresh session."""
    with SessionLocal() as session:
        matrix = get_license_matrix(session)
    return matrix
# =============================================================================
# INTERNAL HELPERS
# =============================================================================
def _control_row(r) -> dict:
return {
"id": str(r.id),
"framework_id": str(r.framework_id),
"control_id": r.control_id,
"title": r.title,
"objective": r.objective,
"rationale": r.rationale,
"scope": r.scope,
"requirements": r.requirements,
"test_procedure": r.test_procedure,
"evidence": r.evidence,
"severity": r.severity,
"risk_score": float(r.risk_score) if r.risk_score is not None else None,
"implementation_effort": r.implementation_effort,
"evidence_confidence": float(r.evidence_confidence) if r.evidence_confidence is not None else None,
"open_anchors": r.open_anchors,
"release_state": r.release_state,
"tags": r.tags or [],
"created_at": r.created_at.isoformat() if r.created_at else None,
"updated_at": r.updated_at.isoformat() if r.updated_at else None,
}