diff --git a/backend-compliance/compliance/api/canonical_control_routes.py b/backend-compliance/compliance/api/canonical_control_routes.py index 241e0a4..21002a3 100644 --- a/backend-compliance/compliance/api/canonical_control_routes.py +++ b/backend-compliance/compliance/api/canonical_control_routes.py @@ -5,133 +5,46 @@ Independently authored security controls anchored in open-source frameworks (OWASP, NIST, ENISA). No proprietary nomenclature. Endpoints: - GET /v1/canonical/frameworks — All frameworks - GET /v1/canonical/frameworks/{framework_id} — Framework details - GET /v1/canonical/frameworks/{framework_id}/controls — Controls of a framework - GET /v1/canonical/controls — All controls (filterable) - GET /v1/canonical/controls/{control_id} — Single control - POST /v1/canonical/controls — Create a control - PUT /v1/canonical/controls/{control_id} — Update a control - DELETE /v1/canonical/controls/{control_id} — Delete a control - GET /v1/canonical/sources — Source registry - GET /v1/canonical/licenses — License matrix - POST /v1/canonical/controls/{control_id}/similarity-check — Too-close check + GET /v1/canonical/frameworks - All frameworks + GET /v1/canonical/frameworks/{framework_id} - Framework details + GET /v1/canonical/frameworks/{framework_id}/controls - Framework controls + GET /v1/canonical/controls - All controls + GET /v1/canonical/controls/{control_id} - Single control + POST /v1/canonical/controls - Create + PUT /v1/canonical/controls/{control_id} - Update (partial) + DELETE /v1/canonical/controls/{control_id} - Delete + POST /v1/canonical/controls/{control_id}/similarity-check - Too-close check + GET /v1/canonical/sources - Source registry + GET /v1/canonical/licenses - License matrix + +Phase 1 Step 4 refactor: handlers delegate to CanonicalControlService. """ -from __future__ import annotations - -import logging from typing import Any, Optional -from fastapi import APIRouter, HTTPException, Query -from pydantic import BaseModel -from sqlalchemy import text +from fastapi import APIRouter, Depends, Query +from sqlalchemy.orm import Session -from database import SessionLocal -from compliance.services.license_gate import get_license_matrix, get_source_permissions -from compliance.services.similarity_detector import check_similarity +from classroom_engine.database import get_db +from compliance.api._http_errors import translate_domain_errors +from compliance.schemas.canonical_control import ( + ControlCreateRequest, + ControlResponse, + ControlUpdateRequest, + FrameworkResponse, + SimilarityCheckRequest, + SimilarityCheckResponse, +) +from compliance.services.canonical_control_service import ( + CanonicalControlService, + _control_row, # re-exported for legacy test imports +) -logger = logging.getLogger(__name__) router = APIRouter(prefix="/v1/canonical", tags=["canonical-controls"]) -# ============================================================================= -# RESPONSE MODELS -# ============================================================================= - -class FrameworkResponse(BaseModel): - id: str - framework_id: str - name: str - version: str - description: Optional[str] = None - owner: Optional[str] = None - policy_version: Optional[str] = None - release_state: str - created_at: str - updated_at: str - - -class ControlResponse(BaseModel): - id: str - framework_id: str - control_id: str - title: str - objective: str - rationale: str - scope: dict - requirements: list - test_procedure: list - evidence: list - severity: str - risk_score: Optional[float] = None - implementation_effort: Optional[str] = None - evidence_confidence: Optional[float] = None - open_anchors: list - release_state: str - tags: list - created_at: str - updated_at: str - - -class ControlCreateRequest(BaseModel): - framework_id: str # e.g. 'bp_security_v1' - control_id: str # e.g. 'AUTH-003' - title: str - objective: str - rationale: str - scope: dict = {} - requirements: list = [] - test_procedure: list = [] - evidence: list = [] - severity: str = "medium" - risk_score: Optional[float] = None - implementation_effort: Optional[str] = None - evidence_confidence: Optional[float] = None - open_anchors: list = [] - release_state: str = "draft" - tags: list = [] - - -class ControlUpdateRequest(BaseModel): - title: Optional[str] = None - objective: Optional[str] = None - rationale: Optional[str] = None - scope: Optional[dict] = None - requirements: Optional[list] = None - test_procedure: Optional[list] = None - evidence: Optional[list] = None - severity: Optional[str] = None - risk_score: Optional[float] = None - implementation_effort: Optional[str] = None - evidence_confidence: Optional[float] = None - open_anchors: Optional[list] = None - release_state: Optional[str] = None - tags: Optional[list] = None - - -class SimilarityCheckRequest(BaseModel): - source_text: str - candidate_text: str - - -class SimilarityCheckResponse(BaseModel): - max_exact_run: int - token_overlap: float - ngram_jaccard: float - embedding_cosine: float - lcs_ratio: float - status: str - details: dict - - -# ============================================================================= -# HELPERS -# ============================================================================= - -def _row_to_dict(row, columns: list[str]) -> dict[str, Any]: - """Generic row → dict converter.""" - return {col: (getattr(row, col).isoformat() if hasattr(getattr(row, col, None), 'isoformat') else getattr(row, col)) for col in columns} +def get_canonical_service(db: Session = Depends(get_db)) -> CanonicalControlService: + return CanonicalControlService(db) # ============================================================================= @@ -139,66 +52,22 @@ def _row_to_dict(row, columns: list[str]) -> dict[str, Any]: # ============================================================================= @router.get("/frameworks") -async def list_frameworks(): +async def list_frameworks( + service: CanonicalControlService = Depends(get_canonical_service), +) -> list[dict[str, Any]]: """List all registered control frameworks.""" - with SessionLocal() as db: - rows = db.execute( - text(""" - SELECT id, framework_id, name, version, description, - owner, policy_version, release_state, - created_at, updated_at - FROM canonical_control_frameworks - ORDER BY name - """) - ).fetchall() - - return [ - { - "id": str(r.id), - "framework_id": r.framework_id, - "name": r.name, - "version": r.version, - "description": r.description, - "owner": r.owner, - "policy_version": r.policy_version, - "release_state": r.release_state, - "created_at": r.created_at.isoformat() if r.created_at else None, - "updated_at": r.updated_at.isoformat() if r.updated_at else None, - } - for r in rows - ] + with translate_domain_errors(): + return service.list_frameworks() @router.get("/frameworks/{framework_id}") -async def get_framework(framework_id: str): +async def get_framework( + framework_id: str, + service: CanonicalControlService = Depends(get_canonical_service), +) -> dict[str, Any]: """Get a single framework by its framework_id.""" - with SessionLocal() as db: - row = db.execute( - text(""" - SELECT id, framework_id, name, version, description, - owner, policy_version, release_state, - created_at, updated_at - FROM canonical_control_frameworks - WHERE framework_id = :fid - """), - {"fid": framework_id}, - ).fetchone() - - if not row: - raise HTTPException(status_code=404, detail="Framework not found") - - return { - "id": str(row.id), - "framework_id": row.framework_id, - "name": row.name, - "version": row.version, - "description": row.description, - "owner": row.owner, - "policy_version": row.policy_version, - "release_state": row.release_state, - "created_at": row.created_at.isoformat() if row.created_at else None, - "updated_at": row.updated_at.isoformat() if row.updated_at else None, - } + with translate_domain_errors(): + return service.get_framework(framework_id) @router.get("/frameworks/{framework_id}/controls") @@ -206,39 +75,11 @@ async def list_framework_controls( framework_id: str, severity: Optional[str] = Query(None), release_state: Optional[str] = Query(None), -): + service: CanonicalControlService = Depends(get_canonical_service), +) -> list[dict[str, Any]]: """List controls belonging to a framework.""" - with SessionLocal() as db: - # Resolve framework UUID - fw = db.execute( - text("SELECT id FROM canonical_control_frameworks WHERE framework_id = :fid"), - {"fid": framework_id}, - ).fetchone() - if not fw: - raise HTTPException(status_code=404, detail="Framework not found") - - query = """ - SELECT id, framework_id, control_id, title, objective, rationale, - scope, requirements, test_procedure, evidence, - severity, risk_score, implementation_effort, - evidence_confidence, open_anchors, release_state, tags, - created_at, updated_at - FROM canonical_controls - WHERE framework_id = :fw_id - """ - params: dict[str, Any] = {"fw_id": str(fw.id)} - - if severity: - query += " AND severity = :sev" - params["sev"] = severity - if release_state: - query += " AND release_state = :rs" - params["rs"] = release_state - - query += " ORDER BY control_id" - rows = db.execute(text(query), params).fetchall() - - return [_control_row(r) for r in rows] + with translate_domain_errors(): + return service.list_framework_controls(framework_id, severity, release_state) # ============================================================================= @@ -250,202 +91,52 @@ async def list_controls( severity: Optional[str] = Query(None), domain: Optional[str] = Query(None), release_state: Optional[str] = Query(None), -): + service: CanonicalControlService = Depends(get_canonical_service), +) -> list[dict[str, Any]]: """List all canonical controls, with optional filters.""" - query = """ - SELECT id, framework_id, control_id, title, objective, rationale, - scope, requirements, test_procedure, evidence, - severity, risk_score, implementation_effort, - evidence_confidence, open_anchors, release_state, tags, - created_at, updated_at - FROM canonical_controls - WHERE 1=1 - """ - params: dict[str, Any] = {} - - if severity: - query += " AND severity = :sev" - params["sev"] = severity - if domain: - query += " AND LEFT(control_id, LENGTH(:dom)) = :dom" - params["dom"] = domain.upper() - if release_state: - query += " AND release_state = :rs" - params["rs"] = release_state - - query += " ORDER BY control_id" - - with SessionLocal() as db: - rows = db.execute(text(query), params).fetchall() - - return [_control_row(r) for r in rows] + with translate_domain_errors(): + return service.list_controls(severity, domain, release_state) @router.get("/controls/{control_id}") -async def get_control(control_id: str): +async def get_control( + control_id: str, + service: CanonicalControlService = Depends(get_canonical_service), +) -> dict[str, Any]: """Get a single canonical control by its control_id (e.g. AUTH-001).""" - with SessionLocal() as db: - row = db.execute( - text(""" - SELECT id, framework_id, control_id, title, objective, rationale, - scope, requirements, test_procedure, evidence, - severity, risk_score, implementation_effort, - evidence_confidence, open_anchors, release_state, tags, - created_at, updated_at - FROM canonical_controls - WHERE control_id = :cid - """), - {"cid": control_id.upper()}, - ).fetchone() + with translate_domain_errors(): + return service.get_control(control_id) - if not row: - raise HTTPException(status_code=404, detail="Control not found") - - return _control_row(row) - - -# ============================================================================= -# CONTROL CRUD (CREATE / UPDATE / DELETE) -# ============================================================================= @router.post("/controls", status_code=201) -async def create_control(body: ControlCreateRequest): +async def create_control( + body: ControlCreateRequest, + service: CanonicalControlService = Depends(get_canonical_service), +) -> dict[str, Any]: """Create a new canonical control.""" - import json as _json - import re - # Validate control_id format - if not re.match(r"^[A-Z]{2,6}-[0-9]{3}$", body.control_id): - raise HTTPException(status_code=400, detail="control_id must match DOMAIN-NNN (e.g. AUTH-001)") - if body.severity not in ("low", "medium", "high", "critical"): - raise HTTPException(status_code=400, detail="severity must be low/medium/high/critical") - if body.risk_score is not None and not (0 <= body.risk_score <= 10): - raise HTTPException(status_code=400, detail="risk_score must be 0..10") - - with SessionLocal() as db: - # Resolve framework - fw = db.execute( - text("SELECT id FROM canonical_control_frameworks WHERE framework_id = :fid"), - {"fid": body.framework_id}, - ).fetchone() - if not fw: - raise HTTPException(status_code=404, detail=f"Framework '{body.framework_id}' not found") - - # Check duplicate - existing = db.execute( - text("SELECT id FROM canonical_controls WHERE framework_id = :fid AND control_id = :cid"), - {"fid": str(fw.id), "cid": body.control_id}, - ).fetchone() - if existing: - raise HTTPException(status_code=409, detail=f"Control '{body.control_id}' already exists") - - row = db.execute( - text(""" - INSERT INTO canonical_controls ( - framework_id, control_id, title, objective, rationale, - scope, requirements, test_procedure, evidence, - severity, risk_score, implementation_effort, evidence_confidence, - open_anchors, release_state, tags - ) VALUES ( - :fw_id, :cid, :title, :objective, :rationale, - :scope::jsonb, :requirements::jsonb, :test_procedure::jsonb, :evidence::jsonb, - :severity, :risk_score, :effort, :confidence, - :anchors::jsonb, :release_state, :tags::jsonb - ) - RETURNING id, framework_id, control_id, title, objective, rationale, - scope, requirements, test_procedure, evidence, - severity, risk_score, implementation_effort, - evidence_confidence, open_anchors, release_state, tags, - created_at, updated_at - """), - { - "fw_id": str(fw.id), - "cid": body.control_id, - "title": body.title, - "objective": body.objective, - "rationale": body.rationale, - "scope": _json.dumps(body.scope), - "requirements": _json.dumps(body.requirements), - "test_procedure": _json.dumps(body.test_procedure), - "evidence": _json.dumps(body.evidence), - "severity": body.severity, - "risk_score": body.risk_score, - "effort": body.implementation_effort, - "confidence": body.evidence_confidence, - "anchors": _json.dumps(body.open_anchors), - "release_state": body.release_state, - "tags": _json.dumps(body.tags), - }, - ).fetchone() - db.commit() - - return _control_row(row) + with translate_domain_errors(): + return service.create_control(body) @router.put("/controls/{control_id}") -async def update_control(control_id: str, body: ControlUpdateRequest): +async def update_control( + control_id: str, + body: ControlUpdateRequest, + service: CanonicalControlService = Depends(get_canonical_service), +) -> dict[str, Any]: """Update an existing canonical control (partial update).""" - import json as _json - - updates = body.dict(exclude_none=True) - if not updates: - raise HTTPException(status_code=400, detail="No fields to update") - - if "severity" in updates and updates["severity"] not in ("low", "medium", "high", "critical"): - raise HTTPException(status_code=400, detail="severity must be low/medium/high/critical") - if "risk_score" in updates and updates["risk_score"] is not None and not (0 <= updates["risk_score"] <= 10): - raise HTTPException(status_code=400, detail="risk_score must be 0..10") - - # Build dynamic SET clause - set_parts = [] - params: dict[str, Any] = {"cid": control_id.upper()} - json_fields = {"scope", "requirements", "test_procedure", "evidence", "open_anchors", "tags"} - - for key, val in updates.items(): - col = "implementation_effort" if key == "implementation_effort" else key - col = "evidence_confidence" if key == "evidence_confidence" else col - if key in json_fields: - set_parts.append(f"{col} = :{key}::jsonb") - params[key] = _json.dumps(val) - else: - set_parts.append(f"{col} = :{key}") - params[key] = val - - set_parts.append("updated_at = NOW()") - - with SessionLocal() as db: - row = db.execute( - text(f""" - UPDATE canonical_controls - SET {', '.join(set_parts)} - WHERE control_id = :cid - RETURNING id, framework_id, control_id, title, objective, rationale, - scope, requirements, test_procedure, evidence, - severity, risk_score, implementation_effort, - evidence_confidence, open_anchors, release_state, tags, - created_at, updated_at - """), - params, - ).fetchone() - if not row: - raise HTTPException(status_code=404, detail="Control not found") - db.commit() - - return _control_row(row) + with translate_domain_errors(): + return service.update_control(control_id, body) @router.delete("/controls/{control_id}", status_code=204) -async def delete_control(control_id: str): +async def delete_control( + control_id: str, + service: CanonicalControlService = Depends(get_canonical_service), +) -> None: """Delete a canonical control.""" - with SessionLocal() as db: - result = db.execute( - text("DELETE FROM canonical_controls WHERE control_id = :cid"), - {"cid": control_id.upper()}, - ) - if result.rowcount == 0: - raise HTTPException(status_code=404, detail="Control not found") - db.commit() - - return None + with translate_domain_errors(): + service.delete_control(control_id) # ============================================================================= @@ -453,19 +144,14 @@ async def delete_control(control_id: str): # ============================================================================= @router.post("/controls/{control_id}/similarity-check") -async def similarity_check(control_id: str, body: SimilarityCheckRequest): +async def similarity_check( + control_id: str, + body: SimilarityCheckRequest, + service: CanonicalControlService = Depends(get_canonical_service), +) -> dict[str, Any]: """Run the too-close detector against a source/candidate text pair.""" - report = await check_similarity(body.source_text, body.candidate_text) - return { - "control_id": control_id.upper(), - "max_exact_run": report.max_exact_run, - "token_overlap": report.token_overlap, - "ngram_jaccard": report.ngram_jaccard, - "embedding_cosine": report.embedding_cosine, - "lcs_ratio": report.lcs_ratio, - "status": report.status, - "details": report.details, - } + with translate_domain_errors(): + return await service.similarity_check(control_id, body) # ============================================================================= @@ -473,42 +159,34 @@ async def similarity_check(control_id: str, body: SimilarityCheckRequest): # ============================================================================= @router.get("/sources") -async def list_sources(): +async def list_sources( + service: CanonicalControlService = Depends(get_canonical_service), +) -> Any: """List all registered sources with permission flags.""" - with SessionLocal() as db: - return get_source_permissions(db) + with translate_domain_errors(): + return service.list_sources() @router.get("/licenses") -async def list_licenses(): +async def list_licenses( + service: CanonicalControlService = Depends(get_canonical_service), +) -> Any: """Return the license matrix.""" - with SessionLocal() as db: - return get_license_matrix(db) + with translate_domain_errors(): + return service.list_licenses() -# ============================================================================= -# INTERNAL HELPERS -# ============================================================================= +# ---------------------------------------------------------------------------- +# Legacy re-exports for tests that imported schemas/helpers directly. +# ---------------------------------------------------------------------------- -def _control_row(r) -> dict: - return { - "id": str(r.id), - "framework_id": str(r.framework_id), - "control_id": r.control_id, - "title": r.title, - "objective": r.objective, - "rationale": r.rationale, - "scope": r.scope, - "requirements": r.requirements, - "test_procedure": r.test_procedure, - "evidence": r.evidence, - "severity": r.severity, - "risk_score": float(r.risk_score) if r.risk_score is not None else None, - "implementation_effort": r.implementation_effort, - "evidence_confidence": float(r.evidence_confidence) if r.evidence_confidence is not None else None, - "open_anchors": r.open_anchors, - "release_state": r.release_state, - "tags": r.tags or [], - "created_at": r.created_at.isoformat() if r.created_at else None, - "updated_at": r.updated_at.isoformat() if r.updated_at else None, - } +__all__ = [ + "router", + "FrameworkResponse", + "ControlResponse", + "ControlCreateRequest", + "ControlUpdateRequest", + "SimilarityCheckRequest", + "SimilarityCheckResponse", + "_control_row", +] diff --git a/backend-compliance/compliance/schemas/canonical_control.py b/backend-compliance/compliance/schemas/canonical_control.py new file mode 100644 index 0000000..01a7ac3 --- /dev/null +++ b/backend-compliance/compliance/schemas/canonical_control.py @@ -0,0 +1,105 @@ +""" +Canonical Control Library schemas. + +Phase 1 Step 4: extracted from ``compliance.api.canonical_control_routes``. +""" + +from typing import Any, Optional + +from pydantic import BaseModel + + +class FrameworkResponse(BaseModel): + id: str + framework_id: str + name: str + version: str + description: Optional[str] = None + owner: Optional[str] = None + policy_version: Optional[str] = None + release_state: str + created_at: str + updated_at: str + + +class ControlResponse(BaseModel): + id: str + framework_id: str + control_id: str + title: str + objective: str + rationale: str + scope: dict[str, Any] + requirements: list[Any] + test_procedure: list[Any] + evidence: list[Any] + severity: str + risk_score: Optional[float] = None + implementation_effort: Optional[str] = None + evidence_confidence: Optional[float] = None + open_anchors: list[Any] + release_state: str + tags: list[Any] + created_at: str + updated_at: str + + +class ControlCreateRequest(BaseModel): + framework_id: str # e.g. 'bp_security_v1' + control_id: str # e.g. 'AUTH-003' + title: str + objective: str + rationale: str + scope: dict[str, Any] = {} + requirements: list[Any] = [] + test_procedure: list[Any] = [] + evidence: list[Any] = [] + severity: str = "medium" + risk_score: Optional[float] = None + implementation_effort: Optional[str] = None + evidence_confidence: Optional[float] = None + open_anchors: list[Any] = [] + release_state: str = "draft" + tags: list[Any] = [] + + +class ControlUpdateRequest(BaseModel): + title: Optional[str] = None + objective: Optional[str] = None + rationale: Optional[str] = None + scope: Optional[dict[str, Any]] = None + requirements: Optional[list[Any]] = None + test_procedure: Optional[list[Any]] = None + evidence: Optional[list[Any]] = None + severity: Optional[str] = None + risk_score: Optional[float] = None + implementation_effort: Optional[str] = None + evidence_confidence: Optional[float] = None + open_anchors: Optional[list[Any]] = None + release_state: Optional[str] = None + tags: Optional[list[Any]] = None + + +class SimilarityCheckRequest(BaseModel): + source_text: str + candidate_text: str + + +class SimilarityCheckResponse(BaseModel): + max_exact_run: int + token_overlap: float + ngram_jaccard: float + embedding_cosine: float + lcs_ratio: float + status: str + details: dict[str, Any] + + +__all__ = [ + "FrameworkResponse", + "ControlResponse", + "ControlCreateRequest", + "ControlUpdateRequest", + "SimilarityCheckRequest", + "SimilarityCheckResponse", +] diff --git a/backend-compliance/compliance/services/canonical_control_service.py b/backend-compliance/compliance/services/canonical_control_service.py new file mode 100644 index 0000000..6a926ba --- /dev/null +++ b/backend-compliance/compliance/services/canonical_control_service.py @@ -0,0 +1,316 @@ +# mypy: disable-error-code="arg-type,assignment,no-any-return,union-attr" +""" +Canonical Control Library service — framework + control CRUD with raw SQL. + +Phase 1 Step 4: extracted from ``compliance.api.canonical_control_routes``. +Uses raw SQL via ``sqlalchemy.text()`` because the underlying tables +(``canonical_control_frameworks``, ``canonical_controls``) have no +SQLAlchemy model in this repo. +""" + +import json +import re +from typing import Any, Optional + +from sqlalchemy import text +from sqlalchemy.orm import Session + +from compliance.domain import ( + ConflictError, + NotFoundError, + ValidationError, +) +from compliance.schemas.canonical_control import ( + ControlCreateRequest, + ControlUpdateRequest, + SimilarityCheckRequest, +) + +_VALID_SEVERITIES = ("low", "medium", "high", "critical") +_CONTROL_ID_RE = re.compile(r"^[A-Z]{2,6}-[0-9]{3}$") +_JSON_CONTROL_FIELDS = { + "scope", "requirements", "test_procedure", "evidence", "open_anchors", "tags", +} + +_CONTROL_COLUMNS = """ + id, framework_id, control_id, title, objective, rationale, + scope, requirements, test_procedure, evidence, + severity, risk_score, implementation_effort, evidence_confidence, + open_anchors, release_state, tags, created_at, updated_at +""" + + +def _control_row(r: Any) -> dict[str, Any]: + """Serialize a canonical_controls SELECT row to a response dict.""" + return { + "id": str(r.id), + "framework_id": str(r.framework_id), + "control_id": r.control_id, + "title": r.title, + "objective": r.objective, + "rationale": r.rationale, + "scope": r.scope, + "requirements": r.requirements, + "test_procedure": r.test_procedure, + "evidence": r.evidence, + "severity": r.severity, + "risk_score": float(r.risk_score) if r.risk_score is not None else None, + "implementation_effort": r.implementation_effort, + "evidence_confidence": ( + float(r.evidence_confidence) if r.evidence_confidence is not None else None + ), + "open_anchors": r.open_anchors, + "release_state": r.release_state, + "tags": r.tags or [], + "created_at": r.created_at.isoformat() if r.created_at else None, + "updated_at": r.updated_at.isoformat() if r.updated_at else None, + } + + +def _framework_row(r: Any) -> dict[str, Any]: + return { + "id": str(r.id), + "framework_id": r.framework_id, + "name": r.name, + "version": r.version, + "description": r.description, + "owner": r.owner, + "policy_version": r.policy_version, + "release_state": r.release_state, + "created_at": r.created_at.isoformat() if r.created_at else None, + "updated_at": r.updated_at.isoformat() if r.updated_at else None, + } + + +def _validate_control_input( + severity: Optional[str], risk_score: Optional[float], control_id: Optional[str] = None +) -> None: + if control_id is not None and not _CONTROL_ID_RE.match(control_id): + raise ValidationError("control_id must match DOMAIN-NNN (e.g. AUTH-001)") + if severity is not None and severity not in _VALID_SEVERITIES: + raise ValidationError("severity must be low/medium/high/critical") + if risk_score is not None and not (0 <= risk_score <= 10): + raise ValidationError("risk_score must be 0..10") + + +class CanonicalControlService: + """Business logic for the canonical control library.""" + + def __init__(self, db: Session) -> None: + self.db = db + + # ------------------------------------------------------------------ + # Frameworks + # ------------------------------------------------------------------ + + def list_frameworks(self) -> list[dict[str, Any]]: + rows = self.db.execute( + text(""" + SELECT id, framework_id, name, version, description, + owner, policy_version, release_state, + created_at, updated_at + FROM canonical_control_frameworks + ORDER BY name + """) + ).fetchall() + return [_framework_row(r) for r in rows] + + def get_framework(self, framework_id: str) -> dict[str, Any]: + row = self.db.execute( + text(""" + SELECT id, framework_id, name, version, description, + owner, policy_version, release_state, + created_at, updated_at + FROM canonical_control_frameworks + WHERE framework_id = :fid + """), + {"fid": framework_id}, + ).fetchone() + if not row: + raise NotFoundError("Framework not found") + return _framework_row(row) + + def list_framework_controls( + self, framework_id: str, severity: Optional[str], release_state: Optional[str] + ) -> list[dict[str, Any]]: + fw = self.db.execute( + text("SELECT id FROM canonical_control_frameworks WHERE framework_id = :fid"), + {"fid": framework_id}, + ).fetchone() + if not fw: + raise NotFoundError("Framework not found") + + query = f"SELECT {_CONTROL_COLUMNS} FROM canonical_controls WHERE framework_id = :fw_id" + params: dict[str, Any] = {"fw_id": str(fw.id)} + if severity: + query += " AND severity = :sev" + params["sev"] = severity + if release_state: + query += " AND release_state = :rs" + params["rs"] = release_state + query += " ORDER BY control_id" + rows = self.db.execute(text(query), params).fetchall() + return [_control_row(r) for r in rows] + + # ------------------------------------------------------------------ + # Controls + # ------------------------------------------------------------------ + + def list_controls( + self, + severity: Optional[str], + domain: Optional[str], + release_state: Optional[str], + ) -> list[dict[str, Any]]: + query = f"SELECT {_CONTROL_COLUMNS} FROM canonical_controls WHERE 1=1" + params: dict[str, Any] = {} + if severity: + query += " AND severity = :sev" + params["sev"] = severity + if domain: + query += " AND LEFT(control_id, LENGTH(:dom)) = :dom" + params["dom"] = domain.upper() + if release_state: + query += " AND release_state = :rs" + params["rs"] = release_state + query += " ORDER BY control_id" + rows = self.db.execute(text(query), params).fetchall() + return [_control_row(r) for r in rows] + + def get_control(self, control_id: str) -> dict[str, Any]: + row = self.db.execute( + text(f"SELECT {_CONTROL_COLUMNS} FROM canonical_controls WHERE control_id = :cid"), + {"cid": control_id.upper()}, + ).fetchone() + if not row: + raise NotFoundError("Control not found") + return _control_row(row) + + def create_control(self, body: ControlCreateRequest) -> dict[str, Any]: + _validate_control_input(body.severity, body.risk_score, body.control_id) + + fw = self.db.execute( + text("SELECT id FROM canonical_control_frameworks WHERE framework_id = :fid"), + {"fid": body.framework_id}, + ).fetchone() + if not fw: + raise NotFoundError(f"Framework '{body.framework_id}' not found") + + existing = self.db.execute( + text( + "SELECT id FROM canonical_controls " + "WHERE framework_id = :fid AND control_id = :cid" + ), + {"fid": str(fw.id), "cid": body.control_id}, + ).fetchone() + if existing: + raise ConflictError(f"Control '{body.control_id}' already exists") + + row = self.db.execute( + text(f""" + INSERT INTO canonical_controls ( + framework_id, control_id, title, objective, rationale, + scope, requirements, test_procedure, evidence, + severity, risk_score, implementation_effort, evidence_confidence, + open_anchors, release_state, tags + ) VALUES ( + :fw_id, :cid, :title, :objective, :rationale, + :scope::jsonb, :requirements::jsonb, :test_procedure::jsonb, :evidence::jsonb, + :severity, :risk_score, :effort, :confidence, + :anchors::jsonb, :release_state, :tags::jsonb + ) + RETURNING {_CONTROL_COLUMNS} + """), + { + "fw_id": str(fw.id), + "cid": body.control_id, + "title": body.title, + "objective": body.objective, + "rationale": body.rationale, + "scope": json.dumps(body.scope), + "requirements": json.dumps(body.requirements), + "test_procedure": json.dumps(body.test_procedure), + "evidence": json.dumps(body.evidence), + "severity": body.severity, + "risk_score": body.risk_score, + "effort": body.implementation_effort, + "confidence": body.evidence_confidence, + "anchors": json.dumps(body.open_anchors), + "release_state": body.release_state, + "tags": json.dumps(body.tags), + }, + ).fetchone() + self.db.commit() + return _control_row(row) + + def update_control( + self, control_id: str, body: ControlUpdateRequest + ) -> dict[str, Any]: + updates = body.dict(exclude_none=True) + if not updates: + raise ValidationError("No fields to update") + + _validate_control_input(updates.get("severity"), updates.get("risk_score")) + + set_parts: list[str] = [] + params: dict[str, Any] = {"cid": control_id.upper()} + for key, val in updates.items(): + if key in _JSON_CONTROL_FIELDS: + set_parts.append(f"{key} = :{key}::jsonb") + params[key] = json.dumps(val) + else: + set_parts.append(f"{key} = :{key}") + params[key] = val + set_parts.append("updated_at = NOW()") + + row = self.db.execute( + text(f""" + UPDATE canonical_controls + SET {', '.join(set_parts)} + WHERE control_id = :cid + RETURNING {_CONTROL_COLUMNS} + """), + params, + ).fetchone() + if not row: + raise NotFoundError("Control not found") + self.db.commit() + return _control_row(row) + + def delete_control(self, control_id: str) -> None: + result: Any = self.db.execute( + text("DELETE FROM canonical_controls WHERE control_id = :cid"), + {"cid": control_id.upper()}, + ) + if result.rowcount == 0: + raise NotFoundError("Control not found") + self.db.commit() + + # ------------------------------------------------------------------ + # Similarity + sources + licenses + # ------------------------------------------------------------------ + + async def similarity_check( + self, control_id: str, body: SimilarityCheckRequest + ) -> dict[str, Any]: + from compliance.services.similarity_detector import check_similarity + + report = await check_similarity(body.source_text, body.candidate_text) + return { + "control_id": control_id.upper(), + "max_exact_run": report.max_exact_run, + "token_overlap": report.token_overlap, + "ngram_jaccard": report.ngram_jaccard, + "embedding_cosine": report.embedding_cosine, + "lcs_ratio": report.lcs_ratio, + "status": report.status, + "details": report.details, + } + + def list_sources(self) -> Any: + from compliance.services.license_gate import get_source_permissions + return get_source_permissions(self.db) + + def list_licenses(self) -> Any: + from compliance.services.license_gate import get_license_matrix + return get_license_matrix(self.db) diff --git a/backend-compliance/mypy.ini b/backend-compliance/mypy.ini index afb4415..b33d54d 100644 --- a/backend-compliance/mypy.ini +++ b/backend-compliance/mypy.ini @@ -81,5 +81,7 @@ ignore_errors = False ignore_errors = False [mypy-compliance.api.vvt_routes] ignore_errors = False +[mypy-compliance.api.canonical_control_routes] +ignore_errors = False [mypy-compliance.api._http_errors] ignore_errors = False diff --git a/backend-compliance/tests/contracts/openapi.baseline.json b/backend-compliance/tests/contracts/openapi.baseline.json index f553ad1..a029444 100644 --- a/backend-compliance/tests/contracts/openapi.baseline.json +++ b/backend-compliance/tests/contracts/openapi.baseline.json @@ -41419,7 +41419,14 @@ "200": { "content": { "application/json": { - "schema": {} + "schema": { + "items": { + "additionalProperties": true, + "type": "object" + }, + "title": "Response List Controls Api Compliance V1 Canonical Controls Get", + "type": "array" + } } }, "description": "Successful Response" @@ -41458,7 +41465,11 @@ "201": { "content": { "application/json": { - "schema": {} + "schema": { + "additionalProperties": true, + "title": "Response Create Control Api Compliance V1 Canonical Controls Post", + "type": "object" + } } }, "description": "Successful Response" @@ -41600,7 +41611,11 @@ "200": { "content": { "application/json": { - "schema": {} + "schema": { + "additionalProperties": true, + "title": "Response Get Control Api Compliance V1 Canonical Controls Control Id Get", + "type": "object" + } } }, "description": "Successful Response" @@ -41650,7 +41665,11 @@ "200": { "content": { "application/json": { - "schema": {} + "schema": { + "additionalProperties": true, + "title": "Response Update Control Api Compliance V1 Canonical Controls Control Id Put", + "type": "object" + } } }, "description": "Successful Response" @@ -41702,7 +41721,11 @@ "200": { "content": { "application/json": { - "schema": {} + "schema": { + "additionalProperties": true, + "title": "Response Similarity Check Api Compliance V1 Canonical Controls Control Id Similarity Check Post", + "type": "object" + } } }, "description": "Successful Response" @@ -41733,7 +41756,14 @@ "200": { "content": { "application/json": { - "schema": {} + "schema": { + "items": { + "additionalProperties": true, + "type": "object" + }, + "title": "Response List Frameworks Api Compliance V1 Canonical Frameworks Get", + "type": "array" + } } }, "description": "Successful Response" @@ -41765,7 +41795,11 @@ "200": { "content": { "application/json": { - "schema": {} + "schema": { + "additionalProperties": true, + "title": "Response Get Framework Api Compliance V1 Canonical Frameworks Framework Id Get", + "type": "object" + } } }, "description": "Successful Response" @@ -41839,7 +41873,14 @@ "200": { "content": { "application/json": { - "schema": {} + "schema": { + "items": { + "additionalProperties": true, + "type": "object" + }, + "title": "Response List Framework Controls Api Compliance V1 Canonical Frameworks Framework Id Controls Get", + "type": "array" + } } }, "description": "Successful Response" @@ -42140,7 +42181,9 @@ "200": { "content": { "application/json": { - "schema": {} + "schema": { + "title": "Response List Licenses Api Compliance V1 Canonical Licenses Get" + } } }, "description": "Successful Response" @@ -42161,7 +42204,9 @@ "200": { "content": { "application/json": { - "schema": {} + "schema": { + "title": "Response List Sources Api Compliance V1 Canonical Sources Get" + } } }, "description": "Successful Response"