# hybrid_vocab_extractor.py
"""
|
|
Hybrid OCR + LLM Vocabulary Extractor
|
|
|
|
Zweistufiger Ansatz fuer optimale Vokabel-Extraktion:
|
|
1. PaddleOCR fuer schnelle, praezise Texterkennung mit Bounding-Boxes
|
|
2. qwen2.5:14b (via LLM Gateway) fuer semantische Strukturierung
|
|
|
|
Vorteile gegenueber reinem Vision LLM:
|
|
- 4x schneller (~7-15 Sek vs 30-60 Sek pro Seite)
|
|
- Hoehere Genauigkeit bei gedrucktem Text (95-99%)
|
|
- Weniger Halluzinationen (LLM korrigiert nur, erfindet nicht)
|
|
- Position-basierte Spaltenerkennung moeglich
|
|
|
|
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal (Mac Mini).
|
|
"""
|
|
|
|
import os
|
|
import io
|
|
import json
|
|
import logging
|
|
import re
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
from dataclasses import dataclass
|
|
import uuid
|
|
|
|
import httpx
|
|
import numpy as np
|
|
from PIL import Image
|
|
|
|
# OpenCV is optional - only required for actual image processing
|
|
try:
|
|
import cv2
|
|
CV2_AVAILABLE = True
|
|
except ImportError:
|
|
cv2 = None
|
|
CV2_AVAILABLE = False
|
|
|
|
# Module-level logger; configuration is left to the application.
logger = logging.getLogger(__name__)

# Configuration - use Ollama directly (no separate LLM Gateway).
# Defaults target an Ollama instance reachable from inside Docker.
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
LLM_MODEL = os.getenv("LLM_MODEL", "qwen2.5:14b")

# PaddleOCR - lazily initialized singleton; see get_paddle_ocr().
_paddle_ocr = None
|
|
|
|
|
|
def get_paddle_ocr():
    """
    Return the process-wide PaddleOCR instance, creating it on first use.

    Lazy loading avoids paying the model-initialization cost at import time.

    PaddleOCR 3.x API (released May 2025):
    - Only the 'lang' parameter is confirmed valid
    - Removed parameters: use_gpu, device, show_log, det, rec, use_onnx
    - GPU/CPU selection is automatic

    Initialization is attempted with German first, then English, then
    library defaults; on total failure the cached value stays None and
    callers must handle that.
    """
    global _paddle_ocr
    if _paddle_ocr is not None:
        return _paddle_ocr

    try:
        from paddleocr import PaddleOCR
        import logging as std_logging

        # PaddleOCR/PaddlePaddle log very verbosely by default; raise
        # their thresholds so normal runs stay quiet.
        for noisy in ['ppocr', 'paddle', 'paddleocr', 'root']:
            std_logging.getLogger(noisy).setLevel(std_logging.WARNING)

        # Preference order: German, then English, then library defaults.
        try:
            _paddle_ocr = PaddleOCR(lang="de")
            logger.info("PaddleOCR 3.x initialized (lang=de)")
        except Exception as e1:
            logger.warning(f"PaddleOCR lang=de failed: {e1}")
            try:
                _paddle_ocr = PaddleOCR(lang="en")
                logger.info("PaddleOCR 3.x initialized (lang=en)")
            except Exception as e2:
                logger.warning(f"PaddleOCR lang=en failed: {e2}")
                _paddle_ocr = PaddleOCR()
                logger.info("PaddleOCR 3.x initialized (defaults)")

    except Exception as e:
        logger.error(f"PaddleOCR initialization failed: {e}")
        _paddle_ocr = None

    return _paddle_ocr
|
|
|
|
|
|
@dataclass
class OCRRegion:
    """A single piece of text detected by OCR, with its bounding box."""
    text: str           # recognized text content
    confidence: float   # OCR confidence score
    x1: int             # left edge of the bounding box
    y1: int             # top edge
    x2: int             # right edge
    y2: int             # bottom edge

    @property
    def center_x(self) -> int:
        """Horizontal midpoint of the bounding box (integer division)."""
        return (self.x2 + self.x1) // 2

    @property
    def center_y(self) -> int:
        """Vertical midpoint of the bounding box (integer division)."""
        return (self.y2 + self.y1) // 2
