- Reduce left-side threshold from 35% to 20% of content width
- Strong language signal (eng/deu > 0.3) now prevents page_ref assignment
- Increase column_ignore word threshold from 3 to 8 for edge columns
- Apply language guard to Level 1 and Level 2 classification

Fixes: a column with deu=0.921 was misclassified as page_ref because the
reference-score check ran before language analysis.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2194 lines
79 KiB
Python
2194 lines
79 KiB
Python
"""
|
|
CV-based Document Reconstruction Pipeline for Vocabulary Extraction.
|
|
|
|
Uses classical Computer Vision techniques for high-quality OCR:
|
|
- High-resolution PDF rendering (432 DPI)
|
|
- Deskew (rotation correction via Hough Lines)
|
|
- Dewarp (book curvature correction) — pass-through initially
|
|
- Dual image preparation (binarized for OCR, CLAHE for layout)
|
|
- Projection-profile layout analysis (column/row detection)
|
|
- Multi-pass Tesseract OCR with region-specific PSM settings
|
|
- Y-coordinate line alignment for vocabulary matching
|
|
- Optional LLM post-correction for low-confidence regions
|
|
|
|
Lizenz: Apache 2.0 (kommerziell nutzbar)
|
|
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
|
|
"""
|
|
|
|
import io
|
|
import logging
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
|
|
import numpy as np
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# --- Availability Guards ---

# OpenCV powers every image-processing stage; without it the whole
# pipeline is disabled (see CV_PIPELINE_AVAILABLE below).
try:
    import cv2
    CV2_AVAILABLE = True
except ImportError:
    cv2 = None
    CV2_AVAILABLE = False
    logger.warning("OpenCV not available — CV pipeline disabled")

# pytesseract drives the OCR passes; Pillow bridges numpy arrays to it.
# Both must import together for OCR to be usable.
try:
    import pytesseract
    from PIL import Image
    TESSERACT_AVAILABLE = True
except ImportError:
    pytesseract = None
    Image = None
    TESSERACT_AVAILABLE = False
    logger.warning("pytesseract/Pillow not available — CV pipeline disabled")

# Master switch: the pipeline runs only when both dependency groups import.
CV_PIPELINE_AVAILABLE = CV2_AVAILABLE and TESSERACT_AVAILABLE
|
|
|
|
|
|
# --- Language Detection Constants ---

# High-frequency German function words, presumably used to score whether a
# text region is German (consumed by later classification stages — verify
# against the column classifier). Umlauts are transliterated ('fuer',
# 'ueber') to match OCR output that may lack diacritics.
GERMAN_FUNCTION_WORDS = {'der', 'die', 'das', 'und', 'ist', 'ein', 'eine', 'nicht',
    'von', 'zu', 'mit', 'auf', 'fuer', 'den', 'dem', 'sich', 'auch', 'wird',
    'nach', 'bei', 'aus', 'wie', 'oder', 'wenn', 'noch', 'aber', 'hat', 'nur',
    'ueber', 'kann', 'als', 'ich', 'er', 'sie', 'es', 'wir', 'ihr', 'haben',
    'sein', 'werden', 'war', 'sind', 'muss', 'soll', 'dieser', 'diese', 'diesem'}

# High-frequency English function words, counterpart of the German set above.
ENGLISH_FUNCTION_WORDS = {'the', 'a', 'an', 'is', 'are', 'was', 'were', 'to', 'of',
    'and', 'in', 'that', 'it', 'for', 'on', 'with', 'as', 'at', 'by', 'from',
    'or', 'but', 'not', 'be', 'have', 'has', 'had', 'do', 'does', 'did', 'will',
    'would', 'can', 'could', 'should', 'may', 'might', 'this', 'they', 'you', 'he',
    'she', 'we', 'my', 'your', 'his', 'her', 'its', 'our', 'their', 'which'}
|
|
|
|
|
|
# --- Data Classes ---
|
|
|
|
@dataclass
class PageRegion:
    """A detected region on the page."""
    # Region category: 'column_en', 'column_de', 'column_example', 'page_ref',
    # 'column_marker', 'column_text', 'header', 'footer'
    type: str
    # Bounding box in absolute image pixel coordinates.
    x: int
    y: int
    width: int
    height: int
    # Confidence of the type classification, 0.0-1.0.
    classification_confidence: float = 1.0
    # How the type was assigned: 'content', 'position_enhanced', 'position_fallback'.
    classification_method: str = ""
|
|
|
|
|
|
@dataclass
class ColumnGeometry:
    """A geometrically detected column, before type classification."""
    index: int  # 0-based, left -> right
    # Bounding box in absolute image pixel coordinates.
    x: int
    y: int
    width: int
    height: int
    word_count: int  # number of words attributed to this column
    words: List[Dict]  # word dicts from Tesseract (text, conf, left, top, ...)
    width_ratio: float  # width / content_width (0.0-1.0)
|
|
|
|
|
|
@dataclass
class VocabRow:
    """A single vocabulary entry assembled from multi-column OCR."""
    english: str = ""  # text from the English column
    german: str = ""   # text from the German column
    example: str = ""  # text from the example-sentence column
    confidence: float = 0.0  # OCR confidence for the row, 0.0-1.0
    y_position: int = 0      # vertical pixel position used for line alignment
|
|
|
|
|
|
@dataclass
class PipelineResult:
    """Complete result of the CV pipeline."""
    vocabulary: List[Dict[str, Any]] = field(default_factory=list)  # extracted entries
    word_count: int = 0        # total recognized words
    columns_detected: int = 0  # number of columns found by layout analysis
    duration_seconds: float = 0.0  # wall-clock time for the whole run
    stages: Dict[str, float] = field(default_factory=dict)  # per-stage timings
    error: Optional[str] = None  # error message when the pipeline failed
    # Dimensions of the rendered page image in pixels.
    image_width: int = 0
    image_height: int = 0
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 1: High-Resolution PDF Rendering
|
|
# =============================================================================
|
|
|
|
def render_pdf_high_res(pdf_data: bytes, page_number: int = 0, zoom: float = 3.0) -> np.ndarray:
    """Render a PDF page to a high-resolution numpy array (BGR).

    Args:
        pdf_data: Raw PDF bytes.
        page_number: 0-indexed page number.
        zoom: Zoom factor (3.0 = 432 DPI).

    Returns:
        numpy array in BGR format.

    Raises:
        ValueError: If page_number is out of range for the document.
    """
    import fitz  # PyMuPDF

    pdf_doc = fitz.open(stream=pdf_data, filetype="pdf")
    # try/finally so the document handle is released even when the page
    # index is invalid (previously the handle leaked on that error path).
    try:
        if page_number >= pdf_doc.page_count:
            raise ValueError(f"Page {page_number} does not exist (PDF has {pdf_doc.page_count} pages)")

        page = pdf_doc[page_number]
        mat = fitz.Matrix(zoom, zoom)
        pix = page.get_pixmap(matrix=mat)

        # Convert the raw pixmap samples to a numpy BGR image.
        img_data = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.h, pix.w, pix.n)
        if pix.n == 4:  # RGBA
            img_bgr = cv2.cvtColor(img_data, cv2.COLOR_RGBA2BGR)
        elif pix.n == 3:  # RGB
            img_bgr = cv2.cvtColor(img_data, cv2.COLOR_RGB2BGR)
        else:  # Grayscale
            img_bgr = cv2.cvtColor(img_data, cv2.COLOR_GRAY2BGR)
    finally:
        pdf_doc.close()

    return img_bgr
