""" FastAPI routes for the Canonical Control Library. Independently authored security controls anchored in open-source frameworks (OWASP, NIST, ENISA). No proprietary nomenclature. Endpoints: GET /v1/canonical/frameworks — All frameworks GET /v1/canonical/frameworks/{framework_id} — Framework details GET /v1/canonical/frameworks/{framework_id}/controls — Controls of a framework GET /v1/canonical/controls — All controls (filterable) GET /v1/canonical/controls/{control_id} — Single control POST /v1/canonical/controls — Create a control PUT /v1/canonical/controls/{control_id} — Update a control DELETE /v1/canonical/controls/{control_id} — Delete a control GET /v1/canonical/sources — Source registry GET /v1/canonical/licenses — License matrix POST /v1/canonical/controls/{control_id}/similarity-check — Too-close check """ from __future__ import annotations import logging from typing import Any, Optional from fastapi import APIRouter, HTTPException, Query from pydantic import BaseModel from sqlalchemy import text from database import SessionLocal from compliance.services.license_gate import get_license_matrix, get_source_permissions from compliance.services.similarity_detector import check_similarity logger = logging.getLogger(__name__) router = APIRouter(prefix="/v1/canonical", tags=["canonical-controls"]) # ============================================================================= # RESPONSE MODELS # ============================================================================= class FrameworkResponse(BaseModel): id: str framework_id: str name: str version: str description: Optional[str] = None owner: Optional[str] = None policy_version: Optional[str] = None release_state: str created_at: str updated_at: str class ControlResponse(BaseModel): id: str framework_id: str control_id: str title: str objective: str rationale: str scope: dict requirements: list test_procedure: list evidence: list severity: str risk_score: Optional[float] = None implementation_effort: 
Optional[str] = None evidence_confidence: Optional[float] = None open_anchors: list release_state: str tags: list created_at: str updated_at: str class ControlCreateRequest(BaseModel): framework_id: str # e.g. 'bp_security_v1' control_id: str # e.g. 'AUTH-003' title: str objective: str rationale: str scope: dict = {} requirements: list = [] test_procedure: list = [] evidence: list = [] severity: str = "medium" risk_score: Optional[float] = None implementation_effort: Optional[str] = None evidence_confidence: Optional[float] = None open_anchors: list = [] release_state: str = "draft" tags: list = [] class ControlUpdateRequest(BaseModel): title: Optional[str] = None objective: Optional[str] = None rationale: Optional[str] = None scope: Optional[dict] = None requirements: Optional[list] = None test_procedure: Optional[list] = None evidence: Optional[list] = None severity: Optional[str] = None risk_score: Optional[float] = None implementation_effort: Optional[str] = None evidence_confidence: Optional[float] = None open_anchors: Optional[list] = None release_state: Optional[str] = None tags: Optional[list] = None class SimilarityCheckRequest(BaseModel): source_text: str candidate_text: str class SimilarityCheckResponse(BaseModel): max_exact_run: int token_overlap: float ngram_jaccard: float embedding_cosine: float lcs_ratio: float status: str details: dict # ============================================================================= # HELPERS # ============================================================================= def _row_to_dict(row, columns: list[str]) -> dict[str, Any]: """Generic row → dict converter.""" return {col: (getattr(row, col).isoformat() if hasattr(getattr(row, col, None), 'isoformat') else getattr(row, col)) for col in columns} # ============================================================================= # FRAMEWORKS # ============================================================================= @router.get("/frameworks") async def 
list_frameworks(): """List all registered control frameworks.""" with SessionLocal() as db: rows = db.execute( text(""" SELECT id, framework_id, name, version, description, owner, policy_version, release_state, created_at, updated_at FROM canonical_control_frameworks ORDER BY name """) ).fetchall() return [ { "id": str(r.id), "framework_id": r.framework_id, "name": r.name, "version": r.version, "description": r.description, "owner": r.owner, "policy_version": r.policy_version, "release_state": r.release_state, "created_at": r.created_at.isoformat() if r.created_at else None, "updated_at": r.updated_at.isoformat() if r.updated_at else None, } for r in rows ] @router.get("/frameworks/{framework_id}") async def get_framework(framework_id: str): """Get a single framework by its framework_id.""" with SessionLocal() as db: row = db.execute( text(""" SELECT id, framework_id, name, version, description, owner, policy_version, release_state, created_at, updated_at FROM canonical_control_frameworks WHERE framework_id = :fid """), {"fid": framework_id}, ).fetchone() if not row: raise HTTPException(status_code=404, detail="Framework not found") return { "id": str(row.id), "framework_id": row.framework_id, "name": row.name, "version": row.version, "description": row.description, "owner": row.owner, "policy_version": row.policy_version, "release_state": row.release_state, "created_at": row.created_at.isoformat() if row.created_at else None, "updated_at": row.updated_at.isoformat() if row.updated_at else None, } @router.get("/frameworks/{framework_id}/controls") async def list_framework_controls( framework_id: str, severity: Optional[str] = Query(None), release_state: Optional[str] = Query(None), ): """List controls belonging to a framework.""" with SessionLocal() as db: # Resolve framework UUID fw = db.execute( text("SELECT id FROM canonical_control_frameworks WHERE framework_id = :fid"), {"fid": framework_id}, ).fetchone() if not fw: raise HTTPException(status_code=404, 
detail="Framework not found") query = """ SELECT id, framework_id, control_id, title, objective, rationale, scope, requirements, test_procedure, evidence, severity, risk_score, implementation_effort, evidence_confidence, open_anchors, release_state, tags, created_at, updated_at FROM canonical_controls WHERE framework_id = :fw_id """ params: dict[str, Any] = {"fw_id": str(fw.id)} if severity: query += " AND severity = :sev" params["sev"] = severity if release_state: query += " AND release_state = :rs" params["rs"] = release_state query += " ORDER BY control_id" rows = db.execute(text(query), params).fetchall() return [_control_row(r) for r in rows] # ============================================================================= # CONTROLS # ============================================================================= @router.get("/controls") async def list_controls( severity: Optional[str] = Query(None), domain: Optional[str] = Query(None), release_state: Optional[str] = Query(None), ): """List all canonical controls, with optional filters.""" query = """ SELECT id, framework_id, control_id, title, objective, rationale, scope, requirements, test_procedure, evidence, severity, risk_score, implementation_effort, evidence_confidence, open_anchors, release_state, tags, created_at, updated_at FROM canonical_controls WHERE 1=1 """ params: dict[str, Any] = {} if severity: query += " AND severity = :sev" params["sev"] = severity if domain: query += " AND LEFT(control_id, LENGTH(:dom)) = :dom" params["dom"] = domain.upper() if release_state: query += " AND release_state = :rs" params["rs"] = release_state query += " ORDER BY control_id" with SessionLocal() as db: rows = db.execute(text(query), params).fetchall() return [_control_row(r) for r in rows] @router.get("/controls/{control_id}") async def get_control(control_id: str): """Get a single canonical control by its control_id (e.g. 
AUTH-001).""" with SessionLocal() as db: row = db.execute( text(""" SELECT id, framework_id, control_id, title, objective, rationale, scope, requirements, test_procedure, evidence, severity, risk_score, implementation_effort, evidence_confidence, open_anchors, release_state, tags, created_at, updated_at FROM canonical_controls WHERE control_id = :cid """), {"cid": control_id.upper()}, ).fetchone() if not row: raise HTTPException(status_code=404, detail="Control not found") return _control_row(row) # ============================================================================= # CONTROL CRUD (CREATE / UPDATE / DELETE) # ============================================================================= @router.post("/controls", status_code=201) async def create_control(body: ControlCreateRequest): """Create a new canonical control.""" import json as _json import re # Validate control_id format if not re.match(r"^[A-Z]{2,6}-[0-9]{3}$", body.control_id): raise HTTPException(status_code=400, detail="control_id must match DOMAIN-NNN (e.g. 
AUTH-001)") if body.severity not in ("low", "medium", "high", "critical"): raise HTTPException(status_code=400, detail="severity must be low/medium/high/critical") if body.risk_score is not None and not (0 <= body.risk_score <= 10): raise HTTPException(status_code=400, detail="risk_score must be 0..10") with SessionLocal() as db: # Resolve framework fw = db.execute( text("SELECT id FROM canonical_control_frameworks WHERE framework_id = :fid"), {"fid": body.framework_id}, ).fetchone() if not fw: raise HTTPException(status_code=404, detail=f"Framework '{body.framework_id}' not found") # Check duplicate existing = db.execute( text("SELECT id FROM canonical_controls WHERE framework_id = :fid AND control_id = :cid"), {"fid": str(fw.id), "cid": body.control_id}, ).fetchone() if existing: raise HTTPException(status_code=409, detail=f"Control '{body.control_id}' already exists") row = db.execute( text(""" INSERT INTO canonical_controls ( framework_id, control_id, title, objective, rationale, scope, requirements, test_procedure, evidence, severity, risk_score, implementation_effort, evidence_confidence, open_anchors, release_state, tags ) VALUES ( :fw_id, :cid, :title, :objective, :rationale, :scope::jsonb, :requirements::jsonb, :test_procedure::jsonb, :evidence::jsonb, :severity, :risk_score, :effort, :confidence, :anchors::jsonb, :release_state, :tags::jsonb ) RETURNING id, framework_id, control_id, title, objective, rationale, scope, requirements, test_procedure, evidence, severity, risk_score, implementation_effort, evidence_confidence, open_anchors, release_state, tags, created_at, updated_at """), { "fw_id": str(fw.id), "cid": body.control_id, "title": body.title, "objective": body.objective, "rationale": body.rationale, "scope": _json.dumps(body.scope), "requirements": _json.dumps(body.requirements), "test_procedure": _json.dumps(body.test_procedure), "evidence": _json.dumps(body.evidence), "severity": body.severity, "risk_score": body.risk_score, "effort": 
body.implementation_effort, "confidence": body.evidence_confidence, "anchors": _json.dumps(body.open_anchors), "release_state": body.release_state, "tags": _json.dumps(body.tags), }, ).fetchone() db.commit() return _control_row(row) @router.put("/controls/{control_id}") async def update_control(control_id: str, body: ControlUpdateRequest): """Update an existing canonical control (partial update).""" import json as _json updates = body.dict(exclude_none=True) if not updates: raise HTTPException(status_code=400, detail="No fields to update") if "severity" in updates and updates["severity"] not in ("low", "medium", "high", "critical"): raise HTTPException(status_code=400, detail="severity must be low/medium/high/critical") if "risk_score" in updates and updates["risk_score"] is not None and not (0 <= updates["risk_score"] <= 10): raise HTTPException(status_code=400, detail="risk_score must be 0..10") # Build dynamic SET clause set_parts = [] params: dict[str, Any] = {"cid": control_id.upper()} json_fields = {"scope", "requirements", "test_procedure", "evidence", "open_anchors", "tags"} for key, val in updates.items(): col = "implementation_effort" if key == "implementation_effort" else key col = "evidence_confidence" if key == "evidence_confidence" else col if key in json_fields: set_parts.append(f"{col} = :{key}::jsonb") params[key] = _json.dumps(val) else: set_parts.append(f"{col} = :{key}") params[key] = val set_parts.append("updated_at = NOW()") with SessionLocal() as db: row = db.execute( text(f""" UPDATE canonical_controls SET {', '.join(set_parts)} WHERE control_id = :cid RETURNING id, framework_id, control_id, title, objective, rationale, scope, requirements, test_procedure, evidence, severity, risk_score, implementation_effort, evidence_confidence, open_anchors, release_state, tags, created_at, updated_at """), params, ).fetchone() if not row: raise HTTPException(status_code=404, detail="Control not found") db.commit() return _control_row(row) 
@router.delete("/controls/{control_id}", status_code=204)
async def delete_control(control_id: str):
    """Remove the canonical control whose control_id matches the path value.

    The path value is upper-cased before matching.

    Raises:
        HTTPException: 404 when the DELETE affected no rows.
    """
    lookup = {"cid": control_id.upper()}
    statement = text("DELETE FROM canonical_controls WHERE control_id = :cid")
    with SessionLocal() as session:
        outcome = session.execute(statement, lookup)
        if outcome.rowcount == 0:
            # Leaving the block without commit; nothing was deleted anyway.
            raise HTTPException(status_code=404, detail="Control not found")
        session.commit()
    return None


# =============================================================================
# SIMILARITY CHECK
# =============================================================================

@router.post("/controls/{control_id}/similarity-check")
async def similarity_check(control_id: str, body: SimilarityCheckRequest):
    """Run the too-close detector against a source/candidate text pair.

    The control_id is only echoed back (upper-cased) in the response; this
    handler does not look it up.
    """
    report = await check_similarity(body.source_text, body.candidate_text)

    # Metrics are copied from the report in a fixed order after control_id.
    metric_names = (
        "max_exact_run",
        "token_overlap",
        "ngram_jaccard",
        "embedding_cosine",
        "lcs_ratio",
        "status",
        "details",
    )
    payload = {"control_id": control_id.upper()}
    for metric in metric_names:
        payload[metric] = getattr(report, metric)
    return payload


# =============================================================================
# SOURCES & LICENSES
# =============================================================================

@router.get("/sources")
async def list_sources():
    """Return whatever get_source_permissions yields for a fresh session."""
    with SessionLocal() as session:
        permissions = get_source_permissions(session)
        return permissions


@router.get("/licenses")
async def list_licenses():
    """Return whatever get_license_matrix yields for a fresh session."""
    with SessionLocal() as session:
        matrix = get_license_matrix(session)
        return matrix


# =============================================================================
# INTERNAL HELPERS
# =============================================================================

def _control_row(r) -> dict:
    """Serialize one canonical_controls row into a JSON-ready dict.

    ``id`` and ``framework_id`` are stringified, ``risk_score`` and
    ``evidence_confidence`` are converted to float unless they are None,
    falsy ``tags`` become an empty list, and falsy timestamps become None
    instead of an ISO-8601 string.
    """

    def _as_float(value):
        # Only None is passed through; any other value is converted.
        return None if value is None else float(value)

    def _as_iso(stamp):
        # Falsy timestamps (e.g. None) are reported as None.
        return stamp.isoformat() if stamp else None

    serialized = {
        "id": str(r.id),
        "framework_id": str(r.framework_id),
    }
    # Fields copied through unchanged, in response order.
    for field in (
        "control_id",
        "title",
        "objective",
        "rationale",
        "scope",
        "requirements",
        "test_procedure",
        "evidence",
        "severity",
    ):
        serialized[field] = getattr(r, field)
    serialized["risk_score"] = _as_float(r.risk_score)
    serialized["implementation_effort"] = r.implementation_effort
    serialized["evidence_confidence"] = _as_float(r.evidence_confidence)
    serialized["open_anchors"] = r.open_anchors
    serialized["release_state"] = r.release_state
    serialized["tags"] = r.tags or []
    serialized["created_at"] = _as_iso(r.created_at)
    serialized["updated_at"] = _as_iso(r.updated_at)
    return serialized