""" RAG & Training Test API - Test Runner fuer Retrieval Augmented Generation Endpoint: /api/admin/rag-tests """ from fastapi import APIRouter from pydantic import BaseModel from typing import List, Optional, Literal import httpx import asyncio import time import os router = APIRouter(prefix="/api/admin/rag-tests", tags=["RAG Tests"]) # ============================================== # Models # ============================================== class TestResult(BaseModel): name: str description: str expected: str actual: str status: Literal["passed", "failed", "pending", "skipped"] duration_ms: float error_message: Optional[str] = None class TestCategoryResult(BaseModel): category: str display_name: str description: str tests: List[TestResult] passed: int failed: int total: int class FullTestResults(BaseModel): categories: List[TestCategoryResult] total_passed: int total_failed: int total_tests: int duration_ms: float # ============================================== # Configuration # ============================================== BACKEND_URL = os.getenv("BACKEND_URL", "http://localhost:8000") VECTOR_DB_URL = os.getenv("VECTOR_DB_URL", "http://localhost:6333") # Qdrant EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "text-embedding-3-small") # ============================================== # Test Implementations # ============================================== async def test_vector_db_health() -> TestResult: """Test Vector Database Connection (Qdrant)""" start = time.time() try: async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get(f"{VECTOR_DB_URL}/health") duration = (time.time() - start) * 1000 if response.status_code == 200: return TestResult( name="Vector Datenbank (Qdrant)", description="Prueft ob die Vector-DB fuer Embeddings erreichbar ist", expected="Qdrant erreichbar", actual="Qdrant aktiv und gesund", status="passed", duration_ms=duration ) else: return TestResult( name="Vector Datenbank (Qdrant)", description="Prueft ob die Vector-DB fuer Embeddings erreichbar ist", expected="Qdrant erreichbar", actual=f"HTTP {response.status_code}", status="failed", duration_ms=duration, error_message="Qdrant nicht erreichbar" ) except Exception as e: return TestResult( name="Vector Datenbank (Qdrant)", description="Prueft ob die Vector-DB fuer Embeddings erreichbar ist", expected="Qdrant erreichbar", actual="Nicht verfuegbar", status="skipped", duration_ms=(time.time() - start) * 1000, error_message=str(e) ) async def test_vector_collections() -> TestResult: """Test Vector Collections""" start = time.time() try: async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get(f"{VECTOR_DB_URL}/collections") duration = (time.time() - start) * 1000 if response.status_code == 200: data = response.json() collections = data.get("result", {}).get("collections", []) names = [c.get("name", "?") for c in collections] return TestResult( name="Vector Collections", description="Prueft ob RAG-Collections (Dokumente, Embeddings) existieren", expected="Collections verfuegbar", actual=f"{len(collections)} Collections: {', '.join(names[:3])}", status="passed" if collections else "skipped", duration_ms=duration, error_message=None if collections else "Keine Collections gefunden" ) else: return TestResult( name="Vector Collections", description="Prueft ob RAG-Collections (Dokumente, Embeddings) existieren", expected="Collections verfuegbar", actual=f"HTTP {response.status_code}", status="failed", duration_ms=duration, error_message="Collections nicht abrufbar" ) except Exception as e: return TestResult( name="Vector Collections", description="Prueft ob RAG-Collections (Dokumente, Embeddings) existieren", expected="Collections verfuegbar", actual="Nicht verfuegbar", status="skipped", duration_ms=(time.time() - start) * 1000, error_message=str(e) ) async def test_embedding_api() -> TestResult: """Test Embedding Generation API""" start = time.time() openai_key = os.getenv("OPENAI_API_KEY", "") if not openai_key: return TestResult( name="Embedding API (OpenAI)", description="Prueft ob Embeddings generiert werden koennen", expected="Embedding-Modell verfuegbar", actual="OPENAI_API_KEY nicht gesetzt", status="skipped", duration_ms=(time.time() - start) * 1000, error_message="API Key fehlt" ) try: async with httpx.AsyncClient(timeout=15.0) as client: response = await client.post( "https://api.openai.com/v1/embeddings", headers={"Authorization": f"Bearer {openai_key}"}, json={ "model": EMBEDDING_MODEL, "input": "Test embedding" } ) duration = (time.time() - start) * 1000 if response.status_code == 200: data = response.json() dims = len(data.get("data", [{}])[0].get("embedding", [])) return TestResult( name="Embedding API (OpenAI)", description="Prueft ob Embeddings generiert werden koennen", expected="Embedding-Modell verfuegbar", actual=f"{EMBEDDING_MODEL}: {dims} Dimensionen", status="passed", duration_ms=duration ) else: return TestResult( name="Embedding API (OpenAI)", description="Prueft ob Embeddings generiert werden koennen", expected="Embedding-Modell verfuegbar", actual=f"HTTP {response.status_code}", status="failed", duration_ms=duration, error_message="Embedding-Generierung fehlgeschlagen" ) except Exception as e: return TestResult( name="Embedding API (OpenAI)", description="Prueft ob Embeddings generiert werden koennen", expected="Embedding-Modell verfuegbar", actual=f"Fehler: {str(e)}", status="failed", duration_ms=(time.time() - start) * 1000, error_message=str(e) ) async def test_document_api() -> TestResult: """Test Document Management API""" start = time.time() try: async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get(f"{BACKEND_URL}/api/rag/documents") duration = (time.time() - start) * 1000 if response.status_code == 200: data = response.json() count = len(data) if isinstance(data, list) else data.get("total", 0) return TestResult( name="Dokument-Verwaltung API", description="Prueft ob die RAG-Dokument-Verwaltung verfuegbar ist", expected="Dokument-API verfuegbar", actual=f"{count} Dokumente indiziert", status="passed", duration_ms=duration ) elif response.status_code == 404: return TestResult( name="Dokument-Verwaltung API", description="Prueft ob die RAG-Dokument-Verwaltung verfuegbar ist", expected="Dokument-API verfuegbar", actual="Endpoint nicht implementiert", status="skipped", duration_ms=duration, error_message="RAG API nicht aktiviert" ) else: return TestResult( name="Dokument-Verwaltung API", description="Prueft ob die RAG-Dokument-Verwaltung verfuegbar ist", expected="Dokument-API verfuegbar", actual=f"HTTP {response.status_code}", status="failed", duration_ms=duration, error_message=f"Unerwarteter Status: {response.status_code}" ) except Exception as e: return TestResult( name="Dokument-Verwaltung API", description="Prueft ob die RAG-Dokument-Verwaltung verfuegbar ist", expected="Dokument-API verfuegbar", actual=f"Fehler: {str(e)}", status="failed", duration_ms=(time.time() - start) * 1000, error_message=str(e) ) async def test_training_api() -> TestResult: """Test Training Jobs API""" start = time.time() try: async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get(f"{BACKEND_URL}/api/training/jobs") duration = (time.time() - start) * 1000 if response.status_code == 200: data = response.json() count = len(data) if isinstance(data, list) else data.get("total", 0) return TestResult( name="Training Jobs API", description="Prueft ob die Modell-Training-Verwaltung verfuegbar ist", expected="Training-API verfuegbar", actual=f"{count} Training Jobs", status="passed", duration_ms=duration ) elif response.status_code == 404: return TestResult( name="Training Jobs API", description="Prueft ob die Modell-Training-Verwaltung verfuegbar ist", expected="Training-API verfuegbar", actual="Endpoint nicht implementiert", status="skipped", duration_ms=duration, error_message="Training API nicht aktiviert" ) else: return TestResult( name="Training Jobs API", description="Prueft ob die Modell-Training-Verwaltung verfuegbar ist", expected="Training-API verfuegbar", actual=f"HTTP {response.status_code}", status="failed", duration_ms=duration, error_message=f"Unerwarteter Status: {response.status_code}" ) except Exception as e: return TestResult( name="Training Jobs API", description="Prueft ob die Modell-Training-Verwaltung verfuegbar ist", expected="Training-API verfuegbar", actual=f"Fehler: {str(e)}", status="failed", duration_ms=(time.time() - start) * 1000, error_message=str(e) ) async def test_edu_search_api() -> TestResult: """Test EduSearch RAG API""" start = time.time() try: async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get(f"{BACKEND_URL}/v1/edu-search/seeds") duration = (time.time() - start) * 1000 if response.status_code == 200: data = response.json() count = len(data) if isinstance(data, list) else data.get("total", 0) return TestResult( name="EduSearch Seeds API", description="Prueft ob die Bildungs-Suchmaschinen-Seeds verfuegbar sind", expected="EduSearch API verfuegbar", actual=f"{count} Crawler Seeds", status="passed", duration_ms=duration ) elif response.status_code == 404: return TestResult( name="EduSearch Seeds API", description="Prueft ob die Bildungs-Suchmaschinen-Seeds verfuegbar sind", expected="EduSearch API verfuegbar", actual="Endpoint nicht implementiert", status="skipped", duration_ms=duration, error_message="EduSearch nicht aktiviert" ) else: return TestResult( name="EduSearch Seeds API", description="Prueft ob die Bildungs-Suchmaschinen-Seeds verfuegbar sind", expected="EduSearch API verfuegbar", actual=f"HTTP {response.status_code}", status="failed", duration_ms=duration, error_message=f"Unerwarteter Status" ) except Exception as e: return TestResult( name="EduSearch Seeds API", description="Prueft ob die Bildungs-Suchmaschinen-Seeds verfuegbar sind", expected="EduSearch API verfuegbar", actual=f"Fehler: {str(e)}", status="failed", duration_ms=(time.time() - start) * 1000, error_message=str(e) ) # ============================================== # Category Runners # ============================================== async def run_vector_tests() -> TestCategoryResult: """Run Vector DB tests""" tests = await asyncio.gather( test_vector_db_health(), test_vector_collections(), ) passed = sum(1 for t in tests if t.status == "passed") failed = sum(1 for t in tests if t.status == "failed") return TestCategoryResult( category="vector-db", display_name="Vector Datenbank", description="Tests fuer Qdrant Vector Store", tests=list(tests), passed=passed, failed=failed, total=len(tests) ) async def run_embedding_tests() -> TestCategoryResult: """Run Embedding tests""" tests = await asyncio.gather( test_embedding_api(), ) passed = sum(1 for t in tests if t.status == "passed") failed = sum(1 for t in tests if t.status == "failed") return TestCategoryResult( category="embeddings", display_name="Embeddings", description="Tests fuer Embedding-Generierung", tests=list(tests), passed=passed, failed=failed, total=len(tests) ) async def run_rag_tests() -> TestCategoryResult: """Run RAG Pipeline tests""" tests = await asyncio.gather( test_document_api(), test_edu_search_api(), ) passed = sum(1 for t in tests if t.status == "passed") failed = sum(1 for t in tests if t.status == "failed") return TestCategoryResult( category="rag-pipeline", display_name="RAG Pipeline", description="Tests fuer Retrieval Augmented Generation", tests=list(tests), passed=passed, failed=failed, total=len(tests) ) async def run_training_tests() -> TestCategoryResult: """Run Training tests""" tests = await asyncio.gather( test_training_api(), ) passed = sum(1 for t in tests if t.status == "passed") failed = sum(1 for t in tests if t.status == "failed") return TestCategoryResult( category="training", display_name="Model Training", description="Tests fuer Fine-Tuning und Training Jobs", tests=list(tests), passed=passed, failed=failed, total=len(tests) ) # ============================================== # API Endpoints # ============================================== @router.post("/{category}", response_model=TestCategoryResult) async def run_category_tests(category: str): """Run tests for a specific category""" runners = { "vector-db": run_vector_tests, "embeddings": run_embedding_tests, "rag-pipeline": run_rag_tests, "training": run_training_tests, } if category not in runners: return TestCategoryResult( category=category, display_name=f"Unbekannt: {category}", description="Kategorie nicht gefunden", tests=[], passed=0, failed=0, total=0 ) return await runners[category]() @router.post("/run-all", response_model=FullTestResults) async def run_all_tests(): """Run all RAG tests""" start = time.time() categories = await asyncio.gather( run_vector_tests(), run_embedding_tests(), run_rag_tests(), run_training_tests(), ) total_passed = sum(c.passed for c in categories) total_failed = sum(c.failed for c in categories) total_tests = sum(c.total for c in categories) return FullTestResults( categories=list(categories), total_passed=total_passed, total_failed=total_failed, total_tests=total_tests, duration_ms=(time.time() - start) * 1000 ) @router.get("/categories") async def get_categories(): """Get available test categories""" return { "categories": [ {"id": "vector-db", "name": "Vector DB", "description": "Qdrant Health & Collections"}, {"id": "embeddings", "name": "Embeddings", "description": "Embedding-Generierung"}, {"id": "rag-pipeline", "name": "RAG Pipeline", "description": "Dokumente & Suche"}, {"id": "training", "name": "Training", "description": "Fine-Tuning Jobs"}, ] }