|
|
|
|
|
|
def render_image_high_res(image_data: bytes) -> np.ndarray:
    """Decode raw PNG/JPEG bytes into a BGR numpy array.

    Args:
        image_data: Raw image bytes.

    Returns:
        numpy array in BGR format.

    Raises:
        ValueError: If the bytes cannot be decoded as an image.
    """
    buffer = np.frombuffer(image_data, dtype=np.uint8)
    decoded = cv2.imdecode(buffer, cv2.IMREAD_COLOR)
    if decoded is None:
        raise ValueError("Could not decode image data")
    return decoded
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 2: Deskew (Rotation Correction)
|
|
# =============================================================================
|
|
|
|
def deskew_image(img: np.ndarray) -> Tuple[np.ndarray, float]:
    """Correct page rotation using probabilistic Hough line detection.

    Finds long, near-horizontal line segments, takes the median of their
    angles, clamps the correction to ±5° and rotates the image around its
    center. Returns the input untouched when too few segments are found
    or the detected angle is negligible (< 0.1°).

    Args:
        img: BGR image.

    Returns:
        Tuple of (corrected image, detected angle in degrees).
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Inverted Otsu binarization feeds the line detector.
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)

    segments = cv2.HoughLinesP(binary, 1, np.pi / 180, threshold=100,
                               minLineLength=img.shape[1] // 4, maxLineGap=20)
    if segments is None or len(segments) < 3:
        return img, 0.0

    # Collect angles of near-horizontal segments only (|θ| < 15°).
    angles = []
    for seg in segments:
        ax, ay, bx, by = seg[0]
        theta = np.degrees(np.arctan2(by - ay, bx - ax))
        if abs(theta) < 15:
            angles.append(theta)

    if not angles:
        return img, 0.0

    median_angle = float(np.median(angles))

    # Never correct by more than ±5°.
    if abs(median_angle) > 5.0:
        median_angle = 5.0 * np.sign(median_angle)

    # Below 0.1° the rotation is not worth the interpolation cost.
    if abs(median_angle) < 0.1:
        return img, 0.0

    rows, cols = img.shape[:2]
    rotation = cv2.getRotationMatrix2D((cols // 2, rows // 2), median_angle, 1.0)
    corrected = cv2.warpAffine(img, rotation, (cols, rows),
                               flags=cv2.INTER_LINEAR,
                               borderMode=cv2.BORDER_REPLICATE)

    logger.info(f"Deskew: corrected {median_angle:.2f}° rotation")
    return corrected, median_angle
|
|
|
|
|
|
def deskew_image_by_word_alignment(
    image_data: bytes,
    lang: str = "eng+deu",
    downscale_factor: float = 0.5,
) -> Tuple[bytes, float]:
    """Correct rotation by fitting a line through left-most word starts per text line.

    More robust than Hough-based deskew for vocabulary worksheets where text lines
    have consistent left-alignment. Runs a quick Tesseract pass on a downscaled
    copy to find word positions, computes the dominant left-edge column, fits a
    line through those points and rotates the full-resolution image.

    On any failure (missing dependencies, undecodable image, Tesseract error,
    too few aligned points) the original bytes are returned with angle 0.0.

    Args:
        image_data: Raw image bytes (PNG/JPEG).
        lang: Tesseract language string for the quick pass.
        downscale_factor: Shrink factor for the quick Tesseract pass (0.5 = 50%).

    Returns:
        Tuple of (rotated image as PNG bytes, detected angle in degrees).
    """
    if not CV2_AVAILABLE or not TESSERACT_AVAILABLE:
        return image_data, 0.0

    # 1. Decode image
    img_array = np.frombuffer(image_data, dtype=np.uint8)
    img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
    if img is None:
        logger.warning("deskew_by_word_alignment: could not decode image")
        return image_data, 0.0

    orig_h, orig_w = img.shape[:2]

    # 2. Downscale for fast Tesseract pass
    small_w = int(orig_w * downscale_factor)
    small_h = int(orig_h * downscale_factor)
    small = cv2.resize(img, (small_w, small_h), interpolation=cv2.INTER_AREA)

    # 3. Quick Tesseract — word-level positions (PSM 6 = uniform text block)
    pil_small = Image.fromarray(cv2.cvtColor(small, cv2.COLOR_BGR2RGB))
    try:
        data = pytesseract.image_to_data(
            pil_small, lang=lang, config="--psm 6 --oem 3",
            output_type=pytesseract.Output.DICT,
        )
    except Exception as e:
        logger.warning(f"deskew_by_word_alignment: Tesseract failed: {e}")
        return image_data, 0.0

    # 4. Per text-line, find the left-most word start.
    # Group word indices by (block_num, par_num, line_num); words with
    # empty text or confidence < 20 are discarded as noise.
    from collections import defaultdict
    line_groups: Dict[tuple, list] = defaultdict(list)
    for i in range(len(data["text"])):
        text = (data["text"][i] or "").strip()
        conf = int(data["conf"][i])
        if not text or conf < 20:
            continue
        key = (data["block_num"][i], data["par_num"][i], data["line_num"][i])
        line_groups[key].append(i)

    # Need enough lines for a meaningful fit.
    if len(line_groups) < 5:
        logger.info(f"deskew_by_word_alignment: only {len(line_groups)} lines, skipping")
        return image_data, 0.0

    # For each line, pick the word with smallest 'left' → compute (left_x, center_y)
    # Scale back to original resolution
    scale = 1.0 / downscale_factor
    points = []  # list of (x, y) in original-image coords
    for key, indices in line_groups.items():
        best_idx = min(indices, key=lambda i: data["left"][i])
        lx = data["left"][best_idx] * scale
        top = data["top"][best_idx] * scale
        h = data["height"][best_idx] * scale
        cy = top + h / 2.0
        points.append((lx, cy))

    # 5. Find dominant left-edge column + compute angle.
    # Only points near the median x survive — they belong to the dominant
    # left-aligned column; indented or stray lines are filtered out.
    xs = np.array([p[0] for p in points])
    ys = np.array([p[1] for p in points])
    median_x = float(np.median(xs))
    tolerance = orig_w * 0.03  # 3% of image width

    mask = np.abs(xs - median_x) <= tolerance
    filtered_xs = xs[mask]
    filtered_ys = ys[mask]

    if len(filtered_xs) < 5:
        logger.info(f"deskew_by_word_alignment: only {len(filtered_xs)} aligned points after filter, skipping")
        return image_data, 0.0

    # polyfit: x = a*y + b → a = dx/dy → angle = arctan(a)
    coeffs = np.polyfit(filtered_ys, filtered_xs, 1)
    slope = coeffs[0]  # dx/dy
    angle_rad = np.arctan(slope)
    angle_deg = float(np.degrees(angle_rad))

    # Clamp to ±5° — larger angles are assumed to be fit errors.
    angle_deg = max(-5.0, min(5.0, angle_deg))

    logger.info(f"deskew_by_word_alignment: detected {angle_deg:.2f}° from {len(filtered_xs)} points "
                f"(total lines: {len(line_groups)})")

    # Below 0.05° the correction is not worth re-encoding the image.
    if abs(angle_deg) < 0.05:
        return image_data, 0.0

    # 6. Rotate full-res image around its center.
    center = (orig_w // 2, orig_h // 2)
    M = cv2.getRotationMatrix2D(center, angle_deg, 1.0)
    rotated = cv2.warpAffine(img, M, (orig_w, orig_h),
                             flags=cv2.INTER_LINEAR,
                             borderMode=cv2.BORDER_REPLICATE)

    # Encode back to PNG
    success, png_buf = cv2.imencode(".png", rotated)
    if not success:
        logger.warning("deskew_by_word_alignment: PNG encoding failed")
        return image_data, 0.0

    return png_buf.tobytes(), angle_deg
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 3: Dewarp (Book Curvature Correction)
|
|
# =============================================================================
|
|
|
|
def _detect_shear_angle(img: np.ndarray) -> Dict[str, Any]:
    """Detect the vertical shear angle of the page.

    After deskew (horizontal lines aligned), vertical features like column
    edges may still be tilted. This measures that tilt by tracking the
    strongest vertical edge across horizontal strips.

    The result is a shear angle in degrees: the angular difference between
    true vertical and the detected column edge.

    Args:
        img: BGR image (already deskewed).

    Returns:
        Dict with keys: method, shear_degrees, confidence. shear_degrees
        and confidence stay 0.0 when too few edge samples are found.
    """
    h, w = img.shape[:2]
    result = {"method": "vertical_edge", "shear_degrees": 0.0, "confidence": 0.0}

    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Vertical Sobel to find vertical edges
    sobel_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
    # FIX: clip before the uint8 cast. Sobel magnitudes regularly exceed
    # 255; a bare astype(np.uint8) wraps modulo 256, turning strong edges
    # into arbitrary low values and corrupting the Otsu binarization below.
    abs_sobel = np.clip(np.abs(sobel_x), 0, 255).astype(np.uint8)

    # Binarize with Otsu
    _, binary = cv2.threshold(abs_sobel, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    # Track the strongest vertical edge in each of 20 horizontal strips.
    num_strips = 20
    strip_h = h // num_strips
    edge_positions = []  # (y_center, x_position)

    for i in range(num_strips):
        y_start = i * strip_h
        y_end = min((i + 1) * strip_h, h)
        strip = binary[y_start:y_end, :]

        # Project vertically (sum along y-axis)
        projection = np.sum(strip, axis=0).astype(np.float64)
        if projection.max() == 0:
            continue

        # Find the strongest vertical edge in left 40% of image
        search_w = int(w * 0.4)
        left_proj = projection[:search_w]
        if left_proj.max() == 0:
            continue

        # Smooth the projection and take the peak as the edge x-position.
        kernel_size = max(3, w // 100)
        if kernel_size % 2 == 0:
            kernel_size += 1
        smoothed = cv2.GaussianBlur(left_proj.reshape(1, -1), (kernel_size, 1), 0).flatten()
        x_pos = float(np.argmax(smoothed))
        y_center = (y_start + y_end) / 2.0
        edge_positions.append((y_center, x_pos))

    # Require a healthy number of strips with a detectable edge.
    if len(edge_positions) < 8:
        return result

    ys = np.array([p[0] for p in edge_positions])
    xs = np.array([p[1] for p in edge_positions])

    # Remove outliers (> 2 std from median)
    median_x = np.median(xs)
    std_x = max(np.std(xs), 1.0)
    mask = np.abs(xs - median_x) < 2 * std_x
    ys = ys[mask]
    xs = xs[mask]

    if len(ys) < 6:
        return result

    # Fit straight line: x = slope * y + intercept.
    # The slope tells us the tilt of the vertical edge.
    straight_coeffs = np.polyfit(ys, xs, 1)
    slope = straight_coeffs[0]  # dx/dy in pixels
    fitted = np.polyval(straight_coeffs, ys)
    residuals = xs - fitted
    rmse = float(np.sqrt(np.mean(residuals ** 2)))

    # Convert slope to angle: arctan(dx/dy) in degrees
    import math
    shear_degrees = math.degrees(math.atan(slope))

    # More inliers and a tighter fit raise the confidence (capped at 1.0).
    confidence = min(1.0, len(ys) / 15.0) * max(0.5, 1.0 - rmse / 5.0)

    result["shear_degrees"] = round(shear_degrees, 3)
    result["confidence"] = round(float(confidence), 2)

    return result
|
|
|
|
|
|
def _apply_shear(img: np.ndarray, shear_degrees: float) -> np.ndarray:
    """Shear the image horizontally around its vertical center.

    Each row is shifted by tan(shear) * (y - h/2) pixels, which tilts
    vertical features (columns) without disturbing horizontal text lines.

    Args:
        img: BGR image.
        shear_degrees: Shear angle in degrees. Positive = shift top-right/bottom-left.

    Returns:
        Sheared image, same size as the input.
    """
    import math

    rows, cols = img.shape[:2]
    t = math.tan(math.radians(shear_degrees))

    # x' = x + t*y - t*h/2 → zero displacement at the vertical center.
    shear_matrix = np.float32([
        [1, t, -rows / 2.0 * t],
        [0, 1, 0],
    ])

    return cv2.warpAffine(img, shear_matrix, (cols, rows),
                          flags=cv2.INTER_LINEAR,
                          borderMode=cv2.BORDER_REPLICATE)
|
|
|
|
|
|
def dewarp_image(img: np.ndarray) -> Tuple[np.ndarray, Dict[str, Any]]:
    """Correct vertical shear after deskew.

    Detects the tilt of the strongest vertical edge (typically a column
    boundary) and, when the detection is both significant and confident,
    applies the inverse shear to straighten it.

    Args:
        img: BGR image (already deskewed).

    Returns:
        Tuple of (corrected_image, dewarp_info).
        dewarp_info keys: method, shear_degrees, confidence.
    """
    unchanged = {
        "method": "none",
        "shear_degrees": 0.0,
        "confidence": 0.0,
    }

    if not CV2_AVAILABLE:
        return img, unchanged

    started = time.time()
    detection = _detect_shear_angle(img)
    duration = time.time() - started

    shear_deg = detection["shear_degrees"]
    confidence = detection["confidence"]

    logger.info(f"dewarp: detected shear={shear_deg:.3f}° "
                f"conf={confidence:.2f} ({duration:.2f}s)")

    # Skip tiny (< 0.05°) or low-confidence (< 0.3) detections.
    if abs(shear_deg) < 0.05 or confidence < 0.3:
        return img, unchanged

    # Negate the detected shear to straighten the page.
    straightened = _apply_shear(img, -shear_deg)

    return straightened, {
        "method": detection["method"],
        "shear_degrees": shear_deg,
        "confidence": confidence,
    }
|
|
|
|
|
|
def dewarp_image_manual(img: np.ndarray, shear_degrees: float) -> np.ndarray:
    """Apply a caller-supplied shear correction.

    Args:
        img: BGR image (deskewed, before dewarp).
        shear_degrees: Shear angle in degrees to correct.

    Returns:
        Corrected image (the input unchanged for negligible angles).
    """
    # Angles below a thousandth of a degree are visually irrelevant.
    if abs(shear_degrees) >= 0.001:
        return _apply_shear(img, -shear_degrees)
    return img
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 4: Dual Image Preparation
|
|
# =============================================================================
|
|
|
|
def create_ocr_image(img: np.ndarray) -> np.ndarray:
    """Produce a clean binary image tuned for Tesseract OCR.

    Pipeline: grayscale → illumination flattening (divide by a heavy
    Gaussian blur of the background) → Gaussian adaptive threshold →
    3x3 median denoise.

    Args:
        img: BGR image.

    Returns:
        Binarized image (dark glyphs on a white background).
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Flatten uneven illumination: divide each pixel by the local background.
    background = cv2.GaussianBlur(gray, (51, 51), 0)
    flattened = cv2.divide(gray, background, scale=255)

    # Local Gaussian threshold copes with residual shading.
    thresholded = cv2.adaptiveThreshold(
        flattened, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY, 31, 10
    )

    # Remove salt-and-pepper speckle without eroding glyph strokes.
    return cv2.medianBlur(thresholded, 3)
