Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 24s
CI / test-go-edu-search (push) Successful in 27s
CI / test-python-klausur (push) Failing after 1m51s
CI / test-python-agent-core (push) Successful in 14s
CI / test-nodejs-website (push) Successful in 17s
Word 'left' values in ColumnGeometry.words are relative to the content ROI (left_x), but geo.x is in absolute image coordinates. The split position was computed from relative word positions and then compared against absolute geo.x, resulting in negative widths and no splits on real data. Pass left_x through to _detect_sub_columns to bridge the two coordinate systems. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1408 lines
54 KiB
Python
1408 lines
54 KiB
Python
"""
|
||
Unit Tests for CV Vocab Pipeline (cv_vocab_pipeline.py)
|
||
|
||
Tests cover:
|
||
- Data classes (PageRegion, VocabRow, PipelineResult)
|
||
- Stage 2: Deskew image
|
||
- Stage 3: Dewarp (pass-through)
|
||
- Stage 4: Image preparation (OCR + Layout images)
|
||
- Stage 5: Layout analysis (content bounds, projection profiles, column detection)
|
||
- Stage 6: Multi-pass OCR region handling
|
||
- Stage 7: Line grouping and vocabulary matching
|
||
- Noise filter functions (_is_noise_tail_token, _clean_cell_text)
|
||
- Phonetic detection (_is_phonetic_only_text)
|
||
- Phonetic & continuation row merging
|
||
- Orchestrator (run_cv_pipeline)
|
||
|
||
DSGVO Note: All tests run locally with synthetic data. No external API calls.
|
||
"""
|
||
|
||
import pytest
|
||
import numpy as np
|
||
from unittest.mock import AsyncMock, MagicMock, patch, PropertyMock
|
||
from dataclasses import asdict
|
||
|
||
# Import module under test
|
||
from cv_vocab_pipeline import (
|
||
ColumnGeometry,
|
||
PageRegion,
|
||
VocabRow,
|
||
PipelineResult,
|
||
deskew_image,
|
||
dewarp_image,
|
||
create_ocr_image,
|
||
create_layout_image,
|
||
_find_content_bounds,
|
||
_filter_narrow_runs,
|
||
_build_margin_regions,
|
||
_detect_header_footer_gaps,
|
||
_detect_sub_columns,
|
||
_region_has_content,
|
||
_add_header_footer,
|
||
analyze_layout,
|
||
_group_words_into_lines,
|
||
match_lines_to_vocab,
|
||
run_cv_pipeline,
|
||
CV2_AVAILABLE,
|
||
TESSERACT_AVAILABLE,
|
||
CV_PIPELINE_AVAILABLE,
|
||
_is_noise_tail_token,
|
||
_clean_cell_text,
|
||
_is_phonetic_only_text,
|
||
_merge_phonetic_continuation_rows,
|
||
_merge_continuation_rows,
|
||
)
|
||
|
||
|
||
# =============================================
|
||
# FIXTURES
|
||
# =============================================
|
||
|
||
@pytest.fixture
def white_image():
    """Provide a plain 300x200 white BGR image (every channel 255)."""
    return np.full((200, 300, 3), 255, dtype=np.uint8)
|
||
|
||
|
||
@pytest.fixture
def text_like_image():
    """Create a 600x400 image with dark text-like regions simulating 3 columns.

    Layout: text rows are 15px tall, repeated every 30px between y=50 and
    y=350. Columns are separated by ~40px white gutters so the projection
    profile in analyze_layout can find them.
    """
    img = np.ones((400, 600, 3), dtype=np.uint8) * 255

    # Column 1 (EN): x=20..170
    for y in range(50, 350, 30):
        img[y:y+15, 30:160, :] = 30  # Dark text lines

    # Gap between col1 and col2: x=170..210 (white)

    # Column 2 (DE): x=210..370
    for y in range(50, 350, 30):
        img[y:y+15, 220:360, :] = 30

    # Gap between col2 and col3: x=370..410 (white)

    # Column 3 (Example): x=410..580
    for y in range(50, 350, 30):
        img[y:y+15, 420:570, :] = 30

    return img
|
||
|
||
|
||
@pytest.fixture
def binary_image():
    """Provide a single-channel binary page for OCR tests.

    White (255) background with full-width black (0) text bands every 30px.
    """
    img = np.full((400, 600), 255, dtype=np.uint8)
    # Horizontal dark bands emulate lines of printed text.
    for row in range(50, 350, 30):
        img[row:row + 15, 30:570] = 0
    return img
|
||
|
||
|
||
@pytest.fixture
def sample_words_column_en():
    """Sample OCR word dicts for the English column.

    Shape mirrors pytesseract's image_to_data output plus a 'region_type'
    tag; one word per line at y = 50/80/110.
    """
    return [
        {'text': 'achieve', 'left': 30, 'top': 50, 'width': 80, 'height': 15, 'conf': 90, 'region_type': 'column_en'},
        {'text': 'improve', 'left': 30, 'top': 80, 'width': 80, 'height': 15, 'conf': 85, 'region_type': 'column_en'},
        {'text': 'success', 'left': 30, 'top': 110, 'width': 80, 'height': 15, 'conf': 92, 'region_type': 'column_en'},
    ]
|
||
|
||
|
||
@pytest.fixture
def sample_words_column_de():
    """Sample OCR word dicts for the German column.

    Tops (52/82/112) are 2px offset from the EN fixture so tests exercise
    y-tolerance matching rather than exact-y matching.
    """
    return [
        {'text': 'erreichen', 'left': 220, 'top': 52, 'width': 100, 'height': 15, 'conf': 88, 'region_type': 'column_de'},
        {'text': 'verbessern', 'left': 220, 'top': 82, 'width': 100, 'height': 15, 'conf': 80, 'region_type': 'column_de'},
        {'text': 'Erfolg', 'left': 220, 'top': 112, 'width': 100, 'height': 15, 'conf': 95, 'region_type': 'column_de'},
    ]
|
||
|
||
|
||
@pytest.fixture
def sample_words_column_ex():
    """Sample OCR word dicts for the Example column (one sentence, one line).

    NOTE(review): 'goals.' sits at left=420 (same x as 'She') rather than at
    the end of the sentence — presumably deliberate, to exercise x-sorting
    within a line; confirm against _group_words_into_lines expectations.
    """
    return [
        {'text': 'She', 'left': 420, 'top': 50, 'width': 30, 'height': 15, 'conf': 85, 'region_type': 'column_example'},
        {'text': 'achieved', 'left': 455, 'top': 50, 'width': 70, 'height': 15, 'conf': 80, 'region_type': 'column_example'},
        {'text': 'her', 'left': 530, 'top': 50, 'width': 30, 'height': 15, 'conf': 90, 'region_type': 'column_example'},
        {'text': 'goals.', 'left': 420, 'top': 52, 'width': 50, 'height': 15, 'conf': 75, 'region_type': 'column_example'},
    ]
|
||
|
||
|
||
@pytest.fixture
def sample_regions():
    """Sample 3-column PageRegion layout matching the word fixtures above."""
    return [
        PageRegion(type='column_en', x=0, y=50, width=190, height=300),
        PageRegion(type='column_de', x=210, y=50, width=160, height=300),
        PageRegion(type='column_example', x=410, y=50, width=190, height=300),
    ]
