# mypy: disable-error-code="arg-type,assignment,union-attr,no-any-return,attr-defined,index,call-overload,type-arg,var-annotated,misc,call-arg,return-value" """ ISMS Governance service -- Scope, Context, Policies, Objectives, SoA. Phase 1 Step 4: extracted from ``compliance.api.isms_routes``. Helpers ``generate_id``, ``create_signature`` and ``log_audit_trail`` are defined here and re-exported from ``compliance.api.isms_routes`` for legacy imports. """ import uuid import hashlib from datetime import datetime, date, timezone from typing import Optional, List from sqlalchemy.orm import Session from compliance.db.models import ( ISMSScopeDB, ISMSContextDB, ISMSPolicyDB, SecurityObjectiveDB, StatementOfApplicabilityDB, AuditTrailDB, ApprovalStatusEnum, ) from compliance.domain import NotFoundError, ConflictError, ValidationError # ============================================================================ # Shared helpers (re-exported by isms_routes for back-compat) # ============================================================================ def generate_id() -> str: """Generate a UUID string.""" return str(uuid.uuid4()) def create_signature(data: str) -> str: """Create SHA-256 signature.""" return hashlib.sha256(data.encode()).hexdigest() def log_audit_trail( db: Session, entity_type: str, entity_id: str, entity_name: str, action: str, performed_by: str, field_changed: str = None, old_value: str = None, new_value: str = None, change_summary: str = None, ) -> None: """Log an entry to the audit trail.""" trail = AuditTrailDB( id=generate_id(), entity_type=entity_type, entity_id=entity_id, entity_name=entity_name, action=action, field_changed=field_changed, old_value=old_value, new_value=new_value, change_summary=change_summary, performed_by=performed_by, performed_at=datetime.now(timezone.utc), checksum=create_signature( f"{entity_type}|{entity_id}|{action}|{performed_by}" ), ) db.add(trail) # ============================================================================ # Scope (ISO 27001 4.3) # ============================================================================ class ISMSScopeService: """Business logic for ISMS Scope.""" @staticmethod def get_current(db: Session) -> ISMSScopeDB: scope = ( db.query(ISMSScopeDB) .filter(ISMSScopeDB.status != ApprovalStatusEnum.SUPERSEDED) .order_by(ISMSScopeDB.created_at.desc()) .first() ) if not scope: raise NotFoundError("No ISMS scope defined yet") return scope @staticmethod def create(db: Session, data: dict, created_by: str) -> ISMSScopeDB: existing = ( db.query(ISMSScopeDB) .filter(ISMSScopeDB.status != ApprovalStatusEnum.SUPERSEDED) .all() ) for s in existing: s.status = ApprovalStatusEnum.SUPERSEDED scope = ISMSScopeDB(id=generate_id(), status=ApprovalStatusEnum.DRAFT, created_by=created_by, **data) db.add(scope) log_audit_trail(db, "isms_scope", scope.id, "ISMS Scope", "create", created_by) db.commit() db.refresh(scope) return scope @staticmethod def update(db: Session, scope_id: str, data: dict, updated_by: str) -> ISMSScopeDB: scope = db.query(ISMSScopeDB).filter(ISMSScopeDB.id == scope_id).first() if not scope: raise NotFoundError("Scope not found") if scope.status == ApprovalStatusEnum.APPROVED: raise ConflictError("Cannot modify approved scope. Create new version.") for field, value in data.items(): setattr(scope, field, value) scope.updated_by = updated_by scope.updated_at = datetime.now(timezone.utc) version_parts = scope.version.split(".") scope.version = f"{version_parts[0]}.{int(version_parts[1]) + 1}" log_audit_trail(db, "isms_scope", scope.id, "ISMS Scope", "update", updated_by) db.commit() db.refresh(scope) return scope @staticmethod def approve(db: Session, scope_id: str, approved_by: str, effective_date: date, review_date: date) -> ISMSScopeDB: scope = db.query(ISMSScopeDB).filter(ISMSScopeDB.id == scope_id).first() if not scope: raise NotFoundError("Scope not found") scope.status = ApprovalStatusEnum.APPROVED scope.approved_by = approved_by scope.approved_at = datetime.now(timezone.utc) scope.effective_date = effective_date scope.review_date = review_date scope.approval_signature = create_signature( f"{scope.scope_statement}|{approved_by}|{datetime.now(timezone.utc).isoformat()}" ) log_audit_trail(db, "isms_scope", scope.id, "ISMS Scope", "approve", approved_by) db.commit() db.refresh(scope) return scope # ============================================================================ # Context (ISO 27001 4.1, 4.2) # ============================================================================ class ISMSContextService: """Business logic for ISMS Context.""" @staticmethod def get_current(db: Session) -> ISMSContextDB: context = ( db.query(ISMSContextDB) .filter(ISMSContextDB.status != ApprovalStatusEnum.SUPERSEDED) .order_by(ISMSContextDB.created_at.desc()) .first() ) if not context: raise NotFoundError("No ISMS context defined yet") return context @staticmethod def create(db: Session, data: dict, created_by: str) -> ISMSContextDB: existing = ( db.query(ISMSContextDB) .filter(ISMSContextDB.status != ApprovalStatusEnum.SUPERSEDED) .all() ) for c in existing: c.status = ApprovalStatusEnum.SUPERSEDED context = ISMSContextDB(id=generate_id(), status=ApprovalStatusEnum.DRAFT, **data) db.add(context) log_audit_trail(db, "isms_context", context.id, "ISMS Context", "create", created_by) db.commit() db.refresh(context) return context # ============================================================================ # Policies (ISO 27001 5.2) # ============================================================================ class ISMSPolicyService: """Business logic for ISMS Policies.""" @staticmethod def list_policies(db: Session, policy_type: Optional[str] = None, status: Optional[str] = None) -> tuple: query = db.query(ISMSPolicyDB) if policy_type: query = query.filter(ISMSPolicyDB.policy_type == policy_type) if status: query = query.filter(ISMSPolicyDB.status == status) policies = query.order_by(ISMSPolicyDB.policy_id).all() return policies, len(policies) @staticmethod def create(db: Session, data: dict) -> ISMSPolicyDB: existing = db.query(ISMSPolicyDB).filter(ISMSPolicyDB.policy_id == data["policy_id"]).first() if existing: raise ConflictError(f"Policy {data['policy_id']} already exists") authored_by = data.pop("authored_by") policy = ISMSPolicyDB( id=generate_id(), authored_by=authored_by, status=ApprovalStatusEnum.DRAFT, **data, ) db.add(policy) log_audit_trail(db, "isms_policy", policy.id, policy.policy_id, "create", authored_by) db.commit() db.refresh(policy) return policy @staticmethod def get(db: Session, policy_id: str) -> ISMSPolicyDB: policy = ( db.query(ISMSPolicyDB) .filter((ISMSPolicyDB.id == policy_id) | (ISMSPolicyDB.policy_id == policy_id)) .first() ) if not policy: raise NotFoundError("Policy not found") return policy @staticmethod def update(db: Session, policy_id: str, data: dict, updated_by: str) -> ISMSPolicyDB: policy = ( db.query(ISMSPolicyDB) .filter((ISMSPolicyDB.id == policy_id) | (ISMSPolicyDB.policy_id == policy_id)) .first() ) if not policy: raise NotFoundError("Policy not found") if policy.status == ApprovalStatusEnum.APPROVED: version_parts = policy.version.split(".") policy.version = f"{int(version_parts[0]) + 1}.0" policy.status = ApprovalStatusEnum.DRAFT for field, value in data.items(): setattr(policy, field, value) log_audit_trail(db, "isms_policy", policy.id, policy.policy_id, "update", updated_by) db.commit() db.refresh(policy) return policy @staticmethod def approve(db: Session, policy_id: str, reviewed_by: str, approved_by: str, effective_date: date) -> ISMSPolicyDB: policy = ( db.query(ISMSPolicyDB) .filter((ISMSPolicyDB.id == policy_id) | (ISMSPolicyDB.policy_id == policy_id)) .first() ) if not policy: raise NotFoundError("Policy not found") policy.reviewed_by = reviewed_by policy.approved_by = approved_by policy.approved_at = datetime.now(timezone.utc) policy.effective_date = effective_date policy.next_review_date = date( effective_date.year + (policy.review_frequency_months // 12), effective_date.month, effective_date.day, ) policy.status = ApprovalStatusEnum.APPROVED policy.approval_signature = create_signature( f"{policy.policy_id}|{approved_by}|{datetime.now(timezone.utc).isoformat()}" ) log_audit_trail(db, "isms_policy", policy.id, policy.policy_id, "approve", approved_by) db.commit() db.refresh(policy) return policy # ============================================================================ # Security Objectives (ISO 27001 6.2) # ============================================================================ class SecurityObjectiveService: """Business logic for Security Objectives.""" @staticmethod def list_objectives(db: Session, category: Optional[str] = None, status: Optional[str] = None) -> tuple: query = db.query(SecurityObjectiveDB) if category: query = query.filter(SecurityObjectiveDB.category == category) if status: query = query.filter(SecurityObjectiveDB.status == status) objectives = query.order_by(SecurityObjectiveDB.objective_id).all() return objectives, len(objectives) @staticmethod def create(db: Session, data: dict, created_by: str) -> SecurityObjectiveDB: objective = SecurityObjectiveDB(id=generate_id(), status="active", **data) db.add(objective) log_audit_trail(db, "security_objective", objective.id, objective.objective_id, "create", created_by) db.commit() db.refresh(objective) return objective @staticmethod def update(db: Session, objective_id: str, data: dict, updated_by: str) -> SecurityObjectiveDB: objective = ( db.query(SecurityObjectiveDB) .filter((SecurityObjectiveDB.id == objective_id) | (SecurityObjectiveDB.objective_id == objective_id)) .first() ) if not objective: raise NotFoundError("Objective not found") for field, value in data.items(): setattr(objective, field, value) if objective.progress_percentage >= 100 and objective.status == "active": objective.status = "achieved" objective.achieved_date = date.today() log_audit_trail(db, "security_objective", objective.id, objective.objective_id, "update", updated_by) db.commit() db.refresh(objective) return objective # ============================================================================ # Statement of Applicability (SoA) # ============================================================================ class SoAService: """Business logic for Statement of Applicability.""" @staticmethod def list_entries( db: Session, is_applicable: Optional[bool] = None, implementation_status: Optional[str] = None, category: Optional[str] = None, ) -> dict: query = db.query(StatementOfApplicabilityDB) if is_applicable is not None: query = query.filter(StatementOfApplicabilityDB.is_applicable == is_applicable) if implementation_status: query = query.filter(StatementOfApplicabilityDB.implementation_status == implementation_status) if category: query = query.filter(StatementOfApplicabilityDB.annex_a_category == category) entries = query.order_by(StatementOfApplicabilityDB.annex_a_control).all() applicable_count = sum(1 for e in entries if e.is_applicable) implemented_count = sum(1 for e in entries if e.implementation_status == "implemented") planned_count = sum(1 for e in entries if e.implementation_status == "planned") return { "entries": entries, "total": len(entries), "applicable_count": applicable_count, "not_applicable_count": len(entries) - applicable_count, "implemented_count": implemented_count, "planned_count": planned_count, } @staticmethod def create(db: Session, data: dict, created_by: str) -> StatementOfApplicabilityDB: existing = ( db.query(StatementOfApplicabilityDB) .filter(StatementOfApplicabilityDB.annex_a_control == data["annex_a_control"]) .first() ) if existing: raise ConflictError(f"SoA entry for {data['annex_a_control']} already exists") entry = StatementOfApplicabilityDB(id=generate_id(), **data) db.add(entry) log_audit_trail(db, "soa", entry.id, entry.annex_a_control, "create", created_by) db.commit() db.refresh(entry) return entry @staticmethod def update(db: Session, entry_id: str, data: dict, updated_by: str) -> StatementOfApplicabilityDB: entry = ( db.query(StatementOfApplicabilityDB) .filter( (StatementOfApplicabilityDB.id == entry_id) | (StatementOfApplicabilityDB.annex_a_control == entry_id) ) .first() ) if not entry: raise NotFoundError("SoA entry not found") for field, value in data.items(): setattr(entry, field, value) version_parts = entry.version.split(".") entry.version = f"{version_parts[0]}.{int(version_parts[1]) + 1}" log_audit_trail(db, "soa", entry.id, entry.annex_a_control, "update", updated_by) db.commit() db.refresh(entry) return entry @staticmethod def approve(db: Session, entry_id: str, reviewed_by: str, approved_by: str) -> StatementOfApplicabilityDB: entry = ( db.query(StatementOfApplicabilityDB) .filter( (StatementOfApplicabilityDB.id == entry_id) | (StatementOfApplicabilityDB.annex_a_control == entry_id) ) .first() ) if not entry: raise NotFoundError("SoA entry not found") entry.reviewed_by = reviewed_by entry.reviewed_at = datetime.now(timezone.utc) entry.approved_by = approved_by entry.approved_at = datetime.now(timezone.utc) log_audit_trail(db, "soa", entry.id, entry.annex_a_control, "approve", approved_by) db.commit() db.refresh(entry) return entry