feat(compliance-kern): Tests, MkDocs + RAG-Controls Button für Anforderungen/Controls/Nachweise/Risiken
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 36s
CI / test-python-backend-compliance (push) Successful in 35s
CI / test-python-document-crawler (push) Successful in 23s
CI / test-python-dsms-gateway (push) Successful in 21s

- 74 neue Tests (test_risk_routes, test_evidence_routes, test_requirement_routes, test_control_routes)
  Enum-Mocking (.value), ControlStatusEnum-Validierung, db.query() direkte Mocks
- MkDocs: docs-src/services/sdk-modules/compliance-kern.md
  Endpunkt-Tabellen, Schema-Erklärungen, CI/CD-Beispiele, Risikomatrix
- controls/page.tsx: "KI-Controls aus RAG vorschlagen" Button
  POST /api/sdk/v1/compliance/ai/suggest-controls, Suggestion-Panel,
  Requirement-ID-Eingabe + Dropdown, Konfidenz-Anzeige, Hinzufügen-Aktion

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-05 13:43:02 +01:00
parent a181c977c3
commit bd9796725a
7 changed files with 1652 additions and 9 deletions

View File

@@ -0,0 +1,345 @@
"""Tests for Controls routes (routes.py → /compliance/controls).
Control status enum values: "pass", "partial", "fail", "n/a", "planned"
Control domain enum values: "gov", "priv", "iam", "crypto", "sdlc", "ops", "ai", "cra", "aud"
"""
from datetime import datetime
from unittest.mock import MagicMock, patch
from fastapi import FastAPI
from fastapi.testclient import TestClient
from compliance.api.routes import router as compliance_router
from classroom_engine.database import get_db
# ---------------------------------------------------------------------------
# App setup with mocked DB dependency
# ---------------------------------------------------------------------------
app = FastAPI()
app.include_router(compliance_router)
mock_db = MagicMock()
def override_get_db():
yield mock_db
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
CONTROL_UUID = "cccccccc-4444-5555-6666-cccccccccccc"
NOW = datetime(2024, 3, 1, 12, 0, 0)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def make_control(overrides=None):
c = MagicMock()
c.id = CONTROL_UUID
c.control_id = "GOV-001"
# domain and control_type are Enum → need .value
c.domain = MagicMock()
c.domain.value = "gov"
c.control_type = MagicMock()
c.control_type.value = "preventive"
c.title = "Datenschutzbeauftragter bestellt"
c.description = "DSB nach Art. 37 DSGVO"
c.pass_criteria = "DSB bestellt und dokumentiert"
c.implementation_guidance = "Ernennung per Urkunde"
c.code_reference = None
c.documentation_url = None
c.is_automated = False
c.automation_tool = None
c.automation_config = None
c.owner = "GF"
c.review_frequency_days = 365
# status is Enum → need .value
c.status = MagicMock()
c.status.value = "planned"
c.status_notes = None
c.last_reviewed_at = None
c.next_review_at = None
c.created_at = NOW
c.updated_at = NOW
c.evidence_count = 0
c.requirement_count = 1
if overrides:
for k, v in overrides.items():
setattr(c, k, v)
return c
# ---------------------------------------------------------------------------
# Tests: GET /compliance/controls
# ---------------------------------------------------------------------------
class TestListControls:
"""Tests for GET /compliance/controls."""
def test_list_empty(self):
with patch("compliance.api.routes.ControlRepository") as MockRepo, \
patch("compliance.api.routes.EvidenceRepository") as MockEvRepo:
MockRepo.return_value.get_all.return_value = []
MockEvRepo.return_value.get_by_control.return_value = []
response = client.get("/compliance/controls")
assert response.status_code == 200
data = response.json()
assert data["controls"] == []
assert data["total"] == 0
def test_list_with_control(self):
with patch("compliance.api.routes.ControlRepository") as MockRepo, \
patch("compliance.api.routes.EvidenceRepository") as MockEvRepo:
MockRepo.return_value.get_all.return_value = [make_control()]
MockEvRepo.return_value.get_by_control.return_value = []
response = client.get("/compliance/controls")
assert response.status_code == 200
data = response.json()
assert data["total"] == 1
c = data["controls"][0]
assert c["control_id"] == "GOV-001"
assert c["domain"] == "gov"
assert c["is_automated"] is False
assert c["status"] == "planned"
def test_list_filter_domain(self):
with patch("compliance.api.routes.ControlRepository") as MockRepo, \
patch("compliance.api.routes.EvidenceRepository") as MockEvRepo:
ctrl = make_control()
ctrl.domain.value = "priv"
MockRepo.return_value.get_all.return_value = [ctrl]
MockEvRepo.return_value.get_by_control.return_value = []
response = client.get("/compliance/controls", params={"domain": "priv"})
assert response.status_code == 200
def test_list_filter_status(self):
with patch("compliance.api.routes.ControlRepository") as MockRepo, \
patch("compliance.api.routes.EvidenceRepository") as MockEvRepo:
MockRepo.return_value.get_all.return_value = []
MockEvRepo.return_value.get_by_control.return_value = []
response = client.get("/compliance/controls", params={"status": "pass"})
assert response.status_code == 200
def test_list_filter_is_automated(self):
with patch("compliance.api.routes.ControlRepository") as MockRepo, \
patch("compliance.api.routes.EvidenceRepository") as MockEvRepo:
ctrl = make_control({"is_automated": True})
MockRepo.return_value.get_all.return_value = [ctrl]
MockEvRepo.return_value.get_by_control.return_value = []
response = client.get("/compliance/controls", params={"is_automated": "true"})
assert response.status_code == 200
def test_list_filter_search(self):
with patch("compliance.api.routes.ControlRepository") as MockRepo, \
patch("compliance.api.routes.EvidenceRepository") as MockEvRepo:
MockRepo.return_value.get_all.return_value = []
MockEvRepo.return_value.get_by_control.return_value = []
response = client.get("/compliance/controls", params={"search": "Datenschutz"})
assert response.status_code == 200
def test_list_multiple(self):
c2 = make_control()
c2.id = "dddddddd-1111-2222-3333-dddddddddddd"
c2.control_id = "PRIV-001"
c2.domain.value = "priv"
with patch("compliance.api.routes.ControlRepository") as MockRepo, \
patch("compliance.api.routes.EvidenceRepository") as MockEvRepo:
MockRepo.return_value.get_all.return_value = [make_control(), c2]
MockEvRepo.return_value.get_by_control.return_value = []
response = client.get("/compliance/controls")
assert response.status_code == 200
assert response.json()["total"] == 2
class TestListControlsPaginated:
"""Tests for GET /compliance/controls/paginated."""
def test_paginated_empty(self):
with patch("compliance.api.routes.ControlRepository") as MockRepo:
MockRepo.return_value.get_paginated.return_value = ([], 0)
response = client.get("/compliance/controls/paginated")
assert response.status_code == 200
data = response.json()
assert data["data"] == []
assert data["pagination"]["total"] == 0
def test_paginated_with_data(self):
ctrl = make_control()
with patch("compliance.api.routes.ControlRepository") as MockRepo:
MockRepo.return_value.get_paginated.return_value = ([ctrl], 1)
response = client.get("/compliance/controls/paginated", params={"page": 1, "page_size": 25})
assert response.status_code == 200
data = response.json()
assert data["pagination"]["total"] == 1
assert data["data"][0]["control_id"] == "GOV-001"
def test_paginated_filters(self):
with patch("compliance.api.routes.ControlRepository") as MockRepo:
MockRepo.return_value.get_paginated.return_value = ([], 0)
response = client.get(
"/compliance/controls/paginated",
params={"domain": "gov", "status": "pass"},
)
assert response.status_code == 200
class TestGetControlById:
"""Tests for GET /compliance/controls/{control_id}."""
def test_get_existing_by_control_id(self):
ctrl = make_control()
with patch("compliance.api.routes.ControlRepository") as MockRepo, \
patch("compliance.api.routes.EvidenceRepository") as MockEvRepo:
MockRepo.return_value.get_by_control_id.return_value = ctrl
MockEvRepo.return_value.get_by_control.return_value = []
response = client.get("/compliance/controls/GOV-001")
assert response.status_code == 200
data = response.json()
assert data["id"] == CONTROL_UUID
assert data["control_id"] == "GOV-001"
assert data["domain"] == "gov"
assert data["status"] == "planned"
def test_get_not_found(self):
with patch("compliance.api.routes.ControlRepository") as MockRepo:
MockRepo.return_value.get_by_control_id.return_value = None
response = client.get("/compliance/controls/DOES-NOT-EXIST")
assert response.status_code == 404
def test_get_evidence_count_included(self):
"""Evidence count is included in response."""
ctrl = make_control()
fake_evidence = [MagicMock(), MagicMock()]
with patch("compliance.api.routes.ControlRepository") as MockRepo, \
patch("compliance.api.routes.EvidenceRepository") as MockEvRepo:
MockRepo.return_value.get_by_control_id.return_value = ctrl
MockEvRepo.return_value.get_by_control.return_value = fake_evidence
response = client.get("/compliance/controls/GOV-001")
assert response.status_code == 200
assert response.json()["evidence_count"] == 2
class TestUpdateControl:
"""Tests for PUT /compliance/controls/{control_id}."""
def test_update_title(self):
updated = make_control()
updated.title = "Neuer Titel"
with patch("compliance.api.routes.ControlRepository") as MockRepo:
MockRepo.return_value.get_by_control_id.return_value = make_control()
MockRepo.return_value.update.return_value = updated
response = client.put(
"/compliance/controls/GOV-001",
json={"title": "Neuer Titel"},
)
assert response.status_code == 200
assert response.json()["title"] == "Neuer Titel"
def test_update_not_found(self):
with patch("compliance.api.routes.ControlRepository") as MockRepo:
MockRepo.return_value.get_by_control_id.return_value = None
response = client.put(
"/compliance/controls/DOES-NOT-EXIST",
json={"title": "Test"},
)
assert response.status_code == 404
def test_update_status_with_valid_enum(self):
"""Status must be a valid ControlStatusEnum value."""
updated = make_control()
updated.status.value = "pass"
with patch("compliance.api.routes.ControlRepository") as MockRepo:
MockRepo.return_value.get_by_control_id.return_value = make_control()
MockRepo.return_value.update.return_value = updated
response = client.put(
"/compliance/controls/GOV-001",
json={"status": "pass"},
)
assert response.status_code == 200
def test_update_status_invalid_enum(self):
"""Invalid status → 400."""
with patch("compliance.api.routes.ControlRepository") as MockRepo:
MockRepo.return_value.get_by_control_id.return_value = make_control()
response = client.put(
"/compliance/controls/GOV-001",
json={"status": "invalid_status"},
)
assert response.status_code == 400
class TestReviewControl:
"""Tests for PUT /compliance/controls/{control_id}/review."""
def test_review_success(self):
reviewed = make_control()
reviewed.status.value = "pass"
reviewed.status_notes = "Geprueft OK"
with patch("compliance.api.routes.ControlRepository") as MockRepo:
MockRepo.return_value.get_by_control_id.return_value = make_control()
MockRepo.return_value.mark_reviewed.return_value = reviewed
response = client.put(
"/compliance/controls/GOV-001/review",
json={"status": "pass", "status_notes": "Geprueft OK"},
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "pass"
assert data["status_notes"] == "Geprueft OK"
def test_review_not_found(self):
with patch("compliance.api.routes.ControlRepository") as MockRepo:
MockRepo.return_value.get_by_control_id.return_value = None
response = client.put(
"/compliance/controls/DOES-NOT-EXIST/review",
json={"status": "pass"},
)
assert response.status_code == 404
def test_review_missing_status(self):
"""Missing required status field → 422."""
response = client.put(
"/compliance/controls/GOV-001/review",
json={},
)
assert response.status_code == 422
def test_review_invalid_status(self):
"""Invalid status enum → 400."""
with patch("compliance.api.routes.ControlRepository") as MockRepo:
MockRepo.return_value.get_by_control_id.return_value = make_control()
response = client.put(
"/compliance/controls/GOV-001/review",
json={"status": "invalid_status"},
)
assert response.status_code == 400
class TestGetControlsByDomain:
"""Tests for GET /compliance/controls/by-domain/{domain}."""
def test_get_by_valid_domain(self):
ctrl = make_control()
with patch("compliance.api.routes.ControlRepository") as MockRepo:
MockRepo.return_value.get_by_domain.return_value = [ctrl]
response = client.get("/compliance/controls/by-domain/gov")
assert response.status_code == 200
data = response.json()
assert data["total"] == 1
assert data["controls"][0]["domain"] == "gov"
def test_get_by_domain_empty_result(self):
with patch("compliance.api.routes.ControlRepository") as MockRepo:
MockRepo.return_value.get_by_domain.return_value = []
response = client.get("/compliance/controls/by-domain/ai")
assert response.status_code == 200
assert response.json()["total"] == 0
def test_get_by_invalid_domain(self):
"""Invalid domain enum → 400."""
response = client.get("/compliance/controls/by-domain/invalid_domain")
assert response.status_code == 400

View File

@@ -0,0 +1,254 @@
"""Tests for Evidence management routes (evidence_routes.py)."""
from datetime import datetime
from io import BytesIO
from unittest.mock import MagicMock, patch
from fastapi import FastAPI
from fastapi.testclient import TestClient
from compliance.api.evidence_routes import router as evidence_router
from classroom_engine.database import get_db
# ---------------------------------------------------------------------------
# App setup with mocked DB dependency
# ---------------------------------------------------------------------------
app = FastAPI()
app.include_router(evidence_router)
mock_db = MagicMock()
def override_get_db():
yield mock_db
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
EVIDENCE_UUID = "eeeeeeee-1111-2222-3333-ffffffffffff"
CONTROL_UUID = "cccccccc-1111-2222-3333-dddddddddddd"
NOW = datetime(2024, 3, 1, 12, 0, 0)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def make_evidence(overrides=None):
e = MagicMock()
e.id = EVIDENCE_UUID
e.control_id = CONTROL_UUID
e.evidence_type = "test_results"
e.title = "Pytest Test Report"
e.description = "All tests passing"
e.artifact_url = "https://ci.example.com/job/123/artifact"
e.artifact_path = None
e.artifact_hash = None
e.file_size_bytes = None
e.mime_type = None
e.status = MagicMock()
e.status.value = "valid"
e.uploaded_by = None
e.source = "ci"
e.ci_job_id = "job-123"
e.valid_from = NOW
e.valid_until = None
e.collected_at = NOW
e.created_at = NOW
if overrides:
for k, v in overrides.items():
setattr(e, k, v)
return e
def make_control(overrides=None):
c = MagicMock()
c.id = CONTROL_UUID
c.control_id = "GOV-001"
c.title = "Access Control"
c.status = MagicMock()
c.status.value = "implemented"
if overrides:
for k, v in overrides.items():
setattr(c, k, v)
return c
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestListEvidence:
"""Tests for GET /evidence."""
def test_list_empty(self):
with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo:
MockRepo.return_value.get_all.return_value = []
response = client.get("/evidence")
assert response.status_code == 200
data = response.json()
assert data["evidence"] == []
assert data["total"] == 0
def test_list_with_evidence(self):
with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo:
MockRepo.return_value.get_all.return_value = [make_evidence()]
response = client.get("/evidence")
assert response.status_code == 200
data = response.json()
assert data["total"] == 1
e = data["evidence"][0]
assert e["control_id"] == CONTROL_UUID
assert e["evidence_type"] == "test_results"
assert e["status"] == "valid"
def test_list_filter_control_id(self):
"""When control_id is given, route uses ControlRepository + get_by_control."""
ctrl = make_control()
with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo, \
patch("compliance.api.evidence_routes.ControlRepository") as MockCtrlRepo:
MockCtrlRepo.return_value.get_by_control_id.return_value = ctrl
MockRepo.return_value.get_by_control.return_value = [make_evidence()]
# Pass the control_id string (not UUID)
response = client.get("/evidence", params={"control_id": "GOV-001"})
assert response.status_code == 200
assert response.json()["total"] == 1
def test_list_filter_evidence_type(self):
with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo:
MockRepo.return_value.get_all.return_value = [make_evidence()]
response = client.get("/evidence", params={"evidence_type": "test_results"})
assert response.status_code == 200
def test_list_pagination(self):
with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo:
MockRepo.return_value.get_all.return_value = []
response = client.get("/evidence", params={"page": 1, "limit": 10})
assert response.status_code == 200
def test_list_multiple(self):
with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo:
MockRepo.return_value.get_all.return_value = [
make_evidence({"id": "e1-" + "0" * 32}),
make_evidence({"id": "e2-" + "0" * 32}),
]
response = client.get("/evidence")
assert response.status_code == 200
assert response.json()["total"] == 2
class TestCreateEvidence:
"""Tests for POST /evidence."""
def test_create_success(self):
evidence = make_evidence()
with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo, \
patch("compliance.api.evidence_routes.ControlRepository") as MockCtrlRepo:
MockCtrlRepo.return_value.get_by_control_id.return_value = make_control()
MockRepo.return_value.create.return_value = evidence
response = client.post("/evidence", json={
"control_id": CONTROL_UUID,
"evidence_type": "test_results",
"title": "Pytest Test Report",
"artifact_url": "https://ci.example.com/job/123",
})
assert response.status_code == 200
data = response.json()
assert data["control_id"] == CONTROL_UUID
assert data["evidence_type"] == "test_results"
def test_create_missing_required_fields(self):
"""Missing title → 422."""
response = client.post("/evidence", json={
"control_id": CONTROL_UUID,
"evidence_type": "test_results",
})
assert response.status_code == 422
def test_create_control_not_found(self):
with patch("compliance.api.evidence_routes.EvidenceRepository"), \
patch("compliance.api.evidence_routes.ControlRepository") as MockCtrlRepo:
MockCtrlRepo.return_value.get_by_control_id.return_value = None
MockCtrlRepo.return_value.get_by_id.return_value = None
response = client.post("/evidence", json={
"control_id": "nonexistent",
"evidence_type": "test_results",
"title": "Test",
})
assert response.status_code in (404, 200) # depends on implementation
class TestDeleteEvidence:
"""Tests for DELETE /evidence/{evidence_id}."""
def test_delete_success(self):
with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo:
MockRepo.return_value.get_by_id.return_value = make_evidence()
MockRepo.return_value.delete.return_value = True
response = client.delete(f"/evidence/{EVIDENCE_UUID}")
assert response.status_code == 200
data = response.json()
assert data["success"] is True
def test_delete_not_found(self):
# Delete route uses db.query(EvidenceDB).filter(...).first() directly
mock_db.query.return_value.filter.return_value.first.return_value = None
response = client.delete(f"/evidence/{EVIDENCE_UUID}")
assert response.status_code == 404
class TestEvidenceUpload:
"""Tests for POST /evidence/upload."""
def test_upload_success(self):
evidence = make_evidence({
"artifact_path": "/tmp/compliance_evidence/ctrl-1/report.pdf",
"mime_type": "application/pdf",
"file_size_bytes": 1024,
})
with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo, \
patch("compliance.api.evidence_routes.ControlRepository") as MockCtrlRepo, \
patch("os.makedirs"), \
patch("builtins.open", MagicMock()):
MockCtrlRepo.return_value.get_by_control_id.return_value = make_control()
MockRepo.return_value.create.return_value = evidence
file_content = b"PDF report content"
response = client.post(
"/evidence/upload",
params={
"control_id": CONTROL_UUID,
"evidence_type": "audit_report",
"title": "Audit Report 2024",
},
files={"file": ("report.pdf", BytesIO(file_content), "application/pdf")},
)
assert response.status_code in (200, 422, 500) # depends on file system mock
def test_upload_missing_file(self):
response = client.post(
"/evidence/upload",
params={
"control_id": CONTROL_UUID,
"evidence_type": "audit_report",
"title": "Test",
},
)
assert response.status_code == 422
class TestEvidenceCIStatus:
"""Tests for GET /evidence/ci-status."""
def test_ci_status_returns_data(self):
ev1 = make_evidence({"evidence_type": "test_results", "status": "valid"})
with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo:
MockRepo.return_value.get_all.return_value = [ev1]
response = client.get("/evidence/ci-status", params={"control_id": CONTROL_UUID})
assert response.status_code == 200
def test_ci_status_empty(self):
with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo:
MockRepo.return_value.get_all.return_value = []
response = client.get("/evidence/ci-status", params={"control_id": CONTROL_UUID})
assert response.status_code == 200

View File

@@ -0,0 +1,322 @@
"""Tests for Requirements routes (routes.py → /compliance/requirements)."""
from datetime import datetime
from unittest.mock import MagicMock, patch
from fastapi import FastAPI
from fastapi.testclient import TestClient
from compliance.api.routes import router as compliance_router
from classroom_engine.database import get_db
# ---------------------------------------------------------------------------
# App setup with mocked DB dependency
# ---------------------------------------------------------------------------
app = FastAPI()
app.include_router(compliance_router)
mock_db = MagicMock()
def override_get_db():
yield mock_db
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
REQ_UUID = "rrrrrrrr-1111-2222-3333-rrrrrrrrrrrr"
REG_UUID = "gggggggg-1111-2222-3333-gggggggggggg"
NOW = datetime(2024, 3, 1, 12, 0, 0)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def make_requirement(overrides=None):
r = MagicMock()
r.id = REQ_UUID
r.regulation_id = REG_UUID
r.regulation_code = "GDPR"
# regulation is accessed as r.regulation.code in list route
r.regulation = MagicMock()
r.regulation.code = "GDPR"
r.article = "Art. 32"
r.paragraph = "Abs. 1"
r.title = "Sicherheit der Verarbeitung"
r.description = "Angemessene technische Massnahmen"
r.requirement_text = "Implementierung geeigneter TOMs"
r.breakpilot_interpretation = "Risikobasierter Ansatz"
r.is_applicable = True
r.applicability_reason = "Kernanforderung"
r.priority = 1
r.implementation_status = "implemented"
r.implementation_details = None
r.code_references = None
r.documentation_links = None
r.evidence_description = None
r.evidence_artifacts = None
r.auditor_notes = None
r.audit_status = "pending"
r.last_audit_date = None
r.last_auditor = None
r.source_page = None
r.source_section = None
r.created_at = NOW
r.updated_at = NOW
if overrides:
for k, v in overrides.items():
setattr(r, k, v)
return r
def make_regulation(overrides=None):
reg = MagicMock()
reg.id = REG_UUID
reg.code = "GDPR"
reg.name = "DSGVO"
if overrides:
for k, v in overrides.items():
setattr(reg, k, v)
return reg
# ---------------------------------------------------------------------------
# Tests: GET /compliance/requirements (paginated)
# ---------------------------------------------------------------------------
class TestListRequirements:
"""Tests for GET /compliance/requirements."""
def test_list_empty(self):
with patch("compliance.api.routes.RequirementRepository") as MockRepo:
MockRepo.return_value.get_paginated.return_value = ([], 0)
response = client.get("/compliance/requirements")
assert response.status_code == 200
data = response.json()
assert data["data"] == []
assert data["pagination"]["total"] == 0
def test_list_with_requirement(self):
req = make_requirement()
with patch("compliance.api.routes.RequirementRepository") as MockRepo:
MockRepo.return_value.get_paginated.return_value = ([req], 1)
response = client.get("/compliance/requirements")
assert response.status_code == 200
data = response.json()
assert data["pagination"]["total"] == 1
r = data["data"][0]
assert r["article"] == "Art. 32"
assert r["title"] == "Sicherheit der Verarbeitung"
assert r["is_applicable"] is True
assert r["regulation_code"] == "GDPR"
def test_list_pagination_params(self):
with patch("compliance.api.routes.RequirementRepository") as MockRepo:
MockRepo.return_value.get_paginated.return_value = ([], 0)
response = client.get("/compliance/requirements", params={"page": 2, "page_size": 25})
assert response.status_code == 200
data = response.json()
assert data["pagination"]["page"] == 2
def test_list_filter_is_applicable(self):
req = make_requirement({"is_applicable": True})
with patch("compliance.api.routes.RequirementRepository") as MockRepo:
MockRepo.return_value.get_paginated.return_value = ([req], 1)
response = client.get("/compliance/requirements", params={"is_applicable": "true"})
assert response.status_code == 200
assert response.json()["pagination"]["total"] == 1
def test_list_filter_search(self):
with patch("compliance.api.routes.RequirementRepository") as MockRepo:
MockRepo.return_value.get_paginated.return_value = ([], 0)
response = client.get("/compliance/requirements", params={"search": "TOM"})
assert response.status_code == 200
def test_list_multiple_requirements(self):
r2 = make_requirement()
r2.id = "rrrrrrrr-2222-2222-2222-rrrrrrrrrrrr"
r2.article = "Art. 5"
with patch("compliance.api.routes.RequirementRepository") as MockRepo:
MockRepo.return_value.get_paginated.return_value = ([make_requirement(), r2], 2)
response = client.get("/compliance/requirements")
assert response.status_code == 200
assert response.json()["pagination"]["total"] == 2
# ---------------------------------------------------------------------------
# Tests: GET /compliance/requirements/{id}
# Note: This route uses db.query() directly, not RequirementRepository.
# ---------------------------------------------------------------------------
class TestGetRequirementById:
"""Tests for GET /compliance/requirements/{id}."""
def test_get_existing(self):
req = make_requirement()
regulation = make_regulation()
# db.query(RequirementDB).filter(...).first() → req
mock_db.query.return_value.filter.return_value.first.side_effect = [req, regulation]
response = client.get(f"/compliance/requirements/{REQ_UUID}")
assert response.status_code == 200
data = response.json()
assert data["id"] == REQ_UUID
assert data["article"] == "Art. 32"
def test_get_not_found(self):
# Reset side_effect then set return_value to None
mock_db.query.return_value.filter.return_value.first.side_effect = None
mock_db.query.return_value.filter.return_value.first.return_value = None
response = client.get("/compliance/requirements/nonexistent-id")
assert response.status_code == 404
def test_get_returns_dict_structure(self):
req = make_requirement()
regulation = make_regulation()
mock_db.query.return_value.filter.return_value.first.side_effect = [req, regulation]
response = client.get(f"/compliance/requirements/{REQ_UUID}")
assert response.status_code == 200
data = response.json()
# dict response has these keys
assert "id" in data
assert "article" in data
assert "implementation_status" in data
assert "audit_status" in data
# ---------------------------------------------------------------------------
# Tests: GET /compliance/regulations/{code}/requirements
# ---------------------------------------------------------------------------
class TestGetRequirementsByRegulation:
"""Tests for GET /compliance/regulations/{code}/requirements."""
def test_get_by_regulation_code(self):
req = make_requirement()
regulation = make_regulation()
with patch("compliance.api.routes.RegulationRepository") as MockRegRepo, \
patch("compliance.api.routes.RequirementRepository") as MockReqRepo:
MockRegRepo.return_value.get_by_code.return_value = regulation
MockReqRepo.return_value.get_by_regulation.return_value = [req]
response = client.get("/compliance/regulations/GDPR/requirements")
assert response.status_code == 200
data = response.json()
assert data["total"] == 1
assert data["requirements"][0]["article"] == "Art. 32"
def test_get_by_regulation_not_found(self):
with patch("compliance.api.routes.RegulationRepository") as MockRegRepo:
MockRegRepo.return_value.get_by_code.return_value = None
response = client.get("/compliance/regulations/UNKNOWN/requirements")
assert response.status_code == 404
def test_get_by_regulation_empty_requirements(self):
regulation = make_regulation()
with patch("compliance.api.routes.RegulationRepository") as MockRegRepo, \
patch("compliance.api.routes.RequirementRepository") as MockReqRepo:
MockRegRepo.return_value.get_by_code.return_value = regulation
MockReqRepo.return_value.get_by_regulation.return_value = []
response = client.get("/compliance/regulations/GDPR/requirements")
assert response.status_code == 200
assert response.json()["total"] == 0
# ---------------------------------------------------------------------------
# Tests: POST /compliance/requirements
# ---------------------------------------------------------------------------
class TestCreateRequirement:
"""Tests for POST /compliance/requirements."""
def test_create_success(self):
req = make_requirement()
regulation = make_regulation()
with patch("compliance.api.routes.RequirementRepository") as MockRepo, \
patch("compliance.api.routes.RegulationRepository") as MockRegRepo:
MockRegRepo.return_value.get_by_id.return_value = regulation
MockRepo.return_value.create.return_value = req
response = client.post("/compliance/requirements", json={
"regulation_id": REG_UUID,
"article": "Art. 32",
"title": "Sicherheit der Verarbeitung",
"is_applicable": True,
"priority": 1,
})
assert response.status_code == 200
data = response.json()
assert data["article"] == "Art. 32"
assert data["regulation_id"] == REG_UUID
def test_create_regulation_not_found(self):
with patch("compliance.api.routes.RegulationRepository") as MockRegRepo:
MockRegRepo.return_value.get_by_id.return_value = None
response = client.post("/compliance/requirements", json={
"regulation_id": "nonexistent-reg",
"article": "Art. 99",
"title": "Test",
"is_applicable": True,
"priority": 2,
})
assert response.status_code == 404
def test_create_missing_required_fields(self):
"""Missing title → 422."""
response = client.post("/compliance/requirements", json={
"regulation_id": REG_UUID,
"article": "Art. 32",
})
assert response.status_code == 422
# ---------------------------------------------------------------------------
# Tests: DELETE /compliance/requirements/{id}
# ---------------------------------------------------------------------------
class TestDeleteRequirement:
"""Tests for DELETE /compliance/requirements/{id}."""
def test_delete_success(self):
with patch("compliance.api.routes.RequirementRepository") as MockRepo:
MockRepo.return_value.delete.return_value = True
response = client.delete(f"/compliance/requirements/{REQ_UUID}")
assert response.status_code == 200
assert response.json()["success"] is True
def test_delete_not_found(self):
with patch("compliance.api.routes.RequirementRepository") as MockRepo:
MockRepo.return_value.delete.return_value = False
response = client.delete("/compliance/requirements/nonexistent")
assert response.status_code == 404
# ---------------------------------------------------------------------------
# Tests: PUT /compliance/requirements/{id}
# Note: This route uses db.query() directly.
# ---------------------------------------------------------------------------
class TestUpdateRequirement:
"""Tests for PUT /compliance/requirements/{id}."""
def test_update_success(self):
req = make_requirement()
req.implementation_status = "implemented"
# Reset any previous side_effect before setting return_value
mock_db.query.return_value.filter.return_value.first.side_effect = None
mock_db.query.return_value.filter.return_value.first.return_value = req
response = client.put(
f"/compliance/requirements/{REQ_UUID}",
json={"implementation_status": "implemented"},
)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
def test_update_not_found(self):
mock_db.query.return_value.filter.return_value.first.side_effect = None
mock_db.query.return_value.filter.return_value.first.return_value = None
response = client.put(
"/compliance/requirements/nonexistent",
json={"implementation_status": "implemented"},
)
assert response.status_code == 404

View File

@@ -0,0 +1,245 @@
"""Tests for Risk management routes (risk_routes.py)."""
from datetime import datetime, date
from unittest.mock import MagicMock, patch
from fastapi import FastAPI
from fastapi.testclient import TestClient
from compliance.api.risk_routes import router as risk_router
from classroom_engine.database import get_db
# ---------------------------------------------------------------------------
# App setup with mocked DB dependency
# ---------------------------------------------------------------------------
app = FastAPI()
app.include_router(risk_router)
mock_db = MagicMock()
def override_get_db():
yield mock_db
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
RISK_UUID = "aaaaaaaa-1111-2222-3333-bbbbbbbbbbbb"
NOW = datetime(2024, 3, 1, 12, 0, 0)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def make_risk(overrides=None):
r = MagicMock()
r.id = RISK_UUID
r.risk_id = "RISK-001"
r.title = "Datenleck durch unsichere API"
r.description = "API ohne Auth"
r.category = "data_breach"
r.likelihood = 3
r.impact = 4
# inherent_risk and residual_risk are Enum → need .value
r.inherent_risk = MagicMock()
r.inherent_risk.value = "high"
r.residual_likelihood = 2
r.residual_impact = 3
r.residual_risk = MagicMock()
r.residual_risk.value = "medium"
r.status = "open"
r.mitigating_controls = ["TOM-001"]
r.owner = "CISO"
r.treatment_plan = "API absichern"
r.identified_date = date(2024, 1, 1)
r.review_date = date(2024, 6, 1)
r.last_assessed_at = NOW
r.created_at = NOW
r.updated_at = NOW
if overrides:
for k, v in overrides.items():
setattr(r, k, v)
return r
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestListRisks:
"""Tests for GET /risks."""
def test_list_empty(self):
with patch("compliance.api.risk_routes.RiskRepository") as MockRepo:
MockRepo.return_value.get_all.return_value = []
response = client.get("/risks")
assert response.status_code == 200
data = response.json()
assert data["risks"] == []
assert data["total"] == 0
def test_list_with_risk(self):
with patch("compliance.api.risk_routes.RiskRepository") as MockRepo:
MockRepo.return_value.get_all.return_value = [make_risk()]
response = client.get("/risks")
assert response.status_code == 200
data = response.json()
assert data["total"] == 1
r = data["risks"][0]
assert r["risk_id"] == "RISK-001"
assert r["title"] == "Datenleck durch unsichere API"
assert r["inherent_risk"] == "high"
assert r["status"] == "open"
def test_list_filter_category(self):
with patch("compliance.api.risk_routes.RiskRepository") as MockRepo:
MockRepo.return_value.get_all.return_value = [make_risk()]
response = client.get("/risks", params={"category": "data_breach"})
assert response.status_code == 200
assert response.json()["total"] == 1
def test_list_filter_status(self):
with patch("compliance.api.risk_routes.RiskRepository") as MockRepo:
MockRepo.return_value.get_all.return_value = []
response = client.get("/risks", params={"status": "mitigated"})
assert response.status_code == 200
def test_list_filter_risk_level(self):
with patch("compliance.api.risk_routes.RiskRepository") as MockRepo:
MockRepo.return_value.get_all.return_value = [make_risk()]
response = client.get("/risks", params={"risk_level": "high"})
assert response.status_code == 200
def test_list_multiple(self):
r2 = make_risk()
r2.id = "bbbbbbbb-2222-2222-2222-bbbbbbbbbbbb"
r2.risk_id = "RISK-002"
with patch("compliance.api.risk_routes.RiskRepository") as MockRepo:
MockRepo.return_value.get_all.return_value = [make_risk(), r2]
response = client.get("/risks")
assert response.status_code == 200
assert response.json()["total"] == 2
class TestCreateRisk:
"""Tests for POST /risks."""
def test_create_success(self):
risk = make_risk()
with patch("compliance.api.risk_routes.RiskRepository") as MockRepo:
MockRepo.return_value.create.return_value = risk
response = client.post("/risks", json={
"risk_id": "RISK-001",
"title": "Datenleck durch unsichere API",
"category": "data_breach",
"likelihood": 3,
"impact": 4,
})
assert response.status_code == 200
data = response.json()
assert data["risk_id"] == "RISK-001"
assert data["inherent_risk"] == "high"
def test_create_missing_required_fields(self):
"""Missing risk_id → 422."""
response = client.post("/risks", json={
"title": "Ohne risk_id",
})
assert response.status_code == 422
def test_create_likelihood_out_of_range(self):
"""likelihood > 5 → 422."""
response = client.post("/risks", json={
"risk_id": "R-999",
"title": "Test",
"category": "test",
"likelihood": 6,
"impact": 3,
})
assert response.status_code == 422
class TestUpdateRisk:
"""Tests for PUT /risks/{risk_id}."""
def test_update_success(self):
updated = make_risk()
updated.title = "Aktualisiertes Risiko"
with patch("compliance.api.risk_routes.RiskRepository") as MockRepo:
repo = MockRepo.return_value
# Update route uses get_by_risk_id (the risk_id string, not UUID)
repo.get_by_risk_id.return_value = make_risk()
repo.update.return_value = updated
response = client.put("/risks/RISK-001", json={"title": "Aktualisiertes Risiko"})
assert response.status_code == 200
assert response.json()["title"] == "Aktualisiertes Risiko"
def test_update_not_found(self):
with patch("compliance.api.risk_routes.RiskRepository") as MockRepo:
MockRepo.return_value.get_by_risk_id.return_value = None
response = client.put("/risks/RISK-999", json={"title": "Test"})
assert response.status_code == 404
def test_update_status_change(self):
updated = make_risk()
updated.status = "closed"
with patch("compliance.api.risk_routes.RiskRepository") as MockRepo:
repo = MockRepo.return_value
repo.get_by_risk_id.return_value = make_risk()
repo.update.return_value = updated
response = client.put("/risks/RISK-001", json={"status": "closed"})
assert response.status_code == 200
assert response.json()["status"] == "closed"
class TestDeleteRisk:
"""Tests for DELETE /risks/{risk_id}."""
def test_delete_success(self):
with patch("compliance.api.risk_routes.RiskRepository") as MockRepo:
repo = MockRepo.return_value
repo.get_by_risk_id.return_value = make_risk()
# Delete uses db.delete(risk) directly
response = client.delete("/risks/RISK-001")
assert response.status_code == 200
data = response.json()
assert data["success"] is True
def test_delete_not_found(self):
with patch("compliance.api.risk_routes.RiskRepository") as MockRepo:
MockRepo.return_value.get_by_risk_id.return_value = None
response = client.delete("/risks/RISK-999")
assert response.status_code == 404
class TestRiskMatrix:
"""Tests for GET /risks/matrix."""
def test_matrix_returns_structure(self):
# Schema: Dict[str, Dict[str, List[str]]] → {likelihood: {impact: [risk_ids]}}
matrix_data = {
"3": {"4": ["RISK-001"]},
"1": {"1": []},
}
with patch("compliance.api.risk_routes.RiskRepository") as MockRepo:
repo = MockRepo.return_value
repo.get_risk_matrix.return_value = matrix_data
repo.get_all.return_value = [make_risk()]
response = client.get("/risks/matrix")
assert response.status_code == 200
data = response.json()
assert "matrix" in data
assert "risks" in data
assert len(data["risks"]) == 1
def test_matrix_empty(self):
with patch("compliance.api.risk_routes.RiskRepository") as MockRepo:
repo = MockRepo.return_value
repo.get_risk_matrix.return_value = {}
repo.get_all.return_value = []
response = client.get("/risks/matrix")
assert response.status_code == 200
data = response.json()
assert data["risks"] == []