""" SQLAlchemy Models fuer Test Registry. Definiert die Datenbank-Tabellen fuer persistente Test-Speicherung: - TestRunDB: Jeder Test-Durchlauf - TestResultDB: Einzelne Test-Ergebnisse - FailedTestBacklogDB: Persistenter Backlog fuer zu fixende Tests - TestFixHistoryDB: Historie aller Fix-Versuche - TestServiceStatsDB: Aggregierte Statistiken pro Service """ from datetime import datetime from sqlalchemy import ( Column, Integer, String, Float, Text, DateTime, Boolean, ForeignKey, UniqueConstraint, Index ) from sqlalchemy.orm import relationship # Nutze die gleiche Base wie Classroom Engine fuer konsistente Migrations from classroom_engine.database import Base class TestRunDB(Base): """ Speichert jeden Test-Durchlauf. Enthaelt Metadaten und Aggregat-Statistiken. """ __tablename__ = 'test_runs' id = Column(Integer, primary_key=True, autoincrement=True) run_id = Column(String(50), unique=True, nullable=False, index=True) service = Column(String(100), nullable=False, index=True) framework = Column(String(50), nullable=False) started_at = Column(DateTime, nullable=False, index=True) completed_at = Column(DateTime, nullable=True) status = Column(String(20), nullable=False) # queued, running, completed, failed total_tests = Column(Integer, default=0) passed_tests = Column(Integer, default=0) failed_tests = Column(Integer, default=0) skipped_tests = Column(Integer, default=0) duration_seconds = Column(Float, default=0) git_commit = Column(String(40), nullable=True) git_branch = Column(String(100), nullable=True) triggered_by = Column(String(50), nullable=True) # manual, ci, schedule output = Column(Text, nullable=True) created_at = Column(DateTime, default=datetime.utcnow) # Relationship zu einzelnen Test-Ergebnissen results = relationship("TestResultDB", back_populates="run", cascade="all, delete-orphan") def to_dict(self): return { "id": self.run_id, "run_id": self.run_id, "service": self.service, "framework": self.framework, "started_at": self.started_at.isoformat() if self.started_at else None, "completed_at": self.completed_at.isoformat() if self.completed_at else None, "status": self.status, "total_tests": self.total_tests, "passed_tests": self.passed_tests, "failed_tests": self.failed_tests, "skipped_tests": self.skipped_tests, "duration_seconds": self.duration_seconds, "git_commit": self.git_commit, "git_branch": self.git_branch, "triggered_by": self.triggered_by, } class TestResultDB(Base): """ Speichert einzelne Test-Ergebnisse pro Run. Ermoeglicht detaillierte Analyse fehlgeschlagener Tests. """ __tablename__ = 'test_results' id = Column(Integer, primary_key=True, autoincrement=True) run_id = Column(String(50), ForeignKey('test_runs.run_id', ondelete='CASCADE'), nullable=False, index=True) test_name = Column(String(500), nullable=False, index=True) test_file = Column(String(500), nullable=True) line_number = Column(Integer, nullable=True) status = Column(String(20), nullable=False, index=True) # passed, failed, skipped, error duration_ms = Column(Float, nullable=True) error_message = Column(Text, nullable=True) error_type = Column(String(100), nullable=True) output = Column(Text, nullable=True) created_at = Column(DateTime, default=datetime.utcnow) # Relationship zum Run run = relationship("TestRunDB", back_populates="results") def to_dict(self): return { "id": self.id, "run_id": self.run_id, "test_name": self.test_name, "test_file": self.test_file, "line_number": self.line_number, "status": self.status, "duration_ms": self.duration_ms, "error_message": self.error_message, "error_type": self.error_type, } class FailedTestBacklogDB(Base): """ Persistenter Backlog fuer fehlgeschlagene Tests. Aggregiert Fehler ueber mehrere Runs hinweg. """ __tablename__ = 'failed_tests_backlog' __table_args__ = ( UniqueConstraint('test_name', 'service', name='uq_backlog_test_service'), ) id = Column(Integer, primary_key=True, autoincrement=True) test_name = Column(String(500), nullable=False) test_file = Column(String(500), nullable=True) service = Column(String(100), nullable=False, index=True) framework = Column(String(50), nullable=True) error_message = Column(Text, nullable=True) error_type = Column(String(100), nullable=True) first_failed_at = Column(DateTime, nullable=False) last_failed_at = Column(DateTime, nullable=False) failure_count = Column(Integer, default=1) status = Column(String(30), default='open', index=True) # open, in_progress, fixed, wont_fix, flaky priority = Column(String(20), default='medium', index=True) # critical, high, medium, low assigned_to = Column(String(100), nullable=True) fix_suggestion = Column(Text, nullable=True) notes = Column(Text, nullable=True) # Resolution-Felder (auto-close wenn Tests bestehen) resolved_at = Column(DateTime, nullable=True) resolution_commit = Column(String(50), nullable=True) resolution_notes = Column(Text, nullable=True) created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) # Relationship zu Fix-Historie fixes = relationship("TestFixHistoryDB", back_populates="backlog_item", cascade="all, delete-orphan") def to_dict(self): return { "id": self.id, "test_name": self.test_name, "test_file": self.test_file, "service": self.service, "framework": self.framework, "error_message": self.error_message, "error_type": self.error_type, "first_failed_at": self.first_failed_at.isoformat() if self.first_failed_at else None, "last_failed_at": self.last_failed_at.isoformat() if self.last_failed_at else None, "failure_count": self.failure_count, "status": self.status, "priority": self.priority, "assigned_to": self.assigned_to, "fix_suggestion": self.fix_suggestion, "notes": self.notes, "resolved_at": self.resolved_at.isoformat() if self.resolved_at else None, "resolution_commit": self.resolution_commit, "resolution_notes": self.resolution_notes, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None, } class TestFixHistoryDB(Base): """ Historie aller Fix-Versuche fuer einen Backlog-Eintrag. Ermoeglicht Tracking von Auto-Fix und manuellen Fixes. """ __tablename__ = 'test_fixes_history' id = Column(Integer, primary_key=True, autoincrement=True) backlog_id = Column(Integer, ForeignKey('failed_tests_backlog.id', ondelete='CASCADE'), nullable=False, index=True) fix_type = Column(String(50), nullable=True) # manual, auto_claude, auto_script fix_description = Column(Text, nullable=True) commit_hash = Column(String(40), nullable=True) success = Column(Boolean, nullable=True) created_at = Column(DateTime, default=datetime.utcnow) # Relationship zum Backlog-Item backlog_item = relationship("FailedTestBacklogDB", back_populates="fixes") def to_dict(self): return { "id": self.id, "backlog_id": self.backlog_id, "fix_type": self.fix_type, "fix_description": self.fix_description, "commit_hash": self.commit_hash, "success": self.success, "created_at": self.created_at.isoformat() if self.created_at else None, } class TestServiceStatsDB(Base): """ Aggregierte Statistiken pro Service. Wird nach jedem Test-Run aktualisiert fuer schnelle Abfragen. """ __tablename__ = 'test_service_stats' id = Column(Integer, primary_key=True, autoincrement=True) service = Column(String(100), unique=True, nullable=False) total_tests = Column(Integer, default=0) passed_tests = Column(Integer, default=0) failed_tests = Column(Integer, default=0) skipped_tests = Column(Integer, default=0) pass_rate = Column(Float, default=0.0) last_run_id = Column(String(50), nullable=True) last_run_at = Column(DateTime, nullable=True) last_status = Column(String(20), nullable=True) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def to_dict(self): return { "service": self.service, "total_tests": self.total_tests, "passed_tests": self.passed_tests, "failed_tests": self.failed_tests, "skipped_tests": self.skipped_tests, "pass_rate": round(self.pass_rate, 1) if self.pass_rate else 0.0, "last_run_id": self.last_run_id, "last_run_at": self.last_run_at.isoformat() if self.last_run_at else None, "last_status": self.last_status, "updated_at": self.updated_at.isoformat() if self.updated_at else None, }