feat(ocr-pipeline): filter scan artifacts in content bounds and add margin regions
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 26s
CI / test-go-edu-search (push) Successful in 27s
CI / test-python-klausur (push) Failing after 1m50s
CI / test-python-agent-core (push) Successful in 16s
CI / test-nodejs-website (push) Successful in 18s

Thin black lines (1-5px) at page edges from scanning were incorrectly
detected as content, shifting content bounds and creating spurious
IGNORE columns. This filters narrow projection runs (narrower than
max(5 px, 1% of the image dimension)) and introduces explicit
margin_left/margin_right regions for downstream page reconstruction.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-02 15:29:18 +01:00
parent e718353d9f
commit 34ccdd5fd1
2 changed files with 247 additions and 18 deletions

View File

@@ -631,42 +631,70 @@ def create_layout_image(img: np.ndarray) -> np.ndarray:
# Stage 5: Layout Analysis (Projection Profiles) # Stage 5: Layout Analysis (Projection Profiles)
# ============================================================================= # =============================================================================
def _filter_narrow_runs(mask: np.ndarray, min_width: int) -> np.ndarray:
    """Return a copy of a 1-D mask with every short run of truthy values cleared.

    A "run" is a maximal stretch of consecutive truthy entries. Runs whose
    length is strictly less than *min_width* are set to False; runs of
    length *min_width* or more are kept as they are. The input array is
    not modified.

    Args:
        mask: 1-D boolean mask.
        min_width: Minimum run length that survives the filter.

    Returns:
        A new array of the same length as *mask*.
    """
    filtered = mask.copy()

    # Pad with False on both ends so that every run, including one touching
    # the first or last element, has both a rising and a falling edge.
    padded = np.concatenate(([False], filtered.astype(bool), [False]))
    edges = np.diff(padded.astype(np.int8))
    run_starts = np.flatnonzero(edges == 1)   # index of first element of each run
    run_stops = np.flatnonzero(edges == -1)   # index one past the last element

    for begin, stop in zip(run_starts, run_stops):
        if stop - begin < min_width:
            filtered[begin:stop] = False
    return filtered
