The previous commit (32e121f) left isms_assessment_service.py at 639 LOC,
exceeding the 500-line hard cap. This follow-up extracts ReadinessCheckService
and OverviewService into a new isms_readiness_service.py (400 LOC), leaving
isms_assessment_service.py at 257 LOC (Management Reviews, Internal Audits,
Audit Trail only).
Updated isms_routes.py imports to reference the new service file.
File sizes after split:
- isms_routes.py: 446 LOC (thin handlers)
- isms_governance_service.py: 416 LOC (scope, context, policy, objectives, SoA)
- isms_findings_service.py: 276 LOC (findings, CAPA)
- isms_assessment_service.py: 257 LOC (mgmt reviews, internal audits, audit trail)
- isms_readiness_service.py: 400 LOC (readiness check, ISO 27001 overview)
All 58 integration tests + 173 unit/contract tests pass.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
446 lines
19 KiB
Python
446 lines
19 KiB
Python
"""
|
|
ISO 27001 ISMS API Routes — thin handlers.
|
|
|
|
Phase 1 Step 4: business logic extracted to:
|
|
- ``compliance.services.isms_governance_service`` (Scope, Context, Policy, Objectives, SoA)
|
|
- ``compliance.services.isms_findings_service`` (Findings, CAPA)
|
|
- ``compliance.services.isms_assessment_service`` (Reviews, Audits, Trail)
- ``compliance.services.isms_readiness_service`` (Readiness, Overview)
|
|
"""
|
|
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, HTTPException, Query, Depends
|
|
from sqlalchemy.orm import Session
|
|
|
|
from .schemas import (
|
|
# Scope
|
|
ISMSScopeCreate, ISMSScopeUpdate, ISMSScopeResponse, ISMSScopeApproveRequest,
|
|
# Context
|
|
ISMSContextCreate, ISMSContextResponse,
|
|
# Policies
|
|
ISMSPolicyCreate, ISMSPolicyUpdate, ISMSPolicyResponse, ISMSPolicyListResponse,
|
|
ISMSPolicyApproveRequest,
|
|
# Objectives
|
|
SecurityObjectiveCreate, SecurityObjectiveUpdate, SecurityObjectiveResponse,
|
|
SecurityObjectiveListResponse,
|
|
# SoA
|
|
SoAEntryCreate, SoAEntryUpdate, SoAEntryResponse, SoAListResponse, SoAApproveRequest,
|
|
# Findings
|
|
AuditFindingCreate, AuditFindingUpdate, AuditFindingResponse,
|
|
AuditFindingListResponse, AuditFindingCloseRequest,
|
|
# CAPA
|
|
CorrectiveActionCreate, CorrectiveActionUpdate, CorrectiveActionResponse,
|
|
CorrectiveActionListResponse, CAPAVerifyRequest,
|
|
# Management Review
|
|
ManagementReviewCreate, ManagementReviewUpdate, ManagementReviewResponse,
|
|
ManagementReviewListResponse, ManagementReviewApproveRequest,
|
|
# Internal Audit
|
|
InternalAuditCreate, InternalAuditUpdate, InternalAuditResponse,
|
|
InternalAuditListResponse, InternalAuditCompleteRequest,
|
|
# Readiness
|
|
ISMSReadinessCheckResponse, ISMSReadinessCheckRequest,
|
|
# Audit Trail
|
|
AuditTrailResponse,
|
|
# Overview
|
|
ISO27001OverviewResponse,
|
|
)
|
|
|
|
from classroom_engine.database import get_db
|
|
from compliance.domain import NotFoundError, ConflictError, ValidationError
|
|
|
|
# Services
|
|
from compliance.services.isms_governance_service import (
|
|
ISMSScopeService, ISMSContextService, ISMSPolicyService,
|
|
SecurityObjectiveService, SoAService,
|
|
# Re-export helpers for legacy test imports
|
|
generate_id, create_signature, log_audit_trail,
|
|
)
|
|
from compliance.services.isms_findings_service import AuditFindingService, CAPAService
|
|
from compliance.services.isms_assessment_service import (
|
|
ManagementReviewService, InternalAuditService, AuditTrailService,
|
|
)
|
|
from compliance.services.isms_readiness_service import ReadinessCheckService, OverviewService
|
|
|
|
# Router that every endpoint in this module registers on; all paths carry the
# "/isms" prefix and are tagged "ISMS".
router = APIRouter(prefix="/isms", tags=["ISMS"])
|
|
|
|
|
|
# ============================================================================
|
|
# Error mapping
|
|
# ============================================================================
|
|
|
|
def _handle(func, *args, **kwargs): # type: ignore[no-untyped-def]
|
|
"""Call *func* and translate domain errors to HTTP exceptions."""
|
|
try:
|
|
return func(*args, **kwargs)
|
|
except NotFoundError as exc:
|
|
raise HTTPException(status_code=404, detail=str(exc))
|
|
except ConflictError as exc:
|
|
raise HTTPException(status_code=400, detail=str(exc))
|
|
except ValidationError as exc:
|
|
raise HTTPException(status_code=400, detail=str(exc))
|
|
|
|
|
|
# ============================================================================
|
|
# ISMS Scope (ISO 27001 4.3)
|
|
# ============================================================================
|
|
|
|
@router.get("/scope", response_model=ISMSScopeResponse)
async def get_isms_scope(db: Session = Depends(get_db)):
    """Get the current ISMS scope."""
    # Pure delegation: the lookup lives in ISMSScopeService, and _handle turns
    # domain errors into HTTP errors.
    current_scope = _handle(ISMSScopeService.get_current, db)
    return current_scope
