Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Failing after 19s
CI / test-go-edu-search (push) Failing after 23s
CI / test-python-klausur (push) Failing after 10s
CI / test-python-agent-core (push) Failing after 9s
CI / test-nodejs-website (push) Failing after 26s
grid_editor_api.py: 2411 → 474 lines - Extracted _build_grid_core() (1892 lines) into grid_build_core.py - API file now only contains endpoints (build, save, get, gutter, box, unified) StepAnsicht.tsx: 212 → 112 lines - Removed useGridEditor imports (not needed for read-only spreadsheet) - Removed unified grid fetch/build (not used with multi-sheet approach) - Removed Spreadsheet/Grid toggle (only spreadsheet mode now) - Simple: fetch grid-editor data → pass to SpreadsheetView Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
475 lines
17 KiB
Python
475 lines
17 KiB
Python
"""
Grid Editor API — endpoints for grid building, editing, and export.

The core grid building logic is in grid_build_core.py.
"""
|
|
|
|
import logging
|
|
import re
|
|
import time
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
from fastapi import APIRouter, HTTPException, Query, Request
|
|
|
|
from grid_build_core import _build_grid_core
|
|
from grid_editor_helpers import _words_in_zone
|
|
from ocr_pipeline_session_store import (
|
|
get_session_db,
|
|
update_session_db,
|
|
)
|
|
|
|
# Module-level logger named after this module; used by every endpoint below.
logger = logging.getLogger(__name__)
|
|
|
|
# Router for all endpoints in this module: routes are registered under the
# "/api/v1/ocr-pipeline" prefix and carry the "grid-editor" tag.
router = APIRouter(prefix="/api/v1/ocr-pipeline", tags=["grid-editor"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.post("/sessions/{session_id}/build-grid")
async def build_grid(
    session_id: str,
    ipa_mode: str = Query("auto", pattern="^(auto|all|de|en|none)$"),
    syllable_mode: str = Query("auto", pattern="^(auto|all|de|en|none)$"),
):
    """Build a structured, zone-aware grid from existing Kombi word results.

    Requires that paddle-kombi or rapid-kombi has already been run on the session.
    Uses the image for box detection and the word positions for grid structuring.

    Query params:
        ipa_mode: "auto" (only when English IPA detected), "all" (force), "none" (skip)
        syllable_mode: "auto" (only when original has dividers), "all" (force), "none" (skip)

        Both parameters also accept "de" and "en" (see the validation pattern);
        how those values are interpreted is decided in grid_build_core —
        presumably they restrict processing to that language (TODO confirm).

    Returns a StructuredGrid with zones, each containing their own
    columns, rows, and cells — ready for the frontend Excel-like editor.

    Raises:
        HTTPException: 404 if the session does not exist; 400 if the core
            builder rejects the session with a ValueError.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    try:
        result = await _build_grid_core(
            session_id, session,
            ipa_mode=ipa_mode, syllable_mode=syllable_mode,
        )
    except ValueError as e:
        # Chain the cause so the original traceback stays visible in logs.
        raise HTTPException(status_code=400, detail=str(e)) from e

    # Save automatic grid snapshot for later comparison with manual corrections
    # Lazy import to avoid circular dependency with ocr_pipeline_regression
    from ocr_pipeline_regression import _build_reference_snapshot

    # Map the OCR engine recorded in word_result to the pipeline label stored
    # with the snapshot.
    wr = session.get("word_result") or {}
    engine = wr.get("ocr_engine", "")
    if engine in ("kombi", "rapid_kombi"):
        auto_pipeline = "kombi"
    elif engine == "paddle_direct":
        auto_pipeline = "paddle-direct"
    else:
        auto_pipeline = "pipeline"
    auto_snapshot = _build_reference_snapshot(result, pipeline=auto_pipeline)

    gt = session.get("ground_truth") or {}
    gt["auto_grid_snapshot"] = auto_snapshot

    # Persist to DB and advance current_step to 11 (reconstruction complete)
    await update_session_db(session_id, grid_editor_result=result, ground_truth=gt, current_step=11)

    summary = result.get("summary", {})
    logger.info(
        "build-grid session %s: %d zones, %d cols, %d rows, %d cells, "
        "%d boxes in %.2fs",
        session_id,
        len(result.get("zones", [])),
        summary.get("total_columns", 0),
        summary.get("total_rows", 0),
        summary.get("total_cells", 0),
        result.get("boxes_detected", 0),
        result.get("duration_seconds", 0),
    )

    return result
|
|
|
|
|
|
@router.post("/sessions/{session_id}/save-grid")
async def save_grid(session_id: str, request: Request):
    """Save edited grid data from the frontend Excel-like editor.

    Receives the full StructuredGrid with user edits (text changes,
    formatting changes like bold columns, header rows, etc.) and
    persists it to the session's grid_editor_result.

    The request body must be a JSON object containing a ``zones`` list.
    ``image_width``, ``image_height``, ``boxes_detected``, ``summary`` and
    ``formatting`` are taken from the body when present and otherwise fall
    back to the previously stored grid; ``duration_seconds`` always comes
    from the stored grid.

    Returns:
        ``{"session_id": ..., "saved": True}`` on success.

    Raises:
        HTTPException: 404 if the session does not exist; 400 if the body is
            not valid JSON, is not a JSON object, lacks ``zones``, or
            ``zones`` is not a list.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    # json.JSONDecodeError (and UnicodeDecodeError) are ValueError subclasses;
    # report them as a client error instead of an unhandled 500.
    try:
        body = await request.json()
    except ValueError as e:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from e

    # Validate basic structure
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")
    if "zones" not in body:
        raise HTTPException(status_code=400, detail="Missing 'zones' in request body")
    zones = body["zones"]
    if not isinstance(zones, list):
        # Reject before persisting: a non-list would be stored and only fail
        # afterwards when its length is logged.
        raise HTTPException(status_code=400, detail="'zones' must be a list")

    # Preserve metadata from the original build
    existing = session.get("grid_editor_result") or {}
    result = {
        "session_id": session_id,
        "image_width": body.get("image_width", existing.get("image_width", 0)),
        "image_height": body.get("image_height", existing.get("image_height", 0)),
        "zones": zones,
        "boxes_detected": body.get("boxes_detected", existing.get("boxes_detected", 0)),
        "summary": body.get("summary", existing.get("summary", {})),
        "formatting": body.get("formatting", existing.get("formatting", {})),
        "duration_seconds": existing.get("duration_seconds", 0),
        "edited": True,
    }

    await update_session_db(session_id, grid_editor_result=result, current_step=11)

    logger.info("save-grid session %s: %d zones saved", session_id, len(zones))

    return {"session_id": session_id, "saved": True}
|
|
|
|
|
|
@router.get("/sessions/{session_id}/grid-editor")
async def get_grid(session_id: str):
    """Return the grid editor state currently stored for a session.

    Raises:
        HTTPException: 404 when the session does not exist, or when it has no
            (non-empty) ``grid_editor_result`` yet.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    stored_grid = session.get("grid_editor_result")
    if stored_grid:
        return stored_grid

    raise HTTPException(
        status_code=404,
        detail="No grid editor data. Run build-grid first.",
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Gutter Repair endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.post("/sessions/{session_id}/gutter-repair")
async def gutter_repair(session_id: str):
    """Analyse the stored grid for gutter-edge OCR errors and suggest repairs.

    Detects:
    - Words truncated/blurred at the book binding (spell_fix)
    - Words split across rows with missing hyphen chars (hyphen_join)

    The analysis result is stored under ``ground_truth["gutter_repair"]`` and
    returned unchanged.

    Raises:
        HTTPException: 404 if the session does not exist; 400 if the session
            has no grid_editor_result yet.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    grid_data = session.get("grid_editor_result")
    if not grid_data:
        raise HTTPException(status_code=400, detail="No grid data. Run build-grid first.")

    # Imported lazily, only once the request has passed validation.
    from cv_gutter_repair import analyse_grid_for_gutter_repair

    analysis = analyse_grid_for_gutter_repair(
        grid_data,
        image_width=grid_data.get("image_width", 0),
    )

    # Persist suggestions in ground_truth.gutter_repair (avoids DB migration)
    ground_truth = session.get("ground_truth") or {}
    ground_truth["gutter_repair"] = analysis
    await update_session_db(session_id, ground_truth=ground_truth)

    stats = analysis.get("stats", {})
    logger.info(
        "gutter-repair session %s: %d suggestions in %.2fs",
        session_id,
        stats.get("suggestions_found", 0),
        analysis.get("duration_seconds", 0),
    )

    return analysis
|
|
|
|
|
|
@router.post("/sessions/{session_id}/gutter-repair/apply")
async def gutter_repair_apply(session_id: str, request: Request):
    """Apply accepted gutter repair suggestions to the grid.

    Body: { "accepted": ["suggestion_id_1", "suggestion_id_2", ...],
            "text_overrides": { "suggestion_id_1": "alternative text" } }

    ``text_overrides`` is optional; a non-empty override replaces the
    suggestion's ``suggested_text`` before the suggestions are applied.

    Returns:
        The result of ``apply_gutter_suggestions``; when ``accepted`` is
        missing or empty, ``{"applied_count": 0, "changes": []}`` without
        touching the grid.

    Raises:
        HTTPException: 404 if the session does not exist; 400 if there is no
            grid data, no stored gutter-repair analysis, or the request body
            is malformed (invalid JSON, not an object, ``accepted`` not a
            list, ``text_overrides`` not an object).
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    grid_data = session.get("grid_editor_result")
    if not grid_data:
        raise HTTPException(status_code=400, detail="No grid data.")

    gt = session.get("ground_truth") or {}
    gutter_result = gt.get("gutter_repair")
    if not gutter_result:
        raise HTTPException(
            status_code=400,
            detail="No gutter repair data. Run gutter-repair first.",
        )

    # json.JSONDecodeError is a ValueError subclass; surface it as a 400
    # rather than letting it bubble up as an unhandled 500.
    try:
        body = await request.json()
    except ValueError as e:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from e
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    accepted_ids = body.get("accepted") or []
    if not accepted_ids:
        return {"applied_count": 0, "changes": []}
    if not isinstance(accepted_ids, list):
        raise HTTPException(status_code=400, detail="'accepted' must be a list of suggestion ids")

    # text_overrides: { suggestion_id: "alternative_text" }
    # Allows the user to pick a different correction from the alternatives list
    text_overrides = body.get("text_overrides") or {}
    if not isinstance(text_overrides, dict):
        raise HTTPException(status_code=400, detail="'text_overrides' must be an object")

    from cv_gutter_repair import apply_gutter_suggestions

    suggestions = gutter_result.get("suggestions", [])

    # Apply user-selected alternatives before passing to apply
    for s in suggestions:
        override = text_overrides.get(s.get("id", ""))
        if override:
            s["suggested_text"] = override

    result = apply_gutter_suggestions(grid_data, accepted_ids, suggestions)

    # Save updated grid back to session
    # (grid_data is the object handed to apply_gutter_suggestions above —
    # presumably modified in place; confirm against cv_gutter_repair.)
    await update_session_db(session_id, grid_editor_result=grid_data)

    logger.info(
        "gutter-repair/apply session %s: %d changes applied",
        session_id,
        result.get("applied_count", 0),
    )

    return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Box-Grid-Review endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _drop_margin_boxes(boxes, image_height, margin_frac=0.07):
    """Return *boxes* without those centred in the top/bottom page margin.

    A box is dropped when its vertical centre (``y + h / 2``) lies above
    ``image_height * margin_frac`` or below ``image_height * (1 - margin_frac)``.
    When ``image_height`` is not positive the list is returned unchanged.
    """
    if image_height <= 0:
        return boxes

    margin_top = image_height * margin_frac
    margin_bottom = image_height * (1 - margin_frac)
    kept = []
    for box in boxes:
        by = box.get("y", 0)
        bh = box.get("h", 0)
        box_center_y = by + bh / 2
        if box_center_y < margin_top or box_center_y > margin_bottom:
            logger.info(
                "build-box-grids: skipping header/footer box at y=%d h=%d (center=%.0f, margins=%.0f/%.0f)",
                by, bh, box_center_y, margin_top, margin_bottom,
            )
            continue
        kept.append(box)
    return kept


def _spell_fix_box_cells(cells):
    """Run SmartSpellChecker over *cells* in place and return the number of changed cells.

    Cells with empty text are skipped. If ``smart_spell`` (or anything it
    imports) is unavailable, the cells corrected so far are kept and the
    remaining ones are left untouched.
    """
    fixes = 0
    try:
        from smart_spell import SmartSpellChecker

        checker = SmartSpellChecker()
        for cell in cells:
            text = cell.get("text", "")
            if not text:
                continue
            correction = checker.correct_text(text, lang="auto")
            if correction.changed:
                cell["text"] = correction.corrected
                fixes += 1
    except ImportError:
        # Spell correction is optional; without it the OCR text is kept as-is.
        logger.debug("build-box-grids: smart_spell not importable, skipping spell correction")
    return fixes


@router.post("/sessions/{session_id}/build-box-grids")
async def build_box_grids(session_id: str, request: Request):
    """Rebuild grid structure for all detected boxes with layout-aware detection.

    Uses structure_result.boxes (from Step 7) as the source of box coordinates,
    and raw_paddle_words (falling back to raw_tesseract_words) as OCR word
    source. Existing zones of type "box" are removed from the
    grid_editor_result and replaced by freshly built ones.

    Optional body: { "overrides": { "0": "bullet_list" } }
    Maps box_index → forced layout_type. The index refers to the position in
    the box list after header/footer boxes have been filtered out.

    Returns:
        A dict with ``session_id``, ``box_zones_rebuilt``,
        ``total_detected_boxes``, ``spell_fixes`` and the full ``zones`` list;
        when no boxes are detected, a short dict with a ``message`` instead
        (nothing is persisted in that case).

    Raises:
        HTTPException: 404 if the session does not exist; 400 if there is no
            grid data, no raw OCR words, or the body/``overrides`` is present
            but not a JSON object.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    grid_data = session.get("grid_editor_result")
    if not grid_data:
        raise HTTPException(status_code=400, detail="No grid data. Run build-grid first.")

    # Get raw OCR words (with top/left/width/height keys)
    word_result = session.get("word_result") or {}
    all_words = word_result.get("raw_paddle_words") or word_result.get("raw_tesseract_words") or []
    if not all_words:
        raise HTTPException(status_code=400, detail="No raw OCR words available.")

    # Get detected boxes from structure_result (session field first, then the
    # copy kept inside ground_truth).
    structure_result = session.get("structure_result") or {}
    gt = session.get("ground_truth") or {}
    if not structure_result:
        structure_result = gt.get("structure_result") or {}
    detected_boxes = structure_result.get("boxes") or []
    if not detected_boxes:
        return {"session_id": session_id, "box_zones_rebuilt": 0, "spell_fixes": 0, "message": "No boxes detected"}

    # Image size: prefer the grid's values, fall back to the OCR result.
    # The trailing "or 0" guards against an explicit null in both places.
    img_w = grid_data.get("image_width", 0) or word_result.get("image_width", 0) or 0
    img_h = grid_data.get("image_height", 0) or word_result.get("image_height", 0) or 0

    # Filter out false-positive boxes in header/footer margins.
    # Textbook pages have ~2.5cm margins at top/bottom. At typical scan
    # resolutions (150-300 DPI), that's roughly 5-10% of image height.
    # A box whose vertical CENTER falls within the top or bottom 7% of
    # the image is likely a page number, unit header, or running footer.
    detected_boxes = _drop_margin_boxes(detected_boxes, img_h, margin_frac=0.07)

    # The body is optional: a missing or unparsable body means "no overrides".
    try:
        body = await request.json()
    except Exception:
        logger.debug("build-box-grids session %s: no usable JSON body, using no overrides", session_id)
        body = {}
    if body is None:
        body = {}
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")
    layout_overrides = body.get("overrides") or {}
    if not isinstance(layout_overrides, dict):
        raise HTTPException(status_code=400, detail="'overrides' must be an object mapping box_index to layout_type")

    from cv_box_layout import build_box_zone_grid

    zones = grid_data.get("zones", [])

    # Find highest existing zone_index
    # NOTE(review): this is computed before the old box zones are removed, so
    # box zone indices keep growing on every rebuild — confirm this is intended.
    max_zone_idx = max((z.get("zone_index", 0) for z in zones), default=-1)

    # Remove old box zones (we'll rebuild them)
    zones = [z for z in zones if z.get("zone_type") != "box"]

    box_count = 0
    spell_fixes = 0

    for box_idx, box in enumerate(detected_boxes):
        bx = box.get("x", 0)
        by = box.get("y", 0)
        bw = box.get("w", 0)
        bh = box.get("h", 0)

        if bw <= 0 or bh <= 0:
            continue

        # Filter raw OCR words inside this box
        zone_words = _words_in_zone(all_words, by, bh, bx, bw)
        if not zone_words:
            logger.info("Box %d: no words found in bbox (%d,%d,%d,%d)", box_idx, bx, by, bw, bh)
            continue

        zone_idx = max_zone_idx + 1 + box_idx
        forced_layout = layout_overrides.get(str(box_idx))

        # Build box grid
        box_grid = build_box_zone_grid(
            zone_words, bx, by, bw, bh,
            zone_idx, img_w, img_h,
            layout_type=forced_layout,
        )

        # Apply SmartSpellChecker to all box cells
        spell_fixes += _spell_fix_box_cells(box_grid.get("cells", []))

        # Build zone entry
        zone_entry = {
            "zone_index": zone_idx,
            "zone_type": "box",
            "bbox_px": {"x": bx, "y": by, "w": bw, "h": bh},
            "bbox_pct": {
                "x": round(bx / img_w * 100, 2) if img_w else 0,
                "y": round(by / img_h * 100, 2) if img_h else 0,
                "w": round(bw / img_w * 100, 2) if img_w else 0,
                "h": round(bh / img_h * 100, 2) if img_h else 0,
            },
            "border": None,
            "word_count": len(zone_words),
            "columns": box_grid["columns"],
            "rows": box_grid["rows"],
            "cells": box_grid["cells"],
            "header_rows": box_grid.get("header_rows", []),
            "box_layout_type": box_grid.get("box_layout_type", "flowing"),
            "box_grid_reviewed": False,
            "box_bg_color": box.get("bg_color_name", ""),
            "box_bg_hex": box.get("bg_color_hex", ""),
        }
        zones.append(zone_entry)
        box_count += 1

    # Sort zones by y-position for correct reading order
    zones.sort(key=lambda z: z.get("bbox_px", {}).get("y", 0))

    grid_data["zones"] = zones
    await update_session_db(session_id, grid_editor_result=grid_data)

    logger.info(
        "build-box-grids session %s: %d boxes processed (%d words spell-fixed) from %d detected",
        session_id, box_count, spell_fixes, len(detected_boxes),
    )

    return {
        "session_id": session_id,
        "box_zones_rebuilt": box_count,
        "total_detected_boxes": len(detected_boxes),
        "spell_fixes": spell_fixes,
        "zones": zones,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Unified Grid endpoint
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.post("/sessions/{session_id}/build-unified-grid")
async def build_unified_grid_endpoint(session_id: str):
    """Merge the content and box zones of a session into one unified grid.

    Reads the existing multi-zone grid_editor_result, hands its zones, image
    size and layout metrics to ``build_unified_grid`` and stores the outcome
    as unified_grid_result, leaving the multi-zone data untouched.

    Raises:
        HTTPException: 404 if the session does not exist; 400 if the session
            has no grid_editor_result yet.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    grid_data = session.get("grid_editor_result")
    if not grid_data:
        raise HTTPException(status_code=400, detail="No grid data. Run build-grid first.")

    from unified_grid import build_unified_grid

    builder_kwargs = {
        "zones": grid_data.get("zones", []),
        "image_width": grid_data.get("image_width", 0),
        "image_height": grid_data.get("image_height", 0),
        "layout_metrics": grid_data.get("layout_metrics", {}),
    }
    unified = build_unified_grid(**builder_kwargs)

    # Persist as separate field (don't overwrite original multi-zone grid)
    await update_session_db(session_id, unified_grid_result=unified)

    summary = unified.get("summary", {})
    logger.info(
        "build-unified-grid session %s: %d rows, %d cells",
        session_id,
        summary.get("total_rows", 0),
        summary.get("total_cells", 0),
    )

    return unified
|
|
|
|
|
|
@router.get("/sessions/{session_id}/unified-grid")
async def get_unified_grid(session_id: str):
    """Return the unified grid stored for a session.

    Raises:
        HTTPException: 404 when the session does not exist, or when it has no
            (non-empty) ``unified_grid_result`` yet.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    unified = session.get("unified_grid_result")
    if unified:
        return unified

    raise HTTPException(
        status_code=404,
        detail="No unified grid. Run build-unified-grid first.",
    )
|