A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.
This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).
Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
501 lines
18 KiB
Python
"""
|
|
Repository fuer Test Registry Datenbank-Operationen.
|
|
|
|
Abstrahiert alle DB-Zugriffe fuer:
|
|
- Test-Runs speichern und abrufen
|
|
- Test-Ergebnisse verwalten
|
|
- Backlog-Items verwalten
|
|
- Service-Statistiken aktualisieren
|
|
"""
|
|
from datetime import datetime
|
|
from typing import List, Optional, Dict, Any
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import func, desc
|
|
|
|
from .db_models import (
|
|
TestRunDB,
|
|
TestResultDB,
|
|
FailedTestBacklogDB,
|
|
TestFixHistoryDB,
|
|
TestServiceStatsDB
|
|
)
|
|
|
|
|
|
class TestRepository:
    """Repository for test-related database operations.

    Abstracts all DB access for: storing/fetching test runs, managing
    test results, maintaining the failed-test backlog, and updating
    per-service statistics.
    """

    def __init__(self, db: Session):
        # Injected SQLAlchemy session; every query and commit in this
        # repository goes through it. The caller owns the session lifecycle.
        self.db = db
|
|
|
|
# ========================================
|
|
# Test Runs
|
|
# ========================================
|
|
|
|
def create_run(
    self,
    run_id: str,
    service: str,
    framework: str,
    triggered_by: str = "manual",
    git_commit: Optional[str] = None,
    git_branch: Optional[str] = None
) -> TestRunDB:
    """Create and persist a new test run in state "running"."""
    row_fields = dict(
        run_id=run_id,
        service=service,
        framework=framework,
        started_at=datetime.utcnow(),
        status="running",
        triggered_by=triggered_by,
        git_commit=git_commit,
        git_branch=git_branch,
    )
    new_run = TestRunDB(**row_fields)
    self.db.add(new_run)
    self.db.commit()
    # Refresh so DB-generated defaults (e.g. the primary key) are populated.
    self.db.refresh(new_run)
    return new_run
|
|
|
|
def complete_run(
    self,
    run_id: str,
    status: str,
    total_tests: int,
    passed_tests: int,
    failed_tests: int,
    skipped_tests: int = 0,
    duration_seconds: float = 0,
    output: Optional[str] = None
) -> Optional[TestRunDB]:
    """Mark a run as finished, store its totals, and refresh service stats.

    Returns the updated run, or None when no run with `run_id` exists.
    """
    run = self.db.query(TestRunDB).filter(TestRunDB.run_id == run_id).first()
    if run:
        run.completed_at = datetime.utcnow()
        run.status = status
        run.total_tests = total_tests
        run.passed_tests = passed_tests
        run.failed_tests = failed_tests
        run.skipped_tests = skipped_tests
        run.duration_seconds = duration_seconds
        # Cap stored output at 10k characters to keep rows small.
        run.output = output[:10000] if output else None  # Truncate output
        self.db.commit()
        self.db.refresh(run)

        # Roll the finished run's numbers into the service-level stats row
        # (commits again inside).
        self._update_service_stats(run)

    return run
|
|
|
|
def get_run(self, run_id: str) -> Optional[TestRunDB]:
    """Fetch a single run by its run_id, or None when absent."""
    matches = self.db.query(TestRunDB).filter(TestRunDB.run_id == run_id)
    return matches.first()
|
|
|
|
def get_runs(
    self,
    service: Optional[str] = None,
    limit: int = 50,
    offset: int = 0
) -> List[TestRunDB]:
    """Return test runs, newest first, optionally filtered by service."""
    q = self.db.query(TestRunDB)
    if service:
        q = q.filter(TestRunDB.service == service)
    q = q.order_by(desc(TestRunDB.started_at))
    return q.offset(offset).limit(limit).all()
|
|
|
|
def get_runs_count(self, service: Optional[str] = None) -> int:
    """Count test runs, optionally restricted to one service."""
    q = self.db.query(func.count(TestRunDB.id))
    if service:
        q = q.filter(TestRunDB.service == service)
    total = q.scalar()
    # scalar() is None on an empty result; report 0 instead.
    return total or 0
