A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.
This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).
Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
677 lines
22 KiB
Python
"""
|
|
Zeugnis Rights-Aware Crawler
|
|
|
|
Crawls official government documents about school certificates (Zeugnisse)
|
|
from all 16 German federal states. Only indexes documents where AI training
|
|
is legally permitted.
|
|
"""
|
|
|
|
import asyncio
|
|
import hashlib
|
|
import os
|
|
import re
|
|
import uuid
|
|
from datetime import datetime
|
|
from typing import Optional, List, Dict, Any, Tuple
|
|
from dataclasses import dataclass, field
|
|
|
|
import httpx
|
|
|
|
# Local imports
|
|
from zeugnis_models import (
|
|
CrawlStatus, LicenseType, DocType, EventType,
|
|
BUNDESLAENDER, TRAINING_PERMISSIONS,
|
|
generate_id, get_training_allowed, get_bundesland_name,
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Configuration
|
|
# =============================================================================
|
|
|
|
# Service endpoints and credentials — every value overridable via environment.
QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333")
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "localhost:9000")
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "test-access-key")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "test-secret-key")
MINIO_BUCKET = os.getenv("MINIO_BUCKET", "breakpilot-rag")
# "local" selects sentence-transformers; "openai" the OpenAI embeddings API
# (see generate_embeddings below).
EMBEDDING_BACKEND = os.getenv("EMBEDDING_BACKEND", "local")

# Qdrant collection receiving zeugnis document chunks.
ZEUGNIS_COLLECTION = "bp_zeugnis"
# Chunking parameters, measured in characters (not tokens).
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200
# HTTP fetch behaviour: retries with linear backoff (RETRY_DELAY * attempt).
MAX_RETRIES = 3
RETRY_DELAY = 5  # seconds
REQUEST_TIMEOUT = 30  # seconds
USER_AGENT = "BreakPilot-Zeugnis-Crawler/1.0 (Educational Research)"
|
|
|
|
|
|
# =============================================================================
|
|
# Crawler State
|
|
# =============================================================================
|
|
|
|
@dataclass
class CrawlerState:
    """Global crawler state."""
    # True while a crawl task is active; clearing it asks the loop to stop.
    is_running: bool = False
    # Source / bundesland currently being crawled (None when idle).
    current_source_id: Optional[str] = None
    current_bundesland: Optional[str] = None
    # Pending work items; only its length is reported via get_crawler_status.
    queue: List[Dict] = field(default_factory=list)
    # Counters reset each time start_crawler runs.
    documents_crawled_today: int = 0
    documents_indexed_today: int = 0
    errors_today: int = 0
    # Timestamp of the most recent completed crawl step.
    last_activity: Optional[datetime] = None
|
|
|
|
|
|
_crawler_state = CrawlerState()  # Module-wide singleton shared by all control functions.
|
|
|
|
|
|
# =============================================================================
|
|
# Text Extraction
|
|
# =============================================================================
|
|
|
|
def extract_text_from_pdf(content: bytes) -> str:
    """Return the text of every page in a PDF, joined by blank lines.

    Any failure — PyPDF2 missing, malformed PDF — is printed and an
    empty string is returned instead of raising.
    """
    try:
        import io
        from PyPDF2 import PdfReader

        document = PdfReader(io.BytesIO(content))
        page_texts = [t for t in (page.extract_text() for page in document.pages) if t]
        return "\n\n".join(page_texts)
    except Exception as e:
        print(f"PDF extraction failed: {e}")
        return ""
|
|
|
|
|
|
def extract_text_from_html(content: bytes, encoding: str = "utf-8") -> str:
    """Return the visible text of an HTML document.

    Script/style and boilerplate containers (nav, header, footer) are
    removed before extraction; the result is newline-joined non-empty
    lines. Any failure — bs4 missing, parse trouble — is printed and an
    empty string is returned instead of raising.
    """
    try:
        from bs4 import BeautifulSoup

        markup = content.decode(encoding, errors="replace")
        soup = BeautifulSoup(markup, "html.parser")

        # Drop non-content markup before pulling out text.
        for tag in soup(["script", "style", "nav", "header", "footer"]):
            tag.decompose()

        raw = soup.get_text(separator="\n", strip=True)

        # Collapse to non-empty, stripped lines.
        return "\n".join(ln.strip() for ln in raw.splitlines() if ln.strip())
    except Exception as e:
        print(f"HTML extraction failed: {e}")
        return ""
