Compare commits
6 Commits
0532b2a797
...
3904ddb493
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3904ddb493 | ||
|
|
6e1a349eed | ||
|
|
7252f9a956 | ||
|
|
f13116345b | ||
|
|
991984d9c3 | ||
|
|
1a246eb059 |
@@ -140,6 +140,7 @@ class VocabRow:
|
||||
english: str = ""
|
||||
german: str = ""
|
||||
example: str = ""
|
||||
source_page: str = ""
|
||||
confidence: float = 0.0
|
||||
y_position: int = 0
|
||||
|
||||
@@ -1033,6 +1034,132 @@ def _detect_columns_by_clustering(
|
||||
)
|
||||
|
||||
|
||||
def _detect_sub_columns(
|
||||
geometries: List[ColumnGeometry],
|
||||
content_w: int,
|
||||
left_x: int = 0,
|
||||
_edge_tolerance: int = 8,
|
||||
_min_col_start_ratio: float = 0.10,
|
||||
) -> List[ColumnGeometry]:
|
||||
"""Split columns that contain internal sub-columns based on left-edge alignment.
|
||||
|
||||
For each column, clusters word left-edges into alignment bins (within
|
||||
``_edge_tolerance`` px). The leftmost bin whose word count reaches
|
||||
``_min_col_start_ratio`` of the column total is treated as the true column
|
||||
start. Any words to the left of that bin form a sub-column, provided they
|
||||
number >= 2 and < 35 % of total.
|
||||
|
||||
Word ``left`` values are relative to the content ROI (offset by *left_x*),
|
||||
while ``ColumnGeometry.x`` is in absolute image coordinates. *left_x*
|
||||
bridges the two coordinate systems.
|
||||
|
||||
Returns a new list of ColumnGeometry — potentially longer than the input.
|
||||
"""
|
||||
if content_w <= 0:
|
||||
return geometries
|
||||
|
||||
result: List[ColumnGeometry] = []
|
||||
for geo in geometries:
|
||||
# Only consider wide-enough columns with enough words
|
||||
if geo.width_ratio < 0.15 or geo.word_count < 5:
|
||||
result.append(geo)
|
||||
continue
|
||||
|
||||
# Collect left-edges of confident words
|
||||
confident = [w for w in geo.words if w.get('conf', 0) >= 30]
|
||||
if len(confident) < 3:
|
||||
result.append(geo)
|
||||
continue
|
||||
|
||||
# --- Cluster left-edges into alignment bins ---
|
||||
sorted_edges = sorted(w['left'] for w in confident)
|
||||
bins: List[Tuple[int, int, int, int]] = [] # (center, count, min_edge, max_edge)
|
||||
cur = [sorted_edges[0]]
|
||||
for i in range(1, len(sorted_edges)):
|
||||
if sorted_edges[i] - cur[-1] <= _edge_tolerance:
|
||||
cur.append(sorted_edges[i])
|
||||
else:
|
||||
bins.append((sum(cur) // len(cur), len(cur), min(cur), max(cur)))
|
||||
cur = [sorted_edges[i]]
|
||||
bins.append((sum(cur) // len(cur), len(cur), min(cur), max(cur)))
|
||||
|
||||
# --- Find the leftmost bin qualifying as a real column start ---
|
||||
total = len(confident)
|
||||
min_count = max(3, int(total * _min_col_start_ratio))
|
||||
col_start_bin = None
|
||||
for b in bins:
|
||||
if b[1] >= min_count:
|
||||
col_start_bin = b
|
||||
break
|
||||
|
||||
if col_start_bin is None:
|
||||
result.append(geo)
|
||||
continue
|
||||
|
||||
# Words to the left of the column-start bin are sub-column candidates
|
||||
split_threshold = col_start_bin[2] - _edge_tolerance
|
||||
sub_words = [w for w in geo.words if w['left'] < split_threshold]
|
||||
main_words = [w for w in geo.words if w['left'] >= split_threshold]
|
||||
|
||||
if len(sub_words) < 2 or len(sub_words) / len(geo.words) >= 0.35:
|
||||
result.append(geo)
|
||||
continue
|
||||
|
||||
# --- Build two sub-column geometries ---
|
||||
# Word 'left' values are relative to left_x; geo.x is absolute.
|
||||
# Convert the split position from relative to absolute coordinates.
|
||||
max_sub_left = max(w['left'] for w in sub_words)
|
||||
split_rel = (max_sub_left + col_start_bin[2]) // 2
|
||||
split_abs = split_rel + left_x
|
||||
|
||||
sub_x = geo.x
|
||||
sub_width = split_abs - geo.x
|
||||
main_x = split_abs
|
||||
main_width = (geo.x + geo.width) - split_abs
|
||||
|
||||
if sub_width <= 0 or main_width <= 0:
|
||||
result.append(geo)
|
||||
continue
|
||||
|
||||
sub_geo = ColumnGeometry(
|
||||
index=0,
|
||||
x=sub_x,
|
||||
y=geo.y,
|
||||
width=sub_width,
|
||||
height=geo.height,
|
||||
word_count=len(sub_words),
|
||||
words=sub_words,
|
||||
width_ratio=sub_width / content_w if content_w > 0 else 0.0,
|
||||
)
|
||||
main_geo = ColumnGeometry(
|
||||
index=0,
|
||||
x=main_x,
|
||||
y=geo.y,
|
||||
width=main_width,
|
||||
height=geo.height,
|
||||
word_count=len(main_words),
|
||||
words=main_words,
|
||||
width_ratio=main_width / content_w if content_w > 0 else 0.0,
|
||||
)
|
||||
|
||||
result.append(sub_geo)
|
||||
result.append(main_geo)
|
||||
|
||||
logger.info(
|
||||
f"SubColumnSplit: column idx={geo.index} split at abs_x={split_abs} "
|
||||
f"(rel={split_rel}), sub={len(sub_words)} words, "
|
||||
f"main={len(main_words)} words, "
|
||||
f"col_start_bin=({col_start_bin[0]}, n={col_start_bin[1]})"
|
||||
)
|
||||
|
||||
# Re-index by left-to-right order
|
||||
result.sort(key=lambda g: g.x)
|
||||
for i, g in enumerate(result):
|
||||
g.index = i
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _build_geometries_from_starts(
|
||||
col_starts: List[Tuple[int, int]],
|
||||
word_dicts: List[Dict],
|
||||
@@ -2727,6 +2854,9 @@ def analyze_layout_by_words(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Li
|
||||
geometries, left_x, right_x, top_y, bottom_y, _word_dicts, _inv = result
|
||||
content_w = right_x - left_x
|
||||
|
||||
# Split sub-columns (e.g. page references) before classification
|
||||
geometries = _detect_sub_columns(geometries, content_w, left_x=left_x)
|
||||
|
||||
# Phase B: Content-based classification
|
||||
regions = classify_column_types(geometries, content_w, top_y, w, h, bottom_y,
|
||||
left_x=left_x, right_x=right_x, inv=_inv)
|
||||
@@ -3841,7 +3971,7 @@ def build_cell_grid(
|
||||
return [], []
|
||||
|
||||
# Use columns only — skip ignore, header, footer, page_ref
|
||||
_skip_types = {'column_ignore', 'header', 'footer', 'margin_top', 'margin_bottom', 'page_ref', 'margin_left', 'margin_right'}
|
||||
_skip_types = {'column_ignore', 'header', 'footer', 'margin_top', 'margin_bottom', 'margin_left', 'margin_right'}
|
||||
relevant_cols = [c for c in column_regions if c.type not in _skip_types]
|
||||
if not relevant_cols:
|
||||
logger.warning("build_cell_grid: no usable columns found")
|
||||
@@ -4003,7 +4133,7 @@ def build_cell_grid_streaming(
|
||||
if not content_rows:
|
||||
return
|
||||
|
||||
_skip_types = {'column_ignore', 'header', 'footer', 'margin_top', 'margin_bottom', 'page_ref', 'margin_left', 'margin_right'}
|
||||
_skip_types = {'column_ignore', 'header', 'footer', 'margin_top', 'margin_bottom', 'margin_left', 'margin_right'}
|
||||
relevant_cols = [c for c in column_regions if c.type not in _skip_types]
|
||||
if not relevant_cols:
|
||||
return
|
||||
@@ -4055,11 +4185,13 @@ def _cells_to_vocab_entries(
|
||||
'column_en': 'english',
|
||||
'column_de': 'german',
|
||||
'column_example': 'example',
|
||||
'page_ref': 'source_page',
|
||||
}
|
||||
bbox_key_map = {
|
||||
'column_en': 'bbox_en',
|
||||
'column_de': 'bbox_de',
|
||||
'column_example': 'bbox_ex',
|
||||
'page_ref': 'bbox_ref',
|
||||
}
|
||||
|
||||
# Group cells by row_index
|
||||
@@ -4076,11 +4208,13 @@ def _cells_to_vocab_entries(
|
||||
'english': '',
|
||||
'german': '',
|
||||
'example': '',
|
||||
'source_page': '',
|
||||
'confidence': 0.0,
|
||||
'bbox': None,
|
||||
'bbox_en': None,
|
||||
'bbox_de': None,
|
||||
'bbox_ex': None,
|
||||
'bbox_ref': None,
|
||||
'ocr_engine': row_cells[0].get('ocr_engine', '') if row_cells else '',
|
||||
}
|
||||
|
||||
|
||||
@@ -34,6 +34,7 @@ from cv_vocab_pipeline import (
|
||||
PageRegion,
|
||||
RowGeometry,
|
||||
_cells_to_vocab_entries,
|
||||
_detect_sub_columns,
|
||||
_fix_character_confusion,
|
||||
_fix_phonetic_brackets,
|
||||
analyze_layout,
|
||||
@@ -698,6 +699,9 @@ async def detect_columns(session_id: str):
|
||||
cached["_inv"] = inv
|
||||
cached["_content_bounds"] = (left_x, right_x, top_y, bottom_y)
|
||||
|
||||
# Split sub-columns (e.g. page references) before classification
|
||||
geometries = _detect_sub_columns(geometries, content_w, left_x=left_x)
|
||||
|
||||
# Phase B: Content-based classification
|
||||
regions = classify_column_types(geometries, content_w, top_y, w, h, bottom_y,
|
||||
left_x=left_x, right_x=right_x, inv=inv)
|
||||
|
||||
@@ -24,6 +24,7 @@ from dataclasses import asdict
|
||||
|
||||
# Import module under test
|
||||
from cv_vocab_pipeline import (
|
||||
ColumnGeometry,
|
||||
PageRegion,
|
||||
VocabRow,
|
||||
PipelineResult,
|
||||
@@ -35,6 +36,7 @@ from cv_vocab_pipeline import (
|
||||
_filter_narrow_runs,
|
||||
_build_margin_regions,
|
||||
_detect_header_footer_gaps,
|
||||
_detect_sub_columns,
|
||||
_region_has_content,
|
||||
_add_header_footer,
|
||||
analyze_layout,
|
||||
@@ -1170,6 +1172,233 @@ class TestRegionContentCheck:
|
||||
assert bottom_regions[0].type == 'footer'
|
||||
|
||||
|
||||
# =============================================
|
||||
# Sub-Column Detection Tests
|
||||
# =============================================
|
||||
|
||||
class TestSubColumnDetection:
|
||||
"""Tests for _detect_sub_columns() left-edge alignment detection."""
|
||||
|
||||
def _make_word(self, left: int, text: str = "word", conf: int = 90) -> dict:
|
||||
return {'left': left, 'top': 100, 'width': 50, 'height': 20,
|
||||
'text': text, 'conf': conf}
|
||||
|
||||
def _make_geo(self, x: int, width: int, words: list, content_w: int = 1000) -> ColumnGeometry:
|
||||
return ColumnGeometry(
|
||||
index=0, x=x, y=50, width=width, height=500,
|
||||
word_count=len(words), words=words,
|
||||
width_ratio=width / content_w,
|
||||
)
|
||||
|
||||
def test_sub_column_split_page_refs(self):
|
||||
"""3 page-refs left + 40 vocab words right → split into 2.
|
||||
|
||||
The leftmost bin with >= 10% of words (>= 5) is the vocab bin
|
||||
at left=250, so the 3 page-refs are outliers.
|
||||
"""
|
||||
content_w = 1000
|
||||
page_words = [self._make_word(100, f"p.{59+i}") for i in range(3)]
|
||||
vocab_words = [self._make_word(250, f"word{i}") for i in range(40)]
|
||||
all_words = page_words + vocab_words
|
||||
geo = self._make_geo(x=80, width=300, words=all_words, content_w=content_w)
|
||||
|
||||
result = _detect_sub_columns([geo], content_w)
|
||||
|
||||
assert len(result) == 2, f"Expected 2 columns, got {len(result)}"
|
||||
left_col = result[0]
|
||||
right_col = result[1]
|
||||
assert left_col.x < right_col.x
|
||||
assert left_col.word_count == 3
|
||||
assert right_col.word_count == 40
|
||||
assert left_col.index == 0
|
||||
assert right_col.index == 1
|
||||
|
||||
def test_sub_column_split_exclamation_marks(self):
|
||||
"""5 '!' (misread as I/|) left + 80 example words → split into 2.
|
||||
|
||||
Mirrors the real-world case where red ! marks are OCR'd as I, |, B, 1
|
||||
at a position slightly left of the example sentence start.
|
||||
"""
|
||||
content_w = 1500
|
||||
bang_words = [self._make_word(950 + i, chr(ord('I')), conf=60) for i in range(5)]
|
||||
example_words = [self._make_word(975 + (i * 3), f"word{i}") for i in range(80)]
|
||||
all_words = bang_words + example_words
|
||||
geo = self._make_geo(x=940, width=530, words=all_words, content_w=content_w)
|
||||
|
||||
result = _detect_sub_columns([geo], content_w)
|
||||
|
||||
assert len(result) == 2
|
||||
assert result[0].word_count == 5
|
||||
assert result[1].word_count == 80
|
||||
|
||||
def test_no_split_uniform_alignment(self):
|
||||
"""All words aligned at same position → no change."""
|
||||
content_w = 1000
|
||||
words = [self._make_word(200, f"word{i}") for i in range(15)]
|
||||
geo = self._make_geo(x=180, width=300, words=words, content_w=content_w)
|
||||
|
||||
result = _detect_sub_columns([geo], content_w)
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].word_count == 15
|
||||
|
||||
def test_no_split_narrow_column(self):
|
||||
"""Narrow column (width_ratio < 0.15) → no split attempted."""
|
||||
content_w = 1000
|
||||
words = [self._make_word(50, "a")] * 3 + [self._make_word(120, "b")] * 10
|
||||
geo = self._make_geo(x=40, width=140, words=words, content_w=content_w)
|
||||
|
||||
result = _detect_sub_columns([geo], content_w)
|
||||
|
||||
assert len(result) == 1
|
||||
|
||||
def test_no_split_balanced_clusters(self):
|
||||
"""Both clusters similarly sized (ratio >= 0.35) → no split."""
|
||||
content_w = 1000
|
||||
left_words = [self._make_word(100, f"a{i}") for i in range(8)]
|
||||
right_words = [self._make_word(300, f"b{i}") for i in range(12)]
|
||||
all_words = left_words + right_words
|
||||
geo = self._make_geo(x=80, width=400, words=all_words, content_w=content_w)
|
||||
|
||||
result = _detect_sub_columns([geo], content_w)
|
||||
|
||||
assert len(result) == 1
|
||||
|
||||
def test_sub_column_reindexing(self):
|
||||
"""After split, indices are correctly 0, 1, 2 across all columns."""
|
||||
content_w = 1000
|
||||
# First column: no split (all words at same alignment)
|
||||
words1 = [self._make_word(50, f"de{i}") for i in range(10)]
|
||||
geo1 = ColumnGeometry(index=0, x=30, y=50, width=200, height=500,
|
||||
word_count=10, words=words1, width_ratio=0.2)
|
||||
# Second column: will split (3 outliers + 40 main)
|
||||
page_words = [self._make_word(400, f"p.{i}") for i in range(3)]
|
||||
en_words = [self._make_word(550, f"en{i}") for i in range(40)]
|
||||
geo2 = ColumnGeometry(index=1, x=380, y=50, width=300, height=500,
|
||||
word_count=43, words=page_words + en_words, width_ratio=0.3)
|
||||
|
||||
result = _detect_sub_columns([geo1, geo2], content_w)
|
||||
|
||||
assert len(result) == 3
|
||||
assert [g.index for g in result] == [0, 1, 2]
|
||||
assert result[0].word_count == 10
|
||||
assert result[1].word_count == 3
|
||||
assert result[2].word_count == 40
|
||||
|
||||
def test_no_split_too_few_words(self):
|
||||
"""Column with fewer than 5 words → no split attempted."""
|
||||
content_w = 1000
|
||||
words = [self._make_word(100, "a"), self._make_word(300, "b"),
|
||||
self._make_word(300, "c"), self._make_word(300, "d")]
|
||||
geo = self._make_geo(x=80, width=300, words=words, content_w=content_w)
|
||||
|
||||
result = _detect_sub_columns([geo], content_w)
|
||||
|
||||
assert len(result) == 1
|
||||
|
||||
def test_no_split_single_minority_word(self):
|
||||
"""Only 1 word left of column start → no split (need >= 2)."""
|
||||
content_w = 1000
|
||||
minority = [self._make_word(100, "p.59")]
|
||||
majority = [self._make_word(300, f"w{i}") for i in range(30)]
|
||||
geo = self._make_geo(x=80, width=350, words=minority + majority, content_w=content_w)
|
||||
|
||||
result = _detect_sub_columns([geo], content_w)
|
||||
|
||||
assert len(result) == 1
|
||||
|
||||
def test_sub_column_split_with_left_x_offset(self):
|
||||
"""Word 'left' values are relative to left_x; geo.x is absolute.
|
||||
|
||||
Real-world scenario: left_x=195, EN column at geo.x=310.
|
||||
Page refs at relative left=115-157, vocab words at relative left=216.
|
||||
Without left_x, split_x would be ~202 (< geo.x=310) → negative width → no split.
|
||||
With left_x=195, split_abs = 202 + 195 = 397, which is between geo.x(310)
|
||||
and geo.x+geo.width(748) → valid split.
|
||||
"""
|
||||
content_w = 1469
|
||||
left_x = 195
|
||||
page_refs = [self._make_word(115, "p.59"), self._make_word(157, "p.60"),
|
||||
self._make_word(157, "p.61")]
|
||||
vocab = [self._make_word(216, f"word{i}") for i in range(40)]
|
||||
all_words = page_refs + vocab
|
||||
geo = self._make_geo(x=310, width=438, words=all_words, content_w=content_w)
|
||||
|
||||
result = _detect_sub_columns([geo], content_w, left_x=left_x)
|
||||
|
||||
assert len(result) == 2, f"Expected 2 columns, got {len(result)}"
|
||||
assert result[0].word_count == 3
|
||||
assert result[1].word_count == 40
|
||||
|
||||
|
||||
class TestCellsToVocabEntriesPageRef:
|
||||
"""Test that page_ref cells are mapped to source_page field."""
|
||||
|
||||
def test_page_ref_mapped_to_source_page(self):
|
||||
"""Cell with col_type='page_ref' → source_page field populated."""
|
||||
from cv_vocab_pipeline import _cells_to_vocab_entries
|
||||
|
||||
cells = [
|
||||
{
|
||||
'row_index': 0,
|
||||
'col_type': 'column_en',
|
||||
'text': 'hello',
|
||||
'bbox_pct': {'x': 10, 'y': 10, 'w': 30, 'h': 5},
|
||||
'confidence': 95.0,
|
||||
'ocr_engine': 'tesseract',
|
||||
},
|
||||
{
|
||||
'row_index': 0,
|
||||
'col_type': 'column_de',
|
||||
'text': 'hallo',
|
||||
'bbox_pct': {'x': 40, 'y': 10, 'w': 30, 'h': 5},
|
||||
'confidence': 90.0,
|
||||
'ocr_engine': 'tesseract',
|
||||
},
|
||||
{
|
||||
'row_index': 0,
|
||||
'col_type': 'page_ref',
|
||||
'text': 'p.59',
|
||||
'bbox_pct': {'x': 5, 'y': 10, 'w': 5, 'h': 5},
|
||||
'confidence': 80.0,
|
||||
'ocr_engine': 'tesseract',
|
||||
},
|
||||
]
|
||||
columns_meta = [
|
||||
{'type': 'column_en'}, {'type': 'column_de'}, {'type': 'page_ref'},
|
||||
]
|
||||
|
||||
entries = _cells_to_vocab_entries(cells, columns_meta)
|
||||
|
||||
assert len(entries) == 1
|
||||
assert entries[0]['english'] == 'hello'
|
||||
assert entries[0]['german'] == 'hallo'
|
||||
assert entries[0]['source_page'] == 'p.59'
|
||||
assert entries[0]['bbox_ref'] == {'x': 5, 'y': 10, 'w': 5, 'h': 5}
|
||||
|
||||
def test_no_page_ref_defaults_empty(self):
|
||||
"""Without page_ref cell, source_page defaults to empty string."""
|
||||
from cv_vocab_pipeline import _cells_to_vocab_entries
|
||||
|
||||
cells = [
|
||||
{
|
||||
'row_index': 0,
|
||||
'col_type': 'column_en',
|
||||
'text': 'world',
|
||||
'bbox_pct': {'x': 10, 'y': 10, 'w': 30, 'h': 5},
|
||||
'confidence': 95.0,
|
||||
'ocr_engine': 'tesseract',
|
||||
},
|
||||
]
|
||||
columns_meta = [{'type': 'column_en'}]
|
||||
|
||||
entries = _cells_to_vocab_entries(cells, columns_meta)
|
||||
|
||||
assert len(entries) == 1
|
||||
assert entries[0]['source_page'] == ''
|
||||
assert entries[0]['bbox_ref'] is None
|
||||
|
||||
|
||||
# =============================================
|
||||
# RUN TESTS
|
||||
# =============================================
|
||||
|
||||
Reference in New Issue
Block a user