"""
Admin API for NiBiS Data Management
Endpoints for ingestion, monitoring, and data management.
"""

from fastapi import APIRouter, HTTPException, BackgroundTasks, Query, UploadFile, File, Form
from pydantic import BaseModel
from typing import Optional, List, Dict
from datetime import datetime
from pathlib import Path
import asyncio
import zipfile
import shutil
import tempfile
import os

from nibis_ingestion import (
    run_ingestion,
    discover_documents,
    extract_zip_files,
    DOCS_BASE_PATH,
    NiBiSDocument,
)
from qdrant_service import QdrantService, search_nibis_eh, get_qdrant_client
from eh_pipeline import generate_single_embedding

# Optional: MinIO and PostgreSQL integrations
try:
    from minio_storage import upload_rag_document, get_storage_stats, init_minio_bucket
    MINIO_AVAILABLE = True
except ImportError:
    MINIO_AVAILABLE = False

try:
    from metrics_db import (
        init_metrics_tables, store_feedback, log_search, log_upload,
        calculate_metrics, get_recent_feedback, get_upload_history
    )
    METRICS_DB_AVAILABLE = True
except ImportError:
    METRICS_DB_AVAILABLE = False

router = APIRouter(prefix="/api/v1/admin", tags=["Admin"])

# Store for background task status
_ingestion_status: Dict = {
    "running": False,
    "last_run": None,
    "last_result": None,
}


# =============================================================================
# Models
# =============================================================================

class IngestionRequest(BaseModel):
    """Request body for POST /nibis/ingest; every field is forwarded to run_ingestion."""

    ewh_only: bool = True
    year_filter: Optional[int] = None
    subject_filter: Optional[str] = None


class IngestionStatus(BaseModel):
    """Response body for GET /nibis/status."""

    running: bool
    last_run: Optional[str]
    documents_indexed: Optional[int]
    chunks_created: Optional[int]
    errors: Optional[List[str]]


class NiBiSSearchRequest(BaseModel):
    """Request body for POST /nibis/search."""

    query: str
    year: Optional[int] = None
    subject: Optional[str] = None
    niveau: Optional[str] = None
    limit: int = 5


class NiBiSSearchResult(BaseModel):
    """One hit returned by POST /nibis/search."""

    id: str
    score: float
    text: str
    year: Optional[int]
    subject: Optional[str]
    niveau: Optional[str]
    task_number: Optional[int]


class DataSourceStats(BaseModel):
    """Per-directory document statistics (not referenced by the endpoints in this part of the file)."""

    source_dir: str
    year: int
    document_count: int
    subjects: List[str]


# =============================================================================
# Helpers
# =============================================================================

async def _run_ingestion_job(**ingestion_kwargs) -> None:
    """
    Run one NiBiS ingestion and record its outcome in ``_ingestion_status``.

    Shared by the /nibis/ingest endpoint and the auto-ingest path of
    /rag/upload. ``ingestion_kwargs`` are passed through to ``run_ingestion``
    unchanged (``dry_run=False`` is always added). Exceptions are not
    propagated; they are stored under ``last_result`` so /nibis/status can
    report them.
    """
    _ingestion_status["running"] = True
    _ingestion_status["last_run"] = datetime.now().isoformat()
    try:
        _ingestion_status["last_result"] = await run_ingestion(
            dry_run=False, **ingestion_kwargs
        )
    except Exception as e:
        _ingestion_status["last_result"] = {"error": str(e), "errors": [str(e)]}
    finally:
        # Always clear the flag, otherwise every later request would get 409.
        _ingestion_status["running"] = False


