Install LOC guardrails (check-loc.sh, architecture.md, pre-commit hook) and split all 44 files exceeding 500 LOC into domain-focused modules: - consent-service (Go): models, handlers, services, database splits - backend-core (Python): security_api, rbac_api, pdf_service, auth splits - admin-core (TypeScript): 5 page.tsx + sidebar extractions - pitch-deck (TypeScript): 6 slides, 3 UI components, engine.ts splits - voice-service (Python): enhanced_task_orchestrator split Result: 0 violations, 36 exempted (pipeline, tests, pure-data files). Go build verified clean. No behavior changes — pure structural splits. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
375 lines
11 KiB
Python
375 lines
11 KiB
Python
"""
|
|
BreakPilot Security API

Endpoints for the security dashboard:
- Query tool status
- Fetch scan results
- Trigger scans
- Fetch SBOM data
- Show the scan history

Features:
- Reads security reports from the security-reports/ directory
- Runs security scans via subprocess
- Parses Gitleaks, Semgrep, Trivy, and Grype JSON reports
- Generates an SBOM with Syft

Split structure:
- security_models.py — Pydantic models
- security_report_parsers.py — Report parsing, tool detection, aggregation
- security_mock_data.py — Mock data generators + /demo/* endpoints
- security_monitoring.py — /monitoring/* endpoints (logs, metrics, containers)
|
|
"""
|
|
|
|
import json
|
|
import subprocess
|
|
from datetime import datetime
|
|
from typing import List, Optional
|
|
from fastapi import APIRouter, HTTPException, BackgroundTasks
|
|
|
|
from security_models import (
|
|
ToolStatus,
|
|
Finding,
|
|
SeveritySummary,
|
|
HistoryItem,
|
|
)
|
|
from security_report_parsers import (
|
|
REPORTS_DIR,
|
|
PROJECT_ROOT,
|
|
check_tool_installed,
|
|
get_latest_report,
|
|
get_all_findings,
|
|
calculate_summary,
|
|
)
|
|
from security_mock_data import (
|
|
get_mock_findings,
|
|
get_mock_sbom_data,
|
|
get_mock_history,
|
|
router as mock_data_router,
|
|
)
|
|
from security_monitoring import router as monitoring_router
|
|
|
|
router = APIRouter(prefix="/v1/security", tags=["Security"])

# Mount the sub-routers under the same prefix and tag as the main router.
for _sub_router in (mock_data_router, monitoring_router):
    router.include_router(_sub_router, prefix="", tags=["Security"])
|
|
|
|
|
|
# ===========================
|
|
# API Endpoints
|
|
# ===========================
|
|
|
|
@router.get("/tools", response_model=List[ToolStatus])
async def get_tool_status():
    """Return the status of every supported DevSecOps tool.

    Each entry carries the capitalized tool name, the installed flag and
    version reported by ``check_tool_installed``, and the modification time
    of the tool's latest report formatted as ``DD.MM.YYYY HH:MM`` (``None``
    when no report is found).
    """

    def _status_for(tool_name: str) -> ToolStatus:
        installed, version = check_tool_installed(tool_name)

        # The latest report's mtime doubles as the "last run" time.
        report = get_latest_report(tool_name)
        if report:
            modified = datetime.fromtimestamp(report.stat().st_mtime)
            last_run = modified.strftime("%d.%m.%Y %H:%M")
        else:
            last_run = None

        # NOTE(review): last_findings is always reported as 0; nothing in
        # this endpoint derives a count from the report. Confirm whether
        # that is intended.
        return ToolStatus(
            name=tool_name.capitalize(),
            installed=installed,
            version=version,
            last_run=last_run,
            last_findings=0,
        )

    supported_tools = ("gitleaks", "semgrep", "bandit", "trivy", "grype", "syft")
    return [_status_for(tool_name) for tool_name in supported_tools]
