A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.
This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).
Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
430 lines
15 KiB
Python
"""
|
|
Tests for Compliance AI Integration (Sprint 4).
|
|
|
|
Tests the AI-powered compliance features:
|
|
- Requirement interpretation
|
|
- Control suggestions
|
|
- Risk assessment
|
|
- Gap analysis
|
|
"""
|
|
|
|
import pytest
|
|
import asyncio
|
|
from unittest.mock import patch, AsyncMock
|
|
|
|
# Import the services
|
|
from compliance.services.llm_provider import (
|
|
LLMProvider, LLMConfig, LLMProviderType,
|
|
AnthropicProvider, SelfHostedProvider, MockProvider,
|
|
get_llm_provider, LLMResponse
|
|
)
|
|
from compliance.services.ai_compliance_assistant import (
|
|
AIComplianceAssistant,
|
|
RequirementInterpretation,
|
|
ControlSuggestion,
|
|
RiskAssessment,
|
|
GapAnalysis
|
|
)
|
|
|
|
|
|
# ============================================================================
# LLM Provider Tests
# ============================================================================
|
|
|
|
class TestMockProvider:
    """Exercise MockProvider, the offline stand-in used in test scenarios."""

    @pytest.mark.asyncio
    async def test_mock_provider_basic(self):
        """A default-configured mock yields a well-formed canned response."""
        cfg = LLMConfig(provider_type=LLMProviderType.MOCK)
        mock = MockProvider(cfg)

        result = await mock.complete("Test prompt")

        # Every mock reply must carry content plus its fixed identity fields.
        assert result.content is not None
        assert result.provider == "mock"
        assert result.model == "mock-model"

    @pytest.mark.asyncio
    async def test_mock_provider_custom_responses(self):
        """Queued custom responses are handed back in FIFO order."""
        cfg = LLMConfig(provider_type=LLMProviderType.MOCK)
        mock = MockProvider(cfg)

        # Queue two replies; each complete() call should consume one.
        mock.set_responses([
            "First response",
            "Second response"
        ])

        first = await mock.complete("Prompt 1")
        second = await mock.complete("Prompt 2")

        assert first.content == "First response"
        assert second.content == "Second response"

    @pytest.mark.asyncio
    async def test_mock_provider_batch(self):
        """batch_complete() returns exactly one response per prompt."""
        cfg = LLMConfig(provider_type=LLMProviderType.MOCK)
        mock = MockProvider(cfg)

        batch = await mock.batch_complete(["Prompt 1", "Prompt 2", "Prompt 3"])

        assert len(batch) == 3
        assert all(item.provider == "mock" for item in batch)
|
|
|
class TestLLMProviderFactory:
    """Behaviour of get_llm_provider(), the provider factory function."""

    def test_factory_mock_provider(self):
        """An explicit MOCK config yields a MockProvider instance."""
        provider = get_llm_provider(LLMConfig(provider_type=LLMProviderType.MOCK))

        assert isinstance(provider, MockProvider)
        assert provider.provider_name == "mock"

    def test_factory_anthropic_without_key(self):
        """Without an API key the Anthropic config degrades to the mock."""
        cfg = LLMConfig(
            provider_type=LLMProviderType.ANTHROPIC,
            api_key=None,
        )

        # Missing credentials must not raise; the factory falls back to mock.
        assert isinstance(get_llm_provider(cfg), MockProvider)

    def test_factory_self_hosted_without_url(self):
        """Without a base URL the self-hosted config degrades to the mock."""
        cfg = LLMConfig(
            provider_type=LLMProviderType.SELF_HOSTED,
            base_url=None,
        )

        # Same fallback contract as the Anthropic case above.
        assert isinstance(get_llm_provider(cfg), MockProvider)
