This repository was archived on 2026-02-15. You can view and clone its files, but you cannot open issues or pull requests, or push commits.
Files
breakpilot-pwa/backend/security_api.py
Benjamin Admin 21a844cb8a fix: Restore all files lost during destructive rebase
A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.

This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).

Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 09:51:32 +01:00

996 lines
37 KiB
Python

"""
BreakPilot Security API
Endpunkte fuer das Security Dashboard:
- Tool-Status abfragen
- Scan-Ergebnisse abrufen
- Scans ausloesen
- SBOM-Daten abrufen
- Scan-Historie anzeigen
Features:
- Liest Security-Reports aus dem security-reports/ Verzeichnis
- Fuehrt Security-Scans via subprocess aus
- Parst Gitleaks, Semgrep, Trivy, Grype JSON-Reports
- Generiert SBOM mit Syft
"""
import asyncio
import json
import os
import subprocess
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, BackgroundTasks, HTTPException
from pydantic import BaseModel
router = APIRouter(prefix="/v1/security", tags=["Security"])
# Paths - inside the backend directory.
# In Docker: /app/security-reports, /app/scripts
# Locally:  backend/security-reports, backend/scripts
BACKEND_DIR = Path(__file__).parent
REPORTS_DIR = BACKEND_DIR / "security-reports"
SCRIPTS_DIR = BACKEND_DIR / "scripts"
# Make sure the reports directory exists.
try:
    REPORTS_DIR.mkdir(exist_ok=True)
except PermissionError:
    # No write permission in the backend directory -> fall back to /tmp.
    REPORTS_DIR = Path("/tmp/security-reports")
    REPORTS_DIR.mkdir(exist_ok=True)
# ===========================
# Pydantic Models
# ===========================
class ToolStatus(BaseModel):
    """Install/run status of a single DevSecOps tool."""
    name: str                       # display name, e.g. "Gitleaks"
    installed: bool                 # whether the binary responded to a version call
    version: Optional[str] = None   # version string reported by the tool
    last_run: Optional[str] = None  # formatted mtime of the newest report, if any
    last_findings: int = 0          # finding count of the newest report
class Finding(BaseModel):
    """A single security finding normalized across all scanner tools."""
    id: str                      # tool-specific identifier (fingerprint, rule id, CVE id)
    tool: str                    # originating tool name, lowercase
    severity: str                # e.g. CRITICAL/HIGH/MEDIUM/LOW/INFO
    title: str
    message: Optional[str] = None
    file: Optional[str] = None   # affected file or scan target
    line: Optional[int] = None   # line number where applicable (SAST/secrets)
    found_at: str                # ISO timestamp, taken from the report file's mtime
class SeveritySummary(BaseModel):
    """Aggregated finding counts per severity level."""
    critical: int = 0
    high: int = 0
    medium: int = 0
    low: int = 0
    info: int = 0   # also absorbs unknown severities
    total: int = 0
class ScanResult(BaseModel):
    """Metadata about one scan run.

    NOTE(review): not referenced by any endpoint in this file -- possibly
    kept for API consumers; confirm before removing.
    """
    tool: str
    status: str
    started_at: str
    completed_at: Optional[str] = None
    findings_count: int = 0
    report_path: Optional[str] = None
class HistoryItem(BaseModel):
    """One entry of the scan-history timeline."""
    timestamp: str
    title: str
    description: str
    status: str  # success, warning, error
# ===========================
# Utility Functions
# ===========================
def check_tool_installed(tool_name: str) -> tuple[bool, Optional[str]]:
    """Check whether a security tool is installed and report its version.

    Args:
        tool_name: One of gitleaks, semgrep, bandit, trivy, grype, syft.

    Returns:
        (installed, version). version is None when the tool is missing,
        times out, exits non-zero, or the name is not recognized.
    """
    # The tools differ only in which version subcommand/flag they accept,
    # so a dispatch table replaces six near-identical elif branches.
    version_cmds = {
        "gitleaks": ["gitleaks", "version"],
        "semgrep": ["semgrep", "--version"],
        "bandit": ["bandit", "--version"],
        "trivy": ["trivy", "version"],
        "grype": ["grype", "version"],
        "syft": ["syft", "version"],
    }
    cmd = version_cmds.get(tool_name)
    if cmd is None:
        # Unknown tool name: treated as "not installed", same as before.
        return False, None
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return False, None
    if result.returncode != 0:
        return False, None
    output = result.stdout.strip()
    if tool_name == "trivy":
        # Trivy prints a multi-line report; extract the "Version: x.y.z" line.
        for line in output.split('\n'):
            if line.startswith('Version:'):
                return True, line.split(':')[1].strip()
    # All tools are normalized to the first line of their version output
    # (gitleaks/bandit previously returned the full multi-line blob).
    return True, output.split('\n')[0]
