"""
Legacy cell-grid construction (v1) -- DEPRECATED, kept for backward compat.

Extracted from cv_cell_grid.py. Prefer build_cell_grid_v2 for new code.

License: Apache 2.0 (commercial use permitted)
PRIVACY: All processing happens locally.
"""

import logging
from typing import Any, Dict, List, Optional, Tuple

import numpy as np

from cv_vocab_types import PageRegion, RowGeometry
from cv_ocr_engines import (
    RAPIDOCR_AVAILABLE,
    _assign_row_words_to_columns,
    _clean_cell_text,
    _words_to_reading_order_text,
    # NOTE(review): ocr_region was called throughout this module but never
    # imported (NameError on every Tesseract fallback path). It is assumed to
    # live next to its ocr_region_* siblings -- confirm against cv_ocr_engines.
    ocr_region,
    ocr_region_lighton,
    ocr_region_rapid,
    ocr_region_trocr,
)
from cv_cell_grid_helpers import (
    _MIN_WORD_CONF,
    _compute_cell_padding,
    _ensure_minimum_crop_size,
    _heal_row_gaps,
    _is_artifact_row,
    _select_psm_for_column,
)

logger = logging.getLogger(__name__)

# Pixels with a value below this count as "dark" when deciding whether an
# empty cell is worth re-OCRing.
_DARK_PIXEL_THRESHOLD = 180
# A crop needs strictly more than this share of dark pixels to be re-OCRed.
_MIN_DARK_RATIO = 0.005
# Columns narrower than this percentage of the page width are "narrow".
_NARROW_COL_PCT = 15
# Confidence floor applied to words from the batch column-strip OCR.
# NOTE(review): hard-coded separately from _MIN_WORD_CONF -- confirm intent.
_BATCH_MIN_WORD_CONF = 30
# The batch column-strip OCR only runs for columns with at least this many
# empty-but-inked cells.
_BATCH_MIN_EMPTY_CELLS = 3


# ---------------------------------------------------------------------------
# Small shared helpers
# ---------------------------------------------------------------------------

def _has_ink(crop: np.ndarray) -> bool:
    """Return True if *crop* is non-empty and has enough dark pixels."""
    if crop.size <= 0:
        return False
    dark_ratio = float(np.count_nonzero(crop < _DARK_PIXEL_THRESHOLD)) / crop.size
    return dark_ratio > _MIN_DARK_RATIO


def _drop_low_conf(words: List[Dict]) -> List[Dict]:
    """Keep only the words whose ``conf`` is at least ``_MIN_WORD_CONF``."""
    return [w for w in words if w.get('conf', 0) >= _MIN_WORD_CONF]


def _mean_conf(words: List[Dict]) -> float:
    """Return the mean ``conf`` of a non-empty word list, rounded to 1 digit."""
    return round(sum(w['conf'] for w in words) / len(words), 1)


def _bbox_pct(x: int, y: int, w: int, h: int, img_w: int, img_h: int) -> Dict[str, float]:
    """Express a pixel bbox as percentages of the page size (2 decimals).

    A non-positive page dimension yields 0.0 for the affected values instead
    of raising ZeroDivisionError, matching the ``img_w > 0`` guard that the
    narrow-column check already applies.
    """
    def _pct(value: int, total: int) -> float:
        return round(value / total * 100, 2) if total > 0 else 0.0

    return {
        'x': _pct(x, img_w),
        'y': _pct(y, img_h),
        'w': _pct(w, img_w),
        'h': _pct(h, img_h),
    }


def _ocr_region_for_column(
    image: np.ndarray,
    region: PageRegion,
    row: RowGeometry,
    col: PageRegion,
    lang: str,
    lang_map: Dict[str, str],
) -> Any:
    """Run ``ocr_region`` with the PSM and language selected for this column."""
    psm = _select_psm_for_column(col.type, col.width, row.height)
    cell_lang = lang_map.get(col.type, lang)
    return ocr_region(image, region, lang=cell_lang, psm=psm)


# ---------------------------------------------------------------------------
# _ocr_single_cell — legacy per-cell OCR with multi-level fallback
# ---------------------------------------------------------------------------

