breakpilot-compliance/backend-compliance/compliance/api/isms_routes.py
Sharang Parnerkar 3320ef94fc refactor: phase 0 guardrails + phase 1 step 2 (models.py split)
Squash of branch refactor/phase0-guardrails-and-models-split — 4 commits,
81 files, 173/173 pytest green, OpenAPI contract preserved (360 paths /
484 operations).

## Phase 0 — Architecture guardrails

Three defense-in-depth layers to keep the architecture rules enforced
regardless of who opens Claude Code in this repo:

  1. .claude/settings.json PreToolUse hook on Write/Edit blocks any file
     that would exceed the 500-line hard cap. Auto-loads in every Claude
     session in this repo.
  2. scripts/githooks/pre-commit (install via scripts/install-hooks.sh)
     enforces the LOC cap locally, freezes migrations/ without
     [migration-approved], and protects guardrail files without
     [guardrail-change].
  3. .gitea/workflows/ci.yaml gains loc-budget + guardrail-integrity +
     sbom-scan (syft+grype) jobs, adds mypy --strict for the new Python
     packages (compliance/{services,repositories,domain,schemas}), and
     tsc --noEmit for admin-compliance + developer-portal.

Per-language conventions documented in AGENTS.python.md, AGENTS.go.md,
AGENTS.typescript.md at the repo root — layering, tooling, and explicit
"what you may NOT do" lists. Root CLAUDE.md is prepended with the six
non-negotiable rules. Each of the 10 services gets a README.md.

scripts/check-loc.sh enforces soft 300 / hard 500 and surfaces the
current baseline of 205 hard + 161 soft violations so Phases 1-4 can
drain it incrementally. CI gates only CHANGED files in PRs so the
legacy baseline does not block unrelated work.
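
The soft/hard thresholds above can be sketched in a few lines of Python. This is only an illustration of the classification rule, not the contents of scripts/check-loc.sh (which is a shell script); the function names are hypothetical, and it assumes a file violates a cap only when it exceeds it.

```python
from pathlib import Path

SOFT_CAP = 300  # lines, as stated in the commit message
HARD_CAP = 500  # lines, as stated in the commit message


def classify_loc(line_count: int) -> str:
    """Return "hard", "soft", or "ok" for a file's line count.

    Assumption: a cap is violated only when the count exceeds it.
    """
    if line_count > HARD_CAP:
        return "hard"
    if line_count > SOFT_CAP:
        return "soft"
    return "ok"


def count_lines(path: Path) -> int:
    """Count physical lines in a text file (hypothetical helper)."""
    with path.open("r", encoding="utf-8", errors="replace") as handle:
        return sum(1 for _ in handle)
```

Under this rule the old 1466-line models.py is a hard violation, the 85-line shim is fine, and isms_audit_models.py at 468 lines still counts against the soft budget.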

## Deprecation sweep

47 files. Pydantic V1 regex= -> pattern= (2 sites), class Config ->
ConfigDict in source_policy_router.py (schemas.py intentionally skipped;
it is the Phase 1 Step 3 split target). datetime.utcnow() ->
datetime.now(timezone.utc) everywhere including SQLAlchemy default=
callables. All DB columns already declare timezone=True, so this is a
latent-bug fix at the Python side, not a schema change.

DeprecationWarning count dropped from 158 to 35.
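
A minimal sketch of the datetime replacement, using only the standard library. The helper name utc_now is hypothetical; the commit does not say whether the code uses a named helper or an inline lambda for column defaults.

```python
from datetime import datetime, timezone


def utc_now() -> datetime:
    """Timezone-aware replacement for the deprecated datetime.utcnow()."""
    return datetime.now(timezone.utc)


# For a SQLAlchemy column default, pass the callable itself (utc_now, not
# utc_now()) so it is evaluated per row instead of once at import time, e.g.
#   created_at = Column(DateTime(timezone=True), default=utc_now)
stamp = utc_now()
is_aware = stamp.tzinfo is not None  # utcnow() would have returned a naive value
```

Because the columns already declare timezone=True, storing an aware value matches the schema and avoids mixing naive and aware datetimes in comparisons.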

## Phase 1 Step 1 — Contract test harness

tests/contracts/test_openapi_baseline.py diffs the live FastAPI /openapi.json
against tests/contracts/openapi.baseline.json on every test run. Fails on
removed paths, removed status codes, or new required request body fields.
Regenerate only via tests/contracts/regenerate_baseline.py after a
consumer-updated contract change. This is the safety harness for all
subsequent refactor commits.
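
The three failure conditions can be illustrated with a small, self-contained diff over two OpenAPI documents. This is a sketch under assumptions, not the code in tests/contracts/test_openapi_baseline.py: the function names are hypothetical, only JSON request bodies and local $ref pointers are handled, and a removed operation under a surviving path is also reported, which the commit message does not mention.

```python
from typing import Any

HTTP_METHODS = {"get", "put", "post", "delete", "patch", "options", "head", "trace"}


def _resolve(spec: dict[str, Any], schema: dict[str, Any]) -> dict[str, Any]:
    """Follow a local '#/components/schemas/Name' reference, if present."""
    ref = schema.get("$ref")
    if not ref:
        return schema
    node: Any = spec
    for part in ref.lstrip("#/").split("/"):
        node = node[part]
    return node


def _required_body_fields(spec: dict[str, Any], operation: dict[str, Any]) -> set[str]:
    """Required top-level fields of the JSON request body, if any."""
    content = operation.get("requestBody", {}).get("content", {})
    schema = content.get("application/json", {}).get("schema", {})
    return set(_resolve(spec, schema).get("required", []))


def breaking_changes(baseline: dict[str, Any], live: dict[str, Any]) -> list[str]:
    """List contract breaks in `live` relative to `baseline`."""
    problems: list[str] = []
    live_paths = live.get("paths", {})
    for path, base_item in baseline.get("paths", {}).items():
        live_item = live_paths.get(path)
        if live_item is None:
            problems.append(f"removed path: {path}")
            continue
        for method, base_op in base_item.items():
            if method not in HTTP_METHODS:
                continue
            label = f"{method.upper()} {path}"
            live_op = live_item.get(method)
            if live_op is None:
                # Assumption: treated as breaking, like a removed path.
                problems.append(f"removed operation: {label}")
                continue
            for status in base_op.get("responses", {}):
                if status not in live_op.get("responses", {}):
                    problems.append(f"removed status code: {label} -> {status}")
            added = _required_body_fields(live, live_op) - _required_body_fields(baseline, base_op)
            for field in sorted(added):
                problems.append(f"new required field: {label} -> {field}")
    return problems
```

A test built on this idea would load the baseline JSON, fetch the live schema from the application, and assert that the returned list is empty. Additive changes (new paths, new optional fields) pass, which is what lets refactor commits proceed without regenerating the baseline.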

## Phase 1 Step 2 — models.py split (1466 -> 85 LOC shim)

compliance/db/models.py is decomposed into seven sibling aggregate modules
following the existing repo pattern (dsr_models.py, vvt_models.py, ...):

  regulation_models.py       (134) — Regulation, Requirement
  control_models.py          (279) — Control, Mapping, Evidence, Risk
  ai_system_models.py        (141) — AISystem, AuditExport
  service_module_models.py   (176) — ServiceModule, ModuleRegulation, ModuleRisk
  audit_session_models.py    (177) — AuditSession, AuditSignOff
  isms_governance_models.py  (323) — ISMSScope, Context, Policy, Objective, SoA
  isms_audit_models.py       (468) — Finding, CAPA, MgmtReview, InternalAudit,
                                     AuditTrail, Readiness

models.py becomes an 85-line re-export shim in dependency order so
existing imports continue to work unchanged. Schema is byte-identical:
__tablename__, column definitions, relationship strings, back_populates,
cascade directives all preserved.
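
The re-export idea can be demonstrated in a single runnable file by building two throwaway modules in memory. Everything here is a stand-in: the module names demo_regulation_models and demo_models and the class RegulationDB are hypothetical, and the real shim is an ordinary .py file with plain import statements.

```python
import sys
import types

# Stand-in for one sibling aggregate module (hypothetical name and class).
sibling = types.ModuleType("demo_regulation_models")
exec("class RegulationDB:\n    pass\n", sibling.__dict__)
sys.modules["demo_regulation_models"] = sibling

# Stand-in for the shim: it defines nothing itself and only re-exports.
SHIM_SOURCE = """
from demo_regulation_models import RegulationDB

__all__ = ["RegulationDB"]
"""
shim = types.ModuleType("demo_models")
exec(SHIM_SOURCE, shim.__dict__)
sys.modules["demo_models"] = shim

# Old-style and new-style imports resolve to the very same class object.
from demo_models import RegulationDB as via_shim
from demo_regulation_models import RegulationDB as direct

same_object = via_shim is direct
```

Because both import paths yield the same class object, callers that still import from the shim and callers that import from the sibling module share one set of mapped classes.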

All new sibling files are under the 500-line hard cap; largest is
isms_audit_models.py at 468. No file in compliance/db/ now exceeds
the hard cap.

## Phase 1 Step 3 — infrastructure only

backend-compliance/compliance/{schemas,domain,repositories}/ packages
are created as landing zones with docstrings. compliance/domain/
exports DomainError / NotFoundError / ConflictError / ValidationError /
PermissionError — the base classes services will use to raise
domain-level errors instead of HTTPException.
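
A minimal sketch of such a hierarchy and of how a route layer might translate it. The class names come from the commit message; the status-code mapping, the fallback of 400, and the status_for helper are assumptions made for illustration.

```python
class DomainError(Exception):
    """Base class for errors raised by the service layer."""


class NotFoundError(DomainError):
    """The requested entity does not exist."""


class ConflictError(DomainError):
    """The operation conflicts with existing state (e.g. a duplicate ID)."""


class ValidationError(DomainError):
    """The input violates a domain rule."""


class PermissionError(DomainError):  # shadows the builtin within this module
    """The caller may not perform the operation."""


# Assumed mapping; the commit message does not specify status codes.
STATUS_BY_ERROR = {
    NotFoundError: 404,
    ConflictError: 409,
    ValidationError: 422,
    PermissionError: 403,
}


def status_for(error: DomainError) -> int:
    """Map a domain error to an HTTP status code (hypothetical helper)."""
    for error_type, status in STATUS_BY_ERROR.items():
        if isinstance(error, error_type):
            return status
    return 400  # assumed fallback for a bare DomainError
```

With this split, services raise NotFoundError("Scope not found") and stay free of FastAPI imports, while a single exception handler registered for DomainError at the application level could call status_for to build the response.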

backend-compliance/PHASE1_RUNBOOK.md documents the nine-step execution
plan for Phase 1, including: snapshot baseline, characterization tests,
split models.py (this commit), split schemas.py (next), extract services,
extract repositories, mypy --strict, coverage.

## Verification

  backend-compliance/.venv-phase1: uv python install 3.12 + pip install -r requirements.txt
  PYTHONPATH=. pytest compliance/tests/ tests/contracts/
  -> 173 passed, 0 failed, 35 warnings, OpenAPI 360/484 unchanged

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 13:18:29 +02:00


"""
ISO 27001 ISMS API Routes

Provides endpoints for ISO 27001 certification-ready ISMS management:
- Scope & Context (Chapter 4)
- Policies & Objectives (Chapters 5, 6)
- Statement of Applicability (SoA)
- Audit Findings & CAPA (Chapters 9, 10)
- Management Reviews (Chapter 9.3)
- Internal Audits (Chapter 9.2)
- ISMS Readiness Check
"""
import uuid
import hashlib
from datetime import datetime, date, timezone
from typing import Optional

from fastapi import APIRouter, HTTPException, Query, Depends
from sqlalchemy.orm import Session

from ..db.models import (
    ISMSScopeDB, ISMSContextDB, ISMSPolicyDB, SecurityObjectiveDB,
    StatementOfApplicabilityDB, AuditFindingDB, CorrectiveActionDB,
    ManagementReviewDB, InternalAuditDB, AuditTrailDB, ISMSReadinessCheckDB,
    ApprovalStatusEnum, FindingTypeEnum, FindingStatusEnum, CAPATypeEnum
)
from .schemas import (
    # Scope
    ISMSScopeCreate, ISMSScopeUpdate, ISMSScopeResponse, ISMSScopeApproveRequest,
    # Context
    ISMSContextCreate, ISMSContextResponse,
    # Policies
    ISMSPolicyCreate, ISMSPolicyUpdate, ISMSPolicyResponse, ISMSPolicyListResponse,
    ISMSPolicyApproveRequest,
    # Objectives
    SecurityObjectiveCreate, SecurityObjectiveUpdate, SecurityObjectiveResponse,
    SecurityObjectiveListResponse,
    # SoA
    SoAEntryCreate, SoAEntryUpdate, SoAEntryResponse, SoAListResponse, SoAApproveRequest,
    # Findings
    AuditFindingCreate, AuditFindingUpdate, AuditFindingResponse,
    AuditFindingListResponse, AuditFindingCloseRequest,
    # CAPA
    CorrectiveActionCreate, CorrectiveActionUpdate, CorrectiveActionResponse,
    CorrectiveActionListResponse, CAPAVerifyRequest,
    # Management Review
    ManagementReviewCreate, ManagementReviewUpdate, ManagementReviewResponse,
    ManagementReviewListResponse, ManagementReviewApproveRequest,
    # Internal Audit
    InternalAuditCreate, InternalAuditUpdate, InternalAuditResponse,
    InternalAuditListResponse, InternalAuditCompleteRequest,
    # Readiness
    ISMSReadinessCheckResponse, ISMSReadinessCheckRequest, PotentialFinding,
    # Audit Trail
    AuditTrailResponse, PaginationMeta,
    # Overview
    ISO27001OverviewResponse, ISO27001ChapterStatus
)

# Import database session dependency
from classroom_engine.database import get_db

router = APIRouter(prefix="/isms", tags=["ISMS"])


# =============================================================================
# Helper Functions
# =============================================================================

def generate_id() -> str:
    """Generate a UUID string."""
    return str(uuid.uuid4())


def create_signature(data: str) -> str:
    """Create a SHA-256 hex digest of the given string."""
    return hashlib.sha256(data.encode()).hexdigest()


def log_audit_trail(
    db: Session,
    entity_type: str,
    entity_id: str,
    entity_name: str,
    action: str,
    performed_by: str,
    field_changed: Optional[str] = None,
    old_value: Optional[str] = None,
    new_value: Optional[str] = None,
    change_summary: Optional[str] = None
) -> None:
    """Add an audit trail entry to the session; the caller commits."""
    trail = AuditTrailDB(
        id=generate_id(),
        entity_type=entity_type,
        entity_id=entity_id,
        entity_name=entity_name,
        action=action,
        field_changed=field_changed,
        old_value=old_value,
        new_value=new_value,
        change_summary=change_summary,
        performed_by=performed_by,
        performed_at=datetime.now(timezone.utc),
        checksum=create_signature(f"{entity_type}|{entity_id}|{action}|{performed_by}")
    )
    db.add(trail)


# =============================================================================
# ISMS SCOPE (ISO 27001 4.3)
# =============================================================================

@router.get("/scope", response_model=ISMSScopeResponse)
async def get_isms_scope(db: Session = Depends(get_db)):
    """
    Get the current ISMS scope.

    The scope defines the boundaries and applicability of the ISMS.
    Only one active scope should exist at a time.
    """
    scope = db.query(ISMSScopeDB).filter(
        ISMSScopeDB.status != ApprovalStatusEnum.SUPERSEDED
    ).order_by(ISMSScopeDB.created_at.desc()).first()
    if not scope:
        raise HTTPException(status_code=404, detail="No ISMS scope defined yet")
    return scope


@router.post("/scope", response_model=ISMSScopeResponse)
async def create_isms_scope(
    data: ISMSScopeCreate,
    created_by: str = Query(..., description="User creating the scope"),
    db: Session = Depends(get_db)
):
    """
    Create a new ISMS scope definition.

    Supersedes any existing scope.
    """
    # Supersede existing scopes
    existing = db.query(ISMSScopeDB).filter(
        ISMSScopeDB.status != ApprovalStatusEnum.SUPERSEDED
    ).all()
    for s in existing:
        s.status = ApprovalStatusEnum.SUPERSEDED

    scope = ISMSScopeDB(
        id=generate_id(),
        scope_statement=data.scope_statement,
        included_locations=data.included_locations,
        included_processes=data.included_processes,
        included_services=data.included_services,
        excluded_items=data.excluded_items,
        exclusion_justification=data.exclusion_justification,
        organizational_boundary=data.organizational_boundary,
        physical_boundary=data.physical_boundary,
        technical_boundary=data.technical_boundary,
        status=ApprovalStatusEnum.DRAFT,
        created_by=created_by
    )
    db.add(scope)
    log_audit_trail(db, "isms_scope", scope.id, "ISMS Scope", "create", created_by)
    db.commit()
    db.refresh(scope)
    return scope


@router.put("/scope/{scope_id}", response_model=ISMSScopeResponse)
async def update_isms_scope(
    scope_id: str,
    data: ISMSScopeUpdate,
    updated_by: str = Query(..., description="User updating the scope"),
    db: Session = Depends(get_db)
):
    """Update ISMS scope (rejected once the scope is approved)."""
    scope = db.query(ISMSScopeDB).filter(ISMSScopeDB.id == scope_id).first()
    if not scope:
        raise HTTPException(status_code=404, detail="Scope not found")
    if scope.status == ApprovalStatusEnum.APPROVED:
        raise HTTPException(status_code=400, detail="Cannot modify approved scope. Create new version.")

    for field, value in data.model_dump(exclude_unset=True).items():
        setattr(scope, field, value)
    scope.updated_by = updated_by
    scope.updated_at = datetime.now(timezone.utc)

    # Increment the minor version on every update
    version_parts = scope.version.split(".")
    scope.version = f"{version_parts[0]}.{int(version_parts[1]) + 1}"

    log_audit_trail(db, "isms_scope", scope.id, "ISMS Scope", "update", updated_by)
    db.commit()
    db.refresh(scope)
    return scope


@router.post("/scope/{scope_id}/approve", response_model=ISMSScopeResponse)
async def approve_isms_scope(
    scope_id: str,
    data: ISMSScopeApproveRequest,
    db: Session = Depends(get_db)
):
    """
    Approve the ISMS scope.

    This is a MANDATORY step for ISO 27001 certification.
    Must be approved by top management.
    """
    scope = db.query(ISMSScopeDB).filter(ISMSScopeDB.id == scope_id).first()
    if not scope:
        raise HTTPException(status_code=404, detail="Scope not found")

    scope.status = ApprovalStatusEnum.APPROVED
    scope.approved_by = data.approved_by
    scope.approved_at = datetime.now(timezone.utc)
    scope.effective_date = data.effective_date
    scope.review_date = data.review_date
    scope.approval_signature = create_signature(
        f"{scope.scope_statement}|{data.approved_by}|{datetime.now(timezone.utc).isoformat()}"
    )

    log_audit_trail(db, "isms_scope", scope.id, "ISMS Scope", "approve", data.approved_by)
    db.commit()
    db.refresh(scope)
    return scope


# =============================================================================
# ISMS CONTEXT (ISO 27001 4.1, 4.2)
# =============================================================================

@router.get("/context", response_model=ISMSContextResponse)
async def get_isms_context(db: Session = Depends(get_db)):
    """Get the current ISMS context analysis."""
    context = db.query(ISMSContextDB).filter(
        ISMSContextDB.status != ApprovalStatusEnum.SUPERSEDED
    ).order_by(ISMSContextDB.created_at.desc()).first()
    if not context:
        raise HTTPException(status_code=404, detail="No ISMS context defined yet")
    return context


@router.post("/context", response_model=ISMSContextResponse)
async def create_isms_context(
    data: ISMSContextCreate,
    created_by: str = Query(...),
    db: Session = Depends(get_db)
):
    """Create or update ISMS context analysis."""
    # Supersede existing
    existing = db.query(ISMSContextDB).filter(
        ISMSContextDB.status != ApprovalStatusEnum.SUPERSEDED
    ).all()
    for c in existing:
        c.status = ApprovalStatusEnum.SUPERSEDED

    context = ISMSContextDB(
        id=generate_id(),
        internal_issues=[i.model_dump() for i in data.internal_issues] if data.internal_issues else None,
        external_issues=[i.model_dump() for i in data.external_issues] if data.external_issues else None,
        interested_parties=[p.model_dump() for p in data.interested_parties] if data.interested_parties else None,
        regulatory_requirements=data.regulatory_requirements,
        contractual_requirements=data.contractual_requirements,
        swot_strengths=data.swot_strengths,
        swot_weaknesses=data.swot_weaknesses,
        swot_opportunities=data.swot_opportunities,
        swot_threats=data.swot_threats,
        status=ApprovalStatusEnum.DRAFT
    )
    db.add(context)
    log_audit_trail(db, "isms_context", context.id, "ISMS Context", "create", created_by)
    db.commit()
    db.refresh(context)
    return context


# =============================================================================
# ISMS POLICIES (ISO 27001 5.2)
# =============================================================================

@router.get("/policies", response_model=ISMSPolicyListResponse)
async def list_policies(
    policy_type: Optional[str] = None,
    status: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """List all ISMS policies."""
    query = db.query(ISMSPolicyDB)
    if policy_type:
        query = query.filter(ISMSPolicyDB.policy_type == policy_type)
    if status:
        query = query.filter(ISMSPolicyDB.status == status)
    policies = query.order_by(ISMSPolicyDB.policy_id).all()
    return ISMSPolicyListResponse(policies=policies, total=len(policies))


@router.post("/policies", response_model=ISMSPolicyResponse)
async def create_policy(data: ISMSPolicyCreate, db: Session = Depends(get_db)):
    """Create a new ISMS policy."""
    # Check for duplicate policy_id
    existing = db.query(ISMSPolicyDB).filter(
        ISMSPolicyDB.policy_id == data.policy_id
    ).first()
    if existing:
        raise HTTPException(status_code=400, detail=f"Policy {data.policy_id} already exists")

    policy = ISMSPolicyDB(
        id=generate_id(),
        policy_id=data.policy_id,
        title=data.title,
        policy_type=data.policy_type,
        description=data.description,
        policy_text=data.policy_text,
        applies_to=data.applies_to,
        review_frequency_months=data.review_frequency_months,
        related_controls=data.related_controls,
        authored_by=data.authored_by,
        status=ApprovalStatusEnum.DRAFT
    )
    db.add(policy)
    log_audit_trail(db, "isms_policy", policy.id, policy.policy_id, "create", data.authored_by)
    db.commit()
    db.refresh(policy)
    return policy


@router.get("/policies/{policy_id}", response_model=ISMSPolicyResponse)
async def get_policy(policy_id: str, db: Session = Depends(get_db)):
    """Get a specific policy by ID."""
    policy = db.query(ISMSPolicyDB).filter(
        (ISMSPolicyDB.id == policy_id) | (ISMSPolicyDB.policy_id == policy_id)
    ).first()
    if not policy:
        raise HTTPException(status_code=404, detail="Policy not found")
    return policy


@router.put("/policies/{policy_id}", response_model=ISMSPolicyResponse)
async def update_policy(
    policy_id: str,
    data: ISMSPolicyUpdate,
    updated_by: str = Query(...),
    db: Session = Depends(get_db)
):
    """Update a policy (creates new version if approved)."""
    policy = db.query(ISMSPolicyDB).filter(
        (ISMSPolicyDB.id == policy_id) | (ISMSPolicyDB.policy_id == policy_id)
    ).first()
    if not policy:
        raise HTTPException(status_code=404, detail="Policy not found")

    if policy.status == ApprovalStatusEnum.APPROVED:
        # Increment major version
        version_parts = policy.version.split(".")
        policy.version = f"{int(version_parts[0]) + 1}.0"
        policy.status = ApprovalStatusEnum.DRAFT

    for field, value in data.model_dump(exclude_unset=True).items():
        setattr(policy, field, value)

    log_audit_trail(db, "isms_policy", policy.id, policy.policy_id, "update", updated_by)
    db.commit()
    db.refresh(policy)
    return policy


@router.post("/policies/{policy_id}/approve", response_model=ISMSPolicyResponse)
async def approve_policy(
    policy_id: str,
    data: ISMSPolicyApproveRequest,
    db: Session = Depends(get_db)
):
    """Approve a policy. Must be approved by top management."""
    policy = db.query(ISMSPolicyDB).filter(
        (ISMSPolicyDB.id == policy_id) | (ISMSPolicyDB.policy_id == policy_id)
    ).first()
    if not policy:
        raise HTTPException(status_code=404, detail="Policy not found")

    policy.reviewed_by = data.reviewed_by
    policy.approved_by = data.approved_by
    policy.approved_at = datetime.now(timezone.utc)
    policy.effective_date = data.effective_date

    # Add review_frequency_months to the effective date. Whole-year division
    # would drop remainder months (6 months -> 0 years), and Feb 29 does not
    # exist in most target years, so clamp the day to the target month's end.
    months = data.effective_date.month - 1 + policy.review_frequency_months
    review_year = data.effective_date.year + months // 12
    review_month = months % 12 + 1
    first_of_next_month = date(review_year + review_month // 12, review_month % 12 + 1, 1)
    last_day = date.fromordinal(first_of_next_month.toordinal() - 1).day
    policy.next_review_date = date(
        review_year, review_month, min(data.effective_date.day, last_day)
    )

    policy.status = ApprovalStatusEnum.APPROVED
    policy.approval_signature = create_signature(
        f"{policy.policy_id}|{data.approved_by}|{datetime.now(timezone.utc).isoformat()}"
    )

    log_audit_trail(db, "isms_policy", policy.id, policy.policy_id, "approve", data.approved_by)
    db.commit()
    db.refresh(policy)
    return policy


# =============================================================================
# SECURITY OBJECTIVES (ISO 27001 6.2)
# =============================================================================

@router.get("/objectives", response_model=SecurityObjectiveListResponse)
async def list_objectives(
    category: Optional[str] = None,
    status: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """List all security objectives."""
    query = db.query(SecurityObjectiveDB)
    if category:
        query = query.filter(SecurityObjectiveDB.category == category)
    if status:
        query = query.filter(SecurityObjectiveDB.status == status)
    objectives = query.order_by(SecurityObjectiveDB.objective_id).all()
    return SecurityObjectiveListResponse(objectives=objectives, total=len(objectives))


@router.post("/objectives", response_model=SecurityObjectiveResponse)
async def create_objective(
    data: SecurityObjectiveCreate,
    created_by: str = Query(...),
    db: Session = Depends(get_db)
):
    """Create a new security objective."""
    objective = SecurityObjectiveDB(
        id=generate_id(),
        objective_id=data.objective_id,
        title=data.title,
        description=data.description,
        category=data.category,
        specific=data.specific,
        measurable=data.measurable,
        achievable=data.achievable,
        relevant=data.relevant,
        time_bound=data.time_bound,
        kpi_name=data.kpi_name,
        kpi_target=data.kpi_target,
        kpi_unit=data.kpi_unit,
        measurement_frequency=data.measurement_frequency,
        owner=data.owner,
        target_date=data.target_date,
        related_controls=data.related_controls,
        related_risks=data.related_risks,
        status="active"
    )
    db.add(objective)
    log_audit_trail(db, "security_objective", objective.id, objective.objective_id, "create", created_by)
    db.commit()
    db.refresh(objective)
    return objective


@router.put("/objectives/{objective_id}", response_model=SecurityObjectiveResponse)
async def update_objective(
    objective_id: str,
    data: SecurityObjectiveUpdate,
    updated_by: str = Query(...),
    db: Session = Depends(get_db)
):
    """Update a security objective's progress."""
    objective = db.query(SecurityObjectiveDB).filter(
        (SecurityObjectiveDB.id == objective_id) |
        (SecurityObjectiveDB.objective_id == objective_id)
    ).first()
    if not objective:
        raise HTTPException(status_code=404, detail="Objective not found")

    for field, value in data.model_dump(exclude_unset=True).items():
        setattr(objective, field, value)

    # Mark as achieved once progress reaches 100% (unset progress counts as 0)
    if (objective.progress_percentage or 0) >= 100 and objective.status == "active":
        objective.status = "achieved"
        objective.achieved_date = date.today()

    log_audit_trail(db, "security_objective", objective.id, objective.objective_id, "update", updated_by)
    db.commit()
    db.refresh(objective)
    return objective


# =============================================================================
# STATEMENT OF APPLICABILITY (SoA)
# =============================================================================

@router.get("/soa", response_model=SoAListResponse)
async def list_soa_entries(
    is_applicable: Optional[bool] = None,
    implementation_status: Optional[str] = None,
    category: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """List all Statement of Applicability entries."""
    query = db.query(StatementOfApplicabilityDB)
    if is_applicable is not None:
        query = query.filter(StatementOfApplicabilityDB.is_applicable == is_applicable)
    if implementation_status:
        query = query.filter(StatementOfApplicabilityDB.implementation_status == implementation_status)
    if category:
        query = query.filter(StatementOfApplicabilityDB.annex_a_category == category)
    entries = query.order_by(StatementOfApplicabilityDB.annex_a_control).all()

    applicable_count = sum(1 for e in entries if e.is_applicable)
    implemented_count = sum(1 for e in entries if e.implementation_status == "implemented")
    planned_count = sum(1 for e in entries if e.implementation_status == "planned")

    return SoAListResponse(
        entries=entries,
        total=len(entries),
        applicable_count=applicable_count,
        not_applicable_count=len(entries) - applicable_count,
        implemented_count=implemented_count,
        planned_count=planned_count
    )


@router.post("/soa", response_model=SoAEntryResponse)
async def create_soa_entry(
    data: SoAEntryCreate,
    created_by: str = Query(...),
    db: Session = Depends(get_db)
):
    """Create a new SoA entry for an Annex A control."""
    # Check for duplicate
    existing = db.query(StatementOfApplicabilityDB).filter(
        StatementOfApplicabilityDB.annex_a_control == data.annex_a_control
    ).first()
    if existing:
        raise HTTPException(status_code=400, detail=f"SoA entry for {data.annex_a_control} already exists")

    entry = StatementOfApplicabilityDB(
        id=generate_id(),
        annex_a_control=data.annex_a_control,
        annex_a_title=data.annex_a_title,
        annex_a_category=data.annex_a_category,
        is_applicable=data.is_applicable,
        applicability_justification=data.applicability_justification,
        implementation_status=data.implementation_status,
        implementation_notes=data.implementation_notes,
        breakpilot_control_ids=data.breakpilot_control_ids,
        coverage_level=data.coverage_level,
        evidence_description=data.evidence_description,
        risk_assessment_notes=data.risk_assessment_notes,
        compensating_controls=data.compensating_controls
    )
    db.add(entry)
    log_audit_trail(db, "soa", entry.id, entry.annex_a_control, "create", created_by)
    db.commit()
    db.refresh(entry)
    return entry


@router.put("/soa/{entry_id}", response_model=SoAEntryResponse)
async def update_soa_entry(
    entry_id: str,
    data: SoAEntryUpdate,
    updated_by: str = Query(...),
    db: Session = Depends(get_db)
):
    """Update an SoA entry."""
    entry = db.query(StatementOfApplicabilityDB).filter(
        (StatementOfApplicabilityDB.id == entry_id) |
        (StatementOfApplicabilityDB.annex_a_control == entry_id)
    ).first()
    if not entry:
        raise HTTPException(status_code=404, detail="SoA entry not found")

    for field, value in data.model_dump(exclude_unset=True).items():
        setattr(entry, field, value)

    # Increment version
    version_parts = entry.version.split(".")
    entry.version = f"{version_parts[0]}.{int(version_parts[1]) + 1}"

    log_audit_trail(db, "soa", entry.id, entry.annex_a_control, "update", updated_by)
    db.commit()
    db.refresh(entry)
    return entry


@router.post("/soa/{entry_id}/approve", response_model=SoAEntryResponse)
async def approve_soa_entry(
    entry_id: str,
    data: SoAApproveRequest,
    db: Session = Depends(get_db)
):
    """Approve an SoA entry."""
    entry = db.query(StatementOfApplicabilityDB).filter(
        (StatementOfApplicabilityDB.id == entry_id) |
        (StatementOfApplicabilityDB.annex_a_control == entry_id)
    ).first()
    if not entry:
        raise HTTPException(status_code=404, detail="SoA entry not found")

    entry.reviewed_by = data.reviewed_by
    entry.reviewed_at = datetime.now(timezone.utc)
    entry.approved_by = data.approved_by
    entry.approved_at = datetime.now(timezone.utc)

    log_audit_trail(db, "soa", entry.id, entry.annex_a_control, "approve", data.approved_by)
    db.commit()
    db.refresh(entry)
    return entry


# =============================================================================
# AUDIT FINDINGS (Major/Minor/OFI)
# =============================================================================

@router.get("/findings", response_model=AuditFindingListResponse)
async def list_findings(
    finding_type: Optional[str] = None,
    status: Optional[str] = None,
    internal_audit_id: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """List all audit findings."""
    query = db.query(AuditFindingDB)
    if finding_type:
        query = query.filter(AuditFindingDB.finding_type == finding_type)
    if status:
        query = query.filter(AuditFindingDB.status == status)
    if internal_audit_id:
        query = query.filter(AuditFindingDB.internal_audit_id == internal_audit_id)
    findings = query.order_by(AuditFindingDB.identified_date.desc()).all()

    major_count = sum(1 for f in findings if f.finding_type == FindingTypeEnum.MAJOR)
    minor_count = sum(1 for f in findings if f.finding_type == FindingTypeEnum.MINOR)
    ofi_count = sum(1 for f in findings if f.finding_type == FindingTypeEnum.OFI)
    open_count = sum(1 for f in findings if f.status != FindingStatusEnum.CLOSED)

    return AuditFindingListResponse(
        findings=findings,
        total=len(findings),
        major_count=major_count,
        minor_count=minor_count,
        ofi_count=ofi_count,
        open_count=open_count
    )


@router.post("/findings", response_model=AuditFindingResponse)
async def create_finding(data: AuditFindingCreate, db: Session = Depends(get_db)):
    """
    Create a new audit finding.

    Finding types:
    - major: Blocks certification, requires immediate CAPA
    - minor: Requires CAPA within deadline
    - ofi: Opportunity for improvement (no mandatory action)
    - positive: Good practice observation
    """
    # Generate finding ID
    year = date.today().year
    existing_count = db.query(AuditFindingDB).filter(
        AuditFindingDB.finding_id.like(f"FIND-{year}-%")
    ).count()
    finding_id = f"FIND-{year}-{existing_count + 1:03d}"

    finding = AuditFindingDB(
        id=generate_id(),
        finding_id=finding_id,
        audit_session_id=data.audit_session_id,
        internal_audit_id=data.internal_audit_id,
        finding_type=FindingTypeEnum(data.finding_type),
        iso_chapter=data.iso_chapter,
        annex_a_control=data.annex_a_control,
        title=data.title,
        description=data.description,
        objective_evidence=data.objective_evidence,
        impact_description=data.impact_description,
        affected_processes=data.affected_processes,
        affected_assets=data.affected_assets,
        owner=data.owner,
        auditor=data.auditor,
        due_date=data.due_date,
        status=FindingStatusEnum.OPEN
    )
    db.add(finding)

    # Update internal audit counts if linked
    if data.internal_audit_id:
        audit = db.query(InternalAuditDB).filter(
            InternalAuditDB.id == data.internal_audit_id
        ).first()
        if audit:
            audit.total_findings = (audit.total_findings or 0) + 1
            if data.finding_type == "major":
                audit.major_findings = (audit.major_findings or 0) + 1
            elif data.finding_type == "minor":
                audit.minor_findings = (audit.minor_findings or 0) + 1
            elif data.finding_type == "ofi":
                audit.ofi_count = (audit.ofi_count or 0) + 1
            elif data.finding_type == "positive":
                audit.positive_observations = (audit.positive_observations or 0) + 1

    log_audit_trail(db, "audit_finding", finding.id, finding_id, "create", data.auditor)
    db.commit()
    db.refresh(finding)
    return finding


@router.put("/findings/{finding_id}", response_model=AuditFindingResponse)
async def update_finding(
    finding_id: str,
    data: AuditFindingUpdate,
    updated_by: str = Query(...),
    db: Session = Depends(get_db)
):
    """Update an audit finding."""
    finding = db.query(AuditFindingDB).filter(
        (AuditFindingDB.id == finding_id) | (AuditFindingDB.finding_id == finding_id)
    ).first()
    if not finding:
        raise HTTPException(status_code=404, detail="Finding not found")

    for field, value in data.model_dump(exclude_unset=True).items():
        if field == "status" and value:
            setattr(finding, field, FindingStatusEnum(value))
        else:
            setattr(finding, field, value)

    log_audit_trail(db, "audit_finding", finding.id, finding.finding_id, "update", updated_by)
    db.commit()
    db.refresh(finding)
    return finding


@router.post("/findings/{finding_id}/close", response_model=AuditFindingResponse)
async def close_finding(
    finding_id: str,
    data: AuditFindingCloseRequest,
    db: Session = Depends(get_db)
):
    """
    Close an audit finding after verification.

    Requires:
    - All CAPAs to be completed and verified
    - Verification evidence documenting the fix
    """
    finding = db.query(AuditFindingDB).filter(
        (AuditFindingDB.id == finding_id) | (AuditFindingDB.finding_id == finding_id)
    ).first()
    if not finding:
        raise HTTPException(status_code=404, detail="Finding not found")

    # Check if all CAPAs are verified
    open_capas = db.query(CorrectiveActionDB).filter(
        CorrectiveActionDB.finding_id == finding.id,
        CorrectiveActionDB.status != "verified"
    ).count()
    if open_capas > 0:
        raise HTTPException(
            status_code=400,
            detail=f"Cannot close finding: {open_capas} CAPA(s) not yet verified"
        )

    finding.status = FindingStatusEnum.CLOSED
    finding.closed_date = date.today()
    finding.closure_notes = data.closure_notes
    finding.closed_by = data.closed_by
    finding.verification_method = data.verification_method
    finding.verification_evidence = data.verification_evidence
    finding.verified_by = data.closed_by
    finding.verified_at = datetime.now(timezone.utc)

    log_audit_trail(db, "audit_finding", finding.id, finding.finding_id, "close", data.closed_by)
    db.commit()
    db.refresh(finding)
    return finding
# =============================================================================
# CORRECTIVE ACTIONS (CAPA)
# =============================================================================
@router.get("/capa", response_model=CorrectiveActionListResponse)
async def list_capas(
finding_id: Optional[str] = None,
status: Optional[str] = None,
assigned_to: Optional[str] = None,
db: Session = Depends(get_db)
):
"""List all corrective/preventive actions."""
query = db.query(CorrectiveActionDB)
if finding_id:
query = query.filter(CorrectiveActionDB.finding_id == finding_id)
if status:
query = query.filter(CorrectiveActionDB.status == status)
if assigned_to:
query = query.filter(CorrectiveActionDB.assigned_to == assigned_to)
actions = query.order_by(CorrectiveActionDB.planned_completion).all()
return CorrectiveActionListResponse(actions=actions, total=len(actions))
@router.post("/capa", response_model=CorrectiveActionResponse)
async def create_capa(
data: CorrectiveActionCreate,
created_by: str = Query(...),
db: Session = Depends(get_db)
):
"""Create a new corrective/preventive action for a finding."""
# Verify finding exists
finding = db.query(AuditFindingDB).filter(AuditFindingDB.id == data.finding_id).first()
if not finding:
raise HTTPException(status_code=404, detail="Finding not found")
# Generate CAPA ID
year = date.today().year
existing_count = db.query(CorrectiveActionDB).filter(
CorrectiveActionDB.capa_id.like(f"CAPA-{year}-%")
).count()
capa_id = f"CAPA-{year}-{existing_count + 1:03d}"
capa = CorrectiveActionDB(
id=generate_id(),
capa_id=capa_id,
finding_id=data.finding_id,
capa_type=CAPATypeEnum(data.capa_type),
title=data.title,
description=data.description,
expected_outcome=data.expected_outcome,
assigned_to=data.assigned_to,
planned_start=data.planned_start,
planned_completion=data.planned_completion,
effectiveness_criteria=data.effectiveness_criteria,
estimated_effort_hours=data.estimated_effort_hours,
resources_required=data.resources_required,
status="planned"
)
db.add(capa)
# Update finding status
finding.status = FindingStatusEnum.CORRECTIVE_ACTION_PENDING
log_audit_trail(db, "capa", capa.id, capa_id, "create", created_by)
db.commit()
db.refresh(capa)
return capa


@router.put("/capa/{capa_id}", response_model=CorrectiveActionResponse)
async def update_capa(
    capa_id: str,
    data: CorrectiveActionUpdate,
    updated_by: str = Query(...),
    db: Session = Depends(get_db)
):
    """Update a CAPA's progress."""
    capa = db.query(CorrectiveActionDB).filter(
        (CorrectiveActionDB.id == capa_id) | (CorrectiveActionDB.capa_id == capa_id)
    ).first()
    if not capa:
        raise HTTPException(status_code=404, detail="CAPA not found")

    for field, value in data.model_dump(exclude_unset=True).items():
        setattr(capa, field, value)

    # If completed, set actual completion date
    if capa.status == "completed" and not capa.actual_completion:
        capa.actual_completion = date.today()

    log_audit_trail(db, "capa", capa.id, capa.capa_id, "update", updated_by)
    db.commit()
    db.refresh(capa)
    return capa


@router.post("/capa/{capa_id}/verify", response_model=CorrectiveActionResponse)
async def verify_capa(
    capa_id: str,
    data: CAPAVerifyRequest,
    db: Session = Depends(get_db)
):
    """Verify the effectiveness of a CAPA."""
    capa = db.query(CorrectiveActionDB).filter(
        (CorrectiveActionDB.id == capa_id) | (CorrectiveActionDB.capa_id == capa_id)
    ).first()
    if not capa:
        raise HTTPException(status_code=404, detail="CAPA not found")
    if capa.status != "completed":
        raise HTTPException(status_code=400, detail="CAPA must be completed before verification")

    capa.effectiveness_verified = data.is_effective
    capa.effectiveness_verification_date = date.today()
    capa.effectiveness_notes = data.effectiveness_notes
    capa.status = "verified" if data.is_effective else "completed"

    # If verified and all other CAPAs for the finding are verified, update finding status
    if data.is_effective:
        finding = db.query(AuditFindingDB).filter(AuditFindingDB.id == capa.finding_id).first()
        if finding:
            unverified = db.query(CorrectiveActionDB).filter(
                CorrectiveActionDB.finding_id == finding.id,
                CorrectiveActionDB.id != capa.id,
                CorrectiveActionDB.status != "verified"
            ).count()
            if unverified == 0:
                finding.status = FindingStatusEnum.VERIFICATION_PENDING

    log_audit_trail(db, "capa", capa.id, capa.capa_id, "verify", data.verified_by)
    db.commit()
    db.refresh(capa)
    return capa


# =============================================================================
# MANAGEMENT REVIEW (ISO 27001 9.3)
# =============================================================================

@router.get("/management-reviews", response_model=ManagementReviewListResponse)
async def list_management_reviews(
    status: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """List all management reviews."""
    query = db.query(ManagementReviewDB)
    if status:
        query = query.filter(ManagementReviewDB.status == status)
    reviews = query.order_by(ManagementReviewDB.review_date.desc()).all()
    return ManagementReviewListResponse(reviews=reviews, total=len(reviews))


@router.post("/management-reviews", response_model=ManagementReviewResponse)
async def create_management_review(
    data: ManagementReviewCreate,
    created_by: str = Query(...),
    db: Session = Depends(get_db)
):
    """Create a new management review."""
    # Generate review ID
    year = data.review_date.year
    quarter = (data.review_date.month - 1) // 3 + 1
    review_id = f"MR-{year}-Q{quarter}"

    # Check for duplicate; disambiguate with a short random suffix
    existing = db.query(ManagementReviewDB).filter(
        ManagementReviewDB.review_id == review_id
    ).first()
    if existing:
        review_id = f"{review_id}-{generate_id()[:4]}"

    review = ManagementReviewDB(
        id=generate_id(),
        review_id=review_id,
        title=data.title,
        review_date=data.review_date,
        review_period_start=data.review_period_start,
        review_period_end=data.review_period_end,
        chairperson=data.chairperson,
        attendees=[a.model_dump() for a in data.attendees] if data.attendees else None,
        status="draft"
    )
    db.add(review)

    log_audit_trail(db, "management_review", review.id, review_id, "create", created_by)
    db.commit()
    db.refresh(review)
    return review


@router.get("/management-reviews/{review_id}", response_model=ManagementReviewResponse)
async def get_management_review(review_id: str, db: Session = Depends(get_db)):
    """Get a specific management review."""
    review = db.query(ManagementReviewDB).filter(
        (ManagementReviewDB.id == review_id) | (ManagementReviewDB.review_id == review_id)
    ).first()
    if not review:
        raise HTTPException(status_code=404, detail="Management review not found")
    return review


@router.put("/management-reviews/{review_id}", response_model=ManagementReviewResponse)
async def update_management_review(
    review_id: str,
    data: ManagementReviewUpdate,
    updated_by: str = Query(...),
    db: Session = Depends(get_db)
):
    """Update a management review with inputs/outputs."""
    review = db.query(ManagementReviewDB).filter(
        (ManagementReviewDB.id == review_id) | (ManagementReviewDB.review_id == review_id)
    ).first()
    if not review:
        raise HTTPException(status_code=404, detail="Management review not found")

    # model_dump() already converts nested models (e.g. action_items) to plain
    # dicts, so calling item.model_dump() on them again would raise AttributeError.
    for field, value in data.model_dump(exclude_unset=True).items():
        setattr(review, field, value)

    log_audit_trail(db, "management_review", review.id, review.review_id, "update", updated_by)
    db.commit()
    db.refresh(review)
    return review


@router.post("/management-reviews/{review_id}/approve", response_model=ManagementReviewResponse)
async def approve_management_review(
    review_id: str,
    data: ManagementReviewApproveRequest,
    db: Session = Depends(get_db)
):
    """Approve a management review."""
    review = db.query(ManagementReviewDB).filter(
        (ManagementReviewDB.id == review_id) | (ManagementReviewDB.review_id == review_id)
    ).first()
    if not review:
        raise HTTPException(status_code=404, detail="Management review not found")

    review.status = "approved"
    review.approved_by = data.approved_by
    review.approved_at = datetime.now(timezone.utc)
    review.next_review_date = data.next_review_date
    review.minutes_document_path = data.minutes_document_path

    log_audit_trail(db, "management_review", review.id, review.review_id, "approve", data.approved_by)
    db.commit()
    db.refresh(review)
    return review


# =============================================================================
# INTERNAL AUDIT (ISO 27001 9.2)
# =============================================================================

@router.get("/internal-audits", response_model=InternalAuditListResponse)
async def list_internal_audits(
    status: Optional[str] = None,
    audit_type: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """List all internal audits."""
    query = db.query(InternalAuditDB)
    if status:
        query = query.filter(InternalAuditDB.status == status)
    if audit_type:
        query = query.filter(InternalAuditDB.audit_type == audit_type)
    audits = query.order_by(InternalAuditDB.planned_date.desc()).all()
    return InternalAuditListResponse(audits=audits, total=len(audits))


@router.post("/internal-audits", response_model=InternalAuditResponse)
async def create_internal_audit(
    data: InternalAuditCreate,
    created_by: str = Query(...),
    db: Session = Depends(get_db)
):
    """Create a new internal audit."""
    # Generate audit ID
    year = data.planned_date.year
    existing_count = db.query(InternalAuditDB).filter(
        InternalAuditDB.audit_id.like(f"IA-{year}-%")
    ).count()
    audit_id = f"IA-{year}-{existing_count + 1:03d}"

    audit = InternalAuditDB(
        id=generate_id(),
        audit_id=audit_id,
        title=data.title,
        audit_type=data.audit_type,
        scope_description=data.scope_description,
        iso_chapters_covered=data.iso_chapters_covered,
        annex_a_controls_covered=data.annex_a_controls_covered,
        processes_covered=data.processes_covered,
        departments_covered=data.departments_covered,
        criteria=data.criteria,
        planned_date=data.planned_date,
        lead_auditor=data.lead_auditor,
        audit_team=data.audit_team,
        status="planned"
    )
    db.add(audit)

    log_audit_trail(db, "internal_audit", audit.id, audit_id, "create", created_by)
    db.commit()
    db.refresh(audit)
    return audit


@router.put("/internal-audits/{audit_id}", response_model=InternalAuditResponse)
async def update_internal_audit(
    audit_id: str,
    data: InternalAuditUpdate,
    updated_by: str = Query(...),
    db: Session = Depends(get_db)
):
    """Update an internal audit."""
    audit = db.query(InternalAuditDB).filter(
        (InternalAuditDB.id == audit_id) | (InternalAuditDB.audit_id == audit_id)
    ).first()
    if not audit:
        raise HTTPException(status_code=404, detail="Internal audit not found")

    for field, value in data.model_dump(exclude_unset=True).items():
        setattr(audit, field, value)

    log_audit_trail(db, "internal_audit", audit.id, audit.audit_id, "update", updated_by)
    db.commit()
    db.refresh(audit)
    return audit


@router.post("/internal-audits/{audit_id}/complete", response_model=InternalAuditResponse)
async def complete_internal_audit(
    audit_id: str,
    data: InternalAuditCompleteRequest,
    completed_by: str = Query(...),
    db: Session = Depends(get_db)
):
    """Complete an internal audit with conclusion."""
    audit = db.query(InternalAuditDB).filter(
        (InternalAuditDB.id == audit_id) | (InternalAuditDB.audit_id == audit_id)
    ).first()
    if not audit:
        raise HTTPException(status_code=404, detail="Internal audit not found")

    audit.status = "completed"
    audit.actual_end_date = date.today()
    audit.report_date = date.today()
    audit.audit_conclusion = data.audit_conclusion
    audit.overall_assessment = data.overall_assessment
    audit.follow_up_audit_required = data.follow_up_audit_required

    log_audit_trail(db, "internal_audit", audit.id, audit.audit_id, "complete", completed_by)
    db.commit()
    db.refresh(audit)
    return audit


# =============================================================================
# ISMS READINESS CHECK
# =============================================================================

@router.post("/readiness-check", response_model=ISMSReadinessCheckResponse)
async def run_readiness_check(
    data: ISMSReadinessCheckRequest,
    db: Session = Depends(get_db)
):
    """
    Run ISMS readiness check.

    Identifies potential Major/Minor findings BEFORE external audit.
    This helps achieve ISO 27001 certification on the first attempt.
    """
    potential_majors = []
    potential_minors = []
    improvement_opportunities = []

    # Chapter 4: Context
    scope = db.query(ISMSScopeDB).filter(
        ISMSScopeDB.status == ApprovalStatusEnum.APPROVED
    ).first()
    if not scope:
        potential_majors.append(PotentialFinding(
            check="ISMS Scope not approved",
            status="fail",
            recommendation="Approve ISMS scope with top management signature",
            iso_reference="4.3"
        ))

    context = db.query(ISMSContextDB).filter(
        ISMSContextDB.status == ApprovalStatusEnum.APPROVED
    ).first()
    if not context:
        potential_majors.append(PotentialFinding(
            check="ISMS Context not documented",
            status="fail",
            recommendation="Document and approve context analysis (4.1, 4.2)",
            iso_reference="4.1, 4.2"
        ))

    # Chapter 5: Leadership
    master_policy = db.query(ISMSPolicyDB).filter(
        ISMSPolicyDB.policy_type == "master",
        ISMSPolicyDB.status == ApprovalStatusEnum.APPROVED
    ).first()
    if not master_policy:
        potential_majors.append(PotentialFinding(
            check="Information Security Policy not approved",
            status="fail",
            recommendation="Create and approve master ISMS policy",
            iso_reference="5.2"
        ))

    # Chapter 6: Planning - Risk Assessment
    from ..db.models import RiskDB
    # Use .is_(None) so SQLAlchemy emits "IS NULL"; a plain Python
    # "is None" evaluates to False and the filter would match no rows.
    risks_without_treatment = db.query(RiskDB).filter(
        RiskDB.status == "open",
        RiskDB.treatment_plan.is_(None)
    ).count()
    if risks_without_treatment > 0:
        potential_majors.append(PotentialFinding(
            check=f"{risks_without_treatment} risks without treatment plan",
            status="fail",
            recommendation="Define risk treatment for all identified risks",
            iso_reference="6.1.2"
        ))

    # Chapter 6: Objectives
    objectives = db.query(SecurityObjectiveDB).filter(
        SecurityObjectiveDB.status == "active"
    ).count()
    if objectives == 0:
        potential_majors.append(PotentialFinding(
            check="No security objectives defined",
            status="fail",
            recommendation="Define measurable security objectives",
            iso_reference="6.2"
        ))

    # SoA
    soa_total = db.query(StatementOfApplicabilityDB).count()
    # .is_(None) for the same reason as above ("is None" never matches)
    soa_unapproved = db.query(StatementOfApplicabilityDB).filter(
        StatementOfApplicabilityDB.approved_at.is_(None)
    ).count()
    if soa_total == 0:
        potential_majors.append(PotentialFinding(
            check="Statement of Applicability not created",
            status="fail",
            recommendation="Create SoA for all 93 Annex A controls",
            iso_reference="Annex A"
        ))
    elif soa_unapproved > 0:
        potential_minors.append(PotentialFinding(
            check=f"{soa_unapproved} SoA entries not approved",
            status="warning",
            recommendation="Review and approve all SoA entries",
            iso_reference="Annex A"
        ))

    # Chapter 9: Internal Audit
    # 365 days back; avoids the ValueError that date.replace(year=...) raises on Feb 29
    last_year = date.fromordinal(date.today().toordinal() - 365)
    internal_audit = db.query(InternalAuditDB).filter(
        InternalAuditDB.status == "completed",
        InternalAuditDB.actual_end_date >= last_year
    ).first()
    if not internal_audit:
        potential_majors.append(PotentialFinding(
            check="No internal audit in last 12 months",
            status="fail",
            recommendation="Conduct internal audit before certification",
            iso_reference="9.2"
        ))

    # Chapter 9: Management Review
    mgmt_review = db.query(ManagementReviewDB).filter(
        ManagementReviewDB.status == "approved",
        ManagementReviewDB.review_date >= last_year
    ).first()
    if not mgmt_review:
        potential_majors.append(PotentialFinding(
            check="No management review in last 12 months",
            status="fail",
            recommendation="Conduct and approve management review",
            iso_reference="9.3"
        ))

    # Chapter 10: Open Findings
    open_majors = db.query(AuditFindingDB).filter(
        AuditFindingDB.finding_type == FindingTypeEnum.MAJOR,
        AuditFindingDB.status != FindingStatusEnum.CLOSED
    ).count()
    if open_majors > 0:
        potential_majors.append(PotentialFinding(
            check=f"{open_majors} open major finding(s)",
            status="fail",
            recommendation="Close all major findings before certification",
            iso_reference="10.1"
        ))

    open_minors = db.query(AuditFindingDB).filter(
        AuditFindingDB.finding_type == FindingTypeEnum.MINOR,
        AuditFindingDB.status != FindingStatusEnum.CLOSED
    ).count()
    if open_minors > 0:
        potential_minors.append(PotentialFinding(
            check=f"{open_minors} open minor finding(s)",
            status="warning",
            recommendation="Address minor findings or have CAPA in progress",
            iso_reference="10.1"
        ))

    # Calculate scores
    total_checks = 10
    passed_checks = total_checks - len(potential_majors)
    readiness_score = (passed_checks / total_checks) * 100

    # Determine overall status
    certification_possible = len(potential_majors) == 0
    if certification_possible:
        overall_status = "ready" if len(potential_minors) == 0 else "at_risk"
    else:
        overall_status = "not_ready"

    # Determine chapter statuses
    def get_chapter_status(has_major: bool, has_minor: bool) -> str:
        if has_major:
            return "fail"
        elif has_minor:
            return "warning"
        return "pass"

    chapter_4_majors = any("4." in (f.iso_reference or "") for f in potential_majors)
    chapter_5_majors = any("5." in (f.iso_reference or "") for f in potential_majors)
    chapter_6_majors = any("6." in (f.iso_reference or "") for f in potential_majors)
    chapter_9_majors = any("9." in (f.iso_reference or "") for f in potential_majors)
    chapter_10_majors = any("10." in (f.iso_reference or "") for f in potential_majors)

    # Priority actions
    priority_actions = [f.recommendation for f in potential_majors[:5]]

    # Save check result
    check = ISMSReadinessCheckDB(
        id=generate_id(),
        check_date=datetime.now(timezone.utc),
        triggered_by=data.triggered_by,
        overall_status=overall_status,
        certification_possible=certification_possible,
        chapter_4_status=get_chapter_status(chapter_4_majors, False),
        chapter_5_status=get_chapter_status(chapter_5_majors, False),
        chapter_6_status=get_chapter_status(chapter_6_majors, False),
        chapter_7_status=get_chapter_status(
            any("7." in (f.iso_reference or "") for f in potential_majors),
            any("7." in (f.iso_reference or "") for f in potential_minors)
        ),
        chapter_8_status=get_chapter_status(
            any("8." in (f.iso_reference or "") for f in potential_majors),
            any("8." in (f.iso_reference or "") for f in potential_minors)
        ),
        chapter_9_status=get_chapter_status(chapter_9_majors, False),
        chapter_10_status=get_chapter_status(chapter_10_majors, False),
        potential_majors=[f.model_dump() for f in potential_majors],
        potential_minors=[f.model_dump() for f in potential_minors],
        improvement_opportunities=[f.model_dump() for f in improvement_opportunities],
        readiness_score=readiness_score,
        priority_actions=priority_actions
    )
    db.add(check)
    db.commit()
    db.refresh(check)

    return ISMSReadinessCheckResponse(
        id=check.id,
        check_date=check.check_date,
        triggered_by=check.triggered_by,
        overall_status=check.overall_status,
        certification_possible=check.certification_possible,
        chapter_4_status=check.chapter_4_status,
        chapter_5_status=check.chapter_5_status,
        chapter_6_status=check.chapter_6_status,
        chapter_7_status=check.chapter_7_status,
        chapter_8_status=check.chapter_8_status,
        chapter_9_status=check.chapter_9_status,
        chapter_10_status=check.chapter_10_status,
        potential_majors=potential_majors,
        potential_minors=potential_minors,
        improvement_opportunities=improvement_opportunities,
        readiness_score=check.readiness_score,
        documentation_score=None,
        implementation_score=None,
        evidence_score=None,
        priority_actions=priority_actions
    )


@router.get("/readiness-check/latest", response_model=ISMSReadinessCheckResponse)
async def get_latest_readiness_check(db: Session = Depends(get_db)):
    """Get the most recent readiness check result."""
    check = db.query(ISMSReadinessCheckDB).order_by(
        ISMSReadinessCheckDB.check_date.desc()
    ).first()
    if not check:
        raise HTTPException(status_code=404, detail="No readiness check found. Run one first.")
    return check


# =============================================================================
# AUDIT TRAIL
# =============================================================================

@router.get("/audit-trail", response_model=AuditTrailResponse)
async def get_audit_trail(
    entity_type: Optional[str] = None,
    entity_id: Optional[str] = None,
    performed_by: Optional[str] = None,
    action: Optional[str] = None,
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    db: Session = Depends(get_db)
):
    """Query the audit trail with filters."""
    query = db.query(AuditTrailDB)
    if entity_type:
        query = query.filter(AuditTrailDB.entity_type == entity_type)
    if entity_id:
        query = query.filter(AuditTrailDB.entity_id == entity_id)
    if performed_by:
        query = query.filter(AuditTrailDB.performed_by == performed_by)
    if action:
        query = query.filter(AuditTrailDB.action == action)

    total = query.count()
    entries = query.order_by(AuditTrailDB.performed_at.desc()).offset(
        (page - 1) * page_size
    ).limit(page_size).all()

    # Ceiling division: number of pages needed to hold `total` entries
    total_pages = (total + page_size - 1) // page_size

    return AuditTrailResponse(
        entries=entries,
        total=total,
        pagination=PaginationMeta(
            page=page,
            page_size=page_size,
            total=total,
            total_pages=total_pages,
            has_next=page < total_pages,
            has_prev=page > 1
        )
    )


# =============================================================================
# ISO 27001 OVERVIEW
# =============================================================================

@router.get("/overview", response_model=ISO27001OverviewResponse)
async def get_iso27001_overview(db: Session = Depends(get_db)):
    """
    Get complete ISO 27001 compliance overview.

    Shows status of all chapters, key metrics, and readiness for certification.
    """
    # Scope & SoA approval status
    scope = db.query(ISMSScopeDB).filter(
        ISMSScopeDB.status == ApprovalStatusEnum.APPROVED
    ).first()
    scope_approved = scope is not None

    soa_total = db.query(StatementOfApplicabilityDB).count()
    soa_approved = db.query(StatementOfApplicabilityDB).filter(
        StatementOfApplicabilityDB.approved_at.isnot(None)
    ).count()
    soa_all_approved = soa_total > 0 and soa_approved == soa_total

    # Management Review & Internal Audit
    # 365 days back; avoids the ValueError that date.replace(year=...) raises on Feb 29
    last_year = date.fromordinal(date.today().toordinal() - 365)
    last_mgmt_review = db.query(ManagementReviewDB).filter(
        ManagementReviewDB.status == "approved"
    ).order_by(ManagementReviewDB.review_date.desc()).first()
    last_internal_audit = db.query(InternalAuditDB).filter(
        InternalAuditDB.status == "completed"
    ).order_by(InternalAuditDB.actual_end_date.desc()).first()

    # Findings
    open_majors = db.query(AuditFindingDB).filter(
        AuditFindingDB.finding_type == FindingTypeEnum.MAJOR,
        AuditFindingDB.status != FindingStatusEnum.CLOSED
    ).count()
    open_minors = db.query(AuditFindingDB).filter(
        AuditFindingDB.finding_type == FindingTypeEnum.MINOR,
        AuditFindingDB.status != FindingStatusEnum.CLOSED
    ).count()

    # Policies
    policies_total = db.query(ISMSPolicyDB).count()
    policies_approved = db.query(ISMSPolicyDB).filter(
        ISMSPolicyDB.status == ApprovalStatusEnum.APPROVED
    ).count()

    # Objectives
    objectives_total = db.query(SecurityObjectiveDB).count()
    objectives_achieved = db.query(SecurityObjectiveDB).filter(
        SecurityObjectiveDB.status == "achieved"
    ).count()

    # Calculate readiness: an empty DB must yield 0%.
    # Each factor requires positive evidence (not just absence of problems)
    has_any_data = any([
        scope_approved, soa_total > 0, policies_total > 0,
        objectives_total > 0, last_mgmt_review is not None,
        last_internal_audit is not None
    ])

    if not has_any_data:
        certification_readiness = 0.0
    else:
        readiness_factors = [
            scope_approved,
            soa_all_approved,
            last_mgmt_review is not None and last_mgmt_review.review_date >= last_year,
            last_internal_audit is not None and (last_internal_audit.actual_end_date or date.min) >= last_year,
            open_majors == 0 and (soa_total > 0 or policies_total > 0),  # Only counts if there's actual data
            policies_total > 0 and policies_approved >= policies_total * 0.8,
            objectives_total > 0
        ]
        certification_readiness = sum(readiness_factors) / len(readiness_factors) * 100

    # Overall status
    if not has_any_data:
        overall_status = "not_started"
    elif open_majors > 0:
        overall_status = "not_ready"
    elif certification_readiness >= 80:
        overall_status = "ready"
    else:
        overall_status = "at_risk"

    # Build chapter status list: an empty DB must show 0% / "not_started"
    def _chapter_status(has_positive_evidence: bool, has_issues: bool) -> str:
        if not has_positive_evidence:
            return "not_started"
        return "compliant" if not has_issues else "non_compliant"

    # Chapter 9: count sub-components for percentage
    ch9_parts = sum([last_mgmt_review is not None, last_internal_audit is not None])
    ch9_pct = (ch9_parts / 2) * 100

    # Chapter 10: only show 100% if audit findings have been recorded, not for an empty table.
    # Note: this counts rows in AuditFindingDB (findings), not CAPA records.
    findings_total = db.query(AuditFindingDB).count()
    ch10_has_data = findings_total > 0
    ch10_pct = 100.0 if (ch10_has_data and open_majors == 0) else (50.0 if ch10_has_data else 0.0)

    # Chapter titles are the German ISO 27001 clause names returned to the client;
    # English equivalents are given in the trailing comments.
    chapters = [
        ISO27001ChapterStatus(
            chapter="4",
            title="Kontext der Organisation",  # Context of the organization
            status=_chapter_status(scope_approved, False),
            completion_percentage=100.0 if scope_approved else 0.0,
            open_findings=0,
            key_documents=["ISMS Scope", "Context Analysis"] if scope_approved else [],
            last_reviewed=scope.approved_at if scope else None
        ),
        ISO27001ChapterStatus(
            chapter="5",
            title="Führung",  # Leadership
            status=_chapter_status(policies_total > 0, policies_approved < policies_total),
            completion_percentage=(policies_approved / max(policies_total, 1)) * 100 if policies_total > 0 else 0.0,
            open_findings=0,
            key_documents=[f"Policy {i+1}" for i in range(min(policies_approved, 3))] if policies_approved > 0 else [],
            last_reviewed=None
        ),
        ISO27001ChapterStatus(
            chapter="6",
            title="Planung",  # Planning
            status=_chapter_status(objectives_total > 0, False),
            completion_percentage=75.0 if objectives_total > 0 else 0.0,
            open_findings=0,
            key_documents=["Risk Register", "Security Objectives"] if objectives_total > 0 else [],
            last_reviewed=None
        ),
        ISO27001ChapterStatus(
            chapter="9",
            title="Bewertung der Leistung",  # Performance evaluation
            status=_chapter_status(ch9_parts > 0, open_majors + open_minors > 0),
            completion_percentage=ch9_pct,
            open_findings=open_majors + open_minors,
            key_documents=(
                (["Internal Audit Report"] if last_internal_audit else []) +
                (["Management Review Minutes"] if last_mgmt_review else [])
            ),
            last_reviewed=last_mgmt_review.approved_at if last_mgmt_review else None
        ),
        ISO27001ChapterStatus(
            chapter="10",
            title="Verbesserung",  # Improvement
            status=_chapter_status(ch10_has_data, open_majors > 0),
            completion_percentage=ch10_pct,
            open_findings=open_majors,
            key_documents=["CAPA Register"] if ch10_has_data else [],
            last_reviewed=None
        )
    ]

    return ISO27001OverviewResponse(
        overall_status=overall_status,
        certification_readiness=certification_readiness,
        chapters=chapters,
        scope_approved=scope_approved,
        soa_approved=soa_all_approved,
        last_management_review=last_mgmt_review.approved_at if last_mgmt_review else None,
        last_internal_audit=datetime.combine(last_internal_audit.actual_end_date, datetime.min.time()) if last_internal_audit and last_internal_audit.actual_end_date else None,
        open_major_findings=open_majors,
        open_minor_findings=open_minors,
        policies_count=policies_total,
        policies_approved=policies_approved,
        objectives_count=objectives_total,
        objectives_achieved=objectives_achieved
    )