feat(ocr-pipeline): add row detection step with horizontal gap analysis

Add Step 4 (row detection) between column detection and word recognition.
Uses horizontal projection profiles + whitespace gaps (same method as columns).
Includes header/footer classification via gap-size heuristics.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-02-28 01:14:31 +01:00
parent c7ae44ff17
commit 04b83d5f46
4 changed files with 550 additions and 25 deletions

View File

@@ -90,6 +90,20 @@ class ColumnGeometry:
width_ratio: float # width / content_width (0.0-1.0)
@dataclass
class RowGeometry:
"""Geometrisch erkannte Zeile mit Kopf-/Fusszeilen-Klassifikation."""
index: int # 0-basiert, oben→unten
x: int # absolute left (= content left_x)
y: int # absolute y start
width: int # content width
height: int # Zeilenhoehe in px
word_count: int
words: List[Dict]
row_type: str = 'content' # 'content' | 'header' | 'footer'
gap_before: int = 0 # Gap in px ueber dieser Zeile
@dataclass
class VocabRow:
"""A single vocabulary entry assembled from multi-column OCR."""
@@ -885,7 +899,8 @@ def _detect_columns_by_clustering(
right_x: int,
top_y: int,
bottom_y: int,
) -> Optional[Tuple[List[ColumnGeometry], int, int, int, int]]:
inv: Optional[np.ndarray] = None,
) -> Optional[Tuple[List[ColumnGeometry], int, int, int, int, List[Dict], Optional[np.ndarray]]]:
"""Fallback: detect columns by clustering left-aligned word positions.
Used when the primary gap-based algorithm finds fewer than 2 gaps.
@@ -965,7 +980,7 @@ def _detect_columns_by_clustering(
margin_px = max(6, int(content_w * 0.003))
return _build_geometries_from_starts(
[(max(0, left_x + m['min_edge'] - margin_px), m['count']) for m in merged],
word_dicts, left_x, right_x, top_y, bottom_y, content_w, content_h,
word_dicts, left_x, right_x, top_y, bottom_y, content_w, content_h, inv,
)
@@ -978,7 +993,8 @@ def _build_geometries_from_starts(
bottom_y: int,
content_w: int,
content_h: int,
) -> Tuple[List[ColumnGeometry], int, int, int, int]:
inv: Optional[np.ndarray] = None,
) -> Tuple[List[ColumnGeometry], int, int, int, int, List[Dict], Optional[np.ndarray]]:
"""Build ColumnGeometry objects from a list of (abs_start_x, word_count) pairs."""
geometries = []
for i, (start_x, count) in enumerate(col_starts):
@@ -1005,10 +1021,10 @@ def _build_geometries_from_starts(
logger.info(f"ColumnGeometry: {len(geometries)} columns: "
f"{[(g.index, g.x, g.width, g.word_count) for g in geometries]}")
return (geometries, left_x, right_x, top_y, bottom_y)
return (geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv)
def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Optional[Tuple[List[ColumnGeometry], int, int, int, int]]:
def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Optional[Tuple[List[ColumnGeometry], int, int, int, int, List[Dict], np.ndarray]]:
"""Detect column geometry using whitespace-gap analysis with word validation.
Phase A of the two-phase column detection. Uses vertical projection
@@ -1022,8 +1038,8 @@ def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Opt
dewarped_bgr: Original BGR image (for Tesseract word detection).
Returns:
Tuple of (geometries, left_x, right_x, top_y, bottom_y) or None if
detection fails entirely.
Tuple of (geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv)
or None if detection fails entirely.
"""
h, w = ocr_img.shape[:2]
@@ -1165,7 +1181,7 @@ def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Opt
logger.info("ColumnGeometry: < 2 gaps found, falling back to clustering")
return _detect_columns_by_clustering(
word_dicts, left_edges, edge_word_indices,
content_w, content_h, left_x, right_x, top_y, bottom_y,
content_w, content_h, left_x, right_x, top_y, bottom_y, inv,
)
# --- Step 7: Derive column boundaries from gaps ---
@@ -1261,7 +1277,270 @@ def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Opt
logger.info(f"ColumnGeometry: {len(geometries)} columns: "
f"{[(g.index, g.x, g.width, g.word_count) for g in geometries]}")
return (geometries, left_x, right_x, top_y, bottom_y)
return (geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv)
# =============================================================================
# Row Geometry Detection (horizontal whitespace-gap analysis)
# =============================================================================
def detect_row_geometry(
inv: np.ndarray,
word_dicts: List[Dict],
left_x: int, right_x: int,
top_y: int, bottom_y: int,
) -> List['RowGeometry']:
"""Detect row geometry using horizontal whitespace-gap analysis.
Mirrors the vertical gap approach used for columns, but operates on
horizontal projection profiles to find gaps between text lines.
Also classifies header/footer rows based on gap size.
Args:
inv: Inverted binarized image (white text on black bg, full page).
word_dicts: Word bounding boxes from Tesseract (relative to content ROI).
left_x, right_x: Absolute X bounds of the content area.
top_y, bottom_y: Absolute Y bounds of the content area.
Returns:
List of RowGeometry objects sorted top to bottom.
"""
content_w = right_x - left_x
content_h = bottom_y - top_y
if content_h < 10 or content_w < 10:
logger.warning("detect_row_geometry: content area too small")
return []
# --- Step 1: Horizontal projection profile ---
content_strip = inv[top_y:bottom_y, left_x:right_x]
h_proj = np.sum(content_strip, axis=1).astype(float)
h_proj_norm = h_proj / (content_w * 255) if content_w > 0 else h_proj
# --- Step 2: Smoothing + threshold ---
kernel_size = max(3, content_h // 200)
if kernel_size % 2 == 0:
kernel_size += 1
h_smooth = np.convolve(h_proj_norm, np.ones(kernel_size) / kernel_size, mode='same')
median_density = float(np.median(h_smooth[h_smooth > 0])) if np.any(h_smooth > 0) else 0.01
gap_threshold = max(median_density * 0.15, 0.003)
in_gap = h_smooth < gap_threshold
MIN_GAP_HEIGHT = max(3, content_h // 500)
# --- Step 3: Collect contiguous gap regions ---
raw_gaps = [] # (start_y_rel, end_y_rel) relative to content ROI
gap_start = None
for y in range(len(in_gap)):
if in_gap[y]:
if gap_start is None:
gap_start = y
else:
if gap_start is not None:
gap_height = y - gap_start
if gap_height >= MIN_GAP_HEIGHT:
raw_gaps.append((gap_start, y))
gap_start = None
if gap_start is not None:
gap_height = len(in_gap) - gap_start
if gap_height >= MIN_GAP_HEIGHT:
raw_gaps.append((gap_start, len(in_gap)))
logger.info(f"RowGeometry: {len(raw_gaps)} raw gaps found (threshold={gap_threshold:.4f}, "
f"min_height={MIN_GAP_HEIGHT}px)")
# --- Step 4: Validate gaps against word bounding boxes ---
validated_gaps = []
for gap_start_rel, gap_end_rel in raw_gaps:
overlapping = False
for wd in word_dicts:
word_top = wd['top']
word_bottom = wd['top'] + wd['height']
if word_top < gap_end_rel and word_bottom > gap_start_rel:
overlapping = True
break
if not overlapping:
validated_gaps.append((gap_start_rel, gap_end_rel))
else:
# Try to shift the gap to avoid overlapping words
min_word_top = content_h
max_word_bottom = 0
for wd in word_dicts:
word_top = wd['top']
word_bottom = wd['top'] + wd['height']
if word_top < gap_end_rel and word_bottom > gap_start_rel:
min_word_top = min(min_word_top, word_top)
max_word_bottom = max(max_word_bottom, word_bottom)
if min_word_top - gap_start_rel >= MIN_GAP_HEIGHT:
validated_gaps.append((gap_start_rel, min_word_top))
elif gap_end_rel - max_word_bottom >= MIN_GAP_HEIGHT:
validated_gaps.append((max_word_bottom, gap_end_rel))
else:
logger.debug(f"RowGeometry: gap [{gap_start_rel}..{gap_end_rel}] "
f"discarded (word overlap, no room to shift)")
logger.info(f"RowGeometry: {len(validated_gaps)} gaps after word validation")
# --- Fallback if too few gaps ---
if len(validated_gaps) < 2:
logger.info("RowGeometry: < 2 gaps found, falling back to word grouping")
return _build_rows_from_word_grouping(
word_dicts, left_x, right_x, top_y, bottom_y, content_w, content_h,
)
validated_gaps.sort(key=lambda g: g[0])
# --- Step 5: Header/footer detection via gap size ---
HEADER_FOOTER_ZONE = 0.15
GAP_MULTIPLIER = 2.0
gap_sizes = [g[1] - g[0] for g in validated_gaps]
median_gap = float(np.median(gap_sizes)) if gap_sizes else 0
large_gap_threshold = median_gap * GAP_MULTIPLIER
header_boundary_rel = None # y below which is header
footer_boundary_rel = None # y above which is footer
header_zone_limit = int(content_h * HEADER_FOOTER_ZONE)
footer_zone_start = int(content_h * (1.0 - HEADER_FOOTER_ZONE))
# Find largest gap in header zone
best_header_gap = None
for gs, ge in validated_gaps:
gap_mid = (gs + ge) / 2
gap_size = ge - gs
if gap_mid < header_zone_limit and gap_size > large_gap_threshold:
if best_header_gap is None or gap_size > (best_header_gap[1] - best_header_gap[0]):
best_header_gap = (gs, ge)
if best_header_gap is not None:
header_boundary_rel = best_header_gap[1]
logger.info(f"RowGeometry: header boundary at y_rel={header_boundary_rel} "
f"(gap={best_header_gap[1] - best_header_gap[0]}px, "
f"median_gap={median_gap:.0f}px)")
# Find largest gap in footer zone
best_footer_gap = None
for gs, ge in validated_gaps:
gap_mid = (gs + ge) / 2
gap_size = ge - gs
if gap_mid > footer_zone_start and gap_size > large_gap_threshold:
if best_footer_gap is None or gap_size > (best_footer_gap[1] - best_footer_gap[0]):
best_footer_gap = (gs, ge)
if best_footer_gap is not None:
footer_boundary_rel = best_footer_gap[0]
logger.info(f"RowGeometry: footer boundary at y_rel={footer_boundary_rel} "
f"(gap={best_footer_gap[1] - best_footer_gap[0]}px)")
# --- Step 6: Build RowGeometry objects from gaps ---
# Rows are the spans between gaps
row_boundaries = [] # (start_y_rel, end_y_rel)
# Top of content to first gap
if validated_gaps[0][0] > MIN_GAP_HEIGHT:
row_boundaries.append((0, validated_gaps[0][0]))
# Between gaps
for i in range(len(validated_gaps) - 1):
row_start = validated_gaps[i][1]
row_end = validated_gaps[i + 1][0]
if row_end - row_start > 0:
row_boundaries.append((row_start, row_end))
# Last gap to bottom of content
if validated_gaps[-1][1] < content_h - MIN_GAP_HEIGHT:
row_boundaries.append((validated_gaps[-1][1], content_h))
rows = []
for idx, (row_start_rel, row_end_rel) in enumerate(row_boundaries):
# Determine row type
row_mid = (row_start_rel + row_end_rel) / 2
if header_boundary_rel is not None and row_mid < header_boundary_rel:
row_type = 'header'
elif footer_boundary_rel is not None and row_mid > footer_boundary_rel:
row_type = 'footer'
else:
row_type = 'content'
# Collect words in this row
row_words = [w for w in word_dicts
if w['top'] + w['height'] / 2 >= row_start_rel
and w['top'] + w['height'] / 2 < row_end_rel]
# Gap before this row
gap_before = 0
if idx == 0 and validated_gaps[0][0] > 0:
gap_before = validated_gaps[0][0]
elif idx > 0:
# Find the gap just before this row boundary
for gs, ge in validated_gaps:
if ge == row_start_rel:
gap_before = ge - gs
break
rows.append(RowGeometry(
index=idx,
x=left_x,
y=top_y + row_start_rel,
width=content_w,
height=row_end_rel - row_start_rel,
word_count=len(row_words),
words=row_words,
row_type=row_type,
gap_before=gap_before,
))
type_counts = {}
for r in rows:
type_counts[r.row_type] = type_counts.get(r.row_type, 0) + 1
logger.info(f"RowGeometry: {len(rows)} rows detected: {type_counts}")
return rows
def _build_rows_from_word_grouping(
word_dicts: List[Dict],
left_x: int, right_x: int,
top_y: int, bottom_y: int,
content_w: int, content_h: int,
) -> List['RowGeometry']:
"""Fallback: build rows by grouping words by Y position.
Uses _group_words_into_lines() with a generous tolerance.
No header/footer detection in fallback mode.
"""
if not word_dicts:
return []
y_tolerance = max(20, content_h // 100)
lines = _group_words_into_lines(word_dicts, y_tolerance_px=y_tolerance)
rows = []
for idx, line_words in enumerate(lines):
if not line_words:
continue
min_top = min(w['top'] for w in line_words)
max_bottom = max(w['top'] + w['height'] for w in line_words)
row_height = max_bottom - min_top
rows.append(RowGeometry(
index=idx,
x=left_x,
y=top_y + min_top,
width=content_w,
height=row_height,
word_count=len(line_words),
words=line_words,
row_type='content',
gap_before=0,
))
logger.info(f"RowGeometry (fallback): {len(rows)} rows from word grouping")
return rows
# --- Phase B: Content-Based Classification ---
@@ -1861,7 +2140,7 @@ def analyze_layout_by_words(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Li
layout_img = create_layout_image(dewarped_bgr)
return analyze_layout(layout_img, ocr_img)
geometries, left_x, right_x, top_y, bottom_y = result
geometries, left_x, right_x, top_y, bottom_y, _word_dicts, _inv = result
content_w = right_x - left_x
# Phase B: Content-based classification