def _ocr_single_cell(
    row_idx: int,
    col_idx: int,
    row: RowGeometry,
    col: PageRegion,
    ocr_img: np.ndarray,
    img_bgr: Optional[np.ndarray],
    img_w: int,
    img_h: int,
    use_rapid: bool,
    engine_name: str,
    lang: str,
    lang_map: Dict[str, str],
    preassigned_words: Optional[List[Dict]] = None,
) -> Dict[str, Any]:
    """Populate a single cell (column x row intersection) via word lookup.

    The text comes from ``preassigned_words`` when possible. If that yields
    nothing, up to three fallbacks are tried in order: a per-cell OCR of the
    padded crop, a PSM=7 retry of the same crop, and (narrow columns only) an
    ``ocr_region_rapid`` pass over the whole row strip.

    Args:
        row_idx: Row index, used for ``cell_id`` and ``row_index``.
        col_idx: Column index, used for ``cell_id`` and ``col_index``.
        row: Row geometry; ``y`` and ``height`` are read.
        col: Column region; ``x``, ``width`` and ``type`` are read.
        ocr_img: Image that is cropped for the dark-pixel check and passed to
            ``ocr_region``; may be None, which disables the cell-level
            fallbacks. Presumably grayscale/binarized -- TODO confirm.
        img_bgr: Image handed to the TrOCR / LightOn / RapidOCR engines; those
            engines are skipped when it is None.
        img_w: Page width in pixels.
        img_h: Page height in pixels.
        use_rapid: Whether ``ocr_region_rapid`` is the selected engine.
        engine_name: Engine name; only the ``trocr-*`` and ``lighton`` values
            are checked here.
        lang: Default language string for ``ocr_region``.
        lang_map: Column type -> language override.
        preassigned_words: Word dicts already assigned to this cell; ``conf``
            is read from each one.

    Returns:
        Cell dict with ``cell_id``, ``row_index``, ``col_index``, ``col_type``,
        ``text``, ``confidence``, ``bbox_px``, ``bbox_pct`` and ``ocr_engine``.
    """
    # Display bbox: exact column x row intersection (no padding)
    disp_x = col.x
    disp_y = row.y
    disp_w = col.width
    disp_h = row.height

    # OCR crop: adaptive padding -- narrow columns get more context
    pad = _compute_cell_padding(col.width, img_w)
    cell_x = max(0, col.x - pad)
    cell_y = max(0, row.y - pad)
    cell_w = min(col.width + 2 * pad, img_w - cell_x)
    cell_h = min(row.height + 2 * pad, img_h - cell_y)
    is_narrow = (col.width / img_w * 100) < _NARROW_COL_PCT if img_w > 0 else False

    # Degenerate geometry: report an empty cell without attempting any OCR.
    if disp_w <= 0 or disp_h <= 0:
        return {
            'cell_id': f"R{row_idx:02d}_C{col_idx}",
            'row_index': row_idx,
            'col_index': col_idx,
            'col_type': col.type,
            'text': '',
            'confidence': 0.0,
            'bbox_px': {'x': col.x, 'y': row.y, 'w': col.width, 'h': row.height},
            'bbox_pct': _bbox_pct(col.x, row.y, col.width, row.height, img_w, img_h),
            'ocr_engine': 'word_lookup',
        }

    # --- PRIMARY: Word-lookup from full-page Tesseract ---
    words = preassigned_words if preassigned_words is not None else []
    used_engine = 'word_lookup'

    # Filter low-confidence words
    if words:
        words = _drop_low_conf(words)

    if words:
        y_tol = max(15, row.height)
        text = _words_to_reading_order_text(words, y_tolerance_px=y_tol)
        avg_conf = _mean_conf(words)
    else:
        text = ''
        avg_conf = 0.0

    # --- FALLBACK: Cell-OCR for empty cells ---
    # Only worth running when the padded crop actually contains dark pixels.
    _run_fallback = bool(
        not text.strip()
        and cell_w > 0
        and cell_h > 0
        and ocr_img is not None
        and _has_ink(ocr_img[cell_y:cell_y + cell_h, cell_x:cell_x + cell_w])
    )

    if _run_fallback:
        cell_region = PageRegion(
            type=col.type, x=cell_x, y=cell_y, width=cell_w, height=cell_h,
        )

        if is_narrow:
            # Narrow columns always go through ocr_region, on an upscaled
            # crop when the helper decides the crop is too small.
            _crop_slice = ocr_img[cell_y:cell_y + cell_h, cell_x:cell_x + cell_w]
            _upscaled = _ensure_minimum_crop_size(_crop_slice)
            if _upscaled is not _crop_slice:
                _up_h, _up_w = _upscaled.shape[:2]
                _tmp_region = PageRegion(
                    type=col.type, x=0, y=0, width=_up_w, height=_up_h,
                )
                fallback_words = _ocr_region_for_column(
                    _upscaled, _tmp_region, row, col, lang, lang_map,
                )
                # Remap word positions back to original image coordinates
                _sx = cell_w / max(_up_w, 1)
                _sy = cell_h / max(_up_h, 1)
                for _fw in (fallback_words or []):
                    _fw['left'] = int(_fw['left'] * _sx) + cell_x
                    _fw['top'] = int(_fw['top'] * _sy) + cell_y
                    _fw['width'] = int(_fw['width'] * _sx)
                    _fw['height'] = int(_fw['height'] * _sy)
            else:
                fallback_words = _ocr_region_for_column(
                    ocr_img, cell_region, row, col, lang, lang_map,
                )
        elif engine_name in ("trocr-printed", "trocr-handwritten") and img_bgr is not None:
            fallback_words = ocr_region_trocr(
                img_bgr, cell_region,
                handwritten=(engine_name == "trocr-handwritten"),
            )
        elif engine_name == "lighton" and img_bgr is not None:
            fallback_words = ocr_region_lighton(img_bgr, cell_region)
        elif use_rapid and img_bgr is not None:
            fallback_words = ocr_region_rapid(img_bgr, cell_region)
        else:
            fallback_words = _ocr_region_for_column(
                ocr_img, cell_region, row, col, lang, lang_map,
            )

        if fallback_words:
            fallback_words = _drop_low_conf(fallback_words)
        if fallback_words:
            # Line-grouping tolerance: half the mean word height, at least 10 px.
            fb_avg_h = sum(w['height'] for w in fallback_words) / len(fallback_words)
            fb_y_tol = max(10, int(fb_avg_h * 0.5))
            fb_text = _words_to_reading_order_text(fallback_words, y_tolerance_px=fb_y_tol)
            if fb_text.strip():
                text = fb_text
                avg_conf = _mean_conf(fallback_words)
                used_engine = 'cell_ocr_fallback'

        # --- SECONDARY FALLBACK: PSM=7 (single line) for still-empty cells ---
        if not text.strip() and not use_rapid:
            cell_lang = lang_map.get(col.type, lang)
            psm7_words = ocr_region(ocr_img, cell_region, lang=cell_lang, psm=7)
            if psm7_words:
                psm7_words = _drop_low_conf(psm7_words)
            if psm7_words:
                p7_text = _words_to_reading_order_text(psm7_words, y_tolerance_px=10)
                if p7_text.strip():
                    text = p7_text
                    avg_conf = _mean_conf(psm7_words)
                    used_engine = 'cell_ocr_psm7'

    # --- TERTIARY FALLBACK: Row-strip re-OCR for narrow columns ---
    # NOTE(review): this calls ocr_region_rapid without checking use_rapid or
    # RAPIDOCR_AVAILABLE -- confirm the callee copes with RapidOCR missing.
    if not text.strip() and is_narrow and img_bgr is not None:
        row_region = PageRegion(
            type='_row_strip', x=0, y=row.y, width=img_w, height=row.height,
        )
        strip_words = ocr_region_rapid(img_bgr, row_region)
        if strip_words:
            col_left = col.x
            col_right = col.x + col.width
            col_words = []
            for sw in strip_words:
                sw_left = sw.get('left', 0)
                sw_right = sw_left + sw.get('width', 0)
                overlap = max(0, min(sw_right, col_right) - max(sw_left, col_left))
                # Keep words with more than 30% of their width inside the column.
                if overlap > sw.get('width', 1) * 0.3:
                    col_words.append(sw)
            if col_words:
                col_words = _drop_low_conf(col_words)
            if col_words:
                rs_text = _words_to_reading_order_text(col_words, y_tolerance_px=row.height)
                if rs_text.strip():
                    text = rs_text
                    avg_conf = _mean_conf(col_words)
                    used_engine = 'row_strip_rapid'

    # --- NOISE FILTER: clear cells that contain only OCR artifacts ---
    if text.strip():
        text = _clean_cell_text(text)
        if not text:
            avg_conf = 0.0

    return {
        'cell_id': f"R{row_idx:02d}_C{col_idx}",
        'row_index': row_idx,
        'col_index': col_idx,
        'col_type': col.type,
        'text': text,
        'confidence': avg_conf,
        'bbox_px': {'x': disp_x, 'y': disp_y, 'w': disp_w, 'h': disp_h},
        'bbox_pct': _bbox_pct(disp_x, disp_y, disp_w, disp_h, img_w, img_h),
        'ocr_engine': used_engine,
    }


