Files
breakpilot-lehrer/klausur-service/backend/tests/test_cv_vocab_pipeline.py
T
Benjamin Admin 5f2ed44654
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 41s
CI / test-go-edu-search (push) Successful in 32s
CI / test-python-klausur (push) Failing after 2m41s
CI / test-python-agent-core (push) Successful in 34s
CI / test-nodejs-website (push) Successful in 39s
Cleanup: Delete ALL 242 shims, update ALL consumer imports
klausur-service: 183 shims deleted, 26 test files + 8 source files updated
backend-lehrer: 59 shims deleted, main.py + 8 source files updated

All imports now use the new package paths directly.
Zero shims remaining in the entire codebase.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-26 00:11:33 +02:00

1741 lines
68 KiB
Python

"""
Unit Tests for CV Vocab Pipeline (cv_vocab_pipeline.py)
Tests cover:
- Data classes (PageRegion, VocabRow, PipelineResult)
- Stage 2: Deskew image
- Stage 3: Dewarp (pass-through)
- Stage 4: Image preparation (OCR + Layout images)
- Stage 5: Layout analysis (content bounds, projection profiles, column detection)
- Stage 6: Multi-pass OCR region handling
- Stage 7: Line grouping and vocabulary matching
- Noise filter functions (_is_noise_tail_token, _clean_cell_text)
- Phonetic detection (_is_phonetic_only_text)
- Phonetic & continuation row merging
- Orchestrator (run_cv_pipeline)
DSGVO Note: All tests run locally with synthetic data. No external API calls.
"""
import pytest
import numpy as np
from unittest.mock import AsyncMock, MagicMock, patch, PropertyMock
from dataclasses import asdict
# Import module under test
from ocr.cv_pipeline import (
ColumnGeometry,
DocumentTypeResult,
PageRegion,
RowGeometry,
VocabRow,
PipelineResult,
deskew_image,
dewarp_image,
create_ocr_image,
create_layout_image,
_find_content_bounds,
_filter_narrow_runs,
_build_margin_regions,
_detect_header_footer_gaps,
_detect_sub_columns,
_region_has_content,
_add_header_footer,
analyze_layout,
_group_words_into_lines,
match_lines_to_vocab,
run_cv_pipeline,
CV2_AVAILABLE,
TESSERACT_AVAILABLE,
CV_PIPELINE_AVAILABLE,
_is_noise_tail_token,
_clean_cell_text,
_clean_cell_text_lite,
_is_phonetic_only_text,
_merge_phonetic_continuation_rows,
_merge_continuation_rows,
_ocr_cell_crop,
detect_document_type,
)
# =============================================
# FIXTURES
# =============================================
@pytest.fixture
def white_image():
    """A plain 300x200 all-white BGR image (H=200, W=300, 3 channels)."""
    return np.full((200, 300, 3), 255, dtype=np.uint8)
@pytest.fixture
def text_like_image():
    """A 600x400 white image with dark text-like bands arranged as 3 columns.

    Column x-spans: EN 30..160, DE 220..360, example 420..570, separated by
    white gaps (170..210 and 370..410) so column detection has clear valleys.
    """
    img = np.full((400, 600, 3), 255, dtype=np.uint8)
    column_spans = ((30, 160), (220, 360), (420, 570))
    for x0, x1 in column_spans:
        # One 15px-tall dark "text line" every 30px down the column.
        for y in range(50, 350, 30):
            img[y:y + 15, x0:x1, :] = 30
    return img
@pytest.fixture
def binary_image():
    """A binary (single-channel) 600x400 image for OCR tests.

    White (255) background with full-width black (0) text-like bands.
    """
    img = np.full((400, 600), 255, dtype=np.uint8)
    for row in range(50, 350, 30):
        img[row:row + 15, 30:570] = 0  # 15px-tall dark band
    return img
@pytest.fixture
def sample_words_column_en():
    """Sample OCR word dicts for the English column (one word per row)."""
    rows = (('achieve', 50, 90), ('improve', 80, 85), ('success', 110, 92))
    return [
        {'text': text, 'left': 30, 'top': top, 'width': 80, 'height': 15,
         'conf': conf, 'region_type': 'column_en'}
        for text, top, conf in rows
    ]
@pytest.fixture
def sample_words_column_de():
    """Sample OCR word dicts for the German column (offset 2px below EN)."""
    rows = (('erreichen', 52, 88), ('verbessern', 82, 80), ('Erfolg', 112, 95))
    return [
        {'text': text, 'left': 220, 'top': top, 'width': 100, 'height': 15,
         'conf': conf, 'region_type': 'column_de'}
        for text, top, conf in rows
    ]
@pytest.fixture
def sample_words_column_ex():
    """Sample OCR word dicts for the example-sentence column."""
    specs = (
        # (text, left, top, width, conf)
        ('She', 420, 50, 30, 85),
        ('achieved', 455, 50, 70, 80),
        ('her', 530, 50, 30, 90),
        ('goals.', 420, 52, 50, 75),
    )
    return [
        {'text': text, 'left': left, 'top': top, 'width': width, 'height': 15,
         'conf': conf, 'region_type': 'column_example'}
        for text, left, top, width, conf in specs
    ]
@pytest.fixture
def sample_regions():
    """Sample 3-column PageRegion layout (EN / DE / example)."""
    layout = (
        ('column_en', 0, 190),
        ('column_de', 210, 160),
        ('column_example', 410, 190),
    )
    return [
        PageRegion(type=kind, x=x, y=50, width=width, height=300)
        for kind, x, width in layout
    ]
# =============================================
# DATA CLASS TESTS
# =============================================
class TestDataClasses:
    """Verify field assignment and defaults on the pipeline data classes."""

    def test_page_region_creation(self):
        """All constructor arguments land on the matching fields."""
        region = PageRegion(type='column_en', x=10, y=20, width=100, height=200)
        assert (region.type, region.x, region.y) == ('column_en', 10, 20)
        assert (region.width, region.height) == (100, 200)

    def test_vocab_row_defaults(self):
        """A bare VocabRow starts with empty strings and zeroed metrics."""
        blank = VocabRow()
        assert blank.english == ""
        assert blank.german == ""
        assert blank.example == ""
        assert blank.confidence == 0.0
        assert blank.y_position == 0

    def test_vocab_row_with_values(self):
        """Explicitly supplied values are stored as given."""
        filled = VocabRow(english="test", german="Test", example="A test.",
                          confidence=85.5, y_position=100)
        assert filled.english == "test"
        assert filled.german == "Test"
        assert filled.confidence == 85.5

    def test_pipeline_result_defaults(self):
        """A fresh PipelineResult carries empty collections and no error."""
        fresh = PipelineResult()
        assert fresh.vocabulary == []
        assert fresh.word_count == 0
        assert fresh.columns_detected == 0
        assert fresh.duration_seconds == 0.0
        assert fresh.stages == {}
        assert fresh.error is None

    def test_pipeline_result_error(self):
        """The error field is stored verbatim."""
        failed = PipelineResult(error="Something went wrong")
        assert failed.error == "Something went wrong"
# =============================================
# STAGE 2: DESKEW TESTS
# =============================================
@pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
class TestDeskew:
    """Tests for the rotation-correction (deskew) stage."""

    def test_deskew_straight_image(self, white_image):
        """An already-straight image must come back essentially unrotated."""
        straightened, detected_angle = deskew_image(white_image)
        assert abs(detected_angle) < 0.1
        assert straightened.shape == white_image.shape

    def test_deskew_returns_tuple(self, white_image):
        """deskew_image yields an (ndarray, float angle) pair."""
        outcome = deskew_image(white_image)
        assert isinstance(outcome, tuple)
        assert len(outcome) == 2
        image_part, angle_part = outcome
        assert isinstance(image_part, np.ndarray)
        assert isinstance(angle_part, float)

    def test_deskew_preserves_shape(self, text_like_image):
        """Deskewing must not change the image dimensions."""
        straightened, _angle = deskew_image(text_like_image)
        assert straightened.shape == text_like_image.shape
