This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
breakpilot-pwa/backend/compliance/db/isms_repository.py
Benjamin Admin 21a844cb8a fix: Restore all files lost during destructive rebase
A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.

This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).

Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 09:51:32 +01:00

840 lines
29 KiB
Python

"""
Repository layer for ISMS (Information Security Management System) entities.
Provides CRUD operations for ISO 27001 certification-related entities:
- ISMS Scope & Context
- Policies & Objectives
- Statement of Applicability (SoA)
- Audit Findings & CAPA
- Management Reviews & Internal Audits
"""
import uuid
from datetime import datetime, date
from typing import List, Optional, Dict, Any, Tuple
from sqlalchemy.orm import Session as DBSession
from sqlalchemy import func, and_, or_
from .models import (
ISMSScopeDB, ISMSContextDB, ISMSPolicyDB, SecurityObjectiveDB,
StatementOfApplicabilityDB, AuditFindingDB, CorrectiveActionDB,
ManagementReviewDB, InternalAuditDB, AuditTrailDB, ISMSReadinessCheckDB,
ApprovalStatusEnum, FindingTypeEnum, FindingStatusEnum, CAPATypeEnum
)
class ISMSScopeRepository:
    """Repository for ISMS Scope (ISO 27001 Chapter 4.3).

    Thin data-access layer over a SQLAlchemy session for ``ISMSScopeDB`` rows.
    Every mutating method commits the session itself.
    """

    def __init__(self, db: DBSession):
        """Keep the SQLAlchemy session used by all queries of this repository."""
        self.db = db

    @staticmethod
    def _approval_signature(scope_statement: str, approved_by: str, approved_at: datetime) -> str:
        """Return the SHA-256 hex digest of ``statement|approver|ISO timestamp``."""
        import hashlib

        payload = f"{scope_statement}|{approved_by}|{approved_at.isoformat()}"
        return hashlib.sha256(payload.encode()).hexdigest()

    def create(
        self,
        scope_statement: str,
        created_by: str,
        included_locations: Optional[List[str]] = None,
        included_processes: Optional[List[str]] = None,
        included_services: Optional[List[str]] = None,
        excluded_items: Optional[List[str]] = None,
        exclusion_justification: Optional[str] = None,
        organizational_boundary: Optional[str] = None,
        physical_boundary: Optional[str] = None,
        technical_boundary: Optional[str] = None,
    ) -> ISMSScopeDB:
        """Create a new ISMS scope definition in DRAFT status.

        Every stored scope whose status is not SUPERSEDED is marked
        SUPERSEDED first; that change and the insert share one commit.

        Returns:
            The newly stored scope, refreshed from the database.
        """
        # Supersede existing scopes so that only the new one stays current.
        active_scopes = self.db.query(ISMSScopeDB).filter(
            ISMSScopeDB.status != ApprovalStatusEnum.SUPERSEDED
        ).all()
        for previous in active_scopes:
            previous.status = ApprovalStatusEnum.SUPERSEDED

        scope = ISMSScopeDB(
            id=str(uuid.uuid4()),
            scope_statement=scope_statement,
            included_locations=included_locations,
            included_processes=included_processes,
            included_services=included_services,
            excluded_items=excluded_items,
            exclusion_justification=exclusion_justification,
            organizational_boundary=organizational_boundary,
            physical_boundary=physical_boundary,
            technical_boundary=technical_boundary,
            status=ApprovalStatusEnum.DRAFT,
            created_by=created_by,
        )
        self.db.add(scope)
        self.db.commit()
        self.db.refresh(scope)
        return scope

    def get_current(self) -> Optional[ISMSScopeDB]:
        """Get the most recently created scope that is not SUPERSEDED, or None."""
        return self.db.query(ISMSScopeDB).filter(
            ISMSScopeDB.status != ApprovalStatusEnum.SUPERSEDED
        ).order_by(ISMSScopeDB.created_at.desc()).first()

    def get_by_id(self, scope_id: str) -> Optional[ISMSScopeDB]:
        """Get the scope whose ``id`` equals ``scope_id``, or None."""
        return self.db.query(ISMSScopeDB).filter(ISMSScopeDB.id == scope_id).first()

    def approve(
        self,
        scope_id: str,
        approved_by: str,
        effective_date: date,
        review_date: date,
    ) -> Optional[ISMSScopeDB]:
        """Approve the ISMS scope and record an approval signature.

        Args:
            scope_id: ``id`` of the scope to approve.
            approved_by: Identifier of the approver; part of the signature.
            effective_date: Date stored as ``effective_date``.
            review_date: Date stored as ``review_date``.

        Returns:
            The updated scope, or None when no scope has that id.
        """
        scope = self.get_by_id(scope_id)
        if not scope:
            return None

        # Take the timestamp once: the value written to ``approved_at`` must be
        # the same one that goes into the signature, otherwise the digest can
        # never be recomputed from the stored row.
        approved_at = datetime.utcnow()

        scope.status = ApprovalStatusEnum.APPROVED
        scope.approved_by = approved_by
        scope.approved_at = approved_at
        scope.effective_date = effective_date
        scope.review_date = review_date
        scope.approval_signature = self._approval_signature(
            scope.scope_statement, approved_by, approved_at
        )
        self.db.commit()
        self.db.refresh(scope)
        return scope
