"""
Quality Judge Agent - BQAS Integration with Multi-Agent Architecture

Wraps the existing LLMJudge to work as a multi-agent participant:
- Subscribes to message bus for evaluation requests
- Uses shared memory for consistent evaluations
- Provides real-time quality checks
"""
|
|
|
|
import structlog
import asyncio
from typing import Optional, Dict, Any, List
from datetime import datetime, timezone
from pathlib import Path

from bqas.judge import LLMJudge, JudgeResult
from bqas.config import BQASConfig

# Import agent-core components: agent-core is a sibling directory, not an
# installed distribution, so it is made importable via sys.path.
import sys
sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'agent-core'))

from brain.memory_store import MemoryStore
from orchestrator.message_bus import MessageBus, AgentMessage, MessagePriority

logger = structlog.get_logger(__name__)
|
|
class QualityJudgeAgent:
    """
    BQAS Quality Judge as a multi-agent participant.

    Provides:
    - Real-time response quality evaluation
    - Consistency via shared memory
    - Message bus integration for async evaluation
    - Calibration against historical evaluations
    """

    AGENT_ID = "quality-judge"
    AGENT_TYPE = "quality-judge"

    # Production readiness thresholds on the 0-100 composite scale.
    PRODUCTION_READY_THRESHOLD = 80  # composite >= 80%
    NEEDS_REVIEW_THRESHOLD = 60      # 60 <= composite < 80
    FAILED_THRESHOLD = 60            # composite < 60 (kept for API compatibility)

    def __init__(
        self,
        message_bus: "MessageBus",
        memory_store: "MemoryStore",
        bqas_config: Optional["BQASConfig"] = None
    ):
        """
        Initialize the Quality Judge Agent.

        Args:
            message_bus: Message bus for inter-agent communication
            memory_store: Shared memory for consistency
            bqas_config: Optional BQAS configuration
        """
        self.bus = message_bus
        self.memory = memory_store
        self.judge = LLMJudge(config=bqas_config)
        self._running = False
        self._soul_content: Optional[str] = None

        # Load SOUL file (agent personality); failure is non-fatal.
        self._load_soul()

    def _load_soul(self) -> None:
        """Loads the SOUL file for agent personality.

        Missing or unreadable files are tolerated: the agent operates
        without a SOUL file, so failures are only logged.
        """
        soul_path = Path(__file__).parent.parent.parent / 'agent-core' / 'soul' / 'quality-judge.soul.md'
        try:
            if soul_path.exists():
                self._soul_content = soul_path.read_text()
                logger.debug("Loaded SOUL file", path=str(soul_path))
        except Exception as e:
            logger.warning("Failed to load SOUL file", error=str(e))

    @staticmethod
    def _to_percent(composite_score: float) -> float:
        """Converts the judge's 0-5 composite score to a 0-100 percentage."""
        return (composite_score / 5) * 100

    def _verdict(self, composite_percent: float) -> str:
        """Maps a 0-100 composite score onto a production-readiness verdict."""
        if composite_percent >= self.PRODUCTION_READY_THRESHOLD:
            return "production_ready"
        if composite_percent >= self.NEEDS_REVIEW_THRESHOLD:
            return "needs_review"
        return "failed"

    async def start(self) -> None:
        """Starts the agent and subscribes to evaluation requests on the bus."""
        self._running = True

        await self.bus.subscribe(
            self.AGENT_ID,
            self._handle_message
        )

        logger.info("Quality Judge Agent started")

    async def stop(self) -> None:
        """Stops the agent, unsubscribes from the bus, and closes the judge."""
        self._running = False

        await self.bus.unsubscribe(self.AGENT_ID)
        await self.judge.close()

        logger.info("Quality Judge Agent stopped")

    async def _handle_message(
        self,
        message: "AgentMessage"
    ) -> Optional[Dict[str, Any]]:
        """Dispatches an incoming bus message by message_type.

        Returns the handler's result dict, or None for unknown message types.
        """
        if message.message_type == "evaluate_response":
            return await self._handle_evaluate_request(message)
        if message.message_type == "get_evaluation_stats":
            return await self._handle_stats_request(message)
        if message.message_type == "calibrate":
            return await self._handle_calibration_request(message)

        return None

    async def _handle_evaluate_request(
        self,
        message: "AgentMessage"
    ) -> Dict[str, Any]:
        """Evaluates a response from the message payload.

        Runs the LLM judge, converts the score to percent, attaches a
        verdict, and persists the evaluation to shared memory.
        """
        payload = message.payload

        task_id = payload.get("task_id", "")
        task_type = payload.get("task_type", "")
        response = payload.get("response", "")
        context = payload.get("context", {})
        user_input = context.get("user_input", "")
        # Fall back to the task type when no explicit expected intent is given.
        expected_intent = context.get("expected_intent", task_type)

        logger.debug(
            "Evaluating response",
            task_id=task_id[:8] if task_id else "n/a",
            response_length=len(response)
        )

        # Check for similar evaluations in memory (consistency signal only).
        similar = await self._find_similar_evaluations(task_type, response)

        # Run evaluation
        result = await self.judge.evaluate(
            user_input=user_input,
            detected_intent=task_type,
            response=response,
            expected_intent=expected_intent
        )

        composite_percent = self._to_percent(result.composite_score)
        verdict = self._verdict(composite_percent)

        evaluation = {
            "task_id": task_id,
            "intent_accuracy": result.intent_accuracy,
            "faithfulness": result.faithfulness,
            "relevance": result.relevance,
            "coherence": result.coherence,
            "safety": result.safety,
            "composite_score": composite_percent,
            "verdict": verdict,
            "reasoning": result.reasoning,
            "similar_count": len(similar),
            "evaluated_at": datetime.now(timezone.utc).isoformat()
        }

        # Store evaluation in memory for future consistency lookups.
        await self._store_evaluation(task_type, response, evaluation)

        logger.info(
            "Evaluation complete",
            task_id=task_id[:8] if task_id else "n/a",
            composite=f"{composite_percent:.1f}%",
            verdict=verdict
        )

        return evaluation

    async def _handle_stats_request(
        self,
        message: "AgentMessage"
    ) -> Dict[str, Any]:
        """Returns aggregate evaluation statistics over a recent time window.

        Payload keys: optional ``task_type`` filter and ``hours`` window
        (default 24).
        """
        task_type = message.payload.get("task_type")
        hours = message.payload.get("hours", 24)

        # Get recent evaluations from memory
        evaluations = await self.memory.get_recent(
            hours=hours,
            agent_id=self.AGENT_ID
        )

        if task_type:
            evaluations = [
                e for e in evaluations
                if e.key.startswith(f"evaluation:{task_type}:")
            ]

        if not evaluations:
            return {
                "count": 0,
                "avg_score": 0,
                "pass_rate": 0,
                "by_verdict": {}
            }

        scores = []
        by_verdict = {"production_ready": 0, "needs_review": 0, "failed": 0}

        for eval_memory in evaluations:
            value = eval_memory.value
            # Skip non-dict values defensively; memory may hold other shapes.
            if isinstance(value, dict):
                scores.append(value.get("composite_score", 0))
                verdict = value.get("verdict", "failed")
                by_verdict[verdict] = by_verdict.get(verdict, 0) + 1

        total = len(scores)
        passed = by_verdict.get("production_ready", 0)

        return {
            "count": total,
            "avg_score": sum(scores) / max(total, 1),
            "pass_rate": passed / max(total, 1),
            "by_verdict": by_verdict,
            "time_range_hours": hours
        }

    async def _handle_calibration_request(
        self,
        message: "AgentMessage"
    ) -> Dict[str, Any]:
        """Handles calibration against gold standard examples.

        Each example is re-evaluated and, when it carries an
        ``expected_score``, compared against the judge's actual score.
        """
        examples = message.payload.get("examples", [])

        if not examples:
            return {"success": False, "reason": "No examples provided"}

        results = []
        for example in examples:
            result = await self.judge.evaluate(
                user_input=example.get("user_input", ""),
                detected_intent=example.get("intent", ""),
                response=example.get("response", ""),
                expected_intent=example.get("expected_intent", "")
            )

            expected_score = example.get("expected_score")
            # BUGFIX: use an explicit None check — a legitimate gold
            # score of 0 is falsy and was previously skipped.
            if expected_score is not None:
                actual_score = self._to_percent(result.composite_score)
                deviation = abs(actual_score - expected_score)
                results.append({
                    "expected": expected_score,
                    "actual": actual_score,
                    "deviation": deviation,
                    "within_tolerance": deviation <= 10
                })

        # Calculate calibration metrics (max(..., 1) guards empty results).
        avg_deviation = sum(r["deviation"] for r in results) / max(len(results), 1)
        within_tolerance = sum(1 for r in results if r["within_tolerance"])

        return {
            "success": True,
            "examples_count": len(results),
            "avg_deviation": avg_deviation,
            "within_tolerance_count": within_tolerance,
            "calibration_quality": within_tolerance / max(len(results), 1)
        }

    async def _find_similar_evaluations(
        self,
        task_type: str,
        response: str
    ) -> List[Dict[str, Any]]:
        """Finds similar evaluations in memory for consistency."""
        # Search for evaluations of the same task type
        pattern = f"evaluation:{task_type}:*"
        similar = await self.memory.search(pattern, limit=5)

        # Filter to find truly similar responses
        # (In production, could use embedding similarity)
        return [m.value for m in similar if isinstance(m.value, dict)]

    async def _store_evaluation(
        self,
        task_type: str,
        response: str,
        evaluation: Dict[str, Any]
    ) -> None:
        """Stores an evaluation in shared memory for future reference.

        The key is derived from the task type plus a short SHA-256 hash of
        the response text so identical responses map to the same entry.
        """
        import hashlib
        response_hash = hashlib.sha256(response.encode()).hexdigest()[:16]
        key = f"evaluation:{task_type}:{response_hash}"

        await self.memory.remember(
            key=key,
            value=evaluation,
            agent_id=self.AGENT_ID,
            ttl_days=30
        )

    # Direct evaluation methods

    async def evaluate(
        self,
        response: str,
        task_type: str = "",
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Evaluates a response directly (without message bus).

        Args:
            response: The response to evaluate
            task_type: Type of task that generated the response
            context: Additional context

        Returns:
            Evaluation result dict
        """
        context = context or {}

        result = await self.judge.evaluate(
            user_input=context.get("user_input", ""),
            detected_intent=task_type,
            response=response,
            expected_intent=context.get("expected_intent", task_type)
        )

        composite_percent = self._to_percent(result.composite_score)

        return {
            "intent_accuracy": result.intent_accuracy,
            "faithfulness": result.faithfulness,
            "relevance": result.relevance,
            "coherence": result.coherence,
            "safety": result.safety,
            "composite_score": composite_percent,
            "verdict": self._verdict(composite_percent),
            "reasoning": result.reasoning
        }

    async def is_production_ready(
        self,
        response: str,
        task_type: str = "",
        context: Optional[Dict[str, Any]] = None
    ) -> bool:
        """
        Quick check if response is production ready.

        Args:
            response: The response to check
            task_type: Type of task
            context: Additional context

        Returns:
            True if production ready
        """
        evaluation = await self.evaluate(response, task_type, context)
        return evaluation["verdict"] == "production_ready"

    async def health_check(self) -> bool:
        """Checks if the underlying LLM judge is operational."""
        return await self.judge.health_check()