|
|
|
|
|
|
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> List[str]:
    """Split *text* into chunks of at most ``chunk_size`` characters.

    Splitting prefers natural boundaries in order — paragraphs ("\\n\\n"),
    lines ("\\n"), sentences (". "), words (" ") — and only force-splits at
    fixed offsets when no separator helps. When ``overlap`` > 0 each chunk
    after the first is prefixed with the tail of its predecessor so
    context carries across chunk boundaries.

    Args:
        text: Input text; empty input yields [].
        chunk_size: Maximum chunk length in characters.
        overlap: Number of trailing characters of the previous chunk to
            prepend to the next one.

    Returns:
        List of chunk strings (whitespace-only fragments are dropped).
    """
    if not text:
        return []

    separators = ["\n\n", "\n", ". ", " "]

    # Renamed parameter (was `text`) so it no longer shadows the outer
    # argument; behavior is unchanged.
    def split_recursive(segment: str, sep_index: int = 0) -> List[str]:
        # Already small enough: keep whole (unless pure whitespace).
        if len(segment) <= chunk_size:
            return [segment] if segment.strip() else []

        if sep_index >= len(separators):
            # No separator helped: force split at fixed offsets.
            # max(1, ...) guards the step against overlap >= chunk_size,
            # which previously made range() raise ValueError (step <= 0).
            step = max(1, chunk_size - overlap)
            result = []
            for i in range(0, len(segment), step):
                piece = segment[i:i + chunk_size]
                if piece.strip():
                    result.append(piece)
            return result

        sep = separators[sep_index]
        parts = segment.split(sep)
        result = []
        current = ""

        for part in parts:
            if len(current) + len(sep) + len(part) <= chunk_size:
                # Re-join with the separator we split on.
                current = current + sep + part if current else part
            else:
                if current.strip():
                    result.extend(split_recursive(current, sep_index + 1) if len(current) > chunk_size else [current])
                current = part

        # Flush the trailing accumulator.
        if current.strip():
            result.extend(split_recursive(current, sep_index + 1) if len(current) > chunk_size else [current])

        return result

    chunks = split_recursive(text)

    # Prefix each chunk (after the first) with its predecessor's tail.
    if overlap > 0 and len(chunks) > 1:
        overlapped = []
        for i, chunk in enumerate(chunks):
            if i > 0:
                chunk = chunks[i - 1][-overlap:] + chunk
            overlapped.append(chunk)
        chunks = overlapped

    return chunks
|
|
|
|
|
|
def compute_hash(content: bytes) -> str:
    """Return the hex-encoded SHA-256 digest of *content* (used for
    document versioning/deduplication)."""
    digest = hashlib.sha256()
    digest.update(content)
    return digest.hexdigest()
|
|
|
|
|
|
# =============================================================================
|
|
# Embedding Generation
|
|
# =============================================================================
|
|
|
|
_embedding_model = None  # Lazily-loaded SentenceTransformer (see get_embedding_model).
|
|
|
|
|
|
def get_embedding_model():
    """Return the shared sentence-transformers model, loading it lazily.

    Only active for the "local" embedding backend; returns None when the
    backend differs or sentence-transformers is not installed.
    """
    global _embedding_model

    # Guard clause: already loaded, or not our backend — nothing to do.
    if _embedding_model is not None or EMBEDDING_BACKEND != "local":
        return _embedding_model

    try:
        from sentence_transformers import SentenceTransformer
        _embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
        print("Loaded local embedding model: all-MiniLM-L6-v2")
    except ImportError:
        print("Warning: sentence-transformers not installed")

    return _embedding_model
