Files
breakpilot-compliance/backend-compliance/tests/test_security_backlog_routes.py
Benjamin Admin 25d5da78ef
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-ai-compliance (push) Successful in 34s
CI / test-python-backend-compliance (push) Successful in 32s
CI / test-python-document-crawler (push) Successful in 21s
CI / test-python-dsms-gateway (push) Successful in 17s
feat: Alle 5 verbleibenden SDK-Module auf 100% — RAG, Security-Backlog, Quality, Notfallplan, Loeschfristen
Paket A — RAG Proxy:
- NEU: admin-compliance/app/api/sdk/v1/rag/[[...path]]/route.ts
  → Proxy zu ai-compliance-sdk:8090, GET+POST, UUID-Validierung
- UPDATE: rag/page.tsx — setTimeout Mock → echte API-Calls
  GET /regulations → dynamische suggestedQuestions
  POST /search → Qdrant-Ergebnisse mit score, title, reference

Paket B — Security-Backlog + Quality:
- NEU: migrations/014_security_backlog.sql + 015_quality.sql
- NEU: compliance/api/security_backlog_routes.py — CRUD + Stats
- NEU: compliance/api/quality_routes.py — Metrics + Tests CRUD + Stats
- UPDATE: security-backlog/page.tsx — mockItems → API
- UPDATE: quality/page.tsx — mockMetrics/mockTests → API
- UPDATE: compliance/api/__init__.py — Router-Registrierung
- NEU: tests/test_security_backlog_routes.py (48 Tests — 48/48 bestanden)
- NEU: tests/test_quality_routes.py (67 Tests — 67/67 bestanden)

Paket C — Notfallplan Incidents + Templates:
- NEU: migrations/016_notfallplan_incidents.sql
  compliance_notfallplan_incidents + compliance_notfallplan_templates
- UPDATE: notfallplan_routes.py — GET/POST/PUT/DELETE für /incidents + /templates
- UPDATE: notfallplan/page.tsx — Incidents-Tab + Templates-Tab → API
- UPDATE: tests/test_notfallplan_routes.py (+76 neue Tests — alle bestanden)

Paket D — Loeschfristen localStorage → API:
- NEU: migrations/017_loeschfristen.sql (JSONB: legal_holds, storage_locations, ...)
- NEU: compliance/api/loeschfristen_routes.py — CRUD + Stats + Status-Update
- UPDATE: loeschfristen/page.tsx — vollständige localStorage → API Migration
  createNewPolicy → POST (API-UUID als id), deletePolicy → DELETE,
  handleSaveAndClose → PUT, adoptGeneratedPolicies → POST je Policy
  apiToPolicy() + policyToPayload() Mapper, saving-State für Buttons
- NEU: tests/test_loeschfristen_routes.py (58 Tests — alle bestanden)

Gesamt: 253 neue Tests, alle bestanden (48 + 67 + 76 + 58 + bestehende)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-03 18:04:53 +01:00

699 lines
25 KiB
Python