|
|
|
|
|
|
def create_layout_image(img: np.ndarray) -> np.ndarray:
    """Return a CLAHE-enhanced grayscale copy for layout analysis.

    Args:
        img: BGR image.

    Returns:
        Contrast-enhanced grayscale image.
    """
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    return equalizer.apply(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 5: Layout Analysis (Projection Profiles)
|
|
# =============================================================================
|
|
|
|
def _find_content_bounds(inv: np.ndarray) -> Tuple[int, int, int, int]:
|
|
"""Find the bounding box of actual text content (excluding page margins).
|
|
|
|
Returns:
|
|
Tuple of (left_x, right_x, top_y, bottom_y).
|
|
"""
|
|
h, w = inv.shape[:2]
|
|
|
|
# Horizontal projection for top/bottom
|
|
h_proj = np.sum(inv, axis=1).astype(float) / (w * 255)
|
|
|
|
top_y = 0
|
|
for y in range(h):
|
|
if h_proj[y] > 0.005:
|
|
top_y = max(0, y - 5)
|
|
break
|
|
|
|
bottom_y = h
|
|
for y in range(h - 1, 0, -1):
|
|
if h_proj[y] > 0.005:
|
|
bottom_y = min(h, y + 5)
|
|
break
|
|
|
|
# Vertical projection for left/right margins
|
|
v_proj = np.sum(inv[top_y:bottom_y, :], axis=0).astype(float)
|
|
v_proj_norm = v_proj / ((bottom_y - top_y) * 255) if (bottom_y - top_y) > 0 else v_proj
|
|
|
|
left_x = 0
|
|
for x in range(w):
|
|
if v_proj_norm[x] > 0.005:
|
|
left_x = max(0, x - 2)
|
|
break
|
|
|
|
right_x = w
|
|
for x in range(w - 1, 0, -1):
|
|
if v_proj_norm[x] > 0.005:
|
|
right_x = min(w, x + 2)
|
|
break
|
|
|
|
return left_x, right_x, top_y, bottom_y
|
|
|
|
|
|
def analyze_layout(layout_img: np.ndarray, ocr_img: np.ndarray) -> List[PageRegion]:
    """Detect columns, header, and footer using projection profiles.

    Uses content-bounds detection to exclude page margins before searching
    for column separators within the actual text area. Two strategies are
    tried: a global valley threshold on the smoothed vertical projection,
    then (if fewer than 2 valleys survive) a local-minima segment scan.

    Args:
        layout_img: CLAHE-enhanced grayscale image.
        ocr_img: Binarized image for text density analysis.

    Returns:
        List of PageRegion objects describing detected regions
        (2-3 columns, plus optional header/footer).
    """
    h, w = ocr_img.shape[:2]

    # Invert: black text on white → white text on black for projection
    inv = cv2.bitwise_not(ocr_img)

    # --- Find actual content bounds (exclude page margins) ---
    left_x, right_x, top_y, bottom_y = _find_content_bounds(inv)
    content_w = right_x - left_x
    content_h = bottom_y - top_y

    logger.info(f"Layout: content bounds x=[{left_x}..{right_x}] ({content_w}px), "
                f"y=[{top_y}..{bottom_y}] ({content_h}px) in {w}x{h} image")

    if content_w < w * 0.3 or content_h < h * 0.3:
        # Fallback if detection seems wrong: use the full page.
        left_x, right_x = 0, w
        top_y, bottom_y = 0, h
        content_w, content_h = w, h

    # --- Vertical projection within content area to find column separators ---
    content_strip = inv[top_y:bottom_y, left_x:right_x]
    v_proj = np.sum(content_strip, axis=0).astype(float)
    v_proj_norm = v_proj / (content_h * 255) if content_h > 0 else v_proj

    # Smooth the projection profile (box filter, ~2% of content width, odd size).
    kernel_size = max(5, content_w // 50)
    if kernel_size % 2 == 0:
        kernel_size += 1
    v_proj_smooth = np.convolve(v_proj_norm, np.ones(kernel_size) / kernel_size, mode='same')

    # Debug: log projection profile statistics
    p_mean = float(np.mean(v_proj_smooth))
    p_median = float(np.median(v_proj_smooth))
    p_min = float(np.min(v_proj_smooth))
    p_max = float(np.max(v_proj_smooth))
    logger.info(f"Layout: v_proj stats — min={p_min:.4f}, max={p_max:.4f}, "
                f"mean={p_mean:.4f}, median={p_median:.4f}")

    # Find valleys using multiple threshold strategies
    # Strategy 1: relative to median (catches clear separators)
    # Strategy 2: local minima approach (catches subtle gaps)
    threshold = max(p_median * 0.3, p_mean * 0.2)
    logger.info(f"Layout: valley threshold={threshold:.4f}")

    in_valley = v_proj_smooth < threshold

    # Find contiguous valley regions.
    # Each valley tuple is (start, end, center, width, depth).
    all_valleys = []
    start = None
    for x in range(len(v_proj_smooth)):
        if in_valley[x] and start is None:
            start = x
        elif not in_valley[x] and start is not None:
            valley_width = x - start
            valley_depth = float(np.min(v_proj_smooth[start:x]))
            # Valley must be at least 3px wide
            if valley_width >= 3:
                all_valleys.append((start, x, (start + x) // 2, valley_width, valley_depth))
            start = None

    logger.info(f"Layout: raw valleys (before filter): {len(all_valleys)} — "
                f"{[(v[0]+left_x, v[1]+left_x, v[3], f'{v[4]:.4f}') for v in all_valleys[:10]]}")

    # Filter: valleys must be inside the content area (not at edges)
    inner_margin = int(content_w * 0.08)
    valleys = [v for v in all_valleys if inner_margin < v[2] < content_w - inner_margin]

    # If no valleys found with strict threshold, try local minima approach
    if len(valleys) < 2:
        logger.info("Layout: trying local minima approach for column detection")
        # Divide content into 20 segments, find the 2 lowest
        seg_count = 20
        seg_width = content_w // seg_count
        seg_scores = []
        for i in range(seg_count):
            sx = i * seg_width
            ex = min((i + 1) * seg_width, content_w)
            seg_mean = float(np.mean(v_proj_smooth[sx:ex]))
            seg_scores.append((i, sx, ex, seg_mean))

        # Sort ascending by mean ink density — lowest segments first.
        seg_scores.sort(key=lambda s: s[3])
        logger.info(f"Layout: segment scores (lowest 5): "
                    f"{[(s[0], s[1]+left_x, s[2]+left_x, f'{s[3]:.4f}') for s in seg_scores[:5]]}")

        # Find two lowest non-adjacent segments that create reasonable columns
        candidate_valleys = []
        for seg_idx, sx, ex, seg_mean in seg_scores:
            # Must not be at the edges (first/last two segments)
            if seg_idx <= 1 or seg_idx >= seg_count - 2:
                continue
            # Must be significantly lower than overall mean
            if seg_mean < p_mean * 0.6:
                center = (sx + ex) // 2
                candidate_valleys.append((sx, ex, center, ex - sx, seg_mean))

        if len(candidate_valleys) >= 2:
            # Pick the best pair: non-adjacent, creating reasonable column widths
            candidate_valleys.sort(key=lambda v: v[2])
            best_pair = None
            best_score = float('inf')
            for i in range(len(candidate_valleys)):
                for j in range(i + 1, len(candidate_valleys)):
                    c1 = candidate_valleys[i][2]
                    c2 = candidate_valleys[j][2]
                    # Separators must be at least 20% of content width apart
                    if (c2 - c1) < content_w * 0.2:
                        continue
                    col1 = c1
                    col2 = c2 - c1
                    col3 = content_w - c2
                    # Each resulting column must be at least 12% of content width
                    if col1 < content_w * 0.12 or col2 < content_w * 0.12 or col3 < content_w * 0.12:
                        continue
                    # Score = spread between widest and narrowest column;
                    # lower is better (more even 3-way split).
                    parts = sorted([col1, col2, col3])
                    score = parts[2] - parts[0]
                    if score < best_score:
                        best_score = score
                        best_pair = (candidate_valleys[i], candidate_valleys[j])

            if best_pair:
                valleys = list(best_pair)
                logger.info(f"Layout: local minima found 2 valleys: "
                            f"{[(v[0]+left_x, v[1]+left_x, v[3]) for v in valleys]}")

    logger.info(f"Layout: final {len(valleys)} valleys: "
                f"{[(v[0]+left_x, v[1]+left_x, v[3]) for v in valleys]}")

    regions = []

    if len(valleys) >= 2:
        # 3-column layout detected
        valleys.sort(key=lambda v: v[2])

        if len(valleys) == 2:
            sep1_center = valleys[0][2]
            sep2_center = valleys[1][2]
        else:
            # Pick the two valleys that best divide into 3 parts
            # Prefer wider valleys (more likely true separators)
            best_pair = None
            best_score = float('inf')
            for i in range(len(valleys)):
                for j in range(i + 1, len(valleys)):
                    c1, c2 = valleys[i][2], valleys[j][2]
                    # Each column should be at least 15% of content width
                    col1 = c1
                    col2 = c2 - c1
                    col3 = content_w - c2
                    if col1 < content_w * 0.15 or col2 < content_w * 0.15 or col3 < content_w * 0.15:
                        continue
                    # Score: lower is better (more even distribution)
                    parts = sorted([col1, col2, col3])
                    score = parts[2] - parts[0]
                    # Bonus for wider valleys (subtract valley width)
                    score -= (valleys[i][3] + valleys[j][3]) * 0.5
                    if score < best_score:
                        best_score = score
                        best_pair = (c1, c2)
            if best_pair:
                sep1_center, sep2_center = best_pair
            else:
                # No valid pair — fall back to the two left-most valleys.
                sep1_center = valleys[0][2]
                sep2_center = valleys[1][2]

        # Convert from content-relative to absolute coordinates
        abs_sep1 = sep1_center + left_x
        abs_sep2 = sep2_center + left_x

        logger.info(f"Layout: 3 columns at separators x={abs_sep1}, x={abs_sep2} "
                    f"(widths: {abs_sep1}, {abs_sep2-abs_sep1}, {w-abs_sep2})")

        # Column types are positional defaults; later classification may revise.
        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=abs_sep1, height=content_h
        ))
        regions.append(PageRegion(
            type='column_de', x=abs_sep1, y=top_y,
            width=abs_sep2 - abs_sep1, height=content_h
        ))
        regions.append(PageRegion(
            type='column_example', x=abs_sep2, y=top_y,
            width=w - abs_sep2, height=content_h
        ))

    elif len(valleys) == 1:
        # 2-column layout
        abs_sep = valleys[0][2] + left_x

        logger.info(f"Layout: 2 columns at separator x={abs_sep}")

        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=abs_sep, height=content_h
        ))
        regions.append(PageRegion(
            type='column_de', x=abs_sep, y=top_y,
            width=w - abs_sep, height=content_h
        ))

    else:
        # No columns detected — run full-page OCR as single column
        logger.warning("Layout: no column separators found, using full page")
        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=w, height=content_h
        ))

    # Add header/footer regions when there is more than 10px of margin.
    if top_y > 10:
        regions.append(PageRegion(
            type='header', x=0, y=0,
            width=w, height=top_y
        ))
    if bottom_y < h - 10:
        regions.append(PageRegion(
            type='footer', x=0, y=bottom_y,
            width=w, height=h - bottom_y
        ))

    col_count = len([r for r in regions if r.type.startswith('column')])
    logger.info(f"Layout: {col_count} columns, "
                f"header={'yes' if top_y > 10 else 'no'}, "
                f"footer={'yes' if bottom_y < h - 10 else 'no'}")

    return regions
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 5b: Word-Based Layout Analysis (Two-Phase Column Detection)
|
|
# =============================================================================
|
|
|
|
# --- Phase A: Geometry Detection ---
|
|
|
|
def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Optional[Tuple[List[ColumnGeometry], int, int, int, int]]:
    """Detect column geometry by clustering left-aligned word positions.

    Phase A of the two-phase column detection. Returns untyped column
    geometries with their words for subsequent content-based classification.

    Pipeline: content bounds → Tesseract word boxes → left-edge clustering
    → verticality filter → near-cluster merging → tiny-cluster absorption
    → column boundary derivation with per-column word assignment.

    Args:
        ocr_img: Binarized grayscale image for layout analysis.
        dewarped_bgr: Original BGR image (for Tesseract word detection).

    Returns:
        Tuple of (geometries, left_x, right_x, top_y, bottom_y) or None if
        fewer than 3 clusters are found (signals fallback needed).
    """
    h, w = ocr_img.shape[:2]

    # --- Find content bounds ---
    # Invert so ink becomes white for projection-based bound detection.
    inv = cv2.bitwise_not(ocr_img)
    left_x, right_x, top_y, bottom_y = _find_content_bounds(inv)
    content_w = right_x - left_x
    content_h = bottom_y - top_y

    # Implausibly small content area (< 30% in either dimension) →
    # distrust the detected bounds and fall back to the full page.
    if content_w < w * 0.3 or content_h < h * 0.3:
        left_x, right_x = 0, w
        top_y, bottom_y = 0, h
        content_w, content_h = w, h

    logger.info(f"ColumnGeometry: content bounds x=[{left_x}..{right_x}] ({content_w}px), "
                f"y=[{top_y}..{bottom_y}] ({content_h}px)")

    # --- Get word bounding boxes from Tesseract ---
    content_roi = dewarped_bgr[top_y:bottom_y, left_x:right_x]
    pil_img = Image.fromarray(cv2.cvtColor(content_roi, cv2.COLOR_BGR2RGB))

    try:
        data = pytesseract.image_to_data(pil_img, lang='eng+deu', output_type=pytesseract.Output.DICT)
    except Exception as e:
        logger.warning(f"ColumnGeometry: Tesseract image_to_data failed: {e}")
        return None

    # Collect words with their full info
    word_dicts = []
    left_edges = []
    edge_word_indices = []  # Track which word_dicts index each edge belongs to
    n_words = len(data['text'])
    for i in range(n_words):
        # 'conf' may be a non-integer string; parse defensively, defaulting to -1.
        conf = int(data['conf'][i]) if str(data['conf'][i]).lstrip('-').isdigit() else -1
        text = str(data['text'][i]).strip()
        # Drop low-confidence (< 30) and empty tokens — they are OCR noise.
        if conf < 30 or not text:
            continue
        lx = int(data['left'][i])
        ty = int(data['top'][i])
        bw = int(data['width'][i])
        bh = int(data['height'][i])
        left_edges.append(lx)
        edge_word_indices.append(len(word_dicts))
        word_dicts.append({
            'text': text, 'conf': conf,
            'left': lx, 'top': ty, 'width': bw, 'height': bh,
        })

    if len(left_edges) < 5:
        logger.warning(f"ColumnGeometry: only {len(left_edges)} words detected")
        return None

    logger.info(f"ColumnGeometry: {len(left_edges)} words detected in content area")

    # --- Cluster left edges (tracking word indices per cluster) ---
    # Edges within 'tolerance' px (1% of content width, min 10px) belong
    # to the same left-alignment cluster.
    tolerance = max(10, int(content_w * 0.01))

    # Sort edges while keeping word index association
    sorted_pairs = sorted(zip(left_edges, edge_word_indices), key=lambda p: p[0])

    clusters = []  # list of lists of edge x-values
    cluster_widxs = []  # parallel list of lists of word_dicts indices
    cur_edges = [sorted_pairs[0][0]]
    cur_widxs = [sorted_pairs[0][1]]
    for edge, widx in sorted_pairs[1:]:
        if edge - cur_edges[-1] <= tolerance:
            cur_edges.append(edge)
            cur_widxs.append(widx)
        else:
            clusters.append(cur_edges)
            cluster_widxs.append(cur_widxs)
            cur_edges = [edge]
            cur_widxs = [widx]
    clusters.append(cur_edges)
    cluster_widxs.append(cur_widxs)

    # --- Enrich clusters with Y-span info and apply verticality filter ---
    MIN_Y_COVERAGE_PRIMARY = 0.30  # Primary columns span >= 30% of page height
    MIN_Y_COVERAGE_SECONDARY = 0.15  # Secondary columns span >= 15%
    MIN_WORDS_SECONDARY = 5  # Secondary columns need >= 5 words

    cluster_infos = []
    for c_edges, c_widxs in zip(clusters, cluster_widxs):
        # Singleton clusters (one stray word) are noise — skip them.
        if len(c_edges) < 2:
            continue
        y_positions = [word_dicts[idx]['top'] for idx in c_widxs]
        y_span = max(y_positions) - min(y_positions)
        y_coverage = y_span / content_h if content_h > 0 else 0.0

        cluster_infos.append({
            'mean_x': int(np.mean(c_edges)),
            'count': len(c_edges),
            'min_edge': min(c_edges),
            'max_edge': max(c_edges),
            'y_min': min(y_positions),
            'y_max': max(y_positions),
            'y_coverage': y_coverage,
        })

    _ci_summary = [(ci['mean_x']+left_x, ci['count'], format(ci['y_coverage'], '.0%')) for ci in cluster_infos[:12]]
    logger.info(f"ColumnGeometry: {len(cluster_infos)} clusters with >=2 words "
                f"(from {len(clusters)} total), y_coverage: {_ci_summary}")

    # Primary: good vertical coverage
    primary = [c for c in cluster_infos if c['y_coverage'] >= MIN_Y_COVERAGE_PRIMARY]
    # Secondary: moderate coverage with enough words
    primary_set = set(id(c) for c in primary)
    secondary = [c for c in cluster_infos
                 if id(c) not in primary_set
                 and c['y_coverage'] >= MIN_Y_COVERAGE_SECONDARY
                 and c['count'] >= MIN_WORDS_SECONDARY]

    significant = sorted(primary + secondary, key=lambda c: c['mean_x'])

    _sig_summary = [(s['mean_x']+left_x, s['count'], format(s['y_coverage'], '.0%')) for s in significant[:10]]
    logger.info(f"ColumnGeometry: {len(significant)} significant clusters "
                f"(primary={len(primary)}, secondary={len(secondary)}): {_sig_summary}")

    if len(significant) < 3:
        logger.info("ColumnGeometry: < 3 clusters after verticality filter, signaling fallback")
        return None

    # --- Merge clusters that are very close ---
    # 6% of content width: on a typical 5-col vocab page (~1500px wide),
    # this is ~90px, which merges sub-alignments within a single column
    # while keeping real column boundaries (~300px apart) separate.
    merge_distance = max(30, int(content_w * 0.06))
    merged = [significant[0].copy()]
    for s in significant[1:]:
        if s['mean_x'] - merged[-1]['mean_x'] < merge_distance:
            prev = merged[-1]
            total = prev['count'] + s['count']
            # Count-weighted mean keeps the merged x near the denser alignment.
            avg_x = (prev['mean_x'] * prev['count'] + s['mean_x'] * s['count']) // total
            prev['mean_x'] = avg_x
            prev['count'] = total
            prev['min_edge'] = min(prev['min_edge'], s['min_edge'])
            prev['max_edge'] = max(prev['max_edge'], s['max_edge'])
            prev['y_min'] = min(prev['y_min'], s['y_min'])
            prev['y_max'] = max(prev['y_max'], s['y_max'])
            prev['y_coverage'] = (prev['y_max'] - prev['y_min']) / content_h if content_h > 0 else 0.0
        else:
            merged.append(s.copy())

    # --- Post-merge: absorb tiny clusters (< 5% content width) into neighbors ---
    # Stops once only 3 columns remain (the minimum useful layout).
    i = 0
    absorbed_count = 0
    while i < len(merged) and len(merged) > 3:
        if i + 1 < len(merged):
            cluster_w = merged[i + 1]['mean_x'] - merged[i]['mean_x']
        else:
            # Last cluster: width extends to the end of the content area.
            cluster_w = content_w - (merged[i]['mean_x'] - merged[0]['mean_x'])
        if cluster_w / content_w < 0.05:
            # Absorb into neighbor (prefer left)
            if i > 0:
                target = merged[i - 1]
            else:
                target = merged[i + 1]
            target['count'] += merged[i]['count']
            target['min_edge'] = min(target['min_edge'], merged[i]['min_edge'])
            target['max_edge'] = max(target['max_edge'], merged[i]['max_edge'])
            target['y_min'] = min(target['y_min'], merged[i]['y_min'])
            target['y_max'] = max(target['y_max'], merged[i]['y_max'])
            target['y_coverage'] = (target['y_max'] - target['y_min']) / content_h if content_h > 0 else 0.0
            del merged[i]
            absorbed_count += 1
        else:
            i += 1
    if absorbed_count:
        logger.info(f"ColumnGeometry: absorbed {absorbed_count} tiny clusters (< 5% width)")

    _merged_summary = [(m['mean_x']+left_x, m['count'], format(m['y_coverage'], '.0%')) for m in merged]
    logger.info(f"ColumnGeometry: {len(merged)} clusters after merging (dist={merge_distance}px): {_merged_summary}")

    if len(merged) < 3:
        logger.info("ColumnGeometry: < 3 merged clusters, signaling fallback")
        return None

    # --- Derive column boundaries ---
    margin_px = max(6, int(content_w * 0.003))  # ~2mm margin before column start

    col_starts = []
    for m in merged:
        # Column starts slightly left of its leftmost word edge (absolute coords).
        abs_start = max(0, left_x + m['min_edge'] - margin_px)
        col_starts.append((abs_start, m['count']))

    # Calculate column widths and assign words to columns
    geometries = []
    for i, (start_x, count) in enumerate(col_starts):
        if i + 1 < len(col_starts):
            col_width = col_starts[i + 1][0] - start_x
        else:
            col_width = right_x - start_x

        # Assign words to this column based on left edge
        col_left_rel = start_x - left_x
        col_right_rel = col_left_rel + col_width
        # NOTE: 'w' below shadows the image width bound earlier in this
        # function; harmless (the comprehension variable is local) but
        # easy to misread.
        col_words = [w for w in word_dicts
                     if col_left_rel <= w['left'] < col_right_rel]

        geometries.append(ColumnGeometry(
            index=i,
            x=start_x,
            y=top_y,
            width=col_width,
            height=content_h,
            word_count=len(col_words),
            words=col_words,
            width_ratio=col_width / content_w if content_w > 0 else 0.0,
        ))

    logger.info(f"ColumnGeometry: {len(geometries)} columns: "
                f"{[(g.index, g.x, g.width, g.word_count) for g in geometries]}")

    return (geometries, left_x, right_x, top_y, bottom_y)
