This repository has been archived on 2026-02-15. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
breakpilot-pwa/klausur-service/backend/cv_vocab_pipeline.py
BreakPilot Dev 945b955b54 feat(ocr): Word-based image deskew for Ground Truth pipeline
Begradigt schiefe Scans vor der OCR-Extraktion anhand der linksbuendigen
Wortanfaenge der Vokabelspalte. Tesseract liefert achsenparallele Boxen,
die bei ~2-3 Grad Schraege in Nachbarzeilen bluten — der Deskew behebt das.

- Neue Funktion deskew_image_by_word_alignment() in cv_vocab_pipeline.py
- Deskew-Integration im extract-with-boxes Endpoint (vor OCR)
- Neuer GET Endpoint /deskewed-image/{page} fuer begradigtes Seitenbild
- Frontend: GroundTruthPanel wechselt nach Extraktion auf deskewed Image
- ~1s Overhead durch schnellen Tesseract-Pass auf halbiertem Bild

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 12:14:44 +01:00

1141 lines
39 KiB
Python

"""
CV-based Document Reconstruction Pipeline for Vocabulary Extraction.
Uses classical Computer Vision techniques for high-quality OCR:
- High-resolution PDF rendering (432 DPI)
- Deskew (rotation correction via Hough Lines)
- Dewarp (book curvature correction) — pass-through initially
- Dual image preparation (binarized for OCR, CLAHE for layout)
- Projection-profile layout analysis (column/row detection)
- Multi-pass Tesseract OCR with region-specific PSM settings
- Y-coordinate line alignment for vocabulary matching
- Optional LLM post-correction for low-confidence regions
Lizenz: Apache 2.0 (kommerziell nutzbar)
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
"""
import io
import logging
import time
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Tuple
import numpy as np
logger = logging.getLogger(__name__)
# --- Availability Guards ---
# OpenCV, pytesseract and Pillow are optional at import time: the module must
# still load without them so callers can inspect CV_PIPELINE_AVAILABLE.
try:
    import cv2
    CV2_AVAILABLE = True
except ImportError:
    cv2 = None  # sentinel so later references fail loudly, not with NameError
    CV2_AVAILABLE = False
    logger.warning("OpenCV not available — CV pipeline disabled")
try:
    import pytesseract
    from PIL import Image
    TESSERACT_AVAILABLE = True
except ImportError:
    pytesseract = None
    Image = None
    TESSERACT_AVAILABLE = False
    logger.warning("pytesseract/Pillow not available — CV pipeline disabled")
# Both the OpenCV and Tesseract stacks are required for the pipeline to run.
CV_PIPELINE_AVAILABLE = CV2_AVAILABLE and TESSERACT_AVAILABLE
# --- Data Classes ---
@dataclass
class PageRegion:
    """A detected region on the page.

    Coordinates are pixel positions in the (possibly deskewed) page image,
    origin at the top-left corner.
    """
    type: str  # 'column_en', 'column_de', 'column_example', 'header', 'footer'
    x: int  # left edge (px)
    y: int  # top edge (px)
    width: int  # region width (px)
    height: int  # region height (px)
@dataclass
class VocabRow:
    """A single vocabulary entry assembled from multi-column OCR."""
    english: str = ""  # text from the English column
    german: str = ""  # text from the German column
    example: str = ""  # example sentence (may include wrapped continuation lines)
    confidence: float = 0.0  # mean OCR confidence over the columns that matched
    y_position: int = 0  # vertical center of the English line (px)
@dataclass
class PipelineResult:
    """Complete result of the CV pipeline."""
    vocabulary: List[Dict[str, Any]] = field(default_factory=list)  # english/german/example/confidence dicts
    word_count: int = 0  # total OCR words across all content regions
    columns_detected: int = 0  # number of 'column_*' regions found
    duration_seconds: float = 0.0  # total wall-clock runtime
    stages: Dict[str, float] = field(default_factory=dict)  # per-stage timings in seconds
    error: Optional[str] = None  # error message if the pipeline failed, else None
    image_width: int = 0  # rendered page width (px)
    image_height: int = 0  # rendered page height (px)
# =============================================================================
# Stage 1: High-Resolution PDF Rendering
# =============================================================================
def render_pdf_high_res(pdf_data: bytes, page_number: int = 0, zoom: float = 3.0) -> np.ndarray:
    """Render a PDF page to a high-resolution numpy array (BGR).

    Args:
        pdf_data: Raw PDF bytes.
        page_number: 0-indexed page number.
        zoom: Zoom factor (3.0 = 432 DPI).

    Returns:
        numpy array in BGR format.

    Raises:
        ValueError: If the requested page does not exist.
    """
    import fitz  # PyMuPDF — imported lazily so the module loads without it
    pdf_doc = fitz.open(stream=pdf_data, filetype="pdf")
    try:
        if page_number >= pdf_doc.page_count:
            raise ValueError(f"Page {page_number} does not exist (PDF has {pdf_doc.page_count} pages)")
        page = pdf_doc[page_number]
        mat = fitz.Matrix(zoom, zoom)
        pix = page.get_pixmap(matrix=mat)
        # Convert raw pixmap samples to a (H, W, channels) numpy array
        img_data = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.h, pix.w, pix.n)
        if pix.n == 4:  # RGBA
            img_bgr = cv2.cvtColor(img_data, cv2.COLOR_RGBA2BGR)
        elif pix.n == 3:  # RGB
            img_bgr = cv2.cvtColor(img_data, cv2.COLOR_RGB2BGR)
        else:  # Grayscale
            img_bgr = cv2.cvtColor(img_data, cv2.COLOR_GRAY2BGR)
    finally:
        # Bug fix: close the document even on error paths — the original
        # leaked the open document when the out-of-range ValueError was raised.
        pdf_doc.close()
    return img_bgr