|
|
|
|
|
|
@router.post("/scope", response_model=ISMSScopeResponse)
async def create_isms_scope(
    data: ISMSScopeCreate,
    created_by: str = Query(..., description="User creating the scope"),
    db: Session = Depends(get_db),
):
    """Create a new ISMS scope definition. Supersedes any existing scope."""
    # The service is given a plain dict of the request body, not the model.
    scope_fields = data.model_dump()
    return _handle(ISMSScopeService.create, db, scope_fields, created_by)
|
|
|
|
|
|
@router.put("/scope/{scope_id}", response_model=ISMSScopeResponse)
async def update_isms_scope(
    scope_id: str,
    data: ISMSScopeUpdate,
    updated_by: str = Query(..., description="User updating the scope"),
    db: Session = Depends(get_db),
):
    """Update ISMS scope (only if in draft status)."""
    # exclude_unset=True keeps only the fields explicitly set on the request.
    changed_fields = data.model_dump(exclude_unset=True)
    return _handle(ISMSScopeService.update, db, scope_id, changed_fields, updated_by)
|
|
|
|
|
|
@router.post("/scope/{scope_id}/approve", response_model=ISMSScopeResponse)
async def approve_isms_scope(
    scope_id: str,
    data: ISMSScopeApproveRequest,
    db: Session = Depends(get_db),
):
    """Approve the ISMS scope. Must be approved by top management."""
    # Approval details are passed positionally, in this exact order.
    approval = (data.approved_by, data.effective_date, data.review_date)
    return _handle(ISMSScopeService.approve, db, scope_id, *approval)
|
|
|
|
|
|
# ============================================================================
|
|
# ISMS Context (ISO 27001 4.1, 4.2)
|
|
# ============================================================================
|
|
|
|
@router.get("/context", response_model=ISMSContextResponse)
async def get_isms_context(db: Session = Depends(get_db)):
    """Get the current ISMS context analysis."""
    # Delegates to the service; _handle maps domain errors to HTTP errors.
    current_context = _handle(ISMSContextService.get_current, db)
    return current_context
|
|
|
|
|
|
@router.post("/context", response_model=ISMSContextResponse)
async def create_isms_context(
    data: ISMSContextCreate,
    created_by: str = Query(...),
    db: Session = Depends(get_db),
):
    """Create or update ISMS context analysis."""
    payload = data.model_dump()
    # Each nested collection is re-dumped item by item; an empty or missing
    # collection is sent to the service as None rather than as an empty list.
    for field_name in ("internal_issues", "external_issues", "interested_parties"):
        entries = getattr(data, field_name)
        payload[field_name] = [entry.model_dump() for entry in entries] if entries else None
    return _handle(ISMSContextService.create, db, payload, created_by)