|
|
|
|
|
|
# --- Phase B: Content-Based Classification ---
|
|
|
|
def _score_language(words: List[Dict]) -> Dict[str, float]:
|
|
"""Score the language of a column's words.
|
|
|
|
Analyzes function words, umlauts, and capitalization patterns
|
|
to determine whether text is English or German.
|
|
|
|
Args:
|
|
words: List of word dicts with 'text' and 'conf' keys.
|
|
|
|
Returns:
|
|
Dict with 'eng' and 'deu' scores (0.0-1.0).
|
|
"""
|
|
if not words:
|
|
return {'eng': 0.0, 'deu': 0.0}
|
|
|
|
# Only consider words with decent confidence
|
|
good_words = [w['text'].lower() for w in words if w.get('conf', 0) > 40 and len(w['text']) > 0]
|
|
if not good_words:
|
|
return {'eng': 0.0, 'deu': 0.0}
|
|
|
|
total = len(good_words)
|
|
en_hits = sum(1 for w in good_words if w in ENGLISH_FUNCTION_WORDS)
|
|
de_hits = sum(1 for w in good_words if w in GERMAN_FUNCTION_WORDS)
|
|
|
|
# Check for umlauts (strong German signal)
|
|
raw_texts = [w['text'] for w in words if w.get('conf', 0) > 40]
|
|
umlaut_count = sum(1 for t in raw_texts
|
|
for c in t if c in 'äöüÄÖÜß')
|
|
|
|
# German capitalization: nouns are capitalized mid-sentence
|
|
# Count words that start with uppercase but aren't at position 0
|
|
cap_words = sum(1 for t in raw_texts if t[0].isupper() and len(t) > 2)
|
|
|
|
en_score = en_hits / total if total > 0 else 0.0
|
|
de_score = de_hits / total if total > 0 else 0.0
|
|
|
|
# Boost German score for umlauts
|
|
if umlaut_count > 0:
|
|
de_score = min(1.0, de_score + 0.15 * min(umlaut_count, 5))
|
|
|
|
# Boost German score for high capitalization ratio (typical for German nouns)
|
|
if total > 5:
|
|
cap_ratio = cap_words / total
|
|
if cap_ratio > 0.3:
|
|
de_score = min(1.0, de_score + 0.1)
|
|
|
|
return {'eng': round(en_score, 3), 'deu': round(de_score, 3)}
|
|
|
|
|
|
def _score_role(geom: ColumnGeometry) -> Dict[str, float]:
    """Score the role of a column based on its geometry and content patterns.

    Args:
        geom: ColumnGeometry with words and dimensions.

    Returns:
        Dict with role scores: 'reference', 'marker', 'sentence', 'vocabulary'.
    """
    result = dict.fromkeys(('reference', 'marker', 'sentence', 'vocabulary'), 0.0)

    # Keep only confidently recognized words; bail out early otherwise.
    confident = [w['text'] for w in (geom.words or []) if w.get('conf', 0) > 40]
    if not confident:
        return result

    n = len(confident)
    mean_len = sum(map(len, confident)) / n
    punct_count = sum(1 for t in confident if any(c in t for c in '.!?;:,'))
    numeric_count = sum(1 for t in confident if any(c.isdigit() for c in t))
    numeric_ratio = numeric_count / n

    wr = geom.width_ratio

    # Reference: narrow column dominated by numbers / page references.
    if wr < 0.12:
        result['reference'] = 0.5
        if numeric_ratio > 0.4:
            result['reference'] = min(1.0, 0.5 + numeric_ratio * 0.5)

    # Marker: very narrow column holding only a handful of short entries.
    if wr < 0.06 and geom.word_count <= 15:
        result['marker'] = 0.9 if mean_len < 4 else 0.7
    # Extremely narrow non-edge column → strong marker regardless of word count.
    if wr < 0.04 and geom.index > 0:
        result['marker'] = max(result['marker'], 0.9)

    # Sentence: wide column with punctuation and longer words.
    if wr > 0.15 and punct_count > 2:
        sentence = 0.3 + min(0.5, punct_count / n)
        if mean_len > 4:
            sentence = min(1.0, sentence + 0.2)
        result['sentence'] = sentence

    # Vocabulary: medium width with medium-length words.
    if 0.10 < wr < 0.45:
        result['vocabulary'] = 0.7 if 3 < mean_len < 8 else 0.4

    return {role: round(val, 3) for role, val in result.items()}
