Files
breakpilot-lehrer/klausur-service/backend/grid_editor_api.py
Benjamin Admin c42924a94a
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 34s
CI / test-go-edu-search (push) Successful in 24s
CI / test-python-klausur (push) Failing after 1m57s
CI / test-python-agent-core (push) Successful in 15s
CI / test-nodejs-website (push) Successful in 21s
Fix IPA correction persistence and false-positive prefix matching
Step 5i was overwriting IPA-corrected text from Step 5c when
reconstructing cells from word_boxes. Added _ipa_corrected flag
to preserve corrections. Also tightened merged-token prefix matching
(min prefix 4 chars, min suffix 3 chars) to prevent false positives
like "sis" being extracted from "si:said".

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-25 07:26:32 +01:00

1644 lines
73 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Grid Editor API — builds a structured, zone-aware grid from Kombi OCR results.
Takes the merged word positions from paddle-kombi / rapid-kombi and:
1. Detects bordered boxes on the image (cv_box_detect)
2. Splits the page into zones (content + box regions)
3. Clusters words into columns and rows per zone
4. Returns a hierarchical StructuredGrid for the frontend Excel-like editor
Lizenz: Apache 2.0 (kommerziell nutzbar)
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
"""
import logging
import re
import time
from typing import Any, Dict, List, Optional, Tuple
import cv2
import numpy as np
from fastapi import APIRouter, HTTPException, Request
from cv_box_detect import detect_boxes, split_page_into_zones
from cv_graphic_detect import detect_graphic_elements
from cv_vocab_types import PageZone
from cv_color_detect import detect_word_colors, recover_colored_text
from cv_ocr_engines import fix_cell_phonetics, fix_ipa_continuation_cell, _text_has_garbled_ipa, _lookup_ipa, _words_to_reading_order_text, _group_words_into_lines
from ocr_pipeline_session_store import (
get_session_db,
get_session_image,
update_session_db,
)
from grid_editor_helpers import (
_filter_border_strip_words,
_cluster_columns_by_alignment,
_GRID_GHOST_CHARS,
_filter_border_ghosts,
_MARKER_CHARS,
_merge_inline_marker_columns,
_flatten_word_boxes,
_words_in_zone,
_PIPE_RE_VSPLIT,
_detect_vertical_dividers,
_split_zone_at_vertical_dividers,
_merge_content_zones_across_boxes,
_detect_heading_rows_by_color,
_detect_heading_rows_by_single_cell,
_detect_header_rows,
_build_zone_grid,
_get_content_bounds,
_filter_decorative_margin,
_filter_footer_words,
_filter_header_junk,
)
# Module-level logger named after this module so log output can be filtered
# per-module by the application's logging configuration.
logger = logging.getLogger(__name__)
# FastAPI sub-router: endpoints registered on it are served under the
# /api/v1/ocr-pipeline prefix and grouped under the "grid-editor" tag
# in the generated OpenAPI docs.
router = APIRouter(prefix="/api/v1/ocr-pipeline", tags=["grid-editor"])
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Core computation (used by build-grid endpoint and regression tests)
# ---------------------------------------------------------------------------
async def _build_grid_core(session_id: str, session: dict) -> dict:
"""Core grid building logic — pure computation, no HTTP or DB side effects.
Args:
session_id: Session identifier (for logging and image loading).
session: Full session dict from get_session_db().
Returns:
StructuredGrid result dict.
Raises:
ValueError: If session data is incomplete.
"""
t0 = time.time()
# 1. Validate and load word results
word_result = session.get("word_result")
if not word_result or not word_result.get("cells"):
raise ValueError("No word results found. Run paddle-kombi or rapid-kombi first.")
img_w = word_result.get("image_width", 0)
img_h = word_result.get("image_height", 0)
if not img_w or not img_h:
raise ValueError("Missing image dimensions in word_result")
# 2. Flatten all word boxes from cells
all_words = _flatten_word_boxes(word_result["cells"])
if not all_words:
raise ValueError("No word boxes found in cells")
logger.info("build-grid session %s: %d words from %d cells",
session_id, len(all_words), len(word_result["cells"]))
# 2b. Filter decorative margin columns (alphabet graphics).
# Some worksheets have a decorative alphabet strip along one margin
# (A-Z in a graphic). OCR reads these as single-char words aligned
# vertically. Detect and remove them before grid building.
margin_strip_info = _filter_decorative_margin(all_words, img_w, logger, session_id)
margin_strip_detected = margin_strip_info.get("found", False)
# Read document_category from session (user-selected or auto-detected)
document_category = session.get("document_category")
# 2c. Filter footer rows (page numbers at the very bottom).
# Isolated short text in the bottom 5% of the page is typically a
# page number ("64", "S. 12") and not real content.
_filter_footer_words(all_words, img_h, logger, session_id)
# 2c2. Filter OCR junk from header illustrations.
# Low-confidence short fragments above the first real content row.
_filter_header_junk(all_words, img_h, logger, session_id)
# 2d. Filter words inside user-defined exclude regions (from Structure step).
# These are explicitly marked by the user, so ALL words inside are removed
# regardless of confidence.
structure_result = session.get("structure_result")
exclude_rects = []
if structure_result:
for er in structure_result.get("exclude_regions", []):
exclude_rects.append({
"x": er["x"], "y": er["y"],
"w": er["w"], "h": er["h"],
})
if exclude_rects:
before = len(all_words)
filtered = []
for w in all_words:
w_cx = w["left"] + w.get("width", 0) / 2
w_cy = w["top"] + w.get("height", 0) / 2
inside = any(
er["x"] <= w_cx <= er["x"] + er["w"]
and er["y"] <= w_cy <= er["y"] + er["h"]
for er in exclude_rects
)
if not inside:
filtered.append(w)
removed = before - len(filtered)
if removed:
all_words = filtered
logger.info(
"build-grid session %s: removed %d words inside %d user exclude region(s)",
session_id, removed, len(exclude_rects),
)
# 2e. Hard-filter words inside graphic/image regions from structure step.
# ALL words inside graphic regions are removed regardless of confidence —
# images cannot contain real text; any OCR words inside are artifacts.
# After image loading (Step 3a) we augment these with freshly detected
# graphic regions from cv_graphic_detect.
graphic_rects: List[Dict[str, int]] = []
if structure_result:
for g in structure_result.get("graphics", []):
graphic_rects.append({
"x": g["x"], "y": g["y"],
"w": g["w"], "h": g["h"],
})
if graphic_rects:
before = len(all_words)
all_words = [
w for w in all_words
if not any(
gr["x"] <= w["left"] + w.get("width", 0) / 2 <= gr["x"] + gr["w"]
and gr["y"] <= w["top"] + w.get("height", 0) / 2 <= gr["y"] + gr["h"]
for gr in graphic_rects
)
]
removed = before - len(all_words)
if removed:
logger.info(
"build-grid session %s: hard-removed %d words inside %d structure graphic region(s)",
session_id, removed, len(graphic_rects),
)
# 3. Load image for box detection
img_png = await get_session_image(session_id, "cropped")
if not img_png:
img_png = await get_session_image(session_id, "dewarped")
if not img_png:
img_png = await get_session_image(session_id, "original")
zones_data: List[Dict[str, Any]] = []
boxes_detected = 0
recovered_count = 0
border_prefiltered = False
img_bgr = None
content_x, content_y, content_w, content_h = _get_content_bounds(all_words)
if img_png:
# Decode image for color detection + box detection
arr = np.frombuffer(img_png, dtype=np.uint8)
img_bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR)
if img_bgr is not None:
# --- 3a. Detect graphic/image regions via CV and hard-filter ---
# Pass only significant words (len >= 3) to the detector so that
# short OCR artifacts inside images don't fool the text-vs-graphic
# heuristic (it counts word centroids to distinguish text from images).
sig_words = [w for w in all_words if len((w.get("text") or "").strip()) >= 3]
fresh_graphics = detect_graphic_elements(img_bgr, sig_words)
if fresh_graphics:
fresh_rects = [
{"x": g.x, "y": g.y, "w": g.width, "h": g.height}
for g in fresh_graphics
]
graphic_rects.extend(fresh_rects)
logger.info(
"build-grid session %s: detected %d graphic region(s) via CV",
session_id, len(fresh_graphics),
)
# Hard-filter words inside newly detected graphic regions
before = len(all_words)
all_words = [
w for w in all_words
if not any(
gr["x"] <= w["left"] + w.get("width", 0) / 2 <= gr["x"] + gr["w"]
and gr["y"] <= w["top"] + w.get("height", 0) / 2 <= gr["y"] + gr["h"]
for gr in fresh_rects
)
]
removed = before - len(all_words)
if removed:
logger.info(
"build-grid session %s: hard-removed %d words inside %d fresh graphic region(s)",
session_id, removed, len(fresh_rects),
)
# --- Recover colored text that OCR missed (before grid building) ---
recovered = recover_colored_text(img_bgr, all_words)
if recovered and graphic_rects:
# Filter recovered chars inside graphic regions
recovered = [
r for r in recovered
if not any(
gr["x"] <= r["left"] + r.get("width", 0) / 2 <= gr["x"] + gr["w"]
and gr["y"] <= r["top"] + r.get("height", 0) / 2 <= gr["y"] + gr["h"]
for gr in graphic_rects
)
]
if recovered:
recovered_count = len(recovered)
all_words.extend(recovered)
logger.info(
"build-grid session %s: +%d recovered colored words",
session_id, recovered_count,
)
# Detect bordered boxes
boxes = detect_boxes(
img_bgr,
content_x=content_x,
content_w=content_w,
content_y=content_y,
content_h=content_h,
)
boxes_detected = len(boxes)
if boxes:
# Filter border ghost words before grid building
all_words, ghost_count = _filter_border_ghosts(all_words, boxes)
if ghost_count:
logger.info(
"build-grid session %s: removed %d border ghost words",
session_id, ghost_count,
)
# Split page into zones
page_zones = split_page_into_zones(
content_x, content_y, content_w, content_h, boxes
)
# Merge content zones separated by box zones
page_zones = _merge_content_zones_across_boxes(
page_zones, content_x, content_w
)
# 3b. Detect vertical dividers and split content zones
vsplit_group_counter = 0
expanded_zones: List = []
for pz in page_zones:
if pz.zone_type != "content":
expanded_zones.append(pz)
continue
zone_words = _words_in_zone(
all_words, pz.y, pz.height, pz.x, pz.width
)
divider_xs = _detect_vertical_dividers(
zone_words, pz.x, pz.width, pz.y, pz.height
)
if divider_xs:
sub_zones = _split_zone_at_vertical_dividers(
pz, divider_xs, vsplit_group_counter
)
expanded_zones.extend(sub_zones)
vsplit_group_counter += 1
# Remove pipe words so they don't appear in sub-zones
pipe_ids = set(
id(w) for w in zone_words
if _PIPE_RE_VSPLIT.match((w.get("text") or "").strip())
)
all_words[:] = [w for w in all_words if id(w) not in pipe_ids]
logger.info(
"build-grid: vertical split zone %d at x=%s%d sub-zones",
pz.index, [int(x) for x in divider_xs], len(sub_zones),
)
else:
expanded_zones.append(pz)
# Re-index zones
for i, pz in enumerate(expanded_zones):
pz.index = i
page_zones = expanded_zones
# --- Union columns from all content zones ---
# Each content zone detects columns independently. Narrow
# columns (page refs, markers) may appear in only one zone.
# Merge column split-points from ALL content zones so every
# zone shares the full column set.
# NOTE: Zones from a vertical split are independent and must
# NOT share columns with each other.
# First pass: build grids per zone independently
zone_grids: List[Dict] = []
for pz in page_zones:
zone_words = _words_in_zone(
all_words, pz.y, pz.height, pz.x, pz.width
)
# Filter recovered single-char artifacts in ALL zones
# (decorative colored pixel blobs like !, ?, • from
# recover_colored_text that don't represent real text)
before = len(zone_words)
zone_words = [
w for w in zone_words
if not (
w.get("recovered")
and len(w.get("text", "").strip()) <= 2
)
]
removed = before - len(zone_words)
if removed:
logger.info(
"build-grid: filtered %d recovered artifacts from %s zone %d",
removed, pz.zone_type, pz.index,
)
# Filter words inside image overlay regions (merged box zones)
if pz.image_overlays:
before_ov = len(zone_words)
zone_words = [
w for w in zone_words
if not any(
ov["y"] <= w["top"] + w["height"] / 2 <= ov["y"] + ov["height"]
and ov["x"] <= w["left"] + w["width"] / 2 <= ov["x"] + ov["width"]
for ov in pz.image_overlays
)
]
ov_removed = before_ov - len(zone_words)
if ov_removed:
logger.info(
"build-grid: filtered %d words inside image overlays from zone %d",
ov_removed, pz.index,
)
zone_words, bs_removed = _filter_border_strip_words(zone_words)
if bs_removed:
border_prefiltered = True
logger.info(
"build-grid: pre-filtered %d border-strip words from zone %d",
bs_removed, pz.index,
)
grid = _build_zone_grid(
zone_words, pz.x, pz.y, pz.width, pz.height,
pz.index, img_w, img_h,
skip_first_row_header=bool(pz.image_overlays),
)
zone_grids.append({"pz": pz, "words": zone_words, "grid": grid})
# Second pass: merge column boundaries from all content zones
# Exclude zones from vertical splits — they have independent columns.
content_zones = [
zg for zg in zone_grids
if zg["pz"].zone_type == "content"
and zg["pz"].vsplit_group is None
]
if len(content_zones) > 1:
# Collect column split points (x_min of non-first columns)
all_split_xs: List[float] = []
for zg in content_zones:
raw_cols = zg["grid"].get("_raw_columns", [])
for col in raw_cols[1:]:
all_split_xs.append(col["x_min"])
if all_split_xs:
all_split_xs.sort()
merge_distance = max(25, int(content_w * 0.03))
merged_xs = [all_split_xs[0]]
for x in all_split_xs[1:]:
if x - merged_xs[-1] < merge_distance:
merged_xs[-1] = (merged_xs[-1] + x) / 2
else:
merged_xs.append(x)
total_cols = len(merged_xs) + 1
max_zone_cols = max(
len(zg["grid"].get("_raw_columns", []))
for zg in content_zones
)
# Apply union whenever it has at least as many
# columns as the best single zone. Even with the
# same count the union boundaries are better because
# they incorporate evidence from all zones.
if total_cols >= max_zone_cols:
cx_min = min(w["left"] for w in all_words)
cx_max = max(
w["left"] + w["width"] for w in all_words
)
merged_columns: List[Dict[str, Any]] = []
prev_x = cx_min
for i, sx in enumerate(merged_xs):
merged_columns.append({
"index": i,
"type": f"column_{i + 1}",
"x_min": prev_x,
"x_max": sx,
})
prev_x = sx
merged_columns.append({
"index": len(merged_xs),
"type": f"column_{len(merged_xs) + 1}",
"x_min": prev_x,
"x_max": cx_max,
})
# Re-build ALL content zones with merged columns
for zg in zone_grids:
pz = zg["pz"]
if pz.zone_type == "content":
grid = _build_zone_grid(
zg["words"], pz.x, pz.y,
pz.width, pz.height,
pz.index, img_w, img_h,
global_columns=merged_columns,
skip_first_row_header=bool(pz.image_overlays),
)
zg["grid"] = grid
logger.info(
"build-grid session %s: union of %d content "
"zones → %d merged columns (max single zone: %d)",
session_id, len(content_zones),
total_cols, max_zone_cols,
)
for zg in zone_grids:
pz = zg["pz"]
grid = zg["grid"]
# Remove internal _raw_columns before adding to response
grid.pop("_raw_columns", None)
zone_entry: Dict[str, Any] = {
"zone_index": pz.index,
"zone_type": pz.zone_type,
"bbox_px": {
"x": pz.x, "y": pz.y,
"w": pz.width, "h": pz.height,
},
"bbox_pct": {
"x": round(pz.x / img_w * 100, 2) if img_w else 0,
"y": round(pz.y / img_h * 100, 2) if img_h else 0,
"w": round(pz.width / img_w * 100, 2) if img_w else 0,
"h": round(pz.height / img_h * 100, 2) if img_h else 0,
},
"border": None,
"word_count": len(zg["words"]),
**grid,
}
if pz.box:
zone_entry["border"] = {
"thickness": pz.box.border_thickness,
"confidence": pz.box.confidence,
}
if pz.image_overlays:
zone_entry["image_overlays"] = pz.image_overlays
if pz.layout_hint:
zone_entry["layout_hint"] = pz.layout_hint
if pz.vsplit_group is not None:
zone_entry["vsplit_group"] = pz.vsplit_group
zones_data.append(zone_entry)
# 4. Fallback: no boxes detected → single zone with all words
if not zones_data:
# Filter recovered single-char artifacts (same as in zone loop above)
before = len(all_words)
filtered_words = [
w for w in all_words
if not (w.get("recovered") and len(w.get("text", "").strip()) <= 2)
]
removed = before - len(filtered_words)
if removed:
logger.info(
"build-grid session %s: filtered %d recovered artifacts (fallback zone)",
session_id, removed,
)
# Pre-filter border-strip words so column detection is not
# confused by edge artifacts. When this removes words, Step 4e
# is skipped (it would otherwise re-detect content as a "strip").
filtered_words, bs_removed = _filter_border_strip_words(filtered_words)
if bs_removed:
border_prefiltered = True
logger.info(
"build-grid session %s: pre-filtered %d border-strip words",
session_id, bs_removed,
)
grid = _build_zone_grid(
filtered_words, content_x, content_y, content_w, content_h,
0, img_w, img_h,
)
grid.pop("_raw_columns", None)
zones_data.append({
"zone_index": 0,
"zone_type": "content",
"bbox_px": {
"x": content_x, "y": content_y,
"w": content_w, "h": content_h,
},
"bbox_pct": {
"x": round(content_x / img_w * 100, 2) if img_w else 0,
"y": round(content_y / img_h * 100, 2) if img_h else 0,
"w": round(content_w / img_w * 100, 2) if img_w else 0,
"h": round(content_h / img_h * 100, 2) if img_h else 0,
},
"border": None,
"word_count": len(all_words),
**grid,
})
# 4b. Remove junk rows: rows where ALL cells contain only short,
# low-confidence text (OCR noise, stray marks). Real vocabulary rows
# have at least one word with conf >= 50 or meaningful text length.
# Also remove "oversized stub" rows: rows with ≤2 very short words
# whose word-boxes are significantly taller than the median (e.g.
# large red page numbers like "( 9" that are not real text content).
_JUNK_CONF_THRESHOLD = 50
_JUNK_MAX_TEXT_LEN = 3
for z in zones_data:
cells = z.get("cells", [])
rows = z.get("rows", [])
if not cells or not rows:
continue
# Compute median word height across the zone for oversized detection
all_wb_heights = [
wb["height"]
for cell in cells
for wb in cell.get("word_boxes") or []
if wb.get("height", 0) > 0
]
median_wb_h = sorted(all_wb_heights)[len(all_wb_heights) // 2] if all_wb_heights else 28
junk_row_indices = set()
for row in rows:
ri = row["index"]
row_cells = [c for c in cells if c.get("row_index") == ri]
if not row_cells:
continue
row_wbs = [
wb for cell in row_cells
for wb in cell.get("word_boxes") or []
]
# Rule 1: ALL word_boxes are low-conf AND short text
all_junk = True
for wb in row_wbs:
text = (wb.get("text") or "").strip()
conf = wb.get("conf", 0)
if conf >= _JUNK_CONF_THRESHOLD or len(text) > _JUNK_MAX_TEXT_LEN:
all_junk = False
break
if all_junk and row_wbs:
junk_row_indices.add(ri)
continue
# Rule 2: oversized stub — ≤3 words, short total text,
# and word height > 1.8× median (page numbers, stray marks,
# OCR from illustration labels like "SEA &")
if len(row_wbs) <= 3:
total_text = "".join((wb.get("text") or "").strip() for wb in row_wbs)
max_h = max((wb.get("height", 0) for wb in row_wbs), default=0)
if len(total_text) <= 5 and max_h > median_wb_h * 1.8:
junk_row_indices.add(ri)
continue
# Rule 3: scattered debris — rows with only tiny fragments
# (e.g. OCR artifacts from illustrations/graphics).
# If the row has no word longer than 2 chars, it's noise.
longest = max(len((wb.get("text") or "").strip()) for wb in row_wbs)
if longest <= 2:
junk_row_indices.add(ri)
continue
if junk_row_indices:
z["cells"] = [c for c in cells if c.get("row_index") not in junk_row_indices]
z["rows"] = [r for r in rows if r["index"] not in junk_row_indices]
logger.info(
"build-grid: removed %d junk rows from zone %d: %s",
len(junk_row_indices), z["zone_index"],
sorted(junk_row_indices),
)
# 4c. Remove oversized word_boxes from individual cells.
# OCR artifacts from graphics/images (e.g. a huge "N" from a map image)
# have word heights 3-5x the median. Remove them per-word so they don't
# pollute cells that also contain valid text in other columns.
for z in zones_data:
cells = z.get("cells", [])
if not cells:
continue
all_wh = [
wb["height"]
for cell in cells
for wb in cell.get("word_boxes") or []
if wb.get("height", 0) > 0
]
if not all_wh:
continue
med_h = sorted(all_wh)[len(all_wh) // 2]
oversized_threshold = med_h * 3
removed_oversized = 0
for cell in cells:
wbs = cell.get("word_boxes") or []
filtered = [wb for wb in wbs if wb.get("height", 0) <= oversized_threshold]
if len(filtered) < len(wbs):
removed_oversized += len(wbs) - len(filtered)
cell["word_boxes"] = filtered
cell["text"] = _words_to_reading_order_text(filtered)
if removed_oversized:
# Remove cells that became empty after oversized removal
z["cells"] = [c for c in cells if c.get("word_boxes")]
logger.info(
"build-grid: removed %d oversized word_boxes (>%dpx) from zone %d",
removed_oversized, oversized_threshold, z.get("zone_index", 0),
)
# 4d. Remove pipe-character word_boxes (column divider artifacts).
# OCR reads physical vertical divider lines as "|" or "||" characters.
# These sit at consistent x positions near column boundaries and pollute
# cell text. Remove them from word_boxes and rebuild cell text.
# NOTE: Zones from a vertical split already had pipes removed in step 3b.
_PIPE_RE = re.compile(r"^\|+$")
for z in zones_data:
if z.get("vsplit_group") is not None:
continue # pipes already removed before split
removed_pipes = 0
for cell in z.get("cells", []):
wbs = cell.get("word_boxes") or []
filtered = [wb for wb in wbs if not _PIPE_RE.match((wb.get("text") or "").strip())]
if len(filtered) < len(wbs):
removed_pipes += len(wbs) - len(filtered)
cell["word_boxes"] = filtered
cell["text"] = _words_to_reading_order_text(filtered)
# Remove cells that became empty after pipe removal
if removed_pipes:
z["cells"] = [c for c in z.get("cells", []) if (c.get("word_boxes") or c.get("text", "").strip())]
logger.info(
"build-grid: removed %d pipe-divider word_boxes from zone %d",
removed_pipes, z.get("zone_index", 0),
)
# Strip pipe chars ONLY from word_boxes/cells where the pipe is an
# OCR column-divider artifact. Preserve pipes that are embedded in
# words as syllable separators (e.g. "zu|trau|en") — these are
# intentional and used in dictionary Ground Truth.
for z in zones_data:
for cell in z.get("cells", []):
for wb in cell.get("word_boxes", []):
wbt = wb.get("text", "")
# Only strip if the ENTIRE word_box is just pipe(s)
# (handled by _PIPE_RE above) — leave embedded pipes alone
text = cell.get("text", "")
if "|" in text:
# Only strip leading/trailing pipes (OCR artifacts at cell edges)
cleaned = text.strip("|").strip()
if cleaned != text.strip():
cell["text"] = cleaned
# 4e. Detect and remove page-border decoration strips.
# Skipped when the pre-filter already removed border words BEFORE
# column detection — re-running would incorrectly detect the
# leftmost content column as a "strip".
border_strip_removed = 0
if border_prefiltered:
logger.info("Step 4e: skipped (border pre-filter already applied)")
else:
# Some textbooks have decorative alphabet strips along the page
# edge. OCR picks up scattered letters from these as artifacts.
# Detection: find the first significant x-gap (>30 px) from each
# page edge between a small cluster (<20 %) and the main content.
for z in zones_data:
cells = z.get("cells", [])
if not cells:
continue
all_wbs_with_cell: List[tuple] = [] # (left, wb, cell)
for cell in cells:
for wb in cell.get("word_boxes") or []:
all_wbs_with_cell.append((wb.get("left", 0), wb, cell))
if len(all_wbs_with_cell) < 10:
continue
all_wbs_with_cell.sort(key=lambda t: t[0])
total = len(all_wbs_with_cell)
# -- Left-edge scan --
left_strip_count = 0
left_gap = 0
running_right = 0
for gi in range(total - 1):
running_right = max(
running_right,
all_wbs_with_cell[gi][0] + all_wbs_with_cell[gi][1].get("width", 0),
)
gap = all_wbs_with_cell[gi + 1][0] - running_right
if gap > 30:
left_strip_count = gi + 1
left_gap = gap
break
# -- Right-edge scan --
right_strip_count = 0
right_gap = 0
running_left = all_wbs_with_cell[-1][0]
for gi in range(total - 1, 0, -1):
running_left = min(running_left, all_wbs_with_cell[gi][0])
prev_right = (
all_wbs_with_cell[gi - 1][0]
+ all_wbs_with_cell[gi - 1][1].get("width", 0)
)
gap = running_left - prev_right
if gap > 30:
right_strip_count = total - gi
right_gap = gap
break
strip_wbs: set = set()
strip_side = ""
strip_gap = 0
strip_count = 0
if left_strip_count > 0 and left_strip_count / total < 0.20:
strip_side = "left"
strip_count = left_strip_count
strip_gap = left_gap
strip_wbs = {id(t[1]) for t in all_wbs_with_cell[:left_strip_count]}
elif right_strip_count > 0 and right_strip_count / total < 0.20:
strip_side = "right"
strip_count = right_strip_count
strip_gap = right_gap
strip_wbs = {id(t[1]) for t in all_wbs_with_cell[total - right_strip_count:]}
if not strip_wbs:
continue
for cell in cells:
wbs = cell.get("word_boxes") or []
filtered = [wb for wb in wbs if id(wb) not in strip_wbs]
if len(filtered) < len(wbs):
border_strip_removed += len(wbs) - len(filtered)
cell["word_boxes"] = filtered
cell["text"] = _words_to_reading_order_text(filtered)
z["cells"] = [c for c in cells
if (c.get("word_boxes") or c.get("text", "").strip())]
logger.info(
"Step 4e: removed %d border-strip word_boxes (%s) from zone %d "
"(gap=%dpx, strip=%d/%d wbs)",
border_strip_removed, strip_side, z.get("zone_index", 0),
strip_gap, strip_count, total,
)
# 4f. Remove decorative edge columns (alphabet sidebar safety net).
# Dictionary pages have A-Z letter sidebars that OCR reads as single-
# character word_boxes. These form narrow columns with very short text.
# Detection: edge column where almost ALL cells are single characters.
for z in zones_data:
columns = z.get("columns", [])
cells = z.get("cells", [])
if len(columns) < 3 or not cells:
continue
# Group cells by col_type (skip spanning_header)
col_cells: Dict[str, List[Dict]] = {}
for cell in cells:
ct = cell.get("col_type", "")
if ct.startswith("column_"):
col_cells.setdefault(ct, []).append(cell)
col_types_ordered = sorted(col_cells.keys())
if len(col_types_ordered) < 3:
continue
for edge_ct in [col_types_ordered[0], col_types_ordered[-1]]:
edge_cells_list = col_cells.get(edge_ct, [])
if len(edge_cells_list) < 3:
continue
# Key criterion: average text length and single-char ratio.
# Alphabet sidebars have avg_len ≈ 1.0 and nearly all cells
# are single characters.
texts = [(c.get("text") or "").strip() for c in edge_cells_list]
avg_len = sum(len(t) for t in texts) / len(texts)
single_char = sum(1 for t in texts if len(t) <= 1)
single_ratio = single_char / len(texts)
if avg_len > 1.5:
continue # real content has longer text
if single_ratio < 0.7:
continue # not dominated by single chars
# Remove this edge column
removed_count = len(edge_cells_list)
edge_ids = {id(c) for c in edge_cells_list}
z["cells"] = [c for c in cells if id(c) not in edge_ids]
z["columns"] = [col for col in columns if col.get("col_type") != edge_ct]
logger.info(
"Step 4f: removed decorative edge column '%s' from zone %d "
"(%d cells, avg_len=%.1f, single_char=%.0f%%)",
edge_ct, z.get("zone_index", 0), removed_count,
avg_len, single_ratio * 100,
)
break # only remove one edge per zone
# 5. Color annotation on final word_boxes in cells
if img_bgr is not None:
all_wb: List[Dict] = []
for z in zones_data:
for cell in z.get("cells", []):
all_wb.extend(cell.get("word_boxes", []))
detect_word_colors(img_bgr, all_wb)
# 5a. Heading detection by color + height (after color is available)
heading_count = _detect_heading_rows_by_color(zones_data, img_w, img_h)
if heading_count:
logger.info("Detected %d heading rows by color+height", heading_count)
# 5b. Fix unmatched parentheses in cell text
# OCR often misses opening "(" while detecting closing ")".
# If a cell's text has ")" without a matching "(", prepend "(".
for z in zones_data:
for cell in z.get("cells", []):
text = cell.get("text", "")
if ")" in text and "(" not in text:
cell["text"] = "(" + text
# 5c. IPA phonetic correction — replace garbled OCR phonetics with
# correct IPA from the dictionary (same as in the OCR pipeline).
# Only applies to vocabulary tables (≥3 columns: EN | article | DE).
# Single/two-column layouts are continuous text, not vocab tables.
all_cells = [cell for z in zones_data for cell in z.get("cells", [])]
total_cols = sum(len(z.get("columns", [])) for z in zones_data)
en_col_type = None
if total_cols >= 3:
# Find the column that contains IPA brackets → English headwords.
# Count cells with bracket patterns per col_type. The column with
# the most brackets is the headword column (IPA sits after or below
# headwords). Falls back to longest-average if no brackets found.
col_bracket_count: Dict[str, int] = {}
col_avg_len: Dict[str, List[int]] = {}
for cell in all_cells:
ct = cell.get("col_type", "")
txt = cell.get("text", "") or ""
col_avg_len.setdefault(ct, []).append(len(txt))
if ct.startswith("column_") and '[' in txt:
col_bracket_count[ct] = col_bracket_count.get(ct, 0) + 1
# Pick column with most bracket IPA patterns
if col_bracket_count:
en_col_type = max(col_bracket_count, key=col_bracket_count.get)
else:
# Fallback: longest average text
best_avg = 0
for ct, lengths in col_avg_len.items():
if not ct.startswith("column_"):
continue
avg = sum(lengths) / len(lengths) if lengths else 0
if avg > best_avg:
best_avg = avg
en_col_type = ct
if en_col_type:
for cell in all_cells:
if cell.get("col_type") == en_col_type:
cell["_orig_col_type"] = en_col_type
cell["col_type"] = "column_en"
# Snapshot text before IPA fix to detect which cells were modified
_pre_ipa = {id(c): c.get("text", "") for c in all_cells}
fix_cell_phonetics(all_cells, pronunciation="british")
for cell in all_cells:
orig = cell.pop("_orig_col_type", None)
if orig:
cell["col_type"] = orig
# Mark cells whose text was changed by IPA correction so that
# later steps (5i) don't overwrite the corrected text when
# reconstructing from word_boxes.
if cell.get("text", "") != _pre_ipa.get(id(cell), ""):
cell["_ipa_corrected"] = True
# 5d. Fix IPA continuation cells — cells where the printed
# phonetic transcription wraps to a line below the headword.
# These contain garbled IPA (e.g. "[n, nn]", "[1uedtX,1]").
# Replace garbled text with proper IPA looked up from the
# headword in the previous row's same column.
# Note: We check ALL columns, not just en_col_type, because
# the EN headword column may not be the longest-average column.
_REAL_IPA_CHARS = set("ˈˌəɪɛɒʊʌæɑɔʃʒθðŋ")
ipa_cont_fixed = 0
for z in zones_data:
rows_sorted = sorted(z.get("rows", []), key=lambda r: r["index"])
z_cells = z.get("cells", [])
for idx, row in enumerate(rows_sorted):
if idx == 0:
continue
ri = row["index"]
row_cells = [c for c in z_cells if c.get("row_index") == ri]
for cell in row_cells:
ct = cell.get("col_type", "")
if not ct.startswith("column_"):
continue
cell_text = (cell.get("text") or "").strip()
if not cell_text:
# Step 5c may have emptied garbled IPA cells like
# "[n, nn]" — recover text from word_boxes.
wb_texts = [w.get("text", "")
for w in cell.get("word_boxes", [])]
cell_text = " ".join(wb_texts).strip()
if not cell_text:
continue
is_bracketed = (
cell_text.startswith('[') and cell_text.endswith(']')
)
if is_bracketed:
# Bracketed continuation: "[n, nn]", "[klaoz 'daun]"
# Text like "employee [im'ploi:]" is NOT fully
# bracketed and won't match here.
if not _text_has_garbled_ipa(cell_text):
continue
# Already has proper IPA brackets → skip
if re.search(r'\[[^\]]*[ˈˌəɪɛɒʊʌæɑɔʃʒθðŋ][^\]]*\]', cell_text):
continue
else:
# Unbracketed continuation: "ska:f ska:vz",
# "'sekandarr sku:l". Only treat as IPA
# continuation if this is the ONLY content cell
# in the row (single-cell row) and the text is
# garbled IPA without real IPA Unicode symbols.
content_cells_in_row = [
c for c in row_cells
if c.get("col_type", "").startswith("column_")
and c.get("col_type") != "column_1"
]
if len(content_cells_in_row) != 1:
continue
if not _text_has_garbled_ipa(cell_text):
continue
# Has real IPA symbols → already fixed or valid
if any(c in _REAL_IPA_CHARS for c in cell_text):
continue
# Find headword in previous row, same column
prev_ri = rows_sorted[idx - 1]["index"]
prev_same_col = [
c for c in z_cells
if c.get("row_index") == prev_ri
and c.get("col_type") == ct
]
if not prev_same_col:
continue
prev_text = prev_same_col[0].get("text", "")
fixed = fix_ipa_continuation_cell(
cell_text, prev_text, pronunciation="british",
)
if fixed != cell_text:
cell["text"] = fixed
ipa_cont_fixed += 1
logger.info(
"IPA continuation R%d %s: '%s''%s'",
ri, ct, cell_text, fixed,
)
if ipa_cont_fixed:
logger.info("Fixed %d IPA continuation cells", ipa_cont_fixed)
# 5e. Heading detection by single-cell rows — black headings like
# "Theme" that have normal color and height but are the ONLY cell
# in their row (excluding page_ref column_1). Must run AFTER 5d
# so IPA continuation cells are already processed.
single_heading_count = _detect_heading_rows_by_single_cell(zones_data, img_w, img_h)
if single_heading_count:
logger.info("Detected %d heading rows by single-cell heuristic", single_heading_count)
# 5f. Strip IPA from headings — headings detected in 5e ran AFTER
# IPA correction (5c), so they may have dictionary IPA appended
# (e.g. "Theme [θˈiːm]" → "Theme"). Headings should show the
# original text only.
for z in zones_data:
for cell in z.get("cells", []):
if cell.get("col_type") != "heading":
continue
text = cell.get("text", "")
# Strip trailing IPA bracket: "Theme [θˈiːm]" → "Theme"
stripped = re.sub(r'\s*\[[^\]]*\]\s*$', '', text).strip()
if stripped and stripped != text:
cell["text"] = stripped
# 5g. Extract page_ref cells and footer rows from content zones.
# Page references (column_1 cells like "p.70") sit in rows that
# also contain vocabulary — extract them as zone metadata without
# removing the row. Footer lines (e.g. "two hundred and twelve"
# = page number at bottom) are standalone rows that should be
# removed from the table entirely.
_REAL_IPA_CHARS_SET = set("ˈˌəɪɛɒʊʌæɑɔʃʒθðŋ")
# Page-ref pattern: "p.70", "P.70", ",.65" (garbled "p"), or bare "70"
_PAGE_REF_RE = re.compile(r'^[pP,]?\s*\.?\s*\d+$')
for z in zones_data:
if z.get("zone_type") != "content":
continue
cells = z.get("cells", [])
rows = z.get("rows", [])
if not rows:
continue
# Extract column_1 cells that look like page references
page_refs = []
page_ref_cell_ids = set()
for cell in cells:
if cell.get("col_type") != "column_1":
continue
text = (cell.get("text") or "").strip()
if not text:
continue
if not _PAGE_REF_RE.match(text):
continue
page_refs.append({
"row_index": cell.get("row_index"),
"text": text,
"bbox_pct": cell.get("bbox_pct", {}),
})
page_ref_cell_ids.add(cell.get("cell_id"))
# Remove page_ref cells from the table (but keep their rows)
if page_ref_cell_ids:
z["cells"] = [c for c in z["cells"]
if c.get("cell_id") not in page_ref_cell_ids]
# Detect footer: last non-header row if it has only 1 cell
# and the text is NOT IPA (no real IPA Unicode symbols).
# This catches page numbers like "two hundred and twelve".
footer_rows = []
non_header_rows = [r for r in rows if not r.get("is_header")]
if non_header_rows:
last_row = non_header_rows[-1]
last_ri = last_row["index"]
last_cells = [c for c in z["cells"]
if c.get("row_index") == last_ri]
if len(last_cells) == 1:
text = (last_cells[0].get("text") or "").strip()
# Not IPA (no real IPA symbols) and not a heading
has_real_ipa = any(c in _REAL_IPA_CHARS_SET for c in text)
if text and not has_real_ipa and last_cells[0].get("col_type") != "heading":
footer_rows.append({
"row_index": last_ri,
"text": text,
"bbox_pct": last_cells[0].get("bbox_pct", {}),
})
# Mark footer rows (keep in table, just tag for frontend)
if footer_rows:
footer_ris = {fr["row_index"] for fr in footer_rows}
for r in z["rows"]:
if r["index"] in footer_ris:
r["is_footer"] = True
for c in z["cells"]:
if c.get("row_index") in footer_ris:
c["col_type"] = "footer"
if page_refs or footer_rows:
logger.info(
"Extracted %d page_refs + %d footer rows from zone %d",
len(page_refs), len(footer_rows), z.get("zone_index", 0),
)
# Store as zone-level metadata
if page_refs:
z["page_refs"] = page_refs
if footer_rows:
z["footer"] = footer_rows
# 5h. Convert slash-delimited IPA to bracket notation.
# Dictionary-style pages print IPA between slashes: "tiger /'taiga/"
# Detect the pattern <headword> /ocr_ipa/ and replace with [dict_ipa]
# using the IPA dictionary when available, falling back to the OCR text.
# The regex requires a word character (or ² ³) right before the opening
# slash to avoid false positives like "sb/sth".
_SLASH_IPA_RE = re.compile(
r'(\b[a-zA-Z]+[²³¹]?)\s*' # headword (capture group 1)
r"(/[^/]{2,}/)" # /ipa/ (capture group 2), min 2 chars
)
# Standalone slash IPA at start of text (headword on previous line)
_STANDALONE_SLASH_IPA_RE = re.compile(r'^/([^/]{2,})/')
# IPA between slashes never contains spaces, parentheses, or commas.
# Reject matches that look like grammar: "sb/sth up a) jdn/"
_SLASH_IPA_REJECT_RE = re.compile(r'[\s(),]')
slash_ipa_fixed = 0
for z in zones_data:
for cell in z.get("cells", []):
# Only process English headword column — avoid converting
# German text like "der/die/das" to IPA.
if en_col_type and cell.get("col_type") != en_col_type:
continue
text = cell.get("text", "")
if "/" not in text:
continue
            def _replace_slash_ipa(m: re.Match) -> str:
                """re.sub callback: rewrite '<headword> /ipa/' as '<headword> [ipa]'.

                Prefers the dictionary IPA for the headword over the OCR-read
                text between the slashes; falls back to the OCR IPA with the
                slashes converted to brackets. Returns the match unchanged when
                the slash content looks like grammar (contains spaces, parens
                or commas). Increments the enclosing ``slash_ipa_fixed``
                counter whenever a replacement is made.
                """
                nonlocal slash_ipa_fixed
                headword = m.group(1)
                ocr_ipa = m.group(2)  # includes slashes
                inner_raw = ocr_ipa.strip("/").strip()
                # Reject if inner content has spaces/parens/commas (grammar)
                if _SLASH_IPA_REJECT_RE.search(inner_raw):
                    return m.group(0)
                # Strip superscript digits for lookup
                clean_hw = re.sub(r'[²³¹\d]', '', headword).strip()
                ipa = _lookup_ipa(clean_hw, "british") if clean_hw else None
                if ipa:
                    slash_ipa_fixed += 1
                    return f"{headword} [{ipa}]"
                # Fallback: keep OCR IPA but convert slashes to brackets
                inner = inner_raw.lstrip("'").strip()
                if inner:
                    slash_ipa_fixed += 1
                    return f"{headword} [{inner}]"
                return m.group(0)
new_text = _SLASH_IPA_RE.sub(_replace_slash_ipa, text)
# Second pass: convert remaining /ipa/ after [ipa] from first pass.
# Pattern: [ipa] /ipa2/ → [ipa] [ipa2] (second pronunciation variant)
_AFTER_BRACKET_SLASH = re.compile(r'(?<=\])\s*(/[^/]{2,}/)')
            def _replace_trailing_slash(m: re.Match) -> str:
                """re.sub callback: convert a '/ipa/' that follows a ']' into ' [ipa]'.

                Handles a second pronunciation variant printed after an already
                bracketed IPA. Rejects grammar-looking content (spaces, parens,
                commas) and bumps the enclosing ``slash_ipa_fixed`` counter on
                each conversion.
                """
                nonlocal slash_ipa_fixed
                inner = m.group(1).strip("/").strip().lstrip("'").strip()
                if _SLASH_IPA_REJECT_RE.search(inner):
                    return m.group(0)
                if inner:
                    slash_ipa_fixed += 1
                    return f" [{inner}]"
                return m.group(0)
new_text = _AFTER_BRACKET_SLASH.sub(_replace_trailing_slash, new_text)
# Handle standalone /ipa/ at start (no headword in this cell)
if new_text == text:
m = _STANDALONE_SLASH_IPA_RE.match(text)
if m:
inner = m.group(1).strip()
if not _SLASH_IPA_REJECT_RE.search(inner):
inner = inner.lstrip("'").strip()
if inner:
new_text = "[" + inner + "]" + text[m.end():]
slash_ipa_fixed += 1
if new_text != text:
cell["text"] = new_text
if slash_ipa_fixed:
logger.info("Step 5h: converted %d slash-IPA to bracket notation", slash_ipa_fixed)
# 5i. Remove blue bullet/artifact word_boxes.
# Dictionary pages have small blue square bullets (■) before entries.
# OCR reads these as text artifacts (©, e, *, or even plausible words
# like "fighily" overlapping the real word "tightly").
# Detection rules:
# a) Tiny coloured symbols: area < 200 AND conf < 85 (any non-black)
# b) Overlapping word_boxes: >40% x-overlap → remove lower confidence
# c) Duplicate text: consecutive blue wbs with identical text, gap < 6px
bullet_removed = 0
for z in zones_data:
for cell in z.get("cells", []):
wbs = cell.get("word_boxes") or []
if len(wbs) < 2:
continue
to_remove: set = set()
# Rule (a): tiny coloured symbols (bullets, graphic fragments)
for i, wb in enumerate(wbs):
cn = wb.get("color_name", "black")
if (cn != "black"
and wb.get("width", 0) * wb.get("height", 0) < 200
and wb.get("conf", 100) < 85):
to_remove.add(i)
# Rule (b) + (c): overlap and duplicate detection
# Sort by x for pairwise comparison
_ALPHA_WORD_RE = re.compile(r'^[A-Za-z\u00c0-\u024f\-]+[.,;:!?]*$')
to_merge: List[Tuple[int, int]] = [] # pairs (i1, i2) to merge
indexed = sorted(enumerate(wbs), key=lambda iw: iw[1].get("left", 0))
for p in range(len(indexed) - 1):
i1, w1 = indexed[p]
i2, w2 = indexed[p + 1]
x1s, x1e = w1.get("left", 0), w1.get("left", 0) + w1.get("width", 0)
x2s, x2e = w2.get("left", 0), w2.get("left", 0) + w2.get("width", 0)
overlap = max(0, min(x1e, x2e) - max(x1s, x2s))
min_w = min(w1.get("width", 1), w2.get("width", 1))
gap = x2s - x1e
overlap_pct = overlap / min_w if min_w > 0 else 0
# (b) Significant x-overlap
if overlap_pct > 0.20:
t1 = (w1.get("text") or "").strip()
t2 = (w2.get("text") or "").strip()
# Syllable-split words: both are alphabetic text with
# moderate overlap (20-75%). Merge instead of removing.
# OCR splits words at syllable marks, producing overlapping
# boxes like "zu" + "tiefst" → "zutiefst".
if (overlap_pct <= 0.75
and _ALPHA_WORD_RE.match(t1)
and _ALPHA_WORD_RE.match(t2)):
to_merge.append((i1, i2))
continue
if overlap_pct <= 0.40:
continue # too little overlap and not alphabetic merge
c1 = w1.get("conf", 50)
c2 = w2.get("conf", 50)
# For very high overlap (>90%) with different text,
# prefer the word that exists in the IPA dictionary
# over confidence (OCR can give artifacts high conf).
if overlap_pct > 0.90 and t1.lower() != t2.lower():
in_dict_1 = bool(_lookup_ipa(re.sub(r'[²³¹\d/]', '', t1.lower()), "british")) if t1.isalpha() else False
in_dict_2 = bool(_lookup_ipa(re.sub(r'[²³¹\d/]', '', t2.lower()), "british")) if t2.isalpha() else False
if in_dict_1 and not in_dict_2:
to_remove.add(i2)
continue
elif in_dict_2 and not in_dict_1:
to_remove.add(i1)
continue
if c1 < c2:
to_remove.add(i1)
elif c2 < c1:
to_remove.add(i2)
else:
# Same confidence: remove the taller one (bullet slivers)
if w1.get("height", 0) > w2.get("height", 0):
to_remove.add(i1)
else:
to_remove.add(i2)
# (c) Duplicate text: consecutive blue with same text, gap < 6px
elif (gap < 6
and w1.get("color_name") == "blue"
and w2.get("color_name") == "blue"
and (w1.get("text") or "").strip() == (w2.get("text") or "").strip()):
# Remove the one with lower confidence; if equal, first one
c1 = w1.get("conf", 50)
c2 = w2.get("conf", 50)
to_remove.add(i1 if c1 <= c2 else i2)
# Execute merges first (syllable-split words)
if to_merge:
merged_indices: set = set()
for mi1, mi2 in to_merge:
if mi1 in to_remove or mi2 in to_remove:
continue # don't merge if one is being removed
if mi1 in merged_indices or mi2 in merged_indices:
continue # already merged
mw1, mw2 = wbs[mi1], wbs[mi2]
# Concatenate text (no space — they're parts of one word)
mt1 = (mw1.get("text") or "").rstrip(".,;:!?")
mt2 = (mw2.get("text") or "").strip()
merged_text = mt1 + mt2
# Union bounding box
mx = min(mw1["left"], mw2["left"])
my = min(mw1["top"], mw2["top"])
mr = max(mw1["left"] + mw1["width"],
mw2["left"] + mw2["width"])
mb = max(mw1["top"] + mw1["height"],
mw2["top"] + mw2["height"])
mw1["text"] = merged_text
mw1["left"] = mx
mw1["top"] = my
mw1["width"] = mr - mx
mw1["height"] = mb - my
mw1["conf"] = (mw1.get("conf", 50) + mw2.get("conf", 50)) / 2
to_remove.add(mi2) # remove the second one
merged_indices.add(mi1)
merged_indices.add(mi2)
bullet_removed -= 1 # net: merge, not removal
if to_remove:
bullet_removed += len(to_remove)
filtered = [wb for i, wb in enumerate(wbs) if i not in to_remove]
cell["word_boxes"] = filtered
# Don't overwrite text that was corrected by Step 5c IPA fix
if not cell.get("_ipa_corrected"):
cell["text"] = _words_to_reading_order_text(filtered)
# Remove cells that became empty after bullet removal
if bullet_removed:
for z in zones_data:
z["cells"] = [c for c in z.get("cells", [])
if (c.get("word_boxes") or c.get("text", "").strip())]
logger.info("Step 5i: removed %d bullet/artifact word_boxes", bullet_removed)
# 5j-pre. Remove cells whose text is entirely garbled / artifact noise.
# OCR on image areas produces short nonsensical fragments ("7 EN", "Tr",
# "\\", "PEE", "a=") that survive earlier filters because their rows also
# contain real content in other columns. Remove them here.
_COMMON_SHORT_WORDS = {
# German
"ab", "am", "an", "da", "du", "er", "es", "im", "in", "ja",
"ob", "so", "um", "zu", "wo", "je", "oh", "or",
"die", "der", "das", "dem", "den", "des", "ein", "und",
"auf", "aus", "bei", "bis", "für", "mit", "nur", "von",
# English
"a", "i", "an", "as", "at", "be", "by", "do", "go", "he",
"if", "in", "is", "it", "me", "my", "no", "of", "oh", "ok",
"on", "or", "so", "to", "up", "us", "we",
"the", "and", "but", "for", "not",
}
_PURE_JUNK_RE = re.compile(r'^[\W\d\s]+$')
artifact_cells_removed = 0
for z in zones_data:
before = len(z.get("cells", []))
kept = []
for cell in z.get("cells", []):
text = (cell.get("text") or "").strip()
core = text.rstrip(".,;:!?'\"")
is_artifact = False
if not core:
is_artifact = True
elif _PURE_JUNK_RE.match(core):
is_artifact = True
elif len(core) <= 2 and core.lower() not in _COMMON_SHORT_WORDS and not core.isalpha():
# Short non-alphabetic text like "a=", not word beginnings like "Zw"
is_artifact = True
elif len(core) <= 3 and core.isupper() and core.lower() not in _COMMON_SHORT_WORDS:
is_artifact = True
elif len(core) <= 5 and re.search(r'\d', core) and re.search(r'[A-Za-z]', core):
# Mixed digits + letters in short text (e.g. "7 EN", "a=3")
is_artifact = True
if is_artifact:
kept.append(None) # placeholder
else:
kept.append(cell)
z["cells"] = [c for c in kept if c is not None]
artifact_cells_removed += before - len(z["cells"])
if artifact_cells_removed:
# Also remove rows that became completely empty
for z in zones_data:
cell_ris = {c.get("row_index") for c in z.get("cells", [])}
z["rows"] = [r for r in z.get("rows", []) if r["index"] in cell_ris]
logger.info("Step 5j-pre: removed %d artifact cells", artifact_cells_removed)
# 5j. Normalise word_box order to reading order (group by Y, sort by X).
# The frontend renders colored cells from word_boxes array order
# (GridTable.tsx), so they MUST be in left-to-right reading order.
wb_reordered = 0
for z in zones_data:
for cell in z.get("cells", []):
wbs = cell.get("word_boxes") or []
if len(wbs) < 2:
continue
lines = _group_words_into_lines(wbs, y_tolerance_px=15)
sorted_wbs = [w for line in lines for w in line]
# Check if order actually changed
if [id(w) for w in sorted_wbs] != [id(w) for w in wbs]:
cell["word_boxes"] = sorted_wbs
wb_reordered += 1
if wb_reordered:
logger.info("Step 5j: re-ordered word_boxes in %d cells to reading order", wb_reordered)
duration = time.time() - t0
# 6. Build result
total_cells = sum(len(z.get("cells", [])) for z in zones_data)
total_columns = sum(len(z.get("columns", [])) for z in zones_data)
total_rows = sum(len(z.get("rows", [])) for z in zones_data)
# Collect color statistics from all word_boxes in cells
color_stats: Dict[str, int] = {}
for z in zones_data:
for cell in z.get("cells", []):
for wb in cell.get("word_boxes", []):
cn = wb.get("color_name", "black")
color_stats[cn] = color_stats.get(cn, 0) + 1
# Compute layout metrics for faithful grid reconstruction
all_content_row_heights: List[float] = []
for z in zones_data:
for row in z.get("rows", []):
if not row.get("is_header", False):
h = row.get("y_max_px", 0) - row.get("y_min_px", 0)
if h > 0:
all_content_row_heights.append(h)
avg_row_height = (
sum(all_content_row_heights) / len(all_content_row_heights)
if all_content_row_heights else 30.0
)
font_size_suggestion = max(10, int(avg_row_height * 0.6))
# --- Dictionary detection on assembled grid ---
# Build lightweight ColumnGeometry-like structures from zone columns for
# dictionary signal scoring.
from cv_layout import _score_dictionary_signals
dict_detection: Dict[str, Any] = {"is_dictionary": False, "confidence": 0.0}
try:
from cv_vocab_types import ColumnGeometry
for z in zones_data:
zone_cells = z.get("cells", [])
zone_cols = z.get("columns", [])
if len(zone_cols) < 2 or len(zone_cells) < 10:
continue
# Build pseudo-ColumnGeometry per column
pseudo_geoms = []
for col in zone_cols:
ci = col["index"]
col_cells = [c for c in zone_cells if c.get("col_index") == ci]
# Flatten word_boxes into word dicts compatible with _score_language
col_words = []
for cell in col_cells:
for wb in cell.get("word_boxes") or []:
col_words.append({
"text": wb.get("text", ""),
"conf": wb.get("conf", 0),
"top": wb.get("top", 0),
"left": wb.get("left", 0),
"height": wb.get("height", 0),
"width": wb.get("width", 0),
})
# Fallback: use cell text if no word_boxes
if not cell.get("word_boxes") and cell.get("text"):
col_words.append({
"text": cell["text"],
"conf": cell.get("confidence", 50),
"top": cell.get("bbox_px", {}).get("y", 0),
"left": cell.get("bbox_px", {}).get("x", 0),
"height": cell.get("bbox_px", {}).get("h", 20),
"width": cell.get("bbox_px", {}).get("w", 50),
})
col_w = col.get("x_max_px", 0) - col.get("x_min_px", 0)
pseudo_geoms.append(ColumnGeometry(
index=ci, x=col.get("x_min_px", 0), y=0,
width=max(col_w, 1), height=img_h,
word_count=len(col_words), words=col_words,
width_ratio=col_w / max(img_w, 1),
))
if len(pseudo_geoms) >= 2:
dd = _score_dictionary_signals(
pseudo_geoms,
document_category=document_category,
margin_strip_detected=margin_strip_detected,
)
if dd["confidence"] > dict_detection["confidence"]:
dict_detection = dd
except Exception as e:
logger.warning("Dictionary detection failed: %s", e)
# --- Syllable divider insertion for dictionary pages ---
# Only on confirmed dictionary pages with article columns (der/die/das).
# The article_col_index check avoids false positives on synonym lists,
# word frequency tables, and other alphabetically sorted non-dictionary pages.
# Additionally, insert_syllable_dividers has its own pre-check for existing
# pipe characters in cells (OCR must have already found some).
syllable_insertions = 0
if (dict_detection.get("is_dictionary")
and dict_detection.get("article_col_index") is not None
and img_bgr is not None):
try:
from cv_syllable_detect import insert_syllable_dividers
syllable_insertions = insert_syllable_dividers(
zones_data, img_bgr, session_id,
)
except Exception as e:
logger.warning("Syllable insertion failed: %s", e)
# Clean up internal flags before returning
for z in zones_data:
for cell in z.get("cells", []):
cell.pop("_ipa_corrected", None)
result = {
"session_id": session_id,
"image_width": img_w,
"image_height": img_h,
"zones": zones_data,
"boxes_detected": boxes_detected,
"summary": {
"total_zones": len(zones_data),
"total_columns": total_columns,
"total_rows": total_rows,
"total_cells": total_cells,
"total_words": len(all_words),
"recovered_colored": recovered_count,
"color_stats": color_stats,
},
"formatting": {
"bold_columns": [],
"header_rows": [],
},
"layout_metrics": {
"page_width_px": img_w,
"page_height_px": img_h,
"avg_row_height_px": round(avg_row_height, 1),
"font_size_suggestion_px": font_size_suggestion,
},
"dictionary_detection": {
"is_dictionary": dict_detection.get("is_dictionary", False),
"confidence": dict_detection.get("confidence", 0.0),
"signals": dict_detection.get("signals", {}),
"article_col_index": dict_detection.get("article_col_index"),
"headword_col_index": dict_detection.get("headword_col_index"),
},
"duration_seconds": round(duration, 2),
}
return result
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@router.post("/sessions/{session_id}/build-grid")
async def build_grid(session_id: str):
    """Build a structured, zone-aware grid from existing Kombi word results.

    Requires that paddle-kombi or rapid-kombi has already been run on the
    session. Uses the image for box detection and the word positions for
    grid structuring.

    Args:
        session_id: Session identifier (path parameter).

    Returns:
        A StructuredGrid dict with zones, each containing their own
        columns, rows, and cells — ready for the frontend Excel-like editor.

    Raises:
        HTTPException: 404 if the session does not exist; 400 if the grid
            cannot be built (``_build_grid_core`` raised ``ValueError``).
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
    try:
        result = await _build_grid_core(session_id, session)
    except ValueError as e:
        # Chain the original error so server logs keep the root cause (B904).
        raise HTTPException(status_code=400, detail=str(e)) from e
    # Save automatic grid snapshot for later comparison with manual corrections
    # Lazy import to avoid circular dependency with ocr_pipeline_regression
    from ocr_pipeline_regression import _build_reference_snapshot
    wr = session.get("word_result") or {}
    engine = wr.get("ocr_engine", "")
    # Map the OCR engine name onto the snapshot's pipeline label
    if engine in ("kombi", "rapid_kombi"):
        auto_pipeline = "kombi"
    elif engine == "paddle_direct":
        auto_pipeline = "paddle-direct"
    else:
        auto_pipeline = "pipeline"
    auto_snapshot = _build_reference_snapshot(result, pipeline=auto_pipeline)
    gt = session.get("ground_truth") or {}
    gt["auto_grid_snapshot"] = auto_snapshot
    # Persist to DB and advance current_step to 11 (reconstruction complete)
    await update_session_db(session_id, grid_editor_result=result, ground_truth=gt, current_step=11)
    logger.info(
        "build-grid session %s: %d zones, %d cols, %d rows, %d cells, "
        "%d boxes in %.2fs",
        session_id,
        len(result.get("zones", [])),
        result.get("summary", {}).get("total_columns", 0),
        result.get("summary", {}).get("total_rows", 0),
        result.get("summary", {}).get("total_cells", 0),
        result.get("boxes_detected", 0),
        result.get("duration_seconds", 0),
    )
    return result
