Initial commit: breakpilot-compliance - Compliance SDK Platform

Services: Admin-Compliance, Backend-Compliance,
AI-Compliance-SDK, Consent-SDK, Developer-Portal,
PCA-Platform, DSMS

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Boenisch
2026-02-11 23:47:28 +01:00
commit 4435e7ea0a
734 changed files with 251369 additions and 0 deletions

View File

@@ -0,0 +1,33 @@
"""API routes for Compliance module."""
from .routes import router
from .audit_routes import router as audit_router
from .ai_routes import router as ai_router
from .evidence_routes import router as evidence_router
from .risk_routes import router as risk_router
from .dashboard_routes import router as dashboard_router
from .scraper_routes import router as scraper_router
from .module_routes import router as module_router
from .isms_routes import router as isms_router
# Include sub-routers
router.include_router(audit_router)
router.include_router(ai_router)
router.include_router(evidence_router)
router.include_router(risk_router)
router.include_router(dashboard_router)
router.include_router(scraper_router)
router.include_router(module_router)
router.include_router(isms_router)
__all__ = [
"router",
"audit_router",
"ai_router",
"evidence_router",
"risk_router",
"dashboard_router",
"scraper_router",
"module_router",
"isms_router",
]

View File

@@ -0,0 +1,909 @@
"""
FastAPI routes for AI Compliance Assistant.
Endpoints:
- /ai/status: Get AI provider status
- /ai/interpret: Interpret a requirement
- /ai/suggest-controls: Get AI-suggested controls
- /ai/assess-risk: Assess module risk
- /ai/gap-analysis: Analyze coverage gaps
- /ai/batch-interpret: Batch interpret requirements
- /ai/auto-map-controls: Auto-map controls to requirements
- /ai/batch-map-controls: Batch map controls
- /ai/switch-provider: Switch LLM provider
- /ai/providers: List available providers
- /pdf/*: PDF extraction endpoints
"""
import logging
import os
from typing import Optional, List
from pydantic import BaseModel
from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from ..db import (
RegulationRepository,
RequirementRepository,
ControlRepository,
)
from ..db.models import RegulationDB, RequirementDB
from .schemas import (
# AI Assistant schemas
AIInterpretationRequest, AIInterpretationResponse,
AIBatchInterpretationRequest, AIBatchInterpretationResponse,
AIControlSuggestionRequest, AIControlSuggestionResponse, AIControlSuggestionItem,
AIRiskAssessmentRequest, AIRiskAssessmentResponse, AIRiskFactor,
AIGapAnalysisRequest, AIGapAnalysisResponse,
AIStatusResponse,
# PDF extraction schemas
BSIAspectResponse, PDFExtractionResponse,
)
logger = logging.getLogger(__name__)
# Router for the AI-assistant and PDF-extraction endpoints; included as a
# sub-router by the package __init__ (imported there as ai_router).
router = APIRouter(tags=["compliance-ai"])
# ============================================================================
# AI Assistant Endpoints (Sprint 4)
# ============================================================================
@router.get("/ai/status", response_model=AIStatusResponse)
async def get_ai_status():
"""Get the status of the AI provider."""
from ..services.llm_provider import get_shared_provider, LLMProviderType
try:
provider = get_shared_provider()
return AIStatusResponse(
provider=provider.provider_name,
model=provider.config.model,
is_available=True,
is_mock=provider.provider_name == "mock",
error=None,
)
except Exception as e:
return AIStatusResponse(
provider="unknown",
model="unknown",
is_available=False,
is_mock=True,
error=str(e),
)
@router.post("/ai/interpret", response_model=AIInterpretationResponse)
async def interpret_requirement(
request: AIInterpretationRequest,
db: Session = Depends(get_db),
):
"""Generate AI interpretation for a requirement."""
from ..services.ai_compliance_assistant import get_ai_assistant
# Get requirement from DB
req_repo = RequirementRepository(db)
requirement = req_repo.get_by_id(request.requirement_id)
if not requirement:
raise HTTPException(status_code=404, detail=f"Requirement {request.requirement_id} not found")
# Get regulation info
reg_repo = RegulationRepository(db)
regulation = reg_repo.get_by_id(requirement.regulation_id)
try:
assistant = get_ai_assistant()
result = await assistant.interpret_requirement(
requirement_id=requirement.id,
article=requirement.article,
title=requirement.title,
requirement_text=requirement.requirement_text or requirement.description or "",
regulation_code=regulation.code if regulation else "UNKNOWN",
regulation_name=regulation.name if regulation else "Unknown Regulation",
)
return AIInterpretationResponse(
requirement_id=result.requirement_id,
summary=result.summary,
applicability=result.applicability,
technical_measures=result.technical_measures,
affected_modules=result.affected_modules,
risk_level=result.risk_level,
implementation_hints=result.implementation_hints,
confidence_score=result.confidence_score,
error=result.error,
)
except Exception as e:
logger.error(f"AI interpretation failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/ai/suggest-controls", response_model=AIControlSuggestionResponse)
async def suggest_controls(
request: AIControlSuggestionRequest,
db: Session = Depends(get_db),
):
"""Get AI-suggested controls for a requirement."""
from ..services.ai_compliance_assistant import get_ai_assistant
# Get requirement from DB
req_repo = RequirementRepository(db)
requirement = req_repo.get_by_id(request.requirement_id)
if not requirement:
raise HTTPException(status_code=404, detail=f"Requirement {request.requirement_id} not found")
# Get regulation info
reg_repo = RegulationRepository(db)
regulation = reg_repo.get_by_id(requirement.regulation_id)
try:
assistant = get_ai_assistant()
suggestions = await assistant.suggest_controls(
requirement_title=requirement.title,
requirement_text=requirement.requirement_text or requirement.description or "",
regulation_name=regulation.name if regulation else "Unknown",
affected_modules=[], # Could be populated from previous interpretation
)
return AIControlSuggestionResponse(
requirement_id=request.requirement_id,
suggestions=[
AIControlSuggestionItem(
control_id=s.control_id,
domain=s.domain,
title=s.title,
description=s.description,
pass_criteria=s.pass_criteria,
implementation_guidance=s.implementation_guidance,
is_automated=s.is_automated,
automation_tool=s.automation_tool,
priority=s.priority,
confidence_score=s.confidence_score,
)
for s in suggestions
],
)
except Exception as e:
logger.error(f"AI control suggestion failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/ai/assess-risk", response_model=AIRiskAssessmentResponse)
async def assess_module_risk(
request: AIRiskAssessmentRequest,
db: Session = Depends(get_db),
):
"""Get AI risk assessment for a service module."""
from ..services.ai_compliance_assistant import get_ai_assistant
from ..db.repository import ServiceModuleRepository
# Get module from DB
module_repo = ServiceModuleRepository(db)
module = module_repo.get_by_id(request.module_id)
if not module:
module = module_repo.get_by_name(request.module_id)
if not module:
raise HTTPException(status_code=404, detail=f"Module {request.module_id} not found")
# Get regulations for this module
module_detail = module_repo.get_with_regulations(module.id)
regulations = []
if module_detail and module_detail.get("regulation_mappings"):
for mapping in module_detail["regulation_mappings"]:
regulations.append({
"code": mapping.get("regulation_code", ""),
"relevance": mapping.get("relevance_level", "medium"),
})
try:
assistant = get_ai_assistant()
result = await assistant.assess_module_risk(
module_name=module.name,
service_type=module.service_type.value if module.service_type else "unknown",
description=module.description or "",
processes_pii=module.processes_pii,
ai_components=module.ai_components,
criticality=module.criticality or "medium",
data_categories=module.data_categories or [],
regulations=regulations,
)
return AIRiskAssessmentResponse(
module_name=result.module_name,
overall_risk=result.overall_risk,
risk_factors=[
AIRiskFactor(
factor=f.get("factor", ""),
severity=f.get("severity", "medium"),
likelihood=f.get("likelihood", "medium"),
)
for f in result.risk_factors
],
recommendations=result.recommendations,
compliance_gaps=result.compliance_gaps,
confidence_score=result.confidence_score,
)
except Exception as e:
logger.error(f"AI risk assessment failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/ai/gap-analysis", response_model=AIGapAnalysisResponse)
async def analyze_gap(
request: AIGapAnalysisRequest,
db: Session = Depends(get_db),
):
"""Analyze coverage gaps between a requirement and existing controls."""
from ..services.ai_compliance_assistant import get_ai_assistant
# Get requirement from DB
req_repo = RequirementRepository(db)
requirement = req_repo.get_by_id(request.requirement_id)
if not requirement:
raise HTTPException(status_code=404, detail=f"Requirement {request.requirement_id} not found")
# Get regulation info
reg_repo = RegulationRepository(db)
regulation = reg_repo.get_by_id(requirement.regulation_id)
# Get existing control mappings from eager-loaded relationship
ctrl_repo = ControlRepository(db)
existing_controls = []
if requirement.control_mappings:
for mapping in requirement.control_mappings:
if mapping.control:
existing_controls.append({
"control_id": mapping.control.control_id,
"title": mapping.control.title,
"status": mapping.control.status.value if mapping.control.status else "unknown",
})
try:
assistant = get_ai_assistant()
result = await assistant.analyze_gap(
requirement_id=requirement.id,
requirement_title=requirement.title,
requirement_text=requirement.requirement_text or requirement.description or "",
regulation_code=regulation.code if regulation else "UNKNOWN",
existing_controls=existing_controls,
)
return AIGapAnalysisResponse(
requirement_id=result.requirement_id,
requirement_title=result.requirement_title,
coverage_level=result.coverage_level,
existing_controls=result.existing_controls,
missing_coverage=result.missing_coverage,
suggested_actions=result.suggested_actions,
)
except Exception as e:
logger.error(f"AI gap analysis failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/ai/batch-interpret", response_model=AIBatchInterpretationResponse)
async def batch_interpret_requirements(
request: AIBatchInterpretationRequest,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
):
"""
Batch interpret multiple requirements.
For large batches, this runs in the background and returns immediately.
"""
from ..services.ai_compliance_assistant import get_ai_assistant
req_repo = RequirementRepository(db)
reg_repo = RegulationRepository(db)
# Build list of requirements to process
requirements_to_process = []
if request.requirement_ids:
for req_id in request.requirement_ids:
req = req_repo.get_by_id(req_id)
if req:
reg = reg_repo.get_by_id(req.regulation_id)
requirements_to_process.append({
"id": req.id,
"article": req.article,
"title": req.title,
"requirement_text": req.requirement_text or req.description or "",
"regulation_code": reg.code if reg else "UNKNOWN",
"regulation_name": reg.name if reg else "Unknown",
})
elif request.regulation_code:
# Get all requirements for a regulation
reg = reg_repo.get_by_code(request.regulation_code)
if reg:
reqs = req_repo.get_by_regulation(reg.id)
for req in reqs[:50]: # Limit to 50 for batch processing
requirements_to_process.append({
"id": req.id,
"article": req.article,
"title": req.title,
"requirement_text": req.requirement_text or req.description or "",
"regulation_code": reg.code,
"regulation_name": reg.name,
})
if not requirements_to_process:
raise HTTPException(status_code=400, detail="No requirements found to process")
# For small batches, process synchronously
if len(requirements_to_process) <= 5:
assistant = get_ai_assistant()
results = await assistant.batch_interpret_requirements(
requirements_to_process,
rate_limit=request.rate_limit,
)
return AIBatchInterpretationResponse(
total=len(requirements_to_process),
processed=len(results),
interpretations=[
AIInterpretationResponse(
requirement_id=r.requirement_id,
summary=r.summary,
applicability=r.applicability,
technical_measures=r.technical_measures,
affected_modules=r.affected_modules,
risk_level=r.risk_level,
implementation_hints=r.implementation_hints,
confidence_score=r.confidence_score,
error=r.error,
)
for r in results
],
)
# For large batches, return immediately with info
# (Background processing would be added in a production version)
return AIBatchInterpretationResponse(
total=len(requirements_to_process),
processed=0,
interpretations=[],
)
# ============================================================================
# PDF Extraction (Sprint 2)
# ============================================================================
@router.get("/pdf/available")
async def list_available_pdfs():
"""List available PDF documents for extraction."""
from pathlib import Path
docs_path = Path("/app/docs") if Path("/app/docs").exists() else Path("docs")
available = []
bsi_files = list(docs_path.glob("BSI-TR-*.pdf"))
for pdf_file in bsi_files:
available.append({
"filename": pdf_file.name,
"path": str(pdf_file),
"size_bytes": pdf_file.stat().st_size,
"type": "bsi_standard",
})
return {
"available_pdfs": available,
"total": len(available),
}
@router.post("/pdf/extract/{doc_code}", response_model=PDFExtractionResponse)
async def extract_pdf_requirements(
doc_code: str,
save_to_db: bool = Query(True, description="Save extracted requirements to database"),
db: Session = Depends(get_db),
):
"""
Extract requirements/aspects from a BSI-TR PDF document.
doc_code examples:
- BSI-TR-03161-1: General security requirements
- BSI-TR-03161-2: Web application security
- BSI-TR-03161-3: Backend/server security
"""
from pathlib import Path
from ..services.pdf_extractor import BSIPDFExtractor
from ..db.models import RegulationTypeEnum
# Find the PDF file
docs_path = Path("/app/docs") if Path("/app/docs").exists() else Path("docs")
pdf_path = docs_path / f"{doc_code}.pdf"
if not pdf_path.exists():
raise HTTPException(status_code=404, detail=f"PDF not found: {doc_code}.pdf")
# Extract aspects
extractor = BSIPDFExtractor()
try:
aspects = extractor.extract_from_file(str(pdf_path), source_name=doc_code)
except Exception as e:
logger.error(f"PDF extraction failed: {e}")
raise HTTPException(status_code=500, detail=f"PDF extraction failed: {str(e)}")
# Find or create the regulation
reg_repo = RegulationRepository(db)
regulation = reg_repo.get_by_code(doc_code)
if not regulation:
regulation = reg_repo.create(
code=doc_code,
name=f"BSI Technical Guideline {doc_code.split('-')[-1]}",
full_name=f"BSI Technische Richtlinie {doc_code}",
regulation_type=RegulationTypeEnum.BSI_STANDARD,
local_pdf_path=str(pdf_path),
)
# Save to database if requested
saved_count = 0
if save_to_db and aspects:
req_repo = RequirementRepository(db)
for aspect in aspects:
# Check if requirement already exists
existing = db.query(RequirementDB).filter(
RequirementDB.regulation_id == regulation.id,
RequirementDB.article == aspect.aspect_id,
).first()
if not existing:
try:
req_repo.create(
regulation_id=regulation.id,
article=aspect.aspect_id,
title=aspect.title[:300] if aspect.title else "",
description=f"Category: {aspect.category.value}",
requirement_text=aspect.full_text[:4000] if aspect.full_text else "",
priority=1 if aspect.requirement_level.value == "MUSS" else (
2 if aspect.requirement_level.value == "SOLL" else 3
),
)
saved_count += 1
except Exception as e:
logger.warning(f"Failed to save aspect {aspect.aspect_id}: {e}")
db.commit()
# Convert aspects to response format
aspect_responses = [
BSIAspectResponse(
aspect_id=a.aspect_id,
title=a.title,
full_text=a.full_text,
category=a.category.value,
page_number=a.page_number,
section=a.section,
requirement_level=a.requirement_level.value,
source_document=a.source_document,
)
for a in aspects
]
return PDFExtractionResponse(
doc_code=doc_code,
total_extracted=len(aspects),
saved_to_db=saved_count,
aspects=aspect_responses,
)
@router.get("/pdf/extraction-stats")
async def get_extraction_stats(db: Session = Depends(get_db)):
"""Get statistics about extracted PDF requirements."""
from sqlalchemy import func
# Count requirements per BSI regulation
stats = (
db.query(
RegulationDB.code,
func.count(RequirementDB.id).label('count')
)
.join(RequirementDB, RequirementDB.regulation_id == RegulationDB.id)
.filter(RegulationDB.code.like('BSI-%'))
.group_by(RegulationDB.code)
.all()
)
return {
"bsi_requirements": {code: count for code, count in stats},
"total_bsi_requirements": sum(count for _, count in stats),
}
# ============================================================================
# Automatic Control Mapping
# ============================================================================
# Domain keyword mapping for automatic control assignment.
# Each key is a control domain; a requirement's text is scanned for these
# German/English keywords to score how relevant each domain is. Matching is
# case-insensitive substring search (see auto_map_controls and
# batch_map_controls below).
DOMAIN_KEYWORDS = {
    "priv": ["datenschutz", "dsgvo", "gdpr", "privacy", "personenbezogen", "einwilligung",
             "consent", "betroffenenrechte", "verarbeitungsverzeichnis", "pii", "auftragsverarbeitung"],
    "iam": ["authentifizierung", "auth", "login", "passwort", "password", "zugang", "access",
            "berechtigung", "session", "token", "jwt", "oauth", "sso", "mfa", "2fa", "rbac"],
    "crypto": ["verschlüsselung", "encryption", "kryptograph", "crypto", "hash", "schlüssel",
               "key", "tls", "ssl", "zertifikat", "signatur", "aes", "rsa"],
    "sdlc": ["entwicklung", "code", "software", "sast", "dast", "dependency", "vulnerable",
             "cve", "security scan", "semgrep", "trivy", "sbom", "ci/cd", "build"],
    "ops": ["monitoring", "logging", "log", "protokoll", "backup", "incident", "alert",
            "availability", "uptime", "patch", "update", "deployment"],
    "ai": ["künstliche intelligenz", "ki", "ai", "machine learning", "ml", "modell",
           "training", "inference", "bias", "ai act", "hochrisiko"],
    "cra": ["vulnerability", "schwachstelle", "disclosure", "patch", "eol", "end-of-life",
            "supply chain", "sbom", "cve", "update"],
    "gov": ["richtlinie", "policy", "governance", "verantwortlich", "raci", "dokumentation",
            "prozess", "awareness", "schulung", "training"],
    "aud": ["audit", "prüfung", "nachweis", "evidence", "traceability", "nachvollzieh",
            "protokoll", "export", "report"],
}
@router.post("/ai/auto-map-controls")
async def auto_map_controls(
requirement_id: str = Query(..., description="Requirement UUID"),
save_to_db: bool = Query(True, description="Save mappings to database"),
use_ai: bool = Query(False, description="Use AI for better matching (slower)"),
db: Session = Depends(get_db),
):
"""
Automatically map controls to a requirement.
Uses keyword matching by default (fast) or AI for better accuracy (slower).
"""
from ..db.models import ControlMappingDB
# Get requirement
req_repo = RequirementRepository(db)
requirement = req_repo.get_by_id(requirement_id)
if not requirement:
raise HTTPException(status_code=404, detail=f"Requirement {requirement_id} not found")
# Get all controls
ctrl_repo = ControlRepository(db)
all_controls = ctrl_repo.get_all()
# Text to analyze
text_to_analyze = f"{requirement.title} {requirement.requirement_text or ''} {requirement.description or ''}"
text_lower = text_to_analyze.lower()
matched_controls = []
if use_ai:
# Use AI for matching (slower but more accurate)
from ..services.ai_compliance_assistant import get_ai_assistant
assistant = get_ai_assistant()
reg_repo = RegulationRepository(db)
regulation = reg_repo.get_by_id(requirement.regulation_id)
try:
suggestions = await assistant.suggest_controls(
requirement_title=requirement.title,
requirement_text=requirement.requirement_text or "",
regulation_name=regulation.name if regulation else "Unknown",
affected_modules=[],
)
# Match suggestions to existing controls by domain
for suggestion in suggestions:
domain = suggestion.domain.lower()
domain_controls = [c for c in all_controls if c.domain and c.domain.value.lower() == domain]
if domain_controls:
# Take the first matching control from this domain
matched_controls.append({
"control": domain_controls[0],
"coverage": "partial",
"notes": f"AI suggested: {suggestion.title}",
"confidence": suggestion.confidence_score,
})
except Exception as e:
logger.warning(f"AI mapping failed, falling back to keyword matching: {e}")
use_ai = False # Fall back to keyword matching
if not use_ai:
# Keyword-based matching (fast)
domain_scores = {}
for domain, keywords in DOMAIN_KEYWORDS.items():
score = sum(1 for kw in keywords if kw.lower() in text_lower)
if score > 0:
domain_scores[domain] = score
# Sort domains by score
sorted_domains = sorted(domain_scores.items(), key=lambda x: x[1], reverse=True)
# Take top 3 domains
for domain, score in sorted_domains[:3]:
domain_controls = [c for c in all_controls if c.domain and c.domain.value.lower() == domain]
for ctrl in domain_controls[:2]: # Max 2 controls per domain
matched_controls.append({
"control": ctrl,
"coverage": "partial" if score < 3 else "full",
"notes": f"Keyword match (score: {score})",
"confidence": min(0.9, 0.5 + score * 0.1),
})
# Save mappings to database if requested
created_mappings = []
if save_to_db and matched_controls:
for match in matched_controls:
ctrl = match["control"]
# Check if mapping already exists
existing = db.query(ControlMappingDB).filter(
ControlMappingDB.requirement_id == requirement_id,
ControlMappingDB.control_id == ctrl.id,
).first()
if not existing:
mapping = ControlMappingDB(
requirement_id=requirement_id,
control_id=ctrl.id,
coverage_level=match["coverage"],
notes=match["notes"],
)
db.add(mapping)
created_mappings.append({
"control_id": ctrl.control_id,
"domain": ctrl.domain.value if ctrl.domain else None,
"title": ctrl.title,
"coverage_level": match["coverage"],
"notes": match["notes"],
})
db.commit()
return {
"requirement_id": requirement_id,
"requirement_title": requirement.title,
"matched_controls": len(matched_controls),
"created_mappings": len(created_mappings),
"mappings": created_mappings if save_to_db else [
{
"control_id": m["control"].control_id,
"domain": m["control"].domain.value if m["control"].domain else None,
"title": m["control"].title,
"coverage_level": m["coverage"],
"confidence": m.get("confidence", 0.7),
}
for m in matched_controls
],
}
@router.post("/ai/batch-map-controls")
async def batch_map_controls(
regulation_code: Optional[str] = Query(None, description="Filter by regulation code"),
limit: int = Query(100, description="Max requirements to process"),
use_ai: bool = Query(False, description="Use AI for matching (slower)"),
background_tasks: BackgroundTasks = None,
db: Session = Depends(get_db),
):
"""
Batch map controls to multiple requirements.
Processes requirements that don't have mappings yet.
"""
from ..db.models import ControlMappingDB
# Get requirements without mappings
req_repo = RequirementRepository(db)
if regulation_code:
reg_repo = RegulationRepository(db)
regulation = reg_repo.get_by_code(regulation_code)
if not regulation:
raise HTTPException(status_code=404, detail=f"Regulation {regulation_code} not found")
all_requirements = req_repo.get_by_regulation(regulation.id)
else:
all_requirements = req_repo.get_all()
# Filter to requirements without mappings
requirements_without_mappings = []
for req in all_requirements:
existing = db.query(ControlMappingDB).filter(
ControlMappingDB.requirement_id == req.id
).first()
if not existing:
requirements_without_mappings.append(req)
# Limit processing
to_process = requirements_without_mappings[:limit]
# Get all controls once
ctrl_repo = ControlRepository(db)
all_controls = ctrl_repo.get_all()
# Process each requirement
results = []
for req in to_process:
try:
text_to_analyze = f"{req.title} {req.requirement_text or ''} {req.description or ''}"
text_lower = text_to_analyze.lower()
# Quick keyword matching
domain_scores = {}
for domain, keywords in DOMAIN_KEYWORDS.items():
score = sum(1 for kw in keywords if kw.lower() in text_lower)
if score > 0:
domain_scores[domain] = score
if domain_scores:
# Get top domain
top_domain = max(domain_scores.items(), key=lambda x: x[1])[0]
domain_controls = [c for c in all_controls if c.domain and c.domain.value.lower() == top_domain]
if domain_controls:
ctrl = domain_controls[0]
# Create mapping
mapping = ControlMappingDB(
requirement_id=req.id,
control_id=ctrl.id,
coverage_level="partial",
notes=f"Auto-mapped (domain: {top_domain})",
)
db.add(mapping)
results.append({
"requirement_id": req.id,
"requirement_title": req.title[:50],
"control_id": ctrl.control_id,
"domain": top_domain,
})
except Exception as e:
logger.warning(f"Failed to map requirement {req.id}: {e}")
db.commit()
return {
"processed": len(to_process),
"mapped": len(results),
"remaining": len(requirements_without_mappings) - len(to_process),
"mappings": results[:20], # Only return first 20 for readability
}
# ============================================================================
# LLM Provider Switch Endpoints (Runtime Configuration)
# ============================================================================
class ProviderSwitchRequest(BaseModel):
    """Request body for switching the LLM provider at runtime."""
    # Target provider id: "anthropic" or "self_hosted" (the switch endpoint
    # additionally accepts "mock" via its provider_map).
    provider: str
    # Optional: override the model for the chosen provider.
    model: Optional[str] = None
    # Optional: override the base URL, used only for self-hosted.
    url: Optional[str] = None
