From d39d249daa162f35c99c9769e4ec54318c8a13ac Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Thu, 5 Mar 2026 17:53:11 +0100 Subject: [PATCH] feat: add pass 3 text-line regression to deskew pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After iterative projection (pass 1) and word-alignment (pass 2), a third pass uses Tesseract word positions + linear regression per text line to measure and correct residual rotation. This catches cases where passes 1-2 leave significant slope (e.g. 1.7° residual on heavily skewed scans). Co-Authored-By: Claude Opus 4.6 --- klausur-service/backend/cv_vocab_pipeline.py | 99 ++++++++++++++++++- klausur-service/backend/ocr_pipeline_api.py | 10 +- .../backend/vocab_worksheet_api.py | 8 +- 3 files changed, 109 insertions(+), 8 deletions(-) diff --git a/klausur-service/backend/cv_vocab_pipeline.py b/klausur-service/backend/cv_vocab_pipeline.py index e893c7f..e775c27 100644 --- a/klausur-service/backend/cv_vocab_pipeline.py +++ b/klausur-service/backend/cv_vocab_pipeline.py @@ -528,6 +528,67 @@ def deskew_image_iterative( return rotated, final_angle, debug +def _measure_textline_slope(img: np.ndarray) -> float: + """Measure residual text-line slope via Tesseract word-position regression. + + Groups Tesseract words by (block, par, line), fits a linear regression + per line (y = slope * x + b), and returns the trimmed-mean slope in + degrees. Image y grows downward: positive = text falls to the right. + + This is the most direct measurement of remaining rotation after deskew. 
+ """ + import math as _math + + if not TESSERACT_AVAILABLE or not CV2_AVAILABLE: + return 0.0 + + h, w = img.shape[:2] + gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + data = pytesseract.image_to_data( + Image.fromarray(gray), + output_type=pytesseract.Output.DICT, + config="--psm 6", + ) + + # Group word centres by text line + lines: Dict[tuple, list] = {} + for i in range(len(data["text"])): + txt = (data["text"][i] or "").strip() + if len(txt) < 2 or int(data["conf"][i]) < 30: + continue + key = (data["block_num"][i], data["par_num"][i], data["line_num"][i]) + cx = data["left"][i] + data["width"][i] / 2.0 + cy = data["top"][i] + data["height"][i] / 2.0 + lines.setdefault(key, []).append((cx, cy)) + + # Per-line linear regression → slope angle + slopes: list = [] + for pts in lines.values(): + if len(pts) < 3: + continue + pts.sort(key=lambda p: p[0]) + xs = np.array([p[0] for p in pts], dtype=np.float64) + ys = np.array([p[1] for p in pts], dtype=np.float64) + if xs[-1] - xs[0] < w * 0.15: + continue # skip short lines + A = np.vstack([xs, np.ones_like(xs)]).T + result = np.linalg.lstsq(A, ys, rcond=None) + slope = result[0][0] + slopes.append(_math.degrees(_math.atan(slope))) + + if len(slopes) < 3: + return 0.0 + + # Trimmed mean (drop 10% extremes on each side) + slopes.sort() + trim = max(1, len(slopes) // 10) + trimmed = slopes[trim:-trim] if len(slopes) > 2 * trim else slopes + if not trimmed: + return 0.0 + + return sum(trimmed) / len(trimmed) + + def deskew_two_pass( img: np.ndarray, coarse_range: float = 5.0, @@ -578,12 +639,46 @@ def deskew_two_pass( logger.warning(f"deskew_two_pass: pass2 word-alignment failed: {e}") angle2 = 0.0 - total_angle = angle1 + angle2 + # --- Pass 3: Tesseract text-line regression residual check --- + # The most reliable final check: measure actual text-line slopes + # using Tesseract word positions and linear regression per line. 
+ angle3 = 0.0 + try: + residual = _measure_textline_slope(corrected) + debug["pass3_raw"] = round(residual, 3) + if abs(residual) >= 0.3: + h3, w3 = corrected.shape[:2] + center3 = (w3 // 2, h3 // 2) + M3 = cv2.getRotationMatrix2D(center3, residual, 1.0) + corrected = cv2.warpAffine( + corrected, M3, (w3, h3), + flags=cv2.INTER_LINEAR, + borderMode=cv2.BORDER_REPLICATE, + ) + angle3 = residual + logger.info( + "deskew_two_pass: pass3 text-line residual=%.2f° applied", + residual, + ) + else: + logger.info( + "deskew_two_pass: pass3 text-line residual=%.2f° < 0.3° — skipped", + residual, + ) + except Exception as e: + logger.warning("deskew_two_pass: pass3 text-line check failed: %s", e) + + total_angle = angle1 + angle2 + angle3 debug["pass2_angle"] = round(angle2, 3) debug["pass2_method"] = "word_alignment" + debug["pass3_angle"] = round(angle3, 3) + debug["pass3_method"] = "textline_regression" debug["total_angle"] = round(total_angle, 3) - logger.info(f"deskew_two_pass: pass1={angle1:.2f}° + pass2={angle2:.2f}° = {total_angle:.2f}°") + logger.info( + "deskew_two_pass: pass1=%.2f° + pass2=%.2f° + pass3=%.2f° = %.2f°", + angle1, angle2, angle3, total_angle, + ) return corrected, total_angle, debug diff --git a/klausur-service/backend/ocr_pipeline_api.py b/klausur-service/backend/ocr_pipeline_api.py index d8084b6..43425b3 100644 --- a/klausur-service/backend/ocr_pipeline_api.py +++ b/klausur-service/backend/ocr_pipeline_api.py @@ -488,12 +488,13 @@ async def auto_deskew(session_id: str): angle_iterative = two_pass_debug.get("pass1_angle", 0.0) angle_residual = two_pass_debug.get("pass2_angle", 0.0) + angle_textline = two_pass_debug.get("pass3_angle", 0.0) duration = time.time() - t0 - method_used = "two_pass" - if abs(angle_residual) < 0.3: - method_used = "iterative" # pass2 didn't contribute + method_used = "three_pass" if abs(angle_textline) >= 0.01 else ( + "two_pass" if abs(angle_residual) >= 0.01 else "iterative" + ) # Encode as PNG success, deskewed_png_buf 
= cv2.imencode(".png", deskewed_bgr) @@ -515,6 +516,7 @@ async def auto_deskew(session_id: str): "angle_word_alignment": round(angle_wa, 3), "angle_iterative": round(angle_iterative, 3), "angle_residual": round(angle_residual, 3), + "angle_textline": round(angle_textline, 3), "angle_applied": round(angle_applied, 3), "method_used": method_used, "confidence": round(confidence, 2), @@ -540,12 +542,14 @@ async def auto_deskew(session_id: str): logger.info(f"OCR Pipeline: deskew session {session_id}: " f"hough={angle_hough:.2f} wa={angle_wa:.2f} " f"iter={angle_iterative:.2f} residual={angle_residual:.2f} " + f"textline={angle_textline:.2f} " f"-> {method_used} total={angle_applied:.2f}") await _append_pipeline_log(session_id, "deskew", { "angle_applied": round(angle_applied, 3), "angle_iterative": round(angle_iterative, 3), "angle_residual": round(angle_residual, 3), + "angle_textline": round(angle_textline, 3), "confidence": round(confidence, 2), "method": method_used, }, duration_ms=int(duration * 1000)) diff --git a/klausur-service/backend/vocab_worksheet_api.py b/klausur-service/backend/vocab_worksheet_api.py index 8ece7c2..832e348 100644 --- a/klausur-service/backend/vocab_worksheet_api.py +++ b/klausur-service/backend/vocab_worksheet_api.py @@ -1371,14 +1371,16 @@ async def _run_ocr_pipeline_for_page( except Exception as e: logger.warning(f"Could not create pipeline session in DB: {e}") - # 3. Two-pass deskew: iterative (±5°) + word-alignment residual + # 3. 
Three-pass deskew: iterative + word-alignment + text-line regression t0 = _time.time() deskewed_bgr, angle_applied, deskew_debug = deskew_two_pass(img_bgr.copy()) angle_pass1 = deskew_debug.get("pass1_angle", 0.0) angle_pass2 = deskew_debug.get("pass2_angle", 0.0) + angle_pass3 = deskew_debug.get("pass3_angle", 0.0) - logger.info(f" deskew: pass1={angle_pass1:.2f} pass2={angle_pass2:.2f} " - f"total={angle_applied:.2f} ({_time.time() - t0:.1f}s)") + logger.info(f" deskew: p1={angle_pass1:.2f} p2={angle_pass2:.2f} " + f"p3={angle_pass3:.2f} total={angle_applied:.2f} " + f"({_time.time() - t0:.1f}s)") # 4. Dewarp t0 = _time.time()