"""
OCR engines (RapidOCR, TrOCR, LightOn) and re-exports.

This module contains the OCR engine wrappers and re-exports all functions
from the split sub-modules for backward compatibility.

Sub-modules:
  - cv_ocr_word_assembly: Word grouping and text assembly
  - cv_ocr_vocab_postprocess: Vocabulary postprocessing (char confusion, comma split)
  - cv_ocr_ipa_lookup: Core IPA lookup and bracket handling
  - cv_ocr_ipa_repair: Advanced IPA repair (continuation cells, post-bracket cleanup)
  - cv_ocr_cell_phonetics: Cell-level phonetics for overlay
  - cv_ocr_cell_filter: Cell text filtering, column assignment, bold detection

License: Apache 2.0 (commercial use permitted)
PRIVACY: All processing is performed locally.
NOTE: ``ocr_region_paddle`` can send the image to the remote PaddleOCR
service (``services.paddleocr_remote``) when local RapidOCR is skipped,
fails, or returns no words.
"""

import io
import logging
import os
import re
from typing import Any, Dict, List, Optional, Tuple

import numpy as np

from cv_vocab_types import (
    IPA_AVAILABLE,
    PageRegion,
    RowGeometry,
    _britfone_dict,
    _ipa_convert_american,
)

logger = logging.getLogger(__name__)

try:
    import cv2
except ImportError:
    cv2 = None  # type: ignore[assignment]

try:
    from PIL import Image
except ImportError:
    Image = None  # type: ignore[assignment,misc]


# ── Re-exports from sub-modules (backward compatibility) ──────────────────

from cv_ocr_word_assembly import (  # noqa: F401
    _group_words_into_lines,
    _words_to_reading_order_lines,
    _rejoin_hyphenated,
    _words_to_reading_order_text,
    _words_to_spaced_text,
)

from cv_ocr_vocab_postprocess import (  # noqa: F401
    _CHAR_CONFUSION_RULES,
    _DE_INDICATORS_FOR_EN_I,
    _fix_character_confusion,
    _is_singular_plural_pair,
    _split_comma_entries,
    _split_by_comma,
    _find_best_vocab_match,
    _attach_example_sentences,
)

from cv_ocr_ipa_lookup import (  # noqa: F401
    _PHONETIC_BRACKET_RE,
    _IPA_CHARS,
    _MIN_WORD_CONF,
    _GRAMMAR_BRACKET_WORDS,
    _lookup_ipa,
    _fix_phonetic_brackets,
    _is_grammar_bracket_content,
    _replace_phonetics_in_text,
    _text_has_garbled_ipa,
    _decompose_compound,
    _insert_missing_ipa,
)

from cv_ocr_ipa_repair import (  # noqa: F401
    _has_non_dict_trailing,
    _strip_post_bracket_garbled,
    fix_ipa_continuation_cell,
    _insert_headword_ipa,
)

from cv_ocr_cell_phonetics import (  # noqa: F401
    fix_cell_phonetics,
    _has_ipa_gap,
    _sync_word_boxes_after_ipa_insert,
)

from cv_ocr_cell_filter import (  # noqa: F401
    _RE_REAL_WORD,
    _RE_ALPHA,
    _COMMON_SHORT_WORDS,
    _KNOWN_ABBREVIATIONS,
    _assign_row_words_to_columns,
    _is_noise_tail_token,
    _is_garbage_text,
    _clean_cell_text,
    _clean_cell_text_lite,
    _measure_stroke_width,
    _classify_bold_cells,
)


# ── OCR Engine Wrappers ───────────────────────────────────────────────────

# Lazily created RapidOCR engine instance (see _get_rapid_engine).
_rapid_engine = None
# True only when the optional ``rapidocr`` package could be imported.
RAPIDOCR_AVAILABLE = False

try:
    from rapidocr import RapidOCR as _RapidOCRClass
    from rapidocr import LangRec as _LangRec, OCRVersion as _OCRVersion, ModelType as _ModelType
    RAPIDOCR_AVAILABLE = True
    logger.info("RapidOCR available — can be used as alternative to Tesseract")
except ImportError:
    logger.info("RapidOCR not installed — using Tesseract only")


