fix(quality): Ruff/CVE/TS-Fixes, 104 neue Tests, Complexity-Refactoring
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Failing after 30s
CI / test-python-backend-compliance (push) Successful in 30s
CI / test-python-document-crawler (push) Successful in 21s
CI / test-python-dsms-gateway (push) Successful in 17s

- Ruff: 144 auto-fixes (unused imports, == None → is None), F821/F811/F841 manuell
- CVEs: python-multipart>=0.0.22, weasyprint>=68.0, pillow>=12.1.1, npm audit fix (0 vulns)
- TS: 5 tote Drafting-Engine-Dateien entfernt, allowed-facts/sanitizer/StepHeader/context fixes
- Tests: +104 (ISMS 58, Evidence 18, VVT 14, Generation 14) → 1449 passed
- Refactoring: collect_ci_evidence (F→A), row_to_response (E→A), extract_requirements (E→A)
- Dead Code: pca-platform, 7 Go-Handler, dsr_api.py, duplicate Schemas entfernt

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-07 19:00:33 +01:00
parent 6509e64dd9
commit 95fcba34cd
124 changed files with 2533 additions and 15709 deletions

View File

@@ -50,7 +50,7 @@ class TestRowToResponse:
"""Tests for DB row to response conversion."""
def _make_row(self, **overrides):
"""Create a mock DB row with 30 fields."""
"""Create a mock DB row with 40 fields (matching row_to_response indices)."""
defaults = [
"uuid-123", # 0: id
"default", # 1: tenant_id
@@ -82,6 +82,17 @@ class TestRowToResponse:
"2026-01-01", # 27: completed_at
"2026-01-01", # 28: created_at
"2026-01-01", # 29: updated_at
# Phase 2 fields (indices 30-39)
[], # 30: repos
[], # 31: document_sources
[], # 32: processing_systems
[], # 33: ai_systems
[], # 34: technical_contacts
False, # 35: subject_to_nis2
False, # 36: subject_to_ai_act
False, # 37: subject_to_iso27001
None, # 38: supervisory_authority
12, # 39: review_cycle_months
]
return tuple(defaults)

View File

@@ -429,7 +429,7 @@ class TestGetTenantId:
assert _get_tenant_id("my-tenant") == "my-tenant"
def test_default_constant_value(self):
assert DEFAULT_TENANT_ID == "default"
assert DEFAULT_TENANT_ID == "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
# =============================================================================

View File

@@ -252,3 +252,268 @@ class TestEvidenceCIStatus:
MockRepo.return_value.get_all.return_value = []
response = client.get("/evidence/ci-status", params={"control_id": CONTROL_UUID})
assert response.status_code == 200
def test_ci_status_without_control_id(self):
"""GET /evidence/ci-status without control_id returns all CI evidence."""
mock_query = MagicMock()
mock_query.filter.return_value = mock_query
mock_query.order_by.return_value = mock_query
mock_query.limit.return_value = mock_query
mock_query.all.return_value = []
mock_db.query.return_value = mock_query
response = client.get("/evidence/ci-status")
assert response.status_code == 200
data = response.json()
assert data["period_days"] == 30
assert data["total_evidence"] == 0
assert data["controls"] == []
def test_ci_status_custom_days_param(self):
"""GET /evidence/ci-status with custom days lookback."""
mock_query = MagicMock()
mock_query.filter.return_value = mock_query
mock_query.order_by.return_value = mock_query
mock_query.limit.return_value = mock_query
mock_query.all.return_value = []
mock_db.query.return_value = mock_query
response = client.get("/evidence/ci-status", params={"days": 7})
assert response.status_code == 200
data = response.json()
assert data["period_days"] == 7
class TestCollectCIEvidence:
"""Tests for POST /evidence/collect."""
def test_collect_sast_evidence_success(self):
"""Collect SAST evidence with Semgrep-format report data."""
ctrl = make_control({"control_id": "SDLC-001"})
evidence = make_evidence({
"evidence_type": "ci_sast",
"source": "ci_pipeline",
"ci_job_id": "job-456",
})
with patch("compliance.api.evidence_routes.ControlRepository") as MockCtrlRepo, \
patch("compliance.api.evidence_routes._store_evidence", return_value=evidence), \
patch("compliance.api.evidence_routes._update_risks", return_value=None):
MockCtrlRepo.return_value.get_by_control_id.return_value = ctrl
response = client.post(
"/evidence/collect",
params={"source": "sast", "ci_job_id": "job-456"},
json={"results": [
{"check_id": "python.lang.security", "extra": {"severity": "MEDIUM"}},
]},
)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert data["source"] == "sast"
assert data["control_id"] == "SDLC-001"
def test_collect_unknown_source_returns_400(self):
"""Unknown source should return 400."""
response = client.post(
"/evidence/collect",
params={"source": "unknown_tool"},
json={},
)
assert response.status_code == 400
assert "Unknown source" in response.json()["detail"]
def test_collect_control_not_found_returns_404(self):
"""If the mapped control does not exist in DB, return 404."""
with patch("compliance.api.evidence_routes.ControlRepository") as MockCtrlRepo:
MockCtrlRepo.return_value.get_by_control_id.return_value = None
response = client.post(
"/evidence/collect",
params={"source": "sast"},
json={"results": []},
)
assert response.status_code == 404
assert "SDLC-001" in response.json()["detail"]
def test_collect_with_null_report_data(self):
"""Collect with no report data body (None)."""
ctrl = make_control({"control_id": "SDLC-002"})
evidence = make_evidence({
"evidence_type": "ci_dependency_scan",
"source": "ci_pipeline",
})
with patch("compliance.api.evidence_routes.ControlRepository") as MockCtrlRepo, \
patch("compliance.api.evidence_routes._store_evidence", return_value=evidence), \
patch("compliance.api.evidence_routes._update_risks", return_value=None):
MockCtrlRepo.return_value.get_by_control_id.return_value = ctrl
response = client.post(
"/evidence/collect",
params={"source": "dependency_scan"},
)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
def test_collect_sbom_source(self):
"""Collect SBOM evidence with components list."""
ctrl = make_control({"control_id": "SDLC-005"})
evidence = make_evidence({
"evidence_type": "ci_sbom",
"source": "ci_pipeline",
})
with patch("compliance.api.evidence_routes.ControlRepository") as MockCtrlRepo, \
patch("compliance.api.evidence_routes._store_evidence", return_value=evidence), \
patch("compliance.api.evidence_routes._update_risks", return_value=None):
MockCtrlRepo.return_value.get_by_control_id.return_value = ctrl
response = client.post(
"/evidence/collect",
params={"source": "sbom"},
json={"components": [
{"name": "fastapi", "version": "0.100.0"},
{"name": "pydantic", "version": "2.0.0"},
]},
)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert data["source"] == "sbom"
class TestParseCIEvidence:
"""Unit tests for _parse_ci_evidence helper."""
def test_parse_empty_data(self):
from compliance.api.evidence_routes import _parse_ci_evidence
result = _parse_ci_evidence({})
assert result["findings_count"] == 0
assert result["critical_findings"] == 0
assert result["evidence_status"] == "valid"
def test_parse_none_data(self):
from compliance.api.evidence_routes import _parse_ci_evidence
result = _parse_ci_evidence(None)
assert result["evidence_status"] == "valid"
assert result["report_json"] == "{}"
def test_parse_semgrep_with_critical(self):
"""Semgrep results with CRITICAL severity → status=failed."""
from compliance.api.evidence_routes import _parse_ci_evidence
data = {
"results": [
{"check_id": "sql-injection", "extra": {"severity": "CRITICAL"}},
{"check_id": "xss", "extra": {"severity": "MEDIUM"}},
]
}
result = _parse_ci_evidence(data)
assert result["findings_count"] == 2
assert result["critical_findings"] == 1
assert result["evidence_status"] == "failed"
def test_parse_trivy_format(self):
"""Trivy Results format with Vulnerabilities."""
from compliance.api.evidence_routes import _parse_ci_evidence
data = {
"Results": [
{
"Target": "python:3.11",
"Vulnerabilities": [
{"VulnerabilityID": "CVE-2024-001", "Severity": "HIGH"},
{"VulnerabilityID": "CVE-2024-002", "Severity": "LOW"},
],
}
]
}
result = _parse_ci_evidence(data)
assert result["findings_count"] == 2
assert result["critical_findings"] == 1
assert result["evidence_status"] == "failed"
def test_parse_generic_findings(self):
"""Generic findings array format."""
from compliance.api.evidence_routes import _parse_ci_evidence
data = {"findings": [{"id": "f1"}, {"id": "f2"}, {"id": "f3"}]}
result = _parse_ci_evidence(data)
assert result["findings_count"] == 3
assert result["critical_findings"] == 0
assert result["evidence_status"] == "valid"
def test_parse_sbom_components(self):
"""SBOM components → findings_count = number of components."""
from compliance.api.evidence_routes import _parse_ci_evidence
data = {"components": [{"name": "a"}, {"name": "b"}]}
result = _parse_ci_evidence(data)
assert result["findings_count"] == 2
assert result["evidence_status"] == "valid"
class TestExtractFindingsDetail:
"""Unit tests for _extract_findings_detail helper."""
def test_extract_empty(self):
from compliance.api.evidence_routes import _extract_findings_detail
result = _extract_findings_detail({})
assert result == {"critical": 0, "high": 0, "medium": 0, "low": 0}
def test_extract_none(self):
from compliance.api.evidence_routes import _extract_findings_detail
result = _extract_findings_detail(None)
assert result == {"critical": 0, "high": 0, "medium": 0, "low": 0}
def test_extract_semgrep_severities(self):
from compliance.api.evidence_routes import _extract_findings_detail
data = {
"results": [
{"extra": {"severity": "CRITICAL"}},
{"extra": {"severity": "HIGH"}},
{"extra": {"severity": "MEDIUM"}},
{"extra": {"severity": "LOW"}},
{"extra": {"severity": "INFO"}},
]
}
result = _extract_findings_detail(data)
assert result["critical"] == 1
assert result["high"] == 1
assert result["medium"] == 1
assert result["low"] == 2 # LOW + INFO both count as low
class TestListEvidenceEdgeCases:
"""Additional edge-case tests for GET /evidence."""
def test_list_filter_by_status(self):
"""Filter by status parameter."""
ev_valid = make_evidence({"status": MagicMock(value="valid")})
ev_failed = make_evidence({"status": MagicMock(value="failed")})
with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo:
MockRepo.return_value.get_all.return_value = [ev_valid, ev_failed]
response = client.get("/evidence", params={"status": "valid"})
assert response.status_code == 200
# The route filters in-memory by status enum
data = response.json()
# At least it returns without error (status enum matching may differ with mocks)
assert "evidence" in data
def test_list_filter_invalid_status(self):
"""Invalid status value should be ignored (no crash)."""
with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo:
MockRepo.return_value.get_all.return_value = [make_evidence()]
response = client.get("/evidence", params={"status": "nonexistent_status"})
assert response.status_code == 200
# Invalid status is silently ignored per the try/except ValueError in the route
assert response.json()["total"] == 1
def test_list_control_not_found(self):
"""GET /evidence with nonexistent control_id returns 404."""
with patch("compliance.api.evidence_routes.EvidenceRepository"), \
patch("compliance.api.evidence_routes.ControlRepository") as MockCtrlRepo:
MockCtrlRepo.return_value.get_by_control_id.return_value = None
response = client.get("/evidence", params={"control_id": "NONEXISTENT-001"})
assert response.status_code == 404
def test_list_pagination_slices_correctly(self):
"""Pagination returns correct slice while total reflects full count."""
items = [make_evidence({"id": f"e{i}-" + "0" * 32}) for i in range(5)]
with patch("compliance.api.evidence_routes.EvidenceRepository") as MockRepo:
MockRepo.return_value.get_all.return_value = items
response = client.get("/evidence", params={"page": 2, "limit": 2})
assert response.status_code == 200
data = response.json()
assert data["total"] == 5
assert len(data["evidence"]) == 2

