"""Grid Editor API — builds a structured, zone-aware grid from Kombi OCR results.

Takes the merged word positions from paddle-kombi / rapid-kombi and:
  1. Detects bordered boxes on the image (cv_box_detect)
  2. Splits the page into zones (content + box regions)
  3. Clusters words into columns and rows per zone
  4. Returns a hierarchical StructuredGrid for the frontend Excel-like editor

License: Apache 2.0 (commercial use permitted)
PRIVACY: All processing happens locally.
"""

import logging
import time
from typing import Any, Dict, List, Optional, Tuple

import cv2
import numpy as np
from fastapi import APIRouter, HTTPException, Request

from cv_box_detect import detect_boxes, split_page_into_zones
from cv_words_first import _cluster_rows, _build_cells
from ocr_pipeline_session_store import (
    get_session_db,
    get_session_image,
    update_session_db,
)

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/v1/ocr-pipeline", tags=["grid-editor"])

# Row-coverage thresholds used to decide which left-edge clusters become columns.
_MIN_COVERAGE_PRIMARY = 0.20
_MIN_COVERAGE_SECONDARY = 0.12
_MIN_WORDS_SECONDARY = 3
_MIN_DISTINCT_ROWS = 2

# Session image variants tried for box detection, in order of preference.
_IMAGE_KINDS = ("cropped", "dewarped", "original")


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _pct(value: float, total: float) -> float:
    """Return *value* as a percentage of *total* (2 decimals), or 0 if total is falsy."""
    return round(value / total * 100, 2) if total else 0


def _empty_grid() -> Dict[str, Any]:
    """Return the grid structure used for zones without usable content."""
    return {"columns": [], "rows": [], "cells": [], "header_rows": []}


def _single_text_column(words: List[Dict]) -> List[Dict[str, Any]]:
    """Return a single ``column_text`` column spanning the X-extent of *words*."""
    x_min = min(w["left"] for w in words)
    x_max = max(w["left"] + w["width"] for w in words)
    return [{"index": 0, "type": "column_text", "x_min": x_min, "x_max": x_max}]


def _summarize_cluster(
    edges: List[float],
    row_ids: set,
    total_rows: int,
) -> Dict[str, Any]:
    """Summarize one cluster of group-start left edges and the rows it touches."""
    return {
        "mean_x": int(sum(edges) / len(edges)),
        "min_edge": min(edges),
        "max_edge": max(edges),
        "count": len(edges),
        "distinct_rows": len(row_ids),
        "row_coverage": len(row_ids) / total_rows,
    }