|
|
|
|
# ========================================
|
|
# Test Results
|
|
# ========================================
|
|
|
|
def add_results(self, run_id: str, results: List[Dict[str, Any]]) -> int:
    """Insert multiple test results for a run; returns how many were added.

    Each item in `results` is a parser dict; both "name"/"test_name" and
    "file_path"/"test_file" key variants are accepted. Failed or errored
    results additionally create/update a backlog entry. Commits once at
    the end for the whole batch.
    """
    count = 0
    for result in results:
        db_result = TestResultDB(
            run_id=run_id,
            test_name=result.get("name") or result.get("test_name", "unknown"),
            test_file=result.get("file_path") or result.get("test_file"),
            line_number=result.get("line_number"),
            status=result.get("status", "unknown"),
            duration_ms=result.get("duration_ms"),
            error_message=result.get("error_message"),
            error_type=result.get("error_type"),
            output=result.get("output")
        )
        self.db.add(db_result)
        count += 1

        # For failed tests: keep the failed-test backlog in sync
        # (_update_backlog does not commit; our commit below covers it).
        if result.get("status") in ["failed", "error"]:
            self._update_backlog(
                run_id=run_id,
                test_name=result.get("name") or result.get("test_name", "unknown"),
                test_file=result.get("file_path") or result.get("test_file"),
                error_message=result.get("error_message"),
                error_type=result.get("error_type"),
                suggestion=result.get("suggestion")
            )

    self.db.commit()
    return count
|
|
|
|
def get_results(self, run_id: str) -> List[TestResultDB]:
    """Return every stored result belonging to the given run."""
    q = self.db.query(TestResultDB)
    return q.filter(TestResultDB.run_id == run_id).all()
|
|
|
|
def get_failed_results(self, run_id: str) -> List[TestResultDB]:
    """Return only the failed/errored results of the given run."""
    failure_states = ["failed", "error"]
    q = self.db.query(TestResultDB).filter(
        TestResultDB.run_id == run_id,
        TestResultDB.status.in_(failure_states),
    )
    return q.all()
|
|
|
|
# ========================================
|
|
# Backlog
|
|
# ========================================
|
|
|
|
def _update_backlog(
    self,
    run_id: str,
    test_name: str,
    test_file: Optional[str],
    error_message: Optional[str],
    error_type: Optional[str],
    suggestion: Optional[str] = None
):
    """Create or update the backlog entry for one failed test.

    The entry is keyed by (test_name, service). Does NOT commit — the
    caller (add_results) commits the whole batch.
    """
    # Fetch the run to learn its service and framework.
    run = self.db.query(TestRunDB).filter(TestRunDB.run_id == run_id).first()
    if not run:
        return

    # Look for an existing backlog entry for this test in this service.
    backlog = self.db.query(FailedTestBacklogDB).filter(
        FailedTestBacklogDB.test_name == test_name,
        FailedTestBacklogDB.service == run.service
    ).first()

    now = datetime.utcnow()

    if backlog:
        # Entry already exists - refresh its failure bookkeeping.
        backlog.last_failed_at = now
        backlog.failure_count += 1
        # Keep the previous message/type when the new result carries none.
        backlog.error_message = error_message or backlog.error_message
        backlog.error_type = error_type or backlog.error_type
        if suggestion:
            backlog.fix_suggestion = suggestion
        # Re-open entries that were marked fixed but failed again.
        if backlog.status == "fixed":
            backlog.status = "open"
            # NOTE: notes text is user-facing (German) runtime data, kept as-is.
            backlog.notes = f"Erneut fehlgeschlagen nach Fix am {now.isoformat()}"
    else:
        # No entry yet - create a fresh one with derived priority.
        backlog = FailedTestBacklogDB(
            test_name=test_name,
            test_file=test_file,
            service=run.service,
            framework=run.framework,
            error_message=error_message,
            error_type=error_type,
            first_failed_at=now,
            last_failed_at=now,
            failure_count=1,
            status="open",
            priority=self._calculate_priority(error_type),
            fix_suggestion=suggestion
        )
        self.db.add(backlog)
