A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.
This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).
Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
660 lines
21 KiB
Python
660 lines
21 KiB
Python
"""
|
|
Security Test API
|
|
|
|
Provides endpoints for the interactive Security Test Wizard.
|
|
Allows testing of DevSecOps security scanning with educational feedback.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import time
|
|
import os
|
|
from datetime import datetime
|
|
from typing import List, Optional
|
|
|
|
import httpx
|
|
from fastapi import APIRouter, HTTPException
|
|
from pydantic import BaseModel
|
|
|
|
|
|
# ==============================================
|
|
# Data Models
|
|
# ==============================================
|
|
|
|
|
|
class TestResult(BaseModel):
    """Outcome of one individual check inside a test category."""

    # Short title of the check.
    name: str
    # What the check does.
    description: str
    # The outcome the check is looking for.
    expected: str
    # The outcome that was observed.
    actual: str
    # The runner in this module emits "passed", "failed" or "skipped".
    status: str
    # Duration of the check in milliseconds.
    duration_ms: float = 0.0
    # Text of the exception that interrupted the check, when there was one.
    error_message: Optional[str] = None
|
|
|
|
|
|
class ArchitectureContext(BaseModel):
    """Describes where a test category sits in the system architecture."""

    # Architectural layer label; this module uses "service" and "api".
    layer: str
    # Names of the services involved.
    services: List[str]
    # Tools or systems the category relies on.
    dependencies: List[str]
    # Ordered stages the data passes through.
    data_flow: List[str]
|
|
|
|
|
|
class TestCategoryResult(BaseModel):
    """Aggregated outcome of every check that belongs to one category."""

    # Category key, e.g. "sast" or "api-health".
    category: str
    # Title shown for the category.
    display_name: str
    # One-line summary of the category.
    description: str
    # Educational text explaining why the category matters.
    why_important: str
    # Optional architecture information for the category.
    architecture_context: Optional[ArchitectureContext] = None
    # The individual check results.
    tests: List[TestResult]
    # Counters over ``tests``.
    passed: int
    failed: int
    total: int
    # Duration of the whole category in milliseconds.
    duration_ms: float
|
|
|
|
|
|
class FullTestResults(BaseModel):
    """Combined outcome of a run across all test categories."""

    # ISO-8601 timestamp of the run.
    timestamp: str
    # One entry per executed category.
    categories: List[TestCategoryResult]
    # Counters summed over all categories.
    total_passed: int
    total_failed: int
    total_tests: int
    # Duration of the complete run in milliseconds.
    duration_ms: float
|
|
|
|
|
|
# ==============================================
|
|
# Educational Content
|
|
# ==============================================
|
|
|
|
# Educational material per test category, keyed by category id.
# Every entry provides the same four keys: "display_name", "description",
# "why_important" (user-facing German text) and "architecture".
EDUCATION_CONTENT = {
    # --- Static application security testing --------------------------
    "sast": {
        "display_name": "SAST - Static Analysis",
        "description": "Statische Code-Analyse mit Semgrep",
        "why_important": """
SAST (Static Application Security Testing) analysiert den Quellcode
OHNE ihn auszufuehren. Es findet Schwachstellen fruehzeitig.

Semgrep findet:
- SQL Injection Patterns
- XSS Vulnerabilities
- Hardcoded Credentials
- Insecure Crypto Usage
- Path Traversal Risks

Vorteile:
- Schnell (Minuten, nicht Stunden)
- Findet Fehler VOR dem Deployment
- Integriert in CI/CD Pipeline
- Keine laufende Anwendung noetig
""",
        "architecture": ArchitectureContext(
            layer="service",
            services=["backend"],
            dependencies=["semgrep", "Git Repository"],
            data_flow=["Source Code", "Semgrep Scanner", "Findings", "Dashboard"],
        ),
    },
    # --- Software composition analysis --------------------------------
    "sca": {
        "display_name": "SCA - Dependency Scanning",
        "description": "Software Composition Analysis mit Trivy/Grype",
        "why_important": """
SCA (Software Composition Analysis) prueft Abhaengigkeiten
auf bekannte Schwachstellen (CVEs).

Gescannt werden:
- Python packages (requirements.txt)
- Node modules (package.json)
- Go modules (go.mod)
- Container Images

Bekannte Angriffe durch Abhaengigkeiten:
- Log4Shell (CVE-2021-44228) - Log4j
- NPM Supply Chain Attacks
- PyPI Typosquatting

