feat: iterative projection-profile deskew (2-phase variance optimization)
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 25s
CI / test-go-edu-search (push) Successful in 27s
CI / test-python-klausur (push) Failing after 1m53s
CI / test-python-agent-core (push) Successful in 18s
CI / test-nodejs-website (push) Successful in 17s
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 25s
CI / test-go-edu-search (push) Successful in 27s
CI / test-python-klausur (push) Failing after 1m53s
CI / test-python-agent-core (push) Successful in 18s
CI / test-nodejs-website (push) Successful in 17s
Adds deskew_image_iterative() as a third deskew method that directly optimizes projection-profile sharpness instead of relying on proxy signals (Hough lines / word alignment). The coarse sweep scores the horizontal profile; the fine sweep scores the vertical profile.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -401,6 +401,117 @@ def deskew_image_by_word_alignment(
|
||||
return png_buf.tobytes(), angle_deg
|
||||
|
||||
|
||||
def _sweep_projection_variance(
    binary_crop: np.ndarray,
    angles: np.ndarray,
    axis: int,
    fallback_angle: float,
) -> Tuple[float, float, list]:
    """Rotate *binary_crop* by each angle and score the projection variance.

    Args:
        binary_crop: single-channel binarised image to rotate.
        angles: candidate rotation angles in degrees.
        axis: axis handed to ``np.sum`` (1 = one sum per row,
            0 = one sum per column).
        fallback_angle: angle reported when no candidate scores above the
            initial sentinel (e.g. *angles* is empty).

    Returns:
        (best_angle, best_score, scores) where *scores* is a list of
        ``(angle rounded to 2 dp, variance rounded to 1 dp)`` tuples in
        sweep order.
    """
    crop_h, crop_w = binary_crop.shape[:2]
    pivot = (crop_w // 2, crop_h // 2)

    best_angle = fallback_angle
    best_score = -1.0  # variance is never negative, so any real score wins
    scores = []

    for candidate in angles:
        angle = float(candidate)
        if abs(angle) < 1e-6:
            # np.arange leaves float noise around zero; skip the no-op warp.
            rotated = binary_crop
        else:
            matrix = cv2.getRotationMatrix2D(pivot, angle, 1.0)
            rotated = cv2.warpAffine(binary_crop, matrix, (crop_w, crop_h),
                                     flags=cv2.INTER_NEAREST,
                                     borderMode=cv2.BORDER_CONSTANT,
                                     borderValue=0)
        profile = np.sum(rotated, axis=axis, dtype=np.float64)
        score = float(np.var(profile))
        scores.append((round(angle, 2), round(score, 1)))
        # Strict '>' keeps the first angle among equal scores.
        if score > best_score:
            best_score = score
            best_angle = angle

    return best_angle, best_score, scores


def deskew_image_iterative(
    img: np.ndarray,
    coarse_range: float = 2.0,
    coarse_step: float = 0.2,
    fine_range: float = 0.5,
    fine_step: float = 0.1,
) -> Tuple[np.ndarray, float, Dict[str, Any]]:
    """Iterative deskew using projection-profile variance optimisation.

    Two-phase search:
        Phase 1 (coarse): maximise horizontal projection variance (row alignment)
        Phase 2 (fine): maximise vertical projection variance (column alignment)

    Both phases run on a binarised central crop (20%-80% of the height);
    only the winning angle is applied to the full-resolution image.

    Args:
        img: BGR image (full resolution). Single-channel (2-D) and
            4-channel BGRA arrays are also accepted.
        coarse_range: half-range in degrees for the coarse sweep.
        coarse_step: step size in degrees for the coarse sweep (must be > 0).
        fine_range: half-range around the coarse winner for the fine sweep.
        fine_step: step size in degrees for the fine sweep (must be > 0).

    Returns:
        (rotated_image, angle_degrees, debug_dict). When the chosen angle is
        below 0.05 degrees in magnitude, the input array itself is returned
        with an angle of 0.0.

    Raises:
        ValueError: if ``coarse_step`` or ``fine_step`` is not positive.
    """
    if coarse_step <= 0 or fine_step <= 0:
        # A zero step makes np.arange fail and a negative one yields an empty
        # sweep that would silently report "no skew".
        raise ValueError("coarse_step and fine_step must be positive")

    h, w = img.shape[:2]
    debug: Dict[str, Any] = {}

    # --- Binarise once (grayscale + Otsu) ---
    if img.ndim == 2:
        gray = img
    elif img.shape[2] == 4:
        gray = cv2.cvtColor(img, cv2.COLOR_BGRA2GRAY)
    else:
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)

    # --- Central crop (20%-80% height) for fast rotation ---
    y_lo = int(h * 0.2)
    y_hi = int(h * 0.8)
    crop = binary[y_lo:y_hi, :]
    if crop.size == 0:
        # Image too small to leave any rows in the crop: nothing to measure.
        return img, 0.0, debug

    # --- Phase 1: coarse sweep (horizontal projection -> row alignment) ---
    # The extra half step keeps the upper bound inside the half-open arange.
    coarse_angles = np.arange(-coarse_range, coarse_range + coarse_step * 0.5, coarse_step)
    best_coarse_angle, best_coarse_score, coarse_scores = _sweep_projection_variance(
        crop, coarse_angles, axis=1, fallback_angle=0.0)

    debug["coarse_best_angle"] = round(best_coarse_angle, 2)
    debug["coarse_best_score"] = round(best_coarse_score, 1)
    debug["coarse_scores"] = coarse_scores

    # --- Phase 2: fine sweep (vertical projection -> column alignment) ---
    fine_lo = best_coarse_angle - fine_range
    fine_hi = best_coarse_angle + fine_range
    fine_angles = np.arange(fine_lo, fine_hi + fine_step * 0.5, fine_step)
    best_fine_angle, best_fine_score, fine_scores = _sweep_projection_variance(
        crop, fine_angles, axis=0, fallback_angle=best_coarse_angle)

    debug["fine_best_angle"] = round(best_fine_angle, 2)
    debug["fine_best_score"] = round(best_fine_score, 1)
    debug["fine_scores"] = fine_scores

    # Clamp to ±5° (only reachable when callers widen the default ranges)
    final_angle = max(-5.0, min(5.0, best_fine_angle))

    logger.info(f"deskew_iterative: coarse={best_coarse_angle:.2f}° fine={best_fine_angle:.2f}° -> {final_angle:.2f}°")

    if abs(final_angle) < 0.05:
        return img, 0.0, debug

    # --- Rotate full-res image ---
    center = (w // 2, h // 2)
    M = cv2.getRotationMatrix2D(center, final_angle, 1.0)
    rotated = cv2.warpAffine(img, M, (w, h),
                             flags=cv2.INTER_LINEAR,
                             borderMode=cv2.BORDER_REPLICATE)

    return rotated, final_angle, debug
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Stage 3: Dewarp (Book Curvature Correction)
|
||||
# =============================================================================
|
||||
|
||||
@@ -52,6 +52,7 @@ from cv_vocab_pipeline import (
|
||||
create_ocr_image,
|
||||
deskew_image,
|
||||
deskew_image_by_word_alignment,
|
||||
deskew_image_iterative,
|
||||
detect_column_geometry,
|
||||
detect_document_type,
|
||||
detect_row_geometry,
|
||||
@@ -485,10 +486,23 @@ async def auto_deskew(session_id: str):
|
||||
logger.warning(f"Word alignment deskew failed: {e}")
|
||||
deskewed_wa_bytes, angle_wa = orig_bytes, 0.0
|
||||
|
||||
# Method 3: Iterative Projection-Profile
|
||||
angle_iterative = 0.0
|
||||
iterative_debug = {}
|
||||
try:
|
||||
deskewed_iter, angle_iterative, iterative_debug = deskew_image_iterative(img_bgr.copy())
|
||||
except Exception as e:
|
||||
logger.warning(f"Iterative deskew failed: {e}")
|
||||
deskewed_iter = img_bgr
|
||||
|
||||
duration = time.time() - t0
|
||||
|
||||
# Pick best method
|
||||
if abs(angle_wa) >= abs(angle_hough) or abs(angle_hough) < 0.1:
|
||||
# Pick best method — prefer iterative when it found a non-zero angle
|
||||
if abs(angle_iterative) >= 0.05:
|
||||
method_used = "iterative"
|
||||
angle_applied = angle_iterative
|
||||
deskewed_bgr = deskewed_iter
|
||||
elif abs(angle_wa) >= abs(angle_hough) or abs(angle_hough) < 0.1:
|
||||
method_used = "word_alignment"
|
||||
angle_applied = angle_wa
|
||||
wa_array = np.frombuffer(deskewed_wa_bytes, dtype=np.uint8)
|
||||
@@ -520,6 +534,7 @@ async def auto_deskew(session_id: str):
|
||||
deskew_result = {
|
||||
"angle_hough": round(angle_hough, 3),
|
||||
"angle_word_alignment": round(angle_wa, 3),
|
||||
"angle_iterative": round(angle_iterative, 3),
|
||||
"angle_applied": round(angle_applied, 3),
|
||||
"method_used": method_used,
|
||||
"confidence": round(confidence, 2),
|
||||
@@ -542,7 +557,8 @@ async def auto_deskew(session_id: str):
|
||||
await update_session_db(session_id, **db_update)
|
||||
|
||||
logger.info(f"OCR Pipeline: deskew session {session_id}: "
|
||||
f"hough={angle_hough:.2f} wa={angle_wa:.2f} -> {method_used} {angle_applied:.2f}")
|
||||
f"hough={angle_hough:.2f} wa={angle_wa:.2f} iter={angle_iterative:.2f} "
|
||||
f"-> {method_used} {angle_applied:.2f}")
|
||||
|
||||
await _append_pipeline_log(session_id, "deskew", {
|
||||
"angle_applied": round(angle_applied, 3),
|
||||
|
||||
Reference in New Issue
Block a user