# NOTE: removed non-code file-viewer artifacts that were pasted above the
# module docstring ("Files", "366 lines", "11 KiB", "Python").
"""
BQAS API - Quality Assurance Endpoints
"""
import subprocess
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import structlog
from fastapi import APIRouter, BackgroundTasks, HTTPException
from pydantic import BaseModel

from bqas.runner import BQASRunner, get_runner
logger = structlog.get_logger(__name__)
router = APIRouter()
# Response Models
class TestRunResponse(BaseModel):
    """Public shape of one recorded test run (see _run_to_response)."""

    id: int  # runner-assigned run identifier
    timestamp: str  # ISO-8601 timestamp with a trailing "Z" appended
    git_commit: Optional[str] = None  # short commit hash, when available
    suite: str  # which suite produced the run: "golden", "synthetic", or "rag"
    golden_score: float  # avg composite score when suite == "golden", else 0.0
    synthetic_score: float  # avg composite score when suite == "synthetic", else 0.0
    rag_score: float = 0.0  # avg composite score when suite == "rag", else 0.0
    total_tests: int
    passed_tests: int
    failed_tests: int
    duration_seconds: float  # wall-clock duration, rounded to 1 decimal
class MetricsResponse(BaseModel):
    """Aggregated quality metrics for a single suite run (see _metrics_to_response)."""

    total_tests: int
    passed_tests: int
    failed_tests: int
    avg_intent_accuracy: float  # rounded to 2 decimals
    avg_faithfulness: float  # rounded to 2 decimals
    avg_relevance: float  # rounded to 2 decimals
    avg_coherence: float  # rounded to 2 decimals
    safety_pass_rate: float  # rounded to 3 decimals
    avg_composite_score: float  # rounded to 3 decimals
    scores_by_intent: Dict[str, float]  # per-intent composite scores, 3 decimals
    failed_test_ids: List[str]  # ids of tests that did not pass
class TrendResponse(BaseModel):
    """Time series of golden-suite scores plus a coarse trend label."""

    dates: List[str]  # ISO-8601 timestamps ("Z"-suffixed), oldest first
    scores: List[float]  # composite scores aligned with `dates`
    trend: str  # one of: improving, stable, declining, insufficient_data
class LatestMetricsResponse(BaseModel):
    """Latest metrics per suite; a field is None when that suite has no runs."""

    golden: Optional[MetricsResponse] = None
    synthetic: Optional[MetricsResponse] = None
    rag: Optional[MetricsResponse] = None
class RunResultResponse(BaseModel):
    """Outcome of triggering a suite via the /run/* endpoints."""

    success: bool  # False when the suite was already running or raised
    message: str  # human-readable summary or error text
    metrics: Optional[MetricsResponse] = None  # populated only on success
    run_id: Optional[int] = None  # id of the recorded run, only on success