|
|
|
|
|
|
async def generate_embeddings(texts: List[str]) -> List[List[float]]:
    """Embed *texts* with the configured backend.

    Returns one vector per input text. Yields [] when the input is
    empty, the backend is unavailable/misconfigured, or the backend name
    is unrecognized.
    """
    if not texts:
        return []

    if EMBEDDING_BACKEND == "local":
        model = get_embedding_model()
        if not model:
            return []
        vectors = model.encode(texts, show_progress_bar=False)
        return [vector.tolist() for vector in vectors]

    if EMBEDDING_BACKEND == "openai":
        import openai

        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            print("Warning: OPENAI_API_KEY not set")
            return []

        client = openai.AsyncOpenAI(api_key=api_key)
        response = await client.embeddings.create(
            input=texts,
            model="text-embedding-3-small"
        )
        return [item.embedding for item in response.data]

    # Unknown backend: nothing we can do.
    return []
|
|
|
|
|
|
# =============================================================================
|
|
# MinIO Storage
|
|
# =============================================================================
|
|
|
|
async def upload_to_minio(
    content: bytes,
    bundesland: str,
    filename: str,
    content_type: str = "application/pdf",
    year: Optional[int] = None,
) -> Optional[str]:
    """Store a crawled document in MinIO.

    The object is written to
    ``landes-daten/<bundesland>/zeugnis/<year>/<filename>``; ``year``
    defaults to the current year. The bucket is created on first use.

    Returns:
        The object name on success, or None on any failure (client
        library missing, connection/credential trouble).
    """
    try:
        from minio import Minio

        client = Minio(
            MINIO_ENDPOINT,
            access_key=MINIO_ACCESS_KEY,
            secret_key=MINIO_SECRET_KEY,
            secure=os.getenv("MINIO_SECURE", "false").lower() == "true"
        )

        # Ensure bucket exists
        if not client.bucket_exists(MINIO_BUCKET):
            client.make_bucket(MINIO_BUCKET)

        # Build path. BUG FIX: the filename component had been replaced
        # by the literal "(unknown)", so every upload within a given
        # bundesland/year overwrote one object; `filename` was unused.
        year_str = str(year) if year else str(datetime.now().year)
        object_name = f"landes-daten/{bundesland}/zeugnis/{year_str}/{filename}"

        # Upload
        import io
        client.put_object(
            MINIO_BUCKET,
            object_name,
            io.BytesIO(content),
            len(content),
            content_type=content_type,
        )

        return object_name
    except Exception as e:
        print(f"MinIO upload failed: {e}")
        return None
|
|
|
|
|
|
# =============================================================================
|
|
# Qdrant Indexing
|
|
# =============================================================================
|
|
|
|
async def index_in_qdrant(
    doc_id: str,
    chunks: List[str],
    embeddings: List[List[float]],
    metadata: Dict[str, Any],
) -> int:
    """Index document chunks in Qdrant.

    Upserts one point per (chunk, embedding) pair into the
    ZEUGNIS_COLLECTION, creating the collection on first use with the
    vector size taken from the first embedding (falling back to 384 —
    presumably the all-MiniLM-L6-v2 dimension; confirm if the backend
    changes).

    Args:
        doc_id: Document identifier stored in every point's payload.
        chunks: Chunk texts; only the first 500 chars of each are stored.
        embeddings: One vector per chunk, zipped positionally with chunks.
        metadata: Expects keys "bundesland", "doc_type", "title", "url",
            "training_allowed" (missing keys become None / False).

    Returns:
        Number of points upserted, or 0 on any failure.
    """
    try:
        from qdrant_client import QdrantClient
        from qdrant_client.models import VectorParams, Distance, PointStruct

        client = QdrantClient(url=QDRANT_URL)

        # Ensure collection exists
        collections = client.get_collections().collections
        if not any(c.name == ZEUGNIS_COLLECTION for c in collections):
            vector_size = len(embeddings[0]) if embeddings else 384
            client.create_collection(
                collection_name=ZEUGNIS_COLLECTION,
                vectors_config=VectorParams(
                    size=vector_size,
                    distance=Distance.COSINE,
                ),
            )
            print(f"Created Qdrant collection: {ZEUGNIS_COLLECTION}")

        # Create points — random UUIDs, so re-indexing the same document
        # adds new points rather than replacing old ones.
        points = []
        for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
            point_id = str(uuid.uuid4())
            points.append(PointStruct(
                id=point_id,
                vector=embedding,
                payload={
                    "document_id": doc_id,
                    "chunk_index": i,
                    "chunk_text": chunk[:500],  # Store first 500 chars for preview
                    "bundesland": metadata.get("bundesland"),
                    "doc_type": metadata.get("doc_type"),
                    "title": metadata.get("title"),
                    "source_url": metadata.get("url"),
                    "training_allowed": metadata.get("training_allowed", False),
                    "indexed_at": datetime.now().isoformat(),
                }
            ))

        # Upsert
        if points:
            client.upsert(
                collection_name=ZEUGNIS_COLLECTION,
                points=points,
            )

        return len(points)
    except Exception as e:
        print(f"Qdrant indexing failed: {e}")
        return 0
