Begradigt schiefe Scans vor der OCR-Extraktion anhand der linksbuendigen
Wortanfaenge der Vokabelspalte. Tesseract liefert achsenparallele Boxen,
die bei ~2-3 Grad Schraege in Nachbarzeilen bluten — der Deskew behebt das.
- Neue Funktion deskew_image_by_word_alignment() in cv_vocab_pipeline.py
- Deskew-Integration im extract-with-boxes Endpoint (vor OCR)
- Neuer GET Endpoint /deskewed-image/{page} fuer begradigtes Seitenbild
- Frontend: GroundTruthPanel wechselt nach Extraktion auf deskewed Image
- ~1s Overhead durch schnellen Tesseract-Pass auf halbiertem Bild
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1141 lines
39 KiB
Python
1141 lines
39 KiB
Python
"""
|
|
CV-based Document Reconstruction Pipeline for Vocabulary Extraction.
|
|
|
|
Uses classical Computer Vision techniques for high-quality OCR:
|
|
- High-resolution PDF rendering (zoom 3.0 on the 72 DPI PDF base = 216 DPI)
|
|
- Deskew (rotation correction via Hough Lines)
|
|
- Dewarp (book curvature correction) — pass-through initially
|
|
- Dual image preparation (binarized for OCR, CLAHE for layout)
|
|
- Projection-profile layout analysis (column/row detection)
|
|
- Multi-pass Tesseract OCR with region-specific PSM settings
|
|
- Y-coordinate line alignment for vocabulary matching
|
|
- Optional LLM post-correction for low-confidence regions
|
|
|
|
Lizenz: Apache 2.0 (kommerziell nutzbar)
|
|
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
|
|
"""
|
|
|
|
import io
|
|
import logging
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
|
|
import numpy as np
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# --- Availability Guards ---
|
|
|
|
try:
|
|
import cv2
|
|
CV2_AVAILABLE = True
|
|
except ImportError:
|
|
cv2 = None
|
|
CV2_AVAILABLE = False
|
|
logger.warning("OpenCV not available — CV pipeline disabled")
|
|
|
|
try:
|
|
import pytesseract
|
|
from PIL import Image
|
|
TESSERACT_AVAILABLE = True
|
|
except ImportError:
|
|
pytesseract = None
|
|
Image = None
|
|
TESSERACT_AVAILABLE = False
|
|
logger.warning("pytesseract/Pillow not available — CV pipeline disabled")
|
|
|
|
CV_PIPELINE_AVAILABLE = CV2_AVAILABLE and TESSERACT_AVAILABLE
|
|
|
|
|
|
# --- Data Classes ---
|
|
|
|
@dataclass
class PageRegion:
    """A detected rectangular region on the page.

    Instances are produced by the layout analysis and consumed by the OCR
    stage, which crops ``[y:y + height, x:x + width]`` out of the page image.
    """
    # Region kind: 'column_en', 'column_de', 'column_example', 'header' or 'footer'.
    type: str
    # Left edge of the region in page-image pixel coordinates.
    x: int
    # Top edge of the region in page-image pixel coordinates.
    y: int
    # Horizontal extent of the region in pixels.
    width: int
    # Vertical extent of the region in pixels.
    height: int
|
|
|
|
|
|
@dataclass
class VocabRow:
    """A single vocabulary entry assembled from multi-column OCR."""
    # Text recognised in the English column for this row.
    english: str = ""
    # Text of the German line matched to the English line ("" if none matched).
    german: str = ""
    # Text of the matched example line, plus any appended continuation lines.
    example: str = ""
    # Mean OCR confidence over the columns that contributed text to this row.
    confidence: float = 0.0
    # Vertical centre (pixels) of the English line; rows are sorted by this value.
    y_position: int = 0
|
|
|
|
|
|
@dataclass
class PipelineResult:
    """Complete result of the CV pipeline."""
    # Extracted vocabulary entries as plain dicts.
    # NOTE(review): presumably serialised VocabRow objects — confirm against the orchestrator.
    vocabulary: List[Dict[str, Any]] = field(default_factory=list)
    # NOTE(review): presumably the number of extracted entries/words — confirm.
    word_count: int = 0
    # Number of text columns found by the layout analysis.
    columns_detected: int = 0
    # Total wall-clock duration of the pipeline run, in seconds.
    duration_seconds: float = 0.0
    # NOTE(review): presumably per-stage durations keyed by stage name — confirm.
    stages: Dict[str, float] = field(default_factory=dict)
    # Human-readable error message when the pipeline could not run; None otherwise.
    error: Optional[str] = None
    # Dimensions of the processed page image in pixels.
    image_width: int = 0
    image_height: int = 0
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 1: High-Resolution PDF Rendering
|
|
# =============================================================================
|
|
|
|
def render_pdf_high_res(pdf_data: bytes, page_number: int = 0, zoom: float = 3.0) -> np.ndarray:
    """Render a PDF page to a high-resolution numpy array (BGR).

    Args:
        pdf_data: Raw PDF bytes.
        page_number: 0-indexed page number.
        zoom: Zoom factor applied to both axes. PyMuPDF renders at 72 DPI for
            zoom 1.0, so the default of 3.0 yields 216 DPI.

    Returns:
        numpy array in BGR format (a fresh copy that does not reference the
        PDF document or its pixmap).

    Raises:
        ValueError: If ``page_number`` is beyond the last page of the PDF.
    """
    import fitz  # PyMuPDF

    pdf_doc = fitz.open(stream=pdf_data, filetype="pdf")
    try:
        if page_number >= pdf_doc.page_count:
            raise ValueError(f"Page {page_number} does not exist (PDF has {pdf_doc.page_count} pages)")

        page = pdf_doc[page_number]
        pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom))

        # View the pixmap samples as (rows, cols, channels); cvtColor below
        # allocates a new array, so the result outlives the pixmap/document.
        img_data = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.h, pix.w, pix.n)
        if pix.n == 4:  # RGBA
            return cv2.cvtColor(img_data, cv2.COLOR_RGBA2BGR)
        if pix.n == 3:  # RGB
            return cv2.cvtColor(img_data, cv2.COLOR_RGB2BGR)
        # Anything else is treated as grayscale.
        return cv2.cvtColor(img_data, cv2.COLOR_GRAY2BGR)
    finally:
        # Always release the document, including on the ValueError path above.
        pdf_doc.close()
