This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
breakpilot-pwa/klausur-service/backend/zeugnis_crawler.py
Benjamin Admin 21a844cb8a fix: Restore all files lost during destructive rebase
A previous `git pull --rebase origin main` dropped 177 local commits,
losing 3400+ files across admin-v2, backend, studio-v2, website,
klausur-service, and many other services. The partial restore attempt
(660295e2) only recovered some files.

This commit restores all missing files from pre-rebase ref 98933f5e
while preserving post-rebase additions (night-scheduler, night-mode UI,
NightModeWidget dashboard integration).

Restored features include:
- AI Module Sidebar (FAB), OCR Labeling, OCR Compare
- GPU Dashboard, RAG Pipeline, Magic Help
- Klausur-Korrektur (8 files), Abitur-Archiv (5+ files)
- Companion, Zeugnisse-Crawler, Screen Flow
- Full backend, studio-v2, website, klausur-service
- All compliance SDKs, agent-core, voice-service
- CI/CD configs, documentation, scripts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 09:51:32 +01:00

677 lines
22 KiB
Python

"""
Zeugnis Rights-Aware Crawler
Crawls official government documents about school certificates (Zeugnisse)
from all 16 German federal states. Only indexes documents where AI training
is legally permitted.
"""
import asyncio
import hashlib
import os
import re
import uuid
from datetime import datetime
from typing import Optional, List, Dict, Any, Tuple
from dataclasses import dataclass, field
import httpx
# Local imports
from zeugnis_models import (
CrawlStatus, LicenseType, DocType, EventType,
BUNDESLAENDER, TRAINING_PERMISSIONS,
generate_id, get_training_allowed, get_bundesland_name,
)
# =============================================================================
# Configuration
# =============================================================================
# External service endpoints/credentials; all overridable via environment.
QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333")  # vector DB
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "localhost:9000")  # object store
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY", "test-access-key")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY", "test-secret-key")
MINIO_BUCKET = os.getenv("MINIO_BUCKET", "breakpilot-rag")
# "local" uses sentence-transformers; "openai" calls the OpenAI embeddings API.
EMBEDDING_BACKEND = os.getenv("EMBEDDING_BACKEND", "local")
ZEUGNIS_COLLECTION = "bp_zeugnis"  # Qdrant collection for zeugnis chunks
CHUNK_SIZE = 1000  # max characters per text chunk (before overlap)
CHUNK_OVERLAP = 200  # characters repeated between consecutive chunks
MAX_RETRIES = 3  # HTTP fetch attempts per URL
RETRY_DELAY = 5  # seconds (base for linear backoff)
REQUEST_TIMEOUT = 30  # seconds
USER_AGENT = "BreakPilot-Zeugnis-Crawler/1.0 (Educational Research)"
# =============================================================================
# Crawler State
# =============================================================================
@dataclass
class CrawlerState:
    """Global crawler state.

    Mutable singleton shared by the crawler worker and the module-level
    control/status functions.
    """
    is_running: bool = False  # cleared to request a cooperative stop
    current_source_id: Optional[str] = None  # source currently being crawled
    current_bundesland: Optional[str] = None  # federal state of that source
    queue: List[Dict] = field(default_factory=list)  # pending work items
    documents_crawled_today: int = 0  # counters; reset when a crawl starts
    documents_indexed_today: int = 0
    errors_today: int = 0
    last_activity: Optional[datetime] = None  # timestamp of last crawl action


# Module-level singleton instance used throughout this file.
_crawler_state = CrawlerState()
# =============================================================================
# Text Extraction
# =============================================================================
def extract_text_from_pdf(content: bytes) -> str:
    """Extract text from PDF bytes.

    Returns the text of all pages joined by blank lines, or "" when the
    document cannot be parsed (or PyPDF2 is not installed).
    """
    try:
        import io

        from PyPDF2 import PdfReader

        reader = PdfReader(io.BytesIO(content))
        pages = [page.extract_text() for page in reader.pages]
        # Drop pages that yielded no text before joining.
        return "\n\n".join(page_text for page_text in pages if page_text)
    except Exception as e:
        print(f"PDF extraction failed: {e}")
        return ""
def extract_text_from_html(content: bytes, encoding: str = "utf-8") -> str:
    """Extract readable text from HTML bytes.

    Removes script/style tags and page chrome (nav/header/footer), then
    returns the remaining text with one non-empty, stripped line per row.
    Returns "" on any failure (including BeautifulSoup being unavailable).
    """
    try:
        from bs4 import BeautifulSoup

        markup = content.decode(encoding, errors="replace")
        soup = BeautifulSoup(markup, "html.parser")
        # Strip non-content elements before extracting text.
        for tag in soup(["script", "style", "nav", "header", "footer"]):
            tag.decompose()
        raw = soup.get_text(separator="\n", strip=True)
        # Collapse blank lines and surrounding whitespace.
        return "\n".join(ln.strip() for ln in raw.splitlines() if ln.strip())
    except Exception as e:
        print(f"HTML extraction failed: {e}")
        return ""
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> List[str]:
    """Split *text* into chunks of at most ``chunk_size`` characters.

    Splitting prefers natural boundaries (paragraphs, then lines, then
    sentences, then words) and falls back to fixed-width cuts only when a
    piece contains no usable separator. When ``overlap`` > 0, each chunk
    after the first is prefixed with the last ``overlap`` characters of
    its (un-overlapped) predecessor, so returned chunks may exceed
    ``chunk_size``.

    Args:
        text: Input text; empty input yields [].
        chunk_size: Maximum chunk size before overlap is prepended.
        overlap: Trailing characters repeated into the next chunk.

    Returns:
        List of non-empty text chunks.

    Raises:
        ValueError: If ``overlap`` >= ``chunk_size``. (Previously this
            could make the fixed-width fallback silently drop text, since
            its step ``chunk_size - overlap`` became zero or negative.)
    """
    if not text:
        return []
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")

    separators = ["\n\n", "\n", ". ", " "]

    def _flush(piece: str, sep_index: int) -> List[str]:
        # Re-split a piece that is still too large; emit it as-is otherwise.
        return split_recursive(piece, sep_index + 1) if len(piece) > chunk_size else [piece]

    def split_recursive(text: str, sep_index: int = 0) -> List[str]:
        if len(text) <= chunk_size:
            return [text] if text.strip() else []
        if sep_index >= len(separators):
            # No separator left: force fixed-width cuts (these are already
            # overlapped by construction of the step).
            result = []
            for i in range(0, len(text), chunk_size - overlap):
                chunk = text[i:i + chunk_size]
                if chunk.strip():
                    result.append(chunk)
            return result
        sep = separators[sep_index]
        result = []
        current = ""
        for part in text.split(sep):
            if len(current) + len(sep) + len(part) <= chunk_size:
                current = current + sep + part if current else part
            else:
                if current.strip():
                    result.extend(_flush(current, sep_index))
                current = part
        if current.strip():
            result.extend(_flush(current, sep_index))
        return result

    chunks = split_recursive(text)
    # Prefix each chunk (after the first) with the tail of its predecessor
    # so that context straddling a boundary appears in both chunks. The
    # comprehension reads the *original* list, matching prior behavior.
    if overlap > 0 and len(chunks) > 1:
        chunks = [
            chunk if i == 0 else chunks[i - 1][-overlap:] + chunk
            for i, chunk in enumerate(chunks)
        ]
    return chunks
def compute_hash(content: bytes) -> str:
    """Return the hex-encoded SHA-256 digest of *content*."""
    digest = hashlib.sha256()
    digest.update(content)
    return digest.hexdigest()
# =============================================================================
# Embedding Generation
# =============================================================================
# Lazily-loaded sentence-transformers model (None until first use).
_embedding_model = None


def get_embedding_model():
    """Return the cached local embedding model, loading it on first use.

    Only loads when EMBEDDING_BACKEND == "local"; returns None when the
    backend is not local or sentence-transformers is not installed.
    """
    global _embedding_model
    if _embedding_model is not None or EMBEDDING_BACKEND != "local":
        return _embedding_model
    try:
        from sentence_transformers import SentenceTransformer
    except ImportError:
        print("Warning: sentence-transformers not installed")
        return _embedding_model
    _embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
    print("Loaded local embedding model: all-MiniLM-L6-v2")
    return _embedding_model
async def generate_embeddings(texts: List[str]) -> List[List[float]]:
    """Generate embeddings for a list of texts.

    Dispatches on EMBEDDING_BACKEND: "local" uses the cached
    sentence-transformers model, "openai" calls the OpenAI embeddings API.
    Returns one vector per input text, or [] when *texts* is empty or no
    backend is usable.
    """
    if not texts:
        return []

    if EMBEDDING_BACKEND == "local":
        model = get_embedding_model()
        if not model:
            return []
        vectors = model.encode(texts, show_progress_bar=False)
        return [vector.tolist() for vector in vectors]

    if EMBEDDING_BACKEND == "openai":
        import openai
        api_key = os.getenv("OPENAI_API_KEY")
        if not api_key:
            print("Warning: OPENAI_API_KEY not set")
            return []
        client = openai.AsyncOpenAI(api_key=api_key)
        response = await client.embeddings.create(
            input=texts,
            model="text-embedding-3-small"
        )
        return [item.embedding for item in response.data]

    return []
# =============================================================================
# MinIO Storage
# =============================================================================
async def upload_to_minio(
    content: bytes,
    bundesland: str,
    filename: str,
    content_type: str = "application/pdf",
    year: Optional[int] = None,
) -> Optional[str]:
    """Upload a crawled document to MinIO.

    Stores the object at ``landes-daten/<bundesland>/zeugnis/<year>/<filename>``
    in the configured bucket, creating the bucket on first use.

    Args:
        content: Raw document bytes.
        bundesland: Federal-state code used in the object path.
        filename: File name used as the last path segment.
        content_type: MIME type stored with the object.
        year: Path year; defaults to the current year when omitted.

    Returns:
        The object name (path within the bucket), or None on any failure
        (including the minio client library being unavailable).
    """
    try:
        from minio import Minio

        client = Minio(
            MINIO_ENDPOINT,
            access_key=MINIO_ACCESS_KEY,
            secret_key=MINIO_SECRET_KEY,
            secure=os.getenv("MINIO_SECURE", "false").lower() == "true"
        )
        # Ensure bucket exists
        if not client.bucket_exists(MINIO_BUCKET):
            client.make_bucket(MINIO_BUCKET)
        # Build path.
        # BUG FIX: `filename` was previously unused and the object path
        # ended in the literal placeholder "(unknown)"; use the real name.
        year_str = str(year) if year else str(datetime.now().year)
        object_name = f"landes-daten/{bundesland}/zeugnis/{year_str}/{filename}"
        # Upload
        import io
        client.put_object(
            MINIO_BUCKET,
            object_name,
            io.BytesIO(content),
            len(content),
            content_type=content_type,
        )
        return object_name
    except Exception as e:
        print(f"MinIO upload failed: {e}")
        return None
# =============================================================================
# Qdrant Indexing
# =============================================================================
async def index_in_qdrant(
    doc_id: str,
    chunks: List[str],
    embeddings: List[List[float]],
    metadata: Dict[str, Any],
) -> int:
    """Index document chunks in Qdrant, one point per (chunk, embedding).

    Creates the collection on first use (vector size inferred from the
    first embedding, defaulting to 384). Returns the number of points
    written, or 0 on any failure.
    """
    try:
        from qdrant_client import QdrantClient
        from qdrant_client.models import VectorParams, Distance, PointStruct

        client = QdrantClient(url=QDRANT_URL)

        # Create the collection if it does not exist yet.
        existing = {c.name for c in client.get_collections().collections}
        if ZEUGNIS_COLLECTION not in existing:
            dim = len(embeddings[0]) if embeddings else 384
            client.create_collection(
                collection_name=ZEUGNIS_COLLECTION,
                vectors_config=VectorParams(
                    size=dim,
                    distance=Distance.COSINE,
                ),
            )
            print(f"Created Qdrant collection: {ZEUGNIS_COLLECTION}")

        # One point per chunk; the payload carries provenance for filtering.
        points = [
            PointStruct(
                id=str(uuid.uuid4()),
                vector=vector,
                payload={
                    "document_id": doc_id,
                    "chunk_index": idx,
                    "chunk_text": chunk[:500],  # Store first 500 chars for preview
                    "bundesland": metadata.get("bundesland"),
                    "doc_type": metadata.get("doc_type"),
                    "title": metadata.get("title"),
                    "source_url": metadata.get("url"),
                    "training_allowed": metadata.get("training_allowed", False),
                    "indexed_at": datetime.now().isoformat(),
                },
            )
            for idx, (chunk, vector) in enumerate(zip(chunks, embeddings))
        ]
        if points:
            client.upsert(
                collection_name=ZEUGNIS_COLLECTION,
                points=points,
            )
        return len(points)
    except Exception as e:
        print(f"Qdrant indexing failed: {e}")
        return 0
# =============================================================================
# Crawler Worker
# =============================================================================
class ZeugnisCrawler:
    """Rights-aware crawler for zeugnis documents.

    Lifecycle: ``init()`` → one or more ``crawl_source()`` calls →
    ``close()``. Fetches seed URLs over HTTP, archives raw bytes in
    MinIO, records documents in Postgres, and indexes chunk embeddings
    in Qdrant — the latter only for sources whose training permission
    allows it.
    """

    def __init__(self):
        # Resources are created lazily in init(), not here.
        self.http_client: Optional[httpx.AsyncClient] = None
        self.db_pool = None

    async def init(self):
        """Initialize crawler resources (HTTP client and DB pool)."""
        self.http_client = httpx.AsyncClient(
            timeout=REQUEST_TIMEOUT,
            follow_redirects=True,
            headers={"User-Agent": USER_AGENT},
        )
        # Initialize database connection
        try:
            from metrics_db import get_pool
            self.db_pool = await get_pool()
        except Exception as e:
            # Best effort: crawling can proceed without DB persistence
            # (crawl_source() then reports "Database not available").
            print(f"Failed to get database pool: {e}")

    async def close(self):
        """Close crawler resources."""
        if self.http_client:
            await self.http_client.aclose()

    async def fetch_url(self, url: str) -> Tuple[Optional[bytes], Optional[str]]:
        """Fetch URL with retry logic.

        Retries up to MAX_RETRIES times with linear backoff
        (RETRY_DELAY * attempt); a 404 aborts immediately. Returns
        (body_bytes, content_type) on success, (None, None) otherwise.
        """
        for attempt in range(MAX_RETRIES):
            try:
                response = await self.http_client.get(url)
                response.raise_for_status()
                content_type = response.headers.get("content-type", "")
                return response.content, content_type
            except httpx.HTTPStatusError as e:
                print(f"HTTP error {e.response.status_code} for {url}")
                if e.response.status_code == 404:
                    # Permanent miss: retrying will not help.
                    return None, None
            except Exception as e:
                print(f"Attempt {attempt + 1}/{MAX_RETRIES} failed for {url}: {e}")
            if attempt < MAX_RETRIES - 1:
                await asyncio.sleep(RETRY_DELAY * (attempt + 1))
        return None, None

    async def crawl_seed_url(
        self,
        seed_url_id: str,
        url: str,
        bundesland: str,
        doc_type: str,
        training_allowed: bool,
    ) -> Dict[str, Any]:
        """Crawl a single seed URL.

        Fetches the URL, extracts text (PDF or HTML), archives the raw
        bytes in MinIO, records the document in Postgres, and — only when
        ``training_allowed`` — chunks, embeds and indexes it in Qdrant.

        Returns a result dict with keys: seed_url_id, url, success,
        document_id, indexed, error.
        """
        global _crawler_state
        result = {
            "seed_url_id": seed_url_id,
            "url": url,
            "success": False,
            "document_id": None,
            "indexed": False,
            "error": None,
        }
        try:
            # Fetch content
            content, content_type = await self.fetch_url(url)
            if not content:
                result["error"] = "Failed to fetch URL"
                return result
            # Determine file type (header or URL extension)
            is_pdf = "pdf" in content_type.lower() or url.lower().endswith(".pdf")
            # Extract text
            if is_pdf:
                text = extract_text_from_pdf(content)
                # Fall back to a synthetic name when the URL ends with "/"
                filename = url.split("/")[-1] or f"document_{seed_url_id}.pdf"
            else:
                text = extract_text_from_html(content)
                filename = f"document_{seed_url_id}.html"
            if not text:
                result["error"] = "No text extracted"
                return result
            # Compute hash for versioning
            content_hash = compute_hash(content)
            # Upload to MinIO (best effort; minio_path may be None)
            minio_path = await upload_to_minio(
                content,
                bundesland,
                filename,
                content_type=content_type or "application/octet-stream",
            )
            # Generate document ID
            doc_id = generate_id()
            # Store document in database
            if self.db_pool:
                async with self.db_pool.acquire() as conn:
                    await conn.execute(
                        """
                        INSERT INTO zeugnis_documents
                        (id, seed_url_id, title, url, content_hash, minio_path,
                        training_allowed, file_size, content_type)
                        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                        ON CONFLICT DO NOTHING
                        """,
                        doc_id, seed_url_id, filename, url, content_hash,
                        minio_path, training_allowed, len(content), content_type
                    )
            result["document_id"] = doc_id
            result["success"] = True
            _crawler_state.documents_crawled_today += 1
            # Only index if training is allowed (rights-aware gate)
            if training_allowed:
                chunks = chunk_text(text)
                if chunks:
                    embeddings = await generate_embeddings(chunks)
                    if embeddings:
                        indexed_count = await index_in_qdrant(
                            doc_id,
                            chunks,
                            embeddings,
                            {
                                "bundesland": bundesland,
                                "doc_type": doc_type,
                                "title": filename,
                                "url": url,
                                "training_allowed": True,
                            }
                        )
                        if indexed_count > 0:
                            result["indexed"] = True
                            _crawler_state.documents_indexed_today += 1
                            # Update database flag to mirror Qdrant state
                            if self.db_pool:
                                async with self.db_pool.acquire() as conn:
                                    await conn.execute(
                                        "UPDATE zeugnis_documents SET indexed_in_qdrant = true WHERE id = $1",
                                        doc_id
                                    )
            else:
                result["indexed"] = False
                result["error"] = "Training not allowed for this source"
            _crawler_state.last_activity = datetime.now()
        except Exception as e:
            result["error"] = str(e)
            _crawler_state.errors_today += 1
        return result

    async def crawl_source(self, source_id: str) -> Dict[str, Any]:
        """Crawl all seed URLs for a source.

        Loads the source row, then crawls every seed URL not yet
        'completed', transitioning each row's status
        ('running' -> 'completed' / 'failed'). Returns a summary dict
        with documents_found/documents_indexed counts and errors.
        """
        global _crawler_state
        result = {
            "source_id": source_id,
            "documents_found": 0,
            "documents_indexed": 0,
            "errors": [],
            "started_at": datetime.now(),
            "completed_at": None,
        }
        if not self.db_pool:
            result["errors"].append("Database not available")
            return result
        try:
            async with self.db_pool.acquire() as conn:
                # Get source info
                source = await conn.fetchrow(
                    "SELECT * FROM zeugnis_sources WHERE id = $1",
                    source_id
                )
                if not source:
                    result["errors"].append(f"Source not found: {source_id}")
                    return result
                bundesland = source["bundesland"]
                training_allowed = source["training_allowed"]
                _crawler_state.current_source_id = source_id
                _crawler_state.current_bundesland = bundesland
                # Get seed URLs (skip ones already completed)
                seed_urls = await conn.fetch(
                    "SELECT * FROM zeugnis_seed_urls WHERE source_id = $1 AND status != 'completed'",
                    source_id
                )
                for seed_url in seed_urls:
                    # Update status to running
                    await conn.execute(
                        "UPDATE zeugnis_seed_urls SET status = 'running' WHERE id = $1",
                        seed_url["id"]
                    )
                    # Crawl
                    crawl_result = await self.crawl_seed_url(
                        seed_url["id"],
                        seed_url["url"],
                        bundesland,
                        seed_url["doc_type"],
                        training_allowed,
                    )
                    # Update status
                    if crawl_result["success"]:
                        result["documents_found"] += 1
                        if crawl_result["indexed"]:
                            result["documents_indexed"] += 1
                        await conn.execute(
                            "UPDATE zeugnis_seed_urls SET status = 'completed', last_crawled = NOW() WHERE id = $1",
                            seed_url["id"]
                        )
                    else:
                        result["errors"].append(f"{seed_url['url']}: {crawl_result['error']}")
                        await conn.execute(
                            "UPDATE zeugnis_seed_urls SET status = 'failed', error_message = $2 WHERE id = $1",
                            seed_url["id"], crawl_result["error"]
                        )
                    # Small delay between requests (politeness)
                    await asyncio.sleep(1)
        except Exception as e:
            result["errors"].append(str(e))
        finally:
            # Always record completion time and clear the "current" markers,
            # even when the crawl aborted mid-source.
            result["completed_at"] = datetime.now()
            _crawler_state.current_source_id = None
            _crawler_state.current_bundesland = None
        return result