|
|
|
|
|
|
@router.get("/findings", response_model=List[Finding])
async def get_findings(
    tool: Optional[str] = None,
    severity: Optional[str] = None,
    limit: int = 100
):
    """Return security findings, most severe first.

    Args:
        tool: Optional tool name; compared case-insensitively.
        severity: Optional severity; compared case-insensitively.
        limit: Maximum number of findings returned (applied after sorting).

    Mock findings are used when no real findings are available.
    """
    # Fall back to mock data when there are no real findings.
    findings = get_all_findings() or get_mock_findings()

    if tool:
        wanted_tool = tool.lower()
        findings = [item for item in findings if item.tool.lower() == wanted_tool]

    if severity:
        wanted_severity = severity.upper()
        findings = [item for item in findings if item.severity.upper() == wanted_severity]

    # Lower rank sorts first; unrecognized severities rank like UNKNOWN.
    rank_by_severity = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4, "UNKNOWN": 5}

    def _rank(item) -> int:
        return rank_by_severity.get(item.severity.upper(), 5)

    findings.sort(key=_rank)

    return findings[:limit]
|
|
|
|
|
|
@router.get("/summary", response_model=SeveritySummary)
async def get_summary():
    """Return a per-severity summary of the findings.

    Mock findings are summarized when no real findings are available.
    """
    # Fall back to mock data when there are no real findings.
    findings = get_all_findings() or get_mock_findings()
    return calculate_summary(findings)
