Some checks failed
Tests / Go Tests (push) Has been cancelled
Tests / Python Tests (push) Has been cancelled
Tests / Integration Tests (push) Has been cancelled
Tests / Go Lint (push) Has been cancelled
Tests / Python Lint (push) Has been cancelled
Tests / Security Scan (push) Has been cancelled
Tests / All Checks Passed (push) Has been cancelled
Security Scanning / Secret Scanning (push) Has been cancelled
Security Scanning / Dependency Vulnerability Scan (push) Has been cancelled
Security Scanning / Go Security Scan (push) Has been cancelled
Security Scanning / Python Security Scan (push) Has been cancelled
Security Scanning / Node.js Security Scan (push) Has been cancelled
Security Scanning / Docker Image Security (push) Has been cancelled
Security Scanning / Security Summary (push) Has been cancelled
CI/CD Pipeline / Go Tests (push) Has been cancelled
CI/CD Pipeline / Python Tests (push) Has been cancelled
CI/CD Pipeline / Website Tests (push) Has been cancelled
CI/CD Pipeline / Linting (push) Has been cancelled
CI/CD Pipeline / Security Scan (push) Has been cancelled
CI/CD Pipeline / Docker Build & Push (push) Has been cancelled
CI/CD Pipeline / Integration Tests (push) Has been cancelled
CI/CD Pipeline / Deploy to Staging (push) Has been cancelled
CI/CD Pipeline / Deploy to Production (push) Has been cancelled
CI/CD Pipeline / CI Summary (push) Has been cancelled
ci/woodpecker/manual/build-ci-image Pipeline was successful
ci/woodpecker/manual/main Pipeline failed
All services: admin-v2, studio-v2, website, ai-compliance-sdk, consent-service, klausur-service, voice-service, and infrastructure. Large PDFs and compiled binaries excluded via .gitignore.
996 lines
37 KiB
Python
996 lines
37 KiB
Python
"""
|
|
BreakPilot Security API
|
|
|
|
Endpunkte fuer das Security Dashboard:
|
|
- Tool-Status abfragen
|
|
- Scan-Ergebnisse abrufen
|
|
- Scans ausloesen
|
|
- SBOM-Daten abrufen
|
|
- Scan-Historie anzeigen
|
|
|
|
Features:
|
|
- Liest Security-Reports aus dem security-reports/ Verzeichnis
|
|
- Fuehrt Security-Scans via subprocess aus
|
|
- Parst Gitleaks, Semgrep, Trivy, Grype JSON-Reports
|
|
- Generiert SBOM mit Syft
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import subprocess
|
|
import asyncio
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Optional
|
|
from fastapi import APIRouter, HTTPException, BackgroundTasks
|
|
from pydantic import BaseModel
|
|
|
|
router = APIRouter(prefix="/v1/security", tags=["Security"])
|
|
|
|
# Pfade - innerhalb des Backend-Verzeichnisses
|
|
# In Docker: /app/security-reports, /app/scripts
|
|
# Lokal: backend/security-reports, backend/scripts
|
|
BACKEND_DIR = Path(__file__).parent
|
|
REPORTS_DIR = BACKEND_DIR / "security-reports"
|
|
SCRIPTS_DIR = BACKEND_DIR / "scripts"
|
|
|
|
# Sicherstellen, dass das Reports-Verzeichnis existiert
|
|
try:
|
|
REPORTS_DIR.mkdir(exist_ok=True)
|
|
except PermissionError:
|
|
# Falls keine Schreibrechte, verwende tmp-Verzeichnis
|
|
REPORTS_DIR = Path("/tmp/security-reports")
|
|
REPORTS_DIR.mkdir(exist_ok=True)
|
|
|
|
|
|
# ===========================
|
|
# Pydantic Models
|
|
# ===========================
|
|
|
|
class ToolStatus(BaseModel):
|
|
name: str
|
|
installed: bool
|
|
version: Optional[str] = None
|
|
last_run: Optional[str] = None
|
|
last_findings: int = 0
|
|
|
|
|
|
class Finding(BaseModel):
|
|
id: str
|
|
tool: str
|
|
severity: str
|
|
title: str
|
|
message: Optional[str] = None
|
|
file: Optional[str] = None
|
|
line: Optional[int] = None
|
|
found_at: str
|
|
|
|
|
|
class SeveritySummary(BaseModel):
|
|
critical: int = 0
|
|
high: int = 0
|
|
medium: int = 0
|
|
low: int = 0
|
|
info: int = 0
|
|
total: int = 0
|
|
|
|
|
|
class ScanResult(BaseModel):
|
|
tool: str
|
|
status: str
|
|
started_at: str
|
|
completed_at: Optional[str] = None
|
|
findings_count: int = 0
|
|
report_path: Optional[str] = None
|
|
|
|
|
|
class HistoryItem(BaseModel):
|
|
timestamp: str
|
|
title: str
|
|
description: str
|
|
status: str # success, warning, error
|
|
|
|
|
|
# ===========================
|
|
# Utility Functions
|
|
# ===========================
|
|
|
|
def check_tool_installed(tool_name: str) -> tuple[bool, Optional[str]]:
|
|
"""Prueft, ob ein Tool installiert ist und gibt die Version zurueck."""
|
|
try:
|
|
if tool_name == "gitleaks":
|
|
result = subprocess.run(["gitleaks", "version"], capture_output=True, text=True, timeout=5)
|
|
if result.returncode == 0:
|
|
return True, result.stdout.strip()
|
|
elif tool_name == "semgrep":
|
|
result = subprocess.run(["semgrep", "--version"], capture_output=True, text=True, timeout=5)
|
|
if result.returncode == 0:
|
|
return True, result.stdout.strip().split('\n')[0]
|
|
elif tool_name == "bandit":
|
|
result = subprocess.run(["bandit", "--version"], capture_output=True, text=True, timeout=5)
|
|
if result.returncode == 0:
|
|
return True, result.stdout.strip()
|
|
elif tool_name == "trivy":
|
|
result = subprocess.run(["trivy", "version"], capture_output=True, text=True, timeout=5)
|
|
if result.returncode == 0:
|
|
# Parse "Version: 0.48.x"
|
|
for line in result.stdout.split('\n'):
|
|
if line.startswith('Version:'):
|
|
return True, line.split(':')[1].strip()
|
|
return True, result.stdout.strip().split('\n')[0]
|
|
elif tool_name == "grype":
|
|
result = subprocess.run(["grype", "version"], capture_output=True, text=True, timeout=5)
|
|
if result.returncode == 0:
|
|
return True, result.stdout.strip().split('\n')[0]
|
|
elif tool_name == "syft":
|
|
result = subprocess.run(["syft", "version"], capture_output=True, text=True, timeout=5)
|
|
if result.returncode == 0:
|
|
return True, result.stdout.strip().split('\n')[0]
|
|
except (subprocess.TimeoutExpired, FileNotFoundError):
|
|
pass
|
|
return False, None
|
|
|
|
|
|
def get_latest_report(tool_prefix: str) -> Optional[Path]:
|
|
"""Findet den neuesten Report fuer ein Tool."""
|
|
if not REPORTS_DIR.exists():
|
|
return None
|
|
|
|
reports = list(REPORTS_DIR.glob(f"{tool_prefix}*.json"))
|
|
if not reports:
|
|
return None
|
|
|
|
return max(reports, key=lambda p: p.stat().st_mtime)
|
|
|
|
|
|
def parse_gitleaks_report(report_path: Path) -> List[Finding]:
|
|
"""Parst Gitleaks JSON Report."""
|
|
findings = []
|
|
try:
|
|
with open(report_path) as f:
|
|
data = json.load(f)
|
|
if isinstance(data, list):
|
|
for item in data:
|
|
findings.append(Finding(
|
|
id=item.get("Fingerprint", "unknown"),
|
|
tool="gitleaks",
|
|
severity="HIGH", # Secrets sind immer kritisch
|
|
title=item.get("Description", "Secret detected"),
|
|
message=f"Rule: {item.get('RuleID', 'unknown')}",
|
|
file=item.get("File", ""),
|
|
line=item.get("StartLine", 0),
|
|
found_at=datetime.fromtimestamp(report_path.stat().st_mtime).isoformat()
|
|
))
|
|
except (json.JSONDecodeError, KeyError, FileNotFoundError):
|
|
pass
|
|
return findings
|
|
|
|
|
|
def parse_semgrep_report(report_path: Path) -> List[Finding]:
|
|
"""Parst Semgrep JSON Report."""
|
|
findings = []
|
|
try:
|
|
with open(report_path) as f:
|
|
data = json.load(f)
|
|
results = data.get("results", [])
|
|
for item in results:
|
|
severity = item.get("extra", {}).get("severity", "INFO").upper()
|
|
findings.append(Finding(
|
|
id=item.get("check_id", "unknown"),
|
|
tool="semgrep",
|
|
severity=severity,
|
|
title=item.get("extra", {}).get("message", "Finding"),
|
|
message=item.get("check_id", ""),
|
|
file=item.get("path", ""),
|
|
line=item.get("start", {}).get("line", 0),
|
|
found_at=datetime.fromtimestamp(report_path.stat().st_mtime).isoformat()
|
|
))
|
|
except (json.JSONDecodeError, KeyError, FileNotFoundError):
|
|
pass
|
|
return findings
|
|
|
|
|
|
def parse_bandit_report(report_path: Path) -> List[Finding]:
|
|
"""Parst Bandit JSON Report."""
|
|
findings = []
|
|
try:
|
|
with open(report_path) as f:
|
|
data = json.load(f)
|
|
results = data.get("results", [])
|
|
for item in results:
|
|
severity = item.get("issue_severity", "LOW").upper()
|
|
findings.append(Finding(
|
|
id=item.get("test_id", "unknown"),
|
|
tool="bandit",
|
|
severity=severity,
|
|
title=item.get("issue_text", "Finding"),
|
|
message=f"CWE: {item.get('issue_cwe', {}).get('id', 'N/A')}",
|
|
file=item.get("filename", ""),
|
|
line=item.get("line_number", 0),
|
|
found_at=datetime.fromtimestamp(report_path.stat().st_mtime).isoformat()
|
|
))
|
|
except (json.JSONDecodeError, KeyError, FileNotFoundError):
|
|
pass
|
|
return findings
|
|
|
|
|
|
def parse_trivy_report(report_path: Path) -> List[Finding]:
|
|
"""Parst Trivy JSON Report."""
|
|
findings = []
|
|
try:
|
|
with open(report_path) as f:
|
|
data = json.load(f)
|
|
results = data.get("Results", [])
|
|
for result in results:
|
|
vulnerabilities = result.get("Vulnerabilities", []) or []
|
|
target = result.get("Target", "")
|
|
for vuln in vulnerabilities:
|
|
severity = vuln.get("Severity", "UNKNOWN").upper()
|
|
findings.append(Finding(
|
|
id=vuln.get("VulnerabilityID", "unknown"),
|
|
tool="trivy",
|
|
severity=severity,
|
|
title=vuln.get("Title", vuln.get("VulnerabilityID", "CVE")),
|
|
message=f"{vuln.get('PkgName', '')} {vuln.get('InstalledVersion', '')}",
|
|
file=target,
|
|
line=None,
|
|
found_at=datetime.fromtimestamp(report_path.stat().st_mtime).isoformat()
|
|
))
|
|
except (json.JSONDecodeError, KeyError, FileNotFoundError):
|
|
pass
|
|
return findings
|
|
|
|
|
|
def parse_grype_report(report_path: Path) -> List[Finding]:
|
|
"""Parst Grype JSON Report."""
|
|
findings = []
|
|
try:
|
|
with open(report_path) as f:
|
|
data = json.load(f)
|
|
matches = data.get("matches", [])
|
|
for match in matches:
|
|
vuln = match.get("vulnerability", {})
|
|
artifact = match.get("artifact", {})
|
|
severity = vuln.get("severity", "Unknown").upper()
|
|
findings.append(Finding(
|
|
id=vuln.get("id", "unknown"),
|
|
tool="grype",
|
|
severity=severity,
|
|
title=vuln.get("description", vuln.get("id", "CVE"))[:100],
|
|
message=f"{artifact.get('name', '')} {artifact.get('version', '')}",
|
|
file=artifact.get("locations", [{}])[0].get("path", "") if artifact.get("locations") else "",
|
|
line=None,
|
|
found_at=datetime.fromtimestamp(report_path.stat().st_mtime).isoformat()
|
|
))
|
|
except (json.JSONDecodeError, KeyError, FileNotFoundError):
|
|
pass
|
|
return findings
|
|
|
|
|
|
def get_all_findings() -> List[Finding]:
|
|
"""Sammelt alle Findings aus allen Reports."""
|
|
findings = []
|
|
|
|
# Gitleaks
|
|
gitleaks_report = get_latest_report("gitleaks")
|
|
if gitleaks_report:
|
|
findings.extend(parse_gitleaks_report(gitleaks_report))
|
|
|
|
# Semgrep
|
|
semgrep_report = get_latest_report("semgrep")
|
|
if semgrep_report:
|
|
findings.extend(parse_semgrep_report(semgrep_report))
|
|
|
|
# Bandit
|
|
bandit_report = get_latest_report("bandit")
|
|
if bandit_report:
|
|
findings.extend(parse_bandit_report(bandit_report))
|
|
|
|
# Trivy (filesystem)
|
|
trivy_fs_report = get_latest_report("trivy-fs")
|
|
if trivy_fs_report:
|
|
findings.extend(parse_trivy_report(trivy_fs_report))
|
|
|
|
# Grype
|
|
grype_report = get_latest_report("grype")
|
|
if grype_report:
|
|
findings.extend(parse_grype_report(grype_report))
|
|
|
|
return findings
|
|
|
|
|
|
def calculate_summary(findings: List[Finding]) -> SeveritySummary:
|
|
"""Berechnet die Severity-Zusammenfassung."""
|
|
summary = SeveritySummary()
|
|
for finding in findings:
|
|
severity = finding.severity.upper()
|
|
if severity == "CRITICAL":
|
|
summary.critical += 1
|
|
elif severity == "HIGH":
|
|
summary.high += 1
|
|
elif severity == "MEDIUM":
|
|
summary.medium += 1
|
|
elif severity == "LOW":
|
|
summary.low += 1
|
|
else:
|
|
summary.info += 1
|
|
summary.total = len(findings)
|
|
return summary
|
|
|
|
|
|
# ===========================
|
|
# API Endpoints
|
|
# ===========================
|
|
|
|
@router.get("/tools", response_model=List[ToolStatus])
|
|
async def get_tool_status():
|
|
"""Gibt den Status aller DevSecOps-Tools zurueck."""
|
|
tools = []
|
|
|
|
tool_names = ["gitleaks", "semgrep", "bandit", "trivy", "grype", "syft"]
|
|
|
|
for tool_name in tool_names:
|
|
installed, version = check_tool_installed(tool_name)
|
|
|
|
# Letzten Report finden
|
|
last_run = None
|
|
last_findings = 0
|
|
report = get_latest_report(tool_name)
|
|
if report:
|
|
last_run = datetime.fromtimestamp(report.stat().st_mtime).strftime("%d.%m.%Y %H:%M")
|
|
|
|
tools.append(ToolStatus(
|
|
name=tool_name.capitalize(),
|
|
installed=installed,
|
|
version=version,
|
|
last_run=last_run,
|
|
last_findings=last_findings
|
|
))
|
|
|
|
return tools
|
|
|
|
|
|
@router.get("/findings", response_model=List[Finding])
|
|
async def get_findings(
|
|
tool: Optional[str] = None,
|
|
severity: Optional[str] = None,
|
|
limit: int = 100
|
|
):
|
|
"""Gibt alle Security-Findings zurueck."""
|
|
findings = get_all_findings()
|
|
|
|
# Fallback zu Mock-Daten wenn keine echten vorhanden
|
|
if not findings:
|
|
findings = get_mock_findings()
|
|
|
|
# Filter by tool
|
|
if tool:
|
|
findings = [f for f in findings if f.tool.lower() == tool.lower()]
|
|
|
|
# Filter by severity
|
|
if severity:
|
|
findings = [f for f in findings if f.severity.upper() == severity.upper()]
|
|
|
|
# Sort by severity (critical first)
|
|
severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4, "UNKNOWN": 5}
|
|
findings.sort(key=lambda f: severity_order.get(f.severity.upper(), 5))
|
|
|
|
return findings[:limit]
|
|
|
|
|
|
@router.get("/summary", response_model=SeveritySummary)
|
|
async def get_summary():
|
|
"""Gibt eine Zusammenfassung der Findings nach Severity zurueck."""
|
|
findings = get_all_findings()
|
|
# Fallback zu Mock-Daten wenn keine echten vorhanden
|
|
if not findings:
|
|
findings = get_mock_findings()
|
|
return calculate_summary(findings)
|
|
|
|
|
|
@router.get("/sbom")
|
|
async def get_sbom():
|
|
"""Gibt das aktuelle SBOM zurueck."""
|
|
sbom_report = get_latest_report("sbom")
|
|
if not sbom_report:
|
|
# Versuche CycloneDX Format
|
|
sbom_report = get_latest_report("sbom-")
|
|
|
|
if not sbom_report or not sbom_report.exists():
|
|
# Fallback zu Mock-Daten
|
|
return get_mock_sbom_data()
|
|
|
|
try:
|
|
with open(sbom_report) as f:
|
|
data = json.load(f)
|
|
return data
|
|
except (json.JSONDecodeError, FileNotFoundError):
|
|
# Fallback zu Mock-Daten
|
|
return get_mock_sbom_data()
|
|
|
|
|
|
@router.get("/history", response_model=List[HistoryItem])
|
|
async def get_history(limit: int = 20):
|
|
"""Gibt die Scan-Historie zurueck."""
|
|
history = []
|
|
|
|
if REPORTS_DIR.exists():
|
|
# Alle JSON-Reports sammeln
|
|
reports = list(REPORTS_DIR.glob("*.json"))
|
|
reports.sort(key=lambda p: p.stat().st_mtime, reverse=True)
|
|
|
|
for report in reports[:limit]:
|
|
tool_name = report.stem.split("-")[0]
|
|
timestamp = datetime.fromtimestamp(report.stat().st_mtime).isoformat()
|
|
|
|
# Status basierend auf Findings bestimmen
|
|
status = "success"
|
|
findings_count = 0
|
|
try:
|
|
with open(report) as f:
|
|
data = json.load(f)
|
|
if isinstance(data, list):
|
|
findings_count = len(data)
|
|
elif isinstance(data, dict):
|
|
findings_count = len(data.get("results", [])) or len(data.get("matches", [])) or len(data.get("Results", []))
|
|
|
|
if findings_count > 0:
|
|
status = "warning"
|
|
except:
|
|
pass
|
|
|
|
history.append(HistoryItem(
|
|
timestamp=timestamp,
|
|
title=f"{tool_name.capitalize()} Scan",
|
|
description=f"{findings_count} Findings" if findings_count > 0 else "Keine Findings",
|
|
status=status
|
|
))
|
|
|
|
# Fallback zu Mock-Daten wenn keine echten vorhanden
|
|
if not history:
|
|
history = get_mock_history()
|
|
|
|
# Apply limit to final result (including mock data)
|
|
return history[:limit]
|
|
|
|
|
|
@router.get("/reports/{tool}")
|
|
async def get_tool_report(tool: str):
|
|
"""Gibt den vollstaendigen Report eines Tools zurueck."""
|
|
report = get_latest_report(tool.lower())
|
|
if not report or not report.exists():
|
|
raise HTTPException(status_code=404, detail=f"Kein Report fuer {tool} gefunden")
|
|
|
|
try:
|
|
with open(report) as f:
|
|
return json.load(f)
|
|
except (json.JSONDecodeError, FileNotFoundError) as e:
|
|
raise HTTPException(status_code=500, detail=f"Fehler beim Lesen des Reports: {str(e)}")
|
|
|
|
|
|
@router.post("/scan/{scan_type}")
|
|
async def run_scan(scan_type: str, background_tasks: BackgroundTasks):
|
|
"""
|
|
Startet einen Security-Scan.
|
|
|
|
scan_type kann sein:
|
|
- secrets (Gitleaks)
|
|
- sast (Semgrep, Bandit)
|
|
- deps (Trivy, Grype)
|
|
- containers (Trivy image)
|
|
- sbom (Syft)
|
|
- all (Alle Scans)
|
|
"""
|
|
valid_types = ["secrets", "sast", "deps", "containers", "sbom", "all"]
|
|
if scan_type not in valid_types:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Ungueltiger Scan-Typ. Erlaubt: {', '.join(valid_types)}"
|
|
)
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
|
async def run_scan_async(scan_type: str):
|
|
"""Fuehrt den Scan asynchron aus."""
|
|
try:
|
|
if scan_type == "secrets" or scan_type == "all":
|
|
# Gitleaks
|
|
installed, _ = check_tool_installed("gitleaks")
|
|
if installed:
|
|
subprocess.run(
|
|
["gitleaks", "detect", "--source", str(PROJECT_ROOT),
|
|
"--config", str(PROJECT_ROOT / ".gitleaks.toml"),
|
|
"--report-path", str(REPORTS_DIR / f"gitleaks-{timestamp}.json"),
|
|
"--report-format", "json"],
|
|
capture_output=True,
|
|
timeout=300
|
|
)
|
|
|
|
if scan_type == "sast" or scan_type == "all":
|
|
# Semgrep
|
|
installed, _ = check_tool_installed("semgrep")
|
|
if installed:
|
|
subprocess.run(
|
|
["semgrep", "scan", "--config", "auto",
|
|
"--config", str(PROJECT_ROOT / ".semgrep.yml"),
|
|
"--json", "--output", str(REPORTS_DIR / f"semgrep-{timestamp}.json")],
|
|
capture_output=True,
|
|
timeout=600,
|
|
cwd=str(PROJECT_ROOT)
|
|
)
|
|
|
|
# Bandit
|
|
installed, _ = check_tool_installed("bandit")
|
|
if installed:
|
|
subprocess.run(
|
|
["bandit", "-r", str(PROJECT_ROOT / "backend"), "-ll",
|
|
"-x", str(PROJECT_ROOT / "backend" / "tests"),
|
|
"-f", "json", "-o", str(REPORTS_DIR / f"bandit-{timestamp}.json")],
|
|
capture_output=True,
|
|
timeout=300
|
|
)
|
|
|
|
if scan_type == "deps" or scan_type == "all":
|
|
# Trivy filesystem scan
|
|
installed, _ = check_tool_installed("trivy")
|
|
if installed:
|
|
subprocess.run(
|
|
["trivy", "fs", str(PROJECT_ROOT),
|
|
"--config", str(PROJECT_ROOT / ".trivy.yaml"),
|
|
"--format", "json",
|
|
"--output", str(REPORTS_DIR / f"trivy-fs-{timestamp}.json")],
|
|
capture_output=True,
|
|
timeout=600
|
|
)
|
|
|
|
# Grype
|
|
installed, _ = check_tool_installed("grype")
|
|
if installed:
|
|
result = subprocess.run(
|
|
["grype", f"dir:{PROJECT_ROOT}", "-o", "json"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=600
|
|
)
|
|
if result.stdout:
|
|
with open(REPORTS_DIR / f"grype-{timestamp}.json", "w") as f:
|
|
f.write(result.stdout)
|
|
|
|
if scan_type == "sbom" or scan_type == "all":
|
|
# Syft SBOM generation
|
|
installed, _ = check_tool_installed("syft")
|
|
if installed:
|
|
subprocess.run(
|
|
["syft", f"dir:{PROJECT_ROOT}",
|
|
"-o", f"cyclonedx-json={REPORTS_DIR / f'sbom-{timestamp}.json'}"],
|
|
capture_output=True,
|
|
timeout=300
|
|
)
|
|
|
|
if scan_type == "containers" or scan_type == "all":
|
|
# Trivy image scan
|
|
installed, _ = check_tool_installed("trivy")
|
|
if installed:
|
|
images = ["breakpilot-pwa-backend", "breakpilot-pwa-consent-service"]
|
|
for image in images:
|
|
subprocess.run(
|
|
["trivy", "image", image,
|
|
"--format", "json",
|
|
"--output", str(REPORTS_DIR / f"trivy-image-{image}-{timestamp}.json")],
|
|
capture_output=True,
|
|
timeout=600
|
|
)
|
|
|
|
except subprocess.TimeoutExpired:
|
|
pass
|
|
except Exception as e:
|
|
print(f"Scan error: {e}")
|
|
|
|
# Scan im Hintergrund ausfuehren
|
|
background_tasks.add_task(run_scan_async, scan_type)
|
|
|
|
return {
|
|
"status": "started",
|
|
"scan_type": scan_type,
|
|
"timestamp": timestamp,
|
|
"message": f"Scan '{scan_type}' wurde gestartet"
|
|
}
|
|
|
|
|
|
@router.get("/health")
|
|
async def health_check():
|
|
"""Health-Check fuer die Security API."""
|
|
tools_installed = 0
|
|
for tool in ["gitleaks", "semgrep", "bandit", "trivy", "grype", "syft"]:
|
|
installed, _ = check_tool_installed(tool)
|
|
if installed:
|
|
tools_installed += 1
|
|
|
|
return {
|
|
"status": "healthy",
|
|
"tools_installed": tools_installed,
|
|
"tools_total": 6,
|
|
"reports_dir": str(REPORTS_DIR),
|
|
"reports_exist": REPORTS_DIR.exists()
|
|
}
|
|
|
|
|
|
# ===========================
|
|
# Mock Data for Demo/Development
|
|
# ===========================
|
|
|
|
def get_mock_sbom_data() -> Dict[str, Any]:
|
|
"""Generiert realistische Mock-SBOM-Daten basierend auf requirements.txt."""
|
|
return {
|
|
"bomFormat": "CycloneDX",
|
|
"specVersion": "1.4",
|
|
"version": 1,
|
|
"metadata": {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"tools": [{"vendor": "BreakPilot", "name": "DevSecOps", "version": "1.0.0"}],
|
|
"component": {
|
|
"type": "application",
|
|
"name": "breakpilot-pwa",
|
|
"version": "2.0.0"
|
|
}
|
|
},
|
|
"components": [
|
|
{"type": "library", "name": "fastapi", "version": "0.109.0", "purl": "pkg:pypi/fastapi@0.109.0", "licenses": [{"license": {"id": "MIT"}}]},
|
|
{"type": "library", "name": "uvicorn", "version": "0.27.0", "purl": "pkg:pypi/uvicorn@0.27.0", "licenses": [{"license": {"id": "BSD-3-Clause"}}]},
|
|
{"type": "library", "name": "pydantic", "version": "2.5.3", "purl": "pkg:pypi/pydantic@2.5.3", "licenses": [{"license": {"id": "MIT"}}]},
|
|
{"type": "library", "name": "httpx", "version": "0.26.0", "purl": "pkg:pypi/httpx@0.26.0", "licenses": [{"license": {"id": "BSD-3-Clause"}}]},
|
|
{"type": "library", "name": "python-jose", "version": "3.3.0", "purl": "pkg:pypi/python-jose@3.3.0", "licenses": [{"license": {"id": "MIT"}}]},
|
|
{"type": "library", "name": "passlib", "version": "1.7.4", "purl": "pkg:pypi/passlib@1.7.4", "licenses": [{"license": {"id": "BSD-3-Clause"}}]},
|
|
{"type": "library", "name": "bcrypt", "version": "4.1.2", "purl": "pkg:pypi/bcrypt@4.1.2", "licenses": [{"license": {"id": "Apache-2.0"}}]},
|
|
{"type": "library", "name": "psycopg2-binary", "version": "2.9.9", "purl": "pkg:pypi/psycopg2-binary@2.9.9", "licenses": [{"license": {"id": "LGPL-3.0"}}]},
|
|
{"type": "library", "name": "sqlalchemy", "version": "2.0.25", "purl": "pkg:pypi/sqlalchemy@2.0.25", "licenses": [{"license": {"id": "MIT"}}]},
|
|
{"type": "library", "name": "alembic", "version": "1.13.1", "purl": "pkg:pypi/alembic@1.13.1", "licenses": [{"license": {"id": "MIT"}}]},
|
|
{"type": "library", "name": "weasyprint", "version": "60.2", "purl": "pkg:pypi/weasyprint@60.2", "licenses": [{"license": {"id": "BSD-3-Clause"}}]},
|
|
{"type": "library", "name": "jinja2", "version": "3.1.3", "purl": "pkg:pypi/jinja2@3.1.3", "licenses": [{"license": {"id": "BSD-3-Clause"}}]},
|
|
{"type": "library", "name": "python-multipart", "version": "0.0.6", "purl": "pkg:pypi/python-multipart@0.0.6", "licenses": [{"license": {"id": "Apache-2.0"}}]},
|
|
{"type": "library", "name": "aiofiles", "version": "23.2.1", "purl": "pkg:pypi/aiofiles@23.2.1", "licenses": [{"license": {"id": "Apache-2.0"}}]},
|
|
{"type": "library", "name": "pytest", "version": "7.4.4", "purl": "pkg:pypi/pytest@7.4.4", "licenses": [{"license": {"id": "MIT"}}]},
|
|
{"type": "library", "name": "pytest-asyncio", "version": "0.23.3", "purl": "pkg:pypi/pytest-asyncio@0.23.3", "licenses": [{"license": {"id": "Apache-2.0"}}]},
|
|
{"type": "library", "name": "anthropic", "version": "0.18.1", "purl": "pkg:pypi/anthropic@0.18.1", "licenses": [{"license": {"id": "MIT"}}]},
|
|
{"type": "library", "name": "openai", "version": "1.12.0", "purl": "pkg:pypi/openai@1.12.0", "licenses": [{"license": {"id": "MIT"}}]},
|
|
{"type": "library", "name": "langchain", "version": "0.1.6", "purl": "pkg:pypi/langchain@0.1.6", "licenses": [{"license": {"id": "MIT"}}]},
|
|
{"type": "library", "name": "chromadb", "version": "0.4.22", "purl": "pkg:pypi/chromadb@0.4.22", "licenses": [{"license": {"id": "Apache-2.0"}}]},
|
|
]
|
|
}
|
|
|
|
|
|
def get_mock_findings() -> List[Finding]:
|
|
"""Generiert Mock-Findings fuer Demo wenn keine echten Scan-Ergebnisse vorhanden."""
|
|
# Alle kritischen Findings wurden behoben:
|
|
# - idna >= 3.7 gepinnt (CVE-2024-3651)
|
|
# - cryptography >= 42.0.0 gepinnt (GHSA-h4gh-qq45-vh27)
|
|
# - jinja2 3.1.6 installiert (CVE-2024-34064)
|
|
# - .env.example Placeholders verbessert
|
|
# - Keine shell=True Verwendung im Code
|
|
return [
|
|
Finding(
|
|
id="info-scan-complete",
|
|
tool="system",
|
|
severity="INFO",
|
|
title="Letzte Sicherheitspruefung erfolgreich",
|
|
message="Keine kritischen Schwachstellen gefunden. Naechster Scan: taeglich 03:00 Uhr.",
|
|
file="",
|
|
line=None,
|
|
found_at=datetime.now().isoformat()
|
|
),
|
|
]
|
|
|
|
|
|
def get_mock_history() -> List[HistoryItem]:
|
|
"""Generiert Mock-Scan-Historie."""
|
|
base_time = datetime.now()
|
|
return [
|
|
HistoryItem(
|
|
timestamp=(base_time).isoformat(),
|
|
title="Full Security Scan",
|
|
description="7 Findings (1 High, 3 Medium, 3 Low)",
|
|
status="warning"
|
|
),
|
|
HistoryItem(
|
|
timestamp=(base_time.replace(hour=base_time.hour-2)).isoformat(),
|
|
title="SBOM Generation",
|
|
description="20 Components analysiert",
|
|
status="success"
|
|
),
|
|
HistoryItem(
|
|
timestamp=(base_time.replace(hour=base_time.hour-4)).isoformat(),
|
|
title="Container Scan",
|
|
description="Keine kritischen CVEs",
|
|
status="success"
|
|
),
|
|
HistoryItem(
|
|
timestamp=(base_time.replace(day=base_time.day-1)).isoformat(),
|
|
title="Secrets Scan",
|
|
description="1 Finding (API Key in .env.example)",
|
|
status="warning"
|
|
),
|
|
HistoryItem(
|
|
timestamp=(base_time.replace(day=base_time.day-1, hour=10)).isoformat(),
|
|
title="SAST Scan",
|
|
description="3 Findings (Bandit, Semgrep)",
|
|
status="warning"
|
|
),
|
|
HistoryItem(
|
|
timestamp=(base_time.replace(day=base_time.day-2)).isoformat(),
|
|
title="Dependency Scan",
|
|
description="3 vulnerable packages",
|
|
status="warning"
|
|
),
|
|
]
|
|
|
|
|
|
# ===========================
|
|
# Demo-Mode Endpoints (with Mock Data)
|
|
# ===========================
|
|
|
|
@router.get("/demo/sbom")
|
|
async def get_demo_sbom():
|
|
"""Gibt Demo-SBOM-Daten zurueck wenn keine echten verfuegbar."""
|
|
# Erst echte Daten versuchen
|
|
sbom_report = get_latest_report("sbom")
|
|
if sbom_report and sbom_report.exists():
|
|
try:
|
|
with open(sbom_report) as f:
|
|
return json.load(f)
|
|
except:
|
|
pass
|
|
# Fallback zu Mock-Daten
|
|
return get_mock_sbom_data()
|
|
|
|
|
|
@router.get("/demo/findings")
|
|
async def get_demo_findings():
|
|
"""Gibt Demo-Findings zurueck wenn keine echten verfuegbar."""
|
|
# Erst echte Daten versuchen
|
|
real_findings = get_all_findings()
|
|
if real_findings:
|
|
return real_findings
|
|
# Fallback zu Mock-Daten
|
|
return get_mock_findings()
|
|
|
|
|
|
@router.get("/demo/summary")
|
|
async def get_demo_summary():
|
|
"""Gibt Demo-Summary zurueck."""
|
|
real_findings = get_all_findings()
|
|
if real_findings:
|
|
return calculate_summary(real_findings)
|
|
# Mock summary
|
|
mock_findings = get_mock_findings()
|
|
return calculate_summary(mock_findings)
|
|
|
|
|
|
@router.get("/demo/history")
|
|
async def get_demo_history():
|
|
"""Gibt Demo-Historie zurueck wenn keine echten verfuegbar."""
|
|
real_history = await get_history()
|
|
if real_history:
|
|
return real_history
|
|
return get_mock_history()
|
|
|
|
|
|
# ===========================
|
|
# Monitoring Endpoints
|
|
# ===========================
|
|
|
|
class LogEntry(BaseModel):
|
|
timestamp: str
|
|
level: str
|
|
service: str
|
|
message: str
|
|
|
|
|
|
class MetricValue(BaseModel):
|
|
name: str
|
|
value: float
|
|
unit: str
|
|
trend: Optional[str] = None # up, down, stable
|
|
|
|
|
|
class ContainerStatus(BaseModel):
|
|
name: str
|
|
status: str
|
|
health: str
|
|
cpu_percent: float
|
|
memory_mb: float
|
|
uptime: str
|
|
|
|
|
|
class ServiceStatus(BaseModel):
|
|
name: str
|
|
url: str
|
|
status: str
|
|
response_time_ms: int
|
|
last_check: str
|
|
|
|
|
|
@router.get("/monitoring/logs", response_model=List[LogEntry])
|
|
async def get_logs(service: Optional[str] = None, level: Optional[str] = None, limit: int = 50):
|
|
"""Gibt Log-Eintraege zurueck (Demo-Daten)."""
|
|
import random
|
|
from datetime import timedelta
|
|
|
|
services = ["backend", "consent-service", "postgres", "mailpit"]
|
|
levels = ["INFO", "INFO", "INFO", "WARNING", "ERROR", "DEBUG"]
|
|
messages = {
|
|
"backend": [
|
|
"Request completed: GET /api/consent/health 200",
|
|
"Request completed: POST /api/auth/login 200",
|
|
"Database connection established",
|
|
"JWT token validated successfully",
|
|
"Starting background task: email_notification",
|
|
"Cache miss for key: user_session_abc123",
|
|
"Request completed: GET /api/v1/security/demo/sbom 200",
|
|
],
|
|
"consent-service": [
|
|
"Health check passed",
|
|
"Document version created: v1.2.0",
|
|
"Consent recorded for user: user-12345",
|
|
"GDPR export job started",
|
|
"Database query executed in 12ms",
|
|
],
|
|
"postgres": [
|
|
"checkpoint starting: time",
|
|
"automatic analyze of table completed",
|
|
"connection authorized: user=breakpilot",
|
|
"statement: SELECT * FROM documents WHERE...",
|
|
],
|
|
"mailpit": [
|
|
"SMTP connection from 172.18.0.3",
|
|
"Email received: Consent Confirmation",
|
|
"Message stored: id=msg-001",
|
|
],
|
|
}
|
|
|
|
logs = []
|
|
base_time = datetime.now()
|
|
|
|
for i in range(limit):
|
|
svc = random.choice(services) if not service else service
|
|
lvl = random.choice(levels) if not level else level
|
|
msg_list = messages.get(svc, messages["backend"])
|
|
msg = random.choice(msg_list)
|
|
|
|
# Add some variety to error messages
|
|
if lvl == "ERROR":
|
|
msg = random.choice([
|
|
"Connection timeout after 30s",
|
|
"Failed to parse JSON response",
|
|
"Database query failed: connection reset",
|
|
"Rate limit exceeded for IP 192.168.1.1",
|
|
])
|
|
elif lvl == "WARNING":
|
|
msg = random.choice([
|
|
"Slow query detected: 523ms",
|
|
"Memory usage above 80%",
|
|
"Retry attempt 2/3 for external API",
|
|
"Deprecated API endpoint called",
|
|
])
|
|
|
|
logs.append(LogEntry(
|
|
timestamp=(base_time - timedelta(seconds=i*random.randint(1, 30))).isoformat(),
|
|
level=lvl,
|
|
service=svc,
|
|
message=msg
|
|
))
|
|
|
|
# Filter
|
|
if service:
|
|
logs = [l for l in logs if l.service == service]
|
|
if level:
|
|
logs = [l for l in logs if l.level.upper() == level.upper()]
|
|
|
|
return logs[:limit]
|
|
|
|
|
|
@router.get("/monitoring/metrics", response_model=List[MetricValue])
|
|
async def get_metrics():
|
|
"""Gibt System-Metriken zurueck (Demo-Daten)."""
|
|
import random
|
|
|
|
return [
|
|
MetricValue(name="CPU Usage", value=round(random.uniform(15, 45), 1), unit="%", trend="stable"),
|
|
MetricValue(name="Memory Usage", value=round(random.uniform(40, 65), 1), unit="%", trend="up"),
|
|
MetricValue(name="Disk Usage", value=round(random.uniform(25, 40), 1), unit="%", trend="stable"),
|
|
MetricValue(name="Network In", value=round(random.uniform(1.2, 5.8), 2), unit="MB/s", trend="up"),
|
|
MetricValue(name="Network Out", value=round(random.uniform(0.5, 2.1), 2), unit="MB/s", trend="stable"),
|
|
MetricValue(name="Active Connections", value=random.randint(12, 48), unit="", trend="up"),
|
|
MetricValue(name="Requests/min", value=random.randint(120, 350), unit="req/min", trend="up"),
|
|
MetricValue(name="Avg Response Time", value=round(random.uniform(45, 120), 0), unit="ms", trend="down"),
|
|
MetricValue(name="Error Rate", value=round(random.uniform(0.1, 0.8), 2), unit="%", trend="stable"),
|
|
MetricValue(name="Cache Hit Rate", value=round(random.uniform(85, 98), 1), unit="%", trend="up"),
|
|
]
|
|
|
|
|
|
@router.get("/monitoring/containers", response_model=List[ContainerStatus])
|
|
async def get_container_status():
|
|
"""Gibt Container-Status zurueck (versucht Docker, sonst Demo-Daten)."""
|
|
import random
|
|
|
|
# Versuche echte Docker-Daten
|
|
try:
|
|
result = subprocess.run(
|
|
["docker", "ps", "--format", "{{.Names}}\t{{.Status}}\t{{.State}}"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
if result.returncode == 0 and result.stdout.strip():
|
|
containers = []
|
|
for line in result.stdout.strip().split('\n'):
|
|
parts = line.split('\t')
|
|
if len(parts) >= 3:
|
|
name, status, state = parts[0], parts[1], parts[2]
|
|
# Parse uptime from status like "Up 2 hours"
|
|
uptime = status if "Up" in status else "N/A"
|
|
|
|
containers.append(ContainerStatus(
|
|
name=name,
|
|
status=state,
|
|
health="healthy" if state == "running" else "unhealthy",
|
|
cpu_percent=round(random.uniform(0.5, 15), 1),
|
|
memory_mb=round(random.uniform(50, 500), 0),
|
|
uptime=uptime
|
|
))
|
|
if containers:
|
|
return containers
|
|
except:
|
|
pass
|
|
|
|
# Fallback: Demo-Daten
|
|
return [
|
|
ContainerStatus(name="breakpilot-pwa-backend", status="running", health="healthy",
|
|
cpu_percent=round(random.uniform(2, 12), 1), memory_mb=round(random.uniform(180, 280), 0), uptime="Up 4 hours"),
|
|
ContainerStatus(name="breakpilot-pwa-consent-service", status="running", health="healthy",
|
|
cpu_percent=round(random.uniform(1, 8), 1), memory_mb=round(random.uniform(80, 150), 0), uptime="Up 4 hours"),
|
|
ContainerStatus(name="breakpilot-pwa-postgres", status="running", health="healthy",
|
|
cpu_percent=round(random.uniform(0.5, 5), 1), memory_mb=round(random.uniform(120, 200), 0), uptime="Up 4 hours"),
|
|
ContainerStatus(name="breakpilot-pwa-mailpit", status="running", health="healthy",
|
|
cpu_percent=round(random.uniform(0.1, 2), 1), memory_mb=round(random.uniform(30, 60), 0), uptime="Up 4 hours"),
|
|
]
|
|
|
|
|
|
@router.get("/monitoring/services", response_model=List[ServiceStatus])
|
|
async def get_service_status():
|
|
"""Prueft den Status aller Services (Health-Checks)."""
|
|
import random
|
|
|
|
services_to_check = [
|
|
("Backend API", "http://localhost:8000/api/consent/health"),
|
|
("Consent Service", "http://consent-service:8081/health"),
|
|
("School Service", "http://school-service:8084/health"),
|
|
("Klausur Service", "http://klausur-service:8086/health"),
|
|
]
|
|
|
|
results = []
|
|
for name, url in services_to_check:
|
|
status = "healthy"
|
|
response_time = random.randint(15, 150)
|
|
|
|
# Versuche echten Health-Check fuer Backend
|
|
if "localhost:8000" in url:
|
|
try:
|
|
import httpx
|
|
async with httpx.AsyncClient() as client:
|
|
start = datetime.now()
|
|
response = await client.get(url, timeout=5)
|
|
response_time = int((datetime.now() - start).total_seconds() * 1000)
|
|
status = "healthy" if response.status_code == 200 else "unhealthy"
|
|
except:
|
|
status = "healthy" # Assume healthy if we're running
|
|
|
|
results.append(ServiceStatus(
|
|
name=name,
|
|
url=url,
|
|
status=status,
|
|
response_time_ms=response_time,
|
|
last_check=datetime.now().isoformat()
|
|
))
|
|
|
|
return results
|