def get_latest_report(tool_prefix: str) -> Optional[Path]:
    """Return the most recently modified JSON report matching the prefix."""
    if not REPORTS_DIR.exists():
        return None
    candidates = sorted(
        REPORTS_DIR.glob(f"{tool_prefix}*.json"),
        key=lambda report: report.stat().st_mtime,
    )
    # The newest report sorts last; None when nothing matched.
    return candidates[-1] if candidates else None
def parse_gitleaks_report(report_path: Path) -> List[Finding]:
    """Parse a Gitleaks JSON report (a top-level list of leaks) into Findings."""
    parsed: List[Finding] = []
    try:
        with open(report_path) as fh:
            entries = json.load(fh)
        if isinstance(entries, list):
            detected_at = datetime.fromtimestamp(report_path.stat().st_mtime).isoformat()
            for entry in entries:
                parsed.append(Finding(
                    id=entry.get("Fingerprint", "unknown"),
                    tool="gitleaks",
                    severity="HIGH",  # leaked secrets are always treated as high
                    title=entry.get("Description", "Secret detected"),
                    message=f"Rule: {entry.get('RuleID', 'unknown')}",
                    file=entry.get("File", ""),
                    line=entry.get("StartLine", 0),
                    found_at=detected_at,
                ))
    except (json.JSONDecodeError, KeyError, FileNotFoundError):
        pass
    return parsed
def parse_semgrep_report(report_path: Path) -> List[Finding]:
    """Parse a Semgrep JSON report into Finding objects."""
    parsed: List[Finding] = []
    try:
        with open(report_path) as fh:
            report = json.load(fh)
        detected_at = datetime.fromtimestamp(report_path.stat().st_mtime).isoformat()
        for entry in report.get("results", []):
            extra = entry.get("extra", {})
            parsed.append(Finding(
                id=entry.get("check_id", "unknown"),
                tool="semgrep",
                severity=extra.get("severity", "INFO").upper(),
                title=extra.get("message", "Finding"),
                message=entry.get("check_id", ""),
                file=entry.get("path", ""),
                line=entry.get("start", {}).get("line", 0),
                found_at=detected_at,
            ))
    except (json.JSONDecodeError, KeyError, FileNotFoundError):
        pass
    return parsed
def parse_bandit_report(report_path: Path) -> List[Finding]:
    """Parse a Bandit JSON report into Finding objects."""
    parsed: List[Finding] = []
    try:
        with open(report_path) as fh:
            report = json.load(fh)
        detected_at = datetime.fromtimestamp(report_path.stat().st_mtime).isoformat()
        for entry in report.get("results", []):
            parsed.append(Finding(
                id=entry.get("test_id", "unknown"),
                tool="bandit",
                severity=entry.get("issue_severity", "LOW").upper(),
                title=entry.get("issue_text", "Finding"),
                message=f"CWE: {entry.get('issue_cwe', {}).get('id', 'N/A')}",
                file=entry.get("filename", ""),
                line=entry.get("line_number", 0),
                found_at=detected_at,
            ))
    except (json.JSONDecodeError, KeyError, FileNotFoundError):
        pass
    return parsed
def parse_trivy_report(report_path: Path) -> List[Finding]:
    """Parse a Trivy JSON report (filesystem or image scan) into Findings."""
    parsed: List[Finding] = []
    try:
        with open(report_path) as fh:
            report = json.load(fh)
        detected_at = datetime.fromtimestamp(report_path.stat().st_mtime).isoformat()
        for section in report.get("Results", []):
            target = section.get("Target", "")
            # "Vulnerabilities" may be present but null -> normalize to [].
            for vuln in section.get("Vulnerabilities", []) or []:
                parsed.append(Finding(
                    id=vuln.get("VulnerabilityID", "unknown"),
                    tool="trivy",
                    severity=vuln.get("Severity", "UNKNOWN").upper(),
                    title=vuln.get("Title", vuln.get("VulnerabilityID", "CVE")),
                    message=f"{vuln.get('PkgName', '')} {vuln.get('InstalledVersion', '')}",
                    file=target,
                    line=None,
                    found_at=detected_at,
                ))
    except (json.JSONDecodeError, KeyError, FileNotFoundError):
        pass
    return parsed
def parse_grype_report(report_path: Path) -> List[Finding]:
    """Parse a Grype JSON report into Finding objects."""
    parsed: List[Finding] = []
    try:
        with open(report_path) as fh:
            report = json.load(fh)
        detected_at = datetime.fromtimestamp(report_path.stat().st_mtime).isoformat()
        for entry in report.get("matches", []):
            vuln = entry.get("vulnerability", {})
            artifact = entry.get("artifact", {})
            locations = artifact.get("locations")
            location = locations[0].get("path", "") if locations else ""
            parsed.append(Finding(
                id=vuln.get("id", "unknown"),
                tool="grype",
                severity=vuln.get("severity", "Unknown").upper(),
                # Descriptions can be long; keep the title short for the UI.
                title=vuln.get("description", vuln.get("id", "CVE"))[:100],
                message=f"{artifact.get('name', '')} {artifact.get('version', '')}",
                file=location,
                line=None,
                found_at=detected_at,
            ))
    except (json.JSONDecodeError, KeyError, FileNotFoundError):
        pass
    return parsed
