feat(ocr-pipeline): uniform grid regularization for row detection (Step 7)

Replace _split_oversized_rows() with _regularize_row_grid(). When ≥60%
of content rows have consistent height (±25% of median), overlay a
uniform grid with the standard row height over the entire content area.
This leverages the fact that books/vocab lists use constant row heights.

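Validates the grid by checking that ≥85% of words have their vertical
center inside a grid row. Falls back to gap-based rows if there are fewer
than 5 content rows, the median height is ≤10 px, heights are too
irregular, or words don't fit.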

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-01 11:50:50 +01:00
parent ba65e47654
commit ec47045c15

View File

@@ -1539,10 +1539,11 @@ def detect_row_geometry(
gap_before=gap_before,
))
# --- Step 7: Split oversized rows ---
# If a content row is >1.5× the median height, re-analyze it with a local
# horizontal projection to find missed row boundaries within.
rows = _split_oversized_rows(rows, inv, left_x, right_x, top_y, word_dicts)
# --- Step 7: Uniform grid regularization ---
# Books and vocab lists use a constant row height. If most detected rows
# agree on a height, overlay a uniform grid to fix oversized rows.
rows = _regularize_row_grid(rows, word_dicts, left_x, right_x, top_y,
content_w, content_h, inv)
type_counts = {}
for r in rows:
@@ -1552,172 +1553,154 @@ def detect_row_geometry(
return rows
def _split_oversized_rows(
def _regularize_row_grid(
rows: List['RowGeometry'],
inv: np.ndarray,
word_dicts: List[Dict],
left_x: int, right_x: int,
top_y: int,
word_dicts: List[Dict],
content_w: int, content_h: int,
inv: np.ndarray,
) -> List['RowGeometry']:
"""Split content rows that are >1.5× the median height.
"""Replace gap-based rows with a uniform grid when row heights are consistent.
Re-analyses oversized rows with a local horizontal projection profile
to find missed row boundaries within. This catches cases where Step 4's
initial gap analysis merged multiple vocabulary lines (e.g. because an
image or dense text obscured the gap).
Books and vocabulary lists use a constant row height throughout the page.
If ≥60% of detected content rows have a height within ±25% of the median,
we overlay a uniform grid with that height over the entire content area.
This naturally fixes oversized rows without special-case splitting.
Returns a new list with oversized rows replaced by sub-rows, re-indexed.
Header/footer rows are preserved as-is.
Falls back to returning the original rows if there are fewer than 5 content
rows, the heights are too irregular, or fewer than 85% of words fit the grid.
"""
content_rows = [r for r in rows if r.row_type == 'content']
if len(content_rows) < 3:
non_content = [r for r in rows if r.row_type != 'content']
if len(content_rows) < 5:
# Not enough rows to establish a reliable pattern
return rows
heights = sorted(r.height for r in content_rows)
median_h = heights[len(heights) // 2]
heights = [r.height for r in content_rows]
heights_sorted = sorted(heights)
median_h = heights_sorted[len(heights_sorted) // 2]
if median_h <= 10:
return rows
threshold = median_h * 1.5
content_w = right_x - left_x
# Check consistency: how many rows are within ±25% of median?
tolerance = 0.25
lo = median_h * (1 - tolerance)
hi = median_h * (1 + tolerance)
consistent = sum(1 for h in heights if lo <= h <= hi)
consistency_ratio = consistent / len(heights)
result: List[RowGeometry] = []
split_total = 0
if consistency_ratio < 0.6:
logger.info(f"RowGrid: inconsistent heights ({consistency_ratio:.0%} within "
f"±{tolerance:.0%} of median {median_h}px), keeping gap-based rows")
return rows
for row in rows:
if row.row_type != 'content' or row.height <= threshold:
result.append(row)
continue
# --- Determine the standard row height more precisely ---
# Use the mean of consistent rows (those within tolerance) for stability
consistent_heights = [h for h in heights if lo <= h <= hi]
std_height = round(sum(consistent_heights) / len(consistent_heights))
# --- Local horizontal projection on this row's strip ---
row_y_abs = row.y
row_h = row.height
strip = inv[row_y_abs:row_y_abs + row_h, left_x:right_x]
# --- Determine content zone (between header/footer) ---
content_start_abs = min(r.y for r in content_rows)
content_end_abs = max(r.y + r.height for r in content_rows)
if strip.size == 0:
result.append(row)
continue
# Snap to nearest grid line from the first detected content row
# Use the first well-sized content row's top as anchor
anchor_y = content_start_abs
for r in content_rows:
if lo <= r.height <= hi:
anchor_y = r.y
break
# Word-coverage mask (same approach as main detection)
pad_y = max(2, row_h // 50)
word_mask = np.zeros_like(strip)
# --- Build uniform grid ---
# Extend grid upward from anchor to cover content_start_abs
grid_start = anchor_y
while grid_start - std_height >= content_start_abs - std_height * 0.3:
if grid_start - std_height < content_start_abs - std_height * 0.5:
break
grid_start -= std_height
# Generate grid lines from grid_start to content_end_abs
grid_rows: List[RowGeometry] = []
y = grid_start
idx = 0
while y < content_end_abs - std_height * 0.3:
row_y = y
row_h = std_height
# Last row: clip to content_end when a full std_height row would overshoot it
if y + std_height >= content_end_abs:
row_h = content_end_abs - y
if row_h < std_height * 0.3:
break # too small, skip
# Assign words whose top is at/below the row top (2px slack) and whose
# vertical center is above the row bottom (2px slack)
row_words = [w for w in word_dicts
if w['top'] + top_y >= row_y_abs - pad_y
and w['top'] + top_y < row_y_abs + row_h + pad_y]
if w['top'] + top_y >= row_y - 2
and w['top'] + w['height'] / 2 + top_y < row_y + row_h + 2]
for wd in row_words:
wy = wd['top'] + top_y - row_y_abs # relative to strip
y1 = max(0, wy - pad_y)
y2 = min(row_h, wy + wd['height'] + pad_y)
x1 = max(0, wd['left'])
x2 = min(content_w, wd['left'] + wd['width'])
word_mask[y1:y2, x1:x2] = 255
grid_rows.append(RowGeometry(
index=idx,
x=left_x,
y=round(row_y),
width=content_w,
height=round(row_h),
word_count=len(row_words),
words=row_words,
row_type='content',
gap_before=0,
))
masked = cv2.bitwise_and(strip, word_mask)
h_proj = np.sum(masked, axis=1).astype(float)
h_proj_norm = h_proj / (content_w * 255) if content_w > 0 else h_proj
idx += 1
y += std_height
# Smooth
k = max(3, row_h // 40)
if k % 2 == 0:
k += 1
h_smooth = np.convolve(h_proj_norm, np.ones(k) / k, mode='same')
if not grid_rows:
return rows
# Gap detection within the row
med_density = float(np.median(h_smooth[h_smooth > 0])) if np.any(h_smooth > 0) else 0.01
gap_thresh = max(med_density * 0.15, 0.003)
in_gap = h_smooth < gap_thresh
# --- Validate: check that words fit the grid well ---
# Count words whose vertical center lands inside some grid row
all_content_words = []
for r in content_rows:
all_content_words.extend(r.words)
# Deduplicate by position
seen = set()
unique_words = []
for w in all_content_words:
key = (w['left'], w['top'], w['width'], w['height'])
if key not in seen:
seen.add(key)
unique_words.append(w)
min_gap_h = max(2, row_h // 30) # smaller threshold for sub-gaps
local_gaps = []
gap_start = None
for y in range(len(in_gap)):
if in_gap[y]:
if gap_start is None:
gap_start = y
else:
if gap_start is not None:
if y - gap_start >= min_gap_h:
local_gaps.append((gap_start, y))
gap_start = None
if gap_start is not None and len(in_gap) - gap_start >= min_gap_h:
local_gaps.append((gap_start, len(in_gap)))
if not local_gaps:
# No sub-gaps found — keep original row
result.append(row)
continue
# Validate gaps against words (don't split through a word)
valid_gaps = []
for gs, ge in local_gaps:
overlapping = False
for wd in row_words:
wy = wd['top'] + top_y - row_y_abs
wy2 = wy + wd['height']
if wy < ge and wy2 > gs:
overlapping = True
if unique_words:
matched = 0
for w in unique_words:
w_center_y = w['top'] + top_y + w['height'] / 2
for gr in grid_rows:
if gr.y <= w_center_y < gr.y + gr.height:
matched += 1
break
if not overlapping:
valid_gaps.append((gs, ge))
match_ratio = matched / len(unique_words)
if not valid_gaps:
result.append(row)
continue
if match_ratio < 0.85:
logger.info(f"RowGrid: grid only matches {match_ratio:.0%} of words, "
f"keeping gap-based rows")
return rows
valid_gaps.sort()
# --- Merge header/footer rows back ---
result = list(non_content) + grid_rows
result.sort(key=lambda r: r.y)
for i, r in enumerate(result):
r.index = i
# Build sub-row boundaries from gaps
sub_bounds = [] # (start_rel, end_rel) within the row strip
# Before first gap
if valid_gaps[0][0] > 0:
sub_bounds.append((0, valid_gaps[0][0]))
# Between gaps
for i in range(len(valid_gaps) - 1):
sub_bounds.append((valid_gaps[i][1], valid_gaps[i + 1][0]))
# After last gap
if valid_gaps[-1][1] < row_h:
sub_bounds.append((valid_gaps[-1][1], row_h))
# Filter out empty sub-rows
sub_bounds = [(s, e) for s, e in sub_bounds if e - s > 2]
if len(sub_bounds) <= 1:
result.append(row)
continue
# Create sub-rows
for sb_start, sb_end in sub_bounds:
sub_y_abs = row_y_abs + sb_start
sub_h = sb_end - sb_start
# Assign words to this sub-row
sub_words = [w for w in row_words
if w['top'] + top_y >= sub_y_abs - 2
and w['top'] + top_y + w['height'] <= sub_y_abs + sub_h + 2]
result.append(RowGeometry(
index=0, # re-indexed below
x=row.x,
y=sub_y_abs,
width=row.width,
height=sub_h,
word_count=len(sub_words),
words=sub_words,
row_type='content',
gap_before=0,
))
split_total += len(sub_bounds) - 1
logger.info(f"RowGeometry: split oversized row (h={row_h}) "
f"into {len(sub_bounds)} sub-rows "
f"(median_h={median_h}, {len(valid_gaps)} gaps)")
if split_total > 0:
# Re-index all rows
result.sort(key=lambda r: r.y)
for i, r in enumerate(result):
r.index = i
logger.info(f"RowGeometry: {split_total} oversized splits → "
f"{len(result)} total rows (was {len(rows)})")
n_oversized = sum(1 for r in content_rows if r.height > std_height * 1.5)
logger.info(f"RowGrid: uniform grid applied (std_height={std_height}px, "
f"{len(grid_rows)} grid rows, was {len(content_rows)} content rows, "
f"{n_oversized} were oversized, "
f"consistency={consistency_ratio:.0%})")
return result