def _extract_pdfs_from_zip(zip_path: str, target_dir: Path) -> int:
    """
    Copy every PDF member of the archive at ``zip_path`` into ``target_dir``.

    Directory structure inside the archive is discarded (only the member's
    base name is used), and members under ``__MACOSX`` are skipped. Members
    sharing a base name overwrite each other.

    Returns:
        Number of PDF members written.

    Raises:
        zipfile.BadZipFile: if the file is not a valid ZIP archive.
    """
    extracted = 0
    with zipfile.ZipFile(zip_path, "r") as zf:
        for member in zf.namelist():
            if not member.lower().endswith(".pdf") or member.startswith("__MACOSX"):
                continue
            # Base name only: keeps archive paths such as "../x.pdf" inside target_dir.
            pdf_name = Path(member).name
            if not pdf_name:
                continue
            # Stream the member instead of loading it fully into memory.
            with zf.open(member) as src, open(target_dir / pdf_name, "wb") as dst:
                shutil.copyfileobj(src, dst)
            extracted += 1
    return extracted


# =============================================================================
# Endpoints
# =============================================================================

@router.get("/nibis/status", response_model=IngestionStatus)
async def get_ingestion_status():
    """Get status of NiBiS ingestion pipeline (at most the first 10 errors are returned)."""
    last_result = _ingestion_status.get("last_result") or {}
    return IngestionStatus(
        running=_ingestion_status["running"],
        last_run=_ingestion_status.get("last_run"),
        documents_indexed=last_result.get("documents_indexed"),
        chunks_created=last_result.get("chunks_created"),
        errors=(last_result.get("errors") or [])[:10],
    )