|
|
|
|
|
|
def classify_column_types(geometries: List[ColumnGeometry],
                          content_w: int,
                          top_y: int,
                          img_w: int,
                          img_h: int,
                          bottom_y: int) -> List[PageRegion]:
    """Classify column types using a 3-level fallback chain.

    Level 1: Content-based (language + role scoring)
    Level 2: Position + language (old rules enhanced with language detection)
    Level 3: Pure position (exact old code, no regression)

    Args:
        geometries: List of ColumnGeometry from Phase A.
        content_w: Total content width.
        top_y: Top Y of content area.
        img_w: Full image width.
        img_h: Full image height.
        bottom_y: Bottom Y of content area.

    Returns:
        List of PageRegion with types, confidence, and method.
    """
    content_h = bottom_y - top_y

    # Special case: single column → plain text page
    if len(geometries) == 1:
        geom = geometries[0]
        return [PageRegion(
            type='column_text', x=geom.x, y=geom.y,
            width=geom.width, height=geom.height,
            classification_confidence=0.9,
            classification_method='content',
        )]

    # --- Pre-filter: first/last columns with very few words → column_ignore ---
    # Edge columns with < 8 words are typically margin noise rather than
    # real content columns.
    ignore_regions = []
    active_geometries = []
    for idx, g in enumerate(geometries):
        if (idx == 0 or idx == len(geometries) - 1) and g.word_count < 8:
            ignore_regions.append(PageRegion(
                type='column_ignore', x=g.x, y=g.y,
                width=g.width, height=content_h,
                classification_confidence=0.95,
                classification_method='content',
            ))
            logger.info(f"ClassifyColumns: column {idx} (x={g.x}, words={g.word_count}) → column_ignore (edge, few words)")
        else:
            active_geometries.append(g)

    # Re-index active geometries for classification
    # (mutates the ColumnGeometry objects passed in by the caller).
    for new_idx, g in enumerate(active_geometries):
        g.index = new_idx
    geometries = active_geometries

    # Handle edge case: all columns ignored or only 1 left
    if len(geometries) == 0:
        return ignore_regions
    if len(geometries) == 1:
        geom = geometries[0]
        ignore_regions.append(PageRegion(
            type='column_text', x=geom.x, y=geom.y,
            width=geom.width, height=geom.height,
            classification_confidence=0.9,
            classification_method='content',
        ))
        return ignore_regions

    # --- Score all columns ---
    lang_scores = [_score_language(g.words) for g in geometries]
    role_scores = [_score_role(g) for g in geometries]

    logger.info(f"ClassifyColumns: language scores: "
                f"{[(g.index, ls) for g, ls in zip(geometries, lang_scores)]}")
    logger.info(f"ClassifyColumns: role scores: "
                f"{[(g.index, rs) for g, rs in zip(geometries, role_scores)]}")

    # --- Level 1: Content-based classification ---
    # Returns None when EN/DE language signals are too weak to trust.
    regions = _classify_by_content(geometries, lang_scores, role_scores, content_w, content_h)
    if regions is not None:
        logger.info("ClassifyColumns: Level 1 (content-based) succeeded")
        _add_header_footer(regions, top_y, bottom_y, img_w, img_h)
        return ignore_regions + regions

    # --- Level 2: Position + language enhanced ---
    regions = _classify_by_position_enhanced(geometries, lang_scores, content_w, content_h)
    if regions is not None:
        logger.info("ClassifyColumns: Level 2 (position+language) succeeded")
        _add_header_footer(regions, top_y, bottom_y, img_w, img_h)
        return ignore_regions + regions

    # --- Level 3: Pure position fallback (old code, no regression) ---
    logger.info("ClassifyColumns: Level 3 (position fallback)")
    regions = _classify_by_position_fallback(geometries, content_w, content_h)
    _add_header_footer(regions, top_y, bottom_y, img_w, img_h)
    return ignore_regions + regions
