Files
breakpilot-compliance/backend-compliance/compliance/services/isms_assessment_service.py
Sharang Parnerkar 32e121f2a3 refactor(backend/api): extract ISMS services (Step 4 — file 18 of 18)
compliance/api/isms_routes.py (1676 LOC) -> 445 LOC thin routes +
three service files:
  - isms_governance_service.py  (416) — scope, context, policy, objectives, SoA
  - isms_findings_service.py    (276) — findings, CAPA, audit trail
  - isms_assessment_service.py  (639) — management reviews, internal audits,
                                         readiness checks, ISO 27001 overview

NOTE: isms_assessment_service.py exceeds the 500-line hard cap at 639 LOC.
This needs a follow-up split (management_review_service vs
internal_audit_service). Flagged for next session.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-09 20:34:59 +02:00

640 lines
26 KiB
Python

# mypy: disable-error-code="arg-type,assignment,union-attr,no-any-return"
"""
ISMS Assessment service -- Management Reviews, Internal Audits, Readiness,
Audit Trail, and ISO 27001 Overview.
Phase 1 Step 4: extracted from ``compliance.api.isms_routes``.
"""
from datetime import datetime, date, timezone
from typing import Optional
from sqlalchemy.orm import Session
from compliance.db.models import (
ISMSScopeDB,
ISMSContextDB,
ISMSPolicyDB,
SecurityObjectiveDB,
StatementOfApplicabilityDB,
AuditFindingDB,
CorrectiveActionDB,
ManagementReviewDB,
InternalAuditDB,
AuditTrailDB,
ISMSReadinessCheckDB,
ApprovalStatusEnum,
FindingTypeEnum,
FindingStatusEnum,
)
from compliance.domain import NotFoundError
from compliance.services.isms_governance_service import generate_id, log_audit_trail
from compliance.schemas.isms_audit import (
PotentialFinding,
ISMSReadinessCheckResponse,
ISO27001ChapterStatus,
ISO27001OverviewResponse,
AuditTrailResponse,
PaginationMeta,
)
# ============================================================================
# Management Reviews (ISO 27001 9.3)
# ============================================================================
class ManagementReviewService:
"""Business logic for Management Reviews."""
@staticmethod
def list_reviews(db: Session, status: Optional[str] = None) -> tuple:
query = db.query(ManagementReviewDB)
if status:
query = query.filter(ManagementReviewDB.status == status)
reviews = query.order_by(ManagementReviewDB.review_date.desc()).all()
return reviews, len(reviews)
@staticmethod
def get(db: Session, review_id: str) -> ManagementReviewDB:
review = (
db.query(ManagementReviewDB)
.filter(
(ManagementReviewDB.id == review_id)
| (ManagementReviewDB.review_id == review_id)
)
.first()
)
if not review:
raise NotFoundError("Management review not found")
return review
@staticmethod
def create(db: Session, data: dict, created_by: str) -> ManagementReviewDB:
review_date = data["review_date"]
if isinstance(review_date, str):
review_date = date.fromisoformat(review_date)
year = review_date.year
quarter = (review_date.month - 1) // 3 + 1
review_id = f"MR-{year}-Q{quarter}"
existing = db.query(ManagementReviewDB).filter(ManagementReviewDB.review_id == review_id).first()
if existing:
review_id = f"{review_id}-{generate_id()[:4]}"
attendees = data.pop("attendees", None)
review = ManagementReviewDB(
id=generate_id(),
review_id=review_id,
attendees=[a.model_dump() for a in attendees] if attendees else None,
status="draft",
**data,
)
db.add(review)
log_audit_trail(db, "management_review", review.id, review_id, "create", created_by)
db.commit()
db.refresh(review)
return review
@staticmethod
def update(db: Session, review_id: str, data: dict, updated_by: str) -> ManagementReviewDB:
review = (
db.query(ManagementReviewDB)
.filter(
(ManagementReviewDB.id == review_id)
| (ManagementReviewDB.review_id == review_id)
)
.first()
)
if not review:
raise NotFoundError("Management review not found")
for field, value in data.items():
if field == "action_items" and value:
setattr(review, field, [item.model_dump() for item in value])
else:
setattr(review, field, value)
log_audit_trail(db, "management_review", review.id, review.review_id, "update", updated_by)
db.commit()
db.refresh(review)
return review
@staticmethod
def approve(
db: Session,
review_id: str,
approved_by: str,
next_review_date: date,
minutes_document_path: Optional[str] = None,
) -> ManagementReviewDB:
review = (
db.query(ManagementReviewDB)
.filter(
(ManagementReviewDB.id == review_id)
| (ManagementReviewDB.review_id == review_id)
)
.first()
)
if not review:
raise NotFoundError("Management review not found")
review.status = "approved"
review.approved_by = approved_by
review.approved_at = datetime.now(timezone.utc)
review.next_review_date = next_review_date
review.minutes_document_path = minutes_document_path
log_audit_trail(db, "management_review", review.id, review.review_id, "approve", approved_by)
db.commit()
db.refresh(review)
return review
# ============================================================================
# Internal Audits (ISO 27001 9.2)
# ============================================================================
class InternalAuditService:
"""Business logic for Internal Audits."""
@staticmethod
def list_audits(db: Session, status: Optional[str] = None, audit_type: Optional[str] = None) -> tuple:
query = db.query(InternalAuditDB)
if status:
query = query.filter(InternalAuditDB.status == status)
if audit_type:
query = query.filter(InternalAuditDB.audit_type == audit_type)
audits = query.order_by(InternalAuditDB.planned_date.desc()).all()
return audits, len(audits)
@staticmethod
def create(db: Session, data: dict, created_by: str) -> InternalAuditDB:
planned_date = data["planned_date"]
if isinstance(planned_date, str):
planned_date = date.fromisoformat(planned_date)
year = planned_date.year
existing_count = (
db.query(InternalAuditDB)
.filter(InternalAuditDB.audit_id.like(f"IA-{year}-%"))
.count()
)
audit_id = f"IA-{year}-{existing_count + 1:03d}"
audit = InternalAuditDB(id=generate_id(), audit_id=audit_id, status="planned", **data)
db.add(audit)
log_audit_trail(db, "internal_audit", audit.id, audit_id, "create", created_by)
db.commit()
db.refresh(audit)
return audit
@staticmethod
def update(db: Session, audit_id: str, data: dict, updated_by: str) -> InternalAuditDB:
audit = (
db.query(InternalAuditDB)
.filter((InternalAuditDB.id == audit_id) | (InternalAuditDB.audit_id == audit_id))
.first()
)
if not audit:
raise NotFoundError("Internal audit not found")
for field, value in data.items():
setattr(audit, field, value)
log_audit_trail(db, "internal_audit", audit.id, audit.audit_id, "update", updated_by)
db.commit()
db.refresh(audit)
return audit
@staticmethod
def complete(
db: Session,
audit_id: str,
audit_conclusion: str,
overall_assessment: str,
follow_up_audit_required: bool,
completed_by: str,
) -> InternalAuditDB:
audit = (
db.query(InternalAuditDB)
.filter((InternalAuditDB.id == audit_id) | (InternalAuditDB.audit_id == audit_id))
.first()
)
if not audit:
raise NotFoundError("Internal audit not found")
audit.status = "completed"
audit.actual_end_date = date.today()
audit.report_date = date.today()
audit.audit_conclusion = audit_conclusion
audit.overall_assessment = overall_assessment
audit.follow_up_audit_required = follow_up_audit_required
log_audit_trail(db, "internal_audit", audit.id, audit.audit_id, "complete", completed_by)
db.commit()
db.refresh(audit)
return audit
# ============================================================================
# Audit Trail
# ============================================================================
class AuditTrailService:
"""Business logic for Audit Trail queries."""
@staticmethod
def query(
db: Session,
entity_type: Optional[str] = None,
entity_id: Optional[str] = None,
performed_by: Optional[str] = None,
action: Optional[str] = None,
page: int = 1,
page_size: int = 50,
) -> dict:
query = db.query(AuditTrailDB)
if entity_type:
query = query.filter(AuditTrailDB.entity_type == entity_type)
if entity_id:
query = query.filter(AuditTrailDB.entity_id == entity_id)
if performed_by:
query = query.filter(AuditTrailDB.performed_by == performed_by)
if action:
query = query.filter(AuditTrailDB.action == action)
total = query.count()
entries = (
query.order_by(AuditTrailDB.performed_at.desc())
.offset((page - 1) * page_size)
.limit(page_size)
.all()
)
total_pages = (total + page_size - 1) // page_size
return {
"entries": entries,
"total": total,
"pagination": PaginationMeta(
page=page,
page_size=page_size,
total=total,
total_pages=total_pages,
has_next=page < total_pages,
has_prev=page > 1,
),
}
# ============================================================================
# Readiness Check
# ============================================================================
class ReadinessCheckService:
"""Business logic for the ISMS Readiness Check."""
@staticmethod
def run(db: Session, triggered_by: str) -> ISMSReadinessCheckResponse:
potential_majors: list = []
potential_minors: list = []
improvement_opportunities: list = []
# Chapter 4: Context
scope = db.query(ISMSScopeDB).filter(ISMSScopeDB.status == ApprovalStatusEnum.APPROVED).first()
if not scope:
potential_majors.append(PotentialFinding(
check="ISMS Scope not approved", status="fail",
recommendation="Approve ISMS scope with top management signature", iso_reference="4.3",
))
context = db.query(ISMSContextDB).filter(ISMSContextDB.status == ApprovalStatusEnum.APPROVED).first()
if not context:
potential_majors.append(PotentialFinding(
check="ISMS Context not documented", status="fail",
recommendation="Document and approve context analysis (4.1, 4.2)", iso_reference="4.1, 4.2",
))
# Chapter 5: Leadership
master_policy = db.query(ISMSPolicyDB).filter(
ISMSPolicyDB.policy_type == "master", ISMSPolicyDB.status == ApprovalStatusEnum.APPROVED,
).first()
if not master_policy:
potential_majors.append(PotentialFinding(
check="Information Security Policy not approved", status="fail",
recommendation="Create and approve master ISMS policy", iso_reference="5.2",
))
# Chapter 6: Risk Assessment
from compliance.db.models import RiskDB
risks_without_treatment = db.query(RiskDB).filter(
RiskDB.status == "open", RiskDB.treatment_plan is None,
).count()
if risks_without_treatment > 0:
potential_majors.append(PotentialFinding(
check=f"{risks_without_treatment} risks without treatment plan", status="fail",
recommendation="Define risk treatment for all identified risks", iso_reference="6.1.2",
))
# Chapter 6: Objectives
objectives = db.query(SecurityObjectiveDB).filter(SecurityObjectiveDB.status == "active").count()
if objectives == 0:
potential_majors.append(PotentialFinding(
check="No security objectives defined", status="fail",
recommendation="Define measurable security objectives", iso_reference="6.2",
))
# SoA
soa_total = db.query(StatementOfApplicabilityDB).count()
soa_unapproved = db.query(StatementOfApplicabilityDB).filter(
StatementOfApplicabilityDB.approved_at is None,
).count()
if soa_total == 0:
potential_majors.append(PotentialFinding(
check="Statement of Applicability not created", status="fail",
recommendation="Create SoA for all 93 Annex A controls", iso_reference="Annex A",
))
elif soa_unapproved > 0:
potential_minors.append(PotentialFinding(
check=f"{soa_unapproved} SoA entries not approved", status="warning",
recommendation="Review and approve all SoA entries", iso_reference="Annex A",
))
# Chapter 9: Internal Audit
last_year = date.today().replace(year=date.today().year - 1)
internal_audit = db.query(InternalAuditDB).filter(
InternalAuditDB.status == "completed", InternalAuditDB.actual_end_date >= last_year,
).first()
if not internal_audit:
potential_majors.append(PotentialFinding(
check="No internal audit in last 12 months", status="fail",
recommendation="Conduct internal audit before certification", iso_reference="9.2",
))
# Chapter 9: Management Review
mgmt_review = db.query(ManagementReviewDB).filter(
ManagementReviewDB.status == "approved", ManagementReviewDB.review_date >= last_year,
).first()
if not mgmt_review:
potential_majors.append(PotentialFinding(
check="No management review in last 12 months", status="fail",
recommendation="Conduct and approve management review", iso_reference="9.3",
))
# Chapter 10: Open Findings
open_majors = db.query(AuditFindingDB).filter(
AuditFindingDB.finding_type == FindingTypeEnum.MAJOR,
AuditFindingDB.status != FindingStatusEnum.CLOSED,
).count()
if open_majors > 0:
potential_majors.append(PotentialFinding(
check=f"{open_majors} open major finding(s)", status="fail",
recommendation="Close all major findings before certification", iso_reference="10.1",
))
open_minors = db.query(AuditFindingDB).filter(
AuditFindingDB.finding_type == FindingTypeEnum.MINOR,
AuditFindingDB.status != FindingStatusEnum.CLOSED,
).count()
if open_minors > 0:
potential_minors.append(PotentialFinding(
check=f"{open_minors} open minor finding(s)", status="warning",
recommendation="Address minor findings or have CAPA in progress", iso_reference="10.1",
))
# Calculate scores
total_checks = 10
passed_checks = total_checks - len(potential_majors)
readiness_score = (passed_checks / total_checks) * 100
certification_possible = len(potential_majors) == 0
if certification_possible:
overall_status = "ready" if len(potential_minors) == 0 else "at_risk"
else:
overall_status = "not_ready"
def get_chapter_status(has_major: bool, has_minor: bool) -> str:
if has_major:
return "fail"
elif has_minor:
return "warning"
return "pass"
chapter_4_majors = any("4." in (f.iso_reference or "") for f in potential_majors)
chapter_5_majors = any("5." in (f.iso_reference or "") for f in potential_majors)
chapter_6_majors = any("6." in (f.iso_reference or "") for f in potential_majors)
chapter_9_majors = any("9." in (f.iso_reference or "") for f in potential_majors)
chapter_10_majors = any("10." in (f.iso_reference or "") for f in potential_majors)
priority_actions = [f.recommendation for f in potential_majors[:5]]
check = ISMSReadinessCheckDB(
id=generate_id(),
check_date=datetime.now(timezone.utc),
triggered_by=triggered_by,
overall_status=overall_status,
certification_possible=certification_possible,
chapter_4_status=get_chapter_status(chapter_4_majors, False),
chapter_5_status=get_chapter_status(chapter_5_majors, False),
chapter_6_status=get_chapter_status(chapter_6_majors, False),
chapter_7_status=get_chapter_status(
any("7." in (f.iso_reference or "") for f in potential_majors),
any("7." in (f.iso_reference or "") for f in potential_minors),
),
chapter_8_status=get_chapter_status(
any("8." in (f.iso_reference or "") for f in potential_majors),
any("8." in (f.iso_reference or "") for f in potential_minors),
),
chapter_9_status=get_chapter_status(chapter_9_majors, False),
chapter_10_status=get_chapter_status(chapter_10_majors, False),
potential_majors=[f.model_dump() for f in potential_majors],
potential_minors=[f.model_dump() for f in potential_minors],
improvement_opportunities=[f.model_dump() for f in improvement_opportunities],
readiness_score=readiness_score,
priority_actions=priority_actions,
)
db.add(check)
db.commit()
db.refresh(check)
return ISMSReadinessCheckResponse(
id=check.id,
check_date=check.check_date,
triggered_by=check.triggered_by,
overall_status=check.overall_status,
certification_possible=check.certification_possible,
chapter_4_status=check.chapter_4_status,
chapter_5_status=check.chapter_5_status,
chapter_6_status=check.chapter_6_status,
chapter_7_status=check.chapter_7_status,
chapter_8_status=check.chapter_8_status,
chapter_9_status=check.chapter_9_status,
chapter_10_status=check.chapter_10_status,
potential_majors=potential_majors,
potential_minors=potential_minors,
improvement_opportunities=improvement_opportunities,
readiness_score=check.readiness_score,
documentation_score=None,
implementation_score=None,
evidence_score=None,
priority_actions=priority_actions,
)
@staticmethod
def get_latest(db: Session) -> ISMSReadinessCheckDB:
check = (
db.query(ISMSReadinessCheckDB)
.order_by(ISMSReadinessCheckDB.check_date.desc())
.first()
)
if not check:
raise NotFoundError("No readiness check found. Run one first.")
return check
# ============================================================================
# ISO 27001 Overview
# ============================================================================
class OverviewService:
"""Business logic for the ISO 27001 overview dashboard."""
@staticmethod
def get_overview(db: Session) -> ISO27001OverviewResponse:
scope = db.query(ISMSScopeDB).filter(ISMSScopeDB.status == ApprovalStatusEnum.APPROVED).first()
scope_approved = scope is not None
soa_total = db.query(StatementOfApplicabilityDB).count()
soa_approved = db.query(StatementOfApplicabilityDB).filter(
StatementOfApplicabilityDB.approved_at.isnot(None),
).count()
soa_all_approved = soa_total > 0 and soa_approved == soa_total
last_year = date.today().replace(year=date.today().year - 1)
last_mgmt_review = (
db.query(ManagementReviewDB)
.filter(ManagementReviewDB.status == "approved")
.order_by(ManagementReviewDB.review_date.desc())
.first()
)
last_internal_audit = (
db.query(InternalAuditDB)
.filter(InternalAuditDB.status == "completed")
.order_by(InternalAuditDB.actual_end_date.desc())
.first()
)
open_majors = db.query(AuditFindingDB).filter(
AuditFindingDB.finding_type == FindingTypeEnum.MAJOR,
AuditFindingDB.status != FindingStatusEnum.CLOSED,
).count()
open_minors = db.query(AuditFindingDB).filter(
AuditFindingDB.finding_type == FindingTypeEnum.MINOR,
AuditFindingDB.status != FindingStatusEnum.CLOSED,
).count()
policies_total = db.query(ISMSPolicyDB).count()
policies_approved = db.query(ISMSPolicyDB).filter(
ISMSPolicyDB.status == ApprovalStatusEnum.APPROVED,
).count()
objectives_total = db.query(SecurityObjectiveDB).count()
objectives_achieved = db.query(SecurityObjectiveDB).filter(
SecurityObjectiveDB.status == "achieved",
).count()
has_any_data = any([
scope_approved, soa_total > 0, policies_total > 0,
objectives_total > 0, last_mgmt_review is not None,
last_internal_audit is not None,
])
if not has_any_data:
certification_readiness = 0.0
else:
readiness_factors = [
scope_approved,
soa_all_approved,
last_mgmt_review is not None and last_mgmt_review.review_date >= last_year,
last_internal_audit is not None and (last_internal_audit.actual_end_date or date.min) >= last_year,
open_majors == 0 and (soa_total > 0 or policies_total > 0),
policies_total > 0 and policies_approved >= policies_total * 0.8,
objectives_total > 0,
]
certification_readiness = sum(readiness_factors) / len(readiness_factors) * 100
if not has_any_data:
overall_status = "not_started"
elif open_majors > 0:
overall_status = "not_ready"
elif certification_readiness >= 80:
overall_status = "ready"
else:
overall_status = "at_risk"
def _chapter_status(has_positive_evidence: bool, has_issues: bool) -> str:
if not has_positive_evidence:
return "not_started"
return "compliant" if not has_issues else "non_compliant"
ch9_parts = sum([last_mgmt_review is not None, last_internal_audit is not None])
ch9_pct = (ch9_parts / 2) * 100
capa_total = db.query(AuditFindingDB).count()
ch10_has_data = capa_total > 0
ch10_pct = 100.0 if (ch10_has_data and open_majors == 0) else (50.0 if ch10_has_data else 0.0)
chapters = [
ISO27001ChapterStatus(
chapter="4", title="Kontext der Organisation",
status=_chapter_status(scope_approved, False),
completion_percentage=100.0 if scope_approved else 0.0,
open_findings=0,
key_documents=["ISMS Scope", "Context Analysis"] if scope_approved else [],
last_reviewed=scope.approved_at if scope else None,
),
ISO27001ChapterStatus(
chapter="5", title="Fuehrung",
status=_chapter_status(policies_total > 0, policies_approved < policies_total),
completion_percentage=(policies_approved / max(policies_total, 1)) * 100 if policies_total > 0 else 0.0,
open_findings=0,
key_documents=[f"Policy {i+1}" for i in range(min(policies_approved, 3))] if policies_approved > 0 else [],
last_reviewed=None,
),
ISO27001ChapterStatus(
chapter="6", title="Planung",
status=_chapter_status(objectives_total > 0, False),
completion_percentage=75.0 if objectives_total > 0 else 0.0,
open_findings=0,
key_documents=["Risk Register", "Security Objectives"] if objectives_total > 0 else [],
last_reviewed=None,
),
ISO27001ChapterStatus(
chapter="9", title="Bewertung der Leistung",
status=_chapter_status(ch9_parts > 0, open_majors + open_minors > 0),
completion_percentage=ch9_pct,
open_findings=open_majors + open_minors,
key_documents=(
(["Internal Audit Report"] if last_internal_audit else [])
+ (["Management Review Minutes"] if last_mgmt_review else [])
),
last_reviewed=last_mgmt_review.approved_at if last_mgmt_review else None,
),
ISO27001ChapterStatus(
chapter="10", title="Verbesserung",
status=_chapter_status(ch10_has_data, open_majors > 0),
completion_percentage=ch10_pct,
open_findings=open_majors,
key_documents=["CAPA Register"] if ch10_has_data else [],
last_reviewed=None,
),
]
return ISO27001OverviewResponse(
overall_status=overall_status,
certification_readiness=certification_readiness,
chapters=chapters,
scope_approved=scope_approved,
soa_approved=soa_all_approved,
last_management_review=last_mgmt_review.approved_at if last_mgmt_review else None,
last_internal_audit=(
datetime.combine(last_internal_audit.actual_end_date, datetime.min.time())
if last_internal_audit and last_internal_audit.actual_end_date
else None
),
open_major_findings=open_majors,
open_minor_findings=open_minors,
policies_count=policies_total,
policies_approved=policies_approved,
objectives_count=objectives_total,
objectives_achieved=objectives_achieved,
)