|
|
|
|
|
|
# ============================================================================
|
|
# ISMS Policies (ISO 27001 5.2)
|
|
# ============================================================================
|
|
|
|
@router.get("/policies", response_model=ISMSPolicyListResponse)
async def list_policies(
    policy_type: Optional[str] = None,
    status: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """List all ISMS policies."""
    # The service result unpacks into (items, total); wrap it in the list response.
    listing = _handle(ISMSPolicyService.list_policies, db, policy_type, status)
    found, count = listing
    return ISMSPolicyListResponse(policies=found, total=count)
|
|
|
|
|
|
@router.post("/policies", response_model=ISMSPolicyResponse)
async def create_policy(
    data: ISMSPolicyCreate,
    db: Session = Depends(get_db),
):
    """Create a new ISMS policy."""
    # The service is given a plain dict of the request body.
    policy_fields = data.model_dump()
    return _handle(ISMSPolicyService.create, db, policy_fields)
|
|
|
|
|
|
@router.get("/policies/{policy_id}", response_model=ISMSPolicyResponse)
async def get_policy(
    policy_id: str,
    db: Session = Depends(get_db),
):
    """Get a specific policy by ID."""
    # Delegates to the service; _handle maps domain errors to HTTP errors.
    policy = _handle(ISMSPolicyService.get, db, policy_id)
    return policy
|
|
|
|
|
|
@router.put("/policies/{policy_id}", response_model=ISMSPolicyResponse)
async def update_policy(
    policy_id: str,
    data: ISMSPolicyUpdate,
    updated_by: str = Query(...),
    db: Session = Depends(get_db),
):
    """Update a policy (creates new version if approved)."""
    # exclude_unset=True keeps only the fields explicitly set on the request.
    changed_fields = data.model_dump(exclude_unset=True)
    return _handle(ISMSPolicyService.update, db, policy_id, changed_fields, updated_by)
|
|
|
|
|
|
@router.post("/policies/{policy_id}/approve", response_model=ISMSPolicyResponse)
async def approve_policy(
    policy_id: str,
    data: ISMSPolicyApproveRequest,
    db: Session = Depends(get_db),
):
    """Approve a policy. Must be approved by top management."""
    # Approval details are passed positionally, in this exact order.
    approval = (data.reviewed_by, data.approved_by, data.effective_date)
    return _handle(ISMSPolicyService.approve, db, policy_id, *approval)
|
|
|
|
|
|
# ============================================================================
|
|
# Security Objectives (ISO 27001 6.2)
|
|
# ============================================================================
|
|
|
|
@router.get("/objectives", response_model=SecurityObjectiveListResponse)
async def list_objectives(
    category: Optional[str] = None,
    status: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """List all security objectives."""
    # The service result unpacks into (items, total); wrap it in the list response.
    listing = _handle(SecurityObjectiveService.list_objectives, db, category, status)
    found, count = listing
    return SecurityObjectiveListResponse(objectives=found, total=count)
|
|
|
|
|
|
@router.post("/objectives", response_model=SecurityObjectiveResponse)
async def create_objective(
    data: SecurityObjectiveCreate,
    created_by: str = Query(...),
    db: Session = Depends(get_db),
):
    """Create a new security objective."""
    # The service is given a plain dict of the request body.
    objective_fields = data.model_dump()
    return _handle(SecurityObjectiveService.create, db, objective_fields, created_by)
|
|
|
|
|
|
@router.put("/objectives/{objective_id}", response_model=SecurityObjectiveResponse)
async def update_objective(
    objective_id: str,
    data: SecurityObjectiveUpdate,
    updated_by: str = Query(...),
    db: Session = Depends(get_db),
):
    """Update a security objective's progress."""
    # exclude_unset=True keeps only the fields explicitly set on the request.
    changed_fields = data.model_dump(exclude_unset=True)
    return _handle(SecurityObjectiveService.update, db, objective_id, changed_fields, updated_by)
|
|
|
|
|
|
# ============================================================================
|
|
# Statement of Applicability (SoA)
|
|
# ============================================================================
|
|
|
|
@router.get("/soa", response_model=SoAListResponse)
async def list_soa_entries(
    is_applicable: Optional[bool] = None,
    implementation_status: Optional[str] = None,
    category: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """List all Statement of Applicability entries."""
    # Unlike the tuple-returning list endpoints, the service result is
    # returned as-is here.
    filters = (is_applicable, implementation_status, category)
    return _handle(SoAService.list_entries, db, *filters)
|
|
|
|
|
|
@router.post("/soa", response_model=SoAEntryResponse)
async def create_soa_entry(
    data: SoAEntryCreate,
    created_by: str = Query(...),
    db: Session = Depends(get_db),
):
    """Create a new SoA entry for an Annex A control."""
    # The service is given a plain dict of the request body.
    entry_fields = data.model_dump()
    return _handle(SoAService.create, db, entry_fields, created_by)
|
|
|
|
|
|
@router.put("/soa/{entry_id}", response_model=SoAEntryResponse)
async def update_soa_entry(
    entry_id: str,
    data: SoAEntryUpdate,
    updated_by: str = Query(...),
    db: Session = Depends(get_db),
):
    """Update an SoA entry."""
    # exclude_unset=True keeps only the fields explicitly set on the request.
    changed_fields = data.model_dump(exclude_unset=True)
    return _handle(SoAService.update, db, entry_id, changed_fields, updated_by)
|
|
|
|
|
|
@router.post("/soa/{entry_id}/approve", response_model=SoAEntryResponse)
async def approve_soa_entry(
    entry_id: str,
    data: SoAApproveRequest,
    db: Session = Depends(get_db),
):
    """Approve an SoA entry."""
    # Reviewer first, then approver, matching the positional order the
    # service is called with.
    sign_off = (data.reviewed_by, data.approved_by)
    return _handle(SoAService.approve, db, entry_id, *sign_off)
|
|
|
|
|
|
# ============================================================================
|
|
# Audit Findings
|
|
# ============================================================================
|
|
|
|
@router.get("/findings", response_model=AuditFindingListResponse)
async def list_findings(
    finding_type: Optional[str] = None,
    status: Optional[str] = None,
    internal_audit_id: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """List all audit findings."""
    # The service result is returned as-is; no wrapping happens here.
    filters = (finding_type, status, internal_audit_id)
    return _handle(AuditFindingService.list_findings, db, *filters)
|
|
|
|
|
|
@router.post("/findings", response_model=AuditFindingResponse)
async def create_finding(
    data: AuditFindingCreate,
    db: Session = Depends(get_db),
):
    """Create a new audit finding."""
    # The service is given a plain dict of the request body.
    finding_fields = data.model_dump()
    return _handle(AuditFindingService.create, db, finding_fields)
|
|
|
|
|
|
@router.put("/findings/{finding_id}", response_model=AuditFindingResponse)
async def update_finding(
    finding_id: str,
    data: AuditFindingUpdate,
    updated_by: str = Query(...),
    db: Session = Depends(get_db),
):
    """Update an audit finding."""
    # exclude_unset=True keeps only the fields explicitly set on the request.
    changed_fields = data.model_dump(exclude_unset=True)
    return _handle(AuditFindingService.update, db, finding_id, changed_fields, updated_by)
|
|
|
|
|
|
@router.post("/findings/{finding_id}/close", response_model=AuditFindingResponse)
async def close_finding(
    finding_id: str,
    data: AuditFindingCloseRequest,
    db: Session = Depends(get_db),
):
    """Close an audit finding after verification."""
    # Closure details are passed positionally, in this exact order.
    closure = (
        data.closure_notes,
        data.closed_by,
        data.verification_method,
        data.verification_evidence,
    )
    return _handle(AuditFindingService.close, db, finding_id, *closure)
|
|
|
|
|
|
# ============================================================================
|
|
# Corrective Actions (CAPA)
|
|
# ============================================================================
|
|
|
|
@router.get("/capa", response_model=CorrectiveActionListResponse)
async def list_capas(
    finding_id: Optional[str] = None,
    status: Optional[str] = None,
    assigned_to: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """List all corrective/preventive actions."""
    # The service result unpacks into (items, total); wrap it in the list response.
    listing = _handle(CAPAService.list_capas, db, finding_id, status, assigned_to)
    found, count = listing
    return CorrectiveActionListResponse(actions=found, total=count)
|
|
|
|
|
|
@router.post("/capa", response_model=CorrectiveActionResponse)
async def create_capa(
    data: CorrectiveActionCreate,
    created_by: str = Query(...),
    db: Session = Depends(get_db),
):
    """Create a new corrective/preventive action for a finding."""
    # The service is given a plain dict of the request body.
    capa_fields = data.model_dump()
    return _handle(CAPAService.create, db, capa_fields, created_by)
|
|
|
|
|
|
@router.put("/capa/{capa_id}", response_model=CorrectiveActionResponse)
async def update_capa(
    capa_id: str,
    data: CorrectiveActionUpdate,
    updated_by: str = Query(...),
    db: Session = Depends(get_db),
):
    """Update a CAPA's progress."""
    # exclude_unset=True keeps only the fields explicitly set on the request.
    changed_fields = data.model_dump(exclude_unset=True)
    return _handle(CAPAService.update, db, capa_id, changed_fields, updated_by)
|
|
|
|
|
|
@router.post("/capa/{capa_id}/verify", response_model=CorrectiveActionResponse)
async def verify_capa(
    capa_id: str,
    data: CAPAVerifyRequest,
    db: Session = Depends(get_db),
):
    """Verify the effectiveness of a CAPA."""
    # Verification details are passed positionally, in this exact order.
    verification = (data.verified_by, data.is_effective, data.effectiveness_notes)
    return _handle(CAPAService.verify, db, capa_id, *verification)
|
|
|
|
|
|
# ============================================================================
|
|
# Management Reviews (ISO 27001 9.3)
|
|
# ============================================================================
|
|
|
|
@router.get("/management-reviews", response_model=ManagementReviewListResponse)
async def list_management_reviews(
    status: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """List all management reviews."""
    # The service result unpacks into (items, total); wrap it in the list response.
    listing = _handle(ManagementReviewService.list_reviews, db, status)
    found, count = listing
    return ManagementReviewListResponse(reviews=found, total=count)
|
|
|
|
|
|
@router.post("/management-reviews", response_model=ManagementReviewResponse)
async def create_management_review(
    data: ManagementReviewCreate,
    created_by: str = Query(...),
    db: Session = Depends(get_db),
):
    """Create a new management review."""
    # The dumped "attendees" value is replaced by the original pydantic
    # objects; per the original note, the service calls model_dump() on them.
    payload = {**data.model_dump(), "attendees": data.attendees}
    return _handle(ManagementReviewService.create, db, payload, created_by)
|
|
|
|
|
|
@router.get("/management-reviews/{review_id}", response_model=ManagementReviewResponse)
async def get_management_review(
    review_id: str,
    db: Session = Depends(get_db),
):
    """Get a specific management review."""
    # Delegates to the service; _handle maps domain errors to HTTP errors.
    review = _handle(ManagementReviewService.get, db, review_id)
    return review
|
|
|
|
|
|
@router.put("/management-reviews/{review_id}", response_model=ManagementReviewResponse)
async def update_management_review(
    review_id: str,
    data: ManagementReviewUpdate,
    updated_by: str = Query(...),
    db: Session = Depends(get_db),
):
    """Update a management review with inputs/outputs."""
    changed_fields = data.model_dump(exclude_unset=True)
    submitted_items = data.action_items
    # Only when action_items was actually sent and is not None: hand the
    # service the pydantic objects instead of their dumped dicts.
    if submitted_items is not None and "action_items" in changed_fields:
        changed_fields["action_items"] = submitted_items
    return _handle(ManagementReviewService.update, db, review_id, changed_fields, updated_by)
|
|
|
|
|
|
@router.post("/management-reviews/{review_id}/approve", response_model=ManagementReviewResponse)
async def approve_management_review(
    review_id: str,
    data: ManagementReviewApproveRequest,
    db: Session = Depends(get_db),
):
    """Approve a management review."""
    # Approval details are passed positionally, in this exact order.
    approval = (data.approved_by, data.next_review_date, data.minutes_document_path)
    return _handle(ManagementReviewService.approve, db, review_id, *approval)
|
|
|
|
|
|
# ============================================================================
|
|
# Internal Audits (ISO 27001 9.2)
|
|
# ============================================================================
|
|
|
|
@router.get("/internal-audits", response_model=InternalAuditListResponse)
async def list_internal_audits(
    status: Optional[str] = None,
    audit_type: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """List all internal audits."""
    # The service result unpacks into (items, total); wrap it in the list response.
    listing = _handle(InternalAuditService.list_audits, db, status, audit_type)
    found, count = listing
    return InternalAuditListResponse(audits=found, total=count)
|
|
|
|
|
|
@router.post("/internal-audits", response_model=InternalAuditResponse)
async def create_internal_audit(
    data: InternalAuditCreate,
    created_by: str = Query(...),
    db: Session = Depends(get_db),
):
    """Create a new internal audit."""
    # The service is given a plain dict of the request body.
    audit_fields = data.model_dump()
    return _handle(InternalAuditService.create, db, audit_fields, created_by)
|
|
|
|
|
|
@router.put("/internal-audits/{audit_id}", response_model=InternalAuditResponse)
async def update_internal_audit(
    audit_id: str,
    data: InternalAuditUpdate,
    updated_by: str = Query(...),
    db: Session = Depends(get_db),
):
    """Update an internal audit."""
    # exclude_unset=True keeps only the fields explicitly set on the request.
    changed_fields = data.model_dump(exclude_unset=True)
    return _handle(InternalAuditService.update, db, audit_id, changed_fields, updated_by)
|
|
|
|
|
|
@router.post("/internal-audits/{audit_id}/complete", response_model=InternalAuditResponse)
async def complete_internal_audit(
    audit_id: str,
    data: InternalAuditCompleteRequest,
    completed_by: str = Query(...),
    db: Session = Depends(get_db),
):
    """Complete an internal audit with conclusion."""
    # Outcome fields come from the request body; completed_by comes from the
    # query string and is passed last.
    outcome = (
        data.audit_conclusion,
        data.overall_assessment,
        data.follow_up_audit_required,
    )
    return _handle(InternalAuditService.complete, db, audit_id, *outcome, completed_by)
|
|
|
|
|
|
# ============================================================================
|
|
# ISMS Readiness Check
|
|
# ============================================================================
|
|
|
|
@router.post("/readiness-check", response_model=ISMSReadinessCheckResponse)
async def run_readiness_check(
    data: ISMSReadinessCheckRequest,
    db: Session = Depends(get_db),
):
    """Run ISMS readiness check before external audit."""
    # Only triggered_by is taken from the request body.
    triggered_by = data.triggered_by
    return _handle(ReadinessCheckService.run, db, triggered_by)
|
|
|
|
|
|
@router.get("/readiness-check/latest", response_model=ISMSReadinessCheckResponse)
async def get_latest_readiness_check(db: Session = Depends(get_db)):
    """Get the most recent readiness check result."""
    # Delegates to the service; _handle maps domain errors to HTTP errors.
    latest_check = _handle(ReadinessCheckService.get_latest, db)
    return latest_check
|
|
|
|
|
|
# ============================================================================
|
|
# Audit Trail
|
|
# ============================================================================
|
|
|
|
@router.get("/audit-trail", response_model=AuditTrailResponse)
async def get_audit_trail(
    entity_type: Optional[str] = None,
    entity_id: Optional[str] = None,
    performed_by: Optional[str] = None,
    action: Optional[str] = None,
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    db: Session = Depends(get_db),
):
    """Query the audit trail with filters."""
    # Filters first, then paging, matching the positional order the service
    # is called with. Paging bounds are enforced by the Query declarations.
    filters = (entity_type, entity_id, performed_by, action)
    return _handle(AuditTrailService.query, db, *filters, page, page_size)
|
|
|
|
|
|
# ============================================================================
|
|
# ISO 27001 Overview
|
|
# ============================================================================
|
|
|
|
@router.get("/overview", response_model=ISO27001OverviewResponse)
async def get_iso27001_overview(db: Session = Depends(get_db)):
    """Get complete ISO 27001 compliance overview."""
    # Delegates to the service; _handle maps domain errors to HTTP errors.
    overview = _handle(OverviewService.get_overview, db)
    return overview
|