# mypy: disable-error-code="arg-type,assignment,union-attr,no-any-return,attr-defined,index,call-overload,type-arg,var-annotated,misc,call-arg,return-value"
"""
ISMS Assessment service -- Management Reviews, Internal Audits, Audit Trail.

Phase 1 Step 4: extracted from ``compliance.api.isms_routes``. Readiness Check
and Overview live in ``isms_readiness_service``.
"""

from datetime import datetime, date, timezone
from typing import Optional

from sqlalchemy.orm import Session

from compliance.db.models import (
    ManagementReviewDB,
    InternalAuditDB,
    AuditTrailDB,
)
from compliance.domain import NotFoundError
from compliance.services.isms_governance_service import generate_id, log_audit_trail
from compliance.schemas.isms_audit import PaginationMeta


def _to_plain(item):
    """Return ``item.model_dump()`` when the item offers it, else ``item`` itself.

    Lets the services accept nested entries either as model objects exposing
    ``model_dump`` or as values that are already plain (e.g. dicts).
    """
    dump = getattr(item, "model_dump", None)
    return dump() if callable(dump) else item


# ============================================================================
# Management Reviews (ISO 27001 9.3)
# ============================================================================


class ManagementReviewService:
    """Business logic for Management Reviews."""

    @staticmethod
    def list_reviews(db: Session, status: Optional[str] = None) -> tuple:
        """List management reviews, newest ``review_date`` first.

        Args:
            db: Database session used for the query.
            status: When truthy, only reviews with this exact status are returned.

        Returns:
            A ``(reviews, count)`` tuple where ``count`` is ``len(reviews)``.
        """
        query = db.query(ManagementReviewDB)
        if status:
            query = query.filter(ManagementReviewDB.status == status)
        reviews = query.order_by(ManagementReviewDB.review_date.desc()).all()
        return reviews, len(reviews)

    @staticmethod
    def get(db: Session, review_id: str) -> ManagementReviewDB:
        """Fetch one management review.

        Args:
            db: Database session used for the query.
            review_id: Matched against either the ``id`` column or the
                ``review_id`` column (e.g. ``MR-2024-Q1``).

        Returns:
            The first matching review.

        Raises:
            NotFoundError: If no review matches.
        """
        review = (
            db.query(ManagementReviewDB)
            .filter(
                (ManagementReviewDB.id == review_id)
                | (ManagementReviewDB.review_id == review_id)
            )
            .first()
        )
        if not review:
            raise NotFoundError("Management review not found")
        return review

    @staticmethod
    def create(db: Session, data: dict, created_by: str) -> ManagementReviewDB:
        """Create a management review in status ``draft``.

        The ``review_id`` is derived from the review date as
        ``MR-<year>-Q<quarter>``; if that ID is already taken, the first four
        characters of a freshly generated ID are appended as a suffix.

        Args:
            db: Database session; the new row is added, committed and refreshed.
            data: Field values for the new row. Must contain ``review_date``
                (a ``date`` or an ISO-format string). An optional ``attendees``
                entry is converted to a list of plain values. The dict is not
                modified.
            created_by: Passed to ``log_audit_trail`` as the acting user.

        Returns:
            The refreshed ``ManagementReviewDB`` instance.

        Raises:
            KeyError: If ``data`` has no ``review_date``.
            ValueError: If ``review_date`` is a string that is not ISO formatted.
        """
        # Work on a shallow copy so popping "attendees" does not leak to the caller.
        payload = dict(data)

        review_date = payload["review_date"]
        if isinstance(review_date, str):
            # Parsed only to derive the ID; the value in ``payload`` is passed on as given.
            review_date = date.fromisoformat(review_date)
        quarter = (review_date.month - 1) // 3 + 1
        review_id = f"MR-{review_date.year}-Q{quarter}"

        existing = (
            db.query(ManagementReviewDB)
            .filter(ManagementReviewDB.review_id == review_id)
            .first()
        )
        if existing:
            review_id = f"{review_id}-{generate_id()[:4]}"

        attendees = payload.pop("attendees", None)
        review = ManagementReviewDB(
            id=generate_id(),
            review_id=review_id,
            attendees=[_to_plain(a) for a in attendees] if attendees else None,
            status="draft",
            **payload,
        )
        db.add(review)
        log_audit_trail(db, "management_review", review.id, review_id, "create", created_by)
        db.commit()
        db.refresh(review)
        return review

    @staticmethod
    def update(db: Session, review_id: str, data: dict, updated_by: str) -> ManagementReviewDB:
        """Apply field updates to an existing management review.

        Args:
            db: Database session; changes are committed and the row refreshed.
            review_id: ``id`` or ``review_id`` of the review to update.
            data: Mapping of attribute name to new value. A non-empty
                ``action_items`` value is converted to a list of plain values;
                every other value is assigned as given.
            updated_by: Passed to ``log_audit_trail`` as the acting user.

        Returns:
            The refreshed ``ManagementReviewDB`` instance.

        Raises:
            NotFoundError: If no review matches ``review_id``.
        """
        review = ManagementReviewService.get(db, review_id)

        for field, value in data.items():
            if field == "action_items" and value:
                value = [_to_plain(item) for item in value]
            setattr(review, field, value)

        log_audit_trail(
            db, "management_review", review.id, review.review_id, "update", updated_by
        )
        db.commit()
        db.refresh(review)
        return review

    @staticmethod
    def approve(
        db: Session,
        review_id: str,
        approved_by: str,
        next_review_date: date,
        minutes_document_path: Optional[str] = None,
    ) -> ManagementReviewDB:
        """Mark a management review as approved.

        Sets ``status`` to ``approved``, records the approver and the current
        UTC timestamp, and stores the next review date and minutes path.

        Args:
            db: Database session; changes are committed and the row refreshed.
            review_id: ``id`` or ``review_id`` of the review to approve.
            approved_by: Stored on the review and passed to ``log_audit_trail``.
            next_review_date: Stored as ``next_review_date``.
            minutes_document_path: Stored as ``minutes_document_path``.

        Returns:
            The refreshed ``ManagementReviewDB`` instance.

        Raises:
            NotFoundError: If no review matches ``review_id``.
        """
        review = ManagementReviewService.get(db, review_id)

        review.status = "approved"
        review.approved_by = approved_by
        review.approved_at = datetime.now(timezone.utc)
        review.next_review_date = next_review_date
        # NOTE(review): omitting the argument overwrites any stored path with
        # None -- confirm that this is intended.
        review.minutes_document_path = minutes_document_path

        log_audit_trail(
            db, "management_review", review.id, review.review_id, "approve", approved_by
        )
        db.commit()
        db.refresh(review)
        return review