# =============================================
# STAGE 3: DEWARP TESTS
# =============================================
@pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
class TestDewarp:
    """Tests for the dewarp stage, which returns an (image, info) pair."""

    def test_dewarp_returns_tuple(self, white_image):
        """dewarp_image yields (ndarray, dict containing 'shear_degrees')."""
        outcome = dewarp_image(white_image)
        assert isinstance(outcome, tuple)
        assert len(outcome) == 2
        corrected, details = outcome
        assert isinstance(corrected, np.ndarray)
        assert isinstance(details, dict)
        assert "shear_degrees" in details

    def test_dewarp_preserves_shape(self, text_like_image):
        """Dewarping must keep the input dimensions."""
        corrected, _details = dewarp_image(text_like_image)
        assert corrected.shape == text_like_image.shape

    def test_dewarp_white_image_no_correction(self, white_image):
        """A featureless white page gets (near-)zero shear correction."""
        corrected, details = dewarp_image(white_image)
        assert abs(details["shear_degrees"]) < 0.5
        assert corrected.shape == white_image.shape
# =============================================
# STAGE 4: IMAGE PREPARATION TESTS
# =============================================
@pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
class TestImagePreparation:
    """Tests for OCR-image and layout-image creation (stage 4)."""

    def test_create_ocr_image_returns_grayscale(self, text_like_image):
        """The binarized OCR image is single-channel uint8."""
        prepared = create_ocr_image(text_like_image)
        assert prepared.ndim == 2  # single channel
        assert prepared.dtype == np.uint8

    def test_create_ocr_image_is_binary(self, text_like_image):
        """The OCR image contains only pure black (0) / white (255) values."""
        prepared = create_ocr_image(text_like_image)
        assert set(np.unique(prepared).tolist()) <= {0, 255}

    def test_create_layout_image_returns_grayscale(self, text_like_image):
        """The CLAHE-enhanced layout image is single-channel uint8."""
        enhanced = create_layout_image(text_like_image)
        assert enhanced.ndim == 2
        assert enhanced.dtype == np.uint8

    def test_create_layout_image_enhanced_contrast(self, text_like_image):
        """Layout image has the same geometry as a plain grayscale convert."""
        import cv2
        plain_gray = cv2.cvtColor(text_like_image, cv2.COLOR_BGR2GRAY)
        enhanced = create_layout_image(text_like_image)
        # CLAHE changes the histogram but never the geometry.
        assert enhanced.shape == plain_gray.shape
# =============================================
# STAGE 5: LAYOUT ANALYSIS TESTS
# =============================================
@pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
class TestContentBounds:
    """Tests for the _find_content_bounds helper (stage 5).

    The helper takes an INVERTED image (foreground = 255) and returns
    (left, right, top, bottom) bounds of the content.
    """

    def test_empty_image(self):
        """With no foreground pixels, bounds stay within the image extents."""
        inverted = np.zeros((200, 300), dtype=np.uint8)
        lx, rx, ty, by = _find_content_bounds(inverted)
        assert lx >= 0
        assert rx <= 300
        assert ty >= 0
        assert by <= 200

    def test_centered_content(self):
        """A centered content block yields tight bounds (~2px margin)."""
        inverted = np.zeros((400, 600), dtype=np.uint8)
        inverted[100:300, 50:550] = 255  # centered foreground block
        lx, rx, ty, by = _find_content_bounds(inverted)
        assert lx <= 52    # ~50 with 2px margin
        assert rx >= 548   # ~550 with 2px margin
        assert ty <= 102
        assert by >= 298
@pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
class TestLayoutAnalysis:
    """Test analyze_layout for column detection.

    Each test feeds analyze_layout the image pair produced by stage 4:
    create_layout_image (contrast-enhanced) and create_ocr_image (binarized).
    """

    def test_returns_list_of_regions(self, text_like_image):
        """analyze_layout should return a list of PageRegion."""
        ocr_img = create_ocr_image(text_like_image)
        layout_img = create_layout_image(text_like_image)
        regions = analyze_layout(layout_img, ocr_img)
        assert isinstance(regions, list)
        assert all(isinstance(r, PageRegion) for r in regions)

    def test_detects_columns(self, text_like_image):
        """With clear 3-column image, should detect at least 1 column."""
        ocr_img = create_ocr_image(text_like_image)
        layout_img = create_layout_image(text_like_image)
        regions = analyze_layout(layout_img, ocr_img)
        # Only column_* regions count here; headers/margins are ignored.
        column_regions = [r for r in regions if r.type.startswith('column')]
        assert len(column_regions) >= 1

    def test_single_column_fallback(self):
        """Image with no clear columns should fall back to single column."""
        # Uniform text across full width: no vertical whitespace valleys,
        # so projection-profile column splitting has nothing to find.
        img = np.ones((400, 600, 3), dtype=np.uint8) * 255
        for y in range(50, 350, 20):
            img[y:y+10, 20:580, :] = 30  # Full-width text
        ocr_img = create_ocr_image(img)
        layout_img = create_layout_image(img)
        regions = analyze_layout(layout_img, ocr_img)
        column_regions = [r for r in regions if r.type.startswith('column')]
        # Should at least return 1 column (full page fallback)
        assert len(column_regions) >= 1

    def test_region_types_are_valid(self, text_like_image):
        """All region types should be from the expected set."""
        ocr_img = create_ocr_image(text_like_image)
        layout_img = create_layout_image(text_like_image)
        regions = analyze_layout(layout_img, ocr_img)
        valid_types = {'column_en', 'column_de', 'column_example',
                       'header', 'footer', 'margin_top', 'margin_bottom'}
        for r in regions:
            assert r.type in valid_types, f"Unexpected region type: {r.type}"
# =============================================
# STAGE 7: LINE GROUPING TESTS
# =============================================
class TestLineGrouping:
    """Tests for _group_words_into_lines (stage 7 pre-processing)."""

    @staticmethod
    def _word(text, left, top, conf):
        # Small factory for the OCR-word dicts used throughout this class.
        return {'text': text, 'left': left, 'top': top,
                'width': 50, 'height': 15, 'conf': conf}

    def test_empty_input(self):
        """No words in means no lines out."""
        assert _group_words_into_lines([]) == []

    def test_single_word(self):
        """A lone word yields exactly one single-word line."""
        lines = _group_words_into_lines([self._word('hello', 10, 50, 90)])
        assert len(lines) == 1
        assert len(lines[0]) == 1
        assert lines[0][0]['text'] == 'hello'

    def test_words_on_same_line(self):
        """Words within the Y tolerance collapse into one line."""
        words = [self._word('hello', 10, 50, 90),
                 self._word('world', 70, 52, 85)]
        lines = _group_words_into_lines(words, y_tolerance_px=10)
        assert len(lines) == 1
        assert len(lines[0]) == 2

    def test_words_on_different_lines(self):
        """Words separated vertically beyond tolerance land on separate lines."""
        words = [self._word('line1', 10, 50, 90),
                 self._word('line2', 10, 100, 85),
                 self._word('line3', 10, 150, 88)]
        assert len(_group_words_into_lines(words, y_tolerance_px=20)) == 3

    def test_words_sorted_by_x_within_line(self):
        """Within one line, words come back ordered left-to-right."""
        words = [self._word('world', 100, 50, 85),
                 self._word('hello', 10, 52, 90)]
        lines = _group_words_into_lines(words, y_tolerance_px=10)
        assert len(lines) == 1
        assert [w['text'] for w in lines[0]] == ['hello', 'world']
