Files
breakpilot-lehrer/klausur-service/backend/tests/test_rag_admin.py
Benjamin Boenisch 5a31f52310 Initial commit: breakpilot-lehrer - Lehrer KI Platform
Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website,
Klausur-Service, School-Service, Voice-Service, Geo-Service,
BreakPilot Drive, Agent-Core

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 23:47:26 +01:00

357 lines
11 KiB
Python

"""
Tests for RAG Admin API
Tests upload, search, metrics, and storage functionality.
"""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from datetime import datetime
import io
import zipfile
# =============================================================================
# Test Fixtures
# =============================================================================
@pytest.fixture
def mock_qdrant_client():
    """Yield a MagicMock standing in for the Qdrant client.

    While the fixture is active, ``admin_api.get_qdrant_client`` is patched
    to return the stub. The stub lists no collections, and whatever
    ``get_collection`` returns reports 7352 vectors, 7352 points and a
    status whose ``value`` is "green".
    """
    qdrant = MagicMock()
    qdrant.get_collections.return_value.collections = []

    collection_info = qdrant.get_collection.return_value
    collection_info.vectors_count = 7352
    collection_info.points_count = 7352
    collection_info.status.value = "green"

    # The patch is undone when the requesting test finishes.
    with patch('admin_api.get_qdrant_client', return_value=qdrant):
        yield qdrant
@pytest.fixture
def mock_minio_client():
    """Yield a MagicMock standing in for the MinIO client.

    While the fixture is active, ``minio_storage._get_minio_client`` is
    patched to return the stub, whose ``bucket_exists`` returns True and
    whose ``list_objects`` returns an empty list.
    """
    minio = MagicMock(
        **{
            "bucket_exists.return_value": True,
            "list_objects.return_value": [],
        }
    )
    # The patch is undone when the requesting test finishes.
    with patch('minio_storage._get_minio_client', return_value=minio):
        yield minio
@pytest.fixture
def mock_db_pool():
    """Yield an AsyncMock standing in for the PostgreSQL connection pool.

    While the fixture is active, ``metrics_db.get_pool`` is patched and its
    return value is set to the yielded pool stub.
    """
    pool_stub = AsyncMock()
    # The patch is undone when the requesting test finishes.
    with patch('metrics_db.get_pool', return_value=pool_stub):
        yield pool_stub
# =============================================================================
# Admin API Tests
# =============================================================================
class TestIngestionStatus:
    """Tests for /api/v1/admin/nibis/status endpoint.

    Both tests write to the module-level ``admin_api._ingestion_status``
    mapping. The writes go through ``patch.dict`` so the mapping is restored
    afterwards and state (e.g. ``running=True``) cannot leak into other tests.
    """

    def test_status_not_running(self):
        """Test status when no ingestion is running."""
        from admin_api import _ingestion_status

        idle_state = {
            "running": False,
            "last_run": None,
            "last_result": None,
        }
        # patch.dict applies the values and restores the original content
        # of the shared mapping when the block exits.
        with patch.dict(_ingestion_status, idle_state):
            assert _ingestion_status["running"] is False

    def test_status_running(self):
        """Test status when ingestion is running."""
        from admin_api import _ingestion_status

        running_state = {
            "running": True,
            "last_run": datetime.now().isoformat(),
        }
        with patch.dict(_ingestion_status, running_state):
            assert _ingestion_status["running"] is True
            assert _ingestion_status["last_run"] is not None
class TestUploadAPI:
    """Tests for /api/v1/admin/rag/upload endpoint.

    The tests operate on the module-level ``admin_api._upload_history``
    list. Each test snapshots the list first and puts the snapshot back in a
    ``finally`` block, so fake entries never remain visible to other tests.
    """

    def test_upload_record_creation(self):
        """Test that upload records are created correctly."""
        from admin_api import _upload_history

        saved_history = list(_upload_history)
        _upload_history.clear()
        try:
            # Simulate upload record
            upload_record = {
                "timestamp": datetime.now().isoformat(),
                "filename": "test.pdf",
                "collection": "bp_nibis_eh",
                "year": 2024,
                "pdfs_extracted": 1,
                "target_directory": "/tmp/test",
            }
            _upload_history.append(upload_record)

            assert len(_upload_history) == 1
            assert _upload_history[0]["filename"] == "test.pdf"
        finally:
            # Restore in place: other modules hold a reference to this list.
            _upload_history.clear()
            _upload_history.extend(saved_history)

    def test_upload_history_limit(self):
        """Test that upload history is limited to 100 entries."""
        from admin_api import _upload_history

        saved_history = list(_upload_history)
        _upload_history.clear()
        try:
            # Add 105 entries, dropping the oldest once the cap is exceeded
            for i in range(105):
                _upload_history.append({
                    "timestamp": datetime.now().isoformat(),
                    "filename": f"test_{i}.pdf",
                })
                if len(_upload_history) > 100:
                    _upload_history.pop(0)

            assert len(_upload_history) == 100
        finally:
            # Restore in place: other modules hold a reference to this list.
            _upload_history.clear()
            _upload_history.extend(saved_history)
