fix(ocr-pipeline): exclusive word-to-column assignment prevents duplicates

Replace per-cell word filtering (which allowed the same word to appear in
multiple columns due to padded overlap) with exclusive nearest-center
assignment. Each word is assigned to exactly one column per row.

Also use row height (with a 15 px minimum) as the Y-tolerance for text
assembly so words within the same row (e.g. "Maus, Mäuse") are always
grouped on one line.

Fixes: words leaking into wrong columns, missing words, duplicate words.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-02 07:54:45 +01:00
parent 9bbde1c03e
commit 2c4160e4c4

View File

@@ -3009,46 +3009,52 @@ def _replace_phonetics_in_text(text: str, pronunciation: str = 'british') -> str
return _PHONETIC_BRACKET_RE.sub(replacer, text)
def _lookup_cell_words(
def _assign_row_words_to_columns(
row: RowGeometry,
col: PageRegion,
pad: int = 8,
) -> Tuple[List[Dict], float]:
"""Look up pre-existing Tesseract words that fall within a cell region.
columns: List[PageRegion],
) -> Dict[int, List[Dict]]:
"""Assign each word in a row to exactly one column (nearest center).
Instead of re-running OCR on a cell crop, this filters the full-page
Tesseract words (stored in row.words) by X-overlap with the column.
This prevents the same word from appearing in multiple cells when column
boundaries are close together. Each word is assigned to the column whose
horizontal center is closest to the word's horizontal center.
Words use coordinates relative to the content ROI; columns use absolute
coordinates. row.x equals the content-ROI left_x, so we convert with:
col_left_rel = col.x - row.x
Args:
row: Row with words (relative coordinates).
columns: Sorted list of columns (absolute coordinates).
Returns:
(words_in_cell, avg_confidence) where words_in_cell keep their
original relative coordinates (compatible with
_words_to_reading_order_text).
Dict mapping col_index → list of words assigned to that column.
"""
if not row.words:
return [], 0.0
result: Dict[int, List[Dict]] = {i: [] for i in range(len(columns))}
left_x = row.x # content ROI offset (absolute)
col_left_rel = col.x - left_x - pad
col_right_rel = col.x - left_x + col.width + pad
if not row.words or not columns:
return result
left_x = row.x # content ROI left (absolute)
# Pre-compute column centers in relative coordinates
col_centers_rel = []
for col in columns:
col_left_rel = col.x - left_x
col_center_rel = col_left_rel + col.width / 2
col_centers_rel.append(col_center_rel)
words_in_cell = []
for w in row.words:
w_left = w['left']
w_right = w_left + w['width']
# Word center must be within column bounds
w_center_x = (w_left + w_right) / 2
if col_left_rel <= w_center_x <= col_right_rel:
words_in_cell.append(w)
w_center_x = w['left'] + w['width'] / 2
avg_conf = 0.0
if words_in_cell:
avg_conf = round(sum(w['conf'] for w in words_in_cell) / len(words_in_cell), 1)
# Find nearest column by center distance
best_col = 0
best_dist = abs(w_center_x - col_centers_rel[0])
for ci in range(1, len(columns)):
dist = abs(w_center_x - col_centers_rel[ci])
if dist < best_dist:
best_dist = dist
best_col = ci
return words_in_cell, avg_conf
result[best_col].append(w)
return result
def _ocr_single_cell(
@@ -3064,6 +3070,7 @@ def _ocr_single_cell(
engine_name: str,
lang: str,
lang_map: Dict[str, str],
preassigned_words: Optional[List[Dict]] = None,
) -> Dict[str, Any]:
"""Populate a single cell (column x row intersection) via word lookup."""
pad = 8 # pixels
@@ -3096,19 +3103,21 @@ def _ocr_single_cell(
'ocr_engine': 'word_lookup',
}
# --- PRIMARY: Word-lookup from full-page Tesseract ---
# Use pre-existing words from row.words (Step 4) instead of
# re-running OCR on a small crop. This is more reliable because
# full-page Tesseract has better context for recognition.
words, avg_conf = _lookup_cell_words(row, col, pad=pad)
# Use pre-assigned words (exclusive per column) if provided
words = preassigned_words if preassigned_words is not None else []
if words:
avg_h = sum(w['height'] for w in words) / len(words)
y_tol = max(10, int(avg_h * 0.5))
# Use row height as Y-tolerance so all words within a single row
# are grouped onto one line (avoids splitting e.g. "Maus, Mäuse"
# across two lines due to slight vertical offset).
y_tol = max(15, row.height)
text = _words_to_reading_order_text(words, y_tolerance_px=y_tol)
else:
text = ''
avg_conf = 0.0
avg_conf = 0.0
if words:
avg_conf = round(sum(w['conf'] for w in words) / len(words), 1)
return {
'cell_id': f"R{row_idx:02d}_C{col_idx}",
@@ -3218,11 +3227,14 @@ def build_cell_grid(
cells: List[Dict[str, Any]] = []
for row_idx, row in enumerate(content_rows):
# Pre-assign each word to exactly one column (nearest center)
col_words = _assign_row_words_to_columns(row, relevant_cols)
for col_idx, col in enumerate(relevant_cols):
cell = _ocr_single_cell(
row_idx, col_idx, row, col,
ocr_img, img_bgr, img_w, img_h,
use_rapid, engine_name, lang, lang_map,
preassigned_words=col_words[col_idx],
)
cells.append(cell)
@@ -3300,11 +3312,14 @@ def build_cell_grid_streaming(
total_cells = len(content_rows) * len(relevant_cols)
for row_idx, row in enumerate(content_rows):
# Pre-assign each word to exactly one column (nearest center)
col_words = _assign_row_words_to_columns(row, relevant_cols)
for col_idx, col in enumerate(relevant_cols):
cell = _ocr_single_cell(
row_idx, col_idx, row, col,
ocr_img, img_bgr, img_w, img_h,
use_rapid, engine_name, lang, lang_map,
preassigned_words=col_words[col_idx],
)
yield cell, columns_meta, total_cells