feat: 7 Analyse-Module auf 100% — Backend-Endpoints, DB-Model, Frontend-Persistenz
Alle 7 Analyse-Module (Requirements → Report) von ~80% auf 100% gebracht: - Modul 1 (Requirements): POST/DELETE Endpoints + Frontend-Anbindung + Rollback - Modul 2 (Controls): Evidence-Linking UI mit Validity-Badge - Modul 3 (Evidence): Pagination (Frontend + Backend) - Modul 4 (Risk Matrix): Mitigation-UI, Residual Risk, Status-Workflow - Modul 5 (AI Act): AISystemDB Model, 6 CRUD-Endpoints, Backend-Persistenz - Modul 6 (Audit Checklist): PDF-Download + Session-History - Modul 7 (Audit Report): Detail-Seite mit Checklist Sign-Off + Navigation Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -30,7 +30,7 @@ from ..db import (
|
||||
RequirementRepository,
|
||||
ControlRepository,
|
||||
)
|
||||
from ..db.models import RegulationDB, RequirementDB
|
||||
from ..db.models import RegulationDB, RequirementDB, AISystemDB, AIClassificationEnum, AISystemStatusEnum
|
||||
from .schemas import (
|
||||
# AI Assistant schemas
|
||||
AIInterpretationRequest, AIInterpretationResponse,
|
||||
@@ -39,6 +39,8 @@ from .schemas import (
|
||||
AIRiskAssessmentRequest, AIRiskAssessmentResponse, AIRiskFactor,
|
||||
AIGapAnalysisRequest, AIGapAnalysisResponse,
|
||||
AIStatusResponse,
|
||||
# AI System schemas
|
||||
AISystemCreate, AISystemUpdate, AISystemResponse, AISystemListResponse,
|
||||
# PDF extraction schemas
|
||||
BSIAspectResponse, PDFExtractionResponse,
|
||||
)
|
||||
@@ -47,6 +49,361 @@ logger = logging.getLogger(__name__)
|
||||
router = APIRouter(tags=["compliance-ai"])
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# AI System CRUD Endpoints (AI Act Compliance)
|
||||
# ============================================================================
|
||||
|
||||
@router.get("/ai/systems", response_model=AISystemListResponse)
|
||||
async def list_ai_systems(
|
||||
classification: Optional[str] = Query(None, description="Filter by classification"),
|
||||
status: Optional[str] = Query(None, description="Filter by status"),
|
||||
sector: Optional[str] = Query(None, description="Filter by sector"),
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""List all registered AI systems."""
|
||||
import uuid as _uuid
|
||||
query = db.query(AISystemDB)
|
||||
|
||||
if classification:
|
||||
try:
|
||||
cls_enum = AIClassificationEnum(classification)
|
||||
query = query.filter(AISystemDB.classification == cls_enum)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
if status:
|
||||
try:
|
||||
status_enum = AISystemStatusEnum(status)
|
||||
query = query.filter(AISystemDB.status == status_enum)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
if sector:
|
||||
query = query.filter(AISystemDB.sector.ilike(f"%{sector}%"))
|
||||
|
||||
systems = query.order_by(AISystemDB.created_at.desc()).all()
|
||||
|
||||
results = [
|
||||
AISystemResponse(
|
||||
id=s.id,
|
||||
name=s.name,
|
||||
description=s.description,
|
||||
purpose=s.purpose,
|
||||
sector=s.sector,
|
||||
classification=s.classification.value if s.classification else "unclassified",
|
||||
status=s.status.value if s.status else "draft",
|
||||
obligations=s.obligations or [],
|
||||
assessment_date=s.assessment_date,
|
||||
assessment_result=s.assessment_result,
|
||||
risk_factors=s.risk_factors,
|
||||
recommendations=s.recommendations,
|
||||
created_at=s.created_at,
|
||||
updated_at=s.updated_at,
|
||||
)
|
||||
for s in systems
|
||||
]
|
||||
|
||||
return AISystemListResponse(systems=results, total=len(results))
|
||||
|
||||
|
||||
@router.post("/ai/systems", response_model=AISystemResponse)
|
||||
async def create_ai_system(
|
||||
data: AISystemCreate,
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""Register a new AI system."""
|
||||
import uuid as _uuid
|
||||
from datetime import datetime
|
||||
|
||||
try:
|
||||
cls_enum = AIClassificationEnum(data.classification) if data.classification else AIClassificationEnum.UNCLASSIFIED
|
||||
except ValueError:
|
||||
cls_enum = AIClassificationEnum.UNCLASSIFIED
|
||||
|
||||
try:
|
||||
status_enum = AISystemStatusEnum(data.status) if data.status else AISystemStatusEnum.DRAFT
|
||||
except ValueError:
|
||||
status_enum = AISystemStatusEnum.DRAFT
|
||||
|
||||
system = AISystemDB(
|
||||
id=str(_uuid.uuid4()),
|
||||
name=data.name,
|
||||
description=data.description,
|
||||
purpose=data.purpose,
|
||||
sector=data.sector,
|
||||
classification=cls_enum,
|
||||
status=status_enum,
|
||||
obligations=data.obligations or [],
|
||||
)
|
||||
db.add(system)
|
||||
db.commit()
|
||||
db.refresh(system)
|
||||
|
||||
return AISystemResponse(
|
||||
id=system.id,
|
||||
name=system.name,
|
||||
description=system.description,
|
||||
purpose=system.purpose,
|
||||
sector=system.sector,
|
||||
classification=system.classification.value if system.classification else "unclassified",
|
||||
status=system.status.value if system.status else "draft",
|
||||
obligations=system.obligations or [],
|
||||
assessment_date=system.assessment_date,
|
||||
assessment_result=system.assessment_result,
|
||||
risk_factors=system.risk_factors,
|
||||
recommendations=system.recommendations,
|
||||
created_at=system.created_at,
|
||||
updated_at=system.updated_at,
|
||||
)
|
||||
|
||||
|
||||
@router.get("/ai/systems/{system_id}", response_model=AISystemResponse)
|
||||
async def get_ai_system(system_id: str, db: Session = Depends(get_db)):
|
||||
"""Get a specific AI system by ID."""
|
||||
system = db.query(AISystemDB).filter(AISystemDB.id == system_id).first()
|
||||
if not system:
|
||||
raise HTTPException(status_code=404, detail=f"AI System {system_id} not found")
|
||||
|
||||
return AISystemResponse(
|
||||
id=system.id,
|
||||
name=system.name,
|
||||
description=system.description,
|
||||
purpose=system.purpose,
|
||||
sector=system.sector,
|
||||
classification=system.classification.value if system.classification else "unclassified",
|
||||
status=system.status.value if system.status else "draft",
|
||||
obligations=system.obligations or [],
|
||||
assessment_date=system.assessment_date,
|
||||
assessment_result=system.assessment_result,
|
||||
risk_factors=system.risk_factors,
|
||||
recommendations=system.recommendations,
|
||||
created_at=system.created_at,
|
||||
updated_at=system.updated_at,
|
||||
)
|
||||
|
||||
|
||||
@router.put("/ai/systems/{system_id}", response_model=AISystemResponse)
|
||||
async def update_ai_system(
|
||||
system_id: str,
|
||||
data: AISystemUpdate,
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""Update an AI system."""
|
||||
from datetime import datetime
|
||||
|
||||
system = db.query(AISystemDB).filter(AISystemDB.id == system_id).first()
|
||||
if not system:
|
||||
raise HTTPException(status_code=404, detail=f"AI System {system_id} not found")
|
||||
|
||||
update_data = data.model_dump(exclude_unset=True)
|
||||
|
||||
if "classification" in update_data:
|
||||
try:
|
||||
update_data["classification"] = AIClassificationEnum(update_data["classification"])
|
||||
except ValueError:
|
||||
raise HTTPException(status_code=400, detail=f"Invalid classification: {update_data['classification']}")
|
||||
|
||||
if "status" in update_data:
|
||||
try:
|
||||
update_data["status"] = AISystemStatusEnum(update_data["status"])
|
||||
except ValueError:
|
||||
raise HTTPException(status_code=400, detail=f"Invalid status: {update_data['status']}")
|
||||
|
||||
for key, value in update_data.items():
|
||||
if hasattr(system, key):
|
||||
setattr(system, key, value)
|
||||
|
||||
system.updated_at = datetime.utcnow()
|
||||
db.commit()
|
||||
db.refresh(system)
|
||||
|
||||
return AISystemResponse(
|
||||
id=system.id,
|
||||
name=system.name,
|
||||
description=system.description,
|
||||
purpose=system.purpose,
|
||||
sector=system.sector,
|
||||
classification=system.classification.value if system.classification else "unclassified",
|
||||
status=system.status.value if system.status else "draft",
|
||||
obligations=system.obligations or [],
|
||||
assessment_date=system.assessment_date,
|
||||
assessment_result=system.assessment_result,
|
||||
risk_factors=system.risk_factors,
|
||||
recommendations=system.recommendations,
|
||||
created_at=system.created_at,
|
||||
updated_at=system.updated_at,
|
||||
)
|
||||
|
||||
|
||||
@router.delete("/ai/systems/{system_id}")
|
||||
async def delete_ai_system(system_id: str, db: Session = Depends(get_db)):
|
||||
"""Delete an AI system."""
|
||||
system = db.query(AISystemDB).filter(AISystemDB.id == system_id).first()
|
||||
if not system:
|
||||
raise HTTPException(status_code=404, detail=f"AI System {system_id} not found")
|
||||
|
||||
db.delete(system)
|
||||
db.commit()
|
||||
return {"success": True, "message": "AI System deleted"}
|
||||
|
||||
|
||||
@router.post("/ai/systems/{system_id}/assess", response_model=AISystemResponse)
|
||||
async def assess_ai_system(
|
||||
system_id: str,
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""Run AI Act risk assessment for an AI system."""
|
||||
from datetime import datetime
|
||||
|
||||
system = db.query(AISystemDB).filter(AISystemDB.id == system_id).first()
|
||||
if not system:
|
||||
raise HTTPException(status_code=404, detail=f"AI System {system_id} not found")
|
||||
|
||||
# Try AI-based assessment
|
||||
assessment_result = None
|
||||
try:
|
||||
from ..services.ai_compliance_assistant import get_ai_assistant
|
||||
assistant = get_ai_assistant()
|
||||
result = await assistant.assess_module_risk(
|
||||
module_name=system.name,
|
||||
service_type="ai_system",
|
||||
description=system.description or "",
|
||||
processes_pii=True,
|
||||
ai_components=True,
|
||||
criticality="high",
|
||||
data_categories=[],
|
||||
regulations=[{"code": "AI-ACT", "relevance": "high"}],
|
||||
)
|
||||
assessment_result = {
|
||||
"overall_risk": result.overall_risk,
|
||||
"risk_factors": result.risk_factors,
|
||||
"recommendations": result.recommendations,
|
||||
"compliance_gaps": result.compliance_gaps,
|
||||
"confidence_score": result.confidence_score,
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning(f"AI assessment failed for {system_id}, using rule-based: {e}")
|
||||
# Rule-based fallback
|
||||
assessment_result = _rule_based_assessment(system)
|
||||
|
||||
# Update system with assessment results
|
||||
classification = _derive_classification(assessment_result)
|
||||
try:
|
||||
system.classification = AIClassificationEnum(classification)
|
||||
except ValueError:
|
||||
system.classification = AIClassificationEnum.UNCLASSIFIED
|
||||
|
||||
system.assessment_date = datetime.utcnow()
|
||||
system.assessment_result = assessment_result
|
||||
system.obligations = _derive_obligations(classification)
|
||||
system.risk_factors = assessment_result.get("risk_factors", [])
|
||||
system.recommendations = assessment_result.get("recommendations", [])
|
||||
system.status = AISystemStatusEnum.CLASSIFIED
|
||||
|
||||
db.commit()
|
||||
db.refresh(system)
|
||||
|
||||
return AISystemResponse(
|
||||
id=system.id,
|
||||
name=system.name,
|
||||
description=system.description,
|
||||
purpose=system.purpose,
|
||||
sector=system.sector,
|
||||
classification=system.classification.value if system.classification else "unclassified",
|
||||
status=system.status.value if system.status else "draft",
|
||||
obligations=system.obligations or [],
|
||||
assessment_date=system.assessment_date,
|
||||
assessment_result=system.assessment_result,
|
||||
risk_factors=system.risk_factors,
|
||||
recommendations=system.recommendations,
|
||||
created_at=system.created_at,
|
||||
updated_at=system.updated_at,
|
||||
)
|
||||
|
||||
|
||||
def _rule_based_assessment(system: AISystemDB) -> dict:
|
||||
"""Simple rule-based AI Act classification when AI service is unavailable."""
|
||||
desc = (system.description or "").lower() + " " + (system.purpose or "").lower()
|
||||
sector = (system.sector or "").lower()
|
||||
|
||||
risk_factors = []
|
||||
risk_score = 0
|
||||
|
||||
# Check for prohibited use cases
|
||||
prohibited_keywords = ["social scoring", "biometric surveillance", "emotion recognition", "subliminal manipulation"]
|
||||
for kw in prohibited_keywords:
|
||||
if kw in desc:
|
||||
risk_factors.append({"factor": f"Prohibited use case: {kw}", "severity": "critical", "likelihood": "high"})
|
||||
risk_score += 10
|
||||
|
||||
# Check for high-risk indicators
|
||||
high_risk_keywords = ["education", "employment", "credit scoring", "law enforcement", "migration", "critical infrastructure", "medical", "bildung", "gesundheit"]
|
||||
for kw in high_risk_keywords:
|
||||
if kw in desc or kw in sector:
|
||||
risk_factors.append({"factor": f"High-risk sector: {kw}", "severity": "high", "likelihood": "medium"})
|
||||
risk_score += 5
|
||||
|
||||
# Check for limited-risk indicators
|
||||
limited_keywords = ["chatbot", "deepfake", "emotion", "biometric"]
|
||||
for kw in limited_keywords:
|
||||
if kw in desc:
|
||||
risk_factors.append({"factor": f"Transparency requirement: {kw}", "severity": "medium", "likelihood": "high"})
|
||||
risk_score += 3
|
||||
|
||||
return {
|
||||
"overall_risk": "critical" if risk_score >= 10 else "high" if risk_score >= 5 else "medium" if risk_score >= 3 else "low",
|
||||
"risk_factors": risk_factors,
|
||||
"recommendations": [
|
||||
"Dokumentation des AI-Systems vervollstaendigen",
|
||||
"Risikomanagement-Framework implementieren",
|
||||
"Transparenzpflichten pruefen",
|
||||
],
|
||||
"compliance_gaps": [],
|
||||
"confidence_score": 0.6,
|
||||
"risk_score": risk_score,
|
||||
}
|
||||
|
||||
|
||||
def _derive_classification(assessment: dict) -> str:
|
||||
"""Derive AI Act classification from assessment result."""
|
||||
risk = assessment.get("overall_risk", "medium")
|
||||
score = assessment.get("risk_score", 0)
|
||||
|
||||
if score >= 10:
|
||||
return "prohibited"
|
||||
elif risk in ("critical", "high") or score >= 5:
|
||||
return "high-risk"
|
||||
elif risk == "medium" or score >= 3:
|
||||
return "limited-risk"
|
||||
else:
|
||||
return "minimal-risk"
|
||||
|
||||
|
||||
def _derive_obligations(classification: str) -> list:
|
||||
"""Derive AI Act obligations based on classification."""
|
||||
obligations_map = {
|
||||
"prohibited": ["Einsatz verboten (Art. 5 AI Act)"],
|
||||
"high-risk": [
|
||||
"Risikomanagementsystem (Art. 9)",
|
||||
"Daten-Governance (Art. 10)",
|
||||
"Technische Dokumentation (Art. 11)",
|
||||
"Aufzeichnungspflicht (Art. 12)",
|
||||
"Transparenz (Art. 13)",
|
||||
"Menschliche Aufsicht (Art. 14)",
|
||||
"Genauigkeit & Robustheit (Art. 15)",
|
||||
"Konformitaetsbewertung (Art. 43)",
|
||||
],
|
||||
"limited-risk": [
|
||||
"Transparenzpflicht (Art. 52)",
|
||||
"Kennzeichnung als KI-System",
|
||||
],
|
||||
"minimal-risk": [
|
||||
"Freiwillige Verhaltenskodizes (Art. 69)",
|
||||
],
|
||||
}
|
||||
return obligations_map.get(classification, [])
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# AI Assistant Endpoints (Sprint 4)
|
||||
# ============================================================================
|
||||
|
||||
@@ -46,9 +46,11 @@ async def list_evidence(
|
||||
control_id: Optional[str] = None,
|
||||
evidence_type: Optional[str] = None,
|
||||
status: Optional[str] = None,
|
||||
page: Optional[int] = Query(None, ge=1, description="Page number (1-based)"),
|
||||
limit: Optional[int] = Query(None, ge=1, le=500, description="Items per page"),
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""List evidence with optional filters."""
|
||||
"""List evidence with optional filters and pagination."""
|
||||
repo = EvidenceRepository(db)
|
||||
|
||||
if control_id:
|
||||
@@ -71,6 +73,13 @@ async def list_evidence(
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
total = len(evidence)
|
||||
|
||||
# Apply pagination if requested
|
||||
if page is not None and limit is not None:
|
||||
offset = (page - 1) * limit
|
||||
evidence = evidence[offset:offset + limit]
|
||||
|
||||
results = [
|
||||
EvidenceResponse(
|
||||
id=e.id,
|
||||
@@ -95,7 +104,7 @@ async def list_evidence(
|
||||
for e in evidence
|
||||
]
|
||||
|
||||
return EvidenceListResponse(evidence=results, total=len(results))
|
||||
return EvidenceListResponse(evidence=results, total=total)
|
||||
|
||||
|
||||
@router.post("/evidence", response_model=EvidenceResponse)
|
||||
|
||||
@@ -324,6 +324,59 @@ async def list_requirements_paginated(
|
||||
)
|
||||
|
||||
|
||||
@router.post("/requirements", response_model=RequirementResponse)
|
||||
async def create_requirement(
|
||||
data: RequirementCreate,
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""Create a new requirement."""
|
||||
# Verify regulation exists
|
||||
reg_repo = RegulationRepository(db)
|
||||
regulation = reg_repo.get_by_id(data.regulation_id)
|
||||
if not regulation:
|
||||
raise HTTPException(status_code=404, detail=f"Regulation {data.regulation_id} not found")
|
||||
|
||||
req_repo = RequirementRepository(db)
|
||||
requirement = req_repo.create(
|
||||
regulation_id=data.regulation_id,
|
||||
article=data.article,
|
||||
title=data.title,
|
||||
paragraph=data.paragraph,
|
||||
description=data.description,
|
||||
requirement_text=data.requirement_text,
|
||||
breakpilot_interpretation=data.breakpilot_interpretation,
|
||||
is_applicable=data.is_applicable,
|
||||
priority=data.priority,
|
||||
)
|
||||
|
||||
return RequirementResponse(
|
||||
id=requirement.id,
|
||||
regulation_id=requirement.regulation_id,
|
||||
regulation_code=regulation.code,
|
||||
article=requirement.article,
|
||||
paragraph=requirement.paragraph,
|
||||
title=requirement.title,
|
||||
description=requirement.description,
|
||||
requirement_text=requirement.requirement_text,
|
||||
breakpilot_interpretation=requirement.breakpilot_interpretation,
|
||||
is_applicable=requirement.is_applicable,
|
||||
applicability_reason=requirement.applicability_reason,
|
||||
priority=requirement.priority,
|
||||
created_at=requirement.created_at,
|
||||
updated_at=requirement.updated_at,
|
||||
)
|
||||
|
||||
|
||||
@router.delete("/requirements/{requirement_id}")
|
||||
async def delete_requirement(requirement_id: str, db: Session = Depends(get_db)):
|
||||
"""Delete a requirement by ID."""
|
||||
req_repo = RequirementRepository(db)
|
||||
deleted = req_repo.delete(requirement_id)
|
||||
if not deleted:
|
||||
raise HTTPException(status_code=404, detail=f"Requirement {requirement_id} not found")
|
||||
return {"success": True, "message": "Requirement deleted"}
|
||||
|
||||
|
||||
@router.put("/requirements/{requirement_id}")
|
||||
async def update_requirement(requirement_id: str, updates: dict, db: Session = Depends(get_db)):
|
||||
"""Update a requirement with implementation/audit details."""
|
||||
@@ -818,7 +871,7 @@ async def init_tables(db: Session = Depends(get_db)):
|
||||
from classroom_engine.database import engine
|
||||
from ..db.models import (
|
||||
RegulationDB, RequirementDB, ControlDB, ControlMappingDB,
|
||||
EvidenceDB, RiskDB, AuditExportDB
|
||||
EvidenceDB, RiskDB, AuditExportDB, AISystemDB
|
||||
)
|
||||
|
||||
try:
|
||||
@@ -830,6 +883,7 @@ async def init_tables(db: Session = Depends(get_db)):
|
||||
EvidenceDB.__table__.create(engine, checkfirst=True)
|
||||
RiskDB.__table__.create(engine, checkfirst=True)
|
||||
AuditExportDB.__table__.create(engine, checkfirst=True)
|
||||
AISystemDB.__table__.create(engine, checkfirst=True)
|
||||
|
||||
return {"success": True, "message": "Tables created successfully"}
|
||||
except Exception as e:
|
||||
|
||||
@@ -385,6 +385,52 @@ class RiskMatrixResponse(BaseModel):
|
||||
risks: List[RiskResponse]
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# AI System Schemas (AI Act Compliance)
|
||||
# ============================================================================
|
||||
|
||||
class AISystemBase(BaseModel):
|
||||
name: str
|
||||
description: Optional[str] = None
|
||||
purpose: Optional[str] = None
|
||||
sector: Optional[str] = None
|
||||
classification: str = "unclassified"
|
||||
status: str = "draft"
|
||||
obligations: Optional[List[str]] = None
|
||||
|
||||
|
||||
class AISystemCreate(AISystemBase):
|
||||
pass
|
||||
|
||||
|
||||
class AISystemUpdate(BaseModel):
|
||||
name: Optional[str] = None
|
||||
description: Optional[str] = None
|
||||
purpose: Optional[str] = None
|
||||
sector: Optional[str] = None
|
||||
classification: Optional[str] = None
|
||||
status: Optional[str] = None
|
||||
obligations: Optional[List[str]] = None
|
||||
|
||||
|
||||
class AISystemResponse(AISystemBase):
|
||||
id: str
|
||||
assessment_date: Optional[datetime] = None
|
||||
assessment_result: Optional[Dict[str, Any]] = None
|
||||
risk_factors: Optional[List[Dict[str, Any]]] = None
|
||||
recommendations: Optional[List[str]] = None
|
||||
created_at: datetime
|
||||
updated_at: datetime
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
|
||||
|
||||
class AISystemListResponse(BaseModel):
|
||||
systems: List[AISystemResponse]
|
||||
total: int
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Dashboard & Export Schemas
|
||||
# ============================================================================
|
||||
|
||||
Reference in New Issue
Block a user