class ISMSPolicyRepository:
    """Repository for ISMS Policies (ISO 27001 Chapter 5.2).

    Thin data-access layer over a SQLAlchemy session for ``ISMSPolicyDB`` rows.
    Every mutating method commits the session itself.
    """

    def __init__(self, db: DBSession):
        """Keep the SQLAlchemy session used by all queries of this repository."""
        self.db = db

    @staticmethod
    def _add_months(start: date, months: int) -> date:
        """Return ``start`` shifted by ``months`` calendar months.

        The day of month is clamped to the last day of the target month, so
        29 Feb + 12 months is 28 Feb and 31 Aug + 6 months is 28/29 Feb.
        """
        import calendar

        month_index = start.month - 1 + months
        year = start.year + month_index // 12
        month = month_index % 12 + 1
        last_day = calendar.monthrange(year, month)[1]
        return date(year, month, min(start.day, last_day))

    @staticmethod
    def _approval_signature(policy_id: str, approved_by: str, approved_at: datetime) -> str:
        """Return the SHA-256 hex digest of ``policy_id|approver|ISO timestamp``."""
        import hashlib

        payload = f"{policy_id}|{approved_by}|{approved_at.isoformat()}"
        return hashlib.sha256(payload.encode()).hexdigest()

    def create(
        self,
        policy_id: str,
        title: str,
        policy_type: str,
        authored_by: str,
        description: Optional[str] = None,
        policy_text: Optional[str] = None,
        applies_to: Optional[List[str]] = None,
        review_frequency_months: int = 12,
        related_controls: Optional[List[str]] = None,
    ) -> ISMSPolicyDB:
        """Create a new ISMS policy in DRAFT status.

        Returns:
            The newly stored policy, refreshed from the database.
        """
        policy = ISMSPolicyDB(
            id=str(uuid.uuid4()),
            policy_id=policy_id,
            title=title,
            policy_type=policy_type,
            description=description,
            policy_text=policy_text,
            applies_to=applies_to,
            review_frequency_months=review_frequency_months,
            related_controls=related_controls,
            authored_by=authored_by,
            status=ApprovalStatusEnum.DRAFT,
        )
        self.db.add(policy)
        self.db.commit()
        self.db.refresh(policy)
        return policy

    def get_by_id(self, policy_id: str) -> Optional[ISMSPolicyDB]:
        """Get a policy whose ``id`` (UUID) or ``policy_id`` equals the argument."""
        return self.db.query(ISMSPolicyDB).filter(
            (ISMSPolicyDB.id == policy_id) | (ISMSPolicyDB.policy_id == policy_id)
        ).first()

    def get_all(
        self,
        policy_type: Optional[str] = None,
        status: Optional[ApprovalStatusEnum] = None,
    ) -> List[ISMSPolicyDB]:
        """Get all policies ordered by ``policy_id``.

        A filter is applied only when its argument is truthy.
        """
        query = self.db.query(ISMSPolicyDB)
        if policy_type:
            query = query.filter(ISMSPolicyDB.policy_type == policy_type)
        if status:
            query = query.filter(ISMSPolicyDB.status == status)
        return query.order_by(ISMSPolicyDB.policy_id).all()

    def get_master_policy(self) -> Optional[ISMSPolicyDB]:
        """Get the first APPROVED policy whose ``policy_type`` is "master"."""
        return self.db.query(ISMSPolicyDB).filter(
            ISMSPolicyDB.policy_type == "master",
            ISMSPolicyDB.status == ApprovalStatusEnum.APPROVED
        ).first()

    def approve(
        self,
        policy_id: str,
        approved_by: str,
        reviewed_by: str,
        effective_date: date,
    ) -> Optional[ISMSPolicyDB]:
        """Approve a policy and schedule its next review.

        ``next_review_date`` is ``effective_date`` plus the policy's
        ``review_frequency_months``.

        Args:
            policy_id: UUID or human-readable ``policy_id`` of the policy.
            approved_by: Identifier of the approver; part of the signature.
            reviewed_by: Identifier stored as ``reviewed_by``.
            effective_date: Date from which the policy applies.

        Returns:
            The updated policy, or None when no policy matches.
        """
        policy = self.get_by_id(policy_id)
        if not policy:
            return None

        # Take the timestamp once so that ``approved_at`` and the signature
        # input are the same value.
        approved_at = datetime.utcnow()

        policy.status = ApprovalStatusEnum.APPROVED
        policy.reviewed_by = reviewed_by
        policy.approved_by = approved_by
        policy.approved_at = approved_at
        policy.effective_date = effective_date
        # Month-accurate arithmetic: whole-year integer division dropped any
        # remainder (6 months -> +0 years) and date(year + n, 2, 29) raised
        # ValueError for non-leap target years.
        policy.next_review_date = self._add_months(
            effective_date, policy.review_frequency_months
        )
        policy.approval_signature = self._approval_signature(
            policy.policy_id, approved_by, approved_at
        )
        self.db.commit()
        self.db.refresh(policy)
        return policy