|
|
|
# ============================================================================
# AI Compliance Assistant Tests
# ============================================================================
|
|
|
|
class TestAIComplianceAssistant:
    """Test the AI Compliance Assistant.

    Fix: ``test_analyze_gap`` and ``test_batch_interpret`` previously relied
    on the shared ``mock_provider`` fixture, whose response queue is ordered
    interpretation -> controls -> risk -> gap. Because each test receives a
    fresh fixture and responses are consumed from the front of the queue,
    those tests were fed payloads meant for *other* operations and only
    passed via the assistant's fallback defaults. They now queue their own
    payloads explicitly, consistent with ``test_suggest_controls`` and
    ``test_assess_module_risk``.
    """

    @pytest.fixture
    def mock_provider(self):
        """Create a mock provider with predefined responses.

        The queue order below matters: only tests whose first LLM call is an
        interpretation should rely on this fixture's queue as-is; all other
        tests must call ``set_responses`` themselves.
        """
        config = LLMConfig(provider_type=LLMProviderType.MOCK)
        provider = MockProvider(config)

        # Set up responses for different test scenarios
        provider.set_responses([
            # Interpretation response
            '''{
                "summary": "Die Anforderung betrifft Datenverschlüsselung",
                "applicability": "Gilt für alle Module die PII verarbeiten",
                "technical_measures": ["AES-256 Verschlüsselung", "TLS 1.3"],
                "affected_modules": ["consent-service", "klausur-service"],
                "risk_level": "high",
                "implementation_hints": ["Verwende SOPS", "Aktiviere TLS"]
            }''',
            # Control suggestion response
            '''{
                "controls": [
                    {
                        "control_id": "PRIV-042",
                        "domain": "priv",
                        "title": "Verschlüsselung personenbezogener Daten",
                        "description": "Alle PII müssen verschlüsselt sein",
                        "pass_criteria": "100% der PII sind AES-256 verschlüsselt",
                        "implementation_guidance": "Verwende SOPS mit Age-Keys",
                        "is_automated": true,
                        "automation_tool": "SOPS",
                        "priority": "high"
                    }
                ]
            }''',
            # Risk assessment response
            '''{
                "overall_risk": "high",
                "risk_factors": [
                    {
                        "factor": "Verarbeitet personenbezogene Daten",
                        "severity": "high",
                        "likelihood": "high"
                    }
                ],
                "recommendations": ["Verschlüsselung implementieren"],
                "compliance_gaps": ["Fehlende Verschlüsselung"]
            }''',
            # Gap analysis response
            '''{
                "coverage_level": "partial",
                "covered_aspects": ["Verschlüsselung in Transit"],
                "missing_coverage": ["Verschlüsselung at Rest"],
                "suggested_actions": ["Implementiere Disk-Encryption"]
            }'''
        ])

        return provider

    @pytest.fixture
    def assistant(self, mock_provider):
        """Create an AI assistant with mock provider."""
        return AIComplianceAssistant(llm_provider=mock_provider)

    @pytest.mark.asyncio
    async def test_interpret_requirement(self, assistant):
        """Test requirement interpretation.

        Uses the fixture queue directly: the first queued payload is the
        interpretation response, which is exactly what this call consumes.
        """
        result = await assistant.interpret_requirement(
            requirement_id="req-123",
            article="Art. 32",
            title="Sicherheit der Verarbeitung",
            requirement_text="Der Verantwortliche muss geeignete Maßnahmen treffen...",
            regulation_code="GDPR",
            regulation_name="DSGVO"
        )

        assert isinstance(result, RequirementInterpretation)
        assert result.requirement_id == "req-123"
        assert result.summary is not None
        assert len(result.technical_measures) > 0
        assert len(result.affected_modules) > 0
        assert result.risk_level in ["low", "medium", "high", "critical"]
        assert result.confidence_score > 0

    @pytest.mark.asyncio
    async def test_suggest_controls(self, mock_provider):
        """Test control suggestions."""
        # Set up mock with control suggestion response
        mock_provider.set_responses(['''{
            "controls": [
                {
                    "control_id": "PRIV-042",
                    "domain": "priv",
                    "title": "Verschlüsselung personenbezogener Daten",
                    "description": "Alle PII müssen verschlüsselt sein",
                    "pass_criteria": "100% der PII sind AES-256 verschlüsselt",
                    "implementation_guidance": "Verwende SOPS mit Age-Keys",
                    "is_automated": true,
                    "automation_tool": "SOPS",
                    "priority": "high"
                }
            ]
        }'''])
        assistant = AIComplianceAssistant(llm_provider=mock_provider)

        suggestions = await assistant.suggest_controls(
            requirement_title="Verschlüsselung der Verarbeitung",
            requirement_text="Personenbezogene Daten müssen verschlüsselt werden",
            regulation_name="DSGVO",
            affected_modules=["consent-service"]
        )

        assert isinstance(suggestions, list)
        assert len(suggestions) > 0

        control = suggestions[0]
        assert isinstance(control, ControlSuggestion)
        assert control.control_id is not None
        assert control.domain in ["priv", "iam", "sdlc", "crypto", "ops", "ai", "cra", "gov", "aud"]
        assert control.title is not None
        assert control.pass_criteria is not None

    @pytest.mark.asyncio
    async def test_assess_module_risk(self, mock_provider):
        """Test module risk assessment."""
        # Set up mock with risk assessment response
        mock_provider.set_responses(['''{
            "overall_risk": "high",
            "risk_factors": [
                {
                    "factor": "Verarbeitet personenbezogene Daten",
                    "severity": "high",
                    "likelihood": "high"
                }
            ],
            "recommendations": ["Verschlüsselung implementieren"],
            "compliance_gaps": ["Fehlende Verschlüsselung"]
        }'''])
        assistant = AIComplianceAssistant(llm_provider=mock_provider)

        result = await assistant.assess_module_risk(
            module_name="consent-service",
            service_type="backend",
            description="Verwaltet Einwilligungen",
            processes_pii=True,
            ai_components=False,
            criticality="critical",
            data_categories=["consent_records", "personal_data"],
            regulations=[{"code": "GDPR", "relevance": "critical"}]
        )

        assert isinstance(result, RiskAssessment)
        assert result.module_name == "consent-service"
        assert result.overall_risk in ["low", "medium", "high", "critical"]
        assert len(result.risk_factors) > 0
        assert len(result.recommendations) > 0
        assert result.confidence_score > 0

    @pytest.mark.asyncio
    async def test_analyze_gap(self, mock_provider):
        """Test gap analysis."""
        # Queue the gap-analysis payload explicitly; the fixture's default
        # queue would hand this call the interpretation payload instead.
        mock_provider.set_responses(['''{
            "coverage_level": "partial",
            "covered_aspects": ["Verschlüsselung in Transit"],
            "missing_coverage": ["Verschlüsselung at Rest"],
            "suggested_actions": ["Implementiere Disk-Encryption"]
        }'''])
        assistant = AIComplianceAssistant(llm_provider=mock_provider)

        result = await assistant.analyze_gap(
            requirement_id="req-456",
            requirement_title="Verschlüsselung",
            requirement_text="Daten müssen verschlüsselt sein",
            regulation_code="GDPR",
            existing_controls=[
                {"control_id": "PRIV-001", "title": "TLS 1.3", "status": "pass"}
            ]
        )

        assert isinstance(result, GapAnalysis)
        assert result.requirement_id == "req-456"
        assert result.coverage_level in ["full", "partial", "none", "unknown"]
        assert len(result.existing_controls) > 0

    @pytest.mark.asyncio
    async def test_batch_interpret(self, mock_provider):
        """Test batch requirement interpretation."""
        interpretation_json = '''{
            "summary": "Die Anforderung betrifft Datenverschlüsselung",
            "applicability": "Gilt für alle Module die PII verarbeiten",
            "technical_measures": ["AES-256 Verschlüsselung", "TLS 1.3"],
            "affected_modules": ["consent-service", "klausur-service"],
            "risk_level": "high",
            "implementation_hints": ["Verwende SOPS", "Aktiviere TLS"]
        }'''
        # One interpretation payload per requirement in the batch; relying on
        # the fixture queue would feed the second call the controls payload.
        mock_provider.set_responses([interpretation_json, interpretation_json])
        assistant = AIComplianceAssistant(llm_provider=mock_provider)

        requirements = [
            {
                "id": "req-1",
                "article": "Art. 32",
                "title": "Sicherheit",
                "requirement_text": "Sicherheitsmaßnahmen",
                "regulation_code": "GDPR",
                "regulation_name": "DSGVO"
            },
            {
                "id": "req-2",
                "article": "Art. 33",
                "title": "Meldung",
                "requirement_text": "Meldung von Datenpannen",
                "regulation_code": "GDPR",
                "regulation_name": "DSGVO"
            }
        ]

        results = await assistant.batch_interpret_requirements(
            requirements=requirements,
            rate_limit=0.1  # Fast for testing
        )

        assert len(results) == 2
        for result in results:
            assert isinstance(result, RequirementInterpretation)