def _cluster_columns_by_alignment(
    words: List[Dict],
    zone_w: int,
    rows: List[Dict],
) -> List[Dict[str, Any]]:
    """Detect columns by clustering left-edge alignment across rows.

    Hybrid approach:
      1. Group words by row, find "group start" positions within each row
         (words preceded by a large gap or first word in row)
      2. Cluster group-start left-edges by X-proximity across rows
      3. Filter by row coverage (how many rows have a group start here)
      4. Merge nearby clusters
      5. Build column boundaries

    This filters out mid-phrase word positions (e.g. IPA transcriptions,
    second words in multi-word entries) by only considering positions
    where a new word group begins within a row.

    Args:
        words: Word dicts with ``left``, ``top``, ``width`` and ``height``.
        zone_w: Zone width in pixels; scales the clustering tolerances.
        rows: Row dicts providing at least ``index`` and ``y_center``.

    Returns:
        Column dicts (``index``, ``type``, ``x_min``, ``x_max``) ordered left
        to right. Empty if *words* or *rows* is empty.
    """
    if not words or not rows:
        return []

    total_rows = len(rows)

    # --- Group words by row (nearest row center), then sort each row once ---
    row_words: Dict[int, List[Dict]] = {}
    for w in words:
        y_center = w["top"] + w["height"] / 2
        best = min(rows, key=lambda r: abs(r["y_center"] - y_center))
        row_words.setdefault(best["index"], []).append(w)

    sorted_rows: Dict[int, List[Dict]] = {
        ri: sorted(rw_list, key=lambda w: w["left"])
        for ri, rw_list in row_words.items()
    }

    # --- Compute adaptive gap threshold for group-start detection ---
    all_gaps: List[float] = []
    for sorted_rw in sorted_rows.values():
        for prev, nxt in zip(sorted_rw, sorted_rw[1:]):
            gap = nxt["left"] - (prev["left"] + prev["width"])
            if gap > 0:
                all_gaps.append(gap)

    if all_gaps:
        sorted_gaps = sorted(all_gaps)
        median_gap = sorted_gaps[len(sorted_gaps) // 2]
        heights = [w["height"] for w in words if w.get("height", 0) > 0]
        median_h = sorted(heights)[len(heights) // 2] if heights else 25
        # Column boundary: gap > 3× median gap or > 1.5× median word height
        gap_threshold = max(median_gap * 3, median_h * 1.5, 30)
    else:
        gap_threshold = 50

    # --- Find group-start positions (left-edges that begin a new column) ---
    # Every row holds at least one word, so this list is never empty.
    start_positions: List[Tuple[float, int]] = []  # (left_edge, row_index)
    for ri, sorted_rw in sorted_rows.items():
        # First word in row is always a group start
        start_positions.append((sorted_rw[0]["left"], ri))
        for prev, cur in zip(sorted_rw, sorted_rw[1:]):
            gap = cur["left"] - (prev["left"] + prev["width"])
            if gap >= gap_threshold:
                start_positions.append((cur["left"], ri))

    start_positions.sort(key=lambda x: x[0])

    logger.info(
        "alignment columns: %d group-start positions from %d words "
        "(gap_threshold=%.0f, %d rows)",
        len(start_positions), len(words), gap_threshold, total_rows,
    )

    # --- Cluster group-start positions by X-proximity ---
    tolerance = max(10, int(zone_w * 0.01))
    clusters: List[Dict[str, Any]] = []
    cur_edges = [start_positions[0][0]]
    cur_rows = {start_positions[0][1]}

    for left, row_idx in start_positions[1:]:
        if left - cur_edges[-1] <= tolerance:
            cur_edges.append(left)
            cur_rows.add(row_idx)
        else:
            clusters.append(_summarize_cluster(cur_edges, cur_rows, total_rows))
            cur_edges = [left]
            cur_rows = {row_idx}
    clusters.append(_summarize_cluster(cur_edges, cur_rows, total_rows))

    # --- Filter by row coverage ---
    primary: List[Dict[str, Any]] = []
    secondary: List[Dict[str, Any]] = []
    for c in clusters:
        if c["distinct_rows"] < _MIN_DISTINCT_ROWS:
            continue
        if c["row_coverage"] >= _MIN_COVERAGE_PRIMARY:
            primary.append(c)
        elif (
            c["row_coverage"] >= _MIN_COVERAGE_SECONDARY
            and c["count"] >= _MIN_WORDS_SECONDARY
        ):
            secondary.append(c)

    significant = sorted(primary + secondary, key=lambda c: c["mean_x"])

    logger.info(
        "alignment columns: %d clusters, %d primary, %d secondary → %d significant",
        len(clusters), len(primary), len(secondary), len(significant),
    )

    if not significant:
        # Fallback: single column covering all content
        return _single_text_column(words)

    # --- Merge nearby clusters ---
    merge_distance = max(25, int(zone_w * 0.03))
    merged = [significant[0].copy()]
    for s in significant[1:]:
        prev = merged[-1]
        if s["mean_x"] - prev["mean_x"] < merge_distance:
            total = prev["count"] + s["count"]
            # Count-weighted mean of the two cluster centers.
            prev["mean_x"] = (
                prev["mean_x"] * prev["count"] + s["mean_x"] * s["count"]
            ) // total
            prev["count"] = total
            prev["min_edge"] = min(prev["min_edge"], s["min_edge"])
            prev["max_edge"] = max(prev["max_edge"], s["max_edge"])
            # Row sets are not tracked past clustering, so this is a lower
            # bound; "row_coverage" is left as-is and not used below.
            prev["distinct_rows"] = max(prev["distinct_rows"], s["distinct_rows"])
        else:
            merged.append(s.copy())

    logger.info(
        "alignment columns: %d after merge (distance=%d)",
        len(merged), merge_distance,
    )

    # --- Build column boundaries ---
    margin = max(5, int(zone_w * 0.005))
    content_x_min = min(w["left"] for w in words)
    content_x_max = max(w["left"] + w["width"] for w in words)

    multi_column = len(merged) > 1
    columns: List[Dict[str, Any]] = []
    for i, cluster in enumerate(merged):
        x_min = max(content_x_min, cluster["min_edge"] - margin)
        if i + 1 < len(merged):
            # A column ends just before the next column's leftmost edge.
            x_max = merged[i + 1]["min_edge"] - margin
        else:
            x_max = content_x_max
        columns.append({
            "index": i,
            "type": f"column_{i + 1}" if multi_column else "column_text",
            "x_min": x_min,
            "x_max": x_max,
        })

    return columns


def _flatten_word_boxes(cells: List[Dict]) -> List[Dict]:
    """Extract all word_boxes from cells into a flat list of word dicts.

    Word boxes whose ``text`` is missing, ``None`` or whitespace-only are
    skipped. ``conf`` defaults to 0 when absent.
    """
    words: List[Dict] = []
    for cell in cells:
        for wb in cell.get("word_boxes") or []:
            # ``text`` may be present but None; treat it like an empty string.
            if not (wb.get("text") or "").strip():
                continue
            words.append({
                "text": wb["text"],
                "left": wb["left"],
                "top": wb["top"],
                "width": wb["width"],
                "height": wb["height"],
                "conf": wb.get("conf", 0),
            })
    return words


def _words_in_zone(
    words: List[Dict],
    zone_y: int,
    zone_h: int,
    zone_x: int,
    zone_w: int,
) -> List[Dict]:
    """Filter words whose center point falls within a zone's bounds.

    Both the X- and the Y-center are tested, and all four bounds are
    inclusive, so a word centered exactly on a shared zone edge matches
    both neighbouring zones.
    """
    zone_y_end = zone_y + zone_h
    zone_x_end = zone_x + zone_w
    return [
        w for w in words
        if zone_y <= w["top"] + w["height"] / 2 <= zone_y_end
        and zone_x <= w["left"] + w["width"] / 2 <= zone_x_end
    ]


def _detect_header_rows(
    rows: List[Dict],
    zone_words: List[Dict],
    zone_y: int,
) -> List[int]:
    """Heuristic: the first row is a header if it has bold/large text
    or there's a significant gap after it.

    Returns ``[0]`` when the first row looks like a header, else ``[]``.
    ``zone_y`` is accepted for interface compatibility and is not used.
    """
    if len(rows) < 2:
        return []

    first_row, second_row = rows[0], rows[1]

    # Gap between first and second row > 0.5x average row height
    avg_h = sum(r["y_max"] - r["y_min"] for r in rows) / len(rows)
    gap = second_row["y_min"] - first_row["y_max"]
    if gap > avg_h * 0.5:
        return [0]

    # Also check if first row words are taller than average (bold/header text)
    first_row_words = [
        w for w in zone_words
        if first_row["y_min"] <= w["top"] + w["height"] / 2 <= first_row["y_max"]
    ]
    if first_row_words:
        first_h = max(w["height"] for w in first_row_words)
        all_heights = sorted(w["height"] for w in zone_words)
        median_h = all_heights[len(all_heights) // 2]
        if first_h > median_h * 1.3:
            return [0]

    return []


def _build_zone_grid(
    zone_words: List[Dict],
    zone_x: int,
    zone_y: int,
    zone_w: int,
    zone_h: int,
    zone_index: int,
    img_w: int,
    img_h: int,
) -> Dict[str, Any]:
    """Build columns, rows, cells for a single zone from its words.

    Returns a dict with ``columns``, ``rows``, ``cells`` and ``header_rows``;
    all four are empty lists when the zone has no words or no rows/columns
    could be derived.
    """
    if not zone_words:
        return _empty_grid()

    # Cluster rows first (needed for column alignment analysis)
    rows = _cluster_rows(zone_words)

    # Cluster columns by left-edge alignment
    columns = _cluster_columns_by_alignment(zone_words, zone_w, rows)

    if not columns or not rows:
        return _empty_grid()

    # Build cells
    cells = _build_cells(zone_words, columns, rows, img_w, img_h)

    # Prefix cell IDs with zone index
    for cell in cells:
        cell["cell_id"] = f"Z{zone_index}_{cell['cell_id']}"
        cell["zone_index"] = zone_index

    # Detect header rows
    header_rows = _detect_header_rows(rows, zone_words, zone_y)

    # Convert columns to output format with percentages
    out_columns = [
        {
            "index": col["index"],
            "label": col["type"],
            "x_min_px": round(col["x_min"]),
            "x_max_px": round(col["x_max"]),
            "x_min_pct": _pct(col["x_min"], img_w),
            "x_max_pct": _pct(col["x_max"], img_w),
            "bold": False,
        }
        for col in columns
    ]

    # Convert rows to output format with percentages
    out_rows = [
        {
            "index": row["index"],
            "y_min_px": round(row["y_min"]),
            "y_max_px": round(row["y_max"]),
            "y_min_pct": _pct(row["y_min"], img_h),
            "y_max_pct": _pct(row["y_max"], img_h),
            "is_header": row["index"] in header_rows,
        }
        for row in rows
    ]

    return {
        "columns": out_columns,
        "rows": out_rows,
        "cells": cells,
        "header_rows": header_rows,
    }


def _get_content_bounds(words: List[Dict]) -> tuple:
    """Get content bounds from word positions.

    Returns ``(x, y, width, height)`` of the box enclosing all words, or
    ``(0, 0, 0, 0)`` when *words* is empty.
    """
    if not words:
        return 0, 0, 0, 0
    x_min = min(w["left"] for w in words)
    y_min = min(w["top"] for w in words)
    x_max = max(w["left"] + w["width"] for w in words)
    y_max = max(w["top"] + w["height"] for w in words)
    return x_min, y_min, x_max - x_min, y_max - y_min


def _make_zone_entry(
    zone_index: int,
    zone_type: str,
    x: int,
    y: int,
    w: int,
    h: int,
    word_count: int,
    grid: Dict[str, Any],
    img_w: int,
    img_h: int,
) -> Dict[str, Any]:
    """Assemble one zone record (bbox in px and percent, plus its grid)."""
    return {
        "zone_index": zone_index,
        "zone_type": zone_type,
        "bbox_px": {"x": x, "y": y, "w": w, "h": h},
        "bbox_pct": {
            "x": _pct(x, img_w),
            "y": _pct(y, img_h),
            "w": _pct(w, img_w),
            "h": _pct(h, img_h),
        },
        "border": None,
        "word_count": word_count,
        **grid,
    }


def _zones_from_image(
    img_png: bytes,
    all_words: List[Dict],
    content_bounds: tuple,
    img_w: int,
    img_h: int,
) -> Tuple[List[Dict[str, Any]], int]:
    """Detect boxes on the encoded image and build one grid per page zone.

    Returns ``(zones, boxes_detected)``. ``zones`` is empty when the image
    cannot be decoded or no boxes are found.
    """
    arr = np.frombuffer(img_png, dtype=np.uint8)
    img_bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    if img_bgr is None:
        return [], 0

    content_x, content_y, content_w, content_h = content_bounds

    # Detect bordered boxes
    boxes = detect_boxes(
        img_bgr,
        content_x=content_x,
        content_w=content_w,
        content_y=content_y,
        content_h=content_h,
    )
    boxes_detected = len(boxes)
    if not boxes:
        return [], boxes_detected

    # Split page into zones
    page_zones = split_page_into_zones(
        content_x, content_y, content_w, content_h, boxes
    )

    zones: List[Dict[str, Any]] = []
    for pz in page_zones:
        zone_words = _words_in_zone(all_words, pz.y, pz.height, pz.x, pz.width)
        grid = _build_zone_grid(
            zone_words, pz.x, pz.y, pz.width, pz.height,
            pz.index, img_w, img_h,
        )
        zone_entry = _make_zone_entry(
            pz.index, pz.zone_type, pz.x, pz.y, pz.width, pz.height,
            len(zone_words), grid, img_w, img_h,
        )
        if pz.box:
            zone_entry["border"] = {
                "thickness": pz.box.border_thickness,
                "confidence": pz.box.confidence,
            }
        zones.append(zone_entry)

    return zones, boxes_detected


async def _require_session(session_id: str) -> Dict[str, Any]:
    """Load a session or raise HTTP 404 if it does not exist."""
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
    return session


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------

@router.post("/sessions/{session_id}/build-grid")
async def build_grid(session_id: str):
    """Build a structured, zone-aware grid from existing Kombi word results.

    Requires that paddle-kombi or rapid-kombi has already been run on the session.
    Uses the image for box detection and the word positions for grid structuring.

    Returns a StructuredGrid with zones, each containing their own
    columns, rows, and cells — ready for the frontend Excel-like editor.
    """
    # Raises 404 for an unknown session and 400 when word results, image
    # dimensions or word boxes are missing.
    # perf_counter is monotonic, so the duration cannot go negative on
    # wall-clock adjustments.
    t0 = time.perf_counter()

    # 1. Load session and word results
    session = await _require_session(session_id)

    word_result = session.get("word_result")
    if not word_result or not word_result.get("cells"):
        raise HTTPException(
            status_code=400,
            detail="No word results found. Run paddle-kombi or rapid-kombi first.",
        )

    img_w = word_result.get("image_width", 0)
    img_h = word_result.get("image_height", 0)
    if not img_w or not img_h:
        raise HTTPException(status_code=400, detail="Missing image dimensions in word_result")

    # 2. Flatten all word boxes from cells
    all_words = _flatten_word_boxes(word_result["cells"])
    if not all_words:
        raise HTTPException(status_code=400, detail="No word boxes found in cells")

    logger.info("build-grid session %s: %d words from %d cells",
                session_id, len(all_words), len(word_result["cells"]))

    # 3. Load image for box detection (first available variant wins)
    img_png = None
    for image_kind in _IMAGE_KINDS:
        img_png = await get_session_image(session_id, image_kind)
        if img_png:
            break

    content_bounds = _get_content_bounds(all_words)
    content_x, content_y, content_w, content_h = content_bounds

    zones_data: List[Dict[str, Any]] = []
    boxes_detected = 0
    if img_png:
        zones_data, boxes_detected = _zones_from_image(
            img_png, all_words, content_bounds, img_w, img_h
        )

    # 4. Fallback: no boxes detected → single zone with all words
    if not zones_data:
        grid = _build_zone_grid(
            all_words, content_x, content_y, content_w, content_h,
            0, img_w, img_h,
        )
        zones_data.append(_make_zone_entry(
            0, "content", content_x, content_y, content_w, content_h,
            len(all_words), grid, img_w, img_h,
        ))

    duration = time.perf_counter() - t0

    # 5. Build result
    total_cells = sum(len(z.get("cells", [])) for z in zones_data)
    total_columns = sum(len(z.get("columns", [])) for z in zones_data)
    total_rows = sum(len(z.get("rows", [])) for z in zones_data)

    result = {
        "session_id": session_id,
        "image_width": img_w,
        "image_height": img_h,
        "zones": zones_data,
        "boxes_detected": boxes_detected,
        "summary": {
            "total_zones": len(zones_data),
            "total_columns": total_columns,
            "total_rows": total_rows,
            "total_cells": total_cells,
            "total_words": len(all_words),
        },
        "formatting": {
            "bold_columns": [],
            "header_rows": [],
        },
        "duration_seconds": round(duration, 2),
    }

    # 6. Persist to DB
    await update_session_db(session_id, grid_editor_result=result)

    logger.info(
        "build-grid session %s: %d zones, %d cols, %d rows, %d cells, "
        "%d boxes in %.2fs",
        session_id, len(zones_data), total_columns, total_rows,
        total_cells, boxes_detected, duration,
    )

    return result


@router.post("/sessions/{session_id}/save-grid")
async def save_grid(session_id: str, request: Request):
    """Save edited grid data from the frontend Excel-like editor.

    Receives the full StructuredGrid with user edits (text changes,
    formatting changes like bold columns, header rows, etc.) and
    persists it to the session's grid_editor_result.
    """
    session = await _require_session(session_id)

    # The body comes from the client: reject anything that is not a JSON
    # object with a list under "zones" as 400 instead of failing with a 500.
    try:
        body = await request.json()
    except ValueError as err:  # JSONDecodeError / UnicodeDecodeError
        raise HTTPException(
            status_code=400, detail="Request body is not valid JSON"
        ) from err

    # Validate basic structure
    if not isinstance(body, dict) or "zones" not in body:
        raise HTTPException(status_code=400, detail="Missing 'zones' in request body")
    if not isinstance(body["zones"], list):
        raise HTTPException(status_code=400, detail="'zones' must be a list")

    # Preserve metadata from the original build
    existing = session.get("grid_editor_result") or {}
    result = {
        "session_id": session_id,
        "image_width": body.get("image_width", existing.get("image_width", 0)),
        "image_height": body.get("image_height", existing.get("image_height", 0)),
        "zones": body["zones"],
        "boxes_detected": body.get("boxes_detected", existing.get("boxes_detected", 0)),
        "summary": body.get("summary", existing.get("summary", {})),
        "formatting": body.get("formatting", existing.get("formatting", {})),
        "duration_seconds": existing.get("duration_seconds", 0),
        "edited": True,
    }

    await update_session_db(session_id, grid_editor_result=result)

    logger.info("save-grid session %s: %d zones saved", session_id, len(body["zones"]))

    return {"session_id": session_id, "saved": True}


@router.get("/sessions/{session_id}/grid-editor")
async def get_grid(session_id: str):
    """Retrieve the current grid editor state for a session."""
    session = await _require_session(session_id)

    result = session.get("grid_editor_result")
    if not result:
        raise HTTPException(
            status_code=404,
            detail="No grid editor data. Run build-grid first.",
        )

    return result