Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 27s
CI / test-go-edu-search (push) Successful in 30s
CI / test-python-klausur (push) Failing after 1m59s
CI / test-python-agent-core (push) Successful in 16s
CI / test-nodejs-website (push) Successful in 18s
Monolithische 8163-Zeilen-Datei aufgeteilt in fokussierte Module: - cv_vocab_types.py (156 Z.): Dataklassen, Konstanten, IPA, Feature-Flags - cv_preprocessing.py (1166 Z.): Bild-I/O, Orientierung, Deskew, Dewarp - cv_layout.py (3036 Z.): Dokumenttyp, Spalten, Zeilen, Klassifikation - cv_ocr_engines.py (1282 Z.): OCR-Engines, Vocab-Postprocessing, Text-Cleaning - cv_cell_grid.py (1510 Z.): Cell-Grid v2+Legacy, Vocab-Konvertierung - cv_review.py (1184 Z.): LLM/Spell Review, Pipeline-Orchestrierung cv_vocab_pipeline.py ist jetzt eine Re-Export-Fassade (35 Z.) — alle bestehenden Imports bleiben unveraendert. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1167 lines
42 KiB
Python
1167 lines
42 KiB
Python
"""
|
||
Image I/O, orientation detection, deskew, and dewarp for the CV vocabulary pipeline.
|
||
|
||
Lizenz: Apache 2.0 (kommerziell nutzbar)
|
||
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
|
||
"""
|
||
|
||
import logging
|
||
import time
|
||
from collections import defaultdict
|
||
from typing import Any, Dict, List, Tuple
|
||
|
||
import numpy as np
|
||
|
||
from cv_vocab_types import (
|
||
CV2_AVAILABLE,
|
||
TESSERACT_AVAILABLE,
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Guarded imports — mirror cv_vocab_types guards
|
||
try:
|
||
import cv2
|
||
except ImportError:
|
||
cv2 = None # type: ignore[assignment]
|
||
|
||
try:
|
||
import pytesseract
|
||
from PIL import Image
|
||
except ImportError:
|
||
pytesseract = None # type: ignore[assignment]
|
||
Image = None # type: ignore[assignment,misc]
|
||
|
||
|
||
def render_pdf_high_res(pdf_data: bytes, page_number: int = 0, zoom: float = 3.0) -> np.ndarray:
    """Render a PDF page to a high-resolution numpy array (BGR).

    Args:
        pdf_data: Raw PDF bytes.
        page_number: 0-indexed page number.
        zoom: Zoom factor (3.0 = 432 DPI).

    Returns:
        numpy array in BGR format.

    Raises:
        ValueError: If ``page_number`` is out of range for the document.
    """
    import fitz  # PyMuPDF

    pdf_doc = fitz.open(stream=pdf_data, filetype="pdf")
    try:
        if page_number >= pdf_doc.page_count:
            raise ValueError(f"Page {page_number} does not exist (PDF has {pdf_doc.page_count} pages)")

        page = pdf_doc[page_number]
        mat = fitz.Matrix(zoom, zoom)
        pix = page.get_pixmap(matrix=mat)

        # Convert to numpy BGR — pix.samples is a flat RGB(A)/gray byte buffer.
        img_data = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.h, pix.w, pix.n)
        if pix.n == 4:  # RGBA
            img_bgr = cv2.cvtColor(img_data, cv2.COLOR_RGBA2BGR)
        elif pix.n == 3:  # RGB
            img_bgr = cv2.cvtColor(img_data, cv2.COLOR_RGB2BGR)
        else:  # Grayscale
            img_bgr = cv2.cvtColor(img_data, cv2.COLOR_GRAY2BGR)
    finally:
        # Always release the PyMuPDF handle — the original leaked it when the
        # out-of-range ValueError (or any conversion error) was raised.
        pdf_doc.close()

    return img_bgr
|
||
|
||
|
||
def render_image_high_res(image_data: bytes) -> np.ndarray:
    """Decode raw image bytes (PNG/JPEG) into a numpy array (BGR).

    Args:
        image_data: Raw image bytes.

    Returns:
        numpy array in BGR format.

    Raises:
        ValueError: If the bytes cannot be decoded as an image.
    """
    raw = np.frombuffer(image_data, dtype=np.uint8)
    decoded = cv2.imdecode(raw, cv2.IMREAD_COLOR)
    if decoded is None:
        raise ValueError("Could not decode image data")
    return decoded
|
||
|
||
|
||
# =============================================================================
|
||
# Stage 1b: Orientation Detection (0°/90°/180°/270°)
|
||
# =============================================================================
|
||
|
||
def detect_and_fix_orientation(img_bgr: np.ndarray) -> Tuple[np.ndarray, int]:
    """Detect page orientation via Tesseract OSD and rotate if needed.

    Handles upside-down scans (180°) common with book scanners where
    every other page is flipped due to the scanner hinge.

    Returns:
        (corrected_image, rotation_degrees) — rotation is 0, 90, 180, or 270.
    """
    if pytesseract is None:
        return img_bgr, 0

    try:
        # Tesseract OSD needs a grayscale or RGB image
        grayscale = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
        osd = pytesseract.image_to_osd(
            Image.fromarray(grayscale), output_type=pytesseract.Output.DICT
        )
        rotate = osd.get("rotate", 0)
        confidence = osd.get("orientation_conf", 0.0)

        logger.info(f"OSD: orientation={rotate}° confidence={confidence:.1f}")

        # Ignore no-op results and low-confidence detections.
        if rotate == 0 or confidence < 1.0:
            return img_bgr, 0

        # Map the reported rotation to the cv2 flag that undoes it.
        undo_flags = {
            180: cv2.ROTATE_180,
            90: cv2.ROTATE_90_COUNTERCLOCKWISE,
            270: cv2.ROTATE_90_CLOCKWISE,
        }
        flag = undo_flags.get(rotate)
        if flag is None:
            return img_bgr, 0

        corrected = cv2.rotate(img_bgr, flag)
        logger.info(f"OSD: rotated {rotate}° to fix orientation")
        return corrected, rotate

    except Exception as e:
        logger.warning(f"OSD orientation detection failed: {e}")
        return img_bgr, 0