|
|
|
|
|
|
# =============================================================================
|
|
# OCR Pipeline
|
|
# =============================================================================
|
|
|
|
def preprocess_image(img: Image.Image) -> np.ndarray:
    """
    Prepare a PIL image for OCR.

    Converts the input to a 3-channel RGB numpy array: grayscale and
    RGBA inputs are converted, 3-channel input passes through unchanged.

    Raises:
        ImportError: If OpenCV is not available
    """
    if not CV2_AVAILABLE:
        raise ImportError(
            "OpenCV (cv2) is required for image preprocessing. "
            "Install with: pip install opencv-python-headless"
        )

    # PIL image -> numpy array
    arr = np.array(img)

    # Normalize the channel layout to RGB.
    if arr.ndim == 2:
        # Grayscale -> RGB
        arr = cv2.cvtColor(arr, cv2.COLOR_GRAY2RGB)
    elif arr.shape[2] == 4:
        # RGBA -> RGB
        arr = cv2.cvtColor(arr, cv2.COLOR_RGBA2RGB)

    return arr
|
|
|
|
|
|
def run_paddle_ocr(image_bytes: bytes) -> Tuple[List[OCRRegion], str]:
    """
    Run PaddleOCR on an image.

    PaddleOCR 3.x returns results in format:
    - result = ocr.ocr(img) returns list of pages
    - Each page contains list of text lines
    - Each line: [bbox_points, (text, confidence)]
    Newer 3.x builds may instead return dict-like OCRResult objects with
    'rec_texts' / 'rec_scores' / 'dt_polys' keys; both shapes are handled.

    Args:
        image_bytes: Raw image file content (any format PIL can open).

    Returns:
        Tuple of (list of OCRRegion, raw_text). Both are empty when OCR
        is unavailable, fails, or finds no text; errors are logged, not
        raised.
    """
    ocr = get_paddle_ocr()
    if ocr is None:
        logger.error("PaddleOCR not available")
        return [], ""

    try:
        # Load and preprocess the image.
        img = Image.open(io.BytesIO(image_bytes))
        img_array = preprocess_image(img)

        # Run OCR (PaddleOCR 3.x API). The previous TypeError fallback
        # retried the *identical* call and could never help, so it was
        # removed; a TypeError now surfaces via the outer handler.
        result = ocr.ocr(img_array)

        if not result:
            logger.warning("PaddleOCR returned empty result")
            return [], ""

        # Bare dict with 'rec_texts'/'rec_scores'/'dt_polys' keys.
        if isinstance(result, dict):
            logger.info("Processing PaddleOCR 3.x dict format")
            return _parse_paddleocr_v3_dict(result)

        if isinstance(result, list) and len(result) > 0:
            first_item = result[0]
            if first_item is None:
                logger.warning("PaddleOCR returned None for first page")
                return [], ""

            # PaddleOCR 3.x: list of dict-like OCRResult objects.
            if hasattr(first_item, 'get') or isinstance(first_item, dict):
                item_dict = dict(first_item) if hasattr(first_item, 'items') else first_item
                if 'rec_texts' in item_dict or 'texts' in item_dict:
                    logger.info("Processing PaddleOCR 3.x OCRResult format")
                    return _parse_paddleocr_v3_dict(item_dict)

            # Traditional per-page format: [[bbox, (text, conf)], ...]
            if isinstance(first_item, list):
                if len(first_item) > 0 and isinstance(first_item[0], (list, tuple)):
                    logger.info("Processing PaddleOCR traditional list format")
                    return _parse_paddleocr_list(first_item)

            # Unknown format - log details and attempt a dict conversion
            # as a last resort.
            logger.warning(f"Unknown result format. Type: {type(first_item)}, Keys: {dir(first_item) if hasattr(first_item, '__dir__') else 'N/A'}")
            try:
                item_dict = dict(first_item)
                if 'rec_texts' in item_dict:
                    return _parse_paddleocr_v3_dict(item_dict)
            except Exception as e:
                logger.warning(f"Could not convert to dict: {e}")
            return [], ""

        logger.warning(f"Unexpected PaddleOCR result type: {type(result)}")
        return [], ""

    except Exception as e:
        logger.error(f"PaddleOCR execution failed: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return [], ""
|
|
|
|
|
|
def _parse_paddleocr_v3_dict(result: dict) -> Tuple[List[OCRRegion], str]:
    """
    Parse a PaddleOCR 3.x dict-format result into OCR regions.

    Expected keys (with fallbacks): 'rec_texts'/'texts' for strings,
    'rec_scores'/'scores' for confidences, 'dt_polys'/'boxes' for
    4-point polygons, and optionally 'rec_boxes' with direct
    [x1, y1, x2, y2] rectangles (preferred when present).

    Returns:
        Tuple of (regions sorted top-to-bottom, newline-joined raw text).
    """
    regions: List[OCRRegion] = []
    all_text_lines: List[str] = []

    texts = result.get('rec_texts', result.get('texts', []))
    scores = result.get('rec_scores', result.get('scores', []))
    polys = result.get('dt_polys', result.get('boxes', []))
    # rec_boxes gives direct [x1, y1, x2, y2] rectangles when present.
    rec_boxes = result.get('rec_boxes', [])

    logger.info(f"PaddleOCR 3.x: {len(texts)} texts, {len(scores)} scores, {len(polys)} polys, {len(rec_boxes)} rec_boxes")

    for i, (text, score) in enumerate(zip(texts, scores)):
        # Normalize once so non-str entries (e.g. numpy scalars) cannot
        # crash the later .strip() calls; previously only the emptiness
        # check used str(text) while the region stored text.strip().
        clean_text = str(text).strip() if text else ""
        if not clean_text:
            continue

        # Bounding box: prefer rec_boxes, fall back to dt_polys,
        # otherwise keep a harmless placeholder rectangle.
        x1, y1, x2, y2 = 0, 0, 100, 50

        if i < len(rec_boxes) and rec_boxes[i] is not None:
            # rec_boxes format: [x1, y1, x2, y2] or [[x1, y1, x2, y2]]
            box = rec_boxes[i]
            try:
                if hasattr(box, 'flatten'):
                    box = box.flatten().tolist()
                if len(box) >= 4:
                    x1, y1, x2, y2 = int(box[0]), int(box[1]), int(box[2]), int(box[3])
            except Exception as e:
                logger.debug(f"Could not parse rec_box: {e}")

        elif i < len(polys) and polys[i] is not None:
            # dt_polys format: [[x1,y1], [x2,y2], [x3,y3], [x4,y4]] or ndarray
            poly = polys[i]
            try:
                if hasattr(poly, 'tolist'):
                    poly = poly.tolist()
                if len(poly) >= 4:
                    x_coords = [p[0] for p in poly]
                    y_coords = [p[1] for p in poly]
                    x1, y1 = int(min(x_coords)), int(min(y_coords))
                    x2, y2 = int(max(x_coords)), int(max(y_coords))
            except Exception as e:
                logger.debug(f"Could not parse polygon: {e}")

        regions.append(OCRRegion(
            text=clean_text,
            # Only a *missing* score falls back to 0.5; a genuine 0.0
            # confidence is preserved instead of being inflated.
            confidence=float(score) if score is not None else 0.5,
            x1=x1, y1=y1, x2=x2, y2=y2
        ))
        all_text_lines.append(clean_text)

    # Sort top-to-bottom for downstream row grouping.
    regions.sort(key=lambda r: r.y1)
    raw_text = "\n".join(all_text_lines)
    logger.info(f"PaddleOCR 3.x extracted {len(regions)} text regions")
    return regions, raw_text
|
|
|
|
|
|
def _parse_paddleocr_list(page_result: list) -> Tuple[List[OCRRegion], str]:
    """
    Parse the traditional PaddleOCR list-format page result.

    Each line is expected as [bbox_points, text_info] where bbox_points
    is a 4-point polygon [[x1,y1], ..., [x4,y4]] and text_info is either
    a (text, confidence) pair or a bare string (default confidence 0.5).
    Malformed lines are skipped.

    Returns:
        Tuple of (regions sorted top-to-bottom, newline-joined raw text).
    """
    regions: List[OCRRegion] = []
    all_text_lines: List[str] = []

    for line in page_result:
        if not line or len(line) < 2:
            continue

        bbox_points = line[0]  # [[x1,y1], [x2,y2], [x3,y3], [x4,y4]]
        text_info = line[1]

        # text_info may be a (text, confidence) tuple, a [text, confidence]
        # list (returned by some PaddleOCR builds and previously dropped),
        # or a bare string.
        if isinstance(text_info, (tuple, list)) and len(text_info) >= 2:
            text, confidence = text_info[0], text_info[1]
        elif isinstance(text_info, str):
            text, confidence = text_info, 0.5
        else:
            continue

        if not text or not text.strip():
            continue

        # Axis-aligned bounding box from the polygon corners.
        x_coords = [p[0] for p in bbox_points]
        y_coords = [p[1] for p in bbox_points]

        regions.append(OCRRegion(
            text=text.strip(),
            confidence=float(confidence),
            x1=int(min(x_coords)),
            y1=int(min(y_coords)),
            x2=int(max(x_coords)),
            y2=int(max(y_coords))
        ))
        all_text_lines.append(text.strip())

    # Sort regions by Y position (top to bottom, reading order).
    regions.sort(key=lambda r: r.y1)
    raw_text = "\n".join(all_text_lines)
    logger.info(f"PaddleOCR extracted {len(regions)} text regions")

    return regions, raw_text
|
|
|
|
|
|
def group_regions_by_rows(regions: List[OCRRegion], y_tolerance: int = 20) -> List[List[OCRRegion]]:
    """
    Group text regions into visual rows based on vertical position.

    A region joins the current row when its vertical center lies within
    ``y_tolerance`` pixels of the row's anchor (the center of the region
    that opened the row); otherwise it starts a new row.  Regions are
    assumed to arrive sorted top-to-bottom.

    Args:
        regions: List of OCRRegion
        y_tolerance: Max vertical center distance to the row anchor

    Returns:
        List of rows; each row is a list of OCRRegion sorted left-to-right.
    """
    if not regions:
        return []

    rows: List[List[OCRRegion]] = []
    current_row = [regions[0]]
    anchor_y = regions[0].center_y

    for region in regions[1:]:
        if abs(region.center_y - anchor_y) <= y_tolerance:
            # Close enough vertically: same row.
            current_row.append(region)
        else:
            # Flush the finished row (left-to-right) and open a new one.
            current_row.sort(key=lambda r: r.x1)
            rows.append(current_row)
            current_row = [region]
            anchor_y = region.center_y

    # Flush the final row as well.
    if current_row:
        current_row.sort(key=lambda r: r.x1)
        rows.append(current_row)

    return rows
|
|
|
|
|
|
def detect_columns(rows: List[List[OCRRegion]]) -> int:
    """
    Estimate how many columns the vocabulary table has.

    Heuristic: rows with at least two cells vote with their cell count;
    an average of >= 2.5 suggests a three-column layout
    (English | German | example sentence), anything lower a two-column
    one.  Defaults to 2 when there is no evidence.

    Returns:
        Estimated column count (2 or 3 for vocabulary lists).
    """
    if not rows:
        return 2

    # Only rows that look tabular (>= 2 cells) contribute to the vote.
    cell_counts = [len(row) for row in rows if len(row) >= 2]
    if not cell_counts:
        return 2

    mean_cells = sum(cell_counts) / len(cell_counts)
    return 3 if mean_cells >= 2.5 else 2
|
|
|
|
|
|
def format_ocr_for_llm(regions: List[OCRRegion]) -> str:
    """
    Render OCR regions as tab-separated rows for LLM consumption.

    The output begins with the detected column count and a '---'
    separator; each visual row follows on its own line, with multi-cell
    rows tab-joined (position information helps the LLM recover the
    table structure), single-cell rows emitted as-is, and empty rows
    dropped.
    """
    rows = group_regions_by_rows(regions)
    num_columns = detect_columns(rows)

    out = [f"Erkannte Spalten: {num_columns}", "---"]

    for row in rows:
        if len(row) >= 2:
            # Tab-separated cells for the LLM.
            out.append("\t".join(cell.text for cell in row))
        elif len(row) == 1:
            out.append(row[0].text)

    return "\n".join(out)
|
|
|
|
|
|
# =============================================================================
|
|
# LLM Strukturierung
|
|
# =============================================================================
|
|
|
|
STRUCTURE_PROMPT = """Du erhältst OCR-Output einer Vokabelliste aus einem englischen Schulbuch.
|
|
Die Zeilen sind Tab-separiert und enthalten typischerweise:
|
|
- 2 Spalten: Englisch | Deutsch
|
|
- 3 Spalten: Englisch | Deutsch | Beispielsatz
|
|
|
|
OCR-Text:
|
|
{ocr_text}
|
|
|
|
AUFGABE: Strukturiere die Vokabeln als JSON-Array.
|
|
|
|
AUSGABE-FORMAT (nur JSON, keine Erklärungen):
|
|
{{
|
|
"vocabulary": [
|
|
{{"english": "to improve", "german": "verbessern", "example": "I want to improve my English."}},
|
|
{{"english": "achievement", "german": "Leistung", "example": null}}
|
|
]
|
|
}}
|
|
|
|
REGELN:
|
|
1. Erkenne das Spalten-Layout aus den Tab-Trennungen
|
|
2. Korrigiere offensichtliche OCR-Fehler kontextuell (z.B. "vereessern" → "verbessern", "0" → "o")
|
|
3. Bei fehlenden Beispielsätzen: "example": null
|
|
4. Überspringe Überschriften, Seitenzahlen, Kapitelnummern
|
|
5. Behalte Wortarten bei wenn vorhanden (n, v, adj am Ende des englischen Worts)
|
|
6. Gib NUR valides JSON zurück"""
|
|
|
|
|
|
async def structure_vocabulary_with_llm(ocr_text: str) -> List[Dict[str, Any]]:
    """
    Ask the local Ollama LLM to structure raw OCR text into vocabulary.

    Sends one non-streaming request to Ollama's native /api/chat
    endpoint and parses the JSON from the reply.

    Args:
        ocr_text: Formatted OCR output (see format_ocr_for_llm)

    Returns:
        List of vocabulary dicts; empty on timeout, HTTP error, or any
        other failure (errors are logged, never raised).
    """
    prompt = STRUCTURE_PROMPT.format(ocr_text=ocr_text)

    request_body = {
        "model": LLM_MODEL,
        "messages": [
            {"role": "user", "content": prompt}
        ],
        "stream": False,
        "options": {
            "temperature": 0.1,   # near-deterministic structuring
            "num_predict": 4096   # headroom for long vocabulary pages
        }
    }

    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            # Ollama's native chat endpoint (no separate LLM gateway).
            response = await client.post(f"{OLLAMA_URL}/api/chat", json=request_body)
            response.raise_for_status()

            data = response.json()
            content = data.get("message", {}).get("content", "")

            logger.info(f"Ollama LLM response received: {len(content)} chars")

            # Parse the (hopefully) JSON answer.
            return parse_llm_vocabulary_json(content)

    except httpx.TimeoutException:
        logger.error("Ollama LLM request timed out")
        return []
    except httpx.HTTPStatusError as e:
        logger.error(f"Ollama LLM HTTP error: {e}")
        return []
    except Exception as e:
        logger.error(f"LLM structuring failed: {e}")
        return []
|
|
|
|
|
|
def parse_llm_vocabulary_json(text: str) -> List[Dict[str, Any]]:
    """
    Robustly parse the LLM's JSON output into vocabulary entries.

    Locates the outermost {...} span in the response (LLMs often wrap
    JSON in prose), parses it, and keeps only entries that have both a
    non-empty 'english' and 'german' field.  On JSON syntax errors a
    regex-based extraction is attempted as a fallback.

    Args:
        text: Raw LLM response text.

    Returns:
        List of {'english', 'german', 'example'} dicts; may be empty.
    """
    try:
        # Find the JSON object within any surrounding prose.
        start = text.find('{')
        end = text.rfind('}') + 1

        if start == -1 or end == 0:
            logger.warning("No JSON found in LLM response")
            return []

        json_str = text[start:end]
        data = json.loads(json_str)

        vocabulary = data.get("vocabulary", [])

        # Validate entry-by-entry.  Previously a single malformed entry
        # (non-dict, or null english/german) raised inside this loop and
        # the broad except below discarded *all* entries on the page.
        valid_entries = []
        for entry in vocabulary:
            if not isinstance(entry, dict):
                continue
            english = str(entry.get("english") or "").strip()
            german = str(entry.get("german") or "").strip()

            if english and german:
                valid_entries.append({
                    "english": english,
                    "german": german,
                    "example": entry.get("example")
                })

        return valid_entries

    except json.JSONDecodeError as e:
        logger.error(f"JSON parse error: {e}")
        # Fallback: regex extraction of english/german pairs.
        return extract_vocabulary_regex(text)
    except Exception as e:
        logger.error(f"Vocabulary parsing failed: {e}")
        return []
|
|
|
|
|
|
def extract_vocabulary_regex(text: str) -> List[Dict[str, Any]]:
    """
    Last-resort extraction of vocabulary pairs via regex.

    Used when the LLM response is not parseable JSON; scans for adjacent
    "english": "...", "german": "..." string pairs.  Example sentences
    are not recoverable this way and are always None.
    """
    pair_pattern = r'"english"\s*:\s*"([^"]+)"\s*,\s*"german"\s*:\s*"([^"]+)"'

    vocabulary = [
        {"english": en.strip(), "german": de.strip(), "example": None}
        for en, de in re.findall(pair_pattern, text)
    ]

    logger.info(f"Regex fallback extracted {len(vocabulary)} entries")
    return vocabulary
|
|
|
|
|
|
# =============================================================================
|
|
# Public API
|
|
# =============================================================================
|
|
|
|
async def extract_vocabulary_hybrid(
    image_bytes: bytes,
    page_number: int = 0
) -> Tuple[List[Dict[str, Any]], float, str]:
    """
    Hybrid extraction pipeline: PaddleOCR + LLM structuring.

    Runs OCR, formats the regions for the LLM, lets the LLM structure
    them, and falls back to direct positional row parsing when the LLM
    yields nothing.

    Args:
        image_bytes: Image file content as bytes
        page_number: Page number (0-indexed), used in error messages

    Returns:
        Tuple of (vocabulary_list, confidence, error_message); the error
        message is "" on success.  Never raises.
    """
    try:
        # Step 1: OCR with positional information.
        logger.info(f"Starting hybrid extraction for page {page_number + 1}")
        regions, raw_text = run_paddle_ocr(image_bytes)
        if not regions:
            return [], 0.0, f"Seite {page_number + 1}: Kein Text erkannt (OCR)"

        # Step 2: format rows/columns for the LLM.
        formatted_text = format_ocr_for_llm(regions)
        logger.info(f"Formatted OCR text: {len(formatted_text)} chars")

        # Step 3: semantic structuring, with a positional fallback when
        # the LLM returns nothing usable.
        vocabulary = await structure_vocabulary_with_llm(formatted_text)
        if not vocabulary:
            vocabulary = extract_from_rows_directly(regions)
        if not vocabulary:
            return [], 0.0, f"Seite {page_number + 1}: Keine Vokabeln erkannt"

        # Mean OCR confidence as the overall quality estimate.
        avg_confidence = sum(r.confidence for r in regions) / len(regions) if regions else 0.0

        logger.info(f"Hybrid extraction completed: {len(vocabulary)} entries, {avg_confidence:.2f} confidence")

        return vocabulary, avg_confidence, ""

    except Exception as e:
        logger.error(f"Hybrid extraction failed: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return [], 0.0, f"Seite {page_number + 1}: Fehler - {str(e)[:50]}"
|
|
|
|
|
|
def extract_from_rows_directly(regions: List[OCRRegion]) -> List[Dict[str, Any]]:
    """
    LLM-free fallback: derive vocabulary purely from row/column layout.

    Treats the first cell of each multi-cell row as English, the second
    as German, and an optional third as the example sentence.  Only
    works on clean 2-3 column layouts.
    """
    vocabulary: List[Dict[str, Any]] = []

    for row in group_regions_by_rows(regions):
        if len(row) < 2:
            continue

        english = row[0].text.strip()
        german = row[1].text.strip()
        example = row[2].text.strip() if len(row) >= 3 else None

        # Minimal sanity check: reject empty or single-character cells,
        # which are usually OCR noise.
        if english and german and len(english) > 1 and len(german) > 1:
            vocabulary.append({
                "english": english,
                "german": german,
                "example": example
            })

    logger.info(f"Direct row extraction: {len(vocabulary)} entries")
    return vocabulary
|
|
|
|
|
|
# =============================================================================
|
|
# Test/Debug
|
|
# =============================================================================
|
|
|
|
async def test_hybrid_extraction(image_path: str):
    """Developer helper: run the full pipeline on one image file and print a summary."""
    with open(image_path, "rb") as fh:
        image_bytes = fh.read()

    vocab, confidence, error = await extract_vocabulary_hybrid(image_bytes)

    print(f"\n=== Hybrid OCR Test ===")
    print(f"Confidence: {confidence:.2f}")
    print(f"Error: {error or 'None'}")
    print(f"Vocabulary ({len(vocab)} entries):")
    # Show at most the first ten entries to keep output short.
    for v in vocab[:10]:
        print(f"  - {v['english']} = {v['german']}")

    return vocab
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import asyncio
|
|
import sys
|
|
|
|
if len(sys.argv) > 1:
|
|
asyncio.run(test_hybrid_extraction(sys.argv[1]))
|
|
else:
|
|
print("Usage: python hybrid_vocab_extractor.py <image_path>")
|