class TestSearchFeedback:
    """Tests for feedback storage."""

    def test_feedback_record_format(self):
        """Check the shape of a feedback record built in the test itself.

        The record must carry a timestamp and a rating between 1 and 5
        (inclusive).
        """
        record = dict(
            timestamp=datetime.now().isoformat(),
            result_id="test-123",
            rating=4,
            notes="Good result",
        )

        assert "timestamp" in record
        assert 1 <= record["rating"] <= 5
# =============================================================================
# MinIO Storage Tests
# =============================================================================
class TestMinIOStorage:
    """Tests for MinIO storage functions."""

    def test_get_minio_path(self):
        """Test MinIO path generation."""
        from minio_storage import get_minio_path

        path = get_minio_path(
            data_type="landes-daten",
            bundesland="ni",
            use_case="klausur",
            year=2024,
            filename="test.pdf",
        )
        assert path == "landes-daten/ni/klausur/2024/test.pdf"

    def test_get_minio_path_teacher_data(self):
        """Test MinIO path for teacher data."""
        # NOTE(review): get_minio_path is imported but never called here; the
        # path below is hard-coded. Confirm whether the helper is meant to
        # produce the teacher-data layout and, if so, call it instead.
        from minio_storage import get_minio_path

        # Teacher data uses different path structure
        path = "lehrer-daten/tenant_123/teacher_456/test.pdf.enc"
        assert "lehrer-daten" in path
        assert "tenant_123" in path
        assert ".enc" in path

    @pytest.mark.asyncio
    async def test_storage_stats_no_client(self):
        """Test storage stats when MinIO is not available."""
        from minio_storage import get_storage_stats

        # A client factory that returns None stands for "MinIO unavailable".
        with patch('minio_storage._get_minio_client', return_value=None):
            stats = await get_storage_stats()
            assert stats["connected"] is False
# =============================================================================
# Metrics DB Tests
# =============================================================================
class TestMetricsDB:
    """Tests for PostgreSQL metrics functions."""

    @pytest.mark.asyncio
    async def test_store_feedback_no_pool(self):
        """Test feedback storage when DB is not available."""
        from metrics_db import store_feedback

        # get_pool is replaced by an AsyncMock whose awaited result is None.
        with patch('metrics_db.get_pool', new_callable=AsyncMock, return_value=None):
            result = await store_feedback(
                result_id="test-123",
                rating=4,
            )
            assert result is False

    @pytest.mark.asyncio
    async def test_calculate_metrics_no_pool(self):
        """Test metrics calculation when DB is not available."""
        from metrics_db import calculate_metrics

        # get_pool is replaced by an AsyncMock whose awaited result is None.
        with patch('metrics_db.get_pool', new_callable=AsyncMock, return_value=None):
            metrics = await calculate_metrics()
            assert metrics["connected"] is False

    def test_create_tables_sql_structure(self):
        """Check that the table-initialisation entry point exists.

        NOTE(review): the tables it is expected to create --
        ``rag_search_feedback``, ``rag_search_logs`` and
        ``rag_upload_history`` -- are NOT verified here. Only the presence
        of a callable ``init_metrics_tables`` is asserted.
        """
        from metrics_db import init_metrics_tables

        assert callable(init_metrics_tables)
