"""
Regression Tracker

Tracks test scores over time to detect quality regressions.
"""

import json
import sqlite3
import subprocess
from contextlib import closing
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import structlog

from bqas.config import BQASConfig
from bqas.metrics import BQASMetrics

logger = structlog.get_logger(__name__)

# Single source of truth for the SELECT column order, so _row_to_run's
# positional unpacking can never drift out of sync with the queries.
_RUN_COLUMNS = (
    "id, timestamp, git_commit, git_branch, golden_score, synthetic_score, "
    "total_tests, passed_tests, failed_tests, failures, duration_seconds, metadata"
)


@dataclass
class TestRun:
    """Record of a single test run."""

    id: Optional[int] = None
    # Naive UTC timestamp (datetime.utcnow) — must stay naive so the
    # isoformat strings stored in SQLite compare consistently with old rows.
    timestamp: datetime = field(default_factory=datetime.utcnow)
    git_commit: str = ""
    git_branch: str = ""
    golden_score: float = 0.0
    synthetic_score: float = 0.0
    total_tests: int = 0
    passed_tests: int = 0
    failed_tests: int = 0
    failures: List[str] = field(default_factory=list)
    duration_seconds: float = 0.0
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        # Backward compatibility: callers may still pass explicit None for
        # these fields and expect the historical defaulting behaviour.
        if self.timestamp is None:
            self.timestamp = datetime.utcnow()
        if self.failures is None:
            self.failures = []
        if self.metadata is None:
            self.metadata = {}


