Files
breakpilot-lehrer/klausur-service/backend/vocab_worksheet_ocr.py
Benjamin Admin 9ba420fa91
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 42s
CI / test-go-edu-search (push) Successful in 34s
CI / test-python-klausur (push) Failing after 2m51s
CI / test-python-agent-core (push) Successful in 21s
CI / test-nodejs-website (push) Successful in 29s
Fix: Remove broken getKlausurApiUrl and clean up empty lines
sed replacement left orphaned hostname references in story page
and empty lines in getApiBase functions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-24 16:02:04 +02:00

482 lines
19 KiB
Python

"""
Vocab Worksheet OCR Pipeline — full Kombi OCR pipeline for a single page.
Extracted from vocab_worksheet_api.py to keep file sizes manageable.
Pipeline steps:
orientation → deskew → dewarp → crop → scan-quality → enhance →
dual-engine OCR (RapidOCR + Tesseract) → merge → grid-build →
vocab extraction → row merging
"""
import logging
import uuid
from typing import Optional
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Optional heavy dependencies (not available in every environment)
# ---------------------------------------------------------------------------
try:
import cv2
import numpy as np
except ImportError:
cv2 = None # type: ignore[assignment]
np = None # type: ignore[assignment]
logger.warning("cv2 / numpy not available — OCR pipeline disabled")
try:
from PIL import Image
except ImportError:
Image = None # type: ignore[assignment]
try:
import pytesseract
except ImportError:
pytesseract = None # type: ignore[assignment]
# CV pipeline helpers
try:
from cv_vocab_pipeline import (
deskew_two_pass,
dewarp_image,
detect_and_fix_orientation,
_cells_to_vocab_entries,
_fix_phonetic_brackets,
)
except ImportError:
deskew_two_pass = None # type: ignore[assignment]
dewarp_image = None # type: ignore[assignment]
detect_and_fix_orientation = None # type: ignore[assignment]
_cells_to_vocab_entries = None # type: ignore[assignment]
_fix_phonetic_brackets = None # type: ignore[assignment]
try:
from cv_cell_grid import (
_merge_wrapped_rows,
_merge_phonetic_continuation_rows,
_merge_continuation_rows,
)
except ImportError:
_merge_wrapped_rows = None # type: ignore[assignment]
_merge_phonetic_continuation_rows = None # type: ignore[assignment]
_merge_continuation_rows = None # type: ignore[assignment]
try:
from cv_ocr_engines import ocr_region_rapid
except ImportError:
ocr_region_rapid = None # type: ignore[assignment]
try:
from cv_vocab_types import PageRegion
except ImportError:
PageRegion = None # type: ignore[assignment]
try:
from ocr_pipeline_ocr_merge import (
_split_paddle_multi_words,
_merge_paddle_tesseract,
_deduplicate_words,
)
except ImportError:
_split_paddle_multi_words = None # type: ignore[assignment]
_merge_paddle_tesseract = None # type: ignore[assignment]
_deduplicate_words = None # type: ignore[assignment]
try:
from cv_words_first import build_grid_from_words
except ImportError:
build_grid_from_words = None # type: ignore[assignment]
try:
from ocr_pipeline_session_store import (
create_session_db as create_pipeline_session_db,
update_session_db as update_pipeline_session_db,
)
except ImportError:
create_pipeline_session_db = None # type: ignore[assignment]
update_pipeline_session_db = None # type: ignore[assignment]
# ---------------------------------------------------------------------------
# Main pipeline function
# ---------------------------------------------------------------------------
async def _run_ocr_pipeline_for_page(
    img_bgr: "np.ndarray",
    page_number: int,
    vocab_session_id: str,
    *,
    ipa_mode: str = "none",
    syllable_mode: str = "none",
    enable_enhance: bool = True,
    max_columns: Optional[int] = 3,
    override_min_conf: Optional[int] = None,
) -> tuple:
    """Run the full Kombi OCR pipeline on a single page and return vocab entries.

    Uses the same pipeline as the admin OCR Kombi pipeline:
    orientation → deskew → dewarp → crop → dual-engine OCR → grid-build
    (with pipe-autocorrect, word-gap merge, dictionary detection, etc.)

    Args:
        img_bgr: BGR numpy array.
        page_number: 0-indexed page number.
        vocab_session_id: Vocab session ID for logging.
        ipa_mode: "none" (default for worksheets), "auto", "all", "en", "de".
        syllable_mode: "none" (default for worksheets), "auto", "all", "en", "de".
        enable_enhance: If True, apply image enhancement on degraded scans.
        max_columns: Upper bound for column clustering in the initial grid build.
        override_min_conf: If given (including 0), overrides the minimum Tesseract
            word confidence instead of using the scan-quality recommendation.

    Returns:
        Tuple of (entries, rotation_deg, scan_quality_report) where entries is a
        list of vocab dicts, rotation_deg is the orientation correction applied
        (0, 90, 180, 270), and scan_quality_report is the quality assessment
        object (or None if scoring failed).
    """
    import time as _time
    t_total = _time.time()
    img_h, img_w = img_bgr.shape[:2]
    logger.info(f"Kombi Pipeline page {page_number + 1}: image {img_w}x{img_h}")
    # 1. Orientation detection (fix upside-down scans)
    t0 = _time.time()
    img_bgr, rotation = detect_and_fix_orientation(img_bgr)
    if rotation:
        img_h, img_w = img_bgr.shape[:2]
        logger.info(f" orientation: rotated {rotation}° ({_time.time() - t0:.1f}s)")
    else:
        logger.info(f" orientation: OK ({_time.time() - t0:.1f}s)")
    # 2. Create pipeline session in DB (visible in admin Kombi UI).
    # Best-effort: the OCR itself proceeds even if the DB write fails.
    pipeline_session_id = str(uuid.uuid4())
    try:
        _, png_buf = cv2.imencode(".png", img_bgr)
        original_png = png_buf.tobytes()
        await create_pipeline_session_db(
            pipeline_session_id,
            name=f"vocab-ws-{vocab_session_id[:8]}-p{page_number + 1}",
            filename=f"page_{page_number + 1}.png",
            original_png=original_png,
        )
    except Exception as e:
        logger.warning(f"Could not create pipeline session in DB: {e}")
    # 3. Three-pass deskew
    t0 = _time.time()
    deskewed_bgr, angle_applied, deskew_debug = deskew_two_pass(img_bgr.copy())
    logger.info(f" deskew: angle={angle_applied:.2f} ({_time.time() - t0:.1f}s)")
    # 4. Dewarp
    t0 = _time.time()
    dewarped_bgr, dewarp_info = dewarp_image(deskewed_bgr)
    logger.info(f" dewarp: shear={dewarp_info['shear_degrees']:.3f} ({_time.time() - t0:.1f}s)")
    # 5. Content crop (removes scanner borders, gutter shadows)
    t0 = _time.time()
    try:
        from page_crop import detect_and_crop_page
        cropped_bgr, crop_result = detect_and_crop_page(dewarped_bgr)
        if crop_result.get("crop_applied"):
            dewarped_bgr = cropped_bgr
            logger.info(f" crop: applied ({_time.time() - t0:.1f}s)")
        else:
            logger.info(f" crop: skipped ({_time.time() - t0:.1f}s)")
    except Exception as e:
        logger.warning(f" crop: failed ({e}), continuing with uncropped image")
    # 5b. Scan quality assessment (drives min OCR confidence + enhancement)
    scan_quality_report = None
    try:
        from scan_quality import score_scan_quality
        scan_quality_report = score_scan_quality(dewarped_bgr)
    except Exception as e:
        logger.warning(f" scan quality: failed ({e})")
    # Explicit `is not None` so a caller-supplied 0 is honored rather than
    # silently falling back to the scan-quality recommendation.
    if override_min_conf is not None:
        min_ocr_conf = override_min_conf
    else:
        min_ocr_conf = scan_quality_report.recommended_min_conf if scan_quality_report else 40
    # 5c. Image enhancement for degraded scans
    is_degraded = scan_quality_report.is_degraded if scan_quality_report else False
    if is_degraded and enable_enhance:
        try:
            from ocr_image_enhance import enhance_for_ocr
            dewarped_bgr = enhance_for_ocr(dewarped_bgr, is_degraded=True)
            logger.info(" enhancement: applied (degraded scan)")
        except Exception as e:
            logger.warning(f" enhancement: failed ({e})")
    # 6. Dual-engine OCR (RapidOCR + Tesseract → merge)
    t0 = _time.time()
    img_h, img_w = dewarped_bgr.shape[:2]
    # RapidOCR (local ONNX). Imported locally so a missing engine degrades to
    # an empty word list instead of disabling the whole module at import time.
    try:
        from cv_ocr_engines import ocr_region_rapid
        from cv_vocab_types import PageRegion
        full_region = PageRegion(type="full_page", x=0, y=0, width=img_w, height=img_h)
        rapid_words = ocr_region_rapid(dewarped_bgr, full_region) or []
    except Exception as e:
        logger.warning(f" RapidOCR failed: {e}")
        rapid_words = []
    # Tesseract (hard requirement here — local import raises if unavailable)
    from PIL import Image
    import pytesseract
    pil_img = Image.fromarray(cv2.cvtColor(dewarped_bgr, cv2.COLOR_BGR2RGB))
    data = pytesseract.image_to_data(
        pil_img, lang="eng+deu", config="--psm 6 --oem 3",
        output_type=pytesseract.Output.DICT,
    )
    tess_words = []
    for i in range(len(data["text"])):
        text = str(data["text"][i]).strip()
        conf_raw = str(data["conf"][i])
        # Tesseract reports "-1" for non-word boxes; non-numeric → -1 as well
        conf = int(conf_raw) if conf_raw.lstrip("-").isdigit() else -1
        if not text or conf < min_ocr_conf:
            continue
        tess_words.append({
            "text": text,
            "left": data["left"][i], "top": data["top"][i],
            "width": data["width"][i], "height": data["height"][i],
            "conf": conf,
        })
    # Merge dual-engine results
    from ocr_pipeline_ocr_merge import _split_paddle_multi_words, _merge_paddle_tesseract, _deduplicate_words
    from cv_words_first import build_grid_from_words
    rapid_split = _split_paddle_multi_words(rapid_words) if rapid_words else []
    if rapid_split or tess_words:
        merged_words = _merge_paddle_tesseract(rapid_split, tess_words)
        merged_words = _deduplicate_words(merged_words)
    else:
        merged_words = tess_words  # fallback to Tesseract only
    # Build initial grid from merged words
    cells, columns_meta = build_grid_from_words(merged_words, img_w, img_h, max_columns=max_columns)
    for cell in cells:
        cell["ocr_engine"] = "rapid_kombi"
    n_rows = len(set(c["row_index"] for c in cells)) if cells else 0
    n_cols = len(columns_meta)
    logger.info(f" ocr: rapid={len(rapid_words)}, tess={len(tess_words)}, "
                f"merged={len(merged_words)}, cells={len(cells)} ({_time.time() - t0:.1f}s)")
    # 7. Save word_result to pipeline session (needed by _build_grid_core)
    word_result = {
        "cells": cells,
        "grid_shape": {"rows": n_rows, "cols": n_cols, "total_cells": len(cells)},
        "columns_used": columns_meta,
        "layout": "vocab" if {c.get("type") for c in columns_meta} & {"column_en", "column_de"} else "generic",
        "image_width": img_w,
        "image_height": img_h,
        "duration_seconds": 0,
        "ocr_engine": "rapid_kombi",
        "raw_tesseract_words": tess_words,
        "summary": {
            "total_cells": len(cells),
            "non_empty_cells": sum(1 for c in cells if c.get("text")),
        },
    }
    # Save images + word_result to pipeline session for admin visibility
    try:
        _, dsk_buf = cv2.imencode(".png", deskewed_bgr)
        _, dwp_buf = cv2.imencode(".png", dewarped_bgr)
        await update_pipeline_session_db(
            pipeline_session_id,
            deskewed_png=dsk_buf.tobytes(),
            dewarped_png=dwp_buf.tobytes(),
            # cropped view equals the dewarped buffer here — reuse it instead
            # of re-encoding the same image a second time
            cropped_png=dwp_buf.tobytes(),
            word_result=word_result,
            deskew_result={"angle_applied": round(angle_applied, 3)},
            dewarp_result={"shear_degrees": dewarp_info.get("shear_degrees", 0)},
            current_step=8,
        )
    except Exception as e:
        logger.warning(f"Could not update pipeline session: {e}")
    # 8. Run full grid-build (with pipe-autocorrect, word-gap merge, etc.)
    t0 = _time.time()
    try:
        from grid_editor_api import _build_grid_core
        session_data = {
            "word_result": word_result,
        }
        grid_result = await _build_grid_core(
            pipeline_session_id, session_data,
            ipa_mode=ipa_mode, syllable_mode=syllable_mode,
        )
        logger.info(f" grid-build: {grid_result.get('summary', {}).get('total_cells', 0)} cells "
                    f"({_time.time() - t0:.1f}s)")
        # Save grid result to pipeline session (best-effort)
        try:
            await update_pipeline_session_db(
                pipeline_session_id,
                grid_editor_result=grid_result,
                current_step=11,
            )
        except Exception:
            pass
    except Exception as e:
        logger.warning(f" grid-build failed: {e}, falling back to basic grid")
        grid_result = None
    # 9. Extract vocab entries
    # Prefer grid-build result (better column detection, more cells) over
    # the initial build_grid_from_words() which often under-clusters.
    page_vocabulary = []
    extraction_source = "none"
    # A) Try grid-build zones first (best quality: 4-column detection, autocorrect)
    if grid_result and grid_result.get("zones"):
        for zone in grid_result["zones"]:
            zone_cols = zone.get("columns", [])
            zone_cells = zone.get("cells", [])
            if not zone_cols or not zone_cells:
                continue
            # Sort columns by x position to determine roles
            sorted_cols = sorted(zone_cols, key=lambda c: c.get("x_min_px", 0))
            # Skip zones with only 1 column (likely headers/boxes)
            if len(sorted_cols) < 2:
                continue
            # Group cells by row
            rows_map: dict = {}
            for cell in zone_cells:
                ri = cell.get("row_index", 0)
                if ri not in rows_map:
                    rows_map[ri] = {}
                ci = cell.get("col_index", 0)
                rows_map[ri][ci] = (cell.get("text") or "").strip()
            zone_n_cols = len(sorted_cols)
            for ri in sorted(rows_map.keys()):
                row = rows_map[ri]
                # Collect texts in column-position order
                texts = []
                for col in sorted_cols:
                    ci = col.get("col_index", col.get("index", -1))
                    texts.append(row.get(ci, ""))
                if not any(texts):
                    continue
                # Map by position, skipping narrow first column (page refs/markers)
                # Heuristic: if first column is very narrow (<15% of zone width),
                # it's likely a marker/ref column — skip it for vocab
                first_col_width = sorted_cols[0].get("x_max_px", 0) - sorted_cols[0].get("x_min_px", 0)
                zone_width = max(1, (sorted_cols[-1].get("x_max_px", 0) - sorted_cols[0].get("x_min_px", 0)))
                skip_first = first_col_width / zone_width < 0.15 and zone_n_cols >= 3
                data_texts = texts[1:] if skip_first else texts
                entry = {
                    "id": str(uuid.uuid4()),
                    "english": data_texts[0] if len(data_texts) > 0 else "",
                    "german": data_texts[1] if len(data_texts) > 1 else "",
                    "example_sentence": " ".join(t for t in data_texts[2:] if t) if len(data_texts) > 2 else "",
                    "source_page": page_number + 1,
                }
                if entry["english"] or entry["german"]:
                    page_vocabulary.append(entry)
        if page_vocabulary:
            extraction_source = f"grid-zones ({len(grid_result['zones'])} zones)"
    # B) Fallback: original cells with column classification
    if not page_vocabulary:
        col_types = {c.get("type") for c in columns_meta}
        is_vocab = bool(col_types & {"column_en", "column_de"})
        if is_vocab:
            entries = _cells_to_vocab_entries(cells, columns_meta)
            entries = _fix_phonetic_brackets(entries, pronunciation="british")
            for entry in entries:
                if not entry.get("english") and not entry.get("german"):
                    continue
                page_vocabulary.append({
                    "id": str(uuid.uuid4()),
                    "english": entry.get("english", ""),
                    "german": entry.get("german", ""),
                    "example_sentence": entry.get("example", ""),
                    "source_page": page_number + 1,
                })
            extraction_source = f"classified ({len(columns_meta)} cols)"
        else:
            # Last resort: all cells by position
            rows_map2: dict = {}
            for cell in cells:
                ri = cell.get("row_index", 0)
                if ri not in rows_map2:
                    rows_map2[ri] = {}
                ci = cell.get("col_index", 0)
                rows_map2[ri][ci] = (cell.get("text") or "").strip()
            all_ci = sorted({ci for r in rows_map2.values() for ci in r.keys()})
            for ri in sorted(rows_map2.keys()):
                row = rows_map2[ri]
                texts = [row.get(ci, "") for ci in all_ci]
                if not any(texts):
                    continue
                page_vocabulary.append({
                    "id": str(uuid.uuid4()),
                    "english": texts[0] if len(texts) > 0 else "",
                    "german": texts[1] if len(texts) > 1 else "",
                    "example_sentence": " ".join(texts[2:]) if len(texts) > 2 else "",
                    "source_page": page_number + 1,
                })
            extraction_source = f"generic ({len(all_ci)} cols)"
    # --- Post-processing: merge cell-wrap continuation rows ---
    if len(page_vocabulary) >= 2:
        try:
            # Convert to internal format (example_sentence → example)
            internal = []
            for v in page_vocabulary:
                internal.append({
                    'row_index': len(internal),
                    'english': v.get('english', ''),
                    'german': v.get('german', ''),
                    'example': v.get('example_sentence', ''),
                })
            n_before = len(internal)
            internal = _merge_wrapped_rows(internal)
            internal = _merge_phonetic_continuation_rows(internal)
            internal = _merge_continuation_rows(internal)
            if len(internal) < n_before:
                # Rebuild page_vocabulary from merged entries
                merged_vocab = []
                for entry in internal:
                    if not entry.get('english') and not entry.get('german'):
                        continue
                    merged_vocab.append({
                        'id': str(uuid.uuid4()),
                        'english': entry.get('english', ''),
                        'german': entry.get('german', ''),
                        'example_sentence': entry.get('example', ''),
                        'source_page': page_number + 1,
                    })
                logger.info(f" row merging: {n_before} -> {len(merged_vocab)} entries")
                page_vocabulary = merged_vocab
        except Exception as e:
            logger.warning(f" row merging failed (non-critical): {e}")
    logger.info(f" vocab extraction: {len(page_vocabulary)} entries via {extraction_source}")
    total_duration = _time.time() - t_total
    logger.info(f"Kombi Pipeline page {page_number + 1}: "
                f"{len(page_vocabulary)} vocab entries in {total_duration:.1f}s")
    return page_vocabulary, rotation, scan_quality_report