This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
breakpilot-pwa/backend/api/tests/repository.py
Benjamin Admin 21a844cb8a fix: Restore all files lost during destructive rebase
A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.

This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).

Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 09:51:32 +01:00

501 lines
18 KiB
Python

"""
Repository fuer Test Registry Datenbank-Operationen.
Abstrahiert alle DB-Zugriffe fuer:
- Test-Runs speichern und abrufen
- Test-Ergebnisse verwalten
- Backlog-Items verwalten
- Service-Statistiken aktualisieren
"""
from datetime import datetime
from typing import List, Optional, Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy import func, desc
from .db_models import (
TestRunDB,
TestResultDB,
FailedTestBacklogDB,
TestFixHistoryDB,
TestServiceStatsDB
)
class TestRepository:
"""Repository fuer Test-bezogene Datenbank-Operationen."""
def __init__(self, db: Session):
self.db = db
# ========================================
# Test Runs
# ========================================
def create_run(
self,
run_id: str,
service: str,
framework: str,
triggered_by: str = "manual",
git_commit: Optional[str] = None,
git_branch: Optional[str] = None
) -> TestRunDB:
"""Erstellt einen neuen Test-Run."""
run = TestRunDB(
run_id=run_id,
service=service,
framework=framework,
started_at=datetime.utcnow(),
status="running",
triggered_by=triggered_by,
git_commit=git_commit,
git_branch=git_branch
)
self.db.add(run)
self.db.commit()
self.db.refresh(run)
return run
def complete_run(
self,
run_id: str,
status: str,
total_tests: int,
passed_tests: int,
failed_tests: int,
skipped_tests: int = 0,
duration_seconds: float = 0,
output: Optional[str] = None
) -> Optional[TestRunDB]:
"""Markiert einen Run als abgeschlossen und aktualisiert Statistiken."""
run = self.db.query(TestRunDB).filter(TestRunDB.run_id == run_id).first()
if run:
run.completed_at = datetime.utcnow()
run.status = status
run.total_tests = total_tests
run.passed_tests = passed_tests
run.failed_tests = failed_tests
run.skipped_tests = skipped_tests
run.duration_seconds = duration_seconds
run.output = output[:10000] if output else None # Truncate output
self.db.commit()
self.db.refresh(run)
# Aktualisiere Service-Statistiken
self._update_service_stats(run)
return run
def get_run(self, run_id: str) -> Optional[TestRunDB]:
"""Holt einen Run anhand der ID."""
return self.db.query(TestRunDB).filter(TestRunDB.run_id == run_id).first()
def get_runs(
self,
service: Optional[str] = None,
limit: int = 50,
offset: int = 0
) -> List[TestRunDB]:
"""Holt Test-Runs mit optionalem Service-Filter."""
query = self.db.query(TestRunDB)
if service:
query = query.filter(TestRunDB.service == service)
return query.order_by(desc(TestRunDB.started_at)).offset(offset).limit(limit).all()
def get_runs_count(self, service: Optional[str] = None) -> int:
"""Zaehlt Test-Runs."""
query = self.db.query(func.count(TestRunDB.id))
if service:
query = query.filter(TestRunDB.service == service)
return query.scalar() or 0
# ========================================
# Test Results
# ========================================
def add_results(self, run_id: str, results: List[Dict[str, Any]]) -> int:
"""Fuegt mehrere Test-Ergebnisse hinzu."""
count = 0
for result in results:
db_result = TestResultDB(
run_id=run_id,
test_name=result.get("name") or result.get("test_name", "unknown"),
test_file=result.get("file_path") or result.get("test_file"),
line_number=result.get("line_number"),
status=result.get("status", "unknown"),
duration_ms=result.get("duration_ms"),
error_message=result.get("error_message"),
error_type=result.get("error_type"),
output=result.get("output")
)
self.db.add(db_result)
count += 1
# Bei fehlgeschlagenen Tests: Backlog aktualisieren
if result.get("status") in ["failed", "error"]:
self._update_backlog(
run_id=run_id,
test_name=result.get("name") or result.get("test_name", "unknown"),
test_file=result.get("file_path") or result.get("test_file"),
error_message=result.get("error_message"),
error_type=result.get("error_type"),
suggestion=result.get("suggestion")
)
self.db.commit()
return count
def get_results(self, run_id: str) -> List[TestResultDB]:
"""Holt alle Ergebnisse eines Runs."""
return self.db.query(TestResultDB).filter(TestResultDB.run_id == run_id).all()
def get_failed_results(self, run_id: str) -> List[TestResultDB]:
"""Holt nur fehlgeschlagene Ergebnisse eines Runs."""
return self.db.query(TestResultDB).filter(
TestResultDB.run_id == run_id,
TestResultDB.status.in_(["failed", "error"])
).all()
# ========================================
# Backlog
# ========================================
def _update_backlog(
self,
run_id: str,
test_name: str,
test_file: Optional[str],
error_message: Optional[str],
error_type: Optional[str],
suggestion: Optional[str] = None
):
"""Aktualisiert oder erstellt einen Backlog-Eintrag fuer einen fehlgeschlagenen Test."""
# Hole den Run um Service und Framework zu bekommen
run = self.db.query(TestRunDB).filter(TestRunDB.run_id == run_id).first()
if not run:
return
# Suche nach existierendem Backlog-Eintrag
backlog = self.db.query(FailedTestBacklogDB).filter(
FailedTestBacklogDB.test_name == test_name,
FailedTestBacklogDB.service == run.service
).first()
now = datetime.utcnow()
if backlog:
# Existiert bereits - aktualisiere
backlog.last_failed_at = now
backlog.failure_count += 1
backlog.error_message = error_message or backlog.error_message
backlog.error_type = error_type or backlog.error_type
if suggestion:
backlog.fix_suggestion = suggestion
# Reset status wenn es wieder fehlschlaegt
if backlog.status == "fixed":
backlog.status = "open"
backlog.notes = f"Erneut fehlgeschlagen nach Fix am {now.isoformat()}"
else:
# Neu erstellen
backlog = FailedTestBacklogDB(
test_name=test_name,
test_file=test_file,
service=run.service,
framework=run.framework,
error_message=error_message,
error_type=error_type,
first_failed_at=now,
last_failed_at=now,
failure_count=1,
status="open",
priority=self._calculate_priority(error_type),
fix_suggestion=suggestion
)
self.db.add(backlog)
def _calculate_priority(self, error_type: Optional[str]) -> str:
"""Berechnet Prioritaet basierend auf Fehlertyp."""
high_priority = ["nil_pointer", "panic", "security", "critical"]
medium_priority = ["assertion", "type_error", "value_error"]
if error_type:
if any(p in error_type.lower() for p in high_priority):
return "high"
if any(p in error_type.lower() for p in medium_priority):
return "medium"
return "medium"
def get_backlog(
self,
status: Optional[str] = None,
service: Optional[str] = None,
priority: Optional[str] = None,
limit: int = 100,
offset: int = 0
) -> List[FailedTestBacklogDB]:
"""Holt Backlog-Eintraege mit optionalen Filtern."""
query = self.db.query(FailedTestBacklogDB)
if status:
query = query.filter(FailedTestBacklogDB.status == status)
if service:
query = query.filter(FailedTestBacklogDB.service == service)
if priority:
query = query.filter(FailedTestBacklogDB.priority == priority)
return query.order_by(
desc(FailedTestBacklogDB.failure_count),
desc(FailedTestBacklogDB.last_failed_at)
).offset(offset).limit(limit).all()
def get_backlog_count(
self,
status: Optional[str] = None,
service: Optional[str] = None
) -> int:
"""Zaehlt Backlog-Eintraege."""
query = self.db.query(func.count(FailedTestBacklogDB.id))
if status:
query = query.filter(FailedTestBacklogDB.status == status)
if service:
query = query.filter(FailedTestBacklogDB.service == service)
return query.scalar() or 0
def get_backlog_item(self, backlog_id: int) -> Optional[FailedTestBacklogDB]:
"""Holt einen einzelnen Backlog-Eintrag."""
return self.db.query(FailedTestBacklogDB).filter(FailedTestBacklogDB.id == backlog_id).first()
def update_backlog_status(
self,
backlog_id: int,
status: str,
notes: Optional[str] = None,
assigned_to: Optional[str] = None
) -> Optional[FailedTestBacklogDB]:
"""Aktualisiert den Status eines Backlog-Eintrags."""
backlog = self.get_backlog_item(backlog_id)
if backlog:
backlog.status = status
if notes:
backlog.notes = notes
if assigned_to:
backlog.assigned_to = assigned_to
self.db.commit()
self.db.refresh(backlog)
return backlog
def update_backlog_priority(self, backlog_id: int, priority: str) -> Optional[FailedTestBacklogDB]:
"""Aktualisiert die Prioritaet eines Backlog-Eintrags."""
backlog = self.get_backlog_item(backlog_id)
if backlog:
backlog.priority = priority
self.db.commit()
self.db.refresh(backlog)
return backlog
# ========================================
# Fix History
# ========================================
def add_fix_attempt(
self,
backlog_id: int,
fix_type: str,
fix_description: str,
commit_hash: Optional[str] = None,
success: bool = False
) -> TestFixHistoryDB:
"""Fuegt einen Fix-Versuch zur Historie hinzu."""
fix = TestFixHistoryDB(
backlog_id=backlog_id,
fix_type=fix_type,
fix_description=fix_description,
commit_hash=commit_hash,
success=success
)
self.db.add(fix)
# Bei Erfolg: Backlog-Status aktualisieren
if success:
backlog = self.get_backlog_item(backlog_id)
if backlog:
backlog.status = "fixed"
self.db.commit()
self.db.refresh(fix)
return fix
def get_fix_history(self, backlog_id: int) -> List[TestFixHistoryDB]:
"""Holt die Fix-Historie fuer einen Backlog-Eintrag."""
return self.db.query(TestFixHistoryDB).filter(
TestFixHistoryDB.backlog_id == backlog_id
).order_by(desc(TestFixHistoryDB.created_at)).all()
# ========================================
# Service Statistics
# ========================================
def _update_service_stats(self, run: TestRunDB):
"""Aktualisiert die Service-Statistiken nach einem Run."""
stats = self.db.query(TestServiceStatsDB).filter(
TestServiceStatsDB.service == run.service
).first()
if not stats:
stats = TestServiceStatsDB(service=run.service)
self.db.add(stats)
stats.total_tests = run.total_tests
stats.passed_tests = run.passed_tests
stats.failed_tests = run.failed_tests
stats.skipped_tests = run.skipped_tests
stats.pass_rate = (run.passed_tests / run.total_tests * 100) if run.total_tests > 0 else 0.0
stats.last_run_id = run.run_id
stats.last_run_at = run.completed_at or datetime.utcnow()
stats.last_status = run.status
self.db.commit()
def get_service_stats(self, service: str) -> Optional[TestServiceStatsDB]:
"""Holt Statistiken fuer einen Service."""
return self.db.query(TestServiceStatsDB).filter(
TestServiceStatsDB.service == service
).first()
def get_all_service_stats(self) -> List[TestServiceStatsDB]:
"""Holt Statistiken fuer alle Services."""
return self.db.query(TestServiceStatsDB).all()
# ========================================
# History & Trends
# ========================================
def get_run_history(
self,
service: Optional[str] = None,
days: int = 30,
limit: int = 100
) -> List[Dict[str, Any]]:
"""
Holt die Run-Historie fuer Trend-Analysen.
Gruppiert nach Tag.
"""
from datetime import timedelta
cutoff = datetime.utcnow() - timedelta(days=days)
query = self.db.query(
func.date(TestRunDB.started_at).label('date'),
TestRunDB.service,
func.count(TestRunDB.id).label('runs'),
func.sum(TestRunDB.total_tests).label('total_tests'),
func.sum(TestRunDB.passed_tests).label('passed'),
func.sum(TestRunDB.failed_tests).label('failed')
).filter(TestRunDB.started_at >= cutoff)
if service:
query = query.filter(TestRunDB.service == service)
results = query.group_by(
func.date(TestRunDB.started_at),
TestRunDB.service
).order_by(desc(func.date(TestRunDB.started_at))).limit(limit).all()
return [
{
"date": str(r.date),
"service": r.service,
"runs": r.runs,
"total_tests": r.total_tests or 0,
"passed": r.passed or 0,
"failed": r.failed or 0,
"pass_rate": round((r.passed / r.total_tests * 100) if r.total_tests else 0, 1)
}
for r in results
]
def get_summary_stats(self) -> Dict[str, Any]:
"""Holt aggregierte Statistiken ueber alle Services."""
stats = self.db.query(
func.sum(TestServiceStatsDB.total_tests).label('total_tests'),
func.sum(TestServiceStatsDB.passed_tests).label('passed'),
func.sum(TestServiceStatsDB.failed_tests).label('failed'),
func.sum(TestServiceStatsDB.skipped_tests).label('skipped'),
func.count(TestServiceStatsDB.id).label('services_count')
).first()
total = stats.total_tests or 0
passed = stats.passed or 0
return {
"total_tests": total,
"total_passed": passed,
"total_failed": stats.failed or 0,
"total_skipped": stats.skipped or 0,
"services_count": stats.services_count or 0,
"overall_pass_rate": round((passed / total * 100) if total > 0 else 0, 1)
}
# ========================================
# Migration Helper
# ========================================
def migrate_from_json(self, persisted_results: Dict[str, Dict]) -> int:
"""
Migriert bestehende JSON-Daten in die Datenbank.
Wird einmalig beim Upgrade ausgefuehrt.
"""
count = 0
for service, data in persisted_results.items():
# Service-Stats aktualisieren
stats = self.db.query(TestServiceStatsDB).filter(
TestServiceStatsDB.service == service
).first()
if not stats:
stats = TestServiceStatsDB(service=service)
self.db.add(stats)
stats.total_tests = data.get("total", 0)
stats.passed_tests = data.get("passed", 0)
stats.failed_tests = data.get("failed", 0)
stats.pass_rate = (stats.passed_tests / stats.total_tests * 100) if stats.total_tests > 0 else 0.0
last_run = data.get("last_run")
if last_run:
try:
stats.last_run_at = datetime.fromisoformat(last_run)
except:
stats.last_run_at = datetime.utcnow()
stats.last_status = data.get("status", "unknown")
# Fehlgeschlagene Tests ins Backlog
for failed in data.get("failed_test_ids", []):
if isinstance(failed, dict):
test_name = failed.get("id") or failed.get("name", "unknown")
existing = self.db.query(FailedTestBacklogDB).filter(
FailedTestBacklogDB.test_name == test_name,
FailedTestBacklogDB.service == service
).first()
if not existing:
backlog = FailedTestBacklogDB(
test_name=test_name,
test_file=failed.get("file_path"),
service=service,
error_message=failed.get("error_message"),
error_type=failed.get("error_type"),
first_failed_at=stats.last_run_at or datetime.utcnow(),
last_failed_at=stats.last_run_at or datetime.utcnow(),
failure_count=1,
status="open",
priority=self._calculate_priority(failed.get("error_type")),
fix_suggestion=failed.get("suggestion")
)
self.db.add(backlog)
count += 1
self.db.commit()
return count