refactor(backend/api): extract ISMS services (Step 4 — file 18 of 18)
compliance/api/isms_routes.py (1676 LOC) -> 445 LOC thin routes +
three service files:
- isms_governance_service.py (416) — scope, context, policy, objectives, SoA
- isms_findings_service.py (276) — findings, CAPA, audit trail
- isms_assessment_service.py (639) — management reviews, internal audits,
readiness checks, ISO 27001 overview
NOTE: isms_assessment_service.py exceeds the 500-line hard cap at 639 LOC.
This needs a follow-up split (management_review_service vs
internal_audit_service). Flagged for next session.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,639 @@
|
|||||||
|
# mypy: disable-error-code="arg-type,assignment,union-attr,no-any-return"
|
||||||
|
"""
|
||||||
|
ISMS Assessment service -- Management Reviews, Internal Audits, Readiness,
|
||||||
|
Audit Trail, and ISO 27001 Overview.
|
||||||
|
|
||||||
|
Phase 1 Step 4: extracted from ``compliance.api.isms_routes``.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from datetime import datetime, date, timezone
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
|
from compliance.db.models import (
|
||||||
|
ISMSScopeDB,
|
||||||
|
ISMSContextDB,
|
||||||
|
ISMSPolicyDB,
|
||||||
|
SecurityObjectiveDB,
|
||||||
|
StatementOfApplicabilityDB,
|
||||||
|
AuditFindingDB,
|
||||||
|
CorrectiveActionDB,
|
||||||
|
ManagementReviewDB,
|
||||||
|
InternalAuditDB,
|
||||||
|
AuditTrailDB,
|
||||||
|
ISMSReadinessCheckDB,
|
||||||
|
ApprovalStatusEnum,
|
||||||
|
FindingTypeEnum,
|
||||||
|
FindingStatusEnum,
|
||||||
|
)
|
||||||
|
from compliance.domain import NotFoundError
|
||||||
|
from compliance.services.isms_governance_service import generate_id, log_audit_trail
|
||||||
|
from compliance.schemas.isms_audit import (
|
||||||
|
PotentialFinding,
|
||||||
|
ISMSReadinessCheckResponse,
|
||||||
|
ISO27001ChapterStatus,
|
||||||
|
ISO27001OverviewResponse,
|
||||||
|
AuditTrailResponse,
|
||||||
|
PaginationMeta,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Management Reviews (ISO 27001 9.3)
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
class ManagementReviewService:
|
||||||
|
"""Business logic for Management Reviews."""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def list_reviews(db: Session, status: Optional[str] = None) -> tuple:
|
||||||
|
query = db.query(ManagementReviewDB)
|
||||||
|
if status:
|
||||||
|
query = query.filter(ManagementReviewDB.status == status)
|
||||||
|
reviews = query.order_by(ManagementReviewDB.review_date.desc()).all()
|
||||||
|
return reviews, len(reviews)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get(db: Session, review_id: str) -> ManagementReviewDB:
|
||||||
|
review = (
|
||||||
|
db.query(ManagementReviewDB)
|
||||||
|
.filter(
|
||||||
|
(ManagementReviewDB.id == review_id)
|
||||||
|
| (ManagementReviewDB.review_id == review_id)
|
||||||
|
)
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if not review:
|
||||||
|
raise NotFoundError("Management review not found")
|
||||||
|
return review
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def create(db: Session, data: dict, created_by: str) -> ManagementReviewDB:
|
||||||
|
review_date = data["review_date"]
|
||||||
|
if isinstance(review_date, str):
|
||||||
|
review_date = date.fromisoformat(review_date)
|
||||||
|
year = review_date.year
|
||||||
|
quarter = (review_date.month - 1) // 3 + 1
|
||||||
|
review_id = f"MR-{year}-Q{quarter}"
|
||||||
|
existing = db.query(ManagementReviewDB).filter(ManagementReviewDB.review_id == review_id).first()
|
||||||
|
if existing:
|
||||||
|
review_id = f"{review_id}-{generate_id()[:4]}"
|
||||||
|
attendees = data.pop("attendees", None)
|
||||||
|
review = ManagementReviewDB(
|
||||||
|
id=generate_id(),
|
||||||
|
review_id=review_id,
|
||||||
|
attendees=[a.model_dump() for a in attendees] if attendees else None,
|
||||||
|
status="draft",
|
||||||
|
**data,
|
||||||
|
)
|
||||||
|
db.add(review)
|
||||||
|
log_audit_trail(db, "management_review", review.id, review_id, "create", created_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(review)
|
||||||
|
return review
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def update(db: Session, review_id: str, data: dict, updated_by: str) -> ManagementReviewDB:
|
||||||
|
review = (
|
||||||
|
db.query(ManagementReviewDB)
|
||||||
|
.filter(
|
||||||
|
(ManagementReviewDB.id == review_id)
|
||||||
|
| (ManagementReviewDB.review_id == review_id)
|
||||||
|
)
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if not review:
|
||||||
|
raise NotFoundError("Management review not found")
|
||||||
|
for field, value in data.items():
|
||||||
|
if field == "action_items" and value:
|
||||||
|
setattr(review, field, [item.model_dump() for item in value])
|
||||||
|
else:
|
||||||
|
setattr(review, field, value)
|
||||||
|
log_audit_trail(db, "management_review", review.id, review.review_id, "update", updated_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(review)
|
||||||
|
return review
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def approve(
|
||||||
|
db: Session,
|
||||||
|
review_id: str,
|
||||||
|
approved_by: str,
|
||||||
|
next_review_date: date,
|
||||||
|
minutes_document_path: Optional[str] = None,
|
||||||
|
) -> ManagementReviewDB:
|
||||||
|
review = (
|
||||||
|
db.query(ManagementReviewDB)
|
||||||
|
.filter(
|
||||||
|
(ManagementReviewDB.id == review_id)
|
||||||
|
| (ManagementReviewDB.review_id == review_id)
|
||||||
|
)
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if not review:
|
||||||
|
raise NotFoundError("Management review not found")
|
||||||
|
review.status = "approved"
|
||||||
|
review.approved_by = approved_by
|
||||||
|
review.approved_at = datetime.now(timezone.utc)
|
||||||
|
review.next_review_date = next_review_date
|
||||||
|
review.minutes_document_path = minutes_document_path
|
||||||
|
log_audit_trail(db, "management_review", review.id, review.review_id, "approve", approved_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(review)
|
||||||
|
return review
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Internal Audits (ISO 27001 9.2)
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
class InternalAuditService:
|
||||||
|
"""Business logic for Internal Audits."""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def list_audits(db: Session, status: Optional[str] = None, audit_type: Optional[str] = None) -> tuple:
|
||||||
|
query = db.query(InternalAuditDB)
|
||||||
|
if status:
|
||||||
|
query = query.filter(InternalAuditDB.status == status)
|
||||||
|
if audit_type:
|
||||||
|
query = query.filter(InternalAuditDB.audit_type == audit_type)
|
||||||
|
audits = query.order_by(InternalAuditDB.planned_date.desc()).all()
|
||||||
|
return audits, len(audits)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def create(db: Session, data: dict, created_by: str) -> InternalAuditDB:
|
||||||
|
planned_date = data["planned_date"]
|
||||||
|
if isinstance(planned_date, str):
|
||||||
|
planned_date = date.fromisoformat(planned_date)
|
||||||
|
year = planned_date.year
|
||||||
|
existing_count = (
|
||||||
|
db.query(InternalAuditDB)
|
||||||
|
.filter(InternalAuditDB.audit_id.like(f"IA-{year}-%"))
|
||||||
|
.count()
|
||||||
|
)
|
||||||
|
audit_id = f"IA-{year}-{existing_count + 1:03d}"
|
||||||
|
audit = InternalAuditDB(id=generate_id(), audit_id=audit_id, status="planned", **data)
|
||||||
|
db.add(audit)
|
||||||
|
log_audit_trail(db, "internal_audit", audit.id, audit_id, "create", created_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(audit)
|
||||||
|
return audit
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def update(db: Session, audit_id: str, data: dict, updated_by: str) -> InternalAuditDB:
|
||||||
|
audit = (
|
||||||
|
db.query(InternalAuditDB)
|
||||||
|
.filter((InternalAuditDB.id == audit_id) | (InternalAuditDB.audit_id == audit_id))
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if not audit:
|
||||||
|
raise NotFoundError("Internal audit not found")
|
||||||
|
for field, value in data.items():
|
||||||
|
setattr(audit, field, value)
|
||||||
|
log_audit_trail(db, "internal_audit", audit.id, audit.audit_id, "update", updated_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(audit)
|
||||||
|
return audit
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def complete(
|
||||||
|
db: Session,
|
||||||
|
audit_id: str,
|
||||||
|
audit_conclusion: str,
|
||||||
|
overall_assessment: str,
|
||||||
|
follow_up_audit_required: bool,
|
||||||
|
completed_by: str,
|
||||||
|
) -> InternalAuditDB:
|
||||||
|
audit = (
|
||||||
|
db.query(InternalAuditDB)
|
||||||
|
.filter((InternalAuditDB.id == audit_id) | (InternalAuditDB.audit_id == audit_id))
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if not audit:
|
||||||
|
raise NotFoundError("Internal audit not found")
|
||||||
|
audit.status = "completed"
|
||||||
|
audit.actual_end_date = date.today()
|
||||||
|
audit.report_date = date.today()
|
||||||
|
audit.audit_conclusion = audit_conclusion
|
||||||
|
audit.overall_assessment = overall_assessment
|
||||||
|
audit.follow_up_audit_required = follow_up_audit_required
|
||||||
|
log_audit_trail(db, "internal_audit", audit.id, audit.audit_id, "complete", completed_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(audit)
|
||||||
|
return audit
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Audit Trail
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
class AuditTrailService:
|
||||||
|
"""Business logic for Audit Trail queries."""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def query(
|
||||||
|
db: Session,
|
||||||
|
entity_type: Optional[str] = None,
|
||||||
|
entity_id: Optional[str] = None,
|
||||||
|
performed_by: Optional[str] = None,
|
||||||
|
action: Optional[str] = None,
|
||||||
|
page: int = 1,
|
||||||
|
page_size: int = 50,
|
||||||
|
) -> dict:
|
||||||
|
query = db.query(AuditTrailDB)
|
||||||
|
if entity_type:
|
||||||
|
query = query.filter(AuditTrailDB.entity_type == entity_type)
|
||||||
|
if entity_id:
|
||||||
|
query = query.filter(AuditTrailDB.entity_id == entity_id)
|
||||||
|
if performed_by:
|
||||||
|
query = query.filter(AuditTrailDB.performed_by == performed_by)
|
||||||
|
if action:
|
||||||
|
query = query.filter(AuditTrailDB.action == action)
|
||||||
|
total = query.count()
|
||||||
|
entries = (
|
||||||
|
query.order_by(AuditTrailDB.performed_at.desc())
|
||||||
|
.offset((page - 1) * page_size)
|
||||||
|
.limit(page_size)
|
||||||
|
.all()
|
||||||
|
)
|
||||||
|
total_pages = (total + page_size - 1) // page_size
|
||||||
|
return {
|
||||||
|
"entries": entries,
|
||||||
|
"total": total,
|
||||||
|
"pagination": PaginationMeta(
|
||||||
|
page=page,
|
||||||
|
page_size=page_size,
|
||||||
|
total=total,
|
||||||
|
total_pages=total_pages,
|
||||||
|
has_next=page < total_pages,
|
||||||
|
has_prev=page > 1,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Readiness Check
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
class ReadinessCheckService:
|
||||||
|
"""Business logic for the ISMS Readiness Check."""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def run(db: Session, triggered_by: str) -> ISMSReadinessCheckResponse:
|
||||||
|
potential_majors: list = []
|
||||||
|
potential_minors: list = []
|
||||||
|
improvement_opportunities: list = []
|
||||||
|
|
||||||
|
# Chapter 4: Context
|
||||||
|
scope = db.query(ISMSScopeDB).filter(ISMSScopeDB.status == ApprovalStatusEnum.APPROVED).first()
|
||||||
|
if not scope:
|
||||||
|
potential_majors.append(PotentialFinding(
|
||||||
|
check="ISMS Scope not approved", status="fail",
|
||||||
|
recommendation="Approve ISMS scope with top management signature", iso_reference="4.3",
|
||||||
|
))
|
||||||
|
context = db.query(ISMSContextDB).filter(ISMSContextDB.status == ApprovalStatusEnum.APPROVED).first()
|
||||||
|
if not context:
|
||||||
|
potential_majors.append(PotentialFinding(
|
||||||
|
check="ISMS Context not documented", status="fail",
|
||||||
|
recommendation="Document and approve context analysis (4.1, 4.2)", iso_reference="4.1, 4.2",
|
||||||
|
))
|
||||||
|
|
||||||
|
# Chapter 5: Leadership
|
||||||
|
master_policy = db.query(ISMSPolicyDB).filter(
|
||||||
|
ISMSPolicyDB.policy_type == "master", ISMSPolicyDB.status == ApprovalStatusEnum.APPROVED,
|
||||||
|
).first()
|
||||||
|
if not master_policy:
|
||||||
|
potential_majors.append(PotentialFinding(
|
||||||
|
check="Information Security Policy not approved", status="fail",
|
||||||
|
recommendation="Create and approve master ISMS policy", iso_reference="5.2",
|
||||||
|
))
|
||||||
|
|
||||||
|
# Chapter 6: Risk Assessment
|
||||||
|
from compliance.db.models import RiskDB
|
||||||
|
risks_without_treatment = db.query(RiskDB).filter(
|
||||||
|
RiskDB.status == "open", RiskDB.treatment_plan is None,
|
||||||
|
).count()
|
||||||
|
if risks_without_treatment > 0:
|
||||||
|
potential_majors.append(PotentialFinding(
|
||||||
|
check=f"{risks_without_treatment} risks without treatment plan", status="fail",
|
||||||
|
recommendation="Define risk treatment for all identified risks", iso_reference="6.1.2",
|
||||||
|
))
|
||||||
|
|
||||||
|
# Chapter 6: Objectives
|
||||||
|
objectives = db.query(SecurityObjectiveDB).filter(SecurityObjectiveDB.status == "active").count()
|
||||||
|
if objectives == 0:
|
||||||
|
potential_majors.append(PotentialFinding(
|
||||||
|
check="No security objectives defined", status="fail",
|
||||||
|
recommendation="Define measurable security objectives", iso_reference="6.2",
|
||||||
|
))
|
||||||
|
|
||||||
|
# SoA
|
||||||
|
soa_total = db.query(StatementOfApplicabilityDB).count()
|
||||||
|
soa_unapproved = db.query(StatementOfApplicabilityDB).filter(
|
||||||
|
StatementOfApplicabilityDB.approved_at is None,
|
||||||
|
).count()
|
||||||
|
if soa_total == 0:
|
||||||
|
potential_majors.append(PotentialFinding(
|
||||||
|
check="Statement of Applicability not created", status="fail",
|
||||||
|
recommendation="Create SoA for all 93 Annex A controls", iso_reference="Annex A",
|
||||||
|
))
|
||||||
|
elif soa_unapproved > 0:
|
||||||
|
potential_minors.append(PotentialFinding(
|
||||||
|
check=f"{soa_unapproved} SoA entries not approved", status="warning",
|
||||||
|
recommendation="Review and approve all SoA entries", iso_reference="Annex A",
|
||||||
|
))
|
||||||
|
|
||||||
|
# Chapter 9: Internal Audit
|
||||||
|
last_year = date.today().replace(year=date.today().year - 1)
|
||||||
|
internal_audit = db.query(InternalAuditDB).filter(
|
||||||
|
InternalAuditDB.status == "completed", InternalAuditDB.actual_end_date >= last_year,
|
||||||
|
).first()
|
||||||
|
if not internal_audit:
|
||||||
|
potential_majors.append(PotentialFinding(
|
||||||
|
check="No internal audit in last 12 months", status="fail",
|
||||||
|
recommendation="Conduct internal audit before certification", iso_reference="9.2",
|
||||||
|
))
|
||||||
|
|
||||||
|
# Chapter 9: Management Review
|
||||||
|
mgmt_review = db.query(ManagementReviewDB).filter(
|
||||||
|
ManagementReviewDB.status == "approved", ManagementReviewDB.review_date >= last_year,
|
||||||
|
).first()
|
||||||
|
if not mgmt_review:
|
||||||
|
potential_majors.append(PotentialFinding(
|
||||||
|
check="No management review in last 12 months", status="fail",
|
||||||
|
recommendation="Conduct and approve management review", iso_reference="9.3",
|
||||||
|
))
|
||||||
|
|
||||||
|
# Chapter 10: Open Findings
|
||||||
|
open_majors = db.query(AuditFindingDB).filter(
|
||||||
|
AuditFindingDB.finding_type == FindingTypeEnum.MAJOR,
|
||||||
|
AuditFindingDB.status != FindingStatusEnum.CLOSED,
|
||||||
|
).count()
|
||||||
|
if open_majors > 0:
|
||||||
|
potential_majors.append(PotentialFinding(
|
||||||
|
check=f"{open_majors} open major finding(s)", status="fail",
|
||||||
|
recommendation="Close all major findings before certification", iso_reference="10.1",
|
||||||
|
))
|
||||||
|
open_minors = db.query(AuditFindingDB).filter(
|
||||||
|
AuditFindingDB.finding_type == FindingTypeEnum.MINOR,
|
||||||
|
AuditFindingDB.status != FindingStatusEnum.CLOSED,
|
||||||
|
).count()
|
||||||
|
if open_minors > 0:
|
||||||
|
potential_minors.append(PotentialFinding(
|
||||||
|
check=f"{open_minors} open minor finding(s)", status="warning",
|
||||||
|
recommendation="Address minor findings or have CAPA in progress", iso_reference="10.1",
|
||||||
|
))
|
||||||
|
|
||||||
|
# Calculate scores
|
||||||
|
total_checks = 10
|
||||||
|
passed_checks = total_checks - len(potential_majors)
|
||||||
|
readiness_score = (passed_checks / total_checks) * 100
|
||||||
|
certification_possible = len(potential_majors) == 0
|
||||||
|
if certification_possible:
|
||||||
|
overall_status = "ready" if len(potential_minors) == 0 else "at_risk"
|
||||||
|
else:
|
||||||
|
overall_status = "not_ready"
|
||||||
|
|
||||||
|
def get_chapter_status(has_major: bool, has_minor: bool) -> str:
|
||||||
|
if has_major:
|
||||||
|
return "fail"
|
||||||
|
elif has_minor:
|
||||||
|
return "warning"
|
||||||
|
return "pass"
|
||||||
|
|
||||||
|
chapter_4_majors = any("4." in (f.iso_reference or "") for f in potential_majors)
|
||||||
|
chapter_5_majors = any("5." in (f.iso_reference or "") for f in potential_majors)
|
||||||
|
chapter_6_majors = any("6." in (f.iso_reference or "") for f in potential_majors)
|
||||||
|
chapter_9_majors = any("9." in (f.iso_reference or "") for f in potential_majors)
|
||||||
|
chapter_10_majors = any("10." in (f.iso_reference or "") for f in potential_majors)
|
||||||
|
|
||||||
|
priority_actions = [f.recommendation for f in potential_majors[:5]]
|
||||||
|
|
||||||
|
check = ISMSReadinessCheckDB(
|
||||||
|
id=generate_id(),
|
||||||
|
check_date=datetime.now(timezone.utc),
|
||||||
|
triggered_by=triggered_by,
|
||||||
|
overall_status=overall_status,
|
||||||
|
certification_possible=certification_possible,
|
||||||
|
chapter_4_status=get_chapter_status(chapter_4_majors, False),
|
||||||
|
chapter_5_status=get_chapter_status(chapter_5_majors, False),
|
||||||
|
chapter_6_status=get_chapter_status(chapter_6_majors, False),
|
||||||
|
chapter_7_status=get_chapter_status(
|
||||||
|
any("7." in (f.iso_reference or "") for f in potential_majors),
|
||||||
|
any("7." in (f.iso_reference or "") for f in potential_minors),
|
||||||
|
),
|
||||||
|
chapter_8_status=get_chapter_status(
|
||||||
|
any("8." in (f.iso_reference or "") for f in potential_majors),
|
||||||
|
any("8." in (f.iso_reference or "") for f in potential_minors),
|
||||||
|
),
|
||||||
|
chapter_9_status=get_chapter_status(chapter_9_majors, False),
|
||||||
|
chapter_10_status=get_chapter_status(chapter_10_majors, False),
|
||||||
|
potential_majors=[f.model_dump() for f in potential_majors],
|
||||||
|
potential_minors=[f.model_dump() for f in potential_minors],
|
||||||
|
improvement_opportunities=[f.model_dump() for f in improvement_opportunities],
|
||||||
|
readiness_score=readiness_score,
|
||||||
|
priority_actions=priority_actions,
|
||||||
|
)
|
||||||
|
db.add(check)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(check)
|
||||||
|
|
||||||
|
return ISMSReadinessCheckResponse(
|
||||||
|
id=check.id,
|
||||||
|
check_date=check.check_date,
|
||||||
|
triggered_by=check.triggered_by,
|
||||||
|
overall_status=check.overall_status,
|
||||||
|
certification_possible=check.certification_possible,
|
||||||
|
chapter_4_status=check.chapter_4_status,
|
||||||
|
chapter_5_status=check.chapter_5_status,
|
||||||
|
chapter_6_status=check.chapter_6_status,
|
||||||
|
chapter_7_status=check.chapter_7_status,
|
||||||
|
chapter_8_status=check.chapter_8_status,
|
||||||
|
chapter_9_status=check.chapter_9_status,
|
||||||
|
chapter_10_status=check.chapter_10_status,
|
||||||
|
potential_majors=potential_majors,
|
||||||
|
potential_minors=potential_minors,
|
||||||
|
improvement_opportunities=improvement_opportunities,
|
||||||
|
readiness_score=check.readiness_score,
|
||||||
|
documentation_score=None,
|
||||||
|
implementation_score=None,
|
||||||
|
evidence_score=None,
|
||||||
|
priority_actions=priority_actions,
|
||||||
|
)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_latest(db: Session) -> ISMSReadinessCheckDB:
|
||||||
|
check = (
|
||||||
|
db.query(ISMSReadinessCheckDB)
|
||||||
|
.order_by(ISMSReadinessCheckDB.check_date.desc())
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if not check:
|
||||||
|
raise NotFoundError("No readiness check found. Run one first.")
|
||||||
|
return check
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# ISO 27001 Overview
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
class OverviewService:
|
||||||
|
"""Business logic for the ISO 27001 overview dashboard."""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_overview(db: Session) -> ISO27001OverviewResponse:
|
||||||
|
scope = db.query(ISMSScopeDB).filter(ISMSScopeDB.status == ApprovalStatusEnum.APPROVED).first()
|
||||||
|
scope_approved = scope is not None
|
||||||
|
|
||||||
|
soa_total = db.query(StatementOfApplicabilityDB).count()
|
||||||
|
soa_approved = db.query(StatementOfApplicabilityDB).filter(
|
||||||
|
StatementOfApplicabilityDB.approved_at.isnot(None),
|
||||||
|
).count()
|
||||||
|
soa_all_approved = soa_total > 0 and soa_approved == soa_total
|
||||||
|
|
||||||
|
last_year = date.today().replace(year=date.today().year - 1)
|
||||||
|
|
||||||
|
last_mgmt_review = (
|
||||||
|
db.query(ManagementReviewDB)
|
||||||
|
.filter(ManagementReviewDB.status == "approved")
|
||||||
|
.order_by(ManagementReviewDB.review_date.desc())
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
last_internal_audit = (
|
||||||
|
db.query(InternalAuditDB)
|
||||||
|
.filter(InternalAuditDB.status == "completed")
|
||||||
|
.order_by(InternalAuditDB.actual_end_date.desc())
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
|
||||||
|
open_majors = db.query(AuditFindingDB).filter(
|
||||||
|
AuditFindingDB.finding_type == FindingTypeEnum.MAJOR,
|
||||||
|
AuditFindingDB.status != FindingStatusEnum.CLOSED,
|
||||||
|
).count()
|
||||||
|
open_minors = db.query(AuditFindingDB).filter(
|
||||||
|
AuditFindingDB.finding_type == FindingTypeEnum.MINOR,
|
||||||
|
AuditFindingDB.status != FindingStatusEnum.CLOSED,
|
||||||
|
).count()
|
||||||
|
|
||||||
|
policies_total = db.query(ISMSPolicyDB).count()
|
||||||
|
policies_approved = db.query(ISMSPolicyDB).filter(
|
||||||
|
ISMSPolicyDB.status == ApprovalStatusEnum.APPROVED,
|
||||||
|
).count()
|
||||||
|
|
||||||
|
objectives_total = db.query(SecurityObjectiveDB).count()
|
||||||
|
objectives_achieved = db.query(SecurityObjectiveDB).filter(
|
||||||
|
SecurityObjectiveDB.status == "achieved",
|
||||||
|
).count()
|
||||||
|
|
||||||
|
has_any_data = any([
|
||||||
|
scope_approved, soa_total > 0, policies_total > 0,
|
||||||
|
objectives_total > 0, last_mgmt_review is not None,
|
||||||
|
last_internal_audit is not None,
|
||||||
|
])
|
||||||
|
|
||||||
|
if not has_any_data:
|
||||||
|
certification_readiness = 0.0
|
||||||
|
else:
|
||||||
|
readiness_factors = [
|
||||||
|
scope_approved,
|
||||||
|
soa_all_approved,
|
||||||
|
last_mgmt_review is not None and last_mgmt_review.review_date >= last_year,
|
||||||
|
last_internal_audit is not None and (last_internal_audit.actual_end_date or date.min) >= last_year,
|
||||||
|
open_majors == 0 and (soa_total > 0 or policies_total > 0),
|
||||||
|
policies_total > 0 and policies_approved >= policies_total * 0.8,
|
||||||
|
objectives_total > 0,
|
||||||
|
]
|
||||||
|
certification_readiness = sum(readiness_factors) / len(readiness_factors) * 100
|
||||||
|
|
||||||
|
if not has_any_data:
|
||||||
|
overall_status = "not_started"
|
||||||
|
elif open_majors > 0:
|
||||||
|
overall_status = "not_ready"
|
||||||
|
elif certification_readiness >= 80:
|
||||||
|
overall_status = "ready"
|
||||||
|
else:
|
||||||
|
overall_status = "at_risk"
|
||||||
|
|
||||||
|
def _chapter_status(has_positive_evidence: bool, has_issues: bool) -> str:
|
||||||
|
if not has_positive_evidence:
|
||||||
|
return "not_started"
|
||||||
|
return "compliant" if not has_issues else "non_compliant"
|
||||||
|
|
||||||
|
ch9_parts = sum([last_mgmt_review is not None, last_internal_audit is not None])
|
||||||
|
ch9_pct = (ch9_parts / 2) * 100
|
||||||
|
|
||||||
|
capa_total = db.query(AuditFindingDB).count()
|
||||||
|
ch10_has_data = capa_total > 0
|
||||||
|
ch10_pct = 100.0 if (ch10_has_data and open_majors == 0) else (50.0 if ch10_has_data else 0.0)
|
||||||
|
|
||||||
|
chapters = [
|
||||||
|
ISO27001ChapterStatus(
|
||||||
|
chapter="4", title="Kontext der Organisation",
|
||||||
|
status=_chapter_status(scope_approved, False),
|
||||||
|
completion_percentage=100.0 if scope_approved else 0.0,
|
||||||
|
open_findings=0,
|
||||||
|
key_documents=["ISMS Scope", "Context Analysis"] if scope_approved else [],
|
||||||
|
last_reviewed=scope.approved_at if scope else None,
|
||||||
|
),
|
||||||
|
ISO27001ChapterStatus(
|
||||||
|
chapter="5", title="Fuehrung",
|
||||||
|
status=_chapter_status(policies_total > 0, policies_approved < policies_total),
|
||||||
|
completion_percentage=(policies_approved / max(policies_total, 1)) * 100 if policies_total > 0 else 0.0,
|
||||||
|
open_findings=0,
|
||||||
|
key_documents=[f"Policy {i+1}" for i in range(min(policies_approved, 3))] if policies_approved > 0 else [],
|
||||||
|
last_reviewed=None,
|
||||||
|
),
|
||||||
|
ISO27001ChapterStatus(
|
||||||
|
chapter="6", title="Planung",
|
||||||
|
status=_chapter_status(objectives_total > 0, False),
|
||||||
|
completion_percentage=75.0 if objectives_total > 0 else 0.0,
|
||||||
|
open_findings=0,
|
||||||
|
key_documents=["Risk Register", "Security Objectives"] if objectives_total > 0 else [],
|
||||||
|
last_reviewed=None,
|
||||||
|
),
|
||||||
|
ISO27001ChapterStatus(
|
||||||
|
chapter="9", title="Bewertung der Leistung",
|
||||||
|
status=_chapter_status(ch9_parts > 0, open_majors + open_minors > 0),
|
||||||
|
completion_percentage=ch9_pct,
|
||||||
|
open_findings=open_majors + open_minors,
|
||||||
|
key_documents=(
|
||||||
|
(["Internal Audit Report"] if last_internal_audit else [])
|
||||||
|
+ (["Management Review Minutes"] if last_mgmt_review else [])
|
||||||
|
),
|
||||||
|
last_reviewed=last_mgmt_review.approved_at if last_mgmt_review else None,
|
||||||
|
),
|
||||||
|
ISO27001ChapterStatus(
|
||||||
|
chapter="10", title="Verbesserung",
|
||||||
|
status=_chapter_status(ch10_has_data, open_majors > 0),
|
||||||
|
completion_percentage=ch10_pct,
|
||||||
|
open_findings=open_majors,
|
||||||
|
key_documents=["CAPA Register"] if ch10_has_data else [],
|
||||||
|
last_reviewed=None,
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
return ISO27001OverviewResponse(
|
||||||
|
overall_status=overall_status,
|
||||||
|
certification_readiness=certification_readiness,
|
||||||
|
chapters=chapters,
|
||||||
|
scope_approved=scope_approved,
|
||||||
|
soa_approved=soa_all_approved,
|
||||||
|
last_management_review=last_mgmt_review.approved_at if last_mgmt_review else None,
|
||||||
|
last_internal_audit=(
|
||||||
|
datetime.combine(last_internal_audit.actual_end_date, datetime.min.time())
|
||||||
|
if last_internal_audit and last_internal_audit.actual_end_date
|
||||||
|
else None
|
||||||
|
),
|
||||||
|
open_major_findings=open_majors,
|
||||||
|
open_minor_findings=open_minors,
|
||||||
|
policies_count=policies_total,
|
||||||
|
policies_approved=policies_approved,
|
||||||
|
objectives_count=objectives_total,
|
||||||
|
objectives_achieved=objectives_achieved,
|
||||||
|
)
|
||||||
276
backend-compliance/compliance/services/isms_findings_service.py
Normal file
276
backend-compliance/compliance/services/isms_findings_service.py
Normal file
@@ -0,0 +1,276 @@
|
|||||||
|
# mypy: disable-error-code="arg-type,assignment,union-attr,no-any-return"
|
||||||
|
"""
|
||||||
|
ISMS Findings & CAPA service -- Audit Findings and Corrective Actions.
|
||||||
|
|
||||||
|
Phase 1 Step 4: extracted from ``compliance.api.isms_routes``.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from datetime import datetime, date, timezone
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
|
from compliance.db.models import (
|
||||||
|
AuditFindingDB,
|
||||||
|
CorrectiveActionDB,
|
||||||
|
InternalAuditDB,
|
||||||
|
FindingTypeEnum,
|
||||||
|
FindingStatusEnum,
|
||||||
|
CAPATypeEnum,
|
||||||
|
)
|
||||||
|
from compliance.domain import NotFoundError, ConflictError, ValidationError
|
||||||
|
from compliance.services.isms_governance_service import generate_id, log_audit_trail
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Audit Findings
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
class AuditFindingService:
|
||||||
|
"""Business logic for Audit Findings."""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def list_findings(
|
||||||
|
db: Session,
|
||||||
|
finding_type: Optional[str] = None,
|
||||||
|
status: Optional[str] = None,
|
||||||
|
internal_audit_id: Optional[str] = None,
|
||||||
|
) -> dict:
|
||||||
|
query = db.query(AuditFindingDB)
|
||||||
|
if finding_type:
|
||||||
|
query = query.filter(AuditFindingDB.finding_type == finding_type)
|
||||||
|
if status:
|
||||||
|
query = query.filter(AuditFindingDB.status == status)
|
||||||
|
if internal_audit_id:
|
||||||
|
query = query.filter(AuditFindingDB.internal_audit_id == internal_audit_id)
|
||||||
|
findings = query.order_by(AuditFindingDB.identified_date.desc()).all()
|
||||||
|
major_count = sum(1 for f in findings if f.finding_type == FindingTypeEnum.MAJOR)
|
||||||
|
minor_count = sum(1 for f in findings if f.finding_type == FindingTypeEnum.MINOR)
|
||||||
|
ofi_count = sum(1 for f in findings if f.finding_type == FindingTypeEnum.OFI)
|
||||||
|
open_count = sum(1 for f in findings if f.status != FindingStatusEnum.CLOSED)
|
||||||
|
return {
|
||||||
|
"findings": findings,
|
||||||
|
"total": len(findings),
|
||||||
|
"major_count": major_count,
|
||||||
|
"minor_count": minor_count,
|
||||||
|
"ofi_count": ofi_count,
|
||||||
|
"open_count": open_count,
|
||||||
|
}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def create(db: Session, data: dict) -> AuditFindingDB:
|
||||||
|
year = date.today().year
|
||||||
|
existing_count = (
|
||||||
|
db.query(AuditFindingDB)
|
||||||
|
.filter(AuditFindingDB.finding_id.like(f"FIND-{year}-%"))
|
||||||
|
.count()
|
||||||
|
)
|
||||||
|
finding_id = f"FIND-{year}-{existing_count + 1:03d}"
|
||||||
|
|
||||||
|
internal_audit_id = data.pop("internal_audit_id", None)
|
||||||
|
audit_session_id = data.pop("audit_session_id", None)
|
||||||
|
finding_type_str = data.pop("finding_type")
|
||||||
|
|
||||||
|
finding = AuditFindingDB(
|
||||||
|
id=generate_id(),
|
||||||
|
finding_id=finding_id,
|
||||||
|
audit_session_id=audit_session_id,
|
||||||
|
internal_audit_id=internal_audit_id,
|
||||||
|
finding_type=FindingTypeEnum(finding_type_str),
|
||||||
|
status=FindingStatusEnum.OPEN,
|
||||||
|
**data,
|
||||||
|
)
|
||||||
|
db.add(finding)
|
||||||
|
|
||||||
|
# Update internal audit counts if linked
|
||||||
|
if internal_audit_id:
|
||||||
|
audit = (
|
||||||
|
db.query(InternalAuditDB)
|
||||||
|
.filter(InternalAuditDB.id == internal_audit_id)
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if audit:
|
||||||
|
audit.total_findings = (audit.total_findings or 0) + 1
|
||||||
|
if finding_type_str == "major":
|
||||||
|
audit.major_findings = (audit.major_findings or 0) + 1
|
||||||
|
elif finding_type_str == "minor":
|
||||||
|
audit.minor_findings = (audit.minor_findings or 0) + 1
|
||||||
|
elif finding_type_str == "ofi":
|
||||||
|
audit.ofi_count = (audit.ofi_count or 0) + 1
|
||||||
|
elif finding_type_str == "positive":
|
||||||
|
audit.positive_observations = (audit.positive_observations or 0) + 1
|
||||||
|
|
||||||
|
log_audit_trail(db, "audit_finding", finding.id, finding_id, "create", data.get("auditor", "unknown"))
|
||||||
|
db.commit()
|
||||||
|
db.refresh(finding)
|
||||||
|
return finding
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def update(db: Session, finding_id: str, data: dict, updated_by: str) -> AuditFindingDB:
|
||||||
|
finding = (
|
||||||
|
db.query(AuditFindingDB)
|
||||||
|
.filter((AuditFindingDB.id == finding_id) | (AuditFindingDB.finding_id == finding_id))
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if not finding:
|
||||||
|
raise NotFoundError("Finding not found")
|
||||||
|
for field, value in data.items():
|
||||||
|
if field == "status" and value:
|
||||||
|
setattr(finding, field, FindingStatusEnum(value))
|
||||||
|
else:
|
||||||
|
setattr(finding, field, value)
|
||||||
|
log_audit_trail(db, "audit_finding", finding.id, finding.finding_id, "update", updated_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(finding)
|
||||||
|
return finding
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def close(
|
||||||
|
db: Session,
|
||||||
|
finding_id: str,
|
||||||
|
closure_notes: str,
|
||||||
|
closed_by: str,
|
||||||
|
verification_method: str,
|
||||||
|
verification_evidence: str,
|
||||||
|
) -> AuditFindingDB:
|
||||||
|
finding = (
|
||||||
|
db.query(AuditFindingDB)
|
||||||
|
.filter((AuditFindingDB.id == finding_id) | (AuditFindingDB.finding_id == finding_id))
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if not finding:
|
||||||
|
raise NotFoundError("Finding not found")
|
||||||
|
open_capas = (
|
||||||
|
db.query(CorrectiveActionDB)
|
||||||
|
.filter(
|
||||||
|
CorrectiveActionDB.finding_id == finding.id,
|
||||||
|
CorrectiveActionDB.status != "verified",
|
||||||
|
)
|
||||||
|
.count()
|
||||||
|
)
|
||||||
|
if open_capas > 0:
|
||||||
|
raise ValidationError(f"Cannot close finding: {open_capas} CAPA(s) not yet verified")
|
||||||
|
finding.status = FindingStatusEnum.CLOSED
|
||||||
|
finding.closed_date = date.today()
|
||||||
|
finding.closure_notes = closure_notes
|
||||||
|
finding.closed_by = closed_by
|
||||||
|
finding.verification_method = verification_method
|
||||||
|
finding.verification_evidence = verification_evidence
|
||||||
|
finding.verified_by = closed_by
|
||||||
|
finding.verified_at = datetime.now(timezone.utc)
|
||||||
|
log_audit_trail(db, "audit_finding", finding.id, finding.finding_id, "close", closed_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(finding)
|
||||||
|
return finding
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Corrective Actions (CAPA)
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
class CAPAService:
|
||||||
|
"""Business logic for Corrective / Preventive Actions."""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def list_capas(
|
||||||
|
db: Session,
|
||||||
|
finding_id: Optional[str] = None,
|
||||||
|
status: Optional[str] = None,
|
||||||
|
assigned_to: Optional[str] = None,
|
||||||
|
) -> tuple:
|
||||||
|
query = db.query(CorrectiveActionDB)
|
||||||
|
if finding_id:
|
||||||
|
query = query.filter(CorrectiveActionDB.finding_id == finding_id)
|
||||||
|
if status:
|
||||||
|
query = query.filter(CorrectiveActionDB.status == status)
|
||||||
|
if assigned_to:
|
||||||
|
query = query.filter(CorrectiveActionDB.assigned_to == assigned_to)
|
||||||
|
actions = query.order_by(CorrectiveActionDB.planned_completion).all()
|
||||||
|
return actions, len(actions)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def create(db: Session, data: dict, created_by: str) -> CorrectiveActionDB:
|
||||||
|
finding = db.query(AuditFindingDB).filter(AuditFindingDB.id == data["finding_id"]).first()
|
||||||
|
if not finding:
|
||||||
|
raise NotFoundError("Finding not found")
|
||||||
|
year = date.today().year
|
||||||
|
existing_count = (
|
||||||
|
db.query(CorrectiveActionDB)
|
||||||
|
.filter(CorrectiveActionDB.capa_id.like(f"CAPA-{year}-%"))
|
||||||
|
.count()
|
||||||
|
)
|
||||||
|
capa_id = f"CAPA-{year}-{existing_count + 1:03d}"
|
||||||
|
capa_type_str = data.pop("capa_type")
|
||||||
|
capa = CorrectiveActionDB(
|
||||||
|
id=generate_id(),
|
||||||
|
capa_id=capa_id,
|
||||||
|
capa_type=CAPATypeEnum(capa_type_str),
|
||||||
|
status="planned",
|
||||||
|
**data,
|
||||||
|
)
|
||||||
|
db.add(capa)
|
||||||
|
finding.status = FindingStatusEnum.CORRECTIVE_ACTION_PENDING
|
||||||
|
log_audit_trail(db, "capa", capa.id, capa_id, "create", created_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(capa)
|
||||||
|
return capa
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def update(db: Session, capa_id: str, data: dict, updated_by: str) -> CorrectiveActionDB:
|
||||||
|
capa = (
|
||||||
|
db.query(CorrectiveActionDB)
|
||||||
|
.filter((CorrectiveActionDB.id == capa_id) | (CorrectiveActionDB.capa_id == capa_id))
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if not capa:
|
||||||
|
raise NotFoundError("CAPA not found")
|
||||||
|
for field, value in data.items():
|
||||||
|
setattr(capa, field, value)
|
||||||
|
if capa.status == "completed" and not capa.actual_completion:
|
||||||
|
capa.actual_completion = date.today()
|
||||||
|
log_audit_trail(db, "capa", capa.id, capa.capa_id, "update", updated_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(capa)
|
||||||
|
return capa
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def verify(
|
||||||
|
db: Session,
|
||||||
|
capa_id: str,
|
||||||
|
verified_by: str,
|
||||||
|
is_effective: bool,
|
||||||
|
effectiveness_notes: str,
|
||||||
|
) -> CorrectiveActionDB:
|
||||||
|
capa = (
|
||||||
|
db.query(CorrectiveActionDB)
|
||||||
|
.filter((CorrectiveActionDB.id == capa_id) | (CorrectiveActionDB.capa_id == capa_id))
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if not capa:
|
||||||
|
raise NotFoundError("CAPA not found")
|
||||||
|
if capa.status != "completed":
|
||||||
|
raise ValidationError("CAPA must be completed before verification")
|
||||||
|
capa.effectiveness_verified = is_effective
|
||||||
|
capa.effectiveness_verification_date = date.today()
|
||||||
|
capa.effectiveness_notes = effectiveness_notes
|
||||||
|
capa.status = "verified" if is_effective else "completed"
|
||||||
|
if is_effective:
|
||||||
|
finding = db.query(AuditFindingDB).filter(AuditFindingDB.id == capa.finding_id).first()
|
||||||
|
if finding:
|
||||||
|
unverified = (
|
||||||
|
db.query(CorrectiveActionDB)
|
||||||
|
.filter(
|
||||||
|
CorrectiveActionDB.finding_id == finding.id,
|
||||||
|
CorrectiveActionDB.id != capa.id,
|
||||||
|
CorrectiveActionDB.status != "verified",
|
||||||
|
)
|
||||||
|
.count()
|
||||||
|
)
|
||||||
|
if unverified == 0:
|
||||||
|
finding.status = FindingStatusEnum.VERIFICATION_PENDING
|
||||||
|
log_audit_trail(db, "capa", capa.id, capa.capa_id, "verify", verified_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(capa)
|
||||||
|
return capa
|
||||||
@@ -0,0 +1,416 @@
|
|||||||
|
# mypy: disable-error-code="arg-type,assignment,union-attr,no-any-return"
|
||||||
|
"""
|
||||||
|
ISMS Governance service -- Scope, Context, Policies, Objectives, SoA.
|
||||||
|
|
||||||
|
Phase 1 Step 4: extracted from ``compliance.api.isms_routes``. Helpers
|
||||||
|
``generate_id``, ``create_signature`` and ``log_audit_trail`` are defined
|
||||||
|
here and re-exported from ``compliance.api.isms_routes`` for legacy imports.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import uuid
|
||||||
|
import hashlib
|
||||||
|
from datetime import datetime, date, timezone
|
||||||
|
from typing import Optional, List
|
||||||
|
|
||||||
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
|
from compliance.db.models import (
|
||||||
|
ISMSScopeDB,
|
||||||
|
ISMSContextDB,
|
||||||
|
ISMSPolicyDB,
|
||||||
|
SecurityObjectiveDB,
|
||||||
|
StatementOfApplicabilityDB,
|
||||||
|
AuditTrailDB,
|
||||||
|
ApprovalStatusEnum,
|
||||||
|
)
|
||||||
|
from compliance.domain import NotFoundError, ConflictError, ValidationError
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Shared helpers (re-exported by isms_routes for back-compat)
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
def generate_id() -> str:
|
||||||
|
"""Generate a UUID string."""
|
||||||
|
return str(uuid.uuid4())
|
||||||
|
|
||||||
|
|
||||||
|
def create_signature(data: str) -> str:
|
||||||
|
"""Create SHA-256 signature."""
|
||||||
|
return hashlib.sha256(data.encode()).hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
def log_audit_trail(
|
||||||
|
db: Session,
|
||||||
|
entity_type: str,
|
||||||
|
entity_id: str,
|
||||||
|
entity_name: str,
|
||||||
|
action: str,
|
||||||
|
performed_by: str,
|
||||||
|
field_changed: str = None,
|
||||||
|
old_value: str = None,
|
||||||
|
new_value: str = None,
|
||||||
|
change_summary: str = None,
|
||||||
|
) -> None:
|
||||||
|
"""Log an entry to the audit trail."""
|
||||||
|
trail = AuditTrailDB(
|
||||||
|
id=generate_id(),
|
||||||
|
entity_type=entity_type,
|
||||||
|
entity_id=entity_id,
|
||||||
|
entity_name=entity_name,
|
||||||
|
action=action,
|
||||||
|
field_changed=field_changed,
|
||||||
|
old_value=old_value,
|
||||||
|
new_value=new_value,
|
||||||
|
change_summary=change_summary,
|
||||||
|
performed_by=performed_by,
|
||||||
|
performed_at=datetime.now(timezone.utc),
|
||||||
|
checksum=create_signature(
|
||||||
|
f"{entity_type}|{entity_id}|{action}|{performed_by}"
|
||||||
|
),
|
||||||
|
)
|
||||||
|
db.add(trail)
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Scope (ISO 27001 4.3)
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
class ISMSScopeService:
|
||||||
|
"""Business logic for ISMS Scope."""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_current(db: Session) -> ISMSScopeDB:
|
||||||
|
scope = (
|
||||||
|
db.query(ISMSScopeDB)
|
||||||
|
.filter(ISMSScopeDB.status != ApprovalStatusEnum.SUPERSEDED)
|
||||||
|
.order_by(ISMSScopeDB.created_at.desc())
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if not scope:
|
||||||
|
raise NotFoundError("No ISMS scope defined yet")
|
||||||
|
return scope
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def create(db: Session, data: dict, created_by: str) -> ISMSScopeDB:
|
||||||
|
existing = (
|
||||||
|
db.query(ISMSScopeDB)
|
||||||
|
.filter(ISMSScopeDB.status != ApprovalStatusEnum.SUPERSEDED)
|
||||||
|
.all()
|
||||||
|
)
|
||||||
|
for s in existing:
|
||||||
|
s.status = ApprovalStatusEnum.SUPERSEDED
|
||||||
|
|
||||||
|
scope = ISMSScopeDB(id=generate_id(), status=ApprovalStatusEnum.DRAFT, created_by=created_by, **data)
|
||||||
|
db.add(scope)
|
||||||
|
log_audit_trail(db, "isms_scope", scope.id, "ISMS Scope", "create", created_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(scope)
|
||||||
|
return scope
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def update(db: Session, scope_id: str, data: dict, updated_by: str) -> ISMSScopeDB:
|
||||||
|
scope = db.query(ISMSScopeDB).filter(ISMSScopeDB.id == scope_id).first()
|
||||||
|
if not scope:
|
||||||
|
raise NotFoundError("Scope not found")
|
||||||
|
if scope.status == ApprovalStatusEnum.APPROVED:
|
||||||
|
raise ConflictError("Cannot modify approved scope. Create new version.")
|
||||||
|
for field, value in data.items():
|
||||||
|
setattr(scope, field, value)
|
||||||
|
scope.updated_by = updated_by
|
||||||
|
scope.updated_at = datetime.now(timezone.utc)
|
||||||
|
version_parts = scope.version.split(".")
|
||||||
|
scope.version = f"{version_parts[0]}.{int(version_parts[1]) + 1}"
|
||||||
|
log_audit_trail(db, "isms_scope", scope.id, "ISMS Scope", "update", updated_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(scope)
|
||||||
|
return scope
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def approve(db: Session, scope_id: str, approved_by: str, effective_date: date, review_date: date) -> ISMSScopeDB:
|
||||||
|
scope = db.query(ISMSScopeDB).filter(ISMSScopeDB.id == scope_id).first()
|
||||||
|
if not scope:
|
||||||
|
raise NotFoundError("Scope not found")
|
||||||
|
scope.status = ApprovalStatusEnum.APPROVED
|
||||||
|
scope.approved_by = approved_by
|
||||||
|
scope.approved_at = datetime.now(timezone.utc)
|
||||||
|
scope.effective_date = effective_date
|
||||||
|
scope.review_date = review_date
|
||||||
|
scope.approval_signature = create_signature(
|
||||||
|
f"{scope.scope_statement}|{approved_by}|{datetime.now(timezone.utc).isoformat()}"
|
||||||
|
)
|
||||||
|
log_audit_trail(db, "isms_scope", scope.id, "ISMS Scope", "approve", approved_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(scope)
|
||||||
|
return scope
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Context (ISO 27001 4.1, 4.2)
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
class ISMSContextService:
|
||||||
|
"""Business logic for ISMS Context."""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_current(db: Session) -> ISMSContextDB:
|
||||||
|
context = (
|
||||||
|
db.query(ISMSContextDB)
|
||||||
|
.filter(ISMSContextDB.status != ApprovalStatusEnum.SUPERSEDED)
|
||||||
|
.order_by(ISMSContextDB.created_at.desc())
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if not context:
|
||||||
|
raise NotFoundError("No ISMS context defined yet")
|
||||||
|
return context
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def create(db: Session, data: dict, created_by: str) -> ISMSContextDB:
|
||||||
|
existing = (
|
||||||
|
db.query(ISMSContextDB)
|
||||||
|
.filter(ISMSContextDB.status != ApprovalStatusEnum.SUPERSEDED)
|
||||||
|
.all()
|
||||||
|
)
|
||||||
|
for c in existing:
|
||||||
|
c.status = ApprovalStatusEnum.SUPERSEDED
|
||||||
|
context = ISMSContextDB(id=generate_id(), status=ApprovalStatusEnum.DRAFT, **data)
|
||||||
|
db.add(context)
|
||||||
|
log_audit_trail(db, "isms_context", context.id, "ISMS Context", "create", created_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(context)
|
||||||
|
return context
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Policies (ISO 27001 5.2)
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
class ISMSPolicyService:
|
||||||
|
"""Business logic for ISMS Policies."""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def list_policies(db: Session, policy_type: Optional[str] = None, status: Optional[str] = None) -> tuple:
|
||||||
|
query = db.query(ISMSPolicyDB)
|
||||||
|
if policy_type:
|
||||||
|
query = query.filter(ISMSPolicyDB.policy_type == policy_type)
|
||||||
|
if status:
|
||||||
|
query = query.filter(ISMSPolicyDB.status == status)
|
||||||
|
policies = query.order_by(ISMSPolicyDB.policy_id).all()
|
||||||
|
return policies, len(policies)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def create(db: Session, data: dict) -> ISMSPolicyDB:
|
||||||
|
existing = db.query(ISMSPolicyDB).filter(ISMSPolicyDB.policy_id == data["policy_id"]).first()
|
||||||
|
if existing:
|
||||||
|
raise ConflictError(f"Policy {data['policy_id']} already exists")
|
||||||
|
authored_by = data.pop("authored_by")
|
||||||
|
policy = ISMSPolicyDB(
|
||||||
|
id=generate_id(), authored_by=authored_by, status=ApprovalStatusEnum.DRAFT, **data,
|
||||||
|
)
|
||||||
|
db.add(policy)
|
||||||
|
log_audit_trail(db, "isms_policy", policy.id, policy.policy_id, "create", authored_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(policy)
|
||||||
|
return policy
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get(db: Session, policy_id: str) -> ISMSPolicyDB:
|
||||||
|
policy = (
|
||||||
|
db.query(ISMSPolicyDB)
|
||||||
|
.filter((ISMSPolicyDB.id == policy_id) | (ISMSPolicyDB.policy_id == policy_id))
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if not policy:
|
||||||
|
raise NotFoundError("Policy not found")
|
||||||
|
return policy
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def update(db: Session, policy_id: str, data: dict, updated_by: str) -> ISMSPolicyDB:
|
||||||
|
policy = (
|
||||||
|
db.query(ISMSPolicyDB)
|
||||||
|
.filter((ISMSPolicyDB.id == policy_id) | (ISMSPolicyDB.policy_id == policy_id))
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if not policy:
|
||||||
|
raise NotFoundError("Policy not found")
|
||||||
|
if policy.status == ApprovalStatusEnum.APPROVED:
|
||||||
|
version_parts = policy.version.split(".")
|
||||||
|
policy.version = f"{int(version_parts[0]) + 1}.0"
|
||||||
|
policy.status = ApprovalStatusEnum.DRAFT
|
||||||
|
for field, value in data.items():
|
||||||
|
setattr(policy, field, value)
|
||||||
|
log_audit_trail(db, "isms_policy", policy.id, policy.policy_id, "update", updated_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(policy)
|
||||||
|
return policy
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def approve(db: Session, policy_id: str, reviewed_by: str, approved_by: str, effective_date: date) -> ISMSPolicyDB:
|
||||||
|
policy = (
|
||||||
|
db.query(ISMSPolicyDB)
|
||||||
|
.filter((ISMSPolicyDB.id == policy_id) | (ISMSPolicyDB.policy_id == policy_id))
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if not policy:
|
||||||
|
raise NotFoundError("Policy not found")
|
||||||
|
policy.reviewed_by = reviewed_by
|
||||||
|
policy.approved_by = approved_by
|
||||||
|
policy.approved_at = datetime.now(timezone.utc)
|
||||||
|
policy.effective_date = effective_date
|
||||||
|
policy.next_review_date = date(
|
||||||
|
effective_date.year + (policy.review_frequency_months // 12),
|
||||||
|
effective_date.month,
|
||||||
|
effective_date.day,
|
||||||
|
)
|
||||||
|
policy.status = ApprovalStatusEnum.APPROVED
|
||||||
|
policy.approval_signature = create_signature(
|
||||||
|
f"{policy.policy_id}|{approved_by}|{datetime.now(timezone.utc).isoformat()}"
|
||||||
|
)
|
||||||
|
log_audit_trail(db, "isms_policy", policy.id, policy.policy_id, "approve", approved_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(policy)
|
||||||
|
return policy
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Security Objectives (ISO 27001 6.2)
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
class SecurityObjectiveService:
|
||||||
|
"""Business logic for Security Objectives."""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def list_objectives(db: Session, category: Optional[str] = None, status: Optional[str] = None) -> tuple:
|
||||||
|
query = db.query(SecurityObjectiveDB)
|
||||||
|
if category:
|
||||||
|
query = query.filter(SecurityObjectiveDB.category == category)
|
||||||
|
if status:
|
||||||
|
query = query.filter(SecurityObjectiveDB.status == status)
|
||||||
|
objectives = query.order_by(SecurityObjectiveDB.objective_id).all()
|
||||||
|
return objectives, len(objectives)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def create(db: Session, data: dict, created_by: str) -> SecurityObjectiveDB:
|
||||||
|
objective = SecurityObjectiveDB(id=generate_id(), status="active", **data)
|
||||||
|
db.add(objective)
|
||||||
|
log_audit_trail(db, "security_objective", objective.id, objective.objective_id, "create", created_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(objective)
|
||||||
|
return objective
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def update(db: Session, objective_id: str, data: dict, updated_by: str) -> SecurityObjectiveDB:
|
||||||
|
objective = (
|
||||||
|
db.query(SecurityObjectiveDB)
|
||||||
|
.filter((SecurityObjectiveDB.id == objective_id) | (SecurityObjectiveDB.objective_id == objective_id))
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if not objective:
|
||||||
|
raise NotFoundError("Objective not found")
|
||||||
|
for field, value in data.items():
|
||||||
|
setattr(objective, field, value)
|
||||||
|
if objective.progress_percentage >= 100 and objective.status == "active":
|
||||||
|
objective.status = "achieved"
|
||||||
|
objective.achieved_date = date.today()
|
||||||
|
log_audit_trail(db, "security_objective", objective.id, objective.objective_id, "update", updated_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(objective)
|
||||||
|
return objective
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Statement of Applicability (SoA)
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
|
||||||
|
class SoAService:
|
||||||
|
"""Business logic for Statement of Applicability."""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def list_entries(
|
||||||
|
db: Session,
|
||||||
|
is_applicable: Optional[bool] = None,
|
||||||
|
implementation_status: Optional[str] = None,
|
||||||
|
category: Optional[str] = None,
|
||||||
|
) -> dict:
|
||||||
|
query = db.query(StatementOfApplicabilityDB)
|
||||||
|
if is_applicable is not None:
|
||||||
|
query = query.filter(StatementOfApplicabilityDB.is_applicable == is_applicable)
|
||||||
|
if implementation_status:
|
||||||
|
query = query.filter(StatementOfApplicabilityDB.implementation_status == implementation_status)
|
||||||
|
if category:
|
||||||
|
query = query.filter(StatementOfApplicabilityDB.annex_a_category == category)
|
||||||
|
entries = query.order_by(StatementOfApplicabilityDB.annex_a_control).all()
|
||||||
|
applicable_count = sum(1 for e in entries if e.is_applicable)
|
||||||
|
implemented_count = sum(1 for e in entries if e.implementation_status == "implemented")
|
||||||
|
planned_count = sum(1 for e in entries if e.implementation_status == "planned")
|
||||||
|
return {
|
||||||
|
"entries": entries,
|
||||||
|
"total": len(entries),
|
||||||
|
"applicable_count": applicable_count,
|
||||||
|
"not_applicable_count": len(entries) - applicable_count,
|
||||||
|
"implemented_count": implemented_count,
|
||||||
|
"planned_count": planned_count,
|
||||||
|
}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def create(db: Session, data: dict, created_by: str) -> StatementOfApplicabilityDB:
|
||||||
|
existing = (
|
||||||
|
db.query(StatementOfApplicabilityDB)
|
||||||
|
.filter(StatementOfApplicabilityDB.annex_a_control == data["annex_a_control"])
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if existing:
|
||||||
|
raise ConflictError(f"SoA entry for {data['annex_a_control']} already exists")
|
||||||
|
entry = StatementOfApplicabilityDB(id=generate_id(), **data)
|
||||||
|
db.add(entry)
|
||||||
|
log_audit_trail(db, "soa", entry.id, entry.annex_a_control, "create", created_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(entry)
|
||||||
|
return entry
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def update(db: Session, entry_id: str, data: dict, updated_by: str) -> StatementOfApplicabilityDB:
|
||||||
|
entry = (
|
||||||
|
db.query(StatementOfApplicabilityDB)
|
||||||
|
.filter(
|
||||||
|
(StatementOfApplicabilityDB.id == entry_id)
|
||||||
|
| (StatementOfApplicabilityDB.annex_a_control == entry_id)
|
||||||
|
)
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if not entry:
|
||||||
|
raise NotFoundError("SoA entry not found")
|
||||||
|
for field, value in data.items():
|
||||||
|
setattr(entry, field, value)
|
||||||
|
version_parts = entry.version.split(".")
|
||||||
|
entry.version = f"{version_parts[0]}.{int(version_parts[1]) + 1}"
|
||||||
|
log_audit_trail(db, "soa", entry.id, entry.annex_a_control, "update", updated_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(entry)
|
||||||
|
return entry
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def approve(db: Session, entry_id: str, reviewed_by: str, approved_by: str) -> StatementOfApplicabilityDB:
|
||||||
|
entry = (
|
||||||
|
db.query(StatementOfApplicabilityDB)
|
||||||
|
.filter(
|
||||||
|
(StatementOfApplicabilityDB.id == entry_id)
|
||||||
|
| (StatementOfApplicabilityDB.annex_a_control == entry_id)
|
||||||
|
)
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if not entry:
|
||||||
|
raise NotFoundError("SoA entry not found")
|
||||||
|
entry.reviewed_by = reviewed_by
|
||||||
|
entry.reviewed_at = datetime.now(timezone.utc)
|
||||||
|
entry.approved_by = approved_by
|
||||||
|
entry.approved_at = datetime.now(timezone.utc)
|
||||||
|
log_audit_trail(db, "soa", entry.id, entry.annex_a_control, "approve", approved_by)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(entry)
|
||||||
|
return entry
|
||||||
Reference in New Issue
Block a user