# State tracking for running tests.
# Per-suite "in progress" flags used by the /run/* endpoints to reject a
# second concurrent run of the same suite. The check-and-set happens before
# any await, so it is race-free within one asyncio event loop; it is NOT
# shared across multiple worker processes.
_is_running: Dict[str, bool] = {"golden": False, "synthetic": False, "rag": False}
def _get_git_commit() -> Optional[str]:
"""Get current git commit hash."""
try:
result = subprocess.run(
["git", "rev-parse", "--short", "HEAD"],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0:
return result.stdout.strip()
except Exception:
pass
return None
def _metrics_to_response(metrics) -> MetricsResponse:
    """Translate an internal BQASMetrics object into a MetricsResponse.

    Score fields are rounded: averages to 2 decimals, rates/composites to 3.
    """
    two_decimal_fields = (
        "avg_intent_accuracy",
        "avg_faithfulness",
        "avg_relevance",
        "avg_coherence",
    )
    three_decimal_fields = ("safety_pass_rate", "avg_composite_score")

    payload: Dict[str, Any] = {
        name: round(getattr(metrics, name), 2) for name in two_decimal_fields
    }
    payload.update(
        {name: round(getattr(metrics, name), 3) for name in three_decimal_fields}
    )
    payload.update(
        total_tests=metrics.total_tests,
        passed_tests=metrics.passed_tests,
        failed_tests=metrics.failed_tests,
        scores_by_intent={
            intent: round(score, 3)
            for intent, score in metrics.scores_by_intent.items()
        },
        failed_test_ids=metrics.failed_test_ids,
    )
    return MetricsResponse(**payload)
def _run_to_response(run) -> TestRunResponse:
    """Shape a stored TestRun into the TestRunResponse wire format."""
    composite = round(run.metrics.avg_composite_score, 3)
    # Only the column matching the run's own suite carries the composite
    # score; the other two suite columns stay 0.0.
    suite_scores = {"golden_score": 0.0, "synthetic_score": 0.0, "rag_score": 0.0}
    own_column = f"{run.suite}_score"
    if own_column in suite_scores:
        suite_scores[own_column] = composite
    return TestRunResponse(
        id=run.id,
        timestamp=run.timestamp.isoformat() + "Z",
        git_commit=run.git_commit,
        suite=run.suite,
        total_tests=run.metrics.total_tests,
        passed_tests=run.metrics.passed_tests,
        failed_tests=run.metrics.failed_tests,
        duration_seconds=round(run.duration_seconds, 1),
        **suite_scores,
    )
@router.get("/runs", response_model=Dict[str, Any])
async def get_test_runs(limit: int = 20):
    """List recent test runs, up to `limit` entries."""
    recent = get_runner().get_test_runs(limit)
    serialized = [_run_to_response(item) for item in recent]
    return {"runs": serialized, "total": len(serialized)}
@router.get("/run/{run_id}", response_model=TestRunResponse)
async def get_test_run(run_id: int):
    """Fetch one test run by id; 404 when not among the 100 most recent."""
    candidates = get_runner().get_test_runs(100)
    match = next((r for r in candidates if r.id == run_id), None)
    if match is None:
        raise HTTPException(status_code=404, detail="Test run not found")
    return _run_to_response(match)
@router.get("/trend", response_model=TrendResponse)
async def get_trend(days: int = 30):
    """Get the golden-suite score trend over the last `days` days.

    Returns trend == "insufficient_data" with empty series when fewer than
    3 golden runs fall in the window; otherwise "improving"/"declining"/
    "stable" based on the newest-3 vs oldest-3 average delta.
    """
    runner = get_runner()
    runs = runner.get_test_runs(100)
    # Bug fix: `days` was previously accepted but never applied — restrict
    # the series to runs inside the requested window.
    # NOTE(review): run timestamps appear to be naive UTC (a literal "Z" is
    # appended at serialization time), so compare against naive utcnow —
    # confirm against the runner's timestamp source.
    cutoff = datetime.utcnow() - timedelta(days=days)
    golden_runs = [
        r for r in runs if r.suite == "golden" and r.timestamp >= cutoff
    ]
    if len(golden_runs) < 3:
        return TrendResponse(dates=[], scores=[], trend="insufficient_data")
    # Oldest first so the series plots left-to-right chronologically.
    golden_runs.sort(key=lambda r: r.timestamp)
    dates = [r.timestamp.isoformat() + "Z" for r in golden_runs]
    scores = [round(r.metrics.avg_composite_score, 3) for r in golden_runs]
    if len(scores) >= 6:
        recent_avg = sum(scores[-3:]) / 3
        old_avg = sum(scores[:3]) / 3
        diff = recent_avg - old_avg
        if diff > 0.1:
            trend = "improving"
        elif diff < -0.1:
            trend = "declining"
        else:
            trend = "stable"
    else:
        # Too few points to compare oldest vs newest thirds meaningfully.
        trend = "stable"
    return TrendResponse(dates=dates, scores=scores, trend=trend)
@router.get("/latest-metrics", response_model=LatestMetricsResponse)
async def get_latest_metrics():
    """Return the newest metrics for each suite (None when never run)."""
    latest = get_runner().get_latest_metrics()
    converted = {}
    for suite in ("golden", "synthetic", "rag"):
        metrics = latest[suite]
        converted[suite] = _metrics_to_response(metrics) if metrics else None
    return LatestMetricsResponse(**converted)
async def _execute_suite(suite: str, label: str) -> RunResultResponse:
    """Shared driver for the three /run/* endpoints.

    Rejects a second concurrent run of the same suite via _is_running,
    invokes the matching BQASRunner coroutine (run_<suite>_suite), and
    shapes success or failure into a RunResultResponse. The flag is always
    cleared in `finally`, even when the suite raises.
    """
    if _is_running[suite]:
        return RunResultResponse(
            success=False,
            message=f"{label} suite is already running",
        )
    _is_running[suite] = True
    logger.info(f"Starting {label} Suite via API")
    try:
        runner = get_runner()
        git_commit = _get_git_commit()
        # Dispatch to run_golden_suite / run_synthetic_suite / run_rag_suite.
        run = await getattr(runner, f"run_{suite}_suite")(git_commit=git_commit)
        m = run.metrics
        return RunResultResponse(
            success=True,
            message=(
                f"{label} suite completed: {m.passed_tests}/{m.total_tests} "
                f"passed ({m.avg_composite_score:.2f} avg score)"
            ),
            metrics=_metrics_to_response(m),
            run_id=run.id,
        )
    except Exception as e:
        logger.error(f"{label} suite failed", error=str(e))
        return RunResultResponse(
            success=False,
            message=f"{label} suite failed: {str(e)}",
        )
    finally:
        _is_running[suite] = False


@router.post("/run/golden", response_model=RunResultResponse)
async def run_golden_suite(background_tasks: BackgroundTasks):
    """Run the golden test suite."""
    # background_tasks kept for signature compatibility; currently unused.
    return await _execute_suite("golden", "Golden")


@router.post("/run/synthetic", response_model=RunResultResponse)
async def run_synthetic_suite(background_tasks: BackgroundTasks):
    """Run the synthetic test suite."""
    return await _execute_suite("synthetic", "Synthetic")


@router.post("/run/rag", response_model=RunResultResponse)
async def run_rag_suite(background_tasks: BackgroundTasks):
    """Run the RAG/Correction test suite."""
    return await _execute_suite("rag", "RAG")
@router.get("/regression-check")
async def check_regression(threshold: float = 0.1):
    """Compare the newest golden-suite score to the trailing average.

    Flags a regression when the average of up to 5 previous runs exceeds
    the newest score by more than `threshold`.
    """
    golden = [r for r in get_runner().get_test_runs(20) if r.suite == "golden"]
    if len(golden) < 2:
        return {
            "is_regression": False,
            "message": "Not enough data for regression check",
            "current_score": None,
            "previous_avg": None,
            "delta": None,
        }
    # Newest first: index 0 is the current run, 1..5 form the baseline.
    ordered = sorted(golden, key=lambda r: r.timestamp, reverse=True)
    current = ordered[0].metrics.avg_composite_score
    history = [r.metrics.avg_composite_score for r in ordered[1:6]]
    baseline = sum(history) / len(history)
    drop = baseline - current
    regressed = drop > threshold
    if regressed:
        message = f"Regression detected: score dropped by {drop:.2f}"
    else:
        message = "No regression detected"
    return {
        "is_regression": regressed,
        "message": message,
        "current_score": round(current, 3),
        "previous_avg": round(baseline, 3),
        "delta": round(drop, 3),
        "threshold": threshold,
    }
@router.get("/health")
async def bqas_health():
    """BQAS health check: judge availability, run count, config, and flags."""
    health = await get_runner().health_check()
    # Build the payload in the same key order the dashboard expects.
    payload = {"status": "healthy"}
    for key in ("judge_available", "rag_judge_available", "test_runs_count"):
        payload[key] = health[key]
    payload["is_running"] = _is_running
    payload["config"] = health["config"]
    return payload