class SecurityObjectiveRepository:
    """Repository for Security Objectives (ISO 27001 Chapter 6.2).

    Thin data-access layer over a SQLAlchemy session for
    ``SecurityObjectiveDB`` rows. Every mutating method commits the session.
    """

    def __init__(self, db: DBSession):
        """Keep the SQLAlchemy session used by all queries of this repository."""
        self.db = db

    @staticmethod
    def _progress_percentage(kpi_current: float, kpi_target: float) -> float:
        """Return ``kpi_current`` as a percentage of ``kpi_target``, capped at 100.

        ``kpi_target`` must be non-zero; callers check that before calling.
        """
        return min(100, (kpi_current / kpi_target) * 100)

    def create(
        self,
        objective_id: str,
        title: str,
        description: str,
        category: str,
        owner: str,
        kpi_name: Optional[str] = None,
        kpi_target: Optional[float] = None,
        kpi_unit: Optional[str] = None,
        target_date: Optional[date] = None,
        related_controls: Optional[List[str]] = None,
    ) -> SecurityObjectiveDB:
        """Create a new security objective with status "active".

        Returns:
            The newly stored objective, refreshed from the database.
        """
        objective = SecurityObjectiveDB(
            id=str(uuid.uuid4()),
            objective_id=objective_id,
            title=title,
            description=description,
            category=category,
            kpi_name=kpi_name,
            kpi_target=kpi_target,
            kpi_unit=kpi_unit,
            owner=owner,
            target_date=target_date,
            related_controls=related_controls,
            status="active",
        )
        self.db.add(objective)
        self.db.commit()
        self.db.refresh(objective)
        return objective

    def get_by_id(self, objective_id: str) -> Optional[SecurityObjectiveDB]:
        """Get an objective whose ``id`` (UUID) or ``objective_id`` matches."""
        return self.db.query(SecurityObjectiveDB).filter(
            (SecurityObjectiveDB.id == objective_id) |
            (SecurityObjectiveDB.objective_id == objective_id)
        ).first()

    def get_all(
        self,
        category: Optional[str] = None,
        status: Optional[str] = None,
    ) -> List[SecurityObjectiveDB]:
        """Get all objectives ordered by ``objective_id``.

        A filter is applied only when its argument is truthy.
        """
        query = self.db.query(SecurityObjectiveDB)
        if category:
            query = query.filter(SecurityObjectiveDB.category == category)
        if status:
            query = query.filter(SecurityObjectiveDB.status == status)
        return query.order_by(SecurityObjectiveDB.objective_id).all()

    def update_progress(
        self,
        objective_id: str,
        kpi_current: float,
    ) -> Optional[SecurityObjectiveDB]:
        """Record a new KPI value and derive the progress percentage.

        The percentage is recomputed only when the objective has a truthy
        ``kpi_target`` (a missing or zero target leaves it untouched). An
        "active" objective whose progress reaches 100 is marked "achieved"
        with today's date.

        Returns:
            The updated objective, or None when no objective matches.
        """
        objective = self.get_by_id(objective_id)
        if not objective:
            return None

        objective.kpi_current = kpi_current
        if objective.kpi_target:
            objective.progress_percentage = self._progress_percentage(
                kpi_current, objective.kpi_target
            )

        # Auto-mark as achieved if 100%. The progress value may still be unset
        # when no target exists, so guard before comparing (None >= 100 raises
        # TypeError).
        progress = objective.progress_percentage
        if progress is not None and progress >= 100 and objective.status == "active":
            objective.status = "achieved"
            objective.achieved_date = date.today()

        self.db.commit()
        self.db.refresh(objective)
        return objective
