feat(ocr): Add CV Document Reconstruction Pipeline for vocabulary extraction

New OCR method using classical Computer Vision: high-res rendering (432 DPI),
deskew, dewarp, binarization, projection-profile layout analysis, multi-pass
Tesseract OCR with region-specific PSM, and Y-coordinate line alignment.
Includes bugfix for convert_pdf_to_image call (line 869) and 39 unit tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
BreakPilot Dev
2026-02-09 23:52:35 +01:00
parent 916ecef476
commit fa958d31f6
4 changed files with 2096 additions and 50 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,569 @@
"""
Unit Tests for CV Vocab Pipeline (cv_vocab_pipeline.py)
Tests cover:
- Data classes (PageRegion, VocabRow, PipelineResult)
- Stage 2: Deskew image
- Stage 3: Dewarp (pass-through)
- Stage 4: Image preparation (OCR + Layout images)
- Stage 5: Layout analysis (content bounds, projection profiles, column detection)
- Stage 6: Multi-pass OCR region handling
- Stage 7: Line grouping and vocabulary matching
- Orchestrator (run_cv_pipeline)
DSGVO Note: All tests run locally with synthetic data. No external API calls.
"""
import pytest
import numpy as np
from unittest.mock import AsyncMock, MagicMock, patch, PropertyMock
from dataclasses import asdict
# Import module under test
from cv_vocab_pipeline import (
PageRegion,
VocabRow,
PipelineResult,
deskew_image,
dewarp_image,
create_ocr_image,
create_layout_image,
_find_content_bounds,
analyze_layout,
_group_words_into_lines,
match_lines_to_vocab,
run_cv_pipeline,
CV2_AVAILABLE,
TESSERACT_AVAILABLE,
CV_PIPELINE_AVAILABLE,
)
# =============================================
# FIXTURES
# =============================================
@pytest.fixture
def white_image():
"""Create a simple 300x200 white BGR image."""
return np.ones((200, 300, 3), dtype=np.uint8) * 255
@pytest.fixture
def text_like_image():
"""Create a 600x400 image with dark text-like regions simulating 3 columns."""
img = np.ones((400, 600, 3), dtype=np.uint8) * 255
# Column 1 (EN): x=20..170
for y in range(50, 350, 30):
img[y:y+15, 30:160, :] = 30 # Dark text lines
# Gap between col1 and col2: x=170..210 (white)
# Column 2 (DE): x=210..370
for y in range(50, 350, 30):
img[y:y+15, 220:360, :] = 30
# Gap between col2 and col3: x=370..410 (white)
# Column 3 (Example): x=410..580
for y in range(50, 350, 30):
img[y:y+15, 420:570, :] = 30
return img
@pytest.fixture
def binary_image():
"""Create a binary (single-channel) image for OCR tests."""
# White background (255) with some black text-like areas
img = np.ones((400, 600), dtype=np.uint8) * 255
# Add text-like dark bands
for y in range(50, 350, 30):
img[y:y+15, 30:570] = 0
return img
@pytest.fixture
def sample_words_column_en():
"""Sample OCR word dicts for English column."""
return [
{'text': 'achieve', 'left': 30, 'top': 50, 'width': 80, 'height': 15, 'conf': 90, 'region_type': 'column_en'},
{'text': 'improve', 'left': 30, 'top': 80, 'width': 80, 'height': 15, 'conf': 85, 'region_type': 'column_en'},
{'text': 'success', 'left': 30, 'top': 110, 'width': 80, 'height': 15, 'conf': 92, 'region_type': 'column_en'},
]
@pytest.fixture
def sample_words_column_de():
"""Sample OCR word dicts for German column."""
return [
{'text': 'erreichen', 'left': 220, 'top': 52, 'width': 100, 'height': 15, 'conf': 88, 'region_type': 'column_de'},
{'text': 'verbessern', 'left': 220, 'top': 82, 'width': 100, 'height': 15, 'conf': 80, 'region_type': 'column_de'},
{'text': 'Erfolg', 'left': 220, 'top': 112, 'width': 100, 'height': 15, 'conf': 95, 'region_type': 'column_de'},
]
@pytest.fixture
def sample_words_column_ex():
"""Sample OCR word dicts for Example column."""
return [
{'text': 'She', 'left': 420, 'top': 50, 'width': 30, 'height': 15, 'conf': 85, 'region_type': 'column_example'},
{'text': 'achieved', 'left': 455, 'top': 50, 'width': 70, 'height': 15, 'conf': 80, 'region_type': 'column_example'},
{'text': 'her', 'left': 530, 'top': 50, 'width': 30, 'height': 15, 'conf': 90, 'region_type': 'column_example'},
{'text': 'goals.', 'left': 420, 'top': 52, 'width': 50, 'height': 15, 'conf': 75, 'region_type': 'column_example'},
]
@pytest.fixture
def sample_regions():
"""Sample 3-column PageRegion layout."""
return [
PageRegion(type='column_en', x=0, y=50, width=190, height=300),
PageRegion(type='column_de', x=210, y=50, width=160, height=300),
PageRegion(type='column_example', x=410, y=50, width=190, height=300),
]
# =============================================
# DATA CLASS TESTS
# =============================================
class TestDataClasses:
"""Test data classes for correct defaults and fields."""
def test_page_region_creation(self):
region = PageRegion(type='column_en', x=10, y=20, width=100, height=200)
assert region.type == 'column_en'
assert region.x == 10
assert region.y == 20
assert region.width == 100
assert region.height == 200
def test_vocab_row_defaults(self):
row = VocabRow()
assert row.english == ""
assert row.german == ""
assert row.example == ""
assert row.confidence == 0.0
assert row.y_position == 0
def test_vocab_row_with_values(self):
row = VocabRow(english="test", german="Test", example="A test.", confidence=85.5, y_position=100)
assert row.english == "test"
assert row.german == "Test"
assert row.confidence == 85.5
def test_pipeline_result_defaults(self):
result = PipelineResult()
assert result.vocabulary == []
assert result.word_count == 0
assert result.columns_detected == 0
assert result.duration_seconds == 0.0
assert result.stages == {}
assert result.error is None
def test_pipeline_result_error(self):
result = PipelineResult(error="Something went wrong")
assert result.error == "Something went wrong"
# =============================================
# STAGE 2: DESKEW TESTS
# =============================================
@pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
class TestDeskew:
"""Test deskew (rotation correction) stage."""
def test_deskew_straight_image(self, white_image):
"""A perfectly straight image should not be rotated."""
corrected, angle = deskew_image(white_image)
assert abs(angle) < 0.1
assert corrected.shape == white_image.shape
def test_deskew_returns_tuple(self, white_image):
"""deskew_image must return (image, angle) tuple."""
result = deskew_image(white_image)
assert isinstance(result, tuple)
assert len(result) == 2
assert isinstance(result[0], np.ndarray)
assert isinstance(result[1], float)
def test_deskew_preserves_shape(self, text_like_image):
"""Output image should have same shape as input."""
corrected, _ = deskew_image(text_like_image)
assert corrected.shape == text_like_image.shape
# =============================================
# STAGE 3: DEWARP TESTS
# =============================================
@pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
class TestDewarp:
"""Test dewarp (pass-through) stage."""
def test_dewarp_passthrough(self, white_image):
"""Current dewarp should return the same image (pass-through)."""
result = dewarp_image(white_image)
np.testing.assert_array_equal(result, white_image)
def test_dewarp_preserves_shape(self, text_like_image):
result = dewarp_image(text_like_image)
assert result.shape == text_like_image.shape
# =============================================
# STAGE 4: IMAGE PREPARATION TESTS
# =============================================
@pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
class TestImagePreparation:
"""Test OCR and layout image creation."""
def test_create_ocr_image_returns_grayscale(self, text_like_image):
"""OCR image should be single-channel (binarized)."""
ocr_img = create_ocr_image(text_like_image)
assert len(ocr_img.shape) == 2 # Single channel
assert ocr_img.dtype == np.uint8
def test_create_ocr_image_is_binary(self, text_like_image):
"""OCR image should contain only 0 and 255 values."""
ocr_img = create_ocr_image(text_like_image)
unique_vals = np.unique(ocr_img)
assert all(v in [0, 255] for v in unique_vals)
def test_create_layout_image_returns_grayscale(self, text_like_image):
"""Layout image should be single-channel (CLAHE enhanced)."""
layout_img = create_layout_image(text_like_image)
assert len(layout_img.shape) == 2
assert layout_img.dtype == np.uint8
def test_create_layout_image_enhanced_contrast(self, text_like_image):
"""Layout image should have different histogram than simple grayscale."""
import cv2
gray = cv2.cvtColor(text_like_image, cv2.COLOR_BGR2GRAY)
layout_img = create_layout_image(text_like_image)
# CLAHE should change the histogram
assert layout_img.shape == gray.shape
# =============================================
# STAGE 5: LAYOUT ANALYSIS TESTS
# =============================================
@pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
class TestContentBounds:
"""Test _find_content_bounds helper."""
def test_empty_image(self):
"""Fully white (inverted = black) image should return full bounds."""
inv = np.zeros((200, 300), dtype=np.uint8)
left, right, top, bottom = _find_content_bounds(inv)
# With no content, bounds should span the image
assert left >= 0
assert right <= 300
assert top >= 0
assert bottom <= 200
def test_centered_content(self):
"""Content in center should give tight bounds."""
inv = np.zeros((400, 600), dtype=np.uint8)
# Add content block in center
inv[100:300, 50:550] = 255
left, right, top, bottom = _find_content_bounds(inv)
assert left <= 52 # ~50 with 2px margin
assert right >= 548 # ~550 with 2px margin
assert top <= 102
assert bottom >= 298
@pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
class TestLayoutAnalysis:
"""Test analyze_layout for column detection."""
def test_returns_list_of_regions(self, text_like_image):
"""analyze_layout should return a list of PageRegion."""
ocr_img = create_ocr_image(text_like_image)
layout_img = create_layout_image(text_like_image)
regions = analyze_layout(layout_img, ocr_img)
assert isinstance(regions, list)
assert all(isinstance(r, PageRegion) for r in regions)
def test_detects_columns(self, text_like_image):
"""With clear 3-column image, should detect at least 1 column."""
ocr_img = create_ocr_image(text_like_image)
layout_img = create_layout_image(text_like_image)
regions = analyze_layout(layout_img, ocr_img)
column_regions = [r for r in regions if r.type.startswith('column')]
assert len(column_regions) >= 1
def test_single_column_fallback(self):
"""Image with no clear columns should fall back to single column."""
# Uniform text across full width
img = np.ones((400, 600, 3), dtype=np.uint8) * 255
for y in range(50, 350, 20):
img[y:y+10, 20:580, :] = 30 # Full-width text
ocr_img = create_ocr_image(img)
layout_img = create_layout_image(img)
regions = analyze_layout(layout_img, ocr_img)
column_regions = [r for r in regions if r.type.startswith('column')]
# Should at least return 1 column (full page fallback)
assert len(column_regions) >= 1
def test_region_types_are_valid(self, text_like_image):
"""All region types should be from the expected set."""
ocr_img = create_ocr_image(text_like_image)
layout_img = create_layout_image(text_like_image)
regions = analyze_layout(layout_img, ocr_img)
valid_types = {'column_en', 'column_de', 'column_example', 'header', 'footer'}
for r in regions:
assert r.type in valid_types, f"Unexpected region type: {r.type}"
# =============================================
# STAGE 7: LINE GROUPING TESTS
# =============================================
class TestLineGrouping:
"""Test _group_words_into_lines function."""
def test_empty_input(self):
"""Empty word list should return empty lines."""
assert _group_words_into_lines([]) == []
def test_single_word(self):
"""Single word should return one line with one word."""
words = [{'text': 'hello', 'left': 10, 'top': 50, 'width': 50, 'height': 15, 'conf': 90}]
lines = _group_words_into_lines(words)
assert len(lines) == 1
assert len(lines[0]) == 1
assert lines[0][0]['text'] == 'hello'
def test_words_on_same_line(self):
"""Words close in Y should be grouped into one line."""
words = [
{'text': 'hello', 'left': 10, 'top': 50, 'width': 50, 'height': 15, 'conf': 90},
{'text': 'world', 'left': 70, 'top': 52, 'width': 50, 'height': 15, 'conf': 85},
]
lines = _group_words_into_lines(words, y_tolerance_px=10)
assert len(lines) == 1
assert len(lines[0]) == 2
def test_words_on_different_lines(self):
"""Words far apart in Y should be on different lines."""
words = [
{'text': 'line1', 'left': 10, 'top': 50, 'width': 50, 'height': 15, 'conf': 90},
{'text': 'line2', 'left': 10, 'top': 100, 'width': 50, 'height': 15, 'conf': 85},
{'text': 'line3', 'left': 10, 'top': 150, 'width': 50, 'height': 15, 'conf': 88},
]
lines = _group_words_into_lines(words, y_tolerance_px=20)
assert len(lines) == 3
def test_words_sorted_by_x_within_line(self):
"""Words within a line should be sorted by X position."""
words = [
{'text': 'world', 'left': 100, 'top': 50, 'width': 50, 'height': 15, 'conf': 85},
{'text': 'hello', 'left': 10, 'top': 52, 'width': 50, 'height': 15, 'conf': 90},
]
lines = _group_words_into_lines(words, y_tolerance_px=10)
assert len(lines) == 1
assert lines[0][0]['text'] == 'hello'
assert lines[0][1]['text'] == 'world'
# =============================================
# STAGE 7: VOCABULARY MATCHING TESTS
# =============================================
class TestVocabMatching:
"""Test match_lines_to_vocab function."""
def test_empty_results(self, sample_regions):
"""Empty OCR results should return empty vocab."""
vocab = match_lines_to_vocab({}, sample_regions)
assert vocab == []
def test_en_only(self, sample_words_column_en, sample_regions):
"""Only EN words should create entries with empty DE/example."""
ocr_results = {'column_en': sample_words_column_en}
vocab = match_lines_to_vocab(ocr_results, sample_regions)
assert len(vocab) == 3
for row in vocab:
assert row.english != ""
assert row.german == ""
def test_en_de_matching(self, sample_words_column_en, sample_words_column_de, sample_regions):
"""EN and DE words on same Y should be matched."""
ocr_results = {
'column_en': sample_words_column_en,
'column_de': sample_words_column_de,
}
vocab = match_lines_to_vocab(ocr_results, sample_regions, y_tolerance_px=25)
assert len(vocab) == 3
# First entry should match achieve <-> erreichen
assert vocab[0].english == 'achieve'
assert vocab[0].german == 'erreichen'
def test_full_3_column_matching(self, sample_words_column_en, sample_words_column_de,
sample_words_column_ex, sample_regions):
"""All 3 columns should be matched by Y coordinate."""
ocr_results = {
'column_en': sample_words_column_en,
'column_de': sample_words_column_de,
'column_example': sample_words_column_ex,
}
vocab = match_lines_to_vocab(ocr_results, sample_regions, y_tolerance_px=25)
assert len(vocab) >= 1
# First entry should have example text
assert vocab[0].english == 'achieve'
assert vocab[0].example != ""
def test_sorted_by_y_position(self, sample_words_column_en, sample_regions):
"""Result should be sorted by Y position."""
ocr_results = {'column_en': sample_words_column_en}
vocab = match_lines_to_vocab(ocr_results, sample_regions)
positions = [row.y_position for row in vocab]
assert positions == sorted(positions)
def test_skips_short_entries(self, sample_regions):
"""Very short text (< 2 chars) should be skipped."""
words = [
{'text': 'a', 'left': 30, 'top': 50, 'width': 10, 'height': 15, 'conf': 90, 'region_type': 'column_en'},
{'text': 'valid', 'left': 30, 'top': 80, 'width': 50, 'height': 15, 'conf': 90, 'region_type': 'column_en'},
]
ocr_results = {'column_en': words}
vocab = match_lines_to_vocab(ocr_results, sample_regions)
assert len(vocab) == 1
assert vocab[0].english == 'valid'
def test_confidence_calculation(self, sample_words_column_en, sample_words_column_de, sample_regions):
"""Confidence should be the average of matched columns."""
ocr_results = {
'column_en': sample_words_column_en,
'column_de': sample_words_column_de,
}
vocab = match_lines_to_vocab(ocr_results, sample_regions, y_tolerance_px=25)
# First entry: EN conf=90, DE conf=88 → avg=89
assert vocab[0].confidence > 0
assert vocab[0].confidence == pytest.approx(89.0, abs=1.0)
# =============================================
# ORCHESTRATOR TESTS
# =============================================
class TestOrchestrator:
"""Test run_cv_pipeline orchestrator."""
@pytest.mark.asyncio
async def test_no_input_returns_error(self):
"""Pipeline without input should return error."""
result = await run_cv_pipeline()
assert result.error is not None
assert "No input data" in result.error
@pytest.mark.asyncio
async def test_pipeline_unavailable(self):
"""When CV_PIPELINE_AVAILABLE is False, should return error."""
with patch('cv_vocab_pipeline.CV_PIPELINE_AVAILABLE', False):
result = await run_cv_pipeline(pdf_data=b"fake")
assert result.error is not None
assert "not available" in result.error
@pytest.mark.asyncio
@pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
async def test_pipeline_with_image_data(self):
"""Pipeline with a real synthetic image should run without errors."""
import cv2
# Create a simple test image (white with some text-like black bars)
img = np.ones((200, 300, 3), dtype=np.uint8) * 255
for y in range(30, 170, 25):
img[y:y+12, 20:280, :] = 30
_, img_bytes = cv2.imencode('.png', img)
image_data = img_bytes.tobytes()
with patch('cv_vocab_pipeline.pytesseract') as mock_tess:
# Mock Tesseract to return empty results
mock_tess.image_to_data.return_value = {
'text': [], 'conf': [], 'left': [], 'top': [],
'width': [], 'height': [],
}
mock_tess.Output.DICT = 'dict'
result = await run_cv_pipeline(image_data=image_data)
assert result.error is None
assert result.image_width == 300
assert result.image_height == 200
assert 'render' in result.stages
assert 'deskew' in result.stages
@pytest.mark.asyncio
@pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
async def test_pipeline_records_timing(self):
"""Pipeline should record timing for each stage."""
import cv2
img = np.ones((100, 150, 3), dtype=np.uint8) * 255
_, img_bytes = cv2.imencode('.png', img)
with patch('cv_vocab_pipeline.pytesseract') as mock_tess:
mock_tess.image_to_data.return_value = {
'text': [], 'conf': [], 'left': [], 'top': [],
'width': [], 'height': [],
}
mock_tess.Output.DICT = 'dict'
result = await run_cv_pipeline(image_data=img_bytes.tobytes())
assert result.duration_seconds >= 0
assert all(v >= 0 for v in result.stages.values())
@pytest.mark.asyncio
async def test_pipeline_result_format(self):
"""PipelineResult vocabulary should be list of dicts with expected keys."""
result = PipelineResult()
result.vocabulary = [
{"english": "test", "german": "Test", "example": "A test.", "confidence": 90.0}
]
assert len(result.vocabulary) == 1
entry = result.vocabulary[0]
assert "english" in entry
assert "german" in entry
assert "example" in entry
assert "confidence" in entry
# =============================================
# INTEGRATION-STYLE TESTS (with mocked Tesseract)
# =============================================
@pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
class TestStageIntegration:
"""Test multiple stages together (still unit-test level with mocked OCR)."""
def test_image_prep_to_layout(self, text_like_image):
"""Stages 4→5: image prep feeds layout analysis correctly."""
ocr_img = create_ocr_image(text_like_image)
layout_img = create_layout_image(text_like_image)
assert ocr_img.shape[:2] == text_like_image.shape[:2]
assert layout_img.shape[:2] == text_like_image.shape[:2]
regions = analyze_layout(layout_img, ocr_img)
assert len(regions) >= 1
def test_deskew_to_image_prep(self, text_like_image):
"""Stages 2→4: deskew output can be processed by image prep."""
corrected, angle = deskew_image(text_like_image)
ocr_img = create_ocr_image(corrected)
layout_img = create_layout_image(corrected)
assert ocr_img.shape[:2] == corrected.shape[:2]
assert layout_img.shape[:2] == corrected.shape[:2]
# =============================================
# RUN TESTS
# =============================================
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -42,6 +42,93 @@ except ImportError:
MINIO_AVAILABLE = False
logger.warning("MinIO storage not available, using local storage")
# Try to import Tesseract extractor
try:
from tesseract_vocab_extractor import (
extract_bounding_boxes, run_tesseract_pipeline,
match_positions_to_vocab, TESSERACT_AVAILABLE,
)
except ImportError:
TESSERACT_AVAILABLE = False
logger.warning("Tesseract extractor not available")
# Try to import CV Pipeline
try:
from cv_vocab_pipeline import run_cv_pipeline, CV_PIPELINE_AVAILABLE
except ImportError:
CV_PIPELINE_AVAILABLE = False
logger.warning("CV vocab pipeline not available")
# Try to import Grid Detection Service
try:
from services.grid_detection_service import GridDetectionService
GRID_SERVICE_AVAILABLE = True
except ImportError:
GRID_SERVICE_AVAILABLE = False
logger.warning("Grid Detection Service not available")
# Database integration (used by main.py lifespan)
try:
from vocab_session_store import (
DATABASE_URL, get_pool, init_vocab_tables,
list_sessions_db, get_session_db,
)
except ImportError:
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://breakpilot:breakpilot@postgres:5432/breakpilot_db")
get_pool = None
init_vocab_tables = None
list_sessions_db = None
get_session_db = None
_db_pool = None
def set_db_pool(pool):
"""Set the database connection pool (called from main.py lifespan)."""
global _db_pool
_db_pool = pool
async def _init_vocab_table():
"""Initialize vocab tables in database."""
if init_vocab_tables:
try:
await init_vocab_tables()
logger.info("vocab_session_cache table ready")
except Exception as e:
logger.warning(f"Failed to init vocab tables: {e}")
else:
logger.info("vocab_session_cache table ready")
async def _load_all_sessions():
"""Load all vocab sessions from database into memory cache."""
if not list_sessions_db:
logger.info("Loaded 0 vocab sessions from database")
return
try:
sessions = await list_sessions_db(limit=500)
count = 0
for s in sessions:
sid = s.get("id") or s.get("session_id")
if sid and sid not in _sessions:
_sessions[sid] = {
"id": sid,
"name": s.get("name", ""),
"description": s.get("description", ""),
"status": s.get("status", "created"),
"vocabulary_count": s.get("vocabulary_count", 0),
"source_language": s.get("source_language", "en"),
"target_language": s.get("target_language", "de"),
"created_at": str(s.get("created_at", "")),
}
count += 1
logger.info(f"Loaded {count} vocab sessions from database")
except Exception as e:
logger.warning(f"Failed to load sessions from database: {e}")
router = APIRouter(prefix="/api/v1/vocab", tags=["Vocabulary Worksheets"])
# Local storage path
@@ -786,7 +873,7 @@ async def upload_image(
# Convert PDF to image if needed
if is_pdf:
logger.info("Converting PDF to image...")
content = await convert_pdf_to_image(content)
content = await convert_pdf_page_to_image(content, page_number=0)
logger.info(f"PDF converted, image size: {len(content)} bytes")
# Save image
@@ -1066,8 +1153,12 @@ async def upload_pdf_get_info(
@router.get("/sessions/{session_id}/pdf-thumbnail/{page_number}")
async def get_pdf_thumbnail(session_id: str, page_number: int):
"""Get a thumbnail image of a specific PDF page."""
async def get_pdf_thumbnail(session_id: str, page_number: int, hires: bool = Query(False)):
"""Get a thumbnail image of a specific PDF page.
Args:
hires: If True, return full-resolution image (zoom=2.0) instead of thumbnail (zoom=0.5).
"""
if session_id not in _sessions:
raise HTTPException(status_code=404, detail="Session not found")
@@ -1077,7 +1168,7 @@ async def get_pdf_thumbnail(session_id: str, page_number: int):
if not pdf_data:
raise HTTPException(status_code=400, detail="No PDF uploaded for this session")
thumbnail = await convert_pdf_page_to_image(pdf_data, page_number, thumbnail=True)
thumbnail = await convert_pdf_page_to_image(pdf_data, page_number, thumbnail=not hires)
return StreamingResponse(
io.BytesIO(thumbnail),
@@ -1085,6 +1176,45 @@ async def get_pdf_thumbnail(session_id: str, page_number: int):
)
@router.get("/sessions/{session_id}/pdf-page-image/{page_number}")
async def get_pdf_page_image(session_id: str, page_number: int, zoom: float = Query(2.0, ge=0.5, le=4.0)):
"""PDF page as PNG at arbitrary resolution (for editor view).
Args:
zoom: Zoom factor (0.5=72DPI, 1.0=144DPI, 2.0=288DPI, 4.0=576DPI).
"""
if session_id not in _sessions:
raise HTTPException(status_code=404, detail="Session not found")
session = _sessions[session_id]
pdf_data = session.get("pdf_data")
if not pdf_data:
raise HTTPException(status_code=400, detail="No PDF uploaded for this session")
page_count = session.get("pdf_page_count", 1)
if page_number < 0 or page_number >= page_count:
raise HTTPException(status_code=400, detail=f"Invalid page number. PDF has {page_count} pages (0-indexed).")
try:
import fitz
pdf_document = fitz.open(stream=pdf_data, filetype="pdf")
page = pdf_document[page_number]
mat = fitz.Matrix(zoom, zoom)
pix = page.get_pixmap(matrix=mat)
png_data = pix.tobytes("png")
pdf_document.close()
logger.info(f"PDF page {page_number} rendered at zoom={zoom}: {len(png_data)} bytes")
except Exception as e:
logger.error(f"PDF page image failed: {e}")
raise HTTPException(status_code=500, detail=f"PDF rendering failed: {str(e)}")
return StreamingResponse(
io.BytesIO(png_data),
media_type="image/png",
)
@router.post("/sessions/{session_id}/process-single-page/{page_number}")
async def process_single_page(
session_id: str,
@@ -1436,9 +1566,122 @@ async def compare_ocr_methods(session_id: str, page_number: int):
}
all_vocab_sets["vision_llm"] = set()
# --- Method: Local LLM (same as vision but noted separately if available) ---
# For now, we treat vision_llm as the primary method.
# Tesseract method can be added here if tesseract_vocab_extractor is available.
# --- Method: Tesseract OCR (bounding boxes + vocab extraction) ---
if TESSERACT_AVAILABLE:
try:
start = time.time()
tess_result = await run_tesseract_pipeline(image_data, lang="eng+deu")
duration = time.time() - start
tess_vocab = tess_result.get("vocabulary", [])
tess_words = tess_result.get("words", [])
# Store Tesseract words in session for later use (grid analysis, position matching)
session["tesseract_words"] = tess_words
session["tesseract_image_width"] = tess_result.get("image_width", 0)
session["tesseract_image_height"] = tess_result.get("image_height", 0)
session[f"tesseract_page_{page_number}"] = tess_result
vocab_list_tess = []
for v in tess_vocab:
vocab_list_tess.append({
"english": v.get("english", ""),
"german": v.get("german", ""),
"example": v.get("example", ""),
})
methods_results["tesseract"] = {
"name": "Tesseract OCR",
"model": "tesseract-ocr (eng+deu)",
"duration_seconds": round(duration, 1),
"vocabulary_count": len(vocab_list_tess),
"vocabulary": vocab_list_tess,
"confidence": 0.7 if tess_vocab else 0,
"success": len(vocab_list_tess) > 0,
"error": tess_result.get("error"),
"word_count": tess_result.get("word_count", 0),
"columns_detected": len(tess_result.get("columns", [])),
}
all_vocab_sets["tesseract"] = {
(v["english"].lower().strip(), v["german"].lower().strip())
for v in vocab_list_tess if v["english"] and v["german"]
}
# Fuzzy-match: attach Tesseract bounding boxes to Vision LLM results
if "vision_llm" in methods_results and methods_results["vision_llm"]["success"]:
llm_vocab_with_bbox = match_positions_to_vocab(
tess_words,
methods_results["vision_llm"]["vocabulary"],
tess_result.get("image_width", 1),
tess_result.get("image_height", 1),
)
methods_results["vision_llm"]["vocabulary"] = llm_vocab_with_bbox
except Exception as e:
logger.error(f"Tesseract failed: {e}")
import traceback
logger.debug(traceback.format_exc())
methods_results["tesseract"] = {
"name": "Tesseract OCR",
"model": "tesseract-ocr",
"duration_seconds": 0,
"vocabulary_count": 0,
"vocabulary": [],
"confidence": 0,
"success": False,
"error": str(e),
}
all_vocab_sets["tesseract"] = set()
# --- Method: CV Pipeline (Document Reconstruction) ---
if CV_PIPELINE_AVAILABLE:
try:
start = time.time()
cv_result = await run_cv_pipeline(pdf_data=pdf_data, page_number=page_number)
duration = time.time() - start
cv_vocab = cv_result.vocabulary if not cv_result.error else []
vocab_list_cv = []
for v in cv_vocab:
vocab_list_cv.append({
"english": v.get("english", ""),
"german": v.get("german", ""),
"example": v.get("example", ""),
})
methods_results["cv_pipeline"] = {
"name": "CV Pipeline (Document Reconstruction)",
"model": "opencv + tesseract (multi-pass)",
"duration_seconds": round(duration, 1),
"vocabulary_count": len(vocab_list_cv),
"vocabulary": vocab_list_cv,
"confidence": 0.8 if cv_vocab else 0,
"success": len(vocab_list_cv) > 0,
"error": cv_result.error,
"word_count": cv_result.word_count,
"columns_detected": cv_result.columns_detected,
"stages": cv_result.stages,
}
all_vocab_sets["cv_pipeline"] = {
(v["english"].lower().strip(), v["german"].lower().strip())
for v in vocab_list_cv if v["english"] and v["german"]
}
except Exception as e:
logger.error(f"CV Pipeline failed: {e}")
import traceback
logger.debug(traceback.format_exc())
methods_results["cv_pipeline"] = {
"name": "CV Pipeline (Document Reconstruction)",
"model": "opencv + tesseract (multi-pass)",
"duration_seconds": 0,
"vocabulary_count": 0,
"vocabulary": [],
"confidence": 0,
"success": False,
"error": str(e),
}
all_vocab_sets["cv_pipeline"] = set()
# --- Build comparison ---
all_unique = set()
@@ -1461,11 +1704,6 @@ async def compare_ocr_methods(session_id: str, page_number: int):
# Find best method
best_method = max(all_vocab_sets, key=lambda m: len(all_vocab_sets[m])) if all_vocab_sets else "vision_llm"
# Save vocabulary from best method in session for grid analysis (no second Ollama call needed)
best_vocab_list = methods_results.get(best_method, {}).get("vocabulary", [])
session["comparison_vocabulary"] = best_vocab_list
session["comparison_page"] = page_number
return {
"session_id": session_id,
"page_number": page_number,
@@ -1484,67 +1722,208 @@ async def compare_ocr_methods(session_id: str, page_number: int):
@router.post("/sessions/{session_id}/analyze-grid/{page_number}")
async def analyze_grid(session_id: str, page_number: int):
async def analyze_grid(session_id: str, page_number: int, use_tesseract: bool = Query(True)):
"""
Build grid structure from comparison results (no Ollama call needed).
Analyze the grid/table structure of a vocabulary page.
Uses vocabulary stored in session by compare-ocr to compute
the grid layout instantly.
Hybrid approach:
1. If Tesseract bounding boxes are available (from compare-ocr), use them for
real spatial positions via GridDetectionService.
2. Otherwise fall back to Vision LLM for grid structure detection.
page_number is 0-indexed.
Returns GridData structure expected by the frontend GridOverlay component.
"""
logger.info(f"Grid analysis for session {session_id}, page {page_number}")
import httpx
import time
logger.info(f"Grid analysis for session {session_id}, page {page_number} (use_tesseract={use_tesseract})")
if session_id not in _sessions:
raise HTTPException(status_code=404, detail="Session not found")
session = _sessions[session_id]
pdf_data = session.get("pdf_data")
# Read vocabulary from session (saved by compare-ocr)
vocab_list = session.get("comparison_vocabulary")
if not vocab_list:
return {"success": False, "error": "Bitte zuerst Vergleich starten, bevor die Grid-Analyse ausgefuehrt wird."}
if not pdf_data:
raise HTTPException(status_code=400, detail="No PDF uploaded for this session")
page_count = session.get("pdf_page_count", 1)
if page_number < 0 or page_number >= page_count:
raise HTTPException(status_code=400, detail=f"Invalid page number.")
# Convert page to image
image_data = await convert_pdf_page_to_image(pdf_data, page_number, thumbnail=False)
# --- Strategy 1: Use Tesseract bounding boxes + GridDetectionService ---
tess_page_data = session.get(f"tesseract_page_{page_number}")
if use_tesseract and TESSERACT_AVAILABLE and GRID_SERVICE_AVAILABLE:
try:
# Run Tesseract if not already cached
if not tess_page_data:
logger.info("Running Tesseract for grid analysis (not cached)")
from tesseract_vocab_extractor import run_tesseract_pipeline as _run_tess
tess_page_data = await _run_tess(image_data, lang="eng+deu")
session[f"tesseract_page_{page_number}"] = tess_page_data
session["tesseract_words"] = tess_page_data.get("words", [])
session["tesseract_image_width"] = tess_page_data.get("image_width", 0)
session["tesseract_image_height"] = tess_page_data.get("image_height", 0)
tess_words = tess_page_data.get("words", [])
img_w = tess_page_data.get("image_width", 0)
img_h = tess_page_data.get("image_height", 0)
if tess_words and img_w > 0 and img_h > 0:
service = GridDetectionService()
regions = service.convert_tesseract_regions(tess_words, img_w, img_h)
if regions:
grid_result = service.detect_grid(regions)
grid_dict = grid_result.to_dict()
# Merge LLM text if available (better quality than Tesseract text)
# The LLM vocab was stored during compare-ocr
grid_dict["source"] = "tesseract+grid_service"
grid_dict["word_count"] = len(tess_words)
logger.info(f"Tesseract grid: {grid_result.rows}x{grid_result.columns}, "
f"{grid_result.stats.get('recognized', 0)} recognized")
return {"success": True, "grid": grid_dict}
logger.info("Tesseract data insufficient, falling back to LLM")
except Exception as e:
logger.warning(f"Tesseract grid analysis failed, falling back to LLM: {e}")
import traceback
logger.debug(traceback.format_exc())
# --- Strategy 2: Fall back to Vision LLM ---
image_base64 = base64.b64encode(image_data).decode("utf-8")
grid_prompt = """Analyze this textbook page image. It contains a vocabulary table/grid.
Your task: Identify the TABLE STRUCTURE and extract each cell's content.
Return a JSON object with this EXACT structure:
{
"rows": <number of rows>,
"columns": <number of columns>,
"column_types": ["english", "german", "example"],
"entries": [
{
"row": 0,
"col": 0,
"text": "the word or phrase in this cell",
"column_type": "english",
"confidence": 0.95
}
]
}
Rules:
- row and col are 0-indexed
- column_type is one of: "english", "german", "example", "unknown"
- Detect whether each column contains English words, German translations, or example sentences
- Include ALL non-empty cells
- confidence is 0.0-1.0 based on how clear the text is
- If a cell is empty, don't include it
- Return ONLY the JSON, no other text"""
try:
# Check if example column is present (at least one vocab has non-empty example)
has_examples = any(v.get("example", "").strip() for v in vocab_list)
num_cols = 3 if has_examples else 2
column_types = ["english", "german", "example"] if has_examples else ["english", "german"]
import asyncio
num_rows = len(vocab_list)
if num_rows == 0:
return {"success": False, "error": "Keine Vokabeln im Vergleichsergebnis gefunden."}
raw_text = ""
max_retries = 3
for attempt in range(max_retries):
async with httpx.AsyncClient(timeout=300.0) as client:
response = await client.post(
f"{OLLAMA_URL}/api/chat",
json={
"model": VISION_MODEL,
"messages": [{"role": "user", "content": grid_prompt, "images": [image_base64]}],
"stream": False,
"options": {"temperature": 0.1, "num_predict": 8192},
},
timeout=300.0,
)
if response.status_code == 500 and attempt < max_retries - 1:
wait_time = 10 * (attempt + 1)
logger.warning(f"Ollama returned 500, retrying in {wait_time}s (attempt {attempt + 1}/{max_retries})")
await asyncio.sleep(wait_time)
continue
elif response.status_code != 200:
error_detail = response.text[:200] if response.text else "Unknown error"
return {"success": False, "error": f"Ollama Fehler ({response.status_code}): {error_detail}. Bitte erneut versuchen - evtl. laeuft noch ein anderer OCR-Request."}
raw_text = response.json().get("message", {}).get("content", "")
break
# Parse JSON from response
import re
json_match = re.search(r'\{[\s\S]*\}', raw_text)
if not json_match:
return {"success": False, "error": "Could not parse grid structure from LLM response"}
grid_raw = json.loads(json_match.group())
num_rows = grid_raw.get("rows", 0)
num_cols = grid_raw.get("columns", 0)
column_types = grid_raw.get("column_types", [])
entries = grid_raw.get("entries", [])
if num_rows == 0 or num_cols == 0:
return {"success": False, "error": "No grid structure detected"}
# Ensure column_types has the right length
while len(column_types) < num_cols:
column_types.append("unknown")
# Build cell grid with percentage-based coordinates
row_height = 100.0 / num_rows
col_width = 100.0 / num_cols
# Track which cells have content
cell_map = {}
for entry in entries:
r = entry.get("row", 0)
c = entry.get("col", 0)
cell_map[(r, c)] = entry
cells = []
recognized_count = 0
empty_count = 0
problematic_count = 0
for r, vocab in enumerate(vocab_list):
for r in range(num_rows):
row_cells = []
english = vocab.get("english", "").strip()
german = vocab.get("german", "").strip()
example = vocab.get("example", "").strip() if has_examples else None
col_values = [("english", english), ("german", german)]
if has_examples:
col_values.append(("example", example))
for c, (col_type, text) in enumerate(col_values):
for c in range(num_cols):
x = c * col_width
y = r * row_height
if text:
status = "recognized"
recognized_count += 1
conf = 0.9
if (r, c) in cell_map:
entry = cell_map[(r, c)]
text = entry.get("text", "").strip()
conf = entry.get("confidence", 0.8)
col_type = entry.get("column_type", column_types[c] if c < len(column_types) else "unknown")
if text:
status = "recognized" if conf >= 0.5 else "problematic"
if status == "recognized":
recognized_count += 1
else:
problematic_count += 1
else:
status = "empty"
empty_count += 1
else:
text = ""
conf = 0.0
col_type = column_types[c] if c < len(column_types) else "unknown"
status = "empty"
empty_count += 1
conf = 0.0
row_cells.append({
"row": r,
@@ -1553,7 +1932,7 @@ async def analyze_grid(session_id: str, page_number: int):
"y": round(y, 2),
"width": round(col_width, 2),
"height": round(row_height, 2),
"text": text or "",
"text": text,
"confidence": conf,
"status": status,
"column_type": col_type,
@@ -1561,8 +1940,9 @@ async def analyze_grid(session_id: str, page_number: int):
cells.append(row_cells)
total = num_rows * num_cols
coverage = recognized_count / max(total, 1)
coverage = (recognized_count + problematic_count) / max(total, 1)
# Column and row boundaries as percentages
col_boundaries = [round(c * col_width, 2) for c in range(num_cols + 1)]
row_boundaries = [round(r * row_height, 2) for r in range(num_rows + 1)]
@@ -1574,9 +1954,10 @@ async def analyze_grid(session_id: str, page_number: int):
"column_boundaries": col_boundaries,
"row_boundaries": row_boundaries,
"deskew_angle": 0.0,
"source": "vision_llm",
"stats": {
"recognized": recognized_count,
"problematic": 0,
"problematic": problematic_count,
"empty": empty_count,
"manual": 0,
"total": total,
@@ -1586,6 +1967,9 @@ async def analyze_grid(session_id: str, page_number: int):
return {"success": True, "grid": grid_data}
except httpx.TimeoutException:
logger.error("Grid analysis timed out")
return {"success": False, "error": "Grid-Analyse Timeout (Ollama zu langsam)"}
except Exception as e:
logger.error(f"Grid analysis failed: {e}")
import traceback