feat(klausur-service): OCR-Pipeline Optimierungen (Improvements 2-4)
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 28s
CI / test-go-edu-search (push) Successful in 26s
CI / test-python-klausur (push) Failing after 1m46s
CI / test-python-agent-core (push) Successful in 16s
CI / test-nodejs-website (push) Successful in 16s

## Improvement 2: VLM-basierter Dewarp
- Neuer Query-Parameter `method` für POST /sessions/{id}/dewarp
  Optionen: ensemble (default) | vlm | cv
- `_detect_shear_with_vlm()`: fragt qwen2.5vl:32b per Ollama nach
  dem Scherwinkel — gibt Zahlenwert + Konfidenz zurück
- `os`, `Query` zu ocr_pipeline_api.py Imports hinzugefügt
- `_apply_shear` aus cv_vocab_pipeline importiert

## Improvement 4: 3-Methoden Ensemble-Dewarp
- `_detect_shear_by_projection()`: Varianz-Sweep ±3° / 0.25°-Schritte
  auf horizontalen Text-Zeilen-Projektionen (~30ms)
- `_detect_shear_by_hough()`: Gewichteter Median über HoughLinesP
  auf Tabellen-Linien, Vorzeichen-Inversion (~20ms)
- `_ensemble_shear()`: Kombiniert alle 3 Methoden (conf >= 0.3),
  Ausreißer-Filter bei >1° Abweichung, Bonus bei Agreement <0.5°
- `dewarp_image()` nutzt jetzt alle 3 Methoden parallel,
  `use_ensemble: bool = True` für Rückwärtskompatibilität
- auto_dewarp Response enthält jetzt `detections`-Array

## Improvement 3: Vollautomatik-Endpoint
- POST /sessions/{id}/run-auto mit RunAutoRequest:
  from_step (1-6), ocr_engine, pronunciation,
  skip_llm_review, dewarp_method
- SSE-Streaming für alle 5+1 Schritte (deskew→dewarp→columns→rows→words→llm-review)
- Jeder Schritt: start / done / skipped / error Events
- Abschluss-Event: {steps_run, steps_skipped}
- LLM-Review-Fehler sind nicht-fatal (Pipeline läuft weiter)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-03 13:13:20 +01:00
parent 2e0f8632f8
commit 50e1c964ee
2 changed files with 975 additions and 27 deletions

View File