def render_image_high_res(image_data: bytes) -> np.ndarray:
    """Decode raw PNG/JPEG bytes into a BGR numpy image.

    Args:
        image_data: Raw image bytes.

    Returns:
        numpy array in BGR format.

    Raises:
        ValueError: If the bytes cannot be decoded as an image.
    """
    buffer = np.frombuffer(image_data, dtype=np.uint8)
    decoded = cv2.imdecode(buffer, cv2.IMREAD_COLOR)
    if decoded is None:
        raise ValueError("Could not decode image data")
    return decoded
# =============================================================================
# Stage 2: Deskew (Rotation Correction)
# =============================================================================
def deskew_image(img: np.ndarray) -> Tuple[np.ndarray, float]:
    """Correct page rotation using Hough line detection.

    Detects near-horizontal line segments, takes the median of their angles
    (capped at ±5°), and rotates the image back by that amount.

    Args:
        img: BGR image.

    Returns:
        Tuple of (corrected image, detected angle in degrees).
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Otsu-inverted binarization so text and rules become foreground for Hough
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    segments = cv2.HoughLinesP(binary, 1, np.pi / 180, threshold=100,
                               minLineLength=img.shape[1] // 4, maxLineGap=20)
    if segments is None or len(segments) < 3:
        return img, 0.0
    # Keep only segments within 15° of horizontal
    candidate_angles = []
    for seg in segments:
        sx1, sy1, sx2, sy2 = seg[0]
        seg_angle = np.degrees(np.arctan2(sy2 - sy1, sx2 - sx1))
        if abs(seg_angle) < 15:
            candidate_angles.append(seg_angle)
    if not candidate_angles:
        return img, 0.0
    median_angle = float(np.median(candidate_angles))
    # Never correct by more than ±5°; sub-0.1° skew is not worth resampling
    if abs(median_angle) > 5.0:
        median_angle = 5.0 * np.sign(median_angle)
    if abs(median_angle) < 0.1:
        return img, 0.0
    h, w = img.shape[:2]
    rotation = cv2.getRotationMatrix2D((w // 2, h // 2), median_angle, 1.0)
    corrected = cv2.warpAffine(img, rotation, (w, h),
                               flags=cv2.INTER_LINEAR,
                               borderMode=cv2.BORDER_REPLICATE)
    logger.info(f"Deskew: corrected {median_angle:.2f}° rotation")
    return corrected, median_angle
def deskew_image_by_word_alignment(
    image_data: bytes,
    lang: str = "eng+deu",
    downscale_factor: float = 0.5,
) -> Tuple[bytes, float]:
    """Correct rotation by fitting a line through left-most word starts per text line.

    More robust than Hough-based deskew for vocabulary worksheets where text lines
    have consistent left-alignment. Runs a quick Tesseract pass on a downscaled
    copy to find word positions, computes the dominant left-edge column, fits a
    line through those points and rotates the full-resolution image.

    Args:
        image_data: Raw image bytes (PNG/JPEG).
        lang: Tesseract language string for the quick pass.
        downscale_factor: Shrink factor for the quick Tesseract pass (0.5 = 50%).

    Returns:
        Tuple of (rotated image as PNG bytes, detected angle in degrees).
        On any failure the original bytes and 0.0 are returned (best-effort).
    """
    if not CV2_AVAILABLE or not TESSERACT_AVAILABLE:
        return image_data, 0.0
    # 1. Decode image
    img_array = np.frombuffer(image_data, dtype=np.uint8)
    img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
    if img is None:
        logger.warning("deskew_by_word_alignment: could not decode image")
        return image_data, 0.0
    orig_h, orig_w = img.shape[:2]
    # 2. Downscale for fast Tesseract pass (clamped to >= 1 px so cv2.resize
    #    cannot fail on degenerate/tiny inputs)
    small_w = max(1, int(orig_w * downscale_factor))
    small_h = max(1, int(orig_h * downscale_factor))
    small = cv2.resize(img, (small_w, small_h), interpolation=cv2.INTER_AREA)
    # 3. Quick Tesseract — word-level positions
    pil_small = Image.fromarray(cv2.cvtColor(small, cv2.COLOR_BGR2RGB))
    try:
        data = pytesseract.image_to_data(
            pil_small, lang=lang, config="--psm 6 --oem 3",
            output_type=pytesseract.Output.DICT,
        )
    except Exception as e:
        logger.warning(f"deskew_by_word_alignment: Tesseract failed: {e}")
        return image_data, 0.0
    # 4. Per text-line, find the left-most word start.
    #    Group by (block_num, par_num, line_num).
    from collections import defaultdict
    line_groups: Dict[tuple, list] = defaultdict(list)
    for i in range(len(data["text"])):
        text = (data["text"][i] or "").strip()
        # Bug fix: Tesseract >= 4.1 may report confidences as float strings
        # (e.g. "96.43"); int() alone raises ValueError on those.
        conf = int(float(data["conf"][i]))
        if not text or conf < 20:
            continue
        key = (data["block_num"][i], data["par_num"][i], data["line_num"][i])
        line_groups[key].append(i)
    if len(line_groups) < 5:
        logger.info(f"deskew_by_word_alignment: only {len(line_groups)} lines, skipping")
        return image_data, 0.0
    # For each line, pick the word with smallest 'left' → compute (left_x, center_y),
    # scaled back to original resolution.
    scale = 1.0 / downscale_factor
    points = []  # list of (x, y) in original-image coords
    for key, indices in line_groups.items():
        best_idx = min(indices, key=lambda i: data["left"][i])
        lx = data["left"][best_idx] * scale
        top = data["top"][best_idx] * scale
        h = data["height"][best_idx] * scale
        cy = top + h / 2.0
        points.append((lx, cy))
    # 5. Find dominant left-edge column + compute angle
    xs = np.array([p[0] for p in points])
    ys = np.array([p[1] for p in points])
    median_x = float(np.median(xs))
    tolerance = orig_w * 0.03  # 3% of image width
    mask = np.abs(xs - median_x) <= tolerance
    filtered_xs = xs[mask]
    filtered_ys = ys[mask]
    if len(filtered_xs) < 5:
        logger.info(f"deskew_by_word_alignment: only {len(filtered_xs)} aligned points after filter, skipping")
        return image_data, 0.0
    # polyfit: x = a*y + b → a = dx/dy → angle = arctan(a)
    coeffs = np.polyfit(filtered_ys, filtered_xs, 1)
    slope = coeffs[0]  # dx/dy
    angle_rad = np.arctan(slope)
    angle_deg = float(np.degrees(angle_rad))
    # Clamp to ±5°; skip rotation entirely for negligible skew
    angle_deg = max(-5.0, min(5.0, angle_deg))
    logger.info(f"deskew_by_word_alignment: detected {angle_deg:.2f}° from {len(filtered_xs)} points "
                f"(total lines: {len(line_groups)})")
    if abs(angle_deg) < 0.05:
        return image_data, 0.0
    # 6. Rotate full-res image
    center = (orig_w // 2, orig_h // 2)
    M = cv2.getRotationMatrix2D(center, angle_deg, 1.0)
    rotated = cv2.warpAffine(img, M, (orig_w, orig_h),
                             flags=cv2.INTER_LINEAR,
                             borderMode=cv2.BORDER_REPLICATE)
    # Encode back to PNG
    success, png_buf = cv2.imencode(".png", rotated)
    if not success:
        logger.warning("deskew_by_word_alignment: PNG encoding failed")
        return image_data, 0.0
    return png_buf.tobytes(), angle_deg
# =============================================================================
# Stage 3: Dewarp (Book Curvature) — Pass-Through for now
# =============================================================================
def dewarp_image(img: np.ndarray) -> np.ndarray:
    """Correct book curvature distortion (currently a no-op placeholder).

    Returns the input unchanged until curvature correction is implemented
    for book scans.

    Args:
        img: BGR image.

    Returns:
        Corrected image (or original if no correction needed).
    """
    # TODO: Implement polynomial fitting + cv2.remap() for book curvature
    return img
# =============================================================================
# Stage 4: Dual Image Preparation
# =============================================================================
def create_ocr_image(img: np.ndarray) -> np.ndarray:
    """Produce a binarized copy of the page optimized for Tesseract.

    Pipeline: grayscale → divide by a heavily blurred copy (flattens uneven
    illumination) → adaptive Gaussian threshold → light median denoise.

    Args:
        img: BGR image.

    Returns:
        Binary image (white text on black background inverted to black on white).
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Divide by the blurred background estimate to normalize illumination
    background = cv2.GaussianBlur(gray, (51, 51), 0)
    flattened = cv2.divide(gray, background, scale=255)
    # Local adaptive binarization copes with residual lighting gradients
    thresholded = cv2.adaptiveThreshold(
        flattened, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY, 31, 10,
    )
    # 3x3 median filter removes salt-and-pepper noise left by thresholding
    return cv2.medianBlur(thresholded, 3)
