"""
BreakPilot Security API

Endpoints for the security dashboard:
- Query tool status
- Fetch scan results
- Trigger scans
- Fetch SBOM data
- Show the scan history

Features:
- Reads security reports from the security-reports/ directory
- Runs security scans via subprocess
- Parses Gitleaks, Semgrep, Trivy and Grype JSON reports
- Generates an SBOM with Syft
"""

import os
import json
import subprocess
import asyncio
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional

from fastapi import APIRouter, HTTPException, BackgroundTasks
from pydantic import BaseModel

router = APIRouter(prefix="/v1/security", tags=["Security"])

# Paths - located inside the backend directory.
# In Docker: /app/security-reports, /app/scripts
# Locally:   backend/security-reports, backend/scripts
BACKEND_DIR = Path(__file__).parent
REPORTS_DIR = BACKEND_DIR / "security-reports"
SCRIPTS_DIR = BACKEND_DIR / "scripts"

# Make sure the reports directory exists.
try:
    REPORTS_DIR.mkdir(exist_ok=True)
except OSError:
    # The backend directory is not writable. PermissionError only covers
    # EACCES/EPERM; a read-only filesystem raises a plain OSError (EROFS), so
    # the broader class is caught here. Fall back to a tmp directory.
    REPORTS_DIR = Path("/tmp/security-reports")
    REPORTS_DIR.mkdir(exist_ok=True)


# ===========================
# Pydantic Models
# ===========================

class ToolStatus(BaseModel):
    """Installation and last-run status of one security tool."""

    name: str
    installed: bool
    version: Optional[str] = None
    last_run: Optional[str] = None
    last_findings: int = 0


class Finding(BaseModel):
    """A single finding reported by one of the security tools."""

    id: str
    tool: str
    severity: str
    title: str
    message: Optional[str] = None
    file: Optional[str] = None
    line: Optional[int] = None
    found_at: str


class SeveritySummary(BaseModel):
    """Number of findings per severity bucket plus the overall total."""

    critical: int = 0
    high: int = 0
    medium: int = 0
    low: int = 0
    info: int = 0
    total: int = 0


class ScanResult(BaseModel):
    """Outcome of one scan run."""

    tool: str
    status: str
    started_at: str
    completed_at: Optional[str] = None
    findings_count: int = 0
    report_path: Optional[str] = None


class HistoryItem(BaseModel):
    """One entry of the scan history."""

    timestamp: str
    title: str
    description: str
    status: str  # success, warning, error
# ===========================
# Utility Functions
# ===========================

# Root directory that the scanners are pointed at. The scan commands expect
# "<root>/backend" and the tool config files (.gitleaks.toml, .semgrep.yml,
# .trivy.yaml) below it, i.e. the repository root, which locally is the parent
# of the backend directory.
# NOTE(review): inside Docker the backend lives at /app, so the default
# resolves to "/"; set SECURITY_SCAN_ROOT to the mounted source tree there.
PROJECT_ROOT = Path(os.environ.get("SECURITY_SCAN_ROOT", str(BACKEND_DIR.parent)))

# All tools shown on the dashboard, in display order.
SECURITY_TOOLS = ("gitleaks", "semgrep", "bandit", "trivy", "grype", "syft")

# Command used to query each tool's version.
_VERSION_COMMANDS: Dict[str, List[str]] = {
    "gitleaks": ["gitleaks", "version"],
    "semgrep": ["semgrep", "--version"],
    "bandit": ["bandit", "--version"],
    "trivy": ["trivy", "version"],
    "grype": ["grype", "version"],
    "syft": ["syft", "version"],
}

# Tools whose complete (stripped) version output is reported as the version.
_FULL_OUTPUT_TOOLS = ("gitleaks", "bandit")

# Report file prefix per tool where it differs from the tool name: Syft
# writes its SBOM as "sbom-<timestamp>.json".
_REPORT_PREFIXES = {"syft": "sbom"}

# Semgrep's legacy severity names mapped onto the buckets used by
# SeveritySummary; without this they would all be counted as "info".
_SEMGREP_SEVERITY = {"ERROR": "HIGH", "WARNING": "MEDIUM"}

# Everything that can go wrong while reading and interpreting a report file:
# I/O errors, invalid JSON / encoding / model validation (all ValueError
# subclasses) and unexpected JSON shapes.
_REPORT_ERRORS = (OSError, ValueError, TypeError, AttributeError, KeyError)


