feat: add pass 3 text-line regression to deskew pipeline
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 24s
CI / test-go-edu-search (push) Successful in 26s
CI / test-python-klausur (push) Failing after 1m53s
CI / test-python-agent-core (push) Successful in 15s
CI / test-nodejs-website (push) Successful in 15s
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 24s
CI / test-go-edu-search (push) Successful in 26s
CI / test-python-klausur (push) Failing after 1m53s
CI / test-python-agent-core (push) Successful in 15s
CI / test-nodejs-website (push) Successful in 15s
After iterative projection (pass 1) and word alignment (pass 2), a third pass fits a linear regression to the Tesseract word positions of each text line to measure and correct the residual rotation. This catches cases where passes 1–2 leave a significant slope (e.g. a 1.7° residual on heavily skewed scans).
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -528,6 +528,67 @@ def deskew_image_iterative(
|
|||||||
return rotated, final_angle, debug
|
return rotated, final_angle, debug
|
||||||
|
|
||||||
|
|
||||||
|
def _measure_textline_slope(img: np.ndarray) -> float:
    """Measure residual text-line slope via Tesseract word-position regression.

    Words returned by Tesseract are grouped by (block, paragraph, line). For
    every sufficiently long line a least-squares fit ``y = slope * x + b`` is
    computed through the word centres, the slope is converted to an angle,
    and the trimmed mean of those angles is returned.

    Sign convention: the fit is done in image coordinates, where y grows
    downwards. A positive result therefore means the word centres move
    *down* the image towards the right; a negative result means they move up.

    Args:
        img: Page image, either multi-channel (converted with
            ``cv2.COLOR_BGR2GRAY``) or already single-channel grayscale.

    Returns:
        Trimmed-mean text-line angle in degrees. ``0.0`` is returned when
        Tesseract or OpenCV is unavailable, or when fewer than three text
        lines pass the filters below.
    """
    import math as _math

    if not TESSERACT_AVAILABLE or not CV2_AVAILABLE:
        return 0.0

    min_conf = 30          # ignore words Tesseract is unsure about
    min_words = 3          # a line needs at least this many words to be fitted
    min_span_frac = 0.15   # ...and must span this fraction of the image width

    w = img.shape[1]

    # BGR2GRAY rejects single-channel input, so pass grayscale images through.
    if img.ndim == 2:
        gray = img
    elif img.shape[2] == 1:
        gray = img[:, :, 0]
    else:
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    data = pytesseract.image_to_data(
        Image.fromarray(gray),
        output_type=pytesseract.Output.DICT,
        config="--psm 6",
    )

    # Group word centres by text line.
    columns = (
        "text", "conf", "block_num", "par_num", "line_num",
        "left", "top", "width", "height",
    )
    lines: Dict[tuple, list] = {}
    for text, conf, block, par, line, left, top, width, height in zip(
        *(data[col] for col in columns)
    ):
        if len((text or "").strip()) < 2:
            continue
        # Depending on the Tesseract/pytesseract versions the confidence may
        # be an int, a float, or a string such as "96.06"; int() would raise
        # on the latter, so parse as float and skip anything unparsable.
        try:
            conf_value = float(conf)
        except (TypeError, ValueError):
            continue
        if conf_value < min_conf:
            continue
        centre = (left + width / 2.0, top + height / 2.0)
        lines.setdefault((block, par, line), []).append(centre)

    # Per-line linear regression -> slope angle in degrees.
    slopes: list = []
    for pts in lines.values():
        if len(pts) < min_words:
            continue
        xs = np.array([p[0] for p in pts], dtype=np.float64)
        ys = np.array([p[1] for p in pts], dtype=np.float64)
        if xs.max() - xs.min() < w * min_span_frac:
            continue  # short lines give unstable slope estimates
        design = np.vstack([xs, np.ones_like(xs)]).T
        slope = np.linalg.lstsq(design, ys, rcond=None)[0][0]
        slopes.append(_math.degrees(_math.atan(slope)))

    if len(slopes) < 3:
        return 0.0

    # Trimmed mean: drop 10% (at least one value) at each end. With three or
    # more slopes this always leaves at least one value.
    slopes.sort()
    trim = max(1, len(slopes) // 10)
    trimmed = slopes[trim:-trim]

    return sum(trimmed) / len(trimmed)
|
||||||
|
|
||||||
|
|
||||||
def deskew_two_pass(
|
def deskew_two_pass(
|
||||||
img: np.ndarray,
|
img: np.ndarray,
|
||||||
coarse_range: float = 5.0,
|
coarse_range: float = 5.0,
|
||||||
@@ -578,12 +639,46 @@ def deskew_two_pass(
|
|||||||
logger.warning(f"deskew_two_pass: pass2 word-alignment failed: {e}")
|
logger.warning(f"deskew_two_pass: pass2 word-alignment failed: {e}")
|
||||||
angle2 = 0.0
|
angle2 = 0.0
|
||||||
|
|
||||||
total_angle = angle1 + angle2
|
# --- Pass 3: Tesseract text-line regression residual check ---
|
||||||
|
# The most reliable final check: measure actual text-line slopes
|
||||||
|
# using Tesseract word positions and linear regression per line.
|
||||||
|
angle3 = 0.0
|
||||||
|
try:
|
||||||
|
residual = _measure_textline_slope(corrected)
|
||||||
|
debug["pass3_raw"] = round(residual, 3)
|
||||||
|
if abs(residual) >= 0.3:
|
||||||
|
h3, w3 = corrected.shape[:2]
|
||||||
|
center3 = (w3 // 2, h3 // 2)
|
||||||
|
M3 = cv2.getRotationMatrix2D(center3, residual, 1.0)
|
||||||
|
corrected = cv2.warpAffine(
|
||||||
|
corrected, M3, (w3, h3),
|
||||||
|
flags=cv2.INTER_LINEAR,
|
||||||
|
borderMode=cv2.BORDER_REPLICATE,
|
||||||
|
)
|
||||||
|
angle3 = residual
|
||||||
|
logger.info(
|
||||||
|
"deskew_two_pass: pass3 text-line residual=%.2f° applied",
|
||||||
|
residual,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.info(
|
||||||
|
"deskew_two_pass: pass3 text-line residual=%.2f° < 0.3° — skipped",
|
||||||
|
residual,
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("deskew_two_pass: pass3 text-line check failed: %s", e)
|
||||||
|
|
||||||
|
total_angle = angle1 + angle2 + angle3
|
||||||
debug["pass2_angle"] = round(angle2, 3)
|
debug["pass2_angle"] = round(angle2, 3)
|
||||||
debug["pass2_method"] = "word_alignment"
|
debug["pass2_method"] = "word_alignment"
|
||||||
|
debug["pass3_angle"] = round(angle3, 3)
|
||||||
|
debug["pass3_method"] = "textline_regression"
|
||||||
debug["total_angle"] = round(total_angle, 3)
|
debug["total_angle"] = round(total_angle, 3)
|
||||||
|
|
||||||
logger.info(f"deskew_two_pass: pass1={angle1:.2f}° + pass2={angle2:.2f}° = {total_angle:.2f}°")
|
logger.info(
|
||||||
|
"deskew_two_pass: pass1=%.2f° + pass2=%.2f° + pass3=%.2f° = %.2f°",
|
||||||
|
angle1, angle2, angle3, total_angle,
|
||||||
|
)
|
||||||
|
|
||||||
return corrected, total_angle, debug
|
return corrected, total_angle, debug
|
||||||
|
|
||||||
|
|||||||
@@ -488,12 +488,13 @@ async def auto_deskew(session_id: str):
|
|||||||
|
|
||||||
angle_iterative = two_pass_debug.get("pass1_angle", 0.0)
|
angle_iterative = two_pass_debug.get("pass1_angle", 0.0)
|
||||||
angle_residual = two_pass_debug.get("pass2_angle", 0.0)
|
angle_residual = two_pass_debug.get("pass2_angle", 0.0)
|
||||||
|
angle_textline = two_pass_debug.get("pass3_angle", 0.0)
|
||||||
|
|
||||||
duration = time.time() - t0
|
duration = time.time() - t0
|
||||||
|
|
||||||
method_used = "two_pass"
|
method_used = "three_pass" if abs(angle_textline) >= 0.01 else (
|
||||||
if abs(angle_residual) < 0.3:
|
"two_pass" if abs(angle_residual) >= 0.01 else "iterative"
|
||||||
method_used = "iterative" # pass2 didn't contribute
|
)
|
||||||
|
|
||||||
# Encode as PNG
|
# Encode as PNG
|
||||||
success, deskewed_png_buf = cv2.imencode(".png", deskewed_bgr)
|
success, deskewed_png_buf = cv2.imencode(".png", deskewed_bgr)
|
||||||
@@ -515,6 +516,7 @@ async def auto_deskew(session_id: str):
|
|||||||
"angle_word_alignment": round(angle_wa, 3),
|
"angle_word_alignment": round(angle_wa, 3),
|
||||||
"angle_iterative": round(angle_iterative, 3),
|
"angle_iterative": round(angle_iterative, 3),
|
||||||
"angle_residual": round(angle_residual, 3),
|
"angle_residual": round(angle_residual, 3),
|
||||||
|
"angle_textline": round(angle_textline, 3),
|
||||||
"angle_applied": round(angle_applied, 3),
|
"angle_applied": round(angle_applied, 3),
|
||||||
"method_used": method_used,
|
"method_used": method_used,
|
||||||
"confidence": round(confidence, 2),
|
"confidence": round(confidence, 2),
|
||||||
@@ -540,12 +542,14 @@ async def auto_deskew(session_id: str):
|
|||||||
logger.info(f"OCR Pipeline: deskew session {session_id}: "
|
logger.info(f"OCR Pipeline: deskew session {session_id}: "
|
||||||
f"hough={angle_hough:.2f} wa={angle_wa:.2f} "
|
f"hough={angle_hough:.2f} wa={angle_wa:.2f} "
|
||||||
f"iter={angle_iterative:.2f} residual={angle_residual:.2f} "
|
f"iter={angle_iterative:.2f} residual={angle_residual:.2f} "
|
||||||
|
f"textline={angle_textline:.2f} "
|
||||||
f"-> {method_used} total={angle_applied:.2f}")
|
f"-> {method_used} total={angle_applied:.2f}")
|
||||||
|
|
||||||
await _append_pipeline_log(session_id, "deskew", {
|
await _append_pipeline_log(session_id, "deskew", {
|
||||||
"angle_applied": round(angle_applied, 3),
|
"angle_applied": round(angle_applied, 3),
|
||||||
"angle_iterative": round(angle_iterative, 3),
|
"angle_iterative": round(angle_iterative, 3),
|
||||||
"angle_residual": round(angle_residual, 3),
|
"angle_residual": round(angle_residual, 3),
|
||||||
|
"angle_textline": round(angle_textline, 3),
|
||||||
"confidence": round(confidence, 2),
|
"confidence": round(confidence, 2),
|
||||||
"method": method_used,
|
"method": method_used,
|
||||||
}, duration_ms=int(duration * 1000))
|
}, duration_ms=int(duration * 1000))
|
||||||
|
|||||||
@@ -1371,14 +1371,16 @@ async def _run_ocr_pipeline_for_page(
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Could not create pipeline session in DB: {e}")
|
logger.warning(f"Could not create pipeline session in DB: {e}")
|
||||||
|
|
||||||
# 3. Two-pass deskew: iterative (±5°) + word-alignment residual
|
# 3. Three-pass deskew: iterative + word-alignment + text-line regression
|
||||||
t0 = _time.time()
|
t0 = _time.time()
|
||||||
deskewed_bgr, angle_applied, deskew_debug = deskew_two_pass(img_bgr.copy())
|
deskewed_bgr, angle_applied, deskew_debug = deskew_two_pass(img_bgr.copy())
|
||||||
angle_pass1 = deskew_debug.get("pass1_angle", 0.0)
|
angle_pass1 = deskew_debug.get("pass1_angle", 0.0)
|
||||||
angle_pass2 = deskew_debug.get("pass2_angle", 0.0)
|
angle_pass2 = deskew_debug.get("pass2_angle", 0.0)
|
||||||
|
angle_pass3 = deskew_debug.get("pass3_angle", 0.0)
|
||||||
|
|
||||||
logger.info(f" deskew: pass1={angle_pass1:.2f} pass2={angle_pass2:.2f} "
|
logger.info(f" deskew: p1={angle_pass1:.2f} p2={angle_pass2:.2f} "
|
||||||
f"total={angle_applied:.2f} ({_time.time() - t0:.1f}s)")
|
f"p3={angle_pass3:.2f} total={angle_applied:.2f} "
|
||||||
|
f"({_time.time() - t0:.1f}s)")
|
||||||
|
|
||||||
# 4. Dewarp
|
# 4. Dewarp
|
||||||
t0 = _time.time()
|
t0 = _time.time()
|
||||||
|
|||||||
Reference in New Issue
Block a user