""" FastAPI routes for the Canonical Control Library. Independently authored security controls anchored in open-source frameworks (OWASP, NIST, ENISA). No proprietary nomenclature. Endpoints: GET /v1/canonical/frameworks — All frameworks GET /v1/canonical/frameworks/{framework_id} — Framework details GET /v1/canonical/frameworks/{framework_id}/controls — Controls of a framework GET /v1/canonical/controls — All controls (filterable) GET /v1/canonical/controls/{control_id} — Single control GET /v1/canonical/controls/{control_id}/similar — Find similar controls POST /v1/canonical/controls — Create a control PUT /v1/canonical/controls/{control_id} — Update a control DELETE /v1/canonical/controls/{control_id} — Delete a control GET /v1/canonical/categories — Category list GET /v1/canonical/sources — Source registry GET /v1/canonical/licenses — License matrix POST /v1/canonical/controls/{control_id}/similarity-check — Too-close check """ from __future__ import annotations import logging from typing import Any, Optional from fastapi import APIRouter, HTTPException, Query from pydantic import BaseModel from sqlalchemy import text from database import SessionLocal from compliance.services.license_gate import get_license_matrix, get_source_permissions from compliance.services.similarity_detector import check_similarity logger = logging.getLogger(__name__) router = APIRouter(prefix="/v1/canonical", tags=["canonical-controls"]) # ============================================================================= # RESPONSE MODELS # ============================================================================= class FrameworkResponse(BaseModel): id: str framework_id: str name: str version: str description: Optional[str] = None owner: Optional[str] = None policy_version: Optional[str] = None release_state: str created_at: str updated_at: str class ControlResponse(BaseModel): id: str framework_id: str control_id: str title: str objective: str rationale: str scope: dict requirements: list test_procedure: list evidence: list severity: str risk_score: Optional[float] = None implementation_effort: Optional[str] = None evidence_confidence: Optional[float] = None open_anchors: list release_state: str tags: list license_rule: Optional[int] = None source_original_text: Optional[str] = None source_citation: Optional[dict] = None customer_visible: Optional[bool] = None verification_method: Optional[str] = None category: Optional[str] = None target_audience: Optional[str] = None generation_metadata: Optional[dict] = None created_at: str updated_at: str class ControlCreateRequest(BaseModel): framework_id: str # e.g. 'bp_security_v1' control_id: str # e.g. 'AUTH-003' title: str objective: str rationale: str scope: dict = {} requirements: list = [] test_procedure: list = [] evidence: list = [] severity: str = "medium" risk_score: Optional[float] = None implementation_effort: Optional[str] = None evidence_confidence: Optional[float] = None open_anchors: list = [] release_state: str = "draft" tags: list = [] license_rule: Optional[int] = None source_original_text: Optional[str] = None source_citation: Optional[dict] = None customer_visible: Optional[bool] = True verification_method: Optional[str] = None category: Optional[str] = None target_audience: Optional[str] = None generation_metadata: Optional[dict] = None class ControlUpdateRequest(BaseModel): title: Optional[str] = None objective: Optional[str] = None rationale: Optional[str] = None scope: Optional[dict] = None requirements: Optional[list] = None test_procedure: Optional[list] = None evidence: Optional[list] = None severity: Optional[str] = None risk_score: Optional[float] = None implementation_effort: Optional[str] = None evidence_confidence: Optional[float] = None open_anchors: Optional[list] = None release_state: Optional[str] = None tags: Optional[list] = None license_rule: Optional[int] = None source_original_text: Optional[str] = None source_citation: Optional[dict] = None customer_visible: Optional[bool] = None verification_method: Optional[str] = None category: Optional[str] = None target_audience: Optional[str] = None generation_metadata: Optional[dict] = None class SimilarityCheckRequest(BaseModel): source_text: str candidate_text: str class SimilarityCheckResponse(BaseModel): max_exact_run: int token_overlap: float ngram_jaccard: float embedding_cosine: float lcs_ratio: float status: str details: dict # ============================================================================= # HELPERS # ============================================================================= _CONTROL_COLS = """id, framework_id, control_id, title, objective, rationale, scope, requirements, test_procedure, evidence, severity, risk_score, implementation_effort, evidence_confidence, open_anchors, release_state, tags, license_rule, source_original_text, source_citation, customer_visible, verification_method, category, target_audience, generation_metadata, created_at, updated_at""" def _row_to_dict(row, columns: list[str]) -> dict[str, Any]: """Generic row → dict converter.""" return {col: (getattr(row, col).isoformat() if hasattr(getattr(row, col, None), 'isoformat') else getattr(row, col)) for col in columns} # ============================================================================= # FRAMEWORKS # ============================================================================= @router.get("/frameworks") async def list_frameworks(): """List all registered control frameworks.""" with SessionLocal() as db: rows = db.execute( text(""" SELECT id, framework_id, name, version, description, owner, policy_version, release_state, created_at, updated_at FROM canonical_control_frameworks ORDER BY name """) ).fetchall() return [ { "id": str(r.id), "framework_id": r.framework_id, "name": r.name, "version": r.version, "description": r.description, "owner": r.owner, "policy_version": r.policy_version, "release_state": r.release_state, "created_at": r.created_at.isoformat() if r.created_at else None, "updated_at": r.updated_at.isoformat() if r.updated_at else None, } for r in rows ] @router.get("/frameworks/{framework_id}") async def get_framework(framework_id: str): """Get a single framework by its framework_id.""" with SessionLocal() as db: row = db.execute( text(""" SELECT id, framework_id, name, version, description, owner, policy_version, release_state, created_at, updated_at FROM canonical_control_frameworks WHERE framework_id = :fid """), {"fid": framework_id}, ).fetchone() if not row: raise HTTPException(status_code=404, detail="Framework not found") return { "id": str(row.id), "framework_id": row.framework_id, "name": row.name, "version": row.version, "description": row.description, "owner": row.owner, "policy_version": row.policy_version, "release_state": row.release_state, "created_at": row.created_at.isoformat() if row.created_at else None, "updated_at": row.updated_at.isoformat() if row.updated_at else None, } @router.get("/frameworks/{framework_id}/controls") async def list_framework_controls( framework_id: str, severity: Optional[str] = Query(None), release_state: Optional[str] = Query(None), verification_method: Optional[str] = Query(None), category: Optional[str] = Query(None), target_audience: Optional[str] = Query(None), ): """List controls belonging to a framework.""" with SessionLocal() as db: # Resolve framework UUID fw = db.execute( text("SELECT id FROM canonical_control_frameworks WHERE framework_id = :fid"), {"fid": framework_id}, ).fetchone() if not fw: raise HTTPException(status_code=404, detail="Framework not found") query = f""" SELECT {_CONTROL_COLS} FROM canonical_controls WHERE framework_id = :fw_id """ params: dict[str, Any] = {"fw_id": str(fw.id)} if severity: query += " AND severity = :sev" params["sev"] = severity if release_state: query += " AND release_state = :rs" params["rs"] = release_state if verification_method: query += " AND verification_method = :vm" params["vm"] = verification_method if category: query += " AND category = :cat" params["cat"] = category if target_audience: query += " AND target_audience = :ta" params["ta"] = target_audience query += " ORDER BY control_id" rows = db.execute(text(query), params).fetchall() return [_control_row(r) for r in rows] # ============================================================================= # CONTROLS # ============================================================================= @router.get("/controls") async def list_controls( severity: Optional[str] = Query(None), domain: Optional[str] = Query(None), release_state: Optional[str] = Query(None), verification_method: Optional[str] = Query(None), category: Optional[str] = Query(None), target_audience: Optional[str] = Query(None), ): """List all canonical controls, with optional filters.""" query = f""" SELECT {_CONTROL_COLS} FROM canonical_controls WHERE 1=1 """ params: dict[str, Any] = {} if severity: query += " AND severity = :sev" params["sev"] = severity if domain: query += " AND LEFT(control_id, LENGTH(:dom)) = :dom" params["dom"] = domain.upper() if release_state: query += " AND release_state = :rs" params["rs"] = release_state if verification_method: query += " AND verification_method = :vm" params["vm"] = verification_method if category: query += " AND category = :cat" params["cat"] = category if target_audience: query += " AND target_audience = :ta" params["ta"] = target_audience query += " ORDER BY control_id" with SessionLocal() as db: rows = db.execute(text(query), params).fetchall() return [_control_row(r) for r in rows] @router.get("/controls/{control_id}") async def get_control(control_id: str): """Get a single canonical control by its control_id (e.g. AUTH-001).""" with SessionLocal() as db: row = db.execute( text(f""" SELECT {_CONTROL_COLS} FROM canonical_controls WHERE control_id = :cid """), {"cid": control_id.upper()}, ).fetchone() if not row: raise HTTPException(status_code=404, detail="Control not found") return _control_row(row) # ============================================================================= # CONTROL CRUD (CREATE / UPDATE / DELETE) # ============================================================================= @router.post("/controls", status_code=201) async def create_control(body: ControlCreateRequest): """Create a new canonical control.""" import json as _json import re # Validate control_id format if not re.match(r"^[A-Z]{2,6}-[0-9]{3}$", body.control_id): raise HTTPException(status_code=400, detail="control_id must match DOMAIN-NNN (e.g. AUTH-001)") if body.severity not in ("low", "medium", "high", "critical"): raise HTTPException(status_code=400, detail="severity must be low/medium/high/critical") if body.risk_score is not None and not (0 <= body.risk_score <= 10): raise HTTPException(status_code=400, detail="risk_score must be 0..10") with SessionLocal() as db: # Resolve framework fw = db.execute( text("SELECT id FROM canonical_control_frameworks WHERE framework_id = :fid"), {"fid": body.framework_id}, ).fetchone() if not fw: raise HTTPException(status_code=404, detail=f"Framework '{body.framework_id}' not found") # Check duplicate existing = db.execute( text("SELECT id FROM canonical_controls WHERE framework_id = :fid AND control_id = :cid"), {"fid": str(fw.id), "cid": body.control_id}, ).fetchone() if existing: raise HTTPException(status_code=409, detail=f"Control '{body.control_id}' already exists") row = db.execute( text(f""" INSERT INTO canonical_controls ( framework_id, control_id, title, objective, rationale, scope, requirements, test_procedure, evidence, severity, risk_score, implementation_effort, evidence_confidence, open_anchors, release_state, tags, license_rule, source_original_text, source_citation, customer_visible, verification_method, category, target_audience, generation_metadata ) VALUES ( :fw_id, :cid, :title, :objective, :rationale, CAST(:scope AS jsonb), CAST(:requirements AS jsonb), CAST(:test_procedure AS jsonb), CAST(:evidence AS jsonb), :severity, :risk_score, :effort, :confidence, CAST(:anchors AS jsonb), :release_state, CAST(:tags AS jsonb), :license_rule, :source_original_text, CAST(:source_citation AS jsonb), :customer_visible, :verification_method, :category, :target_audience, CAST(:generation_metadata AS jsonb) ) RETURNING {_CONTROL_COLS} """), { "fw_id": str(fw.id), "cid": body.control_id, "title": body.title, "objective": body.objective, "rationale": body.rationale, "scope": _json.dumps(body.scope), "requirements": _json.dumps(body.requirements), "test_procedure": _json.dumps(body.test_procedure), "evidence": _json.dumps(body.evidence), "severity": body.severity, "risk_score": body.risk_score, "effort": body.implementation_effort, "confidence": body.evidence_confidence, "anchors": _json.dumps(body.open_anchors), "release_state": body.release_state, "tags": _json.dumps(body.tags), "license_rule": body.license_rule, "source_original_text": body.source_original_text, "source_citation": _json.dumps(body.source_citation) if body.source_citation else None, "customer_visible": body.customer_visible, "verification_method": body.verification_method, "category": body.category, "target_audience": body.target_audience, "generation_metadata": _json.dumps(body.generation_metadata) if body.generation_metadata else None, }, ).fetchone() db.commit() return _control_row(row) @router.put("/controls/{control_id}") async def update_control(control_id: str, body: ControlUpdateRequest): """Update an existing canonical control (partial update).""" import json as _json updates = body.dict(exclude_none=True) if not updates: raise HTTPException(status_code=400, detail="No fields to update") if "severity" in updates and updates["severity"] not in ("low", "medium", "high", "critical"): raise HTTPException(status_code=400, detail="severity must be low/medium/high/critical") if "risk_score" in updates and updates["risk_score"] is not None and not (0 <= updates["risk_score"] <= 10): raise HTTPException(status_code=400, detail="risk_score must be 0..10") # Build dynamic SET clause set_parts = [] params: dict[str, Any] = {"cid": control_id.upper()} json_fields = {"scope", "requirements", "test_procedure", "evidence", "open_anchors", "tags", "source_citation", "generation_metadata"} for key, val in updates.items(): col = key if key in json_fields: set_parts.append(f"{col} = CAST(:{key} AS jsonb)") params[key] = _json.dumps(val) else: set_parts.append(f"{col} = :{key}") params[key] = val set_parts.append("updated_at = NOW()") with SessionLocal() as db: row = db.execute( text(f""" UPDATE canonical_controls SET {', '.join(set_parts)} WHERE control_id = :cid RETURNING {_CONTROL_COLS} """), params, ).fetchone() if not row: raise HTTPException(status_code=404, detail="Control not found") db.commit() return _control_row(row) @router.delete("/controls/{control_id}", status_code=204) async def delete_control(control_id: str): """Delete a canonical control.""" with SessionLocal() as db: result = db.execute( text("DELETE FROM canonical_controls WHERE control_id = :cid"), {"cid": control_id.upper()}, ) if result.rowcount == 0: raise HTTPException(status_code=404, detail="Control not found") db.commit() return None # ============================================================================= # SIMILARITY CHECK # ============================================================================= @router.post("/controls/{control_id}/similarity-check") async def similarity_check(control_id: str, body: SimilarityCheckRequest): """Run the too-close detector against a source/candidate text pair.""" report = await check_similarity(body.source_text, body.candidate_text) return { "control_id": control_id.upper(), "max_exact_run": report.max_exact_run, "token_overlap": report.token_overlap, "ngram_jaccard": report.ngram_jaccard, "embedding_cosine": report.embedding_cosine, "lcs_ratio": report.lcs_ratio, "status": report.status, "details": report.details, } # ============================================================================= # CATEGORIES # ============================================================================= @router.get("/categories") async def list_categories(): """List all canonical control categories.""" with SessionLocal() as db: rows = db.execute( text("SELECT category_id, label_de, label_en, sort_order FROM canonical_control_categories ORDER BY sort_order") ).fetchall() return [ { "category_id": r.category_id, "label_de": r.label_de, "label_en": r.label_en, "sort_order": r.sort_order, } for r in rows ] # ============================================================================= # SIMILAR CONTROLS (Embedding-based dedup) # ============================================================================= @router.get("/controls/{control_id}/similar") async def find_similar_controls( control_id: str, threshold: float = Query(0.85, ge=0.5, le=1.0), limit: int = Query(20, ge=1, le=100), ): """Find controls similar to the given one using embedding cosine similarity.""" with SessionLocal() as db: # Get the target control's embedding target = db.execute( text(""" SELECT id, control_id, title, objective FROM canonical_controls WHERE control_id = :cid """), {"cid": control_id.upper()}, ).fetchone() if not target: raise HTTPException(status_code=404, detail="Control not found") # Find similar controls using pg_vector cosine distance if available, # otherwise fall back to text-based matching via objective similarity try: rows = db.execute( text(""" SELECT c.control_id, c.title, c.severity, c.release_state, c.tags, c.license_rule, c.verification_method, c.category, 1 - (c.embedding <=> t.embedding) AS similarity FROM canonical_controls c, canonical_controls t WHERE t.control_id = :cid AND c.control_id != :cid AND c.release_state != 'deprecated' AND c.embedding IS NOT NULL AND t.embedding IS NOT NULL AND 1 - (c.embedding <=> t.embedding) >= :threshold ORDER BY similarity DESC LIMIT :lim """), {"cid": control_id.upper(), "threshold": threshold, "lim": limit}, ).fetchall() return [ { "control_id": r.control_id, "title": r.title, "severity": r.severity, "release_state": r.release_state, "tags": r.tags or [], "license_rule": r.license_rule, "verification_method": r.verification_method, "category": r.category, "similarity": round(float(r.similarity), 4), } for r in rows ] except Exception as e: logger.warning("Embedding similarity query failed (no embedding column?): %s", e) return [] # ============================================================================= # SOURCES & LICENSES # ============================================================================= @router.get("/sources") async def list_sources(): """List all registered sources with permission flags.""" with SessionLocal() as db: return get_source_permissions(db) @router.get("/licenses") async def list_licenses(): """Return the license matrix.""" with SessionLocal() as db: return get_license_matrix(db) # ============================================================================= # INTERNAL HELPERS # ============================================================================= def _control_row(r) -> dict: return { "id": str(r.id), "framework_id": str(r.framework_id), "control_id": r.control_id, "title": r.title, "objective": r.objective, "rationale": r.rationale, "scope": r.scope, "requirements": r.requirements, "test_procedure": r.test_procedure, "evidence": r.evidence, "severity": r.severity, "risk_score": float(r.risk_score) if r.risk_score is not None else None, "implementation_effort": r.implementation_effort, "evidence_confidence": float(r.evidence_confidence) if r.evidence_confidence is not None else None, "open_anchors": r.open_anchors, "release_state": r.release_state, "tags": r.tags or [], "license_rule": r.license_rule, "source_original_text": r.source_original_text, "source_citation": r.source_citation, "customer_visible": r.customer_visible, "verification_method": r.verification_method, "category": r.category, "target_audience": r.target_audience, "generation_metadata": r.generation_metadata, "created_at": r.created_at.isoformat() if r.created_at else None, "updated_at": r.updated_at.isoformat() if r.updated_at else None, }