|
|
|
|
|
|
# =============================================================================
|
|
# Crawler Worker
|
|
# =============================================================================
|
|
|
|
class ZeugnisCrawler:
    """Rights-aware crawler for zeugnis documents.

    Fetches seed URLs, extracts their text, archives originals in MinIO,
    records them in the database, and indexes chunks in Qdrant — but
    only for sources whose training_allowed flag permits it.
    """

    def __init__(self):
        # httpx client created in init(); None until then.
        self.http_client: Optional[httpx.AsyncClient] = None
        # asyncpg-style pool from metrics_db; None when unavailable.
        self.db_pool = None

    async def init(self):
        """Initialize crawler resources (HTTP client and DB pool)."""
        self.http_client = httpx.AsyncClient(
            timeout=REQUEST_TIMEOUT,
            follow_redirects=True,
            headers={"User-Agent": USER_AGENT},
        )

        # Initialize database connection — best-effort: crawling can
        # proceed without a pool, but nothing gets persisted.
        try:
            from metrics_db import get_pool
            self.db_pool = await get_pool()
        except Exception as e:
            print(f"Failed to get database pool: {e}")

    async def close(self):
        """Close crawler resources."""
        if self.http_client:
            await self.http_client.aclose()

    async def fetch_url(self, url: str) -> Tuple[Optional[bytes], Optional[str]]:
        """Fetch URL with retry logic.

        Returns (body bytes, content-type header) on success, or
        (None, None) after MAX_RETRIES failures or immediately on a 404.
        Retries back off linearly (RETRY_DELAY * attempt number).
        """
        for attempt in range(MAX_RETRIES):
            try:
                response = await self.http_client.get(url)
                response.raise_for_status()
                content_type = response.headers.get("content-type", "")
                return response.content, content_type
            except httpx.HTTPStatusError as e:
                print(f"HTTP error {e.response.status_code} for {url}")
                # 404 will not improve on retry — give up immediately.
                if e.response.status_code == 404:
                    return None, None
            except Exception as e:
                print(f"Attempt {attempt + 1}/{MAX_RETRIES} failed for {url}: {e}")
            if attempt < MAX_RETRIES - 1:
                await asyncio.sleep(RETRY_DELAY * (attempt + 1))
        return None, None

    async def crawl_seed_url(
        self,
        seed_url_id: str,
        url: str,
        bundesland: str,
        doc_type: str,
        training_allowed: bool,
    ) -> Dict[str, Any]:
        """Crawl a single seed URL.

        Pipeline: fetch -> extract text (PDF or HTML) -> archive in
        MinIO -> insert DB row -> (only if training_allowed) chunk,
        embed and index in Qdrant.

        Returns a dict with keys seed_url_id, url, success, document_id,
        indexed, error. Note: when training is not allowed the result is
        success=True with an explanatory "error" message.
        """
        global _crawler_state

        result = {
            "seed_url_id": seed_url_id,
            "url": url,
            "success": False,
            "document_id": None,
            "indexed": False,
            "error": None,
        }

        try:
            # Fetch content
            content, content_type = await self.fetch_url(url)
            if not content:
                result["error"] = "Failed to fetch URL"
                return result

            # Determine file type from the header or the URL suffix.
            is_pdf = "pdf" in content_type.lower() or url.lower().endswith(".pdf")

            # Extract text
            if is_pdf:
                text = extract_text_from_pdf(content)
                filename = url.split("/")[-1] or f"document_{seed_url_id}.pdf"
            else:
                text = extract_text_from_html(content)
                filename = f"document_{seed_url_id}.html"

            if not text:
                result["error"] = "No text extracted"
                return result

            # Compute hash for versioning
            content_hash = compute_hash(content)

            # Upload to MinIO (best-effort: minio_path may be None).
            minio_path = await upload_to_minio(
                content,
                bundesland,
                filename,
                content_type=content_type or "application/octet-stream",
            )

            # Generate document ID
            doc_id = generate_id()

            # Store document in database
            if self.db_pool:
                async with self.db_pool.acquire() as conn:
                    await conn.execute(
                        """
                        INSERT INTO zeugnis_documents
                        (id, seed_url_id, title, url, content_hash, minio_path,
                         training_allowed, file_size, content_type)
                        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                        ON CONFLICT DO NOTHING
                        """,
                        doc_id, seed_url_id, filename, url, content_hash,
                        minio_path, training_allowed, len(content), content_type
                    )

            result["document_id"] = doc_id
            result["success"] = True
            _crawler_state.documents_crawled_today += 1

            # Only index if training is allowed — this is the
            # rights-awareness gate.
            if training_allowed:
                chunks = chunk_text(text)
                if chunks:
                    embeddings = await generate_embeddings(chunks)
                    if embeddings:
                        indexed_count = await index_in_qdrant(
                            doc_id,
                            chunks,
                            embeddings,
                            {
                                "bundesland": bundesland,
                                "doc_type": doc_type,
                                "title": filename,
                                "url": url,
                                "training_allowed": True,
                            }
                        )
                        if indexed_count > 0:
                            result["indexed"] = True
                            _crawler_state.documents_indexed_today += 1

                            # Update database
                            if self.db_pool:
                                async with self.db_pool.acquire() as conn:
                                    await conn.execute(
                                        "UPDATE zeugnis_documents SET indexed_in_qdrant = true WHERE id = $1",
                                        doc_id
                                    )
            else:
                result["indexed"] = False
                result["error"] = "Training not allowed for this source"

            _crawler_state.last_activity = datetime.now()

        except Exception as e:
            result["error"] = str(e)
            _crawler_state.errors_today += 1

        return result

    async def crawl_source(self, source_id: str) -> Dict[str, Any]:
        """Crawl all seed URLs for a source.

        Processes every seed URL whose status is not 'completed',
        updating per-URL status rows ('running' -> 'completed'/'failed')
        and the module crawler state as it goes.

        Returns a summary dict: source_id, documents_found,
        documents_indexed, errors, started_at, completed_at.
        """
        global _crawler_state

        result = {
            "source_id": source_id,
            "documents_found": 0,
            "documents_indexed": 0,
            "errors": [],
            "started_at": datetime.now(),
            "completed_at": None,
        }

        if not self.db_pool:
            result["errors"].append("Database not available")
            return result

        try:
            # NOTE(review): one connection is held for the whole source
            # crawl; crawl_seed_url acquires its own separately.
            async with self.db_pool.acquire() as conn:
                # Get source info
                source = await conn.fetchrow(
                    "SELECT * FROM zeugnis_sources WHERE id = $1",
                    source_id
                )
                if not source:
                    result["errors"].append(f"Source not found: {source_id}")
                    return result

                bundesland = source["bundesland"]
                training_allowed = source["training_allowed"]

                _crawler_state.current_source_id = source_id
                _crawler_state.current_bundesland = bundesland

                # Get seed URLs
                seed_urls = await conn.fetch(
                    "SELECT * FROM zeugnis_seed_urls WHERE source_id = $1 AND status != 'completed'",
                    source_id
                )

                for seed_url in seed_urls:
                    # Update status to running
                    await conn.execute(
                        "UPDATE zeugnis_seed_urls SET status = 'running' WHERE id = $1",
                        seed_url["id"]
                    )

                    # Crawl
                    crawl_result = await self.crawl_seed_url(
                        seed_url["id"],
                        seed_url["url"],
                        bundesland,
                        seed_url["doc_type"],
                        training_allowed,
                    )

                    # Update status
                    if crawl_result["success"]:
                        result["documents_found"] += 1
                        if crawl_result["indexed"]:
                            result["documents_indexed"] += 1
                        await conn.execute(
                            "UPDATE zeugnis_seed_urls SET status = 'completed', last_crawled = NOW() WHERE id = $1",
                            seed_url["id"]
                        )
                    else:
                        result["errors"].append(f"{seed_url['url']}: {crawl_result['error']}")
                        await conn.execute(
                            "UPDATE zeugnis_seed_urls SET status = 'failed', error_message = $2 WHERE id = $1",
                            seed_url["id"], crawl_result["error"]
                        )

                    # Small delay between requests — politeness toward
                    # the government sites being crawled.
                    await asyncio.sleep(1)

        except Exception as e:
            result["errors"].append(str(e))

        finally:
            result["completed_at"] = datetime.now()
            _crawler_state.current_source_id = None
            _crawler_state.current_bundesland = None

        return result