# =============================================
# STAGE 7: VOCABULARY MATCHING TESTS
# =============================================
class TestVocabMatching:
    """Test match_lines_to_vocab function.

    The function pairs OCR words from the EN / DE / example columns by Y
    coordinate (within y_tolerance_px) into VocabRow entries. The DE/example
    fixtures sit 2px below the EN rows, well inside the 25px tolerance used
    in the matching tests.
    """

    def test_empty_results(self, sample_regions):
        """Empty OCR results should return empty vocab."""
        vocab = match_lines_to_vocab({}, sample_regions)
        assert vocab == []

    def test_en_only(self, sample_words_column_en, sample_regions):
        """Only EN words should create entries with empty DE/example."""
        ocr_results = {'column_en': sample_words_column_en}
        vocab = match_lines_to_vocab(ocr_results, sample_regions)
        assert len(vocab) == 3
        for row in vocab:
            assert row.english != ""
            assert row.german == ""

    def test_en_de_matching(self, sample_words_column_en, sample_words_column_de, sample_regions):
        """EN and DE words on same Y should be matched."""
        ocr_results = {
            'column_en': sample_words_column_en,
            'column_de': sample_words_column_de,
        }
        vocab = match_lines_to_vocab(ocr_results, sample_regions, y_tolerance_px=25)
        assert len(vocab) == 3
        # First entry should match achieve <-> erreichen
        assert vocab[0].english == 'achieve'
        assert vocab[0].german == 'erreichen'

    def test_full_3_column_matching(self, sample_words_column_en, sample_words_column_de,
                                    sample_words_column_ex, sample_regions):
        """All 3 columns should be matched by Y coordinate."""
        ocr_results = {
            'column_en': sample_words_column_en,
            'column_de': sample_words_column_de,
            'column_example': sample_words_column_ex,
        }
        vocab = match_lines_to_vocab(ocr_results, sample_regions, y_tolerance_px=25)
        assert len(vocab) >= 1
        # First entry should have example text
        assert vocab[0].english == 'achieve'
        assert vocab[0].example != ""

    def test_sorted_by_y_position(self, sample_words_column_en, sample_regions):
        """Result should be sorted by Y position."""
        ocr_results = {'column_en': sample_words_column_en}
        vocab = match_lines_to_vocab(ocr_results, sample_regions)
        positions = [row.y_position for row in vocab]
        assert positions == sorted(positions)

    def test_skips_short_entries(self, sample_regions):
        """Very short text (< 2 chars) should be skipped."""
        words = [
            {'text': 'a', 'left': 30, 'top': 50, 'width': 10, 'height': 15, 'conf': 90, 'region_type': 'column_en'},
            {'text': 'valid', 'left': 30, 'top': 80, 'width': 50, 'height': 15, 'conf': 90, 'region_type': 'column_en'},
        ]
        ocr_results = {'column_en': words}
        vocab = match_lines_to_vocab(ocr_results, sample_regions)
        assert len(vocab) == 1
        assert vocab[0].english == 'valid'

    def test_confidence_calculation(self, sample_words_column_en, sample_words_column_de, sample_regions):
        """Confidence should be the average of matched columns."""
        ocr_results = {
            'column_en': sample_words_column_en,
            'column_de': sample_words_column_de,
        }
        vocab = match_lines_to_vocab(ocr_results, sample_regions, y_tolerance_px=25)
        # First entry: EN conf=90, DE conf=88 → avg=89
        assert vocab[0].confidence > 0
        assert vocab[0].confidence == pytest.approx(89.0, abs=1.0)
# =============================================
# ORCHESTRATOR TESTS
# =============================================
class TestOrchestrator:
    """Test the run_cv_pipeline orchestrator.

    Patch targets must use the real module path 'ocr.cv_pipeline' — the path
    this file imports from. The old shim module 'cv_vocab_pipeline' no longer
    exists after the shim cleanup, and unittest.mock.patch() on a missing
    module path raises ModuleNotFoundError at test time instead of patching.
    """

    @pytest.mark.asyncio
    async def test_no_input_returns_error(self):
        """Pipeline without input should return an error result."""
        result = await run_cv_pipeline()
        assert result.error is not None
        assert "No input data" in result.error

    @pytest.mark.asyncio
    async def test_pipeline_unavailable(self):
        """When CV_PIPELINE_AVAILABLE is False, should return error."""
        # Patch the flag in the module under test (ocr.cv_pipeline), not the
        # deleted 'cv_vocab_pipeline' shim path.
        with patch('ocr.cv_pipeline.CV_PIPELINE_AVAILABLE', False):
            result = await run_cv_pipeline(pdf_data=b"fake")
        assert result.error is not None
        assert "not available" in result.error

    @pytest.mark.asyncio
    @pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
    async def test_pipeline_with_image_data(self):
        """Pipeline with a real synthetic image should run without errors."""
        import cv2
        # Create a simple test image (white with some text-like black bars).
        img = np.ones((200, 300, 3), dtype=np.uint8) * 255
        for y in range(30, 170, 25):
            img[y:y+12, 20:280, :] = 30
        _, img_bytes = cv2.imencode('.png', img)
        image_data = img_bytes.tobytes()
        with patch('ocr.cv_pipeline.pytesseract') as mock_tess:
            # Mock Tesseract to return empty OCR results.
            mock_tess.image_to_data.return_value = {
                'text': [], 'conf': [], 'left': [], 'top': [],
                'width': [], 'height': [],
            }
            mock_tess.Output.DICT = 'dict'
            result = await run_cv_pipeline(image_data=image_data)
        assert result.error is None
        assert result.image_width == 300
        assert result.image_height == 200
        assert 'render' in result.stages
        assert 'deskew' in result.stages

    @pytest.mark.asyncio
    @pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
    async def test_pipeline_records_timing(self):
        """Pipeline should record a non-negative timing for each stage."""
        import cv2
        img = np.ones((100, 150, 3), dtype=np.uint8) * 255
        _, img_bytes = cv2.imencode('.png', img)
        with patch('ocr.cv_pipeline.pytesseract') as mock_tess:
            mock_tess.image_to_data.return_value = {
                'text': [], 'conf': [], 'left': [], 'top': [],
                'width': [], 'height': [],
            }
            mock_tess.Output.DICT = 'dict'
            result = await run_cv_pipeline(image_data=img_bytes.tobytes())
        assert result.duration_seconds >= 0
        assert all(v >= 0 for v in result.stages.values())

    @pytest.mark.asyncio
    async def test_pipeline_result_format(self):
        """PipelineResult vocabulary should be list of dicts with expected keys."""
        result = PipelineResult()
        result.vocabulary = [
            {"english": "test", "german": "Test", "example": "A test.", "confidence": 90.0}
        ]
        assert len(result.vocabulary) == 1
        entry = result.vocabulary[0]
        for key in ("english", "german", "example", "confidence"):
            assert key in entry
# =============================================
# INTEGRATION-STYLE TESTS (with mocked Tesseract)
# =============================================
@pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
class TestStageIntegration:
    """Chain several pipeline stages together (still unit-level, OCR mocked)."""

    def test_image_prep_to_layout(self, text_like_image):
        """Stage 4 output feeds stage 5 (layout analysis) without errors."""
        binarized = create_ocr_image(text_like_image)
        enhanced = create_layout_image(text_like_image)
        assert binarized.shape[:2] == text_like_image.shape[:2]
        assert enhanced.shape[:2] == text_like_image.shape[:2]
        assert len(analyze_layout(enhanced, binarized)) >= 1

    def test_deskew_to_image_prep(self, text_like_image):
        """Stage 2 output feeds stage 4 (image preparation) without errors."""
        straightened, _angle = deskew_image(text_like_image)
        binarized = create_ocr_image(straightened)
        enhanced = create_layout_image(straightened)
        assert binarized.shape[:2] == straightened.shape[:2]
        assert enhanced.shape[:2] == straightened.shape[:2]
# =============================================
# NOISE FILTER TESTS
# =============================================
class TestNoiseFilter:
    """Test _is_noise_tail_token for trailing OCR noise detection.

    The token lists below are curated real-world cases: legitimate dictionary
    notation (hyphens, parentheses, phonetic brackets, abbreviations) must be
    kept, while short non-word OCR fragments must be filtered.
    """

    # --- Tokens that should be KEPT (return False) ---
    @pytest.mark.parametrize("token", [
        # Compound words with hyphens
        "money-saver",
        "under-",
        "well-known",
        # Words with parenthesized parts (dictionary entries)
        "Schild(chen)",
        "(Salat-)Gurke",
        "(auf)",
        "(on)",
        "selbst)",
        "(wir",
        "Tanz(veranstaltung)",
        "(zer)brechen",
        # Phonetic brackets
        "serva]",
        "['mani",
        "[eg]",
        "[maus]",
        # Words with trailing punctuation
        "cupcakes.",
        "sister.",
        "mice",
        # Abbreviations
        "e.g.",
        "sth.",
        "usw.",
        "adj.",
        # Ellipsis
        "...",
        "\u2026",
        # Regular words
        "the",
        "cat",
        "big",
        "run",
        "set",
        "ago",
    ])
    def test_keep_real_tokens(self, token):
        """Real words, dictionary punctuation, and phonetic brackets are kept."""
        assert _is_noise_tail_token(token) is False, f"Should keep {token!r}"

    # --- Tokens that should be FILTERED (return True) ---
    @pytest.mark.parametrize("token", [
        # Pure non-alpha
        "B|",
        "3d",
        "x7",
        ")",
        "|",
        "@",
        "3",
        # Very short non-dictionary fragments
        "ee",
        "k",
        "zz",
        "qq",
        # Empty
        "",
        " ",
    ])
    def test_filter_noise_tokens(self, token):
        """OCR noise fragments are filtered."""
        assert _is_noise_tail_token(token) is True, f"Should filter {token!r}"