def check_tool_installed(tool_name: str) -> tuple[bool, Optional[str]]:
    """Check whether a tool is installed and return its version.

    Args:
        tool_name: One of the names in SECURITY_TOOLS.

    Returns:
        ``(True, version)`` when the version command exits with 0, otherwise
        ``(False, None)`` (unknown tool, missing or non-executable binary,
        timeout, non-zero exit code).
    """
    command = _VERSION_COMMANDS.get(tool_name)
    if command is None:
        return False, None

    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=5)
    except (subprocess.TimeoutExpired, OSError):
        # OSError covers FileNotFoundError (not installed) as well as
        # binaries that exist but cannot be executed.
        return False, None

    if result.returncode != 0:
        return False, None

    output = result.stdout.strip()
    if tool_name in _FULL_OUTPUT_TOOLS:
        return True, output

    if tool_name == "trivy":
        # Parse "Version: 0.48.x"
        for line in result.stdout.split("\n"):
            if line.startswith("Version:"):
                return True, line.split(":", 1)[1].strip()

    return True, output.split("\n")[0]


def get_latest_report(tool_prefix: str) -> Optional[Path]:
    """Return the most recently modified ``<tool_prefix>*.json`` report.

    Returns None when the reports directory or a matching report is missing.
    """
    if not REPORTS_DIR.exists():
        return None
    candidates = REPORTS_DIR.glob(f"{tool_prefix}*.json")
    return max(candidates, key=lambda p: p.stat().st_mtime, default=None)


def _read_report(report_path: Path) -> tuple[Any, str]:
    """Load a JSON report and return it with the file's mtime as ISO string."""
    # The scanners emit UTF-8; do not depend on the process locale.
    with open(report_path, encoding="utf-8") as f:
        data = json.load(f)
    found_at = datetime.fromtimestamp(report_path.stat().st_mtime).isoformat()
    return data, found_at


def parse_gitleaks_report(report_path: Path) -> List[Finding]:
    """Parse a Gitleaks JSON report.

    Best effort: an unreadable or malformed report yields the findings parsed
    up to that point (possibly none) instead of raising.
    """
    findings: List[Finding] = []
    try:
        data, found_at = _read_report(report_path)
        if isinstance(data, list):
            for item in data:
                findings.append(Finding(
                    id=item.get("Fingerprint") or "unknown",
                    tool="gitleaks",
                    severity="HIGH",  # secrets are always critical
                    title=item.get("Description") or "Secret detected",
                    message=f"Rule: {item.get('RuleID', 'unknown')}",
                    file=item.get("File", ""),
                    line=item.get("StartLine", 0),
                    found_at=found_at,
                ))
    except _REPORT_ERRORS:
        pass
    return findings


def parse_semgrep_report(report_path: Path) -> List[Finding]:
    """Parse a Semgrep JSON report.

    Semgrep's ERROR/WARNING severities are translated to HIGH/MEDIUM so they
    land in the matching summary bucket. Best effort: an unreadable or
    malformed report yields the findings parsed up to that point.
    """
    findings: List[Finding] = []
    try:
        data, found_at = _read_report(report_path)
        for item in data.get("results", []):
            extra = item.get("extra") or {}
            raw_severity = (extra.get("severity") or "INFO").upper()
            findings.append(Finding(
                id=item.get("check_id") or "unknown",
                tool="semgrep",
                severity=_SEMGREP_SEVERITY.get(raw_severity, raw_severity),
                title=extra.get("message") or "Finding",
                message=item.get("check_id", ""),
                file=item.get("path", ""),
                line=(item.get("start") or {}).get("line", 0),
                found_at=found_at,
            ))
    except _REPORT_ERRORS:
        pass
    return findings


def parse_bandit_report(report_path: Path) -> List[Finding]:
    """Parse a Bandit JSON report.

    Best effort: an unreadable or malformed report yields the findings parsed
    up to that point (possibly none) instead of raising.
    """
    findings: List[Finding] = []
    try:
        data, found_at = _read_report(report_path)
        for item in data.get("results", []):
            cwe = item.get("issue_cwe") or {}
            findings.append(Finding(
                id=item.get("test_id") or "unknown",
                tool="bandit",
                severity=(item.get("issue_severity") or "LOW").upper(),
                title=item.get("issue_text") or "Finding",
                message=f"CWE: {cwe.get('id', 'N/A')}",
                file=item.get("filename", ""),
                line=item.get("line_number", 0),
                found_at=found_at,
            ))
    except _REPORT_ERRORS:
        pass
    return findings


def parse_trivy_report(report_path: Path) -> List[Finding]:
    """Parse a Trivy JSON report (one finding per vulnerability).

    Best effort: an unreadable or malformed report yields the findings parsed
    up to that point (possibly none) instead of raising.
    """
    findings: List[Finding] = []
    try:
        data, found_at = _read_report(report_path)
        for result in data.get("Results", []):
            target = result.get("Target", "")
            # Trivy writes "Vulnerabilities": null for clean targets.
            for vuln in result.get("Vulnerabilities") or []:
                vuln_id = vuln.get("VulnerabilityID")
                findings.append(Finding(
                    id=vuln_id or "unknown",
                    tool="trivy",
                    severity=(vuln.get("Severity") or "UNKNOWN").upper(),
                    title=vuln.get("Title") or vuln_id or "CVE",
                    message=f"{vuln.get('PkgName', '')} {vuln.get('InstalledVersion', '')}",
                    file=target,
                    line=None,
                    found_at=found_at,
                ))
    except _REPORT_ERRORS:
        pass
    return findings