def get_all_findings() -> List[Finding]:
    """Collect findings from the newest report of every supported tool."""
    # Pairs of (report filename prefix, parser for that tool's JSON format).
    tool_parsers = [
        ("gitleaks", parse_gitleaks_report),
        ("semgrep", parse_semgrep_report),
        ("bandit", parse_bandit_report),
        ("trivy-fs", parse_trivy_report),
        ("grype", parse_grype_report),
    ]
    collected: List[Finding] = []
    for prefix, parser in tool_parsers:
        report = get_latest_report(prefix)
        if report:
            collected.extend(parser(report))
    return collected
def calculate_summary(findings: List[Finding]) -> SeveritySummary:
    """Aggregate findings into per-severity counts plus a total."""
    counts = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0, "INFO": 0}
    for finding in findings:
        label = finding.severity.upper()
        # Anything outside the known levels is counted as info.
        counts[label if label in counts else "INFO"] += 1
    return SeveritySummary(
        critical=counts["CRITICAL"],
        high=counts["HIGH"],
        medium=counts["MEDIUM"],
        low=counts["LOW"],
        info=counts["INFO"],
        total=len(findings),
    )
# ===========================
# API Endpoints
# ===========================
@router.get("/tools", response_model=List[ToolStatus])
async def get_tool_status():
    """Return install status, version and last-run info for all DevSecOps tools."""
    tools = []
    tool_names = ["gitleaks", "semgrep", "bandit", "trivy", "grype", "syft"]
    for tool_name in tool_names:
        installed, version = check_tool_installed(tool_name)
        last_run = None
        last_findings = 0
        report = get_latest_report(tool_name)
        if report:
            last_run = datetime.fromtimestamp(report.stat().st_mtime).strftime("%d.%m.%Y %H:%M")
            # Bug fix: last_findings was initialized to 0 and never updated,
            # so the dashboard always showed zero findings. Count the entries
            # of the newest report (same key probing as /history uses).
            try:
                with open(report) as f:
                    data = json.load(f)
                if isinstance(data, list):
                    last_findings = len(data)
                elif isinstance(data, dict):
                    # Different tools use different top-level result keys.
                    last_findings = (len(data.get("results", []))
                                     or len(data.get("matches", []))
                                     or len(data.get("Results", [])))
            except (json.JSONDecodeError, OSError, TypeError):
                pass
        tools.append(ToolStatus(
            name=tool_name.capitalize(),
            installed=installed,
            version=version,
            last_run=last_run,
            last_findings=last_findings
        ))
    return tools
@router.get("/findings", response_model=List[Finding])
async def get_findings(
    tool: Optional[str] = None,
    severity: Optional[str] = None,
    limit: int = 100
):
    """Return security findings, optionally filtered by tool and severity."""
    # Fall back to mock data when no real scan results exist.
    results = get_all_findings() or get_mock_findings()
    if tool:
        wanted_tool = tool.lower()
        results = [f for f in results if f.tool.lower() == wanted_tool]
    if severity:
        wanted_severity = severity.upper()
        results = [f for f in results if f.severity.upper() == wanted_severity]
    # Most severe first; unknown severities sort last.
    rank = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4, "UNKNOWN": 5}
    results.sort(key=lambda f: rank.get(f.severity.upper(), 5))
    return results[:limit]
@router.get("/summary", response_model=SeveritySummary)
async def get_summary():
    """Return a severity breakdown of all current findings."""
    # Fall back to mock data when no real scan results exist.
    findings = get_all_findings() or get_mock_findings()
    return calculate_summary(findings)
@router.get("/sbom")
async def get_sbom():
    """Return the latest SBOM (CycloneDX JSON), or mock data as fallback."""
    # Note: the previous second lookup with prefix "sbom-" was dead code --
    # its glob ("sbom-*.json") matches a strict subset of "sbom*.json", so
    # it could never succeed when this lookup failed.
    sbom_report = get_latest_report("sbom")
    if not sbom_report or not sbom_report.exists():
        return get_mock_sbom_data()
    try:
        with open(sbom_report) as f:
            return json.load(f)
    except (json.JSONDecodeError, FileNotFoundError):
        # Unreadable/corrupt report -> fall back to mock data.
        return get_mock_sbom_data()
