Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 26s
CI / test-go-edu-search (push) Successful in 30s
CI / test-python-klausur (push) Failing after 2m4s
CI / test-python-agent-core (push) Successful in 18s
CI / test-nodejs-website (push) Successful in 21s
Funktion war nur in cv_review.py definiert, wurde aber auch in cv_ocr_engines.py und cv_layout.py benutzt — NameError zur Laufzeit. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
3170 lines
121 KiB
Python
3170 lines
121 KiB
Python
"""
|
||
Document type detection, layout analysis, column/row geometry, and classification.
|
||
|
||
Lizenz: Apache 2.0 (kommerziell nutzbar)
|
||
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
|
||
"""
|
||
|
||
import logging
|
||
import re
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
import numpy as np
|
||
|
||
from cv_vocab_types import (
|
||
ColumnGeometry,
|
||
DetectedBox,
|
||
DocumentTypeResult,
|
||
ENGLISH_FUNCTION_WORDS,
|
||
GERMAN_FUNCTION_WORDS,
|
||
PageRegion,
|
||
PageZone,
|
||
RowGeometry,
|
||
)
|
||
from cv_ocr_engines import _group_words_into_lines # noqa: E402
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
try:
|
||
import cv2
|
||
except ImportError:
|
||
cv2 = None # type: ignore[assignment]
|
||
|
||
try:
|
||
import pytesseract
|
||
from PIL import Image
|
||
except ImportError:
|
||
pytesseract = None # type: ignore[assignment]
|
||
Image = None # type: ignore[assignment,misc]
|
||
|
||
|
||
def _find_profile_gaps(profile: np.ndarray, threshold: float, min_width: int) -> List[Tuple[int, int, int]]:
    """Find low-density runs ("gaps") in a 1-D projection profile.

    A gap is a maximal run of positions where *profile* < *threshold* that is
    at least *min_width* positions long.

    Args:
        profile: 1-D array of (smoothed) text-density values.
        threshold: Density below which a position counts as empty.
        min_width: Minimum run length for a run to qualify as a gap.

    Returns:
        List of (start, end, width) tuples with *end* exclusive. A run that
        extends to the end of the profile is included — the previous inline
        loops left such trailing gaps open and silently dropped them.
    """
    gaps: List[Tuple[int, int, int]] = []
    start: Optional[int] = None
    for pos in range(len(profile)):
        if profile[pos] < threshold:
            if start is None:
                start = pos
        elif start is not None:
            if pos - start >= min_width:
                gaps.append((start, pos, pos - start))
            start = None
    # Close a gap that runs off the end of the profile (bug fix: previously
    # a gap touching the right/bottom edge was never counted).
    if start is not None and len(profile) - start >= min_width:
        gaps.append((start, len(profile), len(profile) - start))
    return gaps