# ============================================================================
# Internal Audits (ISO 27001 9.2)
# ============================================================================


class InternalAuditService:
    """Business logic for Internal Audits."""

    @staticmethod
    def _get(db: Session, audit_id: str) -> InternalAuditDB:
        """Fetch an audit by ``id`` or ``audit_id``; raise ``NotFoundError`` if absent."""
        audit = (
            db.query(InternalAuditDB)
            .filter((InternalAuditDB.id == audit_id) | (InternalAuditDB.audit_id == audit_id))
            .first()
        )
        if not audit:
            raise NotFoundError("Internal audit not found")
        return audit

    @staticmethod
    def list_audits(
        db: Session,
        status: Optional[str] = None,
        audit_type: Optional[str] = None,
    ) -> tuple:
        """List internal audits, newest ``planned_date`` first.

        Args:
            db: Database session used for the query.
            status: When truthy, only audits with this exact status are returned.
            audit_type: When truthy, only audits of this exact type are returned.

        Returns:
            An ``(audits, count)`` tuple where ``count`` is ``len(audits)``.
        """
        query = db.query(InternalAuditDB)
        if status:
            query = query.filter(InternalAuditDB.status == status)
        if audit_type:
            query = query.filter(InternalAuditDB.audit_type == audit_type)
        audits = query.order_by(InternalAuditDB.planned_date.desc()).all()
        return audits, len(audits)

    @staticmethod
    def create(db: Session, data: dict, created_by: str) -> InternalAuditDB:
        """Create an internal audit in status ``planned``.

        The ``audit_id`` is ``IA-<year>-<nnn>`` where ``nnn`` is one more than
        the number of existing audits whose ID starts with ``IA-<year>-``.

        Args:
            db: Database session; the new row is added, committed and refreshed.
            data: Field values for the new row. Must contain ``planned_date``
                (a ``date`` or an ISO-format string).
            created_by: Passed to ``log_audit_trail`` as the acting user.

        Returns:
            The refreshed ``InternalAuditDB`` instance.

        Raises:
            KeyError: If ``data`` has no ``planned_date``.
            ValueError: If ``planned_date`` is a string that is not ISO formatted.
        """
        planned_date = data["planned_date"]
        if isinstance(planned_date, str):
            # Parsed only to derive the ID; ``data`` is passed on as given.
            planned_date = date.fromisoformat(planned_date)
        year = planned_date.year

        existing_count = (
            db.query(InternalAuditDB)
            .filter(InternalAuditDB.audit_id.like(f"IA-{year}-%"))
            .count()
        )
        # NOTE(review): a count-based sequence can repeat an ID if rows for the
        # year were deleted or two creates run concurrently -- confirm whether
        # a uniqueness constraint guards against this.
        audit_id = f"IA-{year}-{existing_count + 1:03d}"

        audit = InternalAuditDB(id=generate_id(), audit_id=audit_id, status="planned", **data)
        db.add(audit)
        log_audit_trail(db, "internal_audit", audit.id, audit_id, "create", created_by)
        db.commit()
        db.refresh(audit)
        return audit

    @staticmethod
    def update(db: Session, audit_id: str, data: dict, updated_by: str) -> InternalAuditDB:
        """Apply field updates to an existing internal audit.

        Args:
            db: Database session; changes are committed and the row refreshed.
            audit_id: ``id`` or ``audit_id`` of the audit to update.
            data: Mapping of attribute name to new value, assigned as given.
            updated_by: Passed to ``log_audit_trail`` as the acting user.

        Returns:
            The refreshed ``InternalAuditDB`` instance.

        Raises:
            NotFoundError: If no audit matches ``audit_id``.
        """
        audit = InternalAuditService._get(db, audit_id)

        for field, value in data.items():
            setattr(audit, field, value)

        log_audit_trail(db, "internal_audit", audit.id, audit.audit_id, "update", updated_by)
        db.commit()
        db.refresh(audit)
        return audit

    @staticmethod
    def complete(
        db: Session,
        audit_id: str,
        audit_conclusion: str,
        overall_assessment: str,
        follow_up_audit_required: bool,
        completed_by: str,
    ) -> InternalAuditDB:
        """Mark an internal audit as completed.

        Sets ``status`` to ``completed``, stamps ``actual_end_date`` and
        ``report_date`` with today's date, and stores the conclusion fields.

        Args:
            db: Database session; changes are committed and the row refreshed.
            audit_id: ``id`` or ``audit_id`` of the audit to complete.
            audit_conclusion: Stored as ``audit_conclusion``.
            overall_assessment: Stored as ``overall_assessment``.
            follow_up_audit_required: Stored as ``follow_up_audit_required``.
            completed_by: Passed to ``log_audit_trail`` as the acting user.

        Returns:
            The refreshed ``InternalAuditDB`` instance.

        Raises:
            NotFoundError: If no audit matches ``audit_id``.
        """
        audit = InternalAuditService._get(db, audit_id)

        # Read the clock once so both dates agree even when called around midnight.
        today = date.today()
        audit.status = "completed"
        audit.actual_end_date = today
        audit.report_date = today
        audit.audit_conclusion = audit_conclusion
        audit.overall_assessment = overall_assessment
        audit.follow_up_audit_required = follow_up_audit_required

        log_audit_trail(db, "internal_audit", audit.id, audit.audit_id, "complete", completed_by)
        db.commit()
        db.refresh(audit)
        return audit


