feat(ocr-pipeline): generic cell-grid with optional vocab mapping

Extract build_cell_grid() from build_word_grid() as a layout-agnostic
foundation. Step 5 now produces a generic cell grid (columns × rows) and
auto-detects whether a vocab layout is present. The frontend dynamically
switches between the vocab table (EN/DE/Example) and a generic cell table
based on the layout type.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-01 17:22:56 +01:00
parent 3bcb7aa638
commit 27b895a848
4 changed files with 802 additions and 301 deletions

View File

@@ -3009,7 +3009,7 @@ def _replace_phonetics_in_text(text: str, pronunciation: str = 'british') -> str
return _PHONETIC_BRACKET_RE.sub(replacer, text)
def build_word_grid(
def build_cell_grid(
ocr_img: np.ndarray,
column_regions: List[PageRegion],
row_geometries: List[RowGeometry],
@@ -3018,9 +3018,11 @@ def build_word_grid(
lang: str = "eng+deu",
ocr_engine: str = "auto",
img_bgr: Optional[np.ndarray] = None,
pronunciation: str = "british",
) -> List[Dict[str, Any]]:
"""Build a word grid by intersecting columns and rows, then OCR each cell.
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
"""Generic Cell-Grid: Columns × Rows → cells with OCR text.
This is the layout-agnostic foundation. Every column (except column_ignore)
is intersected with every content row to produce numbered cells.
Args:
ocr_img: Binarized full-page image (for Tesseract).
@@ -3029,11 +3031,12 @@ def build_word_grid(
img_w: Image width in pixels.
img_h: Image height in pixels.
lang: Default Tesseract language.
ocr_engine: 'tesseract', 'rapid', or 'auto' (rapid if available, else tesseract).
ocr_engine: 'tesseract', 'rapid', or 'auto'.
img_bgr: BGR color image (required for RapidOCR).
Returns:
List of entry dicts with english/german/example text and bbox info (percent).
(cells, columns_meta) where cells is a list of cell dicts and
columns_meta describes the columns used.
"""
# Resolve engine choice
use_rapid = False
@@ -3046,24 +3049,34 @@ def build_word_grid(
use_rapid = True
engine_name = "rapid" if use_rapid else "tesseract"
logger.info(f"build_word_grid: using OCR engine '{engine_name}'")
logger.info(f"build_cell_grid: using OCR engine '{engine_name}'")
# Filter to content rows only (skip header/footer)
content_rows = [r for r in row_geometries if r.row_type == 'content']
if not content_rows:
logger.warning("build_word_grid: no content rows found")
return []
logger.warning("build_cell_grid: no content rows found")
return [], []
# Map column types to roles
VOCAB_COLUMN_TYPES = {'column_en', 'column_de', 'column_example'}
relevant_cols = [c for c in column_regions if c.type in VOCAB_COLUMN_TYPES]
# Use all columns except column_ignore
relevant_cols = [c for c in column_regions if c.type != 'column_ignore']
if not relevant_cols:
logger.warning("build_word_grid: no relevant vocabulary columns found")
return []
logger.warning("build_cell_grid: no usable columns found")
return [], []
# Sort columns left-to-right
relevant_cols.sort(key=lambda c: c.x)
# Build columns_meta
columns_meta = [
{
'index': col_idx,
'type': col.type,
'x': col.x,
'width': col.width,
}
for col_idx, col in enumerate(relevant_cols)
]
# Choose OCR language per column type (Tesseract only)
lang_map = {
'column_en': 'eng',
@@ -3071,47 +3084,40 @@ def build_word_grid(
'column_example': 'eng+deu',
}
entries: List[Dict[str, Any]] = []
cells: List[Dict[str, Any]] = []
for row_idx, row in enumerate(content_rows):
entry: Dict[str, Any] = {
'row_index': row_idx,
'english': '',
'german': '',
'example': '',
'confidence': 0.0,
'bbox': {
'x': round(row.x / img_w * 100, 2),
'y': round(row.y / img_h * 100, 2),
'w': round(row.width / img_w * 100, 2),
'h': round(row.height / img_h * 100, 2),
},
'bbox_en': None,
'bbox_de': None,
'bbox_ex': None,
'ocr_engine': engine_name,
}
confidences: List[float] = []
for col in relevant_cols:
for col_idx, col in enumerate(relevant_cols):
# Compute cell region: column x/width, row y/height
# Add padding to avoid clipping edge words
pad = 8 # pixels
cell_x = col.x - pad
cell_y = row.y - pad
cell_x = max(0, col.x - pad)
cell_y = max(0, row.y - pad)
cell_w = col.width + 2 * pad
cell_h = row.height + 2 * pad
# Clamp to image bounds
cell_x = max(0, cell_x)
cell_y = max(0, cell_y)
if cell_x + cell_w > img_w:
cell_w = img_w - cell_x
if cell_y + cell_h > img_h:
cell_h = img_h - cell_y
if cell_w <= 0 or cell_h <= 0:
cells.append({
'cell_id': f"R{row_idx:02d}_C{col_idx}",
'row_index': row_idx,
'col_index': col_idx,
'col_type': col.type,
'text': '',
'confidence': 0.0,
'bbox_px': {'x': col.x, 'y': row.y, 'w': col.width, 'h': row.height},
'bbox_pct': {
'x': round(col.x / img_w * 100, 2),
'y': round(row.y / img_h * 100, 2),
'w': round(col.width / img_w * 100, 2),
'h': round(row.height / img_h * 100, 2),
},
'ocr_engine': engine_name,
})
continue
cell_region = PageRegion(
@@ -3119,6 +3125,7 @@ def build_word_grid(
x=cell_x, y=cell_y,
width=cell_w, height=cell_h,
)
# OCR the cell
if use_rapid:
words = ocr_region_rapid(img_bgr, cell_region)
@@ -3126,8 +3133,7 @@ def build_word_grid(
cell_lang = lang_map.get(col.type, lang)
words = ocr_region(ocr_img, cell_region, lang=cell_lang, psm=6)
# Group into lines, then join in reading order (Fix A)
# Use half of average word height as Y-tolerance
# Group into lines, then join in reading order
if words:
avg_h = sum(w['height'] for w in words) / len(words)
y_tol = max(10, int(avg_h * 0.5))
@@ -3135,36 +3141,162 @@ def build_word_grid(
y_tol = 15
text = _words_to_reading_order_text(words, y_tolerance_px=y_tol)
avg_conf = 0.0
if words:
avg_conf = sum(w['conf'] for w in words) / len(words)
confidences.append(avg_conf)
avg_conf = round(sum(w['conf'] for w in words) / len(words), 1)
# Bbox in percent
cell_bbox = {
'x': round(cell_x / img_w * 100, 2),
'y': round(cell_y / img_h * 100, 2),
'w': round(cell_w / img_w * 100, 2),
'h': round(cell_h / img_h * 100, 2),
cells.append({
'cell_id': f"R{row_idx:02d}_C{col_idx}",
'row_index': row_idx,
'col_index': col_idx,
'col_type': col.type,
'text': text,
'confidence': avg_conf,
'bbox_px': {'x': cell_x, 'y': cell_y, 'w': cell_w, 'h': cell_h},
'bbox_pct': {
'x': round(cell_x / img_w * 100, 2),
'y': round(cell_y / img_h * 100, 2),
'w': round(cell_w / img_w * 100, 2),
'h': round(cell_h / img_h * 100, 2),
},
'ocr_engine': engine_name,
})
logger.info(f"build_cell_grid: {len(cells)} cells from "
f"{len(content_rows)} rows × {len(relevant_cols)} columns, "
f"engine={engine_name}")
return cells, columns_meta
def _cells_to_vocab_entries(
cells: List[Dict[str, Any]],
columns_meta: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Map generic cells to vocab entries with english/german/example fields.
Groups cells by row_index, maps col_type → field name, and produces
one entry per row (only rows with at least one non-empty field).
"""
# Lookup tables: column type → entry text field, and column type → per-column bbox key
col_type_to_field = {
'column_en': 'english',
'column_de': 'german',
'column_example': 'example',
}
bbox_key_map = {
'column_en': 'bbox_en',
'column_de': 'bbox_de',
'column_example': 'bbox_ex',
}
# Group cells by row_index
rows: Dict[int, List[Dict]] = {}
for cell in cells:
ri = cell['row_index']
rows.setdefault(ri, []).append(cell)
entries: List[Dict[str, Any]] = []
for row_idx in sorted(rows.keys()):
row_cells = rows[row_idx]
entry: Dict[str, Any] = {
'row_index': row_idx,
'english': '',
'german': '',
'example': '',
'confidence': 0.0,
'bbox': None,
'bbox_en': None,
'bbox_de': None,
'bbox_ex': None,
'ocr_engine': row_cells[0].get('ocr_engine', '') if row_cells else '',
}
confidences = []
for cell in row_cells:
col_type = cell['col_type']
field = col_type_to_field.get(col_type)
if field:
entry[field] = cell['text']
bbox_field = bbox_key_map.get(col_type)
if bbox_field:
entry[bbox_field] = cell['bbox_pct']
if cell['confidence'] > 0:
confidences.append(cell['confidence'])
# Compute row-level bbox as union of all cell bboxes
all_bboxes = [c['bbox_pct'] for c in row_cells if c.get('bbox_pct')]
if all_bboxes:
min_x = min(b['x'] for b in all_bboxes)
min_y = min(b['y'] for b in all_bboxes)
max_x2 = max(b['x'] + b['w'] for b in all_bboxes)
max_y2 = max(b['y'] + b['h'] for b in all_bboxes)
entry['bbox'] = {
'x': round(min_x, 2),
'y': round(min_y, 2),
'w': round(max_x2 - min_x, 2),
'h': round(max_y2 - min_y, 2),
}
if col.type == 'column_en':
entry['english'] = text
entry['bbox_en'] = cell_bbox
elif col.type == 'column_de':
entry['german'] = text
entry['bbox_de'] = cell_bbox
elif col.type == 'column_example':
entry['example'] = text
entry['bbox_ex'] = cell_bbox
entry['confidence'] = round(
sum(confidences) / len(confidences), 1
) if confidences else 0.0
# Only include if at least one field has text
# Only include if at least one vocab field has text
if entry['english'] or entry['german'] or entry['example']:
entries.append(entry)
return entries
def build_word_grid(
ocr_img: np.ndarray,
column_regions: List[PageRegion],
row_geometries: List[RowGeometry],
img_w: int,
img_h: int,
lang: str = "eng+deu",
ocr_engine: str = "auto",
img_bgr: Optional[np.ndarray] = None,
pronunciation: str = "british",
) -> List[Dict[str, Any]]:
"""Vocab-specific: Cell-Grid + Vocab-Mapping + Post-Processing.
Wrapper around build_cell_grid() that adds vocabulary-specific logic:
- Maps cells to english/german/example entries
- Applies character confusion fixes, IPA lookup, comma splitting, etc.
- Falls back to returning the raw cell dicts (generic cell schema, not vocab entries) when neither a column_en nor a column_de column is present.
Args:
ocr_img: Binarized full-page image (for Tesseract).
column_regions: Classified columns from Step 3.
row_geometries: Rows from Step 4.
img_w, img_h: Image dimensions.
lang: Default Tesseract language.
ocr_engine: 'tesseract', 'rapid', or 'auto'.
img_bgr: BGR color image (required for RapidOCR).
pronunciation: 'british' or 'american' for IPA lookup.
Returns:
List of entry dicts with english/german/example text and bbox info (percent).
"""
cells, columns_meta = build_cell_grid(
ocr_img, column_regions, row_geometries, img_w, img_h,
lang=lang, ocr_engine=ocr_engine, img_bgr=img_bgr,
)
if not cells:
return []
# Check if vocab layout is present
col_types = {c['type'] for c in columns_meta}
if not (col_types & {'column_en', 'column_de'}):
logger.info("build_word_grid: no vocab columns — returning raw cells")
return cells
# Vocab mapping: cells → entries
entries = _cells_to_vocab_entries(cells, columns_meta)
# --- Post-processing pipeline (deterministic, no LLM) ---
n_raw = len(entries)
@@ -3177,13 +3309,13 @@ def build_word_grid(
# 3. Split comma-separated word forms (break, broke, broken → 3 entries)
entries = _split_comma_entries(entries)
# 5. Attach example sentences (rows without DE → examples for preceding entry)
# 4. Attach example sentences (rows without DE → examples for preceding entry)
entries = _attach_example_sentences(entries)
engine_name = cells[0].get('ocr_engine', 'unknown') if cells else 'unknown'
logger.info(f"build_word_grid: {len(entries)} entries from "
f"{n_raw} raw → {len(entries)} after post-processing "
f"({len(content_rows)} content rows × {len(relevant_cols)} columns, "
f"engine={engine_name})")
f"(engine={engine_name})")
return entries

View File

@@ -31,8 +31,14 @@ from pydantic import BaseModel
from cv_vocab_pipeline import (
PageRegion,
RowGeometry,
_cells_to_vocab_entries,
_fix_character_confusion,
_fix_phonetic_brackets,
_split_comma_entries,
_attach_example_sentences,
analyze_layout,
analyze_layout_by_words,
build_cell_grid,
build_word_grid,
classify_column_types,
create_layout_image,
@@ -1075,35 +1081,60 @@ async def detect_words(session_id: str, engine: str = "auto", pronunciation: str
for r in row_result["rows"]
]
# Build word grid — pass both binarized (for Tesseract) and BGR (for RapidOCR)
entries = build_word_grid(
# Build generic cell grid
cells, columns_meta = build_cell_grid(
ocr_img, col_regions, row_geoms, img_w, img_h,
ocr_engine=engine, img_bgr=dewarped_bgr,
pronunciation=pronunciation,
)
duration = time.time() - t0
# Build summary
summary = {
"total_entries": len(entries),
"with_english": sum(1 for e in entries if e.get("english")),
"with_german": sum(1 for e in entries if e.get("german")),
"low_confidence": sum(1 for e in entries if e.get("confidence", 0) < 50),
}
# Layout detection
col_types = {c['type'] for c in columns_meta}
is_vocab = bool(col_types & {'column_en', 'column_de'})
# Count content rows and columns for grid_shape
n_content_rows = len([r for r in row_geoms if r.row_type == 'content'])
n_cols = len(columns_meta)
# Determine which engine was actually used
used_engine = entries[0].get("ocr_engine", "tesseract") if entries else engine
used_engine = cells[0].get("ocr_engine", "tesseract") if cells else engine
# Grid result (always generic)
word_result = {
"entries": entries,
"entry_count": len(entries),
"cells": cells,
"grid_shape": {
"rows": n_content_rows,
"cols": n_cols,
"total_cells": len(cells),
},
"columns_used": columns_meta,
"layout": "vocab" if is_vocab else "generic",
"image_width": img_w,
"image_height": img_h,
"duration_seconds": round(duration, 2),
"summary": summary,
"ocr_engine": used_engine,
"summary": {
"total_cells": len(cells),
"non_empty_cells": sum(1 for c in cells if c.get("text")),
"low_confidence": sum(1 for c in cells if 0 < c.get("confidence", 0) < 50),
},
}
# For vocab layout: add post-processed vocab_entries (backwards compat)
if is_vocab:
entries = _cells_to_vocab_entries(cells, columns_meta)
entries = _fix_character_confusion(entries)
entries = _fix_phonetic_brackets(entries, pronunciation=pronunciation)
entries = _split_comma_entries(entries)
entries = _attach_example_sentences(entries)
word_result["vocab_entries"] = entries
# Also keep "entries" key for backwards compatibility
word_result["entries"] = entries
word_result["entry_count"] = len(entries)
word_result["summary"]["total_entries"] = len(entries)
word_result["summary"]["with_english"] = sum(1 for e in entries if e.get("english"))
word_result["summary"]["with_german"] = sum(1 for e in entries if e.get("german"))
# Persist to DB
await update_session_db(
session_id,
@@ -1114,7 +1145,8 @@ async def detect_words(session_id: str, engine: str = "auto", pronunciation: str
cached["word_result"] = word_result
logger.info(f"OCR Pipeline: words session {session_id}: "
f"{len(entries)} entries ({duration:.2f}s), summary: {summary}")
f"layout={word_result['layout']}, "
f"{len(cells)} cells ({duration:.2f}s), summary: {word_result['summary']}")
return {
"session_id": session_id,
@@ -1232,17 +1264,19 @@ async def _get_rows_overlay(session_id: str) -> Response:
async def _get_words_overlay(session_id: str) -> Response:
"""Generate dewarped image with word grid cells drawn on it."""
"""Generate dewarped image with cell grid drawn on it."""
session = await get_session_db(session_id)
if not session:
raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
word_result = session.get("word_result")
if not word_result or not word_result.get("entries"):
if not word_result:
raise HTTPException(status_code=404, detail="No word data available")
column_result = session.get("column_result")
row_result = session.get("row_result")
# Support both new cell-based and legacy entry-based formats
cells = word_result.get("cells")
if not cells and not word_result.get("entries"):
raise HTTPException(status_code=404, detail="No word data available")
# Load dewarped image
dewarped_png = await get_session_image(session_id, "dewarped")
@@ -1256,80 +1290,105 @@ async def _get_words_overlay(session_id: str) -> Response:
img_h, img_w = img.shape[:2]
# Color map for column types (BGR)
col_colors = {
"column_en": (255, 180, 0), # Blue
"column_de": (0, 200, 0), # Green
"column_example": (0, 140, 255), # Orange
}
overlay = img.copy()
# Build grid from column_result × row_result (the actual cells)
columns = []
if column_result and column_result.get("columns"):
columns = [c for c in column_result["columns"]
if c.get("type", "").startswith("column_")]
if cells:
# New cell-based overlay: color by column index
col_palette = [
(255, 180, 0), # Blue (BGR)
(0, 200, 0), # Green
(0, 140, 255), # Orange
(200, 100, 200), # Purple
(200, 200, 0), # Cyan
(100, 200, 200), # Yellow-ish
]
content_rows_data = []
if row_result and row_result.get("rows"):
content_rows_data = [r for r in row_result["rows"]
if r.get("row_type") == "content"]
for cell in cells:
bbox = cell.get("bbox_px", {})
cx = bbox.get("x", 0)
cy = bbox.get("y", 0)
cw = bbox.get("w", 0)
ch = bbox.get("h", 0)
if cw <= 0 or ch <= 0:
continue
# Draw grid: column × row cells
for col in columns:
col_type = col.get("type", "")
color = col_colors.get(col_type, (200, 200, 200))
cx, cw = col["x"], col["width"]
col_idx = cell.get("col_index", 0)
color = col_palette[col_idx % len(col_palette)]
for row in content_rows_data:
ry, rh = row["y"], row["height"]
# Cell rectangle (exact grid intersection, no padding)
cv2.rectangle(img, (cx, ry), (cx + cw, ry + rh), color, 1)
# Cell rectangle border
cv2.rectangle(img, (cx, cy), (cx + cw, cy + ch), color, 1)
# Semi-transparent fill
cv2.rectangle(overlay, (cx, ry), (cx + cw, ry + rh), color, -1)
cv2.rectangle(overlay, (cx, cy), (cx + cw, cy + ch), color, -1)
# Place OCR text labels inside grid cells
# Build lookup: row_index → entry for fast access
entries = word_result["entries"]
entry_by_row: Dict[int, Dict] = {}
for entry in entries:
entry_by_row[entry.get("row_index", -1)] = entry
# Cell-ID label (top-left corner)
cell_id = cell.get("cell_id", "")
cv2.putText(img, cell_id, (cx + 2, cy + 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.28, color, 1)
for row_idx, row in enumerate(content_rows_data):
entry = entry_by_row.get(row_idx)
if not entry:
continue
# Text label (bottom of cell)
text = cell.get("text", "")
if text:
conf = cell.get("confidence", 0)
if conf >= 70:
text_color = (0, 180, 0)
elif conf >= 50:
text_color = (0, 180, 220)
else:
text_color = (0, 0, 220)
conf = entry.get("confidence", 0)
if conf >= 70:
text_color = (0, 180, 0)
elif conf >= 50:
text_color = (0, 180, 220)
else:
text_color = (0, 0, 220)
label = text.replace('\n', ' ')[:30]
cv2.putText(img, label, (cx + 3, cy + ch - 4),
cv2.FONT_HERSHEY_SIMPLEX, 0.35, text_color, 1)
else:
# Legacy fallback: entry-based overlay (for old sessions)
column_result = session.get("column_result")
row_result = session.get("row_result")
col_colors = {
"column_en": (255, 180, 0),
"column_de": (0, 200, 0),
"column_example": (0, 140, 255),
}
ry, rh = row["y"], row["height"]
columns = []
if column_result and column_result.get("columns"):
columns = [c for c in column_result["columns"]
if c.get("type", "").startswith("column_")]
content_rows_data = []
if row_result and row_result.get("rows"):
content_rows_data = [r for r in row_result["rows"]
if r.get("row_type") == "content"]
for col in columns:
col_type = col.get("type", "")
color = col_colors.get(col_type, (200, 200, 200))
cx, cw = col["x"], col["width"]
for row in content_rows_data:
ry, rh = row["y"], row["height"]
cv2.rectangle(img, (cx, ry), (cx + cw, ry + rh), color, 1)
cv2.rectangle(overlay, (cx, ry), (cx + cw, ry + rh), color, -1)
# Pick the right text field for this column
if col_type == "column_en":
text = entry.get("english", "")
elif col_type == "column_de":
text = entry.get("german", "")
elif col_type == "column_example":
text = entry.get("example", "")
else:
text = ""
entries = word_result["entries"]
entry_by_row: Dict[int, Dict] = {}
for entry in entries:
entry_by_row[entry.get("row_index", -1)] = entry
if text:
label = text.replace('\n', ' ')[:30]
font_scale = 0.35
cv2.putText(img, label, (cx + 3, ry + rh - 4),
cv2.FONT_HERSHEY_SIMPLEX, font_scale, text_color, 1)
for row_idx, row in enumerate(content_rows_data):
entry = entry_by_row.get(row_idx)
if not entry:
continue
conf = entry.get("confidence", 0)
text_color = (0, 180, 0) if conf >= 70 else (0, 180, 220) if conf >= 50 else (0, 0, 220)
ry, rh = row["y"], row["height"]
for col in columns:
col_type = col.get("type", "")
cx, cw = col["x"], col["width"]
field = {"column_en": "english", "column_de": "german", "column_example": "example"}.get(col_type, "")
text = entry.get(field, "") if field else ""
if text:
label = text.replace('\n', ' ')[:30]
cv2.putText(img, label, (cx + 3, ry + rh - 4),
cv2.FONT_HERSHEY_SIMPLEX, 0.35, text_color, 1)
# Blend overlay at 10% opacity
cv2.addWeighted(overlay, 0.1, img, 0.9, 0, img)