class TestCleanCellText:
    """End-to-end checks for _clean_cell_text (raw cell text → cleaned text)."""

    def test_empty_returns_empty(self):
        """Empty and whitespace-only input is cleared."""
        assert _clean_cell_text("") == ""
        assert _clean_cell_text(" ") == ""

    def test_real_word_unchanged(self):
        """A plain real word passes through untouched."""
        assert _clean_cell_text("cupcakes") == "cupcakes"

    def test_strips_trailing_noise(self):
        """A trailing OCR-noise token is stripped."""
        assert _clean_cell_text("cupcakes B|") == "cupcakes"

    def test_keeps_trailing_real_word(self):
        """A trailing real word survives cleaning."""
        assert _clean_cell_text("big cat") == "big cat"

    def test_abbreviation_kept(self):
        """Known abbreviations are never cleared."""
        assert _clean_cell_text("e.g.") == "e.g."

    def test_pure_garbage_cleared(self):
        """Input consisting only of OCR garbage collapses to the empty string."""
        assert _clean_cell_text("3d |x") == ""

    def test_compound_word_preserved(self):
        """Hyphenated compounds stay intact."""
        assert _clean_cell_text("money-saver") == "money-saver"

    def test_parenthesized_word_preserved(self):
        """Dictionary-style parenthesized parts stay intact."""
        assert _clean_cell_text("(Salat-)Gurke") == "(Salat-)Gurke"

    def test_multiple_trailing_noise(self):
        """Several trailing noise tokens are all removed."""
        assert _clean_cell_text("achieve 3 |") == "achieve"
class TestPhoneticOnlyText:
    """Test _is_phonetic_only_text for phonetic transcription detection.

    True means the cell contains ONLY a phonetic transcription (IPA in
    square brackets) and should be merged into the previous row; any real
    word alongside the transcription makes it False.
    """

    @pytest.mark.parametrize("text,expected", [
        # Phonetic-only patterns → True
        ("['mani serva]", True),
        ("[dɑːns]", True),
        ("[\"a:mand]", True),
        ("['wɜːkʃɒp]", True),
        # serva] has 5 alpha chars after bracket removal → NOT phonetic-only
        ("serva]", False),
        # NOT phonetic-only → False
        ("almond ['a:mand]", False),
        ("Mandel", False),
        ("cupcakes", False),
        ("", False),
        ("achieve", False),
        ("money-saver ['mani]", False),
    ])
    def test_phonetic_detection(self, text, expected):
        assert _is_phonetic_only_text(text) is expected, \
            f"_is_phonetic_only_text({text!r}) should be {expected}"
class TestMergePhoneticContinuationRows:
    """Tests for _merge_phonetic_continuation_rows (folding phonetic rows)."""

    @staticmethod
    def _row(en, de, ex, idx=None):
        # Build a vocab-entry dict; row_index is only set when a test needs it.
        entry = {"english": en, "german": de, "example": ex}
        if idx is not None:
            entry["row_index"] = idx
        return entry

    def test_empty_list(self):
        """No entries in means no entries out."""
        assert _merge_phonetic_continuation_rows([]) == []

    def test_single_entry(self):
        """A single entry passes through unchanged."""
        merged = _merge_phonetic_continuation_rows([self._row("cat", "Katze", "")])
        assert len(merged) == 1
        assert merged[0]["english"] == "cat"

    def test_merges_phonetic_row(self):
        """A phonetic-only row folds into the preceding entry's EN text."""
        merged = _merge_phonetic_continuation_rows([
            self._row("money-saver", "Sparfuchs", "", 0),
            self._row("['mani serva]", "", "", 1),
        ])
        assert len(merged) == 1
        assert merged[0]["english"] == "money-saver ['mani serva]"
        assert merged[0]["german"] == "Sparfuchs"

    def test_no_merge_when_de_present(self):
        """A row carrying DE text is kept even if its EN looks phonetic."""
        merged = _merge_phonetic_continuation_rows([
            self._row("cat", "Katze", ""),
            self._row("[kæt]", "some text", ""),
        ])
        assert len(merged) == 2

    def test_no_merge_regular_rows(self):
        """Two ordinary vocab rows stay separate."""
        merged = _merge_phonetic_continuation_rows([
            self._row("cat", "Katze", ""),
            self._row("dog", "Hund", ""),
        ])
        assert len(merged) == 2

    def test_merges_example_too(self):
        """Example text on a phonetic row also folds into the previous entry."""
        merged = _merge_phonetic_continuation_rows([
            self._row("dance", "tanzen", "", 0),
            self._row("[dɑːns]", "", "Let's dance.", 1),
        ])
        assert len(merged) == 1
        assert merged[0]["english"] == "dance [dɑːns]"
        assert merged[0]["example"] == "Let's dance."
class TestMergeContinuationRows:
    """Test _merge_continuation_rows for multi-line entry merging.

    A row is treated as a continuation of the previous entry when its EN text
    starts lowercase, its DE cell is empty, the text is short (< 4 words),
    and the previous entry does not end with a sentence terminator — the
    tests below pin each of those conditions individually.
    """

    def test_empty_list(self):
        assert _merge_continuation_rows([]) == []

    def test_no_merge_independent_rows(self):
        """Rows with both EN and DE should not be merged."""
        entries = [
            {"english": "cat", "german": "Katze", "example": "", "row_index": 0},
            {"english": "dog", "german": "Hund", "example": "", "row_index": 1},
        ]
        result = _merge_continuation_rows(entries)
        assert len(result) == 2

    def test_merge_lowercase_continuation(self):
        """Lowercase EN with empty DE should merge into previous."""
        entries = [
            {"english": "to put up", "german": "aufstellen", "example": "", "row_index": 0},
            {"english": "with sth.", "german": "", "example": "", "row_index": 1},
        ]
        result = _merge_continuation_rows(entries)
        assert len(result) == 1
        assert result[0]["english"] == "to put up with sth."
        assert result[0]["german"] == "aufstellen"

    def test_no_merge_uppercase_start(self):
        """EN starting with uppercase and empty DE is likely its own entry, not a continuation."""
        entries = [
            {"english": "cat", "german": "Katze", "example": "", "row_index": 0},
            {"english": "Dog", "german": "", "example": "", "row_index": 1},
        ]
        result = _merge_continuation_rows(entries)
        assert len(result) == 2

    def test_no_merge_when_previous_ends_with_period(self):
        """If previous entry ends with sentence terminator, next is not continuation."""
        entries = [
            {"english": "That's great.", "german": "Das ist toll.", "example": "", "row_index": 0},
            {"english": "really nice", "german": "", "example": "", "row_index": 1},
        ]
        result = _merge_continuation_rows(entries)
        assert len(result) == 2

    def test_no_merge_long_text(self):
        """Text with 4+ words is likely an example sentence, not continuation."""
        entries = [
            {"english": "achieve", "german": "erreichen", "example": "", "row_index": 0},
            {"english": "she achieved her goals", "german": "", "example": "", "row_index": 1},
        ]
        result = _merge_continuation_rows(entries)
        assert len(result) == 2

    def test_first_entry_not_merged(self):
        """First entry with empty DE should not crash (no previous)."""
        entries = [
            {"english": "something", "german": "", "example": "", "row_index": 0},
            {"english": "cat", "german": "Katze", "example": "", "row_index": 1},
        ]
        result = _merge_continuation_rows(entries)
        assert len(result) == 2