|
|
|
|
def _calculate_priority(self, error_type: Optional[str]) -> str:
    """Derive a backlog priority ("high" or "medium") from an error type.

    The match is a case-insensitive substring test against known keyword
    lists. Unknown, empty, or missing error types default to "medium".
    """
    high_priority = ["nil_pointer", "panic", "security", "critical"]
    medium_priority = ["assertion", "type_error", "value_error"]

    if error_type:
        # Lowercase once instead of once per membership test.
        lowered = error_type.lower()
        if any(p in lowered for p in high_priority):
            return "high"
        if any(p in lowered for p in medium_priority):
            return "medium"
    # Fallback for unknown/missing types (same value as the medium branch,
    # kept separate so the classification intent stays readable).
    return "medium"
|
|
|
|
def get_backlog(
    self,
    status: Optional[str] = None,
    service: Optional[str] = None,
    priority: Optional[str] = None,
    limit: int = 100,
    offset: int = 0
) -> List[FailedTestBacklogDB]:
    """Return backlog entries, most-failing and most-recently-failed first."""
    q = self.db.query(FailedTestBacklogDB)

    # Apply each optional equality filter only when a truthy value was given.
    optional_filters = (
        (status, FailedTestBacklogDB.status),
        (service, FailedTestBacklogDB.service),
        (priority, FailedTestBacklogDB.priority),
    )
    for value, column in optional_filters:
        if value:
            q = q.filter(column == value)

    ordering = (
        desc(FailedTestBacklogDB.failure_count),
        desc(FailedTestBacklogDB.last_failed_at),
    )
    return q.order_by(*ordering).offset(offset).limit(limit).all()
|
|
|
|
def get_backlog_count(
    self,
    status: Optional[str] = None,
    service: Optional[str] = None
) -> int:
    """Count backlog entries matching the optional status/service filters."""
    counting = self.db.query(func.count(FailedTestBacklogDB.id))
    if status:
        counting = counting.filter(FailedTestBacklogDB.status == status)
    if service:
        counting = counting.filter(FailedTestBacklogDB.service == service)
    # scalar() is None on an empty result; report 0 instead.
    return counting.scalar() or 0
|
|
|
|
def get_backlog_item(self, backlog_id: int) -> Optional[FailedTestBacklogDB]:
    """Fetch one backlog entry by primary key, or None when absent."""
    q = self.db.query(FailedTestBacklogDB)
    return q.filter(FailedTestBacklogDB.id == backlog_id).first()
|
|
|
|
def update_backlog_status(
    self,
    backlog_id: int,
    status: str,
    notes: Optional[str] = None,
    assigned_to: Optional[str] = None
) -> Optional[FailedTestBacklogDB]:
    """Set the status (and optionally notes/assignee) of a backlog entry.

    Returns the updated entry, or None when the id is unknown.
    """
    entry = self.get_backlog_item(backlog_id)
    if entry is None:
        return None

    entry.status = status
    # Notes and assignee are only overwritten when a truthy value is given.
    if notes:
        entry.notes = notes
    if assigned_to:
        entry.assigned_to = assigned_to
    self.db.commit()
    self.db.refresh(entry)
    return entry
|
|
|
|
def update_backlog_priority(self, backlog_id: int, priority: str) -> Optional[FailedTestBacklogDB]:
    """Overwrite the priority of a backlog entry; None when the id is unknown."""
    entry = self.get_backlog_item(backlog_id)
    if entry is None:
        return None

    entry.priority = priority
    self.db.commit()
    self.db.refresh(entry)
    return entry