|
|
|
|
|
|
def _classify_by_content(geometries: List[ColumnGeometry],
                         lang_scores: List[Dict[str, float]],
                         role_scores: List[Dict[str, float]],
                         content_w: int,
                         content_h: int) -> Optional[List[PageRegion]]:
    """Level 1: Classify columns purely by content analysis.

    Requires clear language signals to distinguish EN/DE columns.
    Returns None if language signals are too weak.

    Args:
        geometries: Columns sorted left-to-right from Phase A.
        lang_scores: Per-column output of _score_language().
        role_scores: Per-column output of _score_role().
        content_w: Total content width in pixels.
        content_h: Content height in pixels (used for region heights).

    Returns:
        List of PageRegion sorted by x, or None to signal that Level 2
        (position + language) should be tried instead.
    """
    regions = []
    assigned = set()

    # Step 1: Assign structural roles first (reference, marker)
    # left_20_threshold: only the leftmost ~20% of content area qualifies for page_ref
    left_20_threshold = geometries[0].x + content_w * 0.20 if geometries else 0

    for i, (geom, rs, ls) in enumerate(zip(geometries, role_scores, lang_scores)):
        is_left_side = geom.x < left_20_threshold
        # A clear EN/DE signal means the column carries real text, so it
        # must not be mistaken for a page-reference column even when its
        # reference score is high.
        has_strong_language = ls['eng'] > 0.3 or ls['deu'] > 0.3
        if rs['reference'] >= 0.5 and geom.width_ratio < 0.12 and is_left_side and not has_strong_language:
            regions.append(PageRegion(
                type='page_ref', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=rs['reference'],
                classification_method='content',
            ))
            assigned.add(i)
        elif rs['marker'] >= 0.7 and geom.width_ratio < 0.06:
            regions.append(PageRegion(
                type='column_marker', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=rs['marker'],
                classification_method='content',
            ))
            assigned.add(i)
        elif geom.width_ratio < 0.05 and not is_left_side:
            # Narrow column on the right side → marker, not page_ref
            regions.append(PageRegion(
                type='column_marker', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=0.8,
                classification_method='content',
            ))
            assigned.add(i)

    # Step 2: Among remaining columns, find EN and DE by language scores
    remaining = [(i, geometries[i], lang_scores[i], role_scores[i])
                 for i in range(len(geometries)) if i not in assigned]

    if len(remaining) < 2:
        # Not enough columns for EN/DE pair
        if len(remaining) == 1:
            i, geom, ls, rs = remaining[0]
            regions.append(PageRegion(
                type='column_text', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=0.6,
                classification_method='content',
            ))
        regions.sort(key=lambda r: r.x)
        return regions

    # Check if we have enough language signal
    en_candidates = [(i, g, ls) for i, g, ls, rs in remaining if ls['eng'] > ls['deu'] and ls['eng'] > 0.05]
    de_candidates = [(i, g, ls) for i, g, ls, rs in remaining if ls['deu'] > ls['eng'] and ls['deu'] > 0.05]

    # Position tiebreaker: when language signals are weak, use left=EN, right=DE
    if (not en_candidates or not de_candidates) and len(remaining) >= 2:
        max_eng = max(ls['eng'] for _, _, ls, _ in remaining)
        max_deu = max(ls['deu'] for _, _, ls, _ in remaining)
        if max_eng < 0.15 and max_deu < 0.15:
            # Both signals weak — fall back to positional: left=EN, right=DE
            sorted_remaining = sorted(remaining, key=lambda x: x[1].x)
            best_en = (sorted_remaining[0][0], sorted_remaining[0][1], sorted_remaining[0][2])
            best_de = (sorted_remaining[1][0], sorted_remaining[1][1], sorted_remaining[1][2])
            logger.info("ClassifyColumns: Level 1 using position tiebreaker (weak signals) - left=EN, right=DE")
            # Low confidence: assignment is positional, not content-backed.
            en_conf = 0.4
            de_conf = 0.4

            regions.append(PageRegion(
                type='column_en', x=best_en[1].x, y=best_en[1].y,
                width=best_en[1].width, height=content_h,
                classification_confidence=en_conf,
                classification_method='content',
            ))
            assigned.add(best_en[0])

            regions.append(PageRegion(
                type='column_de', x=best_de[1].x, y=best_de[1].y,
                width=best_de[1].width, height=content_h,
                classification_confidence=de_conf,
                classification_method='content',
            ))
            assigned.add(best_de[0])

            # Assign remaining as example
            for i, geom, ls, rs in remaining:
                if i not in assigned:
                    regions.append(PageRegion(
                        type='column_example', x=geom.x, y=geom.y,
                        width=geom.width, height=content_h,
                        classification_confidence=0.4,
                        classification_method='content',
                    ))
            regions.sort(key=lambda r: r.x)
            return regions

    if not en_candidates or not de_candidates:
        # Language signals too weak for content-based classification
        logger.info("ClassifyColumns: Level 1 failed - no clear EN/DE language split")
        return None

    # Pick the best EN and DE candidates
    best_en = max(en_candidates, key=lambda x: x[2]['eng'])
    best_de = max(de_candidates, key=lambda x: x[2]['deu'])

    if best_en[0] == best_de[0]:
        # Same column scored highest for both — ambiguous
        logger.info("ClassifyColumns: Level 1 failed - same column highest for EN and DE")
        return None

    en_conf = best_en[2]['eng']
    de_conf = best_de[2]['deu']

    regions.append(PageRegion(
        type='column_en', x=best_en[1].x, y=best_en[1].y,
        width=best_en[1].width, height=content_h,
        classification_confidence=round(en_conf, 2),
        classification_method='content',
    ))
    assigned.add(best_en[0])

    regions.append(PageRegion(
        type='column_de', x=best_de[1].x, y=best_de[1].y,
        width=best_de[1].width, height=content_h,
        classification_confidence=round(de_conf, 2),
        classification_method='content',
    ))
    assigned.add(best_de[0])

    # Step 3: Remaining columns → example or text based on role scores
    # NOTE(review): both branches below emit 'column_example' (only the
    # confidence differs); the else-branch may have been intended to be
    # 'column_text' — confirm before changing behavior.
    for i, geom, ls, rs in remaining:
        if i in assigned:
            continue
        if rs['sentence'] > 0.4:
            regions.append(PageRegion(
                type='column_example', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=round(rs['sentence'], 2),
                classification_method='content',
            ))
        else:
            regions.append(PageRegion(
                type='column_example', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=0.5,
                classification_method='content',
            ))

    regions.sort(key=lambda r: r.x)
    return regions
|
|
|
|
|
|
def _classify_by_position_enhanced(geometries: List[ColumnGeometry],
                                   lang_scores: List[Dict[str, float]],
                                   content_w: int,
                                   content_h: int) -> Optional[List[PageRegion]]:
    """Level 2: Position-based rules enhanced with language confirmation.

    Uses the old positional heuristics but confirms EN/DE assignment
    with language scores (swapping if needed).

    Args:
        geometries: Columns sorted left-to-right from Phase A.
        lang_scores: Per-column output of _score_language().
        content_w: Total content width in pixels.
        content_h: Content height in pixels (used for region heights).

    Returns:
        List of PageRegion sorted by x. (The Optional return type keeps
        the signature symmetric with Level 1; this implementation always
        returns a list.)
    """
    regions = []
    untyped = list(range(len(geometries)))
    first_x = geometries[0].x if geometries else 0
    # Only columns starting in the leftmost ~20% of content qualify as page_ref.
    left_20_threshold = first_x + content_w * 0.20

    # Rule 1: Leftmost narrow column → page_ref (only if in left 20%, no strong language)
    g0 = geometries[0]
    ls0 = lang_scores[0]
    # A strong EN/DE signal disqualifies page_ref — such a column holds real text.
    has_strong_lang_0 = ls0['eng'] > 0.3 or ls0['deu'] > 0.3
    if g0.width_ratio < 0.12 and g0.x < left_20_threshold and not has_strong_lang_0:
        regions.append(PageRegion(
            type='page_ref', x=g0.x, y=g0.y,
            width=g0.width, height=content_h,
            classification_confidence=0.8,
            classification_method='position_enhanced',
        ))
        untyped.remove(0)

    # Rule 2: Narrow columns with few words → marker
    for i in list(untyped):  # iterate a copy; untyped is mutated below
        geom = geometries[i]
        if geom.width_ratio < 0.06 and geom.word_count <= 15:
            regions.append(PageRegion(
                type='column_marker', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=0.7,
                classification_method='position_enhanced',
            ))
            untyped.remove(i)

    # Rule 3: Rightmost remaining → column_example (if 3+ remaining)
    if len(untyped) >= 3:
        last_idx = untyped[-1]
        geom = geometries[last_idx]
        regions.append(PageRegion(
            type='column_example', x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=0.7,
            classification_method='position_enhanced',
        ))
        untyped.remove(last_idx)

    # Rule 4: First two remaining → EN/DE, but check language to possibly swap
    if len(untyped) >= 2:
        idx_a = untyped[0]
        idx_b = untyped[1]
        ls_a = lang_scores[idx_a]
        ls_b = lang_scores[idx_b]

        # Default: first=EN, second=DE (old behavior)
        en_idx, de_idx = idx_a, idx_b
        conf = 0.7

        # Swap if language signals clearly indicate the opposite
        if ls_a['deu'] > ls_a['eng'] and ls_b['eng'] > ls_b['deu']:
            en_idx, de_idx = idx_b, idx_a
            conf = 0.85
            logger.info(f"ClassifyColumns: Level 2 swapped EN/DE based on language scores")

        regions.append(PageRegion(
            type='column_en', x=geometries[en_idx].x, y=geometries[en_idx].y,
            width=geometries[en_idx].width, height=content_h,
            classification_confidence=conf,
            classification_method='position_enhanced',
        ))
        regions.append(PageRegion(
            type='column_de', x=geometries[de_idx].x, y=geometries[de_idx].y,
            width=geometries[de_idx].width, height=content_h,
            classification_confidence=conf,
            classification_method='position_enhanced',
        ))
        untyped = untyped[2:]
    elif len(untyped) == 1:
        # Single remaining column: treat as EN with low confidence.
        idx = untyped[0]
        geom = geometries[idx]
        regions.append(PageRegion(
            type='column_en', x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=0.5,
            classification_method='position_enhanced',
        ))
        untyped = []

    # Remaining → example
    for idx in untyped:
        geom = geometries[idx]
        regions.append(PageRegion(
            type='column_example', x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=0.5,
            classification_method='position_enhanced',
        ))

    regions.sort(key=lambda r: r.x)
    return regions
|
|
|
|
|
|
def _classify_by_position_fallback(geometries: List[ColumnGeometry],
                                   content_w: int,
                                   content_h: int) -> List[PageRegion]:
    """Level 3: Pure position-based fallback (identical to old code).

    Guarantees no regression from the previous behavior.

    NOTE: This function is intentionally kept logic-identical to the
    pre-refactor implementation — do not "improve" it; change Level 1/2
    instead.

    Args:
        geometries: Columns sorted left-to-right from Phase A.
        content_w: Total content width in pixels.
        content_h: Content height in pixels (used for region heights).

    Returns:
        List of PageRegion sorted by x, all with confidence 1.0 and
        method 'position_fallback'.
    """
    regions = []
    untyped = list(range(len(geometries)))
    first_x = geometries[0].x if geometries else 0
    left_20_threshold = first_x + content_w * 0.20

    # Rule 1: Leftmost narrow column → page_ref (only if in left 20%)
    g0 = geometries[0]
    if g0.width_ratio < 0.12 and g0.x < left_20_threshold:
        regions.append(PageRegion(
            type='page_ref', x=g0.x, y=g0.y,
            width=g0.width, height=content_h,
            classification_confidence=1.0,
            classification_method='position_fallback',
        ))
        untyped.remove(0)

    # Rule 2: Narrow + few words → marker
    for i in list(untyped):  # iterate a copy; untyped is mutated below
        geom = geometries[i]
        if geom.width_ratio < 0.06 and geom.word_count <= 15:
            regions.append(PageRegion(
                type='column_marker', x=geom.x, y=geom.y,
                width=geom.width, height=content_h,
                classification_confidence=1.0,
                classification_method='position_fallback',
            ))
            untyped.remove(i)

    # Rule 3: Rightmost remaining → example (if 3+)
    if len(untyped) >= 3:
        last_idx = untyped[-1]
        geom = geometries[last_idx]
        regions.append(PageRegion(
            type='column_example', x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=1.0,
            classification_method='position_fallback',
        ))
        untyped.remove(last_idx)

    # Rule 4: First remaining → EN, second → DE
    if len(untyped) >= 2:
        en_idx = untyped[0]
        de_idx = untyped[1]
        regions.append(PageRegion(
            type='column_en', x=geometries[en_idx].x, y=geometries[en_idx].y,
            width=geometries[en_idx].width, height=content_h,
            classification_confidence=1.0,
            classification_method='position_fallback',
        ))
        regions.append(PageRegion(
            type='column_de', x=geometries[de_idx].x, y=geometries[de_idx].y,
            width=geometries[de_idx].width, height=content_h,
            classification_confidence=1.0,
            classification_method='position_fallback',
        ))
        untyped = untyped[2:]
    elif len(untyped) == 1:
        # Single remaining column: treat as EN.
        idx = untyped[0]
        geom = geometries[idx]
        regions.append(PageRegion(
            type='column_en', x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=1.0,
            classification_method='position_fallback',
        ))
        untyped = []

    # Any columns still untyped → example
    for idx in untyped:
        geom = geometries[idx]
        regions.append(PageRegion(
            type='column_example', x=geom.x, y=geom.y,
            width=geom.width, height=content_h,
            classification_confidence=1.0,
            classification_method='position_fallback',
        ))

    regions.sort(key=lambda r: r.x)
    return regions
