Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Failing after 30s
CI / test-python-backend-compliance (push) Successful in 30s
CI / test-python-document-crawler (push) Successful in 21s
CI / test-python-dsms-gateway (push) Successful in 17s
- Ruff: 144 auto-fixes (unused imports, == None → is None), F821/F811/F841 manuell - CVEs: python-multipart>=0.0.22, weasyprint>=68.0, pillow>=12.1.1, npm audit fix (0 vulns) - TS: 5 tote Drafting-Engine-Dateien entfernt, allowed-facts/sanitizer/StepHeader/context fixes - Tests: +104 (ISMS 58, Evidence 18, VVT 14, Generation 14) → 1449 passed - Refactoring: collect_ci_evidence (F→A), row_to_response (E→A), extract_requirements (E→A) - Dead Code: pca-platform, 7 Go-Handler, dsr_api.py, duplicate Schemas entfernt Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
435 lines
16 KiB
Python
435 lines
16 KiB
Python
"""
|
|
Compliance Report Generator Service.
|
|
|
|
Generates periodic compliance reports (weekly, monthly, quarterly, yearly).
|
|
Reports include:
|
|
- Compliance score trends
|
|
- Control status summary
|
|
- Risk assessment summary
|
|
- Evidence coverage
|
|
- Action items / recommendations
|
|
"""
|
|
|
|
import logging
|
|
from datetime import datetime, date, timedelta
|
|
from typing import Dict, List, Any, Optional
|
|
from enum import Enum
|
|
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import func
|
|
|
|
from ..db.models import (
|
|
RequirementDB,
|
|
ControlDB,
|
|
ControlMappingDB,
|
|
ControlStatusEnum,
|
|
RiskLevelEnum,
|
|
)
|
|
from ..db.repository import (
|
|
RegulationRepository,
|
|
ControlRepository,
|
|
EvidenceRepository,
|
|
RiskRepository,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ReportPeriod(str, Enum):
|
|
WEEKLY = "weekly"
|
|
MONTHLY = "monthly"
|
|
QUARTERLY = "quarterly"
|
|
YEARLY = "yearly"
|
|
|
|
|
|
class ComplianceReportGenerator:
|
|
"""Generates compliance reports for different time periods."""
|
|
|
|
def __init__(self, db: Session):
|
|
self.db = db
|
|
self.reg_repo = RegulationRepository(db)
|
|
self.ctrl_repo = ControlRepository(db)
|
|
self.evidence_repo = EvidenceRepository(db)
|
|
self.risk_repo = RiskRepository(db)
|
|
|
|
def generate_report(
|
|
self,
|
|
period: ReportPeriod,
|
|
as_of_date: Optional[date] = None,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Generate a compliance report for the specified period.
|
|
|
|
Args:
|
|
period: Report period (weekly, monthly, quarterly, yearly)
|
|
as_of_date: Report date (defaults to today)
|
|
|
|
Returns:
|
|
Complete report dictionary
|
|
"""
|
|
if as_of_date is None:
|
|
as_of_date = date.today()
|
|
|
|
# Calculate date ranges
|
|
date_range = self._get_date_range(period, as_of_date)
|
|
|
|
report = {
|
|
"report_metadata": {
|
|
"generated_at": datetime.utcnow().isoformat(),
|
|
"period": period.value,
|
|
"as_of_date": as_of_date.isoformat(),
|
|
"date_range_start": date_range["start"].isoformat(),
|
|
"date_range_end": date_range["end"].isoformat(),
|
|
"report_title": self._get_report_title(period, as_of_date),
|
|
},
|
|
"executive_summary": self._generate_executive_summary(),
|
|
"compliance_score": self._generate_compliance_score_section(),
|
|
"regulations_coverage": self._generate_regulations_coverage(),
|
|
"controls_summary": self._generate_controls_summary(),
|
|
"risks_summary": self._generate_risks_summary(),
|
|
"evidence_summary": self._generate_evidence_summary(),
|
|
"action_items": self._generate_action_items(),
|
|
"trends": self._generate_trends_placeholder(period),
|
|
}
|
|
|
|
return report
|
|
|
|
def _get_date_range(self, period: ReportPeriod, as_of: date) -> Dict[str, date]:
|
|
"""Calculate date range for the reporting period."""
|
|
if period == ReportPeriod.WEEKLY:
|
|
# Last 7 days
|
|
start = as_of - timedelta(days=7)
|
|
elif period == ReportPeriod.MONTHLY:
|
|
# Last 30 days
|
|
start = as_of - timedelta(days=30)
|
|
elif period == ReportPeriod.QUARTERLY:
|
|
# Last 90 days
|
|
start = as_of - timedelta(days=90)
|
|
elif period == ReportPeriod.YEARLY:
|
|
# Last 365 days
|
|
start = as_of - timedelta(days=365)
|
|
else:
|
|
start = as_of - timedelta(days=30)
|
|
|
|
return {"start": start, "end": as_of}
|
|
|
|
def _get_report_title(self, period: ReportPeriod, as_of: date) -> str:
|
|
"""Generate report title based on period."""
|
|
titles = {
|
|
ReportPeriod.WEEKLY: f"Woechentlicher Compliance-Report KW{as_of.isocalendar()[1]} {as_of.year}",
|
|
ReportPeriod.MONTHLY: f"Monatlicher Compliance-Report {as_of.strftime('%B %Y')}",
|
|
ReportPeriod.QUARTERLY: f"Quartals-Compliance-Report Q{(as_of.month - 1) // 3 + 1}/{as_of.year}",
|
|
ReportPeriod.YEARLY: f"Jaehrlicher Compliance-Report {as_of.year}",
|
|
}
|
|
return titles.get(period, f"Compliance Report {as_of.isoformat()}")
|
|
|
|
def _generate_executive_summary(self) -> Dict[str, Any]:
|
|
"""Generate executive summary section."""
|
|
stats = self.ctrl_repo.get_statistics()
|
|
risk_matrix = self.risk_repo.get_matrix_data()
|
|
|
|
total_controls = stats.get("total", 0)
|
|
score = stats.get("compliance_score", 0)
|
|
|
|
# Determine overall status
|
|
if score >= 80:
|
|
status = "GREEN"
|
|
status_text = "Guter Compliance-Stand"
|
|
elif score >= 60:
|
|
status = "YELLOW"
|
|
status_text = "Verbesserungsbedarf"
|
|
else:
|
|
status = "RED"
|
|
status_text = "Kritischer Handlungsbedarf"
|
|
|
|
high_critical_risks = (
|
|
risk_matrix["by_level"].get("critical", 0) +
|
|
risk_matrix["by_level"].get("high", 0)
|
|
)
|
|
|
|
return {
|
|
"overall_status": status,
|
|
"status_text": status_text,
|
|
"compliance_score": score,
|
|
"total_controls": total_controls,
|
|
"high_critical_risks": high_critical_risks,
|
|
"key_findings": self._generate_key_findings(stats, risk_matrix),
|
|
}
|
|
|
|
def _generate_key_findings(
|
|
self,
|
|
ctrl_stats: Dict[str, Any],
|
|
risk_matrix: Dict[str, Any],
|
|
) -> List[str]:
|
|
"""Generate key findings for executive summary."""
|
|
findings = []
|
|
|
|
# Control status findings
|
|
by_status = ctrl_stats.get("by_status", {})
|
|
failed = by_status.get("fail", 0)
|
|
planned = by_status.get("planned", 0)
|
|
|
|
if failed > 0:
|
|
findings.append(f"{failed} Controls im Status 'Fail' - sofortige Massnahmen erforderlich")
|
|
|
|
if planned > 5:
|
|
findings.append(f"{planned} Controls noch nicht implementiert")
|
|
|
|
# Risk findings
|
|
critical = risk_matrix["by_level"].get("critical", 0)
|
|
high = risk_matrix["by_level"].get("high", 0)
|
|
|
|
if critical > 0:
|
|
findings.append(f"{critical} kritische Risiken identifiziert - Eskalation empfohlen")
|
|
|
|
if high > 3:
|
|
findings.append(f"{high} hohe Risiken - priorisierte Behandlung erforderlich")
|
|
|
|
if not findings:
|
|
findings.append("Keine kritischen Befunde - Compliance-Status stabil")
|
|
|
|
return findings
|
|
|
|
def _generate_compliance_score_section(self) -> Dict[str, Any]:
|
|
"""Generate compliance score section with breakdown."""
|
|
stats = self.ctrl_repo.get_statistics()
|
|
|
|
controls = self.ctrl_repo.get_all()
|
|
domain_scores = {}
|
|
domain_stats = {}
|
|
|
|
for ctrl in controls:
|
|
domain = ctrl.domain.value if ctrl.domain else "unknown"
|
|
if domain not in domain_stats:
|
|
domain_stats[domain] = {"total": 0, "pass": 0, "partial": 0}
|
|
|
|
domain_stats[domain]["total"] += 1
|
|
if ctrl.status == ControlStatusEnum.PASS:
|
|
domain_stats[domain]["pass"] += 1
|
|
elif ctrl.status == ControlStatusEnum.PARTIAL:
|
|
domain_stats[domain]["partial"] += 1
|
|
|
|
for domain, ds in domain_stats.items():
|
|
if ds["total"] > 0:
|
|
score = ((ds["pass"] + ds["partial"] * 0.5) / ds["total"]) * 100
|
|
domain_scores[domain] = round(score, 1)
|
|
else:
|
|
domain_scores[domain] = 0
|
|
|
|
return {
|
|
"overall_score": stats.get("compliance_score", 0),
|
|
"by_domain": domain_scores,
|
|
"domain_labels": {
|
|
"gov": "Governance",
|
|
"priv": "Datenschutz",
|
|
"iam": "Identity & Access",
|
|
"crypto": "Kryptografie",
|
|
"sdlc": "Secure Development",
|
|
"ops": "Operations",
|
|
"ai": "KI-spezifisch",
|
|
"cra": "Supply Chain",
|
|
"aud": "Audit",
|
|
},
|
|
}
|
|
|
|
def _generate_regulations_coverage(self) -> Dict[str, Any]:
|
|
"""Generate regulations coverage section."""
|
|
regulations = self.reg_repo.get_all()
|
|
coverage = []
|
|
|
|
for reg in regulations:
|
|
# Count requirements for this regulation
|
|
req_count = self.db.query(func.count(RequirementDB.id)).filter(
|
|
RequirementDB.regulation_id == reg.id
|
|
).scalar() or 0
|
|
|
|
# Count mapped controls
|
|
mapped_controls = self.db.query(func.count(ControlMappingDB.id)).join(
|
|
RequirementDB
|
|
).filter(
|
|
RequirementDB.regulation_id == reg.id
|
|
).scalar() or 0
|
|
|
|
coverage.append({
|
|
"code": reg.code,
|
|
"name": reg.name,
|
|
"requirements": req_count,
|
|
"mapped_controls": mapped_controls,
|
|
"coverage_status": "covered" if mapped_controls > 0 else "pending",
|
|
})
|
|
|
|
return {
|
|
"total_regulations": len(regulations),
|
|
"covered_regulations": len([c for c in coverage if c["coverage_status"] == "covered"]),
|
|
"details": coverage,
|
|
}
|
|
|
|
def _generate_controls_summary(self) -> Dict[str, Any]:
|
|
"""Generate controls summary section."""
|
|
stats = self.ctrl_repo.get_statistics()
|
|
due_for_review = self.ctrl_repo.get_due_for_review()
|
|
|
|
return {
|
|
"total": stats.get("total", 0),
|
|
"by_status": stats.get("by_status", {}),
|
|
"by_domain": stats.get("by_domain", {}),
|
|
"due_for_review": len(due_for_review),
|
|
"review_items": [
|
|
{
|
|
"control_id": c.control_id,
|
|
"title": c.title,
|
|
"last_reviewed": c.last_reviewed_at.isoformat() if c.last_reviewed_at else None,
|
|
}
|
|
for c in due_for_review[:10] # Top 10
|
|
],
|
|
}
|
|
|
|
def _generate_risks_summary(self) -> Dict[str, Any]:
|
|
"""Generate risks summary section."""
|
|
matrix = self.risk_repo.get_matrix_data()
|
|
risks = self.risk_repo.get_all()
|
|
|
|
# Group by category
|
|
by_category = {}
|
|
for risk in risks:
|
|
cat = risk.category or "other"
|
|
if cat not in by_category:
|
|
by_category[cat] = 0
|
|
by_category[cat] += 1
|
|
|
|
# High priority risks
|
|
high_priority = [
|
|
{
|
|
"risk_id": r.risk_id,
|
|
"title": r.title,
|
|
"inherent_risk": r.inherent_risk.value if r.inherent_risk else None,
|
|
"owner": r.owner,
|
|
"status": r.status,
|
|
}
|
|
for r in risks
|
|
if r.inherent_risk in [RiskLevelEnum.CRITICAL, RiskLevelEnum.HIGH]
|
|
]
|
|
|
|
return {
|
|
"total_risks": matrix["total_risks"],
|
|
"by_level": matrix["by_level"],
|
|
"by_category": by_category,
|
|
"high_priority_risks": high_priority,
|
|
"risk_matrix": matrix["matrix"],
|
|
}
|
|
|
|
def _generate_evidence_summary(self) -> Dict[str, Any]:
|
|
"""Generate evidence summary section."""
|
|
stats = self.evidence_repo.get_statistics()
|
|
all_evidence = self.evidence_repo.get_all(limit=100)
|
|
|
|
# Find controls without evidence
|
|
controls = self.ctrl_repo.get_all()
|
|
controls_with_evidence = set()
|
|
|
|
for evidence in all_evidence:
|
|
control = self.db.query(ControlDB).filter(
|
|
ControlDB.id == evidence.control_id
|
|
).first()
|
|
if control:
|
|
controls_with_evidence.add(control.control_id)
|
|
|
|
controls_without_evidence = [
|
|
c.control_id for c in controls
|
|
if c.control_id not in controls_with_evidence
|
|
]
|
|
|
|
return {
|
|
"total_evidence": stats.get("total", 0),
|
|
"by_type": stats.get("by_type", {}),
|
|
"by_status": stats.get("by_status", {}),
|
|
"coverage_percent": stats.get("coverage_percent", 0),
|
|
"controls_without_evidence": controls_without_evidence[:20], # Top 20
|
|
}
|
|
|
|
def _generate_action_items(self) -> List[Dict[str, Any]]:
|
|
"""Generate action items based on current status."""
|
|
action_items = []
|
|
|
|
# Check for failed controls
|
|
failed_controls = self.ctrl_repo.get_all(status=ControlStatusEnum.FAIL)
|
|
for ctrl in failed_controls[:5]:
|
|
action_items.append({
|
|
"priority": "high",
|
|
"category": "control_remediation",
|
|
"title": f"Control {ctrl.control_id} beheben",
|
|
"description": f"Control '{ctrl.title}' ist im Status 'Fail'. Sofortige Massnahmen erforderlich.",
|
|
"owner": ctrl.owner,
|
|
"due_date": (date.today() + timedelta(days=7)).isoformat(),
|
|
})
|
|
|
|
# Check for critical/high risks
|
|
critical_risks = self.risk_repo.get_all(min_risk_level=RiskLevelEnum.HIGH)
|
|
for risk in critical_risks[:5]:
|
|
if risk.status == "open":
|
|
action_items.append({
|
|
"priority": "high" if risk.inherent_risk == RiskLevelEnum.CRITICAL else "medium",
|
|
"category": "risk_treatment",
|
|
"title": f"Risiko {risk.risk_id} behandeln",
|
|
"description": f"Risiko '{risk.title}' hat Status 'open' und Level '{risk.inherent_risk.value}'.",
|
|
"owner": risk.owner,
|
|
"due_date": (date.today() + timedelta(days=14)).isoformat(),
|
|
})
|
|
|
|
# Check for overdue reviews
|
|
due_for_review = self.ctrl_repo.get_due_for_review()
|
|
if len(due_for_review) > 5:
|
|
action_items.append({
|
|
"priority": "medium",
|
|
"category": "review",
|
|
"title": f"{len(due_for_review)} Control-Reviews ueberfaellig",
|
|
"description": "Mehrere Controls muessen reviewed werden.",
|
|
"owner": "Compliance Officer",
|
|
"due_date": (date.today() + timedelta(days=30)).isoformat(),
|
|
})
|
|
|
|
return action_items
|
|
|
|
def _generate_trends_placeholder(self, period: ReportPeriod) -> Dict[str, Any]:
|
|
"""
|
|
Generate trends section.
|
|
|
|
Note: Full trend analysis requires historical data storage.
|
|
This is a placeholder for future implementation.
|
|
"""
|
|
return {
|
|
"note": "Trend-Analyse erfordert historische Daten. Feature in Entwicklung.",
|
|
"period": period.value,
|
|
"compliance_score_trend": "stable", # Placeholder
|
|
"risk_trend": "stable", # Placeholder
|
|
"recommendations": [
|
|
"Historische Score-Snapshots aktivieren fuer Trend-Analyse",
|
|
"Regelmaessige Report-Generierung einrichten",
|
|
],
|
|
}
|
|
|
|
def generate_summary_report(self) -> Dict[str, Any]:
|
|
"""Generate a quick summary report (for dashboard)."""
|
|
stats = self.ctrl_repo.get_statistics()
|
|
risk_matrix = self.risk_repo.get_matrix_data()
|
|
evidence_stats = self.evidence_repo.get_statistics()
|
|
|
|
return {
|
|
"generated_at": datetime.utcnow().isoformat(),
|
|
"compliance_score": stats.get("compliance_score", 0),
|
|
"controls": {
|
|
"total": stats.get("total", 0),
|
|
"passing": stats.get("by_status", {}).get("pass", 0),
|
|
"failing": stats.get("by_status", {}).get("fail", 0),
|
|
},
|
|
"risks": {
|
|
"total": risk_matrix["total_risks"],
|
|
"critical": risk_matrix["by_level"].get("critical", 0),
|
|
"high": risk_matrix["by_level"].get("high", 0),
|
|
},
|
|
"evidence": {
|
|
"total": evidence_stats.get("total", 0),
|
|
"coverage": evidence_stats.get("coverage_percent", 0),
|
|
},
|
|
}
|