def create_layout_image(img: np.ndarray) -> np.ndarray:
    """Produce a CLAHE-enhanced grayscale image for layout analysis.

    Args:
        img: BGR image.

    Returns:
        Contrast-enhanced grayscale image.
    """
    grayscale = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    return equalizer.apply(grayscale)
# =============================================================================
# Stage 5: Layout Analysis (Projection Profiles)
# =============================================================================
def _find_content_bounds(inv: np.ndarray) -> Tuple[int, int, int, int]:
"""Find the bounding box of actual text content (excluding page margins).
Returns:
Tuple of (left_x, right_x, top_y, bottom_y).
"""
h, w = inv.shape[:2]
# Horizontal projection for top/bottom
h_proj = np.sum(inv, axis=1).astype(float) / (w * 255)
top_y = 0
for y in range(h):
if h_proj[y] > 0.005:
top_y = max(0, y - 5)
break
bottom_y = h
for y in range(h - 1, 0, -1):
if h_proj[y] > 0.005:
bottom_y = min(h, y + 5)
break
# Vertical projection for left/right margins
v_proj = np.sum(inv[top_y:bottom_y, :], axis=0).astype(float)
v_proj_norm = v_proj / ((bottom_y - top_y) * 255) if (bottom_y - top_y) > 0 else v_proj
left_x = 0
for x in range(w):
if v_proj_norm[x] > 0.005:
left_x = max(0, x - 2)
break
right_x = w
for x in range(w - 1, 0, -1):
if v_proj_norm[x] > 0.005:
right_x = min(w, x + 2)
break
return left_x, right_x, top_y, bottom_y
def analyze_layout(layout_img: np.ndarray, ocr_img: np.ndarray) -> List[PageRegion]:
    """Detect columns, header, and footer using projection profiles.

    Uses content-bounds detection to exclude page margins before searching
    for column separators (low-ink "valleys") within the actual text area.
    If the strict valley threshold yields fewer than two separators, a
    coarser 20-segment local-minima search is tried as a fallback.

    Args:
        layout_img: CLAHE-enhanced grayscale image (currently unused in the
            body; kept for interface stability).
        ocr_img: Binarized image for text density analysis.

    Returns:
        List of PageRegion objects describing detected regions.
    """
    h, w = ocr_img.shape[:2]
    # Invert: black text on white → white text on black for projection
    inv = cv2.bitwise_not(ocr_img)
    # --- Find actual content bounds (exclude page margins) ---
    left_x, right_x, top_y, bottom_y = _find_content_bounds(inv)
    content_w = right_x - left_x
    content_h = bottom_y - top_y
    logger.info(f"Layout: content bounds x=[{left_x}..{right_x}] ({content_w}px), "
                f"y=[{top_y}..{bottom_y}] ({content_h}px) in {w}x{h} image")
    if content_w < w * 0.3 or content_h < h * 0.3:
        # Fallback if detection seems wrong: treat the whole page as content
        left_x, right_x = 0, w
        top_y, bottom_y = 0, h
        content_w, content_h = w, h
    # --- Vertical projection within content area to find column separators ---
    content_strip = inv[top_y:bottom_y, left_x:right_x]
    v_proj = np.sum(content_strip, axis=0).astype(float)
    v_proj_norm = v_proj / (content_h * 255) if content_h > 0 else v_proj
    # Smooth the projection profile with an odd-width box filter (~2% of width)
    kernel_size = max(5, content_w // 50)
    if kernel_size % 2 == 0:
        kernel_size += 1
    v_proj_smooth = np.convolve(v_proj_norm, np.ones(kernel_size) / kernel_size, mode='same')
    # Debug: log projection profile statistics
    p_mean = float(np.mean(v_proj_smooth))
    p_median = float(np.median(v_proj_smooth))
    p_min = float(np.min(v_proj_smooth))
    p_max = float(np.max(v_proj_smooth))
    logger.info(f"Layout: v_proj stats — min={p_min:.4f}, max={p_max:.4f}, "
                f"mean={p_mean:.4f}, median={p_median:.4f}")
    # Find valleys using multiple threshold strategies:
    # Strategy 1: relative to median (catches clear separators)
    # Strategy 2: local minima approach (catches subtle gaps)
    threshold = max(p_median * 0.3, p_mean * 0.2)
    logger.info(f"Layout: valley threshold={threshold:.4f}")
    in_valley = v_proj_smooth < threshold
    # Collect contiguous valley runs as (start, end, center, width, depth)
    all_valleys = []
    start = None
    for x in range(len(v_proj_smooth)):
        if in_valley[x] and start is None:
            start = x
        elif not in_valley[x] and start is not None:
            valley_width = x - start
            valley_depth = float(np.min(v_proj_smooth[start:x]))
            # Valley must be at least 3px wide
            if valley_width >= 3:
                all_valleys.append((start, x, (start + x) // 2, valley_width, valley_depth))
            start = None
    logger.info(f"Layout: raw valleys (before filter): {len(all_valleys)}"
                f"{[(v[0]+left_x, v[1]+left_x, v[3], f'{v[4]:.4f}') for v in all_valleys[:10]]}")
    # Filter: valleys must be inside the content area (not at edges)
    inner_margin = int(content_w * 0.08)
    valleys = [v for v in all_valleys if inner_margin < v[2] < content_w - inner_margin]
    # If no valleys found with strict threshold, try local minima approach
    if len(valleys) < 2:
        logger.info("Layout: trying local minima approach for column detection")
        # Divide content into 20 segments, find the 2 lowest
        seg_count = 20
        seg_width = content_w // seg_count
        seg_scores = []
        for i in range(seg_count):
            sx = i * seg_width
            ex = min((i + 1) * seg_width, content_w)
            seg_mean = float(np.mean(v_proj_smooth[sx:ex]))
            seg_scores.append((i, sx, ex, seg_mean))
        seg_scores.sort(key=lambda s: s[3])
        logger.info(f"Layout: segment scores (lowest 5): "
                    f"{[(s[0], s[1]+left_x, s[2]+left_x, f'{s[3]:.4f}') for s in seg_scores[:5]]}")
        # Find two lowest non-adjacent segments that create reasonable columns
        candidate_valleys = []
        for seg_idx, sx, ex, seg_mean in seg_scores:
            # Must not be at the edges (first/last two segments are excluded)
            if seg_idx <= 1 or seg_idx >= seg_count - 2:
                continue
            # Must be significantly lower than overall mean
            if seg_mean < p_mean * 0.6:
                center = (sx + ex) // 2
                candidate_valleys.append((sx, ex, center, ex - sx, seg_mean))
        if len(candidate_valleys) >= 2:
            # Pick the best pair: non-adjacent, creating reasonable column widths
            candidate_valleys.sort(key=lambda v: v[2])
            best_pair = None
            best_score = float('inf')
            for i in range(len(candidate_valleys)):
                for j in range(i + 1, len(candidate_valleys)):
                    c1 = candidate_valleys[i][2]
                    c2 = candidate_valleys[j][2]
                    # Centers must be at least 20% of content width apart
                    if (c2 - c1) < content_w * 0.2:
                        continue
                    col1 = c1
                    col2 = c2 - c1
                    col3 = content_w - c2
                    # Each resulting column must be at least 12% of content width
                    if col1 < content_w * 0.12 or col2 < content_w * 0.12 or col3 < content_w * 0.12:
                        continue
                    # Score = spread between widest and narrowest column (lower is better)
                    parts = sorted([col1, col2, col3])
                    score = parts[2] - parts[0]
                    if score < best_score:
                        best_score = score
                        best_pair = (candidate_valleys[i], candidate_valleys[j])
            if best_pair:
                valleys = list(best_pair)
                logger.info(f"Layout: local minima found 2 valleys: "
                            f"{[(v[0]+left_x, v[1]+left_x, v[3]) for v in valleys]}")
    logger.info(f"Layout: final {len(valleys)} valleys: "
                f"{[(v[0]+left_x, v[1]+left_x, v[3]) for v in valleys]}")
    regions = []
    if len(valleys) >= 2:
        # 3-column layout detected
        valleys.sort(key=lambda v: v[2])
        if len(valleys) == 2:
            sep1_center = valleys[0][2]
            sep2_center = valleys[1][2]
        else:
            # Pick the two valleys that best divide into 3 parts.
            # Prefer wider valleys (more likely true separators).
            best_pair = None
            best_score = float('inf')
            for i in range(len(valleys)):
                for j in range(i + 1, len(valleys)):
                    c1, c2 = valleys[i][2], valleys[j][2]
                    # Each column should be at least 15% of content width
                    col1 = c1
                    col2 = c2 - c1
                    col3 = content_w - c2
                    if col1 < content_w * 0.15 or col2 < content_w * 0.15 or col3 < content_w * 0.15:
                        continue
                    # Score: lower is better (more even distribution)
                    parts = sorted([col1, col2, col3])
                    score = parts[2] - parts[0]
                    # Bonus for wider valleys (subtract valley width)
                    score -= (valleys[i][3] + valleys[j][3]) * 0.5
                    if score < best_score:
                        best_score = score
                        best_pair = (c1, c2)
            if best_pair:
                sep1_center, sep2_center = best_pair
            else:
                sep1_center = valleys[0][2]
                sep2_center = valleys[1][2]
        # Convert from content-relative to absolute coordinates
        abs_sep1 = sep1_center + left_x
        abs_sep2 = sep2_center + left_x
        logger.info(f"Layout: 3 columns at separators x={abs_sep1}, x={abs_sep2} "
                    f"(widths: {abs_sep1}, {abs_sep2-abs_sep1}, {w-abs_sep2})")
        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=abs_sep1, height=content_h
        ))
        regions.append(PageRegion(
            type='column_de', x=abs_sep1, y=top_y,
            width=abs_sep2 - abs_sep1, height=content_h
        ))
        regions.append(PageRegion(
            type='column_example', x=abs_sep2, y=top_y,
            width=w - abs_sep2, height=content_h
        ))
    elif len(valleys) == 1:
        # 2-column layout
        abs_sep = valleys[0][2] + left_x
        logger.info(f"Layout: 2 columns at separator x={abs_sep}")
        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=abs_sep, height=content_h
        ))
        regions.append(PageRegion(
            type='column_de', x=abs_sep, y=top_y,
            width=w - abs_sep, height=content_h
        ))
    else:
        # No columns detected — run full-page OCR as single column
        logger.warning("Layout: no column separators found, using full page")
        regions.append(PageRegion(
            type='column_en', x=0, y=top_y,
            width=w, height=content_h
        ))
    # Add header/footer info (only when the margins are non-trivial)
    if top_y > 10:
        regions.append(PageRegion(
            type='header', x=0, y=0,
            width=w, height=top_y
        ))
    if bottom_y < h - 10:
        regions.append(PageRegion(
            type='footer', x=0, y=bottom_y,
            width=w, height=h - bottom_y
        ))
    col_count = len([r for r in regions if r.type.startswith('column')])
    logger.info(f"Layout: {col_count} columns, "
                f"header={'yes' if top_y > 10 else 'no'}, "
                f"footer={'yes' if bottom_y < h - 10 else 'no'}")
    return regions