class StatementOfApplicabilityRepository:
    """Repository for Statement of Applicability (SoA).

    Thin data-access layer over a SQLAlchemy session for
    ``StatementOfApplicabilityDB`` rows.
    """

    def __init__(self, db: DBSession):
        """Keep the SQLAlchemy session used by all queries of this repository."""
        self.db = db

    @staticmethod
    def _summarize(entries) -> Dict[str, Any]:
        """Aggregate SoA entries into the statistics dictionary.

        ``entries`` is any sequence of objects exposing ``is_applicable``,
        ``implementation_status`` and ``approved_at``.
        """
        total = len(entries)
        applicable_entries = [e for e in entries if e.is_applicable]
        applicable = len(applicable_entries)
        implemented = sum(1 for e in entries if e.implementation_status == "implemented")
        # Only applicable controls may count towards the rate; otherwise a
        # non-applicable control marked "implemented" pushes it above 100 %.
        implemented_applicable = sum(
            1 for e in applicable_entries if e.implementation_status == "implemented"
        )
        approved = sum(1 for e in entries if e.approved_at)
        return {
            "total": total,
            "applicable": applicable,
            "not_applicable": total - applicable,
            "implemented": implemented,
            "planned": sum(1 for e in entries if e.implementation_status == "planned"),
            "approved": approved,
            "pending_approval": total - approved,
            "implementation_rate": round(
                (implemented_applicable / applicable * 100) if applicable > 0 else 0, 1
            ),
        }

    def create(
        self,
        annex_a_control: str,
        annex_a_title: str,
        annex_a_category: str,
        is_applicable: bool = True,
        applicability_justification: Optional[str] = None,
        implementation_status: str = "planned",
        breakpilot_control_ids: Optional[List[str]] = None,
    ) -> StatementOfApplicabilityDB:
        """Create a new SoA entry.

        A missing ``breakpilot_control_ids`` is stored as an empty list.

        Returns:
            The newly stored entry, refreshed from the database.
        """
        entry = StatementOfApplicabilityDB(
            id=str(uuid.uuid4()),
            annex_a_control=annex_a_control,
            annex_a_title=annex_a_title,
            annex_a_category=annex_a_category,
            is_applicable=is_applicable,
            applicability_justification=applicability_justification,
            implementation_status=implementation_status,
            breakpilot_control_ids=breakpilot_control_ids or [],
        )
        self.db.add(entry)
        self.db.commit()
        self.db.refresh(entry)
        return entry

    def get_by_control(self, annex_a_control: str) -> Optional[StatementOfApplicabilityDB]:
        """Get SoA entry by Annex A control ID (e.g., 'A.5.1')."""
        return self.db.query(StatementOfApplicabilityDB).filter(
            StatementOfApplicabilityDB.annex_a_control == annex_a_control
        ).first()

    def get_all(
        self,
        is_applicable: Optional[bool] = None,
        implementation_status: Optional[str] = None,
        category: Optional[str] = None,
    ) -> List[StatementOfApplicabilityDB]:
        """Get all SoA entries ordered by ``annex_a_control``.

        ``is_applicable`` filters whenever it is not None (so False is a valid
        filter); the string filters apply only when truthy.
        """
        query = self.db.query(StatementOfApplicabilityDB)
        if is_applicable is not None:
            query = query.filter(StatementOfApplicabilityDB.is_applicable == is_applicable)
        if implementation_status:
            query = query.filter(
                StatementOfApplicabilityDB.implementation_status == implementation_status
            )
        if category:
            query = query.filter(StatementOfApplicabilityDB.annex_a_category == category)
        return query.order_by(StatementOfApplicabilityDB.annex_a_control).all()

    def get_statistics(self) -> Dict[str, Any]:
        """Get SoA statistics over all entries.

        Returns:
            Counts for ``total``, ``applicable``, ``not_applicable``,
            ``implemented``, ``planned``, ``approved`` and ``pending_approval``
            plus ``implementation_rate``: the percentage of applicable
            controls that are implemented, rounded to one decimal (0 when
            nothing is applicable).
        """
        return self._summarize(self.get_all())
