From 28352f5bab4260d6f71a969433aafe5f74ec46a0 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Sun, 15 Mar 2026 00:03:58 +0100 Subject: [PATCH] feat: replace gap-based column detection with left-edge alignment algorithm Column detection now clusters word left-edges by X-proximity and filters by row coverage (Y-coverage), matching the proven approach from cv_layout.py but using precise OCR word positions instead of ink-based estimates. Co-Authored-By: Claude Opus 4.6 --- klausur-service/backend/grid_editor_api.py | 152 ++++++++++++++++++++- 1 file changed, 149 insertions(+), 3 deletions(-) diff --git a/klausur-service/backend/grid_editor_api.py b/klausur-service/backend/grid_editor_api.py index 1b0edf9..4e1173f 100644 --- a/klausur-service/backend/grid_editor_api.py +++ b/klausur-service/backend/grid_editor_api.py @@ -20,7 +20,7 @@ import numpy as np from fastapi import APIRouter, HTTPException, Request from cv_box_detect import detect_boxes, split_page_into_zones -from cv_words_first import _cluster_columns, _cluster_rows, _build_cells +from cv_words_first import _cluster_rows, _build_cells from ocr_pipeline_session_store import ( get_session_db, get_session_image, @@ -36,6 +36,151 @@ router = APIRouter(prefix="/api/v1/ocr-pipeline", tags=["grid-editor"]) # Helpers # --------------------------------------------------------------------------- +def _cluster_columns_by_alignment( + words: List[Dict], + zone_w: int, + rows: List[Dict], +) -> List[Dict[str, Any]]: + """Detect columns by clustering left-edge alignment across rows. + + Algorithm (adapted from cv_layout._detect_columns_by_clustering): + 1. Tag each word with its row index + 2. Cluster word left-edges by X-proximity + 3. Count distinct rows per cluster (Y-coverage) + 4. Keep clusters with sufficient row coverage + 5. Merge nearby clusters + 6. Build column boundaries + + With real OCR words (from Kombi mode) this is more reliable than the + original ink-based version because left-edge positions are precise. + """ + if not words or not rows: + return [] + + total_rows = len(rows) + if total_rows == 0: + return [] + + # --- Tag each word with its row index --- + row_of: Dict[int, int] = {} + for w in words: + y_center = w["top"] + w["height"] / 2 + best = min(rows, key=lambda r: abs(r["y_center"] - y_center)) + row_of[id(w)] = best["index"] + + # --- Collect and sort left-edges --- + edge_data = sorted( + ((w["left"], row_of[id(w)]) for w in words), + key=lambda x: x[0], + ) + + # --- Cluster by X-proximity --- + tolerance = max(10, int(zone_w * 0.01)) + clusters: List[Dict[str, Any]] = [] + cur_edges = [edge_data[0][0]] + cur_rows = {edge_data[0][1]} + + for left, row_idx in edge_data[1:]: + if left - cur_edges[-1] <= tolerance: + cur_edges.append(left) + cur_rows.add(row_idx) + else: + clusters.append({ + "mean_x": int(sum(cur_edges) / len(cur_edges)), + "min_edge": min(cur_edges), + "max_edge": max(cur_edges), + "count": len(cur_edges), + "distinct_rows": len(cur_rows), + "row_coverage": len(cur_rows) / total_rows, + }) + cur_edges = [left] + cur_rows = {row_idx} + clusters.append({ + "mean_x": int(sum(cur_edges) / len(cur_edges)), + "min_edge": min(cur_edges), + "max_edge": max(cur_edges), + "count": len(cur_edges), + "distinct_rows": len(cur_rows), + "row_coverage": len(cur_rows) / total_rows, + }) + + # --- Filter by row coverage --- + MIN_COVERAGE_PRIMARY = 0.15 + MIN_COVERAGE_SECONDARY = 0.08 + MIN_WORDS_SECONDARY = 3 + MIN_DISTINCT_ROWS = 2 + + primary = [ + c for c in clusters + if c["row_coverage"] >= MIN_COVERAGE_PRIMARY + and c["distinct_rows"] >= MIN_DISTINCT_ROWS + ] + primary_ids = {id(c) for c in primary} + secondary = [ + c for c in clusters + if id(c) not in primary_ids + and c["row_coverage"] >= MIN_COVERAGE_SECONDARY + and c["count"] >= MIN_WORDS_SECONDARY + and c["distinct_rows"] >= MIN_DISTINCT_ROWS + ] + significant = sorted(primary + secondary, key=lambda c: c["mean_x"]) + + logger.info( + "alignment columns: %d clusters total, %d primary, %d secondary → %d significant", + len(clusters), len(primary), len(secondary), len(significant), + ) + + if not significant: + # Fallback: single column covering all content + x_min = min(w["left"] for w in words) + x_max = max(w["left"] + w["width"] for w in words) + return [{"index": 0, "type": "column_text", "x_min": x_min, "x_max": x_max}] + + # --- Merge nearby clusters --- + merge_distance = max(25, int(zone_w * 0.03)) + merged = [significant[0].copy()] + for s in significant[1:]: + if s["mean_x"] - merged[-1]["mean_x"] < merge_distance: + prev = merged[-1] + total = prev["count"] + s["count"] + prev["mean_x"] = ( + prev["mean_x"] * prev["count"] + s["mean_x"] * s["count"] + ) // total + prev["count"] = total + prev["min_edge"] = min(prev["min_edge"], s["min_edge"]) + prev["max_edge"] = max(prev["max_edge"], s["max_edge"]) + prev["distinct_rows"] = max(prev["distinct_rows"], s["distinct_rows"]) + else: + merged.append(s.copy()) + + logger.info( + "alignment columns: %d after merge (distance=%d)", + len(merged), merge_distance, + ) + + # --- Build column boundaries --- + margin = max(5, int(zone_w * 0.005)) + content_x_min = min(w["left"] for w in words) + content_x_max = max(w["left"] + w["width"] for w in words) + + columns: List[Dict[str, Any]] = [] + for i, cluster in enumerate(merged): + x_min = max(content_x_min, cluster["min_edge"] - margin) + if i + 1 < len(merged): + x_max = merged[i + 1]["min_edge"] - margin + else: + x_max = content_x_max + + columns.append({ + "index": i, + "type": f"column_{i + 1}" if len(merged) > 1 else "column_text", + "x_min": x_min, + "x_max": x_max, + }) + + return columns + + def _flatten_word_boxes(cells: List[Dict]) -> List[Dict]: """Extract all word_boxes from cells into a flat list of word dicts.""" words: List[Dict] = [] @@ -127,9 +272,10 @@ def _build_zone_grid( "header_rows": [], } - # Cluster columns and rows - columns = _cluster_columns(zone_words, zone_w) + # Cluster rows first (needed for column alignment analysis) rows = _cluster_rows(zone_words) + # Cluster columns by left-edge alignment + columns = _cluster_columns_by_alignment(zone_words, zone_w, rows) if not columns or not rows: return {