""" FastAPI routes for the Canonical Control Library. Provides read-only access to independently authored security controls. All controls are formulated without proprietary nomenclature and anchored in open-source frameworks (OWASP, NIST, ENISA). Endpoints: GET /v1/canonical/frameworks — All frameworks GET /v1/canonical/frameworks/{framework_id} — Framework details GET /v1/canonical/frameworks/{framework_id}/controls — Controls of a framework GET /v1/canonical/controls — All controls (filterable) GET /v1/canonical/controls/{control_id} — Single control by control_id GET /v1/canonical/sources — Source registry GET /v1/canonical/licenses — License matrix POST /v1/canonical/controls/{control_id}/similarity-check — Too-close check """ from __future__ import annotations import logging from typing import Any, Optional from fastapi import APIRouter, HTTPException, Query from pydantic import BaseModel from sqlalchemy import text from database import SessionLocal from compliance.services.license_gate import get_license_matrix, get_source_permissions from compliance.services.similarity_detector import check_similarity logger = logging.getLogger(__name__) router = APIRouter(prefix="/v1/canonical", tags=["canonical-controls"]) # ============================================================================= # RESPONSE MODELS # ============================================================================= class FrameworkResponse(BaseModel): id: str framework_id: str name: str version: str description: Optional[str] = None owner: Optional[str] = None policy_version: Optional[str] = None release_state: str created_at: str updated_at: str class ControlResponse(BaseModel): id: str framework_id: str control_id: str title: str objective: str rationale: str scope: dict requirements: list test_procedure: list evidence: list severity: str risk_score: Optional[float] = None implementation_effort: Optional[str] = None evidence_confidence: Optional[float] = None open_anchors: list release_state: str tags: list created_at: str updated_at: str class SimilarityCheckRequest(BaseModel): source_text: str candidate_text: str class SimilarityCheckResponse(BaseModel): max_exact_run: int token_overlap: float ngram_jaccard: float embedding_cosine: float lcs_ratio: float status: str details: dict # ============================================================================= # HELPERS # ============================================================================= def _row_to_dict(row, columns: list[str]) -> dict[str, Any]: """Generic row → dict converter.""" return {col: (getattr(row, col).isoformat() if hasattr(getattr(row, col, None), 'isoformat') else getattr(row, col)) for col in columns} # ============================================================================= # FRAMEWORKS # ============================================================================= @router.get("/frameworks") async def list_frameworks(): """List all registered control frameworks.""" with SessionLocal() as db: rows = db.execute( text(""" SELECT id, framework_id, name, version, description, owner, policy_version, release_state, created_at, updated_at FROM canonical_control_frameworks ORDER BY name """) ).fetchall() return [ { "id": str(r.id), "framework_id": r.framework_id, "name": r.name, "version": r.version, "description": r.description, "owner": r.owner, "policy_version": r.policy_version, "release_state": r.release_state, "created_at": r.created_at.isoformat() if r.created_at else None, "updated_at": r.updated_at.isoformat() if r.updated_at else None, } for r in rows ] @router.get("/frameworks/{framework_id}") async def get_framework(framework_id: str): """Get a single framework by its framework_id.""" with SessionLocal() as db: row = db.execute( text(""" SELECT id, framework_id, name, version, description, owner, policy_version, release_state, created_at, updated_at FROM canonical_control_frameworks WHERE framework_id = :fid """), {"fid": framework_id}, ).fetchone() if not row: raise HTTPException(status_code=404, detail="Framework not found") return { "id": str(row.id), "framework_id": row.framework_id, "name": row.name, "version": row.version, "description": row.description, "owner": row.owner, "policy_version": row.policy_version, "release_state": row.release_state, "created_at": row.created_at.isoformat() if row.created_at else None, "updated_at": row.updated_at.isoformat() if row.updated_at else None, } @router.get("/frameworks/{framework_id}/controls") async def list_framework_controls( framework_id: str, severity: Optional[str] = Query(None), release_state: Optional[str] = Query(None), ): """List controls belonging to a framework.""" with SessionLocal() as db: # Resolve framework UUID fw = db.execute( text("SELECT id FROM canonical_control_frameworks WHERE framework_id = :fid"), {"fid": framework_id}, ).fetchone() if not fw: raise HTTPException(status_code=404, detail="Framework not found") query = """ SELECT id, framework_id, control_id, title, objective, rationale, scope, requirements, test_procedure, evidence, severity, risk_score, implementation_effort, evidence_confidence, open_anchors, release_state, tags, created_at, updated_at FROM canonical_controls WHERE framework_id = :fw_id """ params: dict[str, Any] = {"fw_id": str(fw.id)} if severity: query += " AND severity = :sev" params["sev"] = severity if release_state: query += " AND release_state = :rs" params["rs"] = release_state query += " ORDER BY control_id" rows = db.execute(text(query), params).fetchall() return [_control_row(r) for r in rows] # ============================================================================= # CONTROLS # ============================================================================= @router.get("/controls") async def list_controls( severity: Optional[str] = Query(None), domain: Optional[str] = Query(None), release_state: Optional[str] = Query(None), ): """List all canonical controls, with optional filters.""" query = """ SELECT id, framework_id, control_id, title, objective, rationale, scope, requirements, test_procedure, evidence, severity, risk_score, implementation_effort, evidence_confidence, open_anchors, release_state, tags, created_at, updated_at FROM canonical_controls WHERE 1=1 """ params: dict[str, Any] = {} if severity: query += " AND severity = :sev" params["sev"] = severity if domain: query += " AND LEFT(control_id, LENGTH(:dom)) = :dom" params["dom"] = domain.upper() if release_state: query += " AND release_state = :rs" params["rs"] = release_state query += " ORDER BY control_id" with SessionLocal() as db: rows = db.execute(text(query), params).fetchall() return [_control_row(r) for r in rows] @router.get("/controls/{control_id}") async def get_control(control_id: str): """Get a single canonical control by its control_id (e.g. AUTH-001).""" with SessionLocal() as db: row = db.execute( text(""" SELECT id, framework_id, control_id, title, objective, rationale, scope, requirements, test_procedure, evidence, severity, risk_score, implementation_effort, evidence_confidence, open_anchors, release_state, tags, created_at, updated_at FROM canonical_controls WHERE control_id = :cid """), {"cid": control_id.upper()}, ).fetchone() if not row: raise HTTPException(status_code=404, detail="Control not found") return _control_row(row) # ============================================================================= # SIMILARITY CHECK # ============================================================================= @router.post("/controls/{control_id}/similarity-check") async def similarity_check(control_id: str, body: SimilarityCheckRequest): """Run the too-close detector against a source/candidate text pair.""" report = await check_similarity(body.source_text, body.candidate_text) return { "control_id": control_id.upper(), "max_exact_run": report.max_exact_run, "token_overlap": report.token_overlap, "ngram_jaccard": report.ngram_jaccard, "embedding_cosine": report.embedding_cosine, "lcs_ratio": report.lcs_ratio, "status": report.status, "details": report.details, } # ============================================================================= # SOURCES & LICENSES # ============================================================================= @router.get("/sources") async def list_sources(): """List all registered sources with permission flags.""" with SessionLocal() as db: return get_source_permissions(db) @router.get("/licenses") async def list_licenses(): """Return the license matrix.""" with SessionLocal() as db: return get_license_matrix(db) # ============================================================================= # INTERNAL HELPERS # ============================================================================= def _control_row(r) -> dict: return { "id": str(r.id), "framework_id": str(r.framework_id), "control_id": r.control_id, "title": r.title, "objective": r.objective, "rationale": r.rationale, "scope": r.scope, "requirements": r.requirements, "test_procedure": r.test_procedure, "evidence": r.evidence, "severity": r.severity, "risk_score": float(r.risk_score) if r.risk_score is not None else None, "implementation_effort": r.implementation_effort, "evidence_confidence": float(r.evidence_confidence) if r.evidence_confidence is not None else None, "open_anchors": r.open_anchors, "release_state": r.release_state, "tags": r.tags or [], "created_at": r.created_at.isoformat() if r.created_at else None, "updated_at": r.updated_at.isoformat() if r.updated_at else None, }