Files
breakpilot-core/backend-core/security_api.py
Benjamin Admin 92c86ec6ba [split-required] [guardrail-change] Enforce 500 LOC budget across all services
Install LOC guardrails (check-loc.sh, architecture.md, pre-commit hook)
and split all 44 files exceeding 500 LOC into domain-focused modules:

- consent-service (Go): models, handlers, services, database splits
- backend-core (Python): security_api, rbac_api, pdf_service, auth splits
- admin-core (TypeScript): 5 page.tsx + sidebar extractions
- pitch-deck (TypeScript): 6 slides, 3 UI components, engine.ts splits
- voice-service (Python): enhanced_task_orchestrator split

Result: 0 violations, 36 exempted (pipeline, tests, pure-data files).
Go build verified clean. No behavior changes — pure structural splits.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 00:09:30 +02:00

375 lines
11 KiB
Python

"""
BreakPilot Security API
Endpunkte fuer das Security Dashboard:
- Tool-Status abfragen
- Scan-Ergebnisse abrufen
- Scans ausloesen
- SBOM-Daten abrufen
- Scan-Historie anzeigen
Features:
- Liest Security-Reports aus dem security-reports/ Verzeichnis
- Fuehrt Security-Scans via subprocess aus
- Parst Gitleaks, Semgrep, Trivy, Grype JSON-Reports
- Generiert SBOM mit Syft
Split structure:
- security_models.py — Pydantic models
- security_report_parsers.py — Report parsing, tool detection, aggregation
- security_mock_data.py — Mock data generators + /demo/* endpoints
- security_monitoring.py — /monitoring/* endpoints (logs, metrics, containers)
"""
import json
import subprocess
from datetime import datetime
from typing import List, Optional
from fastapi import APIRouter, HTTPException, BackgroundTasks
from security_models import (
ToolStatus,
Finding,
SeveritySummary,
HistoryItem,
)
from security_report_parsers import (
REPORTS_DIR,
PROJECT_ROOT,
check_tool_installed,
get_latest_report,
get_all_findings,
calculate_summary,
)
from security_mock_data import (
get_mock_findings,
get_mock_sbom_data,
get_mock_history,
router as mock_data_router,
)
from security_monitoring import router as monitoring_router
router = APIRouter(prefix="/v1/security", tags=["Security"])
# Mount the sub-routers split out of this module. They are included with an
# empty prefix so their routes are served under this router's /v1/security
# prefix and share the "Security" tag in the OpenAPI docs.
router.include_router(mock_data_router, prefix="", tags=["Security"])
router.include_router(monitoring_router, prefix="", tags=["Security"])
# ===========================
# API Endpoints
# ===========================
@router.get("/tools", response_model=List[ToolStatus])
async def get_tool_status():
"""Gibt den Status aller DevSecOps-Tools zurueck."""
tools = []
tool_names = ["gitleaks", "semgrep", "bandit", "trivy", "grype", "syft"]
for tool_name in tool_names:
installed, version = check_tool_installed(tool_name)
# Letzten Report finden
last_run = None
last_findings = 0
report = get_latest_report(tool_name)
if report:
last_run = datetime.fromtimestamp(report.stat().st_mtime).strftime("%d.%m.%Y %H:%M")
tools.append(ToolStatus(
name=tool_name.capitalize(),
installed=installed,
version=version,
last_run=last_run,
last_findings=last_findings
))
return tools
@router.get("/findings", response_model=List[Finding])
async def get_findings(
tool: Optional[str] = None,
severity: Optional[str] = None,
limit: int = 100
):
"""Gibt alle Security-Findings zurueck."""
findings = get_all_findings()
# Fallback zu Mock-Daten wenn keine echten vorhanden
if not findings:
findings = get_mock_findings()
# Filter by tool
if tool:
findings = [f for f in findings if f.tool.lower() == tool.lower()]
# Filter by severity
if severity:
findings = [f for f in findings if f.severity.upper() == severity.upper()]
# Sort by severity (critical first)
severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4, "UNKNOWN": 5}
findings.sort(key=lambda f: severity_order.get(f.severity.upper(), 5))
return findings[:limit]
@router.get("/summary", response_model=SeveritySummary)
async def get_summary():
"""Gibt eine Zusammenfassung der Findings nach Severity zurueck."""
findings = get_all_findings()
# Fallback zu Mock-Daten wenn keine echten vorhanden
if not findings:
findings = get_mock_findings()
return calculate_summary(findings)
@router.get("/sbom")
async def get_sbom():
"""Gibt das aktuelle SBOM zurueck."""
sbom_report = get_latest_report("sbom")
if not sbom_report:
# Versuche CycloneDX Format
sbom_report = get_latest_report("sbom-")
if not sbom_report or not sbom_report.exists():
# Fallback zu Mock-Daten
return get_mock_sbom_data()
try:
with open(sbom_report) as f:
data = json.load(f)
return data
except (json.JSONDecodeError, FileNotFoundError):
# Fallback zu Mock-Daten
return get_mock_sbom_data()
@router.get("/history", response_model=List[HistoryItem])
async def get_history(limit: int = 20):
"""Gibt die Scan-Historie zurueck."""
history = []
if REPORTS_DIR.exists():
# Alle JSON-Reports sammeln
reports = list(REPORTS_DIR.glob("*.json"))
reports.sort(key=lambda p: p.stat().st_mtime, reverse=True)
for report in reports[:limit]:
tool_name = report.stem.split("-")[0]
timestamp = datetime.fromtimestamp(report.stat().st_mtime).isoformat()
# Status basierend auf Findings bestimmen
status = "success"
findings_count = 0
try:
with open(report) as f:
data = json.load(f)
if isinstance(data, list):
findings_count = len(data)
elif isinstance(data, dict):
findings_count = (
len(data.get("results", []))
or len(data.get("matches", []))
or len(data.get("Results", []))
)
if findings_count > 0:
status = "warning"
except Exception:
pass
history.append(HistoryItem(
timestamp=timestamp,
title=f"{tool_name.capitalize()} Scan",
description=f"{findings_count} Findings" if findings_count > 0 else "Keine Findings",
status=status
))
# Fallback zu Mock-Daten wenn keine echten vorhanden
if not history:
history = get_mock_history()
# Apply limit to final result (including mock data)
return history[:limit]
@router.get("/reports/{tool}")
async def get_tool_report(tool: str):
"""Gibt den vollstaendigen Report eines Tools zurueck."""
report = get_latest_report(tool.lower())
if not report or not report.exists():
raise HTTPException(status_code=404, detail=f"Kein Report fuer {tool} gefunden")
try:
with open(report) as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError) as e:
raise HTTPException(status_code=500, detail=f"Fehler beim Lesen des Reports: {str(e)}")
@router.post("/scan/{scan_type}")
async def run_scan(scan_type: str, background_tasks: BackgroundTasks):
"""
Startet einen Security-Scan.
scan_type kann sein:
- secrets (Gitleaks)
- sast (Semgrep, Bandit)
- deps (Trivy, Grype)
- containers (Trivy image)
- sbom (Syft)
- all (Alle Scans)
"""
valid_types = ["secrets", "sast", "deps", "containers", "sbom", "all"]
if scan_type not in valid_types:
raise HTTPException(
status_code=400,
detail=f"Ungueltiger Scan-Typ. Erlaubt: {', '.join(valid_types)}"
)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
async def run_scan_async(st: str):
"""Fuehrt den Scan asynchron aus."""
try:
if st in ("secrets", "all"):
_run_secrets_scan(timestamp)
if st in ("sast", "all"):
_run_sast_scan(timestamp)
if st in ("deps", "all"):
_run_deps_scan(timestamp)
if st in ("sbom", "all"):
_run_sbom_scan(timestamp)
if st in ("containers", "all"):
_run_container_scan(timestamp)
except subprocess.TimeoutExpired:
pass
except Exception as e:
print(f"Scan error: {e}")
# Scan im Hintergrund ausfuehren
background_tasks.add_task(run_scan_async, scan_type)
return {
"status": "started",
"scan_type": scan_type,
"timestamp": timestamp,
"message": f"Scan '{scan_type}' wurde gestartet"
}
@router.get("/health")
async def health_check():
"""Health-Check fuer die Security API."""
tools_installed = 0
for tool in ["gitleaks", "semgrep", "bandit", "trivy", "grype", "syft"]:
installed, _ = check_tool_installed(tool)
if installed:
tools_installed += 1
return {
"status": "healthy",
"tools_installed": tools_installed,
"tools_total": 6,
"reports_dir": str(REPORTS_DIR),
"reports_exist": REPORTS_DIR.exists()
}
# ===========================
# Scan Helper Functions
# ===========================
def _run_secrets_scan(timestamp: str):
    """Run Gitleaks secret detection and write a timestamped JSON report."""
    installed, _ = check_tool_installed("gitleaks")
    if not installed:
        return
    report_path = REPORTS_DIR / f"gitleaks-{timestamp}.json"
    subprocess.run(
        [
            "gitleaks", "detect",
            "--source", str(PROJECT_ROOT),
            "--config", str(PROJECT_ROOT / ".gitleaks.toml"),
            "--report-path", str(report_path),
            "--report-format", "json",
        ],
        capture_output=True,
        timeout=300,
    )