class AuditFindingRepository:
    """Repository for Audit Findings (Major/Minor/OFI).

    Thin data-access layer over a SQLAlchemy session for ``AuditFindingDB``
    rows. Every mutating method commits the session itself.
    """

    def __init__(self, db: DBSession):
        """Keep the SQLAlchemy session used by all queries of this repository."""
        self.db = db

    @staticmethod
    def _next_sequence(existing_ids, prefix: str) -> int:
        """Return one more than the highest numeric suffix among ``existing_ids``.

        Only identifiers of the form ``prefix + digits`` are considered;
        anything else is ignored. Returns 1 when nothing matches.
        """
        highest = 0
        for identifier in existing_ids:
            if not identifier or not identifier.startswith(prefix):
                continue
            suffix = identifier[len(prefix):]
            if suffix.isdecimal():
                highest = max(highest, int(suffix))
        return highest + 1

    def create(
        self,
        finding_type: FindingTypeEnum,
        title: str,
        description: str,
        auditor: str,
        iso_chapter: Optional[str] = None,
        annex_a_control: Optional[str] = None,
        objective_evidence: Optional[str] = None,
        owner: Optional[str] = None,
        due_date: Optional[date] = None,
        internal_audit_id: Optional[str] = None,
    ) -> AuditFindingDB:
        """Create a new audit finding in OPEN status.

        The human-readable ``finding_id`` has the form ``FIND-<year>-<NNN>``
        where ``<year>`` is the current year and ``<NNN>`` continues after the
        highest number already used for that year.

        Returns:
            The newly stored finding, refreshed from the database.
        """
        # Derive the number from the highest existing suffix rather than the
        # row count: after a deletion, count + 1 would reuse an ID that is
        # still taken.
        prefix = f"FIND-{date.today().year}-"
        rows = self.db.query(AuditFindingDB.finding_id).filter(
            AuditFindingDB.finding_id.like(f"{prefix}%")
        ).all()
        sequence = self._next_sequence((row[0] for row in rows), prefix)
        finding_id = f"{prefix}{sequence:03d}"

        finding = AuditFindingDB(
            id=str(uuid.uuid4()),
            finding_id=finding_id,
            finding_type=finding_type,
            iso_chapter=iso_chapter,
            annex_a_control=annex_a_control,
            title=title,
            description=description,
            objective_evidence=objective_evidence,
            owner=owner,
            auditor=auditor,
            due_date=due_date,
            internal_audit_id=internal_audit_id,
            status=FindingStatusEnum.OPEN,
        )
        self.db.add(finding)
        self.db.commit()
        self.db.refresh(finding)
        return finding

    def get_by_id(self, finding_id: str) -> Optional[AuditFindingDB]:
        """Get a finding whose ``id`` (UUID) or ``finding_id`` matches."""
        return self.db.query(AuditFindingDB).filter(
            (AuditFindingDB.id == finding_id) | (AuditFindingDB.finding_id == finding_id)
        ).first()

    def get_all(
        self,
        finding_type: Optional[FindingTypeEnum] = None,
        status: Optional[FindingStatusEnum] = None,
        internal_audit_id: Optional[str] = None,
    ) -> List[AuditFindingDB]:
        """Get all findings, newest ``identified_date`` first.

        A filter is applied only when its argument is truthy.
        """
        query = self.db.query(AuditFindingDB)
        if finding_type:
            query = query.filter(AuditFindingDB.finding_type == finding_type)
        if status:
            query = query.filter(AuditFindingDB.status == status)
        if internal_audit_id:
            query = query.filter(AuditFindingDB.internal_audit_id == internal_audit_id)
        return query.order_by(AuditFindingDB.identified_date.desc()).all()

    def get_open_majors(self) -> List[AuditFindingDB]:
        """Get all open major findings (blocking certification).

        "Open" means any status other than CLOSED.
        """
        return self.db.query(AuditFindingDB).filter(
            AuditFindingDB.finding_type == FindingTypeEnum.MAJOR,
            AuditFindingDB.status != FindingStatusEnum.CLOSED
        ).all()

    def get_statistics(self) -> Dict[str, Any]:
        """Get finding counts by type and by open/closed state.

        Returns:
            A dict with ``total``, one count per finding type (``major``,
            ``minor``, ``ofi``, ``positive``), ``open`` (status not CLOSED),
            ``closed`` and ``blocking_certification`` (MAJOR and not CLOSED).
        """
        findings = self.get_all()
        return {
            "total": len(findings),
            "major": sum(1 for f in findings if f.finding_type == FindingTypeEnum.MAJOR),
            "minor": sum(1 for f in findings if f.finding_type == FindingTypeEnum.MINOR),
            "ofi": sum(1 for f in findings if f.finding_type == FindingTypeEnum.OFI),
            "positive": sum(1 for f in findings if f.finding_type == FindingTypeEnum.POSITIVE),
            "open": sum(1 for f in findings if f.status != FindingStatusEnum.CLOSED),
            "closed": sum(1 for f in findings if f.status == FindingStatusEnum.CLOSED),
            "blocking_certification": sum(
                1 for f in findings
                if f.finding_type == FindingTypeEnum.MAJOR and f.status != FindingStatusEnum.CLOSED
            ),
        }

    def close(
        self,
        finding_id: str,
        closed_by: str,
        closure_notes: str,
        verification_method: Optional[str] = None,
        verification_evidence: Optional[str] = None,
    ) -> Optional[AuditFindingDB]:
        """Close a finding after verification.

        ``closed_by`` is recorded both as the closer and as the verifier.

        Returns:
            The updated finding, or None when no finding matches.
        """
        finding = self.get_by_id(finding_id)
        if not finding:
            return None

        finding.status = FindingStatusEnum.CLOSED
        finding.closed_date = date.today()
        finding.closed_by = closed_by
        finding.closure_notes = closure_notes
        finding.verification_method = verification_method
        finding.verification_evidence = verification_evidence
        finding.verified_by = closed_by
        finding.verified_at = datetime.utcnow()
        self.db.commit()
        self.db.refresh(finding)
        return finding