# =============================================================================
# Stage 6: Multi-Pass OCR
# =============================================================================
def ocr_region(ocr_img: np.ndarray, region: PageRegion, lang: str,
               psm: int, fallback_psm: Optional[int] = None,
               min_confidence: float = 40.0) -> List[Dict[str, Any]]:
    """Run Tesseract OCR on a specific region with given PSM.

    Args:
        ocr_img: Binarized full-page image.
        region: Region to crop and OCR.
        lang: Tesseract language string.
        psm: Page Segmentation Mode.
        fallback_psm: If confidence too low, retry line by line with this PSM.
        min_confidence: Minimum average confidence before fallback.

    Returns:
        List of word dicts with text, absolute position, confidence.
    """
    # Crop the region out of the full page
    crop = ocr_img[region.y:region.y + region.height,
                   region.x:region.x + region.width]
    if crop.size == 0:
        return []
    # Convert to PIL for pytesseract
    pil_img = Image.fromarray(crop)
    # Run Tesseract with the specified PSM
    config = f'--psm {psm} --oem 3'
    try:
        data = pytesseract.image_to_data(pil_img, lang=lang, config=config,
                                         output_type=pytesseract.Output.DICT)
    except Exception as e:
        logger.warning(f"Tesseract failed for region {region.type}: {e}")
        return []
    words = []
    for i in range(len(data['text'])):
        text = data['text'][i].strip()
        # Bug fix: Tesseract >= 4.1 may emit confidences as float strings
        # (e.g. "96.43"); int() alone raises ValueError on those.
        conf = int(float(data['conf'][i]))
        if not text or conf < 10:
            continue
        words.append({
            'text': text,
            'left': data['left'][i] + region.x,  # Absolute coords
            'top': data['top'][i] + region.y,
            'width': data['width'][i],
            'height': data['height'][i],
            'conf': conf,
            'region_type': region.type,
        })
    # Check average confidence; retry line by line if the whole-region pass was weak
    if words and fallback_psm is not None:
        avg_conf = sum(w['conf'] for w in words) / len(words)
        if avg_conf < min_confidence:
            logger.info(f"Region {region.type}: avg confidence {avg_conf:.0f}% < {min_confidence}%, "
                        f"trying fallback PSM {fallback_psm}")
            words = _ocr_region_line_by_line(ocr_img, region, lang, fallback_psm)
    return words