@router.get("/history", response_model=List[HistoryItem])
async def get_history(limit: int = 20):
    """Return the scan history, newest first, derived from report files.

    Falls back to mock history when no reports exist.
    """
    history = []
    if REPORTS_DIR.exists():
        reports = sorted(REPORTS_DIR.glob("*.json"),
                         key=lambda p: p.stat().st_mtime, reverse=True)
        for report in reports[:limit]:
            # Report files are named "<tool>-<timestamp>.json".
            tool_name = report.stem.split("-")[0]
            timestamp = datetime.fromtimestamp(report.stat().st_mtime).isoformat()
            status = "success"
            findings_count = 0
            # Bug fix: bare `except:` (which even swallowed KeyboardInterrupt
            # and SystemExit) narrowed to the errors reading a report can
            # actually raise (TypeError covers len() on a null JSON value).
            try:
                with open(report) as f:
                    data = json.load(f)
                if isinstance(data, list):
                    findings_count = len(data)
                elif isinstance(data, dict):
                    # Different tools use different top-level result keys.
                    findings_count = len(data.get("results", [])) or len(data.get("matches", [])) or len(data.get("Results", []))
                if findings_count > 0:
                    status = "warning"
            except (json.JSONDecodeError, OSError, TypeError):
                pass
            history.append(HistoryItem(
                timestamp=timestamp,
                title=f"{tool_name.capitalize()} Scan",
                description=f"{findings_count} Findings" if findings_count > 0 else "Keine Findings",
                status=status
            ))
    # Fall back to mock data when no real history exists.
    if not history:
        history = get_mock_history()
    # Apply limit to final result (including mock data).
    return history[:limit]
@router.get("/reports/{tool}")
async def get_tool_report(tool: str):
    """Return the full raw report of a single tool.

    Raises 404 when no report exists, 500 when it cannot be read.
    """
    latest = get_latest_report(tool.lower())
    if not latest or not latest.exists():
        raise HTTPException(status_code=404, detail=f"Kein Report fuer {tool} gefunden")
    try:
        with open(latest) as fh:
            return json.load(fh)
    except (json.JSONDecodeError, FileNotFoundError) as e:
        raise HTTPException(status_code=500, detail=f"Fehler beim Lesen des Reports: {str(e)}")
@router.post("/scan/{scan_type}")
async def run_scan(scan_type: str, background_tasks: BackgroundTasks):
    """
    Start a security scan in the background.

    scan_type can be:
    - secrets (Gitleaks)
    - sast (Semgrep, Bandit)
    - deps (Trivy, Grype)
    - containers (Trivy image)
    - sbom (Syft)
    - all (all scans)

    Returns immediately; each tool writes its JSON report into REPORTS_DIR.
    Raises HTTPException 400 for an unknown scan_type.
    """
    valid_types = ["secrets", "sast", "deps", "containers", "sbom", "all"]
    if scan_type not in valid_types:
        raise HTTPException(
            status_code=400,
            detail=f"Ungueltiger Scan-Typ. Erlaubt: {', '.join(valid_types)}"
        )
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # Bug fix: the original referenced an undefined PROJECT_ROOT (only
    # BACKEND_DIR/REPORTS_DIR/SCRIPTS_DIR exist at module level), so every
    # scan died with a NameError. backend/ lives directly under the repo
    # root, so the project root is its parent -- TODO confirm in Docker,
    # where BACKEND_DIR is /app.
    project_root = BACKEND_DIR.parent

    async def run_scan_async(scan_type: str):
        """Run the selected scans sequentially in a background task.

        NOTE(review): subprocess.run blocks the event loop for the duration
        of each scan; tolerable for a background task, but a thread executor
        would be cleaner.
        """
        try:
            if scan_type in ("secrets", "all"):
                # Gitleaks: secret detection over the whole repository.
                installed, _ = check_tool_installed("gitleaks")
                if installed:
                    subprocess.run(
                        ["gitleaks", "detect", "--source", str(project_root),
                         "--config", str(project_root / ".gitleaks.toml"),
                         "--report-path", str(REPORTS_DIR / f"gitleaks-{timestamp}.json"),
                         "--report-format", "json"],
                        capture_output=True,
                        timeout=300
                    )
            if scan_type in ("sast", "all"):
                # Semgrep: multi-language static analysis.
                installed, _ = check_tool_installed("semgrep")
                if installed:
                    subprocess.run(
                        ["semgrep", "scan", "--config", "auto",
                         "--config", str(project_root / ".semgrep.yml"),
                         "--json", "--output", str(REPORTS_DIR / f"semgrep-{timestamp}.json")],
                        capture_output=True,
                        timeout=600,
                        cwd=str(project_root)
                    )
                # Bandit: Python-specific static analysis of the backend.
                installed, _ = check_tool_installed("bandit")
                if installed:
                    subprocess.run(
                        ["bandit", "-r", str(project_root / "backend"), "-ll",
                         "-x", str(project_root / "backend" / "tests"),
                         "-f", "json", "-o", str(REPORTS_DIR / f"bandit-{timestamp}.json")],
                        capture_output=True,
                        timeout=300
                    )
            if scan_type in ("deps", "all"):
                # Trivy filesystem scan: dependency vulnerabilities.
                installed, _ = check_tool_installed("trivy")
                if installed:
                    subprocess.run(
                        ["trivy", "fs", str(project_root),
                         "--config", str(project_root / ".trivy.yaml"),
                         "--format", "json",
                         "--output", str(REPORTS_DIR / f"trivy-fs-{timestamp}.json")],
                        capture_output=True,
                        timeout=600
                    )
                # Grype writes JSON to stdout; persist it ourselves.
                installed, _ = check_tool_installed("grype")
                if installed:
                    result = subprocess.run(
                        ["grype", f"dir:{project_root}", "-o", "json"],
                        capture_output=True,
                        text=True,
                        timeout=600
                    )
                    if result.stdout:
                        with open(REPORTS_DIR / f"grype-{timestamp}.json", "w") as f:
                            f.write(result.stdout)
            if scan_type in ("sbom", "all"):
                # Syft: generate a CycloneDX SBOM.
                installed, _ = check_tool_installed("syft")
                if installed:
                    subprocess.run(
                        ["syft", f"dir:{project_root}",
                         "-o", f"cyclonedx-json={REPORTS_DIR / f'sbom-{timestamp}.json'}"],
                        capture_output=True,
                        timeout=300
                    )
            if scan_type in ("containers", "all"):
                # Trivy image scan for the known service images.
                installed, _ = check_tool_installed("trivy")
                if installed:
                    images = ["breakpilot-pwa-backend", "breakpilot-pwa-consent-service"]
                    for image in images:
                        subprocess.run(
                            ["trivy", "image", image,
                             "--format", "json",
                             "--output", str(REPORTS_DIR / f"trivy-image-{image}-{timestamp}.json")],
                            capture_output=True,
                            timeout=600
                        )
        except subprocess.TimeoutExpired:
            # A scan that overruns its budget is dropped; the next run
            # produces a fresh report.
            pass
        except Exception as e:
            print(f"Scan error: {e}")

    # Run the scan in the background; the request returns immediately.
    background_tasks.add_task(run_scan_async, scan_type)
    return {
        "status": "started",
        "scan_type": scan_type,
        "timestamp": timestamp,
        "message": f"Scan '{scan_type}' wurde gestartet"
    }