class ProviderSwitchResponse(BaseModel):
    """Response after switching LLM provider."""
    success: bool                  # True when the switch completed
    previous_provider: str         # provider name before the switch
    new_provider: str              # provider name after the switch
    model: str                     # model now in effect
    url: Optional[str] = None      # base URL (self-hosted), when available
    message: str                   # human-readable summary of the switch
@router.post("/ai/switch-provider", response_model=ProviderSwitchResponse)
async def switch_llm_provider(request: ProviderSwitchRequest):
"""
Switch the LLM provider at runtime between Anthropic API and Self-Hosted (Ollama).
This allows developers to toggle between:
- **anthropic**: Cloud-based Claude API (kostenpflichtig, Daten gehen zu Anthropic)
- **self_hosted**: Self-hosted Ollama on Mac Mini (kostenlos, DSGVO-konform, Daten bleiben intern)
Note: This change is temporary for the current container session.
For permanent changes, modify the docker-compose.yml environment variables.
"""
from ..services.llm_provider import (
reset_shared_provider,
get_shared_provider,
LLMProviderType,
)
try:
# Get current provider info before switch
old_provider = get_shared_provider()
old_provider_name = old_provider.provider_name
# Map string to enum
provider_map = {
"anthropic": LLMProviderType.ANTHROPIC,
"self_hosted": LLMProviderType.SELF_HOSTED,
"mock": LLMProviderType.MOCK,
}
if request.provider.lower() not in provider_map:
raise HTTPException(
status_code=400,
detail=f"Invalid provider: {request.provider}. Use 'anthropic' or 'self_hosted'"
)
# Update environment variables for the new provider
os.environ["COMPLIANCE_LLM_PROVIDER"] = request.provider.lower()
if request.provider.lower() == "self_hosted":
if request.url:
os.environ["SELF_HOSTED_LLM_URL"] = request.url
if request.model:
os.environ["SELF_HOSTED_LLM_MODEL"] = request.model
else:
# Default to llama3.1:70b for compliance tasks
os.environ["SELF_HOSTED_LLM_MODEL"] = os.environ.get(
"SELF_HOSTED_LLM_MODEL", "llama3.1:70b"
)
elif request.provider.lower() == "anthropic":
if request.model:
os.environ["ANTHROPIC_MODEL"] = request.model
# Reset the shared provider to pick up new config
reset_shared_provider()
# Get the new provider
new_provider = get_shared_provider()
return ProviderSwitchResponse(
success=True,
previous_provider=old_provider_name,
new_provider=new_provider.provider_name,
model=new_provider.config.model,
url=new_provider.config.base_url if hasattr(new_provider.config, 'base_url') else None,
message=f"Successfully switched from {old_provider_name} to {new_provider.provider_name}",
)
except Exception as e:
logger.error(f"Failed to switch LLM provider: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/ai/providers")
async def list_available_providers():
"""
List available LLM providers with their descriptions.
This helps developers understand which provider to use for which scenario.
"""
return {
"providers": [
{
"id": "anthropic",
"name": "Anthropic Claude API",
"description_de": "Cloud-basierte KI von Anthropic. Kostenpflichtig (API-Credits). Daten werden zur Verarbeitung an Anthropic gesendet.",
"description_en": "Cloud-based AI from Anthropic. Paid service (API credits). Data is sent to Anthropic for processing.",
"gdpr_compliant": False,
"data_location": "Anthropic Cloud (USA)",
"cost": "Kostenpflichtig pro Token",
"use_case": "Produktiv, wenn hohe Qualitaet benoetigt wird",
"models": ["claude-sonnet-4-20250514", "claude-3-5-sonnet-20241022"],
},
{
"id": "self_hosted",
"name": "Self-Hosted Ollama",
"description_de": "Lokales LLM auf dem Mac Mini M4 Pro (64GB RAM). Kostenlos. Alle Daten bleiben intern - DSGVO-konform!",
"description_en": "Local LLM on Mac Mini M4 Pro (64GB RAM). Free. All data stays internal - GDPR compliant!",
"gdpr_compliant": True,
"data_location": "Lokal auf Mac Mini",
"cost": "Kostenlos (Hardware bereits vorhanden)",
"use_case": "Entwicklung, Testing, DSGVO-sensitive Dokumente",
"models": ["llama3.1:70b", "llama3.2-vision", "mixtral:8x7b"],
},
],
"current_provider": None, # Will be filled by get_ai_status
"note_de": "Umschaltung erfolgt sofort, aber nur fuer diese Container-Session. Fuer permanente Aenderung docker-compose.yml anpassen.",
"note_en": "Switch takes effect immediately but only for this container session. For permanent change, modify docker-compose.yml.",
}

View File

@@ -0,0 +1,637 @@
"""
FastAPI routes for Audit Sessions & Sign-off functionality.
Sprint 3 Phase 3: Auditor-Verbesserungen
Endpoints:
- /audit/sessions: Manage audit sessions
- /audit/checklist: Audit checklist with sign-off
"""
import logging
from datetime import datetime
from typing import Optional, List
from uuid import uuid4
import hashlib
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from sqlalchemy import func
from classroom_engine.database import get_db
from ..db.models import (
AuditSessionDB, AuditSignOffDB, AuditResultEnum, AuditSessionStatusEnum,
RequirementDB, RegulationDB, ControlMappingDB
)
from .schemas import (
CreateAuditSessionRequest, AuditSessionResponse, AuditSessionSummary, AuditSessionDetailResponse,
AuditSessionListResponse, SignOffRequest, SignOffResponse,
AuditChecklistItem, AuditChecklistResponse, AuditStatistics,
PaginationMeta,
)
logger = logging.getLogger(__name__)
# All audit endpoints below share the /audit URL prefix.
router = APIRouter(prefix="/audit", tags=["compliance-audit"])
# ============================================================================
# Audit Sessions
# ============================================================================
@router.post("/sessions", response_model=AuditSessionResponse)
async def create_audit_session(
request: CreateAuditSessionRequest,
db: Session = Depends(get_db),
):
"""
Create a new audit session for structured compliance reviews.
An audit session groups requirements for systematic review by an auditor.
"""
# Get total requirements count based on filters
query = db.query(RequirementDB)
if request.regulation_codes:
reg_ids = db.query(RegulationDB.id).filter(
RegulationDB.code.in_(request.regulation_codes)
).all()
reg_ids = [r[0] for r in reg_ids]
query = query.filter(RequirementDB.regulation_id.in_(reg_ids))
total_items = query.count()
# Create the session
session = AuditSessionDB(
id=str(uuid4()),
name=request.name,
description=request.description,
auditor_name=request.auditor_name,
auditor_email=request.auditor_email,
auditor_organization=request.auditor_organization,
status=AuditSessionStatusEnum.DRAFT,
regulation_ids=request.regulation_codes,
total_items=total_items,
completed_items=0,
compliant_count=0,
non_compliant_count=0,
)
db.add(session)
db.commit()
db.refresh(session)
return AuditSessionResponse(
id=session.id,
name=session.name,
description=session.description,
auditor_name=session.auditor_name,
auditor_email=session.auditor_email,
auditor_organization=session.auditor_organization,
status=session.status.value,
regulation_ids=session.regulation_ids,
total_items=session.total_items,
completed_items=session.completed_items,
compliant_count=session.compliant_count,
non_compliant_count=session.non_compliant_count,
completion_percentage=session.completion_percentage,
created_at=session.created_at,
started_at=session.started_at,
completed_at=session.completed_at,
)
@router.get("/sessions", response_model=List[AuditSessionSummary])
async def list_audit_sessions(
    status: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """
    List all audit sessions, newest first.

    When ``status`` is supplied it must be one of the AuditSessionStatusEnum
    values; anything else yields a 400 response.
    """
    sessions_query = db.query(AuditSessionDB)
    if status:
        try:
            wanted_status = AuditSessionStatusEnum(status)
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid status: {status}. Valid values: draft, in_progress, completed, archived"
            )
        sessions_query = sessions_query.filter(AuditSessionDB.status == wanted_status)
    summaries: List[AuditSessionSummary] = []
    for session in sessions_query.order_by(AuditSessionDB.created_at.desc()).all():
        summaries.append(
            AuditSessionSummary(
                id=session.id,
                name=session.name,
                auditor_name=session.auditor_name,
                status=session.status.value,
                total_items=session.total_items,
                completed_items=session.completed_items,
                completion_percentage=session.completion_percentage,
                created_at=session.created_at,
                started_at=session.started_at,
                completed_at=session.completed_at,
            )
        )
    return summaries
