Files
breakpilot-compliance/backend-compliance/compliance/api/canonical_control_routes.py
Sharang Parnerkar b850368ec9 refactor(backend/api): extract CanonicalControlService (Step 4 — file 6 of 18)
compliance/api/canonical_control_routes.py (514 LOC) -> 192 LOC thin
routes + 316-line CanonicalControlService + 105-line schemas file.

Canonical Control Library manages OWASP/NIST/ENISA-anchored security
control frameworks and controls. Like company_profile_routes, this file
uses raw SQL via sqlalchemy.text() because there are no SQLAlchemy
models for canonical_control_frameworks or canonical_controls.

Single-service split. Session management moved from bespoke
`with SessionLocal() as db:` blocks to Depends(get_db) for consistency.

Legacy test imports preserved via re-export (FrameworkResponse,
ControlResponse, SimilarityCheckRequest, SimilarityCheckResponse,
_control_row).

Validation extracted to a module-level `_validate_control_input` helper
so both create and update share the same checks. ValidationError (from
compliance.domain) replaces raw HTTPException(400) raises.

Verified:
  - 187/187 pytest (173 core + 14 canonical) pass
  - OpenAPI 360/484 unchanged
  - mypy compliance/ -> Success on 130 source files
  - canonical_control_routes.py 514 -> 192 LOC
  - Hard-cap violations: 13 -> 12

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 19:53:55 +02:00

193 lines
6.7 KiB
Python

"""
FastAPI routes for the Canonical Control Library.
Independently authored security controls anchored in open-source frameworks
(OWASP, NIST, ENISA). No proprietary nomenclature.
Endpoints:
GET /v1/canonical/frameworks - All frameworks
GET /v1/canonical/frameworks/{framework_id} - Framework details
GET /v1/canonical/frameworks/{framework_id}/controls - Framework controls
GET /v1/canonical/controls - All controls
GET /v1/canonical/controls/{control_id} - Single control
POST /v1/canonical/controls - Create
PUT /v1/canonical/controls/{control_id} - Update (partial)
DELETE /v1/canonical/controls/{control_id} - Delete
POST /v1/canonical/controls/{control_id}/similarity-check - Too-close check
GET /v1/canonical/sources - Source registry
GET /v1/canonical/licenses - License matrix
Phase 1 Step 4 refactor: handlers delegate to CanonicalControlService.
"""
from typing import Any, Optional
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from compliance.api._http_errors import translate_domain_errors
from compliance.schemas.canonical_control import (
ControlCreateRequest,
ControlResponse,
ControlUpdateRequest,
FrameworkResponse,
SimilarityCheckRequest,
SimilarityCheckResponse,
)
from compliance.services.canonical_control_service import (
CanonicalControlService,
_control_row, # re-exported for legacy test imports
)
# Every route declared in this module is mounted under /v1/canonical and
# carries the "canonical-controls" tag.
router = APIRouter(
    prefix="/v1/canonical",
    tags=["canonical-controls"],
)
def get_canonical_service(db: Session = Depends(get_db)) -> CanonicalControlService:
    """Dependency provider that wraps a database session in a service object.

    Args:
        db: SQLAlchemy session injected through ``Depends(get_db)``.

    Returns:
        A ``CanonicalControlService`` constructed with ``db``.
    """
    service = CanonicalControlService(db)
    return service
# =============================================================================
# FRAMEWORKS
# =============================================================================
@router.get("/frameworks")
async def list_frameworks(
    service: CanonicalControlService = Depends(get_canonical_service),
) -> list[dict[str, Any]]:
    """List all registered control frameworks."""
    # The docstring is kept verbatim because FastAPI publishes it as the
    # operation description in the OpenAPI schema.
    # NOTE(review): translate_domain_errors presumably maps domain-layer
    # exceptions to HTTP errors — confirm in compliance.api._http_errors.
    # NOTE(review): the service call is not awaited; if it performs blocking
    # DB work it runs on the event loop — confirm this is acceptable.
    with translate_domain_errors():
        frameworks = service.list_frameworks()
        return frameworks
@router.get("/frameworks/{framework_id}")
async def get_framework(
    framework_id: str,
    service: CanonicalControlService = Depends(get_canonical_service),
) -> dict[str, Any]:
    """Get a single framework by its framework_id."""
    # ``framework_id`` comes from the URL path and is passed to the service
    # unchanged.
    # NOTE(review): translate_domain_errors presumably turns a domain-level
    # "not found" into an HTTP error — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        framework = service.get_framework(framework_id)
        return framework
@router.get("/frameworks/{framework_id}/controls")
async def list_framework_controls(
    framework_id: str,
    severity: Optional[str] = Query(default=None),
    release_state: Optional[str] = Query(default=None),
    service: CanonicalControlService = Depends(get_canonical_service),
) -> list[dict[str, Any]]:
    """List controls belonging to a framework."""
    # ``severity`` and ``release_state`` are optional query parameters; when
    # omitted they reach the service as None. How the service interprets them
    # is not visible from this module.
    # NOTE(review): the service call is synchronous inside an async handler —
    # confirm it cannot block the event loop for long.
    with translate_domain_errors():
        controls = service.list_framework_controls(
            framework_id,
            severity,
            release_state,
        )
        return controls
