""" ISMS repositories — extracted from compliance/db/isms_repository.py. Phase 1 Step 5: split per sub-aggregate. Re-exported from ``compliance.db.isms_repository`` for backwards compatibility. """ import uuid from datetime import datetime, date, timezone from typing import List, Optional, Dict, Any, Tuple from sqlalchemy.orm import Session as DBSession from compliance.db.models import ( ISMSScopeDB, ISMSPolicyDB, SecurityObjectiveDB, StatementOfApplicabilityDB, AuditFindingDB, CorrectiveActionDB, ManagementReviewDB, InternalAuditDB, AuditTrailDB, ISMSReadinessCheckDB, ApprovalStatusEnum, FindingTypeEnum, FindingStatusEnum, CAPATypeEnum, ) class ISMSScopeRepository: """Repository for ISMS Scope (ISO 27001 Chapter 4.3).""" def __init__(self, db: DBSession): self.db = db def create( self, scope_statement: str, created_by: str, included_locations: Optional[List[str]] = None, included_processes: Optional[List[str]] = None, included_services: Optional[List[str]] = None, excluded_items: Optional[List[str]] = None, exclusion_justification: Optional[str] = None, organizational_boundary: Optional[str] = None, physical_boundary: Optional[str] = None, technical_boundary: Optional[str] = None, ) -> ISMSScopeDB: """Create a new ISMS scope definition.""" # Supersede existing scopes existing = self.db.query(ISMSScopeDB).filter( ISMSScopeDB.status != ApprovalStatusEnum.SUPERSEDED ).all() for s in existing: s.status = ApprovalStatusEnum.SUPERSEDED scope = ISMSScopeDB( id=str(uuid.uuid4()), scope_statement=scope_statement, included_locations=included_locations, included_processes=included_processes, included_services=included_services, excluded_items=excluded_items, exclusion_justification=exclusion_justification, organizational_boundary=organizational_boundary, physical_boundary=physical_boundary, technical_boundary=technical_boundary, status=ApprovalStatusEnum.DRAFT, created_by=created_by, ) self.db.add(scope) self.db.commit() self.db.refresh(scope) return scope def get_current(self) -> Optional[ISMSScopeDB]: """Get the current (non-superseded) ISMS scope.""" return self.db.query(ISMSScopeDB).filter( ISMSScopeDB.status != ApprovalStatusEnum.SUPERSEDED ).order_by(ISMSScopeDB.created_at.desc()).first() def get_by_id(self, scope_id: str) -> Optional[ISMSScopeDB]: """Get scope by ID.""" return self.db.query(ISMSScopeDB).filter(ISMSScopeDB.id == scope_id).first() def approve( self, scope_id: str, approved_by: str, effective_date: date, review_date: date, ) -> Optional[ISMSScopeDB]: """Approve the ISMS scope.""" scope = self.get_by_id(scope_id) if not scope: return None import hashlib scope.status = ApprovalStatusEnum.APPROVED scope.approved_by = approved_by scope.approved_at = datetime.now(timezone.utc) scope.effective_date = effective_date scope.review_date = review_date scope.approval_signature = hashlib.sha256( f"{scope.scope_statement}|{approved_by}|{datetime.now(timezone.utc).isoformat()}".encode() ).hexdigest() self.db.commit() self.db.refresh(scope) return scope class ISMSPolicyRepository: """Repository for ISMS Policies (ISO 27001 Chapter 5.2).""" def __init__(self, db: DBSession): self.db = db def create( self, policy_id: str, title: str, policy_type: str, authored_by: str, description: Optional[str] = None, policy_text: Optional[str] = None, applies_to: Optional[List[str]] = None, review_frequency_months: int = 12, related_controls: Optional[List[str]] = None, ) -> ISMSPolicyDB: """Create a new ISMS policy.""" policy = ISMSPolicyDB( id=str(uuid.uuid4()), policy_id=policy_id, title=title, policy_type=policy_type, description=description, policy_text=policy_text, applies_to=applies_to, review_frequency_months=review_frequency_months, related_controls=related_controls, authored_by=authored_by, status=ApprovalStatusEnum.DRAFT, ) self.db.add(policy) self.db.commit() self.db.refresh(policy) return policy def get_by_id(self, policy_id: str) -> Optional[ISMSPolicyDB]: """Get policy by UUID or policy_id.""" return self.db.query(ISMSPolicyDB).filter( (ISMSPolicyDB.id == policy_id) | (ISMSPolicyDB.policy_id == policy_id) ).first() def get_all( self, policy_type: Optional[str] = None, status: Optional[ApprovalStatusEnum] = None, ) -> List[ISMSPolicyDB]: """Get all policies with optional filters.""" query = self.db.query(ISMSPolicyDB) if policy_type: query = query.filter(ISMSPolicyDB.policy_type == policy_type) if status: query = query.filter(ISMSPolicyDB.status == status) return query.order_by(ISMSPolicyDB.policy_id).all() def get_master_policy(self) -> Optional[ISMSPolicyDB]: """Get the approved master ISMS policy.""" return self.db.query(ISMSPolicyDB).filter( ISMSPolicyDB.policy_type == "master", ISMSPolicyDB.status == ApprovalStatusEnum.APPROVED ).first() def approve( self, policy_id: str, approved_by: str, reviewed_by: str, effective_date: date, ) -> Optional[ISMSPolicyDB]: """Approve a policy.""" policy = self.get_by_id(policy_id) if not policy: return None import hashlib policy.status = ApprovalStatusEnum.APPROVED policy.reviewed_by = reviewed_by policy.approved_by = approved_by policy.approved_at = datetime.now(timezone.utc) policy.effective_date = effective_date policy.next_review_date = date( effective_date.year + (policy.review_frequency_months // 12), effective_date.month, effective_date.day ) policy.approval_signature = hashlib.sha256( f"{policy.policy_id}|{approved_by}|{datetime.now(timezone.utc).isoformat()}".encode() ).hexdigest() self.db.commit() self.db.refresh(policy) return policy class SecurityObjectiveRepository: """Repository for Security Objectives (ISO 27001 Chapter 6.2).""" def __init__(self, db: DBSession): self.db = db def create( self, objective_id: str, title: str, description: str, category: str, owner: str, kpi_name: Optional[str] = None, kpi_target: Optional[float] = None, kpi_unit: Optional[str] = None, target_date: Optional[date] = None, related_controls: Optional[List[str]] = None, ) -> SecurityObjectiveDB: """Create a new security objective.""" objective = SecurityObjectiveDB( id=str(uuid.uuid4()), objective_id=objective_id, title=title, description=description, category=category, kpi_name=kpi_name, kpi_target=kpi_target, kpi_unit=kpi_unit, owner=owner, target_date=target_date, related_controls=related_controls, status="active", ) self.db.add(objective) self.db.commit() self.db.refresh(objective) return objective def get_by_id(self, objective_id: str) -> Optional[SecurityObjectiveDB]: """Get objective by UUID or objective_id.""" return self.db.query(SecurityObjectiveDB).filter( (SecurityObjectiveDB.id == objective_id) | (SecurityObjectiveDB.objective_id == objective_id) ).first() def get_all( self, category: Optional[str] = None, status: Optional[str] = None, ) -> List[SecurityObjectiveDB]: """Get all objectives with optional filters.""" query = self.db.query(SecurityObjectiveDB) if category: query = query.filter(SecurityObjectiveDB.category == category) if status: query = query.filter(SecurityObjectiveDB.status == status) return query.order_by(SecurityObjectiveDB.objective_id).all() def update_progress( self, objective_id: str, kpi_current: float, ) -> Optional[SecurityObjectiveDB]: """Update objective progress.""" objective = self.get_by_id(objective_id) if not objective: return None objective.kpi_current = kpi_current if objective.kpi_target: objective.progress_percentage = min(100, (kpi_current / objective.kpi_target) * 100) # Auto-mark as achieved if 100% if objective.progress_percentage >= 100 and objective.status == "active": objective.status = "achieved" objective.achieved_date = date.today() self.db.commit() self.db.refresh(objective) return objective class StatementOfApplicabilityRepository: """Repository for Statement of Applicability (SoA).""" def __init__(self, db: DBSession): self.db = db def create( self, annex_a_control: str, annex_a_title: str, annex_a_category: str, is_applicable: bool = True, applicability_justification: Optional[str] = None, implementation_status: str = "planned", breakpilot_control_ids: Optional[List[str]] = None, ) -> StatementOfApplicabilityDB: """Create a new SoA entry.""" entry = StatementOfApplicabilityDB( id=str(uuid.uuid4()), annex_a_control=annex_a_control, annex_a_title=annex_a_title, annex_a_category=annex_a_category, is_applicable=is_applicable, applicability_justification=applicability_justification, implementation_status=implementation_status, breakpilot_control_ids=breakpilot_control_ids or [], ) self.db.add(entry) self.db.commit() self.db.refresh(entry) return entry def get_by_control(self, annex_a_control: str) -> Optional[StatementOfApplicabilityDB]: """Get SoA entry by Annex A control ID (e.g., 'A.5.1').""" return self.db.query(StatementOfApplicabilityDB).filter( StatementOfApplicabilityDB.annex_a_control == annex_a_control ).first() def get_all( self, is_applicable: Optional[bool] = None, implementation_status: Optional[str] = None, category: Optional[str] = None, ) -> List[StatementOfApplicabilityDB]: """Get all SoA entries with optional filters.""" query = self.db.query(StatementOfApplicabilityDB) if is_applicable is not None: query = query.filter(StatementOfApplicabilityDB.is_applicable == is_applicable) if implementation_status: query = query.filter(StatementOfApplicabilityDB.implementation_status == implementation_status) if category: query = query.filter(StatementOfApplicabilityDB.annex_a_category == category) return query.order_by(StatementOfApplicabilityDB.annex_a_control).all() def get_statistics(self) -> Dict[str, Any]: """Get SoA statistics.""" entries = self.get_all() total = len(entries) applicable = sum(1 for e in entries if e.is_applicable) implemented = sum(1 for e in entries if e.implementation_status == "implemented") approved = sum(1 for e in entries if e.approved_at) return { "total": total, "applicable": applicable, "not_applicable": total - applicable, "implemented": implemented, "planned": sum(1 for e in entries if e.implementation_status == "planned"), "approved": approved, "pending_approval": total - approved, "implementation_rate": round((implemented / applicable * 100) if applicable > 0 else 0, 1), }