@router.get("/sessions/{session_id}", response_model=AuditSessionDetailResponse)
async def get_audit_session(
    session_id: str,
    db: Session = Depends(get_db),
):
    """
    Get detailed information about a specific audit session.

    Returns the session's metadata plus aggregated sign-off statistics;
    requirements without a sign-off are counted as pending.

    Raises:
        HTTPException 404: if no session with ``session_id`` exists.
    """
    session = db.query(AuditSessionDB).filter(AuditSessionDB.id == session_id).first()
    if not session:
        raise HTTPException(status_code=404, detail=f"Audit session {session_id} not found")
    # Aggregate sign-off results for the statistics block.
    signoffs = db.query(AuditSignOffDB).filter(AuditSignOffDB.session_id == session_id).all()
    stats = AuditStatistics(
        total=session.total_items,
        compliant=sum(1 for s in signoffs if s.result == AuditResultEnum.COMPLIANT),
        compliant_with_notes=sum(1 for s in signoffs if s.result == AuditResultEnum.COMPLIANT_WITH_NOTES),
        non_compliant=sum(1 for s in signoffs if s.result == AuditResultEnum.NON_COMPLIANT),
        not_applicable=sum(1 for s in signoffs if s.result == AuditResultEnum.NOT_APPLICABLE),
        pending=session.total_items - len(signoffs),
        completion_percentage=session.completion_percentage,
    )
    # Consistency fix: construct the model declared as response_model on the
    # decorator (was AuditSessionDetail, which did not match the declaration).
    return AuditSessionDetailResponse(
        id=session.id,
        name=session.name,
        description=session.description,
        auditor_name=session.auditor_name,
        auditor_email=session.auditor_email,
        auditor_organization=session.auditor_organization,
        status=session.status.value,
        regulation_ids=session.regulation_ids,
        total_items=session.total_items,
        completed_items=session.completed_items,
        compliant_count=session.compliant_count,
        non_compliant_count=session.non_compliant_count,
        completion_percentage=session.completion_percentage,
        created_at=session.created_at,
        started_at=session.started_at,
        completed_at=session.completed_at,
        statistics=stats,
    )
@router.put("/sessions/{session_id}/start")
async def start_audit_session(
    session_id: str,
    db: Session = Depends(get_db),
):
    """
    Start an audit session: transition it from DRAFT to IN_PROGRESS and
    record the start timestamp. Only draft sessions may be started.
    """
    audit_session = (
        db.query(AuditSessionDB)
        .filter(AuditSessionDB.id == session_id)
        .first()
    )
    if audit_session is None:
        raise HTTPException(status_code=404, detail=f"Audit session {session_id} not found")
    if audit_session.status != AuditSessionStatusEnum.DRAFT:
        # Guard: starting is only legal from the draft state.
        raise HTTPException(
            status_code=400,
            detail=f"Session cannot be started. Current status: {audit_session.status.value}"
        )
    audit_session.status = AuditSessionStatusEnum.IN_PROGRESS
    audit_session.started_at = datetime.utcnow()
    db.commit()
    return {"success": True, "message": "Audit session started", "status": "in_progress"}
@router.put("/sessions/{session_id}/complete")
async def complete_audit_session(
    session_id: str,
    db: Session = Depends(get_db),
):
    """
    Complete an audit session: transition it from IN_PROGRESS to COMPLETED
    and record the completion timestamp.
    """
    audit_session = (
        db.query(AuditSessionDB)
        .filter(AuditSessionDB.id == session_id)
        .first()
    )
    if audit_session is None:
        raise HTTPException(status_code=404, detail=f"Audit session {session_id} not found")
    if audit_session.status != AuditSessionStatusEnum.IN_PROGRESS:
        # Guard: only sessions that are actually running can be completed.
        raise HTTPException(
            status_code=400,
            detail=f"Session cannot be completed. Current status: {audit_session.status.value}"
        )
    audit_session.status = AuditSessionStatusEnum.COMPLETED
    audit_session.completed_at = datetime.utcnow()
    db.commit()
    return {"success": True, "message": "Audit session completed", "status": "completed"}
@router.put("/sessions/{session_id}/archive")
async def archive_audit_session(
    session_id: str,
    db: Session = Depends(get_db),
):
    """
    Archive a completed audit session (COMPLETED -> ARCHIVED).
    """
    audit_session = (
        db.query(AuditSessionDB)
        .filter(AuditSessionDB.id == session_id)
        .first()
    )
    if audit_session is None:
        raise HTTPException(status_code=404, detail=f"Audit session {session_id} not found")
    if audit_session.status != AuditSessionStatusEnum.COMPLETED:
        # Guard: archiving is restricted to finished sessions.
        raise HTTPException(
            status_code=400,
            detail=f"Only completed sessions can be archived. Current status: {audit_session.status.value}"
        )
    audit_session.status = AuditSessionStatusEnum.ARCHIVED
    db.commit()
    return {"success": True, "message": "Audit session archived", "status": "archived"}
@router.delete("/sessions/{session_id}")
async def delete_audit_session(
    session_id: str,
    db: Session = Depends(get_db),
):
    """
    Delete an audit session together with all of its sign-offs.

    Only sessions in the DRAFT or ARCHIVED state may be deleted; anything
    in-flight or merely completed must be archived first.
    """
    audit_session = (
        db.query(AuditSessionDB)
        .filter(AuditSessionDB.id == session_id)
        .first()
    )
    if audit_session is None:
        raise HTTPException(status_code=404, detail=f"Audit session {session_id} not found")
    deletable_states = (AuditSessionStatusEnum.DRAFT, AuditSessionStatusEnum.ARCHIVED)
    if audit_session.status not in deletable_states:
        raise HTTPException(
            status_code=400,
            detail=f"Cannot delete session with status: {audit_session.status.value}. Archive it first."
        )
    # Remove dependent sign-offs explicitly before the session itself
    # (DB cascade should cover this, but be explicit).
    db.query(AuditSignOffDB).filter(AuditSignOffDB.session_id == session_id).delete()
    db.delete(audit_session)
    db.commit()
    return {"success": True, "message": f"Audit session {session_id} deleted"}
# ============================================================================
# Audit Checklist & Sign-off
# ============================================================================
@router.get("/checklist/{session_id}", response_model=AuditChecklistResponse)
async def get_audit_checklist(
    session_id: str,
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    status_filter: Optional[str] = None,
    regulation_filter: Optional[str] = None,
    search: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """
    Get the audit checklist for a session with pagination.
    Returns requirements with their current sign-off status.

    NOTE: ``status_filter`` is applied AFTER pagination (see the item loop
    below), so a filtered page may contain fewer than ``page_size`` items
    and ``pagination.total`` reflects only the regulation/search filters,
    not the status filter.
    """
    # Get the session
    session = db.query(AuditSessionDB).filter(AuditSessionDB.id == session_id).first()
    if not session:
        raise HTTPException(status_code=404, detail=f"Audit session {session_id} not found")
    # Build base query for requirements
    query = db.query(RequirementDB).join(RegulationDB)
    # Apply session's regulation filter (regulation_ids holds codes).
    if session.regulation_ids:
        query = query.filter(RegulationDB.code.in_(session.regulation_ids))
    # Apply additional filters
    if regulation_filter:
        query = query.filter(RegulationDB.code == regulation_filter)
    if search:
        # Case-insensitive substring match over title, article and description.
        search_term = f"%{search}%"
        query = query.filter(
            (RequirementDB.title.ilike(search_term)) |
            (RequirementDB.article.ilike(search_term)) |
            (RequirementDB.description.ilike(search_term))
        )
    # Get total count before pagination
    total_count = query.count()
    # Apply pagination
    requirements = (
        query
        .order_by(RegulationDB.code, RequirementDB.article)
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )
    # Get existing sign-offs for these requirements (only the current page).
    req_ids = [r.id for r in requirements]
    signoffs = (
        db.query(AuditSignOffDB)
        .filter(AuditSignOffDB.session_id == session_id)
        .filter(AuditSignOffDB.requirement_id.in_(req_ids))
        .all()
    )
    signoff_map = {s.requirement_id: s for s in signoffs}
    # Get control mappings counts (how many controls cover each requirement).
    mapping_counts = (
        db.query(ControlMappingDB.requirement_id, func.count(ControlMappingDB.id))
        .filter(ControlMappingDB.requirement_id.in_(req_ids))
        .group_by(ControlMappingDB.requirement_id)
        .all()
    )
    mapping_count_map = dict(mapping_counts)
    # Build checklist items
    items = []
    for req in requirements:
        signoff = signoff_map.get(req.id)
        # Apply status filter if specified.  "pending" means no sign-off row
        # exists yet; any other value is matched against the sign-off result.
        if status_filter:
            if status_filter == "pending" and signoff is not None:
                continue
            elif status_filter != "pending" and (signoff is None or signoff.result.value != status_filter):
                continue
        item = AuditChecklistItem(
            requirement_id=req.id,
            regulation_code=req.regulation.code,
            article=req.article,
            paragraph=req.paragraph,
            title=req.title,
            description=req.description,
            current_result=signoff.result.value if signoff else "pending",
            notes=signoff.notes if signoff else None,
            is_signed=signoff.signature_hash is not None if signoff else False,
            signed_at=signoff.signed_at if signoff else None,
            signed_by=signoff.signed_by if signoff else None,
            evidence_count=0,  # TODO: Add evidence count
            controls_mapped=mapping_count_map.get(req.id, 0),
            implementation_status=req.implementation_status,
            priority=req.priority,
        )
        items.append(item)
    # Calculate statistics across the WHOLE session, not just this page.
    all_signoffs = db.query(AuditSignOffDB).filter(AuditSignOffDB.session_id == session_id).all()
    stats = AuditStatistics(
        total=session.total_items,
        compliant=sum(1 for s in all_signoffs if s.result == AuditResultEnum.COMPLIANT),
        compliant_with_notes=sum(1 for s in all_signoffs if s.result == AuditResultEnum.COMPLIANT_WITH_NOTES),
        non_compliant=sum(1 for s in all_signoffs if s.result == AuditResultEnum.NON_COMPLIANT),
        not_applicable=sum(1 for s in all_signoffs if s.result == AuditResultEnum.NOT_APPLICABLE),
        pending=session.total_items - len(all_signoffs),
        completion_percentage=session.completion_percentage,
    )
    return AuditChecklistResponse(
        session=AuditSessionSummary(
            id=session.id,
            name=session.name,
            auditor_name=session.auditor_name,
            status=session.status.value,
            total_items=session.total_items,
            completed_items=session.completed_items,
            completion_percentage=session.completion_percentage,
            created_at=session.created_at,
            started_at=session.started_at,
            completed_at=session.completed_at,
        ),
        items=items,
        pagination=PaginationMeta(
            page=page,
            page_size=page_size,
            total=total_count,
            # Ceiling division without math.ceil.
            total_pages=(total_count + page_size - 1) // page_size,
        ),
        statistics=stats,
    )
@router.put("/checklist/{session_id}/items/{requirement_id}/sign-off", response_model=SignOffResponse)
async def sign_off_item(
    session_id: str,
    requirement_id: str,
    request: SignOffRequest,
    db: Session = Depends(get_db),
):
    """
    Sign off on a specific requirement in an audit session.

    Creates or updates the sign-off row for (session, requirement).  If
    ``request.sign`` is True, a digital signature is attached: the SHA-256
    hex digest of "result|requirement_id|auditor_name|timestamp".

    Side effects: updates the session's completed/compliant/non-compliant
    counters and auto-starts a DRAFT session on its first sign-off.

    NOTE(review): when an already-signed item is updated with sign=False,
    the previous signature_hash/signed_at are left in place even though
    result/notes changed — confirm whether the old signature should be
    invalidated in that case.
    """
    # Validate session exists and is in progress (or draft, which is
    # auto-started below).
    session = db.query(AuditSessionDB).filter(AuditSessionDB.id == session_id).first()
    if not session:
        raise HTTPException(status_code=404, detail=f"Audit session {session_id} not found")
    if session.status not in [AuditSessionStatusEnum.DRAFT, AuditSessionStatusEnum.IN_PROGRESS]:
        raise HTTPException(
            status_code=400,
            detail=f"Cannot sign off items in session with status: {session.status.value}"
        )
    # Validate requirement exists
    requirement = db.query(RequirementDB).filter(RequirementDB.id == requirement_id).first()
    if not requirement:
        raise HTTPException(status_code=404, detail=f"Requirement {requirement_id} not found")
    # Map string result to enum
    try:
        result_enum = AuditResultEnum(request.result)
    except ValueError:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid result: {request.result}. Valid values: compliant, compliant_notes, non_compliant, not_applicable, pending"
        )
    # Check if sign-off already exists
    signoff = (
        db.query(AuditSignOffDB)
        .filter(AuditSignOffDB.session_id == session_id)
        .filter(AuditSignOffDB.requirement_id == requirement_id)
        .first()
    )
    # Remember prior state BEFORE mutating, for counter bookkeeping below.
    was_new = signoff is None
    old_result = signoff.result if signoff else None
    if signoff:
        # Update existing sign-off
        signoff.result = result_enum
        signoff.notes = request.notes
        signoff.updated_at = datetime.utcnow()
    else:
        # Create new sign-off
        signoff = AuditSignOffDB(
            id=str(uuid4()),
            session_id=session_id,
            requirement_id=requirement_id,
            result=result_enum,
            notes=request.notes,
        )
        db.add(signoff)
    # Create digital signature if requested
    signature = None
    if request.sign:
        timestamp = datetime.utcnow().isoformat()
        # Signature binds result, requirement, auditor and time together.
        data = f"{result_enum.value}|{requirement_id}|{session.auditor_name}|{timestamp}"
        signature = hashlib.sha256(data.encode()).hexdigest()
        signoff.signature_hash = signature
        signoff.signed_at = datetime.utcnow()
        signoff.signed_by = session.auditor_name
    # Update session statistics
    if was_new:
        session.completed_items += 1
    # Update compliant/non-compliant counts: decrement the bucket the old
    # result was in (if any), then increment the new result's bucket.
    if old_result != result_enum:
        if old_result == AuditResultEnum.COMPLIANT or old_result == AuditResultEnum.COMPLIANT_WITH_NOTES:
            session.compliant_count = max(0, session.compliant_count - 1)
        elif old_result == AuditResultEnum.NON_COMPLIANT:
            session.non_compliant_count = max(0, session.non_compliant_count - 1)
        if result_enum == AuditResultEnum.COMPLIANT or result_enum == AuditResultEnum.COMPLIANT_WITH_NOTES:
            session.compliant_count += 1
        elif result_enum == AuditResultEnum.NON_COMPLIANT:
            session.non_compliant_count += 1
    # Auto-start session if this is the first sign-off
    if session.status == AuditSessionStatusEnum.DRAFT:
        session.status = AuditSessionStatusEnum.IN_PROGRESS
        session.started_at = datetime.utcnow()
    db.commit()
    db.refresh(signoff)
    return SignOffResponse(
        id=signoff.id,
        session_id=signoff.session_id,
        requirement_id=signoff.requirement_id,
        result=signoff.result.value,
        notes=signoff.notes,
        is_signed=signoff.signature_hash is not None,
        signature_hash=signoff.signature_hash,
        signed_at=signoff.signed_at,
        signed_by=signoff.signed_by,
        created_at=signoff.created_at,
        updated_at=signoff.updated_at,
    )
@router.get("/checklist/{session_id}/items/{requirement_id}", response_model=SignOffResponse)
async def get_sign_off(
    session_id: str,
    requirement_id: str,
    db: Session = Depends(get_db),
):
    """
    Get the current sign-off status for a specific requirement.

    A 404 is returned when no sign-off row exists yet for this
    (session, requirement) pair.
    """
    record = (
        db.query(AuditSignOffDB)
        .filter(
            AuditSignOffDB.session_id == session_id,
            AuditSignOffDB.requirement_id == requirement_id,
        )
        .first()
    )
    if record is None:
        raise HTTPException(
            status_code=404,
            detail=f"No sign-off found for requirement {requirement_id} in session {session_id}"
        )
    return SignOffResponse(
        id=record.id,
        session_id=record.session_id,
        requirement_id=record.requirement_id,
        result=record.result.value,
        notes=record.notes,
        is_signed=record.signature_hash is not None,
        signature_hash=record.signature_hash,
        signed_at=record.signed_at,
        signed_by=record.signed_by,
        created_at=record.created_at,
        updated_at=record.updated_at,
    )
# ============================================================================
# PDF Report Generation
# ============================================================================
@router.get("/sessions/{session_id}/report/pdf")
async def generate_audit_pdf_report(
    session_id: str,
    language: str = Query("de", regex="^(de|en)$"),
    include_signatures: bool = Query(True),
    db: Session = Depends(get_db),
):
    """
    Generate a PDF report for an audit session.

    Parameters:
    - session_id: The audit session ID
    - language: Output language ('de' or 'en'), default 'de'
    - include_signatures: Include digital signature verification section

    Returns:
    - PDF file as streaming response whose Content-Disposition header
      carries the filename produced by the generator.

    Raises:
    - HTTPException 404: session does not exist
    - HTTPException 500: PDF generation failed
    """
    from fastapi.responses import StreamingResponse
    import io
    from ..services.audit_pdf_generator import AuditPDFGenerator
    # Validate session exists before doing any expensive generation work.
    session = db.query(AuditSessionDB).filter(AuditSessionDB.id == session_id).first()
    if not session:
        raise HTTPException(
            status_code=404,
            detail=f"Audit session {session_id} not found"
        )
    try:
        generator = AuditPDFGenerator(db)
        pdf_bytes, filename = generator.generate(
            session_id=session_id,
            language=language,
            include_signatures=include_signatures,
        )
        # Bug fix: the header previously contained the literal placeholder
        # "filename=(unknown)" and discarded the generator's filename.
        return StreamingResponse(
            io.BytesIO(pdf_bytes),
            media_type="application/pdf",
            headers={
                "Content-Disposition": f'attachment; filename="{filename}"',
            }
        )
    except Exception as e:
        # logger.exception preserves the traceback for diagnosis.
        logger.exception("Failed to generate PDF report: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to generate PDF report: {str(e)}"
        )

