""" Grid Editor API — builds a structured, zone-aware grid from Kombi OCR results. Takes the merged word positions from paddle-kombi / rapid-kombi and: 1. Detects bordered boxes on the image (cv_box_detect) 2. Splits the page into zones (content + box regions) 3. Clusters words into columns and rows per zone 4. Returns a hierarchical StructuredGrid for the frontend Excel-like editor Lizenz: Apache 2.0 (kommerziell nutzbar) DATENSCHUTZ: Alle Verarbeitung erfolgt lokal. """ import logging import time from typing import Any, Dict, List, Optional import cv2 import numpy as np from fastapi import APIRouter, HTTPException, Request from cv_box_detect import detect_boxes, split_page_into_zones from cv_color_detect import detect_word_colors, recover_colored_text from cv_ocr_engines import fix_cell_phonetics from cv_words_first import _cluster_rows, _build_cells from ocr_pipeline_session_store import ( get_session_db, get_session_image, update_session_db, ) logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/v1/ocr-pipeline", tags=["grid-editor"]) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _cluster_columns_by_alignment( words: List[Dict], zone_w: int, rows: List[Dict], ) -> List[Dict[str, Any]]: """Detect columns by clustering left-edge alignment across rows. Hybrid approach: 1. Group words by row, find "group start" positions within each row (words preceded by a large gap or first word in row) 2. Cluster group-start left-edges by X-proximity across rows 3. Filter by row coverage (how many rows have a group start here) 4. Merge nearby clusters 5. Build column boundaries This filters out mid-phrase word positions (e.g. IPA transcriptions, second words in multi-word entries) by only considering positions where a new word group begins within a row. 
""" if not words or not rows: return [] total_rows = len(rows) if total_rows == 0: return [] # --- Group words by row --- row_words: Dict[int, List[Dict]] = {} for w in words: y_center = w["top"] + w["height"] / 2 best = min(rows, key=lambda r: abs(r["y_center"] - y_center)) row_words.setdefault(best["index"], []).append(w) # --- Compute adaptive gap threshold for group-start detection --- all_gaps: List[float] = [] for ri, rw_list in row_words.items(): sorted_rw = sorted(rw_list, key=lambda w: w["left"]) for i in range(len(sorted_rw) - 1): right = sorted_rw[i]["left"] + sorted_rw[i]["width"] gap = sorted_rw[i + 1]["left"] - right if gap > 0: all_gaps.append(gap) if all_gaps: sorted_gaps = sorted(all_gaps) median_gap = sorted_gaps[len(sorted_gaps) // 2] heights = [w["height"] for w in words if w.get("height", 0) > 0] median_h = sorted(heights)[len(heights) // 2] if heights else 25 # Column boundary: gap > 3× median gap or > 1.5× median word height gap_threshold = max(median_gap * 3, median_h * 1.5, 30) else: gap_threshold = 50 # --- Find group-start positions (left-edges that begin a new column) --- start_positions: List[tuple] = [] # (left_edge, row_index) for ri, rw_list in row_words.items(): sorted_rw = sorted(rw_list, key=lambda w: w["left"]) # First word in row is always a group start start_positions.append((sorted_rw[0]["left"], ri)) for i in range(1, len(sorted_rw)): right_prev = sorted_rw[i - 1]["left"] + sorted_rw[i - 1]["width"] gap = sorted_rw[i]["left"] - right_prev if gap >= gap_threshold: start_positions.append((sorted_rw[i]["left"], ri)) start_positions.sort(key=lambda x: x[0]) logger.info( "alignment columns: %d group-start positions from %d words " "(gap_threshold=%.0f, %d rows)", len(start_positions), len(words), gap_threshold, total_rows, ) if not start_positions: x_min = min(w["left"] for w in words) x_max = max(w["left"] + w["width"] for w in words) return [{"index": 0, "type": "column_text", "x_min": x_min, "x_max": x_max}] # --- Cluster 
group-start positions by X-proximity --- tolerance = max(10, int(zone_w * 0.01)) clusters: List[Dict[str, Any]] = [] cur_edges = [start_positions[0][0]] cur_rows = {start_positions[0][1]} for left, row_idx in start_positions[1:]: if left - cur_edges[-1] <= tolerance: cur_edges.append(left) cur_rows.add(row_idx) else: clusters.append({ "mean_x": int(sum(cur_edges) / len(cur_edges)), "min_edge": min(cur_edges), "max_edge": max(cur_edges), "count": len(cur_edges), "distinct_rows": len(cur_rows), "row_coverage": len(cur_rows) / total_rows, }) cur_edges = [left] cur_rows = {row_idx} clusters.append({ "mean_x": int(sum(cur_edges) / len(cur_edges)), "min_edge": min(cur_edges), "max_edge": max(cur_edges), "count": len(cur_edges), "distinct_rows": len(cur_rows), "row_coverage": len(cur_rows) / total_rows, }) # --- Filter by row coverage --- # These thresholds must be high enough to avoid false columns in flowing # text (random inter-word gaps) while still detecting real columns in # vocabulary worksheets (which typically have >80% row coverage). MIN_COVERAGE_PRIMARY = 0.35 MIN_COVERAGE_SECONDARY = 0.20 MIN_WORDS_SECONDARY = 4 MIN_DISTINCT_ROWS = 3 # Content boundary for left-margin detection content_x_min = min(w["left"] for w in words) content_x_max = max(w["left"] + w["width"] for w in words) content_span = content_x_max - content_x_min primary = [ c for c in clusters if c["row_coverage"] >= MIN_COVERAGE_PRIMARY and c["distinct_rows"] >= MIN_DISTINCT_ROWS ] primary_ids = {id(c) for c in primary} secondary = [ c for c in clusters if id(c) not in primary_ids and c["row_coverage"] >= MIN_COVERAGE_SECONDARY and c["count"] >= MIN_WORDS_SECONDARY and c["distinct_rows"] >= MIN_DISTINCT_ROWS ] # Tertiary: narrow left-margin columns (page refs, markers) that have # too few rows for secondary but are clearly left-aligned and separated # from the main content. These appear at the far left or far right and # have a large gap to the nearest significant cluster. 
used_ids = {id(c) for c in primary} | {id(c) for c in secondary} sig_xs = [c["mean_x"] for c in primary + secondary] tertiary = [] for c in clusters: if id(c) in used_ids or c["distinct_rows"] < MIN_DISTINCT_ROWS: continue # Must be near left or right content margin (within 15%) rel_pos = (c["mean_x"] - content_x_min) / content_span if content_span else 0.5 if not (rel_pos < 0.15 or rel_pos > 0.85): continue # Must have significant gap to nearest significant cluster if sig_xs: min_dist = min(abs(c["mean_x"] - sx) for sx in sig_xs) if min_dist < max(30, content_span * 0.02): continue tertiary.append(c) if tertiary: for c in tertiary: logger.info( " tertiary (margin) cluster: x=%d (range %d-%d), %d words, %d rows (%.0f%%)", c["mean_x"], c["min_edge"], c["max_edge"], c["count"], c["distinct_rows"], c["row_coverage"] * 100, ) significant = sorted(primary + secondary + tertiary, key=lambda c: c["mean_x"]) for c in significant: logger.info( " significant cluster: x=%d (range %d-%d), %d words, %d rows (%.0f%%)", c["mean_x"], c["min_edge"], c["max_edge"], c["count"], c["distinct_rows"], c["row_coverage"] * 100, ) logger.info( "alignment columns: %d clusters, %d primary, %d secondary → %d significant", len(clusters), len(primary), len(secondary), len(significant), ) if not significant: # Fallback: single column covering all content x_min = min(w["left"] for w in words) x_max = max(w["left"] + w["width"] for w in words) return [{"index": 0, "type": "column_text", "x_min": x_min, "x_max": x_max}] # --- Merge nearby clusters --- merge_distance = max(25, int(zone_w * 0.03)) merged = [significant[0].copy()] for s in significant[1:]: if s["mean_x"] - merged[-1]["mean_x"] < merge_distance: prev = merged[-1] total = prev["count"] + s["count"] prev["mean_x"] = ( prev["mean_x"] * prev["count"] + s["mean_x"] * s["count"] ) // total prev["count"] = total prev["min_edge"] = min(prev["min_edge"], s["min_edge"]) prev["max_edge"] = max(prev["max_edge"], s["max_edge"]) prev["distinct_rows"] 
= max(prev["distinct_rows"], s["distinct_rows"]) else: merged.append(s.copy()) logger.info( "alignment columns: %d after merge (distance=%d)", len(merged), merge_distance, ) # --- Build column boundaries --- margin = max(5, int(zone_w * 0.005)) content_x_min = min(w["left"] for w in words) content_x_max = max(w["left"] + w["width"] for w in words) columns: List[Dict[str, Any]] = [] for i, cluster in enumerate(merged): x_min = max(content_x_min, cluster["min_edge"] - margin) if i + 1 < len(merged): x_max = merged[i + 1]["min_edge"] - margin else: x_max = content_x_max columns.append({ "index": i, "type": f"column_{i + 1}" if len(merged) > 1 else "column_text", "x_min": x_min, "x_max": x_max, }) return columns # Characters that are typically OCR artefacts from box border lines. # Intentionally excludes ! (red markers) and . , ; (real punctuation). _GRID_GHOST_CHARS = set("|1lI[](){}/\\-—–_~=+") def _filter_border_ghosts( words: List[Dict], boxes: List, ) -> tuple: """Remove words sitting on box borders that are OCR artefacts. Returns (filtered_words, removed_count). 
""" if not boxes or not words: return words, 0 # Build border bands from detected boxes x_bands: List[tuple] = [] y_bands: List[tuple] = [] for b in boxes: bx = b.x if hasattr(b, "x") else b.get("x", 0) by = b.y if hasattr(b, "y") else b.get("y", 0) bw = b.width if hasattr(b, "width") else b.get("w", b.get("width", 0)) bh = b.height if hasattr(b, "height") else b.get("h", b.get("height", 0)) bt = ( b.border_thickness if hasattr(b, "border_thickness") else b.get("border_thickness", 3) ) margin = max(bt * 2, 10) + 6 x_bands.append((bx - margin, bx + margin)) x_bands.append((bx + bw - margin, bx + bw + margin)) y_bands.append((by - margin, by + margin)) y_bands.append((by + bh - margin, by + bh + margin)) def _is_ghost(w: Dict) -> bool: text = (w.get("text") or "").strip() if not text: return False # Check if any word edge (not just center) touches a border band w_left = w["left"] w_right = w["left"] + w["width"] w_top = w["top"] w_bottom = w["top"] + w["height"] on_border = ( any(lo <= w_left <= hi or lo <= w_right <= hi for lo, hi in x_bands) or any(lo <= w_top <= hi or lo <= w_bottom <= hi for lo, hi in y_bands) ) if not on_border: return False if all(c in _GRID_GHOST_CHARS for c in text): return True return False filtered = [w for w in words if not _is_ghost(w)] return filtered, len(words) - len(filtered) _MARKER_CHARS = set("•*·-–—|~=+#>→►▸▪◆○●□■✓✗✔✘") def _merge_inline_marker_columns( columns: List[Dict], words: List[Dict], ) -> List[Dict]: """Merge narrow marker columns (bullets, numbering) into adjacent text. Bullet points (•, *, -) and numbering (1., 2.) create narrow columns at the left edge of a zone. These are inline markers that indent text, not real separate columns. Merge them with their right neighbour. Does NOT merge columns containing alphabetic words like "to", "in", "der", "die", "das" — those are legitimate content columns. 
""" if len(columns) < 2: return columns merged: List[Dict] = [] skip: set = set() for i, col in enumerate(columns): if i in skip: continue # Find words in this column col_words = [ w for w in words if col["x_min"] <= w["left"] + w["width"] / 2 < col["x_max"] ] col_width = col["x_max"] - col["x_min"] # Narrow column with mostly short words → MIGHT be inline markers if col_words and col_width < 80: avg_len = sum(len(w.get("text", "")) for w in col_words) / len(col_words) if avg_len <= 2 and i + 1 < len(columns): # Check if words are actual markers (symbols/numbers) vs # real alphabetic words like "to", "in", "der", "die" texts = [(w.get("text") or "").strip() for w in col_words] alpha_count = sum( 1 for t in texts if t and t[0].isalpha() and t not in _MARKER_CHARS ) alpha_ratio = alpha_count / len(texts) if texts else 0 # If ≥50% of words are alphabetic, this is a real column if alpha_ratio >= 0.5: logger.info( " kept narrow column %d (w=%d, avg_len=%.1f, " "alpha=%.0f%%) — contains real words", i, col_width, avg_len, alpha_ratio * 100, ) else: # Merge into next column next_col = columns[i + 1].copy() next_col["x_min"] = col["x_min"] merged.append(next_col) skip.add(i + 1) logger.info( " merged inline marker column %d (w=%d, avg_len=%.1f) " "into column %d", i, col_width, avg_len, i + 1, ) continue merged.append(col) # Re-index for i, col in enumerate(merged): col["index"] = i col["type"] = f"column_{i + 1}" if len(merged) > 1 else "column_text" return merged def _flatten_word_boxes(cells: List[Dict]) -> List[Dict]: """Extract all word_boxes from cells into a flat list of word dicts.""" words: List[Dict] = [] for cell in cells: for wb in cell.get("word_boxes") or []: if wb.get("text", "").strip(): words.append({ "text": wb["text"], "left": wb["left"], "top": wb["top"], "width": wb["width"], "height": wb["height"], "conf": wb.get("conf", 0), }) return words def _words_in_zone( words: List[Dict], zone_y: int, zone_h: int, zone_x: int, zone_w: int, ) -> List[Dict]: 
"""Filter words whose Y-center falls within a zone's bounds.""" zone_y_end = zone_y + zone_h zone_x_end = zone_x + zone_w result = [] for w in words: cy = w["top"] + w["height"] / 2 cx = w["left"] + w["width"] / 2 if zone_y <= cy <= zone_y_end and zone_x <= cx <= zone_x_end: result.append(w) return result def _detect_header_rows( rows: List[Dict], zone_words: List[Dict], zone_y: int, columns: Optional[List[Dict]] = None, ) -> List[int]: """Detect header rows: first-row heuristic + spanning header detection. A "spanning header" is a row whose words stretch across multiple column boundaries (e.g. "Unit4: Bonnie Scotland" centred across 4 columns). """ if len(rows) < 2: return [] headers = [] first_row = rows[0] second_row = rows[1] # Gap between first and second row > 0.5x average row height avg_h = sum(r["y_max"] - r["y_min"] for r in rows) / len(rows) gap = second_row["y_min"] - first_row["y_max"] if gap > avg_h * 0.5: headers.append(0) # Also check if first row words are taller than average (bold/header text) all_heights = [w["height"] for w in zone_words] median_h = sorted(all_heights)[len(all_heights) // 2] if all_heights else 20 first_row_words = [ w for w in zone_words if first_row["y_min"] <= w["top"] + w["height"] / 2 <= first_row["y_max"] ] if first_row_words: first_h = max(w["height"] for w in first_row_words) if first_h > median_h * 1.3: if 0 not in headers: headers.append(0) # Note: Spanning-header detection (rows spanning all columns) has been # disabled because it produces too many false positives on vocabulary # worksheets where IPA transcriptions or short entries naturally span # multiple columns with few words. The first-row heuristic above is # sufficient for detecting real headers. 
return headers def _build_zone_grid( zone_words: List[Dict], zone_x: int, zone_y: int, zone_w: int, zone_h: int, zone_index: int, img_w: int, img_h: int, global_columns: Optional[List[Dict]] = None, ) -> Dict[str, Any]: """Build columns, rows, cells for a single zone from its words. Args: global_columns: If provided, use these pre-computed column boundaries instead of detecting columns per zone. Used for content zones so that all content zones (above/between/below boxes) share the same column structure. Box zones always detect columns independently. """ if not zone_words: return { "columns": [], "rows": [], "cells": [], "header_rows": [], } # Cluster rows first (needed for column alignment analysis) rows = _cluster_rows(zone_words) # Diagnostic logging for small/medium zones (box zones typically have 40-60 words) if len(zone_words) <= 60: import statistics as _st _heights = [w['height'] for w in zone_words if w.get('height', 0) > 0] _med_h = _st.median(_heights) if _heights else 20 _y_tol = max(_med_h * 0.5, 5) logger.info( "zone %d row-clustering: %d words, median_h=%.0f, y_tol=%.1f → %d rows", zone_index, len(zone_words), _med_h, _y_tol, len(rows), ) for w in sorted(zone_words, key=lambda ww: (ww['top'], ww['left'])): logger.info( " zone %d word: y=%d x=%d h=%d w=%d '%s'", zone_index, w['top'], w['left'], w['height'], w['width'], w.get('text', '')[:40], ) for r in rows: logger.info( " zone %d row %d: y_min=%d y_max=%d y_center=%.0f", zone_index, r['index'], r['y_min'], r['y_max'], r['y_center'], ) # Use global columns if provided, otherwise detect per zone columns = global_columns if global_columns else _cluster_columns_by_alignment(zone_words, zone_w, rows) # Merge inline marker columns (bullets, numbering) into adjacent text if not global_columns: columns = _merge_inline_marker_columns(columns, zone_words) if not columns or not rows: return { "columns": [], "rows": [], "cells": [], "header_rows": [], } # Build cells cells = _build_cells(zone_words, columns, 
rows, img_w, img_h) # Prefix cell IDs with zone index for cell in cells: cell["cell_id"] = f"Z{zone_index}_{cell['cell_id']}" cell["zone_index"] = zone_index # Detect header rows (pass columns for spanning header detection) header_rows = _detect_header_rows(rows, zone_words, zone_y, columns) # Merge cells in spanning header rows into a single col-0 cell if header_rows and len(columns) >= 2: for hri in header_rows: header_cells = [c for c in cells if c["row_index"] == hri] if len(header_cells) <= 1: continue # Collect all word_boxes and text from all columns all_wb = [] all_text_parts = [] for hc in sorted(header_cells, key=lambda c: c["col_index"]): all_wb.extend(hc.get("word_boxes", [])) if hc.get("text", "").strip(): all_text_parts.append(hc["text"].strip()) # Remove all header cells, replace with one spanning cell cells = [c for c in cells if c["row_index"] != hri] if all_wb: x_min = min(wb["left"] for wb in all_wb) y_min = min(wb["top"] for wb in all_wb) x_max = max(wb["left"] + wb["width"] for wb in all_wb) y_max = max(wb["top"] + wb["height"] for wb in all_wb) cells.append({ "cell_id": f"R{hri:02d}_C0", "row_index": hri, "col_index": 0, "col_type": "spanning_header", "text": " ".join(all_text_parts), "confidence": 0.0, "bbox_px": {"x": x_min, "y": y_min, "w": x_max - x_min, "h": y_max - y_min}, "bbox_pct": { "x": round(x_min / img_w * 100, 2) if img_w else 0, "y": round(y_min / img_h * 100, 2) if img_h else 0, "w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0, "h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0, }, "word_boxes": all_wb, "ocr_engine": "words_first", "is_bold": True, }) # Convert columns to output format with percentages out_columns = [] for col in columns: x_min = col["x_min"] x_max = col["x_max"] out_columns.append({ "index": col["index"], "label": col["type"], "x_min_px": round(x_min), "x_max_px": round(x_max), "x_min_pct": round(x_min / img_w * 100, 2) if img_w else 0, "x_max_pct": round(x_max / img_w * 100, 2) if img_w 
else 0, "bold": False, }) # Convert rows to output format with percentages out_rows = [] for row in rows: out_rows.append({ "index": row["index"], "y_min_px": round(row["y_min"]), "y_max_px": round(row["y_max"]), "y_min_pct": round(row["y_min"] / img_h * 100, 2) if img_h else 0, "y_max_pct": round(row["y_max"] / img_h * 100, 2) if img_h else 0, "is_header": row["index"] in header_rows, }) return { "columns": out_columns, "rows": out_rows, "cells": cells, "header_rows": header_rows, "_raw_columns": columns, # internal: for propagation to other zones } def _get_content_bounds(words: List[Dict]) -> tuple: """Get content bounds from word positions.""" if not words: return 0, 0, 0, 0 x_min = min(w["left"] for w in words) y_min = min(w["top"] for w in words) x_max = max(w["left"] + w["width"] for w in words) y_max = max(w["top"] + w["height"] for w in words) return x_min, y_min, x_max - x_min, y_max - y_min def _filter_decorative_margin( words: List[Dict], img_w: int, log: Any, session_id: str, ) -> None: """Remove words that belong to a decorative alphabet strip on a margin. Some vocabulary worksheets have a vertical A–Z alphabet graphic along the left or right edge. OCR reads each letter as an isolated single- character word. These decorative elements are not content and confuse column/row detection. Detection criteria: - Words are in the outer 30% of the page (left or right) - Nearly all words are single characters (letters or digits) - At least 8 such words form a vertical strip (≥8 unique Y positions) - Average horizontal spread of the strip is small (< 60px) Modifies *words* in place. 
""" if not words or img_w <= 0: return margin_cutoff = img_w * 0.30 # Candidate margin words: single char, in left or right 30% left_strip = [ w for w in words if len((w.get("text") or "").strip()) == 1 and w["left"] + w.get("width", 0) / 2 < margin_cutoff ] right_strip = [ w for w in words if len((w.get("text") or "").strip()) == 1 and w["left"] + w.get("width", 0) / 2 > img_w - margin_cutoff ] for strip, side in [(left_strip, "left"), (right_strip, "right")]: if len(strip) < 8: continue # Check vertical distribution: should have many distinct Y positions y_centers = sorted(set( int(w["top"] + w.get("height", 0) / 2) // 20 * 20 # bucket for w in strip )) if len(y_centers) < 6: continue # Check horizontal compactness x_positions = [w["left"] for w in strip] x_spread = max(x_positions) - min(x_positions) if x_spread > 80: continue # This looks like a decorative alphabet strip — remove these words strip_set = set(id(w) for w in strip) before = len(words) words[:] = [w for w in words if id(w) not in strip_set] removed = before - len(words) if removed: log.info( "build-grid session %s: removed %d decorative %s-margin chars", session_id, removed, side, ) def _filter_footer_words( words: List[Dict], img_h: int, log: Any, session_id: str, ) -> None: """Remove isolated words in the bottom 5% of the page (page numbers). Modifies *words* in place. 
""" if not words or img_h <= 0: return footer_y = img_h * 0.95 footer_words = [ w for w in words if w["top"] + w.get("height", 0) / 2 > footer_y ] if not footer_words: return # Only remove if footer has very few words (≤ 3) with short text total_text = "".join((w.get("text") or "").strip() for w in footer_words) if len(footer_words) <= 3 and len(total_text) <= 10: footer_set = set(id(w) for w in footer_words) words[:] = [w for w in words if id(w) not in footer_set] log.info( "build-grid session %s: removed %d footer words ('%s')", session_id, len(footer_words), total_text, ) # --------------------------------------------------------------------------- # Endpoints # --------------------------------------------------------------------------- @router.post("/sessions/{session_id}/build-grid") async def build_grid(session_id: str): """Build a structured, zone-aware grid from existing Kombi word results. Requires that paddle-kombi or rapid-kombi has already been run on the session. Uses the image for box detection and the word positions for grid structuring. Returns a StructuredGrid with zones, each containing their own columns, rows, and cells — ready for the frontend Excel-like editor. """ t0 = time.time() # 1. Load session and word results session = await get_session_db(session_id) if not session: raise HTTPException(status_code=404, detail=f"Session {session_id} not found") word_result = session.get("word_result") if not word_result or not word_result.get("cells"): raise HTTPException( status_code=400, detail="No word results found. Run paddle-kombi or rapid-kombi first.", ) img_w = word_result.get("image_width", 0) img_h = word_result.get("image_height", 0) if not img_w or not img_h: raise HTTPException(status_code=400, detail="Missing image dimensions in word_result") # 2. 
Flatten all word boxes from cells all_words = _flatten_word_boxes(word_result["cells"]) if not all_words: raise HTTPException(status_code=400, detail="No word boxes found in cells") logger.info("build-grid session %s: %d words from %d cells", session_id, len(all_words), len(word_result["cells"])) # 2b. Filter decorative margin columns (alphabet graphics). # Some worksheets have a decorative alphabet strip along one margin # (A-Z in a graphic). OCR reads these as single-char words aligned # vertically. Detect and remove them before grid building. _filter_decorative_margin(all_words, img_w, logger, session_id) # 2c. Filter footer rows (page numbers at the very bottom). # Isolated short text in the bottom 5% of the page is typically a # page number ("64", "S. 12") and not real content. _filter_footer_words(all_words, img_h, logger, session_id) # 2d. Filter words inside detected graphic/image regions # Only remove LOW-CONFIDENCE words (likely OCR artifacts from images). # High-confidence words are real text even if they overlap a detected # graphic region (e.g. colored text that graphic detection couldn't # fully distinguish from an image). 
_GRAPHIC_CONF_THRESHOLD = 50 # keep words with conf >= 50 structure_result = session.get("structure_result") graphic_rects = [] if structure_result: for g in structure_result.get("graphics", []): graphic_rects.append({ "x": g["x"], "y": g["y"], "w": g["w"], "h": g["h"], }) if graphic_rects: before = len(all_words) filtered = [] for w in all_words: w_cx = w["left"] + w.get("width", 0) / 2 w_cy = w["top"] + w.get("height", 0) / 2 inside = any( gr["x"] <= w_cx <= gr["x"] + gr["w"] and gr["y"] <= w_cy <= gr["y"] + gr["h"] for gr in graphic_rects ) if inside and w.get("conf", 0) < _GRAPHIC_CONF_THRESHOLD: continue # remove low-confidence artifact filtered.append(w) removed = before - len(filtered) if removed: all_words = filtered logger.info( "build-grid session %s: removed %d low-conf words inside %d graphic region(s)", session_id, removed, len(graphic_rects), ) # 3. Load image for box detection img_png = await get_session_image(session_id, "cropped") if not img_png: img_png = await get_session_image(session_id, "dewarped") if not img_png: img_png = await get_session_image(session_id, "original") zones_data: List[Dict[str, Any]] = [] boxes_detected = 0 recovered_count = 0 img_bgr = None content_x, content_y, content_w, content_h = _get_content_bounds(all_words) if img_png: # Decode image for color detection + box detection arr = np.frombuffer(img_png, dtype=np.uint8) img_bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR) if img_bgr is not None: # --- Recover colored text that OCR missed (before grid building) --- recovered = recover_colored_text(img_bgr, all_words) if recovered and graphic_rects: # Filter recovered chars inside graphic regions recovered = [ r for r in recovered if not any( gr["x"] <= r["left"] + r.get("width", 0) / 2 <= gr["x"] + gr["w"] and gr["y"] <= r["top"] + r.get("height", 0) / 2 <= gr["y"] + gr["h"] for gr in graphic_rects ) ] if recovered: recovered_count = len(recovered) all_words.extend(recovered) logger.info( "build-grid session %s: +%d recovered 
colored words", session_id, recovered_count, ) # Detect bordered boxes boxes = detect_boxes( img_bgr, content_x=content_x, content_w=content_w, content_y=content_y, content_h=content_h, ) boxes_detected = len(boxes) if boxes: # Filter border ghost words before grid building all_words, ghost_count = _filter_border_ghosts(all_words, boxes) if ghost_count: logger.info( "build-grid session %s: removed %d border ghost words", session_id, ghost_count, ) # Split page into zones page_zones = split_page_into_zones( content_x, content_y, content_w, content_h, boxes ) # --- Union columns from all content zones --- # Each content zone detects columns independently. Narrow # columns (page refs, markers) may appear in only one zone. # Merge column split-points from ALL content zones so every # zone shares the full column set. # First pass: build grids per zone independently zone_grids: List[Dict] = [] for pz in page_zones: zone_words = _words_in_zone( all_words, pz.y, pz.height, pz.x, pz.width ) # Filter recovered single-char artifacts in ALL zones # (decorative colored pixel blobs like !, ?, • from # recover_colored_text that don't represent real text) before = len(zone_words) zone_words = [ w for w in zone_words if not ( w.get("recovered") and len(w.get("text", "").strip()) <= 2 ) ] removed = before - len(zone_words) if removed: logger.info( "build-grid: filtered %d recovered artifacts from %s zone %d", removed, pz.zone_type, pz.index, ) grid = _build_zone_grid( zone_words, pz.x, pz.y, pz.width, pz.height, pz.index, img_w, img_h, ) zone_grids.append({"pz": pz, "words": zone_words, "grid": grid}) # Second pass: merge column boundaries from all content zones content_zones = [ zg for zg in zone_grids if zg["pz"].zone_type == "content" ] if len(content_zones) > 1: # Collect column split points (x_min of non-first columns) all_split_xs: List[float] = [] for zg in content_zones: raw_cols = zg["grid"].get("_raw_columns", []) for col in raw_cols[1:]: all_split_xs.append(col["x_min"]) 
if all_split_xs: all_split_xs.sort() merge_distance = max(25, int(content_w * 0.03)) merged_xs = [all_split_xs[0]] for x in all_split_xs[1:]: if x - merged_xs[-1] < merge_distance: merged_xs[-1] = (merged_xs[-1] + x) / 2 else: merged_xs.append(x) total_cols = len(merged_xs) + 1 max_zone_cols = max( len(zg["grid"].get("_raw_columns", [])) for zg in content_zones ) # Apply union whenever it has at least as many # columns as the best single zone. Even with the # same count the union boundaries are better because # they incorporate evidence from all zones. if total_cols >= max_zone_cols: cx_min = min(w["left"] for w in all_words) cx_max = max( w["left"] + w["width"] for w in all_words ) merged_columns: List[Dict[str, Any]] = [] prev_x = cx_min for i, sx in enumerate(merged_xs): merged_columns.append({ "index": i, "type": f"column_{i + 1}", "x_min": prev_x, "x_max": sx, }) prev_x = sx merged_columns.append({ "index": len(merged_xs), "type": f"column_{len(merged_xs) + 1}", "x_min": prev_x, "x_max": cx_max, }) # Re-build ALL content zones with merged columns for zg in zone_grids: pz = zg["pz"] if pz.zone_type == "content": grid = _build_zone_grid( zg["words"], pz.x, pz.y, pz.width, pz.height, pz.index, img_w, img_h, global_columns=merged_columns, ) zg["grid"] = grid logger.info( "build-grid session %s: union of %d content " "zones → %d merged columns (max single zone: %d)", session_id, len(content_zones), total_cols, max_zone_cols, ) for zg in zone_grids: pz = zg["pz"] grid = zg["grid"] # Remove internal _raw_columns before adding to response grid.pop("_raw_columns", None) zone_entry: Dict[str, Any] = { "zone_index": pz.index, "zone_type": pz.zone_type, "bbox_px": { "x": pz.x, "y": pz.y, "w": pz.width, "h": pz.height, }, "bbox_pct": { "x": round(pz.x / img_w * 100, 2) if img_w else 0, "y": round(pz.y / img_h * 100, 2) if img_h else 0, "w": round(pz.width / img_w * 100, 2) if img_w else 0, "h": round(pz.height / img_h * 100, 2) if img_h else 0, }, "border": None, 
"word_count": len(zg["words"]), **grid, } if pz.box: zone_entry["border"] = { "thickness": pz.box.border_thickness, "confidence": pz.box.confidence, } zones_data.append(zone_entry) # 4. Fallback: no boxes detected → single zone with all words if not zones_data: # Filter recovered single-char artifacts (same as in zone loop above) before = len(all_words) filtered_words = [ w for w in all_words if not (w.get("recovered") and len(w.get("text", "").strip()) <= 2) ] removed = before - len(filtered_words) if removed: logger.info( "build-grid session %s: filtered %d recovered artifacts (fallback zone)", session_id, removed, ) grid = _build_zone_grid( filtered_words, content_x, content_y, content_w, content_h, 0, img_w, img_h, ) grid.pop("_raw_columns", None) zones_data.append({ "zone_index": 0, "zone_type": "content", "bbox_px": { "x": content_x, "y": content_y, "w": content_w, "h": content_h, }, "bbox_pct": { "x": round(content_x / img_w * 100, 2) if img_w else 0, "y": round(content_y / img_h * 100, 2) if img_h else 0, "w": round(content_w / img_w * 100, 2) if img_w else 0, "h": round(content_h / img_h * 100, 2) if img_h else 0, }, "border": None, "word_count": len(all_words), **grid, }) # 4b. Remove junk rows: rows where ALL cells contain only short, # low-confidence text (OCR noise, stray marks). Real vocabulary rows # have at least one word with conf >= 50 or meaningful text length. # Also remove "oversized stub" rows: rows with ≤2 very short words # whose word-boxes are significantly taller than the median (e.g. # large red page numbers like "( 9" that are not real text content). 
_JUNK_CONF_THRESHOLD = 50 _JUNK_MAX_TEXT_LEN = 3 for z in zones_data: cells = z.get("cells", []) rows = z.get("rows", []) if not cells or not rows: continue # Compute median word height across the zone for oversized detection all_wb_heights = [ wb["height"] for cell in cells for wb in cell.get("word_boxes") or [] if wb.get("height", 0) > 0 ] median_wb_h = sorted(all_wb_heights)[len(all_wb_heights) // 2] if all_wb_heights else 28 junk_row_indices = set() for row in rows: ri = row["index"] row_cells = [c for c in cells if c.get("row_index") == ri] if not row_cells: continue row_wbs = [ wb for cell in row_cells for wb in cell.get("word_boxes") or [] ] # Rule 1: ALL word_boxes are low-conf AND short text all_junk = True for wb in row_wbs: text = (wb.get("text") or "").strip() conf = wb.get("conf", 0) if conf >= _JUNK_CONF_THRESHOLD or len(text) > _JUNK_MAX_TEXT_LEN: all_junk = False break if all_junk and row_wbs: junk_row_indices.add(ri) continue # Rule 2: oversized stub — ≤3 words, short total text, # and word height > 1.8× median (page numbers, stray marks, # OCR from illustration labels like "SEA &") if len(row_wbs) <= 3: total_text = "".join((wb.get("text") or "").strip() for wb in row_wbs) max_h = max((wb.get("height", 0) for wb in row_wbs), default=0) if len(total_text) <= 5 and max_h > median_wb_h * 1.8: junk_row_indices.add(ri) continue # Rule 3: scattered debris — rows with only tiny fragments # (e.g. OCR artifacts from illustrations/graphics). # If the row has no word longer than 2 chars, it's noise. longest = max(len((wb.get("text") or "").strip()) for wb in row_wbs) if longest <= 2: junk_row_indices.add(ri) continue if junk_row_indices: z["cells"] = [c for c in cells if c.get("row_index") not in junk_row_indices] z["rows"] = [r for r in rows if r["index"] not in junk_row_indices] logger.info( "build-grid: removed %d junk rows from zone %d: %s", len(junk_row_indices), z["zone_index"], sorted(junk_row_indices), ) # 5. 
Color annotation on final word_boxes in cells if img_bgr is not None: all_wb: List[Dict] = [] for z in zones_data: for cell in z.get("cells", []): all_wb.extend(cell.get("word_boxes", [])) detect_word_colors(img_bgr, all_wb) # 5b. Fix unmatched parentheses in cell text # OCR often misses opening "(" while detecting closing ")". # If a cell's text has ")" without a matching "(", prepend "(". for z in zones_data: for cell in z.get("cells", []): text = cell.get("text", "") if ")" in text and "(" not in text: cell["text"] = "(" + text # 5c. IPA phonetic correction — replace garbled OCR phonetics with # correct IPA from the dictionary (same as in the OCR pipeline). # Only applies to vocabulary tables (≥3 columns: EN | article | DE). # Single/two-column layouts are continuous text, not vocab tables. all_cells = [cell for z in zones_data for cell in z.get("cells", [])] total_cols = sum(len(z.get("columns", [])) for z in zones_data) if total_cols >= 3: # Find which col_type has the longest average text → English headwords col_avg_len: Dict[str, List[int]] = {} for cell in all_cells: ct = cell.get("col_type", "") txt = cell.get("text", "") col_avg_len.setdefault(ct, []).append(len(txt)) en_col_type = None best_avg = 0 for ct, lengths in col_avg_len.items(): if not ct.startswith("column_"): continue avg = sum(lengths) / len(lengths) if lengths else 0 if avg > best_avg: best_avg = avg en_col_type = ct if en_col_type: for cell in all_cells: if cell.get("col_type") == en_col_type: cell["_orig_col_type"] = en_col_type cell["col_type"] = "column_en" fix_cell_phonetics(all_cells, pronunciation="british") for cell in all_cells: orig = cell.pop("_orig_col_type", None) if orig: cell["col_type"] = orig # 5d. Remove IPA continuation rows — rows where the printed # phonetic transcription wraps to a line below the headword. 
# These rows have text only in the English column (+ margin # noise) and fix_cell_phonetics did NOT insert IPA brackets # (because there's no real English word to look up). ipa_cont_rows: set = set() for z in zones_data: for row in z.get("rows", []): ri = row["index"] row_cells = [ c for c in z.get("cells", []) if c.get("row_index") == ri ] en_cells = [ c for c in row_cells if c.get("col_type") == en_col_type ] # Other cells with ≥3 chars (ignore margin noise) other_cells = [ c for c in row_cells if c.get("col_type") != en_col_type and len((c.get("text") or "").strip()) >= 3 ] if en_cells and not other_cells: en_text = en_cells[0].get("text", "") # No IPA brackets → phonetics not recognized → # this is a garbled IPA continuation row if "[" not in en_text: ipa_cont_rows.add(ri) if ipa_cont_rows: for z in zones_data: z["rows"] = [ r for r in z.get("rows", []) if r["index"] not in ipa_cont_rows ] z["cells"] = [ c for c in z.get("cells", []) if c.get("row_index") not in ipa_cont_rows ] logger.info( "removed %d IPA continuation rows: %s", len(ipa_cont_rows), sorted(ipa_cont_rows), ) duration = time.time() - t0 # 6. 
Build result total_cells = sum(len(z.get("cells", [])) for z in zones_data) total_columns = sum(len(z.get("columns", [])) for z in zones_data) total_rows = sum(len(z.get("rows", [])) for z in zones_data) # Collect color statistics from all word_boxes in cells color_stats: Dict[str, int] = {} for z in zones_data: for cell in z.get("cells", []): for wb in cell.get("word_boxes", []): cn = wb.get("color_name", "black") color_stats[cn] = color_stats.get(cn, 0) + 1 # Compute layout metrics for faithful grid reconstruction all_content_row_heights: List[float] = [] for z in zones_data: for row in z.get("rows", []): if not row.get("is_header", False): h = row.get("y_max_px", 0) - row.get("y_min_px", 0) if h > 0: all_content_row_heights.append(h) avg_row_height = ( sum(all_content_row_heights) / len(all_content_row_heights) if all_content_row_heights else 30.0 ) font_size_suggestion = max(10, int(avg_row_height * 0.6)) result = { "session_id": session_id, "image_width": img_w, "image_height": img_h, "zones": zones_data, "boxes_detected": boxes_detected, "summary": { "total_zones": len(zones_data), "total_columns": total_columns, "total_rows": total_rows, "total_cells": total_cells, "total_words": len(all_words), "recovered_colored": recovered_count, "color_stats": color_stats, }, "formatting": { "bold_columns": [], "header_rows": [], }, "layout_metrics": { "page_width_px": img_w, "page_height_px": img_h, "avg_row_height_px": round(avg_row_height, 1), "font_size_suggestion_px": font_size_suggestion, }, "duration_seconds": round(duration, 2), } # 7. 
@router.post("/sessions/{session_id}/save-grid")
async def save_grid(session_id: str, request: Request):
    """Save edited grid data from the frontend Excel-like editor.

    Receives the full StructuredGrid with user edits (text changes,
    formatting changes like bold columns, header rows, etc.) and persists
    it to the session's ``grid_editor_result``.

    Metadata that the client does not send back (image size, box count,
    summary, formatting, layout metrics) is carried over from the result
    currently stored for the session, so a save does not strip fields that
    the build step produced.

    Args:
        session_id: Identifier of the OCR pipeline session.
        request: Incoming request whose JSON body is the edited grid; it
            must be a JSON object containing a ``zones`` list.

    Returns:
        ``{"session_id": <id>, "saved": True}`` on success.

    Raises:
        HTTPException: 404 if the session does not exist; 400 if the body
            is not valid JSON, is not a JSON object, or lacks a ``zones``
            list.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    # A malformed or empty payload is a client error, not a server fault.
    # JSON decode errors (and invalid UTF-8) are ValueError subclasses.
    try:
        body = await request.json()
    except ValueError as exc:
        raise HTTPException(
            status_code=400, detail="Request body is not valid JSON",
        ) from exc

    # Validate basic structure before touching the database.
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")
    if "zones" not in body:
        raise HTTPException(status_code=400, detail="Missing 'zones' in request body")
    zones = body["zones"]
    if not isinstance(zones, list):
        raise HTTPException(status_code=400, detail="'zones' must be a list")

    # Preserve metadata from the original build
    existing = session.get("grid_editor_result") or {}

    def _carry_over(key: str, default: Any) -> Any:
        """Prefer the client's value, then the stored one, then *default*."""
        return body.get(key, existing.get(key, default))

    result = {
        "session_id": session_id,
        "image_width": _carry_over("image_width", 0),
        "image_height": _carry_over("image_height", 0),
        "zones": zones,
        "boxes_detected": _carry_over("boxes_detected", 0),
        "summary": _carry_over("summary", {}),
        "formatting": _carry_over("formatting", {}),
        # The build step stores layout metrics in the result; keep them so
        # a saved grid still carries them when it is read back.
        "layout_metrics": _carry_over("layout_metrics", {}),
        "duration_seconds": existing.get("duration_seconds", 0),
        "edited": True,
    }

    await update_session_db(session_id, grid_editor_result=result)

    logger.info("save-grid session %s: %d zones saved", session_id, len(zones))

    return {"session_id": session_id, "saved": True}
@router.get("/sessions/{session_id}/grid-editor")
async def get_grid(session_id: str):
    """Return the stored grid editor result for the given session.

    Responds with HTTP 404 when the session cannot be found, and also when
    the session has no (or an empty) ``grid_editor_result``.
    """
    record = await get_session_db(session_id)
    if record:
        grid_state = record.get("grid_editor_result")
        if grid_state:
            return grid_state
        # Session exists, but no grid result is stored for it.
        raise HTTPException(
            status_code=404,
            detail="No grid editor data. Run build-grid first.",
        )
    raise HTTPException(status_code=404, detail=f"Session {session_id} not found")