|
||
|
||
|
||
# =============================================
|
||
# DATA CLASS TESTS
|
||
# =============================================
|
||
|
||
class TestDataClasses:
    """Verify the pipeline data classes expose correct fields and defaults."""

    def test_page_region_creation(self):
        """All constructor arguments land on the matching attributes."""
        region = PageRegion(type='column_en', x=10, y=20, width=100, height=200)
        assert region.type == 'column_en'
        assert (region.x, region.y) == (10, 20)
        assert (region.width, region.height) == (100, 200)

    def test_vocab_row_defaults(self):
        """A bare VocabRow starts with empty strings and zeroed numerics."""
        row = VocabRow()
        assert (row.english, row.german, row.example) == ("", "", "")
        assert row.confidence == 0.0
        assert row.y_position == 0

    def test_vocab_row_with_values(self):
        """Explicit values are stored unchanged."""
        row = VocabRow(english="test", german="Test", example="A test.", confidence=85.5, y_position=100)
        assert row.english == "test"
        assert row.german == "Test"
        assert row.confidence == 85.5

    def test_pipeline_result_defaults(self):
        """A fresh PipelineResult is empty and error-free."""
        result = PipelineResult()
        assert result.vocabulary == []
        assert result.word_count == 0
        assert result.columns_detected == 0
        assert result.duration_seconds == 0.0
        assert result.stages == {}
        assert result.error is None

    def test_pipeline_result_error(self):
        """The error field round-trips through the constructor."""
        result = PipelineResult(error="Something went wrong")
        assert result.error == "Something went wrong"
|
||
|
||
|
||
# =============================================
|
||
# STAGE 2: DESKEW TESTS
|
||
# =============================================
|
||
|
||
@pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
class TestDeskew:
    """Test deskew (rotation correction) stage."""

    def test_deskew_straight_image(self, white_image):
        """A perfectly straight image should not be rotated."""
        corrected, angle = deskew_image(white_image)
        assert abs(angle) < 0.1  # uniform image has no detectable skew
        assert corrected.shape == white_image.shape

    def test_deskew_returns_tuple(self, white_image):
        """deskew_image must return (image, angle) tuple."""
        result = deskew_image(white_image)
        assert isinstance(result, tuple)
        assert len(result) == 2
        assert isinstance(result[0], np.ndarray)
        assert isinstance(result[1], float)

    def test_deskew_preserves_shape(self, text_like_image):
        """Output image should have same shape as input."""
        corrected, _ = deskew_image(text_like_image)
        assert corrected.shape == text_like_image.shape
|
||
|
||
|
||
# =============================================
|
||
# STAGE 3: DEWARP TESTS
|
||
# =============================================
|
||
|
||
@pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
class TestDewarp:
    """Test dewarp stage (returns (image, info) tuple)."""

    def test_dewarp_returns_tuple(self, white_image):
        """dewarp_image must return (image, dewarp_info) tuple."""
        result = dewarp_image(white_image)
        assert isinstance(result, tuple)
        assert len(result) == 2
        img_out, info = result
        assert isinstance(img_out, np.ndarray)
        assert isinstance(info, dict)
        assert "shear_degrees" in info  # contract: info always carries shear

    def test_dewarp_preserves_shape(self, text_like_image):
        """Output image should have same shape as input."""
        img_out, _ = dewarp_image(text_like_image)
        assert img_out.shape == text_like_image.shape

    def test_dewarp_white_image_no_correction(self, white_image):
        """A uniform white image should get no shear correction."""
        img_out, info = dewarp_image(white_image)
        assert abs(info["shear_degrees"]) < 0.5
        assert img_out.shape == white_image.shape
|
||
|
||
|
||
# =============================================
|
||
# STAGE 4: IMAGE PREPARATION TESTS
|
||
# =============================================
|
||
|
||
@pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
class TestImagePreparation:
    """Test OCR and layout image creation."""

    def test_create_ocr_image_returns_grayscale(self, text_like_image):
        """OCR image should be single-channel (binarized)."""
        ocr_img = create_ocr_image(text_like_image)
        assert len(ocr_img.shape) == 2  # Single channel
        assert ocr_img.dtype == np.uint8

    def test_create_ocr_image_is_binary(self, text_like_image):
        """OCR image should contain only 0 and 255 values."""
        ocr_img = create_ocr_image(text_like_image)
        unique_vals = np.unique(ocr_img)
        assert all(v in [0, 255] for v in unique_vals)

    def test_create_layout_image_returns_grayscale(self, text_like_image):
        """Layout image should be single-channel (CLAHE enhanced)."""
        layout_img = create_layout_image(text_like_image)
        assert len(layout_img.shape) == 2
        assert layout_img.dtype == np.uint8

    def test_create_layout_image_enhanced_contrast(self, text_like_image):
        """Layout image should have different histogram than simple grayscale."""
        import cv2
        gray = cv2.cvtColor(text_like_image, cv2.COLOR_BGR2GRAY)
        layout_img = create_layout_image(text_like_image)
        # CLAHE should change the histogram; only shape is asserted here
        assert layout_img.shape == gray.shape
|
||
|
||
|
||
# =============================================
|
||
# STAGE 5: LAYOUT ANALYSIS TESTS
|
||
# =============================================
|
||
|
||
@pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
class TestContentBounds:
    """Test _find_content_bounds helper (operates on an INVERTED image:
    content is white/255, background is black/0)."""

    def test_empty_image(self):
        """Fully empty (all-zero inverted) image should return full bounds."""
        inv = np.zeros((200, 300), dtype=np.uint8)
        left, right, top, bottom = _find_content_bounds(inv)
        # With no content, bounds should span the image
        assert left >= 0
        assert right <= 300
        assert top >= 0
        assert bottom <= 200

    def test_centered_content(self):
        """Content in center should give tight bounds."""
        inv = np.zeros((400, 600), dtype=np.uint8)
        # Add content block in center
        inv[100:300, 50:550] = 255
        left, right, top, bottom = _find_content_bounds(inv)
        assert left <= 52  # ~50 with 2px margin
        assert right >= 548  # ~550 with 2px margin
        assert top <= 102
        assert bottom >= 298
|
||
|
||
|
||
@pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
class TestLayoutAnalysis:
    """Test analyze_layout for column detection."""

    def test_returns_list_of_regions(self, text_like_image):
        """analyze_layout should return a list of PageRegion."""
        ocr_img = create_ocr_image(text_like_image)
        layout_img = create_layout_image(text_like_image)
        regions = analyze_layout(layout_img, ocr_img)
        assert isinstance(regions, list)
        assert all(isinstance(r, PageRegion) for r in regions)

    def test_detects_columns(self, text_like_image):
        """With clear 3-column image, should detect at least 1 column."""
        ocr_img = create_ocr_image(text_like_image)
        layout_img = create_layout_image(text_like_image)
        regions = analyze_layout(layout_img, ocr_img)
        column_regions = [r for r in regions if r.type.startswith('column')]
        assert len(column_regions) >= 1

    def test_single_column_fallback(self):
        """Image with no clear columns should fall back to single column."""
        # Uniform text across full width
        img = np.ones((400, 600, 3), dtype=np.uint8) * 255
        for y in range(50, 350, 20):
            img[y:y+10, 20:580, :] = 30  # Full-width text
        ocr_img = create_ocr_image(img)
        layout_img = create_layout_image(img)
        regions = analyze_layout(layout_img, ocr_img)
        column_regions = [r for r in regions if r.type.startswith('column')]
        # Should at least return 1 column (full page fallback)
        assert len(column_regions) >= 1

    def test_region_types_are_valid(self, text_like_image):
        """All region types should be from the expected set."""
        ocr_img = create_ocr_image(text_like_image)
        layout_img = create_layout_image(text_like_image)
        regions = analyze_layout(layout_img, ocr_img)
        valid_types = {'column_en', 'column_de', 'column_example',
                       'header', 'footer', 'margin_top', 'margin_bottom'}
        for r in regions:
            assert r.type in valid_types, f"Unexpected region type: {r.type}"
