Files
Benjamin Admin e6201d5239 feat: Anti-Fake-Evidence System (Phase 1-4b)
Implement full evidence integrity pipeline to prevent compliance theater:
- Confidence levels (E0-E4), truth status tracking, assertion engine
- Four-Eyes approval workflow, audit trail, reject endpoint
- Evidence distribution dashboard, LLM audit routes
- Traceability matrix (backend endpoint + Compliance Hub UI tab)
- Anti-fake badges, control status machine, normative patterns
- 2 migrations, 4 test suites, MkDocs documentation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-23 17:15:45 +01:00

828 lines
28 KiB
Python

"""
FastAPI routes for Dashboard, Executive Dashboard, and Reports.
Endpoints:
- /dashboard: Main compliance dashboard
- /dashboard/executive: Executive summary for managers
- /dashboard/trend: Compliance score trend over time
- /dashboard/roadmap: Prioritised controls in 4 buckets
- /dashboard/module-status: Completion status of each SDK module
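- /dashboard/next-actions: Most important open actions (top 5 by default)
- /dashboard/snapshot: Save the current compliance score as a snapshot
- /dashboard/score-history: Query stored compliance score snapshots
- /dashboard/evidence-distribution: Evidence counts by confidence level
- /dashboard/traceability-matrix: Control -> Evidence -> Assertion chain
- /score: Quick compliance score
- /reports/summary, /reports/{period}: Report generation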
"""
import logging
from datetime import datetime, date, timedelta
from calendar import month_abbr
from typing import Optional, Dict, Any, List
from decimal import Decimal
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from ..db import (
RegulationRepository,
RequirementRepository,
ControlRepository,
EvidenceRepository,
RiskRepository,
AssertionDB,
)
from .schemas import (
DashboardResponse,
MultiDimensionalScore,
ExecutiveDashboardResponse,
TrendDataPoint,
RiskSummary,
DeadlineItem,
TeamWorkloadItem,
TraceabilityAssertion,
TraceabilityEvidence,
TraceabilityCoverage,
TraceabilityControl,
TraceabilityMatrixResponse,
)
from .tenant_utils import get_tenant_id as _get_tenant_id
from .db_utils import row_to_dict as _row_to_dict
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router that collects every endpoint declared in this module; all of them
# carry the "compliance-dashboard" tag.
router = APIRouter(tags=["compliance-dashboard"])
# ============================================================================
# Dashboard
# ============================================================================
@router.get("/dashboard", response_model=DashboardResponse)
async def get_dashboard(db: Session = Depends(get_db)):
    """
    Assemble the main compliance dashboard.

    Collects regulation/requirement counts, control statistics (overall and
    per domain), evidence statistics, risk counts per inherent level and the
    multi-dimensional score.

    Args:
        db: Database session injected by FastAPI.

    Returns:
        DashboardResponse; ``multi_score`` is ``None`` when computing the
        multi-dimensional score raised.
    """
    reg_repo = RegulationRepository(db)
    req_repo = RequirementRepository(db)
    ctrl_repo = ControlRepository(db)
    evidence_repo = EvidenceRepository(db)
    risk_repo = RiskRepository(db)

    active_regulations = reg_repo.get_active()
    all_requirements = req_repo.get_all()
    ctrl_stats = ctrl_repo.get_statistics()

    # Per-domain tally of control statuses. Statuses outside the four known
    # keys still count towards "total" but get no bucket of their own.
    domain_breakdown = {}
    for control in ctrl_repo.get_all():
        domain_key = control.domain.value if control.domain else "unknown"
        tally = domain_breakdown.setdefault(
            domain_key,
            {"total": 0, "pass": 0, "partial": 0, "fail": 0, "planned": 0},
        )
        tally["total"] += 1
        status_key = control.status.value if control.status else "planned"
        if status_key in tally:
            tally[status_key] += 1

    evidence_stats = evidence_repo.get_statistics()

    # Risks without an inherent level are counted as "low".
    all_risks = risk_repo.get_all()
    risk_tally = dict.fromkeys(("low", "medium", "high", "critical"), 0)
    for risk in all_risks:
        risk_level = risk.inherent_risk.value if risk.inherent_risk else "low"
        if risk_level in risk_tally:
            risk_tally[risk_level] += 1

    # The score is taken as pre-computed by the repository statistics.
    compliance_score = ctrl_stats.get("compliance_score", 0.0)

    # Multi-dimensional score (Anti-Fake-Evidence) is best-effort: a failure
    # is logged and the dashboard is still returned.
    try:
        multi_score = MultiDimensionalScore(**ctrl_repo.get_multi_dimensional_score())
    except Exception as e:
        logger.warning(f"Failed to compute multi-dimensional score: {e}")
        multi_score = None

    return DashboardResponse(
        compliance_score=round(compliance_score, 1),
        total_regulations=len(active_regulations),
        total_requirements=len(all_requirements),
        total_controls=ctrl_stats.get("total", 0),
        controls_by_status=ctrl_stats.get("by_status", {}),
        controls_by_domain=domain_breakdown,
        total_evidence=evidence_stats.get("total", 0),
        evidence_by_status=evidence_stats.get("by_status", {}),
        total_risks=len(all_risks),
        risks_by_level=risk_tally,
        recent_activity=[],
        multi_score=multi_score,
    )
@router.get("/score")
async def get_compliance_score(db: Session = Depends(get_db)):
    """
    Return the compliance score together with the counts it is derived from.

    The score gives full credit to passing controls and half credit to
    partial ones, expressed as a percentage of all controls (0 when there
    are no controls).

    Args:
        db: Database session injected by FastAPI.

    Returns:
        Dict with ``score``, ``total_controls``, ``passing_controls``,
        ``partial_controls`` and ``multi_score`` (``None`` if the
        multi-dimensional score could not be computed).
    """
    ctrl_repo = ControlRepository(db)
    stats = ctrl_repo.get_statistics()
    total = stats.get("total", 0)
    # NOTE(review): /dashboard reads the per-status counts from
    # stats["by_status"]; confirm get_statistics() also exposes top-level
    # "pass"/"partial" keys, otherwise these two values are always 0.
    passing = stats.get("pass", 0)
    partial = stats.get("partial", 0)
    score = ((passing + partial * 0.5) / total) * 100 if total > 0 else 0

    # Multi-dimensional score (Anti-Fake-Evidence) is best-effort, but a
    # failure is logged (as /dashboard does) instead of vanishing silently.
    try:
        multi_score = ctrl_repo.get_multi_dimensional_score()
    except Exception as exc:
        logger.warning("Failed to compute multi-dimensional score: %s", exc)
        multi_score = None

    return {
        "score": round(score, 1),
        "total_controls": total,
        "passing_controls": passing,
        "partial_controls": partial,
        "multi_score": multi_score,
    }
# ============================================================================
# Executive Dashboard (Phase 3 - Sprint 1)
# ============================================================================
@router.get("/dashboard/executive", response_model=ExecutiveDashboardResponse)
async def get_executive_dashboard(db: Session = Depends(get_db)):
    """
    Build the executive dashboard for managers and decision makers.

    The response contains:
    - Traffic light status (green from 80, yellow from 60, otherwise red)
    - Overall compliance score and a trend list holding only today's score
    - Top 5 risks whose status is not "mitigated"
    - Control reviews that are overdue or due within 30 days (at most 10)
    - Workload distribution per control owner

    Args:
        db: Database session injected by FastAPI.

    Returns:
        ExecutiveDashboardResponse built from the repository data.
    """
    reg_repo = RegulationRepository(db)
    req_repo = RequirementRepository(db)
    ctrl_repo = ControlRepository(db)
    risk_repo = RiskRepository(db)

    # Compliance score: full credit for passing, half credit for partial.
    ctrl_stats = ctrl_repo.get_statistics()
    total = ctrl_stats.get("total", 0)
    passing = ctrl_stats.get("pass", 0)
    partial = ctrl_stats.get("partial", 0)
    score = ((passing + partial * 0.5) / total) * 100 if total > 0 else 0

    # Traffic light thresholds.
    if score >= 80:
        traffic_light = "green"
    elif score >= 60:
        traffic_light = "yellow"
    else:
        traffic_light = "red"

    # Trend data: only the current score, no simulated history.
    trend_data = []
    if total > 0:
        now = datetime.utcnow()
        trend_data.append(TrendDataPoint(
            date=now.strftime("%Y-%m-%d"),
            score=round(score, 1),
            label=month_abbr[now.month][:3],
        ))

    # Open risks are filtered once and reused for the ranking and the count.
    risks = risk_repo.get_all()
    open_risk_items = [r for r in risks if r.status != "mitigated"]
    risk_priority = {"critical": 4, "high": 3, "medium": 2, "low": 1}

    def _risk_level(risk) -> str:
        # One default for both ranking and display, so a risk without an
        # inherent level is not ranked as "low" yet shown as another level.
        return risk.inherent_risk.value if risk.inherent_risk else "low"

    # Rank by severity first, then by impact * likelihood; keep the top 5.
    ranked_risks = sorted(
        open_risk_items,
        key=lambda r: (risk_priority.get(_risk_level(r), 1), r.impact * r.likelihood),
        reverse=True,
    )[:5]
    top_risks = [
        RiskSummary(
            id=r.id,
            risk_id=r.risk_id,
            title=r.title,
            risk_level=_risk_level(r),
            owner=r.owner,
            status=r.status,
            category=r.category,
            impact=r.impact,
            likelihood=r.likelihood,
        )
        for r in ranked_risks
    ]

    # Upcoming deadlines: only control reviews are considered here.
    controls = ctrl_repo.get_all()
    upcoming_deadlines = []
    today = datetime.utcnow().date()
    for ctrl in controls:
        if not ctrl.next_review_at:
            continue
        # next_review_at may be a datetime or already a date.
        review_date = (
            ctrl.next_review_at.date()
            if hasattr(ctrl.next_review_at, "date")
            else ctrl.next_review_at
        )
        days_remaining = (review_date - today).days
        if days_remaining > 30:
            continue
        if days_remaining < 0:
            deadline_status = "overdue"
        elif days_remaining <= 7:
            deadline_status = "at_risk"
        else:
            deadline_status = "on_track"
        upcoming_deadlines.append(DeadlineItem(
            id=ctrl.id,
            title=f"Review: {ctrl.control_id} - {ctrl.title[:30]}",
            deadline=review_date.isoformat(),
            days_remaining=days_remaining,
            type="control_review",
            status=deadline_status,
            owner=ctrl.owner,
        ))
    # Most urgent first (overdue items have negative days_remaining).
    upcoming_deadlines.sort(key=lambda x: x.days_remaining)
    upcoming_deadlines = upcoming_deadlines[:10]

    # Team workload by owner: pass -> completed, partial -> in progress,
    # everything else -> pending.
    owner_workload = {}
    for ctrl in controls:
        owner = ctrl.owner or "Unassigned"
        bucket = owner_workload.setdefault(
            owner, {"pending": 0, "in_progress": 0, "completed": 0}
        )
        ctrl_status = ctrl.status.value if ctrl.status else "planned"
        if ctrl_status == "pass":
            bucket["completed"] += 1
        elif ctrl_status == "partial":
            bucket["in_progress"] += 1
        else:
            bucket["pending"] += 1

    team_workload = []
    for name, stats in owner_workload.items():
        total_tasks = stats["pending"] + stats["in_progress"] + stats["completed"]
        completion_rate = (stats["completed"] / total_tasks * 100) if total_tasks > 0 else 0
        team_workload.append(TeamWorkloadItem(
            name=name,
            pending_tasks=stats["pending"],
            in_progress_tasks=stats["in_progress"],
            completed_tasks=stats["completed"],
            total_tasks=total_tasks,
            completion_rate=round(completion_rate, 1),
        ))
    team_workload.sort(key=lambda x: x.total_tasks, reverse=True)

    regulations = reg_repo.get_active()
    requirements = req_repo.get_all()

    # The trend currently holds at most one point, so previous_score and
    # score_change resolve to None until real history is supplied.
    has_previous = len(trend_data) >= 2
    return ExecutiveDashboardResponse(
        traffic_light_status=traffic_light,
        overall_score=round(score, 1),
        score_trend=trend_data,
        previous_score=trend_data[-2].score if has_previous else None,
        score_change=round(score - trend_data[-2].score, 1) if has_previous else None,
        total_regulations=len(regulations),
        total_requirements=len(requirements),
        total_controls=total,
        open_risks=len(open_risk_items),
        top_risks=top_risks,
        upcoming_deadlines=upcoming_deadlines,
        team_workload=team_workload,
        last_updated=datetime.utcnow().isoformat(),
    )
@router.get("/dashboard/trend")
async def get_compliance_trend(
    months: int = Query(12, ge=1, le=24, description="Number of months to include"),
    db: Session = Depends(get_db),
):
    """
    Return the compliance score trend.

    Only the current score is reported (no simulated history), so ``trend``
    holds at most one data point. ``months`` is validated and echoed back as
    ``period_months`` but does not influence the data.

    Args:
        months: Requested period length in months (1-24).
        db: Database session injected by FastAPI.

    Returns:
        Dict with ``current_score``, ``trend``, ``period_months`` and
        ``generated_at``.
    """
    stats = ControlRepository(db).get_statistics()
    total = stats.get("total", 0)
    passing = stats.get("pass", 0)
    partial = stats.get("partial", 0)

    # Full credit for passing controls, half credit for partial ones.
    if total > 0:
        current_score = ((passing + partial * 0.5) / total) * 100
    else:
        current_score = 0

    points = []
    if total > 0:
        now = datetime.utcnow()
        points.append({
            "date": now.strftime("%Y-%m-%d"),
            "score": round(current_score, 1),
            "label": f"{month_abbr[now.month]} {now.year % 100}",
            "month": now.month,
            "year": now.year,
        })

    return {
        "current_score": round(current_score, 1),
        "trend": points,
        "period_months": months,
        "generated_at": datetime.utcnow().isoformat(),
    }
# ============================================================================
# Dashboard Extended — Roadmap, Module-Status, Next-Actions, Snapshots
# ============================================================================
# Weight map for control prioritisation, looked up by a control's category in
# the roadmap and next-actions endpoints. Categories missing from this map
# fall back to weight 1 at the lookup sites.
_PRIORITY_WEIGHTS = {"legal": 5, "security": 3, "best_practice": 1, "operational": 2}
# SDK module definitions → DB table used for counting completion.
# "key" and "label" are returned to the client as-is; "table" is interpolated
# directly into a SQL statement by the module-status endpoint, so it must
# remain a trusted, hard-coded identifier.
_MODULE_DEFS: List[Dict[str, str]] = [
    {"key": "vvt", "label": "VVT", "table": "compliance_vvt_activities"},
    {"key": "tom", "label": "TOM", "table": "compliance_toms"},
    {"key": "dsfa", "label": "DSFA", "table": "compliance_dsfa_assessments"},
    {"key": "loeschfristen", "label": "Loeschfristen", "table": "compliance_loeschfristen"},
    {"key": "risks", "label": "Risiken", "table": "compliance_risks"},
    {"key": "controls", "label": "Controls", "table": "compliance_controls"},
    {"key": "evidence", "label": "Nachweise", "table": "compliance_evidence"},
    {"key": "obligations", "label": "Pflichten", "table": "compliance_obligations"},
    {"key": "incidents", "label": "Vorfaelle", "table": "compliance_notfallplan_incidents"},
    {"key": "vendor", "label": "Auftragsverarbeiter", "table": "compliance_vendor_assessments"},
    {"key": "legal_templates", "label": "Rechtl. Dokumente", "table": "compliance_legal_templates"},
    {"key": "training", "label": "Schulungen", "table": "training_modules"},
    {"key": "audit", "label": "Audit", "table": "compliance_audit_sessions"},
    {"key": "security_backlog", "label": "Security-Backlog", "table": "compliance_security_backlog"},
    {"key": "quality", "label": "Qualitaet", "table": "compliance_quality_items"},
]
@router.get("/dashboard/roadmap")
async def get_dashboard_roadmap(
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """
    Prioritised controls in 4 buckets: Quick Wins, Must Have, Should Have, Nice to Have.

    Controls with status "pass" are skipped. Each remaining control gets a
    weight from ``_PRIORITY_WEIGHTS`` (by category, default 1) and is placed:
    - quick_wins: weight >= 5 and review overdue
    - must_have: weight >= 4
    - should_have: weight >= 2
    - nice_to_have: everything else
    Within a bucket, items are ordered by days overdue, most overdue first.

    Args:
        db: Database session injected by FastAPI.
        tenant_id: Tenant resolved by the tenant dependency.

    Returns:
        Dict with ``buckets``, per-bucket ``counts`` and ``generated_at``.
    """
    # NOTE(review): tenant_id is resolved but not used to filter the controls
    # below; confirm whether ControlRepository scopes by tenant on its own.
    ctrl_repo = ControlRepository(db)
    controls = ctrl_repo.get_all()
    today = datetime.utcnow().date()
    buckets: Dict[str, list] = {
        "quick_wins": [],
        "must_have": [],
        "should_have": [],
        "nice_to_have": [],
    }
    for ctrl in controls:
        status = ctrl.status.value if ctrl.status else "planned"
        if status == "pass":
            continue  # already done

        # Accept both plain strings and Enum members as category, so an
        # Enum-typed column does not silently degrade every weight to 1.
        category = getattr(ctrl, "category", "best_practice")
        category = getattr(category, "value", category)
        weight = _PRIORITY_WEIGHTS.get(category, 1)

        # Positive when the review date lies in the past.
        days_overdue = 0
        if ctrl.next_review_at:
            review_date = (
                ctrl.next_review_at.date()
                if hasattr(ctrl.next_review_at, "date")
                else ctrl.next_review_at
            )
            days_overdue = (today - review_date).days

        item = {
            "id": str(ctrl.id),
            "control_id": ctrl.control_id,
            "title": ctrl.title,
            "status": status,
            "domain": ctrl.domain.value if ctrl.domain else "unknown",
            "owner": ctrl.owner,
            "next_review_at": ctrl.next_review_at.isoformat() if ctrl.next_review_at else None,
            "days_overdue": max(0, days_overdue),
            "weight": weight,
        }
        if weight >= 5 and days_overdue > 0:
            buckets["quick_wins"].append(item)
        elif weight >= 4:
            buckets["must_have"].append(item)
        elif weight >= 2:
            buckets["should_have"].append(item)
        else:
            buckets["nice_to_have"].append(item)

    # Sort each bucket by days overdue, most overdue first.
    for items in buckets.values():
        items.sort(key=lambda x: x["days_overdue"], reverse=True)

    return {
        "buckets": buckets,
        "counts": {k: len(v) for k, v in buckets.items()},
        "generated_at": datetime.utcnow().isoformat(),
    }
@router.get("/dashboard/module-status")
async def get_module_status(
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """
    Completion status for each SDK module based on DB record counts.

    For every entry in ``_MODULE_DEFS`` the rows of its table belonging to
    the tenant are counted. A failing count (e.g. the table does not exist)
    is logged and treated as 0 so the remaining modules are still reported.

    Args:
        db: Database session injected by FastAPI.
        tenant_id: Tenant whose records are counted.

    Returns:
        Dict with the per-module list plus ``total``, ``started``,
        ``complete`` and ``overall_progress`` (percentage of complete modules).
    """
    modules = []
    for mod in _MODULE_DEFS:
        try:
            # The table name comes from the hard-coded _MODULE_DEFS list, not
            # from user input; the tenant id is passed as a bound parameter.
            row = db.execute(
                text(f"SELECT COUNT(*) FROM {mod['table']} WHERE tenant_id = :tid"),
                {"tid": tenant_id},
            ).fetchone()
            count = int(row[0]) if row else 0
        except Exception as exc:
            logger.warning(
                "Module status count failed for table %s: %s", mod["table"], exc
            )
            # A failed statement can leave the transaction in an aborted
            # state (PostgreSQL rejects every further statement until a
            # rollback), which would turn all following counts into 0.
            db.rollback()
            count = 0

        # Simple heuristic: 0 = not started, 1-2 = in progress, 3+ = complete
        if count == 0:
            status = "not_started"
            progress = 0
        elif count < 3:
            status = "in_progress"
            progress = min(60, count * 30)
        else:
            status = "complete"
            progress = 100
        modules.append({
            "key": mod["key"],
            "label": mod["label"],
            "count": count,
            "status": status,
            "progress": progress,
        })

    started = sum(1 for m in modules if m["status"] != "not_started")
    complete = sum(1 for m in modules if m["status"] == "complete")
    return {
        "modules": modules,
        "total": len(modules),
        "started": started,
        "complete": complete,
        "overall_progress": round((complete / len(modules)) * 100, 1) if modules else 0,
    }
@router.get("/dashboard/next-actions")
async def get_next_actions(
    limit: int = Query(5, ge=1, le=20),
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """
    Top N most important open actions.

    Every control whose status is not "pass" becomes an action with
    ``urgency_score = category weight * 10 + days overdue``; the ``limit``
    highest-scoring actions are returned.

    Args:
        limit: Maximum number of actions to return (1-20, default 5).
        db: Database session injected by FastAPI.
        tenant_id: Tenant resolved by the tenant dependency.

    Returns:
        Dict with the key ``actions`` holding the selected action dicts.
    """
    # NOTE(review): tenant_id is resolved but not used to filter the controls
    # below; confirm whether ControlRepository scopes by tenant on its own.
    ctrl_repo = ControlRepository(db)
    controls = ctrl_repo.get_all()
    today = datetime.utcnow().date()
    actions = []
    for ctrl in controls:
        status = ctrl.status.value if ctrl.status else "planned"
        if status == "pass":
            continue

        # Days past the review date, never negative.
        days_overdue = 0
        if ctrl.next_review_at:
            review_date = (
                ctrl.next_review_at.date()
                if hasattr(ctrl.next_review_at, "date")
                else ctrl.next_review_at
            )
            days_overdue = max(0, (today - review_date).days)

        # Accept both plain strings and Enum members as category, so an
        # Enum-typed column does not silently degrade every weight to 1.
        category = getattr(ctrl, "category", "best_practice")
        category = getattr(category, "value", category)
        weight = _PRIORITY_WEIGHTS.get(category, 1)

        urgency_score = weight * 10 + days_overdue
        actions.append({
            "id": str(ctrl.id),
            "control_id": ctrl.control_id,
            "title": ctrl.title,
            "status": status,
            "domain": ctrl.domain.value if ctrl.domain else "unknown",
            "owner": ctrl.owner,
            "days_overdue": days_overdue,
            "urgency_score": urgency_score,
            "reason": "Ueberfaellig" if days_overdue > 0 else "Offen",
        })

    # Highest urgency first; ties keep their original order.
    actions.sort(key=lambda x: x["urgency_score"], reverse=True)
    return {"actions": actions[:limit]}
@router.post("/dashboard/snapshot")
async def create_score_snapshot(
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """
    Save current compliance score as a historical snapshot.

    Computes the score from the control statistics, gathers evidence and
    risk counts, upserts one row into ``compliance_score_snapshots`` for
    today's date and commits.

    Args:
        db: Database session injected by FastAPI.
        tenant_id: Tenant the snapshot is stored for.

    Returns:
        The stored row converted to a dict via ``_row_to_dict``.
    """
    ctrl_repo = ControlRepository(db)
    evidence_repo = EvidenceRepository(db)
    risk_repo = RiskRepository(db)
    ctrl_stats = ctrl_repo.get_statistics()
    evidence_stats = evidence_repo.get_statistics()
    risks = risk_repo.get_all()
    total = ctrl_stats.get("total", 0)
    passing = ctrl_stats.get("pass", 0)
    partial = ctrl_stats.get("partial", 0)
    # Full credit for passing controls, half credit for partial ones.
    score = round(((passing + partial * 0.5) / total) * 100, 2) if total > 0 else 0
    # Risks without an inherent level are treated as "low" and not counted.
    risks_high = sum(1 for r in risks if (r.inherent_risk.value if r.inherent_risk else "low") in ("high", "critical"))
    # NOTE(review): date.today() uses the server's local date while other
    # endpoints in this module use datetime.utcnow(); confirm which is intended.
    today = date.today()
    # NOTE(review): the conflict target includes project_id, which this INSERT
    # never sets. If the column is nullable without a default, PostgreSQL
    # treats NULLs as distinct in a plain unique constraint, so the upsert
    # would not fire and duplicate rows per day could accumulate. Verify
    # against the table definition.
    row = db.execute(text("""
INSERT INTO compliance_score_snapshots (
tenant_id, score, controls_total, controls_pass, controls_partial,
evidence_total, evidence_valid, risks_total, risks_high, snapshot_date
) VALUES (
:tenant_id, :score, :controls_total, :controls_pass, :controls_partial,
:evidence_total, :evidence_valid, :risks_total, :risks_high, :snapshot_date
)
ON CONFLICT (tenant_id, project_id, snapshot_date) DO UPDATE SET
score = EXCLUDED.score,
controls_total = EXCLUDED.controls_total,
controls_pass = EXCLUDED.controls_pass,
controls_partial = EXCLUDED.controls_partial,
evidence_total = EXCLUDED.evidence_total,
evidence_valid = EXCLUDED.evidence_valid,
risks_total = EXCLUDED.risks_total,
risks_high = EXCLUDED.risks_high
RETURNING *
"""), {
        "tenant_id": tenant_id,
        "score": score,
        "controls_total": total,
        "controls_pass": passing,
        "controls_partial": partial,
        "evidence_total": evidence_stats.get("total", 0),
        "evidence_valid": evidence_stats.get("by_status", {}).get("valid", 0),
        "risks_total": len(risks),
        "risks_high": risks_high,
        "snapshot_date": today,
    }).fetchone()
    # The row is fetched before the commit so the RETURNING result is read
    # while the statement's result is still open.
    db.commit()
    return _row_to_dict(row)
@router.get("/dashboard/score-history")
async def get_score_history(
    months: int = Query(12, ge=1, le=36),
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """
    Return stored compliance score snapshots for a tenant.

    The look-back window is approximated as ``months * 30`` days before
    today; snapshots are returned oldest first.

    Args:
        months: Length of the look-back window in months (1-36).
        db: Database session injected by FastAPI.
        tenant_id: Tenant whose snapshots are queried.

    Returns:
        Dict with ``snapshots``, their ``total`` count and ``period_months``.
    """
    window_start = date.today() - timedelta(days=months * 30)
    query = text("""
SELECT * FROM compliance_score_snapshots
WHERE tenant_id = :tenant_id AND snapshot_date >= :since
ORDER BY snapshot_date ASC
""")
    records = db.execute(query, {"tenant_id": tenant_id, "since": window_start}).fetchall()

    history = []
    for record in records:
        entry = _row_to_dict(record)
        score_value = entry.get("score")
        # Decimal scores are converted to float for the JSON response.
        if isinstance(score_value, Decimal):
            entry["score"] = float(score_value)
        history.append(entry)

    return {
        "snapshots": history,
        "total": len(history),
        "period_months": months,
    }
# ============================================================================
# Evidence Distribution (Anti-Fake-Evidence Phase 3)
# ============================================================================
@router.get("/dashboard/evidence-distribution")
async def get_evidence_distribution(
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """
    Count evidence per confidence level and pending four-eyes reviews.

    Evidence without a confidence level is counted as "E1". An item is
    pending when it requires four-eyes approval and its approval status is
    neither "approved" nor "rejected".

    Args:
        db: Database session injected by FastAPI.
        tenant_id: Tenant resolved by the tenant dependency (not used for
            filtering inside this function).

    Returns:
        Dict with ``by_confidence``, ``four_eyes_pending`` and ``total``.
    """
    evidence_items = EvidenceRepository(db).get_all()
    confidence_counts = dict.fromkeys(("E0", "E1", "E2", "E3", "E4"), 0)
    pending_reviews = 0
    for item in evidence_items:
        level = item.confidence_level.value if item.confidence_level else "E1"
        if level in confidence_counts:
            confidence_counts[level] += 1
        if not item.requires_four_eyes:
            continue
        if item.approval_status not in ("approved", "rejected"):
            pending_reviews += 1
    return {
        "by_confidence": confidence_counts,
        "four_eyes_pending": pending_reviews,
        "total": len(evidence_items),
    }
# ============================================================================
# Traceability Matrix (Anti-Fake-Evidence Phase 4a)
# ============================================================================
@router.get("/dashboard/traceability-matrix", response_model=TraceabilityMatrixResponse)
async def get_traceability_matrix(
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """
    Full traceability chain: Control → Evidence → Assertions.

    Loads each entity set once, builds in-memory indices, and nests
    the result so the frontend can render a matrix view.

    A control counts as covered when it has at least one evidence item, and
    as fully verified when it has evidence, at least one assertion, and every
    assertion has a verifier.

    Args:
        db: Database session injected by FastAPI.
        tenant_id: Tenant resolved by the tenant dependency (not used for
            filtering inside this function).

    Returns:
        TraceabilityMatrixResponse with the nested controls and a summary of
        total / covered / fully verified / uncovered controls.
    """
    ctrl_repo = ControlRepository(db)
    evidence_repo = EvidenceRepository(db)

    # 1. Load all three entity sets
    controls = ctrl_repo.get_all()
    all_evidence = evidence_repo.get_all()
    all_assertions = db.query(AssertionDB).filter(
        AssertionDB.entity_type == "evidence",
    ).all()

    # 2. Index assertions by evidence_id (entity_id). Keys are normalised to
    #    str because the lookup below uses str(evidence.id); without this a
    #    non-string entity_id would never match.
    assertions_by_evidence: Dict[str, list] = {}
    for a in all_assertions:
        assertions_by_evidence.setdefault(str(a.entity_id), []).append(a)

    # 3. Index evidence by control_id
    evidence_by_control: Dict[str, list] = {}
    for e in all_evidence:
        evidence_by_control.setdefault(str(e.control_id), []).append(e)

    # Ranking used to find the weakest confidence level per control; built
    # once instead of once per control.
    conf_order = {"E0": 0, "E1": 1, "E2": 2, "E3": 3, "E4": 4}

    # 4. Build nested response
    result_controls: list = []
    covered_controls = 0
    fully_verified = 0
    for ctrl in controls:
        ctrl_id = str(ctrl.id)
        ctrl_evidence = evidence_by_control.get(ctrl_id, [])
        nested_evidence: list = []
        has_evidence = len(ctrl_evidence) > 0
        has_assertions = False
        all_verified = True
        min_conf: Optional[str] = None
        for e in ctrl_evidence:
            ev_id = str(e.id)
            ev_assertions = assertions_by_evidence.get(ev_id, [])
            nested_assertions = [
                TraceabilityAssertion(
                    id=str(a.id),
                    sentence_text=a.sentence_text,
                    assertion_type=a.assertion_type or "assertion",
                    confidence=a.confidence or 0.0,
                    verified=a.verified_by is not None,
                )
                for a in ev_assertions
            ]
            if nested_assertions:
                has_assertions = True
            # A single unverified assertion marks the whole control.
            if any(not na.verified for na in nested_assertions):
                all_verified = False

            # Evidence without a confidence level is treated as "E1".
            conf = e.confidence_level.value if e.confidence_level else "E1"
            if min_conf is None or conf_order.get(conf, 1) < conf_order.get(min_conf, 1):
                min_conf = conf
            nested_evidence.append(TraceabilityEvidence(
                id=ev_id,
                title=e.title,
                evidence_type=e.evidence_type,
                confidence_level=conf,
                status=e.status.value if e.status else "valid",
                assertions=nested_assertions,
            ))

        # Without any assertion there is nothing that could be verified.
        if not has_assertions:
            all_verified = False
        if has_evidence:
            covered_controls += 1
        if has_evidence and has_assertions and all_verified:
            fully_verified += 1

        coverage = TraceabilityCoverage(
            has_evidence=has_evidence,
            has_assertions=has_assertions,
            all_assertions_verified=all_verified,
            min_confidence_level=min_conf,
        )
        result_controls.append(TraceabilityControl(
            id=ctrl_id,
            control_id=ctrl.control_id,
            title=ctrl.title,
            status=ctrl.status.value if ctrl.status else "planned",
            domain=ctrl.domain.value if ctrl.domain else "unknown",
            evidence=nested_evidence,
            coverage=coverage,
        ))

    # One result entry is appended per control, so this is the control count.
    total_controls = len(result_controls)
    summary = {
        "total_controls": total_controls,
        "covered_controls": covered_controls,
        "fully_verified": fully_verified,
        "uncovered_controls": total_controls - covered_controls,
    }
    return TraceabilityMatrixResponse(controls=result_controls, summary=summary)
# ============================================================================
# Reports
# ============================================================================
@router.get("/reports/summary")
async def get_summary_report(db: Session = Depends(get_db)):
    """
    Return the quick summary report shown on the dashboard.

    Args:
        db: Database session injected by FastAPI.

    Returns:
        Whatever ``ComplianceReportGenerator.generate_summary_report`` yields.
    """
    # Imported inside the function rather than at module level.
    from ..services.report_generator import ComplianceReportGenerator

    summary = ComplianceReportGenerator(db).generate_summary_report()
    return summary
@router.get("/reports/{period}")
async def generate_period_report(
    period: str = "monthly",
    as_of_date: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """
    Generate a compliance report for the specified period.

    Args:
        period: One of 'weekly', 'monthly', 'quarterly', 'yearly'
        as_of_date: Report date (YYYY-MM-DD format, defaults to today)
        db: Database session injected by FastAPI.

    Returns:
        Complete compliance report

    Raises:
        HTTPException: 400 when ``period`` is not a valid report period or
            ``as_of_date`` does not match YYYY-MM-DD.
    """
    from ..services.report_generator import ComplianceReportGenerator, ReportPeriod

    # Validate period; the original ValueError is chained so the root cause
    # stays visible in tracebacks and logs.
    try:
        report_period = ReportPeriod(period)
    except ValueError as exc:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid period '{period}'. Must be one of: weekly, monthly, quarterly, yearly"
        ) from exc

    # Parse date; None is passed through when no date was supplied.
    report_date = None
    if as_of_date:
        try:
            report_date = datetime.strptime(as_of_date, "%Y-%m-%d").date()
        except ValueError as exc:
            raise HTTPException(
                status_code=400,
                detail="Invalid date format. Use YYYY-MM-DD"
            ) from exc

    generator = ComplianceReportGenerator(db)
    return generator.generate_report(report_period, report_date)