Build a word-coverage mask so only pixels near Tesseract word bounding boxes contribute to the horizontal projection. Image regions (high ink but no words) are treated as white, preventing illustrations from merging multiple vocabulary rows into one. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2648 lines
96 KiB
Python
2648 lines
96 KiB
Python
"""
|
|
CV-based Document Reconstruction Pipeline for Vocabulary Extraction.

Uses classical Computer Vision techniques for high-quality OCR:
- High-resolution PDF rendering (432 DPI)
- Deskew (rotation correction via Hough lines or word alignment)
- Dewarp (book curvature correction; currently implemented as a vertical shear correction)
- Dual image preparation (binarized for OCR, CLAHE for layout)
- Projection-profile layout analysis (column/row detection)
- Multi-pass Tesseract OCR with region-specific PSM settings
- Y-coordinate line alignment for vocabulary matching
- Optional LLM post-correction for low-confidence regions

License: Apache 2.0 (commercial use permitted)
PRIVACY: All processing is performed locally.
|
|
"""
|
|
|
|
import io
|
|
import logging
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
|
|
import numpy as np
|
|
|
|
logger = logging.getLogger(__name__)

# --- Availability Guards ---
# OpenCV and Tesseract (with Pillow) are optional at import time. When an
# import fails, the module names are bound to None so later references do not
# raise NameError, and the matching *_AVAILABLE flag records the outcome.

try:
    import cv2
except ImportError:
    cv2 = None
    CV2_AVAILABLE = False
    logger.warning("OpenCV not available — CV pipeline disabled")
else:
    CV2_AVAILABLE = True

try:
    import pytesseract
    from PIL import Image
except ImportError:
    pytesseract = None
    Image = None
    TESSERACT_AVAILABLE = False
    logger.warning("pytesseract/Pillow not available — CV pipeline disabled")
else:
    TESSERACT_AVAILABLE = True

# The full pipeline needs both backends.
CV_PIPELINE_AVAILABLE = CV2_AVAILABLE and TESSERACT_AVAILABLE
|
|
|
|
|
|
# --- Language Detection Constants ---
|
|
|
|
# Common German function words (umlauts transliterated, e.g. 'fuer', 'ueber').
GERMAN_FUNCTION_WORDS = set(
    """
    der die das und ist ein eine nicht
    von zu mit auf fuer den dem sich auch wird
    nach bei aus wie oder wenn noch aber hat nur
    ueber kann als ich er sie es wir ihr haben
    sein werden war sind muss soll dieser diese diesem
    """.split()
)

# Common English function words.
ENGLISH_FUNCTION_WORDS = set(
    """
    the a an is are was were to of
    and in that it for on with as at by from
    or but not be have has had do does did will
    would can could should may might this they you he
    she we my your his her its our their which
    """.split()
)
|
|
|
|
|
|
# --- Data Classes ---
|
|
|
|
@dataclass
class PageRegion:
    """Rectangular region detected on a page, tagged with a layout type."""

    # One of: 'column_en', 'column_de', 'column_example', 'page_ref',
    # 'column_marker', 'column_text', 'header', 'footer'.
    type: str
    # Rectangle of the region (x, y, width, height).
    x: int
    y: int
    width: int
    height: int
    # Confidence of the type classification, 0.0-1.0.
    classification_confidence: float = 1.0
    # How the type was decided: 'content', 'position_enhanced' or
    # 'position_fallback' (empty when not set).
    classification_method: str = ""
|
|
|
|
|
|
@dataclass
class ColumnGeometry:
    """Geometrically detected column, before any type classification."""

    # 0-based position, counted left to right.
    index: int
    # Rectangle of the column (x, y, width, height).
    x: int
    y: int
    width: int
    height: int
    # Number of words assigned to this column.
    word_count: int
    # Word dicts from Tesseract (text, conf, left, top, ...).
    words: List[Dict]
    # width / content_width (0.0-1.0).
    width_ratio: float
|
|
|
|
|
|
@dataclass
class RowGeometry:
    """Geometrically detected row, classified as content, header or footer."""

    # 0-based position, counted top to bottom.
    index: int
    # Absolute left edge (= content left_x).
    x: int
    # Absolute y start.
    y: int
    # Content width.
    width: int
    # Row height in px.
    height: int
    # Number of words in the row.
    word_count: int
    # Word dicts belonging to the row.
    words: List[Dict]
    # 'content' | 'header' | 'footer'.
    row_type: str = 'content'
    # Gap in px above this row.
    gap_before: int = 0
|
|
|
|
|
|
@dataclass
class VocabRow:
    """One vocabulary entry assembled from multi-column OCR output."""

    # Text of the English column.
    english: str = ""
    # Text of the German column.
    german: str = ""
    # Text of the example column.
    example: str = ""
    # Confidence value for the entry.
    confidence: float = 0.0
    # Vertical position of the entry.
    y_position: int = 0
|
|
|
|
|
|
@dataclass
class PipelineResult:
    """Complete result of one CV pipeline run."""

    # Extracted vocabulary entries as plain dicts.
    vocabulary: List[Dict[str, Any]] = field(default_factory=list)
    # Number of words.
    word_count: int = 0
    # Number of columns detected.
    columns_detected: int = 0
    # Total duration in seconds.
    duration_seconds: float = 0.0
    # Per-stage values keyed by stage name (presumably durations — confirm
    # against the code that fills it).
    stages: Dict[str, float] = field(default_factory=dict)
    # Error message, or None.
    error: Optional[str] = None
    # Dimensions of the processed image.
    image_width: int = 0
    image_height: int = 0
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 1: High-Resolution PDF Rendering
|
|
# =============================================================================
|
|
|
|
def render_pdf_high_res(pdf_data: bytes, page_number: int = 0, zoom: float = 3.0) -> np.ndarray:
    """Render a PDF page to a high-resolution numpy array (BGR).

    Args:
        pdf_data: Raw PDF bytes.
        page_number: 0-indexed page number.
        zoom: Zoom factor applied on both axes via ``fitz.Matrix``.
            NOTE(review): the previous documentation said "3.0 = 432 DPI";
            PyMuPDF's base resolution is 72 DPI, which would make 3.0 equal
            to 216 DPI — confirm which figure is intended.

    Returns:
        numpy array in BGR format.

    Raises:
        ValueError: If ``page_number`` is beyond the last page.
    """
    import fitz  # PyMuPDF

    pdf_doc = fitz.open(stream=pdf_data, filetype="pdf")
    try:
        if page_number >= pdf_doc.page_count:
            raise ValueError(f"Page {page_number} does not exist (PDF has {pdf_doc.page_count} pages)")

        page = pdf_doc[page_number]
        pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom))

        # View the pixmap samples as (height, width, channels).
        img_data = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.h, pix.w, pix.n)

        # cvtColor allocates a new array, so the result does not alias the
        # pixmap buffer once the document is closed.
        if pix.n == 4:  # RGBA
            img_bgr = cv2.cvtColor(img_data, cv2.COLOR_RGBA2BGR)
        elif pix.n == 3:  # RGB
            img_bgr = cv2.cvtColor(img_data, cv2.COLOR_RGB2BGR)
        else:  # Grayscale
            img_bgr = cv2.cvtColor(img_data, cv2.COLOR_GRAY2BGR)
    finally:
        # Close the document on every path, including the ValueError above
        # and any rendering/conversion failure.
        pdf_doc.close()

    return img_bgr