|
|
|
|
|
|
# =============================================================================
|
|
# Crawler Control Functions
|
|
# =============================================================================
|
|
|
|
_crawler_instance: Optional[ZeugnisCrawler] = None  # Active crawler; None when idle.
_crawler_task: Optional[asyncio.Task] = None  # Background task driving the crawl.
|
|
|
|
|
|
async def start_crawler(bundesland: Optional[str] = None, source_id: Optional[str] = None) -> bool:
    """Start the crawler as a background asyncio task.

    Scope narrows by argument: a specific source_id wins over a
    bundesland filter; with neither, every source is crawled in
    bundesland order. Daily counters are reset on each start.

    Returns False if a crawl is already running, True once the task
    has been scheduled.
    """
    global _crawler_state, _crawler_instance, _crawler_task

    if _crawler_state.is_running:
        return False

    _crawler_state.is_running = True
    _crawler_state.documents_crawled_today = 0
    _crawler_state.documents_indexed_today = 0
    _crawler_state.errors_today = 0

    _crawler_instance = ZeugnisCrawler()
    await _crawler_instance.init()

    async def run_crawler():
        # Closure over bundesland/source_id and the module globals.
        try:
            from metrics_db import get_pool
            pool = await get_pool()

            if pool:
                async with pool.acquire() as conn:
                    # Get sources to crawl
                    if source_id:
                        sources = await conn.fetch(
                            "SELECT id, bundesland FROM zeugnis_sources WHERE id = $1",
                            source_id
                        )
                    elif bundesland:
                        sources = await conn.fetch(
                            "SELECT id, bundesland FROM zeugnis_sources WHERE bundesland = $1",
                            bundesland
                        )
                    else:
                        sources = await conn.fetch(
                            "SELECT id, bundesland FROM zeugnis_sources ORDER BY bundesland"
                        )

                    for source in sources:
                        # Honor stop_crawler() between sources.
                        if not _crawler_state.is_running:
                            break
                        await _crawler_instance.crawl_source(source["id"])

        except Exception as e:
            print(f"Crawler error: {e}")

        finally:
            # Always mark idle and release resources, even on error.
            _crawler_state.is_running = False
            if _crawler_instance:
                await _crawler_instance.close()

    _crawler_task = asyncio.create_task(run_crawler())
    return True
|
|
|
|
|
|
async def stop_crawler() -> bool:
    """Stop a running crawl and wait for its background task to finish.

    Returns False when no crawl is in progress, True otherwise.
    """
    global _crawler_state, _crawler_task

    if not _crawler_state.is_running:
        return False

    # Flip the flag first so the crawl loop exits at its next check,
    # then cancel the task outright and swallow the cancellation.
    _crawler_state.is_running = False

    task = _crawler_task
    if task:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    return True
|
|
|
|
|
|
def get_crawler_status() -> Dict[str, Any]:
    """Return a JSON-serializable snapshot of the current crawler state."""
    global _crawler_state
    state = _crawler_state
    last = state.last_activity
    return {
        "is_running": state.is_running,
        "current_source": state.current_source_id,
        "current_bundesland": state.current_bundesland,
        "queue_length": len(state.queue),
        "documents_crawled_today": state.documents_crawled_today,
        "documents_indexed_today": state.documents_indexed_today,
        "errors_today": state.errors_today,
        "last_activity": last.isoformat() if last else None,
    }
|