Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 51s
CI / test-go-edu-search (push) Successful in 42s
CI / test-python-klausur (push) Failing after 2m53s
CI / test-python-agent-core (push) Successful in 21s
CI / test-nodejs-website (push) Successful in 55s
New endpoint POST /sessions/{id}/rerun-ocr-and-build-grid that:
1. Runs scan quality assessment
2. Applies CLAHE enhancement if degraded (controlled by enhance toggle)
3. Re-runs dual-engine OCR (RapidOCR + Tesseract) with min_conf filter
4. Merges OCR results and stores updated word_result
5. Builds grid with max_columns constraint
Frontend: Orange "OCR neu + Grid" button in GridToolbar.
Unlike "Neu berechnen" (which only rebuilds grid from existing words),
this button re-runs the full OCR pipeline with quality settings.
The CLAHE toggle now actually has an effect: it enhances the image
before OCR runs, not after.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
648 lines
24 KiB
Python
648 lines
24 KiB
Python
"""
|
|
Grid Editor API — endpoints for grid building, editing, and export.
|
|
|
|
The core grid building logic is in grid_build_core.py.
|
|
"""
|
|
|
|
import logging
|
|
import re
|
|
import time
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
from fastapi import APIRouter, HTTPException, Query, Request
|
|
|
|
from grid_build_core import _build_grid_core
|
|
from grid_editor_helpers import _words_in_zone
|
|
from ocr_pipeline_session_store import (
|
|
get_session_db,
|
|
update_session_db,
|
|
)
|
|
from ocr_pipeline_common import (
|
|
_cache,
|
|
_load_session_to_cache,
|
|
_get_cached,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(prefix="/api/v1/ocr-pipeline", tags=["grid-editor"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.post("/sessions/{session_id}/build-grid")
async def build_grid(
    session_id: str,
    ipa_mode: str = Query("auto", pattern="^(auto|all|de|en|none)$"),
    syllable_mode: str = Query("auto", pattern="^(auto|all|de|en|none)$"),
    enhance: bool = Query(True, description="Step 3: CLAHE + denoise for degraded scans"),
    max_cols: int = Query(0, description="Step 2: Max column count (0=unlimited)"),
    min_conf: int = Query(0, description="Step 1: Min OCR confidence (0=auto)"),
):
    """Build a zone-aware structured grid from the session's existing OCR words.

    The heavy lifting is delegated to ``_build_grid_core``; this endpoint
    loads the session, forwards the query options, stores an automatic
    reference snapshot under ``ground_truth["auto_grid_snapshot"]`` and
    persists the grid with ``current_step=11``.

    Query params:
        ipa_mode / syllable_mode: one of ``auto``, ``all``, ``de``, ``en``,
            ``none`` (enforced by the query pattern).
        enhance: forwarded unchanged to the grid builder.
        max_cols: maximum column count; values <= 0 are forwarded as ``None``.
        min_conf: minimum OCR confidence; values <= 0 are forwarded as ``None``.

    Returns:
        The grid dict produced by ``_build_grid_core``.

    Raises:
        HTTPException: 404 when the session does not exist, 400 when the
            grid builder raises ``ValueError``.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    # Non-positive limits mean "not set" for the core builder.
    column_limit = max_cols if max_cols > 0 else None
    confidence_floor = min_conf if min_conf > 0 else None

    try:
        result = await _build_grid_core(
            session_id,
            session,
            ipa_mode=ipa_mode,
            syllable_mode=syllable_mode,
            enhance=enhance,
            max_columns=column_limit,
            min_conf=confidence_floor,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    # Imported lazily: a module-level import would be circular with
    # ocr_pipeline_regression.
    from ocr_pipeline_regression import _build_reference_snapshot

    # Map the OCR engine recorded with the words to the pipeline label that
    # is stored alongside the snapshot.
    engine = (session.get("word_result") or {}).get("ocr_engine", "")
    if engine == "paddle_direct":
        auto_pipeline = "paddle-direct"
    elif engine in ("kombi", "rapid_kombi"):
        auto_pipeline = "kombi"
    else:
        auto_pipeline = "pipeline"

    # Keep the untouched automatic result so it can later be compared with
    # manual corrections.
    ground_truth = session.get("ground_truth") or {}
    ground_truth["auto_grid_snapshot"] = _build_reference_snapshot(result, pipeline=auto_pipeline)

    # Step 11 marks the reconstruction as complete.
    await update_session_db(
        session_id,
        grid_editor_result=result,
        ground_truth=ground_truth,
        current_step=11,
    )

    summary = result.get("summary", {})
    logger.info(
        "build-grid session %s: %d zones, %d cols, %d rows, %d cells, "
        "%d boxes in %.2fs",
        session_id,
        len(result.get("zones", [])),
        summary.get("total_columns", 0),
        summary.get("total_rows", 0),
        summary.get("total_cells", 0),
        result.get("boxes_detected", 0),
        result.get("duration_seconds", 0),
    )

    return result
|
|
|
|
|
|
@router.post("/sessions/{session_id}/rerun-ocr-and-build-grid")
async def rerun_ocr_and_build_grid(
    session_id: str,
    ipa_mode: str = Query("auto", pattern="^(auto|all|de|en|none)$"),
    syllable_mode: str = Query("auto", pattern="^(auto|all|de|en|none)$"),
    enhance: bool = Query(True, description="Step 3: CLAHE + denoise for degraded scans"),
    max_cols: int = Query(0, description="Step 2: Max column count (0=unlimited)"),
    min_conf: int = Query(0, description="Step 1: Min OCR confidence (0=auto)"),
):
    """Re-run OCR with quality settings, then rebuild the grid.

    Unlike ``build-grid`` (which only rebuilds from already stored words),
    this endpoint runs OCR again on the cached cropped image (falling back to
    the dewarped image) and then builds the grid from the fresh words.

    Steps:
        1. Load the image from the session cache.
        2. Score scan quality (best effort; failure falls back to a minimum
           confidence of 40 unless ``min_conf`` is given).
        3. Enhance the image when ``enhance`` is set and the scan was rated
           degraded (best effort).
        4. Run RapidOCR (best effort) and Tesseract; Tesseract words below
           the effective minimum confidence are dropped.
        5. Merge and de-duplicate the words.
        6. Store the new ``word_result`` in the session.
        7. Build the grid via ``_build_grid_core`` and persist it with
           ``current_step=11``.

    Returns:
        The grid dict from ``_build_grid_core`` with two extra response keys,
        ``scan_quality`` and ``ocr_stats`` (added after the grid is persisted).

    Raises:
        HTTPException: 404 when the session does not exist; 400 when no
            cropped/dewarped image is cached or the grid builder raises
            ``ValueError``.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    t0 = time.time()

    # 1. Load the cropped/dewarped image from cache or session
    if session_id not in _cache:
        await _load_session_to_cache(session_id)
    cached = _get_cached(session_id)

    dewarped_bgr = cached.get("cropped_bgr")
    if dewarped_bgr is None:
        dewarped_bgr = cached.get("dewarped_bgr")
    if dewarped_bgr is None:
        raise HTTPException(status_code=400, detail="No cropped/dewarped image available. Run preprocessing steps first.")

    img_h, img_w = dewarped_bgr.shape[:2]
    # Work on a copy so the cached image is never modified.
    ocr_input = dewarped_bgr.copy()

    # 2. Scan quality assessment (best effort)
    scan_quality_info = {}
    try:
        from scan_quality import score_scan_quality
        quality_report = score_scan_quality(ocr_input)
        scan_quality_info = quality_report.to_dict()
        actual_min_conf = min_conf if min_conf > 0 else quality_report.recommended_min_conf
    except Exception as e:
        logger.warning("rerun-ocr: scan quality failed: %s", e)
        actual_min_conf = min_conf if min_conf > 0 else 40

    # 3. Image enhancement (Step 3)
    is_degraded = scan_quality_info.get("is_degraded", False)
    # Only set once enhancement really succeeded, so the reported stats do
    # not claim an enhancement that failed and was skipped.
    enhance_applied = False
    if enhance and is_degraded:
        try:
            from ocr_image_enhance import enhance_for_ocr
            ocr_input = enhance_for_ocr(ocr_input, is_degraded=True)
            enhance_applied = True
            logger.info("rerun-ocr: CLAHE enhancement applied")
        except Exception as e:
            logger.warning("rerun-ocr: enhancement failed: %s", e)

    # 4. Run dual-engine OCR
    from PIL import Image
    import pytesseract

    # RapidOCR (best effort: Tesseract results are still used if it fails)
    rapid_words = []
    try:
        from cv_ocr_engines import ocr_region_rapid
        from cv_vocab_types import PageRegion
        full_region = PageRegion(type="full_page", x=0, y=0, width=img_w, height=img_h)
        rapid_words = ocr_region_rapid(ocr_input, full_region) or []
    except Exception as e:
        logger.warning("rerun-ocr: RapidOCR failed: %s", e)

    # Tesseract. The channel axis is reversed before handing the array to
    # PIL (BGR -> RGB, going by the variable names).
    pil_img = Image.fromarray(ocr_input[:, :, ::-1])
    data = pytesseract.image_to_data(pil_img, lang='eng+deu', config='--psm 6 --oem 3', output_type=pytesseract.Output.DICT)
    tess_words = []
    for i, raw_text in enumerate(data["text"]):
        text = (raw_text or "").strip()
        # Confidence may arrive as int, float or numeric string (e.g. "96.5");
        # anything unparsable is treated as -1 and therefore filtered out.
        try:
            conf = int(float(data["conf"][i]))
        except (TypeError, ValueError, OverflowError):
            conf = -1
        if not text or conf < actual_min_conf:
            continue
        tess_words.append({
            "text": text, "left": data["left"][i], "top": data["top"][i],
            "width": data["width"][i], "height": data["height"][i], "conf": conf,
        })

    # 5. Merge OCR results
    from ocr_pipeline_ocr_merge import _split_paddle_multi_words, _merge_paddle_tesseract, _deduplicate_words
    rapid_split = _split_paddle_multi_words(rapid_words) if rapid_words else []
    if rapid_split or tess_words:
        merged_words = _merge_paddle_tesseract(rapid_split, tess_words, img_w, img_h)
        merged_words = _deduplicate_words(merged_words)
    else:
        merged_words = tess_words

    # 6. Store updated word_result in session
    cells_for_storage = [
        {
            "text": w["text"], "left": w["left"], "top": w["top"],
            "width": w["width"], "height": w["height"], "conf": w.get("conf", 0),
        }
        for w in merged_words
    ]
    word_result = {
        # A single cell carrying the full text and all word boxes.
        "cells": [{"text": " ".join(w["text"] for w in merged_words),
                   "word_boxes": cells_for_storage}],
        "image_width": img_w,
        "image_height": img_h,
        "ocr_engine": "rapid_kombi",
        "word_count": len(merged_words),
        "raw_paddle_words": rapid_words,
    }
    await update_session_db(session_id, word_result=word_result)

    # Reload session with updated word_result
    session = await get_session_db(session_id)

    ocr_duration = time.time() - t0
    logger.info(
        "rerun-ocr session %s: %d words (rapid=%d, tess=%d, merged=%d) in %.1fs "
        "(enhance=%s, min_conf=%d, quality=%s)",
        session_id, len(merged_words), len(rapid_words), len(tess_words),
        len(merged_words), ocr_duration, enhance, actual_min_conf,
        scan_quality_info.get("quality_pct", "?"),
    )

    # 7. Build grid from new words
    try:
        result = await _build_grid_core(
            session_id, session,
            ipa_mode=ipa_mode, syllable_mode=syllable_mode,
            enhance=enhance,
            max_columns=max_cols if max_cols > 0 else None,
            min_conf=min_conf if min_conf > 0 else None,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    # Persist grid
    await update_session_db(session_id, grid_editor_result=result, current_step=11)

    # Add quality info to response (after persisting, so it is response-only)
    result["scan_quality"] = scan_quality_info
    result["ocr_stats"] = {
        "rapid_words": len(rapid_words),
        "tess_words": len(tess_words),
        "merged_words": len(merged_words),
        "min_conf_used": actual_min_conf,
        "enhance_applied": enhance_applied,
        "ocr_duration_seconds": round(ocr_duration, 1),
    }

    total_duration = time.time() - t0
    summary = result.get("summary", {})
    logger.info(
        "rerun-ocr+build-grid session %s: %d zones, %d cols, %d cells in %.1fs",
        session_id,
        len(result.get("zones", [])),
        summary.get("total_columns", 0),
        summary.get("total_cells", 0),
        total_duration,
    )

    return result
|
|
|
|
|
|
@router.post("/sessions/{session_id}/save-grid")
async def save_grid(session_id: str, request: Request):
    """Save edited grid data from the frontend Excel-like editor.

    The request body is the full StructuredGrid as a JSON object. ``zones``
    is required and must be a list; ``image_width``, ``image_height``,
    ``boxes_detected``, ``summary`` and ``formatting`` fall back to the
    values of the previously stored grid when omitted. The result is stored
    as ``grid_editor_result`` with ``edited=True`` and ``current_step=11``.

    Returns:
        ``{"session_id": ..., "saved": True}``

    Raises:
        HTTPException: 404 when the session does not exist; 400 when the
            body is not valid JSON, is not a JSON object, lacks ``zones``
            or ``zones`` is not a list.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    # Malformed input is a client error, not a server error.
    try:
        body = await request.json()
    except ValueError as e:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from e
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    # Validate basic structure
    if "zones" not in body:
        raise HTTPException(status_code=400, detail="Missing 'zones' in request body")
    zones = body["zones"]
    if not isinstance(zones, list):
        raise HTTPException(status_code=400, detail="'zones' must be a list")

    # Preserve metadata from the original build
    existing = session.get("grid_editor_result") or {}
    result = {
        "session_id": session_id,
        "image_width": body.get("image_width", existing.get("image_width", 0)),
        "image_height": body.get("image_height", existing.get("image_height", 0)),
        "zones": zones,
        "boxes_detected": body.get("boxes_detected", existing.get("boxes_detected", 0)),
        "summary": body.get("summary", existing.get("summary", {})),
        "formatting": body.get("formatting", existing.get("formatting", {})),
        # Build duration always comes from the stored grid, never the client.
        "duration_seconds": existing.get("duration_seconds", 0),
        "edited": True,
    }

    await update_session_db(session_id, grid_editor_result=result, current_step=11)

    logger.info("save-grid session %s: %d zones saved", session_id, len(zones))

    return {"session_id": session_id, "saved": True}
|
|
|
|
|
|
@router.get("/sessions/{session_id}/grid-editor")
async def get_grid(session_id: str):
    """Return the stored ``grid_editor_result`` of a session.

    Raises:
        HTTPException: 404 when the session is unknown or has no
            (non-empty) grid editor result yet.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    grid = session.get("grid_editor_result")
    if grid:
        return grid

    raise HTTPException(
        status_code=404,
        detail="No grid editor data. Run build-grid first.",
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Gutter Repair endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.post("/sessions/{session_id}/gutter-repair")
async def gutter_repair(session_id: str):
    """Analyse the stored grid for gutter-edge OCR errors and suggest repairs.

    Suggestion kinds (as named by the analysis module):
      - spell_fix: words truncated/blurred at the book binding
      - hyphen_join: words split across rows with missing hyphen chars

    The analysis result is stored under ``ground_truth["gutter_repair"]``
    and returned unchanged.

    Raises:
        HTTPException: 404 when the session does not exist, 400 when no grid
            has been built yet.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    grid_data = session.get("grid_editor_result")
    if not grid_data:
        raise HTTPException(
            status_code=400,
            detail="No grid data. Run build-grid first.",
        )

    from cv_gutter_repair import analyse_grid_for_gutter_repair

    result = analyse_grid_for_gutter_repair(
        grid_data,
        image_width=grid_data.get("image_width", 0),
    )

    # Suggestions live inside ground_truth so no DB migration is needed.
    ground_truth = session.get("ground_truth") or {}
    ground_truth["gutter_repair"] = result
    await update_session_db(session_id, ground_truth=ground_truth)

    suggestion_count = result.get("stats", {}).get("suggestions_found", 0)
    logger.info(
        "gutter-repair session %s: %d suggestions in %.2fs",
        session_id,
        suggestion_count,
        result.get("duration_seconds", 0),
    )

    return result
|
|
|
|
|
|
@router.post("/sessions/{session_id}/gutter-repair/apply")
async def gutter_repair_apply(session_id: str, request: Request):
    """Apply accepted gutter repair suggestions to the grid.

    Body::

        {
          "accepted": ["suggestion_id_1", "suggestion_id_2", ...],
          "text_overrides": {"suggestion_id_1": "alternative text"}   # optional
        }

    ``text_overrides`` lets the user pick a different correction than the
    suggested one; empty override values are ignored. A missing, ``null`` or
    empty ``accepted`` list is a no-op.

    Returns:
        The result of ``apply_gutter_suggestions``, or
        ``{"applied_count": 0, "changes": []}`` when nothing was accepted.

    Raises:
        HTTPException: 404 when the session does not exist; 400 when there
            is no grid, no stored gutter-repair analysis, the body is not a
            JSON object, or ``text_overrides`` is not an object.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    grid_data = session.get("grid_editor_result")
    if not grid_data:
        raise HTTPException(status_code=400, detail="No grid data.")

    gt = session.get("ground_truth") or {}
    gutter_result = gt.get("gutter_repair")
    if not gutter_result:
        raise HTTPException(
            status_code=400,
            detail="No gutter repair data. Run gutter-repair first.",
        )

    body = await request.json()
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    # `or` also covers an explicit JSON null.
    accepted_ids = body.get("accepted") or []
    if not accepted_ids:
        return {"applied_count": 0, "changes": []}

    # text_overrides: { suggestion_id: "alternative_text" }
    # Allows the user to pick a different correction from the alternatives list
    text_overrides = body.get("text_overrides") or {}
    if not isinstance(text_overrides, dict):
        raise HTTPException(status_code=400, detail="'text_overrides' must be an object")

    from cv_gutter_repair import apply_gutter_suggestions

    suggestions = gutter_result.get("suggestions", [])

    # Apply user-selected alternatives before passing to apply
    for suggestion in suggestions:
        override = text_overrides.get(suggestion.get("id", ""))
        if override:
            suggestion["suggested_text"] = override

    result = apply_gutter_suggestions(grid_data, accepted_ids, suggestions)

    # Save updated grid back to session (grid_data is persisted as it stands
    # after the apply call)
    await update_session_db(session_id, grid_editor_result=grid_data)

    logger.info(
        "gutter-repair/apply session %s: %d changes applied",
        session_id,
        result.get("applied_count", 0),
    )

    return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Box-Grid-Review endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.post("/sessions/{session_id}/build-box-grids")
async def build_box_grids(session_id: str, request: Request):
    """Rebuild grid structure for all detected boxes with layout-aware detection.

    Uses ``structure_result.boxes`` (falling back to
    ``ground_truth.structure_result``) as the source of box coordinates and
    ``raw_paddle_words`` (falling back to ``raw_tesseract_words``) as the OCR
    word source. Existing zones of type ``box`` are dropped and rebuilt; all
    other zones are kept. Zones are re-sorted by their top edge and written
    back to ``grid_editor_result``.

    Optional body: { "overrides": { "0": "bullet_list" } }
    Maps box_index (position in the margin-filtered box list) → forced
    layout_type. A missing or unparsable body means "no overrides".

    Returns:
        Dict with ``session_id``, ``box_zones_rebuilt``,
        ``total_detected_boxes`` (after margin filtering), ``spell_fixes``
        and the full ``zones`` list; a shorter dict with a ``message`` when
        no boxes were detected at all.

    Raises:
        HTTPException: 404 when the session does not exist; 400 when there
            is no grid or no raw OCR words.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    grid_data = session.get("grid_editor_result")
    if not grid_data:
        raise HTTPException(status_code=400, detail="No grid data. Run build-grid first.")

    # Get raw OCR words (with top/left/width/height keys)
    word_result = session.get("word_result") or {}
    all_words = word_result.get("raw_paddle_words") or word_result.get("raw_tesseract_words") or []
    if not all_words:
        raise HTTPException(status_code=400, detail="No raw OCR words available.")

    # Get detected boxes from structure_result
    structure_result = session.get("structure_result") or {}
    if not structure_result:
        gt = session.get("ground_truth") or {}
        structure_result = gt.get("structure_result") or {}
    detected_boxes = structure_result.get("boxes") or []
    if not detected_boxes:
        return {"session_id": session_id, "box_zones_rebuilt": 0, "spell_fixes": 0, "message": "No boxes detected"}

    img_w = grid_data.get("image_width", 0) or word_result.get("image_width", 0)
    img_h = grid_data.get("image_height", 0) or word_result.get("image_height", 0)

    # Filter out false-positive boxes in header/footer margins.
    # Textbook pages have ~2.5cm margins at top/bottom. At typical scan
    # resolutions (150-300 DPI), that's roughly 5-10% of image height.
    # A box whose vertical CENTER falls within the top or bottom 7% of
    # the image is likely a page number, unit header, or running footer.
    if img_h > 0:
        margin_frac = 0.07  # 7% of image height
        margin_top = img_h * margin_frac
        margin_bottom = img_h * (1 - margin_frac)
        filtered = []
        for box in detected_boxes:
            by = box.get("y", 0)
            bh = box.get("h", 0)
            box_center_y = by + bh / 2
            if box_center_y < margin_top or box_center_y > margin_bottom:
                logger.info("build-box-grids: skipping header/footer box at y=%d h=%d (center=%.0f, margins=%.0f/%.0f)",
                            by, bh, box_center_y, margin_top, margin_bottom)
                continue
            filtered.append(box)
        detected_boxes = filtered

    # The body is optional: any parse failure simply means "no overrides".
    try:
        body = await request.json()
    except Exception:
        body = None
    layout_overrides = {}
    if isinstance(body, dict) and isinstance(body.get("overrides"), dict):
        layout_overrides = body["overrides"]

    from cv_box_layout import build_box_zone_grid

    # Remove old box zones (we'll rebuild them) BEFORE looking for the highest
    # zone_index; otherwise the indices of the discarded box zones are counted
    # and the new box indices grow with every rebuild.
    zones = [z for z in grid_data.get("zones", []) if z.get("zone_type") != "box"]
    max_zone_idx = max((z.get("zone_index", 0) for z in zones), default=-1)

    box_count = 0
    spell_fixes = 0
    # Created on first use and then shared by all boxes.
    spell_checker = None

    for box_idx, box in enumerate(detected_boxes):
        bx = box.get("x", 0)
        by = box.get("y", 0)
        bw = box.get("w", 0)
        bh = box.get("h", 0)

        if bw <= 0 or bh <= 0:
            continue

        # Filter raw OCR words inside this box
        zone_words = _words_in_zone(all_words, by, bh, bx, bw)
        if not zone_words:
            logger.info("Box %d: no words found in bbox (%d,%d,%d,%d)", box_idx, bx, by, bw, bh)
            continue

        zone_idx = max_zone_idx + 1 + box_idx
        forced_layout = layout_overrides.get(str(box_idx))

        # Build box grid
        box_grid = build_box_zone_grid(
            zone_words, bx, by, bw, bh,
            zone_idx, img_w, img_h,
            layout_type=forced_layout,
        )

        # Apply SmartSpellChecker to all box cells. Spell checking is
        # optional: if the module cannot be imported the cells stay as-is.
        try:
            if spell_checker is None:
                from smart_spell import SmartSpellChecker
                spell_checker = SmartSpellChecker()
            for cell in box_grid.get("cells", []):
                text = cell.get("text", "")
                if not text:
                    continue
                correction = spell_checker.correct_text(text, lang="auto")
                if correction.changed:
                    cell["text"] = correction.corrected
                    spell_fixes += 1
        except ImportError:
            pass

        # Build zone entry
        zones.append({
            "zone_index": zone_idx,
            "zone_type": "box",
            "bbox_px": {"x": bx, "y": by, "w": bw, "h": bh},
            "bbox_pct": {
                "x": round(bx / img_w * 100, 2) if img_w else 0,
                "y": round(by / img_h * 100, 2) if img_h else 0,
                "w": round(bw / img_w * 100, 2) if img_w else 0,
                "h": round(bh / img_h * 100, 2) if img_h else 0,
            },
            "border": None,
            "word_count": len(zone_words),
            "columns": box_grid["columns"],
            "rows": box_grid["rows"],
            "cells": box_grid["cells"],
            "header_rows": box_grid.get("header_rows", []),
            "box_layout_type": box_grid.get("box_layout_type", "flowing"),
            "box_grid_reviewed": False,
            "box_bg_color": box.get("bg_color_name", ""),
            "box_bg_hex": box.get("bg_color_hex", ""),
        })
        box_count += 1

    # Sort zones by y-position for correct reading order
    zones.sort(key=lambda z: z.get("bbox_px", {}).get("y", 0))

    grid_data["zones"] = zones
    await update_session_db(session_id, grid_editor_result=grid_data)

    logger.info(
        "build-box-grids session %s: %d boxes processed (%d words spell-fixed) from %d detected",
        session_id, box_count, spell_fixes, len(detected_boxes),
    )

    return {
        "session_id": session_id,
        "box_zones_rebuilt": box_count,
        "total_detected_boxes": len(detected_boxes),
        "spell_fixes": spell_fixes,
        "zones": zones,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Unified Grid endpoint
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.post("/sessions/{session_id}/build-unified-grid")
async def build_unified_grid_endpoint(session_id: str):
    """Merge the multi-zone grid of a session into one unified grid.

    Reads the stored ``grid_editor_result``, hands its zones, image size and
    layout metrics to ``build_unified_grid`` and stores the outcome in the
    separate ``unified_grid_result`` field, so the original multi-zone grid
    is left untouched.

    Raises:
        HTTPException: 404 when the session does not exist, 400 when no grid
            has been built yet.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    grid_data = session.get("grid_editor_result")
    if not grid_data:
        raise HTTPException(status_code=400, detail="No grid data. Run build-grid first.")

    from unified_grid import build_unified_grid

    builder_args = {
        "zones": grid_data.get("zones", []),
        "image_width": grid_data.get("image_width", 0),
        "image_height": grid_data.get("image_height", 0),
        "layout_metrics": grid_data.get("layout_metrics", {}),
    }
    result = build_unified_grid(**builder_args)

    # Stored next to, not instead of, the multi-zone grid.
    await update_session_db(session_id, unified_grid_result=result)

    summary = result.get("summary", {})
    logger.info(
        "build-unified-grid session %s: %d rows, %d cells",
        session_id,
        summary.get("total_rows", 0),
        summary.get("total_cells", 0),
    )

    return result
|
|
|
|
|
|
@router.get("/sessions/{session_id}/unified-grid")
async def get_unified_grid(session_id: str):
    """Return the stored ``unified_grid_result`` of a session.

    Raises:
        HTTPException: 404 when the session is unknown or no (non-empty)
            unified grid has been stored yet.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    unified = session.get("unified_grid_result")
    if unified:
        return unified

    raise HTTPException(
        status_code=404,
        detail="No unified grid. Run build-unified-grid first.",
    )
|