Files
breakpilot-compliance/backend-compliance/compliance/services/ai_compliance_assistant.py
Benjamin Boenisch 4435e7ea0a Initial commit: breakpilot-compliance - Compliance SDK Platform
Services: Admin-Compliance, Backend-Compliance,
AI-Compliance-SDK, Consent-SDK, Developer-Portal,
PCA-Platform, DSMS

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 23:47:28 +01:00

501 lines
16 KiB
Python

"""
AI Compliance Assistant for Breakpilot.
Provides AI-powered features for:
- Requirement interpretation (translating legal text to technical guidance)
- Control suggestions (recommending controls for requirements)
- Risk assessment (evaluating compliance risks)
- Gap analysis (identifying missing controls)
"""
import asyncio
import json
import logging
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional

from .llm_provider import LLMProvider, LLMResponse, get_shared_provider
logger = logging.getLogger(__name__)
class InterpretationSection(str, Enum):
    """Sections in a requirement interpretation.

    Subclasses ``str`` so members compare and serialize as plain
    strings (e.g. when used as JSON keys). The values mirror the field
    names of RequirementInterpretation.
    """
    SUMMARY = "summary"
    APPLICABILITY = "applicability"
    TECHNICAL_MEASURES = "technical_measures"
    AFFECTED_MODULES = "affected_modules"
    RISK_LEVEL = "risk_level"
    IMPLEMENTATION_HINTS = "implementation_hints"
@dataclass
class RequirementInterpretation:
    """AI-generated interpretation of a regulatory requirement."""
    requirement_id: str
    summary: str  # short 2-3 sentence summary from the model
    applicability: str  # how the requirement applies to Breakpilot
    technical_measures: List[str]  # concrete technical measures
    affected_modules: List[str]  # affected Breakpilot modules
    risk_level: str  # low, medium, high, critical
    implementation_hints: List[str]  # concrete implementation hints
    confidence_score: float  # 0.0 - 1.0
    raw_response: Optional[str] = None  # unparsed LLM reply, kept for debugging
    error: Optional[str] = None  # set (and other fields empty) when generation failed
@dataclass
class ControlSuggestion:
    """AI-suggested control for a requirement."""
    control_id: str  # Suggested ID like "PRIV-XXX"
    domain: str  # Control domain (priv, sdlc, iam, etc.)
    title: str
    description: str
    pass_criteria: str  # measurable success criteria
    implementation_guidance: str  # how to implement the control
    is_automated: bool  # whether the control can be checked automatically
    automation_tool: Optional[str] = None  # tool name, if any
    priority: str = "medium"  # low, medium, high, critical
    confidence_score: float = 0.0  # 0.0 - 1.0
@dataclass
class RiskAssessment:
    """AI-generated risk assessment for a module."""
    module_name: str
    overall_risk: str  # low, medium, high, critical ("unknown" on failure)
    risk_factors: List[Dict[str, Any]]  # entries with factor/severity/likelihood keys
    recommendations: List[str]  # risk-mitigation recommendations
    compliance_gaps: List[str]  # identified compliance gaps
    confidence_score: float = 0.0  # 0.0 - 1.0
@dataclass
class GapAnalysis:
    """Gap analysis result for requirement-control mapping."""
    requirement_id: str
    requirement_title: str
    coverage_level: str  # full, partial, none ("unknown" on failure)
    existing_controls: List[str]  # IDs of controls already mapped to the requirement
    missing_coverage: List[str]  # aspects not covered by existing controls
    suggested_actions: List[str]  # recommended follow-up actions
