diff --git a/backend-compliance/compliance/db/audit_export_repository.py b/backend-compliance/compliance/db/audit_export_repository.py new file mode 100644 index 0000000..e8d8540 --- /dev/null +++ b/backend-compliance/compliance/db/audit_export_repository.py @@ -0,0 +1,110 @@ +""" +Compliance repositories — extracted from compliance/db/repository.py. + +Phase 1 Step 5: the monolithic repository module is decomposed per +aggregate. Every repository class is re-exported from +``compliance.db.repository`` for backwards compatibility. +""" + +import uuid +from datetime import datetime, date, timezone +from typing import List, Optional, Dict, Any, Tuple + +from sqlalchemy.orm import Session as DBSession, selectinload, joinedload +from sqlalchemy import func, and_, or_ + +from compliance.db.models import ( + RegulationDB, RequirementDB, ControlDB, ControlMappingDB, + EvidenceDB, RiskDB, AuditExportDB, + AuditSessionDB, AuditSignOffDB, AuditResultEnum, AuditSessionStatusEnum, + RegulationTypeEnum, ControlDomainEnum, ControlStatusEnum, + RiskLevelEnum, EvidenceStatusEnum, ExportStatusEnum, + ServiceModuleDB, ModuleRegulationMappingDB, +) + +class AuditExportRepository: + """Repository for audit exports.""" + + def __init__(self, db: DBSession): + self.db = db + + def create( + self, + export_type: str, + requested_by: str, + export_name: Optional[str] = None, + included_regulations: Optional[List[str]] = None, + included_domains: Optional[List[str]] = None, + date_range_start: Optional[date] = None, + date_range_end: Optional[date] = None, + ) -> AuditExportDB: + """Create an export request.""" + export = AuditExportDB( + id=str(uuid.uuid4()), + export_type=export_type, + export_name=export_name or f"audit_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}", + requested_by=requested_by, + included_regulations=included_regulations, + included_domains=included_domains, + date_range_start=date_range_start, + date_range_end=date_range_end, + ) + self.db.add(export) + self.db.commit() + self.db.refresh(export) + return export + + def get_by_id(self, export_id: str) -> Optional[AuditExportDB]: + """Get export by ID.""" + return self.db.query(AuditExportDB).filter(AuditExportDB.id == export_id).first() + + def get_all(self, limit: int = 50) -> List[AuditExportDB]: + """Get all exports.""" + return ( + self.db.query(AuditExportDB) + .order_by(AuditExportDB.requested_at.desc()) + .limit(limit) + .all() + ) + + def update_status( + self, + export_id: str, + status: ExportStatusEnum, + file_path: Optional[str] = None, + file_hash: Optional[str] = None, + file_size_bytes: Optional[int] = None, + error_message: Optional[str] = None, + total_controls: Optional[int] = None, + total_evidence: Optional[int] = None, + compliance_score: Optional[float] = None, + ) -> Optional[AuditExportDB]: + """Update export status.""" + export = self.get_by_id(export_id) + if not export: + return None + + export.status = status + if file_path: + export.file_path = file_path + if file_hash: + export.file_hash = file_hash + if file_size_bytes: + export.file_size_bytes = file_size_bytes + if error_message: + export.error_message = error_message + if total_controls is not None: + export.total_controls = total_controls + if total_evidence is not None: + export.total_evidence = total_evidence + if compliance_score is not None: + export.compliance_score = compliance_score + + if status == ExportStatusEnum.COMPLETED: + export.completed_at = datetime.now(timezone.utc) + + export.updated_at = datetime.now(timezone.utc) + self.db.commit() + self.db.refresh(export) + return export + diff --git a/backend-compliance/compliance/db/audit_session_repository.py b/backend-compliance/compliance/db/audit_session_repository.py new file mode 100644 index 0000000..b9dabf5 --- /dev/null +++ b/backend-compliance/compliance/db/audit_session_repository.py @@ -0,0 +1,478 @@ +""" +Compliance repositories — extracted from compliance/db/repository.py. + +Phase 1 Step 5: the monolithic repository module is decomposed per +aggregate. Every repository class is re-exported from +``compliance.db.repository`` for backwards compatibility. +""" + +import uuid +from datetime import datetime, date, timezone +from typing import List, Optional, Dict, Any, Tuple + +from sqlalchemy.orm import Session as DBSession, selectinload, joinedload +from sqlalchemy import func, and_, or_ + +from compliance.db.models import ( + RegulationDB, RequirementDB, ControlDB, ControlMappingDB, + EvidenceDB, RiskDB, AuditExportDB, + AuditSessionDB, AuditSignOffDB, AuditResultEnum, AuditSessionStatusEnum, + RegulationTypeEnum, ControlDomainEnum, ControlStatusEnum, + RiskLevelEnum, EvidenceStatusEnum, ExportStatusEnum, + ServiceModuleDB, ModuleRegulationMappingDB, +) + +class AuditSessionRepository: + """Repository for audit sessions (Sprint 3: Auditor-Verbesserungen).""" + + def __init__(self, db: DBSession): + self.db = db + + def create( + self, + name: str, + auditor_name: str, + description: Optional[str] = None, + auditor_email: Optional[str] = None, + regulation_ids: Optional[List[str]] = None, + ) -> AuditSessionDB: + """Create a new audit session.""" + session = AuditSessionDB( + id=str(uuid.uuid4()), + name=name, + description=description, + auditor_name=auditor_name, + auditor_email=auditor_email, + regulation_ids=regulation_ids, + status=AuditSessionStatusEnum.DRAFT, + ) + self.db.add(session) + self.db.commit() + self.db.refresh(session) + return session + + def get_by_id(self, session_id: str) -> Optional[AuditSessionDB]: + """Get audit session by ID with eager-loaded signoffs.""" + return ( + self.db.query(AuditSessionDB) + .options( + selectinload(AuditSessionDB.signoffs) + .selectinload(AuditSignOffDB.requirement) + ) + .filter(AuditSessionDB.id == session_id) + .first() + ) + + def get_all( + self, + status: Optional[AuditSessionStatusEnum] = None, + limit: int = 50, + ) -> List[AuditSessionDB]: + """Get all audit sessions with optional status filter.""" + query = self.db.query(AuditSessionDB) + if status: + query = query.filter(AuditSessionDB.status == status) + return query.order_by(AuditSessionDB.created_at.desc()).limit(limit).all() + + def update_status( + self, + session_id: str, + status: AuditSessionStatusEnum, + ) -> Optional[AuditSessionDB]: + """Update session status and set appropriate timestamps.""" + session = self.get_by_id(session_id) + if not session: + return None + + session.status = status + if status == AuditSessionStatusEnum.IN_PROGRESS and not session.started_at: + session.started_at = datetime.now(timezone.utc) + elif status == AuditSessionStatusEnum.COMPLETED: + session.completed_at = datetime.now(timezone.utc) + + session.updated_at = datetime.now(timezone.utc) + self.db.commit() + self.db.refresh(session) + return session + + def update_progress( + self, + session_id: str, + total_items: Optional[int] = None, + completed_items: Optional[int] = None, + ) -> Optional[AuditSessionDB]: + """Update session progress counters.""" + session = self.db.query(AuditSessionDB).filter( + AuditSessionDB.id == session_id + ).first() + if not session: + return None + + if total_items is not None: + session.total_items = total_items + if completed_items is not None: + session.completed_items = completed_items + + session.updated_at = datetime.now(timezone.utc) + self.db.commit() + self.db.refresh(session) + return session + + def start_session(self, session_id: str) -> Optional[AuditSessionDB]: + """ + Start an audit session: + - Set status to IN_PROGRESS + - Initialize total_items based on requirements count + """ + session = self.get_by_id(session_id) + if not session: + return None + + # Count requirements for this session + query = self.db.query(func.count(RequirementDB.id)) + if session.regulation_ids: + query = query.join(RegulationDB).filter( + RegulationDB.id.in_(session.regulation_ids) + ) + total_requirements = query.scalar() or 0 + + session.status = AuditSessionStatusEnum.IN_PROGRESS + session.started_at = datetime.now(timezone.utc) + session.total_items = total_requirements + session.updated_at = datetime.now(timezone.utc) + + self.db.commit() + self.db.refresh(session) + return session + + def delete(self, session_id: str) -> bool: + """Delete an audit session (cascades to signoffs).""" + session = self.db.query(AuditSessionDB).filter( + AuditSessionDB.id == session_id + ).first() + if not session: + return False + + self.db.delete(session) + self.db.commit() + return True + + def get_statistics(self, session_id: str) -> Dict[str, Any]: + """Get detailed statistics for an audit session.""" + session = self.get_by_id(session_id) + if not session: + return {} + + signoffs = session.signoffs or [] + + stats = { + "total": session.total_items or 0, + "completed": len([s for s in signoffs if s.result != AuditResultEnum.PENDING]), + "compliant": len([s for s in signoffs if s.result == AuditResultEnum.COMPLIANT]), + "compliant_with_notes": len([s for s in signoffs if s.result == AuditResultEnum.COMPLIANT_WITH_NOTES]), + "non_compliant": len([s for s in signoffs if s.result == AuditResultEnum.NON_COMPLIANT]), + "not_applicable": len([s for s in signoffs if s.result == AuditResultEnum.NOT_APPLICABLE]), + "pending": len([s for s in signoffs if s.result == AuditResultEnum.PENDING]), + "signed": len([s for s in signoffs if s.signature_hash]), + } + + total = stats["total"] if stats["total"] > 0 else 1 + stats["completion_percentage"] = round( + (stats["completed"] / total) * 100, 1 + ) + + return stats + + +class AuditSignOffRepository: + """Repository for audit sign-offs (Sprint 3: Auditor-Verbesserungen).""" + + def __init__(self, db: DBSession): + self.db = db + + def create( + self, + session_id: str, + requirement_id: str, + result: AuditResultEnum = AuditResultEnum.PENDING, + notes: Optional[str] = None, + ) -> AuditSignOffDB: + """Create a new sign-off for a requirement.""" + signoff = AuditSignOffDB( + id=str(uuid.uuid4()), + session_id=session_id, + requirement_id=requirement_id, + result=result, + notes=notes, + ) + self.db.add(signoff) + self.db.commit() + self.db.refresh(signoff) + return signoff + + def get_by_id(self, signoff_id: str) -> Optional[AuditSignOffDB]: + """Get sign-off by ID.""" + return ( + self.db.query(AuditSignOffDB) + .options(joinedload(AuditSignOffDB.requirement)) + .filter(AuditSignOffDB.id == signoff_id) + .first() + ) + + def get_by_session_and_requirement( + self, + session_id: str, + requirement_id: str, + ) -> Optional[AuditSignOffDB]: + """Get sign-off by session and requirement ID.""" + return ( + self.db.query(AuditSignOffDB) + .filter( + and_( + AuditSignOffDB.session_id == session_id, + AuditSignOffDB.requirement_id == requirement_id, + ) + ) + .first() + ) + + def get_by_session( + self, + session_id: str, + result_filter: Optional[AuditResultEnum] = None, + ) -> List[AuditSignOffDB]: + """Get all sign-offs for a session.""" + query = ( + self.db.query(AuditSignOffDB) + .options(joinedload(AuditSignOffDB.requirement)) + .filter(AuditSignOffDB.session_id == session_id) + ) + if result_filter: + query = query.filter(AuditSignOffDB.result == result_filter) + return query.order_by(AuditSignOffDB.created_at).all() + + def update( + self, + signoff_id: str, + result: Optional[AuditResultEnum] = None, + notes: Optional[str] = None, + sign: bool = False, + signed_by: Optional[str] = None, + ) -> Optional[AuditSignOffDB]: + """Update a sign-off with optional digital signature.""" + signoff = self.db.query(AuditSignOffDB).filter( + AuditSignOffDB.id == signoff_id + ).first() + if not signoff: + return None + + if result is not None: + signoff.result = result + if notes is not None: + signoff.notes = notes + + if sign and signed_by: + signoff.create_signature(signed_by) + + signoff.updated_at = datetime.now(timezone.utc) + self.db.commit() + self.db.refresh(signoff) + + # Update session progress + self._update_session_progress(signoff.session_id) + + return signoff + + def sign_off( + self, + session_id: str, + requirement_id: str, + result: AuditResultEnum, + notes: Optional[str] = None, + sign: bool = False, + signed_by: Optional[str] = None, + ) -> AuditSignOffDB: + """ + Create or update a sign-off for a requirement. + This is the main method for auditors to record their findings. + """ + # Check if sign-off already exists + signoff = self.get_by_session_and_requirement(session_id, requirement_id) + + if signoff: + # Update existing + signoff.result = result + if notes is not None: + signoff.notes = notes + if sign and signed_by: + signoff.create_signature(signed_by) + signoff.updated_at = datetime.now(timezone.utc) + else: + # Create new + signoff = AuditSignOffDB( + id=str(uuid.uuid4()), + session_id=session_id, + requirement_id=requirement_id, + result=result, + notes=notes, + ) + if sign and signed_by: + signoff.create_signature(signed_by) + self.db.add(signoff) + + self.db.commit() + self.db.refresh(signoff) + + # Update session progress + self._update_session_progress(session_id) + + return signoff + + def _update_session_progress(self, session_id: str) -> None: + """Update the session's completed_items count.""" + completed = ( + self.db.query(func.count(AuditSignOffDB.id)) + .filter( + and_( + AuditSignOffDB.session_id == session_id, + AuditSignOffDB.result != AuditResultEnum.PENDING, + ) + ) + .scalar() + ) or 0 + + session = self.db.query(AuditSessionDB).filter( + AuditSessionDB.id == session_id + ).first() + if session: + session.completed_items = completed + session.updated_at = datetime.now(timezone.utc) + self.db.commit() + + def get_checklist( + self, + session_id: str, + page: int = 1, + page_size: int = 50, + result_filter: Optional[AuditResultEnum] = None, + regulation_code: Optional[str] = None, + search: Optional[str] = None, + ) -> Tuple[List[Dict[str, Any]], int]: + """ + Get audit checklist items for a session with pagination. + Returns requirements with their sign-off status. + """ + session = self.db.query(AuditSessionDB).filter( + AuditSessionDB.id == session_id + ).first() + if not session: + return [], 0 + + # Base query for requirements + query = ( + self.db.query(RequirementDB) + .options( + joinedload(RequirementDB.regulation), + selectinload(RequirementDB.control_mappings), + ) + ) + + # Filter by session's regulation_ids if set + if session.regulation_ids: + query = query.filter(RequirementDB.regulation_id.in_(session.regulation_ids)) + + # Filter by regulation code + if regulation_code: + query = query.join(RegulationDB).filter(RegulationDB.code == regulation_code) + + # Search + if search: + search_term = f"%{search}%" + query = query.filter( + or_( + RequirementDB.title.ilike(search_term), + RequirementDB.article.ilike(search_term), + ) + ) + + # Get existing sign-offs for this session + signoffs_map = {} + signoffs = ( + self.db.query(AuditSignOffDB) + .filter(AuditSignOffDB.session_id == session_id) + .all() + ) + for s in signoffs: + signoffs_map[s.requirement_id] = s + + # Filter by result if specified + if result_filter: + if result_filter == AuditResultEnum.PENDING: + # Requirements without sign-off or with pending status + signed_req_ids = [ + s.requirement_id for s in signoffs + if s.result != AuditResultEnum.PENDING + ] + if signed_req_ids: + query = query.filter(~RequirementDB.id.in_(signed_req_ids)) + else: + # Requirements with specific result + matching_req_ids = [ + s.requirement_id for s in signoffs + if s.result == result_filter + ] + if matching_req_ids: + query = query.filter(RequirementDB.id.in_(matching_req_ids)) + else: + return [], 0 + + # Count and paginate + total = query.count() + requirements = ( + query + .order_by(RequirementDB.article, RequirementDB.paragraph) + .offset((page - 1) * page_size) + .limit(page_size) + .all() + ) + + # Build checklist items + items = [] + for req in requirements: + signoff = signoffs_map.get(req.id) + items.append({ + "requirement_id": req.id, + "regulation_code": req.regulation.code if req.regulation else None, + "regulation_name": req.regulation.name if req.regulation else None, + "article": req.article, + "paragraph": req.paragraph, + "title": req.title, + "description": req.description, + "current_result": signoff.result.value if signoff else AuditResultEnum.PENDING.value, + "notes": signoff.notes if signoff else None, + "is_signed": bool(signoff.signature_hash) if signoff else False, + "signed_at": signoff.signed_at if signoff else None, + "signed_by": signoff.signed_by if signoff else None, + "evidence_count": len(req.control_mappings) if req.control_mappings else 0, + "controls_mapped": len(req.control_mappings) if req.control_mappings else 0, + }) + + return items, total + + def delete(self, signoff_id: str) -> bool: + """Delete a sign-off.""" + signoff = self.db.query(AuditSignOffDB).filter( + AuditSignOffDB.id == signoff_id + ).first() + if not signoff: + return False + + session_id = signoff.session_id + self.db.delete(signoff) + self.db.commit() + + # Update session progress + self._update_session_progress(session_id) + + return True + diff --git a/backend-compliance/compliance/db/control_repository.py b/backend-compliance/compliance/db/control_repository.py new file mode 100644 index 0000000..2bef143 --- /dev/null +++ b/backend-compliance/compliance/db/control_repository.py @@ -0,0 +1,291 @@ +""" +Compliance repositories — extracted from compliance/db/repository.py. + +Phase 1 Step 5: the monolithic repository module is decomposed per +aggregate. Every repository class is re-exported from +``compliance.db.repository`` for backwards compatibility. +""" + +import uuid +from datetime import datetime, date, timezone +from typing import List, Optional, Dict, Any, Tuple + +from sqlalchemy.orm import Session as DBSession, selectinload, joinedload +from sqlalchemy import func, and_, or_ + +from compliance.db.models import ( + RegulationDB, RequirementDB, ControlDB, ControlMappingDB, + EvidenceDB, RiskDB, AuditExportDB, + AuditSessionDB, AuditSignOffDB, AuditResultEnum, AuditSessionStatusEnum, + RegulationTypeEnum, ControlDomainEnum, ControlStatusEnum, + RiskLevelEnum, EvidenceStatusEnum, ExportStatusEnum, + ServiceModuleDB, ModuleRegulationMappingDB, +) + +class ControlRepository: + """Repository for controls.""" + + def __init__(self, db: DBSession): + self.db = db + + def create( + self, + control_id: str, + domain: ControlDomainEnum, + control_type: str, + title: str, + pass_criteria: str, + description: Optional[str] = None, + implementation_guidance: Optional[str] = None, + code_reference: Optional[str] = None, + is_automated: bool = False, + automation_tool: Optional[str] = None, + owner: Optional[str] = None, + review_frequency_days: int = 90, + ) -> ControlDB: + """Create a new control.""" + control = ControlDB( + id=str(uuid.uuid4()), + control_id=control_id, + domain=domain, + control_type=control_type, + title=title, + description=description, + pass_criteria=pass_criteria, + implementation_guidance=implementation_guidance, + code_reference=code_reference, + is_automated=is_automated, + automation_tool=automation_tool, + owner=owner, + review_frequency_days=review_frequency_days, + ) + self.db.add(control) + self.db.commit() + self.db.refresh(control) + return control + + def get_by_id(self, control_uuid: str) -> Optional[ControlDB]: + """Get control by UUID with eager-loaded relationships.""" + return ( + self.db.query(ControlDB) + .options( + selectinload(ControlDB.mappings).selectinload(ControlMappingDB.requirement), + selectinload(ControlDB.evidence) + ) + .filter(ControlDB.id == control_uuid) + .first() + ) + + def get_by_control_id(self, control_id: str) -> Optional[ControlDB]: + """Get control by control_id (e.g., 'PRIV-001') with eager-loaded relationships.""" + return ( + self.db.query(ControlDB) + .options( + selectinload(ControlDB.mappings).selectinload(ControlMappingDB.requirement), + selectinload(ControlDB.evidence) + ) + .filter(ControlDB.control_id == control_id) + .first() + ) + + def get_all( + self, + domain: Optional[ControlDomainEnum] = None, + status: Optional[ControlStatusEnum] = None, + is_automated: Optional[bool] = None, + ) -> List[ControlDB]: + """Get all controls with optional filters and eager-loading.""" + query = ( + self.db.query(ControlDB) + .options( + selectinload(ControlDB.mappings), + selectinload(ControlDB.evidence) + ) + ) + if domain: + query = query.filter(ControlDB.domain == domain) + if status: + query = query.filter(ControlDB.status == status) + if is_automated is not None: + query = query.filter(ControlDB.is_automated == is_automated) + return query.order_by(ControlDB.control_id).all() + + def get_paginated( + self, + page: int = 1, + page_size: int = 50, + domain: Optional[ControlDomainEnum] = None, + status: Optional[ControlStatusEnum] = None, + is_automated: Optional[bool] = None, + search: Optional[str] = None, + ) -> Tuple[List[ControlDB], int]: + """ + Get paginated controls with eager-loaded relationships. + Returns tuple of (items, total_count). + """ + query = ( + self.db.query(ControlDB) + .options( + selectinload(ControlDB.mappings), + selectinload(ControlDB.evidence) + ) + ) + + if domain: + query = query.filter(ControlDB.domain == domain) + if status: + query = query.filter(ControlDB.status == status) + if is_automated is not None: + query = query.filter(ControlDB.is_automated == is_automated) + if search: + search_term = f"%{search}%" + query = query.filter( + or_( + ControlDB.title.ilike(search_term), + ControlDB.description.ilike(search_term), + ControlDB.control_id.ilike(search_term), + ) + ) + + total = query.count() + items = ( + query + .order_by(ControlDB.control_id) + .offset((page - 1) * page_size) + .limit(page_size) + .all() + ) + + return items, total + + def get_by_domain(self, domain: ControlDomainEnum) -> List[ControlDB]: + """Get all controls in a domain.""" + return self.get_all(domain=domain) + + def get_by_status(self, status: ControlStatusEnum) -> List[ControlDB]: + """Get all controls with a specific status.""" + return self.get_all(status=status) + + def update_status( + self, + control_id: str, + status: ControlStatusEnum, + status_notes: Optional[str] = None + ) -> Optional[ControlDB]: + """Update control status.""" + control = self.get_by_control_id(control_id) + if not control: + return None + control.status = status + if status_notes: + control.status_notes = status_notes + control.updated_at = datetime.now(timezone.utc) + self.db.commit() + self.db.refresh(control) + return control + + def mark_reviewed(self, control_id: str) -> Optional[ControlDB]: + """Mark control as reviewed.""" + control = self.get_by_control_id(control_id) + if not control: + return None + control.last_reviewed_at = datetime.now(timezone.utc) + from datetime import timedelta + control.next_review_at = datetime.now(timezone.utc) + timedelta(days=control.review_frequency_days) + control.updated_at = datetime.now(timezone.utc) + self.db.commit() + self.db.refresh(control) + return control + + def get_due_for_review(self) -> List[ControlDB]: + """Get controls due for review.""" + return ( + self.db.query(ControlDB) + .filter( + or_( + ControlDB.next_review_at is None, + ControlDB.next_review_at <= datetime.now(timezone.utc) + ) + ) + .order_by(ControlDB.next_review_at) + .all() + ) + + def get_statistics(self) -> Dict[str, Any]: + """Get control statistics by status and domain.""" + total = self.db.query(func.count(ControlDB.id)).scalar() + + by_status = dict( + self.db.query(ControlDB.status, func.count(ControlDB.id)) + .group_by(ControlDB.status) + .all() + ) + + by_domain = dict( + self.db.query(ControlDB.domain, func.count(ControlDB.id)) + .group_by(ControlDB.domain) + .all() + ) + + passed = by_status.get(ControlStatusEnum.PASS, 0) + partial = by_status.get(ControlStatusEnum.PARTIAL, 0) + + score = 0.0 + if total > 0: + score = ((passed + (partial * 0.5)) / total) * 100 + + return { + "total": total, + "by_status": {str(k.value) if k else "none": v for k, v in by_status.items()}, + "by_domain": {str(k.value) if k else "none": v for k, v in by_domain.items()}, + "compliance_score": round(score, 1), + } + + +class ControlMappingRepository: + """Repository for requirement-control mappings.""" + + def __init__(self, db: DBSession): + self.db = db + + def create( + self, + requirement_id: str, + control_id: str, + coverage_level: str = "full", + notes: Optional[str] = None, + ) -> ControlMappingDB: + """Create a mapping.""" + # Get the control UUID from control_id + control = self.db.query(ControlDB).filter(ControlDB.control_id == control_id).first() + if not control: + raise ValueError(f"Control {control_id} not found") + + mapping = ControlMappingDB( + id=str(uuid.uuid4()), + requirement_id=requirement_id, + control_id=control.id, + coverage_level=coverage_level, + notes=notes, + ) + self.db.add(mapping) + self.db.commit() + self.db.refresh(mapping) + return mapping + + def get_by_requirement(self, requirement_id: str) -> List[ControlMappingDB]: + """Get all mappings for a requirement.""" + return ( + self.db.query(ControlMappingDB) + .filter(ControlMappingDB.requirement_id == requirement_id) + .all() + ) + + def get_by_control(self, control_uuid: str) -> List[ControlMappingDB]: + """Get all mappings for a control.""" + return ( + self.db.query(ControlMappingDB) + .filter(ControlMappingDB.control_id == control_uuid) + .all() + ) + diff --git a/backend-compliance/compliance/db/evidence_repository.py b/backend-compliance/compliance/db/evidence_repository.py new file mode 100644 index 0000000..d645bd0 --- /dev/null +++ b/backend-compliance/compliance/db/evidence_repository.py @@ -0,0 +1,143 @@ +""" +Compliance repositories — extracted from compliance/db/repository.py. + +Phase 1 Step 5: the monolithic repository module is decomposed per +aggregate. Every repository class is re-exported from +``compliance.db.repository`` for backwards compatibility. +""" + +import uuid +from datetime import datetime, date, timezone +from typing import List, Optional, Dict, Any, Tuple + +from sqlalchemy.orm import Session as DBSession, selectinload, joinedload +from sqlalchemy import func, and_, or_ + +from compliance.db.models import ( + RegulationDB, RequirementDB, ControlDB, ControlMappingDB, + EvidenceDB, RiskDB, AuditExportDB, + AuditSessionDB, AuditSignOffDB, AuditResultEnum, AuditSessionStatusEnum, + RegulationTypeEnum, ControlDomainEnum, ControlStatusEnum, + RiskLevelEnum, EvidenceStatusEnum, ExportStatusEnum, + ServiceModuleDB, ModuleRegulationMappingDB, +) + +class EvidenceRepository: + """Repository for evidence.""" + + def __init__(self, db: DBSession): + self.db = db + + def create( + self, + control_id: str, + evidence_type: str, + title: str, + description: Optional[str] = None, + artifact_path: Optional[str] = None, + artifact_url: Optional[str] = None, + artifact_hash: Optional[str] = None, + file_size_bytes: Optional[int] = None, + mime_type: Optional[str] = None, + valid_until: Optional[datetime] = None, + source: str = "manual", + ci_job_id: Optional[str] = None, + uploaded_by: Optional[str] = None, + ) -> EvidenceDB: + """Create evidence record.""" + # Get control UUID + control = self.db.query(ControlDB).filter(ControlDB.control_id == control_id).first() + if not control: + raise ValueError(f"Control {control_id} not found") + + evidence = EvidenceDB( + id=str(uuid.uuid4()), + control_id=control.id, + evidence_type=evidence_type, + title=title, + description=description, + artifact_path=artifact_path, + artifact_url=artifact_url, + artifact_hash=artifact_hash, + file_size_bytes=file_size_bytes, + mime_type=mime_type, + valid_until=valid_until, + source=source, + ci_job_id=ci_job_id, + uploaded_by=uploaded_by, + ) + self.db.add(evidence) + self.db.commit() + self.db.refresh(evidence) + return evidence + + def get_by_id(self, evidence_id: str) -> Optional[EvidenceDB]: + """Get evidence by ID.""" + return self.db.query(EvidenceDB).filter(EvidenceDB.id == evidence_id).first() + + def get_by_control( + self, + control_id: str, + status: Optional[EvidenceStatusEnum] = None + ) -> List[EvidenceDB]: + """Get all evidence for a control.""" + control = self.db.query(ControlDB).filter(ControlDB.control_id == control_id).first() + if not control: + return [] + + query = self.db.query(EvidenceDB).filter(EvidenceDB.control_id == control.id) + if status: + query = query.filter(EvidenceDB.status == status) + return query.order_by(EvidenceDB.collected_at.desc()).all() + + def get_all( + self, + evidence_type: Optional[str] = None, + status: Optional[EvidenceStatusEnum] = None, + limit: int = 100, + ) -> List[EvidenceDB]: + """Get all evidence with filters.""" + query = self.db.query(EvidenceDB) + if evidence_type: + query = query.filter(EvidenceDB.evidence_type == evidence_type) + if status: + query = query.filter(EvidenceDB.status == status) + return query.order_by(EvidenceDB.collected_at.desc()).limit(limit).all() + + def update_status(self, evidence_id: str, status: EvidenceStatusEnum) -> Optional[EvidenceDB]: + """Update evidence status.""" + evidence = self.get_by_id(evidence_id) + if not evidence: + return None + evidence.status = status + evidence.updated_at = datetime.now(timezone.utc) + self.db.commit() + self.db.refresh(evidence) + return evidence + + def get_statistics(self) -> Dict[str, Any]: + """Get evidence statistics.""" + total = self.db.query(func.count(EvidenceDB.id)).scalar() + + by_type = dict( + self.db.query(EvidenceDB.evidence_type, func.count(EvidenceDB.id)) + .group_by(EvidenceDB.evidence_type) + .all() + ) + + by_status = dict( + self.db.query(EvidenceDB.status, func.count(EvidenceDB.id)) + .group_by(EvidenceDB.status) + .all() + ) + + valid = by_status.get(EvidenceStatusEnum.VALID, 0) + coverage = (valid / total * 100) if total > 0 else 0 + + return { + "total": total, + "by_type": by_type, + "by_status": {str(k.value) if k else "none": v for k, v in by_status.items()}, + "coverage_percent": round(coverage, 1), + } + diff --git a/backend-compliance/compliance/db/isms_audit_repository.py b/backend-compliance/compliance/db/isms_audit_repository.py new file mode 100644 index 0000000..48df478 --- /dev/null +++ b/backend-compliance/compliance/db/isms_audit_repository.py @@ -0,0 +1,499 @@ +""" +ISMS repositories — extracted from compliance/db/isms_repository.py. + +Phase 1 Step 5: split per sub-aggregate. Re-exported from +``compliance.db.isms_repository`` for backwards compatibility. +""" + +import uuid +from datetime import datetime, date, timezone +from typing import List, Optional, Dict, Any, Tuple + +from sqlalchemy.orm import Session as DBSession + +from compliance.db.models import ( + ISMSScopeDB, ISMSPolicyDB, SecurityObjectiveDB, + StatementOfApplicabilityDB, AuditFindingDB, CorrectiveActionDB, + ManagementReviewDB, InternalAuditDB, AuditTrailDB, ISMSReadinessCheckDB, + ApprovalStatusEnum, FindingTypeEnum, FindingStatusEnum, CAPATypeEnum, +) + +class AuditFindingRepository: + """Repository for Audit Findings (Major/Minor/OFI).""" + + def __init__(self, db: DBSession): + self.db = db + + def create( + self, + finding_type: FindingTypeEnum, + title: str, + description: str, + auditor: str, + iso_chapter: Optional[str] = None, + annex_a_control: Optional[str] = None, + objective_evidence: Optional[str] = None, + owner: Optional[str] = None, + due_date: Optional[date] = None, + internal_audit_id: Optional[str] = None, + ) -> AuditFindingDB: + """Create a new audit finding.""" + # Generate finding ID + year = date.today().year + existing_count = self.db.query(AuditFindingDB).filter( + AuditFindingDB.finding_id.like(f"FIND-{year}-%") + ).count() + finding_id = f"FIND-{year}-{existing_count + 1:03d}" + + finding = AuditFindingDB( + id=str(uuid.uuid4()), + finding_id=finding_id, + finding_type=finding_type, + iso_chapter=iso_chapter, + annex_a_control=annex_a_control, + title=title, + description=description, + objective_evidence=objective_evidence, + owner=owner, + auditor=auditor, + due_date=due_date, + internal_audit_id=internal_audit_id, + status=FindingStatusEnum.OPEN, + ) + self.db.add(finding) + self.db.commit() + self.db.refresh(finding) + return finding + + def get_by_id(self, finding_id: str) -> Optional[AuditFindingDB]: + """Get finding by UUID or finding_id.""" + return self.db.query(AuditFindingDB).filter( + (AuditFindingDB.id == finding_id) | (AuditFindingDB.finding_id == finding_id) + ).first() + + def get_all( + self, + finding_type: Optional[FindingTypeEnum] = None, + status: Optional[FindingStatusEnum] = None, + internal_audit_id: Optional[str] = None, + ) -> List[AuditFindingDB]: + """Get all findings with optional filters.""" + query = self.db.query(AuditFindingDB) + if finding_type: + query = query.filter(AuditFindingDB.finding_type == finding_type) + if status: + query = query.filter(AuditFindingDB.status == status) + if internal_audit_id: + query = query.filter(AuditFindingDB.internal_audit_id == internal_audit_id) + return query.order_by(AuditFindingDB.identified_date.desc()).all() + + def get_open_majors(self) -> List[AuditFindingDB]: + """Get all open major findings (blocking certification).""" + return self.db.query(AuditFindingDB).filter( + AuditFindingDB.finding_type == FindingTypeEnum.MAJOR, + AuditFindingDB.status != FindingStatusEnum.CLOSED + ).all() + + def get_statistics(self) -> Dict[str, Any]: + """Get finding statistics.""" + findings = self.get_all() + + return { + "total": len(findings), + "major": sum(1 for f in findings if f.finding_type == FindingTypeEnum.MAJOR), + "minor": sum(1 for f in findings if f.finding_type == FindingTypeEnum.MINOR), + "ofi": sum(1 for f in findings if f.finding_type == FindingTypeEnum.OFI), + "positive": sum(1 for f in findings if f.finding_type == FindingTypeEnum.POSITIVE), + "open": sum(1 for f in findings if f.status != FindingStatusEnum.CLOSED), + "closed": sum(1 for f in findings if f.status == FindingStatusEnum.CLOSED), + "blocking_certification": sum( + 1 for f in findings + if f.finding_type == FindingTypeEnum.MAJOR and f.status != FindingStatusEnum.CLOSED + ), + } + + def close( + self, + finding_id: str, + closed_by: str, + closure_notes: str, + verification_method: Optional[str] = None, + verification_evidence: Optional[str] = None, + ) -> Optional[AuditFindingDB]: + """Close a finding after verification.""" + finding = self.get_by_id(finding_id) + if not finding: + return None + + finding.status = FindingStatusEnum.CLOSED + finding.closed_date = date.today() + finding.closed_by = closed_by + finding.closure_notes = closure_notes + finding.verification_method = verification_method + finding.verification_evidence = verification_evidence + finding.verified_by = closed_by + finding.verified_at = datetime.now(timezone.utc) + + self.db.commit() + self.db.refresh(finding) + return finding + + +class CorrectiveActionRepository: + """Repository for Corrective/Preventive Actions (CAPA).""" + + def __init__(self, db: DBSession): + self.db = db + + def create( + self, + finding_id: str, + capa_type: CAPATypeEnum, + title: str, + description: str, + assigned_to: str, + planned_completion: date, + expected_outcome: Optional[str] = None, + effectiveness_criteria: Optional[str] = None, + ) -> CorrectiveActionDB: + """Create a new CAPA.""" + # Generate CAPA ID + year = date.today().year + existing_count = self.db.query(CorrectiveActionDB).filter( + CorrectiveActionDB.capa_id.like(f"CAPA-{year}-%") + ).count() + capa_id = f"CAPA-{year}-{existing_count + 1:03d}" + + capa = CorrectiveActionDB( + id=str(uuid.uuid4()), + capa_id=capa_id, + finding_id=finding_id, + capa_type=capa_type, + title=title, + description=description, + expected_outcome=expected_outcome, + assigned_to=assigned_to, + planned_completion=planned_completion, + effectiveness_criteria=effectiveness_criteria, + status="planned", + ) + self.db.add(capa) + + # Update finding status + finding = self.db.query(AuditFindingDB).filter(AuditFindingDB.id == finding_id).first() + if finding: + finding.status = FindingStatusEnum.CORRECTIVE_ACTION_PENDING + + self.db.commit() + self.db.refresh(capa) + return capa + + def get_by_id(self, capa_id: str) -> Optional[CorrectiveActionDB]: + """Get CAPA by UUID or capa_id.""" + return self.db.query(CorrectiveActionDB).filter( + (CorrectiveActionDB.id == capa_id) | (CorrectiveActionDB.capa_id == capa_id) + ).first() + + def get_by_finding(self, finding_id: str) -> List[CorrectiveActionDB]: + """Get all CAPAs for a finding.""" + return self.db.query(CorrectiveActionDB).filter( + CorrectiveActionDB.finding_id == finding_id + ).order_by(CorrectiveActionDB.planned_completion).all() + + def verify( + self, + capa_id: str, + verified_by: str, + is_effective: bool, + effectiveness_notes: Optional[str] = None, + ) -> Optional[CorrectiveActionDB]: + """Verify a completed CAPA.""" + capa = self.get_by_id(capa_id) + if not capa: + return None + + capa.effectiveness_verified = is_effective + capa.effectiveness_verification_date = date.today() + capa.effectiveness_notes = effectiveness_notes + capa.status = "verified" if is_effective else "completed" + + # If verified, check if all CAPAs for finding are verified + if is_effective: + finding = self.db.query(AuditFindingDB).filter( + AuditFindingDB.id == capa.finding_id + ).first() + if finding: + unverified = self.db.query(CorrectiveActionDB).filter( + CorrectiveActionDB.finding_id == finding.id, + CorrectiveActionDB.id != capa.id, + CorrectiveActionDB.status != "verified" + ).count() + if unverified == 0: + finding.status = FindingStatusEnum.VERIFICATION_PENDING + + self.db.commit() + self.db.refresh(capa) + return capa + + +class ManagementReviewRepository: + """Repository for Management Reviews (ISO 27001 Chapter 9.3).""" + + def __init__(self, db: DBSession): + self.db = db + + def create( + self, + title: str, + review_date: date, + chairperson: str, + review_period_start: Optional[date] = None, + review_period_end: Optional[date] = None, + ) -> ManagementReviewDB: + """Create a new management review.""" + # Generate review ID + year = review_date.year + quarter = (review_date.month - 1) // 3 + 1 + review_id = f"MR-{year}-Q{quarter}" + + # Check for duplicate + existing = self.db.query(ManagementReviewDB).filter( + ManagementReviewDB.review_id == review_id + ).first() + if existing: + review_id = f"{review_id}-{str(uuid.uuid4())[:4]}" + + review = ManagementReviewDB( + id=str(uuid.uuid4()), + review_id=review_id, + title=title, + review_date=review_date, + review_period_start=review_period_start, + review_period_end=review_period_end, + chairperson=chairperson, + status="draft", + ) + self.db.add(review) + self.db.commit() + self.db.refresh(review) + return review + + def get_by_id(self, review_id: str) -> Optional[ManagementReviewDB]: + """Get review by UUID or review_id.""" + return self.db.query(ManagementReviewDB).filter( + (ManagementReviewDB.id == review_id) | (ManagementReviewDB.review_id == review_id) + ).first() + + def get_latest_approved(self) -> Optional[ManagementReviewDB]: + """Get the most recent approved management review.""" + return self.db.query(ManagementReviewDB).filter( + ManagementReviewDB.status == "approved" + ).order_by(ManagementReviewDB.review_date.desc()).first() + + def approve( + self, + review_id: str, + approved_by: str, + next_review_date: date, + minutes_document_path: Optional[str] = None, + ) -> Optional[ManagementReviewDB]: + """Approve a management review.""" + review = self.get_by_id(review_id) + if not review: + return None + + review.status = "approved" + review.approved_by = approved_by + review.approved_at = datetime.now(timezone.utc) + review.next_review_date = next_review_date + review.minutes_document_path = minutes_document_path + + self.db.commit() + self.db.refresh(review) + return review + + +class InternalAuditRepository: + """Repository for Internal Audits (ISO 27001 Chapter 9.2).""" + + def __init__(self, db: DBSession): + self.db = db + + def create( + self, + title: str, + audit_type: str, + planned_date: date, + lead_auditor: str, + scope_description: Optional[str] = None, + iso_chapters_covered: Optional[List[str]] = None, + annex_a_controls_covered: Optional[List[str]] = None, + ) -> InternalAuditDB: + """Create a new internal audit.""" + # Generate audit ID + year = planned_date.year + existing_count = self.db.query(InternalAuditDB).filter( + InternalAuditDB.audit_id.like(f"IA-{year}-%") + ).count() + audit_id = f"IA-{year}-{existing_count + 1:03d}" + + audit = InternalAuditDB( + id=str(uuid.uuid4()), + audit_id=audit_id, + title=title, + audit_type=audit_type, + scope_description=scope_description, + iso_chapters_covered=iso_chapters_covered, + annex_a_controls_covered=annex_a_controls_covered, + planned_date=planned_date, + lead_auditor=lead_auditor, + status="planned", + ) + self.db.add(audit) + self.db.commit() + self.db.refresh(audit) + return audit + + def get_by_id(self, audit_id: str) -> Optional[InternalAuditDB]: + """Get audit by UUID or audit_id.""" + return self.db.query(InternalAuditDB).filter( + (InternalAuditDB.id == audit_id) | (InternalAuditDB.audit_id == audit_id) + ).first() + + def get_latest_completed(self) -> Optional[InternalAuditDB]: + """Get the most recent completed internal audit.""" + return self.db.query(InternalAuditDB).filter( + InternalAuditDB.status == "completed" + ).order_by(InternalAuditDB.actual_end_date.desc()).first() + + def complete( + self, + audit_id: str, + audit_conclusion: str, + overall_assessment: str, + follow_up_audit_required: bool = False, + ) -> Optional[InternalAuditDB]: + """Complete an internal audit.""" + audit = self.get_by_id(audit_id) + if not audit: + return None + + audit.status = "completed" + audit.actual_end_date = date.today() + audit.report_date = date.today() + audit.audit_conclusion = audit_conclusion + audit.overall_assessment = overall_assessment + audit.follow_up_audit_required = follow_up_audit_required + + self.db.commit() + self.db.refresh(audit) + return audit + + +class AuditTrailRepository: + """Repository for Audit Trail entries.""" + + def __init__(self, db: DBSession): + self.db = db + + def log( + self, + entity_type: str, + entity_id: str, + entity_name: str, + action: str, + performed_by: str, + field_changed: Optional[str] = None, + old_value: Optional[str] = None, + new_value: Optional[str] = None, + change_summary: Optional[str] = None, + ) -> AuditTrailDB: + """Log an audit trail entry.""" + import hashlib + entry = AuditTrailDB( + id=str(uuid.uuid4()), + entity_type=entity_type, + entity_id=entity_id, + entity_name=entity_name, + action=action, + field_changed=field_changed, + old_value=old_value, + new_value=new_value, + change_summary=change_summary, + performed_by=performed_by, + performed_at=datetime.now(timezone.utc), + checksum=hashlib.sha256( + f"{entity_type}|{entity_id}|{action}|{performed_by}".encode() + ).hexdigest(), + ) + self.db.add(entry) + self.db.commit() + self.db.refresh(entry) + return entry + + def get_by_entity( + self, + entity_type: str, + entity_id: str, + limit: int = 100, + ) -> List[AuditTrailDB]: + """Get audit trail for a specific entity.""" + return self.db.query(AuditTrailDB).filter( + AuditTrailDB.entity_type == entity_type, + AuditTrailDB.entity_id == entity_id + ).order_by(AuditTrailDB.performed_at.desc()).limit(limit).all() + + def get_paginated( + self, + page: int = 1, + page_size: int = 50, + entity_type: Optional[str] = None, + entity_id: Optional[str] = None, + performed_by: Optional[str] = None, + action: Optional[str] = None, + ) -> Tuple[List[AuditTrailDB], int]: + """Get paginated audit trail with filters.""" + query = self.db.query(AuditTrailDB) + + if entity_type: + query = query.filter(AuditTrailDB.entity_type == entity_type) + if entity_id: + query = query.filter(AuditTrailDB.entity_id == entity_id) + if performed_by: + query = query.filter(AuditTrailDB.performed_by == performed_by) + if action: + query = query.filter(AuditTrailDB.action == action) + + total = query.count() + entries = query.order_by(AuditTrailDB.performed_at.desc()).offset( + (page - 1) * page_size + ).limit(page_size).all() + + return entries, total + + +class ISMSReadinessCheckRepository: + """Repository for ISMS Readiness Check results.""" + + def __init__(self, db: DBSession): + self.db = db + + def save(self, check: ISMSReadinessCheckDB) -> ISMSReadinessCheckDB: + """Save a readiness check result.""" + self.db.add(check) + self.db.commit() + self.db.refresh(check) + return check + + def get_latest(self) -> Optional[ISMSReadinessCheckDB]: + """Get the most recent readiness check.""" + return self.db.query(ISMSReadinessCheckDB).order_by( + ISMSReadinessCheckDB.check_date.desc() + ).first() + + def get_history(self, limit: int = 10) -> List[ISMSReadinessCheckDB]: + """Get readiness check history.""" + return self.db.query(ISMSReadinessCheckDB).order_by( + ISMSReadinessCheckDB.check_date.desc() + ).limit(limit).all() + diff --git a/backend-compliance/compliance/db/isms_governance_repository.py b/backend-compliance/compliance/db/isms_governance_repository.py new file mode 100644 index 0000000..d09bc14 --- /dev/null +++ b/backend-compliance/compliance/db/isms_governance_repository.py @@ -0,0 +1,354 @@ +""" +ISMS repositories — extracted from compliance/db/isms_repository.py. + +Phase 1 Step 5: split per sub-aggregate. Re-exported from +``compliance.db.isms_repository`` for backwards compatibility. +""" + +import uuid +from datetime import datetime, date, timezone +from typing import List, Optional, Dict, Any, Tuple + +from sqlalchemy.orm import Session as DBSession + +from compliance.db.models import ( + ISMSScopeDB, ISMSPolicyDB, SecurityObjectiveDB, + StatementOfApplicabilityDB, AuditFindingDB, CorrectiveActionDB, + ManagementReviewDB, InternalAuditDB, AuditTrailDB, ISMSReadinessCheckDB, + ApprovalStatusEnum, FindingTypeEnum, FindingStatusEnum, CAPATypeEnum, +) + +class ISMSScopeRepository: + """Repository for ISMS Scope (ISO 27001 Chapter 4.3).""" + + def __init__(self, db: DBSession): + self.db = db + + def create( + self, + scope_statement: str, + created_by: str, + included_locations: Optional[List[str]] = None, + included_processes: Optional[List[str]] = None, + included_services: Optional[List[str]] = None, + excluded_items: Optional[List[str]] = None, + exclusion_justification: Optional[str] = None, + organizational_boundary: Optional[str] = None, + physical_boundary: Optional[str] = None, + technical_boundary: Optional[str] = None, + ) -> ISMSScopeDB: + """Create a new ISMS scope definition.""" + # Supersede existing scopes + existing = self.db.query(ISMSScopeDB).filter( + ISMSScopeDB.status != ApprovalStatusEnum.SUPERSEDED + ).all() + for s in existing: + s.status = ApprovalStatusEnum.SUPERSEDED + + scope = ISMSScopeDB( + id=str(uuid.uuid4()), + scope_statement=scope_statement, + included_locations=included_locations, + included_processes=included_processes, + included_services=included_services, + excluded_items=excluded_items, + exclusion_justification=exclusion_justification, + organizational_boundary=organizational_boundary, + physical_boundary=physical_boundary, + technical_boundary=technical_boundary, + status=ApprovalStatusEnum.DRAFT, + created_by=created_by, + ) + self.db.add(scope) + self.db.commit() + self.db.refresh(scope) + return scope + + def get_current(self) -> Optional[ISMSScopeDB]: + """Get the current (non-superseded) ISMS scope.""" + return self.db.query(ISMSScopeDB).filter( + ISMSScopeDB.status != ApprovalStatusEnum.SUPERSEDED + ).order_by(ISMSScopeDB.created_at.desc()).first() + + def get_by_id(self, scope_id: str) -> Optional[ISMSScopeDB]: + """Get scope by ID.""" + return self.db.query(ISMSScopeDB).filter(ISMSScopeDB.id == scope_id).first() + + def approve( + self, + scope_id: str, + approved_by: str, + effective_date: date, + review_date: date, + ) -> Optional[ISMSScopeDB]: + """Approve the ISMS scope.""" + scope = self.get_by_id(scope_id) + if not scope: + return None + + import hashlib + scope.status = ApprovalStatusEnum.APPROVED + scope.approved_by = approved_by + scope.approved_at = datetime.now(timezone.utc) + scope.effective_date = effective_date + scope.review_date = review_date + scope.approval_signature = hashlib.sha256( + f"{scope.scope_statement}|{approved_by}|{datetime.now(timezone.utc).isoformat()}".encode() + ).hexdigest() + + self.db.commit() + self.db.refresh(scope) + return scope + + +class ISMSPolicyRepository: + """Repository for ISMS Policies (ISO 27001 Chapter 5.2).""" + + def __init__(self, db: DBSession): + self.db = db + + def create( + self, + policy_id: str, + title: str, + policy_type: str, + authored_by: str, + description: Optional[str] = None, + policy_text: Optional[str] = None, + applies_to: Optional[List[str]] = None, + review_frequency_months: int = 12, + related_controls: Optional[List[str]] = None, + ) -> ISMSPolicyDB: + """Create a new ISMS policy.""" + policy = ISMSPolicyDB( + id=str(uuid.uuid4()), + policy_id=policy_id, + title=title, + policy_type=policy_type, + description=description, + policy_text=policy_text, + applies_to=applies_to, + review_frequency_months=review_frequency_months, + related_controls=related_controls, + authored_by=authored_by, + status=ApprovalStatusEnum.DRAFT, + ) + self.db.add(policy) + self.db.commit() + self.db.refresh(policy) + return policy + + def get_by_id(self, policy_id: str) -> Optional[ISMSPolicyDB]: + """Get policy by UUID or policy_id.""" + return self.db.query(ISMSPolicyDB).filter( + (ISMSPolicyDB.id == policy_id) | (ISMSPolicyDB.policy_id == policy_id) + ).first() + + def get_all( + self, + policy_type: Optional[str] = None, + status: Optional[ApprovalStatusEnum] = None, + ) -> List[ISMSPolicyDB]: + """Get all policies with optional filters.""" + query = self.db.query(ISMSPolicyDB) + if policy_type: + query = query.filter(ISMSPolicyDB.policy_type == policy_type) + if status: + query = query.filter(ISMSPolicyDB.status == status) + return query.order_by(ISMSPolicyDB.policy_id).all() + + def get_master_policy(self) -> Optional[ISMSPolicyDB]: + """Get the approved master ISMS policy.""" + return self.db.query(ISMSPolicyDB).filter( + ISMSPolicyDB.policy_type == "master", + ISMSPolicyDB.status == ApprovalStatusEnum.APPROVED + ).first() + + def approve( + self, + policy_id: str, + approved_by: str, + reviewed_by: str, + effective_date: date, + ) -> Optional[ISMSPolicyDB]: + """Approve a policy.""" + policy = self.get_by_id(policy_id) + if not policy: + return None + + import hashlib + policy.status = ApprovalStatusEnum.APPROVED + policy.reviewed_by = reviewed_by + policy.approved_by = approved_by + policy.approved_at = datetime.now(timezone.utc) + policy.effective_date = effective_date + policy.next_review_date = date( + effective_date.year + (policy.review_frequency_months // 12), + effective_date.month, + effective_date.day + ) + policy.approval_signature = hashlib.sha256( + f"{policy.policy_id}|{approved_by}|{datetime.now(timezone.utc).isoformat()}".encode() + ).hexdigest() + + self.db.commit() + self.db.refresh(policy) + return policy + + +class SecurityObjectiveRepository: + """Repository for Security Objectives (ISO 27001 Chapter 6.2).""" + + def __init__(self, db: DBSession): + self.db = db + + def create( + self, + objective_id: str, + title: str, + description: str, + category: str, + owner: str, + kpi_name: Optional[str] = None, + kpi_target: Optional[float] = None, + kpi_unit: Optional[str] = None, + target_date: Optional[date] = None, + related_controls: Optional[List[str]] = None, + ) -> SecurityObjectiveDB: + """Create a new security objective.""" + objective = SecurityObjectiveDB( + id=str(uuid.uuid4()), + objective_id=objective_id, + title=title, + description=description, + category=category, + kpi_name=kpi_name, + kpi_target=kpi_target, + kpi_unit=kpi_unit, + owner=owner, + target_date=target_date, + related_controls=related_controls, + status="active", + ) + self.db.add(objective) + self.db.commit() + self.db.refresh(objective) + return objective + + def get_by_id(self, objective_id: str) -> Optional[SecurityObjectiveDB]: + """Get objective by UUID or objective_id.""" + return self.db.query(SecurityObjectiveDB).filter( + (SecurityObjectiveDB.id == objective_id) | + (SecurityObjectiveDB.objective_id == objective_id) + ).first() + + def get_all( + self, + category: Optional[str] = None, + status: Optional[str] = None, + ) -> List[SecurityObjectiveDB]: + """Get all objectives with optional filters.""" + query = self.db.query(SecurityObjectiveDB) + if category: + query = query.filter(SecurityObjectiveDB.category == category) + if status: + query = query.filter(SecurityObjectiveDB.status == status) + return query.order_by(SecurityObjectiveDB.objective_id).all() + + def update_progress( + self, + objective_id: str, + kpi_current: float, + ) -> Optional[SecurityObjectiveDB]: + """Update objective progress.""" + objective = self.get_by_id(objective_id) + if not objective: + return None + + objective.kpi_current = kpi_current + if objective.kpi_target: + objective.progress_percentage = min(100, (kpi_current / objective.kpi_target) * 100) + + # Auto-mark as achieved if 100% + if objective.progress_percentage >= 100 and objective.status == "active": + objective.status = "achieved" + objective.achieved_date = date.today() + + self.db.commit() + self.db.refresh(objective) + return objective + + +class StatementOfApplicabilityRepository: + """Repository for Statement of Applicability (SoA).""" + + def __init__(self, db: DBSession): + self.db = db + + def create( + self, + annex_a_control: str, + annex_a_title: str, + annex_a_category: str, + is_applicable: bool = True, + applicability_justification: Optional[str] = None, + implementation_status: str = "planned", + breakpilot_control_ids: Optional[List[str]] = None, + ) -> StatementOfApplicabilityDB: + """Create a new SoA entry.""" + entry = StatementOfApplicabilityDB( + id=str(uuid.uuid4()), + annex_a_control=annex_a_control, + annex_a_title=annex_a_title, + annex_a_category=annex_a_category, + is_applicable=is_applicable, + applicability_justification=applicability_justification, + implementation_status=implementation_status, + breakpilot_control_ids=breakpilot_control_ids or [], + ) + self.db.add(entry) + self.db.commit() + self.db.refresh(entry) + return entry + + def get_by_control(self, annex_a_control: str) -> Optional[StatementOfApplicabilityDB]: + """Get SoA entry by Annex A control ID (e.g., 'A.5.1').""" + return self.db.query(StatementOfApplicabilityDB).filter( + StatementOfApplicabilityDB.annex_a_control == annex_a_control + ).first() + + def get_all( + self, + is_applicable: Optional[bool] = None, + implementation_status: Optional[str] = None, + category: Optional[str] = None, + ) -> List[StatementOfApplicabilityDB]: + """Get all SoA entries with optional filters.""" + query = self.db.query(StatementOfApplicabilityDB) + if is_applicable is not None: + query = query.filter(StatementOfApplicabilityDB.is_applicable == is_applicable) + if implementation_status: + query = query.filter(StatementOfApplicabilityDB.implementation_status == implementation_status) + if category: + query = query.filter(StatementOfApplicabilityDB.annex_a_category == category) + return query.order_by(StatementOfApplicabilityDB.annex_a_control).all() + + def get_statistics(self) -> Dict[str, Any]: + """Get SoA statistics.""" + entries = self.get_all() + total = len(entries) + applicable = sum(1 for e in entries if e.is_applicable) + implemented = sum(1 for e in entries if e.implementation_status == "implemented") + approved = sum(1 for e in entries if e.approved_at) + + return { + "total": total, + "applicable": applicable, + "not_applicable": total - applicable, + "implemented": implemented, + "planned": sum(1 for e in entries if e.implementation_status == "planned"), + "approved": approved, + "pending_approval": total - approved, + "implementation_rate": round((implemented / applicable * 100) if applicable > 0 else 0, 1), + } + diff --git a/backend-compliance/compliance/db/isms_repository.py b/backend-compliance/compliance/db/isms_repository.py index e3ce768..7ee4241 100644 --- a/backend-compliance/compliance/db/isms_repository.py +++ b/backend-compliance/compliance/db/isms_repository.py @@ -1,838 +1,25 @@ """ -Repository layer for ISMS (Information Security Management System) entities. +compliance.db.isms_repository — backwards-compatibility re-export shim. -Provides CRUD operations for ISO 27001 certification-related entities: -- ISMS Scope & Context -- Policies & Objectives -- Statement of Applicability (SoA) -- Audit Findings & CAPA -- Management Reviews & Internal Audits +Phase 1 Step 5 split the 838-line ISMS repository module into two +sub-aggregate sibling modules: governance (scope, policy, objective, SoA) +and audit execution (finding, CAPA, review, internal audit, trail, readiness). + +Every repository class is re-exported so existing imports continue to work. +New code SHOULD import from the sub-aggregate module directly. """ -import uuid -from datetime import datetime, date, timezone -from typing import List, Optional, Dict, Any, Tuple - -from sqlalchemy.orm import Session as DBSession - -from .models import ( - ISMSScopeDB, ISMSPolicyDB, SecurityObjectiveDB, - StatementOfApplicabilityDB, AuditFindingDB, CorrectiveActionDB, - ManagementReviewDB, InternalAuditDB, AuditTrailDB, ISMSReadinessCheckDB, - ApprovalStatusEnum, FindingTypeEnum, FindingStatusEnum, CAPATypeEnum +from compliance.db.isms_governance_repository import ( # noqa: F401 + ISMSScopeRepository, + ISMSPolicyRepository, + SecurityObjectiveRepository, + StatementOfApplicabilityRepository, +) +from compliance.db.isms_audit_repository import ( # noqa: F401 + AuditFindingRepository, + CorrectiveActionRepository, + ManagementReviewRepository, + InternalAuditRepository, + AuditTrailRepository, + ISMSReadinessCheckRepository, ) - - -class ISMSScopeRepository: - """Repository for ISMS Scope (ISO 27001 Chapter 4.3).""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - scope_statement: str, - created_by: str, - included_locations: Optional[List[str]] = None, - included_processes: Optional[List[str]] = None, - included_services: Optional[List[str]] = None, - excluded_items: Optional[List[str]] = None, - exclusion_justification: Optional[str] = None, - organizational_boundary: Optional[str] = None, - physical_boundary: Optional[str] = None, - technical_boundary: Optional[str] = None, - ) -> ISMSScopeDB: - """Create a new ISMS scope definition.""" - # Supersede existing scopes - existing = self.db.query(ISMSScopeDB).filter( - ISMSScopeDB.status != ApprovalStatusEnum.SUPERSEDED - ).all() - for s in existing: - s.status = ApprovalStatusEnum.SUPERSEDED - - scope = ISMSScopeDB( - id=str(uuid.uuid4()), - scope_statement=scope_statement, - included_locations=included_locations, - included_processes=included_processes, - included_services=included_services, - excluded_items=excluded_items, - exclusion_justification=exclusion_justification, - organizational_boundary=organizational_boundary, - physical_boundary=physical_boundary, - technical_boundary=technical_boundary, - status=ApprovalStatusEnum.DRAFT, - created_by=created_by, - ) - self.db.add(scope) - self.db.commit() - self.db.refresh(scope) - return scope - - def get_current(self) -> Optional[ISMSScopeDB]: - """Get the current (non-superseded) ISMS scope.""" - return self.db.query(ISMSScopeDB).filter( - ISMSScopeDB.status != ApprovalStatusEnum.SUPERSEDED - ).order_by(ISMSScopeDB.created_at.desc()).first() - - def get_by_id(self, scope_id: str) -> Optional[ISMSScopeDB]: - """Get scope by ID.""" - return self.db.query(ISMSScopeDB).filter(ISMSScopeDB.id == scope_id).first() - - def approve( - self, - scope_id: str, - approved_by: str, - effective_date: date, - review_date: date, - ) -> Optional[ISMSScopeDB]: - """Approve the ISMS scope.""" - scope = self.get_by_id(scope_id) - if not scope: - return None - - import hashlib - scope.status = ApprovalStatusEnum.APPROVED - scope.approved_by = approved_by - scope.approved_at = datetime.now(timezone.utc) - scope.effective_date = effective_date - scope.review_date = review_date - scope.approval_signature = hashlib.sha256( - f"{scope.scope_statement}|{approved_by}|{datetime.now(timezone.utc).isoformat()}".encode() - ).hexdigest() - - self.db.commit() - self.db.refresh(scope) - return scope - - -class ISMSPolicyRepository: - """Repository for ISMS Policies (ISO 27001 Chapter 5.2).""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - policy_id: str, - title: str, - policy_type: str, - authored_by: str, - description: Optional[str] = None, - policy_text: Optional[str] = None, - applies_to: Optional[List[str]] = None, - review_frequency_months: int = 12, - related_controls: Optional[List[str]] = None, - ) -> ISMSPolicyDB: - """Create a new ISMS policy.""" - policy = ISMSPolicyDB( - id=str(uuid.uuid4()), - policy_id=policy_id, - title=title, - policy_type=policy_type, - description=description, - policy_text=policy_text, - applies_to=applies_to, - review_frequency_months=review_frequency_months, - related_controls=related_controls, - authored_by=authored_by, - status=ApprovalStatusEnum.DRAFT, - ) - self.db.add(policy) - self.db.commit() - self.db.refresh(policy) - return policy - - def get_by_id(self, policy_id: str) -> Optional[ISMSPolicyDB]: - """Get policy by UUID or policy_id.""" - return self.db.query(ISMSPolicyDB).filter( - (ISMSPolicyDB.id == policy_id) | (ISMSPolicyDB.policy_id == policy_id) - ).first() - - def get_all( - self, - policy_type: Optional[str] = None, - status: Optional[ApprovalStatusEnum] = None, - ) -> List[ISMSPolicyDB]: - """Get all policies with optional filters.""" - query = self.db.query(ISMSPolicyDB) - if policy_type: - query = query.filter(ISMSPolicyDB.policy_type == policy_type) - if status: - query = query.filter(ISMSPolicyDB.status == status) - return query.order_by(ISMSPolicyDB.policy_id).all() - - def get_master_policy(self) -> Optional[ISMSPolicyDB]: - """Get the approved master ISMS policy.""" - return self.db.query(ISMSPolicyDB).filter( - ISMSPolicyDB.policy_type == "master", - ISMSPolicyDB.status == ApprovalStatusEnum.APPROVED - ).first() - - def approve( - self, - policy_id: str, - approved_by: str, - reviewed_by: str, - effective_date: date, - ) -> Optional[ISMSPolicyDB]: - """Approve a policy.""" - policy = self.get_by_id(policy_id) - if not policy: - return None - - import hashlib - policy.status = ApprovalStatusEnum.APPROVED - policy.reviewed_by = reviewed_by - policy.approved_by = approved_by - policy.approved_at = datetime.now(timezone.utc) - policy.effective_date = effective_date - policy.next_review_date = date( - effective_date.year + (policy.review_frequency_months // 12), - effective_date.month, - effective_date.day - ) - policy.approval_signature = hashlib.sha256( - f"{policy.policy_id}|{approved_by}|{datetime.now(timezone.utc).isoformat()}".encode() - ).hexdigest() - - self.db.commit() - self.db.refresh(policy) - return policy - - -class SecurityObjectiveRepository: - """Repository for Security Objectives (ISO 27001 Chapter 6.2).""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - objective_id: str, - title: str, - description: str, - category: str, - owner: str, - kpi_name: Optional[str] = None, - kpi_target: Optional[float] = None, - kpi_unit: Optional[str] = None, - target_date: Optional[date] = None, - related_controls: Optional[List[str]] = None, - ) -> SecurityObjectiveDB: - """Create a new security objective.""" - objective = SecurityObjectiveDB( - id=str(uuid.uuid4()), - objective_id=objective_id, - title=title, - description=description, - category=category, - kpi_name=kpi_name, - kpi_target=kpi_target, - kpi_unit=kpi_unit, - owner=owner, - target_date=target_date, - related_controls=related_controls, - status="active", - ) - self.db.add(objective) - self.db.commit() - self.db.refresh(objective) - return objective - - def get_by_id(self, objective_id: str) -> Optional[SecurityObjectiveDB]: - """Get objective by UUID or objective_id.""" - return self.db.query(SecurityObjectiveDB).filter( - (SecurityObjectiveDB.id == objective_id) | - (SecurityObjectiveDB.objective_id == objective_id) - ).first() - - def get_all( - self, - category: Optional[str] = None, - status: Optional[str] = None, - ) -> List[SecurityObjectiveDB]: - """Get all objectives with optional filters.""" - query = self.db.query(SecurityObjectiveDB) - if category: - query = query.filter(SecurityObjectiveDB.category == category) - if status: - query = query.filter(SecurityObjectiveDB.status == status) - return query.order_by(SecurityObjectiveDB.objective_id).all() - - def update_progress( - self, - objective_id: str, - kpi_current: float, - ) -> Optional[SecurityObjectiveDB]: - """Update objective progress.""" - objective = self.get_by_id(objective_id) - if not objective: - return None - - objective.kpi_current = kpi_current - if objective.kpi_target: - objective.progress_percentage = min(100, (kpi_current / objective.kpi_target) * 100) - - # Auto-mark as achieved if 100% - if objective.progress_percentage >= 100 and objective.status == "active": - objective.status = "achieved" - objective.achieved_date = date.today() - - self.db.commit() - self.db.refresh(objective) - return objective - - -class StatementOfApplicabilityRepository: - """Repository for Statement of Applicability (SoA).""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - annex_a_control: str, - annex_a_title: str, - annex_a_category: str, - is_applicable: bool = True, - applicability_justification: Optional[str] = None, - implementation_status: str = "planned", - breakpilot_control_ids: Optional[List[str]] = None, - ) -> StatementOfApplicabilityDB: - """Create a new SoA entry.""" - entry = StatementOfApplicabilityDB( - id=str(uuid.uuid4()), - annex_a_control=annex_a_control, - annex_a_title=annex_a_title, - annex_a_category=annex_a_category, - is_applicable=is_applicable, - applicability_justification=applicability_justification, - implementation_status=implementation_status, - breakpilot_control_ids=breakpilot_control_ids or [], - ) - self.db.add(entry) - self.db.commit() - self.db.refresh(entry) - return entry - - def get_by_control(self, annex_a_control: str) -> Optional[StatementOfApplicabilityDB]: - """Get SoA entry by Annex A control ID (e.g., 'A.5.1').""" - return self.db.query(StatementOfApplicabilityDB).filter( - StatementOfApplicabilityDB.annex_a_control == annex_a_control - ).first() - - def get_all( - self, - is_applicable: Optional[bool] = None, - implementation_status: Optional[str] = None, - category: Optional[str] = None, - ) -> List[StatementOfApplicabilityDB]: - """Get all SoA entries with optional filters.""" - query = self.db.query(StatementOfApplicabilityDB) - if is_applicable is not None: - query = query.filter(StatementOfApplicabilityDB.is_applicable == is_applicable) - if implementation_status: - query = query.filter(StatementOfApplicabilityDB.implementation_status == implementation_status) - if category: - query = query.filter(StatementOfApplicabilityDB.annex_a_category == category) - return query.order_by(StatementOfApplicabilityDB.annex_a_control).all() - - def get_statistics(self) -> Dict[str, Any]: - """Get SoA statistics.""" - entries = self.get_all() - total = len(entries) - applicable = sum(1 for e in entries if e.is_applicable) - implemented = sum(1 for e in entries if e.implementation_status == "implemented") - approved = sum(1 for e in entries if e.approved_at) - - return { - "total": total, - "applicable": applicable, - "not_applicable": total - applicable, - "implemented": implemented, - "planned": sum(1 for e in entries if e.implementation_status == "planned"), - "approved": approved, - "pending_approval": total - approved, - "implementation_rate": round((implemented / applicable * 100) if applicable > 0 else 0, 1), - } - - -class AuditFindingRepository: - """Repository for Audit Findings (Major/Minor/OFI).""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - finding_type: FindingTypeEnum, - title: str, - description: str, - auditor: str, - iso_chapter: Optional[str] = None, - annex_a_control: Optional[str] = None, - objective_evidence: Optional[str] = None, - owner: Optional[str] = None, - due_date: Optional[date] = None, - internal_audit_id: Optional[str] = None, - ) -> AuditFindingDB: - """Create a new audit finding.""" - # Generate finding ID - year = date.today().year - existing_count = self.db.query(AuditFindingDB).filter( - AuditFindingDB.finding_id.like(f"FIND-{year}-%") - ).count() - finding_id = f"FIND-{year}-{existing_count + 1:03d}" - - finding = AuditFindingDB( - id=str(uuid.uuid4()), - finding_id=finding_id, - finding_type=finding_type, - iso_chapter=iso_chapter, - annex_a_control=annex_a_control, - title=title, - description=description, - objective_evidence=objective_evidence, - owner=owner, - auditor=auditor, - due_date=due_date, - internal_audit_id=internal_audit_id, - status=FindingStatusEnum.OPEN, - ) - self.db.add(finding) - self.db.commit() - self.db.refresh(finding) - return finding - - def get_by_id(self, finding_id: str) -> Optional[AuditFindingDB]: - """Get finding by UUID or finding_id.""" - return self.db.query(AuditFindingDB).filter( - (AuditFindingDB.id == finding_id) | (AuditFindingDB.finding_id == finding_id) - ).first() - - def get_all( - self, - finding_type: Optional[FindingTypeEnum] = None, - status: Optional[FindingStatusEnum] = None, - internal_audit_id: Optional[str] = None, - ) -> List[AuditFindingDB]: - """Get all findings with optional filters.""" - query = self.db.query(AuditFindingDB) - if finding_type: - query = query.filter(AuditFindingDB.finding_type == finding_type) - if status: - query = query.filter(AuditFindingDB.status == status) - if internal_audit_id: - query = query.filter(AuditFindingDB.internal_audit_id == internal_audit_id) - return query.order_by(AuditFindingDB.identified_date.desc()).all() - - def get_open_majors(self) -> List[AuditFindingDB]: - """Get all open major findings (blocking certification).""" - return self.db.query(AuditFindingDB).filter( - AuditFindingDB.finding_type == FindingTypeEnum.MAJOR, - AuditFindingDB.status != FindingStatusEnum.CLOSED - ).all() - - def get_statistics(self) -> Dict[str, Any]: - """Get finding statistics.""" - findings = self.get_all() - - return { - "total": len(findings), - "major": sum(1 for f in findings if f.finding_type == FindingTypeEnum.MAJOR), - "minor": sum(1 for f in findings if f.finding_type == FindingTypeEnum.MINOR), - "ofi": sum(1 for f in findings if f.finding_type == FindingTypeEnum.OFI), - "positive": sum(1 for f in findings if f.finding_type == FindingTypeEnum.POSITIVE), - "open": sum(1 for f in findings if f.status != FindingStatusEnum.CLOSED), - "closed": sum(1 for f in findings if f.status == FindingStatusEnum.CLOSED), - "blocking_certification": sum( - 1 for f in findings - if f.finding_type == FindingTypeEnum.MAJOR and f.status != FindingStatusEnum.CLOSED - ), - } - - def close( - self, - finding_id: str, - closed_by: str, - closure_notes: str, - verification_method: Optional[str] = None, - verification_evidence: Optional[str] = None, - ) -> Optional[AuditFindingDB]: - """Close a finding after verification.""" - finding = self.get_by_id(finding_id) - if not finding: - return None - - finding.status = FindingStatusEnum.CLOSED - finding.closed_date = date.today() - finding.closed_by = closed_by - finding.closure_notes = closure_notes - finding.verification_method = verification_method - finding.verification_evidence = verification_evidence - finding.verified_by = closed_by - finding.verified_at = datetime.now(timezone.utc) - - self.db.commit() - self.db.refresh(finding) - return finding - - -class CorrectiveActionRepository: - """Repository for Corrective/Preventive Actions (CAPA).""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - finding_id: str, - capa_type: CAPATypeEnum, - title: str, - description: str, - assigned_to: str, - planned_completion: date, - expected_outcome: Optional[str] = None, - effectiveness_criteria: Optional[str] = None, - ) -> CorrectiveActionDB: - """Create a new CAPA.""" - # Generate CAPA ID - year = date.today().year - existing_count = self.db.query(CorrectiveActionDB).filter( - CorrectiveActionDB.capa_id.like(f"CAPA-{year}-%") - ).count() - capa_id = f"CAPA-{year}-{existing_count + 1:03d}" - - capa = CorrectiveActionDB( - id=str(uuid.uuid4()), - capa_id=capa_id, - finding_id=finding_id, - capa_type=capa_type, - title=title, - description=description, - expected_outcome=expected_outcome, - assigned_to=assigned_to, - planned_completion=planned_completion, - effectiveness_criteria=effectiveness_criteria, - status="planned", - ) - self.db.add(capa) - - # Update finding status - finding = self.db.query(AuditFindingDB).filter(AuditFindingDB.id == finding_id).first() - if finding: - finding.status = FindingStatusEnum.CORRECTIVE_ACTION_PENDING - - self.db.commit() - self.db.refresh(capa) - return capa - - def get_by_id(self, capa_id: str) -> Optional[CorrectiveActionDB]: - """Get CAPA by UUID or capa_id.""" - return self.db.query(CorrectiveActionDB).filter( - (CorrectiveActionDB.id == capa_id) | (CorrectiveActionDB.capa_id == capa_id) - ).first() - - def get_by_finding(self, finding_id: str) -> List[CorrectiveActionDB]: - """Get all CAPAs for a finding.""" - return self.db.query(CorrectiveActionDB).filter( - CorrectiveActionDB.finding_id == finding_id - ).order_by(CorrectiveActionDB.planned_completion).all() - - def verify( - self, - capa_id: str, - verified_by: str, - is_effective: bool, - effectiveness_notes: Optional[str] = None, - ) -> Optional[CorrectiveActionDB]: - """Verify a completed CAPA.""" - capa = self.get_by_id(capa_id) - if not capa: - return None - - capa.effectiveness_verified = is_effective - capa.effectiveness_verification_date = date.today() - capa.effectiveness_notes = effectiveness_notes - capa.status = "verified" if is_effective else "completed" - - # If verified, check if all CAPAs for finding are verified - if is_effective: - finding = self.db.query(AuditFindingDB).filter( - AuditFindingDB.id == capa.finding_id - ).first() - if finding: - unverified = self.db.query(CorrectiveActionDB).filter( - CorrectiveActionDB.finding_id == finding.id, - CorrectiveActionDB.id != capa.id, - CorrectiveActionDB.status != "verified" - ).count() - if unverified == 0: - finding.status = FindingStatusEnum.VERIFICATION_PENDING - - self.db.commit() - self.db.refresh(capa) - return capa - - -class ManagementReviewRepository: - """Repository for Management Reviews (ISO 27001 Chapter 9.3).""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - title: str, - review_date: date, - chairperson: str, - review_period_start: Optional[date] = None, - review_period_end: Optional[date] = None, - ) -> ManagementReviewDB: - """Create a new management review.""" - # Generate review ID - year = review_date.year - quarter = (review_date.month - 1) // 3 + 1 - review_id = f"MR-{year}-Q{quarter}" - - # Check for duplicate - existing = self.db.query(ManagementReviewDB).filter( - ManagementReviewDB.review_id == review_id - ).first() - if existing: - review_id = f"{review_id}-{str(uuid.uuid4())[:4]}" - - review = ManagementReviewDB( - id=str(uuid.uuid4()), - review_id=review_id, - title=title, - review_date=review_date, - review_period_start=review_period_start, - review_period_end=review_period_end, - chairperson=chairperson, - status="draft", - ) - self.db.add(review) - self.db.commit() - self.db.refresh(review) - return review - - def get_by_id(self, review_id: str) -> Optional[ManagementReviewDB]: - """Get review by UUID or review_id.""" - return self.db.query(ManagementReviewDB).filter( - (ManagementReviewDB.id == review_id) | (ManagementReviewDB.review_id == review_id) - ).first() - - def get_latest_approved(self) -> Optional[ManagementReviewDB]: - """Get the most recent approved management review.""" - return self.db.query(ManagementReviewDB).filter( - ManagementReviewDB.status == "approved" - ).order_by(ManagementReviewDB.review_date.desc()).first() - - def approve( - self, - review_id: str, - approved_by: str, - next_review_date: date, - minutes_document_path: Optional[str] = None, - ) -> Optional[ManagementReviewDB]: - """Approve a management review.""" - review = self.get_by_id(review_id) - if not review: - return None - - review.status = "approved" - review.approved_by = approved_by - review.approved_at = datetime.now(timezone.utc) - review.next_review_date = next_review_date - review.minutes_document_path = minutes_document_path - - self.db.commit() - self.db.refresh(review) - return review - - -class InternalAuditRepository: - """Repository for Internal Audits (ISO 27001 Chapter 9.2).""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - title: str, - audit_type: str, - planned_date: date, - lead_auditor: str, - scope_description: Optional[str] = None, - iso_chapters_covered: Optional[List[str]] = None, - annex_a_controls_covered: Optional[List[str]] = None, - ) -> InternalAuditDB: - """Create a new internal audit.""" - # Generate audit ID - year = planned_date.year - existing_count = self.db.query(InternalAuditDB).filter( - InternalAuditDB.audit_id.like(f"IA-{year}-%") - ).count() - audit_id = f"IA-{year}-{existing_count + 1:03d}" - - audit = InternalAuditDB( - id=str(uuid.uuid4()), - audit_id=audit_id, - title=title, - audit_type=audit_type, - scope_description=scope_description, - iso_chapters_covered=iso_chapters_covered, - annex_a_controls_covered=annex_a_controls_covered, - planned_date=planned_date, - lead_auditor=lead_auditor, - status="planned", - ) - self.db.add(audit) - self.db.commit() - self.db.refresh(audit) - return audit - - def get_by_id(self, audit_id: str) -> Optional[InternalAuditDB]: - """Get audit by UUID or audit_id.""" - return self.db.query(InternalAuditDB).filter( - (InternalAuditDB.id == audit_id) | (InternalAuditDB.audit_id == audit_id) - ).first() - - def get_latest_completed(self) -> Optional[InternalAuditDB]: - """Get the most recent completed internal audit.""" - return self.db.query(InternalAuditDB).filter( - InternalAuditDB.status == "completed" - ).order_by(InternalAuditDB.actual_end_date.desc()).first() - - def complete( - self, - audit_id: str, - audit_conclusion: str, - overall_assessment: str, - follow_up_audit_required: bool = False, - ) -> Optional[InternalAuditDB]: - """Complete an internal audit.""" - audit = self.get_by_id(audit_id) - if not audit: - return None - - audit.status = "completed" - audit.actual_end_date = date.today() - audit.report_date = date.today() - audit.audit_conclusion = audit_conclusion - audit.overall_assessment = overall_assessment - audit.follow_up_audit_required = follow_up_audit_required - - self.db.commit() - self.db.refresh(audit) - return audit - - -class AuditTrailRepository: - """Repository for Audit Trail entries.""" - - def __init__(self, db: DBSession): - self.db = db - - def log( - self, - entity_type: str, - entity_id: str, - entity_name: str, - action: str, - performed_by: str, - field_changed: Optional[str] = None, - old_value: Optional[str] = None, - new_value: Optional[str] = None, - change_summary: Optional[str] = None, - ) -> AuditTrailDB: - """Log an audit trail entry.""" - import hashlib - entry = AuditTrailDB( - id=str(uuid.uuid4()), - entity_type=entity_type, - entity_id=entity_id, - entity_name=entity_name, - action=action, - field_changed=field_changed, - old_value=old_value, - new_value=new_value, - change_summary=change_summary, - performed_by=performed_by, - performed_at=datetime.now(timezone.utc), - checksum=hashlib.sha256( - f"{entity_type}|{entity_id}|{action}|{performed_by}".encode() - ).hexdigest(), - ) - self.db.add(entry) - self.db.commit() - self.db.refresh(entry) - return entry - - def get_by_entity( - self, - entity_type: str, - entity_id: str, - limit: int = 100, - ) -> List[AuditTrailDB]: - """Get audit trail for a specific entity.""" - return self.db.query(AuditTrailDB).filter( - AuditTrailDB.entity_type == entity_type, - AuditTrailDB.entity_id == entity_id - ).order_by(AuditTrailDB.performed_at.desc()).limit(limit).all() - - def get_paginated( - self, - page: int = 1, - page_size: int = 50, - entity_type: Optional[str] = None, - entity_id: Optional[str] = None, - performed_by: Optional[str] = None, - action: Optional[str] = None, - ) -> Tuple[List[AuditTrailDB], int]: - """Get paginated audit trail with filters.""" - query = self.db.query(AuditTrailDB) - - if entity_type: - query = query.filter(AuditTrailDB.entity_type == entity_type) - if entity_id: - query = query.filter(AuditTrailDB.entity_id == entity_id) - if performed_by: - query = query.filter(AuditTrailDB.performed_by == performed_by) - if action: - query = query.filter(AuditTrailDB.action == action) - - total = query.count() - entries = query.order_by(AuditTrailDB.performed_at.desc()).offset( - (page - 1) * page_size - ).limit(page_size).all() - - return entries, total - - -class ISMSReadinessCheckRepository: - """Repository for ISMS Readiness Check results.""" - - def __init__(self, db: DBSession): - self.db = db - - def save(self, check: ISMSReadinessCheckDB) -> ISMSReadinessCheckDB: - """Save a readiness check result.""" - self.db.add(check) - self.db.commit() - self.db.refresh(check) - return check - - def get_latest(self) -> Optional[ISMSReadinessCheckDB]: - """Get the most recent readiness check.""" - return self.db.query(ISMSReadinessCheckDB).order_by( - ISMSReadinessCheckDB.check_date.desc() - ).first() - - def get_history(self, limit: int = 10) -> List[ISMSReadinessCheckDB]: - """Get readiness check history.""" - return self.db.query(ISMSReadinessCheckDB).order_by( - ISMSReadinessCheckDB.check_date.desc() - ).limit(limit).all() diff --git a/backend-compliance/compliance/db/regulation_repository.py b/backend-compliance/compliance/db/regulation_repository.py new file mode 100644 index 0000000..57866e4 --- /dev/null +++ b/backend-compliance/compliance/db/regulation_repository.py @@ -0,0 +1,268 @@ +""" +Compliance repositories — extracted from compliance/db/repository.py. + +Phase 1 Step 5: the monolithic repository module is decomposed per +aggregate. Every repository class is re-exported from +``compliance.db.repository`` for backwards compatibility. +""" + +import uuid +from datetime import datetime, date, timezone +from typing import List, Optional, Dict, Any, Tuple + +from sqlalchemy.orm import Session as DBSession, selectinload, joinedload +from sqlalchemy import func, and_, or_ + +from compliance.db.models import ( + RegulationDB, RequirementDB, ControlDB, ControlMappingDB, + EvidenceDB, RiskDB, AuditExportDB, + AuditSessionDB, AuditSignOffDB, AuditResultEnum, AuditSessionStatusEnum, + RegulationTypeEnum, ControlDomainEnum, ControlStatusEnum, + RiskLevelEnum, EvidenceStatusEnum, ExportStatusEnum, + ServiceModuleDB, ModuleRegulationMappingDB, +) + +class RegulationRepository: + """Repository for regulations/standards.""" + + def __init__(self, db: DBSession): + self.db = db + + def create( + self, + code: str, + name: str, + regulation_type: RegulationTypeEnum, + full_name: Optional[str] = None, + source_url: Optional[str] = None, + local_pdf_path: Optional[str] = None, + effective_date: Optional[date] = None, + description: Optional[str] = None, + ) -> RegulationDB: + """Create a new regulation.""" + regulation = RegulationDB( + id=str(uuid.uuid4()), + code=code, + name=name, + full_name=full_name, + regulation_type=regulation_type, + source_url=source_url, + local_pdf_path=local_pdf_path, + effective_date=effective_date, + description=description, + ) + self.db.add(regulation) + self.db.commit() + self.db.refresh(regulation) + return regulation + + def get_by_id(self, regulation_id: str) -> Optional[RegulationDB]: + """Get regulation by ID.""" + return self.db.query(RegulationDB).filter(RegulationDB.id == regulation_id).first() + + def get_by_code(self, code: str) -> Optional[RegulationDB]: + """Get regulation by code (e.g., 'GDPR').""" + return self.db.query(RegulationDB).filter(RegulationDB.code == code).first() + + def get_all( + self, + regulation_type: Optional[RegulationTypeEnum] = None, + is_active: Optional[bool] = True + ) -> List[RegulationDB]: + """Get all regulations with optional filters.""" + query = self.db.query(RegulationDB) + if regulation_type: + query = query.filter(RegulationDB.regulation_type == regulation_type) + if is_active is not None: + query = query.filter(RegulationDB.is_active == is_active) + return query.order_by(RegulationDB.code).all() + + def update(self, regulation_id: str, **kwargs) -> Optional[RegulationDB]: + """Update a regulation.""" + regulation = self.get_by_id(regulation_id) + if not regulation: + return None + for key, value in kwargs.items(): + if hasattr(regulation, key): + setattr(regulation, key, value) + regulation.updated_at = datetime.now(timezone.utc) + self.db.commit() + self.db.refresh(regulation) + return regulation + + def delete(self, regulation_id: str) -> bool: + """Delete a regulation.""" + regulation = self.get_by_id(regulation_id) + if not regulation: + return False + self.db.delete(regulation) + self.db.commit() + return True + + def get_active(self) -> List[RegulationDB]: + """Get all active regulations.""" + return self.get_all(is_active=True) + + def count(self) -> int: + """Count all regulations.""" + return self.db.query(func.count(RegulationDB.id)).scalar() or 0 + + +class RequirementRepository: + """Repository for requirements.""" + + def __init__(self, db: DBSession): + self.db = db + + def create( + self, + regulation_id: str, + article: str, + title: str, + paragraph: Optional[str] = None, + description: Optional[str] = None, + requirement_text: Optional[str] = None, + breakpilot_interpretation: Optional[str] = None, + is_applicable: bool = True, + priority: int = 2, + ) -> RequirementDB: + """Create a new requirement.""" + requirement = RequirementDB( + id=str(uuid.uuid4()), + regulation_id=regulation_id, + article=article, + paragraph=paragraph, + title=title, + description=description, + requirement_text=requirement_text, + breakpilot_interpretation=breakpilot_interpretation, + is_applicable=is_applicable, + priority=priority, + ) + self.db.add(requirement) + self.db.commit() + self.db.refresh(requirement) + return requirement + + def get_by_id(self, requirement_id: str) -> Optional[RequirementDB]: + """Get requirement by ID with eager-loaded relationships.""" + return ( + self.db.query(RequirementDB) + .options( + selectinload(RequirementDB.control_mappings).selectinload(ControlMappingDB.control), + joinedload(RequirementDB.regulation) + ) + .filter(RequirementDB.id == requirement_id) + .first() + ) + + def get_by_regulation( + self, + regulation_id: str, + is_applicable: Optional[bool] = None + ) -> List[RequirementDB]: + """Get all requirements for a regulation with eager-loaded controls.""" + query = ( + self.db.query(RequirementDB) + .options( + selectinload(RequirementDB.control_mappings).selectinload(ControlMappingDB.control), + joinedload(RequirementDB.regulation) + ) + .filter(RequirementDB.regulation_id == regulation_id) + ) + if is_applicable is not None: + query = query.filter(RequirementDB.is_applicable == is_applicable) + return query.order_by(RequirementDB.article, RequirementDB.paragraph).all() + + def get_by_regulation_code(self, code: str) -> List[RequirementDB]: + """Get requirements by regulation code with eager-loaded relationships.""" + return ( + self.db.query(RequirementDB) + .options( + selectinload(RequirementDB.control_mappings).selectinload(ControlMappingDB.control), + joinedload(RequirementDB.regulation) + ) + .join(RegulationDB) + .filter(RegulationDB.code == code) + .order_by(RequirementDB.article, RequirementDB.paragraph) + .all() + ) + + def get_all(self, is_applicable: Optional[bool] = None) -> List[RequirementDB]: + """Get all requirements with optional filter and eager-loading.""" + query = ( + self.db.query(RequirementDB) + .options( + selectinload(RequirementDB.control_mappings).selectinload(ControlMappingDB.control), + joinedload(RequirementDB.regulation) + ) + ) + if is_applicable is not None: + query = query.filter(RequirementDB.is_applicable == is_applicable) + return query.order_by(RequirementDB.article, RequirementDB.paragraph).all() + + def get_paginated( + self, + page: int = 1, + page_size: int = 50, + regulation_code: Optional[str] = None, + status: Optional[str] = None, + is_applicable: Optional[bool] = None, + search: Optional[str] = None, + ) -> Tuple[List[RequirementDB], int]: + """ + Get paginated requirements with eager-loaded relationships. + Returns tuple of (items, total_count). + """ + query = ( + self.db.query(RequirementDB) + .options( + selectinload(RequirementDB.control_mappings).selectinload(ControlMappingDB.control), + joinedload(RequirementDB.regulation) + ) + ) + + # Filters + if regulation_code: + query = query.join(RegulationDB).filter(RegulationDB.code == regulation_code) + if status: + query = query.filter(RequirementDB.implementation_status == status) + if is_applicable is not None: + query = query.filter(RequirementDB.is_applicable == is_applicable) + if search: + search_term = f"%{search}%" + query = query.filter( + or_( + RequirementDB.title.ilike(search_term), + RequirementDB.description.ilike(search_term), + RequirementDB.article.ilike(search_term), + ) + ) + + # Count before pagination + total = query.count() + + # Apply pagination and ordering + items = ( + query + .order_by(RequirementDB.priority.desc(), RequirementDB.article, RequirementDB.paragraph) + .offset((page - 1) * page_size) + .limit(page_size) + .all() + ) + + return items, total + + def delete(self, requirement_id: str) -> bool: + """Delete a requirement.""" + requirement = self.db.query(RequirementDB).filter(RequirementDB.id == requirement_id).first() + if not requirement: + return False + self.db.delete(requirement) + self.db.commit() + return True + + def count(self) -> int: + """Count all requirements.""" + return self.db.query(func.count(RequirementDB.id)).scalar() or 0 + diff --git a/backend-compliance/compliance/db/repository.py b/backend-compliance/compliance/db/repository.py index 0122bb5..39953a6 100644 --- a/backend-compliance/compliance/db/repository.py +++ b/backend-compliance/compliance/db/repository.py @@ -1,1547 +1,37 @@ """ -Repository layer for Compliance module. +compliance.db.repository — backwards-compatibility re-export shim. -Provides CRUD operations and business logic queries for all compliance entities. +Phase 1 Step 5 split the monolithic 1547-line repository module into per-aggregate +sibling modules. Every repository class is re-exported here so existing imports +(``from compliance.db.repository import ControlRepository, ...``) continue to +work unchanged. + +New code SHOULD import directly from the aggregate module: + + from compliance.db.regulation_repository import RegulationRepository, RequirementRepository + from compliance.db.control_repository import ControlRepository, ControlMappingRepository + from compliance.db.evidence_repository import EvidenceRepository + from compliance.db.risk_repository import RiskRepository + from compliance.db.audit_export_repository import AuditExportRepository + from compliance.db.service_module_repository import ServiceModuleRepository + from compliance.db.audit_session_repository import AuditSessionRepository, AuditSignOffRepository + +DO NOT add new classes to this file. """ -from __future__ import annotations -import uuid -from datetime import datetime, date, timezone -from typing import List, Optional, Dict, Any - -from sqlalchemy.orm import Session as DBSession, selectinload, joinedload -from sqlalchemy import func, and_, or_ -from typing import Tuple - -from .models import ( - RegulationDB, RequirementDB, ControlDB, ControlMappingDB, - EvidenceDB, RiskDB, AuditExportDB, - AuditSessionDB, AuditSignOffDB, AuditResultEnum, AuditSessionStatusEnum, - RegulationTypeEnum, ControlDomainEnum, ControlStatusEnum, - RiskLevelEnum, EvidenceStatusEnum, ExportStatusEnum, - ServiceModuleDB, ModuleRegulationMappingDB, +from compliance.db.regulation_repository import ( # noqa: F401 + RegulationRepository, + RequirementRepository, +) +from compliance.db.control_repository import ( # noqa: F401 + ControlRepository, + ControlMappingRepository, +) +from compliance.db.evidence_repository import EvidenceRepository # noqa: F401 +from compliance.db.risk_repository import RiskRepository # noqa: F401 +from compliance.db.audit_export_repository import AuditExportRepository # noqa: F401 +from compliance.db.service_module_repository import ServiceModuleRepository # noqa: F401 +from compliance.db.audit_session_repository import ( # noqa: F401 + AuditSessionRepository, + AuditSignOffRepository, ) - - -class RegulationRepository: - """Repository for regulations/standards.""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - code: str, - name: str, - regulation_type: RegulationTypeEnum, - full_name: Optional[str] = None, - source_url: Optional[str] = None, - local_pdf_path: Optional[str] = None, - effective_date: Optional[date] = None, - description: Optional[str] = None, - ) -> RegulationDB: - """Create a new regulation.""" - regulation = RegulationDB( - id=str(uuid.uuid4()), - code=code, - name=name, - full_name=full_name, - regulation_type=regulation_type, - source_url=source_url, - local_pdf_path=local_pdf_path, - effective_date=effective_date, - description=description, - ) - self.db.add(regulation) - self.db.commit() - self.db.refresh(regulation) - return regulation - - def get_by_id(self, regulation_id: str) -> Optional[RegulationDB]: - """Get regulation by ID.""" - return self.db.query(RegulationDB).filter(RegulationDB.id == regulation_id).first() - - def get_by_code(self, code: str) -> Optional[RegulationDB]: - """Get regulation by code (e.g., 'GDPR').""" - return self.db.query(RegulationDB).filter(RegulationDB.code == code).first() - - def get_all( - self, - regulation_type: Optional[RegulationTypeEnum] = None, - is_active: Optional[bool] = True - ) -> List[RegulationDB]: - """Get all regulations with optional filters.""" - query = self.db.query(RegulationDB) - if regulation_type: - query = query.filter(RegulationDB.regulation_type == regulation_type) - if is_active is not None: - query = query.filter(RegulationDB.is_active == is_active) - return query.order_by(RegulationDB.code).all() - - def update(self, regulation_id: str, **kwargs) -> Optional[RegulationDB]: - """Update a regulation.""" - regulation = self.get_by_id(regulation_id) - if not regulation: - return None - for key, value in kwargs.items(): - if hasattr(regulation, key): - setattr(regulation, key, value) - regulation.updated_at = datetime.now(timezone.utc) - self.db.commit() - self.db.refresh(regulation) - return regulation - - def delete(self, regulation_id: str) -> bool: - """Delete a regulation.""" - regulation = self.get_by_id(regulation_id) - if not regulation: - return False - self.db.delete(regulation) - self.db.commit() - return True - - def get_active(self) -> List[RegulationDB]: - """Get all active regulations.""" - return self.get_all(is_active=True) - - def count(self) -> int: - """Count all regulations.""" - return self.db.query(func.count(RegulationDB.id)).scalar() or 0 - - -class RequirementRepository: - """Repository for requirements.""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - regulation_id: str, - article: str, - title: str, - paragraph: Optional[str] = None, - description: Optional[str] = None, - requirement_text: Optional[str] = None, - breakpilot_interpretation: Optional[str] = None, - is_applicable: bool = True, - priority: int = 2, - ) -> RequirementDB: - """Create a new requirement.""" - requirement = RequirementDB( - id=str(uuid.uuid4()), - regulation_id=regulation_id, - article=article, - paragraph=paragraph, - title=title, - description=description, - requirement_text=requirement_text, - breakpilot_interpretation=breakpilot_interpretation, - is_applicable=is_applicable, - priority=priority, - ) - self.db.add(requirement) - self.db.commit() - self.db.refresh(requirement) - return requirement - - def get_by_id(self, requirement_id: str) -> Optional[RequirementDB]: - """Get requirement by ID with eager-loaded relationships.""" - return ( - self.db.query(RequirementDB) - .options( - selectinload(RequirementDB.control_mappings).selectinload(ControlMappingDB.control), - joinedload(RequirementDB.regulation) - ) - .filter(RequirementDB.id == requirement_id) - .first() - ) - - def get_by_regulation( - self, - regulation_id: str, - is_applicable: Optional[bool] = None - ) -> List[RequirementDB]: - """Get all requirements for a regulation with eager-loaded controls.""" - query = ( - self.db.query(RequirementDB) - .options( - selectinload(RequirementDB.control_mappings).selectinload(ControlMappingDB.control), - joinedload(RequirementDB.regulation) - ) - .filter(RequirementDB.regulation_id == regulation_id) - ) - if is_applicable is not None: - query = query.filter(RequirementDB.is_applicable == is_applicable) - return query.order_by(RequirementDB.article, RequirementDB.paragraph).all() - - def get_by_regulation_code(self, code: str) -> List[RequirementDB]: - """Get requirements by regulation code with eager-loaded relationships.""" - return ( - self.db.query(RequirementDB) - .options( - selectinload(RequirementDB.control_mappings).selectinload(ControlMappingDB.control), - joinedload(RequirementDB.regulation) - ) - .join(RegulationDB) - .filter(RegulationDB.code == code) - .order_by(RequirementDB.article, RequirementDB.paragraph) - .all() - ) - - def get_all(self, is_applicable: Optional[bool] = None) -> List[RequirementDB]: - """Get all requirements with optional filter and eager-loading.""" - query = ( - self.db.query(RequirementDB) - .options( - selectinload(RequirementDB.control_mappings).selectinload(ControlMappingDB.control), - joinedload(RequirementDB.regulation) - ) - ) - if is_applicable is not None: - query = query.filter(RequirementDB.is_applicable == is_applicable) - return query.order_by(RequirementDB.article, RequirementDB.paragraph).all() - - def get_paginated( - self, - page: int = 1, - page_size: int = 50, - regulation_code: Optional[str] = None, - status: Optional[str] = None, - is_applicable: Optional[bool] = None, - search: Optional[str] = None, - ) -> Tuple[List[RequirementDB], int]: - """ - Get paginated requirements with eager-loaded relationships. - Returns tuple of (items, total_count). - """ - query = ( - self.db.query(RequirementDB) - .options( - selectinload(RequirementDB.control_mappings).selectinload(ControlMappingDB.control), - joinedload(RequirementDB.regulation) - ) - ) - - # Filters - if regulation_code: - query = query.join(RegulationDB).filter(RegulationDB.code == regulation_code) - if status: - query = query.filter(RequirementDB.implementation_status == status) - if is_applicable is not None: - query = query.filter(RequirementDB.is_applicable == is_applicable) - if search: - search_term = f"%{search}%" - query = query.filter( - or_( - RequirementDB.title.ilike(search_term), - RequirementDB.description.ilike(search_term), - RequirementDB.article.ilike(search_term), - ) - ) - - # Count before pagination - total = query.count() - - # Apply pagination and ordering - items = ( - query - .order_by(RequirementDB.priority.desc(), RequirementDB.article, RequirementDB.paragraph) - .offset((page - 1) * page_size) - .limit(page_size) - .all() - ) - - return items, total - - def delete(self, requirement_id: str) -> bool: - """Delete a requirement.""" - requirement = self.db.query(RequirementDB).filter(RequirementDB.id == requirement_id).first() - if not requirement: - return False - self.db.delete(requirement) - self.db.commit() - return True - - def count(self) -> int: - """Count all requirements.""" - return self.db.query(func.count(RequirementDB.id)).scalar() or 0 - - -class ControlRepository: - """Repository for controls.""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - control_id: str, - domain: ControlDomainEnum, - control_type: str, - title: str, - pass_criteria: str, - description: Optional[str] = None, - implementation_guidance: Optional[str] = None, - code_reference: Optional[str] = None, - is_automated: bool = False, - automation_tool: Optional[str] = None, - owner: Optional[str] = None, - review_frequency_days: int = 90, - ) -> ControlDB: - """Create a new control.""" - control = ControlDB( - id=str(uuid.uuid4()), - control_id=control_id, - domain=domain, - control_type=control_type, - title=title, - description=description, - pass_criteria=pass_criteria, - implementation_guidance=implementation_guidance, - code_reference=code_reference, - is_automated=is_automated, - automation_tool=automation_tool, - owner=owner, - review_frequency_days=review_frequency_days, - ) - self.db.add(control) - self.db.commit() - self.db.refresh(control) - return control - - def get_by_id(self, control_uuid: str) -> Optional[ControlDB]: - """Get control by UUID with eager-loaded relationships.""" - return ( - self.db.query(ControlDB) - .options( - selectinload(ControlDB.mappings).selectinload(ControlMappingDB.requirement), - selectinload(ControlDB.evidence) - ) - .filter(ControlDB.id == control_uuid) - .first() - ) - - def get_by_control_id(self, control_id: str) -> Optional[ControlDB]: - """Get control by control_id (e.g., 'PRIV-001') with eager-loaded relationships.""" - return ( - self.db.query(ControlDB) - .options( - selectinload(ControlDB.mappings).selectinload(ControlMappingDB.requirement), - selectinload(ControlDB.evidence) - ) - .filter(ControlDB.control_id == control_id) - .first() - ) - - def get_all( - self, - domain: Optional[ControlDomainEnum] = None, - status: Optional[ControlStatusEnum] = None, - is_automated: Optional[bool] = None, - ) -> List[ControlDB]: - """Get all controls with optional filters and eager-loading.""" - query = ( - self.db.query(ControlDB) - .options( - selectinload(ControlDB.mappings), - selectinload(ControlDB.evidence) - ) - ) - if domain: - query = query.filter(ControlDB.domain == domain) - if status: - query = query.filter(ControlDB.status == status) - if is_automated is not None: - query = query.filter(ControlDB.is_automated == is_automated) - return query.order_by(ControlDB.control_id).all() - - def get_paginated( - self, - page: int = 1, - page_size: int = 50, - domain: Optional[ControlDomainEnum] = None, - status: Optional[ControlStatusEnum] = None, - is_automated: Optional[bool] = None, - search: Optional[str] = None, - ) -> Tuple[List[ControlDB], int]: - """ - Get paginated controls with eager-loaded relationships. - Returns tuple of (items, total_count). - """ - query = ( - self.db.query(ControlDB) - .options( - selectinload(ControlDB.mappings), - selectinload(ControlDB.evidence) - ) - ) - - if domain: - query = query.filter(ControlDB.domain == domain) - if status: - query = query.filter(ControlDB.status == status) - if is_automated is not None: - query = query.filter(ControlDB.is_automated == is_automated) - if search: - search_term = f"%{search}%" - query = query.filter( - or_( - ControlDB.title.ilike(search_term), - ControlDB.description.ilike(search_term), - ControlDB.control_id.ilike(search_term), - ) - ) - - total = query.count() - items = ( - query - .order_by(ControlDB.control_id) - .offset((page - 1) * page_size) - .limit(page_size) - .all() - ) - - return items, total - - def get_by_domain(self, domain: ControlDomainEnum) -> List[ControlDB]: - """Get all controls in a domain.""" - return self.get_all(domain=domain) - - def get_by_status(self, status: ControlStatusEnum) -> List[ControlDB]: - """Get all controls with a specific status.""" - return self.get_all(status=status) - - def update_status( - self, - control_id: str, - status: ControlStatusEnum, - status_notes: Optional[str] = None - ) -> Optional[ControlDB]: - """Update control status.""" - control = self.get_by_control_id(control_id) - if not control: - return None - control.status = status - if status_notes: - control.status_notes = status_notes - control.updated_at = datetime.now(timezone.utc) - self.db.commit() - self.db.refresh(control) - return control - - def mark_reviewed(self, control_id: str) -> Optional[ControlDB]: - """Mark control as reviewed.""" - control = self.get_by_control_id(control_id) - if not control: - return None - control.last_reviewed_at = datetime.now(timezone.utc) - from datetime import timedelta - control.next_review_at = datetime.now(timezone.utc) + timedelta(days=control.review_frequency_days) - control.updated_at = datetime.now(timezone.utc) - self.db.commit() - self.db.refresh(control) - return control - - def get_due_for_review(self) -> List[ControlDB]: - """Get controls due for review.""" - return ( - self.db.query(ControlDB) - .filter( - or_( - ControlDB.next_review_at is None, - ControlDB.next_review_at <= datetime.now(timezone.utc) - ) - ) - .order_by(ControlDB.next_review_at) - .all() - ) - - def get_statistics(self) -> Dict[str, Any]: - """Get control statistics by status and domain.""" - total = self.db.query(func.count(ControlDB.id)).scalar() - - by_status = dict( - self.db.query(ControlDB.status, func.count(ControlDB.id)) - .group_by(ControlDB.status) - .all() - ) - - by_domain = dict( - self.db.query(ControlDB.domain, func.count(ControlDB.id)) - .group_by(ControlDB.domain) - .all() - ) - - passed = by_status.get(ControlStatusEnum.PASS, 0) - partial = by_status.get(ControlStatusEnum.PARTIAL, 0) - - score = 0.0 - if total > 0: - score = ((passed + (partial * 0.5)) / total) * 100 - - return { - "total": total, - "by_status": {str(k.value) if k else "none": v for k, v in by_status.items()}, - "by_domain": {str(k.value) if k else "none": v for k, v in by_domain.items()}, - "compliance_score": round(score, 1), - } - - -class ControlMappingRepository: - """Repository for requirement-control mappings.""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - requirement_id: str, - control_id: str, - coverage_level: str = "full", - notes: Optional[str] = None, - ) -> ControlMappingDB: - """Create a mapping.""" - # Get the control UUID from control_id - control = self.db.query(ControlDB).filter(ControlDB.control_id == control_id).first() - if not control: - raise ValueError(f"Control {control_id} not found") - - mapping = ControlMappingDB( - id=str(uuid.uuid4()), - requirement_id=requirement_id, - control_id=control.id, - coverage_level=coverage_level, - notes=notes, - ) - self.db.add(mapping) - self.db.commit() - self.db.refresh(mapping) - return mapping - - def get_by_requirement(self, requirement_id: str) -> List[ControlMappingDB]: - """Get all mappings for a requirement.""" - return ( - self.db.query(ControlMappingDB) - .filter(ControlMappingDB.requirement_id == requirement_id) - .all() - ) - - def get_by_control(self, control_uuid: str) -> List[ControlMappingDB]: - """Get all mappings for a control.""" - return ( - self.db.query(ControlMappingDB) - .filter(ControlMappingDB.control_id == control_uuid) - .all() - ) - - -class EvidenceRepository: - """Repository for evidence.""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - control_id: str, - evidence_type: str, - title: str, - description: Optional[str] = None, - artifact_path: Optional[str] = None, - artifact_url: Optional[str] = None, - artifact_hash: Optional[str] = None, - file_size_bytes: Optional[int] = None, - mime_type: Optional[str] = None, - valid_until: Optional[datetime] = None, - source: str = "manual", - ci_job_id: Optional[str] = None, - uploaded_by: Optional[str] = None, - ) -> EvidenceDB: - """Create evidence record.""" - # Get control UUID - control = self.db.query(ControlDB).filter(ControlDB.control_id == control_id).first() - if not control: - raise ValueError(f"Control {control_id} not found") - - evidence = EvidenceDB( - id=str(uuid.uuid4()), - control_id=control.id, - evidence_type=evidence_type, - title=title, - description=description, - artifact_path=artifact_path, - artifact_url=artifact_url, - artifact_hash=artifact_hash, - file_size_bytes=file_size_bytes, - mime_type=mime_type, - valid_until=valid_until, - source=source, - ci_job_id=ci_job_id, - uploaded_by=uploaded_by, - ) - self.db.add(evidence) - self.db.commit() - self.db.refresh(evidence) - return evidence - - def get_by_id(self, evidence_id: str) -> Optional[EvidenceDB]: - """Get evidence by ID.""" - return self.db.query(EvidenceDB).filter(EvidenceDB.id == evidence_id).first() - - def get_by_control( - self, - control_id: str, - status: Optional[EvidenceStatusEnum] = None - ) -> List[EvidenceDB]: - """Get all evidence for a control.""" - control = self.db.query(ControlDB).filter(ControlDB.control_id == control_id).first() - if not control: - return [] - - query = self.db.query(EvidenceDB).filter(EvidenceDB.control_id == control.id) - if status: - query = query.filter(EvidenceDB.status == status) - return query.order_by(EvidenceDB.collected_at.desc()).all() - - def get_all( - self, - evidence_type: Optional[str] = None, - status: Optional[EvidenceStatusEnum] = None, - limit: int = 100, - ) -> List[EvidenceDB]: - """Get all evidence with filters.""" - query = self.db.query(EvidenceDB) - if evidence_type: - query = query.filter(EvidenceDB.evidence_type == evidence_type) - if status: - query = query.filter(EvidenceDB.status == status) - return query.order_by(EvidenceDB.collected_at.desc()).limit(limit).all() - - def update_status(self, evidence_id: str, status: EvidenceStatusEnum) -> Optional[EvidenceDB]: - """Update evidence status.""" - evidence = self.get_by_id(evidence_id) - if not evidence: - return None - evidence.status = status - evidence.updated_at = datetime.now(timezone.utc) - self.db.commit() - self.db.refresh(evidence) - return evidence - - def get_statistics(self) -> Dict[str, Any]: - """Get evidence statistics.""" - total = self.db.query(func.count(EvidenceDB.id)).scalar() - - by_type = dict( - self.db.query(EvidenceDB.evidence_type, func.count(EvidenceDB.id)) - .group_by(EvidenceDB.evidence_type) - .all() - ) - - by_status = dict( - self.db.query(EvidenceDB.status, func.count(EvidenceDB.id)) - .group_by(EvidenceDB.status) - .all() - ) - - valid = by_status.get(EvidenceStatusEnum.VALID, 0) - coverage = (valid / total * 100) if total > 0 else 0 - - return { - "total": total, - "by_type": by_type, - "by_status": {str(k.value) if k else "none": v for k, v in by_status.items()}, - "coverage_percent": round(coverage, 1), - } - - -class RiskRepository: - """Repository for risks.""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - risk_id: str, - title: str, - category: str, - likelihood: int, - impact: int, - description: Optional[str] = None, - mitigating_controls: Optional[List[str]] = None, - owner: Optional[str] = None, - treatment_plan: Optional[str] = None, - ) -> RiskDB: - """Create a risk.""" - inherent_risk = RiskDB.calculate_risk_level(likelihood, impact) - - risk = RiskDB( - id=str(uuid.uuid4()), - risk_id=risk_id, - title=title, - description=description, - category=category, - likelihood=likelihood, - impact=impact, - inherent_risk=inherent_risk, - mitigating_controls=mitigating_controls or [], - owner=owner, - treatment_plan=treatment_plan, - ) - self.db.add(risk) - self.db.commit() - self.db.refresh(risk) - return risk - - def get_by_id(self, risk_uuid: str) -> Optional[RiskDB]: - """Get risk by UUID.""" - return self.db.query(RiskDB).filter(RiskDB.id == risk_uuid).first() - - def get_by_risk_id(self, risk_id: str) -> Optional[RiskDB]: - """Get risk by risk_id (e.g., 'RISK-001').""" - return self.db.query(RiskDB).filter(RiskDB.risk_id == risk_id).first() - - def get_all( - self, - category: Optional[str] = None, - status: Optional[str] = None, - min_risk_level: Optional[RiskLevelEnum] = None, - ) -> List[RiskDB]: - """Get all risks with filters.""" - query = self.db.query(RiskDB) - if category: - query = query.filter(RiskDB.category == category) - if status: - query = query.filter(RiskDB.status == status) - if min_risk_level: - risk_order = { - RiskLevelEnum.LOW: 1, - RiskLevelEnum.MEDIUM: 2, - RiskLevelEnum.HIGH: 3, - RiskLevelEnum.CRITICAL: 4, - } - min_order = risk_order.get(min_risk_level, 1) - query = query.filter( - RiskDB.inherent_risk.in_( - [k for k, v in risk_order.items() if v >= min_order] - ) - ) - return query.order_by(RiskDB.risk_id).all() - - def update(self, risk_id: str, **kwargs) -> Optional[RiskDB]: - """Update a risk.""" - risk = self.get_by_risk_id(risk_id) - if not risk: - return None - - for key, value in kwargs.items(): - if hasattr(risk, key): - setattr(risk, key, value) - - # Recalculate risk levels if likelihood/impact changed - if 'likelihood' in kwargs or 'impact' in kwargs: - risk.inherent_risk = RiskDB.calculate_risk_level(risk.likelihood, risk.impact) - if 'residual_likelihood' in kwargs or 'residual_impact' in kwargs: - if risk.residual_likelihood and risk.residual_impact: - risk.residual_risk = RiskDB.calculate_risk_level( - risk.residual_likelihood, risk.residual_impact - ) - - risk.updated_at = datetime.now(timezone.utc) - self.db.commit() - self.db.refresh(risk) - return risk - - def get_matrix_data(self) -> Dict[str, Any]: - """Get data for risk matrix visualization.""" - risks = self.get_all() - - matrix = {} - for risk in risks: - key = f"{risk.likelihood}_{risk.impact}" - if key not in matrix: - matrix[key] = [] - matrix[key].append({ - "risk_id": risk.risk_id, - "title": risk.title, - "inherent_risk": risk.inherent_risk.value if risk.inherent_risk else None, - }) - - return { - "matrix": matrix, - "total_risks": len(risks), - "by_level": { - "critical": len([r for r in risks if r.inherent_risk == RiskLevelEnum.CRITICAL]), - "high": len([r for r in risks if r.inherent_risk == RiskLevelEnum.HIGH]), - "medium": len([r for r in risks if r.inherent_risk == RiskLevelEnum.MEDIUM]), - "low": len([r for r in risks if r.inherent_risk == RiskLevelEnum.LOW]), - } - } - - -class AuditExportRepository: - """Repository for audit exports.""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - export_type: str, - requested_by: str, - export_name: Optional[str] = None, - included_regulations: Optional[List[str]] = None, - included_domains: Optional[List[str]] = None, - date_range_start: Optional[date] = None, - date_range_end: Optional[date] = None, - ) -> AuditExportDB: - """Create an export request.""" - export = AuditExportDB( - id=str(uuid.uuid4()), - export_type=export_type, - export_name=export_name or f"audit_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}", - requested_by=requested_by, - included_regulations=included_regulations, - included_domains=included_domains, - date_range_start=date_range_start, - date_range_end=date_range_end, - ) - self.db.add(export) - self.db.commit() - self.db.refresh(export) - return export - - def get_by_id(self, export_id: str) -> Optional[AuditExportDB]: - """Get export by ID.""" - return self.db.query(AuditExportDB).filter(AuditExportDB.id == export_id).first() - - def get_all(self, limit: int = 50) -> List[AuditExportDB]: - """Get all exports.""" - return ( - self.db.query(AuditExportDB) - .order_by(AuditExportDB.requested_at.desc()) - .limit(limit) - .all() - ) - - def update_status( - self, - export_id: str, - status: ExportStatusEnum, - file_path: Optional[str] = None, - file_hash: Optional[str] = None, - file_size_bytes: Optional[int] = None, - error_message: Optional[str] = None, - total_controls: Optional[int] = None, - total_evidence: Optional[int] = None, - compliance_score: Optional[float] = None, - ) -> Optional[AuditExportDB]: - """Update export status.""" - export = self.get_by_id(export_id) - if not export: - return None - - export.status = status - if file_path: - export.file_path = file_path - if file_hash: - export.file_hash = file_hash - if file_size_bytes: - export.file_size_bytes = file_size_bytes - if error_message: - export.error_message = error_message - if total_controls is not None: - export.total_controls = total_controls - if total_evidence is not None: - export.total_evidence = total_evidence - if compliance_score is not None: - export.compliance_score = compliance_score - - if status == ExportStatusEnum.COMPLETED: - export.completed_at = datetime.now(timezone.utc) - - export.updated_at = datetime.now(timezone.utc) - self.db.commit() - self.db.refresh(export) - return export - - -class ServiceModuleRepository: - """Repository for service modules (Sprint 3).""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - name: str, - display_name: str, - service_type: str, - description: Optional[str] = None, - port: Optional[int] = None, - technology_stack: Optional[List[str]] = None, - repository_path: Optional[str] = None, - docker_image: Optional[str] = None, - data_categories: Optional[List[str]] = None, - processes_pii: bool = False, - processes_health_data: bool = False, - ai_components: bool = False, - criticality: str = "medium", - owner_team: Optional[str] = None, - owner_contact: Optional[str] = None, - ) -> "ServiceModuleDB": - """Create a service module.""" - from .models import ServiceModuleDB, ServiceTypeEnum - - module = ServiceModuleDB( - id=str(uuid.uuid4()), - name=name, - display_name=display_name, - description=description, - service_type=ServiceTypeEnum(service_type), - port=port, - technology_stack=technology_stack or [], - repository_path=repository_path, - docker_image=docker_image, - data_categories=data_categories or [], - processes_pii=processes_pii, - processes_health_data=processes_health_data, - ai_components=ai_components, - criticality=criticality, - owner_team=owner_team, - owner_contact=owner_contact, - ) - self.db.add(module) - self.db.commit() - self.db.refresh(module) - return module - - def get_by_id(self, module_id: str) -> Optional["ServiceModuleDB"]: - """Get module by ID.""" - from .models import ServiceModuleDB - return self.db.query(ServiceModuleDB).filter(ServiceModuleDB.id == module_id).first() - - def get_by_name(self, name: str) -> Optional["ServiceModuleDB"]: - """Get module by name.""" - from .models import ServiceModuleDB - return self.db.query(ServiceModuleDB).filter(ServiceModuleDB.name == name).first() - - def get_all( - self, - service_type: Optional[str] = None, - criticality: Optional[str] = None, - processes_pii: Optional[bool] = None, - ai_components: Optional[bool] = None, - ) -> List["ServiceModuleDB"]: - """Get all modules with filters.""" - from .models import ServiceModuleDB, ServiceTypeEnum - - query = self.db.query(ServiceModuleDB).filter(ServiceModuleDB.is_active) - - if service_type: - query = query.filter(ServiceModuleDB.service_type == ServiceTypeEnum(service_type)) - if criticality: - query = query.filter(ServiceModuleDB.criticality == criticality) - if processes_pii is not None: - query = query.filter(ServiceModuleDB.processes_pii == processes_pii) - if ai_components is not None: - query = query.filter(ServiceModuleDB.ai_components == ai_components) - - return query.order_by(ServiceModuleDB.name).all() - - def get_with_regulations(self, module_id: str) -> Optional["ServiceModuleDB"]: - """Get module with regulation mappings loaded.""" - from .models import ServiceModuleDB, ModuleRegulationMappingDB - from sqlalchemy.orm import selectinload - - return ( - self.db.query(ServiceModuleDB) - .options( - selectinload(ServiceModuleDB.regulation_mappings) - .selectinload(ModuleRegulationMappingDB.regulation) - ) - .filter(ServiceModuleDB.id == module_id) - .first() - ) - - def add_regulation_mapping( - self, - module_id: str, - regulation_id: str, - relevance_level: str = "medium", - notes: Optional[str] = None, - applicable_articles: Optional[List[str]] = None, - ) -> "ModuleRegulationMappingDB": - """Add a regulation mapping to a module.""" - from .models import ModuleRegulationMappingDB, RelevanceLevelEnum - - mapping = ModuleRegulationMappingDB( - id=str(uuid.uuid4()), - module_id=module_id, - regulation_id=regulation_id, - relevance_level=RelevanceLevelEnum(relevance_level), - notes=notes, - applicable_articles=applicable_articles, - ) - self.db.add(mapping) - self.db.commit() - self.db.refresh(mapping) - return mapping - - def get_overview(self) -> Dict[str, Any]: - """Get overview statistics for all modules.""" - from .models import ModuleRegulationMappingDB - - modules = self.get_all() - total = len(modules) - - by_type = {} - by_criticality = {} - pii_count = 0 - ai_count = 0 - - for m in modules: - type_key = m.service_type.value if m.service_type else "unknown" - by_type[type_key] = by_type.get(type_key, 0) + 1 - by_criticality[m.criticality] = by_criticality.get(m.criticality, 0) + 1 - if m.processes_pii: - pii_count += 1 - if m.ai_components: - ai_count += 1 - - # Get regulation coverage - regulation_coverage = {} - mappings = self.db.query(ModuleRegulationMappingDB).all() - for mapping in mappings: - reg = mapping.regulation - if reg: - code = reg.code - regulation_coverage[code] = regulation_coverage.get(code, 0) + 1 - - # Calculate average compliance score - scores = [m.compliance_score for m in modules if m.compliance_score is not None] - avg_score = sum(scores) / len(scores) if scores else None - - return { - "total_modules": total, - "modules_by_type": by_type, - "modules_by_criticality": by_criticality, - "modules_processing_pii": pii_count, - "modules_with_ai": ai_count, - "average_compliance_score": round(avg_score, 1) if avg_score else None, - "regulations_coverage": regulation_coverage, - } - - def seed_from_data(self, services_data: List[Dict[str, Any]], force: bool = False) -> Dict[str, int]: - """Seed modules from service_modules.py data.""" - - modules_created = 0 - mappings_created = 0 - - for svc in services_data: - # Check if module exists - existing = self.get_by_name(svc["name"]) - if existing and not force: - continue - - if existing and force: - # Delete existing module (cascades to mappings) - self.db.delete(existing) - self.db.commit() - - # Create module - module = self.create( - name=svc["name"], - display_name=svc["display_name"], - description=svc.get("description"), - service_type=svc["service_type"], - port=svc.get("port"), - technology_stack=svc.get("technology_stack"), - repository_path=svc.get("repository_path"), - docker_image=svc.get("docker_image"), - data_categories=svc.get("data_categories"), - processes_pii=svc.get("processes_pii", False), - processes_health_data=svc.get("processes_health_data", False), - ai_components=svc.get("ai_components", False), - criticality=svc.get("criticality", "medium"), - owner_team=svc.get("owner_team"), - ) - modules_created += 1 - - # Create regulation mappings - for reg_data in svc.get("regulations", []): - # Find regulation by code - reg = self.db.query(RegulationDB).filter( - RegulationDB.code == reg_data["code"] - ).first() - - if reg: - self.add_regulation_mapping( - module_id=module.id, - regulation_id=reg.id, - relevance_level=reg_data.get("relevance", "medium"), - notes=reg_data.get("notes"), - ) - mappings_created += 1 - - return { - "modules_created": modules_created, - "mappings_created": mappings_created, - } - - -class AuditSessionRepository: - """Repository for audit sessions (Sprint 3: Auditor-Verbesserungen).""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - name: str, - auditor_name: str, - description: Optional[str] = None, - auditor_email: Optional[str] = None, - regulation_ids: Optional[List[str]] = None, - ) -> AuditSessionDB: - """Create a new audit session.""" - session = AuditSessionDB( - id=str(uuid.uuid4()), - name=name, - description=description, - auditor_name=auditor_name, - auditor_email=auditor_email, - regulation_ids=regulation_ids, - status=AuditSessionStatusEnum.DRAFT, - ) - self.db.add(session) - self.db.commit() - self.db.refresh(session) - return session - - def get_by_id(self, session_id: str) -> Optional[AuditSessionDB]: - """Get audit session by ID with eager-loaded signoffs.""" - return ( - self.db.query(AuditSessionDB) - .options( - selectinload(AuditSessionDB.signoffs) - .selectinload(AuditSignOffDB.requirement) - ) - .filter(AuditSessionDB.id == session_id) - .first() - ) - - def get_all( - self, - status: Optional[AuditSessionStatusEnum] = None, - limit: int = 50, - ) -> List[AuditSessionDB]: - """Get all audit sessions with optional status filter.""" - query = self.db.query(AuditSessionDB) - if status: - query = query.filter(AuditSessionDB.status == status) - return query.order_by(AuditSessionDB.created_at.desc()).limit(limit).all() - - def update_status( - self, - session_id: str, - status: AuditSessionStatusEnum, - ) -> Optional[AuditSessionDB]: - """Update session status and set appropriate timestamps.""" - session = self.get_by_id(session_id) - if not session: - return None - - session.status = status - if status == AuditSessionStatusEnum.IN_PROGRESS and not session.started_at: - session.started_at = datetime.now(timezone.utc) - elif status == AuditSessionStatusEnum.COMPLETED: - session.completed_at = datetime.now(timezone.utc) - - session.updated_at = datetime.now(timezone.utc) - self.db.commit() - self.db.refresh(session) - return session - - def update_progress( - self, - session_id: str, - total_items: Optional[int] = None, - completed_items: Optional[int] = None, - ) -> Optional[AuditSessionDB]: - """Update session progress counters.""" - session = self.db.query(AuditSessionDB).filter( - AuditSessionDB.id == session_id - ).first() - if not session: - return None - - if total_items is not None: - session.total_items = total_items - if completed_items is not None: - session.completed_items = completed_items - - session.updated_at = datetime.now(timezone.utc) - self.db.commit() - self.db.refresh(session) - return session - - def start_session(self, session_id: str) -> Optional[AuditSessionDB]: - """ - Start an audit session: - - Set status to IN_PROGRESS - - Initialize total_items based on requirements count - """ - session = self.get_by_id(session_id) - if not session: - return None - - # Count requirements for this session - query = self.db.query(func.count(RequirementDB.id)) - if session.regulation_ids: - query = query.join(RegulationDB).filter( - RegulationDB.id.in_(session.regulation_ids) - ) - total_requirements = query.scalar() or 0 - - session.status = AuditSessionStatusEnum.IN_PROGRESS - session.started_at = datetime.now(timezone.utc) - session.total_items = total_requirements - session.updated_at = datetime.now(timezone.utc) - - self.db.commit() - self.db.refresh(session) - return session - - def delete(self, session_id: str) -> bool: - """Delete an audit session (cascades to signoffs).""" - session = self.db.query(AuditSessionDB).filter( - AuditSessionDB.id == session_id - ).first() - if not session: - return False - - self.db.delete(session) - self.db.commit() - return True - - def get_statistics(self, session_id: str) -> Dict[str, Any]: - """Get detailed statistics for an audit session.""" - session = self.get_by_id(session_id) - if not session: - return {} - - signoffs = session.signoffs or [] - - stats = { - "total": session.total_items or 0, - "completed": len([s for s in signoffs if s.result != AuditResultEnum.PENDING]), - "compliant": len([s for s in signoffs if s.result == AuditResultEnum.COMPLIANT]), - "compliant_with_notes": len([s for s in signoffs if s.result == AuditResultEnum.COMPLIANT_WITH_NOTES]), - "non_compliant": len([s for s in signoffs if s.result == AuditResultEnum.NON_COMPLIANT]), - "not_applicable": len([s for s in signoffs if s.result == AuditResultEnum.NOT_APPLICABLE]), - "pending": len([s for s in signoffs if s.result == AuditResultEnum.PENDING]), - "signed": len([s for s in signoffs if s.signature_hash]), - } - - total = stats["total"] if stats["total"] > 0 else 1 - stats["completion_percentage"] = round( - (stats["completed"] / total) * 100, 1 - ) - - return stats - - -class AuditSignOffRepository: - """Repository for audit sign-offs (Sprint 3: Auditor-Verbesserungen).""" - - def __init__(self, db: DBSession): - self.db = db - - def create( - self, - session_id: str, - requirement_id: str, - result: AuditResultEnum = AuditResultEnum.PENDING, - notes: Optional[str] = None, - ) -> AuditSignOffDB: - """Create a new sign-off for a requirement.""" - signoff = AuditSignOffDB( - id=str(uuid.uuid4()), - session_id=session_id, - requirement_id=requirement_id, - result=result, - notes=notes, - ) - self.db.add(signoff) - self.db.commit() - self.db.refresh(signoff) - return signoff - - def get_by_id(self, signoff_id: str) -> Optional[AuditSignOffDB]: - """Get sign-off by ID.""" - return ( - self.db.query(AuditSignOffDB) - .options(joinedload(AuditSignOffDB.requirement)) - .filter(AuditSignOffDB.id == signoff_id) - .first() - ) - - def get_by_session_and_requirement( - self, - session_id: str, - requirement_id: str, - ) -> Optional[AuditSignOffDB]: - """Get sign-off by session and requirement ID.""" - return ( - self.db.query(AuditSignOffDB) - .filter( - and_( - AuditSignOffDB.session_id == session_id, - AuditSignOffDB.requirement_id == requirement_id, - ) - ) - .first() - ) - - def get_by_session( - self, - session_id: str, - result_filter: Optional[AuditResultEnum] = None, - ) -> List[AuditSignOffDB]: - """Get all sign-offs for a session.""" - query = ( - self.db.query(AuditSignOffDB) - .options(joinedload(AuditSignOffDB.requirement)) - .filter(AuditSignOffDB.session_id == session_id) - ) - if result_filter: - query = query.filter(AuditSignOffDB.result == result_filter) - return query.order_by(AuditSignOffDB.created_at).all() - - def update( - self, - signoff_id: str, - result: Optional[AuditResultEnum] = None, - notes: Optional[str] = None, - sign: bool = False, - signed_by: Optional[str] = None, - ) -> Optional[AuditSignOffDB]: - """Update a sign-off with optional digital signature.""" - signoff = self.db.query(AuditSignOffDB).filter( - AuditSignOffDB.id == signoff_id - ).first() - if not signoff: - return None - - if result is not None: - signoff.result = result - if notes is not None: - signoff.notes = notes - - if sign and signed_by: - signoff.create_signature(signed_by) - - signoff.updated_at = datetime.now(timezone.utc) - self.db.commit() - self.db.refresh(signoff) - - # Update session progress - self._update_session_progress(signoff.session_id) - - return signoff - - def sign_off( - self, - session_id: str, - requirement_id: str, - result: AuditResultEnum, - notes: Optional[str] = None, - sign: bool = False, - signed_by: Optional[str] = None, - ) -> AuditSignOffDB: - """ - Create or update a sign-off for a requirement. - This is the main method for auditors to record their findings. - """ - # Check if sign-off already exists - signoff = self.get_by_session_and_requirement(session_id, requirement_id) - - if signoff: - # Update existing - signoff.result = result - if notes is not None: - signoff.notes = notes - if sign and signed_by: - signoff.create_signature(signed_by) - signoff.updated_at = datetime.now(timezone.utc) - else: - # Create new - signoff = AuditSignOffDB( - id=str(uuid.uuid4()), - session_id=session_id, - requirement_id=requirement_id, - result=result, - notes=notes, - ) - if sign and signed_by: - signoff.create_signature(signed_by) - self.db.add(signoff) - - self.db.commit() - self.db.refresh(signoff) - - # Update session progress - self._update_session_progress(session_id) - - return signoff - - def _update_session_progress(self, session_id: str) -> None: - """Update the session's completed_items count.""" - completed = ( - self.db.query(func.count(AuditSignOffDB.id)) - .filter( - and_( - AuditSignOffDB.session_id == session_id, - AuditSignOffDB.result != AuditResultEnum.PENDING, - ) - ) - .scalar() - ) or 0 - - session = self.db.query(AuditSessionDB).filter( - AuditSessionDB.id == session_id - ).first() - if session: - session.completed_items = completed - session.updated_at = datetime.now(timezone.utc) - self.db.commit() - - def get_checklist( - self, - session_id: str, - page: int = 1, - page_size: int = 50, - result_filter: Optional[AuditResultEnum] = None, - regulation_code: Optional[str] = None, - search: Optional[str] = None, - ) -> Tuple[List[Dict[str, Any]], int]: - """ - Get audit checklist items for a session with pagination. - Returns requirements with their sign-off status. - """ - session = self.db.query(AuditSessionDB).filter( - AuditSessionDB.id == session_id - ).first() - if not session: - return [], 0 - - # Base query for requirements - query = ( - self.db.query(RequirementDB) - .options( - joinedload(RequirementDB.regulation), - selectinload(RequirementDB.control_mappings), - ) - ) - - # Filter by session's regulation_ids if set - if session.regulation_ids: - query = query.filter(RequirementDB.regulation_id.in_(session.regulation_ids)) - - # Filter by regulation code - if regulation_code: - query = query.join(RegulationDB).filter(RegulationDB.code == regulation_code) - - # Search - if search: - search_term = f"%{search}%" - query = query.filter( - or_( - RequirementDB.title.ilike(search_term), - RequirementDB.article.ilike(search_term), - ) - ) - - # Get existing sign-offs for this session - signoffs_map = {} - signoffs = ( - self.db.query(AuditSignOffDB) - .filter(AuditSignOffDB.session_id == session_id) - .all() - ) - for s in signoffs: - signoffs_map[s.requirement_id] = s - - # Filter by result if specified - if result_filter: - if result_filter == AuditResultEnum.PENDING: - # Requirements without sign-off or with pending status - signed_req_ids = [ - s.requirement_id for s in signoffs - if s.result != AuditResultEnum.PENDING - ] - if signed_req_ids: - query = query.filter(~RequirementDB.id.in_(signed_req_ids)) - else: - # Requirements with specific result - matching_req_ids = [ - s.requirement_id for s in signoffs - if s.result == result_filter - ] - if matching_req_ids: - query = query.filter(RequirementDB.id.in_(matching_req_ids)) - else: - return [], 0 - - # Count and paginate - total = query.count() - requirements = ( - query - .order_by(RequirementDB.article, RequirementDB.paragraph) - .offset((page - 1) * page_size) - .limit(page_size) - .all() - ) - - # Build checklist items - items = [] - for req in requirements: - signoff = signoffs_map.get(req.id) - items.append({ - "requirement_id": req.id, - "regulation_code": req.regulation.code if req.regulation else None, - "regulation_name": req.regulation.name if req.regulation else None, - "article": req.article, - "paragraph": req.paragraph, - "title": req.title, - "description": req.description, - "current_result": signoff.result.value if signoff else AuditResultEnum.PENDING.value, - "notes": signoff.notes if signoff else None, - "is_signed": bool(signoff.signature_hash) if signoff else False, - "signed_at": signoff.signed_at if signoff else None, - "signed_by": signoff.signed_by if signoff else None, - "evidence_count": len(req.control_mappings) if req.control_mappings else 0, - "controls_mapped": len(req.control_mappings) if req.control_mappings else 0, - }) - - return items, total - - def delete(self, signoff_id: str) -> bool: - """Delete a sign-off.""" - signoff = self.db.query(AuditSignOffDB).filter( - AuditSignOffDB.id == signoff_id - ).first() - if not signoff: - return False - - session_id = signoff.session_id - self.db.delete(signoff) - self.db.commit() - - # Update session progress - self._update_session_progress(session_id) - - return True diff --git a/backend-compliance/compliance/db/risk_repository.py b/backend-compliance/compliance/db/risk_repository.py new file mode 100644 index 0000000..cd2ad91 --- /dev/null +++ b/backend-compliance/compliance/db/risk_repository.py @@ -0,0 +1,148 @@ +""" +Compliance repositories — extracted from compliance/db/repository.py. + +Phase 1 Step 5: the monolithic repository module is decomposed per +aggregate. Every repository class is re-exported from +``compliance.db.repository`` for backwards compatibility. +""" + +import uuid +from datetime import datetime, date, timezone +from typing import List, Optional, Dict, Any, Tuple + +from sqlalchemy.orm import Session as DBSession, selectinload, joinedload +from sqlalchemy import func, and_, or_ + +from compliance.db.models import ( + RegulationDB, RequirementDB, ControlDB, ControlMappingDB, + EvidenceDB, RiskDB, AuditExportDB, + AuditSessionDB, AuditSignOffDB, AuditResultEnum, AuditSessionStatusEnum, + RegulationTypeEnum, ControlDomainEnum, ControlStatusEnum, + RiskLevelEnum, EvidenceStatusEnum, ExportStatusEnum, + ServiceModuleDB, ModuleRegulationMappingDB, +) + +class RiskRepository: + """Repository for risks.""" + + def __init__(self, db: DBSession): + self.db = db + + def create( + self, + risk_id: str, + title: str, + category: str, + likelihood: int, + impact: int, + description: Optional[str] = None, + mitigating_controls: Optional[List[str]] = None, + owner: Optional[str] = None, + treatment_plan: Optional[str] = None, + ) -> RiskDB: + """Create a risk.""" + inherent_risk = RiskDB.calculate_risk_level(likelihood, impact) + + risk = RiskDB( + id=str(uuid.uuid4()), + risk_id=risk_id, + title=title, + description=description, + category=category, + likelihood=likelihood, + impact=impact, + inherent_risk=inherent_risk, + mitigating_controls=mitigating_controls or [], + owner=owner, + treatment_plan=treatment_plan, + ) + self.db.add(risk) + self.db.commit() + self.db.refresh(risk) + return risk + + def get_by_id(self, risk_uuid: str) -> Optional[RiskDB]: + """Get risk by UUID.""" + return self.db.query(RiskDB).filter(RiskDB.id == risk_uuid).first() + + def get_by_risk_id(self, risk_id: str) -> Optional[RiskDB]: + """Get risk by risk_id (e.g., 'RISK-001').""" + return self.db.query(RiskDB).filter(RiskDB.risk_id == risk_id).first() + + def get_all( + self, + category: Optional[str] = None, + status: Optional[str] = None, + min_risk_level: Optional[RiskLevelEnum] = None, + ) -> List[RiskDB]: + """Get all risks with filters.""" + query = self.db.query(RiskDB) + if category: + query = query.filter(RiskDB.category == category) + if status: + query = query.filter(RiskDB.status == status) + if min_risk_level: + risk_order = { + RiskLevelEnum.LOW: 1, + RiskLevelEnum.MEDIUM: 2, + RiskLevelEnum.HIGH: 3, + RiskLevelEnum.CRITICAL: 4, + } + min_order = risk_order.get(min_risk_level, 1) + query = query.filter( + RiskDB.inherent_risk.in_( + [k for k, v in risk_order.items() if v >= min_order] + ) + ) + return query.order_by(RiskDB.risk_id).all() + + def update(self, risk_id: str, **kwargs) -> Optional[RiskDB]: + """Update a risk.""" + risk = self.get_by_risk_id(risk_id) + if not risk: + return None + + for key, value in kwargs.items(): + if hasattr(risk, key): + setattr(risk, key, value) + + # Recalculate risk levels if likelihood/impact changed + if 'likelihood' in kwargs or 'impact' in kwargs: + risk.inherent_risk = RiskDB.calculate_risk_level(risk.likelihood, risk.impact) + if 'residual_likelihood' in kwargs or 'residual_impact' in kwargs: + if risk.residual_likelihood and risk.residual_impact: + risk.residual_risk = RiskDB.calculate_risk_level( + risk.residual_likelihood, risk.residual_impact + ) + + risk.updated_at = datetime.now(timezone.utc) + self.db.commit() + self.db.refresh(risk) + return risk + + def get_matrix_data(self) -> Dict[str, Any]: + """Get data for risk matrix visualization.""" + risks = self.get_all() + + matrix = {} + for risk in risks: + key = f"{risk.likelihood}_{risk.impact}" + if key not in matrix: + matrix[key] = [] + matrix[key].append({ + "risk_id": risk.risk_id, + "title": risk.title, + "inherent_risk": risk.inherent_risk.value if risk.inherent_risk else None, + }) + + return { + "matrix": matrix, + "total_risks": len(risks), + "by_level": { + "critical": len([r for r in risks if r.inherent_risk == RiskLevelEnum.CRITICAL]), + "high": len([r for r in risks if r.inherent_risk == RiskLevelEnum.HIGH]), + "medium": len([r for r in risks if r.inherent_risk == RiskLevelEnum.MEDIUM]), + "low": len([r for r in risks if r.inherent_risk == RiskLevelEnum.LOW]), + } + } + diff --git a/backend-compliance/compliance/db/service_module_repository.py b/backend-compliance/compliance/db/service_module_repository.py new file mode 100644 index 0000000..4e24473 --- /dev/null +++ b/backend-compliance/compliance/db/service_module_repository.py @@ -0,0 +1,247 @@ +""" +Compliance repositories — extracted from compliance/db/repository.py. + +Phase 1 Step 5: the monolithic repository module is decomposed per +aggregate. Every repository class is re-exported from +``compliance.db.repository`` for backwards compatibility. +""" + +import uuid +from datetime import datetime, date, timezone +from typing import List, Optional, Dict, Any, Tuple + +from sqlalchemy.orm import Session as DBSession, selectinload, joinedload +from sqlalchemy import func, and_, or_ + +from compliance.db.models import ( + RegulationDB, RequirementDB, ControlDB, ControlMappingDB, + EvidenceDB, RiskDB, AuditExportDB, + AuditSessionDB, AuditSignOffDB, AuditResultEnum, AuditSessionStatusEnum, + RegulationTypeEnum, ControlDomainEnum, ControlStatusEnum, + RiskLevelEnum, EvidenceStatusEnum, ExportStatusEnum, + ServiceModuleDB, ModuleRegulationMappingDB, +) + +class ServiceModuleRepository: + """Repository for service modules (Sprint 3).""" + + def __init__(self, db: DBSession): + self.db = db + + def create( + self, + name: str, + display_name: str, + service_type: str, + description: Optional[str] = None, + port: Optional[int] = None, + technology_stack: Optional[List[str]] = None, + repository_path: Optional[str] = None, + docker_image: Optional[str] = None, + data_categories: Optional[List[str]] = None, + processes_pii: bool = False, + processes_health_data: bool = False, + ai_components: bool = False, + criticality: str = "medium", + owner_team: Optional[str] = None, + owner_contact: Optional[str] = None, + ) -> "ServiceModuleDB": + """Create a service module.""" + from .models import ServiceModuleDB, ServiceTypeEnum + + module = ServiceModuleDB( + id=str(uuid.uuid4()), + name=name, + display_name=display_name, + description=description, + service_type=ServiceTypeEnum(service_type), + port=port, + technology_stack=technology_stack or [], + repository_path=repository_path, + docker_image=docker_image, + data_categories=data_categories or [], + processes_pii=processes_pii, + processes_health_data=processes_health_data, + ai_components=ai_components, + criticality=criticality, + owner_team=owner_team, + owner_contact=owner_contact, + ) + self.db.add(module) + self.db.commit() + self.db.refresh(module) + return module + + def get_by_id(self, module_id: str) -> Optional["ServiceModuleDB"]: + """Get module by ID.""" + from .models import ServiceModuleDB + return self.db.query(ServiceModuleDB).filter(ServiceModuleDB.id == module_id).first() + + def get_by_name(self, name: str) -> Optional["ServiceModuleDB"]: + """Get module by name.""" + from .models import ServiceModuleDB + return self.db.query(ServiceModuleDB).filter(ServiceModuleDB.name == name).first() + + def get_all( + self, + service_type: Optional[str] = None, + criticality: Optional[str] = None, + processes_pii: Optional[bool] = None, + ai_components: Optional[bool] = None, + ) -> List["ServiceModuleDB"]: + """Get all modules with filters.""" + from .models import ServiceModuleDB, ServiceTypeEnum + + query = self.db.query(ServiceModuleDB).filter(ServiceModuleDB.is_active) + + if service_type: + query = query.filter(ServiceModuleDB.service_type == ServiceTypeEnum(service_type)) + if criticality: + query = query.filter(ServiceModuleDB.criticality == criticality) + if processes_pii is not None: + query = query.filter(ServiceModuleDB.processes_pii == processes_pii) + if ai_components is not None: + query = query.filter(ServiceModuleDB.ai_components == ai_components) + + return query.order_by(ServiceModuleDB.name).all() + + def get_with_regulations(self, module_id: str) -> Optional["ServiceModuleDB"]: + """Get module with regulation mappings loaded.""" + from .models import ServiceModuleDB, ModuleRegulationMappingDB + from sqlalchemy.orm import selectinload + + return ( + self.db.query(ServiceModuleDB) + .options( + selectinload(ServiceModuleDB.regulation_mappings) + .selectinload(ModuleRegulationMappingDB.regulation) + ) + .filter(ServiceModuleDB.id == module_id) + .first() + ) + + def add_regulation_mapping( + self, + module_id: str, + regulation_id: str, + relevance_level: str = "medium", + notes: Optional[str] = None, + applicable_articles: Optional[List[str]] = None, + ) -> "ModuleRegulationMappingDB": + """Add a regulation mapping to a module.""" + from .models import ModuleRegulationMappingDB, RelevanceLevelEnum + + mapping = ModuleRegulationMappingDB( + id=str(uuid.uuid4()), + module_id=module_id, + regulation_id=regulation_id, + relevance_level=RelevanceLevelEnum(relevance_level), + notes=notes, + applicable_articles=applicable_articles, + ) + self.db.add(mapping) + self.db.commit() + self.db.refresh(mapping) + return mapping + + def get_overview(self) -> Dict[str, Any]: + """Get overview statistics for all modules.""" + from .models import ModuleRegulationMappingDB + + modules = self.get_all() + total = len(modules) + + by_type = {} + by_criticality = {} + pii_count = 0 + ai_count = 0 + + for m in modules: + type_key = m.service_type.value if m.service_type else "unknown" + by_type[type_key] = by_type.get(type_key, 0) + 1 + by_criticality[m.criticality] = by_criticality.get(m.criticality, 0) + 1 + if m.processes_pii: + pii_count += 1 + if m.ai_components: + ai_count += 1 + + # Get regulation coverage + regulation_coverage = {} + mappings = self.db.query(ModuleRegulationMappingDB).all() + for mapping in mappings: + reg = mapping.regulation + if reg: + code = reg.code + regulation_coverage[code] = regulation_coverage.get(code, 0) + 1 + + # Calculate average compliance score + scores = [m.compliance_score for m in modules if m.compliance_score is not None] + avg_score = sum(scores) / len(scores) if scores else None + + return { + "total_modules": total, + "modules_by_type": by_type, + "modules_by_criticality": by_criticality, + "modules_processing_pii": pii_count, + "modules_with_ai": ai_count, + "average_compliance_score": round(avg_score, 1) if avg_score else None, + "regulations_coverage": regulation_coverage, + } + + def seed_from_data(self, services_data: List[Dict[str, Any]], force: bool = False) -> Dict[str, int]: + """Seed modules from service_modules.py data.""" + + modules_created = 0 + mappings_created = 0 + + for svc in services_data: + # Check if module exists + existing = self.get_by_name(svc["name"]) + if existing and not force: + continue + + if existing and force: + # Delete existing module (cascades to mappings) + self.db.delete(existing) + self.db.commit() + + # Create module + module = self.create( + name=svc["name"], + display_name=svc["display_name"], + description=svc.get("description"), + service_type=svc["service_type"], + port=svc.get("port"), + technology_stack=svc.get("technology_stack"), + repository_path=svc.get("repository_path"), + docker_image=svc.get("docker_image"), + data_categories=svc.get("data_categories"), + processes_pii=svc.get("processes_pii", False), + processes_health_data=svc.get("processes_health_data", False), + ai_components=svc.get("ai_components", False), + criticality=svc.get("criticality", "medium"), + owner_team=svc.get("owner_team"), + ) + modules_created += 1 + + # Create regulation mappings + for reg_data in svc.get("regulations", []): + # Find regulation by code + reg = self.db.query(RegulationDB).filter( + RegulationDB.code == reg_data["code"] + ).first() + + if reg: + self.add_regulation_mapping( + module_id=module.id, + regulation_id=reg.id, + relevance_level=reg_data.get("relevance", "medium"), + notes=reg_data.get("notes"), + ) + mappings_created += 1 + + return { + "modules_created": modules_created, + "mappings_created": mappings_created, + } +