fix(cleanup): ISMS Bugfix, 13 tote AI-Endpoints entfernt, Compliance-Hub Proxy fix
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 33s
CI / test-python-backend-compliance (push) Successful in 34s
CI / test-python-document-crawler (push) Successful in 22s
CI / test-python-dsms-gateway (push) Successful in 18s

- ISMS: markStepCompleted entfernt (existiert nicht in SDKContext, verursachte Application Error)
- AI Routes: 13 ungenutzte Endpoints entfernt (ai_routes.py 1266→379 Zeilen, -887)
- Schemas: 12 ungenutzte AI-Schema-Klassen entfernt (-108 Zeilen)
- Compliance-Hub: 5 Fetch-URLs von /api/admin/... auf /api/sdk/v1/... umgestellt
- Tests: 1361 passed, 0 Regressionen

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-07 15:13:19 +01:00
parent 9e65dff7d6
commit 7ec6b9f6c0
4 changed files with 12 additions and 1014 deletions

View File

@@ -83,10 +83,10 @@ export default function ComplianceHubPage() {
setError(null)
try {
const [dashboardRes, regulationsRes, mappingsRes, findingsRes] = await Promise.all([
fetch('/api/admin/compliance/dashboard'),
fetch('/api/admin/compliance/regulations'),
fetch('/api/admin/compliance/mappings'),
fetch('/api/admin/compliance/isms/findings/summary'),
fetch('/api/sdk/v1/compliance/dashboard'),
fetch('/api/sdk/v1/compliance/regulations'),
fetch('/api/sdk/v1/compliance/mappings'),
fetch('/api/sdk/v1/isms/findings?status=open'),
])
if (dashboardRes.ok) {
@@ -115,7 +115,7 @@ export default function ComplianceHubPage() {
const seedDatabase = async () => {
setSeeding(true)
try {
const res = await fetch('/api/admin/compliance/seed', {
const res = await fetch('/api/sdk/v1/compliance/seed', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ force: false }),

View File

@@ -1,7 +1,6 @@
'use client'
import React, { useState, useEffect, useCallback } from 'react'
import { useSDK } from '@/lib/sdk'
// =============================================================================
// TYPES
@@ -1220,12 +1219,6 @@ const TABS: { id: TabId; label: string }[] = [
export default function ISMSPage() {
const [tab, setTab] = useState<TabId>('overview')
const { markStepCompleted } = useSDK()
useEffect(() => {
markStepCompleted('isms')
}, [markStepCompleted])
return (
<div className="min-h-screen bg-gray-50 p-6">
<div className="max-w-7xl mx-auto">

View File

@@ -1,48 +1,23 @@
"""
FastAPI routes for AI Compliance Assistant.
FastAPI routes for AI Act Compliance — AI System CRUD.
Endpoints:
- /ai/status: Get AI provider status
- /ai/interpret: Interpret a requirement
- /ai/suggest-controls: Get AI-suggested controls
- /ai/assess-risk: Assess module risk
- /ai/gap-analysis: Analyze coverage gaps
- /ai/batch-interpret: Batch interpret requirements
- /ai/auto-map-controls: Auto-map controls to requirements
- /ai/batch-map-controls: Batch map controls
- /ai/switch-provider: Switch LLM provider
- /ai/providers: List available providers
- /pdf/*: PDF extraction endpoints
- /ai/systems: List/Create AI systems
- /ai/systems/{id}: Get/Update/Delete AI system
- /ai/systems/{id}/assess: Run AI Act risk assessment
"""
import logging
import os
from typing import Optional, List
from typing import Optional
from pydantic import BaseModel
from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from classroom_engine.database import get_db
from ..db import (
RegulationRepository,
RequirementRepository,
ControlRepository,
)
from ..db.models import RegulationDB, RequirementDB, AISystemDB, AIClassificationEnum, AISystemStatusEnum
from ..db.models import AISystemDB, AIClassificationEnum, AISystemStatusEnum
from .schemas import (
# AI Assistant schemas
AIInterpretationRequest, AIInterpretationResponse,
AIBatchInterpretationRequest, AIBatchInterpretationResponse,
AIControlSuggestionRequest, AIControlSuggestionResponse, AIControlSuggestionItem,
AIRiskAssessmentRequest, AIRiskAssessmentResponse, AIRiskFactor,
AIGapAnalysisRequest, AIGapAnalysisResponse,
AIStatusResponse,
# AI System schemas
AISystemCreate, AISystemUpdate, AISystemResponse, AISystemListResponse,
# PDF extraction schemas
BSIAspectResponse, PDFExtractionResponse,
)
logger = logging.getLogger(__name__)
@@ -402,865 +377,3 @@ def _derive_obligations(classification: str) -> list:
],
}
return obligations_map.get(classification, [])
# ============================================================================
# AI Assistant Endpoints (Sprint 4)
# ============================================================================
@router.get("/ai/status", response_model=AIStatusResponse)
async def get_ai_status():
"""Get the status of the AI provider."""
from ..services.llm_provider import get_shared_provider, LLMProviderType
try:
provider = get_shared_provider()
return AIStatusResponse(
provider=provider.provider_name,
model=provider.config.model,
is_available=True,
is_mock=provider.provider_name == "mock",
error=None,
)
except Exception as e:
return AIStatusResponse(
provider="unknown",
model="unknown",
is_available=False,
is_mock=True,
error=str(e),
)
@router.post("/ai/interpret", response_model=AIInterpretationResponse)
async def interpret_requirement(
request: AIInterpretationRequest,
db: Session = Depends(get_db),
):
"""Generate AI interpretation for a requirement."""
from ..services.ai_compliance_assistant import get_ai_assistant
# Get requirement from DB
req_repo = RequirementRepository(db)
requirement = req_repo.get_by_id(request.requirement_id)
if not requirement:
raise HTTPException(status_code=404, detail=f"Requirement {request.requirement_id} not found")
# Get regulation info
reg_repo = RegulationRepository(db)
regulation = reg_repo.get_by_id(requirement.regulation_id)
try:
assistant = get_ai_assistant()
result = await assistant.interpret_requirement(
requirement_id=requirement.id,
article=requirement.article,
title=requirement.title,
requirement_text=requirement.requirement_text or requirement.description or "",
regulation_code=regulation.code if regulation else "UNKNOWN",
regulation_name=regulation.name if regulation else "Unknown Regulation",
)
return AIInterpretationResponse(
requirement_id=result.requirement_id,
summary=result.summary,
applicability=result.applicability,
technical_measures=result.technical_measures,
affected_modules=result.affected_modules,
risk_level=result.risk_level,
implementation_hints=result.implementation_hints,
confidence_score=result.confidence_score,
error=result.error,
)
except Exception as e:
logger.error(f"AI interpretation failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/ai/suggest-controls", response_model=AIControlSuggestionResponse)
async def suggest_controls(
request: AIControlSuggestionRequest,
db: Session = Depends(get_db),
):
"""Get AI-suggested controls for a requirement."""
from ..services.ai_compliance_assistant import get_ai_assistant
# Get requirement from DB
req_repo = RequirementRepository(db)
requirement = req_repo.get_by_id(request.requirement_id)
if not requirement:
raise HTTPException(status_code=404, detail=f"Requirement {request.requirement_id} not found")
# Get regulation info
reg_repo = RegulationRepository(db)
regulation = reg_repo.get_by_id(requirement.regulation_id)
try:
assistant = get_ai_assistant()
suggestions = await assistant.suggest_controls(
requirement_title=requirement.title,
requirement_text=requirement.requirement_text or requirement.description or "",
regulation_name=regulation.name if regulation else "Unknown",
affected_modules=[], # Could be populated from previous interpretation
)
return AIControlSuggestionResponse(
requirement_id=request.requirement_id,
suggestions=[
AIControlSuggestionItem(
control_id=s.control_id,
domain=s.domain,
title=s.title,
description=s.description,
pass_criteria=s.pass_criteria,
implementation_guidance=s.implementation_guidance,
is_automated=s.is_automated,
automation_tool=s.automation_tool,
priority=s.priority,
confidence_score=s.confidence_score,
)
for s in suggestions
],
)
except Exception as e:
logger.error(f"AI control suggestion failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/ai/assess-risk", response_model=AIRiskAssessmentResponse)
async def assess_module_risk(
request: AIRiskAssessmentRequest,
db: Session = Depends(get_db),
):
"""Get AI risk assessment for a service module."""
from ..services.ai_compliance_assistant import get_ai_assistant
from ..db.repository import ServiceModuleRepository
# Get module from DB
module_repo = ServiceModuleRepository(db)
module = module_repo.get_by_id(request.module_id)
if not module:
module = module_repo.get_by_name(request.module_id)
if not module:
raise HTTPException(status_code=404, detail=f"Module {request.module_id} not found")
# Get regulations for this module
module_detail = module_repo.get_with_regulations(module.id)
regulations = []
if module_detail and module_detail.get("regulation_mappings"):
for mapping in module_detail["regulation_mappings"]:
regulations.append({
"code": mapping.get("regulation_code", ""),
"relevance": mapping.get("relevance_level", "medium"),
})
try:
assistant = get_ai_assistant()
result = await assistant.assess_module_risk(
module_name=module.name,
service_type=module.service_type.value if module.service_type else "unknown",
description=module.description or "",
processes_pii=module.processes_pii,
ai_components=module.ai_components,
criticality=module.criticality or "medium",
data_categories=module.data_categories or [],
regulations=regulations,
)
return AIRiskAssessmentResponse(
module_name=result.module_name,
overall_risk=result.overall_risk,
risk_factors=[
AIRiskFactor(
factor=f.get("factor", ""),
severity=f.get("severity", "medium"),
likelihood=f.get("likelihood", "medium"),
)
for f in result.risk_factors
],
recommendations=result.recommendations,
compliance_gaps=result.compliance_gaps,
confidence_score=result.confidence_score,
)
except Exception as e:
logger.error(f"AI risk assessment failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/ai/gap-analysis", response_model=AIGapAnalysisResponse)
async def analyze_gap(
request: AIGapAnalysisRequest,
db: Session = Depends(get_db),
):
"""Analyze coverage gaps between a requirement and existing controls."""
from ..services.ai_compliance_assistant import get_ai_assistant
# Get requirement from DB
req_repo = RequirementRepository(db)
requirement = req_repo.get_by_id(request.requirement_id)
if not requirement:
raise HTTPException(status_code=404, detail=f"Requirement {request.requirement_id} not found")
# Get regulation info
reg_repo = RegulationRepository(db)
regulation = reg_repo.get_by_id(requirement.regulation_id)
# Get existing control mappings from eager-loaded relationship
ctrl_repo = ControlRepository(db)
existing_controls = []
if requirement.control_mappings:
for mapping in requirement.control_mappings:
if mapping.control:
existing_controls.append({
"control_id": mapping.control.control_id,
"title": mapping.control.title,
"status": mapping.control.status.value if mapping.control.status else "unknown",
})
try:
assistant = get_ai_assistant()
result = await assistant.analyze_gap(
requirement_id=requirement.id,
requirement_title=requirement.title,
requirement_text=requirement.requirement_text or requirement.description or "",
regulation_code=regulation.code if regulation else "UNKNOWN",
existing_controls=existing_controls,
)
return AIGapAnalysisResponse(
requirement_id=result.requirement_id,
requirement_title=result.requirement_title,
coverage_level=result.coverage_level,
existing_controls=result.existing_controls,
missing_coverage=result.missing_coverage,
suggested_actions=result.suggested_actions,
)
except Exception as e:
logger.error(f"AI gap analysis failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/ai/batch-interpret", response_model=AIBatchInterpretationResponse)
async def batch_interpret_requirements(
request: AIBatchInterpretationRequest,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
):
"""
Batch interpret multiple requirements.
For large batches, this runs in the background and returns immediately.
"""
from ..services.ai_compliance_assistant import get_ai_assistant
req_repo = RequirementRepository(db)
reg_repo = RegulationRepository(db)
# Build list of requirements to process
requirements_to_process = []
if request.requirement_ids:
for req_id in request.requirement_ids:
req = req_repo.get_by_id(req_id)
if req:
reg = reg_repo.get_by_id(req.regulation_id)
requirements_to_process.append({
"id": req.id,
"article": req.article,
"title": req.title,
"requirement_text": req.requirement_text or req.description or "",
"regulation_code": reg.code if reg else "UNKNOWN",
"regulation_name": reg.name if reg else "Unknown",
})
elif request.regulation_code:
# Get all requirements for a regulation
reg = reg_repo.get_by_code(request.regulation_code)
if reg:
reqs = req_repo.get_by_regulation(reg.id)
for req in reqs[:50]: # Limit to 50 for batch processing
requirements_to_process.append({
"id": req.id,
"article": req.article,
"title": req.title,
"requirement_text": req.requirement_text or req.description or "",
"regulation_code": reg.code,
"regulation_name": reg.name,
})
if not requirements_to_process:
raise HTTPException(status_code=400, detail="No requirements found to process")
# For small batches, process synchronously
if len(requirements_to_process) <= 5:
assistant = get_ai_assistant()
results = await assistant.batch_interpret_requirements(
requirements_to_process,
rate_limit=request.rate_limit,
)
return AIBatchInterpretationResponse(
total=len(requirements_to_process),
processed=len(results),
interpretations=[
AIInterpretationResponse(
requirement_id=r.requirement_id,
summary=r.summary,
applicability=r.applicability,
technical_measures=r.technical_measures,
affected_modules=r.affected_modules,
risk_level=r.risk_level,
implementation_hints=r.implementation_hints,
confidence_score=r.confidence_score,
error=r.error,
)
for r in results
],
)
# For large batches, return immediately with info
# (Background processing would be added in a production version)
return AIBatchInterpretationResponse(
total=len(requirements_to_process),
processed=0,
interpretations=[],
)
# ============================================================================
# PDF Extraction (Sprint 2)
# ============================================================================
@router.get("/pdf/available")
async def list_available_pdfs():
"""List available PDF documents for extraction."""
from pathlib import Path
docs_path = Path("/app/docs") if Path("/app/docs").exists() else Path("docs")
available = []
bsi_files = list(docs_path.glob("BSI-TR-*.pdf"))
for pdf_file in bsi_files:
available.append({
"filename": pdf_file.name,
"path": str(pdf_file),
"size_bytes": pdf_file.stat().st_size,
"type": "bsi_standard",
})
return {
"available_pdfs": available,
"total": len(available),
}
@router.post("/pdf/extract/{doc_code}", response_model=PDFExtractionResponse)
async def extract_pdf_requirements(
doc_code: str,
save_to_db: bool = Query(True, description="Save extracted requirements to database"),
db: Session = Depends(get_db),
):
"""
Extract requirements/aspects from a BSI-TR PDF document.
doc_code examples:
- BSI-TR-03161-1: General security requirements
- BSI-TR-03161-2: Web application security
- BSI-TR-03161-3: Backend/server security
"""
from pathlib import Path
from ..services.pdf_extractor import BSIPDFExtractor
from ..db.models import RegulationTypeEnum
# Find the PDF file
docs_path = Path("/app/docs") if Path("/app/docs").exists() else Path("docs")
pdf_path = docs_path / f"{doc_code}.pdf"
if not pdf_path.exists():
raise HTTPException(status_code=404, detail=f"PDF not found: {doc_code}.pdf")
# Extract aspects
extractor = BSIPDFExtractor()
try:
aspects = extractor.extract_from_file(str(pdf_path), source_name=doc_code)
except Exception as e:
logger.error(f"PDF extraction failed: {e}")
raise HTTPException(status_code=500, detail=f"PDF extraction failed: {str(e)}")
# Find or create the regulation
reg_repo = RegulationRepository(db)
regulation = reg_repo.get_by_code(doc_code)
if not regulation:
regulation = reg_repo.create(
code=doc_code,
name=f"BSI Technical Guideline {doc_code.split('-')[-1]}",
full_name=f"BSI Technische Richtlinie {doc_code}",
regulation_type=RegulationTypeEnum.BSI_STANDARD,
local_pdf_path=str(pdf_path),
)
# Save to database if requested
saved_count = 0
if save_to_db and aspects:
req_repo = RequirementRepository(db)
for aspect in aspects:
# Check if requirement already exists
existing = db.query(RequirementDB).filter(
RequirementDB.regulation_id == regulation.id,
RequirementDB.article == aspect.aspect_id,
).first()
if not existing:
try:
req_repo.create(
regulation_id=regulation.id,
article=aspect.aspect_id,
title=aspect.title[:300] if aspect.title else "",
description=f"Category: {aspect.category.value}",
requirement_text=aspect.full_text[:4000] if aspect.full_text else "",
priority=1 if aspect.requirement_level.value == "MUSS" else (
2 if aspect.requirement_level.value == "SOLL" else 3
),
)
saved_count += 1
except Exception as e:
logger.warning(f"Failed to save aspect {aspect.aspect_id}: {e}")
db.commit()
# Convert aspects to response format
aspect_responses = [
BSIAspectResponse(
aspect_id=a.aspect_id,
title=a.title,
full_text=a.full_text,
category=a.category.value,
page_number=a.page_number,
section=a.section,
requirement_level=a.requirement_level.value,
source_document=a.source_document,
)
for a in aspects
]
return PDFExtractionResponse(
doc_code=doc_code,
total_extracted=len(aspects),
saved_to_db=saved_count,
aspects=aspect_responses,
)
@router.get("/pdf/extraction-stats")
async def get_extraction_stats(db: Session = Depends(get_db)):
"""Get statistics about extracted PDF requirements."""
from sqlalchemy import func
# Count requirements per BSI regulation
stats = (
db.query(
RegulationDB.code,
func.count(RequirementDB.id).label('count')
)
.join(RequirementDB, RequirementDB.regulation_id == RegulationDB.id)
.filter(RegulationDB.code.like('BSI-%'))
.group_by(RegulationDB.code)
.all()
)
return {
"bsi_requirements": {code: count for code, count in stats},
"total_bsi_requirements": sum(count for _, count in stats),
}
# ============================================================================
# Automatic Control Mapping
# ============================================================================
# Domain keyword mapping for automatic control assignment
DOMAIN_KEYWORDS = {
"priv": ["datenschutz", "dsgvo", "gdpr", "privacy", "personenbezogen", "einwilligung",
"consent", "betroffenenrechte", "verarbeitungsverzeichnis", "pii", "auftragsverarbeitung"],
"iam": ["authentifizierung", "auth", "login", "passwort", "password", "zugang", "access",
"berechtigung", "session", "token", "jwt", "oauth", "sso", "mfa", "2fa", "rbac"],
"crypto": ["verschlüsselung", "encryption", "kryptograph", "crypto", "hash", "schlüssel",
"key", "tls", "ssl", "zertifikat", "signatur", "aes", "rsa"],
"sdlc": ["entwicklung", "code", "software", "sast", "dast", "dependency", "vulnerable",
"cve", "security scan", "semgrep", "trivy", "sbom", "ci/cd", "build"],
"ops": ["monitoring", "logging", "log", "protokoll", "backup", "incident", "alert",
"availability", "uptime", "patch", "update", "deployment"],
"ai": ["künstliche intelligenz", "ki", "ai", "machine learning", "ml", "modell",
"training", "inference", "bias", "ai act", "hochrisiko"],
"cra": ["vulnerability", "schwachstelle", "disclosure", "patch", "eol", "end-of-life",
"supply chain", "sbom", "cve", "update"],
"gov": ["richtlinie", "policy", "governance", "verantwortlich", "raci", "dokumentation",
"prozess", "awareness", "schulung", "training"],
"aud": ["audit", "prüfung", "nachweis", "evidence", "traceability", "nachvollzieh",
"protokoll", "export", "report"],
}
@router.post("/ai/auto-map-controls")
async def auto_map_controls(
requirement_id: str = Query(..., description="Requirement UUID"),
save_to_db: bool = Query(True, description="Save mappings to database"),
use_ai: bool = Query(False, description="Use AI for better matching (slower)"),
db: Session = Depends(get_db),
):
"""
Automatically map controls to a requirement.
Uses keyword matching by default (fast) or AI for better accuracy (slower).
"""
from ..db.models import ControlMappingDB
# Get requirement
req_repo = RequirementRepository(db)
requirement = req_repo.get_by_id(requirement_id)
if not requirement:
raise HTTPException(status_code=404, detail=f"Requirement {requirement_id} not found")
# Get all controls
ctrl_repo = ControlRepository(db)
all_controls = ctrl_repo.get_all()
# Text to analyze
text_to_analyze = f"{requirement.title} {requirement.requirement_text or ''} {requirement.description or ''}"
text_lower = text_to_analyze.lower()
matched_controls = []
if use_ai:
# Use AI for matching (slower but more accurate)
from ..services.ai_compliance_assistant import get_ai_assistant
assistant = get_ai_assistant()
reg_repo = RegulationRepository(db)
regulation = reg_repo.get_by_id(requirement.regulation_id)
try:
suggestions = await assistant.suggest_controls(
requirement_title=requirement.title,
requirement_text=requirement.requirement_text or "",
regulation_name=regulation.name if regulation else "Unknown",
affected_modules=[],
)
# Match suggestions to existing controls by domain
for suggestion in suggestions:
domain = suggestion.domain.lower()
domain_controls = [c for c in all_controls if c.domain and c.domain.value.lower() == domain]
if domain_controls:
# Take the first matching control from this domain
matched_controls.append({
"control": domain_controls[0],
"coverage": "partial",
"notes": f"AI suggested: {suggestion.title}",
"confidence": suggestion.confidence_score,
})
except Exception as e:
logger.warning(f"AI mapping failed, falling back to keyword matching: {e}")
use_ai = False # Fall back to keyword matching
if not use_ai:
# Keyword-based matching (fast)
domain_scores = {}
for domain, keywords in DOMAIN_KEYWORDS.items():
score = sum(1 for kw in keywords if kw.lower() in text_lower)
if score > 0:
domain_scores[domain] = score
# Sort domains by score
sorted_domains = sorted(domain_scores.items(), key=lambda x: x[1], reverse=True)
# Take top 3 domains
for domain, score in sorted_domains[:3]:
domain_controls = [c for c in all_controls if c.domain and c.domain.value.lower() == domain]
for ctrl in domain_controls[:2]: # Max 2 controls per domain
matched_controls.append({
"control": ctrl,
"coverage": "partial" if score < 3 else "full",
"notes": f"Keyword match (score: {score})",
"confidence": min(0.9, 0.5 + score * 0.1),
})
# Save mappings to database if requested
created_mappings = []
if save_to_db and matched_controls:
for match in matched_controls:
ctrl = match["control"]
# Check if mapping already exists
existing = db.query(ControlMappingDB).filter(
ControlMappingDB.requirement_id == requirement_id,
ControlMappingDB.control_id == ctrl.id,
).first()
if not existing:
mapping = ControlMappingDB(
requirement_id=requirement_id,
control_id=ctrl.id,
coverage_level=match["coverage"],
notes=match["notes"],
)
db.add(mapping)
created_mappings.append({
"control_id": ctrl.control_id,
"domain": ctrl.domain.value if ctrl.domain else None,
"title": ctrl.title,
"coverage_level": match["coverage"],
"notes": match["notes"],
})
db.commit()
return {
"requirement_id": requirement_id,
"requirement_title": requirement.title,
"matched_controls": len(matched_controls),
"created_mappings": len(created_mappings),
"mappings": created_mappings if save_to_db else [
{
"control_id": m["control"].control_id,
"domain": m["control"].domain.value if m["control"].domain else None,
"title": m["control"].title,
"coverage_level": m["coverage"],
"confidence": m.get("confidence", 0.7),
}
for m in matched_controls
],
}
@router.post("/ai/batch-map-controls")
async def batch_map_controls(
regulation_code: Optional[str] = Query(None, description="Filter by regulation code"),
limit: int = Query(100, description="Max requirements to process"),
use_ai: bool = Query(False, description="Use AI for matching (slower)"),
background_tasks: BackgroundTasks = None,
db: Session = Depends(get_db),
):
"""
Batch map controls to multiple requirements.
Processes requirements that don't have mappings yet.
"""
from ..db.models import ControlMappingDB
# Get requirements without mappings
req_repo = RequirementRepository(db)
if regulation_code:
reg_repo = RegulationRepository(db)
regulation = reg_repo.get_by_code(regulation_code)
if not regulation:
raise HTTPException(status_code=404, detail=f"Regulation {regulation_code} not found")
all_requirements = req_repo.get_by_regulation(regulation.id)
else:
all_requirements = req_repo.get_all()
# Filter to requirements without mappings
requirements_without_mappings = []
for req in all_requirements:
existing = db.query(ControlMappingDB).filter(
ControlMappingDB.requirement_id == req.id
).first()
if not existing:
requirements_without_mappings.append(req)
# Limit processing
to_process = requirements_without_mappings[:limit]
# Get all controls once
ctrl_repo = ControlRepository(db)
all_controls = ctrl_repo.get_all()
# Process each requirement
results = []
for req in to_process:
try:
text_to_analyze = f"{req.title} {req.requirement_text or ''} {req.description or ''}"
text_lower = text_to_analyze.lower()
# Quick keyword matching
domain_scores = {}
for domain, keywords in DOMAIN_KEYWORDS.items():
score = sum(1 for kw in keywords if kw.lower() in text_lower)
if score > 0:
domain_scores[domain] = score
if domain_scores:
# Get top domain
top_domain = max(domain_scores.items(), key=lambda x: x[1])[0]
domain_controls = [c for c in all_controls if c.domain and c.domain.value.lower() == top_domain]
if domain_controls:
ctrl = domain_controls[0]
# Create mapping
mapping = ControlMappingDB(
requirement_id=req.id,
control_id=ctrl.id,
coverage_level="partial",
notes=f"Auto-mapped (domain: {top_domain})",
)
db.add(mapping)
results.append({
"requirement_id": req.id,
"requirement_title": req.title[:50],
"control_id": ctrl.control_id,
"domain": top_domain,
})
except Exception as e:
logger.warning(f"Failed to map requirement {req.id}: {e}")
db.commit()
return {
"processed": len(to_process),
"mapped": len(results),
"remaining": len(requirements_without_mappings) - len(to_process),
"mappings": results[:20], # Only return first 20 for readability
}
# ============================================================================
# LLM Provider Switch Endpoints (Runtime Configuration)
# ============================================================================
class ProviderSwitchRequest(BaseModel):
"""Request to switch LLM provider at runtime."""
provider: str # "anthropic" or "self_hosted"
model: Optional[str] = None # Optional: override model
url: Optional[str] = None # Optional: override URL for self-hosted
class ProviderSwitchResponse(BaseModel):
"""Response after switching LLM provider."""
success: bool
previous_provider: str
new_provider: str
model: str
url: Optional[str] = None
message: str
@router.post("/ai/switch-provider", response_model=ProviderSwitchResponse)
async def switch_llm_provider(request: ProviderSwitchRequest):
"""
Switch the LLM provider at runtime between Anthropic API and Self-Hosted (Ollama).
This allows developers to toggle between:
- **anthropic**: Cloud-based Claude API (kostenpflichtig, Daten gehen zu Anthropic)
- **self_hosted**: Self-hosted Ollama on Mac Mini (kostenlos, DSGVO-konform, Daten bleiben intern)
Note: This change is temporary for the current container session.
For permanent changes, modify the docker-compose.yml environment variables.
"""
from ..services.llm_provider import (
reset_shared_provider,
get_shared_provider,
LLMProviderType,
)
try:
# Get current provider info before switch
old_provider = get_shared_provider()
old_provider_name = old_provider.provider_name
# Map string to enum
provider_map = {
"anthropic": LLMProviderType.ANTHROPIC,
"self_hosted": LLMProviderType.SELF_HOSTED,
"mock": LLMProviderType.MOCK,
}
if request.provider.lower() not in provider_map:
raise HTTPException(
status_code=400,
detail=f"Invalid provider: {request.provider}. Use 'anthropic' or 'self_hosted'"
)
# Update environment variables for the new provider
os.environ["COMPLIANCE_LLM_PROVIDER"] = request.provider.lower()
if request.provider.lower() == "self_hosted":
if request.url:
os.environ["SELF_HOSTED_LLM_URL"] = request.url
if request.model:
os.environ["SELF_HOSTED_LLM_MODEL"] = request.model
else:
# Default to llama3.1:70b for compliance tasks
os.environ["SELF_HOSTED_LLM_MODEL"] = os.environ.get(
"SELF_HOSTED_LLM_MODEL", "llama3.1:70b"
)
elif request.provider.lower() == "anthropic":
if request.model:
os.environ["ANTHROPIC_MODEL"] = request.model
# Reset the shared provider to pick up new config
reset_shared_provider()
# Get the new provider
new_provider = get_shared_provider()
return ProviderSwitchResponse(
success=True,
previous_provider=old_provider_name,
new_provider=new_provider.provider_name,
model=new_provider.config.model,
url=new_provider.config.base_url if hasattr(new_provider.config, 'base_url') else None,
message=f"Successfully switched from {old_provider_name} to {new_provider.provider_name}",
)
except Exception as e:
logger.error(f"Failed to switch LLM provider: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/ai/providers")
async def list_available_providers():
"""
List available LLM providers with their descriptions.
This helps developers understand which provider to use for which scenario.
"""
return {
"providers": [
{
"id": "anthropic",
"name": "Anthropic Claude API",
"description_de": "Cloud-basierte KI von Anthropic. Kostenpflichtig (API-Credits). Daten werden zur Verarbeitung an Anthropic gesendet.",
"description_en": "Cloud-based AI from Anthropic. Paid service (API credits). Data is sent to Anthropic for processing.",
"gdpr_compliant": False,
"data_location": "Anthropic Cloud (USA)",
"cost": "Kostenpflichtig pro Token",
"use_case": "Produktiv, wenn hohe Qualitaet benoetigt wird",
"models": ["claude-sonnet-4-20250514", "claude-3-5-sonnet-20241022"],
},
{
"id": "self_hosted",
"name": "Self-Hosted Ollama",
"description_de": "Lokales LLM auf dem Mac Mini M4 Pro (64GB RAM). Kostenlos. Alle Daten bleiben intern - DSGVO-konform!",
"description_en": "Local LLM on Mac Mini M4 Pro (64GB RAM). Free. All data stays internal - GDPR compliant!",
"gdpr_compliant": True,
"data_location": "Lokal auf Mac Mini",
"cost": "Kostenlos (Hardware bereits vorhanden)",
"use_case": "Entwicklung, Testing, DSGVO-sensitive Dokumente",
"models": ["llama3.1:70b", "llama3.2-vision", "mixtral:8x7b"],
},
],
"current_provider": None, # Will be filled by get_ai_status
"note_de": "Umschaltung erfolgt sofort, aber nur fuer diese Container-Session. Fuer permanente Aenderung docker-compose.yml anpassen.",
"note_en": "Switch takes effect immediately but only for this container session. For permanent change, modify docker-compose.yml.",
}

View File

@@ -664,114 +664,6 @@ class ModuleComplianceOverview(BaseModel):
regulations_coverage: Dict[str, int] # regulation_code -> module_count
# ============================================================================
# AI Assistant Schemas (Sprint 4)
# ============================================================================
class AIInterpretationRequest(BaseModel):
"""Request for AI interpretation of a requirement."""
requirement_id: str
force_refresh: bool = False
class AIInterpretationResponse(BaseModel):
"""AI-generated interpretation of a requirement."""
requirement_id: str
summary: str
applicability: str
technical_measures: List[str]
affected_modules: List[str]
risk_level: str
implementation_hints: List[str]
confidence_score: float
error: Optional[str] = None
class AIBatchInterpretationRequest(BaseModel):
"""Request for batch interpretation of requirements."""
requirement_ids: List[str]
regulation_code: Optional[str] = None
rate_limit: float = 1.0
class AIBatchInterpretationResponse(BaseModel):
"""Response for batch interpretation."""
total: int
processed: int
interpretations: List[AIInterpretationResponse]
class AIControlSuggestionRequest(BaseModel):
"""Request for AI control suggestions."""
requirement_id: str
class AIControlSuggestionItem(BaseModel):
"""A single control suggestion from AI."""
control_id: str
domain: str
title: str
description: str
pass_criteria: str
implementation_guidance: str
is_automated: bool
automation_tool: Optional[str] = None
priority: str
confidence_score: float
class AIControlSuggestionResponse(BaseModel):
"""Response with AI control suggestions."""
requirement_id: str
suggestions: List[AIControlSuggestionItem]
class AIRiskAssessmentRequest(BaseModel):
"""Request for AI risk assessment of a module."""
module_id: str
class AIRiskFactor(BaseModel):
"""A risk factor in the assessment."""
factor: str
severity: str
likelihood: str
class AIRiskAssessmentResponse(BaseModel):
"""AI-generated risk assessment."""
module_name: str
overall_risk: str
risk_factors: List[AIRiskFactor]
recommendations: List[str]
compliance_gaps: List[str]
confidence_score: float
class AIGapAnalysisRequest(BaseModel):
"""Request for gap analysis."""
requirement_id: str
class AIGapAnalysisResponse(BaseModel):
"""AI-generated gap analysis."""
requirement_id: str
requirement_title: str
coverage_level: str
existing_controls: List[str]
missing_coverage: List[str]
suggested_actions: List[str]
class AIStatusResponse(BaseModel):
"""Status of the AI provider."""
provider: str
model: str
is_available: bool
is_mock: bool
error: Optional[str] = None
# ============================================================================
# Executive Dashboard Schemas (Phase 3 - Sprint 1)
# ============================================================================