class AIComplianceAssistant:
    """
    AI-powered compliance assistant using LLM providers.

    Supports both Claude API and self-hosted LLMs through the
    abstracted LLMProvider interface. Each public method renders a
    task-specific prompt (German, matching the platform's working
    language), sends it to the provider, and parses the expected JSON
    reply into a typed result object.

    All methods are best-effort: on provider or parse errors they log
    the failure and return a neutral fallback object instead of raising.
    """
    # System prompts for different tasks (runtime strings — sent verbatim
    # to the model, so their German wording must not be altered).
    SYSTEM_PROMPT_BASE = """Du bist ein Compliance-Experte für die Breakpilot Bildungsplattform.
Breakpilot ist ein EdTech SaaS-System mit folgenden Eigenschaften:
- KI-gestützte Klausurkorrektur und Feedback
- Videokonferenzen (Jitsi) und Chat (Matrix)
- Schulverwaltung mit Noten und Zeugnissen
- Consent-Management und DSGVO-Compliance
- Self-Hosted in Deutschland
Du analysierst regulatorische Anforderungen und gibst konkrete technische Empfehlungen."""
    INTERPRETATION_PROMPT = """Analysiere folgende regulatorische Anforderung für Breakpilot:
Verordnung: {regulation_name} ({regulation_code})
Artikel: {article}
Titel: {title}
Originaltext: {requirement_text}
Erstelle eine strukturierte Analyse im JSON-Format:
{{
"summary": "Kurze Zusammenfassung in 2-3 Sätzen",
"applicability": "Erklärung wie dies auf Breakpilot anwendbar ist",
"technical_measures": ["Liste konkreter technischer Maßnahmen"],
"affected_modules": ["Liste betroffener Breakpilot-Module (z.B. consent-service, klausur-service, matrix-synapse)"],
"risk_level": "low|medium|high|critical",
"implementation_hints": ["Konkrete Implementierungshinweise"]
}}
Gib NUR das JSON zurück, keine zusätzlichen Erklärungen."""
    CONTROL_SUGGESTION_PROMPT = """Basierend auf folgender Anforderung, schlage passende Controls vor:
Verordnung: {regulation_name}
Anforderung: {requirement_title}
Beschreibung: {requirement_text}
Betroffene Module: {affected_modules}
Schlage 1-3 Controls im JSON-Format vor:
{{
"controls": [
{{
"control_id": "DOMAIN-XXX",
"domain": "priv|iam|sdlc|crypto|ops|ai|cra|gov|aud",
"title": "Kurzer Titel",
"description": "Beschreibung des Controls",
"pass_criteria": "Messbare Erfolgskriterien",
"implementation_guidance": "Wie implementieren",
"is_automated": true|false,
"automation_tool": "Tool-Name oder null",
"priority": "low|medium|high|critical"
}}
]
}}
Domains:
- priv: Datenschutz & Privacy (DSGVO)
- iam: Identity & Access Management
- sdlc: Secure Development Lifecycle
- crypto: Kryptografie
- ops: Betrieb & Monitoring
- ai: KI-spezifisch (AI Act)
- cra: Cyber Resilience Act
- gov: Governance
- aud: Audit & Nachvollziehbarkeit
Gib NUR das JSON zurück."""
    RISK_ASSESSMENT_PROMPT = """Bewerte das Compliance-Risiko für folgendes Breakpilot-Modul:
Modul: {module_name}
Typ: {service_type}
Beschreibung: {description}
Verarbeitet PII: {processes_pii}
KI-Komponenten: {ai_components}
Kritikalität: {criticality}
Daten-Kategorien: {data_categories}
Zugeordnete Verordnungen: {regulations}
Erstelle eine Risikobewertung im JSON-Format:
{{
"overall_risk": "low|medium|high|critical",
"risk_factors": [
{{"factor": "Beschreibung", "severity": "low|medium|high", "likelihood": "low|medium|high"}}
],
"recommendations": ["Empfehlungen zur Risikominderung"],
"compliance_gaps": ["Identifizierte Compliance-Lücken"]
}}
Gib NUR das JSON zurück."""
    GAP_ANALYSIS_PROMPT = """Analysiere die Control-Abdeckung für folgende Anforderung:
Anforderung: {requirement_title}
Verordnung: {regulation_code}
Beschreibung: {requirement_text}
Existierende Controls:
{existing_controls}
Bewerte die Abdeckung und identifiziere Lücken im JSON-Format:
{{
"coverage_level": "full|partial|none",
"covered_aspects": ["Was ist bereits abgedeckt"],
"missing_coverage": ["Was fehlt noch"],
"suggested_actions": ["Empfohlene Maßnahmen"]
}}
Gib NUR das JSON zurück."""

    def __init__(self, llm_provider: Optional[LLMProvider] = None):
        """Initialize the assistant with an LLM provider.

        Args:
            llm_provider: Provider to use; when omitted, falls back to
                the process-wide shared provider.
        """
        self.llm = llm_provider or get_shared_provider()

    async def interpret_requirement(
        self,
        requirement_id: str,
        article: str,
        title: str,
        requirement_text: str,
        regulation_code: str,
        regulation_name: str
    ) -> RequirementInterpretation:
        """
        Generate an interpretation for a regulatory requirement.

        Translates legal text into practical technical guidance
        for the Breakpilot development team.

        Returns:
            RequirementInterpretation. On failure, fields are empty,
            ``confidence_score`` is 0.0 and ``error`` carries the cause.
        """
        prompt = self.INTERPRETATION_PROMPT.format(
            regulation_name=regulation_name,
            regulation_code=regulation_code,
            article=article,
            title=title,
            requirement_text=requirement_text or "Kein Text verfügbar"
        )
        try:
            response = await self.llm.complete(
                prompt=prompt,
                system_prompt=self.SYSTEM_PROMPT_BASE,
                max_tokens=2000,
                temperature=0.3  # low temperature: analytical, reproducible output
            )
            # Parse JSON response ({} when the reply was not valid JSON).
            data = self._parse_json_response(response.content)
            return RequirementInterpretation(
                requirement_id=requirement_id,
                summary=data.get("summary", ""),
                applicability=data.get("applicability", ""),
                technical_measures=data.get("technical_measures", []),
                affected_modules=data.get("affected_modules", []),
                risk_level=data.get("risk_level", "medium"),
                implementation_hints=data.get("implementation_hints", []),
                # Heuristic score based on model quality; zero when the
                # reply could not be parsed, so callers can distinguish a
                # real interpretation from a blank fallback.
                confidence_score=0.85 if data else 0.0,
                raw_response=response.content
            )
        except Exception as e:  # deliberate best-effort: never propagate
            logger.exception("Failed to interpret requirement %s: %s", requirement_id, e)
            return RequirementInterpretation(
                requirement_id=requirement_id,
                summary="",
                applicability="",
                technical_measures=[],
                affected_modules=[],
                risk_level="medium",
                implementation_hints=[],
                confidence_score=0.0,
                error=str(e)
            )

    async def suggest_controls(
        self,
        requirement_title: str,
        requirement_text: str,
        regulation_name: str,
        affected_modules: List[str]
    ) -> List[ControlSuggestion]:
        """
        Suggest controls for a given requirement.

        Returns a list of control suggestions with implementation
        guidance; empty on provider or parse failure.
        """
        prompt = self.CONTROL_SUGGESTION_PROMPT.format(
            regulation_name=regulation_name,
            requirement_title=requirement_title,
            requirement_text=requirement_text or "Keine Beschreibung",
            affected_modules=", ".join(affected_modules) if affected_modules else "Alle Module"
        )
        try:
            response = await self.llm.complete(
                prompt=prompt,
                system_prompt=self.SYSTEM_PROMPT_BASE,
                max_tokens=2000,
                temperature=0.4  # slightly higher: creative control wording
            )
            data = self._parse_json_response(response.content)
            controls = data.get("controls", [])
            return [
                ControlSuggestion(
                    control_id=c.get("control_id", "NEW-001"),
                    domain=c.get("domain", "gov"),
                    title=c.get("title", ""),
                    description=c.get("description", ""),
                    pass_criteria=c.get("pass_criteria", ""),
                    implementation_guidance=c.get("implementation_guidance", ""),
                    is_automated=c.get("is_automated", False),
                    automation_tool=c.get("automation_tool"),
                    priority=c.get("priority", "medium"),
                    confidence_score=0.75
                )
                for c in controls
            ]
        except Exception as e:  # deliberate best-effort: never propagate
            logger.exception("Failed to suggest controls: %s", e)
            return []

    async def assess_module_risk(
        self,
        module_name: str,
        service_type: str,
        description: str,
        processes_pii: bool,
        ai_components: bool,
        criticality: str,
        data_categories: List[str],
        regulations: List[Dict[str, str]]
    ) -> RiskAssessment:
        """
        Assess the compliance risk for a service module.

        Args:
            regulations: Dicts with a required ``code`` key and an
                optional ``relevance`` key (defaults to "medium").

        Returns:
            RiskAssessment; on failure ``overall_risk`` is "unknown" and
            ``confidence_score`` is 0.0.
        """
        prompt = self.RISK_ASSESSMENT_PROMPT.format(
            module_name=module_name,
            service_type=service_type,
            description=description or "Keine Beschreibung",
            processes_pii="Ja" if processes_pii else "Nein",
            ai_components="Ja" if ai_components else "Nein",
            criticality=criticality,
            data_categories=", ".join(data_categories) if data_categories else "Keine",
            regulations=", ".join([f"{r['code']} ({r.get('relevance', 'medium')})" for r in regulations]) if regulations else "Keine"
        )
        try:
            response = await self.llm.complete(
                prompt=prompt,
                system_prompt=self.SYSTEM_PROMPT_BASE,
                max_tokens=1500,
                temperature=0.3
            )
            data = self._parse_json_response(response.content)
            return RiskAssessment(
                module_name=module_name,
                overall_risk=data.get("overall_risk", "medium"),
                risk_factors=data.get("risk_factors", []),
                recommendations=data.get("recommendations", []),
                compliance_gaps=data.get("compliance_gaps", []),
                # Zero confidence when the reply was not parseable JSON.
                confidence_score=0.8 if data else 0.0
            )
        except Exception as e:  # deliberate best-effort: never propagate
            logger.exception("Failed to assess risk for %s: %s", module_name, e)
            return RiskAssessment(
                module_name=module_name,
                overall_risk="unknown",
                risk_factors=[],
                recommendations=[],
                compliance_gaps=[],
                confidence_score=0.0
            )

    async def analyze_gap(
        self,
        requirement_id: str,
        requirement_title: str,
        requirement_text: str,
        regulation_code: str,
        existing_controls: List[Dict[str, str]]
    ) -> GapAnalysis:
        """
        Analyze gaps between a requirement and its existing controls.

        Args:
            existing_controls: Dicts with optional ``control_id``,
                ``title`` and ``status`` keys.

        Returns:
            GapAnalysis; ``coverage_level`` is "unknown" on failure.
        """
        controls_text = "\n".join([
            f"- {c.get('control_id', 'N/A')}: {c.get('title', 'N/A')} - {c.get('status', 'N/A')}"
            for c in existing_controls
        ]) if existing_controls else "Keine Controls zugeordnet"
        prompt = self.GAP_ANALYSIS_PROMPT.format(
            requirement_title=requirement_title,
            regulation_code=regulation_code,
            requirement_text=requirement_text or "Keine Beschreibung",
            existing_controls=controls_text
        )
        try:
            response = await self.llm.complete(
                prompt=prompt,
                system_prompt=self.SYSTEM_PROMPT_BASE,
                max_tokens=1500,
                temperature=0.3
            )
            data = self._parse_json_response(response.content)
            # NOTE: "covered_aspects" from the reply is currently unused —
            # GapAnalysis has no field for it.
            return GapAnalysis(
                requirement_id=requirement_id,
                requirement_title=requirement_title,
                coverage_level=data.get("coverage_level", "none"),
                existing_controls=[c.get("control_id", "") for c in existing_controls],
                missing_coverage=data.get("missing_coverage", []),
                suggested_actions=data.get("suggested_actions", [])
            )
        except Exception as e:  # deliberate best-effort: never propagate
            logger.exception("Failed to analyze gap for %s: %s", requirement_id, e)
            return GapAnalysis(
                requirement_id=requirement_id,
                requirement_title=requirement_title,
                coverage_level="unknown",
                existing_controls=[],
                missing_coverage=[],
                suggested_actions=[]
            )

    async def batch_interpret_requirements(
        self,
        requirements: List[Dict[str, Any]],
        rate_limit: float = 1.0
    ) -> List[RequirementInterpretation]:
        """
        Process multiple requirements sequentially with rate limiting.

        Useful for bulk processing of regulations.

        Args:
            requirements: Dicts with optional keys ``id``, ``article``,
                ``title``, ``requirement_text``, ``regulation_code``,
                ``regulation_name``.
            rate_limit: Seconds to sleep between successive LLM calls.
        """
        results: List[RequirementInterpretation] = []
        total = len(requirements)
        for i, req in enumerate(requirements):
            if i > 0:
                # Throttle between calls to respect provider rate limits.
                await asyncio.sleep(rate_limit)
            result = await self.interpret_requirement(
                requirement_id=req.get("id", str(i)),
                article=req.get("article", ""),
                title=req.get("title", ""),
                requirement_text=req.get("requirement_text", ""),
                regulation_code=req.get("regulation_code", ""),
                regulation_name=req.get("regulation_name", "")
            )
            results.append(result)
            logger.info("Processed requirement %d/%d: %s", i + 1, total, req.get("title", "N/A"))
        return results

    def _parse_json_response(self, content: str) -> Dict[str, Any]:
        """
        Parse JSON from an LLM response, handling common formatting issues.

        Strips markdown code fences and extracts the outermost ``{...}``
        span before decoding. Returns ``{}`` when no valid JSON object
        can be decoded.
        """
        content = content.strip()
        # Remove markdown code fences if present.
        if content.startswith("```json"):
            content = content[7:]
        elif content.startswith("```"):
            content = content[3:]
        if content.endswith("```"):
            content = content[:-3]
        content = content.strip()
        # Take the widest {...} span so surrounding prose is ignored.
        json_match = re.search(r'\{[\s\S]*\}', content)
        if json_match:
            content = json_match.group(0)
        try:
            return json.loads(content)
        except json.JSONDecodeError as e:
            logger.warning("Failed to parse JSON response: %s", e)
            logger.debug("Raw content: %s", content[:500])
            return {}
# Lazily-created module-level singleton.
_assistant_instance: Optional[AIComplianceAssistant] = None
def get_ai_assistant() -> AIComplianceAssistant:
    """Return the shared AI compliance assistant, creating it on first use."""
    global _assistant_instance
    if _assistant_instance is not None:
        return _assistant_instance
    _assistant_instance = AIComplianceAssistant()
    return _assistant_instance
def reset_ai_assistant():
    """Reset the shared assistant instance (useful for testing).

    The next call to get_ai_assistant() will construct a fresh instance.
    """
    global _assistant_instance
    _assistant_instance = None