# mypy: disable-error-code="arg-type,assignment,union-attr,no-any-return,attr-defined,index,call-overload,type-arg,var-annotated,misc,call-arg,return-value" """ ISMS Readiness & Overview service -- Readiness Check and ISO 27001 Overview. Phase 1 Step 4: extracted from ``compliance.api.isms_routes`` via ``compliance.services.isms_assessment_service``. """ from datetime import datetime, date, timezone from typing import List from sqlalchemy.orm import Session from compliance.db.models import ( ISMSScopeDB, ISMSContextDB, ISMSPolicyDB, SecurityObjectiveDB, StatementOfApplicabilityDB, AuditFindingDB, ManagementReviewDB, InternalAuditDB, ISMSReadinessCheckDB, ApprovalStatusEnum, FindingTypeEnum, FindingStatusEnum, ) from compliance.domain import NotFoundError from compliance.services.isms_governance_service import generate_id from compliance.schemas.isms_audit import ( PotentialFinding, ISMSReadinessCheckResponse, ISO27001ChapterStatus, ISO27001OverviewResponse, ) # ============================================================================ # Readiness Check # ============================================================================ def _get_chapter_status(has_major: bool, has_minor: bool) -> str: if has_major: return "fail" elif has_minor: return "warning" return "pass" class ReadinessCheckService: """Business logic for the ISMS Readiness Check.""" @staticmethod def run(db: Session, triggered_by: str) -> ISMSReadinessCheckResponse: potential_majors: List[PotentialFinding] = [] potential_minors: List[PotentialFinding] = [] improvement_opportunities: List[PotentialFinding] = [] # Chapter 4: Context scope = db.query(ISMSScopeDB).filter(ISMSScopeDB.status == ApprovalStatusEnum.APPROVED).first() if not scope: potential_majors.append(PotentialFinding( check="ISMS Scope not approved", status="fail", recommendation="Approve ISMS scope with top management signature", iso_reference="4.3", )) context = db.query(ISMSContextDB).filter(ISMSContextDB.status == ApprovalStatusEnum.APPROVED).first() if not context: potential_majors.append(PotentialFinding( check="ISMS Context not documented", status="fail", recommendation="Document and approve context analysis (4.1, 4.2)", iso_reference="4.1, 4.2", )) # Chapter 5: Leadership master_policy = db.query(ISMSPolicyDB).filter( ISMSPolicyDB.policy_type == "master", ISMSPolicyDB.status == ApprovalStatusEnum.APPROVED, ).first() if not master_policy: potential_majors.append(PotentialFinding( check="Information Security Policy not approved", status="fail", recommendation="Create and approve master ISMS policy", iso_reference="5.2", )) # Chapter 6: Risk Assessment from compliance.db.models import RiskDB risks_without_treatment = db.query(RiskDB).filter( RiskDB.status == "open", RiskDB.treatment_plan is None, ).count() if risks_without_treatment > 0: potential_majors.append(PotentialFinding( check=f"{risks_without_treatment} risks without treatment plan", status="fail", recommendation="Define risk treatment for all identified risks", iso_reference="6.1.2", )) # Chapter 6: Objectives objectives = db.query(SecurityObjectiveDB).filter(SecurityObjectiveDB.status == "active").count() if objectives == 0: potential_majors.append(PotentialFinding( check="No security objectives defined", status="fail", recommendation="Define measurable security objectives", iso_reference="6.2", )) # SoA soa_total = db.query(StatementOfApplicabilityDB).count() soa_unapproved = db.query(StatementOfApplicabilityDB).filter( StatementOfApplicabilityDB.approved_at is None, ).count() if soa_total == 0: potential_majors.append(PotentialFinding( check="Statement of Applicability not created", status="fail", recommendation="Create SoA for all 93 Annex A controls", iso_reference="Annex A", )) elif soa_unapproved > 0: potential_minors.append(PotentialFinding( check=f"{soa_unapproved} SoA entries not approved", status="warning", recommendation="Review and approve all SoA entries", iso_reference="Annex A", )) # Chapter 9: Internal Audit last_year = date.today().replace(year=date.today().year - 1) internal_audit = db.query(InternalAuditDB).filter( InternalAuditDB.status == "completed", InternalAuditDB.actual_end_date >= last_year, ).first() if not internal_audit: potential_majors.append(PotentialFinding( check="No internal audit in last 12 months", status="fail", recommendation="Conduct internal audit before certification", iso_reference="9.2", )) # Chapter 9: Management Review mgmt_review = db.query(ManagementReviewDB).filter( ManagementReviewDB.status == "approved", ManagementReviewDB.review_date >= last_year, ).first() if not mgmt_review: potential_majors.append(PotentialFinding( check="No management review in last 12 months", status="fail", recommendation="Conduct and approve management review", iso_reference="9.3", )) # Chapter 10: Open Findings open_majors = db.query(AuditFindingDB).filter( AuditFindingDB.finding_type == FindingTypeEnum.MAJOR, AuditFindingDB.status != FindingStatusEnum.CLOSED, ).count() if open_majors > 0: potential_majors.append(PotentialFinding( check=f"{open_majors} open major finding(s)", status="fail", recommendation="Close all major findings before certification", iso_reference="10.1", )) open_minors = db.query(AuditFindingDB).filter( AuditFindingDB.finding_type == FindingTypeEnum.MINOR, AuditFindingDB.status != FindingStatusEnum.CLOSED, ).count() if open_minors > 0: potential_minors.append(PotentialFinding( check=f"{open_minors} open minor finding(s)", status="warning", recommendation="Address minor findings or have CAPA in progress", iso_reference="10.1", )) # Calculate scores total_checks = 10 passed_checks = total_checks - len(potential_majors) readiness_score = (passed_checks / total_checks) * 100 certification_possible = len(potential_majors) == 0 if certification_possible: overall_status = "ready" if len(potential_minors) == 0 else "at_risk" else: overall_status = "not_ready" ch4m = any("4." in (f.iso_reference or "") for f in potential_majors) ch5m = any("5." in (f.iso_reference or "") for f in potential_majors) ch6m = any("6." in (f.iso_reference or "") for f in potential_majors) ch9m = any("9." in (f.iso_reference or "") for f in potential_majors) ch10m = any("10." in (f.iso_reference or "") for f in potential_majors) priority_actions = [f.recommendation for f in potential_majors[:5]] check = ISMSReadinessCheckDB( id=generate_id(), check_date=datetime.now(timezone.utc), triggered_by=triggered_by, overall_status=overall_status, certification_possible=certification_possible, chapter_4_status=_get_chapter_status(ch4m, False), chapter_5_status=_get_chapter_status(ch5m, False), chapter_6_status=_get_chapter_status(ch6m, False), chapter_7_status=_get_chapter_status( any("7." in (f.iso_reference or "") for f in potential_majors), any("7." in (f.iso_reference or "") for f in potential_minors), ), chapter_8_status=_get_chapter_status( any("8." in (f.iso_reference or "") for f in potential_majors), any("8." in (f.iso_reference or "") for f in potential_minors), ), chapter_9_status=_get_chapter_status(ch9m, False), chapter_10_status=_get_chapter_status(ch10m, False), potential_majors=[f.model_dump() for f in potential_majors], potential_minors=[f.model_dump() for f in potential_minors], improvement_opportunities=[f.model_dump() for f in improvement_opportunities], readiness_score=readiness_score, priority_actions=priority_actions, ) db.add(check) db.commit() db.refresh(check) return ISMSReadinessCheckResponse( id=check.id, check_date=check.check_date, triggered_by=check.triggered_by, overall_status=check.overall_status, certification_possible=check.certification_possible, chapter_4_status=check.chapter_4_status, chapter_5_status=check.chapter_5_status, chapter_6_status=check.chapter_6_status, chapter_7_status=check.chapter_7_status, chapter_8_status=check.chapter_8_status, chapter_9_status=check.chapter_9_status, chapter_10_status=check.chapter_10_status, potential_majors=potential_majors, potential_minors=potential_minors, improvement_opportunities=improvement_opportunities, readiness_score=check.readiness_score, documentation_score=None, implementation_score=None, evidence_score=None, priority_actions=priority_actions, ) @staticmethod def get_latest(db: Session) -> ISMSReadinessCheckDB: check = ( db.query(ISMSReadinessCheckDB) .order_by(ISMSReadinessCheckDB.check_date.desc()) .first() ) if not check: raise NotFoundError("No readiness check found. Run one first.") return check # ============================================================================ # ISO 27001 Overview # ============================================================================ class OverviewService: """Business logic for the ISO 27001 overview dashboard.""" @staticmethod def get_overview(db: Session) -> ISO27001OverviewResponse: scope = db.query(ISMSScopeDB).filter(ISMSScopeDB.status == ApprovalStatusEnum.APPROVED).first() scope_approved = scope is not None soa_total = db.query(StatementOfApplicabilityDB).count() soa_approved = db.query(StatementOfApplicabilityDB).filter( StatementOfApplicabilityDB.approved_at.isnot(None), ).count() soa_all_approved = soa_total > 0 and soa_approved == soa_total last_year = date.today().replace(year=date.today().year - 1) last_mgmt_review = ( db.query(ManagementReviewDB) .filter(ManagementReviewDB.status == "approved") .order_by(ManagementReviewDB.review_date.desc()) .first() ) last_internal_audit = ( db.query(InternalAuditDB) .filter(InternalAuditDB.status == "completed") .order_by(InternalAuditDB.actual_end_date.desc()) .first() ) open_majors = db.query(AuditFindingDB).filter( AuditFindingDB.finding_type == FindingTypeEnum.MAJOR, AuditFindingDB.status != FindingStatusEnum.CLOSED, ).count() open_minors = db.query(AuditFindingDB).filter( AuditFindingDB.finding_type == FindingTypeEnum.MINOR, AuditFindingDB.status != FindingStatusEnum.CLOSED, ).count() policies_total = db.query(ISMSPolicyDB).count() policies_approved = db.query(ISMSPolicyDB).filter( ISMSPolicyDB.status == ApprovalStatusEnum.APPROVED, ).count() objectives_total = db.query(SecurityObjectiveDB).count() objectives_achieved = db.query(SecurityObjectiveDB).filter( SecurityObjectiveDB.status == "achieved", ).count() has_any_data = any([ scope_approved, soa_total > 0, policies_total > 0, objectives_total > 0, last_mgmt_review is not None, last_internal_audit is not None, ]) if not has_any_data: certification_readiness = 0.0 else: readiness_factors = [ scope_approved, soa_all_approved, last_mgmt_review is not None and last_mgmt_review.review_date >= last_year, last_internal_audit is not None and (last_internal_audit.actual_end_date or date.min) >= last_year, open_majors == 0 and (soa_total > 0 or policies_total > 0), policies_total > 0 and policies_approved >= policies_total * 0.8, objectives_total > 0, ] certification_readiness = sum(readiness_factors) / len(readiness_factors) * 100 if not has_any_data: overall_status = "not_started" elif open_majors > 0: overall_status = "not_ready" elif certification_readiness >= 80: overall_status = "ready" else: overall_status = "at_risk" def _ch_status(has_positive: bool, has_issues: bool) -> str: if not has_positive: return "not_started" return "compliant" if not has_issues else "non_compliant" ch9_parts = sum([last_mgmt_review is not None, last_internal_audit is not None]) ch9_pct = (ch9_parts / 2) * 100 capa_total = db.query(AuditFindingDB).count() ch10_has_data = capa_total > 0 ch10_pct = 100.0 if (ch10_has_data and open_majors == 0) else (50.0 if ch10_has_data else 0.0) chapters = [ ISO27001ChapterStatus( chapter="4", title="Kontext der Organisation", status=_ch_status(scope_approved, False), completion_percentage=100.0 if scope_approved else 0.0, open_findings=0, key_documents=["ISMS Scope", "Context Analysis"] if scope_approved else [], last_reviewed=scope.approved_at if scope else None, ), ISO27001ChapterStatus( chapter="5", title="Fuehrung", status=_ch_status(policies_total > 0, policies_approved < policies_total), completion_percentage=(policies_approved / max(policies_total, 1)) * 100 if policies_total > 0 else 0.0, open_findings=0, key_documents=[f"Policy {i+1}" for i in range(min(policies_approved, 3))] if policies_approved > 0 else [], last_reviewed=None, ), ISO27001ChapterStatus( chapter="6", title="Planung", status=_ch_status(objectives_total > 0, False), completion_percentage=75.0 if objectives_total > 0 else 0.0, open_findings=0, key_documents=["Risk Register", "Security Objectives"] if objectives_total > 0 else [], last_reviewed=None, ), ISO27001ChapterStatus( chapter="9", title="Bewertung der Leistung", status=_ch_status(ch9_parts > 0, open_majors + open_minors > 0), completion_percentage=ch9_pct, open_findings=open_majors + open_minors, key_documents=( (["Internal Audit Report"] if last_internal_audit else []) + (["Management Review Minutes"] if last_mgmt_review else []) ), last_reviewed=last_mgmt_review.approved_at if last_mgmt_review else None, ), ISO27001ChapterStatus( chapter="10", title="Verbesserung", status=_ch_status(ch10_has_data, open_majors > 0), completion_percentage=ch10_pct, open_findings=open_majors, key_documents=["CAPA Register"] if ch10_has_data else [], last_reviewed=None, ), ] return ISO27001OverviewResponse( overall_status=overall_status, certification_readiness=certification_readiness, chapters=chapters, scope_approved=scope_approved, soa_approved=soa_all_approved, last_management_review=last_mgmt_review.approved_at if last_mgmt_review else None, last_internal_audit=( datetime.combine(last_internal_audit.actual_end_date, datetime.min.time()) if last_internal_audit and last_internal_audit.actual_end_date else None ), open_major_findings=open_majors, open_minor_findings=open_minors, policies_count=policies_total, policies_approved=policies_approved, objectives_count=objectives_total, objectives_achieved=objectives_achieved, )