feat: replace gap-based column detection with left-edge alignment algorithm
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 26s
CI / test-go-edu-search (push) Successful in 27s
CI / test-python-klausur (push) Failing after 1m58s
CI / test-python-agent-core (push) Successful in 20s
CI / test-nodejs-website (push) Successful in 17s

Column detection now clusters word left-edges by X-proximity and filters
by row coverage (Y-coverage), matching the proven approach from cv_layout.py
but using precise OCR word positions instead of ink-based estimates.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-15 00:03:58 +01:00
parent c3f1547e32
commit 28352f5bab

View File

@@ -20,7 +20,7 @@ import numpy as np
from fastapi import APIRouter, HTTPException, Request
from cv_box_detect import detect_boxes, split_page_into_zones
from cv_words_first import _cluster_columns, _cluster_rows, _build_cells
from cv_words_first import _cluster_rows, _build_cells
from ocr_pipeline_session_store import (
get_session_db,
get_session_image,
@@ -36,6 +36,151 @@ router = APIRouter(prefix="/api/v1/ocr-pipeline", tags=["grid-editor"])
# Helpers
# ---------------------------------------------------------------------------
def _cluster_columns_by_alignment(
words: List[Dict],
zone_w: int,
rows: List[Dict],
) -> List[Dict[str, Any]]:
"""Detect columns by clustering left-edge alignment across rows.
Algorithm (adapted from cv_layout._detect_columns_by_clustering):
1. Tag each word with its row index
2. Cluster word left-edges by X-proximity
3. Count distinct rows per cluster (Y-coverage)
4. Keep clusters with sufficient row coverage
5. Merge nearby clusters
6. Build column boundaries
With real OCR words (from Kombi mode) this is more reliable than the
original ink-based version because left-edge positions are precise.
"""
if not words or not rows:
return []
total_rows = len(rows)
if total_rows == 0:
return []
# --- Tag each word with its row index ---
row_of: Dict[int, int] = {}
for w in words:
y_center = w["top"] + w["height"] / 2
best = min(rows, key=lambda r: abs(r["y_center"] - y_center))
row_of[id(w)] = best["index"]
# --- Collect and sort left-edges ---
edge_data = sorted(
((w["left"], row_of[id(w)]) for w in words),
key=lambda x: x[0],
)
# --- Cluster by X-proximity ---
tolerance = max(10, int(zone_w * 0.01))
clusters: List[Dict[str, Any]] = []
cur_edges = [edge_data[0][0]]
cur_rows = {edge_data[0][1]}
for left, row_idx in edge_data[1:]:
if left - cur_edges[-1] <= tolerance:
cur_edges.append(left)
cur_rows.add(row_idx)
else:
clusters.append({
"mean_x": int(sum(cur_edges) / len(cur_edges)),
"min_edge": min(cur_edges),
"max_edge": max(cur_edges),
"count": len(cur_edges),
"distinct_rows": len(cur_rows),
"row_coverage": len(cur_rows) / total_rows,
})
cur_edges = [left]
cur_rows = {row_idx}
clusters.append({
"mean_x": int(sum(cur_edges) / len(cur_edges)),
"min_edge": min(cur_edges),
"max_edge": max(cur_edges),
"count": len(cur_edges),
"distinct_rows": len(cur_rows),
"row_coverage": len(cur_rows) / total_rows,
})
# --- Filter by row coverage ---
MIN_COVERAGE_PRIMARY = 0.15
MIN_COVERAGE_SECONDARY = 0.08
MIN_WORDS_SECONDARY = 3
MIN_DISTINCT_ROWS = 2
primary = [
c for c in clusters
if c["row_coverage"] >= MIN_COVERAGE_PRIMARY
and c["distinct_rows"] >= MIN_DISTINCT_ROWS
]
primary_ids = {id(c) for c in primary}
secondary = [
c for c in clusters
if id(c) not in primary_ids
and c["row_coverage"] >= MIN_COVERAGE_SECONDARY
and c["count"] >= MIN_WORDS_SECONDARY
and c["distinct_rows"] >= MIN_DISTINCT_ROWS
]
significant = sorted(primary + secondary, key=lambda c: c["mean_x"])
logger.info(
"alignment columns: %d clusters total, %d primary, %d secondary → %d significant",
len(clusters), len(primary), len(secondary), len(significant),
)
if not significant:
# Fallback: single column covering all content
x_min = min(w["left"] for w in words)
x_max = max(w["left"] + w["width"] for w in words)
return [{"index": 0, "type": "column_text", "x_min": x_min, "x_max": x_max}]
# --- Merge nearby clusters ---
merge_distance = max(25, int(zone_w * 0.03))
merged = [significant[0].copy()]
for s in significant[1:]:
if s["mean_x"] - merged[-1]["mean_x"] < merge_distance:
prev = merged[-1]
total = prev["count"] + s["count"]
prev["mean_x"] = (
prev["mean_x"] * prev["count"] + s["mean_x"] * s["count"]
) // total
prev["count"] = total
prev["min_edge"] = min(prev["min_edge"], s["min_edge"])
prev["max_edge"] = max(prev["max_edge"], s["max_edge"])
prev["distinct_rows"] = max(prev["distinct_rows"], s["distinct_rows"])
else:
merged.append(s.copy())
logger.info(
"alignment columns: %d after merge (distance=%d)",
len(merged), merge_distance,
)
# --- Build column boundaries ---
margin = max(5, int(zone_w * 0.005))
content_x_min = min(w["left"] for w in words)
content_x_max = max(w["left"] + w["width"] for w in words)
columns: List[Dict[str, Any]] = []
for i, cluster in enumerate(merged):
x_min = max(content_x_min, cluster["min_edge"] - margin)
if i + 1 < len(merged):
x_max = merged[i + 1]["min_edge"] - margin
else:
x_max = content_x_max
columns.append({
"index": i,
"type": f"column_{i + 1}" if len(merged) > 1 else "column_text",
"x_min": x_min,
"x_max": x_max,
})
return columns
def _flatten_word_boxes(cells: List[Dict]) -> List[Dict]:
"""Extract all word_boxes from cells into a flat list of word dicts."""
words: List[Dict] = []
@@ -127,9 +272,10 @@ def _build_zone_grid(
"header_rows": [],
}
# Cluster columns and rows
columns = _cluster_columns(zone_words, zone_w)
# Cluster rows first (needed for column alignment analysis)
rows = _cluster_rows(zone_words)
# Cluster columns by left-edge alignment
columns = _cluster_columns_by_alignment(zone_words, zone_w, rows)
if not columns or not rows:
return {