diff --git a/admin-compliance/app/sdk/compliance-hub/page.tsx b/admin-compliance/app/sdk/compliance-hub/page.tsx index e13f5ed..bdc5174 100644 --- a/admin-compliance/app/sdk/compliance-hub/page.tsx +++ b/admin-compliance/app/sdk/compliance-hub/page.tsx @@ -83,10 +83,10 @@ export default function ComplianceHubPage() { setError(null) try { const [dashboardRes, regulationsRes, mappingsRes, findingsRes] = await Promise.all([ - fetch('/api/admin/compliance/dashboard'), - fetch('/api/admin/compliance/regulations'), - fetch('/api/admin/compliance/mappings'), - fetch('/api/admin/compliance/isms/findings/summary'), + fetch('/api/sdk/v1/compliance/dashboard'), + fetch('/api/sdk/v1/compliance/regulations'), + fetch('/api/sdk/v1/compliance/mappings'), + fetch('/api/sdk/v1/isms/findings?status=open'), ]) if (dashboardRes.ok) { @@ -115,7 +115,7 @@ export default function ComplianceHubPage() { const seedDatabase = async () => { setSeeding(true) try { - const res = await fetch('/api/admin/compliance/seed', { + const res = await fetch('/api/sdk/v1/compliance/seed', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ force: false }), diff --git a/admin-compliance/app/sdk/isms/page.tsx b/admin-compliance/app/sdk/isms/page.tsx index a7e946c..7b80ec5 100644 --- a/admin-compliance/app/sdk/isms/page.tsx +++ b/admin-compliance/app/sdk/isms/page.tsx @@ -1,7 +1,6 @@ 'use client' import React, { useState, useEffect, useCallback } from 'react' -import { useSDK } from '@/lib/sdk' // ============================================================================= // TYPES @@ -1220,12 +1219,6 @@ const TABS: { id: TabId; label: string }[] = [ export default function ISMSPage() { const [tab, setTab] = useState('overview') - const { markStepCompleted } = useSDK() - - useEffect(() => { - markStepCompleted('isms') - }, [markStepCompleted]) - return (
diff --git a/backend-compliance/compliance/api/ai_routes.py b/backend-compliance/compliance/api/ai_routes.py index 777c844..7be91f0 100644 --- a/backend-compliance/compliance/api/ai_routes.py +++ b/backend-compliance/compliance/api/ai_routes.py @@ -1,48 +1,23 @@ """ -FastAPI routes for AI Compliance Assistant. +FastAPI routes for AI Act Compliance — AI System CRUD. Endpoints: -- /ai/status: Get AI provider status -- /ai/interpret: Interpret a requirement -- /ai/suggest-controls: Get AI-suggested controls -- /ai/assess-risk: Assess module risk -- /ai/gap-analysis: Analyze coverage gaps -- /ai/batch-interpret: Batch interpret requirements -- /ai/auto-map-controls: Auto-map controls to requirements -- /ai/batch-map-controls: Batch map controls -- /ai/switch-provider: Switch LLM provider -- /ai/providers: List available providers -- /pdf/*: PDF extraction endpoints +- /ai/systems: List/Create AI systems +- /ai/systems/{id}: Get/Update/Delete AI system +- /ai/systems/{id}/assess: Run AI Act risk assessment """ import logging -import os -from typing import Optional, List +from typing import Optional -from pydantic import BaseModel -from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks +from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.orm import Session from classroom_engine.database import get_db -from ..db import ( - RegulationRepository, - RequirementRepository, - ControlRepository, -) -from ..db.models import RegulationDB, RequirementDB, AISystemDB, AIClassificationEnum, AISystemStatusEnum +from ..db.models import AISystemDB, AIClassificationEnum, AISystemStatusEnum from .schemas import ( - # AI Assistant schemas - AIInterpretationRequest, AIInterpretationResponse, - AIBatchInterpretationRequest, AIBatchInterpretationResponse, - AIControlSuggestionRequest, AIControlSuggestionResponse, AIControlSuggestionItem, - AIRiskAssessmentRequest, AIRiskAssessmentResponse, AIRiskFactor, - AIGapAnalysisRequest, AIGapAnalysisResponse, - AIStatusResponse, - # AI System schemas AISystemCreate, AISystemUpdate, AISystemResponse, AISystemListResponse, - # PDF extraction schemas - BSIAspectResponse, PDFExtractionResponse, ) logger = logging.getLogger(__name__) @@ -402,865 +377,3 @@ def _derive_obligations(classification: str) -> list: ], } return obligations_map.get(classification, []) - - -# ============================================================================ -# AI Assistant Endpoints (Sprint 4) -# ============================================================================ - -@router.get("/ai/status", response_model=AIStatusResponse) -async def get_ai_status(): - """Get the status of the AI provider.""" - from ..services.llm_provider import get_shared_provider, LLMProviderType - - try: - provider = get_shared_provider() - return AIStatusResponse( - provider=provider.provider_name, - model=provider.config.model, - is_available=True, - is_mock=provider.provider_name == "mock", - error=None, - ) - except Exception as e: - return AIStatusResponse( - provider="unknown", - model="unknown", - is_available=False, - is_mock=True, - error=str(e), - ) - - -@router.post("/ai/interpret", response_model=AIInterpretationResponse) -async def interpret_requirement( - request: AIInterpretationRequest, - db: Session = Depends(get_db), -): - """Generate AI interpretation for a requirement.""" - from ..services.ai_compliance_assistant import get_ai_assistant - - # Get requirement from DB - req_repo = RequirementRepository(db) - requirement = req_repo.get_by_id(request.requirement_id) - - if not requirement: - raise HTTPException(status_code=404, detail=f"Requirement {request.requirement_id} not found") - - # Get regulation info - reg_repo = RegulationRepository(db) - regulation = reg_repo.get_by_id(requirement.regulation_id) - - try: - assistant = get_ai_assistant() - result = await assistant.interpret_requirement( - requirement_id=requirement.id, - article=requirement.article, - title=requirement.title, - requirement_text=requirement.requirement_text or requirement.description or "", - regulation_code=regulation.code if regulation else "UNKNOWN", - regulation_name=regulation.name if regulation else "Unknown Regulation", - ) - - return AIInterpretationResponse( - requirement_id=result.requirement_id, - summary=result.summary, - applicability=result.applicability, - technical_measures=result.technical_measures, - affected_modules=result.affected_modules, - risk_level=result.risk_level, - implementation_hints=result.implementation_hints, - confidence_score=result.confidence_score, - error=result.error, - ) - except Exception as e: - logger.error(f"AI interpretation failed: {e}") - raise HTTPException(status_code=500, detail=str(e)) - - -@router.post("/ai/suggest-controls", response_model=AIControlSuggestionResponse) -async def suggest_controls( - request: AIControlSuggestionRequest, - db: Session = Depends(get_db), -): - """Get AI-suggested controls for a requirement.""" - from ..services.ai_compliance_assistant import get_ai_assistant - - # Get requirement from DB - req_repo = RequirementRepository(db) - requirement = req_repo.get_by_id(request.requirement_id) - - if not requirement: - raise HTTPException(status_code=404, detail=f"Requirement {request.requirement_id} not found") - - # Get regulation info - reg_repo = RegulationRepository(db) - regulation = reg_repo.get_by_id(requirement.regulation_id) - - try: - assistant = get_ai_assistant() - suggestions = await assistant.suggest_controls( - requirement_title=requirement.title, - requirement_text=requirement.requirement_text or requirement.description or "", - regulation_name=regulation.name if regulation else "Unknown", - affected_modules=[], # Could be populated from previous interpretation - ) - - return AIControlSuggestionResponse( - requirement_id=request.requirement_id, - suggestions=[ - AIControlSuggestionItem( - control_id=s.control_id, - domain=s.domain, - title=s.title, - description=s.description, - pass_criteria=s.pass_criteria, - implementation_guidance=s.implementation_guidance, - is_automated=s.is_automated, - automation_tool=s.automation_tool, - priority=s.priority, - confidence_score=s.confidence_score, - ) - for s in suggestions - ], - ) - except Exception as e: - logger.error(f"AI control suggestion failed: {e}") - raise HTTPException(status_code=500, detail=str(e)) - - -@router.post("/ai/assess-risk", response_model=AIRiskAssessmentResponse) -async def assess_module_risk( - request: AIRiskAssessmentRequest, - db: Session = Depends(get_db), -): - """Get AI risk assessment for a service module.""" - from ..services.ai_compliance_assistant import get_ai_assistant - from ..db.repository import ServiceModuleRepository - - # Get module from DB - module_repo = ServiceModuleRepository(db) - module = module_repo.get_by_id(request.module_id) - - if not module: - module = module_repo.get_by_name(request.module_id) - - if not module: - raise HTTPException(status_code=404, detail=f"Module {request.module_id} not found") - - # Get regulations for this module - module_detail = module_repo.get_with_regulations(module.id) - regulations = [] - if module_detail and module_detail.get("regulation_mappings"): - for mapping in module_detail["regulation_mappings"]: - regulations.append({ - "code": mapping.get("regulation_code", ""), - "relevance": mapping.get("relevance_level", "medium"), - }) - - try: - assistant = get_ai_assistant() - result = await assistant.assess_module_risk( - module_name=module.name, - service_type=module.service_type.value if module.service_type else "unknown", - description=module.description or "", - processes_pii=module.processes_pii, - ai_components=module.ai_components, - criticality=module.criticality or "medium", - data_categories=module.data_categories or [], - regulations=regulations, - ) - - return AIRiskAssessmentResponse( - module_name=result.module_name, - overall_risk=result.overall_risk, - risk_factors=[ - AIRiskFactor( - factor=f.get("factor", ""), - severity=f.get("severity", "medium"), - likelihood=f.get("likelihood", "medium"), - ) - for f in result.risk_factors - ], - recommendations=result.recommendations, - compliance_gaps=result.compliance_gaps, - confidence_score=result.confidence_score, - ) - except Exception as e: - logger.error(f"AI risk assessment failed: {e}") - raise HTTPException(status_code=500, detail=str(e)) - - -@router.post("/ai/gap-analysis", response_model=AIGapAnalysisResponse) -async def analyze_gap( - request: AIGapAnalysisRequest, - db: Session = Depends(get_db), -): - """Analyze coverage gaps between a requirement and existing controls.""" - from ..services.ai_compliance_assistant import get_ai_assistant - - # Get requirement from DB - req_repo = RequirementRepository(db) - requirement = req_repo.get_by_id(request.requirement_id) - - if not requirement: - raise HTTPException(status_code=404, detail=f"Requirement {request.requirement_id} not found") - - # Get regulation info - reg_repo = RegulationRepository(db) - regulation = reg_repo.get_by_id(requirement.regulation_id) - - # Get existing control mappings from eager-loaded relationship - ctrl_repo = ControlRepository(db) - existing_controls = [] - - if requirement.control_mappings: - for mapping in requirement.control_mappings: - if mapping.control: - existing_controls.append({ - "control_id": mapping.control.control_id, - "title": mapping.control.title, - "status": mapping.control.status.value if mapping.control.status else "unknown", - }) - - try: - assistant = get_ai_assistant() - result = await assistant.analyze_gap( - requirement_id=requirement.id, - requirement_title=requirement.title, - requirement_text=requirement.requirement_text or requirement.description or "", - regulation_code=regulation.code if regulation else "UNKNOWN", - existing_controls=existing_controls, - ) - - return AIGapAnalysisResponse( - requirement_id=result.requirement_id, - requirement_title=result.requirement_title, - coverage_level=result.coverage_level, - existing_controls=result.existing_controls, - missing_coverage=result.missing_coverage, - suggested_actions=result.suggested_actions, - ) - except Exception as e: - logger.error(f"AI gap analysis failed: {e}") - raise HTTPException(status_code=500, detail=str(e)) - - -@router.post("/ai/batch-interpret", response_model=AIBatchInterpretationResponse) -async def batch_interpret_requirements( - request: AIBatchInterpretationRequest, - background_tasks: BackgroundTasks, - db: Session = Depends(get_db), -): - """ - Batch interpret multiple requirements. - - For large batches, this runs in the background and returns immediately. - """ - from ..services.ai_compliance_assistant import get_ai_assistant - - req_repo = RequirementRepository(db) - reg_repo = RegulationRepository(db) - - # Build list of requirements to process - requirements_to_process = [] - - if request.requirement_ids: - for req_id in request.requirement_ids: - req = req_repo.get_by_id(req_id) - if req: - reg = reg_repo.get_by_id(req.regulation_id) - requirements_to_process.append({ - "id": req.id, - "article": req.article, - "title": req.title, - "requirement_text": req.requirement_text or req.description or "", - "regulation_code": reg.code if reg else "UNKNOWN", - "regulation_name": reg.name if reg else "Unknown", - }) - - elif request.regulation_code: - # Get all requirements for a regulation - reg = reg_repo.get_by_code(request.regulation_code) - if reg: - reqs = req_repo.get_by_regulation(reg.id) - for req in reqs[:50]: # Limit to 50 for batch processing - requirements_to_process.append({ - "id": req.id, - "article": req.article, - "title": req.title, - "requirement_text": req.requirement_text or req.description or "", - "regulation_code": reg.code, - "regulation_name": reg.name, - }) - - if not requirements_to_process: - raise HTTPException(status_code=400, detail="No requirements found to process") - - # For small batches, process synchronously - if len(requirements_to_process) <= 5: - assistant = get_ai_assistant() - results = await assistant.batch_interpret_requirements( - requirements_to_process, - rate_limit=request.rate_limit, - ) - - return AIBatchInterpretationResponse( - total=len(requirements_to_process), - processed=len(results), - interpretations=[ - AIInterpretationResponse( - requirement_id=r.requirement_id, - summary=r.summary, - applicability=r.applicability, - technical_measures=r.technical_measures, - affected_modules=r.affected_modules, - risk_level=r.risk_level, - implementation_hints=r.implementation_hints, - confidence_score=r.confidence_score, - error=r.error, - ) - for r in results - ], - ) - - # For large batches, return immediately with info - # (Background processing would be added in a production version) - return AIBatchInterpretationResponse( - total=len(requirements_to_process), - processed=0, - interpretations=[], - ) - - -# ============================================================================ -# PDF Extraction (Sprint 2) -# ============================================================================ - -@router.get("/pdf/available") -async def list_available_pdfs(): - """List available PDF documents for extraction.""" - from pathlib import Path - - docs_path = Path("/app/docs") if Path("/app/docs").exists() else Path("docs") - - available = [] - bsi_files = list(docs_path.glob("BSI-TR-*.pdf")) - - for pdf_file in bsi_files: - available.append({ - "filename": pdf_file.name, - "path": str(pdf_file), - "size_bytes": pdf_file.stat().st_size, - "type": "bsi_standard", - }) - - return { - "available_pdfs": available, - "total": len(available), - } - - -@router.post("/pdf/extract/{doc_code}", response_model=PDFExtractionResponse) -async def extract_pdf_requirements( - doc_code: str, - save_to_db: bool = Query(True, description="Save extracted requirements to database"), - db: Session = Depends(get_db), -): - """ - Extract requirements/aspects from a BSI-TR PDF document. - - doc_code examples: - - BSI-TR-03161-1: General security requirements - - BSI-TR-03161-2: Web application security - - BSI-TR-03161-3: Backend/server security - """ - from pathlib import Path - from ..services.pdf_extractor import BSIPDFExtractor - from ..db.models import RegulationTypeEnum - - # Find the PDF file - docs_path = Path("/app/docs") if Path("/app/docs").exists() else Path("docs") - pdf_path = docs_path / f"{doc_code}.pdf" - - if not pdf_path.exists(): - raise HTTPException(status_code=404, detail=f"PDF not found: {doc_code}.pdf") - - # Extract aspects - extractor = BSIPDFExtractor() - try: - aspects = extractor.extract_from_file(str(pdf_path), source_name=doc_code) - except Exception as e: - logger.error(f"PDF extraction failed: {e}") - raise HTTPException(status_code=500, detail=f"PDF extraction failed: {str(e)}") - - # Find or create the regulation - reg_repo = RegulationRepository(db) - regulation = reg_repo.get_by_code(doc_code) - - if not regulation: - regulation = reg_repo.create( - code=doc_code, - name=f"BSI Technical Guideline {doc_code.split('-')[-1]}", - full_name=f"BSI Technische Richtlinie {doc_code}", - regulation_type=RegulationTypeEnum.BSI_STANDARD, - local_pdf_path=str(pdf_path), - ) - - # Save to database if requested - saved_count = 0 - if save_to_db and aspects: - req_repo = RequirementRepository(db) - for aspect in aspects: - # Check if requirement already exists - existing = db.query(RequirementDB).filter( - RequirementDB.regulation_id == regulation.id, - RequirementDB.article == aspect.aspect_id, - ).first() - - if not existing: - try: - req_repo.create( - regulation_id=regulation.id, - article=aspect.aspect_id, - title=aspect.title[:300] if aspect.title else "", - description=f"Category: {aspect.category.value}", - requirement_text=aspect.full_text[:4000] if aspect.full_text else "", - priority=1 if aspect.requirement_level.value == "MUSS" else ( - 2 if aspect.requirement_level.value == "SOLL" else 3 - ), - ) - saved_count += 1 - except Exception as e: - logger.warning(f"Failed to save aspect {aspect.aspect_id}: {e}") - - db.commit() - - # Convert aspects to response format - aspect_responses = [ - BSIAspectResponse( - aspect_id=a.aspect_id, - title=a.title, - full_text=a.full_text, - category=a.category.value, - page_number=a.page_number, - section=a.section, - requirement_level=a.requirement_level.value, - source_document=a.source_document, - ) - for a in aspects - ] - - return PDFExtractionResponse( - doc_code=doc_code, - total_extracted=len(aspects), - saved_to_db=saved_count, - aspects=aspect_responses, - ) - - -@router.get("/pdf/extraction-stats") -async def get_extraction_stats(db: Session = Depends(get_db)): - """Get statistics about extracted PDF requirements.""" - from sqlalchemy import func - - # Count requirements per BSI regulation - stats = ( - db.query( - RegulationDB.code, - func.count(RequirementDB.id).label('count') - ) - .join(RequirementDB, RequirementDB.regulation_id == RegulationDB.id) - .filter(RegulationDB.code.like('BSI-%')) - .group_by(RegulationDB.code) - .all() - ) - - return { - "bsi_requirements": {code: count for code, count in stats}, - "total_bsi_requirements": sum(count for _, count in stats), - } - - -# ============================================================================ -# Automatic Control Mapping -# ============================================================================ - -# Domain keyword mapping for automatic control assignment -DOMAIN_KEYWORDS = { - "priv": ["datenschutz", "dsgvo", "gdpr", "privacy", "personenbezogen", "einwilligung", - "consent", "betroffenenrechte", "verarbeitungsverzeichnis", "pii", "auftragsverarbeitung"], - "iam": ["authentifizierung", "auth", "login", "passwort", "password", "zugang", "access", - "berechtigung", "session", "token", "jwt", "oauth", "sso", "mfa", "2fa", "rbac"], - "crypto": ["verschlüsselung", "encryption", "kryptograph", "crypto", "hash", "schlüssel", - "key", "tls", "ssl", "zertifikat", "signatur", "aes", "rsa"], - "sdlc": ["entwicklung", "code", "software", "sast", "dast", "dependency", "vulnerable", - "cve", "security scan", "semgrep", "trivy", "sbom", "ci/cd", "build"], - "ops": ["monitoring", "logging", "log", "protokoll", "backup", "incident", "alert", - "availability", "uptime", "patch", "update", "deployment"], - "ai": ["künstliche intelligenz", "ki", "ai", "machine learning", "ml", "modell", - "training", "inference", "bias", "ai act", "hochrisiko"], - "cra": ["vulnerability", "schwachstelle", "disclosure", "patch", "eol", "end-of-life", - "supply chain", "sbom", "cve", "update"], - "gov": ["richtlinie", "policy", "governance", "verantwortlich", "raci", "dokumentation", - "prozess", "awareness", "schulung", "training"], - "aud": ["audit", "prüfung", "nachweis", "evidence", "traceability", "nachvollzieh", - "protokoll", "export", "report"], -} - - -@router.post("/ai/auto-map-controls") -async def auto_map_controls( - requirement_id: str = Query(..., description="Requirement UUID"), - save_to_db: bool = Query(True, description="Save mappings to database"), - use_ai: bool = Query(False, description="Use AI for better matching (slower)"), - db: Session = Depends(get_db), -): - """ - Automatically map controls to a requirement. - - Uses keyword matching by default (fast) or AI for better accuracy (slower). - """ - from ..db.models import ControlMappingDB - - # Get requirement - req_repo = RequirementRepository(db) - requirement = req_repo.get_by_id(requirement_id) - if not requirement: - raise HTTPException(status_code=404, detail=f"Requirement {requirement_id} not found") - - # Get all controls - ctrl_repo = ControlRepository(db) - all_controls = ctrl_repo.get_all() - - # Text to analyze - text_to_analyze = f"{requirement.title} {requirement.requirement_text or ''} {requirement.description or ''}" - text_lower = text_to_analyze.lower() - - matched_controls = [] - - if use_ai: - # Use AI for matching (slower but more accurate) - from ..services.ai_compliance_assistant import get_ai_assistant - assistant = get_ai_assistant() - - reg_repo = RegulationRepository(db) - regulation = reg_repo.get_by_id(requirement.regulation_id) - - try: - suggestions = await assistant.suggest_controls( - requirement_title=requirement.title, - requirement_text=requirement.requirement_text or "", - regulation_name=regulation.name if regulation else "Unknown", - affected_modules=[], - ) - - # Match suggestions to existing controls by domain - for suggestion in suggestions: - domain = suggestion.domain.lower() - domain_controls = [c for c in all_controls if c.domain and c.domain.value.lower() == domain] - if domain_controls: - # Take the first matching control from this domain - matched_controls.append({ - "control": domain_controls[0], - "coverage": "partial", - "notes": f"AI suggested: {suggestion.title}", - "confidence": suggestion.confidence_score, - }) - except Exception as e: - logger.warning(f"AI mapping failed, falling back to keyword matching: {e}") - use_ai = False # Fall back to keyword matching - - if not use_ai: - # Keyword-based matching (fast) - domain_scores = {} - for domain, keywords in DOMAIN_KEYWORDS.items(): - score = sum(1 for kw in keywords if kw.lower() in text_lower) - if score > 0: - domain_scores[domain] = score - - # Sort domains by score - sorted_domains = sorted(domain_scores.items(), key=lambda x: x[1], reverse=True) - - # Take top 3 domains - for domain, score in sorted_domains[:3]: - domain_controls = [c for c in all_controls if c.domain and c.domain.value.lower() == domain] - for ctrl in domain_controls[:2]: # Max 2 controls per domain - matched_controls.append({ - "control": ctrl, - "coverage": "partial" if score < 3 else "full", - "notes": f"Keyword match (score: {score})", - "confidence": min(0.9, 0.5 + score * 0.1), - }) - - # Save mappings to database if requested - created_mappings = [] - if save_to_db and matched_controls: - for match in matched_controls: - ctrl = match["control"] - - # Check if mapping already exists - existing = db.query(ControlMappingDB).filter( - ControlMappingDB.requirement_id == requirement_id, - ControlMappingDB.control_id == ctrl.id, - ).first() - - if not existing: - mapping = ControlMappingDB( - requirement_id=requirement_id, - control_id=ctrl.id, - coverage_level=match["coverage"], - notes=match["notes"], - ) - db.add(mapping) - created_mappings.append({ - "control_id": ctrl.control_id, - "domain": ctrl.domain.value if ctrl.domain else None, - "title": ctrl.title, - "coverage_level": match["coverage"], - "notes": match["notes"], - }) - - db.commit() - - return { - "requirement_id": requirement_id, - "requirement_title": requirement.title, - "matched_controls": len(matched_controls), - "created_mappings": len(created_mappings), - "mappings": created_mappings if save_to_db else [ - { - "control_id": m["control"].control_id, - "domain": m["control"].domain.value if m["control"].domain else None, - "title": m["control"].title, - "coverage_level": m["coverage"], - "confidence": m.get("confidence", 0.7), - } - for m in matched_controls - ], - } - - -@router.post("/ai/batch-map-controls") -async def batch_map_controls( - regulation_code: Optional[str] = Query(None, description="Filter by regulation code"), - limit: int = Query(100, description="Max requirements to process"), - use_ai: bool = Query(False, description="Use AI for matching (slower)"), - background_tasks: BackgroundTasks = None, - db: Session = Depends(get_db), -): - """ - Batch map controls to multiple requirements. - - Processes requirements that don't have mappings yet. - """ - from ..db.models import ControlMappingDB - - # Get requirements without mappings - req_repo = RequirementRepository(db) - - if regulation_code: - reg_repo = RegulationRepository(db) - regulation = reg_repo.get_by_code(regulation_code) - if not regulation: - raise HTTPException(status_code=404, detail=f"Regulation {regulation_code} not found") - all_requirements = req_repo.get_by_regulation(regulation.id) - else: - all_requirements = req_repo.get_all() - - # Filter to requirements without mappings - requirements_without_mappings = [] - for req in all_requirements: - existing = db.query(ControlMappingDB).filter( - ControlMappingDB.requirement_id == req.id - ).first() - if not existing: - requirements_without_mappings.append(req) - - # Limit processing - to_process = requirements_without_mappings[:limit] - - # Get all controls once - ctrl_repo = ControlRepository(db) - all_controls = ctrl_repo.get_all() - - # Process each requirement - results = [] - for req in to_process: - try: - text_to_analyze = f"{req.title} {req.requirement_text or ''} {req.description or ''}" - text_lower = text_to_analyze.lower() - - # Quick keyword matching - domain_scores = {} - for domain, keywords in DOMAIN_KEYWORDS.items(): - score = sum(1 for kw in keywords if kw.lower() in text_lower) - if score > 0: - domain_scores[domain] = score - - if domain_scores: - # Get top domain - top_domain = max(domain_scores.items(), key=lambda x: x[1])[0] - domain_controls = [c for c in all_controls if c.domain and c.domain.value.lower() == top_domain] - - if domain_controls: - ctrl = domain_controls[0] - - # Create mapping - mapping = ControlMappingDB( - requirement_id=req.id, - control_id=ctrl.id, - coverage_level="partial", - notes=f"Auto-mapped (domain: {top_domain})", - ) - db.add(mapping) - - results.append({ - "requirement_id": req.id, - "requirement_title": req.title[:50], - "control_id": ctrl.control_id, - "domain": top_domain, - }) - except Exception as e: - logger.warning(f"Failed to map requirement {req.id}: {e}") - - db.commit() - - return { - "processed": len(to_process), - "mapped": len(results), - "remaining": len(requirements_without_mappings) - len(to_process), - "mappings": results[:20], # Only return first 20 for readability - } - - -# ============================================================================ -# LLM Provider Switch Endpoints (Runtime Configuration) -# ============================================================================ - -class ProviderSwitchRequest(BaseModel): - """Request to switch LLM provider at runtime.""" - provider: str # "anthropic" or "self_hosted" - model: Optional[str] = None # Optional: override model - url: Optional[str] = None # Optional: override URL for self-hosted - - -class ProviderSwitchResponse(BaseModel): - """Response after switching LLM provider.""" - success: bool - previous_provider: str - new_provider: str - model: str - url: Optional[str] = None - message: str - - -@router.post("/ai/switch-provider", response_model=ProviderSwitchResponse) -async def switch_llm_provider(request: ProviderSwitchRequest): - """ - Switch the LLM provider at runtime between Anthropic API and Self-Hosted (Ollama). - - This allows developers to toggle between: - - **anthropic**: Cloud-based Claude API (kostenpflichtig, Daten gehen zu Anthropic) - - **self_hosted**: Self-hosted Ollama on Mac Mini (kostenlos, DSGVO-konform, Daten bleiben intern) - - Note: This change is temporary for the current container session. - For permanent changes, modify the docker-compose.yml environment variables. - """ - from ..services.llm_provider import ( - reset_shared_provider, - get_shared_provider, - LLMProviderType, - ) - - try: - # Get current provider info before switch - old_provider = get_shared_provider() - old_provider_name = old_provider.provider_name - - # Map string to enum - provider_map = { - "anthropic": LLMProviderType.ANTHROPIC, - "self_hosted": LLMProviderType.SELF_HOSTED, - "mock": LLMProviderType.MOCK, - } - - if request.provider.lower() not in provider_map: - raise HTTPException( - status_code=400, - detail=f"Invalid provider: {request.provider}. Use 'anthropic' or 'self_hosted'" - ) - - # Update environment variables for the new provider - os.environ["COMPLIANCE_LLM_PROVIDER"] = request.provider.lower() - - if request.provider.lower() == "self_hosted": - if request.url: - os.environ["SELF_HOSTED_LLM_URL"] = request.url - if request.model: - os.environ["SELF_HOSTED_LLM_MODEL"] = request.model - else: - # Default to llama3.1:70b for compliance tasks - os.environ["SELF_HOSTED_LLM_MODEL"] = os.environ.get( - "SELF_HOSTED_LLM_MODEL", "llama3.1:70b" - ) - elif request.provider.lower() == "anthropic": - if request.model: - os.environ["ANTHROPIC_MODEL"] = request.model - - # Reset the shared provider to pick up new config - reset_shared_provider() - - # Get the new provider - new_provider = get_shared_provider() - - return ProviderSwitchResponse( - success=True, - previous_provider=old_provider_name, - new_provider=new_provider.provider_name, - model=new_provider.config.model, - url=new_provider.config.base_url if hasattr(new_provider.config, 'base_url') else None, - message=f"Successfully switched from {old_provider_name} to {new_provider.provider_name}", - ) - - except Exception as e: - logger.error(f"Failed to switch LLM provider: {e}") - raise HTTPException(status_code=500, detail=str(e)) - - -@router.get("/ai/providers") -async def list_available_providers(): - """ - List available LLM providers with their descriptions. - - This helps developers understand which provider to use for which scenario. - """ - return { - "providers": [ - { - "id": "anthropic", - "name": "Anthropic Claude API", - "description_de": "Cloud-basierte KI von Anthropic. Kostenpflichtig (API-Credits). Daten werden zur Verarbeitung an Anthropic gesendet.", - "description_en": "Cloud-based AI from Anthropic. Paid service (API credits). Data is sent to Anthropic for processing.", - "gdpr_compliant": False, - "data_location": "Anthropic Cloud (USA)", - "cost": "Kostenpflichtig pro Token", - "use_case": "Produktiv, wenn hohe Qualitaet benoetigt wird", - "models": ["claude-sonnet-4-20250514", "claude-3-5-sonnet-20241022"], - }, - { - "id": "self_hosted", - "name": "Self-Hosted Ollama", - "description_de": "Lokales LLM auf dem Mac Mini M4 Pro (64GB RAM). Kostenlos. Alle Daten bleiben intern - DSGVO-konform!", - "description_en": "Local LLM on Mac Mini M4 Pro (64GB RAM). Free. All data stays internal - GDPR compliant!", - "gdpr_compliant": True, - "data_location": "Lokal auf Mac Mini", - "cost": "Kostenlos (Hardware bereits vorhanden)", - "use_case": "Entwicklung, Testing, DSGVO-sensitive Dokumente", - "models": ["llama3.1:70b", "llama3.2-vision", "mixtral:8x7b"], - }, - ], - "current_provider": None, # Will be filled by get_ai_status - "note_de": "Umschaltung erfolgt sofort, aber nur fuer diese Container-Session. Fuer permanente Aenderung docker-compose.yml anpassen.", - "note_en": "Switch takes effect immediately but only for this container session. For permanent change, modify docker-compose.yml.", - } diff --git a/backend-compliance/compliance/api/schemas.py b/backend-compliance/compliance/api/schemas.py index 059ccfa..c132e10 100644 --- a/backend-compliance/compliance/api/schemas.py +++ b/backend-compliance/compliance/api/schemas.py @@ -664,114 +664,6 @@ class ModuleComplianceOverview(BaseModel): regulations_coverage: Dict[str, int] # regulation_code -> module_count -# ============================================================================ -# AI Assistant Schemas (Sprint 4) -# ============================================================================ - -class AIInterpretationRequest(BaseModel): - """Request for AI interpretation of a requirement.""" - requirement_id: str - force_refresh: bool = False - - -class AIInterpretationResponse(BaseModel): - """AI-generated interpretation of a requirement.""" - requirement_id: str - summary: str - applicability: str - technical_measures: List[str] - affected_modules: List[str] - risk_level: str - implementation_hints: List[str] - confidence_score: float - error: Optional[str] = None - - -class AIBatchInterpretationRequest(BaseModel): - """Request for batch interpretation of requirements.""" - requirement_ids: List[str] - regulation_code: Optional[str] = None - rate_limit: float = 1.0 - - -class AIBatchInterpretationResponse(BaseModel): - """Response for batch interpretation.""" - total: int - processed: int - interpretations: List[AIInterpretationResponse] - - -class AIControlSuggestionRequest(BaseModel): - """Request for AI control suggestions.""" - requirement_id: str - - -class AIControlSuggestionItem(BaseModel): - """A single control suggestion from AI.""" - control_id: str - domain: str - title: str - description: str - pass_criteria: str - implementation_guidance: str - is_automated: bool - automation_tool: Optional[str] = None - priority: str - confidence_score: float - - -class AIControlSuggestionResponse(BaseModel): - """Response with AI control suggestions.""" - requirement_id: str - suggestions: List[AIControlSuggestionItem] - - -class AIRiskAssessmentRequest(BaseModel): - """Request for AI risk assessment of a module.""" - module_id: str - - -class AIRiskFactor(BaseModel): - """A risk factor in the assessment.""" - factor: str - severity: str - likelihood: str - - -class AIRiskAssessmentResponse(BaseModel): - """AI-generated risk assessment.""" - module_name: str - overall_risk: str - risk_factors: List[AIRiskFactor] - recommendations: List[str] - compliance_gaps: List[str] - confidence_score: float - - -class AIGapAnalysisRequest(BaseModel): - """Request for gap analysis.""" - requirement_id: str - - -class AIGapAnalysisResponse(BaseModel): - """AI-generated gap analysis.""" - requirement_id: str - requirement_title: str - coverage_level: str - existing_controls: List[str] - missing_coverage: List[str] - suggested_actions: List[str] - - -class AIStatusResponse(BaseModel): - """Status of the AI provider.""" - provider: str - model: str - is_available: bool - is_mock: bool - error: Optional[str] = None - - # ============================================================================ # Executive Dashboard Schemas (Phase 3 - Sprint 1) # ============================================================================