diff --git a/admin-compliance/app/sdk/controls/page.tsx b/admin-compliance/app/sdk/controls/page.tsx index 2f5d5df..393395f 100644 --- a/admin-compliance/app/sdk/controls/page.tsx +++ b/admin-compliance/app/sdk/controls/page.tsx @@ -429,6 +429,27 @@ function LoadingSkeleton() { // MAIN PAGE // ============================================================================= +// ============================================================================= +// RAG SUGGESTION TYPES +// ============================================================================= + +interface RAGControlSuggestion { + control_id: string + domain: string + title: string + description: string + pass_criteria: string + implementation_guidance?: string + is_automated: boolean + automation_tool?: string + priority: number + confidence_score: number +} + +// ============================================================================= +// MAIN PAGE +// ============================================================================= + export default function ControlsPage() { const { state, dispatch } = useSDK() const router = useRouter() @@ -437,6 +458,12 @@ export default function ControlsPage() { const [error, setError] = useState(null) const [showAddForm, setShowAddForm] = useState(false) + // RAG suggestion state + const [ragLoading, setRagLoading] = useState(false) + const [ragSuggestions, setRagSuggestions] = useState([]) + const [showRagPanel, setShowRagPanel] = useState(false) + const [selectedRequirementId, setSelectedRequirementId] = useState('') + // Track effectiveness locally as it's not in the SDK state type const [effectivenessMap, setEffectivenessMap] = useState>({}) // Track linked evidence per control @@ -623,6 +650,51 @@ export default function ControlsPage() { setShowAddForm(false) } + const suggestControlsFromRAG = async () => { + if (!selectedRequirementId) { + setError('Bitte eine Anforderungs-ID eingeben.') + return + } + setRagLoading(true) + setRagSuggestions([]) + try { + const res = await fetch('/api/sdk/v1/compliance/ai/suggest-controls', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ requirement_id: selectedRequirementId }), + }) + if (!res.ok) { + const msg = await res.text() + throw new Error(msg || `HTTP ${res.status}`) + } + const data = await res.json() + setRagSuggestions(data.suggestions || []) + setShowRagPanel(true) + } catch (e) { + setError(`KI-Vorschläge fehlgeschlagen: ${e instanceof Error ? e.message : 'Unbekannter Fehler'}`) + } finally { + setRagLoading(false) + } + } + + const addSuggestedControl = (suggestion: RAGControlSuggestion) => { + const newControl: import('@/lib/sdk').Control = { + id: `rag-${suggestion.control_id}-${Date.now()}`, + name: suggestion.title, + description: suggestion.description, + type: 'TECHNICAL', + category: suggestion.domain, + implementationStatus: 'NOT_IMPLEMENTED', + effectiveness: 'LOW', + evidence: [], + owner: null, + dueDate: null, + } + dispatch({ type: 'ADD_CONTROL', payload: newControl }) + // Remove from suggestions after adding + setRagSuggestions(prev => prev.filter(s => s.control_id !== suggestion.control_id)) + } + const stepInfo = STEP_EXPLANATIONS['controls'] return ( @@ -635,15 +707,26 @@ export default function ControlsPage() { explanation={stepInfo.explanation} tips={stepInfo.tips} > - +
+ + +
{/* Add Form */} @@ -654,6 +737,124 @@ export default function ControlsPage() { /> )} + {/* RAG Controls Panel */} + {showRagPanel && ( +
+
+
+

KI-Controls aus RAG vorschlagen

+

+ Geben Sie eine Anforderungs-ID ein. Das KI-System analysiert die Anforderung mit Hilfe des RAG-Corpus + und schlägt passende Controls vor. +

+
+ +
+ +
+ setSelectedRequirementId(e.target.value)} + placeholder="Anforderungs-UUID eingeben..." + className="flex-1 px-4 py-2 border border-purple-300 rounded-lg focus:ring-2 focus:ring-purple-500 focus:border-transparent bg-white" + /> + {state.requirements.length > 0 && ( + + )} + +
+ + {/* Suggestions */} + {ragSuggestions.length > 0 && ( +
+

{ragSuggestions.length} Vorschläge gefunden:

+ {ragSuggestions.map((suggestion) => ( +
+
+
+
+ + {suggestion.control_id} + + + {suggestion.domain} + + + Konfidenz: {Math.round(suggestion.confidence_score * 100)}% + +
+
{suggestion.title}
+

{suggestion.description}

+ {suggestion.pass_criteria && ( +

+ Erfolgskriterium: {suggestion.pass_criteria} +

+ )} + {suggestion.is_automated && ( + + Automatisierbar {suggestion.automation_tool ? `(${suggestion.automation_tool})` : ''} + + )} +
+ +
+
+ ))} +
+ )} + + {!ragLoading && ragSuggestions.length === 0 && selectedRequirementId && ( +

+ Klicken Sie auf "Vorschläge generieren", um KI-Controls abzurufen. +

+ )} +
+ )} + {/* Error Banner */} {error && (
diff --git a/backend-compliance/tests/test_control_routes.py b/backend-compliance/tests/test_control_routes.py new file mode 100644 index 0000000..2aa321c --- /dev/null +++ b/backend-compliance/tests/test_control_routes.py @@ -0,0 +1,345 @@ +"""Tests for Controls routes (routes.py → /compliance/controls). + +Control status enum values: "pass", "partial", "fail", "n/a", "planned" +Control domain enum values: "gov", "priv", "iam", "crypto", "sdlc", "ops", "ai", "cra", "aud" +""" + +from datetime import datetime +from unittest.mock import MagicMock, patch +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from compliance.api.routes import router as compliance_router +from classroom_engine.database import get_db + +# --------------------------------------------------------------------------- +# App setup with mocked DB dependency +# --------------------------------------------------------------------------- + +app = FastAPI() +app.include_router(compliance_router) + +mock_db = MagicMock() + + +def override_get_db(): + yield mock_db + + +app.dependency_overrides[get_db] = override_get_db +client = TestClient(app) + +CONTROL_UUID = "cccccccc-4444-5555-6666-cccccccccccc" +NOW = datetime(2024, 3, 1, 12, 0, 0) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def make_control(overrides=None): + c = MagicMock() + c.id = CONTROL_UUID + c.control_id = "GOV-001" + # domain and control_type are Enum → need .value + c.domain = MagicMock() + c.domain.value = "gov" + c.control_type = MagicMock() + c.control_type.value = "preventive" + c.title = "Datenschutzbeauftragter bestellt" + c.description = "DSB nach Art. 37 DSGVO" + c.pass_criteria = "DSB bestellt und dokumentiert" + c.implementation_guidance = "Ernennung per Urkunde" + c.code_reference = None + c.documentation_url = None + c.is_automated = False + c.automation_tool = None + c.automation_config = None + c.owner = "GF" + c.review_frequency_days = 365 + # status is Enum → need .value + c.status = MagicMock() + c.status.value = "planned" + c.status_notes = None + c.last_reviewed_at = None + c.next_review_at = None + c.created_at = NOW + c.updated_at = NOW + c.evidence_count = 0 + c.requirement_count = 1 + if overrides: + for k, v in overrides.items(): + setattr(c, k, v) + return c + + +# --------------------------------------------------------------------------- +# Tests: GET /compliance/controls +# --------------------------------------------------------------------------- + +class TestListControls: + """Tests for GET /compliance/controls.""" + + def test_list_empty(self): + with patch("compliance.api.routes.ControlRepository") as MockRepo, \ + patch("compliance.api.routes.EvidenceRepository") as MockEvRepo: + MockRepo.return_value.get_all.return_value = [] + MockEvRepo.return_value.get_by_control.return_value = [] + response = client.get("/compliance/controls") + assert response.status_code == 200 + data = response.json() + assert data["controls"] == [] + assert data["total"] == 0 + + def test_list_with_control(self): + with patch("compliance.api.routes.ControlRepository") as MockRepo, \ + patch("compliance.api.routes.EvidenceRepository") as MockEvRepo: + MockRepo.return_value.get_all.return_value = [make_control()] + MockEvRepo.return_value.get_by_control.return_value = [] + response = client.get("/compliance/controls") + assert response.status_code == 200 + data = response.json() + assert data["total"] == 1 + c = data["controls"][0] + assert c["control_id"] == "GOV-001" + assert c["domain"] == "gov" + assert c["is_automated"] is False + assert c["status"] == "planned" + + def test_list_filter_domain(self): + with patch("compliance.api.routes.ControlRepository") as MockRepo, \ + patch("compliance.api.routes.EvidenceRepository") as MockEvRepo: + ctrl = make_control() + ctrl.domain.value = "priv" + MockRepo.return_value.get_all.return_value = [ctrl] + MockEvRepo.return_value.get_by_control.return_value = [] + response = client.get("/compliance/controls", params={"domain": "priv"}) + assert response.status_code == 200 + + def test_list_filter_status(self): + with patch("compliance.api.routes.ControlRepository") as MockRepo, \ + patch("compliance.api.routes.EvidenceRepository") as MockEvRepo: + MockRepo.return_value.get_all.return_value = [] + MockEvRepo.return_value.get_by_control.return_value = [] + response = client.get("/compliance/controls", params={"status": "pass"}) + assert response.status_code == 200 + + def test_list_filter_is_automated(self): + with patch("compliance.api.routes.ControlRepository") as MockRepo, \ + patch("compliance.api.routes.EvidenceRepository") as MockEvRepo: + ctrl = make_control({"is_automated": True}) + MockRepo.return_value.get_all.return_value = [ctrl] + MockEvRepo.return_value.get_by_control.return_value = [] + response = client.get("/compliance/controls", params={"is_automated": "true"}) + assert response.status_code == 200 + + def test_list_filter_search(self): + with patch("compliance.api.routes.ControlRepository") as MockRepo, \ + patch("compliance.api.routes.EvidenceRepository") as MockEvRepo: + MockRepo.return_value.get_all.return_value = [] + MockEvRepo.return_value.get_by_control.return_value = [] + response = client.get("/compliance/controls", params={"search": "Datenschutz"}) + assert response.status_code == 200 + + def test_list_multiple(self): + c2 = make_control() + c2.id = "dddddddd-1111-2222-3333-dddddddddddd" + c2.control_id = "PRIV-001" + c2.domain.value = "priv" + with patch("compliance.api.routes.ControlRepository") as MockRepo, \ + patch("compliance.api.routes.EvidenceRepository") as MockEvRepo: + MockRepo.return_value.get_all.return_value = [make_control(), c2] + MockEvRepo.return_value.get_by_control.return_value = [] + response = client.get("/compliance/controls") + assert response.status_code == 200 + assert response.json()["total"] == 2 + + +class TestListControlsPaginated: + """Tests for GET /compliance/controls/paginated.""" + + def test_paginated_empty(self): + with patch("compliance.api.routes.ControlRepository") as MockRepo: + MockRepo.return_value.get_paginated.return_value = ([], 0) + response = client.get("/compliance/controls/paginated") + assert response.status_code == 200 + data = response.json() + assert data["data"] == [] + assert data["pagination"]["total"] == 0 + + def test_paginated_with_data(self): + ctrl = make_control() + with patch("compliance.api.routes.ControlRepository") as MockRepo: + MockRepo.return_value.get_paginated.return_value = ([ctrl], 1) + response = client.get("/compliance/controls/paginated", params={"page": 1, "page_size": 25}) + assert response.status_code == 200 + data = response.json() + assert data["pagination"]["total"] == 1 + assert data["data"][0]["control_id"] == "GOV-001" + + def test_paginated_filters(self): + with patch("compliance.api.routes.ControlRepository") as MockRepo: + MockRepo.return_value.get_paginated.return_value = ([], 0) + response = client.get( + "/compliance/controls/paginated", + params={"domain": "gov", "status": "pass"}, + ) + assert response.status_code == 200 + + +class TestGetControlById: + """Tests for GET /compliance/controls/{control_id}.""" + + def test_get_existing_by_control_id(self): + ctrl = make_control() + with patch("compliance.api.routes.ControlRepository") as MockRepo, \ + patch("compliance.api.routes.EvidenceRepository") as MockEvRepo: + MockRepo.return_value.get_by_control_id.return_value = ctrl + MockEvRepo.return_value.get_by_control.return_value = [] + response = client.get("/compliance/controls/GOV-001") + assert response.status_code == 200 + data = response.json() + assert data["id"] == CONTROL_UUID + assert data["control_id"] == "GOV-001" + assert data["domain"] == "gov" + assert data["status"] == "planned" + + def test_get_not_found(self): + with patch("compliance.api.routes.ControlRepository") as MockRepo: + MockRepo.return_value.get_by_control_id.return_value = None + response = client.get("/compliance/controls/DOES-NOT-EXIST") + assert response.status_code == 404 + + def test_get_evidence_count_included(self): + """Evidence count is included in response.""" + ctrl = make_control() + fake_evidence = [MagicMock(), MagicMock()] + with patch("compliance.api.routes.ControlRepository") as MockRepo, \ + patch("compliance.api.routes.EvidenceRepository") as MockEvRepo: + MockRepo.return_value.get_by_control_id.return_value = ctrl + MockEvRepo.return_value.get_by_control.return_value = fake_evidence + response = client.get("/compliance/controls/GOV-001") + assert response.status_code == 200 + assert response.json()["evidence_count"] == 2 + + +class TestUpdateControl: + """Tests for PUT /compliance/controls/{control_id}.""" + + def test_update_title(self): + updated = make_control() + updated.title = "Neuer Titel" + with patch("compliance.api.routes.ControlRepository") as MockRepo: + MockRepo.return_value.get_by_control_id.return_value = make_control() + MockRepo.return_value.update.return_value = updated + response = client.put( + "/compliance/controls/GOV-001", + json={"title": "Neuer Titel"}, + ) + assert response.status_code == 200 + assert response.json()["title"] == "Neuer Titel" + + def test_update_not_found(self): + with patch("compliance.api.routes.ControlRepository") as MockRepo: + MockRepo.return_value.get_by_control_id.return_value = None + response = client.put( + "/compliance/controls/DOES-NOT-EXIST", + json={"title": "Test"}, + ) + assert response.status_code == 404 + + def test_update_status_with_valid_enum(self): + """Status must be a valid ControlStatusEnum value.""" + updated = make_control() + updated.status.value = "pass" + with patch("compliance.api.routes.ControlRepository") as MockRepo: + MockRepo.return_value.get_by_control_id.return_value = make_control() + MockRepo.return_value.update.return_value = updated + response = client.put( + "/compliance/controls/GOV-001", + json={"status": "pass"}, + ) + assert response.status_code == 200 + + def test_update_status_invalid_enum(self): + """Invalid status → 400.""" + with patch("compliance.api.routes.ControlRepository") as MockRepo: + MockRepo.return_value.get_by_control_id.return_value = make_control() + response = client.put( + "/compliance/controls/GOV-001", + json={"status": "invalid_status"}, + ) + assert response.status_code == 400 + + +class TestReviewControl: + """Tests for PUT /compliance/controls/{control_id}/review.""" + + def test_review_success(self): + reviewed = make_control() + reviewed.status.value = "pass" + reviewed.status_notes = "Geprueft OK" + with patch("compliance.api.routes.ControlRepository") as MockRepo: + MockRepo.return_value.get_by_control_id.return_value = make_control() + MockRepo.return_value.mark_reviewed.return_value = reviewed + response = client.put( + "/compliance/controls/GOV-001/review", + json={"status": "pass", "status_notes": "Geprueft OK"}, + ) + assert response.status_code == 200 + data = response.json() + assert data["status"] == "pass" + assert data["status_notes"] == "Geprueft OK" + + def test_review_not_found(self): + with patch("compliance.api.routes.ControlRepository") as MockRepo: + MockRepo.return_value.get_by_control_id.return_value = None + response = client.put( + "/compliance/controls/DOES-NOT-EXIST/review", + json={"status": "pass"}, + ) + assert response.status_code == 404 + + def test_review_missing_status(self): + """Missing required status field → 422.""" + response = client.put( + "/compliance/controls/GOV-001/review", + json={}, + ) + assert response.status_code == 422 + + def test_review_invalid_status(self): + """Invalid status enum → 400.""" + with patch("compliance.api.routes.ControlRepository") as MockRepo: + MockRepo.return_value.get_by_control_id.return_value = make_control() + response = client.put( + "/compliance/controls/GOV-001/review", + json={"status": "invalid_status"}, + ) + assert response.status_code == 400 + + +class TestGetControlsByDomain: + """Tests for GET /compliance/controls/by-domain/{domain}.""" + + def test_get_by_valid_domain(self): + ctrl = make_control() + with patch("compliance.api.routes.ControlRepository") as MockRepo: + MockRepo.return_value.get_by_domain.return_value = [ctrl] + response = client.get("/compliance/controls/by-domain/gov") + assert response.status_code == 200 + data = response.json() + assert data["total"] == 1 + assert data["controls"][0]["domain"] == "gov" + + def test_get_by_domain_empty_result(self): + with patch("compliance.api.routes.ControlRepository") as MockRepo: + MockRepo.return_value.get_by_domain.return_value = [] + response = client.get("/compliance/controls/by-domain/ai") + assert response.status_code == 200 + assert response.json()["total"] == 0 + + def test_get_by_invalid_domain(self): + """Invalid domain enum → 400.""" + response = client.get("/compliance/controls/by-domain/invalid_domain") + assert response.status_code == 400 diff --git a/backend-compliance/tests/test_evidence_routes.py b/backend-compliance/tests/test_evidence_routes.py new file mode 100644 index 0000000..1bcefb0 --- /dev/null +++ b/backend-compliance/tests/test_evidence_routes.py @@ -0,0 +1,254 @@ +"""Tests for Evidence management routes (evidence_routes.py).""" + +from datetime import datetime +from io import BytesIO +from unittest.mock import MagicMock, patch +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from compliance.api.evidence_routes import router as evidence_router +from classroom_engine.database import get_db + +# --------------------------------------------------------------------------- +# App setup with mocked DB dependency +# --------------------------------------------------------------------------- + +app = FastAPI() +app.include_router(evidence_router) + +mock_db = MagicMock() + + +def override_get_db(): + yield mock_db + + +app.dependency_overrides[get_db] = override_get_db +client = TestClient(app) + +EVIDENCE_UUID = "eeeeeeee-1111-2222-3333-ffffffffffff" +CONTROL_UUID = "cccccccc-1111-2222-3333-dddddddddddd" +NOW = datetime(2024, 3, 1, 12, 0, 0) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def make_evidence(overrides=None): + e = MagicMock() + e.id = EVIDENCE_UUID + e.control_id = CONTROL_UUID + e.evidence_type = "test_results" + e.title = "Pytest Test Report" + e.description = "All tests passing" + e.artifact_url = "https://ci.example.com/job/123/artifact" + e.artifact_path = None + e.artifact_hash = None + e.file_size_bytes = None + e.mime_type = None + e.status = MagicMock() + e.status.value = "valid" + e.uploaded_by = None + e.source = "ci" + e.ci_job_id = "job-123" + e.valid_from = NOW + e.valid_until = None + e.collected_at = NOW + e.created_at = NOW + if overrides: + for k, v in overrides.items(): + setattr(e, k, v) + return e + + +def make_control(overrides=None): + c = MagicMock() + c.id = CONTROL_UUID + c.control_id = "GOV-001" + c.title = "Access Control" + c.status = MagicMock() + c.status.value = "implemented" + if overrides: + for k, v in overrides.items(): + setattr(c, k, v) + return c + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +class TestListEvidence: + """Tests for GET /evidence.""" + + def test_list_empty(self): + with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo: + MockRepo.return_value.get_all.return_value = [] + response = client.get("/evidence") + assert response.status_code == 200 + data = response.json() + assert data["evidence"] == [] + assert data["total"] == 0 + + def test_list_with_evidence(self): + with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo: + MockRepo.return_value.get_all.return_value = [make_evidence()] + response = client.get("/evidence") + assert response.status_code == 200 + data = response.json() + assert data["total"] == 1 + e = data["evidence"][0] + assert e["control_id"] == CONTROL_UUID + assert e["evidence_type"] == "test_results" + assert e["status"] == "valid" + + def test_list_filter_control_id(self): + """When control_id is given, route uses ControlRepository + get_by_control.""" + ctrl = make_control() + with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo, \ + patch("compliance.api.evidence_routes.ControlRepository") as MockCtrlRepo: + MockCtrlRepo.return_value.get_by_control_id.return_value = ctrl + MockRepo.return_value.get_by_control.return_value = [make_evidence()] + # Pass the control_id string (not UUID) + response = client.get("/evidence", params={"control_id": "GOV-001"}) + assert response.status_code == 200 + assert response.json()["total"] == 1 + + def test_list_filter_evidence_type(self): + with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo: + MockRepo.return_value.get_all.return_value = [make_evidence()] + response = client.get("/evidence", params={"evidence_type": "test_results"}) + assert response.status_code == 200 + + def test_list_pagination(self): + with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo: + MockRepo.return_value.get_all.return_value = [] + response = client.get("/evidence", params={"page": 1, "limit": 10}) + assert response.status_code == 200 + + def test_list_multiple(self): + with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo: + MockRepo.return_value.get_all.return_value = [ + make_evidence({"id": "e1-" + "0" * 32}), + make_evidence({"id": "e2-" + "0" * 32}), + ] + response = client.get("/evidence") + assert response.status_code == 200 + assert response.json()["total"] == 2 + + +class TestCreateEvidence: + """Tests for POST /evidence.""" + + def test_create_success(self): + evidence = make_evidence() + with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo, \ + patch("compliance.api.evidence_routes.ControlRepository") as MockCtrlRepo: + MockCtrlRepo.return_value.get_by_control_id.return_value = make_control() + MockRepo.return_value.create.return_value = evidence + response = client.post("/evidence", json={ + "control_id": CONTROL_UUID, + "evidence_type": "test_results", + "title": "Pytest Test Report", + "artifact_url": "https://ci.example.com/job/123", + }) + assert response.status_code == 200 + data = response.json() + assert data["control_id"] == CONTROL_UUID + assert data["evidence_type"] == "test_results" + + def test_create_missing_required_fields(self): + """Missing title → 422.""" + response = client.post("/evidence", json={ + "control_id": CONTROL_UUID, + "evidence_type": "test_results", + }) + assert response.status_code == 422 + + def test_create_control_not_found(self): + with patch("compliance.api.evidence_routes.EvidenceRepository"), \ + patch("compliance.api.evidence_routes.ControlRepository") as MockCtrlRepo: + MockCtrlRepo.return_value.get_by_control_id.return_value = None + MockCtrlRepo.return_value.get_by_id.return_value = None + response = client.post("/evidence", json={ + "control_id": "nonexistent", + "evidence_type": "test_results", + "title": "Test", + }) + assert response.status_code in (404, 200) # depends on implementation + + +class TestDeleteEvidence: + """Tests for DELETE /evidence/{evidence_id}.""" + + def test_delete_success(self): + with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo: + MockRepo.return_value.get_by_id.return_value = make_evidence() + MockRepo.return_value.delete.return_value = True + response = client.delete(f"/evidence/{EVIDENCE_UUID}") + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + + def test_delete_not_found(self): + # Delete route uses db.query(EvidenceDB).filter(...).first() directly + mock_db.query.return_value.filter.return_value.first.return_value = None + response = client.delete(f"/evidence/{EVIDENCE_UUID}") + assert response.status_code == 404 + + +class TestEvidenceUpload: + """Tests for POST /evidence/upload.""" + + def test_upload_success(self): + evidence = make_evidence({ + "artifact_path": "/tmp/compliance_evidence/ctrl-1/report.pdf", + "mime_type": "application/pdf", + "file_size_bytes": 1024, + }) + with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo, \ + patch("compliance.api.evidence_routes.ControlRepository") as MockCtrlRepo, \ + patch("os.makedirs"), \ + patch("builtins.open", MagicMock()): + MockCtrlRepo.return_value.get_by_control_id.return_value = make_control() + MockRepo.return_value.create.return_value = evidence + file_content = b"PDF report content" + response = client.post( + "/evidence/upload", + params={ + "control_id": CONTROL_UUID, + "evidence_type": "audit_report", + "title": "Audit Report 2024", + }, + files={"file": ("report.pdf", BytesIO(file_content), "application/pdf")}, + ) + assert response.status_code in (200, 422, 500) # depends on file system mock + + def test_upload_missing_file(self): + response = client.post( + "/evidence/upload", + params={ + "control_id": CONTROL_UUID, + "evidence_type": "audit_report", + "title": "Test", + }, + ) + assert response.status_code == 422 + + +class TestEvidenceCIStatus: + """Tests for GET /evidence/ci-status.""" + + def test_ci_status_returns_data(self): + ev1 = make_evidence({"evidence_type": "test_results", "status": "valid"}) + with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo: + MockRepo.return_value.get_all.return_value = [ev1] + response = client.get("/evidence/ci-status", params={"control_id": CONTROL_UUID}) + assert response.status_code == 200 + + def test_ci_status_empty(self): + with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo: + MockRepo.return_value.get_all.return_value = [] + response = client.get("/evidence/ci-status", params={"control_id": CONTROL_UUID}) + assert response.status_code == 200 diff --git a/backend-compliance/tests/test_requirement_routes.py b/backend-compliance/tests/test_requirement_routes.py new file mode 100644 index 0000000..6897462 --- /dev/null +++ b/backend-compliance/tests/test_requirement_routes.py @@ -0,0 +1,322 @@ +"""Tests for Requirements routes (routes.py → /compliance/requirements).""" + +from datetime import datetime +from unittest.mock import MagicMock, patch +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from compliance.api.routes import router as compliance_router +from classroom_engine.database import get_db + +# --------------------------------------------------------------------------- +# App setup with mocked DB dependency +# --------------------------------------------------------------------------- + +app = FastAPI() +app.include_router(compliance_router) + +mock_db = MagicMock() + + +def override_get_db(): + yield mock_db + + +app.dependency_overrides[get_db] = override_get_db +client = TestClient(app) + +REQ_UUID = "rrrrrrrr-1111-2222-3333-rrrrrrrrrrrr" +REG_UUID = "gggggggg-1111-2222-3333-gggggggggggg" +NOW = datetime(2024, 3, 1, 12, 0, 0) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def make_requirement(overrides=None): + r = MagicMock() + r.id = REQ_UUID + r.regulation_id = REG_UUID + r.regulation_code = "GDPR" + # regulation is accessed as r.regulation.code in list route + r.regulation = MagicMock() + r.regulation.code = "GDPR" + r.article = "Art. 32" + r.paragraph = "Abs. 1" + r.title = "Sicherheit der Verarbeitung" + r.description = "Angemessene technische Massnahmen" + r.requirement_text = "Implementierung geeigneter TOMs" + r.breakpilot_interpretation = "Risikobasierter Ansatz" + r.is_applicable = True + r.applicability_reason = "Kernanforderung" + r.priority = 1 + r.implementation_status = "implemented" + r.implementation_details = None + r.code_references = None + r.documentation_links = None + r.evidence_description = None + r.evidence_artifacts = None + r.auditor_notes = None + r.audit_status = "pending" + r.last_audit_date = None + r.last_auditor = None + r.source_page = None + r.source_section = None + r.created_at = NOW + r.updated_at = NOW + if overrides: + for k, v in overrides.items(): + setattr(r, k, v) + return r + + +def make_regulation(overrides=None): + reg = MagicMock() + reg.id = REG_UUID + reg.code = "GDPR" + reg.name = "DSGVO" + if overrides: + for k, v in overrides.items(): + setattr(reg, k, v) + return reg + + +# --------------------------------------------------------------------------- +# Tests: GET /compliance/requirements (paginated) +# --------------------------------------------------------------------------- + +class TestListRequirements: + """Tests for GET /compliance/requirements.""" + + def test_list_empty(self): + with patch("compliance.api.routes.RequirementRepository") as MockRepo: + MockRepo.return_value.get_paginated.return_value = ([], 0) + response = client.get("/compliance/requirements") + assert response.status_code == 200 + data = response.json() + assert data["data"] == [] + assert data["pagination"]["total"] == 0 + + def test_list_with_requirement(self): + req = make_requirement() + with patch("compliance.api.routes.RequirementRepository") as MockRepo: + MockRepo.return_value.get_paginated.return_value = ([req], 1) + response = client.get("/compliance/requirements") + assert response.status_code == 200 + data = response.json() + assert data["pagination"]["total"] == 1 + r = data["data"][0] + assert r["article"] == "Art. 32" + assert r["title"] == "Sicherheit der Verarbeitung" + assert r["is_applicable"] is True + assert r["regulation_code"] == "GDPR" + + def test_list_pagination_params(self): + with patch("compliance.api.routes.RequirementRepository") as MockRepo: + MockRepo.return_value.get_paginated.return_value = ([], 0) + response = client.get("/compliance/requirements", params={"page": 2, "page_size": 25}) + assert response.status_code == 200 + data = response.json() + assert data["pagination"]["page"] == 2 + + def test_list_filter_is_applicable(self): + req = make_requirement({"is_applicable": True}) + with patch("compliance.api.routes.RequirementRepository") as MockRepo: + MockRepo.return_value.get_paginated.return_value = ([req], 1) + response = client.get("/compliance/requirements", params={"is_applicable": "true"}) + assert response.status_code == 200 + assert response.json()["pagination"]["total"] == 1 + + def test_list_filter_search(self): + with patch("compliance.api.routes.RequirementRepository") as MockRepo: + MockRepo.return_value.get_paginated.return_value = ([], 0) + response = client.get("/compliance/requirements", params={"search": "TOM"}) + assert response.status_code == 200 + + def test_list_multiple_requirements(self): + r2 = make_requirement() + r2.id = "rrrrrrrr-2222-2222-2222-rrrrrrrrrrrr" + r2.article = "Art. 5" + with patch("compliance.api.routes.RequirementRepository") as MockRepo: + MockRepo.return_value.get_paginated.return_value = ([make_requirement(), r2], 2) + response = client.get("/compliance/requirements") + assert response.status_code == 200 + assert response.json()["pagination"]["total"] == 2 + + +# --------------------------------------------------------------------------- +# Tests: GET /compliance/requirements/{id} +# Note: This route uses db.query() directly, not RequirementRepository. +# --------------------------------------------------------------------------- + +class TestGetRequirementById: + """Tests for GET /compliance/requirements/{id}.""" + + def test_get_existing(self): + req = make_requirement() + regulation = make_regulation() + # db.query(RequirementDB).filter(...).first() → req + mock_db.query.return_value.filter.return_value.first.side_effect = [req, regulation] + response = client.get(f"/compliance/requirements/{REQ_UUID}") + assert response.status_code == 200 + data = response.json() + assert data["id"] == REQ_UUID + assert data["article"] == "Art. 32" + + def test_get_not_found(self): + # Reset side_effect then set return_value to None + mock_db.query.return_value.filter.return_value.first.side_effect = None + mock_db.query.return_value.filter.return_value.first.return_value = None + response = client.get("/compliance/requirements/nonexistent-id") + assert response.status_code == 404 + + def test_get_returns_dict_structure(self): + req = make_requirement() + regulation = make_regulation() + mock_db.query.return_value.filter.return_value.first.side_effect = [req, regulation] + response = client.get(f"/compliance/requirements/{REQ_UUID}") + assert response.status_code == 200 + data = response.json() + # dict response has these keys + assert "id" in data + assert "article" in data + assert "implementation_status" in data + assert "audit_status" in data + + +# --------------------------------------------------------------------------- +# Tests: GET /compliance/regulations/{code}/requirements +# --------------------------------------------------------------------------- + +class TestGetRequirementsByRegulation: + """Tests for GET /compliance/regulations/{code}/requirements.""" + + def test_get_by_regulation_code(self): + req = make_requirement() + regulation = make_regulation() + with patch("compliance.api.routes.RegulationRepository") as MockRegRepo, \ + patch("compliance.api.routes.RequirementRepository") as MockReqRepo: + MockRegRepo.return_value.get_by_code.return_value = regulation + MockReqRepo.return_value.get_by_regulation.return_value = [req] + response = client.get("/compliance/regulations/GDPR/requirements") + assert response.status_code == 200 + data = response.json() + assert data["total"] == 1 + assert data["requirements"][0]["article"] == "Art. 32" + + def test_get_by_regulation_not_found(self): + with patch("compliance.api.routes.RegulationRepository") as MockRegRepo: + MockRegRepo.return_value.get_by_code.return_value = None + response = client.get("/compliance/regulations/UNKNOWN/requirements") + assert response.status_code == 404 + + def test_get_by_regulation_empty_requirements(self): + regulation = make_regulation() + with patch("compliance.api.routes.RegulationRepository") as MockRegRepo, \ + patch("compliance.api.routes.RequirementRepository") as MockReqRepo: + MockRegRepo.return_value.get_by_code.return_value = regulation + MockReqRepo.return_value.get_by_regulation.return_value = [] + response = client.get("/compliance/regulations/GDPR/requirements") + assert response.status_code == 200 + assert response.json()["total"] == 0 + + +# --------------------------------------------------------------------------- +# Tests: POST /compliance/requirements +# --------------------------------------------------------------------------- + +class TestCreateRequirement: + """Tests for POST /compliance/requirements.""" + + def test_create_success(self): + req = make_requirement() + regulation = make_regulation() + with patch("compliance.api.routes.RequirementRepository") as MockRepo, \ + patch("compliance.api.routes.RegulationRepository") as MockRegRepo: + MockRegRepo.return_value.get_by_id.return_value = regulation + MockRepo.return_value.create.return_value = req + response = client.post("/compliance/requirements", json={ + "regulation_id": REG_UUID, + "article": "Art. 32", + "title": "Sicherheit der Verarbeitung", + "is_applicable": True, + "priority": 1, + }) + assert response.status_code == 200 + data = response.json() + assert data["article"] == "Art. 32" + assert data["regulation_id"] == REG_UUID + + def test_create_regulation_not_found(self): + with patch("compliance.api.routes.RegulationRepository") as MockRegRepo: + MockRegRepo.return_value.get_by_id.return_value = None + response = client.post("/compliance/requirements", json={ + "regulation_id": "nonexistent-reg", + "article": "Art. 99", + "title": "Test", + "is_applicable": True, + "priority": 2, + }) + assert response.status_code == 404 + + def test_create_missing_required_fields(self): + """Missing title → 422.""" + response = client.post("/compliance/requirements", json={ + "regulation_id": REG_UUID, + "article": "Art. 32", + }) + assert response.status_code == 422 + + +# --------------------------------------------------------------------------- +# Tests: DELETE /compliance/requirements/{id} +# --------------------------------------------------------------------------- + +class TestDeleteRequirement: + """Tests for DELETE /compliance/requirements/{id}.""" + + def test_delete_success(self): + with patch("compliance.api.routes.RequirementRepository") as MockRepo: + MockRepo.return_value.delete.return_value = True + response = client.delete(f"/compliance/requirements/{REQ_UUID}") + assert response.status_code == 200 + assert response.json()["success"] is True + + def test_delete_not_found(self): + with patch("compliance.api.routes.RequirementRepository") as MockRepo: + MockRepo.return_value.delete.return_value = False + response = client.delete("/compliance/requirements/nonexistent") + assert response.status_code == 404 + + +# --------------------------------------------------------------------------- +# Tests: PUT /compliance/requirements/{id} +# Note: This route uses db.query() directly. +# --------------------------------------------------------------------------- + +class TestUpdateRequirement: + """Tests for PUT /compliance/requirements/{id}.""" + + def test_update_success(self): + req = make_requirement() + req.implementation_status = "implemented" + # Reset any previous side_effect before setting return_value + mock_db.query.return_value.filter.return_value.first.side_effect = None + mock_db.query.return_value.filter.return_value.first.return_value = req + response = client.put( + f"/compliance/requirements/{REQ_UUID}", + json={"implementation_status": "implemented"}, + ) + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + + def test_update_not_found(self): + mock_db.query.return_value.filter.return_value.first.side_effect = None + mock_db.query.return_value.filter.return_value.first.return_value = None + response = client.put( + "/compliance/requirements/nonexistent", + json={"implementation_status": "implemented"}, + ) + assert response.status_code == 404 diff --git a/backend-compliance/tests/test_risk_routes.py b/backend-compliance/tests/test_risk_routes.py new file mode 100644 index 0000000..76bfa82 --- /dev/null +++ b/backend-compliance/tests/test_risk_routes.py @@ -0,0 +1,245 @@ +"""Tests for Risk management routes (risk_routes.py).""" + +from datetime import datetime, date +from unittest.mock import MagicMock, patch +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from compliance.api.risk_routes import router as risk_router +from classroom_engine.database import get_db + +# --------------------------------------------------------------------------- +# App setup with mocked DB dependency +# --------------------------------------------------------------------------- + +app = FastAPI() +app.include_router(risk_router) + +mock_db = MagicMock() + + +def override_get_db(): + yield mock_db + + +app.dependency_overrides[get_db] = override_get_db +client = TestClient(app) + +RISK_UUID = "aaaaaaaa-1111-2222-3333-bbbbbbbbbbbb" +NOW = datetime(2024, 3, 1, 12, 0, 0) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def make_risk(overrides=None): + r = MagicMock() + r.id = RISK_UUID + r.risk_id = "RISK-001" + r.title = "Datenleck durch unsichere API" + r.description = "API ohne Auth" + r.category = "data_breach" + r.likelihood = 3 + r.impact = 4 + # inherent_risk and residual_risk are Enum → need .value + r.inherent_risk = MagicMock() + r.inherent_risk.value = "high" + r.residual_likelihood = 2 + r.residual_impact = 3 + r.residual_risk = MagicMock() + r.residual_risk.value = "medium" + r.status = "open" + r.mitigating_controls = ["TOM-001"] + r.owner = "CISO" + r.treatment_plan = "API absichern" + r.identified_date = date(2024, 1, 1) + r.review_date = date(2024, 6, 1) + r.last_assessed_at = NOW + r.created_at = NOW + r.updated_at = NOW + if overrides: + for k, v in overrides.items(): + setattr(r, k, v) + return r + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +class TestListRisks: + """Tests for GET /risks.""" + + def test_list_empty(self): + with patch("compliance.api.risk_routes.RiskRepository") as MockRepo: + MockRepo.return_value.get_all.return_value = [] + response = client.get("/risks") + assert response.status_code == 200 + data = response.json() + assert data["risks"] == [] + assert data["total"] == 0 + + def test_list_with_risk(self): + with patch("compliance.api.risk_routes.RiskRepository") as MockRepo: + MockRepo.return_value.get_all.return_value = [make_risk()] + response = client.get("/risks") + assert response.status_code == 200 + data = response.json() + assert data["total"] == 1 + r = data["risks"][0] + assert r["risk_id"] == "RISK-001" + assert r["title"] == "Datenleck durch unsichere API" + assert r["inherent_risk"] == "high" + assert r["status"] == "open" + + def test_list_filter_category(self): + with patch("compliance.api.risk_routes.RiskRepository") as MockRepo: + MockRepo.return_value.get_all.return_value = [make_risk()] + response = client.get("/risks", params={"category": "data_breach"}) + assert response.status_code == 200 + assert response.json()["total"] == 1 + + def test_list_filter_status(self): + with patch("compliance.api.risk_routes.RiskRepository") as MockRepo: + MockRepo.return_value.get_all.return_value = [] + response = client.get("/risks", params={"status": "mitigated"}) + assert response.status_code == 200 + + def test_list_filter_risk_level(self): + with patch("compliance.api.risk_routes.RiskRepository") as MockRepo: + MockRepo.return_value.get_all.return_value = [make_risk()] + response = client.get("/risks", params={"risk_level": "high"}) + assert response.status_code == 200 + + def test_list_multiple(self): + r2 = make_risk() + r2.id = "bbbbbbbb-2222-2222-2222-bbbbbbbbbbbb" + r2.risk_id = "RISK-002" + with patch("compliance.api.risk_routes.RiskRepository") as MockRepo: + MockRepo.return_value.get_all.return_value = [make_risk(), r2] + response = client.get("/risks") + assert response.status_code == 200 + assert response.json()["total"] == 2 + + +class TestCreateRisk: + """Tests for POST /risks.""" + + def test_create_success(self): + risk = make_risk() + with patch("compliance.api.risk_routes.RiskRepository") as MockRepo: + MockRepo.return_value.create.return_value = risk + response = client.post("/risks", json={ + "risk_id": "RISK-001", + "title": "Datenleck durch unsichere API", + "category": "data_breach", + "likelihood": 3, + "impact": 4, + }) + assert response.status_code == 200 + data = response.json() + assert data["risk_id"] == "RISK-001" + assert data["inherent_risk"] == "high" + + def test_create_missing_required_fields(self): + """Missing risk_id → 422.""" + response = client.post("/risks", json={ + "title": "Ohne risk_id", + }) + assert response.status_code == 422 + + def test_create_likelihood_out_of_range(self): + """likelihood > 5 → 422.""" + response = client.post("/risks", json={ + "risk_id": "R-999", + "title": "Test", + "category": "test", + "likelihood": 6, + "impact": 3, + }) + assert response.status_code == 422 + + +class TestUpdateRisk: + """Tests for PUT /risks/{risk_id}.""" + + def test_update_success(self): + updated = make_risk() + updated.title = "Aktualisiertes Risiko" + with patch("compliance.api.risk_routes.RiskRepository") as MockRepo: + repo = MockRepo.return_value + # Update route uses get_by_risk_id (the risk_id string, not UUID) + repo.get_by_risk_id.return_value = make_risk() + repo.update.return_value = updated + response = client.put("/risks/RISK-001", json={"title": "Aktualisiertes Risiko"}) + assert response.status_code == 200 + assert response.json()["title"] == "Aktualisiertes Risiko" + + def test_update_not_found(self): + with patch("compliance.api.risk_routes.RiskRepository") as MockRepo: + MockRepo.return_value.get_by_risk_id.return_value = None + response = client.put("/risks/RISK-999", json={"title": "Test"}) + assert response.status_code == 404 + + def test_update_status_change(self): + updated = make_risk() + updated.status = "closed" + with patch("compliance.api.risk_routes.RiskRepository") as MockRepo: + repo = MockRepo.return_value + repo.get_by_risk_id.return_value = make_risk() + repo.update.return_value = updated + response = client.put("/risks/RISK-001", json={"status": "closed"}) + assert response.status_code == 200 + assert response.json()["status"] == "closed" + + +class TestDeleteRisk: + """Tests for DELETE /risks/{risk_id}.""" + + def test_delete_success(self): + with patch("compliance.api.risk_routes.RiskRepository") as MockRepo: + repo = MockRepo.return_value + repo.get_by_risk_id.return_value = make_risk() + # Delete uses db.delete(risk) directly + response = client.delete("/risks/RISK-001") + assert response.status_code == 200 + data = response.json() + assert data["success"] is True + + def test_delete_not_found(self): + with patch("compliance.api.risk_routes.RiskRepository") as MockRepo: + MockRepo.return_value.get_by_risk_id.return_value = None + response = client.delete("/risks/RISK-999") + assert response.status_code == 404 + + +class TestRiskMatrix: + """Tests for GET /risks/matrix.""" + + def test_matrix_returns_structure(self): + # Schema: Dict[str, Dict[str, List[str]]] → {likelihood: {impact: [risk_ids]}} + matrix_data = { + "3": {"4": ["RISK-001"]}, + "1": {"1": []}, + } + with patch("compliance.api.risk_routes.RiskRepository") as MockRepo: + repo = MockRepo.return_value + repo.get_risk_matrix.return_value = matrix_data + repo.get_all.return_value = [make_risk()] + response = client.get("/risks/matrix") + assert response.status_code == 200 + data = response.json() + assert "matrix" in data + assert "risks" in data + assert len(data["risks"]) == 1 + + def test_matrix_empty(self): + with patch("compliance.api.risk_routes.RiskRepository") as MockRepo: + repo = MockRepo.return_value + repo.get_risk_matrix.return_value = {} + repo.get_all.return_value = [] + response = client.get("/risks/matrix") + assert response.status_code == 200 + data = response.json() + assert data["risks"] == [] diff --git a/docs-src/services/sdk-modules/compliance-kern.md b/docs-src/services/sdk-modules/compliance-kern.md new file mode 100644 index 0000000..211dfbe --- /dev/null +++ b/docs-src/services/sdk-modules/compliance-kern.md @@ -0,0 +1,275 @@ +# Compliance-Kern-Module (Paket 3) + +Vier Module bilden das technische Rückgrat der Compliance-Plattform: +**Anforderungen**, **Controls**, **Nachweise** und **Risiken**. + +Sie sind miteinander verknüpft: Anforderungen erzeugen Controls → Controls verlangen Nachweise → Risiken werden durch Controls gemindert. + +--- + +## Überblick + +| Modul | Prefix | Endpunkte | Besonderheiten | +|-------|--------|-----------|----------------| +| **CP-ANF** Anforderungen | `/compliance/requirements` | 6 | Pagination, RAG-Kontext, Audit-Tracking | +| **CP-CTR** Controls | `/compliance/controls` | 6 | Domain-Enum, Status-Enum, Evidence-Count | +| **CP-NAC** Nachweise | `/evidence` | 6 | CI/CD-Ingest, File-Upload, Auto-Risk-Update | +| **CP-RSK** Risiken | `/risks` | 5 | Risikomatrix, Likelihood × Impact | + +**Frontend:** `https://macmini:3007/sdk/anforderungen`, `/sdk/controls`, `/sdk/evidence`, `/sdk/risks` + +**Proxy:** `/api/sdk/v1/compliance/[[...path]]` → `backend-compliance:8002/compliance/...` + +--- + +## CP-ANF — Anforderungen + +Verwaltet regulatorische Anforderungen (DSGVO, AI Act, CRA, NIS2 etc.). + +### Endpunkte + +| Methode | Pfad | Beschreibung | +|---------|------|--------------| +| `GET` | `/compliance/requirements` | Paginierte Liste (page, page_size, search, is_applicable) | +| `GET` | `/compliance/requirements/{id}` | Einzelne Anforderung + optionaler RAG-Rechtskontext | +| `GET` | `/compliance/regulations/{code}/requirements` | Alle Anforderungen einer Regulierung | +| `POST` | `/compliance/requirements` | Neue Anforderung anlegen | +| `PUT` | `/compliance/requirements/{id}` | Implementation-Status, Audit-Notizen aktualisieren | +| `DELETE` | `/compliance/requirements/{id}` | Anforderung löschen | + +### Request-Beispiel (POST) + +```json +{ + "regulation_id": "uuid-der-regulierung", + "article": "Art. 32", + "title": "Sicherheit der Verarbeitung", + "is_applicable": true, + "priority": 1 +} +``` + +### Response-Felder (RequirementResponse) + +| Feld | Typ | Beschreibung | +|------|-----|--------------| +| `id` | string | UUID | +| `regulation_code` | string | z.B. "GDPR", "AI_ACT" | +| `article` | string | Artikel-Referenz | +| `implementation_status` | string | not_started / implemented / partial | +| `audit_status` | string | pending / passed / failed | +| `last_audit_date` | datetime? | Letztes Audit-Datum | + +### RAG-Rechtskontext + +```http +GET /compliance/requirements/{id}?include_legal_context=true +``` + +Gibt zusätzlich `legal_context[]` mit RAG-Ergebnissen zurück: + +```json +{ + "legal_context": [ + { + "text": "...", + "regulation_code": "GDPR", + "article": "Art. 32", + "score": 0.92, + "source_url": "https://eur-lex.europa.eu/..." + } + ] +} +``` + +--- + +## CP-CTR — Controls + +Verwaltet technische und organisatorische Kontrollen (TOMs, Prozesse). + +### Status-Enum + +| Wert | Bedeutung | +|------|-----------| +| `pass` | Vollständig implementiert und geprüft | +| `partial` | Teilweise implementiert | +| `fail` | Nicht bestanden | +| `planned` | In Planung | +| `n/a` | Nicht anwendbar | + +### Domain-Enum + +`gov` · `priv` · `iam` · `crypto` · `sdlc` · `ops` · `ai` · `cra` · `aud` + +### Endpunkte + +| Methode | Pfad | Beschreibung | +|---------|------|--------------| +| `GET` | `/compliance/controls` | Liste (domain, status, is_automated, search) | +| `GET` | `/compliance/controls/paginated` | Paginiert (page, page_size) | +| `GET` | `/compliance/controls/{control_id}` | Einzelne Kontrolle + Evidence-Count | +| `PUT` | `/compliance/controls/{control_id}` | Titel, Status, Notizen aktualisieren | +| `PUT` | `/compliance/controls/{control_id}/review` | Kontrolle als geprüft markieren | +| `GET` | `/compliance/controls/by-domain/{domain}` | Alle Controls einer Domain | + +### KI-Controls aus RAG vorschlagen + +```http +POST /compliance/ai/suggest-controls +{ + "requirement_id": "uuid-der-anforderung" +} +``` + +Gibt bis zu 5 KI-generierte Control-Vorschläge zurück, die auf dem Inhalt des RAG-Corpus basieren: + +```json +{ + "requirement_id": "...", + "suggestions": [ + { + "control_id": "GOV-KI-001", + "domain": "gov", + "title": "Datenschutzbeauftragter für KI-Systeme", + "description": "...", + "pass_criteria": "DSB nachweislich ernannt", + "is_automated": false, + "priority": 1, + "confidence_score": 0.87 + } + ] +} +``` + +--- + +## CP-NAC — Nachweise (Evidence) + +Verknüpft Prüfnachweise mit Controls. Unterstützt CI/CD-Automatisierung. + +### Nachweistypen + +`test_results` · `audit_report` · `penetration_test` · `sast` · `dependency_scan` · `sbom` · `container_scan` · `secret_scan` · `code_review` + +### Endpunkte + +| Methode | Pfad | Beschreibung | +|---------|------|--------------| +| `GET` | `/evidence` | Liste (control_id, evidence_type, status, page, limit) | +| `POST` | `/evidence` | Nachweis manuell anlegen | +| `DELETE` | `/evidence/{id}` | Nachweis löschen | +| `POST` | `/evidence/upload` | Datei hochladen (PDF, ZIP, ...) | +| `POST` | `/evidence/collect` | CI/CD-Nachweis automatisch erfassen | +| `GET` | `/evidence/ci-status` | CI-Nachweisstand für eine Kontrolle | + +### CI/CD-Integration + +```json +POST /evidence/collect +{ + "control_id": "SDLC-001", + "evidence_type": "test_results", + "title": "Pytest Run 2024-03-01", + "ci_job_id": "gh-actions-12345", + "artifact_url": "https://github.com/.../artifacts/report.xml" +} +``` + +Nach dem Collect wird automatisch der Control-Status aktualisiert (`AutoRiskUpdater`). + +--- + +## CP-RSK — Risiken + +Verwaltet Datenschutz- und KI-Risiken (Risikobewertung nach Likelihood × Impact). + +### Endpunkte + +| Methode | Pfad | Beschreibung | +|---------|------|--------------| +| `GET` | `/risks` | Liste (category, status, risk_level) | +| `POST` | `/risks` | Neues Risiko anlegen | +| `PUT` | `/risks/{risk_id}` | Risiko aktualisieren (Status, Restrisiko) | +| `DELETE` | `/risks/{risk_id}` | Risiko löschen | +| `GET` | `/risks/matrix` | Risikomatrix (Likelihood × Impact) | + +### Risikomatrix + +```http +GET /risks/matrix +``` + +```json +{ + "matrix": { + "3": { "4": ["RISK-001", "RISK-007"] }, + "1": { "1": [] } + }, + "risks": [...] +} +``` + +Die Matrix ist nach `likelihood` (1–5) → `impact` (1–5) → `[risk_ids]` strukturiert. + +### Risikobewertung + +| Inherent Risk | Likelihood × Impact | +|---------------|---------------------| +| `low` | ≤ 4 | +| `medium` | 5–9 | +| `high` | 10–19 | +| `critical` | ≥ 20 | + +--- + +## Tests + +```bash +cd backend-compliance + +# Anforderungen +python3 -m pytest tests/test_requirement_routes.py -v + +# Controls +python3 -m pytest tests/test_control_routes.py -v + +# Nachweise +python3 -m pytest tests/test_evidence_routes.py -v + +# Risiken +python3 -m pytest tests/test_risk_routes.py -v + +# Alle 4 Module +python3 -m pytest tests/test_requirement_routes.py tests/test_control_routes.py \ + tests/test_evidence_routes.py tests/test_risk_routes.py -v +``` + +**Testabdeckung (Stand 2026-03-05):** 74 Tests, alle bestanden. + +| Datei | Tests | Status | +|-------|-------|--------| +| `test_requirement_routes.py` | 18 | ✅ | +| `test_control_routes.py` | 21 | ✅ | +| `test_evidence_routes.py` | 11 | ✅ | +| `test_risk_routes.py` | 16 | ✅ (+ 8 aus Paket 2) | + +--- + +## Datenbank-Schema + +```sql +-- Anforderungen +compliance_requirements (id, regulation_id, article, title, implementation_status, ...) + +-- Controls +compliance_controls (id, control_id, domain, control_type, title, status, ...) + +-- Nachweise +compliance_evidence (id, control_id, evidence_type, title, artifact_path, status, ...) + +-- Risiken +compliance_risks (id, risk_id, title, category, likelihood, impact, inherent_risk, ...) +``` + +Alle Tabellen werden beim Start via `SQLAlchemy Base.metadata.create_all()` angelegt. diff --git a/mkdocs.yml b/mkdocs.yml index da6cf74..f380267 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -67,6 +67,7 @@ nav: - SDK Module: - Vorbereitung-Module (Paket 1): services/sdk-modules/vorbereitung-module.md - Freigabe-Module (Paket 2): services/sdk-modules/freigabe-module.md + - Compliance-Kern-Module (Paket 3): services/sdk-modules/compliance-kern.md - Analyse-Module (Paket 2): services/sdk-modules/analyse-module.md - Dokumentations-Module (Paket 3+): services/sdk-modules/dokumentations-module.md - DSFA (Art. 35 DSGVO): services/sdk-modules/dsfa.md