|
||
|
||
|
||
# =============================================
|
||
# STAGE 7: LINE GROUPING TESTS
|
||
# =============================================
|
||
|
||
class TestLineGrouping:
    """Exercise _group_words_into_lines (y-clustering + x-sorting of words)."""

    @staticmethod
    def _word(text, left, top, conf):
        """Build a minimal OCR word dict (all fixture words are 50x15)."""
        return {'text': text, 'left': left, 'top': top,
                'width': 50, 'height': 15, 'conf': conf}

    def test_empty_input(self):
        """Empty word list should return empty lines."""
        assert _group_words_into_lines([]) == []

    def test_single_word(self):
        """Single word should return one line with one word."""
        lines = _group_words_into_lines([self._word('hello', 10, 50, 90)])
        assert len(lines) == 1
        assert len(lines[0]) == 1
        assert lines[0][0]['text'] == 'hello'

    def test_words_on_same_line(self):
        """Words close in Y should be grouped into one line."""
        batch = [self._word('hello', 10, 50, 90),
                 self._word('world', 70, 52, 85)]
        lines = _group_words_into_lines(batch, y_tolerance_px=10)
        assert len(lines) == 1
        assert len(lines[0]) == 2

    def test_words_on_different_lines(self):
        """Words far apart in Y should be on different lines."""
        batch = [self._word('line1', 10, 50, 90),
                 self._word('line2', 10, 100, 85),
                 self._word('line3', 10, 150, 88)]
        assert len(_group_words_into_lines(batch, y_tolerance_px=20)) == 3

    def test_words_sorted_by_x_within_line(self):
        """Words within a line should be sorted by X position."""
        batch = [self._word('world', 100, 50, 85),
                 self._word('hello', 10, 52, 90)]
        lines = _group_words_into_lines(batch, y_tolerance_px=10)
        assert len(lines) == 1
        assert [w['text'] for w in lines[0]] == ['hello', 'world']
|
||
|
||
|
||
# =============================================
|
||
# STAGE 7: VOCABULARY MATCHING TESTS
|
||
# =============================================
|
||
|
||
class TestVocabMatching:
    """Test match_lines_to_vocab function (joins column OCR results by Y)."""

    def test_empty_results(self, sample_regions):
        """Empty OCR results should return empty vocab."""
        vocab = match_lines_to_vocab({}, sample_regions)
        assert vocab == []

    def test_en_only(self, sample_words_column_en, sample_regions):
        """Only EN words should create entries with empty DE/example."""
        ocr_results = {'column_en': sample_words_column_en}
        vocab = match_lines_to_vocab(ocr_results, sample_regions)
        assert len(vocab) == 3
        for row in vocab:
            assert row.english != ""
            assert row.german == ""

    def test_en_de_matching(self, sample_words_column_en, sample_words_column_de, sample_regions):
        """EN and DE words on same Y should be matched."""
        ocr_results = {
            'column_en': sample_words_column_en,
            'column_de': sample_words_column_de,
        }
        # DE tops are offset 2px from EN tops; 25px tolerance bridges that
        vocab = match_lines_to_vocab(ocr_results, sample_regions, y_tolerance_px=25)
        assert len(vocab) == 3
        # First entry should match achieve <-> erreichen
        assert vocab[0].english == 'achieve'
        assert vocab[0].german == 'erreichen'

    def test_full_3_column_matching(self, sample_words_column_en, sample_words_column_de,
                                    sample_words_column_ex, sample_regions):
        """All 3 columns should be matched by Y coordinate."""
        ocr_results = {
            'column_en': sample_words_column_en,
            'column_de': sample_words_column_de,
            'column_example': sample_words_column_ex,
        }
        vocab = match_lines_to_vocab(ocr_results, sample_regions, y_tolerance_px=25)
        assert len(vocab) >= 1
        # First entry should have example text
        assert vocab[0].english == 'achieve'
        assert vocab[0].example != ""

    def test_sorted_by_y_position(self, sample_words_column_en, sample_regions):
        """Result should be sorted by Y position."""
        ocr_results = {'column_en': sample_words_column_en}
        vocab = match_lines_to_vocab(ocr_results, sample_regions)
        positions = [row.y_position for row in vocab]
        assert positions == sorted(positions)

    def test_skips_short_entries(self, sample_regions):
        """Very short text (< 2 chars) should be skipped."""
        words = [
            {'text': 'a', 'left': 30, 'top': 50, 'width': 10, 'height': 15, 'conf': 90, 'region_type': 'column_en'},
            {'text': 'valid', 'left': 30, 'top': 80, 'width': 50, 'height': 15, 'conf': 90, 'region_type': 'column_en'},
        ]
        ocr_results = {'column_en': words}
        vocab = match_lines_to_vocab(ocr_results, sample_regions)
        assert len(vocab) == 1
        assert vocab[0].english == 'valid'

    def test_confidence_calculation(self, sample_words_column_en, sample_words_column_de, sample_regions):
        """Confidence should be the average of matched columns."""
        ocr_results = {
            'column_en': sample_words_column_en,
            'column_de': sample_words_column_de,
        }
        vocab = match_lines_to_vocab(ocr_results, sample_regions, y_tolerance_px=25)
        # First entry: EN conf=90, DE conf=88 → avg=89
        assert vocab[0].confidence > 0
        assert vocab[0].confidence == pytest.approx(89.0, abs=1.0)