|
|
|
|
# ========================================
|
|
# Fix History
|
|
# ========================================
|
|
|
|
def add_fix_attempt(
    self,
    backlog_id: int,
    fix_type: str,
    fix_description: str,
    commit_hash: Optional[str] = None,
    success: bool = False
) -> TestFixHistoryDB:
    """Record a fix attempt in the history table.

    A successful attempt also flips the linked backlog entry to "fixed".
    Commits once for both changes.
    """
    fix = TestFixHistoryDB(
        backlog_id=backlog_id,
        fix_type=fix_type,
        fix_description=fix_description,
        commit_hash=commit_hash,
        success=success
    )
    self.db.add(fix)

    # On success: mark the backlog entry as fixed in the same transaction.
    if success:
        backlog = self.get_backlog_item(backlog_id)
        if backlog:
            backlog.status = "fixed"

    self.db.commit()
    self.db.refresh(fix)
    return fix
|
|
|
|
def get_fix_history(self, backlog_id: int) -> List[TestFixHistoryDB]:
    """Return all fix attempts for a backlog entry, newest first."""
    q = self.db.query(TestFixHistoryDB)
    q = q.filter(TestFixHistoryDB.backlog_id == backlog_id)
    return q.order_by(desc(TestFixHistoryDB.created_at)).all()
|
|
|
|
# ========================================
|
|
# Service Statistics
|
|
# ========================================
|
|
|
|
def _update_service_stats(self, run: TestRunDB):
    """Upsert the per-service stats row from a finished run.

    The stats reflect the LATEST run only (counters are overwritten, not
    accumulated). Commits the change.
    """
    stats = self.db.query(TestServiceStatsDB).filter(
        TestServiceStatsDB.service == run.service
    ).first()

    if not stats:
        # First run for this service: create the stats row.
        stats = TestServiceStatsDB(service=run.service)
        self.db.add(stats)

    stats.total_tests = run.total_tests
    stats.passed_tests = run.passed_tests
    stats.failed_tests = run.failed_tests
    stats.skipped_tests = run.skipped_tests
    # Guard against division by zero for runs that collected no tests.
    stats.pass_rate = (run.passed_tests / run.total_tests * 100) if run.total_tests > 0 else 0.0
    stats.last_run_id = run.run_id
    # completed_at can be unset if called on a still-open run; fall back to now.
    stats.last_run_at = run.completed_at or datetime.utcnow()
    stats.last_status = run.status

    self.db.commit()
|
|
|
|
def get_service_stats(self, service: str) -> Optional[TestServiceStatsDB]:
    """Fetch the statistics row for one service, or None when absent."""
    q = self.db.query(TestServiceStatsDB)
    return q.filter(TestServiceStatsDB.service == service).first()
|
|
|
|
def get_all_service_stats(self) -> List[TestServiceStatsDB]:
    """Return the statistics rows of every known service."""
    all_rows = self.db.query(TestServiceStatsDB).all()
    return all_rows
|
|
|
|
# ========================================
|
|
# History & Trends
|
|
# ========================================
|
|
|
|
def get_run_history(
    self,
    service: Optional[str] = None,
    days: int = 30,
    limit: int = 100
) -> List[Dict[str, Any]]:
    """Return per-day, per-service run aggregates for trend analysis.

    Groups runs from the last `days` days by calendar day and service,
    newest day first, and returns plain dicts with counts and pass rate.
    """
    from datetime import timedelta
    cutoff = datetime.utcnow() - timedelta(days=days)

    query = self.db.query(
        func.date(TestRunDB.started_at).label('date'),
        TestRunDB.service,
        func.count(TestRunDB.id).label('runs'),
        func.sum(TestRunDB.total_tests).label('total_tests'),
        func.sum(TestRunDB.passed_tests).label('passed'),
        func.sum(TestRunDB.failed_tests).label('failed')
    ).filter(TestRunDB.started_at >= cutoff)

    if service:
        query = query.filter(TestRunDB.service == service)

    results = query.group_by(
        func.date(TestRunDB.started_at),
        TestRunDB.service
    ).order_by(desc(func.date(TestRunDB.started_at))).limit(limit).all()

    rows: List[Dict[str, Any]] = []
    for r in results:
        # SQL SUM() yields NULL (-> None) when every summed value is NULL,
        # so normalize BEFORE doing arithmetic; previously pass_rate divided
        # the raw r.passed, which could raise TypeError on None.
        total = r.total_tests or 0
        passed = r.passed or 0
        rows.append({
            "date": str(r.date),
            "service": r.service,
            "runs": r.runs,
            "total_tests": total,
            "passed": passed,
            "failed": r.failed or 0,
            # Guard against division by zero for days with zero counted tests.
            "pass_rate": round((passed / total * 100) if total else 0, 1),
        })
    return rows