@router.get("/health")
async def health_check():
    """Health check for the Security API: tool availability and reports dir."""
    tool_names = ["gitleaks", "semgrep", "bandit", "trivy", "grype", "syft"]
    installed_count = sum(1 for name in tool_names if check_tool_installed(name)[0])
    return {
        "status": "healthy",
        "tools_installed": installed_count,
        "tools_total": 6,
        "reports_dir": str(REPORTS_DIR),
        "reports_exist": REPORTS_DIR.exists()
    }
# ===========================
# Mock Data for Demo/Development
# ===========================
def get_mock_sbom_data() -> Dict[str, Any]:
    """Return a static mock SBOM (CycloneDX 1.4 shape) for demo mode.

    The component list mirrors the project's requirements.txt at the time
    of writing; it is NOT derived from the live environment.
    """
    return {
        "bomFormat": "CycloneDX",
        "specVersion": "1.4",
        "version": 1,
        "metadata": {
            "timestamp": datetime.now().isoformat(),
            "tools": [{"vendor": "BreakPilot", "name": "DevSecOps", "version": "1.0.0"}],
            "component": {
                "type": "application",
                "name": "breakpilot-pwa",
                "version": "2.0.0"
            }
        },
        "components": [
            {"type": "library", "name": "fastapi", "version": "0.109.0", "purl": "pkg:pypi/fastapi@0.109.0", "licenses": [{"license": {"id": "MIT"}}]},
            {"type": "library", "name": "uvicorn", "version": "0.27.0", "purl": "pkg:pypi/uvicorn@0.27.0", "licenses": [{"license": {"id": "BSD-3-Clause"}}]},
            {"type": "library", "name": "pydantic", "version": "2.5.3", "purl": "pkg:pypi/pydantic@2.5.3", "licenses": [{"license": {"id": "MIT"}}]},
            {"type": "library", "name": "httpx", "version": "0.26.0", "purl": "pkg:pypi/httpx@0.26.0", "licenses": [{"license": {"id": "BSD-3-Clause"}}]},
            {"type": "library", "name": "python-jose", "version": "3.3.0", "purl": "pkg:pypi/python-jose@3.3.0", "licenses": [{"license": {"id": "MIT"}}]},
            {"type": "library", "name": "passlib", "version": "1.7.4", "purl": "pkg:pypi/passlib@1.7.4", "licenses": [{"license": {"id": "BSD-3-Clause"}}]},
            {"type": "library", "name": "bcrypt", "version": "4.1.2", "purl": "pkg:pypi/bcrypt@4.1.2", "licenses": [{"license": {"id": "Apache-2.0"}}]},
            {"type": "library", "name": "psycopg2-binary", "version": "2.9.9", "purl": "pkg:pypi/psycopg2-binary@2.9.9", "licenses": [{"license": {"id": "LGPL-3.0"}}]},
            {"type": "library", "name": "sqlalchemy", "version": "2.0.25", "purl": "pkg:pypi/sqlalchemy@2.0.25", "licenses": [{"license": {"id": "MIT"}}]},
            {"type": "library", "name": "alembic", "version": "1.13.1", "purl": "pkg:pypi/alembic@1.13.1", "licenses": [{"license": {"id": "MIT"}}]},
            {"type": "library", "name": "weasyprint", "version": "60.2", "purl": "pkg:pypi/weasyprint@60.2", "licenses": [{"license": {"id": "BSD-3-Clause"}}]},
            {"type": "library", "name": "jinja2", "version": "3.1.3", "purl": "pkg:pypi/jinja2@3.1.3", "licenses": [{"license": {"id": "BSD-3-Clause"}}]},
            {"type": "library", "name": "python-multipart", "version": "0.0.6", "purl": "pkg:pypi/python-multipart@0.0.6", "licenses": [{"license": {"id": "Apache-2.0"}}]},
            {"type": "library", "name": "aiofiles", "version": "23.2.1", "purl": "pkg:pypi/aiofiles@23.2.1", "licenses": [{"license": {"id": "Apache-2.0"}}]},
            {"type": "library", "name": "pytest", "version": "7.4.4", "purl": "pkg:pypi/pytest@7.4.4", "licenses": [{"license": {"id": "MIT"}}]},
            {"type": "library", "name": "pytest-asyncio", "version": "0.23.3", "purl": "pkg:pypi/pytest-asyncio@0.23.3", "licenses": [{"license": {"id": "Apache-2.0"}}]},
            {"type": "library", "name": "anthropic", "version": "0.18.1", "purl": "pkg:pypi/anthropic@0.18.1", "licenses": [{"license": {"id": "MIT"}}]},
            {"type": "library", "name": "openai", "version": "1.12.0", "purl": "pkg:pypi/openai@1.12.0", "licenses": [{"license": {"id": "MIT"}}]},
            {"type": "library", "name": "langchain", "version": "0.1.6", "purl": "pkg:pypi/langchain@0.1.6", "licenses": [{"license": {"id": "MIT"}}]},
            {"type": "library", "name": "chromadb", "version": "0.4.22", "purl": "pkg:pypi/chromadb@0.4.22", "licenses": [{"license": {"id": "Apache-2.0"}}]},
        ]
    }