View File

@@ -0,0 +1,384 @@
"""
FastAPI routes for Dashboard, Executive Dashboard, and Reports.
Endpoints:
- /dashboard: Main compliance dashboard
- /dashboard/executive: Executive summary for managers
- /dashboard/trend: Compliance score trend over time
- /score: Quick compliance score
- /reports: Report generation
"""
import logging
from datetime import datetime, timedelta
from calendar import month_abbr
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from ..db import (
RegulationRepository,
RequirementRepository,
ControlRepository,
EvidenceRepository,
RiskRepository,
)
from .schemas import (
DashboardResponse,
ExecutiveDashboardResponse,
TrendDataPoint,
RiskSummary,
DeadlineItem,
TeamWorkloadItem,
)
# Module-level logger; name follows the package path for log filtering.
logger = logging.getLogger(__name__)
# No prefix here: this module exposes top-level paths like /dashboard, /score.
router = APIRouter(tags=["compliance-dashboard"])
# ============================================================================
# Dashboard
# ============================================================================
@router.get("/dashboard", response_model=DashboardResponse)
async def get_dashboard(db: Session = Depends(get_db)):
    """
    Get compliance dashboard statistics.

    Aggregates regulation/requirement counts, control status by domain,
    evidence statistics and risk counts by level.  The compliance score is
    taken from ControlRepository.get_statistics()["compliance_score"].
    """
    reg_repo = RegulationRepository(db)
    req_repo = RequirementRepository(db)
    ctrl_repo = ControlRepository(db)
    evidence_repo = EvidenceRepository(db)
    risk_repo = RiskRepository(db)
    # Regulations
    regulations = reg_repo.get_active()
    requirements = req_repo.get_all()
    # Controls statistics
    ctrl_stats = ctrl_repo.get_statistics()
    controls = ctrl_repo.get_all()
    # Group controls by domain; each domain bucket tracks the four known
    # status names plus a running total.
    controls_by_domain = {}
    for ctrl in controls:
        domain = ctrl.domain.value if ctrl.domain else "unknown"
        if domain not in controls_by_domain:
            controls_by_domain[domain] = {"total": 0, "pass": 0, "partial": 0, "fail": 0, "planned": 0}
        controls_by_domain[domain]["total"] += 1
        status = ctrl.status.value if ctrl.status else "planned"
        # Statuses outside the four pre-seeded keys only increment "total".
        if status in controls_by_domain[domain]:
            controls_by_domain[domain][status] += 1
    # Evidence statistics
    evidence_stats = evidence_repo.get_statistics()
    # Risk statistics, bucketed by inherent risk level.
    risks = risk_repo.get_all()
    risks_by_level = {"low": 0, "medium": 0, "high": 0, "critical": 0}
    for risk in risks:
        level = risk.inherent_risk.value if risk.inherent_risk else "low"
        if level in risks_by_level:
            risks_by_level[level] += 1
    # Calculate compliance score - use pre-calculated score from get_statistics()
    # or compute from by_status dict
    score = ctrl_stats.get("compliance_score", 0.0)
    return DashboardResponse(
        compliance_score=round(score, 1),
        total_regulations=len(regulations),
        total_requirements=len(requirements),
        total_controls=ctrl_stats.get("total", 0),
        controls_by_status=ctrl_stats.get("by_status", {}),
        controls_by_domain=controls_by_domain,
        total_evidence=evidence_stats.get("total", 0),
        evidence_by_status=evidence_stats.get("by_status", {}),
        total_risks=len(risks),
        risks_by_level=risks_by_level,
        recent_activity=[],
    )
@router.get("/score")
async def get_compliance_score(db: Session = Depends(get_db)):
    """
    Get just the compliance score.

    Score formula: full credit for passing controls, half credit for
    partial ones, as a percentage of all controls (0 when there are none).
    """
    control_stats = ControlRepository(db).get_statistics()
    total = control_stats.get("total", 0)
    passing = control_stats.get("pass", 0)
    partial = control_stats.get("partial", 0)
    score = ((passing + partial * 0.5) / total) * 100 if total > 0 else 0
    return {
        "score": round(score, 1),
        "total_controls": total,
        "passing_controls": passing,
        "partial_controls": partial,
    }
# ============================================================================
# Executive Dashboard (Phase 3 - Sprint 1)
# ============================================================================
@router.get("/dashboard/executive", response_model=ExecutiveDashboardResponse)
async def get_executive_dashboard(db: Session = Depends(get_db)):
    """
    Get executive dashboard for managers and decision makers.
    Provides:
    - Traffic light status (green/yellow/red)
    - Overall compliance score with trend
    - Top 5 open risks
    - Upcoming deadlines (control reviews, evidence expiry)
    - Team workload distribution

    NOTE: the 12-month score trend is simulated from the current score,
    not read from stored history (see the trend loop below).
    """
    reg_repo = RegulationRepository(db)
    req_repo = RequirementRepository(db)
    ctrl_repo = ControlRepository(db)
    risk_repo = RiskRepository(db)
    # Calculate compliance score: full credit for passing, half for partial.
    ctrl_stats = ctrl_repo.get_statistics()
    total = ctrl_stats.get("total", 0)
    passing = ctrl_stats.get("pass", 0)
    partial = ctrl_stats.get("partial", 0)
    if total > 0:
        score = ((passing + partial * 0.5) / total) * 100
    else:
        score = 0
    # Determine traffic light status: >=80 green, >=60 yellow, else red.
    if score >= 80:
        traffic_light = "green"
    elif score >= 60:
        traffic_light = "yellow"
    else:
        traffic_light = "red"
    # Generate trend data (last 12 months - simulated for now)
    trend_data = []
    now = datetime.utcnow()
    for i in range(11, -1, -1):
        # Months approximated as 30-day steps; oldest point first.
        month_date = now - timedelta(days=i * 30)
        trend_score = max(0, min(100, score - (11 - i) * 2 + (5 if i > 6 else 0)))
        trend_data.append(TrendDataPoint(
            date=month_date.strftime("%Y-%m-%d"),
            score=round(trend_score, 1),
            label=month_abbr[month_date.month][:3],
        ))
    # Get top 5 risks (sorted by severity)
    # NOTE(review): comparing r.status to the plain string "mitigated"
    # assumes status is stored as a string, not an enum — confirm.
    risks = risk_repo.get_all()
    risk_priority = {"critical": 4, "high": 3, "medium": 2, "low": 1}
    sorted_risks = sorted(
        [r for r in risks if r.status != "mitigated"],
        key=lambda r: (
            # Primary: inherent risk level; secondary: impact x likelihood.
            risk_priority.get(r.inherent_risk.value if r.inherent_risk else "low", 1),
            r.impact * r.likelihood
        ),
        reverse=True
    )[:5]
    top_risks = [
        RiskSummary(
            id=r.id,
            risk_id=r.risk_id,
            title=r.title,
            risk_level=r.inherent_risk.value if r.inherent_risk else "medium",
            owner=r.owner,
            status=r.status,
            category=r.category,
            impact=r.impact,
            likelihood=r.likelihood,
        )
        for r in sorted_risks
    ]
    # Get upcoming deadlines: control reviews due within 30 days.
    controls = ctrl_repo.get_all()
    upcoming_deadlines = []
    today = datetime.utcnow().date()
    for ctrl in controls:
        if ctrl.next_review_at:
            # next_review_at may be a datetime or a date; normalize to date.
            review_date = ctrl.next_review_at.date() if hasattr(ctrl.next_review_at, 'date') else ctrl.next_review_at
            days_remaining = (review_date - today).days
            if days_remaining <= 30:
                # Classify: past due -> overdue, within a week -> at_risk.
                if days_remaining < 0:
                    status = "overdue"
                elif days_remaining <= 7:
                    status = "at_risk"
                else:
                    status = "on_track"
                upcoming_deadlines.append(DeadlineItem(
                    id=ctrl.id,
                    title=f"Review: {ctrl.control_id} - {ctrl.title[:30]}",
                    deadline=review_date.isoformat(),
                    days_remaining=days_remaining,
                    type="control_review",
                    status=status,
                    owner=ctrl.owner,
                ))
    # Most urgent first, capped at the ten nearest deadlines.
    upcoming_deadlines.sort(key=lambda x: x.days_remaining)
    upcoming_deadlines = upcoming_deadlines[:10]
    # Calculate team workload (by owner): pass -> completed,
    # partial -> in progress, everything else -> pending.
    owner_workload = {}
    for ctrl in controls:
        owner = ctrl.owner or "Unassigned"
        if owner not in owner_workload:
            owner_workload[owner] = {"pending": 0, "in_progress": 0, "completed": 0}
        status = ctrl.status.value if ctrl.status else "planned"
        if status in ["pass"]:
            owner_workload[owner]["completed"] += 1
        elif status in ["partial"]:
            owner_workload[owner]["in_progress"] += 1
        else:
            owner_workload[owner]["pending"] += 1
    team_workload = []
    for name, stats in owner_workload.items():
        total_tasks = stats["pending"] + stats["in_progress"] + stats["completed"]
        completion_rate = (stats["completed"] / total_tasks * 100) if total_tasks > 0 else 0
        team_workload.append(TeamWorkloadItem(
            name=name,
            pending_tasks=stats["pending"],
            in_progress_tasks=stats["in_progress"],
            completed_tasks=stats["completed"],
            total_tasks=total_tasks,
            completion_rate=round(completion_rate, 1),
        ))
    # Busiest owners first.
    team_workload.sort(key=lambda x: x.total_tasks, reverse=True)
    # Get counts
    regulations = reg_repo.get_active()
    requirements = req_repo.get_all()
    open_risks = len([r for r in risks if r.status != "mitigated"])
    return ExecutiveDashboardResponse(
        traffic_light_status=traffic_light,
        overall_score=round(score, 1),
        score_trend=trend_data,
        # Previous month's (simulated) score and the delta against it.
        previous_score=trend_data[-2].score if len(trend_data) >= 2 else None,
        score_change=round(score - trend_data[-2].score, 1) if len(trend_data) >= 2 else None,
        total_regulations=len(regulations),
        total_requirements=len(requirements),
        total_controls=total,
        open_risks=open_risks,
        top_risks=top_risks,
        upcoming_deadlines=upcoming_deadlines,
        team_workload=team_workload,
        last_updated=datetime.utcnow().isoformat(),
    )
@router.get("/dashboard/trend")
async def get_compliance_trend(
    months: int = Query(12, ge=1, le=24, description="Number of months to include"),
    db: Session = Depends(get_db),
):
    """
    Get compliance score trend over time.

    Returns monthly compliance scores for trend visualization.  History is
    simulated backwards from the current score (months approximated as
    30-day steps), oldest point first.
    """
    control_stats = ControlRepository(db).get_statistics()
    total_controls = control_stats.get("total", 0)
    passing = control_stats.get("pass", 0)
    partial = control_stats.get("partial", 0)
    if total_controls > 0:
        current_score = ((passing + partial * 0.5) / total_controls) * 100
    else:
        current_score = 0
    # Walk from the oldest month (months-1 steps back) to the current one.
    now = datetime.utcnow()
    trend_data = []
    for months_back in range(months - 1, -1, -1):
        point_date = now - timedelta(days=months_back * 30)
        # Deterministic pseudo-variation in [-2, 2] plus a gentle downward
        # slope toward older months.
        jitter = ((months_back * 7) % 5) - 2
        point_score = max(0, min(100, current_score - (months - 1 - months_back) * 1.5 + jitter))
        trend_data.append({
            "date": point_date.strftime("%Y-%m-%d"),
            "score": round(point_score, 1),
            "label": f"{month_abbr[point_date.month]} {point_date.year % 100}",
            "month": point_date.month,
            "year": point_date.year,
        })
    return {
        "current_score": round(current_score, 1),
        "trend": trend_data,
        "period_months": months,
        "generated_at": datetime.utcnow().isoformat(),
    }
# ============================================================================
# Reports
# ============================================================================
@router.get("/reports/summary")
async def get_summary_report(db: Session = Depends(get_db)):
    """Get a quick summary report for the dashboard."""
    # Local import keeps the (heavy) report generator off the module path.
    from ..services.report_generator import ComplianceReportGenerator
    return ComplianceReportGenerator(db).generate_summary_report()
@router.get("/reports/{period}")
async def generate_period_report(
    period: str = "monthly",
    as_of_date: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """
    Generate a compliance report for the specified period.

    Args:
        period: One of 'weekly', 'monthly', 'quarterly', 'yearly'
        as_of_date: Report date (YYYY-MM-DD format, defaults to today)

    Returns:
        Complete compliance report
    """
    from ..services.report_generator import ComplianceReportGenerator, ReportPeriod
    # Validate the period against the ReportPeriod enum.
    try:
        report_period = ReportPeriod(period)
    except ValueError:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid period '{period}'. Must be one of: weekly, monthly, quarterly, yearly"
        )
    # Parse the optional as-of date; empty/missing means "today".
    report_date = None
    if as_of_date:
        try:
            report_date = datetime.strptime(as_of_date, "%Y-%m-%d").date()
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail="Invalid date format. Use YYYY-MM-DD"
            )
    return ComplianceReportGenerator(db).generate_report(report_period, report_date)

View File

