""" Hybrid OCR + LLM Vocabulary Extractor Zweistufiger Ansatz fuer optimale Vokabel-Extraktion: 1. PaddleOCR fuer schnelle, praezise Texterkennung mit Bounding-Boxes 2. qwen2.5:14b (via LLM Gateway) fuer semantische Strukturierung Vorteile gegenueber reinem Vision LLM: - 4x schneller (~7-15 Sek vs 30-60 Sek pro Seite) - Hoehere Genauigkeit bei gedrucktem Text (95-99%) - Weniger Halluzinationen (LLM korrigiert nur, erfindet nicht) - Position-basierte Spaltenerkennung moeglich DATENSCHUTZ: Alle Verarbeitung erfolgt lokal (Mac Mini). """ import os import io import json import logging import re from typing import List, Dict, Any, Optional, Tuple from dataclasses import dataclass import uuid import httpx import numpy as np from PIL import Image # OpenCV is optional - only required for actual image processing try: import cv2 CV2_AVAILABLE = True except ImportError: cv2 = None CV2_AVAILABLE = False logger = logging.getLogger(__name__) # Configuration - Use Ollama directly (no separate LLM Gateway) OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434") LLM_MODEL = os.getenv("LLM_MODEL", "qwen2.5:14b") # PaddleOCR - Lazy loading _paddle_ocr = None def get_paddle_ocr(): """ Lazy load PaddleOCR to avoid startup delay. PaddleOCR 3.x API (released May 2025): - Only 'lang' parameter confirmed valid - Removed parameters: use_gpu, device, show_log, det, rec, use_onnx - GPU/CPU selection is automatic """ global _paddle_ocr if _paddle_ocr is None: try: from paddleocr import PaddleOCR import logging as std_logging # Suppress verbose logging from PaddleOCR and PaddlePaddle for logger_name in ['ppocr', 'paddle', 'paddleocr', 'root']: std_logging.getLogger(logger_name).setLevel(std_logging.WARNING) # PaddleOCR 3.x: Only use 'lang' parameter # Try German first, then English, then minimal try: _paddle_ocr = PaddleOCR(lang="de") logger.info("PaddleOCR 3.x initialized (lang=de)") except Exception as e1: logger.warning(f"PaddleOCR lang=de failed: {e1}") try: _paddle_ocr = PaddleOCR(lang="en") logger.info("PaddleOCR 3.x initialized (lang=en)") except Exception as e2: logger.warning(f"PaddleOCR lang=en failed: {e2}") _paddle_ocr = PaddleOCR() logger.info("PaddleOCR 3.x initialized (defaults)") except Exception as e: logger.error(f"PaddleOCR initialization failed: {e}") _paddle_ocr = None return _paddle_ocr @dataclass class OCRRegion: """Ein erkannter Textbereich mit Position.""" text: str confidence: float x1: int y1: int x2: int y2: int @property def center_x(self) -> int: return (self.x1 + self.x2) // 2 @property def center_y(self) -> int: return (self.y1 + self.y2) // 2 # ============================================================================= # OCR Pipeline # ============================================================================= def preprocess_image(img: Image.Image) -> np.ndarray: """ Bildvorverarbeitung fuer bessere OCR-Ergebnisse. - Konvertierung zu RGB - Optional: Kontrastverstarkung Raises: ImportError: If OpenCV is not available """ if not CV2_AVAILABLE: raise ImportError( "OpenCV (cv2) is required for image preprocessing. " "Install with: pip install opencv-python-headless" ) # PIL zu numpy array img_array = np.array(img) # Zu RGB konvertieren falls noetig if len(img_array.shape) == 2: # Graustufen zu RGB img_array = cv2.cvtColor(img_array, cv2.COLOR_GRAY2RGB) elif img_array.shape[2] == 4: # RGBA zu RGB img_array = cv2.cvtColor(img_array, cv2.COLOR_RGBA2RGB) return img_array def run_paddle_ocr(image_bytes: bytes) -> Tuple[List[OCRRegion], str]: """ Fuehrt PaddleOCR auf einem Bild aus. PaddleOCR 3.x returns results in format: - result = ocr.ocr(img) returns list of pages - Each page contains list of text lines - Each line: [bbox_points, (text, confidence)] Returns: Tuple of (list of OCRRegion, raw_text) """ ocr = get_paddle_ocr() if ocr is None: logger.error("PaddleOCR not available") return [], "" try: # Bild laden und vorverarbeiten img = Image.open(io.BytesIO(image_bytes)) img_array = preprocess_image(img) # OCR ausfuehren - PaddleOCR 3.x API # Note: cls parameter may not be supported in 3.x, try without it try: result = ocr.ocr(img_array) except TypeError: # Fallback if ocr() doesn't accept the array directly logger.warning("Trying alternative OCR call method") result = ocr.ocr(img_array) if not result: logger.warning("PaddleOCR returned empty result") return [], "" # Handle different result formats # PaddleOCR 3.x returns list of OCRResult objects (dict-like) if isinstance(result, dict): # Direct dict format with 'rec_texts', 'rec_scores', 'dt_polys' logger.info("Processing PaddleOCR 3.x dict format") return _parse_paddleocr_v3_dict(result) elif isinstance(result, list) and len(result) > 0: first_item = result[0] if first_item is None: logger.warning("PaddleOCR returned None for first page") return [], "" # PaddleOCR 3.x: list contains OCRResult objects (dict-like) # Check if first item has 'rec_texts' key (new format) if hasattr(first_item, 'get') or isinstance(first_item, dict): # Try to extract dict keys for new 3.x format item_dict = dict(first_item) if hasattr(first_item, 'items') else first_item if 'rec_texts' in item_dict or 'texts' in item_dict: logger.info("Processing PaddleOCR 3.x OCRResult format") return _parse_paddleocr_v3_dict(item_dict) # Check if first item is a list (traditional format) if isinstance(first_item, list): # Check if it's the traditional line format [[bbox, (text, conf)], ...] if len(first_item) > 0 and isinstance(first_item[0], (list, tuple)): logger.info("Processing PaddleOCR traditional list format") return _parse_paddleocr_list(first_item) # Unknown format - try to inspect logger.warning(f"Unknown result format. Type: {type(first_item)}, Keys: {dir(first_item) if hasattr(first_item, '__dir__') else 'N/A'}") # Try dict conversion as last resort try: item_dict = dict(first_item) if 'rec_texts' in item_dict: return _parse_paddleocr_v3_dict(item_dict) except Exception as e: logger.warning(f"Could not convert to dict: {e}") return [], "" else: logger.warning(f"Unexpected PaddleOCR result type: {type(result)}") return [], "" except Exception as e: logger.error(f"PaddleOCR execution failed: {e}") import traceback logger.error(traceback.format_exc()) return [], "" def _parse_paddleocr_v3_dict(result: dict) -> Tuple[List[OCRRegion], str]: """Parse PaddleOCR 3.x dict format result.""" regions = [] all_text_lines = [] texts = result.get('rec_texts', result.get('texts', [])) scores = result.get('rec_scores', result.get('scores', [])) polys = result.get('dt_polys', result.get('boxes', [])) # Also try rec_boxes which gives direct [x1, y1, x2, y2] format rec_boxes = result.get('rec_boxes', []) logger.info(f"PaddleOCR 3.x: {len(texts)} texts, {len(scores)} scores, {len(polys)} polys, {len(rec_boxes)} rec_boxes") for i, (text, score) in enumerate(zip(texts, scores)): if not text or not str(text).strip(): continue # Try to get bounding box - prefer rec_boxes if available x1, y1, x2, y2 = 0, 0, 100, 50 # Default fallback if i < len(rec_boxes) and rec_boxes[i] is not None: # rec_boxes format: [x1, y1, x2, y2] or [[x1, y1, x2, y2]] box = rec_boxes[i] try: if hasattr(box, 'flatten'): box = box.flatten().tolist() if len(box) >= 4: x1, y1, x2, y2 = int(box[0]), int(box[1]), int(box[2]), int(box[3]) except Exception as e: logger.debug(f"Could not parse rec_box: {e}") elif i < len(polys) and polys[i] is not None: # dt_polys format: [[x1,y1], [x2,y2], [x3,y3], [x4,y4]] or numpy array poly = polys[i] try: # Convert numpy array to list if needed if hasattr(poly, 'tolist'): poly = poly.tolist() if len(poly) >= 4: x_coords = [p[0] for p in poly] y_coords = [p[1] for p in poly] x1, y1 = int(min(x_coords)), int(min(y_coords)) x2, y2 = int(max(x_coords)), int(max(y_coords)) except Exception as e: logger.debug(f"Could not parse polygon: {e}") region = OCRRegion( text=text.strip(), confidence=float(score) if score else 0.5, x1=x1, y1=y1, x2=x2, y2=y2 ) regions.append(region) all_text_lines.append(text.strip()) regions.sort(key=lambda r: r.y1) raw_text = "\n".join(all_text_lines) logger.info(f"PaddleOCR 3.x extracted {len(regions)} text regions") return regions, raw_text def _parse_paddleocr_list(page_result: list) -> Tuple[List[OCRRegion], str]: """Parse PaddleOCR traditional list format result.""" regions = [] all_text_lines = [] for line in page_result: if not line or len(line) < 2: continue bbox_points = line[0] # [[x1,y1], [x2,y2], [x3,y3], [x4,y4]] text_info = line[1] # Handle different text_info formats if isinstance(text_info, tuple) and len(text_info) >= 2: text, confidence = text_info[0], text_info[1] elif isinstance(text_info, str): text, confidence = text_info, 0.5 else: continue if not text or not text.strip(): continue # Bounding Box extrahieren x_coords = [p[0] for p in bbox_points] y_coords = [p[1] for p in bbox_points] region = OCRRegion( text=text.strip(), confidence=float(confidence), x1=int(min(x_coords)), y1=int(min(y_coords)), x2=int(max(x_coords)), y2=int(max(y_coords)) ) regions.append(region) all_text_lines.append(text.strip()) # Regionen nach Y-Position sortieren (oben nach unten) regions.sort(key=lambda r: r.y1) raw_text = "\n".join(all_text_lines) logger.info(f"PaddleOCR extracted {len(regions)} text regions") return regions, raw_text def group_regions_by_rows(regions: List[OCRRegion], y_tolerance: int = 20) -> List[List[OCRRegion]]: """ Gruppiert Textregionen in Zeilen basierend auf Y-Position. Args: regions: Liste von OCRRegion y_tolerance: Max Y-Differenz um zur gleichen Zeile zu gehoeren Returns: Liste von Zeilen, jede Zeile ist eine Liste von OCRRegion sortiert nach X """ if not regions: return [] rows = [] current_row = [regions[0]] current_y = regions[0].center_y for region in regions[1:]: if abs(region.center_y - current_y) <= y_tolerance: # Gleiche Zeile current_row.append(region) else: # Neue Zeile # Sortiere aktuelle Zeile nach X current_row.sort(key=lambda r: r.x1) rows.append(current_row) current_row = [region] current_y = region.center_y # Letzte Zeile nicht vergessen if current_row: current_row.sort(key=lambda r: r.x1) rows.append(current_row) return rows def detect_columns(rows: List[List[OCRRegion]]) -> int: """ Erkennt die Anzahl der Spalten basierend auf den Textpositionen. Returns: Geschaetzte Spaltenanzahl (2 oder 3 fuer Vokabellisten) """ if not rows: return 2 # Zaehle wie viele Elemente pro Zeile items_per_row = [len(row) for row in rows if len(row) >= 2] if not items_per_row: return 2 # Durchschnitt und haeufigster Wert avg_items = sum(items_per_row) / len(items_per_row) if avg_items >= 2.5: return 3 # 3 Spalten: Englisch | Deutsch | Beispiel else: return 2 # 2 Spalten: Englisch | Deutsch def format_ocr_for_llm(regions: List[OCRRegion]) -> str: """ Formatiert OCR-Output fuer LLM-Verarbeitung. Inkludiert Positionsinformationen fuer bessere Strukturerkennung. """ rows = group_regions_by_rows(regions) num_columns = detect_columns(rows) lines = [] lines.append(f"Erkannte Spalten: {num_columns}") lines.append("---") for row in rows: if len(row) >= 2: # Tab-separierte Werte fuer LLM row_text = "\t".join(r.text for r in row) lines.append(row_text) elif len(row) == 1: lines.append(row[0].text) return "\n".join(lines) # ============================================================================= # LLM Strukturierung # ============================================================================= STRUCTURE_PROMPT = """Du erhältst OCR-Output einer Vokabelliste aus einem englischen Schulbuch. Die Zeilen sind Tab-separiert und enthalten typischerweise: - 2 Spalten: Englisch | Deutsch - 3 Spalten: Englisch | Deutsch | Beispielsatz OCR-Text: {ocr_text} AUFGABE: Strukturiere die Vokabeln als JSON-Array. AUSGABE-FORMAT (nur JSON, keine Erklärungen): {{ "vocabulary": [ {{"english": "to improve", "german": "verbessern", "example": "I want to improve my English."}}, {{"english": "achievement", "german": "Leistung", "example": null}} ] }} REGELN: 1. Erkenne das Spalten-Layout aus den Tab-Trennungen 2. Korrigiere offensichtliche OCR-Fehler kontextuell (z.B. "vereessern" → "verbessern", "0" → "o") 3. Bei fehlenden Beispielsätzen: "example": null 4. Überspringe Überschriften, Seitenzahlen, Kapitelnummern 5. Behalte Wortarten bei wenn vorhanden (n, v, adj am Ende des englischen Worts) 6. Gib NUR valides JSON zurück""" async def structure_vocabulary_with_llm(ocr_text: str) -> List[Dict[str, Any]]: """ Verwendet Ollama LLM um OCR-Text zu strukturieren. Args: ocr_text: Formatierter OCR-Output Returns: Liste von Vokabel-Dictionaries """ prompt = STRUCTURE_PROMPT.format(ocr_text=ocr_text) try: async with httpx.AsyncClient(timeout=120.0) as client: # Use Ollama's native /api/chat endpoint response = await client.post( f"{OLLAMA_URL}/api/chat", json={ "model": LLM_MODEL, "messages": [ {"role": "user", "content": prompt} ], "stream": False, "options": { "temperature": 0.1, "num_predict": 4096 } } ) response.raise_for_status() data = response.json() content = data.get("message", {}).get("content", "") logger.info(f"Ollama LLM response received: {len(content)} chars") # JSON parsen return parse_llm_vocabulary_json(content) except httpx.TimeoutException: logger.error("Ollama LLM request timed out") return [] except httpx.HTTPStatusError as e: logger.error(f"Ollama LLM HTTP error: {e}") return [] except Exception as e: logger.error(f"LLM structuring failed: {e}") return [] def parse_llm_vocabulary_json(text: str) -> List[Dict[str, Any]]: """Robustes JSON-Parsing des LLM-Outputs.""" try: # JSON im Text finden start = text.find('{') end = text.rfind('}') + 1 if start == -1 or end == 0: logger.warning("No JSON found in LLM response") return [] json_str = text[start:end] data = json.loads(json_str) vocabulary = data.get("vocabulary", []) # Validierung valid_entries = [] for entry in vocabulary: english = entry.get("english", "").strip() german = entry.get("german", "").strip() if english and german: valid_entries.append({ "english": english, "german": german, "example": entry.get("example") }) return valid_entries except json.JSONDecodeError as e: logger.error(f"JSON parse error: {e}") # Fallback: Regex extraction return extract_vocabulary_regex(text) except Exception as e: logger.error(f"Vocabulary parsing failed: {e}") return [] def extract_vocabulary_regex(text: str) -> List[Dict[str, Any]]: """Fallback: Vokabeln via Regex extrahieren.""" pattern = r'"english"\s*:\s*"([^"]+)"\s*,\s*"german"\s*:\s*"([^"]+)"' matches = re.findall(pattern, text) vocabulary = [] for english, german in matches: vocabulary.append({ "english": english.strip(), "german": german.strip(), "example": None }) logger.info(f"Regex fallback extracted {len(vocabulary)} entries") return vocabulary # ============================================================================= # Public API # ============================================================================= async def extract_vocabulary_hybrid( image_bytes: bytes, page_number: int = 0 ) -> Tuple[List[Dict[str, Any]], float, str]: """ Hybrid-Extraktion: PaddleOCR + LLM Strukturierung. Args: image_bytes: Bild als Bytes page_number: Seitennummer (0-indexed) fuer Fehlermeldungen Returns: Tuple of (vocabulary_list, confidence, error_message) """ try: # Step 1: PaddleOCR logger.info(f"Starting hybrid extraction for page {page_number + 1}") regions, raw_text = run_paddle_ocr(image_bytes) if not regions: return [], 0.0, f"Seite {page_number + 1}: Kein Text erkannt (OCR)" # Step 2: Formatieren fuer LLM formatted_text = format_ocr_for_llm(regions) logger.info(f"Formatted OCR text: {len(formatted_text)} chars") # Step 3: LLM Strukturierung vocabulary = await structure_vocabulary_with_llm(formatted_text) if not vocabulary: # Fallback: Versuche direkte Zeilen-Analyse vocabulary = extract_from_rows_directly(regions) if not vocabulary: return [], 0.0, f"Seite {page_number + 1}: Keine Vokabeln erkannt" # Durchschnittliche OCR-Confidence avg_confidence = sum(r.confidence for r in regions) / len(regions) if regions else 0.0 logger.info(f"Hybrid extraction completed: {len(vocabulary)} entries, {avg_confidence:.2f} confidence") return vocabulary, avg_confidence, "" except Exception as e: logger.error(f"Hybrid extraction failed: {e}") import traceback logger.error(traceback.format_exc()) return [], 0.0, f"Seite {page_number + 1}: Fehler - {str(e)[:50]}" def extract_from_rows_directly(regions: List[OCRRegion]) -> List[Dict[str, Any]]: """ Direkter Fallback: Extrahiere Vokabeln ohne LLM basierend auf Zeilen-Struktur. Funktioniert nur bei klarem 2-3 Spalten-Layout. """ rows = group_regions_by_rows(regions) vocabulary = [] for row in rows: if len(row) >= 2: english = row[0].text.strip() german = row[1].text.strip() example = row[2].text.strip() if len(row) >= 3 else None # Einfache Validierung if english and german and len(english) > 1 and len(german) > 1: vocabulary.append({ "english": english, "german": german, "example": example }) logger.info(f"Direct row extraction: {len(vocabulary)} entries") return vocabulary # ============================================================================= # Test/Debug # ============================================================================= async def test_hybrid_extraction(image_path: str): """Test-Funktion fuer Entwicklung.""" with open(image_path, "rb") as f: image_bytes = f.read() vocab, confidence, error = await extract_vocabulary_hybrid(image_bytes) print(f"\n=== Hybrid OCR Test ===") print(f"Confidence: {confidence:.2f}") print(f"Error: {error or 'None'}") print(f"Vocabulary ({len(vocab)} entries):") for v in vocab[:10]: print(f" - {v['english']} = {v['german']}") return vocab if __name__ == "__main__": import asyncio import sys if len(sys.argv) > 1: asyncio.run(test_hybrid_extraction(sys.argv[1])) else: print("Usage: python hybrid_vocab_extractor.py ")