feat: OCR pipeline v2.1 – narrow column OCR, dewarp automation, Fabric.js editor
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 24s
CI / test-go-edu-search (push) Successful in 27s
CI / test-python-klausur (push) Failing after 1m50s
CI / test-python-agent-core (push) Successful in 19s
CI / test-nodejs-website (push) Successful in 15s

Proposal B: Adaptive padding, crop upscaling, PSM selection, row-strip re-OCR
for narrow columns (<15% width) – expected accuracy boost 60-70% → 85-90%.

Proposal A: New text-line straightness detector (Method D), quality gate
(rejects counterproductive corrections), 2-pass projection refinement,
higher confidence thresholds – expected manual dewarp reduction to <10%.

Proposal C: Fabric.js canvas editor with drag/drop, inline editing, undo/redo,
opacity slider, zoom, PDF/DOCX export endpoints.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-03 22:44:14 +01:00
parent 970ec1f548
commit ab3ecc7c08
7 changed files with 1105 additions and 128 deletions

View File

@@ -511,27 +511,39 @@ def _detect_shear_by_projection(img: np.ndarray) -> Dict[str, Any]:
small = cv2.resize(binary, (w // 2, h // 2), interpolation=cv2.INTER_AREA)
sh, sw = small.shape
# Angle sweep: ±3° in 0.25° steps
angles = [a * 0.25 for a in range(-12, 13)] # 25 values
best_angle = 0.0
best_variance = -1.0
variances: List[Tuple[float, float]] = []
# 2-pass angle sweep for 10x better precision:
# Pass 1: Coarse sweep ±3° in 0.5° steps (13 values)
# Pass 2: Fine sweep ±0.5° around coarse best in 0.05° steps (21 values)
for angle_deg in angles:
if abs(angle_deg) < 0.01:
rotated = small
else:
shear_tan = math.tan(math.radians(angle_deg))
M = np.float32([[1, shear_tan, -sh / 2.0 * shear_tan], [0, 1, 0]])
rotated = cv2.warpAffine(small, M, (sw, sh),
flags=cv2.INTER_NEAREST,
borderMode=cv2.BORDER_CONSTANT)
profile = np.sum(rotated, axis=1).astype(float)
var = float(np.var(profile))
variances.append((angle_deg, var))
if var > best_variance:
best_variance = var
best_angle = angle_deg
def _sweep_variance(angles_list):
results = []
for angle_deg in angles_list:
if abs(angle_deg) < 0.001:
rotated = small
else:
shear_tan = math.tan(math.radians(angle_deg))
M = np.float32([[1, shear_tan, -sh / 2.0 * shear_tan], [0, 1, 0]])
rotated = cv2.warpAffine(small, M, (sw, sh),
flags=cv2.INTER_NEAREST,
borderMode=cv2.BORDER_CONSTANT)
profile = np.sum(rotated, axis=1).astype(float)
results.append((angle_deg, float(np.var(profile))))
return results
# Pass 1: coarse
coarse_angles = [a * 0.5 for a in range(-6, 7)] # 13 values
coarse_results = _sweep_variance(coarse_angles)
coarse_best = max(coarse_results, key=lambda x: x[1])
# Pass 2: fine around coarse best
fine_center = coarse_best[0]
fine_angles = [fine_center + a * 0.05 for a in range(-10, 11)] # 21 values
fine_results = _sweep_variance(fine_angles)
fine_best = max(fine_results, key=lambda x: x[1])
best_angle = fine_best[0]
best_variance = fine_best[1]
variances = coarse_results + fine_results
# Confidence: how much sharper is the best angle vs. the mean?
all_mean = sum(v for _, v in variances) / len(variances)
@@ -611,6 +623,133 @@ def _detect_shear_by_hough(img: np.ndarray) -> Dict[str, Any]:
return result
def _detect_shear_by_text_lines(img: np.ndarray) -> Dict[str, Any]:
"""Detect shear by measuring text-line straightness (Method D).
Runs a quick Tesseract scan (PSM 11, 50% downscale) to locate word
bounding boxes, groups them into horizontal lines by Y-proximity,
fits a linear regression to each line, and takes the median slope
as the shear angle.
This is the most robust method because it measures actual text content
rather than relying on edges, projections, or printed lines.
Returns:
Dict with keys: method, shear_degrees, confidence.
"""
import math
result = {"method": "text_lines", "shear_degrees": 0.0, "confidence": 0.0}
h, w = img.shape[:2]
# Downscale 50% for speed
scale = 0.5
small = cv2.resize(img, (int(w * scale), int(h * scale)),
interpolation=cv2.INTER_AREA)
gray = cv2.cvtColor(small, cv2.COLOR_BGR2GRAY)
pil_img = Image.fromarray(gray)
try:
data = pytesseract.image_to_data(
pil_img, lang='eng+deu', config='--psm 11 --oem 3',
output_type=pytesseract.Output.DICT,
)
except Exception:
return result
# Collect word centres
words = []
for i in range(len(data['text'])):
text = data['text'][i].strip()
conf = int(data['conf'][i])
if not text or conf < 20 or len(text) < 2:
continue
cx = data['left'][i] + data['width'][i] / 2.0
cy = data['top'][i] + data['height'][i] / 2.0
words.append((cx, cy, data['height'][i]))
if len(words) < 10:
return result
# Group words into lines by Y-proximity
avg_h = sum(wh for _, _, wh in words) / len(words)
y_tol = max(avg_h * 0.6, 8)
words_sorted = sorted(words, key=lambda w: w[1])
lines: List[List[Tuple[float, float]]] = []
current_line: List[Tuple[float, float]] = [(words_sorted[0][0], words_sorted[0][1])]
current_y = words_sorted[0][1]
for cx, cy, _ in words_sorted[1:]:
if abs(cy - current_y) <= y_tol:
current_line.append((cx, cy))
else:
if len(current_line) >= 3:
lines.append(current_line)
current_line = [(cx, cy)]
current_y = cy
if len(current_line) >= 3:
lines.append(current_line)
if len(lines) < 3:
return result
# Linear regression per line → slope (dy/dx)
slopes = []
for line in lines:
xs = np.array([p[0] for p in line])
ys = np.array([p[1] for p in line])
x_range = xs.max() - xs.min()
if x_range < 20:
continue
coeffs = np.polyfit(xs, ys, 1)
slopes.append(coeffs[0]) # dy/dx
if len(slopes) < 3:
return result
# Median slope → shear angle
# dy/dx of horizontal text lines = tan(shear_angle)
# Positive slope means text tilts down-right → vertical columns lean right
median_slope = float(np.median(slopes))
shear_degrees = math.degrees(math.atan(median_slope))
# Confidence from line count + slope consistency
slope_std = float(np.std(slopes))
consistency = max(0.0, 1.0 - slope_std * 20) # penalise high variance
count_factor = min(1.0, len(slopes) / 8.0)
confidence = count_factor * 0.6 + consistency * 0.4
result["shear_degrees"] = round(shear_degrees, 3)
result["confidence"] = round(max(0.0, min(1.0, confidence)), 2)
return result
def _dewarp_quality_check(original: np.ndarray, corrected: np.ndarray) -> bool:
"""Check whether the dewarp correction actually improved alignment.
Compares horizontal projection variance before and after correction.
Higher variance means sharper text-line peaks, which indicates better
horizontal alignment.
Returns True if the correction improved the image, False if it should
be discarded.
"""
def _h_proj_variance(img: np.ndarray) -> float:
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
_, binary = cv2.threshold(gray, 0, 255,
cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
small = cv2.resize(binary, (binary.shape[1] // 2, binary.shape[0] // 2),
interpolation=cv2.INTER_AREA)
profile = np.sum(small, axis=1).astype(float)
return float(np.var(profile))
var_before = _h_proj_variance(original)
var_after = _h_proj_variance(corrected)
# Correction must improve variance (even by a tiny margin)
return var_after > var_before
def _apply_shear(img: np.ndarray, shear_degrees: float) -> np.ndarray:
"""Apply a vertical shear correction to an image.
@@ -644,24 +783,36 @@ def _apply_shear(img: np.ndarray, shear_degrees: float) -> np.ndarray:
def _ensemble_shear(detections: List[Dict[str, Any]]) -> Tuple[float, float, str]:
"""Combine multiple shear detections into a single weighted estimate.
"""Combine multiple shear detections into a single weighted estimate (v2).
Only methods with confidence >= 0.3 are considered.
Results are outlier-filtered: if any accepted result differs by more than
1° from the weighted mean, it is discarded.
Ensemble v2 changes vs v1:
- Minimum confidence raised to 0.5 (was 0.3)
- text_lines method gets 1.5× weight boost (most reliable detector)
- Outlier filter at 1° from weighted mean
Returns:
(shear_degrees, ensemble_confidence, methods_used_str)
"""
accepted = [(d["shear_degrees"], d["confidence"], d["method"])
for d in detections if d["confidence"] >= 0.3]
# Higher confidence threshold — "im Zweifel nichts tun"
_MIN_CONF = 0.5
# text_lines gets a weight boost as the most content-aware method
_METHOD_WEIGHT_BOOST = {"text_lines": 1.5}
accepted = []
for d in detections:
if d["confidence"] < _MIN_CONF:
continue
boost = _METHOD_WEIGHT_BOOST.get(d["method"], 1.0)
effective_conf = d["confidence"] * boost
accepted.append((d["shear_degrees"], effective_conf, d["method"]))
if not accepted:
return 0.0, 0.0, "none"
if len(accepted) == 1:
deg, conf, method = accepted[0]
return deg, conf, method
return deg, min(conf, 1.0), method
# First pass: weighted mean
total_w = sum(c for _, c, _ in accepted)
@@ -684,23 +835,24 @@ def _ensemble_shear(detections: List[Dict[str, Any]]) -> Tuple[float, float, str
ensemble_conf = min(1.0, avg_conf + agreement_bonus)
methods_str = "+".join(m for _, _, m in filtered)
return round(final_deg, 3), round(ensemble_conf, 2), methods_str
return round(final_deg, 3), round(min(ensemble_conf, 1.0), 2), methods_str
def dewarp_image(img: np.ndarray, use_ensemble: bool = True) -> Tuple[np.ndarray, Dict[str, Any]]:
"""Correct vertical shear after deskew.
"""Correct vertical shear after deskew (v2 with quality gate).
After deskew aligns horizontal text lines, vertical features (column
edges) may still be tilted. This detects the tilt angle using an ensemble
of three complementary methods and applies an affine shear correction.
of four complementary methods and applies an affine shear correction.
Methods (all run in ~100ms total):
A. _detect_shear_angle() — vertical edge profile (~50ms)
B. _detect_shear_by_projection() — horizontal text-line variance (~30ms)
C. _detect_shear_by_hough() — Hough lines on table borders (~20ms)
Methods (all run in ~150ms total):
A. _detect_shear_angle() — vertical edge profile (~50ms)
B. _detect_shear_by_projection() — horizontal text-line variance (~30ms)
C. _detect_shear_by_hough() — Hough lines on table borders (~20ms)
D. _detect_shear_by_text_lines() — text-line straightness (~50ms)
Only methods with confidence >= 0.3 contribute to the ensemble.
Outlier filtering discards results deviating > 1° from the weighted mean.
Quality gate: after correction, horizontal projection variance is compared
before vs after. If correction worsened alignment, it is discarded.
Args:
img: BGR image (already deskewed).
@@ -726,7 +878,8 @@ def dewarp_image(img: np.ndarray, use_ensemble: bool = True) -> Tuple[np.ndarray
det_a = _detect_shear_angle(img)
det_b = _detect_shear_by_projection(img)
det_c = _detect_shear_by_hough(img)
detections = [det_a, det_b, det_c]
det_d = _detect_shear_by_text_lines(img)
detections = [det_a, det_b, det_c, det_d]
shear_deg, confidence, method = _ensemble_shear(detections)
else:
det_a = _detect_shear_angle(img)
@@ -739,22 +892,35 @@ def dewarp_image(img: np.ndarray, use_ensemble: bool = True) -> Tuple[np.ndarray
logger.info(
"dewarp: ensemble shear=%.3f° conf=%.2f method=%s (%.2fs) | "
"A=%.3f/%.2f B=%.3f/%.2f C=%.3f/%.2f",
"A=%.3f/%.2f B=%.3f/%.2f C=%.3f/%.2f D=%.3f/%.2f",
shear_deg, confidence, method, duration,
detections[0]["shear_degrees"], detections[0]["confidence"],
detections[1]["shear_degrees"] if len(detections) > 1 else 0.0,
detections[1]["confidence"] if len(detections) > 1 else 0.0,
detections[2]["shear_degrees"] if len(detections) > 2 else 0.0,
detections[2]["confidence"] if len(detections) > 2 else 0.0,
detections[3]["shear_degrees"] if len(detections) > 3 else 0.0,
detections[3]["confidence"] if len(detections) > 3 else 0.0,
)
# Only correct if shear is significant (> 0.05°)
if abs(shear_deg) < 0.05 or confidence < 0.3:
# Higher thresholds: subtle shear (<0.15°) is irrelevant for OCR
if abs(shear_deg) < 0.15 or confidence < 0.5:
return img, no_correction
# Apply correction (negate the detected shear to straighten)
corrected = _apply_shear(img, -shear_deg)
# Quality gate: verify the correction actually improved alignment
if not _dewarp_quality_check(img, corrected):
logger.info("dewarp: quality gate REJECTED correction (%.3f°) — "
"projection variance did not improve", shear_deg)
no_correction["detections"] = [
{"method": d["method"], "shear_degrees": d["shear_degrees"],
"confidence": d["confidence"]}
for d in detections
]
return img, no_correction
info = {
"method": method,
"shear_degrees": shear_deg,
@@ -4180,6 +4346,60 @@ def _clean_cell_text(text: str) -> str:
return ' '.join(tokens)
# ---------------------------------------------------------------------------
# Narrow-column OCR helpers (Proposal B)
# ---------------------------------------------------------------------------
def _compute_cell_padding(col_width: int, img_w: int) -> int:
"""Adaptive padding for OCR crops based on column width.
Narrow columns (page_ref, marker) need more surrounding context so
Tesseract can segment characters correctly. Wide columns keep the
minimal 4 px padding to avoid pulling in neighbours.
"""
col_pct = col_width / img_w * 100 if img_w > 0 else 100
if col_pct < 5:
return max(20, col_width // 2)
if col_pct < 10:
return max(12, col_width // 4)
if col_pct < 15:
return 8
return 4
def _ensure_minimum_crop_size(crop: np.ndarray, min_dim: int = 150,
max_scale: int = 3) -> np.ndarray:
"""Upscale tiny crops so Tesseract gets enough pixel data.
If either dimension is below *min_dim*, the crop is bicubic-upscaled
so the smallest dimension reaches *min_dim* (capped at *max_scale* ×).
"""
h, w = crop.shape[:2]
if h >= min_dim and w >= min_dim:
return crop
scale = min(max_scale, max(min_dim / max(h, 1), min_dim / max(w, 1)))
if scale <= 1.0:
return crop
new_w = int(w * scale)
new_h = int(h * scale)
return cv2.resize(crop, (new_w, new_h), interpolation=cv2.INTER_CUBIC)
def _select_psm_for_column(col_type: str, col_width: int,
row_height: int) -> int:
"""Choose the best Tesseract PSM for a given column geometry.
- page_ref columns are almost always single short tokens → PSM 8
- Very narrow or short cells → PSM 7 (single text line)
- Everything else → PSM 6 (uniform block)
"""
if col_type in ('page_ref', 'marker'):
return 8 # single word
if col_width < 100 or row_height < 30:
return 7 # single line
return 6 # uniform block
def _ocr_single_cell(
row_idx: int,
col_idx: int,
@@ -4202,12 +4422,13 @@ def _ocr_single_cell(
disp_w = col.width
disp_h = row.height
# OCR crop: slightly wider to catch edge characters (internal only)
pad = 4
# OCR crop: adaptive padding — narrow columns get more context
pad = _compute_cell_padding(col.width, img_w)
cell_x = max(0, col.x - pad)
cell_y = max(0, row.y - pad)
cell_w = min(col.width + 2 * pad, img_w - cell_x)
cell_h = min(row.height + 2 * pad, img_h - cell_y)
is_narrow = (col.width / img_w * 100) < 15 if img_w > 0 else False
if disp_w <= 0 or disp_h <= 0:
return {
@@ -4266,20 +4487,56 @@ def _ocr_single_cell(
dark_ratio = float(np.count_nonzero(crop < 180)) / crop.size
_run_fallback = dark_ratio > 0.005
if _run_fallback:
cell_region = PageRegion(
type=col.type,
x=cell_x, y=cell_y,
width=cell_w, height=cell_h,
)
if engine_name in ("trocr-printed", "trocr-handwritten") and img_bgr is not None:
fallback_words = ocr_region_trocr(img_bgr, cell_region, handwritten=(engine_name == "trocr-handwritten"))
elif engine_name == "lighton" and img_bgr is not None:
fallback_words = ocr_region_lighton(img_bgr, cell_region)
elif use_rapid and img_bgr is not None:
fallback_words = ocr_region_rapid(img_bgr, cell_region)
# For narrow columns, upscale the crop before OCR
if is_narrow and ocr_img is not None:
_crop_slice = ocr_img[cell_y:cell_y + cell_h, cell_x:cell_x + cell_w]
_upscaled = _ensure_minimum_crop_size(_crop_slice)
if _upscaled is not _crop_slice:
# Build a temporary full-size image with the upscaled crop
# placed at origin so ocr_region can crop it cleanly.
_up_h, _up_w = _upscaled.shape[:2]
_tmp_region = PageRegion(
type=col.type, x=0, y=0, width=_up_w, height=_up_h,
)
_cell_psm = _select_psm_for_column(col.type, col.width, row.height)
cell_lang = lang_map.get(col.type, lang)
fallback_words = ocr_region(_upscaled, _tmp_region,
lang=cell_lang, psm=_cell_psm)
# Remap word positions back to original image coordinates
_sx = cell_w / max(_up_w, 1)
_sy = cell_h / max(_up_h, 1)
for _fw in (fallback_words or []):
_fw['left'] = int(_fw['left'] * _sx) + cell_x
_fw['top'] = int(_fw['top'] * _sy) + cell_y
_fw['width'] = int(_fw['width'] * _sx)
_fw['height'] = int(_fw['height'] * _sy)
else:
# No upscaling needed, use adaptive PSM
cell_region = PageRegion(
type=col.type, x=cell_x, y=cell_y,
width=cell_w, height=cell_h,
)
_cell_psm = _select_psm_for_column(col.type, col.width, row.height)
cell_lang = lang_map.get(col.type, lang)
fallback_words = ocr_region(ocr_img, cell_region,
lang=cell_lang, psm=_cell_psm)
else:
cell_lang = lang_map.get(col.type, lang)
fallback_words = ocr_region(ocr_img, cell_region, lang=cell_lang, psm=6)
cell_region = PageRegion(
type=col.type,
x=cell_x, y=cell_y,
width=cell_w, height=cell_h,
)
if engine_name in ("trocr-printed", "trocr-handwritten") and img_bgr is not None:
fallback_words = ocr_region_trocr(img_bgr, cell_region, handwritten=(engine_name == "trocr-handwritten"))
elif engine_name == "lighton" and img_bgr is not None:
fallback_words = ocr_region_lighton(img_bgr, cell_region)
elif use_rapid and img_bgr is not None:
fallback_words = ocr_region_rapid(img_bgr, cell_region)
else:
_cell_psm = _select_psm_for_column(col.type, col.width, row.height)
cell_lang = lang_map.get(col.type, lang)
fallback_words = ocr_region(ocr_img, cell_region,
lang=cell_lang, psm=_cell_psm)
if fallback_words:
# Apply same confidence filter to fallback words
@@ -4297,8 +4554,12 @@ def _ocr_single_cell(
# --- SECONDARY FALLBACK: PSM=7 (single line) for still-empty cells ---
if not text.strip() and _run_fallback and not use_rapid:
_fb_region = PageRegion(
type=col.type, x=cell_x, y=cell_y,
width=cell_w, height=cell_h,
)
cell_lang = lang_map.get(col.type, lang)
psm7_words = ocr_region(ocr_img, cell_region, lang=cell_lang, psm=7)
psm7_words = ocr_region(ocr_img, _fb_region, lang=cell_lang, psm=7)
if psm7_words:
psm7_words = [w for w in psm7_words if w.get('conf', 0) >= _MIN_WORD_CONF]
if psm7_words:
@@ -4310,6 +4571,38 @@ def _ocr_single_cell(
)
used_engine = 'cell_ocr_psm7'
# --- TERTIARY FALLBACK: Row-strip re-OCR for narrow columns ---
# If a narrow cell is still empty, OCR the entire row strip with
# RapidOCR (which handles small text better) and assign words by
# X-position overlap with this column.
if not text.strip() and is_narrow and img_bgr is not None:
row_region = PageRegion(
type='_row_strip', x=0, y=row.y,
width=img_w, height=row.height,
)
strip_words = ocr_region_rapid(img_bgr, row_region)
if strip_words:
# Filter to words overlapping this column's X-range
col_left = col.x
col_right = col.x + col.width
col_words = []
for sw in strip_words:
sw_left = sw.get('left', 0)
sw_right = sw_left + sw.get('width', 0)
overlap = max(0, min(sw_right, col_right) - max(sw_left, col_left))
if overlap > sw.get('width', 1) * 0.3:
col_words.append(sw)
if col_words:
col_words = [w for w in col_words if w.get('conf', 0) >= _MIN_WORD_CONF]
if col_words:
rs_text = _words_to_reading_order_text(col_words, y_tolerance_px=row.height)
if rs_text.strip():
text = rs_text
avg_conf = round(
sum(w['conf'] for w in col_words) / len(col_words), 1
)
used_engine = 'row_strip_rapid'
# --- NOISE FILTER: clear cells that contain only OCR artifacts ---
if text.strip():
text = _clean_cell_text(text)