|
|
|
|
|
|
def render_image_high_res(image_data: bytes) -> np.ndarray:
    """Decode encoded image bytes (PNG/JPEG) into a BGR numpy array.

    Args:
        image_data: Raw image bytes.

    Returns:
        numpy array in BGR format.

    Raises:
        ValueError: If OpenCV cannot decode the bytes.
    """
    encoded = np.frombuffer(image_data, dtype=np.uint8)
    decoded = cv2.imdecode(encoded, cv2.IMREAD_COLOR)
    if decoded is not None:
        return decoded
    raise ValueError("Could not decode image data")
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 2: Deskew (Rotation Correction)
|
|
# =============================================================================
|
|
|
|
def deskew_image(img: np.ndarray) -> Tuple[np.ndarray, float]:
    """Straighten a slightly rotated page using Hough line segments.

    The median angle of all near-horizontal segments is taken as the page
    rotation, limited to ±5°, and the image is rotated by that angle.
    Angles below 0.1° are ignored.

    Args:
        img: BGR image.

    Returns:
        Tuple of (corrected image, detected angle in degrees). The input
        image and 0.0 are returned when nothing was corrected.
    """
    # Otsu-binarize with ink as foreground for line detection.
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    _, ink = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)

    segments = cv2.HoughLinesP(
        ink, 1, np.pi / 180, threshold=100,
        minLineLength=img.shape[1] // 4, maxLineGap=20,
    )
    if segments is None or len(segments) < 3:
        return img, 0.0

    # Angle of every segment; keep only those within 15° of horizontal.
    segment_angles = (
        np.degrees(np.arctan2(y2 - y1, x2 - x1))
        for x1, y1, x2, y2 in (seg[0] for seg in segments)
    )
    angles = [a for a in segment_angles if abs(a) < 15]
    if not angles:
        return img, 0.0

    median_angle = float(np.median(angles))

    # Limit the correction to ±5°.
    if abs(median_angle) > 5.0:
        median_angle = 5.0 * np.sign(median_angle)

    if abs(median_angle) < 0.1:
        return img, 0.0

    height, width = img.shape[:2]
    rotation = cv2.getRotationMatrix2D((width // 2, height // 2), median_angle, 1.0)
    corrected = cv2.warpAffine(
        img, rotation, (width, height),
        flags=cv2.INTER_LINEAR,
        borderMode=cv2.BORDER_REPLICATE,
    )

    logger.info(f"Deskew: corrected {median_angle:.2f}° rotation")
    return corrected, median_angle
|
|
|
|
|
|
def deskew_image_by_word_alignment(
    image_data: bytes,
    lang: str = "eng+deu",
    downscale_factor: float = 0.5,
) -> Tuple[bytes, float]:
    """Correct rotation by fitting a line through left-most word starts per text line.

    Runs a quick Tesseract pass on a downscaled copy to find word positions,
    takes the left-most word of every text line, keeps the points close to
    the median left edge, fits ``x = a*y + b`` through them and rotates the
    full-resolution image so that this edge becomes vertical.

    Args:
        image_data: Raw image bytes (PNG/JPEG).
        lang: Tesseract language string for the quick pass.
        downscale_factor: Shrink factor for the quick Tesseract pass
            (0.5 = 50%). Must be > 0.

    Returns:
        Tuple of (rotated image as PNG bytes, applied rotation in degrees).
        The angle uses OpenCV's convention (positive = counter-clockwise)
        and is limited to ±5°. The unchanged input bytes and 0.0 are
        returned whenever no correction is applied (missing backends,
        undecodable image, Tesseract failure, too few lines, tiny angle).
    """
    from collections import defaultdict

    if not CV2_AVAILABLE or not TESSERACT_AVAILABLE:
        return image_data, 0.0

    if downscale_factor <= 0:
        logger.warning(f"deskew_by_word_alignment: invalid downscale_factor {downscale_factor}")
        return image_data, 0.0

    # 1. Decode image
    img = cv2.imdecode(np.frombuffer(image_data, dtype=np.uint8), cv2.IMREAD_COLOR)
    if img is None:
        logger.warning("deskew_by_word_alignment: could not decode image")
        return image_data, 0.0

    orig_h, orig_w = img.shape[:2]

    # 2. Downscale for fast Tesseract pass (never below 1 px per side,
    #    cv2.resize rejects empty target sizes).
    small_w = max(1, int(orig_w * downscale_factor))
    small_h = max(1, int(orig_h * downscale_factor))
    small = cv2.resize(img, (small_w, small_h), interpolation=cv2.INTER_AREA)

    # 3. Quick Tesseract — word-level positions
    pil_small = Image.fromarray(cv2.cvtColor(small, cv2.COLOR_BGR2RGB))
    try:
        data = pytesseract.image_to_data(
            pil_small, lang=lang, config="--psm 6 --oem 3",
            output_type=pytesseract.Output.DICT,
        )
    except Exception as e:
        # Best effort: deskew is optional, so fall back to the input.
        logger.warning(f"deskew_by_word_alignment: Tesseract failed: {e}")
        return image_data, 0.0

    # 4. Group usable words by text line (block_num, par_num, line_num).
    line_groups: Dict[tuple, list] = defaultdict(list)
    for i, raw_text in enumerate(data["text"]):
        if not (raw_text or "").strip():
            continue
        # Confidence may arrive as int, float or a string such as "96.5".
        try:
            conf = int(float(data["conf"][i]))
        except (TypeError, ValueError):
            continue
        if conf < 20:
            continue
        key = (data["block_num"][i], data["par_num"][i], data["line_num"][i])
        line_groups[key].append(i)

    if len(line_groups) < 5:
        logger.info(f"deskew_by_word_alignment: only {len(line_groups)} lines, skipping")
        return image_data, 0.0

    # Per line: left-most word -> (left_x, center_y), scaled back to the
    # original resolution.
    scale = 1.0 / downscale_factor
    xs_list = []
    ys_list = []
    for indices in line_groups.values():
        best_idx = min(indices, key=lambda idx: data["left"][idx])
        xs_list.append(data["left"][best_idx] * scale)
        ys_list.append(data["top"][best_idx] * scale + data["height"][best_idx] * scale / 2.0)

    # 5. Keep only points near the dominant left edge (3% of image width).
    xs = np.array(xs_list)
    ys = np.array(ys_list)
    tolerance = orig_w * 0.03
    mask = np.abs(xs - float(np.median(xs))) <= tolerance
    filtered_xs = xs[mask]
    filtered_ys = ys[mask]

    if len(filtered_xs) < 5:
        logger.info(f"deskew_by_word_alignment: only {len(filtered_xs)} aligned points after filter, skipping")
        return image_data, 0.0

    # polyfit: x = a*y + b  ->  a = dx/dy is the tilt of the left edge.
    slope = float(np.polyfit(filtered_ys, filtered_xs, 1)[0])

    # A left edge that drifts right with increasing y (slope > 0) belongs to
    # a page rotated counter-clockwise. cv2.getRotationMatrix2D treats
    # positive angles as counter-clockwise, so the correcting rotation is
    # the NEGATED edge angle: rotating (slope, 1) by theta gives
    # x' = slope*cos(theta) + sin(theta), which is 0 for theta = -atan(slope).
    angle_deg = -float(np.degrees(np.arctan(slope)))

    # Clamp to ±5°
    angle_deg = max(-5.0, min(5.0, angle_deg))

    logger.info(f"deskew_by_word_alignment: detected {angle_deg:.2f}° from {len(filtered_xs)} points "
                f"(total lines: {len(line_groups)})")

    if abs(angle_deg) < 0.05:
        return image_data, 0.0

    # 6. Rotate full-res image
    center = (orig_w // 2, orig_h // 2)
    M = cv2.getRotationMatrix2D(center, angle_deg, 1.0)
    rotated = cv2.warpAffine(img, M, (orig_w, orig_h),
                             flags=cv2.INTER_LINEAR,
                             borderMode=cv2.BORDER_REPLICATE)

    # Encode back to PNG
    success, png_buf = cv2.imencode(".png", rotated)
    if not success:
        logger.warning("deskew_by_word_alignment: PNG encoding failed")
        return image_data, 0.0

    return png_buf.tobytes(), angle_deg
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 3: Dewarp (Book Curvature Correction)
|
|
# =============================================================================
|
|
|
|
def _detect_shear_angle(img: np.ndarray) -> Dict[str, Any]:
    """Detect the vertical shear angle of the page.

    After deskew (horizontal lines aligned), vertical features like column
    edges may still be tilted. This measures that tilt by locating the
    strongest vertical edge in the left 40% of the image within each of 20
    horizontal strips and fitting a straight line ``x = slope*y + b``
    through those positions.

    Args:
        img: BGR image.

    Returns:
        Dict with keys: method, shear_degrees, confidence. ``shear_degrees``
        is ``atan(dx/dy)`` of the fitted edge in degrees; both values stay
        0.0 when too few strips yield a usable edge.
    """
    import math

    h, w = img.shape[:2]
    result = {"method": "vertical_edge", "shear_degrees": 0.0, "confidence": 0.0}

    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Horizontal gradient highlights vertical edges.
    sobel_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
    # Saturate to 0..255 before the 8-bit cast. A plain astype(np.uint8)
    # wraps magnitudes above 255, which turns the strongest edges into
    # weak ones.
    abs_sobel = np.clip(np.abs(sobel_x), 0, 255).astype(np.uint8)

    # Binarize with Otsu
    _, binary = cv2.threshold(abs_sobel, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    num_strips = 20
    strip_h = h // num_strips

    # Loop invariants: search window and (odd) smoothing kernel width.
    search_w = int(w * 0.4)
    if search_w < 1:
        return result
    kernel_size = max(3, w // 100)
    if kernel_size % 2 == 0:
        kernel_size += 1

    edge_positions = []  # (y_center, x_position)
    for i in range(num_strips):
        y_start = i * strip_h
        y_end = min((i + 1) * strip_h, h)
        strip = binary[y_start:y_end, :]

        # Column sums of the strip.
        projection = np.sum(strip, axis=0).astype(np.float64)
        if projection.max() == 0:
            continue

        # Strongest vertical edge in the left 40% of the image.
        left_proj = projection[:search_w]
        if left_proj.max() == 0:
            continue

        # Smooth and find peak
        smoothed = cv2.GaussianBlur(left_proj.reshape(1, -1), (kernel_size, 1), 0).flatten()
        edge_positions.append(((y_start + y_end) / 2.0, float(np.argmax(smoothed))))

    if len(edge_positions) < 8:
        return result

    ys = np.array([p[0] for p in edge_positions])
    xs = np.array([p[1] for p in edge_positions])

    # Remove outliers (2 std or more from the median; std floored at 1 px).
    median_x = np.median(xs)
    std_x = max(np.std(xs), 1.0)
    mask = np.abs(xs - median_x) < 2 * std_x
    ys = ys[mask]
    xs = xs[mask]

    if len(ys) < 6:
        return result

    # Fit straight line: x = slope * y + intercept.
    # The slope (dx/dy) is the tilt of the vertical edge.
    straight_coeffs = np.polyfit(ys, xs, 1)
    slope = straight_coeffs[0]
    residuals = xs - np.polyval(straight_coeffs, ys)
    rmse = float(np.sqrt(np.mean(residuals ** 2)))

    shear_degrees = math.degrees(math.atan(slope))

    # More supporting strips and a tighter fit give higher confidence.
    confidence = min(1.0, len(ys) / 15.0) * max(0.5, 1.0 - rmse / 5.0)

    result["shear_degrees"] = round(shear_degrees, 3)
    result["confidence"] = round(float(confidence), 2)

    return result
|
|
|
|
|
|
def _apply_shear(img: np.ndarray, shear_degrees: float) -> np.ndarray:
    """Shear an image horizontally around its vertical centre.

    Every row is shifted in x by ``tan(shear_degrees) * (y - h/2)``; the y
    coordinate is left untouched, so horizontal alignment (text lines) is
    not affected. For a positive angle, rows below the centre move right
    and rows above it move left.

    Args:
        img: BGR image.
        shear_degrees: Shear angle in degrees.

    Returns:
        Sheared image of the same size; uncovered border pixels are filled
        by replicating the edge.
    """
    import math

    rows, cols = img.shape[:2]
    tangent = math.tan(math.radians(shear_degrees))

    # x' = x + tangent * (y - rows/2),  y' = y
    shear_matrix = np.float32([
        [1, tangent, -rows / 2.0 * tangent],
        [0, 1, 0],
    ])

    return cv2.warpAffine(
        img, shear_matrix, (cols, rows),
        flags=cv2.INTER_LINEAR,
        borderMode=cv2.BORDER_REPLICATE,
    )
|
|
|
|
|
|
def dewarp_image(img: np.ndarray) -> Tuple[np.ndarray, Dict[str, Any]]:
    """Detect and remove vertical shear from an already deskewed image.

    The shear angle comes from ``_detect_shear_angle``. A correction is
    applied only when the detected angle is at least 0.05° and the
    detection confidence is at least 0.3.

    Args:
        img: BGR image (already deskewed).

    Returns:
        Tuple of (corrected_image, dewarp_info).
        dewarp_info keys: method, shear_degrees, confidence. When nothing is
        corrected, the input image is returned together with
        method="none" and zeroed values.
    """
    unchanged = {"method": "none", "shear_degrees": 0.0, "confidence": 0.0}

    if not CV2_AVAILABLE:
        return img, unchanged

    started = time.time()
    detection = _detect_shear_angle(img)
    elapsed = time.time() - started

    shear = detection["shear_degrees"]
    conf = detection["confidence"]

    logger.info(f"dewarp: detected shear={shear:.3f}° "
                f"conf={conf:.2f} ({elapsed:.2f}s)")

    # Skip insignificant or unreliable detections.
    if abs(shear) < 0.05 or conf < 0.3:
        return img, unchanged

    # Negate the detected shear to straighten the page.
    straightened = _apply_shear(img, -shear)
    return straightened, {
        "method": detection["method"],
        "shear_degrees": shear,
        "confidence": conf,
    }
|
|
|
|
|
|
def dewarp_image_manual(img: np.ndarray, shear_degrees: float) -> np.ndarray:
    """Remove a caller-supplied shear angle from an image.

    Args:
        img: BGR image (deskewed, before dewarp).
        shear_degrees: Shear angle in degrees to correct. The image is
            sheared by the negated angle.

    Returns:
        Corrected image, or ``img`` itself when the angle is below 0.001°.
    """
    return img if abs(shear_degrees) < 0.001 else _apply_shear(img, -shear_degrees)
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 4: Dual Image Preparation
|
|
# =============================================================================
|
|
|
|
def create_ocr_image(img: np.ndarray) -> np.ndarray:
    """Produce a binarized image for Tesseract OCR.

    Pipeline: grayscale -> background normalization (divide by a heavily
    blurred copy) -> Gaussian adaptive threshold -> 3x3 median filter.

    Args:
        img: BGR image.

    Returns:
        Single-channel binary image with values 0 and 255 (dark ink on a
        white background).
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Dividing by the blurred image flattens uneven illumination.
    background = cv2.GaussianBlur(gray, (51, 51), 0)
    flattened = cv2.divide(gray, background, scale=255)

    thresholded = cv2.adaptiveThreshold(
        flattened, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY, 31, 10
    )

    # Light denoise of isolated speckles.
    return cv2.medianBlur(thresholded, 3)
|
|
|
|
|
|
def create_layout_image(img: np.ndarray) -> np.ndarray:
    """Produce a contrast-enhanced grayscale image for layout analysis.

    Applies CLAHE (clip limit 2.0, 8x8 tiles) to the grayscale image.

    Args:
        img: BGR image.

    Returns:
        Enhanced grayscale image.
    """
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    return equalizer.apply(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 5: Layout Analysis (Projection Profiles)
|
|
# =============================================================================
|
|
|
|
def _find_content_bounds(inv: np.ndarray) -> Tuple[int, int, int, int]:
|
|
"""Find the bounding box of actual text content (excluding page margins).
|
|
|
|
Returns:
|
|
Tuple of (left_x, right_x, top_y, bottom_y).
|
|
"""
|
|
h, w = inv.shape[:2]
|
|
|
|
# Horizontal projection for top/bottom
|
|
h_proj = np.sum(inv, axis=1).astype(float) / (w * 255)
|
|
|
|
top_y = 0
|
|
for y in range(h):
|
|
if h_proj[y] > 0.005:
|
|
top_y = max(0, y - 5)
|
|
break
|
|
|
|
bottom_y = h
|
|
for y in range(h - 1, 0, -1):
|
|
if h_proj[y] > 0.005:
|
|
bottom_y = min(h, y + 5)
|
|
break
|
|
|
|
# Vertical projection for left/right margins
|
|
v_proj = np.sum(inv[top_y:bottom_y, :], axis=0).astype(float)
|
|
v_proj_norm = v_proj / ((bottom_y - top_y) * 255) if (bottom_y - top_y) > 0 else v_proj
|
|
|
|
left_x = 0
|
|
for x in range(w):
|
|
if v_proj_norm[x] > 0.005:
|
|
left_x = max(0, x - 2)
|
|
break
|
|
|
|
right_x = w
|
|
for x in range(w - 1, 0, -1):
|
|
if v_proj_norm[x] > 0.005:
|
|
right_x = min(w, x + 2)
|
|
break
|
|
|
|
return left_x, right_x, top_y, bottom_y
|
|
|
|
|
|
def _layout_find_valleys(profile: np.ndarray, threshold: float) -> list:
    """Return runs of ``profile`` below ``threshold``.

    Each run is a tuple (start, end, center, width, depth) with ``end``
    exclusive. Runs narrower than 3 px are dropped; a run that is still
    open at the end of the profile is not reported.
    """
    below = profile < threshold
    runs = []
    run_start = None
    for pos in range(len(profile)):
        if below[pos]:
            if run_start is None:
                run_start = pos
            continue
        if run_start is None:
            continue
        run_width = pos - run_start
        depth = float(np.min(profile[run_start:pos]))
        if run_width >= 3:
            runs.append((run_start, pos, (run_start + pos) // 2, run_width, depth))
        run_start = None
    return runs


def _layout_local_minima_pair(profile: np.ndarray, content_w: int,
                              profile_mean: float, left_x: int) -> Optional[list]:
    """Coarse fallback: pick two low-density segments (out of 20) as separators.

    Returns a list of two valley tuples sorted by centre, or None when no
    acceptable pair exists.
    """
    logger.info("Layout: trying local minima approach for column detection")

    seg_count = 20
    seg_width = content_w // seg_count
    seg_scores = []
    for i in range(seg_count):
        sx = i * seg_width
        ex = min(sx + seg_width, content_w)
        seg_scores.append((i, sx, ex, float(np.mean(profile[sx:ex]))))

    seg_scores.sort(key=lambda s: s[3])
    logger.info(f"Layout: segment scores (lowest 5): "
                f"{[(s[0], s[1]+left_x, s[2]+left_x, f'{s[3]:.4f}') for s in seg_scores[:5]]}")

    # Candidates: not in the two outermost segments on either side and
    # clearly below the overall mean density.
    candidates = [
        (sx, ex, (sx + ex) // 2, ex - sx, seg_mean)
        for seg_idx, sx, ex, seg_mean in seg_scores
        if 1 < seg_idx < seg_count - 2 and seg_mean < profile_mean * 0.6
    ]
    if len(candidates) < 2:
        return None

    candidates.sort(key=lambda v: v[2])
    best_pair = None
    best_score = float('inf')
    for i, first in enumerate(candidates):
        for second in candidates[i + 1:]:
            c1, c2 = first[2], second[2]
            # Separators must be at least 20% of the content width apart.
            if (c2 - c1) < content_w * 0.2:
                continue
            widths = (c1, c2 - c1, content_w - c2)
            # Each resulting column must span at least 12%.
            if min(widths) < content_w * 0.12:
                continue
            # Lower score = more even column widths.
            score = max(widths) - min(widths)
            if score < best_score:
                best_score = score
                best_pair = (first, second)

    if best_pair is None:
        return None

    pair = list(best_pair)
    logger.info(f"Layout: local minima found 2 valleys: "
                f"{[(v[0]+left_x, v[1]+left_x, v[3]) for v in pair]}")
    return pair


def _layout_pick_separators(valleys: list, content_w: int) -> Tuple[int, int]:
    """Choose two separator centres from two or more valleys sorted by centre."""
    if len(valleys) == 2:
        return valleys[0][2], valleys[1][2]

    best_pair = None
    best_score = float('inf')
    for i, first in enumerate(valleys):
        for second in valleys[i + 1:]:
            c1, c2 = first[2], second[2]
            widths = (c1, c2 - c1, content_w - c2)
            # Each column should be at least 15% of the content width.
            if min(widths) < content_w * 0.15:
                continue
            # Even distribution is better; wider valleys earn a bonus.
            score = max(widths) - min(widths)
            score -= (first[3] + second[3]) * 0.5
            if score < best_score:
                best_score = score
                best_pair = (c1, c2)

    if best_pair:
        return best_pair
    return valleys[0][2], valleys[1][2]


def analyze_layout(layout_img: np.ndarray, ocr_img: np.ndarray) -> List[PageRegion]:
    """Detect columns, header, and footer using projection profiles.

    Content bounds are determined first so that page margins are excluded.
    Column separators are then searched as low-density valleys of the
    smoothed vertical projection inside the content area, with a coarse
    20-segment fallback when fewer than two valleys are found.

    Args:
        layout_img: CLAHE-enhanced grayscale image (not read by the current
            implementation).
        ocr_img: Binarized image for text density analysis.

    Returns:
        List of PageRegion objects: one to three column regions, plus
        'header' / 'footer' regions when the content starts more than 10 px
        below the top or ends more than 10 px above the bottom.
    """
    img_h, img_w = ocr_img.shape[:2]

    # Invert: black text on white -> white text on black for projection.
    inv = cv2.bitwise_not(ocr_img)

    # --- Actual content bounds (exclude page margins) ---
    left_x, right_x, top_y, bottom_y = _find_content_bounds(inv)
    content_w = right_x - left_x
    content_h = bottom_y - top_y

    logger.info(f"Layout: content bounds x=[{left_x}..{right_x}] ({content_w}px), "
                f"y=[{top_y}..{bottom_y}] ({content_h}px) in {img_w}x{img_h} image")

    if content_w < img_w * 0.3 or content_h < img_h * 0.3:
        # Detection looks implausible: fall back to the whole image.
        left_x, right_x, top_y, bottom_y = 0, img_w, 0, img_h
        content_w, content_h = img_w, img_h

    # --- Vertical projection inside the content area ---
    column_sums = np.sum(inv[top_y:bottom_y, left_x:right_x], axis=0).astype(float)
    density = column_sums / (content_h * 255) if content_h > 0 else column_sums

    # Moving-average smoothing with an odd window.
    window = max(5, content_w // 50)
    if window % 2 == 0:
        window += 1
    smooth = np.convolve(density, np.ones(window) / window, mode='same')

    p_mean = float(np.mean(smooth))
    p_median = float(np.median(smooth))
    p_min = float(np.min(smooth))
    p_max = float(np.max(smooth))
    logger.info(f"Layout: v_proj stats — min={p_min:.4f}, max={p_max:.4f}, "
                f"mean={p_mean:.4f}, median={p_median:.4f}")

    # Strategy 1: valleys below a threshold relative to median/mean.
    threshold = max(p_median * 0.3, p_mean * 0.2)
    logger.info(f"Layout: valley threshold={threshold:.4f}")

    all_valleys = _layout_find_valleys(smooth, threshold)
    logger.info(f"Layout: raw valleys (before filter): {len(all_valleys)} — "
                f"{[(v[0]+left_x, v[1]+left_x, v[3], f'{v[4]:.4f}') for v in all_valleys[:10]]}")

    # Valleys must lie inside the content area, not at its edges.
    inner_margin = int(content_w * 0.08)
    valleys = [v for v in all_valleys if inner_margin < v[2] < content_w - inner_margin]

    # Strategy 2: local minima, only when strategy 1 found fewer than two.
    if len(valleys) < 2:
        fallback = _layout_local_minima_pair(smooth, content_w, p_mean, left_x)
        if fallback is not None:
            valleys = fallback

    logger.info(f"Layout: final {len(valleys)} valleys: "
                f"{[(v[0]+left_x, v[1]+left_x, v[3]) for v in valleys]}")

    regions = []

    if len(valleys) >= 2:
        # 3-column layout
        valleys.sort(key=lambda v: v[2])
        sep1_center, sep2_center = _layout_pick_separators(valleys, content_w)

        # Content-relative -> absolute coordinates.
        abs_sep1 = sep1_center + left_x
        abs_sep2 = sep2_center + left_x

        logger.info(f"Layout: 3 columns at separators x={abs_sep1}, x={abs_sep2} "
                    f"(widths: {abs_sep1}, {abs_sep2-abs_sep1}, {img_w-abs_sep2})")

        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=abs_sep1, height=content_h
        ))
        regions.append(PageRegion(
            type='column_de', x=abs_sep1, y=top_y,
            width=abs_sep2 - abs_sep1, height=content_h
        ))
        regions.append(PageRegion(
            type='column_example', x=abs_sep2, y=top_y,
            width=img_w - abs_sep2, height=content_h
        ))

    elif len(valleys) == 1:
        # 2-column layout
        abs_sep = valleys[0][2] + left_x

        logger.info(f"Layout: 2 columns at separator x={abs_sep}")

        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=abs_sep, height=content_h
        ))
        regions.append(PageRegion(
            type='column_de', x=abs_sep, y=top_y,
            width=img_w - abs_sep, height=content_h
        ))

    else:
        # No separators: treat the full page width as a single column.
        logger.warning("Layout: no column separators found, using full page")
        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=img_w, height=content_h
        ))

    # Header / footer strips outside the content range.
    has_header = top_y > 10
    has_footer = bottom_y < img_h - 10
    if has_header:
        regions.append(PageRegion(
            type='header', x=0, y=0,
            width=img_w, height=top_y
        ))
    if has_footer:
        regions.append(PageRegion(
            type='footer', x=0, y=bottom_y,
            width=img_w, height=img_h - bottom_y
        ))

    col_count = len([r for r in regions if r.type.startswith('column')])
    logger.info(f"Layout: {col_count} columns, "
                f"header={'yes' if has_header else 'no'}, "
                f"footer={'yes' if has_footer else 'no'}")

    return regions
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 5b: Word-Based Layout Analysis (Two-Phase Column Detection)
|
|
# =============================================================================
|
|
|
|
# --- Phase A: Geometry Detection ---
|
|
|
|
def _detect_columns_by_clustering(
    word_dicts: List[Dict],
    left_edges: List[int],
    edge_word_indices: List[int],
    content_w: int,
    content_h: int,
    left_x: int,
    right_x: int,
    top_y: int,
    bottom_y: int,
    inv: Optional[np.ndarray] = None,
) -> Optional[Tuple[List[ColumnGeometry], int, int, int, int, List[Dict], Optional[np.ndarray]]]:
    """Fallback: detect columns by clustering left-aligned word positions.

    Used when the primary gap-based algorithm finds fewer than 2 gaps.

    Left edges are chained into clusters (neighbouring edges at most
    ``max(10, 1% of content_w)`` px apart). Clusters that span enough of
    the page height are kept, nearby clusters are merged, and the result is
    handed to ``_build_geometries_from_starts``.

    Args:
        word_dicts: Word dicts; the 'top' value of each referenced word is
            read here.
        left_edges: Left edge x positions (the returned column start adds
            ``left_x`` to them).
        edge_word_indices: For each entry of ``left_edges``, the index of
            the corresponding word in ``word_dicts``.
        content_w: Content width in px.
        content_h: Content height in px.
        left_x: Left content bound.
        right_x: Right content bound.
        top_y: Top content bound.
        bottom_y: Bottom content bound.
        inv: Optional inverted image, passed through unchanged.

    Returns:
        The tuple produced by ``_build_geometries_from_starts``, or None
        when there are no edges or fewer than 3 columns can be established.
    """
    sorted_pairs = sorted(zip(left_edges, edge_word_indices), key=lambda p: p[0])
    if not sorted_pairs:
        # Nothing to cluster; previously this raised IndexError.
        logger.info("ColumnGeometry clustering fallback: no left edges to cluster")
        return None

    # --- Chain neighbouring edges into clusters ---
    tolerance = max(10, int(content_w * 0.01))
    clusters = []  # (edges, word_indices) per cluster
    cur_edges = [sorted_pairs[0][0]]
    cur_widxs = [sorted_pairs[0][1]]
    for edge, widx in sorted_pairs[1:]:
        if edge - cur_edges[-1] <= tolerance:
            cur_edges.append(edge)
            cur_widxs.append(widx)
        else:
            clusters.append((cur_edges, cur_widxs))
            cur_edges = [edge]
            cur_widxs = [widx]
    clusters.append((cur_edges, cur_widxs))

    MIN_Y_COVERAGE_PRIMARY = 0.30
    MIN_Y_COVERAGE_SECONDARY = 0.15
    MIN_WORDS_SECONDARY = 5

    # --- Describe every cluster with at least two edges ---
    cluster_infos = []
    for c_edges, c_widxs in clusters:
        if len(c_edges) < 2:
            continue
        y_positions = [word_dicts[idx]['top'] for idx in c_widxs]
        y_span = max(y_positions) - min(y_positions)
        cluster_infos.append({
            'mean_x': int(np.mean(c_edges)),
            'count': len(c_edges),
            'min_edge': min(c_edges),
            'max_edge': max(c_edges),
            'y_min': min(y_positions),
            'y_max': max(y_positions),
            'y_coverage': y_span / content_h if content_h > 0 else 0.0,
        })

    # Primary clusters cover >= 30% of the content height; secondary ones
    # cover >= 15% and contain at least 5 words.
    primary = [c for c in cluster_infos if c['y_coverage'] >= MIN_Y_COVERAGE_PRIMARY]
    secondary = [
        c for c in cluster_infos
        if c['y_coverage'] < MIN_Y_COVERAGE_PRIMARY
        and c['y_coverage'] >= MIN_Y_COVERAGE_SECONDARY
        and c['count'] >= MIN_WORDS_SECONDARY
    ]
    significant = sorted(primary + secondary, key=lambda c: c['mean_x'])

    if len(significant) < 3:
        logger.info("ColumnGeometry clustering fallback: < 3 significant clusters")
        return None

    # --- Merge clusters whose mean x positions are close together ---
    merge_distance = max(30, int(content_w * 0.06))
    merged = [significant[0].copy()]
    for s in significant[1:]:
        prev = merged[-1]
        if s['mean_x'] - prev['mean_x'] < merge_distance:
            total = prev['count'] + s['count']
            # Count-weighted mean of the two positions.
            prev['mean_x'] = (prev['mean_x'] * prev['count'] + s['mean_x'] * s['count']) // total
            prev['count'] = total
            prev['min_edge'] = min(prev['min_edge'], s['min_edge'])
            prev['max_edge'] = max(prev['max_edge'], s['max_edge'])
        else:
            merged.append(s.copy())

    if len(merged) < 3:
        logger.info("ColumnGeometry clustering fallback: < 3 merged clusters")
        return None

    logger.info(f"ColumnGeometry clustering fallback: {len(merged)} columns from clustering")

    # Start each column slightly left of its left-most edge.
    margin_px = max(6, int(content_w * 0.003))
    return _build_geometries_from_starts(
        [(max(0, left_x + m['min_edge'] - margin_px), m['count']) for m in merged],
        word_dicts, left_x, right_x, top_y, bottom_y, content_w, content_h, inv,
    )
|
|
|
|
|
|
def _build_geometries_from_starts(
    col_starts: List[Tuple[int, int]],
    word_dicts: List[Dict],
    left_x: int,
    right_x: int,
    top_y: int,
    bottom_y: int,
    content_w: int,
    content_h: int,
    inv: Optional[np.ndarray] = None,
) -> Tuple[List[ColumnGeometry], int, int, int, int, List[Dict], Optional[np.ndarray]]:
    """Turn a list of column start positions into ColumnGeometry objects.

    Each column spans from its own start to the start of the following
    column; the final column extends to ``right_x``. A word belongs to the
    column whose horizontal span contains the word's left edge.

    Args:
        col_starts: ``(abs_start_x, word_count)`` pairs. Only the start
            coordinate is read; the word count is recomputed from
            ``word_dicts``. Widths are differences between consecutive
            starts, so the pairs are expected in left-to-right order.
        word_dicts: Word boxes; ``'left'`` is compared against positions
            measured from ``left_x``.
        left_x: Absolute X of the content area's left edge.
        right_x: Absolute X of the content area's right edge.
        top_y: Absolute Y of the content area's top edge.
        bottom_y: Absolute Y of the content area's bottom edge.
        content_w: Content width, used as denominator for ``width_ratio``.
        content_h: Content height, assigned to every column.
        inv: Inverted page image; passed through untouched.

    Returns:
        ``(geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv)``.
    """
    start_xs = [pair[0] for pair in col_starts]
    # Right edge of each column = next column's start (page edge for the last).
    end_xs = start_xs[1:] + [right_x]

    geometries = []
    for position, (x_abs, x_end_abs) in enumerate(zip(start_xs, end_xs)):
        span = x_end_abs - x_abs
        rel_lo = x_abs - left_x
        rel_hi = rel_lo + span
        members = [wd for wd in word_dicts if rel_lo <= wd['left'] < rel_hi]
        ratio = span / content_w if content_w > 0 else 0.0

        geometries.append(ColumnGeometry(
            index=position,
            x=x_abs,
            y=top_y,
            width=span,
            height=content_h,
            word_count=len(members),
            words=members,
            width_ratio=ratio,
        ))

    summary = [(g.index, g.x, g.width, g.word_count) for g in geometries]
    logger.info(f"ColumnGeometry: {len(geometries)} columns: {summary}")
    return (geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv)
|
|
|
|
|
|
def _coerce_tesseract_conf(raw: Any) -> int:
    """Convert one Tesseract confidence cell to an int, or -1 if unparseable.

    Accepts ints, floats and numeric strings (``96``, ``96.03``, ``'-1'``,
    ``'96.03'``). A digits-only check would reject float-formatted values
    and silently discard the word, so the value is parsed as a float first
    and then truncated.
    """
    try:
        return int(float(raw))
    except (TypeError, ValueError, OverflowError):
        return -1


def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Optional[Tuple[List[ColumnGeometry], int, int, int, int, List[Dict], np.ndarray]]:
    """Detect column geometry using whitespace-gap analysis with word validation.

    Phase A of the two-phase column detection. A smoothed vertical
    projection profile of the inverted image is thresholded to find
    low-ink gaps; every gap is then checked against the Tesseract word
    boxes and trimmed or dropped if a word overlaps it. The surviving gaps
    define the column boundaries.

    Falls back to clustering-based detection if fewer than 2 gaps survive.

    Args:
        ocr_img: Binarized grayscale image for layout analysis.
        dewarped_bgr: Original BGR image (for Tesseract word detection).

    Returns:
        Tuple of (geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv)
        where the bounds are absolute page coordinates, ``word_dicts`` holds
        word boxes relative to the content ROI, and ``inv`` is the inverted
        ``ocr_img``. Returns None if Tesseract fails or fewer than 5 words
        pass the confidence filter; the clustering fallback's result is
        returned as-is.
    """
    h, w = ocr_img.shape[:2]

    # --- Step 1: Find content bounds ---
    inv = cv2.bitwise_not(ocr_img)
    left_x, right_x, top_y, bottom_y = _find_content_bounds(inv)
    content_w = right_x - left_x
    content_h = bottom_y - top_y

    # A content box smaller than 30% of the page in either direction is
    # treated as a failed detection: analyse the whole page instead.
    if content_w < w * 0.3 or content_h < h * 0.3:
        left_x, right_x = 0, w
        top_y, bottom_y = 0, h
        content_w, content_h = w, h

    logger.info(f"ColumnGeometry: content bounds x=[{left_x}..{right_x}] ({content_w}px), "
                f"y=[{top_y}..{bottom_y}] ({content_h}px)")

    # --- Step 2: Get word bounding boxes from Tesseract ---
    # OCR runs on the cropped ROI, so all word coordinates are ROI-relative.
    content_roi = dewarped_bgr[top_y:bottom_y, left_x:right_x]
    pil_img = Image.fromarray(cv2.cvtColor(content_roi, cv2.COLOR_BGR2RGB))

    try:
        data = pytesseract.image_to_data(pil_img, lang='eng+deu', output_type=pytesseract.Output.DICT)
    except Exception as e:
        logger.warning(f"ColumnGeometry: Tesseract image_to_data failed: {e}")
        return None

    word_dicts = []
    left_edges = []
    edge_word_indices = []
    cells = zip(data['text'], data['conf'], data['left'],
                data['top'], data['width'], data['height'])
    for raw_text, raw_conf, raw_left, raw_top, raw_width, raw_height in cells:
        conf = _coerce_tesseract_conf(raw_conf)
        text = str(raw_text).strip()
        if conf < 30 or not text:
            continue
        lx = int(raw_left)
        left_edges.append(lx)
        # Index of the word this edge belongs to (consumed by the clustering fallback).
        edge_word_indices.append(len(word_dicts))
        word_dicts.append({
            'text': text, 'conf': conf,
            'left': lx, 'top': int(raw_top),
            'width': int(raw_width), 'height': int(raw_height),
        })

    if len(left_edges) < 5:
        logger.warning(f"ColumnGeometry: only {len(left_edges)} words detected")
        return None

    logger.info(f"ColumnGeometry: {len(left_edges)} words detected in content area")

    # --- Step 3: Vertical projection profile ---
    content_strip = inv[top_y:bottom_y, left_x:right_x]
    v_proj = np.sum(content_strip, axis=0).astype(float)
    v_proj_norm = v_proj / (content_h * 255) if content_h > 0 else v_proj

    # Smooth the projection to avoid noise-induced micro-gaps
    kernel_size = max(5, content_w // 80)
    if kernel_size % 2 == 0:
        kernel_size += 1  # keep odd for symmetry
    v_smooth = np.convolve(v_proj_norm, np.ones(kernel_size) / kernel_size, mode='same')

    # --- Step 4: Find whitespace gaps ---
    # Threshold: areas with very little ink density are gaps
    median_density = float(np.median(v_smooth[v_smooth > 0])) if np.any(v_smooth > 0) else 0.01
    gap_threshold = max(median_density * 0.15, 0.005)

    in_gap = v_smooth < gap_threshold
    MIN_GAP_WIDTH = max(8, content_w // 200)  # min ~8px or 0.5% of content width

    # Collect contiguous gap regions
    raw_gaps = []  # (start_x_rel, end_x_rel) relative to content ROI
    gap_start = None
    for x, is_gap in enumerate(in_gap):
        if is_gap:
            if gap_start is None:
                gap_start = x
        elif gap_start is not None:
            if x - gap_start >= MIN_GAP_WIDTH:
                raw_gaps.append((gap_start, x))
            gap_start = None
    # Handle gap at the right edge
    if gap_start is not None and len(in_gap) - gap_start >= MIN_GAP_WIDTH:
        raw_gaps.append((gap_start, len(in_gap)))

    logger.info(f"ColumnGeometry: {len(raw_gaps)} raw gaps found (threshold={gap_threshold:.4f}, "
                f"min_width={MIN_GAP_WIDTH}px): "
                f"{[(g[0]+left_x, g[1]+left_x, g[1]-g[0]) for g in raw_gaps]}")

    # --- Step 5: Validate gaps against word bounding boxes ---
    validated_gaps = []
    for gap_start_rel, gap_end_rel in raw_gaps:
        overlapping = [wd for wd in word_dicts
                       if wd['left'] < gap_end_rel
                       and wd['left'] + wd['width'] > gap_start_rel]

        if not overlapping:
            validated_gaps.append((gap_start_rel, gap_end_rel))
            continue

        # Try to shift the gap to avoid the overlapping word(s):
        # find the outermost word boundaries within the gap region.
        min_word_left = min(content_w, min(wd['left'] for wd in overlapping))
        max_word_right = max(0, max(wd['left'] + wd['width'] for wd in overlapping))

        if min_word_left - gap_start_rel >= MIN_GAP_WIDTH:
            # Enough room before the overlapping words
            validated_gaps.append((gap_start_rel, min_word_left))
            logger.debug(f"ColumnGeometry: gap shifted left to avoid word at {min_word_left}")
        elif gap_end_rel - max_word_right >= MIN_GAP_WIDTH:
            # Enough room after the overlapping words
            validated_gaps.append((max_word_right, gap_end_rel))
            logger.debug(f"ColumnGeometry: gap shifted right to avoid word at {max_word_right}")
        else:
            logger.debug(f"ColumnGeometry: gap [{gap_start_rel}..{gap_end_rel}] "
                         f"discarded (word overlap, no room to shift)")

    logger.info(f"ColumnGeometry: {len(validated_gaps)} gaps after word validation: "
                f"{[(g[0]+left_x, g[1]+left_x, g[1]-g[0]) for g in validated_gaps]}")

    # --- Step 6: Fallback to clustering if too few gaps ---
    if len(validated_gaps) < 2:
        logger.info("ColumnGeometry: < 2 gaps found, falling back to clustering")
        return _detect_columns_by_clustering(
            word_dicts, left_edges, edge_word_indices,
            content_w, content_h, left_x, right_x, top_y, bottom_y, inv,
        )

    # --- Step 7: Derive column boundaries from gaps ---
    validated_gaps.sort(key=lambda g: g[0])

    # A margin gap touches the edge of the content area (within 2% tolerance)
    edge_tolerance = max(10, int(content_w * 0.02))
    is_left_margin = validated_gaps[0][0] <= edge_tolerance
    is_right_margin = validated_gaps[-1][1] >= content_w - edge_tolerance

    if is_left_margin:
        # First column starts after the left margin gap
        first_start_rel = validated_gaps[0][1]
        interior_gaps = validated_gaps[1:]
    else:
        # No left margin gap — first column starts at content left edge
        first_start_rel = 0
        interior_gaps = validated_gaps[:]

    if is_right_margin:
        # The right margin gap starts no column; the last column ends where it begins.
        interior_gaps = interior_gaps[:-1]
        last_end_abs = left_x + validated_gaps[-1][0]
    else:
        last_end_abs = right_x

    # A column starts at the end of a gap and ends at the next column's start.
    col_starts = [left_x + first_start_rel]
    col_starts.extend(left_x + gap_end_rel for _, gap_end_rel in interior_gaps)
    col_ends = col_starts[1:] + [last_end_abs]

    # --- Step 8: Build ColumnGeometry objects ---
    geometries = []
    for i, (start_x, end_x) in enumerate(zip(col_starts, col_ends)):
        col_width = end_x - start_x
        col_left_rel = start_x - left_x
        col_right_rel = col_left_rel + col_width
        col_words = [wd for wd in word_dicts
                     if col_left_rel <= wd['left'] < col_right_rel]

        geometries.append(ColumnGeometry(
            index=i,
            x=start_x,
            y=top_y,
            width=col_width,
            height=content_h,
            word_count=len(col_words),
            words=col_words,
            width_ratio=col_width / content_w if content_w > 0 else 0.0,
        ))

    col_start_counts = [(g.x, g.word_count) for g in geometries]
    logger.info(f"ColumnGeometry: {len(col_starts)} columns from {len(validated_gaps)} gaps "
                f"(left_margin={is_left_margin}, right_margin={is_right_margin}): "
                f"{col_start_counts}")
    logger.info(f"ColumnGeometry: {len(geometries)} columns: "
                f"{[(g.index, g.x, g.width, g.word_count) for g in geometries]}")

    return (geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv)
|
|
|
|
|
|
# =============================================================================
|
|
# Row Geometry Detection (horizontal whitespace-gap analysis)
|
|
# =============================================================================
|
|
|
|
def detect_row_geometry(
    inv: np.ndarray,
    word_dicts: List[Dict],
    left_x: int, right_x: int,
    top_y: int, bottom_y: int,
) -> List['RowGeometry']:
    """Detect row geometry using horizontal whitespace-gap analysis.

    Mirrors the vertical gap approach used for columns, but operates on a
    horizontal projection profile to find gaps between text lines. Only
    pixels inside (vertically padded) word boxes contribute to the profile,
    so ink that belongs to no recognised word is ignored. Rows are also
    classified as header/footer when an unusually large gap lies in the
    top/bottom 15% of the content area.

    Args:
        inv: Inverted binarized image (white text on black bg, full page).
        word_dicts: Word bounding boxes from Tesseract (relative to content ROI).
        left_x, right_x: Absolute X bounds of the content area.
        top_y, bottom_y: Absolute Y bounds of the content area.

    Returns:
        List of RowGeometry objects sorted top to bottom. Empty if the
        content area is smaller than 10px in either direction. If fewer
        than 2 gaps survive validation, the result of the word-grouping
        fallback is returned instead.
    """
    content_w = right_x - left_x
    content_h = bottom_y - top_y

    if content_h < 10 or content_w < 10:
        logger.warning("detect_row_geometry: content area too small")
        return []

    # --- Step 1: Horizontal projection profile (text-only, images masked out) ---
    content_strip = inv[top_y:bottom_y, left_x:right_x]

    # Build a word-coverage mask so that image regions (high ink density but no
    # Tesseract words) are ignored. Only pixels within/near word bounding boxes
    # contribute to the projection. This prevents large illustrations from
    # merging multiple vocabulary rows into one.
    WORD_PAD_Y = max(4, content_h // 300)  # small vertical padding around words
    word_mask = np.zeros((content_h, content_w), dtype=np.uint8)
    for wd in word_dicts:
        y1 = max(0, wd['top'] - WORD_PAD_Y)
        y2 = min(content_h, wd['top'] + wd['height'] + WORD_PAD_Y)
        x1 = max(0, wd['left'])
        x2 = min(content_w, wd['left'] + wd['width'])
        word_mask[y1:y2, x1:x2] = 255

    masked_strip = cv2.bitwise_and(content_strip, word_mask)
    h_proj = np.sum(masked_strip, axis=1).astype(float)
    h_proj_norm = h_proj / (content_w * 255)  # content_w >= 10 is guaranteed above

    # --- Step 2: Smoothing + threshold ---
    kernel_size = max(3, content_h // 200)
    if kernel_size % 2 == 0:
        kernel_size += 1  # keep odd for symmetry
    h_smooth = np.convolve(h_proj_norm, np.ones(kernel_size) / kernel_size, mode='same')

    median_density = float(np.median(h_smooth[h_smooth > 0])) if np.any(h_smooth > 0) else 0.01
    gap_threshold = max(median_density * 0.15, 0.003)

    in_gap = h_smooth < gap_threshold
    MIN_GAP_HEIGHT = max(3, content_h // 500)

    # --- Step 3: Collect contiguous gap regions ---
    raw_gaps = []  # (start_y_rel, end_y_rel) relative to content ROI
    gap_start = None
    for y, is_gap in enumerate(in_gap):
        if is_gap:
            if gap_start is None:
                gap_start = y
        elif gap_start is not None:
            if y - gap_start >= MIN_GAP_HEIGHT:
                raw_gaps.append((gap_start, y))
            gap_start = None
    # A gap still open at the end runs to the bottom edge.
    if gap_start is not None and len(in_gap) - gap_start >= MIN_GAP_HEIGHT:
        raw_gaps.append((gap_start, len(in_gap)))

    logger.info(f"RowGeometry: {len(raw_gaps)} raw gaps found (threshold={gap_threshold:.4f}, "
                f"min_height={MIN_GAP_HEIGHT}px)")

    # --- Step 4: Validate gaps against word bounding boxes ---
    validated_gaps = []
    for gap_start_rel, gap_end_rel in raw_gaps:
        overlapping = [wd for wd in word_dicts
                       if wd['top'] < gap_end_rel
                       and wd['top'] + wd['height'] > gap_start_rel]

        if not overlapping:
            validated_gaps.append((gap_start_rel, gap_end_rel))
            continue

        # Try to shift the gap to avoid overlapping words
        min_word_top = min(content_h, min(wd['top'] for wd in overlapping))
        max_word_bottom = max(0, max(wd['top'] + wd['height'] for wd in overlapping))

        if min_word_top - gap_start_rel >= MIN_GAP_HEIGHT:
            validated_gaps.append((gap_start_rel, min_word_top))
        elif gap_end_rel - max_word_bottom >= MIN_GAP_HEIGHT:
            validated_gaps.append((max_word_bottom, gap_end_rel))
        else:
            logger.debug(f"RowGeometry: gap [{gap_start_rel}..{gap_end_rel}] "
                         f"discarded (word overlap, no room to shift)")

    logger.info(f"RowGeometry: {len(validated_gaps)} gaps after word validation")

    # --- Fallback if too few gaps ---
    if len(validated_gaps) < 2:
        logger.info("RowGeometry: < 2 gaps found, falling back to word grouping")
        return _build_rows_from_word_grouping(
            word_dicts, left_x, right_x, top_y, bottom_y, content_w, content_h,
        )

    validated_gaps.sort(key=lambda g: g[0])

    # --- Step 5: Header/footer detection via gap size ---
    HEADER_FOOTER_ZONE = 0.15
    GAP_MULTIPLIER = 2.0

    gap_sizes = [ge - gs for gs, ge in validated_gaps]
    median_gap = float(np.median(gap_sizes)) if gap_sizes else 0
    large_gap_threshold = median_gap * GAP_MULTIPLIER

    header_boundary_rel = None  # y below which is header
    footer_boundary_rel = None  # y above which is footer

    header_zone_limit = int(content_h * HEADER_FOOTER_ZONE)
    footer_zone_start = int(content_h * (1.0 - HEADER_FOOTER_ZONE))

    def _gap_size(gap):
        return gap[1] - gap[0]

    # Largest oversized gap whose midpoint lies in the header zone
    # (max() keeps the first of equally large gaps).
    best_header_gap = max(
        (g for g in validated_gaps
         if (g[0] + g[1]) / 2 < header_zone_limit and _gap_size(g) > large_gap_threshold),
        key=_gap_size, default=None,
    )
    if best_header_gap is not None:
        header_boundary_rel = best_header_gap[1]
        logger.info(f"RowGeometry: header boundary at y_rel={header_boundary_rel} "
                    f"(gap={best_header_gap[1] - best_header_gap[0]}px, "
                    f"median_gap={median_gap:.0f}px)")

    # Largest oversized gap whose midpoint lies in the footer zone
    best_footer_gap = max(
        (g for g in validated_gaps
         if (g[0] + g[1]) / 2 > footer_zone_start and _gap_size(g) > large_gap_threshold),
        key=_gap_size, default=None,
    )
    if best_footer_gap is not None:
        footer_boundary_rel = best_footer_gap[0]
        logger.info(f"RowGeometry: footer boundary at y_rel={footer_boundary_rel} "
                    f"(gap={best_footer_gap[1] - best_footer_gap[0]}px)")

    # --- Step 6: Build RowGeometry objects from gaps ---
    # Rows are the spans between gaps
    row_boundaries = []  # (start_y_rel, end_y_rel)

    # Top of content to first gap
    if validated_gaps[0][0] > MIN_GAP_HEIGHT:
        row_boundaries.append((0, validated_gaps[0][0]))

    # Between gaps
    for (_, row_start), (row_end, _) in zip(validated_gaps, validated_gaps[1:]):
        if row_end - row_start > 0:
            row_boundaries.append((row_start, row_end))

    # Last gap to bottom of content
    if validated_gaps[-1][1] < content_h - MIN_GAP_HEIGHT:
        row_boundaries.append((validated_gaps[-1][1], content_h))

    rows = []
    for idx, (row_start_rel, row_end_rel) in enumerate(row_boundaries):
        # Determine row type from the row's vertical midpoint
        row_mid = (row_start_rel + row_end_rel) / 2
        if header_boundary_rel is not None and row_mid < header_boundary_rel:
            row_type = 'header'
        elif footer_boundary_rel is not None and row_mid > footer_boundary_rel:
            row_type = 'footer'
        else:
            row_type = 'content'

        # A word belongs to the row that contains its vertical centre
        row_words = [wd for wd in word_dicts
                     if row_start_rel <= wd['top'] + wd['height'] / 2 < row_end_rel]

        # Size of the gap that ends exactly where this row starts; 0 when
        # the row has no gap directly above it (e.g. it starts at the top
        # of the content area). The first row uses the same rule as every
        # other row, so the value is always a gap size.
        gap_before = next(
            (ge - gs for gs, ge in validated_gaps if ge == row_start_rel), 0)

        rows.append(RowGeometry(
            index=idx,
            x=left_x,
            y=top_y + row_start_rel,
            width=content_w,
            height=row_end_rel - row_start_rel,
            word_count=len(row_words),
            words=row_words,
            row_type=row_type,
            gap_before=gap_before,
        ))

    type_counts = {}
    for r in rows:
        type_counts[r.row_type] = type_counts.get(r.row_type, 0) + 1
    logger.info(f"RowGeometry: {len(rows)} rows detected: {type_counts}")

    return rows
|
|
|
|
|
|
def _build_rows_from_word_grouping(
    word_dicts: List[Dict],
    left_x: int, right_x: int,
    top_y: int, bottom_y: int,
    content_w: int, content_h: int,
) -> List['RowGeometry']:
    """Fallback row builder: one row per group of vertically aligned words.

    Words are grouped by ``_group_words_into_lines()`` with a tolerance of
    ``max(20, content_h // 100)`` pixels. Each non-empty group becomes a
    full-width row spanning from the group's topmost word top to its
    lowest word bottom. All rows are typed ``'content'`` with
    ``gap_before=0``; no header/footer detection happens here.

    Args:
        word_dicts: Word boxes with ``'top'`` and ``'height'`` keys; ``'top'``
            is added to ``top_y`` to get the row's absolute Y.
        left_x: Absolute X assigned to every row.
        right_x: Not used by this function.
        top_y: Absolute Y of the content area's top edge.
        bottom_y: Not used by this function.
        content_w: Width assigned to every row.
        content_h: Content height; only used to derive the grouping tolerance.

    Returns:
        List of RowGeometry objects in the order the groups were produced;
        empty if ``word_dicts`` is empty.
    """
    if not word_dicts:
        return []

    tolerance = max(20, content_h // 100)
    grouped = _group_words_into_lines(word_dicts, y_tolerance_px=tolerance)

    rows = []
    for line_no, members in enumerate(grouped):
        if not members:
            continue
        upper = min(wd['top'] for wd in members)
        lower = max(wd['top'] + wd['height'] for wd in members)

        rows.append(RowGeometry(
            index=line_no,
            x=left_x,
            y=top_y + upper,
            width=content_w,
            height=lower - upper,
            word_count=len(members),
            words=members,
            row_type='content',
            gap_before=0,
        ))

    logger.info(f"RowGeometry (fallback): {len(rows)} rows from word grouping")
    return rows
|
|
|
|
|
|
# --- Phase B: Content-Based Classification ---
|
|
|
|
def _score_language(words: List[Dict]) -> Dict[str, float]:
|
|
"""Score the language of a column's words.
|
|
|
|
Analyzes function words, umlauts, and capitalization patterns
|
|
to determine whether text is English or German.
|
|
|
|
Args:
|
|
words: List of word dicts with 'text' and 'conf' keys.
|
|
|
|
Returns:
|
|
Dict with 'eng' and 'deu' scores (0.0-1.0).
|
|
"""
|
|
if not words:
|
|
return {'eng': 0.0, 'deu': 0.0}
|
|
|
|
# Only consider words with decent confidence
|
|
good_words = [w['text'].lower() for w in words if w.get('conf', 0) > 40 and len(w['text']) > 0]
|
|
if not good_words:
|
|
return {'eng': 0.0, 'deu': 0.0}
|
|
|
|
total = len(good_words)
|
|
en_hits = sum(1 for w in good_words if w in ENGLISH_FUNCTION_WORDS)
|
|
de_hits = sum(1 for w in good_words if w in GERMAN_FUNCTION_WORDS)
|
|
|
|
# Check for umlauts (strong German signal)
|
|
raw_texts = [w['text'] for w in words if w.get('conf', 0) > 40]
|
|
umlaut_count = sum(1 for t in raw_texts
|
|
for c in t if c in 'äöüÄÖÜß')
|
|
|
|
# German capitalization: nouns are capitalized mid-sentence
|
|
# Count words that start with uppercase but aren't at position 0
|
|
cap_words = sum(1 for t in raw_texts if t[0].isupper() and len(t) > 2)
|
|
|
|
en_score = en_hits / total if total > 0 else 0.0
|
|
de_score = de_hits / total if total > 0 else 0.0
|
|
|
|
# Boost German score for umlauts
|
|
if umlaut_count > 0:
|
|
de_score = min(1.0, de_score + 0.15 * min(umlaut_count, 5))
|
|
|
|
# Boost German score for high capitalization ratio (typical for German nouns)
|
|
if total > 5:
|
|
cap_ratio = cap_words / total
|
|
if cap_ratio > 0.3:
|
|
de_score = min(1.0, de_score + 0.1)
|
|
|
|
return {'eng': round(en_score, 3), 'deu': round(de_score, 3)}
|
|
|
|
|
|
def _score_role(geom: ColumnGeometry) -> Dict[str, float]:
    """Rate how well a column fits each structural role.

    The scores are heuristics over the column's relative width, word count,
    index, and the text of its words with confidence above 40 (average
    length, share of words containing digits, number of words containing
    punctuation).

    Args:
        geom: ColumnGeometry with words and dimensions.

    Returns:
        Dict with role scores: 'reference', 'marker', 'sentence',
        'vocabulary'. All four are 0.0 when the column has no words or no
        word passes the confidence filter; otherwise values are rounded to
        3 decimals.
    """
    if not geom.words:
        return {'reference': 0.0, 'marker': 0.0, 'sentence': 0.0, 'vocabulary': 0.0}

    tokens = [wd['text'] for wd in geom.words if wd.get('conf', 0) > 40]
    if not tokens:
        return {'reference': 0.0, 'marker': 0.0, 'sentence': 0.0, 'vocabulary': 0.0}

    n_tokens = len(tokens)
    width_ratio = geom.width_ratio
    mean_len = sum(len(tok) for tok in tokens) / n_tokens
    punctuated = sum(1 for tok in tokens if any(mark in tok for mark in '.!?;:,'))
    with_digits = sum(1 for tok in tokens if any(ch.isdigit() for ch in tok))
    digit_ratio = with_digits / n_tokens

    # Reference: narrow + mostly numbers/page references
    reference = 0.0
    if width_ratio < 0.12:
        reference = min(1.0, 0.5 + digit_ratio * 0.5) if digit_ratio > 0.4 else 0.5

    # Marker: narrow + few short entries
    marker = 0.0
    if width_ratio < 0.06 and geom.word_count <= 15:
        marker = 0.9 if mean_len < 4 else 0.7
    # A very narrow column that is not the first one is a strong marker
    # regardless of its word count.
    if width_ratio < 0.04 and geom.index > 0:
        marker = max(marker, 0.9)

    # Sentence: wide column with punctuation, boosted for longer words
    sentence = 0.0
    if width_ratio > 0.15 and punctuated > 2:
        sentence = 0.3 + min(0.5, punctuated / n_tokens)
        if mean_len > 4:
            sentence = min(1.0, sentence + 0.2)

    # Vocabulary: medium width, boosted for medium word length
    vocabulary = 0.0
    if 0.10 < width_ratio < 0.45:
        vocabulary = 0.4
        if 3 < mean_len < 8:
            vocabulary = min(1.0, vocabulary + 0.3)

    return {
        'reference': round(reference, 3),
        'marker': round(marker, 3),
        'sentence': round(sentence, 3),
        'vocabulary': round(vocabulary, 3),
    }
|
|
|
|
|
|
def classify_column_types(geometries: List[ColumnGeometry],
                          content_w: int,
                          top_y: int,
                          img_w: int,
                          img_h: int,
                          bottom_y: int) -> List[PageRegion]:
    """Assign a region type to every detected column.

    A single column is returned directly as ``column_text``. Otherwise the
    first/last columns with fewer than 8 words become ``column_ignore``,
    the remaining columns are re-indexed in place, and three classifiers
    are tried in order:

    Level 1: Content-based (language + role scoring)
    Level 2: Position + language
    Level 3: Pure position fallback

    The first level that returns a result wins; ``_add_header_footer`` is
    then applied to that result.

    Args:
        geometries: List of ColumnGeometry from Phase A. The ``index`` of
            the columns that are not ignored is overwritten.
        content_w: Total content width.
        top_y: Top Y of content area.
        img_w: Full image width.
        img_h: Full image height.
        bottom_y: Bottom Y of content area.

    Returns:
        List of PageRegion with types, confidence, and method; ignored
        edge columns come first.
    """
    content_h = bottom_y - top_y

    def plain_text_region(col):
        # Lone column: treated as a plain text page, keeps its own height.
        return PageRegion(
            type='column_text', x=col.x, y=col.y,
            width=col.width, height=col.height,
            classification_confidence=0.9,
            classification_method='content',
        )

    def finish(found):
        _add_header_footer(found, top_y, bottom_y, img_w, img_h)
        return ignore_regions + found

    # Special case: single column → plain text page
    if len(geometries) == 1:
        return [plain_text_region(geometries[0])]

    # --- Pre-filter: first/last columns with very few words → column_ignore ---
    ignore_regions = []
    active = []
    last_pos = len(geometries) - 1
    for pos, col in enumerate(geometries):
        if pos in (0, last_pos) and col.word_count < 8:
            ignore_regions.append(PageRegion(
                type='column_ignore', x=col.x, y=col.y,
                width=col.width, height=content_h,
                classification_confidence=0.95,
                classification_method='content',
            ))
            logger.info(f"ClassifyColumns: column {pos} (x={col.x}, words={col.word_count}) → column_ignore (edge, few words)")
        else:
            active.append(col)

    # Re-index the surviving columns so the classifiers see 0..n-1
    for fresh_index, col in enumerate(active):
        col.index = fresh_index

    # Everything ignored, or a single column left
    if not active:
        return ignore_regions
    if len(active) == 1:
        ignore_regions.append(plain_text_region(active[0]))
        return ignore_regions

    # --- Score all columns ---
    lang_scores = [_score_language(col.words) for col in active]
    role_scores = [_score_role(col) for col in active]

    lang_summary = [(col.index, score) for col, score in zip(active, lang_scores)]
    role_summary = [(col.index, score) for col, score in zip(active, role_scores)]
    logger.info(f"ClassifyColumns: language scores: {lang_summary}")
    logger.info(f"ClassifyColumns: role scores: {role_summary}")

    # --- Level 1: Content-based classification ---
    by_content = _classify_by_content(active, lang_scores, role_scores, content_w, content_h)
    if by_content is not None:
        logger.info("ClassifyColumns: Level 1 (content-based) succeeded")
        return finish(by_content)

    # --- Level 2: Position + language enhanced ---
    by_position = _classify_by_position_enhanced(active, lang_scores, content_w, content_h)
    if by_position is not None:
        logger.info("ClassifyColumns: Level 2 (position+language) succeeded")
        return finish(by_position)

    # --- Level 3: Pure position fallback ---
    logger.info("ClassifyColumns: Level 3 (position fallback)")
    return finish(_classify_by_position_fallback(active, content_w, content_h))
|
|
|
|
|
|
def _classify_by_content(geometries: List[ColumnGeometry],
                         lang_scores: List[Dict[str, float]],
                         role_scores: List[Dict[str, float]],
                         content_w: int,
                         content_h: int) -> Optional[List[PageRegion]]:
    """Level 1: classify columns from their content scores.

    Narrow columns are first typed as ``page_ref`` or ``column_marker``.
    Among the rest, the column with the highest English score becomes
    ``column_en`` and the one with the highest German score ``column_de``;
    when all language scores are below 0.15 the two leftmost remaining
    columns are used instead (left=EN, right=DE). Any other column becomes
    ``column_example``.

    Args:
        geometries: Columns to classify.
        lang_scores: Per-column dicts with 'eng' and 'deu' scores, parallel
            to ``geometries``.
        role_scores: Per-column role score dicts, parallel to ``geometries``.
        content_w: Content width; 20% of it bounds the page_ref zone.
        content_h: Height assigned to every produced region.

    Returns:
        Regions sorted by x, or None if there is no usable EN/DE split.
    """
    def region(kind, col, confidence):
        return PageRegion(
            type=kind, x=col.x, y=col.y,
            width=col.width, height=content_h,
            classification_confidence=confidence,
            classification_method='content',
        )

    def by_x(found):
        found.sort(key=lambda r: r.x)
        return found

    regions = []
    assigned = set()

    # Step 1: structural roles first (reference, marker).
    # Only the leftmost ~20% of the content area qualifies for page_ref.
    left_20_threshold = geometries[0].x + content_w * 0.20 if geometries else 0

    for pos, (col, roles, langs) in enumerate(zip(geometries, role_scores, lang_scores)):
        on_left = col.x < left_20_threshold
        strong_language = langs['eng'] > 0.3 or langs['deu'] > 0.3

        if roles['reference'] >= 0.5 and col.width_ratio < 0.12 and on_left and not strong_language:
            regions.append(region('page_ref', col, roles['reference']))
        elif roles['marker'] >= 0.7 and col.width_ratio < 0.06:
            regions.append(region('column_marker', col, roles['marker']))
        elif col.width_ratio < 0.05 and not on_left:
            # Narrow column on the right side → marker, not page_ref
            regions.append(region('column_marker', col, 0.8))
        else:
            continue
        assigned.add(pos)

    # Step 2: among remaining columns, find EN and DE by language scores
    remaining = [(pos, geometries[pos], lang_scores[pos], role_scores[pos])
                 for pos in range(len(geometries)) if pos not in assigned]

    if len(remaining) < 2:
        # Not enough columns for an EN/DE pair
        if len(remaining) == 1:
            regions.append(region('column_text', remaining[0][1], 0.6))
        return by_x(regions)

    en_candidates = [(pos, col, langs) for pos, col, langs, _ in remaining
                     if langs['eng'] > langs['deu'] and langs['eng'] > 0.05]
    de_candidates = [(pos, col, langs) for pos, col, langs, _ in remaining
                     if langs['deu'] > langs['eng'] and langs['deu'] > 0.05]

    if not (en_candidates and de_candidates):
        strongest_eng = max(langs['eng'] for _, _, langs, _ in remaining)
        strongest_deu = max(langs['deu'] for _, _, langs, _ in remaining)

        if strongest_eng < 0.15 and strongest_deu < 0.15:
            # Both signals weak — positional tiebreaker: left=EN, right=DE
            ordered = sorted(remaining, key=lambda entry: entry[1].x)
            en_pos, en_col = ordered[0][0], ordered[0][1]
            de_pos, de_col = ordered[1][0], ordered[1][1]
            logger.info("ClassifyColumns: Level 1 using position tiebreaker (weak signals) - left=EN, right=DE")

            regions.append(region('column_en', en_col, 0.4))
            assigned.add(en_pos)
            regions.append(region('column_de', de_col, 0.4))
            assigned.add(de_pos)

            # Everything else is treated as an example column
            regions.extend(region('column_example', col, 0.4)
                           for pos, col, _, _ in remaining if pos not in assigned)
            return by_x(regions)

        # Language signals too weak for content-based classification
        logger.info("ClassifyColumns: Level 1 failed - no clear EN/DE language split")
        return None

    # Pick the best EN and DE candidates
    best_en = max(en_candidates, key=lambda cand: cand[2]['eng'])
    best_de = max(de_candidates, key=lambda cand: cand[2]['deu'])

    if best_en[0] == best_de[0]:
        # Same column scored highest for both — ambiguous
        logger.info("ClassifyColumns: Level 1 failed - same column highest for EN and DE")
        return None

    regions.append(region('column_en', best_en[1], round(best_en[2]['eng'], 2)))
    assigned.add(best_en[0])
    regions.append(region('column_de', best_de[1], round(best_de[2]['deu'], 2)))
    assigned.add(best_de[0])

    # Step 3: remaining columns → column_example; the sentence score is
    # used as confidence when it exceeds 0.4, otherwise a flat 0.5.
    for pos, col, _, roles in remaining:
        if pos in assigned:
            continue
        confidence = round(roles['sentence'], 2) if roles['sentence'] > 0.4 else 0.5
        regions.append(region('column_example', col, confidence))

    return by_x(regions)
|
|
|
|
|
|
def _classify_by_position_enhanced(geometries: List[ColumnGeometry],
                                   lang_scores: List[Dict[str, float]],
                                   content_w: int,
                                   content_h: int) -> Optional[List[PageRegion]]:
    """Level 2: Position-based rules enhanced with language confirmation.

    Applies the positional heuristics in order (page_ref, marker, example,
    EN/DE) and uses the per-column language scores to veto the page_ref
    rule and to swap the EN/DE assignment when the scores point the other way.

    Args:
        geometries: Detected columns; indexed in parallel with ``lang_scores``.
        lang_scores: Per-column dicts read via the ``'eng'`` and ``'deu'`` keys.
        content_w: Width of the content area, used for the left-20% threshold.
        content_h: Height assigned to every emitted region.

    Returns:
        Regions sorted by ``x``. An empty list is returned when there are no
        geometries (previously this raised IndexError).
    """
    # Nothing to classify: the rules below index geometries[0] directly.
    if not geometries:
        return []

    regions: List[PageRegion] = []

    def _emit(region_type: str, idx: int, confidence: float) -> None:
        # Build one region from column ``idx``; height is always the content height.
        geom = geometries[idx]
        regions.append(PageRegion(
            type=region_type, x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=confidence,
            classification_method='position_enhanced',
        ))

    untyped = list(range(len(geometries)))
    g0 = geometries[0]
    left_20_threshold = g0.x + content_w * 0.20

    # Rule 1: Leftmost narrow column → page_ref (only if in left 20%, no strong language)
    ls0 = lang_scores[0]
    has_strong_lang_0 = ls0['eng'] > 0.3 or ls0['deu'] > 0.3
    if g0.width_ratio < 0.12 and g0.x < left_20_threshold and not has_strong_lang_0:
        _emit('page_ref', 0, 0.8)
        untyped.remove(0)

    # Rule 2: Narrow columns with few words → marker
    # Iterate over a copy because matching indices are removed from ``untyped``.
    for i in list(untyped):
        geom = geometries[i]
        if geom.width_ratio < 0.06 and geom.word_count <= 15:
            _emit('column_marker', i, 0.7)
            untyped.remove(i)

    # Rule 3: Rightmost remaining → column_example (if 3+ remaining)
    if len(untyped) >= 3:
        _emit('column_example', untyped.pop(), 0.7)

    # Rule 4: First two remaining → EN/DE, but check language to possibly swap
    if len(untyped) >= 2:
        idx_a, idx_b = untyped[0], untyped[1]
        ls_a = lang_scores[idx_a]
        ls_b = lang_scores[idx_b]

        # Default: first=EN, second=DE (old behavior)
        en_idx, de_idx = idx_a, idx_b
        conf = 0.7

        # Swap only if both columns agree that the order is reversed.
        if ls_a['deu'] > ls_a['eng'] and ls_b['eng'] > ls_b['deu']:
            en_idx, de_idx = idx_b, idx_a
            conf = 0.85
            logger.info("ClassifyColumns: Level 2 swapped EN/DE based on language scores")

        _emit('column_en', en_idx, conf)
        _emit('column_de', de_idx, conf)
        untyped = untyped[2:]
    elif len(untyped) == 1:
        _emit('column_en', untyped[0], 0.5)
        untyped = []

    # Remaining → example
    for idx in untyped:
        _emit('column_example', idx, 0.5)

    regions.sort(key=lambda r: r.x)
    return regions
|
|
|
|
|
|
def _classify_by_position_fallback(geometries: List[ColumnGeometry],
                                   content_w: int,
                                   content_h: int) -> List[PageRegion]:
    """Level 3: Pure position-based fallback.

    Applies the positional rules in order (page_ref, marker, example, EN/DE)
    without looking at language scores. Every region is emitted with
    confidence 1.0 and method ``'position_fallback'``.

    Args:
        geometries: Detected columns.
        content_w: Width of the content area, used for the left-20% threshold.
        content_h: Height assigned to every emitted region.

    Returns:
        Regions sorted by ``x``. An empty list is returned when there are no
        geometries (previously this raised IndexError).
    """
    # Nothing to classify: the rules below index geometries[0] directly.
    if not geometries:
        return []

    regions: List[PageRegion] = []

    def _emit(region_type: str, idx: int) -> None:
        # Build one region from column ``idx``; height is always the content height.
        geom = geometries[idx]
        regions.append(PageRegion(
            type=region_type, x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=1.0,
            classification_method='position_fallback',
        ))

    untyped = list(range(len(geometries)))
    g0 = geometries[0]
    left_20_threshold = g0.x + content_w * 0.20

    # Rule 1: Leftmost narrow column → page_ref (only if in left 20%)
    if g0.width_ratio < 0.12 and g0.x < left_20_threshold:
        _emit('page_ref', 0)
        untyped.remove(0)

    # Rule 2: Narrow + few words → marker
    # Iterate over a copy because matching indices are removed from ``untyped``.
    for i in list(untyped):
        geom = geometries[i]
        if geom.width_ratio < 0.06 and geom.word_count <= 15:
            _emit('column_marker', i)
            untyped.remove(i)

    # Rule 3: Rightmost remaining → example (if 3+)
    if len(untyped) >= 3:
        _emit('column_example', untyped.pop())

    # Rule 4: First remaining → EN, second → DE
    if len(untyped) >= 2:
        _emit('column_en', untyped[0])
        _emit('column_de', untyped[1])
        untyped = untyped[2:]
    elif len(untyped) == 1:
        _emit('column_en', untyped[0])
        untyped = []

    # Anything still unassigned is treated as an example column.
    for idx in untyped:
        _emit('column_example', idx)

    regions.sort(key=lambda r: r.x)
    return regions
|
|
|
|
|
|
def _add_header_footer(regions: List[PageRegion], top_y: int, bottom_y: int,
                       img_w: int, img_h: int) -> None:
    """Append full-width header and footer regions to ``regions`` in place.

    A header spanning ``y=0 .. top_y`` is added when ``top_y`` exceeds 10, and
    a footer spanning ``bottom_y .. img_h`` is added when ``bottom_y`` lies
    more than 10 above ``img_h``.
    """
    margin = 10
    wants_header = top_y > margin
    wants_footer = bottom_y < img_h - margin

    if wants_header:
        header = PageRegion(type='header', x=0, y=0, width=img_w, height=top_y)
        regions.append(header)

    if wants_footer:
        footer_height = img_h - bottom_y
        footer = PageRegion(type='footer', x=0, y=bottom_y, width=img_w, height=footer_height)
        regions.append(footer)
|
|
|
|
|
|
# --- Main Entry Point ---
|
|
|
|
def analyze_layout_by_words(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> List[PageRegion]:
    """Detect and classify columns in two phases: geometry, then content.

    Phase A calls detect_column_geometry(); Phase B passes its result to
    classify_column_types(). If Phase A returns None, the function falls back
    to the projection-based analyze_layout() on a freshly created layout image.

    Args:
        ocr_img: Binarized grayscale image for layout analysis.
        dewarped_bgr: Original BGR image (for Tesseract word detection).

    Returns:
        List of PageRegion objects with types, confidence, and method.
    """
    img_h, img_w = ocr_img.shape[:2]

    # Phase A: Geometry detection
    geometry = detect_column_geometry(ocr_img, dewarped_bgr)
    if geometry is None:
        # Fallback to projection-based layout
        logger.info("LayoutByWords: geometry detection failed, falling back to projection profiles")
        return analyze_layout(create_layout_image(dewarped_bgr), ocr_img)

    columns, x_min, x_max, y_top, y_bottom, _words, _inverted = geometry

    # Phase B: Content-based classification
    regions = classify_column_types(columns, x_max - x_min, y_top, img_w, img_h, y_bottom)

    col_count = sum(
        1 for r in regions
        if r.type.startswith('column') or r.type == 'page_ref'
    )
    methods = {r.classification_method for r in regions if r.classification_method}
    summary = [
        (r.type, r.x, r.width, r.classification_confidence)
        for r in regions
        if r.type not in ('header', 'footer')
    ]
    logger.info(f"LayoutByWords: {col_count} columns detected (methods: {methods}): "
                f"{summary}")

    return regions
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 6: Multi-Pass OCR
|
|
# =============================================================================
|
|
|
|
def ocr_region(ocr_img: np.ndarray, region: PageRegion, lang: str,
               psm: int, fallback_psm: Optional[int] = None,
               min_confidence: float = 40.0) -> List[Dict[str, Any]]:
    """Run Tesseract OCR on a specific region with given PSM.

    Args:
        ocr_img: Binarized full-page image.
        region: Region to crop and OCR.
        lang: Tesseract language string.
        psm: Page Segmentation Mode.
        fallback_psm: If the average confidence is too low, retry line by
            line with this PSM.
        min_confidence: Minimum average confidence before fallback.

    Returns:
        List of word dicts (``text``, ``left``, ``top``, ``width``, ``height``,
        ``conf``, ``region_type``) with positions in full-page coordinates.
        Empty if the crop is empty or Tesseract raises.
    """
    # Crop region
    crop = ocr_img[region.y:region.y + region.height,
                   region.x:region.x + region.width]
    if crop.size == 0:
        return []

    # Convert to PIL for pytesseract
    pil_img = Image.fromarray(crop)

    # Run Tesseract with specified PSM
    config = f'--psm {psm} --oem 3'
    try:
        data = pytesseract.image_to_data(pil_img, lang=lang, config=config,
                                         output_type=pytesseract.Output.DICT)
    except Exception as e:
        logger.warning(f"Tesseract failed for region {region.type}: {e}")
        return []

    words = []
    for i, raw_text in enumerate(data['text']):
        text = raw_text.strip()
        # The confidence may arrive as int, float, or a decimal string such as
        # '96.06'; int() alone raises ValueError on the latter.
        try:
            conf = int(float(data['conf'][i]))
        except (TypeError, ValueError):
            continue
        if not text or conf < 10:
            continue
        words.append({
            'text': text,
            'left': data['left'][i] + region.x,  # Absolute coords
            'top': data['top'][i] + region.y,
            'width': data['width'][i],
            'height': data['height'][i],
            'conf': conf,
            'region_type': region.type,
        })

    # Check average confidence
    if words and fallback_psm is not None:
        avg_conf = sum(w['conf'] for w in words) / len(words)
        if avg_conf < min_confidence:
            logger.info(f"Region {region.type}: avg confidence {avg_conf:.0f}% < {min_confidence}%, "
                        f"trying fallback PSM {fallback_psm}")
            fallback_words = _ocr_region_line_by_line(ocr_img, region, lang, fallback_psm)
            # Keep the first-pass words if the fallback found nothing at all;
            # low-confidence text is more useful than an empty region.
            if fallback_words:
                words = fallback_words

    return words
|
|
|
|
|
|
def _ocr_region_line_by_line(ocr_img: np.ndarray, region: PageRegion,
                             lang: str, psm: int) -> List[Dict[str, Any]]:
    """OCR a region line by line (fallback for low-confidence regions).

    Splits the region into horizontal strips using the row-wise sum of the
    inverted crop, then runs Tesseract on each strip with the given PSM.

    Args:
        ocr_img: Full-page image the region is cropped from.
        region: Region to crop and OCR.
        lang: Tesseract language string.
        psm: Page Segmentation Mode applied to every strip.

    Returns:
        List of word dicts with positions in full-page coordinates. Strips on
        which Tesseract raises are logged and skipped.
    """
    crop = ocr_img[region.y:region.y + region.height,
                   region.x:region.x + region.width]
    if crop.size == 0:
        return []

    # Find text lines via horizontal projection
    inv = cv2.bitwise_not(crop)
    h_proj = np.sum(inv, axis=1)
    peak = np.max(h_proj)
    # Rows above 5% of the strongest row count as text.
    threshold = peak * 0.05 if peak > 0 else 0

    # Find line boundaries
    min_line_height = 5
    row_count = len(h_proj)
    lines = []
    in_text = False
    line_start = 0
    for y, row_sum in enumerate(h_proj):
        if row_sum > threshold and not in_text:
            line_start = y
            in_text = True
        elif row_sum <= threshold and in_text:
            if y - line_start > min_line_height:
                lines.append((line_start, y))
            in_text = False
    # A run of text that reaches the bottom edge is closed here.
    if in_text and row_count - line_start > min_line_height:
        lines.append((line_start, row_count))

    all_words = []
    config = f'--psm {psm} --oem 3'
    pad = 3  # Small vertical padding around each strip

    for line_y_start, line_y_end in lines:
        y1 = max(0, line_y_start - pad)
        y2 = min(crop.shape[0], line_y_end + pad)
        line_crop = crop[y1:y2, :]
        if line_crop.size == 0:
            continue

        pil_img = Image.fromarray(line_crop)
        try:
            data = pytesseract.image_to_data(pil_img, lang=lang, config=config,
                                             output_type=pytesseract.Output.DICT)
        except Exception as e:
            # Best effort: skip the strip, but leave a trace instead of
            # swallowing the failure silently.
            logger.warning(f"Tesseract failed for line {line_y_start}-{line_y_end} "
                           f"in region {region.type}: {e}")
            continue

        for i, raw_text in enumerate(data['text']):
            text = raw_text.strip()
            # The confidence may arrive as int, float, or a decimal string
            # such as '96.06'; int() alone raises ValueError on the latter.
            try:
                conf = int(float(data['conf'][i]))
            except (TypeError, ValueError):
                continue
            if not text or conf < 10:
                continue
            all_words.append({
                'text': text,
                'left': data['left'][i] + region.x,
                # Strip-relative → crop-relative (y1) → page-absolute (region.y)
                'top': data['top'][i] + region.y + y1,
                'width': data['width'][i],
                'height': data['height'][i],
                'conf': conf,
                'region_type': region.type,
            })

    return all_words
|
|
|
|
|
|
def run_multi_pass_ocr(ocr_img: np.ndarray,
                       regions: List[PageRegion],
                       lang: str = "eng+deu") -> Dict[str, List[Dict]]:
    """Run OCR on each detected region with region-specific settings.

    ``column_en`` and ``column_de`` are read with PSM 4 and a single language;
    ``column_example`` uses PSM 6 with a PSM 7 line-by-line fallback; every
    other non-header/footer region uses PSM 6 with ``lang``.

    Args:
        ocr_img: Binarized full-page image.
        regions: Detected page regions.
        lang: Default language.

    Returns:
        Dict mapping region type to list of word dicts. Words from several
        regions of the same type are concatenated in region order (previously
        a later region overwrote an earlier one of the same type).
    """
    results: Dict[str, List[Dict]] = {}

    for region in regions:
        if region.type in ('header', 'footer'):
            continue  # Skip non-content regions

        if region.type == 'column_en':
            words = ocr_region(ocr_img, region, lang='eng', psm=4)
        elif region.type == 'column_de':
            words = ocr_region(ocr_img, region, lang='deu', psm=4)
        elif region.type == 'column_example':
            words = ocr_region(ocr_img, region, lang=lang, psm=6,
                               fallback_psm=7, min_confidence=40.0)
        else:
            words = ocr_region(ocr_img, region, lang=lang, psm=6)

        # Accumulate instead of assigning: a page can contain several regions
        # of one type (e.g. multiple example or marker columns).
        results.setdefault(region.type, []).extend(words)
        logger.info(f"OCR {region.type}: {len(words)} words")

    return results
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 7: Line Alignment → Vocabulary Entries
|
|
# =============================================================================
|
|
|
|
def _group_words_into_lines(words: List[Dict], y_tolerance_px: int = 20) -> List[List[Dict]]:
|
|
"""Group words by Y position into lines, sorted by X within each line."""
|
|
if not words:
|
|
return []
|
|
|
|
sorted_words = sorted(words, key=lambda w: (w['top'], w['left']))
|
|
lines: List[List[Dict]] = []
|
|
current_line: List[Dict] = [sorted_words[0]]
|
|
current_y = sorted_words[0]['top']
|
|
|
|
for word in sorted_words[1:]:
|
|
if abs(word['top'] - current_y) <= y_tolerance_px:
|
|
current_line.append(word)
|
|
else:
|
|
current_line.sort(key=lambda w: w['left'])
|
|
lines.append(current_line)
|
|
current_line = [word]
|
|
current_y = word['top']
|
|
|
|
if current_line:
|
|
current_line.sort(key=lambda w: w['left'])
|
|
lines.append(current_line)
|
|
|
|
return lines
|
|
|
|
|
|
def match_lines_to_vocab(ocr_results: Dict[str, List[Dict]],
                         regions: List[PageRegion],
                         y_tolerance_px: int = 25) -> List[VocabRow]:
    """Align per-column OCR words into vocabulary rows by vertical position.

    English lines are the primary reference. For each one, the German line
    and the example line whose vertical centre is closest (and strictly
    within ``y_tolerance_px``) are attached. Example lines that are not near
    any row already carrying an example are appended to the nearest row above
    them, within three times the tolerance.

    Args:
        ocr_results: Dict mapping region type to word lists.
        regions: Detected regions (not read by this function).
        y_tolerance_px: Max Y-distance to consider words on the same row.

    Returns:
        List of VocabRow objects sorted by ``y_position``; empty when neither
        ``column_en`` nor ``column_de`` is present in ``ocr_results``.
    """
    # If no vocabulary columns detected (e.g. plain text page), return empty
    if 'column_en' not in ocr_results and 'column_de' not in ocr_results:
        logger.info("match_lines_to_vocab: no column_en/column_de in OCR results, returning empty")
        return []

    def _center_y(line: List[Dict]) -> float:
        return sum(w['top'] + w['height'] / 2 for w in line) / len(line)

    def _joined(line: List[Dict]) -> str:
        return ' '.join(w['text'] for w in line)

    def _mean_conf(line: List[Dict]) -> float:
        return sum(w['conf'] for w in line) / len(line) if line else 0

    def _nearest(lines: List[List[Dict]], target_y: float) -> int:
        # Index of the line closest to target_y within tolerance, or -1.
        winner = -1
        winner_gap = float('inf')
        for pos, candidate in enumerate(lines):
            gap = abs(_center_y(candidate) - target_y)
            if gap < y_tolerance_px and gap < winner_gap:
                winner_gap = gap
                winner = pos
        return winner

    def _text_and_conf(lines: List[List[Dict]], pos: int):
        # Text/confidence of the matched line, or blanks when nothing matched.
        if pos < 0:
            return "", 0.0
        return _joined(lines[pos]), _mean_conf(lines[pos])

    # Group words into lines per column
    en_lines = _group_words_into_lines(ocr_results.get('column_en', []), y_tolerance_px)
    de_lines = _group_words_into_lines(ocr_results.get('column_de', []), y_tolerance_px)
    ex_lines = _group_words_into_lines(ocr_results.get('column_example', []), y_tolerance_px)

    # Build EN entries as the primary reference
    vocab_rows: List[VocabRow] = []
    for en_line in en_lines:
        en_y = _center_y(en_line)
        en_text = _joined(en_line)
        en_conf = _mean_conf(en_line)

        # Skip very short or likely header content
        if len(en_text.strip()) < 2:
            continue

        de_text, de_conf = _text_and_conf(de_lines, _nearest(de_lines, en_y))
        ex_text, ex_conf = _text_and_conf(ex_lines, _nearest(ex_lines, en_y))

        # Average only over the columns that actually contributed a score.
        conf_total = en_conf
        conf_parts = 1
        for extra in (de_conf, ex_conf):
            if extra > 0:
                conf_total += extra
                conf_parts += 1

        vocab_rows.append(VocabRow(
            english=en_text.strip(),
            german=de_text.strip(),
            example=ex_text.strip(),
            confidence=conf_total / conf_parts,
            y_position=int(en_y),
        ))

    # Handle multi-line wrapping in example column:
    # If an example line has no matching EN/DE, append to previous entry.
    # The set is built once, before any continuation text is appended.
    matched_ex_ys = {row.y_position for row in vocab_rows if row.example}

    for ex_line in ex_lines:
        ex_y = _center_y(ex_line)

        # Check if already matched
        if any(abs(ex_y - y) < y_tolerance_px for y in matched_ex_ys):
            continue

        # Find nearest previous vocab row
        best_row = None
        best_dist = float('inf')
        for row in vocab_rows:
            dist = ex_y - row.y_position
            if 0 < dist < y_tolerance_px * 3 and dist < best_dist:
                best_dist = dist
                best_row = row

        if best_row:
            continuation = _joined(ex_line).strip()
            if continuation:
                best_row.example = (best_row.example + " " + continuation).strip()

    # Sort by Y position
    vocab_rows.sort(key=lambda r: r.y_position)
    return vocab_rows
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 8: Optional LLM Post-Correction
|
|
# =============================================================================
|
|
|
|
async def llm_post_correct(img: np.ndarray, vocab_rows: List[VocabRow],
                           confidence_threshold: float = 50.0,
                           enabled: bool = False) -> List[VocabRow]:
    """Placeholder for LLM-based correction of low-confidence rows.

    The correction itself is not implemented yet: the rows are always
    returned unchanged. When ``enabled`` is true, a log line records that
    the step was skipped.

    Args:
        img: Original BGR image.
        vocab_rows: Current vocabulary rows.
        confidence_threshold: Rows below this are meant to get LLM correction
            (currently unused).
        enabled: Whether to run the correction step.

    Returns:
        The vocabulary rows passed in.
    """
    if enabled:
        # TODO: Implement Qwen-VL correction for low-confidence entries
        # For each row with confidence < threshold:
        #   1. Crop the relevant region from img
        #   2. Send crop + OCR text to Qwen-VL
        #   3. Replace text if LLM provides a confident correction
        logger.info("LLM post-correction skipped (not yet implemented)")

    return vocab_rows
|
|
|
|
|
|
# =============================================================================
|
|
# Orchestrator
|
|
# =============================================================================
|
|
|
|
async def run_cv_pipeline(
    pdf_data: Optional[bytes] = None,
    image_data: Optional[bytes] = None,
    page_number: int = 0,
    zoom: float = 3.0,
    enable_dewarp: bool = True,
    enable_llm_correction: bool = False,
    lang: str = "eng+deu",
) -> PipelineResult:
    """Run the complete CV document reconstruction pipeline.

    Stages run in order: render, deskew, optional dewarp, image preparation,
    layout analysis, multi-pass OCR, line alignment, optional LLM correction.
    Per-stage durations are stored in ``result.stages``.

    Args:
        pdf_data: Raw PDF bytes (takes precedence over image_data).
        image_data: Raw image bytes (used when pdf_data is falsy).
        page_number: 0-indexed page number (for PDF).
        zoom: PDF rendering zoom factor.
        enable_dewarp: Whether to run dewarp stage.
        enable_llm_correction: Whether to run LLM post-correction.
        lang: Tesseract language string.

    Returns:
        PipelineResult with vocabulary and timing info. Any exception raised
        by a stage is caught, logged, and reported via ``result.error``
        instead of propagating.
    """
    if not CV_PIPELINE_AVAILABLE:
        return PipelineResult(error="CV pipeline not available (OpenCV or Tesseract missing)")

    result = PipelineResult()
    total_start = time.time()

    try:
        # Stage 1: Render
        t = time.time()
        if pdf_data:
            img = render_pdf_high_res(pdf_data, page_number, zoom)
        elif image_data:
            img = render_image_high_res(image_data)
        else:
            return PipelineResult(error="No input data (pdf_data or image_data required)")
        result.stages['render'] = round(time.time() - t, 2)
        result.image_width = img.shape[1]
        result.image_height = img.shape[0]
        logger.info(f"Stage 1 (render): {img.shape[1]}x{img.shape[0]} in {result.stages['render']}s")

        # Stage 2: Deskew
        t = time.time()
        img, angle = deskew_image(img)
        result.stages['deskew'] = round(time.time() - t, 2)
        logger.info(f"Stage 2 (deskew): {angle:.2f}° in {result.stages['deskew']}s")

        # Stage 3: Dewarp
        if enable_dewarp:
            t = time.time()
            img = dewarp_image(img)
            result.stages['dewarp'] = round(time.time() - t, 2)

        # Stage 4: Dual image preparation
        t = time.time()
        ocr_img = create_ocr_image(img)
        layout_img = create_layout_image(img)
        result.stages['image_prep'] = round(time.time() - t, 2)

        # Stage 5: Layout analysis
        t = time.time()
        regions = analyze_layout(layout_img, ocr_img)
        result.stages['layout'] = round(time.time() - t, 2)
        result.columns_detected = sum(1 for r in regions if r.type.startswith('column'))
        logger.info(f"Stage 5 (layout): {result.columns_detected} columns in {result.stages['layout']}s")

        # Stage 6: Multi-pass OCR
        t = time.time()
        ocr_results = run_multi_pass_ocr(ocr_img, regions, lang)
        result.stages['ocr'] = round(time.time() - t, 2)
        total_words = sum(len(w) for w in ocr_results.values())
        result.word_count = total_words
        logger.info(f"Stage 6 (OCR): {total_words} words in {result.stages['ocr']}s")

        # Stage 7: Line alignment
        t = time.time()
        vocab_rows = match_lines_to_vocab(ocr_results, regions)
        result.stages['alignment'] = round(time.time() - t, 2)

        # Stage 8: Optional LLM correction
        if enable_llm_correction:
            t = time.time()
            # llm_post_correct defaults to enabled=False; without passing the
            # flag explicitly this stage could never do anything.
            vocab_rows = await llm_post_correct(img, vocab_rows, enabled=True)
            result.stages['llm_correction'] = round(time.time() - t, 2)

        # Convert to output format
        result.vocabulary = [
            {
                "english": row.english,
                "german": row.german,
                "example": row.example,
                "confidence": round(row.confidence, 1),
            }
            for row in vocab_rows
            if row.english or row.german  # Skip empty rows
        ]

        result.duration_seconds = round(time.time() - total_start, 2)
        logger.info(f"CV Pipeline complete: {len(result.vocabulary)} entries in {result.duration_seconds}s")

    except Exception as e:
        # Top-level boundary: report the failure on the result object rather
        # than raising to the caller.
        logger.error(f"CV Pipeline error: {e}")
        import traceback
        logger.debug(traceback.format_exc())
        result.error = str(e)
        result.duration_seconds = round(time.time() - total_start, 2)

    return result
|