"""Grid Editor API — endpoints for grid building, editing, and export.

The core grid building logic is in grid_build_core.py.
"""

import logging
import re
import time
from typing import Any, Dict, List, Optional, Tuple

from fastapi import APIRouter, HTTPException, Query, Request

from grid_build_core import _build_grid_core
from grid_editor_helpers import _words_in_zone
from ocr_pipeline_session_store import (
    get_session_db,
    update_session_db,
)
from ocr_pipeline_common import (
    _cache,
    _load_session_to_cache,
    _get_cached,
)

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/v1/ocr-pipeline", tags=["grid-editor"])


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------

@router.post("/sessions/{session_id}/build-grid")
async def build_grid(
    session_id: str,
    ipa_mode: str = Query("auto", pattern="^(auto|all|de|en|none)$"),
    syllable_mode: str = Query("auto", pattern="^(auto|all|de|en|none)$"),
    enhance: bool = Query(True, description="Step 3: CLAHE + denoise for degraded scans"),
    max_cols: int = Query(0, description="Step 2: Max column count (0=unlimited)"),
    min_conf: int = Query(0, description="Step 1: Min OCR confidence (0=auto)"),
):
    """Build a structured, zone-aware grid from existing Kombi word results.

    Requires that paddle-kombi or rapid-kombi has already been run on the
    session. Uses the image for box detection and the word positions for
    grid structuring.

    Query params:
        ipa_mode: "auto" (only when English IPA detected), "all" (force),
            "none" (skip)
        syllable_mode: "auto" (only when original has dividers), "all"
            (force), "none" (skip)

    Returns a StructuredGrid with zones, each containing their own columns,
    rows, and cells — ready for the frontend Excel-like editor.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    try:
        result = await _build_grid_core(
            session_id,
            session,
            ipa_mode=ipa_mode,
            syllable_mode=syllable_mode,
            enhance=enhance,
            # 0 means "unlimited"/"auto" on the query param; the core expects None.
            max_columns=max_cols if max_cols > 0 else None,
            min_conf=min_conf if min_conf > 0 else None,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    # Save automatic grid snapshot for later comparison with manual corrections.
    # Lazy import to avoid circular dependency with ocr_pipeline_regression.
    from ocr_pipeline_regression import _build_reference_snapshot

    wr = session.get("word_result") or {}
    engine = wr.get("ocr_engine", "")
    if engine in ("kombi", "rapid_kombi"):
        auto_pipeline = "kombi"
    elif engine == "paddle_direct":
        auto_pipeline = "paddle-direct"
    else:
        auto_pipeline = "pipeline"
    auto_snapshot = _build_reference_snapshot(result, pipeline=auto_pipeline)
    gt = session.get("ground_truth") or {}
    gt["auto_grid_snapshot"] = auto_snapshot

    # Persist to DB and advance current_step to 11 (reconstruction complete)
    await update_session_db(session_id, grid_editor_result=result, ground_truth=gt, current_step=11)

    logger.info(
        "build-grid session %s: %d zones, %d cols, %d rows, %d cells, "
        "%d boxes in %.2fs",
        session_id,
        len(result.get("zones", [])),
        result.get("summary", {}).get("total_columns", 0),
        result.get("summary", {}).get("total_rows", 0),
        result.get("summary", {}).get("total_cells", 0),
        result.get("boxes_detected", 0),
        result.get("duration_seconds", 0),
    )
    return result


@router.post("/sessions/{session_id}/rerun-ocr-and-build-grid")
async def rerun_ocr_and_build_grid(
    session_id: str,
    ipa_mode: str = Query("auto", pattern="^(auto|all|de|en|none)$"),
    syllable_mode: str = Query("auto", pattern="^(auto|all|de|en|none)$"),
    enhance: bool = Query(True, description="Step 3: CLAHE + denoise for degraded scans"),
    max_cols: int = Query(0, description="Step 2: Max column count (0=unlimited)"),
    min_conf: int = Query(0, description="Step 1: Min OCR confidence (0=auto)"),
    vision_fusion: bool = Query(False, description="Step 4: Vision-LLM fusion for degraded scans"),
    doc_category: str = Query("", description="Document type for Vision-LLM prompt context"),
):
    """Re-run OCR with quality settings, then rebuild the grid.

    Unlike build-grid (which only rebuilds from existing words), this
    endpoint re-runs the full OCR pipeline on the cropped image with
    optional CLAHE enhancement, then builds the grid.

    Steps executed: Image Enhancement → OCR → Grid Build
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    t0 = time.time()

    # 1. Load the cropped/dewarped image from cache or session.
    if session_id not in _cache:
        await _load_session_to_cache(session_id)
    cached = _get_cached(session_id)
    # Prefer the cropped image; fall back to the dewarped one.
    dewarped_bgr = cached.get("cropped_bgr") if cached.get("cropped_bgr") is not None else cached.get("dewarped_bgr")
    if dewarped_bgr is None:
        raise HTTPException(status_code=400, detail="No cropped/dewarped image available. Run preprocessing steps first.")

    img_h, img_w = dewarped_bgr.shape[:2]
    ocr_input = dewarped_bgr.copy()

    # 2. Scan quality assessment — best-effort; falls back to a fixed
    #    confidence threshold of 40 when scoring fails.
    scan_quality_info = {}
    try:
        from scan_quality import score_scan_quality
        quality_report = score_scan_quality(ocr_input)
        scan_quality_info = quality_report.to_dict()
        actual_min_conf = min_conf if min_conf > 0 else quality_report.recommended_min_conf
    except Exception as e:
        logger.warning(f"rerun-ocr: scan quality failed: {e}")
        actual_min_conf = min_conf if min_conf > 0 else 40

    # 3. Image enhancement (Step 3) — only applied to degraded scans.
    is_degraded = scan_quality_info.get("is_degraded", False)
    if enhance and is_degraded:
        try:
            from ocr_image_enhance import enhance_for_ocr
            ocr_input = enhance_for_ocr(ocr_input, is_degraded=True)
            logger.info("rerun-ocr: CLAHE enhancement applied")
        except Exception as e:
            logger.warning(f"rerun-ocr: enhancement failed: {e}")

    # 4. Run dual-engine OCR
    from PIL import Image
    import pytesseract

    # RapidOCR (best-effort; a failure here still lets Tesseract proceed)
    rapid_words = []
    try:
        from cv_ocr_engines import ocr_region_rapid
        from cv_vocab_types import PageRegion
        full_region = PageRegion(type="full_page", x=0, y=0, width=img_w, height=img_h)
        rapid_words = ocr_region_rapid(ocr_input, full_region) or []
    except Exception as e:
        logger.warning(f"rerun-ocr: RapidOCR failed: {e}")

    # Tesseract (BGR → RGB via channel reversal; failures propagate as 500)
    pil_img = Image.fromarray(ocr_input[:, :, ::-1])
    data = pytesseract.image_to_data(pil_img, lang='eng+deu', config='--psm 6 --oem 3', output_type=pytesseract.Output.DICT)
    tess_words = []
    for i in range(len(data["text"])):
        text = (data["text"][i] or "").strip()
        # Tesseract reports conf as a string; "-1" marks non-word entries.
        conf_raw = str(data["conf"][i])
        conf = int(conf_raw) if conf_raw.lstrip("-").isdigit() else -1
        if not text or conf < actual_min_conf:
            continue
        tess_words.append({
            "text": text,
            "left": data["left"][i],
            "top": data["top"][i],
            "width": data["width"][i],
            "height": data["height"][i],
            "conf": conf,
        })

    # 5. Merge OCR results
    from ocr_pipeline_ocr_merge import _split_paddle_multi_words, _merge_paddle_tesseract, _deduplicate_words

    rapid_split = _split_paddle_multi_words(rapid_words) if rapid_words else []
    if rapid_split or tess_words:
        merged_words = _merge_paddle_tesseract(rapid_split, tess_words, img_w, img_h)
        merged_words = _deduplicate_words(merged_words)
    else:
        merged_words = tess_words

    # 6. Store updated word_result in session
    cells_for_storage = [
        {"text": w["text"], "left": w["left"], "top": w["top"],
         "width": w["width"], "height": w["height"], "conf": w.get("conf", 0)}
        for w in merged_words
    ]
    word_result = {
        "cells": [{"text": " ".join(w["text"] for w in merged_words), "word_boxes": cells_for_storage}],
        "image_width": img_w,
        "image_height": img_h,
        "ocr_engine": "rapid_kombi",
        "word_count": len(merged_words),
        "raw_paddle_words": rapid_words,
    }

    # 6b. Vision-LLM Fusion (Step 4) — correct OCR using Vision model
    vision_applied = False
    if vision_fusion:
        try:
            from vision_ocr_fusion import vision_fuse_ocr
            category = doc_category or session.get("document_category") or "vokabelseite"
            logger.info(f"rerun-ocr: running Vision-LLM fusion (category={category})")
            merged_words = await vision_fuse_ocr(ocr_input, merged_words, category)
            vision_applied = True
            # Rebuild storage from fused words
            cells_for_storage = [
                {"text": w["text"], "left": w["left"], "top": w["top"],
                 "width": w["width"], "height": w["height"], "conf": w.get("conf", 0)}
                for w in merged_words
            ]
            word_result["cells"] = [{"text": " ".join(w["text"] for w in merged_words), "word_boxes": cells_for_storage}]
            word_result["word_count"] = len(merged_words)
            word_result["ocr_engine"] = "vision_fusion"
        except Exception as e:
            logger.warning(f"rerun-ocr: Vision-LLM fusion failed: {e}")

    await update_session_db(session_id, word_result=word_result)

    # Reload session with updated word_result
    session = await get_session_db(session_id)

    ocr_duration = time.time() - t0
    logger.info(
        "rerun-ocr session %s: %d words (rapid=%d, tess=%d, merged=%d) in %.1fs "
        "(enhance=%s, min_conf=%d, quality=%s)",
        session_id, len(merged_words), len(rapid_words), len(tess_words),
        len(merged_words), ocr_duration, enhance, actual_min_conf,
        scan_quality_info.get("quality_pct", "?"),
    )

    # 7. Build grid from new words
    try:
        result = await _build_grid_core(
            session_id,
            session,
            ipa_mode=ipa_mode,
            syllable_mode=syllable_mode,
            enhance=enhance,
            max_columns=max_cols if max_cols > 0 else None,
            min_conf=min_conf if min_conf > 0 else None,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    # Persist grid
    await update_session_db(session_id, grid_editor_result=result, current_step=11)

    # Add quality info to response
    result["scan_quality"] = scan_quality_info
    result["ocr_stats"] = {
        "rapid_words": len(rapid_words),
        "tess_words": len(tess_words),
        "merged_words": len(merged_words),
        "min_conf_used": actual_min_conf,
        "enhance_applied": enhance and is_degraded,
        "vision_fusion_applied": vision_applied,
        "document_category": doc_category or session.get("document_category", ""),
        "ocr_duration_seconds": round(ocr_duration, 1),
    }

    total_duration = time.time() - t0
    logger.info(
        "rerun-ocr+build-grid session %s: %d zones, %d cols, %d cells in %.1fs",
        session_id,
        len(result.get("zones", [])),
        result.get("summary", {}).get("total_columns", 0),
        result.get("summary", {}).get("total_cells", 0),
        total_duration,
    )
    return result


@router.post("/sessions/{session_id}/save-grid")
async def save_grid(session_id: str, request: Request):
    """Save edited grid data from the frontend Excel-like editor.

    Receives the full StructuredGrid with user edits (text changes,
    formatting changes like bold columns, header rows, etc.) and persists
    it to the session's grid_editor_result.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    body = await request.json()

    # Validate basic structure
    if "zones" not in body:
        raise HTTPException(status_code=400, detail="Missing 'zones' in request body")

    # Preserve metadata from the original build
    existing = session.get("grid_editor_result") or {}
    result = {
        "session_id": session_id,
        "image_width": body.get("image_width", existing.get("image_width", 0)),
        "image_height": body.get("image_height", existing.get("image_height", 0)),
        "zones": body["zones"],
        "boxes_detected": body.get("boxes_detected", existing.get("boxes_detected", 0)),
        "summary": body.get("summary", existing.get("summary", {})),
        "formatting": body.get("formatting", existing.get("formatting", {})),
        "duration_seconds": existing.get("duration_seconds", 0),
        "edited": True,
    }

    await update_session_db(session_id, grid_editor_result=result, current_step=11)

    logger.info("save-grid session %s: %d zones saved", session_id, len(body["zones"]))
    return {"session_id": session_id, "saved": True}


@router.get("/sessions/{session_id}/grid-editor")
async def get_grid(session_id: str):
    """Retrieve the current grid editor state for a session."""
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    result = session.get("grid_editor_result")
    if not result:
        raise HTTPException(
            status_code=404,
            detail="No grid editor data. Run build-grid first.",
        )
    return result


# ---------------------------------------------------------------------------
# Gutter Repair endpoints
# ---------------------------------------------------------------------------

@router.post("/sessions/{session_id}/gutter-repair")
async def gutter_repair(session_id: str):
    """Analyse grid for gutter-edge OCR errors and return repair suggestions.

    Detects:
    - Words truncated/blurred at the book binding (spell_fix)
    - Words split across rows with missing hyphen chars (hyphen_join)
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    grid_data = session.get("grid_editor_result")
    if not grid_data:
        raise HTTPException(
            status_code=400,
            detail="No grid data. Run build-grid first.",
        )

    from cv_gutter_repair import analyse_grid_for_gutter_repair

    image_width = grid_data.get("image_width", 0)
    result = analyse_grid_for_gutter_repair(grid_data, image_width=image_width)

    # Persist suggestions in ground_truth.gutter_repair (avoids DB migration)
    gt = session.get("ground_truth") or {}
    gt["gutter_repair"] = result
    await update_session_db(session_id, ground_truth=gt)

    logger.info(
        "gutter-repair session %s: %d suggestions in %.2fs",
        session_id,
        result.get("stats", {}).get("suggestions_found", 0),
        result.get("duration_seconds", 0),
    )
    return result


@router.post("/sessions/{session_id}/gutter-repair/apply")
async def gutter_repair_apply(session_id: str, request: Request):
    """Apply accepted gutter repair suggestions to the grid.

    Body: { "accepted": ["suggestion_id_1", "suggestion_id_2", ...] }
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    grid_data = session.get("grid_editor_result")
    if not grid_data:
        raise HTTPException(status_code=400, detail="No grid data.")

    gt = session.get("ground_truth") or {}
    gutter_result = gt.get("gutter_repair")
    if not gutter_result:
        raise HTTPException(
            status_code=400,
            detail="No gutter repair data. Run gutter-repair first.",
        )

    body = await request.json()
    accepted_ids = body.get("accepted", [])
    if not accepted_ids:
        return {"applied_count": 0, "changes": []}

    # text_overrides: { suggestion_id: "alternative_text" }
    # Allows the user to pick a different correction from the alternatives list
    text_overrides = body.get("text_overrides", {})

    from cv_gutter_repair import apply_gutter_suggestions

    suggestions = gutter_result.get("suggestions", [])

    # Apply user-selected alternatives before passing to apply
    for s in suggestions:
        sid = s.get("id", "")
        if sid in text_overrides and text_overrides[sid]:
            s["suggested_text"] = text_overrides[sid]

    # apply_gutter_suggestions mutates grid_data in place; we persist it below.
    result = apply_gutter_suggestions(grid_data, accepted_ids, suggestions)

    # Save updated grid back to session
    await update_session_db(session_id, grid_editor_result=grid_data)

    logger.info(
        "gutter-repair/apply session %s: %d changes applied",
        session_id,
        result.get("applied_count", 0),
    )
    return result


# ---------------------------------------------------------------------------
# Box-Grid-Review endpoints
# ---------------------------------------------------------------------------

@router.post("/sessions/{session_id}/build-box-grids")
async def build_box_grids(session_id: str, request: Request):
    """Rebuild grid structure for all detected boxes with layout-aware detection.

    Uses structure_result.boxes (from Step 7) as the source of box
    coordinates, and raw_paddle_words as OCR word source. Creates or
    updates box zones in the grid_editor_result.

    Optional body: { "overrides": { "0": "bullet_list" } }
        Maps box_index → forced layout_type.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    grid_data = session.get("grid_editor_result")
    if not grid_data:
        raise HTTPException(status_code=400, detail="No grid data. Run build-grid first.")

    # Get raw OCR words (with top/left/width/height keys)
    word_result = session.get("word_result") or {}
    all_words = word_result.get("raw_paddle_words") or word_result.get("raw_tesseract_words") or []
    if not all_words:
        raise HTTPException(status_code=400, detail="No raw OCR words available.")

    # Get detected boxes from structure_result
    structure_result = session.get("structure_result") or {}
    gt = session.get("ground_truth") or {}
    if not structure_result:
        structure_result = gt.get("structure_result") or {}
    detected_boxes = structure_result.get("boxes") or []
    if not detected_boxes:
        return {"session_id": session_id, "box_zones_rebuilt": 0, "spell_fixes": 0, "message": "No boxes detected"}

    # Filter out false-positive boxes in header/footer margins.
    # Textbook pages have ~2.5cm margins at top/bottom. At typical scan
    # resolutions (150-300 DPI), that's roughly 5-10% of image height.
    # A box whose vertical CENTER falls within the top or bottom 7% of
    # the image is likely a page number, unit header, or running footer.
    img_h_for_filter = grid_data.get("image_height", 0) or word_result.get("image_height", 0)
    if img_h_for_filter > 0:
        margin_frac = 0.07  # 7% of image height
        margin_top = img_h_for_filter * margin_frac
        margin_bottom = img_h_for_filter * (1 - margin_frac)
        filtered = []
        for box in detected_boxes:
            by = box.get("y", 0)
            bh = box.get("h", 0)
            box_center_y = by + bh / 2
            if box_center_y < margin_top or box_center_y > margin_bottom:
                logger.info("build-box-grids: skipping header/footer box at y=%d h=%d (center=%.0f, margins=%.0f/%.0f)", by, bh, box_center_y, margin_top, margin_bottom)
                continue
            filtered.append(box)
        detected_boxes = filtered

    # Layout overrides are optional; an empty/invalid body means no overrides.
    body = {}
    try:
        body = await request.json()
    except Exception:
        pass
    layout_overrides = body.get("overrides", {})

    from cv_box_layout import build_box_zone_grid

    img_w = grid_data.get("image_width", 0) or word_result.get("image_width", 0)
    img_h = grid_data.get("image_height", 0) or word_result.get("image_height", 0)
    zones = grid_data.get("zones", [])

    # Find highest existing zone_index
    max_zone_idx = max((z.get("zone_index", 0) for z in zones), default=-1)

    # Remove old box zones (we'll rebuild them)
    zones = [z for z in zones if z.get("zone_type") != "box"]

    # Create the spell checker once for all boxes (optional dependency).
    spell_checker = None
    try:
        from smart_spell import SmartSpellChecker
        spell_checker = SmartSpellChecker()
    except ImportError:
        pass

    box_count = 0
    spell_fixes = 0
    for box_idx, box in enumerate(detected_boxes):
        bx = box.get("x", 0)
        by = box.get("y", 0)
        bw = box.get("w", 0)
        bh = box.get("h", 0)
        if bw <= 0 or bh <= 0:
            continue

        # Filter raw OCR words inside this box
        zone_words = _words_in_zone(all_words, by, bh, bx, bw)
        if not zone_words:
            logger.info("Box %d: no words found in bbox (%d,%d,%d,%d)", box_idx, bx, by, bw, bh)
            continue

        zone_idx = max_zone_idx + 1 + box_idx
        forced_layout = layout_overrides.get(str(box_idx))

        # Build box grid
        box_grid = build_box_zone_grid(
            zone_words, bx, by, bw, bh, zone_idx, img_w, img_h,
            layout_type=forced_layout,
        )

        # Apply SmartSpellChecker to all box cells
        if spell_checker is not None:
            for cell in box_grid.get("cells", []):
                text = cell.get("text", "")
                if not text:
                    continue
                correction = spell_checker.correct_text(text, lang="auto")
                if correction.changed:
                    cell["text"] = correction.corrected
                    spell_fixes += 1

        # Build zone entry
        zone_entry = {
            "zone_index": zone_idx,
            "zone_type": "box",
            "bbox_px": {"x": bx, "y": by, "w": bw, "h": bh},
            "bbox_pct": {
                "x": round(bx / img_w * 100, 2) if img_w else 0,
                "y": round(by / img_h * 100, 2) if img_h else 0,
                "w": round(bw / img_w * 100, 2) if img_w else 0,
                "h": round(bh / img_h * 100, 2) if img_h else 0,
            },
            "border": None,
            "word_count": len(zone_words),
            "columns": box_grid["columns"],
            "rows": box_grid["rows"],
            "cells": box_grid["cells"],
            "header_rows": box_grid.get("header_rows", []),
            "box_layout_type": box_grid.get("box_layout_type", "flowing"),
            "box_grid_reviewed": False,
            "box_bg_color": box.get("bg_color_name", ""),
            "box_bg_hex": box.get("bg_color_hex", ""),
        }
        zones.append(zone_entry)
        box_count += 1

    # Sort zones by y-position for correct reading order
    zones.sort(key=lambda z: z.get("bbox_px", {}).get("y", 0))
    grid_data["zones"] = zones

    await update_session_db(session_id, grid_editor_result=grid_data)

    logger.info(
        "build-box-grids session %s: %d boxes processed (%d words spell-fixed) from %d detected",
        session_id, box_count, spell_fixes, len(detected_boxes),
    )
    return {
        "session_id": session_id,
        "box_zones_rebuilt": box_count,
        "total_detected_boxes": len(detected_boxes),
        "spell_fixes": spell_fixes,
        "zones": zones,
    }


# ---------------------------------------------------------------------------
# Unified Grid endpoint
# ---------------------------------------------------------------------------

@router.post("/sessions/{session_id}/build-unified-grid")
async def build_unified_grid_endpoint(session_id: str):
    """Build a single-zone unified grid merging content + box zones.

    Takes the existing multi-zone grid_editor_result and produces a
    unified grid where boxes are integrated into the main row sequence.
    Persists as unified_grid_result (preserves original multi-zone data).
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    grid_data = session.get("grid_editor_result")
    if not grid_data:
        raise HTTPException(status_code=400, detail="No grid data. Run build-grid first.")

    from unified_grid import build_unified_grid

    result = build_unified_grid(
        zones=grid_data.get("zones", []),
        image_width=grid_data.get("image_width", 0),
        image_height=grid_data.get("image_height", 0),
        layout_metrics=grid_data.get("layout_metrics", {}),
    )

    # Persist as separate field (don't overwrite original multi-zone grid)
    await update_session_db(session_id, unified_grid_result=result)

    logger.info(
        "build-unified-grid session %s: %d rows, %d cells",
        session_id,
        result.get("summary", {}).get("total_rows", 0),
        result.get("summary", {}).get("total_cells", 0),
    )
    return result


@router.get("/sessions/{session_id}/unified-grid")
async def get_unified_grid(session_id: str):
    """Retrieve the unified grid for a session."""
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    result = session.get("unified_grid_result")
    if not result:
        raise HTTPException(
            status_code=404,
            detail="No unified grid. Run build-unified-grid first.",
        )
    return result