|
|
|
|
|
|
def render_image_high_res(image_data: bytes) -> np.ndarray:
    """Load an image (PNG/JPEG) into a numpy array (BGR).

    Args:
        image_data: Raw image bytes.

    Returns:
        numpy array in BGR format.

    Raises:
        ValueError: If the bytes are empty or cannot be decoded as an image.
    """
    # cv2.imdecode asserts on an empty buffer (raising cv2.error); report that
    # case through the same ValueError callers already get for undecodable data.
    if not image_data:
        raise ValueError("Could not decode image data")

    img_array = np.frombuffer(image_data, dtype=np.uint8)
    img_bgr = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
    if img_bgr is None:
        raise ValueError("Could not decode image data")
    return img_bgr
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 2: Deskew (Rotation Correction)
|
|
# =============================================================================
|
|
|
|
def deskew_image(img: np.ndarray) -> Tuple[np.ndarray, float]:
    """Straighten a page using the median angle of long, near-horizontal lines.

    Line segments are found with a probabilistic Hough transform on the
    Otsu-binarized page. Only segments within ±15° of horizontal contribute,
    the applied correction is capped at ±5°, and corrections below 0.1° are
    skipped.

    Args:
        img: BGR image.

    Returns:
        Tuple of (corrected image, detected angle in degrees). The input image
        and 0.0 are returned when no correction is applied.
    """
    grayscale = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Inverted Otsu threshold: ink becomes foreground for the Hough transform.
    _, ink_mask = cv2.threshold(grayscale, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)

    segments = cv2.HoughLinesP(ink_mask, 1, np.pi / 180, threshold=100,
                               minLineLength=img.shape[1] // 4, maxLineGap=20)
    if segments is None or len(segments) < 3:
        return img, 0.0

    # Angle of every segment, keeping only the near-horizontal ones.
    near_horizontal = []
    for segment in segments:
        x_start, y_start, x_end, y_end = segment[0]
        segment_angle = np.degrees(np.arctan2(y_end - y_start, x_end - x_start))
        if abs(segment_angle) < 15:
            near_horizontal.append(segment_angle)

    if not near_horizontal:
        return img, 0.0

    median_angle = float(np.median(near_horizontal))

    # Cap the correction at ±5°.
    if abs(median_angle) > 5.0:
        median_angle = 5.0 * np.sign(median_angle)

    # Not worth resampling the image for a negligible angle.
    if abs(median_angle) < 0.1:
        return img, 0.0

    height, width = img.shape[:2]
    rotation = cv2.getRotationMatrix2D((width // 2, height // 2), median_angle, 1.0)
    corrected = cv2.warpAffine(img, rotation, (width, height),
                               flags=cv2.INTER_LINEAR,
                               borderMode=cv2.BORDER_REPLICATE)

    logger.info(f"Deskew: corrected {median_angle:.2f}° rotation")
    return corrected, median_angle
|
|
|
|
|
|
def deskew_image_by_word_alignment(
    image_data: bytes,
    lang: str = "eng+deu",
    downscale_factor: float = 0.5,
) -> Tuple[bytes, float]:
    """Correct rotation by fitting a line through left-most word starts per text line.

    Runs a quick Tesseract pass on a downscaled copy to find word positions,
    takes the left-most word of every text line, keeps the points that lie
    within 3% of the image width around the median left edge, fits
    ``x = a*y + b`` through them and rotates the full-resolution image so that
    this edge becomes vertical.

    Args:
        image_data: Raw image bytes (PNG/JPEG).
        lang: Tesseract language string for the quick pass.
        downscale_factor: Shrink factor for the quick Tesseract pass (0.5 = 50%).
            Must be positive.

    Returns:
        Tuple of (image bytes, detected angle in degrees). The angle is
        ``arctan(dx/dy)`` of the fitted left text edge, clamped to ±5°; a
        positive value means the edge drifts to the right towards the bottom
        of the page. When a correction is applied the bytes are PNG-encoded;
        otherwise (missing dependencies, undecodable input, fewer than 5
        usable lines, angle below 0.05°, any failure) the original bytes are
        returned together with 0.0.
    """
    from collections import defaultdict

    if not CV2_AVAILABLE or not TESSERACT_AVAILABLE:
        return image_data, 0.0

    # cv2.imdecode asserts on empty buffers and a non-positive factor would
    # produce an invalid resize target / division by zero below.
    if not image_data or downscale_factor <= 0:
        logger.warning("deskew_by_word_alignment: empty image data or invalid downscale factor")
        return image_data, 0.0

    # 1. Decode image
    img = cv2.imdecode(np.frombuffer(image_data, dtype=np.uint8), cv2.IMREAD_COLOR)
    if img is None:
        logger.warning("deskew_by_word_alignment: could not decode image")
        return image_data, 0.0

    orig_h, orig_w = img.shape[:2]

    # 2. Downscale for fast Tesseract pass (never below 1px per side)
    small_w = max(1, int(orig_w * downscale_factor))
    small_h = max(1, int(orig_h * downscale_factor))
    small = cv2.resize(img, (small_w, small_h), interpolation=cv2.INTER_AREA)

    # 3. Quick Tesseract — word-level positions
    pil_small = Image.fromarray(cv2.cvtColor(small, cv2.COLOR_BGR2RGB))
    try:
        data = pytesseract.image_to_data(
            pil_small, lang=lang, config="--psm 6 --oem 3",
            output_type=pytesseract.Output.DICT,
        )
    except Exception as e:
        logger.warning(f"deskew_by_word_alignment: Tesseract failed: {e}")
        return image_data, 0.0

    # 4. Group confident, non-empty words by (block_num, par_num, line_num)
    line_groups: Dict[tuple, list] = defaultdict(list)
    for i, raw_text in enumerate(data["text"]):
        if not str(raw_text or "").strip():
            continue
        try:
            # Confidence may arrive as int, float or a float-formatted string.
            conf = float(data["conf"][i])
        except (TypeError, ValueError):
            continue
        if conf < 20:
            continue
        key = (data["block_num"][i], data["par_num"][i], data["line_num"][i])
        line_groups[key].append(i)

    if len(line_groups) < 5:
        logger.info(f"deskew_by_word_alignment: only {len(line_groups)} lines, skipping")
        return image_data, 0.0

    # For each line take the word with the smallest 'left' and record
    # (left_x, center_y), scaled back to original-image coordinates.
    scale = 1.0 / downscale_factor
    left_edges = []
    center_ys = []
    for indices in line_groups.values():
        first = min(indices, key=lambda idx: data["left"][idx])
        left_edges.append(data["left"][first] * scale)
        center_ys.append((data["top"][first] + data["height"][first] / 2.0) * scale)

    # 5. Keep the dominant left-edge column and fit a line through it
    xs = np.array(left_edges)
    ys = np.array(center_ys)
    tolerance = orig_w * 0.03  # 3% of image width
    aligned = np.abs(xs - float(np.median(xs))) <= tolerance
    filtered_xs = xs[aligned]
    filtered_ys = ys[aligned]

    if len(filtered_xs) < 5:
        logger.info(f"deskew_by_word_alignment: only {len(filtered_xs)} aligned points after filter, skipping")
        return image_data, 0.0

    # polyfit: x = a*y + b → a = dx/dy → angle = arctan(a)
    slope = np.polyfit(filtered_ys, filtered_xs, 1)[0]
    angle_deg = float(np.degrees(np.arctan(slope)))

    # Clamp to ±5°
    angle_deg = max(-5.0, min(5.0, angle_deg))

    logger.info(f"deskew_by_word_alignment: detected {angle_deg:.2f}° from {len(filtered_xs)} points "
                f"(total lines: {len(line_groups)})")

    if abs(angle_deg) < 0.05:
        return image_data, 0.0

    # 6. Rotate full-res image.
    # A left edge drifting right with increasing y (angle_deg > 0) means the
    # page is rotated counter-clockwise. getRotationMatrix2D treats positive
    # angles as counter-clockwise, so the correction is the NEGATED edge angle;
    # passing angle_deg itself would double the skew instead of removing it.
    center = (orig_w // 2, orig_h // 2)
    M = cv2.getRotationMatrix2D(center, -angle_deg, 1.0)
    rotated = cv2.warpAffine(img, M, (orig_w, orig_h),
                             flags=cv2.INTER_LINEAR,
                             borderMode=cv2.BORDER_REPLICATE)

    # Encode back to PNG
    success, png_buf = cv2.imencode(".png", rotated)
    if not success:
        logger.warning("deskew_by_word_alignment: PNG encoding failed")
        return image_data, 0.0

    return png_buf.tobytes(), angle_deg
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 3: Dewarp (Book Curvature) — Pass-Through for now
|
|
# =============================================================================
|
|
|
|
def dewarp_image(img: np.ndarray) -> np.ndarray:
    """Placeholder for book-curvature correction.

    No dewarping is performed yet: the input array is handed back unchanged
    (the very same object, not a copy).

    Args:
        img: BGR image.

    Returns:
        The image passed in.
    """
    # TODO: Implement polynomial fitting + cv2.remap() for book curvature
    return img
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 4: Dual Image Preparation
|
|
# =============================================================================
|
|
|
|
def create_ocr_image(img: np.ndarray) -> np.ndarray:
    """Produce the binarized page image that is fed to Tesseract.

    Pipeline: grayscale → divide by a heavily blurred copy to flatten uneven
    illumination → Gaussian adaptive threshold → 3x3 median filter.

    Args:
        img: BGR image.

    Returns:
        Single-channel image containing only 0 and 255. With THRESH_BINARY,
        pixels brighter than their local threshold become 255, so dark ink on
        a light page ends up black on white.
    """
    grayscale = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Estimate the background with a large blur and normalize against it.
    background = cv2.GaussianBlur(grayscale, (51, 51), 0)
    flattened = cv2.divide(grayscale, background, scale=255)

    # Local (31px neighbourhood, offset 10) Gaussian-weighted threshold.
    thresholded = cv2.adaptiveThreshold(
        flattened, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY, 31, 10
    )

    # Remove isolated speckles without eroding glyphs too much.
    return cv2.medianBlur(thresholded, 3)
|
|
|
|
|
|
def create_layout_image(img: np.ndarray) -> np.ndarray:
    """Produce a contrast-enhanced grayscale page for layout analysis.

    Applies CLAHE (clip limit 2.0, 8x8 tiles) to the grayscale version of the
    input.

    Args:
        img: BGR image.

    Returns:
        Enhanced grayscale image.
    """
    grayscale = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    return equalizer.apply(grayscale)
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 5: Layout Analysis (Projection Profiles)
|
|
# =============================================================================
|
|
|
|
def _find_content_bounds(inv: np.ndarray) -> Tuple[int, int, int, int]:
    """Find the bounding box of actual text content (excluding page margins).

    A row or column counts as content when more than 0.5% of its pixels are
    ink, measured as the sum of values divided by ``255 * length``. Rows are
    evaluated over the full width; columns only over the detected row range.
    The box is padded by 5px vertically and 2px horizontally and clipped to
    the image.

    Args:
        inv: Inverted binary page (ink = 255, background = 0).
            Assumes a 2-D single-channel array — TODO confirm against callers.

    Returns:
        Tuple of (left_x, right_x, top_y, bottom_y) as Python ints. Along an
        axis without any content the full extent (0, size) is returned.
    """
    h, w = inv.shape[:2]
    if h == 0 or w == 0:
        # Nothing to project; avoids a division by zero below.
        return 0, w, 0, h

    ink_ratio = 0.005

    # Horizontal projection for top/bottom
    row_fill = np.sum(inv, axis=1).astype(float) / (w * 255)
    ink_rows = np.flatnonzero(row_fill > ink_ratio)
    if ink_rows.size:
        # First/last ink row, including index 0 (the former backward scan
        # stopped at index 1 and fell back to the full height in that case).
        top_y = max(0, int(ink_rows[0]) - 5)
        bottom_y = min(h, int(ink_rows[-1]) + 5)
    else:
        top_y, bottom_y = 0, h

    # Vertical projection for left/right margins, restricted to content rows.
    # bottom_y > top_y always holds here because h >= 1.
    col_fill = np.sum(inv[top_y:bottom_y, :], axis=0).astype(float) / ((bottom_y - top_y) * 255)
    ink_cols = np.flatnonzero(col_fill > ink_ratio)
    if ink_cols.size:
        left_x = max(0, int(ink_cols[0]) - 2)
        right_x = min(w, int(ink_cols[-1]) + 2)
    else:
        left_x, right_x = 0, w

    return left_x, right_x, top_y, bottom_y
|
|
|
|
|
|
def analyze_layout(layout_img: np.ndarray, ocr_img: np.ndarray) -> List[PageRegion]:
    """Detect columns, header, and footer using projection profiles.

    Uses content-bounds detection to exclude page margins before searching
    for column separators within the actual text area. Separators are
    low-ink "valleys" in the smoothed vertical projection of the content
    area. Two or more valleys produce a 3-column layout (EN / DE / example),
    exactly one valley a 2-column layout (EN / DE), and none a single
    full-width 'column_en' region.

    Args:
        layout_img: CLAHE-enhanced grayscale image. Accepted for interface
            symmetry; this function does not read it.
        ocr_img: Binarized image for text density analysis.

    Returns:
        List of PageRegion objects describing detected regions: the column
        regions first, followed by optional 'header' and 'footer' regions
        when the content starts/ends more than 10px from the page edge.
    """
    h, w = ocr_img.shape[:2]

    # Invert: black text on white → white text on black for projection
    inv = cv2.bitwise_not(ocr_img)

    # --- Find actual content bounds (exclude page margins) ---
    left_x, right_x, top_y, bottom_y = _find_content_bounds(inv)
    content_w = right_x - left_x
    content_h = bottom_y - top_y

    logger.info(f"Layout: content bounds x=[{left_x}..{right_x}] ({content_w}px), "
                f"y=[{top_y}..{bottom_y}] ({content_h}px) in {w}x{h} image")

    if content_w < w * 0.3 or content_h < h * 0.3:
        # Fallback if detection seems wrong: content covering less than 30%
        # of either dimension is distrusted and the whole page is used.
        left_x, right_x = 0, w
        top_y, bottom_y = 0, h
        content_w, content_h = w, h

    # --- Vertical projection within content area to find column separators ---
    # All x positions below are relative to left_x until converted back.
    content_strip = inv[top_y:bottom_y, left_x:right_x]
    v_proj = np.sum(content_strip, axis=0).astype(float)
    v_proj_norm = v_proj / (content_h * 255) if content_h > 0 else v_proj

    # Smooth the projection profile with an odd-sized box filter
    # (about 2% of the content width, at least 5px).
    kernel_size = max(5, content_w // 50)
    if kernel_size % 2 == 0:
        kernel_size += 1
    v_proj_smooth = np.convolve(v_proj_norm, np.ones(kernel_size) / kernel_size, mode='same')

    # Debug: log projection profile statistics
    p_mean = float(np.mean(v_proj_smooth))
    p_median = float(np.median(v_proj_smooth))
    p_min = float(np.min(v_proj_smooth))
    p_max = float(np.max(v_proj_smooth))
    logger.info(f"Layout: v_proj stats — min={p_min:.4f}, max={p_max:.4f}, "
                f"mean={p_mean:.4f}, median={p_median:.4f}")

    # Find valleys using multiple threshold strategies
    # Strategy 1 (here): a fixed threshold derived from median and mean
    #   catches clear separators.
    # Strategy 2 (further below, only if fewer than 2 valleys survive):
    #   the lowest of 20 equal-width segments catches subtle gaps.
    threshold = max(p_median * 0.3, p_mean * 0.2)
    logger.info(f"Layout: valley threshold={threshold:.4f}")

    in_valley = v_proj_smooth < threshold

    # Find contiguous valley regions.
    # Each entry is (start, end, center, width, depth) in content-relative x.
    # NOTE(review): a valley that is still open at the right edge of the
    # profile is never closed and therefore never recorded.
    all_valleys = []
    start = None
    for x in range(len(v_proj_smooth)):
        if in_valley[x] and start is None:
            start = x
        elif not in_valley[x] and start is not None:
            valley_width = x - start
            valley_depth = float(np.min(v_proj_smooth[start:x]))
            # Valley must be at least 3px wide
            if valley_width >= 3:
                all_valleys.append((start, x, (start + x) // 2, valley_width, valley_depth))
            start = None

    logger.info(f"Layout: raw valleys (before filter): {len(all_valleys)} — "
                f"{[(v[0]+left_x, v[1]+left_x, v[3], f'{v[4]:.4f}') for v in all_valleys[:10]]}")

    # Filter: valleys must be inside the content area (not at edges);
    # centers within the outer 8% on either side are discarded.
    inner_margin = int(content_w * 0.08)
    valleys = [v for v in all_valleys if inner_margin < v[2] < content_w - inner_margin]

    # If fewer than two valleys were found with the strict threshold, try the
    # local minima approach. Its result replaces `valleys` only when a valid
    # pair is found; otherwise the threshold-based result is kept.
    if len(valleys) < 2:
        logger.info("Layout: trying local minima approach for column detection")
        # Divide content into 20 segments, find the 2 lowest
        seg_count = 20
        seg_width = content_w // seg_count
        seg_scores = []
        for i in range(seg_count):
            sx = i * seg_width
            ex = min((i + 1) * seg_width, content_w)
            seg_mean = float(np.mean(v_proj_smooth[sx:ex]))
            seg_scores.append((i, sx, ex, seg_mean))

        seg_scores.sort(key=lambda s: s[3])
        logger.info(f"Layout: segment scores (lowest 5): "
                    f"{[(s[0], s[1]+left_x, s[2]+left_x, f'{s[3]:.4f}') for s in seg_scores[:5]]}")

        # Find two lowest non-adjacent segments that create reasonable columns
        candidate_valleys = []
        for seg_idx, sx, ex, seg_mean in seg_scores:
            # Must not be at the edges (first two / last two segments)
            if seg_idx <= 1 or seg_idx >= seg_count - 2:
                continue
            # Must be significantly lower than overall mean
            if seg_mean < p_mean * 0.6:
                center = (sx + ex) // 2
                candidate_valleys.append((sx, ex, center, ex - sx, seg_mean))

        if len(candidate_valleys) >= 2:
            # Pick the best pair: non-adjacent, creating reasonable column widths
            candidate_valleys.sort(key=lambda v: v[2])
            best_pair = None
            best_score = float('inf')
            for i in range(len(candidate_valleys)):
                for j in range(i + 1, len(candidate_valleys)):
                    c1 = candidate_valleys[i][2]
                    c2 = candidate_valleys[j][2]
                    # Must be at least 20% apart
                    if (c2 - c1) < content_w * 0.2:
                        continue
                    col1 = c1
                    col2 = c2 - c1
                    col3 = content_w - c2
                    # Each column at least 12% of the content width
                    if col1 < content_w * 0.12 or col2 < content_w * 0.12 or col3 < content_w * 0.12:
                        continue
                    # Score: spread between widest and narrowest column (lower = more even)
                    parts = sorted([col1, col2, col3])
                    score = parts[2] - parts[0]
                    if score < best_score:
                        best_score = score
                        best_pair = (candidate_valleys[i], candidate_valleys[j])

            if best_pair:
                valleys = list(best_pair)
                logger.info(f"Layout: local minima found 2 valleys: "
                            f"{[(v[0]+left_x, v[1]+left_x, v[3]) for v in valleys]}")

    logger.info(f"Layout: final {len(valleys)} valleys: "
                f"{[(v[0]+left_x, v[1]+left_x, v[3]) for v in valleys]}")

    regions = []

    if len(valleys) >= 2:
        # 3-column layout detected
        valleys.sort(key=lambda v: v[2])

        if len(valleys) == 2:
            sep1_center = valleys[0][2]
            sep2_center = valleys[1][2]
        else:
            # Pick the two valleys that best divide into 3 parts
            # Prefer wider valleys (more likely true separators)
            best_pair = None
            best_score = float('inf')
            for i in range(len(valleys)):
                for j in range(i + 1, len(valleys)):
                    c1, c2 = valleys[i][2], valleys[j][2]
                    # Each column should be at least 15% of content width
                    col1 = c1
                    col2 = c2 - c1
                    col3 = content_w - c2
                    if col1 < content_w * 0.15 or col2 < content_w * 0.15 or col3 < content_w * 0.15:
                        continue
                    # Score: lower is better (more even distribution)
                    parts = sorted([col1, col2, col3])
                    score = parts[2] - parts[0]
                    # Bonus for wider valleys (subtract valley width)
                    score -= (valleys[i][3] + valleys[j][3]) * 0.5
                    if score < best_score:
                        best_score = score
                        best_pair = (c1, c2)
            if best_pair:
                sep1_center, sep2_center = best_pair
            else:
                # No pair satisfied the width constraint: fall back to the
                # two left-most valleys.
                sep1_center = valleys[0][2]
                sep2_center = valleys[1][2]

        # Convert from content-relative to absolute coordinates
        abs_sep1 = sep1_center + left_x
        abs_sep2 = sep2_center + left_x

        logger.info(f"Layout: 3 columns at separators x={abs_sep1}, x={abs_sep2} "
                    f"(widths: {abs_sep1}, {abs_sep2-abs_sep1}, {w-abs_sep2})")

        # Column regions span the full image width (margins included) but only
        # the vertical content range.
        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=abs_sep1, height=content_h
        ))
        regions.append(PageRegion(
            type='column_de', x=abs_sep1, y=top_y,
            width=abs_sep2 - abs_sep1, height=content_h
        ))
        regions.append(PageRegion(
            type='column_example', x=abs_sep2, y=top_y,
            width=w - abs_sep2, height=content_h
        ))

    elif len(valleys) == 1:
        # 2-column layout
        abs_sep = valleys[0][2] + left_x

        logger.info(f"Layout: 2 columns at separator x={abs_sep}")

        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=abs_sep, height=content_h
        ))
        regions.append(PageRegion(
            type='column_de', x=abs_sep, y=top_y,
            width=w - abs_sep, height=content_h
        ))

    else:
        # No columns detected — run full-page OCR as single column
        logger.warning("Layout: no column separators found, using full page")
        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=w, height=content_h
        ))

    # Add header/footer info
    if top_y > 10:
        regions.append(PageRegion(
            type='header', x=0, y=0,
            width=w, height=top_y
        ))
    if bottom_y < h - 10:
        regions.append(PageRegion(
            type='footer', x=0, y=bottom_y,
            width=w, height=h - bottom_y
        ))

    col_count = len([r for r in regions if r.type.startswith('column')])
    logger.info(f"Layout: {col_count} columns, "
                f"header={'yes' if top_y > 10 else 'no'}, "
                f"footer={'yes' if bottom_y < h - 10 else 'no'}")

    return regions
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 6: Multi-Pass OCR
|
|
# =============================================================================
|
|
|
|
def _words_from_tesseract_data(data: Dict[str, Any], region: PageRegion,
                               y_offset: int = 0,
                               min_word_conf: int = 10) -> List[Dict[str, Any]]:
    """Convert a pytesseract ``image_to_data`` dict into absolute-coordinate word dicts.

    Args:
        data: Column-oriented dict as returned with ``Output.DICT``.
        region: Region the OCR'd crop was taken from; its x/y are added to the
            word positions and its type is recorded on every word.
        y_offset: Additional vertical offset of the crop inside the region.
        min_word_conf: Words with a confidence below this value are dropped.

    Returns:
        List of word dicts with text, left, top, width, height, conf, region_type.
    """
    words: List[Dict[str, Any]] = []
    for i, raw_text in enumerate(data['text']):
        text = str(raw_text or "").strip()
        if not text:
            continue
        try:
            # Confidence may be an int, a float or a float-formatted string
            # depending on the pytesseract/Tesseract version.
            conf = int(float(data['conf'][i]))
        except (TypeError, ValueError):
            continue
        if conf < min_word_conf:
            continue
        words.append({
            'text': text,
            'left': data['left'][i] + region.x,  # Absolute coords
            'top': data['top'][i] + region.y + y_offset,
            'width': data['width'][i],
            'height': data['height'][i],
            'conf': conf,
            'region_type': region.type,
        })
    return words


def ocr_region(ocr_img: np.ndarray, region: PageRegion, lang: str,
               psm: int, fallback_psm: Optional[int] = None,
               min_confidence: float = 40.0) -> List[Dict[str, Any]]:
    """Run Tesseract OCR on a specific region with given PSM.

    Args:
        ocr_img: Binarized full-page image.
        region: Region to crop and OCR.
        lang: Tesseract language string.
        psm: Page Segmentation Mode.
        fallback_psm: If confidence too low, retry with this PSM per line.
        min_confidence: Minimum average confidence before fallback.

    Returns:
        List of word dicts with text, position (absolute page coordinates),
        confidence and region type. Empty when the crop is empty or Tesseract
        fails. If the line-by-line fallback finds no words at all, the result
        of the primary pass is kept.
    """
    # Crop region
    crop = ocr_img[region.y:region.y + region.height,
                   region.x:region.x + region.width]
    if crop.size == 0:
        return []

    # Convert to PIL for pytesseract
    pil_img = Image.fromarray(crop)

    # Run Tesseract with specified PSM
    config = f'--psm {psm} --oem 3'
    try:
        data = pytesseract.image_to_data(pil_img, lang=lang, config=config,
                                         output_type=pytesseract.Output.DICT)
    except Exception as e:
        logger.warning(f"Tesseract failed for region {region.type}: {e}")
        return []

    words = _words_from_tesseract_data(data, region)

    # Check average confidence
    if words and fallback_psm is not None:
        avg_conf = sum(w['conf'] for w in words) / len(words)
        if avg_conf < min_confidence:
            logger.info(f"Region {region.type}: avg confidence {avg_conf:.0f}% < {min_confidence}%, "
                        f"trying fallback PSM {fallback_psm}")
            fallback_words = _ocr_region_line_by_line(ocr_img, region, lang, fallback_psm)
            if fallback_words:
                words = fallback_words
            else:
                # Low-confidence words are still more useful than none.
                logger.info(f"Region {region.type}: fallback PSM {fallback_psm} found no words, "
                            f"keeping primary result")

    return words


def _ocr_region_line_by_line(ocr_img: np.ndarray, region: PageRegion,
                             lang: str, psm: int) -> List[Dict[str, Any]]:
    """OCR a region line by line (fallback for low-confidence regions).

    Splits the region into horizontal strips based on text density,
    then OCRs each strip individually with the given PSM.

    Args:
        ocr_img: Binarized full-page image (dark text on light background).
        region: Region to crop and OCR.
        lang: Tesseract language string.
        psm: Page Segmentation Mode used for every strip.

    Returns:
        List of word dicts in absolute page coordinates. Strips on which
        Tesseract fails are skipped.
    """
    crop = ocr_img[region.y:region.y + region.height,
                   region.x:region.x + region.width]
    if crop.size == 0:
        return []

    # Find text lines via horizontal projection of the inverted crop;
    # a row belongs to a line when it holds more than 5% of the peak ink.
    inv = cv2.bitwise_not(crop)
    h_proj = np.sum(inv, axis=1)
    peak = np.max(h_proj)
    threshold = peak * 0.05 if peak > 0 else 0

    # Find line boundaries as runs of rows above the threshold
    lines = []
    in_text = False
    line_start = 0
    for y, row_ink in enumerate(h_proj):
        if row_ink > threshold and not in_text:
            line_start = y
            in_text = True
        elif row_ink <= threshold and in_text:
            if y - line_start > 5:  # Minimum line height
                lines.append((line_start, y))
            in_text = False
    if in_text and len(h_proj) - line_start > 5:
        lines.append((line_start, len(h_proj)))

    all_words: List[Dict[str, Any]] = []
    config = f'--psm {psm} --oem 3'
    pad = 3  # Small vertical padding around each detected line

    for line_y_start, line_y_end in lines:
        y1 = max(0, line_y_start - pad)
        y2 = min(crop.shape[0], line_y_end + pad)
        line_crop = crop[y1:y2, :]
        if line_crop.size == 0:
            continue

        pil_img = Image.fromarray(line_crop)
        try:
            data = pytesseract.image_to_data(pil_img, lang=lang, config=config,
                                             output_type=pytesseract.Output.DICT)
        except Exception as e:
            # Best effort: one failing strip must not discard the others.
            logger.debug(f"Tesseract failed for a line in region {region.type}: {e}")
            continue

        all_words.extend(_words_from_tesseract_data(data, region, y_offset=y1))

    return all_words
|
|
|
|
|
|
def run_multi_pass_ocr(ocr_img: np.ndarray,
                       regions: List[PageRegion],
                       lang: str = "eng+deu") -> Dict[str, List[Dict]]:
    """OCR every content region with settings tuned to its column type.

    English and German columns are read with their single language and PSM 4;
    the example column uses ``lang`` with PSM 6 and a per-line PSM 7 fallback
    below 40% average confidence; any other region uses ``lang`` with PSM 6.
    Header and footer regions are not OCR'd.

    Args:
        ocr_img: Binarized full-page image.
        regions: Detected page regions.
        lang: Default language.

    Returns:
        Dict mapping region type to list of word dicts. A later region of the
        same type replaces the words of an earlier one.
    """
    column_settings = {
        'column_en': {'lang': 'eng', 'psm': 4},
        'column_de': {'lang': 'deu', 'psm': 4},
        'column_example': {'lang': lang, 'psm': 6,
                           'fallback_psm': 7, 'min_confidence': 40.0},
    }
    generic_settings = {'lang': lang, 'psm': 6}

    results: Dict[str, List[Dict]] = {}
    for region in regions:
        # Skip non-content regions
        if region.type in ('header', 'footer'):
            continue

        settings = column_settings.get(region.type, generic_settings)
        recognised = ocr_region(ocr_img, region, **settings)

        results[region.type] = recognised
        logger.info(f"OCR {region.type}: {len(recognised)} words")

    return results
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 7: Line Alignment → Vocabulary Entries
|
|
# =============================================================================
|
|
|
|
def _group_words_into_lines(words: List[Dict], y_tolerance_px: int = 20) -> List[List[Dict]]:
    """Cluster words into text lines by their ``top`` coordinate.

    Words are visited top-to-bottom. A word joins the current line while its
    ``top`` is within ``y_tolerance_px`` of the line's first word; otherwise
    it opens a new line. Each returned line is ordered left to right.
    """
    if not words:
        return []

    ordered = sorted(words, key=lambda w: (w['top'], w['left']))

    grouped: List[List[Dict]] = []
    anchor_top = None  # 'top' of the word that opened the current line
    for word in ordered:
        if anchor_top is not None and abs(word['top'] - anchor_top) <= y_tolerance_px:
            grouped[-1].append(word)
        else:
            grouped.append([word])
            anchor_top = word['top']

    for line in grouped:
        line.sort(key=lambda w: w['left'])

    return grouped
|
|
|
|
|
|
def match_lines_to_vocab(ocr_results: Dict[str, List[Dict]],
                         regions: List[PageRegion],
                         y_tolerance_px: int = 25) -> List[VocabRow]:
    """Assemble vocabulary rows by pairing column lines that share a Y position.

    Every English line with at least two characters becomes a row. The German
    and example line whose vertical centre is closest to it (and closer than
    ``y_tolerance_px``) supply the other fields. Example lines that matched no
    row are treated as wrapped continuations and appended to the nearest row
    above them, within three tolerances.

    Args:
        ocr_results: Dict mapping region type to word lists.
        regions: Detected regions (for reference; not read here).
        y_tolerance_px: Max Y-distance to consider words on the same row.

    Returns:
        List of VocabRow objects sorted by Y position.
    """
    def center_y(line: List[Dict]) -> float:
        return sum(w['top'] + w['height'] / 2 for w in line) / len(line)

    def joined_text(line: List[Dict]) -> str:
        return ' '.join(w['text'] for w in line)

    def mean_confidence(line: List[Dict]) -> float:
        return sum(w['conf'] for w in line) / len(line) if line else 0

    def lines_with_centers(column: str) -> List[Tuple[float, List[Dict]]]:
        grouped = _group_words_into_lines(ocr_results.get(column, []), y_tolerance_px)
        return [(center_y(line), line) for line in grouped]

    def closest_line(candidates: List[Tuple[float, List[Dict]]], target_y: float):
        """Return the line nearest to target_y within tolerance (first wins on ties)."""
        winner = None
        winner_dist = float('inf')
        for candidate_y, candidate in candidates:
            gap = abs(candidate_y - target_y)
            if gap < y_tolerance_px and gap < winner_dist:
                winner_dist = gap
                winner = candidate
        return winner

    # Group words into lines per column
    english_lines = lines_with_centers('column_en')
    german_lines = lines_with_centers('column_de')
    example_lines = lines_with_centers('column_example')

    # English lines are the primary reference for rows
    vocab_rows: List[VocabRow] = []
    for english_y, english_line in english_lines:
        english_text = joined_text(english_line)

        # Skip very short or likely header content
        if len(english_text.strip()) < 2:
            continue

        german_match = closest_line(german_lines, english_y)
        example_match = closest_line(example_lines, english_y)

        german_text = joined_text(german_match) if german_match is not None else ""
        german_conf = mean_confidence(german_match) if german_match is not None else 0.0
        example_text = joined_text(example_match) if example_match is not None else ""
        example_conf = mean_confidence(example_match) if example_match is not None else 0.0

        # Average over the columns that actually contributed a confidence.
        conf_total = mean_confidence(english_line)
        conf_parts = 1
        for extra_conf in (german_conf, example_conf):
            if extra_conf > 0:
                conf_total += extra_conf
                conf_parts += 1

        vocab_rows.append(VocabRow(
            english=english_text.strip(),
            german=german_text.strip(),
            example=example_text.strip(),
            confidence=conf_total / conf_parts,
            y_position=int(english_y),
        ))

    # Handle multi-line wrapping in example column:
    # an example line near no row that already has an example is appended
    # to the closest row above it.
    rows_with_example_ys = {row.y_position for row in vocab_rows if row.example}

    for example_y, example_line in example_lines:
        if any(abs(example_y - row_y) < y_tolerance_px for row_y in rows_with_example_ys):
            continue  # Already matched

        # Find nearest previous vocab row
        target_row = None
        smallest_gap = float('inf')
        for row in vocab_rows:
            gap_below = example_y - row.y_position
            if 0 < gap_below < y_tolerance_px * 3 and gap_below < smallest_gap:
                smallest_gap = gap_below
                target_row = row

        if target_row:
            continuation = joined_text(example_line).strip()
            if continuation:
                target_row.example = (target_row.example + " " + continuation).strip()

    # Sort by Y position
    vocab_rows.sort(key=lambda r: r.y_position)

    return vocab_rows
|
|
|
|
|
|
# =============================================================================
|
|
# Stage 8: Optional LLM Post-Correction
|
|
# =============================================================================
|
|
|
|
async def llm_post_correct(img: np.ndarray, vocab_rows: List[VocabRow],
                           confidence_threshold: float = 50.0,
                           enabled: bool = False) -> List[VocabRow]:
    """Optionally send low-confidence regions to Qwen-VL for correction.

    Currently a stub: the rows are returned unchanged in every case. When
    ``enabled`` is true, an info message records that the correction step
    was skipped because it is not implemented yet.

    Args:
        img: Original BGR image (not read until the correction exists).
        vocab_rows: Current vocabulary rows.
        confidence_threshold: Rows below this are meant to get LLM
            correction (not read until the correction exists).
        enabled: Whether to attempt LLM correction. Default: disabled.

    Returns:
        The ``vocab_rows`` object that was passed in, unmodified.
    """
    if not enabled:
        return vocab_rows

    # TODO: Implement Qwen-VL correction for low-confidence entries
    # For each row with confidence < threshold:
    #   1. Crop the relevant region from img
    #   2. Send crop + OCR text to Qwen-VL
    #   3. Replace text if LLM provides a confident correction
    logger.info("LLM post-correction skipped (not yet implemented)")
    return vocab_rows
|
|
|
|
|
|
# =============================================================================
|
|
# Orchestrator
|
|
# =============================================================================
|
|
|
|
async def run_cv_pipeline(
    pdf_data: Optional[bytes] = None,
    image_data: Optional[bytes] = None,
    page_number: int = 0,
    zoom: float = 3.0,
    enable_dewarp: bool = True,
    enable_llm_correction: bool = False,
    lang: str = "eng+deu",
) -> PipelineResult:
    """Run the complete CV document reconstruction pipeline.

    Stages run in order: render, deskew, optional dewarp, dual image
    preparation, layout analysis, multi-pass OCR, line alignment and
    optional LLM post-correction. The duration of each executed stage is
    stored in ``result.stages`` (seconds, rounded to two decimals).

    Args:
        pdf_data: Raw PDF bytes. Used in preference to image_data when both
            are supplied.
        image_data: Raw image bytes. Only used when pdf_data is empty/None.
        page_number: 0-indexed page number (for PDF).
        zoom: PDF rendering zoom factor.
        enable_dewarp: Whether to run dewarp stage.
        enable_llm_correction: Whether to run LLM post-correction.
        lang: Tesseract language string.

    Returns:
        PipelineResult with vocabulary and timing info. Failures are not
        raised: any exception from a stage is caught and reported through
        ``result.error``; stage timings recorded before the failure are
        kept.
    """
    if not CV_PIPELINE_AVAILABLE:
        return PipelineResult(error="CV pipeline not available (OpenCV or Tesseract missing)")

    # Reject missing input up front, before any timing or processing starts.
    if not pdf_data and not image_data:
        return PipelineResult(error="No input data (pdf_data or image_data required)")

    result = PipelineResult()
    # perf_counter() is monotonic, so measured durations cannot be distorted
    # (or turn negative) when the system clock is adjusted mid-run.
    total_start = time.perf_counter()

    try:
        # Stage 1: Render
        t = time.perf_counter()
        if pdf_data:
            img = render_pdf_high_res(pdf_data, page_number, zoom)
        else:
            img = render_image_high_res(image_data)
        result.stages['render'] = round(time.perf_counter() - t, 2)
        result.image_width = img.shape[1]
        result.image_height = img.shape[0]
        logger.info(f"Stage 1 (render): {img.shape[1]}x{img.shape[0]} in {result.stages['render']}s")

        # Stage 2: Deskew
        t = time.perf_counter()
        img, angle = deskew_image(img)
        result.stages['deskew'] = round(time.perf_counter() - t, 2)
        logger.info(f"Stage 2 (deskew): {angle:.2f}° in {result.stages['deskew']}s")

        # Stage 3: Dewarp
        if enable_dewarp:
            t = time.perf_counter()
            img = dewarp_image(img)
            result.stages['dewarp'] = round(time.perf_counter() - t, 2)

        # Stage 4: Dual image preparation
        t = time.perf_counter()
        ocr_img = create_ocr_image(img)
        layout_img = create_layout_image(img)
        result.stages['image_prep'] = round(time.perf_counter() - t, 2)

        # Stage 5: Layout analysis
        t = time.perf_counter()
        regions = analyze_layout(layout_img, ocr_img)
        result.stages['layout'] = round(time.perf_counter() - t, 2)
        result.columns_detected = sum(
            1 for region in regions if region.type.startswith('column')
        )
        logger.info(f"Stage 5 (layout): {result.columns_detected} columns in {result.stages['layout']}s")

        # Stage 6: Multi-pass OCR
        t = time.perf_counter()
        ocr_results = run_multi_pass_ocr(ocr_img, regions, lang)
        result.stages['ocr'] = round(time.perf_counter() - t, 2)
        total_words = sum(len(words) for words in ocr_results.values())
        result.word_count = total_words
        logger.info(f"Stage 6 (OCR): {total_words} words in {result.stages['ocr']}s")

        # Stage 7: Line alignment
        t = time.perf_counter()
        vocab_rows = match_lines_to_vocab(ocr_results, regions)
        result.stages['alignment'] = round(time.perf_counter() - t, 2)

        # Stage 8: Optional LLM correction
        if enable_llm_correction:
            t = time.perf_counter()
            vocab_rows = await llm_post_correct(img, vocab_rows)
            result.stages['llm_correction'] = round(time.perf_counter() - t, 2)

        # Convert to output format
        result.vocabulary = [
            {
                "english": row.english,
                "german": row.german,
                "example": row.example,
                "confidence": round(row.confidence, 1),
            }
            for row in vocab_rows
            if row.english or row.german  # Skip empty rows
        ]

        result.duration_seconds = round(time.perf_counter() - total_start, 2)
        logger.info(f"CV Pipeline complete: {len(result.vocabulary)} entries in {result.duration_seconds}s")

    except Exception as e:
        # Top-level boundary: callers receive the failure via result.error
        # instead of an exception; the full traceback goes to debug level.
        logger.error(f"CV Pipeline error: {e}")
        import traceback
        logger.debug(traceback.format_exc())
        result.error = str(e)
        result.duration_seconds = round(time.perf_counter() - total_start, 2)

    return result
|