feat: use OCR pipeline instead of LLM vision for vocab worksheet extraction
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 26s
CI / test-go-edu-search (push) Successful in 25s
CI / test-python-klausur (push) Failing after 1m52s
CI / test-python-agent-core (push) Successful in 18s
CI / test-nodejs-website (push) Successful in 17s

process-single-page now runs the full CV pipeline (deskew → dewarp → columns →
rows → cell-first OCR v2 → LLM review) for much better extraction quality.
Falls back to LLM vision if pipeline imports are unavailable.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-05 15:35:44 +01:00
parent 9ea77ba157
commit b7ae36e92b
2 changed files with 400 additions and 29 deletions

View File

@@ -615,6 +615,121 @@ class TestEdgeCases:
assert len(response.json()) == 5
# =============================================
# OCR PIPELINE INTEGRATION TESTS
# =============================================
class TestProcessSinglePageOCRPipeline:
    """Tests for the OCR pipeline integration in process-single-page.

    The endpoint is exercised through the test ``client`` fixture; the heavy
    pipeline / LLM functions are patched out so no real OCR or network calls
    run. NOTE: ``@patch`` decorators apply bottom-up, so the mock for the
    innermost decorator arrives as the first mock parameter.
    """

    @patch("vocab_worksheet_api.OCR_PIPELINE_AVAILABLE", True)
    @patch("vocab_worksheet_api._run_ocr_pipeline_for_page")
    def test_process_single_page_uses_ocr_pipeline(self, mock_pipeline, client):
        """When OCR pipeline is available, process-single-page should use it."""
        # Create a session with PDF data
        session_id = str(uuid.uuid4())
        fake_pdf = b"%PDF-1.4 fake"
        _sessions[session_id] = {
            "id": session_id,
            "name": "Test",
            "status": "uploaded",
            "pdf_data": fake_pdf,
            "pdf_page_count": 2,
            "vocabulary": [],
        }
        # Mock the pipeline to return vocab entries
        mock_pipeline.return_value = [
            {
                "id": str(uuid.uuid4()),
                "english": "to achieve",
                "german": "erreichen",
                "example_sentence": "She achieved her goal.",
                "source_page": 1,
            },
            {
                "id": str(uuid.uuid4()),
                "english": "goal",
                "german": "Ziel",
                "example_sentence": "",
                "source_page": 1,
            },
        ]
        # PDF→PNG conversion is async in the app, hence AsyncMock
        with patch("vocab_worksheet_api.convert_pdf_page_to_image", new_callable=AsyncMock) as mock_convert:
            mock_convert.return_value = b"fake-png-data"
            response = client.post(f"/api/v1/vocab/sessions/{session_id}/process-single-page/0")
            assert response.status_code == 200
            data = response.json()
            assert data["success"] is True
            assert data["vocabulary_count"] == 2
            assert data["vocabulary"][0]["english"] == "to achieve"
            assert data["vocabulary"][0]["source_page"] == 1
            # Verify pipeline was called with correct args
            mock_pipeline.assert_called_once_with(b"fake-png-data", 0, session_id)

    @patch("vocab_worksheet_api.OCR_PIPELINE_AVAILABLE", True)
    @patch("vocab_worksheet_api._run_ocr_pipeline_for_page")
    def test_process_single_page_ocr_pipeline_error_returns_failure(self, mock_pipeline, client):
        """When the OCR pipeline raises an exception, return success=False.

        The endpoint reports pipeline failures in-band (HTTP 200 with
        success=False) rather than as an HTTP error.
        """
        session_id = str(uuid.uuid4())
        _sessions[session_id] = {
            "id": session_id,
            "name": "Test",
            "status": "uploaded",
            "pdf_data": b"%PDF-1.4 fake",
            "pdf_page_count": 1,
            "vocabulary": [],
        }
        # Any exception type should be caught and surfaced as an error string
        mock_pipeline.side_effect = ValueError("Column detection failed")
        with patch("vocab_worksheet_api.convert_pdf_page_to_image", new_callable=AsyncMock) as mock_convert:
            mock_convert.return_value = b"fake-png-data"
            response = client.post(f"/api/v1/vocab/sessions/{session_id}/process-single-page/0")
            assert response.status_code == 200
            data = response.json()
            assert data["success"] is False
            assert "OCR pipeline error" in data["error"]
            assert data["vocabulary"] == []

    @patch("vocab_worksheet_api.OCR_PIPELINE_AVAILABLE", False)
    @patch("vocab_worksheet_api.extract_vocabulary_from_image", new_callable=AsyncMock)
    def test_process_single_page_fallback_to_llm(self, mock_llm_extract, client):
        """When OCR pipeline is not available, fall back to LLM vision."""
        session_id = str(uuid.uuid4())
        _sessions[session_id] = {
            "id": session_id,
            "name": "Test",
            "status": "uploaded",
            "pdf_data": b"%PDF-1.4 fake",
            "pdf_page_count": 1,
            "vocabulary": [],
        }
        # The LLM path returns model objects with a .dict() method, so a
        # MagicMock with a canned dict stands in for a vocabulary entry.
        mock_entry = MagicMock()
        mock_entry.dict.return_value = {
            "id": str(uuid.uuid4()),
            "english": "house",
            "german": "Haus",
            "example_sentence": "",
        }
        # extract_vocabulary_from_image returns (entries, confidence, error)
        mock_llm_extract.return_value = ([mock_entry], 0.85, None)
        with patch("vocab_worksheet_api.convert_pdf_page_to_image", new_callable=AsyncMock) as mock_convert:
            mock_convert.return_value = b"fake-png-data"
            response = client.post(f"/api/v1/vocab/sessions/{session_id}/process-single-page/0")
            assert response.status_code == 200
            data = response.json()
            assert data["success"] is True
            assert data["vocabulary_count"] == 1
            assert data["vocabulary"][0]["english"] == "house"
