# =============================================
# OCR PIPELINE INTEGRATION TESTS
# =============================================

class TestProcessSinglePageOCRPipeline:
    """Tests for the OCR pipeline integration in process-single-page.

    Each test seeds the module-global ``_sessions`` store with a fake
    uploaded-PDF session and removes it again in a ``finally`` block so
    that no session state leaks into other tests.

    ``_run_ocr_pipeline_for_page`` and ``extract_vocabulary_from_image``
    are async, so they are patched with ``new_callable=AsyncMock``
    explicitly rather than relying on mock's async autodetection.
    """

    @patch("vocab_worksheet_api.OCR_PIPELINE_AVAILABLE", True)
    @patch("vocab_worksheet_api._run_ocr_pipeline_for_page", new_callable=AsyncMock)
    def test_process_single_page_uses_ocr_pipeline(self, mock_pipeline, client):
        """When OCR pipeline is available, process-single-page should use it."""
        # Create a session with PDF data
        session_id = str(uuid.uuid4())
        fake_pdf = b"%PDF-1.4 fake"
        _sessions[session_id] = {
            "id": session_id,
            "name": "Test",
            "status": "uploaded",
            "pdf_data": fake_pdf,
            "pdf_page_count": 2,
            "vocabulary": [],
        }
        try:
            # Mock the pipeline to return vocab entries
            mock_pipeline.return_value = [
                {
                    "id": str(uuid.uuid4()),
                    "english": "to achieve",
                    "german": "erreichen",
                    "example_sentence": "She achieved her goal.",
                    "source_page": 1,
                },
                {
                    "id": str(uuid.uuid4()),
                    "english": "goal",
                    "german": "Ziel",
                    "example_sentence": "",
                    "source_page": 1,
                },
            ]

            with patch("vocab_worksheet_api.convert_pdf_page_to_image", new_callable=AsyncMock) as mock_convert:
                mock_convert.return_value = b"fake-png-data"
                response = client.post(f"/api/v1/vocab/sessions/{session_id}/process-single-page/0")

            assert response.status_code == 200
            data = response.json()
            assert data["success"] is True
            assert data["vocabulary_count"] == 2
            assert data["vocabulary"][0]["english"] == "to achieve"
            assert data["vocabulary"][0]["source_page"] == 1

            # Verify pipeline was called with correct args
            mock_pipeline.assert_called_once_with(b"fake-png-data", 0, session_id)
        finally:
            # Keep the global session store clean for other tests.
            _sessions.pop(session_id, None)

    @patch("vocab_worksheet_api.OCR_PIPELINE_AVAILABLE", True)
    @patch("vocab_worksheet_api._run_ocr_pipeline_for_page", new_callable=AsyncMock)
    def test_process_single_page_ocr_pipeline_error_returns_failure(self, mock_pipeline, client):
        """When the OCR pipeline raises an exception, return success=False."""
        session_id = str(uuid.uuid4())
        _sessions[session_id] = {
            "id": session_id,
            "name": "Test",
            "status": "uploaded",
            "pdf_data": b"%PDF-1.4 fake",
            "pdf_page_count": 1,
            "vocabulary": [],
        }
        try:
            # AsyncMock raises the side_effect when awaited.
            mock_pipeline.side_effect = ValueError("Column detection failed")

            with patch("vocab_worksheet_api.convert_pdf_page_to_image", new_callable=AsyncMock) as mock_convert:
                mock_convert.return_value = b"fake-png-data"
                response = client.post(f"/api/v1/vocab/sessions/{session_id}/process-single-page/0")

            # Endpoint reports pipeline failures as success=False, not HTTP errors.
            assert response.status_code == 200
            data = response.json()
            assert data["success"] is False
            assert "OCR pipeline error" in data["error"]
            assert data["vocabulary"] == []
        finally:
            # Keep the global session store clean for other tests.
            _sessions.pop(session_id, None)

    @patch("vocab_worksheet_api.OCR_PIPELINE_AVAILABLE", False)
    @patch("vocab_worksheet_api.extract_vocabulary_from_image", new_callable=AsyncMock)
    def test_process_single_page_fallback_to_llm(self, mock_llm_extract, client):
        """When OCR pipeline is not available, fall back to LLM vision."""
        session_id = str(uuid.uuid4())
        _sessions[session_id] = {
            "id": session_id,
            "name": "Test",
            "status": "uploaded",
            "pdf_data": b"%PDF-1.4 fake",
            "pdf_page_count": 1,
            "vocabulary": [],
        }
        try:
            # LLM extraction returns (entries, confidence, error) — entries are
            # pydantic-like objects exposing .dict().
            mock_entry = MagicMock()
            mock_entry.dict.return_value = {
                "id": str(uuid.uuid4()),
                "english": "house",
                "german": "Haus",
                "example_sentence": "",
            }
            mock_llm_extract.return_value = ([mock_entry], 0.85, None)

            with patch("vocab_worksheet_api.convert_pdf_page_to_image", new_callable=AsyncMock) as mock_convert:
                mock_convert.return_value = b"fake-png-data"
                response = client.post(f"/api/v1/vocab/sessions/{session_id}/process-single-page/0")

            assert response.status_code == 200
            data = response.json()
            assert data["success"] is True
            assert data["vocabulary_count"] == 1
            assert data["vocabulary"][0]["english"] == "house"
        finally:
            # Keep the global session store clean for other tests.
            _sessions.pop(session_id, None)
