Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 25s
CI / test-go-edu-search (push) Successful in 26s
CI / test-python-klausur (push) Failing after 1m55s
CI / test-python-agent-core (push) Successful in 16s
CI / test-nodejs-website (push) Successful in 18s
- Voice-Service von Core nach Lehrer verschoben (bp-lehrer-voice-service) - 4 Jitsi-Services + 2 Synapse-Services in docker-compose.yml aufgenommen - Camunda komplett gelöscht: workflow pages, workflow-config.ts, bpmn-js deps - CAMUNDA_URL aus backend-lehrer environment entfernt - Sidebar: Kategorie "Compliance SDK" + "Katalogverwaltung" entfernt - Sidebar: Neue Kategorie "Kommunikation" mit Video & Chat, Voice Service, Alerts Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
341 lines
10 KiB
Python
341 lines
10 KiB
Python
"""
|
|
Regression Tracker
|
|
Tracks test scores over time to detect quality regressions
|
|
"""
|
|
import json
import sqlite3
import subprocess
from contextlib import closing
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import structlog

from bqas.config import BQASConfig
from bqas.metrics import BQASMetrics
|
|
|
|
# Module-level structured logger, named after this module.
logger = structlog.get_logger(__name__)
|
|
|
|
|
|
@dataclass
class TestRun:
    """Record of a single test run (one row in the test_runs table)."""

    id: Optional[int] = None  # database rowid; None until the run is persisted
    timestamp: datetime = field(default_factory=datetime.utcnow)  # naive UTC
    git_commit: str = ""  # short (8-char) commit hash, "" / "unknown" if absent
    git_branch: str = ""
    golden_score: float = 0.0  # avg composite score over golden tests
    synthetic_score: float = 0.0  # optional synthetic-suite score
    total_tests: int = 0
    passed_tests: int = 0
    failed_tests: int = 0
    failures: List[str] = field(default_factory=list)  # ids of failed tests
    duration_seconds: float = 0.0
    metadata: Dict[str, Any] = field(default_factory=dict)  # e.g. scores_by_intent

    def __post_init__(self):
        # default_factory already covers omitted arguments; this coercion is
        # kept so callers that explicitly pass None (e.g. a None metrics
        # timestamp) still get usable values instead of None.
        if self.timestamp is None:
            self.timestamp = datetime.utcnow()
        if self.failures is None:
            self.failures = []
        if self.metadata is None:
            self.metadata = {}