def parse_grype_report(report_path: Path) -> List[Finding]:
    """Parse a Grype JSON report (one finding per match).

    Titles are truncated to 100 characters. Best effort: an unreadable or
    malformed report yields the findings parsed up to that point.
    """
    findings: List[Finding] = []
    try:
        data, found_at = _read_report(report_path)
        for match in data.get("matches", []):
            vuln = match.get("vulnerability") or {}
            artifact = match.get("artifact") or {}
            locations = artifact.get("locations")
            findings.append(Finding(
                id=vuln.get("id") or "unknown",
                tool="grype",
                severity=(vuln.get("severity") or "Unknown").upper(),
                title=(vuln.get("description") or vuln.get("id") or "CVE")[:100],
                message=f"{artifact.get('name', '')} {artifact.get('version', '')}",
                file=locations[0].get("path", "") if locations else "",
                line=None,
                found_at=found_at,
            ))
    except _REPORT_ERRORS:
        pass
    return findings


def get_all_findings() -> List[Finding]:
    """Collect the findings from the latest report of every scanner."""
    # Built at call time so the parser functions are looked up when needed.
    sources = (
        ("gitleaks", parse_gitleaks_report),
        ("semgrep", parse_semgrep_report),
        ("bandit", parse_bandit_report),
        ("trivy-fs", parse_trivy_report),  # Trivy (filesystem)
        ("grype", parse_grype_report),
    )
    findings: List[Finding] = []
    for prefix, parse in sources:
        report = get_latest_report(prefix)
        if report:
            findings.extend(parse(report))
    return findings


def calculate_summary(findings: List[Finding]) -> SeveritySummary:
    """Count findings per severity; unknown severities count as ``info``."""
    buckets = {"CRITICAL": "critical", "HIGH": "high", "MEDIUM": "medium", "LOW": "low"}
    summary = SeveritySummary()
    for finding in findings:
        field = buckets.get(finding.severity.upper(), "info")
        setattr(summary, field, getattr(summary, field) + 1)
    summary.total = len(findings)
    return summary


def _count_report_findings(data: Any) -> int:
    """Return the number of findings contained in a parsed report of any tool."""
    if isinstance(data, list):  # Gitleaks
        return len(data)
    if isinstance(data, dict):
        return (
            len(data.get("results") or [])  # Semgrep, Bandit
            or len(data.get("matches") or [])  # Grype
            # Trivy: count vulnerabilities, not scanned targets.
            or sum(len(r.get("Vulnerabilities") or []) for r in data.get("Results") or [])
        )
    return 0


def _load_latest_sbom() -> Optional[Any]:
    """Return the parsed latest ``sbom*.json`` report, or None if unavailable."""
    sbom_report = get_latest_report("sbom")
    if sbom_report is None:
        return None
    try:
        with open(sbom_report, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        return None


# ===========================
# API Endpoints
# ===========================

@router.get("/tools", response_model=List[ToolStatus])
async def get_tool_status():
    """Return the status of all DevSecOps tools.

    For every tool: whether it is installed, its version, when its latest
    report was written and how many findings that report contains.
    """
    parsers = {
        "gitleaks": parse_gitleaks_report,
        "semgrep": parse_semgrep_report,
        "bandit": parse_bandit_report,
        "trivy": parse_trivy_report,
        "grype": parse_grype_report,
    }
    # The version checks spawn subprocesses; run them in worker threads so
    # the event loop stays responsive.
    checks = await asyncio.gather(
        *(asyncio.to_thread(check_tool_installed, name) for name in SECURITY_TOOLS)
    )

    tools = []
    for tool_name, (installed, version) in zip(SECURITY_TOOLS, checks):
        # Find the latest report
        last_run = None
        last_findings = 0
        report = get_latest_report(_REPORT_PREFIXES.get(tool_name, tool_name))
        if report:
            last_run = datetime.fromtimestamp(report.stat().st_mtime).strftime("%d.%m.%Y %H:%M")
            parser = parsers.get(tool_name)
            if parser is not None:
                last_findings = len(parser(report))

        tools.append(ToolStatus(
            name=tool_name.capitalize(),
            installed=installed,
            version=version,
            last_run=last_run,
            last_findings=last_findings,
        ))

    return tools


@router.get("/findings", response_model=List[Finding])
async def get_findings(
    tool: Optional[str] = None,
    severity: Optional[str] = None,
    limit: int = 100
):
    """Return security findings, most severe first.

    Args:
        tool: Only return findings of this tool (case-insensitive).
        severity: Only return findings of this severity (case-insensitive).
        limit: Maximum number of findings; negative values are treated as 0.
    """
    findings = get_all_findings()

    # Fall back to mock data when no real findings exist
    if not findings:
        findings = get_mock_findings()

    # Filter by tool
    if tool:
        findings = [f for f in findings if f.tool.lower() == tool.lower()]

    # Filter by severity
    if severity:
        findings = [f for f in findings if f.severity.upper() == severity.upper()]

    # Sort by severity (critical first)
    severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4, "UNKNOWN": 5}
    findings.sort(key=lambda f: severity_order.get(f.severity.upper(), 5))

    return findings[:max(limit, 0)]