def _find_content_bounds(inv: np.ndarray) -> Tuple[int, int, int, int]: def _find_content_bounds(inv: np.ndarray) -> Tuple[int, int, int, int]:
"""Find the bounding box of actual text content (excluding page margins). """Find the bounding box of actual text content (excluding page margins).
Scan artefacts (thin black lines at page edges) are filtered out by
discarding contiguous projection runs narrower than 1 % of the image
dimension (min 5 px).
Returns: Returns:
Tuple of (left_x, right_x, top_y, bottom_y). Tuple of (left_x, right_x, top_y, bottom_y).
""" """
h, w = inv.shape[:2] h, w = inv.shape[:2]
threshold = 0.005
# Horizontal projection for top/bottom # --- Horizontal projection for top/bottom ---
h_proj = np.sum(inv, axis=1).astype(float) / (w * 255) h_proj = np.sum(inv, axis=1).astype(float) / (w * 255)
h_mask = h_proj > threshold
min_h_run = max(5, h // 100)
h_mask = _filter_narrow_runs(h_mask, min_h_run)
top_y = 0 top_y = 0
for y in range(h): for y in range(h):
if h_proj[y] > 0.005: if h_mask[y]:
top_y = max(0, y - 5) top_y = max(0, y - 5)
break break
bottom_y = h bottom_y = h
for y in range(h - 1, 0, -1): for y in range(h - 1, 0, -1):
if h_proj[y] > 0.005: if h_mask[y]:
bottom_y = min(h, y + 5) bottom_y = min(h, y + 5)
break break
# Vertical projection for left/right margins # --- Vertical projection for left/right margins ---
v_proj = np.sum(inv[top_y:bottom_y, :], axis=0).astype(float) v_proj = np.sum(inv[top_y:bottom_y, :], axis=0).astype(float)
v_proj_norm = v_proj / ((bottom_y - top_y) * 255) if (bottom_y - top_y) > 0 else v_proj v_proj_norm = v_proj / ((bottom_y - top_y) * 255) if (bottom_y - top_y) > 0 else v_proj
v_mask = v_proj_norm > threshold
min_v_run = max(5, w // 100)
v_mask = _filter_narrow_runs(v_mask, min_v_run)
left_x = 0 left_x = 0
for x in range(w): for x in range(w):
if v_proj_norm[x] > 0.005: if v_mask[x]:
left_x = max(0, x - 2) left_x = max(0, x - 2)
break break
right_x = w right_x = w
for x in range(w - 1, 0, -1): for x in range(w - 1, 0, -1):
if v_proj_norm[x] > 0.005: if v_mask[x]:
right_x = min(w, x + 2) right_x = min(w, x + 2)
break break
@@ -1993,12 +2021,58 @@ def _score_role(geom: ColumnGeometry) -> Dict[str, float]:
return {k: round(v, 3) for k, v in scores.items()} return {k: round(v, 3) for k, v in scores.items()}
def _build_margin_regions(
    all_regions: List[PageRegion],
    left_x: int,
    right_x: int,
    img_w: int,
    top_y: int,
    content_h: int,
) -> List[PageRegion]:
    """Build the margin_left / margin_right PageRegions for a page.

    The left margin spans from the image edge (x=0) to *left_x*. The right
    margin starts at the right-most end (x + width) of any region that is
    not a margin, header or footer; if there is no such region it starts
    at *right_x*. A margin is only emitted when it is wider than 5 px.

    Args:
        all_regions: Regions already classified for the page.
        left_x: Left content bound.
        right_x: Right content bound; used only when *all_regions* holds
            no column-like region.
        img_w: Full image width.
        top_y: Y coordinate given to both margin regions.
        content_h: Height given to both margin regions.

    Returns:
        A list with zero, one or two new margin regions (left first).
        *all_regions* is not modified.
    """
    min_gap_px = 5  # narrower gaps are not worth a region of their own
    not_columns = ('margin_left', 'margin_right', 'header', 'footer')

    def _margin(kind: str, x: int, width: int) -> PageRegion:
        # Both margins share the vertical extent and the classification tags.
        return PageRegion(
            type=kind, x=x, y=top_y,
            width=width, height=content_h,
            classification_confidence=1.0,
            classification_method='content_bounds',
        )

    result: List[PageRegion] = []

    if left_x > min_gap_px:
        result.append(_margin('margin_left', 0, left_x))

    # Right margin: starts where the last column-like region ends.
    column_ends = [
        region.x + region.width
        for region in all_regions
        if region.type not in not_columns
    ]
    right_start = max(column_ends) if column_ends else right_x
    right_gap = img_w - right_start
    if right_gap > min_gap_px:
        result.append(_margin('margin_right', right_start, right_gap))

    return result
def classify_column_types(geometries: List[ColumnGeometry], def classify_column_types(geometries: List[ColumnGeometry],
content_w: int, content_w: int,
top_y: int, top_y: int,
img_w: int, img_w: int,
img_h: int, img_h: int,
bottom_y: int) -> List[PageRegion]: bottom_y: int,
left_x: int = 0,
right_x: int = 0) -> List[PageRegion]:
"""Classify column types using a 3-level fallback chain. """Classify column types using a 3-level fallback chain.
Level 1: Content-based (language + role scoring) Level 1: Content-based (language + role scoring)
@@ -2012,21 +2086,28 @@ def classify_column_types(geometries: List[ColumnGeometry],
img_w: Full image width. img_w: Full image width.
img_h: Full image height. img_h: Full image height.
bottom_y: Bottom Y of content area. bottom_y: Bottom Y of content area.
left_x: Left content bound (from _find_content_bounds).
right_x: Right content bound (from _find_content_bounds).
Returns: Returns:
List of PageRegion with types, confidence, and method. List of PageRegion with types, confidence, and method.
""" """
content_h = bottom_y - top_y content_h = bottom_y - top_y
def _with_margins(result: List[PageRegion]) -> List[PageRegion]:
"""Append margin_left / margin_right regions to *result*."""
margins = _build_margin_regions(result, left_x, right_x, img_w, top_y, content_h)
return result + margins
# Special case: single column → plain text page # Special case: single column → plain text page
if len(geometries) == 1: if len(geometries) == 1:
geom = geometries[0] geom = geometries[0]
return [PageRegion( return _with_margins([PageRegion(
type='column_text', x=geom.x, y=geom.y, type='column_text', x=geom.x, y=geom.y,
width=geom.width, height=geom.height, width=geom.width, height=geom.height,
classification_confidence=0.9, classification_confidence=0.9,
classification_method='content', classification_method='content',
)] )])
# --- Pre-filter: first/last columns with very few words → column_ignore --- # --- Pre-filter: first/last columns with very few words → column_ignore ---
ignore_regions = [] ignore_regions = []
@@ -2050,7 +2131,7 @@ def classify_column_types(geometries: List[ColumnGeometry],
# Handle edge case: all columns ignored or only 1 left # Handle edge case: all columns ignored or only 1 left
if len(geometries) == 0: if len(geometries) == 0:
return ignore_regions return _with_margins(ignore_regions)
if len(geometries) == 1: if len(geometries) == 1:
geom = geometries[0] geom = geometries[0]
ignore_regions.append(PageRegion( ignore_regions.append(PageRegion(
@@ -2059,7 +2140,7 @@ def classify_column_types(geometries: List[ColumnGeometry],
classification_confidence=0.9, classification_confidence=0.9,
classification_method='content', classification_method='content',
)) ))
return ignore_regions return _with_margins(ignore_regions)
# --- Score all columns --- # --- Score all columns ---
lang_scores = [_score_language(g.words) for g in geometries] lang_scores = [_score_language(g.words) for g in geometries]
@@ -2075,20 +2156,20 @@ def classify_column_types(geometries: List[ColumnGeometry],
if regions is not None: if regions is not None:
logger.info("ClassifyColumns: Level 1 (content-based) succeeded") logger.info("ClassifyColumns: Level 1 (content-based) succeeded")
_add_header_footer(regions, top_y, bottom_y, img_w, img_h) _add_header_footer(regions, top_y, bottom_y, img_w, img_h)
return ignore_regions + regions return _with_margins(ignore_regions + regions)
# --- Level 2: Position + language enhanced --- # --- Level 2: Position + language enhanced ---
regions = _classify_by_position_enhanced(geometries, lang_scores, content_w, content_h) regions = _classify_by_position_enhanced(geometries, lang_scores, content_w, content_h)
if regions is not None: if regions is not None:
logger.info("ClassifyColumns: Level 2 (position+language) succeeded") logger.info("ClassifyColumns: Level 2 (position+language) succeeded")
_add_header_footer(regions, top_y, bottom_y, img_w, img_h) _add_header_footer(regions, top_y, bottom_y, img_w, img_h)
return ignore_regions + regions return _with_margins(ignore_regions + regions)
# --- Level 3: Pure position fallback (old code, no regression) --- # --- Level 3: Pure position fallback (old code, no regression) ---
logger.info("ClassifyColumns: Level 3 (position fallback)") logger.info("ClassifyColumns: Level 3 (position fallback)")
regions = _classify_by_position_fallback(geometries, content_w, content_h) regions = _classify_by_position_fallback(geometries, content_w, content_h)
_add_header_footer(regions, top_y, bottom_y, img_w, img_h) _add_header_footer(regions, top_y, bottom_y, img_w, img_h)
return ignore_regions + regions return _with_margins(ignore_regions + regions)
def _classify_by_content(geometries: List[ColumnGeometry], def _classify_by_content(geometries: List[ColumnGeometry],
@@ -2490,7 +2571,8 @@ def analyze_layout_by_words(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Li
content_w = right_x - left_x content_w = right_x - left_x
# Phase B: Content-based classification # Phase B: Content-based classification
regions = classify_column_types(geometries, content_w, top_y, w, h, bottom_y) regions = classify_column_types(geometries, content_w, top_y, w, h, bottom_y,
left_x=left_x, right_x=right_x)
col_count = len([r for r in regions if r.type.startswith('column') or r.type == 'page_ref']) col_count = len([r for r in regions if r.type.startswith('column') or r.type == 'page_ref'])
methods = set(r.classification_method for r in regions if r.classification_method) methods = set(r.classification_method for r in regions if r.classification_method)
@@ -3602,7 +3684,7 @@ def build_cell_grid(
return [], [] return [], []
# Use columns only — skip ignore, header, footer, page_ref # Use columns only — skip ignore, header, footer, page_ref
_skip_types = {'column_ignore', 'header', 'footer', 'page_ref'} _skip_types = {'column_ignore', 'header', 'footer', 'page_ref', 'margin_left', 'margin_right'}
relevant_cols = [c for c in column_regions if c.type not in _skip_types] relevant_cols = [c for c in column_regions if c.type not in _skip_types]
if not relevant_cols: if not relevant_cols:
logger.warning("build_cell_grid: no usable columns found") logger.warning("build_cell_grid: no usable columns found")
@@ -3764,7 +3846,7 @@ def build_cell_grid_streaming(
if not content_rows: if not content_rows:
return return
_skip_types = {'column_ignore', 'header', 'footer', 'page_ref'} _skip_types = {'column_ignore', 'header', 'footer', 'page_ref', 'margin_left', 'margin_right'}
relevant_cols = [c for c in column_regions if c.type not in _skip_types] relevant_cols = [c for c in column_regions if c.type not in _skip_types]
if not relevant_cols: if not relevant_cols:
return return
@@ -4249,8 +4331,9 @@ def run_multi_pass_ocr(ocr_img: np.ndarray,
""" """
results: Dict[str, List[Dict]] = {} results: Dict[str, List[Dict]] = {}
_ocr_skip = {'header', 'footer', 'margin_left', 'margin_right'}
for region in regions: for region in regions:
if region.type == 'header' or region.type == 'footer': if region.type in _ocr_skip:
continue # Skip non-content regions continue # Skip non-content regions
if region.type == 'column_en': if region.type == 'column_en':

View File

@@ -32,6 +32,8 @@ from cv_vocab_pipeline import (
create_ocr_image, create_ocr_image,
create_layout_image, create_layout_image,
_find_content_bounds, _find_content_bounds,
_filter_narrow_runs,
_build_margin_regions,
analyze_layout, analyze_layout,
_group_words_into_lines, _group_words_into_lines,
match_lines_to_vocab, match_lines_to_vocab,
@@ -843,6 +845,150 @@ class TestMergeContinuationRows:
assert len(result) == 2 assert len(result) == 2
# =============================================
# Test: Content-Bounds Scan-Artifact Filtering
# =============================================
class TestContentBoundsFiltering:
    """Checks that _find_content_bounds discards thin scan artefacts."""

    def test_thin_vertical_line_ignored(self):
        """A 2 px stripe near the left edge must not drag left_x towards it."""
        page = np.zeros((400, 600), dtype=np.uint8)
        page[50:350, 5:7] = 255      # scan artefact: columns 5 and 6
        page[50:350, 100:550] = 255  # genuine content block

        left, _right, _top, _bottom = _find_content_bounds(page)

        # The bound has to sit at the real content (x=100), not at the stripe.
        assert left >= 90, f"left_x={left} should be >=90 (near real content, not artifact)"

    def test_thick_content_preserved(self):
        """A 50 px wide block counts as content and must survive the filter."""
        page = np.zeros((400, 600), dtype=np.uint8)
        for x_from, x_to in ((80, 130), (200, 500)):  # 50 px block, then a wider one
            page[50:350, x_from:x_to] = 255

        left, _right, _top, _bottom = _find_content_bounds(page)

        assert left <= 82, f"left_x={left} should be <=82 (50px block is real content)"

    def test_no_artifacts_unchanged(self):
        """Without artefacts the bounds should enclose the single content block."""
        page = np.zeros((400, 600), dtype=np.uint8)
        page[100:300, 50:550] = 255

        left, right, top, bottom = _find_content_bounds(page)

        assert left <= 52
        assert right >= 548
        assert top <= 105
        assert bottom >= 295

    def test_right_edge_artifact_ignored(self):
        """A 3 px stripe near the right edge must not drag right_x towards it."""
        page = np.zeros((400, 600), dtype=np.uint8)
        page[50:350, 595:598] = 255  # scan artefact
        page[50:350, 50:500] = 255   # genuine content block

        _left, right, _top, _bottom = _find_content_bounds(page)

        assert right <= 510, f"right_x={right} should be <=510, ignoring right-edge artifact"

    def test_horizontal_line_ignored(self):
        """A 2 px line near the top edge must not drag top_y towards it."""
        page = np.zeros((400, 600), dtype=np.uint8)
        page[2:4, 50:550] = 255      # scan artefact: rows 2 and 3
        page[100:350, 50:550] = 255  # genuine content block

        _left, _right, top, _bottom = _find_content_bounds(page)

        assert top >= 90, f"top_y={top} should be >=90 (ignoring thin top line)"
class TestFilterNarrowRuns:
    """Direct unit tests for the _filter_narrow_runs helper."""

    def test_removes_short_run(self):
        """A 2-wide run is cleared while a 5-wide run is left untouched."""
        mask = np.array([False, True, True, False, True, True, True, True, True, False])
        result = _filter_narrow_runs(mask, min_width=3)
        # Compare the whole array so that a partially cleared or partially
        # kept run cannot slip through.
        expected = [False, False, False, False, True, True, True, True, True, False]
        assert result.tolist() == expected

    def test_keeps_wide_run(self):
        """A run spanning the whole mask is kept when it is wide enough."""
        mask = np.array([True] * 10)
        result = _filter_narrow_runs(mask, min_width=5)
        assert all(result)

    def test_all_narrow(self):
        """Every run is shorter than min_width, so nothing remains."""
        mask = np.array([True, True, False, True, False])
        result = _filter_narrow_runs(mask, min_width=3)
        assert not any(result)

    def test_run_equal_to_min_width_is_kept(self):
        """Only runs strictly shorter than min_width are removed."""
        mask = np.array([False, True, True, True, False])
        result = _filter_narrow_runs(mask, min_width=3)
        assert result.tolist() == [False, True, True, True, False]

    def test_run_at_end_of_mask(self):
        """Runs touching the last element are measured correctly."""
        mask = np.array([True, True, True, False, True, True])
        result = _filter_narrow_runs(mask, min_width=3)
        assert result.tolist() == [True, True, True, False, False, False]

    def test_input_mask_not_mutated(self):
        """The helper works on a copy and leaves the caller's mask alone."""
        mask = np.array([True, False, True, True, True])
        snapshot = mask.tolist()
        _filter_narrow_runs(mask, min_width=3)
        assert mask.tolist() == snapshot

    def test_empty_mask(self):
        """An empty mask is returned as an empty mask."""
        result = _filter_narrow_runs(np.array([], dtype=bool), min_width=3)
        assert len(result) == 0
# =============================================
# Test: Margin Regions
# =============================================
class TestMarginRegions:
    """Tests for _build_margin_regions and the margin region types."""

    def test_margin_left_created(self):
        """When left_x > 5, a margin_left region should be created."""
        existing = [
            PageRegion(type='column_en', x=100, y=50, width=200, height=300),
            PageRegion(type='column_de', x=320, y=50, width=200, height=300),
        ]
        margins = _build_margin_regions(existing, left_x=100, right_x=520,
                                        img_w=600, top_y=50, content_h=300)
        left_margins = [m for m in margins if m.type == 'margin_left']
        assert len(left_margins) == 1
        ml = left_margins[0]
        assert ml.x == 0
        assert ml.width == 100

    def test_margin_right_created(self):
        """When there's space after the last column, margin_right should be created."""
        existing = [
            PageRegion(type='column_en', x=50, y=50, width=200, height=300),
            PageRegion(type='column_de', x=260, y=50, width=200, height=300),
        ]
        # last_col_end = 260 + 200 = 460, img_w = 600 → gap = 140
        margins = _build_margin_regions(existing, left_x=50, right_x=460,
                                        img_w=600, top_y=50, content_h=300)
        right_margins = [m for m in margins if m.type == 'margin_right']
        assert len(right_margins) == 1
        mr = right_margins[0]
        assert mr.x == 460
        assert mr.width == 140

    def test_no_margin_when_flush(self):
        """When columns are flush with the image edges, no margins should appear."""
        existing = [
            PageRegion(type='column_en', x=0, y=0, width=300, height=400),
            PageRegion(type='column_de', x=300, y=0, width=300, height=400),
        ]
        margins = _build_margin_regions(existing, left_x=0, right_x=600,
                                        img_w=600, top_y=0, content_h=400)
        assert len(margins) == 0

    def test_margins_in_skip_types(self):
        """Document the margin type names expected in the cell-grid skip set.

        NOTE(review): this only checks a set literal built inside the test,
        so it cannot fail if the pipeline's own skip set changes. Replace it
        with a check against the real pipeline behaviour once that set is
        reachable from tests.
        """
        skip = {'column_ignore', 'header', 'footer', 'page_ref', 'margin_left', 'margin_right'}
        assert 'margin_left' in skip
        assert 'margin_right' in skip

    def test_margin_confidence_and_method(self):
        """Margin regions should have confidence 1.0 and method 'content_bounds'."""
        existing = [PageRegion(type='column_en', x=80, y=20, width=400, height=500)]
        margins = _build_margin_regions(existing, left_x=80, right_x=480,
                                        img_w=600, top_y=20, content_h=500)
        # left_x=80 and a 120 px gap on the right (600 - 480) must yield both
        # margins; without this check the loop below would pass vacuously on
        # an empty result.
        assert {m.type for m in margins} == {'margin_left', 'margin_right'}
        for m in margins:
            assert m.classification_confidence == 1.0
            assert m.classification_method == 'content_bounds'
# ============================================= # =============================================
# RUN TESTS # RUN TESTS
# ============================================= # =============================================