# =============================================
# Test: Content-Bounds Scan-Artifact Filtering
# =============================================
class TestContentBoundsFiltering:
    """Test that _find_content_bounds filters narrow scan artifacts.

    Thin (few-pixel) lines at image edges are scanner shadows/fold marks;
    the bounds computation must ignore them so margins are measured against
    the real content block, not the artifact.
    """

    def test_thin_vertical_line_ignored(self):
        """A 2px black line at the left edge should not pull left_x leftward."""
        inv = np.zeros((400, 600), dtype=np.uint8)
        # Main content block in the middle
        inv[50:350, 100:550] = 255
        # 2px thin vertical scan artifact at x=5..6
        inv[50:350, 5:7] = 255
        left, right, top, bottom = _find_content_bounds(inv)
        # left_x must be near 100 (the real content), not near 5
        assert left >= 90, f"left_x={left} should be >=90 (near real content, not artifact)"

    def test_thick_content_preserved(self):
        """A 50px wide text block is real content and must not be filtered."""
        inv = np.zeros((400, 600), dtype=np.uint8)
        inv[50:350, 80:130] = 255  # 50px wide block
        inv[50:350, 200:500] = 255  # wider block
        left, right, top, bottom = _find_content_bounds(inv)
        assert left <= 82, f"left_x={left} should be <=82 (50px block is real content)"

    def test_no_artifacts_unchanged(self):
        """Normal image without artifacts: bounds should match content."""
        inv = np.zeros((400, 600), dtype=np.uint8)
        inv[100:300, 50:550] = 255
        left, right, top, bottom = _find_content_bounds(inv)
        assert left <= 52
        assert right >= 548
        assert top <= 105
        assert bottom >= 295

    def test_right_edge_artifact_ignored(self):
        """A thin vertical line at the right edge should not pull right_x rightward."""
        inv = np.zeros((400, 600), dtype=np.uint8)
        inv[50:350, 50:500] = 255  # real content
        inv[50:350, 595:598] = 255  # 3px artifact at right edge
        left, right, top, bottom = _find_content_bounds(inv)
        assert right <= 510, f"right_x={right} should be <=510, ignoring right-edge artifact"

    def test_horizontal_line_ignored(self):
        """A thin horizontal line at the top should not pull top_y upward."""
        inv = np.zeros((400, 600), dtype=np.uint8)
        inv[100:350, 50:550] = 255  # real content
        inv[2:4, 50:550] = 255  # 2px horizontal artifact at top
        left, right, top, bottom = _find_content_bounds(inv)
        assert top >= 90, f"top_y={top} should be >=90 (ignoring thin top line)"
class TestFilterNarrowRuns:
    """Direct tests for the _filter_narrow_runs mask helper."""

    def test_removes_short_run(self):
        """Runs narrower than min_width are zeroed; wider runs survive."""
        mask = np.array([False, True, True, False,
                         True, True, True, True, True, False])
        filtered = _filter_narrow_runs(mask, min_width=3)
        # The 2-wide run (indices 1-2) must be dropped...
        assert not filtered[1]
        assert not filtered[2]
        # ...while the 5-wide run (indices 4-8) remains.
        assert filtered[4]
        assert filtered[8]

    def test_keeps_wide_run(self):
        """A single run spanning the whole mask is untouched."""
        filtered = _filter_narrow_runs(np.full(10, True), min_width=5)
        assert all(filtered)

    def test_all_narrow(self):
        """If every run is below min_width, the result is all-False."""
        narrow_mask = np.array([True, True, False, True, False])
        assert not any(_filter_narrow_runs(narrow_mask, min_width=3))
# =============================================
# Test: Margin Regions
# =============================================
class TestMarginRegions:
    """Behavioral tests for _build_margin_regions and its integration."""

    def test_margin_left_created(self):
        """left_x > 5 must yield exactly one margin_left spanning [0, left_x)."""
        cols = [
            PageRegion(type='column_en', x=100, y=50, width=200, height=300),
            PageRegion(type='column_de', x=320, y=50, width=200, height=300),
        ]
        produced = _build_margin_regions(cols, left_x=100, right_x=520,
                                         img_w=600, top_y=50, content_h=300)
        lefts = [r for r in produced if r.type == 'margin_left']
        assert len(lefts) == 1
        assert lefts[0].x == 0
        assert lefts[0].width == 100

    def test_margin_right_created(self):
        """Free space after the last column becomes a margin_right region."""
        cols = [
            PageRegion(type='column_en', x=50, y=50, width=200, height=300),
            PageRegion(type='column_de', x=260, y=50, width=200, height=300),
        ]
        # Last column ends at 260 + 200 = 460; image is 600 wide → 140px gap.
        produced = _build_margin_regions(cols, left_x=50, right_x=460,
                                         img_w=600, top_y=50, content_h=300)
        rights = [r for r in produced if r.type == 'margin_right']
        assert len(rights) == 1
        assert rights[0].x == 460
        assert rights[0].width == 140

    def test_no_margin_when_flush(self):
        """Columns touching both image edges produce no margin regions."""
        cols = [
            PageRegion(type='column_en', x=0, y=0, width=300, height=400),
            PageRegion(type='column_de', x=300, y=0, width=300, height=400),
        ]
        produced = _build_margin_regions(cols, left_x=0, right_x=600,
                                         img_w=600, top_y=0, content_h=400)
        assert not produced

    def test_margins_in_skip_types(self):
        """The skip set used by build_cell_grid must contain both margin types."""
        skip = {'column_ignore', 'header', 'footer', 'margin_top', 'margin_bottom', 'page_ref', 'margin_left', 'margin_right'}
        assert 'margin_left' in skip
        assert 'margin_right' in skip

    def test_margin_confidence_and_method(self):
        """Every margin region carries confidence 1.0 / method 'content_bounds'."""
        cols = [PageRegion(type='column_en', x=80, y=20, width=400, height=500)]
        for region in _build_margin_regions(cols, left_x=80, right_x=480,
                                            img_w=600, top_y=20, content_h=500):
            assert region.classification_confidence == 1.0
            assert region.classification_method == 'content_bounds'