# ---------------------------------------------------------------------------
# build_cell_grid — legacy grid builder (DEPRECATED)
# ---------------------------------------------------------------------------

def build_cell_grid(
    ocr_img: np.ndarray,
    column_regions: List[PageRegion],
    row_geometries: List[RowGeometry],
    img_w: int,
    img_h: int,
    lang: str = "eng+deu",
    ocr_engine: str = "auto",
    img_bgr: Optional[np.ndarray] = None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Generic Cell-Grid: Columns x Rows -> cells with OCR text.

    DEPRECATED: Use build_cell_grid_v2 instead.

    Args:
        ocr_img: Image used for dark-pixel checks and ``ocr_region`` calls.
        column_regions: Detected page regions; header/footer/margin/ignored
            types are dropped and the rest are sorted left to right.
        row_geometries: Detected rows; only ``row_type == 'content'`` rows
            with ``word_count > 0`` that are not artifact rows are used.
        img_w: Page width in pixels.
        img_h: Page height in pixels.
        lang: Default OCR language for column types without an override.
        ocr_engine: ``"auto"``, ``"rapid"``, ``"trocr-printed"``,
            ``"trocr-handwritten"`` or ``"lighton"``; any other value selects
            ``"tesseract"``.
        img_bgr: Image for the non-Tesseract engines; ``"auto"`` only picks
            RapidOCR when this is provided.

    Returns:
        ``(cells, columns_meta)``. ``cells`` holds one dict per kept
        (row, column) pair, with all-empty rows removed; ``columns_meta``
        describes the columns used. Both are empty lists when no usable rows
        or columns remain.
    """
    # Resolve engine choice
    use_rapid = False
    if ocr_engine in ("trocr-printed", "trocr-handwritten", "lighton"):
        engine_name = ocr_engine
    elif ocr_engine == "auto":
        use_rapid = RAPIDOCR_AVAILABLE and img_bgr is not None
        engine_name = "rapid" if use_rapid else "tesseract"
    elif ocr_engine == "rapid":
        if not RAPIDOCR_AVAILABLE:
            logger.warning("RapidOCR requested but not available, falling back to Tesseract")
        else:
            use_rapid = True
        engine_name = "rapid" if use_rapid else "tesseract"
    else:
        engine_name = "tesseract"

    logger.info(f"build_cell_grid: using OCR engine '{engine_name}'")

    # Filter to content rows only (skip header/footer)
    content_rows = [r for r in row_geometries if r.row_type == 'content']
    if not content_rows:
        logger.warning("build_cell_grid: no content rows found")
        return [], []

    # Drop rows that contain no words at all.
    before = len(content_rows)
    content_rows = [r for r in content_rows if r.word_count > 0]
    skipped = before - len(content_rows)
    if skipped > 0:
        logger.info(f"build_cell_grid: skipped {skipped} phantom rows (word_count=0)")
    if not content_rows:
        logger.warning("build_cell_grid: no content rows with words found")
        return [], []

    _skip_types = {
        'column_ignore', 'header', 'footer',
        'margin_top', 'margin_bottom', 'margin_left', 'margin_right',
    }
    relevant_cols = [c for c in column_regions if c.type not in _skip_types]
    if not relevant_cols:
        logger.warning("build_cell_grid: no usable columns found")
        return [], []

    before_art = len(content_rows)
    content_rows = [r for r in content_rows if not _is_artifact_row(r)]
    artifact_skipped = before_art - len(content_rows)
    if artifact_skipped > 0:
        logger.info(f"build_cell_grid: skipped {artifact_skipped} artifact rows (all single-char words)")
    if not content_rows:
        logger.warning("build_cell_grid: no content rows after artifact filtering")
        return [], []

    # Bounds passed to the helper are the vertical extent of the kept columns.
    _heal_row_gaps(
        content_rows,
        top_bound=min(c.y for c in relevant_cols),
        bottom_bound=max(c.y + c.height for c in relevant_cols),
    )

    relevant_cols.sort(key=lambda c: c.x)

    columns_meta = [
        {
            'index': col_idx,
            'type': col.type,
            'x': col.x,
            'width': col.width,
        }
        for col_idx, col in enumerate(relevant_cols)
    ]

    lang_map = {
        'column_en': 'eng',
        'column_de': 'deu',
        'column_example': 'eng+deu',
    }

    cells: List[Dict[str, Any]] = []

    for row_idx, row in enumerate(content_rows):
        col_words = _assign_row_words_to_columns(row, relevant_cols)
        for col_idx, col in enumerate(relevant_cols):
            cell = _ocr_single_cell(
                row_idx, col_idx, row, col,
                ocr_img, img_bgr, img_w, img_h,
                use_rapid, engine_name, lang, lang_map,
                preassigned_words=col_words[col_idx],
            )
            cells.append(cell)

    # --- BATCH FALLBACK: re-OCR empty cells by column strip ---
    # Collect, per column, the indices of cells that are still empty but whose
    # display bbox contains dark pixels.
    empty_by_col: Dict[int, List[int]] = {}
    for ci, cell in enumerate(cells):
        if cell['text'].strip() or cell.get('ocr_engine') == 'cell_ocr_psm7':
            continue
        bpx = cell['bbox_px']
        x, y, w, h = bpx['x'], bpx['y'], bpx['w'], bpx['h']
        if w > 0 and h > 0 and ocr_img is not None:
            if _has_ink(ocr_img[y:y + h, x:x + w]):
                empty_by_col.setdefault(cell['col_index'], []).append(ci)

    for col_idx, cell_indices in empty_by_col.items():
        if len(cell_indices) < _BATCH_MIN_EMPTY_CELLS:
            continue

        # One strip spanning from the topmost to the bottommost empty cell.
        min_y = min(cells[ci]['bbox_px']['y'] for ci in cell_indices)
        max_y_h = max(cells[ci]['bbox_px']['y'] + cells[ci]['bbox_px']['h'] for ci in cell_indices)
        col_x = cells[cell_indices[0]]['bbox_px']['x']
        col_w = cells[cell_indices[0]]['bbox_px']['w']

        strip_region = PageRegion(
            type=relevant_cols[col_idx].type,
            x=col_x, y=min_y,
            width=col_w, height=max_y_h - min_y,
        )
        strip_lang = lang_map.get(relevant_cols[col_idx].type, lang)

        if engine_name in ("trocr-printed", "trocr-handwritten") and img_bgr is not None:
            strip_words = ocr_region_trocr(
                img_bgr, strip_region,
                handwritten=(engine_name == "trocr-handwritten"),
            )
        elif engine_name == "lighton" and img_bgr is not None:
            strip_words = ocr_region_lighton(img_bgr, strip_region)
        elif use_rapid and img_bgr is not None:
            strip_words = ocr_region_rapid(img_bgr, strip_region)
        else:
            strip_words = ocr_region(ocr_img, strip_region, lang=strip_lang, psm=6)

        if not strip_words:
            continue

        strip_words = [w for w in strip_words if w.get('conf', 0) >= _BATCH_MIN_WORD_CONF]
        if not strip_words:
            continue

        for ci in cell_indices:
            cell_y = cells[ci]['bbox_px']['y']
            cell_h = cells[ci]['bbox_px']['h']
            cell_mid_y = cell_y + cell_h / 2

            # A word matches when its vertical centre lies within 0.8 cell
            # heights of the cell's vertical centre.
            matched_words = [
                w for w in strip_words
                if abs((w['top'] + w['height'] / 2) - cell_mid_y) < cell_h * 0.8
            ]
            if matched_words:
                matched_words.sort(key=lambda w: w['left'])
                batch_text = ' '.join(w['text'] for w in matched_words)
                batch_text = _clean_cell_text(batch_text)
                if batch_text.strip():
                    cells[ci]['text'] = batch_text
                    cells[ci]['confidence'] = round(
                        sum(w['conf'] for w in matched_words) / len(matched_words), 1
                    )
                    cells[ci]['ocr_engine'] = 'batch_column_ocr'

        batch_filled = sum(1 for ci in cell_indices if cells[ci]['text'].strip())
        if batch_filled > 0:
            logger.info(
                f"build_cell_grid: batch OCR filled {batch_filled}/{len(cell_indices)} "
                f"empty cells in column {col_idx}"
            )

    # Remove all-empty rows
    rows_with_text: set = set()
    for cell in cells:
        if cell['text'].strip():
            rows_with_text.add(cell['row_index'])
    before_filter = len(cells)
    cells = [c for c in cells if c['row_index'] in rows_with_text]
    empty_rows_removed = (before_filter - len(cells)) // max(len(relevant_cols), 1)
    if empty_rows_removed > 0:
        logger.info(f"build_cell_grid: removed {empty_rows_removed} all-empty rows after OCR")

    logger.info(f"build_cell_grid: {len(cells)} cells from "
                f"{len(content_rows)} rows x {len(relevant_cols)} columns, "
                f"engine={engine_name}")

    return cells, columns_meta