Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 41s
CI / test-go-edu-search (push) Successful in 32s
CI / test-python-klausur (push) Failing after 2m41s
CI / test-python-agent-core (push) Successful in 34s
CI / test-nodejs-website (push) Successful in 39s
klausur-service: 183 shims deleted; 26 test files + 8 source files updated. backend-lehrer: 59 shims deleted; main.py + 8 source files updated. All imports now use the new package paths directly. Zero shims remaining in the entire codebase. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
317 lines
9.5 KiB
Python
317 lines
9.5 KiB
Python
"""
|
|
Admin API - NiBiS Ingestion & Search
|
|
|
|
Endpoints for NiBiS data discovery, ingestion, search, and statistics.
|
|
Extracted from admin_api.py for file-size compliance.
|
|
"""
|
|
|
|
from fastapi import APIRouter, HTTPException, BackgroundTasks, Query
|
|
from pydantic import BaseModel
|
|
from typing import Optional, List, Dict
|
|
from datetime import datetime
|
|
|
|
from nibis_ingestion import (
|
|
run_ingestion,
|
|
discover_documents,
|
|
extract_zip_files,
|
|
DOCS_BASE_PATH,
|
|
)
|
|
from qdrant_service import QdrantService, search_nibis_eh, get_qdrant_client
|
|
from korrektur.eh_pipeline import generate_single_embedding
|
|
|
|
# Every endpoint below is mounted under /api/v1/admin and grouped under the
# "Admin" tag in the generated OpenAPI docs.
router = APIRouter(tags=["Admin"], prefix="/api/v1/admin")
|
|
|
|
# Store for background task status
|
|
_ingestion_status: Dict = {
|
|
"running": False,
|
|
"last_run": None,
|
|
"last_result": None,
|
|
}
|
|
|
|
|
|
# =============================================================================
|
|
# Models
|
|
# =============================================================================
|
|
|
|
class IngestionRequest(BaseModel):
    """Request body for ``POST /nibis/ingest``.

    All fields are forwarded unchanged to ``run_ingestion``.
    """
    # Forwarded as ``ewh_only``. The discover endpoint describes the same flag as
    # "Only return Erwartungshorizonte"; presumably the same meaning here — verify.
    ewh_only: bool = True
    # Forwarded as ``year_filter``; None presumably means "no year filter" (verify in run_ingestion).
    year_filter: Optional[int] = None
    # Forwarded as ``subject_filter``; None presumably means "no subject filter" (verify in run_ingestion).
    subject_filter: Optional[str] = None
|
|
|
|
|
class IngestionStatus(BaseModel):
    """Response model for ``GET /nibis/status``."""
    # True while a background ingestion run is in progress.
    running: bool
    # ``datetime.isoformat()`` string of the most recently started run; None if no run yet.
    last_run: Optional[str]
    # Copied from the last run's result dict; None when that key is absent.
    documents_indexed: Optional[int]
    chunks_created: Optional[int]
    # Error messages of the last run (the status endpoint returns at most 10).
    errors: Optional[List[str]]
|
|
|
|
|
|
class NiBiSSearchRequest(BaseModel):
    """Request body for ``POST /nibis/search`` (semantic search)."""
    # Free-text query; it is embedded and the embedding is used for the search.
    query: str
    # Optional filters forwarded to ``search_nibis_eh``.
    year: Optional[int] = None
    subject: Optional[str] = None
    niveau: Optional[str] = None
    # Maximum number of hits requested from ``search_nibis_eh``.
    # NOTE(review): not range-checked here — consider bounding it.
    limit: int = 5
|
|
|
|
|
|
class NiBiSSearchResult(BaseModel):
    """A single hit returned by ``POST /nibis/search``."""
    id: str
    # Score as reported by ``search_nibis_eh`` (presumably a similarity; verify direction).
    score: float
    # Hit text; the search endpoint truncates it to 500 characters.
    text: str
    # Metadata copied from the hit; None when the hit does not carry the key.
    year: Optional[int]
    subject: Optional[str]
    niveau: Optional[str]
    task_number: Optional[int]
|
|
|
|
|
|
class DataSourceStats(BaseModel):
    """Document statistics for a single source directory."""
    # NOTE(review): no endpoint in this module uses this model — verify it is
    # referenced elsewhere, otherwise it may be dead code.
    # Field meanings below are inferred from their names — TODO confirm.
    source_dir: str
    year: int
    document_count: int
    subjects: List[str]
|
|
|
|
|
|
# =============================================================================
|
|
# Endpoints
|
|
# =============================================================================
|
|
|
|
@router.get("/nibis/status", response_model=IngestionStatus)
async def get_ingestion_status():
    """Report whether ingestion is running and summarise the most recent result.

    At most the first 10 error messages of the last run are included.
    """
    previous = _ingestion_status.get("last_result") or {}
    error_list = previous.get("errors") or []

    status = IngestionStatus(
        running=_ingestion_status["running"],
        last_run=_ingestion_status.get("last_run"),
        documents_indexed=previous.get("documents_indexed"),
        chunks_created=previous.get("chunks_created"),
        errors=error_list[:10],
    )
    return status
