feat: Dewarp-Korrektur als Schritt 2 in OCR Pipeline (7 Schritte)

Implementiert Buchwoelbungs-Entzerrung mit zwei Methoden:
- Methode A: Vertikale-Kanten-Analyse (Sobel + Polynom 2. Grades)
- Methode B: Textzeilen-Baseline (Tesseract + Baseline-Kruemmung)
Beste Methode wird automatisch gewaehlt, manueller Slider (-3 bis +3).

Backend: 3 neue Endpoints (auto/manual dewarp, ground truth)
Frontend: StepDewarp + DewarpControls, Pipeline von 6 auf 7 Schritte

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-02-26 16:46:41 +01:00
parent d552fd8b6b
commit 589d2f811a
13 changed files with 858 additions and 28 deletions

View File

@@ -315,22 +315,356 @@ def deskew_image_by_word_alignment(
# =============================================================================
# Stage 3: Dewarp (Book Curvature Correction)
# =============================================================================
def _dewarp_by_vertical_edges(img: np.ndarray) -> Dict[str, Any]:
    """Method A: Detect curvature from strongest vertical text edges.

    Splits image into horizontal strips, finds the dominant vertical edge
    X-position per strip, fits a 2nd-degree polynomial, and generates a
    displacement map if curvature exceeds threshold.

    Args:
        img: BGR image (already deskewed).

    Returns:
        Dict with keys: method, curvature_px, confidence, displacement_map (or None).
    """
    h, w = img.shape[:2]
    result = {"method": "vertical_edge", "curvature_px": 0.0, "confidence": 0.0, "displacement_map": None}
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Vertical Sobel to find vertical edges
    sobel_x = cv2.Sobel(gray, cv2.CV_64F, 1, 0, ksize=3)
    # BUGFIX: np.abs(...).astype(np.uint8) wraps gradients > 255 (modulo-256),
    # corrupting exactly the strongest edges we are looking for.
    # convertScaleAbs saturates at 255 instead.
    abs_sobel = cv2.convertScaleAbs(sobel_x)
    # Binarize with Otsu
    _, binary = cv2.threshold(abs_sobel, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    num_strips = 20
    strip_h = h // num_strips
    edge_positions = []  # (y_center, x_position)
    for i in range(num_strips):
        y_start = i * strip_h
        y_end = min((i + 1) * strip_h, h)
        strip = binary[y_start:y_end, :]
        # Project vertically (sum along y-axis)
        projection = np.sum(strip, axis=0).astype(np.float64)
        if projection.max() == 0:
            continue
        # Find the strongest vertical edge in left 40% of image (left margin area)
        search_w = int(w * 0.4)
        left_proj = projection[:search_w]
        if left_proj.max() == 0:
            continue
        # Smooth and find peak (GaussianBlur kernel width must be odd)
        kernel_size = max(3, w // 100)
        if kernel_size % 2 == 0:
            kernel_size += 1
        smoothed = cv2.GaussianBlur(left_proj.reshape(1, -1), (kernel_size, 1), 0).flatten()
        x_pos = float(np.argmax(smoothed))
        y_center = (y_start + y_end) / 2.0
        edge_positions.append((y_center, x_pos))
    if len(edge_positions) < 8:
        return result
    ys = np.array([p[0] for p in edge_positions])
    xs = np.array([p[1] for p in edge_positions])
    # Remove outliers (> 2 std from median)
    median_x = np.median(xs)
    std_x = max(np.std(xs), 1.0)
    mask = np.abs(xs - median_x) < 2 * std_x
    ys = ys[mask]
    xs = xs[mask]
    if len(ys) < 6:
        return result
    # Fit 2nd degree polynomial: x = a*y^2 + b*y + c
    coeffs = np.polyfit(ys, xs, 2)
    fitted = np.polyval(coeffs, ys)
    residuals = xs - fitted
    rmse = float(np.sqrt(np.mean(residuals ** 2)))
    # Measure curvature: max deviation from straight line
    straight_coeffs = np.polyfit(ys, xs, 1)
    straight_fitted = np.polyval(straight_coeffs, ys)
    curvature_px = float(np.max(np.abs(fitted - straight_fitted)))
    if curvature_px < 2.0:
        # Too little curvature to be worth correcting; report low confidence.
        result["confidence"] = 0.3
        return result
    # Generate displacement map: each pixel shifts horizontally by -dx_per_row[y]
    y_coords = np.arange(h)
    all_fitted = np.polyval(coeffs, y_coords)
    all_straight = np.polyval(straight_coeffs, y_coords)
    dx_per_row = all_fitted - all_straight  # displacement per row
    # Broadcast one value per row across all columns (vectorized, no Python loop).
    displacement_map = np.repeat((-dx_per_row).astype(np.float32).reshape(-1, 1), w, axis=1)
    # Confidence grows with number of inlier strips, shrinks with fit RMSE.
    confidence = min(1.0, len(ys) / 15.0) * max(0.5, 1.0 - rmse / 5.0)
    result["curvature_px"] = round(curvature_px, 2)
    result["confidence"] = round(float(confidence), 2)
    result["displacement_map"] = displacement_map
    return result
def _dewarp_by_text_baseline(img: np.ndarray) -> Dict[str, Any]:
    """Method B: Detect curvature from Tesseract text baseline positions.

    Uses a quick Tesseract pass on a downscaled image, groups words into lines,
    measures baseline curvature per line, and aggregates into a displacement map.

    Args:
        img: BGR image (already deskewed).

    Returns:
        Dict with keys: method, curvature_px, confidence, displacement_map (or None).
    """
    h, w = img.shape[:2]
    result = {"method": "text_baseline", "curvature_px": 0.0, "confidence": 0.0, "displacement_map": None}
    if not TESSERACT_AVAILABLE:
        return result
    # Downscale for speed
    max_dim = 1500
    scale_factor = min(1.0, max_dim / max(h, w))
    if scale_factor < 1.0:
        small = cv2.resize(img, (int(w * scale_factor), int(h * scale_factor)), interpolation=cv2.INTER_AREA)
    else:
        small = img
        scale_factor = 1.0
    pil_img = Image.fromarray(cv2.cvtColor(small, cv2.COLOR_BGR2RGB))
    try:
        data = pytesseract.image_to_data(
            pil_img, lang="eng+deu", config="--psm 6 --oem 3",
            output_type=pytesseract.Output.DICT,
        )
    except Exception as e:
        logger.warning(f"dewarp text_baseline: Tesseract failed: {e}")
        return result
    # Group words by (block, paragraph, line)
    from collections import defaultdict
    line_groups: Dict[tuple, list] = defaultdict(list)
    for i in range(len(data["text"])):
        text = (data["text"][i] or "").strip()
        # BUGFIX: newer pytesseract versions report conf as float strings
        # (e.g. "96.33"); int(...) on those raises ValueError. Parse via float
        # and skip entries with an unparseable confidence.
        try:
            conf = int(float(data["conf"][i]))
        except (TypeError, ValueError):
            continue
        if not text or conf < 20:
            continue
        key = (data["block_num"][i], data["par_num"][i], data["line_num"][i])
        line_groups[key].append(i)
    if len(line_groups) < 5:
        return result
    inv_scale = 1.0 / scale_factor
    # For each line with enough words, measure baseline curvature
    line_curvatures = []  # (y_center, curvature_px, coeffs, xs_line, ys_line)
    for key, indices in line_groups.items():
        if len(indices) < 3:
            continue
        # Collect baseline points: (x_center, y_bottom) per word, rescaled
        # back to full-resolution coordinates.
        points = []
        for idx in indices:
            x_center = (data["left"][idx] + data["width"][idx] / 2.0) * inv_scale
            y_bottom = (data["top"][idx] + data["height"][idx]) * inv_scale
            points.append((x_center, y_bottom))
        points.sort(key=lambda p: p[0])
        xs_line = np.array([p[0] for p in points])
        ys_line = np.array([p[1] for p in points])
        if len(xs_line) < 3:
            continue
        # Fit 2nd degree: y = a*x^2 + b*x + c
        try:
            coeffs = np.polyfit(xs_line, ys_line, 2)
        except (np.linalg.LinAlgError, ValueError):
            continue
        fitted = np.polyval(coeffs, xs_line)
        straight = np.polyval(np.polyfit(xs_line, ys_line, 1), xs_line)
        curvature = float(np.max(np.abs(fitted - straight)))
        y_center = float(np.mean(ys_line))
        line_curvatures.append((y_center, curvature, coeffs, xs_line, ys_line))
    if len(line_curvatures) < 3:
        return result
    # Average curvature across lines
    avg_curvature = float(np.mean([c[1] for c in line_curvatures]))
    if avg_curvature < 1.5:
        # Too little curvature to correct; report low confidence.
        result["confidence"] = 0.3
        return result
    # Build displacement map from line baselines: each line contributes a
    # triangular-weighted band of offsets around its y-center.
    displacement_map = np.zeros((h, w), dtype=np.float32)
    for y_center, curvature, coeffs, xs_line, ys_line in line_curvatures:
        # The displacement is the difference between curved and straight baseline
        x_range = np.arange(w, dtype=np.float64)
        fitted_y = np.polyval(coeffs, x_range)
        straight_y = np.polyval(np.polyfit(xs_line, ys_line, 1), x_range)
        dy = fitted_y - straight_y
        # Convert vertical curvature to horizontal displacement estimate
        # (curvature bends text → horizontal shift proportional to curvature)
        # Use the vertical curvature as proxy for horizontal distortion
        y_int = int(y_center)
        spread = max(int(h / len(line_curvatures) / 2), 20)
        y_start = max(0, y_int - spread)
        y_end = min(h, y_int + spread)
        for y in range(y_start, y_end):
            weight = 1.0 - abs(y - y_int) / spread
            displacement_map[y, :] += (dy * weight).astype(np.float32)
    # Normalize: scale the map so its peak magnitude equals the average
    # curvature. Sign direction is taken from the averaged 2nd-degree
    # coefficient across lines.
    avg_a = float(np.mean([c[2][0] for c in line_curvatures]))
    if abs(avg_a) > 0:
        max_disp = np.max(np.abs(displacement_map))
        if max_disp > 0:
            displacement_map = displacement_map * (avg_curvature / max_disp)
    confidence = min(1.0, len(line_curvatures) / 10.0) * 0.8
    result["curvature_px"] = round(avg_curvature, 2)
    result["confidence"] = round(float(confidence), 2)
    result["displacement_map"] = displacement_map
    return result
def _apply_displacement_map(img: np.ndarray, displacement_map: np.ndarray,
                            scale: float = 1.0) -> np.ndarray:
    """Apply a horizontal displacement map to an image using cv2.remap().

    Args:
        img: BGR image.
        displacement_map: Float32 array (h, w) of horizontal pixel shifts.
        scale: Multiplier for the displacement (-3.0 to +3.0).

    Returns:
        Corrected image.
    """
    h, w = img.shape[:2]
    # Base coordinate grids (identity mapping)
    map_x = np.tile(np.arange(w, dtype=np.float32), (h, 1))
    map_y = np.tile(np.arange(h, dtype=np.float32).reshape(-1, 1), (1, w))
    # Apply scaled displacement to the X coordinates only
    map_x = map_x + displacement_map * scale
    # BORDER_REPLICATE avoids black seams at the image edges after remapping.
    corrected = cv2.remap(img, map_x, map_y,
                          interpolation=cv2.INTER_LINEAR,
                          borderMode=cv2.BORDER_REPLICATE)
    return corrected
def dewarp_image(img: np.ndarray) -> Tuple[np.ndarray, Dict[str, Any]]:
    """Correct book curvature distortion using the best of two methods.

    Method A: Vertical edge analysis — detects curvature of the strongest
    vertical text edge (left column margin).
    Method B: Text baseline analysis — uses Tesseract word positions to
    measure baseline curvature across text lines.

    The method with higher confidence wins (ties go to Method A).

    Args:
        img: BGR image (already deskewed).

    Returns:
        Tuple of (corrected_image, dewarp_info).
        dewarp_info keys: method, curvature_px, confidence, displacement_map.
    """
    no_correction = {
        "method": "none",
        "curvature_px": 0.0,
        "confidence": 0.0,
        "displacement_map": None,
    }
    if not CV2_AVAILABLE:
        return img, no_correction

    # Run both detection methods and time the combined pass.
    t0 = time.time()
    result_a = _dewarp_by_vertical_edges(img)
    result_b = _dewarp_by_text_baseline(img)
    duration = time.time() - t0
    logger.info(f"dewarp: vertical_edge conf={result_a['confidence']:.2f} "
                f"curv={result_a['curvature_px']:.1f}px | "
                f"text_baseline conf={result_b['confidence']:.2f} "
                f"curv={result_b['curvature_px']:.1f}px "
                f"({duration:.2f}s)")

    # Higher confidence wins; Method A is preferred on a tie.
    best = result_a if result_a["confidence"] >= result_b["confidence"] else result_b

    # Bail out when no usable map was produced or the curvature is negligible.
    if best["displacement_map"] is None or best["curvature_px"] < 2.0:
        return img, no_correction

    corrected = _apply_displacement_map(img, best["displacement_map"], scale=1.0)
    info = {key: best[key]
            for key in ("method", "curvature_px", "confidence", "displacement_map")}
    return corrected, info
def dewarp_image_manual(img: np.ndarray, displacement_map: np.ndarray,
                        scale: float) -> np.ndarray:
    """Apply dewarp with manual scale adjustment.

    Args:
        img: BGR image (deskewed, before dewarp).
        displacement_map: The displacement map from auto-dewarp.
        scale: Manual scale factor (-3.0 to +3.0).

    Returns:
        Corrected image.
    """
    # Clamp the requested scale to the supported range.
    clamped = min(3.0, max(-3.0, scale))
    # A (near-)zero scale is a no-op; skip the remap entirely.
    if abs(clamped) < 0.01:
        return img
    return _apply_displacement_map(img, displacement_map, scale=clamped)
# =============================================================================

View File

@@ -1,13 +1,14 @@
"""
OCR Pipeline API - Schrittweise Seitenrekonstruktion.
Zerlegt den OCR-Prozess in 7 einzelne Schritte:
2. Dewarping - Buchwoelbung entzerren
3. Spaltenerkennung - Unsichtbare Spalten finden
4. Worterkennung - OCR mit Bounding Boxes
5. Koordinatenzuweisung - Exakte Positionen
6. Seitenrekonstruktion - Seite nachbauen
7. Ground Truth Validierung - Gesamtpruefung
Lizenz: Apache 2.0
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
@@ -30,6 +31,8 @@ from cv_vocab_pipeline import (
create_ocr_image,
deskew_image,
deskew_image_by_word_alignment,
dewarp_image,
dewarp_image_manual,
render_image_high_res,
render_pdf_high_res,
)
@@ -77,6 +80,16 @@ class DeskewGroundTruthRequest(BaseModel):
notes: Optional[str] = None
class ManualDewarpRequest(BaseModel):
    """Request body for POST .../dewarp/manual.

    scale: manual displacement multiplier; clamped server-side to [-3.0, 3.0].
    """
    scale: float
class DewarpGroundTruthRequest(BaseModel):
    """Request body for POST .../ground-truth/dewarp.

    is_correct: whether the automatic dewarp result was judged correct.
    corrected_scale: the scale the reviewer considers correct, if any.
    notes: free-form reviewer remarks.
    """
    is_correct: bool
    corrected_scale: Optional[float] = None
    notes: Optional[str] = None
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@@ -116,6 +129,10 @@ async def create_session(file: UploadFile = File(...)):
"deskewed_png": None,
"binarized_png": None,
"deskew_result": None,
"dewarped_bgr": None,
"dewarped_png": None,
"dewarp_result": None,
"displacement_map": None,
"ground_truth": {},
"current_step": 1,
}
@@ -263,13 +280,15 @@ async def manual_deskew(session_id: str, req: ManualDeskewRequest):
@router.get("/sessions/{session_id}/image/{image_type}")
async def get_image(session_id: str, image_type: str):
"""Serve session images: original, deskewed, or binarized."""
"""Serve session images: original, deskewed, dewarped, or binarized."""
session = _get_session(session_id)
if image_type == "original":
data = session.get("original_png")
elif image_type == "deskewed":
data = session.get("deskewed_png")
elif image_type == "dewarped":
data = session.get("dewarped_png")
elif image_type == "binarized":
data = session.get("binarized_png")
else:
@@ -299,3 +318,106 @@ async def save_deskew_ground_truth(session_id: str, req: DeskewGroundTruthReques
f"correct={req.is_correct}, corrected_angle={req.corrected_angle}")
return {"session_id": session_id, "ground_truth": gt}
# ---------------------------------------------------------------------------
# Dewarp Endpoints
# ---------------------------------------------------------------------------
@router.post("/sessions/{session_id}/dewarp")
async def auto_dewarp(session_id: str):
    """Run both dewarp methods on the deskewed image and pick the best.

    Requires the deskew step to have completed; stores the dewarped image,
    its PNG encoding, the dewarp result summary, and the displacement map
    (for later manual re-scaling) in the session.
    """
    session = _get_session(session_id)
    deskewed_bgr = session.get("deskewed_bgr")
    if deskewed_bgr is None:
        raise HTTPException(status_code=400, detail="Deskew must be completed before dewarp")
    t0 = time.time()
    dewarped_bgr, dewarp_info = dewarp_image(deskewed_bgr)
    duration = time.time() - t0
    # Encode dewarped as PNG; fall back to the deskewed PNG if encoding fails.
    success, png_buf = cv2.imencode(".png", dewarped_bgr)
    # CONSISTENCY FIX: use .get() like manual_dewarp does, so a missing key
    # cannot raise KeyError in the fallback path.
    dewarped_png = png_buf.tobytes() if success else session.get("deskewed_png")
    session["dewarped_bgr"] = dewarped_bgr
    session["dewarped_png"] = dewarped_png
    session["dewarp_result"] = {
        "method_used": dewarp_info["method"],
        "curvature_px": dewarp_info["curvature_px"],
        "confidence": dewarp_info["confidence"],
        "duration_seconds": round(duration, 2),
    }
    # Keep the raw displacement map so the manual endpoint can re-scale it.
    session["displacement_map"] = dewarp_info.get("displacement_map")
    logger.info(f"OCR Pipeline: dewarp session {session_id}: "
                f"method={dewarp_info['method']} curvature={dewarp_info['curvature_px']:.1f}px "
                f"conf={dewarp_info['confidence']:.2f} ({duration:.2f}s)")
    return {
        "session_id": session_id,
        "method_used": dewarp_info["method"],
        "curvature_px": dewarp_info["curvature_px"],
        "confidence": dewarp_info["confidence"],
        "duration_seconds": round(duration, 2),
        "dewarped_image_url": f"/api/v1/ocr-pipeline/sessions/{session_id}/image/dewarped",
    }
@router.post("/sessions/{session_id}/dewarp/manual")
async def manual_dewarp(session_id: str, req: ManualDewarpRequest):
    """Apply dewarp with a manually scaled displacement map."""
    session = _get_session(session_id)
    base_img = session.get("deskewed_bgr")
    disp_map = session.get("displacement_map")
    if base_img is None:
        raise HTTPException(status_code=400, detail="Deskew must be completed before dewarp")
    # Clamp the requested scale to the supported range.
    scale = min(3.0, max(-3.0, req.scale))
    if disp_map is None or abs(scale) < 0.01:
        # No displacement map or zero scale — use deskewed as-is
        dewarped_bgr = base_img
    else:
        dewarped_bgr = dewarp_image_manual(base_img, disp_map, scale)
    ok, png_buf = cv2.imencode(".png", dewarped_bgr)
    session["dewarped_bgr"] = dewarped_bgr
    session["dewarped_png"] = png_buf.tobytes() if ok else session.get("deskewed_png")
    # Merge onto any previous auto-dewarp result, marking the manual override.
    merged = dict(session.get("dewarp_result") or {})
    merged["method_used"] = "manual"
    merged["scale_applied"] = round(scale, 2)
    session["dewarp_result"] = merged
    logger.info(f"OCR Pipeline: manual dewarp session {session_id}: scale={scale:.2f}")
    return {
        "session_id": session_id,
        "scale_applied": round(scale, 2),
        "method_used": "manual",
        "dewarped_image_url": f"/api/v1/ocr-pipeline/sessions/{session_id}/image/dewarped",
    }
@router.post("/sessions/{session_id}/ground-truth/dewarp")
async def save_dewarp_ground_truth(session_id: str, req: DewarpGroundTruthRequest):
    """Save ground truth feedback for the dewarp step."""
    session = _get_session(session_id)
    # Snapshot the reviewer feedback together with the dewarp result it judges.
    gt = dict(
        is_correct=req.is_correct,
        corrected_scale=req.corrected_scale,
        notes=req.notes,
        saved_at=datetime.utcnow().isoformat(),
        dewarp_result=session.get("dewarp_result"),
    )
    session["ground_truth"]["dewarp"] = gt
    logger.info(f"OCR Pipeline: ground truth dewarp session {session_id}: "
                f"correct={req.is_correct}, corrected_scale={req.corrected_scale}")
    return {"session_id": session_id, "ground_truth": gt}