|
|
|
|
def get_summary_stats(self) -> Dict[str, Any]:
    """Aggregate the per-service stats rows into one overall summary dict."""
    stats = self.db.query(
        func.sum(TestServiceStatsDB.total_tests).label('total_tests'),
        func.sum(TestServiceStatsDB.passed_tests).label('passed'),
        func.sum(TestServiceStatsDB.failed_tests).label('failed'),
        func.sum(TestServiceStatsDB.skipped_tests).label('skipped'),
        func.count(TestServiceStatsDB.id).label('services_count')
    ).first()

    # SQL SUM() yields NULL (-> None) when the table is empty; normalize to 0.
    total = stats.total_tests or 0
    passed = stats.passed or 0

    return {
        "total_tests": total,
        "total_passed": passed,
        "total_failed": stats.failed or 0,
        "total_skipped": stats.skipped or 0,
        "services_count": stats.services_count or 0,
        # Guard against division by zero when no tests are recorded at all.
        "overall_pass_rate": round((passed / total * 100) if total > 0 else 0, 1)
    }
|
|
|
|
# ========================================
|
|
# Migration Helper
|
|
# ========================================
|
|
|
|
def migrate_from_json(self, persisted_results: Dict[str, Dict]) -> int:
    """Migrate legacy JSON-persisted results into the database.

    Runs once during upgrade. For each service: upserts the stats row and
    creates backlog entries for failed tests that are not yet tracked.
    Returns the number of services migrated. Commits once at the end.
    """
    count = 0
    for service, data in persisted_results.items():
        # Upsert the per-service stats row.
        stats = self.db.query(TestServiceStatsDB).filter(
            TestServiceStatsDB.service == service
        ).first()

        if not stats:
            stats = TestServiceStatsDB(service=service)
            self.db.add(stats)

        stats.total_tests = data.get("total", 0)
        stats.passed_tests = data.get("passed", 0)
        stats.failed_tests = data.get("failed", 0)
        # Guard against division by zero for services without tests.
        stats.pass_rate = (stats.passed_tests / stats.total_tests * 100) if stats.total_tests > 0 else 0.0

        last_run = data.get("last_run")
        if last_run:
            try:
                stats.last_run_at = datetime.fromisoformat(last_run)
            except (TypeError, ValueError):
                # Malformed or non-string timestamp in the legacy data:
                # fall back to "now" rather than aborting the migration.
                # (Was a bare `except:`, which also swallowed SystemExit
                # and KeyboardInterrupt.)
                stats.last_run_at = datetime.utcnow()

        stats.last_status = data.get("status", "unknown")

        # Move failed tests into the backlog, skipping ones already tracked.
        for failed in data.get("failed_test_ids", []):
            if isinstance(failed, dict):
                test_name = failed.get("id") or failed.get("name", "unknown")
                existing = self.db.query(FailedTestBacklogDB).filter(
                    FailedTestBacklogDB.test_name == test_name,
                    FailedTestBacklogDB.service == service
                ).first()

                if not existing:
                    backlog = FailedTestBacklogDB(
                        test_name=test_name,
                        test_file=failed.get("file_path"),
                        service=service,
                        error_message=failed.get("error_message"),
                        error_type=failed.get("error_type"),
                        first_failed_at=stats.last_run_at or datetime.utcnow(),
                        last_failed_at=stats.last_run_at or datetime.utcnow(),
                        failure_count=1,
                        status="open",
                        priority=self._calculate_priority(failed.get("error_type")),
                        fix_suggestion=failed.get("suggestion")
                    )
                    self.db.add(backlog)

        count += 1

    self.db.commit()
    return count
|