|
||
|
||
|
||
# =============================================
|
||
# ORCHESTRATOR TESTS
|
||
# =============================================
|
||
|
||
class TestOrchestrator:
    """Test run_cv_pipeline orchestrator (end-to-end with mocked Tesseract)."""

    @pytest.mark.asyncio
    async def test_no_input_returns_error(self):
        """Pipeline without input should return error."""
        result = await run_cv_pipeline()
        assert result.error is not None
        assert "No input data" in result.error

    @pytest.mark.asyncio
    async def test_pipeline_unavailable(self):
        """When CV_PIPELINE_AVAILABLE is False, should return error."""
        with patch('cv_vocab_pipeline.CV_PIPELINE_AVAILABLE', False):
            result = await run_cv_pipeline(pdf_data=b"fake")
            assert result.error is not None
            assert "not available" in result.error

    @pytest.mark.asyncio
    @pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
    async def test_pipeline_with_image_data(self):
        """Pipeline with a real synthetic image should run without errors."""
        import cv2
        # Create a simple test image (white with some text-like black bars)
        img = np.ones((200, 300, 3), dtype=np.uint8) * 255
        for y in range(30, 170, 25):
            img[y:y+12, 20:280, :] = 30
        _, img_bytes = cv2.imencode('.png', img)
        image_data = img_bytes.tobytes()

        with patch('cv_vocab_pipeline.pytesseract') as mock_tess:
            # Mock Tesseract to return empty results (no OCR words)
            mock_tess.image_to_data.return_value = {
                'text': [], 'conf': [], 'left': [], 'top': [],
                'width': [], 'height': [],
            }
            mock_tess.Output.DICT = 'dict'

            result = await run_cv_pipeline(image_data=image_data)
            assert result.error is None
            assert result.image_width == 300
            assert result.image_height == 200
            assert 'render' in result.stages
            assert 'deskew' in result.stages

    @pytest.mark.asyncio
    @pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
    async def test_pipeline_records_timing(self):
        """Pipeline should record timing for each stage."""
        import cv2
        img = np.ones((100, 150, 3), dtype=np.uint8) * 255
        _, img_bytes = cv2.imencode('.png', img)

        with patch('cv_vocab_pipeline.pytesseract') as mock_tess:
            mock_tess.image_to_data.return_value = {
                'text': [], 'conf': [], 'left': [], 'top': [],
                'width': [], 'height': [],
            }
            mock_tess.Output.DICT = 'dict'

            result = await run_cv_pipeline(image_data=img_bytes.tobytes())
            assert result.duration_seconds >= 0
            assert all(v >= 0 for v in result.stages.values())

    @pytest.mark.asyncio
    async def test_pipeline_result_format(self):
        """PipelineResult vocabulary should be list of dicts with expected keys."""
        result = PipelineResult()
        result.vocabulary = [
            {"english": "test", "german": "Test", "example": "A test.", "confidence": 90.0}
        ]
        assert len(result.vocabulary) == 1
        entry = result.vocabulary[0]
        assert "english" in entry
        assert "german" in entry
        assert "example" in entry
        assert "confidence" in entry
|
||
|
||
|
||
# =============================================
|
||
# INTEGRATION-STYLE TESTS (with mocked Tesseract)
|
||
# =============================================
|
||
|
||
@pytest.mark.skipif(not CV2_AVAILABLE, reason="OpenCV not available")
class TestStageIntegration:
    """Chain multiple stages together (still unit-level; OCR stays mocked)."""

    def test_image_prep_to_layout(self, text_like_image):
        """Stages 4→5: image prep output feeds layout analysis correctly."""
        prepared_ocr = create_ocr_image(text_like_image)
        prepared_layout = create_layout_image(text_like_image)

        expected_hw = text_like_image.shape[:2]
        assert prepared_ocr.shape[:2] == expected_hw
        assert prepared_layout.shape[:2] == expected_hw

        detected = analyze_layout(prepared_layout, prepared_ocr)
        assert len(detected) >= 1

    def test_deskew_to_image_prep(self, text_like_image):
        """Stages 2→4: deskew output can be processed by image prep."""
        straightened, _angle = deskew_image(text_like_image)
        expected_hw = straightened.shape[:2]
        assert create_ocr_image(straightened).shape[:2] == expected_hw
        assert create_layout_image(straightened).shape[:2] == expected_hw
|
||
|
||
|
||
# =============================================
|
||
# NOISE FILTER TESTS
|
||
# =============================================
|
||
|
||
class TestNoiseFilter:
    """Test _is_noise_tail_token for trailing OCR noise detection."""

    # --- Tokens that should be KEPT (return False) ---

    @pytest.mark.parametrize("token", [
        # Compound words with hyphens
        "money-saver",
        "under-",
        "well-known",
        # Words with parenthesized parts (dictionary entries)
        "Schild(chen)",
        "(Salat-)Gurke",
        "(auf)",
        "(on)",
        "selbst)",
        "(wir",
        "Tanz(veranstaltung)",
        "(zer)brechen",
        # Phonetic brackets
        "serva]",
        "['mani",
        "[eg]",
        "[maus]",
        # Words with trailing punctuation
        "cupcakes.",
        "sister.",
        "mice",
        # Abbreviations
        "e.g.",
        "sth.",
        "usw.",
        "adj.",
        # Ellipsis
        "...",
        "\u2026",
        # Regular words
        "the",
        "cat",
        "big",
        "run",
        "set",
        "ago",
    ])
    def test_keep_real_tokens(self, token):
        """Real words, dictionary punctuation, and phonetic brackets are kept."""
        assert _is_noise_tail_token(token) is False, f"Should keep {token!r}"

    # --- Tokens that should be FILTERED (return True) ---

    @pytest.mark.parametrize("token", [
        # Pure non-alpha / digit-letter fragments
        "B|",
        "3d",
        "x7",
        ")",
        "|",
        "@",
        "3",
        # Very short non-dictionary fragments
        "ee",
        "k",
        "zz",
        "qq",
        # Empty / whitespace-only
        "",
        " ",
    ])
    def test_filter_noise_tokens(self, token):
        """OCR noise fragments are filtered."""
        assert _is_noise_tail_token(token) is True, f"Should filter {token!r}"
|
||
|
||
|
||
class TestCleanCellText:
    """Integration-level checks for _clean_cell_text (raw cell → cleaned text)."""

    def test_empty_returns_empty(self):
        """Empty or whitespace-only input yields an empty string."""
        assert _clean_cell_text("") == ""
        assert _clean_cell_text(" ") == ""

    def test_real_word_unchanged(self):
        """A plain real word passes through untouched."""
        assert _clean_cell_text("cupcakes") == "cupcakes"

    def test_strips_trailing_noise(self):
        """Trailing noise tokens should be removed."""
        assert _clean_cell_text("cupcakes B|") == "cupcakes"

    def test_keeps_trailing_real_word(self):
        """Trailing real words should be kept."""
        assert _clean_cell_text("big cat") == "big cat"

    def test_abbreviation_kept(self):
        """Known abbreviations should not be cleared."""
        assert _clean_cell_text("e.g.") == "e.g."

    def test_pure_garbage_cleared(self):
        """OCR garbage without real words should be cleared."""
        assert _clean_cell_text("3d |x") == ""

    def test_compound_word_preserved(self):
        """Compound words with hyphens should be preserved."""
        assert _clean_cell_text("money-saver") == "money-saver"

    def test_parenthesized_word_preserved(self):
        """Dictionary-style parenthesized prefixes survive cleaning."""
        assert _clean_cell_text("(Salat-)Gurke") == "(Salat-)Gurke"

    def test_multiple_trailing_noise(self):
        """Multiple trailing noise tokens should all be removed."""
        assert _clean_cell_text("achieve 3 |") == "achieve"
|
||
|
||
|
||
class TestPhoneticOnlyText:
    """Test _is_phonetic_only_text for phonetic transcription detection."""

    @pytest.mark.parametrize("text,expected", [
        # Phonetic-only patterns → True
        ("['mani serva]", True),
        ("[dɑːns]", True),
        ("[\"a:mand]", True),
        ("['wɜːkʃɒp]", True),
        # serva] has 5 alpha chars after bracket removal → NOT phonetic-only
        ("serva]", False),
        # NOT phonetic-only → False
        ("almond ['a:mand]", False),
        ("Mandel", False),
        ("cupcakes", False),
        ("", False),
        ("achieve", False),
        ("money-saver ['mani]", False),
    ])
    def test_phonetic_detection(self, text, expected):
        assert _is_phonetic_only_text(text) is expected, \
            f"_is_phonetic_only_text({text!r}) should be {expected}"
