""" Python Test Runner (pytest) Fuehrt Python-Tests aus und parsed die Ergebnisse. """ import subprocess import json import re from datetime import datetime from pathlib import Path from typing import Dict, List, Optional from dataclasses import dataclass, field @dataclass class PytestResult: """Ergebnis eines einzelnen pytest-Tests""" node_id: str test_name: str file_path: str passed: bool duration_seconds: float output: str = "" error_message: Optional[str] = None @dataclass class PytestSummary: """Zusammenfassung eines pytest-Runs""" total: int = 0 passed: int = 0 failed: int = 0 skipped: int = 0 errors: int = 0 duration_seconds: float = 0.0 coverage_percent: Optional[float] = None results: List[PytestResult] = field(default_factory=list) raw_output: str = "" class PytestRunner: """ Runner fuer Python-Tests mit pytest. Verwendet `pytest --json-report` fuer strukturierte Ausgabe. """ def __init__(self, base_path: Path, venv_path: Optional[Path] = None): self.base_path = base_path self.venv_path = venv_path def _get_python_cmd(self) -> str: """Gibt den Python-Befehl zurueck (aus venv wenn vorhanden)""" if self.venv_path and (self.venv_path / "bin" / "python").exists(): return str(self.venv_path / "bin" / "python") return "python" async def run(self, with_coverage: bool = True, timeout: int = 300) -> PytestSummary: """ Fuehrt pytest aus. Args: with_coverage: Coverage erfassen mit pytest-cov timeout: Timeout in Sekunden Returns: PytestSummary mit allen Ergebnissen """ if not self.base_path.exists(): return PytestSummary(raw_output="Pfad existiert nicht") python_cmd = self._get_python_cmd() cmd = [python_cmd, "-m", "pytest", "-v", "--tb=short"] if with_coverage: cmd.extend(["--cov=.", "--cov-report=term-missing"]) cmd.append(str(self.base_path)) try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout, ) return self._parse_output(result.stdout, result.stderr) except subprocess.TimeoutExpired: return PytestSummary(raw_output=f"Timeout nach {timeout} Sekunden") except FileNotFoundError: return PytestSummary(raw_output="Python/pytest nicht installiert") except Exception as e: return PytestSummary(raw_output=str(e)) def _parse_output(self, stdout: str, stderr: str) -> PytestSummary: """Parsed die pytest-Ausgabe""" output = stdout + stderr summary = PytestSummary(raw_output=output[:10000]) # Teste-Zeilen parsen (z.B. "test_file.py::test_name PASSED") test_pattern = re.compile(r"([\w/]+\.py)::(\w+)(?:\[.+\])?\s+(PASSED|FAILED|SKIPPED|ERROR)") for match in test_pattern.finditer(output): file_path, test_name, status = match.groups() result = PytestResult( node_id=f"{file_path}::{test_name}", test_name=test_name, file_path=file_path, passed=status == "PASSED", duration_seconds=0.0, ) summary.results.append(result) if status == "PASSED": summary.passed += 1 elif status == "FAILED": summary.failed += 1 elif status == "SKIPPED": summary.skipped += 1 elif status == "ERROR": summary.errors += 1 summary.total = len(summary.results) # Zusammenfassung parsen (z.B. "5 passed, 2 failed in 3.45s") summary_pattern = re.compile( r"=+\s*(?:(\d+)\s+passed)?[,\s]*(?:(\d+)\s+failed)?[,\s]*(?:(\d+)\s+skipped)?[,\s]*(?:(\d+)\s+error)?.*?in\s+([\d.]+)s" ) match = summary_pattern.search(output) if match: if match.group(1): summary.passed = int(match.group(1)) if match.group(2): summary.failed = int(match.group(2)) if match.group(3): summary.skipped = int(match.group(3)) if match.group(4): summary.errors = int(match.group(4)) if match.group(5): summary.duration_seconds = float(match.group(5)) summary.total = summary.passed + summary.failed + summary.skipped + summary.errors # Coverage parsen (z.B. "TOTAL 1234 567 54%") coverage_pattern = re.compile(r"TOTAL\s+\d+\s+\d+\s+(\d+)%") coverage_match = coverage_pattern.search(output) if coverage_match: summary.coverage_percent = float(coverage_match.group(1)) return summary async def run_single_test(self, test_path: str, timeout: int = 60) -> PytestResult: """ Fuehrt einen einzelnen Test aus. Args: test_path: Pfad zum Test (z.B. "test_file.py::test_name") timeout: Timeout in Sekunden Returns: PytestResult fuer den spezifischen Test """ python_cmd = self._get_python_cmd() cmd = [python_cmd, "-m", "pytest", "-v", test_path] try: result = subprocess.run( cmd, cwd=str(self.base_path), capture_output=True, text=True, timeout=timeout, ) passed = "passed" in result.stdout.lower() and "failed" not in result.stdout.lower() return PytestResult( node_id=test_path, test_name=test_path.split("::")[-1] if "::" in test_path else test_path, file_path=test_path.split("::")[0] if "::" in test_path else test_path, passed=passed, duration_seconds=0.0, output=result.stdout + result.stderr, ) except Exception as e: return PytestResult( node_id=test_path, test_name=test_path, file_path="", passed=False, duration_seconds=0.0, output=str(e), ) async def get_coverage_report(self, format: str = "term") -> Optional[Dict]: """ Generiert einen Coverage-Bericht. Args: format: "term", "html", oder "xml" Returns: Dict mit Coverage-Details oder None """ python_cmd = self._get_python_cmd() cmd = [python_cmd, "-m", "pytest", "--cov=.", f"--cov-report={format}"] try: result = subprocess.run( cmd, cwd=str(self.base_path), capture_output=True, text=True, timeout=120, ) # Parse "TOTAL" Zeile coverage_pattern = re.compile(r"TOTAL\s+\d+\s+\d+\s+(\d+)%") match = coverage_pattern.search(result.stdout) if match: return { "total_coverage": float(match.group(1)), "format": format, "raw_output": result.stdout, } except Exception: pass return None async def list_tests(self) -> List[str]: """ Listet alle verfuegbaren Tests auf. Returns: Liste von Test-IDs """ python_cmd = self._get_python_cmd() cmd = [python_cmd, "-m", "pytest", "--collect-only", "-q"] try: result = subprocess.run( cmd, cwd=str(self.base_path), capture_output=True, text=True, timeout=30, ) tests = [] for line in result.stdout.split("\n"): line = line.strip() if "::" in line and not line.startswith("<"): tests.append(line) return tests except Exception: return []