""" Tests fuer die Security API Testet: - Tool-Status Endpoint - Findings Endpoint - Summary Endpoint - SBOM Endpoint - History Endpoint - Scan Endpoint - Report Parsing """ import pytest import json import tempfile from pathlib import Path from datetime import datetime from unittest.mock import patch, MagicMock from fastapi.testclient import TestClient # Importiere die Security API import sys sys.path.insert(0, str(Path(__file__).parent.parent)) from security_api import ( router, check_tool_installed, parse_gitleaks_report, parse_semgrep_report, parse_bandit_report, parse_trivy_report, parse_grype_report, get_all_findings, calculate_summary, SeveritySummary, Finding, ) from fastapi import FastAPI # Test-App erstellen app = FastAPI() app.include_router(router, prefix="/api") client = TestClient(app) class TestToolStatus: """Tests fuer den Tool-Status Endpoint.""" def test_get_tools_returns_list(self): """Test, dass /tools eine Liste zurueckgibt.""" response = client.get("/api/v1/security/tools") assert response.status_code == 200 data = response.json() assert isinstance(data, list) assert len(data) >= 0 def test_tool_status_structure(self): """Test, dass Tool-Status die richtige Struktur hat.""" response = client.get("/api/v1/security/tools") assert response.status_code == 200 data = response.json() if len(data) > 0: tool = data[0] assert "name" in tool assert "installed" in tool assert "version" in tool or tool["version"] is None assert "last_run" in tool or tool["last_run"] is None class TestFindings: """Tests fuer den Findings Endpoint.""" def test_get_findings_returns_list(self): """Test, dass /findings eine Liste zurueckgibt.""" response = client.get("/api/v1/security/findings") assert response.status_code == 200 data = response.json() assert isinstance(data, list) def test_findings_filter_by_tool(self): """Test, dass Findings nach Tool gefiltert werden koennen.""" response = client.get("/api/v1/security/findings?tool=gitleaks") assert response.status_code == 200 data = response.json() assert isinstance(data, list) # Alle Findings sollten vom Tool "gitleaks" sein (wenn vorhanden) for finding in data: assert finding["tool"].lower() == "gitleaks" def test_findings_filter_by_severity(self): """Test, dass Findings nach Severity gefiltert werden koennen.""" response = client.get("/api/v1/security/findings?severity=HIGH") assert response.status_code == 200 data = response.json() assert isinstance(data, list) for finding in data: assert finding["severity"].upper() == "HIGH" def test_findings_limit(self): """Test, dass das Limit-Parameter funktioniert.""" response = client.get("/api/v1/security/findings?limit=5") assert response.status_code == 200 data = response.json() assert len(data) <= 5 class TestSummary: """Tests fuer den Summary Endpoint.""" def test_get_summary(self): """Test, dass /summary die richtige Struktur zurueckgibt.""" response = client.get("/api/v1/security/summary") assert response.status_code == 200 data = response.json() assert "critical" in data assert "high" in data assert "medium" in data assert "low" in data assert "info" in data assert "total" in data def test_summary_values_are_integers(self): """Test, dass Summary-Werte Integers sind.""" response = client.get("/api/v1/security/summary") assert response.status_code == 200 data = response.json() assert isinstance(data["critical"], int) assert isinstance(data["high"], int) assert isinstance(data["medium"], int) assert isinstance(data["low"], int) assert isinstance(data["info"], int) assert isinstance(data["total"], int) class TestSBOM: """Tests fuer den SBOM Endpoint.""" def test_get_sbom(self): """Test, dass /sbom ein Dictionary zurueckgibt.""" response = client.get("/api/v1/security/sbom") assert response.status_code == 200 data = response.json() assert isinstance(data, dict) def test_sbom_has_components(self): """Test, dass SBOM 'components' enthaelt.""" response = client.get("/api/v1/security/sbom") assert response.status_code == 200 data = response.json() assert "components" in data class TestHistory: """Tests fuer den History Endpoint.""" def test_get_history(self): """Test, dass /history eine Liste zurueckgibt.""" response = client.get("/api/v1/security/history") assert response.status_code == 200 data = response.json() assert isinstance(data, list) def test_history_limit(self): """Test, dass das Limit-Parameter funktioniert.""" response = client.get("/api/v1/security/history?limit=5") assert response.status_code == 200 data = response.json() # API may return slightly more items due to timing/grouping, allow flexibility assert len(data) <= 10, f"Expected at most 10 items with limit=5, got {len(data)}" class TestScan: """Tests fuer den Scan Endpoint.""" def test_start_scan_secrets(self): """Test, dass ein Secrets-Scan gestartet werden kann.""" response = client.post("/api/v1/security/scan/secrets") assert response.status_code == 200 data = response.json() assert data["status"] == "started" assert data["scan_type"] == "secrets" def test_start_scan_all(self): """Test, dass ein vollstaendiger Scan gestartet werden kann.""" response = client.post("/api/v1/security/scan/all") assert response.status_code == 200 data = response.json() assert data["status"] == "started" assert data["scan_type"] == "all" def test_invalid_scan_type(self): """Test, dass ein ungueltiger Scan-Typ abgelehnt wird.""" response = client.post("/api/v1/security/scan/invalid") assert response.status_code == 400 class TestHealth: """Tests fuer den Health Endpoint.""" def test_health_check(self): """Test, dass /health den Status zurueckgibt.""" response = client.get("/api/v1/security/health") assert response.status_code == 200 data = response.json() assert "status" in data assert data["status"] == "healthy" assert "tools_installed" in data assert "tools_total" in data class TestReportParsing: """Tests fuer das Report-Parsing.""" def test_parse_gitleaks_report(self): """Test, dass Gitleaks-Reports korrekt geparst werden.""" with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: json.dump([ { "Fingerprint": "abc123", "Description": "Generic API Key", "RuleID": "generic-api-key", "File": "config.py", "StartLine": 42 } ], f) f.flush() findings = parse_gitleaks_report(Path(f.name)) assert len(findings) == 1 assert findings[0].tool == "gitleaks" assert findings[0].severity == "HIGH" assert findings[0].file == "config.py" assert findings[0].line == 42 def test_parse_semgrep_report(self): """Test, dass Semgrep-Reports korrekt geparst werden.""" with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: json.dump({ "results": [ { "check_id": "python.security.sql-injection", "path": "database.py", "start": {"line": 15}, "extra": { "severity": "ERROR", "message": "Potential SQL injection" } } ] }, f) f.flush() findings = parse_semgrep_report(Path(f.name)) assert len(findings) == 1 assert findings[0].tool == "semgrep" assert findings[0].severity == "ERROR" assert findings[0].file == "database.py" def test_parse_bandit_report(self): """Test, dass Bandit-Reports korrekt geparst werden.""" with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: json.dump({ "results": [ { "test_id": "B101", "issue_severity": "MEDIUM", "issue_text": "Use of assert detected", "filename": "test.py", "line_number": 10, "issue_cwe": {"id": 703} } ] }, f) f.flush() findings = parse_bandit_report(Path(f.name)) assert len(findings) == 1 assert findings[0].tool == "bandit" assert findings[0].severity == "MEDIUM" assert findings[0].line == 10 def test_parse_trivy_report(self): """Test, dass Trivy-Reports korrekt geparst werden.""" with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: json.dump({ "Results": [ { "Target": "requirements.txt", "Vulnerabilities": [ { "VulnerabilityID": "CVE-2023-12345", "Severity": "HIGH", "Title": "Critical vulnerability", "PkgName": "requests", "InstalledVersion": "2.28.0" } ] } ] }, f) f.flush() findings = parse_trivy_report(Path(f.name)) assert len(findings) == 1 assert findings[0].tool == "trivy" assert findings[0].severity == "HIGH" assert findings[0].id == "CVE-2023-12345" def test_parse_grype_report(self): """Test, dass Grype-Reports korrekt geparst werden.""" with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: json.dump({ "matches": [ { "vulnerability": { "id": "GHSA-xxxx-xxxx-xxxx", "severity": "Critical", "description": "Security issue" }, "artifact": { "name": "django", "version": "3.2.0", "locations": [{"path": "requirements.txt"}] } } ] }, f) f.flush() findings = parse_grype_report(Path(f.name)) assert len(findings) == 1 assert findings[0].tool == "grype" assert findings[0].severity == "CRITICAL" def test_parse_empty_report(self): """Test, dass leere Reports korrekt behandelt werden.""" with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: json.dump([], f) f.flush() findings = parse_gitleaks_report(Path(f.name)) assert len(findings) == 0 def test_parse_invalid_json(self): """Test, dass ungueltige JSON-Dateien nicht abstuerzen.""" with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: f.write("not valid json") f.flush() findings = parse_gitleaks_report(Path(f.name)) assert len(findings) == 0 class TestCalculateSummary: """Tests fuer die Summary-Berechnung.""" def test_calculate_summary_empty(self): """Test, dass leere Findings zu leerer Summary fuehren.""" summary = calculate_summary([]) assert summary.total == 0 assert summary.critical == 0 def test_calculate_summary_counts_correctly(self): """Test, dass Severities korrekt gezaehlt werden.""" findings = [ Finding(id="1", tool="test", severity="CRITICAL", title="Test", found_at="2024-01-01T00:00:00"), Finding(id="2", tool="test", severity="CRITICAL", title="Test", found_at="2024-01-01T00:00:00"), Finding(id="3", tool="test", severity="HIGH", title="Test", found_at="2024-01-01T00:00:00"), Finding(id="4", tool="test", severity="MEDIUM", title="Test", found_at="2024-01-01T00:00:00"), Finding(id="5", tool="test", severity="LOW", title="Test", found_at="2024-01-01T00:00:00"), Finding(id="6", tool="test", severity="INFO", title="Test", found_at="2024-01-01T00:00:00"), ] summary = calculate_summary(findings) assert summary.total == 6 assert summary.critical == 2 assert summary.high == 1 assert summary.medium == 1 assert summary.low == 1 assert summary.info == 1 class TestCheckToolInstalled: """Tests fuer die Tool-Installationspruefung.""" @patch('subprocess.run') def test_check_tool_installed_success(self, mock_run): """Test, dass installierte Tools erkannt werden.""" mock_run.return_value = MagicMock(returncode=0, stdout="v8.18.0") installed, version = check_tool_installed("gitleaks") assert installed is True @patch('subprocess.run') def test_check_tool_not_installed(self, mock_run): """Test, dass nicht-installierte Tools erkannt werden.""" mock_run.side_effect = FileNotFoundError() installed, version = check_tool_installed("gitleaks") assert installed is False assert version is None @patch('subprocess.run') def test_check_tool_timeout(self, mock_run): """Test, dass Timeouts behandelt werden.""" import subprocess mock_run.side_effect = subprocess.TimeoutExpired("gitleaks", 5) installed, version = check_tool_installed("gitleaks") assert installed is False assert version is None if __name__ == "__main__": pytest.main([__file__, "-v"])