# =============================================
# RUN TESTS
# =============================================

View File

@@ -59,6 +59,29 @@ except ImportError:
CV_PIPELINE_AVAILABLE = False
logger.warning("CV vocab pipeline not available")
# Try to import OCR Pipeline functions (for process-single-page)
# Guarded import: cv2/numpy and the project pipeline modules may be absent in
# some deployments; OCR_PIPELINE_AVAILABLE records the outcome so the endpoint
# can fall back to LLM vision extraction instead of crashing at import time.
try:
    import cv2
    import numpy as np
    from cv_vocab_pipeline import (
        deskew_image, deskew_image_by_word_alignment, deskew_image_iterative,
        dewarp_image, create_ocr_image,
        detect_column_geometry, analyze_layout_by_words, analyze_layout, create_layout_image,
        detect_row_geometry, build_cell_grid_v2,
        _cells_to_vocab_entries, _detect_sub_columns, _detect_header_footer_gaps,
        expand_narrow_columns, classify_column_types, llm_review_entries,
        _fix_phonetic_brackets,
        PageRegion, RowGeometry,
    )
    # Aliased so these don't collide with the vocab session store functions.
    from ocr_pipeline_session_store import (
        create_session_db as create_pipeline_session_db,
        update_session_db as update_pipeline_session_db,
    )
    OCR_PIPELINE_AVAILABLE = True
except ImportError as _ocr_pipe_err:
    OCR_PIPELINE_AVAILABLE = False
    logger.warning(f"OCR Pipeline functions not available: {_ocr_pipe_err}")
