feat(ocr-pipeline): generic sub-column detection via left-edge clustering

Detects hidden sub-columns (e.g. page references like "p.59") within
already-recognized columns by clustering word left-edge positions and
splitting when a clear minority cluster exists. The sub-column is then
classified as page_ref and mapped to VocabRow.source_page.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-02 18:18:02 +01:00
parent 0532b2a797
commit 1a246eb059
3 changed files with 343 additions and 2 deletions

View File

@@ -140,6 +140,7 @@ class VocabRow:
english: str = ""
german: str = ""
example: str = ""
source_page: str = ""
confidence: float = 0.0
y_position: int = 0
@@ -1033,6 +1034,147 @@ def _detect_columns_by_clustering(
)
def _detect_sub_columns(
geometries: List[ColumnGeometry],
content_w: int,
) -> List[ColumnGeometry]:
"""Split columns that contain internal sub-columns based on left-edge clustering.
Detects cases where a minority of words in a column are left-aligned at a
different position than the majority (e.g. page references "p.59" next to
vocabulary words).
Returns a new list of ColumnGeometry — potentially longer than the input.
"""
if content_w <= 0:
return geometries
result: List[ColumnGeometry] = []
for geo in geometries:
# Only consider wide-enough columns with enough words
if geo.width_ratio < 0.15 or geo.word_count < 5:
result.append(geo)
continue
# Collect left-edges of confident words
left_edges: List[int] = []
for w in geo.words:
if w.get('conf', 0) >= 30:
left_edges.append(w['left'])
if len(left_edges) < 3:
result.append(geo)
continue
# Sort and find the largest gap between consecutive left-edge values
sorted_edges = sorted(left_edges)
best_gap = 0
best_gap_pos = 0 # split point: values <= best_gap_pos go left
for i in range(len(sorted_edges) - 1):
gap = sorted_edges[i + 1] - sorted_edges[i]
if gap > best_gap:
best_gap = gap
best_gap_pos = (sorted_edges[i] + sorted_edges[i + 1]) // 2
# Gap must be significant relative to column width
min_gap = max(15, int(geo.width * 0.08))
if best_gap < min_gap:
result.append(geo)
continue
# Split words into left (minority candidate) and right groups
left_words = [w for w in geo.words if w.get('conf', 0) >= 30 and w['left'] <= best_gap_pos]
right_words = [w for w in geo.words if w.get('conf', 0) >= 30 and w['left'] > best_gap_pos]
# Also include low-conf words by position
for w in geo.words:
if w.get('conf', 0) < 30:
if w['left'] <= best_gap_pos:
left_words.append(w)
else:
right_words.append(w)
total = len(left_words) + len(right_words)
if total == 0:
result.append(geo)
continue
# Determine minority/majority
if len(left_words) <= len(right_words):
minority, majority = left_words, right_words
minority_is_left = True
else:
minority, majority = right_words, left_words
minority_is_left = False
# Check minority constraints
minority_ratio = len(minority) / total
if minority_ratio >= 0.35 or len(minority) < 2:
result.append(geo)
continue
# Build two sub-column geometries
if minority_is_left:
# Minority is left sub-column, majority is right
sub_x = geo.x
sub_width = best_gap_pos - geo.x
main_x = best_gap_pos
main_width = (geo.x + geo.width) - best_gap_pos
else:
# Minority is right sub-column, majority is left
main_x = geo.x
main_width = best_gap_pos - geo.x
sub_x = best_gap_pos
sub_width = (geo.x + geo.width) - best_gap_pos
# Sanity check widths
if sub_width <= 0 or main_width <= 0:
result.append(geo)
continue
sub_geo = ColumnGeometry(
index=0, # will be re-indexed below
x=sub_x,
y=geo.y,
width=sub_width,
height=geo.height,
word_count=len(minority),
words=minority,
width_ratio=sub_width / content_w if content_w > 0 else 0.0,
)
main_geo = ColumnGeometry(
index=0, # will be re-indexed below
x=main_x,
y=geo.y,
width=main_width,
height=geo.height,
word_count=len(majority),
words=majority,
width_ratio=main_width / content_w if content_w > 0 else 0.0,
)
# Insert in left-to-right order
if sub_x < main_x:
result.append(sub_geo)
result.append(main_geo)
else:
result.append(main_geo)
result.append(sub_geo)
logger.info(
f"SubColumnSplit: column idx={geo.index} split at gap={best_gap}px, "
f"minority={len(minority)} words (left={minority_is_left}), "
f"majority={len(majority)} words"
)
# Re-index by left-to-right order
result.sort(key=lambda g: g.x)
for i, g in enumerate(result):
g.index = i
return result
def _build_geometries_from_starts(
col_starts: List[Tuple[int, int]],
word_dicts: List[Dict],
@@ -2727,6 +2869,9 @@ def analyze_layout_by_words(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Li
geometries, left_x, right_x, top_y, bottom_y, _word_dicts, _inv = result
content_w = right_x - left_x
# Split sub-columns (e.g. page references) before classification
geometries = _detect_sub_columns(geometries, content_w)
# Phase B: Content-based classification
regions = classify_column_types(geometries, content_w, top_y, w, h, bottom_y,
left_x=left_x, right_x=right_x, inv=_inv)
@@ -3841,7 +3986,7 @@ def build_cell_grid(
return [], []
# Use columns only — skip ignore, header, footer, page_ref
_skip_types = {'column_ignore', 'header', 'footer', 'margin_top', 'margin_bottom', 'page_ref', 'margin_left', 'margin_right'}
_skip_types = {'column_ignore', 'header', 'footer', 'margin_top', 'margin_bottom', 'margin_left', 'margin_right'}
relevant_cols = [c for c in column_regions if c.type not in _skip_types]
if not relevant_cols:
logger.warning("build_cell_grid: no usable columns found")
@@ -4003,7 +4148,7 @@ def build_cell_grid_streaming(
if not content_rows:
return
_skip_types = {'column_ignore', 'header', 'footer', 'margin_top', 'margin_bottom', 'page_ref', 'margin_left', 'margin_right'}
_skip_types = {'column_ignore', 'header', 'footer', 'margin_top', 'margin_bottom', 'margin_left', 'margin_right'}
relevant_cols = [c for c in column_regions if c.type not in _skip_types]
if not relevant_cols:
return
@@ -4055,11 +4200,13 @@ def _cells_to_vocab_entries(
'column_en': 'english',
'column_de': 'german',
'column_example': 'example',
'page_ref': 'source_page',
}
bbox_key_map = {
'column_en': 'bbox_en',
'column_de': 'bbox_de',
'column_example': 'bbox_ex',
'page_ref': 'bbox_ref',
}
# Group cells by row_index
@@ -4076,11 +4223,13 @@ def _cells_to_vocab_entries(
'english': '',
'german': '',
'example': '',
'source_page': '',
'confidence': 0.0,
'bbox': None,
'bbox_en': None,
'bbox_de': None,
'bbox_ex': None,
'bbox_ref': None,
'ocr_engine': row_cells[0].get('ocr_engine', '') if row_cells else '',
}

View File

@@ -34,6 +34,7 @@ from cv_vocab_pipeline import (
PageRegion,
RowGeometry,
_cells_to_vocab_entries,
_detect_sub_columns,
_fix_character_confusion,
_fix_phonetic_brackets,
analyze_layout,
@@ -698,6 +699,9 @@ async def detect_columns(session_id: str):
cached["_inv"] = inv
cached["_content_bounds"] = (left_x, right_x, top_y, bottom_y)
# Split sub-columns (e.g. page references) before classification
geometries = _detect_sub_columns(geometries, content_w)
# Phase B: Content-based classification
regions = classify_column_types(geometries, content_w, top_y, w, h, bottom_y,
left_x=left_x, right_x=right_x, inv=inv)

View File

@@ -24,6 +24,7 @@ from dataclasses import asdict
# Import module under test
from cv_vocab_pipeline import (
ColumnGeometry,
PageRegion,
VocabRow,
PipelineResult,
@@ -35,6 +36,7 @@ from cv_vocab_pipeline import (
_filter_narrow_runs,
_build_margin_regions,
_detect_header_footer_gaps,
_detect_sub_columns,
_region_has_content,
_add_header_footer,
analyze_layout,
@@ -1170,6 +1172,192 @@ class TestRegionContentCheck:
assert bottom_regions[0].type == 'footer'
# =============================================
# Sub-Column Detection Tests
# =============================================
class TestSubColumnDetection:
"""Tests for _detect_sub_columns() left-edge clustering."""
def _make_word(self, left: int, text: str = "word", conf: int = 90) -> dict:
return {'left': left, 'top': 100, 'width': 50, 'height': 20,
'text': text, 'conf': conf}
def _make_geo(self, x: int, width: int, words: list, content_w: int = 1000) -> ColumnGeometry:
return ColumnGeometry(
index=0, x=x, y=50, width=width, height=500,
word_count=len(words), words=words,
width_ratio=width / content_w,
)
def test_sub_column_split_page_refs(self):
"""Column with 3 'p.XX' left + 20 EN words right → split into 2."""
content_w = 1000
# 3 page-ref words at left=100, 20 vocab words at left=250
page_words = [self._make_word(100, f"p.{59+i}") for i in range(3)]
vocab_words = [self._make_word(250, f"word{i}") for i in range(20)]
all_words = page_words + vocab_words
geo = self._make_geo(x=80, width=300, words=all_words, content_w=content_w)
result = _detect_sub_columns([geo], content_w)
assert len(result) == 2, f"Expected 2 columns, got {len(result)}"
# Left sub-column should be narrower with fewer words
left_col = result[0]
right_col = result[1]
assert left_col.x < right_col.x
assert left_col.word_count == 3
assert right_col.word_count == 20
# Indices should be 0, 1
assert left_col.index == 0
assert right_col.index == 1
def test_no_split_uniform_alignment(self):
"""All words aligned at same position → no change."""
content_w = 1000
words = [self._make_word(200, f"word{i}") for i in range(15)]
geo = self._make_geo(x=180, width=300, words=words, content_w=content_w)
result = _detect_sub_columns([geo], content_w)
assert len(result) == 1
assert result[0].word_count == 15
def test_no_split_narrow_column(self):
"""Narrow column (width_ratio < 0.15) → no split attempted."""
content_w = 1000
words = [self._make_word(50, "a")] * 3 + [self._make_word(120, "b")] * 10
geo = self._make_geo(x=40, width=140, words=words, content_w=content_w)
# width_ratio = 140/1000 = 0.14 < 0.15
result = _detect_sub_columns([geo], content_w)
assert len(result) == 1
def test_no_split_balanced_clusters(self):
"""Both clusters similarly sized (ratio >= 0.35) → no split."""
content_w = 1000
left_words = [self._make_word(100, f"a{i}") for i in range(8)]
right_words = [self._make_word(300, f"b{i}") for i in range(12)]
all_words = left_words + right_words
geo = self._make_geo(x=80, width=400, words=all_words, content_w=content_w)
# 8/20 = 0.4 >= 0.35 → no split
result = _detect_sub_columns([geo], content_w)
assert len(result) == 1
def test_sub_column_reindexing(self):
"""After split, indices are correctly 0, 1, 2 across all columns."""
content_w = 1000
# First column: no split
words1 = [self._make_word(50, f"de{i}") for i in range(10)]
geo1 = ColumnGeometry(index=0, x=30, y=50, width=200, height=500,
word_count=10, words=words1, width_ratio=0.2)
# Second column: will split
page_words = [self._make_word(400, f"p.{i}") for i in range(3)]
en_words = [self._make_word(550, f"en{i}") for i in range(15)]
geo2 = ColumnGeometry(index=1, x=380, y=50, width=300, height=500,
word_count=18, words=page_words + en_words, width_ratio=0.3)
result = _detect_sub_columns([geo1, geo2], content_w)
assert len(result) == 3
assert [g.index for g in result] == [0, 1, 2]
# First column unchanged
assert result[0].word_count == 10
# Sub-column (page refs)
assert result[1].word_count == 3
# Main column (EN words)
assert result[2].word_count == 15
def test_no_split_too_few_words(self):
"""Column with fewer than 5 words → no split attempted."""
content_w = 1000
words = [self._make_word(100, "a"), self._make_word(300, "b"),
self._make_word(300, "c"), self._make_word(300, "d")]
geo = self._make_geo(x=80, width=300, words=words, content_w=content_w)
result = _detect_sub_columns([geo], content_w)
assert len(result) == 1
def test_no_split_single_minority_word(self):
"""Only 1 word in minority cluster → no split (need >= 2)."""
content_w = 1000
minority = [self._make_word(100, "p.59")]
majority = [self._make_word(300, f"w{i}") for i in range(20)]
geo = self._make_geo(x=80, width=350, words=minority + majority, content_w=content_w)
result = _detect_sub_columns([geo], content_w)
assert len(result) == 1
class TestCellsToVocabEntriesPageRef:
"""Test that page_ref cells are mapped to source_page field."""
def test_page_ref_mapped_to_source_page(self):
"""Cell with col_type='page_ref' → source_page field populated."""
from cv_vocab_pipeline import _cells_to_vocab_entries
cells = [
{
'row_index': 0,
'col_type': 'column_en',
'text': 'hello',
'bbox_pct': [10, 10, 30, 5],
'confidence': 95.0,
'ocr_engine': 'tesseract',
},
{
'row_index': 0,
'col_type': 'column_de',
'text': 'hallo',
'bbox_pct': [40, 10, 30, 5],
'confidence': 90.0,
'ocr_engine': 'tesseract',
},
{
'row_index': 0,
'col_type': 'page_ref',
'text': 'p.59',
'bbox_pct': [5, 10, 5, 5],
'confidence': 80.0,
'ocr_engine': 'tesseract',
},
]
entries = _cells_to_vocab_entries(cells)
assert len(entries) == 1
assert entries[0]['english'] == 'hello'
assert entries[0]['german'] == 'hallo'
assert entries[0]['source_page'] == 'p.59'
assert entries[0]['bbox_ref'] == [5, 10, 5, 5]
def test_no_page_ref_defaults_empty(self):
"""Without page_ref cell, source_page defaults to empty string."""
from cv_vocab_pipeline import _cells_to_vocab_entries
cells = [
{
'row_index': 0,
'col_type': 'column_en',
'text': 'world',
'bbox_pct': [10, 10, 30, 5],
'confidence': 95.0,
'ocr_engine': 'tesseract',
},
]
entries = _cells_to_vocab_entries(cells)
assert len(entries) == 1
assert entries[0]['source_page'] == ''
assert entries[0]['bbox_ref'] is None
# =============================================
# RUN TESTS
# =============================================