View File

@@ -231,3 +231,161 @@ class TestGenerationRouteRegistration:
paths = [r.path for r in router.routes]
assert any("preview" in p for p in paths)
assert any("apply" in p for p in paths)
# =============================================================================
# _generate_for_type dispatcher
# =============================================================================
class TestGenerateForType:
"""Tests for the _generate_for_type dispatcher function."""
def test_dsfa_returns_single_item_list(self):
from compliance.api.generation_routes import _generate_for_type
ctx = _make_ctx()
result = _generate_for_type("dsfa", ctx)
assert isinstance(result, list)
assert len(result) == 1
assert "DSFA" in result[0]["title"]
def test_vvt_dispatches_correctly(self):
from compliance.api.generation_routes import _generate_for_type
ctx = _make_ctx(processing_systems=[
{"name": "HR System", "vendor": "SAP", "hosting": "cloud", "personal_data_categories": ["Mitarbeiter"]},
])
result = _generate_for_type("vvt", ctx)
assert isinstance(result, list)
assert len(result) == 1
assert "HR System" in result[0]["name"]
def test_tom_dispatches_correctly(self):
from compliance.api.generation_routes import _generate_for_type
ctx = _make_ctx()
result = _generate_for_type("tom", ctx)
assert isinstance(result, list)
assert len(result) == 8 # Base TOMs
def test_loeschfristen_dispatches_correctly(self):
from compliance.api.generation_routes import _generate_for_type
ctx = _make_ctx(processing_systems=[
{"name": "Payroll", "personal_data_categories": ["Bankdaten"]},
])
result = _generate_for_type("loeschfristen", ctx)
assert isinstance(result, list)
assert len(result) == 1
def test_obligation_dispatches_correctly(self):
from compliance.api.generation_routes import _generate_for_type
ctx = _make_ctx()
result = _generate_for_type("obligation", ctx)
assert isinstance(result, list)
assert len(result) == 8 # Base DSGVO obligations
def test_invalid_doc_type_raises_value_error(self):
from compliance.api.generation_routes import _generate_for_type
ctx = _make_ctx()
with pytest.raises(ValueError, match="Unknown doc_type"):
_generate_for_type("nonexistent", ctx)
# =============================================================================
# VALID_DOC_TYPES validation
# =============================================================================
class TestValidDocTypes:
"""Tests for doc_type validation constants."""
def test_valid_doc_types_contains_all_expected(self):
from compliance.api.generation_routes import VALID_DOC_TYPES
expected = {"dsfa", "vvt", "tom", "loeschfristen", "obligation"}
assert VALID_DOC_TYPES == expected
def test_invalid_types_not_accepted(self):
from compliance.api.generation_routes import VALID_DOC_TYPES
invalid_types = ["dsgvo", "audit", "risk", "consent", "privacy", ""]
for t in invalid_types:
assert t not in VALID_DOC_TYPES, f"{t} should not be in VALID_DOC_TYPES"
# =============================================================================
# Template Context edge cases
# =============================================================================
class TestTemplateContextEdgeCases:
"""Tests for template context building and edge cases."""
def test_empty_company_name_still_generates(self):
"""Templates should work even with empty company name."""
ctx = _make_ctx(company_name="")
draft = generate_dsfa_draft(ctx)
assert draft["status"] == "draft"
assert "DSFA" in draft["title"]
def test_minimal_context_generates_all_types(self):
"""All generators should handle a minimal context without crashing."""
from compliance.api.generation_routes import _generate_for_type
ctx = _make_ctx()
for doc_type in ["dsfa", "vvt", "tom", "loeschfristen", "obligation"]:
result = _generate_for_type(doc_type, ctx)
assert isinstance(result, list), f"{doc_type} should return a list"
def test_context_with_many_processing_systems(self):
"""Verify generators handle multiple processing systems correctly."""
systems = [
{"name": f"System-{i}", "vendor": f"Vendor-{i}", "hosting": "cloud",
"personal_data_categories": [f"Kategorie-{i}"]}
for i in range(5)
]
ctx = _make_ctx(processing_systems=systems)
vvt_drafts = generate_vvt_drafts(ctx)
assert len(vvt_drafts) == 5
# Verify sequential VVT IDs
for i, draft in enumerate(vvt_drafts):
assert draft["vvt_id"] == f"VVT-AUTO-{i+1:03d}"
def test_context_with_multiple_ai_systems(self):
"""DSFA should list all AI systems in summary."""
ctx = _make_ctx(
has_ai_systems=True,
subject_to_ai_act=True,
ai_systems=[
{"name": "Chatbot", "purpose": "Support", "risk_category": "limited", "has_human_oversight": True},
{"name": "Scoring", "purpose": "Credit", "risk_category": "high", "has_human_oversight": False},
{"name": "OCR", "purpose": "Documents", "risk_category": "minimal", "has_human_oversight": True},
],
)
draft = generate_dsfa_draft(ctx)
assert len(draft["ai_systems_summary"]) == 3
assert draft["risk_level"] == "high"
def test_context_without_dpo_uses_empty_string(self):
"""When dpo_name is empty, templates should still work."""
ctx = _make_ctx(dpo_name="", dpo_email="")
draft = generate_dsfa_draft(ctx)
assert draft["dpo_name"] == ""
# Should still generate valid sections
assert "section_1" in draft["sections"]
def test_all_regulatory_flags_affect_all_generators(self):
"""When all regulatory flags are set, all generators should produce more output."""
from compliance.api.generation_routes import _generate_for_type
ctx_minimal = _make_ctx()
ctx_full = _make_ctx(
subject_to_nis2=True,
subject_to_ai_act=True,
subject_to_iso27001=True,
)
tom_minimal = _generate_for_type("tom", ctx_minimal)
tom_full = _generate_for_type("tom", ctx_full)
assert len(tom_full) > len(tom_minimal)
obligation_minimal = _generate_for_type("obligation", ctx_minimal)
obligation_full = _generate_for_type("obligation", ctx_full)
assert len(obligation_full) > len(obligation_minimal)
def test_dsfa_without_ai_has_empty_ai_summary(self):
"""DSFA without AI systems should have empty ai_systems_summary."""
ctx = _make_ctx(has_ai_systems=False, ai_systems=[])
draft = generate_dsfa_draft(ctx)
assert draft["ai_systems_summary"] == []
assert draft["involves_ai"] is False