+ + Uses the multi-step CV pipeline (deskew → dewarp → columns → rows → words) + instead of LLM vision for much better extraction quality. - This endpoint processes one page at a time to avoid LLM context issues. The frontend should call this sequentially for each page. - Returns the vocabulary for just this one page. """ logger.info(f"Processing SINGLE page {page_number + 1} for session {session_id}") @@ -1244,33 +1268,50 @@ async def process_single_page( if page_number < 0 or page_number >= page_count: raise HTTPException(status_code=400, detail=f"Invalid page number. PDF has {page_count} pages (0-indexed).") - # Convert just this ONE page to image + # Convert just this ONE page to PNG image_data = await convert_pdf_page_to_image(pdf_data, page_number, thumbnail=False) - # Extract vocabulary from this single page - vocabulary, confidence, error = await extract_vocabulary_from_image( - image_data, - f"page_{page_number + 1}.png", - page_number=page_number - ) - - if error: - logger.warning(f"Page {page_number + 1} failed: {error}") - return { - "session_id": session_id, - "page_number": page_number + 1, - "success": False, - "error": error, - "vocabulary": [], - "vocabulary_count": 0, - } - - # Convert vocabulary entries to dicts with page info - page_vocabulary = [] - for entry in vocabulary: - entry_dict = entry.dict() if hasattr(entry, 'dict') else (entry.__dict__.copy() if hasattr(entry, '__dict__') else dict(entry)) - entry_dict['source_page'] = page_number + 1 - page_vocabulary.append(entry_dict) + # --- OCR Pipeline path --- + if OCR_PIPELINE_AVAILABLE: + try: + page_vocabulary = await _run_ocr_pipeline_for_page( + image_data, page_number, session_id, + ) + except Exception as e: + logger.error(f"OCR pipeline failed for page {page_number + 1}: {e}", exc_info=True) + return { + "session_id": session_id, + "page_number": page_number + 1, + "success": False, + "error": f"OCR pipeline error: {e}", + "vocabulary": [], + "vocabulary_count": 0, + } + else: + # 
async def _run_ocr_pipeline_for_page(
    png_data: bytes,
    page_number: int,
    vocab_session_id: str,
) -> list:
    """Run the full OCR pipeline on a single page image and return vocab entries.

    Steps: deskew → dewarp → columns → rows → words → (LLM review)

    Args:
        png_data: PNG-encoded page image as rendered from the PDF.
        page_number: 0-indexed page number (reported 1-indexed in logs/output).
        vocab_session_id: ID of the vocab worksheet session; used only to
            name the debugging pipeline session created in the DB.

    Returns:
        List of dicts with keys: id, english, german, example_sentence,
        source_page. Empty list if the page layout is not a vocab table.

    Raises:
        ValueError: if the PNG cannot be decoded, or if column geometry
            detection fails entirely (rows cannot be detected without it).
    """
    import time as _time

    t_total = _time.time()

    # 1. Decode PNG → BGR numpy array
    arr = np.frombuffer(png_data, dtype=np.uint8)
    img_bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    if img_bgr is None:
        raise ValueError("Failed to decode page image")

    img_h, img_w = img_bgr.shape[:2]
    logger.info(f"OCR Pipeline page {page_number + 1}: image {img_w}x{img_h}")

    # 2. Create pipeline session in DB (for debugging in admin UI).
    # Failure here is non-fatal — the pipeline itself does not depend on it.
    pipeline_session_id = str(uuid.uuid4())
    try:
        await create_pipeline_session_db(
            pipeline_session_id,
            name=f"vocab-ws-{vocab_session_id[:8]}-p{page_number + 1}",
            filename=f"page_{page_number + 1}.png",
            original_png=png_data,
        )
    except Exception as e:
        logger.warning(f"Could not create pipeline session in DB: {e}")

    # 3. Deskew (3 methods, pick best). Each method is individually
    # best-effort: on failure it falls back to the unrotated image / 0.0°.
    t0 = _time.time()
    try:
        deskewed_hough, angle_hough = deskew_image(img_bgr.copy())
    except Exception:
        deskewed_hough, angle_hough = img_bgr, 0.0

    # Word-alignment deskew operates on encoded PNG bytes, not on arrays.
    success_enc, png_orig = cv2.imencode(".png", img_bgr)
    orig_bytes = png_orig.tobytes() if success_enc else b""
    try:
        deskewed_wa_bytes, angle_wa = deskew_image_by_word_alignment(orig_bytes)
    except Exception:
        deskewed_wa_bytes, angle_wa = orig_bytes, 0.0

    try:
        deskewed_iter, angle_iterative, _ = deskew_image_iterative(img_bgr.copy())
    except Exception:
        deskewed_iter, angle_iterative = img_bgr, 0.0

    # Pick best: iterative wins if it found a non-trivial angle (>= 0.05°);
    # otherwise prefer word-alignment over Hough unless Hough's angle is
    # both larger and above the 0.1° noise floor.
    if abs(angle_iterative) >= 0.05:
        deskewed_bgr = deskewed_iter
        angle_applied = angle_iterative
    elif abs(angle_wa) >= abs(angle_hough) or abs(angle_hough) < 0.1:
        angle_applied = angle_wa
        wa_array = np.frombuffer(deskewed_wa_bytes, dtype=np.uint8)
        deskewed_bgr = cv2.imdecode(wa_array, cv2.IMREAD_COLOR)
        if deskewed_bgr is None:
            # Word-alignment result failed to decode — fall back to Hough.
            deskewed_bgr = deskewed_hough
            angle_applied = angle_hough
    else:
        deskewed_bgr = deskewed_hough
        angle_applied = angle_hough

    logger.info(f" deskew: hough={angle_hough:.2f} wa={angle_wa:.2f} "
                f"iter={angle_iterative:.2f} → applied={angle_applied:.2f} "
                f"({_time.time() - t0:.1f}s)")

    # 4. Dewarp (shear correction)
    t0 = _time.time()
    dewarped_bgr, dewarp_info = dewarp_image(deskewed_bgr)
    logger.info(f" dewarp: shear={dewarp_info['shear_degrees']:.3f} ({_time.time() - t0:.1f}s)")

    # 5. Column detection — geometry-based first; if that fails, fall back
    # to the coarser layout analysis (which yields regions but no word/inv
    # intermediates, forcing a re-run in step 6).
    t0 = _time.time()
    ocr_img = create_ocr_image(dewarped_bgr)
    h, w = ocr_img.shape[:2]

    geo_result = detect_column_geometry(ocr_img, dewarped_bgr)
    if geo_result is None:
        layout_img = create_layout_image(dewarped_bgr)
        regions = analyze_layout(layout_img, ocr_img)
        word_dicts = None
        inv = None
        content_bounds = None
    else:
        geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv = geo_result
        content_w = right_x - left_x
        header_y, footer_y = _detect_header_footer_gaps(inv, w, h) if inv is not None else (None, None)
        geometries = _detect_sub_columns(geometries, content_w, left_x=left_x,
                                         top_y=top_y, header_y=header_y, footer_y=footer_y)
        geometries = expand_narrow_columns(geometries, content_w, left_x, word_dicts)
        regions = classify_column_types(geometries, content_w, top_y, w, h, bottom_y,
                                        left_x=left_x, right_x=right_x, inv=inv)
        content_bounds = (left_x, right_x, top_y, bottom_y)

    logger.info(f" columns: {len(regions)} detected ({_time.time() - t0:.1f}s)")

    # 6. Row detection
    t0 = _time.time()
    if word_dicts is None or inv is None or content_bounds is None:
        # Re-run geometry detection to get intermediates
        geo_result2 = detect_column_geometry(ocr_img, dewarped_bgr)
        if geo_result2 is None:
            raise ValueError("Column geometry detection failed — cannot detect rows")
        _, left_x, right_x, top_y, bottom_y, word_dicts, inv = geo_result2
        content_bounds = (left_x, right_x, top_y, bottom_y)

    left_x, right_x, top_y, bottom_y = content_bounds
    rows = detect_row_geometry(inv, word_dicts, left_x, right_x, top_y, bottom_y)
    logger.info(f" rows: {len(rows)} detected ({_time.time() - t0:.1f}s)")

    # 7. Word recognition (cell-first OCR v2)
    t0 = _time.time()
    col_regions = regions  # already PageRegion objects

    # Populate row.words for word_count filtering: a word belongs to a row
    # if its vertical midpoint falls within the row band.
    # NOTE(review): this assumes word_dicts 'top' coordinates are relative
    # to the content top (top_y), matching row.y - top_y — TODO confirm
    # against detect_column_geometry's output convention.
    for row in rows:
        row_y_rel = row.y - top_y
        row_bottom_rel = row_y_rel + row.height
        row.words = [
            wd for wd in word_dicts
            if row_y_rel <= wd['top'] + wd['height'] / 2 < row_bottom_rel
        ]
        row.word_count = len(row.words)

    # NOTE(review): img_w/img_h are the ORIGINAL image dimensions, but
    # ocr_img/dewarped_bgr were produced after deskew rotation and dewarp
    # and may differ in size — confirm build_cell_grid_v2 expects the
    # original dims here.
    cells, columns_meta = build_cell_grid_v2(
        ocr_img, col_regions, rows, img_w, img_h,
        ocr_engine="auto", img_bgr=dewarped_bgr,
    )

    # A page counts as a vocab table only if at least one EN or DE column
    # was classified.
    col_types = {c['type'] for c in columns_meta}
    is_vocab = bool(col_types & {'column_en', 'column_de'})
    logger.info(f" words: {len(cells)} cells, vocab={is_vocab} ({_time.time() - t0:.1f}s)")

    if not is_vocab:
        logger.warning(f" Page {page_number + 1}: layout is not vocab table "
                       f"(types: {col_types}), returning empty")
        return []

    # 8. Map cells → vocab entries
    entries = _cells_to_vocab_entries(cells, columns_meta)
    entries = _fix_phonetic_brackets(entries, pronunciation="british")

    # 9. Optional LLM review — best-effort; any failure just skips review.
    try:
        review_result = await llm_review_entries(entries)
        if review_result and review_result.get("changes"):
            # Apply corrections; de-duplicate by index so the last change
            # for a given entry wins.
            changes_map = {}
            for ch in review_result["changes"]:
                idx = ch.get("index")
                if idx is not None:
                    changes_map[idx] = ch
            for idx, ch in changes_map.items():
                if 0 <= idx < len(entries):
                    for field in ("english", "german", "example"):
                        # Only overwrite with non-empty, actually-different values.
                        if ch.get(field) and ch[field] != entries[idx].get(field):
                            entries[idx][field] = ch[field]
            logger.info(f" llm review: {len(review_result['changes'])} corrections applied")
    except Exception as e:
        logger.warning(f" llm review skipped: {e}")

    # 10. Map to frontend format (internal 'example' → API 'example_sentence')
    page_vocabulary = []
    for entry in entries:
        if not entry.get("english") and not entry.get("german"):
            continue  # skip empty rows
        page_vocabulary.append({
            "id": str(uuid.uuid4()),
            "english": entry.get("english", ""),
            "german": entry.get("german", ""),
            "example_sentence": entry.get("example", ""),
            "source_page": page_number + 1,
        })

    # 11. Update pipeline session in DB (for admin debugging) — best-effort.
    try:
        success_dsk, dsk_buf = cv2.imencode(".png", deskewed_bgr)
        deskewed_png = dsk_buf.tobytes() if success_dsk else None
        success_dwp, dwp_buf = cv2.imencode(".png", dewarped_bgr)
        dewarped_png = dwp_buf.tobytes() if success_dwp else None

        await update_pipeline_session_db(
            pipeline_session_id,
            deskewed_png=deskewed_png,
            dewarped_png=dewarped_png,
            deskew_result={"angle_applied": round(angle_applied, 3)},
            dewarp_result={"shear_degrees": dewarp_info.get("shear_degrees", 0)},
            column_result={"columns": [{"type": r.type, "x": r.x, "y": r.y,
                                        "width": r.width, "height": r.height}
                                       for r in col_regions]},
            row_result={"total_rows": len(rows)},
            word_result={
                "entry_count": len(page_vocabulary),
                "layout": "vocab",
                "vocab_entries": entries,
            },
            # presumably 6 = final "words" step in the admin pipeline UI — verify
            current_step=6,
        )
    except Exception as e:
        logger.warning(f"Could not update pipeline session: {e}")

    total_duration = _time.time() - t_total
    logger.info(f"OCR Pipeline page {page_number + 1}: "
                f"{len(page_vocabulary)} vocab entries in {total_duration:.1f}s")

    return page_vocabulary