def detect_document_type(ocr_img: np.ndarray, img_bgr: np.ndarray) -> DocumentTypeResult:
    """Detect whether the page is a vocab table, generic table, or full text.

    Uses projection profiles and text density analysis — no OCR required.
    Runs in < 2 seconds.

    Args:
        ocr_img: Binarized grayscale image (for projection profiles).
        img_bgr: BGR color image (currently unused; kept for interface
            stability with existing callers).

    Returns:
        DocumentTypeResult with doc_type, confidence, pipeline, skip_steps.
    """
    if ocr_img is None or ocr_img.size == 0:
        return DocumentTypeResult(
            doc_type='full_text', confidence=0.5, pipeline='full_page',
            skip_steps=['columns', 'rows'],
            features={'error': 'empty image'},
        )

    h, w = ocr_img.shape[:2]

    # --- 1. Vertical projection profile → detect column gaps ---
    # Sum dark pixels along each column (x-axis). Gaps = valleys in the profile.
    # High values = text; near-zero values = whitespace between columns.
    vert_proj = np.sum(ocr_img < 128, axis=0).astype(float)

    # Smooth the profile to avoid noise spikes (kernel forced odd for symmetry).
    kernel_size = max(3, w // 100)
    if kernel_size % 2 == 0:
        kernel_size += 1
    vert_smooth = np.convolve(vert_proj, np.ones(kernel_size) / kernel_size, mode='same')

    # A gap must be at least 1% of image width and have < 5% of max density.
    max_density = max(vert_smooth.max(), 1)
    gap_threshold = max_density * 0.05
    min_gap_width = max(5, w // 100)

    vert_gaps = _find_profile_gaps(vert_smooth, gap_threshold, min_gap_width)
    gap_count = len(vert_gaps)

    # Filter out margin gaps (within 10% of image edges) — page margins are
    # empty but do not indicate column structure.
    margin_threshold = w * 0.10
    internal_gaps = [g for g in vert_gaps if g[0] > margin_threshold and g[1] < w - margin_threshold]
    internal_gap_count = len(internal_gaps)

    # --- 2. Horizontal projection profile → detect row gaps ---
    horiz_proj = np.sum(ocr_img < 128, axis=1).astype(float)
    h_kernel = max(3, h // 200)
    if h_kernel % 2 == 0:
        h_kernel += 1
    horiz_smooth = np.convolve(horiz_proj, np.ones(h_kernel) / h_kernel, mode='same')

    h_max = max(horiz_smooth.max(), 1)
    h_gap_threshold = h_max * 0.05
    min_row_gap = max(3, h // 200)

    row_gap_count = len(_find_profile_gaps(horiz_smooth, h_gap_threshold, min_row_gap))

    # --- 3. Text density distribution (4×4 grid) ---
    # A uniform density (low std) suggests running text; tables tend to have
    # an uneven distribution.
    grid_rows, grid_cols = 4, 4
    cell_h, cell_w = h // grid_rows, w // grid_cols
    densities = []
    for gr in range(grid_rows):
        for gc in range(grid_cols):
            cell = ocr_img[gr * cell_h:(gr + 1) * cell_h,
                           gc * cell_w:(gc + 1) * cell_w]
            if cell.size > 0:
                d = float(np.count_nonzero(cell < 128)) / cell.size
                densities.append(d)

    density_std = float(np.std(densities)) if densities else 0
    density_mean = float(np.mean(densities)) if densities else 0

    features = {
        'vertical_gaps': gap_count,
        'internal_vertical_gaps': internal_gap_count,
        'vertical_gap_details': [(g[0], g[1], g[2]) for g in vert_gaps[:10]],
        'row_gaps': row_gap_count,
        'density_mean': round(density_mean, 4),
        'density_std': round(density_std, 4),
        'image_size': (w, h),
    }

    # --- 4. Decision tree ---
    # Use internal_gap_count (excludes margin gaps) for column detection.
    if internal_gap_count >= 2 and row_gap_count >= 5:
        # Multiple internal vertical gaps + many row gaps → table
        confidence = min(0.95, 0.7 + internal_gap_count * 0.05 + row_gap_count * 0.005)
        return DocumentTypeResult(
            doc_type='vocab_table',
            confidence=round(confidence, 2),
            pipeline='cell_first',
            skip_steps=[],
            features=features,
        )
    elif internal_gap_count >= 1 and row_gap_count >= 3:
        # Some internal structure, likely a table
        confidence = min(0.85, 0.5 + internal_gap_count * 0.1 + row_gap_count * 0.01)
        return DocumentTypeResult(
            doc_type='generic_table',
            confidence=round(confidence, 2),
            pipeline='cell_first',
            skip_steps=[],
            features=features,
        )
    elif internal_gap_count == 0:
        # No internal column gaps → full text (regardless of density)
        confidence = min(0.95, 0.8 + (1 - min(density_std, 0.1)) * 0.15)
        return DocumentTypeResult(
            doc_type='full_text',
            confidence=round(confidence, 2),
            pipeline='full_page',
            skip_steps=['columns', 'rows'],
            features=features,
        )
    else:
        # Ambiguous — default to vocab_table (most common use case)
        return DocumentTypeResult(
            doc_type='vocab_table',
            confidence=0.5,
            pipeline='cell_first',
            skip_steps=[],
            features=features,
        )
|
||
|
||
|
||
# =============================================================================
|
||
# Stage 4: Dual Image Preparation
|
||
# =============================================================================
|
||
|
||
def create_ocr_image(img: np.ndarray) -> np.ndarray:
    """Produce a binarized version of *img* tuned for Tesseract OCR.

    Pipeline: grayscale → illumination flattening (divide by a heavy blur)
    → Gaussian adaptive threshold → 3x3 median denoise.

    Args:
        img: Input image in BGR channel order.

    Returns:
        Single-channel binary image.
    """
    grayscale = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Flatten uneven illumination: dividing by a heavily blurred copy
    # normalizes the background to a uniform level.
    background = cv2.GaussianBlur(grayscale, (51, 51), 0)
    flattened = cv2.divide(grayscale, background, scale=255)

    # Local (Gaussian-weighted) threshold copes with residual shading.
    thresholded = cv2.adaptiveThreshold(
        flattened, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY, 31, 10
    )

    # Remove salt-and-pepper speckle without eroding glyph edges.
    return cv2.medianBlur(thresholded, 3)
|
||
|
||
|
||
def create_layout_image(img: np.ndarray) -> np.ndarray:
    """Return a contrast-boosted grayscale copy of *img* for layout analysis.

    Applies CLAHE with clip limit 2.0 over an 8x8 tile grid to even out
    local contrast.

    Args:
        img: Input image in BGR channel order.

    Returns:
        Single-channel contrast-enhanced image.
    """
    grayscale = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    return equalizer.apply(grayscale)
|
||
|
||
|
||
# =============================================================================
|
||
# Stage 5: Layout Analysis (Projection Profiles)
|
||
# =============================================================================
|
||
|
||
def _filter_narrow_runs(mask: np.ndarray, min_width: int) -> np.ndarray:
|
||
"""Remove contiguous True-runs shorter than *min_width* from a 1-D bool mask."""
|
||
out = mask.copy()
|
||
n = len(out)
|
||
i = 0
|
||
while i < n:
|
||
if out[i]:
|
||
start = i
|
||
while i < n and out[i]:
|
||
i += 1
|
||
if (i - start) < min_width:
|
||
out[start:i] = False
|
||
else:
|
||
i += 1
|
||
return out
|
||
|
||
|
||
def _find_content_bounds(inv: np.ndarray) -> Tuple[int, int, int, int]:
    """Find the bounding box of actual text content (excluding page margins).

    Scan artefacts (thin black lines at page edges) are filtered out by
    discarding contiguous projection runs narrower than 1 % of the image
    dimension (min 5 px). A small padding (5 px vertical, 2 px horizontal)
    is added around the detected content, clamped to the image bounds.

    Args:
        inv: Inverted binary image (white text on black background).

    Returns:
        Tuple of (left_x, right_x, top_y, bottom_y).
    """
    h, w = inv.shape[:2]
    threshold = 0.005  # minimum fraction of maximum possible ink per row/column

    # --- Horizontal projection for top/bottom ---
    h_proj = np.sum(inv, axis=1).astype(float) / (w * 255)
    h_mask = h_proj > threshold
    min_h_run = max(5, h // 100)
    h_mask = _filter_narrow_runs(h_mask, min_h_run)

    # np.flatnonzero examines the full mask; the previous reverse loops used
    # range(h - 1, 0, -1) and never checked index 0 (off-by-one).
    h_hits = np.flatnonzero(h_mask)
    if h_hits.size > 0:
        top_y = max(0, int(h_hits[0]) - 5)
        bottom_y = min(h, int(h_hits[-1]) + 5)
    else:
        top_y, bottom_y = 0, h

    # --- Vertical projection for left/right margins ---
    # Restricted to the detected vertical content band so header/footer ink
    # does not widen the horizontal bounds.
    v_proj = np.sum(inv[top_y:bottom_y, :], axis=0).astype(float)
    v_proj_norm = v_proj / ((bottom_y - top_y) * 255) if (bottom_y - top_y) > 0 else v_proj
    v_mask = v_proj_norm > threshold
    min_v_run = max(5, w // 100)
    v_mask = _filter_narrow_runs(v_mask, min_v_run)

    v_hits = np.flatnonzero(v_mask)
    if v_hits.size > 0:
        left_x = max(0, int(v_hits[0]) - 2)
        right_x = min(w, int(v_hits[-1]) + 2)
    else:
        left_x, right_x = 0, w

    return left_x, right_x, top_y, bottom_y
|
||
|
||
|
||
def analyze_layout(layout_img: np.ndarray, ocr_img: np.ndarray) -> List[PageRegion]:
    """Detect columns, header, and footer using projection profiles.

    Uses content-bounds detection to exclude page margins before searching
    for column separators within the actual text area. Two strategies are
    tried in order: threshold-based valley detection on the smoothed vertical
    projection, then (if fewer than 2 valleys survive filtering) a coarser
    local-minima scan over 20 segments.

    Args:
        layout_img: CLAHE-enhanced grayscale image.
        ocr_img: Binarized image for text density analysis.

    Returns:
        List of PageRegion objects describing detected regions
        (2–3 columns, or the full page as a single column).
    """
    h, w = ocr_img.shape[:2]

    # Invert: black text on white → white text on black for projection
    inv = cv2.bitwise_not(ocr_img)

    # --- Find actual content bounds (exclude page margins) ---
    left_x, right_x, top_y, bottom_y = _find_content_bounds(inv)
    content_w = right_x - left_x
    content_h = bottom_y - top_y

    logger.info(f"Layout: content bounds x=[{left_x}..{right_x}] ({content_w}px), "
                f"y=[{top_y}..{bottom_y}] ({content_h}px) in {w}x{h} image")

    if content_w < w * 0.3 or content_h < h * 0.3:
        # Fallback if detection seems wrong: implausibly small content area
        # usually means bounds detection failed, so use the whole page.
        left_x, right_x = 0, w
        top_y, bottom_y = 0, h
        content_w, content_h = w, h

    # --- Vertical projection within content area to find column separators ---
    content_strip = inv[top_y:bottom_y, left_x:right_x]
    v_proj = np.sum(content_strip, axis=0).astype(float)
    v_proj_norm = v_proj / (content_h * 255) if content_h > 0 else v_proj

    # Smooth the projection profile (odd-sized box kernel, ~2% of width)
    kernel_size = max(5, content_w // 50)
    if kernel_size % 2 == 0:
        kernel_size += 1
    v_proj_smooth = np.convolve(v_proj_norm, np.ones(kernel_size) / kernel_size, mode='same')

    # Debug: log projection profile statistics
    p_mean = float(np.mean(v_proj_smooth))
    p_median = float(np.median(v_proj_smooth))
    p_min = float(np.min(v_proj_smooth))
    p_max = float(np.max(v_proj_smooth))
    logger.info(f"Layout: v_proj stats — min={p_min:.4f}, max={p_max:.4f}, "
                f"mean={p_mean:.4f}, median={p_median:.4f}")

    # Find valleys using multiple threshold strategies
    # Strategy 1: relative to median (catches clear separators)
    # Strategy 2: local minima approach (catches subtle gaps)
    threshold = max(p_median * 0.3, p_mean * 0.2)
    logger.info(f"Layout: valley threshold={threshold:.4f}")

    in_valley = v_proj_smooth < threshold

    # Find contiguous valley regions.
    # NOTE(review): a valley still open at the end of the profile is not
    # recorded — harmless here since edge valleys are filtered out below.
    all_valleys = []
    start = None
    for x in range(len(v_proj_smooth)):
        if in_valley[x] and start is None:
            start = x
        elif not in_valley[x] and start is not None:
            valley_width = x - start
            valley_depth = float(np.min(v_proj_smooth[start:x]))
            # Valley must be at least 3px wide
            if valley_width >= 3:
                all_valleys.append((start, x, (start + x) // 2, valley_width, valley_depth))
            start = None

    logger.info(f"Layout: raw valleys (before filter): {len(all_valleys)} — "
                f"{[(v[0]+left_x, v[1]+left_x, v[3], f'{v[4]:.4f}') for v in all_valleys[:10]]}")

    # Filter: valleys must be inside the content area (not at edges);
    # valley tuples are (start, end, center, width, depth) in content coords.
    inner_margin = int(content_w * 0.08)
    valleys = [v for v in all_valleys if inner_margin < v[2] < content_w - inner_margin]

    # If no valleys found with strict threshold, try local minima approach
    if len(valleys) < 2:
        logger.info("Layout: trying local minima approach for column detection")
        # Divide content into 20 segments, find the 2 lowest
        seg_count = 20
        seg_width = content_w // seg_count
        seg_scores = []
        for i in range(seg_count):
            sx = i * seg_width
            ex = min((i + 1) * seg_width, content_w)
            seg_mean = float(np.mean(v_proj_smooth[sx:ex]))
            seg_scores.append((i, sx, ex, seg_mean))

        seg_scores.sort(key=lambda s: s[3])
        logger.info(f"Layout: segment scores (lowest 5): "
                    f"{[(s[0], s[1]+left_x, s[2]+left_x, f'{s[3]:.4f}') for s in seg_scores[:5]]}")

        # Find two lowest non-adjacent segments that create reasonable columns
        candidate_valleys = []
        for seg_idx, sx, ex, seg_mean in seg_scores:
            # Must not be at the edges
            if seg_idx <= 1 or seg_idx >= seg_count - 2:
                continue
            # Must be significantly lower than overall mean
            if seg_mean < p_mean * 0.6:
                center = (sx + ex) // 2
                candidate_valleys.append((sx, ex, center, ex - sx, seg_mean))

        if len(candidate_valleys) >= 2:
            # Pick the best pair: non-adjacent, creating reasonable column widths
            candidate_valleys.sort(key=lambda v: v[2])
            best_pair = None
            best_score = float('inf')
            for i in range(len(candidate_valleys)):
                for j in range(i + 1, len(candidate_valleys)):
                    c1 = candidate_valleys[i][2]
                    c2 = candidate_valleys[j][2]
                    # Must be at least 20% apart
                    if (c2 - c1) < content_w * 0.2:
                        continue
                    col1 = c1
                    col2 = c2 - c1
                    col3 = content_w - c2
                    # Each column must span at least 12% of the content width
                    # (looser than the 15% used in the 3-valley branch below).
                    if col1 < content_w * 0.12 or col2 < content_w * 0.12 or col3 < content_w * 0.12:
                        continue
                    parts = sorted([col1, col2, col3])
                    # Score: spread between widest and narrowest column;
                    # lower means a more even 3-way split.
                    score = parts[2] - parts[0]
                    if score < best_score:
                        best_score = score
                        best_pair = (candidate_valleys[i], candidate_valleys[j])

            if best_pair:
                valleys = list(best_pair)
                logger.info(f"Layout: local minima found 2 valleys: "
                            f"{[(v[0]+left_x, v[1]+left_x, v[3]) for v in valleys]}")

    logger.info(f"Layout: final {len(valleys)} valleys: "
                f"{[(v[0]+left_x, v[1]+left_x, v[3]) for v in valleys]}")

    regions = []

    if len(valleys) >= 2:
        # 3-column layout detected
        valleys.sort(key=lambda v: v[2])

        if len(valleys) == 2:
            sep1_center = valleys[0][2]
            sep2_center = valleys[1][2]
        else:
            # Pick the two valleys that best divide into 3 parts
            # Prefer wider valleys (more likely true separators)
            best_pair = None
            best_score = float('inf')
            for i in range(len(valleys)):
                for j in range(i + 1, len(valleys)):
                    c1, c2 = valleys[i][2], valleys[j][2]
                    # Each column should be at least 15% of content width
                    col1 = c1
                    col2 = c2 - c1
                    col3 = content_w - c2
                    if col1 < content_w * 0.15 or col2 < content_w * 0.15 or col3 < content_w * 0.15:
                        continue
                    # Score: lower is better (more even distribution)
                    parts = sorted([col1, col2, col3])
                    score = parts[2] - parts[0]
                    # Bonus for wider valleys (subtract valley width)
                    score -= (valleys[i][3] + valleys[j][3]) * 0.5
                    if score < best_score:
                        best_score = score
                        best_pair = (c1, c2)
            if best_pair:
                sep1_center, sep2_center = best_pair
            else:
                sep1_center = valleys[0][2]
                sep2_center = valleys[1][2]

        # Convert from content-relative to absolute coordinates
        abs_sep1 = sep1_center + left_x
        abs_sep2 = sep2_center + left_x

        logger.info(f"Layout: 3 columns at separators x={abs_sep1}, x={abs_sep2} "
                    f"(widths: {abs_sep1}, {abs_sep2-abs_sep1}, {w-abs_sep2})")

        # Columns span the full page width (x=0..w), not just the content
        # area, so no ink near the margins is lost to downstream OCR.
        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=abs_sep1, height=content_h
        ))
        regions.append(PageRegion(
            type='column_de', x=abs_sep1, y=top_y,
            width=abs_sep2 - abs_sep1, height=content_h
        ))
        regions.append(PageRegion(
            type='column_example', x=abs_sep2, y=top_y,
            width=w - abs_sep2, height=content_h
        ))

    elif len(valleys) == 1:
        # 2-column layout
        abs_sep = valleys[0][2] + left_x

        logger.info(f"Layout: 2 columns at separator x={abs_sep}")

        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=abs_sep, height=content_h
        ))
        regions.append(PageRegion(
            type='column_de', x=abs_sep, y=top_y,
            width=w - abs_sep, height=content_h
        ))

    else:
        # No columns detected — run full-page OCR as single column
        logger.warning("Layout: no column separators found, using full page")
        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=w, height=content_h
        ))

    # Add header/footer info (gap-based detection with fallback).
    # NOTE(review): _add_header_footer is defined elsewhere in this file and
    # mutates `regions` in place — presumably appending header/footer/margin
    # PageRegions; confirm against its definition.
    _add_header_footer(regions, top_y, bottom_y, w, h, inv=inv)

    top_region = next((r.type for r in regions if r.type in ('header', 'margin_top')), 'none')
    bottom_region = next((r.type for r in regions if r.type in ('footer', 'margin_bottom')), 'none')
    col_count = len([r for r in regions if r.type.startswith('column')])
    logger.info(f"Layout: {col_count} columns, top={top_region}, bottom={bottom_region}")

    return regions
|
||
|
||
|
||
# =============================================================================
|
||
# Stage 5b: Word-Based Layout Analysis (Two-Phase Column Detection)
|
||
# =============================================================================
|
||
|
||
# --- Phase A: Geometry Detection ---
|
||
|
||
def _detect_columns_by_clustering(
    word_dicts: List[Dict],
    left_edges: List[int],
    edge_word_indices: List[int],
    content_w: int,
    content_h: int,
    left_x: int,
    right_x: int,
    top_y: int,
    bottom_y: int,
    inv: Optional[np.ndarray] = None,
) -> Optional[Tuple[List[ColumnGeometry], int, int, int, int, List[Dict], Optional[np.ndarray]]]:
    """Fallback: detect columns by clustering left-aligned word positions.

    Used when the primary gap-based algorithm finds fewer than 2 gaps.
    Clusters word left-edges within a 1%-of-width tolerance, keeps clusters
    with sufficient vertical coverage, merges near-duplicates, and requires
    at least 3 surviving clusters to accept a column layout.

    Args:
        word_dicts: Tesseract word dicts ('left'/'top' relative to content ROI).
        left_edges: Left-edge x values of candidate words (content-relative).
        edge_word_indices: Index into word_dicts for each entry of left_edges.
        content_w, content_h: Content-area dimensions in pixels.
        left_x, right_x, top_y, bottom_y: Content bounds (absolute coords).
        inv: Optional inverted binary image, passed through to the result.

    Returns:
        Tuple for the caller's geometry pipeline, or None if no reliable
        column structure was found (including empty input).
    """
    # Bug fix: an empty edge list previously raised IndexError on
    # sorted_pairs[0]; treat it as "no columns found" instead.
    if not left_edges:
        logger.info("ColumnGeometry clustering fallback: no word edges")
        return None

    tolerance = max(10, int(content_w * 0.01))
    sorted_pairs = sorted(zip(left_edges, edge_word_indices), key=lambda p: p[0])

    # --- Greedy 1-D clustering of sorted left edges ---
    clusters = []
    cluster_widxs = []
    cur_edges = [sorted_pairs[0][0]]
    cur_widxs = [sorted_pairs[0][1]]
    for edge, widx in sorted_pairs[1:]:
        if edge - cur_edges[-1] <= tolerance:
            cur_edges.append(edge)
            cur_widxs.append(widx)
        else:
            clusters.append(cur_edges)
            cluster_widxs.append(cur_widxs)
            cur_edges = [edge]
            cur_widxs = [widx]
    clusters.append(cur_edges)
    cluster_widxs.append(cur_widxs)

    # Vertical-coverage thresholds: a real column's words span a large part
    # of the page height; small decorative clusters are rejected.
    MIN_Y_COVERAGE_PRIMARY = 0.30
    MIN_Y_COVERAGE_SECONDARY = 0.15
    MIN_WORDS_SECONDARY = 5

    cluster_infos = []
    for c_edges, c_widxs in zip(clusters, cluster_widxs):
        if len(c_edges) < 2:
            continue
        y_positions = [word_dicts[idx]['top'] for idx in c_widxs]
        y_span = max(y_positions) - min(y_positions)
        y_coverage = y_span / content_h if content_h > 0 else 0.0
        cluster_infos.append({
            'mean_x': int(np.mean(c_edges)),
            'count': len(c_edges),
            'min_edge': min(c_edges),
            'max_edge': max(c_edges),
            'y_min': min(y_positions),
            'y_max': max(y_positions),
            'y_coverage': y_coverage,
        })

    # Primary clusters cover >= 30% of height; secondary ones may qualify
    # with less coverage if they have enough words.
    primary = [c for c in cluster_infos if c['y_coverage'] >= MIN_Y_COVERAGE_PRIMARY]
    primary_set = set(id(c) for c in primary)
    secondary = [c for c in cluster_infos
                 if id(c) not in primary_set
                 and c['y_coverage'] >= MIN_Y_COVERAGE_SECONDARY
                 and c['count'] >= MIN_WORDS_SECONDARY]
    significant = sorted(primary + secondary, key=lambda c: c['mean_x'])

    if len(significant) < 3:
        logger.info("ColumnGeometry clustering fallback: < 3 significant clusters")
        return None

    # Merge clusters closer than ~6% of the content width (count-weighted
    # average position; edge extents are unioned).
    merge_distance = max(30, int(content_w * 0.06))
    merged = [significant[0].copy()]
    for s in significant[1:]:
        if s['mean_x'] - merged[-1]['mean_x'] < merge_distance:
            prev = merged[-1]
            total = prev['count'] + s['count']
            avg_x = (prev['mean_x'] * prev['count'] + s['mean_x'] * s['count']) // total
            prev['mean_x'] = avg_x
            prev['count'] = total
            prev['min_edge'] = min(prev['min_edge'], s['min_edge'])
            prev['max_edge'] = max(prev['max_edge'], s['max_edge'])
        else:
            merged.append(s.copy())

    if len(merged) < 3:
        logger.info("ColumnGeometry clustering fallback: < 3 merged clusters")
        return None

    logger.info(f"ColumnGeometry clustering fallback: {len(merged)} columns from clustering")

    # Start each column slightly left of its leftmost word edge.
    margin_px = max(6, int(content_w * 0.003))
    return _build_geometries_from_starts(
        [(max(0, left_x + m['min_edge'] - margin_px), m['count']) for m in merged],
        word_dicts, left_x, right_x, top_y, bottom_y, content_w, content_h, inv,
    )
|
||
|
||
|
||
def _detect_sub_columns(
    geometries: List[ColumnGeometry],
    content_w: int,
    left_x: int = 0,
    top_y: int = 0,
    header_y: Optional[int] = None,
    footer_y: Optional[int] = None,
    _edge_tolerance: int = 8,
    _min_col_start_ratio: float = 0.10,
) -> List[ColumnGeometry]:
    """Split columns that contain internal sub-columns based on left-edge alignment.

    For each column, clusters word left-edges into alignment bins (within
    ``_edge_tolerance`` px). The leftmost bin whose word count reaches
    ``_min_col_start_ratio`` of the column total is treated as the true column
    start. Any words to the left of that bin form a sub-column, provided they
    number >= 2 and < 35 % of total.

    Word ``left`` values are relative to the content ROI (offset by *left_x*),
    while ``ColumnGeometry.x`` is in absolute image coordinates. *left_x*
    bridges the two coordinate systems.

    If *header_y* / *footer_y* are provided (absolute y-coordinates), words
    in header/footer regions are excluded from alignment clustering to avoid
    polluting the bins with page numbers or chapter titles. Word ``top``
    values are relative to *top_y*.

    Returns a new list of ColumnGeometry — potentially longer than the input.
    """
    if content_w <= 0:
        return geometries

    result: List[ColumnGeometry] = []
    for geo in geometries:
        # Only consider wide-enough columns with enough words
        if geo.width_ratio < 0.15 or geo.word_count < 5:
            result.append(geo)
            continue

        # Collect left-edges of confident words, excluding header/footer
        # Convert header_y/footer_y from absolute to relative (word 'top' is relative to top_y)
        min_top_rel = (header_y - top_y) if header_y is not None else None
        max_top_rel = (footer_y - top_y) if footer_y is not None else None

        # Only words with Tesseract confidence >= 30 inform the alignment bins.
        confident = [w for w in geo.words
                     if w.get('conf', 0) >= 30
                     and (min_top_rel is None or w['top'] >= min_top_rel)
                     and (max_top_rel is None or w['top'] <= max_top_rel)]
        if len(confident) < 3:
            result.append(geo)
            continue

        # --- Cluster left-edges into alignment bins ---
        # Greedy single-pass clustering over the sorted edges: a new bin is
        # opened whenever the gap to the previous edge exceeds the tolerance.
        sorted_edges = sorted(w['left'] for w in confident)
        bins: List[Tuple[int, int, int, int]] = []  # (center, count, min_edge, max_edge)
        cur = [sorted_edges[0]]
        for i in range(1, len(sorted_edges)):
            if sorted_edges[i] - cur[-1] <= _edge_tolerance:
                cur.append(sorted_edges[i])
            else:
                bins.append((sum(cur) // len(cur), len(cur), min(cur), max(cur)))
                cur = [sorted_edges[i]]
        bins.append((sum(cur) // len(cur), len(cur), min(cur), max(cur)))

        # --- Find the leftmost bin qualifying as a real column start ---
        total = len(confident)
        min_count = max(3, int(total * _min_col_start_ratio))
        col_start_bin = None
        for b in bins:
            if b[1] >= min_count:
                col_start_bin = b
                break

        if col_start_bin is None:
            result.append(geo)
            continue

        # Words to the left of the column-start bin are sub-column candidates.
        # Note: the split uses ALL words (including low-confidence and
        # header/footer ones) so no word is dropped from the output.
        split_threshold = col_start_bin[2] - _edge_tolerance
        sub_words = [w for w in geo.words if w['left'] < split_threshold]
        main_words = [w for w in geo.words if w['left'] >= split_threshold]

        # Count only body words (excluding header/footer) for the threshold check
        # so that header/footer words don't artificially trigger a split.
        sub_body = [w for w in sub_words
                    if (min_top_rel is None or w['top'] >= min_top_rel)
                    and (max_top_rel is None or w['top'] <= max_top_rel)]
        if len(sub_body) < 2 or len(sub_body) / len(geo.words) >= 0.35:
            result.append(geo)
            continue

        # --- Build two sub-column geometries ---
        # Word 'left' values are relative to left_x; geo.x is absolute.
        # Convert the split position from relative to absolute coordinates.
        # The split line sits halfway between the rightmost sub-column word
        # and the leftmost edge of the column-start bin.
        max_sub_left = max(w['left'] for w in sub_words)
        split_rel = (max_sub_left + col_start_bin[2]) // 2
        split_abs = split_rel + left_x

        sub_x = geo.x
        sub_width = split_abs - geo.x
        main_x = split_abs
        main_width = (geo.x + geo.width) - split_abs

        # Degenerate split (split line outside the column) — keep as-is.
        if sub_width <= 0 or main_width <= 0:
            result.append(geo)
            continue

        sub_geo = ColumnGeometry(
            index=0,
            x=sub_x,
            y=geo.y,
            width=sub_width,
            height=geo.height,
            word_count=len(sub_words),
            words=sub_words,
            width_ratio=sub_width / content_w if content_w > 0 else 0.0,
            is_sub_column=True,
        )
        main_geo = ColumnGeometry(
            index=0,
            x=main_x,
            y=geo.y,
            width=main_width,
            height=geo.height,
            word_count=len(main_words),
            words=main_words,
            width_ratio=main_width / content_w if content_w > 0 else 0.0,
            is_sub_column=True,
        )

        result.append(sub_geo)
        result.append(main_geo)

        logger.info(
            f"SubColumnSplit: column idx={geo.index} split at abs_x={split_abs} "
            f"(rel={split_rel}), sub={len(sub_words)} words, "
            f"main={len(main_words)} words, "
            f"col_start_bin=({col_start_bin[0]}, n={col_start_bin[1]})"
        )

    # Re-index by left-to-right order
    result.sort(key=lambda g: g.x)
    for i, g in enumerate(result):
        g.index = i

    return result
|
||
|
||
|
||
def _split_broad_columns(
    geometries: List[ColumnGeometry],
    content_w: int,
    left_x: int = 0,
    _broad_threshold: float = 0.35,
    _min_gap_px: int = 15,
    _min_words_per_split: int = 5,
) -> List[ColumnGeometry]:
    """Split overly broad columns that contain two language blocks (EN+DE).

    Builds a per-pixel word-coverage array for each broad column, locates the
    widest internal horizontal whitespace gap, and splits the column at its
    center when the gap is wide enough and both halves keep enough words.

    Args:
        geometries: Column geometries from _detect_sub_columns.
        content_w: Width of the content area in pixels.
        left_x: Left edge of content ROI in absolute image coordinates.
        _broad_threshold: Minimum width_ratio to consider a column "broad".
        _min_gap_px: Minimum gap width (pixels) to trigger a split.
        _min_words_per_split: Both halves must have at least this many words.

    Returns:
        Updated list of ColumnGeometry (possibly with more columns).
    """
    result: List[ColumnGeometry] = []

    logger.info(f"SplitBroadCols: input {len(geometries)} cols: "
                f"{[(g.index, g.x, g.width, g.word_count, round(g.width_ratio, 3)) for g in geometries]}")

    for geo in geometries:
        # Narrow or sparsely populated columns pass through untouched.
        if geo.width_ratio <= _broad_threshold or len(geo.words) < 10:
            result.append(geo)
            continue

        # --- Per-pixel word coverage across the column width ---
        # Word 'left' is content-ROI relative; shift into column-local space.
        col_left_rel = geo.x - left_x
        coverage = np.zeros(geo.width, dtype=np.float32)
        for word in geo.words:
            rel_left = word['left'] - col_left_rel
            rel_right = rel_left + word.get('width', 0)
            lo = max(0, int(rel_left))
            hi = min(geo.width, int(rel_right))
            if hi > lo:
                coverage[lo:hi] += 1.0

        # Light 3-px box smoothing to suppress single-pixel noise.
        if len(coverage) > 3:
            box = np.ones(3, dtype=np.float32) / 3.0
            coverage = np.convolve(coverage, box, mode='same')

        # Normalise to [0, 1] so the 0.5 cutoff is scale-independent.
        peak = coverage.max()
        if peak > 0:
            coverage /= peak

        # --- Collect whitespace runs (coverage < 0.5) ---
        low_mask = coverage < 0.5
        all_gaps = []
        run_start = None
        for px, is_low in enumerate(low_mask):
            if is_low:
                if run_start is None:
                    run_start = px
            elif run_start is not None:
                all_gaps.append((run_start, px, px - run_start))
                run_start = None
        if run_start is not None:
            all_gaps.append((run_start, len(low_mask), len(low_mask) - run_start))

        # Runs touching the column edges are margins, not separators.
        _edge_margin = 10  # pixels from edge to ignore
        internal_gaps = [g for g in all_gaps
                         if g[0] > _edge_margin and g[1] < geo.width - _edge_margin]
        best_gap = max(internal_gaps, key=lambda g: g[2]) if internal_gaps else None

        logger.info(f"SplitBroadCols: col {geo.index} all_gaps(>=5px): "
                    f"{[g for g in all_gaps if g[2] >= 5]}, "
                    f"internal(>=5px): {[g for g in internal_gaps if g[2] >= 5]}, "
                    f"best={best_gap}")

        if best_gap is None or best_gap[2] < _min_gap_px:
            result.append(geo)
            continue

        gap_center = (best_gap[0] + best_gap[1]) // 2

        # --- Assign words to a side by their horizontal midpoint ---
        left_words = []
        right_words = []
        for word in geo.words:
            midpoint = (word['left'] - col_left_rel) + word.get('width', 0) / 2.0
            if midpoint < gap_center:
                left_words.append(word)
            else:
                right_words.append(word)

        if len(left_words) < _min_words_per_split or len(right_words) < _min_words_per_split:
            result.append(geo)
            continue

        # --- Emit the two halves ---
        split_x_abs = geo.x + gap_center
        left_w = gap_center
        right_w = geo.width - gap_center

        left_geo = ColumnGeometry(
            index=0,
            x=geo.x,
            y=geo.y,
            width=left_w,
            height=geo.height,
            word_count=len(left_words),
            words=left_words,
            width_ratio=left_w / content_w if content_w else 0,
            is_sub_column=True,
        )
        right_geo = ColumnGeometry(
            index=0,
            x=split_x_abs,
            y=geo.y,
            width=right_w,
            height=geo.height,
            word_count=len(right_words),
            words=right_words,
            width_ratio=right_w / content_w if content_w else 0,
            is_sub_column=True,
        )

        logger.info(
            f"SplitBroadCols: col {geo.index} SPLIT at gap_center={gap_center} "
            f"(gap {best_gap[2]}px @ [{best_gap[0]}..{best_gap[1]}]), "
            f"left={len(left_words)} words (w={left_w}), "
            f"right={len(right_words)} words (w={right_w})"
        )

        result.append(left_geo)
        result.append(right_geo)

    # Re-index left-to-right
    result.sort(key=lambda g: g.x)
    for i, g in enumerate(result):
        g.index = i

    return result
|
||
|
||
|
||
def _build_geometries_from_starts(
    col_starts: List[Tuple[int, int]],
    word_dicts: List[Dict],
    left_x: int,
    right_x: int,
    top_y: int,
    bottom_y: int,
    content_w: int,
    content_h: int,
    inv: Optional[np.ndarray] = None,
) -> Tuple[List[ColumnGeometry], int, int, int, int, List[Dict], Optional[np.ndarray]]:
    """Build ColumnGeometry objects from a list of (abs_start_x, word_count) pairs."""
    geometries: List[ColumnGeometry] = []
    n_cols = len(col_starts)

    for idx, (abs_start, _count) in enumerate(col_starts):
        # A column spans from its start to the next column's start; the
        # rightmost column extends to the right content boundary.
        abs_end = col_starts[idx + 1][0] if idx + 1 < n_cols else right_x
        width = abs_end - abs_start

        # Word boxes are relative to the content ROI, so convert the
        # column bounds before membership testing (left-edge based).
        rel_left = abs_start - left_x
        rel_right = rel_left + width
        members = [wd for wd in word_dicts if rel_left <= wd['left'] < rel_right]

        geometries.append(ColumnGeometry(
            index=idx,
            x=abs_start,
            y=top_y,
            width=width,
            height=content_h,
            word_count=len(members),
            words=members,
            width_ratio=width / content_w if content_w > 0 else 0.0,
        ))

    logger.info(f"ColumnGeometry: {len(geometries)} columns: "
                f"{[(g.index, g.x, g.width, g.word_count) for g in geometries]}")
    return (geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv)
|
||
|
||
|
||
def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Optional[Tuple[List[ColumnGeometry], int, int, int, int, List[Dict], np.ndarray]]:
    """Detect column geometry using whitespace-gap analysis with word validation.

    Phase A of the two-phase column detection. Uses vertical projection
    profiles to find whitespace gaps between columns, then validates that
    no gap cuts through a word bounding box.

    Falls back to clustering-based detection if fewer than 2 gaps are found.

    Args:
        ocr_img: Binarized grayscale image for layout analysis.
        dewarped_bgr: Original BGR image (for Tesseract word detection).

    Returns:
        Tuple of (geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv)
        or None if detection fails entirely.
    """
    h, w = ocr_img.shape[:2]

    # --- Step 1: Find content bounds ---
    # Invert so that ink is white-on-black; projection sums then measure ink.
    inv = cv2.bitwise_not(ocr_img)
    left_x, right_x, top_y, bottom_y = _find_content_bounds(inv)
    content_w = right_x - left_x
    content_h = bottom_y - top_y

    # Implausibly small content area means bounds detection failed —
    # fall back to the full page.
    if content_w < w * 0.3 or content_h < h * 0.3:
        left_x, right_x = 0, w
        top_y, bottom_y = 0, h
        content_w, content_h = w, h

    logger.info(f"ColumnGeometry: content bounds x=[{left_x}..{right_x}] ({content_w}px), "
                f"y=[{top_y}..{bottom_y}] ({content_h}px)")

    # --- Step 2: Get word bounding boxes from Tesseract ---
    # Crop from left_x to full image width (not right_x) so words at the right
    # edge of the last column are included even if they extend past the detected
    # content boundary (right_x).
    content_roi = dewarped_bgr[top_y:bottom_y, left_x:w]
    pil_img = Image.fromarray(cv2.cvtColor(content_roi, cv2.COLOR_BGR2RGB))

    try:
        data = pytesseract.image_to_data(pil_img, lang='eng+deu', output_type=pytesseract.Output.DICT)
    except Exception as e:
        logger.warning(f"ColumnGeometry: Tesseract image_to_data failed: {e}")
        return None

    # Collect confident words; coordinates are relative to content_roi.
    word_dicts = []
    left_edges = []
    edge_word_indices = []
    n_words = len(data['text'])
    for i in range(n_words):
        # Tesseract reports conf as str or int; non-numeric values become -1.
        conf = int(data['conf'][i]) if str(data['conf'][i]).lstrip('-').isdigit() else -1
        text = str(data['text'][i]).strip()
        if conf < 30 or not text:
            continue
        lx = int(data['left'][i])
        ty = int(data['top'][i])
        bw = int(data['width'][i])
        bh = int(data['height'][i])
        left_edges.append(lx)
        edge_word_indices.append(len(word_dicts))
        word_dicts.append({
            'text': text, 'conf': conf,
            'left': lx, 'top': ty, 'width': bw, 'height': bh,
        })

    if len(left_edges) < 5:
        logger.warning(f"ColumnGeometry: only {len(left_edges)} words detected")
        return None

    logger.info(f"ColumnGeometry: {len(left_edges)} words detected in content area")

    # --- Step 2b: Segment by sub-headers ---
    # Pages with sub-headers (e.g. "Unit 4: Bonnie Scotland") have full-width
    # text bands that pollute the vertical projection. We detect large
    # horizontal gaps (= whitespace rows separating sections) and use only
    # the tallest content segment for the projection. This makes column
    # detection immune to sub-headers, illustrations, and section dividers.
    content_strip = inv[top_y:bottom_y, left_x:right_x]
    h_proj_row = np.sum(content_strip, axis=1).astype(float)
    h_proj_row_norm = h_proj_row / (content_w * 255) if content_w > 0 else h_proj_row

    # Find horizontal gaps (near-empty rows)
    H_GAP_THRESH = 0.02  # rows with <2% ink density are "empty"
    h_in_gap = h_proj_row_norm < H_GAP_THRESH
    H_MIN_GAP = max(5, content_h // 200)  # min gap height ~5-7px

    # Run-length scan over the "empty row" mask to collect contiguous gaps.
    h_gaps: List[Tuple[int, int]] = []
    h_gap_start = None
    for y_idx in range(len(h_in_gap)):
        if h_in_gap[y_idx]:
            if h_gap_start is None:
                h_gap_start = y_idx
        else:
            if h_gap_start is not None:
                if y_idx - h_gap_start >= H_MIN_GAP:
                    h_gaps.append((h_gap_start, y_idx))
                h_gap_start = None
    # Close a gap that runs to the bottom edge.
    if h_gap_start is not None and len(h_in_gap) - h_gap_start >= H_MIN_GAP:
        h_gaps.append((h_gap_start, len(h_in_gap)))

    # Identify "large" gaps (significantly bigger than median) that indicate
    # section boundaries (sub-headers, chapter titles).
    if len(h_gaps) >= 3:
        gap_sizes = sorted(g[1] - g[0] for g in h_gaps)
        median_gap_h = gap_sizes[len(gap_sizes) // 2]
        large_gap_thresh = max(median_gap_h * 1.8, H_MIN_GAP + 3)
        large_gaps = [(gs, ge) for gs, ge in h_gaps if ge - gs >= large_gap_thresh]
    else:
        large_gaps = h_gaps

    # Build content segments between large gaps and pick the tallest
    seg_boundaries = [0]
    for gs, ge in large_gaps:
        seg_boundaries.append(gs)
        seg_boundaries.append(ge)
    seg_boundaries.append(content_h)

    # Even indices pair up as (segment_top, segment_bottom) spans.
    segments = []
    for i in range(0, len(seg_boundaries) - 1, 2):
        seg_top = seg_boundaries[i]
        seg_bot = seg_boundaries[i + 1] if i + 1 < len(seg_boundaries) else content_h
        seg_height = seg_bot - seg_top
        if seg_height > 20:  # ignore tiny fragments
            segments.append((seg_top, seg_bot, seg_height))

    if segments:
        # Tallest segment dominates the layout; project only over it.
        segments.sort(key=lambda s: s[2], reverse=True)
        best_seg = segments[0]
        proj_strip = content_strip[best_seg[0]:best_seg[1], :]
        effective_h = best_seg[2]
        if len(segments) > 1:
            logger.info(f"ColumnGeometry: {len(segments)} segments from {len(large_gaps)} "
                        f"large h-gaps, using tallest: rows {best_seg[0]}..{best_seg[1]} "
                        f"({effective_h}px, {effective_h*100/content_h:.0f}%)")
    else:
        proj_strip = content_strip
        effective_h = content_h

    # --- Step 3: Vertical projection profile ---
    v_proj = np.sum(proj_strip, axis=0).astype(float)
    v_proj_norm = v_proj / (effective_h * 255) if effective_h > 0 else v_proj

    # Smooth the projection to avoid noise-induced micro-gaps
    kernel_size = max(5, content_w // 80)
    if kernel_size % 2 == 0:
        kernel_size += 1  # keep odd for symmetry
    v_smooth = np.convolve(v_proj_norm, np.ones(kernel_size) / kernel_size, mode='same')

    # --- Step 4: Find whitespace gaps ---
    # Threshold: areas with very little ink density are gaps
    median_density = float(np.median(v_smooth[v_smooth > 0])) if np.any(v_smooth > 0) else 0.01
    gap_threshold = max(median_density * 0.15, 0.005)

    in_gap = v_smooth < gap_threshold
    MIN_GAP_WIDTH = max(8, content_w // 200)  # min ~8px or 0.5% of content width

    # Collect contiguous gap regions
    raw_gaps = []  # (start_x_rel, end_x_rel) relative to content ROI
    gap_start = None
    for x in range(len(in_gap)):
        if in_gap[x]:
            if gap_start is None:
                gap_start = x
        else:
            if gap_start is not None:
                gap_width = x - gap_start
                if gap_width >= MIN_GAP_WIDTH:
                    raw_gaps.append((gap_start, x))
                gap_start = None
    # Handle gap at the right edge
    if gap_start is not None:
        gap_width = len(in_gap) - gap_start
        if gap_width >= MIN_GAP_WIDTH:
            raw_gaps.append((gap_start, len(in_gap)))

    logger.info(f"ColumnGeometry: {len(raw_gaps)} raw gaps found (threshold={gap_threshold:.4f}, "
                f"min_width={MIN_GAP_WIDTH}px): "
                f"{[(g[0]+left_x, g[1]+left_x, g[1]-g[0]) for g in raw_gaps]}")

    # --- Step 5: Validate gaps against word bounding boxes ---
    # When using a segment for projection, only validate against words
    # inside that segment — words from sub-headers or other sections
    # would incorrectly overlap with real column gaps.
    if segments and len(segments) > 1:
        seg_top_abs = best_seg[0]  # relative to content strip
        seg_bot_abs = best_seg[1]
        segment_words = [wd for wd in word_dicts
                         if wd['top'] >= seg_top_abs
                         and wd['top'] + wd['height'] <= seg_bot_abs]
        logger.info(f"ColumnGeometry: filtering words to segment: "
                    f"{len(segment_words)}/{len(word_dicts)} words")
    else:
        segment_words = word_dicts

    validated_gaps = []
    for gap_start_rel, gap_end_rel in raw_gaps:
        # Check if any word overlaps with this gap region
        overlapping = False
        for wd in segment_words:
            word_left = wd['left']
            word_right = wd['left'] + wd['width']
            if word_left < gap_end_rel and word_right > gap_start_rel:
                overlapping = True
                break

        if not overlapping:
            validated_gaps.append((gap_start_rel, gap_end_rel))
        else:
            # Try to shift the gap to avoid the overlapping word(s)
            # Find the tightest word boundaries within the gap region
            min_word_left = content_w
            max_word_right = 0
            for wd in segment_words:
                word_left = wd['left']
                word_right = wd['left'] + wd['width']
                if word_left < gap_end_rel and word_right > gap_start_rel:
                    min_word_left = min(min_word_left, word_left)
                    max_word_right = max(max_word_right, word_right)

            # Try gap before the overlapping words
            if min_word_left - gap_start_rel >= MIN_GAP_WIDTH:
                validated_gaps.append((gap_start_rel, min_word_left))
                logger.debug(f"ColumnGeometry: gap shifted left to avoid word at {min_word_left}")
            # Try gap after the overlapping words
            elif gap_end_rel - max_word_right >= MIN_GAP_WIDTH:
                validated_gaps.append((max_word_right, gap_end_rel))
                logger.debug(f"ColumnGeometry: gap shifted right to avoid word at {max_word_right}")
            else:
                logger.debug(f"ColumnGeometry: gap [{gap_start_rel}..{gap_end_rel}] "
                             f"discarded (word overlap, no room to shift)")

    logger.info(f"ColumnGeometry: {len(validated_gaps)} gaps after word validation: "
                f"{[(g[0]+left_x, g[1]+left_x, g[1]-g[0]) for g in validated_gaps]}")

    # --- Step 5b: Word-coverage gap detection (fallback for noisy scans) ---
    # When pixel-based projection fails (e.g. due to illustrations or colored
    # bands), use word bounding boxes to find clear vertical gaps. This is
    # immune to decorative graphics that Tesseract doesn't recognise as words.
    if len(validated_gaps) < 2:
        logger.info("ColumnGeometry: < 2 pixel-gaps, trying word-coverage gaps")
        word_coverage = np.zeros(content_w, dtype=np.int32)
        for wd in segment_words:
            wl = max(0, wd['left'])
            wr = min(wd['left'] + wd['width'], content_w)
            if wr > wl:
                word_coverage[wl:wr] += 1

        # Smooth slightly to bridge tiny 1-2px noise gaps between words
        wc_kernel = max(3, content_w // 300)
        if wc_kernel % 2 == 0:
            wc_kernel += 1
        wc_smooth = np.convolve(word_coverage.astype(float),
                                np.ones(wc_kernel) / wc_kernel, mode='same')

        wc_in_gap = wc_smooth < 0.5  # effectively zero word coverage
        WC_MIN_GAP = max(4, content_w // 300)

        # Same run-length scan as Step 4, but over word coverage.
        wc_gaps: List[Tuple[int, int]] = []
        wc_gap_start = None
        for x in range(len(wc_in_gap)):
            if wc_in_gap[x]:
                if wc_gap_start is None:
                    wc_gap_start = x
            else:
                if wc_gap_start is not None:
                    if x - wc_gap_start >= WC_MIN_GAP:
                        wc_gaps.append((wc_gap_start, x))
                    wc_gap_start = None
        if wc_gap_start is not None and len(wc_in_gap) - wc_gap_start >= WC_MIN_GAP:
            wc_gaps.append((wc_gap_start, len(wc_in_gap)))

        logger.info(f"ColumnGeometry: {len(wc_gaps)} word-coverage gaps found "
                    f"(min_width={WC_MIN_GAP}px): "
                    f"{[(g[0]+left_x, g[1]+left_x, g[1]-g[0]) for g in wc_gaps]}")

        if len(wc_gaps) >= 2:
            validated_gaps = wc_gaps

    # --- Step 6: Fallback to clustering if too few gaps ---
    if len(validated_gaps) < 2:
        logger.info("ColumnGeometry: < 2 gaps found, falling back to clustering")
        return _detect_columns_by_clustering(
            word_dicts, left_edges, edge_word_indices,
            content_w, content_h, left_x, right_x, top_y, bottom_y, inv,
        )

    # --- Step 7: Derive column boundaries from gaps ---
    # Sort gaps by position
    validated_gaps.sort(key=lambda g: g[0])

    # Identify margin gaps (first and last) vs interior gaps
    # A margin gap touches the edge of the content area (within 2% tolerance)
    edge_tolerance = max(10, int(content_w * 0.02))

    is_left_margin = validated_gaps[0][0] <= edge_tolerance
    is_right_margin = validated_gaps[-1][1] >= content_w - edge_tolerance

    # Interior gaps define column boundaries
    # Column starts at the end of a gap, ends at the start of the next gap
    col_starts = []

    if is_left_margin:
        # First column starts after the left margin gap
        first_gap_end = validated_gaps[0][1]
        interior_gaps = validated_gaps[1:]
    else:
        # No left margin gap — first column starts at content left edge
        first_gap_end = 0
        interior_gaps = validated_gaps[:]

    if is_right_margin:
        # Last gap is right margin — don't use it as column start
        interior_gaps_for_boundaries = interior_gaps[:-1]
        # NOTE(review): right_boundary is assigned here but not read again in
        # this function — looks like a leftover; confirm before removing.
        right_boundary = validated_gaps[-1][0]  # last column ends at right margin gap start
    else:
        interior_gaps_for_boundaries = interior_gaps
        right_boundary = content_w

    # First column
    col_starts.append(left_x + first_gap_end)

    # Columns between interior gaps
    for gap_start_rel, gap_end_rel in interior_gaps_for_boundaries:
        col_starts.append(left_x + gap_end_rel)

    # Count words per column region (for logging)
    col_start_counts = []
    for i, start_x in enumerate(col_starts):
        if i + 1 < len(col_starts):
            next_start = col_starts[i + 1]
        else:
            # Rightmost column always extends to full image width (w).
            # The page margin contains only white space — extending the OCR
            # crop to the image edge is safe and prevents text near the right
            # border from being cut off.
            next_start = w

        col_left_rel = start_x - left_x
        col_right_rel = next_start - left_x
        n_words_in_col = sum(1 for w in word_dicts
                             if col_left_rel <= w['left'] < col_right_rel)
        col_start_counts.append((start_x, n_words_in_col))

    logger.info(f"ColumnGeometry: {len(col_starts)} columns from {len(validated_gaps)} gaps "
                f"(left_margin={is_left_margin}, right_margin={is_right_margin}): "
                f"{col_start_counts}")

    # --- Step 8: Build ColumnGeometry objects ---
    # Determine right edge for each column
    all_boundaries = []
    for i, start_x in enumerate(col_starts):
        if i + 1 < len(col_starts):
            end_x = col_starts[i + 1]
        else:
            # Rightmost column always extends to full image width (w).
            end_x = w
        all_boundaries.append((start_x, end_x))

    geometries = []
    for i, (start_x, end_x) in enumerate(all_boundaries):
        col_width = end_x - start_x
        col_left_rel = start_x - left_x
        col_right_rel = col_left_rel + col_width
        col_words = [w for w in word_dicts
                     if col_left_rel <= w['left'] < col_right_rel]

        geometries.append(ColumnGeometry(
            index=i,
            x=start_x,
            y=top_y,
            width=col_width,
            height=content_h,
            word_count=len(col_words),
            words=col_words,
            width_ratio=col_width / content_w if content_w > 0 else 0.0,
        ))

    logger.info(f"ColumnGeometry: {len(geometries)} columns: "
                f"{[(g.index, g.x, g.width, g.word_count) for g in geometries]}")

    # --- Step 9: Filter phantom narrow columns ---
    # Tiny spurious gaps (e.g. 11px + 35px adjacent) can create very narrow
    # columns (< 3% of content width) with zero or no words. These are not
    # real columns — remove them and close the gap between neighbors.
    min_real_col_w = max(20, int(content_w * 0.03))
    filtered_geoms = [g for g in geometries
                      if not (g.word_count < 3 and g.width < min_real_col_w)]
    if len(filtered_geoms) < len(geometries):
        n_removed = len(geometries) - len(filtered_geoms)
        logger.info(f"ColumnGeometry: removed {n_removed} phantom column(s) "
                    f"(width < {min_real_col_w}px and words < 3)")
        # Extend each remaining column to close gaps with its right neighbor
        for i, g in enumerate(filtered_geoms):
            if i + 1 < len(filtered_geoms):
                g.width = filtered_geoms[i + 1].x - g.x
            else:
                g.width = w - g.x
            g.index = i
            # Widths changed, so word membership must be recomputed.
            col_left_rel = g.x - left_x
            col_right_rel = col_left_rel + g.width
            g.words = [w for w in word_dicts
                       if col_left_rel <= w['left'] < col_right_rel]
            g.word_count = len(g.words)
        geometries = filtered_geoms
        logger.info(f"ColumnGeometry: {len(geometries)} columns after phantom filter: "
                    f"{[(g.index, g.x, g.width, g.word_count) for g in geometries]}")

    return (geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv)
|
||
|
||
|
||
def expand_narrow_columns(
    geometries: List[ColumnGeometry],
    content_w: int,
    left_x: int,
    word_dicts: List[Dict],
) -> List[ColumnGeometry]:
    """Expand narrow columns into adjacent whitespace gaps.

    Narrow columns (marker, page_ref, < 10% content width) often lose
    content at image edges due to residual shear. This expands them toward
    the neighbouring column, but never past 40% of the gap or past the
    nearest word in the neighbour.

    Must be called AFTER _detect_sub_columns() so that sub-column splits
    (which create the narrowest columns) have already happened.

    Args:
        geometries: Columns sorted left-to-right; mutated in place.
        content_w: Width of the content area in pixels.
        left_x: Absolute X of the content area's left edge (word boxes
            are relative to this).
        word_dicts: All Tesseract word boxes, for re-assigning words
            after boundaries move.

    Returns:
        The same list, with narrow columns widened and neighbours shrunk.
    """
    _NARROW_THRESHOLD_PCT = 10.0  # columns below this % of content width get expanded
    _MIN_WORD_MARGIN = 4          # px kept clear between a boundary and a neighbour's word

    if len(geometries) < 2:
        return geometries

    logger.info("ExpandNarrowCols: input %d cols: %s",
                len(geometries),
                [(i, g.x, g.width, round(g.width / content_w * 100, 1))
                 for i, g in enumerate(geometries)])

    for i, g in enumerate(geometries):
        col_pct = g.width / content_w * 100 if content_w > 0 else 100
        if col_pct >= _NARROW_THRESHOLD_PCT:
            continue

        expanded = False
        orig_pct = col_pct

        # --- try expanding to the LEFT ---
        if i > 0:
            left_nb = geometries[i - 1]
            # Gap can be 0 if sub-column split created adjacent columns.
            # In that case, look at where the neighbor's rightmost words
            # actually are — there may be unused space we can claim.
            nb_words_right = [wd['left'] + wd.get('width', 0)
                              for wd in left_nb.words]
            if nb_words_right:
                rightmost_word_abs = left_x + max(nb_words_right)
                safe_left_abs = rightmost_word_abs + _MIN_WORD_MARGIN
            else:
                # No words in neighbor → we can take up to neighbor's start
                safe_left_abs = left_nb.x + _MIN_WORD_MARGIN
            if safe_left_abs < g.x:
                g.width += (g.x - safe_left_abs)
                g.x = safe_left_abs
                expanded = True

        # --- try expanding to the RIGHT ---
        if i + 1 < len(geometries):
            right_nb = geometries[i + 1]
            nb_words_left = [wd['left'] for wd in right_nb.words]
            if nb_words_left:
                leftmost_word_abs = left_x + min(nb_words_left)
                safe_right_abs = leftmost_word_abs - _MIN_WORD_MARGIN
            else:
                safe_right_abs = right_nb.x + right_nb.width - _MIN_WORD_MARGIN
            cur_right = g.x + g.width
            if safe_right_abs > cur_right:
                g.width = safe_right_abs - g.x
                expanded = True

        if expanded:
            # Boundaries moved — refresh this column's word membership
            # and width ratio.
            col_left_rel = g.x - left_x
            col_right_rel = col_left_rel + g.width
            g.words = [wd for wd in word_dicts
                       if col_left_rel <= wd['left'] < col_right_rel]
            g.word_count = len(g.words)
            g.width_ratio = g.width / content_w if content_w > 0 else 0.0
            logger.info(
                "ExpandNarrowCols: col %d (%.1f%% → %.1f%%) x=%d w=%d words=%d",
                i, orig_pct, g.width / content_w * 100, g.x, g.width, g.word_count)

            # --- Shrink overlapping neighbors to match new boundaries ---
            # Left neighbor: its right edge must not exceed our new left edge
            if i > 0:
                left_nb = geometries[i - 1]
                nb_right = left_nb.x + left_nb.width
                if nb_right > g.x:
                    left_nb.width = g.x - left_nb.x
                    if left_nb.width < 0:
                        left_nb.width = 0
                    left_nb.width_ratio = left_nb.width / content_w if content_w > 0 else 0.0
                    # Re-assign words
                    nb_left_rel = left_nb.x - left_x
                    nb_right_rel = nb_left_rel + left_nb.width
                    left_nb.words = [wd for wd in word_dicts
                                     if nb_left_rel <= wd['left'] < nb_right_rel]
                    left_nb.word_count = len(left_nb.words)

            # Right neighbor: its left edge must not be before our new right edge
            if i + 1 < len(geometries):
                right_nb = geometries[i + 1]
                my_right = g.x + g.width
                if right_nb.x < my_right:
                    old_right_edge = right_nb.x + right_nb.width
                    right_nb.x = my_right
                    right_nb.width = old_right_edge - right_nb.x
                    if right_nb.width < 0:
                        right_nb.width = 0
                    right_nb.width_ratio = right_nb.width / content_w if content_w > 0 else 0.0
                    # Re-assign words
                    nb_left_rel = right_nb.x - left_x
                    nb_right_rel = nb_left_rel + right_nb.width
                    right_nb.words = [wd for wd in word_dicts
                                      if nb_left_rel <= wd['left'] < nb_right_rel]
                    right_nb.word_count = len(right_nb.words)

    return geometries
|
||
|
||
|
||
# =============================================================================
|
||
# Row Geometry Detection (horizontal whitespace-gap analysis)
|
||
# =============================================================================
|
||
|
||
def detect_row_geometry(
    inv: np.ndarray,
    word_dicts: List[Dict],
    left_x: int, right_x: int,
    top_y: int, bottom_y: int,
) -> List['RowGeometry']:
    """Detect row geometry using horizontal whitespace-gap analysis.

    Mirrors the vertical gap approach used for columns, but operates on
    horizontal projection profiles to find gaps between text lines.
    Also classifies header/footer rows based on gap size.

    Args:
        inv: Inverted binarized image (white text on black bg, full page).
        word_dicts: Word bounding boxes from Tesseract (relative to content ROI).
        left_x, right_x: Absolute X bounds of the content area.
        top_y, bottom_y: Absolute Y bounds of the content area.

    Returns:
        List of RowGeometry objects sorted top to bottom.
    """
    content_w = right_x - left_x
    content_h = bottom_y - top_y

    if content_h < 10 or content_w < 10:
        logger.warning("detect_row_geometry: content area too small")
        return []

    # --- Step 1: Horizontal projection profile (text-only, images masked out) ---
    content_strip = inv[top_y:bottom_y, left_x:right_x]

    # Build a word-coverage mask so that image regions (high ink density but no
    # Tesseract words) are ignored. Only pixels within/near word bounding boxes
    # contribute to the projection. This prevents large illustrations from
    # merging multiple vocabulary rows into one.
    WORD_PAD_Y = max(4, content_h // 300)  # small vertical padding around words
    word_mask = np.zeros((content_h, content_w), dtype=np.uint8)
    for wd in word_dicts:
        y1 = max(0, wd['top'] - WORD_PAD_Y)
        y2 = min(content_h, wd['top'] + wd['height'] + WORD_PAD_Y)
        x1 = max(0, wd['left'])
        x2 = min(content_w, wd['left'] + wd['width'])
        word_mask[y1:y2, x1:x2] = 255

    masked_strip = cv2.bitwise_and(content_strip, word_mask)
    h_proj = np.sum(masked_strip, axis=1).astype(float)
    h_proj_norm = h_proj / (content_w * 255) if content_w > 0 else h_proj

    # --- Step 2: Smoothing + threshold ---
    kernel_size = max(3, content_h // 200)
    if kernel_size % 2 == 0:
        kernel_size += 1  # keep the moving-average kernel odd
    h_smooth = np.convolve(h_proj_norm, np.ones(kernel_size) / kernel_size, mode='same')

    # Rows with ink density well below the median are treated as gaps.
    median_density = float(np.median(h_smooth[h_smooth > 0])) if np.any(h_smooth > 0) else 0.01
    gap_threshold = max(median_density * 0.15, 0.003)

    in_gap = h_smooth < gap_threshold
    MIN_GAP_HEIGHT = max(3, content_h // 500)

    # --- Step 3: Collect contiguous gap regions ---
    raw_gaps = []  # (start_y_rel, end_y_rel) relative to content ROI
    gap_start = None
    for y in range(len(in_gap)):
        if in_gap[y]:
            if gap_start is None:
                gap_start = y
        else:
            if gap_start is not None:
                gap_height = y - gap_start
                if gap_height >= MIN_GAP_HEIGHT:
                    raw_gaps.append((gap_start, y))
                gap_start = None
    # Close a gap that runs to the bottom edge.
    if gap_start is not None:
        gap_height = len(in_gap) - gap_start
        if gap_height >= MIN_GAP_HEIGHT:
            raw_gaps.append((gap_start, len(in_gap)))

    logger.info(f"RowGeometry: {len(raw_gaps)} raw gaps found (threshold={gap_threshold:.4f}, "
                f"min_height={MIN_GAP_HEIGHT}px)")

    # --- Step 4: Validate gaps against word bounding boxes ---
    validated_gaps = []
    for gap_start_rel, gap_end_rel in raw_gaps:
        overlapping = False
        for wd in word_dicts:
            word_top = wd['top']
            word_bottom = wd['top'] + wd['height']
            if word_top < gap_end_rel and word_bottom > gap_start_rel:
                overlapping = True
                break

        if not overlapping:
            validated_gaps.append((gap_start_rel, gap_end_rel))
        else:
            # Try to shift the gap to avoid overlapping words
            min_word_top = content_h
            max_word_bottom = 0
            for wd in word_dicts:
                word_top = wd['top']
                word_bottom = wd['top'] + wd['height']
                if word_top < gap_end_rel and word_bottom > gap_start_rel:
                    min_word_top = min(min_word_top, word_top)
                    max_word_bottom = max(max_word_bottom, word_bottom)

            # Keep the portion of the gap above or below the words, if
            # still tall enough; otherwise drop the gap entirely.
            if min_word_top - gap_start_rel >= MIN_GAP_HEIGHT:
                validated_gaps.append((gap_start_rel, min_word_top))
            elif gap_end_rel - max_word_bottom >= MIN_GAP_HEIGHT:
                validated_gaps.append((max_word_bottom, gap_end_rel))
            else:
                logger.debug(f"RowGeometry: gap [{gap_start_rel}..{gap_end_rel}] "
                             f"discarded (word overlap, no room to shift)")

    logger.info(f"RowGeometry: {len(validated_gaps)} gaps after word validation")

    # --- Fallback if too few gaps ---
    if len(validated_gaps) < 2:
        logger.info("RowGeometry: < 2 gaps found, falling back to word grouping")
        return _build_rows_from_word_grouping(
            word_dicts, left_x, right_x, top_y, bottom_y, content_w, content_h,
        )

    validated_gaps.sort(key=lambda g: g[0])

    # --- Step 5: Header/footer detection via gap size ---
    HEADER_FOOTER_ZONE = 0.15  # top/bottom 15% of the page
    GAP_MULTIPLIER = 2.0       # "large" = 2x the median gap

    gap_sizes = [g[1] - g[0] for g in validated_gaps]
    median_gap = float(np.median(gap_sizes)) if gap_sizes else 0
    large_gap_threshold = median_gap * GAP_MULTIPLIER

    header_boundary_rel = None  # y below which is header
    footer_boundary_rel = None  # y above which is footer

    header_zone_limit = int(content_h * HEADER_FOOTER_ZONE)
    footer_zone_start = int(content_h * (1.0 - HEADER_FOOTER_ZONE))

    # Find largest gap in header zone
    best_header_gap = None
    for gs, ge in validated_gaps:
        gap_mid = (gs + ge) / 2
        gap_size = ge - gs
        if gap_mid < header_zone_limit and gap_size > large_gap_threshold:
            if best_header_gap is None or gap_size > (best_header_gap[1] - best_header_gap[0]):
                best_header_gap = (gs, ge)

    if best_header_gap is not None:
        header_boundary_rel = best_header_gap[1]
        logger.info(f"RowGeometry: header boundary at y_rel={header_boundary_rel} "
                    f"(gap={best_header_gap[1] - best_header_gap[0]}px, "
                    f"median_gap={median_gap:.0f}px)")

    # Find largest gap in footer zone
    best_footer_gap = None
    for gs, ge in validated_gaps:
        gap_mid = (gs + ge) / 2
        gap_size = ge - gs
        if gap_mid > footer_zone_start and gap_size > large_gap_threshold:
            if best_footer_gap is None or gap_size > (best_footer_gap[1] - best_footer_gap[0]):
                best_footer_gap = (gs, ge)

    if best_footer_gap is not None:
        footer_boundary_rel = best_footer_gap[0]
        logger.info(f"RowGeometry: footer boundary at y_rel={footer_boundary_rel} "
                    f"(gap={best_footer_gap[1] - best_footer_gap[0]}px)")

    # --- Step 6: Build RowGeometry objects from gaps ---
    # Rows are the spans between gaps
    row_boundaries = []  # (start_y_rel, end_y_rel)

    # Top of content to first gap
    if validated_gaps[0][0] > MIN_GAP_HEIGHT:
        row_boundaries.append((0, validated_gaps[0][0]))

    # Between gaps
    for i in range(len(validated_gaps) - 1):
        row_start = validated_gaps[i][1]
        row_end = validated_gaps[i + 1][0]
        if row_end - row_start > 0:
            row_boundaries.append((row_start, row_end))

    # Last gap to bottom of content
    if validated_gaps[-1][1] < content_h - MIN_GAP_HEIGHT:
        row_boundaries.append((validated_gaps[-1][1], content_h))

    rows = []
    for idx, (row_start_rel, row_end_rel) in enumerate(row_boundaries):
        # Determine row type by which zone the row's midpoint falls in.
        row_mid = (row_start_rel + row_end_rel) / 2
        if header_boundary_rel is not None and row_mid < header_boundary_rel:
            row_type = 'header'
        elif footer_boundary_rel is not None and row_mid > footer_boundary_rel:
            row_type = 'footer'
        else:
            row_type = 'content'

        # Collect words in this row (by vertical center of the word box)
        row_words = [w for w in word_dicts
                     if w['top'] + w['height'] / 2 >= row_start_rel
                     and w['top'] + w['height'] / 2 < row_end_rel]

        # Gap before this row
        gap_before = 0
        if idx == 0 and validated_gaps[0][0] > 0:
            gap_before = validated_gaps[0][0]
        elif idx > 0:
            # Find the gap just before this row boundary
            for gs, ge in validated_gaps:
                if ge == row_start_rel:
                    gap_before = ge - gs
                    break

        rows.append(RowGeometry(
            index=idx,
            x=left_x,
            y=top_y + row_start_rel,
            width=content_w,
            height=row_end_rel - row_start_rel,
            word_count=len(row_words),
            words=row_words,
            row_type=row_type,
            gap_before=gap_before,
        ))

    # --- Step 7: Word-center grid regularization ---
    # Derive precise row boundaries from word vertical centers. Detects
    # section breaks (headings, paragraphs) and builds per-section grids.
    rows = _regularize_row_grid(rows, word_dicts, left_x, right_x, top_y,
                                content_w, content_h, inv)

    type_counts = {}
    for r in rows:
        type_counts[r.row_type] = type_counts.get(r.row_type, 0) + 1
    logger.info(f"RowGeometry: {len(rows)} rows detected: {type_counts}")

    return rows
|
||
|
||
|
||
def _regularize_row_grid(
    rows: List['RowGeometry'],
    word_dicts: List[Dict],
    left_x: int, right_x: int,
    top_y: int,
    content_w: int, content_h: int,
    inv: np.ndarray,
) -> List['RowGeometry']:
    """Rebuild row boundaries from word center-lines with section-break awareness.

    Instead of overlaying a rigid grid, this derives row positions bottom-up
    from the words themselves:

    1. Group words into line clusters (by Y proximity).
    2. For each cluster compute center_y (median of word vertical centers)
       and letter_height (median of word heights).
    3. Compute the pitch (distance between consecutive centers).
    4. Detect section breaks where the gap is >1.8× the median pitch
       (headings, sub-headings, paragraph breaks).
    5. Within each section, use the local pitch to place row boundaries
       at the midpoints between consecutive centers.
    6. Validate that ≥85% of words land in a grid row; otherwise fall back.

    Header/footer rows from the gap-based detection are preserved.

    Args:
        rows: Gap-based rows from the projection step (content + header/footer).
        word_dicts: All OCR word dicts (unused directly; words are taken from
            the rows' own word lists).
        left_x: Left content bound (absolute).
        right_x: Right content bound (absolute; not read in this function).
        top_y: Top content bound — converts ROI-relative Y to absolute Y.
        content_w: Content-area width, used as each grid row's width.
        content_h: Content-area height, used to clamp row bottoms.
        inv: Inverted binarized image (not read here; kept for signature parity).

    Returns:
        New list of RowGeometry (header/footer + regularized content rows),
        or the original *rows* unchanged when any validation step fails.
    """
    content_rows = [r for r in rows if r.row_type == 'content']
    non_content = [r for r in rows if r.row_type != 'content']

    # Too few content rows → the grid statistics below would be meaningless.
    if len(content_rows) < 5:
        return rows

    # --- Step A: Group ALL words into line clusters ---
    # Collect words that belong to content rows (deduplicated — the same
    # word dict can appear in more than one gap-based row).
    content_words: List[Dict] = []
    seen_keys: set = set()
    for r in content_rows:
        for w in r.words:
            key = (w['left'], w['top'], w['width'], w['height'])
            if key not in seen_keys:
                seen_keys.add(key)
                content_words.append(w)

    if len(content_words) < 5:
        return rows

    # Compute median word height (excluding outliers like tall brackets/IPA)
    word_heights = sorted(w['height'] for w in content_words)
    median_wh = word_heights[len(word_heights) // 2]

    # Compute median gap-based row height — this is the actual line height
    # as detected by the horizontal projection. We use 40% of this as
    # grouping tolerance. This is much more reliable than using word height
    # alone, because words on the same line can have very different heights
    # (e.g. lowercase vs uppercase, brackets, phonetic symbols).
    gap_row_heights = sorted(r.height for r in content_rows)
    median_row_h = gap_row_heights[len(gap_row_heights) // 2]

    # Tolerance: 40% of row height. Words on the same line should have
    # centers within this range. Even if a word's bbox is taller/shorter,
    # its center should stay within half a row height of the line center.
    y_tol = max(10, int(median_row_h * 0.4))

    # Sort by center_y, then group by proximity.
    # NOTE: the reference center is the FIRST word of the current line, not a
    # running average — a strongly skewed page could split long lines.
    words_by_center = sorted(content_words,
                             key=lambda w: (w['top'] + w['height'] / 2, w['left']))
    line_clusters: List[List[Dict]] = []
    current_line: List[Dict] = [words_by_center[0]]
    current_center = words_by_center[0]['top'] + words_by_center[0]['height'] / 2

    for w in words_by_center[1:]:
        w_center = w['top'] + w['height'] / 2
        if abs(w_center - current_center) <= y_tol:
            current_line.append(w)
        else:
            current_line.sort(key=lambda w: w['left'])
            line_clusters.append(current_line)
            current_line = [w]
            current_center = w_center

    # Flush the last open line cluster.
    if current_line:
        current_line.sort(key=lambda w: w['left'])
        line_clusters.append(current_line)

    if len(line_clusters) < 3:
        return rows

    # --- Step B: Compute center_y per cluster ---
    # center_y = median of (word_top + word_height/2) across all words in cluster
    # letter_h = median of word heights, but excluding outlier-height words
    # (>2× median) so that tall brackets/IPA don't skew the height
    cluster_info: List[Dict] = []
    for cl_words in line_clusters:
        centers = [w['top'] + w['height'] / 2 for w in cl_words]
        # Filter outlier heights for letter_h computation
        normal_heights = [w['height'] for w in cl_words
                          if w['height'] <= median_wh * 2.0]
        if not normal_heights:
            normal_heights = [w['height'] for w in cl_words]
        center_y = float(np.median(centers))
        letter_h = float(np.median(normal_heights))
        cluster_info.append({
            'center_y_rel': center_y,         # relative to content ROI
            'center_y_abs': center_y + top_y,  # absolute
            'letter_h': letter_h,
            'words': cl_words,
        })

    cluster_info.sort(key=lambda c: c['center_y_rel'])

    # --- Step B2: Merge clusters that are too close together ---
    # Even with center-based grouping, some edge cases can produce
    # spurious clusters. Merge any pair whose centers are closer
    # than 30% of the row height (they're definitely the same text line).
    merge_threshold = max(8, median_row_h * 0.3)
    merged: List[Dict] = [cluster_info[0]]
    for cl in cluster_info[1:]:
        prev = merged[-1]
        if cl['center_y_rel'] - prev['center_y_rel'] < merge_threshold:
            # Merge: combine words, recompute center
            combined_words = prev['words'] + cl['words']
            centers = [w['top'] + w['height'] / 2 for w in combined_words]
            normal_heights = [w['height'] for w in combined_words
                              if w['height'] <= median_wh * 2.0]
            if not normal_heights:
                normal_heights = [w['height'] for w in combined_words]
            prev['center_y_rel'] = float(np.median(centers))
            prev['center_y_abs'] = prev['center_y_rel'] + top_y
            prev['letter_h'] = float(np.median(normal_heights))
            prev['words'] = combined_words
        else:
            merged.append(cl)

    cluster_info = merged

    if len(cluster_info) < 3:
        return rows

    # --- Step C: Compute pitches and detect section breaks ---
    pitches: List[float] = []
    for i in range(1, len(cluster_info)):
        pitch = cluster_info[i]['center_y_rel'] - cluster_info[i - 1]['center_y_rel']
        pitches.append(pitch)

    if not pitches:
        return rows

    median_pitch = float(np.median(pitches))
    # A degenerate pitch (≤5px) means clustering failed; keep gap-based rows.
    if median_pitch <= 5:
        return rows

    # A section break is where the gap between line centers is much larger
    # than the normal pitch (sub-headings, section titles, etc.)
    BREAK_FACTOR = 1.8

    # --- Step D: Build sections (groups of consecutive lines with normal spacing) ---
    sections: List[List[Dict]] = []
    current_section: List[Dict] = [cluster_info[0]]

    for i in range(1, len(cluster_info)):
        gap = cluster_info[i]['center_y_rel'] - cluster_info[i - 1]['center_y_rel']
        if gap > median_pitch * BREAK_FACTOR:
            sections.append(current_section)
            current_section = [cluster_info[i]]
        else:
            current_section.append(cluster_info[i])

    if current_section:
        sections.append(current_section)

    # --- Step E: Build row boundaries per section ---
    grid_rows: List[RowGeometry] = []

    for section in sections:
        if not section:
            continue

        if len(section) == 1:
            # Single-line section (likely a heading): extend half a line
            # height in each direction around the center.
            cl = section[0]
            half_h = max(cl['letter_h'], median_pitch * 0.4)
            row_top = cl['center_y_abs'] - half_h
            row_bot = cl['center_y_abs'] + half_h
            grid_rows.append(RowGeometry(
                index=0,  # re-indexed in Step H
                x=left_x,
                y=round(row_top),
                width=content_w,
                height=round(row_bot - row_top),
                word_count=len(cl['words']),
                words=cl['words'],
                row_type='content',
                gap_before=0,
            ))
            continue

        # Compute local pitch for this section
        local_pitches = []
        for i in range(1, len(section)):
            local_pitches.append(
                section[i]['center_y_rel'] - section[i - 1]['center_y_rel']
            )
        local_pitch = float(np.median(local_pitches)) if local_pitches else median_pitch

        # Row boundaries are placed at midpoints between consecutive centers.
        # First row: top = center - local_pitch/2
        # Last row:  bottom = center + local_pitch/2
        for i, cl in enumerate(section):
            if i == 0:
                row_top = cl['center_y_abs'] - local_pitch / 2
            else:
                # Midpoint between this center and previous center
                prev_center = section[i - 1]['center_y_abs']
                row_top = (prev_center + cl['center_y_abs']) / 2

            if i == len(section) - 1:
                row_bot = cl['center_y_abs'] + local_pitch / 2
            else:
                next_center = section[i + 1]['center_y_abs']
                row_bot = (cl['center_y_abs'] + next_center) / 2

            # Clamp to reasonable bounds (inside the content area)
            row_top = max(top_y, row_top)
            row_bot = min(top_y + content_h, row_bot)

            # Skip slivers produced by clamping.
            if row_bot - row_top < 5:
                continue

            grid_rows.append(RowGeometry(
                index=0,  # re-indexed in Step H
                x=left_x,
                y=round(row_top),
                width=content_w,
                height=round(row_bot - row_top),
                word_count=len(cl['words']),
                words=cl['words'],
                row_type='content',
                gap_before=0,
            ))

    if not grid_rows:
        return rows

    # --- Step F: Re-assign words to grid rows ---
    # Words may have shifted slightly; assign each word to the row whose
    # center is closest to the word's vertical center.
    for gr in grid_rows:
        gr.words = []

    for w in content_words:
        # Word tops are ROI-relative; add top_y to compare against absolute rows.
        w_center = w['top'] + top_y + w['height'] / 2
        best_row = None
        best_dist = float('inf')
        for gr in grid_rows:
            row_center = gr.y + gr.height / 2
            dist = abs(w_center - row_center)
            if dist < best_dist:
                best_dist = dist
                best_row = gr
        # Only accept assignments within one line pitch — farther words are
        # left unplaced and count against the match ratio in Step G.
        if best_row is not None and best_dist < median_pitch:
            best_row.words.append(w)

    for gr in grid_rows:
        gr.word_count = len(gr.words)

    # --- Step G: Validate ---
    words_placed = sum(gr.word_count for gr in grid_rows)
    if len(content_words) > 0:
        match_ratio = words_placed / len(content_words)
        if match_ratio < 0.85:
            logger.info(f"RowGrid: word-center grid only matches {match_ratio:.0%} "
                        f"of words, keeping gap-based rows")
            return rows

    # Remove empty grid rows (no words assigned)
    grid_rows = [gr for gr in grid_rows if gr.word_count > 0]

    # --- Step H: Merge header/footer + re-index ---
    result = list(non_content) + grid_rows
    result.sort(key=lambda r: r.y)
    for i, r in enumerate(result):
        r.index = i

    row_heights = [gr.height for gr in grid_rows]
    min_h = min(row_heights) if row_heights else 0
    max_h = max(row_heights) if row_heights else 0
    logger.info(f"RowGrid: word-center grid applied "
                f"(median_pitch={median_pitch:.0f}px, median_row_h={median_row_h}px, median_wh={median_wh}px, "
                f"y_tol={y_tol}px, {len(line_clusters)} clusters→{len(cluster_info)} merged, "
                f"{len(sections)} sections, "
                f"{len(grid_rows)} grid rows [h={min_h}-{max_h}px], "
                f"was {len(content_rows)} gap-based rows)")

    return result
|
||
|
||
|
||
def _build_rows_from_word_grouping(
    word_dicts: List[Dict],
    left_x: int, right_x: int,
    top_y: int, bottom_y: int,
    content_w: int, content_h: int,
) -> List['RowGeometry']:
    """Fallback row builder: cluster words into lines by vertical position.

    Delegates the actual clustering to _group_words_into_lines() with a
    generous Y tolerance. Unlike the primary projection-based path, this
    mode performs no header/footer detection — every row is 'content'.
    """
    if not word_dicts:
        return []

    # Tolerance scales with page height but never drops below 20px.
    tolerance_px = max(20, content_h // 100)
    grouped = _group_words_into_lines(word_dicts, y_tolerance_px=tolerance_px)

    rows: List[RowGeometry] = []
    for line_idx, words_in_line in enumerate(grouped):
        if not words_in_line:
            continue

        # Row extent = vertical bounding box of every word on this line.
        tops = [w['top'] for w in words_in_line]
        bottoms = [w['top'] + w['height'] for w in words_in_line]
        line_top = min(tops)

        rows.append(RowGeometry(
            index=line_idx,
            x=left_x,
            y=top_y + line_top,
            width=content_w,
            height=max(bottoms) - line_top,
            word_count=len(words_in_line),
            words=words_in_line,
            row_type='content',
            gap_before=0,
        ))

    logger.info(f"RowGeometry (fallback): {len(rows)} rows from word grouping")
    return rows
|
||
|
||
|
||
# --- Phase B: Content-Based Classification ---
|
||
|
||
def _score_language(words: List[Dict]) -> Dict[str, float]:
|
||
"""Score the language of a column's words.
|
||
|
||
Analyzes function words, umlauts, and capitalization patterns
|
||
to determine whether text is English or German.
|
||
|
||
Args:
|
||
words: List of word dicts with 'text' and 'conf' keys.
|
||
|
||
Returns:
|
||
Dict with 'eng' and 'deu' scores (0.0-1.0).
|
||
"""
|
||
if not words:
|
||
return {'eng': 0.0, 'deu': 0.0}
|
||
|
||
# Only consider words with decent confidence
|
||
good_words = [w['text'].lower() for w in words if w.get('conf', 0) > 40 and len(w['text']) > 0]
|
||
if not good_words:
|
||
return {'eng': 0.0, 'deu': 0.0}
|
||
|
||
total = len(good_words)
|
||
en_hits = sum(1 for w in good_words if w in ENGLISH_FUNCTION_WORDS)
|
||
de_hits = sum(1 for w in good_words if w in GERMAN_FUNCTION_WORDS)
|
||
|
||
# Check for umlauts (strong German signal)
|
||
raw_texts = [w['text'] for w in words if w.get('conf', 0) > 40]
|
||
umlaut_count = sum(1 for t in raw_texts
|
||
for c in t if c in 'äöüÄÖÜß')
|
||
|
||
# German capitalization: nouns are capitalized mid-sentence
|
||
# Count words that start with uppercase but aren't at position 0
|
||
cap_words = sum(1 for t in raw_texts if t[0].isupper() and len(t) > 2)
|
||
|
||
en_score = en_hits / total if total > 0 else 0.0
|
||
de_score = de_hits / total if total > 0 else 0.0
|
||
|
||
# Boost German score for umlauts
|
||
if umlaut_count > 0:
|
||
de_score = min(1.0, de_score + 0.15 * min(umlaut_count, 5))
|
||
|
||
# Boost German score for high capitalization ratio (typical for German nouns)
|
||
if total > 5:
|
||
cap_ratio = cap_words / total
|
||
if cap_ratio > 0.3:
|
||
de_score = min(1.0, de_score + 0.1)
|
||
|
||
return {'eng': round(en_score, 3), 'deu': round(de_score, 3)}
|
||
|
||
|
||
def _score_role(geom: ColumnGeometry) -> Dict[str, float]:
    """Score how well a column matches each structural role.

    Combines the column's geometry (width ratio, word count, index)
    with simple content statistics (word length, punctuation, digits).

    Args:
        geom: ColumnGeometry with words and dimensions.

    Returns:
        Dict with role scores: 'reference', 'marker', 'sentence', 'vocabulary'.
    """
    scores = {'reference': 0.0, 'marker': 0.0, 'sentence': 0.0, 'vocabulary': 0.0}

    if not geom.words:
        return scores

    confident_texts = [w['text'] for w in geom.words if w.get('conf', 0) > 40]
    if not confident_texts:
        return scores

    n_texts = len(confident_texts)
    mean_len = sum(len(t) for t in confident_texts) / n_texts
    punct_words = sum(1 for t in confident_texts if any(c in t for c in '.!?;:,'))
    numeric_words = sum(1 for t in confident_texts if any(c.isdigit() for c in t))
    numeric_ratio = numeric_words / n_texts if confident_texts else 0.0

    # Reference column: narrow, dominated by digits (page numbers).
    if geom.width_ratio < 0.12:
        scores['reference'] = 0.5
        if numeric_ratio > 0.4:
            scores['reference'] = min(1.0, 0.5 + numeric_ratio * 0.5)

    # Marker column: very narrow with only a handful of short entries.
    if geom.width_ratio < 0.06 and geom.word_count <= 15:
        scores['marker'] = 0.7
        if mean_len < 4:
            scores['marker'] = 0.9
    # An extremely narrow non-edge column is a strong marker
    # regardless of how many words it holds.
    if geom.width_ratio < 0.04 and geom.index > 0:
        scores['marker'] = max(scores['marker'], 0.9)

    # Sentence column: wide, visible punctuation, longer words.
    if geom.width_ratio > 0.15 and punct_words > 2:
        scores['sentence'] = 0.3 + min(0.5, punct_words / n_texts)
        if mean_len > 4:
            scores['sentence'] = min(1.0, scores['sentence'] + 0.2)

    # Vocabulary column: medium width with medium-length words.
    if 0.10 < geom.width_ratio < 0.45:
        scores['vocabulary'] = 0.4
        if 3 < mean_len < 8:
            scores['vocabulary'] = min(1.0, scores['vocabulary'] + 0.3)

    return {k: round(v, 3) for k, v in scores.items()}
|
||
|
||
|
||
def _build_margin_regions(
    all_regions: List[PageRegion],
    left_x: int,
    right_x: int,
    img_w: int,
    top_y: int,
    content_h: int,
) -> List[PageRegion]:
    """Derive margin_left / margin_right PageRegions from content bounds.

    A margin is the strip between an image edge and the outermost content
    column. Downstream code uses these regions for faithful page
    reconstruction; OCR skips them entirely.
    """
    min_gap_px = 5  # strips narrower than this are not worth a region
    margins: List[PageRegion] = []

    # Left margin: image edge → first content column.
    if left_x > min_gap_px:
        margins.append(PageRegion(
            type='margin_left', x=0, y=top_y,
            width=left_x, height=content_h,
            classification_confidence=1.0,
            classification_method='content_bounds',
        ))

    # Right margin: end of the rightmost non-structural region → image edge.
    structural_types = ('margin_left', 'margin_right', 'header', 'footer',
                        'margin_top', 'margin_bottom')
    column_regions = [r for r in all_regions if r.type not in structural_types]
    if column_regions:
        rightmost_edge = max(r.x + r.width for r in column_regions)
    else:
        rightmost_edge = right_x
    if img_w - rightmost_edge > min_gap_px:
        margins.append(PageRegion(
            type='margin_right', x=rightmost_edge, y=top_y,
            width=img_w - rightmost_edge, height=content_h,
            classification_confidence=1.0,
            classification_method='content_bounds',
        ))

    if margins:
        logger.info(f"Margins: {[(m.type, m.x, m.width) for m in margins]} "
                    f"(left_x={left_x}, right_x={right_x}, img_w={img_w})")

    return margins
|
||
|
||
|
||
def positional_column_regions(
    geometries: List[ColumnGeometry],
    content_w: int,
    content_h: int,
    left_x: int,
) -> List[PageRegion]:
    """Classify columns by position only (no language scoring).

    Structural columns (page_ref, column_marker) are identified by geometry.
    Remaining content columns are labelled left→right as column_en, column_de,
    column_example. The names are purely positional – no language analysis.

    Args:
        geometries: Detected columns, in the order produced upstream.
        content_w: Content-area width (denominator for relative X position).
        content_h: Content-area height; every output region spans it.
        left_x: Left content bound, used to make column X positions relative.

    Returns:
        List of PageRegion (structural first, then labelled content columns).
    """
    structural: List[PageRegion] = []
    content_cols: List[ColumnGeometry] = []

    # NOTE: the elif order below is significant — the predicates overlap
    # (e.g. a very narrow, nearly empty column matches both marker rules),
    # and the first matching branch decides type and confidence.
    for g in geometries:
        rel_x = g.x - left_x
        # page_ref: narrow column in the leftmost 20% region
        # (guards against content_w == 0 by treating rel position as 0)
        if g.width_ratio < 0.12 and (rel_x / content_w if content_w else 0) < 0.20:
            structural.append(PageRegion(
                type='page_ref', x=g.x, y=g.y,
                width=g.width, height=content_h,
                classification_confidence=0.95,
                classification_method='positional',
            ))
        # column_marker: very narrow, few words
        elif g.width_ratio < 0.06 and g.word_count <= 15:
            structural.append(PageRegion(
                type='column_marker', x=g.x, y=g.y,
                width=g.width, height=content_h,
                classification_confidence=0.95,
                classification_method='positional',
            ))
        # empty or near-empty narrow column → treat as margin/structural
        # (lower confidence than the two rules above)
        elif g.word_count <= 2 and g.width_ratio < 0.15:
            structural.append(PageRegion(
                type='column_marker', x=g.x, y=g.y,
                width=g.width, height=content_h,
                classification_confidence=0.85,
                classification_method='positional',
            ))
        else:
            content_cols.append(g)

    # Single content column → plain text page
    if len(content_cols) == 1:
        g = content_cols[0]
        return structural + [PageRegion(
            type='column_text', x=g.x, y=g.y,
            width=g.width, height=content_h,
            classification_confidence=0.9,
            classification_method='positional',
        )]

    # No content columns
    if not content_cols:
        return structural

    # Sort content columns left→right and assign positional labels
    content_cols.sort(key=lambda g: g.x)

    # With exactly 2 content columns: if the left one is very wide (>35%),
    # it likely contains EN+DE combined, so the right one is examples.
    if (len(content_cols) == 2
            and content_cols[0].width_ratio > 0.35
            and content_cols[1].width_ratio > 0.20):
        labels = ['column_en', 'column_example']
    else:
        labels = ['column_en', 'column_de', 'column_example']

    regions = list(structural)
    for i, g in enumerate(content_cols):
        # Columns beyond the label list all become extra example columns.
        label = labels[i] if i < len(labels) else 'column_example'
        regions.append(PageRegion(
            type=label, x=g.x, y=g.y,
            width=g.width, height=content_h,
            classification_confidence=0.95,
            classification_method='positional',
        ))

    logger.info(f"PositionalColumns: {len(structural)} structural, "
                f"{len(content_cols)} content → "
                f"{[r.type for r in regions]}")
    return regions
|
||
|
||
|
||
def classify_column_types(geometries: List[ColumnGeometry],
                          content_w: int,
                          top_y: int,
                          img_w: int,
                          img_h: int,
                          bottom_y: int,
                          left_x: int = 0,
                          right_x: int = 0,
                          inv: Optional[np.ndarray] = None) -> List[PageRegion]:
    """Classify column types using a 3-level fallback chain.

    Level 1: Content-based (language + role scoring)
    Level 2: Position + language (old rules enhanced with language detection)
    Level 3: Pure position (exact old code, no regression)

    Args:
        geometries: List of ColumnGeometry from Phase A.
        content_w: Total content width.
        top_y: Top Y of content area.
        img_w: Full image width.
        img_h: Full image height.
        bottom_y: Bottom Y of content area.
        left_x: Left content bound (from _find_content_bounds).
        right_x: Right content bound (from _find_content_bounds).
        inv: Optional inverted binary image, forwarded to _add_header_footer.

    Returns:
        List of PageRegion with types, confidence, and method
        (always includes margin regions appended by _with_margins).
    """
    content_h = bottom_y - top_y

    def _with_margins(result: List[PageRegion]) -> List[PageRegion]:
        """Append margin_left / margin_right regions to *result*."""
        margins = _build_margin_regions(result, left_x, right_x, img_w, top_y, content_h)
        return result + margins

    # Special case: single column → plain text page
    if len(geometries) == 1:
        geom = geometries[0]
        return _with_margins([PageRegion(
            type='column_text', x=geom.x, y=geom.y,
            width=geom.width, height=geom.height,
            classification_confidence=0.9,
            classification_method='content',
        )])

    # --- Pre-filter: first/last columns with very few words → column_ignore ---
    # Sub-columns from _detect_sub_columns() are exempt: they intentionally
    # have few words (page refs, markers) and should not be discarded.
    ignore_regions = []
    active_geometries = []
    for idx, g in enumerate(geometries):
        if (idx == 0 or idx == len(geometries) - 1) and g.word_count < 8 and not g.is_sub_column:
            ignore_regions.append(PageRegion(
                type='column_ignore', x=g.x, y=g.y,
                width=g.width, height=content_h,
                classification_confidence=0.95,
                classification_method='content',
            ))
            logger.info(f"ClassifyColumns: column {idx} (x={g.x}, words={g.word_count}) → column_ignore (edge, few words)")
        else:
            active_geometries.append(g)

    # Re-index active geometries for classification.
    # NOTE: this mutates the ColumnGeometry objects' .index in place.
    for new_idx, g in enumerate(active_geometries):
        g.index = new_idx
    geometries = active_geometries

    # Handle edge case: all columns ignored or only 1 left
    if len(geometries) == 0:
        return _with_margins(ignore_regions)
    if len(geometries) == 1:
        geom = geometries[0]
        ignore_regions.append(PageRegion(
            type='column_text', x=geom.x, y=geom.y,
            width=geom.width, height=geom.height,
            classification_confidence=0.9,
            classification_method='content',
        ))
        return _with_margins(ignore_regions)

    # --- Score all columns ---
    lang_scores = [_score_language(g.words) for g in geometries]
    role_scores = [_score_role(g) for g in geometries]

    logger.info(f"ClassifyColumns: language scores: "
                f"{[(g.index, ls) for g, ls in zip(geometries, lang_scores)]}")
    logger.info(f"ClassifyColumns: role scores: "
                f"{[(g.index, rs) for g, rs in zip(geometries, role_scores)]}")

    # --- Level 1: Content-based classification ---
    # Each level returns None on failure, falling through to the next.
    regions = _classify_by_content(geometries, lang_scores, role_scores, content_w, content_h)
    if regions is not None:
        logger.info("ClassifyColumns: Level 1 (content-based) succeeded")
        _add_header_footer(regions, top_y, bottom_y, img_w, img_h, inv=inv)
        return _with_margins(ignore_regions + regions)

    # --- Level 2: Position + language enhanced ---
    regions = _classify_by_position_enhanced(geometries, lang_scores, content_w, content_h)
    if regions is not None:
        logger.info("ClassifyColumns: Level 2 (position+language) succeeded")
        _add_header_footer(regions, top_y, bottom_y, img_w, img_h, inv=inv)
        return _with_margins(ignore_regions + regions)

    # --- Level 3: Pure position fallback (old code, no regression) ---
    logger.info("ClassifyColumns: Level 3 (position fallback)")
    regions = _classify_by_position_fallback(geometries, content_w, content_h)
    _add_header_footer(regions, top_y, bottom_y, img_w, img_h, inv=inv)
    return _with_margins(ignore_regions + regions)
|
||
|
||
|
||
def _classify_by_content(geometries: List[ColumnGeometry],
                         lang_scores: List[Dict[str, float]],
                         role_scores: List[Dict[str, float]],
                         content_w: int,
                         content_h: int) -> Optional[List[PageRegion]]:
    """Level 1: Classify columns purely by content analysis.

    Requires clear language signals to distinguish EN/DE columns.
    Returns None if language signals are too weak.

    Args:
        geometries: Active columns (parallel to the two score lists).
        lang_scores: Per-column {'eng': ..., 'deu': ...} from _score_language.
        role_scores: Per-column role scores from _score_role.
        content_w: Content-area width (for the left-20% page_ref zone).
        content_h: Content-area height; every output region spans it.

    Returns:
        Sorted (by x) list of PageRegion, or None when classification
        must fall back to the next level.
    """
    regions = []
    assigned = set()

    # Step 1: Assign structural roles first (reference, marker)
    # left_20_threshold: only the leftmost ~20% of content area qualifies for page_ref
    left_20_threshold = geometries[0].x + content_w * 0.20 if geometries else 0

    for i, (geom, rs, ls) in enumerate(zip(geometries, role_scores, lang_scores)):
        is_left_side = geom.x < left_20_threshold
        # A strong language signal disqualifies a column from page_ref —
        # page-reference columns contain numbers, not prose.
        has_strong_language = ls['eng'] > 0.3 or ls['deu'] > 0.3
        if rs['reference'] >= 0.5 and geom.width_ratio < 0.12 and is_left_side and not has_strong_language:
            regions.append(PageRegion(
                type='page_ref', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=rs['reference'],
                classification_method='content',
            ))
            assigned.add(i)
        elif rs['marker'] >= 0.7 and geom.width_ratio < 0.06:
            regions.append(PageRegion(
                type='column_marker', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=rs['marker'],
                classification_method='content',
            ))
            assigned.add(i)
        elif geom.width_ratio < 0.05 and not is_left_side:
            # Narrow column on the right side → marker, not page_ref
            regions.append(PageRegion(
                type='column_marker', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=0.8,
                classification_method='content',
            ))
            assigned.add(i)

    # Step 2: Among remaining columns, find EN and DE by language scores
    remaining = [(i, geometries[i], lang_scores[i], role_scores[i])
                 for i in range(len(geometries)) if i not in assigned]

    if len(remaining) < 2:
        # Not enough columns for EN/DE pair — label the lone leftover
        # (if any) as generic text and return what we have.
        if len(remaining) == 1:
            i, geom, ls, rs = remaining[0]
            regions.append(PageRegion(
                type='column_text', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=0.6,
                classification_method='content',
            ))
        regions.sort(key=lambda r: r.x)
        return regions

    # Check if we have enough language signal
    en_candidates = [(i, g, ls) for i, g, ls, rs in remaining if ls['eng'] > ls['deu'] and ls['eng'] > 0.05]
    de_candidates = [(i, g, ls) for i, g, ls, rs in remaining if ls['deu'] > ls['eng'] and ls['deu'] > 0.05]

    # Position tiebreaker: when language signals are weak, use left=EN, right=DE
    if (not en_candidates or not de_candidates) and len(remaining) >= 2:
        max_eng = max(ls['eng'] for _, _, ls, _ in remaining)
        max_deu = max(ls['deu'] for _, _, ls, _ in remaining)
        if max_eng < 0.15 and max_deu < 0.15:
            # Both signals weak — fall back to positional: left=EN, right=DE
            sorted_remaining = sorted(remaining, key=lambda x: x[1].x)
            best_en = (sorted_remaining[0][0], sorted_remaining[0][1], sorted_remaining[0][2])
            best_de = (sorted_remaining[1][0], sorted_remaining[1][1], sorted_remaining[1][2])
            logger.info("ClassifyColumns: Level 1 using position tiebreaker (weak signals) - left=EN, right=DE")
            # Low confidence: assignment is positional, not evidence-based.
            en_conf = 0.4
            de_conf = 0.4

            regions.append(PageRegion(
                type='column_en', x=best_en[1].x, y=best_en[1].y,
                width=best_en[1].width, height=content_h,
                classification_confidence=en_conf,
                classification_method='content',
            ))
            assigned.add(best_en[0])

            regions.append(PageRegion(
                type='column_de', x=best_de[1].x, y=best_de[1].y,
                width=best_de[1].width, height=content_h,
                classification_confidence=de_conf,
                classification_method='content',
            ))
            assigned.add(best_de[0])

            # Assign remaining as example
            for i, geom, ls, rs in remaining:
                if i not in assigned:
                    regions.append(PageRegion(
                        type='column_example', x=geom.x, y=geom.y,
                        width=geom.width, height=content_h,
                        classification_confidence=0.4,
                        classification_method='content',
                    ))
            regions.sort(key=lambda r: r.x)
            return regions

    if not en_candidates or not de_candidates:
        # Language signals too weak for content-based classification
        logger.info("ClassifyColumns: Level 1 failed - no clear EN/DE language split")
        return None

    # Pick the best EN and DE candidates
    best_en = max(en_candidates, key=lambda x: x[2]['eng'])
    best_de = max(de_candidates, key=lambda x: x[2]['deu'])

    # Position-aware EN selection: in typical textbooks the layout is EN | DE | Example.
    # Example sentences contain English function words ("the", "a", "is") which inflate
    # the eng score of the Example column. When the best EN candidate sits to the RIGHT
    # of the DE column and there is another EN candidate to the LEFT, prefer the left one
    # — it is almost certainly the real vocabulary column.
    if best_de[2]['deu'] > 0.5 and best_en[1].x > best_de[1].x and len(en_candidates) > 1:
        left_of_de = [c for c in en_candidates if c[1].x < best_de[1].x]
        if left_of_de:
            alt_en = max(left_of_de, key=lambda x: x[2]['eng'])
            logger.info(
                f"ClassifyColumns: Level 1 position fix — best EN col {best_en[0]} "
                f"(eng={best_en[2]['eng']:.3f}) is right of DE col {best_de[0]}; "
                f"preferring left col {alt_en[0]} (eng={alt_en[2]['eng']:.3f})")
            best_en = alt_en

    if best_en[0] == best_de[0]:
        # Same column scored highest for both — ambiguous
        logger.info("ClassifyColumns: Level 1 failed - same column highest for EN and DE")
        return None

    # Confidence = the raw language score of the winning candidate.
    en_conf = best_en[2]['eng']
    de_conf = best_de[2]['deu']

    regions.append(PageRegion(
        type='column_en', x=best_en[1].x, y=best_en[1].y,
        width=best_en[1].width, height=content_h,
        classification_confidence=round(en_conf, 2),
        classification_method='content',
    ))
    assigned.add(best_en[0])

    regions.append(PageRegion(
        type='column_de', x=best_de[1].x, y=best_de[1].y,
        width=best_de[1].width, height=content_h,
        classification_confidence=round(de_conf, 2),
        classification_method='content',
    ))
    assigned.add(best_de[0])

    # Step 3: Remaining columns → example or text based on role scores.
    # (Both branches currently emit 'column_example'; only the confidence
    # differs depending on the sentence score.)
    for i, geom, ls, rs in remaining:
        if i in assigned:
            continue
        if rs['sentence'] > 0.4:
            regions.append(PageRegion(
                type='column_example', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=round(rs['sentence'], 2),
                classification_method='content',
            ))
        else:
            regions.append(PageRegion(
                type='column_example', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=0.5,
                classification_method='content',
            ))

    regions.sort(key=lambda r: r.x)
    return regions
|
||
|
||
|
||
def _classify_by_position_enhanced(geometries: List[ColumnGeometry],
|
||
lang_scores: List[Dict[str, float]],
|
||
content_w: int,
|
||
content_h: int) -> Optional[List[PageRegion]]:
|
||
"""Level 2: Position-based rules enhanced with language confirmation.
|
||
|
||
Uses the old positional heuristics but confirms EN/DE assignment
|
||
with language scores (swapping if needed).
|
||
"""
|
||
regions = []
|
||
untyped = list(range(len(geometries)))
|
||
first_x = geometries[0].x if geometries else 0
|
||
left_20_threshold = first_x + content_w * 0.20
|
||
|
||
# Rule 1: Leftmost narrow column → page_ref (only if in left 20%, no strong language)
|
||
g0 = geometries[0]
|
||
ls0 = lang_scores[0]
|
||
has_strong_lang_0 = ls0['eng'] > 0.3 or ls0['deu'] > 0.3
|
||
if g0.width_ratio < 0.12 and g0.x < left_20_threshold and not has_strong_lang_0:
|
||
regions.append(PageRegion(
|
||
type='page_ref', x=g0.x, y=g0.y,
|
||
width=g0.width, height=content_h,
|
||
classification_confidence=0.8,
|
||
classification_method='position_enhanced',
|
||
))
|
||
untyped.remove(0)
|
||
|
||
# Rule 2: Narrow columns with few words → marker
|
||
for i in list(untyped):
|
||
geom = geometries[i]
|
||
if geom.width_ratio < 0.06 and geom.word_count <= 15:
|
||
regions.append(PageRegion(
|
||
type='column_marker', x=geom.x, y=geom.y,
|
||
width=geom.width, height=content_h,
|
||
classification_confidence=0.7,
|
||
classification_method='position_enhanced',
|
||
))
|
||
untyped.remove(i)
|
||
|
||
# Rule 3: Rightmost remaining → column_example (if 3+ remaining)
|
||
if len(untyped) >= 3:
|
||
last_idx = untyped[-1]
|
||
geom = geometries[last_idx]
|
||
regions.append(PageRegion(
|
||
type='column_example', x=geom.x, y=geom.y,
|
||
width=geom.width, height=content_h,
|
||
classification_confidence=0.7,
|
||
classification_method='position_enhanced',
|
||
))
|
||
untyped.remove(last_idx)
|
||
|
||
# Rule 4: First two remaining → EN/DE, but check language to possibly swap
|
||
if len(untyped) >= 2:
|
||
idx_a = untyped[0]
|
||
idx_b = untyped[1]
|
||
ls_a = lang_scores[idx_a]
|
||
ls_b = lang_scores[idx_b]
|
||
|
||
# Default: first=EN, second=DE (old behavior)
|
||
en_idx, de_idx = idx_a, idx_b
|
||
conf = 0.7
|
||
|
||
# Swap if language signals clearly indicate the opposite
|
||
if ls_a['deu'] > ls_a['eng'] and ls_b['eng'] > ls_b['deu']:
|
||
en_idx, de_idx = idx_b, idx_a
|
||
conf = 0.85
|
||
logger.info(f"ClassifyColumns: Level 2 swapped EN/DE based on language scores")
|
||
|
||
regions.append(PageRegion(
|
||
type='column_en', x=geometries[en_idx].x, y=geometries[en_idx].y,
|
||
width=geometries[en_idx].width, height=content_h,
|
||
classification_confidence=conf,
|
||
classification_method='position_enhanced',
|
||
))
|
||
regions.append(PageRegion(
|
||
type='column_de', x=geometries[de_idx].x, y=geometries[de_idx].y,
|
||
width=geometries[de_idx].width, height=content_h,
|
||
classification_confidence=conf,
|
||
classification_method='position_enhanced',
|
||
))
|
||
untyped = untyped[2:]
|
||
elif len(untyped) == 1:
|
||
idx = untyped[0]
|
||
geom = geometries[idx]
|
||
regions.append(PageRegion(
|
||
type='column_en', x=geom.x, y=geom.y,
|
||
width=geom.width, height=content_h,
|
||
classification_confidence=0.5,
|
||
classification_method='position_enhanced',
|
||
))
|
||
untyped = []
|
||
|
||
# Remaining → example
|
||
for idx in untyped:
|
||
geom = geometries[idx]
|
||
regions.append(PageRegion(
|
||
type='column_example', x=geom.x, y=geom.y,
|
||
width=geom.width, height=content_h,
|
||
classification_confidence=0.5,
|
||
classification_method='position_enhanced',
|
||
))
|
||
|
||
regions.sort(key=lambda r: r.x)
|
||
return regions
|
||
|
||
|
||
def _classify_by_position_fallback(geometries: List[ColumnGeometry],
|
||
content_w: int,
|
||
content_h: int) -> List[PageRegion]:
|
||
"""Level 3: Pure position-based fallback (identical to old code).
|
||
|
||
Guarantees no regression from the previous behavior.
|
||
"""
|
||
regions = []
|
||
untyped = list(range(len(geometries)))
|
||
first_x = geometries[0].x if geometries else 0
|
||
left_20_threshold = first_x + content_w * 0.20
|
||
|
||
# Rule 1: Leftmost narrow column → page_ref (only if in left 20%)
|
||
g0 = geometries[0]
|
||
if g0.width_ratio < 0.12 and g0.x < left_20_threshold:
|
||
regions.append(PageRegion(
|
||
type='page_ref', x=g0.x, y=g0.y,
|
||
width=g0.width, height=content_h,
|
||
classification_confidence=1.0,
|
||
classification_method='position_fallback',
|
||
))
|
||
untyped.remove(0)
|
||
|
||
# Rule 2: Narrow + few words → marker
|
||
for i in list(untyped):
|
||
geom = geometries[i]
|
||
if geom.width_ratio < 0.06 and geom.word_count <= 15:
|
||
regions.append(PageRegion(
|
||
type='column_marker', x=geom.x, y=geom.y,
|
||
width=geom.width, height=content_h,
|
||
classification_confidence=1.0,
|
||
classification_method='position_fallback',
|
||
))
|
||
untyped.remove(i)
|
||
|
||
# Rule 3: Rightmost remaining → example (if 3+)
|
||
if len(untyped) >= 3:
|
||
last_idx = untyped[-1]
|
||
geom = geometries[last_idx]
|
||
regions.append(PageRegion(
|
||
type='column_example', x=geom.x, y=geom.y,
|
||
width=geom.width, height=content_h,
|
||
classification_confidence=1.0,
|
||
classification_method='position_fallback',
|
||
))
|
||
untyped.remove(last_idx)
|
||
|
||
# Rule 4: First remaining → EN, second → DE
|
||
if len(untyped) >= 2:
|
||
en_idx = untyped[0]
|
||
de_idx = untyped[1]
|
||
regions.append(PageRegion(
|
||
type='column_en', x=geometries[en_idx].x, y=geometries[en_idx].y,
|
||
width=geometries[en_idx].width, height=content_h,
|
||
classification_confidence=1.0,
|
||
classification_method='position_fallback',
|
||
))
|
||
regions.append(PageRegion(
|
||
type='column_de', x=geometries[de_idx].x, y=geometries[de_idx].y,
|
||
width=geometries[de_idx].width, height=content_h,
|
||
classification_confidence=1.0,
|
||
classification_method='position_fallback',
|
||
))
|
||
untyped = untyped[2:]
|
||
elif len(untyped) == 1:
|
||
idx = untyped[0]
|
||
geom = geometries[idx]
|
||
regions.append(PageRegion(
|
||
type='column_en', x=geom.x, y=geom.y,
|
||
width=geom.width, height=content_h,
|
||
classification_confidence=1.0,
|
||
classification_method='position_fallback',
|
||
))
|
||
untyped = []
|
||
|
||
for idx in untyped:
|
||
geom = geometries[idx]
|
||
regions.append(PageRegion(
|
||
type='column_example', x=geom.x, y=geom.y,
|
||
width=geom.width, height=content_h,
|
||
classification_confidence=1.0,
|
||
classification_method='position_fallback',
|
||
))
|
||
|
||
regions.sort(key=lambda r: r.x)
|
||
return regions
|
||
|
||
|
||
def _detect_header_footer_gaps(
|
||
inv: np.ndarray,
|
||
img_w: int,
|
||
img_h: int,
|
||
) -> Tuple[Optional[int], Optional[int]]:
|
||
"""Detect header/footer boundaries via horizontal projection gap analysis.
|
||
|
||
Scans the full-page inverted image for large horizontal gaps in the top/bottom
|
||
20% that separate header/footer content from the main body.
|
||
|
||
Returns:
|
||
(header_y, footer_y) — absolute y-coordinates.
|
||
header_y = bottom edge of header region (None if no header detected).
|
||
footer_y = top edge of footer region (None if no footer detected).
|
||
"""
|
||
HEADER_FOOTER_ZONE = 0.20
|
||
GAP_MULTIPLIER = 2.0
|
||
|
||
# Step 1: Horizontal projection — clamp to img_h to avoid dewarp padding
|
||
actual_h = min(inv.shape[0], img_h)
|
||
roi = inv[:actual_h, :]
|
||
h_proj = np.sum(roi, axis=1).astype(float)
|
||
proj_w = roi.shape[1]
|
||
h_proj_norm = h_proj / (proj_w * 255) if proj_w > 0 else h_proj
|
||
|
||
# Step 2: Smoothing
|
||
kernel_size = max(3, actual_h // 200)
|
||
if kernel_size % 2 == 0:
|
||
kernel_size += 1
|
||
h_smooth = np.convolve(h_proj_norm, np.ones(kernel_size) / kernel_size, mode='same')
|
||
|
||
# Step 3: Gap threshold
|
||
positive = h_smooth[h_smooth > 0]
|
||
median_density = float(np.median(positive)) if len(positive) > 0 else 0.01
|
||
gap_threshold = max(median_density * 0.15, 0.003)
|
||
|
||
in_gap = h_smooth < gap_threshold
|
||
MIN_GAP_HEIGHT = max(3, actual_h // 500)
|
||
|
||
# Step 4: Collect contiguous gaps
|
||
raw_gaps: List[Tuple[int, int]] = []
|
||
gap_start: Optional[int] = None
|
||
for y in range(len(in_gap)):
|
||
if in_gap[y]:
|
||
if gap_start is None:
|
||
gap_start = y
|
||
else:
|
||
if gap_start is not None:
|
||
gap_height = y - gap_start
|
||
if gap_height >= MIN_GAP_HEIGHT:
|
||
raw_gaps.append((gap_start, y))
|
||
gap_start = None
|
||
if gap_start is not None:
|
||
gap_height = len(in_gap) - gap_start
|
||
if gap_height >= MIN_GAP_HEIGHT:
|
||
raw_gaps.append((gap_start, len(in_gap)))
|
||
|
||
if not raw_gaps:
|
||
return None, None
|
||
|
||
# Step 5: Compute median gap size and large-gap threshold
|
||
gap_sizes = [g[1] - g[0] for g in raw_gaps]
|
||
median_gap = float(np.median(gap_sizes))
|
||
large_gap_threshold = median_gap * GAP_MULTIPLIER
|
||
|
||
# Step 6: Find largest qualifying gap in header / footer zones
|
||
# A separator gap must have content on BOTH sides — edge-touching gaps
|
||
# (e.g. dewarp padding at bottom) are not valid separators.
|
||
EDGE_MARGIN = max(5, actual_h // 400)
|
||
header_zone_limit = int(actual_h * HEADER_FOOTER_ZONE)
|
||
footer_zone_start = int(actual_h * (1.0 - HEADER_FOOTER_ZONE))
|
||
|
||
header_y: Optional[int] = None
|
||
footer_y: Optional[int] = None
|
||
|
||
best_header_size = 0
|
||
for gs, ge in raw_gaps:
|
||
if gs <= EDGE_MARGIN:
|
||
continue # skip gaps touching the top edge
|
||
gap_mid = (gs + ge) / 2
|
||
gap_size = ge - gs
|
||
if gap_mid < header_zone_limit and gap_size > large_gap_threshold:
|
||
if gap_size > best_header_size:
|
||
best_header_size = gap_size
|
||
header_y = ge # bottom edge of gap
|
||
|
||
best_footer_size = 0
|
||
for gs, ge in raw_gaps:
|
||
if ge >= actual_h - EDGE_MARGIN:
|
||
continue # skip gaps touching the bottom edge
|
||
gap_mid = (gs + ge) / 2
|
||
gap_size = ge - gs
|
||
if gap_mid > footer_zone_start and gap_size > large_gap_threshold:
|
||
if gap_size > best_footer_size:
|
||
best_footer_size = gap_size
|
||
footer_y = gs # top edge of gap
|
||
|
||
if header_y is not None:
|
||
logger.info(f"HeaderFooterGaps: header boundary at y={header_y} "
|
||
f"(gap={best_header_size}px, median_gap={median_gap:.0f}px)")
|
||
if footer_y is not None:
|
||
logger.info(f"HeaderFooterGaps: footer boundary at y={footer_y} "
|
||
f"(gap={best_footer_size}px, median_gap={median_gap:.0f}px)")
|
||
|
||
return header_y, footer_y
|
||
|
||
|
||
def _region_has_content(inv: np.ndarray, y_start: int, y_end: int,
|
||
min_density: float = 0.005) -> bool:
|
||
"""Check whether a horizontal strip contains meaningful ink.
|
||
|
||
Args:
|
||
inv: Inverted binarized image (white-on-black).
|
||
y_start: Top of the region (inclusive).
|
||
y_end: Bottom of the region (exclusive).
|
||
min_density: Fraction of white pixels required to count as content.
|
||
|
||
Returns:
|
||
True if the region contains text/graphics, False if empty margin.
|
||
"""
|
||
if y_start >= y_end:
|
||
return False
|
||
strip = inv[y_start:y_end, :]
|
||
density = float(np.sum(strip)) / (strip.shape[0] * strip.shape[1] * 255)
|
||
return density > min_density
|
||
|
||
|
||
def _add_header_footer(regions: List[PageRegion], top_y: int, bottom_y: int,
                       img_w: int, img_h: int,
                       inv: Optional[np.ndarray] = None) -> None:
    """Add header/footer/margin regions in-place.

    Uses gap-based detection when *inv* is provided, otherwise falls back
    to simple top_y/bottom_y bounds.

    Region types depend on whether there is actual content (text/graphics):
    - 'header' / 'footer' — region contains text (e.g. title, page number)
    - 'margin_top' / 'margin_bottom' — region is empty page margin
    """
    gap_header: Optional[int] = None
    gap_footer: Optional[int] = None
    if inv is not None:
        gap_header, gap_footer = _detect_header_footer_gaps(inv, img_w, img_h)

    # --- Top region ---
    # Prefer the gap-detected boundary; fall back to top_y; require > 10px.
    if gap_header is not None and gap_header > 10:
        top_boundary: Optional[int] = gap_header
    elif top_y > 10:
        top_boundary = top_y
    else:
        top_boundary = None
    if top_boundary is not None:
        has_content = inv is not None and _region_has_content(inv, 0, top_boundary)
        rtype = 'header' if has_content else 'margin_top'
        regions.append(PageRegion(type=rtype, x=0, y=0, width=img_w, height=top_boundary))
        logger.info(f"HeaderFooter: top region type={rtype} height={top_boundary}px "
                    f"(has_content={has_content})")

    # --- Bottom region ---
    # Same preference order; boundary must sit at least 10px above the page end.
    if gap_footer is not None and gap_footer < img_h - 10:
        bottom_boundary: Optional[int] = gap_footer
    elif bottom_y < img_h - 10:
        bottom_boundary = bottom_y
    else:
        bottom_boundary = None
    if bottom_boundary is not None:
        has_content = inv is not None and _region_has_content(inv, bottom_boundary, img_h)
        rtype = 'footer' if has_content else 'margin_bottom'
        regions.append(PageRegion(type=rtype, x=0, y=bottom_boundary, width=img_w,
                                  height=img_h - bottom_boundary))
        logger.info(f"HeaderFooter: bottom region type={rtype} y={bottom_boundary} "
                    f"height={img_h - bottom_boundary}px (has_content={has_content})")
|
||
|
||
|
||
# --- Main Entry Point ---
|
||
|
||
def analyze_layout_by_words(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> List[PageRegion]:
    """Detect columns using two-phase approach: geometry then content classification.

    Phase A: detect_column_geometry() — clustering word positions into columns.
    Phase B: classify_column_types() — content-based type assignment with fallback.

    Falls back to projection-based analyze_layout() if geometry detection fails.

    Args:
        ocr_img: Binarized grayscale image for layout analysis.
        dewarped_bgr: Original BGR image (for Tesseract word detection).

    Returns:
        List of PageRegion objects with types, confidence, and method.
    """
    page_h, page_w = ocr_img.shape[:2]

    # Phase A: word-cluster based column geometry
    geo = detect_column_geometry(ocr_img, dewarped_bgr)
    if geo is None:
        # Geometry failed entirely — use the projection-profile pipeline instead.
        logger.info("LayoutByWords: geometry detection failed, falling back to projection profiles")
        return analyze_layout(create_layout_image(dewarped_bgr), ocr_img)

    geometries, left_x, right_x, top_y, bottom_y, _word_dicts, _inv = geo
    content_w = right_x - left_x

    # Detect header/footer early so sub-column clustering ignores them
    if _inv is not None:
        header_y, footer_y = _detect_header_footer_gaps(_inv, page_w, page_h)
    else:
        header_y, footer_y = None, None

    # Split sub-columns (e.g. page references) before classification
    geometries = _detect_sub_columns(geometries, content_w, left_x=left_x,
                                     top_y=top_y, header_y=header_y, footer_y=footer_y)

    # Split broad columns that contain EN+DE mixed via word-coverage gaps
    geometries = _split_broad_columns(geometries, content_w, left_x=left_x)

    # Phase B: Positional classification (no language scoring)
    content_h = bottom_y - top_y
    regions = positional_column_regions(geometries, content_w, content_h, left_x)

    col_count = len([r for r in regions if r.type.startswith('column') or r.type == 'page_ref'])
    methods = {r.classification_method for r in regions if r.classification_method}
    logger.info(f"LayoutByWords: {col_count} columns detected (methods: {methods}): "
                f"{[(r.type, r.x, r.width, r.classification_confidence) for r in regions if r.type not in ('header','footer','margin_top','margin_bottom')]}")

    return regions
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Zone-aware column geometry detection
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def detect_column_geometry_zoned(
    ocr_img: np.ndarray,
    dewarped_bgr: np.ndarray,
) -> Optional[Tuple[
    List[ColumnGeometry],   # flat column list (all zones)
    int, int, int, int,     # left_x, right_x, top_y, bottom_y
    List[Dict],             # word_dicts
    np.ndarray,             # inv
    List[Dict],             # zones (serializable)
    List[DetectedBox],      # detected boxes
]]:
    """Zone-aware column geometry detection.

    1. Finds content bounds.
    2. Runs box detection.
    3. If boxes found: splits page into zones, runs detect_column_geometry()
       per content zone on the corresponding sub-image.
    4. If no boxes: delegates entirely to detect_column_geometry() (backward compat).

    Returns:
        Extended tuple: (geometries, left_x, right_x, top_y, bottom_y,
                         word_dicts, inv, zones_data, boxes)
        or None if detection fails.
    """
    from cv_box_detect import detect_boxes, split_page_into_zones

    # First run normal detection to get content bounds and word data
    base = detect_column_geometry(ocr_img, dewarped_bgr)
    if base is None:
        return None

    geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv = base
    content_w = right_x - left_x
    content_h = bottom_y - top_y

    # Detect boxes in the image
    boxes = detect_boxes(
        dewarped_bgr, left_x, content_w, top_y, content_h,
    )

    if not boxes:
        # Box-free page: backward compatible single synthetic content zone.
        single_zone = [{
            "index": 0,
            "zone_type": "content",
            "y": top_y,
            "height": content_h,
            "x": left_x,
            "width": content_w,
            "columns": [],  # filled later by caller
        }]
        return (geometries, left_x, right_x, top_y, bottom_y,
                word_dicts, inv, single_zone, boxes)

    # Split into zones
    zones = split_page_into_zones(left_x, top_y, content_w, content_h, boxes)

    # Run column detection per content zone
    all_geometries: List[ColumnGeometry] = []
    zones_data: List[Dict] = []

    for zone in zones:
        entry: Dict = {
            "index": zone.index,
            "zone_type": zone.zone_type,
            "y": zone.y,
            "height": zone.height,
            "x": zone.x,
            "width": zone.width,
            "columns": [],
        }

        if zone.box is not None:
            box = zone.box
            entry["box"] = {
                "x": box.x,
                "y": box.y,
                "width": box.width,
                "height": box.height,
                "confidence": box.confidence,
                "border_thickness": box.border_thickness,
            }

        # Only content zones tall enough to hold text get their own pass.
        if zone.zone_type == 'content' and zone.height >= 40:
            zone_y_end = zone.y + zone.height
            sub_result = detect_column_geometry(
                ocr_img[zone.y:zone_y_end, :],
                dewarped_bgr[zone.y:zone_y_end, :],
            )
            if sub_result is None:
                logger.debug(f"ZonedColumns: zone {zone.index} column detection returned None")
            else:
                sub_geoms = sub_result[0]
                # Offset column y-coordinates back to absolute page coords
                for g in sub_geoms:
                    g.y += zone.y
                entry["columns"] = [{
                    "index": g.index,
                    "x": g.x,
                    "y": g.y,
                    "width": g.width,
                    "height": g.height,
                    "word_count": g.word_count,
                    "width_ratio": g.width_ratio,
                    "zone_index": zone.index,
                } for g in sub_geoms]
                all_geometries.extend(sub_geoms)

        zones_data.append(entry)

    # If per-zone detection produced no columns, fall back to the original
    if not all_geometries:
        all_geometries = geometries

    logger.info(f"ZonedColumns: {len(boxes)} box(es), {len(zones)} zone(s), "
                f"{len(all_geometries)} total columns")

    return (all_geometries, left_x, right_x, top_y, bottom_y,
            word_dicts, inv, zones_data, boxes)
|