Ohne SCA: Unsichtbare Risiken in Third-Party Code
""",
        "architecture": ArchitectureContext(
            layer="service",
            services=["backend"],
            dependencies=["trivy", "grype", "CVE Database"],
            data_flow=["Dependencies", "Scanner", "CVE Lookup", "Report"],
        ),
    },
    # --- Secret detection ---------------------------------------------
    "secrets": {
        "display_name": "Secret Detection",
        "description": "Erkennung von Secrets im Code mit Gitleaks",
        "why_important": """
Gitleaks findet versehentlich eingecheckte Secrets:

Gesucht wird nach:
- API Keys (AWS, GCP, Azure, etc.)
- Database Credentials
- Private SSH Keys
- OAuth Tokens
- JWT Secrets

Risiken ohne Secret Detection:
- Kompromittierte Cloud-Accounts
- Datenlecks durch DB-Zugriff
- Laterale Bewegung im Netzwerk
- Reputationsschaden

Git History: Einmal gepusht, schwer zu entfernen!
""",
        "architecture": ArchitectureContext(
            layer="service",
            services=["backend"],
            dependencies=["gitleaks", "Git Repository"],
            data_flow=["Git History", "Pattern Matching", "Findings"],
        ),
    },
    # --- Software bill of materials -----------------------------------
    "sbom": {
        "display_name": "SBOM Generation",
        "description": "Software Bill of Materials mit Syft",
        "why_important": """
SBOM (Software Bill of Materials) ist eine vollstaendige
Inventarliste aller Software-Komponenten.

Warum SBOM wichtig ist:
- US Executive Order 14028 (2021)
- EU Cyber Resilience Act
- Supply Chain Transparency

Inhalt eines SBOM:
- Alle Abhaengigkeiten mit Versionen
- Lizenzen (GPL, MIT, Apache, etc.)
- Bekannte Vulnerabilities
- Paketherkunft

Bei Zero-Day: Schnell pruefen wer betroffen ist
""",
        "architecture": ArchitectureContext(
            layer="service",
            services=["backend"],
            dependencies=["syft", "cyclonedx"],
            data_flow=["Container/Code", "Syft Analysis", "CycloneDX SBOM"],
        ),
    },
    # --- Availability of the security API itself ----------------------
    "api-health": {
        "display_name": "Security API Status",
        "description": "Verfuegbarkeit der Security Endpunkte",
        "why_important": """
Die Security API steuert alle Sicherheitsscans:

Endpunkte:
- /api/security/scan - Scan starten
- /api/security/findings - Ergebnisse abrufen
- /api/security/sbom - SBOM generieren
- /api/security/dashboard - Uebersicht

