feat: cell-first OCR + document type detection + dynamic pipeline steps

Cell-First OCR (v2): Each cell is cropped and OCR'd in isolation,
eliminating neighbour bleeding (e.g. "to", "ps" in marker columns).
Uses ThreadPoolExecutor for parallel Tesseract calls.

Document type detection: Classifies pages as vocab_table, full_text,
or generic_table using projection profiles (<2s, no OCR needed).
Frontend dynamically skips columns/rows steps for full-text pages.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-04 13:52:38 +01:00
parent 00a74b3144
commit 29c74a9962
7 changed files with 1001 additions and 75 deletions

View File

@@ -18,6 +18,7 @@ DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
import io
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from typing import Any, Dict, Generator, List, Optional, Tuple
@@ -159,6 +160,16 @@ class PipelineResult:
image_height: int = 0
@dataclass
class DocumentTypeResult:
"""Result of automatic document type detection."""
doc_type: str # 'vocab_table' | 'full_text' | 'generic_table'
confidence: float # 0.0-1.0
pipeline: str # 'cell_first' | 'full_page'
skip_steps: List[str] = field(default_factory=list) # e.g. ['columns', 'rows']
features: Dict[str, Any] = field(default_factory=dict) # debug info
# =============================================================================
# Stage 1: High-Resolution PDF Rendering
# =============================================================================
@@ -966,6 +977,164 @@ def dewarp_image_manual(img: np.ndarray, shear_degrees: float) -> np.ndarray:
return _apply_shear(img, -shear_degrees)
# =============================================================================
# Document Type Detection
# =============================================================================
def detect_document_type(ocr_img: np.ndarray, img_bgr: np.ndarray) -> DocumentTypeResult:
"""Detect whether the page is a vocab table, generic table, or full text.
Uses projection profiles and text density analysis — no OCR required.
Runs in < 2 seconds.
Args:
ocr_img: Binarized grayscale image (for projection profiles).
img_bgr: BGR color image.
Returns:
DocumentTypeResult with doc_type, confidence, pipeline, skip_steps.
"""
if ocr_img is None or ocr_img.size == 0:
return DocumentTypeResult(
doc_type='full_text', confidence=0.5, pipeline='full_page',
skip_steps=['columns', 'rows'],
features={'error': 'empty image'},
)
h, w = ocr_img.shape[:2]
# --- 1. Vertical projection profile → detect column gaps ---
# Sum dark pixels along each column (x-axis). Gaps = valleys in the profile.
# Invert: dark pixels on white background → high values = text.
vert_proj = np.sum(ocr_img < 128, axis=0).astype(float)
# Smooth the profile to avoid noise spikes
kernel_size = max(3, w // 100)
if kernel_size % 2 == 0:
kernel_size += 1
vert_smooth = np.convolve(vert_proj, np.ones(kernel_size) / kernel_size, mode='same')
# Find significant vertical gaps (columns of near-zero text density)
# A gap must be at least 1% of image width and have < 5% of max density
max_density = max(vert_smooth.max(), 1)
gap_threshold = max_density * 0.05
min_gap_width = max(5, w // 100)
in_gap = False
gap_count = 0
gap_start = 0
vert_gaps = []
for x in range(w):
if vert_smooth[x] < gap_threshold:
if not in_gap:
in_gap = True
gap_start = x
else:
if in_gap:
gap_width = x - gap_start
if gap_width >= min_gap_width:
gap_count += 1
vert_gaps.append((gap_start, x, gap_width))
in_gap = False
# Filter out margin gaps (within 10% of image edges)
margin_threshold = w * 0.10
internal_gaps = [g for g in vert_gaps if g[0] > margin_threshold and g[1] < w - margin_threshold]
internal_gap_count = len(internal_gaps)
# --- 2. Horizontal projection profile → detect row gaps ---
horiz_proj = np.sum(ocr_img < 128, axis=1).astype(float)
h_kernel = max(3, h // 200)
if h_kernel % 2 == 0:
h_kernel += 1
horiz_smooth = np.convolve(horiz_proj, np.ones(h_kernel) / h_kernel, mode='same')
h_max = max(horiz_smooth.max(), 1)
h_gap_threshold = h_max * 0.05
min_row_gap = max(3, h // 200)
row_gap_count = 0
in_gap = False
for y in range(h):
if horiz_smooth[y] < h_gap_threshold:
if not in_gap:
in_gap = True
gap_start = y
else:
if in_gap:
if y - gap_start >= min_row_gap:
row_gap_count += 1
in_gap = False
# --- 3. Text density distribution (4×4 grid) ---
grid_rows, grid_cols = 4, 4
cell_h, cell_w = h // grid_rows, w // grid_cols
densities = []
for gr in range(grid_rows):
for gc in range(grid_cols):
cell = ocr_img[gr * cell_h:(gr + 1) * cell_h,
gc * cell_w:(gc + 1) * cell_w]
if cell.size > 0:
d = float(np.count_nonzero(cell < 128)) / cell.size
densities.append(d)
density_std = float(np.std(densities)) if densities else 0
density_mean = float(np.mean(densities)) if densities else 0
features = {
'vertical_gaps': gap_count,
'internal_vertical_gaps': internal_gap_count,
'vertical_gap_details': [(g[0], g[1], g[2]) for g in vert_gaps[:10]],
'row_gaps': row_gap_count,
'density_mean': round(density_mean, 4),
'density_std': round(density_std, 4),
'image_size': (w, h),
}
# --- 4. Decision tree ---
# Use internal_gap_count (excludes margin gaps) for column detection.
if internal_gap_count >= 2 and row_gap_count >= 5:
# Multiple internal vertical gaps + many row gaps → table
confidence = min(0.95, 0.7 + internal_gap_count * 0.05 + row_gap_count * 0.005)
return DocumentTypeResult(
doc_type='vocab_table',
confidence=round(confidence, 2),
pipeline='cell_first',
skip_steps=[],
features=features,
)
elif internal_gap_count >= 1 and row_gap_count >= 3:
# Some internal structure, likely a table
confidence = min(0.85, 0.5 + internal_gap_count * 0.1 + row_gap_count * 0.01)
return DocumentTypeResult(
doc_type='generic_table',
confidence=round(confidence, 2),
pipeline='cell_first',
skip_steps=[],
features=features,
)
elif internal_gap_count == 0:
# No internal column gaps → full text (regardless of density)
confidence = min(0.95, 0.8 + (1 - min(density_std, 0.1)) * 0.15)
return DocumentTypeResult(
doc_type='full_text',
confidence=round(confidence, 2),
pipeline='full_page',
skip_steps=['columns', 'rows'],
features=features,
)
else:
# Ambiguous — default to vocab_table (most common use case)
return DocumentTypeResult(
doc_type='vocab_table',
confidence=0.5,
pipeline='cell_first',
skip_steps=[],
features=features,
)
# =============================================================================
# Stage 4: Dual Image Preparation
# =============================================================================
@@ -4481,8 +4650,395 @@ def _clean_cell_text(text: str) -> str:
return ' '.join(tokens)
def _clean_cell_text_lite(text: str) -> str:
"""Simplified noise filter for cell-first OCR (isolated cell crops).
Since each cell is OCR'd in isolation (no neighbour content visible),
trailing-noise stripping is unnecessary. Only 2 filters remain:
1. No real alphabetic word (>= 2 letters) and not a known abbreviation → empty.
2. Entire text is garbage (no dictionary word) → empty.
"""
stripped = text.strip()
if not stripped:
return ''
# --- Filter 1: No real word at all ---
if not _RE_REAL_WORD.search(stripped):
alpha_only = ''.join(_RE_ALPHA.findall(stripped)).lower()
if alpha_only not in _KNOWN_ABBREVIATIONS:
return ''
# --- Filter 2: Entire text is garbage ---
if _is_garbage_text(stripped):
return ''
return stripped
# ---------------------------------------------------------------------------
# Narrow-column OCR helpers (Proposal B)
# Cell-First OCR (v2) — each cell cropped and OCR'd in isolation
# ---------------------------------------------------------------------------
def _ocr_cell_crop(
row_idx: int,
col_idx: int,
row: RowGeometry,
col: PageRegion,
ocr_img: np.ndarray,
img_bgr: Optional[np.ndarray],
img_w: int,
img_h: int,
engine_name: str,
lang: str,
lang_map: Dict[str, str],
) -> Dict[str, Any]:
"""OCR a single cell by cropping the exact column×row intersection.
No padding beyond cell boundaries → no neighbour bleeding.
"""
# Display bbox: exact column × row intersection
disp_x = col.x
disp_y = row.y
disp_w = col.width
disp_h = row.height
# Crop boundaries (clamped to image)
cx = max(0, disp_x)
cy = max(0, disp_y)
cw = min(disp_w, img_w - cx)
ch = min(disp_h, img_h - cy)
empty_cell = {
'cell_id': f"R{row_idx:02d}_C{col_idx}",
'row_index': row_idx,
'col_index': col_idx,
'col_type': col.type,
'text': '',
'confidence': 0.0,
'bbox_px': {'x': disp_x, 'y': disp_y, 'w': disp_w, 'h': disp_h},
'bbox_pct': {
'x': round(disp_x / img_w * 100, 2) if img_w else 0,
'y': round(disp_y / img_h * 100, 2) if img_h else 0,
'w': round(disp_w / img_w * 100, 2) if img_w else 0,
'h': round(disp_h / img_h * 100, 2) if img_h else 0,
},
'ocr_engine': 'cell_crop_v2',
}
if cw <= 0 or ch <= 0:
return empty_cell
# --- Pixel-density check: skip truly empty cells ---
if ocr_img is not None:
crop = ocr_img[cy:cy + ch, cx:cx + cw]
if crop.size > 0:
dark_ratio = float(np.count_nonzero(crop < 180)) / crop.size
if dark_ratio < 0.005:
return empty_cell
# --- Prepare crop for OCR ---
cell_lang = lang_map.get(col.type, lang)
psm = _select_psm_for_column(col.type, col.width, row.height)
text = ''
avg_conf = 0.0
used_engine = 'cell_crop_v2'
if engine_name in ("trocr-printed", "trocr-handwritten") and img_bgr is not None:
cell_region = PageRegion(type=col.type, x=cx, y=cy, width=cw, height=ch)
words = ocr_region_trocr(img_bgr, cell_region,
handwritten=(engine_name == "trocr-handwritten"))
elif engine_name == "lighton" and img_bgr is not None:
cell_region = PageRegion(type=col.type, x=cx, y=cy, width=cw, height=ch)
words = ocr_region_lighton(img_bgr, cell_region)
elif engine_name == "rapid" and img_bgr is not None:
cell_region = PageRegion(type=col.type, x=cx, y=cy, width=cw, height=ch)
words = ocr_region_rapid(img_bgr, cell_region)
else:
# Tesseract: upscale tiny crops for better recognition
if ocr_img is not None:
crop_slice = ocr_img[cy:cy + ch, cx:cx + cw]
upscaled = _ensure_minimum_crop_size(crop_slice)
up_h, up_w = upscaled.shape[:2]
tmp_region = PageRegion(type=col.type, x=0, y=0, width=up_w, height=up_h)
words = ocr_region(upscaled, tmp_region, lang=cell_lang, psm=psm)
# Remap word positions back to original image coordinates
if words and (up_w != cw or up_h != ch):
sx = cw / max(up_w, 1)
sy = ch / max(up_h, 1)
for w in words:
w['left'] = int(w['left'] * sx) + cx
w['top'] = int(w['top'] * sy) + cy
w['width'] = int(w['width'] * sx)
w['height'] = int(w['height'] * sy)
elif words:
for w in words:
w['left'] += cx
w['top'] += cy
else:
words = []
# Filter low-confidence words
_MIN_WORD_CONF = 30
if words:
words = [w for w in words if w.get('conf', 0) >= _MIN_WORD_CONF]
if words:
y_tol = max(15, ch)
text = _words_to_reading_order_text(words, y_tolerance_px=y_tol)
avg_conf = round(sum(w['conf'] for w in words) / len(words), 1)
# --- PSM 7 fallback for still-empty Tesseract cells ---
if not text.strip() and engine_name == "tesseract" and ocr_img is not None:
crop_slice = ocr_img[cy:cy + ch, cx:cx + cw]
upscaled = _ensure_minimum_crop_size(crop_slice)
up_h, up_w = upscaled.shape[:2]
tmp_region = PageRegion(type=col.type, x=0, y=0, width=up_w, height=up_h)
psm7_words = ocr_region(upscaled, tmp_region, lang=cell_lang, psm=7)
if psm7_words:
psm7_words = [w for w in psm7_words if w.get('conf', 0) >= _MIN_WORD_CONF]
if psm7_words:
p7_text = _words_to_reading_order_text(psm7_words, y_tolerance_px=10)
if p7_text.strip():
text = p7_text
avg_conf = round(
sum(w['conf'] for w in psm7_words) / len(psm7_words), 1
)
used_engine = 'cell_crop_v2_psm7'
# --- Noise filter ---
if text.strip():
text = _clean_cell_text_lite(text)
if not text:
avg_conf = 0.0
result = dict(empty_cell)
result['text'] = text
result['confidence'] = avg_conf
result['ocr_engine'] = used_engine
return result
def build_cell_grid_v2(
ocr_img: np.ndarray,
column_regions: List[PageRegion],
row_geometries: List[RowGeometry],
img_w: int,
img_h: int,
lang: str = "eng+deu",
ocr_engine: str = "auto",
img_bgr: Optional[np.ndarray] = None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
"""Cell-First Grid: crop each cell in isolation, then OCR.
Drop-in replacement for build_cell_grid() — same signature & return type.
No full-page word assignment; each cell is OCR'd from its own crop.
"""
# Resolve engine
use_rapid = False
if ocr_engine in ("trocr-printed", "trocr-handwritten", "lighton"):
engine_name = ocr_engine
elif ocr_engine == "auto":
use_rapid = RAPIDOCR_AVAILABLE and img_bgr is not None
engine_name = "rapid" if use_rapid else "tesseract"
elif ocr_engine == "rapid":
if not RAPIDOCR_AVAILABLE:
logger.warning("RapidOCR requested but not available, falling back to Tesseract")
else:
use_rapid = True
engine_name = "rapid" if use_rapid else "tesseract"
else:
engine_name = "tesseract"
logger.info(f"build_cell_grid_v2: using OCR engine '{engine_name}'")
# Filter to content rows only
content_rows = [r for r in row_geometries if r.row_type == 'content']
if not content_rows:
logger.warning("build_cell_grid_v2: no content rows found")
return [], []
# Filter phantom rows (word_count=0) and artifact rows
before = len(content_rows)
content_rows = [r for r in content_rows if r.word_count > 0]
skipped = before - len(content_rows)
if skipped > 0:
logger.info(f"build_cell_grid_v2: skipped {skipped} phantom rows (word_count=0)")
if not content_rows:
logger.warning("build_cell_grid_v2: no content rows with words found")
return [], []
before_art = len(content_rows)
content_rows = [r for r in content_rows if not _is_artifact_row(r)]
artifact_skipped = before_art - len(content_rows)
if artifact_skipped > 0:
logger.info(f"build_cell_grid_v2: skipped {artifact_skipped} artifact rows")
if not content_rows:
logger.warning("build_cell_grid_v2: no content rows after artifact filtering")
return [], []
# Filter columns
_skip_types = {'column_ignore', 'header', 'footer', 'margin_top',
'margin_bottom', 'margin_left', 'margin_right'}
relevant_cols = [c for c in column_regions if c.type not in _skip_types]
if not relevant_cols:
logger.warning("build_cell_grid_v2: no usable columns found")
return [], []
# Heal row gaps
_heal_row_gaps(
content_rows,
top_bound=min(c.y for c in relevant_cols),
bottom_bound=max(c.y + c.height for c in relevant_cols),
)
relevant_cols.sort(key=lambda c: c.x)
columns_meta = [
{'index': ci, 'type': c.type, 'x': c.x, 'width': c.width}
for ci, c in enumerate(relevant_cols)
]
lang_map = {
'column_en': 'eng',
'column_de': 'deu',
'column_example': 'eng+deu',
}
# --- Parallel OCR with ThreadPoolExecutor ---
# Tesseract is single-threaded per call, so we benefit from parallelism.
# ~40 rows × 4 cols = 160 cells, ~50% empty (density skip) → ~80 OCR calls.
cells: List[Dict[str, Any]] = []
cell_tasks = []
for row_idx, row in enumerate(content_rows):
for col_idx, col in enumerate(relevant_cols):
cell_tasks.append((row_idx, col_idx, row, col))
max_workers = 4 if engine_name == "tesseract" else 2
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {
pool.submit(
_ocr_cell_crop,
ri, ci, row, col,
ocr_img, img_bgr, img_w, img_h,
engine_name, lang, lang_map,
): (ri, ci)
for ri, ci, row, col in cell_tasks
}
for future in as_completed(futures):
try:
cell = future.result()
cells.append(cell)
except Exception as e:
ri, ci = futures[future]
logger.error(f"build_cell_grid_v2: cell R{ri:02d}_C{ci} failed: {e}")
# Sort cells by (row_index, col_index) since futures complete out of order
cells.sort(key=lambda c: (c['row_index'], c['col_index']))
# Remove all-empty rows
rows_with_text: set = set()
for cell in cells:
if cell['text'].strip():
rows_with_text.add(cell['row_index'])
before_filter = len(cells)
cells = [c for c in cells if c['row_index'] in rows_with_text]
empty_rows_removed = (before_filter - len(cells)) // max(len(relevant_cols), 1)
if empty_rows_removed > 0:
logger.info(f"build_cell_grid_v2: removed {empty_rows_removed} all-empty rows")
logger.info(f"build_cell_grid_v2: {len(cells)} cells from "
f"{len(content_rows)} rows × {len(relevant_cols)} columns, "
f"engine={engine_name}")
return cells, columns_meta
def build_cell_grid_v2_streaming(
ocr_img: np.ndarray,
column_regions: List[PageRegion],
row_geometries: List[RowGeometry],
img_w: int,
img_h: int,
lang: str = "eng+deu",
ocr_engine: str = "auto",
img_bgr: Optional[np.ndarray] = None,
) -> Generator[Tuple[Dict[str, Any], List[Dict[str, Any]], int], None, None]:
"""Streaming variant of build_cell_grid_v2 — yields each cell as OCR'd.
Yields:
(cell_dict, columns_meta, total_cells)
"""
# Resolve engine
use_rapid = False
if ocr_engine in ("trocr-printed", "trocr-handwritten", "lighton"):
engine_name = ocr_engine
elif ocr_engine == "auto":
use_rapid = RAPIDOCR_AVAILABLE and img_bgr is not None
engine_name = "rapid" if use_rapid else "tesseract"
elif ocr_engine == "rapid":
if not RAPIDOCR_AVAILABLE:
logger.warning("RapidOCR requested but not available, falling back to Tesseract")
else:
use_rapid = True
engine_name = "rapid" if use_rapid else "tesseract"
else:
engine_name = "tesseract"
content_rows = [r for r in row_geometries if r.row_type == 'content']
if not content_rows:
return
content_rows = [r for r in content_rows if r.word_count > 0]
if not content_rows:
return
_skip_types = {'column_ignore', 'header', 'footer', 'margin_top',
'margin_bottom', 'margin_left', 'margin_right'}
relevant_cols = [c for c in column_regions if c.type not in _skip_types]
if not relevant_cols:
return
content_rows = [r for r in content_rows if not _is_artifact_row(r)]
if not content_rows:
return
_heal_row_gaps(
content_rows,
top_bound=min(c.y for c in relevant_cols),
bottom_bound=max(c.y + c.height for c in relevant_cols),
)
relevant_cols.sort(key=lambda c: c.x)
columns_meta = [
{'index': ci, 'type': c.type, 'x': c.x, 'width': c.width}
for ci, c in enumerate(relevant_cols)
]
lang_map = {
'column_en': 'eng',
'column_de': 'deu',
'column_example': 'eng+deu',
}
total_cells = len(content_rows) * len(relevant_cols)
for row_idx, row in enumerate(content_rows):
for col_idx, col in enumerate(relevant_cols):
cell = _ocr_cell_crop(
row_idx, col_idx, row, col,
ocr_img, img_bgr, img_w, img_h,
engine_name, lang, lang_map,
)
yield cell, columns_meta, total_cells
# ---------------------------------------------------------------------------
# Narrow-column OCR helpers (Proposal B) — DEPRECATED (kept for legacy build_cell_grid)
# ---------------------------------------------------------------------------
def _compute_cell_padding(col_width: int, img_w: int) -> int: