Files
breakpilot-lehrer/klausur-service/backend/hybrid_vocab_extractor.py
Benjamin Boenisch 5a31f52310 Initial commit: breakpilot-lehrer - Lehrer KI Platform
Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website,
Klausur-Service, School-Service, Voice-Service, Geo-Service,
BreakPilot Drive, Agent-Core

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 23:47:26 +01:00

665 lines
21 KiB
Python

"""
Hybrid OCR + LLM Vocabulary Extractor
Zweistufiger Ansatz fuer optimale Vokabel-Extraktion:
1. PaddleOCR fuer schnelle, praezise Texterkennung mit Bounding-Boxes
2. qwen2.5:14b (via LLM Gateway) fuer semantische Strukturierung
Vorteile gegenueber reinem Vision LLM:
- 4x schneller (~7-15 Sek vs 30-60 Sek pro Seite)
- Hoehere Genauigkeit bei gedrucktem Text (95-99%)
- Weniger Halluzinationen (LLM korrigiert nur, erfindet nicht)
- Position-basierte Spaltenerkennung moeglich
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal (Mac Mini).
"""
import os
import io
import json
import logging
import re
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
import uuid
import httpx
import numpy as np
from PIL import Image
# OpenCV is optional - only required for actual image processing
try:
import cv2
CV2_AVAILABLE = True
except ImportError:
cv2 = None
CV2_AVAILABLE = False
# Module-level logger for this extractor.
logger = logging.getLogger(__name__)
# Configuration - Use Ollama directly (no separate LLM Gateway).
# Defaults assume an Ollama server reachable from inside a Docker container.
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
LLM_MODEL = os.getenv("LLM_MODEL", "qwen2.5:14b")
# PaddleOCR - lazy-loaded singleton, populated on first get_paddle_ocr() call.
_paddle_ocr = None
def get_paddle_ocr():
    """Return the shared PaddleOCR instance, creating it on first use.

    Lazy initialization avoids paying the model-load cost at import time.
    PaddleOCR 3.x (released May 2025) only accepts the 'lang' parameter;
    options such as use_gpu, device, show_log, det, rec, use_onnx were
    removed and GPU/CPU selection is automatic.

    Returns:
        The cached PaddleOCR instance, or None when import/initialization fails.
    """
    global _paddle_ocr
    if _paddle_ocr is not None:
        return _paddle_ocr
    try:
        from paddleocr import PaddleOCR
        import logging as std_logging

        # Silence the very chatty PaddleOCR / PaddlePaddle loggers.
        for noisy_name in ('ppocr', 'paddle', 'paddleocr', 'root'):
            std_logging.getLogger(noisy_name).setLevel(std_logging.WARNING)

        # Preference order: German model, then English, then library defaults.
        attempts = [
            ({"lang": "de"}, "lang=de"),
            ({"lang": "en"}, "lang=en"),
            ({}, "defaults"),
        ]
        for idx, (kwargs, label) in enumerate(attempts):
            try:
                _paddle_ocr = PaddleOCR(**kwargs)
                logger.info(f"PaddleOCR 3.x initialized ({label})")
                break
            except Exception as err:
                if idx == len(attempts) - 1:
                    raise  # even the defaults failed -> handled by outer except
                logger.warning(f"PaddleOCR {label} failed: {err}")
    except Exception as e:
        logger.error(f"PaddleOCR initialization failed: {e}")
        _paddle_ocr = None
    return _paddle_ocr
@dataclass
class OCRRegion:
    """A single OCR-detected text region with its bounding box.

    Coordinates are pixel positions: (x1, y1) is the top-left corner and
    (x2, y2) the bottom-right corner of the axis-aligned bounding box.
    """
    text: str           # recognized text content
    confidence: float   # OCR confidence score (typically in [0, 1])
    x1: int
    y1: int
    x2: int
    y2: int

    @property
    def center_x(self) -> int:
        """Horizontal midpoint of the bounding box (integer division)."""
        span = self.x1 + self.x2
        return span // 2

    @property
    def center_y(self) -> int:
        """Vertical midpoint of the bounding box (integer division)."""
        span = self.y1 + self.y2
        return span // 2
