[split-required] Split remaining 500-680 LOC files (final batch)

website (17 pages + 3 components):
- multiplayer/wizard, middleware/wizard+test-wizard, communication
- builds/wizard, staff-search, voice, sbom/wizard
- foerderantrag, mail/tasks, tools/communication, sbom
- compliance/evidence, uni-crawler, brandbook (already done)
- CollectionsTab, IngestionTab, RiskHeatmap

backend-lehrer (5 files):
- letters_api (641 → 2), certificates_api (636 → 2)
- alerts_agent/db/models (636 → 3)
- llm_gateway/communication_service (614 → 2)
- game/database already done in prior batch

klausur-service (2 files):
- hybrid_vocab_extractor (664 → 2)
- klausur-service/frontend: api.ts (620 → 3), EHUploadWizard (591 → 2)

voice-service (3 files):
- bqas/rag_judge (618 → 3), runner (529 → 2)
- enhanced_task_orchestrator (519 → 2)

studio-v2 (6 files):
- korrektur/[klausurId] (578 → 4), fairness (569 → 2)
- AlertsWizard (552 → 2), OnboardingWizard (513 → 2)
- korrektur/api.ts (506 → 3), geo-lernwelt (501 → 2)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-04-25 08:56:45 +02:00
parent b4613e26f3
commit 451365a312
115 changed files with 10694 additions and 13839 deletions

View File

@@ -1,425 +1,42 @@
"""
Hybrid OCR + LLM Vocabulary Extractor
Zweistufiger Ansatz fuer optimale Vokabel-Extraktion:
1. PaddleOCR fuer schnelle, praezise Texterkennung mit Bounding-Boxes
2. qwen2.5:14b (via LLM Gateway) fuer semantische Strukturierung
Split into:
- hybrid_vocab_ocr.py: PaddleOCR integration, parsing, row/column detection
- hybrid_vocab_extractor.py (this file): LLM structuring, public API, barrel re-exports
Vorteile gegenueber reinem Vision LLM:
- 4x schneller (~7-15 Sek vs 30-60 Sek pro Seite)
- Hoehere Genauigkeit bei gedrucktem Text (95-99%)
- Weniger Halluzinationen (LLM korrigiert nur, erfindet nicht)
- Position-basierte Spaltenerkennung moeglich
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal (Mac Mini).
All symbols re-exported for backward compatibility.
"""
import os
import io
import json
import logging
import re
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
import uuid
from typing import List, Dict, Any, Tuple
import httpx
import numpy as np
from PIL import Image
# OpenCV is optional - only required for actual image processing
try:
import cv2
CV2_AVAILABLE = True
except ImportError:
cv2 = None
CV2_AVAILABLE = False
# Re-export everything from ocr module for backward compatibility
from hybrid_vocab_ocr import (
OCRRegion,
get_paddle_ocr,
preprocess_image,
run_paddle_ocr,
group_regions_by_rows,
detect_columns,
format_ocr_for_llm,
)
logger = logging.getLogger(__name__)
# Configuration - Use Ollama directly (no separate LLM Gateway)
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
LLM_MODEL = os.getenv("LLM_MODEL", "qwen2.5:14b")
# PaddleOCR - Lazy loading
_paddle_ocr = None
def get_paddle_ocr():
    """Return the shared PaddleOCR instance, creating it on first use.

    Lazy initialization avoids paying the model-loading delay at import
    time. PaddleOCR 3.x (May 2025) accepts only the 'lang' constructor
    parameter (use_gpu/device/show_log/det/rec/use_onnx were removed;
    GPU/CPU selection is automatic). Falls back de -> en -> defaults.
    Returns None when initialization fails entirely.
    """
    global _paddle_ocr
    if _paddle_ocr is not None:
        return _paddle_ocr
    try:
        from paddleocr import PaddleOCR
        import logging as std_logging
        # Quiet the very chatty Paddle logger family.
        for noisy_name in ['ppocr', 'paddle', 'paddleocr', 'root']:
            std_logging.getLogger(noisy_name).setLevel(std_logging.WARNING)
        # Preferred language first, then progressively looser fallbacks.
        try:
            _paddle_ocr = PaddleOCR(lang="de")
            logger.info("PaddleOCR 3.x initialized (lang=de)")
        except Exception as e1:
            logger.warning(f"PaddleOCR lang=de failed: {e1}")
            try:
                _paddle_ocr = PaddleOCR(lang="en")
                logger.info("PaddleOCR 3.x initialized (lang=en)")
            except Exception as e2:
                logger.warning(f"PaddleOCR lang=en failed: {e2}")
                _paddle_ocr = PaddleOCR()
                logger.info("PaddleOCR 3.x initialized (defaults)")
    except Exception as e:
        logger.error(f"PaddleOCR initialization failed: {e}")
        _paddle_ocr = None
    return _paddle_ocr