def _get_rapid_engine():
    """Lazy-init RapidOCR engine with PP-OCRv5 Latin model for German support.

    The engine is created on first use and cached in the module-level
    ``_rapid_engine`` so later calls reuse the same instance.

    Returns:
        The cached RapidOCR engine object.

    Raises:
        RuntimeError: If no engine is cached yet and the ``rapidocr`` package
            could not be imported (``RAPIDOCR_AVAILABLE`` is False).
    """
    global _rapid_engine
    if _rapid_engine is None:
        if not RAPIDOCR_AVAILABLE:
            # Without this guard the constructor call below would fail with
            # an opaque NameError on the never-imported _RapidOCRClass.
            raise RuntimeError("RapidOCR is not installed; cannot create RapidOCR engine")
        _rapid_engine = _RapidOCRClass(params={
            "Rec.lang_type": _LangRec.LATIN,
            "Rec.model_type": _ModelType.SERVER,
            "Rec.ocr_version": _OCRVersion.PPOCRV5,
            "Det.unclip_ratio": 1.3,
            "Det.box_thresh": 0.4,
            "Global.log_level": "critical",
        })
        logger.info("RapidOCR engine initialized (PP-OCRv5 Latin, unclip_ratio=1.3)")
    return _rapid_engine


def ocr_region_rapid(
    img_bgr: np.ndarray,
    region: PageRegion,
) -> List[Dict[str, Any]]:
    """Run RapidOCR on a specific region, returning word dicts compatible with Tesseract format.

    Args:
        img_bgr: Source image; the region is cut out of it by row/column slicing.
        region: Area to recognise (``x``, ``y``, ``width``, ``height``, ``type``).

    Returns:
        One dict per non-blank detection with the keys ``text``, ``left``,
        ``top``, ``width``, ``height``, ``conf`` (score scaled to 0-100) and
        ``region_type``. ``left``/``top`` are shifted by the region origin, so
        they are expressed in ``img_bgr`` coordinates. An empty list is
        returned when the crop is empty or the engine reports no result.
    """
    engine = _get_rapid_engine()

    crop = img_bgr[region.y:region.y + region.height, region.x:region.x + region.width]
    if crop.size == 0:
        return []

    result = engine(crop)
    # Any missing result field means there is nothing usable to iterate over.
    if (
        result is None
        or result.boxes is None
        or result.txts is None
        or result.scores is None
    ):
        return []

    words = []
    for box, txt, score in zip(result.boxes, result.txts, result.scores):
        if not txt or not txt.strip():
            continue

        # Each box is a sequence of (x, y) points; reduce it to the
        # axis-aligned bounding rectangle.
        xs = [p[0] for p in box]
        ys = [p[1] for p in box]
        left = int(min(xs))
        top = int(min(ys))
        w = int(max(xs) - left)
        h = int(max(ys) - top)

        words.append({
            'text': txt.strip(),
            'left': left + region.x,
            'top': top + region.y,
            'width': w,
            'height': h,
            'conf': int(score * 100),
            'region_type': region.type,
        })

    return words


def _ocr_region_tesseract_fallback(img_bgr, region: PageRegion) -> List[Dict[str, Any]]:
    """Convert the image to grayscale and delegate to ``ocr_region`` (eng+deu, psm=6).

    Returns an empty list when ``img_bgr`` is None.
    """
    if img_bgr is None:
        return []
    ocr_img_crop = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
    # NOTE(review): ``ocr_region`` is neither defined nor imported in the
    # visible part of this module — confirm where it comes from, otherwise
    # this call raises NameError.
    return ocr_region(ocr_img_crop, region, lang="eng+deu", psm=6)


def _ocr_region_rapid_or_tesseract(img_bgr, region: PageRegion) -> List[Dict[str, Any]]:
    """Fallback chain: RapidOCR when installed and an image is given, else Tesseract."""
    if RAPIDOCR_AVAILABLE and img_bgr is not None:
        return ocr_region_rapid(img_bgr, region)
    return _ocr_region_tesseract_fallback(img_bgr, region)