@@ -484,6 +484,133 @@ def _detect_shear_angle(img: np.ndarray) -> Dict[str, Any]:
return result
def _detect_shear_by_projection(img: np.ndarray) -> Dict[str, Any]:
"""Detect shear angle by maximising variance of horizontal text-line projections.
Principle: horizontal text lines produce a row-projection profile with sharp
peaks (high variance) when the image is correctly aligned. Any residual shear
smears the peaks and reduces variance. We sweep ±3° and pick the angle whose
corrected projection has the highest variance.
Works best on pages with clear horizontal banding (vocabulary tables, prose).
Complements _detect_shear_angle() which needs strong vertical edges.
Returns:
Dict with keys: method, shear_degrees, confidence.
"""
import math
result = {"method": "projection", "shear_degrees": 0.0, "confidence": 0.0}
h, w = img.shape[:2]
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# Otsu binarisation
_, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
# Work at half resolution for speed
small = cv2.resize(binary, (w // 2, h // 2), interpolation=cv2.INTER_AREA)
sh, sw = small.shape
# Angle sweep: ±3° in 0.25° steps
angles = [a * 0.25 for a in range(-12, 13)] # 25 values
best_angle = 0.0
best_variance = -1.0
variances: List[Tuple[float, float]] = []
for angle_deg in angles:
if abs(angle_deg) < 0.01:
rotated = small
else:
shear_tan = math.tan(math.radians(angle_deg))
M = np.float32([[1, shear_tan, -sh / 2.0 * shear_tan], [0, 1, 0]])
rotated = cv2.warpAffine(small, M, (sw, sh),
flags=cv2.INTER_NEAREST,
borderMode=cv2.BORDER_CONSTANT)
profile = np.sum(rotated, axis=1).astype(float)
var = float(np.var(profile))
variances.append((angle_deg, var))
if var > best_variance:
best_variance = var
best_angle = angle_deg
# Confidence: how much sharper is the best angle vs. the mean?
all_mean = sum(v for _, v in variances) / len(variances)
if all_mean > 0 and best_variance > all_mean:
confidence = min(1.0, (best_variance - all_mean) / (all_mean + 1.0) * 0.6)
else:
confidence = 0.0
result["shear_degrees"] = round(best_angle, 3)
result["confidence"] = round(max(0.0, min(1.0, confidence)), 2)
return result
def _detect_shear_by_hough(img: np.ndarray) -> Dict[str, Any]:
"""Detect shear using Hough transform on printed table / ruled lines.
Vocabulary worksheets have near-horizontal printed table borders. After
deskew these should be exactly horizontal; any residual tilt equals the
vertical shear angle (with inverted sign).
The sign convention: a horizontal line tilting +α degrees (left end lower)
means the page has vertical shear of -α degrees (left column edge drifts
to the left going downward).
Returns:
Dict with keys: method, shear_degrees, confidence.
"""
result = {"method": "hough_lines", "shear_degrees": 0.0, "confidence": 0.0}
h, w = img.shape[:2]
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
edges = cv2.Canny(gray, 50, 150, apertureSize=3)
min_len = int(w * 0.15)
lines = cv2.HoughLinesP(
edges, rho=1, theta=np.pi / 360,
threshold=int(w * 0.08),
minLineLength=min_len,
maxLineGap=20,
)
if lines is None or len(lines) < 3:
return result
horizontal_angles: List[Tuple[float, float]] = []
for line in lines:
x1, y1, x2, y2 = line[0]
if x1 == x2:
continue
angle = float(np.degrees(np.arctan2(y2 - y1, x2 - x1)))
if abs(angle) <= 5.0:
length = float(np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2))
horizontal_angles.append((angle, length))
if len(horizontal_angles) < 3:
return result
# Weighted median
angles_arr = np.array([a for a, _ in horizontal_angles])
weights_arr = np.array([l for _, l in horizontal_angles])
sorted_idx = np.argsort(angles_arr)
s_angles = angles_arr[sorted_idx]
s_weights = weights_arr[sorted_idx]
cum = np.cumsum(s_weights)
mid_idx = int(np.searchsorted(cum, cum[-1] / 2.0))
median_angle = float(s_angles[min(mid_idx, len(s_angles) - 1)])
agree = sum(1 for a, _ in horizontal_angles if abs(a - median_angle) < 1.0)
confidence = min(1.0, agree / max(len(horizontal_angles), 1)) * 0.85
# Sign inversion: horizontal line tilt is complementary to vertical shear
shear_degrees = -median_angle
result["shear_degrees"] = round(shear_degrees, 3)
result["confidence"] = round(max(0.0, min(1.0, confidence)), 2)
return result
def _apply_shear(img: np.ndarray, shear_degrees: float) -> np.ndarray:
"""Apply a vertical shear correction to an image.
@@ -516,24 +643,78 @@ def _apply_shear(img: np.ndarray, shear_degrees: float) -> np.ndarray:
return corrected
def dewarp_image(img: np.ndarray) -> Tuple[np.ndarray, Dict[str, Any]]:
def _ensemble_shear(detections: List[Dict[str, Any]]) -> Tuple[float, float, str]:
"""Combine multiple shear detections into a single weighted estimate.
Only methods with confidence >= 0.3 are considered.
Results are outlier-filtered: if any accepted result differs by more than
1° from the weighted mean, it is discarded.
Returns:
(shear_degrees, ensemble_confidence, methods_used_str)
"""
accepted = [(d["shear_degrees"], d["confidence"], d["method"])
for d in detections if d["confidence"] >= 0.3]
if not accepted:
return 0.0, 0.0, "none"
if len(accepted) == 1:
deg, conf, method = accepted[0]
return deg, conf, method
# First pass: weighted mean
total_w = sum(c for _, c, _ in accepted)
w_mean = sum(d * c for d, c, _ in accepted) / total_w
# Outlier filter: keep results within 1° of weighted mean
filtered = [(d, c, m) for d, c, m in accepted if abs(d - w_mean) <= 1.0]
if not filtered:
filtered = accepted # fallback: keep all
# Second pass: weighted mean on filtered results
total_w2 = sum(c for _, c, _ in filtered)
final_deg = sum(d * c for d, c, _ in filtered) / total_w2
# Ensemble confidence: average of individual confidences, boosted when
# methods agree (all within 0.5° of each other)
avg_conf = total_w2 / len(filtered)
spread = max(d for d, _, _ in filtered) - min(d for d, _, _ in filtered)
agreement_bonus = 0.15 if spread < 0.5 else 0.0
ensemble_conf = min(1.0, avg_conf + agreement_bonus)
methods_str = "+".join(m for _, _, m in filtered)
return round(final_deg, 3), round(ensemble_conf, 2), methods_str
def dewarp_image(img: np.ndarray, use_ensemble: bool = True) -> Tuple[np.ndarray, Dict[str, Any]]:
"""Correct vertical shear after deskew.
After deskew aligns horizontal text lines, vertical features (column
edges) may still be tilted. This detects the tilt angle of the strongest
vertical edge and applies an affine shear correction.
edges) may still be tilted. This detects the tilt angle using an ensemble
of three complementary methods and applies an affine shear correction.
Methods (all run in ~100ms total):
A. _detect_shear_angle() — vertical edge profile (~50ms)
B. _detect_shear_by_projection() — horizontal text-line variance (~30ms)
C. _detect_shear_by_hough() — Hough lines on table borders (~20ms)
Only methods with confidence >= 0.3 contribute to the ensemble.
Outlier filtering discards results deviating > 1° from the weighted mean.
Args:
img: BGR image (already deskewed).
use_ensemble: If False, fall back to single-method behaviour (method A only).
Returns:
Tuple of (corrected_image, dewarp_info).
dewarp_info keys: method, shear_degrees, confidence.
dewarp_info keys: method, shear_degrees, confidence, detections.
"""
no_correction = {
"method": "none",
"shear_degrees": 0.0,
"confidence": 0.0,
"detections": [],
}
if not CV2_AVAILABLE:
@@ -541,14 +722,31 @@ def dewarp_image(img: np.ndarray) -> Tuple[np.ndarray, Dict[str, Any]]:
t0 = time.time()
detection = _detect_shear_angle(img)
if use_ensemble:
det_a = _detect_shear_angle(img)
det_b = _detect_shear_by_projection(img)
det_c = _detect_shear_by_hough(img)
detections = [det_a, det_b, det_c]
shear_deg, confidence, method = _ensemble_shear(detections)
else:
det_a = _detect_shear_angle(img)
detections = [det_a]
shear_deg = det_a["shear_degrees"]
confidence = det_a["confidence"]
method = det_a["method"]
duration = time.time() - t0
shear_deg = detection["shear_degrees"]
confidence = detection["confidence"]
logger.info(f"dewarp: detected shear={shear_deg:.3f}° "
f"conf={confidence:.2f} ({duration:.2f}s)")
logger.info(
"dewarp: ensemble shear=%.3f° conf=%.2f method=%s (%.2fs) | "
"A=%.3f/%.2f B=%.3f/%.2f C=%.3f/%.2f",
shear_deg, confidence, method, duration,
detections[0]["shear_degrees"], detections[0]["confidence"],
detections[1]["shear_degrees"] if len(detections) > 1 else 0.0,
detections[1]["confidence"] if len(detections) > 1 else 0.0,
detections[2]["shear_degrees"] if len(detections) > 2 else 0.0,
detections[2]["confidence"] if len(detections) > 2 else 0.0,
)
# Only correct if shear is significant (> 0.05°)
if abs(shear_deg) < 0.05 or confidence < 0.3:
@@ -558,9 +756,14 @@ def dewarp_image(img: np.ndarray) -> Tuple[np.ndarray, Dict[str, Any]]:
corrected = _apply_shear(img, -shear_deg)
info = {
"method": detection["method"],
"method": method,
"shear_degrees": shear_deg,
"confidence": confidence,
"detections": [
{"method": d["method"], "shear_degrees": d["shear_degrees"],
"confidence": d["confidence"]}
for d in detections
],
}
return corrected, info
@@ -3053,6 +3256,142 @@ def ocr_region_rapid(
return words
def ocr_region_trocr(img_bgr: np.ndarray, region: PageRegion, handwritten: bool = False) -> List[Dict[str, Any]]:
"""Run TrOCR on a region. Returns line-level word dicts (same format as ocr_region_rapid).
Uses trocr_service.get_trocr_model() + _split_into_lines() for line segmentation.
Bboxes are approximated from equal line-height distribution within the region.
Falls back to Tesseract if TrOCR is not available.
"""
from services.trocr_service import get_trocr_model, _split_into_lines, _check_trocr_available
if not _check_trocr_available():
logger.warning("TrOCR not available, falling back to Tesseract")
if region.height > 0 and region.width > 0:
ocr_img_crop = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY) if img_bgr is not None else None
if ocr_img_crop is not None:
return ocr_region(ocr_img_crop, region, lang="eng+deu", psm=6)
return []
crop = img_bgr[region.y:region.y + region.height, region.x:region.x + region.width]
if crop.size == 0:
return []
try:
import torch
from PIL import Image as _PILImage
processor, model = get_trocr_model(handwritten=handwritten)
if processor is None or model is None:
logger.warning("TrOCR model not loaded, falling back to Tesseract")
ocr_img_crop = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
return ocr_region(ocr_img_crop, region, lang="eng+deu", psm=6)
pil_crop = _PILImage.fromarray(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))
lines = _split_into_lines(pil_crop)
if not lines:
lines = [pil_crop]
device = next(model.parameters()).device
all_text = []
confidences = []
for line_img in lines:
pixel_values = processor(images=line_img, return_tensors="pt").pixel_values.to(device)
with torch.no_grad():
generated_ids = model.generate(pixel_values, max_length=128)
text_line = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
if text_line:
all_text.append(text_line)
confidences.append(0.85 if len(text_line) > 3 else 0.5)
if not all_text:
return []
avg_conf = int(sum(confidences) / len(confidences) * 100)
line_h = region.height // max(len(all_text), 1)
words = []
for i, line in enumerate(all_text):
words.append({
"text": line,
"left": region.x,
"top": region.y + i * line_h,
"width": region.width,
"height": line_h,
"conf": avg_conf,
"region_type": region.type,
})
return words
except Exception as e:
logger.error(f"ocr_region_trocr failed: {e}")
return []
def ocr_region_lighton(img_bgr: np.ndarray, region: PageRegion) -> List[Dict[str, Any]]:
"""Run LightOnOCR-2-1B on a region. Returns line-level word dicts (same format as ocr_region_rapid).
Falls back to RapidOCR or Tesseract if LightOnOCR is not available.
"""
from services.lighton_ocr_service import get_lighton_model, _check_lighton_available
if not _check_lighton_available():
logger.warning("LightOnOCR not available, falling back to RapidOCR/Tesseract")
if RAPIDOCR_AVAILABLE and img_bgr is not None:
return ocr_region_rapid(img_bgr, region)
ocr_img_crop = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY) if img_bgr is not None else None
return ocr_region(ocr_img_crop, region, lang="eng+deu", psm=6) if ocr_img_crop is not None else []
crop = img_bgr[region.y:region.y + region.height, region.x:region.x + region.width]
if crop.size == 0:
return []
try:
import io
import torch
from PIL import Image as _PILImage
processor, model = get_lighton_model()
if processor is None or model is None:
logger.warning("LightOnOCR model not loaded, falling back to RapidOCR/Tesseract")
if RAPIDOCR_AVAILABLE and img_bgr is not None:
return ocr_region_rapid(img_bgr, region)
ocr_img_crop = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
return ocr_region(ocr_img_crop, region, lang="eng+deu", psm=6)
pil_crop = _PILImage.fromarray(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))
conversation = [{"role": "user", "content": [{"type": "image"}]}]
inputs = processor.apply_chat_template(
conversation, images=[pil_crop],
add_generation_prompt=True, return_tensors="pt"
).to(model.device)
with torch.no_grad():
output_ids = model.generate(**inputs, max_new_tokens=1024)
text = processor.decode(output_ids[0], skip_special_tokens=True).strip()
if not text:
return []
lines = [l.strip() for l in text.split("\n") if l.strip()]
line_h = region.height // max(len(lines), 1)
words = []
for i, line in enumerate(lines):
words.append({
"text": line,
"left": region.x,
"top": region.y + i * line_h,
"width": region.width,
"height": line_h,
"conf": 85,
"region_type": region.type,
})
return words
except Exception as e:
logger.error(f"ocr_region_lighton failed: {e}")
return []
# =============================================================================
# Post-Processing: Deterministic Quality Fixes
# =============================================================================
@@ -3900,7 +4239,11 @@ def _ocr_single_cell(
x=cell_x, y=cell_y,
width=cell_w, height=cell_h,
)
if use_rapid and img_bgr is not None:
if engine_name in ("trocr-printed", "trocr-handwritten") and img_bgr is not None:
fallback_words = ocr_region_trocr(img_bgr, cell_region, handwritten=(engine_name == "trocr-handwritten"))
elif engine_name == "lighton" and img_bgr is not None:
fallback_words = ocr_region_lighton(img_bgr, cell_region)
elif use_rapid and img_bgr is not None:
fallback_words = ocr_region_rapid(img_bgr, cell_region)
else:
cell_lang = lang_map.get(col.type, lang)
@@ -3981,8 +4324,8 @@ def build_cell_grid(
img_w: Image width in pixels.
img_h: Image height in pixels.
lang: Default Tesseract language.
ocr_engine: 'tesseract', 'rapid', or 'auto'.
img_bgr: BGR color image (required for RapidOCR).
ocr_engine: 'tesseract', 'rapid', 'auto', 'trocr-printed', 'trocr-handwritten', or 'lighton'.
img_bgr: BGR color image (required for RapidOCR / TrOCR / LightOnOCR).
Returns:
(cells, columns_meta) where cells is a list of cell dicts and
@@ -3990,15 +4333,20 @@ def build_cell_grid(
"""
# Resolve engine choice
use_rapid = False
if ocr_engine == "auto":
if ocr_engine in ("trocr-printed", "trocr-handwritten", "lighton"):
engine_name = ocr_engine
elif ocr_engine == "auto":
use_rapid = RAPIDOCR_AVAILABLE and img_bgr is not None
engine_name = "rapid" if use_rapid else "tesseract"
elif ocr_engine == "rapid":
if not RAPIDOCR_AVAILABLE:
logger.warning("RapidOCR requested but not available, falling back to Tesseract")
else:
use_rapid = True
engine_name = "rapid" if use_rapid else "tesseract"
else:
engine_name = "tesseract"
engine_name = "rapid" if use_rapid else "tesseract"
logger.info(f"build_cell_grid: using OCR engine '{engine_name}'")
# Filter to content rows only (skip header/footer)
@@ -4093,7 +4441,11 @@ def build_cell_grid(
)
strip_lang = lang_map.get(relevant_cols[col_idx].type, lang)
if use_rapid and img_bgr is not None:
if engine_name in ("trocr-printed", "trocr-handwritten") and img_bgr is not None:
strip_words = ocr_region_trocr(img_bgr, strip_region, handwritten=(engine_name == "trocr-handwritten"))
elif engine_name == "lighton" and img_bgr is not None:
strip_words = ocr_region_lighton(img_bgr, strip_region)
elif use_rapid and img_bgr is not None:
strip_words = ocr_region_rapid(img_bgr, strip_region)
else:
strip_words = ocr_region(ocr_img, strip_region, lang=strip_lang, psm=6)
@@ -4169,15 +4521,19 @@ def build_cell_grid_streaming(
"""
# Resolve engine choice (same as build_cell_grid)
use_rapid = False
if ocr_engine == "auto":
if ocr_engine in ("trocr-printed", "trocr-handwritten", "lighton"):
engine_name = ocr_engine
elif ocr_engine == "auto":
use_rapid = RAPIDOCR_AVAILABLE and img_bgr is not None
engine_name = "rapid" if use_rapid else "tesseract"
elif ocr_engine == "rapid":
if not RAPIDOCR_AVAILABLE:
logger.warning("RapidOCR requested but not available, falling back to Tesseract")
else:
use_rapid = True
engine_name = "rapid" if use_rapid else "tesseract"
engine_name = "rapid" if use_rapid else "tesseract"
else:
engine_name = "tesseract"
content_rows = [r for r in row_geometries if r.row_type == 'content']
if not content_rows:
@@ -5026,8 +5382,10 @@ import os
import json as _json
import re as _re
_OLLAMA_URL = os.getenv("OLLAMA_URL", os.getenv("OLLAMA_BASE_URL", "http://host.docker.internal:11434"))
OLLAMA_REVIEW_MODEL = os.getenv("OLLAMA_REVIEW_MODEL", "qwen3:30b-a3b")
_OLLAMA_URL = os.getenv("OLLAMA_BASE_URL", "http://host.docker.internal:11434")
OLLAMA_REVIEW_MODEL = os.getenv("OLLAMA_REVIEW_MODEL", "qwen3:0.6b")
_REVIEW_BATCH_SIZE = int(os.getenv("OLLAMA_REVIEW_BATCH_SIZE", "20"))
logger.info("LLM review model: %s (batch=%d)", OLLAMA_REVIEW_MODEL, _REVIEW_BATCH_SIZE)
# Regex: entry contains IPA phonetic brackets like "dance [dɑːns]"
_HAS_PHONETIC_RE = _re.compile(r'\[.*?[ˈˌːʃʒθðŋɑɒɔəɜɪʊʌæ].*?\]')
@@ -5205,7 +5563,7 @@ async def llm_review_entries(
async def llm_review_entries_streaming(
entries: List[Dict],
model: str = None,
batch_size: int = 8,
batch_size: int = _REVIEW_BATCH_SIZE,
):
"""Async generator: yield SSE events while reviewing entries in batches."""
model = model or OLLAMA_REVIEW_MODEL