Files
breakpilot-lehrer/klausur-service/backend/grid_editor_api.py
Benjamin Admin 427fecdce0
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 27s
CI / test-go-edu-search (push) Successful in 27s
CI / test-python-klausur (push) Failing after 2m1s
CI / test-python-agent-core (push) Successful in 16s
CI / test-nodejs-website (push) Successful in 18s
fix: union column detection across all content zones
Instead of propagating columns from the largest content zone only
(which missed narrow columns like page_ref), collect column split
points from ALL content zones and merge them. This way a column
found in any zone (e.g. page_ref at x=132 in the zone below boxes)
is available everywhere.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-16 23:02:33 +01:00

782 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Grid Editor API — builds a structured, zone-aware grid from Kombi OCR results.
Takes the merged word positions from paddle-kombi / rapid-kombi and:
1. Detects bordered boxes on the image (cv_box_detect)
2. Splits the page into zones (content + box regions)
3. Clusters words into columns and rows per zone
4. Returns a hierarchical StructuredGrid for the frontend Excel-like editor
Lizenz: Apache 2.0 (kommerziell nutzbar)
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
"""
import logging
import time
from typing import Any, Dict, List, Optional
import cv2
import numpy as np
from fastapi import APIRouter, HTTPException, Request
from cv_box_detect import detect_boxes, split_page_into_zones
from cv_color_detect import detect_word_colors, recover_colored_text
from cv_words_first import _cluster_rows, _build_cells
from ocr_pipeline_session_store import (
get_session_db,
get_session_image,
update_session_db,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/ocr-pipeline", tags=["grid-editor"])
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _cluster_columns_by_alignment(
words: List[Dict],
zone_w: int,
rows: List[Dict],
) -> List[Dict[str, Any]]:
"""Detect columns by clustering left-edge alignment across rows.
Hybrid approach:
1. Group words by row, find "group start" positions within each row
(words preceded by a large gap or first word in row)
2. Cluster group-start left-edges by X-proximity across rows
3. Filter by row coverage (how many rows have a group start here)
4. Merge nearby clusters
5. Build column boundaries
This filters out mid-phrase word positions (e.g. IPA transcriptions,
second words in multi-word entries) by only considering positions
where a new word group begins within a row.
"""
if not words or not rows:
return []
total_rows = len(rows)
if total_rows == 0:
return []
# --- Group words by row ---
row_words: Dict[int, List[Dict]] = {}
for w in words:
y_center = w["top"] + w["height"] / 2
best = min(rows, key=lambda r: abs(r["y_center"] - y_center))
row_words.setdefault(best["index"], []).append(w)
# --- Compute adaptive gap threshold for group-start detection ---
all_gaps: List[float] = []
for ri, rw_list in row_words.items():
sorted_rw = sorted(rw_list, key=lambda w: w["left"])
for i in range(len(sorted_rw) - 1):
right = sorted_rw[i]["left"] + sorted_rw[i]["width"]
gap = sorted_rw[i + 1]["left"] - right
if gap > 0:
all_gaps.append(gap)
if all_gaps:
sorted_gaps = sorted(all_gaps)
median_gap = sorted_gaps[len(sorted_gaps) // 2]
heights = [w["height"] for w in words if w.get("height", 0) > 0]
median_h = sorted(heights)[len(heights) // 2] if heights else 25
# Column boundary: gap > 3× median gap or > 1.5× median word height
gap_threshold = max(median_gap * 3, median_h * 1.5, 30)
else:
gap_threshold = 50
# --- Find group-start positions (left-edges that begin a new column) ---
start_positions: List[tuple] = [] # (left_edge, row_index)
for ri, rw_list in row_words.items():
sorted_rw = sorted(rw_list, key=lambda w: w["left"])
# First word in row is always a group start
start_positions.append((sorted_rw[0]["left"], ri))
for i in range(1, len(sorted_rw)):
right_prev = sorted_rw[i - 1]["left"] + sorted_rw[i - 1]["width"]
gap = sorted_rw[i]["left"] - right_prev
if gap >= gap_threshold:
start_positions.append((sorted_rw[i]["left"], ri))
start_positions.sort(key=lambda x: x[0])
logger.info(
"alignment columns: %d group-start positions from %d words "
"(gap_threshold=%.0f, %d rows)",
len(start_positions), len(words), gap_threshold, total_rows,
)
if not start_positions:
x_min = min(w["left"] for w in words)
x_max = max(w["left"] + w["width"] for w in words)
return [{"index": 0, "type": "column_text", "x_min": x_min, "x_max": x_max}]
# --- Cluster group-start positions by X-proximity ---
tolerance = max(10, int(zone_w * 0.01))
clusters: List[Dict[str, Any]] = []
cur_edges = [start_positions[0][0]]
cur_rows = {start_positions[0][1]}
for left, row_idx in start_positions[1:]:
if left - cur_edges[-1] <= tolerance:
cur_edges.append(left)
cur_rows.add(row_idx)
else:
clusters.append({
"mean_x": int(sum(cur_edges) / len(cur_edges)),
"min_edge": min(cur_edges),
"max_edge": max(cur_edges),
"count": len(cur_edges),
"distinct_rows": len(cur_rows),
"row_coverage": len(cur_rows) / total_rows,
})
cur_edges = [left]
cur_rows = {row_idx}
clusters.append({
"mean_x": int(sum(cur_edges) / len(cur_edges)),
"min_edge": min(cur_edges),
"max_edge": max(cur_edges),
"count": len(cur_edges),
"distinct_rows": len(cur_rows),
"row_coverage": len(cur_rows) / total_rows,
})
# --- Filter by row coverage ---
MIN_COVERAGE_PRIMARY = 0.20
MIN_COVERAGE_SECONDARY = 0.12
MIN_WORDS_SECONDARY = 3
MIN_DISTINCT_ROWS = 2
# Content boundary for left-margin detection
content_x_min = min(w["left"] for w in words)
content_x_max = max(w["left"] + w["width"] for w in words)
content_span = content_x_max - content_x_min
primary = [
c for c in clusters
if c["row_coverage"] >= MIN_COVERAGE_PRIMARY
and c["distinct_rows"] >= MIN_DISTINCT_ROWS
]
primary_ids = {id(c) for c in primary}
secondary = [
c for c in clusters
if id(c) not in primary_ids
and c["row_coverage"] >= MIN_COVERAGE_SECONDARY
and c["count"] >= MIN_WORDS_SECONDARY
and c["distinct_rows"] >= MIN_DISTINCT_ROWS
]
# Tertiary: narrow left-margin columns (page refs, markers) that have
# too few rows for secondary but are clearly left-aligned and separated
# from the main content. These appear at the far left or far right and
# have a large gap to the nearest significant cluster.
used_ids = {id(c) for c in primary} | {id(c) for c in secondary}
sig_xs = [c["mean_x"] for c in primary + secondary]
tertiary = []
for c in clusters:
if id(c) in used_ids or c["distinct_rows"] < MIN_DISTINCT_ROWS:
continue
# Must be near left or right content margin (within 15%)
rel_pos = (c["mean_x"] - content_x_min) / content_span if content_span else 0.5
if not (rel_pos < 0.15 or rel_pos > 0.85):
continue
# Must have significant gap to nearest significant cluster
if sig_xs:
min_dist = min(abs(c["mean_x"] - sx) for sx in sig_xs)
if min_dist < max(30, content_span * 0.02):
continue
tertiary.append(c)
if tertiary:
for c in tertiary:
logger.info(
" tertiary (margin) cluster: x=%d (range %d-%d), %d words, %d rows (%.0f%%)",
c["mean_x"], c["min_edge"], c["max_edge"],
c["count"], c["distinct_rows"], c["row_coverage"] * 100,
)
significant = sorted(primary + secondary + tertiary, key=lambda c: c["mean_x"])
for c in significant:
logger.info(
" significant cluster: x=%d (range %d-%d), %d words, %d rows (%.0f%%)",
c["mean_x"], c["min_edge"], c["max_edge"],
c["count"], c["distinct_rows"], c["row_coverage"] * 100,
)
logger.info(
"alignment columns: %d clusters, %d primary, %d secondary → %d significant",
len(clusters), len(primary), len(secondary), len(significant),
)
if not significant:
# Fallback: single column covering all content
x_min = min(w["left"] for w in words)
x_max = max(w["left"] + w["width"] for w in words)
return [{"index": 0, "type": "column_text", "x_min": x_min, "x_max": x_max}]
# --- Merge nearby clusters ---
merge_distance = max(25, int(zone_w * 0.03))
merged = [significant[0].copy()]
for s in significant[1:]:
if s["mean_x"] - merged[-1]["mean_x"] < merge_distance:
prev = merged[-1]
total = prev["count"] + s["count"]
prev["mean_x"] = (
prev["mean_x"] * prev["count"] + s["mean_x"] * s["count"]
) // total
prev["count"] = total
prev["min_edge"] = min(prev["min_edge"], s["min_edge"])
prev["max_edge"] = max(prev["max_edge"], s["max_edge"])
prev["distinct_rows"] = max(prev["distinct_rows"], s["distinct_rows"])
else:
merged.append(s.copy())
logger.info(
"alignment columns: %d after merge (distance=%d)",
len(merged), merge_distance,
)
# --- Build column boundaries ---
margin = max(5, int(zone_w * 0.005))
content_x_min = min(w["left"] for w in words)
content_x_max = max(w["left"] + w["width"] for w in words)
columns: List[Dict[str, Any]] = []
for i, cluster in enumerate(merged):
x_min = max(content_x_min, cluster["min_edge"] - margin)
if i + 1 < len(merged):
x_max = merged[i + 1]["min_edge"] - margin
else:
x_max = content_x_max
columns.append({
"index": i,
"type": f"column_{i + 1}" if len(merged) > 1 else "column_text",
"x_min": x_min,
"x_max": x_max,
})
return columns
def _flatten_word_boxes(cells: List[Dict]) -> List[Dict]:
"""Extract all word_boxes from cells into a flat list of word dicts."""
words: List[Dict] = []
for cell in cells:
for wb in cell.get("word_boxes") or []:
if wb.get("text", "").strip():
words.append({
"text": wb["text"],
"left": wb["left"],
"top": wb["top"],
"width": wb["width"],
"height": wb["height"],
"conf": wb.get("conf", 0),
})
return words
def _words_in_zone(
words: List[Dict],
zone_y: int,
zone_h: int,
zone_x: int,
zone_w: int,
) -> List[Dict]:
"""Filter words whose Y-center falls within a zone's bounds."""
zone_y_end = zone_y + zone_h
zone_x_end = zone_x + zone_w
result = []
for w in words:
cy = w["top"] + w["height"] / 2
cx = w["left"] + w["width"] / 2
if zone_y <= cy <= zone_y_end and zone_x <= cx <= zone_x_end:
result.append(w)
return result
def _detect_header_rows(
rows: List[Dict],
zone_words: List[Dict],
zone_y: int,
) -> List[int]:
"""Heuristic: the first row is a header if it has bold/large text or
there's a significant gap after it."""
if len(rows) < 2:
return []
headers = []
first_row = rows[0]
second_row = rows[1]
# Gap between first and second row > 1.5x average row height
avg_h = sum(r["y_max"] - r["y_min"] for r in rows) / len(rows)
gap = second_row["y_min"] - first_row["y_max"]
if gap > avg_h * 0.5:
headers.append(0)
# Also check if first row words are taller than average (bold/header text)
first_row_words = [
w for w in zone_words
if first_row["y_min"] <= w["top"] + w["height"] / 2 <= first_row["y_max"]
]
if first_row_words:
first_h = max(w["height"] for w in first_row_words)
all_heights = [w["height"] for w in zone_words]
median_h = sorted(all_heights)[len(all_heights) // 2] if all_heights else first_h
if first_h > median_h * 1.3:
if 0 not in headers:
headers.append(0)
return headers
def _build_zone_grid(
zone_words: List[Dict],
zone_x: int,
zone_y: int,
zone_w: int,
zone_h: int,
zone_index: int,
img_w: int,
img_h: int,
global_columns: Optional[List[Dict]] = None,
) -> Dict[str, Any]:
"""Build columns, rows, cells for a single zone from its words.
Args:
global_columns: If provided, use these pre-computed column boundaries
instead of detecting columns per zone. Used for content zones so
that all content zones (above/between/below boxes) share the same
column structure. Box zones always detect columns independently.
"""
if not zone_words:
return {
"columns": [],
"rows": [],
"cells": [],
"header_rows": [],
}
# Cluster rows first (needed for column alignment analysis)
rows = _cluster_rows(zone_words)
# Use global columns if provided, otherwise detect per zone
columns = global_columns if global_columns else _cluster_columns_by_alignment(zone_words, zone_w, rows)
if not columns or not rows:
return {
"columns": [],
"rows": [],
"cells": [],
"header_rows": [],
}
# Build cells
cells = _build_cells(zone_words, columns, rows, img_w, img_h)
# Prefix cell IDs with zone index
for cell in cells:
cell["cell_id"] = f"Z{zone_index}_{cell['cell_id']}"
cell["zone_index"] = zone_index
# Detect header rows
header_rows = _detect_header_rows(rows, zone_words, zone_y)
# Convert columns to output format with percentages
out_columns = []
for col in columns:
x_min = col["x_min"]
x_max = col["x_max"]
out_columns.append({
"index": col["index"],
"label": col["type"],
"x_min_px": round(x_min),
"x_max_px": round(x_max),
"x_min_pct": round(x_min / img_w * 100, 2) if img_w else 0,
"x_max_pct": round(x_max / img_w * 100, 2) if img_w else 0,
"bold": False,
})
# Convert rows to output format with percentages
out_rows = []
for row in rows:
out_rows.append({
"index": row["index"],
"y_min_px": round(row["y_min"]),
"y_max_px": round(row["y_max"]),
"y_min_pct": round(row["y_min"] / img_h * 100, 2) if img_h else 0,
"y_max_pct": round(row["y_max"] / img_h * 100, 2) if img_h else 0,
"is_header": row["index"] in header_rows,
})
return {
"columns": out_columns,
"rows": out_rows,
"cells": cells,
"header_rows": header_rows,
"_raw_columns": columns, # internal: for propagation to other zones
}
def _get_content_bounds(words: List[Dict]) -> tuple:
"""Get content bounds from word positions."""
if not words:
return 0, 0, 0, 0
x_min = min(w["left"] for w in words)
y_min = min(w["top"] for w in words)
x_max = max(w["left"] + w["width"] for w in words)
y_max = max(w["top"] + w["height"] for w in words)
return x_min, y_min, x_max - x_min, y_max - y_min
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
def _pct(value: float, total: float) -> float:
    """Return ``value`` as a percentage of ``total``, rounded to 2 decimals.

    Yields 0 when ``total`` is falsy so callers never divide by zero.
    """
    return round(value / total * 100, 2) if total else 0


def _zone_response_entry(
    zone_index: int,
    zone_type: str,
    x: float,
    y: float,
    w: float,
    h: float,
    img_w: int,
    img_h: int,
    word_count: int,
    grid: Dict[str, Any],
    border: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Assemble the response dict for one zone.

    Combines the zone geometry (pixels and percentages of the image) with the
    grid built for it. The internal "_raw_columns" key of ``grid`` is left out
    of the response; ``grid`` itself is not modified.
    """
    entry: Dict[str, Any] = {
        "zone_index": zone_index,
        "zone_type": zone_type,
        "bbox_px": {"x": x, "y": y, "w": w, "h": h},
        "bbox_pct": {
            "x": _pct(x, img_w),
            "y": _pct(y, img_h),
            "w": _pct(w, img_w),
            "h": _pct(h, img_h),
        },
        "border": None,
        "word_count": word_count,
    }
    entry.update((k, v) for k, v in grid.items() if k != "_raw_columns")
    if border is not None:
        entry["border"] = border
    return entry


def _union_content_columns(
    content_zones: List[Dict[str, Any]],
    all_words: List[Dict],
    content_w: float,
) -> Optional[tuple]:
    """Merge the column split points found in several content zones.

    Collects the left edge of every non-first column from each zone's
    "_raw_columns", merges split points closer than the merge distance and
    builds one shared column set spanning all words.

    Returns:
        ``(merged_columns, max_zone_cols)`` when the union yields more columns
        than any single zone had, otherwise ``None`` (the union adds nothing).
    """
    split_xs = sorted(
        col["x_min"]
        for zg in content_zones
        for col in zg["grid"].get("_raw_columns", [])[1:]
    )
    if not split_xs:
        return None

    merge_distance = max(25, int(content_w * 0.03))
    merged_xs = [split_xs[0]]
    for x in split_xs[1:]:
        if x - merged_xs[-1] < merge_distance:
            # Nearby split points collapse to their running midpoint.
            merged_xs[-1] = (merged_xs[-1] + x) / 2
        else:
            merged_xs.append(x)

    max_zone_cols = max(
        len(zg["grid"].get("_raw_columns", [])) for zg in content_zones
    )
    # Only apply the union if it found more columns than any single zone.
    if len(merged_xs) + 1 <= max_zone_cols:
        return None

    cx_min = min(w["left"] for w in all_words)
    cx_max = max(w["left"] + w["width"] for w in all_words)
    bounds = [cx_min, *merged_xs, cx_max]
    merged_columns = [
        {"index": i, "type": f"column_{i + 1}", "x_min": lo, "x_max": hi}
        for i, (lo, hi) in enumerate(zip(bounds, bounds[1:]))
    ]
    return merged_columns, max_zone_cols


def _build_zones_from_boxes(
    all_words: List[Dict],
    boxes: List[Any],
    content_bounds: tuple,
    img_w: int,
    img_h: int,
    session_id: str,
) -> List[Dict[str, Any]]:
    """Split the page into zones around ``boxes`` and build a grid per zone.

    Content zones first detect columns independently; if the union of their
    split points yields more columns than any single zone, every content zone
    is rebuilt with that shared column set. Other zones keep their own columns.
    """
    content_x, content_y, content_w, content_h = content_bounds
    page_zones = split_page_into_zones(
        content_x, content_y, content_w, content_h, boxes
    )

    # First pass: build grids per zone independently
    zone_grids: List[Dict[str, Any]] = []
    for pz in page_zones:
        zone_words = _words_in_zone(all_words, pz.y, pz.height, pz.x, pz.width)
        grid = _build_zone_grid(
            zone_words, pz.x, pz.y, pz.width, pz.height,
            pz.index, img_w, img_h,
        )
        zone_grids.append({"pz": pz, "words": zone_words, "grid": grid})

    # Second pass: merge column boundaries from all content zones. Narrow
    # columns (page refs, markers) may show up in only one zone.
    content_zones = [zg for zg in zone_grids if zg["pz"].zone_type == "content"]
    if len(content_zones) > 1:
        union = _union_content_columns(content_zones, all_words, content_w)
        if union is not None:
            merged_columns, max_zone_cols = union
            for zg in content_zones:
                pz = zg["pz"]
                zg["grid"] = _build_zone_grid(
                    zg["words"], pz.x, pz.y, pz.width, pz.height,
                    pz.index, img_w, img_h,
                    global_columns=merged_columns,
                )
            logger.info(
                "build-grid session %s: union of %d content "
                "zones → %d merged columns (max single zone: %d)",
                session_id, len(content_zones),
                len(merged_columns), max_zone_cols,
            )

    zones: List[Dict[str, Any]] = []
    for zg in zone_grids:
        pz = zg["pz"]
        border = None
        if pz.box:
            border = {
                "thickness": pz.box.border_thickness,
                "confidence": pz.box.confidence,
            }
        zones.append(_zone_response_entry(
            pz.index, pz.zone_type, pz.x, pz.y, pz.width, pz.height,
            img_w, img_h, len(zg["words"]), zg["grid"], border,
        ))
    return zones


def _iter_cell_word_boxes(zones: List[Dict[str, Any]]):
    """Yield every word box stored in the cells of ``zones``.

    Cells whose "word_boxes" is missing or ``None`` contribute nothing.
    """
    for zone in zones:
        for cell in zone.get("cells", []):
            yield from cell.get("word_boxes") or []


@router.post("/sessions/{session_id}/build-grid")
async def build_grid(session_id: str):
    """Build a structured, zone-aware grid from existing Kombi word results.

    Requires that paddle-kombi or rapid-kombi has already been run on the
    session. Uses the session image for box detection and colour analysis and
    the word positions for grid structuring. Without a decodable image the
    whole page is treated as a single content zone.

    Returns a StructuredGrid with zones, each containing their own columns,
    rows and cells, and persists it as the session's ``grid_editor_result``.

    Raises:
        HTTPException: 404 if the session does not exist; 400 if word results,
            image dimensions or word boxes are missing.
    """
    t0 = time.time()

    # 1. Load session and word results
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
    word_result = session.get("word_result")
    if not word_result or not word_result.get("cells"):
        raise HTTPException(
            status_code=400,
            detail="No word results found. Run paddle-kombi or rapid-kombi first.",
        )
    img_w = word_result.get("image_width", 0)
    img_h = word_result.get("image_height", 0)
    if not img_w or not img_h:
        raise HTTPException(status_code=400, detail="Missing image dimensions in word_result")

    # 2. Flatten all word boxes from cells
    all_words = _flatten_word_boxes(word_result["cells"])
    if not all_words:
        raise HTTPException(status_code=400, detail="No word boxes found in cells")
    logger.info("build-grid session %s: %d words from %d cells",
                session_id, len(all_words), len(word_result["cells"]))

    # 3. Load image for box detection: first available variant wins
    img_png = None
    for variant in ("cropped", "dewarped", "original"):
        img_png = await get_session_image(session_id, variant)
        if img_png:
            break

    zones_data: List[Dict[str, Any]] = []
    boxes_detected = 0
    recovered_count = 0
    img_bgr = None
    # Bounds are taken before colour recovery adds words.
    content_bounds = _get_content_bounds(all_words)
    content_x, content_y, content_w, content_h = content_bounds

    if img_png:
        # Decode image for color detection + box detection
        img_bgr = cv2.imdecode(np.frombuffer(img_png, dtype=np.uint8), cv2.IMREAD_COLOR)
    if img_bgr is None:
        logger.warning(
            "build-grid session %s: no decodable image; "
            "skipping box detection and color analysis",
            session_id,
        )
    else:
        # --- Recover colored text that OCR missed (before grid building) ---
        recovered = recover_colored_text(img_bgr, all_words)
        if recovered:
            recovered_count = len(recovered)
            all_words.extend(recovered)
            logger.info(
                "build-grid session %s: +%d recovered colored words",
                session_id, recovered_count,
            )
        # Detect bordered boxes
        boxes = detect_boxes(
            img_bgr,
            content_x=content_x,
            content_w=content_w,
            content_y=content_y,
            content_h=content_h,
        )
        boxes_detected = len(boxes)
        if boxes:
            zones_data = _build_zones_from_boxes(
                all_words, boxes, content_bounds, img_w, img_h, session_id,
            )

    # 4. Fallback: no zones from boxes → single zone with all words
    if not zones_data:
        grid = _build_zone_grid(
            all_words, content_x, content_y, content_w, content_h,
            0, img_w, img_h,
        )
        zones_data.append(_zone_response_entry(
            0, "content", content_x, content_y, content_w, content_h,
            img_w, img_h, len(all_words), grid,
        ))

    # 5. Color annotation on final word_boxes in cells
    if img_bgr is not None:
        detect_word_colors(img_bgr, list(_iter_cell_word_boxes(zones_data)))
    duration = time.time() - t0

    # 6. Build result
    total_cells = sum(len(z.get("cells", [])) for z in zones_data)
    total_columns = sum(len(z.get("columns", [])) for z in zones_data)
    total_rows = sum(len(z.get("rows", [])) for z in zones_data)

    # Collect color statistics from all word_boxes in cells; boxes without a
    # "color_name" count as "black".
    color_stats: Dict[str, int] = {}
    for wb in _iter_cell_word_boxes(zones_data):
        color_name = wb.get("color_name", "black")
        color_stats[color_name] = color_stats.get(color_name, 0) + 1

    result = {
        "session_id": session_id,
        "image_width": img_w,
        "image_height": img_h,
        "zones": zones_data,
        "boxes_detected": boxes_detected,
        "summary": {
            "total_zones": len(zones_data),
            "total_columns": total_columns,
            "total_rows": total_rows,
            "total_cells": total_cells,
            "total_words": len(all_words),
            "recovered_colored": recovered_count,
            "color_stats": color_stats,
        },
        "formatting": {
            "bold_columns": [],
            "header_rows": [],
        },
        "duration_seconds": round(duration, 2),
    }

    # 7. Persist to DB
    await update_session_db(session_id, grid_editor_result=result)
    logger.info(
        "build-grid session %s: %d zones, %d cols, %d rows, %d cells, "
        "%d boxes in %.2fs",
        session_id, len(zones_data), total_columns, total_rows,
        total_cells, boxes_detected, duration,
    )
    return result
@router.post("/sessions/{session_id}/save-grid")
async def save_grid(session_id: str, request: Request):
    """Save edited grid data from the frontend Excel-like editor.

    Receives the full StructuredGrid with user edits (text changes, formatting
    changes like bold columns, header rows, etc.) and persists it to the
    session's ``grid_editor_result``. Metadata the request leaves out is
    carried over from the previously stored result.

    Raises:
        HTTPException: 404 if the session does not exist; 400 if the body is
            not valid JSON, is not a JSON object, lacks "zones", or "zones"
            is not a list.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    # The body comes from the client: reject malformed input with a 400
    # instead of letting a decode or attribute error surface as a 500.
    try:
        body = await request.json()
    except ValueError as exc:  # covers json.JSONDecodeError and bad encodings
        raise HTTPException(
            status_code=400, detail="Request body is not valid JSON"
        ) from exc

    # Validate basic structure
    if not isinstance(body, dict) or "zones" not in body:
        raise HTTPException(status_code=400, detail="Missing 'zones' in request body")
    zones = body["zones"]
    if not isinstance(zones, list):
        raise HTTPException(status_code=400, detail="'zones' must be a list")

    # Preserve metadata from the original build
    existing = session.get("grid_editor_result") or {}
    result = {
        "session_id": session_id,
        "image_width": body.get("image_width", existing.get("image_width", 0)),
        "image_height": body.get("image_height", existing.get("image_height", 0)),
        "zones": zones,
        "boxes_detected": body.get("boxes_detected", existing.get("boxes_detected", 0)),
        "summary": body.get("summary", existing.get("summary", {})),
        "formatting": body.get("formatting", existing.get("formatting", {})),
        # Build duration is never taken from the client.
        "duration_seconds": existing.get("duration_seconds", 0),
        "edited": True,
    }
    await update_session_db(session_id, grid_editor_result=result)
    logger.info("save-grid session %s: %d zones saved", session_id, len(zones))
    return {"session_id": session_id, "saved": True}
@router.get("/sessions/{session_id}/grid-editor")
async def get_grid(session_id: str):
    """Return the stored grid editor result of a session.

    Raises:
        HTTPException: 404 if the session is unknown or holds no grid editor
            result yet.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    grid_state = session.get("grid_editor_result")
    if grid_state:
        return grid_state
    raise HTTPException(
        status_code=404,
        detail="No grid editor data. Run build-grid first.",
    )