Files
breakpilot-lehrer/klausur-service/backend/cv_vocab_pipeline.py
Benjamin Admin c4f2e6554e fix(ocr-pipeline): prevent grid from producing more rows than gap-based
Two fixes:
1. Grid validation: reject word-center grid if it produces MORE rows
   than gap-based detection (more rows = lines were split = worse).
   Falls back to gap-based rows in that case.

2. Words overlay: draw clean grid cells (column × row intersections)
   instead of padded entry bboxes. Eliminates confusing double lines.
   OCR text labels are placed inside the grid cells directly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-01 12:52:41 +01:00

3674 lines
132 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
CV-based Document Reconstruction Pipeline for Vocabulary Extraction.
Uses classical Computer Vision techniques for high-quality OCR:
- High-resolution PDF rendering (432 DPI)
- Deskew (rotation correction via Hough Lines)
- Dewarp (vertical shear correction of tilted column edges after deskew)
- Dual image preparation (binarized for OCR, CLAHE for layout)
- Projection-profile layout analysis (column/row detection)
- Multi-pass Tesseract OCR with region-specific PSM settings
- Y-coordinate line alignment for vocabulary matching
- Optional LLM post-correction for low-confidence regions
License: Apache 2.0 (commercial use permitted)
PRIVACY: All processing is performed locally.
"""
import io
import logging
import time
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Tuple
import numpy as np
logger = logging.getLogger(__name__)
# --- Availability Guards ---
# OpenCV is optional: if the import fails, cv2 is bound to None and
# CV2_AVAILABLE is False so callers can test the flag before using it.
try:
    import cv2
    CV2_AVAILABLE = True
except ImportError:
    cv2 = None
    CV2_AVAILABLE = False
    logger.warning("OpenCV not available — CV pipeline disabled")
# pytesseract and Pillow are guarded together: if either import fails, both
# names are bound to None and TESSERACT_AVAILABLE is False.
try:
    import pytesseract
    from PIL import Image
    TESSERACT_AVAILABLE = True
except ImportError:
    pytesseract = None
    Image = None
    TESSERACT_AVAILABLE = False
    logger.warning("pytesseract/Pillow not available — CV pipeline disabled")
# The pipeline as a whole needs both OpenCV and Tesseract/Pillow.
CV_PIPELINE_AVAILABLE = CV2_AVAILABLE and TESSERACT_AVAILABLE
# --- IPA Dictionary ---
import json
import os
import re
# IPA_AVAILABLE is switched to True as soon as one of the two IPA sources
# below (eng_to_ipa or the Britfone JSON file) loads successfully.
IPA_AVAILABLE = False
_ipa_convert_american = None
_britfone_dict: Dict[str, str] = {}
# Source 1: optional eng_to_ipa package; its convert() is kept as the
# American-English converter.
try:
    import eng_to_ipa as _eng_to_ipa
    _ipa_convert_american = _eng_to_ipa.convert
    IPA_AVAILABLE = True
    logger.info("eng_to_ipa available — American IPA lookup enabled")
except ImportError:
    logger.info("eng_to_ipa not installed — American IPA disabled")
# Source 2: Britfone dictionary (MIT license, ~15k British English IPA entries),
# expected as data/britfone_ipa.json next to this module.
_britfone_path = os.path.join(os.path.dirname(__file__), 'data', 'britfone_ipa.json')
if os.path.exists(_britfone_path):
    try:
        with open(_britfone_path, 'r', encoding='utf-8') as f:
            _britfone_dict = json.load(f)
        IPA_AVAILABLE = True
        logger.info(f"Britfone loaded — {len(_britfone_dict)} British IPA entries")
    except Exception as e:
        # Best effort: a broken or unreadable file only disables British IPA.
        logger.warning(f"Failed to load Britfone: {e}")
else:
    logger.info("Britfone not found — British IPA disabled")
# --- Language Detection Constants ---
# Common German function words (lower-case). Umlauts appear in their ASCII
# transliteration ('fuer', 'ueber'). Listed alphabetically; set order is
# irrelevant at runtime.
GERMAN_FUNCTION_WORDS = {
    'aber', 'als', 'auch', 'auf', 'aus', 'bei', 'das', 'dem', 'den', 'der',
    'die', 'diese', 'diesem', 'dieser', 'ein', 'eine', 'er', 'es', 'fuer',
    'haben', 'hat', 'ich', 'ihr', 'ist', 'kann', 'mit', 'muss', 'nach',
    'nicht', 'noch', 'nur', 'oder', 'sein', 'sich', 'sie', 'sind', 'soll',
    'ueber', 'und', 'von', 'war', 'werden', 'wenn', 'wie', 'wir', 'wird',
    'zu',
}
# Common English function words (lower-case), listed alphabetically.
ENGLISH_FUNCTION_WORDS = {
    'a', 'an', 'and', 'are', 'as', 'at', 'be', 'but', 'by', 'can', 'could',
    'did', 'do', 'does', 'for', 'from', 'had', 'has', 'have', 'he', 'her',
    'his', 'in', 'is', 'it', 'its', 'may', 'might', 'my', 'not', 'of', 'on',
    'or', 'our', 'she', 'should', 'that', 'the', 'their', 'they', 'this',
    'to', 'was', 'we', 'were', 'which', 'will', 'with', 'would', 'you',
    'your',
}
# --- Data Classes ---
@dataclass
class PageRegion:
    """Rectangular page region together with its assigned region type."""
    # Region kind. Values used in this file: 'column_en', 'column_de',
    # 'column_example', 'page_ref', 'column_marker', 'column_text',
    # 'header', 'footer'.
    type: str
    # Rectangle in pixels: origin (x, y) plus extent (width, height).
    x: int
    y: int
    width: int
    height: int
    # Confidence of the type assignment, in the range 0.0-1.0.
    classification_confidence: float = 1.0
    # How the type was assigned: 'content', 'position_enhanced' or
    # 'position_fallback' (empty string when not set).
    classification_method: str = ""
@dataclass
class ColumnGeometry:
    """Geometrically detected column, before any type classification."""
    # 0-based column index, counted left to right.
    index: int
    # Column rectangle in pixels.
    x: int
    y: int
    width: int
    height: int
    # Number of words assigned to this column.
    word_count: int
    # Word dicts derived from Tesseract output (text, conf, left, top, ...).
    words: List[Dict]
    # width / content_width, in the range 0.0-1.0.
    width_ratio: float
@dataclass
class RowGeometry:
    """Geometrically detected row with a header/footer classification."""
    # 0-based row index, counted top to bottom.
    index: int
    # Absolute left edge (equals the content area's left_x).
    x: int
    # Absolute y coordinate where the row starts.
    y: int
    # Content width in pixels.
    width: int
    # Row height in pixels.
    height: int
    # Number of words in the row and the word dicts themselves.
    word_count: int
    words: List[Dict]
    # One of 'content', 'header', 'footer'.
    row_type: str = 'content'
    # Gap above this row, in pixels.
    gap_before: int = 0
@dataclass
class VocabRow:
    """One vocabulary entry assembled from the OCR output of several columns."""
    # Text fields of the entry; each defaults to an empty string.
    english: str = ""
    german: str = ""
    example: str = ""
    # Confidence value for the entry (defaults to 0.0).
    confidence: float = 0.0
    # Y position associated with the entry (defaults to 0).
    y_position: int = 0
@dataclass
class PipelineResult:
    """Aggregated outcome of one CV pipeline run."""
    # Extracted vocabulary entries; every instance gets its own list.
    vocabulary: List[Dict[str, Any]] = field(default_factory=list)
    word_count: int = 0
    columns_detected: int = 0
    # Total run time in seconds.
    duration_seconds: float = 0.0
    # Mapping of stage name to a float value; every instance gets its own dict.
    stages: Dict[str, float] = field(default_factory=dict)
    # Error message, or None when no error was recorded.
    error: Optional[str] = None
    # Image dimensions (default 0).
    image_width: int = 0
    image_height: int = 0
# =============================================================================
# Stage 1: High-Resolution PDF Rendering
# =============================================================================
def render_pdf_high_res(pdf_data: bytes, page_number: int = 0, zoom: float = 3.0) -> np.ndarray:
    """Render a PDF page to a high-resolution numpy array (BGR).

    Args:
        pdf_data: Raw PDF bytes.
        page_number: 0-indexed page number.
        zoom: Zoom factor applied to both axes.
            NOTE(review): PyMuPDF renders at 72 DPI for zoom 1.0, so the
            default 3.0 gives 216 DPI, not the 432 DPI quoted elsewhere in
            this file (that would need zoom=6.0) — confirm the intended value.

    Returns:
        numpy array in BGR format.

    Raises:
        ValueError: If ``page_number`` is beyond the last page.
    """
    import fitz  # PyMuPDF

    pdf_doc = fitz.open(stream=pdf_data, filetype="pdf")
    # try/finally so the document is released even when the page check or
    # the rendering raises (the original leaked it on every error path).
    try:
        if page_number >= pdf_doc.page_count:
            raise ValueError(f"Page {page_number} does not exist (PDF has {pdf_doc.page_count} pages)")
        page = pdf_doc[page_number]
        mat = fitz.Matrix(zoom, zoom)
        pix = page.get_pixmap(matrix=mat)
        # Wrap the raw sample buffer as (height, width, channels) without copying.
        img_data = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.h, pix.w, pix.n)
        # cvtColor allocates a new array, so the result does not depend on
        # the pixmap buffer after the document is closed.
        if pix.n == 4:  # RGBA
            img_bgr = cv2.cvtColor(img_data, cv2.COLOR_RGBA2BGR)
        elif pix.n == 3:  # RGB
            img_bgr = cv2.cvtColor(img_data, cv2.COLOR_RGB2BGR)
        else:  # Grayscale
            img_bgr = cv2.cvtColor(img_data, cv2.COLOR_GRAY2BGR)
    finally:
        pdf_doc.close()
    return img_bgr
def render_image_high_res(image_data: bytes) -> np.ndarray:
    """Load an image (PNG/JPEG) into a numpy array (BGR).

    Args:
        image_data: Raw image bytes.

    Returns:
        numpy array in BGR format.

    Raises:
        ValueError: If the bytes are empty or cannot be decoded as an image.
    """
    # cv2.imdecode asserts on an empty buffer (raising cv2.error) instead of
    # returning None, so reject empty input with the documented ValueError.
    if not image_data:
        raise ValueError("Could not decode image data")
    img_array = np.frombuffer(image_data, dtype=np.uint8)
    img_bgr = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
    if img_bgr is None:
        raise ValueError("Could not decode image data")
    return img_bgr
# =============================================================================
# Stage 2: Deskew (Rotation Correction)
# =============================================================================
def deskew_image(img: np.ndarray) -> Tuple[np.ndarray, float]:
    """Straighten a slightly rotated page using Hough line segments.

    Long, near-horizontal segments are detected on an Otsu-binarized copy.
    The median of their angles is taken as the page rotation, limited to
    ±5°, and the image is rotated by that angle around its centre.

    Args:
        img: BGR image.

    Returns:
        Tuple of (corrected image, detected angle in degrees). The input
        image and 0.0 are returned when too few segments are found or the
        angle is below 0.1°.
    """
    grayscale = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Inverted Otsu threshold: ink becomes foreground for the line detector.
    _, ink_mask = cv2.threshold(grayscale, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    # Only segments spanning at least a quarter of the image width count.
    segments = cv2.HoughLinesP(ink_mask, 1, np.pi / 180, threshold=100,
                               minLineLength=img.shape[1] // 4, maxLineGap=20)
    if segments is None or len(segments) < 3:
        return img, 0.0

    # Keep the angles of near-horizontal segments only (|angle| < 15°).
    near_horizontal = []
    for segment in segments:
        x_a, y_a, x_b, y_b = segment[0]
        theta = np.degrees(np.arctan2(y_b - y_a, x_b - x_a))
        if abs(theta) < 15:
            near_horizontal.append(theta)
    if not near_horizontal:
        return img, 0.0

    rotation = float(np.median(near_horizontal))
    # Limit the correction to ±5°.
    if abs(rotation) > 5.0:
        rotation = 5.0 * np.sign(rotation)
    # Ignore negligible angles.
    if abs(rotation) < 0.1:
        return img, 0.0

    height, width = img.shape[:2]
    pivot = (width // 2, height // 2)
    matrix = cv2.getRotationMatrix2D(pivot, rotation, 1.0)
    straightened = cv2.warpAffine(img, matrix, (width, height),
                                  flags=cv2.INTER_LINEAR,
                                  borderMode=cv2.BORDER_REPLICATE)
    logger.info(f"Deskew: corrected {rotation:.2f}° rotation")
    return straightened, rotation
def deskew_image_by_word_alignment(
    image_data: bytes,
    lang: str = "eng+deu",
    downscale_factor: float = 0.5,
) -> Tuple[bytes, float]:
    """Correct rotation by fitting a line through left-most word starts per text line.

    Runs a quick Tesseract pass on a downscaled copy to find word positions,
    keeps the left-most word of every text line, selects the points near the
    median left edge, fits ``x = a*y + b`` through them and rotates the
    full-resolution image so that this left margin becomes vertical.

    The function is best-effort: on any failure (missing libraries, bad
    input, Tesseract error, too few lines) it returns the input bytes
    unchanged together with an angle of 0.0.

    Args:
        image_data: Raw image bytes (PNG/JPEG).
        lang: Tesseract language string for the quick pass.
        downscale_factor: Shrink factor for the quick Tesseract pass
            (0.5 = 50%). Must be > 0.

    Returns:
        Tuple of (rotated image as PNG bytes, detected angle in degrees).
        The angle is ``arctan(dx/dy)`` of the fitted left margin, clamped to
        ±5°.
    """
    if not CV2_AVAILABLE or not TESSERACT_AVAILABLE:
        return image_data, 0.0
    # A non-positive factor would yield an empty resize target and a
    # division by zero when scaling coordinates back.
    if downscale_factor <= 0:
        logger.warning(f"deskew_by_word_alignment: invalid downscale_factor={downscale_factor}, skipping")
        return image_data, 0.0

    # 1. Decode image (cv2.imdecode raises on an empty buffer, so skip it then)
    img_array = np.frombuffer(image_data, dtype=np.uint8)
    img = cv2.imdecode(img_array, cv2.IMREAD_COLOR) if img_array.size else None
    if img is None:
        logger.warning("deskew_by_word_alignment: could not decode image")
        return image_data, 0.0
    orig_h, orig_w = img.shape[:2]

    # 2. Downscale for fast Tesseract pass
    small_w = int(orig_w * downscale_factor)
    small_h = int(orig_h * downscale_factor)
    if small_w < 1 or small_h < 1:
        logger.info("deskew_by_word_alignment: image too small to downscale, skipping")
        return image_data, 0.0
    small = cv2.resize(img, (small_w, small_h), interpolation=cv2.INTER_AREA)

    # 3. Quick Tesseract — word-level positions
    pil_small = Image.fromarray(cv2.cvtColor(small, cv2.COLOR_BGR2RGB))
    try:
        data = pytesseract.image_to_data(
            pil_small, lang=lang, config="--psm 6 --oem 3",
            output_type=pytesseract.Output.DICT,
        )
    except Exception as e:
        logger.warning(f"deskew_by_word_alignment: Tesseract failed: {e}")
        return image_data, 0.0

    # 4. Per text line, find the left-most word start.
    #    Lines are identified by (block_num, par_num, line_num).
    from collections import defaultdict
    line_groups: Dict[tuple, list] = defaultdict(list)
    for i, raw_text in enumerate(data["text"]):
        text = (raw_text or "").strip()
        # Confidence may arrive as int, float or numeric string; anything
        # unparsable is treated as "no confidence" instead of crashing.
        try:
            conf = int(float(data["conf"][i]))
        except (TypeError, ValueError):
            conf = -1
        if not text or conf < 20:
            continue
        key = (data["block_num"][i], data["par_num"][i], data["line_num"][i])
        line_groups[key].append(i)
    if len(line_groups) < 5:
        logger.info(f"deskew_by_word_alignment: only {len(line_groups)} lines, skipping")
        return image_data, 0.0

    # For each line take the word with the smallest 'left' and record
    # (left_x, center_y), scaled back to original-image coordinates.
    scale = 1.0 / downscale_factor
    lefts = data["left"]
    points = []  # list of (x, y) in original-image coords
    for indices in line_groups.values():
        best_idx = min(indices, key=lefts.__getitem__)
        lx = lefts[best_idx] * scale
        top = data["top"][best_idx] * scale
        h = data["height"][best_idx] * scale
        points.append((lx, top + h / 2.0))

    # 5. Find dominant left-edge column + compute angle
    xs = np.array([p[0] for p in points])
    ys = np.array([p[1] for p in points])
    median_x = float(np.median(xs))
    tolerance = orig_w * 0.03  # 3% of image width
    mask = np.abs(xs - median_x) <= tolerance
    filtered_xs = xs[mask]
    filtered_ys = ys[mask]
    if len(filtered_xs) < 5:
        logger.info(f"deskew_by_word_alignment: only {len(filtered_xs)} aligned points after filter, skipping")
        return image_data, 0.0

    # polyfit: x = a*y + b → a = dx/dy → angle = arctan(a)
    coeffs = np.polyfit(filtered_ys, filtered_xs, 1)
    slope = coeffs[0]  # dx/dy
    angle_rad = np.arctan(slope)
    angle_deg = float(np.degrees(angle_rad))
    # Clamp to ±5°
    angle_deg = max(-5.0, min(5.0, angle_deg))
    logger.info(f"deskew_by_word_alignment: detected {angle_deg:.2f}° from {len(filtered_xs)} points "
                f"(total lines: {len(line_groups)})")
    if abs(angle_deg) < 0.05:
        return image_data, 0.0

    # 6. Rotate full-res image.
    # A positive dx/dy (margin drifts right going down the page) means the
    # content is rotated counter-clockwise. cv2.getRotationMatrix2D treats
    # positive angles as counter-clockwise, so the correction needs the
    # opposite sign. Passing +angle_deg, as before, added to the skew
    # instead of removing it.
    center = (orig_w // 2, orig_h // 2)
    M = cv2.getRotationMatrix2D(center, -angle_deg, 1.0)
    rotated = cv2.warpAffine(img, M, (orig_w, orig_h),
                             flags=cv2.INTER_LINEAR,
                             borderMode=cv2.BORDER_REPLICATE)

    # Encode back to PNG
    success, png_buf = cv2.imencode(".png", rotated)
    if not success:
        logger.warning("deskew_by_word_alignment: PNG encoding failed")
        return image_data, 0.0
    return png_buf.tobytes(), angle_deg
# =============================================================================
# Stage 3: Dewarp (Book Curvature Correction)
# =============================================================================
def _detect_shear_angle(img: np.ndarray) -> Dict[str, Any]:
    """Detect the vertical shear angle of the page.

    After deskew (horizontal lines aligned), vertical features like column
    edges may still be tilted. This measures that tilt by tracking the
    strongest vertical edge in the left 40% of the image across 20
    horizontal strips and fitting a straight line through the positions.

    The result is a shear angle in degrees: the angular difference between
    true vertical and the detected column edge.

    Args:
        img: BGR image.

    Returns:
        Dict with keys: method, shear_degrees, confidence. shear_degrees and
        confidence stay 0.0 when too few strips yield a usable edge.
    """
    import math

    h, w = img.shape[:2]
    result = {"method": "vertical_edge", "shear_degrees": 0.0, "confidence": 0.0}

    # Edges are searched only in the left 40% of the image. On a degenerate
    # (very narrow) image the window is empty and nothing can be measured.
    search_w = int(w * 0.4)
    if search_w < 1:
        return result

    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Horizontal derivative (Sobel in x) responds to vertical edges.
    sobel_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
    # Saturating conversion to uint8. A plain astype(np.uint8) wraps
    # magnitudes above 255, so the strongest edges could come out near zero.
    abs_sobel = cv2.convertScaleAbs(sobel_x)
    # Binarize with Otsu
    _, binary = cv2.threshold(abs_sobel, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    num_strips = 20
    strip_h = h // num_strips
    # Smoothing kernel: odd width of about 1% of the image width, at least 3.
    kernel_size = max(3, w // 100)
    if kernel_size % 2 == 0:
        kernel_size += 1

    edge_positions = []  # (y_center, x_position)
    for i in range(num_strips):
        y_start = i * strip_h
        y_end = min((i + 1) * strip_h, h)
        # Column sums of the strip, restricted to the search window.
        left_proj = np.sum(binary[y_start:y_end, :search_w], axis=0).astype(np.float64)
        if left_proj.max() == 0:
            continue
        # Smooth and find peak
        smoothed = cv2.GaussianBlur(left_proj.reshape(1, -1), (kernel_size, 1), 0).flatten()
        x_pos = float(np.argmax(smoothed))
        y_center = (y_start + y_end) / 2.0
        edge_positions.append((y_center, x_pos))

    if len(edge_positions) < 8:
        return result

    ys = np.array([p[0] for p in edge_positions])
    xs = np.array([p[1] for p in edge_positions])
    # Remove outliers (> 2 std from median)
    median_x = np.median(xs)
    std_x = max(np.std(xs), 1.0)
    mask = np.abs(xs - median_x) < 2 * std_x
    ys = ys[mask]
    xs = xs[mask]
    if len(ys) < 6:
        return result

    # Fit straight line: x = slope * y + intercept.
    # The slope (dx/dy in pixels) is the tilt of the vertical edge.
    straight_coeffs = np.polyfit(ys, xs, 1)
    slope = straight_coeffs[0]
    fitted = np.polyval(straight_coeffs, ys)
    residuals = xs - fitted
    rmse = float(np.sqrt(np.mean(residuals ** 2)))

    # Convert slope to angle: arctan(dx/dy) in degrees
    shear_degrees = math.degrees(math.atan(slope))
    # Confidence grows with the number of inlier strips (saturating at 15)
    # and shrinks with the fit error (never below a factor of 0.5).
    confidence = min(1.0, len(ys) / 15.0) * max(0.5, 1.0 - rmse / 5.0)
    result["shear_degrees"] = round(shear_degrees, 3)
    result["confidence"] = round(float(confidence), 2)
    return result
def _apply_shear(img: np.ndarray, shear_degrees: float) -> np.ndarray:
    """Shear an image horizontally around its vertical centre.

    Each row is displaced in x by ``tan(shear_degrees) * (y - h/2)``, so the
    middle row stays in place. Rows are never moved vertically, which keeps
    horizontal text lines aligned while tilting vertical features.

    Args:
        img: BGR image.
        shear_degrees: Shear angle in degrees. With OpenCV's forward-map
            convention a positive angle moves rows below the vertical centre
            to the right and rows above it to the left.

    Returns:
        Sheared image of the same size; uncovered border pixels are
        replicated from the nearest edge.
    """
    import math

    height, width = img.shape[:2]
    tangent = math.tan(math.radians(shear_degrees))
    # Forward affine map:  x' = x + tangent * (y - height/2),  y' = y
    matrix = np.float32([
        [1, tangent, -height / 2.0 * tangent],
        [0, 1, 0],
    ])
    return cv2.warpAffine(img, matrix, (width, height),
                          flags=cv2.INTER_LINEAR,
                          borderMode=cv2.BORDER_REPLICATE)
def dewarp_image(img: np.ndarray) -> Tuple[np.ndarray, Dict[str, Any]]:
    """Detect and remove residual vertical shear from a deskewed page.

    The tilt of the strongest vertical edge is measured with
    ``_detect_shear_angle`` and, if it is significant and reliable enough,
    undone with an affine shear of the opposite sign.

    Args:
        img: BGR image (already deskewed).

    Returns:
        Tuple of (corrected_image, dewarp_info).
        dewarp_info keys: method, shear_degrees, confidence. When no
        correction is applied the input image is returned with
        method "none" and zeros for the other two keys.
    """
    skipped = {
        "method": "none",
        "shear_degrees": 0.0,
        "confidence": 0.0,
    }
    if not CV2_AVAILABLE:
        return img, skipped

    started = time.time()
    detection = _detect_shear_angle(img)
    duration = time.time() - started

    shear_deg = detection["shear_degrees"]
    confidence = detection["confidence"]
    logger.info(f"dewarp: detected shear={shear_deg:.3f}° "
                f"conf={confidence:.2f} ({duration:.2f}s)")

    # Skip shears below 0.05° and detections with confidence under 0.3.
    negligible = abs(shear_deg) < 0.05
    unreliable = confidence < 0.3
    if negligible or unreliable:
        return img, skipped

    # Shear by the negated detected angle to straighten the edge.
    straightened = _apply_shear(img, -shear_deg)
    return straightened, {
        "method": detection["method"],
        "shear_degrees": shear_deg,
        "confidence": confidence,
    }
def dewarp_image_manual(img: np.ndarray, shear_degrees: float) -> np.ndarray:
    """Undo a user-specified shear angle.

    Args:
        img: BGR image (deskewed, before dewarp).
        shear_degrees: Shear angle in degrees to correct.

    Returns:
        The image sheared by the negated angle, or the input image itself
        when the angle is smaller than 0.001° in magnitude.
    """
    negligible = abs(shear_degrees) < 0.001
    return img if negligible else _apply_shear(img, -shear_degrees)
# =============================================================================
# Stage 4: Dual Image Preparation
# =============================================================================
def create_ocr_image(img: np.ndarray) -> np.ndarray:
    """Produce a binarized version of the page for Tesseract OCR.

    Pipeline: grayscale → background normalization → adaptive threshold →
    light median denoise.

    Args:
        img: BGR image.

    Returns:
        Single-channel binary image (values 0/255) in which dark text is
        rendered black on a white background.
    """
    grey = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Dividing by a heavily blurred copy flattens uneven illumination.
    background = cv2.GaussianBlur(grey, (51, 51), 0)
    flattened = cv2.divide(grey, background, scale=255)
    # Gaussian-weighted local threshold (31px neighbourhood, offset 10).
    thresholded = cv2.adaptiveThreshold(
        flattened, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY, 31, 10
    )
    # A 3x3 median filter removes isolated speckles.
    return cv2.medianBlur(thresholded, 3)
def create_layout_image(img: np.ndarray) -> np.ndarray:
    """Produce a contrast-enhanced grayscale image for layout analysis.

    Args:
        img: BGR image.

    Returns:
        Grayscale image after CLAHE (clip limit 2.0, 8x8 tiles).
    """
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    return equalizer.apply(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))
# =============================================================================
# Stage 5: Layout Analysis (Projection Profiles)
# =============================================================================
def _find_content_bounds(inv: np.ndarray) -> Tuple[int, int, int, int]:
"""Find the bounding box of actual text content (excluding page margins).
Returns:
Tuple of (left_x, right_x, top_y, bottom_y).
"""
h, w = inv.shape[:2]
# Horizontal projection for top/bottom
h_proj = np.sum(inv, axis=1).astype(float) / (w * 255)
top_y = 0
for y in range(h):
if h_proj[y] > 0.005:
top_y = max(0, y - 5)
break
bottom_y = h
for y in range(h - 1, 0, -1):
if h_proj[y] > 0.005:
bottom_y = min(h, y + 5)
break
# Vertical projection for left/right margins
v_proj = np.sum(inv[top_y:bottom_y, :], axis=0).astype(float)
v_proj_norm = v_proj / ((bottom_y - top_y) * 255) if (bottom_y - top_y) > 0 else v_proj
left_x = 0
for x in range(w):
if v_proj_norm[x] > 0.005:
left_x = max(0, x - 2)
break
right_x = w
for x in range(w - 1, 0, -1):
if v_proj_norm[x] > 0.005:
right_x = min(w, x + 2)
break
return left_x, right_x, top_y, bottom_y
def analyze_layout(layout_img: np.ndarray, ocr_img: np.ndarray) -> List[PageRegion]:
    """Detect columns, header, and footer using projection profiles.

    Uses content-bounds detection to exclude page margins before searching
    for column separators ("valleys" of low ink density in the smoothed
    vertical projection) within the actual text area. Depending on the
    number of separators found, a 3-column, 2-column or single-column layout
    is emitted. The first column always starts at x=0 and the last column
    always extends to the image width.

    Args:
        layout_img: CLAHE-enhanced grayscale image. Not read by the current
            implementation.
        ocr_img: Binarized image for text density analysis.

    Returns:
        List of PageRegion objects describing detected regions: one to three
        column regions, plus 'header' / 'footer' regions when the content
        starts more than 10 px below the top / ends more than 10 px above
        the bottom of the image.
    """
    h, w = ocr_img.shape[:2]
    # Invert: black text on white → white text on black for projection
    inv = cv2.bitwise_not(ocr_img)
    # --- Find actual content bounds (exclude page margins) ---
    left_x, right_x, top_y, bottom_y = _find_content_bounds(inv)
    content_w = right_x - left_x
    content_h = bottom_y - top_y
    logger.info(f"Layout: content bounds x=[{left_x}..{right_x}] ({content_w}px), "
                f"y=[{top_y}..{bottom_y}] ({content_h}px) in {w}x{h} image")
    if content_w < w * 0.3 or content_h < h * 0.3:
        # Fallback if detection seems wrong: use the whole image
        left_x, right_x = 0, w
        top_y, bottom_y = 0, h
        content_w, content_h = w, h
    # --- Vertical projection within content area to find column separators ---
    content_strip = inv[top_y:bottom_y, left_x:right_x]
    v_proj = np.sum(content_strip, axis=0).astype(float)
    v_proj_norm = v_proj / (content_h * 255) if content_h > 0 else v_proj
    # Smooth the projection profile with a box filter (odd width, about 2%
    # of the content width, at least 5)
    kernel_size = max(5, content_w // 50)
    if kernel_size % 2 == 0:
        kernel_size += 1
    v_proj_smooth = np.convolve(v_proj_norm, np.ones(kernel_size) / kernel_size, mode='same')
    # Debug: log projection profile statistics
    p_mean = float(np.mean(v_proj_smooth))
    p_median = float(np.median(v_proj_smooth))
    p_min = float(np.min(v_proj_smooth))
    p_max = float(np.max(v_proj_smooth))
    logger.info(f"Layout: v_proj stats — min={p_min:.4f}, max={p_max:.4f}, "
                f"mean={p_mean:.4f}, median={p_median:.4f}")
    # Find valleys using multiple threshold strategies
    # Strategy 1: relative to median (catches clear separators)
    # Strategy 2: local minima approach (catches subtle gaps)
    threshold = max(p_median * 0.3, p_mean * 0.2)
    logger.info(f"Layout: valley threshold={threshold:.4f}")
    in_valley = v_proj_smooth < threshold
    # Find contiguous valley regions.
    # Each valley is a tuple (start, end, center, width, depth) in
    # content-relative x coordinates. A valley still open at the end of the
    # profile is never closed and therefore not recorded.
    all_valleys = []
    start = None
    for x in range(len(v_proj_smooth)):
        if in_valley[x] and start is None:
            start = x
        elif not in_valley[x] and start is not None:
            valley_width = x - start
            valley_depth = float(np.min(v_proj_smooth[start:x]))
            # Valley must be at least 3px wide
            if valley_width >= 3:
                all_valleys.append((start, x, (start + x) // 2, valley_width, valley_depth))
            start = None
    logger.info(f"Layout: raw valleys (before filter): {len(all_valleys)}"
                f"{[(v[0]+left_x, v[1]+left_x, v[3], f'{v[4]:.4f}') for v in all_valleys[:10]]}")
    # Filter: valley centers must be inside the content area (not within 8%
    # of either edge)
    inner_margin = int(content_w * 0.08)
    valleys = [v for v in all_valleys if inner_margin < v[2] < content_w - inner_margin]
    # If fewer than two valleys were found with the threshold, try the local
    # minima approach
    if len(valleys) < 2:
        logger.info("Layout: trying local minima approach for column detection")
        # Divide content into 20 segments and rank them by mean density
        seg_count = 20
        seg_width = content_w // seg_count
        seg_scores = []
        for i in range(seg_count):
            sx = i * seg_width
            ex = min((i + 1) * seg_width, content_w)
            seg_mean = float(np.mean(v_proj_smooth[sx:ex]))
            seg_scores.append((i, sx, ex, seg_mean))
        seg_scores.sort(key=lambda s: s[3])
        logger.info(f"Layout: segment scores (lowest 5): "
                    f"{[(s[0], s[1]+left_x, s[2]+left_x, f'{s[3]:.4f}') for s in seg_scores[:5]]}")
        # Collect low-density segments as candidate valleys
        candidate_valleys = []
        for seg_idx, sx, ex, seg_mean in seg_scores:
            # Must not be at the edges (first two / last two segments)
            if seg_idx <= 1 or seg_idx >= seg_count - 2:
                continue
            # Must be significantly lower than overall mean
            if seg_mean < p_mean * 0.6:
                center = (sx + ex) // 2
                candidate_valleys.append((sx, ex, center, ex - sx, seg_mean))
        if len(candidate_valleys) >= 2:
            # Pick the best pair: far enough apart, creating reasonable column widths
            candidate_valleys.sort(key=lambda v: v[2])
            best_pair = None
            best_score = float('inf')
            for i in range(len(candidate_valleys)):
                for j in range(i + 1, len(candidate_valleys)):
                    c1 = candidate_valleys[i][2]
                    c2 = candidate_valleys[j][2]
                    # Must be at least 20% of the content width apart
                    if (c2 - c1) < content_w * 0.2:
                        continue
                    col1 = c1
                    col2 = c2 - c1
                    col3 = content_w - c2
                    # Each column at least 12% of the content width
                    if col1 < content_w * 0.12 or col2 < content_w * 0.12 or col3 < content_w * 0.12:
                        continue
                    # Score: spread between widest and narrowest column
                    # (lower = more even)
                    parts = sorted([col1, col2, col3])
                    score = parts[2] - parts[0]
                    if score < best_score:
                        best_score = score
                        best_pair = (candidate_valleys[i], candidate_valleys[j])
            if best_pair:
                valleys = list(best_pair)
                logger.info(f"Layout: local minima found 2 valleys: "
                            f"{[(v[0]+left_x, v[1]+left_x, v[3]) for v in valleys]}")
    logger.info(f"Layout: final {len(valleys)} valleys: "
                f"{[(v[0]+left_x, v[1]+left_x, v[3]) for v in valleys]}")
    regions = []
    if len(valleys) >= 2:
        # 3-column layout detected
        valleys.sort(key=lambda v: v[2])
        if len(valleys) == 2:
            sep1_center = valleys[0][2]
            sep2_center = valleys[1][2]
        else:
            # Pick the two valleys that best divide into 3 parts
            # Prefer wider valleys (more likely true separators)
            best_pair = None
            best_score = float('inf')
            for i in range(len(valleys)):
                for j in range(i + 1, len(valleys)):
                    c1, c2 = valleys[i][2], valleys[j][2]
                    # Each column should be at least 15% of content width
                    col1 = c1
                    col2 = c2 - c1
                    col3 = content_w - c2
                    if col1 < content_w * 0.15 or col2 < content_w * 0.15 or col3 < content_w * 0.15:
                        continue
                    # Score: lower is better (more even distribution)
                    parts = sorted([col1, col2, col3])
                    score = parts[2] - parts[0]
                    # Bonus for wider valleys (subtract valley width)
                    score -= (valleys[i][3] + valleys[j][3]) * 0.5
                    if score < best_score:
                        best_score = score
                        best_pair = (c1, c2)
            if best_pair:
                sep1_center, sep2_center = best_pair
            else:
                # No pair satisfied the width constraint: take the two
                # left-most valleys
                sep1_center = valleys[0][2]
                sep2_center = valleys[1][2]
        # Convert from content-relative to absolute coordinates
        abs_sep1 = sep1_center + left_x
        abs_sep2 = sep2_center + left_x
        logger.info(f"Layout: 3 columns at separators x={abs_sep1}, x={abs_sep2} "
                    f"(widths: {abs_sep1}, {abs_sep2-abs_sep1}, {w-abs_sep2})")
        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=abs_sep1, height=content_h
        ))
        regions.append(PageRegion(
            type='column_de', x=abs_sep1, y=top_y,
            width=abs_sep2 - abs_sep1, height=content_h
        ))
        regions.append(PageRegion(
            type='column_example', x=abs_sep2, y=top_y,
            width=w - abs_sep2, height=content_h
        ))
    elif len(valleys) == 1:
        # 2-column layout
        abs_sep = valleys[0][2] + left_x
        logger.info(f"Layout: 2 columns at separator x={abs_sep}")
        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=abs_sep, height=content_h
        ))
        regions.append(PageRegion(
            type='column_de', x=abs_sep, y=top_y,
            width=w - abs_sep, height=content_h
        ))
    else:
        # No columns detected — run full-page OCR as single column
        logger.warning("Layout: no column separators found, using full page")
        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=w, height=content_h
        ))
    # Add header/footer info: the strips above / below the content area,
    # reported only when they are more than 10 px tall
    if top_y > 10:
        regions.append(PageRegion(
            type='header', x=0, y=0,
            width=w, height=top_y
        ))
    if bottom_y < h - 10:
        regions.append(PageRegion(
            type='footer', x=0, y=bottom_y,
            width=w, height=h - bottom_y
        ))
    col_count = len([r for r in regions if r.type.startswith('column')])
    logger.info(f"Layout: {col_count} columns, "
                f"header={'yes' if top_y > 10 else 'no'}, "
                f"footer={'yes' if bottom_y < h - 10 else 'no'}")
    return regions
# =============================================================================
# Stage 5b: Word-Based Layout Analysis (Two-Phase Column Detection)
# =============================================================================
# --- Phase A: Geometry Detection ---
def _detect_columns_by_clustering(
    word_dicts: List[Dict],
    left_edges: List[int],
    edge_word_indices: List[int],
    content_w: int,
    content_h: int,
    left_x: int,
    right_x: int,
    top_y: int,
    bottom_y: int,
    inv: Optional[np.ndarray] = None,
) -> Optional[Tuple[List[ColumnGeometry], int, int, int, int, List[Dict], Optional[np.ndarray]]]:
    """Fallback: detect columns by clustering left-aligned word positions.

    Used when the primary gap-based algorithm finds fewer than 2 gaps.

    Word left edges are sorted and chained into clusters (neighbouring edges
    at most ``tolerance`` px apart). Clusters are kept when their words span
    enough of the content height, nearby clusters are merged, and each
    remaining cluster becomes the start of one column.

    Args:
        word_dicts: Word dicts; ``word_dicts[i]['top']`` is read for every
            index listed in ``edge_word_indices``.
        left_edges: Left x coordinate per word, relative to the content area.
        edge_word_indices: Index into ``word_dicts`` for every entry of
            ``left_edges`` (same order).
        content_w: Content width in px.
        content_h: Content height in px.
        left_x: Absolute content left bound, passed through to the result.
        right_x: Absolute content right bound, passed through to the result.
        top_y: Absolute content top bound, passed through to the result.
        bottom_y: Absolute content bottom bound, passed through to the result.
        inv: Optional inverted image, passed through to the result.

    Returns:
        The tuple produced by ``_build_geometries_from_starts``, or None when
        there is nothing to cluster or fewer than 3 columns remain.
    """
    tolerance = max(10, int(content_w * 0.01))
    sorted_pairs = sorted(zip(left_edges, edge_word_indices), key=lambda p: p[0])
    # Without any edge the seeding below would raise IndexError; an empty
    # input simply means "no columns found".
    if not sorted_pairs:
        logger.info("ColumnGeometry clustering fallback: no word edges to cluster")
        return None

    # Chain sorted edges into clusters of (edges, word indices).
    first_edge, first_widx = sorted_pairs[0]
    clusters = [([first_edge], [first_widx])]
    for edge, widx in sorted_pairs[1:]:
        cur_edges, cur_widxs = clusters[-1]
        if edge - cur_edges[-1] <= tolerance:
            cur_edges.append(edge)
            cur_widxs.append(widx)
        else:
            clusters.append(([edge], [widx]))

    MIN_Y_COVERAGE_PRIMARY = 0.30
    MIN_Y_COVERAGE_SECONDARY = 0.15
    MIN_WORDS_SECONDARY = 5

    cluster_infos = []
    for c_edges, c_widxs in clusters:
        # Single-word clusters cannot indicate a column.
        if len(c_edges) < 2:
            continue
        y_positions = [word_dicts[idx]['top'] for idx in c_widxs]
        y_span = max(y_positions) - min(y_positions)
        y_coverage = y_span / content_h if content_h > 0 else 0.0
        cluster_infos.append({
            'mean_x': int(np.mean(c_edges)),
            'count': len(c_edges),
            'min_edge': min(c_edges),
            'max_edge': max(c_edges),
            'y_min': min(y_positions),
            'y_max': max(y_positions),
            'y_coverage': y_coverage,
        })

    # Primary: spans at least 30% of the content height.
    primary = [c for c in cluster_infos if c['y_coverage'] >= MIN_Y_COVERAGE_PRIMARY]
    # Secondary: below the primary threshold, but at least 15% coverage
    # and at least 5 words.
    secondary = [c for c in cluster_infos
                 if MIN_Y_COVERAGE_SECONDARY <= c['y_coverage'] < MIN_Y_COVERAGE_PRIMARY
                 and c['count'] >= MIN_WORDS_SECONDARY]
    significant = sorted(primary + secondary, key=lambda c: c['mean_x'])
    if len(significant) < 3:
        logger.info("ColumnGeometry clustering fallback: < 3 significant clusters")
        return None

    # Merge clusters whose mean x lies within merge_distance of the previous
    # merged cluster; mean_x becomes the count-weighted (integer) average.
    merge_distance = max(30, int(content_w * 0.06))
    merged = [significant[0].copy()]
    for s in significant[1:]:
        prev = merged[-1]
        if s['mean_x'] - prev['mean_x'] < merge_distance:
            total = prev['count'] + s['count']
            avg_x = (prev['mean_x'] * prev['count'] + s['mean_x'] * s['count']) // total
            prev['mean_x'] = avg_x
            prev['count'] = total
            prev['min_edge'] = min(prev['min_edge'], s['min_edge'])
            prev['max_edge'] = max(prev['max_edge'], s['max_edge'])
        else:
            merged.append(s.copy())
    if len(merged) < 3:
        logger.info("ColumnGeometry clustering fallback: < 3 merged clusters")
        return None

    logger.info(f"ColumnGeometry clustering fallback: {len(merged)} columns from clustering")
    # Start each column a few pixels left of its left-most word edge.
    margin_px = max(6, int(content_w * 0.003))
    return _build_geometries_from_starts(
        [(max(0, left_x + m['min_edge'] - margin_px), m['count']) for m in merged],
        word_dicts, left_x, right_x, top_y, bottom_y, content_w, content_h, inv,
    )
def _build_geometries_from_starts(
    col_starts: List[Tuple[int, int]],
    word_dicts: List[Dict],
    left_x: int,
    right_x: int,
    top_y: int,
    bottom_y: int,
    content_w: int,
    content_h: int,
    inv: Optional[np.ndarray] = None,
) -> Tuple[List[ColumnGeometry], int, int, int, int, List[Dict], Optional[np.ndarray]]:
    """Turn (abs_start_x, word_count) pairs into ColumnGeometry objects.

    Every column reaches from its own start to the start of the next column;
    the last column ends at ``right_x``. A word belongs to a column when its
    content-relative 'left' value falls inside the column's half-open x
    range. The word count stored on each geometry is recomputed from that
    assignment; the count carried in ``col_starts`` is not used.

    Returns:
        Tuple of (geometries, left_x, right_x, top_y, bottom_y, word_dicts,
        inv); everything but the geometries is passed through unchanged.
    """
    geometries = []
    last_idx = len(col_starts) - 1
    for idx, (abs_start, _declared_count) in enumerate(col_starts):
        # Right boundary: next column's start, or the content's right edge.
        boundary = col_starts[idx + 1][0] if idx < last_idx else right_x
        span = boundary - abs_start
        # Word 'left' values are relative to the content area.
        rel_lo = abs_start - left_x
        rel_hi = rel_lo + span
        members = [wd for wd in word_dicts if rel_lo <= wd['left'] < rel_hi]
        ratio = span / content_w if content_w > 0 else 0.0
        geometries.append(ColumnGeometry(
            index=idx,
            x=abs_start,
            y=top_y,
            width=span,
            height=content_h,
            word_count=len(members),
            words=members,
            width_ratio=ratio,
        ))
    logger.info(f"ColumnGeometry: {len(geometries)} columns: "
                f"{[(g.index, g.x, g.width, g.word_count) for g in geometries]}")
    return (geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv)
def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Optional[Tuple[List[ColumnGeometry], int, int, int, int, List[Dict], np.ndarray]]:
    """Detect column geometry using whitespace-gap analysis with word validation.

    Phase A of the two-phase column detection.  A smoothed vertical
    projection profile of the inverted image is thresholded to find
    whitespace gaps; each gap is then checked against the Tesseract word
    boxes so that no gap cuts through a word (a gap is shrunk to one side
    of the overlapping words if enough room remains, otherwise dropped).

    Falls back to clustering-based detection if fewer than 2 gaps survive.

    Args:
        ocr_img: Binarized grayscale image for layout analysis.
        dewarped_bgr: Original BGR image (for Tesseract word detection).

    Returns:
        Tuple of (geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv),
        or None if Tesseract fails or fewer than 5 usable words are found.
        Word boxes in ``word_dicts`` are relative to the content ROI.
    """
    h, w = ocr_img.shape[:2]

    # --- Step 1: Find content bounds ---
    inv = cv2.bitwise_not(ocr_img)
    left_x, right_x, top_y, bottom_y = _find_content_bounds(inv)
    content_w = right_x - left_x
    content_h = bottom_y - top_y
    # A content box covering less than 30% of either dimension is treated as
    # a detection failure; use the whole page instead.
    if content_w < w * 0.3 or content_h < h * 0.3:
        left_x, right_x = 0, w
        top_y, bottom_y = 0, h
        content_w, content_h = w, h

    logger.info(f"ColumnGeometry: content bounds x=[{left_x}..{right_x}] ({content_w}px), "
                f"y=[{top_y}..{bottom_y}] ({content_h}px)")

    # --- Step 2: Get word bounding boxes from Tesseract ---
    content_roi = dewarped_bgr[top_y:bottom_y, left_x:right_x]
    pil_img = Image.fromarray(cv2.cvtColor(content_roi, cv2.COLOR_BGR2RGB))
    try:
        data = pytesseract.image_to_data(pil_img, lang='eng+deu', output_type=pytesseract.Output.DICT)
    except Exception as e:
        # Best effort: any OCR failure means "no geometry", not a crash.
        logger.warning(f"ColumnGeometry: Tesseract image_to_data failed: {e}")
        return None

    word_dicts = []
    left_edges = []
    edge_word_indices = []
    for i, raw_text in enumerate(data['text']):
        # Confidence may arrive as an int, a float, or a string such as
        # '96.47' depending on the Tesseract/pytesseract versions.  Parse it
        # tolerantly; a digits-only check would turn every float-formatted
        # value into -1 and silently discard all words.
        try:
            conf = int(float(data['conf'][i]))
        except (TypeError, ValueError, OverflowError):
            conf = -1
        text = str(raw_text).strip()
        if conf < 30 or not text:
            continue
        lx = int(data['left'][i])
        ty = int(data['top'][i])
        bw = int(data['width'][i])
        bh = int(data['height'][i])
        left_edges.append(lx)
        edge_word_indices.append(len(word_dicts))
        word_dicts.append({
            'text': text, 'conf': conf,
            'left': lx, 'top': ty, 'width': bw, 'height': bh,
        })

    if len(left_edges) < 5:
        logger.warning(f"ColumnGeometry: only {len(left_edges)} words detected")
        return None

    logger.info(f"ColumnGeometry: {len(left_edges)} words detected in content area")

    # --- Step 3: Vertical projection profile ---
    content_strip = inv[top_y:bottom_y, left_x:right_x]
    v_proj = np.sum(content_strip, axis=0).astype(float)
    v_proj_norm = v_proj / (content_h * 255) if content_h > 0 else v_proj

    # Smooth the projection to avoid noise-induced micro-gaps
    kernel_size = max(5, content_w // 80)
    if kernel_size % 2 == 0:
        kernel_size += 1  # keep odd for symmetry
    v_smooth = np.convolve(v_proj_norm, np.ones(kernel_size) / kernel_size, mode='same')

    # --- Step 4: Find whitespace gaps ---
    # Threshold: areas with very little ink density are gaps
    median_density = float(np.median(v_smooth[v_smooth > 0])) if np.any(v_smooth > 0) else 0.01
    gap_threshold = max(median_density * 0.15, 0.005)

    in_gap = v_smooth < gap_threshold
    MIN_GAP_WIDTH = max(8, content_w // 200)  # min ~8px or 0.5% of content width

    # Collect contiguous gap regions
    raw_gaps = []  # (start_x_rel, end_x_rel) relative to content ROI
    gap_start = None
    for x in range(len(in_gap)):
        if in_gap[x]:
            if gap_start is None:
                gap_start = x
        else:
            if gap_start is not None:
                gap_width = x - gap_start
                if gap_width >= MIN_GAP_WIDTH:
                    raw_gaps.append((gap_start, x))
                gap_start = None
    # Handle gap at the right edge
    if gap_start is not None:
        gap_width = len(in_gap) - gap_start
        if gap_width >= MIN_GAP_WIDTH:
            raw_gaps.append((gap_start, len(in_gap)))

    logger.info(f"ColumnGeometry: {len(raw_gaps)} raw gaps found (threshold={gap_threshold:.4f}, "
                f"min_width={MIN_GAP_WIDTH}px): "
                f"{[(g[0]+left_x, g[1]+left_x, g[1]-g[0]) for g in raw_gaps]}")

    # --- Step 5: Validate gaps against word bounding boxes ---
    validated_gaps = []
    for gap_start_rel, gap_end_rel in raw_gaps:
        # Horizontal extents of every word that intersects this gap.
        overlapping_spans = [
            (wd['left'], wd['left'] + wd['width'])
            for wd in word_dicts
            if wd['left'] < gap_end_rel and wd['left'] + wd['width'] > gap_start_rel
        ]
        if not overlapping_spans:
            validated_gaps.append((gap_start_rel, gap_end_rel))
            continue

        # Try to shift the gap to avoid the overlapping word(s): find the
        # tightest word boundaries within the gap region.
        min_word_left = min(content_w, min(span[0] for span in overlapping_spans))
        max_word_right = max(0, max(span[1] for span in overlapping_spans))

        # Try gap before the overlapping words
        if min_word_left - gap_start_rel >= MIN_GAP_WIDTH:
            validated_gaps.append((gap_start_rel, min_word_left))
            logger.debug(f"ColumnGeometry: gap shifted left to avoid word at {min_word_left}")
        # Try gap after the overlapping words
        elif gap_end_rel - max_word_right >= MIN_GAP_WIDTH:
            validated_gaps.append((max_word_right, gap_end_rel))
            logger.debug(f"ColumnGeometry: gap shifted right to avoid word at {max_word_right}")
        else:
            logger.debug(f"ColumnGeometry: gap [{gap_start_rel}..{gap_end_rel}] "
                         f"discarded (word overlap, no room to shift)")

    logger.info(f"ColumnGeometry: {len(validated_gaps)} gaps after word validation: "
                f"{[(g[0]+left_x, g[1]+left_x, g[1]-g[0]) for g in validated_gaps]}")

    # --- Step 6: Fallback to clustering if too few gaps ---
    if len(validated_gaps) < 2:
        logger.info("ColumnGeometry: < 2 gaps found, falling back to clustering")
        return _detect_columns_by_clustering(
            word_dicts, left_edges, edge_word_indices,
            content_w, content_h, left_x, right_x, top_y, bottom_y, inv,
        )

    # --- Step 7: Derive column boundaries from gaps ---
    validated_gaps.sort(key=lambda g: g[0])

    # A margin gap touches the edge of the content area (within 2% tolerance);
    # margin gaps bound the outer columns instead of separating two columns.
    edge_tolerance = max(10, int(content_w * 0.02))
    is_left_margin = validated_gaps[0][0] <= edge_tolerance
    is_right_margin = validated_gaps[-1][1] >= content_w - edge_tolerance

    if is_left_margin:
        # First column starts after the left margin gap
        first_gap_end = validated_gaps[0][1]
        interior_gaps = validated_gaps[1:]
    else:
        # No left margin gap — first column starts at content left edge
        first_gap_end = 0
        interior_gaps = validated_gaps[:]

    if is_right_margin:
        # Last gap is right margin — don't use it as column start
        interior_gaps_for_boundaries = interior_gaps[:-1]
        right_boundary = validated_gaps[-1][0]  # last column ends at right margin gap start
    else:
        interior_gaps_for_boundaries = interior_gaps
        right_boundary = content_w

    # A column starts where a gap ends: the first column, then one column
    # after every interior gap.
    col_starts = [left_x + first_gap_end]
    col_starts.extend(left_x + gap_end_rel for _, gap_end_rel in interior_gaps_for_boundaries)

    # A column ends at the next column's start; the last column ends at the
    # right margin gap if there is one, else at the content edge.
    last_col_end = left_x + right_boundary if is_right_margin else right_x
    col_ends = col_starts[1:] + [last_col_end]

    # --- Step 8: Build ColumnGeometry objects ---
    geometries = []
    col_start_counts = []  # (start_x, word_count) per column, for logging
    for i, (start_x, end_x) in enumerate(zip(col_starts, col_ends)):
        col_width = end_x - start_x
        col_left_rel = start_x - left_x
        col_right_rel = col_left_rel + col_width
        # A word belongs to the column that contains its left edge.
        col_words = [wd for wd in word_dicts
                     if col_left_rel <= wd['left'] < col_right_rel]
        col_start_counts.append((start_x, len(col_words)))
        geometries.append(ColumnGeometry(
            index=i,
            x=start_x,
            y=top_y,
            width=col_width,
            height=content_h,
            word_count=len(col_words),
            words=col_words,
            width_ratio=col_width / content_w if content_w > 0 else 0.0,
        ))

    logger.info(f"ColumnGeometry: {len(col_starts)} columns from {len(validated_gaps)} gaps "
                f"(left_margin={is_left_margin}, right_margin={is_right_margin}): "
                f"{col_start_counts}")
    logger.info(f"ColumnGeometry: {len(geometries)} columns: "
                f"{[(g.index, g.x, g.width, g.word_count) for g in geometries]}")

    return (geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv)
# =============================================================================
# Row Geometry Detection (horizontal whitespace-gap analysis)
# =============================================================================
def detect_row_geometry(
    inv: np.ndarray,
    word_dicts: List[Dict],
    left_x: int, right_x: int,
    top_y: int, bottom_y: int,
) -> List['RowGeometry']:
    """Detect text rows from horizontal whitespace gaps.

    The row counterpart of the column gap analysis: a horizontal projection
    profile (restricted to pixels near word boxes) is smoothed and
    thresholded to find blank bands, each band is validated against the word
    boxes, and the spans between bands become rows.  Unusually large gaps
    near the top or bottom of the content area mark header/footer rows.
    The result is finally passed through ``_regularize_row_grid``.

    Args:
        inv: Inverted binarized image (white text on black bg, full page).
        word_dicts: Word bounding boxes from Tesseract (relative to content ROI).
        left_x, right_x: Absolute X bounds of the content area.
        top_y, bottom_y: Absolute Y bounds of the content area.

    Returns:
        List of RowGeometry objects sorted top to bottom.  Empty when the
        content area is smaller than 10px in either dimension.
    """
    content_w = right_x - left_x
    content_h = bottom_y - top_y
    if content_h < 10 or content_w < 10:
        logger.warning("detect_row_geometry: content area too small")
        return []

    # --- Step 1: Horizontal projection profile (text-only, images masked out) ---
    # Only pixels inside (vertically padded) word boxes may contribute, so
    # that ink without recognized words — e.g. illustrations — cannot fuse
    # several text rows into one.
    WORD_PAD_Y = max(4, content_h // 300)
    word_mask = np.zeros((content_h, content_w), dtype=np.uint8)
    for wd in word_dicts:
        box_top = max(0, wd['top'] - WORD_PAD_Y)
        box_bottom = min(content_h, wd['top'] + wd['height'] + WORD_PAD_Y)
        box_left = max(0, wd['left'])
        box_right = min(content_w, wd['left'] + wd['width'])
        word_mask[box_top:box_bottom, box_left:box_right] = 255

    content_strip = inv[top_y:bottom_y, left_x:right_x]
    masked_strip = cv2.bitwise_and(content_strip, word_mask)
    # content_w >= 10 is guaranteed by the guard above, so this never divides by zero.
    h_proj_norm = np.sum(masked_strip, axis=1).astype(float) / (content_w * 255)

    # --- Step 2: Smoothing + threshold ---
    kernel_size = max(3, content_h // 200)
    kernel_size |= 1  # force an odd window length
    h_smooth = np.convolve(h_proj_norm, np.ones(kernel_size) / kernel_size, mode='same')

    inked = h_smooth[h_smooth > 0]
    median_density = float(np.median(inked)) if inked.size else 0.01
    gap_threshold = max(median_density * 0.15, 0.003)

    in_gap = h_smooth < gap_threshold
    MIN_GAP_HEIGHT = max(3, content_h // 500)

    # --- Step 3: Collect contiguous gap regions ---
    raw_gaps = []  # (start_y_rel, end_y_rel) relative to content ROI
    run_start = None
    for y, blank in enumerate(in_gap):
        if blank:
            if run_start is None:
                run_start = y
            continue
        if run_start is not None:
            if y - run_start >= MIN_GAP_HEIGHT:
                raw_gaps.append((run_start, y))
            run_start = None
    # A blank run may extend to the bottom edge.
    if run_start is not None and len(in_gap) - run_start >= MIN_GAP_HEIGHT:
        raw_gaps.append((run_start, len(in_gap)))

    logger.info(f"RowGeometry: {len(raw_gaps)} raw gaps found (threshold={gap_threshold:.4f}, "
                f"min_height={MIN_GAP_HEIGHT}px)")

    # --- Step 4: Validate gaps against word bounding boxes ---
    validated_gaps = []
    for gap_start_rel, gap_end_rel in raw_gaps:
        # Vertical extents of all words intersecting this gap.
        blockers = [
            (wd['top'], wd['top'] + wd['height'])
            for wd in word_dicts
            if wd['top'] < gap_end_rel and wd['top'] + wd['height'] > gap_start_rel
        ]
        if not blockers:
            validated_gaps.append((gap_start_rel, gap_end_rel))
            continue

        # Shrink the gap to whichever side of the blocking words still
        # leaves a usable band (upper side preferred).
        min_word_top = min(content_h, min(b[0] for b in blockers))
        max_word_bottom = max(0, max(b[1] for b in blockers))
        if min_word_top - gap_start_rel >= MIN_GAP_HEIGHT:
            validated_gaps.append((gap_start_rel, min_word_top))
        elif gap_end_rel - max_word_bottom >= MIN_GAP_HEIGHT:
            validated_gaps.append((max_word_bottom, gap_end_rel))
        else:
            logger.debug(f"RowGeometry: gap [{gap_start_rel}..{gap_end_rel}] "
                         f"discarded (word overlap, no room to shift)")

    logger.info(f"RowGeometry: {len(validated_gaps)} gaps after word validation")

    # --- Fallback if too few gaps ---
    if len(validated_gaps) < 2:
        logger.info("RowGeometry: < 2 gaps found, falling back to word grouping")
        return _build_rows_from_word_grouping(
            word_dicts, left_x, right_x, top_y, bottom_y, content_w, content_h,
        )

    validated_gaps.sort(key=lambda g: g[0])

    # --- Step 5: Header/footer detection via gap size ---
    HEADER_FOOTER_ZONE = 0.15
    GAP_MULTIPLIER = 2.0

    median_gap = float(np.median([ge - gs for gs, ge in validated_gaps]))
    large_gap_threshold = median_gap * GAP_MULTIPLIER

    header_zone_limit = int(content_h * HEADER_FOOTER_ZONE)
    footer_zone_start = int(content_h * (1.0 - HEADER_FOOTER_ZONE))

    def _gap_size(gap):
        return gap[1] - gap[0]

    # Candidates: unusually large gaps whose midpoint lies in the top / bottom
    # 15% of the content.  max() keeps the first of equally large candidates.
    header_candidates = [
        g for g in validated_gaps
        if (g[0] + g[1]) / 2 < header_zone_limit and _gap_size(g) > large_gap_threshold
    ]
    footer_candidates = [
        g for g in validated_gaps
        if (g[0] + g[1]) / 2 > footer_zone_start and _gap_size(g) > large_gap_threshold
    ]
    best_header_gap = max(header_candidates, key=_gap_size) if header_candidates else None
    best_footer_gap = max(footer_candidates, key=_gap_size) if footer_candidates else None

    header_boundary_rel = None  # y below which is header
    footer_boundary_rel = None  # y above which is footer

    if best_header_gap is not None:
        header_boundary_rel = best_header_gap[1]
        logger.info(f"RowGeometry: header boundary at y_rel={header_boundary_rel} "
                    f"(gap={best_header_gap[1] - best_header_gap[0]}px, "
                    f"median_gap={median_gap:.0f}px)")

    if best_footer_gap is not None:
        footer_boundary_rel = best_footer_gap[0]
        logger.info(f"RowGeometry: footer boundary at y_rel={footer_boundary_rel} "
                    f"(gap={best_footer_gap[1] - best_footer_gap[0]}px)")

    # --- Step 6: Build RowGeometry objects from gaps ---
    # Rows are the spans between gaps.
    row_boundaries = []  # (start_y_rel, end_y_rel)

    first_gap, last_gap = validated_gaps[0], validated_gaps[-1]
    if first_gap[0] > MIN_GAP_HEIGHT:
        row_boundaries.append((0, first_gap[0]))
    for upper_gap, lower_gap in zip(validated_gaps, validated_gaps[1:]):
        if lower_gap[0] - upper_gap[1] > 0:
            row_boundaries.append((upper_gap[1], lower_gap[0]))
    if last_gap[1] < content_h - MIN_GAP_HEIGHT:
        row_boundaries.append((last_gap[1], content_h))

    # Size of the gap ending at a given y (first gap wins on duplicates).
    gap_size_by_end = {}
    for gs, ge in validated_gaps:
        gap_size_by_end.setdefault(ge, ge - gs)

    rows = []
    for idx, (row_start_rel, row_end_rel) in enumerate(row_boundaries):
        row_mid = (row_start_rel + row_end_rel) / 2
        if header_boundary_rel is not None and row_mid < header_boundary_rel:
            row_type = 'header'
        elif footer_boundary_rel is not None and row_mid > footer_boundary_rel:
            row_type = 'footer'
        else:
            row_type = 'content'

        # A word belongs to the row containing its vertical center.
        row_words = [wd for wd in word_dicts
                     if row_start_rel <= wd['top'] + wd['height'] / 2 < row_end_rel]

        if idx == 0:
            # NOTE(review): for the first row this stores the first gap's
            # start offset rather than a gap size — confirm this is intended.
            gap_before = first_gap[0] if first_gap[0] > 0 else 0
        else:
            gap_before = gap_size_by_end.get(row_start_rel, 0)

        rows.append(RowGeometry(
            index=idx,
            x=left_x,
            y=top_y + row_start_rel,
            width=content_w,
            height=row_end_rel - row_start_rel,
            word_count=len(row_words),
            words=row_words,
            row_type=row_type,
            gap_before=gap_before,
        ))

    # --- Step 7: Word-center grid regularization ---
    rows = _regularize_row_grid(rows, word_dicts, left_x, right_x, top_y,
                                content_w, content_h, inv)

    type_counts = {}
    for r in rows:
        type_counts[r.row_type] = type_counts.get(r.row_type, 0) + 1
    logger.info(f"RowGeometry: {len(rows)} rows detected: {type_counts}")

    return rows
def _regularize_row_grid(
    rows: List['RowGeometry'],
    word_dicts: List[Dict],
    left_x: int, right_x: int,
    top_y: int,
    content_w: int, content_h: int,
    inv: np.ndarray,
) -> List['RowGeometry']:
    """Rebuild content-row boundaries from the vertical centers of the words.

    Works bottom-up from the words of the gap-based content rows:

    1. Cluster words into text lines by proximity of their vertical centers.
    2. Per line, take the median center and the median (outlier-free)
       word height; merge lines whose centers are almost identical.
    3. Measure the pitch between consecutive line centers and split the
       page into sections wherever the pitch exceeds 1.8x the median.
    4. Within a section, put row boundaries midway between neighbouring
       centers; a single-line section gets a box around its own center.
    5. Re-assign every word to the nearest grid row and accept the grid only
       if at least 85% of the words were placed and it has no more rows
       than the gap-based detection.

    In every other case the incoming ``rows`` list is returned unchanged.
    Non-content rows (header/footer) are carried over as they are.
    ``word_dicts``, ``right_x`` and ``inv`` are accepted but not read here.

    Returns:
        The regularized rows sorted by ``y`` and re-indexed, or ``rows``.
    """
    content_rows, non_content = [], []
    for r in rows:
        (content_rows if r.row_type == 'content' else non_content).append(r)
    if len(content_rows) < 5:
        return rows

    def _v_center(word):
        return word['top'] + word['height'] / 2

    # --- Step A: Group ALL words into line clusters ---
    # Words of the content rows, de-duplicated by bounding box (first wins).
    unique_words = {}
    for r in content_rows:
        for wd in r.words:
            unique_words.setdefault((wd['left'], wd['top'], wd['width'], wd['height']), wd)
    content_words: List[Dict] = list(unique_words.values())
    if len(content_words) < 5:
        return rows

    # Upper-median word height; used below to ignore very tall boxes.
    median_wh = sorted(wd['height'] for wd in content_words)[len(content_words) // 2]

    # Upper-median height of the gap-based rows.  The grouping tolerance is
    # derived from this row height rather than from word heights, since
    # words on one line can differ a lot in height.
    median_row_h = sorted(r.height for r in content_rows)[len(content_rows) // 2]
    y_tol = max(10, int(median_row_h * 0.4))

    # Walk the words top to bottom; a word joins the open line while its
    # center stays within y_tol of that line's FIRST word.
    words_by_center = sorted(content_words, key=lambda wd: (_v_center(wd), wd['left']))
    line_clusters: List[List[Dict]] = []
    anchor_center = None
    for wd in words_by_center:
        center = _v_center(wd)
        if anchor_center is not None and abs(center - anchor_center) <= y_tol:
            line_clusters[-1].append(wd)
        else:
            line_clusters.append([wd])
            anchor_center = center
    for cluster in line_clusters:
        cluster.sort(key=lambda wd: wd['left'])

    if len(line_clusters) < 3:
        return rows

    # --- Step B: Compute center_y per cluster ---
    def _summarize(members):
        """Return (median center, median letter height) of a word group."""
        centers = [_v_center(m) for m in members]
        # Heights above 2x the median word height are left out so tall
        # boxes do not skew the letter height.
        heights = [m['height'] for m in members if m['height'] <= median_wh * 2.0]
        if not heights:
            heights = [m['height'] for m in members]
        return float(np.median(centers)), float(np.median(heights))

    cluster_info: List[Dict] = []
    for cl_words in line_clusters:
        center_y, letter_h = _summarize(cl_words)
        cluster_info.append({
            'center_y_rel': center_y,           # relative to content ROI
            'center_y_abs': center_y + top_y,   # absolute
            'letter_h': letter_h,
            'words': cl_words,
        })
    cluster_info.sort(key=lambda c: c['center_y_rel'])

    # --- Step B2: Merge clusters that are too close together ---
    # Centers closer than 30% of the row height are treated as one line.
    merge_threshold = max(8, median_row_h * 0.3)
    merged: List[Dict] = []
    for cl in cluster_info:
        if merged and cl['center_y_rel'] - merged[-1]['center_y_rel'] < merge_threshold:
            prev = merged[-1]
            combined_words = prev['words'] + cl['words']
            center_y, letter_h = _summarize(combined_words)
            prev['center_y_rel'] = center_y
            prev['center_y_abs'] = center_y + top_y
            prev['letter_h'] = letter_h
            prev['words'] = combined_words
        else:
            merged.append(cl)
    cluster_info = merged

    if len(cluster_info) < 3:
        return rows

    # --- Step C: Compute pitches and detect section breaks ---
    rel_centers = [c['center_y_rel'] for c in cluster_info]
    pitches: List[float] = [lower - upper for upper, lower in zip(rel_centers, rel_centers[1:])]
    if not pitches:
        return rows

    median_pitch = float(np.median(pitches))
    if median_pitch <= 5:
        return rows

    # A pitch much larger than normal separates two sections.
    BREAK_FACTOR = 1.8

    # --- Step D: Build sections (groups of consecutive lines with normal spacing) ---
    sections: List[List[Dict]] = [[cluster_info[0]]]
    for cl, pitch_to_prev in zip(cluster_info[1:], pitches):
        if pitch_to_prev > median_pitch * BREAK_FACTOR:
            sections.append([cl])
        else:
            sections[-1].append(cl)

    # --- Step E: Build row boundaries per section ---
    def _content_row(y_top, y_bottom, members):
        # The index is a placeholder; rows are re-indexed in Step H.
        return RowGeometry(
            index=0,
            x=left_x,
            y=round(y_top),
            width=content_w,
            height=round(y_bottom - y_top),
            word_count=len(members),
            words=members,
            row_type='content',
            gap_before=0,
        )

    grid_rows: List[RowGeometry] = []
    for section in sections:
        if len(section) == 1:
            # Single-line section: a box around its own center.  Unlike
            # multi-line sections this box is not clamped to the content area.
            only = section[0]
            half_h = max(only['letter_h'], median_pitch * 0.4)
            grid_rows.append(_content_row(only['center_y_abs'] - half_h,
                                          only['center_y_abs'] + half_h,
                                          only['words']))
            continue

        section_rel = [c['center_y_rel'] for c in section]
        local_pitch = float(np.median(
            [lower - upper for upper, lower in zip(section_rel, section_rel[1:])]
        ))

        # Boundaries sit midway between neighbouring centers; the outer edges
        # of the section extend half a local pitch beyond the end centers.
        abs_centers = [c['center_y_abs'] for c in section]
        last_i = len(section) - 1
        for i, cl in enumerate(section):
            center = abs_centers[i]
            if i == 0:
                row_top = center - local_pitch / 2
            else:
                row_top = (abs_centers[i - 1] + center) / 2
            if i == last_i:
                row_bot = center + local_pitch / 2
            else:
                row_bot = (center + abs_centers[i + 1]) / 2

            # Keep the row inside the content area.
            row_top = max(top_y, row_top)
            row_bot = min(top_y + content_h, row_bot)
            if row_bot - row_top < 5:
                continue
            grid_rows.append(_content_row(row_top, row_bot, cl['words']))

    if not grid_rows:
        return rows

    # --- Step F: Re-assign words to grid rows ---
    # Each word goes to the row with the nearest vertical center, provided
    # that center is less than one median pitch away.
    for gr in grid_rows:
        gr.words = []

    for wd in content_words:
        w_center = wd['top'] + top_y + wd['height'] / 2
        nearest = min(grid_rows, key=lambda gr: abs(w_center - (gr.y + gr.height / 2)))
        if abs(w_center - (nearest.y + nearest.height / 2)) < median_pitch:
            nearest.words.append(wd)

    for gr in grid_rows:
        gr.word_count = len(gr.words)

    # --- Step G: Validate ---
    words_placed = sum(gr.word_count for gr in grid_rows)
    match_ratio = words_placed / len(content_words)
    if match_ratio < 0.85:
        logger.info(f"RowGrid: word-center grid only matches {match_ratio:.0%} "
                    f"of words, keeping gap-based rows")
        return rows

    # Drop grid rows that ended up without words.
    grid_rows = [gr for gr in grid_rows if gr.word_count > 0]

    # More rows than the gap-based detection means real lines were split,
    # which is worse than the input.
    if len(grid_rows) > len(content_rows):
        logger.info(f"RowGrid: grid produced {len(grid_rows)} rows > "
                    f"{len(content_rows)} gap-based → keeping gap-based rows")
        return rows

    # --- Step H: Merge header/footer + re-index ---
    result = non_content + grid_rows
    result.sort(key=lambda r: r.y)
    for new_index, r in enumerate(result):
        r.index = new_index

    row_heights = [gr.height for gr in grid_rows]
    min_h = min(row_heights) if row_heights else 0
    max_h = max(row_heights) if row_heights else 0
    logger.info(f"RowGrid: word-center grid applied "
                f"(median_pitch={median_pitch:.0f}px, median_row_h={median_row_h}px, median_wh={median_wh}px, "
                f"y_tol={y_tol}px, {len(line_clusters)} clusters→{len(cluster_info)} merged, "
                f"{len(sections)} sections, "
                f"{len(grid_rows)} grid rows [h={min_h}-{max_h}px], "
                f"was {len(content_rows)} gap-based rows)")

    return result
def _build_rows_from_word_grouping(
    word_dicts: List[Dict],
    left_x: int, right_x: int,
    top_y: int, bottom_y: int,
    content_w: int, content_h: int,
) -> List['RowGeometry']:
    """Fallback row builder: one row per line of Y-grouped words.

    Words are grouped with ``_group_words_into_lines()`` using a tolerance of
    1% of the content height (at least 20px).  Each non-empty group becomes a
    'content' row spanning the full content width, from the highest word top
    to the lowest word bottom in the group.  No header/footer detection is
    done here.  ``right_x`` and ``bottom_y`` are accepted but not read.

    Returns:
        List of RowGeometry objects, or an empty list when there are no words.
    """
    if not word_dicts:
        return []

    y_tolerance = max(20, content_h // 100)
    grouped = _group_words_into_lines(word_dicts, y_tolerance_px=y_tolerance)

    rows = []
    for line_no, members in enumerate(grouped):
        if members:
            upper = min(wd['top'] for wd in members)
            lower = max(wd['top'] + wd['height'] for wd in members)
            rows.append(RowGeometry(
                index=line_no,  # position in the grouping result
                x=left_x,
                y=top_y + upper,
                width=content_w,
                height=lower - upper,
                word_count=len(members),
                words=members,
                row_type='content',
                gap_before=0,
            ))

    logger.info(f"RowGeometry (fallback): {len(rows)} rows from word grouping")
    return rows
# --- Phase B: Content-Based Classification ---
def _score_language(words: List[Dict]) -> Dict[str, float]:
    """Score how English or German a column's words look.

    The base score for each language is the fraction of confident words
    found in the corresponding function-word set.  The German score is then
    boosted by umlaut/eszett characters and by a high share of capitalized
    words.

    Only words with confidence above 40 and non-empty text are considered.
    Every signal is computed from that same filtered list, so a confident
    entry with empty text can no longer raise ``IndexError`` in the
    capitalization check.

    Args:
        words: List of word dicts with 'text' and 'conf' keys.

    Returns:
        Dict with 'eng' and 'deu' scores (0.0-1.0), rounded to 3 decimals.
    """
    if not words:
        return {'eng': 0.0, 'deu': 0.0}

    # Single source of truth for all signals below.
    raw_texts = [w['text'] for w in words if w.get('conf', 0) > 40 and w['text']]
    if not raw_texts:
        return {'eng': 0.0, 'deu': 0.0}

    good_words = [t.lower() for t in raw_texts]
    total = len(good_words)

    en_hits = sum(1 for w in good_words if w in ENGLISH_FUNCTION_WORDS)
    de_hits = sum(1 for w in good_words if w in GERMAN_FUNCTION_WORDS)

    # Umlauts / eszett are a strong German signal (counted per character).
    umlaut_count = sum(1 for t in raw_texts for c in t if c in 'äöüÄÖÜß')

    # Capitalized words longer than two characters.  The word's position in
    # its line is not known here, so sentence-initial words are counted too.
    cap_words = sum(1 for t in raw_texts if len(t) > 2 and t[0].isupper())

    en_score = en_hits / total
    de_score = de_hits / total

    # Boost German score for umlauts (at most 5 characters are credited).
    if umlaut_count > 0:
        de_score = min(1.0, de_score + 0.15 * min(umlaut_count, 5))

    # Boost German score for a high capitalization ratio; only applied once
    # there are more than 5 words to judge from.
    if total > 5:
        cap_ratio = cap_words / total
        if cap_ratio > 0.3:
            de_score = min(1.0, de_score + 0.1)

    return {'eng': round(en_score, 3), 'deu': round(de_score, 3)}
def _score_role(geom: ColumnGeometry) -> Dict[str, float]:
    """Rate how well a column fits each structural role.

    Scores are heuristics over the column's relative width
    (``geom.width_ratio``), its word count and index, and simple text
    statistics (average length, punctuation, digits) of the words whose
    confidence is above 40.

    Args:
        geom: ColumnGeometry with words and dimensions.

    Returns:
        Dict with role scores for 'reference', 'marker', 'sentence' and
        'vocabulary', each rounded to 3 decimals.  All scores are 0.0 when
        the column has no sufficiently confident words.
    """
    reference = marker = sentence = vocabulary = 0.0

    confident = [w['text'] for w in geom.words if w.get('conf', 0) > 40] if geom.words else []
    if not confident:
        return {'reference': reference, 'marker': marker,
                'sentence': sentence, 'vocabulary': vocabulary}

    n_texts = len(confident)
    mean_len = sum(map(len, confident)) / n_texts
    punctuated = sum(1 for t in confident if any(ch in t for ch in '.!?;:,'))
    digit_ratio = sum(1 for t in confident if any(ch.isdigit() for ch in t)) / n_texts
    ratio = geom.width_ratio

    # Reference: narrow column; scores higher when many words contain digits.
    if ratio < 0.12:
        reference = min(1.0, 0.5 + digit_ratio * 0.5) if digit_ratio > 0.4 else 0.5

    # Marker: very narrow column with few entries; short entries score higher.
    if ratio < 0.06 and geom.word_count <= 15:
        marker = 0.9 if mean_len < 4 else 0.7

    # An extremely narrow column that is not the first one is a strong
    # marker candidate regardless of word count.
    if ratio < 0.04 and geom.index > 0:
        marker = max(marker, 0.9)

    # Sentence: wide column with several punctuated words; longer words add to it.
    if ratio > 0.15 and punctuated > 2:
        sentence = 0.3 + min(0.5, punctuated / n_texts)
        if mean_len > 4:
            sentence = min(1.0, sentence + 0.2)

    # Vocabulary: medium width; medium average word length adds to it.
    if 0.10 < ratio < 0.45:
        vocabulary = 0.4
        if 3 < mean_len < 8:
            vocabulary = min(1.0, vocabulary + 0.3)

    return {
        'reference': round(reference, 3),
        'marker': round(marker, 3),
        'sentence': round(sentence, 3),
        'vocabulary': round(vocabulary, 3),
    }
def classify_column_types(geometries: List[ColumnGeometry],
                          content_w: int,
                          top_y: int,
                          img_w: int,
                          img_h: int,
                          bottom_y: int) -> List[PageRegion]:
    """Assign a type to every detected column via a fallback chain.

    Level 1 classifies by content (language + role scores), Level 2 by
    position confirmed with language scores, Level 3 by position only.
    Before scoring, a first or last column with fewer than 8 words is marked
    ``column_ignore``. The surviving geometries get their ``index`` attribute
    rewritten in place to their new position.

    Args:
        geometries: List of ColumnGeometry from Phase A.
        content_w: Total content width.
        top_y: Top Y of content area.
        img_w: Full image width.
        img_h: Full image height.
        bottom_y: Bottom Y of content area.

    Returns:
        List of PageRegion with types, confidence, and method. Header/footer
        regions are appended only on the multi-column path.
    """
    content_h = bottom_y - top_y

    def text_region(column):
        # A lone column is a plain text page; it keeps the column's own height.
        return PageRegion(
            type='column_text', x=column.x, y=column.y,
            width=column.width, height=column.height,
            classification_confidence=0.9,
            classification_method='content',
        )

    # Special case: a single detected column.
    if len(geometries) == 1:
        return [text_region(geometries[0])]

    # --- Pre-filter: sparse first/last columns become column_ignore ---
    last_pos = len(geometries) - 1
    ignore_regions = []
    kept = []
    for idx, g in enumerate(geometries):
        is_sparse_edge = idx in (0, last_pos) and g.word_count < 8
        if not is_sparse_edge:
            kept.append(g)
            continue
        ignore_regions.append(PageRegion(
            type='column_ignore', x=g.x, y=g.y,
            width=g.width, height=content_h,
            classification_confidence=0.95,
            classification_method='content',
        ))
        logger.info(f"ClassifyColumns: column {idx} (x={g.x}, words={g.word_count}) → column_ignore (edge, few words)")

    # Re-index the surviving columns so index 0 is the first active column.
    for position, g in enumerate(kept):
        g.index = position
    geometries = kept

    # Nothing (or only one column) left after filtering.
    if not geometries:
        return ignore_regions
    if len(geometries) == 1:
        ignore_regions.append(text_region(geometries[0]))
        return ignore_regions

    # --- Score all columns ---
    lang_scores = [_score_language(g.words) for g in geometries]
    role_scores = [_score_role(g) for g in geometries]
    logger.info(f"ClassifyColumns: language scores: "
                f"{[(g.index, ls) for g, ls in zip(geometries, lang_scores)]}")
    logger.info(f"ClassifyColumns: role scores: "
                f"{[(g.index, rs) for g, rs in zip(geometries, role_scores)]}")

    # --- Fallback chain: the first level that yields a result wins ---
    regions = _classify_by_content(geometries, lang_scores, role_scores, content_w, content_h)
    if regions is not None:
        logger.info("ClassifyColumns: Level 1 (content-based) succeeded")
    else:
        regions = _classify_by_position_enhanced(geometries, lang_scores, content_w, content_h)
        if regions is not None:
            logger.info("ClassifyColumns: Level 2 (position+language) succeeded")
        else:
            logger.info("ClassifyColumns: Level 3 (position fallback)")
            regions = _classify_by_position_fallback(geometries, content_w, content_h)

    _add_header_footer(regions, top_y, bottom_y, img_w, img_h)
    return ignore_regions + regions
def _classify_by_content(geometries: List[ColumnGeometry],
                         lang_scores: List[Dict[str, float]],
                         role_scores: List[Dict[str, float]],
                         content_w: int,
                         content_h: int) -> Optional[List[PageRegion]]:
    """Level 1: classify columns from their content scores.

    Structural columns (page reference, marker) are assigned first. Among the
    rest, the column with the highest English score becomes ``column_en`` and
    the one with the highest German score ``column_de``; when both signals
    are very weak (< 0.15) the two leftmost remaining columns are used
    instead. All other columns become ``column_example``.

    Returns:
        Regions sorted by x, or None when no usable EN/DE split is found.
    """
    def make_region(kind, geom, confidence):
        return PageRegion(
            type=kind, x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=confidence,
            classification_method='content',
        )

    def sorted_by_x(found):
        found.sort(key=lambda r: r.x)
        return found

    regions = []
    assigned = set()

    # Step 1: structural roles. Only columns starting within the leftmost
    # ~20% of the content area may become page_ref.
    left_20_threshold = geometries[0].x + content_w * 0.20 if geometries else 0
    for i, (geom, rs, ls) in enumerate(zip(geometries, role_scores, lang_scores)):
        is_left_side = geom.x < left_20_threshold
        has_strong_language = ls['eng'] > 0.3 or ls['deu'] > 0.3
        if (rs['reference'] >= 0.5 and geom.width_ratio < 0.12
                and is_left_side and not has_strong_language):
            kind, confidence = 'page_ref', rs['reference']
        elif rs['marker'] >= 0.7 and geom.width_ratio < 0.06:
            kind, confidence = 'column_marker', rs['marker']
        elif geom.width_ratio < 0.05 and not is_left_side:
            # Narrow column away from the left edge → marker, not page_ref
            kind, confidence = 'column_marker', 0.8
        else:
            continue
        regions.append(make_region(kind, geom, confidence))
        assigned.add(i)

    # Step 2: look for the EN/DE pair among the unassigned columns.
    remaining = [(i, geom, lang_scores[i], role_scores[i])
                 for i, geom in enumerate(geometries) if i not in assigned]

    if len(remaining) < 2:
        # Not enough columns for an EN/DE pair; a single leftover is plain text.
        if len(remaining) == 1:
            regions.append(make_region('column_text', remaining[0][1], 0.6))
        return sorted_by_x(regions)

    en_candidates = [(i, g, ls) for i, g, ls, rs in remaining
                     if ls['eng'] > ls['deu'] and ls['eng'] > 0.05]
    de_candidates = [(i, g, ls) for i, g, ls, rs in remaining
                     if ls['deu'] > ls['eng'] and ls['deu'] > 0.05]

    if not en_candidates or not de_candidates:
        max_eng = max(ls['eng'] for _, _, ls, _ in remaining)
        max_deu = max(ls['deu'] for _, _, ls, _ in remaining)
        if max_eng < 0.15 and max_deu < 0.15:
            # Both signals weak — positional tiebreaker: left=EN, right=DE
            by_position = sorted(remaining, key=lambda item: item[1].x)
            en_pick, de_pick = by_position[0], by_position[1]
            logger.info("ClassifyColumns: Level 1 using position tiebreaker (weak signals) - left=EN, right=DE")
            regions.append(make_region('column_en', en_pick[1], 0.4))
            assigned.add(en_pick[0])
            regions.append(make_region('column_de', de_pick[1], 0.4))
            assigned.add(de_pick[0])
            # Everything else is treated as an example column.
            regions.extend(make_region('column_example', geom, 0.4)
                           for i, geom, _ls, _rs in remaining if i not in assigned)
            return sorted_by_x(regions)
        # One language is missing although a signal exists → let Level 2 decide.
        logger.info("ClassifyColumns: Level 1 failed - no clear EN/DE language split")
        return None

    # Pick the best EN and DE candidates
    best_en = max(en_candidates, key=lambda cand: cand[2]['eng'])
    best_de = max(de_candidates, key=lambda cand: cand[2]['deu'])
    if best_en[0] == best_de[0]:
        # Same column scored highest for both — ambiguous
        logger.info("ClassifyColumns: Level 1 failed - same column highest for EN and DE")
        return None

    regions.append(make_region('column_en', best_en[1], round(best_en[2]['eng'], 2)))
    assigned.add(best_en[0])
    regions.append(make_region('column_de', best_de[1], round(best_de[2]['deu'], 2)))
    assigned.add(best_de[0])

    # Step 3: leftovers are always column_example; only the confidence
    # depends on the sentence score.
    for i, geom, _ls, rs in remaining:
        if i in assigned:
            continue
        confidence = round(rs['sentence'], 2) if rs['sentence'] > 0.4 else 0.5
        regions.append(make_region('column_example', geom, confidence))

    return sorted_by_x(regions)
def _classify_by_position_enhanced(geometries: List[ColumnGeometry],
                                   lang_scores: List[Dict[str, float]],
                                   content_w: int,
                                   content_h: int) -> Optional[List[PageRegion]]:
    """Level 2: Position-based rules enhanced with language confirmation.

    Applies the positional heuristics (page_ref → marker → example → EN/DE)
    and swaps the EN/DE assignment when the language scores of both columns
    point the other way.

    Args:
        geometries: Columns to classify, in left-to-right order.
        lang_scores: Per-column dicts with 'eng' and 'deu' scores, parallel
            to ``geometries``.
        content_w: Total content width.
        content_h: Height given to every produced region.

    Returns:
        Regions sorted by x. An empty input yields an empty list. This
        implementation has no path that returns None.
    """
    # Without this guard the unconditional geometries[0] access below would
    # raise IndexError on empty input.
    if not geometries:
        return []

    def make_region(kind, geom, confidence):
        return PageRegion(
            type=kind, x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=confidence,
            classification_method='position_enhanced',
        )

    regions = []
    untyped = list(range(len(geometries)))
    left_20_threshold = geometries[0].x + content_w * 0.20

    # Rule 1: Leftmost narrow column → page_ref (only if in left 20%, no strong language)
    g0, ls0 = geometries[0], lang_scores[0]
    has_strong_lang_0 = ls0['eng'] > 0.3 or ls0['deu'] > 0.3
    if g0.width_ratio < 0.12 and g0.x < left_20_threshold and not has_strong_lang_0:
        regions.append(make_region('page_ref', g0, 0.8))
        untyped.remove(0)

    # Rule 2: Narrow columns with few words → marker
    for i in list(untyped):  # iterate a copy; untyped shrinks inside the loop
        geom = geometries[i]
        if geom.width_ratio < 0.06 and geom.word_count <= 15:
            regions.append(make_region('column_marker', geom, 0.7))
            untyped.remove(i)

    # Rule 3: Rightmost remaining → column_example (if 3+ remaining)
    if len(untyped) >= 3:
        last_idx = untyped.pop()
        regions.append(make_region('column_example', geometries[last_idx], 0.7))

    # Rule 4: First two remaining → EN/DE, but check language to possibly swap
    if len(untyped) >= 2:
        idx_a, idx_b = untyped[0], untyped[1]
        ls_a, ls_b = lang_scores[idx_a], lang_scores[idx_b]

        # Default: first=EN, second=DE (old behavior)
        en_idx, de_idx = idx_a, idx_b
        conf = 0.7

        # Swap only when BOTH columns indicate the opposite language
        if ls_a['deu'] > ls_a['eng'] and ls_b['eng'] > ls_b['deu']:
            en_idx, de_idx = idx_b, idx_a
            conf = 0.85
            logger.info("ClassifyColumns: Level 2 swapped EN/DE based on language scores")

        regions.append(make_region('column_en', geometries[en_idx], conf))
        regions.append(make_region('column_de', geometries[de_idx], conf))
        untyped = untyped[2:]
    elif len(untyped) == 1:
        regions.append(make_region('column_en', geometries[untyped[0]], 0.5))
        untyped = []

    # Remaining → example
    for idx in untyped:
        regions.append(make_region('column_example', geometries[idx], 0.5))

    regions.sort(key=lambda r: r.x)
    return regions
def _classify_by_position_fallback(geometries: List[ColumnGeometry],
                                   content_w: int,
                                   content_h: int) -> List[PageRegion]:
    """Level 3: Pure position-based fallback.

    Same rule order as Level 2 (page_ref → marker → example → EN/DE) but
    without any language scores; every region gets confidence 1.0.

    Args:
        geometries: Columns to classify, in left-to-right order.
        content_w: Total content width.
        content_h: Height given to every produced region.

    Returns:
        Regions sorted by x; an empty list for empty input.
    """
    # Without this guard the unconditional geometries[0] access below would
    # raise IndexError on empty input.
    if not geometries:
        return []

    def make_region(kind, geom):
        return PageRegion(
            type=kind, x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=1.0,
            classification_method='position_fallback',
        )

    regions = []
    untyped = list(range(len(geometries)))
    left_20_threshold = geometries[0].x + content_w * 0.20

    # Rule 1: Leftmost narrow column → page_ref (only if in left 20%)
    g0 = geometries[0]
    if g0.width_ratio < 0.12 and g0.x < left_20_threshold:
        regions.append(make_region('page_ref', g0))
        untyped.remove(0)

    # Rule 2: Narrow + few words → marker
    for i in list(untyped):  # iterate a copy; untyped shrinks inside the loop
        geom = geometries[i]
        if geom.width_ratio < 0.06 and geom.word_count <= 15:
            regions.append(make_region('column_marker', geom))
            untyped.remove(i)

    # Rule 3: Rightmost remaining → example (if 3+)
    if len(untyped) >= 3:
        last_idx = untyped.pop()
        regions.append(make_region('column_example', geometries[last_idx]))

    # Rule 4: First remaining → EN, second → DE
    if len(untyped) >= 2:
        regions.append(make_region('column_en', geometries[untyped[0]]))
        regions.append(make_region('column_de', geometries[untyped[1]]))
        untyped = untyped[2:]
    elif len(untyped) == 1:
        regions.append(make_region('column_en', geometries[untyped[0]]))
        untyped = []

    # Anything still untyped is treated as an example column.
    for idx in untyped:
        regions.append(make_region('column_example', geometries[idx]))

    regions.sort(key=lambda r: r.x)
    return regions
def _add_header_footer(regions: List[PageRegion], top_y: int, bottom_y: int,
                       img_w: int, img_h: int) -> None:
    """Append full-width header/footer regions to ``regions`` in place.

    A header is added when the content starts more than 10 px below the top
    edge, a footer when it ends more than 10 px above the bottom edge.
    """
    extras = []
    if top_y > 10:
        extras.append(PageRegion(type='header', x=0, y=0, width=img_w, height=top_y))
    if bottom_y < img_h - 10:
        extras.append(PageRegion(type='footer', x=0, y=bottom_y,
                                 width=img_w, height=img_h - bottom_y))
    regions.extend(extras)
# --- Main Entry Point ---
def analyze_layout_by_words(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> List[PageRegion]:
    """Detect and classify page columns in two phases.

    Phase A calls detect_column_geometry(); Phase B passes its result to
    classify_column_types(). When Phase A returns None the function falls
    back to the projection-based analyze_layout().

    Args:
        ocr_img: Binarized grayscale image for layout analysis.
        dewarped_bgr: Original BGR image (for Tesseract word detection).

    Returns:
        List of PageRegion objects with types, confidence, and method.
    """
    img_h, img_w = ocr_img.shape[:2]

    # Phase A: geometry detection
    geometry = detect_column_geometry(ocr_img, dewarped_bgr)
    if geometry is None:
        logger.info("LayoutByWords: geometry detection failed, falling back to projection profiles")
        return analyze_layout(create_layout_image(dewarped_bgr), ocr_img)

    geometries, left_x, right_x, top_y, bottom_y, _word_dicts, _inv = geometry

    # Phase B: content-based classification
    regions = classify_column_types(geometries, right_x - left_x, top_y, img_w, img_h, bottom_y)

    col_count = sum(1 for r in regions if r.type.startswith('column') or r.type == 'page_ref')
    methods = {r.classification_method for r in regions if r.classification_method}
    logger.info(f"LayoutByWords: {col_count} columns detected (methods: {methods}): "
                f"{[(r.type, r.x, r.width, r.classification_confidence) for r in regions if r.type not in ('header','footer')]}")
    return regions
# =============================================================================
# Pipeline Step 5: Word Grid from Columns × Rows
# =============================================================================
def _words_to_reading_order_lines(words: List[Dict], y_tolerance_px: int = 15) -> List[str]:
    """Turn OCR word dicts into one text string per visual line.

    Line grouping is delegated to _group_words_into_lines(); the 'text'
    values of each group are joined with single spaces.

    Returns:
        One string per visual line; an empty list when ``words`` is empty.
    """
    if not words:
        return []
    line_texts = []
    for line_words in _group_words_into_lines(words, y_tolerance_px=y_tolerance_px):
        line_texts.append(' '.join(word['text'] for word in line_words))
    return line_texts
def _rejoin_hyphenated(lines: List[str]) -> List[str]:
"""Rejoin words split by line-break hyphenation.
E.g. ['Fuß-', 'boden'] → ['Fußboden']
['some text-', 'thing here'] → ['something here']
"""
if len(lines) <= 1:
return lines
result = []
i = 0
while i < len(lines):
line = lines[i]
# If line ends with '-' and there's a next line, rejoin
if i + 1 < len(lines) and line.rstrip().endswith('-'):
stripped = line.rstrip()
# Get the word fragment before hyphen (last word)
prefix = stripped[:-1] # remove trailing hyphen
next_line = lines[i + 1]
# Join: last word of this line + first word of next line
prefix_words = prefix.rsplit(' ', 1)
next_words = next_line.split(' ', 1)
if len(prefix_words) > 1:
joined = prefix_words[0] + ' ' + prefix_words[1] + next_words[0]
else:
joined = prefix_words[0] + next_words[0]
remainder = next_words[1] if len(next_words) > 1 else ''
if remainder:
result.append(joined + ' ' + remainder)
else:
result.append(joined)
i += 2
else:
result.append(line)
i += 1
return result
def _words_to_reading_order_text(words: List[Dict], y_tolerance_px: int = 15) -> str:
    """Render OCR words as multi-line text in reading order.

    Builds the visual lines via _words_to_reading_order_lines(), repairs
    line-break hyphenation via _rejoin_hyphenated(), and joins the result
    with newline characters.
    """
    visual_lines = _words_to_reading_order_lines(words, y_tolerance_px)
    return '\n'.join(_rejoin_hyphenated(visual_lines))
# --- RapidOCR integration (PaddleOCR models on ONNX Runtime) ---

# Module-level cache for the engine instance; filled by _get_rapid_engine().
_rapid_engine = None
# Set to True only when all rapidocr imports below succeed.
RAPIDOCR_AVAILABLE = False

try:
    from rapidocr import RapidOCR as _RapidOCRClass
    from rapidocr import LangRec as _LangRec, OCRVersion as _OCRVersion, ModelType as _ModelType
    RAPIDOCR_AVAILABLE = True
    logger.info("RapidOCR available — can be used as alternative to Tesseract")
except ImportError:
    # rapidocr is optional: on failure the private aliases stay undefined and
    # RAPIDOCR_AVAILABLE remains False.
    logger.info("RapidOCR not installed — using Tesseract only")
def _get_rapid_engine():
    """Return the shared RapidOCR engine, creating it on first use.

    The instance is cached in the module-level ``_rapid_engine`` so the
    engine is constructed only once.
    """
    global _rapid_engine
    if _rapid_engine is not None:
        return _rapid_engine

    engine_params = {
        # PP-OCRv5 Latin model — supports German umlauts (ä, ö, ü, ß)
        "Rec.lang_type": _LangRec.LATIN,
        "Rec.model_type": _ModelType.SERVER,
        "Rec.ocr_version": _OCRVersion.PPOCRV5,
        # Tighter detection boxes to reduce word merging
        "Det.unclip_ratio": 1.3,
        "Det.box_thresh": 0.6,
        # Silence verbose logging
        "Global.log_level": "critical",
    }
    _rapid_engine = _RapidOCRClass(params=engine_params)
    logger.info("RapidOCR engine initialized (PP-OCRv5 Latin, unclip_ratio=1.3)")
    return _rapid_engine
def ocr_region_rapid(
    img_bgr: np.ndarray,
    region: PageRegion,
) -> List[Dict[str, Any]]:
    """OCR one page region with RapidOCR and return Tesseract-style word dicts.

    Args:
        img_bgr: Full-page BGR image (NOT binarized — RapidOCR works on color/gray).
        region: Region to crop and OCR.

    Returns:
        List of word dicts with text, left, top, width, height, conf,
        region_type. ``left``/``top`` are shifted by the region origin;
        ``conf`` is the engine score scaled to 0-100. Empty when the crop is
        empty or the engine reports no boxes/texts.
    """
    engine = _get_rapid_engine()

    x0, y0 = region.x, region.y
    crop = img_bgr[y0:y0 + region.height, x0:x0 + region.width]
    if crop.size == 0:
        return []

    result = engine(crop)
    if result is None or result.boxes is None or result.txts is None:
        return []

    words = []
    # boxes: 4 corner points per detected text; txts / scores run parallel to it.
    for box, raw_text, score in zip(result.boxes, result.txts, result.scores):
        if not raw_text or not raw_text.strip():
            continue

        # Collapse the 4-corner polygon into an axis-aligned bounding box.
        xs = [point[0] for point in box]
        ys = [point[1] for point in box]
        left = int(min(xs))
        top = int(min(ys))

        words.append({
            'text': raw_text.strip(),
            'left': left + x0,  # absolute page coordinates
            'top': top + y0,
            'width': int(max(xs) - left),
            'height': int(max(ys) - top),
            'conf': int(score * 100),  # 0-100 like Tesseract
            'region_type': region.type,
        })
    return words
# =============================================================================
# Post-Processing: Deterministic Quality Fixes
# =============================================================================
# --- A. Character Confusion Fix (I/1/l) ---
# Common OCR confusion pairs in vocabulary context.
# Each rule is a (compiled pattern, replacement) pair; _fix_character_confusion
# applies them in list order to the english, german and example fields.
_CHAR_CONFUSION_RULES = [
    # "1" at a word start directly followed by a lowercase letter → "I" + letter.
    # NOTE(review): this also rewrites ordinals such as "1st" → "Ist" — confirm intended.
    (re.compile(r'\b1([a-z])'), r'I\1'),  # 1ch → Ich
    # Standalone "1" with no digit on either side → "I" (English pronoun)
    (re.compile(r'(?<!\d)\b1\b(?!\d)'), 'I'),  # "1 want" → "I want"
    # A single "|" (not adjacent to another "|") → "I"
    (re.compile(r'(?<!\|)\|(?!\|)'), 'I'),  # |ch → Ich
]
# Cross-language indicators: if DE has these, EN "1" is almost certainly "I"
_DE_INDICATORS_FOR_EN_I = {'ich', 'mich', 'mir', 'mein', 'meine', 'meiner', 'meinem'}
def _fix_character_confusion(entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Repair common OCR character confusions in vocabulary entries.

    Deterministic steps, applied to every entry in place:
    - run each rule in _CHAR_CONFUSION_RULES over english, german and example
    - if the German text contains a word from _DE_INDICATORS_FOR_EN_I
      (ich/mich/mir/...), turn any remaining standalone "1" in the English
      text into "I"
    - drop a standalone "y" followed by whitespace in front of a lowercase
      letter in english and example (e.g. "y you" → "you")

    The three fields are always written back stripped, so missing keys are
    created as empty strings.

    Returns:
        The same list object that was passed in.
    """
    stray_y = re.compile(r'\by\s+([a-z])')
    lone_one = re.compile(r'\b1\b')

    for entry in entries:
        english, german, example = (
            entry.get(key, '') or '' for key in ('english', 'german', 'example')
        )

        # General rules run on all three fields.
        for pattern, replacement in _CHAR_CONFUSION_RULES:
            english = pattern.sub(replacement, english)
            german = pattern.sub(replacement, german)
            example = pattern.sub(replacement, example)

        # Cross-reference: first-person German words imply EN "1" is "I".
        german_tokens = set(german.lower().replace(',', ' ').split())
        if not german_tokens.isdisjoint(_DE_INDICATORS_FOR_EN_I):
            english = lone_one.sub('I', english)

        # Remove the "y " artifact in the English-language fields.
        english = stray_y.sub(r'\1', english)
        example = stray_y.sub(r'\1', example)

        entry['english'] = english.strip()
        entry['german'] = german.strip()
        entry['example'] = example.strip()

    return entries
