- Fix dewarp method selection: prefer methods with >5px curvature over
higher confidence (vertical_edge 79px was being ignored for text_baseline 2px)
- Add grid overlay on left image in Dewarp step for side-by-side comparison
- Add GET /sessions/{id} endpoint to reload session data
- StepDeskew accepts sessionId prop to restore state when navigating back
- SessionInfo type extended with optional deskew_result and dewarp_result
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1487 lines
51 KiB
Python
1487 lines
51 KiB
Python
"""
|
|
CV-based Document Reconstruction Pipeline for Vocabulary Extraction.
|
|
|
|
Uses classical Computer Vision techniques for high-quality OCR:
|
|
- High-resolution PDF rendering (432 DPI)
|
|
- Deskew (rotation correction via Hough Lines)
|
|
- Dewarp (book curvature correction via vertical-edge or text-baseline analysis)
|
|
- Dual image preparation (binarized for OCR, CLAHE for layout)
|
|
- Projection-profile layout analysis (column/row detection)
|
|
- Multi-pass Tesseract OCR with region-specific PSM settings
|
|
- Y-coordinate line alignment for vocabulary matching
|
|
- Optional LLM post-correction for low-confidence regions
|
|
|
|
Lizenz: Apache 2.0 (kommerziell nutzbar)
|
|
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
|
|
"""
|
|
|
|
import io
|
|
import logging
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
|
|
import numpy as np
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# --- Availability Guards ---
|
|
|
|
# OpenCV is an optional dependency; record whether it could be imported.
try:
    import cv2
except ImportError:
    cv2 = None
    CV2_AVAILABLE = False
    logger.warning("OpenCV not available — CV pipeline disabled")
else:
    CV2_AVAILABLE = True

# pytesseract and Pillow are needed together; if either import fails both
# names are set to None and the flag is cleared.
try:
    import pytesseract
    from PIL import Image
except ImportError:
    pytesseract = None
    Image = None
    TESSERACT_AVAILABLE = False
    logger.warning("pytesseract/Pillow not available — CV pipeline disabled")
else:
    TESSERACT_AVAILABLE = True

# The pipeline as a whole needs both OpenCV and Tesseract.
CV_PIPELINE_AVAILABLE = all((CV2_AVAILABLE, TESSERACT_AVAILABLE))
|
|
|
|
|
|
# --- Data Classes ---
|
|
|
|
@dataclass
class PageRegion:
    """Rectangular area of the page produced by layout analysis."""

    # Region kind: 'column_en', 'column_de', 'column_example', 'header' or 'footer'
    type: str
    # Left edge of the rectangle in page-image pixels
    x: int
    # Top edge of the rectangle in page-image pixels
    y: int
    # Horizontal extent in pixels
    width: int
    # Vertical extent in pixels
    height: int
|
|
|
|
|
|
@dataclass
class VocabRow:
    """One vocabulary entry put together from the OCR output of several columns."""

    # Text of the English column
    english: str = ""
    # Text of the German column
    german: str = ""
    # Text of the example-sentence column
    example: str = ""
    # Confidence score for the row (scale not visible here — presumably OCR confidence)
    confidence: float = 0.0
    # Vertical position of the row (presumably a pixel y-coordinate — confirm with callers)
    y_position: int = 0
|
|
|
|
|
|
@dataclass
class PipelineResult:
    """Aggregate output of one run of the CV pipeline."""

    # Extracted vocabulary entries, one dict per entry
    vocabulary: List[Dict[str, Any]] = field(default_factory=list)
    # Number of words counted for this run
    word_count: int = 0
    # Number of text columns found by layout analysis
    columns_detected: int = 0
    # Total run time in seconds
    duration_seconds: float = 0.0
    # Per-stage float values keyed by stage name (presumably durations in seconds)
    stages: Dict[str, float] = field(default_factory=dict)
    # Error message if the run failed, otherwise None
    error: Optional[str] = None
    # Width of the processed page image
    image_width: int = 0
    # Height of the processed page image
    image_height: int = 0
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 1: High-Resolution PDF Rendering
|
|
# =============================================================================
|
|
|
|
def render_pdf_high_res(pdf_data: bytes, page_number: int = 0, zoom: float = 3.0) -> np.ndarray:
    """Render one PDF page to a high-resolution numpy array (BGR).

    Args:
        pdf_data: Raw PDF bytes.
        page_number: 0-indexed page number.
        zoom: Zoom factor applied on both axes. PyMuPDF renders at 72 DPI for
            zoom 1.0, so 3.0 corresponds to 216 DPI and 6.0 to 432 DPI.

    Returns:
        numpy array in BGR format.

    Raises:
        ValueError: If ``page_number`` is beyond the last page of the PDF.
    """
    import fitz  # PyMuPDF

    pdf_doc = fitz.open(stream=pdf_data, filetype="pdf")
    # try/finally so the document is released even when the page check or the
    # rendering raises (the previous version leaked the handle in that case).
    try:
        if page_number >= pdf_doc.page_count:
            raise ValueError(f"Page {page_number} does not exist (PDF has {pdf_doc.page_count} pages)")

        page = pdf_doc[page_number]
        pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom))

        # pix.samples is a flat byte buffer; pix.n is the channel count.
        img_data = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.h, pix.w, pix.n)
        if pix.n == 4:  # RGBA
            conversion = cv2.COLOR_RGBA2BGR
        elif pix.n == 3:  # RGB
            conversion = cv2.COLOR_RGB2BGR
        else:  # Grayscale
            conversion = cv2.COLOR_GRAY2BGR

        # cvtColor allocates a new array, so the result does not depend on the
        # pixmap buffer once the document is closed.
        return cv2.cvtColor(img_data, conversion)
    finally:
        pdf_doc.close()