@router.post("/nibis/extract-zips")
async def extract_zip_files_endpoint():
    """Extract all ZIP files in za-download directories."""
    try:
        extracted = extract_zip_files(DOCS_BASE_PATH)
        return {
            "status": "success",
            "extracted_count": len(extracted),
            "directories": [str(d) for d in extracted],
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/nibis/discover")
async def discover_nibis_documents(
    ewh_only: bool = Query(True, description="Only return Erwartungshorizonte"),
    year: Optional[int] = Query(None, description="Filter by year"),
    subject: Optional[str] = Query(None, description="Filter by subject"),
):
    """
    Discover available NiBiS documents without indexing.
    Useful for previewing what will be indexed.

    Returns the total count, counts grouped by year (ascending) and by
    subject (most frequent first), and up to 20 sample documents.
    ``subject`` is matched as a case-insensitive substring.
    """
    try:
        documents = discover_documents(DOCS_BASE_PATH, ewh_only=ewh_only)

        # Apply filters
        if year:
            documents = [d for d in documents if d.year == year]
        if subject:
            documents = [d for d in documents if subject.lower() in d.subject.lower()]

        # Group by year and subject
        by_year: Dict[int, int] = {}
        by_subject: Dict[str, int] = {}
        for doc in documents:
            by_year[doc.year] = by_year.get(doc.year, 0) + 1
            by_subject[doc.subject] = by_subject.get(doc.subject, 0) + 1

        return {
            "total_documents": len(documents),
            "by_year": dict(sorted(by_year.items())),
            "by_subject": dict(sorted(by_subject.items(), key=lambda x: -x[1])),
            "sample_documents": [
                {
                    "id": d.id,
                    "filename": d.raw_filename,
                    "year": d.year,
                    "subject": d.subject,
                    "niveau": d.niveau,
                    "doc_type": d.doc_type,
                }
                for d in documents[:20]
            ],
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/nibis/ingest")
async def start_ingestion(
    request: IngestionRequest,
    background_tasks: BackgroundTasks,
):
    """
    Start NiBiS data ingestion in background.
    This will:
    1. Extract any ZIP files
    2. Discover all Erwartungshorizonte
    3. Extract text from PDFs
    4. Generate embeddings
    5. Index in Qdrant

    Raises:
        HTTPException(409): if an ingestion is already flagged as running.
    """
    # NOTE(review): the flag is only set once the background task starts, so
    # two requests arriving back-to-back can both pass this check.
    if _ingestion_status["running"]:
        raise HTTPException(
            status_code=409,
            detail="Ingestion already running. Check /nibis/status for progress."
        )

    background_tasks.add_task(
        _run_ingestion_job,
        ewh_only=request.ewh_only,
        year_filter=request.year_filter,
        subject_filter=request.subject_filter,
    )

    return {
        "status": "started",
        "message": "Ingestion started in background. Check /nibis/status for progress.",
        "filters": {
            "ewh_only": request.ewh_only,
            "year": request.year_filter,
            "subject": request.subject_filter,
        },
    }


@router.post("/nibis/search", response_model=List[NiBiSSearchResult])
async def search_nibis(request: NiBiSSearchRequest):
    """
    Semantic search in NiBiS Erwartungshorizonte.
    Returns relevant chunks based on query.

    Result text is truncated to 500 characters.

    Raises:
        HTTPException(500): if no embedding could be generated or the search fails.
    """
    try:
        # Generate query embedding
        query_embedding = await generate_single_embedding(request.query)
        if not query_embedding:
            raise HTTPException(status_code=500, detail="Failed to generate embedding")

        # Search
        results = await search_nibis_eh(
            query_embedding=query_embedding,
            year=request.year,
            subject=request.subject,
            niveau=request.niveau,
            limit=request.limit,
        )

        return [
            NiBiSSearchResult(
                id=r["id"],
                score=r["score"],
                text=r.get("text", "")[:500],  # Truncate for response
                year=r.get("year"),
                subject=r.get("subject"),
                niveau=r.get("niveau"),
                task_number=r.get("task_number"),
            )
            for r in results
        ]
    except HTTPException:
        # Pass our own HTTP errors through untouched instead of re-wrapping
        # them in the generic handler below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/nibis/collections")
async def get_collections_info():
    """
    Get information about all Qdrant collections.

    A collection whose details cannot be fetched is still listed, with an
    "error" entry in place of its counts.
    """
    try:
        client = get_qdrant_client()
        collections = client.get_collections().collections

        result = []
        for c in collections:
            try:
                info = client.get_collection(c.name)
                result.append({
                    "name": c.name,
                    "vectors_count": info.vectors_count,
                    "points_count": info.points_count,
                    "status": info.status.value,
                })
            except Exception as e:
                result.append({
                    "name": c.name,
                    "error": str(e),
                })

        return {"collections": result}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/nibis/stats")
async def get_nibis_stats():
    """
    Get detailed statistics about indexed NiBiS data.

    Years, subjects and niveaus are collected from a single scroll request
    limited to 1000 points, so they describe a sample rather than the whole
    collection. Failures are reported in the response body
    (``indexed: False``) instead of as an HTTP error.
    """
    try:
        qdrant = QdrantService()
        stats = await qdrant.get_stats("bp_nibis_eh")

        if "error" in stats:
            return {
                "indexed": False,
                "message": "NiBiS collection not yet created. Run ingestion first.",
            }

        # Get sample data to show coverage
        client = get_qdrant_client()

        # Scroll to get unique years/subjects
        scroll_result = client.scroll(
            collection_name="bp_nibis_eh",
            limit=1000,
            with_payload=True,
            with_vectors=False,
        )

        years = set()
        subjects = set()
        niveaus = set()

        # scroll_result[0] holds the returned points.
        for point in scroll_result[0]:
            if point.payload:
                if "year" in point.payload:
                    years.add(point.payload["year"])
                if "subject" in point.payload:
                    subjects.add(point.payload["subject"])
                if "niveau" in point.payload:
                    niveaus.add(point.payload["niveau"])

        return {
            "indexed": True,
            "total_chunks": stats.get("points_count", 0),
            "years": sorted(years),
            "subjects": sorted(subjects),
            "niveaus": sorted(niveaus),
        }
    except Exception as e:
        return {
            "indexed": False,
            "error": str(e),
        }


@router.delete("/nibis/collection")
async def delete_nibis_collection():
    """
    Delete the entire NiBiS collection.
    WARNING: This will remove all indexed data!
    """
    try:
        client = get_qdrant_client()
        client.delete_collection("bp_nibis_eh")
        return {"status": "deleted", "collection": "bp_nibis_eh"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# =============================================================================
# RAG Upload API - ZIP and PDF Upload Support
# =============================================================================

# Upload directory configuration
RAG_UPLOAD_BASE = Path(os.getenv("RAG_UPLOAD_BASE", str(DOCS_BASE_PATH)))

# Store for upload tracking
_upload_history: List[Dict] = []


class UploadResult(BaseModel):
    """Response body for POST /rag/upload."""

    status: str
    files_received: int
    pdfs_extracted: int
    target_directory: str
    errors: List[str]


@router.post("/rag/upload", response_model=UploadResult)
async def upload_rag_documents(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    collection: str = Form(default="bp_nibis_eh"),
    year: Optional[int] = Form(default=None),
    auto_ingest: bool = Form(default=False),
):
    """
    Upload documents for RAG indexing.
    Supports:
    - ZIP archives (automatically extracted)
    - Individual PDF files

    Files are stored in the NiBiS directory structure for ingestion.

    The target directory is ``<RAG_UPLOAD_BASE>/za-download/<year>``, with
    ``year`` defaulting to the current year. When ``auto_ingest`` is set and
    no ingestion is flagged as running, an ingestion filtered to that year is
    scheduled in the background.

    Raises:
        HTTPException(400): unsupported file type or invalid ZIP archive.
        HTTPException(500): any other failure while storing the upload.
    """
    errors = []
    pdfs_extracted = 0

    # Determine target year
    target_year = year or datetime.now().year

    # Target directory: za-download/YYYY/
    target_dir = RAG_UPLOAD_BASE / "za-download" / str(target_year)
    target_dir.mkdir(parents=True, exist_ok=True)

    try:
        filename = file.filename or "upload"
        lowered = filename.lower()

        if lowered.endswith(".zip"):
            # Spool the upload to disk so zipfile can seek in it.
            tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".zip")
            tmp_path = tmp.name
            try:
                with tmp:
                    tmp.write(await file.read())
                pdfs_extracted = _extract_pdfs_from_zip(tmp_path, target_dir)
            except zipfile.BadZipFile:
                # A corrupt archive is a client error, not a server fault.
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid ZIP archive: {filename}"
                )
            finally:
                # Runs even if reading the upload failed, so no temp file leaks.
                os.unlink(tmp_path)

        elif lowered.endswith(".pdf"):
            # The filename is client-controlled: keep only its base name so a
            # value such as "../../x.pdf" cannot escape target_dir.
            safe_name = Path(filename.replace("\\", "/")).name
            content = await file.read()
            with open(target_dir / safe_name, "wb") as f:
                f.write(content)
            pdfs_extracted = 1

        else:
            raise HTTPException(
                status_code=400,
                detail=f"Unsupported file type: {filename}. Only .zip and .pdf are allowed."
            )

        # Track upload in memory
        upload_record = {
            "timestamp": datetime.now().isoformat(),
            "filename": filename,
            "collection": collection,
            "year": target_year,
            "pdfs_extracted": pdfs_extracted,
            "target_directory": str(target_dir),
        }
        _upload_history.append(upload_record)

        # Keep only last 100 uploads in memory (no-op while the list is shorter).
        del _upload_history[:-100]

        # Store in PostgreSQL if available
        if METRICS_DB_AVAILABLE:
            await log_upload(
                filename=filename,
                collection_name=collection,
                year=target_year,
                pdfs_extracted=pdfs_extracted,
                minio_path=str(target_dir),
            )

        # Auto-ingest if requested; silently skipped while another run is active.
        if auto_ingest and not _ingestion_status["running"]:
            background_tasks.add_task(
                _run_ingestion_job,
                ewh_only=True,
                year_filter=target_year,
            )

        return UploadResult(
            status="success",
            files_received=1,
            pdfs_extracted=pdfs_extracted,
            target_directory=str(target_dir),
            errors=errors,
        )

    except HTTPException:
        raise
    except Exception as e:
        errors.append(str(e))
        raise HTTPException(status_code=500, detail=str(e))


# NOTE: this definition rebinds the module-level name `get_upload_history`,
# replacing the function of the same name imported from metrics_db above.
@router.get("/rag/upload/history")
async def get_upload_history(limit: int = Query(default=20, le=100)):
    """
    Get recent upload history (in-memory records only), most recent first.

    A ``limit`` of zero or less yields an empty list.
    """
    # Guard needed: `lst[-0:]` would return the whole list, and a negative
    # limit would slice from the front.
    recent = _upload_history[-limit:] if limit > 0 else []
    return {
        "uploads": recent[::-1],  # Most recent first
        "total": len(_upload_history),
    }


@router.get("/rag/metrics")
async def get_rag_metrics(
    collection: Optional[str] = Query(default=None),
    days: int = Query(default=7, le=90),
):
    """
    Get RAG quality metrics.
    Uses PostgreSQL for real metrics if available, otherwise returns defaults.

    The fallback values are hard-coded placeholders, marked with
    ``connected: False`` and a ``note``.
    """
    if METRICS_DB_AVAILABLE:
        metrics = await calculate_metrics(collection_name=collection, days=days)
        if metrics.get("connected"):
            return metrics

    # Fallback: Return placeholder metrics
    return {
        "precision_at_5": 0.78,
        "recall_at_10": 0.85,
        "mrr": 0.72,
        "avg_latency_ms": 52,
        "total_ratings": len(_upload_history),
        "error_rate": 0.3,
        "score_distribution": {
            "0.9+": 23,
            "0.7-0.9": 41,
            "0.5-0.7": 28,
            "<0.5": 8,
        },
        "note": "Placeholder metrics - PostgreSQL not connected",
        "connected": False,
    }


@router.post("/rag/search/feedback")
async def submit_search_feedback(
    result_id: str = Form(...),
    rating: int = Form(..., ge=1, le=5),
    notes: Optional[str] = Form(default=None),
    query: Optional[str] = Form(default=None),
    collection: Optional[str] = Form(default=None),
    score: Optional[float] = Form(default=None),
):
    """
    Submit feedback for a search result.
    Used for quality tracking and metrics.

    The feedback is handed to ``store_feedback`` only when the metrics
    database module is available; ``persisted`` in the response carries that
    call's result (False when the module is unavailable).
    """
    feedback_record = {
        "timestamp": datetime.now().isoformat(),
        "result_id": result_id,
        "rating": rating,
        "notes": notes,
    }

    stored = False
    if METRICS_DB_AVAILABLE:
        stored = await store_feedback(
            result_id=result_id,
            rating=rating,
            query_text=query,
            collection_name=collection,
            score=score,
            notes=notes,
        )

    return {
        "status": "stored" if stored else "received",
        "feedback": feedback_record,
        "persisted": stored,
    }


@router.get("/rag/storage/stats")
async def get_storage_statistics():
    """Get MinIO storage statistics, or an error payload when MinIO is unavailable."""
    if MINIO_AVAILABLE:
        stats = await get_storage_stats()
        return stats
    return {
        "error": "MinIO not available",
        "connected": False,
    }


@router.post("/rag/init")
async def initialize_rag_services():
    """
    Initialize RAG services (MinIO bucket, PostgreSQL tables).

    Services whose integration module could not be imported are reported as
    False.
    """
    results = {
        "minio": False,
        "postgres": False,
    }

    if MINIO_AVAILABLE:
        results["minio"] = await init_minio_bucket()

    if METRICS_DB_AVAILABLE:
        results["postgres"] = await init_metrics_tables()

    return {
        "status": "initialized",
        "services": results,
    }


# =============================================================================
# Legal Templates API - Document Generator Support
# =============================================================================

# Import legal templates modules
try:
    from legal_templates_ingestion import (
        LegalTemplatesIngestion,
        LEGAL_TEMPLATES_COLLECTION,
    )
    from template_sources import (
        TEMPLATE_SOURCES,
        TEMPLATE_TYPES,
        JURISDICTIONS,
        LicenseType,
        get_enabled_sources,
        get_sources_by_priority,
    )
    from qdrant_service import (
        search_legal_templates,
        get_legal_templates_stats,
        init_legal_templates_collection,
    )
    LEGAL_TEMPLATES_AVAILABLE = True
except ImportError as e:
    print(f"Legal templates module not available: {e}")
    LEGAL_TEMPLATES_AVAILABLE = False

# Store for templates ingestion status
_templates_ingestion_status: Dict = {
    "running": False,
    "last_run": None,
    "current_source": None,
    "results": {},
}


class TemplatesSearchRequest(BaseModel):
    """Request body for POST /templates/search."""

    query: str
    template_type: Optional[str] = None
    license_types: Optional[List[str]] = None
    language: Optional[str] = None
    jurisdiction: Optional[str] = None
    attribution_required: Optional[bool] = None
    limit: int = 10


class TemplatesSearchResult(BaseModel):
    """One hit returned by POST /templates/search."""

    id: str
    score: float
    text: str
    document_title: Optional[str]
    template_type: Optional[str]
    clause_category: Optional[str]
    language: Optional[str]
    jurisdiction: Optional[str]
    license_id: Optional[str]
    license_name: Optional[str]
    attribution_required: Optional[bool]
    attribution_text: Optional[str]
    source_name: Optional[str]
    source_url: Optional[str]
    placeholders: Optional[List[str]]
    is_complete_document: Optional[bool]
    requires_customization: Optional[bool]


class SourceIngestRequest(BaseModel):
    """Request body for POST /templates/ingest-source."""

    source_name: str


def _summarize_source_status(status) -> Dict:
    """Build the per-source result entry stored in the ingestion status (at most 5 errors kept)."""
    return {
        "status": status.status,
        "documents_found": status.documents_found,
        "chunks_indexed": status.chunks_indexed,
        "errors": status.errors[:5] if status.errors else [],
    }


@router.get("/templates/status")
async def get_templates_status():
    """
    Get status of legal templates collection and ingestion.

    Problems are reported in the response body (``error`` key) rather than
    as an HTTP error.
    """
    if not LEGAL_TEMPLATES_AVAILABLE:
        return {
            "available": False,
            "error": "Legal templates module not available",
        }

    try:
        stats = await get_legal_templates_stats()

        return {
            "available": True,
            "collection": LEGAL_TEMPLATES_COLLECTION,
            "ingestion": {
                "running": _templates_ingestion_status["running"],
                "last_run": _templates_ingestion_status.get("last_run"),
                "current_source": _templates_ingestion_status.get("current_source"),
                "results": _templates_ingestion_status.get("results", {}),
            },
            "stats": stats,
        }
    except Exception as e:
        return {
            "available": True,
            "error": str(e),
            "ingestion": _templates_ingestion_status,
        }


@router.get("/templates/sources")
async def get_templates_sources():
    """
    Get list of all template sources with their configuration.

    Raises:
        HTTPException(503): if the legal templates module is not available.
    """
    if not LEGAL_TEMPLATES_AVAILABLE:
        raise HTTPException(status_code=503, detail="Legal templates module not available")

    sources = [
        {
            "name": source.name,
            "description": source.description,
            "license_type": source.license_type.value,
            "license_name": source.license_info.name,
            "template_types": source.template_types,
            "languages": source.languages,
            "jurisdiction": source.jurisdiction,
            "repo_url": source.repo_url,
            "web_url": source.web_url,
            "priority": source.priority,
            "enabled": source.enabled,
            "attribution_required": source.license_info.attribution_required,
        }
        for source in TEMPLATE_SOURCES
    ]

    return {
        "sources": sources,
        "total": len(sources),
        "enabled": sum(1 for s in TEMPLATE_SOURCES if s.enabled),
        "template_types": TEMPLATE_TYPES,
        "jurisdictions": JURISDICTIONS,
    }


@router.get("/templates/licenses")
async def get_templates_licenses():
    """
    Get license statistics for indexed templates.

    Raises:
        HTTPException(503): if the legal templates module is not available.
        HTTPException(500): if the statistics cannot be fetched.
    """
    if not LEGAL_TEMPLATES_AVAILABLE:
        raise HTTPException(status_code=503, detail="Legal templates module not available")

    try:
        stats = await get_legal_templates_stats()
        return {
            "licenses": stats.get("licenses", {}),
            "total_chunks": stats.get("points_count", 0),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/templates/ingest")
async def start_templates_ingestion(
    background_tasks: BackgroundTasks,
    max_priority: int = Query(default=3, ge=1, le=5, description="Maximum priority level (1=highest)"),
):
    """
    Start legal templates ingestion in background.
    Ingests all enabled sources up to the specified priority level.

    Priority levels:
    - 1: CC0 sources (github-site-policy, opr-vc, etc.)
    - 2: MIT sources (webflorist, tempest, etc.)
    - 3: CC BY 4.0 sources (common-paper, etc.)
    - 4: Public domain/Unlicense (bundestag-gesetze)
    - 5: Reuse notice sources

    Raises:
        HTTPException(503): if the legal templates module is not available.
        HTTPException(409): if a templates ingestion is already flagged as running.
    """
    if not LEGAL_TEMPLATES_AVAILABLE:
        raise HTTPException(status_code=503, detail="Legal templates module not available")

    if _templates_ingestion_status["running"]:
        raise HTTPException(
            status_code=409,
            detail="Templates ingestion already running. Check /templates/status for progress."
        )

    # Resolve the source list once so the response and the background task
    # are guaranteed to agree on what is being ingested.
    sources = get_sources_by_priority(max_priority)

    async def run_templates_ingestion():
        _templates_ingestion_status["running"] = True
        _templates_ingestion_status["last_run"] = datetime.now().isoformat()
        _templates_ingestion_status["results"] = {}

        try:
            ingestion = LegalTemplatesIngestion()
            try:
                for source in sources:
                    _templates_ingestion_status["current_source"] = source.name

                    # A failing source is recorded and the loop moves on.
                    try:
                        status = await ingestion.ingest_source(source)
                        _templates_ingestion_status["results"][source.name] = (
                            _summarize_source_status(status)
                        )
                    except Exception as e:
                        _templates_ingestion_status["results"][source.name] = {
                            "status": "failed",
                            "error": str(e),
                        }
            finally:
                # Close even when the loop aborts, so the ingestion object
                # is never left open.
                await ingestion.close()

        except Exception as e:
            _templates_ingestion_status["results"]["_global_error"] = str(e)
        finally:
            _templates_ingestion_status["running"] = False
            _templates_ingestion_status["current_source"] = None

    background_tasks.add_task(run_templates_ingestion)

    return {
        "status": "started",
        "message": f"Ingesting {len(sources)} sources up to priority {max_priority}",
        "sources": [s.name for s in sources],
    }


@router.post("/templates/ingest-source")
async def ingest_single_source(
    request: SourceIngestRequest,
    background_tasks: BackgroundTasks,
):
    """
    Ingest a single template source by name.

    Raises:
        HTTPException(503): if the legal templates module is not available.
        HTTPException(404): if no source has the requested name.
        HTTPException(400): if the source is disabled.
        HTTPException(409): if a templates ingestion is already flagged as running.
    """
    if not LEGAL_TEMPLATES_AVAILABLE:
        raise HTTPException(status_code=503, detail="Legal templates module not available")

    source = next((s for s in TEMPLATE_SOURCES if s.name == request.source_name), None)
    if not source:
        raise HTTPException(
            status_code=404,
            detail=f"Source not found: {request.source_name}. Use /templates/sources to list available sources."
        )

    if not source.enabled:
        raise HTTPException(
            status_code=400,
            detail=f"Source is disabled: {request.source_name}"
        )

    if _templates_ingestion_status["running"]:
        raise HTTPException(
            status_code=409,
            detail="Templates ingestion already running."
        )

    async def run_single_ingestion():
        _templates_ingestion_status["running"] = True
        _templates_ingestion_status["current_source"] = source.name
        _templates_ingestion_status["last_run"] = datetime.now().isoformat()

        try:
            ingestion = LegalTemplatesIngestion()
            try:
                status = await ingestion.ingest_source(source)
                _templates_ingestion_status["results"][source.name] = (
                    _summarize_source_status(status)
                )
            finally:
                # Close even when ingest_source raised.
                await ingestion.close()

        except Exception as e:
            _templates_ingestion_status["results"][source.name] = {
                "status": "failed",
                "error": str(e),
            }
        finally:
            _templates_ingestion_status["running"] = False
            _templates_ingestion_status["current_source"] = None

    background_tasks.add_task(run_single_ingestion)

    return {
        "status": "started",
        "source": source.name,
        "license": source.license_type.value,
        "template_types": source.template_types,
    }


@router.post("/templates/search", response_model=List[TemplatesSearchResult])
async def search_templates(request: TemplatesSearchRequest):
    """
    Semantic search in legal templates collection.
    Returns relevant template chunks with license and attribution info.

    Result text is truncated to 1000 characters.

    Raises:
        HTTPException(503): if the legal templates module is not available.
        HTTPException(500): if no embedding could be generated or the search fails.
    """
    if not LEGAL_TEMPLATES_AVAILABLE:
        raise HTTPException(status_code=503, detail="Legal templates module not available")

    try:
        # Generate query embedding
        query_embedding = await generate_single_embedding(request.query)

        if not query_embedding:
            raise HTTPException(status_code=500, detail="Failed to generate embedding")

        # Search
        results = await search_legal_templates(
            query_embedding=query_embedding,
            template_type=request.template_type,
            license_types=request.license_types,
            language=request.language,
            jurisdiction=request.jurisdiction,
            attribution_required=request.attribution_required,
            limit=request.limit,
        )

        return [
            TemplatesSearchResult(
                id=r["id"],
                score=r["score"],
                text=r.get("text", "")[:1000],  # Truncate for response
                document_title=r.get("document_title"),
                template_type=r.get("template_type"),
                clause_category=r.get("clause_category"),
                language=r.get("language"),
                jurisdiction=r.get("jurisdiction"),
                license_id=r.get("license_id"),
                license_name=r.get("license_name"),
                attribution_required=r.get("attribution_required"),
                attribution_text=r.get("attribution_text"),
                source_name=r.get("source_name"),
                source_url=r.get("source_url"),
                placeholders=r.get("placeholders"),
                is_complete_document=r.get("is_complete_document"),
                requires_customization=r.get("requires_customization"),
            )
            for r in results
        ]
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@router.delete("/templates/reset")
async def reset_templates_collection():
    """
    Delete and recreate the legal templates collection.
    WARNING: This will remove all indexed templates!

    Raises:
        HTTPException(503): if the legal templates module is not available.
        HTTPException(409): if a templates ingestion is currently running.
        HTTPException(500): if the reset fails.
    """
    if not LEGAL_TEMPLATES_AVAILABLE:
        raise HTTPException(status_code=503, detail="Legal templates module not available")

    if _templates_ingestion_status["running"]:
        raise HTTPException(
            status_code=409,
            detail="Cannot reset while ingestion is running"
        )

    try:
        ingestion = LegalTemplatesIngestion()
        try:
            ingestion.reset_collection()
        finally:
            # Close even when the reset itself failed.
            await ingestion.close()

        # Clear ingestion status
        _templates_ingestion_status["results"] = {}

        return {
            "status": "reset",
            "collection": LEGAL_TEMPLATES_COLLECTION,
            "message": "Collection deleted and recreated. Run ingestion to populate.",
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@router.delete("/templates/source/{source_name}")
async def delete_templates_source(source_name: str):
    """
    Delete all templates from a specific source.

    Also drops that source's entry from the in-memory ingestion results.

    Raises:
        HTTPException(503): if the legal templates module is not available.
        HTTPException(500): if the deletion fails.
    """
    if not LEGAL_TEMPLATES_AVAILABLE:
        raise HTTPException(status_code=503, detail="Legal templates module not available")

    try:
        from qdrant_service import delete_legal_templates_by_source

        count = await delete_legal_templates_by_source(source_name)

        # Update status
        _templates_ingestion_status.get("results", {}).pop(source_name, None)

        return {
            "status": "deleted",
            "source": source_name,
            "chunks_deleted": count,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))