feat(ocr-pipeline): word-center grid with section-break detection
Replace rigid uniform grid with bottom-up approach that derives row boundaries from word vertical centers:

- Group words into line clusters, compute center_y per cluster
- Compute pitch (distance between consecutive centers)
- Detect section breaks where gap > 1.8× median pitch
- Place row boundaries at midpoints between consecutive centers
- Per-section local pitch adapts to heading/paragraph spacing
- Validate ≥85% word placement, fallback to gap-based rows

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1539,9 +1539,9 @@ def detect_row_geometry(
|
||||
gap_before=gap_before,
|
||||
))
|
||||
|
||||
# --- Step 7: Uniform grid regularization ---
|
||||
# Books and vocab lists use a constant row height. If most detected rows
|
||||
# agree on a height, overlay a uniform grid to fix oversized rows.
|
||||
# --- Step 7: Word-center grid regularization ---
|
||||
# Derive precise row boundaries from word vertical centers. Detects
|
||||
# section breaks (headings, paragraphs) and builds per-section grids.
|
||||
rows = _regularize_row_grid(rows, word_dicts, left_x, right_x, top_y,
|
||||
content_w, content_h, inv)
|
||||
|
||||
@@ -1561,146 +1561,222 @@ def _regularize_row_grid(
|
||||
content_w: int, content_h: int,
|
||||
inv: np.ndarray,
|
||||
) -> List['RowGeometry']:
|
||||
"""Replace gap-based rows with a uniform grid when row heights are consistent.
|
||||
"""Rebuild row boundaries from word center-lines with section-break awareness.
|
||||
|
||||
Books and vocabulary lists use a constant row height throughout the page.
|
||||
If ≥60% of detected content rows have a height within ±25% of the median,
|
||||
we overlay a uniform grid with that height over the entire content area.
|
||||
This naturally fixes oversized rows without special-case splitting.
|
||||
Instead of overlaying a rigid grid, this derives row positions bottom-up
|
||||
from the words themselves:
|
||||
|
||||
Header/footer rows are preserved as-is.
|
||||
1. Group words into line clusters (by Y proximity).
|
||||
2. For each cluster compute center_y (median of word vertical centers)
|
||||
and letter_height (median of word heights).
|
||||
3. Compute the pitch (distance between consecutive centers).
|
||||
4. Detect section breaks where the gap is >1.8× the median pitch
|
||||
(headings, sub-headings, paragraph breaks).
|
||||
5. Within each section, use the local pitch to place row boundaries
|
||||
at the midpoints between consecutive centers.
|
||||
6. Validate that ≥85% of words land in a grid row; otherwise fall back.
|
||||
|
||||
Falls back to returning the original rows if the heights are too irregular.
|
||||
Header/footer rows from the gap-based detection are preserved.
|
||||
"""
|
||||
content_rows = [r for r in rows if r.row_type == 'content']
|
||||
non_content = [r for r in rows if r.row_type != 'content']
|
||||
|
||||
if len(content_rows) < 5:
|
||||
# Not enough rows to establish a reliable pattern
|
||||
return rows
|
||||
|
||||
heights = [r.height for r in content_rows]
|
||||
heights_sorted = sorted(heights)
|
||||
median_h = heights_sorted[len(heights_sorted) // 2]
|
||||
|
||||
if median_h <= 10:
|
||||
return rows
|
||||
|
||||
# Check consistency: how many rows are within ±25% of median?
|
||||
tolerance = 0.25
|
||||
lo = median_h * (1 - tolerance)
|
||||
hi = median_h * (1 + tolerance)
|
||||
consistent = sum(1 for h in heights if lo <= h <= hi)
|
||||
consistency_ratio = consistent / len(heights)
|
||||
|
||||
if consistency_ratio < 0.6:
|
||||
logger.info(f"RowGrid: inconsistent heights ({consistency_ratio:.0%} within "
|
||||
f"±{tolerance:.0%} of median {median_h}px), keeping gap-based rows")
|
||||
return rows
|
||||
|
||||
# --- Determine the standard row height more precisely ---
|
||||
# Use the mean of consistent rows (those within tolerance) for stability
|
||||
consistent_heights = [h for h in heights if lo <= h <= hi]
|
||||
std_height = round(sum(consistent_heights) / len(consistent_heights))
|
||||
|
||||
# --- Determine content zone (between header/footer) ---
|
||||
content_start_abs = min(r.y for r in content_rows)
|
||||
content_end_abs = max(r.y + r.height for r in content_rows)
|
||||
|
||||
# Snap to nearest grid line from the first detected content row
|
||||
# Use the first well-sized content row's top as anchor
|
||||
anchor_y = content_start_abs
|
||||
# --- Step A: Group ALL words into line clusters ---
|
||||
# Collect words that belong to content rows
|
||||
content_words: List[Dict] = []
|
||||
seen_keys: set = set()
|
||||
for r in content_rows:
|
||||
if lo <= r.height <= hi:
|
||||
anchor_y = r.y
|
||||
break
|
||||
for w in r.words:
|
||||
key = (w['left'], w['top'], w['width'], w['height'])
|
||||
if key not in seen_keys:
|
||||
seen_keys.add(key)
|
||||
content_words.append(w)
|
||||
|
||||
# --- Build uniform grid ---
|
||||
# Extend grid upward from anchor to cover content_start_abs
|
||||
grid_start = anchor_y
|
||||
while grid_start - std_height >= content_start_abs - std_height * 0.3:
|
||||
if grid_start - std_height < content_start_abs - std_height * 0.5:
|
||||
break
|
||||
grid_start -= std_height
|
||||
if len(content_words) < 5:
|
||||
return rows
|
||||
|
||||
# Generate grid lines from grid_start to content_end_abs
|
||||
# Use half the median word height as grouping tolerance
|
||||
word_heights = [w['height'] for w in content_words]
|
||||
median_wh = sorted(word_heights)[len(word_heights) // 2]
|
||||
y_tol = max(8, int(median_wh * 0.5))
|
||||
|
||||
line_clusters = _group_words_into_lines(content_words, y_tolerance_px=y_tol)
|
||||
|
||||
if len(line_clusters) < 3:
|
||||
return rows
|
||||
|
||||
# --- Step B: Compute center_y per cluster ---
|
||||
# center_y = median of (word_top + word_height/2) across all words in cluster
|
||||
# letter_h = median word height in cluster
|
||||
# All coordinates are relative to content ROI (same as word_dicts)
|
||||
cluster_info: List[Dict] = []
|
||||
for cl_words in line_clusters:
|
||||
centers = [w['top'] + w['height'] / 2 for w in cl_words]
|
||||
heights = [w['height'] for w in cl_words]
|
||||
center_y = float(np.median(centers))
|
||||
letter_h = float(np.median(heights))
|
||||
cluster_info.append({
|
||||
'center_y_rel': center_y, # relative to content ROI
|
||||
'center_y_abs': center_y + top_y, # absolute
|
||||
'letter_h': letter_h,
|
||||
'words': cl_words,
|
||||
})
|
||||
|
||||
cluster_info.sort(key=lambda c: c['center_y_rel'])
|
||||
|
||||
# --- Step C: Compute pitches and detect section breaks ---
|
||||
pitches: List[float] = []
|
||||
for i in range(1, len(cluster_info)):
|
||||
pitch = cluster_info[i]['center_y_rel'] - cluster_info[i - 1]['center_y_rel']
|
||||
pitches.append(pitch)
|
||||
|
||||
if not pitches:
|
||||
return rows
|
||||
|
||||
median_pitch = float(np.median(pitches))
|
||||
if median_pitch <= 5:
|
||||
return rows
|
||||
|
||||
# A section break is where the gap between line centers is much larger
|
||||
# than the normal pitch (sub-headings, section titles, etc.)
|
||||
BREAK_FACTOR = 1.8
|
||||
|
||||
# --- Step D: Build sections (groups of consecutive lines with normal spacing) ---
|
||||
sections: List[List[Dict]] = []
|
||||
current_section: List[Dict] = [cluster_info[0]]
|
||||
|
||||
for i in range(1, len(cluster_info)):
|
||||
gap = cluster_info[i]['center_y_rel'] - cluster_info[i - 1]['center_y_rel']
|
||||
if gap > median_pitch * BREAK_FACTOR:
|
||||
sections.append(current_section)
|
||||
current_section = [cluster_info[i]]
|
||||
else:
|
||||
current_section.append(cluster_info[i])
|
||||
|
||||
if current_section:
|
||||
sections.append(current_section)
|
||||
|
||||
# --- Step E: Build row boundaries per section ---
|
||||
grid_rows: List[RowGeometry] = []
|
||||
y = grid_start
|
||||
idx = 0
|
||||
|
||||
while y < content_end_abs - std_height * 0.3:
|
||||
row_y = y
|
||||
row_h = std_height
|
||||
for section in sections:
|
||||
if not section:
|
||||
continue
|
||||
|
||||
# Last row: extend to content_end if remainder > 30% of std_height
|
||||
if y + std_height >= content_end_abs:
|
||||
row_h = content_end_abs - y
|
||||
if row_h < std_height * 0.3:
|
||||
break # too small, skip
|
||||
if len(section) == 1:
|
||||
# Single-line section (likely a heading)
|
||||
cl = section[0]
|
||||
half_h = max(cl['letter_h'], median_pitch * 0.4)
|
||||
row_top = cl['center_y_abs'] - half_h
|
||||
row_bot = cl['center_y_abs'] + half_h
|
||||
grid_rows.append(RowGeometry(
|
||||
index=0,
|
||||
x=left_x,
|
||||
y=round(row_top),
|
||||
width=content_w,
|
||||
height=round(row_bot - row_top),
|
||||
word_count=len(cl['words']),
|
||||
words=cl['words'],
|
||||
row_type='content',
|
||||
gap_before=0,
|
||||
))
|
||||
continue
|
||||
|
||||
# Assign words whose vertical center falls in this grid row
|
||||
row_words = [w for w in word_dicts
|
||||
if w['top'] + top_y >= row_y - 2
|
||||
and w['top'] + w['height'] / 2 + top_y < row_y + row_h + 2]
|
||||
# Compute local pitch for this section
|
||||
local_pitches = []
|
||||
for i in range(1, len(section)):
|
||||
local_pitches.append(
|
||||
section[i]['center_y_rel'] - section[i - 1]['center_y_rel']
|
||||
)
|
||||
local_pitch = float(np.median(local_pitches)) if local_pitches else median_pitch
|
||||
|
||||
grid_rows.append(RowGeometry(
|
||||
index=idx,
|
||||
x=left_x,
|
||||
y=round(row_y),
|
||||
width=content_w,
|
||||
height=round(row_h),
|
||||
word_count=len(row_words),
|
||||
words=row_words,
|
||||
row_type='content',
|
||||
gap_before=0,
|
||||
))
|
||||
# Row boundaries are placed at midpoints between consecutive centers.
|
||||
# First row: top = center - local_pitch/2
|
||||
# Last row: bottom = center + local_pitch/2
|
||||
for i, cl in enumerate(section):
|
||||
if i == 0:
|
||||
row_top = cl['center_y_abs'] - local_pitch / 2
|
||||
else:
|
||||
# Midpoint between this center and previous center
|
||||
prev_center = section[i - 1]['center_y_abs']
|
||||
row_top = (prev_center + cl['center_y_abs']) / 2
|
||||
|
||||
idx += 1
|
||||
y += std_height
|
||||
if i == len(section) - 1:
|
||||
row_bot = cl['center_y_abs'] + local_pitch / 2
|
||||
else:
|
||||
next_center = section[i + 1]['center_y_abs']
|
||||
row_bot = (cl['center_y_abs'] + next_center) / 2
|
||||
|
||||
# Clamp to reasonable bounds
|
||||
row_top = max(top_y, row_top)
|
||||
row_bot = min(top_y + content_h, row_bot)
|
||||
|
||||
if row_bot - row_top < 5:
|
||||
continue
|
||||
|
||||
grid_rows.append(RowGeometry(
|
||||
index=0,
|
||||
x=left_x,
|
||||
y=round(row_top),
|
||||
width=content_w,
|
||||
height=round(row_bot - row_top),
|
||||
word_count=len(cl['words']),
|
||||
words=cl['words'],
|
||||
row_type='content',
|
||||
gap_before=0,
|
||||
))
|
||||
|
||||
if not grid_rows:
|
||||
return rows
|
||||
|
||||
# --- Validate: check that words fit the grid well ---
|
||||
# Count words that land in exactly one grid row
|
||||
all_content_words = []
|
||||
for r in content_rows:
|
||||
all_content_words.extend(r.words)
|
||||
# Deduplicate by position
|
||||
seen = set()
|
||||
unique_words = []
|
||||
for w in all_content_words:
|
||||
key = (w['left'], w['top'], w['width'], w['height'])
|
||||
if key not in seen:
|
||||
seen.add(key)
|
||||
unique_words.append(w)
|
||||
# --- Step F: Re-assign words to grid rows ---
|
||||
# Words may have shifted slightly; assign each word to the row whose
|
||||
# center is closest to the word's vertical center.
|
||||
for gr in grid_rows:
|
||||
gr.words = []
|
||||
|
||||
if unique_words:
|
||||
matched = 0
|
||||
for w in unique_words:
|
||||
w_center_y = w['top'] + top_y + w['height'] / 2
|
||||
for gr in grid_rows:
|
||||
if gr.y <= w_center_y < gr.y + gr.height:
|
||||
matched += 1
|
||||
break
|
||||
match_ratio = matched / len(unique_words)
|
||||
for w in content_words:
|
||||
w_center = w['top'] + top_y + w['height'] / 2
|
||||
best_row = None
|
||||
best_dist = float('inf')
|
||||
for gr in grid_rows:
|
||||
row_center = gr.y + gr.height / 2
|
||||
dist = abs(w_center - row_center)
|
||||
if dist < best_dist:
|
||||
best_dist = dist
|
||||
best_row = gr
|
||||
if best_row is not None and best_dist < median_pitch:
|
||||
best_row.words.append(w)
|
||||
|
||||
for gr in grid_rows:
|
||||
gr.word_count = len(gr.words)
|
||||
|
||||
# --- Step G: Validate ---
|
||||
words_placed = sum(gr.word_count for gr in grid_rows)
|
||||
if len(content_words) > 0:
|
||||
match_ratio = words_placed / len(content_words)
|
||||
if match_ratio < 0.85:
|
||||
logger.info(f"RowGrid: grid only matches {match_ratio:.0%} of words, "
|
||||
f"keeping gap-based rows")
|
||||
logger.info(f"RowGrid: word-center grid only matches {match_ratio:.0%} "
|
||||
f"of words, keeping gap-based rows")
|
||||
return rows
|
||||
|
||||
# --- Merge header/footer rows back ---
|
||||
# Remove empty grid rows (no words assigned)
|
||||
grid_rows = [gr for gr in grid_rows if gr.word_count > 0]
|
||||
|
||||
# --- Step H: Merge header/footer + re-index ---
|
||||
result = list(non_content) + grid_rows
|
||||
result.sort(key=lambda r: r.y)
|
||||
for i, r in enumerate(result):
|
||||
r.index = i
|
||||
|
||||
n_oversized = sum(1 for r in content_rows if r.height > std_height * 1.5)
|
||||
logger.info(f"RowGrid: uniform grid applied (std_height={std_height}px, "
|
||||
f"{len(grid_rows)} grid rows, was {len(content_rows)} content rows, "
|
||||
f"{n_oversized} were oversized, "
|
||||
f"consistency={consistency_ratio:.0%})")
|
||||
logger.info(f"RowGrid: word-center grid applied "
|
||||
f"(median_pitch={median_pitch:.0f}px, "
|
||||
f"{len(sections)} sections, "
|
||||
f"{len(grid_rows)} grid rows, "
|
||||
f"was {len(content_rows)} gap-based rows)")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
Reference in New Issue
Block a user