# =============================================================================
# Integration Tests (require running services)
# =============================================================================
class TestRAGIntegration:
    """Integration tests - require Qdrant, MinIO, PostgreSQL running.

    Every test is skipped unconditionally; the real calls are kept as
    commented-out sketches of what would run against live services.
    """

    @pytest.mark.skip(reason="Requires running Qdrant")
    @pytest.mark.asyncio
    async def test_nibis_search(self):
        """Test NiBiS semantic search."""
        from admin_api import search_nibis
        from admin_api import NiBiSSearchRequest

        search_request = NiBiSSearchRequest(
            query="Gedichtanalyse Expressionismus",
            limit=5,
        )

        # This would require Qdrant running
        # results = await search_nibis(search_request)
        # assert len(results) <= 5

    @pytest.mark.skip(reason="Requires running MinIO")
    @pytest.mark.asyncio
    async def test_minio_upload(self):
        """Test MinIO document upload."""
        from minio_storage import upload_rag_document

        pdf_bytes = b"%PDF-1.4 test content"

        # This would require MinIO running
        # path = await upload_rag_document(
        #     file_data=pdf_bytes,
        #     filename="test.pdf",
        #     bundesland="ni",
        #     use_case="klausur",
        #     year=2024,
        # )
        # assert path is not None

    @pytest.mark.skip(reason="Requires running PostgreSQL")
    @pytest.mark.asyncio
    async def test_metrics_storage(self):
        """Test metrics storage in PostgreSQL."""
        from metrics_db import store_feedback, calculate_metrics

        # This would require PostgreSQL running
        # stored = await store_feedback(
        #     result_id="test-123",
        #     rating=4,
        #     query_text="test query",
        # )
        # assert stored is True
# =============================================================================
# ZIP Handling Tests
# =============================================================================
class TestZIPHandling:
    """Tests for ZIP file extraction."""

    def test_create_test_zip(self):
        """Build an in-memory archive and confirm every member is listed."""
        members = {
            "test1.pdf": b"%PDF-1.4 test content 1",
            "test2.pdf": b"%PDF-1.4 test content 2",
            "subfolder/test3.pdf": b"%PDF-1.4 test content 3",
        }

        buffer = io.BytesIO()
        with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as archive:
            for member_name, payload in members.items():
                archive.writestr(member_name, payload)
        buffer.seek(0)

        # Verify ZIP contents
        with zipfile.ZipFile(buffer, 'r') as archive:
            listed = archive.namelist()
        for member_name in members:
            assert member_name in listed

    def test_filter_macosx_files(self):
        """Test filtering out __MACOSX files from ZIP."""
        buffer = io.BytesIO()
        with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as archive:
            archive.writestr("test.pdf", b"%PDF-1.4 test")
            archive.writestr("__MACOSX/._test.pdf", b"macosx metadata")
        buffer.seek(0)

        with zipfile.ZipFile(buffer, 'r') as archive:
            entries = archive.namelist()

        # Keep PDF members only, skipping anything under the __MACOSX prefix.
        pdfs = []
        for entry in entries:
            if entry.startswith("__MACOSX"):
                continue
            if entry.lower().endswith(".pdf"):
                pdfs.append(entry)

        assert pdfs == ["test.pdf"]
# =============================================================================
# Embedding Tests
# =============================================================================
class TestEmbeddings:
    """Tests for embedding generation."""

    def test_vector_dimensions(self):
        """Test that vector dimensions are configured correctly.

        Every backend must report a positive vector size, so the test can
        no longer pass without asserting anything when ``EMBEDDING_BACKEND``
        is a value other than the two known ones. For the known backends the
        exact dimension is pinned as well.
        """
        from eh_pipeline import get_vector_size, EMBEDDING_BACKEND

        size = get_vector_size()
        assert size > 0

        expected_sizes = {
            "local": 384,    # all-MiniLM-L6-v2
            "openai": 1536,  # text-embedding-3-small
        }
        if EMBEDDING_BACKEND in expected_sizes:
            assert size == expected_sizes[EMBEDDING_BACKEND]

    def test_chunking_config(self):
        """Test chunking configuration."""
        from eh_pipeline import CHUNK_SIZE, CHUNK_OVERLAP

        # Separate asserts keep the failure message specific to one bound.
        assert CHUNK_SIZE > 0
        assert CHUNK_OVERLAP >= 0
        assert CHUNK_OVERLAP < CHUNK_SIZE
# =============================================================================
# Run Tests
# =============================================================================
if __name__ == "__main__":
    # pytest.main returns the run's exit code; pass it on so that running
    # this file directly reports failures to the shell / CI instead of
    # always exiting with 0.
    raise SystemExit(pytest.main([__file__, "-v"]))