From 1a246eb059c4606a149d08ac2fabcfef621b0aa3 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Mon, 2 Mar 2026 18:18:02 +0100 Subject: [PATCH] feat(ocr-pipeline): generic sub-column detection via left-edge clustering Detects hidden sub-columns (e.g. page references like "p.59") within already-recognized columns by clustering word left-edge positions and splitting when a clear minority cluster exists. The sub-column is then classified as page_ref and mapped to VocabRow.source_page. Co-Authored-By: Claude Sonnet 4.6 --- klausur-service/backend/cv_vocab_pipeline.py | 153 +++++++++++++- klausur-service/backend/ocr_pipeline_api.py | 4 + .../backend/tests/test_cv_vocab_pipeline.py | 188 ++++++++++++++++++ 3 files changed, 343 insertions(+), 2 deletions(-) diff --git a/klausur-service/backend/cv_vocab_pipeline.py b/klausur-service/backend/cv_vocab_pipeline.py index 2bd8bd1..a147d53 100644 --- a/klausur-service/backend/cv_vocab_pipeline.py +++ b/klausur-service/backend/cv_vocab_pipeline.py @@ -140,6 +140,7 @@ class VocabRow: english: str = "" german: str = "" example: str = "" + source_page: str = "" confidence: float = 0.0 y_position: int = 0 @@ -1033,6 +1034,147 @@ def _detect_columns_by_clustering( ) +def _detect_sub_columns( + geometries: List[ColumnGeometry], + content_w: int, +) -> List[ColumnGeometry]: + """Split columns that contain internal sub-columns based on left-edge clustering. + + Detects cases where a minority of words in a column are left-aligned at a + different position than the majority (e.g. page references "p.59" next to + vocabulary words). + + Returns a new list of ColumnGeometry — potentially longer than the input. + """ + if content_w <= 0: + return geometries + + result: List[ColumnGeometry] = [] + for geo in geometries: + # Only consider wide-enough columns with enough words + if geo.width_ratio < 0.15 or geo.word_count < 5: + result.append(geo) + continue + + # Collect left-edges of confident words + left_edges: List[int] = [] + for w in geo.words: + if w.get('conf', 0) >= 30: + left_edges.append(w['left']) + + if len(left_edges) < 3: + result.append(geo) + continue + + # Sort and find the largest gap between consecutive left-edge values + sorted_edges = sorted(left_edges) + best_gap = 0 + best_gap_pos = 0 # split point: values <= best_gap_pos go left + for i in range(len(sorted_edges) - 1): + gap = sorted_edges[i + 1] - sorted_edges[i] + if gap > best_gap: + best_gap = gap + best_gap_pos = (sorted_edges[i] + sorted_edges[i + 1]) // 2 + + # Gap must be significant relative to column width + min_gap = max(15, int(geo.width * 0.08)) + if best_gap < min_gap: + result.append(geo) + continue + + # Split words into left (minority candidate) and right groups + left_words = [w for w in geo.words if w.get('conf', 0) >= 30 and w['left'] <= best_gap_pos] + right_words = [w for w in geo.words if w.get('conf', 0) >= 30 and w['left'] > best_gap_pos] + + # Also include low-conf words by position + for w in geo.words: + if w.get('conf', 0) < 30: + if w['left'] <= best_gap_pos: + left_words.append(w) + else: + right_words.append(w) + + total = len(left_words) + len(right_words) + if total == 0: + result.append(geo) + continue + + # Determine minority/majority + if len(left_words) <= len(right_words): + minority, majority = left_words, right_words + minority_is_left = True + else: + minority, majority = right_words, left_words + minority_is_left = False + + # Check minority constraints + minority_ratio = len(minority) / total + if minority_ratio >= 0.35 or len(minority) < 2: + result.append(geo) + continue + + # Build two sub-column geometries + if minority_is_left: + # Minority is left sub-column, majority is right + sub_x = geo.x + sub_width = best_gap_pos - geo.x + main_x = best_gap_pos + main_width = (geo.x + geo.width) - best_gap_pos + else: + # Minority is right sub-column, majority is left + main_x = geo.x + main_width = best_gap_pos - geo.x + sub_x = best_gap_pos + sub_width = (geo.x + geo.width) - best_gap_pos + + # Sanity check widths + if sub_width <= 0 or main_width <= 0: + result.append(geo) + continue + + sub_geo = ColumnGeometry( + index=0, # will be re-indexed below + x=sub_x, + y=geo.y, + width=sub_width, + height=geo.height, + word_count=len(minority), + words=minority, + width_ratio=sub_width / content_w if content_w > 0 else 0.0, + ) + main_geo = ColumnGeometry( + index=0, # will be re-indexed below + x=main_x, + y=geo.y, + width=main_width, + height=geo.height, + word_count=len(majority), + words=majority, + width_ratio=main_width / content_w if content_w > 0 else 0.0, + ) + + # Insert in left-to-right order + if sub_x < main_x: + result.append(sub_geo) + result.append(main_geo) + else: + result.append(main_geo) + result.append(sub_geo) + + logger.info( + f"SubColumnSplit: column idx={geo.index} split at gap={best_gap}px, " + f"minority={len(minority)} words (left={minority_is_left}), " + f"majority={len(majority)} words" + ) + + # Re-index by left-to-right order + result.sort(key=lambda g: g.x) + for i, g in enumerate(result): + g.index = i + + return result + + def _build_geometries_from_starts( col_starts: List[Tuple[int, int]], word_dicts: List[Dict], @@ -2727,6 +2869,9 @@ def analyze_layout_by_words(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Li geometries, left_x, right_x, top_y, bottom_y, _word_dicts, _inv = result content_w = right_x - left_x + # Split sub-columns (e.g. page references) before classification + geometries = _detect_sub_columns(geometries, content_w) + # Phase B: Content-based classification regions = classify_column_types(geometries, content_w, top_y, w, h, bottom_y, left_x=left_x, right_x=right_x, inv=_inv) @@ -3841,7 +3986,7 @@ def build_cell_grid( return [], [] # Use columns only — skip ignore, header, footer, page_ref - _skip_types = {'column_ignore', 'header', 'footer', 'margin_top', 'margin_bottom', 'page_ref', 'margin_left', 'margin_right'} + _skip_types = {'column_ignore', 'header', 'footer', 'margin_top', 'margin_bottom', 'margin_left', 'margin_right'} relevant_cols = [c for c in column_regions if c.type not in _skip_types] if not relevant_cols: logger.warning("build_cell_grid: no usable columns found") @@ -4003,7 +4148,7 @@ def build_cell_grid_streaming( if not content_rows: return - _skip_types = {'column_ignore', 'header', 'footer', 'margin_top', 'margin_bottom', 'page_ref', 'margin_left', 'margin_right'} + _skip_types = {'column_ignore', 'header', 'footer', 'margin_top', 'margin_bottom', 'margin_left', 'margin_right'} relevant_cols = [c for c in column_regions if c.type not in _skip_types] if not relevant_cols: return @@ -4055,11 +4200,13 @@ def _cells_to_vocab_entries( 'column_en': 'english', 'column_de': 'german', 'column_example': 'example', + 'page_ref': 'source_page', } bbox_key_map = { 'column_en': 'bbox_en', 'column_de': 'bbox_de', 'column_example': 'bbox_ex', + 'page_ref': 'bbox_ref', } # Group cells by row_index @@ -4076,11 +4223,13 @@ def _cells_to_vocab_entries( 'english': '', 'german': '', 'example': '', + 'source_page': '', 'confidence': 0.0, 'bbox': None, 'bbox_en': None, 'bbox_de': None, 'bbox_ex': None, + 'bbox_ref': None, 'ocr_engine': row_cells[0].get('ocr_engine', '') if row_cells else '', } diff --git a/klausur-service/backend/ocr_pipeline_api.py b/klausur-service/backend/ocr_pipeline_api.py index 78299f0..e5f83d2 100644 --- a/klausur-service/backend/ocr_pipeline_api.py +++ b/klausur-service/backend/ocr_pipeline_api.py @@ -34,6 +34,7 @@ from cv_vocab_pipeline import ( PageRegion, RowGeometry, _cells_to_vocab_entries, + _detect_sub_columns, _fix_character_confusion, _fix_phonetic_brackets, analyze_layout, @@ -698,6 +699,9 @@ async def detect_columns(session_id: str): cached["_inv"] = inv cached["_content_bounds"] = (left_x, right_x, top_y, bottom_y) + # Split sub-columns (e.g. page references) before classification + geometries = _detect_sub_columns(geometries, content_w) + # Phase B: Content-based classification regions = classify_column_types(geometries, content_w, top_y, w, h, bottom_y, left_x=left_x, right_x=right_x, inv=inv) diff --git a/klausur-service/backend/tests/test_cv_vocab_pipeline.py b/klausur-service/backend/tests/test_cv_vocab_pipeline.py index 4d764c7..4e79929 100644 --- a/klausur-service/backend/tests/test_cv_vocab_pipeline.py +++ b/klausur-service/backend/tests/test_cv_vocab_pipeline.py @@ -24,6 +24,7 @@ from dataclasses import asdict # Import module under test from cv_vocab_pipeline import ( + ColumnGeometry, PageRegion, VocabRow, PipelineResult, @@ -35,6 +36,7 @@ from cv_vocab_pipeline import ( _filter_narrow_runs, _build_margin_regions, _detect_header_footer_gaps, + _detect_sub_columns, _region_has_content, _add_header_footer, analyze_layout, @@ -1170,6 +1172,192 @@ class TestRegionContentCheck: assert bottom_regions[0].type == 'footer' +# ============================================= +# Sub-Column Detection Tests +# ============================================= + +class TestSubColumnDetection: + """Tests for _detect_sub_columns() left-edge clustering.""" + + def _make_word(self, left: int, text: str = "word", conf: int = 90) -> dict: + return {'left': left, 'top': 100, 'width': 50, 'height': 20, + 'text': text, 'conf': conf} + + def _make_geo(self, x: int, width: int, words: list, content_w: int = 1000) -> ColumnGeometry: + return ColumnGeometry( + index=0, x=x, y=50, width=width, height=500, + word_count=len(words), words=words, + width_ratio=width / content_w, + ) + + def test_sub_column_split_page_refs(self): + """Column with 3 'p.XX' left + 20 EN words right → split into 2.""" + content_w = 1000 + # 3 page-ref words at left=100, 20 vocab words at left=250 + page_words = [self._make_word(100, f"p.{59+i}") for i in range(3)] + vocab_words = [self._make_word(250, f"word{i}") for i in range(20)] + all_words = page_words + vocab_words + geo = self._make_geo(x=80, width=300, words=all_words, content_w=content_w) + + result = _detect_sub_columns([geo], content_w) + + assert len(result) == 2, f"Expected 2 columns, got {len(result)}" + # Left sub-column should be narrower with fewer words + left_col = result[0] + right_col = result[1] + assert left_col.x < right_col.x + assert left_col.word_count == 3 + assert right_col.word_count == 20 + # Indices should be 0, 1 + assert left_col.index == 0 + assert right_col.index == 1 + + def test_no_split_uniform_alignment(self): + """All words aligned at same position → no change.""" + content_w = 1000 + words = [self._make_word(200, f"word{i}") for i in range(15)] + geo = self._make_geo(x=180, width=300, words=words, content_w=content_w) + + result = _detect_sub_columns([geo], content_w) + + assert len(result) == 1 + assert result[0].word_count == 15 + + def test_no_split_narrow_column(self): + """Narrow column (width_ratio < 0.15) → no split attempted.""" + content_w = 1000 + words = [self._make_word(50, "a")] * 3 + [self._make_word(120, "b")] * 10 + geo = self._make_geo(x=40, width=140, words=words, content_w=content_w) + # width_ratio = 140/1000 = 0.14 < 0.15 + + result = _detect_sub_columns([geo], content_w) + + assert len(result) == 1 + + def test_no_split_balanced_clusters(self): + """Both clusters similarly sized (ratio >= 0.35) → no split.""" + content_w = 1000 + left_words = [self._make_word(100, f"a{i}") for i in range(8)] + right_words = [self._make_word(300, f"b{i}") for i in range(12)] + all_words = left_words + right_words + geo = self._make_geo(x=80, width=400, words=all_words, content_w=content_w) + # 8/20 = 0.4 >= 0.35 → no split + + result = _detect_sub_columns([geo], content_w) + + assert len(result) == 1 + + def test_sub_column_reindexing(self): + """After split, indices are correctly 0, 1, 2 across all columns.""" + content_w = 1000 + # First column: no split + words1 = [self._make_word(50, f"de{i}") for i in range(10)] + geo1 = ColumnGeometry(index=0, x=30, y=50, width=200, height=500, + word_count=10, words=words1, width_ratio=0.2) + # Second column: will split + page_words = [self._make_word(400, f"p.{i}") for i in range(3)] + en_words = [self._make_word(550, f"en{i}") for i in range(15)] + geo2 = ColumnGeometry(index=1, x=380, y=50, width=300, height=500, + word_count=18, words=page_words + en_words, width_ratio=0.3) + + result = _detect_sub_columns([geo1, geo2], content_w) + + assert len(result) == 3 + assert [g.index for g in result] == [0, 1, 2] + # First column unchanged + assert result[0].word_count == 10 + # Sub-column (page refs) + assert result[1].word_count == 3 + # Main column (EN words) + assert result[2].word_count == 15 + + def test_no_split_too_few_words(self): + """Column with fewer than 5 words → no split attempted.""" + content_w = 1000 + words = [self._make_word(100, "a"), self._make_word(300, "b"), + self._make_word(300, "c"), self._make_word(300, "d")] + geo = self._make_geo(x=80, width=300, words=words, content_w=content_w) + + result = _detect_sub_columns([geo], content_w) + + assert len(result) == 1 + + def test_no_split_single_minority_word(self): + """Only 1 word in minority cluster → no split (need >= 2).""" + content_w = 1000 + minority = [self._make_word(100, "p.59")] + majority = [self._make_word(300, f"w{i}") for i in range(20)] + geo = self._make_geo(x=80, width=350, words=minority + majority, content_w=content_w) + + result = _detect_sub_columns([geo], content_w) + + assert len(result) == 1 + + +class TestCellsToVocabEntriesPageRef: + """Test that page_ref cells are mapped to source_page field.""" + + def test_page_ref_mapped_to_source_page(self): + """Cell with col_type='page_ref' → source_page field populated.""" + from cv_vocab_pipeline import _cells_to_vocab_entries + + cells = [ + { + 'row_index': 0, + 'col_type': 'column_en', + 'text': 'hello', + 'bbox_pct': [10, 10, 30, 5], + 'confidence': 95.0, + 'ocr_engine': 'tesseract', + }, + { + 'row_index': 0, + 'col_type': 'column_de', + 'text': 'hallo', + 'bbox_pct': [40, 10, 30, 5], + 'confidence': 90.0, + 'ocr_engine': 'tesseract', + }, + { + 'row_index': 0, + 'col_type': 'page_ref', + 'text': 'p.59', + 'bbox_pct': [5, 10, 5, 5], + 'confidence': 80.0, + 'ocr_engine': 'tesseract', + }, + ] + + entries = _cells_to_vocab_entries(cells) + + assert len(entries) == 1 + assert entries[0]['english'] == 'hello' + assert entries[0]['german'] == 'hallo' + assert entries[0]['source_page'] == 'p.59' + assert entries[0]['bbox_ref'] == [5, 10, 5, 5] + + def test_no_page_ref_defaults_empty(self): + """Without page_ref cell, source_page defaults to empty string.""" + from cv_vocab_pipeline import _cells_to_vocab_entries + + cells = [ + { + 'row_index': 0, + 'col_type': 'column_en', + 'text': 'world', + 'bbox_pct': [10, 10, 30, 5], + 'confidence': 95.0, + 'ocr_engine': 'tesseract', + }, + ] + + entries = _cells_to_vocab_entries(cells) + + assert len(entries) == 1 + assert entries[0]['source_page'] == '' + assert entries[0]['bbox_ref'] is None + + # ============================================= # RUN TESTS # =============================================