# =============================================
# Header/Footer Gap Detection
# =============================================
class TestHeaderFooterGapDetection:
"""Tests for _detect_header_footer_gaps()."""
def _make_inv(self, height: int, width: int, bands: list) -> np.ndarray:
"""Create an inverted binary image with white horizontal bands.
Args:
height: Image height.
width: Image width.
bands: List of (y_start, y_end) tuples where pixels are white (255).
"""
inv = np.zeros((height, width), dtype=np.uint8)
for y1, y2 in bands:
inv[y1:y2, :] = 255
return inv
def _make_body_with_lines(self, h, w, body_start, body_end,
line_h=15, gap_h=12):
"""Create bands simulating text lines with inter-line gaps.
gap_h must be large enough to survive smoothing (kernel ~ h//200).
"""
bands = []
y = body_start
while y + line_h <= body_end:
bands.append((y, y + line_h))
y += line_h + gap_h
return bands
def test_header_gap_detected(self):
"""Content at top + large gap + main body → header_y at the gap."""
h, w = 2000, 800
# Header content at rows 20-80
bands = [(20, 80)]
# Large gap 80-300 (220px) — much larger than 12px line gaps
# Body lines from 300 to ~1990 (extends near bottom, no footer gap)
bands += self._make_body_with_lines(h, w, 300, 1990)
inv = self._make_inv(h, w, bands)
header_y, footer_y = _detect_header_footer_gaps(inv, w, h)
assert header_y is not None
assert 80 <= header_y <= 310
def test_footer_gap_detected(self):
"""Main body + large gap + page number → footer_y at the gap."""
h, w = 2000, 800
# Body lines from 10 to 1600 (starts near top, no header gap)
bands = self._make_body_with_lines(h, w, 10, 1600)
# Large gap 1600-1880 (280px)
# Page number 1880-1920
bands.append((1880, 1920))
inv = self._make_inv(h, w, bands)
header_y, footer_y = _detect_header_footer_gaps(inv, w, h)
assert footer_y is not None
assert 1580 <= footer_y <= 1890
def test_both_header_and_footer(self):
"""Header + gap + body lines + gap + footer → both detected."""
h, w = 2000, 800
# Header 10-60
bands = [(10, 60)]
# Large gap 60-250 (190px)
# Body lines from 250 to 1700
bands += self._make_body_with_lines(h, w, 250, 1700)
# Large gap 1700-1900 (200px)
# Footer 1900-1970
bands.append((1900, 1970))
inv = self._make_inv(h, w, bands)
header_y, footer_y = _detect_header_footer_gaps(inv, w, h)
assert header_y is not None
assert footer_y is not None
assert 60 <= header_y <= 260
assert 1690 <= footer_y <= 1910
def test_no_gaps_returns_none(self):
"""Uniform content across the page → (None, None)."""
h, w = 1000, 800
# Content across entire height
inv = self._make_inv(h, w, [(0, 1000)])
header_y, footer_y = _detect_header_footer_gaps(inv, w, h)
assert header_y is None
assert footer_y is None
def test_small_gaps_ignored(self):
"""Gaps smaller than 2x median should be ignored."""
h, w = 1000, 800
# Many small, evenly-spaced gaps (like line spacing) — no large outlier
bands = []
for row_start in range(0, 1000, 20):
bands.append((row_start, row_start + 15)) # 15px content, 5px gap
inv = self._make_inv(h, w, bands)
header_y, footer_y = _detect_header_footer_gaps(inv, w, h)
# All gaps are equal size, none > 2x median → no header/footer
assert header_y is None
assert footer_y is None
def test_edge_gaps_ignored_dewarp_padding(self):
"""Trailing gap at bottom edge (dewarp padding) should not be detected as footer."""
h, w = 2000, 800
# Body lines from 10 to 1700
bands = self._make_body_with_lines(h, w, 10, 1700)
# Gap from 1700 to 2000 = bottom edge padding (no content after)
inv = self._make_inv(h, w, bands)
header_y, footer_y = _detect_header_footer_gaps(inv, w, h)
# The trailing gap touches the image edge → not a valid separator
assert footer_y is None
class TestRegionContentCheck:
"""Tests for _region_has_content() and _add_header_footer() type selection."""
def _make_inv(self, height: int, width: int, bands: list) -> np.ndarray:
inv = np.zeros((height, width), dtype=np.uint8)
for y1, y2 in bands:
inv[y1:y2, :] = 255
return inv
def test_region_with_text_has_content(self):
"""Strip with ink → True."""
inv = self._make_inv(1000, 800, [(10, 50)])
assert _region_has_content(inv, 0, 100) is True
def test_empty_region_no_content(self):
"""Strip without ink → False."""
inv = self._make_inv(1000, 800, [(500, 600)])
assert _region_has_content(inv, 0, 100) is False
def test_header_with_text_is_header(self):
"""Top region with text → type='header' (via content bounds fallback)."""
h, w = 1000, 800
# Header text at 20-60, body starts at 200
inv = self._make_inv(h, w, [(20, 60), (200, 900)])
regions: list = []
# Simulate content bounds detecting body start at y=200
_add_header_footer(regions, top_y=200, bottom_y=h, img_w=w, img_h=h, inv=inv)
top_regions = [r for r in regions if r.type in ('header', 'margin_top')]
assert len(top_regions) == 1
assert top_regions[0].type == 'header' # text at 20-60 → header
def test_empty_top_is_margin_top(self):
"""Top region without text → type='margin_top'."""
h, w = 1000, 800
# Content only in body area (200-900), nothing in top 200px
inv = self._make_inv(h, w, [(200, 900)])
regions: list = []
# Simulate top_y=200 from content bounds
_add_header_footer(regions, top_y=200, bottom_y=h, img_w=w, img_h=h, inv=inv)
top_regions = [r for r in regions if r.type in ('header', 'margin_top')]
assert len(top_regions) == 1
assert top_regions[0].type == 'margin_top'
def test_empty_bottom_is_margin_bottom(self):
"""Bottom region without text → type='margin_bottom'."""
h, w = 1000, 800
# Content only in top/body (50-700), nothing below 700
inv = self._make_inv(h, w, [(50, 700)])
regions: list = []
_add_header_footer(regions, top_y=50, bottom_y=700, img_w=w, img_h=h, inv=inv)
bottom_regions = [r for r in regions if r.type in ('footer', 'margin_bottom')]
assert len(bottom_regions) == 1
assert bottom_regions[0].type == 'margin_bottom'
def test_footer_with_page_number_is_footer(self):
"""Bottom region with page number text → type='footer'."""
h, w = 1000, 800
# Body 50-700, page number at 900-930
inv = self._make_inv(h, w, [(50, 700), (900, 930)])
regions: list = []
_add_header_footer(regions, top_y=50, bottom_y=700, img_w=w, img_h=h, inv=inv)
bottom_regions = [r for r in regions if r.type in ('footer', 'margin_bottom')]
assert len(bottom_regions) == 1
assert bottom_regions[0].type == 'footer'
# =============================================
# Sub-Column Detection Tests
# =============================================
class TestSubColumnDetection:
"""Tests for _detect_sub_columns() left-edge alignment detection."""
def _make_word(self, left: int, text: str = "word", conf: int = 90) -> dict:
return {'left': left, 'top': 100, 'width': 50, 'height': 20,
'text': text, 'conf': conf}
def _make_geo(self, x: int, width: int, words: list, content_w: int = 1000) -> ColumnGeometry:
return ColumnGeometry(
index=0, x=x, y=50, width=width, height=500,
word_count=len(words), words=words,
width_ratio=width / content_w,
)
def test_sub_column_split_page_refs(self):
"""3 page-refs left + 40 vocab words right → split into 2.
The leftmost bin with >= 10% of words (>= 5) is the vocab bin
at left=250, so the 3 page-refs are outliers.
"""
content_w = 1000
page_words = [self._make_word(100, f"p.{59+i}") for i in range(3)]
vocab_words = [self._make_word(250, f"word{i}") for i in range(40)]
all_words = page_words + vocab_words
geo = self._make_geo(x=80, width=300, words=all_words, content_w=content_w)
result = _detect_sub_columns([geo], content_w)
assert len(result) == 2, f"Expected 2 columns, got {len(result)}"
left_col = result[0]
right_col = result[1]
assert left_col.x < right_col.x
assert left_col.word_count == 3
assert right_col.word_count == 40
assert left_col.index == 0
assert right_col.index == 1
def test_sub_column_split_exclamation_marks(self):
"""5 '!' (misread as I/|) left + 80 example words → split into 2.
Mirrors the real-world case where red ! marks are OCR'd as I, |, B, 1
at a position slightly left of the example sentence start.
"""
content_w = 1500
bang_words = [self._make_word(950 + i, chr(ord('I')), conf=60) for i in range(5)]
example_words = [self._make_word(975 + (i * 3), f"word{i}") for i in range(80)]
all_words = bang_words + example_words
geo = self._make_geo(x=940, width=530, words=all_words, content_w=content_w)
result = _detect_sub_columns([geo], content_w)
assert len(result) == 2
assert result[0].word_count == 5
assert result[1].word_count == 80
def test_no_split_uniform_alignment(self):
"""All words aligned at same position → no change."""
content_w = 1000
words = [self._make_word(200, f"word{i}") for i in range(15)]
geo = self._make_geo(x=180, width=300, words=words, content_w=content_w)
result = _detect_sub_columns([geo], content_w)
assert len(result) == 1
assert result[0].word_count == 15
def test_no_split_narrow_column(self):
"""Narrow column (width_ratio < 0.15) → no split attempted."""
content_w = 1000
words = [self._make_word(50, "a")] * 3 + [self._make_word(120, "b")] * 10
geo = self._make_geo(x=40, width=140, words=words, content_w=content_w)
result = _detect_sub_columns([geo], content_w)
assert len(result) == 1
def test_no_split_balanced_clusters(self):
"""Both clusters similarly sized (ratio >= 0.35) → no split."""
content_w = 1000
left_words = [self._make_word(100, f"a{i}") for i in range(8)]
right_words = [self._make_word(300, f"b{i}") for i in range(12)]
all_words = left_words + right_words
geo = self._make_geo(x=80, width=400, words=all_words, content_w=content_w)
result = _detect_sub_columns([geo], content_w)
assert len(result) == 1
def test_sub_column_reindexing(self):
"""After split, indices are correctly 0, 1, 2 across all columns."""
content_w = 1000
# First column: no split (all words at same alignment)
words1 = [self._make_word(50, f"de{i}") for i in range(10)]
geo1 = ColumnGeometry(index=0, x=30, y=50, width=200, height=500,
word_count=10, words=words1, width_ratio=0.2)
# Second column: will split (3 outliers + 40 main)
page_words = [self._make_word(400, f"p.{i}") for i in range(3)]
en_words = [self._make_word(550, f"en{i}") for i in range(40)]
geo2 = ColumnGeometry(index=1, x=380, y=50, width=300, height=500,
word_count=43, words=page_words + en_words, width_ratio=0.3)
result = _detect_sub_columns([geo1, geo2], content_w)
assert len(result) == 3
assert [g.index for g in result] == [0, 1, 2]
assert result[0].word_count == 10
assert result[1].word_count == 3
assert result[2].word_count == 40
def test_no_split_too_few_words(self):
"""Column with fewer than 5 words → no split attempted."""
content_w = 1000
words = [self._make_word(100, "a"), self._make_word(300, "b"),
self._make_word(300, "c"), self._make_word(300, "d")]
geo = self._make_geo(x=80, width=300, words=words, content_w=content_w)
result = _detect_sub_columns([geo], content_w)
assert len(result) == 1
def test_no_split_single_minority_word(self):
"""Only 1 word left of column start → no split (need >= 2)."""
content_w = 1000
minority = [self._make_word(100, "p.59")]
majority = [self._make_word(300, f"w{i}") for i in range(30)]
geo = self._make_geo(x=80, width=350, words=minority + majority, content_w=content_w)
result = _detect_sub_columns([geo], content_w)
assert len(result) == 1
def test_sub_column_split_with_left_x_offset(self):
"""Word 'left' values are relative to left_x; geo.x is absolute.
Real-world scenario: left_x=195, EN column at geo.x=310.
Page refs at relative left=115-157, vocab words at relative left=216.
Without left_x, split_x would be ~202 (< geo.x=310) → negative width → no split.
With left_x=195, split_abs = 202 + 195 = 397, which is between geo.x(310)
and geo.x+geo.width(748) → valid split.
"""
content_w = 1469
left_x = 195
page_refs = [self._make_word(115, "p.59"), self._make_word(157, "p.60"),
self._make_word(157, "p.61")]
vocab = [self._make_word(216, f"word{i}") for i in range(40)]
all_words = page_refs + vocab
geo = self._make_geo(x=310, width=438, words=all_words, content_w=content_w)
result = _detect_sub_columns([geo], content_w, left_x=left_x)
assert len(result) == 2, f"Expected 2 columns, got {len(result)}"
assert result[0].word_count == 3
assert result[1].word_count == 40
def test_header_words_excluded_from_alignment(self):
"""Header words (top < header_y) should not participate in alignment clustering.
Without header_y: 3 header words at left=100 + 40 content words at left=250
would cause a split (3 outliers vs 40 main).
With header_y: the 3 header words are excluded from clustering, leaving only
40 uniform words at left=250 → no split.
"""
content_w = 1000
top_y = 0
# Header words: top=5 (relative to top_y=0), well above header_y=50
header_words = [{'left': 100, 'top': 5, 'width': 50, 'height': 20,
'text': f"Ch.{i}", 'conf': 90} for i in range(3)]
# Content words: top=200, below header_y=50
content_words = [{'left': 250, 'top': 200, 'width': 50, 'height': 20,
'text': f"word{i}", 'conf': 90} for i in range(40)]
all_words = header_words + content_words
geo = self._make_geo(x=80, width=300, words=all_words, content_w=content_w)
# Without header_y: split happens (3 outliers at left=100)
result_no_filter = _detect_sub_columns([geo], content_w)
assert len(result_no_filter) == 2, "Should split without header filtering"
# With header_y=50: header words excluded, only 40 uniform words remain → no split
result_filtered = _detect_sub_columns([geo], content_w, top_y=top_y, header_y=50)
assert len(result_filtered) == 1, "Should NOT split with header words excluded"
assert result_filtered[0].word_count == 43 # all words still in the geometry
def test_footer_words_excluded_from_alignment(self):
"""Footer words (top > footer_y) should not participate in alignment clustering.
Analog to header test but with footer words at the bottom.
"""
content_w = 1000
top_y = 0
# Content words: top=200, above footer_y=800
content_words = [{'left': 250, 'top': 200, 'width': 50, 'height': 20,
'text': f"word{i}", 'conf': 90} for i in range(40)]
# Footer words: top=900, below footer_y=800
footer_words = [{'left': 100, 'top': 900, 'width': 50, 'height': 20,
'text': f"p.{i}", 'conf': 90} for i in range(3)]
all_words = content_words + footer_words
geo = self._make_geo(x=80, width=300, words=all_words, content_w=content_w)
# Without footer_y: split happens (3 outliers at left=100)
result_no_filter = _detect_sub_columns([geo], content_w)
assert len(result_no_filter) == 2, "Should split without footer filtering"
# With footer_y=800: footer words excluded → no split
result_filtered = _detect_sub_columns([geo], content_w, top_y=top_y, footer_y=800)
assert len(result_filtered) == 1, "Should NOT split with footer words excluded"
assert result_filtered[0].word_count == 43
def test_header_footer_none_no_filtering(self):
"""header_y=None, footer_y=None → same behavior as before (no filtering)."""
content_w = 1000
page_words = [self._make_word(100, f"p.{59+i}") for i in range(3)]
vocab_words = [self._make_word(250, f"word{i}") for i in range(40)]
all_words = page_words + vocab_words
geo = self._make_geo(x=80, width=300, words=all_words, content_w=content_w)
result = _detect_sub_columns([geo], content_w, header_y=None, footer_y=None)
assert len(result) == 2, "Should still split with None header/footer"
assert result[0].word_count == 3
assert result[1].word_count == 40
class TestCellsToVocabEntriesPageRef:
    """Test that page_ref cells are mapped to source_page field."""

    @staticmethod
    def _cell(row, col_type, text, bbox, conf):
        """Build one OCR cell dict in the shape the cell grid emits."""
        return {
            'row_index': row,
            'col_type': col_type,
            'text': text,
            'bbox_pct': bbox,
            'confidence': conf,
            'ocr_engine': 'tesseract',
        }

    def test_page_ref_mapped_to_source_page(self):
        """Cell with col_type='page_ref' → source_page field populated."""
        from ocr.cv_pipeline import _cells_to_vocab_entries
        cells = [
            self._cell(0, 'column_en', 'hello',
                       {'x': 10, 'y': 10, 'w': 30, 'h': 5}, 95.0),
            self._cell(0, 'column_de', 'hallo',
                       {'x': 40, 'y': 10, 'w': 30, 'h': 5}, 90.0),
            self._cell(0, 'page_ref', 'p.59',
                       {'x': 5, 'y': 10, 'w': 5, 'h': 5}, 80.0),
        ]
        columns_meta = [
            {'type': 'column_en'}, {'type': 'column_de'}, {'type': 'page_ref'},
        ]
        entries = _cells_to_vocab_entries(cells, columns_meta)
        assert len(entries) == 1
        assert entries[0]['english'] == 'hello'
        assert entries[0]['german'] == 'hallo'
        assert entries[0]['source_page'] == 'p.59'
        assert entries[0]['bbox_ref'] == {'x': 5, 'y': 10, 'w': 5, 'h': 5}

    def test_no_page_ref_defaults_empty(self):
        """Without page_ref cell, source_page defaults to empty string."""
        from ocr.cv_pipeline import _cells_to_vocab_entries
        cells = [
            self._cell(0, 'column_en', 'world',
                       {'x': 10, 'y': 10, 'w': 30, 'h': 5}, 95.0),
        ]
        entries = _cells_to_vocab_entries(cells, [{'type': 'column_en'}])
        assert len(entries) == 1
        assert entries[0]['source_page'] == ''
        assert entries[0]['bbox_ref'] is None

    def test_marker_only_row_included(self):
        """Row with only a marker (no english/german/example) is kept."""
        from ocr.cv_pipeline import _cells_to_vocab_entries
        cells = [
            # Row 0: english text plus a marker.
            self._cell(0, 'column_en', 'hello',
                       {'x': 10, 'y': 10, 'w': 30, 'h': 5}, 95.0),
            self._cell(0, 'column_marker', '!',
                       {'x': 5, 'y': 10, 'w': 3, 'h': 5}, 80.0),
            # Row 1: marker only (no english/german/example).
            self._cell(1, 'column_en', '',
                       {'x': 10, 'y': 20, 'w': 30, 'h': 5}, 0.0),
            self._cell(1, 'column_marker', '!',
                       {'x': 5, 'y': 20, 'w': 3, 'h': 5}, 70.0),
            # Row 2: entirely empty → must be dropped.
            self._cell(2, 'column_en', '',
                       {'x': 10, 'y': 30, 'w': 30, 'h': 5}, 0.0),
            self._cell(2, 'column_marker', '',
                       {'x': 5, 'y': 30, 'w': 3, 'h': 5}, 0.0),
        ]
        columns_meta = [
            {'type': 'column_en'}, {'type': 'column_marker'},
        ]
        entries = _cells_to_vocab_entries(cells, columns_meta)
        # Rows 0 (text) and 1 (marker) survive; row 2 is excluded.
        assert len(entries) == 2
        assert entries[0]['english'] == 'hello'
        assert entries[0]['marker'] == '!'
        assert entries[1]['english'] == ''
        assert entries[1]['marker'] == '!'

    def test_page_ref_only_row_included(self):
        """Row with only source_page text is kept (no english/german/example)."""
        from ocr.cv_pipeline import _cells_to_vocab_entries
        cells = [
            self._cell(0, 'column_en', '',
                       {'x': 10, 'y': 10, 'w': 30, 'h': 5}, 0.0),
            self._cell(0, 'page_ref', 'p.59',
                       {'x': 5, 'y': 10, 'w': 5, 'h': 5}, 80.0),
        ]
        entries = _cells_to_vocab_entries(
            cells, [{'type': 'column_en'}, {'type': 'page_ref'}])
        assert len(entries) == 1
        assert entries[0]['source_page'] == 'p.59'
