""" EduSearch Seeds API Routes. CRUD operations for managing education search crawler seed URLs. Direct database access to PostgreSQL. """ import os import logging from typing import Optional, List from datetime import datetime from uuid import UUID from fastapi import APIRouter, HTTPException, Depends, Query from pydantic import BaseModel, Field, HttpUrl import asyncpg logger = logging.getLogger(__name__) router = APIRouter(prefix="/edu-search", tags=["edu-search"]) # Database connection pool _pool: Optional[asyncpg.Pool] = None async def get_db_pool() -> asyncpg.Pool: """Get or create database connection pool.""" global _pool if _pool is None: database_url = os.environ.get("DATABASE_URL") if not database_url: raise RuntimeError("DATABASE_URL nicht konfiguriert - bitte via Vault oder Umgebungsvariable setzen") _pool = await asyncpg.create_pool(database_url, min_size=2, max_size=10) return _pool # ============================================================================= # Pydantic Models # ============================================================================= class CategoryResponse(BaseModel): """Category response model.""" id: str name: str display_name: str description: Optional[str] = None icon: Optional[str] = None sort_order: int is_active: bool class SeedBase(BaseModel): """Base seed model for creation/update.""" url: str = Field(..., max_length=500) name: str = Field(..., max_length=255) description: Optional[str] = None category_name: Optional[str] = Field(None, description="Category name (federal, states, etc.)") source_type: str = Field("GOV", description="GOV, EDU, UNI, etc.") scope: str = Field("FEDERAL", description="FEDERAL, STATE, etc.") state: Optional[str] = Field(None, max_length=5, description="State code (BW, BY, etc.)") trust_boost: float = Field(0.50, ge=0.0, le=1.0) enabled: bool = True crawl_depth: int = Field(2, ge=1, le=5) crawl_frequency: str = Field("weekly", description="hourly, daily, weekly, monthly") class SeedCreate(SeedBase): """Seed creation model.""" pass class SeedUpdate(BaseModel): """Seed update model (all fields optional).""" url: Optional[str] = Field(None, max_length=500) name: Optional[str] = Field(None, max_length=255) description: Optional[str] = None category_name: Optional[str] = None source_type: Optional[str] = None scope: Optional[str] = None state: Optional[str] = Field(None, max_length=5) trust_boost: Optional[float] = Field(None, ge=0.0, le=1.0) enabled: Optional[bool] = None crawl_depth: Optional[int] = Field(None, ge=1, le=5) crawl_frequency: Optional[str] = None class SeedResponse(BaseModel): """Seed response model.""" id: str url: str name: str description: Optional[str] = None category: Optional[str] = None category_display_name: Optional[str] = None source_type: str scope: str state: Optional[str] = None trust_boost: float enabled: bool crawl_depth: int crawl_frequency: str last_crawled_at: Optional[datetime] = None last_crawl_status: Optional[str] = None last_crawl_docs: int = 0 total_documents: int = 0 created_at: datetime updated_at: datetime class SeedsListResponse(BaseModel): """List response with pagination info.""" seeds: List[SeedResponse] total: int page: int page_size: int class StatsResponse(BaseModel): """Crawl statistics response.""" total_seeds: int enabled_seeds: int total_documents: int seeds_by_category: dict seeds_by_state: dict last_crawl_time: Optional[datetime] = None class BulkImportRequest(BaseModel): """Bulk import request.""" seeds: List[SeedCreate] class BulkImportResponse(BaseModel): """Bulk import response.""" imported: int skipped: int errors: List[str] # ============================================================================= # API Endpoints # ============================================================================= @router.get("/categories", response_model=List[CategoryResponse]) async def list_categories(): """List all seed categories.""" pool = await get_db_pool() async with pool.acquire() as conn: rows = await conn.fetch(""" SELECT id, name, display_name, description, icon, sort_order, is_active FROM edu_search_categories WHERE is_active = TRUE ORDER BY sort_order """) return [ CategoryResponse( id=str(row["id"]), name=row["name"], display_name=row["display_name"], description=row["description"], icon=row["icon"], sort_order=row["sort_order"], is_active=row["is_active"], ) for row in rows ] @router.get("/seeds", response_model=SeedsListResponse) async def list_seeds( category: Optional[str] = Query(None, description="Filter by category name"), state: Optional[str] = Query(None, description="Filter by state code"), enabled: Optional[bool] = Query(None, description="Filter by enabled status"), search: Optional[str] = Query(None, description="Search in name/url"), page: int = Query(1, ge=1), page_size: int = Query(50, ge=1, le=200), ): """List seeds with optional filtering and pagination.""" pool = await get_db_pool() async with pool.acquire() as conn: # Build WHERE clause conditions = [] params = [] param_idx = 1 if category: conditions.append(f"c.name = ${param_idx}") params.append(category) param_idx += 1 if state: conditions.append(f"s.state = ${param_idx}") params.append(state) param_idx += 1 if enabled is not None: conditions.append(f"s.enabled = ${param_idx}") params.append(enabled) param_idx += 1 if search: conditions.append(f"(s.name ILIKE ${param_idx} OR s.url ILIKE ${param_idx})") params.append(f"%{search}%") param_idx += 1 where_clause = " AND ".join(conditions) if conditions else "TRUE" # Count total count_query = f""" SELECT COUNT(*) FROM edu_search_seeds s LEFT JOIN edu_search_categories c ON s.category_id = c.id WHERE {where_clause} """ total = await conn.fetchval(count_query, *params) # Get paginated results offset = (page - 1) * page_size params.extend([page_size, offset]) query = f""" SELECT s.id, s.url, s.name, s.description, c.name as category, c.display_name as category_display_name, s.source_type, s.scope, s.state, s.trust_boost, s.enabled, s.crawl_depth, s.crawl_frequency, s.last_crawled_at, s.last_crawl_status, s.last_crawl_docs, s.total_documents, s.created_at, s.updated_at FROM edu_search_seeds s LEFT JOIN edu_search_categories c ON s.category_id = c.id WHERE {where_clause} ORDER BY c.sort_order, s.name LIMIT ${param_idx} OFFSET ${param_idx + 1} """ rows = await conn.fetch(query, *params) seeds = [ SeedResponse( id=str(row["id"]), url=row["url"], name=row["name"], description=row["description"], category=row["category"], category_display_name=row["category_display_name"], source_type=row["source_type"], scope=row["scope"], state=row["state"], trust_boost=float(row["trust_boost"]), enabled=row["enabled"], crawl_depth=row["crawl_depth"], crawl_frequency=row["crawl_frequency"], last_crawled_at=row["last_crawled_at"], last_crawl_status=row["last_crawl_status"], last_crawl_docs=row["last_crawl_docs"] or 0, total_documents=row["total_documents"] or 0, created_at=row["created_at"], updated_at=row["updated_at"], ) for row in rows ] return SeedsListResponse( seeds=seeds, total=total, page=page, page_size=page_size, ) @router.get("/seeds/{seed_id}", response_model=SeedResponse) async def get_seed(seed_id: str): """Get a single seed by ID.""" pool = await get_db_pool() async with pool.acquire() as conn: row = await conn.fetchrow(""" SELECT s.id, s.url, s.name, s.description, c.name as category, c.display_name as category_display_name, s.source_type, s.scope, s.state, s.trust_boost, s.enabled, s.crawl_depth, s.crawl_frequency, s.last_crawled_at, s.last_crawl_status, s.last_crawl_docs, s.total_documents, s.created_at, s.updated_at FROM edu_search_seeds s LEFT JOIN edu_search_categories c ON s.category_id = c.id WHERE s.id = $1 """, seed_id) if not row: raise HTTPException(status_code=404, detail="Seed nicht gefunden") return SeedResponse( id=str(row["id"]), url=row["url"], name=row["name"], description=row["description"], category=row["category"], category_display_name=row["category_display_name"], source_type=row["source_type"], scope=row["scope"], state=row["state"], trust_boost=float(row["trust_boost"]), enabled=row["enabled"], crawl_depth=row["crawl_depth"], crawl_frequency=row["crawl_frequency"], last_crawled_at=row["last_crawled_at"], last_crawl_status=row["last_crawl_status"], last_crawl_docs=row["last_crawl_docs"] or 0, total_documents=row["total_documents"] or 0, created_at=row["created_at"], updated_at=row["updated_at"], ) @router.post("/seeds", response_model=SeedResponse, status_code=201) async def create_seed(seed: SeedCreate): """Create a new seed URL.""" pool = await get_db_pool() async with pool.acquire() as conn: # Get category ID if provided category_id = None if seed.category_name: category_id = await conn.fetchval( "SELECT id FROM edu_search_categories WHERE name = $1", seed.category_name ) try: row = await conn.fetchrow(""" INSERT INTO edu_search_seeds ( url, name, description, category_id, source_type, scope, state, trust_boost, enabled, crawl_depth, crawl_frequency ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING id, created_at, updated_at """, seed.url, seed.name, seed.description, category_id, seed.source_type, seed.scope, seed.state, seed.trust_boost, seed.enabled, seed.crawl_depth, seed.crawl_frequency ) except asyncpg.UniqueViolationError: raise HTTPException(status_code=409, detail="URL existiert bereits") return SeedResponse( id=str(row["id"]), url=seed.url, name=seed.name, description=seed.description, category=seed.category_name, category_display_name=None, source_type=seed.source_type, scope=seed.scope, state=seed.state, trust_boost=seed.trust_boost, enabled=seed.enabled, crawl_depth=seed.crawl_depth, crawl_frequency=seed.crawl_frequency, last_crawled_at=None, last_crawl_status=None, last_crawl_docs=0, total_documents=0, created_at=row["created_at"], updated_at=row["updated_at"], ) @router.put("/seeds/{seed_id}", response_model=SeedResponse) async def update_seed(seed_id: str, seed: SeedUpdate): """Update an existing seed.""" pool = await get_db_pool() async with pool.acquire() as conn: # Build update statement dynamically updates = [] params = [] param_idx = 1 if seed.url is not None: updates.append(f"url = ${param_idx}") params.append(seed.url) param_idx += 1 if seed.name is not None: updates.append(f"name = ${param_idx}") params.append(seed.name) param_idx += 1 if seed.description is not None: updates.append(f"description = ${param_idx}") params.append(seed.description) param_idx += 1 if seed.category_name is not None: category_id = await conn.fetchval( "SELECT id FROM edu_search_categories WHERE name = $1", seed.category_name ) updates.append(f"category_id = ${param_idx}") params.append(category_id) param_idx += 1 if seed.source_type is not None: updates.append(f"source_type = ${param_idx}") params.append(seed.source_type) param_idx += 1 if seed.scope is not None: updates.append(f"scope = ${param_idx}") params.append(seed.scope) param_idx += 1 if seed.state is not None: updates.append(f"state = ${param_idx}") params.append(seed.state) param_idx += 1 if seed.trust_boost is not None: updates.append(f"trust_boost = ${param_idx}") params.append(seed.trust_boost) param_idx += 1 if seed.enabled is not None: updates.append(f"enabled = ${param_idx}") params.append(seed.enabled) param_idx += 1 if seed.crawl_depth is not None: updates.append(f"crawl_depth = ${param_idx}") params.append(seed.crawl_depth) param_idx += 1 if seed.crawl_frequency is not None: updates.append(f"crawl_frequency = ${param_idx}") params.append(seed.crawl_frequency) param_idx += 1 if not updates: raise HTTPException(status_code=400, detail="Keine Felder zum Aktualisieren") updates.append("updated_at = NOW()") params.append(seed_id) query = f""" UPDATE edu_search_seeds SET {", ".join(updates)} WHERE id = ${param_idx} RETURNING id """ result = await conn.fetchrow(query, *params) if not result: raise HTTPException(status_code=404, detail="Seed nicht gefunden") # Return updated seed return await get_seed(seed_id) @router.delete("/seeds/{seed_id}") async def delete_seed(seed_id: str): """Delete a seed.""" pool = await get_db_pool() async with pool.acquire() as conn: result = await conn.execute( "DELETE FROM edu_search_seeds WHERE id = $1", seed_id ) if result == "DELETE 0": raise HTTPException(status_code=404, detail="Seed nicht gefunden") return {"status": "deleted", "id": seed_id} @router.post("/seeds/bulk-import", response_model=BulkImportResponse) async def bulk_import_seeds(request: BulkImportRequest): """Bulk import seeds (skip duplicates).""" pool = await get_db_pool() imported = 0 skipped = 0 errors = [] async with pool.acquire() as conn: # Pre-fetch all category IDs categories = {} rows = await conn.fetch("SELECT id, name FROM edu_search_categories") for row in rows: categories[row["name"]] = row["id"] for seed in request.seeds: try: category_id = categories.get(seed.category_name) if seed.category_name else None await conn.execute(""" INSERT INTO edu_search_seeds ( url, name, description, category_id, source_type, scope, state, trust_boost, enabled, crawl_depth, crawl_frequency ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (url) DO NOTHING """, seed.url, seed.name, seed.description, category_id, seed.source_type, seed.scope, seed.state, seed.trust_boost, seed.enabled, seed.crawl_depth, seed.crawl_frequency ) imported += 1 except asyncpg.UniqueViolationError: skipped += 1 except Exception as e: errors.append(f"{seed.url}: {str(e)}") return BulkImportResponse(imported=imported, skipped=skipped, errors=errors) @router.get("/stats", response_model=StatsResponse) async def get_stats(): """Get crawl statistics.""" pool = await get_db_pool() async with pool.acquire() as conn: # Basic counts total = await conn.fetchval("SELECT COUNT(*) FROM edu_search_seeds") enabled = await conn.fetchval("SELECT COUNT(*) FROM edu_search_seeds WHERE enabled = TRUE") total_docs = await conn.fetchval("SELECT COALESCE(SUM(total_documents), 0) FROM edu_search_seeds") # By category cat_rows = await conn.fetch(""" SELECT c.name, COUNT(s.id) as count FROM edu_search_categories c LEFT JOIN edu_search_seeds s ON c.id = s.category_id GROUP BY c.name """) by_category = {row["name"]: row["count"] for row in cat_rows} # By state state_rows = await conn.fetch(""" SELECT COALESCE(state, 'federal') as state, COUNT(*) as count FROM edu_search_seeds GROUP BY state """) by_state = {row["state"]: row["count"] for row in state_rows} # Last crawl time last_crawl = await conn.fetchval( "SELECT MAX(last_crawled_at) FROM edu_search_seeds" ) return StatsResponse( total_seeds=total, enabled_seeds=enabled, total_documents=total_docs, seeds_by_category=by_category, seeds_by_state=by_state, last_crawl_time=last_crawl, ) # Export for external use (edu-search-service) @router.get("/seeds/export/for-crawler") async def export_seeds_for_crawler(): """Export enabled seeds in format suitable for crawler.""" pool = await get_db_pool() async with pool.acquire() as conn: rows = await conn.fetch(""" SELECT s.url, s.trust_boost, s.source_type, s.scope, s.state, s.crawl_depth, c.name as category FROM edu_search_seeds s LEFT JOIN edu_search_categories c ON s.category_id = c.id WHERE s.enabled = TRUE ORDER BY s.trust_boost DESC """) return { "seeds": [ { "url": row["url"], "trust": float(row["trust_boost"]), "source": row["source_type"], "scope": row["scope"], "state": row["state"], "depth": row["crawl_depth"], "category": row["category"], } for row in rows ], "total": len(rows), "exported_at": datetime.utcnow().isoformat(), } # ============================================================================= # Crawl Status Feedback (from edu-search-service) # ============================================================================= class CrawlStatusUpdate(BaseModel): """Crawl status update from edu-search-service.""" seed_url: str = Field(..., description="The seed URL that was crawled") status: str = Field(..., description="Crawl status: success, error, partial") documents_crawled: int = Field(0, ge=0, description="Number of documents crawled") error_message: Optional[str] = Field(None, description="Error message if status is error") crawl_duration_seconds: float = Field(0.0, ge=0.0, description="Duration of the crawl in seconds") class CrawlStatusResponse(BaseModel): """Response for crawl status update.""" success: bool seed_url: str message: str @router.post("/seeds/crawl-status", response_model=CrawlStatusResponse) async def update_crawl_status(update: CrawlStatusUpdate): """Update crawl status for a seed URL (called by edu-search-service).""" pool = await get_db_pool() async with pool.acquire() as conn: # Find the seed by URL seed = await conn.fetchrow( "SELECT id, total_documents FROM edu_search_seeds WHERE url = $1", update.seed_url ) if not seed: raise HTTPException( status_code=404, detail=f"Seed nicht gefunden: {update.seed_url}" ) # Update the seed with crawl status new_total = (seed["total_documents"] or 0) + update.documents_crawled await conn.execute(""" UPDATE edu_search_seeds SET last_crawled_at = NOW(), last_crawl_status = $2, last_crawl_docs = $3, total_documents = $4, updated_at = NOW() WHERE id = $1 """, seed["id"], update.status, update.documents_crawled, new_total) logger.info( f"Crawl status updated: {update.seed_url} - " f"status={update.status}, docs={update.documents_crawled}, " f"duration={update.crawl_duration_seconds:.1f}s" ) return CrawlStatusResponse( success=True, seed_url=update.seed_url, message=f"Status aktualisiert: {update.documents_crawled} Dokumente gecrawlt" ) class BulkCrawlStatusUpdate(BaseModel): """Bulk crawl status update.""" updates: List[CrawlStatusUpdate] class BulkCrawlStatusResponse(BaseModel): """Response for bulk crawl status update.""" updated: int failed: int errors: List[str] @router.post("/seeds/crawl-status/bulk", response_model=BulkCrawlStatusResponse) async def bulk_update_crawl_status(request: BulkCrawlStatusUpdate): """Bulk update crawl status for multiple seeds.""" pool = await get_db_pool() updated = 0 failed = 0 errors = [] async with pool.acquire() as conn: for update in request.updates: try: seed = await conn.fetchrow( "SELECT id, total_documents FROM edu_search_seeds WHERE url = $1", update.seed_url ) if not seed: failed += 1 errors.append(f"Seed nicht gefunden: {update.seed_url}") continue new_total = (seed["total_documents"] or 0) + update.documents_crawled await conn.execute(""" UPDATE edu_search_seeds SET last_crawled_at = NOW(), last_crawl_status = $2, last_crawl_docs = $3, total_documents = $4, updated_at = NOW() WHERE id = $1 """, seed["id"], update.status, update.documents_crawled, new_total) updated += 1 except Exception as e: failed += 1 errors.append(f"{update.seed_url}: {str(e)}") logger.info(f"Bulk crawl status update: {updated} updated, {failed} failed") return BulkCrawlStatusResponse( updated=updated, failed=failed, errors=errors )