|
|
|
|
|
|
def _add_header_footer(regions: List[PageRegion], top_y: int, bottom_y: int,
                       img_w: int, img_h: int) -> None:
    """Append header/footer regions to *regions* in-place.

    A header is added when content starts more than 10px below the page
    top; a footer when content ends more than 10px above the page bottom.
    """
    header_h = top_y
    if header_h > 10:
        regions.append(PageRegion(type='header', x=0, y=0, width=img_w, height=header_h))
    footer_h = img_h - bottom_y
    if footer_h > 10:
        regions.append(PageRegion(type='footer', x=0, y=bottom_y, width=img_w, height=footer_h))
|
|
|
|
|
|
# --- Main Entry Point ---
|
|
|
|
def analyze_layout_by_words(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> List[PageRegion]:
    """Detect columns using two-phase approach: geometry then content classification.

    Phase A: detect_column_geometry() — clustering word positions into columns.
    Phase B: classify_column_types() — content-based type assignment with fallback.

    Falls back to projection-based analyze_layout() if geometry detection fails.

    Args:
        ocr_img: Binarized grayscale image for layout analysis.
        dewarped_bgr: Original BGR image (for Tesseract word detection).

    Returns:
        List of PageRegion objects with types, confidence, and method.
    """
    img_h, img_w = ocr_img.shape[:2]

    # Phase A: cluster word left-edges into column geometries.
    geometry = detect_column_geometry(ocr_img, dewarped_bgr)
    if geometry is None:
        # Fall back to the projection-profile analyzer on failure.
        logger.info("LayoutByWords: geometry detection failed, falling back to projection profiles")
        return analyze_layout(create_layout_image(dewarped_bgr), ocr_img)

    geometries, left_x, right_x, top_y, bottom_y = geometry

    # Phase B: assign column types via the 3-level classification chain.
    regions = classify_column_types(geometries, right_x - left_x, top_y,
                                    img_w, img_h, bottom_y)

    body_regions = [r for r in regions if r.type not in ('header', 'footer')]
    col_count = sum(1 for r in regions if r.type.startswith('column') or r.type == 'page_ref')
    methods = {r.classification_method for r in regions if r.classification_method}
    logger.info(f"LayoutByWords: {col_count} columns detected (methods: {methods}): "
                f"{[(r.type, r.x, r.width, r.classification_confidence) for r in body_regions]}")

    return regions
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 6: Multi-Pass OCR
|
|
# =============================================================================
|
|
|
|
def ocr_region(ocr_img: np.ndarray, region: PageRegion, lang: str,
               psm: int, fallback_psm: Optional[int] = None,
               min_confidence: float = 40.0) -> List[Dict[str, Any]]:
    """Run Tesseract OCR on a specific region with given PSM.

    Args:
        ocr_img: Binarized full-page image.
        region: Region to crop and OCR.
        lang: Tesseract language string.
        psm: Page Segmentation Mode.
        fallback_psm: If confidence too low, retry with this PSM per line.
        min_confidence: Minimum average confidence before fallback.

    Returns:
        List of word dicts with text, position (absolute page coordinates),
        confidence, and region type.
    """
    # Crop the region out of the full-page image.
    crop = ocr_img[region.y:region.y + region.height,
                   region.x:region.x + region.width]

    if crop.size == 0:
        return []

    # pytesseract expects a PIL image.
    pil_img = Image.fromarray(crop)

    # Run Tesseract with specified PSM; OEM 3 = default engine mode.
    config = f'--psm {psm} --oem 3'
    try:
        data = pytesseract.image_to_data(pil_img, lang=lang, config=config,
                                         output_type=pytesseract.Output.DICT)
    except Exception as e:
        logger.warning(f"Tesseract failed for region {region.type}: {e}")
        return []

    words = []
    for i in range(len(data['text'])):
        text = data['text'][i].strip()
        # Tesseract may report confidence as an int, an int-like string, or a
        # float string (e.g. '96.33') depending on the pytesseract/Tesseract
        # version — int('96.33') would raise ValueError, so parse via float.
        conf = int(float(data['conf'][i]))
        if not text or conf < 10:
            continue
        words.append({
            'text': text,
            'left': data['left'][i] + region.x,  # Absolute coords
            'top': data['top'][i] + region.y,
            'width': data['width'][i],
            'height': data['height'][i],
            'conf': conf,
            'region_type': region.type,
        })

    # Check average confidence; if too low, retry line-by-line with the
    # fallback PSM (often rescues regions mis-segmented by the first pass).
    if words and fallback_psm is not None:
        avg_conf = sum(w['conf'] for w in words) / len(words)
        if avg_conf < min_confidence:
            logger.info(f"Region {region.type}: avg confidence {avg_conf:.0f}% < {min_confidence}%, "
                        f"trying fallback PSM {fallback_psm}")
            words = _ocr_region_line_by_line(ocr_img, region, lang, fallback_psm)

    return words
|
|
|
|
|
|
def _ocr_region_line_by_line(ocr_img: np.ndarray, region: PageRegion,
                             lang: str, psm: int) -> List[Dict[str, Any]]:
    """OCR a region line by line (fallback for low-confidence regions).

    Splits the region into horizontal strips based on text density
    (horizontal ink projection), then OCRs each strip individually with
    the given PSM.

    Args:
        ocr_img: Binarized full-page image.
        region: Region to crop and OCR.
        lang: Tesseract language string.
        psm: Page Segmentation Mode applied to every strip.

    Returns:
        List of word dicts with absolute page coordinates.
    """
    crop = ocr_img[region.y:region.y + region.height,
                   region.x:region.x + region.width]

    if crop.size == 0:
        return []

    # Find text lines via horizontal projection of inverted (ink = white) pixels.
    inv = cv2.bitwise_not(crop)
    h_proj = np.sum(inv, axis=1)
    threshold = np.max(h_proj) * 0.05 if np.max(h_proj) > 0 else 0

    # Find line boundaries: contiguous runs where projection exceeds threshold.
    lines = []
    in_text = False
    line_start = 0
    for y in range(len(h_proj)):
        if h_proj[y] > threshold and not in_text:
            line_start = y
            in_text = True
        elif h_proj[y] <= threshold and in_text:
            if y - line_start > 5:  # Minimum line height
                lines.append((line_start, y))
            in_text = False
    # Close a run that extends to the bottom edge of the crop.
    if in_text and len(h_proj) - line_start > 5:
        lines.append((line_start, len(h_proj)))

    all_words = []
    config = f'--psm {psm} --oem 3'

    for line_y_start, line_y_end in lines:
        # Add small padding so ascenders/descenders are not clipped.
        pad = 3
        y1 = max(0, line_y_start - pad)
        y2 = min(crop.shape[0], line_y_end + pad)
        line_crop = crop[y1:y2, :]

        if line_crop.size == 0:
            continue

        pil_img = Image.fromarray(line_crop)
        try:
            data = pytesseract.image_to_data(pil_img, lang=lang, config=config,
                                             output_type=pytesseract.Output.DICT)
        except Exception:
            # Best-effort fallback: a strip that fails is simply skipped.
            continue

        for i in range(len(data['text'])):
            text = data['text'][i].strip()
            # Confidence may come back as an int or a float string depending
            # on the Tesseract/pytesseract version — parse via float so a
            # value like '96.33' doesn't raise ValueError.
            conf = int(float(data['conf'][i]))
            if not text or conf < 10:
                continue
            all_words.append({
                'text': text,
                'left': data['left'][i] + region.x,
                'top': data['top'][i] + region.y + y1,
                'width': data['width'][i],
                'height': data['height'][i],
                'conf': conf,
                'region_type': region.type,
            })

    return all_words
|
|
|
|
|
|
def run_multi_pass_ocr(ocr_img: np.ndarray,
                       regions: List[PageRegion],
                       lang: str = "eng+deu") -> Dict[str, List[Dict]]:
    """Run OCR on each detected region with region-specific settings.

    Language and PSM are chosen per region type: dedicated single-language
    passes for the EN/DE vocabulary columns, a PSM-6 pass with a per-line
    PSM-7 fallback for the example column, and a generic PSM-6 pass for
    everything else. Header/footer regions are skipped entirely.

    Args:
        ocr_img: Binarized full-page image.
        regions: Detected page regions.
        lang: Default Tesseract language string for mixed-content regions.

    Returns:
        Dict mapping region type to a list of word dicts. Words from
        multiple regions of the same type are accumulated, not overwritten.
    """
    results: Dict[str, List[Dict]] = {}

    for region in regions:
        if region.type == 'header' or region.type == 'footer':
            continue  # Skip non-content regions

        if region.type == 'column_en':
            words = ocr_region(ocr_img, region, lang='eng', psm=4)
        elif region.type == 'column_de':
            words = ocr_region(ocr_img, region, lang='deu', psm=4)
        elif region.type == 'column_example':
            words = ocr_region(ocr_img, region, lang=lang, psm=6,
                               fallback_psm=7, min_confidence=40.0)
        else:
            words = ocr_region(ocr_img, region, lang=lang, psm=6)

        # Accumulate rather than assign: a page can contain several regions
        # of the same type, and a plain `results[region.type] = words` would
        # silently drop all but the last region's words.
        results.setdefault(region.type, []).extend(words)
        logger.info(f"OCR {region.type}: {len(words)} words")

    return results
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 7: Line Alignment → Vocabulary Entries
|
|
# =============================================================================
|
|
|
|
def _group_words_into_lines(words: List[Dict], y_tolerance_px: int = 20) -> List[List[Dict]]:
|
|
"""Group words by Y position into lines, sorted by X within each line."""
|
|
if not words:
|
|
return []
|
|
|
|
sorted_words = sorted(words, key=lambda w: (w['top'], w['left']))
|
|
lines: List[List[Dict]] = []
|
|
current_line: List[Dict] = [sorted_words[0]]
|
|
current_y = sorted_words[0]['top']
|
|
|
|
for word in sorted_words[1:]:
|
|
if abs(word['top'] - current_y) <= y_tolerance_px:
|
|
current_line.append(word)
|
|
else:
|
|
current_line.sort(key=lambda w: w['left'])
|
|
lines.append(current_line)
|
|
current_line = [word]
|
|
current_y = word['top']
|
|
|
|
if current_line:
|
|
current_line.sort(key=lambda w: w['left'])
|
|
lines.append(current_line)
|
|
|
|
return lines
|
|
|
|
|
|
def match_lines_to_vocab(ocr_results: Dict[str, List[Dict]],
                         regions: List[PageRegion],
                         y_tolerance_px: int = 25) -> List[VocabRow]:
    """Align OCR results from different columns into vocabulary rows.

    Uses Y-coordinate matching to pair English words, German translations,
    and example sentences that appear on the same line. English lines act as
    the primary reference: each EN line becomes one VocabRow, and the nearest
    DE / example line within ``y_tolerance_px`` is attached to it. Example
    lines that match no row are treated as wrapped continuations and appended
    to the nearest preceding row.

    Args:
        ocr_results: Dict mapping region type to word lists.
        regions: Detected regions (for reference).
        y_tolerance_px: Max Y-distance to consider words on the same row.

    Returns:
        List of VocabRow objects, sorted by Y position.
    """
    # If no vocabulary columns detected (e.g. plain text page), return empty
    if 'column_en' not in ocr_results and 'column_de' not in ocr_results:
        logger.info("match_lines_to_vocab: no column_en/column_de in OCR results, returning empty")
        return []

    # Group words into lines per column
    en_lines = _group_words_into_lines(ocr_results.get('column_en', []), y_tolerance_px)
    de_lines = _group_words_into_lines(ocr_results.get('column_de', []), y_tolerance_px)
    ex_lines = _group_words_into_lines(ocr_results.get('column_example', []), y_tolerance_px)

    def line_y_center(line: List[Dict]) -> float:
        # Mean vertical center of all words in the line.
        return sum(w['top'] + w['height'] / 2 for w in line) / len(line)

    def line_text(line: List[Dict]) -> str:
        # Words are already X-sorted by _group_words_into_lines.
        return ' '.join(w['text'] for w in line)

    def line_confidence(line: List[Dict]) -> float:
        return sum(w['conf'] for w in line) / len(line) if line else 0

    # Build EN entries as the primary reference
    vocab_rows: List[VocabRow] = []

    for en_line in en_lines:
        en_y = line_y_center(en_line)
        en_text = line_text(en_line)
        en_conf = line_confidence(en_line)

        # Skip very short or likely header content
        if len(en_text.strip()) < 2:
            continue

        # Find matching DE line: nearest Y-center within tolerance
        de_text = ""
        de_conf = 0.0
        best_de_dist = float('inf')
        best_de_idx = -1
        for idx, de_line in enumerate(de_lines):
            dist = abs(line_y_center(de_line) - en_y)
            if dist < y_tolerance_px and dist < best_de_dist:
                best_de_dist = dist
                best_de_idx = idx

        if best_de_idx >= 0:
            de_text = line_text(de_lines[best_de_idx])
            de_conf = line_confidence(de_lines[best_de_idx])

        # Find matching example line: same nearest-within-tolerance rule
        ex_text = ""
        ex_conf = 0.0
        best_ex_dist = float('inf')
        best_ex_idx = -1
        for idx, ex_line in enumerate(ex_lines):
            dist = abs(line_y_center(ex_line) - en_y)
            if dist < y_tolerance_px and dist < best_ex_dist:
                best_ex_dist = dist
                best_ex_idx = idx

        if best_ex_idx >= 0:
            ex_text = line_text(ex_lines[best_ex_idx])
            ex_conf = line_confidence(ex_lines[best_ex_idx])

        # Average confidence over only the fields actually present
        # (zero-confidence DE/EX fields do not dilute the row confidence).
        avg_conf = en_conf
        conf_count = 1
        if de_conf > 0:
            avg_conf += de_conf
            conf_count += 1
        if ex_conf > 0:
            avg_conf += ex_conf
            conf_count += 1

        vocab_rows.append(VocabRow(
            english=en_text.strip(),
            german=de_text.strip(),
            example=ex_text.strip(),
            confidence=avg_conf / conf_count,
            y_position=int(en_y),
        ))

    # Handle multi-line wrapping in example column:
    # If an example line has no matching EN/DE, append to previous entry
    matched_ex_ys = set()
    for row in vocab_rows:
        if row.example:
            # Row Y is the EN line's Y; any example line matched to it lies
            # within y_tolerance_px, so the proximity test below catches it.
            matched_ex_ys.add(row.y_position)

    for ex_line in ex_lines:
        ex_y = line_y_center(ex_line)
        # Check if already matched to some row
        already_matched = any(abs(ex_y - y) < y_tolerance_px for y in matched_ex_ys)
        if already_matched:
            continue

        # Find nearest previous vocab row: only rows strictly ABOVE the
        # orphan line, within 3x tolerance (wrapped text sits just below).
        best_row = None
        best_dist = float('inf')
        for row in vocab_rows:
            dist = ex_y - row.y_position
            if 0 < dist < y_tolerance_px * 3 and dist < best_dist:
                best_dist = dist
                best_row = row

        if best_row:
            continuation = line_text(ex_line).strip()
            if continuation:
                best_row.example = (best_row.example + " " + continuation).strip()

    # Sort by Y position
    vocab_rows.sort(key=lambda r: r.y_position)

    return vocab_rows
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 8: Optional LLM Post-Correction
|
|
# =============================================================================
|
|
|
|
async def llm_post_correct(img: np.ndarray, vocab_rows: List[VocabRow],
                           confidence_threshold: float = 50.0,
                           enabled: bool = False) -> List[VocabRow]:
    """Optionally send low-confidence regions to Qwen-VL for correction.

    Disabled by default. Currently a no-op pass-through even when enabled:
    the Qwen-VL integration is not implemented yet (see TODO below).

    Args:
        img: Original BGR image.
        vocab_rows: Current vocabulary rows.
        confidence_threshold: Rows below this get LLM correction.
        enabled: Whether to actually run LLM correction.

    Returns:
        Corrected vocabulary rows (currently returned unchanged).
    """
    if not enabled:
        return vocab_rows

    # TODO: Implement Qwen-VL correction for low-confidence entries
    # For each row with confidence < threshold:
    #   1. Crop the relevant region from img
    #   2. Send crop + OCR text to Qwen-VL
    #   3. Replace text if LLM provides a confident correction
    # Plain string (no f-prefix): the message has no placeholders (ruff F541).
    logger.info("LLM post-correction skipped (not yet implemented)")
    return vocab_rows
|
|
|
|
|
|
# =============================================================================
|
|
# Orchestrator
|
|
# =============================================================================
|
|
|
|
async def run_cv_pipeline(
    pdf_data: Optional[bytes] = None,
    image_data: Optional[bytes] = None,
    page_number: int = 0,
    zoom: float = 3.0,
    enable_dewarp: bool = True,
    enable_llm_correction: bool = False,
    lang: str = "eng+deu",
) -> PipelineResult:
    """Run the complete CV document reconstruction pipeline.

    Stages (each timed into ``result.stages``):
      1. Render PDF page / decode image at high resolution
      2. Deskew (rotation correction)
      3. Dewarp (optional)
      4. Dual image preparation (binarized OCR image + layout image)
      5. Layout analysis (region/column detection)
      6. Multi-pass OCR per region
      7. Y-coordinate line alignment into vocabulary rows
      8. Optional LLM post-correction

    Any exception is caught and reported via ``result.error`` instead of
    being raised, so callers always receive a PipelineResult.

    Args:
        pdf_data: Raw PDF bytes (mutually exclusive with image_data).
        image_data: Raw image bytes (mutually exclusive with pdf_data).
        page_number: 0-indexed page number (for PDF).
        zoom: PDF rendering zoom factor.
        enable_dewarp: Whether to run dewarp stage.
        enable_llm_correction: Whether to run LLM post-correction.
        lang: Tesseract language string.

    Returns:
        PipelineResult with vocabulary and timing info.
    """
    if not CV_PIPELINE_AVAILABLE:
        return PipelineResult(error="CV pipeline not available (OpenCV or Tesseract missing)")

    result = PipelineResult()
    total_start = time.time()

    try:
        # Stage 1: Render input to a BGR image
        t = time.time()
        if pdf_data:
            img = render_pdf_high_res(pdf_data, page_number, zoom)
        elif image_data:
            img = render_image_high_res(image_data)
        else:
            return PipelineResult(error="No input data (pdf_data or image_data required)")
        result.stages['render'] = round(time.time() - t, 2)
        result.image_width = img.shape[1]
        result.image_height = img.shape[0]
        logger.info(f"Stage 1 (render): {img.shape[1]}x{img.shape[0]} in {result.stages['render']}s")

        # Stage 2: Deskew (rotation correction)
        t = time.time()
        img, angle = deskew_image(img)
        result.stages['deskew'] = round(time.time() - t, 2)
        logger.info(f"Stage 2 (deskew): {angle:.2f}° in {result.stages['deskew']}s")

        # Stage 3: Dewarp (book-curvature correction; pass-through initially
        # per module header)
        if enable_dewarp:
            t = time.time()
            img = dewarp_image(img)
            result.stages['dewarp'] = round(time.time() - t, 2)

        # Stage 4: Dual image preparation (OCR image + layout image)
        t = time.time()
        ocr_img = create_ocr_image(img)
        layout_img = create_layout_image(img)
        result.stages['image_prep'] = round(time.time() - t, 2)

        # Stage 5: Layout analysis (projection-profile based)
        t = time.time()
        regions = analyze_layout(layout_img, ocr_img)
        result.stages['layout'] = round(time.time() - t, 2)
        result.columns_detected = len([r for r in regions if r.type.startswith('column')])
        logger.info(f"Stage 5 (layout): {result.columns_detected} columns in {result.stages['layout']}s")

        # Stage 6: Multi-pass OCR (per-region language/PSM settings)
        t = time.time()
        ocr_results = run_multi_pass_ocr(ocr_img, regions, lang)
        result.stages['ocr'] = round(time.time() - t, 2)
        total_words = sum(len(w) for w in ocr_results.values())
        result.word_count = total_words
        logger.info(f"Stage 6 (OCR): {total_words} words in {result.stages['ocr']}s")

        # Stage 7: Line alignment into vocabulary rows
        t = time.time()
        vocab_rows = match_lines_to_vocab(ocr_results, regions)
        result.stages['alignment'] = round(time.time() - t, 2)

        # Stage 8: Optional LLM correction (currently a no-op pass-through)
        if enable_llm_correction:
            t = time.time()
            vocab_rows = await llm_post_correct(img, vocab_rows)
            result.stages['llm_correction'] = round(time.time() - t, 2)

        # Convert to output format (plain dicts for serialization)
        result.vocabulary = [
            {
                "english": row.english,
                "german": row.german,
                "example": row.example,
                "confidence": round(row.confidence, 1),
            }
            for row in vocab_rows
            if row.english or row.german  # Skip empty rows
        ]

        result.duration_seconds = round(time.time() - total_start, 2)
        logger.info(f"CV Pipeline complete: {len(result.vocabulary)} entries in {result.duration_seconds}s")

    except Exception as e:
        # Report the failure in the result instead of propagating; the full
        # traceback goes to DEBUG level only, keeping normal logs readable.
        logger.error(f"CV Pipeline error: {e}")
        import traceback
        logger.debug(traceback.format_exc())
        result.error = str(e)
        result.duration_seconds = round(time.time() - total_start, 2)

    return result
|