Verfuegbarkeit kritisch fuer:
- CI/CD Pipeline Integration
- Automatisierte Security Gates
- Compliance Reporting
- Incident Response
""",
        "architecture": ArchitectureContext(
            layer="api",
            services=["backend"],
            dependencies=["PostgreSQL", "Scanner Tools"],
            data_flow=["API Request", "Scanner Dispatch", "Result Storage"],
        ),
    },
}
|
|
|
|
|
|
# ==============================================
|
|
# Test Runner
|
|
# ==============================================
|
|
|
|
|
|
class SecurityTestRunner:
    """Execute the checks that back the interactive Security Test Wizard.

    Each public ``test_*`` coroutine builds one ``TestCategoryResult``;
    ``run_all`` executes all of them and aggregates the totals.

    Attributes:
        backend_url: Base URL of the backend whose security endpoints are
            probed. Read from the ``BACKEND_URL`` environment variable,
            falling back to ``http://localhost:8000``.
    """

    def __init__(self):
        self.backend_url = os.getenv("BACKEND_URL", "http://localhost:8000")

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _elapsed_ms(started: float) -> float:
        """Return the milliseconds elapsed since the ``perf_counter`` reading *started*."""
        return (time.perf_counter() - started) * 1000

    def _tool_check(self, command: str, label: str) -> TestResult:
        """Report whether the executable *command* can be found on ``PATH``.

        A missing tool yields status ``"skipped"`` rather than ``"failed"``.
        """
        # Local import, mirroring how this module previously pulled in
        # ``subprocess`` only where it was needed.
        import shutil

        started = time.perf_counter()
        # shutil.which resolves the executable in-process, so the coroutine
        # neither blocks on a child process nor needs a `which` binary.
        installed = shutil.which(command) is not None
        return TestResult(
            name=f"{label} Installation",
            description=f"Prueft ob {label} installiert ist",
            expected=f"{label} verfuegbar",
            actual="Installiert" if installed else "Nicht gefunden",
            status="passed" if installed else "skipped",
            duration_ms=self._elapsed_ms(started),
        )

    @staticmethod
    def _static_check(name: str, description: str, expected: str, actual: str) -> TestResult:
        """Build an informational entry that is always reported as passed.

        NOTE: nothing is actually verified for these entries; they only
        list capabilities in the wizard, hence the zero duration.
        """
        return TestResult(
            name=name,
            description=description,
            expected=expected,
            actual=actual,
            status="passed",
            duration_ms=0.0,
        )

    async def _probe_endpoint(
        self,
        client: httpx.AsyncClient,
        *,
        name: str,
        path: str,
        expected: str,
        accepted_statuses: tuple,
        error_label: str,
    ) -> TestResult:
        """GET ``backend_url + path`` and turn the outcome into a ``TestResult``.

        The check passes when the response status is in *accepted_statuses*.
        Any exception raised by the request is recorded as a failed check
        (``actual`` set to *error_label*) instead of propagating.
        """
        started = time.perf_counter()
        description = f"GET {path}"
        try:
            response = await client.get(f"{self.backend_url}{path}")
        except Exception as exc:  # best effort: report the failure, never raise
            return TestResult(
                name=name,
                description=description,
                expected=expected,
                actual=error_label,
                status="failed",
                duration_ms=self._elapsed_ms(started),
                error_message=str(exc),
            )
        return TestResult(
            name=name,
            description=description,
            expected=expected,
            actual=f"Status {response.status_code}",
            status="passed" if response.status_code in accepted_statuses else "failed",
            duration_ms=self._elapsed_ms(started),
        )

    def _category_result(
        self, category: str, tests: List[TestResult], started: float
    ) -> TestCategoryResult:
        """Wrap *tests* together with the educational content of *category*.

        ``failed`` counts only results whose status is ``"failed"``.
        Skipped checks (e.g. a scanner that is not installed) are included
        in ``total`` but in neither ``passed`` nor ``failed``.
        """
        content = EDUCATION_CONTENT[category]
        passed = sum(1 for t in tests if t.status == "passed")
        failed = sum(1 for t in tests if t.status == "failed")
        return TestCategoryResult(
            category=category,
            display_name=content["display_name"],
            description=content["description"],
            why_important=content["why_important"],
            architecture_context=content["architecture"],
            tests=tests,
            passed=passed,
            failed=failed,
            total=len(tests),
            duration_ms=self._elapsed_ms(started),
        )

    # ------------------------------------------------------------------
    # Test categories
    # ------------------------------------------------------------------

    async def test_api_health(self) -> TestCategoryResult:
        """Test Security API availability.

        Probes the dashboard and findings endpoints one after the other
        with a 10 second client timeout. Besides 200, the statuses 401 and
        403 (and 404 for findings) count as passed -- presumably because
        they still show that the backend answered; confirm with the API
        owners.
        """
        started = time.perf_counter()
        async with httpx.AsyncClient(timeout=10.0) as client:
            tests: List[TestResult] = [
                await self._probe_endpoint(
                    client,
                    name="Security Dashboard",
                    path="/api/security/dashboard",
                    expected="Status 200",
                    accepted_statuses=(200, 401, 403),
                    error_label="Nicht erreichbar",
                ),
                await self._probe_endpoint(
                    client,
                    name="Security Findings",
                    path="/api/security/findings",
                    expected="Findings-Liste",
                    accepted_statuses=(200, 401, 403, 404),
                    error_label="Fehler",
                ),
            ]
        return self._category_result("api-health", tests, started)

    async def test_sast(self) -> TestCategoryResult:
        """Test SAST scanning: Semgrep availability plus the listed rule patterns."""
        started = time.perf_counter()
        tests: List[TestResult] = [self._tool_check("semgrep", "Semgrep")]
        sast_patterns = (
            "SQL Injection Detection",
            "XSS Prevention",
            "Hardcoded Secrets",
            "Insecure Crypto",
        )
        tests.extend(
            self._static_check(
                name=f"Pattern: {pattern}",
                description=f"Semgrep Rule fuer {pattern}",
                expected="Rule konfiguriert",
                actual="Verfuegbar",
            )
            for pattern in sast_patterns
        )
        return self._category_result("sast", tests, started)

    async def test_sca(self) -> TestCategoryResult:
        """Test SCA scanning: Trivy/Grype availability plus the listed manifests."""
        started = time.perf_counter()
        scanners = (("trivy", "Trivy"), ("grype", "Grype"))
        tests: List[TestResult] = [self._tool_check(cmd, label) for cmd, label in scanners]
        dep_files = (
            ("requirements.txt", "Python"),
            ("package.json", "Node.js"),
            ("go.mod", "Go"),
        )
        tests.extend(
            self._static_check(
                name=f"{lang} Dependencies",
                description=f"Abhaengigkeiten in {filename}",
                expected="Scanbar",
                actual="Konfiguriert",
            )
            for filename, lang in dep_files
        )
        return self._category_result("sca", tests, started)

    async def test_secrets(self) -> TestCategoryResult:
        """Test secret detection: Gitleaks availability plus the listed patterns."""
        started = time.perf_counter()
        tests: List[TestResult] = [self._tool_check("gitleaks", "Gitleaks")]
        secret_patterns = (
            "AWS Credentials",
            "Google Cloud Keys",
            "Database Passwords",
            "JWT Secrets",
            "SSH Private Keys",
        )
        tests.extend(
            self._static_check(
                name=f"Erkennung: {pattern}",
                description=f"Pattern fuer {pattern}",
                expected="Pattern aktiv",
                actual="Konfiguriert",
            )
            for pattern in secret_patterns
        )
        return self._category_result("secrets", tests, started)

    async def test_sbom(self) -> TestCategoryResult:
        """Test SBOM generation: Syft availability plus the listed output formats."""
        started = time.perf_counter()
        tests: List[TestResult] = [self._tool_check("syft", "Syft")]
        sbom_formats = ("CycloneDX", "SPDX")
        tests.extend(
            self._static_check(
                name=f"Format: {fmt}",
                description=f"SBOM im {fmt} Format",
                expected=f"{fmt} unterstuetzt",
                actual="Verfuegbar",
            )
            for fmt in sbom_formats
        )
        return self._category_result("sbom", tests, started)

    async def run_all(self) -> FullTestResults:
        """Run all security tests concurrently and aggregate their counters.

        Returns:
            A ``FullTestResults`` whose categories are ordered api-health,
            sast, sca, secrets, sbom, and whose totals are the sums of the
            per-category ``passed``/``failed``/``total`` values.
        """
        started = time.perf_counter()

        categories = await asyncio.gather(
            self.test_api_health(),
            self.test_sast(),
            self.test_sca(),
            self.test_secrets(),
            self.test_sbom(),
        )

        return FullTestResults(
            timestamp=datetime.now().isoformat(),
            categories=list(categories),
            total_passed=sum(c.passed for c in categories),
            total_failed=sum(c.failed for c in categories),
            total_tests=sum(c.total for c in categories),
            duration_ms=self._elapsed_ms(started),
        )
|
|
|
|
|
|
# ==============================================
|
|
# API Router
|
|
# ==============================================
|
|
|
|
# Every endpoint of this module is mounted below the admin prefix.
router = APIRouter(
    prefix="/api/admin/security-tests",
    tags=["security-tests"],
)

# One runner instance shared by all endpoints; it resolves BACKEND_URL
# once, when this module is imported.
test_runner = SecurityTestRunner()
|
|
|
|
|
|
@router.post("/api-health", response_model=TestCategoryResult)
async def test_api_health():
    """Run the Security API availability checks and return the category result."""
    category_result = await test_runner.test_api_health()
    return category_result
|
|
|
|
|
|
@router.post("/sast", response_model=TestCategoryResult)
async def test_sast():
    """Run the SAST checks and return the category result."""
    category_result = await test_runner.test_sast()
    return category_result
|
|
|
|
|
|
@router.post("/sca", response_model=TestCategoryResult)
async def test_sca():
    """Run the SCA checks and return the category result."""
    category_result = await test_runner.test_sca()
    return category_result
|
|
|
|
|
|
@router.post("/secrets", response_model=TestCategoryResult)
async def test_secrets():
    """Run the secret-detection checks and return the category result."""
    category_result = await test_runner.test_secrets()
    return category_result
|
|
|
|
|
|
@router.post("/sbom", response_model=TestCategoryResult)
async def test_sbom():
    """Run the SBOM checks and return the category result."""
    category_result = await test_runner.test_sbom()
    return category_result
|
|
|
|
|
|
@router.post("/run-all", response_model=FullTestResults)
async def run_all_tests():
    """Run every test category and return the aggregated results."""
    full_results = await test_runner.run_all()
    return full_results
|
|
|
|
|
|
@router.get("/education/{category}")
async def get_education_content(category: str):
    """Return the educational content stored for *category*.

    Raises:
        HTTPException: 404 when *category* is not a key of
            ``EDUCATION_CONTENT``.
    """
    content = EDUCATION_CONTENT.get(category)
    if content is None:
        raise HTTPException(status_code=404, detail=f"Category '{category}' not found")
    return content
|
|
|
|
|
|
@router.get("/categories")
async def list_categories():
    """List id, display name and description of every known test category."""
    summaries = []
    for category_id, content in EDUCATION_CONTENT.items():
        summaries.append(
            {
                "id": category_id,
                "display_name": content["display_name"],
                "description": content["description"],
            }
        )
    return summaries
|