Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 25s
CI / test-go-edu-search (push) Successful in 26s
CI / test-python-klausur (push) Failing after 1m55s
CI / test-python-agent-core (push) Successful in 16s
CI / test-nodejs-website (push) Successful in 18s
- Voice-Service von Core nach Lehrer verschoben (bp-lehrer-voice-service) - 4 Jitsi-Services + 2 Synapse-Services in docker-compose.yml aufgenommen - Camunda komplett gelöscht: workflow pages, workflow-config.ts, bpmn-js deps - CAMUNDA_URL aus backend-lehrer environment entfernt - Sidebar: Kategorie "Compliance SDK" + "Katalogverwaltung" entfernt - Sidebar: Neue Kategorie "Kommunikation" mit Video & Chat, Voice Service, Alerts Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
530 lines
19 KiB
Python
530 lines
19 KiB
Python
"""
|
|
BQAS Test Runner - Executes Golden, RAG, and Synthetic test suites
|
|
"""
|
|
import asyncio
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import httpx
import structlog
import yaml

from bqas.config import BQASConfig
from bqas.judge import LLMJudge
from bqas.metrics import BQASMetrics, TestResult
from bqas.rag_judge import RAGJudge
from bqas.synthetic_generator import SyntheticGenerator
|
|
|
|
# Module-level structured logger bound to this module's name.
logger = structlog.get_logger(__name__)
|
|
|
|
|
|
@dataclass
class TestRun:
    """Record of a complete test run (one suite execution)."""
    # Monotonically increasing id assigned by the runner (per process).
    id: int
    suite: str  # one of: "golden", "rag", "synthetic"
    # Start time of the run (naive UTC, from datetime.utcnow()).
    timestamp: datetime
    # Git commit under test, if the caller supplied one.
    git_commit: Optional[str]
    # Aggregate metrics computed from `results`.
    metrics: BQASMetrics
    # Individual per-test outcomes, in execution order.
    results: List[TestResult]
    # Wall-clock duration of the whole run.
    duration_seconds: float
