def _sweep_projection_variance(
    crop: np.ndarray,
    angles: np.ndarray,
    axis: int,
    seed_angle: float,
) -> Tuple[float, float, list]:
    """Score each candidate angle by the variance of a projection profile.

    Args:
        crop: single-channel binarised image to rotate.
        angles: candidate rotation angles in degrees.
        axis: axis passed to ``np.sum``; ``1`` sums each row (horizontal
            profile), ``0`` sums each column (vertical profile).
        seed_angle: centre of the sweep. It is the fallback result when no
            candidate is scored, and exact score ties are resolved towards it.

    Returns:
        ``(best_angle, best_score, scores)`` where ``scores`` is a list of
        ``(angle rounded to 2 dp, score rounded to 1 dp)`` tuples in sweep
        order. ``best_score`` stays ``-1.0`` if nothing was scored.
    """
    crop_h, crop_w = crop.shape[:2]
    crop_center = (crop_w // 2, crop_h // 2)

    best_angle = seed_angle
    best_score = -1.0
    scores = []

    for raw_angle in angles:
        angle = float(raw_angle)
        if abs(angle) < 1e-6:
            # Effectively zero rotation: skip the warp entirely.
            rotated_crop = crop
        else:
            M = cv2.getRotationMatrix2D(crop_center, angle, 1.0)
            rotated_crop = cv2.warpAffine(crop, M, (crop_w, crop_h),
                                          flags=cv2.INTER_NEAREST,
                                          borderMode=cv2.BORDER_CONSTANT,
                                          borderValue=0)
        profile = np.sum(rotated_crop, axis=axis, dtype=np.float64)
        score = float(np.var(profile))
        scores.append((round(angle, 2), round(score, 1)))

        # On an exact tie prefer the angle nearest the sweep centre, so a flat
        # score landscape (e.g. a blank page) does not drift to the first,
        # most negative, candidate.
        nearer_seed = abs(angle - seed_angle) < abs(best_angle - seed_angle)
        if score > best_score or (score == best_score and nearer_seed):
            best_score = score
            best_angle = angle

    return best_angle, best_score, scores


def deskew_image_iterative(
    img: np.ndarray,
    coarse_range: float = 2.0,
    coarse_step: float = 0.2,
    fine_range: float = 0.5,
    fine_step: float = 0.1,
) -> Tuple[np.ndarray, float, Dict[str, Any]]:
    """Iterative deskew using projection-profile variance optimisation.

    Two-phase search on a binarised central crop of the image:
      Phase 1 (coarse): maximise horizontal projection variance (row alignment)
      Phase 2 (fine):   maximise vertical projection variance (column alignment)

    Exact score ties are resolved towards the centre of each sweep (0° for the
    coarse phase, the coarse winner for the fine phase).

    Args:
        img: BGR image (full resolution).
        coarse_range: half-range in degrees for the coarse sweep (>= 0).
        coarse_step: step size in degrees for the coarse sweep (> 0).
        fine_range: half-range around the coarse winner for the fine sweep (>= 0).
        fine_step: step size in degrees for the fine sweep (> 0).

    Returns:
        (rotated_bgr, angle_degrees, debug_dict). When the final angle is
        below 0.05° in magnitude, or the image is too small to crop, the
        input image object itself is returned with an angle of 0.0.

    Raises:
        ValueError: if a step is not positive or a range is negative.
    """
    # Validate up front: a zero step makes np.arange fail with an opaque
    # ZeroDivisionError and a negative step/range silently yields no candidates.
    if coarse_step <= 0 or fine_step <= 0:
        raise ValueError("coarse_step and fine_step must be positive")
    if coarse_range < 0 or fine_range < 0:
        raise ValueError("coarse_range and fine_range must be non-negative")

    h, w = img.shape[:2]
    debug: Dict[str, Any] = {}

    # --- Binarise once (grayscale + Otsu) ---
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)

    # --- Central crop (20%-80% height) for fast rotation ---
    y_lo = int(h * 0.2)
    y_hi = int(h * 0.8)
    crop = binary[y_lo:y_hi, :]
    if crop.size == 0:
        # Image too small for a central crop; nothing to measure.
        return img, 0.0, debug

    # --- Phase 1: coarse sweep (horizontal projection → row alignment) ---
    # The half-step padding on the upper bound keeps the end point in the sweep.
    coarse_angles = np.arange(-coarse_range, coarse_range + coarse_step * 0.5, coarse_step)
    best_coarse_angle, best_coarse_score, coarse_scores = _sweep_projection_variance(
        crop, coarse_angles, axis=1, seed_angle=0.0
    )

    debug["coarse_best_angle"] = round(best_coarse_angle, 2)
    debug["coarse_best_score"] = round(best_coarse_score, 1)
    debug["coarse_scores"] = coarse_scores

    # --- Phase 2: fine sweep (vertical projection → column alignment) ---
    fine_lo = best_coarse_angle - fine_range
    fine_hi = best_coarse_angle + fine_range
    fine_angles = np.arange(fine_lo, fine_hi + fine_step * 0.5, fine_step)
    best_fine_angle, best_fine_score, fine_scores = _sweep_projection_variance(
        crop, fine_angles, axis=0, seed_angle=best_coarse_angle
    )

    debug["fine_best_angle"] = round(best_fine_angle, 2)
    debug["fine_best_score"] = round(best_fine_score, 1)
    debug["fine_scores"] = fine_scores

    # Clamp to ±5°
    final_angle = max(-5.0, min(5.0, best_fine_angle))

    logger.info(f"deskew_iterative: coarse={best_coarse_angle:.2f}° fine={best_fine_angle:.2f}° -> {final_angle:.2f}°")

    # Rotations this small are treated as "already straight".
    if abs(final_angle) < 0.05:
        return img, 0.0, debug

    # --- Rotate full-res image ---
    center = (w // 2, h // 2)
    M = cv2.getRotationMatrix2D(center, final_angle, 1.0)
    rotated = cv2.warpAffine(img, M, (w, h),
                             flags=cv2.INTER_LINEAR,
                             borderMode=cv2.BORDER_REPLICATE)

    return rotated, final_angle, debug
Exception as e: + logger.warning(f"Iterative deskew failed: {e}") + deskewed_iter = img_bgr + duration = time.time() - t0 - # Pick best method - if abs(angle_wa) >= abs(angle_hough) or abs(angle_hough) < 0.1: + # Pick best method — prefer iterative when it found a non-zero angle + if abs(angle_iterative) >= 0.05: + method_used = "iterative" + angle_applied = angle_iterative + deskewed_bgr = deskewed_iter + elif abs(angle_wa) >= abs(angle_hough) or abs(angle_hough) < 0.1: method_used = "word_alignment" angle_applied = angle_wa wa_array = np.frombuffer(deskewed_wa_bytes, dtype=np.uint8) @@ -520,6 +534,7 @@ async def auto_deskew(session_id: str): deskew_result = { "angle_hough": round(angle_hough, 3), "angle_word_alignment": round(angle_wa, 3), + "angle_iterative": round(angle_iterative, 3), "angle_applied": round(angle_applied, 3), "method_used": method_used, "confidence": round(confidence, 2), @@ -542,7 +557,8 @@ async def auto_deskew(session_id: str): await update_session_db(session_id, **db_update) logger.info(f"OCR Pipeline: deskew session {session_id}: " - f"hough={angle_hough:.2f} wa={angle_wa:.2f} -> {method_used} {angle_applied:.2f}") + f"hough={angle_hough:.2f} wa={angle_wa:.2f} iter={angle_iterative:.2f} " + f"-> {method_used} {angle_applied:.2f}") await _append_pipeline_log(session_id, "deskew", { "angle_applied": round(angle_applied, 3),