View File

@@ -0,0 +1,886 @@
"""Integration tests for ISMS routes (isms_routes.py).
Tests the ISO 27001 ISMS API endpoints using TestClient + SQLite + ORM:
- Scope CRUD + Approval
- Policy CRUD + Approval + Duplicate check
- Overview / Dashboard endpoint
- Readiness check
- Edge cases (not found, invalid data, etc.)
Run with: cd backend-compliance && python3 -m pytest tests/test_isms_routes.py -v
"""
import os
import sys
import pytest
from datetime import date, datetime
from fastapi import FastAPI
from fastapi.testclient import TestClient
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from classroom_engine.database import Base, get_db
from compliance.api.isms_routes import router as isms_router
# =============================================================================
# Test App + SQLite Setup
# =============================================================================
SQLALCHEMY_DATABASE_URL = "sqlite:///./test_isms.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
TestSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
@event.listens_for(engine, "connect")
def _set_sqlite_pragma(dbapi_conn, connection_record):
"""Enable foreign keys and register NOW() for SQLite."""
cursor = dbapi_conn.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
dbapi_conn.create_function("NOW", 0, lambda: datetime.utcnow().isoformat())
app = FastAPI()
app.include_router(isms_router)
def override_get_db():
db = TestSessionLocal()
try:
yield db
finally:
db.close()
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
# =============================================================================
# Fixtures
# =============================================================================
@pytest.fixture(autouse=True)
def setup_db():
"""Create all tables before each test module, drop after."""
# Import all models so Base.metadata knows about them
import compliance.db.models # noqa: F401
Base.metadata.create_all(bind=engine)
yield
Base.metadata.drop_all(bind=engine)
# =============================================================================
# Helper data builders
# =============================================================================
def _scope_payload(**overrides):
data = {
"scope_statement": "ISMS covers all BreakPilot digital learning operations",
"included_locations": ["Frankfurt Office", "AWS eu-central-1"],
"included_processes": ["Software Development", "Data Processing"],
"included_services": ["BreakPilot PWA", "AI Assistant"],
"excluded_items": ["Marketing Website"],
"exclusion_justification": "Static site, no user data",
}
data.update(overrides)
return data
def _policy_payload(policy_id="POL-ISMS-001", **overrides):
data = {
"policy_id": policy_id,
"title": "Information Security Policy",
"policy_type": "master",
"description": "Master ISMS policy",
"policy_text": "This policy establishes the framework for information security...",
"applies_to": ["All Employees"],
"review_frequency_months": 12,
"related_controls": ["GOV-001"],
"authored_by": "iso@breakpilot.de",
}
data.update(overrides)
return data
def _objective_payload(objective_id="OBJ-2026-001", **overrides):
data = {
"objective_id": objective_id,
"title": "Reduce Security Incidents",
"description": "Reduce incidents by 30%",
"category": "operational",
"specific": "Reduce from 10 to 7 per year",
"measurable": "Incident count in ticketing system",
"achievable": "Based on trend analysis",
"relevant": "Supports info sec goals",
"time_bound": "By Q4 2026",
"kpi_name": "Security Incident Count",
"kpi_target": "7",
"kpi_unit": "incidents/year",
"measurement_frequency": "monthly",
"owner": "security@breakpilot.de",
"target_date": "2026-12-31",
"related_controls": ["OPS-003"],
}
data.update(overrides)
return data
def _soa_payload(annex_a_control="A.5.1", **overrides):
data = {
"annex_a_control": annex_a_control,
"annex_a_title": "Policies for information security",
"annex_a_category": "organizational",
"is_applicable": True,
"applicability_justification": "Required for ISMS governance",
"implementation_status": "implemented",
"implementation_notes": "Covered by GOV-001",
"breakpilot_control_ids": ["GOV-001"],
"coverage_level": "full",
"evidence_description": "ISMS Policy v2.0",
}
data.update(overrides)
return data
def _finding_payload(**overrides):
data = {
"finding_type": "minor",
"iso_chapter": "9.2",
"annex_a_control": "A.5.35",
"title": "Audit schedule not documented",
"description": "No formal internal audit schedule found",
"objective_evidence": "No document in DMS",
"impact_description": "Cannot demonstrate planned approach",
"owner": "iso@breakpilot.de",
"auditor": "external.auditor@cert.de",
"due_date": "2026-03-31",
}
data.update(overrides)
return data
def _mgmt_review_payload(**overrides):
data = {
"title": "Q1 2026 Management Review",
"review_date": "2026-01-15",
"review_period_start": "2025-10-01",
"review_period_end": "2025-12-31",
"chairperson": "ceo@breakpilot.de",
"attendees": [
{"name": "CEO", "role": "Chairperson"},
{"name": "CTO", "role": "Technical Lead"},
],
}
data.update(overrides)
return data
def _internal_audit_payload(**overrides):
data = {
"title": "ISMS Internal Audit 2026",
"audit_type": "scheduled",
"scope_description": "Complete ISMS audit covering all chapters",
"iso_chapters_covered": ["4", "5", "6", "7", "8", "9", "10"],
"annex_a_controls_covered": ["A.5", "A.6"],
"criteria": "ISO 27001:2022",
"planned_date": "2026-03-01",
"lead_auditor": "internal.auditor@breakpilot.de",
"audit_team": ["internal.auditor@breakpilot.de", "qa@breakpilot.de"],
}
data.update(overrides)
return data
# =============================================================================
# Test: ISMS Scope CRUD
# =============================================================================
class TestISMSScopeCRUD:
"""Tests for ISMS Scope CRUD endpoints."""
def test_create_scope(self):
"""POST /isms/scope should create a new scope."""
r = client.post("/isms/scope", json=_scope_payload(), params={"created_by": "admin@bp.de"})
assert r.status_code == 200
body = r.json()
assert body["scope_statement"] == "ISMS covers all BreakPilot digital learning operations"
assert body["status"] == "draft"
assert body["version"] == "1.0"
assert "id" in body
def test_get_scope(self):
"""GET /isms/scope should return the current scope."""
client.post("/isms/scope", json=_scope_payload(), params={"created_by": "admin@bp.de"})
r = client.get("/isms/scope")
assert r.status_code == 200
assert r.json()["scope_statement"] is not None
def test_get_scope_not_found(self):
"""GET /isms/scope should return 404 when no scope exists."""
r = client.get("/isms/scope")
assert r.status_code == 404
def test_update_scope(self):
"""PUT /isms/scope/{id} should update draft scope."""
create = client.post("/isms/scope", json=_scope_payload(), params={"created_by": "admin@bp.de"})
scope_id = create.json()["id"]
r = client.put(
f"/isms/scope/{scope_id}",
json={"scope_statement": "Updated scope statement"},
params={"updated_by": "admin@bp.de"},
)
assert r.status_code == 200
assert r.json()["scope_statement"] == "Updated scope statement"
assert r.json()["version"] == "1.1"
def test_update_scope_not_found(self):
"""PUT /isms/scope/{id} should return 404 for unknown id."""
r = client.put(
"/isms/scope/nonexistent-id",
json={"scope_statement": "x"},
params={"updated_by": "admin@bp.de"},
)
assert r.status_code == 404
def test_create_scope_supersedes_existing(self):
"""Creating a new scope should supersede the old one."""
client.post("/isms/scope", json=_scope_payload(), params={"created_by": "admin@bp.de"})
client.post(
"/isms/scope",
json=_scope_payload(scope_statement="New scope v2"),
params={"created_by": "admin@bp.de"},
)
r = client.get("/isms/scope")
assert r.status_code == 200
assert r.json()["scope_statement"] == "New scope v2"
def test_approve_scope(self):
"""POST /isms/scope/{id}/approve should approve scope."""
create = client.post("/isms/scope", json=_scope_payload(), params={"created_by": "admin@bp.de"})
scope_id = create.json()["id"]
r = client.post(
f"/isms/scope/{scope_id}/approve",
json={
"approved_by": "ceo@breakpilot.de",
"effective_date": "2026-03-01",
"review_date": "2027-03-01",
},
)
assert r.status_code == 200
assert r.json()["status"] == "approved"
assert r.json()["approved_by"] == "ceo@breakpilot.de"
def test_approve_scope_not_found(self):
"""POST /isms/scope/{id}/approve should return 404 for unknown scope."""
r = client.post(
"/isms/scope/fake-id/approve",
json={
"approved_by": "ceo@breakpilot.de",
"effective_date": "2026-03-01",
"review_date": "2027-03-01",
},
)
assert r.status_code == 404
def test_update_approved_scope_rejected(self):
"""PUT on approved scope should return 400."""
create = client.post("/isms/scope", json=_scope_payload(), params={"created_by": "admin@bp.de"})
scope_id = create.json()["id"]
client.post(
f"/isms/scope/{scope_id}/approve",
json={
"approved_by": "ceo@breakpilot.de",
"effective_date": "2026-03-01",
"review_date": "2027-03-01",
},
)
r = client.put(
f"/isms/scope/{scope_id}",
json={"scope_statement": "changed"},
params={"updated_by": "admin@bp.de"},
)
assert r.status_code == 400
assert "approved" in r.json()["detail"].lower()
# =============================================================================
# Test: ISMS Policy CRUD
# =============================================================================
class TestISMSPolicyCRUD:
"""Tests for ISMS Policy CRUD endpoints."""
def test_create_policy(self):
"""POST /isms/policies should create a new policy."""
r = client.post("/isms/policies", json=_policy_payload())
assert r.status_code == 200
body = r.json()
assert body["policy_id"] == "POL-ISMS-001"
assert body["status"] == "draft"
assert body["version"] == "1.0"
def test_list_policies(self):
"""GET /isms/policies should list all policies."""
client.post("/isms/policies", json=_policy_payload("POL-ISMS-001"))
client.post("/isms/policies", json=_policy_payload("POL-ISMS-002", title="Access Control Policy"))
r = client.get("/isms/policies")
assert r.status_code == 200
assert r.json()["total"] == 2
assert len(r.json()["policies"]) == 2
def test_list_policies_filter_by_type(self):
"""GET /isms/policies?policy_type=master should filter."""
client.post("/isms/policies", json=_policy_payload("POL-001"))
client.post("/isms/policies", json=_policy_payload("POL-002", policy_type="operational"))
r = client.get("/isms/policies", params={"policy_type": "master"})
assert r.status_code == 200
assert r.json()["total"] == 1
def test_get_policy_by_id(self):
"""GET /isms/policies/{id} should return a policy by its UUID."""
create = client.post("/isms/policies", json=_policy_payload())
policy_uuid = create.json()["id"]
r = client.get(f"/isms/policies/{policy_uuid}")
assert r.status_code == 200
assert r.json()["policy_id"] == "POL-ISMS-001"
def test_get_policy_by_policy_id(self):
"""GET /isms/policies/{policy_id} should also match the human-readable id."""
client.post("/isms/policies", json=_policy_payload())
r = client.get("/isms/policies/POL-ISMS-001")
assert r.status_code == 200
assert r.json()["title"] == "Information Security Policy"
def test_get_policy_not_found(self):
"""GET /isms/policies/{id} should return 404 for unknown policy."""
r = client.get("/isms/policies/nonexistent")
assert r.status_code == 404
def test_update_policy(self):
"""PUT /isms/policies/{id} should update a draft policy."""
create = client.post("/isms/policies", json=_policy_payload())
pid = create.json()["id"]
r = client.put(
f"/isms/policies/{pid}",
json={"title": "Updated Title"},
params={"updated_by": "iso@bp.de"},
)
assert r.status_code == 200
assert r.json()["title"] == "Updated Title"
def test_update_policy_not_found(self):
"""PUT /isms/policies/{id} should return 404 for unknown policy."""
r = client.put(
"/isms/policies/fake-id",
json={"title": "x"},
params={"updated_by": "iso@bp.de"},
)
assert r.status_code == 404
def test_duplicate_policy_id_rejected(self):
"""POST /isms/policies with duplicate policy_id should return 400."""
client.post("/isms/policies", json=_policy_payload("POL-DUP"))
r = client.post("/isms/policies", json=_policy_payload("POL-DUP"))
assert r.status_code == 400
assert "already exists" in r.json()["detail"]
def test_approve_policy(self):
"""POST /isms/policies/{id}/approve should approve a policy."""
create = client.post("/isms/policies", json=_policy_payload())
pid = create.json()["id"]
r = client.post(
f"/isms/policies/{pid}/approve",
json={
"reviewed_by": "cto@breakpilot.de",
"approved_by": "ceo@breakpilot.de",
"effective_date": "2026-03-01",
},
)
assert r.status_code == 200
assert r.json()["status"] == "approved"
assert r.json()["approved_by"] == "ceo@breakpilot.de"
assert r.json()["next_review_date"] is not None
def test_approve_policy_not_found(self):
"""POST /isms/policies/{id}/approve should 404 for unknown policy."""
r = client.post(
"/isms/policies/fake/approve",
json={
"reviewed_by": "x",
"approved_by": "y",
"effective_date": "2026-03-01",
},
)
assert r.status_code == 404
def test_update_approved_policy_bumps_version(self):
"""Updating an approved policy should increment major version and reset to draft."""
create = client.post("/isms/policies", json=_policy_payload())
pid = create.json()["id"]
client.post(
f"/isms/policies/{pid}/approve",
json={
"reviewed_by": "cto@bp.de",
"approved_by": "ceo@bp.de",
"effective_date": "2026-03-01",
},
)
r = client.put(
f"/isms/policies/{pid}",
json={"title": "Updated after approval"},
params={"updated_by": "iso@bp.de"},
)
assert r.status_code == 200
assert r.json()["version"] == "2.0"
assert r.json()["status"] == "draft"
# =============================================================================
# Test: Security Objectives
# =============================================================================
class TestSecurityObjectivesCRUD:
"""Tests for Security Objectives endpoints."""
def test_create_objective(self):
"""POST /isms/objectives should create a new objective."""
r = client.post("/isms/objectives", json=_objective_payload(), params={"created_by": "iso@bp.de"})
assert r.status_code == 200
body = r.json()
assert body["objective_id"] == "OBJ-2026-001"
assert body["status"] == "active"
def test_list_objectives(self):
"""GET /isms/objectives should list all objectives."""
client.post("/isms/objectives", json=_objective_payload("OBJ-001"), params={"created_by": "a"})
client.post("/isms/objectives", json=_objective_payload("OBJ-002", title="Uptime"), params={"created_by": "a"})
r = client.get("/isms/objectives")
assert r.status_code == 200
assert r.json()["total"] == 2
def test_update_objective_progress(self):
"""PUT /isms/objectives/{id} should update progress."""
create = client.post("/isms/objectives", json=_objective_payload(), params={"created_by": "a"})
oid = create.json()["id"]
r = client.put(
f"/isms/objectives/{oid}",
json={"progress_percentage": 50},
params={"updated_by": "iso@bp.de"},
)
assert r.status_code == 200
assert r.json()["progress_percentage"] == 50
def test_update_objective_auto_achieved(self):
"""Setting progress to 100% should auto-set status to 'achieved'."""
create = client.post("/isms/objectives", json=_objective_payload(), params={"created_by": "a"})
oid = create.json()["id"]
r = client.put(
f"/isms/objectives/{oid}",
json={"progress_percentage": 100},
params={"updated_by": "iso@bp.de"},
)
assert r.status_code == 200
assert r.json()["status"] == "achieved"
def test_update_objective_not_found(self):
"""PUT /isms/objectives/{id} should 404 for unknown objective."""
r = client.put(
"/isms/objectives/fake",
json={"progress_percentage": 10},
params={"updated_by": "a"},
)
assert r.status_code == 404
# =============================================================================
# Test: Statement of Applicability (SoA)
# =============================================================================
class TestSoACRUD:
"""Tests for SoA endpoints."""
def test_create_soa_entry(self):
"""POST /isms/soa should create an SoA entry."""
r = client.post("/isms/soa", json=_soa_payload(), params={"created_by": "iso@bp.de"})
assert r.status_code == 200
body = r.json()
assert body["annex_a_control"] == "A.5.1"
assert body["is_applicable"] is True
def test_list_soa_entries(self):
"""GET /isms/soa should list all SoA entries."""
client.post("/isms/soa", json=_soa_payload("A.5.1"), params={"created_by": "a"})
client.post("/isms/soa", json=_soa_payload("A.6.1", is_applicable=False, applicability_justification="N/A"), params={"created_by": "a"})
r = client.get("/isms/soa")
assert r.status_code == 200
assert r.json()["total"] == 2
assert r.json()["applicable_count"] == 1
assert r.json()["not_applicable_count"] == 1
def test_duplicate_soa_control_rejected(self):
"""POST /isms/soa with duplicate annex_a_control should return 400."""
client.post("/isms/soa", json=_soa_payload("A.5.1"), params={"created_by": "a"})
r = client.post("/isms/soa", json=_soa_payload("A.5.1"), params={"created_by": "a"})
assert r.status_code == 400
assert "already exists" in r.json()["detail"]
def test_update_soa_entry(self):
"""PUT /isms/soa/{id} should update an SoA entry."""
create = client.post("/isms/soa", json=_soa_payload(), params={"created_by": "a"})
eid = create.json()["id"]
r = client.put(
f"/isms/soa/{eid}",
json={"implementation_status": "in_progress"},
params={"updated_by": "iso@bp.de"},
)
assert r.status_code == 200
assert r.json()["implementation_status"] == "in_progress"
assert r.json()["version"] == "1.1"
def test_update_soa_not_found(self):
"""PUT /isms/soa/{id} should 404 for unknown entry."""
r = client.put(
"/isms/soa/fake",
json={"implementation_status": "implemented"},
params={"updated_by": "a"},
)
assert r.status_code == 404
def test_approve_soa_entry(self):
"""POST /isms/soa/{id}/approve should approve an SoA entry."""
create = client.post("/isms/soa", json=_soa_payload(), params={"created_by": "a"})
eid = create.json()["id"]
r = client.post(
f"/isms/soa/{eid}/approve",
json={"reviewed_by": "cto@bp.de", "approved_by": "ceo@bp.de"},
)
assert r.status_code == 200
assert r.json()["approved_by"] == "ceo@bp.de"
# =============================================================================
# Test: Audit Findings
# =============================================================================
class TestAuditFindingsCRUD:
"""Tests for Audit Finding endpoints."""
def test_create_finding(self):
"""POST /isms/findings should create a finding with auto-generated ID."""
r = client.post("/isms/findings", json=_finding_payload())
assert r.status_code == 200
body = r.json()
assert body["finding_id"].startswith("FIND-")
assert body["status"] == "open"
def test_list_findings(self):
"""GET /isms/findings should list all findings."""
client.post("/isms/findings", json=_finding_payload())
client.post("/isms/findings", json=_finding_payload(finding_type="major", title="Major finding"))
r = client.get("/isms/findings")
assert r.status_code == 200
assert r.json()["total"] == 2
assert r.json()["major_count"] == 1
assert r.json()["minor_count"] == 1
def test_update_finding(self):
"""PUT /isms/findings/{id} should update a finding."""
create = client.post("/isms/findings", json=_finding_payload())
fid = create.json()["id"]
r = client.put(
f"/isms/findings/{fid}",
json={"root_cause": "Missing documentation process"},
params={"updated_by": "iso@bp.de"},
)
assert r.status_code == 200
assert r.json()["root_cause"] == "Missing documentation process"
def test_update_finding_not_found(self):
"""PUT /isms/findings/{id} should 404 for unknown finding."""
r = client.put(
"/isms/findings/fake",
json={"root_cause": "x"},
params={"updated_by": "a"},
)
assert r.status_code == 404
def test_close_finding_no_capas(self):
"""POST /isms/findings/{id}/close should succeed if no CAPAs exist."""
create = client.post("/isms/findings", json=_finding_payload())
fid = create.json()["id"]
r = client.post(
f"/isms/findings/{fid}/close",
json={
"closure_notes": "Verified corrected",
"closed_by": "auditor@cert.de",
"verification_method": "Document review",
"verification_evidence": "Updated schedule approved",
},
)
assert r.status_code == 200
assert r.json()["status"] == "closed"
def test_close_finding_not_found(self):
"""POST /isms/findings/{id}/close should 404 for unknown finding."""
r = client.post(
"/isms/findings/fake/close",
json={
"closure_notes": "x",
"closed_by": "a",
"verification_method": "x",
"verification_evidence": "x",
},
)
assert r.status_code == 404
# =============================================================================
# Test: Management Reviews
# =============================================================================
class TestManagementReviewCRUD:
"""Tests for Management Review endpoints."""
def test_create_management_review(self):
"""POST /isms/management-reviews should create a review."""
r = client.post(
"/isms/management-reviews",
json=_mgmt_review_payload(),
params={"created_by": "iso@bp.de"},
)
assert r.status_code == 200
body = r.json()
assert body["review_id"].startswith("MR-")
assert body["status"] == "draft"
def test_list_management_reviews(self):
"""GET /isms/management-reviews should list reviews."""
client.post("/isms/management-reviews", json=_mgmt_review_payload(), params={"created_by": "a"})
r = client.get("/isms/management-reviews")
assert r.status_code == 200
assert r.json()["total"] == 1
def test_get_management_review(self):
"""GET /isms/management-reviews/{id} should return a review."""
create = client.post("/isms/management-reviews", json=_mgmt_review_payload(), params={"created_by": "a"})
rid = create.json()["id"]
r = client.get(f"/isms/management-reviews/{rid}")
assert r.status_code == 200
assert r.json()["chairperson"] == "ceo@breakpilot.de"
def test_get_management_review_not_found(self):
"""GET /isms/management-reviews/{id} should 404 for unknown review."""
r = client.get("/isms/management-reviews/fake")
assert r.status_code == 404
def test_update_management_review(self):
"""PUT /isms/management-reviews/{id} should update a review."""
create = client.post("/isms/management-reviews", json=_mgmt_review_payload(), params={"created_by": "a"})
rid = create.json()["id"]
r = client.put(
f"/isms/management-reviews/{rid}",
json={"input_previous_actions": "All actions completed", "status": "conducted"},
params={"updated_by": "iso@bp.de"},
)
assert r.status_code == 200
assert r.json()["input_previous_actions"] == "All actions completed"
def test_approve_management_review(self):
"""POST /isms/management-reviews/{id}/approve should approve."""
create = client.post("/isms/management-reviews", json=_mgmt_review_payload(), params={"created_by": "a"})
rid = create.json()["id"]
r = client.post(
f"/isms/management-reviews/{rid}/approve",
json={
"approved_by": "ceo@bp.de",
"next_review_date": "2026-07-01",
},
)
assert r.status_code == 200
assert r.json()["status"] == "approved"
assert r.json()["approved_by"] == "ceo@bp.de"
# =============================================================================
# Test: Internal Audits
# =============================================================================
class TestInternalAuditCRUD:
"""Tests for Internal Audit endpoints."""
def test_create_internal_audit(self):
"""POST /isms/internal-audits should create an audit."""
r = client.post(
"/isms/internal-audits",
json=_internal_audit_payload(),
params={"created_by": "iso@bp.de"},
)
assert r.status_code == 200
body = r.json()
assert body["audit_id"].startswith("IA-")
assert body["status"] == "planned"
def test_list_internal_audits(self):
"""GET /isms/internal-audits should list audits."""
client.post("/isms/internal-audits", json=_internal_audit_payload(), params={"created_by": "a"})
r = client.get("/isms/internal-audits")
assert r.status_code == 200
assert r.json()["total"] == 1
def test_update_internal_audit(self):
"""PUT /isms/internal-audits/{id} should update an audit."""
create = client.post("/isms/internal-audits", json=_internal_audit_payload(), params={"created_by": "a"})
aid = create.json()["id"]
r = client.put(
f"/isms/internal-audits/{aid}",
json={"status": "in_progress", "actual_start_date": "2026-03-01"},
params={"updated_by": "iso@bp.de"},
)
assert r.status_code == 200
assert r.json()["status"] == "in_progress"
def test_update_internal_audit_not_found(self):
"""PUT /isms/internal-audits/{id} should 404 for unknown audit."""
r = client.put(
"/isms/internal-audits/fake",
json={"status": "in_progress"},
params={"updated_by": "a"},
)
assert r.status_code == 404
def test_complete_internal_audit(self):
"""POST /isms/internal-audits/{id}/complete should complete an audit."""
create = client.post("/isms/internal-audits", json=_internal_audit_payload(), params={"created_by": "a"})
aid = create.json()["id"]
r = client.post(
f"/isms/internal-audits/{aid}/complete",
json={
"audit_conclusion": "Overall conforming with minor observations",
"overall_assessment": "conforming",
"follow_up_audit_required": False,
},
params={"completed_by": "auditor@bp.de"},
)
assert r.status_code == 200
assert r.json()["status"] == "completed"
assert r.json()["follow_up_audit_required"] is False
def test_complete_internal_audit_not_found(self):
"""POST /isms/internal-audits/{id}/complete should 404 for unknown audit."""
r = client.post(
"/isms/internal-audits/fake/complete",
json={
"audit_conclusion": "x",
"overall_assessment": "conforming",
"follow_up_audit_required": False,
},
params={"completed_by": "a"},
)
assert r.status_code == 404
# =============================================================================
# Test: Readiness Check
# =============================================================================
class TestReadinessCheck:
"""Tests for the ISMS Readiness Check endpoint."""
def test_readiness_check_empty_isms(self):
"""POST /isms/readiness-check on empty DB should show not_ready."""
r = client.post("/isms/readiness-check", json={"triggered_by": "test"})
assert r.status_code == 200
body = r.json()
assert body["certification_possible"] is False
assert body["overall_status"] == "not_ready"
assert len(body["potential_majors"]) > 0
def test_readiness_check_latest_not_found(self):
"""GET /isms/readiness-check/latest should 404 when no check has run."""
r = client.get("/isms/readiness-check/latest")
assert r.status_code == 404
def test_readiness_check_latest_returns_most_recent(self):
"""GET /isms/readiness-check/latest should return last check."""
client.post("/isms/readiness-check", json={"triggered_by": "first"})
client.post("/isms/readiness-check", json={"triggered_by": "second"})
r = client.get("/isms/readiness-check/latest")
assert r.status_code == 200
assert r.json()["triggered_by"] == "second"
# =============================================================================
# Test: Overview / Dashboard
# =============================================================================
class TestOverviewDashboard:
"""Tests for the ISO 27001 overview endpoint."""
def test_overview_empty_isms(self):
"""GET /isms/overview on empty DB should return not_ready."""
r = client.get("/isms/overview")
assert r.status_code == 200
body = r.json()
assert body["overall_status"] in ("not_ready", "at_risk")
assert body["scope_approved"] is False
assert body["open_major_findings"] == 0
assert body["policies_count"] == 0
def test_overview_with_data(self):
"""GET /isms/overview should reflect created data."""
# Create and approve a scope
scope = client.post("/isms/scope", json=_scope_payload(), params={"created_by": "a"})
client.post(
f"/isms/scope/{scope.json()['id']}/approve",
json={"approved_by": "ceo@bp.de", "effective_date": "2026-01-01", "review_date": "2027-01-01"},
)
# Create a policy
client.post("/isms/policies", json=_policy_payload())
# Create an objective
client.post("/isms/objectives", json=_objective_payload(), params={"created_by": "a"})
r = client.get("/isms/overview")
assert r.status_code == 200
body = r.json()
assert body["scope_approved"] is True
assert body["policies_count"] == 1
assert body["objectives_count"] == 1
# =============================================================================
# Test: Audit Trail
# =============================================================================
class TestAuditTrail:
"""Tests for the Audit Trail endpoint."""
def test_audit_trail_records_actions(self):
"""Creating entities should generate audit trail entries."""
client.post("/isms/scope", json=_scope_payload(), params={"created_by": "admin@bp.de"})
client.post("/isms/policies", json=_policy_payload())
r = client.get("/isms/audit-trail")
assert r.status_code == 200
assert r.json()["total"] >= 2
def test_audit_trail_filter_by_entity_type(self):
"""GET /isms/audit-trail?entity_type=isms_policy should filter."""
client.post("/isms/scope", json=_scope_payload(), params={"created_by": "a"})
client.post("/isms/policies", json=_policy_payload())
r = client.get("/isms/audit-trail", params={"entity_type": "isms_policy"})
assert r.status_code == 200
for entry in r.json()["entries"]:
assert entry["entity_type"] == "isms_policy"
def test_audit_trail_pagination(self):
"""GET /isms/audit-trail should support pagination."""
# Create several entries
for i in range(5):
client.post("/isms/policies", json=_policy_payload(f"POL-PAGI-{i:03d}"))
r = client.get("/isms/audit-trail", params={"page": 1, "page_size": 2})
assert r.status_code == 200
assert len(r.json()["entries"]) == 2
assert r.json()["pagination"]["has_next"] is True