|
|
|
|
|
|
@router.post("/nibis/extract-zips")
async def extract_zip_files_endpoint():
    """Extract all ZIP files in za-download directories.

    ``extract_zip_files`` is synchronous (it is called without ``await``). It runs
    in FastAPI's worker threadpool so the event loop, and with it every other
    request, is not stalled while archives are extracted.

    Returns:
        dict with ``status``, ``extracted_count`` and the extracted
        ``directories`` as strings.

    Raises:
        HTTPException: 500 carrying the underlying error message on failure.
    """
    # Local import keeps this change self-contained; fastapi is already a dependency.
    from fastapi.concurrency import run_in_threadpool

    try:
        extracted = await run_in_threadpool(extract_zip_files, DOCS_BASE_PATH)
        return {
            "status": "success",
            "extracted_count": len(extracted),
            "directories": [str(d) for d in extracted],
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
|
@router.get("/nibis/discover")
async def discover_nibis_documents(
    ewh_only: bool = Query(True, description="Only return Erwartungshorizonte"),
    year: Optional[int] = Query(None, description="Filter by year"),
    subject: Optional[str] = Query(None, description="Filter by subject"),
):
    """
    Discover available NiBiS documents without indexing.

    Useful for previewing what will be indexed.

    ``discover_documents`` is synchronous (it is called without ``await``), so it
    runs in FastAPI's worker threadpool instead of blocking the event loop.

    Args:
        ewh_only: Forwarded to ``discover_documents``.
        year: If given (and non-zero), keep only documents of that year.
        subject: If given (and non-empty), keep documents whose subject contains
            this text, compared case-insensitively.

    Returns:
        dict with ``total_documents``, per-year counts (sorted by year),
        per-subject counts (most frequent first; ties keep discovery order)
        and up to 20 ``sample_documents``.

    Raises:
        HTTPException: 500 carrying the underlying error message on any failure.
    """
    from collections import Counter

    from fastapi.concurrency import run_in_threadpool

    try:
        documents = await run_in_threadpool(
            discover_documents, DOCS_BASE_PATH, ewh_only=ewh_only
        )

        # Apply filters
        if year:
            documents = [d for d in documents if d.year == year]
        if subject:
            needle = subject.lower()
            documents = [d for d in documents if needle in d.subject.lower()]

        by_year = Counter(d.year for d in documents)
        by_subject = Counter(d.subject for d in documents)

        return {
            "total_documents": len(documents),
            "by_year": dict(sorted(by_year.items())),
            # most_common() is a stable descending sort by count.
            "by_subject": dict(by_subject.most_common()),
            "sample_documents": [
                {
                    "id": d.id,
                    "filename": d.raw_filename,
                    "year": d.year,
                    "subject": d.subject,
                    "niveau": d.niveau,
                    "doc_type": d.doc_type,
                }
                for d in documents[:20]
            ],
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
|
@router.post("/nibis/ingest")
async def start_ingestion(
    request: IngestionRequest,
    background_tasks: BackgroundTasks,
):
    """
    Start NiBiS data ingestion in background.

    The ``running`` flag is claimed here in the handler, not inside the
    background task. FastAPI only starts background tasks after the response
    has been sent, so setting the flag there left a window in which a second
    request could pass the 409 check and start a concurrent ingestion. The
    check and the assignment below have no ``await`` between them, so no other
    request on the event loop can interleave.

    Raises:
        HTTPException: 409 if an ingestion run is already in progress.
    """
    if _ingestion_status["running"]:
        raise HTTPException(
            status_code=409,
            detail="Ingestion already running. Check /nibis/status for progress."
        )

    # Claim the single ingestion slot before returning.
    # NOTE(review): if the background task were never executed, the flag would
    # stay set until restart; that trade-off avoids duplicate concurrent runs.
    _ingestion_status["running"] = True
    _ingestion_status["last_run"] = datetime.now().isoformat()

    async def run_ingestion_task():
        # The shared dict is mutated in place, so no ``global`` statement is needed.
        try:
            result = await run_ingestion(
                ewh_only=request.ewh_only,
                dry_run=False,
                year_filter=request.year_filter,
                subject_filter=request.subject_filter,
            )
            _ingestion_status["last_result"] = result
        except Exception as e:
            # Provide an "errors" list so /nibis/status can surface the failure.
            _ingestion_status["last_result"] = {"error": str(e), "errors": [str(e)]}
        finally:
            # Always release the slot, whether the run succeeded or failed.
            _ingestion_status["running"] = False

    background_tasks.add_task(run_ingestion_task)

    return {
        "status": "started",
        "message": "Ingestion started in background. Check /nibis/status for progress.",
        "filters": {
            "ewh_only": request.ewh_only,
            "year": request.year_filter,
            "subject": request.subject_filter,
        },
    }
|
|
|
|
|
|
@router.post("/nibis/search", response_model=List[NiBiSSearchResult])
async def search_nibis(request: NiBiSSearchRequest):
    """
    Semantic search in NiBiS Erwartungshorizonte.

    Embeds ``request.query`` and passes the embedding plus the optional
    year/subject/niveau filters and ``limit`` to ``search_nibis_eh``.
    Each hit's text is truncated to 500 characters.

    Raises:
        HTTPException: 500 if no embedding could be generated or the search fails.
    """
    try:
        query_embedding = await generate_single_embedding(request.query)

        if not query_embedding:
            raise HTTPException(status_code=500, detail="Failed to generate embedding")

        results = await search_nibis_eh(
            query_embedding=query_embedding,
            year=request.year,
            subject=request.subject,
            niveau=request.niveau,
            limit=request.limit,
        )

        return [
            NiBiSSearchResult(
                # Point IDs may not be strings (e.g. integer IDs); the model expects str.
                id=str(r["id"]),
                score=r["score"],
                # Guard against a missing *or* null text payload before truncating.
                text=(r.get("text") or "")[:500],
                year=r.get("year"),
                subject=r.get("subject"),
                niveau=r.get("niveau"),
                task_number=r.get("task_number"),
            )
            for r in results
        ]
    except HTTPException:
        # Already carries the intended status and detail; re-wrapping it would
        # turn the detail into "500: Failed to generate embedding".
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
|
@router.get("/nibis/collections")
async def get_collections_info():
    """List every Qdrant collection with its vector/point counts and status.

    If describing one collection fails, that collection is reported with an
    ``error`` entry instead of failing the whole request.

    Raises:
        HTTPException: 500 if the client or the collection listing fails.
    """
    try:
        qdrant_client = get_qdrant_client()
        summaries = []

        for collection in qdrant_client.get_collections().collections:
            try:
                details = qdrant_client.get_collection(collection.name)
                entry = {
                    "name": collection.name,
                    "vectors_count": details.vectors_count,
                    "points_count": details.points_count,
                    "status": details.status.value,
                }
            except Exception as exc:
                entry = {
                    "name": collection.name,
                    "error": str(exc),
                }
            summaries.append(entry)

        return {"collections": summaries}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
def _collect_nibis_facets(client, collection_name):
    """Return the distinct ``year``, ``subject`` and ``niveau`` payload values of a collection.

    Scrolls through *all* points page by page, following the next-page offset
    returned by ``client.scroll`` until it is None. Missing keys and ``None``
    values are skipped so the resulting sets stay sortable.

    Returns:
        Tuple ``(years, subjects, niveaus)`` of sets.
    """
    facets = {"year": set(), "subject": set(), "niveau": set()}
    offset = None
    while True:
        points, offset = client.scroll(
            collection_name=collection_name,
            limit=1000,
            offset=offset,
            # Fetch only the payload keys aggregated here, not the whole payload.
            with_payload=list(facets),
            with_vectors=False,
        )
        for point in points:
            payload = point.payload or {}
            for key, values in facets.items():
                value = payload.get(key)
                if value is not None:
                    values.add(value)
        if offset is None:
            break
    return facets["year"], facets["subject"], facets["niveau"]


@router.get("/nibis/stats")
async def get_nibis_stats():
    """Get detailed statistics about indexed NiBiS data.

    Returns:
        ``{"indexed": False, "message": ...}`` when ``get_stats`` reports an
        error, which is treated as the collection not existing yet;
        ``{"indexed": False, "error": ...}`` on any other failure; otherwise
        the total chunk count plus the sorted distinct years, subjects and
        niveaus across the whole ``bp_nibis_eh`` collection.

    The Qdrant client from ``get_qdrant_client`` is used synchronously, so the
    full scan runs in FastAPI's worker threadpool rather than on the event loop.
    """
    from fastapi.concurrency import run_in_threadpool

    try:
        qdrant = QdrantService()
        stats = await qdrant.get_stats("bp_nibis_eh")

        if "error" in stats:
            return {
                "indexed": False,
                "message": "NiBiS collection not yet created. Run ingestion first.",
            }

        client = get_qdrant_client()
        years, subjects, niveaus = await run_in_threadpool(
            _collect_nibis_facets, client, "bp_nibis_eh"
        )

        return {
            "indexed": True,
            "total_chunks": stats.get("points_count", 0),
            "years": sorted(years),
            "subjects": sorted(subjects),
            "niveaus": sorted(niveaus),
        }
    except Exception as e:
        return {
            "indexed": False,
            "error": str(e),
        }
|
|
|
|
|
|
@router.delete("/nibis/collection")
async def delete_nibis_collection():
    """Drop the ``bp_nibis_eh`` Qdrant collection. WARNING: all indexed NiBiS data is lost!

    Raises:
        HTTPException: 500 carrying the underlying error message if deletion fails.
    """
    collection_name = "bp_nibis_eh"
    try:
        get_qdrant_client().delete_collection(collection_name)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {"status": "deleted", "collection": collection_name}
|