|
|
|
|
|
|
class RegressionTracker:
    """
    Tracks BQAS test scores over time.

    Features:
    - SQLite persistence
    - Regression detection
    - Trend analysis
    - Alerting
    """

    def __init__(self, config: Optional[BQASConfig] = None):
        """
        Args:
            config: Optional configuration; when omitted, it is loaded from
                the environment via BQASConfig.from_env().
        """
        self.config = config or BQASConfig.from_env()
        self.db_path = Path(self.config.db_path)
        self._init_db()

    def _init_db(self):
        """Initialize the SQLite database (idempotent: CREATE ... IF NOT EXISTS)."""
        # closing() guarantees the connection is released even if a
        # statement raises; the bare connect/close pairs used previously
        # leaked the handle on any exception.
        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()

            cursor.execute("""
                CREATE TABLE IF NOT EXISTS test_runs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    git_commit TEXT,
                    git_branch TEXT,
                    golden_score REAL,
                    synthetic_score REAL,
                    total_tests INTEGER,
                    passed_tests INTEGER,
                    failed_tests INTEGER,
                    failures TEXT,
                    duration_seconds REAL,
                    metadata TEXT
                )
            """)

            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_timestamp
                ON test_runs(timestamp)
            """)

            conn.commit()

    def _get_git_info(self) -> Tuple[str, str]:
        """
        Get current git commit (shortened to 8 chars) and branch.

        Returns:
            (commit, branch), or ("unknown", "unknown") when git is
            unavailable or the working directory is not a repository.
        """
        try:
            commit = subprocess.check_output(
                ["git", "rev-parse", "HEAD"],
                stderr=subprocess.DEVNULL,
            ).decode().strip()[:8]

            branch = subprocess.check_output(
                ["git", "rev-parse", "--abbrev-ref", "HEAD"],
                stderr=subprocess.DEVNULL,
            ).decode().strip()

            return commit, branch
        except Exception:
            # Best-effort: a missing git binary or non-repo CWD must not
            # prevent a run from being recorded.
            return "unknown", "unknown"

    @staticmethod
    def _row_to_run(row: Tuple) -> TestRun:
        """Deserialize a test_runs row (in SELECT column order) into a TestRun."""
        return TestRun(
            id=row[0],
            timestamp=datetime.fromisoformat(row[1]),
            git_commit=row[2],
            git_branch=row[3],
            golden_score=row[4],
            synthetic_score=row[5],
            total_tests=row[6],
            passed_tests=row[7],
            failed_tests=row[8],
            failures=json.loads(row[9]) if row[9] else [],
            duration_seconds=row[10],
            metadata=json.loads(row[11]) if row[11] else {},
        )

    def record_run(self, metrics: BQASMetrics, synthetic_score: float = 0.0) -> TestRun:
        """
        Record a test run.

        Args:
            metrics: Aggregated metrics from the test run
            synthetic_score: Optional synthetic test score

        Returns:
            Recorded TestRun, with its database id populated
        """
        git_commit, git_branch = self._get_git_info()

        run = TestRun(
            timestamp=metrics.timestamp,
            git_commit=git_commit,
            git_branch=git_branch,
            golden_score=metrics.avg_composite_score,
            synthetic_score=synthetic_score,
            total_tests=metrics.total_tests,
            passed_tests=metrics.passed_tests,
            failed_tests=metrics.failed_tests,
            failures=metrics.failed_test_ids,
            duration_seconds=metrics.total_duration_ms / 1000,
            metadata={"scores_by_intent": metrics.scores_by_intent},
        )

        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()
            # Lists/dicts are stored as JSON text; _row_to_run reverses this.
            cursor.execute("""
                INSERT INTO test_runs (
                    timestamp, git_commit, git_branch, golden_score,
                    synthetic_score, total_tests, passed_tests, failed_tests,
                    failures, duration_seconds, metadata
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                run.timestamp.isoformat(),
                run.git_commit,
                run.git_branch,
                run.golden_score,
                run.synthetic_score,
                run.total_tests,
                run.passed_tests,
                run.failed_tests,
                json.dumps(run.failures),
                run.duration_seconds,
                json.dumps(run.metadata),
            ))
            run.id = cursor.lastrowid
            conn.commit()

        logger.info(
            "Test run recorded",
            run_id=run.id,
            score=run.golden_score,
            passed=run.passed_tests,
            failed=run.failed_tests,
        )

        return run

    def get_last_runs(self, n: int = 5) -> List[TestRun]:
        """Get the last N test runs, newest first."""
        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, timestamp, git_commit, git_branch, golden_score,
                       synthetic_score, total_tests, passed_tests, failed_tests,
                       failures, duration_seconds, metadata
                FROM test_runs
                ORDER BY timestamp DESC
                LIMIT ?
            """, (n,))
            return [self._row_to_run(row) for row in cursor.fetchall()]

    def get_runs_since(self, days: int = 30) -> List[TestRun]:
        """Get all runs in the last N days, oldest first."""
        # Timestamps are stored as naive UTC ISO strings (datetime.utcnow()),
        # so the cutoff must also be naive UTC for the TEXT comparison to work.
        since = datetime.utcnow() - timedelta(days=days)

        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, timestamp, git_commit, git_branch, golden_score,
                       synthetic_score, total_tests, passed_tests, failed_tests,
                       failures, duration_seconds, metadata
                FROM test_runs
                WHERE timestamp >= ?
                ORDER BY timestamp ASC
            """, (since.isoformat(),))
            return [self._row_to_run(row) for row in cursor.fetchall()]

    def check_regression(
        self,
        current_score: float,
        threshold: Optional[float] = None,
    ) -> Tuple[bool, float, str]:
        """
        Check if current score indicates a regression.

        Args:
            current_score: Current test run score
            threshold: Optional threshold override; an explicit 0.0 is
                honored, None falls back to config.regression_threshold

        Returns:
            (is_regression, delta, message)
        """
        # `threshold or self.config...` would silently discard an explicit
        # 0.0 override, so test for None instead.
        if threshold is None:
            threshold = self.config.regression_threshold
        last_runs = self.get_last_runs(n=5)

        if len(last_runs) < 2:
            return False, 0.0, "Not enough historical data"

        # Compare against the mean of the recent runs rather than the single
        # latest run to dampen noise from one-off flaky results.
        avg_score = sum(r.golden_score for r in last_runs) / len(last_runs)
        delta = avg_score - current_score

        if delta > threshold:
            msg = f"Regression detected: score dropped from {avg_score:.3f} to {current_score:.3f} (delta: {delta:.3f})"
            logger.warning(msg)
            return True, delta, msg

        return False, delta, f"Score stable: {current_score:.3f} (avg: {avg_score:.3f}, delta: {delta:.3f})"

    def get_trend(self, days: int = 30) -> Dict[str, Any]:
        """
        Get score trend for the last N days.

        Returns:
            Dictionary with dates, scores, trend direction, and avg score
            (plus min/max scores when at least one run exists)
        """
        runs = self.get_runs_since(days)

        if not runs:
            return {
                "dates": [],
                "scores": [],
                "trend": "unknown",
                "avg_score": 0.0,
            }

        dates = [r.timestamp.isoformat() for r in runs]
        scores = [r.golden_score for r in runs]
        avg_score = sum(scores) / len(scores)

        # Trend: mean of the newest 3 scores vs mean of the oldest 3, with a
        # 0.05 dead band. NOTE(review): for 3-5 runs the two windows overlap,
        # which biases the result toward "stable" — confirm this is intended.
        if len(scores) >= 3:
            recent = scores[-3:]
            older = scores[:3]
            recent_avg = sum(recent) / len(recent)
            older_avg = sum(older) / len(older)

            if recent_avg > older_avg + 0.05:
                trend = "improving"
            elif recent_avg < older_avg - 0.05:
                trend = "declining"
            else:
                trend = "stable"
        else:
            trend = "insufficient_data"

        return {
            "dates": dates,
            "scores": scores,
            "trend": trend,
            "avg_score": round(avg_score, 3),
            "min_score": round(min(scores), 3),
            "max_score": round(max(scores), 3),
        }

    def get_failing_intents(self, n: int = 5) -> Dict[str, float]:
        """
        Get per-intent average scores across the last N runs, worst first.

        Returns:
            Mapping of intent -> average score, sorted ascending by score
        """
        runs = self.get_last_runs(n)

        # Collect every score observed per intent across the recent runs.
        intent_scores: Dict[str, List[float]] = {}
        for run in runs:
            for intent, score in run.metadata.get("scores_by_intent", {}).items():
                intent_scores.setdefault(intent, []).append(score)

        avg_scores = {
            intent: sum(scores) / len(scores)
            for intent, scores in intent_scores.items()
        }

        # Ascending sort puts the worst-performing intents first.
        return dict(sorted(avg_scores.items(), key=lambda x: x[1]))
|