refactor(backend/db): split repository.py + isms_repository.py per-aggregate

Phase 1 Step 5 of PHASE1_RUNBOOK.md.

compliance/db/repository.py (1547 LOC) decomposed into seven sibling
per-aggregate repository modules:

  regulation_repository.py     (268) — Regulation + Requirement
  control_repository.py        (291) — Control + ControlMapping
  evidence_repository.py       (143)
  risk_repository.py           (148)
  audit_export_repository.py   (110)
  service_module_repository.py (247)
  audit_session_repository.py  (478) — AuditSession + AuditSignOff

compliance/db/isms_repository.py (838 LOC) decomposed into two
sub-aggregate modules mirroring the models split:

  isms_governance_repository.py (354) — Scope, Policy, Objective, SoA
  isms_audit_repository.py      (499) — Finding, CAPA, Review, Internal Audit,
                                         Trail, Readiness

Both original files become thin re-export shims (37 and 25 LOC
respectively) so every existing import continues to work unchanged.
New code SHOULD import from the aggregate module directly.

All new sibling files under the 500-line hard cap; largest is
isms_audit_repository.py at 499 (on the edge; when Phase 1 Step 4
router->service extraction lands, the audit_session repo may split
further if growth exceeds 500).

Verified:
  - 173/173 pytest compliance/tests/ tests/contracts/ pass
  - OpenAPI 360 paths / 484 operations unchanged
  - All repo files under 500 LOC

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sharang Parnerkar
2026-04-07 18:08:39 +02:00
parent d9dcfb97ef
commit 482e8574ad
11 changed files with 2590 additions and 2375 deletions

View File