|
||
|
||
|
||
class TestMergePhoneticContinuationRows:
    """Test _merge_phonetic_continuation_rows for phonetic row merging."""

    def test_empty_list(self):
        # Degenerate input: nothing to merge.
        assert _merge_phonetic_continuation_rows([]) == []

    def test_single_entry(self):
        # A lone entry has no predecessor and must pass through unchanged.
        entries = [{"english": "cat", "german": "Katze", "example": ""}]
        result = _merge_phonetic_continuation_rows(entries)
        assert len(result) == 1
        assert result[0]["english"] == "cat"

    def test_merges_phonetic_row(self):
        """Phonetic-only row should merge into previous entry."""
        entries = [
            {"english": "money-saver", "german": "Sparfuchs", "example": "", "row_index": 0},
            {"english": "['mani serva]", "german": "", "example": "", "row_index": 1},
        ]
        result = _merge_phonetic_continuation_rows(entries)
        assert len(result) == 1
        assert result[0]["english"] == "money-saver ['mani serva]"
        assert result[0]["german"] == "Sparfuchs"

    def test_no_merge_when_de_present(self):
        """Row with DE text should NOT be merged even if EN looks phonetic."""
        entries = [
            {"english": "cat", "german": "Katze", "example": ""},
            {"english": "[kæt]", "german": "some text", "example": ""},
        ]
        result = _merge_phonetic_continuation_rows(entries)
        assert len(result) == 2

    def test_no_merge_regular_rows(self):
        """Normal vocab rows should not be merged."""
        entries = [
            {"english": "cat", "german": "Katze", "example": ""},
            {"english": "dog", "german": "Hund", "example": ""},
        ]
        result = _merge_phonetic_continuation_rows(entries)
        assert len(result) == 2

    def test_merges_example_too(self):
        """If phonetic row has example text, it should merge into previous."""
        entries = [
            {"english": "dance", "german": "tanzen", "example": "", "row_index": 0},
            {"english": "[dɑːns]", "german": "", "example": "Let's dance.", "row_index": 1},
        ]
        result = _merge_phonetic_continuation_rows(entries)
        assert len(result) == 1
        assert result[0]["english"] == "dance [dɑːns]"
        assert result[0]["example"] == "Let's dance."
|
||
|
||
|
||
class TestMergeContinuationRows:
    """Test _merge_continuation_rows for multi-line entry merging."""

    def test_empty_list(self):
        # Degenerate input: nothing to merge.
        assert _merge_continuation_rows([]) == []

    def test_no_merge_independent_rows(self):
        """Rows with both EN and DE should not be merged."""
        entries = [
            {"english": "cat", "german": "Katze", "example": "", "row_index": 0},
            {"english": "dog", "german": "Hund", "example": "", "row_index": 1},
        ]
        result = _merge_continuation_rows(entries)
        assert len(result) == 2

    def test_merge_lowercase_continuation(self):
        """Lowercase EN with empty DE should merge into previous."""
        entries = [
            {"english": "to put up", "german": "aufstellen", "example": "", "row_index": 0},
            {"english": "with sth.", "german": "", "example": "", "row_index": 1},
        ]
        result = _merge_continuation_rows(entries)
        assert len(result) == 1
        assert result[0]["english"] == "to put up with sth."
        assert result[0]["german"] == "aufstellen"

    def test_no_merge_uppercase_start(self):
        """EN starting with uppercase and empty DE is likely its own entry, not a continuation."""
        entries = [
            {"english": "cat", "german": "Katze", "example": "", "row_index": 0},
            {"english": "Dog", "german": "", "example": "", "row_index": 1},
        ]
        result = _merge_continuation_rows(entries)
        assert len(result) == 2

    def test_no_merge_when_previous_ends_with_period(self):
        """If previous entry ends with sentence terminator, next is not continuation."""
        entries = [
            {"english": "That's great.", "german": "Das ist toll.", "example": "", "row_index": 0},
            {"english": "really nice", "german": "", "example": "", "row_index": 1},
        ]
        result = _merge_continuation_rows(entries)
        assert len(result) == 2

    def test_no_merge_long_text(self):
        """Text with 4+ words is likely an example sentence, not continuation."""
        entries = [
            {"english": "achieve", "german": "erreichen", "example": "", "row_index": 0},
            {"english": "she achieved her goals", "german": "", "example": "", "row_index": 1},
        ]
        result = _merge_continuation_rows(entries)
        assert len(result) == 2

    def test_first_entry_not_merged(self):
        """First entry with empty DE should not crash (no previous)."""
        entries = [
            {"english": "something", "german": "", "example": "", "row_index": 0},
            {"english": "cat", "german": "Katze", "example": "", "row_index": 1},
        ]
        result = _merge_continuation_rows(entries)
        assert len(result) == 2
|
||
|
||
|
||
# =============================================
|
||
# Test: Content-Bounds Scan-Artifact Filtering
|
||
# =============================================
|
||
|
||
class TestContentBoundsFiltering:
    """_find_content_bounds must ignore narrow scan artifacts at page edges."""

    def test_thin_vertical_line_ignored(self):
        """A 2px black stripe at the left edge must not drag left_x leftward."""
        inv = np.zeros((400, 600), dtype=np.uint8)
        inv[50:350, 100:550] = 255  # genuine content block in the middle
        inv[50:350, 5:7] = 255      # 2px-wide vertical scan artifact at x=5..6
        left, right, top, bottom = _find_content_bounds(inv)
        # left_x must land near the real content (~100), not the artifact (~5)
        assert left >= 90, f"left_x={left} should be >=90 (near real content, not artifact)"

    def test_thick_content_preserved(self):
        """A 50px-wide text block is genuine content and must survive filtering."""
        inv = np.zeros((400, 600), dtype=np.uint8)
        inv[50:350, 80:130] = 255   # 50px wide block
        inv[50:350, 200:500] = 255  # wider block
        left, right, top, bottom = _find_content_bounds(inv)
        assert left <= 82, f"left_x={left} should be <=82 (50px block is real content)"

    def test_no_artifacts_unchanged(self):
        """With no artifacts present, the bounds hug the content block."""
        inv = np.zeros((400, 600), dtype=np.uint8)
        inv[100:300, 50:550] = 255
        left, right, top, bottom = _find_content_bounds(inv)
        assert left <= 52
        assert right >= 548
        assert top <= 105
        assert bottom >= 295

    def test_right_edge_artifact_ignored(self):
        """A thin vertical stripe at the right edge must not drag right_x outward."""
        inv = np.zeros((400, 600), dtype=np.uint8)
        inv[50:350, 50:500] = 255   # real content
        inv[50:350, 595:598] = 255  # 3px artifact at the right edge
        left, right, top, bottom = _find_content_bounds(inv)
        assert right <= 510, f"right_x={right} should be <=510, ignoring right-edge artifact"

    def test_horizontal_line_ignored(self):
        """A thin horizontal stripe at the top must not drag top_y upward."""
        inv = np.zeros((400, 600), dtype=np.uint8)
        inv[100:350, 50:550] = 255  # real content
        inv[2:4, 50:550] = 255      # 2px horizontal artifact at the top
        left, right, top, bottom = _find_content_bounds(inv)
        assert top >= 90, f"top_y={top} should be >=90 (ignoring thin top line)"