@@ -0,0 +1,530 @@
"""
FastAPI routes for Evidence management.
Endpoints:
- /evidence: Evidence listing and creation
- /evidence/upload: Evidence file upload
- /evidence/collect: CI/CD evidence collection
- /evidence/ci-status: CI/CD evidence status
"""
import logging
import os
from datetime import datetime, timedelta
from typing import Optional
from collections import defaultdict
import uuid as uuid_module
import hashlib
import json
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Query
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from ..db import (
ControlRepository,
EvidenceRepository,
EvidenceStatusEnum,
)
from ..db.models import EvidenceDB, ControlDB
from ..services.auto_risk_updater import AutoRiskUpdater
from .schemas import (
EvidenceCreate, EvidenceResponse, EvidenceListResponse,
)
# Module-level logger; name follows the package path for log filtering.
logger = logging.getLogger(__name__)
# No prefix here: this module exposes /evidence and its sub-paths directly.
router = APIRouter(tags=["compliance-evidence"])
# ============================================================================
# Evidence
# ============================================================================
@router.get("/evidence", response_model=EvidenceListResponse)
async def list_evidence(
    control_id: Optional[str] = None,
    evidence_type: Optional[str] = None,
    status: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """
    List evidence with optional filters.

    ``control_id`` is the human-readable control identifier (resolved to the
    control's UUID before querying).  ``evidence_type`` and ``status`` are
    applied in memory after the DB fetch; an unknown ``status`` value is
    silently ignored rather than rejected (see the except below).

    Raises:
        HTTPException 404: if ``control_id`` does not match any control.
    """
    repo = EvidenceRepository(db)
    if control_id:
        # First get the control UUID
        ctrl_repo = ControlRepository(db)
        control = ctrl_repo.get_by_control_id(control_id)
        if not control:
            raise HTTPException(status_code=404, detail=f"Control {control_id} not found")
        evidence = repo.get_by_control(control.id)
    else:
        evidence = repo.get_all()
    if evidence_type:
        evidence = [e for e in evidence if e.evidence_type == evidence_type]
    if status:
        try:
            status_enum = EvidenceStatusEnum(status)
            evidence = [e for e in evidence if e.status == status_enum]
        except ValueError:
            # Invalid status values leave the list unfiltered (best-effort).
            pass
    results = [
        EvidenceResponse(
            id=e.id,
            control_id=e.control_id,
            evidence_type=e.evidence_type,
            title=e.title,
            description=e.description,
            artifact_path=e.artifact_path,
            artifact_url=e.artifact_url,
            artifact_hash=e.artifact_hash,
            file_size_bytes=e.file_size_bytes,
            mime_type=e.mime_type,
            valid_from=e.valid_from,
            valid_until=e.valid_until,
            status=e.status.value if e.status else None,
            source=e.source,
            ci_job_id=e.ci_job_id,
            uploaded_by=e.uploaded_by,
            collected_at=e.collected_at,
            created_at=e.created_at,
        )
        for e in evidence
    ]
    return EvidenceListResponse(evidence=results, total=len(results))
@router.post("/evidence", response_model=EvidenceResponse)
async def create_evidence(
    evidence_data: EvidenceCreate,
    db: Session = Depends(get_db),
):
    """
    Create new evidence record.

    The request carries the human-readable control id; it is resolved to the
    control's UUID before the evidence row is created.  ``source`` defaults
    to "api" when the caller does not set it.

    Raises:
        HTTPException 404: if the referenced control does not exist.
    """
    repo = EvidenceRepository(db)
    # Get control UUID
    ctrl_repo = ControlRepository(db)
    control = ctrl_repo.get_by_control_id(evidence_data.control_id)
    if not control:
        raise HTTPException(status_code=404, detail=f"Control {evidence_data.control_id} not found")
    evidence = repo.create(
        control_id=control.id,
        evidence_type=evidence_data.evidence_type,
        title=evidence_data.title,
        description=evidence_data.description,
        artifact_url=evidence_data.artifact_url,
        valid_from=evidence_data.valid_from,
        valid_until=evidence_data.valid_until,
        source=evidence_data.source or "api",
        ci_job_id=evidence_data.ci_job_id,
    )
    db.commit()
    # Echo the persisted record back as the API response model.
    return EvidenceResponse(
        id=evidence.id,
        control_id=evidence.control_id,
        evidence_type=evidence.evidence_type,
        title=evidence.title,
        description=evidence.description,
        artifact_path=evidence.artifact_path,
        artifact_url=evidence.artifact_url,
        artifact_hash=evidence.artifact_hash,
        file_size_bytes=evidence.file_size_bytes,
        mime_type=evidence.mime_type,
        valid_from=evidence.valid_from,
        valid_until=evidence.valid_until,
        status=evidence.status.value if evidence.status else None,
        source=evidence.source,
        ci_job_id=evidence.ci_job_id,
        uploaded_by=evidence.uploaded_by,
        collected_at=evidence.collected_at,
        created_at=evidence.created_at,
    )
@router.post("/evidence/upload")
async def upload_evidence(
    control_id: str = Query(...),
    evidence_type: str = Query(...),
    title: str = Query(...),
    file: UploadFile = File(...),
    description: Optional[str] = Query(None),
    db: Session = Depends(get_db),
):
    """
    Upload an evidence file and attach it to a control.

    The file is stored under /tmp/compliance_evidence/<control_id>/ and a
    SHA-256 hash of its content is recorded for integrity checks.

    Raises:
        HTTPException 404: if the control does not exist.
        HTTPException 400: if the upload carries no usable filename.
    """
    # Resolve the human-readable control_id to the control's UUID; this also
    # guarantees control_id is a known value before it is used in the path.
    ctrl_repo = ControlRepository(db)
    control = ctrl_repo.get_by_control_id(control_id)
    if not control:
        raise HTTPException(status_code=404, detail=f"Control {control_id} not found")
    # Create upload directory
    upload_dir = f"/tmp/compliance_evidence/{control_id}"
    os.makedirs(upload_dir, exist_ok=True)
    # Security fix: the client-supplied filename may contain path separators
    # (e.g. "../../etc/passwd"); keep only the basename to prevent path
    # traversal outside upload_dir.
    safe_name = os.path.basename(file.filename or "")
    if not safe_name or safe_name in (".", ".."):
        raise HTTPException(status_code=400, detail="Invalid upload filename")
    file_path = os.path.join(upload_dir, safe_name)
    content = await file.read()
    with open(file_path, "wb") as f:
        f.write(content)
    # SHA-256 of the raw content, stored as tamper-evidence for the artifact.
    file_hash = hashlib.sha256(content).hexdigest()
    # Create evidence record
    repo = EvidenceRepository(db)
    evidence = repo.create(
        control_id=control.id,
        evidence_type=evidence_type,
        title=title,
        description=description,
        artifact_path=file_path,
        artifact_hash=file_hash,
        file_size_bytes=len(content),
        mime_type=file.content_type,
        source="upload",
    )
    db.commit()
    return EvidenceResponse(
        id=evidence.id,
        control_id=evidence.control_id,
        evidence_type=evidence.evidence_type,
        title=evidence.title,
        description=evidence.description,
        artifact_path=evidence.artifact_path,
        artifact_url=evidence.artifact_url,
        artifact_hash=evidence.artifact_hash,
        file_size_bytes=evidence.file_size_bytes,
        mime_type=evidence.mime_type,
        valid_from=evidence.valid_from,
        valid_until=evidence.valid_until,
        status=evidence.status.value if evidence.status else None,
        source=evidence.source,
        ci_job_id=evidence.ci_job_id,
        uploaded_by=evidence.uploaded_by,
        collected_at=evidence.collected_at,
        created_at=evidence.created_at,
    )
# ============================================================================
# CI/CD Evidence Collection
# ============================================================================
@router.post("/evidence/collect")
async def collect_ci_evidence(
    source: str = Query(..., description="Evidence source: sast, dependency_scan, sbom, container_scan, test_results"),
    ci_job_id: str = Query(None, description="CI/CD Job ID for traceability"),
    ci_job_url: str = Query(None, description="URL to CI/CD job"),
    report_data: Optional[dict] = None,
    db: Session = Depends(get_db),
):
    """
    Collect evidence from CI/CD pipeline.
    This endpoint is designed to be called from CI/CD workflows (GitHub Actions,
    GitLab CI, Jenkins, etc.) to automatically collect compliance evidence.
    Supported sources:
    - sast: Static Application Security Testing (Semgrep, SonarQube, etc.)
    - dependency_scan: Dependency vulnerability scanning (Trivy, Grype, Snyk)
    - sbom: Software Bill of Materials (CycloneDX, SPDX)
    - container_scan: Container image scanning (Trivy, Grype)
    - test_results: Test coverage and results
    - secret_scan: Secret detection (Gitleaks, TruffleHog)
    - code_review: Code review metrics

    Flow:
    1. Map the tool ``source`` onto a single predefined control.
    2. Heuristically parse ``report_data`` (request body) to count findings;
       any CRITICAL/HIGH finding marks the evidence "failed".
    3. Persist the raw report to /tmp and an EvidenceDB row valid for 90 days.
    4. Best-effort: update control status / linked risks via AutoRiskUpdater
       (failures are logged, never fail the request).

    Returns a summary dict including evidence_id, finding counts and the
    auto-risk-update outcome.
    """
    # Map source to control_id
    # Each CI tool feeds evidence for exactly one control in the catalog.
    SOURCE_CONTROL_MAP = {
        "sast": "SDLC-001",
        "dependency_scan": "SDLC-002",
        "secret_scan": "SDLC-003",
        "code_review": "SDLC-004",
        "sbom": "SDLC-005",
        "container_scan": "SDLC-006",
        "test_results": "AUD-001",
    }
    if source not in SOURCE_CONTROL_MAP:
        raise HTTPException(
            status_code=400,
            detail=f"Unknown source '{source}'. Supported: {list(SOURCE_CONTROL_MAP.keys())}"
        )
    control_id = SOURCE_CONTROL_MAP[source]
    # Get control
    ctrl_repo = ControlRepository(db)
    control = ctrl_repo.get_by_control_id(control_id)
    if not control:
        raise HTTPException(
            status_code=404,
            detail=f"Control {control_id} not found. Please seed the database first."
        )
    # Parse and validate report data
    # Hash the serialized report so identical reports get identical digests.
    report_json = json.dumps(report_data) if report_data else "{}"
    report_hash = hashlib.sha256(report_json.encode()).hexdigest()
    # Determine evidence status based on report content
    evidence_status = "valid"
    findings_count = 0
    critical_findings = 0
    if report_data:
        # Try to extract findings from common report formats.
        # Format detection is by key presence, checked in priority order:
        # Semgrep ("results") -> Trivy ("Results") -> generic ("findings")
        # -> SBOM ("components", where the count is informational only).
        if isinstance(report_data, dict):
            # Semgrep format
            if "results" in report_data:
                findings_count = len(report_data.get("results", []))
                critical_findings = len([
                    r for r in report_data.get("results", [])
                    if r.get("extra", {}).get("severity", "").upper() in ["CRITICAL", "HIGH"]
                ])
            # Trivy format
            elif "Results" in report_data:
                for result in report_data.get("Results", []):
                    vulns = result.get("Vulnerabilities", [])
                    findings_count += len(vulns)
                    critical_findings += len([
                        v for v in vulns
                        if v.get("Severity", "").upper() in ["CRITICAL", "HIGH"]
                    ])
            # Generic findings array
            elif "findings" in report_data:
                findings_count = len(report_data.get("findings", []))
            # SBOM format - just count components
            elif "components" in report_data:
                findings_count = len(report_data.get("components", []))
    # If critical findings exist, mark as failed
    if critical_findings > 0:
        evidence_status = "failed"
    # Create evidence title
    title = f"{source.upper()} Report - {datetime.now().strftime('%Y-%m-%d %H:%M')}"
    description = f"Automatically collected from CI/CD pipeline"
    if findings_count > 0:
        description += f"\n- Total findings: {findings_count}"
    if critical_findings > 0:
        description += f"\n- Critical/High findings: {critical_findings}"
    if ci_job_id:
        description += f"\n- CI Job ID: {ci_job_id}"
    if ci_job_url:
        description += f"\n- CI Job URL: {ci_job_url}"
    # Store report file
    # Filename embeds a timestamp plus a hash prefix so repeated collections
    # never collide on disk.
    upload_dir = f"/tmp/compliance_evidence/ci/{source}"
    os.makedirs(upload_dir, exist_ok=True)
    file_name = f"{source}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{report_hash[:8]}.json"
    file_path = os.path.join(upload_dir, file_name)
    with open(file_path, "w") as f:
        json.dump(report_data or {}, f, indent=2)
    # Create evidence record directly
    # Evidence is considered valid for a 90-day window from collection.
    evidence = EvidenceDB(
        id=str(uuid_module.uuid4()),
        control_id=control.id,
        evidence_type=f"ci_{source}",
        title=title,
        description=description,
        artifact_path=file_path,
        artifact_hash=report_hash,
        file_size_bytes=len(report_json),
        mime_type="application/json",
        source="ci_pipeline",
        ci_job_id=ci_job_id,
        valid_from=datetime.utcnow(),
        valid_until=datetime.utcnow() + timedelta(days=90),
        status=EvidenceStatusEnum(evidence_status),
    )
    db.add(evidence)
    db.commit()
    db.refresh(evidence)
    # =========================================================================
    # AUTOMATIC RISK UPDATE
    # Update Control status and linked Risks based on findings
    # Best-effort: any exception here is logged and the evidence collection
    # still succeeds (auto_risk_update is reported as skipped).
    # =========================================================================
    risk_update_result = None
    try:
        # Extract detailed findings for risk assessment
        # (re-parses the report, this time bucketing by individual severity).
        findings_detail = {
            "critical": 0,
            "high": 0,
            "medium": 0,
            "low": 0,
        }
        if report_data:
            # Semgrep format
            if "results" in report_data:
                for r in report_data.get("results", []):
                    severity = r.get("extra", {}).get("severity", "").upper()
                    if severity == "CRITICAL":
                        findings_detail["critical"] += 1
                    elif severity == "HIGH":
                        findings_detail["high"] += 1
                    elif severity == "MEDIUM":
                        findings_detail["medium"] += 1
                    elif severity in ["LOW", "INFO"]:
                        findings_detail["low"] += 1
            # Trivy format
            elif "Results" in report_data:
                for result in report_data.get("Results", []):
                    for v in result.get("Vulnerabilities", []):
                        severity = v.get("Severity", "").upper()
                        if severity == "CRITICAL":
                            findings_detail["critical"] += 1
                        elif severity == "HIGH":
                            findings_detail["high"] += 1
                        elif severity == "MEDIUM":
                            findings_detail["medium"] += 1
                        elif severity == "LOW":
                            findings_detail["low"] += 1
            # Generic findings with severity
            # NOTE: unlike the buckets above, anything that is not
            # CRITICAL/HIGH/MEDIUM (including unknown strings) counts as low.
            elif "findings" in report_data:
                for f in report_data.get("findings", []):
                    severity = f.get("severity", "").upper()
                    if severity == "CRITICAL":
                        findings_detail["critical"] += 1
                    elif severity == "HIGH":
                        findings_detail["high"] += 1
                    elif severity == "MEDIUM":
                        findings_detail["medium"] += 1
                    else:
                        findings_detail["low"] += 1
        # Use AutoRiskUpdater to update Control status and Risks
        auto_updater = AutoRiskUpdater(db)
        risk_update_result = auto_updater.process_evidence_collect_request(
            tool=source,
            control_id=control_id,
            evidence_type=f"ci_{source}",
            timestamp=datetime.utcnow().isoformat(),
            commit_sha=report_data.get("commit_sha", "unknown") if report_data else "unknown",
            ci_job_id=ci_job_id,
            findings=findings_detail,
        )
        logger.info(f"Auto-risk update completed for {control_id}: "
                   f"control_updated={risk_update_result.control_updated}, "
                   f"risks_affected={len(risk_update_result.risks_affected)}")
    except Exception as e:
        logger.error(f"Auto-risk update failed for {control_id}: {str(e)}")
    return {
        "success": True,
        "evidence_id": evidence.id,
        "control_id": control_id,
        "source": source,
        "status": evidence_status,
        "findings_count": findings_count,
        "critical_findings": critical_findings,
        "artifact_path": file_path,
        "message": f"Evidence collected successfully for control {control_id}",
        # Outcome of the best-effort risk update; "enabled": False means the
        # updater raised and was skipped.
        "auto_risk_update": {
            "enabled": True,
            "control_updated": risk_update_result.control_updated if risk_update_result else False,
            "old_status": risk_update_result.old_status if risk_update_result else None,
            "new_status": risk_update_result.new_status if risk_update_result else None,
            "risks_affected": risk_update_result.risks_affected if risk_update_result else [],
            "alerts_generated": risk_update_result.alerts_generated if risk_update_result else [],
        } if risk_update_result else {"enabled": False, "error": "Auto-update skipped"},
    }
@router.get("/evidence/ci-status")
async def get_ci_evidence_status(
    control_id: str = Query(None, description="Filter by control ID"),
    days: int = Query(30, description="Look back N days"),
    db: Session = Depends(get_db),
):
    """
    Get CI/CD evidence collection status.

    Returns an overview of recent evidence collected from CI/CD pipelines
    (newest 100 rows within the look-back window), grouped per control —
    useful for dashboards and monitoring.
    """
    cutoff_date = datetime.utcnow() - timedelta(days=days)
    # Build query
    query = db.query(EvidenceDB).filter(
        EvidenceDB.source == "ci_pipeline",
        EvidenceDB.collected_at >= cutoff_date,
    )
    if control_id:
        ctrl_repo = ControlRepository(db)
        control = ctrl_repo.get_by_control_id(control_id)
        if control:
            query = query.filter(EvidenceDB.control_id == control.id)
    evidence_list = query.order_by(EvidenceDB.collected_at.desc()).limit(100).all()
    # PERF FIX: resolve all referenced control UUIDs to their human-readable
    # codes in ONE query (previously one ControlDB query per evidence row).
    control_uuids = {e.control_id for e in evidence_list if e.control_id}
    uuid_to_code = {}
    if control_uuids:
        rows = db.query(ControlDB).filter(ControlDB.id.in_(control_uuids)).all()
        uuid_to_code = {c.id: c.control_id for c in rows}
    # Group by control and calculate stats
    control_stats = defaultdict(lambda: {
        "total": 0,
        "valid": 0,
        "failed": 0,
        "last_collected": None,
        "evidence": [],
    })
    for e in evidence_list:
        ctrl_id = uuid_to_code.get(e.control_id, "unknown")
        stats = control_stats[ctrl_id]
        stats["total"] += 1
        if e.status:
            if e.status.value == "valid":
                stats["valid"] += 1
            elif e.status.value == "failed":
                stats["failed"] += 1
        # Guard against NULL collected_at (previously a None > datetime
        # comparison could raise TypeError).
        if e.collected_at and (
            stats["last_collected"] is None or e.collected_at > stats["last_collected"]
        ):
            stats["last_collected"] = e.collected_at
        # Add evidence summary
        stats["evidence"].append({
            "id": e.id,
            "type": e.evidence_type,
            "status": e.status.value if e.status else None,
            "collected_at": e.collected_at.isoformat() if e.collected_at else None,
            "ci_job_id": e.ci_job_id,
        })
    # Convert to list and sort (most recently collected control first)
    result = []
    for ctrl_id, stats in control_stats.items():
        result.append({
            "control_id": ctrl_id,
            "total_evidence": stats["total"],
            "valid_count": stats["valid"],
            "failed_count": stats["failed"],
            "last_collected": stats["last_collected"].isoformat() if stats["last_collected"] else None,
            "recent_evidence": stats["evidence"][:5],
        })
    result.sort(key=lambda x: x["last_collected"] or "", reverse=True)
    return {
        "period_days": days,
        "total_evidence": len(evidence_list),
        "controls": result,
    }

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,250 @@
"""
FastAPI routes for Service Module Registry.
Endpoints:
- /modules: Module listing and management
- /modules/overview: Module compliance overview
- /modules/{module_id}: Module details
- /modules/seed: Seed modules from data
- /modules/{module_id}/regulations: Add regulation mapping
"""
import logging
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from ..db import RegulationRepository
from .schemas import (
ServiceModuleResponse, ServiceModuleListResponse, ServiceModuleDetailResponse,
ModuleRegulationMappingCreate, ModuleRegulationMappingResponse,
ModuleSeedRequest, ModuleSeedResponse, ModuleComplianceOverview,
)
logger = logging.getLogger(__name__)
router = APIRouter(tags=["compliance-modules"])
# ============================================================================
# Service Module Registry
# ============================================================================
@router.get("/modules", response_model=ServiceModuleListResponse)
async def list_modules(
    service_type: Optional[str] = None,
    criticality: Optional[str] = None,
    processes_pii: Optional[bool] = None,
    ai_components: Optional[bool] = None,
    db: Session = Depends(get_db),
):
    """List all service modules with optional filters."""
    from ..db.repository import ServiceModuleRepository

    repo = ServiceModuleRepository(db)
    modules = repo.get_all(
        service_type=service_type,
        criticality=criticality,
        processes_pii=processes_pii,
        ai_components=ai_components,
    )

    def _serialize(mod):
        """Map one ServiceModuleDB row onto the API response schema."""
        payload = {
            field: getattr(mod, field)
            for field in (
                "id", "name", "display_name", "description", "port",
                "repository_path", "docker_image", "processes_pii",
                "processes_health_data", "ai_components", "criticality",
                "owner_team", "owner_contact", "is_active",
                "compliance_score", "last_compliance_check",
                "created_at", "updated_at",
            )
        }
        payload["service_type"] = mod.service_type.value if mod.service_type else None
        payload["technology_stack"] = mod.technology_stack or []
        payload["data_categories"] = mod.data_categories or []
        # Relationship counts let the list view show coverage at a glance.
        payload["regulation_count"] = len(mod.regulation_mappings or [])
        payload["risk_count"] = len(mod.module_risks or [])
        return ServiceModuleResponse(**payload)

    results = [_serialize(m) for m in modules]
    return ServiceModuleListResponse(modules=results, total=len(results))
@router.get("/modules/overview", response_model=ModuleComplianceOverview)
async def get_modules_overview(db: Session = Depends(get_db)):
    """Return aggregate compliance statistics across all modules."""
    from ..db.repository import ServiceModuleRepository

    stats = ServiceModuleRepository(db).get_overview()
    return ModuleComplianceOverview(**stats)
@router.get("/modules/{module_id}", response_model=ServiceModuleDetailResponse)
async def get_module(module_id: str, db: Session = Depends(get_db)):
    """Get a specific module with its regulations and risks."""
    from ..db.repository import ServiceModuleRepository

    repo = ServiceModuleRepository(db)
    module = repo.get_with_regulations(module_id)
    if not module:
        # The path segment may be a module *name* rather than a UUID.
        by_name = repo.get_by_name(module_id)
        if by_name:
            module = repo.get_with_regulations(by_name.id)
    if not module:
        raise HTTPException(status_code=404, detail=f"Module {module_id} not found")

    # Flatten the mapped regulations (skipping dangling mappings).
    regulations = [
        {
            "code": mapping.regulation.code,
            "name": mapping.regulation.name,
            "relevance_level": mapping.relevance_level.value if mapping.relevance_level else "medium",
            "notes": mapping.notes,
        }
        for mapping in (module.regulation_mappings or [])
        if mapping.regulation
    ]
    # Flatten the linked risks (skipping dangling links).
    risks = [
        {
            "risk_id": mr.risk.risk_id,
            "title": mr.risk.title,
            "inherent_risk": mr.risk.inherent_risk.value if mr.risk.inherent_risk else None,
            "module_risk_level": mr.module_risk_level.value if mr.module_risk_level else None,
        }
        for mr in (module.module_risks or [])
        if mr.risk
    ]

    payload = {
        field: getattr(module, field)
        for field in (
            "id", "name", "display_name", "description", "port",
            "repository_path", "docker_image", "processes_pii",
            "processes_health_data", "ai_components", "criticality",
            "owner_team", "owner_contact", "is_active",
            "compliance_score", "last_compliance_check",
            "created_at", "updated_at",
        )
    }
    payload["service_type"] = module.service_type.value if module.service_type else None
    payload["technology_stack"] = module.technology_stack or []
    payload["data_categories"] = module.data_categories or []
    return ServiceModuleDetailResponse(
        regulation_count=len(regulations),
        risk_count=len(risks),
        regulations=regulations,
        risks=risks,
        **payload,
    )
@router.post("/modules/seed", response_model=ModuleSeedResponse)
async def seed_modules(
    request: ModuleSeedRequest,
    db: Session = Depends(get_db),
):
    """Seed service modules from predefined data."""
    from classroom_engine.database import engine
    from ..db.models import ServiceModuleDB, ModuleRegulationMappingDB, ModuleRiskDB
    from ..db.repository import ServiceModuleRepository
    from ..data.service_modules import BREAKPILOT_SERVICES

    try:
        # Tables may not exist yet on a fresh database; checkfirst makes
        # this idempotent.
        for model in (ServiceModuleDB, ModuleRegulationMappingDB, ModuleRiskDB):
            model.__table__.create(engine, checkfirst=True)
        seed_result = ServiceModuleRepository(db).seed_from_data(
            BREAKPILOT_SERVICES, force=request.force
        )
        created = seed_result["modules_created"]
        mapped = seed_result["mappings_created"]
        return ModuleSeedResponse(
            success=True,
            message=f"Seeded {created} modules with {mapped} regulation mappings",
            modules_created=created,
            mappings_created=mapped,
        )
    except Exception as e:
        logger.error(f"Module seeding failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/modules/{module_id}/regulations", response_model=ModuleRegulationMappingResponse)
async def add_module_regulation(
    module_id: str,
    mapping: ModuleRegulationMappingCreate,
    db: Session = Depends(get_db),
):
    """Add a regulation mapping to a module."""
    from ..db.repository import ServiceModuleRepository

    repo = ServiceModuleRepository(db)
    # Accept either the module UUID or its unique name.
    module = repo.get_by_id(module_id) or repo.get_by_name(module_id)
    if not module:
        raise HTTPException(status_code=404, detail=f"Module {module_id} not found")
    # Accept either the regulation UUID or its short code.
    reg_repo = RegulationRepository(db)
    regulation = (
        reg_repo.get_by_id(mapping.regulation_id)
        or reg_repo.get_by_code(mapping.regulation_id)
    )
    if not regulation:
        raise HTTPException(status_code=404, detail=f"Regulation {mapping.regulation_id} not found")
    try:
        created = repo.add_regulation_mapping(
            module_id=module.id,
            regulation_id=regulation.id,
            relevance_level=mapping.relevance_level,
            notes=mapping.notes,
            applicable_articles=mapping.applicable_articles,
        )
        return ModuleRegulationMappingResponse(
            id=created.id,
            module_id=created.module_id,
            regulation_id=created.regulation_id,
            relevance_level=created.relevance_level.value if created.relevance_level else "medium",
            notes=created.notes,
            applicable_articles=created.applicable_articles,
            module_name=module.name,
            regulation_code=regulation.code,
            regulation_name=regulation.name,
            created_at=created.created_at,
        )
    except Exception as e:
        logger.error(f"Failed to add regulation mapping: {e}")
        raise HTTPException(status_code=500, detail=str(e))

View File

@@ -0,0 +1,200 @@
"""
FastAPI routes for Risk management.
Endpoints:
- /risks: Risk listing and CRUD
- /risks/matrix: Risk matrix visualization
"""
import logging
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from ..db import RiskRepository, RiskLevelEnum
from .schemas import (
RiskCreate, RiskUpdate, RiskResponse, RiskListResponse, RiskMatrixResponse,
)
logger = logging.getLogger(__name__)
router = APIRouter(tags=["compliance-risks"])
# ============================================================================
# Risks
# ============================================================================
@router.get("/risks", response_model=RiskListResponse)
async def list_risks(
    category: Optional[str] = None,
    status: Optional[str] = None,
    risk_level: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """List risks with optional filters."""
    risks = RiskRepository(db).get_all()
    if category:
        risks = [r for r in risks if r.category == category]
    if status:
        risks = [r for r in risks if r.status == status]
    if risk_level:
        try:
            wanted = RiskLevelEnum(risk_level)
        except ValueError:
            # Unknown level strings are silently ignored (no filter applied).
            wanted = None
        if wanted is not None:
            risks = [r for r in risks if r.inherent_risk == wanted]

    def _serialize(risk):
        """Map one RiskDB row onto the API response schema."""
        payload = {
            field: getattr(risk, field)
            for field in (
                "id", "risk_id", "title", "description", "category",
                "likelihood", "impact", "mitigating_controls",
                "residual_likelihood", "residual_impact", "owner", "status",
                "treatment_plan", "identified_date", "review_date",
                "last_assessed_at", "created_at", "updated_at",
            )
        }
        payload["inherent_risk"] = risk.inherent_risk.value if risk.inherent_risk else None
        payload["residual_risk"] = risk.residual_risk.value if risk.residual_risk else None
        return RiskResponse(**payload)

    results = [_serialize(r) for r in risks]
    return RiskListResponse(risks=results, total=len(results))
@router.post("/risks", response_model=RiskResponse)
async def create_risk(
    risk_data: RiskCreate,
    db: Session = Depends(get_db),
):
    """Create a new risk."""
    repo = RiskRepository(db)
    new_risk = repo.create(
        risk_id=risk_data.risk_id,
        title=risk_data.title,
        description=risk_data.description,
        category=risk_data.category,
        likelihood=risk_data.likelihood,
        impact=risk_data.impact,
        mitigating_controls=risk_data.mitigating_controls,
        owner=risk_data.owner,
        treatment_plan=risk_data.treatment_plan,
    )
    db.commit()
    # Serialize the persisted row (inherent/residual risk are enums that the
    # repository derives from likelihood x impact).
    payload = {
        field: getattr(new_risk, field)
        for field in (
            "id", "risk_id", "title", "description", "category",
            "likelihood", "impact", "mitigating_controls",
            "residual_likelihood", "residual_impact", "owner", "status",
            "treatment_plan", "identified_date", "review_date",
            "last_assessed_at", "created_at", "updated_at",
        )
    }
    payload["inherent_risk"] = new_risk.inherent_risk.value if new_risk.inherent_risk else None
    payload["residual_risk"] = new_risk.residual_risk.value if new_risk.residual_risk else None
    return RiskResponse(**payload)
@router.put("/risks/{risk_id}", response_model=RiskResponse)
async def update_risk(
    risk_id: str,
    update: RiskUpdate,
    db: Session = Depends(get_db),
):
    """Update a risk."""
    repo = RiskRepository(db)
    existing = repo.get_by_risk_id(risk_id)
    if existing is None:
        raise HTTPException(status_code=404, detail=f"Risk {risk_id} not found")
    # Only fields the client actually sent are applied.
    changes = update.model_dump(exclude_unset=True)
    updated = repo.update(existing.id, **changes)
    db.commit()
    payload = {
        field: getattr(updated, field)
        for field in (
            "id", "risk_id", "title", "description", "category",
            "likelihood", "impact", "mitigating_controls",
            "residual_likelihood", "residual_impact", "owner", "status",
            "treatment_plan", "identified_date", "review_date",
            "last_assessed_at", "created_at", "updated_at",
        )
    }
    payload["inherent_risk"] = updated.inherent_risk.value if updated.inherent_risk else None
    payload["residual_risk"] = updated.residual_risk.value if updated.residual_risk else None
    return RiskResponse(**payload)
@router.get("/risks/matrix", response_model=RiskMatrixResponse)
async def get_risk_matrix(db: Session = Depends(get_db)):
    """Get risk matrix data for visualization."""
    repo = RiskRepository(db)

    def _serialize(risk):
        """Map one RiskDB row onto the API response schema."""
        payload = {
            field: getattr(risk, field)
            for field in (
                "id", "risk_id", "title", "description", "category",
                "likelihood", "impact", "mitigating_controls",
                "residual_likelihood", "residual_impact", "owner", "status",
                "treatment_plan", "identified_date", "review_date",
                "last_assessed_at", "created_at", "updated_at",
            )
        }
        payload["inherent_risk"] = risk.inherent_risk.value if risk.inherent_risk else None
        payload["residual_risk"] = risk.residual_risk.value if risk.residual_risk else None
        return RiskResponse(**payload)

    # Matrix cells for the heatmap plus the full risk list for drill-down.
    return RiskMatrixResponse(
        matrix=repo.get_risk_matrix(),
        risks=[_serialize(r) for r in repo.get_all()],
    )

View File

@@ -0,0 +1,914 @@
"""
FastAPI routes for Compliance module.
Endpoints:
- /regulations: Manage regulations
- /requirements: Manage requirements
- /controls: Manage controls
- /mappings: Requirement-Control mappings
- /evidence: Evidence management
- /risks: Risk management
- /dashboard: Dashboard statistics
- /export: Audit export
"""
import logging
import os
from datetime import datetime, timedelta
from typing import Optional, List

from pydantic import BaseModel
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Query, BackgroundTasks
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session

from classroom_engine.database import get_db
from ..db import (
    RegulationRepository,
    RequirementRepository,
    ControlRepository,
    EvidenceRepository,
    RiskRepository,
    AuditExportRepository,
    ControlStatusEnum,
    ControlDomainEnum,
    RiskLevelEnum,
    EvidenceStatusEnum,
)
from ..db.models import EvidenceDB, ControlDB
from ..services.seeder import ComplianceSeeder
from ..services.export_generator import AuditExportGenerator
from ..services.auto_risk_updater import AutoRiskUpdater, ScanType
from .schemas import (
    RegulationCreate, RegulationResponse, RegulationListResponse,
    RequirementCreate, RequirementResponse, RequirementListResponse,
    ControlCreate, ControlUpdate, ControlResponse, ControlListResponse, ControlReviewRequest,
    MappingCreate, MappingResponse, MappingListResponse,
    ExportRequest, ExportResponse, ExportListResponse,
    SeedRequest, SeedResponse,
    # Pagination schemas
    PaginationMeta, PaginatedRequirementResponse, PaginatedControlResponse,
)

# Single module-level logger (this module previously assigned it twice).
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/compliance", tags=["compliance"])
# ============================================================================
# Regulations
# ============================================================================
@router.get("/regulations", response_model=RegulationListResponse)
async def list_regulations(
    is_active: Optional[bool] = None,
    regulation_type: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """List all regulations.

    Args:
        is_active: True -> only active, False -> only inactive, None -> all.
        regulation_type: optional RegulationTypeEnum value; unknown values
            are silently ignored (no filter) for backward compatibility.
    """
    repo = RegulationRepository(db)
    if is_active is None:
        regulations = repo.get_all()
    elif is_active:
        regulations = repo.get_active()
    else:
        # BUG FIX: is_active=False previously returned ALL regulations;
        # it now returns only the inactive ones.
        regulations = [r for r in repo.get_all() if not r.is_active]
    if regulation_type:
        from ..db.models import RegulationTypeEnum
        try:
            reg_type = RegulationTypeEnum(regulation_type)
            regulations = [r for r in regulations if r.regulation_type == reg_type]
        except ValueError:
            pass  # unknown type string -> leave list unfiltered
    # Add requirement counts
    req_repo = RequirementRepository(db)
    results = []
    for reg in regulations:
        reqs = req_repo.get_by_regulation(reg.id)
        reg_dict = {
            "id": reg.id,
            "code": reg.code,
            "name": reg.name,
            "full_name": reg.full_name,
            "regulation_type": reg.regulation_type.value if reg.regulation_type else None,
            "source_url": reg.source_url,
            "local_pdf_path": reg.local_pdf_path,
            "effective_date": reg.effective_date,
            "description": reg.description,
            "is_active": reg.is_active,
            "created_at": reg.created_at,
            "updated_at": reg.updated_at,
            "requirement_count": len(reqs),
        }
        results.append(RegulationResponse(**reg_dict))
    return RegulationListResponse(regulations=results, total=len(results))
@router.get("/regulations/{code}", response_model=RegulationResponse)
async def get_regulation(code: str, db: Session = Depends(get_db)):
    """Get a specific regulation by code."""
    regulation = RegulationRepository(db).get_by_code(code)
    if regulation is None:
        raise HTTPException(status_code=404, detail=f"Regulation {code} not found")
    # The response also reports how many requirements belong to it.
    requirement_count = len(RequirementRepository(db).get_by_regulation(regulation.id))
    payload = {
        field: getattr(regulation, field)
        for field in (
            "id", "code", "name", "full_name", "source_url",
            "local_pdf_path", "effective_date", "description",
            "is_active", "created_at", "updated_at",
        )
    }
    payload["regulation_type"] = (
        regulation.regulation_type.value if regulation.regulation_type else None
    )
    return RegulationResponse(requirement_count=requirement_count, **payload)
@router.get("/regulations/{code}/requirements", response_model=RequirementListResponse)
async def get_regulation_requirements(
    code: str,
    is_applicable: Optional[bool] = None,
    db: Session = Depends(get_db),
):
    """Get requirements for a specific regulation.

    Args:
        code: the regulation's short code (e.g. "GDPR").
        is_applicable: True -> only applicable requirements, False -> only
            non-applicable, None -> all.
    """
    reg_repo = RegulationRepository(db)
    regulation = reg_repo.get_by_code(code)
    if not regulation:
        raise HTTPException(status_code=404, detail=f"Regulation {code} not found")
    req_repo = RequirementRepository(db)
    if is_applicable is None:
        requirements = req_repo.get_by_regulation(regulation.id)
    elif is_applicable:
        requirements = req_repo.get_applicable(regulation.id)
    else:
        # BUG FIX: is_applicable=False previously returned ALL requirements;
        # it now returns only the non-applicable ones.
        requirements = [
            r for r in req_repo.get_by_regulation(regulation.id) if not r.is_applicable
        ]
    results = [
        RequirementResponse(
            id=r.id,
            regulation_id=r.regulation_id,
            regulation_code=code,
            article=r.article,
            paragraph=r.paragraph,
            title=r.title,
            description=r.description,
            requirement_text=r.requirement_text,
            breakpilot_interpretation=r.breakpilot_interpretation,
            is_applicable=r.is_applicable,
            applicability_reason=r.applicability_reason,
            priority=r.priority,
            created_at=r.created_at,
            updated_at=r.updated_at,
        )
        for r in requirements
    ]
    return RequirementListResponse(requirements=results, total=len(results))
@router.get("/requirements/{requirement_id}")
async def get_requirement(requirement_id: str, db: Session = Depends(get_db)):
    """Get a specific requirement by ID."""
    from ..db.models import RequirementDB, RegulationDB

    requirement = db.query(RequirementDB).filter(RequirementDB.id == requirement_id).first()
    if not requirement:
        raise HTTPException(status_code=404, detail=f"Requirement {requirement_id} not found")
    regulation = db.query(RegulationDB).filter(RegulationDB.id == requirement.regulation_id).first()
    # Fields whose response value is not a plain attribute copy.
    special = {
        "regulation_code": regulation.code if regulation else None,
        "implementation_status": requirement.implementation_status or "not_started",
        "audit_status": requirement.audit_status or "pending",
    }
    # Key order of the response payload (kept stable for API consumers).
    field_order = (
        "id", "regulation_id", "regulation_code", "article", "paragraph",
        "title", "description", "requirement_text",
        "breakpilot_interpretation", "implementation_status",
        "implementation_details", "code_references", "documentation_links",
        "evidence_description", "evidence_artifacts", "auditor_notes",
        "audit_status", "last_audit_date", "last_auditor", "is_applicable",
        "applicability_reason", "priority", "source_page", "source_section",
    )
    return {
        name: special[name] if name in special else getattr(requirement, name)
        for name in field_order
    }
@router.get("/requirements", response_model=PaginatedRequirementResponse)
async def list_requirements_paginated(
    page: int = Query(1, ge=1, description="Page number"),
    page_size: int = Query(50, ge=1, le=500, description="Items per page"),
    regulation_code: Optional[str] = Query(None, description="Filter by regulation code"),
    status: Optional[str] = Query(None, description="Filter by implementation status"),
    is_applicable: Optional[bool] = Query(None, description="Filter by applicability"),
    search: Optional[str] = Query(None, description="Search in title/description"),
    db: Session = Depends(get_db),
):
    """
    List requirements with pagination and eager-loaded relationships.
    This endpoint is optimized for large datasets (1000+ requirements) with:
    - Eager loading to prevent N+1 queries
    - Server-side pagination
    - Full-text search support
    """
    requirements, total = RequirementRepository(db).get_paginated(
        page=page,
        page_size=page_size,
        regulation_code=regulation_code,
        status=status,
        is_applicable=is_applicable,
        search=search,
    )
    # Ceiling division without importing math: -(-a // b) == ceil(a / b).
    total_pages = -(-total // page_size)

    def _serialize(req):
        """Map one RequirementDB row onto the API response schema."""
        payload = {
            field: getattr(req, field)
            for field in (
                "id", "regulation_id", "article", "paragraph", "title",
                "description", "requirement_text",
                "breakpilot_interpretation", "is_applicable",
                "applicability_reason", "priority",
                "implementation_details", "code_references",
                "documentation_links", "evidence_description",
                "evidence_artifacts", "auditor_notes", "last_audit_date",
                "last_auditor", "source_page", "source_section",
                "created_at", "updated_at",
            )
        }
        payload["regulation_code"] = req.regulation.code if req.regulation else None
        payload["implementation_status"] = req.implementation_status or "not_started"
        payload["audit_status"] = req.audit_status or "pending"
        return RequirementResponse(**payload)

    return PaginatedRequirementResponse(
        data=[_serialize(r) for r in requirements],
        pagination=PaginationMeta(
            page=page,
            page_size=page_size,
            total=total,
            total_pages=total_pages,
            has_next=page < total_pages,
            has_prev=page > 1,
        ),
    )
@router.put("/requirements/{requirement_id}")
async def update_requirement(requirement_id: str, updates: dict, db: Session = Depends(get_db)):
    """Update a requirement with implementation/audit details."""
    from ..db.models import RequirementDB
    from datetime import datetime

    requirement = db.query(RequirementDB).filter(RequirementDB.id == requirement_id).first()
    if not requirement:
        raise HTTPException(status_code=404, detail=f"Requirement {requirement_id} not found")
    # Whitelist of writable fields; anything else in the payload is ignored.
    writable = (
        'implementation_status', 'implementation_details', 'code_references',
        'documentation_links', 'evidence_description', 'evidence_artifacts',
        'auditor_notes', 'audit_status', 'is_applicable', 'applicability_reason',
        'breakpilot_interpretation',
    )
    for name, value in updates.items():
        if name in writable:
            setattr(requirement, name, value)
    # Record who/when whenever the audit status is (re)set.
    if 'audit_status' in updates:
        requirement.last_audit_date = datetime.utcnow()
        # TODO: Get auditor from auth
        requirement.last_auditor = updates.get('auditor_name', 'api_user')
    requirement.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(requirement)
    return {"success": True, "message": "Requirement updated"}
# ============================================================================
# Controls
# ============================================================================
@router.get("/controls", response_model=ControlListResponse)
async def list_controls(
    domain: Optional[str] = None,
    status: Optional[str] = None,
    is_automated: Optional[bool] = None,
    search: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """List all controls, optionally narrowed by domain, status, automation flag, or search text."""
    repo = ControlRepository(db)

    # Domain takes precedence over status for the initial DB query; the
    # remaining filters are applied in memory below.
    if domain:
        try:
            controls = repo.get_by_domain(ControlDomainEnum(domain))
        except ValueError:
            raise HTTPException(status_code=400, detail=f"Invalid domain: {domain}")
    elif status:
        try:
            controls = repo.get_by_status(ControlStatusEnum(status))
        except ValueError:
            raise HTTPException(status_code=400, detail=f"Invalid status: {status}")
    else:
        controls = repo.get_all()

    if is_automated is not None:
        controls = [c for c in controls if c.is_automated == is_automated]
    if search:
        needle = search.lower()
        controls = [
            c for c in controls
            if needle in c.control_id.lower()
            or needle in c.title.lower()
            or (c.description and needle in c.description.lower())
        ]

    evidence_repo = EvidenceRepository(db)

    def _as_response(ctrl) -> ControlResponse:
        # NOTE: one evidence query per control — acceptable for this
        # unpaginated listing; /controls/paginated eager-loads instead.
        return ControlResponse(
            id=ctrl.id,
            control_id=ctrl.control_id,
            domain=ctrl.domain.value if ctrl.domain else None,
            control_type=ctrl.control_type.value if ctrl.control_type else None,
            title=ctrl.title,
            description=ctrl.description,
            pass_criteria=ctrl.pass_criteria,
            implementation_guidance=ctrl.implementation_guidance,
            code_reference=ctrl.code_reference,
            documentation_url=ctrl.documentation_url,
            is_automated=ctrl.is_automated,
            automation_tool=ctrl.automation_tool,
            automation_config=ctrl.automation_config,
            owner=ctrl.owner,
            review_frequency_days=ctrl.review_frequency_days,
            status=ctrl.status.value if ctrl.status else None,
            status_notes=ctrl.status_notes,
            last_reviewed_at=ctrl.last_reviewed_at,
            next_review_at=ctrl.next_review_at,
            created_at=ctrl.created_at,
            updated_at=ctrl.updated_at,
            evidence_count=len(evidence_repo.get_by_control(ctrl.id)),
        )

    results = [_as_response(c) for c in controls]
    return ControlListResponse(controls=results, total=len(results))
@router.get("/controls/paginated", response_model=PaginatedControlResponse)
async def list_controls_paginated(
    page: int = Query(1, ge=1, description="Page number"),
    page_size: int = Query(50, ge=1, le=500, description="Items per page"),
    domain: Optional[str] = Query(None, description="Filter by domain"),
    status: Optional[str] = Query(None, description="Filter by status"),
    is_automated: Optional[bool] = Query(None, description="Filter by automation"),
    search: Optional[str] = Query(None, description="Search in title/description"),
    db: Session = Depends(get_db),
):
    """
    List controls with pagination and eager-loaded relationships.

    Optimized for large datasets: eager loading (no N+1 queries),
    server-side pagination, and full-text search support.
    """
    repo = ControlRepository(db)

    # Validate the optional string filters up front so a bad value fails
    # fast with a 400 before any query runs.
    domain_enum = None
    if domain:
        try:
            domain_enum = ControlDomainEnum(domain)
        except ValueError:
            raise HTTPException(status_code=400, detail=f"Invalid domain: {domain}")
    status_enum = None
    if status:
        try:
            status_enum = ControlStatusEnum(status)
        except ValueError:
            raise HTTPException(status_code=400, detail=f"Invalid status: {status}")

    controls, total = repo.get_paginated(
        page=page,
        page_size=page_size,
        domain=domain_enum,
        status=status_enum,
        is_automated=is_automated,
        search=search,
    )
    total_pages = (total + page_size - 1) // page_size  # ceiling division

    def _as_response(c) -> ControlResponse:
        # evidence is eager-loaded by get_paginated, so len() is query-free.
        return ControlResponse(
            id=c.id,
            control_id=c.control_id,
            domain=c.domain.value if c.domain else None,
            control_type=c.control_type.value if c.control_type else None,
            title=c.title,
            description=c.description,
            pass_criteria=c.pass_criteria,
            implementation_guidance=c.implementation_guidance,
            code_reference=c.code_reference,
            documentation_url=c.documentation_url,
            is_automated=c.is_automated,
            automation_tool=c.automation_tool,
            automation_config=c.automation_config,
            owner=c.owner,
            review_frequency_days=c.review_frequency_days,
            status=c.status.value if c.status else None,
            status_notes=c.status_notes,
            last_reviewed_at=c.last_reviewed_at,
            next_review_at=c.next_review_at,
            created_at=c.created_at,
            updated_at=c.updated_at,
            evidence_count=len(c.evidence) if c.evidence else 0,
        )

    return PaginatedControlResponse(
        data=[_as_response(c) for c in controls],
        pagination=PaginationMeta(
            page=page,
            page_size=page_size,
            total=total,
            total_pages=total_pages,
            has_next=page < total_pages,
            has_prev=page > 1,
        ),
    )
@router.get("/controls/{control_id}", response_model=ControlResponse)
async def get_control(control_id: str, db: Session = Depends(get_db)):
    """Return a single control looked up by its business ``control_id``."""
    repo = ControlRepository(db)
    control = repo.get_by_control_id(control_id)
    if control is None:
        raise HTTPException(status_code=404, detail=f"Control {control_id} not found")

    # Evidence is counted via its own repository query.
    evidence_repo = EvidenceRepository(db)
    evidence_items = evidence_repo.get_by_control(control.id)

    payload = dict(
        id=control.id,
        control_id=control.control_id,
        domain=control.domain.value if control.domain else None,
        control_type=control.control_type.value if control.control_type else None,
        title=control.title,
        description=control.description,
        pass_criteria=control.pass_criteria,
        implementation_guidance=control.implementation_guidance,
        code_reference=control.code_reference,
        documentation_url=control.documentation_url,
        is_automated=control.is_automated,
        automation_tool=control.automation_tool,
        automation_config=control.automation_config,
        owner=control.owner,
        review_frequency_days=control.review_frequency_days,
        status=control.status.value if control.status else None,
        status_notes=control.status_notes,
        last_reviewed_at=control.last_reviewed_at,
        next_review_at=control.next_review_at,
        created_at=control.created_at,
        updated_at=control.updated_at,
        evidence_count=len(evidence_items),
    )
    return ControlResponse(**payload)
@router.put("/controls/{control_id}", response_model=ControlResponse)
async def update_control(
    control_id: str,
    update: ControlUpdate,
    db: Session = Depends(get_db),
):
    """Apply a partial update to a control and return its new state."""
    repo = ControlRepository(db)
    control = repo.get_by_control_id(control_id)
    if control is None:
        raise HTTPException(status_code=404, detail=f"Control {control_id} not found")

    update_data = update.model_dump(exclude_unset=True)
    # The API accepts status as a plain string; the repository expects the enum.
    if "status" in update_data:
        try:
            update_data["status"] = ControlStatusEnum(update_data["status"])
        except ValueError:
            raise HTTPException(status_code=400, detail=f"Invalid status: {update_data['status']}")

    updated = repo.update(control.id, **update_data)
    db.commit()

    payload = dict(
        id=updated.id,
        control_id=updated.control_id,
        domain=updated.domain.value if updated.domain else None,
        control_type=updated.control_type.value if updated.control_type else None,
        title=updated.title,
        description=updated.description,
        pass_criteria=updated.pass_criteria,
        implementation_guidance=updated.implementation_guidance,
        code_reference=updated.code_reference,
        documentation_url=updated.documentation_url,
        is_automated=updated.is_automated,
        automation_tool=updated.automation_tool,
        automation_config=updated.automation_config,
        owner=updated.owner,
        review_frequency_days=updated.review_frequency_days,
        status=updated.status.value if updated.status else None,
        status_notes=updated.status_notes,
        last_reviewed_at=updated.last_reviewed_at,
        next_review_at=updated.next_review_at,
        created_at=updated.created_at,
        updated_at=updated.updated_at,
    )
    return ControlResponse(**payload)
@router.put("/controls/{control_id}/review", response_model=ControlResponse)
async def review_control(
    control_id: str,
    review: ControlReviewRequest,
    db: Session = Depends(get_db),
):
    """Mark a control as reviewed, setting a new status and optional notes."""
    repo = ControlRepository(db)
    control = repo.get_by_control_id(control_id)
    if control is None:
        raise HTTPException(status_code=404, detail=f"Control {control_id} not found")

    # Reject unknown status strings before touching the control.
    try:
        status_enum = ControlStatusEnum(review.status)
    except ValueError:
        raise HTTPException(status_code=400, detail=f"Invalid status: {review.status}")

    updated = repo.mark_reviewed(control.id, status_enum, review.status_notes)
    db.commit()

    payload = dict(
        id=updated.id,
        control_id=updated.control_id,
        domain=updated.domain.value if updated.domain else None,
        control_type=updated.control_type.value if updated.control_type else None,
        title=updated.title,
        description=updated.description,
        pass_criteria=updated.pass_criteria,
        implementation_guidance=updated.implementation_guidance,
        code_reference=updated.code_reference,
        documentation_url=updated.documentation_url,
        is_automated=updated.is_automated,
        automation_tool=updated.automation_tool,
        automation_config=updated.automation_config,
        owner=updated.owner,
        review_frequency_days=updated.review_frequency_days,
        status=updated.status.value if updated.status else None,
        status_notes=updated.status_notes,
        last_reviewed_at=updated.last_reviewed_at,
        next_review_at=updated.next_review_at,
        created_at=updated.created_at,
        updated_at=updated.updated_at,
    )
    return ControlResponse(**payload)
@router.get("/controls/by-domain/{domain}", response_model=ControlListResponse)
async def get_controls_by_domain(domain: str, db: Session = Depends(get_db)):
    """Return all controls that belong to the given domain."""
    # Validate the path parameter before querying.
    try:
        domain_enum = ControlDomainEnum(domain)
    except ValueError:
        raise HTTPException(status_code=400, detail=f"Invalid domain: {domain}")

    repo = ControlRepository(db)

    def _as_response(c) -> ControlResponse:
        return ControlResponse(
            id=c.id,
            control_id=c.control_id,
            domain=c.domain.value if c.domain else None,
            control_type=c.control_type.value if c.control_type else None,
            title=c.title,
            description=c.description,
            pass_criteria=c.pass_criteria,
            implementation_guidance=c.implementation_guidance,
            code_reference=c.code_reference,
            documentation_url=c.documentation_url,
            is_automated=c.is_automated,
            automation_tool=c.automation_tool,
            automation_config=c.automation_config,
            owner=c.owner,
            review_frequency_days=c.review_frequency_days,
            status=c.status.value if c.status else None,
            status_notes=c.status_notes,
            last_reviewed_at=c.last_reviewed_at,
            next_review_at=c.next_review_at,
            created_at=c.created_at,
            updated_at=c.updated_at,
        )

    results = [_as_response(c) for c in repo.get_by_domain(domain_enum)]
    return ControlListResponse(controls=results, total=len(results))
@router.post("/export", response_model=ExportResponse)
async def create_export(
    request: ExportRequest,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
):
    """Create a new audit export and return its initial record.

    NOTE(review): ``background_tasks`` is currently unused here — generation
    appears to happen via the generator itself; confirm before relying on
    asynchronous completion semantics.
    """
    generator = AuditExportGenerator(db)
    export = generator.create_export(
        requested_by="api_user",  # TODO: Get from auth
        export_type=request.export_type,
        included_regulations=request.included_regulations,
        included_domains=request.included_domains,
        date_range_start=request.date_range_start,
        date_range_end=request.date_range_end,
    )
    payload = dict(
        id=export.id,
        export_type=export.export_type,
        export_name=export.export_name,
        status=export.status.value if export.status else None,
        requested_by=export.requested_by,
        requested_at=export.requested_at,
        completed_at=export.completed_at,
        file_path=export.file_path,
        file_hash=export.file_hash,
        file_size_bytes=export.file_size_bytes,
        total_controls=export.total_controls,
        total_evidence=export.total_evidence,
        compliance_score=export.compliance_score,
        error_message=export.error_message,
    )
    return ExportResponse(**payload)
@router.get("/export/{export_id}", response_model=ExportResponse)
async def get_export(export_id: str, db: Session = Depends(get_db)):
    """Return the current status record of an export, or 404 if unknown."""
    generator = AuditExportGenerator(db)
    export = generator.get_export_status(export_id)
    if export is None:
        raise HTTPException(status_code=404, detail=f"Export {export_id} not found")

    payload = dict(
        id=export.id,
        export_type=export.export_type,
        export_name=export.export_name,
        status=export.status.value if export.status else None,
        requested_by=export.requested_by,
        requested_at=export.requested_at,
        completed_at=export.completed_at,
        file_path=export.file_path,
        file_hash=export.file_hash,
        file_size_bytes=export.file_size_bytes,
        total_controls=export.total_controls,
        total_evidence=export.total_evidence,
        compliance_score=export.compliance_score,
        error_message=export.error_message,
    )
    return ExportResponse(**payload)
@router.get("/export/{export_id}/download")
async def download_export(export_id: str, db: Session = Depends(get_db)):
    """Download the generated export archive as a ZIP file.

    Raises:
        404 if the export record or its file is missing.
        400 if the export has not (yet) completed.
    """
    generator = AuditExportGenerator(db)
    export = generator.get_export_status(export_id)
    if not export:
        raise HTTPException(status_code=404, detail=f"Export {export_id} not found")
    # Guard against a NULL status: previously `export.status.value` raised
    # AttributeError (surfacing as HTTP 500) when status was not yet set.
    if export.status is None or export.status.value != "completed":
        raise HTTPException(status_code=400, detail="Export not completed")
    if not export.file_path or not os.path.exists(export.file_path):
        raise HTTPException(status_code=404, detail="Export file not found")
    return FileResponse(
        export.file_path,
        media_type="application/zip",
        filename=os.path.basename(export.file_path),
    )
@router.get("/exports", response_model=ExportListResponse)
async def list_exports(
    limit: int = 20,
    offset: int = 0,
    db: Session = Depends(get_db),
):
    """List recent exports (most recent first, per the generator's ordering)."""
    generator = AuditExportGenerator(db)

    def _as_response(e) -> ExportResponse:
        return ExportResponse(
            id=e.id,
            export_type=e.export_type,
            export_name=e.export_name,
            status=e.status.value if e.status else None,
            requested_by=e.requested_by,
            requested_at=e.requested_at,
            completed_at=e.completed_at,
            file_path=e.file_path,
            file_hash=e.file_hash,
            file_size_bytes=e.file_size_bytes,
            total_controls=e.total_controls,
            total_evidence=e.total_evidence,
            compliance_score=e.compliance_score,
            error_message=e.error_message,
        )

    results = [_as_response(e) for e in generator.list_exports(limit, offset)]
    return ExportListResponse(exports=results, total=len(results))
# ============================================================================
# Seeding
# ============================================================================
@router.post("/init-tables")
async def init_tables(db: Session = Depends(get_db)):
    """Create compliance tables if they don't exist."""
    from classroom_engine.database import engine
    from ..db.models import (
        RegulationDB, RequirementDB, ControlDB, ControlMappingDB,
        EvidenceDB, RiskDB, AuditExportDB
    )
    # Creation order matters for FK references; keep regulations first.
    models = (
        RegulationDB, RequirementDB, ControlDB, ControlMappingDB,
        EvidenceDB, RiskDB, AuditExportDB,
    )
    try:
        # checkfirst=True makes each create a no-op when the table exists.
        for model in models:
            model.__table__.create(engine, checkfirst=True)
        return {"success": True, "message": "Tables created successfully"}
    except Exception as e:
        logger.error(f"Table creation failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/create-indexes")
async def create_performance_indexes(db: Session = Depends(get_db)):
    """
    Create additional performance indexes for large datasets.

    These indexes are optimized for:
    - Pagination queries (1000+ requirements)
    - Full-text search
    - Filtering by status/priority

    Each index is created in its own transaction; a failure on one index is
    recorded and does not prevent the remaining indexes from being created.
    """
    from sqlalchemy import text
    indexes = [
        # Priority index for sorting (descending, as we want high priority first)
        ("ix_req_priority_desc", "CREATE INDEX IF NOT EXISTS ix_req_priority_desc ON compliance_requirements (priority DESC)"),
        # Compound index for common filtering patterns
        ("ix_req_applicable_status", "CREATE INDEX IF NOT EXISTS ix_req_applicable_status ON compliance_requirements (is_applicable, implementation_status)"),
        # Control status index
        ("ix_ctrl_status", "CREATE INDEX IF NOT EXISTS ix_ctrl_status ON compliance_controls (status)"),
        # Evidence collected_at for timeline queries
        ("ix_evidence_collected", "CREATE INDEX IF NOT EXISTS ix_evidence_collected ON compliance_evidence (collected_at DESC)"),
        # Risk inherent risk level
        ("ix_risk_level", "CREATE INDEX IF NOT EXISTS ix_risk_level ON compliance_risks (inherent_risk)"),
    ]
    created = []
    errors = []
    for idx_name, idx_sql in indexes:
        try:
            db.execute(text(idx_sql))
            db.commit()
            created.append(idx_name)
        except Exception as e:
            # Roll back the failed statement so the session stays usable for
            # the next index. Without this, one failure leaves the transaction
            # in an aborted state (notably on PostgreSQL) and every following
            # execute() fails as well.
            db.rollback()
            errors.append({"index": idx_name, "error": str(e)})
            logger.warning(f"Index creation failed for {idx_name}: {e}")
    return {
        "success": len(errors) == 0,
        "created": created,
        "errors": errors,
        "message": f"Created {len(created)} indexes" + (f", {len(errors)} failed" if errors else ""),
    }
@router.post("/seed-risks")
async def seed_risks_only(db: Session = Depends(get_db)):
    """Seed only risks (incremental update for existing databases)."""
    from classroom_engine.database import engine
    from ..db.models import RiskDB

    try:
        # Make sure the risks table exists before inserting into it.
        RiskDB.__table__.create(engine, checkfirst=True)
        seeded = ComplianceSeeder(db).seed_risks_only()
    except Exception as e:
        logger.error(f"Risk seeding failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))

    return {
        "success": True,
        "message": f"Successfully seeded {seeded} risks",
        "risks_seeded": seeded,
    }
@router.post("/seed", response_model=SeedResponse)
async def seed_database(
    request: SeedRequest,
    db: Session = Depends(get_db),
):
    """Seed the compliance database with initial data."""
    from classroom_engine.database import engine
    from ..db.models import (
        RegulationDB, RequirementDB, ControlDB, ControlMappingDB,
        EvidenceDB, RiskDB, AuditExportDB
    )
    try:
        # Ensure tables exist first (no-op when already present).
        for model in (
            RegulationDB, RequirementDB, ControlDB, ControlMappingDB,
            EvidenceDB, RiskDB, AuditExportDB,
        ):
            model.__table__.create(engine, checkfirst=True)

        counts = ComplianceSeeder(db).seed_all(force=request.force)
        return SeedResponse(
            success=True,
            message="Database seeded successfully",
            counts=counts,
        )
    except Exception as e:
        logger.error(f"Seeding failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,296 @@
"""
FastAPI routes for Regulation Scraper and PDF extraction.
Endpoints:
- /scraper/status: Scraper status
- /scraper/sources: Available sources
- /scraper/scrape-all: Scrape all sources
- /scraper/scrape/{code}: Scrape single source
- /scraper/extract-bsi: Extract BSI requirements
- /scraper/extract-pdf: Extract from PDF
- /scraper/pdf-documents: List available PDFs
"""
import logging
import os
import uuid
from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from ..db import RegulationRepository, RequirementRepository
from ..db.models import RequirementDB
from .schemas import BSIAspectResponse, PDFExtractionResponse, PDFExtractionRequest
logger = logging.getLogger(__name__)
router = APIRouter(tags=["compliance-scraper"])
# ============================================================================
# Regulation Scraper
# ============================================================================
@router.get("/scraper/status")
async def get_scraper_status(db: Session = Depends(get_db)):
    """Report the scraper's current status."""
    from ..services.regulation_scraper import RegulationScraperService
    service = RegulationScraperService(db)
    return await service.get_status()
@router.get("/scraper/sources")
async def get_scraper_sources(db: Session = Depends(get_db)):
    """Get the list of known regulation sources plus their count."""
    from ..services.regulation_scraper import RegulationScraperService
    scraper = RegulationScraperService(db)
    sources = scraper.get_known_sources()
    # Derive the total from the payload we actually return, rather than the
    # separate KNOWN_SOURCES attribute — previously the two could disagree if
    # get_known_sources() ever filtered or augmented the raw mapping.
    return {
        "sources": sources,
        "total": len(sources),
    }
@router.post("/scraper/scrape-all")
async def scrape_all_sources(
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
):
    """Start scraping all known regulation sources.

    NOTE(review): despite the ``background_tasks`` parameter, scraping runs
    inline here and the response is returned only after all sources finish.
    """
    from ..services.regulation_scraper import RegulationScraperService
    service = RegulationScraperService(db)
    results = await service.scrape_all()
    return {
        "status": "completed",
        "results": results,
    }
@router.post("/scraper/scrape/{code}")
async def scrape_single_source(
    code: str,
    force: bool = Query(False, description="Force re-scrape even if data exists"),
    db: Session = Depends(get_db),
):
    """Scrape a specific regulation source."""
    from ..services.regulation_scraper import RegulationScraperService
    service = RegulationScraperService(db)
    try:
        return await service.scrape_single(code, force=force)
    except ValueError as e:
        # Unknown/invalid source code -> client error.
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Scraping {code} failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/scraper/extract-bsi")
async def extract_bsi_requirements(
    code: str = Query("BSI-TR-03161-2", description="BSI TR code"),
    force: bool = Query(False),
    db: Session = Depends(get_db),
):
    """
    Extract requirements from BSI Technical Guidelines.

    Uses pre-defined Pruefaspekte from BSI-TR-03161 documents.
    """
    from ..services.regulation_scraper import RegulationScraperService

    # This endpoint is BSI-specific; reject any other regulation code.
    if not code.startswith("BSI"):
        raise HTTPException(status_code=400, detail="Only BSI codes are supported")

    service = RegulationScraperService(db)
    try:
        return await service.scrape_single(code, force=force)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"BSI extraction failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/scraper/extract-pdf", response_model=PDFExtractionResponse)
async def extract_pdf_requirements(
    request: PDFExtractionRequest,
    db: Session = Depends(get_db),
):
    """
    Extract Pruefaspekte from BSI-TR PDF documents using PyMuPDF.

    Supported documents:
    - BSI-TR-03161-1: General security requirements
    - BSI-TR-03161-2: Web application security (OAuth, Sessions, etc.)
    - BSI-TR-03161-3: Backend/server security

    When ``request.save_to_db`` is true, extracted aspects are also persisted
    as requirements under a (possibly newly created) regulation record.
    Raises 400 for unsupported document codes, 404 when no PDF file can be
    located, and 500 when PyMuPDF is missing or extraction fails.
    """
    from ..services.pdf_extractor import BSIPDFExtractor
    from ..db.models import RegulationTypeEnum
    # Map document codes to file paths (paths as mounted inside the container).
    PDF_PATHS = {
        "BSI-TR-03161-1": "/app/docs/BSI-TR-03161-1.pdf",
        "BSI-TR-03161-2": "/app/docs/BSI-TR-03161-2.pdf",
        "BSI-TR-03161-3": "/app/docs/BSI-TR-03161-3.pdf",
    }
    # Local development paths (fallback)
    # NOTE(review): hardcoded developer-machine paths — consider moving these
    # to configuration/env vars.
    LOCAL_PDF_PATHS = {
        "BSI-TR-03161-1": "/Users/benjaminadmin/Projekte/breakpilot-pwa/docs/BSI-TR-03161-1.pdf",
        "BSI-TR-03161-2": "/Users/benjaminadmin/Projekte/breakpilot-pwa/docs/BSI-TR-03161-2.pdf",
        "BSI-TR-03161-3": "/Users/benjaminadmin/Projekte/breakpilot-pwa/docs/BSI-TR-03161-3.pdf",
    }
    # Codes are matched case-insensitively by upper-casing the request value.
    doc_code = request.document_code.upper()
    if doc_code not in PDF_PATHS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported document: {doc_code}. Supported: {list(PDF_PATHS.keys())}"
        )
    # Try container path first, then local path
    pdf_path = PDF_PATHS[doc_code]
    if not os.path.exists(pdf_path):
        pdf_path = LOCAL_PDF_PATHS.get(doc_code)
        if not pdf_path or not os.path.exists(pdf_path):
            raise HTTPException(
                status_code=404,
                detail=f"PDF file not found for {doc_code}"
            )
    try:
        extractor = BSIPDFExtractor()
        aspects = extractor.extract_from_file(pdf_path, source_name=doc_code)
        stats = extractor.get_statistics(aspects)
        # Convert to response format. full_text is truncated to 2000 chars for
        # the API payload; the DB copy below keeps up to 4000 chars.
        aspect_responses = [
            BSIAspectResponse(
                aspect_id=a.aspect_id,
                title=a.title,
                full_text=a.full_text[:2000],
                category=a.category.value,
                page_number=a.page_number,
                section=a.section,
                requirement_level=a.requirement_level.value,
                source_document=a.source_document,
                keywords=a.keywords,
                related_aspects=a.related_aspects,
            )
            for a in aspects
        ]
        requirements_created = 0
        # Save to database if requested
        if request.save_to_db:
            # Get or create regulation
            reg_repo = RegulationRepository(db)
            regulation = reg_repo.get_by_code(doc_code)
            if not regulation:
                regulation = reg_repo.create(
                    code=doc_code,
                    name=f"BSI TR {doc_code.split('-')[-1]}",
                    full_name=f"BSI Technische Richtlinie {doc_code}",
                    regulation_type=RegulationTypeEnum.BSI_STANDARD,
                    local_pdf_path=pdf_path,
                )
            # Create requirements from extracted aspects
            req_repo = RequirementRepository(db)
            # Aspect ids already stored (in the `article` column) — used to
            # skip duplicates unless a forced re-import is requested.
            existing_articles = {r.article for r in req_repo.get_by_regulation(regulation.id)}
            for aspect in aspects:
                if aspect.aspect_id not in existing_articles or request.force:
                    # Delete existing if force (replaced by a fresh row below)
                    if request.force and aspect.aspect_id in existing_articles:
                        existing = db.query(RequirementDB).filter(
                            RequirementDB.regulation_id == regulation.id,
                            RequirementDB.article == aspect.aspect_id
                        ).first()
                        if existing:
                            db.delete(existing)
                    # Determine priority based on requirement level.
                    # BSI wording: MUSS (must) and DARF NICHT (must not) map to
                    # the highest priority; SOLL (should) and KANN (may) lower.
                    # Unknown levels default to medium (2).
                    priority_map = {"MUSS": 3, "SOLL": 2, "KANN": 1, "DARF NICHT": 3}
                    priority = priority_map.get(aspect.requirement_level.value, 2)
                    requirement = RequirementDB(
                        id=str(uuid.uuid4()),
                        regulation_id=regulation.id,
                        article=aspect.aspect_id,
                        paragraph=aspect.section,
                        title=aspect.title[:300],
                        description=f"Kategorie: {aspect.category.value}",
                        requirement_text=aspect.full_text[:4000],
                        is_applicable=True,
                        priority=priority,
                        source_page=aspect.page_number,
                        source_section=aspect.section,
                    )
                    db.add(requirement)
                    requirements_created += 1
            # Single commit for the whole batch (deletes + inserts together).
            db.commit()
        return PDFExtractionResponse(
            success=True,
            source_document=doc_code,
            total_aspects=len(aspects),
            aspects=aspect_responses,
            statistics=stats,
            requirements_created=requirements_created,
        )
    except ImportError as e:
        raise HTTPException(
            status_code=500,
            detail=f"PyMuPDF not installed: {e}. Run: pip install PyMuPDF"
        )
    except Exception as e:
        logger.error(f"PDF extraction failed for {doc_code}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/scraper/pdf-documents")
async def list_pdf_documents():
    """List available PDF documents for extraction."""
    # (code, name, description, expected aspect count) per known document.
    catalog = (
        (
            "BSI-TR-03161-1",
            "BSI TR 03161 Teil 1",
            "Allgemeine Sicherheitsanforderungen für mobile Anwendungen",
            "~30",
        ),
        (
            "BSI-TR-03161-2",
            "BSI TR 03161 Teil 2",
            "Web-Anwendungssicherheit (OAuth, Sessions, Input Validation, etc.)",
            "~80-100",
        ),
        (
            "BSI-TR-03161-3",
            "BSI TR 03161 Teil 3",
            "Backend/Server-Sicherheit",
            "~40",
        ),
    )
    documents = []
    for code, name, description, expected in catalog:
        # A document is "available" if its PDF exists at the local dev path
        # or the container path.
        candidate_paths = (
            f"/Users/benjaminadmin/Projekte/breakpilot-pwa/docs/{code}.pdf",
            f"/app/docs/{code}.pdf",
        )
        documents.append({
            "code": code,
            "name": name,
            "description": description,
            "expected_aspects": expected,
            "available": any(os.path.exists(p) for p in candidate_paths),
        })
    return {
        "documents": documents,
        "total": len(documents),
    }