def _ocr_region_line_by_line(ocr_img: np.ndarray, region: PageRegion,
                             lang: str, psm: int) -> List[Dict[str, Any]]:
    """OCR a region line by line (fallback for low-confidence regions).

    Splits the region into horizontal strips based on text density,
    then OCRs each strip individually with the given PSM.
    """
    crop = ocr_img[region.y:region.y + region.height,
                   region.x:region.x + region.width]
    if crop.size == 0:
        return []
    # Find text lines via horizontal projection of the inverted crop
    inv = cv2.bitwise_not(crop)
    h_proj = np.sum(inv, axis=1)
    threshold = np.max(h_proj) * 0.05 if np.max(h_proj) > 0 else 0
    # Collect (start, end) Y-ranges of rows whose ink exceeds the threshold
    lines = []
    in_text = False
    line_start = 0
    for y in range(len(h_proj)):
        if h_proj[y] > threshold and not in_text:
            line_start = y
            in_text = True
        elif h_proj[y] <= threshold and in_text:
            if y - line_start > 5:  # Minimum line height
                lines.append((line_start, y))
            in_text = False
    if in_text and len(h_proj) - line_start > 5:
        lines.append((line_start, len(h_proj)))
    all_words = []
    config = f'--psm {psm} --oem 3'
    for line_y_start, line_y_end in lines:
        # Small vertical padding so ascenders/descenders are not clipped
        pad = 3
        y1 = max(0, line_y_start - pad)
        y2 = min(crop.shape[0], line_y_end + pad)
        line_crop = crop[y1:y2, :]
        if line_crop.size == 0:
            continue
        pil_img = Image.fromarray(line_crop)
        try:
            data = pytesseract.image_to_data(pil_img, lang=lang, config=config,
                                             output_type=pytesseract.Output.DICT)
        except Exception as e:
            # Best-effort: one failed strip should not abort the whole region,
            # but log the failure instead of swallowing it silently.
            logger.debug(f"Line OCR failed for region {region.type}: {e}")
            continue
        for i in range(len(data['text'])):
            text = data['text'][i].strip()
            # Bug fix: confidences may be float strings ("96.43") on
            # Tesseract >= 4.1; int() alone raises ValueError.
            conf = int(float(data['conf'][i]))
            if not text or conf < 10:
                continue
            all_words.append({
                'text': text,
                'left': data['left'][i] + region.x,
                'top': data['top'][i] + region.y + y1,  # strip offset restored
                'width': data['width'][i],
                'height': data['height'][i],
                'conf': conf,
                'region_type': region.type,
            })
    return all_words
