From df30d4eae34aba9e8db71d08caaa3a982a3b3431 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Thu, 19 Mar 2026 12:22:11 +0100 Subject: [PATCH] Add zone merging across images + heading detection by color/height Zone merging: content zones separated by box zones (images) are merged into a single zone with image_overlays, so split tables reconnect. Heading detection: after color annotation, rows where all words are non-black and taller than 1.2x median are merged into spanning heading cells. Co-Authored-By: Claude Opus 4.6 --- klausur-service/backend/cv_vocab_types.py | 1 + klausur-service/backend/grid_editor_api.py | 225 +++++++++++ .../backend/tests/test_grid_editor_api.py | 360 ++++++++++++++++++ 3 files changed, 586 insertions(+) create mode 100644 klausur-service/backend/tests/test_grid_editor_api.py diff --git a/klausur-service/backend/cv_vocab_types.py b/klausur-service/backend/cv_vocab_types.py index 4673ae3..e28abc8 100644 --- a/klausur-service/backend/cv_vocab_types.py +++ b/klausur-service/backend/cv_vocab_types.py @@ -178,3 +178,4 @@ class PageZone: width: int box: Optional[DetectedBox] = None columns: List[ColumnGeometry] = field(default_factory=list) + image_overlays: List[Dict] = field(default_factory=list) diff --git a/klausur-service/backend/grid_editor_api.py b/klausur-service/backend/grid_editor_api.py index c80daca..9b79c80 100644 --- a/klausur-service/backend/grid_editor_api.py +++ b/klausur-service/backend/grid_editor_api.py @@ -21,6 +21,7 @@ import numpy as np from fastapi import APIRouter, HTTPException, Request from cv_box_detect import detect_boxes, split_page_into_zones +from cv_vocab_types import PageZone from cv_color_detect import detect_word_colors, recover_colored_text from cv_ocr_engines import fix_cell_phonetics, fix_ipa_continuation_cell, _text_has_garbled_ipa from cv_words_first import _cluster_rows, _build_cells @@ -439,6 +440,217 @@ def _words_in_zone( return result +def _merge_content_zones_across_boxes( + zones: List, + 
content_x: int, + content_w: int, +) -> List: + """Merge content zones separated by box zones into single zones. + + Box zones become image_overlays on the merged content zone. + Pattern: [content, box*, content] → [merged_content with overlay] + Box zones NOT between two content zones stay as standalone zones. + """ + if len(zones) < 3: + return zones + + # Group consecutive runs of [content, box+, content] + result: List = [] + i = 0 + while i < len(zones): + z = zones[i] + if z.zone_type != "content": + result.append(z) + i += 1 + continue + + # Start of a potential merge group: content zone + group_contents = [z] + group_boxes = [] + j = i + 1 + # Absorb [box, content] pairs — only absorb a box if it's + # confirmed to be followed by another content zone. + while j < len(zones): + if (zones[j].zone_type == "box" + and j + 1 < len(zones) + and zones[j + 1].zone_type == "content"): + group_boxes.append(zones[j]) + group_contents.append(zones[j + 1]) + j += 2 + else: + break + + if len(group_contents) >= 2 and group_boxes: + # Merge: create one large content zone spanning all + y_min = min(c.y for c in group_contents) + y_max = max(c.y + c.height for c in group_contents) + overlays = [] + for bz in group_boxes: + overlay = { + "y": bz.y, + "height": bz.height, + "x": bz.x, + "width": bz.width, + } + if bz.box: + overlay["box"] = { + "x": bz.box.x, + "y": bz.box.y, + "width": bz.box.width, + "height": bz.box.height, + "confidence": bz.box.confidence, + "border_thickness": bz.box.border_thickness, + } + overlays.append(overlay) + + merged = PageZone( + index=0, # re-indexed below + zone_type="content", + y=y_min, + height=y_max - y_min, + x=content_x, + width=content_w, + image_overlays=overlays, + ) + result.append(merged) + i = j + else: + # No merge possible — emit just the content zone + result.append(z) + i += 1 + + # Re-index zones + for idx, z in enumerate(result): + z.index = idx + + logger.info( + "zone-merge: %d zones → %d zones after merging across 
boxes", + len(zones), len(result), + ) + return result + + +def _detect_heading_rows_by_color(zones_data: List[Dict], img_w: int, img_h: int) -> int: + """Detect heading rows by color + height after color annotation. + + A row is a heading if: + 1. ALL word_boxes have color_name != 'black' (typically 'blue') + 2. Mean word height > 1.2x median height of all words in the zone + + Detected heading rows are merged into a single spanning cell. + Returns count of headings detected. + """ + heading_count = 0 + + for z in zones_data: + cells = z.get("cells", []) + rows = z.get("rows", []) + columns = z.get("columns", []) + if not cells or not rows or len(columns) < 2: + continue + + # Compute median word height across the zone + all_heights = [] + for cell in cells: + for wb in cell.get("word_boxes") or []: + h = wb.get("height", 0) + if h > 0: + all_heights.append(h) + if not all_heights: + continue + all_heights_sorted = sorted(all_heights) + median_h = all_heights_sorted[len(all_heights_sorted) // 2] + + heading_row_indices = [] + for row in rows: + if row.get("is_header"): + continue # already detected as header + ri = row["index"] + row_cells = [c for c in cells if c.get("row_index") == ri] + row_wbs = [ + wb for cell in row_cells + for wb in cell.get("word_boxes") or [] + ] + if not row_wbs: + continue + + # Condition 1: ALL words are non-black + all_colored = all( + wb.get("color_name", "black") != "black" + for wb in row_wbs + ) + if not all_colored: + continue + + # Condition 2: mean height > 1.2x median + mean_h = sum(wb.get("height", 0) for wb in row_wbs) / len(row_wbs) + if mean_h <= median_h * 1.2: + continue + + heading_row_indices.append(ri) + + # Merge heading cells into spanning cells + for hri in heading_row_indices: + header_cells = [c for c in cells if c.get("row_index") == hri] + if len(header_cells) <= 1: + # Single cell — just mark it as heading + if header_cells: + header_cells[0]["col_type"] = "heading" + heading_count += 1 + # Mark row as header 
+ for row in rows: + if row["index"] == hri: + row["is_header"] = True + continue + + # Collect all word_boxes and text from all columns + all_wb = [] + all_text_parts = [] + for hc in sorted(header_cells, key=lambda c: c["col_index"]): + all_wb.extend(hc.get("word_boxes", [])) + if hc.get("text", "").strip(): + all_text_parts.append(hc["text"].strip()) + + # Remove all cells for this row, replace with one spanning cell + z["cells"] = [c for c in z["cells"] if c.get("row_index") != hri] + + if all_wb: + x_min = min(wb["left"] for wb in all_wb) + y_min = min(wb["top"] for wb in all_wb) + x_max = max(wb["left"] + wb["width"] for wb in all_wb) + y_max = max(wb["top"] + wb["height"] for wb in all_wb) + + zone_idx = z.get("zone_index", 0) + z["cells"].append({ + "cell_id": f"Z{zone_idx}_R{hri:02d}_C0", + "zone_index": zone_idx, + "row_index": hri, + "col_index": 0, + "col_type": "heading", + "text": " ".join(all_text_parts), + "confidence": 0.0, + "bbox_px": {"x": x_min, "y": y_min, + "w": x_max - x_min, "h": y_max - y_min}, + "bbox_pct": { + "x": round(x_min / img_w * 100, 2) if img_w else 0, + "y": round(y_min / img_h * 100, 2) if img_h else 0, + "w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0, + "h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0, + }, + "word_boxes": all_wb, + "ocr_engine": "words_first", + "is_bold": True, + }) + + # Mark row as header + for row in rows: + if row["index"] == hri: + row["is_header"] = True + heading_count += 1 + + return heading_count + + def _detect_header_rows( rows: List[Dict], zone_words: List[Dict], @@ -1023,6 +1235,11 @@ async def _build_grid_core(session_id: str, session: dict) -> dict: content_x, content_y, content_w, content_h, boxes ) + # Merge content zones separated by box zones + page_zones = _merge_content_zones_across_boxes( + page_zones, content_x, content_w + ) + # --- Union columns from all content zones --- # Each content zone detects columns independently. 
Narrow # columns (page refs, markers) may appear in only one zone. @@ -1161,6 +1378,9 @@ async def _build_grid_core(session_id: str, session: dict) -> dict: "confidence": pz.box.confidence, } + if pz.image_overlays: + zone_entry["image_overlays"] = pz.image_overlays + zones_data.append(zone_entry) # 4. Fallback: no boxes detected → single zone with all words @@ -1282,6 +1502,11 @@ async def _build_grid_core(session_id: str, session: dict) -> dict: all_wb.extend(cell.get("word_boxes", [])) detect_word_colors(img_bgr, all_wb) + # 5a. Heading detection by color + height (after color is available) + heading_count = _detect_heading_rows_by_color(zones_data, img_w, img_h) + if heading_count: + logger.info("Detected %d heading rows by color+height", heading_count) + # 5b. Fix unmatched parentheses in cell text # OCR often misses opening "(" while detecting closing ")". # If a cell's text has ")" without a matching "(", prepend "(". diff --git a/klausur-service/backend/tests/test_grid_editor_api.py b/klausur-service/backend/tests/test_grid_editor_api.py new file mode 100644 index 0000000..7d9ee84 --- /dev/null +++ b/klausur-service/backend/tests/test_grid_editor_api.py @@ -0,0 +1,360 @@ +""" +Tests for grid_editor_api zone merging and heading detection. 
class TestMergeContentZonesAcrossBoxes:
    """Exercise ``_merge_content_zones_across_boxes`` on small zone lists.

    All fixtures use a 500 px wide content area starting at x=0.
    """

    @staticmethod
    def _content(index, y, height):
        """Build a full-width content zone."""
        return PageZone(index=index, zone_type="content",
                        y=y, height=height, x=0, width=500)

    @staticmethod
    def _box(index, y, height, x=50, width=400, confidence=0.9):
        """Build a box zone whose DetectedBox shares the zone's geometry."""
        return PageZone(
            index=index, zone_type="box", y=y, height=height, x=x, width=width,
            box=DetectedBox(x=x, y=y, width=width, height=height,
                            confidence=confidence),
        )

    @staticmethod
    def _run(zones):
        """Call the function under test with the shared content extent."""
        return _merge_content_zones_across_boxes(zones, 0, 500)

    def test_no_merge_when_less_than_3_zones(self):
        """Two zones can never form a content/box/content run."""
        result = self._run([
            self._content(0, 0, 100),
            self._box(1, 100, 50),
        ])
        assert len(result) == 2
        assert result[0].zone_type == "content"
        assert result[1].zone_type == "box"

    def test_merge_content_box_content(self):
        """content, box, content collapses into one zone with one overlay."""
        result = self._run([
            self._content(0, 0, 100),
            self._box(1, 100, 50),
            self._content(2, 150, 200),
        ])
        assert len(result) == 1
        merged = result[0]
        assert merged.zone_type == "content"
        assert merged.y == 0
        assert merged.height == 350  # spans y=0 .. y=350
        assert len(merged.image_overlays) == 1
        overlay = merged.image_overlays[0]
        assert overlay["y"] == 100
        assert overlay["height"] == 50

    def test_box_at_start_not_merged(self):
        """A leading box has no content above it and stays standalone."""
        result = self._run([
            self._box(0, 0, 50),
            self._content(1, 50, 100),
            self._box(2, 150, 50),
            self._content(3, 200, 200),
        ])
        # Leading box survives; the following content/box/content run merges.
        assert len(result) == 2
        assert result[0].zone_type == "box"
        assert result[1].zone_type == "content"
        assert len(result[1].image_overlays) == 1

    def test_consecutive_boxes_not_merged(self):
        """Two boxes in a row break the pattern, so nothing is merged."""
        result = self._run([
            self._content(0, 0, 100),
            self._box(1, 100, 50),
            self._box(2, 150, 30, x=60, width=380, confidence=0.8),
            self._content(3, 180, 200),
        ])
        # A box is absorbed only when directly followed by a content zone.
        assert len(result) == 4

    def test_zone_reindexing(self):
        """The merged zone is numbered by its position in the result."""
        result = self._run([
            self._content(0, 0, 100),
            self._box(1, 100, 50),
            self._content(2, 150, 200),
        ])
        assert result[0].index == 0

    def test_no_boxes_passthrough(self):
        """Content-only input comes back with the same number of zones."""
        result = self._run([
            self._content(0, 0, 100),
            self._content(1, 100, 100),
        ])
        assert len(result) == 2

    def test_typical_vocab_page_pattern(self):
        """Header box, content, image box, content: only the tail merges."""
        result = self._run([
            self._box(0, 10, 40, confidence=0.95),
            self._content(1, 60, 50),
            self._box(2, 120, 85, confidence=0.8),
            self._content(3, 210, 500),
        ])
        assert len(result) == 2
        header_box, merged = result
        assert header_box.zone_type == "box"  # VOCABULARY header box stays
        assert merged.zone_type == "content"  # merged content zone
        assert merged.y == 60
        assert merged.height == 710 - 60  # spans y=60 .. y=710
        assert len(merged.image_overlays) == 1
        assert merged.image_overlays[0]["y"] == 120
        # Indices follow the new positions.
        assert header_box.index == 0
        assert merged.index == 1
class TestDetectHeadingRowsByColor:
    """Exercise ``_detect_heading_rows_by_color`` on hand-built zone dicts.

    Every scenario uses an 800 x 1000 px image and zone index 0.
    """

    def _make_word_box(self, text, left, top, width, height, color="black"):
        """Build one colour-annotated OCR word box."""
        return dict(
            text=text,
            left=left,
            top=top,
            width=width,
            height=height,
            color_name=color,
            conf=90,
        )

    def _make_zone(self, cells, rows, columns, zone_index=0,
                   bbox_x=0, bbox_y=0, bbox_w=800, bbox_h=1000):
        """Wrap cells, rows and columns into a content zone dict."""
        bbox = {"x": bbox_x, "y": bbox_y, "w": bbox_w, "h": bbox_h}
        return {
            "zone_index": zone_index,
            "zone_type": "content",
            "bbox_px": bbox,
            "cells": cells,
            "rows": rows,
            "columns": columns,
        }

    def _cell(self, row, col, text, left, top, width, height,
              color="black", col_type=None):
        """Build a single-word cell; col_type defaults to ``column_<col+1>``."""
        return {
            "cell_id": f"Z0_R{row:02d}_C{col}",
            "zone_index": 0,
            "row_index": row,
            "col_index": col,
            "col_type": col_type if col_type is not None else f"column_{col + 1}",
            "text": text,
            "word_boxes": [
                self._make_word_box(text, left, top, width, height, color),
            ],
        }

    @staticmethod
    def _row(index, y_min, y_max, is_header=False):
        """Build a row dict."""
        return {"index": index, "y_min_px": y_min, "y_max_px": y_max,
                "is_header": is_header}

    @staticmethod
    def _columns(count):
        """Build ``count`` column dicts labelled column_1, column_2, ..."""
        return [{"index": i, "label": f"column_{i + 1}"} for i in range(count)]

    def _plain_second_row(self):
        """Row 1 shared by most scenarios: black 'foo' / 'bar', height 20."""
        return [
            self._cell(1, 0, "foo", 10, 130, 80, 20),
            self._cell(1, 1, "bar", 300, 130, 80, 20),
        ]

    def _count(self, cells, rows, columns):
        """Run detection on a single zone and return the heading count."""
        zones_data = [self._make_zone(cells, rows, columns)]
        return _detect_heading_rows_by_color(zones_data, 800, 1000)

    def test_blue_heading_detected(self):
        """An all-blue row that is taller than the rest becomes a heading."""
        heading_ri = 2
        cells = []
        for ri in range(5):
            top = 100 + ri * 30
            is_heading = ri == heading_ri
            color = "blue" if is_heading else "black"
            height = 25 if is_heading else 20  # 25 > 1.2 * 20 = 24
            cells.append(
                self._cell(ri, 0, f"word_{ri}", 10, top, 80, height, color))
            cells.append(
                self._cell(ri, 1, f"translation_{ri}", 300, top, 100,
                           height, color))

        rows = [self._row(ri, 100 + ri * 30, 120 + ri * 30) for ri in range(5)]
        zones_data = [self._make_zone(cells, rows, self._columns(2))]

        count = _detect_heading_rows_by_color(zones_data, 800, 1000)

        assert count == 1
        # The row itself is flagged as a header.
        assert rows[2]["is_header"] is True
        # Both cells of the row were folded into one heading cell.
        heading_cells = [
            c for c in zones_data[0]["cells"] if c["row_index"] == heading_ri
        ]
        assert len(heading_cells) == 1
        merged = heading_cells[0]
        assert merged["col_type"] == "heading"
        assert "word_2" in merged["text"]
        assert "translation_2" in merged["text"]

    def test_black_row_not_heading(self):
        """A tall row is ignored when its words are black."""
        cells = [
            self._cell(0, 0, "hello", 10, 100, 80, 25, "black"),
            self._cell(0, 1, "world", 300, 100, 80, 25, "black"),
        ] + self._plain_second_row()
        rows = [self._row(0, 100, 125), self._row(1, 130, 150)]
        assert self._count(cells, rows, self._columns(2)) == 0

    def test_mixed_color_row_not_heading(self):
        """One black word in an otherwise blue row disqualifies the row."""
        cells = [
            self._cell(0, 0, "Unit", 10, 100, 60, 25, "blue"),
            self._cell(0, 1, "normal", 300, 100, 80, 25, "black"),
        ] + self._plain_second_row()
        rows = [self._row(0, 100, 125), self._row(1, 130, 150)]
        assert self._count(cells, rows, self._columns(2)) == 0

    def test_colored_but_not_tall_not_heading(self):
        """An all-blue row of ordinary height is not a heading."""
        cells = [
            self._cell(0, 0, "Unit", 10, 100, 60, 20, "blue"),
            self._cell(0, 1, "four", 300, 100, 60, 20, "blue"),
        ] + self._plain_second_row()
        rows = [self._row(0, 100, 120), self._row(1, 130, 150)]
        assert self._count(cells, rows, self._columns(2)) == 0

    def test_single_column_zone_skipped(self):
        """A zone with fewer than two columns is never inspected."""
        cells = [self._cell(0, 0, "Unit", 10, 100, 60, 25, "blue")]
        rows = [self._row(0, 100, 125)]
        assert self._count(cells, rows, self._columns(1)) == 0

    def test_already_header_skipped(self):
        """A row already flagged is_header is not counted again."""
        cells = [
            self._cell(0, 0, "Header", 10, 100, 60, 25, "blue",
                       col_type="spanning_header"),
        ] + self._plain_second_row()
        rows = [
            self._row(0, 100, 125, is_header=True),
            self._row(1, 130, 150),
        ]
        assert self._count(cells, rows, self._columns(2)) == 0