class RegressionTracker:
    """
    Tracks BQAS test scores over time.

    Features:
    - SQLite persistence
    - Regression detection
    - Trend analysis
    - Alerting
    """

    def __init__(self, config: Optional[BQASConfig] = None):
        """Open (and if needed create) the score database at config.db_path."""
        self.config = config or BQASConfig.from_env()
        self.db_path = Path(self.config.db_path)
        self._init_db()

    def _init_db(self) -> None:
        """Create the test_runs table and timestamp index if missing."""
        # closing() guarantees the connection is released even if DDL fails,
        # where the original bare connect/close leaked on exception.
        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS test_runs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    git_commit TEXT,
                    git_branch TEXT,
                    golden_score REAL,
                    synthetic_score REAL,
                    total_tests INTEGER,
                    passed_tests INTEGER,
                    failed_tests INTEGER,
                    failures TEXT,
                    duration_seconds REAL,
                    metadata TEXT
                )
            """)
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_timestamp ON test_runs(timestamp)
            """)
            conn.commit()

    def _get_git_info(self) -> Tuple[str, str]:
        """Return (short commit hash, branch name), or ("unknown", "unknown").

        Best-effort: git may be absent or we may be outside a work tree, in
        which case the sentinel pair is returned rather than raising.
        """
        try:
            commit = subprocess.check_output(
                ["git", "rev-parse", "HEAD"],
                stderr=subprocess.DEVNULL,
            ).decode().strip()[:8]
            branch = subprocess.check_output(
                ["git", "rev-parse", "--abbrev-ref", "HEAD"],
                stderr=subprocess.DEVNULL,
            ).decode().strip()
            return commit, branch
        except Exception:
            return "unknown", "unknown"

    @staticmethod
    def _row_to_run(row: Tuple) -> TestRun:
        """Convert a test_runs row (in _RUN_COLUMNS order) into a TestRun."""
        return TestRun(
            id=row[0],
            timestamp=datetime.fromisoformat(row[1]),
            git_commit=row[2],
            git_branch=row[3],
            golden_score=row[4],
            synthetic_score=row[5],
            total_tests=row[6],
            passed_tests=row[7],
            failed_tests=row[8],
            failures=json.loads(row[9]) if row[9] else [],
            duration_seconds=row[10],
            metadata=json.loads(row[11]) if row[11] else {},
        )

    def record_run(self, metrics: BQASMetrics, synthetic_score: float = 0.0) -> TestRun:
        """
        Record a test run.

        Args:
            metrics: Aggregated metrics from the test run
            synthetic_score: Optional synthetic test score

        Returns:
            Recorded TestRun (with its database id populated)
        """
        git_commit, git_branch = self._get_git_info()
        run = TestRun(
            timestamp=metrics.timestamp,
            git_commit=git_commit,
            git_branch=git_branch,
            golden_score=metrics.avg_composite_score,
            synthetic_score=synthetic_score,
            total_tests=metrics.total_tests,
            passed_tests=metrics.passed_tests,
            failed_tests=metrics.failed_tests,
            failures=metrics.failed_test_ids,
            duration_seconds=metrics.total_duration_ms / 1000,
            metadata={"scores_by_intent": metrics.scores_by_intent},
        )
        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO test_runs (
                    timestamp, git_commit, git_branch, golden_score, synthetic_score,
                    total_tests, passed_tests, failed_tests, failures,
                    duration_seconds, metadata
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                run.timestamp.isoformat(),
                run.git_commit,
                run.git_branch,
                run.golden_score,
                run.synthetic_score,
                run.total_tests,
                run.passed_tests,
                run.failed_tests,
                json.dumps(run.failures),
                run.duration_seconds,
                json.dumps(run.metadata),
            ))
            run.id = cursor.lastrowid
            conn.commit()
        logger.info(
            "Test run recorded",
            run_id=run.id,
            score=run.golden_score,
            passed=run.passed_tests,
            failed=run.failed_tests,
        )
        return run

    def get_last_runs(self, n: int = 5) -> List[TestRun]:
        """Get the last N test runs, most recent first."""
        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()
            # _RUN_COLUMNS is a module-level constant, not user input,
            # so interpolating it into the SQL is safe.
            cursor.execute(
                f"SELECT {_RUN_COLUMNS} FROM test_runs "
                "ORDER BY timestamp DESC LIMIT ?",
                (n,),
            )
            return [self._row_to_run(row) for row in cursor.fetchall()]

    def get_runs_since(self, days: int = 30) -> List[TestRun]:
        """Get all runs in the last N days, oldest first."""
        since = datetime.utcnow() - timedelta(days=days)
        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"SELECT {_RUN_COLUMNS} FROM test_runs "
                "WHERE timestamp >= ? ORDER BY timestamp ASC",
                (since.isoformat(),),
            )
            return [self._row_to_run(row) for row in cursor.fetchall()]

    def check_regression(
        self,
        current_score: float,
        threshold: Optional[float] = None,
    ) -> Tuple[bool, float, str]:
        """
        Check if current score indicates a regression.

        Args:
            current_score: Current test run score
            threshold: Optional threshold override

        Returns:
            (is_regression, delta, message)
        """
        # `is None` (not `or`) so an explicit threshold=0.0 override is honoured
        # instead of silently falling back to the configured default.
        if threshold is None:
            threshold = self.config.regression_threshold
        last_runs = self.get_last_runs(n=5)
        if len(last_runs) < 2:
            return False, 0.0, "Not enough historical data"

        # Compare against the average of the recent runs, not a single run,
        # to avoid flagging one-off noise as a regression.
        avg_score = sum(r.golden_score for r in last_runs) / len(last_runs)
        delta = avg_score - current_score

        if delta > threshold:
            msg = f"Regression detected: score dropped from {avg_score:.3f} to {current_score:.3f} (delta: {delta:.3f})"
            logger.warning(msg)
            return True, delta, msg

        return False, delta, f"Score stable: {current_score:.3f} (avg: {avg_score:.3f}, delta: {delta:.3f})"

    def get_trend(self, days: int = 30) -> Dict[str, Any]:
        """
        Get score trend for the last N days.

        Returns:
            Dictionary with dates, scores, and trend direction
        """
        runs = self.get_runs_since(days)
        if not runs:
            return {
                "dates": [],
                "scores": [],
                "trend": "unknown",
                "avg_score": 0.0,
            }

        dates = [r.timestamp.isoformat() for r in runs]
        scores = [r.golden_score for r in runs]
        avg_score = sum(scores) / len(scores)

        # Compare the first-three vs last-three averages; a 0.05 dead band
        # keeps minor score jitter from flipping the trend label.
        # NOTE: with 3-5 runs the windows overlap, dampening the signal.
        if len(scores) >= 3:
            recent_avg = sum(scores[-3:]) / 3
            older_avg = sum(scores[:3]) / 3
            if recent_avg > older_avg + 0.05:
                trend = "improving"
            elif recent_avg < older_avg - 0.05:
                trend = "declining"
            else:
                trend = "stable"
        else:
            trend = "insufficient_data"

        return {
            "dates": dates,
            "scores": scores,
            "trend": trend,
            "avg_score": round(avg_score, 3),
            "min_score": round(min(scores), 3),
            "max_score": round(max(scores), 3),
        }

    def get_failing_intents(self, n: int = 5) -> Dict[str, float]:
        """Return average score per intent over the last n runs, worst first."""
        runs = self.get_last_runs(n)
        intent_scores: Dict[str, List[float]] = {}
        for run in runs:
            if "scores_by_intent" in run.metadata:
                for intent, score in run.metadata["scores_by_intent"].items():
                    intent_scores.setdefault(intent, []).append(score)

        avg_scores = {
            intent: sum(scores) / len(scores)
            for intent, scores in intent_scores.items()
        }

        # Sorted ascending by average score: worst intents come first.
        return dict(sorted(avg_scores.items(), key=lambda x: x[1]))