# Try to import Grid Detection Service
try:
from services.grid_detection_service import GridDetectionService
@@ -1221,11 +1244,12 @@ async def process_single_page(
page_number: int,
):
"""
Process a SINGLE page of an uploaded PDF - completely isolated.
Process a SINGLE page of an uploaded PDF using the OCR pipeline.
Uses the multi-step CV pipeline (deskew → dewarp → columns → rows → words)
instead of LLM vision for much better extraction quality.
This endpoint processes one page at a time to avoid LLM context issues.
The frontend should call this sequentially for each page.
Returns the vocabulary for just this one page.
"""
logger.info(f"Processing SINGLE page {page_number + 1} for session {session_id}")
@@ -1244,33 +1268,50 @@ async def process_single_page(
if page_number < 0 or page_number >= page_count:
raise HTTPException(status_code=400, detail=f"Invalid page number. PDF has {page_count} pages (0-indexed).")
# Convert just this ONE page to image
# Convert just this ONE page to PNG
image_data = await convert_pdf_page_to_image(pdf_data, page_number, thumbnail=False)
# Extract vocabulary from this single page
vocabulary, confidence, error = await extract_vocabulary_from_image(
image_data,
f"page_{page_number + 1}.png",
page_number=page_number
)
if error:
logger.warning(f"Page {page_number + 1} failed: {error}")
return {
"session_id": session_id,
"page_number": page_number + 1,
"success": False,
"error": error,
"vocabulary": [],
"vocabulary_count": 0,
}
# Convert vocabulary entries to dicts with page info
page_vocabulary = []
for entry in vocabulary:
entry_dict = entry.dict() if hasattr(entry, 'dict') else (entry.__dict__.copy() if hasattr(entry, '__dict__') else dict(entry))
entry_dict['source_page'] = page_number + 1
page_vocabulary.append(entry_dict)
# --- OCR Pipeline path ---
if OCR_PIPELINE_AVAILABLE:
try:
page_vocabulary = await _run_ocr_pipeline_for_page(
image_data, page_number, session_id,
)
except Exception as e:
logger.error(f"OCR pipeline failed for page {page_number + 1}: {e}", exc_info=True)
return {
"session_id": session_id,
"page_number": page_number + 1,
"success": False,
"error": f"OCR pipeline error: {e}",
"vocabulary": [],
"vocabulary_count": 0,
}
else:
# Fallback to LLM vision extraction
logger.warning("OCR pipeline not available, falling back to LLM vision")
vocabulary, confidence, error = await extract_vocabulary_from_image(
image_data,
f"page_{page_number + 1}.png",
page_number=page_number
)
if error:
logger.warning(f"Page {page_number + 1} failed: {error}")
return {
"session_id": session_id,
"page_number": page_number + 1,
"success": False,
"error": error,
"vocabulary": [],
"vocabulary_count": 0,
}
page_vocabulary = []
for entry in vocabulary:
entry_dict = entry.dict() if hasattr(entry, 'dict') else (entry.__dict__.copy() if hasattr(entry, '__dict__') else dict(entry))
entry_dict['source_page'] = page_number + 1
if 'id' not in entry_dict or not entry_dict['id']:
entry_dict['id'] = str(uuid.uuid4())
page_vocabulary.append(entry_dict)
logger.info(f"Page {page_number + 1}: {len(page_vocabulary)} Vokabeln extrahiert")
@@ -1290,10 +1331,225 @@ async def process_single_page(
"vocabulary": page_vocabulary,
"vocabulary_count": len(page_vocabulary),
"total_vocabulary_count": len(existing_vocab),
"extraction_confidence": confidence,
"extraction_confidence": 0.9,
}
async def _run_ocr_pipeline_for_page(
    png_data: bytes,
    page_number: int,
    vocab_session_id: str,
) -> list:
    """Run the full OCR pipeline on a single page image and return vocab entries.

    Steps: deskew → dewarp → columns → rows → words → (LLM review)

    Args:
        png_data: the rendered page as PNG bytes.
        page_number: 0-based page index (reported/stored as 1-based).
        vocab_session_id: id of the owning vocab worksheet session, used only
            to name the debug pipeline session.

    Returns list of dicts with keys: id, english, german, example_sentence, source_page

    Raises:
        ValueError: if the PNG cannot be decoded, or if column geometry
            detection fails and rows therefore cannot be detected.
    """
    import time as _time
    t_total = _time.time()

    # 1. Decode PNG → BGR numpy array
    arr = np.frombuffer(png_data, dtype=np.uint8)
    img_bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    if img_bgr is None:
        raise ValueError("Failed to decode page image")
    img_h, img_w = img_bgr.shape[:2]
    logger.info(f"OCR Pipeline page {page_number + 1}: image {img_w}x{img_h}")

    # 2. Create pipeline session in DB (for debugging in admin UI)
    #    Best-effort only: extraction proceeds even if persisting fails.
    pipeline_session_id = str(uuid.uuid4())
    try:
        await create_pipeline_session_db(
            pipeline_session_id,
            name=f"vocab-ws-{vocab_session_id[:8]}-p{page_number + 1}",
            filename=f"page_{page_number + 1}.png",
            original_png=png_data,
        )
    except Exception as e:
        logger.warning(f"Could not create pipeline session in DB: {e}")

    # 3. Deskew (3 methods, pick best)
    #    Each deskew variant is tried independently; a failure degrades to
    #    "no rotation" (original image, angle 0.0) rather than aborting.
    t0 = _time.time()
    try:
        deskewed_hough, angle_hough = deskew_image(img_bgr.copy())
    except Exception:
        deskewed_hough, angle_hough = img_bgr, 0.0
    # The word-alignment deskew consumes encoded PNG bytes, so re-encode.
    success_enc, png_orig = cv2.imencode(".png", img_bgr)
    orig_bytes = png_orig.tobytes() if success_enc else b""
    try:
        deskewed_wa_bytes, angle_wa = deskew_image_by_word_alignment(orig_bytes)
    except Exception:
        deskewed_wa_bytes, angle_wa = orig_bytes, 0.0
    try:
        deskewed_iter, angle_iterative, _ = deskew_image_iterative(img_bgr.copy())
    except Exception:
        deskewed_iter, angle_iterative = img_bgr, 0.0
    # Pick best: prefer the iterative result when it found a meaningful
    # angle (>= 0.05°); otherwise choose word-alignment over Hough when its
    # angle is at least as large or the Hough angle is negligible (< 0.1°).
    if abs(angle_iterative) >= 0.05:
        deskewed_bgr = deskewed_iter
        angle_applied = angle_iterative
    elif abs(angle_wa) >= abs(angle_hough) or abs(angle_hough) < 0.1:
        angle_applied = angle_wa
        wa_array = np.frombuffer(deskewed_wa_bytes, dtype=np.uint8)
        deskewed_bgr = cv2.imdecode(wa_array, cv2.IMREAD_COLOR)
        if deskewed_bgr is None:
            # word-alignment output failed to decode — fall back to Hough
            deskewed_bgr = deskewed_hough
            angle_applied = angle_hough
    else:
        deskewed_bgr = deskewed_hough
        angle_applied = angle_hough
    logger.info(f" deskew: hough={angle_hough:.2f} wa={angle_wa:.2f} "
                f"iter={angle_iterative:.2f} → applied={angle_applied:.2f} "
                f"({_time.time() - t0:.1f}s)")

    # 4. Dewarp
    t0 = _time.time()
    dewarped_bgr, dewarp_info = dewarp_image(deskewed_bgr)
    logger.info(f" dewarp: shear={dewarp_info['shear_degrees']:.3f} ({_time.time() - t0:.1f}s)")

    # 5. Column detection
    t0 = _time.time()
    ocr_img = create_ocr_image(dewarped_bgr)
    h, w = ocr_img.shape[:2]
    geo_result = detect_column_geometry(ocr_img, dewarped_bgr)
    if geo_result is None:
        # Geometry-based detection failed — fall back to the coarser layout
        # analysis. No word/ink intermediates are available on this path
        # (they get re-derived in step 6).
        layout_img = create_layout_image(dewarped_bgr)
        regions = analyze_layout(layout_img, ocr_img)
        word_dicts = None
        inv = None
        content_bounds = None
    else:
        geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv = geo_result
        content_w = right_x - left_x
        header_y, footer_y = _detect_header_footer_gaps(inv, w, h) if inv is not None else (None, None)
        # Refine raw column geometries: split sub-columns, widen columns that
        # are too narrow, then classify each column's semantic type.
        geometries = _detect_sub_columns(geometries, content_w, left_x=left_x,
                                         top_y=top_y, header_y=header_y, footer_y=footer_y)
        geometries = expand_narrow_columns(geometries, content_w, left_x, word_dicts)
        regions = classify_column_types(geometries, content_w, top_y, w, h, bottom_y,
                                        left_x=left_x, right_x=right_x, inv=inv)
        content_bounds = (left_x, right_x, top_y, bottom_y)
    logger.info(f" columns: {len(regions)} detected ({_time.time() - t0:.1f}s)")

    # 6. Row detection
    t0 = _time.time()
    if word_dicts is None or inv is None or content_bounds is None:
        # Re-run geometry detection to get intermediates
        # NOTE(review): if geometry detection failed in step 5 it will almost
        # certainly fail again here, so the analyze_layout fallback path ends
        # in this ValueError — confirm whether that is intended.
        geo_result2 = detect_column_geometry(ocr_img, dewarped_bgr)
        if geo_result2 is None:
            raise ValueError("Column geometry detection failed — cannot detect rows")
        _, left_x, right_x, top_y, bottom_y, word_dicts, inv = geo_result2
        content_bounds = (left_x, right_x, top_y, bottom_y)
    left_x, right_x, top_y, bottom_y = content_bounds
    rows = detect_row_geometry(inv, word_dicts, left_x, right_x, top_y, bottom_y)
    logger.info(f" rows: {len(rows)} detected ({_time.time() - t0:.1f}s)")

    # 7. Word recognition (cell-first OCR v2)
    t0 = _time.time()
    col_regions = regions  # already PageRegion objects
    # Populate row.words for word_count filtering
    for row in rows:
        row_y_rel = row.y - top_y
        row_bottom_rel = row_y_rel + row.height
        # A word is assigned to the row whose vertical band contains the
        # word's midpoint (word tops appear to be relative to top_y —
        # TODO confirm against detect_column_geometry).
        row.words = [
            wd for wd in word_dicts
            if row_y_rel <= wd['top'] + wd['height'] / 2 < row_bottom_rel
        ]
        row.word_count = len(row.words)
    cells, columns_meta = build_cell_grid_v2(
        ocr_img, col_regions, rows, img_w, img_h,
        ocr_engine="auto", img_bgr=dewarped_bgr,
    )
    # A page counts as a vocab table only if at least one EN or DE column
    # was classified; anything else (plain text, exercises) yields nothing.
    col_types = {c['type'] for c in columns_meta}
    is_vocab = bool(col_types & {'column_en', 'column_de'})
    logger.info(f" words: {len(cells)} cells, vocab={is_vocab} ({_time.time() - t0:.1f}s)")
    if not is_vocab:
        logger.warning(f" Page {page_number + 1}: layout is not vocab table "
                       f"(types: {col_types}), returning empty")
        return []

    # 8. Map cells → vocab entries
    entries = _cells_to_vocab_entries(cells, columns_meta)
    entries = _fix_phonetic_brackets(entries, pronunciation="british")

    # 9. Optional LLM review — best-effort; a failure only skips corrections.
    try:
        review_result = await llm_review_entries(entries)
        if review_result and review_result.get("changes"):
            # Apply corrections
            # Index changes by entry position; a later change for the same
            # index overwrites an earlier one.
            changes_map = {}
            for ch in review_result["changes"]:
                idx = ch.get("index")
                if idx is not None:
                    changes_map[idx] = ch
            for idx, ch in changes_map.items():
                if 0 <= idx < len(entries):
                    for field in ("english", "german", "example"):
                        # only overwrite with non-empty, actually-different values
                        if ch.get(field) and ch[field] != entries[idx].get(field):
                            entries[idx][field] = ch[field]
            logger.info(f" llm review: {len(review_result['changes'])} corrections applied")
    except Exception as e:
        logger.warning(f" llm review skipped: {e}")

    # 10. Map to frontend format
    page_vocabulary = []
    for entry in entries:
        if not entry.get("english") and not entry.get("german"):
            continue  # skip empty rows
        page_vocabulary.append({
            "id": str(uuid.uuid4()),
            "english": entry.get("english", ""),
            "german": entry.get("german", ""),
            "example_sentence": entry.get("example", ""),
            "source_page": page_number + 1,
        })

    # 11. Update pipeline session in DB (for admin debugging)
    #     Best-effort: persists intermediate images/results for the admin UI.
    try:
        success_dsk, dsk_buf = cv2.imencode(".png", deskewed_bgr)
        deskewed_png = dsk_buf.tobytes() if success_dsk else None
        success_dwp, dwp_buf = cv2.imencode(".png", dewarped_bgr)
        dewarped_png = dwp_buf.tobytes() if success_dwp else None
        await update_pipeline_session_db(
            pipeline_session_id,
            deskewed_png=deskewed_png,
            dewarped_png=dewarped_png,
            deskew_result={"angle_applied": round(angle_applied, 3)},
            dewarp_result={"shear_degrees": dewarp_info.get("shear_degrees", 0)},
            column_result={"columns": [{"type": r.type, "x": r.x, "y": r.y,
                                        "width": r.width, "height": r.height}
                                       for r in col_regions]},
            row_result={"total_rows": len(rows)},
            word_result={
                "entry_count": len(page_vocabulary),
                "layout": "vocab",
                "vocab_entries": entries,
            },
            current_step=6,
        )
    except Exception as e:
        logger.warning(f"Could not update pipeline session: {e}")

    total_duration = _time.time() - t_total
    logger.info(f"OCR Pipeline page {page_number + 1}: "
                f"{len(page_vocabulary)} vocab entries in {total_duration:.1f}s")
    return page_vocabulary
@router.post("/sessions/{session_id}/process-pages")
async def process_pdf_pages(
session_id: str,