# =============================================================================
# Crawler Control Functions
# =============================================================================
# Singleton crawler instance and the asyncio task driving it.
_crawler_instance: Optional[ZeugnisCrawler] = None
_crawler_task: Optional[asyncio.Task] = None


async def start_crawler(bundesland: Optional[str] = None, source_id: Optional[str] = None) -> bool:
    """Start the crawler.

    Spawns a background asyncio task that crawls either a single source
    (``source_id``), all sources of one federal state (``bundesland``),
    or every source when neither is given. Daily counters are reset.

    Returns False when a crawl is already running, True otherwise.
    """
    global _crawler_state, _crawler_instance, _crawler_task
    if _crawler_state.is_running:
        return False
    _crawler_state.is_running = True
    _crawler_state.documents_crawled_today = 0
    _crawler_state.documents_indexed_today = 0
    _crawler_state.errors_today = 0
    _crawler_instance = ZeugnisCrawler()
    await _crawler_instance.init()

    async def run_crawler():
        # Background task: resolve the source list, then crawl each source
        # until finished or until is_running is cleared by stop_crawler().
        try:
            from metrics_db import get_pool
            pool = await get_pool()
            if pool:
                async with pool.acquire() as conn:
                    # Get sources to crawl
                    if source_id:
                        sources = await conn.fetch(
                            "SELECT id, bundesland FROM zeugnis_sources WHERE id = $1",
                            source_id
                        )
                    elif bundesland:
                        sources = await conn.fetch(
                            "SELECT id, bundesland FROM zeugnis_sources WHERE bundesland = $1",
                            bundesland
                        )
                    else:
                        sources = await conn.fetch(
                            "SELECT id, bundesland FROM zeugnis_sources ORDER BY bundesland"
                        )
                    for source in sources:
                        if not _crawler_state.is_running:
                            # Cooperative stop requested.
                            break
                        await _crawler_instance.crawl_source(source["id"])
        except Exception as e:
            print(f"Crawler error: {e}")
        finally:
            # Always release resources and clear the running flag.
            _crawler_state.is_running = False
            if _crawler_instance:
                await _crawler_instance.close()

    _crawler_task = asyncio.create_task(run_crawler())
    return True
async def stop_crawler() -> bool:
    """Request the running crawl to stop.

    Clears the running flag, cancels the background task, and waits for
    it to unwind. Returns False when no crawl was in progress.
    """
    global _crawler_state, _crawler_task
    if not _crawler_state.is_running:
        return False
    _crawler_state.is_running = False
    task = _crawler_task
    if task is not None:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected after cancel(); the task is done either way.
            pass
    return True
def get_crawler_status() -> Dict[str, Any]:
    """Return a JSON-serializable snapshot of the crawler state.

    Includes the running flag, the source/state currently being crawled,
    queue length, daily counters, and the last-activity timestamp (ISO
    format, or None when the crawler has not run yet).
    """
    state = _crawler_state
    last_seen = state.last_activity
    return {
        "is_running": state.is_running,
        "current_source": state.current_source_id,
        "current_bundesland": state.current_bundesland,
        "queue_length": len(state.queue),
        "documents_crawled_today": state.documents_crawled_today,
        "documents_indexed_today": state.documents_indexed_today,
        "errors_today": state.errors_today,
        "last_activity": last_seen.isoformat() if last_seen else None,
    }