|
|
|
|
|
|
@router.get("/sbom")
async def get_sbom():
    """Return the current SBOM, falling back to mock data.

    The latest report is looked up under the ``"sbom"`` prefix and, if that
    yields nothing, under ``"sbom-"`` (the CycloneDX naming). Mock SBOM data
    is returned when no report exists or when the report cannot be read or
    parsed.

    Returns:
        The parsed JSON content of the report, or the mock SBOM data.
    """
    sbom_report = get_latest_report("sbom") or get_latest_report("sbom-")

    if not sbom_report or not sbom_report.exists():
        # Fall back to mock data.
        return get_mock_sbom_data()

    try:
        # JSON is UTF-8 by specification; do not depend on the locale.
        with open(sbom_report, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        # OSError covers missing/unreadable files (FileNotFoundError,
        # PermissionError, ...); ValueError covers json.JSONDecodeError and
        # UnicodeDecodeError. All of them mean "no usable report", so the
        # endpoint serves mock data instead of failing with a 500.
        return get_mock_sbom_data()
|
|
|
|
|
|
@router.get("/history", response_model=List[HistoryItem])
async def get_history(limit: int = 20):
    """Return the scan history, newest report first.

    One entry is built per JSON file in ``REPORTS_DIR``. The tool name is the
    part of the file stem before the first ``-`` and the timestamp is the
    file's modification time in ISO format. An entry is marked ``"warning"``
    when the report contains findings and ``"success"`` otherwise. Mock
    history is used when no entry could be built.

    Args:
        limit: Maximum number of entries returned (also applied to mock data).
    """
    history = []

    if REPORTS_DIR.exists():
        # Collect all JSON reports, most recently modified first.
        newest_first = sorted(
            REPORTS_DIR.glob("*.json"),
            key=lambda path: path.stat().st_mtime,
            reverse=True,
        )

        for report_path in newest_first[:limit]:
            tool_name = report_path.stem.split("-")[0]
            timestamp = datetime.fromtimestamp(report_path.stat().st_mtime).isoformat()

            # Derive the status from the number of findings in the report.
            findings_count = 0
            status = "success"
            try:
                with open(report_path) as handle:
                    payload = json.load(handle)

                if isinstance(payload, list):
                    findings_count = len(payload)
                elif isinstance(payload, dict):
                    # Use the first non-empty collection among the known keys.
                    for key in ("results", "matches", "Results"):
                        findings_count = len(payload.get(key, []))
                        if findings_count:
                            break

                if findings_count > 0:
                    status = "warning"
            except Exception:
                # Best effort: a report that cannot be read or parsed is
                # listed with zero findings.
                pass

            if findings_count > 0:
                description = f"{findings_count} Findings"
            else:
                description = "Keine Findings"

            history.append(HistoryItem(
                timestamp=timestamp,
                title=f"{tool_name.capitalize()} Scan",
                description=description,
                status=status
            ))

    # Fall back to mock data when there is no real history.
    if not history:
        history = get_mock_history()

    # The limit also applies to the mock data.
    return history[:limit]
|
|
|
|
|
|
@router.get("/reports/{tool}")
async def get_tool_report(tool: str):
    """Return the full latest report of a tool.

    Args:
        tool: Tool name; lower-cased before the report lookup.

    Raises:
        HTTPException: 404 when no report is found, 500 when the report
            cannot be read or parsed.
    """
    report = get_latest_report(tool.lower())
    if not (report and report.exists()):
        raise HTTPException(status_code=404, detail=f"Kein Report fuer {tool} gefunden")

    try:
        with open(report) as handle:
            payload = json.load(handle)
    except (json.JSONDecodeError, FileNotFoundError) as e:
        raise HTTPException(status_code=500, detail=f"Fehler beim Lesen des Reports: {str(e)}")

    return payload
|
|
|
|
|
|
@router.post("/scan/{scan_type}")
async def run_scan(scan_type: str, background_tasks: BackgroundTasks):
    """
    Start a security scan in the background.

    scan_type can be:
    - secrets (Gitleaks)
    - sast (Semgrep, Bandit)
    - deps (Trivy, Grype)
    - containers (Trivy image)
    - sbom (Syft)
    - all (all scans)

    Returns:
        A dict with the status ``"started"``, the scan type, the timestamp
        used in the report file names, and a confirmation message.

    Raises:
        HTTPException: 400 when ``scan_type`` is not one of the values above.
    """
    valid_types = ["secrets", "sast", "deps", "containers", "sbom", "all"]
    if scan_type not in valid_types:
        raise HTTPException(
            status_code=400,
            detail=f"Ungueltiger Scan-Typ. Erlaubt: {', '.join(valid_types)}"
        )

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    def run_scan_blocking(st: str) -> None:
        """Run the requested scan(s); each one fails independently."""
        import logging

        logger = logging.getLogger(__name__)

        # Same execution order as before: secrets, sast, deps, sbom, containers.
        scan_steps = (
            ("secrets", _run_secrets_scan),
            ("sast", _run_sast_scan),
            ("deps", _run_deps_scan),
            ("sbom", _run_sbom_scan),
            ("containers", _run_container_scan),
        )
        for step_name, step in scan_steps:
            if st not in (step_name, "all"):
                continue
            try:
                step(timestamp)
            except subprocess.TimeoutExpired:
                # A timeout in one scan must not cancel the remaining scans.
                logger.warning("Scan '%s' timed out", step_name)
            except Exception:
                logger.exception("Scan '%s' failed", step_name)

    # The task is deliberately a plain function: the scans call the blocking
    # subprocess.run, and a coroutine task would be awaited on the event loop
    # and stall every other request for minutes. Sync tasks are dispatched to
    # the framework's thread pool instead.
    background_tasks.add_task(run_scan_blocking, scan_type)

    return {
        "status": "started",
        "scan_type": scan_type,
        "timestamp": timestamp,
        "message": f"Scan '{scan_type}' wurde gestartet"
    }
|
|
|
|
|
|
@router.get("/health")
async def health_check():
    """Health check for the Security API.

    Reports how many of the supported tools are installed, plus the reports
    directory and whether it exists. The status is always ``"healthy"``.
    """
    tool_names = ["gitleaks", "semgrep", "bandit", "trivy", "grype", "syft"]
    tools_installed = sum(
        1 for installed, _ in map(check_tool_installed, tool_names) if installed
    )

    return {
        "status": "healthy",
        "tools_installed": tools_installed,
        "tools_total": len(tool_names),
        "reports_dir": str(REPORTS_DIR),
        "reports_exist": REPORTS_DIR.exists()
    }
|
|
|
|
|
|
# ===========================
|
|
# Scan Helper Functions
|
|
# ===========================
|
|
|
|
def _run_secrets_scan(timestamp: str):
    """Run a Gitleaks scan over the project, if Gitleaks is installed.

    The JSON report is written to ``REPORTS_DIR`` as
    ``gitleaks-<timestamp>.json``. The call times out after 300 seconds.
    """
    installed, _ = check_tool_installed("gitleaks")
    if not installed:
        return

    report_path = REPORTS_DIR / f"gitleaks-{timestamp}.json"
    command = [
        "gitleaks", "detect", "--source", str(PROJECT_ROOT),
        "--config", str(PROJECT_ROOT / ".gitleaks.toml"),
        "--report-path", str(report_path),
        "--report-format", "json",
    ]
    subprocess.run(command, capture_output=True, timeout=300)
|
|
|
|
|
|
def _run_sast_scan(timestamp: str):
    """Run the SAST tools (Semgrep, then Bandit) that are installed.

    Semgrep runs from the project root with a 600 second timeout and writes
    ``semgrep-<timestamp>.json``. Bandit scans the ``backend`` directory
    (excluding ``backend/tests``) with a 300 second timeout and writes
    ``bandit-<timestamp>.json``. Both reports go to ``REPORTS_DIR``.
    """
    semgrep_installed, _ = check_tool_installed("semgrep")
    if semgrep_installed:
        semgrep_report = REPORTS_DIR / f"semgrep-{timestamp}.json"
        semgrep_command = [
            "semgrep", "scan", "--config", "auto",
            "--config", str(PROJECT_ROOT / ".semgrep.yml"),
            "--json", "--output", str(semgrep_report),
        ]
        subprocess.run(
            semgrep_command,
            capture_output=True,
            timeout=600,
            cwd=str(PROJECT_ROOT),
        )

    bandit_installed, _ = check_tool_installed("bandit")
    if bandit_installed:
        backend_dir = PROJECT_ROOT / "backend"
        bandit_report = REPORTS_DIR / f"bandit-{timestamp}.json"
        bandit_command = [
            "bandit", "-r", str(backend_dir), "-ll",
            "-x", str(backend_dir / "tests"),
            "-f", "json", "-o", str(bandit_report),
        ]
        subprocess.run(bandit_command, capture_output=True, timeout=300)
|
|
|
|
|
|
def _run_deps_scan(timestamp: str):
    """Run the dependency scanners (Trivy filesystem, then Grype) if installed.

    Trivy writes ``trivy-fs-<timestamp>.json`` itself. Grype prints JSON to
    stdout, which is saved as ``grype-<timestamp>.json`` only when the output
    is non-empty. Both reports go to ``REPORTS_DIR`` and each tool has a
    600 second timeout.
    """
    trivy_installed, _ = check_tool_installed("trivy")
    if trivy_installed:
        trivy_report = REPORTS_DIR / f"trivy-fs-{timestamp}.json"
        trivy_command = [
            "trivy", "fs", str(PROJECT_ROOT),
            "--config", str(PROJECT_ROOT / ".trivy.yaml"),
            "--format", "json",
            "--output", str(trivy_report),
        ]
        subprocess.run(trivy_command, capture_output=True, timeout=600)

    grype_installed, _ = check_tool_installed("grype")
    if grype_installed:
        grype_run = subprocess.run(
            ["grype", f"dir:{PROJECT_ROOT}", "-o", "json"],
            capture_output=True,
            text=True,
            timeout=600,
        )
        grype_output = grype_run.stdout
        if grype_output:
            # Grype reports on stdout, so the file is written here.
            with open(REPORTS_DIR / f"grype-{timestamp}.json", "w") as report_file:
                report_file.write(grype_output)
|
|
|
|
|
|
def _run_sbom_scan(timestamp: str):
    """Generate an SBOM for the project with Syft, if Syft is installed.

    The CycloneDX JSON output is written to ``REPORTS_DIR`` as
    ``sbom-<timestamp>.json``. The call times out after 300 seconds.
    """
    installed, _ = check_tool_installed("syft")
    if not installed:
        return

    sbom_path = REPORTS_DIR / f"sbom-{timestamp}.json"
    command = [
        "syft", f"dir:{PROJECT_ROOT}",
        "-o", f"cyclonedx-json={sbom_path}",
    ]
    subprocess.run(command, capture_output=True, timeout=300)
|
|
|
|
|
|
def _run_container_scan(timestamp: str):
    """Scan the project's container images with Trivy, if Trivy is installed.

    Each image gets its own report in ``REPORTS_DIR`` named
    ``trivy-image-<image>-<timestamp>.json``. Each scan has a 600 second
    timeout.
    """
    installed, _ = check_tool_installed("trivy")
    if not installed:
        return

    for image_name in ("breakpilot-pwa-backend", "breakpilot-pwa-consent-service"):
        report_path = REPORTS_DIR / f"trivy-image-{image_name}-{timestamp}.json"
        command = [
            "trivy", "image", image_name,
            "--format", "json",
            "--output", str(report_path),
        ]
        subprocess.run(command, capture_output=True, timeout=600)
|