# --- B. Comma-Separated Word Form Splitting ---
def _split_comma_entries(entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Expand entries that list several comma-separated word forms.

    E.g. EN: "break, broke, broken" / DE: "brechen, brach, gebrochen"
    → 3 entries: break/brechen, broke/brach, broken/gebrochen

    An entry is split only when EN and DE have the same number (> 1) of
    comma parts and every part has at most three words. Each produced
    sub-entry is a shallow copy with an empty 'example' and
    ``split_from_comma=True``. All returned entries get a fresh
    ``row_index``; unsplit entries are the original dict objects.
    """
    def is_short_form(part: str) -> bool:
        # Word forms are short; longer parts are more likely sentences.
        return len(part.split()) <= 3

    result: List[Dict[str, Any]] = []
    for entry in entries:
        en_parts = _split_by_comma((entry.get('english', '') or '').strip())
        de_parts = _split_by_comma((entry.get('german', '') or '').strip())

        splittable = (
            len(en_parts) > 1
            and len(de_parts) > 1
            and len(en_parts) == len(de_parts)
            and all(is_short_form(part) for part in en_parts)
            and all(is_short_form(part) for part in de_parts)
        )
        if not splittable:
            result.append(entry)
            continue

        # Pair the forms position by position.
        for en_form, de_form in zip(en_parts, de_parts):
            result.append({
                **entry,
                'english': en_form.strip(),
                'german': de_form.strip(),
                'example': '',  # examples get attached later
                'split_from_comma': True,
            })

    # Re-number
    for position, item in enumerate(result):
        item['row_index'] = position
    return result
def _split_by_comma(text: str) -> List[str]:
"""Split text by commas, but not inside brackets [...] or parens (...)."""
if ',' not in text:
return [text]
parts = []
depth_bracket = 0
depth_paren = 0
current = []
for ch in text:
if ch == '[':
depth_bracket += 1
elif ch == ']':
depth_bracket = max(0, depth_bracket - 1)
elif ch == '(':
depth_paren += 1
elif ch == ')':
depth_paren = max(0, depth_paren - 1)
elif ch == ',' and depth_bracket == 0 and depth_paren == 0:
parts.append(''.join(current).strip())
current = []
continue
current.append(ch)
if current:
parts.append(''.join(current).strip())
# Filter empty parts
return [p for p in parts if p]
# --- C. Example Sentence Attachment ---
def _find_best_vocab_match(example_text: str, vocab_entries: List[Dict[str, Any]]) -> int:
"""Find the vocab entry whose English word(s) best match the example sentence.
Returns index into vocab_entries, or -1 if no match found.
Uses word stem overlap: "a broken arm" matches "broken" or "break".
"""
if not vocab_entries or not example_text:
return -1
example_lower = example_text.lower()
example_words = set(re.findall(r'[a-zäöüß]+', example_lower))
best_idx = -1
best_score = 0
for i, entry in enumerate(vocab_entries):
en = (entry.get('english', '') or '').lower()
if not en:
continue
# Extract vocab words (split on space, comma, newline)
vocab_words = set(re.findall(r'[a-zäöüß]+', en))
# Score: how many vocab words appear in the example?
# Also check if example words share a common stem (first 4 chars)
direct_matches = vocab_words & example_words
score = len(direct_matches) * 10
# Stem matching: "broken" matches "break" via shared prefix "bro"/"bre"
if score == 0:
for vw in vocab_words:
if len(vw) < 3:
continue
stem = vw[:4] if len(vw) >= 4 else vw[:3]
for ew in example_words:
if len(ew) >= len(stem) and ew[:len(stem)] == stem:
score += 5
break
if score > best_score:
best_score = score
best_idx = i
return best_idx if best_score > 0 else -1
def _attach_example_sentences(entries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Attach rows with EN text but no DE translation as examples to matching vocab entries.
Vocabulary worksheets often have:
Row 1: break, broke, broken / brechen, brach, gebrochen
Row 2: a broken arm (no DE → example for "broken")
Row 3: a broken plate (no DE → example for "broken")
Row 4: egg / Ei (has DE → new vocab entry)
Rules (deterministic, generic):
- A row is an "example row" if it has EN text but NO DE text (or very short DE ≤2 chars)
- Find the best matching vocab entry by checking which entry's English words
appear in the example sentence (semantic matching via word overlap)
- Fall back to the nearest preceding entry if no word match found
- Multiple examples get joined with " | "
"""
if not entries:
return entries
# Separate into vocab entries (have DE) and example candidates (no DE)
vocab_entries: List[Dict[str, Any]] = []
examples_for: Dict[int, List[str]] = {} # vocab_index → list of example texts
for entry in entries:
en = (entry.get('english', '') or '').strip()
de = (entry.get('german', '') or '').strip()
ex = (entry.get('example', '') or '').strip()
# Treat very short DE (≤2 chars) as OCR noise, not real translation
has_de = len(de) > 2
has_en = bool(en)
if has_en and not has_de and vocab_entries:
# This is an example sentence — find best matching vocab entry
example_text = en
if ex:
example_text = f"{en}{ex}"
match_idx = _find_best_vocab_match(en, vocab_entries)
if match_idx < 0:
# No word match → fall back to last entry
match_idx = len(vocab_entries) - 1
if match_idx not in examples_for:
examples_for[match_idx] = []
examples_for[match_idx].append(example_text)
else:
vocab_entries.append(entry)
# Attach examples to their matched vocab entries
for idx, example_list in examples_for.items():
if 0 <= idx < len(vocab_entries):
entry = vocab_entries[idx]
existing_ex = (entry.get('example', '') or '').strip()
new_examples = ' | '.join(example_list)
entry['example'] = f"{existing_ex} | {new_examples}" if existing_ex else new_examples
# Re-number
for i, e in enumerate(vocab_entries):
e['row_index'] = i
return vocab_entries
# --- D. Phonetic Bracket IPA Replacement ---
# Pattern: word [phonetic] — group 1 is the word directly before the square
# brackets, group 2 is the bracket content. Only square brackets are matched;
# a parenthesised form like "word (phonetic)" is not.
_PHONETIC_BRACKET_RE = re.compile(
    r'(\b[a-zA-ZäöüÄÖÜß]+)\s*\[([^\]]*)\]'
)
def _lookup_ipa(word: str, pronunciation: str = 'british') -> Optional[str]:
    """Look up the IPA transcription of a word.

    Two sources may be available: the ``_britfone_dict`` mapping and the
    ``_ipa_convert_american`` callable. The American source is consulted
    first only for ``pronunciation='american'``; in every other case Britfone
    comes first. The second source acts as a fallback. A converter result
    containing '*' is treated as "not found".

    Args:
        word: English word to look up.
        pronunciation: 'british' (Britfone, MIT) or 'american' (eng_to_ipa, MIT).

    Returns:
        IPA string or None if not found.
    """
    word_lower = word.lower().strip()
    if not word_lower:
        return None

    def from_britfone() -> Optional[str]:
        if not _britfone_dict:
            return None
        return _britfone_dict.get(word_lower) or None

    def from_american() -> Optional[str]:
        if not _ipa_convert_american:
            return None
        converted = _ipa_convert_american(word_lower)
        if converted and '*' not in converted:
            return converted
        return None

    if pronunciation == 'american':
        sources = (from_american, from_britfone)
    else:
        sources = (from_britfone, from_american)

    for lookup in sources:
        ipa = lookup()
        if ipa:
            return ipa
    return None
def _fix_phonetic_brackets(
    entries: List[Dict[str, Any]],
    pronunciation: str = 'british',
) -> List[Dict[str, Any]]:
    """Swap OCR'd phonetic transcriptions for dictionary IPA, in place.

    For each entry, the english, german and example fields that contain a
    '[' are passed through _replace_phonetics_in_text(). Nothing happens
    when IPA_AVAILABLE is false.

    Args:
        entries: Vocabulary entry dicts (modified in place).
        pronunciation: Passed through to the IPA lookup ('british' or 'american').

    Returns:
        The same list object that was passed in.
    """
    if not IPA_AVAILABLE:
        return entries

    for entry in entries:
        for key in ('english', 'german', 'example'):
            text = entry.get(key, '') or ''
            if '[' in text:
                entry[key] = _replace_phonetics_in_text(text, pronunciation)

    return entries
def _replace_phonetics_in_text(text: str, pronunciation: str = 'british') -> str:
    """Replace "[...]" after a word with that word's dictionary IPA.

    A match is left untouched when the bracket content has more than three
    whitespace-separated tokens (it then looks like ordinary text) or when
    _lookup_ipa() finds nothing for the word. Returns ``text`` unchanged when
    IPA_AVAILABLE is false.
    """
    if not IPA_AVAILABLE:
        return text

    def swap_bracket(match):
        untouched = match.group(0)
        word, ocr_phonetic = match.group(1), match.group(2)

        # More than three tokens: ordinary text rather than a transcription.
        if len(ocr_phonetic.split()) > 3:
            return untouched

        ipa = _lookup_ipa(word, pronunciation)
        return f"{word} [{ipa}]" if ipa else untouched

    return _PHONETIC_BRACKET_RE.sub(swap_bracket, text)
def build_word_grid(
    ocr_img: np.ndarray,
    column_regions: List[PageRegion],
    row_geometries: List[RowGeometry],
    img_w: int,
    img_h: int,
    lang: str = "eng+deu",
    ocr_engine: str = "auto",
    img_bgr: Optional[np.ndarray] = None,
    pronunciation: str = "british",
) -> List[Dict[str, Any]]:
    """Build a word grid by intersecting columns and rows, then OCR each cell.

    Every content row is intersected with every vocabulary column
    (column_en / column_de / column_example). Each resulting cell is padded,
    clamped to the image, OCR'd, and its text stored on the row's entry.
    Afterwards the deterministic post-processing helpers are applied.

    Args:
        ocr_img: Binarized full-page image (for Tesseract).
        column_regions: Classified columns from Step 3 (PageRegion list).
        row_geometries: Rows from Step 4 (RowGeometry list).
        img_w: Image width in pixels.
        img_h: Image height in pixels.
        lang: Default Tesseract language, used for column types that have no
            dedicated language mapping.
        ocr_engine: 'tesseract', 'rapid', or 'auto' (rapid if available and
            ``img_bgr`` is given, else tesseract). Any other value uses
            Tesseract.
        img_bgr: BGR color image (required for RapidOCR). If 'rapid' is
            requested without it, Tesseract is used instead.
        pronunciation: Pronunciation variant passed to the phonetic-bracket
            fix (e.g. 'british' or 'american').

    Returns:
        List of entry dicts with english/german/example text and bbox info
        (percent of image size). Rows in which no column produced text are
        omitted. The post-processing helpers may further change the list.
    """
    # Resolve engine choice. RapidOCR needs both the library and a BGR image;
    # without either we degrade to Tesseract instead of failing inside the
    # OCR call.
    use_rapid = False
    if ocr_engine == "auto":
        use_rapid = RAPIDOCR_AVAILABLE and img_bgr is not None
    elif ocr_engine == "rapid":
        if not RAPIDOCR_AVAILABLE:
            logger.warning("RapidOCR requested but not available, falling back to Tesseract")
        elif img_bgr is None:
            logger.warning("RapidOCR requested but no BGR image provided, falling back to Tesseract")
        else:
            use_rapid = True

    engine_name = "rapid" if use_rapid else "tesseract"
    logger.info(f"build_word_grid: using OCR engine '{engine_name}'")

    # Filter to content rows only (skip header/footer)
    content_rows = [r for r in row_geometries if r.row_type == 'content']
    if not content_rows:
        logger.warning("build_word_grid: no content rows found")
        return []

    # Only the vocabulary columns take part in the grid
    VOCAB_COLUMN_TYPES = {'column_en', 'column_de', 'column_example'}
    relevant_cols = [c for c in column_regions if c.type in VOCAB_COLUMN_TYPES]
    if not relevant_cols:
        logger.warning("build_word_grid: no relevant vocabulary columns found")
        return []

    # Sort columns left-to-right
    relevant_cols.sort(key=lambda c: c.x)

    # Choose OCR language per column type (Tesseract only)
    lang_map = {
        'column_en': 'eng',
        'column_de': 'deu',
        'column_example': 'eng+deu',
    }

    # Padding (pixels) added around each cell to avoid clipping edge words
    pad = 8

    entries: List[Dict[str, Any]] = []

    for row_idx, row in enumerate(content_rows):
        entry: Dict[str, Any] = {
            'row_index': row_idx,
            'english': '',
            'german': '',
            'example': '',
            'confidence': 0.0,
            'bbox': {
                'x': round(row.x / img_w * 100, 2),
                'y': round(row.y / img_h * 100, 2),
                'w': round(row.width / img_w * 100, 2),
                'h': round(row.height / img_h * 100, 2),
            },
            'bbox_en': None,
            'bbox_de': None,
            'bbox_ex': None,
            'ocr_engine': engine_name,
        }

        confidences: List[float] = []

        for col in relevant_cols:
            # Cell region: column x/width, row y/height, padded on all sides.
            # Both edges are clamped independently so that a cell touching the
            # top/left border does not grow past its intended padding on the
            # opposite side.
            cell_x = max(0, col.x - pad)
            cell_y = max(0, row.y - pad)
            cell_right = min(img_w, col.x + col.width + pad)
            cell_bottom = min(img_h, row.y + row.height + pad)
            cell_w = cell_right - cell_x
            cell_h = cell_bottom - cell_y

            if cell_w <= 0 or cell_h <= 0:
                continue

            cell_region = PageRegion(
                type=col.type,
                x=cell_x, y=cell_y,
                width=cell_w, height=cell_h,
            )

            # OCR the cell
            if use_rapid:
                words = ocr_region_rapid(img_bgr, cell_region)
            else:
                cell_lang = lang_map.get(col.type, lang)
                words = ocr_region(ocr_img, cell_region, lang=cell_lang, psm=6)

            # Group into lines, then join in reading order.
            # Half of the average word height (at least 10 px) is the
            # Y-tolerance; 15 px is used when the cell has no words.
            if words:
                avg_h = sum(w['height'] for w in words) / len(words)
                y_tol = max(10, int(avg_h * 0.5))
            else:
                y_tol = 15
            text = _words_to_reading_order_text(words, y_tolerance_px=y_tol)

            if words:
                avg_conf = sum(w['conf'] for w in words) / len(words)
                confidences.append(avg_conf)

            # Bbox in percent
            cell_bbox = {
                'x': round(cell_x / img_w * 100, 2),
                'y': round(cell_y / img_h * 100, 2),
                'w': round(cell_w / img_w * 100, 2),
                'h': round(cell_h / img_h * 100, 2),
            }

            if col.type == 'column_en':
                entry['english'] = text
                entry['bbox_en'] = cell_bbox
            elif col.type == 'column_de':
                entry['german'] = text
                entry['bbox_de'] = cell_bbox
            elif col.type == 'column_example':
                entry['example'] = text
                entry['bbox_ex'] = cell_bbox

        # Row confidence is the mean of the per-cell averages
        entry['confidence'] = round(
            sum(confidences) / len(confidences), 1
        ) if confidences else 0.0

        # Only include if at least one field has text
        if entry['english'] or entry['german'] or entry['example']:
            entries.append(entry)

    # --- Post-processing pipeline (deterministic, no LLM) ---
    n_raw = len(entries)

    # 1. Fix character confusion (I/1/l based on context)
    entries = _fix_character_confusion(entries)

    # 2. Replace OCR'd phonetics with dictionary IPA
    entries = _fix_phonetic_brackets(entries, pronunciation=pronunciation)

    # 3. Split comma-separated word forms (break, broke, broken → 3 entries)
    entries = _split_comma_entries(entries)

    # 4. Attach example sentences (rows without DE → examples for preceding entry)
    entries = _attach_example_sentences(entries)

    logger.info(f"build_word_grid: {len(entries)} entries from "
                f"{n_raw} raw → {len(entries)} after post-processing "
                f"({len(content_rows)} content rows × {len(relevant_cols)} columns, "
                f"engine={engine_name})")

    return entries
# =============================================================================
# Stage 6: Multi-Pass OCR
# =============================================================================
def ocr_region(ocr_img: np.ndarray, region: PageRegion, lang: str,
               psm: int, fallback_psm: Optional[int] = None,
               min_confidence: float = 40.0) -> List[Dict[str, Any]]:
    """Run Tesseract OCR on a specific region with given PSM.

    Args:
        ocr_img: Binarized full-page image.
        region: Region to crop and OCR.
        lang: Tesseract language string.
        psm: Page Segmentation Mode.
        fallback_psm: If confidence too low, retry with this PSM per line.
        min_confidence: Minimum average confidence before fallback.

    Returns:
        List of word dicts with keys 'text', 'left', 'top', 'width',
        'height', 'conf' and 'region_type'. 'left'/'top' are offset by the
        region origin (absolute page coordinates). Words with empty text or
        confidence below 10 are dropped. An empty list is returned for an
        empty crop or when Tesseract raises.
    """
    # Crop region
    crop = ocr_img[region.y:region.y + region.height,
                   region.x:region.x + region.width]
    if crop.size == 0:
        return []

    # Convert to PIL for pytesseract
    pil_img = Image.fromarray(crop)

    # Run Tesseract with specified PSM
    config = f'--psm {psm} --oem 3'
    try:
        data = pytesseract.image_to_data(pil_img, lang=lang, config=config,
                                         output_type=pytesseract.Output.DICT)
    except Exception as e:
        logger.warning(f"Tesseract failed for region {region.type}: {e}")
        return []

    words = []
    columns = zip(data['text'], data['conf'], data['left'],
                  data['top'], data['width'], data['height'])
    for raw_text, raw_conf, left, top, width, height in columns:
        text = str(raw_text).strip()
        # Confidence may arrive as an int, a float, or a string such as
        # '96.06' depending on the Tesseract/pytesseract versions; going
        # through float() avoids a ValueError on the float-formatted forms.
        conf = int(float(raw_conf))
        if not text or conf < 10:
            continue
        words.append({
            'text': text,
            'left': left + region.x,  # Absolute coords
            'top': top + region.y,
            'width': width,
            'height': height,
            'conf': conf,
            'region_type': region.type,
        })

    # Check average confidence; on a low score the line-by-line result
    # replaces the first-pass words entirely.
    if words and fallback_psm is not None:
        avg_conf = sum(w['conf'] for w in words) / len(words)
        if avg_conf < min_confidence:
            logger.info(f"Region {region.type}: avg confidence {avg_conf:.0f}% < {min_confidence}%, "
                        f"trying fallback PSM {fallback_psm}")
            words = _ocr_region_line_by_line(ocr_img, region, lang, fallback_psm)

    return words
def _ocr_region_line_by_line(ocr_img: np.ndarray, region: PageRegion,
                             lang: str, psm: int) -> List[Dict[str, Any]]:
    """OCR a region line by line (fallback for low-confidence regions).

    Splits the region into horizontal strips based on text density,
    then OCRs each strip individually with the given PSM.

    Args:
        ocr_img: Full-page image the region is cropped from.
        region: Region to crop and OCR.
        lang: Tesseract language string.
        psm: Page Segmentation Mode used for every strip.

    Returns:
        List of word dicts ('text', 'left', 'top', 'width', 'height',
        'conf', 'region_type') with 'left'/'top' offset back to page
        coordinates. Strips on which Tesseract fails are skipped with a
        logged warning.
    """
    crop = ocr_img[region.y:region.y + region.height,
                   region.x:region.x + region.width]
    if crop.size == 0:
        return []

    # Find text lines via horizontal projection of the inverted crop
    inv = cv2.bitwise_not(crop)
    h_proj = np.sum(inv, axis=1)
    peak = np.max(h_proj)
    # Rows above 5% of the densest row count as text
    threshold = peak * 0.05 if peak > 0 else 0

    # Find line boundaries as (start, end) row indices within the crop
    lines = []
    in_text = False
    line_start = 0
    for y in range(len(h_proj)):
        if h_proj[y] > threshold and not in_text:
            line_start = y
            in_text = True
        elif h_proj[y] <= threshold and in_text:
            if y - line_start > 5:  # Minimum line height
                lines.append((line_start, y))
            in_text = False
    # A line still open at the bottom edge of the crop
    if in_text and len(h_proj) - line_start > 5:
        lines.append((line_start, len(h_proj)))

    all_words = []
    config = f'--psm {psm} --oem 3'
    pad = 3  # Small vertical padding around each strip

    for line_y_start, line_y_end in lines:
        y1 = max(0, line_y_start - pad)
        y2 = min(crop.shape[0], line_y_end + pad)
        line_crop = crop[y1:y2, :]

        if line_crop.size == 0:
            continue

        pil_img = Image.fromarray(line_crop)
        try:
            data = pytesseract.image_to_data(pil_img, lang=lang, config=config,
                                             output_type=pytesseract.Output.DICT)
        except Exception as e:
            # Best effort: report the failed strip and carry on with the rest
            logger.warning(f"Tesseract failed for line strip in region {region.type}: {e}")
            continue

        columns = zip(data['text'], data['conf'], data['left'],
                      data['top'], data['width'], data['height'])
        for raw_text, raw_conf, left, top, width, height in columns:
            text = str(raw_text).strip()
            # Confidence may be an int, a float, or a float-formatted string
            # depending on the Tesseract/pytesseract versions.
            conf = int(float(raw_conf))
            if not text or conf < 10:
                continue
            all_words.append({
                'text': text,
                'left': left + region.x,
                'top': top + region.y + y1,  # strip offset + region offset
                'width': width,
                'height': height,
                'conf': conf,
                'region_type': region.type,
            })

    return all_words
def run_multi_pass_ocr(ocr_img: np.ndarray,
                       regions: List[PageRegion],
                       lang: str = "eng+deu") -> Dict[str, List[Dict]]:
    """OCR every non header/footer region with type-specific settings.

    Args:
        ocr_img: Binarized full-page image.
        regions: Detected page regions.
        lang: Language used for region types without a dedicated language.

    Returns:
        Dict mapping region type to the list of word dicts from ``ocr_region``.
    """
    # Per-type keyword arguments for ocr_region; anything not listed uses
    # the default language with PSM 6.
    settings_by_type = {
        'column_en': {'lang': 'eng', 'psm': 4},
        'column_de': {'lang': 'deu', 'psm': 4},
        'column_example': {'lang': lang, 'psm': 6,
                           'fallback_psm': 7, 'min_confidence': 40.0},
    }
    default_settings = {'lang': lang, 'psm': 6}

    results: Dict[str, List[Dict]] = {}
    for region in regions:
        if region.type in ('header', 'footer'):
            continue  # Skip non-content regions

        ocr_kwargs = settings_by_type.get(region.type, default_settings)
        words = ocr_region(ocr_img, region, **ocr_kwargs)

        # NOTE(review): keyed by type, so a later region of the same type
        # replaces the words of an earlier one — confirm types are unique.
        results[region.type] = words
        logger.info(f"OCR {region.type}: {len(words)} words")

    return results
# =============================================================================
# Stage 7: Line Alignment → Vocabulary Entries
# =============================================================================
def _group_words_into_lines(words: List[Dict], y_tolerance_px: int = 20) -> List[List[Dict]]:
"""Group words by Y position into lines, sorted by X within each line."""
if not words:
return []
sorted_words = sorted(words, key=lambda w: (w['top'], w['left']))
lines: List[List[Dict]] = []
current_line: List[Dict] = [sorted_words[0]]
current_y = sorted_words[0]['top']
for word in sorted_words[1:]:
if abs(word['top'] - current_y) <= y_tolerance_px:
current_line.append(word)
else:
current_line.sort(key=lambda w: w['left'])
lines.append(current_line)
current_line = [word]
current_y = word['top']
if current_line:
current_line.sort(key=lambda w: w['left'])
lines.append(current_line)
return lines
def match_lines_to_vocab(ocr_results: Dict[str, List[Dict]],
                         regions: List[PageRegion],
                         y_tolerance_px: int = 25) -> List[VocabRow]:
    """Pair up per-column OCR lines into vocabulary rows by Y position.

    The English column drives the result: for every English line the
    vertically closest German line and example line (each within
    ``y_tolerance_px``) are attached. Example lines that were not paired
    this way are appended to the nearest row above them.

    Args:
        ocr_results: Dict mapping region type to word lists.
        regions: Detected regions (not read by this function).
        y_tolerance_px: Max Y-distance to consider lines part of one row.

    Returns:
        VocabRow objects sorted by ``y_position``; an empty list when the
        results contain neither 'column_en' nor 'column_de'.
    """
    has_vocab_column = 'column_en' in ocr_results or 'column_de' in ocr_results
    if not has_vocab_column:
        logger.info("match_lines_to_vocab: no column_en/column_de in OCR results, returning empty")
        return []

    def _lines_for(region_type: str) -> List[List[Dict]]:
        # Words of one column, grouped into lines
        return _group_words_into_lines(ocr_results.get(region_type, []), y_tolerance_px)

    def _center_y(line: List[Dict]) -> float:
        # Mean vertical centre of the words in a line
        return sum(w['top'] + w['height'] / 2 for w in line) / len(line)

    def _joined(line: List[Dict]) -> str:
        return ' '.join(w['text'] for w in line)

    def _mean_conf(line: List[Dict]) -> float:
        return sum(w['conf'] for w in line) / len(line) if line else 0

    def _closest(candidates: List[List[Dict]], target_y: float):
        # Text and confidence of the candidate line nearest to target_y
        # within the tolerance; ("", 0.0) when no candidate qualifies.
        winner = None
        winner_gap = float('inf')
        for candidate in candidates:
            gap = abs(_center_y(candidate) - target_y)
            if gap < y_tolerance_px and gap < winner_gap:
                winner, winner_gap = candidate, gap
        if winner is None:
            return "", 0.0
        return _joined(winner), _mean_conf(winner)

    english_lines = _lines_for('column_en')
    german_lines = _lines_for('column_de')
    example_lines = _lines_for('column_example')

    # English lines are the primary reference
    rows: List[VocabRow] = []
    for en_line in english_lines:
        en_y = _center_y(en_line)
        en_text = _joined(en_line)
        en_conf = _mean_conf(en_line)

        # Skip very short or likely header content
        if len(en_text.strip()) < 2:
            continue

        de_text, de_conf = _closest(german_lines, en_y)
        ex_text, ex_conf = _closest(example_lines, en_y)

        # Average over the columns that actually contributed a confidence
        scores = [en_conf]
        scores.extend(c for c in (de_conf, ex_conf) if c > 0)

        rows.append(VocabRow(
            english=en_text.strip(),
            german=de_text.strip(),
            example=ex_text.strip(),
            confidence=sum(scores) / len(scores),
            y_position=int(en_y),
        ))

    # Multi-line wrapping in the example column: an example line near no
    # row that already has an example is appended to the closest row above.
    anchored_ys = {r.y_position for r in rows if r.example}

    for ex_line in example_lines:
        ex_y = _center_y(ex_line)
        if any(abs(ex_y - y) < y_tolerance_px for y in anchored_ys):
            continue  # already matched

        owner = None
        smallest_gap = float('inf')
        for candidate_row in rows:
            gap = ex_y - candidate_row.y_position
            if 0 < gap < y_tolerance_px * 3 and gap < smallest_gap:
                owner, smallest_gap = candidate_row, gap

        if not owner:
            continue
        extra = _joined(ex_line).strip()
        if extra:
            owner.example = (owner.example + " " + extra).strip()

    # Sort by Y position
    rows.sort(key=lambda r: r.y_position)
    return rows
# =============================================================================
# Stage 8: Optional LLM Post-Correction
# =============================================================================
async def llm_post_correct(img: np.ndarray, vocab_rows: List[VocabRow],
                           confidence_threshold: float = 50.0,
                           enabled: bool = False) -> List[VocabRow]:
    """Placeholder for LLM (Qwen-VL) correction of low-confidence rows.

    Disabled by default. The correction itself is not implemented yet, so
    the rows are handed back unchanged in every case.

    Args:
        img: Original BGR image.
        vocab_rows: Current vocabulary rows.
        confidence_threshold: Rows below this are meant to get LLM
            correction (currently unused).
        enabled: Whether to actually run LLM correction.

    Returns:
        The vocabulary rows that were passed in.
    """
    if enabled:
        # TODO: Implement Qwen-VL correction for low-confidence entries
        # For each row with confidence < threshold:
        #   1. Crop the relevant region from img
        #   2. Send crop + OCR text to Qwen-VL
        #   3. Replace text if LLM provides a confident correction
        logger.info("LLM post-correction skipped (not yet implemented)")
    return vocab_rows
# =============================================================================
# Orchestrator
# =============================================================================
async def run_cv_pipeline(
    pdf_data: Optional[bytes] = None,
    image_data: Optional[bytes] = None,
    page_number: int = 0,
    zoom: float = 3.0,
    enable_dewarp: bool = True,
    enable_llm_correction: bool = False,
    lang: str = "eng+deu",
) -> PipelineResult:
    """Run the complete CV document reconstruction pipeline.

    Stages: render, deskew, optional dewarp, dual image preparation, layout
    analysis, multi-pass OCR, line alignment, optional LLM correction. The
    wall-clock time of each executed stage is stored in ``result.stages``.

    Args:
        pdf_data: Raw PDF bytes (mutually exclusive with image_data; takes
            precedence when both are given).
        image_data: Raw image bytes (mutually exclusive with pdf_data).
        page_number: 0-indexed page number (for PDF).
        zoom: PDF rendering zoom factor.
        enable_dewarp: Whether to run dewarp stage.
        enable_llm_correction: Whether to run LLM post-correction.
        lang: Tesseract language string.

    Returns:
        PipelineResult with vocabulary and timing info. Errors are not
        raised: they are reported through ``result.error``.
    """
    if not CV_PIPELINE_AVAILABLE:
        return PipelineResult(error="CV pipeline not available (OpenCV or Tesseract missing)")

    result = PipelineResult()
    total_start = time.time()

    try:
        # Stage 1: Render
        t = time.time()
        if pdf_data:
            img = render_pdf_high_res(pdf_data, page_number, zoom)
        elif image_data:
            img = render_image_high_res(image_data)
        else:
            return PipelineResult(error="No input data (pdf_data or image_data required)")
        result.stages['render'] = round(time.time() - t, 2)
        result.image_width = img.shape[1]
        result.image_height = img.shape[0]
        logger.info(f"Stage 1 (render): {img.shape[1]}x{img.shape[0]} in {result.stages['render']}s")

        # Stage 2: Deskew
        t = time.time()
        img, angle = deskew_image(img)
        result.stages['deskew'] = round(time.time() - t, 2)
        logger.info(f"Stage 2 (deskew): {angle:.2f}° in {result.stages['deskew']}s")

        # Stage 3: Dewarp
        if enable_dewarp:
            t = time.time()
            img = dewarp_image(img)
            result.stages['dewarp'] = round(time.time() - t, 2)

        # Stage 4: Dual image preparation
        t = time.time()
        ocr_img = create_ocr_image(img)
        layout_img = create_layout_image(img)
        result.stages['image_prep'] = round(time.time() - t, 2)

        # Stage 5: Layout analysis
        t = time.time()
        regions = analyze_layout(layout_img, ocr_img)
        result.stages['layout'] = round(time.time() - t, 2)
        result.columns_detected = len([r for r in regions if r.type.startswith('column')])
        logger.info(f"Stage 5 (layout): {result.columns_detected} columns in {result.stages['layout']}s")

        # Stage 6: Multi-pass OCR
        t = time.time()
        ocr_results = run_multi_pass_ocr(ocr_img, regions, lang)
        result.stages['ocr'] = round(time.time() - t, 2)
        total_words = sum(len(w) for w in ocr_results.values())
        result.word_count = total_words
        logger.info(f"Stage 6 (OCR): {total_words} words in {result.stages['ocr']}s")

        # Stage 7: Line alignment
        t = time.time()
        vocab_rows = match_lines_to_vocab(ocr_results, regions)
        result.stages['alignment'] = round(time.time() - t, 2)

        # Stage 8: Optional LLM correction
        if enable_llm_correction:
            t = time.time()
            # llm_post_correct defaults to enabled=False, so the flag must be
            # forwarded explicitly or the stage silently does nothing.
            vocab_rows = await llm_post_correct(img, vocab_rows, enabled=True)
            result.stages['llm_correction'] = round(time.time() - t, 2)

        # Convert to output format
        result.vocabulary = [
            {
                "english": row.english,
                "german": row.german,
                "example": row.example,
                "confidence": round(row.confidence, 1),
            }
            for row in vocab_rows
            if row.english or row.german  # Skip empty rows
        ]

        result.duration_seconds = round(time.time() - total_start, 2)
        logger.info(f"CV Pipeline complete: {len(result.vocabulary)} entries in {result.duration_seconds}s")

    except Exception as e:
        # Top-level boundary: report the failure on the result instead of
        # raising; the full traceback is only emitted at debug level.
        logger.error(f"CV Pipeline error: {e}")
        import traceback
        logger.debug(traceback.format_exc())
        result.error = str(e)
        result.duration_seconds = round(time.time() - total_start, 2)

    return result