Files
Benjamin Admin 65f4ce1947
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 32s
CI / test-go-edu-search (push) Successful in 25s
CI / test-python-klausur (push) Failing after 1m52s
CI / test-python-agent-core (push) Successful in 15s
CI / test-nodejs-website (push) Successful in 18s
feat: ImageLayoutEditor, arrow-key nav, multi-select bold, wider columns
- New ImageLayoutEditor: SVG overlay on original scan with draggable
  column dividers, horizontal guidelines (margins/header/footer),
  double-click to add columns, x-button to delete
- GridTable: MIN_COL_WIDTH 40→80px for better readability
- Arrow up/down keys navigate between rows in the grid editor
- Ctrl+Click for multi-cell selection, Ctrl+B to toggle bold on selection
- getAdjacentCell works for cells that don't exist yet (new rows/cols)
- deleteColumn now merges x-boundaries correctly
- Session restore fix: grid_editor_result/structure_result in session GET
- Footer row 3-state cycle, auto-create cells for empty footer rows
- Grid save/build/GT-mark now advance current_step=11

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-24 07:45:39 +01:00

3654 lines
142 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Document type detection, layout analysis, column/row geometry, and classification.
Lizenz: Apache 2.0 (kommerziell nutzbar)
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
"""
import logging
import re
import statistics
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
from cv_vocab_types import (
ColumnGeometry,
DetectedBox,
DocumentTypeResult,
ENGLISH_FUNCTION_WORDS,
GERMAN_FUNCTION_WORDS,
PageRegion,
PageZone,
RowGeometry,
)
from cv_ocr_engines import _group_words_into_lines # noqa: E402
logger = logging.getLogger(__name__)
try:
import cv2
except ImportError:
cv2 = None # type: ignore[assignment]
try:
import pytesseract
from PIL import Image
except ImportError:
pytesseract = None # type: ignore[assignment]
Image = None # type: ignore[assignment,misc]
def detect_document_type(ocr_img: np.ndarray, img_bgr: np.ndarray) -> DocumentTypeResult:
    """Detect whether the page is a vocab table, generic table, or full text.

    Uses projection profiles and text density analysis — no OCR required.
    Runs in < 2 seconds.

    Args:
        ocr_img: Binarized grayscale image (for projection profiles).
        img_bgr: BGR color image.

    Returns:
        DocumentTypeResult with doc_type, confidence, pipeline, skip_steps.
    """
    # Guard: an empty/missing image cannot be analyzed — fall back to the
    # full-text pipeline with low confidence.
    if ocr_img is None or ocr_img.size == 0:
        return DocumentTypeResult(
            doc_type='full_text', confidence=0.5, pipeline='full_page',
            skip_steps=['columns', 'rows'],
            features={'error': 'empty image'},
        )
    h, w = ocr_img.shape[:2]
    # --- 1. Vertical projection profile → detect column gaps ---
    # Sum dark pixels along each column (x-axis). Gaps = valleys in the profile.
    # Invert: dark pixels on white background → high values = text.
    vert_proj = np.sum(ocr_img < 128, axis=0).astype(float)
    # Smooth the profile to avoid noise spikes (box filter ~1% of width,
    # forced to an odd size so it is centered).
    kernel_size = max(3, w // 100)
    if kernel_size % 2 == 0:
        kernel_size += 1
    vert_smooth = np.convolve(vert_proj, np.ones(kernel_size) / kernel_size, mode='same')
    # Find significant vertical gaps (columns of near-zero text density)
    # A gap must be at least 1% of image width and have < 5% of max density
    max_density = max(vert_smooth.max(), 1)
    gap_threshold = max_density * 0.05
    min_gap_width = max(5, w // 100)
    in_gap = False
    gap_count = 0
    gap_start = 0
    vert_gaps = []
    # Single left-to-right sweep: open a gap when density drops below the
    # threshold, close and record it when density rises again.
    # NOTE(review): a gap still open at x == w-1 is never recorded; for the
    # vertical sweep that only loses the right page margin, which is
    # filtered out as a margin gap below anyway.
    for x in range(w):
        if vert_smooth[x] < gap_threshold:
            if not in_gap:
                in_gap = True
                gap_start = x
        else:
            if in_gap:
                gap_width = x - gap_start
                if gap_width >= min_gap_width:
                    gap_count += 1
                    vert_gaps.append((gap_start, x, gap_width))
                in_gap = False
    # Filter out margin gaps (within 10% of image edges): only gaps between
    # text columns are evidence of a table layout.
    margin_threshold = w * 0.10
    internal_gaps = [g for g in vert_gaps if g[0] > margin_threshold and g[1] < w - margin_threshold]
    internal_gap_count = len(internal_gaps)
    # --- 2. Horizontal projection profile → detect row gaps ---
    # Same sweep as above, top-to-bottom; only the count is needed here.
    horiz_proj = np.sum(ocr_img < 128, axis=1).astype(float)
    h_kernel = max(3, h // 200)
    if h_kernel % 2 == 0:
        h_kernel += 1
    horiz_smooth = np.convolve(horiz_proj, np.ones(h_kernel) / h_kernel, mode='same')
    h_max = max(horiz_smooth.max(), 1)
    h_gap_threshold = h_max * 0.05
    min_row_gap = max(3, h // 200)
    row_gap_count = 0
    in_gap = False
    for y in range(h):
        if horiz_smooth[y] < h_gap_threshold:
            if not in_gap:
                in_gap = True
                gap_start = y
        else:
            if in_gap:
                if y - gap_start >= min_row_gap:
                    row_gap_count += 1
                in_gap = False
    # --- 3. Text density distribution (4×4 grid) ---
    # Per-cell ink density; mean/std are reported in *features* for
    # diagnostics and density_std feeds the full_text confidence below.
    grid_rows, grid_cols = 4, 4
    cell_h, cell_w = h // grid_rows, w // grid_cols
    densities = []
    for gr in range(grid_rows):
        for gc in range(grid_cols):
            cell = ocr_img[gr * cell_h:(gr + 1) * cell_h,
                           gc * cell_w:(gc + 1) * cell_w]
            if cell.size > 0:
                d = float(np.count_nonzero(cell < 128)) / cell.size
                densities.append(d)
    density_std = float(np.std(densities)) if densities else 0
    density_mean = float(np.mean(densities)) if densities else 0
    # Diagnostic features attached to every result (first 10 gap tuples only).
    features = {
        'vertical_gaps': gap_count,
        'internal_vertical_gaps': internal_gap_count,
        'vertical_gap_details': [(g[0], g[1], g[2]) for g in vert_gaps[:10]],
        'row_gaps': row_gap_count,
        'density_mean': round(density_mean, 4),
        'density_std': round(density_std, 4),
        'image_size': (w, h),
    }
    # --- 4. Decision tree ---
    # Use internal_gap_count (excludes margin gaps) for column detection.
    if internal_gap_count >= 2 and row_gap_count >= 5:
        # Multiple internal vertical gaps + many row gaps → table
        confidence = min(0.95, 0.7 + internal_gap_count * 0.05 + row_gap_count * 0.005)
        return DocumentTypeResult(
            doc_type='vocab_table',
            confidence=round(confidence, 2),
            pipeline='cell_first',
            skip_steps=[],
            features=features,
        )
    elif internal_gap_count >= 1 and row_gap_count >= 3:
        # Some internal structure, likely a table
        confidence = min(0.85, 0.5 + internal_gap_count * 0.1 + row_gap_count * 0.01)
        return DocumentTypeResult(
            doc_type='generic_table',
            confidence=round(confidence, 2),
            pipeline='cell_first',
            skip_steps=[],
            features=features,
        )
    elif internal_gap_count == 0:
        # No internal column gaps → full text (regardless of density).
        # More uniform density (low std) nudges confidence upward.
        confidence = min(0.95, 0.8 + (1 - min(density_std, 0.1)) * 0.15)
        return DocumentTypeResult(
            doc_type='full_text',
            confidence=round(confidence, 2),
            pipeline='full_page',
            skip_steps=['columns', 'rows'],
            features=features,
        )
    else:
        # Ambiguous — default to vocab_table (most common use case)
        return DocumentTypeResult(
            doc_type='vocab_table',
            confidence=0.5,
            pipeline='cell_first',
            skip_steps=[],
            features=features,
        )
# =============================================================================
# Stage 4: Dual Image Preparation
# =============================================================================
def create_ocr_image(img: np.ndarray) -> np.ndarray:
    """Create a binarized image optimized for Tesseract OCR.

    Steps: Grayscale → Background normalization → Adaptive threshold → Denoise.

    Args:
        img: BGR image.

    Returns:
        Binary image (white text on black background inverted to black on white).
    """
    grayscale = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Flatten uneven illumination: divide each pixel by a heavily blurred
    # copy of the page, rescaled back to the full 0..255 range.
    illumination = cv2.GaussianBlur(grayscale, (51, 51), 0)
    flattened = cv2.divide(grayscale, illumination, scale=255)
    # Gaussian-weighted local thresholding copes with residual gradients
    # better than a single global cut-off.
    thresholded = cv2.adaptiveThreshold(
        flattened, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY, 31, 10,
    )
    # A 3x3 median filter removes salt-and-pepper speckles without
    # noticeably eroding glyph strokes.
    return cv2.medianBlur(thresholded, 3)
def create_layout_image(img: np.ndarray) -> np.ndarray:
    """Create a CLAHE-enhanced grayscale image for layout analysis.

    Args:
        img: BGR image.

    Returns:
        Enhanced grayscale image.
    """
    # CLAHE boosts local contrast per 8x8 tile; the clip limit of 2.0
    # caps histogram amplification so noise is not over-enhanced.
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    return equalizer.apply(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))
# =============================================================================
# Stage 5: Layout Analysis (Projection Profiles)
# =============================================================================
def _filter_narrow_runs(mask: np.ndarray, min_width: int) -> np.ndarray:
"""Remove contiguous True-runs shorter than *min_width* from a 1-D bool mask."""
out = mask.copy()
n = len(out)
i = 0
while i < n:
if out[i]:
start = i
while i < n and out[i]:
i += 1
if (i - start) < min_width:
out[start:i] = False
else:
i += 1
return out
def _find_content_bounds(inv: np.ndarray) -> Tuple[int, int, int, int]:
    """Find the bounding box of actual text content (excluding page margins).

    Scan artefacts (thin black lines at page edges) are filtered out by
    discarding contiguous projection runs narrower than 1 % of the image
    dimension (min 5 px).

    Returns:
        Tuple of (left_x, right_x, top_y, bottom_y).
    """
    h, w = inv.shape[:2]
    threshold = 0.005
    # --- Rows: normalized ink density per row, artefact runs removed ---
    row_density = np.sum(inv, axis=1).astype(float) / (w * 255)
    row_mask = _filter_narrow_runs(row_density > threshold, max(5, h // 100))
    row_hits = np.flatnonzero(row_mask)
    if row_hits.size:
        # Expand by 5 px of breathing room, clamped to the image.
        top_y = max(0, int(row_hits[0]) - 5)
        bottom_y = min(h, int(row_hits[-1]) + 5)
    else:
        top_y, bottom_y = 0, h
    # --- Columns: density measured only inside the vertical content band ---
    band_h = bottom_y - top_y
    col_density = np.sum(inv[top_y:bottom_y, :], axis=0).astype(float)
    if band_h > 0:
        col_density = col_density / (band_h * 255)
    col_mask = _filter_narrow_runs(col_density > threshold, max(5, w // 100))
    col_hits = np.flatnonzero(col_mask)
    if col_hits.size:
        left_x = max(0, int(col_hits[0]) - 2)
        right_x = min(w, int(col_hits[-1]) + 2)
    else:
        left_x, right_x = 0, w
    return left_x, right_x, top_y, bottom_y
def analyze_layout(layout_img: np.ndarray, ocr_img: np.ndarray) -> List[PageRegion]:
    """Detect columns, header, and footer using projection profiles.

    Uses content-bounds detection to exclude page margins before searching
    for column separators within the actual text area.

    Args:
        layout_img: CLAHE-enhanced grayscale image.
        ocr_img: Binarized image for text density analysis.

    Returns:
        List of PageRegion objects describing detected regions.
    """
    h, w = ocr_img.shape[:2]
    # Invert: black text on white → white text on black for projection
    inv = cv2.bitwise_not(ocr_img)
    # --- Find actual content bounds (exclude page margins) ---
    left_x, right_x, top_y, bottom_y = _find_content_bounds(inv)
    content_w = right_x - left_x
    content_h = bottom_y - top_y
    logger.info(f"Layout: content bounds x=[{left_x}..{right_x}] ({content_w}px), "
                f"y=[{top_y}..{bottom_y}] ({content_h}px) in {w}x{h} image")
    if content_w < w * 0.3 or content_h < h * 0.3:
        # Fallback if detection seems wrong: treat the whole image as content.
        left_x, right_x = 0, w
        top_y, bottom_y = 0, h
        content_w, content_h = w, h
    # --- Vertical projection within content area to find column separators ---
    content_strip = inv[top_y:bottom_y, left_x:right_x]
    v_proj = np.sum(content_strip, axis=0).astype(float)
    v_proj_norm = v_proj / (content_h * 255) if content_h > 0 else v_proj
    # Smooth the projection profile (box filter ~2% of content width, odd size)
    kernel_size = max(5, content_w // 50)
    if kernel_size % 2 == 0:
        kernel_size += 1
    v_proj_smooth = np.convolve(v_proj_norm, np.ones(kernel_size) / kernel_size, mode='same')
    # Debug: log projection profile statistics
    p_mean = float(np.mean(v_proj_smooth))
    p_median = float(np.median(v_proj_smooth))
    p_min = float(np.min(v_proj_smooth))
    p_max = float(np.max(v_proj_smooth))
    logger.info(f"Layout: v_proj stats — min={p_min:.4f}, max={p_max:.4f}, "
                f"mean={p_mean:.4f}, median={p_median:.4f}")
    # Find valleys using multiple threshold strategies
    # Strategy 1: relative to median (catches clear separators)
    # Strategy 2: local minima approach (catches subtle gaps)
    threshold = max(p_median * 0.3, p_mean * 0.2)
    logger.info(f"Layout: valley threshold={threshold:.4f}")
    in_valley = v_proj_smooth < threshold
    # Find contiguous valley regions as
    # (start, end, center, width, min_depth) tuples in content-relative x.
    all_valleys = []
    start = None
    for x in range(len(v_proj_smooth)):
        if in_valley[x] and start is None:
            start = x
        elif not in_valley[x] and start is not None:
            valley_width = x - start
            valley_depth = float(np.min(v_proj_smooth[start:x]))
            # Valley must be at least 3px wide
            if valley_width >= 3:
                all_valleys.append((start, x, (start + x) // 2, valley_width, valley_depth))
            start = None
    logger.info(f"Layout: raw valleys (before filter): {len(all_valleys)}"
                f"{[(v[0]+left_x, v[1]+left_x, v[3], f'{v[4]:.4f}') for v in all_valleys[:10]]}")
    # Filter: valleys must be inside the content area (not at edges)
    inner_margin = int(content_w * 0.08)
    valleys = [v for v in all_valleys if inner_margin < v[2] < content_w - inner_margin]
    # If no valleys found with strict threshold, try local minima approach
    if len(valleys) < 2:
        logger.info("Layout: trying local minima approach for column detection")
        # Divide content into 20 segments, find the 2 lowest
        seg_count = 20
        seg_width = content_w // seg_count
        seg_scores = []
        for i in range(seg_count):
            sx = i * seg_width
            ex = min((i + 1) * seg_width, content_w)
            seg_mean = float(np.mean(v_proj_smooth[sx:ex]))
            seg_scores.append((i, sx, ex, seg_mean))
        seg_scores.sort(key=lambda s: s[3])
        logger.info(f"Layout: segment scores (lowest 5): "
                    f"{[(s[0], s[1]+left_x, s[2]+left_x, f'{s[3]:.4f}') for s in seg_scores[:5]]}")
        # Find two lowest non-adjacent segments that create reasonable columns
        candidate_valleys = []
        for seg_idx, sx, ex, seg_mean in seg_scores:
            # Must not be at the edges
            if seg_idx <= 1 or seg_idx >= seg_count - 2:
                continue
            # Must be significantly lower than overall mean
            if seg_mean < p_mean * 0.6:
                center = (sx + ex) // 2
                candidate_valleys.append((sx, ex, center, ex - sx, seg_mean))
        if len(candidate_valleys) >= 2:
            # Pick the best pair: non-adjacent, creating reasonable column widths
            candidate_valleys.sort(key=lambda v: v[2])
            best_pair = None
            best_score = float('inf')
            for i in range(len(candidate_valleys)):
                for j in range(i + 1, len(candidate_valleys)):
                    c1 = candidate_valleys[i][2]
                    c2 = candidate_valleys[j][2]
                    # Must be at least 20% apart
                    if (c2 - c1) < content_w * 0.2:
                        continue
                    col1 = c1
                    col2 = c2 - c1
                    col3 = content_w - c2
                    # Each column at least 12% of content width
                    # (slightly looser than the 15% used in the main pass below)
                    if col1 < content_w * 0.12 or col2 < content_w * 0.12 or col3 < content_w * 0.12:
                        continue
                    # Score: spread between widest and narrowest column —
                    # lower means a more even 3-way split.
                    parts = sorted([col1, col2, col3])
                    score = parts[2] - parts[0]
                    if score < best_score:
                        best_score = score
                        best_pair = (candidate_valleys[i], candidate_valleys[j])
            if best_pair:
                valleys = list(best_pair)
                logger.info(f"Layout: local minima found 2 valleys: "
                            f"{[(v[0]+left_x, v[1]+left_x, v[3]) for v in valleys]}")
    logger.info(f"Layout: final {len(valleys)} valleys: "
                f"{[(v[0]+left_x, v[1]+left_x, v[3]) for v in valleys]}")
    regions = []
    if len(valleys) >= 2:
        # 3-column layout detected
        valleys.sort(key=lambda v: v[2])
        if len(valleys) == 2:
            sep1_center = valleys[0][2]
            sep2_center = valleys[1][2]
        else:
            # Pick the two valleys that best divide into 3 parts
            # Prefer wider valleys (more likely true separators)
            best_pair = None
            best_score = float('inf')
            for i in range(len(valleys)):
                for j in range(i + 1, len(valleys)):
                    c1, c2 = valleys[i][2], valleys[j][2]
                    # Each column should be at least 15% of content width
                    col1 = c1
                    col2 = c2 - c1
                    col3 = content_w - c2
                    if col1 < content_w * 0.15 or col2 < content_w * 0.15 or col3 < content_w * 0.15:
                        continue
                    # Score: lower is better (more even distribution)
                    parts = sorted([col1, col2, col3])
                    score = parts[2] - parts[0]
                    # Bonus for wider valleys (subtract valley width)
                    score -= (valleys[i][3] + valleys[j][3]) * 0.5
                    if score < best_score:
                        best_score = score
                        best_pair = (c1, c2)
            if best_pair:
                sep1_center, sep2_center = best_pair
            else:
                # No pair satisfied the width constraints — fall back to the
                # two leftmost valleys.
                sep1_center = valleys[0][2]
                sep2_center = valleys[1][2]
        # Convert from content-relative to absolute coordinates
        abs_sep1 = sep1_center + left_x
        abs_sep2 = sep2_center + left_x
        logger.info(f"Layout: 3 columns at separators x={abs_sep1}, x={abs_sep2} "
                    f"(widths: {abs_sep1}, {abs_sep2-abs_sep1}, {w-abs_sep2})")
        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=abs_sep1, height=content_h
        ))
        regions.append(PageRegion(
            type='column_de', x=abs_sep1, y=top_y,
            width=abs_sep2 - abs_sep1, height=content_h
        ))
        regions.append(PageRegion(
            type='column_example', x=abs_sep2, y=top_y,
            width=w - abs_sep2, height=content_h
        ))
    elif len(valleys) == 1:
        # 2-column layout
        abs_sep = valleys[0][2] + left_x
        logger.info(f"Layout: 2 columns at separator x={abs_sep}")
        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=abs_sep, height=content_h
        ))
        regions.append(PageRegion(
            type='column_de', x=abs_sep, y=top_y,
            width=w - abs_sep, height=content_h
        ))
    else:
        # No columns detected — run full-page OCR as single column
        logger.warning("Layout: no column separators found, using full page")
        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=w, height=content_h
        ))
    # Add header/footer info (gap-based detection with fallback)
    _add_header_footer(regions, top_y, bottom_y, w, h, inv=inv)
    top_region = next((r.type for r in regions if r.type in ('header', 'margin_top')), 'none')
    bottom_region = next((r.type for r in regions if r.type in ('footer', 'margin_bottom')), 'none')
    col_count = len([r for r in regions if r.type.startswith('column')])
    logger.info(f"Layout: {col_count} columns, top={top_region}, bottom={bottom_region}")
    return regions
# =============================================================================
# Stage 5b: Word-Based Layout Analysis (Two-Phase Column Detection)
# =============================================================================
# --- Phase A: Geometry Detection ---
def _detect_columns_by_clustering(
    word_dicts: List[Dict],
    left_edges: List[int],
    edge_word_indices: List[int],
    content_w: int,
    content_h: int,
    left_x: int,
    right_x: int,
    top_y: int,
    bottom_y: int,
    inv: Optional[np.ndarray] = None,
) -> Optional[Tuple[List[ColumnGeometry], int, int, int, int, List[Dict], Optional[np.ndarray]]]:
    """Fallback: detect columns by clustering left-aligned word positions.

    Used when the primary gap-based algorithm finds fewer than 2 gaps.

    Args:
        word_dicts: Tesseract word dicts (keys include 'left' and 'top',
            content-relative coordinates).
        left_edges: Left x-coordinates of candidate words (content-relative).
        edge_word_indices: Index into *word_dicts* for each entry of *left_edges*.
        content_w / content_h: Size of the content area in pixels.
        left_x / right_x / top_y / bottom_y: Content bounds in absolute coords.
        inv: Optional inverted binary image, passed through to the result tuple.

    Returns:
        Same tuple shape as _build_geometries_from_starts, or None when fewer
        than 3 significant column clusters remain.
    """
    # Guard: no edges → nothing to cluster. Without this, sorted_pairs[0]
    # below would raise IndexError on an empty input.
    if not left_edges:
        logger.info("ColumnGeometry clustering fallback: no left edges")
        return None
    # 1-D single-linkage clustering: sorted edges within *tolerance* px of
    # their predecessor join the current cluster, otherwise start a new one.
    tolerance = max(10, int(content_w * 0.01))
    sorted_pairs = sorted(zip(left_edges, edge_word_indices), key=lambda p: p[0])
    clusters = []
    cluster_widxs = []
    cur_edges = [sorted_pairs[0][0]]
    cur_widxs = [sorted_pairs[0][1]]
    for edge, widx in sorted_pairs[1:]:
        if edge - cur_edges[-1] <= tolerance:
            cur_edges.append(edge)
            cur_widxs.append(widx)
        else:
            clusters.append(cur_edges)
            cluster_widxs.append(cur_widxs)
            cur_edges = [edge]
            cur_widxs = [widx]
    clusters.append(cur_edges)
    cluster_widxs.append(cur_widxs)
    # A cluster counts as a column start if its words span enough of the page
    # vertically (primary), or span somewhat less but contain enough words
    # (secondary) — this filters out stray headings and footers.
    MIN_Y_COVERAGE_PRIMARY = 0.30
    MIN_Y_COVERAGE_SECONDARY = 0.15
    MIN_WORDS_SECONDARY = 5
    cluster_infos = []
    for c_edges, c_widxs in zip(clusters, cluster_widxs):
        if len(c_edges) < 2:
            continue
        y_positions = [word_dicts[idx]['top'] for idx in c_widxs]
        y_span = max(y_positions) - min(y_positions)
        y_coverage = y_span / content_h if content_h > 0 else 0.0
        cluster_infos.append({
            'mean_x': int(np.mean(c_edges)),
            'count': len(c_edges),
            'min_edge': min(c_edges),
            'max_edge': max(c_edges),
            'y_min': min(y_positions),
            'y_max': max(y_positions),
            'y_coverage': y_coverage,
        })
    primary = [c for c in cluster_infos if c['y_coverage'] >= MIN_Y_COVERAGE_PRIMARY]
    primary_set = set(id(c) for c in primary)
    secondary = [c for c in cluster_infos
                 if id(c) not in primary_set
                 and c['y_coverage'] >= MIN_Y_COVERAGE_SECONDARY
                 and c['count'] >= MIN_WORDS_SECONDARY]
    significant = sorted(primary + secondary, key=lambda c: c['mean_x'])
    if len(significant) < 3:
        logger.info("ColumnGeometry clustering fallback: < 3 significant clusters")
        return None
    # Merge clusters that sit closer together than ~6% of the content width —
    # they are fragments of the same column edge.
    merge_distance = max(30, int(content_w * 0.06))
    merged = [significant[0].copy()]
    for s in significant[1:]:
        if s['mean_x'] - merged[-1]['mean_x'] < merge_distance:
            prev = merged[-1]
            total = prev['count'] + s['count']
            # Weighted average keeps the merged edge near the denser cluster.
            avg_x = (prev['mean_x'] * prev['count'] + s['mean_x'] * s['count']) // total
            prev['mean_x'] = avg_x
            prev['count'] = total
            prev['min_edge'] = min(prev['min_edge'], s['min_edge'])
            prev['max_edge'] = max(prev['max_edge'], s['max_edge'])
        else:
            merged.append(s.copy())
    if len(merged) < 3:
        logger.info("ColumnGeometry clustering fallback: < 3 merged clusters")
        return None
    logger.info(f"ColumnGeometry clustering fallback: {len(merged)} columns from clustering")
    # Small left margin so column boxes don't clip the first glyph.
    margin_px = max(6, int(content_w * 0.003))
    return _build_geometries_from_starts(
        [(max(0, left_x + m['min_edge'] - margin_px), m['count']) for m in merged],
        word_dicts, left_x, right_x, top_y, bottom_y, content_w, content_h, inv,
    )
def _detect_sub_columns(
    geometries: List[ColumnGeometry],
    content_w: int,
    left_x: int = 0,
    top_y: int = 0,
    header_y: Optional[int] = None,
    footer_y: Optional[int] = None,
    _edge_tolerance: int = 8,
    _min_col_start_ratio: float = 0.10,
) -> List[ColumnGeometry]:
    """Split columns that contain internal sub-columns based on left-edge alignment.

    For each column, clusters word left-edges into alignment bins (within
    ``_edge_tolerance`` px). The leftmost bin whose word count reaches
    ``_min_col_start_ratio`` of the column total is treated as the true column
    start. Any words to the left of that bin form a sub-column, provided they
    number >= 2 and < 35 % of total.

    Word ``left`` values are relative to the content ROI (offset by *left_x*),
    while ``ColumnGeometry.x`` is in absolute image coordinates. *left_x*
    bridges the two coordinate systems.

    If *header_y* / *footer_y* are provided (absolute y-coordinates), words
    in header/footer regions are excluded from alignment clustering to avoid
    polluting the bins with page numbers or chapter titles. Word ``top``
    values are relative to *top_y*.

    Returns a new list of ColumnGeometry — potentially longer than the input.
    """
    if content_w <= 0:
        return geometries
    result: List[ColumnGeometry] = []
    for geo in geometries:
        # Only consider wide-enough columns with enough words
        if geo.width_ratio < 0.15 or geo.word_count < 5:
            result.append(geo)
            continue
        # Collect left-edges of confident words, excluding header/footer
        # Convert header_y/footer_y from absolute to relative (word 'top' is relative to top_y)
        min_top_rel = (header_y - top_y) if header_y is not None else None
        max_top_rel = (footer_y - top_y) if footer_y is not None else None
        confident = [w for w in geo.words
                     if w.get('conf', 0) >= 30
                     and (min_top_rel is None or w['top'] >= min_top_rel)
                     and (max_top_rel is None or w['top'] <= max_top_rel)]
        if len(confident) < 3:
            result.append(geo)
            continue
        # --- Cluster left-edges into alignment bins ---
        # Single pass over sorted edges; neighbours within _edge_tolerance px
        # fall into the same bin.
        sorted_edges = sorted(w['left'] for w in confident)
        bins: List[Tuple[int, int, int, int]] = []  # (center, count, min_edge, max_edge)
        cur = [sorted_edges[0]]
        for i in range(1, len(sorted_edges)):
            if sorted_edges[i] - cur[-1] <= _edge_tolerance:
                cur.append(sorted_edges[i])
            else:
                bins.append((sum(cur) // len(cur), len(cur), min(cur), max(cur)))
                cur = [sorted_edges[i]]
        bins.append((sum(cur) // len(cur), len(cur), min(cur), max(cur)))
        # --- Find the leftmost bin qualifying as a real column start ---
        total = len(confident)
        min_count = max(3, int(total * _min_col_start_ratio))
        col_start_bin = None
        for b in bins:
            if b[1] >= min_count:
                col_start_bin = b
                break
        if col_start_bin is None:
            result.append(geo)
            continue
        # Words to the left of the column-start bin are sub-column candidates
        split_threshold = col_start_bin[2] - _edge_tolerance
        sub_words = [w for w in geo.words if w['left'] < split_threshold]
        main_words = [w for w in geo.words if w['left'] >= split_threshold]
        # Count only body words (excluding header/footer) for the threshold check
        # so that header/footer words don't artificially trigger a split.
        sub_body = [w for w in sub_words
                    if (min_top_rel is None or w['top'] >= min_top_rel)
                    and (max_top_rel is None or w['top'] <= max_top_rel)]
        if len(sub_body) < 2 or len(sub_body) / len(geo.words) >= 0.35:
            result.append(geo)
            continue
        # --- Guard against inline markers (bullet points, numbering) ---
        # Bullet points like "1.", "2.", "•", "-" sit close to the main
        # column text and are part of the cell, not a separate column.
        # Only split if the horizontal gap between the rightmost sub-word
        # and the main column start is large enough.
        max_sub_right = max(w['left'] + w.get('width', 0) for w in sub_words)
        gap_to_main = col_start_bin[2] - max_sub_right  # px gap
        median_heights = [w.get('height', 20) for w in confident]
        med_h = statistics.median(median_heights) if median_heights else 20
        min_gap = max(med_h * 1.2, 20)  # at least 1.2× word height or 20px
        if gap_to_main < min_gap:
            logger.debug(
                "SubColumnSplit: column idx=%d skipped — gap=%dpx < min=%dpx "
                "(likely inline markers, not a sub-column)",
                geo.index, gap_to_main, min_gap)
            result.append(geo)
            continue
        # --- Build two sub-column geometries ---
        # Word 'left' values are relative to left_x; geo.x is absolute.
        # Convert the split position from relative to absolute coordinates.
        # The split sits halfway between the sub-column's rightmost left-edge
        # and the main column's left-edge bin.
        max_sub_left = max(w['left'] for w in sub_words)
        split_rel = (max_sub_left + col_start_bin[2]) // 2
        split_abs = split_rel + left_x
        sub_x = geo.x
        sub_width = split_abs - geo.x
        main_x = split_abs
        main_width = (geo.x + geo.width) - split_abs
        if sub_width <= 0 or main_width <= 0:
            # Degenerate split (sub-column outside the column box) — keep as-is.
            result.append(geo)
            continue
        sub_geo = ColumnGeometry(
            index=0,
            x=sub_x,
            y=geo.y,
            width=sub_width,
            height=geo.height,
            word_count=len(sub_words),
            words=sub_words,
            width_ratio=sub_width / content_w if content_w > 0 else 0.0,
            is_sub_column=True,
        )
        main_geo = ColumnGeometry(
            index=0,
            x=main_x,
            y=geo.y,
            width=main_width,
            height=geo.height,
            word_count=len(main_words),
            words=main_words,
            width_ratio=main_width / content_w if content_w > 0 else 0.0,
            is_sub_column=True,
        )
        result.append(sub_geo)
        result.append(main_geo)
        logger.info(
            f"SubColumnSplit: column idx={geo.index} split at abs_x={split_abs} "
            f"(rel={split_rel}), sub={len(sub_words)} words, "
            f"main={len(main_words)} words, "
            f"col_start_bin=({col_start_bin[0]}, n={col_start_bin[1]})"
        )
    # Re-index by left-to-right order
    result.sort(key=lambda g: g.x)
    for i, g in enumerate(result):
        g.index = i
    return result
def _split_broad_columns(
    geometries: List[ColumnGeometry],
    content_w: int,
    left_x: int = 0,
    _broad_threshold: float = 0.35,
    _min_gap_px: int = 15,
    _min_words_per_split: int = 5,
) -> List[ColumnGeometry]:
    """Split overly broad columns that contain two language blocks (EN+DE).

    Uses word-coverage gap analysis: builds a per-pixel coverage array from the
    words inside each broad column, finds the largest horizontal gap, and splits
    the column at that gap.

    Args:
        geometries: Column geometries from _detect_sub_columns.
        content_w: Width of the content area in pixels.
        left_x: Left edge of content ROI in absolute image coordinates.
        _broad_threshold: Minimum width_ratio to consider a column "broad".
        _min_gap_px: Minimum gap width (pixels) to trigger a split.
        _min_words_per_split: Both halves must have at least this many words.

    Returns:
        Updated list of ColumnGeometry (possibly with more columns).
    """
    result: List[ColumnGeometry] = []
    logger.info(f"SplitBroadCols: input {len(geometries)} cols: "
                f"{[(g.index, g.x, g.width, g.word_count, round(g.width_ratio, 3)) for g in geometries]}")
    for geo in geometries:
        # Only broad columns with a reasonable word population are candidates.
        if geo.width_ratio <= _broad_threshold or len(geo.words) < 10:
            result.append(geo)
            continue
        # Build word-coverage array (per pixel within column)
        col_left_rel = geo.x - left_x  # column left in content-relative coords
        coverage = np.zeros(geo.width, dtype=np.float32)
        for wd in geo.words:
            # wd['left'] is relative to left_x (content ROI)
            wl = wd['left'] - col_left_rel
            wr = wl + wd.get('width', 0)
            wl = max(0, int(wl))
            wr = min(geo.width, int(wr))
            if wr > wl:
                coverage[wl:wr] += 1.0
        # Light smoothing (kernel=3px) to avoid noise
        if len(coverage) > 3:
            kernel = np.ones(3, dtype=np.float32) / 3.0
            coverage = np.convolve(coverage, kernel, mode='same')
        # Normalise to [0, 1]
        cmax = coverage.max()
        if cmax > 0:
            coverage /= cmax
        # Find INTERNAL gaps where coverage < 0.5
        # Exclude edge gaps (touching pixel 0 or geo.width) — those are margins.
        low_mask = coverage < 0.5
        all_gaps = []
        _gs = None
        for px in range(len(low_mask)):
            if low_mask[px]:
                if _gs is None:
                    _gs = px
            else:
                if _gs is not None:
                    all_gaps.append((_gs, px, px - _gs))
                    _gs = None
        # Close a gap that runs to the right edge of the column.
        if _gs is not None:
            all_gaps.append((_gs, len(low_mask), len(low_mask) - _gs))
        # Filter: only internal gaps (not touching column edges)
        _edge_margin = 10  # pixels from edge to ignore
        internal_gaps = [g for g in all_gaps
                         if g[0] > _edge_margin and g[1] < geo.width - _edge_margin]
        best_gap = max(internal_gaps, key=lambda g: g[2]) if internal_gaps else None
        logger.info(f"SplitBroadCols: col {geo.index} all_gaps(>=5px): "
                    f"{[g for g in all_gaps if g[2] >= 5]}, "
                    f"internal(>=5px): {[g for g in internal_gaps if g[2] >= 5]}, "
                    f"best={best_gap}")
        if best_gap is None or best_gap[2] < _min_gap_px:
            result.append(geo)
            continue
        gap_center = (best_gap[0] + best_gap[1]) // 2
        # Split words by midpoint relative to gap
        left_words = []
        right_words = []
        for wd in geo.words:
            wl = wd['left'] - col_left_rel
            mid = wl + wd.get('width', 0) / 2.0
            if mid < gap_center:
                left_words.append(wd)
            else:
                right_words.append(wd)
        if len(left_words) < _min_words_per_split or len(right_words) < _min_words_per_split:
            result.append(geo)
            continue
        # Build two new ColumnGeometry objects
        split_x_abs = geo.x + gap_center
        left_w = gap_center
        right_w = geo.width - gap_center
        left_geo = ColumnGeometry(
            index=0,
            x=geo.x,
            y=geo.y,
            width=left_w,
            height=geo.height,
            word_count=len(left_words),
            words=left_words,
            width_ratio=left_w / content_w if content_w else 0,
            is_sub_column=True,
        )
        right_geo = ColumnGeometry(
            index=0,
            x=split_x_abs,
            y=geo.y,
            width=right_w,
            height=geo.height,
            word_count=len(right_words),
            words=right_words,
            width_ratio=right_w / content_w if content_w else 0,
            is_sub_column=True,
        )
        logger.info(
            f"SplitBroadCols: col {geo.index} SPLIT at gap_center={gap_center} "
            f"(gap {best_gap[2]}px @ [{best_gap[0]}..{best_gap[1]}]), "
            f"left={len(left_words)} words (w={left_w}), "
            f"right={len(right_words)} words (w={right_w})"
        )
        result.append(left_geo)
        result.append(right_geo)
    # Re-index left-to-right
    result.sort(key=lambda g: g.x)
    for i, g in enumerate(result):
        g.index = i
    return result
def _build_geometries_from_starts(
    col_starts: List[Tuple[int, int]],
    word_dicts: List[Dict],
    left_x: int,
    right_x: int,
    top_y: int,
    bottom_y: int,
    content_w: int,
    content_h: int,
    inv: Optional[np.ndarray] = None,
) -> Tuple[List[ColumnGeometry], int, int, int, int, List[Dict], Optional[np.ndarray]]:
    """Build ColumnGeometry objects from a list of (abs_start_x, word_count) pairs."""
    geometries = []
    n_cols = len(col_starts)
    for idx, (start_x, _count) in enumerate(col_starts):
        # A column runs up to the next column's start; the last one runs to
        # the right content bound.
        end_x = col_starts[idx + 1][0] if idx + 1 < n_cols else right_x
        span = end_x - start_x
        # Words carry content-relative 'left' coordinates; translate the
        # absolute column bounds into that frame before filtering.
        rel_lo = start_x - left_x
        rel_hi = rel_lo + span
        members = [wd for wd in word_dicts if rel_lo <= wd['left'] < rel_hi]
        geometries.append(ColumnGeometry(
            index=idx,
            x=start_x,
            y=top_y,
            width=span,
            height=content_h,
            word_count=len(members),
            words=members,
            width_ratio=span / content_w if content_w > 0 else 0.0,
        ))
    logger.info(f"ColumnGeometry: {len(geometries)} columns: "
                f"{[(g.index, g.x, g.width, g.word_count) for g in geometries]}")
    return (geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv)
def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Optional[Tuple[List[ColumnGeometry], int, int, int, int, List[Dict], np.ndarray]]:
    """Detect column geometry using whitespace-gap analysis with word validation.

    Phase A of the two-phase column detection. Uses vertical projection
    profiles to find whitespace gaps between columns, then validates that
    no gap cuts through a word bounding box.

    Falls back to clustering-based detection if fewer than 2 gaps are found.

    Args:
        ocr_img: Binarized grayscale image for layout analysis.
        dewarped_bgr: Original BGR image (for Tesseract word detection).

    Returns:
        Tuple of (geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv)
        or None if detection fails entirely.
    """
    h, w = ocr_img.shape[:2]
    # --- Step 1: Find content bounds ---
    inv = cv2.bitwise_not(ocr_img)
    left_x, right_x, top_y, bottom_y = _find_content_bounds(inv)
    content_w = right_x - left_x
    content_h = bottom_y - top_y
    # Implausibly small bounds (< 30% of the page in either axis) mean
    # bounds detection failed -> fall back to the full image.
    if content_w < w * 0.3 or content_h < h * 0.3:
        left_x, right_x = 0, w
        top_y, bottom_y = 0, h
        content_w, content_h = w, h
    logger.info(f"ColumnGeometry: content bounds x=[{left_x}..{right_x}] ({content_w}px), "
                f"y=[{top_y}..{bottom_y}] ({content_h}px)")
    # --- Step 2: Get word bounding boxes from Tesseract ---
    # Crop from left_x to full image width (not right_x) so words at the right
    # edge of the last column are included even if they extend past the detected
    # content boundary (right_x).
    content_roi = dewarped_bgr[top_y:bottom_y, left_x:w]
    pil_img = Image.fromarray(cv2.cvtColor(content_roi, cv2.COLOR_BGR2RGB))
    try:
        data = pytesseract.image_to_data(pil_img, lang='eng+deu', output_type=pytesseract.Output.DICT)
    except Exception as e:
        logger.warning(f"ColumnGeometry: Tesseract image_to_data failed: {e}")
        return None
    # Word boxes below are relative to the content ROI (origin left_x/top_y).
    word_dicts = []
    left_edges = []
    edge_word_indices = []
    n_words = len(data['text'])
    for i in range(n_words):
        # NOTE(review): some Tesseract versions report conf as a float string
        # (e.g. "96.33"); isdigit() then fails and the word is dropped via
        # conf=-1. Confirm the deployed Tesseract emits integer confidences.
        conf = int(data['conf'][i]) if str(data['conf'][i]).lstrip('-').isdigit() else -1
        text = str(data['text'][i]).strip()
        if conf < 30 or not text:  # drop low-confidence and empty tokens
            continue
        lx = int(data['left'][i])
        ty = int(data['top'][i])
        bw = int(data['width'][i])
        bh = int(data['height'][i])
        left_edges.append(lx)
        edge_word_indices.append(len(word_dicts))
        word_dicts.append({
            'text': text, 'conf': conf,
            'left': lx, 'top': ty, 'width': bw, 'height': bh,
        })
    if len(left_edges) < 5:
        # Too few words for any meaningful column statistics.
        logger.warning(f"ColumnGeometry: only {len(left_edges)} words detected")
        return None
    logger.info(f"ColumnGeometry: {len(left_edges)} words detected in content area")
    # --- Step 2b: Segment by sub-headers ---
    # Pages with sub-headers (e.g. "Unit 4: Bonnie Scotland") have full-width
    # text bands that pollute the vertical projection. We detect large
    # horizontal gaps (= whitespace rows separating sections) and use only
    # the tallest content segment for the projection. This makes column
    # detection immune to sub-headers, illustrations, and section dividers.
    content_strip = inv[top_y:bottom_y, left_x:right_x]
    h_proj_row = np.sum(content_strip, axis=1).astype(float)
    h_proj_row_norm = h_proj_row / (content_w * 255) if content_w > 0 else h_proj_row
    # Find horizontal gaps (near-empty rows)
    H_GAP_THRESH = 0.02  # rows with <2% ink density are "empty"
    h_in_gap = h_proj_row_norm < H_GAP_THRESH
    H_MIN_GAP = max(5, content_h // 200)  # min gap height ~5-7px
    h_gaps: List[Tuple[int, int]] = []
    h_gap_start = None
    for y_idx in range(len(h_in_gap)):
        if h_in_gap[y_idx]:
            if h_gap_start is None:
                h_gap_start = y_idx
        else:
            if h_gap_start is not None:
                if y_idx - h_gap_start >= H_MIN_GAP:
                    h_gaps.append((h_gap_start, y_idx))
                h_gap_start = None
    # Close off a gap that runs to the bottom edge.
    if h_gap_start is not None and len(h_in_gap) - h_gap_start >= H_MIN_GAP:
        h_gaps.append((h_gap_start, len(h_in_gap)))
    # Identify "large" gaps (significantly bigger than median) that indicate
    # section boundaries (sub-headers, chapter titles).
    if len(h_gaps) >= 3:
        gap_sizes = sorted(g[1] - g[0] for g in h_gaps)
        median_gap_h = gap_sizes[len(gap_sizes) // 2]
        large_gap_thresh = max(median_gap_h * 1.8, H_MIN_GAP + 3)
        large_gaps = [(gs, ge) for gs, ge in h_gaps if ge - gs >= large_gap_thresh]
    else:
        large_gaps = h_gaps
    # Build content segments between large gaps and pick the tallest
    seg_boundaries = [0]
    for gs, ge in large_gaps:
        seg_boundaries.append(gs)
        seg_boundaries.append(ge)
    seg_boundaries.append(content_h)
    segments = []
    # Boundaries alternate segment-start / segment-end, hence the stride of 2.
    for i in range(0, len(seg_boundaries) - 1, 2):
        seg_top = seg_boundaries[i]
        seg_bot = seg_boundaries[i + 1] if i + 1 < len(seg_boundaries) else content_h
        seg_height = seg_bot - seg_top
        if seg_height > 20:  # ignore tiny fragments
            segments.append((seg_top, seg_bot, seg_height))
    if segments:
        segments.sort(key=lambda s: s[2], reverse=True)
        best_seg = segments[0]
        proj_strip = content_strip[best_seg[0]:best_seg[1], :]
        effective_h = best_seg[2]
        if len(segments) > 1:
            logger.info(f"ColumnGeometry: {len(segments)} segments from {len(large_gaps)} "
                        f"large h-gaps, using tallest: rows {best_seg[0]}..{best_seg[1]} "
                        f"({effective_h}px, {effective_h*100/content_h:.0f}%)")
    else:
        proj_strip = content_strip
        effective_h = content_h
    # --- Step 3: Vertical projection profile ---
    v_proj = np.sum(proj_strip, axis=0).astype(float)
    v_proj_norm = v_proj / (effective_h * 255) if effective_h > 0 else v_proj
    # Smooth the projection to avoid noise-induced micro-gaps
    kernel_size = max(5, content_w // 80)
    if kernel_size % 2 == 0:
        kernel_size += 1  # keep odd for symmetry
    v_smooth = np.convolve(v_proj_norm, np.ones(kernel_size) / kernel_size, mode='same')
    # --- Step 4: Find whitespace gaps ---
    # Threshold: areas with very little ink density are gaps
    median_density = float(np.median(v_smooth[v_smooth > 0])) if np.any(v_smooth > 0) else 0.01
    gap_threshold = max(median_density * 0.15, 0.005)
    in_gap = v_smooth < gap_threshold
    MIN_GAP_WIDTH = max(8, content_w // 200)  # min ~8px or 0.5% of content width
    # Collect contiguous gap regions
    raw_gaps = []  # (start_x_rel, end_x_rel) relative to content ROI
    gap_start = None
    for x in range(len(in_gap)):
        if in_gap[x]:
            if gap_start is None:
                gap_start = x
        else:
            if gap_start is not None:
                gap_width = x - gap_start
                if gap_width >= MIN_GAP_WIDTH:
                    raw_gaps.append((gap_start, x))
                gap_start = None
    # Handle gap at the right edge
    if gap_start is not None:
        gap_width = len(in_gap) - gap_start
        if gap_width >= MIN_GAP_WIDTH:
            raw_gaps.append((gap_start, len(in_gap)))
    logger.info(f"ColumnGeometry: {len(raw_gaps)} raw gaps found (threshold={gap_threshold:.4f}, "
                f"min_width={MIN_GAP_WIDTH}px): "
                f"{[(g[0]+left_x, g[1]+left_x, g[1]-g[0]) for g in raw_gaps]}")
    # --- Step 5: Validate gaps against word bounding boxes ---
    # When using a segment for projection, only validate against words
    # inside that segment — words from sub-headers or other sections
    # would incorrectly overlap with real column gaps.
    if segments and len(segments) > 1:
        seg_top_abs = best_seg[0]  # relative to content strip
        seg_bot_abs = best_seg[1]
        segment_words = [wd for wd in word_dicts
                         if wd['top'] >= seg_top_abs
                         and wd['top'] + wd['height'] <= seg_bot_abs]
        logger.info(f"ColumnGeometry: filtering words to segment: "
                    f"{len(segment_words)}/{len(word_dicts)} words")
    else:
        segment_words = word_dicts
    validated_gaps = []
    for gap_start_rel, gap_end_rel in raw_gaps:
        # Check if any word overlaps with this gap region
        overlapping = False
        for wd in segment_words:
            word_left = wd['left']
            word_right = wd['left'] + wd['width']
            if word_left < gap_end_rel and word_right > gap_start_rel:
                overlapping = True
                break
        if not overlapping:
            validated_gaps.append((gap_start_rel, gap_end_rel))
        else:
            # Try to shift the gap to avoid the overlapping word(s)
            # Find the tightest word boundaries within the gap region
            min_word_left = content_w
            max_word_right = 0
            for wd in segment_words:
                word_left = wd['left']
                word_right = wd['left'] + wd['width']
                if word_left < gap_end_rel and word_right > gap_start_rel:
                    min_word_left = min(min_word_left, word_left)
                    max_word_right = max(max_word_right, word_right)
            # Try gap before the overlapping words
            if min_word_left - gap_start_rel >= MIN_GAP_WIDTH:
                validated_gaps.append((gap_start_rel, min_word_left))
                logger.debug(f"ColumnGeometry: gap shifted left to avoid word at {min_word_left}")
            # Try gap after the overlapping words
            elif gap_end_rel - max_word_right >= MIN_GAP_WIDTH:
                validated_gaps.append((max_word_right, gap_end_rel))
                logger.debug(f"ColumnGeometry: gap shifted right to avoid word at {max_word_right}")
            else:
                logger.debug(f"ColumnGeometry: gap [{gap_start_rel}..{gap_end_rel}] "
                             f"discarded (word overlap, no room to shift)")
    logger.info(f"ColumnGeometry: {len(validated_gaps)} gaps after word validation: "
                f"{[(g[0]+left_x, g[1]+left_x, g[1]-g[0]) for g in validated_gaps]}")
    # --- Step 5b: Word-coverage gap detection (fallback for noisy scans) ---
    # When pixel-based projection fails (e.g. due to illustrations or colored
    # bands), use word bounding boxes to find clear vertical gaps. This is
    # immune to decorative graphics that Tesseract doesn't recognise as words.
    if len(validated_gaps) < 2:
        logger.info("ColumnGeometry: < 2 pixel-gaps, trying word-coverage gaps")
        word_coverage = np.zeros(content_w, dtype=np.int32)
        for wd in segment_words:
            wl = max(0, wd['left'])
            wr = min(wd['left'] + wd['width'], content_w)
            if wr > wl:
                word_coverage[wl:wr] += 1
        # Smooth slightly to bridge tiny 1-2px noise gaps between words
        wc_kernel = max(3, content_w // 300)
        if wc_kernel % 2 == 0:
            wc_kernel += 1
        wc_smooth = np.convolve(word_coverage.astype(float),
                                np.ones(wc_kernel) / wc_kernel, mode='same')
        wc_in_gap = wc_smooth < 0.5  # effectively zero word coverage
        WC_MIN_GAP = max(4, content_w // 300)
        wc_gaps: List[Tuple[int, int]] = []
        wc_gap_start = None
        for x in range(len(wc_in_gap)):
            if wc_in_gap[x]:
                if wc_gap_start is None:
                    wc_gap_start = x
            else:
                if wc_gap_start is not None:
                    if x - wc_gap_start >= WC_MIN_GAP:
                        wc_gaps.append((wc_gap_start, x))
                    wc_gap_start = None
        if wc_gap_start is not None and len(wc_in_gap) - wc_gap_start >= WC_MIN_GAP:
            wc_gaps.append((wc_gap_start, len(wc_in_gap)))
        logger.info(f"ColumnGeometry: {len(wc_gaps)} word-coverage gaps found "
                    f"(min_width={WC_MIN_GAP}px): "
                    f"{[(g[0]+left_x, g[1]+left_x, g[1]-g[0]) for g in wc_gaps]}")
        if len(wc_gaps) >= 2:
            validated_gaps = wc_gaps
    # --- Step 6: Fallback to clustering if too few gaps ---
    if len(validated_gaps) < 2:
        logger.info("ColumnGeometry: < 2 gaps found, falling back to clustering")
        return _detect_columns_by_clustering(
            word_dicts, left_edges, edge_word_indices,
            content_w, content_h, left_x, right_x, top_y, bottom_y, inv,
        )
    # --- Step 7: Derive column boundaries from gaps ---
    # Sort gaps by position
    validated_gaps.sort(key=lambda g: g[0])
    # Identify margin gaps (first and last) vs interior gaps
    # A margin gap touches the edge of the content area (within 2% tolerance)
    edge_tolerance = max(10, int(content_w * 0.02))
    is_left_margin = validated_gaps[0][0] <= edge_tolerance
    is_right_margin = validated_gaps[-1][1] >= content_w - edge_tolerance
    # Interior gaps define column boundaries
    # Column starts at the end of a gap, ends at the start of the next gap
    col_starts = []
    if is_left_margin:
        # First column starts after the left margin gap
        first_gap_end = validated_gaps[0][1]
        interior_gaps = validated_gaps[1:]
    else:
        # No left margin gap — first column starts at content left edge
        first_gap_end = 0
        interior_gaps = validated_gaps[:]
    if is_right_margin:
        # Last gap is right margin — don't use it as column start
        interior_gaps_for_boundaries = interior_gaps[:-1]
        right_boundary = validated_gaps[-1][0]  # last column ends at right margin gap start
    else:
        interior_gaps_for_boundaries = interior_gaps
        right_boundary = content_w
    # NOTE(review): right_boundary is currently unused — Step 8 extends the
    # last column to the image edge (w) instead. Confirm that is intended.
    # First column
    col_starts.append(left_x + first_gap_end)
    # Columns between interior gaps
    for gap_start_rel, gap_end_rel in interior_gaps_for_boundaries:
        col_starts.append(left_x + gap_end_rel)
    # Count words per column region (for logging)
    col_start_counts = []
    for i, start_x in enumerate(col_starts):
        if i + 1 < len(col_starts):
            next_start = col_starts[i + 1]
        else:
            # Rightmost column always extends to full image width (w).
            # The page margin contains only white space — extending the OCR
            # crop to the image edge is safe and prevents text near the right
            # border from being cut off.
            next_start = w
        col_left_rel = start_x - left_x
        col_right_rel = next_start - left_x
        # `w` inside the generator expression is a word dict scoped to the
        # expression; it does not clobber the image-width `w` used above.
        n_words_in_col = sum(1 for w in word_dicts
                             if col_left_rel <= w['left'] < col_right_rel)
        col_start_counts.append((start_x, n_words_in_col))
    logger.info(f"ColumnGeometry: {len(col_starts)} columns from {len(validated_gaps)} gaps "
                f"(left_margin={is_left_margin}, right_margin={is_right_margin}): "
                f"{col_start_counts}")
    # --- Step 8: Build ColumnGeometry objects ---
    # Determine right edge for each column
    all_boundaries = []
    for i, start_x in enumerate(col_starts):
        if i + 1 < len(col_starts):
            end_x = col_starts[i + 1]
        else:
            # Rightmost column always extends to full image width (w).
            end_x = w
        all_boundaries.append((start_x, end_x))
    geometries = []
    for i, (start_x, end_x) in enumerate(all_boundaries):
        col_width = end_x - start_x
        col_left_rel = start_x - left_x
        col_right_rel = col_left_rel + col_width
        col_words = [w for w in word_dicts
                     if col_left_rel <= w['left'] < col_right_rel]
        geometries.append(ColumnGeometry(
            index=i,
            x=start_x,
            y=top_y,
            width=col_width,
            height=content_h,
            word_count=len(col_words),
            words=col_words,
            width_ratio=col_width / content_w if content_w > 0 else 0.0,
        ))
    logger.info(f"ColumnGeometry: {len(geometries)} columns: "
                f"{[(g.index, g.x, g.width, g.word_count) for g in geometries]}")
    # --- Step 9: Filter phantom narrow columns ---
    # Tiny spurious gaps (e.g. 11px + 35px adjacent) can create very narrow
    # columns (< 3% of content width) with zero or no words. These are not
    # real columns — remove them and close the gap between neighbors.
    min_real_col_w = max(20, int(content_w * 0.03))
    filtered_geoms = [g for g in geometries
                      if not (g.word_count < 3 and g.width < min_real_col_w)]
    if len(filtered_geoms) < len(geometries):
        n_removed = len(geometries) - len(filtered_geoms)
        logger.info(f"ColumnGeometry: removed {n_removed} phantom column(s) "
                    f"(width < {min_real_col_w}px and words < 3)")
    # Extend each remaining column to close gaps with its right neighbor
    # (a no-op when no column was removed, since columns already tile).
    for i, g in enumerate(filtered_geoms):
        if i + 1 < len(filtered_geoms):
            g.width = filtered_geoms[i + 1].x - g.x
        else:
            g.width = w - g.x
        g.index = i
        col_left_rel = g.x - left_x
        col_right_rel = col_left_rel + g.width
        g.words = [w for w in word_dicts
                   if col_left_rel <= w['left'] < col_right_rel]
        g.word_count = len(g.words)
    geometries = filtered_geoms
    logger.info(f"ColumnGeometry: {len(geometries)} columns after phantom filter: "
                f"{[(g.index, g.x, g.width, g.word_count) for g in geometries]}")
    return (geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv)
def expand_narrow_columns(
    geometries: List[ColumnGeometry],
    content_w: int,
    left_x: int,
    word_dicts: List[Dict],
) -> List[ColumnGeometry]:
    """Expand narrow columns into adjacent whitespace gaps.

    Narrow columns (marker, page_ref, < 10% content width) often lose
    content at image edges due to residual shear. This expands them toward
    the neighbouring column, but never past 40% of the gap or past the
    nearest word in the neighbour.

    Must be called AFTER _detect_sub_columns() so that sub-column splits
    (which create the narrowest columns) have already happened.

    Args:
        geometries: Columns sorted left-to-right; mutated in place.
        content_w: Width of the content area in pixels.
        left_x: Absolute x of the content area's left edge (word box
            coordinates are relative to this).
        word_dicts: All word boxes, used to re-assign words after resizing.

    Returns:
        The same list, with narrow columns widened and neighbours shrunk.
    """
    _NARROW_THRESHOLD_PCT = 10.0  # columns below this share of content_w are "narrow"
    _MIN_WORD_MARGIN = 4          # px of clearance kept to the neighbour's words
    if len(geometries) < 2:
        return geometries
    logger.info("ExpandNarrowCols: input %d cols: %s",
                len(geometries),
                [(i, g.x, g.width, round(g.width / content_w * 100, 1))
                 for i, g in enumerate(geometries)])
    for i, g in enumerate(geometries):
        col_pct = g.width / content_w * 100 if content_w > 0 else 100
        if col_pct >= _NARROW_THRESHOLD_PCT:
            continue  # wide enough, leave untouched
        expanded = False
        orig_pct = col_pct
        # --- try expanding to the LEFT ---
        if i > 0:
            left_nb = geometries[i - 1]
            # Gap can be 0 if sub-column split created adjacent columns.
            # In that case, look at where the neighbor's rightmost words
            # actually are — there may be unused space we can claim.
            nb_words_right = [wd['left'] + wd.get('width', 0)
                              for wd in left_nb.words]
            if nb_words_right:
                rightmost_word_abs = left_x + max(nb_words_right)
                safe_left_abs = rightmost_word_abs + _MIN_WORD_MARGIN
            else:
                # No words in neighbor → we can take up to neighbor's start
                safe_left_abs = left_nb.x + _MIN_WORD_MARGIN
            if safe_left_abs < g.x:
                g.width += (g.x - safe_left_abs)
                g.x = safe_left_abs
                expanded = True
        # --- try expanding to the RIGHT ---
        if i + 1 < len(geometries):
            right_nb = geometries[i + 1]
            nb_words_left = [wd['left'] for wd in right_nb.words]
            if nb_words_left:
                leftmost_word_abs = left_x + min(nb_words_left)
                safe_right_abs = leftmost_word_abs - _MIN_WORD_MARGIN
            else:
                safe_right_abs = right_nb.x + right_nb.width - _MIN_WORD_MARGIN
            cur_right = g.x + g.width
            if safe_right_abs > cur_right:
                g.width = safe_right_abs - g.x
                expanded = True
        if expanded:
            # Re-assign words that now fall within the widened column.
            col_left_rel = g.x - left_x
            col_right_rel = col_left_rel + g.width
            g.words = [wd for wd in word_dicts
                       if col_left_rel <= wd['left'] < col_right_rel]
            g.word_count = len(g.words)
            g.width_ratio = g.width / content_w if content_w > 0 else 0.0
            # NOTE(review): the "%.1f%%%.1f%%" format prints old and new
            # percentages back-to-back; a separator glyph (e.g. an arrow)
            # may have been lost in this message — confirm intended text.
            logger.info(
                "ExpandNarrowCols: col %d (%.1f%%%.1f%%) x=%d w=%d words=%d",
                i, orig_pct, g.width / content_w * 100, g.x, g.width, g.word_count)
            # --- Shrink overlapping neighbors to match new boundaries ---
            # Left neighbor: its right edge must not exceed our new left edge
            if i > 0:
                left_nb = geometries[i - 1]
                nb_right = left_nb.x + left_nb.width
                if nb_right > g.x:
                    left_nb.width = g.x - left_nb.x
                    if left_nb.width < 0:
                        left_nb.width = 0
                    left_nb.width_ratio = left_nb.width / content_w if content_w > 0 else 0.0
                    # Re-assign words
                    nb_left_rel = left_nb.x - left_x
                    nb_right_rel = nb_left_rel + left_nb.width
                    left_nb.words = [wd for wd in word_dicts
                                     if nb_left_rel <= wd['left'] < nb_right_rel]
                    left_nb.word_count = len(left_nb.words)
            # Right neighbor: its left edge must not be before our new right edge
            if i + 1 < len(geometries):
                right_nb = geometries[i + 1]
                my_right = g.x + g.width
                if right_nb.x < my_right:
                    old_right_edge = right_nb.x + right_nb.width
                    right_nb.x = my_right
                    right_nb.width = old_right_edge - right_nb.x
                    if right_nb.width < 0:
                        right_nb.width = 0
                    right_nb.width_ratio = right_nb.width / content_w if content_w > 0 else 0.0
                    # Re-assign words
                    nb_left_rel = right_nb.x - left_x
                    nb_right_rel = nb_left_rel + right_nb.width
                    right_nb.words = [wd for wd in word_dicts
                                      if nb_left_rel <= wd['left'] < nb_right_rel]
                    right_nb.word_count = len(right_nb.words)
    return geometries
# =============================================================================
# Row Geometry Detection (horizontal whitespace-gap analysis)
# =============================================================================
def detect_row_geometry(
    inv: np.ndarray,
    word_dicts: List[Dict],
    left_x: int, right_x: int,
    top_y: int, bottom_y: int,
) -> List['RowGeometry']:
    """Detect row geometry using horizontal whitespace-gap analysis.

    Algorithm overview (two phases):

    Phase 1 — Gap-based detection (Steps 1-6):
        1. Build a horizontal projection profile: for each y-pixel, sum the
           ink density across the content width. Only pixels within/near
           Tesseract word bounding boxes contribute (word_mask), so that
           images/illustrations don't merge adjacent text rows.
        2. Smooth the projection and find contiguous regions below a
           threshold (= gaps / horizontal whitespace between text lines).
           The threshold is 15% of the median non-zero density.
        3. Validate gaps against word bounding boxes — discard any gap
           that overlaps a word, or shift the gap boundary to avoid the word.
        4. Build rows from the spans between validated gaps.
        5. Detect header/footer rows: gaps in the top/bottom 15% of the
           page that are >= 2× the median gap size mark section boundaries.

    Phase 2 — Word-center regularization (_regularize_row_grid, Step 7):
        For each word, compute its vertical center (top + height/2).
        Group words into line clusters by Y-proximity (tolerance = 40% of
        the median gap-based row height).
        For each cluster, the line center = median of all word centers.
        The "pitch" = distance between consecutive line centers.
        Section breaks are detected where the pitch exceeds 1.8× the median.
        Within each section, row boundaries are placed at the midpoints
        between consecutive line centers:
        - Row top = midpoint to previous line center (or center - pitch/2 for first)
        - Row bottom = midpoint to next line center (or center + pitch/2 for last)
        This ensures rows tile without gaps or overlaps.

    Fallback:
        If < 2 gaps are found (very dense or uniform text), falls back to
        _build_rows_from_word_grouping() which groups words by Y proximity.

    Args:
        inv: Inverted binarized image (white text on black bg, full page).
        word_dicts: Word bounding boxes from Tesseract (relative to content ROI).
        left_x, right_x: Absolute X bounds of the content area.
        top_y, bottom_y: Absolute Y bounds of the content area.

    Returns:
        List of RowGeometry objects sorted top to bottom.
    """
    content_w = right_x - left_x
    content_h = bottom_y - top_y
    if content_h < 10 or content_w < 10:
        logger.warning("detect_row_geometry: content area too small")
        return []
    # --- Step 1: Horizontal projection profile ---
    # For each y-pixel row, sum ink density across the content width.
    # A word-coverage mask ensures only pixels near Tesseract words contribute,
    # so that illustrations/images don't inflate the density and merge rows.
    content_strip = inv[top_y:bottom_y, left_x:right_x]
    WORD_PAD_Y = max(4, content_h // 300)  # small vertical padding around words
    word_mask = np.zeros((content_h, content_w), dtype=np.uint8)
    for wd in word_dicts:
        y1 = max(0, wd['top'] - WORD_PAD_Y)
        y2 = min(content_h, wd['top'] + wd['height'] + WORD_PAD_Y)
        x1 = max(0, wd['left'])
        x2 = min(content_w, wd['left'] + wd['width'])
        word_mask[y1:y2, x1:x2] = 255
    masked_strip = cv2.bitwise_and(content_strip, word_mask)
    h_proj = np.sum(masked_strip, axis=1).astype(float)
    h_proj_norm = h_proj / (content_w * 255) if content_w > 0 else h_proj
    # --- Step 2: Smoothing + gap threshold ---
    # Smooth the projection to reduce noise, then threshold at 15% of the
    # median non-zero density. Pixels below this threshold are considered
    # "gap" (horizontal whitespace between text lines).
    # MIN_GAP_HEIGHT prevents tiny noise gaps from splitting rows.
    kernel_size = max(3, content_h // 200)
    if kernel_size % 2 == 0:
        kernel_size += 1
    h_smooth = np.convolve(h_proj_norm, np.ones(kernel_size) / kernel_size, mode='same')
    median_density = float(np.median(h_smooth[h_smooth > 0])) if np.any(h_smooth > 0) else 0.01
    gap_threshold = max(median_density * 0.15, 0.003)
    in_gap = h_smooth < gap_threshold
    MIN_GAP_HEIGHT = max(3, content_h // 500)
    # --- Step 3: Collect contiguous gap regions ---
    raw_gaps = []  # (start_y_rel, end_y_rel) relative to content ROI
    gap_start = None
    for y in range(len(in_gap)):
        if in_gap[y]:
            if gap_start is None:
                gap_start = y
        else:
            if gap_start is not None:
                gap_height = y - gap_start
                if gap_height >= MIN_GAP_HEIGHT:
                    raw_gaps.append((gap_start, y))
                gap_start = None
    # Close off a gap that runs to the bottom edge.
    if gap_start is not None:
        gap_height = len(in_gap) - gap_start
        if gap_height >= MIN_GAP_HEIGHT:
            raw_gaps.append((gap_start, len(in_gap)))
    logger.info(f"RowGeometry: {len(raw_gaps)} raw gaps found (threshold={gap_threshold:.4f}, "
                f"min_height={MIN_GAP_HEIGHT}px)")
    # --- Step 4: Validate gaps against word bounding boxes ---
    # A gap is valid only if no word's bounding box overlaps it vertically.
    # If a word overlaps, try to shift the gap boundary above or below the
    # word. If neither shift yields enough room (>= MIN_GAP_HEIGHT), discard.
    validated_gaps = []
    for gap_start_rel, gap_end_rel in raw_gaps:
        overlapping = False
        for wd in word_dicts:
            word_top = wd['top']
            word_bottom = wd['top'] + wd['height']
            if word_top < gap_end_rel and word_bottom > gap_start_rel:
                overlapping = True
                break
        if not overlapping:
            validated_gaps.append((gap_start_rel, gap_end_rel))
        else:
            # Try to shift the gap to avoid overlapping words
            min_word_top = content_h
            max_word_bottom = 0
            for wd in word_dicts:
                word_top = wd['top']
                word_bottom = wd['top'] + wd['height']
                if word_top < gap_end_rel and word_bottom > gap_start_rel:
                    min_word_top = min(min_word_top, word_top)
                    max_word_bottom = max(max_word_bottom, word_bottom)
            if min_word_top - gap_start_rel >= MIN_GAP_HEIGHT:
                validated_gaps.append((gap_start_rel, min_word_top))
            elif gap_end_rel - max_word_bottom >= MIN_GAP_HEIGHT:
                validated_gaps.append((max_word_bottom, gap_end_rel))
            else:
                logger.debug(f"RowGeometry: gap [{gap_start_rel}..{gap_end_rel}] "
                             f"discarded (word overlap, no room to shift)")
    logger.info(f"RowGeometry: {len(validated_gaps)} gaps after word validation")
    # --- Fallback if too few gaps ---
    if len(validated_gaps) < 2:
        logger.info("RowGeometry: < 2 gaps found, falling back to word grouping")
        return _build_rows_from_word_grouping(
            word_dicts, left_x, right_x, top_y, bottom_y, content_w, content_h,
        )
    validated_gaps.sort(key=lambda g: g[0])
    # --- Step 5: Header/footer detection via gap size ---
    HEADER_FOOTER_ZONE = 0.15  # top/bottom 15% of the content height
    GAP_MULTIPLIER = 2.0       # a "large" gap is >= 2x the median gap
    gap_sizes = [g[1] - g[0] for g in validated_gaps]
    median_gap = float(np.median(gap_sizes)) if gap_sizes else 0
    large_gap_threshold = median_gap * GAP_MULTIPLIER
    header_boundary_rel = None  # y below which is header
    footer_boundary_rel = None  # y above which is footer
    header_zone_limit = int(content_h * HEADER_FOOTER_ZONE)
    footer_zone_start = int(content_h * (1.0 - HEADER_FOOTER_ZONE))
    # Find largest gap in header zone
    best_header_gap = None
    for gs, ge in validated_gaps:
        gap_mid = (gs + ge) / 2
        gap_size = ge - gs
        if gap_mid < header_zone_limit and gap_size > large_gap_threshold:
            if best_header_gap is None or gap_size > (best_header_gap[1] - best_header_gap[0]):
                best_header_gap = (gs, ge)
    if best_header_gap is not None:
        header_boundary_rel = best_header_gap[1]
        logger.info(f"RowGeometry: header boundary at y_rel={header_boundary_rel} "
                    f"(gap={best_header_gap[1] - best_header_gap[0]}px, "
                    f"median_gap={median_gap:.0f}px)")
    # Find largest gap in footer zone
    best_footer_gap = None
    for gs, ge in validated_gaps:
        gap_mid = (gs + ge) / 2
        gap_size = ge - gs
        if gap_mid > footer_zone_start and gap_size > large_gap_threshold:
            if best_footer_gap is None or gap_size > (best_footer_gap[1] - best_footer_gap[0]):
                best_footer_gap = (gs, ge)
    if best_footer_gap is not None:
        footer_boundary_rel = best_footer_gap[0]
        logger.info(f"RowGeometry: footer boundary at y_rel={footer_boundary_rel} "
                    f"(gap={best_footer_gap[1] - best_footer_gap[0]}px)")
    # --- Step 6: Build RowGeometry objects from gaps ---
    # Rows are the spans between consecutive gaps. The gap midpoints define
    # where one row ends and the next begins. Each row's height extends
    # from the end of the previous gap to the start of the next gap.
    row_boundaries = []  # (start_y_rel, end_y_rel)
    # Top of content to first gap
    if validated_gaps[0][0] > MIN_GAP_HEIGHT:
        row_boundaries.append((0, validated_gaps[0][0]))
    # Between gaps
    for i in range(len(validated_gaps) - 1):
        row_start = validated_gaps[i][1]
        row_end = validated_gaps[i + 1][0]
        if row_end - row_start > 0:
            row_boundaries.append((row_start, row_end))
    # Last gap to bottom of content
    if validated_gaps[-1][1] < content_h - MIN_GAP_HEIGHT:
        row_boundaries.append((validated_gaps[-1][1], content_h))
    rows = []
    for idx, (row_start_rel, row_end_rel) in enumerate(row_boundaries):
        # Determine row type from the row's vertical midpoint
        row_mid = (row_start_rel + row_end_rel) / 2
        if header_boundary_rel is not None and row_mid < header_boundary_rel:
            row_type = 'header'
        elif footer_boundary_rel is not None and row_mid > footer_boundary_rel:
            row_type = 'footer'
        else:
            row_type = 'content'
        # Collect words in this row (a word belongs to the row containing
        # its vertical center)
        row_words = [w for w in word_dicts
                     if w['top'] + w['height'] / 2 >= row_start_rel
                     and w['top'] + w['height'] / 2 < row_end_rel]
        # Gap before this row
        # NOTE(review): for idx == 0 this stores the first gap's start
        # offset rather than its size (ge - gs) — confirm intended.
        gap_before = 0
        if idx == 0 and validated_gaps[0][0] > 0:
            gap_before = validated_gaps[0][0]
        elif idx > 0:
            # Find the gap just before this row boundary
            for gs, ge in validated_gaps:
                if ge == row_start_rel:
                    gap_before = ge - gs
                    break
        rows.append(RowGeometry(
            index=idx,
            x=left_x,
            y=top_y + row_start_rel,
            width=content_w,
            height=row_end_rel - row_start_rel,
            word_count=len(row_words),
            words=row_words,
            row_type=row_type,
            gap_before=gap_before,
        ))
    # --- Step 7: Word-center grid regularization ---
    # Refine the gap-based rows using word vertical centers. For each word,
    # compute center_y = top + height/2. Group into line clusters, compute
    # the pitch (distance between consecutive line centers), and place row
    # boundaries at the midpoints between centers. This gives more precise
    # and evenly-spaced rows than the gap-based approach alone.
    # Also detects section breaks (headings, paragraphs) where the pitch
    # exceeds 1.8× the median, and handles each section independently.
    rows = _regularize_row_grid(rows, word_dicts, left_x, right_x, top_y,
                                content_w, content_h, inv)
    type_counts = {}
    for r in rows:
        type_counts[r.row_type] = type_counts.get(r.row_type, 0) + 1
    logger.info(f"RowGeometry: {len(rows)} rows detected: {type_counts}")
    return rows
def _regularize_row_grid(
    rows: List['RowGeometry'],
    word_dicts: List[Dict],
    left_x: int, right_x: int,
    top_y: int,
    content_w: int, content_h: int,
    inv: np.ndarray,
) -> List['RowGeometry']:
    """Rebuild row boundaries from word center-lines with section-break awareness.

    Instead of overlaying a rigid grid, this derives row positions bottom-up
    from the words themselves:

    Step A:  Group all content words into line clusters by Y-proximity.
             Tolerance = 40% of median gap-based row height.
    Step B:  For each cluster compute:
             - center_y = median of (word_top + word_height/2) for all words
             - letter_h = median of word heights (excluding outliers > 2x median)
    Step B2: Merge clusters whose centers are closer than 30% of row height
             (spurious splits from OCR jitter).
    Step C:  Compute pitches (distances between consecutive centers).
             Detect section breaks where gap > 1.8x median pitch.
    Step D:  Split clusters into sections at the section breaks.
    Step E:  Within each section, place row boundaries at midpoints between
             consecutive line centers:
             - First row top = center - local_pitch/2
             - Last row bottom = center + local_pitch/2
             - Interior boundaries = (center_i + center_{i+1}) / 2
             This ensures rows tile seamlessly without gaps or overlaps.
    Step F:  Re-assign words to the nearest grid row by vertical center distance.
    Step G:  Validate that >= 85% of words land in a grid row; otherwise
             fall back to the original gap-based rows.
    Step H:  Merge with preserved header/footer rows and re-index.

    Guard: Requires >= 5 content rows from gap-based detection to activate.
    This prevents the regularizer from running on very small images (e.g.
    box sub-sessions with only 3-6 rows) where the gap-based detection
    is already accurate enough.
    Header/footer rows from the gap-based detection are preserved.

    Args:
        rows: Gap-based RowGeometry list (content plus header/footer rows).
        word_dicts: All word boxes; NOTE: not referenced in this body — the
            words are taken from the rows' own ``words`` lists instead.
        left_x: Left content bound (absolute px); used as grid-row x origin.
        right_x: Right content bound; NOTE: not referenced in this body.
        top_y: Top of the content area (absolute px). Word tops are relative
            to the content ROI and are shifted by this to absolute space.
        content_w: Content width (grid rows span the full width).
        content_h: Content height (used to clamp row bottoms).
        inv: Inverted binary image; NOTE: not referenced in this body.

    Returns:
        Regularized RowGeometry list, or *rows* unchanged when any guard or
        the Step-G validation fails.
    """
    content_rows = [r for r in rows if r.row_type == 'content']
    non_content = [r for r in rows if r.row_type != 'content']
    if len(content_rows) < 5:
        return rows
    # --- Step A: Group ALL words into line clusters ---
    # Collect words that belong to content rows (deduplicated)
    content_words: List[Dict] = []
    seen_keys: set = set()
    for r in content_rows:
        for w in r.words:
            # A word may appear in several rows; its bbox is the identity key.
            key = (w['left'], w['top'], w['width'], w['height'])
            if key not in seen_keys:
                seen_keys.add(key)
                content_words.append(w)
    if len(content_words) < 5:
        return rows
    # Compute median word height (excluding outliers like tall brackets/IPA)
    word_heights = sorted(w['height'] for w in content_words)
    median_wh = word_heights[len(word_heights) // 2]
    # Compute median gap-based row height — this is the actual line height
    # as detected by the horizontal projection. We use 40% of this as
    # grouping tolerance. This is much more reliable than using word height
    # alone, because words on the same line can have very different heights
    # (e.g. lowercase vs uppercase, brackets, phonetic symbols).
    gap_row_heights = sorted(r.height for r in content_rows)
    median_row_h = gap_row_heights[len(gap_row_heights) // 2]
    # Tolerance: 40% of row height. Words on the same line should have
    # centers within this range. Even if a word's bbox is taller/shorter,
    # its center should stay within half a row height of the line center.
    y_tol = max(10, int(median_row_h * 0.4))
    # Sort by center_y, then group by proximity
    words_by_center = sorted(content_words,
                             key=lambda w: (w['top'] + w['height'] / 2, w['left']))
    line_clusters: List[List[Dict]] = []
    current_line: List[Dict] = [words_by_center[0]]
    # The cluster is anchored on its first word's center (not a running mean).
    current_center = words_by_center[0]['top'] + words_by_center[0]['height'] / 2
    for w in words_by_center[1:]:
        w_center = w['top'] + w['height'] / 2
        if abs(w_center - current_center) <= y_tol:
            current_line.append(w)
        else:
            current_line.sort(key=lambda w: w['left'])
            line_clusters.append(current_line)
            current_line = [w]
            current_center = w_center
    if current_line:
        current_line.sort(key=lambda w: w['left'])
        line_clusters.append(current_line)
    if len(line_clusters) < 3:
        return rows
    # --- Step B: Compute center_y per cluster ---
    # center_y = median of (word_top + word_height/2) across all words in cluster
    # letter_h = median of word heights, but excluding outlier-height words
    # (>2x median) so that tall brackets/IPA don't skew the height
    cluster_info: List[Dict] = []
    for cl_words in line_clusters:
        centers = [w['top'] + w['height'] / 2 for w in cl_words]
        # Filter outlier heights for letter_h computation
        normal_heights = [w['height'] for w in cl_words
                          if w['height'] <= median_wh * 2.0]
        if not normal_heights:
            normal_heights = [w['height'] for w in cl_words]
        center_y = float(np.median(centers))
        letter_h = float(np.median(normal_heights))
        cluster_info.append({
            'center_y_rel': center_y,  # relative to content ROI
            'center_y_abs': center_y + top_y,  # absolute
            'letter_h': letter_h,
            'words': cl_words,
        })
    cluster_info.sort(key=lambda c: c['center_y_rel'])
    # --- Step B2: Merge clusters that are too close together ---
    # Even with center-based grouping, some edge cases can produce
    # spurious clusters. Merge any pair whose centers are closer
    # than 30% of the row height (they're definitely the same text line).
    merge_threshold = max(8, median_row_h * 0.3)
    merged: List[Dict] = [cluster_info[0]]
    for cl in cluster_info[1:]:
        prev = merged[-1]
        if cl['center_y_rel'] - prev['center_y_rel'] < merge_threshold:
            # Merge: combine words, recompute center
            combined_words = prev['words'] + cl['words']
            centers = [w['top'] + w['height'] / 2 for w in combined_words]
            normal_heights = [w['height'] for w in combined_words
                              if w['height'] <= median_wh * 2.0]
            if not normal_heights:
                normal_heights = [w['height'] for w in combined_words]
            prev['center_y_rel'] = float(np.median(centers))
            prev['center_y_abs'] = prev['center_y_rel'] + top_y
            prev['letter_h'] = float(np.median(normal_heights))
            prev['words'] = combined_words
        else:
            merged.append(cl)
    cluster_info = merged
    if len(cluster_info) < 3:
        return rows
    # --- Step C: Compute pitches and detect section breaks ---
    pitches: List[float] = []
    for i in range(1, len(cluster_info)):
        pitch = cluster_info[i]['center_y_rel'] - cluster_info[i - 1]['center_y_rel']
        pitches.append(pitch)
    if not pitches:
        return rows
    median_pitch = float(np.median(pitches))
    # A tiny pitch means the clustering is degenerate; bail out.
    if median_pitch <= 5:
        return rows
    # A section break is where the gap between line centers is much larger
    # than the normal pitch (sub-headings, section titles, etc.)
    BREAK_FACTOR = 1.8
    # --- Step D: Build sections (groups of consecutive lines with normal spacing) ---
    sections: List[List[Dict]] = []
    current_section: List[Dict] = [cluster_info[0]]
    for i in range(1, len(cluster_info)):
        gap = cluster_info[i]['center_y_rel'] - cluster_info[i - 1]['center_y_rel']
        if gap > median_pitch * BREAK_FACTOR:
            sections.append(current_section)
            current_section = [cluster_info[i]]
        else:
            current_section.append(cluster_info[i])
    if current_section:
        sections.append(current_section)
    # --- Step E: Build row boundaries per section ---
    grid_rows: List[RowGeometry] = []
    for section in sections:
        if not section:
            continue
        if len(section) == 1:
            # Single-line section (likely a heading)
            cl = section[0]
            half_h = max(cl['letter_h'], median_pitch * 0.4)
            row_top = cl['center_y_abs'] - half_h
            row_bot = cl['center_y_abs'] + half_h
            grid_rows.append(RowGeometry(
                index=0,
                x=left_x,
                y=round(row_top),
                width=content_w,
                height=round(row_bot - row_top),
                word_count=len(cl['words']),
                words=cl['words'],
                row_type='content',
                gap_before=0,
            ))
            continue
        # Compute local pitch for this section
        local_pitches = []
        for i in range(1, len(section)):
            local_pitches.append(
                section[i]['center_y_rel'] - section[i - 1]['center_y_rel']
            )
        local_pitch = float(np.median(local_pitches)) if local_pitches else median_pitch
        # Row boundaries are placed at midpoints between consecutive centers.
        # First row: top = center - local_pitch/2
        # Last row: bottom = center + local_pitch/2
        for i, cl in enumerate(section):
            if i == 0:
                row_top = cl['center_y_abs'] - local_pitch / 2
            else:
                # Midpoint between this center and previous center
                prev_center = section[i - 1]['center_y_abs']
                row_top = (prev_center + cl['center_y_abs']) / 2
            if i == len(section) - 1:
                row_bot = cl['center_y_abs'] + local_pitch / 2
            else:
                next_center = section[i + 1]['center_y_abs']
                row_bot = (cl['center_y_abs'] + next_center) / 2
            # Clamp to reasonable bounds
            row_top = max(top_y, row_top)
            row_bot = min(top_y + content_h, row_bot)
            # Skip rows that collapsed to a sliver after clamping.
            if row_bot - row_top < 5:
                continue
            grid_rows.append(RowGeometry(
                index=0,
                x=left_x,
                y=round(row_top),
                width=content_w,
                height=round(row_bot - row_top),
                word_count=len(cl['words']),
                words=cl['words'],
                row_type='content',
                gap_before=0,
            ))
    if not grid_rows:
        return rows
    # --- Step F: Re-assign words to grid rows ---
    # Words may have shifted slightly; assign each word to the row whose
    # center is closest to the word's vertical center.
    # (O(words x rows) linear scan — acceptable at page scale.)
    for gr in grid_rows:
        gr.words = []
    for w in content_words:
        w_center = w['top'] + top_y + w['height'] / 2
        best_row = None
        best_dist = float('inf')
        for gr in grid_rows:
            row_center = gr.y + gr.height / 2
            dist = abs(w_center - row_center)
            if dist < best_dist:
                best_dist = dist
                best_row = gr
        # Reject matches farther than one pitch — the word would straddle rows.
        if best_row is not None and best_dist < median_pitch:
            best_row.words.append(w)
    for gr in grid_rows:
        gr.word_count = len(gr.words)
    # --- Step G: Validate ---
    words_placed = sum(gr.word_count for gr in grid_rows)
    if len(content_words) > 0:
        match_ratio = words_placed / len(content_words)
        if match_ratio < 0.85:
            logger.info(f"RowGrid: word-center grid only matches {match_ratio:.0%} "
                        f"of words, keeping gap-based rows")
            return rows
    # Remove empty grid rows (no words assigned)
    grid_rows = [gr for gr in grid_rows if gr.word_count > 0]
    # --- Step H: Merge header/footer + re-index ---
    result = list(non_content) + grid_rows
    result.sort(key=lambda r: r.y)
    for i, r in enumerate(result):
        r.index = i
    row_heights = [gr.height for gr in grid_rows]
    min_h = min(row_heights) if row_heights else 0
    max_h = max(row_heights) if row_heights else 0
    logger.info(f"RowGrid: word-center grid applied "
                f"(median_pitch={median_pitch:.0f}px, median_row_h={median_row_h}px, median_wh={median_wh}px, "
                f"y_tol={y_tol}px, {len(line_clusters)} clusters→{len(cluster_info)} merged, "
                f"{len(sections)} sections, "
                f"{len(grid_rows)} grid rows [h={min_h}-{max_h}px], "
                f"was {len(content_rows)} gap-based rows)")
    return result
def _build_rows_from_word_grouping(
word_dicts: List[Dict],
left_x: int, right_x: int,
top_y: int, bottom_y: int,
content_w: int, content_h: int,
) -> List['RowGeometry']:
"""Fallback: build rows by grouping words by Y position.
Uses _group_words_into_lines() with a generous tolerance.
No header/footer detection in fallback mode.
"""
if not word_dicts:
return []
y_tolerance = max(20, content_h // 100)
lines = _group_words_into_lines(word_dicts, y_tolerance_px=y_tolerance)
rows = []
for idx, line_words in enumerate(lines):
if not line_words:
continue
min_top = min(w['top'] for w in line_words)
max_bottom = max(w['top'] + w['height'] for w in line_words)
row_height = max_bottom - min_top
rows.append(RowGeometry(
index=idx,
x=left_x,
y=top_y + min_top,
width=content_w,
height=row_height,
word_count=len(line_words),
words=line_words,
row_type='content',
gap_before=0,
))
logger.info(f"RowGeometry (fallback): {len(rows)} rows from word grouping")
return rows
# --- Phase B: Content-Based Classification ---
def _score_language(words: List[Dict]) -> Dict[str, float]:
"""Score the language of a column's words.
Analyzes function words, umlauts, and capitalization patterns
to determine whether text is English or German.
Args:
words: List of word dicts with 'text' and 'conf' keys.
Returns:
Dict with 'eng' and 'deu' scores (0.0-1.0).
"""
if not words:
return {'eng': 0.0, 'deu': 0.0}
# Only consider words with decent confidence
good_words = [w['text'].lower() for w in words if w.get('conf', 0) > 40 and len(w['text']) > 0]
if not good_words:
return {'eng': 0.0, 'deu': 0.0}
total = len(good_words)
en_hits = sum(1 for w in good_words if w in ENGLISH_FUNCTION_WORDS)
de_hits = sum(1 for w in good_words if w in GERMAN_FUNCTION_WORDS)
# Check for umlauts (strong German signal)
raw_texts = [w['text'] for w in words if w.get('conf', 0) > 40]
umlaut_count = sum(1 for t in raw_texts
for c in t if c in 'äöüÄÖÜß')
# German capitalization: nouns are capitalized mid-sentence
# Count words that start with uppercase but aren't at position 0
cap_words = sum(1 for t in raw_texts if t[0].isupper() and len(t) > 2)
en_score = en_hits / total if total > 0 else 0.0
de_score = de_hits / total if total > 0 else 0.0
# Boost German score for umlauts
if umlaut_count > 0:
de_score = min(1.0, de_score + 0.15 * min(umlaut_count, 5))
# Boost German score for high capitalization ratio (typical for German nouns)
if total > 5:
cap_ratio = cap_words / total
if cap_ratio > 0.3:
de_score = min(1.0, de_score + 0.1)
return {'eng': round(en_score, 3), 'deu': round(de_score, 3)}
def _score_role(geom: 'ColumnGeometry') -> Dict[str, float]:
    """Score the role of a column based on its geometry and content patterns.

    Args:
        geom: ColumnGeometry with words and dimensions.

    Returns:
        Dict with role scores: 'reference', 'marker', 'sentence', 'vocabulary'.
    """
    zero = {'reference': 0.0, 'marker': 0.0, 'sentence': 0.0, 'vocabulary': 0.0}
    if not geom.words:
        return zero
    confident = [w['text'] for w in geom.words if w.get('conf', 0) > 40]
    if not confident:
        return zero
    mean_len = sum(len(t) for t in confident) / len(confident)
    punct_words = sum(1 for t in confident if any(ch in t for ch in '.!?;:,'))
    with_digits = sum(1 for t in confident if any(ch.isdigit() for ch in t))
    digit_frac = with_digits / len(confident) if confident else 0.0
    scores = dict(zero)
    # Reference: narrow column, often mostly numbers / page references
    if geom.width_ratio < 0.12:
        scores['reference'] = 0.5
        if digit_frac > 0.4:
            scores['reference'] = min(1.0, 0.5 + digit_frac * 0.5)
    # Marker: narrow column holding only a handful of short entries
    if geom.width_ratio < 0.06 and geom.word_count <= 15:
        scores['marker'] = 0.7
        if mean_len < 4:
            scores['marker'] = 0.9
    # Very narrow non-edge column → strong marker regardless of word count
    if geom.width_ratio < 0.04 and geom.index > 0:
        scores['marker'] = max(scores['marker'], 0.9)
    # Sentence: wider column with noticeable punctuation
    if geom.width_ratio > 0.15 and punct_words > 2:
        scores['sentence'] = 0.3 + min(0.5, punct_words / len(confident))
        if mean_len > 4:
            scores['sentence'] = min(1.0, scores['sentence'] + 0.2)
    # Vocabulary: medium width with medium-length words
    if 0.10 < geom.width_ratio < 0.45:
        scores['vocabulary'] = 0.4
        if 3 < mean_len < 8:
            scores['vocabulary'] = min(1.0, scores['vocabulary'] + 0.3)
    return {k: round(v, 3) for k, v in scores.items()}
# --- Dictionary / Wörterbuch Detection ---
# Article words that appear as a dedicated column in dictionaries.
# Stored as a set for O(1) membership tests; the detection code compares
# lower-cased OCR tokens against it, so all entries are lower-case.
_DICT_ARTICLE_WORDS = {
    # German articles
    "die", "der", "das", "dem", "den", "des", "ein", "eine", "einem", "einer",
    # English articles / infinitive marker
    "the", "a", "an", "to",
}
def _score_dictionary_signals(
geometries: List[ColumnGeometry],
document_category: Optional[str] = None,
margin_strip_detected: bool = False,
) -> Dict[str, Any]:
"""Score dictionary-specific patterns across all columns.
Combines 4 independent signals to determine if the page is a dictionary:
1. Alphabetical ordering of words in each column
2. Article column detection (der/die/das, to)
3. First-letter uniformity (most headwords share a letter)
4. Decorative A-Z margin strip (detected upstream)
Args:
geometries: List of ColumnGeometry with words.
document_category: User-selected category (e.g. 'woerterbuch').
margin_strip_detected: Whether a decorative A-Z margin strip was found.
Returns:
Dict with 'is_dictionary', 'confidence', 'article_col_index',
'headword_col_index', and 'signals' sub-dict.
"""
result: Dict[str, Any] = {
"is_dictionary": False,
"confidence": 0.0,
"article_col_index": None,
"headword_col_index": None,
"signals": {},
}
if not geometries or len(geometries) < 2:
return result
# --- Signal 1: Alphabetical ordering per column (weight 0.35) ---
best_alpha_score = 0.0
best_alpha_col = -1
for geom in geometries:
texts = [
w["text"].strip().lower()
for w in sorted(geom.words, key=lambda w: w.get("top", 0))
if w.get("conf", 0) > 30 and len(w["text"].strip()) >= 2
]
if len(texts) < 5:
continue
# Deduplicate consecutive identical words (OCR double-reads)
deduped = [texts[0]]
for t in texts[1:]:
if t != deduped[-1]:
deduped.append(t)
if len(deduped) < 5:
continue
# Count consecutive pairs in alphabetical order
ordered_pairs = sum(
1 for i in range(len(deduped) - 1)
if deduped[i] <= deduped[i + 1]
)
alpha_score = ordered_pairs / (len(deduped) - 1)
if alpha_score > best_alpha_score:
best_alpha_score = alpha_score
best_alpha_col = geom.index
result["signals"]["alphabetical_score"] = round(best_alpha_score, 3)
result["signals"]["alphabetical_col"] = best_alpha_col
# --- Signal 2: Article detection (weight 0.25) ---
# Check three patterns:
# (a) Dedicated narrow article column (der/die/das only)
# (b) Inline articles: multi-word texts starting with "der X", "die X"
# (c) High article word frequency: many individual words ARE articles
# (common when OCR splits "der Zustand" into separate word_boxes)
best_article_density = 0.0
best_article_col = -1
best_inline_article_ratio = 0.0
best_article_word_ratio = 0.0
for geom in geometries:
texts = [
w["text"].strip().lower()
for w in geom.words
if w.get("conf", 0) > 30 and len(w["text"].strip()) > 0
]
if len(texts) < 3:
continue
# (a) Dedicated article column: narrow, mostly article words
article_count = sum(1 for t in texts if t in _DICT_ARTICLE_WORDS)
if geom.width_ratio <= 0.20:
density = article_count / len(texts)
if density > best_article_density:
best_article_density = density
best_article_col = geom.index
# (b) Inline articles: "der Zustand", "die Zutat", etc.
inline_count = sum(
1 for t in texts
if any(t.startswith(art + " ") for art in _DICT_ARTICLE_WORDS)
)
inline_ratio = inline_count / len(texts)
if inline_ratio > best_inline_article_ratio:
best_inline_article_ratio = inline_ratio
# (c) Article word frequency in any column (for OCR-split word_boxes)
# In dictionaries, articles appear frequently among headwords
# Require at least 10% articles and >= 3 article words
if article_count >= 3:
art_ratio = article_count / len(texts)
# Only count if column has enough non-article words too
# (pure article column is handled by (a))
non_art = len(texts) - article_count
if non_art >= 3 and art_ratio > best_article_word_ratio:
best_article_word_ratio = art_ratio
# Use the strongest signal
effective_article_score = max(
best_article_density,
best_inline_article_ratio,
best_article_word_ratio * 0.8, # slight discount for raw word ratio
)
result["signals"]["article_density"] = round(best_article_density, 3)
result["signals"]["inline_article_ratio"] = round(best_inline_article_ratio, 3)
result["signals"]["article_word_ratio"] = round(best_article_word_ratio, 3)
result["signals"]["article_col"] = best_article_col
# --- Signal 3: First-letter uniformity (weight 0.25) ---
best_uniformity = 0.0
best_uniform_col = -1
has_letter_transition = False
for geom in geometries:
texts = [
w["text"].strip().lower()
for w in sorted(geom.words, key=lambda w: w.get("top", 0))
if w.get("conf", 0) > 30 and len(w["text"].strip()) >= 2
]
if len(texts) < 5:
continue
# Count first letters
first_letters = [t[0] for t in texts if t[0].isalpha()]
if not first_letters:
continue
from collections import Counter
letter_counts = Counter(first_letters)
most_common_letter, most_common_count = letter_counts.most_common(1)[0]
uniformity = most_common_count / len(first_letters)
# Check for orderly letter transitions (A→B or Y→Z)
# Group consecutive words by first letter, check if groups are in order
groups = []
current_letter = first_letters[0]
for fl in first_letters:
if fl != current_letter:
groups.append(current_letter)
current_letter = fl
groups.append(current_letter)
if len(groups) >= 2 and len(groups) <= 5:
# Check if groups are alphabetically ordered
if all(groups[i] <= groups[i + 1] for i in range(len(groups) - 1)):
has_letter_transition = True
# Boost uniformity for orderly transitions
uniformity = max(uniformity, 0.70)
if uniformity > best_uniformity:
best_uniformity = uniformity
best_uniform_col = geom.index
result["signals"]["first_letter_uniformity"] = round(best_uniformity, 3)
result["signals"]["uniform_col"] = best_uniform_col
result["signals"]["has_letter_transition"] = has_letter_transition
# --- Signal 4: Decorative margin strip (weight 0.15) ---
result["signals"]["margin_strip_detected"] = margin_strip_detected
# --- Combine signals ---
s1 = min(best_alpha_score, 1.0) * 0.35
s2 = min(effective_article_score, 1.0) * 0.25
s3 = min(best_uniformity, 1.0) * 0.25
s4 = (1.0 if margin_strip_detected else 0.0) * 0.15
combined = s1 + s2 + s3 + s4
# Boost if user set document_category to 'woerterbuch'
if document_category == "woerterbuch":
combined = min(1.0, combined + 0.20)
result["signals"]["category_boost"] = True
result["confidence"] = round(combined, 3)
# Threshold: combined >= 0.40 to classify as dictionary
# (at least 2 strong signals or 3 moderate ones)
if combined >= 0.40:
result["is_dictionary"] = True
# Identify headword column: best alphabetical OR best uniform
if best_alpha_col >= 0 and best_alpha_score >= 0.60:
result["headword_col_index"] = best_alpha_col
elif best_uniform_col >= 0 and best_uniformity >= 0.50:
result["headword_col_index"] = best_uniform_col
if best_article_col >= 0 and best_article_density >= 0.30:
result["article_col_index"] = best_article_col
# If inline articles are strong but no dedicated column, note it
if best_inline_article_ratio >= 0.30 and result["article_col_index"] is None:
result["signals"]["inline_articles_detected"] = True
logger.info(
"DictionaryDetection: combined=%.3f is_dict=%s signals=%s",
combined, result["is_dictionary"], result["signals"],
)
return result
def _classify_dictionary_columns(
geometries: List[ColumnGeometry],
dict_signals: Dict[str, Any],
lang_scores: List[Dict[str, float]],
content_h: int,
) -> Optional[List[PageRegion]]:
"""Classify columns for a detected dictionary page.
Assigns column_headword, column_article, column_ipa, and
column_de/column_en based on dictionary signals and language scores.
Returns None if classification fails.
"""
if not dict_signals.get("is_dictionary"):
return None
regions: List[PageRegion] = []
assigned = set()
article_idx = dict_signals.get("article_col_index")
headword_idx = dict_signals.get("headword_col_index")
# 1. Assign article column if detected
if article_idx is not None:
for geom in geometries:
if geom.index == article_idx:
regions.append(PageRegion(
type="column_article",
x=geom.x, y=geom.y,
width=geom.width, height=content_h,
classification_confidence=round(
dict_signals["signals"].get("article_density", 0.5), 2),
classification_method="dictionary",
))
assigned.add(geom.index)
break
# 2. Assign headword column
if headword_idx is not None and headword_idx not in assigned:
for geom in geometries:
if geom.index == headword_idx:
regions.append(PageRegion(
type="column_headword",
x=geom.x, y=geom.y,
width=geom.width, height=content_h,
classification_confidence=round(
dict_signals["confidence"], 2),
classification_method="dictionary",
))
assigned.add(geom.index)
break
# 3. Assign remaining columns by language + content
remaining = [g for g in geometries if g.index not in assigned]
for geom in remaining:
ls = lang_scores[geom.index] if geom.index < len(lang_scores) else {"eng": 0, "deu": 0}
# Check if column contains IPA (brackets like [, /, ˈ)
ipa_chars = sum(
1 for w in geom.words
if any(c in (w.get("text") or "") for c in "[]/ˈˌːɪəɒʊæɑɔ")
)
ipa_ratio = ipa_chars / max(len(geom.words), 1)
if ipa_ratio > 0.25:
col_type = "column_ipa"
conf = round(min(1.0, ipa_ratio), 2)
elif ls["deu"] > ls["eng"] and ls["deu"] > 0.05:
col_type = "column_de"
conf = round(ls["deu"], 2)
elif ls["eng"] > ls["deu"] and ls["eng"] > 0.05:
col_type = "column_en"
conf = round(ls["eng"], 2)
else:
# Positional fallback: leftmost unassigned = EN, next = DE
left_unassigned = sorted(
[g for g in remaining if g.index not in assigned],
key=lambda g: g.x,
)
if geom == left_unassigned[0] if left_unassigned else None:
col_type = "column_en"
else:
col_type = "column_de"
conf = 0.4
regions.append(PageRegion(
type=col_type,
x=geom.x, y=geom.y,
width=geom.width, height=content_h,
classification_confidence=conf,
classification_method="dictionary",
))
assigned.add(geom.index)
regions.sort(key=lambda r: r.x)
return regions
def _build_margin_regions(
    all_regions: List['PageRegion'],
    left_x: int,
    right_x: int,
    img_w: int,
    top_y: int,
    content_h: int,
) -> List['PageRegion']:
    """Create margin_left / margin_right PageRegions from content bounds.

    Margins are the strips between the image edges and the outermost
    content columns. Downstream they support faithful page reconstruction,
    but they are skipped during OCR.
    """
    _min_gap = 5  # strips narrower than this are not worth a region
    margins: List[PageRegion] = []
    # Left margin: from image edge to the first content column
    if left_x > _min_gap:
        margins.append(PageRegion(
            type='margin_left', x=0, y=top_y,
            width=left_x, height=content_h,
            classification_confidence=1.0,
            classification_method='content_bounds',
        ))
    # Right margin: from end of last content column to image edge
    content_like = [
        r for r in all_regions
        if r.type not in ('margin_left', 'margin_right', 'header', 'footer',
                          'margin_top', 'margin_bottom')
    ]
    if content_like:
        rightmost_edge = max(r.x + r.width for r in content_like)
    else:
        rightmost_edge = right_x
    if img_w - rightmost_edge > _min_gap:
        margins.append(PageRegion(
            type='margin_right', x=rightmost_edge, y=top_y,
            width=img_w - rightmost_edge, height=content_h,
            classification_confidence=1.0,
            classification_method='content_bounds',
        ))
    if margins:
        logger.info(f"Margins: {[(m.type, m.x, m.width) for m in margins]} "
                    f"(left_x={left_x}, right_x={right_x}, img_w={img_w})")
    return margins
def positional_column_regions(
    geometries: List['ColumnGeometry'],
    content_w: int,
    content_h: int,
    left_x: int,
) -> List['PageRegion']:
    """Classify columns by position only (no language scoring).

    Structural columns (page_ref, column_marker) are identified from
    geometry alone. The remaining content columns are labelled left→right
    as column_en, column_de, column_example — the names are purely
    positional; no language analysis takes place.
    """
    structural: List[PageRegion] = []
    content_cols: List[ColumnGeometry] = []
    for geom in geometries:
        rel_frac = ((geom.x - left_x) / content_w) if content_w else 0
        if geom.width_ratio < 0.12 and rel_frac < 0.20:
            # page_ref: narrow column in the leftmost 20% of the content area
            kind, conf = 'page_ref', 0.95
        elif geom.width_ratio < 0.06 and geom.word_count <= 15:
            # column_marker: very narrow, few words
            kind, conf = 'column_marker', 0.95
        elif geom.word_count <= 2 and geom.width_ratio < 0.15:
            # (near-)empty narrow column → treat as margin/structural
            kind, conf = 'column_marker', 0.85
        else:
            content_cols.append(geom)
            continue
        structural.append(PageRegion(
            type=kind, x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=conf,
            classification_method='positional',
        ))
    # Single content column → plain text page
    if len(content_cols) == 1:
        only = content_cols[0]
        return structural + [PageRegion(
            type='column_text', x=only.x, y=only.y,
            width=only.width, height=content_h,
            classification_confidence=0.9,
            classification_method='positional',
        )]
    # No content columns at all
    if not content_cols:
        return structural
    # Sort content columns left→right and assign positional labels.
    content_cols.sort(key=lambda g: g.x)
    # With exactly 2 content columns where the left one is very wide (>35%),
    # it likely holds EN+DE combined, so the right one is examples.
    wide_pair = (len(content_cols) == 2
                 and content_cols[0].width_ratio > 0.35
                 and content_cols[1].width_ratio > 0.20)
    labels = (['column_en', 'column_example'] if wide_pair
              else ['column_en', 'column_de', 'column_example'])
    regions = list(structural)
    for pos, geom in enumerate(content_cols):
        regions.append(PageRegion(
            type=labels[pos] if pos < len(labels) else 'column_example',
            x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=0.95,
            classification_method='positional',
        ))
    logger.info(f"PositionalColumns: {len(structural)} structural, "
                f"{len(content_cols)} content → "
                f"{[r.type for r in regions]}")
    return regions
def classify_column_types(geometries: List[ColumnGeometry],
                          content_w: int,
                          top_y: int,
                          img_w: int,
                          img_h: int,
                          bottom_y: int,
                          left_x: int = 0,
                          right_x: int = 0,
                          inv: Optional[np.ndarray] = None,
                          document_category: Optional[str] = None,
                          margin_strip_detected: bool = False) -> List[PageRegion]:
    """Classify column types using a 4-level fallback chain (Levels 0-3).

    Level 0: Dictionary detection (if signals are strong enough)
    Level 1: Content-based (language + role scoring)
    Level 2: Position + language (old rules enhanced with language detection)
    Level 3: Pure position (exact old code, no regression)

    Every successful level also runs header/footer detection and appends
    margin_left / margin_right regions before returning.

    Args:
        geometries: List of ColumnGeometry from Phase A.
        content_w: Total content width.
        top_y: Top Y of content area.
        img_w: Full image width.
        img_h: Full image height.
        bottom_y: Bottom Y of content area.
        left_x: Left content bound (from _find_content_bounds).
        right_x: Right content bound (from _find_content_bounds).
        inv: Inverted binary image, forwarded to _add_header_footer().
        document_category: User-selected category (e.g. 'woerterbuch').
        margin_strip_detected: Whether a decorative A-Z margin strip was found.

    Returns:
        List of PageRegion with types, confidence, and method.
    """
    content_h = bottom_y - top_y
    def _with_margins(result: List[PageRegion]) -> List[PageRegion]:
        """Append margin_left / margin_right regions to *result*."""
        margins = _build_margin_regions(result, left_x, right_x, img_w, top_y, content_h)
        return result + margins
    # Special case: single column → plain text page
    if len(geometries) == 1:
        geom = geometries[0]
        return _with_margins([PageRegion(
            type='column_text', x=geom.x, y=geom.y,
            width=geom.width, height=geom.height,
            classification_confidence=0.9,
            classification_method='content',
        )])
    # --- Pre-filter: first/last columns with very few words → column_ignore ---
    # Sub-columns from _detect_sub_columns() are exempt: they intentionally
    # have few words (page refs, markers) and should not be discarded.
    ignore_regions = []
    active_geometries = []
    for idx, g in enumerate(geometries):
        if (idx == 0 or idx == len(geometries) - 1) and g.word_count < 8 and not g.is_sub_column:
            ignore_regions.append(PageRegion(
                type='column_ignore', x=g.x, y=g.y,
                width=g.width, height=content_h,
                classification_confidence=0.95,
                classification_method='content',
            ))
            logger.info(f"ClassifyColumns: column {idx} (x={g.x}, words={g.word_count}) → column_ignore (edge, few words)")
        else:
            active_geometries.append(g)
    # Re-index active geometries for classification
    # NOTE: mutates the ColumnGeometry objects in place; downstream scorers
    # rely on these compacted indices.
    for new_idx, g in enumerate(active_geometries):
        g.index = new_idx
    geometries = active_geometries
    # Handle edge case: all columns ignored or only 1 left
    if len(geometries) == 0:
        return _with_margins(ignore_regions)
    if len(geometries) == 1:
        geom = geometries[0]
        ignore_regions.append(PageRegion(
            type='column_text', x=geom.x, y=geom.y,
            width=geom.width, height=geom.height,
            classification_confidence=0.9,
            classification_method='content',
        ))
        return _with_margins(ignore_regions)
    # --- Score all columns ---
    lang_scores = [_score_language(g.words) for g in geometries]
    role_scores = [_score_role(g) for g in geometries]
    logger.info(f"ClassifyColumns: language scores: "
                f"{[(g.index, ls) for g, ls in zip(geometries, lang_scores)]}")
    logger.info(f"ClassifyColumns: role scores: "
                f"{[(g.index, rs) for g, rs in zip(geometries, role_scores)]}")
    # --- Level 0: Dictionary detection ---
    dict_signals = _score_dictionary_signals(
        geometries,
        document_category=document_category,
        margin_strip_detected=margin_strip_detected,
    )
    if dict_signals["is_dictionary"]:
        regions = _classify_dictionary_columns(
            geometries, dict_signals, lang_scores, content_h,
        )
        if regions is not None:
            logger.info("ClassifyColumns: Level 0 (dictionary) succeeded, confidence=%.3f",
                        dict_signals["confidence"])
            _add_header_footer(regions, top_y, bottom_y, img_w, img_h, inv=inv)
            return _with_margins(ignore_regions + regions)
    # --- Level 1: Content-based classification ---
    regions = _classify_by_content(geometries, lang_scores, role_scores, content_w, content_h)
    if regions is not None:
        logger.info("ClassifyColumns: Level 1 (content-based) succeeded")
        _add_header_footer(regions, top_y, bottom_y, img_w, img_h, inv=inv)
        return _with_margins(ignore_regions + regions)
    # --- Level 2: Position + language enhanced ---
    regions = _classify_by_position_enhanced(geometries, lang_scores, content_w, content_h)
    if regions is not None:
        logger.info("ClassifyColumns: Level 2 (position+language) succeeded")
        _add_header_footer(regions, top_y, bottom_y, img_w, img_h, inv=inv)
        return _with_margins(ignore_regions + regions)
    # --- Level 3: Pure position fallback (old code, no regression) ---
    # This level never returns None, so no further fallback is needed.
    logger.info("ClassifyColumns: Level 3 (position fallback)")
    regions = _classify_by_position_fallback(geometries, content_w, content_h)
    _add_header_footer(regions, top_y, bottom_y, img_w, img_h, inv=inv)
    return _with_margins(ignore_regions + regions)
def _classify_by_content(geometries: List[ColumnGeometry],
                         lang_scores: List[Dict[str, float]],
                         role_scores: List[Dict[str, float]],
                         content_w: int,
                         content_h: int) -> Optional[List[PageRegion]]:
    """Level 1: Classify columns purely by content analysis.

    Requires clear language signals to distinguish EN/DE columns.
    Returns None if language signals are too weak.

    Args:
        geometries: Column geometries, ordered left-to-right.
        lang_scores: Per-column language scores with keys 'eng'/'deu'.
        role_scores: Per-column structural role scores with keys
            'reference'/'marker'/'sentence'.
        content_w: Width of the content area in pixels.
        content_h: Height of the content area in pixels.

    Returns:
        Regions sorted by x, or None when the evidence is ambiguous
        (the caller then falls through to Level 2 / Level 3).
    """
    regions = []
    assigned = set()  # indices of geometries that already received a type
    # Step 1: Assign structural roles first (reference, marker)
    # left_20_threshold: only the leftmost ~20% of content area qualifies for page_ref
    left_20_threshold = geometries[0].x + content_w * 0.20 if geometries else 0
    for i, (geom, rs, ls) in enumerate(zip(geometries, role_scores, lang_scores)):
        is_left_side = geom.x < left_20_threshold
        # Columns with real prose must never be demoted to page_ref.
        has_strong_language = ls['eng'] > 0.3 or ls['deu'] > 0.3
        if rs['reference'] >= 0.5 and geom.width_ratio < 0.12 and is_left_side and not has_strong_language:
            regions.append(PageRegion(
                type='page_ref', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=rs['reference'],
                classification_method='content',
            ))
            assigned.add(i)
        elif rs['marker'] >= 0.7 and geom.width_ratio < 0.06:
            regions.append(PageRegion(
                type='column_marker', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=rs['marker'],
                classification_method='content',
            ))
            assigned.add(i)
        elif geom.width_ratio < 0.05 and not is_left_side:
            # Narrow column on the right side → marker, not page_ref
            regions.append(PageRegion(
                type='column_marker', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=0.8,
                classification_method='content',
            ))
            assigned.add(i)
    # Step 2: Among remaining columns, find EN and DE by language scores
    remaining = [(i, geometries[i], lang_scores[i], role_scores[i])
                 for i in range(len(geometries)) if i not in assigned]
    if len(remaining) < 2:
        # Not enough columns for EN/DE pair
        if len(remaining) == 1:
            # Single leftover column: label it generic text, medium confidence.
            i, geom, ls, rs = remaining[0]
            regions.append(PageRegion(
                type='column_text', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=0.6,
                classification_method='content',
            ))
        regions.sort(key=lambda r: r.x)
        return regions
    # Check if we have enough language signal (0.05 = minimal usable score)
    en_candidates = [(i, g, ls) for i, g, ls, rs in remaining if ls['eng'] > ls['deu'] and ls['eng'] > 0.05]
    de_candidates = [(i, g, ls) for i, g, ls, rs in remaining if ls['deu'] > ls['eng'] and ls['deu'] > 0.05]
    # Position tiebreaker: when language signals are weak, use left=EN, right=DE
    if (not en_candidates or not de_candidates) and len(remaining) >= 2:
        max_eng = max(ls['eng'] for _, _, ls, _ in remaining)
        max_deu = max(ls['deu'] for _, _, ls, _ in remaining)
        if max_eng < 0.15 and max_deu < 0.15:
            # Both signals weak — fall back to positional: left=EN, right=DE
            sorted_remaining = sorted(remaining, key=lambda x: x[1].x)
            best_en = (sorted_remaining[0][0], sorted_remaining[0][1], sorted_remaining[0][2])
            best_de = (sorted_remaining[1][0], sorted_remaining[1][1], sorted_remaining[1][2])
            logger.info("ClassifyColumns: Level 1 using position tiebreaker (weak signals) - left=EN, right=DE")
            # Low confidence because the assignment is purely positional.
            en_conf = 0.4
            de_conf = 0.4
            regions.append(PageRegion(
                type='column_en', x=best_en[1].x, y=best_en[1].y,
                width=best_en[1].width, height=content_h,
                classification_confidence=en_conf,
                classification_method='content',
            ))
            assigned.add(best_en[0])
            regions.append(PageRegion(
                type='column_de', x=best_de[1].x, y=best_de[1].y,
                width=best_de[1].width, height=content_h,
                classification_confidence=de_conf,
                classification_method='content',
            ))
            assigned.add(best_de[0])
            # Assign remaining as example
            for i, geom, ls, rs in remaining:
                if i not in assigned:
                    regions.append(PageRegion(
                        type='column_example', x=geom.x, y=geom.y,
                        width=geom.width, height=content_h,
                        classification_confidence=0.4,
                        classification_method='content',
                    ))
            regions.sort(key=lambda r: r.x)
            return regions
    if not en_candidates or not de_candidates:
        # Language signals too weak for content-based classification
        logger.info("ClassifyColumns: Level 1 failed - no clear EN/DE language split")
        return None
    # Pick the best EN and DE candidates
    best_en = max(en_candidates, key=lambda x: x[2]['eng'])
    best_de = max(de_candidates, key=lambda x: x[2]['deu'])
    # Position-aware EN selection: in typical textbooks the layout is EN | DE | Example.
    # Example sentences contain English function words ("the", "a", "is") which inflate
    # the eng score of the Example column. When the best EN candidate sits to the RIGHT
    # of the DE column and there is another EN candidate to the LEFT, prefer the left one
    # — it is almost certainly the real vocabulary column.
    if best_de[2]['deu'] > 0.5 and best_en[1].x > best_de[1].x and len(en_candidates) > 1:
        left_of_de = [c for c in en_candidates if c[1].x < best_de[1].x]
        if left_of_de:
            alt_en = max(left_of_de, key=lambda x: x[2]['eng'])
            logger.info(
                f"ClassifyColumns: Level 1 position fix — best EN col {best_en[0]} "
                f"(eng={best_en[2]['eng']:.3f}) is right of DE col {best_de[0]}; "
                f"preferring left col {alt_en[0]} (eng={alt_en[2]['eng']:.3f})")
            best_en = alt_en
    if best_en[0] == best_de[0]:
        # Same column scored highest for both — ambiguous
        logger.info("ClassifyColumns: Level 1 failed - same column highest for EN and DE")
        return None
    # Confidence is taken directly from the winning language score.
    en_conf = best_en[2]['eng']
    de_conf = best_de[2]['deu']
    regions.append(PageRegion(
        type='column_en', x=best_en[1].x, y=best_en[1].y,
        width=best_en[1].width, height=content_h,
        classification_confidence=round(en_conf, 2),
        classification_method='content',
    ))
    assigned.add(best_en[0])
    regions.append(PageRegion(
        type='column_de', x=best_de[1].x, y=best_de[1].y,
        width=best_de[1].width, height=content_h,
        classification_confidence=round(de_conf, 2),
        classification_method='content',
    ))
    assigned.add(best_de[0])
    # Step 3: Remaining columns → example or text based on role scores
    # NOTE(review): both branches emit 'column_example'; only the confidence
    # differs (sentence score vs flat 0.5). If a 'column_text' fallback was
    # intended here, the else-branch type should change — confirm.
    for i, geom, ls, rs in remaining:
        if i in assigned:
            continue
        if rs['sentence'] > 0.4:
            regions.append(PageRegion(
                type='column_example', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=round(rs['sentence'], 2),
                classification_method='content',
            ))
        else:
            regions.append(PageRegion(
                type='column_example', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=0.5,
                classification_method='content',
            ))
    regions.sort(key=lambda r: r.x)
    return regions
def _classify_by_position_enhanced(geometries: List[ColumnGeometry],
                                   lang_scores: List[Dict[str, float]],
                                   content_w: int,
                                   content_h: int) -> Optional[List[PageRegion]]:
    """Level 2: Position-based rules enhanced with language confirmation.

    Applies the classic positional heuristics (page_ref on the far left,
    narrow/sparse markers, rightmost example) and then assigns EN/DE to
    the first two remaining columns — swapping them when the language
    scores clearly point the other way.
    """
    def _region(rtype: str, geom: ColumnGeometry, conf: float) -> PageRegion:
        # Every level-2 region spans the full content height of its column.
        return PageRegion(
            type=rtype, x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=conf,
            classification_method='position_enhanced',
        )

    regions: List[PageRegion] = []
    pending = list(range(len(geometries)))
    base_x = geometries[0].x if geometries else 0
    left_zone = base_x + content_w * 0.20

    # Rule 1: narrow leftmost column inside the left 20%, without a strong
    # language signal → page_ref.
    head = geometries[0]
    head_ls = lang_scores[0]
    head_has_language = head_ls['eng'] > 0.3 or head_ls['deu'] > 0.3
    if head.width_ratio < 0.12 and head.x < left_zone and not head_has_language:
        regions.append(_region('page_ref', head, 0.8))
        pending.remove(0)

    # Rule 2: very narrow columns with few words → column_marker.
    for idx in [i for i in pending
                if geometries[i].width_ratio < 0.06 and geometries[i].word_count <= 15]:
        regions.append(_region('column_marker', geometries[idx], 0.7))
        pending.remove(idx)

    # Rule 3: with three or more candidates left, the rightmost is the
    # example column.
    if len(pending) >= 3:
        regions.append(_region('column_example', geometries[pending.pop()], 0.7))

    # Rule 4: first two remaining become EN/DE; swap if language disagrees.
    if len(pending) >= 2:
        a, b = pending[0], pending[1]
        en_idx, de_idx, conf = a, b, 0.7
        if lang_scores[a]['deu'] > lang_scores[a]['eng'] \
                and lang_scores[b]['eng'] > lang_scores[b]['deu']:
            # Both columns contradict the positional default — trust language.
            en_idx, de_idx, conf = b, a, 0.85
            logger.info(f"ClassifyColumns: Level 2 swapped EN/DE based on language scores")
        regions.append(_region('column_en', geometries[en_idx], conf))
        regions.append(_region('column_de', geometries[de_idx], conf))
        pending = pending[2:]
    elif len(pending) == 1:
        regions.append(_region('column_en', geometries[pending[0]], 0.5))
        pending = []

    # Everything still unassigned defaults to an example column.
    for idx in pending:
        regions.append(_region('column_example', geometries[idx], 0.5))

    regions.sort(key=lambda r: r.x)
    return regions
def _classify_by_position_fallback(geometries: List[ColumnGeometry],
                                   content_w: int,
                                   content_h: int) -> List[PageRegion]:
    """Level 3: Pure position-based fallback (identical to old code).

    Guarantees no regression from the previous behavior: page_ref on the
    far left, narrow/sparse markers, rightmost example (with 3+ columns),
    then EN followed by DE, everything else example. All regions carry
    confidence 1.0 because this level never doubts itself.
    """
    def _region(rtype: str, geom: ColumnGeometry) -> PageRegion:
        # Every fallback region spans the full content height of its column.
        return PageRegion(
            type=rtype, x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=1.0,
            classification_method='position_fallback',
        )

    regions: List[PageRegion] = []
    pending = list(range(len(geometries)))
    base_x = geometries[0].x if geometries else 0
    left_zone = base_x + content_w * 0.20

    # Rule 1: narrow leftmost column inside the left 20% → page_ref.
    leftmost = geometries[0]
    if leftmost.width_ratio < 0.12 and leftmost.x < left_zone:
        regions.append(_region('page_ref', leftmost))
        pending.remove(0)

    # Rule 2: very narrow columns with few words → column_marker.
    for idx in [i for i in pending
                if geometries[i].width_ratio < 0.06 and geometries[i].word_count <= 15]:
        regions.append(_region('column_marker', geometries[idx]))
        pending.remove(idx)

    # Rule 3: with three or more candidates left, the rightmost is the
    # example column.
    if len(pending) >= 3:
        regions.append(_region('column_example', geometries[pending.pop()]))

    # Rule 4: first remaining → EN, second → DE.
    if len(pending) >= 2:
        regions.append(_region('column_en', geometries[pending[0]]))
        regions.append(_region('column_de', geometries[pending[1]]))
        pending = pending[2:]
    elif len(pending) == 1:
        regions.append(_region('column_en', geometries[pending[0]]))
        pending = []

    # Everything still unassigned defaults to an example column.
    for idx in pending:
        regions.append(_region('column_example', geometries[idx]))

    regions.sort(key=lambda r: r.x)
    return regions
def _detect_header_footer_gaps(
inv: np.ndarray,
img_w: int,
img_h: int,
) -> Tuple[Optional[int], Optional[int]]:
"""Detect header/footer boundaries via horizontal projection gap analysis.
Scans the full-page inverted image for large horizontal gaps in the top/bottom
20% that separate header/footer content from the main body.
Returns:
(header_y, footer_y) — absolute y-coordinates.
header_y = bottom edge of header region (None if no header detected).
footer_y = top edge of footer region (None if no footer detected).
"""
HEADER_FOOTER_ZONE = 0.20
GAP_MULTIPLIER = 2.0
# Step 1: Horizontal projection — clamp to img_h to avoid dewarp padding
actual_h = min(inv.shape[0], img_h)
roi = inv[:actual_h, :]
h_proj = np.sum(roi, axis=1).astype(float)
proj_w = roi.shape[1]
h_proj_norm = h_proj / (proj_w * 255) if proj_w > 0 else h_proj
# Step 2: Smoothing
kernel_size = max(3, actual_h // 200)
if kernel_size % 2 == 0:
kernel_size += 1
h_smooth = np.convolve(h_proj_norm, np.ones(kernel_size) / kernel_size, mode='same')
# Step 3: Gap threshold
positive = h_smooth[h_smooth > 0]
median_density = float(np.median(positive)) if len(positive) > 0 else 0.01
gap_threshold = max(median_density * 0.15, 0.003)
in_gap = h_smooth < gap_threshold
MIN_GAP_HEIGHT = max(3, actual_h // 500)
# Step 4: Collect contiguous gaps
raw_gaps: List[Tuple[int, int]] = []
gap_start: Optional[int] = None
for y in range(len(in_gap)):
if in_gap[y]:
if gap_start is None:
gap_start = y
else:
if gap_start is not None:
gap_height = y - gap_start
if gap_height >= MIN_GAP_HEIGHT:
raw_gaps.append((gap_start, y))
gap_start = None
if gap_start is not None:
gap_height = len(in_gap) - gap_start
if gap_height >= MIN_GAP_HEIGHT:
raw_gaps.append((gap_start, len(in_gap)))
if not raw_gaps:
return None, None
# Step 5: Compute median gap size and large-gap threshold
gap_sizes = [g[1] - g[0] for g in raw_gaps]
median_gap = float(np.median(gap_sizes))
large_gap_threshold = median_gap * GAP_MULTIPLIER
# Step 6: Find largest qualifying gap in header / footer zones
# A separator gap must have content on BOTH sides — edge-touching gaps
# (e.g. dewarp padding at bottom) are not valid separators.
EDGE_MARGIN = max(5, actual_h // 400)
header_zone_limit = int(actual_h * HEADER_FOOTER_ZONE)
footer_zone_start = int(actual_h * (1.0 - HEADER_FOOTER_ZONE))
header_y: Optional[int] = None
footer_y: Optional[int] = None
best_header_size = 0
for gs, ge in raw_gaps:
if gs <= EDGE_MARGIN:
continue # skip gaps touching the top edge
gap_mid = (gs + ge) / 2
gap_size = ge - gs
if gap_mid < header_zone_limit and gap_size > large_gap_threshold:
if gap_size > best_header_size:
best_header_size = gap_size
header_y = ge # bottom edge of gap
best_footer_size = 0
for gs, ge in raw_gaps:
if ge >= actual_h - EDGE_MARGIN:
continue # skip gaps touching the bottom edge
gap_mid = (gs + ge) / 2
gap_size = ge - gs
if gap_mid > footer_zone_start and gap_size > large_gap_threshold:
if gap_size > best_footer_size:
best_footer_size = gap_size
footer_y = gs # top edge of gap
if header_y is not None:
logger.info(f"HeaderFooterGaps: header boundary at y={header_y} "
f"(gap={best_header_size}px, median_gap={median_gap:.0f}px)")
if footer_y is not None:
logger.info(f"HeaderFooterGaps: footer boundary at y={footer_y} "
f"(gap={best_footer_size}px, median_gap={median_gap:.0f}px)")
return header_y, footer_y
def _region_has_content(inv: np.ndarray, y_start: int, y_end: int,
min_density: float = 0.005) -> bool:
"""Check whether a horizontal strip contains meaningful ink.
Args:
inv: Inverted binarized image (white-on-black).
y_start: Top of the region (inclusive).
y_end: Bottom of the region (exclusive).
min_density: Fraction of white pixels required to count as content.
Returns:
True if the region contains text/graphics, False if empty margin.
"""
if y_start >= y_end:
return False
strip = inv[y_start:y_end, :]
density = float(np.sum(strip)) / (strip.shape[0] * strip.shape[1] * 255)
return density > min_density
def _add_header_footer(regions: List[PageRegion], top_y: int, bottom_y: int,
                       img_w: int, img_h: int,
                       inv: Optional[np.ndarray] = None) -> None:
    """Append header/footer/margin regions to *regions* in-place.

    When *inv* is supplied, boundaries come from gap-based detection;
    otherwise the simple top_y/bottom_y bounds are used.
    Region types depend on whether there is actual content (text/graphics):
      - 'header' / 'footer' — region contains text (e.g. title, page number)
      - 'margin_top' / 'margin_bottom' — region is empty page margin
    """
    gap_top: Optional[int] = None
    gap_bottom: Optional[int] = None
    if inv is not None:
        gap_top, gap_bottom = _detect_header_footer_gaps(inv, img_w, img_h)

    # --- Top region ---
    # Prefer the detected gap boundary; fall back to top_y. Boundaries
    # within 10px of the page edge are treated as noise and skipped.
    if gap_top is not None and gap_top > 10:
        top_edge: Optional[int] = gap_top
    elif top_y > 10:
        top_edge = top_y
    else:
        top_edge = None
    if top_edge is not None:
        has_content = inv is not None and _region_has_content(inv, 0, top_edge)
        rtype = 'header' if has_content else 'margin_top'
        regions.append(PageRegion(type=rtype, x=0, y=0, width=img_w, height=top_edge))
        logger.info(f"HeaderFooter: top region type={rtype} height={top_edge}px "
                    f"(has_content={has_content})")

    # --- Bottom region ---
    if gap_bottom is not None and gap_bottom < img_h - 10:
        bottom_edge: Optional[int] = gap_bottom
    elif bottom_y < img_h - 10:
        bottom_edge = bottom_y
    else:
        bottom_edge = None
    if bottom_edge is not None:
        has_content = inv is not None and _region_has_content(inv, bottom_edge, img_h)
        rtype = 'footer' if has_content else 'margin_bottom'
        regions.append(PageRegion(type=rtype, x=0, y=bottom_edge, width=img_w,
                                  height=img_h - bottom_edge))
        logger.info(f"HeaderFooter: bottom region type={rtype} y={bottom_edge} "
                    f"height={img_h - bottom_edge}px (has_content={has_content})")
# --- Main Entry Point ---
def analyze_layout_by_words(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> List[PageRegion]:
    """Detect columns using two-phase approach: geometry then positional classification.

    Phase A: detect_column_geometry() — clustering word positions into columns,
    then refining with _detect_sub_columns() and _split_broad_columns().
    Phase B: positional_column_regions() — positional type assignment.
    Falls back to projection-based analyze_layout() if geometry detection fails.

    Args:
        ocr_img: Binarized grayscale image for layout analysis.
        dewarped_bgr: Original BGR image (for Tesseract word detection).

    Returns:
        List of PageRegion objects with types, confidence, and method.
    """
    h, w = ocr_img.shape[:2]
    # Phase A: Geometry detection
    result = detect_column_geometry(ocr_img, dewarped_bgr)
    if result is None:
        # Fallback to projection-based layout
        logger.info("LayoutByWords: geometry detection failed, falling back to projection profiles")
        layout_img = create_layout_image(dewarped_bgr)
        return analyze_layout(layout_img, ocr_img)
    geometries, left_x, right_x, top_y, bottom_y, _word_dicts, _inv = result
    content_w = right_x - left_x
    # Detect header/footer early so sub-column clustering ignores them
    header_y, footer_y = _detect_header_footer_gaps(_inv, w, h) if _inv is not None else (None, None)
    # Split sub-columns (e.g. page references) before classification
    geometries = _detect_sub_columns(geometries, content_w, left_x=left_x,
                                     top_y=top_y, header_y=header_y, footer_y=footer_y)
    # Split broad columns that contain EN+DE mixed via word-coverage gaps
    geometries = _split_broad_columns(geometries, content_w, left_x=left_x)
    # Phase B: Positional classification (no language scoring)
    content_h = bottom_y - top_y
    regions = positional_column_regions(geometries, content_w, content_h, left_x)
    # Diagnostics: count and log only column-like regions (margins excluded).
    col_count = len([r for r in regions if r.type.startswith('column') or r.type == 'page_ref'])
    methods = set(r.classification_method for r in regions if r.classification_method)
    logger.info(f"LayoutByWords: {col_count} columns detected (methods: {methods}): "
                f"{[(r.type, r.x, r.width, r.classification_confidence) for r in regions if r.type not in ('header','footer','margin_top','margin_bottom')]}")
    return regions
# ---------------------------------------------------------------------------
# Zone-aware column geometry detection
# ---------------------------------------------------------------------------
def detect_column_geometry_zoned(
    ocr_img: np.ndarray,
    dewarped_bgr: np.ndarray,
) -> Optional[Tuple[
    List[ColumnGeometry],  # flat column list (all zones)
    int, int, int, int,    # left_x, right_x, top_y, bottom_y
    List[Dict],            # word_dicts
    np.ndarray,            # inv
    List[Dict],            # zones (serializable)
    List[DetectedBox],     # detected boxes
]]:
    """Zone-aware column geometry detection.

    1. Finds content bounds.
    2. Runs box detection.
    3. If boxes found: concatenates the content strips between/around the
       boxes into one combined image, runs detect_column_geometry() ONCE
       on it, and maps the resulting y-coordinates back to the page.
    4. If no boxes: delegates entirely to detect_column_geometry()
       (backward compat).

    Args:
        ocr_img: Binarized grayscale page image.
        dewarped_bgr: Original BGR page image (for Tesseract).

    Returns:
        Extended tuple: (geometries, left_x, right_x, top_y, bottom_y,
                         word_dicts, inv, zones_data, boxes)
        or None if detection fails.
    """
    # Local import keeps the cv_box_detect dependency optional at module load.
    from cv_box_detect import detect_boxes, split_page_into_zones
    # First run normal detection to get content bounds and word data
    geo_result = detect_column_geometry(ocr_img, dewarped_bgr)
    if geo_result is None:
        return None
    geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv = geo_result
    content_w = right_x - left_x
    content_h = bottom_y - top_y
    # Detect boxes in the image
    boxes = detect_boxes(
        dewarped_bgr, left_x, content_w, top_y, content_h,
    )
    if not boxes:
        # No boxes — single zone, backward compatible
        zone_data = [{
            "index": 0,
            "zone_type": "content",
            "y": top_y,
            "height": content_h,
            "x": left_x,
            "width": content_w,
            "columns": [],  # filled later by caller
        }]
        return (geometries, left_x, right_x, top_y, bottom_y,
                word_dicts, inv, zone_data, boxes)
    # --- New approach: concatenate content regions (skip boxes), run column
    # detection ONCE on the combined image, then map coordinates back. ---
    # Split into zones (for metadata / overlay purposes)
    zones = split_page_into_zones(left_x, top_y, content_w, content_h, boxes)
    # Collect content strips (above/between/below boxes); strips shorter
    # than 40px are too small for meaningful column clustering.
    content_strips: List[Tuple[int, int]] = []  # (y_start, y_end) in absolute coords
    for zone in zones:
        if zone.zone_type == 'content' and zone.height >= 40:
            content_strips.append((zone.y, zone.y + zone.height))
    if not content_strips:
        # Only box zones — fall back to original detection
        logger.info("ZonedColumns: no content zones with height >= 40, using original result")
        zone_data = [{"index": 0, "zone_type": "content", "y": top_y,
                      "height": content_h, "x": left_x, "width": content_w, "columns": []}]
        return (geometries, left_x, right_x, top_y, bottom_y,
                word_dicts, inv, zone_data, boxes)
    # Build combined image by vertically stacking content strips
    ocr_strips = [ocr_img[ys:ye, :] for ys, ye in content_strips]
    bgr_strips = [dewarped_bgr[ys:ye, :] for ys, ye in content_strips]
    combined_ocr = np.vstack(ocr_strips)
    combined_bgr = np.vstack(bgr_strips)
    logger.info(f"ZonedColumns: {len(boxes)} box(es), concatenating {len(content_strips)} "
                f"content strips into combined image {combined_ocr.shape[1]}x{combined_ocr.shape[0]}")
    # Run column detection on the combined (box-free) image
    combined_result = detect_column_geometry(combined_ocr, combined_bgr)
    if combined_result is not None:
        # Only the geometries are used below; the combined-run bounds and
        # word data are superseded by the original full-page values.
        combined_geoms, c_lx, c_rx, c_ty, c_by, combined_words, combined_inv = combined_result
    else:
        # Fallback to original full-page result
        logger.info("ZonedColumns: combined image column detection failed, using original")
        combined_geoms = geometries
    # Map combined-image y-coordinates back to absolute page coordinates.
    # In the combined image, strip i starts at cumulative_y = sum of heights
    # of strips 0..i-1. We need to add the offset between the strip's
    # original y-position and its position in the combined image.
    # Build a mapping: combined_y → absolute_y
    strip_offsets: List[Tuple[int, int, int]] = []  # (combined_y_start, strip_height, abs_y_start)
    cum_y = 0
    for ys, ye in content_strips:
        h = ye - ys
        strip_offsets.append((cum_y, h, ys))
        cum_y += h
    def _combined_y_to_abs(cy: int) -> int:
        """Map a y-coordinate in combined image back to absolute page coords."""
        for c_start, s_h, abs_start in strip_offsets:
            if cy < c_start + s_h:
                return abs_start + (cy - c_start)
        # Past last strip — clamp to end of last strip
        last_c, last_h, last_abs = strip_offsets[-1]
        return last_abs + last_h
    # Adjust geometries: y and height need remapping
    if combined_result is not None:
        for g in combined_geoms:
            abs_y = _combined_y_to_abs(g.y)
            abs_y_end = _combined_y_to_abs(g.y + g.height)
            g.y = abs_y
            g.height = abs_y_end - abs_y
    # --- Enrich column geometries with box-filtered original words ---
    # The combined-image Tesseract may miss words in small content strips
    # (e.g. a single row above a box). Use the original full-page word_dicts
    # filtered to exclude box interiors, so that _detect_sub_columns()
    # downstream has ALL content-zone words for left-edge clustering.
    # This ensures narrow sub-columns (page_ref, marker) are detectable
    # even when only a few entries exist above/below a box.
    if word_dicts:
        content_words = []
        for w in word_dicts:
            # word positions are relative to left_x / top_y
            w_abs_cx = w['left'] + left_x + w['width'] / 2
            w_abs_cy = w['top'] + top_y + w['height'] / 2
            # A word counts as box-interior when its center falls inside
            # any detected box rectangle.
            inside_box = any(
                box.x <= w_abs_cx <= box.x + box.width
                and box.y <= w_abs_cy <= box.y + box.height
                for box in boxes
            )
            if not inside_box:
                content_words.append(w)
        target_geoms = combined_geoms if combined_result is not None else geometries
        for g in target_geoms:
            # Word 'left' is relative to left_x; geometry 'x' is absolute
            g_left_rel = g.x - left_x
            g_right_rel = g_left_rel + g.width
            g.words = [
                w for w in content_words
                if g_left_rel <= w['left'] + w['width'] / 2 < g_right_rel
            ]
            g.word_count = len(g.words)
        excluded_count = len(word_dicts) - len(content_words)
        if excluded_count:
            logger.info(
                "ZonedColumns: enriched geometries with %d content words "
                "(excluded %d box-interior words)",
                len(content_words), excluded_count,
            )
    # Build zones_data for the response
    zones_data: List[Dict] = []
    for zone in zones:
        zone_dict: Dict = {
            "index": zone.index,
            "zone_type": zone.zone_type,
            "y": zone.y,
            "height": zone.height,
            "x": zone.x,
            "width": zone.width,
            "columns": [],
        }
        if zone.box is not None:
            # Serialize the detected box so the frontend overlay can draw it.
            zone_dict["box"] = {
                "x": zone.box.x,
                "y": zone.box.y,
                "width": zone.box.width,
                "height": zone.box.height,
                "confidence": zone.box.confidence,
                "border_thickness": zone.box.border_thickness,
            }
        zones_data.append(zone_dict)
    all_geometries = combined_geoms if combined_geoms else geometries
    logger.info(f"ZonedColumns: {len(boxes)} box(es), {len(zones)} zone(s), "
                f"{len(all_geometries)} total columns (combined-image approach)")
    return (all_geometries, left_x, right_x, top_y, bottom_y,
            word_dicts, inv, zones_data, boxes)