def _run_sast_scan(timestamp: str):
    """Run the SAST tools (Semgrep, then Bandit) and write JSON reports."""
    semgrep_ok, _ = check_tool_installed("semgrep")
    if semgrep_ok:
        subprocess.run(
            [
                "semgrep", "scan",
                "--config", "auto",
                "--config", str(PROJECT_ROOT / ".semgrep.yml"),
                "--json",
                "--output", str(REPORTS_DIR / f"semgrep-{timestamp}.json"),
            ],
            capture_output=True,
            timeout=600,
            cwd=str(PROJECT_ROOT),
        )
    bandit_ok, _ = check_tool_installed("bandit")
    if bandit_ok:
        subprocess.run(
            [
                "bandit", "-r", str(PROJECT_ROOT / "backend"),
                "-ll",  # only report medium severity and above
                "-x", str(PROJECT_ROOT / "backend" / "tests"),
                "-f", "json",
                "-o", str(REPORTS_DIR / f"bandit-{timestamp}.json"),
            ],
            capture_output=True,
            timeout=300,
        )
def _run_deps_scan(timestamp: str):
    """Run dependency scans: Trivy on the filesystem, then Grype."""
    trivy_ok, _ = check_tool_installed("trivy")
    if trivy_ok:
        subprocess.run(
            [
                "trivy", "fs", str(PROJECT_ROOT),
                "--config", str(PROJECT_ROOT / ".trivy.yaml"),
                "--format", "json",
                "--output", str(REPORTS_DIR / f"trivy-fs-{timestamp}.json"),
            ],
            capture_output=True,
            timeout=600,
        )
    grype_ok, _ = check_tool_installed("grype")
    if grype_ok:
        # Grype prints its JSON report to stdout; persist it ourselves.
        proc = subprocess.run(
            ["grype", f"dir:{PROJECT_ROOT}", "-o", "json"],
            capture_output=True,
            text=True,
            timeout=600,
        )
        if proc.stdout:
            with open(REPORTS_DIR / f"grype-{timestamp}.json", "w") as fh:
                fh.write(proc.stdout)
def _run_sbom_scan(timestamp: str):
    """Generate a CycloneDX SBOM for the project with Syft."""
    installed, _ = check_tool_installed("syft")
    if not installed:
        return
    output_spec = f"cyclonedx-json={REPORTS_DIR / f'sbom-{timestamp}.json'}"
    subprocess.run(
        ["syft", f"dir:{PROJECT_ROOT}", "-o", output_spec],
        capture_output=True,
        timeout=300,
    )
def _run_container_scan(timestamp: str):
    """Run a Trivy image scan against each known project container image."""
    installed, _ = check_tool_installed("trivy")
    if not installed:
        return
    for image in ("breakpilot-pwa-backend", "breakpilot-pwa-consent-service"):
        subprocess.run(
            [
                "trivy", "image", image,
                "--format", "json",
                "--output", str(REPORTS_DIR / f"trivy-image-{image}-{timestamp}.json"),
            ],
            capture_output=True,
            timeout=600,
        )