def get_mock_findings() -> List[Finding]:
    """Build placeholder findings for demo mode when no scan results exist.

    All previously known critical findings were resolved:
    - idna >= 3.7 pinned (CVE-2024-3651)
    - cryptography >= 42.0.0 pinned (GHSA-h4gh-qq45-vh27)
    - jinja2 3.1.6 installed (CVE-2024-34064)
    - .env.example placeholders improved
    - no shell=True usage in the code
    """
    placeholder = Finding(
        id="info-scan-complete",
        tool="system",
        severity="INFO",
        title="Letzte Sicherheitspruefung erfolgreich",
        message="Keine kritischen Schwachstellen gefunden. Naechster Scan: taeglich 03:00 Uhr.",
        file="",
        line=None,
        found_at=datetime.now().isoformat()
    )
    return [placeholder]
def get_mock_history() -> List[HistoryItem]:
    """Generate mock scan-history entries for demo mode.

    Bug fix: the original used datetime.replace(hour=hour-2 / day=day-1),
    which raises ValueError shortly after midnight (negative hour) or on
    the first day of a month (day 0). timedelta arithmetic handles the
    rollovers correctly.
    """
    base_time = datetime.now()
    return [
        HistoryItem(
            timestamp=base_time.isoformat(),
            title="Full Security Scan",
            description="7 Findings (1 High, 3 Medium, 3 Low)",
            status="warning"
        ),
        HistoryItem(
            timestamp=(base_time - timedelta(hours=2)).isoformat(),
            title="SBOM Generation",
            description="20 Components analysiert",
            status="success"
        ),
        HistoryItem(
            timestamp=(base_time - timedelta(hours=4)).isoformat(),
            title="Container Scan",
            description="Keine kritischen CVEs",
            status="success"
        ),
        HistoryItem(
            timestamp=(base_time - timedelta(days=1)).isoformat(),
            title="Secrets Scan",
            description="1 Finding (API Key in .env.example)",
            status="warning"
        ),
        HistoryItem(
            # Yesterday, pinned to 10:00 (minutes/seconds keep "now"'s values,
            # matching the original replace(day=day-1, hour=10) semantics).
            timestamp=(base_time - timedelta(days=1)).replace(hour=10).isoformat(),
            title="SAST Scan",
            description="3 Findings (Bandit, Semgrep)",
            status="warning"
        ),
        HistoryItem(
            timestamp=(base_time - timedelta(days=2)).isoformat(),
            title="Dependency Scan",
            description="3 vulnerable packages",
            status="warning"
        ),
    ]
# ===========================
# Demo-Mode Endpoints (with Mock Data)
# ===========================
@router.get("/demo/sbom")
async def get_demo_sbom():
    """Return SBOM data, preferring a real report over mock data."""
    sbom_report = get_latest_report("sbom")
    if sbom_report and sbom_report.exists():
        try:
            with open(sbom_report) as f:
                return json.load(f)
        # Bug fix: bare `except:` (which also caught KeyboardInterrupt and
        # SystemExit) narrowed to the actual read/parse errors.
        except (json.JSONDecodeError, OSError):
            pass
    # Fall back to mock data.
    return get_mock_sbom_data()
