feat(ocr-pipeline): verticality filter for column detection

Clusters now track the Y-positions of their words and are filtered by
vertical coverage (primary columns need >=30% of content height; secondary
columns need >=15% plus at least 5 words) to reject noise from indentations
or page numbers. Merge distance widened to 3% of content width.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-02-27 19:57:13 +01:00
parent bb879a03a8
commit 4f37afa222

View File

@@ -918,6 +918,7 @@ def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Opt
# Collect words with their full info
word_dicts = []
left_edges = []
edge_word_indices = [] # Track which word_dicts index each edge belongs to
n_words = len(data['text'])
for i in range(n_words):
conf = int(data['conf'][i]) if str(data['conf'][i]).lstrip('-').isdigit() else -1
@@ -929,6 +930,7 @@ def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Opt
bw = int(data['width'][i])
bh = int(data['height'][i])
left_edges.append(lx)
edge_word_indices.append(len(word_dicts))
word_dicts.append({
'text': text, 'conf': conf,
'left': lx, 'top': ty, 'width': bw, 'height': bh,
@@ -940,57 +942,106 @@ def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Opt
logger.info(f"ColumnGeometry: {len(left_edges)} words detected in content area")
# --- Cluster left edges ---
# --- Cluster left edges (tracking word indices per cluster) ---
tolerance = max(10, int(content_w * 0.01))
sorted_edges = sorted(left_edges)
clusters = []
current_cluster = [sorted_edges[0]]
for edge in sorted_edges[1:]:
if edge - current_cluster[-1] <= tolerance:
current_cluster.append(edge)
# Sort edges while keeping word index association
sorted_pairs = sorted(zip(left_edges, edge_word_indices), key=lambda p: p[0])
clusters = [] # list of lists of edge x-values
cluster_widxs = [] # parallel list of lists of word_dicts indices
cur_edges = [sorted_pairs[0][0]]
cur_widxs = [sorted_pairs[0][1]]
for edge, widx in sorted_pairs[1:]:
if edge - cur_edges[-1] <= tolerance:
cur_edges.append(edge)
cur_widxs.append(widx)
else:
clusters.append(current_cluster)
current_cluster = [edge]
clusters.append(current_cluster)
clusters.append(cur_edges)
cluster_widxs.append(cur_widxs)
cur_edges = [edge]
cur_widxs = [widx]
clusters.append(cur_edges)
cluster_widxs.append(cur_widxs)
# Filter: only clusters with >= 2 words
significant = [(int(np.mean(c)), len(c), min(c), max(c)) for c in clusters if len(c) >= 2]
significant.sort(key=lambda s: s[0])
# --- Enrich clusters with Y-span info and apply verticality filter ---
MIN_Y_COVERAGE_PRIMARY = 0.30 # Primary columns span >= 30% of page height
MIN_Y_COVERAGE_SECONDARY = 0.15 # Secondary columns span >= 15%
MIN_WORDS_SECONDARY = 5 # Secondary columns need >= 5 words
cluster_infos = []
for c_edges, c_widxs in zip(clusters, cluster_widxs):
if len(c_edges) < 2:
continue
y_positions = [word_dicts[idx]['top'] for idx in c_widxs]
y_span = max(y_positions) - min(y_positions)
y_coverage = y_span / content_h if content_h > 0 else 0.0
cluster_infos.append({
'mean_x': int(np.mean(c_edges)),
'count': len(c_edges),
'min_edge': min(c_edges),
'max_edge': max(c_edges),
'y_min': min(y_positions),
'y_max': max(y_positions),
'y_coverage': y_coverage,
})
logger.info(f"ColumnGeometry: {len(cluster_infos)} clusters with >=2 words "
f"(from {len(clusters)} total), y_coverage: "
f"{[(ci['mean_x']+left_x, ci['count'], f\"{ci['y_coverage']:.0%}\") for ci in cluster_infos[:12]]}")
# Primary: good vertical coverage
primary = [c for c in cluster_infos if c['y_coverage'] >= MIN_Y_COVERAGE_PRIMARY]
# Secondary: moderate coverage with enough words
primary_set = set(id(c) for c in primary)
secondary = [c for c in cluster_infos
if id(c) not in primary_set
and c['y_coverage'] >= MIN_Y_COVERAGE_SECONDARY
and c['count'] >= MIN_WORDS_SECONDARY]
significant = sorted(primary + secondary, key=lambda c: c['mean_x'])
logger.info(f"ColumnGeometry: {len(significant)} significant clusters "
f"(from {len(clusters)} total): "
f"{[(s[0]+left_x, s[1]) for s in significant[:10]]}")
f"(primary={len(primary)}, secondary={len(secondary)}): "
f"{[(s['mean_x']+left_x, s['count'], f\"{s['y_coverage']:.0%}\") for s in significant[:10]]}")
if len(significant) < 3:
logger.info("ColumnGeometry: < 3 clusters, signaling fallback")
logger.info("ColumnGeometry: < 3 clusters after verticality filter, signaling fallback")
return None
# --- Merge clusters that are very close (within 2*tolerance) ---
merged = [significant[0]]
# --- Merge clusters that are very close (3% of content width) ---
merge_distance = max(20, int(content_w * 0.03))
merged = [significant[0].copy()]
for s in significant[1:]:
if s[0] - merged[-1][0] < 2 * tolerance:
if s['mean_x'] - merged[-1]['mean_x'] < merge_distance:
prev = merged[-1]
total = prev[1] + s[1]
avg_x = (prev[0] * prev[1] + s[0] * s[1]) // total
merged[-1] = (avg_x, total, min(prev[2], s[2]), max(prev[3], s[3]))
total = prev['count'] + s['count']
avg_x = (prev['mean_x'] * prev['count'] + s['mean_x'] * s['count']) // total
prev['mean_x'] = avg_x
prev['count'] = total
prev['min_edge'] = min(prev['min_edge'], s['min_edge'])
prev['max_edge'] = max(prev['max_edge'], s['max_edge'])
prev['y_min'] = min(prev['y_min'], s['y_min'])
prev['y_max'] = max(prev['y_max'], s['y_max'])
prev['y_coverage'] = (prev['y_max'] - prev['y_min']) / content_h if content_h > 0 else 0.0
else:
merged.append(s)
merged.append(s.copy())
logger.info(f"ColumnGeometry: {len(merged)} clusters after merging: "
f"{[(m[0]+left_x, m[1]) for m in merged]}")
logger.info(f"ColumnGeometry: {len(merged)} clusters after merging (dist={merge_distance}px): "
f"{[(m['mean_x']+left_x, m['count'], f\"{m['y_coverage']:.0%}\") for m in merged]}")
if len(merged) < 3:
logger.info("ColumnGeometry: < 3 merged clusters, signaling fallback")
return None
# --- Derive column boundaries ---
margin_px = max(5, int(content_w * 0.005))
margin_px = max(6, int(content_w * 0.003)) # ~2mm margin before column start
col_starts = []
for center_x, count, min_edge, max_edge in merged:
abs_start = max(0, left_x + min_edge - margin_px)
col_starts.append((abs_start, count))
for m in merged:
abs_start = max(0, left_x + m['min_edge'] - margin_px)
col_starts.append((abs_start, m['count']))
# Calculate column widths and assign words to columns
geometries = []