@router.get("/summary", response_model=SeveritySummary)
async def get_summary():
    """Return a summary of the findings grouped by severity."""
    findings = get_all_findings()
    # Fall back to mock data when no real findings exist
    if not findings:
        findings = get_mock_findings()
    return calculate_summary(findings)


@router.get("/sbom")
async def get_sbom():
    """Return the current SBOM, or mock data when none can be read."""
    data = _load_latest_sbom()
    if data is None:
        # Fall back to mock data
        return get_mock_sbom_data()
    return data


@router.get("/history", response_model=List[HistoryItem])
async def get_history(limit: int = 20):
    """Return the scan history, newest report first.

    Args:
        limit: Maximum number of entries; negative values are treated as 0.
    """
    limit = max(limit, 0)
    history = []

    if REPORTS_DIR.exists():
        # Collect all JSON reports
        reports = list(REPORTS_DIR.glob("*.json"))
        reports.sort(key=lambda p: p.stat().st_mtime, reverse=True)

        for report in reports[:limit]:
            tool_name = report.stem.split("-")[0]
            timestamp = datetime.fromtimestamp(report.stat().st_mtime).isoformat()

            # Derive the status from the number of findings
            status = "success"
            findings_count = 0
            try:
                with open(report, encoding="utf-8") as f:
                    findings_count = _count_report_findings(json.load(f))
                if findings_count > 0:
                    status = "warning"
            except _REPORT_ERRORS:
                # Unreadable report: list it without findings.
                pass

            history.append(HistoryItem(
                timestamp=timestamp,
                title=f"{tool_name.capitalize()} Scan",
                description=f"{findings_count} Findings" if findings_count > 0 else "Keine Findings",
                status=status,
            ))

    # Fall back to mock data when no real history exists
    if not history:
        history = get_mock_history()

    # Apply limit to final result (including mock data)
    return history[:limit]