def run_multi_pass_ocr(ocr_img: np.ndarray,
                       regions: List[PageRegion],
                       lang: str = "eng+deu") -> Dict[str, List[Dict]]:
    """Run OCR on each detected region with region-specific settings.

    Args:
        ocr_img: Binarized full-page image.
        regions: Detected page regions.
        lang: Default language.

    Returns:
        Dict mapping region type to list of word dicts.
    """
    # Per-column Tesseract settings; anything else falls back to the defaults
    ocr_settings = {
        'column_en': dict(lang='eng', psm=4),
        'column_de': dict(lang='deu', psm=4),
        'column_example': dict(lang=lang, psm=6, fallback_psm=7, min_confidence=40.0),
    }
    results: Dict[str, List[Dict]] = {}
    for region in regions:
        # Header/footer carry no vocabulary content
        if region.type in ('header', 'footer'):
            continue
        kwargs = ocr_settings.get(region.type, dict(lang=lang, psm=6))
        words = ocr_region(ocr_img, region, **kwargs)
        results[region.type] = words
        logger.info(f"OCR {region.type}: {len(words)} words")
    return results
# =============================================================================
# Stage 7: Line Alignment → Vocabulary Entries
# =============================================================================
def _group_words_into_lines(words: List[Dict], y_tolerance_px: int = 20) -> List[List[Dict]]:
"""Group words by Y position into lines, sorted by X within each line."""
if not words:
return []
sorted_words = sorted(words, key=lambda w: (w['top'], w['left']))
lines: List[List[Dict]] = []
current_line: List[Dict] = [sorted_words[0]]
current_y = sorted_words[0]['top']
for word in sorted_words[1:]:
if abs(word['top'] - current_y) <= y_tolerance_px:
current_line.append(word)
else:
current_line.sort(key=lambda w: w['left'])
lines.append(current_line)
current_line = [word]
current_y = word['top']
if current_line:
current_line.sort(key=lambda w: w['left'])
lines.append(current_line)
return lines
def match_lines_to_vocab(ocr_results: Dict[str, List[Dict]],
                         regions: List[PageRegion],
                         y_tolerance_px: int = 25) -> List[VocabRow]:
    """Align OCR results from different columns into vocabulary rows.

    Uses Y-coordinate matching to pair English words, German translations,
    and example sentences that appear on the same line. English lines are
    the primary reference; the closest DE/example line within tolerance is
    attached to each. Example lines with no match are treated as wrapped
    continuations and appended to the nearest preceding row.

    Args:
        ocr_results: Dict mapping region type to word lists.
        regions: Detected regions (for reference; not used in the body).
        y_tolerance_px: Max Y-distance to consider words on the same row.

    Returns:
        List of VocabRow objects sorted by Y position.
    """
    # Group words into lines per column
    en_lines = _group_words_into_lines(ocr_results.get('column_en', []), y_tolerance_px)
    de_lines = _group_words_into_lines(ocr_results.get('column_de', []), y_tolerance_px)
    ex_lines = _group_words_into_lines(ocr_results.get('column_example', []), y_tolerance_px)

    def line_y_center(line: List[Dict]) -> float:
        # Mean vertical center of the words in a line
        return sum(w['top'] + w['height'] / 2 for w in line) / len(line)

    def line_text(line: List[Dict]) -> str:
        return ' '.join(w['text'] for w in line)

    def line_confidence(line: List[Dict]) -> float:
        return sum(w['conf'] for w in line) / len(line) if line else 0

    # Build EN entries as the primary reference
    vocab_rows: List[VocabRow] = []
    for en_line in en_lines:
        en_y = line_y_center(en_line)
        en_text = line_text(en_line)
        en_conf = line_confidence(en_line)
        # Skip very short or likely header content
        if len(en_text.strip()) < 2:
            continue
        # Find the closest matching DE line within tolerance
        de_text = ""
        de_conf = 0.0
        best_de_dist = float('inf')
        best_de_idx = -1
        for idx, de_line in enumerate(de_lines):
            dist = abs(line_y_center(de_line) - en_y)
            if dist < y_tolerance_px and dist < best_de_dist:
                best_de_dist = dist
                best_de_idx = idx
        if best_de_idx >= 0:
            de_text = line_text(de_lines[best_de_idx])
            de_conf = line_confidence(de_lines[best_de_idx])
        # Find the closest matching example line within tolerance
        ex_text = ""
        ex_conf = 0.0
        best_ex_dist = float('inf')
        best_ex_idx = -1
        for idx, ex_line in enumerate(ex_lines):
            dist = abs(line_y_center(ex_line) - en_y)
            if dist < y_tolerance_px and dist < best_ex_dist:
                best_ex_dist = dist
                best_ex_idx = idx
        if best_ex_idx >= 0:
            ex_text = line_text(ex_lines[best_ex_idx])
            ex_conf = line_confidence(ex_lines[best_ex_idx])
        # Average the confidence over the columns that actually matched
        avg_conf = en_conf
        conf_count = 1
        if de_conf > 0:
            avg_conf += de_conf
            conf_count += 1
        if ex_conf > 0:
            avg_conf += ex_conf
            conf_count += 1
        vocab_rows.append(VocabRow(
            english=en_text.strip(),
            german=de_text.strip(),
            example=ex_text.strip(),
            confidence=avg_conf / conf_count,
            y_position=int(en_y),
        ))
    # Handle multi-line wrapping in example column:
    # If an example line has no matching EN/DE, append to previous entry
    matched_ex_ys = set()
    for row in vocab_rows:
        if row.example:
            matched_ex_ys.add(row.y_position)
    for ex_line in ex_lines:
        ex_y = line_y_center(ex_line)
        # Check if already matched (approximate: row.y_position is the EN
        # line center, so compare within the same tolerance)
        already_matched = any(abs(ex_y - y) < y_tolerance_px for y in matched_ex_ys)
        if already_matched:
            continue
        # Find the nearest vocab row strictly above, within 3x tolerance
        best_row = None
        best_dist = float('inf')
        for row in vocab_rows:
            dist = ex_y - row.y_position
            if 0 < dist < y_tolerance_px * 3 and dist < best_dist:
                best_dist = dist
                best_row = row
        if best_row:
            continuation = line_text(ex_line).strip()
            if continuation:
                best_row.example = (best_row.example + " " + continuation).strip()
    # Sort by Y position
    vocab_rows.sort(key=lambda r: r.y_position)
    return vocab_rows