@@ -1,838 +1,25 @@
"""
Repository layer for ISMS (Information Security Management System) entities.
compliance.db.isms_repository — backwards-compatibility re-export shim.
Provides CRUD operations for ISO 27001 certification-related entities:
- ISMS Scope & Context
- Policies & Objectives
- Statement of Applicability (SoA)
- Audit Findings & CAPA
- Management Reviews & Internal Audits
Phase 1 Step 5 split the 838-line ISMS repository module into two
sub-aggregate sibling modules: governance (scope, policy, objective, SoA)
and audit execution (finding, CAPA, review, internal audit, trail, readiness).
Every repository class is re-exported so existing imports continue to work.
New code SHOULD import from the sub-aggregate module directly.
"""
import uuid
from datetime import datetime, date, timezone
from typing import List, Optional, Dict, Any, Tuple
from sqlalchemy.orm import Session as DBSession
from .models import (
ISMSScopeDB, ISMSPolicyDB, SecurityObjectiveDB,
StatementOfApplicabilityDB, AuditFindingDB, CorrectiveActionDB,
ManagementReviewDB, InternalAuditDB, AuditTrailDB, ISMSReadinessCheckDB,
ApprovalStatusEnum, FindingTypeEnum, FindingStatusEnum, CAPATypeEnum
from compliance.db.isms_governance_repository import ( # noqa: F401
ISMSScopeRepository,
ISMSPolicyRepository,
SecurityObjectiveRepository,
StatementOfApplicabilityRepository,
)
from compliance.db.isms_audit_repository import ( # noqa: F401
AuditFindingRepository,
CorrectiveActionRepository,
ManagementReviewRepository,
InternalAuditRepository,
AuditTrailRepository,
ISMSReadinessCheckRepository,
)
class ISMSScopeRepository:
"""Repository for ISMS Scope (ISO 27001 Chapter 4.3)."""
def __init__(self, db: DBSession):
self.db = db
def create(
self,
scope_statement: str,
created_by: str,
included_locations: Optional[List[str]] = None,
included_processes: Optional[List[str]] = None,
included_services: Optional[List[str]] = None,
excluded_items: Optional[List[str]] = None,
exclusion_justification: Optional[str] = None,
organizational_boundary: Optional[str] = None,
physical_boundary: Optional[str] = None,
technical_boundary: Optional[str] = None,
) -> ISMSScopeDB:
"""Create a new ISMS scope definition."""
# Supersede existing scopes
existing = self.db.query(ISMSScopeDB).filter(
ISMSScopeDB.status != ApprovalStatusEnum.SUPERSEDED
).all()
for s in existing:
s.status = ApprovalStatusEnum.SUPERSEDED
scope = ISMSScopeDB(
id=str(uuid.uuid4()),
scope_statement=scope_statement,
included_locations=included_locations,
included_processes=included_processes,
included_services=included_services,
excluded_items=excluded_items,
exclusion_justification=exclusion_justification,
organizational_boundary=organizational_boundary,
physical_boundary=physical_boundary,
technical_boundary=technical_boundary,
status=ApprovalStatusEnum.DRAFT,
created_by=created_by,
)
self.db.add(scope)
self.db.commit()
self.db.refresh(scope)
return scope
def get_current(self) -> Optional[ISMSScopeDB]:
"""Get the current (non-superseded) ISMS scope."""
return self.db.query(ISMSScopeDB).filter(
ISMSScopeDB.status != ApprovalStatusEnum.SUPERSEDED
).order_by(ISMSScopeDB.created_at.desc()).first()
def get_by_id(self, scope_id: str) -> Optional[ISMSScopeDB]:
"""Get scope by ID."""
return self.db.query(ISMSScopeDB).filter(ISMSScopeDB.id == scope_id).first()
def approve(
self,
scope_id: str,
approved_by: str,
effective_date: date,
review_date: date,
) -> Optional[ISMSScopeDB]:
"""Approve the ISMS scope."""
scope = self.get_by_id(scope_id)
if not scope:
return None
import hashlib
scope.status = ApprovalStatusEnum.APPROVED
scope.approved_by = approved_by
scope.approved_at = datetime.now(timezone.utc)
scope.effective_date = effective_date
scope.review_date = review_date
scope.approval_signature = hashlib.sha256(
f"{scope.scope_statement}|{approved_by}|{datetime.now(timezone.utc).isoformat()}".encode()
).hexdigest()
self.db.commit()
self.db.refresh(scope)
return scope
class ISMSPolicyRepository:
"""Repository for ISMS Policies (ISO 27001 Chapter 5.2)."""
def __init__(self, db: DBSession):
self.db = db
def create(
self,
policy_id: str,
title: str,
policy_type: str,
authored_by: str,
description: Optional[str] = None,
policy_text: Optional[str] = None,
applies_to: Optional[List[str]] = None,
review_frequency_months: int = 12,
related_controls: Optional[List[str]] = None,
) -> ISMSPolicyDB:
"""Create a new ISMS policy."""
policy = ISMSPolicyDB(
id=str(uuid.uuid4()),
policy_id=policy_id,
title=title,
policy_type=policy_type,
description=description,
policy_text=policy_text,
applies_to=applies_to,
review_frequency_months=review_frequency_months,
related_controls=related_controls,
authored_by=authored_by,
status=ApprovalStatusEnum.DRAFT,
)
self.db.add(policy)
self.db.commit()
self.db.refresh(policy)
return policy
def get_by_id(self, policy_id: str) -> Optional[ISMSPolicyDB]:
"""Get policy by UUID or policy_id."""
return self.db.query(ISMSPolicyDB).filter(
(ISMSPolicyDB.id == policy_id) | (ISMSPolicyDB.policy_id == policy_id)
).first()
def get_all(
self,
policy_type: Optional[str] = None,
status: Optional[ApprovalStatusEnum] = None,
) -> List[ISMSPolicyDB]:
"""Get all policies with optional filters."""
query = self.db.query(ISMSPolicyDB)
if policy_type:
query = query.filter(ISMSPolicyDB.policy_type == policy_type)
if status:
query = query.filter(ISMSPolicyDB.status == status)
return query.order_by(ISMSPolicyDB.policy_id).all()
def get_master_policy(self) -> Optional[ISMSPolicyDB]:
"""Get the approved master ISMS policy."""
return self.db.query(ISMSPolicyDB).filter(
ISMSPolicyDB.policy_type == "master",
ISMSPolicyDB.status == ApprovalStatusEnum.APPROVED
).first()
def approve(
self,
policy_id: str,
approved_by: str,
reviewed_by: str,
effective_date: date,
) -> Optional[ISMSPolicyDB]:
"""Approve a policy."""
policy = self.get_by_id(policy_id)
if not policy:
return None
import hashlib
policy.status = ApprovalStatusEnum.APPROVED
policy.reviewed_by = reviewed_by
policy.approved_by = approved_by
policy.approved_at = datetime.now(timezone.utc)
policy.effective_date = effective_date
policy.next_review_date = date(
effective_date.year + (policy.review_frequency_months // 12),
effective_date.month,
effective_date.day
)
policy.approval_signature = hashlib.sha256(
f"{policy.policy_id}|{approved_by}|{datetime.now(timezone.utc).isoformat()}".encode()
).hexdigest()
self.db.commit()
self.db.refresh(policy)
return policy
class SecurityObjectiveRepository:
"""Repository for Security Objectives (ISO 27001 Chapter 6.2)."""
def __init__(self, db: DBSession):
self.db = db
def create(
self,
objective_id: str,
title: str,
description: str,
category: str,
owner: str,
kpi_name: Optional[str] = None,
kpi_target: Optional[float] = None,
kpi_unit: Optional[str] = None,
target_date: Optional[date] = None,
related_controls: Optional[List[str]] = None,
) -> SecurityObjectiveDB:
"""Create a new security objective."""
objective = SecurityObjectiveDB(
id=str(uuid.uuid4()),
objective_id=objective_id,
title=title,
description=description,
category=category,
kpi_name=kpi_name,
kpi_target=kpi_target,
kpi_unit=kpi_unit,
owner=owner,
target_date=target_date,
related_controls=related_controls,
status="active",
)
self.db.add(objective)
self.db.commit()
self.db.refresh(objective)
return objective
def get_by_id(self, objective_id: str) -> Optional[SecurityObjectiveDB]:
"""Get objective by UUID or objective_id."""
return self.db.query(SecurityObjectiveDB).filter(
(SecurityObjectiveDB.id == objective_id) |
(SecurityObjectiveDB.objective_id == objective_id)
).first()
def get_all(
self,
category: Optional[str] = None,
status: Optional[str] = None,
) -> List[SecurityObjectiveDB]:
"""Get all objectives with optional filters."""
query = self.db.query(SecurityObjectiveDB)
if category:
query = query.filter(SecurityObjectiveDB.category == category)
if status:
query = query.filter(SecurityObjectiveDB.status == status)
return query.order_by(SecurityObjectiveDB.objective_id).all()
def update_progress(
self,
objective_id: str,
kpi_current: float,
) -> Optional[SecurityObjectiveDB]:
"""Update objective progress."""
objective = self.get_by_id(objective_id)
if not objective:
return None
objective.kpi_current = kpi_current
if objective.kpi_target:
objective.progress_percentage = min(100, (kpi_current / objective.kpi_target) * 100)
# Auto-mark as achieved if 100%
if objective.progress_percentage >= 100 and objective.status == "active":
objective.status = "achieved"
objective.achieved_date = date.today()
self.db.commit()
self.db.refresh(objective)
return objective
class StatementOfApplicabilityRepository:
"""Repository for Statement of Applicability (SoA)."""
def __init__(self, db: DBSession):
self.db = db
def create(
self,
annex_a_control: str,
annex_a_title: str,
annex_a_category: str,
is_applicable: bool = True,
applicability_justification: Optional[str] = None,
implementation_status: str = "planned",
breakpilot_control_ids: Optional[List[str]] = None,
) -> StatementOfApplicabilityDB:
"""Create a new SoA entry."""
entry = StatementOfApplicabilityDB(
id=str(uuid.uuid4()),
annex_a_control=annex_a_control,
annex_a_title=annex_a_title,
annex_a_category=annex_a_category,
is_applicable=is_applicable,
applicability_justification=applicability_justification,
implementation_status=implementation_status,
breakpilot_control_ids=breakpilot_control_ids or [],
)
self.db.add(entry)
self.db.commit()
self.db.refresh(entry)
return entry
def get_by_control(self, annex_a_control: str) -> Optional[StatementOfApplicabilityDB]:
"""Get SoA entry by Annex A control ID (e.g., 'A.5.1')."""
return self.db.query(StatementOfApplicabilityDB).filter(
StatementOfApplicabilityDB.annex_a_control == annex_a_control
).first()
def get_all(
self,
is_applicable: Optional[bool] = None,
implementation_status: Optional[str] = None,
category: Optional[str] = None,
) -> List[StatementOfApplicabilityDB]:
"""Get all SoA entries with optional filters."""
query = self.db.query(StatementOfApplicabilityDB)
if is_applicable is not None:
query = query.filter(StatementOfApplicabilityDB.is_applicable == is_applicable)
if implementation_status:
query = query.filter(StatementOfApplicabilityDB.implementation_status == implementation_status)
if category:
query = query.filter(StatementOfApplicabilityDB.annex_a_category == category)
return query.order_by(StatementOfApplicabilityDB.annex_a_control).all()
def get_statistics(self) -> Dict[str, Any]:
"""Get SoA statistics."""
entries = self.get_all()
total = len(entries)
applicable = sum(1 for e in entries if e.is_applicable)
implemented = sum(1 for e in entries if e.implementation_status == "implemented")
approved = sum(1 for e in entries if e.approved_at)
return {
"total": total,
"applicable": applicable,
"not_applicable": total - applicable,
"implemented": implemented,
"planned": sum(1 for e in entries if e.implementation_status == "planned"),
"approved": approved,
"pending_approval": total - approved,
"implementation_rate": round((implemented / applicable * 100) if applicable > 0 else 0, 1),
}
class AuditFindingRepository:
"""Repository for Audit Findings (Major/Minor/OFI)."""
def __init__(self, db: DBSession):
self.db = db
def create(
self,
finding_type: FindingTypeEnum,
title: str,
description: str,
auditor: str,
iso_chapter: Optional[str] = None,
annex_a_control: Optional[str] = None,
objective_evidence: Optional[str] = None,
owner: Optional[str] = None,
due_date: Optional[date] = None,
internal_audit_id: Optional[str] = None,
) -> AuditFindingDB:
"""Create a new audit finding."""
# Generate finding ID
year = date.today().year
existing_count = self.db.query(AuditFindingDB).filter(
AuditFindingDB.finding_id.like(f"FIND-{year}-%")
).count()
finding_id = f"FIND-{year}-{existing_count + 1:03d}"
finding = AuditFindingDB(
id=str(uuid.uuid4()),
finding_id=finding_id,
finding_type=finding_type,
iso_chapter=iso_chapter,
annex_a_control=annex_a_control,
title=title,
description=description,
objective_evidence=objective_evidence,
owner=owner,
auditor=auditor,
due_date=due_date,
internal_audit_id=internal_audit_id,
status=FindingStatusEnum.OPEN,
)
self.db.add(finding)
self.db.commit()
self.db.refresh(finding)
return finding
def get_by_id(self, finding_id: str) -> Optional[AuditFindingDB]:
"""Get finding by UUID or finding_id."""
return self.db.query(AuditFindingDB).filter(
(AuditFindingDB.id == finding_id) | (AuditFindingDB.finding_id == finding_id)
).first()
def get_all(
self,
finding_type: Optional[FindingTypeEnum] = None,
status: Optional[FindingStatusEnum] = None,
internal_audit_id: Optional[str] = None,
) -> List[AuditFindingDB]:
"""Get all findings with optional filters."""
query = self.db.query(AuditFindingDB)
if finding_type:
query = query.filter(AuditFindingDB.finding_type == finding_type)
if status:
query = query.filter(AuditFindingDB.status == status)
if internal_audit_id:
query = query.filter(AuditFindingDB.internal_audit_id == internal_audit_id)
return query.order_by(AuditFindingDB.identified_date.desc()).all()
def get_open_majors(self) -> List[AuditFindingDB]:
"""Get all open major findings (blocking certification)."""
return self.db.query(AuditFindingDB).filter(
AuditFindingDB.finding_type == FindingTypeEnum.MAJOR,
AuditFindingDB.status != FindingStatusEnum.CLOSED
).all()
def get_statistics(self) -> Dict[str, Any]:
"""Get finding statistics."""
findings = self.get_all()
return {
"total": len(findings),
"major": sum(1 for f in findings if f.finding_type == FindingTypeEnum.MAJOR),
"minor": sum(1 for f in findings if f.finding_type == FindingTypeEnum.MINOR),
"ofi": sum(1 for f in findings if f.finding_type == FindingTypeEnum.OFI),
"positive": sum(1 for f in findings if f.finding_type == FindingTypeEnum.POSITIVE),
"open": sum(1 for f in findings if f.status != FindingStatusEnum.CLOSED),
"closed": sum(1 for f in findings if f.status == FindingStatusEnum.CLOSED),
"blocking_certification": sum(
1 for f in findings
if f.finding_type == FindingTypeEnum.MAJOR and f.status != FindingStatusEnum.CLOSED
),
}
def close(
self,
finding_id: str,
closed_by: str,
closure_notes: str,
verification_method: Optional[str] = None,
verification_evidence: Optional[str] = None,
) -> Optional[AuditFindingDB]:
"""Close a finding after verification."""
finding = self.get_by_id(finding_id)
if not finding:
return None
finding.status = FindingStatusEnum.CLOSED
finding.closed_date = date.today()
finding.closed_by = closed_by
finding.closure_notes = closure_notes
finding.verification_method = verification_method
finding.verification_evidence = verification_evidence
finding.verified_by = closed_by
finding.verified_at = datetime.now(timezone.utc)
self.db.commit()
self.db.refresh(finding)
return finding
class CorrectiveActionRepository:
"""Repository for Corrective/Preventive Actions (CAPA)."""
def __init__(self, db: DBSession):
self.db = db
def create(
self,
finding_id: str,
capa_type: CAPATypeEnum,
title: str,
description: str,
assigned_to: str,
planned_completion: date,
expected_outcome: Optional[str] = None,
effectiveness_criteria: Optional[str] = None,
) -> CorrectiveActionDB:
"""Create a new CAPA."""
# Generate CAPA ID
year = date.today().year
existing_count = self.db.query(CorrectiveActionDB).filter(
CorrectiveActionDB.capa_id.like(f"CAPA-{year}-%")
).count()
capa_id = f"CAPA-{year}-{existing_count + 1:03d}"
capa = CorrectiveActionDB(
id=str(uuid.uuid4()),
capa_id=capa_id,
finding_id=finding_id,
capa_type=capa_type,
title=title,
description=description,
expected_outcome=expected_outcome,
assigned_to=assigned_to,
planned_completion=planned_completion,
effectiveness_criteria=effectiveness_criteria,
status="planned",
)
self.db.add(capa)
# Update finding status
finding = self.db.query(AuditFindingDB).filter(AuditFindingDB.id == finding_id).first()
if finding:
finding.status = FindingStatusEnum.CORRECTIVE_ACTION_PENDING
self.db.commit()
self.db.refresh(capa)
return capa
def get_by_id(self, capa_id: str) -> Optional[CorrectiveActionDB]:
"""Get CAPA by UUID or capa_id."""
return self.db.query(CorrectiveActionDB).filter(
(CorrectiveActionDB.id == capa_id) | (CorrectiveActionDB.capa_id == capa_id)
).first()
def get_by_finding(self, finding_id: str) -> List[CorrectiveActionDB]:
"""Get all CAPAs for a finding."""
return self.db.query(CorrectiveActionDB).filter(
CorrectiveActionDB.finding_id == finding_id
).order_by(CorrectiveActionDB.planned_completion).all()
def verify(
self,
capa_id: str,
verified_by: str,
is_effective: bool,
effectiveness_notes: Optional[str] = None,
) -> Optional[CorrectiveActionDB]:
"""Verify a completed CAPA."""
capa = self.get_by_id(capa_id)
if not capa:
return None
capa.effectiveness_verified = is_effective
capa.effectiveness_verification_date = date.today()
capa.effectiveness_notes = effectiveness_notes
capa.status = "verified" if is_effective else "completed"
# If verified, check if all CAPAs for finding are verified
if is_effective:
finding = self.db.query(AuditFindingDB).filter(
AuditFindingDB.id == capa.finding_id
).first()
if finding:
unverified = self.db.query(CorrectiveActionDB).filter(
CorrectiveActionDB.finding_id == finding.id,
CorrectiveActionDB.id != capa.id,
CorrectiveActionDB.status != "verified"
).count()
if unverified == 0:
finding.status = FindingStatusEnum.VERIFICATION_PENDING
self.db.commit()
self.db.refresh(capa)
return capa
class ManagementReviewRepository:
"""Repository for Management Reviews (ISO 27001 Chapter 9.3)."""
def __init__(self, db: DBSession):
self.db = db
def create(
self,
title: str,
review_date: date,
chairperson: str,
review_period_start: Optional[date] = None,
review_period_end: Optional[date] = None,
) -> ManagementReviewDB:
"""Create a new management review."""
# Generate review ID
year = review_date.year
quarter = (review_date.month - 1) // 3 + 1
review_id = f"MR-{year}-Q{quarter}"
# Check for duplicate
existing = self.db.query(ManagementReviewDB).filter(
ManagementReviewDB.review_id == review_id
).first()
if existing:
review_id = f"{review_id}-{str(uuid.uuid4())[:4]}"
review = ManagementReviewDB(
id=str(uuid.uuid4()),
review_id=review_id,
title=title,
review_date=review_date,
review_period_start=review_period_start,
review_period_end=review_period_end,
chairperson=chairperson,
status="draft",
)
self.db.add(review)
self.db.commit()
self.db.refresh(review)
return review
def get_by_id(self, review_id: str) -> Optional[ManagementReviewDB]:
"""Get review by UUID or review_id."""
return self.db.query(ManagementReviewDB).filter(
(ManagementReviewDB.id == review_id) | (ManagementReviewDB.review_id == review_id)
).first()
def get_latest_approved(self) -> Optional[ManagementReviewDB]:
"""Get the most recent approved management review."""
return self.db.query(ManagementReviewDB).filter(
ManagementReviewDB.status == "approved"
).order_by(ManagementReviewDB.review_date.desc()).first()
def approve(
self,
review_id: str,
approved_by: str,
next_review_date: date,
minutes_document_path: Optional[str] = None,
) -> Optional[ManagementReviewDB]:
"""Approve a management review."""
review = self.get_by_id(review_id)
if not review:
return None
review.status = "approved"
review.approved_by = approved_by
review.approved_at = datetime.now(timezone.utc)
review.next_review_date = next_review_date
review.minutes_document_path = minutes_document_path
self.db.commit()
self.db.refresh(review)
return review
class InternalAuditRepository:
"""Repository for Internal Audits (ISO 27001 Chapter 9.2)."""
def __init__(self, db: DBSession):
self.db = db
def create(
self,
title: str,
audit_type: str,
planned_date: date,
lead_auditor: str,
scope_description: Optional[str] = None,
iso_chapters_covered: Optional[List[str]] = None,
annex_a_controls_covered: Optional[List[str]] = None,
) -> InternalAuditDB:
"""Create a new internal audit."""
# Generate audit ID
year = planned_date.year
existing_count = self.db.query(InternalAuditDB).filter(
InternalAuditDB.audit_id.like(f"IA-{year}-%")
).count()
audit_id = f"IA-{year}-{existing_count + 1:03d}"
audit = InternalAuditDB(
id=str(uuid.uuid4()),
audit_id=audit_id,
title=title,
audit_type=audit_type,
scope_description=scope_description,
iso_chapters_covered=iso_chapters_covered,
annex_a_controls_covered=annex_a_controls_covered,
planned_date=planned_date,
lead_auditor=lead_auditor,
status="planned",
)
self.db.add(audit)
self.db.commit()
self.db.refresh(audit)
return audit
def get_by_id(self, audit_id: str) -> Optional[InternalAuditDB]:
"""Get audit by UUID or audit_id."""
return self.db.query(InternalAuditDB).filter(
(InternalAuditDB.id == audit_id) | (InternalAuditDB.audit_id == audit_id)
).first()
def get_latest_completed(self) -> Optional[InternalAuditDB]:
"""Get the most recent completed internal audit."""
return self.db.query(InternalAuditDB).filter(
InternalAuditDB.status == "completed"
).order_by(InternalAuditDB.actual_end_date.desc()).first()
def complete(
self,
audit_id: str,
audit_conclusion: str,
overall_assessment: str,
follow_up_audit_required: bool = False,
) -> Optional[InternalAuditDB]:
"""Complete an internal audit."""
audit = self.get_by_id(audit_id)
if not audit:
return None
audit.status = "completed"
audit.actual_end_date = date.today()
audit.report_date = date.today()
audit.audit_conclusion = audit_conclusion
audit.overall_assessment = overall_assessment
audit.follow_up_audit_required = follow_up_audit_required
self.db.commit()
self.db.refresh(audit)
return audit
class AuditTrailRepository:
"""Repository for Audit Trail entries."""
def __init__(self, db: DBSession):
self.db = db
def log(
self,
entity_type: str,
entity_id: str,
entity_name: str,
action: str,
performed_by: str,
field_changed: Optional[str] = None,
old_value: Optional[str] = None,
new_value: Optional[str] = None,
change_summary: Optional[str] = None,
) -> AuditTrailDB:
"""Log an audit trail entry."""
import hashlib
entry = AuditTrailDB(
id=str(uuid.uuid4()),
entity_type=entity_type,
entity_id=entity_id,
entity_name=entity_name,
action=action,
field_changed=field_changed,
old_value=old_value,
new_value=new_value,
change_summary=change_summary,
performed_by=performed_by,
performed_at=datetime.now(timezone.utc),
checksum=hashlib.sha256(
f"{entity_type}|{entity_id}|{action}|{performed_by}".encode()
).hexdigest(),
)
self.db.add(entry)
self.db.commit()
self.db.refresh(entry)
return entry
def get_by_entity(
self,
entity_type: str,
entity_id: str,
limit: int = 100,
) -> List[AuditTrailDB]:
"""Get audit trail for a specific entity."""
return self.db.query(AuditTrailDB).filter(
AuditTrailDB.entity_type == entity_type,
AuditTrailDB.entity_id == entity_id
).order_by(AuditTrailDB.performed_at.desc()).limit(limit).all()
def get_paginated(
self,
page: int = 1,
page_size: int = 50,
entity_type: Optional[str] = None,
entity_id: Optional[str] = None,
performed_by: Optional[str] = None,
action: Optional[str] = None,
) -> Tuple[List[AuditTrailDB], int]:
"""Get paginated audit trail with filters."""
query = self.db.query(AuditTrailDB)
if entity_type:
query = query.filter(AuditTrailDB.entity_type == entity_type)
if entity_id:
query = query.filter(AuditTrailDB.entity_id == entity_id)
if performed_by:
query = query.filter(AuditTrailDB.performed_by == performed_by)
if action:
query = query.filter(AuditTrailDB.action == action)
total = query.count()
entries = query.order_by(AuditTrailDB.performed_at.desc()).offset(
(page - 1) * page_size
).limit(page_size).all()
return entries, total
class ISMSReadinessCheckRepository:
"""Repository for ISMS Readiness Check results."""
def __init__(self, db: DBSession):
self.db = db
def save(self, check: ISMSReadinessCheckDB) -> ISMSReadinessCheckDB:
"""Save a readiness check result."""
self.db.add(check)
self.db.commit()
self.db.refresh(check)
return check
def get_latest(self) -> Optional[ISMSReadinessCheckDB]:
"""Get the most recent readiness check."""
return self.db.query(ISMSReadinessCheckDB).order_by(
ISMSReadinessCheckDB.check_date.desc()
).first()
def get_history(self, limit: int = 10) -> List[ISMSReadinessCheckDB]:
"""Get readiness check history."""
return self.db.query(ISMSReadinessCheckDB).order_by(
ISMSReadinessCheckDB.check_date.desc()
).limit(limit).all()