refactor(ocr-pipeline): use left-edge alignment approach for sub-column detection
Replace gap-based splitting with an alignment-bin approach: cluster the
left-edges of confident words into bins within an 8px tolerance, treat the
leftmost bin holding at least max(3, floor(10% of the confident words))
words as the true column start, and split off any words to its left as a
sub-column when they number >= 2 and make up < 35% of the column's words.
This correctly handles both page references ("p.59") and misread
exclamation marks ("!" → "I") even when the pixel gap is small.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1037,12 +1037,16 @@ def _detect_columns_by_clustering(
|
|||||||
def _detect_sub_columns(
|
def _detect_sub_columns(
|
||||||
geometries: List[ColumnGeometry],
|
geometries: List[ColumnGeometry],
|
||||||
content_w: int,
|
content_w: int,
|
||||||
|
_edge_tolerance: int = 8,
|
||||||
|
_min_col_start_ratio: float = 0.10,
|
||||||
) -> List[ColumnGeometry]:
|
) -> List[ColumnGeometry]:
|
||||||
"""Split columns that contain internal sub-columns based on left-edge clustering.
|
"""Split columns that contain internal sub-columns based on left-edge alignment.
|
||||||
|
|
||||||
Detects cases where a minority of words in a column are left-aligned at a
|
For each column, clusters word left-edges into alignment bins (within
|
||||||
different position than the majority (e.g. page references "p.59" next to
|
``_edge_tolerance`` px). The leftmost bin whose word count reaches
|
||||||
vocabulary words).
|
``_min_col_start_ratio`` of the column total is treated as the true column
|
||||||
|
start. Any words to the left of that bin form a sub-column, provided they
|
||||||
|
number >= 2 and < 35 % of total.
|
||||||
|
|
||||||
Returns a new list of ColumnGeometry — potentially longer than the input.
|
Returns a new list of ColumnGeometry — potentially longer than the input.
|
||||||
"""
|
"""
|
||||||
@@ -1057,114 +1061,86 @@ def _detect_sub_columns(
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
# Collect left-edges of confident words
|
# Collect left-edges of confident words
|
||||||
left_edges: List[int] = []
|
confident = [w for w in geo.words if w.get('conf', 0) >= 30]
|
||||||
for w in geo.words:
|
if len(confident) < 3:
|
||||||
if w.get('conf', 0) >= 30:
|
|
||||||
left_edges.append(w['left'])
|
|
||||||
|
|
||||||
if len(left_edges) < 3:
|
|
||||||
result.append(geo)
|
result.append(geo)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Sort and find the largest gap between consecutive left-edge values
|
# --- Cluster left-edges into alignment bins ---
|
||||||
sorted_edges = sorted(left_edges)
|
sorted_edges = sorted(w['left'] for w in confident)
|
||||||
best_gap = 0
|
bins: List[Tuple[int, int, int, int]] = [] # (center, count, min_edge, max_edge)
|
||||||
best_gap_pos = 0 # split point: values <= best_gap_pos go left
|
cur = [sorted_edges[0]]
|
||||||
for i in range(len(sorted_edges) - 1):
|
for i in range(1, len(sorted_edges)):
|
||||||
gap = sorted_edges[i + 1] - sorted_edges[i]
|
if sorted_edges[i] - cur[-1] <= _edge_tolerance:
|
||||||
if gap > best_gap:
|
cur.append(sorted_edges[i])
|
||||||
best_gap = gap
|
else:
|
||||||
best_gap_pos = (sorted_edges[i] + sorted_edges[i + 1]) // 2
|
bins.append((sum(cur) // len(cur), len(cur), min(cur), max(cur)))
|
||||||
|
cur = [sorted_edges[i]]
|
||||||
|
bins.append((sum(cur) // len(cur), len(cur), min(cur), max(cur)))
|
||||||
|
|
||||||
# Gap must be significant relative to column width
|
# --- Find the leftmost bin qualifying as a real column start ---
|
||||||
min_gap = max(15, int(geo.width * 0.08))
|
total = len(confident)
|
||||||
if best_gap < min_gap:
|
min_count = max(3, int(total * _min_col_start_ratio))
|
||||||
|
col_start_bin = None
|
||||||
|
for b in bins:
|
||||||
|
if b[1] >= min_count:
|
||||||
|
col_start_bin = b
|
||||||
|
break
|
||||||
|
|
||||||
|
if col_start_bin is None:
|
||||||
result.append(geo)
|
result.append(geo)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Split words into left (minority candidate) and right groups
|
# Words to the left of the column-start bin are sub-column candidates
|
||||||
left_words = [w for w in geo.words if w.get('conf', 0) >= 30 and w['left'] <= best_gap_pos]
|
split_threshold = col_start_bin[2] - _edge_tolerance
|
||||||
right_words = [w for w in geo.words if w.get('conf', 0) >= 30 and w['left'] > best_gap_pos]
|
sub_words = [w for w in geo.words if w['left'] < split_threshold]
|
||||||
|
main_words = [w for w in geo.words if w['left'] >= split_threshold]
|
||||||
|
|
||||||
# Also include low-conf words by position
|
if len(sub_words) < 2 or len(sub_words) / len(geo.words) >= 0.35:
|
||||||
for w in geo.words:
|
|
||||||
if w.get('conf', 0) < 30:
|
|
||||||
if w['left'] <= best_gap_pos:
|
|
||||||
left_words.append(w)
|
|
||||||
else:
|
|
||||||
right_words.append(w)
|
|
||||||
|
|
||||||
total = len(left_words) + len(right_words)
|
|
||||||
if total == 0:
|
|
||||||
result.append(geo)
|
result.append(geo)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Determine minority/majority
|
# --- Build two sub-column geometries ---
|
||||||
if len(left_words) <= len(right_words):
|
max_sub_left = max(w['left'] for w in sub_words)
|
||||||
minority, majority = left_words, right_words
|
split_x = (max_sub_left + col_start_bin[2]) // 2
|
||||||
minority_is_left = True
|
|
||||||
else:
|
|
||||||
minority, majority = right_words, left_words
|
|
||||||
minority_is_left = False
|
|
||||||
|
|
||||||
# Check minority constraints
|
sub_x = geo.x
|
||||||
minority_ratio = len(minority) / total
|
sub_width = split_x - geo.x
|
||||||
if minority_ratio >= 0.35 or len(minority) < 2:
|
main_x = split_x
|
||||||
result.append(geo)
|
main_width = (geo.x + geo.width) - split_x
|
||||||
continue
|
|
||||||
|
|
||||||
# Build two sub-column geometries
|
|
||||||
if minority_is_left:
|
|
||||||
# Minority is left sub-column, majority is right
|
|
||||||
sub_x = geo.x
|
|
||||||
sub_width = best_gap_pos - geo.x
|
|
||||||
main_x = best_gap_pos
|
|
||||||
main_width = (geo.x + geo.width) - best_gap_pos
|
|
||||||
else:
|
|
||||||
# Minority is right sub-column, majority is left
|
|
||||||
main_x = geo.x
|
|
||||||
main_width = best_gap_pos - geo.x
|
|
||||||
sub_x = best_gap_pos
|
|
||||||
sub_width = (geo.x + geo.width) - best_gap_pos
|
|
||||||
|
|
||||||
# Sanity check widths
|
|
||||||
if sub_width <= 0 or main_width <= 0:
|
if sub_width <= 0 or main_width <= 0:
|
||||||
result.append(geo)
|
result.append(geo)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
sub_geo = ColumnGeometry(
|
sub_geo = ColumnGeometry(
|
||||||
index=0, # will be re-indexed below
|
index=0,
|
||||||
x=sub_x,
|
x=sub_x,
|
||||||
y=geo.y,
|
y=geo.y,
|
||||||
width=sub_width,
|
width=sub_width,
|
||||||
height=geo.height,
|
height=geo.height,
|
||||||
word_count=len(minority),
|
word_count=len(sub_words),
|
||||||
words=minority,
|
words=sub_words,
|
||||||
width_ratio=sub_width / content_w if content_w > 0 else 0.0,
|
width_ratio=sub_width / content_w if content_w > 0 else 0.0,
|
||||||
)
|
)
|
||||||
main_geo = ColumnGeometry(
|
main_geo = ColumnGeometry(
|
||||||
index=0, # will be re-indexed below
|
index=0,
|
||||||
x=main_x,
|
x=main_x,
|
||||||
y=geo.y,
|
y=geo.y,
|
||||||
width=main_width,
|
width=main_width,
|
||||||
height=geo.height,
|
height=geo.height,
|
||||||
word_count=len(majority),
|
word_count=len(main_words),
|
||||||
words=majority,
|
words=main_words,
|
||||||
width_ratio=main_width / content_w if content_w > 0 else 0.0,
|
width_ratio=main_width / content_w if content_w > 0 else 0.0,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Insert in left-to-right order
|
result.append(sub_geo)
|
||||||
if sub_x < main_x:
|
result.append(main_geo)
|
||||||
result.append(sub_geo)
|
|
||||||
result.append(main_geo)
|
|
||||||
else:
|
|
||||||
result.append(main_geo)
|
|
||||||
result.append(sub_geo)
|
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"SubColumnSplit: column idx={geo.index} split at gap={best_gap}px, "
|
f"SubColumnSplit: column idx={geo.index} split at x={split_x}, "
|
||||||
f"minority={len(minority)} words (left={minority_is_left}), "
|
f"sub={len(sub_words)} words (left), main={len(main_words)} words, "
|
||||||
f"majority={len(majority)} words"
|
f"col_start_bin=({col_start_bin[0]}, n={col_start_bin[1]})"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Re-index by left-to-right order
|
# Re-index by left-to-right order
|
||||||
|
|||||||
@@ -1177,7 +1177,7 @@ class TestRegionContentCheck:
|
|||||||
# =============================================
|
# =============================================
|
||||||
|
|
||||||
class TestSubColumnDetection:
|
class TestSubColumnDetection:
|
||||||
"""Tests for _detect_sub_columns() left-edge clustering."""
|
"""Tests for _detect_sub_columns() left-edge alignment detection."""
|
||||||
|
|
||||||
def _make_word(self, left: int, text: str = "word", conf: int = 90) -> dict:
|
def _make_word(self, left: int, text: str = "word", conf: int = 90) -> dict:
|
||||||
return {'left': left, 'top': 100, 'width': 50, 'height': 20,
|
return {'left': left, 'top': 100, 'width': 50, 'height': 20,
|
||||||
@@ -1191,27 +1191,46 @@ class TestSubColumnDetection:
|
|||||||
)
|
)
|
||||||
|
|
||||||
def test_sub_column_split_page_refs(self):
|
def test_sub_column_split_page_refs(self):
|
||||||
"""Column with 3 'p.XX' left + 20 EN words right → split into 2."""
|
"""3 page-refs left + 30 vocab words right → split into 2.
|
||||||
|
|
||||||
|
The leftmost bin with >= 10% of words (i.e. >= 4) is the vocab bin
|
||||||
|
at left=250, so the 3 page-refs are outliers.
|
||||||
|
"""
|
||||||
content_w = 1000
|
content_w = 1000
|
||||||
# 3 page-ref words at left=100, 20 vocab words at left=250
|
|
||||||
page_words = [self._make_word(100, f"p.{59+i}") for i in range(3)]
|
page_words = [self._make_word(100, f"p.{59+i}") for i in range(3)]
|
||||||
vocab_words = [self._make_word(250, f"word{i}") for i in range(20)]
|
vocab_words = [self._make_word(250, f"word{i}") for i in range(30)]
|
||||||
all_words = page_words + vocab_words
|
all_words = page_words + vocab_words
|
||||||
geo = self._make_geo(x=80, width=300, words=all_words, content_w=content_w)
|
geo = self._make_geo(x=80, width=300, words=all_words, content_w=content_w)
|
||||||
|
|
||||||
result = _detect_sub_columns([geo], content_w)
|
result = _detect_sub_columns([geo], content_w)
|
||||||
|
|
||||||
assert len(result) == 2, f"Expected 2 columns, got {len(result)}"
|
assert len(result) == 2, f"Expected 2 columns, got {len(result)}"
|
||||||
# Left sub-column should be narrower with fewer words
|
|
||||||
left_col = result[0]
|
left_col = result[0]
|
||||||
right_col = result[1]
|
right_col = result[1]
|
||||||
assert left_col.x < right_col.x
|
assert left_col.x < right_col.x
|
||||||
assert left_col.word_count == 3
|
assert left_col.word_count == 3
|
||||||
assert right_col.word_count == 20
|
assert right_col.word_count == 30
|
||||||
# Indices should be 0, 1
|
|
||||||
assert left_col.index == 0
|
assert left_col.index == 0
|
||||||
assert right_col.index == 1
|
assert right_col.index == 1
|
||||||
|
|
||||||
|
def test_sub_column_split_exclamation_marks(self):
|
||||||
|
"""5 '!' (misread as I/|) left + 80 example words → split into 2.
|
||||||
|
|
||||||
|
Mirrors the real-world case where red ! marks are OCR'd as I, |, B, 1
|
||||||
|
at a position slightly left of the example sentence start.
|
||||||
|
"""
|
||||||
|
content_w = 1500
|
||||||
|
bang_words = [self._make_word(950 + i, chr(ord('I')), conf=60) for i in range(5)]
|
||||||
|
example_words = [self._make_word(975 + (i * 3), f"word{i}") for i in range(80)]
|
||||||
|
all_words = bang_words + example_words
|
||||||
|
geo = self._make_geo(x=940, width=530, words=all_words, content_w=content_w)
|
||||||
|
|
||||||
|
result = _detect_sub_columns([geo], content_w)
|
||||||
|
|
||||||
|
assert len(result) == 2
|
||||||
|
assert result[0].word_count == 5
|
||||||
|
assert result[1].word_count == 80
|
||||||
|
|
||||||
def test_no_split_uniform_alignment(self):
|
def test_no_split_uniform_alignment(self):
|
||||||
"""All words aligned at same position → no change."""
|
"""All words aligned at same position → no change."""
|
||||||
content_w = 1000
|
content_w = 1000
|
||||||
@@ -1228,7 +1247,6 @@ class TestSubColumnDetection:
|
|||||||
content_w = 1000
|
content_w = 1000
|
||||||
words = [self._make_word(50, "a")] * 3 + [self._make_word(120, "b")] * 10
|
words = [self._make_word(50, "a")] * 3 + [self._make_word(120, "b")] * 10
|
||||||
geo = self._make_geo(x=40, width=140, words=words, content_w=content_w)
|
geo = self._make_geo(x=40, width=140, words=words, content_w=content_w)
|
||||||
# width_ratio = 140/1000 = 0.14 < 0.15
|
|
||||||
|
|
||||||
result = _detect_sub_columns([geo], content_w)
|
result = _detect_sub_columns([geo], content_w)
|
||||||
|
|
||||||
@@ -1241,7 +1259,6 @@ class TestSubColumnDetection:
|
|||||||
right_words = [self._make_word(300, f"b{i}") for i in range(12)]
|
right_words = [self._make_word(300, f"b{i}") for i in range(12)]
|
||||||
all_words = left_words + right_words
|
all_words = left_words + right_words
|
||||||
geo = self._make_geo(x=80, width=400, words=all_words, content_w=content_w)
|
geo = self._make_geo(x=80, width=400, words=all_words, content_w=content_w)
|
||||||
# 8/20 = 0.4 >= 0.35 → no split
|
|
||||||
|
|
||||||
result = _detect_sub_columns([geo], content_w)
|
result = _detect_sub_columns([geo], content_w)
|
||||||
|
|
||||||
@@ -1250,26 +1267,23 @@ class TestSubColumnDetection:
|
|||||||
def test_sub_column_reindexing(self):
|
def test_sub_column_reindexing(self):
|
||||||
"""After split, indices are correctly 0, 1, 2 across all columns."""
|
"""After split, indices are correctly 0, 1, 2 across all columns."""
|
||||||
content_w = 1000
|
content_w = 1000
|
||||||
# First column: no split
|
# First column: no split (all words at same alignment)
|
||||||
words1 = [self._make_word(50, f"de{i}") for i in range(10)]
|
words1 = [self._make_word(50, f"de{i}") for i in range(10)]
|
||||||
geo1 = ColumnGeometry(index=0, x=30, y=50, width=200, height=500,
|
geo1 = ColumnGeometry(index=0, x=30, y=50, width=200, height=500,
|
||||||
word_count=10, words=words1, width_ratio=0.2)
|
word_count=10, words=words1, width_ratio=0.2)
|
||||||
# Second column: will split
|
# Second column: will split (3 outliers + 30 main)
|
||||||
page_words = [self._make_word(400, f"p.{i}") for i in range(3)]
|
page_words = [self._make_word(400, f"p.{i}") for i in range(3)]
|
||||||
en_words = [self._make_word(550, f"en{i}") for i in range(15)]
|
en_words = [self._make_word(550, f"en{i}") for i in range(30)]
|
||||||
geo2 = ColumnGeometry(index=1, x=380, y=50, width=300, height=500,
|
geo2 = ColumnGeometry(index=1, x=380, y=50, width=300, height=500,
|
||||||
word_count=18, words=page_words + en_words, width_ratio=0.3)
|
word_count=33, words=page_words + en_words, width_ratio=0.3)
|
||||||
|
|
||||||
result = _detect_sub_columns([geo1, geo2], content_w)
|
result = _detect_sub_columns([geo1, geo2], content_w)
|
||||||
|
|
||||||
assert len(result) == 3
|
assert len(result) == 3
|
||||||
assert [g.index for g in result] == [0, 1, 2]
|
assert [g.index for g in result] == [0, 1, 2]
|
||||||
# First column unchanged
|
|
||||||
assert result[0].word_count == 10
|
assert result[0].word_count == 10
|
||||||
# Sub-column (page refs)
|
|
||||||
assert result[1].word_count == 3
|
assert result[1].word_count == 3
|
||||||
# Main column (EN words)
|
assert result[2].word_count == 30
|
||||||
assert result[2].word_count == 15
|
|
||||||
|
|
||||||
def test_no_split_too_few_words(self):
|
def test_no_split_too_few_words(self):
|
||||||
"""Column with fewer than 5 words → no split attempted."""
|
"""Column with fewer than 5 words → no split attempted."""
|
||||||
@@ -1283,10 +1297,10 @@ class TestSubColumnDetection:
|
|||||||
assert len(result) == 1
|
assert len(result) == 1
|
||||||
|
|
||||||
def test_no_split_single_minority_word(self):
|
def test_no_split_single_minority_word(self):
|
||||||
"""Only 1 word in minority cluster → no split (need >= 2)."""
|
"""Only 1 word left of column start → no split (need >= 2)."""
|
||||||
content_w = 1000
|
content_w = 1000
|
||||||
minority = [self._make_word(100, "p.59")]
|
minority = [self._make_word(100, "p.59")]
|
||||||
majority = [self._make_word(300, f"w{i}") for i in range(20)]
|
majority = [self._make_word(300, f"w{i}") for i in range(30)]
|
||||||
geo = self._make_geo(x=80, width=350, words=minority + majority, content_w=content_w)
|
geo = self._make_geo(x=80, width=350, words=minority + majority, content_w=content_w)
|
||||||
|
|
||||||
result = _detect_sub_columns([geo], content_w)
|
result = _detect_sub_columns([geo], content_w)
|
||||||
|
|||||||
Reference in New Issue
Block a user