@dataclass
class OCRRegion:
    """A recognized text span with its bounding box in pixel coordinates."""
    text: str  # recognized text content
    confidence: float  # OCR confidence score (typically 0-1)
    x1: int  # left edge
    y1: int  # top edge
    x2: int  # right edge
    y2: int  # bottom edge
    @property
    def center_x(self) -> int:
        """Horizontal center of the bounding box (integer division)."""
        return (self.x1 + self.x2) // 2
    @property
    def center_y(self) -> int:
        """Vertical center of the bounding box (integer division)."""
        return (self.y1 + self.y2) // 2
# =============================================================================
# OCR Pipeline
# =============================================================================
def preprocess_image(img: Image.Image) -> np.ndarray:
    """Convert a PIL image into an RGB numpy array suitable for OCR.

    Grayscale and RGBA inputs are converted to 3-channel RGB; anything
    already 3-channel passes through unchanged.

    Raises:
        ImportError: If OpenCV is not available.
    """
    if not CV2_AVAILABLE:
        raise ImportError(
            "OpenCV (cv2) is required for image preprocessing. "
            "Install with: pip install opencv-python-headless"
        )
    arr = np.array(img)
    if arr.ndim == 2:
        # Grayscale -> RGB
        return cv2.cvtColor(arr, cv2.COLOR_GRAY2RGB)
    if arr.shape[2] == 4:
        # RGBA -> RGB
        return cv2.cvtColor(arr, cv2.COLOR_RGBA2RGB)
    return arr
