Files
breakpilot-lehrer/klausur-service/backend/grid_editor_api.py
Benjamin Admin c3f1547e32
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 27s
CI / test-go-edu-search (push) Successful in 28s
CI / test-python-klausur (push) Failing after 2m1s
CI / test-python-agent-core (push) Successful in 17s
CI / test-nodejs-website (push) Successful in 17s
feat: add Excel-like grid editor for OCR overlay (Kombi mode step 6)
Backend: new grid_editor_api.py with build-grid endpoint that detects
bordered boxes, splits page into zones, clusters columns/rows per zone
from Kombi word positions. New DB column grid_editor_result JSONB.

Frontend: GridEditor component with editable HTML tables per zone,
column bold toggle, header row toggle, undo/redo, keyboard navigation
(Tab/Enter/Arrow), image overlay verification, and save/load.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-14 23:41:03 +01:00

427 lines
14 KiB
Python

"""
Grid Editor API — builds a structured, zone-aware grid from Kombi OCR results.
Takes the merged word positions from paddle-kombi / rapid-kombi and:
1. Detects bordered boxes on the image (cv_box_detect)
2. Splits the page into zones (content + box regions)
3. Clusters words into columns and rows per zone
4. Returns a hierarchical StructuredGrid for the frontend Excel-like editor
Lizenz: Apache 2.0 (kommerziell nutzbar)
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
"""
import logging
import time
from typing import Any, Dict, List, Optional
import cv2
import numpy as np
from fastapi import APIRouter, HTTPException, Request
from cv_box_detect import detect_boxes, split_page_into_zones
from cv_words_first import _cluster_columns, _cluster_rows, _build_cells
from ocr_pipeline_session_store import (
get_session_db,
get_session_image,
update_session_db,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Every route registered on this router is served under /api/v1/ocr-pipeline
# and tagged "grid-editor".
router = APIRouter(prefix="/api/v1/ocr-pipeline", tags=["grid-editor"])
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _flatten_word_boxes(cells: List[Dict]) -> List[Dict]:
"""Extract all word_boxes from cells into a flat list of word dicts."""
words: List[Dict] = []
for cell in cells:
for wb in cell.get("word_boxes") or []:
if wb.get("text", "").strip():
words.append({
"text": wb["text"],
"left": wb["left"],
"top": wb["top"],
"width": wb["width"],
"height": wb["height"],
"conf": wb.get("conf", 0),
})
return words
def _words_in_zone(
words: List[Dict],
zone_y: int,
zone_h: int,
zone_x: int,
zone_w: int,
) -> List[Dict]:
"""Filter words whose Y-center falls within a zone's bounds."""
zone_y_end = zone_y + zone_h
zone_x_end = zone_x + zone_w
result = []
for w in words:
cy = w["top"] + w["height"] / 2
cx = w["left"] + w["width"] / 2
if zone_y <= cy <= zone_y_end and zone_x <= cx <= zone_x_end:
result.append(w)
return result
def _detect_header_rows(
rows: List[Dict],
zone_words: List[Dict],
zone_y: int,
) -> List[int]:
"""Heuristic: the first row is a header if it has bold/large text or
there's a significant gap after it."""
if len(rows) < 2:
return []
headers = []
first_row = rows[0]
second_row = rows[1]
# Gap between first and second row > 1.5x average row height
avg_h = sum(r["y_max"] - r["y_min"] for r in rows) / len(rows)
gap = second_row["y_min"] - first_row["y_max"]
if gap > avg_h * 0.5:
headers.append(0)
# Also check if first row words are taller than average (bold/header text)
first_row_words = [
w for w in zone_words
if first_row["y_min"] <= w["top"] + w["height"] / 2 <= first_row["y_max"]
]
if first_row_words:
first_h = max(w["height"] for w in first_row_words)
all_heights = [w["height"] for w in zone_words]
median_h = sorted(all_heights)[len(all_heights) // 2] if all_heights else first_h
if first_h > median_h * 1.3:
if 0 not in headers:
headers.append(0)
return headers
def _build_zone_grid(
zone_words: List[Dict],
zone_x: int,
zone_y: int,
zone_w: int,
zone_h: int,
zone_index: int,
img_w: int,
img_h: int,
) -> Dict[str, Any]:
"""Build columns, rows, cells for a single zone from its words."""
if not zone_words:
return {
"columns": [],
"rows": [],
"cells": [],
"header_rows": [],
}
# Cluster columns and rows
columns = _cluster_columns(zone_words, zone_w)
rows = _cluster_rows(zone_words)
if not columns or not rows:
return {
"columns": [],
"rows": [],
"cells": [],
"header_rows": [],
}
# Build cells
cells = _build_cells(zone_words, columns, rows, img_w, img_h)
# Prefix cell IDs with zone index
for cell in cells:
cell["cell_id"] = f"Z{zone_index}_{cell['cell_id']}"
cell["zone_index"] = zone_index
# Detect header rows
header_rows = _detect_header_rows(rows, zone_words, zone_y)
# Convert columns to output format with percentages
out_columns = []
for col in columns:
x_min = col["x_min"]
x_max = col["x_max"]
out_columns.append({
"index": col["index"],
"label": col["type"],
"x_min_px": round(x_min),
"x_max_px": round(x_max),
"x_min_pct": round(x_min / img_w * 100, 2) if img_w else 0,
"x_max_pct": round(x_max / img_w * 100, 2) if img_w else 0,
"bold": False,
})
# Convert rows to output format with percentages
out_rows = []
for row in rows:
out_rows.append({
"index": row["index"],
"y_min_px": round(row["y_min"]),
"y_max_px": round(row["y_max"]),
"y_min_pct": round(row["y_min"] / img_h * 100, 2) if img_h else 0,
"y_max_pct": round(row["y_max"] / img_h * 100, 2) if img_h else 0,
"is_header": row["index"] in header_rows,
})
return {
"columns": out_columns,
"rows": out_rows,
"cells": cells,
"header_rows": header_rows,
}
def _get_content_bounds(words: List[Dict]) -> tuple:
"""Get content bounds from word positions."""
if not words:
return 0, 0, 0, 0
x_min = min(w["left"] for w in words)
y_min = min(w["top"] for w in words)
x_max = max(w["left"] + w["width"] for w in words)
y_max = max(w["top"] + w["height"] for w in words)
return x_min, y_min, x_max - x_min, y_max - y_min
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
def _make_zone_entry(
    zone_index: int,
    zone_type: str,
    x: int,
    y: int,
    w: int,
    h: int,
    img_w: int,
    img_h: int,
    word_count: int,
    grid: Dict[str, Any],
) -> Dict[str, Any]:
    """Assemble the response dict for one zone; ``border`` starts as None."""
    return {
        "zone_index": zone_index,
        "zone_type": zone_type,
        "bbox_px": {
            "x": x, "y": y,
            "w": w, "h": h,
        },
        "bbox_pct": {
            "x": round(x / img_w * 100, 2) if img_w else 0,
            "y": round(y / img_h * 100, 2) if img_h else 0,
            "w": round(w / img_w * 100, 2) if img_w else 0,
            "h": round(h / img_h * 100, 2) if img_h else 0,
        },
        "border": None,
        "word_count": word_count,
        **grid,
    }


@router.post("/sessions/{session_id}/build-grid")
async def build_grid(session_id: str):
    """Build and persist a zone-aware grid from the session's word results.

    The session must already contain a ``word_result`` with cells and image
    dimensions. The session image (tried in the order cropped, dewarped,
    original) is used for box detection; the word positions drive the grid.
    When no image can be decoded or no boxes are found, the whole content
    area becomes a single ``content`` zone.

    Returns:
        The grid dict with ``zones`` (each carrying its own columns, rows
        and cells), ``summary`` counts, an empty ``formatting`` section and
        the build duration. The same dict is stored as the session's
        ``grid_editor_result``.

    Raises:
        HTTPException: 404 if the session does not exist; 400 if word
            results, image dimensions or word boxes are missing.
    """
    started = time.time()

    # 1. Load the session and validate its word results.
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    word_result = session.get("word_result")
    if not word_result or not word_result.get("cells"):
        raise HTTPException(
            status_code=400,
            detail="No word results found. Run paddle-kombi or rapid-kombi first.",
        )

    img_w = word_result.get("image_width", 0)
    img_h = word_result.get("image_height", 0)
    if not (img_w and img_h):
        raise HTTPException(status_code=400, detail="Missing image dimensions in word_result")

    # 2. Flatten every word box out of the cells.
    source_cells = word_result["cells"]
    all_words = _flatten_word_boxes(source_cells)
    if not all_words:
        raise HTTPException(status_code=400, detail="No word boxes found in cells")
    logger.info("build-grid session %s: %d words from %d cells",
                session_id, len(all_words), len(source_cells))

    # 3. Fetch the first available image variant for box detection.
    img_png = None
    for variant in ("cropped", "dewarped", "original"):
        img_png = await get_session_image(session_id, variant)
        if img_png:
            break

    zones_data: List[Dict[str, Any]] = []
    boxes_detected = 0
    content_x, content_y, content_w, content_h = _get_content_bounds(all_words)

    img_bgr = None
    if img_png:
        img_bgr = cv2.imdecode(np.frombuffer(img_png, dtype=np.uint8), cv2.IMREAD_COLOR)

    boxes = []
    if img_bgr is not None:
        boxes = detect_boxes(
            img_bgr,
            content_x=content_x,
            content_w=content_w,
            content_y=content_y,
            content_h=content_h,
        )
        boxes_detected = len(boxes)

    if boxes:
        # Split the page around the detected boxes and grid each zone.
        page_zones = split_page_into_zones(
            content_x, content_y, content_w, content_h, boxes
        )
        for pz in page_zones:
            zone_words = _words_in_zone(all_words, pz.y, pz.height, pz.x, pz.width)
            zone_grid = _build_zone_grid(
                zone_words, pz.x, pz.y, pz.width, pz.height,
                pz.index, img_w, img_h,
            )
            zone_entry = _make_zone_entry(
                pz.index, pz.zone_type, pz.x, pz.y, pz.width, pz.height,
                img_w, img_h, len(zone_words), zone_grid,
            )
            if pz.box:
                zone_entry["border"] = {
                    "thickness": pz.box.border_thickness,
                    "confidence": pz.box.confidence,
                }
            zones_data.append(zone_entry)

    # 4. Fallback: nothing produced above, so use one zone with all words.
    if not zones_data:
        full_grid = _build_zone_grid(
            all_words, content_x, content_y, content_w, content_h,
            0, img_w, img_h,
        )
        zones_data.append(
            _make_zone_entry(
                0, "content", content_x, content_y, content_w, content_h,
                img_w, img_h, len(all_words), full_grid,
            )
        )

    duration = time.time() - started

    # 5. Aggregate counts and assemble the response.
    def _total(key: str) -> int:
        return sum(len(zone.get(key, [])) for zone in zones_data)

    total_cells = _total("cells")
    total_columns = _total("columns")
    total_rows = _total("rows")

    result = {
        "session_id": session_id,
        "image_width": img_w,
        "image_height": img_h,
        "zones": zones_data,
        "boxes_detected": boxes_detected,
        "summary": {
            "total_zones": len(zones_data),
            "total_columns": total_columns,
            "total_rows": total_rows,
            "total_cells": total_cells,
            "total_words": len(all_words),
        },
        "formatting": {
            "bold_columns": [],
            "header_rows": [],
        },
        "duration_seconds": round(duration, 2),
    }

    # 6. Persist the result on the session.
    await update_session_db(session_id, grid_editor_result=result)

    logger.info(
        "build-grid session %s: %d zones, %d cols, %d rows, %d cells, "
        "%d boxes in %.2fs",
        session_id, len(zones_data), total_columns, total_rows,
        total_cells, boxes_detected, duration,
    )
    return result
@router.post("/sessions/{session_id}/save-grid")
async def save_grid(session_id: str, request: Request):
    """Save edited grid data from the frontend Excel-like editor.

    Receives the full StructuredGrid with user edits (text changes,
    formatting changes such as bold columns or header rows) and stores it
    as the session's ``grid_editor_result``. Fields absent from the request
    fall back to the previously stored result; ``duration_seconds`` is
    always taken from the stored result, and ``edited`` is set to ``True``.

    Returns:
        ``{"session_id": ..., "saved": True}`` on success.

    Raises:
        HTTPException: 404 if the session does not exist; 400 if the body
            is not valid JSON, is not a JSON object, has no ``zones`` key,
            or ``zones`` is not a list.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    # The body comes from the client: reject malformed payloads with a 400
    # instead of letting the parse error surface as a 500.
    try:
        body = await request.json()
    except ValueError as exc:  # json.JSONDecodeError is a ValueError subclass
        raise HTTPException(status_code=400, detail="Request body is not valid JSON") from exc

    # Validate basic structure before anything is persisted.
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")
    if "zones" not in body:
        raise HTTPException(status_code=400, detail="Missing 'zones' in request body")
    zones = body["zones"]
    if not isinstance(zones, list):
        raise HTTPException(status_code=400, detail="'zones' must be a list")

    # Preserve metadata from the original build.
    existing = session.get("grid_editor_result") or {}
    result = {
        "session_id": session_id,
        "image_width": body.get("image_width", existing.get("image_width", 0)),
        "image_height": body.get("image_height", existing.get("image_height", 0)),
        "zones": zones,
        "boxes_detected": body.get("boxes_detected", existing.get("boxes_detected", 0)),
        "summary": body.get("summary", existing.get("summary", {})),
        "formatting": body.get("formatting", existing.get("formatting", {})),
        "duration_seconds": existing.get("duration_seconds", 0),
        "edited": True,
    }

    await update_session_db(session_id, grid_editor_result=result)
    logger.info("save-grid session %s: %d zones saved", session_id, len(zones))
    return {"session_id": session_id, "saved": True}
@router.get("/sessions/{session_id}/grid-editor")
async def get_grid(session_id: str):
    """Return the stored grid editor result of a session.

    Raises:
        HTTPException: 404 if the session does not exist or holds no
            ``grid_editor_result`` yet.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    stored_grid = session.get("grid_editor_result")
    if stored_grid:
        return stored_grid

    raise HTTPException(
        status_code=404,
        detail="No grid editor data. Run build-grid first.",
    )