A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.
This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).
Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
840 lines
29 KiB
Python
840 lines
29 KiB
Python
"""
|
|
Repository layer for ISMS (Information Security Management System) entities.
|
|
|
|
Provides CRUD operations for ISO 27001 certification-related entities:
|
|
- ISMS Scope & Context
|
|
- Policies & Objectives
|
|
- Statement of Applicability (SoA)
|
|
- Audit Findings & CAPA
|
|
- Management Reviews & Internal Audits
|
|
"""
|
|
|
|
import uuid
|
|
from datetime import datetime, date
|
|
from typing import List, Optional, Dict, Any, Tuple
|
|
|
|
from sqlalchemy.orm import Session as DBSession
|
|
from sqlalchemy import func, and_, or_
|
|
|
|
from .models import (
|
|
ISMSScopeDB, ISMSContextDB, ISMSPolicyDB, SecurityObjectiveDB,
|
|
StatementOfApplicabilityDB, AuditFindingDB, CorrectiveActionDB,
|
|
ManagementReviewDB, InternalAuditDB, AuditTrailDB, ISMSReadinessCheckDB,
|
|
ApprovalStatusEnum, FindingTypeEnum, FindingStatusEnum, CAPATypeEnum
|
|
)
|
|
|
|
|
|
class ISMSScopeRepository:
|
|
"""Repository for ISMS Scope (ISO 27001 Chapter 4.3)."""
|
|
|
|
def __init__(self, db: DBSession):
|
|
self.db = db
|
|
|
|
def create(
|
|
self,
|
|
scope_statement: str,
|
|
created_by: str,
|
|
included_locations: Optional[List[str]] = None,
|
|
included_processes: Optional[List[str]] = None,
|
|
included_services: Optional[List[str]] = None,
|
|
excluded_items: Optional[List[str]] = None,
|
|
exclusion_justification: Optional[str] = None,
|
|
organizational_boundary: Optional[str] = None,
|
|
physical_boundary: Optional[str] = None,
|
|
technical_boundary: Optional[str] = None,
|
|
) -> ISMSScopeDB:
|
|
"""Create a new ISMS scope definition."""
|
|
# Supersede existing scopes
|
|
existing = self.db.query(ISMSScopeDB).filter(
|
|
ISMSScopeDB.status != ApprovalStatusEnum.SUPERSEDED
|
|
).all()
|
|
for s in existing:
|
|
s.status = ApprovalStatusEnum.SUPERSEDED
|
|
|
|
scope = ISMSScopeDB(
|
|
id=str(uuid.uuid4()),
|
|
scope_statement=scope_statement,
|
|
included_locations=included_locations,
|
|
included_processes=included_processes,
|
|
included_services=included_services,
|
|
excluded_items=excluded_items,
|
|
exclusion_justification=exclusion_justification,
|
|
organizational_boundary=organizational_boundary,
|
|
physical_boundary=physical_boundary,
|
|
technical_boundary=technical_boundary,
|
|
status=ApprovalStatusEnum.DRAFT,
|
|
created_by=created_by,
|
|
)
|
|
self.db.add(scope)
|
|
self.db.commit()
|
|
self.db.refresh(scope)
|
|
return scope
|
|
|
|
def get_current(self) -> Optional[ISMSScopeDB]:
|
|
"""Get the current (non-superseded) ISMS scope."""
|
|
return self.db.query(ISMSScopeDB).filter(
|
|
ISMSScopeDB.status != ApprovalStatusEnum.SUPERSEDED
|
|
).order_by(ISMSScopeDB.created_at.desc()).first()
|
|
|
|
def get_by_id(self, scope_id: str) -> Optional[ISMSScopeDB]:
|
|
"""Get scope by ID."""
|
|
return self.db.query(ISMSScopeDB).filter(ISMSScopeDB.id == scope_id).first()
|
|
|
|
def approve(
|
|
self,
|
|
scope_id: str,
|
|
approved_by: str,
|
|
effective_date: date,
|
|
review_date: date,
|
|
) -> Optional[ISMSScopeDB]:
|
|
"""Approve the ISMS scope."""
|
|
scope = self.get_by_id(scope_id)
|
|
if not scope:
|
|
return None
|
|
|
|
import hashlib
|
|
scope.status = ApprovalStatusEnum.APPROVED
|
|
scope.approved_by = approved_by
|
|
scope.approved_at = datetime.utcnow()
|
|
scope.effective_date = effective_date
|
|
scope.review_date = review_date
|
|
scope.approval_signature = hashlib.sha256(
|
|
f"{scope.scope_statement}|{approved_by}|{datetime.utcnow().isoformat()}".encode()
|
|
).hexdigest()
|
|
|
|
self.db.commit()
|
|
self.db.refresh(scope)
|
|
return scope
|
|
|
|
|
|
class ISMSPolicyRepository:
|
|
"""Repository for ISMS Policies (ISO 27001 Chapter 5.2)."""
|
|
|
|
def __init__(self, db: DBSession):
|
|
self.db = db
|
|
|
|
def create(
|
|
self,
|
|
policy_id: str,
|
|
title: str,
|
|
policy_type: str,
|
|
authored_by: str,
|
|
description: Optional[str] = None,
|
|
policy_text: Optional[str] = None,
|
|
applies_to: Optional[List[str]] = None,
|
|
review_frequency_months: int = 12,
|
|
related_controls: Optional[List[str]] = None,
|
|
) -> ISMSPolicyDB:
|
|
"""Create a new ISMS policy."""
|
|
policy = ISMSPolicyDB(
|
|
id=str(uuid.uuid4()),
|
|
policy_id=policy_id,
|
|
title=title,
|
|
policy_type=policy_type,
|
|
description=description,
|
|
policy_text=policy_text,
|
|
applies_to=applies_to,
|
|
review_frequency_months=review_frequency_months,
|
|
related_controls=related_controls,
|
|
authored_by=authored_by,
|
|
status=ApprovalStatusEnum.DRAFT,
|
|
)
|
|
self.db.add(policy)
|
|
self.db.commit()
|
|
self.db.refresh(policy)
|
|
return policy
|
|
|
|
def get_by_id(self, policy_id: str) -> Optional[ISMSPolicyDB]:
|
|
"""Get policy by UUID or policy_id."""
|
|
return self.db.query(ISMSPolicyDB).filter(
|
|
(ISMSPolicyDB.id == policy_id) | (ISMSPolicyDB.policy_id == policy_id)
|
|
).first()
|
|
|
|
def get_all(
|
|
self,
|
|
policy_type: Optional[str] = None,
|
|
status: Optional[ApprovalStatusEnum] = None,
|
|
) -> List[ISMSPolicyDB]:
|
|
"""Get all policies with optional filters."""
|
|
query = self.db.query(ISMSPolicyDB)
|
|
if policy_type:
|
|
query = query.filter(ISMSPolicyDB.policy_type == policy_type)
|
|
if status:
|
|
query = query.filter(ISMSPolicyDB.status == status)
|
|
return query.order_by(ISMSPolicyDB.policy_id).all()
|
|
|
|
def get_master_policy(self) -> Optional[ISMSPolicyDB]:
|
|
"""Get the approved master ISMS policy."""
|
|
return self.db.query(ISMSPolicyDB).filter(
|
|
ISMSPolicyDB.policy_type == "master",
|
|
ISMSPolicyDB.status == ApprovalStatusEnum.APPROVED
|
|
).first()
|
|
|
|
def approve(
|
|
self,
|
|
policy_id: str,
|
|
approved_by: str,
|
|
reviewed_by: str,
|
|
effective_date: date,
|
|
) -> Optional[ISMSPolicyDB]:
|
|
"""Approve a policy."""
|
|
policy = self.get_by_id(policy_id)
|
|
if not policy:
|
|
return None
|
|
|
|
import hashlib
|
|
policy.status = ApprovalStatusEnum.APPROVED
|
|
policy.reviewed_by = reviewed_by
|
|
policy.approved_by = approved_by
|
|
policy.approved_at = datetime.utcnow()
|
|
policy.effective_date = effective_date
|
|
policy.next_review_date = date(
|
|
effective_date.year + (policy.review_frequency_months // 12),
|
|
effective_date.month,
|
|
effective_date.day
|
|
)
|
|
policy.approval_signature = hashlib.sha256(
|
|
f"{policy.policy_id}|{approved_by}|{datetime.utcnow().isoformat()}".encode()
|
|
).hexdigest()
|
|
|
|
self.db.commit()
|
|
self.db.refresh(policy)
|
|
return policy
|
|
|
|
|
|
class SecurityObjectiveRepository:
|
|
"""Repository for Security Objectives (ISO 27001 Chapter 6.2)."""
|
|
|
|
def __init__(self, db: DBSession):
|
|
self.db = db
|
|
|
|
def create(
|
|
self,
|
|
objective_id: str,
|
|
title: str,
|
|
description: str,
|
|
category: str,
|
|
owner: str,
|
|
kpi_name: Optional[str] = None,
|
|
kpi_target: Optional[float] = None,
|
|
kpi_unit: Optional[str] = None,
|
|
target_date: Optional[date] = None,
|
|
related_controls: Optional[List[str]] = None,
|
|
) -> SecurityObjectiveDB:
|
|
"""Create a new security objective."""
|
|
objective = SecurityObjectiveDB(
|
|
id=str(uuid.uuid4()),
|
|
objective_id=objective_id,
|
|
title=title,
|
|
description=description,
|
|
category=category,
|
|
kpi_name=kpi_name,
|
|
kpi_target=kpi_target,
|
|
kpi_unit=kpi_unit,
|
|
owner=owner,
|
|
target_date=target_date,
|
|
related_controls=related_controls,
|
|
status="active",
|
|
)
|
|
self.db.add(objective)
|
|
self.db.commit()
|
|
self.db.refresh(objective)
|
|
return objective
|
|
|
|
def get_by_id(self, objective_id: str) -> Optional[SecurityObjectiveDB]:
|
|
"""Get objective by UUID or objective_id."""
|
|
return self.db.query(SecurityObjectiveDB).filter(
|
|
(SecurityObjectiveDB.id == objective_id) |
|
|
(SecurityObjectiveDB.objective_id == objective_id)
|
|
).first()
|
|
|
|
def get_all(
|
|
self,
|
|
category: Optional[str] = None,
|
|
status: Optional[str] = None,
|
|
) -> List[SecurityObjectiveDB]:
|
|
"""Get all objectives with optional filters."""
|
|
query = self.db.query(SecurityObjectiveDB)
|
|
if category:
|
|
query = query.filter(SecurityObjectiveDB.category == category)
|
|
if status:
|
|
query = query.filter(SecurityObjectiveDB.status == status)
|
|
return query.order_by(SecurityObjectiveDB.objective_id).all()
|
|
|
|
def update_progress(
|
|
self,
|
|
objective_id: str,
|
|
kpi_current: float,
|
|
) -> Optional[SecurityObjectiveDB]:
|
|
"""Update objective progress."""
|
|
objective = self.get_by_id(objective_id)
|
|
if not objective:
|
|
return None
|
|
|
|
objective.kpi_current = kpi_current
|
|
if objective.kpi_target:
|
|
objective.progress_percentage = min(100, (kpi_current / objective.kpi_target) * 100)
|
|
|
|
# Auto-mark as achieved if 100%
|
|
if objective.progress_percentage >= 100 and objective.status == "active":
|
|
objective.status = "achieved"
|
|
objective.achieved_date = date.today()
|
|
|
|
self.db.commit()
|
|
self.db.refresh(objective)
|
|
return objective
|
|
|
|
|
|
class StatementOfApplicabilityRepository:
|
|
"""Repository for Statement of Applicability (SoA)."""
|
|
|
|
def __init__(self, db: DBSession):
|
|
self.db = db
|
|
|
|
def create(
|
|
self,
|
|
annex_a_control: str,
|
|
annex_a_title: str,
|
|
annex_a_category: str,
|
|
is_applicable: bool = True,
|
|
applicability_justification: Optional[str] = None,
|
|
implementation_status: str = "planned",
|
|
breakpilot_control_ids: Optional[List[str]] = None,
|
|
) -> StatementOfApplicabilityDB:
|
|
"""Create a new SoA entry."""
|
|
entry = StatementOfApplicabilityDB(
|
|
id=str(uuid.uuid4()),
|
|
annex_a_control=annex_a_control,
|
|
annex_a_title=annex_a_title,
|
|
annex_a_category=annex_a_category,
|
|
is_applicable=is_applicable,
|
|
applicability_justification=applicability_justification,
|
|
implementation_status=implementation_status,
|
|
breakpilot_control_ids=breakpilot_control_ids or [],
|
|
)
|
|
self.db.add(entry)
|
|
self.db.commit()
|
|
self.db.refresh(entry)
|
|
return entry
|
|
|
|
def get_by_control(self, annex_a_control: str) -> Optional[StatementOfApplicabilityDB]:
|
|
"""Get SoA entry by Annex A control ID (e.g., 'A.5.1')."""
|
|
return self.db.query(StatementOfApplicabilityDB).filter(
|
|
StatementOfApplicabilityDB.annex_a_control == annex_a_control
|
|
).first()
|
|
|
|
def get_all(
|
|
self,
|
|
is_applicable: Optional[bool] = None,
|
|
implementation_status: Optional[str] = None,
|
|
category: Optional[str] = None,
|
|
) -> List[StatementOfApplicabilityDB]:
|
|
"""Get all SoA entries with optional filters."""
|
|
query = self.db.query(StatementOfApplicabilityDB)
|
|
if is_applicable is not None:
|
|
query = query.filter(StatementOfApplicabilityDB.is_applicable == is_applicable)
|
|
if implementation_status:
|
|
query = query.filter(StatementOfApplicabilityDB.implementation_status == implementation_status)
|
|
if category:
|
|
query = query.filter(StatementOfApplicabilityDB.annex_a_category == category)
|
|
return query.order_by(StatementOfApplicabilityDB.annex_a_control).all()
|
|
|
|
def get_statistics(self) -> Dict[str, Any]:
|
|
"""Get SoA statistics."""
|
|
entries = self.get_all()
|
|
total = len(entries)
|
|
applicable = sum(1 for e in entries if e.is_applicable)
|
|
implemented = sum(1 for e in entries if e.implementation_status == "implemented")
|
|
approved = sum(1 for e in entries if e.approved_at)
|
|
|
|
return {
|
|
"total": total,
|
|
"applicable": applicable,
|
|
"not_applicable": total - applicable,
|
|
"implemented": implemented,
|
|
"planned": sum(1 for e in entries if e.implementation_status == "planned"),
|
|
"approved": approved,
|
|
"pending_approval": total - approved,
|
|
"implementation_rate": round((implemented / applicable * 100) if applicable > 0 else 0, 1),
|
|
}
|
|
|
|
|
|
class AuditFindingRepository:
|
|
"""Repository for Audit Findings (Major/Minor/OFI)."""
|
|
|
|
def __init__(self, db: DBSession):
|
|
self.db = db
|
|
|
|
def create(
|
|
self,
|
|
finding_type: FindingTypeEnum,
|
|
title: str,
|
|
description: str,
|
|
auditor: str,
|
|
iso_chapter: Optional[str] = None,
|
|
annex_a_control: Optional[str] = None,
|
|
objective_evidence: Optional[str] = None,
|
|
owner: Optional[str] = None,
|
|
due_date: Optional[date] = None,
|
|
internal_audit_id: Optional[str] = None,
|
|
) -> AuditFindingDB:
|
|
"""Create a new audit finding."""
|
|
# Generate finding ID
|
|
year = date.today().year
|
|
existing_count = self.db.query(AuditFindingDB).filter(
|
|
AuditFindingDB.finding_id.like(f"FIND-{year}-%")
|
|
).count()
|
|
finding_id = f"FIND-{year}-{existing_count + 1:03d}"
|
|
|
|
finding = AuditFindingDB(
|
|
id=str(uuid.uuid4()),
|
|
finding_id=finding_id,
|
|
finding_type=finding_type,
|
|
iso_chapter=iso_chapter,
|
|
annex_a_control=annex_a_control,
|
|
title=title,
|
|
description=description,
|
|
objective_evidence=objective_evidence,
|
|
owner=owner,
|
|
auditor=auditor,
|
|
due_date=due_date,
|
|
internal_audit_id=internal_audit_id,
|
|
status=FindingStatusEnum.OPEN,
|
|
)
|
|
self.db.add(finding)
|
|
self.db.commit()
|
|
self.db.refresh(finding)
|
|
return finding
|
|
|
|
def get_by_id(self, finding_id: str) -> Optional[AuditFindingDB]:
|
|
"""Get finding by UUID or finding_id."""
|
|
return self.db.query(AuditFindingDB).filter(
|
|
(AuditFindingDB.id == finding_id) | (AuditFindingDB.finding_id == finding_id)
|
|
).first()
|
|
|
|
def get_all(
|
|
self,
|
|
finding_type: Optional[FindingTypeEnum] = None,
|
|
status: Optional[FindingStatusEnum] = None,
|
|
internal_audit_id: Optional[str] = None,
|
|
) -> List[AuditFindingDB]:
|
|
"""Get all findings with optional filters."""
|
|
query = self.db.query(AuditFindingDB)
|
|
if finding_type:
|
|
query = query.filter(AuditFindingDB.finding_type == finding_type)
|
|
if status:
|
|
query = query.filter(AuditFindingDB.status == status)
|
|
if internal_audit_id:
|
|
query = query.filter(AuditFindingDB.internal_audit_id == internal_audit_id)
|
|
return query.order_by(AuditFindingDB.identified_date.desc()).all()
|
|
|
|
def get_open_majors(self) -> List[AuditFindingDB]:
|
|
"""Get all open major findings (blocking certification)."""
|
|
return self.db.query(AuditFindingDB).filter(
|
|
AuditFindingDB.finding_type == FindingTypeEnum.MAJOR,
|
|
AuditFindingDB.status != FindingStatusEnum.CLOSED
|
|
).all()
|
|
|
|
def get_statistics(self) -> Dict[str, Any]:
|
|
"""Get finding statistics."""
|
|
findings = self.get_all()
|
|
|
|
return {
|
|
"total": len(findings),
|
|
"major": sum(1 for f in findings if f.finding_type == FindingTypeEnum.MAJOR),
|
|
"minor": sum(1 for f in findings if f.finding_type == FindingTypeEnum.MINOR),
|
|
"ofi": sum(1 for f in findings if f.finding_type == FindingTypeEnum.OFI),
|
|
"positive": sum(1 for f in findings if f.finding_type == FindingTypeEnum.POSITIVE),
|
|
"open": sum(1 for f in findings if f.status != FindingStatusEnum.CLOSED),
|
|
"closed": sum(1 for f in findings if f.status == FindingStatusEnum.CLOSED),
|
|
"blocking_certification": sum(
|
|
1 for f in findings
|
|
if f.finding_type == FindingTypeEnum.MAJOR and f.status != FindingStatusEnum.CLOSED
|
|
),
|
|
}
|
|
|
|
def close(
|
|
self,
|
|
finding_id: str,
|
|
closed_by: str,
|
|
closure_notes: str,
|
|
verification_method: Optional[str] = None,
|
|
verification_evidence: Optional[str] = None,
|
|
) -> Optional[AuditFindingDB]:
|
|
"""Close a finding after verification."""
|
|
finding = self.get_by_id(finding_id)
|
|
if not finding:
|
|
return None
|
|
|
|
finding.status = FindingStatusEnum.CLOSED
|
|
finding.closed_date = date.today()
|
|
finding.closed_by = closed_by
|
|
finding.closure_notes = closure_notes
|
|
finding.verification_method = verification_method
|
|
finding.verification_evidence = verification_evidence
|
|
finding.verified_by = closed_by
|
|
finding.verified_at = datetime.utcnow()
|
|
|
|
self.db.commit()
|
|
self.db.refresh(finding)
|
|
return finding
|
|
|
|
|
|
class CorrectiveActionRepository:
|
|
"""Repository for Corrective/Preventive Actions (CAPA)."""
|
|
|
|
def __init__(self, db: DBSession):
|
|
self.db = db
|
|
|
|
def create(
|
|
self,
|
|
finding_id: str,
|
|
capa_type: CAPATypeEnum,
|
|
title: str,
|
|
description: str,
|
|
assigned_to: str,
|
|
planned_completion: date,
|
|
expected_outcome: Optional[str] = None,
|
|
effectiveness_criteria: Optional[str] = None,
|
|
) -> CorrectiveActionDB:
|
|
"""Create a new CAPA."""
|
|
# Generate CAPA ID
|
|
year = date.today().year
|
|
existing_count = self.db.query(CorrectiveActionDB).filter(
|
|
CorrectiveActionDB.capa_id.like(f"CAPA-{year}-%")
|
|
).count()
|
|
capa_id = f"CAPA-{year}-{existing_count + 1:03d}"
|
|
|
|
capa = CorrectiveActionDB(
|
|
id=str(uuid.uuid4()),
|
|
capa_id=capa_id,
|
|
finding_id=finding_id,
|
|
capa_type=capa_type,
|
|
title=title,
|
|
description=description,
|
|
expected_outcome=expected_outcome,
|
|
assigned_to=assigned_to,
|
|
planned_completion=planned_completion,
|
|
effectiveness_criteria=effectiveness_criteria,
|
|
status="planned",
|
|
)
|
|
self.db.add(capa)
|
|
|
|
# Update finding status
|
|
finding = self.db.query(AuditFindingDB).filter(AuditFindingDB.id == finding_id).first()
|
|
if finding:
|
|
finding.status = FindingStatusEnum.CORRECTIVE_ACTION_PENDING
|
|
|
|
self.db.commit()
|
|
self.db.refresh(capa)
|
|
return capa
|
|
|
|
def get_by_id(self, capa_id: str) -> Optional[CorrectiveActionDB]:
|
|
"""Get CAPA by UUID or capa_id."""
|
|
return self.db.query(CorrectiveActionDB).filter(
|
|
(CorrectiveActionDB.id == capa_id) | (CorrectiveActionDB.capa_id == capa_id)
|
|
).first()
|
|
|
|
def get_by_finding(self, finding_id: str) -> List[CorrectiveActionDB]:
|
|
"""Get all CAPAs for a finding."""
|
|
return self.db.query(CorrectiveActionDB).filter(
|
|
CorrectiveActionDB.finding_id == finding_id
|
|
).order_by(CorrectiveActionDB.planned_completion).all()
|
|
|
|
def verify(
|
|
self,
|
|
capa_id: str,
|
|
verified_by: str,
|
|
is_effective: bool,
|
|
effectiveness_notes: Optional[str] = None,
|
|
) -> Optional[CorrectiveActionDB]:
|
|
"""Verify a completed CAPA."""
|
|
capa = self.get_by_id(capa_id)
|
|
if not capa:
|
|
return None
|
|
|
|
capa.effectiveness_verified = is_effective
|
|
capa.effectiveness_verification_date = date.today()
|
|
capa.effectiveness_notes = effectiveness_notes
|
|
capa.status = "verified" if is_effective else "completed"
|
|
|
|
# If verified, check if all CAPAs for finding are verified
|
|
if is_effective:
|
|
finding = self.db.query(AuditFindingDB).filter(
|
|
AuditFindingDB.id == capa.finding_id
|
|
).first()
|
|
if finding:
|
|
unverified = self.db.query(CorrectiveActionDB).filter(
|
|
CorrectiveActionDB.finding_id == finding.id,
|
|
CorrectiveActionDB.id != capa.id,
|
|
CorrectiveActionDB.status != "verified"
|
|
).count()
|
|
if unverified == 0:
|
|
finding.status = FindingStatusEnum.VERIFICATION_PENDING
|
|
|
|
self.db.commit()
|
|
self.db.refresh(capa)
|
|
return capa
|
|
|
|
|
|
class ManagementReviewRepository:
|
|
"""Repository for Management Reviews (ISO 27001 Chapter 9.3)."""
|
|
|
|
def __init__(self, db: DBSession):
|
|
self.db = db
|
|
|
|
def create(
|
|
self,
|
|
title: str,
|
|
review_date: date,
|
|
chairperson: str,
|
|
review_period_start: Optional[date] = None,
|
|
review_period_end: Optional[date] = None,
|
|
) -> ManagementReviewDB:
|
|
"""Create a new management review."""
|
|
# Generate review ID
|
|
year = review_date.year
|
|
quarter = (review_date.month - 1) // 3 + 1
|
|
review_id = f"MR-{year}-Q{quarter}"
|
|
|
|
# Check for duplicate
|
|
existing = self.db.query(ManagementReviewDB).filter(
|
|
ManagementReviewDB.review_id == review_id
|
|
).first()
|
|
if existing:
|
|
review_id = f"{review_id}-{str(uuid.uuid4())[:4]}"
|
|
|
|
review = ManagementReviewDB(
|
|
id=str(uuid.uuid4()),
|
|
review_id=review_id,
|
|
title=title,
|
|
review_date=review_date,
|
|
review_period_start=review_period_start,
|
|
review_period_end=review_period_end,
|
|
chairperson=chairperson,
|
|
status="draft",
|
|
)
|
|
self.db.add(review)
|
|
self.db.commit()
|
|
self.db.refresh(review)
|
|
return review
|
|
|
|
def get_by_id(self, review_id: str) -> Optional[ManagementReviewDB]:
|
|
"""Get review by UUID or review_id."""
|
|
return self.db.query(ManagementReviewDB).filter(
|
|
(ManagementReviewDB.id == review_id) | (ManagementReviewDB.review_id == review_id)
|
|
).first()
|
|
|
|
def get_latest_approved(self) -> Optional[ManagementReviewDB]:
|
|
"""Get the most recent approved management review."""
|
|
return self.db.query(ManagementReviewDB).filter(
|
|
ManagementReviewDB.status == "approved"
|
|
).order_by(ManagementReviewDB.review_date.desc()).first()
|
|
|
|
def approve(
|
|
self,
|
|
review_id: str,
|
|
approved_by: str,
|
|
next_review_date: date,
|
|
minutes_document_path: Optional[str] = None,
|
|
) -> Optional[ManagementReviewDB]:
|
|
"""Approve a management review."""
|
|
review = self.get_by_id(review_id)
|
|
if not review:
|
|
return None
|
|
|
|
review.status = "approved"
|
|
review.approved_by = approved_by
|
|
review.approved_at = datetime.utcnow()
|
|
review.next_review_date = next_review_date
|
|
review.minutes_document_path = minutes_document_path
|
|
|
|
self.db.commit()
|
|
self.db.refresh(review)
|
|
return review
|
|
|
|
|
|
class InternalAuditRepository:
|
|
"""Repository for Internal Audits (ISO 27001 Chapter 9.2)."""
|
|
|
|
def __init__(self, db: DBSession):
|
|
self.db = db
|
|
|
|
def create(
|
|
self,
|
|
title: str,
|
|
audit_type: str,
|
|
planned_date: date,
|
|
lead_auditor: str,
|
|
scope_description: Optional[str] = None,
|
|
iso_chapters_covered: Optional[List[str]] = None,
|
|
annex_a_controls_covered: Optional[List[str]] = None,
|
|
) -> InternalAuditDB:
|
|
"""Create a new internal audit."""
|
|
# Generate audit ID
|
|
year = planned_date.year
|
|
existing_count = self.db.query(InternalAuditDB).filter(
|
|
InternalAuditDB.audit_id.like(f"IA-{year}-%")
|
|
).count()
|
|
audit_id = f"IA-{year}-{existing_count + 1:03d}"
|
|
|
|
audit = InternalAuditDB(
|
|
id=str(uuid.uuid4()),
|
|
audit_id=audit_id,
|
|
title=title,
|
|
audit_type=audit_type,
|
|
scope_description=scope_description,
|
|
iso_chapters_covered=iso_chapters_covered,
|
|
annex_a_controls_covered=annex_a_controls_covered,
|
|
planned_date=planned_date,
|
|
lead_auditor=lead_auditor,
|
|
status="planned",
|
|
)
|
|
self.db.add(audit)
|
|
self.db.commit()
|
|
self.db.refresh(audit)
|
|
return audit
|
|
|
|
def get_by_id(self, audit_id: str) -> Optional[InternalAuditDB]:
|
|
"""Get audit by UUID or audit_id."""
|
|
return self.db.query(InternalAuditDB).filter(
|
|
(InternalAuditDB.id == audit_id) | (InternalAuditDB.audit_id == audit_id)
|
|
).first()
|
|
|
|
def get_latest_completed(self) -> Optional[InternalAuditDB]:
|
|
"""Get the most recent completed internal audit."""
|
|
return self.db.query(InternalAuditDB).filter(
|
|
InternalAuditDB.status == "completed"
|
|
).order_by(InternalAuditDB.actual_end_date.desc()).first()
|
|
|
|
def complete(
|
|
self,
|
|
audit_id: str,
|
|
audit_conclusion: str,
|
|
overall_assessment: str,
|
|
follow_up_audit_required: bool = False,
|
|
) -> Optional[InternalAuditDB]:
|
|
"""Complete an internal audit."""
|
|
audit = self.get_by_id(audit_id)
|
|
if not audit:
|
|
return None
|
|
|
|
audit.status = "completed"
|
|
audit.actual_end_date = date.today()
|
|
audit.report_date = date.today()
|
|
audit.audit_conclusion = audit_conclusion
|
|
audit.overall_assessment = overall_assessment
|
|
audit.follow_up_audit_required = follow_up_audit_required
|
|
|
|
self.db.commit()
|
|
self.db.refresh(audit)
|
|
return audit
|
|
|
|
|
|
class AuditTrailRepository:
|
|
"""Repository for Audit Trail entries."""
|
|
|
|
def __init__(self, db: DBSession):
|
|
self.db = db
|
|
|
|
def log(
|
|
self,
|
|
entity_type: str,
|
|
entity_id: str,
|
|
entity_name: str,
|
|
action: str,
|
|
performed_by: str,
|
|
field_changed: Optional[str] = None,
|
|
old_value: Optional[str] = None,
|
|
new_value: Optional[str] = None,
|
|
change_summary: Optional[str] = None,
|
|
) -> AuditTrailDB:
|
|
"""Log an audit trail entry."""
|
|
import hashlib
|
|
entry = AuditTrailDB(
|
|
id=str(uuid.uuid4()),
|
|
entity_type=entity_type,
|
|
entity_id=entity_id,
|
|
entity_name=entity_name,
|
|
action=action,
|
|
field_changed=field_changed,
|
|
old_value=old_value,
|
|
new_value=new_value,
|
|
change_summary=change_summary,
|
|
performed_by=performed_by,
|
|
performed_at=datetime.utcnow(),
|
|
checksum=hashlib.sha256(
|
|
f"{entity_type}|{entity_id}|{action}|{performed_by}".encode()
|
|
).hexdigest(),
|
|
)
|
|
self.db.add(entry)
|
|
self.db.commit()
|
|
self.db.refresh(entry)
|
|
return entry
|
|
|
|
def get_by_entity(
|
|
self,
|
|
entity_type: str,
|
|
entity_id: str,
|
|
limit: int = 100,
|
|
) -> List[AuditTrailDB]:
|
|
"""Get audit trail for a specific entity."""
|
|
return self.db.query(AuditTrailDB).filter(
|
|
AuditTrailDB.entity_type == entity_type,
|
|
AuditTrailDB.entity_id == entity_id
|
|
).order_by(AuditTrailDB.performed_at.desc()).limit(limit).all()
|
|
|
|
def get_paginated(
|
|
self,
|
|
page: int = 1,
|
|
page_size: int = 50,
|
|
entity_type: Optional[str] = None,
|
|
entity_id: Optional[str] = None,
|
|
performed_by: Optional[str] = None,
|
|
action: Optional[str] = None,
|
|
) -> Tuple[List[AuditTrailDB], int]:
|
|
"""Get paginated audit trail with filters."""
|
|
query = self.db.query(AuditTrailDB)
|
|
|
|
if entity_type:
|
|
query = query.filter(AuditTrailDB.entity_type == entity_type)
|
|
if entity_id:
|
|
query = query.filter(AuditTrailDB.entity_id == entity_id)
|
|
if performed_by:
|
|
query = query.filter(AuditTrailDB.performed_by == performed_by)
|
|
if action:
|
|
query = query.filter(AuditTrailDB.action == action)
|
|
|
|
total = query.count()
|
|
entries = query.order_by(AuditTrailDB.performed_at.desc()).offset(
|
|
(page - 1) * page_size
|
|
).limit(page_size).all()
|
|
|
|
return entries, total
|
|
|
|
|
|
class ISMSReadinessCheckRepository:
|
|
"""Repository for ISMS Readiness Check results."""
|
|
|
|
def __init__(self, db: DBSession):
|
|
self.db = db
|
|
|
|
def save(self, check: ISMSReadinessCheckDB) -> ISMSReadinessCheckDB:
|
|
"""Save a readiness check result."""
|
|
self.db.add(check)
|
|
self.db.commit()
|
|
self.db.refresh(check)
|
|
return check
|
|
|
|
def get_latest(self) -> Optional[ISMSReadinessCheckDB]:
|
|
"""Get the most recent readiness check."""
|
|
return self.db.query(ISMSReadinessCheckDB).order_by(
|
|
ISMSReadinessCheckDB.check_date.desc()
|
|
).first()
|
|
|
|
def get_history(self, limit: int = 10) -> List[ISMSReadinessCheckDB]:
|
|
"""Get readiness check history."""
|
|
return self.db.query(ISMSReadinessCheckDB).order_by(
|
|
ISMSReadinessCheckDB.check_date.desc()
|
|
).limit(limit).all()
|