class CorrectiveActionRepository:
    """Repository for Corrective/Preventive Actions (CAPA).

    Thin data-access layer over a SQLAlchemy session for
    ``CorrectiveActionDB`` rows; it also advances the status of the related
    ``AuditFindingDB`` row.
    """

    def __init__(self, db: DBSession):
        """Keep the SQLAlchemy session used by all queries of this repository."""
        self.db = db

    @staticmethod
    def _next_sequence(existing_ids, prefix: str) -> int:
        """Return one more than the highest numeric suffix among ``existing_ids``.

        Only identifiers of the form ``prefix + digits`` are considered;
        anything else is ignored. Returns 1 when nothing matches.
        """
        highest = 0
        for identifier in existing_ids:
            if not identifier or not identifier.startswith(prefix):
                continue
            suffix = identifier[len(prefix):]
            if suffix.isdecimal():
                highest = max(highest, int(suffix))
        return highest + 1

    def create(
        self,
        finding_id: str,
        capa_type: CAPATypeEnum,
        title: str,
        description: str,
        assigned_to: str,
        planned_completion: date,
        expected_outcome: Optional[str] = None,
        effectiveness_criteria: Optional[str] = None,
    ) -> CorrectiveActionDB:
        """Create a new CAPA with status "planned".

        The human-readable ``capa_id`` has the form ``CAPA-<year>-<NNN>``
        where ``<NNN>`` continues after the highest number already used for
        the current year. If a finding whose ``id`` equals ``finding_id``
        exists, it is moved to CORRECTIVE_ACTION_PENDING in the same commit.

        Returns:
            The newly stored CAPA, refreshed from the database.
        """
        # Derive the number from the highest existing suffix rather than the
        # row count: after a deletion, count + 1 would reuse an ID that is
        # still taken.
        prefix = f"CAPA-{date.today().year}-"
        rows = self.db.query(CorrectiveActionDB.capa_id).filter(
            CorrectiveActionDB.capa_id.like(f"{prefix}%")
        ).all()
        sequence = self._next_sequence((row[0] for row in rows), prefix)
        capa_id = f"{prefix}{sequence:03d}"

        capa = CorrectiveActionDB(
            id=str(uuid.uuid4()),
            capa_id=capa_id,
            finding_id=finding_id,
            capa_type=capa_type,
            title=title,
            description=description,
            expected_outcome=expected_outcome,
            assigned_to=assigned_to,
            planned_completion=planned_completion,
            effectiveness_criteria=effectiveness_criteria,
            status="planned",
        )
        self.db.add(capa)

        # Update finding status. The lookup matches the finding's UUID only.
        finding = self.db.query(AuditFindingDB).filter(AuditFindingDB.id == finding_id).first()
        if finding:
            finding.status = FindingStatusEnum.CORRECTIVE_ACTION_PENDING

        self.db.commit()
        self.db.refresh(capa)
        return capa

    def get_by_id(self, capa_id: str) -> Optional[CorrectiveActionDB]:
        """Get a CAPA whose ``id`` (UUID) or ``capa_id`` matches."""
        return self.db.query(CorrectiveActionDB).filter(
            (CorrectiveActionDB.id == capa_id) | (CorrectiveActionDB.capa_id == capa_id)
        ).first()

    def get_by_finding(self, finding_id: str) -> List[CorrectiveActionDB]:
        """Get all CAPAs for a finding, ordered by ``planned_completion``."""
        return self.db.query(CorrectiveActionDB).filter(
            CorrectiveActionDB.finding_id == finding_id
        ).order_by(CorrectiveActionDB.planned_completion).all()

    def verify(
        self,
        capa_id: str,
        verified_by: str,
        is_effective: bool,
        effectiveness_notes: Optional[str] = None,
    ) -> Optional[CorrectiveActionDB]:
        """Record the effectiveness verification of a CAPA.

        The CAPA becomes "verified" when effective, otherwise "completed".
        When it is effective and no other CAPA of the same finding is still
        unverified, the finding moves to VERIFICATION_PENDING.

        Returns:
            The updated CAPA, or None when no CAPA matches.
        """
        # NOTE(review): ``verified_by`` is accepted but not stored anywhere in
        # this method -- confirm whether the model has a field for it.
        capa = self.get_by_id(capa_id)
        if not capa:
            return None

        capa.effectiveness_verified = is_effective
        capa.effectiveness_verification_date = date.today()
        capa.effectiveness_notes = effectiveness_notes
        capa.status = "verified" if is_effective else "completed"

        # If verified, check if all CAPAs for finding are verified
        if is_effective:
            finding = self.db.query(AuditFindingDB).filter(
                AuditFindingDB.id == capa.finding_id
            ).first()
            if finding:
                # The current CAPA is excluded by id, so only its siblings
                # are counted here.
                unverified = self.db.query(CorrectiveActionDB).filter(
                    CorrectiveActionDB.finding_id == finding.id,
                    CorrectiveActionDB.id != capa.id,
                    CorrectiveActionDB.status != "verified"
                ).count()
                if unverified == 0:
                    finding.status = FindingStatusEnum.VERIFICATION_PENDING

        self.db.commit()
        self.db.refresh(capa)
        return capa
class ManagementReviewRepository:
    """Data access for Management Reviews (ISO 27001 Chapter 9.3)."""

    def __init__(self, db: DBSession):
        self.db = db

    def create(
        self,
        title: str,
        review_date: date,
        chairperson: str,
        review_period_start: Optional[date] = None,
        review_period_end: Optional[date] = None,
    ) -> ManagementReviewDB:
        """Store a new management review with status "draft".

        The ``review_id`` is built from the year and calendar quarter of
        ``review_date`` (``MR-<year>-Q<quarter>``). When that identifier is
        already taken, the first four characters of a fresh UUID are appended.
        """
        # Months 1-3 -> Q1, 4-6 -> Q2, 7-9 -> Q3, 10-12 -> Q4.
        quarter_number = (review_date.month - 1) // 3 + 1
        candidate_id = f"MR-{review_date.year}-Q{quarter_number}"

        clash = (
            self.db.query(ManagementReviewDB)
            .filter(ManagementReviewDB.review_id == candidate_id)
            .first()
        )
        if clash:
            random_suffix = str(uuid.uuid4())[:4]
            candidate_id = f"{candidate_id}-{random_suffix}"

        record = ManagementReviewDB(
            id=str(uuid.uuid4()),
            review_id=candidate_id,
            title=title,
            review_date=review_date,
            review_period_start=review_period_start,
            review_period_end=review_period_end,
            chairperson=chairperson,
            status="draft",
        )
        session = self.db
        session.add(record)
        session.commit()
        session.refresh(record)
        return record

    def get_by_id(self, review_id: str) -> Optional[ManagementReviewDB]:
        """Look a review up by its UUID or by its ``review_id``."""
        matches_uuid = ManagementReviewDB.id == review_id
        matches_review_id = ManagementReviewDB.review_id == review_id
        return (
            self.db.query(ManagementReviewDB)
            .filter(matches_uuid | matches_review_id)
            .first()
        )

    def get_latest_approved(self) -> Optional[ManagementReviewDB]:
        """Return the "approved" review with the latest ``review_date``, if any."""
        approved_reviews = self.db.query(ManagementReviewDB).filter(
            ManagementReviewDB.status == "approved"
        )
        newest_first = approved_reviews.order_by(ManagementReviewDB.review_date.desc())
        return newest_first.first()

    def approve(
        self,
        review_id: str,
        approved_by: str,
        next_review_date: date,
        minutes_document_path: Optional[str] = None,
    ) -> Optional[ManagementReviewDB]:
        """Mark a review as "approved" and record the approval details.

        Returns the updated review, or None when no review matches
        ``review_id``.
        """
        record = self.get_by_id(review_id)
        if record:
            record.status = "approved"
            record.approved_by = approved_by
            record.approved_at = datetime.utcnow()
            record.next_review_date = next_review_date
            record.minutes_document_path = minutes_document_path
            self.db.commit()
            self.db.refresh(record)
            return record
        return None