# ============================================================================
# Audit Trail
# ============================================================================


class AuditTrailService:
    """Business logic for Audit Trail queries."""

    @staticmethod
    def query(
        db: Session,
        entity_type: Optional[str] = None,
        entity_id: Optional[str] = None,
        performed_by: Optional[str] = None,
        action: Optional[str] = None,
        page: int = 1,
        page_size: int = 50,
    ) -> dict:
        """Return one page of audit trail entries, newest ``performed_at`` first.

        Args:
            db: Database session used for the query.
            entity_type: When truthy, exact-match filter on ``entity_type``.
            entity_id: When truthy, exact-match filter on ``entity_id``.
            performed_by: When truthy, exact-match filter on ``performed_by``.
            action: When truthy, exact-match filter on ``action``.
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Entries per page; values below 1 are treated as 1.

        Returns:
            A dict with ``entries`` (rows of the requested page), ``total``
            (number of rows matching the filters) and ``pagination`` (a
            ``PaginationMeta`` built from the effective page values).
        """
        # Clamp so a bad request cannot yield a negative OFFSET or divide by zero.
        page = max(page, 1)
        page_size = max(page_size, 1)

        q = db.query(AuditTrailDB)
        if entity_type:
            q = q.filter(AuditTrailDB.entity_type == entity_type)
        if entity_id:
            q = q.filter(AuditTrailDB.entity_id == entity_id)
        if performed_by:
            q = q.filter(AuditTrailDB.performed_by == performed_by)
        if action:
            q = q.filter(AuditTrailDB.action == action)

        total = q.count()
        entries = (
            q.order_by(AuditTrailDB.performed_at.desc())
            .offset((page - 1) * page_size)
            .limit(page_size)
            .all()
        )
        # Ceiling division: number of pages needed to hold ``total`` rows.
        total_pages = (total + page_size - 1) // page_size

        return {
            "entries": entries,
            "total": total,
            "pagination": PaginationMeta(
                page=page,
                page_size=page_size,
                total=total,
                total_pages=total_pages,
                has_next=page < total_pages,
                has_prev=page > 1,
            ),
        }