def ocr_region_trocr(img_bgr: np.ndarray, region: PageRegion, handwritten: bool = False) -> List[Dict[str, Any]]:
    """Run TrOCR on a region. Returns line-level word dicts.

    Args:
        img_bgr: Source image; the region is cut out of it by slicing.
        region: Area to recognise.
        handwritten: Forwarded to ``get_trocr_model`` to select the model.

    Returns:
        One dict per recognised text line. TrOCR yields no geometry here, so
        every line spans the full region width and the region height is split
        evenly between the lines. ``conf`` is the same heuristic value for all
        lines (per-line 0.85 if longer than 3 characters, else 0.5, averaged
        and scaled to 0-100). Falls back to Tesseract when TrOCR or its model
        is unavailable; returns an empty list on empty input, no text, or any
        exception during inference (which is logged).
    """
    from services.trocr_service import get_trocr_model, _split_into_lines, _check_trocr_available

    if not _check_trocr_available():
        logger.warning("TrOCR not available, falling back to Tesseract")
        if region.height > 0 and region.width > 0:
            return _ocr_region_tesseract_fallback(img_bgr, region)
        return []

    crop = img_bgr[region.y:region.y + region.height, region.x:region.x + region.width]
    if crop.size == 0:
        return []

    try:
        import torch
        from PIL import Image as _PILImage

        processor, model = get_trocr_model(handwritten=handwritten)
        if processor is None or model is None:
            logger.warning("TrOCR model not loaded, falling back to Tesseract")
            return _ocr_region_tesseract_fallback(img_bgr, region)

        pil_crop = _PILImage.fromarray(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))
        # If line splitting finds nothing, treat the whole crop as one line.
        lines = _split_into_lines(pil_crop)
        if not lines:
            lines = [pil_crop]

        device = next(model.parameters()).device
        all_text = []
        confidences = []
        for line_img in lines:
            pixel_values = processor(images=line_img, return_tensors="pt").pixel_values.to(device)
            with torch.no_grad():
                generated_ids = model.generate(pixel_values, max_length=128)
            text_line = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
            if text_line:
                all_text.append(text_line)
                # Heuristic confidence: very short lines are trusted less.
                confidences.append(0.85 if len(text_line) > 3 else 0.5)

        if not all_text:
            return []

        avg_conf = int(sum(confidences) / len(confidences) * 100)
        line_h = region.height // max(len(all_text), 1)
        return [
            {
                "text": line,
                "left": region.x,
                "top": region.y + i * line_h,
                "width": region.width,
                "height": line_h,
                "conf": avg_conf,
                "region_type": region.type,
            }
            for i, line in enumerate(all_text)
        ]

    except Exception as e:
        # Best-effort engine: log with traceback and report "no words".
        logger.exception("ocr_region_trocr failed: %s", e)
        return []


def ocr_region_lighton(img_bgr: np.ndarray, region: PageRegion) -> List[Dict[str, Any]]:
    """Run LightOnOCR-2-1B on a region. Returns line-level word dicts.

    Args:
        img_bgr: Source image; the region is cut out of it by slicing.
        region: Area to recognise.

    Returns:
        One dict per non-blank line of the generated text. Every line spans
        the full region width, the region height is split evenly between the
        lines, and ``conf`` is the fixed value 85. Falls back to RapidOCR or
        Tesseract when LightOnOCR or its model is unavailable; returns an
        empty list on empty input, empty output, or any exception during
        inference (which is logged).
    """
    from services.lighton_ocr_service import get_lighton_model, _check_lighton_available

    if not _check_lighton_available():
        logger.warning("LightOnOCR not available, falling back to RapidOCR/Tesseract")
        return _ocr_region_rapid_or_tesseract(img_bgr, region)

    crop = img_bgr[region.y:region.y + region.height, region.x:region.x + region.width]
    if crop.size == 0:
        return []

    try:
        import torch
        from PIL import Image as _PILImage

        processor, model = get_lighton_model()
        if processor is None or model is None:
            logger.warning("LightOnOCR model not loaded, falling back to RapidOCR/Tesseract")
            return _ocr_region_rapid_or_tesseract(img_bgr, region)

        pil_crop = _PILImage.fromarray(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))
        conversation = [{"role": "user", "content": [{"type": "image"}]}]
        inputs = processor.apply_chat_template(
            conversation, images=[pil_crop],
            add_generation_prompt=True, return_tensors="pt"
        ).to(model.device)

        with torch.no_grad():
            output_ids = model.generate(**inputs, max_new_tokens=1024)

        # NOTE(review): the complete output sequence is decoded; if generate()
        # echoes the prompt tokens, prompt text may end up in ``text`` — confirm.
        text = processor.decode(output_ids[0], skip_special_tokens=True).strip()
        if not text:
            return []

        lines = [stripped for stripped in (raw.strip() for raw in text.split("\n")) if stripped]
        line_h = region.height // max(len(lines), 1)
        return [
            {
                "text": line,
                "left": region.x,
                "top": region.y + i * line_h,
                "width": region.width,
                "height": line_h,
                "conf": 85,
                "region_type": region.type,
            }
            for i, line in enumerate(lines)
        ]

    except Exception as e:
        # Best-effort engine: log with traceback and report "no words".
        logger.exception("ocr_region_lighton failed: %s", e)
        return []


