feat(ocr-pipeline): replace clustering column detection with whitespace-gap analysis

Column detection now uses vertical projection profiles to find whitespace
gaps between columns, then validates gaps against word bounding boxes to
prevent splitting through words. Old clustering algorithm extracted as
fallback (_detect_columns_by_clustering) for pages with < 2 detected gaps.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-02-28 00:36:28 +01:00
parent b03cb0a1e6
commit ce0815007e

View File

@@ -875,11 +875,147 @@ def analyze_layout(layout_img: np.ndarray, ocr_img: np.ndarray) -> List[PageRegi
# --- Phase A: Geometry Detection ---
def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Optional[Tuple[List[ColumnGeometry], int, int, int, int]]:
"""Detect column geometry by clustering left-aligned word positions.
def _detect_columns_by_clustering(
word_dicts: List[Dict],
left_edges: List[int],
edge_word_indices: List[int],
content_w: int,
content_h: int,
left_x: int,
right_x: int,
top_y: int,
bottom_y: int,
) -> Optional[Tuple[List[ColumnGeometry], int, int, int, int]]:
"""Fallback: detect columns by clustering left-aligned word positions.
Phase A of the two-phase column detection. Returns untyped column
geometries with their words for subsequent content-based classification.
Used when the primary gap-based algorithm finds fewer than 2 gaps.
"""
tolerance = max(10, int(content_w * 0.01))
sorted_pairs = sorted(zip(left_edges, edge_word_indices), key=lambda p: p[0])
clusters = []
cluster_widxs = []
cur_edges = [sorted_pairs[0][0]]
cur_widxs = [sorted_pairs[0][1]]
for edge, widx in sorted_pairs[1:]:
if edge - cur_edges[-1] <= tolerance:
cur_edges.append(edge)
cur_widxs.append(widx)
else:
clusters.append(cur_edges)
cluster_widxs.append(cur_widxs)
cur_edges = [edge]
cur_widxs = [widx]
clusters.append(cur_edges)
cluster_widxs.append(cur_widxs)
MIN_Y_COVERAGE_PRIMARY = 0.30
MIN_Y_COVERAGE_SECONDARY = 0.15
MIN_WORDS_SECONDARY = 5
cluster_infos = []
for c_edges, c_widxs in zip(clusters, cluster_widxs):
if len(c_edges) < 2:
continue
y_positions = [word_dicts[idx]['top'] for idx in c_widxs]
y_span = max(y_positions) - min(y_positions)
y_coverage = y_span / content_h if content_h > 0 else 0.0
cluster_infos.append({
'mean_x': int(np.mean(c_edges)),
'count': len(c_edges),
'min_edge': min(c_edges),
'max_edge': max(c_edges),
'y_min': min(y_positions),
'y_max': max(y_positions),
'y_coverage': y_coverage,
})
primary = [c for c in cluster_infos if c['y_coverage'] >= MIN_Y_COVERAGE_PRIMARY]
primary_set = set(id(c) for c in primary)
secondary = [c for c in cluster_infos
if id(c) not in primary_set
and c['y_coverage'] >= MIN_Y_COVERAGE_SECONDARY
and c['count'] >= MIN_WORDS_SECONDARY]
significant = sorted(primary + secondary, key=lambda c: c['mean_x'])
if len(significant) < 3:
logger.info("ColumnGeometry clustering fallback: < 3 significant clusters")
return None
merge_distance = max(30, int(content_w * 0.06))
merged = [significant[0].copy()]
for s in significant[1:]:
if s['mean_x'] - merged[-1]['mean_x'] < merge_distance:
prev = merged[-1]
total = prev['count'] + s['count']
avg_x = (prev['mean_x'] * prev['count'] + s['mean_x'] * s['count']) // total
prev['mean_x'] = avg_x
prev['count'] = total
prev['min_edge'] = min(prev['min_edge'], s['min_edge'])
prev['max_edge'] = max(prev['max_edge'], s['max_edge'])
else:
merged.append(s.copy())
if len(merged) < 3:
logger.info("ColumnGeometry clustering fallback: < 3 merged clusters")
return None
logger.info(f"ColumnGeometry clustering fallback: {len(merged)} columns from clustering")
margin_px = max(6, int(content_w * 0.003))
return _build_geometries_from_starts(
[(max(0, left_x + m['min_edge'] - margin_px), m['count']) for m in merged],
word_dicts, left_x, right_x, top_y, bottom_y, content_w, content_h,
)
def _build_geometries_from_starts(
    col_starts: List[Tuple[int, int]],
    word_dicts: List[Dict],
    left_x: int,
    right_x: int,
    top_y: int,
    bottom_y: int,
    content_w: int,
    content_h: int,
) -> Tuple[List[ColumnGeometry], int, int, int, int]:
    """Build ColumnGeometry objects from (abs_start_x, word_count) pairs.

    Each column spans from its own start to the next column's start; the
    last column extends to right_x. A word belongs to a column when its
    left edge (relative to the content area) falls inside that span.
    """
    geometries = []
    n_cols = len(col_starts)
    for idx, (abs_start, _count) in enumerate(col_starts):
        # Column ends where the next one begins; the last runs to right_x.
        abs_end = col_starts[idx + 1][0] if idx + 1 < n_cols else right_x
        width = abs_end - abs_start
        rel_start = abs_start - left_x
        rel_end = rel_start + width
        members = [wd for wd in word_dicts if rel_start <= wd['left'] < rel_end]
        geometries.append(ColumnGeometry(
            index=idx,
            x=abs_start,
            y=top_y,
            width=width,
            height=content_h,
            word_count=len(members),
            words=members,
            width_ratio=width / content_w if content_w > 0 else 0.0,
        ))
    logger.info(f"ColumnGeometry: {len(geometries)} columns: "
                f"{[(g.index, g.x, g.width, g.word_count) for g in geometries]}")
    return (geometries, left_x, right_x, top_y, bottom_y)
def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Optional[Tuple[List[ColumnGeometry], int, int, int, int]]:
"""Detect column geometry using whitespace-gap analysis with word validation.
Phase A of the two-phase column detection. Uses vertical projection
profiles to find whitespace gaps between columns, then validates that
no gap cuts through a word bounding box.
Falls back to clustering-based detection if fewer than 2 gaps are found.
Args:
ocr_img: Binarized grayscale image for layout analysis.
@@ -887,11 +1023,11 @@ def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Opt
Returns:
Tuple of (geometries, left_x, right_x, top_y, bottom_y) or None if
fewer than 3 clusters are found (signals fallback needed).
detection fails entirely.
"""
h, w = ocr_img.shape[:2]
# --- Find content bounds ---
# --- Step 1: Find content bounds ---
inv = cv2.bitwise_not(ocr_img)
left_x, right_x, top_y, bottom_y = _find_content_bounds(inv)
content_w = right_x - left_x
@@ -905,7 +1041,7 @@ def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Opt
logger.info(f"ColumnGeometry: content bounds x=[{left_x}..{right_x}] ({content_w}px), "
f"y=[{top_y}..{bottom_y}] ({content_h}px)")
# --- Get word bounding boxes from Tesseract ---
# --- Step 2: Get word bounding boxes from Tesseract ---
content_roi = dewarped_bgr[top_y:bottom_y, left_x:right_x]
pil_img = Image.fromarray(cv2.cvtColor(content_roi, cv2.COLOR_BGR2RGB))
@@ -915,10 +1051,9 @@ def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Opt
logger.warning(f"ColumnGeometry: Tesseract image_to_data failed: {e}")
return None
# Collect words with their full info
word_dicts = []
left_edges = []
edge_word_indices = [] # Track which word_dicts index each edge belongs to
edge_word_indices = []
n_words = len(data['text'])
for i in range(n_words):
conf = int(data['conf'][i]) if str(data['conf'][i]).lstrip('-').isdigit() else -1
@@ -942,146 +1077,171 @@ def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Opt
logger.info(f"ColumnGeometry: {len(left_edges)} words detected in content area")
# --- Cluster left edges (tracking word indices per cluster) ---
tolerance = max(10, int(content_w * 0.01))
# --- Step 3: Vertical projection profile ---
content_strip = inv[top_y:bottom_y, left_x:right_x]
v_proj = np.sum(content_strip, axis=0).astype(float)
v_proj_norm = v_proj / (content_h * 255) if content_h > 0 else v_proj
# Sort edges while keeping word index association
sorted_pairs = sorted(zip(left_edges, edge_word_indices), key=lambda p: p[0])
# Smooth the projection to avoid noise-induced micro-gaps
kernel_size = max(5, content_w // 80)
if kernel_size % 2 == 0:
kernel_size += 1 # keep odd for symmetry
v_smooth = np.convolve(v_proj_norm, np.ones(kernel_size) / kernel_size, mode='same')
clusters = [] # list of lists of edge x-values
cluster_widxs = [] # parallel list of lists of word_dicts indices
cur_edges = [sorted_pairs[0][0]]
cur_widxs = [sorted_pairs[0][1]]
for edge, widx in sorted_pairs[1:]:
if edge - cur_edges[-1] <= tolerance:
cur_edges.append(edge)
cur_widxs.append(widx)
# --- Step 4: Find whitespace gaps ---
# Threshold: areas with very little ink density are gaps
median_density = float(np.median(v_smooth[v_smooth > 0])) if np.any(v_smooth > 0) else 0.01
gap_threshold = max(median_density * 0.15, 0.005)
in_gap = v_smooth < gap_threshold
MIN_GAP_WIDTH = max(8, content_w // 200) # min ~8px or 0.5% of content width
# Collect contiguous gap regions
raw_gaps = [] # (start_x_rel, end_x_rel) relative to content ROI
gap_start = None
for x in range(len(in_gap)):
if in_gap[x]:
if gap_start is None:
gap_start = x
else:
clusters.append(cur_edges)
cluster_widxs.append(cur_widxs)
cur_edges = [edge]
cur_widxs = [widx]
clusters.append(cur_edges)
cluster_widxs.append(cur_widxs)
if gap_start is not None:
gap_width = x - gap_start
if gap_width >= MIN_GAP_WIDTH:
raw_gaps.append((gap_start, x))
gap_start = None
# Handle gap at the right edge
if gap_start is not None:
gap_width = len(in_gap) - gap_start
if gap_width >= MIN_GAP_WIDTH:
raw_gaps.append((gap_start, len(in_gap)))
# --- Enrich clusters with Y-span info and apply verticality filter ---
MIN_Y_COVERAGE_PRIMARY = 0.30 # Primary columns span >= 30% of page height
MIN_Y_COVERAGE_SECONDARY = 0.15 # Secondary columns span >= 15%
MIN_WORDS_SECONDARY = 5 # Secondary columns need >= 5 words
logger.info(f"ColumnGeometry: {len(raw_gaps)} raw gaps found (threshold={gap_threshold:.4f}, "
f"min_width={MIN_GAP_WIDTH}px): "
f"{[(g[0]+left_x, g[1]+left_x, g[1]-g[0]) for g in raw_gaps]}")
cluster_infos = []
for c_edges, c_widxs in zip(clusters, cluster_widxs):
if len(c_edges) < 2:
continue
y_positions = [word_dicts[idx]['top'] for idx in c_widxs]
y_span = max(y_positions) - min(y_positions)
y_coverage = y_span / content_h if content_h > 0 else 0.0
# --- Step 5: Validate gaps against word bounding boxes ---
validated_gaps = []
for gap_start_rel, gap_end_rel in raw_gaps:
# Check if any word overlaps with this gap region
overlapping = False
for wd in word_dicts:
word_left = wd['left']
word_right = wd['left'] + wd['width']
if word_left < gap_end_rel and word_right > gap_start_rel:
overlapping = True
break
cluster_infos.append({
'mean_x': int(np.mean(c_edges)),
'count': len(c_edges),
'min_edge': min(c_edges),
'max_edge': max(c_edges),
'y_min': min(y_positions),
'y_max': max(y_positions),
'y_coverage': y_coverage,
})
_ci_summary = [(ci['mean_x']+left_x, ci['count'], format(ci['y_coverage'], '.0%')) for ci in cluster_infos[:12]]
logger.info(f"ColumnGeometry: {len(cluster_infos)} clusters with >=2 words "
f"(from {len(clusters)} total), y_coverage: {_ci_summary}")
# Primary: good vertical coverage
primary = [c for c in cluster_infos if c['y_coverage'] >= MIN_Y_COVERAGE_PRIMARY]
# Secondary: moderate coverage with enough words
primary_set = set(id(c) for c in primary)
secondary = [c for c in cluster_infos
if id(c) not in primary_set
and c['y_coverage'] >= MIN_Y_COVERAGE_SECONDARY
and c['count'] >= MIN_WORDS_SECONDARY]
significant = sorted(primary + secondary, key=lambda c: c['mean_x'])
_sig_summary = [(s['mean_x']+left_x, s['count'], format(s['y_coverage'], '.0%')) for s in significant[:10]]
logger.info(f"ColumnGeometry: {len(significant)} significant clusters "
f"(primary={len(primary)}, secondary={len(secondary)}): {_sig_summary}")
if len(significant) < 3:
logger.info("ColumnGeometry: < 3 clusters after verticality filter, signaling fallback")
return None
# --- Merge clusters that are very close ---
# 6% of content width: on a typical 5-col vocab page (~1500px wide),
# this is ~90px, which merges sub-alignments within a single column
# while keeping real column boundaries (~300px apart) separate.
merge_distance = max(30, int(content_w * 0.06))
merged = [significant[0].copy()]
for s in significant[1:]:
if s['mean_x'] - merged[-1]['mean_x'] < merge_distance:
prev = merged[-1]
total = prev['count'] + s['count']
avg_x = (prev['mean_x'] * prev['count'] + s['mean_x'] * s['count']) // total
prev['mean_x'] = avg_x
prev['count'] = total
prev['min_edge'] = min(prev['min_edge'], s['min_edge'])
prev['max_edge'] = max(prev['max_edge'], s['max_edge'])
prev['y_min'] = min(prev['y_min'], s['y_min'])
prev['y_max'] = max(prev['y_max'], s['y_max'])
prev['y_coverage'] = (prev['y_max'] - prev['y_min']) / content_h if content_h > 0 else 0.0
if not overlapping:
validated_gaps.append((gap_start_rel, gap_end_rel))
else:
merged.append(s.copy())
# Try to shift the gap to avoid the overlapping word(s)
# Find the tightest word boundaries within the gap region
min_word_left = content_w
max_word_right = 0
for wd in word_dicts:
word_left = wd['left']
word_right = wd['left'] + wd['width']
if word_left < gap_end_rel and word_right > gap_start_rel:
min_word_left = min(min_word_left, word_left)
max_word_right = max(max_word_right, word_right)
# --- Post-merge: absorb tiny clusters (< 5% content width) into neighbors ---
i = 0
absorbed_count = 0
while i < len(merged) and len(merged) > 3:
if i + 1 < len(merged):
cluster_w = merged[i + 1]['mean_x'] - merged[i]['mean_x']
else:
cluster_w = content_w - (merged[i]['mean_x'] - merged[0]['mean_x'])
if cluster_w / content_w < 0.05:
# Absorb into neighbor (prefer left)
if i > 0:
target = merged[i - 1]
# Try gap before the overlapping words
if min_word_left - gap_start_rel >= MIN_GAP_WIDTH:
validated_gaps.append((gap_start_rel, min_word_left))
logger.debug(f"ColumnGeometry: gap shifted left to avoid word at {min_word_left}")
# Try gap after the overlapping words
elif gap_end_rel - max_word_right >= MIN_GAP_WIDTH:
validated_gaps.append((max_word_right, gap_end_rel))
logger.debug(f"ColumnGeometry: gap shifted right to avoid word at {max_word_right}")
else:
target = merged[i + 1]
target['count'] += merged[i]['count']
target['min_edge'] = min(target['min_edge'], merged[i]['min_edge'])
target['max_edge'] = max(target['max_edge'], merged[i]['max_edge'])
target['y_min'] = min(target['y_min'], merged[i]['y_min'])
target['y_max'] = max(target['y_max'], merged[i]['y_max'])
target['y_coverage'] = (target['y_max'] - target['y_min']) / content_h if content_h > 0 else 0.0
del merged[i]
absorbed_count += 1
else:
i += 1
if absorbed_count:
logger.info(f"ColumnGeometry: absorbed {absorbed_count} tiny clusters (< 5% width)")
logger.debug(f"ColumnGeometry: gap [{gap_start_rel}..{gap_end_rel}] "
f"discarded (word overlap, no room to shift)")
_merged_summary = [(m['mean_x']+left_x, m['count'], format(m['y_coverage'], '.0%')) for m in merged]
logger.info(f"ColumnGeometry: {len(merged)} clusters after merging (dist={merge_distance}px): {_merged_summary}")
logger.info(f"ColumnGeometry: {len(validated_gaps)} gaps after word validation: "
f"{[(g[0]+left_x, g[1]+left_x, g[1]-g[0]) for g in validated_gaps]}")
if len(merged) < 3:
logger.info("ColumnGeometry: < 3 merged clusters, signaling fallback")
return None
# --- Step 6: Fallback to clustering if too few gaps ---
if len(validated_gaps) < 2:
logger.info("ColumnGeometry: < 2 gaps found, falling back to clustering")
return _detect_columns_by_clustering(
word_dicts, left_edges, edge_word_indices,
content_w, content_h, left_x, right_x, top_y, bottom_y,
)
# --- Derive column boundaries ---
margin_px = max(6, int(content_w * 0.003)) # ~2mm margin before column start
# --- Step 7: Derive column boundaries from gaps ---
# Sort gaps by position
validated_gaps.sort(key=lambda g: g[0])
# Identify margin gaps (first and last) vs interior gaps
# A margin gap touches the edge of the content area (within 2% tolerance)
edge_tolerance = max(10, int(content_w * 0.02))
is_left_margin = validated_gaps[0][0] <= edge_tolerance
is_right_margin = validated_gaps[-1][1] >= content_w - edge_tolerance
# Interior gaps define column boundaries
# Column starts at the end of a gap, ends at the start of the next gap
col_starts = []
for m in merged:
abs_start = max(0, left_x + m['min_edge'] - margin_px)
col_starts.append((abs_start, m['count']))
# Calculate column widths and assign words to columns
geometries = []
for i, (start_x, count) in enumerate(col_starts):
if is_left_margin:
# First column starts after the left margin gap
first_gap_end = validated_gaps[0][1]
interior_gaps = validated_gaps[1:]
else:
# No left margin gap — first column starts at content left edge
first_gap_end = 0
interior_gaps = validated_gaps[:]
if is_right_margin:
# Last gap is right margin — don't use it as column start
interior_gaps_for_boundaries = interior_gaps[:-1]
right_boundary = validated_gaps[-1][0] # last column ends at right margin gap start
else:
interior_gaps_for_boundaries = interior_gaps
right_boundary = content_w
# First column
col_starts.append(left_x + first_gap_end)
# Columns between interior gaps
for gap_start_rel, gap_end_rel in interior_gaps_for_boundaries:
col_starts.append(left_x + gap_end_rel)
# Count words per column region (for logging)
col_start_counts = []
for i, start_x in enumerate(col_starts):
if i + 1 < len(col_starts):
col_width = col_starts[i + 1][0] - start_x
next_start = col_starts[i + 1]
elif is_right_margin:
next_start = left_x + right_boundary
else:
col_width = right_x - start_x
next_start = right_x
# Assign words to this column based on left edge
col_left_rel = start_x - left_x
col_right_rel = next_start - left_x
n_words_in_col = sum(1 for w in word_dicts
if col_left_rel <= w['left'] < col_right_rel)
col_start_counts.append((start_x, n_words_in_col))
logger.info(f"ColumnGeometry: {len(col_starts)} columns from {len(validated_gaps)} gaps "
f"(left_margin={is_left_margin}, right_margin={is_right_margin}): "
f"{col_start_counts}")
# --- Step 8: Build ColumnGeometry objects ---
# Determine right edge for each column
all_boundaries = []
for i, start_x in enumerate(col_starts):
if i + 1 < len(col_starts):
end_x = col_starts[i + 1]
elif is_right_margin:
end_x = left_x + right_boundary
else:
end_x = right_x
all_boundaries.append((start_x, end_x))
geometries = []
for i, (start_x, end_x) in enumerate(all_boundaries):
col_width = end_x - start_x
col_left_rel = start_x - left_x
col_right_rel = col_left_rel + col_width
col_words = [w for w in word_dicts