|
||
|
||
|
||
class TestFilterNarrowRuns:
    """Direct unit tests for the _filter_narrow_runs helper."""

    def test_removes_short_run(self):
        """Runs shorter than min_width are cleared; longer runs survive."""
        mask = np.array([False, True, True, False, True, True, True, True, True, False])
        filtered = _filter_narrow_runs(mask, min_width=3)
        # the 2-wide run at indices 1-2 is erased
        assert not filtered[1]
        assert not filtered[2]
        # the 5-wide run at indices 4-8 is kept
        assert filtered[4]
        assert filtered[8]

    def test_keeps_wide_run(self):
        """A run covering the whole mask passes through untouched."""
        filtered = _filter_narrow_runs(np.array([True] * 10), min_width=5)
        assert all(filtered)

    def test_all_narrow(self):
        """When every run is below min_width, nothing remains set."""
        filtered = _filter_narrow_runs(np.array([True, True, False, True, False]), min_width=3)
        assert not any(filtered)
|
||
|
||
|
||
# =============================================
|
||
# Test: Margin Regions
|
||
# =============================================
|
||
|
||
class TestMarginRegions:
|
||
"""Test _build_margin_regions and margin integration."""
|
||
|
||
def test_margin_left_created(self):
|
||
"""When left_x > 5, a margin_left region should be created."""
|
||
existing = [
|
||
PageRegion(type='column_en', x=100, y=50, width=200, height=300),
|
||
PageRegion(type='column_de', x=320, y=50, width=200, height=300),
|
||
]
|
||
margins = _build_margin_regions(existing, left_x=100, right_x=520,
|
||
img_w=600, top_y=50, content_h=300)
|
||
left_margins = [m for m in margins if m.type == 'margin_left']
|
||
assert len(left_margins) == 1
|
||
ml = left_margins[0]
|
||
assert ml.x == 0
|
||
assert ml.width == 100
|
||
|
||
def test_margin_right_created(self):
|
||
"""When there's space after the last column, margin_right should be created."""
|
||
existing = [
|
||
PageRegion(type='column_en', x=50, y=50, width=200, height=300),
|
||
PageRegion(type='column_de', x=260, y=50, width=200, height=300),
|
||
]
|
||
# last_col_end = 260 + 200 = 460, img_w = 600 → gap = 140
|
||
margins = _build_margin_regions(existing, left_x=50, right_x=460,
|
||
img_w=600, top_y=50, content_h=300)
|
||
right_margins = [m for m in margins if m.type == 'margin_right']
|
||
assert len(right_margins) == 1
|
||
mr = right_margins[0]
|
||
assert mr.x == 460
|
||
assert mr.width == 140
|
||
|
||
def test_no_margin_when_flush(self):
|
||
"""When columns are flush with the image edges, no margins should appear."""
|
||
existing = [
|
||
PageRegion(type='column_en', x=0, y=0, width=300, height=400),
|
||
PageRegion(type='column_de', x=300, y=0, width=300, height=400),
|
||
]
|
||
margins = _build_margin_regions(existing, left_x=0, right_x=600,
|
||
img_w=600, top_y=0, content_h=400)
|
||
assert len(margins) == 0
|
||
|
||
def test_margins_in_skip_types(self):
|
||
"""Verify margin types are in the skip set used by build_cell_grid."""
|
||
skip = {'column_ignore', 'header', 'footer', 'margin_top', 'margin_bottom', 'page_ref', 'margin_left', 'margin_right'}
|
||
assert 'margin_left' in skip
|
||
assert 'margin_right' in skip
|
||
|
||
def test_margin_confidence_and_method(self):
|
||
"""Margin regions should have confidence 1.0 and method 'content_bounds'."""
|
||
existing = [PageRegion(type='column_en', x=80, y=20, width=400, height=500)]
|
||
margins = _build_margin_regions(existing, left_x=80, right_x=480,
|
||
img_w=600, top_y=20, content_h=500)
|
||
for m in margins:
|
||
assert m.classification_confidence == 1.0
|
||
assert m.classification_method == 'content_bounds'
|
||
|
||
|
||
# =============================================
|
||
# Header/Footer Gap Detection
|
||
# =============================================
|
||
|
||
class TestHeaderFooterGapDetection:
|
||
"""Tests for _detect_header_footer_gaps()."""
|
||
|
||
def _make_inv(self, height: int, width: int, bands: list) -> np.ndarray:
|
||
"""Create an inverted binary image with white horizontal bands.
|
||
|
||
Args:
|
||
height: Image height.
|
||
width: Image width.
|
||
bands: List of (y_start, y_end) tuples where pixels are white (255).
|
||
"""
|
||
inv = np.zeros((height, width), dtype=np.uint8)
|
||
for y1, y2 in bands:
|
||
inv[y1:y2, :] = 255
|
||
return inv
|
||
|
||
def _make_body_with_lines(self, h, w, body_start, body_end,
|
||
line_h=15, gap_h=12):
|
||
"""Create bands simulating text lines with inter-line gaps.
|
||
|
||
gap_h must be large enough to survive smoothing (kernel ~ h//200).
|
||
"""
|
||
bands = []
|
||
y = body_start
|
||
while y + line_h <= body_end:
|
||
bands.append((y, y + line_h))
|
||
y += line_h + gap_h
|
||
return bands
|
||
|
||
def test_header_gap_detected(self):
|
||
"""Content at top + large gap + main body → header_y at the gap."""
|
||
h, w = 2000, 800
|
||
# Header content at rows 20-80
|
||
bands = [(20, 80)]
|
||
# Large gap 80-300 (220px) — much larger than 12px line gaps
|
||
# Body lines from 300 to ~1990 (extends near bottom, no footer gap)
|
||
bands += self._make_body_with_lines(h, w, 300, 1990)
|
||
inv = self._make_inv(h, w, bands)
|
||
header_y, footer_y = _detect_header_footer_gaps(inv, w, h)
|
||
assert header_y is not None
|
||
assert 80 <= header_y <= 310
|
||
|
||
def test_footer_gap_detected(self):
|
||
"""Main body + large gap + page number → footer_y at the gap."""
|
||
h, w = 2000, 800
|
||
# Body lines from 10 to 1600 (starts near top, no header gap)
|
||
bands = self._make_body_with_lines(h, w, 10, 1600)
|
||
# Large gap 1600-1880 (280px)
|
||
# Page number 1880-1920
|
||
bands.append((1880, 1920))
|
||
inv = self._make_inv(h, w, bands)
|
||
header_y, footer_y = _detect_header_footer_gaps(inv, w, h)
|
||
assert footer_y is not None
|
||
assert 1580 <= footer_y <= 1890
|
||
|
||
def test_both_header_and_footer(self):
|
||
"""Header + gap + body lines + gap + footer → both detected."""
|
||
h, w = 2000, 800
|
||
# Header 10-60
|
||
bands = [(10, 60)]
|
||
# Large gap 60-250 (190px)
|
||
# Body lines from 250 to 1700
|
||
bands += self._make_body_with_lines(h, w, 250, 1700)
|
||
# Large gap 1700-1900 (200px)
|
||
# Footer 1900-1970
|
||
bands.append((1900, 1970))
|
||
inv = self._make_inv(h, w, bands)
|
||
header_y, footer_y = _detect_header_footer_gaps(inv, w, h)
|
||
assert header_y is not None
|
||
assert footer_y is not None
|
||
assert 60 <= header_y <= 260
|
||
assert 1690 <= footer_y <= 1910
|
||
|
||
def test_no_gaps_returns_none(self):
|
||
"""Uniform content across the page → (None, None)."""
|
||
h, w = 1000, 800
|
||
# Content across entire height
|
||
inv = self._make_inv(h, w, [(0, 1000)])
|
||
header_y, footer_y = _detect_header_footer_gaps(inv, w, h)
|
||
assert header_y is None
|
||
assert footer_y is None
|
||
|
||
def test_small_gaps_ignored(self):
|
||
"""Gaps smaller than 2x median should be ignored."""
|
||
h, w = 1000, 800
|
||
# Many small, evenly-spaced gaps (like line spacing) — no large outlier
|
||
bands = []
|
||
for row_start in range(0, 1000, 20):
|
||
bands.append((row_start, row_start + 15)) # 15px content, 5px gap
|
||
inv = self._make_inv(h, w, bands)
|
||
header_y, footer_y = _detect_header_footer_gaps(inv, w, h)
|
||
# All gaps are equal size, none > 2x median → no header/footer
|
||
assert header_y is None
|
||
assert footer_y is None
|
||
|
||
def test_edge_gaps_ignored_dewarp_padding(self):
|
||
"""Trailing gap at bottom edge (dewarp padding) should not be detected as footer."""
|
||
h, w = 2000, 800
|
||
# Body lines from 10 to 1700
|
||
bands = self._make_body_with_lines(h, w, 10, 1700)
|
||
# Gap from 1700 to 2000 = bottom edge padding (no content after)
|
||
inv = self._make_inv(h, w, bands)
|
||
header_y, footer_y = _detect_header_footer_gaps(inv, w, h)
|
||
# The trailing gap touches the image edge → not a valid separator
|
||
assert footer_y is None
|
||
|
||
|
||
class TestRegionContentCheck:
|
||
"""Tests for _region_has_content() and _add_header_footer() type selection."""
|
||
|
||
def _make_inv(self, height: int, width: int, bands: list) -> np.ndarray:
|
||
inv = np.zeros((height, width), dtype=np.uint8)
|
||
for y1, y2 in bands:
|
||
inv[y1:y2, :] = 255
|
||
return inv
|
||
|
||
def test_region_with_text_has_content(self):
|
||
"""Strip with ink → True."""
|
||
inv = self._make_inv(1000, 800, [(10, 50)])
|
||
assert _region_has_content(inv, 0, 100) is True
|
||
|
||
def test_empty_region_no_content(self):
|
||
"""Strip without ink → False."""
|
||
inv = self._make_inv(1000, 800, [(500, 600)])
|
||
assert _region_has_content(inv, 0, 100) is False
|
||
|
||
def test_header_with_text_is_header(self):
|
||
"""Top region with text → type='header' (via content bounds fallback)."""
|
||
h, w = 1000, 800
|
||
# Header text at 20-60, body starts at 200
|
||
inv = self._make_inv(h, w, [(20, 60), (200, 900)])
|
||
regions: list = []
|
||
# Simulate content bounds detecting body start at y=200
|
||
_add_header_footer(regions, top_y=200, bottom_y=h, img_w=w, img_h=h, inv=inv)
|
||
top_regions = [r for r in regions if r.type in ('header', 'margin_top')]
|
||
assert len(top_regions) == 1
|
||
assert top_regions[0].type == 'header' # text at 20-60 → header
|
||
|
||
def test_empty_top_is_margin_top(self):
|
||
"""Top region without text → type='margin_top'."""
|
||
h, w = 1000, 800
|
||
# Content only in body area (200-900), nothing in top 200px
|
||
inv = self._make_inv(h, w, [(200, 900)])
|
||
regions: list = []
|
||
# Simulate top_y=200 from content bounds
|
||
_add_header_footer(regions, top_y=200, bottom_y=h, img_w=w, img_h=h, inv=inv)
|
||
top_regions = [r for r in regions if r.type in ('header', 'margin_top')]
|
||
assert len(top_regions) == 1
|
||
assert top_regions[0].type == 'margin_top'
|
||
|
||
def test_empty_bottom_is_margin_bottom(self):
|
||
"""Bottom region without text → type='margin_bottom'."""
|
||
h, w = 1000, 800
|
||
# Content only in top/body (50-700), nothing below 700
|
||
inv = self._make_inv(h, w, [(50, 700)])
|
||
regions: list = []
|
||
_add_header_footer(regions, top_y=50, bottom_y=700, img_w=w, img_h=h, inv=inv)
|
||
bottom_regions = [r for r in regions if r.type in ('footer', 'margin_bottom')]
|
||
assert len(bottom_regions) == 1
|
||
assert bottom_regions[0].type == 'margin_bottom'
|
||
|
||
def test_footer_with_page_number_is_footer(self):
|
||
"""Bottom region with page number text → type='footer'."""
|
||
h, w = 1000, 800
|
||
# Body 50-700, page number at 900-930
|
||
inv = self._make_inv(h, w, [(50, 700), (900, 930)])
|
||
regions: list = []
|
||
_add_header_footer(regions, top_y=50, bottom_y=700, img_w=w, img_h=h, inv=inv)
|
||
bottom_regions = [r for r in regions if r.type in ('footer', 'margin_bottom')]
|
||
assert len(bottom_regions) == 1
|
||
assert bottom_regions[0].type == 'footer'
|
||
|
||
|
||
# =============================================
|
||
# Sub-Column Detection Tests
|
||
# =============================================
|
||
|
||
class TestSubColumnDetection:
    """Tests for _detect_sub_columns() left-edge alignment detection."""

    def _make_word(self, left: int, text: str = "word", conf: int = 90) -> dict:
        """Minimal OCR word dict positioned at the given left offset."""
        return {'left': left, 'top': 100, 'width': 50, 'height': 20,
                'text': text, 'conf': conf}

    def _make_geo(self, x: int, width: int, words: list, content_w: int = 1000) -> ColumnGeometry:
        """ColumnGeometry wrapping *words*, width_ratio derived from content_w."""
        return ColumnGeometry(
            index=0, x=x, y=50, width=width, height=500,
            word_count=len(words), words=words,
            width_ratio=width / content_w,
        )

    def test_sub_column_split_page_refs(self):
        """3 page-refs left + 40 vocab words right → split into 2.

        The leftmost bin holding >= 10% of words (>= 5) is the vocab bin
        at left=250, so the 3 page-refs count as outliers.
        """
        content_w = 1000
        words = [self._make_word(100, f"p.{59+i}") for i in range(3)]
        words += [self._make_word(250, f"word{i}") for i in range(40)]
        geo = self._make_geo(x=80, width=300, words=words, content_w=content_w)

        result = _detect_sub_columns([geo], content_w)

        assert len(result) == 2, f"Expected 2 columns, got {len(result)}"
        left_col, right_col = result
        assert left_col.x < right_col.x
        assert (left_col.word_count, right_col.word_count) == (3, 40)
        assert (left_col.index, right_col.index) == (0, 1)

    def test_sub_column_split_exclamation_marks(self):
        """5 '!' (misread as I/|) left + 80 example words → split into 2.

        Mirrors the real-world case where red ! marks are OCR'd as I, |, B, 1
        slightly left of where the example sentences start.
        """
        content_w = 1500
        words = [self._make_word(950 + i, "I", conf=60) for i in range(5)]
        words += [self._make_word(975 + (i * 3), f"word{i}") for i in range(80)]
        geo = self._make_geo(x=940, width=530, words=words, content_w=content_w)

        result = _detect_sub_columns([geo], content_w)

        assert len(result) == 2
        assert (result[0].word_count, result[1].word_count) == (5, 80)

    def test_no_split_uniform_alignment(self):
        """All words share one left edge → the column passes through unchanged."""
        content_w = 1000
        words = [self._make_word(200, f"word{i}") for i in range(15)]
        geo = self._make_geo(x=180, width=300, words=words, content_w=content_w)

        result = _detect_sub_columns([geo], content_w)

        assert len(result) == 1
        assert result[0].word_count == 15

    def test_no_split_narrow_column(self):
        """Narrow column (width_ratio < 0.15) → no split is attempted."""
        content_w = 1000
        words = [self._make_word(50, "a") for _ in range(3)]
        words += [self._make_word(120, "b") for _ in range(10)]
        geo = self._make_geo(x=40, width=140, words=words, content_w=content_w)

        result = _detect_sub_columns([geo], content_w)

        assert len(result) == 1

    def test_no_split_balanced_clusters(self):
        """Similarly sized clusters (ratio >= 0.35) → no split."""
        content_w = 1000
        words = [self._make_word(100, f"a{i}") for i in range(8)]
        words += [self._make_word(300, f"b{i}") for i in range(12)]
        geo = self._make_geo(x=80, width=400, words=words, content_w=content_w)

        result = _detect_sub_columns([geo], content_w)

        assert len(result) == 1

    def test_sub_column_reindexing(self):
        """After a split, indices run 0, 1, 2 across all resulting columns."""
        content_w = 1000
        # first column: uniform alignment, will not split
        de_words = [self._make_word(50, f"de{i}") for i in range(10)]
        geo1 = ColumnGeometry(index=0, x=30, y=50, width=200, height=500,
                              word_count=10, words=de_words, width_ratio=0.2)
        # second column: 3 outliers + 40 main words, will split
        page_words = [self._make_word(400, f"p.{i}") for i in range(3)]
        en_words = [self._make_word(550, f"en{i}") for i in range(40)]
        geo2 = ColumnGeometry(index=1, x=380, y=50, width=300, height=500,
                              word_count=43, words=page_words + en_words, width_ratio=0.3)

        result = _detect_sub_columns([geo1, geo2], content_w)

        assert len(result) == 3
        assert [g.index for g in result] == [0, 1, 2]
        assert [g.word_count for g in result] == [10, 3, 40]

    def test_no_split_too_few_words(self):
        """Fewer than 5 words in the column → no split is attempted."""
        content_w = 1000
        words = [self._make_word(100, "a"), self._make_word(300, "b"),
                 self._make_word(300, "c"), self._make_word(300, "d")]
        geo = self._make_geo(x=80, width=300, words=words, content_w=content_w)

        result = _detect_sub_columns([geo], content_w)

        assert len(result) == 1

    def test_no_split_single_minority_word(self):
        """Only 1 word left of the column start → no split (needs >= 2)."""
        content_w = 1000
        words = [self._make_word(100, "p.59")]
        words += [self._make_word(300, f"w{i}") for i in range(30)]
        geo = self._make_geo(x=80, width=350, words=words, content_w=content_w)

        result = _detect_sub_columns([geo], content_w)

        assert len(result) == 1

    def test_sub_column_split_with_left_x_offset(self):
        """Word 'left' values are relative to left_x; geo.x is absolute.

        Real-world scenario: left_x=195, EN column at geo.x=310.
        Page refs at relative left=115-157, vocab words at relative left=216.
        Without left_x, split_x would be ~202 (< geo.x=310) → negative width
        → no split. With left_x=195, split_abs = 202 + 195 = 397, which lies
        between geo.x(310) and geo.x+geo.width(748) → valid split.
        """
        content_w = 1469
        left_x = 195
        words = [self._make_word(115, "p.59"), self._make_word(157, "p.60"),
                 self._make_word(157, "p.61")]
        words += [self._make_word(216, f"word{i}") for i in range(40)]
        geo = self._make_geo(x=310, width=438, words=words, content_w=content_w)

        result = _detect_sub_columns([geo], content_w, left_x=left_x)

        assert len(result) == 2, f"Expected 2 columns, got {len(result)}"
        assert (result[0].word_count, result[1].word_count) == (3, 40)