# =============================================================================
# CONTROLS
# =============================================================================
@router.get("/controls")
async def list_controls(
    severity: Optional[str] = Query(default=None),
    domain: Optional[str] = Query(default=None),
    release_state: Optional[str] = Query(default=None),
    service: CanonicalControlService = Depends(get_canonical_service),
) -> list[dict[str, Any]]:
    """List all canonical controls, with optional filters."""
    # The three query parameters are forwarded positionally in the order
    # (severity, domain, release_state); each is None when not supplied.
    # NOTE(review): the service call is synchronous inside an async handler —
    # confirm it cannot block the event loop for long.
    with translate_domain_errors():
        controls = service.list_controls(severity, domain, release_state)
        return controls
@router.get("/controls/{control_id}")
async def get_control(
    control_id: str,
    service: CanonicalControlService = Depends(get_canonical_service),
) -> dict[str, Any]:
    """Get a single canonical control by its control_id (e.g. AUTH-001)."""
    # ``control_id`` is taken from the URL path and handed to the service
    # as-is; no normalisation happens in this layer.
    # NOTE(review): translate_domain_errors presumably maps domain-layer
    # exceptions to HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        control = service.get_control(control_id)
        return control
@router.post("/controls", status_code=201)
async def create_control(
    body: ControlCreateRequest,
    service: CanonicalControlService = Depends(get_canonical_service),
) -> dict[str, Any]:
    """Create a new canonical control."""
    # The parsed request body is passed to the service untouched; a
    # successful call is answered with HTTP 201 per the decorator.
    # NOTE(review): validation failures raised by the service are presumably
    # converted to HTTP errors by translate_domain_errors — confirm.
    with translate_domain_errors():
        created = service.create_control(body)
        return created
@router.put("/controls/{control_id}")
async def update_control(
    control_id: str,
    body: ControlUpdateRequest,
    service: CanonicalControlService = Depends(get_canonical_service),
) -> dict[str, Any]:
    """Update an existing canonical control (partial update)."""
    # The path id and the parsed body are forwarded to the service; which
    # fields count as "partial" is decided there, not in this handler.
    # NOTE(review): translate_domain_errors presumably maps domain-layer
    # exceptions to HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        updated = service.update_control(control_id, body)
        return updated
@router.delete("/controls/{control_id}", status_code=204)
async def delete_control(
    control_id: str,
    service: CanonicalControlService = Depends(get_canonical_service),
) -> None:
    """Delete a canonical control."""
    # Whatever the service returns is discarded: the route is declared with
    # status 204, so the handler itself always yields None.
    # NOTE(review): translate_domain_errors presumably maps domain-layer
    # exceptions to HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        service.delete_control(control_id)
    return None
# =============================================================================
# SIMILARITY CHECK
# =============================================================================
@router.post("/controls/{control_id}/similarity-check")
async def similarity_check(
    control_id: str,
    body: SimilarityCheckRequest,
    service: CanonicalControlService = Depends(get_canonical_service),
) -> dict[str, Any]:
    """Run the too-close detector against a source/candidate text pair."""
    # This is the only handler in the module that awaits its service method.
    # The await stays inside the context manager so that exceptions raised
    # while the coroutine runs still pass through translate_domain_errors.
    with translate_domain_errors():
        verdict = await service.similarity_check(control_id, body)
        return verdict
# =============================================================================
# SOURCES & LICENSES
# =============================================================================
@router.get("/sources")
async def list_sources(
    service: CanonicalControlService = Depends(get_canonical_service),
) -> Any:
    """List all registered sources with permission flags."""
    # The return annotation is ``Any``: the payload shape is whatever the
    # service produces and is not constrained in this layer.
    # NOTE(review): the service call is synchronous inside an async handler —
    # confirm it cannot block the event loop for long.
    with translate_domain_errors():
        sources = service.list_sources()
        return sources
@router.get("/licenses")
async def list_licenses(
    service: CanonicalControlService = Depends(get_canonical_service),
) -> Any:
    """Return the license matrix."""
    # The return annotation is ``Any``: the payload shape is whatever the
    # service produces and is not constrained in this layer.
    # NOTE(review): translate_domain_errors presumably maps domain-layer
    # exceptions to HTTP errors — confirm in compliance.api._http_errors.
    with translate_domain_errors():
        licenses = service.list_licenses()
        return licenses
# ----------------------------------------------------------------------------
# Legacy re-exports for tests that imported schemas/helpers directly.
# ----------------------------------------------------------------------------
__all__ = [
    # The router that the application mounts.
    "router",
    # Response schemas.
    "FrameworkResponse", "ControlResponse",
    # Request schemas for create / update.
    "ControlCreateRequest", "ControlUpdateRequest",
    # Similarity-check request / response schemas.
    "SimilarityCheckRequest", "SimilarityCheckResponse",
    # Private service helper, exported only so older tests can import it here.
    "_control_row",
]