Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 40s
CI / test-go-edu-search (push) Successful in 25s
CI / test-python-klausur (push) Failing after 1m55s
CI / test-python-agent-core (push) Successful in 17s
CI / test-nodejs-website (push) Successful in 17s
The previous heuristic picked the column with the longest average text as the English headword column. In layouts with long example sentences, this picked the wrong column (examples instead of headwords). Now counts cells with bracket patterns per column — the column with the most brackets is the headword column where IPA needs fixing. Fixes garbled OCR-IPA like "change [tfeind3]" → "change [tʃˈeɪndʒ]". Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1838 lines
71 KiB
Python
1838 lines
71 KiB
Python
"""
|
||
Grid Editor API — builds a structured, zone-aware grid from Kombi OCR results.
|
||
|
||
Takes the merged word positions from paddle-kombi / rapid-kombi and:
|
||
1. Detects bordered boxes on the image (cv_box_detect)
|
||
2. Splits the page into zones (content + box regions)
|
||
3. Clusters words into columns and rows per zone
|
||
4. Returns a hierarchical StructuredGrid for the frontend Excel-like editor
|
||
|
||
License: Apache 2.0 (commercial use permitted)

PRIVACY: All processing is performed locally.
|
||
"""
|
||
|
||
import logging
|
||
import re
|
||
import time
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
import cv2
|
||
import numpy as np
|
||
from fastapi import APIRouter, HTTPException, Request
|
||
|
||
from cv_box_detect import detect_boxes, split_page_into_zones
|
||
from cv_vocab_types import PageZone
|
||
from cv_color_detect import detect_word_colors, recover_colored_text
|
||
from cv_ocr_engines import fix_cell_phonetics, fix_ipa_continuation_cell, _text_has_garbled_ipa
|
||
from cv_words_first import _cluster_rows, _build_cells
|
||
from ocr_pipeline_session_store import (
|
||
get_session_db,
|
||
get_session_image,
|
||
update_session_db,
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
router = APIRouter(prefix="/api/v1/ocr-pipeline", tags=["grid-editor"])
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _cluster_columns_by_alignment(
    words: List[Dict],
    zone_w: int,
    rows: List[Dict],
) -> List[Dict[str, Any]]:
    """Derive column boundaries from left edges that line up across rows.

    Pipeline:
      1. Assign every word to the row whose ``y_center`` is closest to the
         word's vertical centre.
      2. Within each row, mark "group starts": the leftmost word plus every
         word separated from its left neighbour by at least an adaptive
         gap threshold.
      3. Cluster the group-start left edges by X proximity.
      4. Keep clusters seen in enough distinct rows (primary / secondary)
         plus isolated clusters near the left or right content margin
         (tertiary).
      5. Merge clusters that lie close together and convert the survivors
         into contiguous column boundaries.

    Only group starts are clustered, so words in the middle of a phrase
    (IPA transcriptions, the second word of a multi-word entry) do not
    open columns of their own.

    Args:
        words: Word dicts; ``left``, ``top``, ``width``, ``height`` are read.
        zone_w: Zone width in pixels; scales the clustering tolerance, the
            merge distance and the boundary margin.
        rows: Row dicts; ``index`` and ``y_center`` are read.

    Returns:
        Column dicts (``index``, ``type``, ``x_min``, ``x_max``) ordered
        left to right.  Empty when *words* or *rows* is empty; a single
        ``column_text`` column spanning all words when no cluster qualifies.
    """
    if not (words and rows):
        return []

    row_total = len(rows)

    def _summarise(edges: List, row_ids: set) -> Dict[str, Any]:
        # Statistics for one run of neighbouring group-start edges.
        return {
            "mean_x": int(sum(edges) / len(edges)),
            "min_edge": min(edges),
            "max_edge": max(edges),
            "count": len(edges),
            "distinct_rows": len(row_ids),
            "row_coverage": len(row_ids) / row_total,
        }

    def _single_column() -> List[Dict[str, Any]]:
        # Fallback: one text column covering the full horizontal extent.
        lo = min(w["left"] for w in words)
        hi = max(w["left"] + w["width"] for w in words)
        return [{"index": 0, "type": "column_text", "x_min": lo, "x_max": hi}]

    # --- Step 1: bucket words by nearest row, each bucket sorted by X ---
    by_row: Dict[int, List[Dict]] = {}
    for word in words:
        mid_y = word["top"] + word["height"] / 2
        nearest = min(rows, key=lambda row: abs(row["y_center"] - mid_y))
        by_row.setdefault(nearest["index"], []).append(word)

    ordered_rows = {
        row_idx: sorted(members, key=lambda m: m["left"])
        for row_idx, members in by_row.items()
    }

    # --- Step 2a: adaptive gap threshold from positive inter-word gaps ---
    positive_gaps: List[float] = []
    for members in ordered_rows.values():
        for prev, nxt in zip(members, members[1:]):
            spacing = nxt["left"] - (prev["left"] + prev["width"])
            if spacing > 0:
                positive_gaps.append(spacing)

    if not positive_gaps:
        gap_threshold = 50
    else:
        positive_gaps.sort()
        typical_gap = positive_gaps[len(positive_gaps) // 2]
        usable_heights = sorted(
            w["height"] for w in words if w.get("height", 0) > 0
        )
        typical_height = (
            usable_heights[len(usable_heights) // 2] if usable_heights else 25
        )
        # A column break is a gap of 3x the median gap or 1.5x the median
        # word height, whichever is larger, and never less than 30 px.
        gap_threshold = max(typical_gap * 3, typical_height * 1.5, 30)

    # --- Step 2b: collect (left_edge, row_index) for every group start ---
    starts: List[tuple] = []
    for row_idx, members in ordered_rows.items():
        # The leftmost word of a row always opens a group.
        starts.append((members[0]["left"], row_idx))
        for prev, nxt in zip(members, members[1:]):
            if nxt["left"] - (prev["left"] + prev["width"]) >= gap_threshold:
                starts.append((nxt["left"], row_idx))
    starts.sort(key=lambda pair: pair[0])

    logger.info(
        "alignment columns: %d group-start positions from %d words "
        "(gap_threshold=%.0f, %d rows)",
        len(starts), len(words), gap_threshold, row_total,
    )

    if not starts:
        return _single_column()

    # --- Step 3: chain neighbouring edges into clusters ---
    tolerance = max(10, int(zone_w * 0.01))
    clusters: List[Dict[str, Any]] = []
    edge_run = [starts[0][0]]
    rows_in_run = {starts[0][1]}
    for left, row_idx in starts[1:]:
        if left - edge_run[-1] > tolerance:
            clusters.append(_summarise(edge_run, rows_in_run))
            edge_run, rows_in_run = [left], {row_idx}
        else:
            edge_run.append(left)
            rows_in_run.add(row_idx)
    clusters.append(_summarise(edge_run, rows_in_run))

    # --- Step 4: keep clusters with enough row coverage ---
    # Thresholds are high enough to reject accidental alignments in flowing
    # text while still accepting real columns in vocabulary worksheets.
    MIN_COVERAGE_PRIMARY = 0.35
    MIN_COVERAGE_SECONDARY = 0.20
    MIN_WORDS_SECONDARY = 4
    MIN_DISTINCT_ROWS = 3

    content_left = min(w["left"] for w in words)
    content_right = max(w["left"] + w["width"] for w in words)
    content_span = content_right - content_left

    primary: List[Dict[str, Any]] = []
    secondary: List[Dict[str, Any]] = []
    leftovers: List[Dict[str, Any]] = []
    for cluster in clusters:
        enough_rows = cluster["distinct_rows"] >= MIN_DISTINCT_ROWS
        if enough_rows and cluster["row_coverage"] >= MIN_COVERAGE_PRIMARY:
            primary.append(cluster)
        elif (
            enough_rows
            and cluster["row_coverage"] >= MIN_COVERAGE_SECONDARY
            and cluster["count"] >= MIN_WORDS_SECONDARY
        ):
            secondary.append(cluster)
        else:
            leftovers.append(cluster)

    # Tertiary: sparse clusters in the outer 15% of the content width
    # (page refs, markers) that sit well away from any significant cluster.
    anchor_xs = [c["mean_x"] for c in primary + secondary]
    min_separation = max(30, content_span * 0.02)
    tertiary: List[Dict[str, Any]] = []
    for cluster in leftovers:
        if cluster["distinct_rows"] < MIN_DISTINCT_ROWS:
            continue
        if content_span:
            rel_pos = (cluster["mean_x"] - content_left) / content_span
        else:
            rel_pos = 0.5
        if 0.15 <= rel_pos <= 0.85:
            continue
        if anchor_xs:
            nearest_anchor = min(abs(cluster["mean_x"] - ax) for ax in anchor_xs)
            if nearest_anchor < min_separation:
                continue
        tertiary.append(cluster)

    for c in tertiary:
        logger.info(
            "  tertiary (margin) cluster: x=%d (range %d-%d), %d words, %d rows (%.0f%%)",
            c["mean_x"], c["min_edge"], c["max_edge"],
            c["count"], c["distinct_rows"], c["row_coverage"] * 100,
        )

    significant = sorted(
        primary + secondary + tertiary, key=lambda c: c["mean_x"]
    )

    for c in significant:
        logger.info(
            "  significant cluster: x=%d (range %d-%d), %d words, %d rows (%.0f%%)",
            c["mean_x"], c["min_edge"], c["max_edge"],
            c["count"], c["distinct_rows"], c["row_coverage"] * 100,
        )
    logger.info(
        "alignment columns: %d clusters, %d primary, %d secondary → %d significant",
        len(clusters), len(primary), len(secondary), len(significant),
    )

    if not significant:
        return _single_column()

    # --- Step 5a: fuse clusters closer than the merge distance ---
    merge_distance = max(25, int(zone_w * 0.03))
    fused: List[Dict[str, Any]] = []
    for cluster in significant:
        last = fused[-1] if fused else None
        if last is None or cluster["mean_x"] - last["mean_x"] >= merge_distance:
            fused.append(cluster.copy())
            continue
        combined = last["count"] + cluster["count"]
        # Count-weighted mean keeps the fused centre near the denser cluster.
        last["mean_x"] = (
            last["mean_x"] * last["count"] + cluster["mean_x"] * cluster["count"]
        ) // combined
        last["count"] = combined
        last["min_edge"] = min(last["min_edge"], cluster["min_edge"])
        last["max_edge"] = max(last["max_edge"], cluster["max_edge"])
        last["distinct_rows"] = max(
            last["distinct_rows"], cluster["distinct_rows"]
        )

    logger.info(
        "alignment columns: %d after merge (distance=%d)",
        len(fused), merge_distance,
    )

    # --- Step 5b: turn cluster edges into contiguous column boundaries ---
    margin = max(5, int(zone_w * 0.005))
    multi = len(fused) > 1
    final_pos = len(fused) - 1

    columns: List[Dict[str, Any]] = []
    for pos, cluster in enumerate(fused):
        if pos == final_pos:
            right_bound = content_right
        else:
            # A column ends where the next one starts (minus the margin).
            right_bound = fused[pos + 1]["min_edge"] - margin
        columns.append({
            "index": pos,
            "type": f"column_{pos + 1}" if multi else "column_text",
            "x_min": max(content_left, cluster["min_edge"] - margin),
            "x_max": right_bound,
        })

    return columns
|
||
|
||
|
||
# Single characters that OCR tends to hallucinate from box border lines.
# "!" (red markers) and ". , ;" (real punctuation) are deliberately absent.
_GRID_GHOST_CHARS = set("|1lI[](){}/\\-—–_~=+")


def _filter_border_ghosts(
    words: List[Dict],
    boxes: List,
) -> tuple:
    """Drop single-character OCR artefacts that sit on a box border.

    A word is removed when its stripped text is exactly one character from
    ``_GRID_GHOST_CHARS`` and at least one of its four edges falls inside a
    tolerance band around a border line of a detected box.  Boxes whose
    border thickness is 0 contribute no bands.

    Args:
        words: Word dicts with ``text``, ``left``, ``top``, ``width``,
            ``height``.
        boxes: Detected boxes, either objects exposing ``x``, ``y``,
            ``width``, ``height``, ``border_thickness`` as attributes or
            dicts with keys ``x``, ``y``, ``w``/``width``, ``h``/``height``
            and ``border_thickness``.

    Returns:
        ``(filtered_words, removed_count)``.  When *boxes* or *words* is
        empty the input list itself is returned with a count of 0.
    """
    if not (boxes and words):
        return words, 0

    def _field(box, attr, from_dict):
        # Boxes may be objects (attribute access) or plain dicts.
        if hasattr(box, attr):
            return getattr(box, attr)
        return from_dict(box)

    vertical_bands: List[tuple] = []    # X ranges around left/right borders
    horizontal_bands: List[tuple] = []  # Y ranges around top/bottom borders
    for box in boxes:
        thickness = _field(
            box, "border_thickness", lambda d: d.get("border_thickness", 3)
        )
        # Borderless boxes (images/graphics) have no line to produce ghosts.
        if thickness == 0:
            continue
        left = _field(box, "x", lambda d: d.get("x", 0))
        top = _field(box, "y", lambda d: d.get("y", 0))
        width = _field(box, "width", lambda d: d.get("w", d.get("width", 0)))
        height = _field(box, "height", lambda d: d.get("h", d.get("height", 0)))
        slack = max(thickness * 2, 10) + 6
        vertical_bands.append((left - slack, left + slack))
        vertical_bands.append((left + width - slack, left + width + slack))
        horizontal_bands.append((top - slack, top + slack))
        horizontal_bands.append((top + height - slack, top + height + slack))

    def _touches(bands: List[tuple], first: float, second: float) -> bool:
        # True when either coordinate lies inside any band (inclusive).
        return any(
            lo <= first <= hi or lo <= second <= hi for lo, hi in bands
        )

    def _is_ghost(word: Dict) -> bool:
        text = (word.get("text") or "").strip()
        if not text:
            return False
        # Test the word's edges rather than its centre, so a glyph that
        # merely touches the border band is caught as well.
        x0 = word["left"]
        x1 = word["left"] + word["width"]
        y0 = word["top"]
        y1 = word["top"] + word["height"]
        if not (
            _touches(vertical_bands, x0, x1)
            or _touches(horizontal_bands, y0, y1)
        ):
            return False
        return len(text) == 1 and text in _GRID_GHOST_CHARS

    survivors = [word for word in words if not _is_ghost(word)]
    return survivors, len(words) - len(survivors)
|
||
|
||
|
||
# Symbols that act as list bullets / inline markers rather than content.
_MARKER_CHARS = set("•*·-–—|~=+#>→►▸▪◆○●□■✓✗✔✘")


def _merge_inline_marker_columns(
    columns: List[Dict],
    words: List[Dict],
) -> List[Dict]:
    """Fold narrow marker columns (bullets, numbering) into the column to their right.

    Bullet points (•, *, -) and numbering (1., 2.) produce a narrow column
    in front of the text they introduce.  Such a column is an inline marker,
    not a real column, so it is merged into its right neighbour: the
    neighbour's ``x_min`` is extended to the marker column's ``x_min``.

    A column is a merge candidate when it is narrower than 80 px, contains
    at least one word, its words average at most 2 characters, and it has a
    right neighbour.  Candidates in which at least half of the words start
    with a letter (e.g. "to", "in", "der", "die") are kept as real columns.

    The input column dicts are never modified; the returned list holds
    copies that are re-indexed and re-labelled (``column_N``, or
    ``column_text`` when only one column remains).

    Args:
        columns: Column dicts with ``x_min`` and ``x_max``, left to right.
        words: Word dicts with ``text``, ``left`` and ``width``.

    Returns:
        The resulting column list.  When fewer than two columns are given
        the input list is returned unchanged.
    """
    if len(columns) < 2:
        return columns

    merged: List[Dict] = []
    absorbed: set = set()  # indices already consumed by a merge
    last_index = len(columns) - 1

    for i, col in enumerate(columns):
        if i in absorbed:
            continue

        # A word belongs to the column that contains its horizontal centre.
        col_words = [
            w for w in words
            if col["x_min"] <= w["left"] + w["width"] / 2 < col["x_max"]
        ]
        col_width = col["x_max"] - col["x_min"]

        # Narrow column with mostly short words → MIGHT be inline markers.
        if col_words and col_width < 80:
            # `or ""` keeps a word whose text is None from raising.
            avg_len = sum(
                len(w.get("text") or "") for w in col_words
            ) / len(col_words)
            if avg_len <= 2 and i < last_index:
                # Tell real markers (symbols/numbers) apart from short
                # alphabetic words like "to", "in", "der", "die".
                texts = [(w.get("text") or "").strip() for w in col_words]
                alpha_count = sum(
                    1 for t in texts
                    if t and t[0].isalpha() and t not in _MARKER_CHARS
                )
                alpha_ratio = alpha_count / len(texts) if texts else 0

                if alpha_ratio >= 0.5:
                    logger.info(
                        "  kept narrow column %d (w=%d, avg_len=%.1f, "
                        "alpha=%.0f%%) — contains real words",
                        i, col_width, avg_len, alpha_ratio * 100,
                    )
                else:
                    # Extend the right neighbour over the marker column.
                    next_col = columns[i + 1].copy()
                    next_col["x_min"] = col["x_min"]
                    merged.append(next_col)
                    absorbed.add(i + 1)
                    logger.info(
                        "  merged inline marker column %d (w=%d, avg_len=%.1f) "
                        "into column %d",
                        i, col_width, avg_len, i + 1,
                    )
                    continue

        # Copy so the re-indexing below cannot alter the caller's dicts.
        merged.append(col.copy())

    # Re-index and re-label the surviving columns.
    multi = len(merged) > 1
    for i, col in enumerate(merged):
        col["index"] = i
        col["type"] = f"column_{i + 1}" if multi else "column_text"

    return merged
|
||
|
||
|
||
def _flatten_word_boxes(cells: List[Dict]) -> List[Dict]:
    """Collect the word boxes of all cells into one flat list of word dicts.

    Word boxes whose text is missing, ``None`` or whitespace-only are
    skipped.  Each emitted dict carries ``text``, ``left``, ``top``,
    ``width``, ``height`` and ``conf`` (0 when the box has no ``conf``).

    Args:
        cells: Cell dicts; ``word_boxes`` may be absent or ``None``.

    Returns:
        New word dicts in cell order, then word-box order.
    """
    words: List[Dict] = []
    for cell in cells:
        for wb in cell.get("word_boxes") or []:
            # `or ""` so an explicit None text is skipped instead of raising.
            if not (wb.get("text") or "").strip():
                continue
            words.append({
                "text": wb["text"],
                "left": wb["left"],
                "top": wb["top"],
                "width": wb["width"],
                "height": wb["height"],
                "conf": wb.get("conf", 0),
            })
    return words
|
||
|
||
|
||
def _words_in_zone(
    words: List[Dict],
    zone_y: int,
    zone_h: int,
    zone_x: int,
    zone_w: int,
) -> List[Dict]:
    """Return the words whose centre point lies inside the zone rectangle.

    Both the horizontal and the vertical centre of a word are tested, and
    the rectangle's edges are inclusive.  Input order is preserved.

    Args:
        words: Word dicts with ``left``, ``top``, ``width``, ``height``.
        zone_y: Top edge of the zone.
        zone_h: Height of the zone.
        zone_x: Left edge of the zone.
        zone_w: Width of the zone.

    Returns:
        A new list holding the matching word dicts (same objects).
    """
    bottom = zone_y + zone_h
    right = zone_x + zone_w

    def _inside(word: Dict) -> bool:
        mid_y = word["top"] + word["height"] / 2
        mid_x = word["left"] + word["width"] / 2
        return zone_y <= mid_y <= bottom and zone_x <= mid_x <= right

    return [word for word in words if _inside(word)]
|
||
|
||
|
||
def _merge_content_zones_across_boxes(
    zones: List,
    content_x: int,
    content_w: int,
) -> List:
    """Fuse content zones that are only interrupted by box zones.

    A run ``content, box, content[, box, content ...]`` collapses into one
    content zone that spans from the top of the first to the bottom of the
    last content zone.  The absorbed box zones are recorded on the merged
    zone as ``image_overlays``.  A box is absorbed only when it is directly
    followed by another content zone; all other zones are passed through.

    Args:
        zones: Zone objects in page order; ``zone_type``, ``x``, ``y``,
            ``width``, ``height`` and ``box`` are read.
        content_x: Left edge assigned to merged zones.
        content_w: Width assigned to merged zones.

    Returns:
        The resulting zone list with ``index`` rewritten to the new
        position.  With fewer than three zones the input list is returned
        untouched.
    """
    if len(zones) < 3:
        return zones

    def _as_overlay(box_zone) -> Dict[str, Any]:
        # Geometry of the absorbed box zone, plus box details if present.
        entry: Dict[str, Any] = {
            "y": box_zone.y,
            "height": box_zone.height,
            "x": box_zone.x,
            "width": box_zone.width,
        }
        if box_zone.box:
            entry["box"] = {
                "x": box_zone.box.x,
                "y": box_zone.box.y,
                "width": box_zone.box.width,
                "height": box_zone.box.height,
                "confidence": box_zone.box.confidence,
                "border_thickness": box_zone.box.border_thickness,
            }
        return entry

    total = len(zones)
    out: List = []
    pos = 0
    while pos < total:
        zone = zones[pos]
        pos += 1
        if zone.zone_type != "content":
            out.append(zone)
            continue

        # Greedily absorb (box, content) pairs that follow this content zone.
        contents = [zone]
        boxes = []
        while (
            pos + 1 < total
            and zones[pos].zone_type == "box"
            and zones[pos + 1].zone_type == "content"
        ):
            boxes.append(zones[pos])
            contents.append(zones[pos + 1])
            pos += 2

        if not boxes:
            # Nothing to merge with — keep the content zone as it is.
            out.append(zone)
            continue

        top = min(c.y for c in contents)
        bottom = max(c.y + c.height for c in contents)
        out.append(PageZone(
            index=0,  # re-indexed below
            zone_type="content",
            y=top,
            height=bottom - top,
            x=content_x,
            width=content_w,
            image_overlays=[_as_overlay(bz) for bz in boxes],
        ))

    # Re-index zones to their new positions.
    for new_index, zone in enumerate(out):
        zone.index = new_index

    logger.info(
        "zone-merge: %d zones → %d zones after merging across boxes",
        len(zones), len(out),
    )
    return out
|
||
|
||
|
||
def _detect_heading_rows_by_color(zones_data: List[Dict], img_w: int, img_h: int) -> int:
    """Detect heading rows by colour and size, then merge them into one cell.

    Only zones with cells, rows and at least two columns are examined.
    A row that is not already flagged ``is_header`` becomes a heading when

    1. every word box in the row has a ``color_name`` other than "black"
       (a missing ``color_name`` counts as black), and
    2. the mean word-box height of the row exceeds 1.2x the median
       word-box height of the whole zone.

    For each heading row the row dict gets ``is_header = True``.  If the
    row has a single cell, that cell's ``col_type`` is set to "heading";
    otherwise all cells of the row are replaced by one spanning cell in
    column 0 that carries the combined text and word boxes.

    Args:
        zones_data: Zone dicts with ``cells``, ``rows``, ``columns`` and
            optionally ``zone_index``; modified in place.
        img_w: Image width in pixels, for percentage coordinates.
        img_h: Image height in pixels, for percentage coordinates.

    Returns:
        Number of heading rows detected across all zones.
    """
    heading_count = 0

    for z in zones_data:
        cells = z.get("cells", [])
        rows = z.get("rows", [])
        columns = z.get("columns", [])
        if not cells or not rows or len(columns) < 2:
            continue

        # Median word height across the zone (upper median, heights > 0).
        all_heights = sorted(
            wb.get("height", 0)
            for cell in cells
            for wb in cell.get("word_boxes") or []
            if wb.get("height", 0) > 0
        )
        if not all_heights:
            continue
        median_h = all_heights[len(all_heights) // 2]

        heading_row_indices = []
        for row in rows:
            if row.get("is_header"):
                continue  # already detected as header
            ri = row["index"]
            row_wbs = [
                wb
                for cell in cells
                if cell.get("row_index") == ri
                for wb in cell.get("word_boxes") or []
            ]
            if not row_wbs:
                continue

            # Condition 1: ALL words are non-black.
            if any(wb.get("color_name", "black") == "black" for wb in row_wbs):
                continue

            # Condition 2: mean height > 1.2x zone median.
            mean_h = sum(wb.get("height", 0) for wb in row_wbs) / len(row_wbs)
            if mean_h <= median_h * 1.2:
                continue

            heading_row_indices.append(ri)

        for hri in heading_row_indices:
            # `cells` is the list as it was before any replacement; rows are
            # distinct, so earlier replacements never affect this lookup.
            header_cells = [c for c in cells if c.get("row_index") == hri]

            for row in rows:
                if row["index"] == hri:
                    row["is_header"] = True
            heading_count += 1

            if len(header_cells) <= 1:
                # Single cell — just mark it as heading.
                if header_cells:
                    header_cells[0]["col_type"] = "heading"
                continue

            # Gather word boxes and text left to right.  `or` guards against
            # explicit None values, matching the reads above.
            all_wb: List[Dict] = []
            all_text_parts: List[str] = []
            for hc in sorted(header_cells, key=lambda c: c["col_index"]):
                all_wb.extend(hc.get("word_boxes") or [])
                text = (hc.get("text") or "").strip()
                if text:
                    all_text_parts.append(text)

            # Replace the row's cells with one spanning cell.
            z["cells"] = [c for c in z["cells"] if c.get("row_index") != hri]

            if all_wb:
                x_min = min(wb["left"] for wb in all_wb)
                y_min = min(wb["top"] for wb in all_wb)
                x_max = max(wb["left"] + wb["width"] for wb in all_wb)
                y_max = max(wb["top"] + wb["height"] for wb in all_wb)

                zone_idx = z.get("zone_index", 0)
                z["cells"].append({
                    "cell_id": f"Z{zone_idx}_R{hri:02d}_C0",
                    "zone_index": zone_idx,
                    "row_index": hri,
                    "col_index": 0,
                    "col_type": "heading",
                    "text": " ".join(all_text_parts),
                    "confidence": 0.0,
                    "bbox_px": {"x": x_min, "y": y_min,
                                "w": x_max - x_min, "h": y_max - y_min},
                    "bbox_pct": {
                        "x": round(x_min / img_w * 100, 2) if img_w else 0,
                        "y": round(y_min / img_h * 100, 2) if img_h else 0,
                        "w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0,
                        "h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0,
                    },
                    "word_boxes": all_wb,
                    "ocr_engine": "words_first",
                    "is_bold": True,
                })

    return heading_count
|
||
|
||
|
||
def _detect_header_rows(
    rows: List[Dict],
    zone_words: List[Dict],
    zone_y: int,
    columns: Optional[List[Dict]] = None,
    skip_first_row_header: bool = False,
) -> List[int]:
    """Decide whether the first row of a zone is a header row.

    The first row is reported as a header (index 0) when either

    * the vertical gap between the first and second row exceeds half the
      average row height, or
    * the tallest word whose vertical centre lies in the first row is more
      than 1.3x the median word height of the zone.

    Detection of "spanning headers" (rows stretching across several
    columns) is intentionally not performed: it produced too many false
    positives on vocabulary worksheets, where IPA transcriptions or short
    entries naturally span columns.  ``zone_y`` and ``columns`` are
    accepted but not used.

    Args:
        rows: Row dicts with ``y_min`` and ``y_max``, top to bottom.
        zone_words: Word dicts with ``top`` and ``height``.
        zone_y: Unused.
        columns: Unused.
        skip_first_row_header: When true, no header is reported.

    Returns:
        ``[0]`` when the first row qualifies, otherwise ``[]``.  Always
        ``[]`` for fewer than two rows.
    """
    if len(rows) < 2 or skip_first_row_header:
        return []

    top_row, next_row = rows[0], rows[1]

    # Criterion 1: the first row is visually detached from the second.
    mean_row_h = sum(r["y_max"] - r["y_min"] for r in rows) / len(rows)
    detached = next_row["y_min"] - top_row["y_max"] > mean_row_h * 0.5

    # Criterion 2: the first row holds noticeably taller (bold/header) text.
    heights = sorted(w["height"] for w in zone_words)
    typical_h = heights[len(heights) // 2] if heights else 20
    tallest_in_top = max(
        (
            w["height"]
            for w in zone_words
            if top_row["y_min"] <= w["top"] + w["height"] / 2 <= top_row["y_max"]
        ),
        default=None,
    )
    oversized = tallest_in_top is not None and tallest_in_top > typical_h * 1.3

    return [0] if (detached or oversized) else []
|
||
|
||
|
||
def _build_zone_grid(
    zone_words: List[Dict],
    zone_x: int,
    zone_y: int,
    zone_w: int,
    zone_h: int,
    zone_index: int,
    img_w: int,
    img_h: int,
    global_columns: Optional[List[Dict]] = None,
    skip_first_row_header: bool = False,
) -> Dict[str, Any]:
    """Build columns, rows and cells for a single zone from its words.

    Steps: cluster rows, obtain columns (the supplied global ones, or
    detected per zone and cleaned of inline marker columns), build cells,
    prefix cell IDs with the zone index, detect header rows and collapse
    each multi-cell header row into one spanning cell in column 0.

    Args:
        zone_words: Word dicts belonging to this zone.
        zone_x: Left edge of the zone (not used by this function).
        zone_y: Top edge of the zone; forwarded to header detection.
        zone_w: Zone width; used for per-zone column detection.
        zone_h: Zone height (not used by this function).
        zone_index: Index of the zone; used as the ``Z{n}_`` cell-ID prefix
            and stored on every cell as ``zone_index``.
        img_w: Image width in pixels, for percentage coordinates.
        img_h: Image height in pixels, for percentage coordinates.
        global_columns: If provided, use these pre-computed column
            boundaries instead of detecting columns per zone, so that all
            content zones (above/between/below boxes) share one column
            structure.  Marker-column merging is skipped in that case.
        skip_first_row_header: Forwarded to header detection.

    Returns:
        Dict with ``columns``, ``rows``, ``cells``, ``header_rows`` and
        ``_raw_columns`` (the unconverted column dicts, for propagation to
        other zones).  When there are no words, columns or rows, the first
        four keys are returned with empty lists and ``_raw_columns`` is
        absent.
    """
    if not zone_words:
        return {
            "columns": [],
            "rows": [],
            "cells": [],
            "header_rows": [],
        }

    # Cluster rows first (needed for column alignment analysis).
    rows = _cluster_rows(zone_words)

    # Diagnostic logging for small/medium zones (box zones typically have
    # 40-60 words).
    if len(zone_words) <= 60:
        import statistics as _st
        _heights = [w['height'] for w in zone_words if w.get('height', 0) > 0]
        _med_h = _st.median(_heights) if _heights else 20
        _y_tol = max(_med_h * 0.5, 5)
        logger.info(
            "zone %d row-clustering: %d words, median_h=%.0f, y_tol=%.1f → %d rows",
            zone_index, len(zone_words), _med_h, _y_tol, len(rows),
        )
        for w in sorted(zone_words, key=lambda ww: (ww['top'], ww['left'])):
            logger.info(
                "  zone %d word: y=%d x=%d h=%d w=%d '%s'",
                zone_index, w['top'], w['left'], w['height'], w['width'],
                (w.get('text') or '')[:40],
            )
        for r in rows:
            logger.info(
                "  zone %d row %d: y_min=%d y_max=%d y_center=%.0f",
                zone_index, r['index'], r['y_min'], r['y_max'], r['y_center'],
            )

    # Use global columns if provided, otherwise detect per zone.
    columns = (
        global_columns
        if global_columns
        else _cluster_columns_by_alignment(zone_words, zone_w, rows)
    )

    # Merge inline marker columns (bullets, numbering) into adjacent text.
    if not global_columns:
        columns = _merge_inline_marker_columns(columns, zone_words)

    if not columns or not rows:
        return {
            "columns": [],
            "rows": [],
            "cells": [],
            "header_rows": [],
        }

    cells = _build_cells(zone_words, columns, rows, img_w, img_h)

    # Prefix cell IDs with the zone index so they are unique across zones.
    for cell in cells:
        cell["cell_id"] = f"Z{zone_index}_{cell['cell_id']}"
        cell["zone_index"] = zone_index

    header_rows = _detect_header_rows(rows, zone_words, zone_y, columns,
                                      skip_first_row_header=skip_first_row_header)

    # Collapse the cells of each header row into a single column-0 cell.
    if header_rows and len(columns) >= 2:
        for hri in header_rows:
            header_cells = [c for c in cells if c["row_index"] == hri]
            if len(header_cells) <= 1:
                continue
            # Gather word boxes and text left to right; `or` tolerates
            # explicit None values.
            all_wb: List[Dict] = []
            all_text_parts: List[str] = []
            for hc in sorted(header_cells, key=lambda c: c["col_index"]):
                all_wb.extend(hc.get("word_boxes") or [])
                text = (hc.get("text") or "").strip()
                if text:
                    all_text_parts.append(text)
            # Remove all header cells, replace with one spanning cell.
            cells = [c for c in cells if c["row_index"] != hri]
            if all_wb:
                x_min = min(wb["left"] for wb in all_wb)
                y_min = min(wb["top"] for wb in all_wb)
                x_max = max(wb["left"] + wb["width"] for wb in all_wb)
                y_max = max(wb["top"] + wb["height"] for wb in all_wb)
                cells.append({
                    # This cell is created after the prefixing loop above,
                    # so it must carry the zone prefix and zone_index itself
                    # to stay consistent with every other cell of the zone.
                    "cell_id": f"Z{zone_index}_R{hri:02d}_C0",
                    "zone_index": zone_index,
                    "row_index": hri,
                    "col_index": 0,
                    "col_type": "spanning_header",
                    "text": " ".join(all_text_parts),
                    "confidence": 0.0,
                    "bbox_px": {"x": x_min, "y": y_min,
                                "w": x_max - x_min, "h": y_max - y_min},
                    "bbox_pct": {
                        "x": round(x_min / img_w * 100, 2) if img_w else 0,
                        "y": round(y_min / img_h * 100, 2) if img_h else 0,
                        "w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0,
                        "h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0,
                    },
                    "word_boxes": all_wb,
                    "ocr_engine": "words_first",
                    "is_bold": True,
                })

    # Convert columns to output format with percentages.
    out_columns = [
        {
            "index": col["index"],
            "label": col["type"],
            "x_min_px": round(col["x_min"]),
            "x_max_px": round(col["x_max"]),
            "x_min_pct": round(col["x_min"] / img_w * 100, 2) if img_w else 0,
            "x_max_pct": round(col["x_max"] / img_w * 100, 2) if img_w else 0,
            "bold": False,
        }
        for col in columns
    ]

    # Convert rows to output format with percentages.
    out_rows = [
        {
            "index": row["index"],
            "y_min_px": round(row["y_min"]),
            "y_max_px": round(row["y_max"]),
            "y_min_pct": round(row["y_min"] / img_h * 100, 2) if img_h else 0,
            "y_max_pct": round(row["y_max"] / img_h * 100, 2) if img_h else 0,
            "is_header": row["index"] in header_rows,
        }
        for row in rows
    ]

    return {
        "columns": out_columns,
        "rows": out_rows,
        "cells": cells,
        "header_rows": header_rows,
        "_raw_columns": columns,  # internal: for propagation to other zones
    }
|
||
|
||
|
||
def _get_content_bounds(words: List[Dict]) -> tuple:
    """Return the bounding box of all words as ``(x, y, width, height)``.

    Args:
        words: Word dicts with ``left``, ``top``, ``width``, ``height``.

    Returns:
        The smallest rectangle enclosing every word, or ``(0, 0, 0, 0)``
        when *words* is empty.
    """
    if not words:
        return 0, 0, 0, 0

    lefts, tops, rights, bottoms = zip(*(
        (w["left"], w["top"], w["left"] + w["width"], w["top"] + w["height"])
        for w in words
    ))
    origin_x = min(lefts)
    origin_y = min(tops)
    return origin_x, origin_y, max(rights) - origin_x, max(bottoms) - origin_y
|
||
|
||
|
||
def _filter_decorative_margin(
    words: List[Dict],
    img_w: int,
    log: Any,
    session_id: str,
) -> None:
    """Remove words that belong to a decorative alphabet strip on a margin.

    Some vocabulary worksheets carry a vertical A–Z graphic along the left
    or right edge.  OCR reads each letter as an isolated one-character
    word; these are not content and disturb column/row detection.

    Phase 1 — detect a strip, separately for the left and right side, from
    the single-character words whose horizontal centre lies in the outer
    30% of the page width:
      - at least 8 such words,
      - spread over at least 6 distinct 20-px vertical buckets,
      - left edges no more than 80 px apart.

    Phase 2 — once a strip is confirmed, remove every word on that side
    with at most 3 characters whose left edge lies between 20 px left of
    the strip's smallest left edge and 60 px right of its largest.  This
    also catches multi-character artefacts such as "Vv".

    Modifies *words* in place and logs the number of removed words.

    Args:
        words: Word dicts with ``text``, ``left``, ``top`` and optionally
            ``width`` / ``height``.
        img_w: Page width in pixels; nothing happens when it is <= 0.
        log: Logger-like object providing ``info``.
        session_id: Session identifier used in the log message.
    """
    if not words or img_w <= 0:
        return

    margin_cutoff = img_w * 0.30

    def _text_len(word: Dict) -> int:
        return len((word.get("text") or "").strip())

    def _on_side(word: Dict, side: str) -> bool:
        # Horizontal centre inside the outer 30% on the given side.
        mid_x = word["left"] + word.get("width", 0) / 2
        if side == "left":
            return mid_x < margin_cutoff
        return mid_x > img_w - margin_cutoff

    # Phase 1 candidates are fixed up front, before anything is removed.
    single_chars = [w for w in words if _text_len(w) == 1]
    candidates = [
        (side, [w for w in single_chars if _on_side(w, side)])
        for side in ("left", "right")
    ]

    for side, strip in candidates:
        if len(strip) < 8:
            continue

        # Vertical distribution: the letters must cover many Y buckets.
        y_buckets = {
            int(w["top"] + w.get("height", 0) / 2) // 20 * 20 for w in strip
        }
        if len(y_buckets) < 6:
            continue

        # Horizontal compactness: a strip is a narrow vertical band.
        lefts = [w["left"] for w in strip]
        x_lo, x_hi = min(lefts), max(lefts)
        if x_hi - x_lo > 80:
            continue

        # Phase 2: widen the band slightly to catch neighbours
        # (e.g. "Vv" next to "U"); +60 allows for word width + tolerance.
        strip_x_lo = x_lo - 20
        strip_x_hi = x_hi + 60
        doomed = {
            id(w)
            for w in words
            if _text_len(w) <= 3
            and strip_x_lo <= w["left"] <= strip_x_hi
            and _on_side(w, side)
        }

        survivors = [w for w in words if id(w) not in doomed]
        removed = len(words) - len(survivors)
        words[:] = survivors
        if removed:
            log.info(
                "build-grid session %s: removed %d decorative %s-margin words "
                "(strip x=%d-%d)",
                session_id, removed, side, strip_x_lo, strip_x_hi,
            )
|
||
|
||
|
||
def _filter_footer_words(
|
||
words: List[Dict],
|
||
img_h: int,
|
||
log: Any,
|
||
session_id: str,
|
||
) -> None:
|
||
"""Remove isolated words in the bottom 5% of the page (page numbers).
|
||
|
||
Modifies *words* in place.
|
||
"""
|
||
if not words or img_h <= 0:
|
||
return
|
||
footer_y = img_h * 0.95
|
||
footer_words = [
|
||
w for w in words
|
||
if w["top"] + w.get("height", 0) / 2 > footer_y
|
||
]
|
||
if not footer_words:
|
||
return
|
||
# Only remove if footer has very few words (≤ 3) with short text
|
||
total_text = "".join((w.get("text") or "").strip() for w in footer_words)
|
||
if len(footer_words) <= 3 and len(total_text) <= 10:
|
||
footer_set = set(id(w) for w in footer_words)
|
||
words[:] = [w for w in words if id(w) not in footer_set]
|
||
log.info(
|
||
"build-grid session %s: removed %d footer words ('%s')",
|
||
session_id, len(footer_words), total_text,
|
||
)
|
||
|
||
|
||
def _filter_header_junk(
|
||
words: List[Dict],
|
||
img_h: int,
|
||
log: Any,
|
||
session_id: str,
|
||
) -> None:
|
||
"""Remove OCR junk from header illustrations above the real content.
|
||
|
||
Textbook pages often have decorative header graphics (illustrations,
|
||
icons) that OCR reads as low-confidence junk characters. Real content
|
||
typically starts further down the page.
|
||
|
||
Algorithm:
|
||
1. Find the "content start" — the first Y position where a dense
|
||
horizontal row of 3+ high-confidence words begins.
|
||
2. Above that line, remove words with conf < 75 and text ≤ 3 chars.
|
||
These are almost certainly OCR artifacts from illustrations.
|
||
|
||
Modifies *words* in place.
|
||
"""
|
||
if not words or img_h <= 0:
|
||
return
|
||
|
||
# --- Find content start: first horizontal row with ≥3 high-conf words ---
|
||
# Sort words by Y
|
||
sorted_by_y = sorted(words, key=lambda w: w["top"])
|
||
content_start_y = 0
|
||
_ROW_TOLERANCE = img_h * 0.02 # words within 2% of page height = same row
|
||
_MIN_ROW_WORDS = 3
|
||
_MIN_CONF = 80
|
||
|
||
i = 0
|
||
while i < len(sorted_by_y):
|
||
row_y = sorted_by_y[i]["top"]
|
||
# Collect words in this row band
|
||
row_words = []
|
||
j = i
|
||
while j < len(sorted_by_y) and sorted_by_y[j]["top"] - row_y < _ROW_TOLERANCE:
|
||
row_words.append(sorted_by_y[j])
|
||
j += 1
|
||
# Count high-confidence words with real text (> 1 char)
|
||
high_conf = [
|
||
w for w in row_words
|
||
if w.get("conf", 0) >= _MIN_CONF
|
||
and len((w.get("text") or "").strip()) > 1
|
||
]
|
||
if len(high_conf) >= _MIN_ROW_WORDS:
|
||
content_start_y = row_y
|
||
break
|
||
i = j if j > i else i + 1
|
||
|
||
if content_start_y <= 0:
|
||
return # no clear content start found
|
||
|
||
# --- Remove low-conf short junk above content start ---
|
||
junk = [
|
||
w for w in words
|
||
if w["top"] + w.get("height", 0) < content_start_y
|
||
and w.get("conf", 0) < 75
|
||
and len((w.get("text") or "").strip()) <= 3
|
||
]
|
||
if not junk:
|
||
return
|
||
|
||
junk_set = set(id(w) for w in junk)
|
||
before = len(words)
|
||
words[:] = [w for w in words if id(w) not in junk_set]
|
||
removed = before - len(words)
|
||
if removed:
|
||
log.info(
|
||
"build-grid session %s: removed %d header junk words above y=%d "
|
||
"(content start)",
|
||
session_id, removed, content_start_y,
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Core computation (used by build-grid endpoint and regression tests)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
async def _build_grid_core(session_id: str, session: dict) -> dict:
    """Build the StructuredGrid for a session from its Kombi OCR word results.

    The function performs no session persistence and raises no HTTP errors;
    it only loads the session image (via ``get_session_image``) and computes
    the grid, so it can be shared by the build-grid endpoint and tests.

    Pipeline:
        1. Validate ``word_result`` and flatten its cells into word boxes.
        2. Filter noise (decorative margins, footer, header junk, user
           exclude regions, low-confidence words inside graphics).
        3. Decode the page image, recover colored text, detect bordered
           boxes and, when boxes exist, build one grid per page zone with a
           shared column set across content zones.
        4. Fall back to a single content zone when no zone was produced;
           then drop junk rows and oversized word boxes.
        5. Annotate word colors, detect heading rows, repair parentheses
           and garbled IPA.
        6. Assemble summary, formatting and layout metrics.

    Args:
        session_id: Session identifier (for logging and image loading).
        session: Full session dict from get_session_db().

    Returns:
        StructuredGrid result dict.

    Raises:
        ValueError: If session data is incomplete.
    """
    t0 = time.time()

    # 1. Validate and load word results
    word_result = session.get("word_result")
    if not word_result or not word_result.get("cells"):
        raise ValueError("No word results found. Run paddle-kombi or rapid-kombi first.")

    img_w = word_result.get("image_width", 0)
    img_h = word_result.get("image_height", 0)
    if not img_w or not img_h:
        raise ValueError("Missing image dimensions in word_result")

    # 2. Flatten all word boxes from cells
    all_words = _flatten_word_boxes(word_result["cells"])
    if not all_words:
        raise ValueError("No word boxes found in cells")

    logger.info("build-grid session %s: %d words from %d cells",
                session_id, len(all_words), len(word_result["cells"]))

    # 2b. Filter decorative margin columns (alphabet graphics).
    # Some worksheets have a decorative alphabet strip along one margin
    # (A-Z in a graphic).  OCR reads these as single-char words aligned
    # vertically.  Detect and remove them before grid building.
    _filter_decorative_margin(all_words, img_w, logger, session_id)

    # 2c. Filter footer rows (page numbers at the very bottom).
    # Isolated short text in the bottom 5% of the page is typically a
    # page number ("64", "S. 12") and not real content.
    _filter_footer_words(all_words, img_h, logger, session_id)

    # 2c2. Filter OCR junk from header illustrations.
    # Low-confidence short fragments above the first real content row.
    _filter_header_junk(all_words, img_h, logger, session_id)

    # 2d. Filter words inside user-defined exclude regions (from Structure step).
    # These are explicitly marked by the user, so ALL words inside are removed
    # regardless of confidence.
    structure_result = session.get("structure_result")
    exclude_rects = []
    if structure_result:
        for er in structure_result.get("exclude_regions", []):
            exclude_rects.append({
                "x": er["x"], "y": er["y"],
                "w": er["w"], "h": er["h"],
            })
    if exclude_rects:
        before = len(all_words)
        filtered = []
        for w in all_words:
            # A word belongs to a region when its centre point lies inside.
            w_cx = w["left"] + w.get("width", 0) / 2
            w_cy = w["top"] + w.get("height", 0) / 2
            inside = any(
                er["x"] <= w_cx <= er["x"] + er["w"]
                and er["y"] <= w_cy <= er["y"] + er["h"]
                for er in exclude_rects
            )
            if not inside:
                filtered.append(w)
        removed = before - len(filtered)
        if removed:
            all_words = filtered
            logger.info(
                "build-grid session %s: removed %d words inside %d user exclude region(s)",
                session_id, removed, len(exclude_rects),
            )

    # 2e. Filter words inside detected graphic/image regions
    # Only remove LOW-CONFIDENCE words (likely OCR artifacts from images).
    # High-confidence words are real text even if they overlap a detected
    # graphic region (e.g. colored text that graphic detection couldn't
    # fully distinguish from an image).
    _GRAPHIC_CONF_THRESHOLD = 50  # keep words with conf >= 50
    graphic_rects = []
    if structure_result:
        for g in structure_result.get("graphics", []):
            graphic_rects.append({
                "x": g["x"], "y": g["y"],
                "w": g["w"], "h": g["h"],
            })
    if graphic_rects:
        before = len(all_words)
        filtered = []
        for w in all_words:
            w_cx = w["left"] + w.get("width", 0) / 2
            w_cy = w["top"] + w.get("height", 0) / 2
            inside = any(
                gr["x"] <= w_cx <= gr["x"] + gr["w"]
                and gr["y"] <= w_cy <= gr["y"] + gr["h"]
                for gr in graphic_rects
            )
            if inside and w.get("conf", 0) < _GRAPHIC_CONF_THRESHOLD:
                continue  # remove low-confidence artifact
            filtered.append(w)
        removed = before - len(filtered)
        if removed:
            all_words = filtered
            logger.info(
                "build-grid session %s: removed %d low-conf words inside %d graphic region(s)",
                session_id, removed, len(graphic_rects),
            )

    # 3. Load image for box detection (prefer the most processed variant)
    img_png = await get_session_image(session_id, "cropped")
    if not img_png:
        img_png = await get_session_image(session_id, "dewarped")
    if not img_png:
        img_png = await get_session_image(session_id, "original")

    zones_data: List[Dict[str, Any]] = []
    boxes_detected = 0
    recovered_count = 0
    img_bgr = None

    content_x, content_y, content_w, content_h = _get_content_bounds(all_words)

    if img_png:
        # Decode image for color detection + box detection
        arr = np.frombuffer(img_png, dtype=np.uint8)
        img_bgr = cv2.imdecode(arr, cv2.IMREAD_COLOR)

        if img_bgr is not None:
            # --- Recover colored text that OCR missed (before grid building) ---
            recovered = recover_colored_text(img_bgr, all_words)
            if recovered and graphic_rects:
                # Filter recovered chars inside graphic regions
                recovered = [
                    r for r in recovered
                    if not any(
                        gr["x"] <= r["left"] + r.get("width", 0) / 2 <= gr["x"] + gr["w"]
                        and gr["y"] <= r["top"] + r.get("height", 0) / 2 <= gr["y"] + gr["h"]
                        for gr in graphic_rects
                    )
                ]
            if recovered:
                recovered_count = len(recovered)
                all_words.extend(recovered)
                logger.info(
                    "build-grid session %s: +%d recovered colored words",
                    session_id, recovered_count,
                )

            # Detect bordered boxes
            boxes = detect_boxes(
                img_bgr,
                content_x=content_x,
                content_w=content_w,
                content_y=content_y,
                content_h=content_h,
            )
            boxes_detected = len(boxes)

            if boxes:
                # Filter border ghost words before grid building
                all_words, ghost_count = _filter_border_ghosts(all_words, boxes)
                if ghost_count:
                    logger.info(
                        "build-grid session %s: removed %d border ghost words",
                        session_id, ghost_count,
                    )

                # Split page into zones
                page_zones = split_page_into_zones(
                    content_x, content_y, content_w, content_h, boxes
                )

                # Merge content zones separated by box zones
                page_zones = _merge_content_zones_across_boxes(
                    page_zones, content_x, content_w
                )

                # --- Union columns from all content zones ---
                # Each content zone detects columns independently.  Narrow
                # columns (page refs, markers) may appear in only one zone.
                # Merge column split-points from ALL content zones so every
                # zone shares the full column set.

                # First pass: build grids per zone independently
                zone_grids: List[Dict] = []

                for pz in page_zones:
                    zone_words = _words_in_zone(
                        all_words, pz.y, pz.height, pz.x, pz.width
                    )
                    # Filter recovered single-char artifacts in ALL zones
                    # (decorative colored pixel blobs like !, ?, • from
                    # recover_colored_text that don't represent real text)
                    before = len(zone_words)
                    zone_words = [
                        w for w in zone_words
                        if not (
                            w.get("recovered")
                            and len((w.get("text") or "").strip()) <= 2
                        )
                    ]
                    removed = before - len(zone_words)
                    if removed:
                        logger.info(
                            "build-grid: filtered %d recovered artifacts from %s zone %d",
                            removed, pz.zone_type, pz.index,
                        )
                    # Filter words inside image overlay regions (merged box zones)
                    if pz.image_overlays:
                        before_ov = len(zone_words)
                        zone_words = [
                            w for w in zone_words
                            if not any(
                                ov["y"] <= w["top"] + w["height"] / 2 <= ov["y"] + ov["height"]
                                and ov["x"] <= w["left"] + w["width"] / 2 <= ov["x"] + ov["width"]
                                for ov in pz.image_overlays
                            )
                        ]
                        ov_removed = before_ov - len(zone_words)
                        if ov_removed:
                            logger.info(
                                "build-grid: filtered %d words inside image overlays from zone %d",
                                ov_removed, pz.index,
                            )
                    grid = _build_zone_grid(
                        zone_words, pz.x, pz.y, pz.width, pz.height,
                        pz.index, img_w, img_h,
                        skip_first_row_header=bool(pz.image_overlays),
                    )
                    zone_grids.append({"pz": pz, "words": zone_words, "grid": grid})

                # Second pass: merge column boundaries from all content zones
                content_zones = [
                    zg for zg in zone_grids if zg["pz"].zone_type == "content"
                ]
                if len(content_zones) > 1:
                    # Collect column split points (x_min of non-first columns)
                    all_split_xs: List[float] = []
                    for zg in content_zones:
                        raw_cols = zg["grid"].get("_raw_columns", [])
                        for col in raw_cols[1:]:
                            all_split_xs.append(col["x_min"])

                    if all_split_xs:
                        all_split_xs.sort()
                        # Split points closer than this are averaged into one.
                        merge_distance = max(25, int(content_w * 0.03))
                        merged_xs = [all_split_xs[0]]
                        for x in all_split_xs[1:]:
                            if x - merged_xs[-1] < merge_distance:
                                merged_xs[-1] = (merged_xs[-1] + x) / 2
                            else:
                                merged_xs.append(x)

                        total_cols = len(merged_xs) + 1
                        max_zone_cols = max(
                            len(zg["grid"].get("_raw_columns", []))
                            for zg in content_zones
                        )

                        # Apply union whenever it has at least as many
                        # columns as the best single zone.  Even with the
                        # same count the union boundaries are better because
                        # they incorporate evidence from all zones.
                        if total_cols >= max_zone_cols:
                            cx_min = min(w["left"] for w in all_words)
                            cx_max = max(
                                w["left"] + w["width"] for w in all_words
                            )
                            merged_columns: List[Dict[str, Any]] = []
                            prev_x = cx_min
                            for i, sx in enumerate(merged_xs):
                                merged_columns.append({
                                    "index": i,
                                    "type": f"column_{i + 1}",
                                    "x_min": prev_x,
                                    "x_max": sx,
                                })
                                prev_x = sx
                            merged_columns.append({
                                "index": len(merged_xs),
                                "type": f"column_{len(merged_xs) + 1}",
                                "x_min": prev_x,
                                "x_max": cx_max,
                            })

                            # Re-build ALL content zones with merged columns
                            for zg in zone_grids:
                                pz = zg["pz"]
                                if pz.zone_type == "content":
                                    grid = _build_zone_grid(
                                        zg["words"], pz.x, pz.y,
                                        pz.width, pz.height,
                                        pz.index, img_w, img_h,
                                        global_columns=merged_columns,
                                        skip_first_row_header=bool(pz.image_overlays),
                                    )
                                    zg["grid"] = grid
                            logger.info(
                                "build-grid session %s: union of %d content "
                                "zones → %d merged columns (max single zone: %d)",
                                session_id, len(content_zones),
                                total_cols, max_zone_cols,
                            )

                for zg in zone_grids:
                    pz = zg["pz"]
                    grid = zg["grid"]
                    # Remove internal _raw_columns before adding to response
                    grid.pop("_raw_columns", None)

                    zone_entry: Dict[str, Any] = {
                        "zone_index": pz.index,
                        "zone_type": pz.zone_type,
                        "bbox_px": {
                            "x": pz.x, "y": pz.y,
                            "w": pz.width, "h": pz.height,
                        },
                        "bbox_pct": {
                            "x": round(pz.x / img_w * 100, 2) if img_w else 0,
                            "y": round(pz.y / img_h * 100, 2) if img_h else 0,
                            "w": round(pz.width / img_w * 100, 2) if img_w else 0,
                            "h": round(pz.height / img_h * 100, 2) if img_h else 0,
                        },
                        "border": None,
                        "word_count": len(zg["words"]),
                        **grid,
                    }

                    if pz.box:
                        zone_entry["border"] = {
                            "thickness": pz.box.border_thickness,
                            "confidence": pz.box.confidence,
                        }

                    if pz.image_overlays:
                        zone_entry["image_overlays"] = pz.image_overlays

                    zones_data.append(zone_entry)

    # 4. Fallback: no boxes detected → single zone with all words
    if not zones_data:
        # Filter recovered single-char artifacts (same as in zone loop above)
        before = len(all_words)
        filtered_words = [
            w for w in all_words
            if not (w.get("recovered") and len((w.get("text") or "").strip()) <= 2)
        ]
        removed = before - len(filtered_words)
        if removed:
            logger.info(
                "build-grid session %s: filtered %d recovered artifacts (fallback zone)",
                session_id, removed,
            )
        grid = _build_zone_grid(
            filtered_words, content_x, content_y, content_w, content_h,
            0, img_w, img_h,
        )
        grid.pop("_raw_columns", None)
        zones_data.append({
            "zone_index": 0,
            "zone_type": "content",
            "bbox_px": {
                "x": content_x, "y": content_y,
                "w": content_w, "h": content_h,
            },
            "bbox_pct": {
                "x": round(content_x / img_w * 100, 2) if img_w else 0,
                "y": round(content_y / img_h * 100, 2) if img_h else 0,
                "w": round(content_w / img_w * 100, 2) if img_w else 0,
                "h": round(content_h / img_h * 100, 2) if img_h else 0,
            },
            "border": None,
            "word_count": len(all_words),
            **grid,
        })

    # 4b. Remove junk rows: rows where ALL cells contain only short,
    # low-confidence text (OCR noise, stray marks).  Real vocabulary rows
    # have at least one word with conf >= 50 or meaningful text length.
    # Also remove "oversized stub" rows: rows with ≤2 very short words
    # whose word-boxes are significantly taller than the median (e.g.
    # large red page numbers like "( 9" that are not real text content).
    _JUNK_CONF_THRESHOLD = 50
    _JUNK_MAX_TEXT_LEN = 3
    for z in zones_data:
        cells = z.get("cells", [])
        rows = z.get("rows", [])
        if not cells or not rows:
            continue

        # Compute median word height across the zone for oversized detection
        all_wb_heights = [
            wb["height"]
            for cell in cells
            for wb in cell.get("word_boxes") or []
            if wb.get("height", 0) > 0
        ]
        median_wb_h = sorted(all_wb_heights)[len(all_wb_heights) // 2] if all_wb_heights else 28

        junk_row_indices = set()
        for row in rows:
            ri = row["index"]
            row_cells = [c for c in cells if c.get("row_index") == ri]
            if not row_cells:
                continue

            row_wbs = [
                wb for cell in row_cells
                for wb in cell.get("word_boxes") or []
            ]
            # A row whose cells carry no word boxes cannot be judged by the
            # rules below (Rule 3 would call max() on an empty sequence and
            # raise ValueError), so it is kept untouched.
            if not row_wbs:
                continue

            # Rule 1: ALL word_boxes are low-conf AND short text
            all_junk = all(
                wb.get("conf", 0) < _JUNK_CONF_THRESHOLD
                and len((wb.get("text") or "").strip()) <= _JUNK_MAX_TEXT_LEN
                for wb in row_wbs
            )
            if all_junk:
                junk_row_indices.add(ri)
                continue

            # Rule 2: oversized stub — ≤3 words, short total text,
            # and word height > 1.8× median (page numbers, stray marks,
            # OCR from illustration labels like "SEA &")
            if len(row_wbs) <= 3:
                total_text = "".join((wb.get("text") or "").strip() for wb in row_wbs)
                max_h = max((wb.get("height", 0) for wb in row_wbs), default=0)
                if len(total_text) <= 5 and max_h > median_wb_h * 1.8:
                    junk_row_indices.add(ri)
                    continue

            # Rule 3: scattered debris — rows with only tiny fragments
            # (e.g. OCR artifacts from illustrations/graphics).
            # If the row has no word longer than 2 chars, it's noise.
            longest = max(len((wb.get("text") or "").strip()) for wb in row_wbs)
            if longest <= 2:
                junk_row_indices.add(ri)
                continue

        if junk_row_indices:
            z["cells"] = [c for c in cells if c.get("row_index") not in junk_row_indices]
            z["rows"] = [r for r in rows if r["index"] not in junk_row_indices]
            logger.info(
                "build-grid: removed %d junk rows from zone %d: %s",
                len(junk_row_indices), z["zone_index"],
                sorted(junk_row_indices),
            )

    # 4c. Remove oversized word_boxes from individual cells.
    # OCR artifacts from graphics/images (e.g. a huge "N" from a map image)
    # have word heights 3-5x the median.  Remove them per-word so they don't
    # pollute cells that also contain valid text in other columns.
    for z in zones_data:
        cells = z.get("cells", [])
        if not cells:
            continue
        all_wh = [
            wb["height"]
            for cell in cells
            for wb in cell.get("word_boxes") or []
            if wb.get("height", 0) > 0
        ]
        if not all_wh:
            continue
        med_h = sorted(all_wh)[len(all_wh) // 2]
        oversized_threshold = med_h * 3
        removed_oversized = 0
        for cell in cells:
            wbs = cell.get("word_boxes") or []
            filtered = [wb for wb in wbs if wb.get("height", 0) <= oversized_threshold]
            if len(filtered) < len(wbs):
                removed_oversized += len(wbs) - len(filtered)
                cell["word_boxes"] = filtered
                # Rebuild the cell text from the surviving boxes in reading order.
                cell["text"] = " ".join(
                    (wb.get("text") or "").strip()
                    for wb in sorted(filtered, key=lambda w: (w.get("top", 0), w.get("left", 0)))
                    if (wb.get("text") or "").strip()
                )
        if removed_oversized:
            # Remove cells that became empty after oversized removal
            z["cells"] = [c for c in cells if c.get("word_boxes")]
            logger.info(
                "build-grid: removed %d oversized word_boxes (>%dpx) from zone %d",
                removed_oversized, oversized_threshold, z.get("zone_index", 0),
            )

    # 5. Color annotation on final word_boxes in cells
    if img_bgr is not None:
        all_wb: List[Dict] = []
        for z in zones_data:
            for cell in z.get("cells", []):
                # "or []" guards against a present-but-None word_boxes value,
                # matching the defensive access used in steps 4b/4c.
                all_wb.extend(cell.get("word_boxes") or [])
        detect_word_colors(img_bgr, all_wb)

    # 5a. Heading detection by color + height (after color is available)
    heading_count = _detect_heading_rows_by_color(zones_data, img_w, img_h)
    if heading_count:
        logger.info("Detected %d heading rows by color+height", heading_count)

    # 5b. Fix unmatched parentheses in cell text
    # OCR often misses opening "(" while detecting closing ")".
    # If a cell's text has ")" without a matching "(", prepend "(".
    for z in zones_data:
        for cell in z.get("cells", []):
            text = cell.get("text") or ""
            if ")" in text and "(" not in text:
                cell["text"] = "(" + text

    # 5c. IPA phonetic correction — replace garbled OCR phonetics with
    # correct IPA from the dictionary (same as in the OCR pipeline).
    # Only applies to vocabulary tables (≥3 columns: EN | article | DE).
    # Single/two-column layouts are continuous text, not vocab tables.
    all_cells = [cell for z in zones_data for cell in z.get("cells", [])]
    total_cols = sum(len(z.get("columns", [])) for z in zones_data)
    if total_cols >= 3:
        # Find the column that contains IPA brackets → English headwords.
        # Count cells with bracket patterns per col_type.  The column with
        # the most brackets is the headword column (IPA sits after or below
        # headwords).  Falls back to longest-average if no brackets found.
        col_bracket_count: Dict[str, int] = {}
        col_avg_len: Dict[str, List[int]] = {}
        for cell in all_cells:
            ct = cell.get("col_type", "")
            txt = cell.get("text", "") or ""
            col_avg_len.setdefault(ct, []).append(len(txt))
            if ct.startswith("column_") and '[' in txt:
                col_bracket_count[ct] = col_bracket_count.get(ct, 0) + 1
        # Pick column with most bracket IPA patterns
        en_col_type = None
        if col_bracket_count:
            en_col_type = max(col_bracket_count, key=col_bracket_count.get)
        else:
            # Fallback: longest average text
            best_avg = 0
            for ct, lengths in col_avg_len.items():
                if not ct.startswith("column_"):
                    continue
                avg = sum(lengths) / len(lengths) if lengths else 0
                if avg > best_avg:
                    best_avg = avg
                    en_col_type = ct
        if en_col_type:
            # Temporarily relabel the headword column as "column_en" for
            # fix_cell_phonetics, then restore the original column type.
            for cell in all_cells:
                if cell.get("col_type") == en_col_type:
                    cell["_orig_col_type"] = en_col_type
                    cell["col_type"] = "column_en"
            fix_cell_phonetics(all_cells, pronunciation="british")
            for cell in all_cells:
                orig = cell.pop("_orig_col_type", None)
                if orig:
                    cell["col_type"] = orig

            # 5d. Fix IPA continuation cells — cells where the printed
            # phonetic transcription wraps to a line below the headword.
            # These contain garbled IPA (e.g. "[n, nn]", "[1uedtX,1]").
            # Replace garbled text with proper IPA looked up from the
            # headword in the previous row's same column.
            # Note: We check ALL columns, not just en_col_type, because
            # the EN headword column may not be the longest-average column.
            ipa_cont_fixed = 0
            for z in zones_data:
                rows_sorted = sorted(z.get("rows", []), key=lambda r: r["index"])
                z_cells = z.get("cells", [])
                for idx, row in enumerate(rows_sorted):
                    if idx == 0:
                        continue  # the first row has no headword row above it
                    ri = row["index"]
                    row_cells = [c for c in z_cells if c.get("row_index") == ri]
                    for cell in row_cells:
                        ct = cell.get("col_type", "")
                        if not ct.startswith("column_"):
                            continue
                        cell_text = (cell.get("text") or "").strip()
                        # Only treat as continuation when text is entirely
                        # inside brackets — e.g. "[n, nn]", "[klaoz 'daun]".
                        # Text like "employee [im'ploi:]" has a headword
                        # OUTSIDE brackets and must NOT be overwritten.
                        if not (cell_text.startswith('[') and cell_text.endswith(']')):
                            continue
                        if not _text_has_garbled_ipa(cell_text):
                            continue
                        # Already has proper IPA brackets → already fixed
                        if re.search(r'\[[^\]]*[ˈˌəɪɛɒʊʌæɑɔʃʒθðŋ][^\]]*\]', cell_text):
                            continue
                        # Find headword in previous row, same column
                        prev_ri = rows_sorted[idx - 1]["index"]
                        prev_same_col = [
                            c for c in z_cells
                            if c.get("row_index") == prev_ri
                            and c.get("col_type") == ct
                        ]
                        if not prev_same_col:
                            continue
                        prev_text = prev_same_col[0].get("text", "")
                        fixed = fix_ipa_continuation_cell(
                            cell_text, prev_text, pronunciation="british",
                        )
                        if fixed != cell_text:
                            cell["text"] = fixed
                            ipa_cont_fixed += 1
                            logger.info(
                                "IPA continuation R%d %s: '%s' → '%s'",
                                ri, ct, cell_text, fixed,
                            )
            if ipa_cont_fixed:
                logger.info("Fixed %d IPA continuation cells", ipa_cont_fixed)

    duration = time.time() - t0

    # 6. Build result
    total_cells = sum(len(z.get("cells", [])) for z in zones_data)
    total_columns = sum(len(z.get("columns", [])) for z in zones_data)
    total_rows = sum(len(z.get("rows", [])) for z in zones_data)

    # Collect color statistics from all word_boxes in cells
    color_stats: Dict[str, int] = {}
    for z in zones_data:
        for cell in z.get("cells", []):
            for wb in cell.get("word_boxes") or []:
                cn = wb.get("color_name", "black")
                color_stats[cn] = color_stats.get(cn, 0) + 1

    # Compute layout metrics for faithful grid reconstruction
    all_content_row_heights: List[float] = []
    for z in zones_data:
        for row in z.get("rows", []):
            if not row.get("is_header", False):
                h = row.get("y_max_px", 0) - row.get("y_min_px", 0)
                if h > 0:
                    all_content_row_heights.append(h)
    avg_row_height = (
        sum(all_content_row_heights) / len(all_content_row_heights)
        if all_content_row_heights else 30.0
    )
    font_size_suggestion = max(10, int(avg_row_height * 0.6))

    result = {
        "session_id": session_id,
        "image_width": img_w,
        "image_height": img_h,
        "zones": zones_data,
        "boxes_detected": boxes_detected,
        "summary": {
            "total_zones": len(zones_data),
            "total_columns": total_columns,
            "total_rows": total_rows,
            "total_cells": total_cells,
            "total_words": len(all_words),
            "recovered_colored": recovered_count,
            "color_stats": color_stats,
        },
        "formatting": {
            "bold_columns": [],
            "header_rows": [],
        },
        "layout_metrics": {
            "page_width_px": img_w,
            "page_height_px": img_h,
            "avg_row_height_px": round(avg_row_height, 1),
            "font_size_suggestion_px": font_size_suggestion,
        },
        "duration_seconds": round(duration, 2),
    }

    return result
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Endpoints
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@router.post("/sessions/{session_id}/build-grid")
async def build_grid(session_id: str):
    """Compute and store the structured grid for a session.

    The session must already hold Kombi word results (paddle-kombi or
    rapid-kombi). The grid is computed by ``_build_grid_core``, written to
    the session as ``grid_editor_result`` and returned to the caller.

    Raises:
        HTTPException: 404 when the session does not exist, 400 when
            ``_build_grid_core`` rejects the session data with ValueError.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    try:
        grid_result = await _build_grid_core(session_id, session)
    except ValueError as err:
        raise HTTPException(status_code=400, detail=str(err))

    # Persist to DB
    await update_session_db(session_id, grid_editor_result=grid_result)

    summary = grid_result.get("summary", {})
    logger.info(
        "build-grid session %s: %d zones, %d cols, %d rows, %d cells, "
        "%d boxes in %.2fs",
        session_id,
        len(grid_result.get("zones", [])),
        summary.get("total_columns", 0),
        summary.get("total_rows", 0),
        summary.get("total_cells", 0),
        grid_result.get("boxes_detected", 0),
        grid_result.get("duration_seconds", 0),
    )

    return grid_result
|
||
|
||
|
||
@router.post("/sessions/{session_id}/save-grid")
async def save_grid(session_id: str, request: Request):
    """Save edited grid data from the frontend Excel-like editor.

    Receives the full StructuredGrid with user edits (text changes,
    formatting changes like bold columns, header rows, etc.) and
    persists it to the session's grid_editor_result. Metadata that the
    request omits is carried over from the previously stored result.

    Returns:
        ``{"session_id": ..., "saved": True}`` on success.

    Raises:
        HTTPException: 404 when the session does not exist; 400 when the
            body is not valid JSON, is not a JSON object, lacks ``zones``
            or ``zones`` is not a list.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    # Malformed JSON is a client error, not a server failure.
    try:
        body = await request.json()
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Request body is not valid JSON") from exc

    # Validate basic structure
    if not isinstance(body, dict) or "zones" not in body:
        raise HTTPException(status_code=400, detail="Missing 'zones' in request body")
    if not isinstance(body["zones"], list):
        raise HTTPException(status_code=400, detail="'zones' must be a list")

    # Preserve metadata from the original build
    existing = session.get("grid_editor_result") or {}
    result = {
        "session_id": session_id,
        "image_width": body.get("image_width", existing.get("image_width", 0)),
        "image_height": body.get("image_height", existing.get("image_height", 0)),
        "zones": body["zones"],
        "boxes_detected": body.get("boxes_detected", existing.get("boxes_detected", 0)),
        "summary": body.get("summary", existing.get("summary", {})),
        "formatting": body.get("formatting", existing.get("formatting", {})),
        "duration_seconds": existing.get("duration_seconds", 0),
        "edited": True,
    }

    # build-grid stores layout_metrics; keep them across saves instead of
    # silently dropping them. The key is only written when a value exists.
    layout_metrics = body.get("layout_metrics", existing.get("layout_metrics"))
    if layout_metrics is not None:
        result["layout_metrics"] = layout_metrics

    await update_session_db(session_id, grid_editor_result=result)

    logger.info("save-grid session %s: %d zones saved", session_id, len(body["zones"]))

    return {"session_id": session_id, "saved": True}
|
||
|
||
|
||
@router.get("/sessions/{session_id}/grid-editor")
async def get_grid(session_id: str):
    """Return the stored grid editor result of a session.

    Raises:
        HTTPException: 404 when the session does not exist or holds no
            ``grid_editor_result`` yet.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    stored_grid = session.get("grid_editor_result")
    if stored_grid:
        return stored_grid

    raise HTTPException(
        status_code=404,
        detail="No grid editor data. Run build-grid first.",
    )
|