# =============================================
# CELL-FIRST OCR (v2) TESTS
# =============================================
class TestCleanCellTextLite:
    """Tests for _clean_cell_text_lite() — simplified noise filter."""

    def test_empty_string(self):
        assert _clean_cell_text_lite('') == ''

    def test_whitespace_only(self):
        assert _clean_cell_text_lite('   ') == ''

    def test_real_word_passes(self):
        assert _clean_cell_text_lite('hello') == 'hello'

    def test_sentence_passes(self):
        assert _clean_cell_text_lite('to have dinner') == 'to have dinner'

    def test_garbage_text_cleared(self):
        """Pure garbage (no dictionary words) comes back empty."""
        assert _clean_cell_text_lite('xqzjk') == ''

    def test_no_real_word_cleared(self):
        """Inputs lacking any real word (2+ letters) are cleared."""
        for noise in ('3', '|'):
            assert _clean_cell_text_lite(noise) == ''

    def test_known_abbreviation_kept(self):
        """Known abbreviations pass through untouched."""
        for abbrev in ('sth', 'eg'):
            assert _clean_cell_text_lite(abbrev) == abbrev

    def test_no_trailing_noise_stripping(self):
        """Unlike _clean_cell_text, lite does NOT strip trailing tokens:
        cells are isolated, so every token is legitimate."""
        assert _clean_cell_text_lite('apple tree') == 'apple tree'

    def test_page_reference(self):
        """Page references like p.60 survive ('p' is a known abbreviation)."""
        assert _clean_cell_text_lite('p.60') != ''
