""" Quality Judge Agent - BQAS Integration with Multi-Agent Architecture Wraps the existing LLMJudge to work as a multi-agent participant: - Subscribes to message bus for evaluation requests - Uses shared memory for consistent evaluations - Provides real-time quality checks """ import structlog import asyncio from typing import Optional, Dict, Any, List from datetime import datetime, timezone from pathlib import Path from bqas.judge import LLMJudge, JudgeResult from bqas.config import BQASConfig # Import agent-core components import sys sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'agent-core')) from brain.memory_store import MemoryStore from orchestrator.message_bus import MessageBus, AgentMessage, MessagePriority logger = structlog.get_logger(__name__) class QualityJudgeAgent: """ BQAS Quality Judge as a multi-agent participant. Provides: - Real-time response quality evaluation - Consistency via shared memory - Message bus integration for async evaluation - Calibration against historical evaluations """ AGENT_ID = "quality-judge" AGENT_TYPE = "quality-judge" # Production readiness thresholds PRODUCTION_READY_THRESHOLD = 80 # composite >= 80% NEEDS_REVIEW_THRESHOLD = 60 # 60 <= composite < 80 FAILED_THRESHOLD = 60 # composite < 60 def __init__( self, message_bus: MessageBus, memory_store: MemoryStore, bqas_config: Optional[BQASConfig] = None ): """ Initialize the Quality Judge Agent. Args: message_bus: Message bus for inter-agent communication memory_store: Shared memory for consistency bqas_config: Optional BQAS configuration """ self.bus = message_bus self.memory = memory_store self.judge = LLMJudge(config=bqas_config) self._running = False self._soul_content: Optional[str] = None # Load SOUL file self._load_soul() def _load_soul(self) -> None: """Loads the SOUL file for agent personality""" soul_path = Path(__file__).parent.parent.parent / 'agent-core' / 'soul' / 'quality-judge.soul.md' try: if soul_path.exists(): self._soul_content = soul_path.read_text() logger.debug("Loaded SOUL file", path=str(soul_path)) except Exception as e: logger.warning("Failed to load SOUL file", error=str(e)) async def start(self) -> None: """Starts the Quality Judge Agent""" self._running = True # Subscribe to evaluation requests await self.bus.subscribe( self.AGENT_ID, self._handle_message ) logger.info("Quality Judge Agent started") async def stop(self) -> None: """Stops the Quality Judge Agent""" self._running = False await self.bus.unsubscribe(self.AGENT_ID) await self.judge.close() logger.info("Quality Judge Agent stopped") async def _handle_message( self, message: AgentMessage ) -> Optional[Dict[str, Any]]: """Handles incoming messages""" if message.message_type == "evaluate_response": return await self._handle_evaluate_request(message) elif message.message_type == "get_evaluation_stats": return await self._handle_stats_request(message) elif message.message_type == "calibrate": return await self._handle_calibration_request(message) return None async def _handle_evaluate_request( self, message: AgentMessage ) -> Dict[str, Any]: """Handles evaluation requests""" payload = message.payload task_id = payload.get("task_id", "") task_type = payload.get("task_type", "") response = payload.get("response", "") context = payload.get("context", {}) user_input = context.get("user_input", "") expected_intent = context.get("expected_intent", task_type) logger.debug( "Evaluating response", task_id=task_id[:8] if task_id else "n/a", response_length=len(response) ) # Check for similar evaluations in 
        similar = await self._find_similar_evaluations(task_type, response)

        # Run evaluation
        result = await self.judge.evaluate(
            user_input=user_input,
            detected_intent=task_type,
            response=response,
            expected_intent=expected_intent
        )

        # Convert the 0-5 composite score to a percentage scale (0-100)
        composite_percent = (result.composite_score / 5) * 100

        # Determine verdict
        if composite_percent >= self.PRODUCTION_READY_THRESHOLD:
            verdict = "production_ready"
        elif composite_percent >= self.NEEDS_REVIEW_THRESHOLD:
            verdict = "needs_review"
        else:
            verdict = "failed"

        # Prepare response
        evaluation = {
            "task_id": task_id,
            "intent_accuracy": result.intent_accuracy,
            "faithfulness": result.faithfulness,
            "relevance": result.relevance,
            "coherence": result.coherence,
            "safety": result.safety,
            "composite_score": composite_percent,
            "verdict": verdict,
            "reasoning": result.reasoning,
            "similar_count": len(similar),
            "evaluated_at": datetime.now(timezone.utc).isoformat()
        }

        # Store evaluation in memory
        await self._store_evaluation(task_type, response, evaluation)

        logger.info(
            "Evaluation complete",
            task_id=task_id[:8] if task_id else "n/a",
            composite=f"{composite_percent:.1f}%",
            verdict=verdict
        )

        return evaluation

    async def _handle_stats_request(
        self,
        message: AgentMessage
    ) -> Dict[str, Any]:
        """Returns evaluation statistics"""
        task_type = message.payload.get("task_type")
        hours = message.payload.get("hours", 24)

        # Get recent evaluations from memory
        evaluations = await self.memory.get_recent(
            hours=hours,
            agent_id=self.AGENT_ID
        )

        if task_type:
            evaluations = [
                e for e in evaluations
                if e.key.startswith(f"evaluation:{task_type}:")
            ]

        # Calculate stats
        if not evaluations:
            return {
                "count": 0,
                "avg_score": 0,
                "pass_rate": 0,
                "by_verdict": {}
            }

        scores = []
        by_verdict = {"production_ready": 0, "needs_review": 0, "failed": 0}

        for eval_memory in evaluations:
            value = eval_memory.value
            if isinstance(value, dict):
                scores.append(value.get("composite_score", 0))
                verdict = value.get("verdict", "failed")
                by_verdict[verdict] = by_verdict.get(verdict, 0) + 1

        total = len(scores)
        passed = by_verdict.get("production_ready", 0)

        return {
            "count": total,
            "avg_score": sum(scores) / max(total, 1),
            "pass_rate": passed / max(total, 1),
            "by_verdict": by_verdict,
            "time_range_hours": hours
        }

    async def _handle_calibration_request(
        self,
        message: AgentMessage
    ) -> Dict[str, Any]:
        """Handles calibration against gold standard examples"""
        examples = message.payload.get("examples", [])

        if not examples:
            return {"success": False, "reason": "No examples provided"}

        results = []
        for example in examples:
            result = await self.judge.evaluate(
                user_input=example.get("user_input", ""),
                detected_intent=example.get("intent", ""),
                response=example.get("response", ""),
                expected_intent=example.get("expected_intent", "")
            )

            expected_score = example.get("expected_score")
            # Explicit None check so an expected score of 0 is still compared
            if expected_score is not None:
                actual_score = (result.composite_score / 5) * 100
                deviation = abs(actual_score - expected_score)
                results.append({
                    "expected": expected_score,
                    "actual": actual_score,
                    "deviation": deviation,
                    "within_tolerance": deviation <= 10
                })

        # Calculate calibration metrics
        avg_deviation = sum(r["deviation"] for r in results) / max(len(results), 1)
        within_tolerance = sum(1 for r in results if r["within_tolerance"])

        return {
            "success": True,
            "examples_count": len(results),
            "avg_deviation": avg_deviation,
            "within_tolerance_count": within_tolerance,
            "calibration_quality": within_tolerance / max(len(results), 1)
        }

    async def _find_similar_evaluations(
        self,
        task_type: str,
        response: str
    ) -> List[Dict[str, Any]]:
        """Finds similar evaluations in memory for consistency"""
        # Search for evaluations of the same task type
        pattern = f"evaluation:{task_type}:*"
        similar = await self.memory.search(pattern, limit=5)

        # Filter to find truly similar responses
        # (in production, could use embedding similarity)
        return [m.value for m in similar if isinstance(m.value, dict)]

    async def _store_evaluation(
        self,
        task_type: str,
        response: str,
        evaluation: Dict[str, Any]
    ) -> None:
        """Stores evaluation in memory for future reference"""
        # Create unique key from the task type and a hash of the response
        response_hash = hashlib.sha256(response.encode()).hexdigest()[:16]
        key = f"evaluation:{task_type}:{response_hash}"

        await self.memory.remember(
            key=key,
            value=evaluation,
            agent_id=self.AGENT_ID,
            ttl_days=30
        )

    # Direct evaluation methods

    async def evaluate(
        self,
        response: str,
        task_type: str = "",
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Evaluates a response directly (without message bus).

        Args:
            response: The response to evaluate
            task_type: Type of task that generated the response
            context: Additional context

        Returns:
            Evaluation result dict
        """
        context = context or {}

        result = await self.judge.evaluate(
            user_input=context.get("user_input", ""),
            detected_intent=task_type,
            response=response,
            expected_intent=context.get("expected_intent", task_type)
        )

        composite_percent = (result.composite_score / 5) * 100

        if composite_percent >= self.PRODUCTION_READY_THRESHOLD:
            verdict = "production_ready"
        elif composite_percent >= self.NEEDS_REVIEW_THRESHOLD:
            verdict = "needs_review"
        else:
            verdict = "failed"

        return {
            "intent_accuracy": result.intent_accuracy,
            "faithfulness": result.faithfulness,
            "relevance": result.relevance,
            "coherence": result.coherence,
            "safety": result.safety,
            "composite_score": composite_percent,
            "verdict": verdict,
            "reasoning": result.reasoning
        }

    async def is_production_ready(
        self,
        response: str,
        task_type: str = "",
        context: Optional[Dict[str, Any]] = None
    ) -> bool:
        """
        Quick check if response is production ready.

        Args:
            response: The response to check
            task_type: Type of task
            context: Additional context

        Returns:
            True if production ready
        """
        evaluation = await self.evaluate(response, task_type, context)
        return evaluation["verdict"] == "production_ready"

    async def health_check(self) -> bool:
        """Checks if the quality judge is operational"""
        return await self.judge.health_check()
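

# Usage sketch (illustrative only). It assumes MessageBus and MemoryStore can
# be constructed without arguments, which this module does not confirm; adapt
# the two constructor calls to the actual agent-core signatures. The sample
# response text is hypothetical data for the demo.
async def _demo() -> None:
    bus = MessageBus()      # assumption: no-arg constructor
    memory = MemoryStore()  # assumption: no-arg constructor
    agent = QualityJudgeAgent(message_bus=bus, memory_store=memory)
    await agent.start()
    try:
        # Direct path: evaluate a response without going through the bus
        evaluation = await agent.evaluate(
            response="Your order shipped today and should arrive Friday.",
            task_type="order_status",
            context={"user_input": "Where is my order?"},
        )
        print(evaluation["verdict"], f"{evaluation['composite_score']:.1f}%")
    finally:
        await agent.stop()


if __name__ == "__main__":
    asyncio.run(_demo())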