async def ocr_region_paddle(
    img_bgr: np.ndarray,
    region: Optional["PageRegion"] = None,
) -> List[Dict[str, Any]]:
    """Run OCR via local RapidOCR (default) or remote PaddleOCR (fallback).

    Local RapidOCR is tried first unless the environment variable
    ``FORCE_REMOTE_PADDLE`` is set to ``"1"``. The remote service is used when
    the local run is skipped, raises, or yields no words.

    Args:
        img_bgr: Source image.
        region: Optional area to recognise; when None the whole image is
            processed (locally as a ``"full_page"`` region).

    Returns:
        Word dicts in ``img_bgr`` coordinates. For remote results,
        ``region_type`` is only set when ``region`` is given. An empty list is
        returned when the crop is empty or JPEG encoding fails.
    """
    force_remote = os.environ.get("FORCE_REMOTE_PADDLE", "").strip() == "1"

    if not force_remote:
        try:
            if region is None:
                h, w = img_bgr.shape[:2]
                _region = PageRegion(type="full_page", x=0, y=0, width=w, height=h)
            else:
                _region = region
            words = ocr_region_rapid(img_bgr, _region)
            if words:
                logger.info("ocr_region_paddle: used local RapidOCR (%d words)", len(words))
                return words
            logger.warning("ocr_region_paddle: RapidOCR returned 0 words, trying remote")
        except Exception as e:
            # Deliberate best-effort: any local failure falls through to remote.
            logger.warning("ocr_region_paddle: RapidOCR failed (%s), trying remote", e)

    from services.paddleocr_remote import ocr_remote_paddle

    if region is not None:
        crop = img_bgr[
            region.y : region.y + region.height,
            region.x : region.x + region.width,
        ]
        offset_x, offset_y = region.x, region.y
    else:
        crop = img_bgr
        offset_x, offset_y = 0, 0

    if crop.size == 0:
        return []

    # Cap the longest side before upload; coordinates are scaled back below.
    h, w = crop.shape[:2]
    scale = 1.0
    _MAX_DIM = 1500
    if max(h, w) > _MAX_DIM:
        scale = _MAX_DIM / max(h, w)
        new_w, new_h = int(w * scale), int(h * scale)
        crop = cv2.resize(crop, (new_w, new_h), interpolation=cv2.INTER_AREA)
        logger.info("ocr_region_paddle: downscaled %dx%d → %dx%d (scale=%.2f)",
                     w, h, new_w, new_h, scale)

    success, jpg_buf = cv2.imencode(".jpg", crop, [cv2.IMWRITE_JPEG_QUALITY, 90])
    if not success:
        logger.error("ocr_region_paddle: cv2.imencode failed")
        return []

    words, _w, _h = await ocr_remote_paddle(jpg_buf.tobytes(), filename="scan.jpg")
    logger.info("ocr_region_paddle: used remote PaddleOCR (%d words)", len(words))

    # Map coordinates from the (possibly downscaled) crop back to img_bgr.
    inv_scale = 1.0 / scale
    for wd in words:
        wd["left"] = int(wd["left"] * inv_scale) + offset_x
        wd["top"] = int(wd["top"] * inv_scale) + offset_y
        wd["width"] = int(wd["width"] * inv_scale)
        wd["height"] = int(wd["height"] * inv_scale)
        if region is not None:
            wd["region_type"] = region.type

    return words