This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
breakpilot-pwa/backend/compliance/db/repository.py
Benjamin Admin 21a844cb8a fix: Restore all files lost during destructive rebase
A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.

This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).

Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 09:51:32 +01:00

1539 lines
52 KiB
Python

"""
Repository layer for Compliance module.
Provides CRUD operations and business logic queries for all compliance entities.
"""
import uuid
from datetime import datetime, date
from typing import List, Optional, Dict, Any
from sqlalchemy.orm import Session as DBSession, selectinload, joinedload
from sqlalchemy import func, and_, or_
from typing import Tuple
from .models import (
RegulationDB, RequirementDB, ControlDB, ControlMappingDB,
EvidenceDB, RiskDB, AuditExportDB,
AuditSessionDB, AuditSignOffDB, AuditResultEnum, AuditSessionStatusEnum,
RegulationTypeEnum, ControlDomainEnum, ControlStatusEnum,
RiskLevelEnum, EvidenceStatusEnum, ExportStatusEnum
)
class RegulationRepository:
"""Repository for regulations/standards."""
def __init__(self, db: DBSession):
self.db = db
def create(
self,
code: str,
name: str,
regulation_type: RegulationTypeEnum,
full_name: Optional[str] = None,
source_url: Optional[str] = None,
local_pdf_path: Optional[str] = None,
effective_date: Optional[date] = None,
description: Optional[str] = None,
) -> RegulationDB:
"""Create a new regulation."""
regulation = RegulationDB(
id=str(uuid.uuid4()),
code=code,
name=name,
full_name=full_name,
regulation_type=regulation_type,
source_url=source_url,
local_pdf_path=local_pdf_path,
effective_date=effective_date,
description=description,
)
self.db.add(regulation)
self.db.commit()
self.db.refresh(regulation)
return regulation
def get_by_id(self, regulation_id: str) -> Optional[RegulationDB]:
"""Get regulation by ID."""
return self.db.query(RegulationDB).filter(RegulationDB.id == regulation_id).first()
def get_by_code(self, code: str) -> Optional[RegulationDB]:
"""Get regulation by code (e.g., 'GDPR')."""
return self.db.query(RegulationDB).filter(RegulationDB.code == code).first()
def get_all(
self,
regulation_type: Optional[RegulationTypeEnum] = None,
is_active: Optional[bool] = True
) -> List[RegulationDB]:
"""Get all regulations with optional filters."""
query = self.db.query(RegulationDB)
if regulation_type:
query = query.filter(RegulationDB.regulation_type == regulation_type)
if is_active is not None:
query = query.filter(RegulationDB.is_active == is_active)
return query.order_by(RegulationDB.code).all()
def update(self, regulation_id: str, **kwargs) -> Optional[RegulationDB]:
"""Update a regulation."""
regulation = self.get_by_id(regulation_id)
if not regulation:
return None
for key, value in kwargs.items():
if hasattr(regulation, key):
setattr(regulation, key, value)
regulation.updated_at = datetime.utcnow()
self.db.commit()
self.db.refresh(regulation)
return regulation
def delete(self, regulation_id: str) -> bool:
"""Delete a regulation."""
regulation = self.get_by_id(regulation_id)
if not regulation:
return False
self.db.delete(regulation)
self.db.commit()
return True
def get_active(self) -> List[RegulationDB]:
"""Get all active regulations."""
return self.get_all(is_active=True)
def count(self) -> int:
"""Count all regulations."""
return self.db.query(func.count(RegulationDB.id)).scalar() or 0
class RequirementRepository:
"""Repository for requirements."""
def __init__(self, db: DBSession):
self.db = db
def create(
self,
regulation_id: str,
article: str,
title: str,
paragraph: Optional[str] = None,
description: Optional[str] = None,
requirement_text: Optional[str] = None,
breakpilot_interpretation: Optional[str] = None,
is_applicable: bool = True,
priority: int = 2,
) -> RequirementDB:
"""Create a new requirement."""
requirement = RequirementDB(
id=str(uuid.uuid4()),
regulation_id=regulation_id,
article=article,
paragraph=paragraph,
title=title,
description=description,
requirement_text=requirement_text,
breakpilot_interpretation=breakpilot_interpretation,
is_applicable=is_applicable,
priority=priority,
)
self.db.add(requirement)
self.db.commit()
self.db.refresh(requirement)
return requirement
def get_by_id(self, requirement_id: str) -> Optional[RequirementDB]:
"""Get requirement by ID with eager-loaded relationships."""
return (
self.db.query(RequirementDB)
.options(
selectinload(RequirementDB.control_mappings).selectinload(ControlMappingDB.control),
joinedload(RequirementDB.regulation)
)
.filter(RequirementDB.id == requirement_id)
.first()
)
def get_by_regulation(
self,
regulation_id: str,
is_applicable: Optional[bool] = None
) -> List[RequirementDB]:
"""Get all requirements for a regulation with eager-loaded controls."""
query = (
self.db.query(RequirementDB)
.options(
selectinload(RequirementDB.control_mappings).selectinload(ControlMappingDB.control),
joinedload(RequirementDB.regulation)
)
.filter(RequirementDB.regulation_id == regulation_id)
)
if is_applicable is not None:
query = query.filter(RequirementDB.is_applicable == is_applicable)
return query.order_by(RequirementDB.article, RequirementDB.paragraph).all()
def get_by_regulation_code(self, code: str) -> List[RequirementDB]:
"""Get requirements by regulation code with eager-loaded relationships."""
return (
self.db.query(RequirementDB)
.options(
selectinload(RequirementDB.control_mappings).selectinload(ControlMappingDB.control),
joinedload(RequirementDB.regulation)
)
.join(RegulationDB)
.filter(RegulationDB.code == code)
.order_by(RequirementDB.article, RequirementDB.paragraph)
.all()
)
def get_all(self, is_applicable: Optional[bool] = None) -> List[RequirementDB]:
"""Get all requirements with optional filter and eager-loading."""
query = (
self.db.query(RequirementDB)
.options(
selectinload(RequirementDB.control_mappings).selectinload(ControlMappingDB.control),
joinedload(RequirementDB.regulation)
)
)
if is_applicable is not None:
query = query.filter(RequirementDB.is_applicable == is_applicable)
return query.order_by(RequirementDB.article, RequirementDB.paragraph).all()
def get_paginated(
self,
page: int = 1,
page_size: int = 50,
regulation_code: Optional[str] = None,
status: Optional[str] = None,
is_applicable: Optional[bool] = None,
search: Optional[str] = None,
) -> Tuple[List[RequirementDB], int]:
"""
Get paginated requirements with eager-loaded relationships.
Returns tuple of (items, total_count).
"""
query = (
self.db.query(RequirementDB)
.options(
selectinload(RequirementDB.control_mappings).selectinload(ControlMappingDB.control),
joinedload(RequirementDB.regulation)
)
)
# Filters
if regulation_code:
query = query.join(RegulationDB).filter(RegulationDB.code == regulation_code)
if status:
query = query.filter(RequirementDB.implementation_status == status)
if is_applicable is not None:
query = query.filter(RequirementDB.is_applicable == is_applicable)
if search:
search_term = f"%{search}%"
query = query.filter(
or_(
RequirementDB.title.ilike(search_term),
RequirementDB.description.ilike(search_term),
RequirementDB.article.ilike(search_term),
)
)
# Count before pagination
total = query.count()
# Apply pagination and ordering
items = (
query
.order_by(RequirementDB.priority.desc(), RequirementDB.article, RequirementDB.paragraph)
.offset((page - 1) * page_size)
.limit(page_size)
.all()
)
return items, total
def count(self) -> int:
"""Count all requirements."""
return self.db.query(func.count(RequirementDB.id)).scalar() or 0
class ControlRepository:
"""Repository for controls."""
def __init__(self, db: DBSession):
self.db = db
def create(
self,
control_id: str,
domain: ControlDomainEnum,
control_type: str,
title: str,
pass_criteria: str,
description: Optional[str] = None,
implementation_guidance: Optional[str] = None,
code_reference: Optional[str] = None,
is_automated: bool = False,
automation_tool: Optional[str] = None,
owner: Optional[str] = None,
review_frequency_days: int = 90,
) -> ControlDB:
"""Create a new control."""
control = ControlDB(
id=str(uuid.uuid4()),
control_id=control_id,
domain=domain,
control_type=control_type,
title=title,
description=description,
pass_criteria=pass_criteria,
implementation_guidance=implementation_guidance,
code_reference=code_reference,
is_automated=is_automated,
automation_tool=automation_tool,
owner=owner,
review_frequency_days=review_frequency_days,
)
self.db.add(control)
self.db.commit()
self.db.refresh(control)
return control
def get_by_id(self, control_uuid: str) -> Optional[ControlDB]:
"""Get control by UUID with eager-loaded relationships."""
return (
self.db.query(ControlDB)
.options(
selectinload(ControlDB.mappings).selectinload(ControlMappingDB.requirement),
selectinload(ControlDB.evidence)
)
.filter(ControlDB.id == control_uuid)
.first()
)
def get_by_control_id(self, control_id: str) -> Optional[ControlDB]:
"""Get control by control_id (e.g., 'PRIV-001') with eager-loaded relationships."""
return (
self.db.query(ControlDB)
.options(
selectinload(ControlDB.mappings).selectinload(ControlMappingDB.requirement),
selectinload(ControlDB.evidence)
)
.filter(ControlDB.control_id == control_id)
.first()
)
def get_all(
self,
domain: Optional[ControlDomainEnum] = None,
status: Optional[ControlStatusEnum] = None,
is_automated: Optional[bool] = None,
) -> List[ControlDB]:
"""Get all controls with optional filters and eager-loading."""
query = (
self.db.query(ControlDB)
.options(
selectinload(ControlDB.mappings),
selectinload(ControlDB.evidence)
)
)
if domain:
query = query.filter(ControlDB.domain == domain)
if status:
query = query.filter(ControlDB.status == status)
if is_automated is not None:
query = query.filter(ControlDB.is_automated == is_automated)
return query.order_by(ControlDB.control_id).all()
def get_paginated(
self,
page: int = 1,
page_size: int = 50,
domain: Optional[ControlDomainEnum] = None,
status: Optional[ControlStatusEnum] = None,
is_automated: Optional[bool] = None,
search: Optional[str] = None,
) -> Tuple[List[ControlDB], int]:
"""
Get paginated controls with eager-loaded relationships.
Returns tuple of (items, total_count).
"""
query = (
self.db.query(ControlDB)
.options(
selectinload(ControlDB.mappings),
selectinload(ControlDB.evidence)
)
)
if domain:
query = query.filter(ControlDB.domain == domain)
if status:
query = query.filter(ControlDB.status == status)
if is_automated is not None:
query = query.filter(ControlDB.is_automated == is_automated)
if search:
search_term = f"%{search}%"
query = query.filter(
or_(
ControlDB.title.ilike(search_term),
ControlDB.description.ilike(search_term),
ControlDB.control_id.ilike(search_term),
)
)
total = query.count()
items = (
query
.order_by(ControlDB.control_id)
.offset((page - 1) * page_size)
.limit(page_size)
.all()
)
return items, total
def get_by_domain(self, domain: ControlDomainEnum) -> List[ControlDB]:
"""Get all controls in a domain."""
return self.get_all(domain=domain)
def get_by_status(self, status: ControlStatusEnum) -> List[ControlDB]:
"""Get all controls with a specific status."""
return self.get_all(status=status)
def update_status(
self,
control_id: str,
status: ControlStatusEnum,
status_notes: Optional[str] = None
) -> Optional[ControlDB]:
"""Update control status."""
control = self.get_by_control_id(control_id)
if not control:
return None
control.status = status
if status_notes:
control.status_notes = status_notes
control.updated_at = datetime.utcnow()
self.db.commit()
self.db.refresh(control)
return control
def mark_reviewed(self, control_id: str) -> Optional[ControlDB]:
"""Mark control as reviewed."""
control = self.get_by_control_id(control_id)
if not control:
return None
control.last_reviewed_at = datetime.utcnow()
from datetime import timedelta
control.next_review_at = datetime.utcnow() + timedelta(days=control.review_frequency_days)
control.updated_at = datetime.utcnow()
self.db.commit()
self.db.refresh(control)
return control
def get_due_for_review(self) -> List[ControlDB]:
"""Get controls due for review."""
return (
self.db.query(ControlDB)
.filter(
or_(
ControlDB.next_review_at == None,
ControlDB.next_review_at <= datetime.utcnow()
)
)
.order_by(ControlDB.next_review_at)
.all()
)
def get_statistics(self) -> Dict[str, Any]:
"""Get control statistics by status and domain."""
total = self.db.query(func.count(ControlDB.id)).scalar()
by_status = dict(
self.db.query(ControlDB.status, func.count(ControlDB.id))
.group_by(ControlDB.status)
.all()
)
by_domain = dict(
self.db.query(ControlDB.domain, func.count(ControlDB.id))
.group_by(ControlDB.domain)
.all()
)
passed = by_status.get(ControlStatusEnum.PASS, 0)
partial = by_status.get(ControlStatusEnum.PARTIAL, 0)
score = 0.0
if total > 0:
score = ((passed + (partial * 0.5)) / total) * 100
return {
"total": total,
"by_status": {str(k.value) if k else "none": v for k, v in by_status.items()},
"by_domain": {str(k.value) if k else "none": v for k, v in by_domain.items()},
"compliance_score": round(score, 1),
}
class ControlMappingRepository:
"""Repository for requirement-control mappings."""
def __init__(self, db: DBSession):
self.db = db
def create(
self,
requirement_id: str,
control_id: str,
coverage_level: str = "full",
notes: Optional[str] = None,
) -> ControlMappingDB:
"""Create a mapping."""
# Get the control UUID from control_id
control = self.db.query(ControlDB).filter(ControlDB.control_id == control_id).first()
if not control:
raise ValueError(f"Control {control_id} not found")
mapping = ControlMappingDB(
id=str(uuid.uuid4()),
requirement_id=requirement_id,
control_id=control.id,
coverage_level=coverage_level,
notes=notes,
)
self.db.add(mapping)
self.db.commit()
self.db.refresh(mapping)
return mapping
def get_by_requirement(self, requirement_id: str) -> List[ControlMappingDB]:
"""Get all mappings for a requirement."""
return (
self.db.query(ControlMappingDB)
.filter(ControlMappingDB.requirement_id == requirement_id)
.all()
)
def get_by_control(self, control_uuid: str) -> List[ControlMappingDB]:
"""Get all mappings for a control."""
return (
self.db.query(ControlMappingDB)
.filter(ControlMappingDB.control_id == control_uuid)
.all()
)
class EvidenceRepository:
"""Repository for evidence."""
def __init__(self, db: DBSession):
self.db = db
def create(
self,
control_id: str,
evidence_type: str,
title: str,
description: Optional[str] = None,
artifact_path: Optional[str] = None,
artifact_url: Optional[str] = None,
artifact_hash: Optional[str] = None,
file_size_bytes: Optional[int] = None,
mime_type: Optional[str] = None,
valid_until: Optional[datetime] = None,
source: str = "manual",
ci_job_id: Optional[str] = None,
uploaded_by: Optional[str] = None,
) -> EvidenceDB:
"""Create evidence record."""
# Get control UUID
control = self.db.query(ControlDB).filter(ControlDB.control_id == control_id).first()
if not control:
raise ValueError(f"Control {control_id} not found")
evidence = EvidenceDB(
id=str(uuid.uuid4()),
control_id=control.id,
evidence_type=evidence_type,
title=title,
description=description,
artifact_path=artifact_path,
artifact_url=artifact_url,
artifact_hash=artifact_hash,
file_size_bytes=file_size_bytes,
mime_type=mime_type,
valid_until=valid_until,
source=source,
ci_job_id=ci_job_id,
uploaded_by=uploaded_by,
)
self.db.add(evidence)
self.db.commit()
self.db.refresh(evidence)
return evidence
def get_by_id(self, evidence_id: str) -> Optional[EvidenceDB]:
"""Get evidence by ID."""
return self.db.query(EvidenceDB).filter(EvidenceDB.id == evidence_id).first()
def get_by_control(
self,
control_id: str,
status: Optional[EvidenceStatusEnum] = None
) -> List[EvidenceDB]:
"""Get all evidence for a control."""
control = self.db.query(ControlDB).filter(ControlDB.control_id == control_id).first()
if not control:
return []
query = self.db.query(EvidenceDB).filter(EvidenceDB.control_id == control.id)
if status:
query = query.filter(EvidenceDB.status == status)
return query.order_by(EvidenceDB.collected_at.desc()).all()
def get_all(
self,
evidence_type: Optional[str] = None,
status: Optional[EvidenceStatusEnum] = None,
limit: int = 100,
) -> List[EvidenceDB]:
"""Get all evidence with filters."""
query = self.db.query(EvidenceDB)
if evidence_type:
query = query.filter(EvidenceDB.evidence_type == evidence_type)
if status:
query = query.filter(EvidenceDB.status == status)
return query.order_by(EvidenceDB.collected_at.desc()).limit(limit).all()
def update_status(self, evidence_id: str, status: EvidenceStatusEnum) -> Optional[EvidenceDB]:
"""Update evidence status."""
evidence = self.get_by_id(evidence_id)
if not evidence:
return None
evidence.status = status
evidence.updated_at = datetime.utcnow()
self.db.commit()
self.db.refresh(evidence)
return evidence
def get_statistics(self) -> Dict[str, Any]:
"""Get evidence statistics."""
total = self.db.query(func.count(EvidenceDB.id)).scalar()
by_type = dict(
self.db.query(EvidenceDB.evidence_type, func.count(EvidenceDB.id))
.group_by(EvidenceDB.evidence_type)
.all()
)
by_status = dict(
self.db.query(EvidenceDB.status, func.count(EvidenceDB.id))
.group_by(EvidenceDB.status)
.all()
)
valid = by_status.get(EvidenceStatusEnum.VALID, 0)
coverage = (valid / total * 100) if total > 0 else 0
return {
"total": total,
"by_type": by_type,
"by_status": {str(k.value) if k else "none": v for k, v in by_status.items()},
"coverage_percent": round(coverage, 1),
}
class RiskRepository:
"""Repository for risks."""
def __init__(self, db: DBSession):
self.db = db
def create(
self,
risk_id: str,
title: str,
category: str,
likelihood: int,
impact: int,
description: Optional[str] = None,
mitigating_controls: Optional[List[str]] = None,
owner: Optional[str] = None,
treatment_plan: Optional[str] = None,
) -> RiskDB:
"""Create a risk."""
inherent_risk = RiskDB.calculate_risk_level(likelihood, impact)
risk = RiskDB(
id=str(uuid.uuid4()),
risk_id=risk_id,
title=title,
description=description,
category=category,
likelihood=likelihood,
impact=impact,
inherent_risk=inherent_risk,
mitigating_controls=mitigating_controls or [],
owner=owner,
treatment_plan=treatment_plan,
)
self.db.add(risk)
self.db.commit()
self.db.refresh(risk)
return risk
def get_by_id(self, risk_uuid: str) -> Optional[RiskDB]:
"""Get risk by UUID."""
return self.db.query(RiskDB).filter(RiskDB.id == risk_uuid).first()
def get_by_risk_id(self, risk_id: str) -> Optional[RiskDB]:
"""Get risk by risk_id (e.g., 'RISK-001')."""
return self.db.query(RiskDB).filter(RiskDB.risk_id == risk_id).first()
def get_all(
self,
category: Optional[str] = None,
status: Optional[str] = None,
min_risk_level: Optional[RiskLevelEnum] = None,
) -> List[RiskDB]:
"""Get all risks with filters."""
query = self.db.query(RiskDB)
if category:
query = query.filter(RiskDB.category == category)
if status:
query = query.filter(RiskDB.status == status)
if min_risk_level:
risk_order = {
RiskLevelEnum.LOW: 1,
RiskLevelEnum.MEDIUM: 2,
RiskLevelEnum.HIGH: 3,
RiskLevelEnum.CRITICAL: 4,
}
min_order = risk_order.get(min_risk_level, 1)
query = query.filter(
RiskDB.inherent_risk.in_(
[k for k, v in risk_order.items() if v >= min_order]
)
)
return query.order_by(RiskDB.risk_id).all()
def update(self, risk_id: str, **kwargs) -> Optional[RiskDB]:
"""Update a risk."""
risk = self.get_by_risk_id(risk_id)
if not risk:
return None
for key, value in kwargs.items():
if hasattr(risk, key):
setattr(risk, key, value)
# Recalculate risk levels if likelihood/impact changed
if 'likelihood' in kwargs or 'impact' in kwargs:
risk.inherent_risk = RiskDB.calculate_risk_level(risk.likelihood, risk.impact)
if 'residual_likelihood' in kwargs or 'residual_impact' in kwargs:
if risk.residual_likelihood and risk.residual_impact:
risk.residual_risk = RiskDB.calculate_risk_level(
risk.residual_likelihood, risk.residual_impact
)
risk.updated_at = datetime.utcnow()
self.db.commit()
self.db.refresh(risk)
return risk
def get_matrix_data(self) -> Dict[str, Any]:
"""Get data for risk matrix visualization."""
risks = self.get_all()
matrix = {}
for risk in risks:
key = f"{risk.likelihood}_{risk.impact}"
if key not in matrix:
matrix[key] = []
matrix[key].append({
"risk_id": risk.risk_id,
"title": risk.title,
"inherent_risk": risk.inherent_risk.value if risk.inherent_risk else None,
})
return {
"matrix": matrix,
"total_risks": len(risks),
"by_level": {
"critical": len([r for r in risks if r.inherent_risk == RiskLevelEnum.CRITICAL]),
"high": len([r for r in risks if r.inherent_risk == RiskLevelEnum.HIGH]),
"medium": len([r for r in risks if r.inherent_risk == RiskLevelEnum.MEDIUM]),
"low": len([r for r in risks if r.inherent_risk == RiskLevelEnum.LOW]),
}
}
class AuditExportRepository:
"""Repository for audit exports."""
def __init__(self, db: DBSession):
self.db = db
def create(
self,
export_type: str,
requested_by: str,
export_name: Optional[str] = None,
included_regulations: Optional[List[str]] = None,
included_domains: Optional[List[str]] = None,
date_range_start: Optional[date] = None,
date_range_end: Optional[date] = None,
) -> AuditExportDB:
"""Create an export request."""
export = AuditExportDB(
id=str(uuid.uuid4()),
export_type=export_type,
export_name=export_name or f"audit_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
requested_by=requested_by,
included_regulations=included_regulations,
included_domains=included_domains,
date_range_start=date_range_start,
date_range_end=date_range_end,
)
self.db.add(export)
self.db.commit()
self.db.refresh(export)
return export
def get_by_id(self, export_id: str) -> Optional[AuditExportDB]:
"""Get export by ID."""
return self.db.query(AuditExportDB).filter(AuditExportDB.id == export_id).first()
def get_all(self, limit: int = 50) -> List[AuditExportDB]:
"""Get all exports."""
return (
self.db.query(AuditExportDB)
.order_by(AuditExportDB.requested_at.desc())
.limit(limit)
.all()
)
def update_status(
self,
export_id: str,
status: ExportStatusEnum,
file_path: Optional[str] = None,
file_hash: Optional[str] = None,
file_size_bytes: Optional[int] = None,
error_message: Optional[str] = None,
total_controls: Optional[int] = None,
total_evidence: Optional[int] = None,
compliance_score: Optional[float] = None,
) -> Optional[AuditExportDB]:
"""Update export status."""
export = self.get_by_id(export_id)
if not export:
return None
export.status = status
if file_path:
export.file_path = file_path
if file_hash:
export.file_hash = file_hash
if file_size_bytes:
export.file_size_bytes = file_size_bytes
if error_message:
export.error_message = error_message
if total_controls is not None:
export.total_controls = total_controls
if total_evidence is not None:
export.total_evidence = total_evidence
if compliance_score is not None:
export.compliance_score = compliance_score
if status == ExportStatusEnum.COMPLETED:
export.completed_at = datetime.utcnow()
export.updated_at = datetime.utcnow()
self.db.commit()
self.db.refresh(export)
return export
class ServiceModuleRepository:
"""Repository for service modules (Sprint 3)."""
def __init__(self, db: DBSession):
self.db = db
def create(
self,
name: str,
display_name: str,
service_type: str,
description: Optional[str] = None,
port: Optional[int] = None,
technology_stack: Optional[List[str]] = None,
repository_path: Optional[str] = None,
docker_image: Optional[str] = None,
data_categories: Optional[List[str]] = None,
processes_pii: bool = False,
processes_health_data: bool = False,
ai_components: bool = False,
criticality: str = "medium",
owner_team: Optional[str] = None,
owner_contact: Optional[str] = None,
) -> "ServiceModuleDB":
"""Create a service module."""
from .models import ServiceModuleDB, ServiceTypeEnum
module = ServiceModuleDB(
id=str(uuid.uuid4()),
name=name,
display_name=display_name,
description=description,
service_type=ServiceTypeEnum(service_type),
port=port,
technology_stack=technology_stack or [],
repository_path=repository_path,
docker_image=docker_image,
data_categories=data_categories or [],
processes_pii=processes_pii,
processes_health_data=processes_health_data,
ai_components=ai_components,
criticality=criticality,
owner_team=owner_team,
owner_contact=owner_contact,
)
self.db.add(module)
self.db.commit()
self.db.refresh(module)
return module
def get_by_id(self, module_id: str) -> Optional["ServiceModuleDB"]:
"""Get module by ID."""
from .models import ServiceModuleDB
return self.db.query(ServiceModuleDB).filter(ServiceModuleDB.id == module_id).first()
def get_by_name(self, name: str) -> Optional["ServiceModuleDB"]:
"""Get module by name."""
from .models import ServiceModuleDB
return self.db.query(ServiceModuleDB).filter(ServiceModuleDB.name == name).first()
def get_all(
self,
service_type: Optional[str] = None,
criticality: Optional[str] = None,
processes_pii: Optional[bool] = None,
ai_components: Optional[bool] = None,
) -> List["ServiceModuleDB"]:
"""Get all modules with filters."""
from .models import ServiceModuleDB, ServiceTypeEnum
query = self.db.query(ServiceModuleDB).filter(ServiceModuleDB.is_active == True)
if service_type:
query = query.filter(ServiceModuleDB.service_type == ServiceTypeEnum(service_type))
if criticality:
query = query.filter(ServiceModuleDB.criticality == criticality)
if processes_pii is not None:
query = query.filter(ServiceModuleDB.processes_pii == processes_pii)
if ai_components is not None:
query = query.filter(ServiceModuleDB.ai_components == ai_components)
return query.order_by(ServiceModuleDB.name).all()
def get_with_regulations(self, module_id: str) -> Optional["ServiceModuleDB"]:
"""Get module with regulation mappings loaded."""
from .models import ServiceModuleDB, ModuleRegulationMappingDB
from sqlalchemy.orm import selectinload
return (
self.db.query(ServiceModuleDB)
.options(
selectinload(ServiceModuleDB.regulation_mappings)
.selectinload(ModuleRegulationMappingDB.regulation)
)
.filter(ServiceModuleDB.id == module_id)
.first()
)
def add_regulation_mapping(
self,
module_id: str,
regulation_id: str,
relevance_level: str = "medium",
notes: Optional[str] = None,
applicable_articles: Optional[List[str]] = None,
) -> "ModuleRegulationMappingDB":
"""Add a regulation mapping to a module."""
from .models import ModuleRegulationMappingDB, RelevanceLevelEnum
mapping = ModuleRegulationMappingDB(
id=str(uuid.uuid4()),
module_id=module_id,
regulation_id=regulation_id,
relevance_level=RelevanceLevelEnum(relevance_level),
notes=notes,
applicable_articles=applicable_articles,
)
self.db.add(mapping)
self.db.commit()
self.db.refresh(mapping)
return mapping
def get_overview(self) -> Dict[str, Any]:
"""Get overview statistics for all modules."""
from .models import ServiceModuleDB, ModuleRegulationMappingDB
from sqlalchemy import func
modules = self.get_all()
total = len(modules)
by_type = {}
by_criticality = {}
pii_count = 0
ai_count = 0
for m in modules:
type_key = m.service_type.value if m.service_type else "unknown"
by_type[type_key] = by_type.get(type_key, 0) + 1
by_criticality[m.criticality] = by_criticality.get(m.criticality, 0) + 1
if m.processes_pii:
pii_count += 1
if m.ai_components:
ai_count += 1
# Get regulation coverage
regulation_coverage = {}
mappings = self.db.query(ModuleRegulationMappingDB).all()
for mapping in mappings:
reg = mapping.regulation
if reg:
code = reg.code
regulation_coverage[code] = regulation_coverage.get(code, 0) + 1
# Calculate average compliance score
scores = [m.compliance_score for m in modules if m.compliance_score is not None]
avg_score = sum(scores) / len(scores) if scores else None
return {
"total_modules": total,
"modules_by_type": by_type,
"modules_by_criticality": by_criticality,
"modules_processing_pii": pii_count,
"modules_with_ai": ai_count,
"average_compliance_score": round(avg_score, 1) if avg_score else None,
"regulations_coverage": regulation_coverage,
}
def seed_from_data(self, services_data: List[Dict[str, Any]], force: bool = False) -> Dict[str, int]:
"""Seed modules from service_modules.py data."""
from .models import ServiceModuleDB
modules_created = 0
mappings_created = 0
for svc in services_data:
# Check if module exists
existing = self.get_by_name(svc["name"])
if existing and not force:
continue
if existing and force:
# Delete existing module (cascades to mappings)
self.db.delete(existing)
self.db.commit()
# Create module
module = self.create(
name=svc["name"],
display_name=svc["display_name"],
description=svc.get("description"),
service_type=svc["service_type"],
port=svc.get("port"),
technology_stack=svc.get("technology_stack"),
repository_path=svc.get("repository_path"),
docker_image=svc.get("docker_image"),
data_categories=svc.get("data_categories"),
processes_pii=svc.get("processes_pii", False),
processes_health_data=svc.get("processes_health_data", False),
ai_components=svc.get("ai_components", False),
criticality=svc.get("criticality", "medium"),
owner_team=svc.get("owner_team"),
)
modules_created += 1
# Create regulation mappings
for reg_data in svc.get("regulations", []):
# Find regulation by code
reg = self.db.query(RegulationDB).filter(
RegulationDB.code == reg_data["code"]
).first()
if reg:
self.add_regulation_mapping(
module_id=module.id,
regulation_id=reg.id,
relevance_level=reg_data.get("relevance", "medium"),
notes=reg_data.get("notes"),
)
mappings_created += 1
return {
"modules_created": modules_created,
"mappings_created": mappings_created,
}
class AuditSessionRepository:
"""Repository for audit sessions (Sprint 3: Auditor-Verbesserungen)."""
def __init__(self, db: DBSession):
self.db = db
def create(
self,
name: str,
auditor_name: str,
description: Optional[str] = None,
auditor_email: Optional[str] = None,
regulation_ids: Optional[List[str]] = None,
) -> AuditSessionDB:
"""Create a new audit session."""
session = AuditSessionDB(
id=str(uuid.uuid4()),
name=name,
description=description,
auditor_name=auditor_name,
auditor_email=auditor_email,
regulation_ids=regulation_ids,
status=AuditSessionStatusEnum.DRAFT,
)
self.db.add(session)
self.db.commit()
self.db.refresh(session)
return session
def get_by_id(self, session_id: str) -> Optional[AuditSessionDB]:
"""Get audit session by ID with eager-loaded signoffs."""
return (
self.db.query(AuditSessionDB)
.options(
selectinload(AuditSessionDB.signoffs)
.selectinload(AuditSignOffDB.requirement)
)
.filter(AuditSessionDB.id == session_id)
.first()
)
def get_all(
self,
status: Optional[AuditSessionStatusEnum] = None,
limit: int = 50,
) -> List[AuditSessionDB]:
"""Get all audit sessions with optional status filter."""
query = self.db.query(AuditSessionDB)
if status:
query = query.filter(AuditSessionDB.status == status)
return query.order_by(AuditSessionDB.created_at.desc()).limit(limit).all()
def update_status(
self,
session_id: str,
status: AuditSessionStatusEnum,
) -> Optional[AuditSessionDB]:
"""Update session status and set appropriate timestamps."""
session = self.get_by_id(session_id)
if not session:
return None
session.status = status
if status == AuditSessionStatusEnum.IN_PROGRESS and not session.started_at:
session.started_at = datetime.utcnow()
elif status == AuditSessionStatusEnum.COMPLETED:
session.completed_at = datetime.utcnow()
session.updated_at = datetime.utcnow()
self.db.commit()
self.db.refresh(session)
return session
def update_progress(
self,
session_id: str,
total_items: Optional[int] = None,
completed_items: Optional[int] = None,
) -> Optional[AuditSessionDB]:
"""Update session progress counters."""
session = self.db.query(AuditSessionDB).filter(
AuditSessionDB.id == session_id
).first()
if not session:
return None
if total_items is not None:
session.total_items = total_items
if completed_items is not None:
session.completed_items = completed_items
session.updated_at = datetime.utcnow()
self.db.commit()
self.db.refresh(session)
return session
def start_session(self, session_id: str) -> Optional[AuditSessionDB]:
"""
Start an audit session:
- Set status to IN_PROGRESS
- Initialize total_items based on requirements count
"""
session = self.get_by_id(session_id)
if not session:
return None
# Count requirements for this session
query = self.db.query(func.count(RequirementDB.id))
if session.regulation_ids:
query = query.join(RegulationDB).filter(
RegulationDB.id.in_(session.regulation_ids)
)
total_requirements = query.scalar() or 0
session.status = AuditSessionStatusEnum.IN_PROGRESS
session.started_at = datetime.utcnow()
session.total_items = total_requirements
session.updated_at = datetime.utcnow()
self.db.commit()
self.db.refresh(session)
return session
def delete(self, session_id: str) -> bool:
"""Delete an audit session (cascades to signoffs)."""
session = self.db.query(AuditSessionDB).filter(
AuditSessionDB.id == session_id
).first()
if not session:
return False
self.db.delete(session)
self.db.commit()
return True
def get_statistics(self, session_id: str) -> Dict[str, Any]:
"""Get detailed statistics for an audit session."""
session = self.get_by_id(session_id)
if not session:
return {}
signoffs = session.signoffs or []
stats = {
"total": session.total_items or 0,
"completed": len([s for s in signoffs if s.result != AuditResultEnum.PENDING]),
"compliant": len([s for s in signoffs if s.result == AuditResultEnum.COMPLIANT]),
"compliant_with_notes": len([s for s in signoffs if s.result == AuditResultEnum.COMPLIANT_WITH_NOTES]),
"non_compliant": len([s for s in signoffs if s.result == AuditResultEnum.NON_COMPLIANT]),
"not_applicable": len([s for s in signoffs if s.result == AuditResultEnum.NOT_APPLICABLE]),
"pending": len([s for s in signoffs if s.result == AuditResultEnum.PENDING]),
"signed": len([s for s in signoffs if s.signature_hash]),
}
total = stats["total"] if stats["total"] > 0 else 1
stats["completion_percentage"] = round(
(stats["completed"] / total) * 100, 1
)
return stats
class AuditSignOffRepository:
"""Repository for audit sign-offs (Sprint 3: Auditor-Verbesserungen)."""
def __init__(self, db: DBSession):
self.db = db
def create(
self,
session_id: str,
requirement_id: str,
result: AuditResultEnum = AuditResultEnum.PENDING,
notes: Optional[str] = None,
) -> AuditSignOffDB:
"""Create a new sign-off for a requirement."""
signoff = AuditSignOffDB(
id=str(uuid.uuid4()),
session_id=session_id,
requirement_id=requirement_id,
result=result,
notes=notes,
)
self.db.add(signoff)
self.db.commit()
self.db.refresh(signoff)
return signoff
def get_by_id(self, signoff_id: str) -> Optional[AuditSignOffDB]:
"""Get sign-off by ID."""
return (
self.db.query(AuditSignOffDB)
.options(joinedload(AuditSignOffDB.requirement))
.filter(AuditSignOffDB.id == signoff_id)
.first()
)
def get_by_session_and_requirement(
self,
session_id: str,
requirement_id: str,
) -> Optional[AuditSignOffDB]:
"""Get sign-off by session and requirement ID."""
return (
self.db.query(AuditSignOffDB)
.filter(
and_(
AuditSignOffDB.session_id == session_id,
AuditSignOffDB.requirement_id == requirement_id,
)
)
.first()
)
def get_by_session(
self,
session_id: str,
result_filter: Optional[AuditResultEnum] = None,
) -> List[AuditSignOffDB]:
"""Get all sign-offs for a session."""
query = (
self.db.query(AuditSignOffDB)
.options(joinedload(AuditSignOffDB.requirement))
.filter(AuditSignOffDB.session_id == session_id)
)
if result_filter:
query = query.filter(AuditSignOffDB.result == result_filter)
return query.order_by(AuditSignOffDB.created_at).all()
def update(
self,
signoff_id: str,
result: Optional[AuditResultEnum] = None,
notes: Optional[str] = None,
sign: bool = False,
signed_by: Optional[str] = None,
) -> Optional[AuditSignOffDB]:
"""Update a sign-off with optional digital signature."""
signoff = self.db.query(AuditSignOffDB).filter(
AuditSignOffDB.id == signoff_id
).first()
if not signoff:
return None
if result is not None:
signoff.result = result
if notes is not None:
signoff.notes = notes
if sign and signed_by:
signoff.create_signature(signed_by)
signoff.updated_at = datetime.utcnow()
self.db.commit()
self.db.refresh(signoff)
# Update session progress
self._update_session_progress(signoff.session_id)
return signoff
def sign_off(
self,
session_id: str,
requirement_id: str,
result: AuditResultEnum,
notes: Optional[str] = None,
sign: bool = False,
signed_by: Optional[str] = None,
) -> AuditSignOffDB:
"""
Create or update a sign-off for a requirement.
This is the main method for auditors to record their findings.
"""
# Check if sign-off already exists
signoff = self.get_by_session_and_requirement(session_id, requirement_id)
if signoff:
# Update existing
signoff.result = result
if notes is not None:
signoff.notes = notes
if sign and signed_by:
signoff.create_signature(signed_by)
signoff.updated_at = datetime.utcnow()
else:
# Create new
signoff = AuditSignOffDB(
id=str(uuid.uuid4()),
session_id=session_id,
requirement_id=requirement_id,
result=result,
notes=notes,
)
if sign and signed_by:
signoff.create_signature(signed_by)
self.db.add(signoff)
self.db.commit()
self.db.refresh(signoff)
# Update session progress
self._update_session_progress(session_id)
return signoff
def _update_session_progress(self, session_id: str) -> None:
"""Update the session's completed_items count."""
completed = (
self.db.query(func.count(AuditSignOffDB.id))
.filter(
and_(
AuditSignOffDB.session_id == session_id,
AuditSignOffDB.result != AuditResultEnum.PENDING,
)
)
.scalar()
) or 0
session = self.db.query(AuditSessionDB).filter(
AuditSessionDB.id == session_id
).first()
if session:
session.completed_items = completed
session.updated_at = datetime.utcnow()
self.db.commit()
def get_checklist(
self,
session_id: str,
page: int = 1,
page_size: int = 50,
result_filter: Optional[AuditResultEnum] = None,
regulation_code: Optional[str] = None,
search: Optional[str] = None,
) -> Tuple[List[Dict[str, Any]], int]:
"""
Get audit checklist items for a session with pagination.
Returns requirements with their sign-off status.
"""
session = self.db.query(AuditSessionDB).filter(
AuditSessionDB.id == session_id
).first()
if not session:
return [], 0
# Base query for requirements
query = (
self.db.query(RequirementDB)
.options(
joinedload(RequirementDB.regulation),
selectinload(RequirementDB.control_mappings),
)
)
# Filter by session's regulation_ids if set
if session.regulation_ids:
query = query.filter(RequirementDB.regulation_id.in_(session.regulation_ids))
# Filter by regulation code
if regulation_code:
query = query.join(RegulationDB).filter(RegulationDB.code == regulation_code)
# Search
if search:
search_term = f"%{search}%"
query = query.filter(
or_(
RequirementDB.title.ilike(search_term),
RequirementDB.article.ilike(search_term),
)
)
# Get existing sign-offs for this session
signoffs_map = {}
signoffs = (
self.db.query(AuditSignOffDB)
.filter(AuditSignOffDB.session_id == session_id)
.all()
)
for s in signoffs:
signoffs_map[s.requirement_id] = s
# Filter by result if specified
if result_filter:
if result_filter == AuditResultEnum.PENDING:
# Requirements without sign-off or with pending status
signed_req_ids = [
s.requirement_id for s in signoffs
if s.result != AuditResultEnum.PENDING
]
if signed_req_ids:
query = query.filter(~RequirementDB.id.in_(signed_req_ids))
else:
# Requirements with specific result
matching_req_ids = [
s.requirement_id for s in signoffs
if s.result == result_filter
]
if matching_req_ids:
query = query.filter(RequirementDB.id.in_(matching_req_ids))
else:
return [], 0
# Count and paginate
total = query.count()
requirements = (
query
.order_by(RequirementDB.article, RequirementDB.paragraph)
.offset((page - 1) * page_size)
.limit(page_size)
.all()
)
# Build checklist items
items = []
for req in requirements:
signoff = signoffs_map.get(req.id)
items.append({
"requirement_id": req.id,
"regulation_code": req.regulation.code if req.regulation else None,
"regulation_name": req.regulation.name if req.regulation else None,
"article": req.article,
"paragraph": req.paragraph,
"title": req.title,
"description": req.description,
"current_result": signoff.result.value if signoff else AuditResultEnum.PENDING.value,
"notes": signoff.notes if signoff else None,
"is_signed": bool(signoff.signature_hash) if signoff else False,
"signed_at": signoff.signed_at if signoff else None,
"signed_by": signoff.signed_by if signoff else None,
"evidence_count": len(req.control_mappings) if req.control_mappings else 0,
"controls_mapped": len(req.control_mappings) if req.control_mappings else 0,
})
return items, total
def delete(self, signoff_id: str) -> bool:
"""Delete a sign-off."""
signoff = self.db.query(AuditSignOffDB).filter(
AuditSignOffDB.id == signoff_id
).first()
if not signoff:
return False
session_id = signoff.session_id
self.db.delete(signoff)
self.db.commit()
# Update session progress
self._update_session_progress(session_id)
return True