@router.post("/sessions/{session_id}/save-grid")
async def save_grid(session_id: str, request: Request):
    """Save edited grid data from the frontend Excel-like editor.

    Receives the full StructuredGrid with user edits (text changes,
    formatting changes like bold columns, header rows, etc.) and
    persists it to the session's grid_editor_result.

    Args:
        session_id: Session identifier (path parameter).
        request: Raw request whose JSON body holds the edited grid.

    Returns:
        ``{"session_id": ..., "saved": True}`` on success.

    Raises:
        HTTPException: 404 if the session does not exist; 400 if the body
            is not valid JSON or is missing the ``zones`` key.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
    try:
        body = await request.json()
    except ValueError as e:
        # json.JSONDecodeError / UnicodeDecodeError → clean 400 instead of a 500
        raise HTTPException(status_code=400, detail="Request body is not valid JSON") from e
    # Validate basic structure — body must be a JSON object with "zones"
    if not isinstance(body, dict) or "zones" not in body:
        raise HTTPException(status_code=400, detail="Missing 'zones' in request body")
    # Preserve metadata from the original build
    existing = session.get("grid_editor_result") or {}
    result = {
        "session_id": session_id,
        "image_width": body.get("image_width", existing.get("image_width", 0)),
        "image_height": body.get("image_height", existing.get("image_height", 0)),
        "zones": body["zones"],
        "boxes_detected": body.get("boxes_detected", existing.get("boxes_detected", 0)),
        "summary": body.get("summary", existing.get("summary", {})),
        "formatting": body.get("formatting", existing.get("formatting", {})),
        "duration_seconds": existing.get("duration_seconds", 0),
        "edited": True,
    }
    await update_session_db(session_id, grid_editor_result=result, current_step=11)
    logger.info("save-grid session %s: %d zones saved", session_id, len(body["zones"]))
    return {"session_id": session_id, "saved": True}
@router.get("/sessions/{session_id}/grid-editor")
async def get_grid(session_id: str):
    """Retrieve the current grid editor state for a session."""
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
    grid = session.get("grid_editor_result")
    if grid:
        # Grid exists — return it as stored (built or user-edited).
        return grid
    raise HTTPException(
        status_code=404,
        detail="No grid editor data. Run build-grid first.",
    )