breakpilot-compliance/backend-compliance/tests/test_isms_routes.py
Sharang Parnerkar 3320ef94fc refactor: phase 0 guardrails + phase 1 step 2 (models.py split)
Squash of branch refactor/phase0-guardrails-and-models-split — 4 commits,
81 files, 173/173 pytest green, OpenAPI contract preserved (360 paths /
484 operations).

## Phase 0 — Architecture guardrails

Three defense-in-depth layers to keep the architecture rules enforced
regardless of who opens Claude Code in this repo:

  1. .claude/settings.json PreToolUse hook on Write/Edit blocks any file
     that would exceed the 500-line hard cap. Auto-loads in every Claude
     session in this repo.
  2. scripts/githooks/pre-commit (install via scripts/install-hooks.sh)
     enforces the LOC cap locally, freezes migrations/ without
     [migration-approved], and protects guardrail files without
     [guardrail-change].
  3. .gitea/workflows/ci.yaml gains loc-budget + guardrail-integrity +
     sbom-scan (syft+grype) jobs, adds mypy --strict for the new Python
     packages (compliance/{services,repositories,domain,schemas}), and
     tsc --noEmit for admin-compliance + developer-portal.

Per-language conventions documented in AGENTS.python.md, AGENTS.go.md,
AGENTS.typescript.md at the repo root — layering, tooling, and explicit
"what you may NOT do" lists. Root CLAUDE.md is prepended with the six
non-negotiable rules. Each of the 10 services gets a README.md.

scripts/check-loc.sh enforces soft 300 / hard 500 and surfaces the
current baseline of 205 hard + 161 soft violations so Phases 1-4 can
drain it incrementally. CI gates only CHANGED files in PRs so the
legacy baseline does not block unrelated work.
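
The soft 300 / hard 500 budget can be pictured roughly as follows. This is a Python sketch of the classification only; the real check lives in scripts/check-loc.sh, whose implementation is not shown here. The names `classify_loc`, `count_lines`, `SOFT_CAP`, and `HARD_CAP` are hypothetical, and counting every line (including blanks and comments) is an assumption.

```python
# Illustrative only: mirrors the soft 300 / hard 500 thresholds from the text.
SOFT_CAP = 300  # assumption: more than this many lines is a soft violation
HARD_CAP = 500  # assumption: more than this many lines is a hard violation


def count_lines(text: str) -> int:
    """Count lines in a file's text (assumption: every line counts)."""
    return len(text.splitlines())


def classify_loc(line_count: int) -> str:
    """Return 'ok', 'soft', or 'hard' for a file with the given line count."""
    if line_count > HARD_CAP:
        return "hard"
    if line_count > SOFT_CAP:
        return "soft"
    return "ok"
```

Under these assumptions the old 1466-line models.py classifies as "hard", the 85-line shim as "ok", and the 468-line isms_audit_models.py as "soft".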

## Deprecation sweep

47 files. Pydantic V1 regex= -> pattern= (2 sites), class Config ->
ConfigDict in source_policy_router.py (schemas.py intentionally skipped;
it is the Phase 1 Step 3 split target). datetime.utcnow() ->
datetime.now(timezone.utc) everywhere including SQLAlchemy default=
callables. All DB columns already declare timezone=True, so this is a
latent-bug fix on the Python side, not a schema change.

DeprecationWarning count dropped from 158 to 35.
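
A minimal sketch of the datetime change, using only the standard library. The helper name `utc_now` is hypothetical; the commit does not show how the replacement callable is spelled in the models.

```python
from datetime import datetime, timedelta, timezone


def utc_now() -> datetime:
    """Timezone-aware stand-in for the deprecated datetime.utcnow()."""
    return datetime.now(timezone.utc)


stamp = utc_now()
# Unlike utcnow(), the result carries tzinfo, so it matches timezone=True columns.
assert stamp.tzinfo is not None
assert stamp.utcoffset() == timedelta(0)
```

A callable like this can be passed uncalled as a SQLAlchemy column default (default=utc_now), so it is evaluated at insert time rather than at import time.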

## Phase 1 Step 1 — Contract test harness

tests/contracts/test_openapi_baseline.py diffs the live FastAPI /openapi.json
against tests/contracts/openapi.baseline.json on every test run. Fails on
removed paths, removed status codes, or new required request body fields.
Regenerate only via tests/contracts/regenerate_baseline.py after a
consumer-updated contract change. This is the safety harness for all
subsequent refactor commits.
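
The three failure rules above could be checked along these lines. This is a sketch, not the code in tests/contracts/test_openapi_baseline.py: the function names are hypothetical, only JSON request bodies are inspected, and only a single-level `$ref` into components/schemas is resolved.

```python
HTTP_METHODS = {"get", "put", "post", "delete", "patch", "options", "head", "trace"}


def _required_fields(spec: dict, operation: dict) -> set:
    """Required JSON body fields of one operation (one-level $ref resolution)."""
    schema = (
        operation.get("requestBody", {})
        .get("content", {})
        .get("application/json", {})
        .get("schema", {})
    )
    ref = schema.get("$ref")
    if ref:  # e.g. "#/components/schemas/SomeModel"
        name = ref.rsplit("/", 1)[-1]
        schema = spec.get("components", {}).get("schemas", {}).get(name, {})
    return set(schema.get("required", []))


def breaking_changes(baseline: dict, live: dict) -> list:
    """List contract breaks in `live` relative to `baseline`."""
    problems = []
    live_paths = live.get("paths", {})
    for path, base_item in baseline.get("paths", {}).items():
        if path not in live_paths:
            problems.append(f"removed path: {path}")
            continue
        for method, base_op in base_item.items():
            live_op = live_paths[path].get(method)
            # Only the three rules named in the text are checked here, so
            # non-method keys and removed operations are skipped.
            if method not in HTTP_METHODS or live_op is None:
                continue
            removed = set(base_op.get("responses", {})) - set(live_op.get("responses", {}))
            for code in sorted(removed):
                problems.append(f"removed status code: {method.upper()} {path} {code}")
            added = _required_fields(live, live_op) - _required_fields(baseline, base_op)
            for field in sorted(added):
                problems.append(f"new required field: {method.upper()} {path} {field}")
    return problems
```

A test built on this would load the baseline JSON, fetch the live schema, and assert that `breaking_changes` returns an empty list. Additions such as new paths or new optional fields pass, which matches the rule that only removals and new required inputs fail.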

## Phase 1 Step 2 — models.py split (1466 -> 85 LOC shim)

compliance/db/models.py is decomposed into seven sibling aggregate modules
following the existing repo pattern (dsr_models.py, vvt_models.py, ...):

  regulation_models.py       (134) — Regulation, Requirement
  control_models.py          (279) — Control, Mapping, Evidence, Risk
  ai_system_models.py        (141) — AISystem, AuditExport
  service_module_models.py   (176) — ServiceModule, ModuleRegulation, ModuleRisk
  audit_session_models.py    (177) — AuditSession, AuditSignOff
  isms_governance_models.py  (323) — ISMSScope, Context, Policy, Objective, SoA
  isms_audit_models.py       (468) — Finding, CAPA, MgmtReview, InternalAudit,
                                     AuditTrail, Readiness

models.py becomes an 85-line re-export shim in dependency order so
existing imports continue to work unchanged. Schema is byte-identical:
__tablename__, column definitions, relationship strings, back_populates,
cascade directives all preserved.

All new sibling files are under the 500-line hard cap; largest is
isms_audit_models.py at 468. No file in compliance/db/ now exceeds
the hard cap.

## Phase 1 Step 3 — infrastructure only

backend-compliance/compliance/{schemas,domain,repositories}/ packages
are created as landing zones with docstrings. compliance/domain/
exports DomainError / NotFoundError / ConflictError / ValidationError /
PermissionError — the base classes services will use to raise
domain-level errors instead of HTTPException.
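
A sketch of how such an error hierarchy might look and how a route layer could translate it. The class names come from the text above; making them all subclasses of DomainError, the `status_for` helper, and every status code in the mapping are assumptions for illustration, not the contents of compliance/domain/.

```python
class DomainError(Exception):
    """Base class for errors raised by the service layer."""


class NotFoundError(DomainError):
    """The requested entity does not exist."""


class ConflictError(DomainError):
    """The operation clashes with existing state, e.g. a duplicate id."""


class ValidationError(DomainError):
    """The input violates a domain rule."""


class PermissionError(DomainError):  # note: shadows the builtin in this module
    """The caller may not perform the operation."""


# Hypothetical mapping; the real routes may choose different codes.
STATUS_BY_ERROR = {
    NotFoundError: 404,
    ConflictError: 409,
    ValidationError: 422,
    PermissionError: 403,
}


def status_for(exc: DomainError) -> int:
    """Pick an HTTP status for a domain error, defaulting to 400."""
    for error_type, status in STATUS_BY_ERROR.items():
        if isinstance(exc, error_type):
            return status
    return 400
```

With this shape, services raise NotFoundError and never import HTTPException; a single exception handler at the API edge can call something like `status_for` to build the response.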

backend-compliance/PHASE1_RUNBOOK.md documents the nine-step execution
plan for Phase 1, including: snapshot baseline, characterization tests,
split models.py (this commit), split schemas.py (next), extract services,
extract repositories, mypy --strict, coverage.

## Verification

  backend-compliance/.venv-phase1: uv python install 3.12 + pip install -r requirements.txt
  PYTHONPATH=. pytest compliance/tests/ tests/contracts/
  -> 173 passed, 0 failed, 35 warnings, OpenAPI 360/484 unchanged

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 13:18:29 +02:00


"""Integration tests for ISMS routes (isms_routes.py).
Tests the ISO 27001 ISMS API endpoints using TestClient + SQLite + ORM:
- Scope CRUD + Approval
- Policy CRUD + Approval + Duplicate check
- Overview / Dashboard endpoint
- Readiness check
- Edge cases (not found, invalid data, etc.)
Run with: cd backend-compliance && python3 -m pytest tests/test_isms_routes.py -v
"""

import os
import sys
import pytest
from datetime import date, datetime, timezone
from fastapi import FastAPI
from fastapi.testclient import TestClient
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker

sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from classroom_engine.database import Base, get_db
from compliance.api.isms_routes import router as isms_router

# =============================================================================
# Test App + SQLite Setup
# =============================================================================

SQLALCHEMY_DATABASE_URL = "sqlite:///./test_isms.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
TestSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


@event.listens_for(engine, "connect")
def _set_sqlite_pragma(dbapi_conn, connection_record):
    """Enable foreign keys and register NOW() for SQLite."""
    cursor = dbapi_conn.cursor()
    cursor.execute("PRAGMA foreign_keys=ON")
    cursor.close()
    dbapi_conn.create_function("NOW", 0, lambda: datetime.now(timezone.utc).isoformat())


app = FastAPI()
app.include_router(isms_router)


def override_get_db():
    db = TestSessionLocal()
    try:
        yield db
    finally:
        db.close()


app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)

# =============================================================================
# Fixtures
# =============================================================================

@pytest.fixture(autouse=True)
def setup_db():
    """Create all tables before each test and drop them afterwards."""
    # Import all models so Base.metadata knows about them
    import compliance.db.models  # noqa: F401
    Base.metadata.create_all(bind=engine)
    yield
    Base.metadata.drop_all(bind=engine)

# =============================================================================
# Helper data builders
# =============================================================================

def _scope_payload(**overrides):
    data = {
        "scope_statement": "ISMS covers all BreakPilot digital learning operations",
        "included_locations": ["Frankfurt Office", "AWS eu-central-1"],
        "included_processes": ["Software Development", "Data Processing"],
        "included_services": ["BreakPilot PWA", "AI Assistant"],
        "excluded_items": ["Marketing Website"],
        "exclusion_justification": "Static site, no user data",
    }
    data.update(overrides)
    return data


def _policy_payload(policy_id="POL-ISMS-001", **overrides):
    data = {
        "policy_id": policy_id,
        "title": "Information Security Policy",
        "policy_type": "master",
        "description": "Master ISMS policy",
        "policy_text": "This policy establishes the framework for information security...",
        "applies_to": ["All Employees"],
        "review_frequency_months": 12,
        "related_controls": ["GOV-001"],
        "authored_by": "iso@breakpilot.de",
    }
    data.update(overrides)
    return data


def _objective_payload(objective_id="OBJ-2026-001", **overrides):
    data = {
        "objective_id": objective_id,
        "title": "Reduce Security Incidents",
        "description": "Reduce incidents by 30%",
        "category": "operational",
        "specific": "Reduce from 10 to 7 per year",
        "measurable": "Incident count in ticketing system",
        "achievable": "Based on trend analysis",
        "relevant": "Supports info sec goals",
        "time_bound": "By Q4 2026",
        "kpi_name": "Security Incident Count",
        "kpi_target": "7",
        "kpi_unit": "incidents/year",
        "measurement_frequency": "monthly",
        "owner": "security@breakpilot.de",
        "target_date": "2026-12-31",
        "related_controls": ["OPS-003"],
    }
    data.update(overrides)
    return data


def _soa_payload(annex_a_control="A.5.1", **overrides):
    data = {
        "annex_a_control": annex_a_control,
        "annex_a_title": "Policies for information security",
        "annex_a_category": "organizational",
        "is_applicable": True,
        "applicability_justification": "Required for ISMS governance",
        "implementation_status": "implemented",
        "implementation_notes": "Covered by GOV-001",
        "breakpilot_control_ids": ["GOV-001"],
        "coverage_level": "full",
        "evidence_description": "ISMS Policy v2.0",
    }
    data.update(overrides)
    return data


def _finding_payload(**overrides):
    data = {
        "finding_type": "minor",
        "iso_chapter": "9.2",
        "annex_a_control": "A.5.35",
        "title": "Audit schedule not documented",
        "description": "No formal internal audit schedule found",
        "objective_evidence": "No document in DMS",
        "impact_description": "Cannot demonstrate planned approach",
        "owner": "iso@breakpilot.de",
        "auditor": "external.auditor@cert.de",
        "due_date": "2026-03-31",
    }
    data.update(overrides)
    return data


def _mgmt_review_payload(**overrides):
    data = {
        "title": "Q1 2026 Management Review",
        "review_date": "2026-01-15",
        "review_period_start": "2025-10-01",
        "review_period_end": "2025-12-31",
        "chairperson": "ceo@breakpilot.de",
        "attendees": [
            {"name": "CEO", "role": "Chairperson"},
            {"name": "CTO", "role": "Technical Lead"},
        ],
    }
    data.update(overrides)
    return data


def _internal_audit_payload(**overrides):
    data = {
        "title": "ISMS Internal Audit 2026",
        "audit_type": "scheduled",
        "scope_description": "Complete ISMS audit covering all chapters",
        "iso_chapters_covered": ["4", "5", "6", "7", "8", "9", "10"],
        "annex_a_controls_covered": ["A.5", "A.6"],
        "criteria": "ISO 27001:2022",
        "planned_date": "2026-03-01",
        "lead_auditor": "internal.auditor@breakpilot.de",
        "audit_team": ["internal.auditor@breakpilot.de", "qa@breakpilot.de"],
    }
    data.update(overrides)
    return data


# =============================================================================
# Test: ISMS Scope CRUD
# =============================================================================

class TestISMSScopeCRUD:
    """Tests for ISMS Scope CRUD endpoints."""

    def test_create_scope(self):
        """POST /isms/scope should create a new scope."""
        r = client.post("/isms/scope", json=_scope_payload(), params={"created_by": "admin@bp.de"})
        assert r.status_code == 200
        body = r.json()
        assert body["scope_statement"] == "ISMS covers all BreakPilot digital learning operations"
        assert body["status"] == "draft"
        assert body["version"] == "1.0"
        assert "id" in body

    def test_get_scope(self):
        """GET /isms/scope should return the current scope."""
        client.post("/isms/scope", json=_scope_payload(), params={"created_by": "admin@bp.de"})
        r = client.get("/isms/scope")
        assert r.status_code == 200
        assert r.json()["scope_statement"] is not None

    def test_get_scope_not_found(self):
        """GET /isms/scope should return 404 when no scope exists."""
        r = client.get("/isms/scope")
        assert r.status_code == 404

    def test_update_scope(self):
        """PUT /isms/scope/{id} should update draft scope."""
        create = client.post("/isms/scope", json=_scope_payload(), params={"created_by": "admin@bp.de"})
        scope_id = create.json()["id"]
        r = client.put(
            f"/isms/scope/{scope_id}",
            json={"scope_statement": "Updated scope statement"},
            params={"updated_by": "admin@bp.de"},
        )
        assert r.status_code == 200
        assert r.json()["scope_statement"] == "Updated scope statement"
        assert r.json()["version"] == "1.1"

    def test_update_scope_not_found(self):
        """PUT /isms/scope/{id} should return 404 for unknown id."""
        r = client.put(
            "/isms/scope/nonexistent-id",
            json={"scope_statement": "x"},
            params={"updated_by": "admin@bp.de"},
        )
        assert r.status_code == 404

    def test_create_scope_supersedes_existing(self):
        """Creating a new scope should supersede the old one."""
        client.post("/isms/scope", json=_scope_payload(), params={"created_by": "admin@bp.de"})
        client.post(
            "/isms/scope",
            json=_scope_payload(scope_statement="New scope v2"),
            params={"created_by": "admin@bp.de"},
        )
        r = client.get("/isms/scope")
        assert r.status_code == 200
        assert r.json()["scope_statement"] == "New scope v2"

    def test_approve_scope(self):
        """POST /isms/scope/{id}/approve should approve scope."""
        create = client.post("/isms/scope", json=_scope_payload(), params={"created_by": "admin@bp.de"})
        scope_id = create.json()["id"]
        r = client.post(
            f"/isms/scope/{scope_id}/approve",
            json={
                "approved_by": "ceo@breakpilot.de",
                "effective_date": "2026-03-01",
                "review_date": "2027-03-01",
            },
        )
        assert r.status_code == 200
        assert r.json()["status"] == "approved"
        assert r.json()["approved_by"] == "ceo@breakpilot.de"

    def test_approve_scope_not_found(self):
        """POST /isms/scope/{id}/approve should return 404 for unknown scope."""
        r = client.post(
            "/isms/scope/fake-id/approve",
            json={
                "approved_by": "ceo@breakpilot.de",
                "effective_date": "2026-03-01",
                "review_date": "2027-03-01",
            },
        )
        assert r.status_code == 404

    def test_update_approved_scope_rejected(self):
        """PUT on approved scope should return 400."""
        create = client.post("/isms/scope", json=_scope_payload(), params={"created_by": "admin@bp.de"})
        scope_id = create.json()["id"]
        client.post(
            f"/isms/scope/{scope_id}/approve",
            json={
                "approved_by": "ceo@breakpilot.de",
                "effective_date": "2026-03-01",
                "review_date": "2027-03-01",
            },
        )
        r = client.put(
            f"/isms/scope/{scope_id}",
            json={"scope_statement": "changed"},
            params={"updated_by": "admin@bp.de"},
        )
        assert r.status_code == 400
        assert "approved" in r.json()["detail"].lower()


# =============================================================================
# Test: ISMS Policy CRUD
# =============================================================================

class TestISMSPolicyCRUD:
    """Tests for ISMS Policy CRUD endpoints."""

    def test_create_policy(self):
        """POST /isms/policies should create a new policy."""
        r = client.post("/isms/policies", json=_policy_payload())
        assert r.status_code == 200
        body = r.json()
        assert body["policy_id"] == "POL-ISMS-001"
        assert body["status"] == "draft"
        assert body["version"] == "1.0"

    def test_list_policies(self):
        """GET /isms/policies should list all policies."""
        client.post("/isms/policies", json=_policy_payload("POL-ISMS-001"))
        client.post("/isms/policies", json=_policy_payload("POL-ISMS-002", title="Access Control Policy"))
        r = client.get("/isms/policies")
        assert r.status_code == 200
        assert r.json()["total"] == 2
        assert len(r.json()["policies"]) == 2

    def test_list_policies_filter_by_type(self):
        """GET /isms/policies?policy_type=master should filter."""
        client.post("/isms/policies", json=_policy_payload("POL-001"))
        client.post("/isms/policies", json=_policy_payload("POL-002", policy_type="operational"))
        r = client.get("/isms/policies", params={"policy_type": "master"})
        assert r.status_code == 200
        assert r.json()["total"] == 1

    def test_get_policy_by_id(self):
        """GET /isms/policies/{id} should return a policy by its UUID."""
        create = client.post("/isms/policies", json=_policy_payload())
        policy_uuid = create.json()["id"]
        r = client.get(f"/isms/policies/{policy_uuid}")
        assert r.status_code == 200
        assert r.json()["policy_id"] == "POL-ISMS-001"

    def test_get_policy_by_policy_id(self):
        """GET /isms/policies/{policy_id} should also match the human-readable id."""
        client.post("/isms/policies", json=_policy_payload())
        r = client.get("/isms/policies/POL-ISMS-001")
        assert r.status_code == 200
        assert r.json()["title"] == "Information Security Policy"

    def test_get_policy_not_found(self):
        """GET /isms/policies/{id} should return 404 for unknown policy."""
        r = client.get("/isms/policies/nonexistent")
        assert r.status_code == 404

    def test_update_policy(self):
        """PUT /isms/policies/{id} should update a draft policy."""
        create = client.post("/isms/policies", json=_policy_payload())
        pid = create.json()["id"]
        r = client.put(
            f"/isms/policies/{pid}",
            json={"title": "Updated Title"},
            params={"updated_by": "iso@bp.de"},
        )
        assert r.status_code == 200
        assert r.json()["title"] == "Updated Title"

    def test_update_policy_not_found(self):
        """PUT /isms/policies/{id} should return 404 for unknown policy."""
        r = client.put(
            "/isms/policies/fake-id",
            json={"title": "x"},
            params={"updated_by": "iso@bp.de"},
        )
        assert r.status_code == 404

    def test_duplicate_policy_id_rejected(self):
        """POST /isms/policies with duplicate policy_id should return 400."""
        client.post("/isms/policies", json=_policy_payload("POL-DUP"))
        r = client.post("/isms/policies", json=_policy_payload("POL-DUP"))
        assert r.status_code == 400
        assert "already exists" in r.json()["detail"]

    def test_approve_policy(self):
        """POST /isms/policies/{id}/approve should approve a policy."""
        create = client.post("/isms/policies", json=_policy_payload())
        pid = create.json()["id"]
        r = client.post(
            f"/isms/policies/{pid}/approve",
            json={
                "reviewed_by": "cto@breakpilot.de",
                "approved_by": "ceo@breakpilot.de",
                "effective_date": "2026-03-01",
            },
        )
        assert r.status_code == 200
        assert r.json()["status"] == "approved"
        assert r.json()["approved_by"] == "ceo@breakpilot.de"
        assert r.json()["next_review_date"] is not None

    def test_approve_policy_not_found(self):
        """POST /isms/policies/{id}/approve should 404 for unknown policy."""
        r = client.post(
            "/isms/policies/fake/approve",
            json={
                "reviewed_by": "x",
                "approved_by": "y",
                "effective_date": "2026-03-01",
            },
        )
        assert r.status_code == 404

    def test_update_approved_policy_bumps_version(self):
        """Updating an approved policy should increment major version and reset to draft."""
        create = client.post("/isms/policies", json=_policy_payload())
        pid = create.json()["id"]
        client.post(
            f"/isms/policies/{pid}/approve",
            json={
                "reviewed_by": "cto@bp.de",
                "approved_by": "ceo@bp.de",
                "effective_date": "2026-03-01",
            },
        )
        r = client.put(
            f"/isms/policies/{pid}",
            json={"title": "Updated after approval"},
            params={"updated_by": "iso@bp.de"},
        )
        assert r.status_code == 200
        assert r.json()["version"] == "2.0"
        assert r.json()["status"] == "draft"


# =============================================================================
# Test: Security Objectives
# =============================================================================

class TestSecurityObjectivesCRUD:
    """Tests for Security Objectives endpoints."""

    def test_create_objective(self):
        """POST /isms/objectives should create a new objective."""
        r = client.post("/isms/objectives", json=_objective_payload(), params={"created_by": "iso@bp.de"})
        assert r.status_code == 200
        body = r.json()
        assert body["objective_id"] == "OBJ-2026-001"
        assert body["status"] == "active"

    def test_list_objectives(self):
        """GET /isms/objectives should list all objectives."""
        client.post("/isms/objectives", json=_objective_payload("OBJ-001"), params={"created_by": "a"})
        client.post("/isms/objectives", json=_objective_payload("OBJ-002", title="Uptime"), params={"created_by": "a"})
        r = client.get("/isms/objectives")
        assert r.status_code == 200
        assert r.json()["total"] == 2

    def test_update_objective_progress(self):
        """PUT /isms/objectives/{id} should update progress."""
        create = client.post("/isms/objectives", json=_objective_payload(), params={"created_by": "a"})
        oid = create.json()["id"]
        r = client.put(
            f"/isms/objectives/{oid}",
            json={"progress_percentage": 50},
            params={"updated_by": "iso@bp.de"},
        )
        assert r.status_code == 200
        assert r.json()["progress_percentage"] == 50

    def test_update_objective_auto_achieved(self):
        """Setting progress to 100% should auto-set status to 'achieved'."""
        create = client.post("/isms/objectives", json=_objective_payload(), params={"created_by": "a"})
        oid = create.json()["id"]
        r = client.put(
            f"/isms/objectives/{oid}",
            json={"progress_percentage": 100},
            params={"updated_by": "iso@bp.de"},
        )
        assert r.status_code == 200
        assert r.json()["status"] == "achieved"

    def test_update_objective_not_found(self):
        """PUT /isms/objectives/{id} should 404 for unknown objective."""
        r = client.put(
            "/isms/objectives/fake",
            json={"progress_percentage": 10},
            params={"updated_by": "a"},
        )
        assert r.status_code == 404


# =============================================================================
# Test: Statement of Applicability (SoA)
# =============================================================================

class TestSoACRUD:
    """Tests for SoA endpoints."""

    def test_create_soa_entry(self):
        """POST /isms/soa should create an SoA entry."""
        r = client.post("/isms/soa", json=_soa_payload(), params={"created_by": "iso@bp.de"})
        assert r.status_code == 200
        body = r.json()
        assert body["annex_a_control"] == "A.5.1"
        assert body["is_applicable"] is True

    def test_list_soa_entries(self):
        """GET /isms/soa should list all SoA entries."""
        client.post("/isms/soa", json=_soa_payload("A.5.1"), params={"created_by": "a"})
        client.post("/isms/soa", json=_soa_payload("A.6.1", is_applicable=False, applicability_justification="N/A"), params={"created_by": "a"})
        r = client.get("/isms/soa")
        assert r.status_code == 200
        assert r.json()["total"] == 2
        assert r.json()["applicable_count"] == 1
        assert r.json()["not_applicable_count"] == 1

    def test_duplicate_soa_control_rejected(self):
        """POST /isms/soa with duplicate annex_a_control should return 400."""
        client.post("/isms/soa", json=_soa_payload("A.5.1"), params={"created_by": "a"})
        r = client.post("/isms/soa", json=_soa_payload("A.5.1"), params={"created_by": "a"})
        assert r.status_code == 400
        assert "already exists" in r.json()["detail"]

    def test_update_soa_entry(self):
        """PUT /isms/soa/{id} should update an SoA entry."""
        create = client.post("/isms/soa", json=_soa_payload(), params={"created_by": "a"})
        eid = create.json()["id"]
        r = client.put(
            f"/isms/soa/{eid}",
            json={"implementation_status": "in_progress"},
            params={"updated_by": "iso@bp.de"},
        )
        assert r.status_code == 200
        assert r.json()["implementation_status"] == "in_progress"
        assert r.json()["version"] == "1.1"

    def test_update_soa_not_found(self):
        """PUT /isms/soa/{id} should 404 for unknown entry."""
        r = client.put(
            "/isms/soa/fake",
            json={"implementation_status": "implemented"},
            params={"updated_by": "a"},
        )
        assert r.status_code == 404

    def test_approve_soa_entry(self):
        """POST /isms/soa/{id}/approve should approve an SoA entry."""
        create = client.post("/isms/soa", json=_soa_payload(), params={"created_by": "a"})
        eid = create.json()["id"]
        r = client.post(
            f"/isms/soa/{eid}/approve",
            json={"reviewed_by": "cto@bp.de", "approved_by": "ceo@bp.de"},
        )
        assert r.status_code == 200
        assert r.json()["approved_by"] == "ceo@bp.de"


# =============================================================================
# Test: Audit Findings
# =============================================================================

class TestAuditFindingsCRUD:
    """Tests for Audit Finding endpoints."""

    def test_create_finding(self):
        """POST /isms/findings should create a finding with auto-generated ID."""
        r = client.post("/isms/findings", json=_finding_payload())
        assert r.status_code == 200
        body = r.json()
        assert body["finding_id"].startswith("FIND-")
        assert body["status"] == "open"

    def test_list_findings(self):
        """GET /isms/findings should list all findings."""
        client.post("/isms/findings", json=_finding_payload())
        client.post("/isms/findings", json=_finding_payload(finding_type="major", title="Major finding"))
        r = client.get("/isms/findings")
        assert r.status_code == 200
        assert r.json()["total"] == 2
        assert r.json()["major_count"] == 1
        assert r.json()["minor_count"] == 1

    def test_update_finding(self):
        """PUT /isms/findings/{id} should update a finding."""
        create = client.post("/isms/findings", json=_finding_payload())
        fid = create.json()["id"]
        r = client.put(
            f"/isms/findings/{fid}",
            json={"root_cause": "Missing documentation process"},
            params={"updated_by": "iso@bp.de"},
        )
        assert r.status_code == 200
        assert r.json()["root_cause"] == "Missing documentation process"

    def test_update_finding_not_found(self):
        """PUT /isms/findings/{id} should 404 for unknown finding."""
        r = client.put(
            "/isms/findings/fake",
            json={"root_cause": "x"},
            params={"updated_by": "a"},
        )
        assert r.status_code == 404

    def test_close_finding_no_capas(self):
        """POST /isms/findings/{id}/close should succeed if no CAPAs exist."""
        create = client.post("/isms/findings", json=_finding_payload())
        fid = create.json()["id"]
        r = client.post(
            f"/isms/findings/{fid}/close",
            json={
                "closure_notes": "Verified corrected",
                "closed_by": "auditor@cert.de",
                "verification_method": "Document review",
                "verification_evidence": "Updated schedule approved",
            },
        )
        assert r.status_code == 200
        assert r.json()["status"] == "closed"

    def test_close_finding_not_found(self):
        """POST /isms/findings/{id}/close should 404 for unknown finding."""
        r = client.post(
            "/isms/findings/fake/close",
            json={
                "closure_notes": "x",
                "closed_by": "a",
                "verification_method": "x",
                "verification_evidence": "x",
            },
        )
        assert r.status_code == 404


# =============================================================================
# Test: Management Reviews
# =============================================================================

class TestManagementReviewCRUD:
    """Tests for Management Review endpoints."""

    def test_create_management_review(self):
        """POST /isms/management-reviews should create a review."""
        r = client.post(
            "/isms/management-reviews",
            json=_mgmt_review_payload(),
            params={"created_by": "iso@bp.de"},
        )
        assert r.status_code == 200
        body = r.json()
        assert body["review_id"].startswith("MR-")
        assert body["status"] == "draft"

    def test_list_management_reviews(self):
        """GET /isms/management-reviews should list reviews."""
        client.post("/isms/management-reviews", json=_mgmt_review_payload(), params={"created_by": "a"})
        r = client.get("/isms/management-reviews")
        assert r.status_code == 200
        assert r.json()["total"] == 1

    def test_get_management_review(self):
        """GET /isms/management-reviews/{id} should return a review."""
        create = client.post("/isms/management-reviews", json=_mgmt_review_payload(), params={"created_by": "a"})
        rid = create.json()["id"]
        r = client.get(f"/isms/management-reviews/{rid}")
        assert r.status_code == 200
        assert r.json()["chairperson"] == "ceo@breakpilot.de"

    def test_get_management_review_not_found(self):
        """GET /isms/management-reviews/{id} should 404 for unknown review."""
        r = client.get("/isms/management-reviews/fake")
        assert r.status_code == 404

    def test_update_management_review(self):
        """PUT /isms/management-reviews/{id} should update a review."""
        create = client.post("/isms/management-reviews", json=_mgmt_review_payload(), params={"created_by": "a"})
        rid = create.json()["id"]
        r = client.put(
            f"/isms/management-reviews/{rid}",
            json={"input_previous_actions": "All actions completed", "status": "conducted"},
            params={"updated_by": "iso@bp.de"},
        )
        assert r.status_code == 200
        assert r.json()["input_previous_actions"] == "All actions completed"

    def test_approve_management_review(self):
        """POST /isms/management-reviews/{id}/approve should approve."""
        create = client.post("/isms/management-reviews", json=_mgmt_review_payload(), params={"created_by": "a"})
        rid = create.json()["id"]
        r = client.post(
            f"/isms/management-reviews/{rid}/approve",
            json={
                "approved_by": "ceo@bp.de",
                "next_review_date": "2026-07-01",
            },
        )
        assert r.status_code == 200
        assert r.json()["status"] == "approved"
        assert r.json()["approved_by"] == "ceo@bp.de"


# =============================================================================
# Test: Internal Audits
# =============================================================================

class TestInternalAuditCRUD:
    """Tests for Internal Audit endpoints."""

    def test_create_internal_audit(self):
        """POST /isms/internal-audits should create an audit."""
        r = client.post(
            "/isms/internal-audits",
            json=_internal_audit_payload(),
            params={"created_by": "iso@bp.de"},
        )
        assert r.status_code == 200
        body = r.json()
        assert body["audit_id"].startswith("IA-")
        assert body["status"] == "planned"

    def test_list_internal_audits(self):
        """GET /isms/internal-audits should list audits."""
        client.post("/isms/internal-audits", json=_internal_audit_payload(), params={"created_by": "a"})
        r = client.get("/isms/internal-audits")
        assert r.status_code == 200
        assert r.json()["total"] == 1

    def test_update_internal_audit(self):
        """PUT /isms/internal-audits/{id} should update an audit."""
        create = client.post("/isms/internal-audits", json=_internal_audit_payload(), params={"created_by": "a"})
        aid = create.json()["id"]
        r = client.put(
            f"/isms/internal-audits/{aid}",
            json={"status": "in_progress", "actual_start_date": "2026-03-01"},
            params={"updated_by": "iso@bp.de"},
        )
        assert r.status_code == 200
        assert r.json()["status"] == "in_progress"

    def test_update_internal_audit_not_found(self):
        """PUT /isms/internal-audits/{id} should 404 for unknown audit."""
        r = client.put(
            "/isms/internal-audits/fake",
            json={"status": "in_progress"},
            params={"updated_by": "a"},
        )
        assert r.status_code == 404

    def test_complete_internal_audit(self):
        """POST /isms/internal-audits/{id}/complete should complete an audit."""
        create = client.post("/isms/internal-audits", json=_internal_audit_payload(), params={"created_by": "a"})
        aid = create.json()["id"]
        r = client.post(
            f"/isms/internal-audits/{aid}/complete",
            json={
                "audit_conclusion": "Overall conforming with minor observations",
                "overall_assessment": "conforming",
                "follow_up_audit_required": False,
            },
            params={"completed_by": "auditor@bp.de"},
        )
        assert r.status_code == 200
        assert r.json()["status"] == "completed"
        assert r.json()["follow_up_audit_required"] is False

    def test_complete_internal_audit_not_found(self):
        """POST /isms/internal-audits/{id}/complete should 404 for unknown audit."""
        r = client.post(
            "/isms/internal-audits/fake/complete",
            json={
                "audit_conclusion": "x",
                "overall_assessment": "conforming",
                "follow_up_audit_required": False,
            },
            params={"completed_by": "a"},
        )
        assert r.status_code == 404


# =============================================================================
# Test: Readiness Check
# =============================================================================

class TestReadinessCheck:
    """Tests for the ISMS Readiness Check endpoint."""

    def test_readiness_check_empty_isms(self):
        """POST /isms/readiness-check on empty DB should show not_ready."""
        r = client.post("/isms/readiness-check", json={"triggered_by": "test"})
        assert r.status_code == 200
        body = r.json()
        assert body["certification_possible"] is False
        assert body["overall_status"] == "not_ready"
        assert len(body["potential_majors"]) > 0

    def test_readiness_check_latest_not_found(self):
        """GET /isms/readiness-check/latest should 404 when no check has run."""
        r = client.get("/isms/readiness-check/latest")
        assert r.status_code == 404

    def test_readiness_check_latest_returns_most_recent(self):
        """GET /isms/readiness-check/latest should return last check."""
        client.post("/isms/readiness-check", json={"triggered_by": "first"})
        client.post("/isms/readiness-check", json={"triggered_by": "second"})
        r = client.get("/isms/readiness-check/latest")
        assert r.status_code == 200
        assert r.json()["triggered_by"] == "second"


# =============================================================================
# Test: Overview / Dashboard
# =============================================================================

class TestOverviewDashboard:
    """Tests for the ISO 27001 overview endpoint."""

    def test_overview_empty_isms(self):
        """GET /isms/overview on empty DB should return not_started with 0% readiness."""
        r = client.get("/isms/overview")
        assert r.status_code == 200
        body = r.json()
        assert body["overall_status"] == "not_started"
        assert body["certification_readiness"] == 0.0
        assert body["scope_approved"] is False
        assert body["open_major_findings"] == 0
        assert body["policies_count"] == 0
        # All chapters should show 0% on empty DB
        for ch in body["chapters"]:
            assert ch["completion_percentage"] == 0.0, f"Chapter {ch['chapter']} should be 0% on empty DB"

    def test_overview_with_data(self):
        """GET /isms/overview should reflect created data."""
        # Create and approve a scope
        scope = client.post("/isms/scope", json=_scope_payload(), params={"created_by": "a"})
        client.post(
            f"/isms/scope/{scope.json()['id']}/approve",
            json={"approved_by": "ceo@bp.de", "effective_date": "2026-01-01", "review_date": "2027-01-01"},
        )
        # Create a policy
        client.post("/isms/policies", json=_policy_payload())
        # Create an objective
        client.post("/isms/objectives", json=_objective_payload(), params={"created_by": "a"})
        r = client.get("/isms/overview")
        assert r.status_code == 200
        body = r.json()
        assert body["scope_approved"] is True
        assert body["policies_count"] == 1
        assert body["objectives_count"] == 1


# =============================================================================
# Test: Audit Trail
# =============================================================================

class TestAuditTrail:
    """Tests for the Audit Trail endpoint."""

    def test_audit_trail_records_actions(self):
        """Creating entities should generate audit trail entries."""
        client.post("/isms/scope", json=_scope_payload(), params={"created_by": "admin@bp.de"})
        client.post("/isms/policies", json=_policy_payload())
        r = client.get("/isms/audit-trail")
        assert r.status_code == 200
        assert r.json()["total"] >= 2

    def test_audit_trail_filter_by_entity_type(self):
        """GET /isms/audit-trail?entity_type=isms_policy should filter."""
        client.post("/isms/scope", json=_scope_payload(), params={"created_by": "a"})
        client.post("/isms/policies", json=_policy_payload())
        r = client.get("/isms/audit-trail", params={"entity_type": "isms_policy"})
        assert r.status_code == 200
        for entry in r.json()["entries"]:
            assert entry["entity_type"] == "isms_policy"

    def test_audit_trail_pagination(self):
        """GET /isms/audit-trail should support pagination."""
        # Create several entries
        for i in range(5):
            client.post("/isms/policies", json=_policy_payload(f"POL-PAGI-{i:03d}"))
        r = client.get("/isms/audit-trail", params={"page": 1, "page_size": 2})
        assert r.status_code == 200
        assert len(r.json()["entries"]) == 2
        assert r.json()["pagination"]["has_next"] is True