Files
breakpilot-lehrer/klausur-service/backend/cv_preprocessing.py
Benjamin Admin 438a4495c7
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 27s
CI / test-go-edu-search (push) Successful in 30s
CI / test-python-klausur (push) Failing after 1m56s
CI / test-python-agent-core (push) Successful in 17s
CI / test-nodejs-website (push) Successful in 18s
fix: swap 90°/270° rotation direction in orientation detection
Tesseract OSD 'rotate' returns the clockwise correction needed,
but the code was applying counterclockwise for 90° and clockwise
for 270° — exactly reversed. This caused pages scanned sideways
to be flipped upside down instead of corrected.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-17 16:39:15 +01:00

1167 lines
42 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Image I/O, orientation detection, deskew, and dewarp for the CV vocabulary pipeline.
Lizenz: Apache 2.0 (kommerziell nutzbar)
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
"""
import logging
import time
from collections import defaultdict
from typing import Any, Dict, List, Tuple
import numpy as np
from cv_vocab_types import (
CV2_AVAILABLE,
TESSERACT_AVAILABLE,
)
logger = logging.getLogger(__name__)
# Guarded imports — mirror cv_vocab_types guards
try:
import cv2
except ImportError:
cv2 = None # type: ignore[assignment]
try:
import pytesseract
from PIL import Image
except ImportError:
pytesseract = None # type: ignore[assignment]
Image = None # type: ignore[assignment,misc]
def render_pdf_high_res(pdf_data: bytes, page_number: int = 0, zoom: float = 3.0) -> np.ndarray:
    """Render a PDF page to a high-resolution numpy array (BGR).

    Args:
        pdf_data: Raw PDF bytes.
        page_number: 0-indexed page number.
        zoom: Zoom factor (3.0 = 432 DPI).

    Returns:
        numpy array in BGR format.

    Raises:
        ValueError: If page_number is out of range for the document.
    """
    import fitz  # PyMuPDF — imported lazily so the module loads without it

    pdf_doc = fitz.open(stream=pdf_data, filetype="pdf")
    try:
        if page_number >= pdf_doc.page_count:
            raise ValueError(f"Page {page_number} does not exist (PDF has {pdf_doc.page_count} pages)")
        page = pdf_doc[page_number]
        mat = fitz.Matrix(zoom, zoom)
        pix = page.get_pixmap(matrix=mat)
        # Convert the raw pixmap buffer to a numpy image; the channel count
        # (pix.n) determines which colour conversion yields BGR.
        img_data = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.h, pix.w, pix.n)
        if pix.n == 4:  # RGBA
            img_bgr = cv2.cvtColor(img_data, cv2.COLOR_RGBA2BGR)
        elif pix.n == 3:  # RGB
            img_bgr = cv2.cvtColor(img_data, cv2.COLOR_RGB2BGR)
        else:  # Grayscale
            img_bgr = cv2.cvtColor(img_data, cv2.COLOR_GRAY2BGR)
        return img_bgr
    finally:
        # Close the document on every path (bugfix: the out-of-range
        # ValueError previously leaked the open document).
        pdf_doc.close()
def render_image_high_res(image_data: bytes) -> np.ndarray:
    """Decode raw image bytes (PNG/JPEG) into a BGR numpy array.

    Args:
        image_data: Raw image bytes.

    Returns:
        numpy array in BGR format.

    Raises:
        ValueError: If the bytes cannot be decoded as an image.
    """
    buffer = np.frombuffer(image_data, dtype=np.uint8)
    decoded = cv2.imdecode(buffer, cv2.IMREAD_COLOR)
    if decoded is None:
        raise ValueError("Could not decode image data")
    return decoded
# =============================================================================
# Stage 1b: Orientation Detection (0°/90°/180°/270°)
# =============================================================================
def detect_and_fix_orientation(img_bgr: np.ndarray) -> Tuple[np.ndarray, int]:
    """Detect page orientation via Tesseract OSD and rotate if needed.

    Handles upside-down scans (180°) common with book scanners where
    every other page is flipped due to the scanner hinge.

    Returns:
        (corrected_image, rotation_degrees) — rotation is 0, 90, 180, or 270.
    """
    if pytesseract is None:
        return img_bgr, 0
    try:
        # Tesseract OSD needs a grayscale or RGB image
        gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
        osd = pytesseract.image_to_osd(
            Image.fromarray(gray), output_type=pytesseract.Output.DICT
        )
        rotate = osd.get("rotate", 0)
        confidence = osd.get("orientation_conf", 0.0)
        logger.info(f"OSD: orientation={rotate}° confidence={confidence:.1f}")
        if rotate == 0 or confidence < 1.0:
            return img_bgr, 0
        # OSD 'rotate' is the clockwise correction to apply.
        rotation_ops = {
            90: cv2.ROTATE_90_CLOCKWISE,
            180: cv2.ROTATE_180,
            270: cv2.ROTATE_90_COUNTERCLOCKWISE,
        }
        op = rotation_ops.get(rotate)
        if op is None:
            # Unexpected OSD value — leave the image untouched
            return img_bgr, 0
        corrected = cv2.rotate(img_bgr, op)
        logger.info(f"OSD: rotated {rotate}° to fix orientation")
        return corrected, rotate
    except Exception as e:
        logger.warning(f"OSD orientation detection failed: {e}")
        return img_bgr, 0
# =============================================================================
# Stage 2: Deskew (Rotation Correction)
# =============================================================================
def deskew_image(img: np.ndarray) -> Tuple[np.ndarray, float]:
    """Correct rotation using Hough Line detection.

    Args:
        img: BGR image.

    Returns:
        Tuple of (corrected image, detected angle in degrees).
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Inverted Otsu binarisation so text and rules become foreground
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    detected = cv2.HoughLinesP(
        binary, 1, np.pi / 180, threshold=100,
        minLineLength=img.shape[1] // 4, maxLineGap=20,
    )
    if detected is None or len(detected) < 3:
        return img, 0.0
    # Collect angles of near-horizontal segments only (< 15° tilt)
    near_horizontal = []
    for segment in detected:
        x1, y1, x2, y2 = segment[0]
        theta = np.degrees(np.arctan2(y2 - y1, x2 - x1))
        if abs(theta) < 15:
            near_horizontal.append(theta)
    if not near_horizontal:
        return img, 0.0
    skew = float(np.median(near_horizontal))
    # Clamp the correction to ±5° to avoid over-rotating on bad fits
    if abs(skew) > 5.0:
        skew = 5.0 * np.sign(skew)
    # Tiny angles are visually negligible — skip the warp
    if abs(skew) < 0.1:
        return img, 0.0
    rows, cols = img.shape[:2]
    rotation = cv2.getRotationMatrix2D((cols // 2, rows // 2), skew, 1.0)
    corrected = cv2.warpAffine(
        img, rotation, (cols, rows),
        flags=cv2.INTER_LINEAR,
        borderMode=cv2.BORDER_REPLICATE,
    )
    logger.info(f"Deskew: corrected {skew:.2f}° rotation")
    return corrected, skew
def deskew_image_by_word_alignment(
    image_data: bytes,
    lang: str = "eng+deu",
    downscale_factor: float = 0.5,
) -> Tuple[bytes, float]:
    """Correct rotation by fitting a line through left-most word starts per text line.

    More robust than Hough-based deskew for vocabulary worksheets where text lines
    have consistent left-alignment. Runs a quick Tesseract pass on a downscaled
    copy to find word positions, computes the dominant left-edge column, fits a
    line through those points and rotates the full-resolution image.

    Args:
        image_data: Raw image bytes (PNG/JPEG).
        lang: Tesseract language string for the quick pass.
        downscale_factor: Shrink factor for the quick Tesseract pass (0.5 = 50%).

    Returns:
        Tuple of (rotated image as PNG bytes, detected angle in degrees).
        Returns the input unchanged with 0.0° whenever deskew cannot be
        performed (missing deps, decode failure, too few aligned lines).
    """
    # Without OpenCV + Tesseract this is a no-op pass-through.
    if not CV2_AVAILABLE or not TESSERACT_AVAILABLE:
        return image_data, 0.0
    # 1. Decode image
    img_array = np.frombuffer(image_data, dtype=np.uint8)
    img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
    if img is None:
        logger.warning("deskew_by_word_alignment: could not decode image")
        return image_data, 0.0
    orig_h, orig_w = img.shape[:2]
    # 2. Downscale for fast Tesseract pass
    small_w = int(orig_w * downscale_factor)
    small_h = int(orig_h * downscale_factor)
    small = cv2.resize(img, (small_w, small_h), interpolation=cv2.INTER_AREA)
    # 3. Quick Tesseract — word-level positions (PSM 6: uniform text block)
    pil_small = Image.fromarray(cv2.cvtColor(small, cv2.COLOR_BGR2RGB))
    try:
        data = pytesseract.image_to_data(
            pil_small, lang=lang, config="--psm 6 --oem 3",
            output_type=pytesseract.Output.DICT,
        )
    except Exception as e:
        logger.warning(f"deskew_by_word_alignment: Tesseract failed: {e}")
        return image_data, 0.0
    # 4. Per text-line, find the left-most word start
    #    Group by (block_num, par_num, line_num)
    line_groups: Dict[tuple, list] = defaultdict(list)
    for i in range(len(data["text"])):
        text = (data["text"][i] or "").strip()
        conf = int(data["conf"][i])
        # Skip empty tokens and low-confidence (< 20) detections
        if not text or conf < 20:
            continue
        key = (data["block_num"][i], data["par_num"][i], data["line_num"][i])
        line_groups[key].append(i)
    # Need a minimum number of text lines for a trustworthy fit
    if len(line_groups) < 5:
        logger.info(f"deskew_by_word_alignment: only {len(line_groups)} lines, skipping")
        return image_data, 0.0
    # For each line, pick the word with smallest 'left' → compute (left_x, center_y)
    # Scale back to original resolution
    scale = 1.0 / downscale_factor
    points = []  # list of (x, y) in original-image coords
    for key, indices in line_groups.items():
        best_idx = min(indices, key=lambda i: data["left"][i])
        lx = data["left"][best_idx] * scale
        top = data["top"][best_idx] * scale
        h = data["height"][best_idx] * scale
        cy = top + h / 2.0
        points.append((lx, cy))
    # 5. Find dominant left-edge column + compute angle
    xs = np.array([p[0] for p in points])
    ys = np.array([p[1] for p in points])
    median_x = float(np.median(xs))
    tolerance = orig_w * 0.03  # 3% of image width
    # Keep only lines whose left edge sits near the dominant column —
    # indented lines would otherwise bias the fit.
    mask = np.abs(xs - median_x) <= tolerance
    filtered_xs = xs[mask]
    filtered_ys = ys[mask]
    if len(filtered_xs) < 5:
        logger.info(f"deskew_by_word_alignment: only {len(filtered_xs)} aligned points after filter, skipping")
        return image_data, 0.0
    # polyfit: x = a*y + b → a = dx/dy → angle = arctan(a)
    coeffs = np.polyfit(filtered_ys, filtered_xs, 1)
    slope = coeffs[0]  # dx/dy
    angle_rad = np.arctan(slope)
    angle_deg = float(np.degrees(angle_rad))
    # Clamp to ±5°
    angle_deg = max(-5.0, min(5.0, angle_deg))
    logger.info(f"deskew_by_word_alignment: detected {angle_deg:.2f}° from {len(filtered_xs)} points "
                f"(total lines: {len(line_groups)})")
    # Below 0.05° the rotation is visually negligible — skip the warp
    if abs(angle_deg) < 0.05:
        return image_data, 0.0
    # 6. Rotate full-res image
    center = (orig_w // 2, orig_h // 2)
    M = cv2.getRotationMatrix2D(center, angle_deg, 1.0)
    rotated = cv2.warpAffine(img, M, (orig_w, orig_h),
                             flags=cv2.INTER_LINEAR,
                             borderMode=cv2.BORDER_REPLICATE)
    # Encode back to PNG
    success, png_buf = cv2.imencode(".png", rotated)
    if not success:
        logger.warning("deskew_by_word_alignment: PNG encoding failed")
        return image_data, 0.0
    return png_buf.tobytes(), angle_deg
def _projection_gradient_score(profile: np.ndarray) -> float:
"""Score a projection profile by the L2-norm of its first derivative.
Higher score = sharper transitions between text-lines and gaps,
i.e. better row/column alignment.
"""
diff = np.diff(profile)
return float(np.sum(diff * diff))
def deskew_image_iterative(
    img: np.ndarray,
    coarse_range: float = 5.0,
    coarse_step: float = 0.1,
    fine_range: float = 0.15,
    fine_step: float = 0.02,
) -> Tuple[np.ndarray, float, Dict[str, Any]]:
    """Iterative deskew using vertical-edge projection optimisation.

    The key insight: at the correct rotation angle, vertical features
    (word left-edges, column borders) become truly vertical, producing
    the sharpest peaks in the vertical projection of vertical edges.

    Method:
        1. Detect vertical edges via Sobel-X on the central crop.
        2. Coarse sweep: rotate edge image, compute vertical projection
           gradient score. The angle where vertical edges align best wins.
        3. Fine sweep: refine around the coarse winner.

    Args:
        img: BGR image (full resolution).
        coarse_range: half-range in degrees for the coarse sweep.
        coarse_step: step size in degrees for the coarse sweep.
        fine_range: half-range around the coarse winner for the fine sweep.
        fine_step: step size in degrees for the fine sweep.

    Returns:
        (rotated_bgr, angle_degrees, debug_dict)
    """
    h, w = img.shape[:2]
    debug: Dict[str, Any] = {}
    # --- Grayscale + vertical edge detection ---
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Central crop (15%-85% height, 10%-90% width) to avoid page margins
    y_lo, y_hi = int(h * 0.15), int(h * 0.85)
    x_lo, x_hi = int(w * 0.10), int(w * 0.90)
    gray_crop = gray[y_lo:y_hi, x_lo:x_hi]
    # Sobel-X → absolute vertical edges
    sobel_x = cv2.Sobel(gray_crop, cv2.CV_64F, 1, 0, ksize=3)
    edges = np.abs(sobel_x)
    # Normalise to 0-255 for consistent scoring
    edge_max = edges.max()
    if edge_max > 0:
        edges = (edges / edge_max * 255).astype(np.uint8)
    else:
        # Completely flat image — nothing to align on
        return img, 0.0, {"error": "no edges detected"}
    crop_h, crop_w = edges.shape[:2]
    crop_center = (crop_w // 2, crop_h // 2)
    # Trim margin after rotation to avoid border artifacts
    trim_y = max(4, int(crop_h * 0.03))
    trim_x = max(4, int(crop_w * 0.03))

    def _sweep_edges(angles: np.ndarray) -> list:
        """Score each angle by vertical projection gradient of vertical edges."""
        results = []
        for angle in angles:
            if abs(angle) < 1e-6:
                # 0° — reuse the unrotated edge image
                rotated = edges
            else:
                M = cv2.getRotationMatrix2D(crop_center, angle, 1.0)
                rotated = cv2.warpAffine(edges, M, (crop_w, crop_h),
                                         flags=cv2.INTER_NEAREST,
                                         borderMode=cv2.BORDER_REPLICATE)
            # Trim borders to avoid edge artifacts
            trimmed = rotated[trim_y:-trim_y, trim_x:-trim_x]
            # Column sums: peaks where vertical features line up
            v_profile = np.sum(trimmed, axis=0, dtype=np.float64)
            score = _projection_gradient_score(v_profile)
            results.append((float(angle), score))
        return results

    # --- Phase 1: coarse sweep ---
    # "+ coarse_step * 0.5" makes the upper bound inclusive despite float error
    coarse_angles = np.arange(-coarse_range, coarse_range + coarse_step * 0.5, coarse_step)
    coarse_results = _sweep_edges(coarse_angles)
    best_coarse = max(coarse_results, key=lambda x: x[1])
    best_coarse_angle, best_coarse_score = best_coarse
    debug["coarse_best_angle"] = round(best_coarse_angle, 2)
    debug["coarse_best_score"] = round(best_coarse_score, 1)
    debug["coarse_scores"] = [(round(a, 2), round(s, 1)) for a, s in coarse_results]
    # --- Phase 2: fine sweep around coarse winner ---
    fine_lo = best_coarse_angle - fine_range
    fine_hi = best_coarse_angle + fine_range
    fine_angles = np.arange(fine_lo, fine_hi + fine_step * 0.5, fine_step)
    fine_results = _sweep_edges(fine_angles)
    best_fine = max(fine_results, key=lambda x: x[1])
    best_fine_angle, best_fine_score = best_fine
    debug["fine_best_angle"] = round(best_fine_angle, 2)
    debug["fine_best_score"] = round(best_fine_score, 1)
    debug["fine_scores"] = [(round(a, 2), round(s, 1)) for a, s in fine_results]
    final_angle = best_fine_angle
    # Clamp to ±5°
    final_angle = max(-5.0, min(5.0, final_angle))
    logger.info(f"deskew_iterative: coarse={best_coarse_angle:.2f}° fine={best_fine_angle:.2f}° -> {final_angle:.2f}°")
    # Sub-0.05° corrections are visually negligible — skip the warp
    if abs(final_angle) < 0.05:
        return img, 0.0, debug
    # --- Rotate full-res image ---
    center = (w // 2, h // 2)
    M = cv2.getRotationMatrix2D(center, final_angle, 1.0)
    rotated = cv2.warpAffine(img, M, (w, h),
                             flags=cv2.INTER_LINEAR,
                             borderMode=cv2.BORDER_REPLICATE)
    return rotated, final_angle, debug
def _measure_textline_slope(img: np.ndarray) -> float:
    """Measure residual text-line slope via Tesseract word-position regression.

    Groups Tesseract words by (block, par, line), fits a linear regression
    per line (y = slope * x + b), and returns the trimmed-mean slope in
    degrees. Positive = text rises to the right, negative = falls.

    This is the most direct measurement of remaining rotation after deskew.

    Args:
        img: BGR image.

    Returns:
        Trimmed-mean text-line slope in degrees; 0.0 when dependencies are
        missing or fewer than 3 usable lines are found.
    """
    import math as _math
    if not TESSERACT_AVAILABLE or not CV2_AVAILABLE:
        return 0.0
    h, w = img.shape[:2]
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    data = pytesseract.image_to_data(
        Image.fromarray(gray),
        output_type=pytesseract.Output.DICT,
        config="--psm 6",
    )
    # Group word centres by text line
    lines: Dict[tuple, list] = {}
    for i in range(len(data["text"])):
        txt = (data["text"][i] or "").strip()
        # Ignore 1-char tokens and low-confidence (< 30) detections
        if len(txt) < 2 or int(data["conf"][i]) < 30:
            continue
        key = (data["block_num"][i], data["par_num"][i], data["line_num"][i])
        cx = data["left"][i] + data["width"][i] / 2.0
        cy = data["top"][i] + data["height"][i] / 2.0
        lines.setdefault(key, []).append((cx, cy))
    # Per-line linear regression → slope angle
    slopes: list = []
    for pts in lines.values():
        if len(pts) < 3:
            continue  # need at least 3 points for a meaningful fit
        pts.sort(key=lambda p: p[0])
        xs = np.array([p[0] for p in pts], dtype=np.float64)
        ys = np.array([p[1] for p in pts], dtype=np.float64)
        if xs[-1] - xs[0] < w * 0.15:
            continue  # skip short lines
        # Least-squares fit of y = slope * x + b
        A = np.vstack([xs, np.ones_like(xs)]).T
        result = np.linalg.lstsq(A, ys, rcond=None)
        slope = result[0][0]
        slopes.append(_math.degrees(_math.atan(slope)))
    if len(slopes) < 3:
        return 0.0
    # Trimmed mean (drop 10% extremes on each side)
    slopes.sort()
    trim = max(1, len(slopes) // 10)
    trimmed = slopes[trim:-trim] if len(slopes) > 2 * trim else slopes
    if not trimmed:
        return 0.0
    return sum(trimmed) / len(trimmed)
def deskew_two_pass(
    img: np.ndarray,
    coarse_range: float = 5.0,
) -> Tuple[np.ndarray, float, Dict[str, Any]]:
    """Multi-pass deskew: iterative projection + two residual checks.

    Pass 1: ``deskew_image_iterative()`` (vertical-edge projection, wide range).
    Pass 2: ``deskew_image_by_word_alignment()`` on the already-corrected image
            to detect and fix residual skew that the projection method missed.
    Pass 3: ``_measure_textline_slope()`` regression on the result as a final
            text-line residual check.

    The corrections are summed. A residual below 0.3° from Pass 2 or Pass 3
    is ignored (already good enough). Passes 2 and 3 are best-effort: any
    failure leaves the earlier result intact.

    Args:
        img: BGR image (full resolution).
        coarse_range: half-range in degrees for Pass 1's coarse sweep.

    Returns:
        (corrected_bgr, total_angle_degrees, debug_dict)
    """
    debug: Dict[str, Any] = {}
    # --- Pass 1: iterative projection ---
    corrected, angle1, dbg1 = deskew_image_iterative(
        img.copy(), coarse_range=coarse_range,
    )
    debug["pass1_angle"] = round(angle1, 3)
    debug["pass1_method"] = "iterative"
    debug["pass1_debug"] = dbg1
    # --- Pass 2: word-alignment residual check on corrected image ---
    angle2 = 0.0
    try:
        # Encode the corrected image to PNG bytes for word-alignment
        ok, buf = cv2.imencode(".png", corrected)
        if ok:
            corrected_bytes, angle2 = deskew_image_by_word_alignment(buf.tobytes())
            if abs(angle2) >= 0.3:
                # Significant residual — decode and use the second correction
                arr2 = np.frombuffer(corrected_bytes, dtype=np.uint8)
                corrected2 = cv2.imdecode(arr2, cv2.IMREAD_COLOR)
                if corrected2 is not None:
                    corrected = corrected2
                    logger.info(f"deskew_two_pass: pass2 residual={angle2:.2f}° applied "
                                f"(total={angle1 + angle2:.2f}°)")
                else:
                    # Decode failed — drop the pass-2 contribution
                    angle2 = 0.0
            else:
                logger.info(f"deskew_two_pass: pass2 residual={angle2:.2f}° < 0.3° — skipped")
                angle2 = 0.0
    except Exception as e:
        # Best-effort: keep the pass-1 result on any pass-2 failure
        logger.warning(f"deskew_two_pass: pass2 word-alignment failed: {e}")
        angle2 = 0.0
    # --- Pass 3: Tesseract text-line regression residual check ---
    # The most reliable final check: measure actual text-line slopes
    # using Tesseract word positions and linear regression per line.
    angle3 = 0.0
    try:
        residual = _measure_textline_slope(corrected)
        debug["pass3_raw"] = round(residual, 3)
        if abs(residual) >= 0.3:
            h3, w3 = corrected.shape[:2]
            center3 = (w3 // 2, h3 // 2)
            M3 = cv2.getRotationMatrix2D(center3, residual, 1.0)
            corrected = cv2.warpAffine(
                corrected, M3, (w3, h3),
                flags=cv2.INTER_LINEAR,
                borderMode=cv2.BORDER_REPLICATE,
            )
            angle3 = residual
            logger.info(
                "deskew_two_pass: pass3 text-line residual=%.2f° applied",
                residual,
            )
        else:
            logger.info(
                "deskew_two_pass: pass3 text-line residual=%.2f° < 0.3° — skipped",
                residual,
            )
    except Exception as e:
        # Best-effort: keep the pass-1/2 result on any pass-3 failure
        logger.warning("deskew_two_pass: pass3 text-line check failed: %s", e)
    total_angle = angle1 + angle2 + angle3
    debug["pass2_angle"] = round(angle2, 3)
    debug["pass2_method"] = "word_alignment"
    debug["pass3_angle"] = round(angle3, 3)
    debug["pass3_method"] = "textline_regression"
    debug["total_angle"] = round(total_angle, 3)
    logger.info(
        "deskew_two_pass: pass1=%.2f° + pass2=%.2f° + pass3=%.2f° = %.2f°",
        angle1, angle2, angle3, total_angle,
    )
    return corrected, total_angle, debug
# =============================================================================
# Stage 3: Dewarp (Book Curvature Correction)
# =============================================================================
def _detect_shear_angle(img: np.ndarray) -> Dict[str, Any]:
    """Detect the vertical shear angle of the page.

    After deskew (horizontal lines aligned), vertical features like column
    edges may still be tilted. This measures that tilt by tracking the
    strongest vertical edge across horizontal strips.

    The result is a shear angle in degrees: the angular difference between
    true vertical and the detected column edge.

    Returns:
        Dict with keys: method, shear_degrees, confidence.
    """
    import math
    h, w = img.shape[:2]
    result = {"method": "vertical_edge", "shear_degrees": 0.0, "confidence": 0.0}
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Sobel-X to find vertical edges
    sobel_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
    # BUGFIX: a bare .astype(np.uint8) on raw Sobel magnitudes (which reach
    # 4*255 for a 3x3 kernel on uint8 input) wrapped values modulo 256,
    # making the strongest edges look weak. Normalise to 0-255 first
    # (same scheme as deskew_image_iterative).
    abs_sobel = np.abs(sobel_x)
    sobel_max = abs_sobel.max()
    if sobel_max == 0:
        # Flat image — no edges at all (matches old behavior, which fell
        # through to the < 8 strips check and returned the default result)
        return result
    abs_sobel = (abs_sobel / sobel_max * 255).astype(np.uint8)
    # Binarize with Otsu
    _, binary = cv2.threshold(abs_sobel, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    num_strips = 20
    strip_h = h // num_strips
    edge_positions = []  # (y_center, x_position)
    for i in range(num_strips):
        y_start = i * strip_h
        y_end = min((i + 1) * strip_h, h)
        strip = binary[y_start:y_end, :]
        # Project vertically (sum along y-axis)
        projection = np.sum(strip, axis=0).astype(np.float64)
        if projection.max() == 0:
            continue
        # Find the strongest vertical edge in left 40% of image
        search_w = int(w * 0.4)
        left_proj = projection[:search_w]
        if left_proj.max() == 0:
            continue
        # Smooth and find peak
        kernel_size = max(3, w // 100)
        if kernel_size % 2 == 0:
            kernel_size += 1  # GaussianBlur requires odd kernel sizes
        smoothed = cv2.GaussianBlur(left_proj.reshape(1, -1), (kernel_size, 1), 0).flatten()
        x_pos = float(np.argmax(smoothed))
        y_center = (y_start + y_end) / 2.0
        edge_positions.append((y_center, x_pos))
    if len(edge_positions) < 8:
        return result  # too few strips with a detectable edge
    ys = np.array([p[0] for p in edge_positions])
    xs = np.array([p[1] for p in edge_positions])
    # Remove outliers (> 2 std from median)
    median_x = np.median(xs)
    std_x = max(np.std(xs), 1.0)
    mask = np.abs(xs - median_x) < 2 * std_x
    ys = ys[mask]
    xs = xs[mask]
    if len(ys) < 6:
        return result
    # Fit straight line: x = slope * y + intercept
    # The slope tells us the tilt of the vertical edge
    straight_coeffs = np.polyfit(ys, xs, 1)
    slope = straight_coeffs[0]  # dx/dy in pixels
    fitted = np.polyval(straight_coeffs, ys)
    residuals = xs - fitted
    rmse = float(np.sqrt(np.mean(residuals ** 2)))
    # Convert slope to angle: arctan(dx/dy) in degrees
    shear_degrees = math.degrees(math.atan(slope))
    # Confidence: more strips = better; a noisy fit (high RMSE) = worse
    confidence = min(1.0, len(ys) / 15.0) * max(0.5, 1.0 - rmse / 5.0)
    result["shear_degrees"] = round(shear_degrees, 3)
    result["confidence"] = round(float(confidence), 2)
    return result
def _detect_shear_by_projection(img: np.ndarray) -> Dict[str, Any]:
    """Detect shear angle by maximising variance of horizontal text-line projections.

    Principle: horizontal text lines produce a row-projection profile with sharp
    peaks (high variance) when the image is correctly aligned. Any residual shear
    smears the peaks and reduces variance. We sweep ±3° and pick the angle whose
    corrected projection has the highest variance.

    Works best on pages with clear horizontal banding (vocabulary tables, prose).
    Complements _detect_shear_angle() which needs strong vertical edges.

    NOTE(review): the sweep applies a horizontal (x-direction) shear, which does
    not move pixels between rows — the per-row sums can only change through
    pixels clipped at the left/right borders (BORDER_CONSTANT fills with 0).
    Verify this detector actually discriminates shear angles as intended.

    Returns:
        Dict with keys: method, shear_degrees, confidence.
    """
    import math
    result = {"method": "projection", "shear_degrees": 0.0, "confidence": 0.0}
    h, w = img.shape[:2]
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Otsu binarisation (inverted: text becomes foreground/white)
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    # Work at half resolution for speed
    small = cv2.resize(binary, (w // 2, h // 2), interpolation=cv2.INTER_AREA)
    sh, sw = small.shape
    # 2-pass angle sweep for 10x better precision:
    # Pass 1: Coarse sweep ±3° in 0.5° steps (13 values)
    # Pass 2: Fine sweep ±0.5° around coarse best in 0.05° steps (21 values)
    def _sweep_variance(angles_list):
        results = []
        for angle_deg in angles_list:
            if abs(angle_deg) < 0.001:
                rotated = small  # 0°: reuse the unsheared image
            else:
                shear_tan = math.tan(math.radians(angle_deg))
                # x' = x + shear_tan * (y - sh/2) — same convention as _apply_shear
                M = np.float32([[1, shear_tan, -sh / 2.0 * shear_tan], [0, 1, 0]])
                rotated = cv2.warpAffine(small, M, (sw, sh),
                                         flags=cv2.INTER_NEAREST,
                                         borderMode=cv2.BORDER_CONSTANT)
            # Row sums → variance as the alignment score
            profile = np.sum(rotated, axis=1).astype(float)
            results.append((angle_deg, float(np.var(profile))))
        return results
    # Pass 1: coarse
    coarse_angles = [a * 0.5 for a in range(-6, 7)]  # 13 values
    coarse_results = _sweep_variance(coarse_angles)
    coarse_best = max(coarse_results, key=lambda x: x[1])
    # Pass 2: fine around coarse best
    fine_center = coarse_best[0]
    fine_angles = [fine_center + a * 0.05 for a in range(-10, 11)]  # 21 values
    fine_results = _sweep_variance(fine_angles)
    fine_best = max(fine_results, key=lambda x: x[1])
    best_angle = fine_best[0]
    best_variance = fine_best[1]
    variances = coarse_results + fine_results
    # Confidence: how much sharper is the best angle vs. the mean?
    all_mean = sum(v for _, v in variances) / len(variances)
    if all_mean > 0 and best_variance > all_mean:
        confidence = min(1.0, (best_variance - all_mean) / (all_mean + 1.0) * 0.6)
    else:
        confidence = 0.0
    result["shear_degrees"] = round(best_angle, 3)
    result["confidence"] = round(max(0.0, min(1.0, confidence)), 2)
    return result
def _detect_shear_by_hough(img: np.ndarray) -> Dict[str, Any]:
    """Detect shear using Hough transform on printed table / ruled lines.

    Vocabulary worksheets have near-horizontal printed table borders. After
    deskew these should be exactly horizontal; any residual tilt equals the
    vertical shear angle (with inverted sign).

    The sign convention: a horizontal line tilting +α degrees (left end lower)
    means the page has vertical shear of -α degrees (left column edge drifts
    to the left going downward).

    Returns:
        Dict with keys: method, shear_degrees, confidence.
    """
    result = {"method": "hough_lines", "shear_degrees": 0.0, "confidence": 0.0}
    h, w = img.shape[:2]
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    edge_map = cv2.Canny(gray, 50, 150, apertureSize=3)
    segments = cv2.HoughLinesP(
        edge_map, rho=1, theta=np.pi / 360,
        threshold=int(w * 0.08),
        minLineLength=int(w * 0.15),
        maxLineGap=20,
    )
    if segments is None or len(segments) < 3:
        return result
    # Collect (tilt, length) for near-horizontal segments only
    candidates: List[Tuple[float, float]] = []
    for seg in segments:
        x1, y1, x2, y2 = seg[0]
        if x1 == x2:
            continue  # perfectly vertical — not a horizontal-border candidate
        tilt = float(np.degrees(np.arctan2(y2 - y1, x2 - x1)))
        if abs(tilt) <= 5.0:
            seg_len = float(np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2))
            candidates.append((tilt, seg_len))
    if len(candidates) < 3:
        return result
    # Length-weighted median of the tilts: longer borders count more
    tilts = np.array([t for t, _ in candidates])
    lengths = np.array([ln for _, ln in candidates])
    order = np.argsort(tilts)
    sorted_tilts = tilts[order]
    sorted_lengths = lengths[order]
    cumulative = np.cumsum(sorted_lengths)
    half_idx = int(np.searchsorted(cumulative, cumulative[-1] / 2.0))
    median_tilt = float(sorted_tilts[min(half_idx, len(sorted_tilts) - 1)])
    # Confidence: fraction of segments agreeing within 1° of the median
    agree = sum(1 for t, _ in candidates if abs(t - median_tilt) < 1.0)
    confidence = min(1.0, agree / max(len(candidates), 1)) * 0.85
    # Sign inversion: horizontal line tilt is complementary to vertical shear
    result["shear_degrees"] = round(-median_tilt, 3)
    result["confidence"] = round(max(0.0, min(1.0, confidence)), 2)
    return result
def _detect_shear_by_text_lines(img: np.ndarray) -> Dict[str, Any]:
    """Detect shear by measuring text-line straightness (Method D).

    Runs a quick Tesseract scan (PSM 11, 50% downscale) to locate word
    bounding boxes, groups them into vertical columns by X-proximity,
    and measures how the left-edge X position drifts with Y (vertical
    position). The drift dx/dy is the tangent of the shear angle.

    This directly measures vertical shear (column tilt) rather than
    horizontal text-line slope, which is already corrected by deskew.

    Returns:
        Dict with keys: method, shear_degrees, confidence.
    """
    import math
    result = {"method": "text_lines", "shear_degrees": 0.0, "confidence": 0.0}
    h, w = img.shape[:2]
    # Downscale 50% for speed
    scale = 0.5
    small = cv2.resize(img, (int(w * scale), int(h * scale)),
                       interpolation=cv2.INTER_AREA)
    gray = cv2.cvtColor(small, cv2.COLOR_BGR2GRAY)
    pil_img = Image.fromarray(gray)
    try:
        # PSM 11: sparse text — find words without assuming page layout
        data = pytesseract.image_to_data(
            pil_img, lang='eng+deu', config='--psm 11 --oem 3',
            output_type=pytesseract.Output.DICT,
        )
    except Exception:
        # Tesseract failure → zero-confidence result, the ensemble ignores it
        return result
    # Collect word left-edges (x) and vertical centres (y)
    words = []
    for i in range(len(data['text'])):
        text = data['text'][i].strip()
        conf = int(data['conf'][i])
        # Skip empties, low confidence (< 20) and 1-char noise tokens
        if not text or conf < 20 or len(text) < 2:
            continue
        left_x = float(data['left'][i])
        cy = data['top'][i] + data['height'][i] / 2.0
        word_w = float(data['width'][i])
        words.append((left_x, cy, word_w))
    if len(words) < 15:
        return result  # too little text for a reliable estimate
    # --- Group words into vertical columns by left-edge X proximity ---
    # Sort by x, then cluster words whose left-edges are within x_tol
    avg_w = sum(ww for _, _, ww in words) / len(words)
    x_tol = max(avg_w * 0.4, 8)  # tolerance for "same column"
    words_by_x = sorted(words, key=lambda w: w[0])
    columns: List[List[Tuple[float, float]]] = []  # each: [(left_x, cy), ...]
    cur_col: List[Tuple[float, float]] = [(words_by_x[0][0], words_by_x[0][1])]
    cur_x = words_by_x[0][0]
    for lx, cy, _ in words_by_x[1:]:
        if abs(lx - cur_x) <= x_tol:
            cur_col.append((lx, cy))
            # Update running x as an exponential moving average of the cluster
            cur_x = cur_x * 0.8 + lx * 0.2
        else:
            # Close the current cluster; keep it only if it has >= 5 members
            if len(cur_col) >= 5:
                columns.append(cur_col)
            cur_col = [(lx, cy)]
            cur_x = lx
    if len(cur_col) >= 5:
        columns.append(cur_col)
    if len(columns) < 2:
        return result
    # --- For each column, measure X-drift as a function of Y ---
    # Fit: left_x = a * cy + b → a = dx/dy = tan(shear_angle)
    drifts = []
    for col in columns:
        ys = np.array([p[1] for p in col])
        xs = np.array([p[0] for p in col])
        y_range = ys.max() - ys.min()
        if y_range < h * scale * 0.3:
            continue  # column must span at least 30% of image height
        # Linear regression: x = a*y + b
        coeffs = np.polyfit(ys, xs, 1)
        drifts.append(coeffs[0])  # dx/dy
    if len(drifts) < 2:
        return result
    # Median dx/dy → shear angle
    # dx/dy > 0 means left-edges move RIGHT as we go DOWN → columns lean right
    median_drift = float(np.median(drifts))
    shear_degrees = math.degrees(math.atan(median_drift))
    # Confidence from column count + drift consistency
    drift_std = float(np.std(drifts))
    consistency = max(0.0, 1.0 - drift_std * 50)  # tighter penalty for drift variance
    count_factor = min(1.0, len(drifts) / 4.0)
    confidence = count_factor * 0.5 + consistency * 0.5
    result["shear_degrees"] = round(shear_degrees, 3)
    result["confidence"] = round(max(0.0, min(1.0, confidence)), 2)
    logger.info("text_lines(v2): %d columns, %d drifts, median=%.4f, "
                "shear=%.3f°, conf=%.2f",
                len(columns), len(drifts), median_drift,
                shear_degrees, confidence)
    return result
def _dewarp_quality_check(original: np.ndarray, corrected: np.ndarray) -> bool:
    """Check whether the dewarp correction actually improved alignment.

    Compares horizontal projection variance before and after correction.
    Higher variance means sharper text-line peaks, which indicates better
    horizontal alignment.

    Returns True if the correction improved the image, False if it should
    be discarded.
    """
    def _row_variance(image: np.ndarray) -> float:
        # Inverted Otsu → text as foreground, then half-res row projection
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        _, mask = cv2.threshold(gray, 0, 255,
                                cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
        half = cv2.resize(mask, (mask.shape[1] // 2, mask.shape[0] // 2),
                          interpolation=cv2.INTER_AREA)
        return float(np.var(np.sum(half, axis=1).astype(float)))

    # Any improvement counts, however small
    return _row_variance(corrected) > _row_variance(original)
def _apply_shear(img: np.ndarray, shear_degrees: float) -> np.ndarray:
    """Apply a vertical shear correction to an image.

    Shifts each row horizontally proportional to its distance from the
    vertical center. This corrects the tilt of vertical features (columns)
    without affecting horizontal alignment (text lines).

    Args:
        img: BGR image.
        shear_degrees: Shear angle in degrees. Positive = shift top-right/bottom-left.

    Returns:
        Corrected image.
    """
    import math
    rows, cols = img.shape[:2]
    slope = math.tan(math.radians(shear_degrees))
    # x' = x + slope * (y - rows/2): rows shift horizontally in proportion
    # to their signed distance from the vertical centre.
    shear_matrix = np.float32([
        [1, slope, -rows / 2.0 * slope],
        [0, 1, 0],
    ])
    return cv2.warpAffine(img, shear_matrix, (cols, rows),
                          flags=cv2.INTER_LINEAR,
                          borderMode=cv2.BORDER_REPLICATE)
def _ensemble_shear(detections: List[Dict[str, Any]]) -> Tuple[float, float, str]:
"""Combine multiple shear detections into a single weighted estimate (v2).
Ensemble v2 changes vs v1:
- Minimum confidence raised to 0.5 (was 0.3)
- text_lines method gets 1.5× weight boost (most reliable detector)
- Outlier filter at 1° from weighted mean
Returns:
(shear_degrees, ensemble_confidence, methods_used_str)
"""
# Confidence threshold — lowered from 0.5 to 0.35 to catch subtle shear
# that individual methods detect with moderate confidence.
_MIN_CONF = 0.35
# text_lines gets a weight boost as the most content-aware method
_METHOD_WEIGHT_BOOST = {"text_lines": 1.5}
accepted = []
for d in detections:
if d["confidence"] < _MIN_CONF:
continue
boost = _METHOD_WEIGHT_BOOST.get(d["method"], 1.0)
effective_conf = d["confidence"] * boost
accepted.append((d["shear_degrees"], effective_conf, d["method"]))
if not accepted:
return 0.0, 0.0, "none"
if len(accepted) == 1:
deg, conf, method = accepted[0]
return deg, min(conf, 1.0), method
# First pass: weighted mean
total_w = sum(c for _, c, _ in accepted)
w_mean = sum(d * c for d, c, _ in accepted) / total_w
# Outlier filter: keep results within 1° of weighted mean
filtered = [(d, c, m) for d, c, m in accepted if abs(d - w_mean) <= 1.0]
if not filtered:
filtered = accepted # fallback: keep all
# Second pass: weighted mean on filtered results
total_w2 = sum(c for _, c, _ in filtered)
final_deg = sum(d * c for d, c, _ in filtered) / total_w2
# Ensemble confidence: average of individual confidences, boosted when
# methods agree (all within 0.5° of each other)
avg_conf = total_w2 / len(filtered)
spread = max(d for d, _, _ in filtered) - min(d for d, _, _ in filtered)
agreement_bonus = 0.15 if spread < 0.5 else 0.0
ensemble_conf = min(1.0, avg_conf + agreement_bonus)
methods_str = "+".join(m for _, _, m in filtered)
return round(final_deg, 3), round(min(ensemble_conf, 1.0), 2), methods_str
def dewarp_image(img: np.ndarray, use_ensemble: bool = True) -> Tuple[np.ndarray, Dict[str, Any]]:
    """Correct vertical shear after deskew (v2 with quality gate).

    Deskew aligns horizontal text lines, but vertical features such as
    column edges can remain tilted. This routine estimates the residual
    tilt via an ensemble of four complementary detectors and removes it
    with an affine shear correction.

    Detectors (together ~150ms):
      A. _detect_shear_angle()         — vertical edge profile (~50ms)
      B. _detect_shear_by_projection() — horizontal text-line variance (~30ms)
      C. _detect_shear_by_hough()      — Hough lines on table borders (~20ms)
      D. _detect_shear_by_text_lines() — text-line straightness (~50ms)

    Quality gate: the horizontal projection variance before and after the
    correction is compared; a correction that worsens alignment is dropped.

    Args:
        img: BGR image (already deskewed).
        use_ensemble: If False, fall back to single-method behaviour
            (method A only).

    Returns:
        Tuple of (corrected_image, dewarp_info).
        dewarp_info keys: method, shear_degrees, confidence, detections.
    """
    no_correction = {
        "method": "none",
        "shear_degrees": 0.0,
        "confidence": 0.0,
        "detections": [],
    }
    if not CV2_AVAILABLE:
        return img, no_correction

    started = time.time()
    # Method A always runs; B/C/D only in ensemble mode.
    detections = [_detect_shear_angle(img)]
    if use_ensemble:
        detections.append(_detect_shear_by_projection(img))
        detections.append(_detect_shear_by_hough(img))
        detections.append(_detect_shear_by_text_lines(img))
        shear_deg, confidence, method = _ensemble_shear(detections)
    else:
        shear_deg = detections[0]["shear_degrees"]
        confidence = detections[0]["confidence"]
        method = detections[0]["method"]
    elapsed = time.time() - started

    # Pad to four entries so the log line has a value for every slot even
    # in single-method mode.
    padded = detections + [{"shear_degrees": 0.0, "confidence": 0.0}] * (4 - len(detections))
    logger.info(
        "dewarp: ensemble shear=%.3f° conf=%.2f method=%s (%.2fs) | "
        "A=%.3f/%.2f B=%.3f/%.2f C=%.3f/%.2f D=%.3f/%.2f",
        shear_deg, confidence, method, elapsed,
        padded[0]["shear_degrees"], padded[0]["confidence"],
        padded[1]["shear_degrees"], padded[1]["confidence"],
        padded[2]["shear_degrees"], padded[2]["confidence"],
        padded[3]["shear_degrees"], padded[3]["confidence"],
    )

    # Per-method detection summaries, reported even when no correction
    # ends up being applied.
    detection_summaries = [
        {"method": d["method"], "shear_degrees": d["shear_degrees"],
         "confidence": d["confidence"]}
        for d in detections
    ]

    # Shear below 0.08° is truly irrelevant for OCR; the ensemble
    # confidence must reach at least 0.4 (lowered from 0.5 so several
    # agreeing moderate-confidence methods can still trigger a fix).
    if abs(shear_deg) < 0.08 or confidence < 0.4:
        no_correction["detections"] = detection_summaries
        return img, no_correction

    # Negate the detected shear to straighten the image.
    corrected = _apply_shear(img, -shear_deg)

    # Quality gate only for corrections >= 0.5°: for smaller angles the
    # projection-variance change is negligible, and leaving e.g. 0.4°
    # uncorrected shifts content ~25px at the edges of tall scans —
    # worse than occasionally applying a tiny wrong correction.
    if abs(shear_deg) >= 0.5 and not _dewarp_quality_check(img, corrected):
        logger.info("dewarp: quality gate REJECTED correction (%.3f°) — "
                    "projection variance did not improve", shear_deg)
        no_correction["detections"] = detection_summaries
        return img, no_correction

    return corrected, {
        "method": method,
        "shear_degrees": shear_deg,
        "confidence": confidence,
        "detections": detection_summaries,
    }
def dewarp_image_manual(img: np.ndarray, shear_degrees: float) -> np.ndarray:
    """Apply shear correction with a manual angle.

    Args:
        img: BGR image (deskewed, before dewarp).
        shear_degrees: Shear angle in degrees to correct.

    Returns:
        Corrected image; the input is returned unchanged when the angle
        is negligible (< 0.001°).
    """
    negligible = abs(shear_degrees) < 0.001
    # Negate: _apply_shear applies the shear, here we want to undo it.
    return img if negligible else _apply_shear(img, -shear_degrees)