View File

@@ -432,3 +432,337 @@ class TestVVTCsvExport:
text = self._collect_csv_body(response)
lines = text.strip().split('\n')
assert len(lines) == 1
# =============================================================================
# API Endpoint Tests (TestClient + mock DB)
# =============================================================================
from fastapi.testclient import TestClient
from fastapi import FastAPI
from compliance.api.vvt_routes import router
_app = FastAPI()
_app.include_router(router)
_client = TestClient(_app)
DEFAULT_TENANT = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
def _make_db_activity(**kwargs):
"""Create a mock VVTActivityDB object for query results."""
act = VVTActivityDB()
act.id = kwargs.get("id", uuid.uuid4())
act.tenant_id = kwargs.get("tenant_id", DEFAULT_TENANT)
act.vvt_id = kwargs.get("vvt_id", "VVT-001")
act.name = kwargs.get("name", "Test Verarbeitung")
act.description = kwargs.get("description", None)
act.purposes = kwargs.get("purposes", ["Vertragserfuellung"])
act.legal_bases = kwargs.get("legal_bases", ["Art. 6 Abs. 1b"])
act.data_subject_categories = kwargs.get("data_subject_categories", ["Kunden"])
act.personal_data_categories = kwargs.get("personal_data_categories", ["Email"])
act.recipient_categories = kwargs.get("recipient_categories", [])
act.third_country_transfers = kwargs.get("third_country_transfers", [])
act.retention_period = kwargs.get("retention_period", {"duration": "3 Jahre"})
act.tom_description = kwargs.get("tom_description", None)
act.business_function = kwargs.get("business_function", "IT")
act.systems = kwargs.get("systems", [])
act.deployment_model = kwargs.get("deployment_model", None)
act.data_sources = kwargs.get("data_sources", [])
act.data_flows = kwargs.get("data_flows", [])
act.protection_level = kwargs.get("protection_level", "MEDIUM")
act.dpia_required = kwargs.get("dpia_required", False)
act.structured_toms = kwargs.get("structured_toms", {})
act.status = kwargs.get("status", "DRAFT")
act.responsible = kwargs.get("responsible", None)
act.owner = kwargs.get("owner", None)
act.last_reviewed_at = kwargs.get("last_reviewed_at", None)
act.next_review_at = kwargs.get("next_review_at", None)
act.created_by = kwargs.get("created_by", "system")
act.dsfa_id = kwargs.get("dsfa_id", None)
act.created_at = kwargs.get("created_at", datetime(2026, 1, 15, 10, 0))
act.updated_at = kwargs.get("updated_at", None)
return act
def _make_db_org(**kwargs):
"""Create a mock VVTOrganizationDB object."""
org = VVTOrganizationDB()
org.id = kwargs.get("id", uuid.uuid4())
org.tenant_id = kwargs.get("tenant_id", DEFAULT_TENANT)
org.organization_name = kwargs.get("organization_name", "BreakPilot GmbH")
org.industry = kwargs.get("industry", "IT")
org.locations = kwargs.get("locations", ["Berlin"])
org.employee_count = kwargs.get("employee_count", 50)
org.dpo_name = kwargs.get("dpo_name", "Max DSB")
org.dpo_contact = kwargs.get("dpo_contact", "dsb@example.com")
org.vvt_version = kwargs.get("vvt_version", "1.0")
org.last_review_date = kwargs.get("last_review_date", None)
org.next_review_date = kwargs.get("next_review_date", None)
org.review_interval = kwargs.get("review_interval", "annual")
org.created_at = kwargs.get("created_at", datetime(2026, 1, 1))
org.updated_at = kwargs.get("updated_at", None)
return org
def _make_audit_entry(**kwargs):
"""Create a mock VVTAuditLogDB object."""
entry = VVTAuditLogDB()
entry.id = kwargs.get("id", uuid.uuid4())
entry.tenant_id = kwargs.get("tenant_id", DEFAULT_TENANT)
entry.action = kwargs.get("action", "CREATE")
entry.entity_type = kwargs.get("entity_type", "activity")
entry.entity_id = kwargs.get("entity_id", uuid.uuid4())
entry.changed_by = kwargs.get("changed_by", "system")
entry.old_values = kwargs.get("old_values", None)
entry.new_values = kwargs.get("new_values", {"name": "Test"})
entry.created_at = kwargs.get("created_at", datetime(2026, 1, 15, 10, 0))
return entry
@pytest.fixture
def mock_db():
from classroom_engine.database import get_db
from compliance.api.tenant_utils import get_tenant_id
db = MagicMock()
_app.dependency_overrides[get_db] = lambda: db
_app.dependency_overrides[get_tenant_id] = lambda: DEFAULT_TENANT
yield db
_app.dependency_overrides.clear()
class TestExportEndpoint:
"""Tests for GET /vvt/export (JSON and CSV)."""
def test_export_json_with_activities(self, mock_db):
act = _make_db_activity(vvt_id="VVT-EXP-001", name="Export Test")
org = _make_db_org()
# mock chained query for org
mock_db.query.return_value.filter.return_value.order_by.return_value.first.return_value = org
# mock chained query for activities
mock_db.query.return_value.filter.return_value.order_by.return_value.all.return_value = [act]
resp = _client.get("/vvt/export?format=json")
assert resp.status_code == 200
data = resp.json()
assert "exported_at" in data
assert "organization" in data
assert data["organization"]["name"] == "BreakPilot GmbH"
assert len(data["activities"]) == 1
assert data["activities"][0]["vvt_id"] == "VVT-EXP-001"
def test_export_json_empty_dataset(self, mock_db):
mock_db.query.return_value.filter.return_value.order_by.return_value.first.return_value = None
mock_db.query.return_value.filter.return_value.order_by.return_value.all.return_value = []
resp = _client.get("/vvt/export?format=json")
assert resp.status_code == 200
data = resp.json()
assert data["organization"] is None
assert data["activities"] == []
def test_export_csv_returns_streaming_response(self, mock_db):
act = _make_db_activity(vvt_id="VVT-CSV-E01", name="CSV Endpoint Test")
mock_db.query.return_value.filter.return_value.order_by.return_value.first.return_value = None
mock_db.query.return_value.filter.return_value.order_by.return_value.all.return_value = [act]
resp = _client.get("/vvt/export?format=csv")
assert resp.status_code == 200
assert "text/csv" in resp.headers.get("content-type", "")
assert "attachment" in resp.headers.get("content-disposition", "")
body = resp.text
assert "VVT-CSV-E01" in body
assert "CSV Endpoint Test" in body
def test_export_invalid_format_rejected(self, mock_db):
resp = _client.get("/vvt/export?format=xml")
assert resp.status_code == 422 # validation error
class TestStatsEndpoint:
"""Tests for GET /vvt/stats."""
def test_stats_empty_tenant(self, mock_db):
mock_db.query.return_value.filter.return_value.all.return_value = []
resp = _client.get("/vvt/stats")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 0
assert data["by_status"] == {}
assert data["dpia_required_count"] == 0
assert data["overdue_review_count"] == 0
def test_stats_with_activities(self, mock_db):
past = datetime(2025, 1, 1, tzinfo=timezone.utc)
acts = [
_make_db_activity(status="DRAFT", business_function="HR", dpia_required=True, next_review_at=past),
_make_db_activity(status="APPROVED", business_function="IT", dpia_required=False),
_make_db_activity(status="DRAFT", business_function="HR", dpia_required=False, third_country_transfers=["USA"]),
]
mock_db.query.return_value.filter.return_value.all.return_value = acts
resp = _client.get("/vvt/stats")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 3
assert data["by_status"]["DRAFT"] == 2
assert data["by_status"]["APPROVED"] == 1
assert data["by_business_function"]["HR"] == 2
assert data["by_business_function"]["IT"] == 1
assert data["dpia_required_count"] == 1
assert data["third_country_count"] == 1
assert data["draft_count"] == 2
assert data["approved_count"] == 1
assert data["overdue_review_count"] == 1
class TestAuditLogEndpoint:
"""Tests for GET /vvt/audit-log."""
def test_audit_log_returns_entries(self, mock_db):
entry = _make_audit_entry(action="CREATE", entity_type="activity")
mock_db.query.return_value.filter.return_value.order_by.return_value.offset.return_value.limit.return_value.all.return_value = [entry]
resp = _client.get("/vvt/audit-log")
assert resp.status_code == 200
data = resp.json()
assert len(data) == 1
assert data[0]["action"] == "CREATE"
assert data[0]["entity_type"] == "activity"
def test_audit_log_empty(self, mock_db):
mock_db.query.return_value.filter.return_value.order_by.return_value.offset.return_value.limit.return_value.all.return_value = []
resp = _client.get("/vvt/audit-log")
assert resp.status_code == 200
assert resp.json() == []
def test_audit_log_pagination_params(self, mock_db):
mock_db.query.return_value.filter.return_value.order_by.return_value.offset.return_value.limit.return_value.all.return_value = []
resp = _client.get("/vvt/audit-log?limit=10&offset=20")
assert resp.status_code == 200
class TestVersioningEndpoints:
"""Tests for GET /vvt/activities/{id}/versions and /versions/{v}."""
@patch("compliance.api.versioning_utils.list_versions")
def test_list_versions_returns_list(self, mock_list_versions, mock_db):
act_id = str(uuid.uuid4())
mock_list_versions.return_value = [
{"id": str(uuid.uuid4()), "version_number": 2, "status": "draft",
"change_summary": "Updated name", "changed_sections": [],
"created_by": "admin", "approved_by": None, "approved_at": None,
"created_at": "2026-01-15T10:00:00"},
{"id": str(uuid.uuid4()), "version_number": 1, "status": "draft",
"change_summary": "Initial", "changed_sections": [],
"created_by": "system", "approved_by": None, "approved_at": None,
"created_at": "2026-01-14T09:00:00"},
]
resp = _client.get(f"/vvt/activities/{act_id}/versions")
assert resp.status_code == 200
data = resp.json()
assert len(data) == 2
assert data[0]["version_number"] == 2
assert data[1]["version_number"] == 1
@patch("compliance.api.versioning_utils.list_versions")
def test_list_versions_empty(self, mock_list_versions, mock_db):
act_id = str(uuid.uuid4())
mock_list_versions.return_value = []
resp = _client.get(f"/vvt/activities/{act_id}/versions")
assert resp.status_code == 200
assert resp.json() == []
@patch("compliance.api.versioning_utils.get_version")
def test_get_specific_version(self, mock_get_version, mock_db):
act_id = str(uuid.uuid4())
mock_get_version.return_value = {
"id": str(uuid.uuid4()),
"version_number": 1,
"status": "approved",
"snapshot": {"name": "Test", "status": "APPROVED"},
"change_summary": "Initial version",
"changed_sections": ["name", "status"],
"created_by": "admin",
"approved_by": "dpo",
"approved_at": "2026-01-16T12:00:00",
"created_at": "2026-01-15T10:00:00",
}
resp = _client.get(f"/vvt/activities/{act_id}/versions/1")
assert resp.status_code == 200
data = resp.json()
assert data["version_number"] == 1
assert data["snapshot"]["name"] == "Test"
assert data["approved_by"] == "dpo"
@patch("compliance.api.versioning_utils.get_version")
def test_get_version_not_found(self, mock_get_version, mock_db):
act_id = str(uuid.uuid4())
mock_get_version.return_value = None
resp = _client.get(f"/vvt/activities/{act_id}/versions/999")
assert resp.status_code == 404
assert "not found" in resp.json()["detail"].lower()
class TestExportCsvEdgeCases:
"""Additional edge cases for CSV export helper."""
def _collect_csv_body(self, response) -> str:
import asyncio
async def _read():
chunks = []
async for chunk in response.body_iterator:
chunks.append(chunk)
return ''.join(chunks)
return asyncio.get_event_loop().run_until_complete(_read())
def test_export_csv_with_third_country_transfers(self):
from compliance.api.vvt_routes import _export_csv
act = _make_db_activity(
third_country_transfers=["USA", "China"],
vvt_id="VVT-TC-001",
name="Third Country Test",
)
response = _export_csv([act])
text = self._collect_csv_body(response)
assert "Ja" in text # third_country_transfers truthy -> "Ja"
def test_export_csv_no_third_country_transfers(self):
from compliance.api.vvt_routes import _export_csv
act = _make_db_activity(
third_country_transfers=[],
vvt_id="VVT-NTC-001",
name="No Third Country",
)
response = _export_csv([act])
text = self._collect_csv_body(response)
assert "Nein" in text # empty list -> "Nein"
def test_export_csv_multiple_activities(self):
from compliance.api.vvt_routes import _export_csv
acts = [
_make_db_activity(vvt_id="VVT-M-001", name="First"),
_make_db_activity(vvt_id="VVT-M-002", name="Second"),
_make_db_activity(vvt_id="VVT-M-003", name="Third"),
]
response = _export_csv(acts)
text = self._collect_csv_body(response)
lines = text.strip().split('\n')
# 1 header + 3 data rows
assert len(lines) == 4
assert "VVT-M-001" in lines[1]
assert "VVT-M-002" in lines[2]
assert "VVT-M-003" in lines[3]
def test_export_csv_content_disposition_filename(self):
from compliance.api.vvt_routes import _export_csv
response = _export_csv([])
assert "vvt_export_" in response.headers.get("content-disposition", "")
assert ".csv" in response.headers.get("content-disposition", "")