def run_paddle_ocr(image_bytes: bytes) -> Tuple[List[OCRRegion], str]:
    """
    Run PaddleOCR on an image.

    PaddleOCR 3.x returns results as either:
    - a dict (or dict-like OCRResult) with 'rec_texts'/'rec_scores'/'dt_polys'
    - a list of pages, each page a list of [bbox_points, (text, confidence)]
      lines (traditional format)

    Args:
        image_bytes: Raw image file content.

    Returns:
        Tuple of (list of OCRRegion sorted top-to-bottom, raw_text).
        ([], "") on any failure.
    """
    ocr = get_paddle_ocr()
    if ocr is None:
        logger.error("PaddleOCR not available")
        return [], ""
    try:
        # Bild laden und vorverarbeiten
        img = Image.open(io.BytesIO(image_bytes))
        img_array = preprocess_image(img)
        # OCR ausfuehren - PaddleOCR 3.x API.
        # Fix: the previous fallback repeated the identical ocr.ocr() call,
        # which could only raise the same TypeError again. Fall back to
        # .predict(), the pipeline entry point in newer 3.x releases.
        try:
            result = ocr.ocr(img_array)
        except TypeError:
            logger.warning("ocr.ocr() rejected input, trying ocr.predict()")
            result = ocr.predict(img_array)
        if not result:
            logger.warning("PaddleOCR returned empty result")
            return [], ""
        # Dispatch on the observed result shape:
        # dict, dict-like OCRResult, or (nested) list.
        if isinstance(result, dict):
            # Direct dict format with 'rec_texts', 'rec_scores', 'dt_polys'
            logger.info("Processing PaddleOCR 3.x dict format")
            return _parse_paddleocr_v3_dict(result)
        elif isinstance(result, list) and len(result) > 0:
            first_item = result[0]
            if first_item is None:
                logger.warning("PaddleOCR returned None for first page")
                return [], ""
            # PaddleOCR 3.x: list contains OCRResult objects (dict-like)
            if hasattr(first_item, 'get') or isinstance(first_item, dict):
                item_dict = dict(first_item) if hasattr(first_item, 'items') else first_item
                if 'rec_texts' in item_dict or 'texts' in item_dict:
                    logger.info("Processing PaddleOCR 3.x OCRResult format")
                    return _parse_paddleocr_v3_dict(item_dict)
            # Traditional line format [[bbox, (text, conf)], ...]
            if isinstance(first_item, list):
                if len(first_item) > 0 and isinstance(first_item[0], (list, tuple)):
                    logger.info("Processing PaddleOCR traditional list format")
                    return _parse_paddleocr_list(first_item)
            # Unknown format - try dict conversion as a last resort
            logger.warning(f"Unknown result format. Type: {type(first_item)}, Keys: {dir(first_item) if hasattr(first_item, '__dir__') else 'N/A'}")
            try:
                item_dict = dict(first_item)
                if 'rec_texts' in item_dict:
                    return _parse_paddleocr_v3_dict(item_dict)
            except Exception as e:
                logger.warning(f"Could not convert to dict: {e}")
            return [], ""
        else:
            logger.warning(f"Unexpected PaddleOCR result type: {type(result)}")
            return [], ""
    except Exception as e:
        logger.error(f"PaddleOCR execution failed: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return [], ""
def _parse_paddleocr_v3_dict(result: dict) -> Tuple[List[OCRRegion], str]:
    """Parse PaddleOCR 3.x dict format result.

    Expects parallel lists under 'rec_texts'/'rec_scores' (fallback keys
    'texts'/'scores'); bounding boxes come from 'rec_boxes' (direct
    [x1, y1, x2, y2], preferred) or 'dt_polys' (4-point polygons).
    Entries whose box cannot be parsed keep a placeholder box.

    Returns:
        Tuple of (regions sorted top-to-bottom, raw text joined in
        detection order).
    """
    regions = []
    all_text_lines = []
    # Fallback keys cover naming differences between PaddleOCR builds.
    texts = result.get('rec_texts', result.get('texts', []))
    scores = result.get('rec_scores', result.get('scores', []))
    polys = result.get('dt_polys', result.get('boxes', []))
    # Also try rec_boxes which gives direct [x1, y1, x2, y2] format
    rec_boxes = result.get('rec_boxes', [])
    logger.info(f"PaddleOCR 3.x: {len(texts)} texts, {len(scores)} scores, {len(polys)} polys, {len(rec_boxes)} rec_boxes")
    for i, (text, score) in enumerate(zip(texts, scores)):
        if not text or not str(text).strip():
            continue
        # Try to get bounding box - prefer rec_boxes if available
        x1, y1, x2, y2 = 0, 0, 100, 50  # Default fallback
        if i < len(rec_boxes) and rec_boxes[i] is not None:
            # rec_boxes format: [x1, y1, x2, y2] or [[x1, y1, x2, y2]]
            box = rec_boxes[i]
            try:
                if hasattr(box, 'flatten'):
                    box = box.flatten().tolist()
                if len(box) >= 4:
                    x1, y1, x2, y2 = int(box[0]), int(box[1]), int(box[2]), int(box[3])
            except Exception as e:
                logger.debug(f"Could not parse rec_box: {e}")
        elif i < len(polys) and polys[i] is not None:
            # dt_polys format: [[x1,y1], [x2,y2], [x3,y3], [x4,y4]] or numpy array
            poly = polys[i]
            try:
                # Convert numpy array to list if needed
                if hasattr(poly, 'tolist'):
                    poly = poly.tolist()
                if len(poly) >= 4:
                    x_coords = [p[0] for p in poly]
                    y_coords = [p[1] for p in poly]
                    x1, y1 = int(min(x_coords)), int(min(y_coords))
                    x2, y2 = int(max(x_coords)), int(max(y_coords))
            except Exception as e:
                logger.debug(f"Could not parse polygon: {e}")
        region = OCRRegion(
            text=text.strip(),
            confidence=float(score) if score else 0.5,
            x1=x1, y1=y1, x2=x2, y2=y2
        )
        regions.append(region)
        all_text_lines.append(text.strip())
    # Reading order: top to bottom.
    regions.sort(key=lambda r: r.y1)
    raw_text = "\n".join(all_text_lines)
    logger.info(f"PaddleOCR 3.x extracted {len(regions)} text regions")
    return regions, raw_text
def _parse_paddleocr_list(page_result: list) -> Tuple[List[OCRRegion], str]:
    """Parse the traditional PaddleOCR page format.

    Each line is [bbox_points, text_info] where text_info is either a
    (text, confidence) tuple or a bare string (confidence then defaults
    to 0.5). Returns regions sorted top-to-bottom plus the raw text in
    detection order.
    """
    regions: List[OCRRegion] = []
    raw_lines: List[str] = []
    for entry in page_result:
        if not entry or len(entry) < 2:
            continue
        points = entry[0]  # 4-point polygon [[x1,y1], ..., [x4,y4]]
        info = entry[1]
        # text_info shape varies by PaddleOCR version.
        if isinstance(info, tuple) and len(info) >= 2:
            text, conf = info[0], info[1]
        elif isinstance(info, str):
            text, conf = info, 0.5
        else:
            continue
        if not text:
            continue
        cleaned = text.strip()
        if not cleaned:
            continue
        # Axis-aligned bounding box from the polygon corners.
        xs = [p[0] for p in points]
        ys = [p[1] for p in points]
        regions.append(OCRRegion(
            text=cleaned,
            confidence=float(conf),
            x1=int(min(xs)),
            y1=int(min(ys)),
            x2=int(max(xs)),
            y2=int(max(ys)),
        ))
        raw_lines.append(cleaned)
    # Sort into reading order (top to bottom).
    regions.sort(key=lambda r: r.y1)
    logger.info(f"PaddleOCR extracted {len(regions)} text regions")
    return regions, "\n".join(raw_lines)
def group_regions_by_rows(regions: List[OCRRegion], y_tolerance: int = 20) -> List[List[OCRRegion]]:
    """Group text regions into visual rows by vertical position.

    Args:
        regions: OCR regions, expected pre-sorted top-to-bottom.
        y_tolerance: Maximum center-Y difference for two regions to share
            a row; the anchor is the row's first region.

    Returns:
        Rows as lists of OCRRegion, each row sorted left-to-right.
    """
    if not regions:
        return []
    grouped: List[List[OCRRegion]] = []
    pending = [regions[0]]
    anchor_y = regions[0].center_y
    for reg in regions[1:]:
        if abs(reg.center_y - anchor_y) <= y_tolerance:
            # Still within the current row.
            pending.append(reg)
        else:
            # Row break: flush the finished row left-to-right.
            grouped.append(sorted(pending, key=lambda r: r.x1))
            pending = [reg]
            anchor_y = reg.center_y
    # Flush the final row.
    grouped.append(sorted(pending, key=lambda r: r.x1))
    return grouped
def detect_columns(rows: List[List[OCRRegion]]) -> int:
    """Estimate the column count (2 or 3) of a vocabulary table.

    Rows with fewer than two cells are ignored; an average of >= 2.5
    cells per remaining row indicates a third (example) column.
    Defaults to 2 when no multi-cell rows exist.
    """
    counts = [len(row) for row in rows if len(row) >= 2]
    if not counts:
        return 2
    return 3 if sum(counts) / len(counts) >= 2.5 else 2
def format_ocr_for_llm(regions: List[OCRRegion]) -> str:
    """Render OCR regions as tab-separated rows for the LLM prompt.

    The header announces the detected column count so the model knows
    which layout to expect; multi-cell rows become tab-joined lines and
    single-cell rows pass through verbatim.
    """
    rows = group_regions_by_rows(regions)
    out = [f"Erkannte Spalten: {detect_columns(rows)}", "---"]
    for row in rows:
        if len(row) >= 2:
            out.append("\t".join(cell.text for cell in row))
        elif len(row) == 1:
            out.append(row[0].text)
    return "\n".join(out)
# =============================================================================
# LLM Strukturierung
# =============================================================================
STRUCTURE_PROMPT = """Du erhältst OCR-Output einer Vokabelliste aus einem englischen Schulbuch.
STRUCTURE_PROMPT = """Du erhaeltst OCR-Output einer Vokabelliste aus einem englischen Schulbuch.
Die Zeilen sind Tab-separiert und enthalten typischerweise:
- 2 Spalten: Englisch | Deutsch
- 3 Spalten: Englisch | Deutsch | Beispielsatz
@@ -429,7 +46,7 @@ OCR-Text:
AUFGABE: Strukturiere die Vokabeln als JSON-Array.
AUSGABE-FORMAT (nur JSON, keine Erklärungen):
AUSGABE-FORMAT (nur JSON, keine Erklaerungen):
{{
"vocabulary": [
{{"english": "to improve", "german": "verbessern", "example": "I want to improve my English."}},
@@ -439,50 +56,32 @@ AUSGABE-FORMAT (nur JSON, keine Erklärungen):
REGELN:
1. Erkenne das Spalten-Layout aus den Tab-Trennungen
2. Korrigiere offensichtliche OCR-Fehler kontextuell (z.B. "vereessern" "verbessern", "0" "o")
3. Bei fehlenden Beispielsätzen: "example": null
4. Überspringe Überschriften, Seitenzahlen, Kapitelnummern
2. Korrigiere offensichtliche OCR-Fehler kontextuell (z.B. "vereessern" -> "verbessern", "0" -> "o")
3. Bei fehlenden Beispielsaetzen: "example": null
4. Ueberspringe Ueberschriften, Seitenzahlen, Kapitelnummern
5. Behalte Wortarten bei wenn vorhanden (n, v, adj am Ende des englischen Worts)
6. Gib NUR valides JSON zurück"""
6. Gib NUR valides JSON zurueck"""
async def structure_vocabulary_with_llm(ocr_text: str) -> List[Dict[str, Any]]:
"""
Verwendet Ollama LLM um OCR-Text zu strukturieren.
Args:
ocr_text: Formatierter OCR-Output
Returns:
Liste von Vokabel-Dictionaries
"""
"""Verwendet Ollama LLM um OCR-Text zu strukturieren."""
prompt = STRUCTURE_PROMPT.format(ocr_text=ocr_text)
try:
async with httpx.AsyncClient(timeout=120.0) as client:
# Use Ollama's native /api/chat endpoint
response = await client.post(
f"{OLLAMA_URL}/api/chat",
json={
"model": LLM_MODEL,
"messages": [
{"role": "user", "content": prompt}
],
"messages": [{"role": "user", "content": prompt}],
"stream": False,
"options": {
"temperature": 0.1,
"num_predict": 4096
}
"options": {"temperature": 0.1, "num_predict": 4096}
}
)
response.raise_for_status()
data = response.json()
content = data.get("message", {}).get("content", "")
logger.info(f"Ollama LLM response received: {len(content)} chars")
# JSON parsen
return parse_llm_vocabulary_json(content)
except httpx.TimeoutException:
@@ -499,37 +98,29 @@ async def structure_vocabulary_with_llm(ocr_text: str) -> List[Dict[str, Any]]:
def parse_llm_vocabulary_json(text: str) -> List[Dict[str, Any]]:
"""Robustes JSON-Parsing des LLM-Outputs."""
try:
# JSON im Text finden
start = text.find('{')
end = text.rfind('}') + 1
if start == -1 or end == 0:
logger.warning("No JSON found in LLM response")
return []
json_str = text[start:end]
data = json.loads(json_str)
vocabulary = data.get("vocabulary", [])
# Validierung
valid_entries = []
for entry in vocabulary:
english = entry.get("english", "").strip()
german = entry.get("german", "").strip()
if english and german:
valid_entries.append({
"english": english,
"german": german,
"english": english, "german": german,
"example": entry.get("example")
})
return valid_entries
except json.JSONDecodeError as e:
logger.error(f"JSON parse error: {e}")
# Fallback: Regex extraction
return extract_vocabulary_regex(text)
except Exception as e:
logger.error(f"Vocabulary parsing failed: {e}")
@@ -544,11 +135,8 @@ def extract_vocabulary_regex(text: str) -> List[Dict[str, Any]]:
vocabulary = []
for english, german in matches:
vocabulary.append({
"english": english.strip(),
"german": german.strip(),
"example": None
"english": english.strip(), "german": german.strip(), "example": None
})
logger.info(f"Regex fallback extracted {len(vocabulary)} entries")
return vocabulary
@@ -558,46 +146,29 @@ def extract_vocabulary_regex(text: str) -> List[Dict[str, Any]]:
# =============================================================================
async def extract_vocabulary_hybrid(
image_bytes: bytes,
page_number: int = 0
image_bytes: bytes, page_number: int = 0
) -> Tuple[List[Dict[str, Any]], float, str]:
"""
Hybrid-Extraktion: PaddleOCR + LLM Strukturierung.
Args:
image_bytes: Bild als Bytes
page_number: Seitennummer (0-indexed) fuer Fehlermeldungen
Returns:
Tuple of (vocabulary_list, confidence, error_message)
"""
"""Hybrid-Extraktion: PaddleOCR + LLM Strukturierung."""
try:
# Step 1: PaddleOCR
logger.info(f"Starting hybrid extraction for page {page_number + 1}")
regions, raw_text = run_paddle_ocr(image_bytes)
if not regions:
return [], 0.0, f"Seite {page_number + 1}: Kein Text erkannt (OCR)"
# Step 2: Formatieren fuer LLM
formatted_text = format_ocr_for_llm(regions)
logger.info(f"Formatted OCR text: {len(formatted_text)} chars")
# Step 3: LLM Strukturierung
vocabulary = await structure_vocabulary_with_llm(formatted_text)
if not vocabulary:
# Fallback: Versuche direkte Zeilen-Analyse
vocabulary = extract_from_rows_directly(regions)
if not vocabulary:
return [], 0.0, f"Seite {page_number + 1}: Keine Vokabeln erkannt"
# Durchschnittliche OCR-Confidence
avg_confidence = sum(r.confidence for r in regions) / len(regions) if regions else 0.0
logger.info(f"Hybrid extraction completed: {len(vocabulary)} entries, {avg_confidence:.2f} confidence")
return vocabulary, avg_confidence, ""
except Exception as e:
@@ -608,10 +179,7 @@ async def extract_vocabulary_hybrid(
def extract_from_rows_directly(regions: List[OCRRegion]) -> List[Dict[str, Any]]:
"""
Direkter Fallback: Extrahiere Vokabeln ohne LLM basierend auf Zeilen-Struktur.
Funktioniert nur bei klarem 2-3 Spalten-Layout.
"""
"""Direkter Fallback: Extrahiere Vokabeln ohne LLM."""
rows = group_regions_by_rows(regions)
vocabulary = []
@@ -620,13 +188,9 @@ def extract_from_rows_directly(regions: List[OCRRegion]) -> List[Dict[str, Any]]
english = row[0].text.strip()
german = row[1].text.strip()
example = row[2].text.strip() if len(row) >= 3 else None
# Einfache Validierung
if english and german and len(english) > 1 and len(german) > 1:
vocabulary.append({
"english": english,
"german": german,
"example": example
"english": english, "german": german, "example": example
})
logger.info(f"Direct row extraction: {len(vocabulary)} entries")

View File

@@ -0,0 +1,300 @@
"""
Hybrid Vocab OCR - PaddleOCR integration and result parsing.
Handles:
- PaddleOCR lazy loading and initialization
- Running OCR on image bytes
- Parsing PaddleOCR v3 dict and traditional list formats
- Grouping regions by rows and detecting columns
"""
import io
import logging
from typing import List, Tuple
from dataclasses import dataclass
import numpy as np
from PIL import Image
# OpenCV is optional
try:
import cv2
CV2_AVAILABLE = True
except ImportError:
cv2 = None
CV2_AVAILABLE = False
logger = logging.getLogger(__name__)
_paddle_ocr = None
@dataclass
class OCRRegion:
    """One OCR hit: the recognized text plus its pixel bounding box."""
    text: str  # recognized text content
    confidence: float  # OCR confidence score (typically 0-1)
    x1: int  # left edge
    y1: int  # top edge
    x2: int  # right edge
    y2: int  # bottom edge
    @property
    def center_x(self) -> int:
        """Midpoint of the box along the x axis."""
        return self.x1 + (self.x2 - self.x1) // 2
    @property
    def center_y(self) -> int:
        """Midpoint of the box along the y axis."""
        return self.y1 + (self.y2 - self.y1) // 2
def get_paddle_ocr():
    """Lazy load PaddleOCR to avoid startup delay.

    PaddleOCR 3.x API notes:
    - only the 'lang' constructor parameter is passed; GPU/CPU selection
      is automatic
    - tries lang="de", then lang="en", then library defaults
    - on total failure the module-level singleton stays None and callers
      must handle it
    """
    global _paddle_ocr
    if _paddle_ocr is None:
        try:
            from paddleocr import PaddleOCR
            import logging as std_logging
            # Silence the noisy Paddle logger family.
            for logger_name in ['ppocr', 'paddle', 'paddleocr', 'root']:
                std_logging.getLogger(logger_name).setLevel(std_logging.WARNING)
            # Preferred language first, then progressively looser fallbacks.
            try:
                _paddle_ocr = PaddleOCR(lang="de")
                logger.info("PaddleOCR 3.x initialized (lang=de)")
            except Exception as e1:
                logger.warning(f"PaddleOCR lang=de failed: {e1}")
                try:
                    _paddle_ocr = PaddleOCR(lang="en")
                    logger.info("PaddleOCR 3.x initialized (lang=en)")
                except Exception as e2:
                    logger.warning(f"PaddleOCR lang=en failed: {e2}")
                    _paddle_ocr = PaddleOCR()
                    logger.info("PaddleOCR 3.x initialized (defaults)")
        except Exception as e:
            logger.error(f"PaddleOCR initialization failed: {e}")
            _paddle_ocr = None
    return _paddle_ocr
def preprocess_image(img: Image.Image) -> np.ndarray:
    """Convert a PIL image to an RGB numpy array for OCR.

    Grayscale and RGBA inputs are converted to 3-channel RGB; anything
    already 3-channel passes through unchanged.

    Raises:
        ImportError: If OpenCV (cv2) is not installed.
    """
    if not CV2_AVAILABLE:
        raise ImportError(
            "OpenCV (cv2) is required for image preprocessing. "
            "Install with: pip install opencv-python-headless"
        )
    img_array = np.array(img)
    if len(img_array.shape) == 2:
        # Grayscale -> RGB
        img_array = cv2.cvtColor(img_array, cv2.COLOR_GRAY2RGB)
    elif img_array.shape[2] == 4:
        # RGBA -> RGB
        img_array = cv2.cvtColor(img_array, cv2.COLOR_RGBA2RGB)
    return img_array
def run_paddle_ocr(image_bytes: bytes) -> Tuple[List[OCRRegion], str]:
    """Run PaddleOCR on an image given as raw bytes.

    Handles PaddleOCR 3.x dict/OCRResult formats as well as the
    traditional per-page list format.

    Returns:
        Tuple of (regions sorted top-to-bottom, raw text);
        ([], "") on any failure.
    """
    ocr = get_paddle_ocr()
    if ocr is None:
        logger.error("PaddleOCR not available")
        return [], ""
    try:
        img = Image.open(io.BytesIO(image_bytes))
        img_array = preprocess_image(img)
        # Fix: the previous fallback repeated the identical ocr.ocr() call,
        # which could only raise the same TypeError again. Fall back to
        # .predict(), the pipeline entry point in newer PaddleOCR 3.x.
        try:
            result = ocr.ocr(img_array)
        except TypeError:
            logger.warning("ocr.ocr() rejected input, trying ocr.predict()")
            result = ocr.predict(img_array)
        if not result:
            logger.warning("PaddleOCR returned empty result")
            return [], ""
        # Dispatch on the observed result shape.
        if isinstance(result, dict):
            logger.info("Processing PaddleOCR 3.x dict format")
            return _parse_paddleocr_v3_dict(result)
        elif isinstance(result, list) and len(result) > 0:
            first_item = result[0]
            if first_item is None:
                logger.warning("PaddleOCR returned None for first page")
                return [], ""
            # Dict-like OCRResult (PaddleOCR 3.x)
            if hasattr(first_item, 'get') or isinstance(first_item, dict):
                item_dict = dict(first_item) if hasattr(first_item, 'items') else first_item
                if 'rec_texts' in item_dict or 'texts' in item_dict:
                    logger.info("Processing PaddleOCR 3.x OCRResult format")
                    return _parse_paddleocr_v3_dict(item_dict)
            # Traditional line format [[bbox, (text, conf)], ...]
            if isinstance(first_item, list):
                if len(first_item) > 0 and isinstance(first_item[0], (list, tuple)):
                    logger.info("Processing PaddleOCR traditional list format")
                    return _parse_paddleocr_list(first_item)
            # Unknown format - try dict conversion as a last resort
            logger.warning(f"Unknown result format. Type: {type(first_item)}")
            try:
                item_dict = dict(first_item)
                if 'rec_texts' in item_dict:
                    return _parse_paddleocr_v3_dict(item_dict)
            except Exception as e:
                logger.warning(f"Could not convert to dict: {e}")
            return [], ""
        else:
            logger.warning(f"Unexpected PaddleOCR result type: {type(result)}")
            return [], ""
    except Exception as e:
        logger.error(f"PaddleOCR execution failed: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return [], ""
def _parse_paddleocr_v3_dict(result: dict) -> Tuple[List[OCRRegion], str]:
    """Parse PaddleOCR 3.x dict format result.

    Reads parallel lists under 'rec_texts'/'rec_scores' (fallback keys
    'texts'/'scores'); boxes come from 'rec_boxes' (direct
    [x1, y1, x2, y2], preferred) or 'dt_polys' (4-point polygons).
    Entries whose box cannot be parsed keep a placeholder box.

    Returns:
        Tuple of (regions sorted top-to-bottom, raw text joined in
        detection order).
    """
    regions = []
    all_text_lines = []
    # Fallback keys cover naming differences between PaddleOCR builds.
    texts = result.get('rec_texts', result.get('texts', []))
    scores = result.get('rec_scores', result.get('scores', []))
    polys = result.get('dt_polys', result.get('boxes', []))
    rec_boxes = result.get('rec_boxes', [])
    logger.info(f"PaddleOCR 3.x: {len(texts)} texts, {len(scores)} scores, {len(polys)} polys, {len(rec_boxes)} rec_boxes")
    for i, (text, score) in enumerate(zip(texts, scores)):
        if not text or not str(text).strip():
            continue
        x1, y1, x2, y2 = 0, 0, 100, 50  # placeholder box if parsing fails
        if i < len(rec_boxes) and rec_boxes[i] is not None:
            # rec_boxes: direct [x1, y1, x2, y2] (possibly a numpy array)
            box = rec_boxes[i]
            try:
                if hasattr(box, 'flatten'):
                    box = box.flatten().tolist()
                if len(box) >= 4:
                    x1, y1, x2, y2 = int(box[0]), int(box[1]), int(box[2]), int(box[3])
            except Exception as e:
                logger.debug(f"Could not parse rec_box: {e}")
        elif i < len(polys) and polys[i] is not None:
            # dt_polys: 4-point polygon -> axis-aligned bounding box
            poly = polys[i]
            try:
                if hasattr(poly, 'tolist'):
                    poly = poly.tolist()
                if len(poly) >= 4:
                    x_coords = [p[0] for p in poly]
                    y_coords = [p[1] for p in poly]
                    x1, y1 = int(min(x_coords)), int(min(y_coords))
                    x2, y2 = int(max(x_coords)), int(max(y_coords))
            except Exception as e:
                logger.debug(f"Could not parse polygon: {e}")
        region = OCRRegion(
            text=text.strip(), confidence=float(score) if score else 0.5,
            x1=x1, y1=y1, x2=x2, y2=y2
        )
        regions.append(region)
        all_text_lines.append(text.strip())
    # Reading order: top to bottom.
    regions.sort(key=lambda r: r.y1)
    raw_text = "\n".join(all_text_lines)
    logger.info(f"PaddleOCR 3.x extracted {len(regions)} text regions")
    return regions, raw_text
def _parse_paddleocr_list(page_result: list) -> Tuple[List[OCRRegion], str]:
    """Parse PaddleOCR traditional list format result.

    Each entry is [bbox_points, text_info]; bbox_points is a 4-point
    polygon [[x1,y1], ..., [x4,y4]] and text_info is a (text, confidence)
    tuple or a plain string (confidence then defaults to 0.5).

    Returns:
        Tuple of (regions sorted top-to-bottom, raw text in detection
        order).
    """
    regions = []
    all_text_lines = []
    for line in page_result:
        if not line or len(line) < 2:
            continue
        bbox_points = line[0]  # [[x1,y1], [x2,y2], [x3,y3], [x4,y4]]
        text_info = line[1]
        # text_info shape varies by PaddleOCR version.
        if isinstance(text_info, tuple) and len(text_info) >= 2:
            text, confidence = text_info[0], text_info[1]
        elif isinstance(text_info, str):
            text, confidence = text_info, 0.5
        else:
            continue
        if not text or not text.strip():
            continue
        # Axis-aligned bounding box from the polygon corners.
        x_coords = [p[0] for p in bbox_points]
        y_coords = [p[1] for p in bbox_points]
        region = OCRRegion(
            text=text.strip(), confidence=float(confidence),
            x1=int(min(x_coords)), y1=int(min(y_coords)),
            x2=int(max(x_coords)), y2=int(max(y_coords))
        )
        regions.append(region)
        all_text_lines.append(text.strip())
    # Sort into reading order (top to bottom).
    regions.sort(key=lambda r: r.y1)
    raw_text = "\n".join(all_text_lines)
    logger.info(f"PaddleOCR extracted {len(regions)} text regions")
    return regions, raw_text
def group_regions_by_rows(regions: List[OCRRegion], y_tolerance: int = 20) -> List[List[OCRRegion]]:
    """Cluster OCR regions into rows of vertically aligned text.

    A region joins the current row when its vertical center is within
    ``y_tolerance`` pixels of the row anchor (the row's first region);
    otherwise a new row starts. Finished rows are ordered left-to-right.
    """
    rows: List[List[OCRRegion]] = []
    row: List[OCRRegion] = []
    row_anchor_y = 0
    for reg in regions:
        if row and abs(reg.center_y - row_anchor_y) <= y_tolerance:
            row.append(reg)
        else:
            if row:
                # Row break: flush the finished row, sorted by x.
                row.sort(key=lambda r: r.x1)
                rows.append(row)
            row = [reg]
            row_anchor_y = reg.center_y
    if row:
        row.sort(key=lambda r: r.x1)
        rows.append(row)
    return rows
def detect_columns(rows: List[List[OCRRegion]]) -> int:
    """Guess how many columns the vocabulary layout has.

    Returns 3 (english | german | example) when rows with at least two
    cells average >= 2.5 entries, otherwise 2 (english | german).
    """
    multi_cell = [len(r) for r in rows if len(r) >= 2]
    if not multi_cell:
        return 2
    mean_cells = sum(multi_cell) / len(multi_cell)
    if mean_cells >= 2.5:
        return 3
    return 2
def format_ocr_for_llm(regions: List[OCRRegion]) -> str:
    """Serialize OCR regions into the tab-separated prompt text.

    Output: a column-count header line, a '---' separator, then one line
    per detected row (cells tab-joined; lone cells verbatim).
    """
    grouped = group_regions_by_rows(regions)
    parts = []
    parts.append(f"Erkannte Spalten: {detect_columns(grouped)}")
    parts.append("---")
    for cells in grouped:
        if len(cells) >= 2:
            parts.append("\t".join(c.text for c in cells))
        elif len(cells) == 1:
            parts.append(cells[0].text)
    return "\n".join(parts)