""" RAG Judge Evaluators - Individual evaluation methods for RAG quality """ import json import time import structlog from typing import List, Dict, Any from datetime import datetime from bqas.config import BQASConfig from bqas.prompts import ( RAG_RETRIEVAL_JUDGE_PROMPT, RAG_OPERATOR_JUDGE_PROMPT, RAG_HALLUCINATION_JUDGE_PROMPT, RAG_PRIVACY_JUDGE_PROMPT, RAG_NAMESPACE_JUDGE_PROMPT, ) from bqas.metrics import TestResult from bqas.rag_judge_types import ( RAGRetrievalResult, RAGOperatorResult, RAGHallucinationResult, RAGPrivacyResult, RAGNamespaceResult, ) logger = structlog.get_logger(__name__) async def evaluate_retrieval( call_ollama, parse_json_response, config: BQASConfig, query: str, aufgabentyp: str, subject: str, level: str, retrieved_passage: str, expected_concepts: List[str], ) -> RAGRetrievalResult: """Evaluate EH retrieval quality.""" prompt = RAG_RETRIEVAL_JUDGE_PROMPT.format( query=query, aufgabentyp=aufgabentyp, subject=subject, level=level, retrieved_passage=retrieved_passage, expected_concepts=", ".join(expected_concepts), ) try: response_text = await call_ollama(prompt) data = parse_json_response(response_text) retrieval_precision = max(0, min(100, int(data.get("retrieval_precision", 0)))) faithfulness = max(1, min(5, int(data.get("faithfulness", 1)))) relevance = max(1, min(5, int(data.get("relevance", 1)))) citation_accuracy = max(1, min(5, int(data.get("citation_accuracy", 1)))) composite = _calculate_retrieval_composite( config, retrieval_precision, faithfulness, relevance, citation_accuracy ) return RAGRetrievalResult( retrieval_precision=retrieval_precision, faithfulness=faithfulness, relevance=relevance, citation_accuracy=citation_accuracy, reasoning=str(data.get("reasoning", ""))[:500], composite_score=composite, ) except Exception as e: logger.error("Retrieval evaluation failed", error=str(e)) return RAGRetrievalResult( retrieval_precision=0, faithfulness=1, relevance=1, citation_accuracy=1, reasoning=f"Evaluation failed: {str(e)}", composite_score=0.0, ) def _calculate_retrieval_composite( config: BQASConfig, retrieval_precision: int, faithfulness: int, relevance: int, citation_accuracy: int, ) -> float: """Calculate composite score for retrieval evaluation.""" retrieval_score = (retrieval_precision / 100) * 5 composite = ( retrieval_score * config.rag_retrieval_precision_weight + faithfulness * config.rag_faithfulness_weight + relevance * 0.3 + citation_accuracy * config.rag_citation_accuracy_weight ) return round(composite, 3) async def evaluate_operator( call_ollama, parse_json_response, operator: str, generated_definition: str, expected_afb: str, expected_actions: List[str], ) -> RAGOperatorResult: """Evaluate operator alignment.""" prompt = RAG_OPERATOR_JUDGE_PROMPT.format( operator=operator, generated_definition=generated_definition, expected_afb=expected_afb, expected_actions=", ".join(expected_actions), ) try: response_text = await call_ollama(prompt) data = parse_json_response(response_text) operator_alignment = max(0, min(100, int(data.get("operator_alignment", 0)))) faithfulness = max(1, min(5, int(data.get("faithfulness", 1)))) completeness = max(1, min(5, int(data.get("completeness", 1)))) detected_afb = str(data.get("detected_afb", "")) alignment_score = (operator_alignment / 100) * 5 composite = round( alignment_score * 0.5 + faithfulness * 0.3 + completeness * 0.2, 3 ) return RAGOperatorResult( operator_alignment=operator_alignment, faithfulness=faithfulness, completeness=completeness, detected_afb=detected_afb, reasoning=str(data.get("reasoning", ""))[:500], composite_score=composite, ) except Exception as e: logger.error("Operator evaluation failed", error=str(e)) return RAGOperatorResult( operator_alignment=0, faithfulness=1, completeness=1, detected_afb="", reasoning=f"Evaluation failed: {str(e)}", composite_score=0.0, ) async def evaluate_hallucination( call_ollama, parse_json_response, query: str, response: str, available_facts: List[str], ) -> RAGHallucinationResult: """Evaluate for hallucinations.""" prompt = RAG_HALLUCINATION_JUDGE_PROMPT.format( query=query, response=response, available_facts="\n".join(f"- {f}" for f in available_facts), ) try: response_text = await call_ollama(prompt) data = parse_json_response(response_text) grounding_score = max(0, min(100, int(data.get("grounding_score", 0)))) invention_detection = "pass" if data.get("invention_detection") == "pass" else "fail" source_attribution = max(1, min(5, int(data.get("source_attribution", 1)))) hallucinated_claims = data.get("hallucinated_claims", []) grounding = (grounding_score / 100) * 5 invention = 5.0 if invention_detection == "pass" else 0.0 composite = round(grounding * 0.4 + invention * 0.4 + source_attribution * 0.2, 3) return RAGHallucinationResult( grounding_score=grounding_score, invention_detection=invention_detection, source_attribution=source_attribution, hallucinated_claims=hallucinated_claims[:5], reasoning=str(data.get("reasoning", ""))[:500], composite_score=composite, ) except Exception as e: logger.error("Hallucination evaluation failed", error=str(e)) return RAGHallucinationResult( grounding_score=0, invention_detection="fail", source_attribution=1, hallucinated_claims=[], reasoning=f"Evaluation failed: {str(e)}", composite_score=0.0, ) async def evaluate_privacy( call_ollama, parse_json_response, query: str, context: Dict[str, Any], response: str, ) -> RAGPrivacyResult: """Evaluate privacy/DSGVO compliance.""" prompt = RAG_PRIVACY_JUDGE_PROMPT.format( query=query, context=json.dumps(context, ensure_ascii=False, indent=2), response=response, ) try: response_text = await call_ollama(prompt) data = parse_json_response(response_text) privacy_compliance = "pass" if data.get("privacy_compliance") == "pass" else "fail" anonymization = max(1, min(5, int(data.get("anonymization", 1)))) dsgvo_compliance = "pass" if data.get("dsgvo_compliance") == "pass" else "fail" detected_pii = data.get("detected_pii", []) privacy = 5.0 if privacy_compliance == "pass" else 0.0 dsgvo = 5.0 if dsgvo_compliance == "pass" else 0.0 composite = round(privacy * 0.4 + anonymization * 0.2 + dsgvo * 0.4, 3) return RAGPrivacyResult( privacy_compliance=privacy_compliance, anonymization=anonymization, dsgvo_compliance=dsgvo_compliance, detected_pii=detected_pii[:5], reasoning=str(data.get("reasoning", ""))[:500], composite_score=composite, ) except Exception as e: logger.error("Privacy evaluation failed", error=str(e)) return RAGPrivacyResult( privacy_compliance="fail", anonymization=1, dsgvo_compliance="fail", detected_pii=[], reasoning=f"Evaluation failed: {str(e)}", composite_score=0.0, ) async def evaluate_namespace( call_ollama, parse_json_response, teacher_id: str, namespace: str, school_id: str, requested_data: str, response: str, ) -> RAGNamespaceResult: """Evaluate namespace isolation.""" prompt = RAG_NAMESPACE_JUDGE_PROMPT.format( teacher_id=teacher_id, namespace=namespace, school_id=school_id, requested_data=requested_data, response=response, ) try: response_text = await call_ollama(prompt) data = parse_json_response(response_text) namespace_compliance = "pass" if data.get("namespace_compliance") == "pass" else "fail" cross_tenant_leak = "pass" if data.get("cross_tenant_leak") == "pass" else "fail" school_sharing_compliance = max(1, min(5, int(data.get("school_sharing_compliance", 1)))) detected_leaks = data.get("detected_leaks", []) ns_compliance = 5.0 if namespace_compliance == "pass" else 0.0 cross_tenant = 5.0 if cross_tenant_leak == "pass" else 0.0 composite = round( ns_compliance * 0.4 + cross_tenant * 0.4 + school_sharing_compliance * 0.2, 3 ) return RAGNamespaceResult( namespace_compliance=namespace_compliance, cross_tenant_leak=cross_tenant_leak, school_sharing_compliance=school_sharing_compliance, detected_leaks=detected_leaks[:5], reasoning=str(data.get("reasoning", ""))[:500], composite_score=composite, ) except Exception as e: logger.error("Namespace evaluation failed", error=str(e)) return RAGNamespaceResult( namespace_compliance="fail", cross_tenant_leak="fail", school_sharing_compliance=1, detected_leaks=[], reasoning=f"Evaluation failed: {str(e)}", composite_score=0.0, ) async def evaluate_rag_test_case( judge_instance, test_case: Dict[str, Any], service_response: Dict[str, Any], ) -> TestResult: """ Evaluate a full RAG test case from the golden suite. """ start_time = time.time() test_id = test_case.get("id", "UNKNOWN") test_name = test_case.get("name", "") category = test_case.get("category", "") min_score = test_case.get("min_score", 3.5) composite_score = 0.0 reasoning = "" if category == "eh_retrieval": result = await judge_instance.evaluate_retrieval( query=test_case.get("input", {}).get("query", ""), aufgabentyp=test_case.get("input", {}).get("context", {}).get("aufgabentyp", ""), subject=test_case.get("input", {}).get("context", {}).get("subject", "Deutsch"), level=test_case.get("input", {}).get("context", {}).get("level", "Abitur"), retrieved_passage=service_response.get("passage", ""), expected_concepts=test_case.get("expected", {}).get("must_contain_concepts", []), ) composite_score = result.composite_score reasoning = result.reasoning elif category == "operator_alignment": result = await judge_instance.evaluate_operator( operator=test_case.get("input", {}).get("operator", ""), generated_definition=service_response.get("definition", ""), expected_afb=test_case.get("expected", {}).get("afb_level", ""), expected_actions=test_case.get("expected", {}).get("expected_actions", []), ) composite_score = result.composite_score reasoning = result.reasoning elif category == "hallucination_control": result = await judge_instance.evaluate_hallucination( query=test_case.get("input", {}).get("query", ""), response=service_response.get("response", ""), available_facts=test_case.get("input", {}).get("context", {}).get("available_facts", []), ) composite_score = result.composite_score reasoning = result.reasoning elif category == "privacy_compliance": result = await judge_instance.evaluate_privacy( query=test_case.get("input", {}).get("query", ""), context=test_case.get("input", {}).get("context", {}), response=service_response.get("response", ""), ) composite_score = result.composite_score reasoning = result.reasoning elif category == "namespace_isolation": context = test_case.get("input", {}).get("context", {}) result = await judge_instance.evaluate_namespace( teacher_id=context.get("teacher_id", ""), namespace=context.get("namespace", ""), school_id=context.get("school_id", ""), requested_data=test_case.get("input", {}).get("query", ""), response=service_response.get("response", ""), ) composite_score = result.composite_score reasoning = result.reasoning else: reasoning = f"Unknown category: {category}" duration_ms = int((time.time() - start_time) * 1000) passed = composite_score >= min_score return TestResult( test_id=test_id, test_name=test_name, user_input=str(test_case.get("input", {})), expected_intent=category, detected_intent=category, response=str(service_response), intent_accuracy=int(composite_score / 5 * 100), faithfulness=int(composite_score), relevance=int(composite_score), coherence=int(composite_score), safety="pass" if composite_score >= min_score else "fail", composite_score=composite_score, passed=passed, reasoning=reasoning, timestamp=datetime.utcnow(), duration_ms=duration_ms, )