""" LLM Judge - Qwen2.5-32B based evaluation """ import json import time import structlog import httpx from dataclasses import dataclass from typing import Literal, Optional from datetime import datetime from bqas.config import BQASConfig from bqas.prompts import JUDGE_PROMPT from bqas.metrics import TestResult logger = structlog.get_logger(__name__) @dataclass class JudgeResult: """Result from LLM Judge evaluation.""" intent_accuracy: int # 0-100 faithfulness: int # 1-5 relevance: int # 1-5 coherence: int # 1-5 safety: Literal["pass", "fail"] reasoning: str composite_score: float # Weighted average class LLMJudge: """ LLM-based evaluation of voice service responses. Uses Qwen2.5-32B via Ollama to evaluate: - Intent accuracy - Faithfulness (factual correctness) - Relevance (addresses the question) - Coherence (logical consistency) - Safety (no PII/DSGVO violations) """ def __init__(self, config: Optional[BQASConfig] = None): self.config = config or BQASConfig.from_env() self._client: Optional[httpx.AsyncClient] = None async def _get_client(self) -> httpx.AsyncClient: """Get or create HTTP client.""" if self._client is None: self._client = httpx.AsyncClient(timeout=self.config.judge_timeout) return self._client async def evaluate( self, user_input: str, detected_intent: str, response: str, expected_intent: str, ) -> JudgeResult: """ Evaluate a voice service response. Args: user_input: Original user voice command detected_intent: Intent detected by the service response: Generated response text expected_intent: Expected (ground truth) intent Returns: JudgeResult with all metrics """ prompt = JUDGE_PROMPT.format( user_input=user_input, detected_intent=detected_intent, response=response, expected_intent=expected_intent, ) client = await self._get_client() try: resp = await client.post( f"{self.config.ollama_base_url}/api/generate", json={ "model": self.config.judge_model, "prompt": prompt, "stream": False, "options": { "temperature": 0.1, "num_predict": 500, }, }, ) resp.raise_for_status() result_text = resp.json().get("response", "") # Parse JSON from response parsed = self._parse_judge_response(result_text) # Calculate composite score composite = self._calculate_composite(parsed) parsed["composite_score"] = composite return JudgeResult(**parsed) except httpx.HTTPError as e: logger.error("Judge request failed", error=str(e)) # Return a failed result return JudgeResult( intent_accuracy=0, faithfulness=1, relevance=1, coherence=1, safety="fail", reasoning=f"Evaluation failed: {str(e)}", composite_score=0.0, ) except Exception as e: logger.error("Unexpected error during evaluation", error=str(e)) return JudgeResult( intent_accuracy=0, faithfulness=1, relevance=1, coherence=1, safety="fail", reasoning=f"Unexpected error: {str(e)}", composite_score=0.0, ) def _parse_judge_response(self, text: str) -> dict: """Parse JSON from judge response.""" try: # Find JSON in response start = text.find("{") end = text.rfind("}") + 1 if start >= 0 and end > start: json_str = text[start:end] data = json.loads(json_str) # Validate and clamp values return { "intent_accuracy": max(0, min(100, int(data.get("intent_accuracy", 0)))), "faithfulness": max(1, min(5, int(data.get("faithfulness", 1)))), "relevance": max(1, min(5, int(data.get("relevance", 1)))), "coherence": max(1, min(5, int(data.get("coherence", 1)))), "safety": "pass" if data.get("safety", "fail") == "pass" else "fail", "reasoning": str(data.get("reasoning", ""))[:500], } except (json.JSONDecodeError, ValueError, TypeError) as e: logger.warning("Failed 
to parse judge response", error=str(e), text=text[:200]) # Default values on parse failure return { "intent_accuracy": 0, "faithfulness": 1, "relevance": 1, "coherence": 1, "safety": "fail", "reasoning": "Parse error", } def _calculate_composite(self, result: dict) -> float: """Calculate weighted composite score (0-5 scale).""" c = self.config # Normalize intent accuracy to 0-5 scale intent_score = (result["intent_accuracy"] / 100) * 5 # Safety score: 5 if pass, 0 if fail safety_score = 5.0 if result["safety"] == "pass" else 0.0 composite = ( intent_score * c.intent_accuracy_weight + result["faithfulness"] * c.faithfulness_weight + result["relevance"] * c.relevance_weight + result["coherence"] * c.coherence_weight + safety_score * c.safety_weight ) return round(composite, 3) async def evaluate_test_case( self, test_id: str, test_name: str, user_input: str, expected_intent: str, detected_intent: str, response: str, min_score: float = 3.5, ) -> TestResult: """ Evaluate a full test case and return TestResult. Args: test_id: Unique test identifier test_name: Human-readable test name user_input: Original voice command expected_intent: Ground truth intent detected_intent: Detected intent from service response: Generated response min_score: Minimum score to pass Returns: TestResult with all metrics and pass/fail status """ start_time = time.time() judge_result = await self.evaluate( user_input=user_input, detected_intent=detected_intent, response=response, expected_intent=expected_intent, ) duration_ms = int((time.time() - start_time) * 1000) passed = judge_result.composite_score >= min_score return TestResult( test_id=test_id, test_name=test_name, user_input=user_input, expected_intent=expected_intent, detected_intent=detected_intent, response=response, intent_accuracy=judge_result.intent_accuracy, faithfulness=judge_result.faithfulness, relevance=judge_result.relevance, coherence=judge_result.coherence, safety=judge_result.safety, composite_score=judge_result.composite_score, passed=passed, reasoning=judge_result.reasoning, timestamp=datetime.utcnow(), duration_ms=duration_ms, ) async def health_check(self) -> bool: """Check if Ollama and judge model are available.""" try: client = await self._get_client() response = await client.get(f"{self.config.ollama_base_url}/api/tags") if response.status_code != 200: return False # Check if model is available models = response.json().get("models", []) model_names = [m.get("name", "") for m in models] # Check for exact match or partial match for name in model_names: if self.config.judge_model in name: return True logger.warning( "Judge model not found", model=self.config.judge_model, available=model_names[:5], ) return False except Exception as e: logger.error("Health check failed", error=str(e)) return False async def close(self): """Close HTTP client.""" if self._client: await self._client.aclose() self._client = None
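

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the module API). Assumes
# BQASConfig.from_env() reads the Ollama base URL, judge model name, timeout,
# and metric weights from the environment; the test case below is entirely
# hypothetical and exists only to show the evaluate_test_case() flow.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        judge = LLMJudge()
        # Verify Ollama is reachable and the judge model is installed first
        if not await judge.health_check():
            raise SystemExit("Ollama or the judge model is unavailable")
        try:
            # Evaluate a single hypothetical test case end to end
            result = await judge.evaluate_test_case(
                test_id="demo-001",
                test_name="Weather intent smoke test",
                user_input="What will the weather be like in Berlin tomorrow?",
                expected_intent="weather_query",
                detected_intent="weather_query",
                response="Tomorrow in Berlin it will be sunny at 22 degrees.",
            )
            print(f"score={result.composite_score} passed={result.passed}")
        finally:
            await judge.close()

    asyncio.run(_demo())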