class InternalAuditRepository:
    """Repository for Internal Audits (ISO 27001 Chapter 9.2).

    Thin data-access layer over a SQLAlchemy session for ``InternalAuditDB``
    rows. Every mutating method commits the session itself.
    """

    def __init__(self, db: DBSession):
        """Keep the SQLAlchemy session used by all queries of this repository."""
        self.db = db

    @staticmethod
    def _next_sequence(existing_ids, prefix: str) -> int:
        """Return one more than the highest numeric suffix among ``existing_ids``.

        Only identifiers of the form ``prefix + digits`` are considered;
        anything else is ignored. Returns 1 when nothing matches.
        """
        highest = 0
        for identifier in existing_ids:
            if not identifier or not identifier.startswith(prefix):
                continue
            suffix = identifier[len(prefix):]
            if suffix.isdecimal():
                highest = max(highest, int(suffix))
        return highest + 1

    def create(
        self,
        title: str,
        audit_type: str,
        planned_date: date,
        lead_auditor: str,
        scope_description: Optional[str] = None,
        iso_chapters_covered: Optional[List[str]] = None,
        annex_a_controls_covered: Optional[List[str]] = None,
    ) -> InternalAuditDB:
        """Create a new internal audit with status "planned".

        The human-readable ``audit_id`` has the form ``IA-<year>-<NNN>`` where
        ``<year>`` is the year of ``planned_date`` and ``<NNN>`` continues
        after the highest number already used for that year.

        Returns:
            The newly stored audit, refreshed from the database.
        """
        # Derive the number from the highest existing suffix rather than the
        # row count: after a deletion, count + 1 would reuse an ID that is
        # still taken.
        prefix = f"IA-{planned_date.year}-"
        rows = self.db.query(InternalAuditDB.audit_id).filter(
            InternalAuditDB.audit_id.like(f"{prefix}%")
        ).all()
        sequence = self._next_sequence((row[0] for row in rows), prefix)
        audit_id = f"{prefix}{sequence:03d}"

        audit = InternalAuditDB(
            id=str(uuid.uuid4()),
            audit_id=audit_id,
            title=title,
            audit_type=audit_type,
            scope_description=scope_description,
            iso_chapters_covered=iso_chapters_covered,
            annex_a_controls_covered=annex_a_controls_covered,
            planned_date=planned_date,
            lead_auditor=lead_auditor,
            status="planned",
        )
        self.db.add(audit)
        self.db.commit()
        self.db.refresh(audit)
        return audit

    def get_by_id(self, audit_id: str) -> Optional[InternalAuditDB]:
        """Get an audit whose ``id`` (UUID) or ``audit_id`` matches."""
        return self.db.query(InternalAuditDB).filter(
            (InternalAuditDB.id == audit_id) | (InternalAuditDB.audit_id == audit_id)
        ).first()

    def get_latest_completed(self) -> Optional[InternalAuditDB]:
        """Get the "completed" audit with the latest ``actual_end_date``, or None."""
        return self.db.query(InternalAuditDB).filter(
            InternalAuditDB.status == "completed"
        ).order_by(InternalAuditDB.actual_end_date.desc()).first()

    def complete(
        self,
        audit_id: str,
        audit_conclusion: str,
        overall_assessment: str,
        follow_up_audit_required: bool = False,
    ) -> Optional[InternalAuditDB]:
        """Complete an internal audit.

        Sets the status to "completed" and stamps both ``actual_end_date`` and
        ``report_date`` with today's date.

        Returns:
            The updated audit, or None when no audit matches.
        """
        audit = self.get_by_id(audit_id)
        if not audit:
            return None

        today = date.today()
        audit.status = "completed"
        audit.actual_end_date = today
        audit.report_date = today
        audit.audit_conclusion = audit_conclusion
        audit.overall_assessment = overall_assessment
        audit.follow_up_audit_required = follow_up_audit_required
        self.db.commit()
        self.db.refresh(audit)
        return audit