|
|
|
|
|
|
class BQASRunner:
    """
    Main test runner for BQAS test suites.

    Executes:
    - Golden Suite: Pre-defined golden test cases from YAML
    - RAG Suite: RAG/Correction quality tests
    - Synthetic Suite: LLM-generated test variations

    Completed runs are kept in memory only (newest first); there is no
    persistence across process restarts.
    """

    def __init__(self, config: Optional[BQASConfig] = None):
        """Create judges, the synthetic generator and an empty run history.

        Args:
            config: Explicit configuration; defaults to ``BQASConfig.from_env()``.
        """
        self.config = config or BQASConfig.from_env()
        self.judge = LLMJudge(self.config)
        self.rag_judge = RAGJudge(self.config)
        self.synthetic_generator = SyntheticGenerator(self.config)
        self._http_client: Optional[httpx.AsyncClient] = None
        self._test_runs: List[TestRun] = []  # newest run first
        self._run_counter = 0  # source of TestRun.id values

    async def _get_client(self) -> httpx.AsyncClient:
        """Get or lazily create the shared HTTP client for voice service calls."""
        if self._http_client is None:
            self._http_client = httpx.AsyncClient(timeout=30.0)
        return self._http_client

    def _record_run(
        self,
        suite: str,
        start_time: datetime,
        git_commit: Optional[str],
        results: List[TestResult],
    ) -> TestRun:
        """Aggregate *results* into metrics and prepend a TestRun to history.

        Shared bookkeeping for all three suite runners (previously duplicated
        verbatim in each of them).
        """
        metrics = BQASMetrics.from_results(results)
        # NOTE(review): datetime.utcnow() is naive and deprecated in 3.12+;
        # kept for consistency with the rest of the module's timestamps.
        duration = (datetime.utcnow() - start_time).total_seconds()

        self._run_counter += 1
        run = TestRun(
            id=self._run_counter,
            suite=suite,
            timestamp=start_time,
            git_commit=git_commit,
            metrics=metrics,
            results=results,
            duration_seconds=duration,
        )
        # Newest first so get_test_runs()/get_latest_metrics() can slice/scan.
        self._test_runs.insert(0, run)
        return run

    # ================================
    # Golden Suite Runner
    # ================================

    async def run_golden_suite(self, git_commit: Optional[str] = None) -> TestRun:
        """
        Run the golden test suite.

        Loads test cases from YAML files, evaluates each one with the LLM
        judge and records the aggregated run.
        """
        logger.info("Starting Golden Suite run")
        start_time = datetime.utcnow()

        # Load all golden test cases
        test_cases = await self._load_golden_tests()
        logger.info(f"Loaded {len(test_cases)} golden test cases")

        # Run all tests; a single failing test is recorded, not fatal.
        results = []
        for i, test_case in enumerate(test_cases):
            try:
                results.append(await self._run_golden_test(test_case))

                if (i + 1) % 10 == 0:
                    logger.info(f"Progress: {i + 1}/{len(test_cases)} tests completed")

            except Exception as e:
                logger.error(f"Test {test_case.get('id')} failed with error", error=str(e))
                # Create a failed result so the suite keeps going.
                results.append(self._create_error_result(test_case, str(e)))

        run = self._record_run("golden", start_time, git_commit, results)

        logger.info(
            "Golden Suite completed",
            total=run.metrics.total_tests,
            passed=run.metrics.passed_tests,
            failed=run.metrics.failed_tests,
            score=run.metrics.avg_composite_score,
            duration=f"{run.duration_seconds:.1f}s",
        )

        return run

    async def _load_golden_tests(self) -> List[Dict[str, Any]]:
        """Load all golden test cases from the known YAML files.

        Missing files are skipped silently; unreadable files are logged and
        skipped. Each loaded test is tagged with its ``source_file``.
        """
        tests = []
        golden_dir = Path(__file__).parent.parent / "tests" / "bqas" / "golden_tests"

        yaml_files = [
            "intent_tests.yaml",
            "edge_cases.yaml",
            "workflow_tests.yaml",
        ]

        for filename in yaml_files:
            filepath = golden_dir / filename
            if filepath.exists():
                try:
                    with open(filepath, 'r', encoding='utf-8') as f:
                        data = yaml.safe_load(f)
                        if data and 'tests' in data:
                            for test in data['tests']:
                                test['source_file'] = filename
                            tests.extend(data['tests'])
                except Exception as e:
                    # BUGFIX: log the actual filename (message previously
                    # contained a literal "(unknown)" placeholder).
                    logger.warning(f"Failed to load {filename}", error=str(e))

        return tests

    async def _run_golden_test(self, test_case: Dict[str, Any]) -> TestResult:
        """Run a single golden test case and judge the response."""
        test_id = test_case.get('id', 'UNKNOWN')
        test_name = test_case.get('name', '')
        user_input = test_case.get('input', '')
        expected_intent = test_case.get('expected_intent', '')
        # Per-test threshold overrides the configured suite default.
        min_score = test_case.get('min_score', self.config.min_golden_score)

        # Get response from voice service (or simulate)
        detected_intent, response = await self._get_voice_response(user_input, expected_intent)

        # Evaluate with judge
        result = await self.judge.evaluate_test_case(
            test_id=test_id,
            test_name=test_name,
            user_input=user_input,
            expected_intent=expected_intent,
            detected_intent=detected_intent,
            response=response,
            min_score=min_score,
        )

        return result

    async def _get_voice_response(
        self,
        user_input: str,
        expected_intent: str
    ) -> tuple[str, str]:
        """
        Get ``(detected_intent, response)`` from the voice service.

        Falls back to a local simulation when the service is unreachable or
        answers with a non-200 status, so suites stay runnable without the
        full voice pipeline.
        """
        try:
            client = await self._get_client()

            # Try to call the voice service intent detection
            response = await client.post(
                f"{self.config.voice_service_url}/api/v1/tasks",
                json={
                    "type": "intent_detection",
                    "input": user_input,
                    "namespace_id": "test_namespace",
                },
                timeout=10.0,
            )

            if response.status_code == 200:
                data = response.json()
                return data.get('detected_intent', expected_intent), data.get('response', f"Verarbeite: {user_input}")

        except Exception as e:
            # Best-effort: unavailable service is expected in dev/CI.
            logger.debug("Voice service call failed, using simulation", error=str(e))

        # Simulate response based on expected intent
        return self._simulate_response(user_input, expected_intent)

    def _simulate_response(self, user_input: str, expected_intent: str) -> tuple[str, str]:
        """Simulate voice service response for testing without live service."""
        # Simulate realistic detected intent (90% correct for golden tests)
        import random
        if random.random() < 0.90:
            detected_intent = expected_intent
        else:
            # Simulate occasional misclassification
            intents = ["student_observation", "reminder", "worksheet_generate", "parent_letter", "smalltalk"]
            detected_intent = random.choice([i for i in intents if i != expected_intent])

        # Generate simulated response
        responses = {
            "student_observation": f"Notiz wurde gespeichert: {user_input}",
            "reminder": f"Erinnerung erstellt: {user_input}",
            "worksheet_generate": f"Arbeitsblatt wird generiert basierend auf: {user_input}",
            "homework_check": f"Hausaufgabenkontrolle eingetragen: {user_input}",
            "parent_letter": f"Elternbrief-Entwurf erstellt: {user_input}",
            "class_message": f"Nachricht an Klasse vorbereitet: {user_input}",
            "quiz_generate": f"Quiz wird erstellt: {user_input}",
            "quick_activity": f"Einstiegsaktivitaet geplant: {user_input}",
            "canvas_edit": f"Aenderung am Canvas wird ausgefuehrt: {user_input}",
            "canvas_layout": f"Layout wird angepasst: {user_input}",
            "operator_checklist": f"Operatoren-Checkliste geladen: {user_input}",
            "eh_passage": f"EH-Passage gefunden: {user_input}",
            "feedback_suggest": f"Feedback-Vorschlag: {user_input}",
            "reminder_schedule": f"Erinnerung geplant: {user_input}",
            "task_summary": f"Aufgabenuebersicht: {user_input}",
            "conference_topic": f"Konferenzthema notiert: {user_input}",
            "correction_note": f"Korrekturnotiz gespeichert: {user_input}",
            "worksheet_differentiate": f"Differenzierung wird erstellt: {user_input}",
        }

        response = responses.get(detected_intent, f"Verstanden: {user_input}")
        return detected_intent, response

    def _create_error_result(self, test_case: Dict[str, Any], error: str) -> TestResult:
        """Create a failed TestResult for a test that raised during execution."""
        return TestResult(
            test_id=test_case.get('id', 'UNKNOWN'),
            test_name=test_case.get('name', 'Error'),
            user_input=test_case.get('input', ''),
            expected_intent=test_case.get('expected_intent', ''),
            detected_intent='error',
            response='',
            # Minimum scores across the board so the composite is 0 and
            # the run's metrics reflect the failure.
            intent_accuracy=0,
            faithfulness=1,
            relevance=1,
            coherence=1,
            safety='fail',
            composite_score=0.0,
            passed=False,
            reasoning=f"Test execution error: {error}",
            timestamp=datetime.utcnow(),
            duration_ms=0,
        )

    # ================================
    # RAG Suite Runner
    # ================================

    async def run_rag_suite(self, git_commit: Optional[str] = None) -> TestRun:
        """
        Run the RAG/Correction test suite.

        Tests EH retrieval, operator alignment, hallucination control, etc.
        """
        logger.info("Starting RAG Suite run")
        start_time = datetime.utcnow()

        # Load RAG test cases
        test_cases = await self._load_rag_tests()
        logger.info(f"Loaded {len(test_cases)} RAG test cases")

        # Run all tests; a single failing test is recorded, not fatal.
        results = []
        for i, test_case in enumerate(test_cases):
            try:
                results.append(await self._run_rag_test(test_case))

                if (i + 1) % 5 == 0:
                    logger.info(f"Progress: {i + 1}/{len(test_cases)} RAG tests completed")

            except Exception as e:
                logger.error(f"RAG test {test_case.get('id')} failed", error=str(e))
                results.append(self._create_error_result(test_case, str(e)))

        run = self._record_run("rag", start_time, git_commit, results)

        logger.info(
            "RAG Suite completed",
            total=run.metrics.total_tests,
            passed=run.metrics.passed_tests,
            score=run.metrics.avg_composite_score,
            duration=f"{run.duration_seconds:.1f}s",
        )

        return run

    async def _load_rag_tests(self) -> List[Dict[str, Any]]:
        """Load RAG test cases (``tests`` and ``edge_cases``) from YAML."""
        tests = []
        rag_file = Path(__file__).parent.parent / "tests" / "bqas" / "golden_tests" / "golden_rag_correction_v1.yaml"

        if rag_file.exists():
            try:
                with open(rag_file, 'r', encoding='utf-8') as f:
                    # Handle YAML documents separated by ---
                    documents = list(yaml.safe_load_all(f))
                    for doc in documents:
                        if doc and 'tests' in doc:
                            tests.extend(doc['tests'])
                        if doc and 'edge_cases' in doc:
                            tests.extend(doc['edge_cases'])
            except Exception as e:
                logger.warning("Failed to load RAG tests", error=str(e))

        return tests

    async def _run_rag_test(self, test_case: Dict[str, Any]) -> TestResult:
        """Run a single RAG test case against a simulated service response."""
        # Simulate service response for RAG tests
        service_response = await self._simulate_rag_response(test_case)

        # Evaluate with RAG judge
        result = await self.rag_judge.evaluate_rag_test_case(
            test_case=test_case,
            service_response=service_response,
        )

        return result

    async def _simulate_rag_response(self, test_case: Dict[str, Any]) -> Dict[str, Any]:
        """Simulate a RAG service response keyed on the test's category."""
        category = test_case.get('category', '')
        input_data = test_case.get('input', {})
        expected = test_case.get('expected', {})

        # Simulate responses based on category
        if category == 'eh_retrieval':
            concepts = expected.get('must_contain_concepts', [])
            passage = f"Der Erwartungshorizont sieht folgende Aspekte vor: {', '.join(concepts[:3])}. "
            passage += "Diese muessen im Rahmen der Aufgabenbearbeitung beruecksichtigt werden."
            return {
                "passage": passage,
                "source": "EH_Deutsch_Abitur_2024_NI.pdf",
                "relevance_score": 0.85,
            }

        elif category == 'operator_alignment':
            operator = input_data.get('operator', '')
            afb = expected.get('afb_level', 'II')
            actions = expected.get('expected_actions', [])
            return {
                "operator": operator,
                "definition": f"'{operator}' gehoert zu Anforderungsbereich {afb}. Erwartete Handlungen: {', '.join(actions[:2])}.",
                "afb_level": afb,
            }

        elif category == 'hallucination_control':
            return {
                "response": "Basierend auf den verfuegbaren Informationen kann ich folgendes feststellen...",
                "grounded": True,
            }

        elif category == 'privacy_compliance':
            return {
                "response": "Die Arbeit zeigt folgende Merkmale... [anonymisiert]",
                "contains_pii": False,
            }

        elif category == 'namespace_isolation':
            return {
                "response": "Zugriff nur auf Daten im eigenen Namespace.",
                "namespace_violation": False,
            }

        # Unknown category: generic success payload.
        return {"response": "Simulated response", "success": True}

    # ================================
    # Synthetic Suite Runner
    # ================================

    async def run_synthetic_suite(self, git_commit: Optional[str] = None) -> TestRun:
        """
        Run the synthetic test suite.

        Generates test variations using the LLM and evaluates them with the
        same per-test logic as the golden suite.
        """
        logger.info("Starting Synthetic Suite run")
        start_time = datetime.utcnow()

        # Generate synthetic tests
        all_variations = await self.synthetic_generator.generate_all_intents(
            count_per_intent=self.config.synthetic_count_per_intent
        )

        # Flatten variations into golden-style test-case dicts
        test_cases = []
        for intent, variations in all_variations.items():
            for i, v in enumerate(variations):
                test_cases.append({
                    'id': f"SYN-{intent.upper()[:4]}-{i+1:03d}",
                    'name': f"Synthetic {intent} #{i+1}",
                    'input': v.input,
                    'expected_intent': v.expected_intent,
                    'slots': v.slots,
                    'source': v.source,
                    'min_score': self.config.min_synthetic_score,
                })

        logger.info(f"Generated {len(test_cases)} synthetic test cases")

        # Run all tests; a single failing test is recorded, not fatal.
        results = []
        for i, test_case in enumerate(test_cases):
            try:
                results.append(await self._run_golden_test(test_case))  # Same logic as golden

                if (i + 1) % 20 == 0:
                    logger.info(f"Progress: {i + 1}/{len(test_cases)} synthetic tests completed")

            except Exception as e:
                logger.error(f"Synthetic test {test_case.get('id')} failed", error=str(e))
                results.append(self._create_error_result(test_case, str(e)))

        run = self._record_run("synthetic", start_time, git_commit, results)

        logger.info(
            "Synthetic Suite completed",
            total=run.metrics.total_tests,
            passed=run.metrics.passed_tests,
            score=run.metrics.avg_composite_score,
            duration=f"{run.duration_seconds:.1f}s",
        )

        return run

    # ================================
    # Utility Methods
    # ================================

    def get_test_runs(self, limit: int = 20) -> List[TestRun]:
        """Get the most recent test runs (newest first)."""
        return self._test_runs[:limit]

    def get_latest_metrics(self) -> Dict[str, Optional[BQASMetrics]]:
        """Get the latest metrics for each suite (None if never run)."""
        result = {"golden": None, "rag": None, "synthetic": None}

        for run in self._test_runs:
            if result[run.suite] is None:
                result[run.suite] = run.metrics
            # Stop early once every suite has a latest-metrics entry.
            if all(v is not None for v in result.values()):
                break

        return result

    async def health_check(self) -> Dict[str, Any]:
        """Check health of BQAS components (judges + run history size)."""
        judge_ok = await self.judge.health_check()
        rag_judge_ok = await self.rag_judge.health_check()

        return {
            "judge_available": judge_ok,
            "rag_judge_available": rag_judge_ok,
            "test_runs_count": len(self._test_runs),
            "config": {
                "ollama_url": self.config.ollama_base_url,
                "judge_model": self.config.judge_model,
            }
        }

    async def close(self):
        """Cleanup resources (judges, generator, shared HTTP client)."""
        await self.judge.close()
        await self.rag_judge.close()
        await self.synthetic_generator.close()
        if self._http_client:
            await self._http_client.aclose()
            self._http_client = None
|
|
|
|
|
|
# Lazily constructed, process-wide runner shared by the API layer.
_runner_instance: Optional[BQASRunner] = None


def get_runner() -> BQASRunner:
    """Return the global BQASRunner, creating it on first use."""
    global _runner_instance
    if _runner_instance is not None:
        return _runner_instance
    _runner_instance = BQASRunner()
    return _runner_instance
|