# =============================================================================
# OCR Pipeline
# =============================================================================
def preprocess_image(img: Image.Image) -> np.ndarray:
    """Normalize a PIL image into an RGB numpy array for OCR.

    Grayscale input is expanded to three channels and RGBA input has its
    alpha channel dropped; anything already RGB-shaped passes through.

    Raises:
        ImportError: If OpenCV is not available.
    """
    if not CV2_AVAILABLE:
        raise ImportError(
            "OpenCV (cv2) is required for image preprocessing. "
            "Install with: pip install opencv-python-headless"
        )
    # PIL -> numpy array.
    arr = np.array(img)
    if arr.ndim == 2:
        # Single channel (grayscale) -> replicate into RGB.
        return cv2.cvtColor(arr, cv2.COLOR_GRAY2RGB)
    if arr.shape[2] == 4:
        # Four channels (RGBA) -> strip the alpha channel.
        return cv2.cvtColor(arr, cv2.COLOR_RGBA2RGB)
    return arr
def run_paddle_ocr(image_bytes: bytes) -> Tuple[List[OCRRegion], str]:
    """Run PaddleOCR on an image and normalize its version-dependent output.

    PaddleOCR has returned several result shapes across versions:
      - 3.x: a dict (or dict-like OCRResult) with 'rec_texts', 'rec_scores',
        'dt_polys' keys, possibly wrapped in a one-element list per page.
      - traditional: a list of pages, each page a list of lines of the form
        [bbox_points, (text, confidence)].
    This function detects the shape and dispatches to the matching parser.

    Args:
        image_bytes: Raw image file content.

    Returns:
        Tuple of (OCRRegion list sorted top-to-bottom, raw newline-joined text).
        Both are empty on any failure; errors are logged, never raised.
    """
    ocr = get_paddle_ocr()
    if ocr is None:
        logger.error("PaddleOCR not available")
        return [], ""
    try:
        # Load and normalize the image.
        img = Image.open(io.BytesIO(image_bytes))
        img_array = preprocess_image(img)
        # Run OCR via the PaddleOCR 3.x API.
        # FIX: the previous "fallback" caught TypeError and retried the
        # byte-identical ocr.ocr(img_array) call - a no-op that could only
        # fail the same way; a real failure is now handled by the outer except.
        result = ocr.ocr(img_array)
        if not result:
            logger.warning("PaddleOCR returned empty result")
            return [], ""
        # PaddleOCR 3.x may return a bare dict with 'rec_texts'/'rec_scores'/'dt_polys'.
        if isinstance(result, dict):
            logger.info("Processing PaddleOCR 3.x dict format")
            return _parse_paddleocr_v3_dict(result)
        elif isinstance(result, list) and len(result) > 0:
            first_item = result[0]
            if first_item is None:
                logger.warning("PaddleOCR returned None for first page")
                return [], ""
            # PaddleOCR 3.x: list of dict-like OCRResult objects.
            if hasattr(first_item, 'get') or isinstance(first_item, dict):
                item_dict = dict(first_item) if hasattr(first_item, 'items') else first_item
                if 'rec_texts' in item_dict or 'texts' in item_dict:
                    logger.info("Processing PaddleOCR 3.x OCRResult format")
                    return _parse_paddleocr_v3_dict(item_dict)
            # Traditional format: a page is a list of [bbox, (text, conf)] lines.
            if isinstance(first_item, list):
                if len(first_item) > 0 and isinstance(first_item[0], (list, tuple)):
                    logger.info("Processing PaddleOCR traditional list format")
                    return _parse_paddleocr_list(first_item)
            # Unknown format - log details, then attempt dict conversion as a last resort.
            logger.warning(f"Unknown result format. Type: {type(first_item)}, Keys: {dir(first_item) if hasattr(first_item, '__dir__') else 'N/A'}")
            try:
                item_dict = dict(first_item)
                if 'rec_texts' in item_dict:
                    return _parse_paddleocr_v3_dict(item_dict)
            except Exception as e:
                logger.warning(f"Could not convert to dict: {e}")
            return [], ""
        else:
            logger.warning(f"Unexpected PaddleOCR result type: {type(result)}")
            return [], ""
    except Exception as e:
        logger.error(f"PaddleOCR execution failed: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return [], ""
def _parse_paddleocr_v3_dict(result: dict) -> Tuple[List[OCRRegion], str]:
    """Parse PaddleOCR 3.x dict format result.

    Expected keys (with alternate-name fallbacks): 'rec_texts'/'texts' for
    recognized strings, 'rec_scores'/'scores' for confidences, and for
    geometry either 'rec_boxes' ([x1, y1, x2, y2] per line, preferred) or
    'dt_polys' (four corner points per line).

    Returns:
        Tuple of (regions sorted top-to-bottom, raw text joined with newlines
        in original recognition order).
    """
    regions = []
    all_text_lines = []
    texts = result.get('rec_texts', result.get('texts', []))
    scores = result.get('rec_scores', result.get('scores', []))
    polys = result.get('dt_polys', result.get('boxes', []))
    # Also try rec_boxes which gives direct [x1, y1, x2, y2] format
    rec_boxes = result.get('rec_boxes', [])
    logger.info(f"PaddleOCR 3.x: {len(texts)} texts, {len(scores)} scores, {len(polys)} polys, {len(rec_boxes)} rec_boxes")
    for i, (text, score) in enumerate(zip(texts, scores)):
        if not text or not str(text).strip():
            continue
        # Try to get bounding box - prefer rec_boxes if available.
        # Fallback box used when no geometry can be parsed for this line.
        x1, y1, x2, y2 = 0, 0, 100, 50  # Default fallback
        if i < len(rec_boxes) and rec_boxes[i] is not None:
            # rec_boxes format: [x1, y1, x2, y2] or [[x1, y1, x2, y2]]
            box = rec_boxes[i]
            try:
                if hasattr(box, 'flatten'):
                    # numpy array -> flat Python list
                    box = box.flatten().tolist()
                if len(box) >= 4:
                    x1, y1, x2, y2 = int(box[0]), int(box[1]), int(box[2]), int(box[3])
            except Exception as e:
                logger.debug(f"Could not parse rec_box: {e}")
        elif i < len(polys) and polys[i] is not None:
            # dt_polys format: [[x1,y1], [x2,y2], [x3,y3], [x4,y4]] or numpy array
            poly = polys[i]
            try:
                # Convert numpy array to list if needed
                if hasattr(poly, 'tolist'):
                    poly = poly.tolist()
                if len(poly) >= 4:
                    # Axis-aligned bounding box over the polygon corners.
                    x_coords = [p[0] for p in poly]
                    y_coords = [p[1] for p in poly]
                    x1, y1 = int(min(x_coords)), int(min(y_coords))
                    x2, y2 = int(max(x_coords)), int(max(y_coords))
            except Exception as e:
                logger.debug(f"Could not parse polygon: {e}")
        region = OCRRegion(
            text=text.strip(),
            confidence=float(score) if score else 0.5,  # missing/zero score -> neutral 0.5
            x1=x1, y1=y1, x2=x2, y2=y2
        )
        regions.append(region)
        all_text_lines.append(text.strip())
    # Sort regions top-to-bottom; raw_text keeps the recognition order.
    regions.sort(key=lambda r: r.y1)
    raw_text = "\n".join(all_text_lines)
    logger.info(f"PaddleOCR 3.x extracted {len(regions)} text regions")
    return regions, raw_text
def _parse_paddleocr_list(page_result: list) -> Tuple[List[OCRRegion], str]:
    """Parse one page of PaddleOCR's traditional list format.

    Each line is [bbox_points, (text, confidence)], where bbox_points are the
    four corner points [[x1,y1], [x2,y2], [x3,y3], [x4,y4]] of the region.
    """
    regions = []
    collected_lines = []
    for entry in page_result:
        if not entry or len(entry) < 2:
            continue
        corners = entry[0]
        payload = entry[1]
        # The payload is usually (text, confidence) but may be a bare string.
        if isinstance(payload, tuple) and len(payload) >= 2:
            text, score = payload[0], payload[1]
        elif isinstance(payload, str):
            text, score = payload, 0.5
        else:
            continue
        if not text or not text.strip():
            continue
        # Axis-aligned bounding box from the four corner points.
        xs = [pt[0] for pt in corners]
        ys = [pt[1] for pt in corners]
        regions.append(OCRRegion(
            text=text.strip(),
            confidence=float(score),
            x1=int(min(xs)),
            y1=int(min(ys)),
            x2=int(max(xs)),
            y2=int(max(ys)),
        ))
        collected_lines.append(text.strip())
    # Reading order: sort regions top-to-bottom.
    regions.sort(key=lambda r: r.y1)
    raw_text = "\n".join(collected_lines)
    logger.info(f"PaddleOCR extracted {len(regions)} text regions")
    return regions, raw_text
def group_regions_by_rows(regions: List[OCRRegion], y_tolerance: int = 20) -> List[List[OCRRegion]]:
    """Cluster text regions into visual rows by vertical position.

    Regions are expected pre-sorted top-to-bottom. A region joins the current
    row when its vertical center lies within y_tolerance pixels of the row's
    anchor (the first region's center); otherwise it starts a new row.

    Args:
        regions: List of OCRRegion, sorted by y.
        y_tolerance: Max vertical center distance for same-row membership.

    Returns:
        Rows top-to-bottom; each row's regions sorted left-to-right by x.
    """
    if not regions:
        return []
    rows = []
    row = [regions[0]]
    anchor_y = regions[0].center_y
    for reg in regions[1:]:
        if abs(reg.center_y - anchor_y) <= y_tolerance:
            # Same visual row.
            row.append(reg)
            continue
        # Close out the finished row (left-to-right) and open a new one.
        row.sort(key=lambda r: r.x1)
        rows.append(row)
        row = [reg]
        anchor_y = reg.center_y
    # Flush the final row.
    row.sort(key=lambda r: r.x1)
    rows.append(row)
    return rows
def detect_columns(rows: List[List[OCRRegion]]) -> int:
    """Estimate the number of columns in the vocabulary layout.

    Only rows with at least two cells are counted; their mean cell count
    decides between a 2-column (English | German) and a 3-column
    (English | German | example) layout. Defaults to 2 when undecidable.

    Returns:
        Estimated column count (2 or 3 for vocabulary lists).
    """
    if not rows:
        return 2
    cell_counts = [len(row) for row in rows if len(row) >= 2]
    if not cell_counts:
        return 2
    mean_cells = sum(cell_counts) / len(cell_counts)
    # Threshold 2.5: mostly-3-cell rows imply an example-sentence column.
    return 3 if mean_cells >= 2.5 else 2
def format_ocr_for_llm(regions: List[OCRRegion]) -> str:
    """Render OCR regions as tab-separated rows for the structuring LLM.

    A column-count hint is prepended so the model knows the expected layout;
    multi-cell rows become tab-joined lines, single-cell rows pass through.
    """
    rows = group_regions_by_rows(regions)
    num_columns = detect_columns(rows)
    out = [f"Erkannte Spalten: {num_columns}", "---"]
    for row in rows:
        if len(row) == 1:
            out.append(row[0].text)
        elif len(row) >= 2:
            # Tab-separated cells so the LLM can recover the columns.
            out.append("\t".join(cell.text for cell in row))
    return "\n".join(out)
# =============================================================================
# LLM Strukturierung
# =============================================================================
# Prompt for the structuring step: the LLM receives tab-separated OCR rows
# and must emit a strict-JSON vocabulary array. Doubled braces escape
# str.format(), which substitutes only {ocr_text}.
# FIX: restored the "→" separators in rule 2's correction examples, which
# had been lost ("vereessern""verbessern" read as nonsense to the model).
STRUCTURE_PROMPT = """Du erhältst OCR-Output einer Vokabelliste aus einem englischen Schulbuch.
Die Zeilen sind Tab-separiert und enthalten typischerweise:
- 2 Spalten: Englisch | Deutsch
- 3 Spalten: Englisch | Deutsch | Beispielsatz
OCR-Text:
{ocr_text}
AUFGABE: Strukturiere die Vokabeln als JSON-Array.
AUSGABE-FORMAT (nur JSON, keine Erklärungen):
{{
  "vocabulary": [
    {{"english": "to improve", "german": "verbessern", "example": "I want to improve my English."}},
    {{"english": "achievement", "german": "Leistung", "example": null}}
  ]
}}
REGELN:
1. Erkenne das Spalten-Layout aus den Tab-Trennungen
2. Korrigiere offensichtliche OCR-Fehler kontextuell (z.B. "vereessern" → "verbessern", "0" → "o")
3. Bei fehlenden Beispielsätzen: "example": null
4. Überspringe Überschriften, Seitenzahlen, Kapitelnummern
5. Behalte Wortarten bei wenn vorhanden (n, v, adj am Ende des englischen Worts)
6. Gib NUR valides JSON zurück"""
async def structure_vocabulary_with_llm(ocr_text: str) -> List[Dict[str, Any]]:
    """Ask the local Ollama LLM to structure OCR text into vocabulary entries.

    Args:
        ocr_text: Formatted OCR output (tab-separated rows).

    Returns:
        List of vocabulary dicts; empty on timeout, HTTP error, or any
        other failure (all errors are logged, never raised).
    """
    prompt = STRUCTURE_PROMPT.format(ocr_text=ocr_text)
    payload = {
        "model": LLM_MODEL,
        "messages": [{"role": "user", "content": prompt}],
        "stream": False,
        # Low temperature keeps the model close to the OCR input.
        "options": {"temperature": 0.1, "num_predict": 4096},
    }
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            # Ollama's native chat endpoint (no separate LLM gateway).
            response = await client.post(f"{OLLAMA_URL}/api/chat", json=payload)
            response.raise_for_status()
            data = response.json()
            content = data.get("message", {}).get("content", "")
            logger.info(f"Ollama LLM response received: {len(content)} chars")
            # Parse the (hopefully) JSON answer.
            return parse_llm_vocabulary_json(content)
    except httpx.TimeoutException:
        logger.error("Ollama LLM request timed out")
        return []
    except httpx.HTTPStatusError as e:
        logger.error(f"Ollama LLM HTTP error: {e}")
        return []
    except Exception as e:
        logger.error(f"LLM structuring failed: {e}")
        return []
def parse_llm_vocabulary_json(text: str) -> List[Dict[str, Any]]:
    """Robustly parse the LLM's JSON output into vocabulary entries.

    Extracts the outermost {...} span from the response (models often wrap
    JSON in prose), parses it, and validates each entry.

    FIX: entries with null values (e.g. '"english": null') or non-dict items
    previously raised AttributeError inside the loop, and the broad except
    then discarded the ENTIRE batch. Malformed entries are now skipped
    individually.

    Args:
        text: Raw LLM response text.

    Returns:
        List of {"english", "german", "example"} dicts; empty on failure.
    """
    try:
        # Locate the outermost JSON object in the response.
        start = text.find('{')
        end = text.rfind('}') + 1
        if start == -1 or end == 0:
            logger.warning("No JSON found in LLM response")
            return []
        data = json.loads(text[start:end])
        vocabulary = data.get("vocabulary", [])
        valid_entries = []
        for entry in vocabulary:
            if not isinstance(entry, dict):
                continue  # skip stray strings/numbers the model emitted
            # "or ''" guards against explicit nulls in the JSON.
            english = (entry.get("english") or "").strip()
            german = (entry.get("german") or "").strip()
            if english and german:
                valid_entries.append({
                    "english": english,
                    "german": german,
                    "example": entry.get("example")
                })
        return valid_entries
    except json.JSONDecodeError as e:
        logger.error(f"JSON parse error: {e}")
        # Fallback: regex extraction of english/german pairs.
        return extract_vocabulary_regex(text)
    except Exception as e:
        logger.error(f"Vocabulary parsing failed: {e}")
        return []
def extract_vocabulary_regex(text: str) -> List[Dict[str, Any]]:
    """Last-resort fallback: pull english/german pairs from broken JSON via regex.

    Examples are not recoverable this way, so "example" is always None.
    """
    pair_pattern = r'"english"\s*:\s*"([^"]+)"\s*,\s*"german"\s*:\s*"([^"]+)"'
    vocabulary = [
        {"english": en.strip(), "german": de.strip(), "example": None}
        for en, de in re.findall(pair_pattern, text)
    ]
    logger.info(f"Regex fallback extracted {len(vocabulary)} entries")
    return vocabulary
# =============================================================================
# Public API
# =============================================================================
async def extract_vocabulary_hybrid(
    image_bytes: bytes,
    page_number: int = 0
) -> Tuple[List[Dict[str, Any]], float, str]:
    """Two-stage extraction: PaddleOCR for text, then an LLM for structuring.

    Args:
        image_bytes: Image content as bytes.
        page_number: Page number (0-indexed), used only in error messages.

    Returns:
        Tuple of (vocabulary_list, mean OCR confidence, error_message).
        The error message is empty on success; failures never raise.
    """
    try:
        # Stage 1: OCR.
        logger.info(f"Starting hybrid extraction for page {page_number + 1}")
        ocr_regions, raw_text = run_paddle_ocr(image_bytes)
        if not ocr_regions:
            return [], 0.0, f"Seite {page_number + 1}: Kein Text erkannt (OCR)"
        # Stage 2: format the regions for the LLM.
        llm_input = format_ocr_for_llm(ocr_regions)
        logger.info(f"Formatted OCR text: {len(llm_input)} chars")
        # Stage 3: semantic structuring, with a rule-based fallback.
        entries = await structure_vocabulary_with_llm(llm_input)
        if not entries:
            # LLM produced nothing usable -> try direct row analysis.
            entries = extract_from_rows_directly(ocr_regions)
        if not entries:
            return [], 0.0, f"Seite {page_number + 1}: Keine Vokabeln erkannt"
        # Mean OCR confidence serves as the overall quality signal.
        avg_confidence = sum(r.confidence for r in ocr_regions) / len(ocr_regions) if ocr_regions else 0.0
        logger.info(f"Hybrid extraction completed: {len(entries)} entries, {avg_confidence:.2f} confidence")
        return entries, avg_confidence, ""
    except Exception as e:
        logger.error(f"Hybrid extraction failed: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return [], 0.0, f"Seite {page_number + 1}: Fehler - {str(e)[:50]}"
def extract_from_rows_directly(regions: List[OCRRegion]) -> List[Dict[str, Any]]:
    """LLM-free fallback: read vocabulary straight from the row structure.

    Only useful for clean 2-3 column layouts. Cell 0 is taken as English,
    cell 1 as German, and cell 2 (when present) as the example sentence.
    """
    vocabulary = []
    for row in group_regions_by_rows(regions):
        if len(row) < 2:
            continue
        english = row[0].text.strip()
        german = row[1].text.strip()
        example = row[2].text.strip() if len(row) >= 3 else None
        # Minimal sanity filter: both words present, longer than one character.
        if english and german and len(english) > 1 and len(german) > 1:
            vocabulary.append({
                "english": english,
                "german": german,
                "example": example,
            })
    logger.info(f"Direct row extraction: {len(vocabulary)} entries")
    return vocabulary
# =============================================================================
# Test/Debug
# =============================================================================
async def test_hybrid_extraction(image_path: str):
    """Development helper: run the hybrid pipeline on an image file and print results."""
    with open(image_path, "rb") as fh:
        payload = fh.read()
    vocab, confidence, error = await extract_vocabulary_hybrid(payload)
    # Human-readable summary: first 10 entries only.
    print(f"\n=== Hybrid OCR Test ===")
    print(f"Confidence: {confidence:.2f}")
    print(f"Error: {error or 'None'}")
    print(f"Vocabulary ({len(vocab)} entries):")
    for v in vocab[:10]:
        print(f" - {v['english']} = {v['german']}")
    return vocab
# CLI entry point: run the extractor once on a single image for manual testing.
if __name__ == "__main__":
    import asyncio
    import sys
    if len(sys.argv) > 1:
        asyncio.run(test_hybrid_extraction(sys.argv[1]))
    else:
        print("Usage: python hybrid_vocab_extractor.py <image_path>")