@router.get("/reports/{tool}")
async def get_tool_report(tool: str):
    """Return the complete latest report of a tool.

    Raises:
        HTTPException: 404 when no report exists for ``tool``, 500 when the
            report cannot be read or parsed.
    """
    prefix = tool.lower()
    # ``tool`` comes straight from the URL and ends up in a glob pattern, so
    # only plain report-name characters are accepted (no wildcards).
    is_plain_name = bool(prefix) and all(ch.isalnum() or ch in "-_." for ch in prefix)
    report = get_latest_report(prefix) if is_plain_name else None

    if not report or not report.exists():
        raise HTTPException(status_code=404, detail=f"Kein Report fuer {tool} gefunden")

    try:
        with open(report, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        raise HTTPException(status_code=500, detail=f"Fehler beim Lesen des Reports: {str(e)}")


def _config_args(flag: str, config_path: Path) -> List[str]:
    """Return ``[flag, path]`` if the config file exists, else no arguments."""
    return [flag, str(config_path)] if config_path.exists() else []


def _run_scanner(command: List[str], *, timeout: int, **run_kwargs: Any):
    """Run one scanner CLI and report (not propagate) timeouts and OS errors.

    Returns the CompletedProcess, or None when the scanner could not be run
    to completion, so that the remaining scanners of a scan still execute.
    """
    try:
        return subprocess.run(command, capture_output=True, timeout=timeout, **run_kwargs)
    except subprocess.TimeoutExpired:
        print(f"Scan error: {command[0]} timed out after {timeout}s")
    except OSError as e:
        print(f"Scan error: {e}")
    return None


def _scan_secrets(timestamp: str) -> None:
    """Run Gitleaks over PROJECT_ROOT."""
    installed, _ = check_tool_installed("gitleaks")
    if not installed:
        return
    _run_scanner(
        ["gitleaks", "detect",
         "--source", str(PROJECT_ROOT),
         *_config_args("--config", PROJECT_ROOT / ".gitleaks.toml"),
         "--report-path", str(REPORTS_DIR / f"gitleaks-{timestamp}.json"),
         "--report-format", "json"],
        timeout=300,
    )


def _scan_sast(timestamp: str) -> None:
    """Run the static analysers Semgrep and Bandit."""
    installed, _ = check_tool_installed("semgrep")
    if installed:
        _run_scanner(
            ["semgrep", "scan", "--config", "auto",
             *_config_args("--config", PROJECT_ROOT / ".semgrep.yml"),
             "--json", "--output", str(REPORTS_DIR / f"semgrep-{timestamp}.json")],
            timeout=600,
            cwd=str(PROJECT_ROOT),
        )

    installed, _ = check_tool_installed("bandit")
    if installed:
        _run_scanner(
            ["bandit", "-r", str(PROJECT_ROOT / "backend"), "-ll",
             "-x", str(PROJECT_ROOT / "backend" / "tests"),
             "-f", "json", "-o", str(REPORTS_DIR / f"bandit-{timestamp}.json")],
            timeout=300,
        )


def _scan_dependencies(timestamp: str) -> None:
    """Run the dependency scanners Trivy (filesystem mode) and Grype."""
    installed, _ = check_tool_installed("trivy")
    if installed:
        _run_scanner(
            ["trivy", "fs", str(PROJECT_ROOT),
             *_config_args("--config", PROJECT_ROOT / ".trivy.yaml"),
             "--format", "json",
             "--output", str(REPORTS_DIR / f"trivy-fs-{timestamp}.json")],
            timeout=600,
        )

    installed, _ = check_tool_installed("grype")
    if installed:
        result = _run_scanner(
            ["grype", f"dir:{PROJECT_ROOT}", "-o", "json"],
            timeout=600,
            text=True,
        )
        # Grype prints the report to stdout; persist it ourselves.
        if result is not None and result.stdout:
            with open(REPORTS_DIR / f"grype-{timestamp}.json", "w", encoding="utf-8") as f:
                f.write(result.stdout)


def _scan_sbom(timestamp: str) -> None:
    """Generate a CycloneDX SBOM with Syft."""
    installed, _ = check_tool_installed("syft")
    if not installed:
        return
    sbom_path = REPORTS_DIR / f"sbom-{timestamp}.json"
    _run_scanner(
        ["syft", f"dir:{PROJECT_ROOT}", "-o", f"cyclonedx-json={sbom_path}"],
        timeout=300,
    )


def _scan_containers(timestamp: str) -> None:
    """Run Trivy image scans for the project's container images."""
    installed, _ = check_tool_installed("trivy")
    if not installed:
        return
    for image in ("breakpilot-pwa-backend", "breakpilot-pwa-consent-service"):
        _run_scanner(
            ["trivy", "image", image, "--format", "json",
             "--output", str(REPORTS_DIR / f"trivy-image-{image}-{timestamp}.json")],
            timeout=600,
        )


def _execute_scan(scan_type: str, timestamp: str) -> None:
    """Run all scanners that belong to ``scan_type`` ("all" runs everything).

    Deliberately a plain (non-async) function: the scanners block for minutes,
    and background tasks that are regular functions are executed in a worker
    thread instead of on the event loop.
    """
    steps = (
        ("secrets", _scan_secrets),
        ("sast", _scan_sast),
        ("deps", _scan_dependencies),
        ("sbom", _scan_sbom),
        ("containers", _scan_containers),
    )
    for step_type, step in steps:
        if scan_type not in (step_type, "all"):
            continue
        try:
            step(timestamp)
        except Exception as e:
            # Top-level boundary of the background job: report and carry on
            # with the next scanner group.
            print(f"Scan error: {e}")


@router.post("/scan/{scan_type}")
async def run_scan(scan_type: str, background_tasks: BackgroundTasks):
    """
    Start a security scan in the background.

    scan_type can be:
    - secrets (Gitleaks)
    - sast (Semgrep, Bandit)
    - deps (Trivy, Grype)
    - containers (Trivy image)
    - sbom (Syft)
    - all (all scans)

    Raises:
        HTTPException: 400 for an unknown scan type.
    """
    valid_types = ["secrets", "sast", "deps", "containers", "sbom", "all"]
    if scan_type not in valid_types:
        raise HTTPException(
            status_code=400,
            detail=f"Ungueltiger Scan-Typ. Erlaubt: {', '.join(valid_types)}"
        )

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Run the scan in the background
    background_tasks.add_task(_execute_scan, scan_type, timestamp)

    return {
        "status": "started",
        "scan_type": scan_type,
        "timestamp": timestamp,
        "message": f"Scan '{scan_type}' wurde gestartet"
    }


@router.get("/health")
async def health_check():
    """Health check for the security API."""
    # The version checks spawn subprocesses; keep them off the event loop.
    checks = await asyncio.gather(
        *(asyncio.to_thread(check_tool_installed, name) for name in SECURITY_TOOLS)
    )
    tools_installed = sum(1 for installed, _ in checks if installed)

    return {
        "status": "healthy",
        "tools_installed": tools_installed,
        "tools_total": 6,
        "reports_dir": str(REPORTS_DIR),
        "reports_exist": REPORTS_DIR.exists()
    }


# ===========================
# Mock Data for Demo/Development
# ===========================

def get_mock_sbom_data() -> Dict[str, Any]:
    """Generate realistic mock SBOM data (CycloneDX 1.4 layout)."""
    # (name, version, SPDX license id)
    packages = [
        ("fastapi", "0.109.0", "MIT"),
        ("uvicorn", "0.27.0", "BSD-3-Clause"),
        ("pydantic", "2.5.3", "MIT"),
        ("httpx", "0.26.0", "BSD-3-Clause"),
        ("python-jose", "3.3.0", "MIT"),
        ("passlib", "1.7.4", "BSD-3-Clause"),
        ("bcrypt", "4.1.2", "Apache-2.0"),
        ("psycopg2-binary", "2.9.9", "LGPL-3.0"),
        ("sqlalchemy", "2.0.25", "MIT"),
        ("alembic", "1.13.1", "MIT"),
        ("weasyprint", "60.2", "BSD-3-Clause"),
        ("jinja2", "3.1.3", "BSD-3-Clause"),
        ("python-multipart", "0.0.6", "Apache-2.0"),
        ("aiofiles", "23.2.1", "Apache-2.0"),
        ("pytest", "7.4.4", "MIT"),
        ("pytest-asyncio", "0.23.3", "Apache-2.0"),
        ("anthropic", "0.18.1", "MIT"),
        ("openai", "1.12.0", "MIT"),
        ("langchain", "0.1.6", "MIT"),
        ("chromadb", "0.4.22", "Apache-2.0"),
    ]
    return {
        "bomFormat": "CycloneDX",
        "specVersion": "1.4",
        "version": 1,
        "metadata": {
            "timestamp": datetime.now().isoformat(),
            "tools": [{"vendor": "BreakPilot", "name": "DevSecOps", "version": "1.0.0"}],
            "component": {
                "type": "application",
                "name": "breakpilot-pwa",
                "version": "2.0.0"
            }
        },
        "components": [
            {
                "type": "library",
                "name": name,
                "version": version,
                "purl": f"pkg:pypi/{name}@{version}",
                "licenses": [{"license": {"id": license_id}}],
            }
            for name, version, license_id in packages
        ]
    }


def get_mock_findings() -> List[Finding]:
    """Generate mock findings for the demo when no real scan results exist."""
    # All critical findings have been fixed:
    # - idna >= 3.7 pinned (CVE-2024-3651)
    # - cryptography >= 42.0.0 pinned (GHSA-h4gh-qq45-vh27)
    # - jinja2 3.1.6 installed (CVE-2024-34064)
    # - .env.example placeholders improved
    # - No shell=True usage in the code
    return [
        Finding(
            id="info-scan-complete",
            tool="system",
            severity="INFO",
            title="Letzte Sicherheitspruefung erfolgreich",
            message="Keine kritischen Schwachstellen gefunden. Naechster Scan: taeglich 03:00 Uhr.",
            file="",
            line=None,
            found_at=datetime.now().isoformat()
        ),
    ]


def get_mock_history() -> List[HistoryItem]:
    """Generate a mock scan history relative to the current time."""
    from datetime import timedelta

    base_time = datetime.now()
    # timedelta arithmetic instead of replace(hour=..., day=...): the latter
    # raises ValueError shortly after midnight or at the start of a month.
    yesterday = base_time - timedelta(days=1)
    return [
        HistoryItem(
            timestamp=base_time.isoformat(),
            title="Full Security Scan",
            description="7 Findings (1 High, 3 Medium, 3 Low)",
            status="warning"
        ),
        HistoryItem(
            timestamp=(base_time - timedelta(hours=2)).isoformat(),
            title="SBOM Generation",
            description="20 Components analysiert",
            status="success"
        ),
        HistoryItem(
            timestamp=(base_time - timedelta(hours=4)).isoformat(),
            title="Container Scan",
            description="Keine kritischen CVEs",
            status="success"
        ),
        HistoryItem(
            timestamp=yesterday.isoformat(),
            title="Secrets Scan",
            description="1 Finding (API Key in .env.example)",
            status="warning"
        ),
        HistoryItem(
            timestamp=yesterday.replace(hour=10).isoformat(),
            title="SAST Scan",
            description="3 Findings (Bandit, Semgrep)",
            status="warning"
        ),
        HistoryItem(
            timestamp=(base_time - timedelta(days=2)).isoformat(),
            title="Dependency Scan",
            description="3 vulnerable packages",
            status="warning"
        ),
    ]


# ===========================
# Demo-Mode Endpoints (with Mock Data)
# ===========================

@router.get("/demo/sbom")
async def get_demo_sbom():
    """Return the real SBOM if one can be read, otherwise demo SBOM data."""
    # Try real data first
    data = _load_latest_sbom()
    if data is not None:
        return data
    # Fall back to mock data
    return get_mock_sbom_data()


@router.get("/demo/findings")
async def get_demo_findings():
    """Return the real findings if any exist, otherwise demo findings."""
    # Try real data first
    real_findings = get_all_findings()
    if real_findings:
        return real_findings
    # Fall back to mock data
    return get_mock_findings()


@router.get("/demo/summary")
async def get_demo_summary():
    """Return the severity summary of the real findings or of the demo data."""
    real_findings = get_all_findings()
    if real_findings:
        return calculate_summary(real_findings)
    # Mock summary
    return calculate_summary(get_mock_findings())


@router.get("/demo/history")
async def get_demo_history():
    """Return the real scan history if any exists, otherwise demo history."""
    real_history = await get_history()
    if real_history:
        return real_history
    return get_mock_history()


# ===========================
# Monitoring Endpoints
# ===========================

class LogEntry(BaseModel):
    """One log line shown in the monitoring view."""

    timestamp: str
    level: str
    service: str
    message: str


class MetricValue(BaseModel):
    """A single named system metric."""

    name: str
    value: float
    unit: str
    trend: Optional[str] = None  # up, down, stable


class ContainerStatus(BaseModel):
    """Runtime status of one container."""

    name: str
    status: str
    health: str
    cpu_percent: float
    memory_mb: float
    uptime: str


class ServiceStatus(BaseModel):
    """Result of a health check against one service."""

    name: str
    url: str
    status: str
    response_time_ms: int
    last_check: str


@router.get("/monitoring/logs", response_model=List[LogEntry])
async def get_logs(service: Optional[str] = None, level: Optional[str] = None, limit: int = 50):
    """Return randomly generated log entries (demo data).

    Args:
        service: Generate entries for this service only.
        level: Generate entries of this level only (case-insensitive; the
            returned level is upper-case).
        limit: Number of entries to generate.
    """
    import random
    from datetime import timedelta

    services = ["backend", "consent-service", "postgres", "mailpit"]
    levels = ["INFO", "INFO", "INFO", "WARNING", "ERROR", "DEBUG"]
    messages = {
        "backend": [
            "Request completed: GET /api/consent/health 200",
            "Request completed: POST /api/auth/login 200",
            "Database connection established",
            "JWT token validated successfully",
            "Starting background task: email_notification",
            "Cache miss for key: user_session_abc123",
            "Request completed: GET /api/v1/security/demo/sbom 200",
        ],
        "consent-service": [
            "Health check passed",
            "Document version created: v1.2.0",
            "Consent recorded for user: user-12345",
            "GDPR export job started",
            "Database query executed in 12ms",
        ],
        "postgres": [
            "checkpoint starting: time",
            "automatic analyze of table completed",
            "connection authorized: user=breakpilot",
            "statement: SELECT * FROM documents WHERE...",
        ],
        "mailpit": [
            "SMTP connection from 172.18.0.3",
            "Email received: Consent Confirmation",
            "Message stored: id=msg-001",
        ],
    }
    error_messages = [
        "Connection timeout after 30s",
        "Failed to parse JSON response",
        "Database query failed: connection reset",
        "Rate limit exceeded for IP 192.168.1.1",
    ]
    warning_messages = [
        "Slow query detected: 523ms",
        "Memory usage above 80%",
        "Retry attempt 2/3 for external API",
        "Deprecated API endpoint called",
    ]

    # Normalise once so that e.g. level="error" also gets the error texts.
    requested_level = level.upper() if level else None

    logs = []
    base_time = datetime.now()

    for i in range(limit):
        # Entries are generated directly for the requested service/level, so
        # no filtering is needed afterwards.
        svc = service if service else random.choice(services)
        lvl = requested_level if requested_level else random.choice(levels)
        # Unknown services reuse the backend messages.
        msg = random.choice(messages.get(svc, messages["backend"]))

        # Add some variety to error messages
        if lvl == "ERROR":
            msg = random.choice(error_messages)
        elif lvl == "WARNING":
            msg = random.choice(warning_messages)

        logs.append(LogEntry(
            timestamp=(base_time - timedelta(seconds=i * random.randint(1, 30))).isoformat(),
            level=lvl,
            service=svc,
            message=msg
        ))

    return logs


@router.get("/monitoring/metrics", response_model=List[MetricValue])
async def get_metrics():
    """Return randomly generated system metrics (demo data)."""
    import random

    return [
        MetricValue(name="CPU Usage", value=round(random.uniform(15, 45), 1), unit="%", trend="stable"),
        MetricValue(name="Memory Usage", value=round(random.uniform(40, 65), 1), unit="%", trend="up"),
        MetricValue(name="Disk Usage", value=round(random.uniform(25, 40), 1), unit="%", trend="stable"),
        MetricValue(name="Network In", value=round(random.uniform(1.2, 5.8), 2), unit="MB/s", trend="up"),
        MetricValue(name="Network Out", value=round(random.uniform(0.5, 2.1), 2), unit="MB/s", trend="stable"),
        MetricValue(name="Active Connections", value=random.randint(12, 48), unit="", trend="up"),
        MetricValue(name="Requests/min", value=random.randint(120, 350), unit="req/min", trend="up"),
        MetricValue(name="Avg Response Time", value=round(random.uniform(45, 120), 0), unit="ms", trend="down"),
        MetricValue(name="Error Rate", value=round(random.uniform(0.1, 0.8), 2), unit="%", trend="stable"),
        MetricValue(name="Cache Hit Rate", value=round(random.uniform(85, 98), 1), unit="%", trend="up"),
    ]


@router.get("/monitoring/containers", response_model=List[ContainerStatus])
async def get_container_status():
    """Return the container status (tries Docker, otherwise demo data).

    Names, state and uptime come from ``docker ps`` when available; CPU and
    memory figures are always random demo values.
    """
    import random

    # Try to get real Docker data
    try:
        # Run the blocking CLI call in a worker thread.
        result = await asyncio.to_thread(
            subprocess.run,
            ["docker", "ps", "--format", "{{.Names}}\t{{.Status}}\t{{.State}}"],
            capture_output=True, text=True, timeout=5,
        )
    except (OSError, subprocess.SubprocessError):
        # Docker is not installed or did not answer in time.
        result = None

    if result is not None and result.returncode == 0 and result.stdout.strip():
        containers = []
        for line in result.stdout.strip().split('\n'):
            parts = line.split('\t')
            if len(parts) < 3:
                continue
            name, status, state = parts[0], parts[1], parts[2]
            # Parse uptime from status like "Up 2 hours"
            uptime = status if "Up" in status else "N/A"
            containers.append(ContainerStatus(
                name=name,
                status=state,
                health="healthy" if state == "running" else "unhealthy",
                cpu_percent=round(random.uniform(0.5, 15), 1),
                memory_mb=round(random.uniform(50, 500), 0),
                uptime=uptime
            ))
        if containers:
            return containers

    # Fallback: demo data
    return [
        ContainerStatus(name="breakpilot-pwa-backend", status="running", health="healthy",
                        cpu_percent=round(random.uniform(2, 12), 1),
                        memory_mb=round(random.uniform(180, 280), 0), uptime="Up 4 hours"),
        ContainerStatus(name="breakpilot-pwa-consent-service", status="running", health="healthy",
                        cpu_percent=round(random.uniform(1, 8), 1),
                        memory_mb=round(random.uniform(80, 150), 0), uptime="Up 4 hours"),
        ContainerStatus(name="breakpilot-pwa-postgres", status="running", health="healthy",
                        cpu_percent=round(random.uniform(0.5, 5), 1),
                        memory_mb=round(random.uniform(120, 200), 0), uptime="Up 4 hours"),
        ContainerStatus(name="breakpilot-pwa-mailpit", status="running", health="healthy",
                        cpu_percent=round(random.uniform(0.1, 2), 1),
                        memory_mb=round(random.uniform(30, 60), 0), uptime="Up 4 hours"),
    ]


@router.get("/monitoring/services", response_model=List[ServiceStatus])
async def get_service_status():
    """Return the status of all services.

    Only the backend URL is actually requested; the other services are
    reported as healthy with a random response time (demo data).
    """
    import random

    services_to_check = [
        ("Backend API", "http://localhost:8000/api/consent/health"),
        ("Consent Service", "http://consent-service:8081/health"),
        ("School Service", "http://school-service:8084/health"),
        ("Klausur Service", "http://klausur-service:8086/health"),
    ]

    results = []
    for name, url in services_to_check:
        status = "healthy"
        response_time = random.randint(15, 150)

        # Try a real health check for the backend
        if "localhost:8000" in url:
            try:
                import httpx
                async with httpx.AsyncClient() as client:
                    start = datetime.now()
                    response = await client.get(url, timeout=5)
                    response_time = int((datetime.now() - start).total_seconds() * 1000)
                    status = "healthy" if response.status_code == 200 else "unhealthy"
            except Exception:
                # ``Exception`` rather than a bare except so that task
                # cancellation is not swallowed.
                status = "healthy"  # Assume healthy if we're running

        results.append(ServiceStatus(
            name=name,
            url=url,
            status=status,
            response_time_ms=response_time,
            last_check=datetime.now().isoformat()
        ))

    return results