Files
breakpilot-lehrer/klausur-service/backend/cv_cell_grid.py
Benjamin Admin 0ee92e7210
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 37s
CI / test-go-edu-search (push) Successful in 32s
CI / test-python-klausur (push) Failing after 2m10s
CI / test-python-agent-core (push) Successful in 19s
CI / test-nodejs-website (push) Successful in 20s
feat: OCR word_boxes fuer pixelgenaue Overlay-Positionierung
Backend: _ocr_cell_crop speichert jetzt word_boxes mit exakten
Tesseract/RapidOCR Wort-Koordinaten (left, top, width, height)
im Cell-Ergebnis. Absolute Bildkoordinaten, bereits zurueckgemappt.

Frontend: Slide-Hook nutzt word_boxes direkt wenn vorhanden —
jedes Wort wird exakt an seiner OCR-Position platziert. Kein
Pixel-Scanning noetig. Fallback auf alten Slide wenn keine Boxes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 19:39:49 +01:00

1575 lines
61 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Cell-grid construction (v2 + legacy), vocab conversion, and word-grid OCR.
Lizenz: Apache 2.0 (kommerziell nutzbar)
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
"""
import logging
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, Generator, List, Optional, Tuple
import numpy as np
from cv_vocab_types import PageRegion, RowGeometry
from cv_ocr_engines import (
RAPIDOCR_AVAILABLE,
_RE_ALPHA,
_assign_row_words_to_columns,
_attach_example_sentences,
_clean_cell_text,
_clean_cell_text_lite,
_fix_phonetic_brackets,
_split_comma_entries,
_words_to_reading_order_text,
_words_to_spaced_text,
ocr_region_lighton,
ocr_region_rapid,
ocr_region_trocr,
)
logger = logging.getLogger(__name__)
try:
import cv2
except ImportError:
cv2 = None # type: ignore[assignment]
try:
from PIL import Image
except ImportError:
Image = None # type: ignore[assignment,misc]
# Minimum OCR word confidence to keep (used across multiple functions)
_MIN_WORD_CONF = 30
# ---------------------------------------------------------------------------
def _ocr_cell_crop(
row_idx: int,
col_idx: int,
row: RowGeometry,
col: PageRegion,
ocr_img: np.ndarray,
img_bgr: Optional[np.ndarray],
img_w: int,
img_h: int,
engine_name: str,
lang: str,
lang_map: Dict[str, str],
) -> Dict[str, Any]:
"""OCR a single cell by cropping the exact column×row intersection.
No padding beyond cell boundaries → no neighbour bleeding.
"""
# Display bbox: exact column × row intersection
disp_x = col.x
disp_y = row.y
disp_w = col.width
disp_h = row.height
# Crop boundaries: add small internal padding (3px each side) to avoid
# clipping characters near column/row edges (e.g. parentheses, descenders).
# Stays within image bounds but may extend slightly beyond strict cell.
# 3px is small enough to avoid neighbour content at typical scan DPI (200-300).
_PAD = 3
cx = max(0, disp_x - _PAD)
cy = max(0, disp_y - _PAD)
cx2 = min(img_w, disp_x + disp_w + _PAD)
cy2 = min(img_h, disp_y + disp_h + _PAD)
cw = cx2 - cx
ch = cy2 - cy
empty_cell = {
'cell_id': f"R{row_idx:02d}_C{col_idx}",
'row_index': row_idx,
'col_index': col_idx,
'col_type': col.type,
'text': '',
'confidence': 0.0,
'bbox_px': {'x': disp_x, 'y': disp_y, 'w': disp_w, 'h': disp_h},
'bbox_pct': {
'x': round(disp_x / img_w * 100, 2) if img_w else 0,
'y': round(disp_y / img_h * 100, 2) if img_h else 0,
'w': round(disp_w / img_w * 100, 2) if img_w else 0,
'h': round(disp_h / img_h * 100, 2) if img_h else 0,
},
'ocr_engine': 'cell_crop_v2',
'is_bold': False,
}
if cw <= 0 or ch <= 0:
logger.debug("_ocr_cell_crop R%02d_C%d: zero-size crop (%dx%d)", row_idx, col_idx, cw, ch)
return empty_cell
# --- Pixel-density check: skip truly empty cells ---
if ocr_img is not None:
crop = ocr_img[cy:cy + ch, cx:cx + cw]
if crop.size > 0:
dark_ratio = float(np.count_nonzero(crop < 180)) / crop.size
if dark_ratio < 0.005:
logger.debug("_ocr_cell_crop R%02d_C%d: skip empty (dark_ratio=%.4f, crop=%dx%d)",
row_idx, col_idx, dark_ratio, cw, ch)
return empty_cell
# --- Prepare crop for OCR ---
cell_lang = lang_map.get(col.type, lang)
psm = _select_psm_for_column(col.type, col.width, row.height)
text = ''
avg_conf = 0.0
used_engine = 'cell_crop_v2'
if engine_name in ("trocr-printed", "trocr-handwritten") and img_bgr is not None:
cell_region = PageRegion(type=col.type, x=cx, y=cy, width=cw, height=ch)
words = ocr_region_trocr(img_bgr, cell_region,
handwritten=(engine_name == "trocr-handwritten"))
elif engine_name == "lighton" and img_bgr is not None:
cell_region = PageRegion(type=col.type, x=cx, y=cy, width=cw, height=ch)
words = ocr_region_lighton(img_bgr, cell_region)
elif engine_name == "rapid" and img_bgr is not None:
# Upscale small BGR crops for RapidOCR.
# Cell crops typically have height 35-55px but width >300px.
# _ensure_minimum_crop_size only scales when EITHER dim < min_dim,
# using uniform scale → a 365×54 crop becomes ~1014×150 (scale ~2.78).
# For very short heights (< 80px), force 3× upscale for better OCR
# of small characters like periods, ellipsis, and phonetic symbols.
bgr_crop = img_bgr[cy:cy + ch, cx:cx + cw]
if bgr_crop.size == 0:
words = []
else:
crop_h, crop_w = bgr_crop.shape[:2]
if crop_h < 80:
# Force 3× upscale for short rows — small chars need more pixels
scale = 3.0
bgr_up = cv2.resize(bgr_crop, None, fx=scale, fy=scale,
interpolation=cv2.INTER_CUBIC)
else:
bgr_up = _ensure_minimum_crop_size(bgr_crop, min_dim=150, max_scale=3)
up_h, up_w = bgr_up.shape[:2]
scale_x = up_w / max(crop_w, 1)
scale_y = up_h / max(crop_h, 1)
was_scaled = (up_w != crop_w or up_h != crop_h)
logger.debug("_ocr_cell_crop R%02d_C%d: rapid %dx%d -> %dx%d (scale=%.1fx)",
row_idx, col_idx, crop_w, crop_h, up_w, up_h, scale_y)
tmp_region = PageRegion(type=col.type, x=0, y=0, width=up_w, height=up_h)
words = ocr_region_rapid(bgr_up, tmp_region)
# Remap positions back to original image coords
if words and was_scaled:
for w in words:
w['left'] = int(w['left'] / scale_x) + cx
w['top'] = int(w['top'] / scale_y) + cy
w['width'] = int(w['width'] / scale_x)
w['height'] = int(w['height'] / scale_y)
elif words:
for w in words:
w['left'] += cx
w['top'] += cy
else:
# Tesseract: upscale tiny crops for better recognition
if ocr_img is not None:
crop_slice = ocr_img[cy:cy + ch, cx:cx + cw]
upscaled = _ensure_minimum_crop_size(crop_slice)
up_h, up_w = upscaled.shape[:2]
tmp_region = PageRegion(type=col.type, x=0, y=0, width=up_w, height=up_h)
words = ocr_region(upscaled, tmp_region, lang=cell_lang, psm=psm)
# Remap word positions back to original image coordinates
if words and (up_w != cw or up_h != ch):
sx = cw / max(up_w, 1)
sy = ch / max(up_h, 1)
for w in words:
w['left'] = int(w['left'] * sx) + cx
w['top'] = int(w['top'] * sy) + cy
w['width'] = int(w['width'] * sx)
w['height'] = int(w['height'] * sy)
elif words:
for w in words:
w['left'] += cx
w['top'] += cy
else:
words = []
# Filter low-confidence words
if words:
words = [w for w in words if w.get('conf', 0) >= _MIN_WORD_CONF]
if words:
y_tol = max(15, ch)
text = _words_to_reading_order_text(words, y_tolerance_px=y_tol)
avg_conf = round(sum(w['conf'] for w in words) / len(words), 1)
logger.debug("_ocr_cell_crop R%02d_C%d: OCR raw text=%r conf=%.1f nwords=%d crop=%dx%d psm=%s engine=%s",
row_idx, col_idx, text, avg_conf, len(words), cw, ch, psm, engine_name)
else:
logger.debug("_ocr_cell_crop R%02d_C%d: OCR returned NO words (crop=%dx%d psm=%s engine=%s)",
row_idx, col_idx, cw, ch, psm, engine_name)
# --- PSM 7 fallback for still-empty Tesseract cells ---
if not text.strip() and engine_name == "tesseract" and ocr_img is not None:
crop_slice = ocr_img[cy:cy + ch, cx:cx + cw]
upscaled = _ensure_minimum_crop_size(crop_slice)
up_h, up_w = upscaled.shape[:2]
tmp_region = PageRegion(type=col.type, x=0, y=0, width=up_w, height=up_h)
psm7_words = ocr_region(upscaled, tmp_region, lang=cell_lang, psm=7)
if psm7_words:
psm7_words = [w for w in psm7_words if w.get('conf', 0) >= _MIN_WORD_CONF]
if psm7_words:
p7_text = _words_to_reading_order_text(psm7_words, y_tolerance_px=10)
if p7_text.strip():
text = p7_text
avg_conf = round(
sum(w['conf'] for w in psm7_words) / len(psm7_words), 1
)
used_engine = 'cell_crop_v2_psm7'
# Remap PSM7 word positions back to original image coords
if up_w != cw or up_h != ch:
sx = cw / max(up_w, 1)
sy = ch / max(up_h, 1)
for w in psm7_words:
w['left'] = int(w['left'] * sx) + cx
w['top'] = int(w['top'] * sy) + cy
w['width'] = int(w['width'] * sx)
w['height'] = int(w['height'] * sy)
else:
for w in psm7_words:
w['left'] += cx
w['top'] += cy
words = psm7_words
# --- Noise filter ---
if text.strip():
pre_filter = text
text = _clean_cell_text_lite(text)
if not text:
logger.debug("_ocr_cell_crop R%02d_C%d: _clean_cell_text_lite REMOVED %r",
row_idx, col_idx, pre_filter)
avg_conf = 0.0
result = dict(empty_cell)
result['text'] = text
result['confidence'] = avg_conf
result['ocr_engine'] = used_engine
# Store individual word bounding boxes (absolute image coordinates)
# for pixel-accurate overlay positioning in the frontend.
if words and text.strip():
result['word_boxes'] = [
{
'text': w.get('text', ''),
'left': w['left'],
'top': w['top'],
'width': w['width'],
'height': w['height'],
'conf': w.get('conf', 0),
}
for w in words
if w.get('text', '').strip()
]
return result
# Threshold: columns narrower than this (% of image width) use single-cell
# crop OCR instead of full-page word assignment.
#
# Broad columns (>= threshold): Full-page Tesseract word assignment.
# Better for multi-word content (sentences, IPA brackets, punctuation).
# Examples: EN vocabulary, DE translation, example sentences.
#
# Narrow columns (< threshold): Isolated cell-crop OCR.
# Prevents neighbour bleeding from adjacent broad columns.
# Examples: page_ref, marker, numbering columns.
#
# 15% was empirically validated across vocab table scans with 3-5 columns.
# Typical broad columns: 20-40% width. Typical narrow columns: 3-12% width.
# The 15% boundary cleanly separates the two groups.
_NARROW_COL_THRESHOLD_PCT = 15.0
def build_cell_grid_v2(
ocr_img: np.ndarray,
column_regions: List[PageRegion],
row_geometries: List[RowGeometry],
img_w: int,
img_h: int,
lang: str = "eng+deu",
ocr_engine: str = "auto",
img_bgr: Optional[np.ndarray] = None,
skip_heal_gaps: bool = False,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
"""Hybrid Grid: full-page OCR for broad columns, cell-crop for narrow ones.
Drop-in replacement for build_cell_grid() — same signature & return type.
Strategy:
- Broad columns (>15% image width): Use pre-assigned full-page Tesseract
words (from row.words). Handles IPA brackets, punctuation, sentence
continuity correctly.
- Narrow columns (<15% image width): Use isolated cell-crop OCR to prevent
neighbour bleeding from adjacent broad columns.
"""
engine_name = "tesseract"
if ocr_engine in ("trocr-printed", "trocr-handwritten", "lighton"):
engine_name = ocr_engine
elif ocr_engine == "rapid" and RAPIDOCR_AVAILABLE:
engine_name = "rapid"
logger.info(f"build_cell_grid_v2: using OCR engine '{engine_name}' (hybrid mode)")
# Filter to content rows only
content_rows = [r for r in row_geometries if r.row_type == 'content']
if not content_rows:
logger.warning("build_cell_grid_v2: no content rows found")
return [], []
# Filter phantom rows (word_count=0) and artifact rows
before = len(content_rows)
content_rows = [r for r in content_rows if r.word_count > 0]
skipped = before - len(content_rows)
if skipped > 0:
logger.info(f"build_cell_grid_v2: skipped {skipped} phantom rows (word_count=0)")
if not content_rows:
logger.warning("build_cell_grid_v2: no content rows with words found")
return [], []
before_art = len(content_rows)
content_rows = [r for r in content_rows if not _is_artifact_row(r)]
artifact_skipped = before_art - len(content_rows)
if artifact_skipped > 0:
logger.info(f"build_cell_grid_v2: skipped {artifact_skipped} artifact rows")
if not content_rows:
logger.warning("build_cell_grid_v2: no content rows after artifact filtering")
return [], []
# Filter columns
_skip_types = {'column_ignore', 'header', 'footer', 'margin_top',
'margin_bottom', 'margin_left', 'margin_right'}
relevant_cols = [c for c in column_regions if c.type not in _skip_types]
if not relevant_cols:
logger.warning("build_cell_grid_v2: no usable columns found")
return [], []
# Heal row gaps — use header/footer boundaries
content_rows.sort(key=lambda r: r.y)
header_rows = [r for r in row_geometries if r.row_type == 'header']
footer_rows = [r for r in row_geometries if r.row_type == 'footer']
if header_rows:
top_bound = max(r.y + r.height for r in header_rows)
else:
top_bound = content_rows[0].y
if footer_rows:
bottom_bound = min(r.y for r in footer_rows)
else:
bottom_bound = content_rows[-1].y + content_rows[-1].height
# skip_heal_gaps: When True, keep cell positions at their exact row geometry
# positions without expanding to fill gaps from removed rows. Useful for
# overlay rendering where pixel-precise positioning matters more than
# full-coverage OCR crops.
if not skip_heal_gaps:
_heal_row_gaps(content_rows, top_bound=top_bound, bottom_bound=bottom_bound)
relevant_cols.sort(key=lambda c: c.x)
columns_meta = [
{'index': ci, 'type': c.type, 'x': c.x, 'width': c.width}
for ci, c in enumerate(relevant_cols)
]
lang_map = {
'column_en': 'eng',
'column_de': 'deu',
'column_example': 'eng+deu',
}
# --- Classify columns as broad vs narrow ---
narrow_col_indices = set()
for ci, col in enumerate(relevant_cols):
col_pct = (col.width / img_w * 100) if img_w > 0 else 0
if col_pct < _NARROW_COL_THRESHOLD_PCT:
narrow_col_indices.add(ci)
broad_col_count = len(relevant_cols) - len(narrow_col_indices)
logger.info(f"build_cell_grid_v2: {broad_col_count} broad columns (full-page), "
f"{len(narrow_col_indices)} narrow columns (cell-crop)")
# --- Phase 1: Broad columns via full-page word assignment ---
cells: List[Dict[str, Any]] = []
for row_idx, row in enumerate(content_rows):
# Assign full-page words to columns for this row
col_words = _assign_row_words_to_columns(row, relevant_cols)
for col_idx, col in enumerate(relevant_cols):
if col_idx not in narrow_col_indices:
# BROAD column: use pre-assigned full-page words
words = col_words.get(col_idx, [])
# Filter low-confidence words
words = [w for w in words if w.get('conf', 0) >= _MIN_WORD_CONF]
# Single full-width column (box sub-session): preserve spacing
is_single_full_column = (
len(relevant_cols) == 1
and img_w > 0
and relevant_cols[0].width / img_w > 0.9
)
if words:
y_tol = max(15, row.height)
if is_single_full_column:
text = _words_to_spaced_text(words, y_tolerance_px=y_tol)
logger.info(f"R{row_idx:02d}: {len(words)} words, "
f"text={text!r:.100}")
else:
text = _words_to_reading_order_text(words, y_tolerance_px=y_tol)
avg_conf = round(sum(w['conf'] for w in words) / len(words), 1)
else:
text = ''
avg_conf = 0.0
if is_single_full_column:
logger.info(f"R{row_idx:02d}: 0 words (row has "
f"{row.word_count} total, y={row.y}..{row.y+row.height})")
# Apply noise filter — but NOT for single-column sub-sessions:
# 1. _clean_cell_text strips trailing non-alpha tokens (e.g. €0.50,
# £1, €2.50) which are valid content in box layouts.
# 2. _clean_cell_text joins tokens with single space, destroying
# the proportional spacing from _words_to_spaced_text.
if not is_single_full_column:
text = _clean_cell_text(text)
cell = {
'cell_id': f"R{row_idx:02d}_C{col_idx}",
'row_index': row_idx,
'col_index': col_idx,
'col_type': col.type,
'text': text,
'confidence': avg_conf,
'bbox_px': {
'x': col.x, 'y': row.y,
'w': col.width, 'h': row.height,
},
'bbox_pct': {
'x': round(col.x / img_w * 100, 2) if img_w else 0,
'y': round(row.y / img_h * 100, 2) if img_h else 0,
'w': round(col.width / img_w * 100, 2) if img_w else 0,
'h': round(row.height / img_h * 100, 2) if img_h else 0,
},
'ocr_engine': 'word_lookup',
'is_bold': False,
}
cells.append(cell)
# --- Phase 2: Narrow columns via cell-crop OCR (parallel) ---
narrow_tasks = []
for row_idx, row in enumerate(content_rows):
for col_idx, col in enumerate(relevant_cols):
if col_idx in narrow_col_indices:
narrow_tasks.append((row_idx, col_idx, row, col))
if narrow_tasks:
max_workers = 4 if engine_name == "tesseract" else 2
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {
pool.submit(
_ocr_cell_crop,
ri, ci, row, col,
ocr_img, img_bgr, img_w, img_h,
engine_name, lang, lang_map,
): (ri, ci)
for ri, ci, row, col in narrow_tasks
}
for future in as_completed(futures):
try:
cell = future.result()
cells.append(cell)
except Exception as e:
ri, ci = futures[future]
logger.error(f"build_cell_grid_v2: narrow cell R{ri:02d}_C{ci} failed: {e}")
# Sort cells by (row_index, col_index)
cells.sort(key=lambda c: (c['row_index'], c['col_index']))
# Remove all-empty rows
rows_with_text: set = set()
for cell in cells:
if cell['text'].strip():
rows_with_text.add(cell['row_index'])
before_filter = len(cells)
cells = [c for c in cells if c['row_index'] in rows_with_text]
empty_rows_removed = (before_filter - len(cells)) // max(len(relevant_cols), 1)
if empty_rows_removed > 0:
logger.info(f"build_cell_grid_v2: removed {empty_rows_removed} all-empty rows")
# Bold detection disabled: cell-level stroke-width analysis cannot
# distinguish bold from non-bold when cells contain mixed formatting
# (e.g. "cookie ['kuki]" — bold word + non-bold phonetics).
# TODO: word-level bold detection would require per-word bounding boxes.
logger.info(f"build_cell_grid_v2: {len(cells)} cells from "
f"{len(content_rows)} rows × {len(relevant_cols)} columns, "
f"engine={engine_name} (hybrid)")
return cells, columns_meta
def build_cell_grid_v2_streaming(
ocr_img: np.ndarray,
column_regions: List[PageRegion],
row_geometries: List[RowGeometry],
img_w: int,
img_h: int,
lang: str = "eng+deu",
ocr_engine: str = "auto",
img_bgr: Optional[np.ndarray] = None,
) -> Generator[Tuple[Dict[str, Any], List[Dict[str, Any]], int], None, None]:
"""Streaming variant of build_cell_grid_v2 — yields each cell as OCR'd.
Yields:
(cell_dict, columns_meta, total_cells)
"""
# Resolve engine — default to Tesseract for cell-first OCR.
# Tesseract excels at isolated text crops (binarized, upscaled).
# RapidOCR is optimized for full-page scene-text and produces artifacts
# on small cell crops (extra chars, missing punctuation, garbled IPA).
use_rapid = False
if ocr_engine in ("trocr-printed", "trocr-handwritten", "lighton"):
engine_name = ocr_engine
elif ocr_engine == "auto":
engine_name = "tesseract"
elif ocr_engine == "rapid":
if not RAPIDOCR_AVAILABLE:
logger.warning("RapidOCR requested but not available, falling back to Tesseract")
else:
use_rapid = True
engine_name = "rapid" if use_rapid else "tesseract"
else:
engine_name = "tesseract"
content_rows = [r for r in row_geometries if r.row_type == 'content']
if not content_rows:
return
content_rows = [r for r in content_rows if r.word_count > 0]
if not content_rows:
return
_skip_types = {'column_ignore', 'header', 'footer', 'margin_top',
'margin_bottom', 'margin_left', 'margin_right'}
relevant_cols = [c for c in column_regions if c.type not in _skip_types]
if not relevant_cols:
return
content_rows = [r for r in content_rows if not _is_artifact_row(r)]
if not content_rows:
return
# Use header/footer boundaries for heal_row_gaps (same as build_cell_grid_v2)
content_rows.sort(key=lambda r: r.y)
header_rows = [r for r in row_geometries if r.row_type == 'header']
footer_rows = [r for r in row_geometries if r.row_type == 'footer']
if header_rows:
top_bound = max(r.y + r.height for r in header_rows)
else:
top_bound = content_rows[0].y
if footer_rows:
bottom_bound = min(r.y for r in footer_rows)
else:
bottom_bound = content_rows[-1].y + content_rows[-1].height
_heal_row_gaps(content_rows, top_bound=top_bound, bottom_bound=bottom_bound)
relevant_cols.sort(key=lambda c: c.x)
columns_meta = [
{'index': ci, 'type': c.type, 'x': c.x, 'width': c.width}
for ci, c in enumerate(relevant_cols)
]
lang_map = {
'column_en': 'eng',
'column_de': 'deu',
'column_example': 'eng+deu',
}
total_cells = len(content_rows) * len(relevant_cols)
for row_idx, row in enumerate(content_rows):
for col_idx, col in enumerate(relevant_cols):
cell = _ocr_cell_crop(
row_idx, col_idx, row, col,
ocr_img, img_bgr, img_w, img_h,
engine_name, lang, lang_map,
)
yield cell, columns_meta, total_cells
# ---------------------------------------------------------------------------
# Narrow-column OCR helpers (Proposal B) — DEPRECATED (kept for legacy build_cell_grid)
# ---------------------------------------------------------------------------
def _compute_cell_padding(col_width: int, img_w: int) -> int:
"""Adaptive padding for OCR crops based on column width.
Narrow columns (page_ref, marker) need more surrounding context so
Tesseract can segment characters correctly. Wide columns keep the
minimal 4 px padding to avoid pulling in neighbours.
"""
col_pct = col_width / img_w * 100 if img_w > 0 else 100
if col_pct < 5:
return max(20, col_width // 2)
if col_pct < 10:
return max(12, col_width // 4)
if col_pct < 15:
return 8
return 4
def _ensure_minimum_crop_size(crop: np.ndarray, min_dim: int = 150,
max_scale: int = 3) -> np.ndarray:
"""Upscale tiny crops so Tesseract gets enough pixel data.
If either dimension is below *min_dim*, the crop is bicubic-upscaled
so the smallest dimension reaches *min_dim* (capped at *max_scale* ×).
"""
h, w = crop.shape[:2]
if h >= min_dim and w >= min_dim:
return crop
scale = min(max_scale, max(min_dim / max(h, 1), min_dim / max(w, 1)))
if scale <= 1.0:
return crop
new_w = int(w * scale)
new_h = int(h * scale)
return cv2.resize(crop, (new_w, new_h), interpolation=cv2.INTER_CUBIC)
def _select_psm_for_column(col_type: str, col_width: int,
row_height: int) -> int:
"""Choose the best Tesseract PSM for a given column geometry.
- page_ref columns are almost always single short tokens → PSM 8
- Very narrow or short cells → PSM 7 (single text line)
- Everything else → PSM 6 (uniform block)
"""
if col_type in ('page_ref', 'marker'):
return 8 # single word
if col_width < 100 or row_height < 30:
return 7 # single line
return 6 # uniform block
def _ocr_single_cell(
row_idx: int,
col_idx: int,
row: RowGeometry,
col: PageRegion,
ocr_img: np.ndarray,
img_bgr: Optional[np.ndarray],
img_w: int,
img_h: int,
use_rapid: bool,
engine_name: str,
lang: str,
lang_map: Dict[str, str],
preassigned_words: Optional[List[Dict]] = None,
) -> Dict[str, Any]:
"""Populate a single cell (column x row intersection) via word lookup."""
# Display bbox: exact column × row intersection (no padding)
disp_x = col.x
disp_y = row.y
disp_w = col.width
disp_h = row.height
# OCR crop: adaptive padding — narrow columns get more context
pad = _compute_cell_padding(col.width, img_w)
cell_x = max(0, col.x - pad)
cell_y = max(0, row.y - pad)
cell_w = min(col.width + 2 * pad, img_w - cell_x)
cell_h = min(row.height + 2 * pad, img_h - cell_y)
is_narrow = (col.width / img_w * 100) < 15 if img_w > 0 else False
if disp_w <= 0 or disp_h <= 0:
return {
'cell_id': f"R{row_idx:02d}_C{col_idx}",
'row_index': row_idx,
'col_index': col_idx,
'col_type': col.type,
'text': '',
'confidence': 0.0,
'bbox_px': {'x': col.x, 'y': row.y, 'w': col.width, 'h': row.height},
'bbox_pct': {
'x': round(col.x / img_w * 100, 2),
'y': round(row.y / img_h * 100, 2),
'w': round(col.width / img_w * 100, 2),
'h': round(row.height / img_h * 100, 2),
},
'ocr_engine': 'word_lookup',
}
# --- PRIMARY: Word-lookup from full-page Tesseract ---
words = preassigned_words if preassigned_words is not None else []
used_engine = 'word_lookup'
# Filter low-confidence words (OCR noise from images/artifacts).
# Tesseract gives low confidence to misread image edges, borders,
# and other non-text elements.
if words:
words = [w for w in words if w.get('conf', 0) >= _MIN_WORD_CONF]
if words:
# Use row height as Y-tolerance so all words within a single row
# are grouped onto one line (avoids splitting e.g. "Maus, Mäuse"
# across two lines due to slight vertical offset).
y_tol = max(15, row.height)
text = _words_to_reading_order_text(words, y_tolerance_px=y_tol)
avg_conf = round(sum(w['conf'] for w in words) / len(words), 1)
else:
text = ''
avg_conf = 0.0
# --- FALLBACK: Cell-OCR for empty cells ---
# Full-page Tesseract can miss small or isolated words (e.g. "Ei").
# Re-run OCR on the cell crop to catch what word-lookup missed.
# To avoid wasting time on truly empty cells, check pixel density first:
# only run Tesseract if the cell crop contains enough dark pixels to
# plausibly contain text.
_run_fallback = False
if not text.strip() and cell_w > 0 and cell_h > 0:
if ocr_img is not None:
crop = ocr_img[cell_y:cell_y + cell_h, cell_x:cell_x + cell_w]
if crop.size > 0:
# Threshold: pixels darker than 180 (on 0-255 grayscale).
# Use 0.5% to catch even small text like "Ei" (2 chars)
# in an otherwise empty cell.
dark_ratio = float(np.count_nonzero(crop < 180)) / crop.size
_run_fallback = dark_ratio > 0.005
if _run_fallback:
# For narrow columns, upscale the crop before OCR
if is_narrow and ocr_img is not None:
_crop_slice = ocr_img[cell_y:cell_y + cell_h, cell_x:cell_x + cell_w]
_upscaled = _ensure_minimum_crop_size(_crop_slice)
if _upscaled is not _crop_slice:
# Build a temporary full-size image with the upscaled crop
# placed at origin so ocr_region can crop it cleanly.
_up_h, _up_w = _upscaled.shape[:2]
_tmp_region = PageRegion(
type=col.type, x=0, y=0, width=_up_w, height=_up_h,
)
_cell_psm = _select_psm_for_column(col.type, col.width, row.height)
cell_lang = lang_map.get(col.type, lang)
fallback_words = ocr_region(_upscaled, _tmp_region,
lang=cell_lang, psm=_cell_psm)
# Remap word positions back to original image coordinates
_sx = cell_w / max(_up_w, 1)
_sy = cell_h / max(_up_h, 1)
for _fw in (fallback_words or []):
_fw['left'] = int(_fw['left'] * _sx) + cell_x
_fw['top'] = int(_fw['top'] * _sy) + cell_y
_fw['width'] = int(_fw['width'] * _sx)
_fw['height'] = int(_fw['height'] * _sy)
else:
# No upscaling needed, use adaptive PSM
cell_region = PageRegion(
type=col.type, x=cell_x, y=cell_y,
width=cell_w, height=cell_h,
)
_cell_psm = _select_psm_for_column(col.type, col.width, row.height)
cell_lang = lang_map.get(col.type, lang)
fallback_words = ocr_region(ocr_img, cell_region,
lang=cell_lang, psm=_cell_psm)
else:
cell_region = PageRegion(
type=col.type,
x=cell_x, y=cell_y,
width=cell_w, height=cell_h,
)
if engine_name in ("trocr-printed", "trocr-handwritten") and img_bgr is not None:
fallback_words = ocr_region_trocr(img_bgr, cell_region, handwritten=(engine_name == "trocr-handwritten"))
elif engine_name == "lighton" and img_bgr is not None:
fallback_words = ocr_region_lighton(img_bgr, cell_region)
elif use_rapid and img_bgr is not None:
fallback_words = ocr_region_rapid(img_bgr, cell_region)
else:
_cell_psm = _select_psm_for_column(col.type, col.width, row.height)
cell_lang = lang_map.get(col.type, lang)
fallback_words = ocr_region(ocr_img, cell_region,
lang=cell_lang, psm=_cell_psm)
if fallback_words:
# Apply same confidence filter to fallback words
fallback_words = [w for w in fallback_words if w.get('conf', 0) >= _MIN_WORD_CONF]
if fallback_words:
fb_avg_h = sum(w['height'] for w in fallback_words) / len(fallback_words)
fb_y_tol = max(10, int(fb_avg_h * 0.5))
fb_text = _words_to_reading_order_text(fallback_words, y_tolerance_px=fb_y_tol)
if fb_text.strip():
text = fb_text
avg_conf = round(
sum(w['conf'] for w in fallback_words) / len(fallback_words), 1
)
used_engine = 'cell_ocr_fallback'
# --- SECONDARY FALLBACK: PSM=7 (single line) for still-empty cells ---
if not text.strip() and _run_fallback and not use_rapid:
_fb_region = PageRegion(
type=col.type, x=cell_x, y=cell_y,
width=cell_w, height=cell_h,
)
cell_lang = lang_map.get(col.type, lang)
psm7_words = ocr_region(ocr_img, _fb_region, lang=cell_lang, psm=7)
if psm7_words:
psm7_words = [w for w in psm7_words if w.get('conf', 0) >= _MIN_WORD_CONF]
if psm7_words:
p7_text = _words_to_reading_order_text(psm7_words, y_tolerance_px=10)
if p7_text.strip():
text = p7_text
avg_conf = round(
sum(w['conf'] for w in psm7_words) / len(psm7_words), 1
)
used_engine = 'cell_ocr_psm7'
# --- TERTIARY FALLBACK: Row-strip re-OCR for narrow columns ---
# If a narrow cell is still empty, OCR the entire row strip with
# RapidOCR (which handles small text better) and assign words by
# X-position overlap with this column.
if not text.strip() and is_narrow and img_bgr is not None:
row_region = PageRegion(
type='_row_strip', x=0, y=row.y,
width=img_w, height=row.height,
)
strip_words = ocr_region_rapid(img_bgr, row_region)
if strip_words:
# Filter to words overlapping this column's X-range
col_left = col.x
col_right = col.x + col.width
col_words = []
for sw in strip_words:
sw_left = sw.get('left', 0)
sw_right = sw_left + sw.get('width', 0)
overlap = max(0, min(sw_right, col_right) - max(sw_left, col_left))
if overlap > sw.get('width', 1) * 0.3:
col_words.append(sw)
if col_words:
col_words = [w for w in col_words if w.get('conf', 0) >= _MIN_WORD_CONF]
if col_words:
rs_text = _words_to_reading_order_text(col_words, y_tolerance_px=row.height)
if rs_text.strip():
text = rs_text
avg_conf = round(
sum(w['conf'] for w in col_words) / len(col_words), 1
)
used_engine = 'row_strip_rapid'
# --- NOISE FILTER: clear cells that contain only OCR artifacts ---
if text.strip():
text = _clean_cell_text(text)
if not text:
avg_conf = 0.0
return {
'cell_id': f"R{row_idx:02d}_C{col_idx}",
'row_index': row_idx,
'col_index': col_idx,
'col_type': col.type,
'text': text,
'confidence': avg_conf,
'bbox_px': {'x': disp_x, 'y': disp_y, 'w': disp_w, 'h': disp_h},
'bbox_pct': {
'x': round(disp_x / img_w * 100, 2),
'y': round(disp_y / img_h * 100, 2),
'w': round(disp_w / img_w * 100, 2),
'h': round(disp_h / img_h * 100, 2),
},
'ocr_engine': used_engine,
}
def _is_artifact_row(row: RowGeometry) -> bool:
"""Return True if this row contains only scan artifacts, not real text.
Artifact rows (scanner shadows, noise) typically produce only single-character
detections. A real content row always has at least one token with 2+ characters.
"""
if row.word_count == 0:
return True
texts = [w.get('text', '').strip() for w in row.words]
return all(len(t) <= 1 for t in texts)
def _heal_row_gaps(
rows: List[RowGeometry],
top_bound: int,
bottom_bound: int,
) -> None:
"""Expand row y/height to fill vertical gaps caused by removed adjacent rows.
After filtering out empty or artifact rows, remaining content rows may have
gaps between them where the removed rows used to be. This function mutates
each row to extend upward/downward to the midpoint of such gaps so that
OCR crops cover the full available content area.
The first row always extends to top_bound; the last row to bottom_bound.
"""
if not rows:
return
rows.sort(key=lambda r: r.y)
n = len(rows)
orig = [(r.y, r.y + r.height) for r in rows] # snapshot before mutation
for i, row in enumerate(rows):
# New top: midpoint between previous row's bottom and this row's top
if i == 0:
new_top = top_bound
else:
prev_bot = orig[i - 1][1]
my_top = orig[i][0]
gap = my_top - prev_bot
new_top = prev_bot + gap // 2 if gap > 1 else my_top
# New bottom: midpoint between this row's bottom and next row's top
if i == n - 1:
new_bottom = bottom_bound
else:
my_bot = orig[i][1]
next_top = orig[i + 1][0]
gap = next_top - my_bot
new_bottom = my_bot + gap // 2 if gap > 1 else my_bot
row.y = new_top
row.height = max(5, new_bottom - new_top)
logger.debug(
f"_heal_row_gaps: {n} rows → y range [{rows[0].y}..{rows[-1].y + rows[-1].height}] "
f"(bounds: top={top_bound}, bottom={bottom_bound})"
)
def build_cell_grid(
ocr_img: np.ndarray,
column_regions: List[PageRegion],
row_geometries: List[RowGeometry],
img_w: int,
img_h: int,
lang: str = "eng+deu",
ocr_engine: str = "auto",
img_bgr: Optional[np.ndarray] = None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
"""Generic Cell-Grid: Columns × Rows → cells with OCR text.
This is the layout-agnostic foundation. Every column (except column_ignore)
is intersected with every content row to produce numbered cells.
Args:
ocr_img: Binarized full-page image (for Tesseract).
column_regions: Classified columns from Step 3 (PageRegion list).
row_geometries: Rows from Step 4 (RowGeometry list).
img_w: Image width in pixels.
img_h: Image height in pixels.
lang: Default Tesseract language.
ocr_engine: 'tesseract', 'rapid', 'auto', 'trocr-printed', 'trocr-handwritten', or 'lighton'.
img_bgr: BGR color image (required for RapidOCR / TrOCR / LightOnOCR).
Returns:
(cells, columns_meta) where cells is a list of cell dicts and
columns_meta describes the columns used.
"""
# Resolve engine choice
use_rapid = False
if ocr_engine in ("trocr-printed", "trocr-handwritten", "lighton"):
engine_name = ocr_engine
elif ocr_engine == "auto":
use_rapid = RAPIDOCR_AVAILABLE and img_bgr is not None
engine_name = "rapid" if use_rapid else "tesseract"
elif ocr_engine == "rapid":
if not RAPIDOCR_AVAILABLE:
logger.warning("RapidOCR requested but not available, falling back to Tesseract")
else:
use_rapid = True
engine_name = "rapid" if use_rapid else "tesseract"
else:
engine_name = "tesseract"
logger.info(f"build_cell_grid: using OCR engine '{engine_name}'")
# Filter to content rows only (skip header/footer)
content_rows = [r for r in row_geometries if r.row_type == 'content']
if not content_rows:
logger.warning("build_cell_grid: no content rows found")
return [], []
# Filter phantom rows: rows with no Tesseract words assigned are
# inter-line whitespace gaps that would produce garbage OCR.
before = len(content_rows)
content_rows = [r for r in content_rows if r.word_count > 0]
skipped = before - len(content_rows)
if skipped > 0:
logger.info(f"build_cell_grid: skipped {skipped} phantom rows (word_count=0)")
if not content_rows:
logger.warning("build_cell_grid: no content rows with words found")
return [], []
# Use columns only — skip ignore, header, footer, page_ref
_skip_types = {'column_ignore', 'header', 'footer', 'margin_top', 'margin_bottom', 'margin_left', 'margin_right'}
relevant_cols = [c for c in column_regions if c.type not in _skip_types]
if not relevant_cols:
logger.warning("build_cell_grid: no usable columns found")
return [], []
# Filter artifact rows: rows whose detected words are all single characters
# are caused by scanner shadows or noise, not real text.
before_art = len(content_rows)
content_rows = [r for r in content_rows if not _is_artifact_row(r)]
artifact_skipped = before_art - len(content_rows)
if artifact_skipped > 0:
logger.info(f"build_cell_grid: skipped {artifact_skipped} artifact rows (all single-char words)")
if not content_rows:
logger.warning("build_cell_grid: no content rows after artifact filtering")
return [], []
# Heal row gaps: rows removed above leave vertical gaps; expand adjacent rows
# to fill the space so OCR crops are not artificially narrow.
_heal_row_gaps(
content_rows,
top_bound=min(c.y for c in relevant_cols),
bottom_bound=max(c.y + c.height for c in relevant_cols),
)
# Sort columns left-to-right
relevant_cols.sort(key=lambda c: c.x)
# Build columns_meta
columns_meta = [
{
'index': col_idx,
'type': col.type,
'x': col.x,
'width': col.width,
}
for col_idx, col in enumerate(relevant_cols)
]
# Choose OCR language per column type (Tesseract only)
lang_map = {
'column_en': 'eng',
'column_de': 'deu',
'column_example': 'eng+deu',
}
cells: List[Dict[str, Any]] = []
for row_idx, row in enumerate(content_rows):
# Pre-assign each word to exactly one column (nearest center)
col_words = _assign_row_words_to_columns(row, relevant_cols)
for col_idx, col in enumerate(relevant_cols):
cell = _ocr_single_cell(
row_idx, col_idx, row, col,
ocr_img, img_bgr, img_w, img_h,
use_rapid, engine_name, lang, lang_map,
preassigned_words=col_words[col_idx],
)
cells.append(cell)
# --- BATCH FALLBACK: re-OCR empty cells by column strip ---
# Collect cells that are still empty but have visible pixels.
# Instead of calling Tesseract once per cell (expensive), crop an entire
# column strip and run OCR once, then assign words to cells by Y position.
empty_by_col: Dict[int, List[int]] = {} # col_idx → [cell list indices]
for ci, cell in enumerate(cells):
if not cell['text'].strip() and cell.get('ocr_engine') != 'cell_ocr_psm7':
bpx = cell['bbox_px']
x, y, w, h = bpx['x'], bpx['y'], bpx['w'], bpx['h']
if w > 0 and h > 0 and ocr_img is not None:
crop = ocr_img[y:y + h, x:x + w]
if crop.size > 0:
dark_ratio = float(np.count_nonzero(crop < 180)) / crop.size
if dark_ratio > 0.005:
empty_by_col.setdefault(cell['col_index'], []).append(ci)
for col_idx, cell_indices in empty_by_col.items():
if len(cell_indices) < 3:
continue # Not worth batching for < 3 cells
# Find the column strip bounding box (union of all empty cell bboxes)
min_y = min(cells[ci]['bbox_px']['y'] for ci in cell_indices)
max_y_h = max(cells[ci]['bbox_px']['y'] + cells[ci]['bbox_px']['h'] for ci in cell_indices)
col_x = cells[cell_indices[0]]['bbox_px']['x']
col_w = cells[cell_indices[0]]['bbox_px']['w']
strip_region = PageRegion(
type=relevant_cols[col_idx].type,
x=col_x, y=min_y,
width=col_w, height=max_y_h - min_y,
)
strip_lang = lang_map.get(relevant_cols[col_idx].type, lang)
if engine_name in ("trocr-printed", "trocr-handwritten") and img_bgr is not None:
strip_words = ocr_region_trocr(img_bgr, strip_region, handwritten=(engine_name == "trocr-handwritten"))
elif engine_name == "lighton" and img_bgr is not None:
strip_words = ocr_region_lighton(img_bgr, strip_region)
elif use_rapid and img_bgr is not None:
strip_words = ocr_region_rapid(img_bgr, strip_region)
else:
strip_words = ocr_region(ocr_img, strip_region, lang=strip_lang, psm=6)
if not strip_words:
continue
strip_words = [w for w in strip_words if w.get('conf', 0) >= 30]
if not strip_words:
continue
# Assign words to cells by Y overlap
for ci in cell_indices:
cell_y = cells[ci]['bbox_px']['y']
cell_h = cells[ci]['bbox_px']['h']
cell_mid_y = cell_y + cell_h / 2
matched_words = [
w for w in strip_words
if abs((w['top'] + w['height'] / 2) - cell_mid_y) < cell_h * 0.8
]
if matched_words:
matched_words.sort(key=lambda w: w['left'])
batch_text = ' '.join(w['text'] for w in matched_words)
batch_text = _clean_cell_text(batch_text)
if batch_text.strip():
cells[ci]['text'] = batch_text
cells[ci]['confidence'] = round(
sum(w['conf'] for w in matched_words) / len(matched_words), 1
)
cells[ci]['ocr_engine'] = 'batch_column_ocr'
batch_filled = sum(1 for ci in cell_indices if cells[ci]['text'].strip())
if batch_filled > 0:
logger.info(
f"build_cell_grid: batch OCR filled {batch_filled}/{len(cell_indices)} "
f"empty cells in column {col_idx}"
)
# Post-OCR: remove rows where ALL cells are empty (inter-row gaps
# that had stray Tesseract artifacts giving word_count > 0).
rows_with_text: set = set()
for cell in cells:
if cell['text'].strip():
rows_with_text.add(cell['row_index'])
before_filter = len(cells)
cells = [c for c in cells if c['row_index'] in rows_with_text]
empty_rows_removed = (before_filter - len(cells)) // max(len(relevant_cols), 1)
if empty_rows_removed > 0:
logger.info(f"build_cell_grid: removed {empty_rows_removed} all-empty rows after OCR")
logger.info(f"build_cell_grid: {len(cells)} cells from "
f"{len(content_rows)} rows × {len(relevant_cols)} columns, "
f"engine={engine_name}")
return cells, columns_meta
def build_cell_grid_streaming(
ocr_img: np.ndarray,
column_regions: List[PageRegion],
row_geometries: List[RowGeometry],
img_w: int,
img_h: int,
lang: str = "eng+deu",
ocr_engine: str = "auto",
img_bgr: Optional[np.ndarray] = None,
) -> Generator[Tuple[Dict[str, Any], List[Dict[str, Any]], int], None, None]:
"""Like build_cell_grid(), but yields each cell as it is OCR'd.
Yields:
(cell_dict, columns_meta, total_cells) for each cell.
"""
# Resolve engine choice (same as build_cell_grid)
use_rapid = False
if ocr_engine in ("trocr-printed", "trocr-handwritten", "lighton"):
engine_name = ocr_engine
elif ocr_engine == "auto":
use_rapid = RAPIDOCR_AVAILABLE and img_bgr is not None
engine_name = "rapid" if use_rapid else "tesseract"
elif ocr_engine == "rapid":
if not RAPIDOCR_AVAILABLE:
logger.warning("RapidOCR requested but not available, falling back to Tesseract")
else:
use_rapid = True
engine_name = "rapid" if use_rapid else "tesseract"
else:
engine_name = "tesseract"
content_rows = [r for r in row_geometries if r.row_type == 'content']
if not content_rows:
return
# Filter phantom rows: rows with no Tesseract words assigned are
# inter-line whitespace gaps that would produce garbage OCR.
before = len(content_rows)
content_rows = [r for r in content_rows if r.word_count > 0]
skipped = before - len(content_rows)
if skipped > 0:
logger.info(f"build_cell_grid_streaming: skipped {skipped} phantom rows (word_count=0)")
if not content_rows:
return
_skip_types = {'column_ignore', 'header', 'footer', 'margin_top', 'margin_bottom', 'margin_left', 'margin_right'}
relevant_cols = [c for c in column_regions if c.type not in _skip_types]
if not relevant_cols:
return
# Filter artifact rows + heal gaps (same logic as build_cell_grid)
before_art = len(content_rows)
content_rows = [r for r in content_rows if not _is_artifact_row(r)]
artifact_skipped = before_art - len(content_rows)
if artifact_skipped > 0:
logger.info(f"build_cell_grid_streaming: skipped {artifact_skipped} artifact rows")
if not content_rows:
return
_heal_row_gaps(
content_rows,
top_bound=min(c.y for c in relevant_cols),
bottom_bound=max(c.y + c.height for c in relevant_cols),
)
relevant_cols.sort(key=lambda c: c.x)
columns_meta = [
{
'index': col_idx,
'type': col.type,
'x': col.x,
'width': col.width,
}
for col_idx, col in enumerate(relevant_cols)
]
lang_map = {
'column_en': 'eng',
'column_de': 'deu',
'column_example': 'eng+deu',
}
total_cells = len(content_rows) * len(relevant_cols)
for row_idx, row in enumerate(content_rows):
# Pre-assign each word to exactly one column (nearest center)
col_words = _assign_row_words_to_columns(row, relevant_cols)
for col_idx, col in enumerate(relevant_cols):
cell = _ocr_single_cell(
row_idx, col_idx, row, col,
ocr_img, img_bgr, img_w, img_h,
use_rapid, engine_name, lang, lang_map,
preassigned_words=col_words[col_idx],
)
yield cell, columns_meta, total_cells
def _cells_to_vocab_entries(
cells: List[Dict[str, Any]],
columns_meta: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Map generic cells to vocab entries with english/german/example fields.
Groups cells by row_index, maps col_type → field name, and produces
one entry per row (only rows with at least one non-empty field).
"""
# Determine image dimensions from first cell (for row-level bbox)
col_type_to_field = {
'column_en': 'english',
'column_de': 'german',
'column_example': 'example',
'page_ref': 'source_page',
'column_marker': 'marker',
'column_text': 'text', # generic single-column (box sub-sessions)
}
bbox_key_map = {
'column_en': 'bbox_en',
'column_de': 'bbox_de',
'column_example': 'bbox_ex',
'page_ref': 'bbox_ref',
'column_marker': 'bbox_marker',
'column_text': 'bbox_text',
}
# Group cells by row_index
rows: Dict[int, List[Dict]] = {}
for cell in cells:
ri = cell['row_index']
rows.setdefault(ri, []).append(cell)
entries: List[Dict[str, Any]] = []
for row_idx in sorted(rows.keys()):
row_cells = rows[row_idx]
entry: Dict[str, Any] = {
'row_index': row_idx,
'english': '',
'german': '',
'example': '',
'text': '', # generic single-column (box sub-sessions)
'source_page': '',
'marker': '',
'confidence': 0.0,
'bbox': None,
'bbox_en': None,
'bbox_de': None,
'bbox_ex': None,
'bbox_ref': None,
'bbox_marker': None,
'bbox_text': None,
'ocr_engine': row_cells[0].get('ocr_engine', '') if row_cells else '',
}
confidences = []
for cell in row_cells:
col_type = cell['col_type']
field = col_type_to_field.get(col_type)
if field:
entry[field] = cell['text']
bbox_field = bbox_key_map.get(col_type)
if bbox_field:
entry[bbox_field] = cell['bbox_pct']
if cell['confidence'] > 0:
confidences.append(cell['confidence'])
# Compute row-level bbox as union of all cell bboxes
all_bboxes = [c['bbox_pct'] for c in row_cells if c.get('bbox_pct')]
if all_bboxes:
min_x = min(b['x'] for b in all_bboxes)
min_y = min(b['y'] for b in all_bboxes)
max_x2 = max(b['x'] + b['w'] for b in all_bboxes)
max_y2 = max(b['y'] + b['h'] for b in all_bboxes)
entry['bbox'] = {
'x': round(min_x, 2),
'y': round(min_y, 2),
'w': round(max_x2 - min_x, 2),
'h': round(max_y2 - min_y, 2),
}
entry['confidence'] = round(
sum(confidences) / len(confidences), 1
) if confidences else 0.0
# Only include if at least one mapped field has text
has_content = any(
entry.get(f)
for f in col_type_to_field.values()
)
if has_content:
entries.append(entry)
return entries
# Regex: line starts with phonetic bracket content only (no real word before it)
_PHONETIC_ONLY_RE = re.compile(
r'''^\s*[\[\('"]*[^\]]*[\])\s]*$'''
)
def _is_phonetic_only_text(text: str) -> bool:
"""Check if text consists only of phonetic transcription.
Phonetic-only patterns:
['mani serva] → True
[dɑːns] → True
["a:mand] → True
almond ['a:mand] → False (has real word before bracket)
Mandel → False
"""
t = text.strip()
if not t:
return False
# Must contain at least one bracket
if '[' not in t and ']' not in t:
return False
# Remove all bracket content and surrounding punctuation/whitespace
without_brackets = re.sub(r"\[.*?\]", '', t)
without_brackets = re.sub(r"[\[\]'\"()\s]", '', without_brackets)
# If nothing meaningful remains, it's phonetic-only
alpha_remaining = ''.join(_RE_ALPHA.findall(without_brackets))
return len(alpha_remaining) < 2
def _merge_phonetic_continuation_rows(
entries: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Merge rows that contain only phonetic transcription into previous entry.
In dictionary pages, phonetic transcription sometimes wraps to the next
row. E.g.:
Row 28: EN="it's a money-saver" DE="es spart Kosten"
Row 29: EN="['mani serva]" DE=""
Row 29 is phonetic-only → merge into row 28's EN field.
"""
if len(entries) < 2:
return entries
merged: List[Dict[str, Any]] = []
for entry in entries:
en = (entry.get('english') or '').strip()
de = (entry.get('german') or '').strip()
ex = (entry.get('example') or '').strip()
# Check if this entry is phonetic-only (EN has only phonetics, DE empty)
if merged and _is_phonetic_only_text(en) and not de:
prev = merged[-1]
prev_en = (prev.get('english') or '').strip()
# Append phonetic to previous entry's EN
if prev_en:
prev['english'] = prev_en + ' ' + en
else:
prev['english'] = en
# If there was an example, append to previous too
if ex:
prev_ex = (prev.get('example') or '').strip()
prev['example'] = (prev_ex + ' ' + ex).strip() if prev_ex else ex
logger.debug(
f"Merged phonetic row {entry.get('row_index')} "
f"into previous entry: {prev['english']!r}"
)
continue
merged.append(entry)
return merged
def _merge_continuation_rows(
entries: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Merge multi-line vocabulary entries where text wraps to the next row.
A row is a continuation of the previous entry when:
- EN has text, but DE is empty
- EN starts with a lowercase letter (not a new vocab entry)
- Previous entry's EN does NOT end with a sentence terminator (.!?)
- The continuation text has fewer than 4 words (not an example sentence)
- The row was not already merged as phonetic
Example:
Row 5: EN="to put up" DE="aufstellen"
Row 6: EN="with sth." DE=""
→ Merged: EN="to put up with sth." DE="aufstellen"
"""
if len(entries) < 2:
return entries
merged: List[Dict[str, Any]] = []
for entry in entries:
en = (entry.get('english') or '').strip()
de = (entry.get('german') or '').strip()
if merged and en and not de:
# Check: not phonetic (already handled)
if _is_phonetic_only_text(en):
merged.append(entry)
continue
# Check: starts with lowercase
first_alpha = next((c for c in en if c.isalpha()), '')
starts_lower = first_alpha and first_alpha.islower()
# Check: fewer than 4 words (not an example sentence)
word_count = len(en.split())
is_short = word_count < 4
# Check: previous entry doesn't end with sentence terminator
prev = merged[-1]
prev_en = (prev.get('english') or '').strip()
prev_ends_sentence = prev_en and prev_en[-1] in '.!?'
if starts_lower and is_short and not prev_ends_sentence:
# Merge into previous entry
prev['english'] = (prev_en + ' ' + en).strip()
# Merge example if present
ex = (entry.get('example') or '').strip()
if ex:
prev_ex = (prev.get('example') or '').strip()
prev['example'] = (prev_ex + ' ' + ex).strip() if prev_ex else ex
logger.debug(
f"Merged continuation row {entry.get('row_index')} "
f"into previous entry: {prev['english']!r}"
)
continue
merged.append(entry)
return merged
def build_word_grid(
ocr_img: np.ndarray,
column_regions: List[PageRegion],
row_geometries: List[RowGeometry],
img_w: int,
img_h: int,
lang: str = "eng+deu",
ocr_engine: str = "auto",
img_bgr: Optional[np.ndarray] = None,
pronunciation: str = "british",
) -> List[Dict[str, Any]]:
"""Vocab-specific: Cell-Grid + Vocab-Mapping + Post-Processing.
Wrapper around build_cell_grid() that adds vocabulary-specific logic:
- Maps cells to english/german/example entries
- Applies character confusion fixes, IPA lookup, comma splitting, etc.
- Falls back to returning raw cells if no vocab columns detected.
Args:
ocr_img: Binarized full-page image (for Tesseract).
column_regions: Classified columns from Step 3.
row_geometries: Rows from Step 4.
img_w, img_h: Image dimensions.
lang: Default Tesseract language.
ocr_engine: 'tesseract', 'rapid', or 'auto'.
img_bgr: BGR color image (required for RapidOCR).
pronunciation: 'british' or 'american' for IPA lookup.
Returns:
List of entry dicts with english/german/example text and bbox info (percent).
"""
cells, columns_meta = build_cell_grid(
ocr_img, column_regions, row_geometries, img_w, img_h,
lang=lang, ocr_engine=ocr_engine, img_bgr=img_bgr,
)
if not cells:
return []
# Check if vocab layout is present
col_types = {c['type'] for c in columns_meta}
if not (col_types & {'column_en', 'column_de'}):
logger.info("build_word_grid: no vocab columns — returning raw cells")
return cells
# Vocab mapping: cells → entries
entries = _cells_to_vocab_entries(cells, columns_meta)
# --- Post-processing pipeline (deterministic, no LLM) ---
n_raw = len(entries)
# 0a. Merge phonetic-only continuation rows into previous entry
entries = _merge_phonetic_continuation_rows(entries)
# 0b. Merge multi-line continuation rows (lowercase EN, empty DE)
entries = _merge_continuation_rows(entries)
# 1. Character confusion (| → I, 1 → I, 8 → B) is now run in
# llm_review_entries_streaming so changes are visible to the user in Step 6.
# 2. Replace OCR'd phonetics with dictionary IPA
entries = _fix_phonetic_brackets(entries, pronunciation=pronunciation)
# 3. Split comma-separated word forms (break, broke, broken → 3 entries)
entries = _split_comma_entries(entries)
# 4. Attach example sentences (rows without DE → examples for preceding entry)
entries = _attach_example_sentences(entries)
engine_name = cells[0].get('ocr_engine', 'unknown') if cells else 'unknown'
logger.info(f"build_word_grid: {len(entries)} entries from "
f"{n_raw} raw → {len(entries)} after post-processing "
f"(engine={engine_name})")
return entries