|
||
|
||
|
||
class TestCellsToVocabEntriesPageRef:
    """page_ref cells must flow into the entry's source_page field."""

    @staticmethod
    def _cell(col_type, text, bbox, conf):
        """Build a minimal OCR cell dict for row 0 with the given payload."""
        return {
            'row_index': 0,
            'col_type': col_type,
            'text': text,
            'bbox_pct': bbox,
            'confidence': conf,
            'ocr_engine': 'tesseract',
        }

    def test_page_ref_mapped_to_source_page(self):
        """A cell typed 'page_ref' populates source_page and bbox_ref."""
        from cv_vocab_pipeline import _cells_to_vocab_entries

        cells = [
            self._cell('column_en', 'hello', {'x': 10, 'y': 10, 'w': 30, 'h': 5}, 95.0),
            self._cell('column_de', 'hallo', {'x': 40, 'y': 10, 'w': 30, 'h': 5}, 90.0),
            self._cell('page_ref', 'p.59', {'x': 5, 'y': 10, 'w': 5, 'h': 5}, 80.0),
        ]
        columns_meta = [
            {'type': 'column_en'}, {'type': 'column_de'}, {'type': 'page_ref'},
        ]

        entries = _cells_to_vocab_entries(cells, columns_meta)

        assert len(entries) == 1
        entry = entries[0]
        assert entry['english'] == 'hello'
        assert entry['german'] == 'hallo'
        assert entry['source_page'] == 'p.59'
        assert entry['bbox_ref'] == {'x': 5, 'y': 10, 'w': 5, 'h': 5}

    def test_no_page_ref_defaults_empty(self):
        """Without a page_ref cell, source_page is '' and bbox_ref is None."""
        from cv_vocab_pipeline import _cells_to_vocab_entries

        cells = [
            self._cell('column_en', 'world', {'x': 10, 'y': 10, 'w': 30, 'h': 5}, 95.0),
        ]
        entries = _cells_to_vocab_entries(cells, [{'type': 'column_en'}])

        assert len(entries) == 1
        assert entries[0]['source_page'] == ''
        assert entries[0]['bbox_ref'] is None
|
||
|
||
|
||
# =============================================
|
||
# RUN TESTS
|
||
# =============================================
|
||
|
||
# Allow running this test module directly (outside of a plain `pytest` call),
# with verbose per-test output.
if __name__ == "__main__":
    pytest.main([__file__, "-v"])
|