|
|
|
class TestJSONParsing:
    """Verify _parse_json_response copes with the shapes LLMs emit."""

    @pytest.fixture
    def assistant(self):
        """Assistant wired to a bare mock provider (its responses are unused)."""
        cfg = LLMConfig(provider_type=LLMProviderType.MOCK)
        return AIComplianceAssistant(llm_provider=MockProvider(cfg))

    def test_parse_clean_json(self, assistant):
        """Bare JSON text parses straight to the equivalent dict."""
        parsed = assistant._parse_json_response('{"key": "value", "list": [1, 2, 3]}')

        assert parsed == {"key": "value", "list": [1, 2, 3]}

    def test_parse_json_with_markdown(self, assistant):
        """JSON inside a fenced markdown code block is extracted and parsed."""
        raw = '''```json
        {"key": "value"}
        ```'''
        parsed = assistant._parse_json_response(raw)

        assert parsed == {"key": "value"}

    def test_parse_json_with_text(self, assistant):
        """A JSON object embedded in surrounding prose is still found."""
        raw = '''
        Here is the analysis:
        {"key": "value", "nested": {"a": 1}}
        That's the result.
        '''
        parsed = assistant._parse_json_response(raw)

        assert parsed == {"key": "value", "nested": {"a": 1}}

    def test_parse_invalid_json(self, assistant):
        """Non-JSON input degrades to an empty dict rather than raising."""
        parsed = assistant._parse_json_response("This is not JSON at all")

        assert parsed == {}
