Files
breakpilot-compliance/backend-compliance/compliance/services/audit_signoff_service.py
Sharang Parnerkar 4a91814bfc refactor(backend/api): extract AuditSession service layer (Step 4 worked example)
Phase 1 Step 4 of PHASE1_RUNBOOK.md, first worked example. Demonstrates
the router -> service delegation pattern for all 18 oversized route
files still above the 500 LOC hard cap.

compliance/api/audit_routes.py (637 LOC) is decomposed into:

  compliance/api/audit_routes.py              (198) — thin handlers
  compliance/services/audit_session_service.py (259) — session lifecycle
  compliance/services/audit_signoff_service.py (319) — checklist + sign-off
  compliance/api/_http_errors.py               ( 43) — reusable error translator

Handlers shrink to 3-6 lines each:

    @router.post("/sessions", response_model=AuditSessionResponse)
    async def create_audit_session(
        request: CreateAuditSessionRequest,
        service: AuditSessionService = Depends(get_audit_session_service),
    ):
        with translate_domain_errors():
            return service.create(request)

Services are HTTP-agnostic: they raise NotFoundError / ConflictError /
ValidationError from compliance.domain, and the route layer translates
those to HTTPException(404/409/400) via the translate_domain_errors()
context manager in compliance.api._http_errors. The error translator is
reusable by every future Step 4 refactor.

Services take a sqlalchemy Session in the constructor and are wired via
Depends factories (get_audit_session_service / get_audit_signoff_service).
No globals, no module-level state.

Behavior is byte-identical at the HTTP boundary:
  - Same paths, methods, status codes, response models
  - Same error messages (domain error __str__ preserved)
  - Same auto-start-on-first-signoff, same statistics calculation,
    same signature hash format, same PDF streaming response

Verified:
  - 173/173 pytest compliance/tests/ tests/contracts/ pass
  - OpenAPI 360 paths / 484 operations unchanged
  - audit_routes.py under soft 300 target
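  - Both new service files under hard 500 (audit_session_service.py is also under the soft 300 target; audit_signoff_service.py at 319 is slightly over it)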

Note: compliance/tests/test_audit_routes.py contains placeholder tests
that do not actually import or call the handler functions — they only
assert on request-data shape. Real behavioral coverage relies on the
contract test. A follow-up commit should add TestClient-based
integration tests for the audit endpoints. Flagged in PHASE1_RUNBOOK.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 18:16:50 +02:00

320 lines
11 KiB
Python

"""
Audit Sign-Off service — audit checklist retrieval and per-requirement sign-off
operations.
Phase 1 Step 4: extracted from ``compliance/api/audit_routes.py``. HTTP-agnostic;
raises ``compliance.domain`` errors translated at the route layer.
"""
import hashlib
from datetime import datetime, timezone
from typing import Optional
from uuid import uuid4
from sqlalchemy import func
from sqlalchemy.orm import Session
from compliance.db.models import (
AuditResultEnum,
AuditSessionDB,
AuditSessionStatusEnum,
AuditSignOffDB,
ControlMappingDB,
RegulationDB,
RequirementDB,
)
from compliance.domain import ConflictError, NotFoundError, ValidationError
from compliance.schemas.audit_session import (
AuditChecklistItem,
AuditChecklistResponse,
AuditSessionSummary,
AuditStatistics,
SignOffRequest,
SignOffResponse,
)
from compliance.schemas.common import PaginationMeta
class AuditSignOffService:
    """Business logic for the audit checklist and per-requirement sign-offs.

    The service is HTTP-agnostic: failures are reported by raising
    ``NotFoundError``, ``ConflictError`` or ``ValidationError``; translating
    them into transport-level errors is left to the caller.
    """

    def __init__(self, db: Session) -> None:
        """Store the SQLAlchemy session used for every query and commit."""
        self.db = db

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _get_session_or_raise(self, session_id: str) -> AuditSessionDB:
        """Return the audit session identified by ``session_id``.

        Raises:
            NotFoundError: if no session with that id exists.
        """
        session = (
            self.db.query(AuditSessionDB)
            .filter(AuditSessionDB.id == session_id)
            .first()
        )
        if not session:
            raise NotFoundError(f"Audit session {session_id} not found")
        return session

    def _find_signoff(
        self, session_id: str, requirement_id: str
    ) -> Optional[AuditSignOffDB]:
        """Return the sign-off for (session, requirement), or ``None`` if absent."""
        return (
            self.db.query(AuditSignOffDB)
            .filter(AuditSignOffDB.session_id == session_id)
            .filter(AuditSignOffDB.requirement_id == requirement_id)
            .first()
        )

    def _apply_status_filter(self, query, session_id: str, status_filter: Optional[str]):
        """Restrict a requirement query by sign-off status within one session.

        ``"pending"`` keeps requirements that have no sign-off row in the
        session. Any other value keeps requirements whose sign-off result has
        that enum value; a value matching no ``AuditResultEnum`` member keeps
        nothing. The filter is applied in SQL so that counting and pagination
        operate on the filtered set.
        """
        if not status_filter:
            return query

        # Correlated on RequirementDB.id, which belongs to the enclosing query.
        signoff_for_requirement = (
            self.db.query(AuditSignOffDB.id)
            .filter(AuditSignOffDB.session_id == session_id)
            .filter(AuditSignOffDB.requirement_id == RequirementDB.id)
        )
        if status_filter == "pending":
            return query.filter(~signoff_for_requirement.exists())

        # An unknown status yields an empty list, i.e. an always-false IN ().
        wanted = [r for r in AuditResultEnum if r.value == status_filter]
        return query.filter(
            signoff_for_requirement.filter(AuditSignOffDB.result.in_(wanted)).exists()
        )

    @staticmethod
    def _signoff_to_response(signoff: AuditSignOffDB) -> SignOffResponse:
        """Convert a sign-off row into its response schema.

        ``is_signed`` is derived from the presence of a signature hash.
        """
        return SignOffResponse(
            id=signoff.id,
            session_id=signoff.session_id,
            requirement_id=signoff.requirement_id,
            result=signoff.result.value,
            notes=signoff.notes,
            is_signed=signoff.signature_hash is not None,
            signature_hash=signoff.signature_hash,
            signed_at=signoff.signed_at,
            signed_by=signoff.signed_by,
            created_at=signoff.created_at,
            updated_at=signoff.updated_at,
        )

    @staticmethod
    def _compute_stats(
        total: int, signoffs: list[AuditSignOffDB], completion_percentage: float
    ) -> AuditStatistics:
        """Tally sign-offs per result and derive the number of pending items.

        ``pending`` is ``total`` minus the number of sign-off rows, clamped at
        zero so a ``total`` that lags behind the sign-offs never goes negative.
        """

        def tally(result: AuditResultEnum) -> int:
            return sum(1 for s in signoffs if s.result == result)

        return AuditStatistics(
            total=total,
            compliant=tally(AuditResultEnum.COMPLIANT),
            compliant_with_notes=tally(AuditResultEnum.COMPLIANT_WITH_NOTES),
            non_compliant=tally(AuditResultEnum.NON_COMPLIANT),
            not_applicable=tally(AuditResultEnum.NOT_APPLICABLE),
            pending=max(0, total - len(signoffs)),
            completion_percentage=completion_percentage,
        )

    @staticmethod
    def _build_signature(
        result_value: str,
        requirement_id: str,
        auditor_name: str,
        signed_at: datetime,
    ) -> str:
        """Return the SHA-256 hex digest that signs a sign-off.

        The signed payload is
        ``"<result>|<requirement_id>|<auditor_name>|<signed_at ISO-8601>"``.
        """
        payload = (
            f"{result_value}|{requirement_id}|{auditor_name}|{signed_at.isoformat()}"
        )
        return hashlib.sha256(payload.encode()).hexdigest()

    @staticmethod
    def _update_result_counters(
        session: AuditSessionDB,
        old_result: Optional[AuditResultEnum],
        new_result: AuditResultEnum,
    ) -> None:
        """Move one item between the session's compliant / non-compliant counters.

        Both compliant variants share ``compliant_count``. Decrements are
        clamped at zero. Nothing happens when the result did not change.
        """
        if old_result == new_result:
            return

        compliant_results = (
            AuditResultEnum.COMPLIANT,
            AuditResultEnum.COMPLIANT_WITH_NOTES,
        )
        if old_result in compliant_results:
            session.compliant_count = max(0, session.compliant_count - 1)
        elif old_result == AuditResultEnum.NON_COMPLIANT:
            session.non_compliant_count = max(0, session.non_compliant_count - 1)

        if new_result in compliant_results:
            session.compliant_count += 1
        elif new_result == AuditResultEnum.NON_COMPLIANT:
            session.non_compliant_count += 1

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def get_checklist(
        self,
        session_id: str,
        page: int,
        page_size: int,
        status_filter: Optional[str],
        regulation_filter: Optional[str],
        search: Optional[str],
    ) -> AuditChecklistResponse:
        """Return the paginated audit checklist with per-requirement sign-off status.

        Args:
            session_id: Audit session whose checklist is requested.
            page: 1-based page number.
            page_size: Number of requirements per page (at least 1).
            status_filter: ``"pending"`` (no sign-off yet) or a sign-off result
                value; falsy disables the filter.
            regulation_filter: Regulation code to restrict the checklist to.
            search: Case-insensitive substring matched against requirement
                title, article and description.

        Raises:
            ValidationError: if ``page`` or ``page_size`` is smaller than 1.
            NotFoundError: if the session does not exist.
        """
        if page < 1 or page_size < 1:
            raise ValidationError("page and page_size must be positive integers")

        session = self._get_session_or_raise(session_id)

        query = self.db.query(RequirementDB).join(RegulationDB)
        if session.regulation_ids:
            query = query.filter(RegulationDB.code.in_(session.regulation_ids))
        if regulation_filter:
            query = query.filter(RegulationDB.code == regulation_filter)
        if search:
            term = f"%{search}%"
            query = query.filter(
                (RequirementDB.title.ilike(term))
                | (RequirementDB.article.ilike(term))
                | (RequirementDB.description.ilike(term))
            )
        # Filter by status before counting/paginating so that page contents and
        # pagination totals describe the same (filtered) set.
        query = self._apply_status_filter(query, session_id, status_filter)

        total_count = query.count()
        requirements = (
            query.order_by(RegulationDB.code, RequirementDB.article)
            .offset((page - 1) * page_size)
            .limit(page_size)
            .all()
        )

        req_ids = [r.id for r in requirements]
        signoff_map: dict = {}
        mapping_count_map: dict = {}
        if req_ids:  # nothing to look up for an empty page
            page_signoffs = (
                self.db.query(AuditSignOffDB)
                .filter(AuditSignOffDB.session_id == session_id)
                .filter(AuditSignOffDB.requirement_id.in_(req_ids))
                .all()
            )
            signoff_map = {s.requirement_id: s for s in page_signoffs}
            mapping_count_map = dict(
                self.db.query(
                    ControlMappingDB.requirement_id, func.count(ControlMappingDB.id)
                )
                .filter(ControlMappingDB.requirement_id.in_(req_ids))
                .group_by(ControlMappingDB.requirement_id)
                .all()
            )

        items: list[AuditChecklistItem] = []
        for req in requirements:
            signoff = signoff_map.get(req.id)
            items.append(
                AuditChecklistItem(
                    requirement_id=req.id,
                    regulation_code=req.regulation.code,
                    article=req.article,
                    paragraph=req.paragraph,
                    title=req.title,
                    description=req.description,
                    current_result=signoff.result.value if signoff else "pending",
                    notes=signoff.notes if signoff else None,
                    is_signed=signoff.signature_hash is not None if signoff else False,
                    signed_at=signoff.signed_at if signoff else None,
                    signed_by=signoff.signed_by if signoff else None,
                    evidence_count=0,  # TODO: Add evidence count
                    controls_mapped=mapping_count_map.get(req.id, 0),
                    implementation_status=req.implementation_status,
                    priority=req.priority,
                )
            )

        # Statistics always cover the whole session, independent of any filter.
        all_signoffs = (
            self.db.query(AuditSignOffDB)
            .filter(AuditSignOffDB.session_id == session_id)
            .all()
        )
        stats = self._compute_stats(
            session.total_items, all_signoffs, session.completion_percentage
        )

        return AuditChecklistResponse(
            session=AuditSessionSummary(
                id=session.id,
                name=session.name,
                auditor_name=session.auditor_name,
                status=session.status.value,
                total_items=session.total_items,
                completed_items=session.completed_items,
                completion_percentage=session.completion_percentage,
                created_at=session.created_at,
                started_at=session.started_at,
                completed_at=session.completed_at,
            ),
            items=items,
            pagination=PaginationMeta(
                page=page,
                page_size=page_size,
                total=total_count,
                total_pages=(total_count + page_size - 1) // page_size,
            ),
            statistics=stats,
        )

    def get_sign_off(self, session_id: str, requirement_id: str) -> SignOffResponse:
        """Return a single sign-off record for (session, requirement).

        Raises:
            NotFoundError: if the requirement has no sign-off in the session.
        """
        signoff = self._find_signoff(session_id, requirement_id)
        if not signoff:
            raise NotFoundError(
                f"No sign-off found for requirement {requirement_id} in session {session_id}"
            )
        return self._signoff_to_response(signoff)

    # ------------------------------------------------------------------
    # Commands
    # ------------------------------------------------------------------

    def sign_off(
        self,
        session_id: str,
        requirement_id: str,
        request: SignOffRequest,
    ) -> SignOffResponse:
        """Create or update a sign-off; optionally produce a SHA-256 digital signature.

        A session in DRAFT status is moved to IN_PROGRESS by its first
        sign-off. The session's completed / compliant / non-compliant counters
        are updated in the same transaction.

        Raises:
            NotFoundError: if the session or the requirement does not exist.
            ConflictError: if the session is neither DRAFT nor IN_PROGRESS.
            ValidationError: if ``request.result`` is not a valid result value.
        """
        session = self._get_session_or_raise(session_id)
        open_statuses = (
            AuditSessionStatusEnum.DRAFT,
            AuditSessionStatusEnum.IN_PROGRESS,
        )
        if session.status not in open_statuses:
            raise ConflictError(
                f"Cannot sign off items in session with status: {session.status.value}"
            )

        requirement = (
            self.db.query(RequirementDB)
            .filter(RequirementDB.id == requirement_id)
            .first()
        )
        if not requirement:
            raise NotFoundError(f"Requirement {requirement_id} not found")

        try:
            result_enum = AuditResultEnum(request.result)
        except ValueError as exc:
            raise ValidationError(
                "Invalid result: "
                f"{request.result}. Valid values: compliant, compliant_notes, "
                "non_compliant, not_applicable, pending"
            ) from exc

        # One timestamp for the whole operation, so the value hashed into the
        # signature is exactly the value persisted in ``signed_at``.
        now = datetime.now(timezone.utc)

        signoff = self._find_signoff(session_id, requirement_id)
        was_new = signoff is None
        old_result = None if was_new else signoff.result

        if was_new:
            signoff = AuditSignOffDB(
                id=str(uuid4()),
                session_id=session_id,
                requirement_id=requirement_id,
                result=result_enum,
                notes=request.notes,
            )
            self.db.add(signoff)
        else:
            signoff.result = result_enum
            signoff.notes = request.notes
            signoff.updated_at = now

        # NOTE(review): when an already-signed sign-off is updated without
        # ``request.sign``, its previous signature fields are left untouched
        # even if the result changed — confirm this is intended.
        if request.sign:
            signoff.signature_hash = self._build_signature(
                result_enum.value, requirement_id, session.auditor_name, now
            )
            signoff.signed_at = now
            signoff.signed_by = session.auditor_name

        if was_new:
            session.completed_items += 1
        self._update_result_counters(session, old_result, result_enum)

        if session.status == AuditSessionStatusEnum.DRAFT:
            session.status = AuditSessionStatusEnum.IN_PROGRESS
            session.started_at = now

        try:
            self.db.commit()
        except Exception:
            # Leave the session usable for the caller before propagating.
            self.db.rollback()
            raise
        self.db.refresh(signoff)
        return self._signoff_to_response(signoff)