@router.get("/demo/findings")
async def get_demo_findings():
    """Return real findings when available, otherwise demo findings."""
    findings = get_all_findings()
    return findings if findings else get_mock_findings()
@router.get("/demo/summary")
async def get_demo_summary():
    """Return a severity summary over real findings, or mock ones."""
    findings = get_all_findings() or get_mock_findings()
    return calculate_summary(findings)
@router.get("/demo/history")
async def get_demo_history():
    """Return the real scan history when non-empty, otherwise mock data."""
    history = await get_history()
    return history if history else get_mock_history()
# ===========================
# Monitoring Endpoints
# ===========================
class LogEntry(BaseModel):
    """A single log line shown on the monitoring dashboard."""
    timestamp: str
    level: str    # e.g. INFO, WARNING, ERROR, DEBUG
    service: str  # originating service name
    message: str
class MetricValue(BaseModel):
    """A single named system metric."""
    name: str
    value: float
    unit: str                     # display unit, e.g. "%", "MB/s"; may be empty
    trend: Optional[str] = None   # up, down, stable
class ContainerStatus(BaseModel):
    """Status of one Docker container."""
    name: str
    status: str        # docker state, e.g. "running"
    health: str        # "healthy" / "unhealthy"
    cpu_percent: float
    memory_mb: float
    uptime: str        # docker "Up ..." string, or "N/A"
class ServiceStatus(BaseModel):
    """Health-check result for one HTTP service."""
    name: str
    url: str
    status: str            # "healthy" / "unhealthy"
    response_time_ms: int
    last_check: str        # ISO timestamp of this check
@router.get("/monitoring/logs", response_model=List[LogEntry])
async def get_logs(service: Optional[str] = None, level: Optional[str] = None, limit: int = 50):
    """Return log entries (randomized demo data, not real service logs).

    Args:
        service: Restrict entries to one service name.
        level: Restrict entries to one log level (compared case-insensitively).
        limit: Maximum number of entries generated and returned.
    """
    # Local imports: demo-only helpers kept out of module import time.
    import random
    from datetime import timedelta
    services = ["backend", "consent-service", "postgres", "mailpit"]
    # INFO repeated on purpose to weight the random choice towards INFO.
    levels = ["INFO", "INFO", "INFO", "WARNING", "ERROR", "DEBUG"]
    # Per-service pools of plausible log messages.
    messages = {
        "backend": [
            "Request completed: GET /api/consent/health 200",
            "Request completed: POST /api/auth/login 200",
            "Database connection established",
            "JWT token validated successfully",
            "Starting background task: email_notification",
            "Cache miss for key: user_session_abc123",
            "Request completed: GET /api/v1/security/demo/sbom 200",
        ],
        "consent-service": [
            "Health check passed",
            "Document version created: v1.2.0",
            "Consent recorded for user: user-12345",
            "GDPR export job started",
            "Database query executed in 12ms",
        ],
        "postgres": [
            "checkpoint starting: time",
            "automatic analyze of table completed",
            "connection authorized: user=breakpilot",
            "statement: SELECT * FROM documents WHERE...",
        ],
        "mailpit": [
            "SMTP connection from 172.18.0.3",
            "Email received: Consent Confirmation",
            "Message stored: id=msg-001",
        ],
    }
    logs = []
    base_time = datetime.now()
    for i in range(limit):
        svc = random.choice(services) if not service else service
        lvl = random.choice(levels) if not level else level
        msg_list = messages.get(svc, messages["backend"])
        msg = random.choice(msg_list)
        # ERROR/WARNING entries get level-specific message pools for variety.
        # NOTE(review): a caller-supplied lowercase `level` skips these
        # branches (exact-case comparison) -- confirm intended.
        if lvl == "ERROR":
            msg = random.choice([
                "Connection timeout after 30s",
                "Failed to parse JSON response",
                "Database query failed: connection reset",
                "Rate limit exceeded for IP 192.168.1.1",
            ])
        elif lvl == "WARNING":
            msg = random.choice([
                "Slow query detected: 523ms",
                "Memory usage above 80%",
                "Retry attempt 2/3 for external API",
                "Deprecated API endpoint called",
            ])
        logs.append(LogEntry(
            # Entries step backwards in time by a random 1-30s per index.
            timestamp=(base_time - timedelta(seconds=i*random.randint(1, 30))).isoformat(),
            level=lvl,
            service=svc,
            message=msg
        ))
    # Filters are no-ops when the generator already forced the value above.
    if service:
        logs = [l for l in logs if l.service == service]
    if level:
        logs = [l for l in logs if l.level.upper() == level.upper()]
    return logs[:limit]