|
|
|
|
|
|
def render_image_high_res(image_data: bytes) -> np.ndarray:
    """Decode encoded image bytes (PNG/JPEG) into a BGR pixel array.

    Args:
        image_data: Raw image bytes.

    Returns:
        numpy array in BGR format.

    Raises:
        ValueError: If OpenCV cannot decode the bytes.
    """
    encoded = np.frombuffer(image_data, dtype=np.uint8)
    decoded = cv2.imdecode(encoded, cv2.IMREAD_COLOR)
    if decoded is not None:
        return decoded
    raise ValueError("Could not decode image data")
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 2: Deskew (Rotation Correction)
|
|
# =============================================================================
|
|
|
|
def deskew_image(img: np.ndarray) -> Tuple[np.ndarray, float]:
    """Straighten a slightly rotated page using Hough line segments.

    The page is Otsu-binarized (inverted), long segments are detected with the
    probabilistic Hough transform, and the median inclination of the
    near-horizontal ones (|angle| < 15°) is taken as the skew. The image is
    rotated about its centre by that angle, capped at ±5°.

    Args:
        img: BGR image.

    Returns:
        Tuple of (image, angle in degrees). If fewer than three segments are
        found, none is near-horizontal, or the skew is below 0.1°, the input
        image is returned untouched together with 0.0.
    """
    grayscale = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    _, ink_mask = cv2.threshold(grayscale, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)

    # Only segments spanning at least a quarter of the page width are considered.
    segments = cv2.HoughLinesP(
        ink_mask, 1, np.pi / 180, threshold=100,
        minLineLength=img.shape[1] // 4, maxLineGap=20,
    )
    if segments is None or len(segments) < 3:
        return img, 0.0

    def _inclination(segment):
        # Each Hough result wraps one (x1, y1, x2, y2) row.
        x1, y1, x2, y2 = segment[0]
        return np.degrees(np.arctan2(y2 - y1, x2 - x1))

    near_horizontal = [
        inclination
        for inclination in map(_inclination, segments)
        if abs(inclination) < 15
    ]
    if not near_horizontal:
        return img, 0.0

    median_angle = float(np.median(near_horizontal))

    # Cap the correction at ±5°
    if abs(median_angle) > 5.0:
        median_angle = 5.0 * np.sign(median_angle)

    # Skews this small are not worth resampling the image for.
    if abs(median_angle) < 0.1:
        return img, 0.0

    height, width = img.shape[:2]
    rotation = cv2.getRotationMatrix2D((width // 2, height // 2), median_angle, 1.0)
    corrected = cv2.warpAffine(
        img, rotation, (width, height),
        flags=cv2.INTER_LINEAR,
        borderMode=cv2.BORDER_REPLICATE,
    )

    logger.info(f"Deskew: corrected {median_angle:.2f}° rotation")
    return corrected, median_angle
|
|
|
|
|
|
def deskew_image_by_word_alignment(
    image_data: bytes,
    lang: str = "eng+deu",
    downscale_factor: float = 0.5,
) -> Tuple[bytes, float]:
    """Correct rotation by fitting a line through left-most word starts per text line.

    Intended for vocabulary worksheets whose text lines share a left margin.
    Runs a quick Tesseract pass on a downscaled copy to find word positions,
    keeps the line starts that lie close to the median left edge, fits a
    straight line x = a*y + b through them and rotates the full-resolution
    image so that this edge becomes vertical.

    Args:
        image_data: Raw image bytes (PNG/JPEG).
        lang: Tesseract language string for the quick pass.
        downscale_factor: Shrink factor for the quick Tesseract pass (0.5 = 50%).
            Must be positive.

    Returns:
        Tuple of (rotated image as PNG bytes, applied rotation in degrees).
        The angle follows the OpenCV convention (positive = counter-clockwise),
        the same convention ``deskew_image`` uses, and is clamped to ±5°.
        Whenever no correction is applied (missing dependencies, undecodable
        image, Tesseract failure, too few lines, negligible angle, encoding
        failure) the original bytes are returned with an angle of 0.0.
    """
    if not CV2_AVAILABLE or not TESSERACT_AVAILABLE:
        return image_data, 0.0

    # 1. Decode image
    img = cv2.imdecode(np.frombuffer(image_data, dtype=np.uint8), cv2.IMREAD_COLOR)
    if img is None:
        logger.warning("deskew_by_word_alignment: could not decode image")
        return image_data, 0.0

    orig_h, orig_w = img.shape[:2]

    # 2. Downscale for fast Tesseract pass
    if downscale_factor <= 0:
        # A non-positive factor would break both the resize and the 1/factor
        # rescaling below.
        logger.warning(f"deskew_by_word_alignment: invalid downscale_factor {downscale_factor}, skipping")
        return image_data, 0.0
    small_w = int(orig_w * downscale_factor)
    small_h = int(orig_h * downscale_factor)
    if small_w < 1 or small_h < 1:
        logger.info("deskew_by_word_alignment: image too small to downscale, skipping")
        return image_data, 0.0
    small = cv2.resize(img, (small_w, small_h), interpolation=cv2.INTER_AREA)

    # 3. Quick Tesseract — word-level positions
    pil_small = Image.fromarray(cv2.cvtColor(small, cv2.COLOR_BGR2RGB))
    try:
        data = pytesseract.image_to_data(
            pil_small, lang=lang, config="--psm 6 --oem 3",
            output_type=pytesseract.Output.DICT,
        )
    except Exception as e:
        # Best effort: any Tesseract problem means "leave the image alone".
        logger.warning(f"deskew_by_word_alignment: Tesseract failed: {e}")
        return image_data, 0.0

    # 4. Per text line (block_num, par_num, line_num) keep the left-most word
    #    as (left, top, height) in downscaled coordinates.
    leftmost_per_line: Dict[tuple, Tuple[Any, Any, Any]] = {}
    for i, raw_text in enumerate(data["text"]):
        text = "" if raw_text is None else str(raw_text).strip()
        try:
            conf = int(float(data["conf"][i]))
        except (TypeError, ValueError):
            continue
        if not text or conf < 20:
            continue
        key = (data["block_num"][i], data["par_num"][i], data["line_num"][i])
        candidate = (data["left"][i], data["top"][i], data["height"][i])
        current = leftmost_per_line.get(key)
        # Strict '<' keeps the first word on ties.
        if current is None or candidate[0] < current[0]:
            leftmost_per_line[key] = candidate

    line_count = len(leftmost_per_line)
    if line_count < 5:
        logger.info(f"deskew_by_word_alignment: only {line_count} lines, skipping")
        return image_data, 0.0

    # Scale back to original resolution: x = left edge, y = vertical centre.
    scale = 1.0 / downscale_factor
    xs = np.array([left * scale for left, _, _ in leftmost_per_line.values()])
    ys = np.array([top * scale + (height * scale) / 2.0
                   for _, top, height in leftmost_per_line.values()])

    # 5. Find dominant left-edge column + compute angle
    median_x = float(np.median(xs))
    tolerance = orig_w * 0.03  # 3% of image width

    mask = np.abs(xs - median_x) <= tolerance
    filtered_xs = xs[mask]
    filtered_ys = ys[mask]

    if len(filtered_xs) < 5:
        logger.info(f"deskew_by_word_alignment: only {len(filtered_xs)} aligned points after filter, skipping")
        return image_data, 0.0

    # polyfit: x = a*y + b  →  a = dx/dy is the lean of the left margin.
    slope = np.polyfit(filtered_ys, filtered_xs, 1)[0]

    # getRotationMatrix2D(angle) maps the direction (a, 1) to
    # (a*cos + sin, ...), which is vertical only for angle = -arctan(a).
    # Using +arctan(a) would double the skew instead of removing it.
    angle_deg = -float(np.degrees(np.arctan(slope)))

    # Clamp to ±5°
    angle_deg = max(-5.0, min(5.0, angle_deg))

    logger.info(f"deskew_by_word_alignment: detected {angle_deg:.2f}° from {len(filtered_xs)} points "
                f"(total lines: {line_count})")

    if abs(angle_deg) < 0.05:
        return image_data, 0.0

    # 6. Rotate full-res image
    center = (orig_w // 2, orig_h // 2)
    rotation = cv2.getRotationMatrix2D(center, angle_deg, 1.0)
    rotated = cv2.warpAffine(img, rotation, (orig_w, orig_h),
                             flags=cv2.INTER_LINEAR,
                             borderMode=cv2.BORDER_REPLICATE)

    # Encode back to PNG
    success, png_buf = cv2.imencode(".png", rotated)
    if not success:
        logger.warning("deskew_by_word_alignment: PNG encoding failed")
        return image_data, 0.0

    return png_buf.tobytes(), angle_deg
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 3: Dewarp (Book Curvature Correction)
|
|
# =============================================================================
|
|
|
|
def _dewarp_by_vertical_edges(img: np.ndarray) -> Dict[str, Any]:
    """Method A: Detect curvature from strongest vertical text edges.

    Splits the image into 20 horizontal strips, finds the dominant vertical
    edge X-position per strip within the left 40% of the page, fits a
    2nd-degree polynomial x(y) through those positions and, if the fitted
    curve deviates at least 2 px from a straight line, builds a per-row
    horizontal displacement map.

    Args:
        img: BGR image.

    Returns:
        Dict with keys: method, curvature_px, confidence, displacement_map
        (float32 array of shape (h, w), or None when no usable curvature was
        measured).
    """
    h, w = img.shape[:2]
    result = {"method": "vertical_edge", "curvature_px": 0.0, "confidence": 0.0, "displacement_map": None}

    num_strips = 20
    strip_h = h // num_strips
    search_w = int(w * 0.4)  # only the left 40% (left margin area) is searched
    if strip_h < 1 or search_w < 1:
        # Image too small to yield any strip or search window.
        return result

    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Horizontal-gradient Sobel responds to vertical edges. A 3x3 Sobel on
    # 8-bit input can reach ±1020, so saturate before narrowing to uint8;
    # a plain astype(uint8) would wrap modulo 256 and erase the strongest edges.
    sobel_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
    abs_sobel = np.clip(np.abs(sobel_x), 0, 255).astype(np.uint8)

    # Binarize with Otsu
    _, binary = cv2.threshold(abs_sobel, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    # Odd smoothing width of roughly 1% of the page width (at least 3).
    kernel_size = max(3, w // 100) | 1

    edge_positions = []  # (y_center, x_position)
    for i in range(num_strips):
        y_start = i * strip_h
        y_end = min(y_start + strip_h, h)

        # Column sums of edge pixels inside the search window of this strip.
        left_proj = np.sum(binary[y_start:y_end, :search_w], axis=0).astype(np.float64)
        if left_proj.max() == 0:
            continue

        # Smooth and find peak
        smoothed = cv2.GaussianBlur(left_proj.reshape(1, -1), (kernel_size, 1), 0).flatten()
        edge_positions.append(((y_start + y_end) / 2.0, float(np.argmax(smoothed))))

    if len(edge_positions) < 8:
        return result

    ys = np.array([p[0] for p in edge_positions])
    xs = np.array([p[1] for p in edge_positions])

    # Remove outliers (> 2 std from median)
    median_x = np.median(xs)
    std_x = max(np.std(xs), 1.0)
    keep = np.abs(xs - median_x) < 2 * std_x
    ys = ys[keep]
    xs = xs[keep]

    if len(ys) < 6:
        return result

    # Fit 2nd degree polynomial: x = a*y^2 + b*y + c
    coeffs = np.polyfit(ys, xs, 2)
    fitted = np.polyval(coeffs, ys)
    rmse = float(np.sqrt(np.mean((xs - fitted) ** 2)))

    # Measure curvature: max deviation from straight line
    straight_coeffs = np.polyfit(ys, xs, 1)
    straight_fitted = np.polyval(straight_coeffs, ys)
    curvature_px = float(np.max(np.abs(fitted - straight_fitted)))

    if curvature_px < 2.0:
        result["confidence"] = 0.3
        return result

    # Displacement per row: curved fit minus straight fit, evaluated on every row.
    y_coords = np.arange(h)
    dx_per_row = np.polyval(coeffs, y_coords) - np.polyval(straight_coeffs, y_coords)

    # Every pixel of a row gets the same horizontal shift -dx_per_row[y].
    # NOTE(review): cv2.remap samples src at (x + displacement); with this
    # negated sign a positive scale appears to amplify rather than remove the
    # measured bend. Sign kept as-is because callers expose a signed manual
    # scale — confirm against real scans before changing.
    displacement_map = np.tile((-dx_per_row).astype(np.float32).reshape(-1, 1), (1, w))

    # More surviving strips and a tighter fit both raise the confidence.
    confidence = min(1.0, len(ys) / 15.0) * max(0.5, 1.0 - rmse / 5.0)

    result["curvature_px"] = round(curvature_px, 2)
    result["confidence"] = round(float(confidence), 2)
    result["displacement_map"] = displacement_map

    return result
|
|
|
|
|
|
def _dewarp_by_text_baseline(img: np.ndarray) -> Dict[str, Any]:
    """Method B: Detect curvature from Tesseract text baseline positions.

    Uses a quick Tesseract pass on a downscaled image (longest side at most
    1500 px), groups words into lines, fits a parabola and a straight line
    through each line's word bottoms, and aggregates the differences into a
    displacement map.

    Args:
        img: BGR image.

    Returns:
        Dict with keys: method, curvature_px, confidence, displacement_map
        (float32 array of shape (h, w), or None when Tesseract is unavailable,
        fails, finds too few lines, or the average curvature is below 1.5 px).
    """
    h, w = img.shape[:2]
    result = {"method": "text_baseline", "curvature_px": 0.0, "confidence": 0.0, "displacement_map": None}

    if not TESSERACT_AVAILABLE or h == 0 or w == 0:
        return result

    from collections import defaultdict

    # Downscale for speed
    max_dim = 1500
    scale_factor = min(1.0, max_dim / max(h, w))
    if scale_factor < 1.0:
        small = cv2.resize(img, (int(w * scale_factor), int(h * scale_factor)), interpolation=cv2.INTER_AREA)
    else:
        small = img

    pil_img = Image.fromarray(cv2.cvtColor(small, cv2.COLOR_BGR2RGB))
    try:
        data = pytesseract.image_to_data(
            pil_img, lang="eng+deu", config="--psm 6 --oem 3",
            output_type=pytesseract.Output.DICT,
        )
    except Exception as e:
        # Best effort: a Tesseract failure simply means "no measurement".
        logger.warning(f"dewarp text_baseline: Tesseract failed: {e}")
        return result

    # Group word indices by (block_num, par_num, line_num)
    line_groups: Dict[tuple, list] = defaultdict(list)
    for i, raw_text in enumerate(data["text"]):
        text = "" if raw_text is None else str(raw_text).strip()
        try:
            conf = int(float(data["conf"][i]))
        except (TypeError, ValueError):
            continue
        if not text or conf < 20:
            continue
        key = (data["block_num"][i], data["par_num"][i], data["line_num"][i])
        line_groups[key].append(i)

    if len(line_groups) < 5:
        return result

    inv_scale = 1.0 / scale_factor  # back to full-resolution coordinates

    # Per line with at least 3 words: (y_center, curvature_px, quadratic, linear)
    line_fits = []
    for indices in line_groups.values():
        if len(indices) < 3:
            continue

        # Baseline points: (x_center, y_bottom) of each word, sorted by x.
        points = sorted(
            ((data["left"][idx] + data["width"][idx] / 2.0) * inv_scale,
             (data["top"][idx] + data["height"][idx]) * inv_scale)
            for idx in indices
        )
        xs_line = np.array([p[0] for p in points])
        ys_line = np.array([p[1] for p in points])

        # Fit y = a*x^2 + b*x + c and the straight reference y = m*x + n.
        try:
            quad = np.polyfit(xs_line, ys_line, 2)
            linear = np.polyfit(xs_line, ys_line, 1)
        except (np.linalg.LinAlgError, ValueError):
            continue

        # Curvature of a line = largest gap between parabola and straight fit.
        curvature = float(np.max(np.abs(np.polyval(quad, xs_line) - np.polyval(linear, xs_line))))
        line_fits.append((float(np.mean(ys_line)), curvature, quad, linear))

    if len(line_fits) < 3:
        return result

    # Average curvature
    avg_curvature = float(np.mean([fit[1] for fit in line_fits]))

    if avg_curvature < 1.5:
        result["confidence"] = 0.3
        return result

    # Build displacement map from line baselines. Each line contributes its
    # curved-minus-straight baseline difference to the rows around it, with a
    # weight that falls off linearly to zero at `spread` rows distance.
    displacement_map = np.zeros((h, w), dtype=np.float32)
    x_range = np.arange(w, dtype=np.float64)
    spread = max(int(h / len(line_fits) / 2), 20)

    for y_center, _, quad, linear in line_fits:
        dy = np.polyval(quad, x_range) - np.polyval(linear, x_range)

        y_int = int(y_center)
        y_start = max(0, y_int - spread)
        y_end = min(h, y_int + spread)
        if y_start >= y_end:
            continue

        weights = 1.0 - np.abs(np.arange(y_start, y_end) - y_int) / spread
        displacement_map[y_start:y_end, :] += np.outer(weights, dy).astype(np.float32)

    # The accumulated values are vertical baseline offsets; they are used as a
    # proxy for horizontal distortion and rescaled so that the largest
    # magnitude equals the average curvature.
    avg_a = float(np.mean([fit[2][0] for fit in line_fits]))
    if abs(avg_a) > 0:
        max_disp = np.max(np.abs(displacement_map))
        if max_disp > 0:
            displacement_map = displacement_map * (avg_curvature / max_disp)

    confidence = min(1.0, len(line_fits) / 10.0) * 0.8
    result["curvature_px"] = round(avg_curvature, 2)
    result["confidence"] = round(float(confidence), 2)
    result["displacement_map"] = displacement_map

    return result
|
|
|
|
|
|
def _apply_displacement_map(img: np.ndarray, displacement_map: np.ndarray,
                            scale: float = 1.0) -> np.ndarray:
    """Apply a horizontal displacement map to an image using cv2.remap().

    Each output pixel (x, y) is sampled from the input at
    (x + displacement_map[y, x] * scale, y); rows are never shifted vertically.

    Args:
        img: BGR image.
        displacement_map: Array of horizontal pixel shifts, shape (h, w) or
            broadcastable to it. Any numeric dtype is accepted.
        scale: Multiplier for the displacement (-3.0 to +3.0).

    Returns:
        Corrected image.
    """
    h, w = img.shape[:2]

    # Base coordinate grids (identity mapping)
    grid_x, grid_y = np.meshgrid(
        np.arange(w, dtype=np.float32),
        np.arange(h, dtype=np.float32),
    )

    # cv2.remap only accepts float32 coordinate maps. A float64 displacement
    # map (e.g. restored from serialized data) or a NumPy float64 scale would
    # otherwise promote the sum to float64 and make remap fail.
    shifts = np.asarray(displacement_map, dtype=np.float32)
    map_x = np.ascontiguousarray(grid_x + shifts * float(scale), dtype=np.float32)

    return cv2.remap(img, map_x, grid_y,
                     interpolation=cv2.INTER_LINEAR,
                     borderMode=cv2.BORDER_REPLICATE)
|
|
|
|
|
|
def dewarp_image(img: np.ndarray) -> Tuple[np.ndarray, Dict[str, Any]]:
    """Correct book curvature distortion using the better of two detectors.

    Method A (vertical edge) measures the bend of the strongest vertical text
    edge; Method B (text baseline) measures baseline curvature from Tesseract
    word positions. Both are always run.

    Selection: if exactly one method reports at least 5 px curvature together
    with a displacement map, that method is used regardless of confidence.
    Otherwise the method with the higher confidence is used, with Method A
    winning ties. The chosen result is applied only if it has a displacement
    map and at least 2 px curvature.

    Args:
        img: BGR image (already deskewed).

    Returns:
        Tuple of (corrected_image, dewarp_info).
        dewarp_info keys: method, curvature_px, confidence, displacement_map.
        When OpenCV is missing or nothing is applied, the input image is
        returned with method "none", zeros and a None displacement map.
    """
    untouched = dict(method="none", curvature_px=0.0, confidence=0.0, displacement_map=None)

    if not CV2_AVAILABLE:
        return img, untouched

    started = time.time()
    edge_result = _dewarp_by_vertical_edges(img)
    baseline_result = _dewarp_by_text_baseline(img)
    duration = time.time() - started

    logger.info(f"dewarp: vertical_edge conf={edge_result['confidence']:.2f} "
                f"curv={edge_result['curvature_px']:.1f}px | "
                f"text_baseline conf={baseline_result['confidence']:.2f} "
                f"curv={baseline_result['curvature_px']:.1f}px "
                f"({duration:.2f}s)")

    # Results that found real curvature (>= 5 px) and can actually be applied.
    significant = [
        candidate
        for candidate in (edge_result, baseline_result)
        if candidate["curvature_px"] >= 5.0 and candidate["displacement_map"] is not None
    ]

    if len(significant) == 1:
        # Exactly one method saw real curvature: it wins regardless of confidence.
        best = significant[0]
    elif baseline_result["confidence"] > edge_result["confidence"]:
        best = baseline_result
    else:
        best = edge_result

    logger.info(f"dewarp: selected {best['method']} "
                f"(curv={best['curvature_px']:.1f}px, conf={best['confidence']:.2f})")

    applicable = best["displacement_map"] is not None and best["curvature_px"] >= 2.0
    if not applicable:
        return img, untouched

    corrected = _apply_displacement_map(img, best["displacement_map"], scale=1.0)
    info = {key: best[key] for key in ("method", "curvature_px", "confidence", "displacement_map")}
    return corrected, info
|
|
|
|
|
|
def dewarp_image_manual(img: np.ndarray, displacement_map: np.ndarray,
                        scale: float) -> np.ndarray:
    """Re-apply a displacement map with a user-chosen strength.

    Args:
        img: BGR image (deskewed, before dewarp).
        displacement_map: The displacement map from auto-dewarp.
        scale: Manual scale factor; values outside -3.0..+3.0 are clamped.

    Returns:
        Corrected image, or the input image itself when the clamped scale is
        smaller than 0.01 in magnitude.
    """
    bounded = max(-3.0, min(3.0, scale))
    if abs(bounded) >= 0.01:
        return _apply_displacement_map(img, displacement_map, scale=bounded)
    # Effectively zero strength: skip the remap entirely.
    return img
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 4: Dual Image Preparation
|
|
# =============================================================================
|
|
|
|
def create_ocr_image(img: np.ndarray) -> np.ndarray:
    """Produce the binarized page image that is fed to Tesseract.

    Pipeline: grayscale → background normalization (divide by a 51x51
    Gaussian-blurred copy) → Gaussian adaptive threshold (block 31, C=10)
    → 3x3 median filter.

    Args:
        img: BGR image.

    Returns:
        Single-channel binary image (values 0/255) from THRESH_BINARY, i.e.
        pixels brighter than their local threshold become 255.
    """
    grayscale = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Dividing by the blurred image flattens uneven illumination.
    background = cv2.GaussianBlur(grayscale, (51, 51), 0)
    flattened = cv2.divide(grayscale, background, scale=255)

    thresholded = cv2.adaptiveThreshold(
        flattened, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY, 31, 10,
    )

    # Light denoise
    return cv2.medianBlur(thresholded, 3)
|
|
|
|
|
|
def create_layout_image(img: np.ndarray) -> np.ndarray:
    """Build the contrast-enhanced grayscale image used for layout analysis.

    Args:
        img: BGR image.

    Returns:
        Grayscale image after CLAHE (clip limit 2.0, 8x8 tiles).
    """
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    return equalizer.apply(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 5: Layout Analysis (Projection Profiles)
|
|
# =============================================================================
|
|
|
|
def _find_content_bounds(inv: np.ndarray) -> Tuple[int, int, int, int]:
|
|
"""Find the bounding box of actual text content (excluding page margins).
|
|
|
|
Returns:
|
|
Tuple of (left_x, right_x, top_y, bottom_y).
|
|
"""
|
|
h, w = inv.shape[:2]
|
|
|
|
# Horizontal projection for top/bottom
|
|
h_proj = np.sum(inv, axis=1).astype(float) / (w * 255)
|
|
|
|
top_y = 0
|
|
for y in range(h):
|
|
if h_proj[y] > 0.005:
|
|
top_y = max(0, y - 5)
|
|
break
|
|
|
|
bottom_y = h
|
|
for y in range(h - 1, 0, -1):
|
|
if h_proj[y] > 0.005:
|
|
bottom_y = min(h, y + 5)
|
|
break
|
|
|
|
# Vertical projection for left/right margins
|
|
v_proj = np.sum(inv[top_y:bottom_y, :], axis=0).astype(float)
|
|
v_proj_norm = v_proj / ((bottom_y - top_y) * 255) if (bottom_y - top_y) > 0 else v_proj
|
|
|
|
left_x = 0
|
|
for x in range(w):
|
|
if v_proj_norm[x] > 0.005:
|
|
left_x = max(0, x - 2)
|
|
break
|
|
|
|
right_x = w
|
|
for x in range(w - 1, 0, -1):
|
|
if v_proj_norm[x] > 0.005:
|
|
right_x = min(w, x + 2)
|
|
break
|
|
|
|
return left_x, right_x, top_y, bottom_y
|
|
|
|
|
|
def _layout_scan_valleys(profile: np.ndarray, threshold: float) -> List[Tuple[int, int, int, int, float]]:
    """Return runs of *profile* below *threshold* as (start, end, center, width, depth).

    Only runs at least 3 px wide that are closed by a value at or above the
    threshold are reported; a run that is still open at the end of the
    profile is ignored.
    """
    below = profile < threshold
    found = []
    run_start = None
    for x in range(len(profile)):
        if below[x]:
            if run_start is None:
                run_start = x
        elif run_start is not None:
            run_width = x - run_start
            if run_width >= 3:
                depth = float(np.min(profile[run_start:x]))
                found.append((run_start, x, (run_start + x) // 2, run_width, depth))
            run_start = None
    return found


def _layout_segment_valleys(profile: np.ndarray, content_w: int, mean_density: float,
                            left_x: int) -> Optional[List[Tuple[int, int, int, int, float]]]:
    """Fallback column search: pick two sparse segments out of 20 equal slices.

    Returns the best pair of segments (as valley tuples, ordered by position)
    or None when no pair yields three columns of at least 12% width each with
    separators at least 20% of the content width apart.
    """
    seg_count = 20
    seg_width = content_w // seg_count
    seg_scores = []
    for i in range(seg_count):
        sx = i * seg_width
        ex = min((i + 1) * seg_width, content_w)
        seg_scores.append((i, sx, ex, float(np.mean(profile[sx:ex]))))

    seg_scores.sort(key=lambda s: s[3])
    logger.info(f"Layout: segment scores (lowest 5): "
                f"{[(s[0], s[1]+left_x, s[2]+left_x, f'{s[3]:.4f}') for s in seg_scores[:5]]}")

    # Keep segments away from the two outermost slices on each side whose
    # density is clearly (40%) below the overall mean.
    candidates = [
        (sx, ex, (sx + ex) // 2, ex - sx, seg_mean)
        for seg_idx, sx, ex, seg_mean in seg_scores
        if 1 < seg_idx < seg_count - 2 and seg_mean < mean_density * 0.6
    ]
    if len(candidates) < 2:
        return None

    candidates.sort(key=lambda v: v[2])
    best_pair = None
    best_score = float('inf')
    for i, first in enumerate(candidates):
        for second in candidates[i + 1:]:
            c1, c2 = first[2], second[2]
            # Must be at least 20% apart
            if (c2 - c1) < content_w * 0.2:
                continue
            widths = (c1, c2 - c1, content_w - c2)
            # Each column at least 12% of the content width
            if min(widths) < content_w * 0.12:
                continue
            # Lower score = more evenly sized columns
            score = max(widths) - min(widths)
            if score < best_score:
                best_score = score
                best_pair = (first, second)

    return None if best_pair is None else list(best_pair)


def _layout_pick_separators(valleys: List[Tuple[int, int, int, int, float]],
                            content_w: int) -> Tuple[int, int]:
    """Choose two separator centers from *valleys* (sorted by center, len >= 2).

    With more than two valleys, the pair giving the most even three-way split
    is preferred (each column at least 15% wide), with a bonus for wider
    valleys. If no pair qualifies, the first two valleys are used.
    """
    if len(valleys) > 2:
        best_pair = None
        best_score = float('inf')
        for i, first in enumerate(valleys):
            for second in valleys[i + 1:]:
                c1, c2 = first[2], second[2]
                widths = (c1, c2 - c1, content_w - c2)
                # Each column should be at least 15% of content width
                if min(widths) < content_w * 0.15:
                    continue
                # Score: lower is better (more even distribution),
                # minus a bonus for wider valleys.
                score = max(widths) - min(widths)
                score -= (first[3] + second[3]) * 0.5
                if score < best_score:
                    best_score = score
                    best_pair = (c1, c2)
        if best_pair:
            return best_pair
    return valleys[0][2], valleys[1][2]


def analyze_layout(layout_img: np.ndarray, ocr_img: np.ndarray) -> List[PageRegion]:
    """Detect columns, header, and footer using projection profiles.

    Uses content-bounds detection to exclude page margins, then looks for
    low-density valleys in the smoothed vertical projection of the content
    area. Two or more valleys produce a 3-column layout, one valley a
    2-column layout, none a single full-width column.

    Args:
        layout_img: CLAHE-enhanced grayscale image (currently not used by the
            analysis itself).
        ocr_img: Binarized image for text density analysis.

    Returns:
        List of PageRegion objects: the column regions first, followed by a
        'header' and/or 'footer' region when more than 10 px lie above/below
        the content area.
    """
    h, w = ocr_img.shape[:2]

    # Invert: black text on white → white text on black for projection
    inv = cv2.bitwise_not(ocr_img)

    # --- Find actual content bounds (exclude page margins) ---
    left_x, right_x, top_y, bottom_y = _find_content_bounds(inv)
    content_w = right_x - left_x
    content_h = bottom_y - top_y

    logger.info(f"Layout: content bounds x=[{left_x}..{right_x}] ({content_w}px), "
                f"y=[{top_y}..{bottom_y}] ({content_h}px) in {w}x{h} image")

    if content_w < w * 0.3 or content_h < h * 0.3:
        # Fallback if detection seems wrong
        left_x, right_x = 0, w
        top_y, bottom_y = 0, h
        content_w, content_h = w, h

    # --- Vertical projection within content area to find column separators ---
    content_strip = inv[top_y:bottom_y, left_x:right_x]
    v_proj = np.sum(content_strip, axis=0).astype(float)
    v_proj_norm = v_proj / (content_h * 255) if content_h > 0 else v_proj

    # Smooth the projection profile with an odd-width box filter
    kernel_size = max(5, content_w // 50)
    if kernel_size % 2 == 0:
        kernel_size += 1
    v_proj_smooth = np.convolve(v_proj_norm, np.ones(kernel_size) / kernel_size, mode='same')

    # Debug: log projection profile statistics
    p_mean = float(np.mean(v_proj_smooth))
    p_median = float(np.median(v_proj_smooth))
    p_min = float(np.min(v_proj_smooth))
    p_max = float(np.max(v_proj_smooth))
    logger.info(f"Layout: v_proj stats — min={p_min:.4f}, max={p_max:.4f}, "
                f"mean={p_mean:.4f}, median={p_median:.4f}")

    # Strategy 1: threshold relative to median/mean (catches clear separators)
    threshold = max(p_median * 0.3, p_mean * 0.2)
    logger.info(f"Layout: valley threshold={threshold:.4f}")

    all_valleys = _layout_scan_valleys(v_proj_smooth, threshold)

    logger.info(f"Layout: raw valleys (before filter): {len(all_valleys)} — "
                f"{[(v[0]+left_x, v[1]+left_x, v[3], f'{v[4]:.4f}') for v in all_valleys[:10]]}")

    # Filter: valleys must be inside the content area (not at edges)
    inner_margin = int(content_w * 0.08)
    valleys = [v for v in all_valleys if inner_margin < v[2] < content_w - inner_margin]

    # Strategy 2: local minima approach (catches subtle gaps)
    if len(valleys) < 2:
        logger.info("Layout: trying local minima approach for column detection")
        fallback = _layout_segment_valleys(v_proj_smooth, content_w, p_mean, left_x)
        if fallback is not None:
            valleys = fallback
            logger.info(f"Layout: local minima found 2 valleys: "
                        f"{[(v[0]+left_x, v[1]+left_x, v[3]) for v in valleys]}")

    logger.info(f"Layout: final {len(valleys)} valleys: "
                f"{[(v[0]+left_x, v[1]+left_x, v[3]) for v in valleys]}")

    regions = []

    if len(valleys) >= 2:
        # 3-column layout detected
        valleys.sort(key=lambda v: v[2])
        sep1_center, sep2_center = _layout_pick_separators(valleys, content_w)

        # Convert from content-relative to absolute coordinates
        abs_sep1 = sep1_center + left_x
        abs_sep2 = sep2_center + left_x

        logger.info(f"Layout: 3 columns at separators x={abs_sep1}, x={abs_sep2} "
                    f"(widths: {abs_sep1}, {abs_sep2-abs_sep1}, {w-abs_sep2})")

        # Outer columns extend to the page edges, not just the content bounds.
        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=abs_sep1, height=content_h
        ))
        regions.append(PageRegion(
            type='column_de', x=abs_sep1, y=top_y,
            width=abs_sep2 - abs_sep1, height=content_h
        ))
        regions.append(PageRegion(
            type='column_example', x=abs_sep2, y=top_y,
            width=w - abs_sep2, height=content_h
        ))

    elif len(valleys) == 1:
        # 2-column layout
        abs_sep = valleys[0][2] + left_x

        logger.info(f"Layout: 2 columns at separator x={abs_sep}")

        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=abs_sep, height=content_h
        ))
        regions.append(PageRegion(
            type='column_de', x=abs_sep, y=top_y,
            width=w - abs_sep, height=content_h
        ))

    else:
        # No columns detected — run full-page OCR as single column
        logger.warning("Layout: no column separators found, using full page")
        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=w, height=content_h
        ))

    # Add header/footer info
    has_header = top_y > 10
    has_footer = bottom_y < h - 10
    if has_header:
        regions.append(PageRegion(
            type='header', x=0, y=0,
            width=w, height=top_y
        ))
    if has_footer:
        regions.append(PageRegion(
            type='footer', x=0, y=bottom_y,
            width=w, height=h - bottom_y
        ))

    col_count = len([r for r in regions if r.type.startswith('column')])
    logger.info(f"Layout: {col_count} columns, "
                f"header={'yes' if has_header else 'no'}, "
                f"footer={'yes' if has_footer else 'no'}")

    return regions
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 6: Multi-Pass OCR
|
|
# =============================================================================
|
|
|
|
def ocr_region(ocr_img: np.ndarray, region: PageRegion, lang: str,
               psm: int, fallback_psm: Optional[int] = None,
               min_confidence: float = 40.0) -> List[Dict[str, Any]]:
    """Run Tesseract OCR on one page region with the given PSM.

    The region is cropped out of ``ocr_img``, recognised in a single
    Tesseract call and converted into word dicts whose coordinates are
    shifted back into full-page space. If ``fallback_psm`` is given and the
    mean word confidence is below ``min_confidence``, the region is OCRed
    again line by line; the fallback result is only used when it actually
    contains words.

    Args:
        ocr_img: Binarized full-page image.
        region: Region to crop and OCR.
        lang: Tesseract language string.
        psm: Page Segmentation Mode.
        fallback_psm: If confidence too low, retry with this PSM per line.
        min_confidence: Minimum average confidence before fallback.

    Returns:
        List of word dicts with the keys ``text``, ``left``, ``top``,
        ``width``, ``height``, ``conf`` and ``region_type``. Empty when the
        crop is empty or Tesseract raises.
    """
    crop = ocr_img[region.y:region.y + region.height,
                   region.x:region.x + region.width]
    if crop.size == 0:
        return []

    # pytesseract takes a PIL image, not a numpy array
    pil_img = Image.fromarray(crop)

    config = f'--psm {psm} --oem 3'
    try:
        data = pytesseract.image_to_data(pil_img, lang=lang, config=config,
                                         output_type=pytesseract.Output.DICT)
    except Exception as e:
        logger.warning(f"Tesseract failed for region {region.type}: {e}")
        return []

    words: List[Dict[str, Any]] = []
    for i, raw_text in enumerate(data['text']):
        text = raw_text.strip()
        # The confidence may arrive as int, float or a float-formatted string
        # depending on the Tesseract/pytesseract versions; a bare int() would
        # raise on values such as '96.4' and abort the whole page.
        try:
            conf = int(float(data['conf'][i]))
        except (TypeError, ValueError):
            continue
        if not text or conf < 10:
            continue
        words.append({
            'text': text,
            'left': data['left'][i] + region.x,  # Absolute coords
            'top': data['top'][i] + region.y,
            'width': data['width'][i],
            'height': data['height'][i],
            'conf': conf,
            'region_type': region.type,
        })

    # Retry line by line when the overall confidence is too low
    if words and fallback_psm is not None:
        avg_conf = sum(w['conf'] for w in words) / len(words)
        if avg_conf < min_confidence:
            logger.info(f"Region {region.type}: avg confidence {avg_conf:.0f}% < {min_confidence}%, "
                        f"trying fallback PSM {fallback_psm}")
            fallback_words = _ocr_region_line_by_line(ocr_img, region, lang, fallback_psm)
            # Keep the first-pass words if the fallback found nothing, so a
            # failed retry cannot wipe out an otherwise usable result.
            if fallback_words:
                words = fallback_words

    return words
|
|
|
|
|
|
def _ocr_region_line_by_line(ocr_img: np.ndarray, region: PageRegion,
                             lang: str, psm: int) -> List[Dict[str, Any]]:
    """OCR a region line by line (fallback for low-confidence regions).

    The region is split into horizontal strips using a row-wise projection
    of the inverted crop; each strip is then OCRed on its own with the
    given PSM. A strip on which Tesseract raises is logged and skipped so
    the remaining strips are still processed.

    Args:
        ocr_img: Full-page image to crop from (the caller passes the
            binarized OCR image).
        region: Region to crop and OCR.
        lang: Tesseract language string.
        psm: Page Segmentation Mode used for every strip.

    Returns:
        List of word dicts (same keys as ``ocr_region``) in full-page
        coordinates.
    """
    crop = ocr_img[region.y:region.y + region.height,
                   region.x:region.x + region.width]
    if crop.size == 0:
        return []

    # Row-wise projection of the inverted crop: rows holding dark pixels
    # get large sums (assumes dark text on a light background).
    inv = cv2.bitwise_not(crop)
    h_proj = np.sum(inv, axis=1)
    peak = np.max(h_proj)
    threshold = peak * 0.05 if peak > 0 else 0

    # Collect (start, end) row ranges whose projection exceeds the threshold
    lines = []
    in_text = False
    line_start = 0
    for y, row_sum in enumerate(h_proj):
        if row_sum > threshold and not in_text:
            line_start = y
            in_text = True
        elif row_sum <= threshold and in_text:
            if y - line_start > 5:  # Minimum line height
                lines.append((line_start, y))
            in_text = False
    # A line still open at the bottom edge of the crop
    if in_text and len(h_proj) - line_start > 5:
        lines.append((line_start, len(h_proj)))

    all_words: List[Dict[str, Any]] = []
    config = f'--psm {psm} --oem 3'
    pad = 3  # small vertical margin around every strip

    for line_y_start, line_y_end in lines:
        y1 = max(0, line_y_start - pad)
        y2 = min(crop.shape[0], line_y_end + pad)
        line_crop = crop[y1:y2, :]
        if line_crop.size == 0:
            continue

        pil_img = Image.fromarray(line_crop)
        try:
            data = pytesseract.image_to_data(pil_img, lang=lang, config=config,
                                             output_type=pytesseract.Output.DICT)
        except Exception as e:
            # Best effort: skip this strip, but leave a trace in the log
            logger.warning(f"Tesseract failed for line {y1}-{y2} in region {region.type}: {e}")
            continue

        for i, raw_text in enumerate(data['text']):
            text = raw_text.strip()
            # Confidence may be int, float or a float-formatted string
            # depending on the Tesseract/pytesseract versions.
            try:
                conf = int(float(data['conf'][i]))
            except (TypeError, ValueError):
                continue
            if not text or conf < 10:
                continue
            all_words.append({
                'text': text,
                'left': data['left'][i] + region.x,
                # strip-relative -> region-relative -> page-absolute
                'top': data['top'][i] + region.y + y1,
                'width': data['width'][i],
                'height': data['height'][i],
                'conf': conf,
                'region_type': region.type,
            })

    return all_words
|
|
|
|
|
|
def run_multi_pass_ocr(ocr_img: np.ndarray,
                       regions: List[PageRegion],
                       lang: str = "eng+deu") -> Dict[str, List[Dict]]:
    """OCR every content region with settings chosen by its region type.

    Header and footer regions are ignored. The English and German columns
    are read with a fixed single language and PSM 4, the example column
    with ``lang``, PSM 6 and a per-line PSM 7 fallback, and any other
    region type with ``lang`` and PSM 6.

    Args:
        ocr_img: Binarized full-page image.
        regions: Detected page regions.
        lang: Language string for the example column and unknown regions.

    Returns:
        Dict mapping region type to list of word dicts.
    """
    # Keyword arguments passed to ocr_region, keyed by region type
    settings_by_type: Dict[str, Dict[str, Any]] = {
        'column_en': {'lang': 'eng', 'psm': 4},
        'column_de': {'lang': 'deu', 'psm': 4},
        'column_example': {'lang': lang, 'psm': 6,
                           'fallback_psm': 7, 'min_confidence': 40.0},
    }
    default_settings: Dict[str, Any] = {'lang': lang, 'psm': 6}

    words_by_type: Dict[str, List[Dict]] = {}
    for page_region in regions:
        kind = page_region.type
        if kind in ('header', 'footer'):
            continue  # non-content regions are not OCRed

        ocr_kwargs = settings_by_type.get(kind, default_settings)
        found = ocr_region(ocr_img, page_region, **ocr_kwargs)

        words_by_type[kind] = found
        logger.info(f"OCR {kind}: {len(found)} words")

    return words_by_type
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 7: Line Alignment → Vocabulary Entries
|
|
# =============================================================================
|
|
|
|
def _group_words_into_lines(words: List[Dict], y_tolerance_px: int = 20) -> List[List[Dict]]:
|
|
"""Group words by Y position into lines, sorted by X within each line."""
|
|
if not words:
|
|
return []
|
|
|
|
sorted_words = sorted(words, key=lambda w: (w['top'], w['left']))
|
|
lines: List[List[Dict]] = []
|
|
current_line: List[Dict] = [sorted_words[0]]
|
|
current_y = sorted_words[0]['top']
|
|
|
|
for word in sorted_words[1:]:
|
|
if abs(word['top'] - current_y) <= y_tolerance_px:
|
|
current_line.append(word)
|
|
else:
|
|
current_line.sort(key=lambda w: w['left'])
|
|
lines.append(current_line)
|
|
current_line = [word]
|
|
current_y = word['top']
|
|
|
|
if current_line:
|
|
current_line.sort(key=lambda w: w['left'])
|
|
lines.append(current_line)
|
|
|
|
return lines
|
|
|
|
|
|
def match_lines_to_vocab(ocr_results: Dict[str, List[Dict]],
                         regions: List[PageRegion],
                         y_tolerance_px: int = 25) -> List[VocabRow]:
    """Align OCR results from different columns into vocabulary rows.

    Every English line becomes one row. The German line and the example
    line whose vertical centre is closest to the English line (and closer
    than ``y_tolerance_px``) are attached to it. Example lines that were not
    attached to any row are treated as wrapped text and appended to the
    nearest row above them, up to ``3 * y_tolerance_px`` away.

    Args:
        ocr_results: Dict mapping region type to word lists.
        regions: Detected regions (for reference; not read here).
        y_tolerance_px: Max Y-distance to consider words on the same row.

    Returns:
        List of VocabRow objects sorted by ``y_position``.
    """
    def line_y_center(line: List[Dict]) -> float:
        return sum(w['top'] + w['height'] / 2 for w in line) / len(line)

    def line_text(line: List[Dict]) -> str:
        return ' '.join(w['text'] for w in line)

    def line_confidence(line: List[Dict]) -> float:
        return sum(w['conf'] for w in line) / len(line) if line else 0

    def nearest_line_index(centers: List[float], y: float) -> int:
        """Index of the centre closest to ``y`` within tolerance, else -1."""
        best_idx = -1
        best_dist = float('inf')
        for idx, center in enumerate(centers):
            dist = abs(center - y)
            if dist < y_tolerance_px and dist < best_dist:
                best_dist = dist
                best_idx = idx
        return best_idx

    # Group words into lines per column
    en_lines = _group_words_into_lines(ocr_results.get('column_en', []), y_tolerance_px)
    de_lines = _group_words_into_lines(ocr_results.get('column_de', []), y_tolerance_px)
    ex_lines = _group_words_into_lines(ocr_results.get('column_example', []), y_tolerance_px)

    # Centres are needed once per English line, so compute them up front
    de_centers = [line_y_center(line) for line in de_lines]
    ex_centers = [line_y_center(line) for line in ex_lines]

    # Build EN entries as the primary reference
    vocab_rows: List[VocabRow] = []
    # Indices of example lines that ended up in some row. Tracking the index
    # (rather than comparing Y positions afterwards) keeps a wrapped line
    # that sits close to a row from being mistaken for the matched line and
    # dropped, and keeps a matched line from being appended a second time
    # because of the int() rounding of y_position.
    matched_ex_indices = set()

    for en_line in en_lines:
        en_text = line_text(en_line)

        # Skip very short or likely header content
        if len(en_text.strip()) < 2:
            continue

        en_y = line_y_center(en_line)
        confidences = [line_confidence(en_line)]

        # Find matching DE line
        de_text = ""
        de_idx = nearest_line_index(de_centers, en_y)
        if de_idx >= 0:
            de_text = line_text(de_lines[de_idx])
            de_conf = line_confidence(de_lines[de_idx])
            if de_conf > 0:
                confidences.append(de_conf)

        # Find matching example line
        ex_text = ""
        ex_idx = nearest_line_index(ex_centers, en_y)
        if ex_idx >= 0:
            matched_ex_indices.add(ex_idx)
            ex_text = line_text(ex_lines[ex_idx])
            ex_conf = line_confidence(ex_lines[ex_idx])
            if ex_conf > 0:
                confidences.append(ex_conf)

        vocab_rows.append(VocabRow(
            english=en_text.strip(),
            german=de_text.strip(),
            example=ex_text.strip(),
            confidence=sum(confidences) / len(confidences),
            y_position=int(en_y),
        ))

    # Handle multi-line wrapping in example column:
    # an example line that matched no row is appended to the nearest row
    # above it.
    for idx, ex_line in enumerate(ex_lines):
        if idx in matched_ex_indices:
            continue

        ex_y = ex_centers[idx]
        best_row = None
        best_dist = float('inf')
        for row in vocab_rows:
            dist = ex_y - row.y_position
            if 0 < dist < y_tolerance_px * 3 and dist < best_dist:
                best_dist = dist
                best_row = row

        if best_row is not None:
            continuation = line_text(ex_line).strip()
            if continuation:
                best_row.example = (best_row.example + " " + continuation).strip()

    # Sort by Y position
    vocab_rows.sort(key=lambda r: r.y_position)

    return vocab_rows
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 8: Optional LLM Post-Correction
|
|
# =============================================================================
|
|
|
|
async def llm_post_correct(img: np.ndarray, vocab_rows: List[VocabRow],
                           confidence_threshold: float = 50.0,
                           enabled: bool = False) -> List[VocabRow]:
    """Placeholder for LLM (Qwen-VL) correction of low-confidence rows.

    The correction itself is not implemented yet: the rows are handed back
    unchanged in every case. When ``enabled`` is true a log line records
    that the step was skipped.

    Args:
        img: Original BGR image (not read yet).
        vocab_rows: Current vocabulary rows.
        confidence_threshold: Rows below this are meant to get LLM
            correction (not read yet).
        enabled: Whether the correction step was requested.

    Returns:
        The same ``vocab_rows`` object that was passed in.
    """
    if enabled:
        # TODO: Implement Qwen-VL correction for low-confidence entries
        # For each row with confidence < threshold:
        #   1. Crop the relevant region from img
        #   2. Send crop + OCR text to Qwen-VL
        #   3. Replace text if LLM provides a confident correction
        logger.info("LLM post-correction skipped (not yet implemented)")

    return vocab_rows
|
|
|
|
|
|
# =============================================================================
|
|
# Orchestrator
|
|
# =============================================================================
|
|
|
|
async def run_cv_pipeline(
    pdf_data: Optional[bytes] = None,
    image_data: Optional[bytes] = None,
    page_number: int = 0,
    zoom: float = 3.0,
    enable_dewarp: bool = True,
    enable_llm_correction: bool = False,
    lang: str = "eng+deu",
) -> PipelineResult:
    """Run all pipeline stages on one page and collect the vocabulary.

    Stages, in order: render, deskew, optional dewarp, image preparation,
    layout analysis, multi-pass OCR, line alignment and optional LLM
    correction. The wall-clock time of every executed stage is stored in
    ``result.stages``. Any exception raised by a stage is caught, logged and
    reported through ``result.error`` instead of propagating.

    Args:
        pdf_data: Raw PDF bytes; used in preference to ``image_data``.
        image_data: Raw image bytes; used when ``pdf_data`` is empty.
        page_number: 0-indexed page number (for PDF).
        zoom: PDF rendering zoom factor.
        enable_dewarp: Whether to run dewarp stage.
        enable_llm_correction: Whether to run LLM post-correction.
        lang: Tesseract language string.

    Returns:
        PipelineResult with vocabulary and timing info, or with ``error``
        set when the pipeline is unavailable, no input was given or a stage
        failed.
    """
    if not CV_PIPELINE_AVAILABLE:
        return PipelineResult(error="CV pipeline not available (OpenCV or Tesseract missing)")

    def seconds_since(started: float) -> float:
        # Elapsed wall-clock time, rounded the same way for every stage
        return round(time.time() - started, 2)

    result = PipelineResult()
    pipeline_started = time.time()

    try:
        # Stage 1: Render
        stage_started = time.time()
        if pdf_data:
            img = render_pdf_high_res(pdf_data, page_number, zoom)
        elif image_data:
            img = render_image_high_res(image_data)
        else:
            return PipelineResult(error="No input data (pdf_data or image_data required)")
        result.stages['render'] = seconds_since(stage_started)
        page_height, page_width = img.shape[0], img.shape[1]
        result.image_width = page_width
        result.image_height = page_height
        logger.info(f"Stage 1 (render): {page_width}x{page_height} in {result.stages['render']}s")

        # Stage 2: Deskew
        stage_started = time.time()
        img, angle = deskew_image(img)
        result.stages['deskew'] = seconds_since(stage_started)
        logger.info(f"Stage 2 (deskew): {angle:.2f}° in {result.stages['deskew']}s")

        # Stage 3: Dewarp
        if enable_dewarp:
            stage_started = time.time()
            img = dewarp_image(img)
            result.stages['dewarp'] = seconds_since(stage_started)

        # Stage 4: Dual image preparation
        stage_started = time.time()
        ocr_img = create_ocr_image(img)
        layout_img = create_layout_image(img)
        result.stages['image_prep'] = seconds_since(stage_started)

        # Stage 5: Layout analysis
        stage_started = time.time()
        regions = analyze_layout(layout_img, ocr_img)
        result.stages['layout'] = seconds_since(stage_started)
        result.columns_detected = sum(
            1 for region in regions if region.type.startswith('column')
        )
        logger.info(f"Stage 5 (layout): {result.columns_detected} columns in {result.stages['layout']}s")

        # Stage 6: Multi-pass OCR
        stage_started = time.time()
        ocr_results = run_multi_pass_ocr(ocr_img, regions, lang)
        result.stages['ocr'] = seconds_since(stage_started)
        total_words = sum(map(len, ocr_results.values()))
        result.word_count = total_words
        logger.info(f"Stage 6 (OCR): {total_words} words in {result.stages['ocr']}s")

        # Stage 7: Line alignment
        stage_started = time.time()
        vocab_rows = match_lines_to_vocab(ocr_results, regions)
        result.stages['alignment'] = seconds_since(stage_started)

        # Stage 8: Optional LLM correction
        if enable_llm_correction:
            stage_started = time.time()
            vocab_rows = await llm_post_correct(img, vocab_rows)
            result.stages['llm_correction'] = seconds_since(stage_started)

        # Convert to output format, leaving out rows that have neither an
        # English nor a German text
        entries = []
        for row in vocab_rows:
            if not (row.english or row.german):
                continue
            entries.append({
                "english": row.english,
                "german": row.german,
                "example": row.example,
                "confidence": round(row.confidence, 1),
            })
        result.vocabulary = entries

        result.duration_seconds = seconds_since(pipeline_started)
        logger.info(f"CV Pipeline complete: {len(result.vocabulary)} entries in {result.duration_seconds}s")

    except Exception as e:
        logger.error(f"CV Pipeline error: {e}")
        import traceback
        logger.debug(traceback.format_exc())
        result.error = str(e)
        result.duration_seconds = seconds_since(pipeline_started)

    return result
|