# =============================================================================
# Stage 8: Optional LLM Post-Correction
# =============================================================================
async def llm_post_correct(img: np.ndarray, vocab_rows: List[VocabRow],
                           confidence_threshold: float = 50.0,
                           enabled: bool = False) -> List[VocabRow]:
    """Optionally send low-confidence regions to Qwen-VL for correction.

    Disabled by default; currently a no-op stub either way.

    Args:
        img: Original BGR image.
        vocab_rows: Current vocabulary rows.
        confidence_threshold: Rows below this get LLM correction.
        enabled: Whether to actually run LLM correction.

    Returns:
        Corrected vocabulary rows (currently always the input, unchanged).
    """
    if not enabled:
        return vocab_rows
    # TODO: Implement Qwen-VL correction for low-confidence entries.
    # For each row with confidence < threshold:
    #   1. Crop the relevant region from img
    #   2. Send crop + OCR text to Qwen-VL
    #   3. Replace text if LLM provides a confident correction
    logger.info(f"LLM post-correction skipped (not yet implemented)")
    return vocab_rows
# =============================================================================
# Orchestrator
# =============================================================================
async def run_cv_pipeline(
    pdf_data: Optional[bytes] = None,
    image_data: Optional[bytes] = None,
    page_number: int = 0,
    zoom: float = 3.0,
    enable_dewarp: bool = True,
    enable_llm_correction: bool = False,
    lang: str = "eng+deu",
) -> PipelineResult:
    """Run the complete CV document reconstruction pipeline.

    Stages: render → deskew → (optional) dewarp → dual image prep → layout
    analysis → multi-pass OCR → line alignment → (optional) LLM correction.
    Per-stage timings are recorded in ``result.stages``. Any exception is
    caught and reported via ``result.error`` rather than raised.

    Args:
        pdf_data: Raw PDF bytes (mutually exclusive with image_data).
        image_data: Raw image bytes (mutually exclusive with pdf_data).
        page_number: 0-indexed page number (for PDF).
        zoom: PDF rendering zoom factor.
        enable_dewarp: Whether to run dewarp stage.
        enable_llm_correction: Whether to run LLM post-correction.
        lang: Tesseract language string.

    Returns:
        PipelineResult with vocabulary and timing info.
    """
    if not CV_PIPELINE_AVAILABLE:
        return PipelineResult(error="CV pipeline not available (OpenCV or Tesseract missing)")
    result = PipelineResult()
    total_start = time.time()
    try:
        # Stage 1: Render (PDF page or raw image bytes)
        t = time.time()
        if pdf_data:
            img = render_pdf_high_res(pdf_data, page_number, zoom)
        elif image_data:
            img = render_image_high_res(image_data)
        else:
            return PipelineResult(error="No input data (pdf_data or image_data required)")
        result.stages['render'] = round(time.time() - t, 2)
        result.image_width = img.shape[1]
        result.image_height = img.shape[0]
        logger.info(f"Stage 1 (render): {img.shape[1]}x{img.shape[0]} in {result.stages['render']}s")
        # Stage 2: Deskew (Hough-line rotation correction)
        t = time.time()
        img, angle = deskew_image(img)
        result.stages['deskew'] = round(time.time() - t, 2)
        logger.info(f"Stage 2 (deskew): {angle:.2f}° in {result.stages['deskew']}s")
        # Stage 3: Dewarp (currently a pass-through — see dewarp_image)
        if enable_dewarp:
            t = time.time()
            img = dewarp_image(img)
            result.stages['dewarp'] = round(time.time() - t, 2)
        # Stage 4: Dual image preparation (binarized for OCR, CLAHE for layout)
        t = time.time()
        ocr_img = create_ocr_image(img)
        layout_img = create_layout_image(img)
        result.stages['image_prep'] = round(time.time() - t, 2)
        # Stage 5: Layout analysis (column/header/footer detection)
        t = time.time()
        regions = analyze_layout(layout_img, ocr_img)
        result.stages['layout'] = round(time.time() - t, 2)
        result.columns_detected = len([r for r in regions if r.type.startswith('column')])
        logger.info(f"Stage 5 (layout): {result.columns_detected} columns in {result.stages['layout']}s")
        # Stage 6: Multi-pass OCR with region-specific settings
        t = time.time()
        ocr_results = run_multi_pass_ocr(ocr_img, regions, lang)
        result.stages['ocr'] = round(time.time() - t, 2)
        total_words = sum(len(w) for w in ocr_results.values())
        result.word_count = total_words
        logger.info(f"Stage 6 (OCR): {total_words} words in {result.stages['ocr']}s")
        # Stage 7: Line alignment across columns → vocabulary rows
        t = time.time()
        vocab_rows = match_lines_to_vocab(ocr_results, regions)
        result.stages['alignment'] = round(time.time() - t, 2)
        # Stage 8: Optional LLM correction (no-op stub at present)
        if enable_llm_correction:
            t = time.time()
            vocab_rows = await llm_post_correct(img, vocab_rows)
            result.stages['llm_correction'] = round(time.time() - t, 2)
        # Convert to output format
        result.vocabulary = [
            {
                "english": row.english,
                "german": row.german,
                "example": row.example,
                "confidence": round(row.confidence, 1),
            }
            for row in vocab_rows
            if row.english or row.german  # Skip empty rows
        ]
        result.duration_seconds = round(time.time() - total_start, 2)
        logger.info(f"CV Pipeline complete: {len(result.vocabulary)} entries in {result.duration_seconds}s")
    except Exception as e:
        # Top-level boundary: record the error on the result instead of raising
        logger.error(f"CV Pipeline error: {e}")
        import traceback
        logger.debug(traceback.format_exc())
        result.error = str(e)
        result.duration_seconds = round(time.time() - total_start, 2)
    return result