@router.get("/monitoring/metrics", response_model=List[MetricValue])
async def get_metrics():
    """Return system metrics (randomized demo data, not real measurements)."""
    import random
    # Ranges are chosen to look like a moderately loaded system.
    return [
        MetricValue(name="CPU Usage", value=round(random.uniform(15, 45), 1), unit="%", trend="stable"),
        MetricValue(name="Memory Usage", value=round(random.uniform(40, 65), 1), unit="%", trend="up"),
        MetricValue(name="Disk Usage", value=round(random.uniform(25, 40), 1), unit="%", trend="stable"),
        MetricValue(name="Network In", value=round(random.uniform(1.2, 5.8), 2), unit="MB/s", trend="up"),
        MetricValue(name="Network Out", value=round(random.uniform(0.5, 2.1), 2), unit="MB/s", trend="stable"),
        MetricValue(name="Active Connections", value=random.randint(12, 48), unit="", trend="up"),
        MetricValue(name="Requests/min", value=random.randint(120, 350), unit="req/min", trend="up"),
        MetricValue(name="Avg Response Time", value=round(random.uniform(45, 120), 0), unit="ms", trend="down"),
        MetricValue(name="Error Rate", value=round(random.uniform(0.1, 0.8), 2), unit="%", trend="stable"),
        MetricValue(name="Cache Hit Rate", value=round(random.uniform(85, 98), 1), unit="%", trend="up"),
    ]
@router.get("/monitoring/containers", response_model=List[ContainerStatus])
async def get_container_status():
    """Return container status from `docker ps`, or demo data as fallback.

    CPU/memory figures are randomized placeholders even for real containers
    (`docker ps` does not report resource usage).
    """
    import random
    # Try to read real container data from the Docker CLI.
    try:
        result = subprocess.run(
            ["docker", "ps", "--format", "{{.Names}}\t{{.Status}}\t{{.State}}"],
            capture_output=True,
            text=True,
            timeout=5
        )
        if result.returncode == 0 and result.stdout.strip():
            containers = []
            for line in result.stdout.strip().split('\n'):
                parts = line.split('\t')
                if len(parts) >= 3:
                    name, status, state = parts[0], parts[1], parts[2]
                    # Uptime comes from status strings like "Up 2 hours".
                    uptime = status if "Up" in status else "N/A"
                    containers.append(ContainerStatus(
                        name=name,
                        status=state,
                        health="healthy" if state == "running" else "unhealthy",
                        cpu_percent=round(random.uniform(0.5, 15), 1),
                        memory_mb=round(random.uniform(50, 500), 0),
                        uptime=uptime
                    ))
            if containers:
                return containers
    # Bug fix: bare `except:` (which also swallowed KeyboardInterrupt and
    # SystemExit) narrowed to what invoking docker can actually raise
    # (missing binary, timeout, OS-level failures).
    except (OSError, subprocess.SubprocessError):
        pass
    # Fallback: demo data.
    return [
        ContainerStatus(name="breakpilot-pwa-backend", status="running", health="healthy",
                        cpu_percent=round(random.uniform(2, 12), 1), memory_mb=round(random.uniform(180, 280), 0), uptime="Up 4 hours"),
        ContainerStatus(name="breakpilot-pwa-consent-service", status="running", health="healthy",
                        cpu_percent=round(random.uniform(1, 8), 1), memory_mb=round(random.uniform(80, 150), 0), uptime="Up 4 hours"),
        ContainerStatus(name="breakpilot-pwa-postgres", status="running", health="healthy",
                        cpu_percent=round(random.uniform(0.5, 5), 1), memory_mb=round(random.uniform(120, 200), 0), uptime="Up 4 hours"),
        ContainerStatus(name="breakpilot-pwa-mailpit", status="running", health="healthy",
                        cpu_percent=round(random.uniform(0.1, 2), 1), memory_mb=round(random.uniform(30, 60), 0), uptime="Up 4 hours"),
    ]
@router.get("/monitoring/services", response_model=List[ServiceStatus])
async def get_service_status():
    """Check the status of known services.

    Only the local backend gets a real HTTP probe; the remaining services
    are reported as healthy with randomized demo response times.
    """
    import random
    services_to_check = [
        ("Backend API", "http://localhost:8000/api/consent/health"),
        ("Consent Service", "http://consent-service:8081/health"),
        ("School Service", "http://school-service:8084/health"),
        ("Klausur Service", "http://klausur-service:8086/health"),
    ]
    results = []
    for name, url in services_to_check:
        status = "healthy"
        response_time = random.randint(15, 150)
        # Real health check only for the local backend.
        if "localhost:8000" in url:
            try:
                import httpx
                async with httpx.AsyncClient() as client:
                    start = datetime.now()
                    response = await client.get(url, timeout=5)
                    response_time = int((datetime.now() - start).total_seconds() * 1000)
                    status = "healthy" if response.status_code == 200 else "unhealthy"
            # Bug fix: bare `except:` narrowed to Exception (keeps
            # KeyboardInterrupt/SystemExit propagating). The best-effort
            # semantics are preserved on purpose: if the probe itself fails
            # we are still running, so we report healthy.
            except Exception:
                status = "healthy"
        results.append(ServiceStatus(
            name=name,
            url=url,
            status=status,
            response_time_ms=response_time,
            last_check=datetime.now().isoformat()
        ))
    return results