"""Tests for Security Backlog routes (security_backlog_routes.py). Covers: - Schema validation (SecurityItemCreate, SecurityItemUpdate) - Helper functions (_row_to_dict, _get_tenant_id) - HTTP endpoints via FastAPI TestClient with mocked DB session """ import pytest from unittest.mock import MagicMock from datetime import datetime from fastapi import FastAPI from fastapi.testclient import TestClient from compliance.api.security_backlog_routes import ( router, SecurityItemCreate, SecurityItemUpdate, _row_to_dict, _get_tenant_id, DEFAULT_TENANT_ID, ) from classroom_engine.database import get_db # ============================================================================= # TestClient Setup # ============================================================================= app = FastAPI() app.include_router(router) DEFAULT_TENANT = "9282a473-5c95-4b3a-bf78-0ecc0ec71d3e" OTHER_TENANT = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" ITEM_ID = "aaaaaaaa-0001-0001-0001-000000000001" def make_item_row(overrides=None): data = { "id": ITEM_ID, "tenant_id": DEFAULT_TENANT, "title": "Test Item", "description": "Test description", "type": "vulnerability", "severity": "medium", "status": "open", "source": None, "cve": None, "cvss": None, "affected_asset": None, "assigned_to": None, "due_date": None, "remediation": None, "created_at": datetime(2024, 1, 1), "updated_at": datetime(2024, 1, 1), } if overrides: data.update(overrides) row = MagicMock() row._mapping = data return row def make_stats_row(overrides=None): data = { "open": 3, "in_progress": 1, "resolved": 2, "accepted_risk": 0, "critical": 1, "high": 2, "overdue": 1, "total": 6, } if overrides: data.update(overrides) row = MagicMock() row._mapping = data return row @pytest.fixture def mock_db(): db = MagicMock() def override_get_db(): yield db app.dependency_overrides[get_db] = override_get_db yield db app.dependency_overrides.clear() @pytest.fixture def test_client(): return TestClient(app) # Module-level client for simple tests that set up their own mocks per test client = TestClient(app) # ============================================================================= # Schema Tests — SecurityItemCreate # ============================================================================= class TestSecurityItemCreate: def test_minimal_valid(self): req = SecurityItemCreate(title="SQL Injection in Login") assert req.title == "SQL Injection in Login" assert req.type == "vulnerability" assert req.severity == "medium" assert req.status == "open" assert req.description is None assert req.source is None assert req.cve is None assert req.cvss is None assert req.affected_asset is None assert req.assigned_to is None assert req.due_date is None assert req.remediation is None def test_full_values(self): due = datetime(2026, 6, 30) req = SecurityItemCreate( title="CVE-2024-1234", description="Remote code execution in parser", type="cve", severity="critical", status="in-progress", source="NVD", cve="CVE-2024-1234", cvss=9.8, affected_asset="api-server", assigned_to="security-team", due_date=due, remediation="Upgrade to v2.1.0", ) assert req.title == "CVE-2024-1234" assert req.type == "cve" assert req.severity == "critical" assert req.cvss == 9.8 assert req.cve == "CVE-2024-1234" assert req.assigned_to == "security-team" def test_serialization_excludes_none(self): req = SecurityItemCreate(title="Patch missing", severity="high") data = req.model_dump(exclude_none=True) assert data["title"] == "Patch missing" assert data["severity"] == "high" assert "description" not in data assert "cve" not in data def test_serialization_includes_defaults(self): req = SecurityItemCreate(title="Test") data = req.model_dump() assert data["type"] == "vulnerability" assert data["severity"] == "medium" assert data["status"] == "open" # ============================================================================= # Schema Tests — SecurityItemUpdate # ============================================================================= class TestSecurityItemUpdate: def test_empty_update(self): req = SecurityItemUpdate() data = req.model_dump(exclude_unset=True) assert data == {} def test_partial_update_status(self): req = SecurityItemUpdate(status="resolved") data = req.model_dump(exclude_unset=True) assert data == {"status": "resolved"} def test_partial_update_severity(self): req = SecurityItemUpdate(severity="critical", assigned_to="john@example.com") data = req.model_dump(exclude_unset=True) assert data["severity"] == "critical" assert data["assigned_to"] == "john@example.com" assert "title" not in data def test_full_update(self): req = SecurityItemUpdate( title="Updated Title", description="New desc", type="misconfiguration", severity="low", status="accepted-risk", remediation="Accept the risk", ) data = req.model_dump(exclude_unset=True) assert len(data) == 6 assert data["type"] == "misconfiguration" assert data["status"] == "accepted-risk" # ============================================================================= # Helper Tests — _row_to_dict # ============================================================================= class TestRowToDict: def test_basic_conversion(self): row = make_item_row() result = _row_to_dict(row) assert result["id"] == ITEM_ID assert result["title"] == "Test Item" assert result["severity"] == "medium" def test_datetime_serialized(self): ts = datetime(2024, 6, 15, 10, 30, 0) row = make_item_row({"created_at": ts, "updated_at": ts}) result = _row_to_dict(row) assert result["created_at"] == ts.isoformat() assert result["updated_at"] == ts.isoformat() def test_none_values_preserved(self): row = make_item_row() result = _row_to_dict(row) assert result["source"] is None assert result["cve"] is None assert result["cvss"] is None assert result["affected_asset"] is None assert result["due_date"] is None def test_uuid_converted_to_string(self): import uuid uid = uuid.UUID(DEFAULT_TENANT) row = MagicMock() row._mapping = {"id": uid, "tenant_id": uid} result = _row_to_dict(row) assert result["id"] == str(uid) assert result["tenant_id"] == str(uid) def test_string_and_numeric_unchanged(self): row = MagicMock() row._mapping = {"title": "Test", "cvss": 7.5, "active": True} result = _row_to_dict(row) assert result["title"] == "Test" assert result["cvss"] == 7.5 assert result["active"] is True # ============================================================================= # Helper Tests — _get_tenant_id # ============================================================================= class TestGetTenantId: def test_valid_uuid_returned(self): result = _get_tenant_id(x_tenant_id=DEFAULT_TENANT) assert result == DEFAULT_TENANT def test_none_returns_default(self): result = _get_tenant_id(x_tenant_id=None) assert result == DEFAULT_TENANT_ID def test_invalid_uuid_returns_default(self): result = _get_tenant_id(x_tenant_id="not-a-uuid") assert result == DEFAULT_TENANT_ID def test_empty_string_returns_default(self): result = _get_tenant_id(x_tenant_id="") assert result == DEFAULT_TENANT_ID def test_different_valid_tenant(self): result = _get_tenant_id(x_tenant_id=OTHER_TENANT) assert result == OTHER_TENANT def test_partial_uuid_returns_default(self): result = _get_tenant_id(x_tenant_id="9282a473-5c95-4b3a") assert result == DEFAULT_TENANT_ID # ============================================================================= # HTTP Tests — GET /security-backlog # ============================================================================= class TestListSecurityItems: def test_empty_list(self, mock_db): count_row = MagicMock() count_row.__getitem__ = lambda self, i: 0 mock_db.execute.return_value.fetchone.return_value = count_row mock_db.execute.return_value.fetchall.return_value = [] response = client.get("/security-backlog") assert response.status_code == 200 data = response.json() assert data["items"] == [] assert data["total"] == 0 def test_list_with_data(self, mock_db): count_row = MagicMock() count_row.__getitem__ = lambda self, i: 2 item1 = make_item_row() item2 = make_item_row({"id": "aaaaaaaa-0002-0002-0002-000000000002", "title": "Second Item"}) execute_result = MagicMock() execute_result.fetchone.return_value = count_row execute_result.fetchall.return_value = [item1, item2] mock_db.execute.return_value = execute_result response = client.get("/security-backlog") assert response.status_code == 200 data = response.json() assert len(data["items"]) == 2 assert data["total"] == 2 assert data["items"][0]["title"] == "Test Item" def test_filter_by_status(self, mock_db): count_row = MagicMock() count_row.__getitem__ = lambda self, i: 1 execute_result = MagicMock() execute_result.fetchone.return_value = count_row execute_result.fetchall.return_value = [make_item_row({"status": "resolved"})] mock_db.execute.return_value = execute_result response = client.get("/security-backlog?status=resolved") assert response.status_code == 200 data = response.json() assert data["items"][0]["status"] == "resolved" def test_filter_by_severity(self, mock_db): count_row = MagicMock() count_row.__getitem__ = lambda self, i: 1 execute_result = MagicMock() execute_result.fetchone.return_value = count_row execute_result.fetchall.return_value = [make_item_row({"severity": "critical"})] mock_db.execute.return_value = execute_result response = client.get("/security-backlog?severity=critical") assert response.status_code == 200 data = response.json() assert data["items"][0]["severity"] == "critical" def test_filter_by_type(self, mock_db): count_row = MagicMock() count_row.__getitem__ = lambda self, i: 1 execute_result = MagicMock() execute_result.fetchone.return_value = count_row execute_result.fetchall.return_value = [make_item_row({"type": "misconfiguration"})] mock_db.execute.return_value = execute_result response = client.get("/security-backlog?type=misconfiguration") assert response.status_code == 200 data = response.json() assert data["items"][0]["type"] == "misconfiguration" def test_filter_by_search(self, mock_db): count_row = MagicMock() count_row.__getitem__ = lambda self, i: 1 execute_result = MagicMock() execute_result.fetchone.return_value = count_row execute_result.fetchall.return_value = [make_item_row({"title": "SQL Injection"})] mock_db.execute.return_value = execute_result response = client.get("/security-backlog?search=SQL") assert response.status_code == 200 data = response.json() assert "SQL" in data["items"][0]["title"] def test_pagination_limit_offset(self, mock_db): count_row = MagicMock() count_row.__getitem__ = lambda self, i: 10 execute_result = MagicMock() execute_result.fetchone.return_value = count_row execute_result.fetchall.return_value = [make_item_row()] mock_db.execute.return_value = execute_result response = client.get("/security-backlog?limit=1&offset=5") assert response.status_code == 200 data = response.json() assert data["total"] == 10 assert len(data["items"]) == 1 def test_default_tenant_used_without_header(self, mock_db): count_row = MagicMock() count_row.__getitem__ = lambda self, i: 0 execute_result = MagicMock() execute_result.fetchone.return_value = count_row execute_result.fetchall.return_value = [] mock_db.execute.return_value = execute_result response = client.get("/security-backlog") assert response.status_code == 200 # Verify execute was called (tenant was resolved internally) assert mock_db.execute.called # ============================================================================= # HTTP Tests — GET /security-backlog/stats # ============================================================================= class TestGetSecurityStats: def test_stats_all_zeros_empty(self, mock_db): zero_row = MagicMock() zero_row._mapping = { "open": 0, "in_progress": 0, "resolved": 0, "accepted_risk": 0, "critical": 0, "high": 0, "overdue": 0, "total": 0, } mock_db.execute.return_value.fetchone.return_value = zero_row response = client.get("/security-backlog/stats") assert response.status_code == 200 data = response.json() assert data["total"] == 0 assert data["open"] == 0 assert data["critical"] == 0 assert data["overdue"] == 0 def test_stats_with_data(self, mock_db): stats_row = make_stats_row() mock_db.execute.return_value.fetchone.return_value = stats_row response = client.get("/security-backlog/stats") assert response.status_code == 200 data = response.json() assert data["total"] == 6 assert data["open"] == 3 assert data["critical"] == 1 assert data["high"] == 2 assert data["overdue"] == 1 assert data["in_progress"] == 1 assert data["resolved"] == 2 assert data["accepted_risk"] == 0 def test_stats_none_values_become_zero(self, mock_db): none_row = MagicMock() none_row._mapping = { "open": None, "in_progress": None, "resolved": None, "accepted_risk": None, "critical": None, "high": None, "overdue": None, "total": None, } mock_db.execute.return_value.fetchone.return_value = none_row response = client.get("/security-backlog/stats") assert response.status_code == 200 data = response.json() for key in ("open", "in_progress", "resolved", "accepted_risk", "critical", "high", "overdue", "total"): assert data[key] == 0 def test_stats_with_tenant_header(self, mock_db): stats_row = make_stats_row({"total": 1, "open": 1}) mock_db.execute.return_value.fetchone.return_value = stats_row response = client.get( "/security-backlog/stats", headers={"X-Tenant-Id": OTHER_TENANT}, ) assert response.status_code == 200 assert response.json()["total"] == 1 # ============================================================================= # HTTP Tests — POST /security-backlog # ============================================================================= class TestCreateSecurityItem: def test_create_success(self, mock_db): created_row = make_item_row({"title": "New Vulnerability"}) mock_db.execute.return_value.fetchone.return_value = created_row response = client.post( "/security-backlog", json={"title": "New Vulnerability"}, ) assert response.status_code == 201 data = response.json() assert data["title"] == "New Vulnerability" assert data["id"] == ITEM_ID mock_db.commit.assert_called_once() def test_create_full_item(self, mock_db): created_row = make_item_row({ "title": "CVE-2024-9999", "type": "cve", "severity": "critical", "status": "open", "cve": "CVE-2024-9999", "cvss": 9.8, }) mock_db.execute.return_value.fetchone.return_value = created_row response = client.post("/security-backlog", json={ "title": "CVE-2024-9999", "type": "cve", "severity": "critical", "cve": "CVE-2024-9999", "cvss": 9.8, }) assert response.status_code == 201 data = response.json() assert data["severity"] == "critical" assert data["cvss"] == 9.8 def test_create_missing_title_fails(self, mock_db): response = client.post("/security-backlog", json={"severity": "high"}) assert response.status_code == 422 def test_create_with_tenant_header(self, mock_db): created_row = make_item_row({"tenant_id": OTHER_TENANT}) mock_db.execute.return_value.fetchone.return_value = created_row response = client.post( "/security-backlog", json={"title": "Tenant-specific item"}, headers={"X-Tenant-Id": OTHER_TENANT}, ) assert response.status_code == 201 assert mock_db.commit.called def test_create_default_type_and_severity(self, mock_db): created_row = make_item_row() mock_db.execute.return_value.fetchone.return_value = created_row response = client.post("/security-backlog", json={"title": "Basic item"}) assert response.status_code == 201 data = response.json() assert data["type"] == "vulnerability" assert data["severity"] == "medium" assert data["status"] == "open" # ============================================================================= # HTTP Tests — PUT /security-backlog/{id} # ============================================================================= class TestUpdateSecurityItem: def test_update_success(self, mock_db): updated_row = make_item_row({"status": "resolved", "severity": "low"}) mock_db.execute.return_value.fetchone.return_value = updated_row response = client.put( f"/security-backlog/{ITEM_ID}", json={"status": "resolved", "severity": "low"}, ) assert response.status_code == 200 data = response.json() assert data["status"] == "resolved" assert data["severity"] == "low" mock_db.commit.assert_called_once() def test_update_partial_title_only(self, mock_db): updated_row = make_item_row({"title": "Updated Title"}) mock_db.execute.return_value.fetchone.return_value = updated_row response = client.put( f"/security-backlog/{ITEM_ID}", json={"title": "Updated Title"}, ) assert response.status_code == 200 assert response.json()["title"] == "Updated Title" def test_update_not_found_returns_404(self, mock_db): mock_db.execute.return_value.fetchone.return_value = None response = client.put( "/security-backlog/nonexistent-id", json={"status": "resolved"}, ) assert response.status_code == 404 assert "not found" in response.json()["detail"].lower() def test_update_no_fields_returns_400(self, mock_db): response = client.put(f"/security-backlog/{ITEM_ID}", json={}) assert response.status_code == 400 def test_update_with_tenant_header(self, mock_db): updated_row = make_item_row({"tenant_id": OTHER_TENANT, "status": "in-progress"}) mock_db.execute.return_value.fetchone.return_value = updated_row response = client.put( f"/security-backlog/{ITEM_ID}", json={"status": "in-progress"}, headers={"X-Tenant-Id": OTHER_TENANT}, ) assert response.status_code == 200 assert response.json()["status"] == "in-progress" # ============================================================================= # HTTP Tests — DELETE /security-backlog/{id} # ============================================================================= class TestDeleteSecurityItem: def test_delete_success_204(self, mock_db): delete_result = MagicMock() delete_result.rowcount = 1 mock_db.execute.return_value = delete_result response = client.delete(f"/security-backlog/{ITEM_ID}") assert response.status_code == 204 mock_db.commit.assert_called_once() def test_delete_not_found_returns_404(self, mock_db): delete_result = MagicMock() delete_result.rowcount = 0 mock_db.execute.return_value = delete_result response = client.delete("/security-backlog/nonexistent-id") assert response.status_code == 404 assert "not found" in response.json()["detail"].lower() def test_delete_with_tenant_header(self, mock_db): delete_result = MagicMock() delete_result.rowcount = 1 mock_db.execute.return_value = delete_result response = client.delete( f"/security-backlog/{ITEM_ID}", headers={"X-Tenant-Id": OTHER_TENANT}, ) assert response.status_code == 204 assert mock_db.commit.called # ============================================================================= # Tenant Isolation Tests # ============================================================================= class TestTenantIsolation: def test_different_tenant_sees_different_items(self, mock_db): """Tenant A has 3 items, Tenant B has 0 — each sees only their own data.""" call_count = 0 def side_effect(query, params): nonlocal call_count result = MagicMock() tid = params.get("tenant_id", "") if tid == DEFAULT_TENANT: count_row = MagicMock() count_row.__getitem__ = lambda self, i: 3 result.fetchone.return_value = count_row result.fetchall.return_value = [ make_item_row(), make_item_row({"id": "aaaaaaaa-0002-0002-0002-000000000002"}), make_item_row({"id": "aaaaaaaa-0003-0003-0003-000000000003"}), ] else: count_row = MagicMock() count_row.__getitem__ = lambda self, i: 0 result.fetchone.return_value = count_row result.fetchall.return_value = [] return result mock_db.execute.side_effect = side_effect resp_a = client.get( "/security-backlog", headers={"X-Tenant-Id": DEFAULT_TENANT}, ) assert resp_a.status_code == 200 assert resp_a.json()["total"] == 3 resp_b = client.get( "/security-backlog", headers={"X-Tenant-Id": OTHER_TENANT}, ) assert resp_b.status_code == 200 assert resp_b.json()["total"] == 0 def test_invalid_tenant_header_falls_back_to_default(self, mock_db): count_row = MagicMock() count_row.__getitem__ = lambda self, i: 0 execute_result = MagicMock() execute_result.fetchone.return_value = count_row execute_result.fetchall.return_value = [] mock_db.execute.return_value = execute_result response = client.get( "/security-backlog", headers={"X-Tenant-Id": "not-a-real-uuid"}, ) assert response.status_code == 200 # Should succeed (falls back to DEFAULT_TENANT_ID) assert "items" in response.json() def test_create_uses_tenant_from_header(self, mock_db): created_row = make_item_row({"tenant_id": OTHER_TENANT}) mock_db.execute.return_value.fetchone.return_value = created_row response = client.post( "/security-backlog", json={"title": "Tenant B item"}, headers={"X-Tenant-Id": OTHER_TENANT}, ) assert response.status_code == 201 assert mock_db.commit.called def test_delete_tenant_isolation_not_found_for_wrong_tenant(self, mock_db): """Deleting an item that belongs to a different tenant returns 404.""" delete_result = MagicMock() delete_result.rowcount = 0 # No rows deleted (item belongs to other tenant) mock_db.execute.return_value = delete_result response = client.delete( f"/security-backlog/{ITEM_ID}", headers={"X-Tenant-Id": OTHER_TENANT}, ) assert response.status_code == 404