|
||
|
||
|
||
# =============================================================================
|
||
# Stage 2: Deskew (Rotation Correction)
|
||
# =============================================================================
|
||
|
||
def deskew_image(img: np.ndarray) -> Tuple[np.ndarray, float]:
    """Correct rotation using Hough Line detection.

    Args:
        img: BGR image.

    Returns:
        Tuple of (corrected image, detected angle in degrees).
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Inverted Otsu binarisation so ink becomes the foreground for Hough.
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)

    # Probabilistic Hough transform for long segments (≥ quarter page width).
    lines = cv2.HoughLinesP(
        binary, 1, np.pi / 180, threshold=100,
        minLineLength=img.shape[1] // 4, maxLineGap=20,
    )
    if lines is None or len(lines) < 3:
        return img, 0.0

    # Keep angles of near-horizontal segments only (|angle| < 15°).
    angles = []
    for segment in lines:
        x1, y1, x2, y2 = segment[0]
        theta = np.degrees(np.arctan2(y2 - y1, x2 - x1))
        if abs(theta) < 15:
            angles.append(theta)

    if not angles:
        return img, 0.0

    median_angle = float(np.median(angles))

    # Clamp the applied correction to ±5°.
    if abs(median_angle) > 5.0:
        median_angle = 5.0 * np.sign(median_angle)

    # Below 0.1° the rotation is not worth the resampling cost.
    if abs(median_angle) < 0.1:
        return img, 0.0

    # Rotate around the image centre, replicating borders.
    h, w = img.shape[:2]
    rotation = cv2.getRotationMatrix2D((w // 2, h // 2), median_angle, 1.0)
    corrected = cv2.warpAffine(
        img, rotation, (w, h),
        flags=cv2.INTER_LINEAR,
        borderMode=cv2.BORDER_REPLICATE,
    )

    logger.info(f"Deskew: corrected {median_angle:.2f}° rotation")
    return corrected, median_angle
|
||
|
||
|
||
def deskew_image_by_word_alignment(
    image_data: bytes,
    lang: str = "eng+deu",
    downscale_factor: float = 0.5,
) -> Tuple[bytes, float]:
    """Correct rotation by fitting a line through left-most word starts per text line.

    More robust than Hough-based deskew for vocabulary worksheets where text lines
    have consistent left-alignment. Runs a quick Tesseract pass on a downscaled
    copy to find word positions, computes the dominant left-edge column, fits a
    line through those points and rotates the full-resolution image.

    Args:
        image_data: Raw image bytes (PNG/JPEG).
        lang: Tesseract language string for the quick pass.
        downscale_factor: Shrink factor for the quick Tesseract pass (0.5 = 50%).

    Returns:
        Tuple of (rotated image as PNG bytes, detected angle in degrees).
        On any failure or when no correction is needed, returns the
        original bytes unchanged with angle 0.0.
    """
    if not CV2_AVAILABLE or not TESSERACT_AVAILABLE:
        return image_data, 0.0

    # 1. Decode image
    img_array = np.frombuffer(image_data, dtype=np.uint8)
    img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
    if img is None:
        logger.warning("deskew_by_word_alignment: could not decode image")
        return image_data, 0.0

    orig_h, orig_w = img.shape[:2]

    # 2. Downscale for fast Tesseract pass
    small_w = int(orig_w * downscale_factor)
    small_h = int(orig_h * downscale_factor)
    small = cv2.resize(img, (small_w, small_h), interpolation=cv2.INTER_AREA)

    # 3. Quick Tesseract — word-level positions
    pil_small = Image.fromarray(cv2.cvtColor(small, cv2.COLOR_BGR2RGB))
    try:
        data = pytesseract.image_to_data(
            pil_small, lang=lang, config="--psm 6 --oem 3",
            output_type=pytesseract.Output.DICT,
        )
    except Exception as e:
        logger.warning(f"deskew_by_word_alignment: Tesseract failed: {e}")
        return image_data, 0.0

    # 4. Per text-line, find the left-most word start
    #    Group by (block_num, par_num, line_num)
    line_groups: Dict[tuple, list] = defaultdict(list)
    for i in range(len(data["text"])):
        text = (data["text"][i] or "").strip()
        conf = int(data["conf"][i])
        # conf < 20 drops low-confidence OCR noise from the geometry fit.
        if not text or conf < 20:
            continue
        key = (data["block_num"][i], data["par_num"][i], data["line_num"][i])
        line_groups[key].append(i)

    # Need enough lines for a meaningful linear fit.
    if len(line_groups) < 5:
        logger.info(f"deskew_by_word_alignment: only {len(line_groups)} lines, skipping")
        return image_data, 0.0

    # For each line, pick the word with smallest 'left' → compute (left_x, center_y)
    # Scale back to original resolution
    scale = 1.0 / downscale_factor
    points = []  # list of (x, y) in original-image coords
    for key, indices in line_groups.items():
        best_idx = min(indices, key=lambda i: data["left"][i])
        lx = data["left"][best_idx] * scale
        top = data["top"][best_idx] * scale
        h = data["height"][best_idx] * scale
        cy = top + h / 2.0
        points.append((lx, cy))

    # 5. Find dominant left-edge column + compute angle
    xs = np.array([p[0] for p in points])
    ys = np.array([p[1] for p in points])
    median_x = float(np.median(xs))
    tolerance = orig_w * 0.03  # 3% of image width

    # Keep only points whose left-edge lies near the dominant column;
    # indented/centered lines would otherwise bias the fit.
    mask = np.abs(xs - median_x) <= tolerance
    filtered_xs = xs[mask]
    filtered_ys = ys[mask]

    if len(filtered_xs) < 5:
        logger.info(f"deskew_by_word_alignment: only {len(filtered_xs)} aligned points after filter, skipping")
        return image_data, 0.0

    # polyfit: x = a*y + b → a = dx/dy → angle = arctan(a)
    coeffs = np.polyfit(filtered_ys, filtered_xs, 1)
    slope = coeffs[0]  # dx/dy
    angle_rad = np.arctan(slope)
    angle_deg = float(np.degrees(angle_rad))

    # Clamp to ±5°
    angle_deg = max(-5.0, min(5.0, angle_deg))

    logger.info(f"deskew_by_word_alignment: detected {angle_deg:.2f}° from {len(filtered_xs)} points "
                f"(total lines: {len(line_groups)})")

    # Sub-0.05° corrections are below resampling noise — skip them.
    if abs(angle_deg) < 0.05:
        return image_data, 0.0

    # 6. Rotate full-res image
    center = (orig_w // 2, orig_h // 2)
    M = cv2.getRotationMatrix2D(center, angle_deg, 1.0)
    rotated = cv2.warpAffine(img, M, (orig_w, orig_h),
                             flags=cv2.INTER_LINEAR,
                             borderMode=cv2.BORDER_REPLICATE)

    # Encode back to PNG
    success, png_buf = cv2.imencode(".png", rotated)
    if not success:
        logger.warning("deskew_by_word_alignment: PNG encoding failed")
        return image_data, 0.0

    return png_buf.tobytes(), angle_deg
|
||
|
||
|
||
def _projection_gradient_score(profile: np.ndarray) -> float:
|
||
"""Score a projection profile by the L2-norm of its first derivative.
|
||
|
||
Higher score = sharper transitions between text-lines and gaps,
|
||
i.e. better row/column alignment.
|
||
"""
|
||
diff = np.diff(profile)
|
||
return float(np.sum(diff * diff))
|
||
|
||
|
||
def deskew_image_iterative(
    img: np.ndarray,
    coarse_range: float = 5.0,
    coarse_step: float = 0.1,
    fine_range: float = 0.15,
    fine_step: float = 0.02,
) -> Tuple[np.ndarray, float, Dict[str, Any]]:
    """Iterative deskew using vertical-edge projection optimisation.

    The key insight: at the correct rotation angle, vertical features
    (word left-edges, column borders) become truly vertical, producing
    the sharpest peaks in the vertical projection of vertical edges.

    Method:
        1. Detect vertical edges via Sobel-X on the central crop.
        2. Coarse sweep: rotate edge image, compute vertical projection
           gradient score. The angle where vertical edges align best wins.
        3. Fine sweep: refine around the coarse winner.

    Args:
        img: BGR image (full resolution).
        coarse_range: half-range in degrees for the coarse sweep.
        coarse_step: step size in degrees for the coarse sweep.
        fine_range: half-range around the coarse winner for the fine sweep.
        fine_step: step size in degrees for the fine sweep.

    Returns:
        (rotated_bgr, angle_degrees, debug_dict)
    """
    h, w = img.shape[:2]
    debug: Dict[str, Any] = {}

    # --- Grayscale + vertical edge detection ---
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Central crop (15%-85% height, 10%-90% width) to avoid page margins
    y_lo, y_hi = int(h * 0.15), int(h * 0.85)
    x_lo, x_hi = int(w * 0.10), int(w * 0.90)
    gray_crop = gray[y_lo:y_hi, x_lo:x_hi]

    # Sobel-X → absolute vertical edges
    sobel_x = cv2.Sobel(gray_crop, cv2.CV_64F, 1, 0, ksize=3)
    edges = np.abs(sobel_x)
    # Normalise to 0-255 for consistent scoring
    edge_max = edges.max()
    if edge_max > 0:
        edges = (edges / edge_max * 255).astype(np.uint8)
    else:
        # Completely flat image — nothing to align on.
        return img, 0.0, {"error": "no edges detected"}

    crop_h, crop_w = edges.shape[:2]
    crop_center = (crop_w // 2, crop_h // 2)

    # Trim margin after rotation to avoid border artifacts
    trim_y = max(4, int(crop_h * 0.03))
    trim_x = max(4, int(crop_w * 0.03))

    def _sweep_edges(angles: np.ndarray) -> list:
        """Score each angle by vertical projection gradient of vertical edges."""
        results = []
        for angle in angles:
            if abs(angle) < 1e-6:
                # Zero angle: skip the warp entirely.
                rotated = edges
            else:
                M = cv2.getRotationMatrix2D(crop_center, angle, 1.0)
                # INTER_NEAREST keeps edge pixels crisp for the projection score.
                rotated = cv2.warpAffine(edges, M, (crop_w, crop_h),
                                         flags=cv2.INTER_NEAREST,
                                         borderMode=cv2.BORDER_REPLICATE)
            # Trim borders to avoid edge artifacts
            trimmed = rotated[trim_y:-trim_y, trim_x:-trim_x]
            v_profile = np.sum(trimmed, axis=0, dtype=np.float64)
            score = _projection_gradient_score(v_profile)
            results.append((float(angle), score))
        return results

    # --- Phase 1: coarse sweep ---
    # The +step*0.5 end offset makes np.arange include the upper bound.
    coarse_angles = np.arange(-coarse_range, coarse_range + coarse_step * 0.5, coarse_step)
    coarse_results = _sweep_edges(coarse_angles)
    best_coarse = max(coarse_results, key=lambda x: x[1])
    best_coarse_angle, best_coarse_score = best_coarse

    debug["coarse_best_angle"] = round(best_coarse_angle, 2)
    debug["coarse_best_score"] = round(best_coarse_score, 1)
    debug["coarse_scores"] = [(round(a, 2), round(s, 1)) for a, s in coarse_results]

    # --- Phase 2: fine sweep around coarse winner ---
    fine_lo = best_coarse_angle - fine_range
    fine_hi = best_coarse_angle + fine_range
    fine_angles = np.arange(fine_lo, fine_hi + fine_step * 0.5, fine_step)
    fine_results = _sweep_edges(fine_angles)
    best_fine = max(fine_results, key=lambda x: x[1])
    best_fine_angle, best_fine_score = best_fine

    debug["fine_best_angle"] = round(best_fine_angle, 2)
    debug["fine_best_score"] = round(best_fine_score, 1)
    debug["fine_scores"] = [(round(a, 2), round(s, 1)) for a, s in fine_results]

    final_angle = best_fine_angle

    # Clamp to ±5°
    final_angle = max(-5.0, min(5.0, final_angle))

    logger.info(f"deskew_iterative: coarse={best_coarse_angle:.2f}° fine={best_fine_angle:.2f}° -> {final_angle:.2f}°")

    # Sub-0.05° corrections are below resampling noise — skip the warp.
    if abs(final_angle) < 0.05:
        return img, 0.0, debug

    # --- Rotate full-res image ---
    center = (w // 2, h // 2)
    M = cv2.getRotationMatrix2D(center, final_angle, 1.0)
    rotated = cv2.warpAffine(img, M, (w, h),
                             flags=cv2.INTER_LINEAR,
                             borderMode=cv2.BORDER_REPLICATE)

    return rotated, final_angle, debug
|
||
|
||
|
||
def _measure_textline_slope(img: np.ndarray) -> float:
    """Measure residual text-line slope via Tesseract word-position regression.

    Groups Tesseract words by (block, par, line), fits a linear regression
    per line (y = slope * x + b), and returns the trimmed-mean slope in
    degrees. Positive = text rises to the right, negative = falls.

    This is the most direct measurement of remaining rotation after deskew.

    Returns:
        Trimmed-mean slope in degrees, or 0.0 when dependencies are missing
        or too few usable text lines were found.
    """
    import math as _math

    if not TESSERACT_AVAILABLE or not CV2_AVAILABLE:
        return 0.0

    h, w = img.shape[:2]
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    data = pytesseract.image_to_data(
        Image.fromarray(gray),
        output_type=pytesseract.Output.DICT,
        config="--psm 6",
    )

    # Group word centres by text line
    lines: Dict[tuple, list] = {}
    for i in range(len(data["text"])):
        txt = (data["text"][i] or "").strip()
        # Skip one-character fragments and low-confidence words (< 30).
        if len(txt) < 2 or int(data["conf"][i]) < 30:
            continue
        key = (data["block_num"][i], data["par_num"][i], data["line_num"][i])
        cx = data["left"][i] + data["width"][i] / 2.0
        cy = data["top"][i] + data["height"][i] / 2.0
        lines.setdefault(key, []).append((cx, cy))

    # Per-line linear regression → slope angle
    slopes: list = []
    for pts in lines.values():
        if len(pts) < 3:
            continue
        pts.sort(key=lambda p: p[0])
        xs = np.array([p[0] for p in pts], dtype=np.float64)
        ys = np.array([p[1] for p in pts], dtype=np.float64)
        # Require a horizontal span of at least 15% of image width;
        # short lines give unstable slope estimates.
        if xs[-1] - xs[0] < w * 0.15:
            continue  # skip short lines
        # Least-squares fit of y = slope * x + intercept.
        A = np.vstack([xs, np.ones_like(xs)]).T
        result = np.linalg.lstsq(A, ys, rcond=None)
        slope = result[0][0]
        slopes.append(_math.degrees(_math.atan(slope)))

    if len(slopes) < 3:
        return 0.0

    # Trimmed mean (drop 10% extremes on each side)
    slopes.sort()
    trim = max(1, len(slopes) // 10)
    trimmed = slopes[trim:-trim] if len(slopes) > 2 * trim else slopes
    if not trimmed:
        return 0.0

    return sum(trimmed) / len(trimmed)
|
||
|
||
|
||
def deskew_two_pass(
    img: np.ndarray,
    coarse_range: float = 5.0,
) -> Tuple[np.ndarray, float, Dict[str, Any]]:
    """Two-pass deskew: iterative projection + word-alignment residual check.

    Pass 1: ``deskew_image_iterative()`` (vertical-edge projection, wide range).
    Pass 2: ``deskew_image_by_word_alignment()`` on the already-corrected image
    to detect and fix residual skew that the projection method missed.
    Pass 3: ``_measure_textline_slope()`` as a final text-line regression check.

    The corrections are summed. If the residual from Pass 2 or Pass 3 is
    below 0.3° it is ignored (already good enough).

    Args:
        img: BGR image (full resolution).
        coarse_range: half-range in degrees forwarded to Pass 1.

    Returns:
        (corrected_bgr, total_angle_degrees, debug_dict)
    """
    debug: Dict[str, Any] = {}

    # --- Pass 1: iterative projection ---
    # img.copy() keeps the caller's array untouched.
    corrected, angle1, dbg1 = deskew_image_iterative(
        img.copy(), coarse_range=coarse_range,
    )
    debug["pass1_angle"] = round(angle1, 3)
    debug["pass1_method"] = "iterative"
    debug["pass1_debug"] = dbg1

    # --- Pass 2: word-alignment residual check on corrected image ---
    angle2 = 0.0
    try:
        # Encode the corrected image to PNG bytes for word-alignment
        ok, buf = cv2.imencode(".png", corrected)
        if ok:
            corrected_bytes, angle2 = deskew_image_by_word_alignment(buf.tobytes())
            if abs(angle2) >= 0.3:
                # Significant residual — decode and use the second correction
                arr2 = np.frombuffer(corrected_bytes, dtype=np.uint8)
                corrected2 = cv2.imdecode(arr2, cv2.IMREAD_COLOR)
                if corrected2 is not None:
                    corrected = corrected2
                    logger.info(f"deskew_two_pass: pass2 residual={angle2:.2f}° applied "
                                f"(total={angle1 + angle2:.2f}°)")
                else:
                    # Decode failed — discard the pass-2 correction entirely.
                    angle2 = 0.0
            else:
                logger.info(f"deskew_two_pass: pass2 residual={angle2:.2f}° < 0.3° — skipped")
                angle2 = 0.0
    except Exception as e:
        # Pass 2 is best-effort: keep the pass-1 result on any failure.
        logger.warning(f"deskew_two_pass: pass2 word-alignment failed: {e}")
        angle2 = 0.0

    # --- Pass 3: Tesseract text-line regression residual check ---
    # The most reliable final check: measure actual text-line slopes
    # using Tesseract word positions and linear regression per line.
    angle3 = 0.0
    try:
        residual = _measure_textline_slope(corrected)
        debug["pass3_raw"] = round(residual, 3)
        if abs(residual) >= 0.3:
            h3, w3 = corrected.shape[:2]
            center3 = (w3 // 2, h3 // 2)
            M3 = cv2.getRotationMatrix2D(center3, residual, 1.0)
            corrected = cv2.warpAffine(
                corrected, M3, (w3, h3),
                flags=cv2.INTER_LINEAR,
                borderMode=cv2.BORDER_REPLICATE,
            )
            angle3 = residual
            logger.info(
                "deskew_two_pass: pass3 text-line residual=%.2f° applied",
                residual,
            )
        else:
            logger.info(
                "deskew_two_pass: pass3 text-line residual=%.2f° < 0.3° — skipped",
                residual,
            )
    except Exception as e:
        # Pass 3 is also best-effort.
        logger.warning("deskew_two_pass: pass3 text-line check failed: %s", e)

    total_angle = angle1 + angle2 + angle3
    debug["pass2_angle"] = round(angle2, 3)
    debug["pass2_method"] = "word_alignment"
    debug["pass3_angle"] = round(angle3, 3)
    debug["pass3_method"] = "textline_regression"
    debug["total_angle"] = round(total_angle, 3)

    logger.info(
        "deskew_two_pass: pass1=%.2f° + pass2=%.2f° + pass3=%.2f° = %.2f°",
        angle1, angle2, angle3, total_angle,
    )

    return corrected, total_angle, debug
|
||
|
||
|
||
# =============================================================================
|
||
# Stage 3: Dewarp (Book Curvature Correction)
|
||
# =============================================================================
|
||
|
||
def _detect_shear_angle(img: np.ndarray) -> Dict[str, Any]:
    """Detect the vertical shear angle of the page.

    After deskew (horizontal lines aligned), vertical features like column
    edges may still be tilted. This measures that tilt by tracking the
    strongest vertical edge across horizontal strips.

    The result is a shear angle in degrees: the angular difference between
    true vertical and the detected column edge.

    Returns:
        Dict with keys: method, shear_degrees, confidence.
    """
    h, w = img.shape[:2]
    result = {"method": "vertical_edge", "shear_degrees": 0.0, "confidence": 0.0}

    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Vertical Sobel to find vertical edges
    sobel_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
    # NOTE(review): magnitudes above 255 wrap around in this uint8 cast
    # (no clipping) — presumably tolerable since Otsu binarisation follows,
    # but confirm on high-contrast scans.
    abs_sobel = np.abs(sobel_x).astype(np.uint8)

    # Binarize with Otsu
    _, binary = cv2.threshold(abs_sobel, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    # Track the strongest left-side vertical edge in 20 horizontal strips.
    num_strips = 20
    strip_h = h // num_strips
    edge_positions = []  # (y_center, x_position)

    for i in range(num_strips):
        y_start = i * strip_h
        y_end = min((i + 1) * strip_h, h)
        strip = binary[y_start:y_end, :]

        # Project vertically (sum along y-axis)
        projection = np.sum(strip, axis=0).astype(np.float64)
        if projection.max() == 0:
            continue

        # Find the strongest vertical edge in left 40% of image
        search_w = int(w * 0.4)
        left_proj = projection[:search_w]
        if left_proj.max() == 0:
            continue

        # Smooth and find peak
        kernel_size = max(3, w // 100)
        if kernel_size % 2 == 0:
            kernel_size += 1  # GaussianBlur requires an odd kernel size
        smoothed = cv2.GaussianBlur(left_proj.reshape(1, -1), (kernel_size, 1), 0).flatten()
        x_pos = float(np.argmax(smoothed))
        y_center = (y_start + y_end) / 2.0
        edge_positions.append((y_center, x_pos))

    # Need at least 8 strip measurements for a trustworthy fit.
    if len(edge_positions) < 8:
        return result

    ys = np.array([p[0] for p in edge_positions])
    xs = np.array([p[1] for p in edge_positions])

    # Remove outliers (> 2 std from median)
    median_x = np.median(xs)
    std_x = max(np.std(xs), 1.0)
    mask = np.abs(xs - median_x) < 2 * std_x
    ys = ys[mask]
    xs = xs[mask]

    if len(ys) < 6:
        return result

    # Fit straight line: x = slope * y + intercept
    # The slope tells us the tilt of the vertical edge
    straight_coeffs = np.polyfit(ys, xs, 1)
    slope = straight_coeffs[0]  # dx/dy in pixels
    fitted = np.polyval(straight_coeffs, ys)
    residuals = xs - fitted
    rmse = float(np.sqrt(np.mean(residuals ** 2)))

    # Convert slope to angle: arctan(dx/dy) in degrees
    import math
    shear_degrees = math.degrees(math.atan(slope))

    # Confidence grows with point count (capped at 15) and shrinks with fit RMSE.
    confidence = min(1.0, len(ys) / 15.0) * max(0.5, 1.0 - rmse / 5.0)

    result["shear_degrees"] = round(shear_degrees, 3)
    result["confidence"] = round(float(confidence), 2)

    return result
|
||
|
||
|
||
def _detect_shear_by_projection(img: np.ndarray) -> Dict[str, Any]:
    """Detect shear angle by maximising variance of horizontal text-line projections.

    Principle: horizontal text lines produce a row-projection profile with sharp
    peaks (high variance) when the image is correctly aligned. Any residual shear
    smears the peaks and reduces variance. We sweep ±3° and pick the angle whose
    corrected projection has the highest variance.

    Works best on pages with clear horizontal banding (vocabulary tables, prose).
    Complements _detect_shear_angle() which needs strong vertical edges.

    Returns:
        Dict with keys: method, shear_degrees, confidence.
    """
    import math
    result = {"method": "projection", "shear_degrees": 0.0, "confidence": 0.0}

    h, w = img.shape[:2]
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Otsu binarisation
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)

    # Work at half resolution for speed
    small = cv2.resize(binary, (w // 2, h // 2), interpolation=cv2.INTER_AREA)
    sh, sw = small.shape

    # 2-pass angle sweep for 10x better precision:
    # Pass 1: Coarse sweep ±3° in 0.5° steps (13 values)
    # Pass 2: Fine sweep ±0.5° around coarse best in 0.05° steps (21 values)

    def _sweep_variance(angles_list):
        # For each candidate angle, shear the binary image and score the
        # row-projection variance — higher variance = sharper line banding.
        results = []
        for angle_deg in angles_list:
            if abs(angle_deg) < 0.001:
                rotated = small
            else:
                shear_tan = math.tan(math.radians(angle_deg))
                # Horizontal shear matrix; the -sh/2 offset keeps the shear
                # centred on the image's vertical midpoint.
                M = np.float32([[1, shear_tan, -sh / 2.0 * shear_tan], [0, 1, 0]])
                rotated = cv2.warpAffine(small, M, (sw, sh),
                                         flags=cv2.INTER_NEAREST,
                                         borderMode=cv2.BORDER_CONSTANT)
            profile = np.sum(rotated, axis=1).astype(float)
            results.append((angle_deg, float(np.var(profile))))
        return results

    # Pass 1: coarse
    coarse_angles = [a * 0.5 for a in range(-6, 7)]  # 13 values
    coarse_results = _sweep_variance(coarse_angles)
    coarse_best = max(coarse_results, key=lambda x: x[1])

    # Pass 2: fine around coarse best
    fine_center = coarse_best[0]
    fine_angles = [fine_center + a * 0.05 for a in range(-10, 11)]  # 21 values
    fine_results = _sweep_variance(fine_angles)
    fine_best = max(fine_results, key=lambda x: x[1])

    best_angle = fine_best[0]
    best_variance = fine_best[1]
    variances = coarse_results + fine_results

    # Confidence: how much sharper is the best angle vs. the mean?
    all_mean = sum(v for _, v in variances) / len(variances)
    if all_mean > 0 and best_variance > all_mean:
        confidence = min(1.0, (best_variance - all_mean) / (all_mean + 1.0) * 0.6)
    else:
        confidence = 0.0

    result["shear_degrees"] = round(best_angle, 3)
    result["confidence"] = round(max(0.0, min(1.0, confidence)), 2)
    return result
|
||
|
||
|
||
def _detect_shear_by_hough(img: np.ndarray) -> Dict[str, Any]:
    """Detect shear using Hough transform on printed table / ruled lines.

    Vocabulary worksheets have near-horizontal printed table borders. After
    deskew these should be exactly horizontal; any residual tilt equals the
    vertical shear angle (with inverted sign).

    The sign convention: a horizontal line tilting +α degrees (left end lower)
    means the page has vertical shear of -α degrees (left column edge drifts
    to the left going downward).

    Returns:
        Dict with keys: method, shear_degrees, confidence.
    """
    result = {"method": "hough_lines", "shear_degrees": 0.0, "confidence": 0.0}

    h, w = img.shape[:2]
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    edges = cv2.Canny(gray, 50, 150, apertureSize=3)

    # Require segments at least 15% of the page width.
    min_len = int(w * 0.15)
    lines = cv2.HoughLinesP(
        edges, rho=1, theta=np.pi / 360,
        threshold=int(w * 0.08),
        minLineLength=min_len,
        maxLineGap=20,
    )
    if lines is None or len(lines) < 3:
        return result

    # Collect (angle, length) for near-horizontal segments only.
    candidates: List[Tuple[float, float]] = []
    for seg in lines:
        x1, y1, x2, y2 = seg[0]
        if x1 == x2:
            continue  # perfectly vertical segment — not a ruled row line
        ang = float(np.degrees(np.arctan2(y2 - y1, x2 - x1)))
        if abs(ang) <= 5.0:
            seg_len = float(np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2))
            candidates.append((ang, seg_len))

    if len(candidates) < 3:
        return result

    # Length-weighted median angle: longer segments count for more.
    angle_vals = np.array([a for a, _ in candidates])
    lengths = np.array([ln for _, ln in candidates])
    order = np.argsort(angle_vals)
    sorted_angles = angle_vals[order]
    sorted_lengths = lengths[order]
    cumulative = np.cumsum(sorted_lengths)
    half_idx = int(np.searchsorted(cumulative, cumulative[-1] / 2.0))
    median_angle = float(sorted_angles[min(half_idx, len(sorted_angles) - 1)])

    # Confidence: fraction of segments within 1° of the weighted median.
    agreeing = sum(1 for a, _ in candidates if abs(a - median_angle) < 1.0)
    confidence = min(1.0, agreeing / max(len(candidates), 1)) * 0.85

    # Sign inversion: horizontal line tilt is complementary to vertical shear.
    result["shear_degrees"] = round(-median_angle, 3)
    result["confidence"] = round(max(0.0, min(1.0, confidence)), 2)
    return result
|
||
|
||
|
||
def _detect_shear_by_text_lines(img: np.ndarray) -> Dict[str, Any]:
    """Detect shear by measuring text-line straightness (Method D).

    Runs a quick Tesseract scan (PSM 11, 50% downscale) to locate word
    bounding boxes, groups them into vertical columns by X-proximity,
    and measures how the left-edge X position drifts with Y (vertical
    position). The drift dx/dy is the tangent of the shear angle.

    This directly measures vertical shear (column tilt) rather than
    horizontal text-line slope, which is already corrected by deskew.

    Args:
        img: BGR image (already deskewed).

    Returns:
        Dict with keys: method, shear_degrees, confidence.
        On any failure (missing OCR dependencies, OCR error, too few
        words or columns) the neutral result (0.0 / 0.0) is returned.
    """
    import math

    result = {"method": "text_lines", "shear_degrees": 0.0, "confidence": 0.0}

    # Graceful fallback: this detector needs both OpenCV AND Tesseract.
    # dewarp_image() only guards CV2_AVAILABLE, and Tesseract can be
    # missing independently — without this check we would crash with an
    # AttributeError on a None module instead of degrading gracefully
    # like the sibling detectors.
    if not CV2_AVAILABLE or not TESSERACT_AVAILABLE:
        return result

    h, w = img.shape[:2]
    # Downscale 50% for speed
    scale = 0.5
    small = cv2.resize(img, (int(w * scale), int(h * scale)),
                       interpolation=cv2.INTER_AREA)
    gray = cv2.cvtColor(small, cv2.COLOR_BGR2GRAY)
    pil_img = Image.fromarray(gray)

    try:
        data = pytesseract.image_to_data(
            pil_img, lang='eng+deu', config='--psm 11 --oem 3',
            output_type=pytesseract.Output.DICT,
        )
    except Exception:
        return result

    # Collect word left-edges (x) and vertical centres (y)
    words = []
    for i in range(len(data['text'])):
        text = data['text'][i].strip()
        # pytesseract may report conf as a float-formatted string
        # ("95.0"); go through float() so int() does not raise ValueError.
        conf = int(float(data['conf'][i]))
        if not text or conf < 20 or len(text) < 2:
            continue
        left_x = float(data['left'][i])
        cy = data['top'][i] + data['height'][i] / 2.0
        word_w = float(data['width'][i])
        words.append((left_x, cy, word_w))

    if len(words) < 15:
        return result

    # --- Group words into vertical columns by left-edge X proximity ---
    # Sort by x, then cluster words whose left-edges are within x_tol
    avg_w = sum(ww for _, _, ww in words) / len(words)
    x_tol = max(avg_w * 0.4, 8)  # tolerance for "same column"

    words_by_x = sorted(words, key=lambda word: word[0])
    columns: List[List[Tuple[float, float]]] = []  # each: [(left_x, cy), ...]
    cur_col: List[Tuple[float, float]] = [(words_by_x[0][0], words_by_x[0][1])]
    cur_x = words_by_x[0][0]

    for lx, cy, _ in words_by_x[1:]:
        if abs(lx - cur_x) <= x_tol:
            cur_col.append((lx, cy))
            # Update running x as an exponential moving average of the
            # cluster's left edges (80% history / 20% new sample).
            cur_x = cur_x * 0.8 + lx * 0.2
        else:
            if len(cur_col) >= 5:
                columns.append(cur_col)
            cur_col = [(lx, cy)]
            cur_x = lx
    if len(cur_col) >= 5:
        columns.append(cur_col)

    if len(columns) < 2:
        return result

    # --- For each column, measure X-drift as a function of Y ---
    # Fit: left_x = a * cy + b  ->  a = dx/dy = tan(shear_angle)
    drifts = []
    for col in columns:
        ys = np.array([p[1] for p in col])
        xs = np.array([p[0] for p in col])
        y_range = ys.max() - ys.min()
        if y_range < h * scale * 0.3:
            continue  # column must span at least 30% of image height
        # Linear regression: x = a*y + b
        coeffs = np.polyfit(ys, xs, 1)
        drifts.append(coeffs[0])  # dx/dy

    if len(drifts) < 2:
        return result

    # Median dx/dy -> shear angle
    # dx/dy > 0 means left-edges move RIGHT as we go DOWN -> columns lean right
    median_drift = float(np.median(drifts))
    shear_degrees = math.degrees(math.atan(median_drift))

    # Confidence from column count + drift consistency
    drift_std = float(np.std(drifts))
    consistency = max(0.0, 1.0 - drift_std * 50)  # tighter penalty for drift variance
    count_factor = min(1.0, len(drifts) / 4.0)
    confidence = count_factor * 0.5 + consistency * 0.5

    result["shear_degrees"] = round(shear_degrees, 3)
    result["confidence"] = round(max(0.0, min(1.0, confidence)), 2)
    logger.info("text_lines(v2): %d columns, %d drifts, median=%.4f, "
                "shear=%.3f°, conf=%.2f",
                len(columns), len(drifts), median_drift,
                shear_degrees, confidence)
    return result


def _dewarp_quality_check(original: np.ndarray, corrected: np.ndarray) -> bool:
    """Decide whether a dewarp correction should be kept.

    The horizontal projection profile of a binarized page shows sharper
    peaks (higher variance) when text lines are well aligned. We compute
    that variance for both images; the correction is kept only when it
    increased the variance.

    Args:
        original: BGR image before shear correction.
        corrected: BGR image after shear correction.

    Returns:
        True when the corrected image has a strictly higher projection
        variance than the original, else False (discard the correction).
    """

    def _projection_variance(image: np.ndarray) -> float:
        # Binarize with Otsu, inverted so ink pixels become white ...
        grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        _, mask = cv2.threshold(grayscale, 0, 255,
                                cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
        # ... halve the resolution for speed ...
        half = cv2.resize(mask, (mask.shape[1] // 2, mask.shape[0] // 2),
                          interpolation=cv2.INTER_AREA)
        # ... and take the variance of the row sums (horizontal projection).
        row_sums = np.sum(half, axis=1).astype(float)
        return float(np.var(row_sums))

    # Correction must improve variance (even by a tiny margin).
    return _projection_variance(corrected) > _projection_variance(original)


def _apply_shear(img: np.ndarray, shear_degrees: float) -> np.ndarray:
    """Shear an image horizontally around its vertical centre.

    Each row is shifted sideways in proportion to its distance from the
    image's vertical midline. This straightens tilted vertical features
    (column edges) while leaving horizontal alignment (text lines)
    untouched.

    Args:
        img: BGR image.
        shear_degrees: Shear angle in degrees. Positive = shift top-right/bottom-left.

    Returns:
        Corrected image with the same dimensions as the input.
    """
    import math

    height, width = img.shape[:2]
    slope = math.tan(math.radians(shear_degrees))

    # Affine map: x' = x + slope*(y - height/2), y' = y
    # The -height/2 offset centres the shear on the image midline.
    shear_matrix = np.float32([
        [1, slope, -height / 2.0 * slope],
        [0, 1, 0],
    ])

    return cv2.warpAffine(img, shear_matrix, (width, height),
                          flags=cv2.INTER_LINEAR,
                          borderMode=cv2.BORDER_REPLICATE)


def _ensemble_shear(detections: List[Dict[str, Any]]) -> Tuple[float, float, str]:
|
||
"""Combine multiple shear detections into a single weighted estimate (v2).
|
||
|
||
Ensemble v2 changes vs v1:
|
||
- Minimum confidence raised to 0.5 (was 0.3)
|
||
- text_lines method gets 1.5× weight boost (most reliable detector)
|
||
- Outlier filter at 1° from weighted mean
|
||
|
||
Returns:
|
||
(shear_degrees, ensemble_confidence, methods_used_str)
|
||
"""
|
||
# Confidence threshold — lowered from 0.5 to 0.35 to catch subtle shear
|
||
# that individual methods detect with moderate confidence.
|
||
_MIN_CONF = 0.35
|
||
|
||
# text_lines gets a weight boost as the most content-aware method
|
||
_METHOD_WEIGHT_BOOST = {"text_lines": 1.5}
|
||
|
||
accepted = []
|
||
for d in detections:
|
||
if d["confidence"] < _MIN_CONF:
|
||
continue
|
||
boost = _METHOD_WEIGHT_BOOST.get(d["method"], 1.0)
|
||
effective_conf = d["confidence"] * boost
|
||
accepted.append((d["shear_degrees"], effective_conf, d["method"]))
|
||
|
||
if not accepted:
|
||
return 0.0, 0.0, "none"
|
||
|
||
if len(accepted) == 1:
|
||
deg, conf, method = accepted[0]
|
||
return deg, min(conf, 1.0), method
|
||
|
||
# First pass: weighted mean
|
||
total_w = sum(c for _, c, _ in accepted)
|
||
w_mean = sum(d * c for d, c, _ in accepted) / total_w
|
||
|
||
# Outlier filter: keep results within 1° of weighted mean
|
||
filtered = [(d, c, m) for d, c, m in accepted if abs(d - w_mean) <= 1.0]
|
||
if not filtered:
|
||
filtered = accepted # fallback: keep all
|
||
|
||
# Second pass: weighted mean on filtered results
|
||
total_w2 = sum(c for _, c, _ in filtered)
|
||
final_deg = sum(d * c for d, c, _ in filtered) / total_w2
|
||
|
||
# Ensemble confidence: average of individual confidences, boosted when
|
||
# methods agree (all within 0.5° of each other)
|
||
avg_conf = total_w2 / len(filtered)
|
||
spread = max(d for d, _, _ in filtered) - min(d for d, _, _ in filtered)
|
||
agreement_bonus = 0.15 if spread < 0.5 else 0.0
|
||
ensemble_conf = min(1.0, avg_conf + agreement_bonus)
|
||
|
||
methods_str = "+".join(m for _, _, m in filtered)
|
||
return round(final_deg, 3), round(min(ensemble_conf, 1.0), 2), methods_str


def dewarp_image(img: np.ndarray, use_ensemble: bool = True) -> Tuple[np.ndarray, Dict[str, Any]]:
    """Correct vertical shear after deskew (v2 with quality gate).

    Deskew straightens horizontal text lines, but vertical features
    (column edges) can remain tilted afterwards. This estimates the
    residual tilt with an ensemble of four complementary detectors and
    undoes it with an affine shear.

    Methods (all run in ~150ms total):
      A. _detect_shear_angle() — vertical edge profile (~50ms)
      B. _detect_shear_by_projection() — horizontal text-line variance (~30ms)
      C. _detect_shear_by_hough() — Hough lines on table borders (~20ms)
      D. _detect_shear_by_text_lines() — text-line straightness (~50ms)

    Quality gate: the horizontal projection variance is compared before
    vs after correction; a correction that worsened alignment is dropped.

    Args:
        img: BGR image (already deskewed).
        use_ensemble: If False, fall back to single-method behaviour (method A only).

    Returns:
        Tuple of (corrected_image, dewarp_info).
        dewarp_info keys: method, shear_degrees, confidence, detections.
    """
    no_correction: Dict[str, Any] = {
        "method": "none",
        "shear_degrees": 0.0,
        "confidence": 0.0,
        "detections": [],
    }

    if not CV2_AVAILABLE:
        return img, no_correction

    started = time.time()

    if use_ensemble:
        detections = [
            _detect_shear_angle(img),
            _detect_shear_by_projection(img),
            _detect_shear_by_hough(img),
            _detect_shear_by_text_lines(img),
        ]
        shear_deg, confidence, method = _ensemble_shear(detections)
    else:
        primary = _detect_shear_angle(img)
        detections = [primary]
        shear_deg = primary["shear_degrees"]
        confidence = primary["confidence"]
        method = primary["method"]

    elapsed = time.time() - started

    def _slot(idx: int, key: str) -> float:
        # Detector slots missing in single-method mode are logged as 0.0.
        return detections[idx][key] if len(detections) > idx else 0.0

    logger.info(
        "dewarp: ensemble shear=%.3f° conf=%.2f method=%s (%.2fs) | "
        "A=%.3f/%.2f B=%.3f/%.2f C=%.3f/%.2f D=%.3f/%.2f",
        shear_deg, confidence, method, elapsed,
        detections[0]["shear_degrees"], detections[0]["confidence"],
        _slot(1, "shear_degrees"), _slot(1, "confidence"),
        _slot(2, "shear_degrees"), _slot(2, "confidence"),
        _slot(3, "shear_degrees"), _slot(3, "confidence"),
    )

    # Individual detections are always reported, even when no correction
    # ends up being applied.
    all_detections = [
        {"method": d["method"], "shear_degrees": d["shear_degrees"],
         "confidence": d["confidence"]}
        for d in detections
    ]

    # Thresholds: very small shear (<0.08°) is truly irrelevant for OCR.
    # The ensemble confidence must reach at least 0.4 (lowered from 0.5
    # to catch moderate-confidence detections from multiple agreeing
    # methods).
    if abs(shear_deg) < 0.08 or confidence < 0.4:
        no_correction["detections"] = all_detections
        return img, no_correction

    # Apply correction (negate the detected shear to straighten)
    corrected = _apply_shear(img, -shear_deg)

    # Quality gate: verify the correction actually improved alignment.
    # For small corrections (< 0.5°) the projection variance change can
    # be negligible, so the gate is skipped — the cost of a tiny wrong
    # correction is much less than the cost of leaving 0.4° uncorrected
    # (which shifts content ~25px at image edges on tall scans).
    if abs(shear_deg) >= 0.5 and not _dewarp_quality_check(img, corrected):
        logger.info("dewarp: quality gate REJECTED correction (%.3f°) — "
                    "projection variance did not improve", shear_deg)
        no_correction["detections"] = all_detections
        return img, no_correction

    return corrected, {
        "method": method,
        "shear_degrees": shear_deg,
        "confidence": confidence,
        "detections": all_detections,
    }


def dewarp_image_manual(img: np.ndarray, shear_degrees: float) -> np.ndarray:
    """Apply a user-specified shear correction.

    Args:
        img: BGR image (deskewed, before dewarp).
        shear_degrees: Shear angle in degrees to correct.

    Returns:
        Corrected image, or the input unchanged when the angle is
        negligible.
    """
    # Below a thousandth of a degree the warp is a no-op — skip it.
    if abs(shear_degrees) >= 0.001:
        return _apply_shear(img, -shear_degrees)
    return img