"""Tests for Security Backlog routes (security_backlog_routes.py).
Covers:
- Schema validation (SecurityItemCreate, SecurityItemUpdate)
- Helper functions (_row_to_dict, _get_tenant_id)
- HTTP endpoints via FastAPI TestClient with mocked DB session
"""
import pytest
from unittest.mock import MagicMock
from datetime import datetime
from fastapi import FastAPI
from fastapi.testclient import TestClient
from compliance.api.security_backlog_routes import (
router,
SecurityItemCreate,
SecurityItemUpdate,
_row_to_dict,
_get_tenant_id,
DEFAULT_TENANT_ID,
)
from classroom_engine.database import get_db
# =============================================================================
# TestClient Setup
# =============================================================================
app = FastAPI()
app.include_router(router)
DEFAULT_TENANT = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e"
OTHER_TENANT = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
ITEM_ID = "aaaaaaaa-0001-0001-0001-000000000001"
def make_item_row(overrides=None):
data = {
"id": ITEM_ID,
"tenant_id": DEFAULT_TENANT,
"title": "Test Item",
"description": "Test description",
"type": "vulnerability",
"severity": "medium",
"status": "open",
"source": None,
"cve": None,
"cvss": None,
"affected_asset": None,
"assigned_to": None,
"due_date": None,
"remediation": None,
"created_at": datetime(2024, 1, 1),
"updated_at": datetime(2024, 1, 1),
}
if overrides:
data.update(overrides)
row = MagicMock()
row._mapping = data
return row
def make_stats_row(overrides=None):
data = {
"open": 3,
"in_progress": 1,
"resolved": 2,
"accepted_risk": 0,
"critical": 1,
"high": 2,
"overdue": 1,
"total": 6,
}
if overrides:
data.update(overrides)
row = MagicMock()
row._mapping = data
return row
@pytest.fixture
def mock_db():
db = MagicMock()
def override_get_db():
yield db
app.dependency_overrides[get_db] = override_get_db
yield db
app.dependency_overrides.clear()
@pytest.fixture
def test_client():
return TestClient(app)
# Module-level client for simple tests that set up their own mocks per test
client = TestClient(app)
# =============================================================================
# Schema Tests — SecurityItemCreate
# =============================================================================
class TestSecurityItemCreate:
def test_minimal_valid(self):
req = SecurityItemCreate(title="SQL Injection in Login")
assert req.title == "SQL Injection in Login"
assert req.type == "vulnerability"
assert req.severity == "medium"
assert req.status == "open"
assert req.description is None
assert req.source is None
assert req.cve is None
assert req.cvss is None
assert req.affected_asset is None
assert req.assigned_to is None
assert req.due_date is None
assert req.remediation is None
def test_full_values(self):
due = datetime(2026, 6, 30)
req = SecurityItemCreate(
title="CVE-2024-1234",
description="Remote code execution in parser",
type="cve",
severity="critical",
status="in-progress",
source="NVD",
cve="CVE-2024-1234",
cvss=9.8,
affected_asset="api-server",
assigned_to="security-team",
due_date=due,
remediation="Upgrade to v2.1.0",
)
assert req.title == "CVE-2024-1234"
assert req.type == "cve"
assert req.severity == "critical"
assert req.cvss == 9.8
assert req.cve == "CVE-2024-1234"
assert req.assigned_to == "security-team"
def test_serialization_excludes_none(self):
req = SecurityItemCreate(title="Patch missing", severity="high")
data = req.model_dump(exclude_none=True)
assert data["title"] == "Patch missing"
assert data["severity"] == "high"
assert "description" not in data
assert "cve" not in data
def test_serialization_includes_defaults(self):
req = SecurityItemCreate(title="Test")
data = req.model_dump()
assert data["type"] == "vulnerability"
assert data["severity"] == "medium"
assert data["status"] == "open"
# =============================================================================
# Schema Tests — SecurityItemUpdate
# =============================================================================
class TestSecurityItemUpdate:
def test_empty_update(self):
req = SecurityItemUpdate()
data = req.model_dump(exclude_unset=True)
assert data == {}
def test_partial_update_status(self):
req = SecurityItemUpdate(status="resolved")
data = req.model_dump(exclude_unset=True)
assert data == {"status": "resolved"}
def test_partial_update_severity(self):
req = SecurityItemUpdate(severity="critical", assigned_to="john@example.com")
data = req.model_dump(exclude_unset=True)
assert data["severity"] == "critical"
assert data["assigned_to"] == "john@example.com"
assert "title" not in data
def test_full_update(self):
req = SecurityItemUpdate(
title="Updated Title",
description="New desc",
type="misconfiguration",
severity="low",
status="accepted-risk",
remediation="Accept the risk",
)
data = req.model_dump(exclude_unset=True)
assert len(data) == 6
assert data["type"] == "misconfiguration"
assert data["status"] == "accepted-risk"
# =============================================================================
# Helper Tests — _row_to_dict
# =============================================================================
class TestRowToDict:
def test_basic_conversion(self):
row = make_item_row()
result = _row_to_dict(row)
assert result["id"] == ITEM_ID
assert result["title"] == "Test Item"
assert result["severity"] == "medium"
def test_datetime_serialized(self):
ts = datetime(2024, 6, 15, 10, 30, 0)
row = make_item_row({"created_at": ts, "updated_at": ts})
result = _row_to_dict(row)
assert result["created_at"] == ts.isoformat()
assert result["updated_at"] == ts.isoformat()
def test_none_values_preserved(self):
row = make_item_row()
result = _row_to_dict(row)
assert result["source"] is None
assert result["cve"] is None
assert result["cvss"] is None
assert result["affected_asset"] is None
assert result["due_date"] is None
def test_uuid_converted_to_string(self):
import uuid
uid = uuid.UUID(DEFAULT_TENANT)
row = MagicMock()
row._mapping = {"id": uid, "tenant_id": uid}
result = _row_to_dict(row)
assert result["id"] == str(uid)
assert result["tenant_id"] == str(uid)
def test_string_and_numeric_unchanged(self):
row = MagicMock()
row._mapping = {"title": "Test", "cvss": 7.5, "active": True}
result = _row_to_dict(row)
assert result["title"] == "Test"
assert result["cvss"] == 7.5
assert result["active"] is True
# =============================================================================
# Helper Tests — _get_tenant_id
# =============================================================================
class TestGetTenantId:
def test_valid_uuid_returned(self):
result = _get_tenant_id(x_tenant_id=DEFAULT_TENANT)
assert result == DEFAULT_TENANT
def test_none_returns_default(self):
result = _get_tenant_id(x_tenant_id=None)
assert result == DEFAULT_TENANT_ID
def test_invalid_uuid_returns_default(self):
result = _get_tenant_id(x_tenant_id="not-a-uuid")
assert result == DEFAULT_TENANT_ID
def test_empty_string_returns_default(self):
result = _get_tenant_id(x_tenant_id="")
assert result == DEFAULT_TENANT_ID
def test_different_valid_tenant(self):
result = _get_tenant_id(x_tenant_id=OTHER_TENANT)
assert result == OTHER_TENANT
def test_partial_uuid_returns_default(self):
result = _get_tenant_id(x_tenant_id="9282a473-5c95-4b3a")
assert result == DEFAULT_TENANT_ID
# =============================================================================
# HTTP Tests — GET /security-backlog
# =============================================================================
class TestListSecurityItems:
def test_empty_list(self, mock_db):
count_row = MagicMock()
count_row.__getitem__ = lambda self, i: 0
mock_db.execute.return_value.fetchone.return_value = count_row
mock_db.execute.return_value.fetchall.return_value = []
response = client.get("/security-backlog")
assert response.status_code == 200
data = response.json()
assert data["items"] == []
assert data["total"] == 0
def test_list_with_data(self, mock_db):
count_row = MagicMock()
count_row.__getitem__ = lambda self, i: 2
item1 = make_item_row()
item2 = make_item_row({"id": "aaaaaaaa-0002-0002-0002-000000000002", "title": "Second Item"})
execute_result = MagicMock()
execute_result.fetchone.return_value = count_row
execute_result.fetchall.return_value = [item1, item2]
mock_db.execute.return_value = execute_result
response = client.get("/security-backlog")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 2
assert data["total"] == 2
assert data["items"][0]["title"] == "Test Item"
def test_filter_by_status(self, mock_db):
count_row = MagicMock()
count_row.__getitem__ = lambda self, i: 1
execute_result = MagicMock()
execute_result.fetchone.return_value = count_row
execute_result.fetchall.return_value = [make_item_row({"status": "resolved"})]
mock_db.execute.return_value = execute_result
response = client.get("/security-backlog?status=resolved")
assert response.status_code == 200
data = response.json()
assert data["items"][0]["status"] == "resolved"
def test_filter_by_severity(self, mock_db):
count_row = MagicMock()
count_row.__getitem__ = lambda self, i: 1
execute_result = MagicMock()
execute_result.fetchone.return_value = count_row
execute_result.fetchall.return_value = [make_item_row({"severity": "critical"})]
mock_db.execute.return_value = execute_result
response = client.get("/security-backlog?severity=critical")
assert response.status_code == 200
data = response.json()
assert data["items"][0]["severity"] == "critical"
def test_filter_by_type(self, mock_db):
count_row = MagicMock()
count_row.__getitem__ = lambda self, i: 1
execute_result = MagicMock()
execute_result.fetchone.return_value = count_row
execute_result.fetchall.return_value = [make_item_row({"type": "misconfiguration"})]
mock_db.execute.return_value = execute_result
response = client.get("/security-backlog?type=misconfiguration")
assert response.status_code == 200
data = response.json()
assert data["items"][0]["type"] == "misconfiguration"
def test_filter_by_search(self, mock_db):
count_row = MagicMock()
count_row.__getitem__ = lambda self, i: 1
execute_result = MagicMock()
execute_result.fetchone.return_value = count_row
execute_result.fetchall.return_value = [make_item_row({"title": "SQL Injection"})]
mock_db.execute.return_value = execute_result
response = client.get("/security-backlog?search=SQL")
assert response.status_code == 200
data = response.json()
assert "SQL" in data["items"][0]["title"]
def test_pagination_limit_offset(self, mock_db):
count_row = MagicMock()
count_row.__getitem__ = lambda self, i: 10
execute_result = MagicMock()
execute_result.fetchone.return_value = count_row
execute_result.fetchall.return_value = [make_item_row()]
mock_db.execute.return_value = execute_result
response = client.get("/security-backlog?limit=1&offset=5")
assert response.status_code == 200
data = response.json()
assert data["total"] == 10
assert len(data["items"]) == 1
def test_default_tenant_used_without_header(self, mock_db):
count_row = MagicMock()
count_row.__getitem__ = lambda self, i: 0
execute_result = MagicMock()
execute_result.fetchone.return_value = count_row
execute_result.fetchall.return_value = []
mock_db.execute.return_value = execute_result
response = client.get("/security-backlog")
assert response.status_code == 200
# Verify execute was called (tenant was resolved internally)
assert mock_db.execute.called
# =============================================================================
# HTTP Tests — GET /security-backlog/stats
# =============================================================================
class TestGetSecurityStats:
def test_stats_all_zeros_empty(self, mock_db):
zero_row = MagicMock()
zero_row._mapping = {
"open": 0, "in_progress": 0, "resolved": 0, "accepted_risk": 0,
"critical": 0, "high": 0, "overdue": 0, "total": 0,
}
mock_db.execute.return_value.fetchone.return_value = zero_row
response = client.get("/security-backlog/stats")
assert response.status_code == 200
data = response.json()
assert data["total"] == 0
assert data["open"] == 0
assert data["critical"] == 0
assert data["overdue"] == 0
def test_stats_with_data(self, mock_db):
stats_row = make_stats_row()
mock_db.execute.return_value.fetchone.return_value = stats_row
response = client.get("/security-backlog/stats")
assert response.status_code == 200
data = response.json()
assert data["total"] == 6
assert data["open"] == 3
assert data["critical"] == 1
assert data["high"] == 2
assert data["overdue"] == 1
assert data["in_progress"] == 1
assert data["resolved"] == 2
assert data["accepted_risk"] == 0
def test_stats_none_values_become_zero(self, mock_db):
none_row = MagicMock()
none_row._mapping = {
"open": None, "in_progress": None, "resolved": None, "accepted_risk": None,
"critical": None, "high": None, "overdue": None, "total": None,
}
mock_db.execute.return_value.fetchone.return_value = none_row
response = client.get("/security-backlog/stats")
assert response.status_code == 200
data = response.json()
for key in ("open", "in_progress", "resolved", "accepted_risk", "critical", "high", "overdue", "total"):
assert data[key] == 0
def test_stats_with_tenant_header(self, mock_db):
stats_row = make_stats_row({"total": 1, "open": 1})
mock_db.execute.return_value.fetchone.return_value = stats_row
response = client.get(
"/security-backlog/stats",
headers={"X-Tenant-Id": OTHER_TENANT},
)
assert response.status_code == 200
assert response.json()["total"] == 1
# =============================================================================
# HTTP Tests — POST /security-backlog
# =============================================================================
class TestCreateSecurityItem:
def test_create_success(self, mock_db):
created_row = make_item_row({"title": "New Vulnerability"})
mock_db.execute.return_value.fetchone.return_value = created_row
response = client.post(
"/security-backlog",
json={"title": "New Vulnerability"},
)
assert response.status_code == 201
data = response.json()
assert data["title"] == "New Vulnerability"
assert data["id"] == ITEM_ID
mock_db.commit.assert_called_once()
def test_create_full_item(self, mock_db):
created_row = make_item_row({
"title": "CVE-2024-9999",
"type": "cve",
"severity": "critical",
"status": "open",
"cve": "CVE-2024-9999",
"cvss": 9.8,
})
mock_db.execute.return_value.fetchone.return_value = created_row
response = client.post("/security-backlog", json={
"title": "CVE-2024-9999",
"type": "cve",
"severity": "critical",
"cve": "CVE-2024-9999",
"cvss": 9.8,
})
assert response.status_code == 201
data = response.json()
assert data["severity"] == "critical"
assert data["cvss"] == 9.8
def test_create_missing_title_fails(self, mock_db):
response = client.post("/security-backlog", json={"severity": "high"})
assert response.status_code == 422
def test_create_with_tenant_header(self, mock_db):
created_row = make_item_row({"tenant_id": OTHER_TENANT})
mock_db.execute.return_value.fetchone.return_value = created_row
response = client.post(
"/security-backlog",
json={"title": "Tenant-specific item"},
headers={"X-Tenant-Id": OTHER_TENANT},
)
assert response.status_code == 201
assert mock_db.commit.called
def test_create_default_type_and_severity(self, mock_db):
created_row = make_item_row()
mock_db.execute.return_value.fetchone.return_value = created_row
response = client.post("/security-backlog", json={"title": "Basic item"})
assert response.status_code == 201
data = response.json()
assert data["type"] == "vulnerability"
assert data["severity"] == "medium"
assert data["status"] == "open"
# =============================================================================
# HTTP Tests — PUT /security-backlog/{id}
# =============================================================================
class TestUpdateSecurityItem:
def test_update_success(self, mock_db):
updated_row = make_item_row({"status": "resolved", "severity": "low"})
mock_db.execute.return_value.fetchone.return_value = updated_row
response = client.put(
f"/security-backlog/{ITEM_ID}",
json={"status": "resolved", "severity": "low"},
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "resolved"
assert data["severity"] == "low"
mock_db.commit.assert_called_once()
def test_update_partial_title_only(self, mock_db):
updated_row = make_item_row({"title": "Updated Title"})
mock_db.execute.return_value.fetchone.return_value = updated_row
response = client.put(
f"/security-backlog/{ITEM_ID}",
json={"title": "Updated Title"},
)
assert response.status_code == 200
assert response.json()["title"] == "Updated Title"
def test_update_not_found_returns_404(self, mock_db):
mock_db.execute.return_value.fetchone.return_value = None
response = client.put(
"/security-backlog/nonexistent-id",
json={"status": "resolved"},
)
assert response.status_code == 404
assert "not found" in response.json()["detail"].lower()
def test_update_no_fields_returns_400(self, mock_db):
response = client.put(f"/security-backlog/{ITEM_ID}", json={})
assert response.status_code == 400
def test_update_with_tenant_header(self, mock_db):
updated_row = make_item_row({"tenant_id": OTHER_TENANT, "status": "in-progress"})
mock_db.execute.return_value.fetchone.return_value = updated_row
response = client.put(
f"/security-backlog/{ITEM_ID}",
json={"status": "in-progress"},
headers={"X-Tenant-Id": OTHER_TENANT},
)
assert response.status_code == 200
assert response.json()["status"] == "in-progress"
# =============================================================================
# HTTP Tests — DELETE /security-backlog/{id}
# =============================================================================
class TestDeleteSecurityItem:
def test_delete_success_204(self, mock_db):
delete_result = MagicMock()
delete_result.rowcount = 1
mock_db.execute.return_value = delete_result
response = client.delete(f"/security-backlog/{ITEM_ID}")
assert response.status_code == 204
mock_db.commit.assert_called_once()
def test_delete_not_found_returns_404(self, mock_db):
delete_result = MagicMock()
delete_result.rowcount = 0
mock_db.execute.return_value = delete_result
response = client.delete("/security-backlog/nonexistent-id")
assert response.status_code == 404
assert "not found" in response.json()["detail"].lower()
def test_delete_with_tenant_header(self, mock_db):
delete_result = MagicMock()
delete_result.rowcount = 1
mock_db.execute.return_value = delete_result
response = client.delete(
f"/security-backlog/{ITEM_ID}",
headers={"X-Tenant-Id": OTHER_TENANT},
)
assert response.status_code == 204
assert mock_db.commit.called
# =============================================================================
# Tenant Isolation Tests
# =============================================================================
class TestTenantIsolation:
def test_different_tenant_sees_different_items(self, mock_db):
"""Tenant A has 3 items, Tenant B has 0 — each sees only their own data."""
call_count = 0
def side_effect(query, params):
nonlocal call_count
result = MagicMock()
tid = params.get("tenant_id", "")
if tid == DEFAULT_TENANT:
count_row = MagicMock()
count_row.__getitem__ = lambda self, i: 3
result.fetchone.return_value = count_row
result.fetchall.return_value = [
make_item_row(),
make_item_row({"id": "aaaaaaaa-0002-0002-0002-000000000002"}),
make_item_row({"id": "aaaaaaaa-0003-0003-0003-000000000003"}),
]
else:
count_row = MagicMock()
count_row.__getitem__ = lambda self, i: 0
result.fetchone.return_value = count_row
result.fetchall.return_value = []
return result
mock_db.execute.side_effect = side_effect
resp_a = client.get(
"/security-backlog",
headers={"X-Tenant-Id": DEFAULT_TENANT},
)
assert resp_a.status_code == 200
assert resp_a.json()["total"] == 3
resp_b = client.get(
"/security-backlog",
headers={"X-Tenant-Id": OTHER_TENANT},
)
assert resp_b.status_code == 200
assert resp_b.json()["total"] == 0
def test_invalid_tenant_header_falls_back_to_default(self, mock_db):
count_row = MagicMock()
count_row.__getitem__ = lambda self, i: 0
execute_result = MagicMock()
execute_result.fetchone.return_value = count_row
execute_result.fetchall.return_value = []
mock_db.execute.return_value = execute_result
response = client.get(
"/security-backlog",
headers={"X-Tenant-Id": "not-a-real-uuid"},
)
assert response.status_code == 200
# Should succeed (falls back to DEFAULT_TENANT_ID)
assert "items" in response.json()
def test_create_uses_tenant_from_header(self, mock_db):
created_row = make_item_row({"tenant_id": OTHER_TENANT})
mock_db.execute.return_value.fetchone.return_value = created_row
response = client.post(
"/security-backlog",
json={"title": "Tenant B item"},
headers={"X-Tenant-Id": OTHER_TENANT},
)
assert response.status_code == 201
assert mock_db.commit.called
def test_delete_tenant_isolation_not_found_for_wrong_tenant(self, mock_db):
"""Deleting an item that belongs to a different tenant returns 404."""
delete_result = MagicMock()
delete_result.rowcount = 0 # No rows deleted (item belongs to other tenant)
mock_db.execute.return_value = delete_result
response = client.delete(
f"/security-backlog/{ITEM_ID}",
headers={"X-Tenant-Id": OTHER_TENANT},
)
assert response.status_code == 404