Implement full evidence integrity pipeline to prevent compliance theater: - Confidence levels (E0-E4), truth status tracking, assertion engine - Four-Eyes approval workflow, audit trail, reject endpoint - Evidence distribution dashboard, LLM audit routes - Traceability matrix (backend endpoint + Compliance Hub UI tab) - Anti-fake badges, control status machine, normative patterns - 2 migrations, 4 test suites, MkDocs documentation Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
828 lines
28 KiB
Python
828 lines
28 KiB
Python
"""
|
|
FastAPI routes for Dashboard, Executive Dashboard, and Reports.
|
|
|
|
Endpoints:
|
|
- /dashboard: Main compliance dashboard
|
|
- /dashboard/executive: Executive summary for managers
|
|
- /dashboard/trend: Compliance score trend over time
|
|
- /dashboard/roadmap: Prioritised controls in 4 buckets
|
|
- /dashboard/module-status: Completion status of each SDK module
|
|
- /dashboard/next-actions: Top 5 most important actions
|
|
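- /dashboard/snapshot: Save the current compliance score as a snapshot
- /dashboard/score-history: Query stored compliance score snapshots
- /dashboard/evidence-distribution: Evidence counts by confidence level and four-eyes status
- /dashboard/traceability-matrix: Control -> Evidence -> Assertion traceability chain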
|
|
- /score: Quick compliance score
|
|
- /reports/summary, /reports/{period}: Report generation
|
|
"""
|
|
|
|
import logging
|
|
from datetime import datetime, date, timedelta
|
|
from calendar import month_abbr
|
|
from typing import Optional, Dict, Any, List
|
|
from decimal import Decimal
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import text
|
|
from sqlalchemy.orm import Session
|
|
|
|
from classroom_engine.database import get_db
|
|
|
|
from ..db import (
|
|
RegulationRepository,
|
|
RequirementRepository,
|
|
ControlRepository,
|
|
EvidenceRepository,
|
|
RiskRepository,
|
|
AssertionDB,
|
|
)
|
|
from .schemas import (
|
|
DashboardResponse,
|
|
MultiDimensionalScore,
|
|
ExecutiveDashboardResponse,
|
|
TrendDataPoint,
|
|
RiskSummary,
|
|
DeadlineItem,
|
|
TeamWorkloadItem,
|
|
TraceabilityAssertion,
|
|
TraceabilityEvidence,
|
|
TraceabilityCoverage,
|
|
TraceabilityControl,
|
|
TraceabilityMatrixResponse,
|
|
)
|
|
from .tenant_utils import get_tenant_id as _get_tenant_id
|
|
from .db_utils import row_to_dict as _row_to_dict
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter(tags=["compliance-dashboard"])
|
|
|
|
|
|
# ============================================================================
|
|
# Dashboard
|
|
# ============================================================================
|
|
|
|
@router.get("/dashboard", response_model=DashboardResponse)
async def get_dashboard(db: Session = Depends(get_db)):
    """Assemble the headline figures for the main compliance dashboard.

    Reads regulations, requirements, controls, evidence and risks through
    their repositories and condenses them into a ``DashboardResponse``.
    The compliance score is the ``compliance_score`` entry of the control
    statistics (0.0 when absent). The multi-dimensional score is best
    effort: if it cannot be computed a warning is logged and ``None`` is
    returned in its place.
    """
    regulation_repo = RegulationRepository(db)
    requirement_repo = RequirementRepository(db)
    control_repo = ControlRepository(db)
    evidence_store = EvidenceRepository(db)
    risk_store = RiskRepository(db)

    active_regulations = regulation_repo.get_active()
    all_requirements = requirement_repo.get_all()

    control_stats = control_repo.get_statistics()
    all_controls = control_repo.get_all()

    # Per-domain tally of control statuses; a status outside the known
    # counter keys only contributes to "total".
    domain_tally = {}
    for control in all_controls:
        domain_key = control.domain.value if control.domain else "unknown"
        counters = domain_tally.setdefault(
            domain_key,
            {"total": 0, "pass": 0, "partial": 0, "fail": 0, "planned": 0},
        )
        counters["total"] += 1
        status_key = control.status.value if control.status else "planned"
        if status_key in counters:
            counters[status_key] += 1

    evidence_figures = evidence_store.get_statistics()

    # Risks are bucketed by inherent risk level; missing levels count as "low".
    all_risks = risk_store.get_all()
    risk_tally = dict.fromkeys(("low", "medium", "high", "critical"), 0)
    for entry in all_risks:
        risk_level = entry.inherent_risk.value if entry.inherent_risk else "low"
        if risk_level in risk_tally:
            risk_tally[risk_level] += 1

    overall = control_stats.get("compliance_score", 0.0)

    # Multi-dimensional score (Anti-Fake-Evidence) must never break the dashboard.
    try:
        raw_multi = control_repo.get_multi_dimensional_score()
        multi_dimensional = MultiDimensionalScore(**raw_multi)
    except Exception as e:
        logger.warning(f"Failed to compute multi-dimensional score: {e}")
        multi_dimensional = None

    return DashboardResponse(
        compliance_score=round(overall, 1),
        total_regulations=len(active_regulations),
        total_requirements=len(all_requirements),
        total_controls=control_stats.get("total", 0),
        controls_by_status=control_stats.get("by_status", {}),
        controls_by_domain=domain_tally,
        total_evidence=evidence_figures.get("total", 0),
        evidence_by_status=evidence_figures.get("by_status", {}),
        total_risks=len(all_risks),
        risks_by_level=risk_tally,
        recent_activity=[],
        multi_score=multi_dimensional,
    )
|
|
|
|
|
|
@router.get("/score")
async def get_compliance_score(db: Session = Depends(get_db)):
    """Return the compliance score together with the counts it is based on.

    The score is ``(pass + 0.5 * partial) / total * 100`` rounded to one
    decimal place, or 0 when there are no controls.

    Returns:
        dict with ``score``, ``total_controls``, ``passing_controls``,
        ``partial_controls`` and ``multi_score`` (``None`` when the
        multi-dimensional score could not be computed).
    """
    ctrl_repo = ControlRepository(db)
    stats = ctrl_repo.get_statistics()

    # get_dashboard reads per-status counts from the nested "by_status"
    # dict, so fall back to it when the flat keys are absent.
    # NOTE(review): confirm the exact shape returned by get_statistics().
    by_status = stats.get("by_status") or {}
    total = stats.get("total", 0)
    passing = stats.get("pass", by_status.get("pass", 0))
    partial = stats.get("partial", by_status.get("partial", 0))

    if total > 0:
        score = ((passing + partial * 0.5) / total) * 100
    else:
        score = 0

    # Multi-dimensional score (Anti-Fake-Evidence) is best effort, but a
    # failure is logged instead of being swallowed silently.
    try:
        multi_score = ctrl_repo.get_multi_dimensional_score()
    except Exception as exc:
        logger.warning("Failed to compute multi-dimensional score: %s", exc)
        multi_score = None

    return {
        "score": round(score, 1),
        "total_controls": total,
        "passing_controls": passing,
        "partial_controls": partial,
        "multi_score": multi_score,
    }
|
|
|
|
|
|
# ============================================================================
|
|
# Executive Dashboard (Phase 3 - Sprint 1)
|
|
# ============================================================================
|
|
|
|
@router.get("/dashboard/executive", response_model=ExecutiveDashboardResponse)
async def get_executive_dashboard(db: Session = Depends(get_db)):
    """
    Get executive dashboard for managers and decision makers.

    Provides:
    - Traffic light status (green >= 80, yellow >= 60, otherwise red)
    - Overall compliance score; the trend currently holds only today's point
    - Top 5 open (not "mitigated") risks, ordered by risk level and then
      by impact * likelihood
    - Upcoming deadlines: control reviews that are overdue or due within
      30 days, at most 10 entries, most urgent first
    - Team workload distribution per control owner
    """
    reg_repo = RegulationRepository(db)
    req_repo = RequirementRepository(db)
    ctrl_repo = ControlRepository(db)
    risk_repo = RiskRepository(db)

    # Compliance score. get_dashboard reads per-status counts from the
    # nested "by_status" dict, so fall back to it when the flat keys are
    # absent. NOTE(review): confirm the shape returned by get_statistics().
    ctrl_stats = ctrl_repo.get_statistics()
    by_status = ctrl_stats.get("by_status") or {}
    total = ctrl_stats.get("total", 0)
    passing = ctrl_stats.get("pass", by_status.get("pass", 0))
    partial = ctrl_stats.get("partial", by_status.get("partial", 0))

    if total > 0:
        score = ((passing + partial * 0.5) / total) * 100
    else:
        score = 0

    # Traffic light status
    if score >= 80:
        traffic_light = "green"
    elif score >= 60:
        traffic_light = "yellow"
    else:
        traffic_light = "red"

    # Trend data — only the current score, no simulated history
    trend_data = []
    if total > 0:
        now = datetime.utcnow()
        trend_data.append(TrendDataPoint(
            date=now.strftime("%Y-%m-%d"),
            score=round(score, 1),
            label=month_abbr[now.month][:3],
        ))

    # Top 5 open risks. The open-risk list is built once and reused for
    # the open_risks count further down.
    risks = risk_repo.get_all()
    open_risk_items = [r for r in risks if r.status != "mitigated"]
    risk_priority = {"critical": 4, "high": 3, "medium": 2, "low": 1}

    def _risk_rank(r):
        level = r.inherent_risk.value if r.inherent_risk else "low"
        # A missing impact/likelihood ranks as 0 instead of raising TypeError.
        return (risk_priority.get(level, 1), (r.impact or 0) * (r.likelihood or 0))

    sorted_risks = sorted(open_risk_items, key=_risk_rank, reverse=True)[:5]

    top_risks = [
        RiskSummary(
            id=r.id,
            risk_id=r.risk_id,
            title=r.title,
            risk_level=r.inherent_risk.value if r.inherent_risk else "medium",
            owner=r.owner,
            status=r.status,
            category=r.category,
            impact=r.impact,
            likelihood=r.likelihood,
        )
        for r in sorted_risks
    ]

    # Upcoming deadlines (control reviews)
    controls = ctrl_repo.get_all()
    upcoming_deadlines = []
    today = datetime.utcnow().date()

    for ctrl in controls:
        if not ctrl.next_review_at:
            continue
        # next_review_at may be a datetime (has .date()) or already a date.
        if hasattr(ctrl.next_review_at, "date"):
            review_date = ctrl.next_review_at.date()
        else:
            review_date = ctrl.next_review_at
        days_remaining = (review_date - today).days

        if days_remaining > 30:
            continue

        if days_remaining < 0:
            status = "overdue"
        elif days_remaining <= 7:
            status = "at_risk"
        else:
            status = "on_track"

        upcoming_deadlines.append(DeadlineItem(
            id=ctrl.id,
            title=f"Review: {ctrl.control_id} - {ctrl.title[:30]}",
            deadline=review_date.isoformat(),
            days_remaining=days_remaining,
            type="control_review",
            status=status,
            owner=ctrl.owner,
        ))

    upcoming_deadlines.sort(key=lambda x: x.days_remaining)
    upcoming_deadlines = upcoming_deadlines[:10]

    # Team workload (by owner): pass -> completed, partial -> in progress,
    # everything else -> pending.
    owner_workload = {}
    for ctrl in controls:
        owner = ctrl.owner or "Unassigned"
        counters = owner_workload.setdefault(
            owner, {"pending": 0, "in_progress": 0, "completed": 0}
        )
        status = ctrl.status.value if ctrl.status else "planned"
        if status == "pass":
            counters["completed"] += 1
        elif status == "partial":
            counters["in_progress"] += 1
        else:
            counters["pending"] += 1

    team_workload = []
    for name, stats in owner_workload.items():
        total_tasks = stats["pending"] + stats["in_progress"] + stats["completed"]
        completion_rate = (stats["completed"] / total_tasks * 100) if total_tasks > 0 else 0
        team_workload.append(TeamWorkloadItem(
            name=name,
            pending_tasks=stats["pending"],
            in_progress_tasks=stats["in_progress"],
            completed_tasks=stats["completed"],
            total_tasks=total_tasks,
            completion_rate=round(completion_rate, 1),
        ))

    team_workload.sort(key=lambda x: x.total_tasks, reverse=True)

    # Counts
    regulations = reg_repo.get_active()
    requirements = req_repo.get_all()
    open_risks = len(open_risk_items)

    # With a single trend point there is no previous score yet; these
    # expressions only yield values once real history is supplied.
    has_previous = len(trend_data) >= 2

    return ExecutiveDashboardResponse(
        traffic_light_status=traffic_light,
        overall_score=round(score, 1),
        score_trend=trend_data,
        previous_score=trend_data[-2].score if has_previous else None,
        score_change=round(score - trend_data[-2].score, 1) if has_previous else None,
        total_regulations=len(regulations),
        total_requirements=len(requirements),
        total_controls=total,
        open_risks=open_risks,
        top_risks=top_risks,
        upcoming_deadlines=upcoming_deadlines,
        team_workload=team_workload,
        last_updated=datetime.utcnow().isoformat(),
    )
|
|
|
|
|
|
@router.get("/dashboard/trend")
async def get_compliance_trend(
    months: int = Query(12, ge=1, le=24, description="Number of months to include"),
    db: Session = Depends(get_db),
):
    """
    Get compliance score trend over time.

    Only the current score is returned as a single trend point (no
    simulated history). ``months`` is validated and echoed back as
    ``period_months`` but does not influence the data yet.

    Returns:
        dict with ``current_score``, ``trend`` (empty when there are no
        controls), ``period_months`` and ``generated_at`` (UTC, ISO format).
    """
    ctrl_repo = ControlRepository(db)
    stats = ctrl_repo.get_statistics()

    # get_dashboard reads per-status counts from the nested "by_status"
    # dict, so fall back to it when the flat keys are absent.
    # NOTE(review): confirm the exact shape returned by get_statistics().
    by_status = stats.get("by_status") or {}
    total = stats.get("total", 0)
    passing = stats.get("pass", by_status.get("pass", 0))
    partial = stats.get("partial", by_status.get("partial", 0))

    current_score = ((passing + partial * 0.5) / total) * 100 if total > 0 else 0

    # Trend data — only current score, no simulated history
    trend_data = []
    if total > 0:
        now = datetime.utcnow()
        trend_data.append({
            "date": now.strftime("%Y-%m-%d"),
            "score": round(current_score, 1),
            "label": f"{month_abbr[now.month]} {now.year % 100}",
            "month": now.month,
            "year": now.year,
        })

    return {
        "current_score": round(current_score, 1),
        "trend": trend_data,
        "period_months": months,
        "generated_at": datetime.utcnow().isoformat(),
    }
|
|
|
|
|
|
# ============================================================================
|
|
# Dashboard Extended — Roadmap, Module-Status, Next-Actions, Snapshots
|
|
# ============================================================================
|
|
|
|
# Priority weight per control category, used when ranking open controls.
_PRIORITY_WEIGHTS = dict(legal=5, security=3, best_practice=1, operational=2)

# SDK modules: (key, display label, DB table whose rows are counted to
# derive the module's completion status).
_MODULE_DEFS: List[Dict[str, str]] = [
    {"key": module_key, "label": module_label, "table": module_table}
    for module_key, module_label, module_table in (
        ("vvt", "VVT", "compliance_vvt_activities"),
        ("tom", "TOM", "compliance_toms"),
        ("dsfa", "DSFA", "compliance_dsfa_assessments"),
        ("loeschfristen", "Loeschfristen", "compliance_loeschfristen"),
        ("risks", "Risiken", "compliance_risks"),
        ("controls", "Controls", "compliance_controls"),
        ("evidence", "Nachweise", "compliance_evidence"),
        ("obligations", "Pflichten", "compliance_obligations"),
        ("incidents", "Vorfaelle", "compliance_notfallplan_incidents"),
        ("vendor", "Auftragsverarbeiter", "compliance_vendor_assessments"),
        ("legal_templates", "Rechtl. Dokumente", "compliance_legal_templates"),
        ("training", "Schulungen", "training_modules"),
        ("audit", "Audit", "compliance_audit_sessions"),
        ("security_backlog", "Security-Backlog", "compliance_security_backlog"),
        ("quality", "Qualitaet", "compliance_quality_items"),
    )
]
|
|
|
|
|
|
@router.get("/dashboard/roadmap")
async def get_dashboard_roadmap(
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """Prioritised controls in 4 buckets: Quick Wins, Must Have, Should Have, Nice to Have.

    Controls with status "pass" are skipped. Each remaining control gets
    the weight of its category from ``_PRIORITY_WEIGHTS`` (1 when the
    category is unknown) and is placed as follows:

    - ``quick_wins``: weight >= 5 and review overdue
    - ``must_have``: weight >= 4
    - ``should_have``: weight >= 2
    - ``nice_to_have``: everything else

    Returns:
        dict with ``buckets``, ``counts`` per bucket and ``generated_at``.
    """
    # NOTE(review): tenant_id is resolved but not used to filter controls
    # here — confirm whether ControlRepository scopes by tenant on its own.
    ctrl_repo = ControlRepository(db)
    controls = ctrl_repo.get_all()
    today = datetime.utcnow().date()

    buckets: Dict[str, list] = {
        "quick_wins": [],
        "must_have": [],
        "should_have": [],
        "nice_to_have": [],
    }

    for ctrl in controls:
        status = ctrl.status.value if ctrl.status else "planned"
        if status == "pass":
            continue  # already done

        weight = _PRIORITY_WEIGHTS.get(getattr(ctrl, "category", "best_practice"), 1)

        # Negative while the review date is still in the future.
        days_overdue = 0
        if ctrl.next_review_at:
            # next_review_at may be a datetime (has .date()) or already a date.
            if hasattr(ctrl.next_review_at, "date"):
                review_date = ctrl.next_review_at.date()
            else:
                review_date = ctrl.next_review_at
            days_overdue = (today - review_date).days

        item = {
            "id": str(ctrl.id),
            "control_id": ctrl.control_id,
            "title": ctrl.title,
            "status": status,
            "domain": ctrl.domain.value if ctrl.domain else "unknown",
            "owner": ctrl.owner,
            "next_review_at": ctrl.next_review_at.isoformat() if ctrl.next_review_at else None,
            "days_overdue": max(0, days_overdue),
            "weight": weight,
        }

        if weight >= 5 and days_overdue > 0:
            buckets["quick_wins"].append(item)
        elif weight >= 4:
            buckets["must_have"].append(item)
        elif weight >= 2:
            buckets["should_have"].append(item)
        else:
            buckets["nice_to_have"].append(item)

    # Within each bucket the most overdue controls come first; the sort is
    # stable, so equally overdue controls keep their repository order.
    for items in buckets.values():
        items.sort(key=lambda x: x["days_overdue"], reverse=True)

    return {
        "buckets": buckets,
        "counts": {k: len(v) for k, v in buckets.items()},
        "generated_at": datetime.utcnow().isoformat(),
    }
|
|
|
|
|
|
@router.get("/dashboard/module-status")
async def get_module_status(
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """Completion status for each SDK module based on DB record counts.

    For every entry in ``_MODULE_DEFS`` the rows of its table belonging to
    ``tenant_id`` are counted. A module whose count query fails is
    reported with a count of 0.

    Returns:
        dict with the per-module list plus ``total``, ``started``,
        ``complete`` and ``overall_progress`` (percentage of complete modules).
    """
    modules = []
    for mod in _MODULE_DEFS:
        try:
            # The table name comes from the static _MODULE_DEFS list, never
            # from request input; only the tenant id is a bound parameter.
            row = db.execute(
                text(f"SELECT COUNT(*) FROM {mod['table']} WHERE tenant_id = :tid"),
                {"tid": tenant_id},
            ).fetchone()
            count = int(row[0]) if row else 0
        except Exception as exc:
            logger.warning("Could not count rows for module '%s': %s", mod["key"], exc)
            # A failed statement can leave the transaction aborted; without
            # a rollback every following module query would fail as well
            # and be reported as 0.
            db.rollback()
            count = 0

        # Simple heuristic: 0 = not started, 1-2 = in progress, 3+ = complete
        if count == 0:
            status = "not_started"
            progress = 0
        elif count < 3:
            status = "in_progress"
            progress = min(60, count * 30)
        else:
            status = "complete"
            progress = 100

        modules.append({
            "key": mod["key"],
            "label": mod["label"],
            "count": count,
            "status": status,
            "progress": progress,
        })

    started = sum(1 for m in modules if m["status"] != "not_started")
    complete = sum(1 for m in modules if m["status"] == "complete")

    return {
        "modules": modules,
        "total": len(modules),
        "started": started,
        "complete": complete,
        "overall_progress": round((complete / len(modules)) * 100, 1) if modules else 0,
    }
|
|
|
|
|
|
@router.get("/dashboard/next-actions")
async def get_next_actions(
    limit: int = Query(5, ge=1, le=20),
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """Return the ``limit`` most urgent open controls.

    Controls with status "pass" are ignored. The urgency score of a
    control is ten times its category weight plus the number of days its
    review is overdue; the result is ordered by that score, highest first.
    """
    all_controls = ControlRepository(db).get_all()
    reference_day = datetime.utcnow().date()

    candidates = []
    for control in all_controls:
        state = control.status.value if control.status else "planned"
        if state == "pass":
            continue

        # Days past the review date, never negative.
        overdue = 0
        if control.next_review_at:
            if hasattr(control.next_review_at, "date"):
                due_day = control.next_review_at.date()
            else:
                due_day = control.next_review_at
            overdue = max(0, (reference_day - due_day).days)

        category_weight = _PRIORITY_WEIGHTS.get(
            getattr(control, "category", "best_practice"), 1
        )

        candidates.append({
            "id": str(control.id),
            "control_id": control.control_id,
            "title": control.title,
            "status": state,
            "domain": control.domain.value if control.domain else "unknown",
            "owner": control.owner,
            "days_overdue": overdue,
            "urgency_score": category_weight * 10 + overdue,
            "reason": "Ueberfaellig" if overdue > 0 else "Offen",
        })

    ranked = sorted(candidates, key=lambda entry: entry["urgency_score"], reverse=True)
    return {"actions": ranked[:limit]}
|
|
|
|
|
|
@router.post("/dashboard/snapshot")
async def create_score_snapshot(
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """Save current compliance score as a historical snapshot.

    Computes the score from the control statistics, counts high/critical
    risks, upserts one row into ``compliance_score_snapshots`` for today's
    date and commits.

    Returns:
        The stored snapshot row converted with ``_row_to_dict``.
    """
    ctrl_repo = ControlRepository(db)
    evidence_repo = EvidenceRepository(db)
    risk_repo = RiskRepository(db)

    ctrl_stats = ctrl_repo.get_statistics()
    evidence_stats = evidence_repo.get_statistics()
    risks = risk_repo.get_all()

    # get_dashboard reads per-status counts from the nested "by_status"
    # dict, so fall back to it when the flat keys are absent.
    # NOTE(review): confirm the exact shape returned by get_statistics().
    by_status = ctrl_stats.get("by_status") or {}
    total = ctrl_stats.get("total", 0)
    passing = ctrl_stats.get("pass", by_status.get("pass", 0))
    partial = ctrl_stats.get("partial", by_status.get("partial", 0))
    score = round(((passing + partial * 0.5) / total) * 100, 2) if total > 0 else 0

    # Risks without an inherent level count as "low".
    risks_high = sum(
        1
        for r in risks
        if (r.inherent_risk.value if r.inherent_risk else "low") in ("high", "critical")
    )

    today = date.today()

    # NOTE(review): the conflict target includes project_id, which this
    # INSERT never sets — verify the column default / unique constraint so
    # that repeated calls on one day really update instead of inserting.
    row = db.execute(text("""
        INSERT INTO compliance_score_snapshots (
            tenant_id, score, controls_total, controls_pass, controls_partial,
            evidence_total, evidence_valid, risks_total, risks_high, snapshot_date
        ) VALUES (
            :tenant_id, :score, :controls_total, :controls_pass, :controls_partial,
            :evidence_total, :evidence_valid, :risks_total, :risks_high, :snapshot_date
        )
        ON CONFLICT (tenant_id, project_id, snapshot_date) DO UPDATE SET
            score = EXCLUDED.score,
            controls_total = EXCLUDED.controls_total,
            controls_pass = EXCLUDED.controls_pass,
            controls_partial = EXCLUDED.controls_partial,
            evidence_total = EXCLUDED.evidence_total,
            evidence_valid = EXCLUDED.evidence_valid,
            risks_total = EXCLUDED.risks_total,
            risks_high = EXCLUDED.risks_high
        RETURNING *
    """), {
        "tenant_id": tenant_id,
        "score": score,
        "controls_total": total,
        "controls_pass": passing,
        "controls_partial": partial,
        "evidence_total": evidence_stats.get("total", 0),
        "evidence_valid": evidence_stats.get("by_status", {}).get("valid", 0),
        "risks_total": len(risks),
        "risks_high": risks_high,
        "snapshot_date": today,
    }).fetchone()
    db.commit()

    return _row_to_dict(row)
|
|
|
|
|
|
@router.get("/dashboard/score-history")
async def get_score_history(
    months: int = Query(12, ge=1, le=36),
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """Return the tenant's stored score snapshots, oldest first.

    The look-back window is ``months`` * 30 days counted back from today.
    """
    cutoff = date.today() - timedelta(days=months * 30)

    result = db.execute(text("""
        SELECT * FROM compliance_score_snapshots
        WHERE tenant_id = :tenant_id AND snapshot_date >= :since
        ORDER BY snapshot_date ASC
    """), {"tenant_id": tenant_id, "since": cutoff})

    def _as_payload(record):
        """Turn one result row into a dict whose score is a plain float."""
        payload = _row_to_dict(record)
        # Convert Decimal to float for JSON
        if isinstance(payload.get("score"), Decimal):
            payload["score"] = float(payload["score"])
        return payload

    history = [_as_payload(record) for record in result.fetchall()]

    return {
        "snapshots": history,
        "total": len(history),
        "period_months": months,
    }
|
|
|
|
|
|
# ============================================================================
|
|
# Evidence Distribution (Anti-Fake-Evidence Phase 3)
|
|
# ============================================================================
|
|
|
|
@router.get("/dashboard/evidence-distribution")
async def get_evidence_distribution(
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """Summarise evidence by confidence level (E0-E4) and open four-eyes reviews.

    Evidence without a confidence level is counted as "E1". An item is
    pending four-eyes approval when it requires it and its approval status
    is neither "approved" nor "rejected".
    """
    evidence_items = EvidenceRepository(db).get_all()

    histogram = dict.fromkeys(("E0", "E1", "E2", "E3", "E4"), 0)
    awaiting_second_review = 0

    for item in evidence_items:
        bucket = item.confidence_level.value if item.confidence_level else "E1"
        if bucket in histogram:
            histogram[bucket] += 1

        if item.requires_four_eyes and item.approval_status not in ("approved", "rejected"):
            awaiting_second_review += 1

    return {
        "by_confidence": histogram,
        "four_eyes_pending": awaiting_second_review,
        "total": len(evidence_items),
    }
|
|
|
|
|
|
# ============================================================================
|
|
# Traceability Matrix (Anti-Fake-Evidence Phase 4a)
|
|
# ============================================================================
|
|
|
|
@router.get("/dashboard/traceability-matrix", response_model=TraceabilityMatrixResponse)
async def get_traceability_matrix(
    db: Session = Depends(get_db),
    tenant_id: str = Depends(_get_tenant_id),
):
    """
    Full traceability chain: Control → Evidence → Assertions.

    Loads each entity set once, builds in-memory indices, and nests
    the result so the frontend can render a matrix view.

    A control counts as "covered" when it has at least one evidence item
    and as "fully verified" when it additionally has at least one
    assertion and every assertion on its evidence has a verifier.
    """
    # NOTE(review): tenant_id is resolved but not used to filter the
    # queries below — confirm whether the repositories scope by tenant.
    ctrl_repo = ControlRepository(db)
    evidence_repo = EvidenceRepository(db)

    # Ranking used to find the weakest confidence level per control.
    conf_order = {"E0": 0, "E1": 1, "E2": 2, "E3": 3, "E4": 4}

    # 1. Load all three entity sets
    controls = ctrl_repo.get_all()
    all_evidence = evidence_repo.get_all()
    all_assertions = db.query(AssertionDB).filter(
        AssertionDB.entity_type == "evidence",
    ).all()

    # 2. Index assertions by evidence id (entity_id). Keys are normalised
    #    to str because the lookup below uses str(e.id); a non-string
    #    entity_id would otherwise never match.
    assertions_by_evidence: Dict[str, list] = {}
    for a in all_assertions:
        assertions_by_evidence.setdefault(str(a.entity_id), []).append(a)

    # 3. Index evidence by control_id
    evidence_by_control: Dict[str, list] = {}
    for e in all_evidence:
        evidence_by_control.setdefault(str(e.control_id), []).append(e)

    # 4. Build nested response
    result_controls: list = []
    covered_controls = 0
    fully_verified = 0

    for ctrl in controls:
        ctrl_id = str(ctrl.id)
        ctrl_evidence = evidence_by_control.get(ctrl_id, [])

        nested_evidence: list = []
        has_evidence = len(ctrl_evidence) > 0
        has_assertions = False
        all_verified = True
        min_conf: Optional[str] = None

        for e in ctrl_evidence:
            ev_id = str(e.id)

            nested_assertions = [
                TraceabilityAssertion(
                    id=str(a.id),
                    sentence_text=a.sentence_text,
                    assertion_type=a.assertion_type or "assertion",
                    confidence=a.confidence or 0.0,
                    verified=a.verified_by is not None,
                )
                for a in assertions_by_evidence.get(ev_id, [])
            ]

            if nested_assertions:
                has_assertions = True
                if not all(na.verified for na in nested_assertions):
                    all_verified = False

            # Evidence without a confidence level is treated as "E1".
            conf = e.confidence_level.value if e.confidence_level else "E1"
            if min_conf is None or conf_order.get(conf, 1) < conf_order.get(min_conf, 1):
                min_conf = conf

            nested_evidence.append(TraceabilityEvidence(
                id=ev_id,
                title=e.title,
                evidence_type=e.evidence_type,
                confidence_level=conf,
                status=e.status.value if e.status else "valid",
                assertions=nested_assertions,
            ))

        # Without any assertion there is nothing that could be verified.
        if not has_assertions:
            all_verified = False

        if has_evidence:
            covered_controls += 1
        if has_evidence and has_assertions and all_verified:
            fully_verified += 1

        coverage = TraceabilityCoverage(
            has_evidence=has_evidence,
            has_assertions=has_assertions,
            all_assertions_verified=all_verified,
            min_confidence_level=min_conf,
        )

        result_controls.append(TraceabilityControl(
            id=ctrl_id,
            control_id=ctrl.control_id,
            title=ctrl.title,
            status=ctrl.status.value if ctrl.status else "planned",
            domain=ctrl.domain.value if ctrl.domain else "unknown",
            evidence=nested_evidence,
            coverage=coverage,
        ))

    total_controls = len(result_controls)
    summary = {
        "total_controls": total_controls,
        "covered_controls": covered_controls,
        "fully_verified": fully_verified,
        "uncovered_controls": total_controls - covered_controls,
    }

    return TraceabilityMatrixResponse(controls=result_controls, summary=summary)
|
|
|
|
|
|
# ============================================================================
|
|
# Reports
|
|
# ============================================================================
|
|
|
|
@router.get("/reports/summary")
async def get_summary_report(db: Session = Depends(get_db)):
    """Return the generator's summary report for display on the dashboard."""
    # Imported inside the handler, as in the other report route.
    from ..services.report_generator import ComplianceReportGenerator

    return ComplianceReportGenerator(db).generate_summary_report()
|
|
|
|
|
|
@router.get("/reports/{period}")
async def generate_period_report(
    period: str = "monthly",
    as_of_date: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """
    Build the compliance report for one reporting period.

    Args:
        period: One of 'weekly', 'monthly', 'quarterly', 'yearly'
        as_of_date: Report date in YYYY-MM-DD format; when omitted, None
            is handed to the generator

    Returns:
        The report produced by ``ComplianceReportGenerator.generate_report``

    Raises:
        HTTPException: 400 for an unknown period or a malformed date
    """
    from ..services.report_generator import ComplianceReportGenerator, ReportPeriod

    # Reject unknown period names before doing any work.
    try:
        selected_period = ReportPeriod(period)
    except ValueError:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid period '{period}'. Must be one of: weekly, monthly, quarterly, yearly"
        )

    # An empty or missing date means "let the generator decide".
    if not as_of_date:
        reference_date = None
    else:
        try:
            reference_date = datetime.strptime(as_of_date, "%Y-%m-%d").date()
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail="Invalid date format. Use YYYY-MM-DD"
            )

    return ComplianceReportGenerator(db).generate_report(selected_period, reference_date)
|