|
|
|
# ============================================================================
# Integration Test Markers
# ============================================================================
|
|
|
|
@pytest.mark.integration
@pytest.mark.skipif(
    True,  # Skip by default, run with --integration flag
    reason="Requires API key or running LLM service"
)
class TestRealLLMIntegration:
    """Live-provider integration tests; need real credentials or services."""

    @pytest.mark.asyncio
    async def test_anthropic_integration(self):
        """Round-trip a prompt through the real Anthropic API."""
        import os

        key = os.getenv("ANTHROPIC_API_KEY")
        if not key:
            pytest.skip("ANTHROPIC_API_KEY not set")

        provider = AnthropicProvider(LLMConfig(
            provider_type=LLMProviderType.ANTHROPIC,
            api_key=key,
            model="claude-sonnet-4-20250514"
        ))

        reply = await provider.complete("Sage Hallo auf Deutsch.")

        assert reply.content is not None
        # The model should greet in German one way or another.
        lowered = reply.content.lower()
        assert "hallo" in lowered or "guten" in lowered

    @pytest.mark.asyncio
    async def test_self_hosted_integration(self):
        """Round-trip a prompt through a self-hosted Ollama/vLLM endpoint."""
        import os

        endpoint = os.getenv("SELF_HOSTED_LLM_URL", "http://localhost:11434")

        provider = SelfHostedProvider(LLMConfig(
            provider_type=LLMProviderType.SELF_HOSTED,
            base_url=endpoint,
            model="llama3.1:8b"
        ))

        reply = await provider.complete("Sage Hallo auf Deutsch.")

        assert reply.content is not None
|
|
|
# ============================================================================
# Run Tests
# ============================================================================
|
|
|
|
if __name__ == "__main__":
    # Run with: python -m pytest tests/test_compliance_ai.py -v
    # Propagate pytest's exit status so direct invocation reports failures
    # to the shell/CI instead of always exiting 0.
    raise SystemExit(pytest.main([__file__, "-v"]))
|