class AuditTrailRepository:
    """Repository for Audit Trail entries.

    Thin data-access layer over a SQLAlchemy session for ``AuditTrailDB``
    rows: appends entries and reads them back, newest first.
    """

    def __init__(self, db: DBSession):
        """Keep the SQLAlchemy session used by all queries of this repository."""
        self.db = db

    @staticmethod
    def _page_window(page: int, page_size: int) -> Tuple[int, int]:
        """Translate a 1-based page number into a non-negative ``(offset, limit)``.

        Pages below 1 are treated as page 1 and a negative ``page_size`` as 0,
        so the query never receives a negative OFFSET or LIMIT.
        """
        limit = max(page_size, 0)
        offset = (max(page, 1) - 1) * limit
        return offset, limit

    def log(
        self,
        entity_type: str,
        entity_id: str,
        entity_name: str,
        action: str,
        performed_by: str,
        field_changed: Optional[str] = None,
        old_value: Optional[str] = None,
        new_value: Optional[str] = None,
        change_summary: Optional[str] = None,
    ) -> AuditTrailDB:
        """Log an audit trail entry and commit it.

        The stored ``checksum`` is the SHA-256 hex digest of
        ``entity_type|entity_id|action|performed_by``.

        Returns:
            The newly stored entry, refreshed from the database.
        """
        import hashlib

        # NOTE(review): the checksum covers neither the timestamp nor the
        # old/new values, so repeated identical actions share one checksum.
        # The format is left unchanged because stored checksums depend on it.
        checksum = hashlib.sha256(
            f"{entity_type}|{entity_id}|{action}|{performed_by}".encode()
        ).hexdigest()

        entry = AuditTrailDB(
            id=str(uuid.uuid4()),
            entity_type=entity_type,
            entity_id=entity_id,
            entity_name=entity_name,
            action=action,
            field_changed=field_changed,
            old_value=old_value,
            new_value=new_value,
            change_summary=change_summary,
            performed_by=performed_by,
            performed_at=datetime.utcnow(),
            checksum=checksum,
        )
        self.db.add(entry)
        self.db.commit()
        self.db.refresh(entry)
        return entry

    def get_by_entity(
        self,
        entity_type: str,
        entity_id: str,
        limit: int = 100,
    ) -> List[AuditTrailDB]:
        """Get the newest ``limit`` audit trail entries for a specific entity."""
        return self.db.query(AuditTrailDB).filter(
            AuditTrailDB.entity_type == entity_type,
            AuditTrailDB.entity_id == entity_id
        ).order_by(AuditTrailDB.performed_at.desc()).limit(limit).all()

    def get_paginated(
        self,
        page: int = 1,
        page_size: int = 50,
        entity_type: Optional[str] = None,
        entity_id: Optional[str] = None,
        performed_by: Optional[str] = None,
        action: Optional[str] = None,
    ) -> Tuple[List[AuditTrailDB], int]:
        """Get one page of the audit trail, newest first, with optional filters.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Entries per page; negative values are treated as 0.
            entity_type: Exact-match filter, applied only when truthy.
            entity_id: Exact-match filter, applied only when truthy.
            performed_by: Exact-match filter, applied only when truthy.
            action: Exact-match filter, applied only when truthy.

        Returns:
            ``(entries, total)`` where ``total`` counts every row matching the
            filters, not just the rows on this page.
        """
        query = self.db.query(AuditTrailDB)
        if entity_type:
            query = query.filter(AuditTrailDB.entity_type == entity_type)
        if entity_id:
            query = query.filter(AuditTrailDB.entity_id == entity_id)
        if performed_by:
            query = query.filter(AuditTrailDB.performed_by == performed_by)
        if action:
            query = query.filter(AuditTrailDB.action == action)

        total = query.count()
        offset, limit = self._page_window(page, page_size)
        entries = query.order_by(AuditTrailDB.performed_at.desc()).offset(
            offset
        ).limit(limit).all()
        return entries, total
class ISMSReadinessCheckRepository:
    """Data access for stored ISMS Readiness Check results."""

    def __init__(self, db: DBSession):
        self.db = db

    def save(self, check: ISMSReadinessCheckDB) -> ISMSReadinessCheckDB:
        """Persist ``check``, commit, and return it refreshed from the database."""
        session = self.db
        session.add(check)
        session.commit()
        session.refresh(check)
        return check

    def get_latest(self) -> Optional[ISMSReadinessCheckDB]:
        """Return the check with the latest ``check_date``, or None if none exist."""
        newest_first = self.db.query(ISMSReadinessCheckDB).order_by(
            ISMSReadinessCheckDB.check_date.desc()
        )
        return newest_first.first()

    def get_history(self, limit: int = 10) -> List[ISMSReadinessCheckDB]:
        """Return up to ``limit`` checks, latest ``check_date`` first."""
        newest_first = self.db.query(ISMSReadinessCheckDB).order_by(
            ISMSReadinessCheckDB.check_date.desc()
        )
        return newest_first.limit(limit).all()