class TestOcrCellCrop:
    """Tests for _ocr_cell_crop() — isolated cell OCR."""

    def test_empty_cell_pixel_density(self):
        """An all-white crop (too few dark pixels) yields empty text."""
        blank = np.full((400, 600), 255, dtype=np.uint8)
        row = RowGeometry(index=0, x=0, y=50, width=600, height=30,
                          word_count=1, words=[{'text': 'a'}])
        col = PageRegion(type='column_en', x=50, y=0, width=200, height=400)
        cell = _ocr_cell_crop(
            0, 0, row, col, blank, None, 600, 400,
            'tesseract', 'eng+deu', {'column_en': 'eng'},
        )
        assert cell['text'] == ''
        assert cell['cell_id'] == 'R00_C0'
        assert cell['col_type'] == 'column_en'

    def test_zero_width_cell(self):
        """A column of width 0 yields empty text."""
        blank = np.full((400, 600), 255, dtype=np.uint8)
        row = RowGeometry(index=0, x=0, y=50, width=600, height=30,
                          word_count=1, words=[])
        col = PageRegion(type='column_en', x=50, y=0, width=0, height=400)
        cell = _ocr_cell_crop(
            0, 0, row, col, blank, None, 600, 400,
            'tesseract', 'eng+deu', {},
        )
        assert cell['text'] == ''

    def test_bbox_calculation(self):
        """bbox_px mirrors the row/column geometry; bbox_pct is scaled."""
        blank = np.full((1000, 2000), 255, dtype=np.uint8)
        row = RowGeometry(index=0, x=0, y=100, width=2000, height=50,
                          word_count=1, words=[{'text': 'test'}])
        col = PageRegion(type='column_de', x=400, y=0, width=600, height=1000)
        cell = _ocr_cell_crop(
            0, 0, row, col, blank, None, 2000, 1000,
            'tesseract', 'eng+deu', {'column_de': 'deu'},
        )
        assert cell['bbox_px'] == {'x': 400, 'y': 100, 'w': 600, 'h': 50}
        assert cell['bbox_pct']['x'] == 20.0  # 400 / 2000 * 100
        assert cell['bbox_pct']['y'] == 10.0  # 100 / 1000 * 100
class TestDetectDocumentType:
    """Tests for detect_document_type() — image-based classification."""

    def test_empty_image(self):
        """A 0x0 image falls back to the full_text / full_page default."""
        empty = np.zeros((0, 0), dtype=np.uint8)
        verdict = detect_document_type(empty, empty)
        assert verdict.doc_type == 'full_text'
        assert verdict.pipeline == 'full_page'

    def test_table_image_detected(self):
        """Three text columns separated by white gutters → table."""
        img = np.full((400, 600), 255, dtype=np.uint8)
        # Paint short text rows inside three column bands; the gutters
        # at x 170-210 and 370-410 stay white.
        for x_start, x_end in ((20, 170), (210, 370), (410, 580)):
            for y in range(30, 370, 20):
                img[y:y + 10, x_start:x_end] = 0
        bgr = np.stack([img] * 3, axis=-1)
        verdict = detect_document_type(img, bgr)
        assert verdict.doc_type in ('vocab_table', 'generic_table')
        assert verdict.pipeline == 'cell_first'
        assert verdict.confidence >= 0.5

    def test_fulltext_image_detected(self):
        """Full-width text lines with no column gutters → full_text."""
        img = np.full((400, 600), 255, dtype=np.uint8)
        for y in range(30, 370, 15):
            img[y:y + 8, 30:570] = 0
        bgr = np.stack([img] * 3, axis=-1)
        verdict = detect_document_type(img, bgr)
        assert verdict.doc_type == 'full_text'
        assert verdict.pipeline == 'full_page'
        assert 'columns' in verdict.skip_steps
        assert 'rows' in verdict.skip_steps

    def test_result_has_features(self):
        """Result must expose the debug feature dictionary."""
        img = np.full((200, 300), 255, dtype=np.uint8)
        bgr = np.stack([img] * 3, axis=-1)
        verdict = detect_document_type(img, bgr)
        for key in ('vertical_gaps', 'row_gaps', 'density_mean', 'density_std'):
            assert key in verdict.features

    def test_document_type_result_dataclass(self):
        """DocumentTypeResult defaults: empty skip_steps and features."""
        r = DocumentTypeResult(
            doc_type='vocab_table',
            confidence=0.9,
            pipeline='cell_first',
        )
        assert r.doc_type == 'vocab_table'
        assert r.skip_steps == []
        assert r.features == {}
# =============================================
# RUN TESTS
# =============================================
if __name__ == "__main__":
    # Allow running this module directly without the pytest CLI.
    pytest.main(["-v", __file__])