""" Image I/O, orientation detection, deskew, and dewarp for the CV vocabulary pipeline. Lizenz: Apache 2.0 (kommerziell nutzbar) DATENSCHUTZ: Alle Verarbeitung erfolgt lokal. """ import logging import time from collections import defaultdict from typing import Any, Dict, List, Tuple import numpy as np from cv_vocab_types import ( CV2_AVAILABLE, TESSERACT_AVAILABLE, ) logger = logging.getLogger(__name__) # Guarded imports — mirror cv_vocab_types guards try: import cv2 except ImportError: cv2 = None # type: ignore[assignment] try: import pytesseract from PIL import Image except ImportError: pytesseract = None # type: ignore[assignment] Image = None # type: ignore[assignment,misc] def render_pdf_high_res(pdf_data: bytes, page_number: int = 0, zoom: float = 3.0) -> np.ndarray: """Render a PDF page to a high-resolution numpy array (BGR). Args: pdf_data: Raw PDF bytes. page_number: 0-indexed page number. zoom: Zoom factor (3.0 = 432 DPI). Returns: numpy array in BGR format. """ import fitz # PyMuPDF pdf_doc = fitz.open(stream=pdf_data, filetype="pdf") if page_number >= pdf_doc.page_count: raise ValueError(f"Page {page_number} does not exist (PDF has {pdf_doc.page_count} pages)") page = pdf_doc[page_number] mat = fitz.Matrix(zoom, zoom) pix = page.get_pixmap(matrix=mat) # Convert to numpy BGR img_data = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.h, pix.w, pix.n) if pix.n == 4: # RGBA img_bgr = cv2.cvtColor(img_data, cv2.COLOR_RGBA2BGR) elif pix.n == 3: # RGB img_bgr = cv2.cvtColor(img_data, cv2.COLOR_RGB2BGR) else: # Grayscale img_bgr = cv2.cvtColor(img_data, cv2.COLOR_GRAY2BGR) pdf_doc.close() return img_bgr def render_image_high_res(image_data: bytes) -> np.ndarray: """Load an image (PNG/JPEG) into a numpy array (BGR). Args: image_data: Raw image bytes. Returns: numpy array in BGR format. 
""" img_array = np.frombuffer(image_data, dtype=np.uint8) img_bgr = cv2.imdecode(img_array, cv2.IMREAD_COLOR) if img_bgr is None: raise ValueError("Could not decode image data") return img_bgr # ============================================================================= # Stage 1b: Orientation Detection (0°/90°/180°/270°) # ============================================================================= def detect_and_fix_orientation(img_bgr: np.ndarray) -> Tuple[np.ndarray, int]: """Detect page orientation via Tesseract OSD and rotate if needed. Handles upside-down scans (180°) common with book scanners where every other page is flipped due to the scanner hinge. Returns: (corrected_image, rotation_degrees) — rotation is 0, 90, 180, or 270. """ if pytesseract is None: return img_bgr, 0 try: # Tesseract OSD needs a grayscale or RGB image gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY) pil_img = Image.fromarray(gray) osd = pytesseract.image_to_osd(pil_img, output_type=pytesseract.Output.DICT) rotate = osd.get("rotate", 0) confidence = osd.get("orientation_conf", 0.0) logger.info(f"OSD: orientation={rotate}° confidence={confidence:.1f}") if rotate == 0 or confidence < 1.0: return img_bgr, 0 # Apply rotation — OSD rotate is the clockwise correction needed if rotate == 180: corrected = cv2.rotate(img_bgr, cv2.ROTATE_180) elif rotate == 90: corrected = cv2.rotate(img_bgr, cv2.ROTATE_90_CLOCKWISE) elif rotate == 270: corrected = cv2.rotate(img_bgr, cv2.ROTATE_90_COUNTERCLOCKWISE) else: return img_bgr, 0 logger.info(f"OSD: rotated {rotate}° to fix orientation") return corrected, rotate except Exception as e: logger.warning(f"OSD orientation detection failed: {e}") return img_bgr, 0 # ============================================================================= # Stage 2: Deskew (Rotation Correction) # ============================================================================= def deskew_image(img: np.ndarray) -> Tuple[np.ndarray, float]: """Correct rotation using Hough 
 Line detection.

    Args:
        img: BGR image.

    Returns:
        Tuple of (corrected image, detected angle in degrees).
    """
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Binarize for line detection (inverted so ink is white for HoughLinesP)
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)

    # Detect lines — minimum length of a quarter page width filters noise
    lines = cv2.HoughLinesP(binary, 1, np.pi / 180, threshold=100,
                            minLineLength=img.shape[1] // 4, maxLineGap=20)
    if lines is None or len(lines) < 3:
        # Too few lines to estimate an angle reliably
        return img, 0.0

    # Compute angles of near-horizontal lines
    angles = []
    for line in lines:
        x1, y1, x2, y2 = line[0]
        angle = np.degrees(np.arctan2(y2 - y1, x2 - x1))
        if abs(angle) < 15:  # Only near-horizontal
            angles.append(angle)

    if not angles:
        return img, 0.0

    # Median is robust against a few misdetected diagonal lines
    median_angle = float(np.median(angles))

    # Limit correction to ±5°
    if abs(median_angle) > 5.0:
        median_angle = 5.0 * np.sign(median_angle)

    # Below 0.1° a rotation would only blur the image
    if abs(median_angle) < 0.1:
        return img, 0.0

    # Rotate around the image centre; replicate border to avoid black wedges
    h, w = img.shape[:2]
    center = (w // 2, h // 2)
    M = cv2.getRotationMatrix2D(center, median_angle, 1.0)
    corrected = cv2.warpAffine(img, M, (w, h), flags=cv2.INTER_LINEAR,
                               borderMode=cv2.BORDER_REPLICATE)

    logger.info(f"Deskew: corrected {median_angle:.2f}° rotation")
    return corrected, median_angle


def deskew_image_by_word_alignment(
    image_data: bytes,
    lang: str = "eng+deu",
    downscale_factor: float = 0.5,
) -> Tuple[bytes, float]:
    """Correct rotation by fitting a line through left-most word starts per text line.

    More robust than Hough-based deskew for vocabulary worksheets where text
    lines have consistent left-alignment. Runs a quick Tesseract pass on a
    downscaled copy to find word positions, computes the dominant left-edge
    column, fits a line through those points and rotates the full-resolution
    image.

    Args:
        image_data: Raw image bytes (PNG/JPEG).
        lang: Tesseract language string for the quick pass.
        downscale_factor: Shrink factor for the quick Tesseract pass (0.5 = 50%).

    Returns:
        Tuple of (rotated image as PNG bytes, detected angle in degrees.
    """
    if not CV2_AVAILABLE or not TESSERACT_AVAILABLE:
        # Without OpenCV + Tesseract this pass cannot run — return unchanged
        return image_data, 0.0

    # 1. Decode image
    img_array = np.frombuffer(image_data, dtype=np.uint8)
    img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
    if img is None:
        logger.warning("deskew_by_word_alignment: could not decode image")
        return image_data, 0.0

    orig_h, orig_w = img.shape[:2]

    # 2. Downscale for fast Tesseract pass
    small_w = int(orig_w * downscale_factor)
    small_h = int(orig_h * downscale_factor)
    small = cv2.resize(img, (small_w, small_h), interpolation=cv2.INTER_AREA)

    # 3. Quick Tesseract — word-level positions (PSM 6 = uniform text block)
    pil_small = Image.fromarray(cv2.cvtColor(small, cv2.COLOR_BGR2RGB))
    try:
        data = pytesseract.image_to_data(
            pil_small,
            lang=lang,
            config="--psm 6 --oem 3",
            output_type=pytesseract.Output.DICT,
        )
    except Exception as e:
        logger.warning(f"deskew_by_word_alignment: Tesseract failed: {e}")
        return image_data, 0.0

    # 4. Per text-line, find the left-most word start
    #    Group by (block_num, par_num, line_num)
    line_groups: Dict[tuple, list] = defaultdict(list)
    for i in range(len(data["text"])):
        text = (data["text"][i] or "").strip()
        conf = int(data["conf"][i])
        if not text or conf < 20:
            # Skip empty tokens and low-confidence OCR noise
            continue
        key = (data["block_num"][i], data["par_num"][i], data["line_num"][i])
        line_groups[key].append(i)

    if len(line_groups) < 5:
        # Too few text lines for a meaningful left-edge fit
        logger.info(f"deskew_by_word_alignment: only {len(line_groups)} lines, skipping")
        return image_data, 0.0

    # For each line, pick the word with smallest 'left' → compute (left_x, center_y)
    # Scale back to original resolution
    scale = 1.0 / downscale_factor
    points = []  # list of (x, y) in original-image coords
    for key, indices in line_groups.items():
        best_idx = min(indices, key=lambda i: data["left"][i])
        lx = data["left"][best_idx] * scale
        top = data["top"][best_idx] * scale
        h = data["height"][best_idx] * scale
        cy = top + h / 2.0
        points.append((lx, cy))

    # 5.
 Find dominant left-edge column + compute angle
    xs = np.array([p[0] for p in points])
    ys = np.array([p[1] for p in points])
    median_x = float(np.median(xs))
    tolerance = orig_w * 0.03  # 3% of image width
    # Keep only points near the dominant left margin — drops indented lines
    mask = np.abs(xs - median_x) <= tolerance
    filtered_xs = xs[mask]
    filtered_ys = ys[mask]

    if len(filtered_xs) < 5:
        logger.info(f"deskew_by_word_alignment: only {len(filtered_xs)} aligned points after filter, skipping")
        return image_data, 0.0

    # polyfit: x = a*y + b → a = dx/dy → angle = arctan(a)
    coeffs = np.polyfit(filtered_ys, filtered_xs, 1)
    slope = coeffs[0]  # dx/dy
    angle_rad = np.arctan(slope)
    angle_deg = float(np.degrees(angle_rad))

    # Clamp to ±5°
    angle_deg = max(-5.0, min(5.0, angle_deg))

    logger.info(f"deskew_by_word_alignment: detected {angle_deg:.2f}° from {len(filtered_xs)} points "
                f"(total lines: {len(line_groups)})")

    if abs(angle_deg) < 0.05:
        # Angle too small to be worth a resample
        return image_data, 0.0

    # 6. Rotate full-res image
    center = (orig_w // 2, orig_h // 2)
    M = cv2.getRotationMatrix2D(center, angle_deg, 1.0)
    rotated = cv2.warpAffine(img, M, (orig_w, orig_h), flags=cv2.INTER_LINEAR,
                             borderMode=cv2.BORDER_REPLICATE)

    # Encode back to PNG
    success, png_buf = cv2.imencode(".png", rotated)
    if not success:
        logger.warning("deskew_by_word_alignment: PNG encoding failed")
        return image_data, 0.0

    return png_buf.tobytes(), angle_deg


def _projection_gradient_score(profile: np.ndarray) -> float:
    """Score a projection profile by the L2-norm of its first derivative.

    Higher score = sharper transitions between text-lines and gaps, i.e.
    better row/column alignment.
    """
    diff = np.diff(profile)
    return float(np.sum(diff * diff))


def deskew_image_iterative(
    img: np.ndarray,
    coarse_range: float = 5.0,
    coarse_step: float = 0.1,
    fine_range: float = 0.15,
    fine_step: float = 0.02,
) -> Tuple[np.ndarray, float, Dict[str, Any]]:
    """Iterative deskew using vertical-edge projection optimisation.

    The key insight: at the correct rotation angle, vertical features
    (word left-edges, column borders) become truly vertical, producing the
    sharpest peaks in the vertical projection of vertical edges.

    Method:
        1. Detect vertical edges via Sobel-X on the central crop.
        2. Coarse sweep: rotate edge image, compute vertical projection
           gradient score. The angle where vertical edges align best wins.
        3. Fine sweep: refine around the coarse winner.

    Args:
        img: BGR image (full resolution).
        coarse_range: half-range in degrees for the coarse sweep.
        coarse_step: step size in degrees for the coarse sweep.
        fine_range: half-range around the coarse winner for the fine sweep.
        fine_step: step size in degrees for the fine sweep.

    Returns:
        (rotated_bgr, angle_degrees, debug_dict)
    """
    h, w = img.shape[:2]
    debug: Dict[str, Any] = {}

    # --- Grayscale + vertical edge detection ---
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Central crop (15%-85% height, 10%-90% width) to avoid page margins
    y_lo, y_hi = int(h * 0.15), int(h * 0.85)
    x_lo, x_hi = int(w * 0.10), int(w * 0.90)
    gray_crop = gray[y_lo:y_hi, x_lo:x_hi]

    # Sobel-X → absolute vertical edges
    sobel_x = cv2.Sobel(gray_crop, cv2.CV_64F, 1, 0, ksize=3)
    edges = np.abs(sobel_x)

    # Normalise to 0-255 for consistent scoring
    edge_max = edges.max()
    if edge_max > 0:
        edges = (edges / edge_max * 255).astype(np.uint8)
    else:
        # Completely flat crop — nothing to align against
        return img, 0.0, {"error": "no edges detected"}

    crop_h, crop_w = edges.shape[:2]
    crop_center = (crop_w // 2, crop_h // 2)

    # Trim margin after rotation to avoid border artifacts
    trim_y = max(4, int(crop_h * 0.03))
    trim_x = max(4, int(crop_w * 0.03))

    def _sweep_edges(angles: np.ndarray) -> list:
        """Score each angle by vertical projection gradient of vertical edges."""
        results = []
        for angle in angles:
            if abs(angle) < 1e-6:
                # Zero angle — skip the warp entirely
                rotated = edges
            else:
                M = cv2.getRotationMatrix2D(crop_center, angle, 1.0)
                rotated = cv2.warpAffine(edges, M, (crop_w, crop_h),
                                         flags=cv2.INTER_NEAREST,
                                         borderMode=cv2.BORDER_REPLICATE)
            # Trim borders to avoid edge
 artifacts
            trimmed = rotated[trim_y:-trim_y, trim_x:-trim_x]
            # Column-wise sum — peaks where vertical edges line up
            v_profile = np.sum(trimmed, axis=0, dtype=np.float64)
            score = _projection_gradient_score(v_profile)
            results.append((float(angle), score))
        return results

    # --- Phase 1: coarse sweep ---
    coarse_angles = np.arange(-coarse_range, coarse_range + coarse_step * 0.5, coarse_step)
    coarse_results = _sweep_edges(coarse_angles)
    best_coarse = max(coarse_results, key=lambda x: x[1])
    best_coarse_angle, best_coarse_score = best_coarse
    debug["coarse_best_angle"] = round(best_coarse_angle, 2)
    debug["coarse_best_score"] = round(best_coarse_score, 1)
    debug["coarse_scores"] = [(round(a, 2), round(s, 1)) for a, s in coarse_results]

    # --- Phase 2: fine sweep around coarse winner ---
    fine_lo = best_coarse_angle - fine_range
    fine_hi = best_coarse_angle + fine_range
    fine_angles = np.arange(fine_lo, fine_hi + fine_step * 0.5, fine_step)
    fine_results = _sweep_edges(fine_angles)
    best_fine = max(fine_results, key=lambda x: x[1])
    best_fine_angle, best_fine_score = best_fine
    debug["fine_best_angle"] = round(best_fine_angle, 2)
    debug["fine_best_score"] = round(best_fine_score, 1)
    debug["fine_scores"] = [(round(a, 2), round(s, 1)) for a, s in fine_results]

    final_angle = best_fine_angle
    # Clamp to ±5°
    final_angle = max(-5.0, min(5.0, final_angle))

    logger.info(f"deskew_iterative: coarse={best_coarse_angle:.2f}° fine={best_fine_angle:.2f}° -> {final_angle:.2f}°")

    if abs(final_angle) < 0.05:
        # Not worth resampling the full-res image
        return img, 0.0, debug

    # --- Rotate full-res image ---
    center = (w // 2, h // 2)
    M = cv2.getRotationMatrix2D(center, final_angle, 1.0)
    rotated = cv2.warpAffine(img, M, (w, h), flags=cv2.INTER_LINEAR,
                             borderMode=cv2.BORDER_REPLICATE)
    return rotated, final_angle, debug


def _measure_textline_slope(img: np.ndarray) -> float:
    """Measure residual text-line slope via Tesseract word-position regression.

    Groups Tesseract words by (block, par, line), fits a linear regression
    per line (y = slope * x + b), and returns the trimmed-mean slope in
    degrees. Positive = text rises to the right, negative = falls.

    This is the most direct measurement of remaining rotation after deskew.
    """
    import math as _math

    if not TESSERACT_AVAILABLE or not CV2_AVAILABLE:
        return 0.0

    h, w = img.shape[:2]
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    data = pytesseract.image_to_data(
        Image.fromarray(gray),
        output_type=pytesseract.Output.DICT,
        config="--psm 6",
    )

    # Group word centres by text line
    lines: Dict[tuple, list] = {}
    for i in range(len(data["text"])):
        txt = (data["text"][i] or "").strip()
        if len(txt) < 2 or int(data["conf"][i]) < 30:
            # Ignore single characters and low-confidence noise
            continue
        key = (data["block_num"][i], data["par_num"][i], data["line_num"][i])
        cx = data["left"][i] + data["width"][i] / 2.0
        cy = data["top"][i] + data["height"][i] / 2.0
        lines.setdefault(key, []).append((cx, cy))

    # Per-line linear regression → slope angle
    slopes: list = []
    for pts in lines.values():
        if len(pts) < 3:
            continue
        pts.sort(key=lambda p: p[0])
        xs = np.array([p[0] for p in pts], dtype=np.float64)
        ys = np.array([p[1] for p in pts], dtype=np.float64)
        if xs[-1] - xs[0] < w * 0.15:
            continue  # skip short lines
        # Least-squares fit of y = slope*x + b over the word centres
        A = np.vstack([xs, np.ones_like(xs)]).T
        result = np.linalg.lstsq(A, ys, rcond=None)
        slope = result[0][0]
        slopes.append(_math.degrees(_math.atan(slope)))

    if len(slopes) < 3:
        return 0.0

    # Trimmed mean (drop 10% extremes on each side)
    slopes.sort()
    trim = max(1, len(slopes) // 10)
    trimmed = slopes[trim:-trim] if len(slopes) > 2 * trim else slopes
    if not trimmed:
        return 0.0
    return sum(trimmed) / len(trimmed)


def deskew_two_pass(
    img: np.ndarray,
    coarse_range: float = 5.0,
) -> Tuple[np.ndarray, float, Dict[str, Any]]:
    """Two-pass deskew: iterative projection + word-alignment residual check.

    Pass 1: ``deskew_image_iterative()`` (vertical-edge projection, wide range).
    Pass 2: ``deskew_image_by_word_alignment()`` on the already-corrected image
        to detect and fix residual skew that the projection method missed.
        The two corrections are summed. If the residual from Pass 2 is below
        0.3° it is ignored (already good enough).

    Returns:
        (corrected_bgr, total_angle_degrees, debug_dict)
    """
    debug: Dict[str, Any] = {}

    # --- Pass 1: iterative projection ---
    corrected, angle1, dbg1 = deskew_image_iterative(
        img.copy(),
        coarse_range=coarse_range,
    )
    debug["pass1_angle"] = round(angle1, 3)
    debug["pass1_method"] = "iterative"
    debug["pass1_debug"] = dbg1

    # --- Pass 2: word-alignment residual check on corrected image ---
    angle2 = 0.0
    try:
        # Encode the corrected image to PNG bytes for word-alignment
        ok, buf = cv2.imencode(".png", corrected)
        if ok:
            corrected_bytes, angle2 = deskew_image_by_word_alignment(buf.tobytes())
            if abs(angle2) >= 0.3:
                # Significant residual — decode and use the second correction
                arr2 = np.frombuffer(corrected_bytes, dtype=np.uint8)
                corrected2 = cv2.imdecode(arr2, cv2.IMREAD_COLOR)
                if corrected2 is not None:
                    corrected = corrected2
                    logger.info(f"deskew_two_pass: pass2 residual={angle2:.2f}° applied "
                                f"(total={angle1 + angle2:.2f}°)")
                else:
                    angle2 = 0.0
            else:
                logger.info(f"deskew_two_pass: pass2 residual={angle2:.2f}° < 0.3° — skipped")
                angle2 = 0.0
    except Exception as e:
        # Pass 2 is best-effort; pass 1's result stands on failure
        logger.warning(f"deskew_two_pass: pass2 word-alignment failed: {e}")
        angle2 = 0.0

    # --- Pass 3: Tesseract text-line regression residual check ---
    # The most reliable final check: measure actual text-line slopes
    # using Tesseract word positions and linear regression per line.
angle3 = 0.0 try: residual = _measure_textline_slope(corrected) debug["pass3_raw"] = round(residual, 3) if abs(residual) >= 0.3: h3, w3 = corrected.shape[:2] center3 = (w3 // 2, h3 // 2) M3 = cv2.getRotationMatrix2D(center3, residual, 1.0) corrected = cv2.warpAffine( corrected, M3, (w3, h3), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REPLICATE, ) angle3 = residual logger.info( "deskew_two_pass: pass3 text-line residual=%.2f° applied", residual, ) else: logger.info( "deskew_two_pass: pass3 text-line residual=%.2f° < 0.3° — skipped", residual, ) except Exception as e: logger.warning("deskew_two_pass: pass3 text-line check failed: %s", e) total_angle = angle1 + angle2 + angle3 debug["pass2_angle"] = round(angle2, 3) debug["pass2_method"] = "word_alignment" debug["pass3_angle"] = round(angle3, 3) debug["pass3_method"] = "textline_regression" debug["total_angle"] = round(total_angle, 3) logger.info( "deskew_two_pass: pass1=%.2f° + pass2=%.2f° + pass3=%.2f° = %.2f°", angle1, angle2, angle3, total_angle, ) return corrected, total_angle, debug # ============================================================================= # Stage 3: Dewarp (Book Curvature Correction) # ============================================================================= def _detect_shear_angle(img: np.ndarray) -> Dict[str, Any]: """Detect the vertical shear angle of the page. After deskew (horizontal lines aligned), vertical features like column edges may still be tilted. This measures that tilt by tracking the strongest vertical edge across horizontal strips. The result is a shear angle in degrees: the angular difference between true vertical and the detected column edge. Returns: Dict with keys: method, shear_degrees, confidence. 
""" h, w = img.shape[:2] result = {"method": "vertical_edge", "shear_degrees": 0.0, "confidence": 0.0} gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) # Vertical Sobel to find vertical edges sobel_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3) abs_sobel = np.abs(sobel_x).astype(np.uint8) # Binarize with Otsu _, binary = cv2.threshold(abs_sobel, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) num_strips = 20 strip_h = h // num_strips edge_positions = [] # (y_center, x_position) for i in range(num_strips): y_start = i * strip_h y_end = min((i + 1) * strip_h, h) strip = binary[y_start:y_end, :] # Project vertically (sum along y-axis) projection = np.sum(strip, axis=0).astype(np.float64) if projection.max() == 0: continue # Find the strongest vertical edge in left 40% of image search_w = int(w * 0.4) left_proj = projection[:search_w] if left_proj.max() == 0: continue # Smooth and find peak kernel_size = max(3, w // 100) if kernel_size % 2 == 0: kernel_size += 1 smoothed = cv2.GaussianBlur(left_proj.reshape(1, -1), (kernel_size, 1), 0).flatten() x_pos = float(np.argmax(smoothed)) y_center = (y_start + y_end) / 2.0 edge_positions.append((y_center, x_pos)) if len(edge_positions) < 8: return result ys = np.array([p[0] for p in edge_positions]) xs = np.array([p[1] for p in edge_positions]) # Remove outliers (> 2 std from median) median_x = np.median(xs) std_x = max(np.std(xs), 1.0) mask = np.abs(xs - median_x) < 2 * std_x ys = ys[mask] xs = xs[mask] if len(ys) < 6: return result # Fit straight line: x = slope * y + intercept # The slope tells us the tilt of the vertical edge straight_coeffs = np.polyfit(ys, xs, 1) slope = straight_coeffs[0] # dx/dy in pixels fitted = np.polyval(straight_coeffs, ys) residuals = xs - fitted rmse = float(np.sqrt(np.mean(residuals ** 2))) # Convert slope to angle: arctan(dx/dy) in degrees import math shear_degrees = math.degrees(math.atan(slope)) confidence = min(1.0, len(ys) / 15.0) * max(0.5, 1.0 - rmse / 5.0) result["shear_degrees"] = 
round(shear_degrees, 3) result["confidence"] = round(float(confidence), 2) return result def _detect_shear_by_projection(img: np.ndarray) -> Dict[str, Any]: """Detect shear angle by maximising variance of horizontal text-line projections. Principle: horizontal text lines produce a row-projection profile with sharp peaks (high variance) when the image is correctly aligned. Any residual shear smears the peaks and reduces variance. We sweep ±3° and pick the angle whose corrected projection has the highest variance. Works best on pages with clear horizontal banding (vocabulary tables, prose). Complements _detect_shear_angle() which needs strong vertical edges. Returns: Dict with keys: method, shear_degrees, confidence. """ import math result = {"method": "projection", "shear_degrees": 0.0, "confidence": 0.0} h, w = img.shape[:2] gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) # Otsu binarisation _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU) # Work at half resolution for speed small = cv2.resize(binary, (w // 2, h // 2), interpolation=cv2.INTER_AREA) sh, sw = small.shape # 2-pass angle sweep for 10x better precision: # Pass 1: Coarse sweep ±3° in 0.5° steps (13 values) # Pass 2: Fine sweep ±0.5° around coarse best in 0.05° steps (21 values) def _sweep_variance(angles_list): results = [] for angle_deg in angles_list: if abs(angle_deg) < 0.001: rotated = small else: shear_tan = math.tan(math.radians(angle_deg)) M = np.float32([[1, shear_tan, -sh / 2.0 * shear_tan], [0, 1, 0]]) rotated = cv2.warpAffine(small, M, (sw, sh), flags=cv2.INTER_NEAREST, borderMode=cv2.BORDER_CONSTANT) profile = np.sum(rotated, axis=1).astype(float) results.append((angle_deg, float(np.var(profile)))) return results # Pass 1: coarse coarse_angles = [a * 0.5 for a in range(-6, 7)] # 13 values coarse_results = _sweep_variance(coarse_angles) coarse_best = max(coarse_results, key=lambda x: x[1]) # Pass 2: fine around coarse best fine_center = coarse_best[0] fine_angles = 
[fine_center + a * 0.05 for a in range(-10, 11)] # 21 values fine_results = _sweep_variance(fine_angles) fine_best = max(fine_results, key=lambda x: x[1]) best_angle = fine_best[0] best_variance = fine_best[1] variances = coarse_results + fine_results # Confidence: how much sharper is the best angle vs. the mean? all_mean = sum(v for _, v in variances) / len(variances) if all_mean > 0 and best_variance > all_mean: confidence = min(1.0, (best_variance - all_mean) / (all_mean + 1.0) * 0.6) else: confidence = 0.0 result["shear_degrees"] = round(best_angle, 3) result["confidence"] = round(max(0.0, min(1.0, confidence)), 2) return result def _detect_shear_by_hough(img: np.ndarray) -> Dict[str, Any]: """Detect shear using Hough transform on printed table / ruled lines. Vocabulary worksheets have near-horizontal printed table borders. After deskew these should be exactly horizontal; any residual tilt equals the vertical shear angle (with inverted sign). The sign convention: a horizontal line tilting +α degrees (left end lower) means the page has vertical shear of -α degrees (left column edge drifts to the left going downward). Returns: Dict with keys: method, shear_degrees, confidence. 
""" result = {"method": "hough_lines", "shear_degrees": 0.0, "confidence": 0.0} h, w = img.shape[:2] gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) edges = cv2.Canny(gray, 50, 150, apertureSize=3) min_len = int(w * 0.15) lines = cv2.HoughLinesP( edges, rho=1, theta=np.pi / 360, threshold=int(w * 0.08), minLineLength=min_len, maxLineGap=20, ) if lines is None or len(lines) < 3: return result horizontal_angles: List[Tuple[float, float]] = [] for line in lines: x1, y1, x2, y2 = line[0] if x1 == x2: continue angle = float(np.degrees(np.arctan2(y2 - y1, x2 - x1))) if abs(angle) <= 5.0: length = float(np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)) horizontal_angles.append((angle, length)) if len(horizontal_angles) < 3: return result # Weighted median angles_arr = np.array([a for a, _ in horizontal_angles]) weights_arr = np.array([l for _, l in horizontal_angles]) sorted_idx = np.argsort(angles_arr) s_angles = angles_arr[sorted_idx] s_weights = weights_arr[sorted_idx] cum = np.cumsum(s_weights) mid_idx = int(np.searchsorted(cum, cum[-1] / 2.0)) median_angle = float(s_angles[min(mid_idx, len(s_angles) - 1)]) agree = sum(1 for a, _ in horizontal_angles if abs(a - median_angle) < 1.0) confidence = min(1.0, agree / max(len(horizontal_angles), 1)) * 0.85 # Sign inversion: horizontal line tilt is complementary to vertical shear shear_degrees = -median_angle result["shear_degrees"] = round(shear_degrees, 3) result["confidence"] = round(max(0.0, min(1.0, confidence)), 2) return result def _detect_shear_by_text_lines(img: np.ndarray) -> Dict[str, Any]: """Detect shear by measuring text-line straightness (Method D). Runs a quick Tesseract scan (PSM 11, 50% downscale) to locate word bounding boxes, groups them into vertical columns by X-proximity, and measures how the left-edge X position drifts with Y (vertical position). The drift dx/dy is the tangent of the shear angle. 
This directly measures vertical shear (column tilt) rather than horizontal text-line slope, which is already corrected by deskew. Returns: Dict with keys: method, shear_degrees, confidence. """ import math result = {"method": "text_lines", "shear_degrees": 0.0, "confidence": 0.0} h, w = img.shape[:2] # Downscale 50% for speed scale = 0.5 small = cv2.resize(img, (int(w * scale), int(h * scale)), interpolation=cv2.INTER_AREA) gray = cv2.cvtColor(small, cv2.COLOR_BGR2GRAY) pil_img = Image.fromarray(gray) try: data = pytesseract.image_to_data( pil_img, lang='eng+deu', config='--psm 11 --oem 3', output_type=pytesseract.Output.DICT, ) except Exception: return result # Collect word left-edges (x) and vertical centres (y) words = [] for i in range(len(data['text'])): text = data['text'][i].strip() conf = int(data['conf'][i]) if not text or conf < 20 or len(text) < 2: continue left_x = float(data['left'][i]) cy = data['top'][i] + data['height'][i] / 2.0 word_w = float(data['width'][i]) words.append((left_x, cy, word_w)) if len(words) < 15: return result # --- Group words into vertical columns by left-edge X proximity --- # Sort by x, then cluster words whose left-edges are within x_tol avg_w = sum(ww for _, _, ww in words) / len(words) x_tol = max(avg_w * 0.4, 8) # tolerance for "same column" words_by_x = sorted(words, key=lambda w: w[0]) columns: List[List[Tuple[float, float]]] = [] # each: [(left_x, cy), ...] 
cur_col: List[Tuple[float, float]] = [(words_by_x[0][0], words_by_x[0][1])] cur_x = words_by_x[0][0] for lx, cy, _ in words_by_x[1:]: if abs(lx - cur_x) <= x_tol: cur_col.append((lx, cy)) # Update running x as median of cluster cur_x = cur_x * 0.8 + lx * 0.2 else: if len(cur_col) >= 5: columns.append(cur_col) cur_col = [(lx, cy)] cur_x = lx if len(cur_col) >= 5: columns.append(cur_col) if len(columns) < 2: return result # --- For each column, measure X-drift as a function of Y --- # Fit: left_x = a * cy + b → a = dx/dy = tan(shear_angle) drifts = [] for col in columns: ys = np.array([p[1] for p in col]) xs = np.array([p[0] for p in col]) y_range = ys.max() - ys.min() if y_range < h * scale * 0.3: continue # column must span at least 30% of image height # Linear regression: x = a*y + b coeffs = np.polyfit(ys, xs, 1) drifts.append(coeffs[0]) # dx/dy if len(drifts) < 2: return result # Median dx/dy → shear angle # dx/dy > 0 means left-edges move RIGHT as we go DOWN → columns lean right median_drift = float(np.median(drifts)) shear_degrees = math.degrees(math.atan(median_drift)) # Confidence from column count + drift consistency drift_std = float(np.std(drifts)) consistency = max(0.0, 1.0 - drift_std * 50) # tighter penalty for drift variance count_factor = min(1.0, len(drifts) / 4.0) confidence = count_factor * 0.5 + consistency * 0.5 result["shear_degrees"] = round(shear_degrees, 3) result["confidence"] = round(max(0.0, min(1.0, confidence)), 2) logger.info("text_lines(v2): %d columns, %d drifts, median=%.4f, " "shear=%.3f°, conf=%.2f", len(columns), len(drifts), median_drift, shear_degrees, confidence) return result def _dewarp_quality_check(original: np.ndarray, corrected: np.ndarray) -> bool: """Check whether the dewarp correction actually improved alignment. Compares horizontal projection variance before and after correction. Higher variance means sharper text-line peaks, which indicates better horizontal alignment. 
    Returns True if the correction improved the image, False if it
    should be discarded.
    """
    def _h_proj_variance(img: np.ndarray) -> float:
        # Row-projection variance at half resolution (speed)
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
        small = cv2.resize(binary, (binary.shape[1] // 2, binary.shape[0] // 2),
                           interpolation=cv2.INTER_AREA)
        profile = np.sum(small, axis=1).astype(float)
        return float(np.var(profile))

    var_before = _h_proj_variance(original)
    var_after = _h_proj_variance(corrected)

    # Correction must improve variance (even by a tiny margin)
    return var_after > var_before


def _apply_shear(img: np.ndarray, shear_degrees: float) -> np.ndarray:
    """Apply a vertical shear correction to an image.

    Shifts each row horizontally proportional to its distance from the
    vertical center. This corrects the tilt of vertical features (columns)
    without affecting horizontal alignment (text lines).

    Args:
        img: BGR image.
        shear_degrees: Shear angle in degrees. Positive = shift top-right/bottom-left.

    Returns:
        Corrected image.
    """
    import math

    h, w = img.shape[:2]
    shear_tan = math.tan(math.radians(shear_degrees))

    # Affine matrix: shift x by shear_tan * (y - h/2)
    #   [1  shear_tan  -h/2*shear_tan]
    #   [0  1           0            ]
    M = np.float32([
        [1, shear_tan, -h / 2.0 * shear_tan],
        [0, 1, 0],
    ])
    corrected = cv2.warpAffine(img, M, (w, h), flags=cv2.INTER_LINEAR,
                               borderMode=cv2.BORDER_REPLICATE)
    return corrected


def _ensemble_shear(detections: List[Dict[str, Any]]) -> Tuple[float, float, str]:
    """Combine multiple shear detections into a single weighted estimate (v2).

    Ensemble v2 changes vs v1:
        - Minimum confidence threshold _MIN_CONF (currently 0.35 — see the
          inline comment below for the history of this value)
        - text_lines method gets 1.5× weight boost (most reliable detector)
        - Outlier filter at 1° from weighted mean

    Returns:
        (shear_degrees, ensemble_confidence, methods_used_str)
    """
    # Confidence threshold — lowered from 0.5 to 0.35 to catch subtle shear
    # that individual methods detect with moderate confidence.
    _MIN_CONF = 0.35
    # text_lines gets a weight boost as the most content-aware method
    _METHOD_WEIGHT_BOOST = {"text_lines": 1.5}

    accepted = []
    for d in detections:
        if d["confidence"] < _MIN_CONF:
            continue
        boost = _METHOD_WEIGHT_BOOST.get(d["method"], 1.0)
        effective_conf = d["confidence"] * boost
        accepted.append((d["shear_degrees"], effective_conf, d["method"]))

    if not accepted:
        return 0.0, 0.0, "none"

    if len(accepted) == 1:
        # Single detector — pass through (cap conf at 1.0 after boost)
        deg, conf, method = accepted[0]
        return deg, min(conf, 1.0), method

    # First pass: weighted mean
    total_w = sum(c for _, c, _ in accepted)
    w_mean = sum(d * c for d, c, _ in accepted) / total_w

    # Outlier filter: keep results within 1° of weighted mean
    filtered = [(d, c, m) for d, c, m in accepted if abs(d - w_mean) <= 1.0]
    if not filtered:
        filtered = accepted  # fallback: keep all

    # Second pass: weighted mean on filtered results
    total_w2 = sum(c for _, c, _ in filtered)
    final_deg = sum(d * c for d, c, _ in filtered) / total_w2

    # Ensemble confidence: average of individual confidences, boosted when
    # methods agree (all within 0.5° of each other)
    avg_conf = total_w2 / len(filtered)
    spread = max(d for d, _, _ in filtered) - min(d for d, _, _ in filtered)
    agreement_bonus = 0.15 if spread < 0.5 else 0.0
    ensemble_conf = min(1.0, avg_conf + agreement_bonus)

    methods_str = "+".join(m for _, _, m in filtered)
    return round(final_deg, 3), round(min(ensemble_conf, 1.0), 2), methods_str


def dewarp_image(img: np.ndarray, use_ensemble: bool = True) -> Tuple[np.ndarray, Dict[str, Any]]:
    """Correct vertical shear after deskew (v2 with quality gate).

    After deskew aligns horizontal text lines, vertical features (column
    edges) may still be tilted. This detects the tilt angle using an
    ensemble of four complementary methods and applies an affine shear
    correction.

    Methods (all run in ~150ms total):
        A. _detect_shear_angle() — vertical edge profile (~50ms)
        B. _detect_shear_by_projection() — horizontal text-line variance (~30ms)
        C.
_detect_shear_by_hough() — Hough lines on table borders (~20ms) D. _detect_shear_by_text_lines() — text-line straightness (~50ms) Quality gate: after correction, horizontal projection variance is compared before vs after. If correction worsened alignment, it is discarded. Args: img: BGR image (already deskewed). use_ensemble: If False, fall back to single-method behaviour (method A only). Returns: Tuple of (corrected_image, dewarp_info). dewarp_info keys: method, shear_degrees, confidence, detections. """ no_correction = { "method": "none", "shear_degrees": 0.0, "confidence": 0.0, "detections": [], } if not CV2_AVAILABLE: return img, no_correction t0 = time.time() if use_ensemble: det_a = _detect_shear_angle(img) det_b = _detect_shear_by_projection(img) det_c = _detect_shear_by_hough(img) det_d = _detect_shear_by_text_lines(img) detections = [det_a, det_b, det_c, det_d] shear_deg, confidence, method = _ensemble_shear(detections) else: det_a = _detect_shear_angle(img) detections = [det_a] shear_deg = det_a["shear_degrees"] confidence = det_a["confidence"] method = det_a["method"] duration = time.time() - t0 logger.info( "dewarp: ensemble shear=%.3f° conf=%.2f method=%s (%.2fs) | " "A=%.3f/%.2f B=%.3f/%.2f C=%.3f/%.2f D=%.3f/%.2f", shear_deg, confidence, method, duration, detections[0]["shear_degrees"], detections[0]["confidence"], detections[1]["shear_degrees"] if len(detections) > 1 else 0.0, detections[1]["confidence"] if len(detections) > 1 else 0.0, detections[2]["shear_degrees"] if len(detections) > 2 else 0.0, detections[2]["confidence"] if len(detections) > 2 else 0.0, detections[3]["shear_degrees"] if len(detections) > 3 else 0.0, detections[3]["confidence"] if len(detections) > 3 else 0.0, ) # Always include individual detections (even when no correction applied) _all_detections = [ {"method": d["method"], "shear_degrees": d["shear_degrees"], "confidence": d["confidence"]} for d in detections ] # Thresholds: very small shear (<0.08°) is truly irrelevant for 
OCR. # For ensemble confidence, require at least 0.4 (lowered from 0.5 to # catch moderate-confidence detections from multiple agreeing methods). if abs(shear_deg) < 0.08 or confidence < 0.4: no_correction["detections"] = _all_detections return img, no_correction # Apply correction (negate the detected shear to straighten) corrected = _apply_shear(img, -shear_deg) # Quality gate: verify the correction actually improved alignment. # For small corrections (< 0.5°), the projection variance change can be # negligible, so we skip the quality gate — the cost of a tiny wrong # correction is much less than the cost of leaving 0.4° uncorrected # (which shifts content ~25px at image edges on tall scans). if abs(shear_deg) >= 0.5 and not _dewarp_quality_check(img, corrected): logger.info("dewarp: quality gate REJECTED correction (%.3f°) — " "projection variance did not improve", shear_deg) no_correction["detections"] = _all_detections return img, no_correction info = { "method": method, "shear_degrees": shear_deg, "confidence": confidence, "detections": _all_detections, } return corrected, info def dewarp_image_manual(img: np.ndarray, shear_degrees: float) -> np.ndarray: """Apply shear correction with a manual angle. Args: img: BGR image (deskewed, before dewarp). shear_degrees: Shear angle in degrees to correct. Returns: Corrected image. """ if abs(shear_degrees) < 0.001: return img return _apply_shear(img, -shear_degrees)