Restructure: Move grid_* + vocab_* into packages (klausur-service)
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 28s
CI / test-go-edu-search (push) Successful in 29s
CI / test-python-klausur (push) Failing after 2m31s
CI / test-python-agent-core (push) Successful in 20s
CI / test-nodejs-website (push) Successful in 23s

grid/ package (16 files):
  grid/build/   — core, zones, cleanup, text_ops, cell_ops, finalize
  grid/editor/  — api, helpers, columns, filters, headers, zones

vocab/ package (10 files):
  vocab/worksheet/ — api, models, extraction, generation, ocr, upload, analysis, compare
  vocab/           — session_store, learn_bridge

26 backward-compat shims. Internal imports relative. RAG untouched.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-04-25 21:30:20 +02:00
parent 098a2ff092
commit 59c400b9aa
58 changed files with 8803 additions and 8659 deletions
@@ -0,0 +1,15 @@
"""
Grid Editor sub-package — FastAPI endpoints and helper functions.
Modules:
- api — barrel re-export (combined router + _build_grid_core)
- api_grid — build-grid, save-grid, get-grid endpoints
- api_gutter — gutter-repair endpoints
- api_box — build-box-grids endpoints
- api_unified — build-unified-grid endpoints
- helpers — barrel re-export of all helper symbols
- columns — column detection, cross-column splitting
- filters — word/zone filtering, border ghosts
- headers — header/heading detection, colspan detection
- zones — vertical dividers, zone splitting/merging
"""
@@ -0,0 +1,31 @@
"""
Grid Editor API — barrel re-export.
The actual endpoints live in:
- grid_editor_api_grid.py (build-grid, rerun-ocr, save-grid, get-grid)
- grid_editor_api_gutter.py (gutter-repair, gutter-repair/apply)
- grid_editor_api_box.py (build-box-grids)
- grid_editor_api_unified.py (build-unified-grid, unified-grid)
This module re-exports the combined router and key symbols so that
existing `from grid_editor_api import router` / `from grid_editor_api import _build_grid_core`
continue to work unchanged.
"""
from fastapi import APIRouter
from .api_grid import router as _grid_router
from .api_gutter import router as _gutter_router
from .api_box import router as _box_router
from .api_unified import router as _unified_router
# Re-export _build_grid_core so callers that do
# `from grid_editor_api import _build_grid_core` keep working.
from grid.build.core import _build_grid_core # noqa: F401
# Merge all sub-routers into one combined router
router = APIRouter()
router.include_router(_grid_router)
router.include_router(_gutter_router)
router.include_router(_box_router)
router.include_router(_unified_router)
@@ -0,0 +1,177 @@
"""
Grid Editor API — box-grid-review endpoints.
"""
import logging
from fastapi import APIRouter, HTTPException, Request
from .filters import _words_in_zone
from ocr_pipeline_session_store import (
get_session_db,
update_session_db,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/ocr-pipeline", tags=["grid-editor"])
@router.post("/sessions/{session_id}/build-box-grids")
async def build_box_grids(session_id: str, request: Request):
"""Rebuild grid structure for all detected boxes with layout-aware detection.
Uses structure_result.boxes (from Step 7) as the source of box coordinates,
and raw_paddle_words as OCR word source. Creates or updates box zones in
the grid_editor_result.
Optional body: { "overrides": { "0": "bullet_list" } }
Maps box_index -> forced layout_type.
"""
session = await get_session_db(session_id)
if not session:
raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
grid_data = session.get("grid_editor_result")
if not grid_data:
raise HTTPException(status_code=400, detail="No grid data. Run build-grid first.")
# Get raw OCR words (with top/left/width/height keys)
word_result = session.get("word_result") or {}
all_words = word_result.get("raw_paddle_words") or word_result.get("raw_tesseract_words") or []
if not all_words:
raise HTTPException(status_code=400, detail="No raw OCR words available.")
# Get detected boxes from structure_result
structure_result = session.get("structure_result") or {}
gt = session.get("ground_truth") or {}
if not structure_result:
structure_result = gt.get("structure_result") or {}
detected_boxes = structure_result.get("boxes") or []
if not detected_boxes:
return {"session_id": session_id, "box_zones_rebuilt": 0, "spell_fixes": 0, "message": "No boxes detected"}
# Filter out false-positive boxes in header/footer margins.
img_h_for_filter = grid_data.get("image_height", 0) or word_result.get("image_height", 0)
if img_h_for_filter > 0:
margin_frac = 0.07 # 7% of image height
margin_top = img_h_for_filter * margin_frac
margin_bottom = img_h_for_filter * (1 - margin_frac)
filtered = []
for box in detected_boxes:
by = box.get("y", 0)
bh = box.get("h", 0)
box_center_y = by + bh / 2
if box_center_y < margin_top or box_center_y > margin_bottom:
logger.info("build-box-grids: skipping header/footer box at y=%d h=%d (center=%.0f, margins=%.0f/%.0f)",
by, bh, box_center_y, margin_top, margin_bottom)
continue
filtered.append(box)
detected_boxes = filtered
body = {}
try:
body = await request.json()
except Exception:
pass
layout_overrides = body.get("overrides", {})
from cv_box_layout import build_box_zone_grid
img_w = grid_data.get("image_width", 0) or word_result.get("image_width", 0)
img_h = grid_data.get("image_height", 0) or word_result.get("image_height", 0)
zones = grid_data.get("zones", [])
# Find highest existing zone_index
max_zone_idx = max((z.get("zone_index", 0) for z in zones), default=-1)
# Remove old box zones (we'll rebuild them)
zones = [z for z in zones if z.get("zone_type") != "box"]
box_count = 0
spell_fixes = 0
for box_idx, box in enumerate(detected_boxes):
bx = box.get("x", 0)
by = box.get("y", 0)
bw = box.get("w", 0)
bh = box.get("h", 0)
if bw <= 0 or bh <= 0:
continue
# Filter raw OCR words inside this box
zone_words = _words_in_zone(all_words, by, bh, bx, bw)
if not zone_words:
logger.info("Box %d: no words found in bbox (%d,%d,%d,%d)", box_idx, bx, by, bw, bh)
continue
zone_idx = max_zone_idx + 1 + box_idx
forced_layout = layout_overrides.get(str(box_idx))
# Build box grid
box_grid = build_box_zone_grid(
zone_words, bx, by, bw, bh,
zone_idx, img_w, img_h,
layout_type=forced_layout,
)
# Apply SmartSpellChecker to all box cells
try:
from smart_spell import SmartSpellChecker
ssc = SmartSpellChecker()
for cell in box_grid.get("cells", []):
text = cell.get("text", "")
if not text:
continue
result = ssc.correct_text(text, lang="auto")
if result.changed:
cell["text"] = result.corrected
spell_fixes += 1
except ImportError:
pass
# Build zone entry
zone_entry = {
"zone_index": zone_idx,
"zone_type": "box",
"bbox_px": {"x": bx, "y": by, "w": bw, "h": bh},
"bbox_pct": {
"x": round(bx / img_w * 100, 2) if img_w else 0,
"y": round(by / img_h * 100, 2) if img_h else 0,
"w": round(bw / img_w * 100, 2) if img_w else 0,
"h": round(bh / img_h * 100, 2) if img_h else 0,
},
"border": None,
"word_count": len(zone_words),
"columns": box_grid["columns"],
"rows": box_grid["rows"],
"cells": box_grid["cells"],
"header_rows": box_grid.get("header_rows", []),
"box_layout_type": box_grid.get("box_layout_type", "flowing"),
"box_grid_reviewed": False,
"box_bg_color": box.get("bg_color_name", ""),
"box_bg_hex": box.get("bg_color_hex", ""),
}
zones.append(zone_entry)
box_count += 1
# Sort zones by y-position for correct reading order
zones.sort(key=lambda z: z.get("bbox_px", {}).get("y", 0))
grid_data["zones"] = zones
await update_session_db(session_id, grid_editor_result=grid_data)
logger.info(
"build-box-grids session %s: %d boxes processed (%d words spell-fixed) from %d detected",
session_id, box_count, spell_fixes, len(detected_boxes),
)
return {
"session_id": session_id,
"box_zones_rebuilt": box_count,
"total_detected_boxes": len(detected_boxes),
"spell_fixes": spell_fixes,
"zones": zones,
}
@@ -0,0 +1,334 @@
"""
Grid Editor API — grid build, save, and retrieve endpoints.
"""
import logging
from fastapi import APIRouter, HTTPException, Query, Request
from grid.build.core import _build_grid_core
from ocr_pipeline_session_store import (
get_session_db,
update_session_db,
)
from ocr_pipeline_common import (
_cache,
_load_session_to_cache,
_get_cached,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/ocr-pipeline", tags=["grid-editor"])
@router.post("/sessions/{session_id}/build-grid")
async def build_grid(
session_id: str,
ipa_mode: str = Query("auto", pattern="^(auto|all|de|en|none)$"),
syllable_mode: str = Query("auto", pattern="^(auto|all|de|en|none)$"),
enhance: bool = Query(True, description="Step 3: CLAHE + denoise for degraded scans"),
max_cols: int = Query(0, description="Step 2: Max column count (0=unlimited)"),
min_conf: int = Query(0, description="Step 1: Min OCR confidence (0=auto)"),
):
"""Build a structured, zone-aware grid from existing Kombi word results.
Requires that paddle-kombi or rapid-kombi has already been run on the session.
Uses the image for box detection and the word positions for grid structuring.
Query params:
ipa_mode: "auto" (only when English IPA detected), "all" (force), "none" (skip)
syllable_mode: "auto" (only when original has dividers), "all" (force), "none" (skip)
Returns a StructuredGrid with zones, each containing their own
columns, rows, and cells — ready for the frontend Excel-like editor.
"""
session = await get_session_db(session_id)
if not session:
raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
try:
result = await _build_grid_core(
session_id, session,
ipa_mode=ipa_mode, syllable_mode=syllable_mode,
enhance=enhance,
max_columns=max_cols if max_cols > 0 else None,
min_conf=min_conf if min_conf > 0 else None,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
# Save automatic grid snapshot for later comparison with manual corrections
# Lazy import to avoid circular dependency with ocr_pipeline_regression
from ocr_pipeline_regression import _build_reference_snapshot
wr = session.get("word_result") or {}
engine = wr.get("ocr_engine", "")
if engine in ("kombi", "rapid_kombi"):
auto_pipeline = "kombi"
elif engine == "paddle_direct":
auto_pipeline = "paddle-direct"
else:
auto_pipeline = "pipeline"
auto_snapshot = _build_reference_snapshot(result, pipeline=auto_pipeline)
gt = session.get("ground_truth") or {}
gt["auto_grid_snapshot"] = auto_snapshot
# Persist to DB and advance current_step to 11 (reconstruction complete)
await update_session_db(session_id, grid_editor_result=result, ground_truth=gt, current_step=11)
logger.info(
"build-grid session %s: %d zones, %d cols, %d rows, %d cells, "
"%d boxes in %.2fs",
session_id,
len(result.get("zones", [])),
result.get("summary", {}).get("total_columns", 0),
result.get("summary", {}).get("total_rows", 0),
result.get("summary", {}).get("total_cells", 0),
result.get("boxes_detected", 0),
result.get("duration_seconds", 0),
)
return result
@router.post("/sessions/{session_id}/rerun-ocr-and-build-grid")
async def rerun_ocr_and_build_grid(
session_id: str,
ipa_mode: str = Query("auto", pattern="^(auto|all|de|en|none)$"),
syllable_mode: str = Query("auto", pattern="^(auto|all|de|en|none)$"),
enhance: bool = Query(True, description="Step 3: CLAHE + denoise for degraded scans"),
max_cols: int = Query(0, description="Step 2: Max column count (0=unlimited)"),
min_conf: int = Query(0, description="Step 1: Min OCR confidence (0=auto)"),
vision_fusion: bool = Query(False, description="Step 4: Vision-LLM fusion for degraded scans"),
doc_category: str = Query("", description="Document type for Vision-LLM prompt context"),
):
"""Re-run OCR with quality settings, then rebuild the grid.
Unlike build-grid (which only rebuilds from existing words),
this endpoint re-runs the full OCR pipeline on the cropped image
with optional CLAHE enhancement, then builds the grid.
Steps executed: Image Enhancement -> OCR -> Grid Build
"""
session = await get_session_db(session_id)
if not session:
raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
import time as _time
t0 = _time.time()
# 1. Load the cropped/dewarped image from cache or session
if session_id not in _cache:
await _load_session_to_cache(session_id)
cached = _get_cached(session_id)
dewarped_bgr = cached.get("cropped_bgr") if cached.get("cropped_bgr") is not None else cached.get("dewarped_bgr")
if dewarped_bgr is None:
raise HTTPException(status_code=400, detail="No cropped/dewarped image available. Run preprocessing steps first.")
img_h, img_w = dewarped_bgr.shape[:2]
ocr_input = dewarped_bgr.copy()
# 2. Scan quality assessment
scan_quality_info = {}
try:
from scan_quality import score_scan_quality
quality_report = score_scan_quality(ocr_input)
scan_quality_info = quality_report.to_dict()
actual_min_conf = min_conf if min_conf > 0 else quality_report.recommended_min_conf
except Exception as e:
logger.warning(f"rerun-ocr: scan quality failed: {e}")
actual_min_conf = min_conf if min_conf > 0 else 40
# 3. Image enhancement (Step 3)
is_degraded = scan_quality_info.get("is_degraded", False)
if enhance and is_degraded:
try:
from ocr_image_enhance import enhance_for_ocr
ocr_input = enhance_for_ocr(ocr_input, is_degraded=True)
logger.info("rerun-ocr: CLAHE enhancement applied")
except Exception as e:
logger.warning(f"rerun-ocr: enhancement failed: {e}")
# 4. Run dual-engine OCR
from PIL import Image
import pytesseract
# RapidOCR
rapid_words = []
try:
from cv_ocr_engines import ocr_region_rapid
from cv_vocab_types import PageRegion
full_region = PageRegion(type="full_page", x=0, y=0, width=img_w, height=img_h)
rapid_words = ocr_region_rapid(ocr_input, full_region) or []
except Exception as e:
logger.warning(f"rerun-ocr: RapidOCR failed: {e}")
# Tesseract
pil_img = Image.fromarray(ocr_input[:, :, ::-1])
data = pytesseract.image_to_data(pil_img, lang='eng+deu', config='--psm 6 --oem 3', output_type=pytesseract.Output.DICT)
tess_words = []
for i in range(len(data["text"])):
text = (data["text"][i] or "").strip()
conf_raw = str(data["conf"][i])
conf = int(conf_raw) if conf_raw.lstrip("-").isdigit() else -1
if not text or conf < actual_min_conf:
continue
tess_words.append({
"text": text, "left": data["left"][i], "top": data["top"][i],
"width": data["width"][i], "height": data["height"][i], "conf": conf,
})
# 5. Merge OCR results
from ocr_pipeline_ocr_merge import _split_paddle_multi_words, _merge_paddle_tesseract, _deduplicate_words
rapid_split = _split_paddle_multi_words(rapid_words) if rapid_words else []
if rapid_split or tess_words:
merged_words = _merge_paddle_tesseract(rapid_split, tess_words)
merged_words = _deduplicate_words(merged_words)
else:
merged_words = tess_words
# 6. Store updated word_result in session
cells_for_storage = [{"text": w["text"], "left": w["left"], "top": w["top"],
"width": w["width"], "height": w["height"], "conf": w.get("conf", 0)}
for w in merged_words]
word_result = {
"cells": [{"text": " ".join(w["text"] for w in merged_words),
"word_boxes": cells_for_storage}],
"image_width": img_w,
"image_height": img_h,
"ocr_engine": "rapid_kombi",
"word_count": len(merged_words),
"raw_paddle_words": rapid_words,
}
# 6b. Vision-LLM Fusion (Step 4) — correct OCR using Vision model
vision_applied = False
if vision_fusion:
try:
from vision_ocr_fusion import vision_fuse_ocr
category = doc_category or session.get("document_category") or "vokabelseite"
logger.info(f"rerun-ocr: running Vision-LLM fusion (category={category})")
merged_words = await vision_fuse_ocr(ocr_input, merged_words, category)
vision_applied = True
# Rebuild storage from fused words
cells_for_storage = [{"text": w["text"], "left": w["left"], "top": w["top"],
"width": w["width"], "height": w["height"], "conf": w.get("conf", 0)}
for w in merged_words]
word_result["cells"] = [{"text": " ".join(w["text"] for w in merged_words),
"word_boxes": cells_for_storage}]
word_result["word_count"] = len(merged_words)
word_result["ocr_engine"] = "vision_fusion"
except Exception as e:
logger.warning(f"rerun-ocr: Vision-LLM fusion failed: {e}")
await update_session_db(session_id, word_result=word_result)
# Reload session with updated word_result
session = await get_session_db(session_id)
ocr_duration = _time.time() - t0
logger.info(
"rerun-ocr session %s: %d words (rapid=%d, tess=%d, merged=%d) in %.1fs "
"(enhance=%s, min_conf=%d, quality=%s)",
session_id, len(merged_words), len(rapid_words), len(tess_words),
len(merged_words), ocr_duration, enhance, actual_min_conf,
scan_quality_info.get("quality_pct", "?"),
)
# 7. Build grid from new words
try:
result = await _build_grid_core(
session_id, session,
ipa_mode=ipa_mode, syllable_mode=syllable_mode,
enhance=enhance,
max_columns=max_cols if max_cols > 0 else None,
min_conf=min_conf if min_conf > 0 else None,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
# Persist grid
await update_session_db(session_id, grid_editor_result=result, current_step=11)
# Add quality info to response
result["scan_quality"] = scan_quality_info
result["ocr_stats"] = {
"rapid_words": len(rapid_words),
"tess_words": len(tess_words),
"merged_words": len(merged_words),
"min_conf_used": actual_min_conf,
"enhance_applied": enhance and is_degraded,
"vision_fusion_applied": vision_applied,
"document_category": doc_category or session.get("document_category", ""),
"ocr_duration_seconds": round(ocr_duration, 1),
}
total_duration = _time.time() - t0
logger.info(
"rerun-ocr+build-grid session %s: %d zones, %d cols, %d cells in %.1fs",
session_id,
len(result.get("zones", [])),
result.get("summary", {}).get("total_columns", 0),
result.get("summary", {}).get("total_cells", 0),
total_duration,
)
return result
@router.post("/sessions/{session_id}/save-grid")
async def save_grid(session_id: str, request: Request):
"""Save edited grid data from the frontend Excel-like editor.
Receives the full StructuredGrid with user edits (text changes,
formatting changes like bold columns, header rows, etc.) and
persists it to the session's grid_editor_result.
"""
session = await get_session_db(session_id)
if not session:
raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
body = await request.json()
# Validate basic structure
if "zones" not in body:
raise HTTPException(status_code=400, detail="Missing 'zones' in request body")
# Preserve metadata from the original build
existing = session.get("grid_editor_result") or {}
result = {
"session_id": session_id,
"image_width": body.get("image_width", existing.get("image_width", 0)),
"image_height": body.get("image_height", existing.get("image_height", 0)),
"zones": body["zones"],
"boxes_detected": body.get("boxes_detected", existing.get("boxes_detected", 0)),
"summary": body.get("summary", existing.get("summary", {})),
"formatting": body.get("formatting", existing.get("formatting", {})),
"duration_seconds": existing.get("duration_seconds", 0),
"edited": True,
}
await update_session_db(session_id, grid_editor_result=result, current_step=11)
logger.info("save-grid session %s: %d zones saved", session_id, len(body["zones"]))
return {"session_id": session_id, "saved": True}
@router.get("/sessions/{session_id}/grid-editor")
async def get_grid(session_id: str):
"""Retrieve the current grid editor state for a session."""
session = await get_session_db(session_id)
if not session:
raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
result = session.get("grid_editor_result")
if not result:
raise HTTPException(
status_code=404,
detail="No grid editor data. Run build-grid first.",
)
return result
@@ -0,0 +1,110 @@
"""
Grid Editor API — gutter repair endpoints.
"""
import logging
from fastapi import APIRouter, HTTPException, Request
from ocr_pipeline_session_store import (
get_session_db,
update_session_db,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/ocr-pipeline", tags=["grid-editor"])
@router.post("/sessions/{session_id}/gutter-repair")
async def gutter_repair(session_id: str):
"""Analyse grid for gutter-edge OCR errors and return repair suggestions.
Detects:
- Words truncated/blurred at the book binding (spell_fix)
- Words split across rows with missing hyphen chars (hyphen_join)
"""
session = await get_session_db(session_id)
if not session:
raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
grid_data = session.get("grid_editor_result")
if not grid_data:
raise HTTPException(
status_code=400,
detail="No grid data. Run build-grid first.",
)
from cv_gutter_repair import analyse_grid_for_gutter_repair
image_width = grid_data.get("image_width", 0)
result = analyse_grid_for_gutter_repair(grid_data, image_width=image_width)
# Persist suggestions in ground_truth.gutter_repair (avoids DB migration)
gt = session.get("ground_truth") or {}
gt["gutter_repair"] = result
await update_session_db(session_id, ground_truth=gt)
logger.info(
"gutter-repair session %s: %d suggestions in %.2fs",
session_id,
result.get("stats", {}).get("suggestions_found", 0),
result.get("duration_seconds", 0),
)
return result
@router.post("/sessions/{session_id}/gutter-repair/apply")
async def gutter_repair_apply(session_id: str, request: Request):
"""Apply accepted gutter repair suggestions to the grid.
Body: { "accepted": ["suggestion_id_1", "suggestion_id_2", ...] }
"""
session = await get_session_db(session_id)
if not session:
raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
grid_data = session.get("grid_editor_result")
if not grid_data:
raise HTTPException(status_code=400, detail="No grid data.")
gt = session.get("ground_truth") or {}
gutter_result = gt.get("gutter_repair")
if not gutter_result:
raise HTTPException(
status_code=400,
detail="No gutter repair data. Run gutter-repair first.",
)
body = await request.json()
accepted_ids = body.get("accepted", [])
if not accepted_ids:
return {"applied_count": 0, "changes": []}
# text_overrides: { suggestion_id: "alternative_text" }
# Allows the user to pick a different correction from the alternatives list
text_overrides = body.get("text_overrides", {})
from cv_gutter_repair import apply_gutter_suggestions
suggestions = gutter_result.get("suggestions", [])
# Apply user-selected alternatives before passing to apply
for s in suggestions:
sid = s.get("id", "")
if sid in text_overrides and text_overrides[sid]:
s["suggested_text"] = text_overrides[sid]
result = apply_gutter_suggestions(grid_data, accepted_ids, suggestions)
# Save updated grid back to session
await update_session_db(session_id, grid_editor_result=grid_data)
logger.info(
"gutter-repair/apply session %s: %d changes applied",
session_id,
result.get("applied_count", 0),
)
return result
@@ -0,0 +1,71 @@
"""
Grid Editor API unified grid endpoints.
"""
import logging
from fastapi import APIRouter, HTTPException
from ocr_pipeline_session_store import (
get_session_db,
update_session_db,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/ocr-pipeline", tags=["grid-editor"])
@router.post("/sessions/{session_id}/build-unified-grid")
async def build_unified_grid_endpoint(session_id: str):
"""Build a single-zone unified grid merging content + box zones.
Takes the existing multi-zone grid_editor_result and produces a
unified grid where boxes are integrated into the main row sequence.
Persists as unified_grid_result (preserves original multi-zone data).
"""
session = await get_session_db(session_id)
if not session:
raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
grid_data = session.get("grid_editor_result")
if not grid_data:
raise HTTPException(status_code=400, detail="No grid data. Run build-grid first.")
from unified_grid import build_unified_grid
result = build_unified_grid(
zones=grid_data.get("zones", []),
image_width=grid_data.get("image_width", 0),
image_height=grid_data.get("image_height", 0),
layout_metrics=grid_data.get("layout_metrics", {}),
)
# Persist as separate field (don't overwrite original multi-zone grid)
await update_session_db(session_id, unified_grid_result=result)
logger.info(
"build-unified-grid session %s: %d rows, %d cells",
session_id,
result.get("summary", {}).get("total_rows", 0),
result.get("summary", {}).get("total_cells", 0),
)
return result
@router.get("/sessions/{session_id}/unified-grid")
async def get_unified_grid(session_id: str):
"""Retrieve the unified grid for a session."""
session = await get_session_db(session_id)
if not session:
raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
result = session.get("unified_grid_result")
if not result:
raise HTTPException(
status_code=404,
detail="No unified grid. Run build-unified-grid first.",
)
return result
@@ -0,0 +1,492 @@
"""
Grid Editor column detection, cross-column splitting, marker merging.
Split from grid_editor_helpers.py for maintainability.
All functions are pure computation no HTTP, DB, or session side effects.
Lizenz: Apache 2.0 (kommerziell nutzbar)
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
"""
import logging
import re
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Cross-column word splitting
# ---------------------------------------------------------------------------
_spell_cache: Optional[Any] = None
_spell_loaded = False
def _is_recognized_word(text: str) -> bool:
"""Check if *text* is a recognized German or English word.
Uses the spellchecker library (same as cv_syllable_detect.py).
Returns True for real words like "oder", "Kabel", "Zeitung".
Returns False for OCR merge artifacts like "sichzie", "dasZimmer".
"""
global _spell_cache, _spell_loaded
if not text or len(text) < 2:
return False
if not _spell_loaded:
_spell_loaded = True
try:
from spellchecker import SpellChecker
_spell_cache = SpellChecker(language="de")
except Exception:
pass
if _spell_cache is None:
return False
return text.lower() in _spell_cache
def _split_cross_column_words(
words: List[Dict],
columns: List[Dict],
) -> List[Dict]:
"""Split word boxes that span across column boundaries.
When OCR merges adjacent words from different columns (e.g. "sichzie"
spanning Col 1 and Col 2, or "dasZimmer" crossing the boundary),
split the word box at the column boundary so each piece is assigned
to the correct column.
Only splits when:
- The word has significant overlap (>15% of its width) on both sides
- AND the word is not a recognized real word (OCR merge artifact), OR
the word contains a case transition (lowercase->uppercase) near the
boundary indicating two merged words like "dasZimmer".
"""
if len(columns) < 2:
return words
# Column boundaries = midpoints between adjacent column edges
boundaries = []
for i in range(len(columns) - 1):
boundary = (columns[i]["x_max"] + columns[i + 1]["x_min"]) / 2
boundaries.append(boundary)
new_words: List[Dict] = []
split_count = 0
for w in words:
w_left = w["left"]
w_width = w["width"]
w_right = w_left + w_width
text = (w.get("text") or "").strip()
if not text or len(text) < 4 or w_width < 10:
new_words.append(w)
continue
# Find the first boundary this word straddles significantly
split_boundary = None
for b in boundaries:
if w_left < b < w_right:
left_part = b - w_left
right_part = w_right - b
# Both sides must have at least 15% of the word width
if left_part > w_width * 0.15 and right_part > w_width * 0.15:
split_boundary = b
break
if split_boundary is None:
new_words.append(w)
continue
# Compute approximate split position in the text.
left_width = split_boundary - w_left
split_ratio = left_width / w_width
approx_pos = len(text) * split_ratio
# Strategy 1: look for a case transition (lowercase->uppercase) near
# the approximate split point — e.g. "dasZimmer" splits at 'Z'.
split_char = None
search_lo = max(1, int(approx_pos) - 3)
search_hi = min(len(text), int(approx_pos) + 2)
for i in range(search_lo, search_hi):
if text[i - 1].islower() and text[i].isupper():
split_char = i
break
# Strategy 2: if no case transition, only split if the whole word
# is NOT a real word (i.e. it's an OCR merge artifact like "sichzie").
# Real words like "oder", "Kabel", "Zeitung" must not be split.
if split_char is None:
clean = re.sub(r"[,;:.!?]+$", "", text) # strip trailing punct
if _is_recognized_word(clean):
new_words.append(w)
continue
# Not a real word — use floor of proportional position
split_char = max(1, min(len(text) - 1, int(approx_pos)))
left_text = text[:split_char].rstrip()
right_text = text[split_char:].lstrip()
if len(left_text) < 2 or len(right_text) < 2:
new_words.append(w)
continue
right_width = w_width - round(left_width)
new_words.append({
**w,
"text": left_text,
"width": round(left_width),
})
new_words.append({
**w,
"text": right_text,
"left": round(split_boundary),
"width": right_width,
})
split_count += 1
logger.info(
"split cross-column word %r -> %r + %r at boundary %.0f",
text, left_text, right_text, split_boundary,
)
if split_count:
logger.info("split %d cross-column word(s)", split_count)
return new_words
def _cluster_columns_by_alignment(
words: List[Dict],
zone_w: int,
rows: List[Dict],
) -> List[Dict[str, Any]]:
"""Detect columns by clustering left-edge alignment across rows.
Hybrid approach:
1. Group words by row, find "group start" positions within each row
(words preceded by a large gap or first word in row)
2. Cluster group-start left-edges by X-proximity across rows
3. Filter by row coverage (how many rows have a group start here)
4. Merge nearby clusters
5. Build column boundaries
This filters out mid-phrase word positions (e.g. IPA transcriptions,
second words in multi-word entries) by only considering positions
where a new word group begins within a row.
"""
if not words or not rows:
return []
total_rows = len(rows)
if total_rows == 0:
return []
# --- Group words by row ---
row_words: Dict[int, List[Dict]] = {}
for w in words:
y_center = w["top"] + w["height"] / 2
best = min(rows, key=lambda r: abs(r["y_center"] - y_center))
row_words.setdefault(best["index"], []).append(w)
# --- Compute adaptive gap threshold for group-start detection ---
all_gaps: List[float] = []
for ri, rw_list in row_words.items():
sorted_rw = sorted(rw_list, key=lambda w: w["left"])
for i in range(len(sorted_rw) - 1):
right = sorted_rw[i]["left"] + sorted_rw[i]["width"]
gap = sorted_rw[i + 1]["left"] - right
if gap > 0:
all_gaps.append(gap)
if all_gaps:
sorted_gaps = sorted(all_gaps)
median_gap = sorted_gaps[len(sorted_gaps) // 2]
heights = [w["height"] for w in words if w.get("height", 0) > 0]
median_h = sorted(heights)[len(heights) // 2] if heights else 25
# For small word counts (boxes, sub-zones): PaddleOCR returns
# multi-word blocks, so ALL inter-word gaps are potential column
# boundaries. Use a low threshold based on word height — any gap
# wider than ~1x median word height is a column separator.
if len(words) <= 60:
gap_threshold = max(median_h * 1.0, 25)
logger.info(
"alignment columns (small zone): gap_threshold=%.0f "
"(median_h=%.0f, %d words, %d gaps: %s)",
gap_threshold, median_h, len(words), len(sorted_gaps),
[int(g) for g in sorted_gaps[:10]],
)
else:
# Standard approach for large zones (full pages)
gap_threshold = max(median_gap * 3, median_h * 1.5, 30)
# Cap at 25% of zone width
max_gap = zone_w * 0.25
if gap_threshold > max_gap > 30:
logger.info("alignment columns: capping gap_threshold %.0f -> %.0f (25%% of zone_w=%d)", gap_threshold, max_gap, zone_w)
gap_threshold = max_gap
else:
gap_threshold = 50
# --- Find group-start positions (left-edges that begin a new column) ---
start_positions: List[tuple] = [] # (left_edge, row_index)
for ri, rw_list in row_words.items():
sorted_rw = sorted(rw_list, key=lambda w: w["left"])
# First word in row is always a group start
start_positions.append((sorted_rw[0]["left"], ri))
for i in range(1, len(sorted_rw)):
right_prev = sorted_rw[i - 1]["left"] + sorted_rw[i - 1]["width"]
gap = sorted_rw[i]["left"] - right_prev
if gap >= gap_threshold:
start_positions.append((sorted_rw[i]["left"], ri))
start_positions.sort(key=lambda x: x[0])
logger.info(
"alignment columns: %d group-start positions from %d words "
"(gap_threshold=%.0f, %d rows)",
len(start_positions), len(words), gap_threshold, total_rows,
)
if not start_positions:
x_min = min(w["left"] for w in words)
x_max = max(w["left"] + w["width"] for w in words)
return [{"index": 0, "type": "column_text", "x_min": x_min, "x_max": x_max}]
# --- Cluster group-start positions by X-proximity ---
tolerance = max(10, int(zone_w * 0.01))
clusters: List[Dict[str, Any]] = []
cur_edges = [start_positions[0][0]]
cur_rows = {start_positions[0][1]}
for left, row_idx in start_positions[1:]:
if left - cur_edges[-1] <= tolerance:
cur_edges.append(left)
cur_rows.add(row_idx)
else:
clusters.append({
"mean_x": int(sum(cur_edges) / len(cur_edges)),
"min_edge": min(cur_edges),
"max_edge": max(cur_edges),
"count": len(cur_edges),
"distinct_rows": len(cur_rows),
"row_coverage": len(cur_rows) / total_rows,
})
cur_edges = [left]
cur_rows = {row_idx}
clusters.append({
"mean_x": int(sum(cur_edges) / len(cur_edges)),
"min_edge": min(cur_edges),
"max_edge": max(cur_edges),
"count": len(cur_edges),
"distinct_rows": len(cur_rows),
"row_coverage": len(cur_rows) / total_rows,
})
# --- Filter by row coverage ---
# These thresholds must be high enough to avoid false columns in flowing
# text (random inter-word gaps) while still detecting real columns in
# vocabulary worksheets (which typically have >80% row coverage).
MIN_COVERAGE_PRIMARY = 0.35
MIN_COVERAGE_SECONDARY = 0.12
MIN_WORDS_SECONDARY = 4
MIN_DISTINCT_ROWS = 3
# Content boundary for left-margin detection
content_x_min = min(w["left"] for w in words)
content_x_max = max(w["left"] + w["width"] for w in words)
content_span = content_x_max - content_x_min
primary = [
c for c in clusters
if c["row_coverage"] >= MIN_COVERAGE_PRIMARY
and c["distinct_rows"] >= MIN_DISTINCT_ROWS
]
primary_ids = {id(c) for c in primary}
secondary = [
c for c in clusters
if id(c) not in primary_ids
and c["row_coverage"] >= MIN_COVERAGE_SECONDARY
and c["count"] >= MIN_WORDS_SECONDARY
and c["distinct_rows"] >= MIN_DISTINCT_ROWS
]
# Tertiary: narrow left-margin columns (page refs, markers) that have
# too few rows for secondary but are clearly left-aligned and separated
# from the main content. These appear at the far left or far right and
# have a large gap to the nearest significant cluster.
used_ids = {id(c) for c in primary} | {id(c) for c in secondary}
sig_xs = [c["mean_x"] for c in primary + secondary]
# Tertiary: clusters that are clearly to the LEFT of the first
# significant column (or RIGHT of the last). If words consistently
# start at a position left of the established first column boundary,
# they MUST be a separate column — regardless of how few rows they
# cover. The only requirement is a clear spatial gap.
MIN_COVERAGE_TERTIARY = 0.02 # at least 1 row effectively
tertiary = []
for c in clusters:
if id(c) in used_ids:
continue
if c["distinct_rows"] < 1:
continue
if c["row_coverage"] < MIN_COVERAGE_TERTIARY:
continue
# Must be near left or right content margin (within 15%)
rel_pos = (c["mean_x"] - content_x_min) / content_span if content_span else 0.5
if not (rel_pos < 0.15 or rel_pos > 0.85):
continue
# Must have significant gap to nearest significant cluster
if sig_xs:
min_dist = min(abs(c["mean_x"] - sx) for sx in sig_xs)
if min_dist < max(30, content_span * 0.02):
continue
tertiary.append(c)
if tertiary:
for c in tertiary:
logger.info(
" tertiary (margin) cluster: x=%d (range %d-%d), %d words, %d rows (%.0f%%)",
c["mean_x"], c["min_edge"], c["max_edge"],
c["count"], c["distinct_rows"], c["row_coverage"] * 100,
)
significant = sorted(primary + secondary + tertiary, key=lambda c: c["mean_x"])
for c in significant:
logger.info(
" significant cluster: x=%d (range %d-%d), %d words, %d rows (%.0f%%)",
c["mean_x"], c["min_edge"], c["max_edge"],
c["count"], c["distinct_rows"], c["row_coverage"] * 100,
)
logger.info(
"alignment columns: %d clusters, %d primary, %d secondary -> %d significant",
len(clusters), len(primary), len(secondary), len(significant),
)
if not significant:
# Fallback: single column covering all content
x_min = min(w["left"] for w in words)
x_max = max(w["left"] + w["width"] for w in words)
return [{"index": 0, "type": "column_text", "x_min": x_min, "x_max": x_max}]
# --- Merge nearby clusters ---
merge_distance = max(25, int(zone_w * 0.03))
merged = [significant[0].copy()]
for s in significant[1:]:
if s["mean_x"] - merged[-1]["mean_x"] < merge_distance:
prev = merged[-1]
total = prev["count"] + s["count"]
prev["mean_x"] = (
prev["mean_x"] * prev["count"] + s["mean_x"] * s["count"]
) // total
prev["count"] = total
prev["min_edge"] = min(prev["min_edge"], s["min_edge"])
prev["max_edge"] = max(prev["max_edge"], s["max_edge"])
prev["distinct_rows"] = max(prev["distinct_rows"], s["distinct_rows"])
else:
merged.append(s.copy())
logger.info(
"alignment columns: %d after merge (distance=%d)",
len(merged), merge_distance,
)
# --- Build column boundaries ---
margin = max(5, int(zone_w * 0.005))
content_x_min = min(w["left"] for w in words)
content_x_max = max(w["left"] + w["width"] for w in words)
columns: List[Dict[str, Any]] = []
for i, cluster in enumerate(merged):
x_min = max(content_x_min, cluster["min_edge"] - margin)
if i + 1 < len(merged):
x_max = merged[i + 1]["min_edge"] - margin
else:
x_max = content_x_max
columns.append({
"index": i,
"type": f"column_{i + 1}" if len(merged) > 1 else "column_text",
"x_min": x_min,
"x_max": x_max,
})
return columns
_MARKER_CHARS = set("*-+#>")
def _merge_inline_marker_columns(
columns: List[Dict],
words: List[Dict],
) -> List[Dict]:
"""Merge narrow marker columns (bullets, numbering) into adjacent text.
Bullet points (*, -) and numbering (1., 2.) create narrow columns
at the left edge of a zone. These are inline markers that indent text,
not real separate columns. Merge them with their right neighbour.
Does NOT merge columns containing alphabetic words like "to", "in",
"der", "die", "das" those are legitimate content columns.
"""
if len(columns) < 2:
return columns
merged: List[Dict] = []
skip: set = set()
for i, col in enumerate(columns):
if i in skip:
continue
# Find words in this column
col_words = [
w for w in words
if col["x_min"] <= w["left"] + w["width"] / 2 < col["x_max"]
]
col_width = col["x_max"] - col["x_min"]
# Narrow column with mostly short words -> MIGHT be inline markers
if col_words and col_width < 80:
avg_len = sum(len(w.get("text", "")) for w in col_words) / len(col_words)
if avg_len <= 2 and i + 1 < len(columns):
# Check if words are actual markers (symbols/numbers) vs
# real alphabetic words like "to", "in", "der", "die"
texts = [(w.get("text") or "").strip() for w in col_words]
alpha_count = sum(
1 for t in texts
if t and t[0].isalpha() and t not in _MARKER_CHARS
)
alpha_ratio = alpha_count / len(texts) if texts else 0
# If >=50% of words are alphabetic, this is a real column
if alpha_ratio >= 0.5:
logger.info(
" kept narrow column %d (w=%d, avg_len=%.1f, "
"alpha=%.0f%%) -- contains real words",
i, col_width, avg_len, alpha_ratio * 100,
)
else:
# Merge into next column
next_col = columns[i + 1].copy()
next_col["x_min"] = col["x_min"]
merged.append(next_col)
skip.add(i + 1)
logger.info(
" merged inline marker column %d (w=%d, avg_len=%.1f) "
"into column %d",
i, col_width, avg_len, i + 1,
)
continue
merged.append(col)
# Re-index
for i, col in enumerate(merged):
col["index"] = i
col["type"] = f"column_{i + 1}" if len(merged) > 1 else "column_text"
return merged
@@ -0,0 +1,402 @@
"""
Grid Editor word/zone filtering, border ghosts, decorative margins, footers.
Split from grid_editor_helpers.py for maintainability.
All functions are pure computation no HTTP, DB, or session side effects.
Lizenz: Apache 2.0 (kommerziell nutzbar)
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
"""
import logging
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
def _filter_border_strip_words(words: List[Dict]) -> Tuple[List[Dict], int]:
"""Remove page-border decoration strip words BEFORE column detection.
Scans from each page edge inward to find the first significant x-gap
(>30 px). If the edge cluster contains <15 % of total words, those
words are removed as border-strip artifacts (alphabet letters,
illustration fragments).
Must run BEFORE ``_build_zone_grid`` so that column detection only
sees real content words and doesn't produce inflated row counts.
"""
if len(words) < 10:
return words, 0
sorted_words = sorted(words, key=lambda w: w.get("left", 0))
total = len(sorted_words)
# -- Left-edge scan (running max right-edge) --
left_count = 0
running_right = 0
for gi in range(total - 1):
running_right = max(
running_right,
sorted_words[gi].get("left", 0) + sorted_words[gi].get("width", 0),
)
if sorted_words[gi + 1].get("left", 0) - running_right > 30:
left_count = gi + 1
break
# -- Right-edge scan (running min left) --
right_count = 0
running_left = sorted_words[-1].get("left", 0)
for gi in range(total - 1, 0, -1):
running_left = min(running_left, sorted_words[gi].get("left", 0))
prev_right = (
sorted_words[gi - 1].get("left", 0)
+ sorted_words[gi - 1].get("width", 0)
)
if running_left - prev_right > 30:
right_count = total - gi
break
# Validate candidate strip: real border decorations are mostly short
# words (alphabet letters like "A", "Bb", stray marks). Multi-word
# content like "der Ranzen" or "die Schals" (continuation of German
# translations) must NOT be removed.
def _is_decorative_strip(candidates: List[Dict]) -> bool:
if not candidates:
return False
short = sum(1 for w in candidates if len((w.get("text") or "").strip()) <= 2)
return short / len(candidates) >= 0.45
strip_ids: set = set()
if left_count > 0 and left_count / total < 0.20:
candidates = sorted_words[:left_count]
if _is_decorative_strip(candidates):
strip_ids = {id(w) for w in candidates}
elif right_count > 0 and right_count / total < 0.20:
candidates = sorted_words[total - right_count:]
if _is_decorative_strip(candidates):
strip_ids = {id(w) for w in candidates}
if not strip_ids:
return words, 0
return [w for w in words if id(w) not in strip_ids], len(strip_ids)
# Characters that are typically OCR artefacts from box border lines.
# Intentionally excludes ! (red markers) and . , ; (real punctuation).
_GRID_GHOST_CHARS = set("|1lI[](){}/\\-\u2014\u2013_~=+")
def _filter_border_ghosts(
words: List[Dict],
boxes: List,
) -> tuple:
"""Remove words sitting on box borders that are OCR artefacts.
Returns (filtered_words, removed_count).
"""
if not boxes or not words:
return words, 0
# Build border bands from detected boxes
x_bands: List[tuple] = []
y_bands: List[tuple] = []
for b in boxes:
bt = (
b.border_thickness
if hasattr(b, "border_thickness")
else b.get("border_thickness", 3)
)
# Skip borderless boxes (images/graphics) -- no border line to produce ghosts
if bt == 0:
continue
bx = b.x if hasattr(b, "x") else b.get("x", 0)
by = b.y if hasattr(b, "y") else b.get("y", 0)
bw = b.width if hasattr(b, "width") else b.get("w", b.get("width", 0))
bh = b.height if hasattr(b, "height") else b.get("h", b.get("height", 0))
margin = max(bt * 2, 10) + 6
x_bands.append((bx - margin, bx + margin))
x_bands.append((bx + bw - margin, bx + bw + margin))
y_bands.append((by - margin, by + margin))
y_bands.append((by + bh - margin, by + bh + margin))
def _is_ghost(w: Dict) -> bool:
text = (w.get("text") or "").strip()
if not text:
return False
# Check if any word edge (not just center) touches a border band
w_left = w["left"]
w_right = w["left"] + w["width"]
w_top = w["top"]
w_bottom = w["top"] + w["height"]
on_border = (
any(lo <= w_left <= hi or lo <= w_right <= hi for lo, hi in x_bands)
or any(lo <= w_top <= hi or lo <= w_bottom <= hi for lo, hi in y_bands)
)
if not on_border:
return False
if len(text) == 1 and text in _GRID_GHOST_CHARS:
return True
return False
filtered = [w for w in words if not _is_ghost(w)]
return filtered, len(words) - len(filtered)
def _flatten_word_boxes(cells: List[Dict]) -> List[Dict]:
"""Extract all word_boxes from cells into a flat list of word dicts."""
words: List[Dict] = []
for cell in cells:
for wb in cell.get("word_boxes") or []:
if wb.get("text", "").strip():
words.append({
"text": wb["text"],
"left": wb["left"],
"top": wb["top"],
"width": wb["width"],
"height": wb["height"],
"conf": wb.get("conf", 0),
})
return words
def _words_in_zone(
words: List[Dict],
zone_y: int,
zone_h: int,
zone_x: int,
zone_w: int,
) -> List[Dict]:
"""Filter words whose Y-center falls within a zone's bounds."""
zone_y_end = zone_y + zone_h
zone_x_end = zone_x + zone_w
result = []
for w in words:
cy = w["top"] + w["height"] / 2
cx = w["left"] + w["width"] / 2
if zone_y <= cy <= zone_y_end and zone_x <= cx <= zone_x_end:
result.append(w)
return result
def _get_content_bounds(words: List[Dict]) -> tuple:
"""Get content bounds from word positions."""
if not words:
return 0, 0, 0, 0
x_min = min(w["left"] for w in words)
y_min = min(w["top"] for w in words)
x_max = max(w["left"] + w["width"] for w in words)
y_max = max(w["top"] + w["height"] for w in words)
return x_min, y_min, x_max - x_min, y_max - y_min
def _filter_decorative_margin(
words: List[Dict],
img_w: int,
log: Any,
session_id: str,
) -> Dict[str, Any]:
"""Remove words that belong to a decorative alphabet strip on a margin.
Some vocabulary worksheets have a vertical A-Z alphabet graphic along
the left or right edge. OCR reads each letter as an isolated single-
character word. These decorative elements are not content and confuse
column/row detection.
Detection criteria (phase 1 -- find the strip using single-char words):
- Words are in the outer 30% of the page (left or right)
- Nearly all words are single characters (letters or digits)
- At least 8 such words form a vertical strip (>=8 unique Y positions)
- Average horizontal spread of the strip is small (< 80px)
Phase 2 -- once a strip is confirmed, also remove any short word (<=3
chars) in the same narrow x-range. This catches multi-char OCR
artifacts like "Vv" that belong to the same decorative element.
Modifies *words* in place.
Returns:
Dict with 'found' (bool), 'side' (str), 'letters_detected' (int).
"""
no_strip: Dict[str, Any] = {"found": False, "side": "", "letters_detected": 0}
if not words or img_w <= 0:
return no_strip
margin_cutoff = img_w * 0.30
# Phase 1: find candidate strips using short words (1-2 chars).
# OCR often reads alphabet sidebar letters as pairs ("Aa", "Bb")
# rather than singles, so accept <=2-char words as strip candidates.
left_strip = [
w for w in words
if len((w.get("text") or "").strip()) <= 2
and w["left"] + w.get("width", 0) / 2 < margin_cutoff
]
right_strip = [
w for w in words
if len((w.get("text") or "").strip()) <= 2
and w["left"] + w.get("width", 0) / 2 > img_w - margin_cutoff
]
for strip, side in [(left_strip, "left"), (right_strip, "right")]:
if len(strip) < 6:
continue
# Check vertical distribution: should have many distinct Y positions
y_centers = sorted(set(
int(w["top"] + w.get("height", 0) / 2) // 20 * 20 # bucket
for w in strip
))
if len(y_centers) < 6:
continue
# Check horizontal compactness
x_positions = [w["left"] for w in strip]
x_min = min(x_positions)
x_max = max(x_positions)
x_spread = x_max - x_min
if x_spread > 80:
continue
# Phase 2: strip confirmed -- also collect short words in same x-range
# Expand x-range slightly to catch neighbors (e.g. "Vv" next to "U")
strip_x_lo = x_min - 20
strip_x_hi = x_max + 60 # word width + tolerance
all_strip_words = [
w for w in words
if len((w.get("text") or "").strip()) <= 3
and strip_x_lo <= w["left"] <= strip_x_hi
and (w["left"] + w.get("width", 0) / 2 < margin_cutoff
if side == "left"
else w["left"] + w.get("width", 0) / 2 > img_w - margin_cutoff)
]
strip_set = set(id(w) for w in all_strip_words)
before = len(words)
words[:] = [w for w in words if id(w) not in strip_set]
removed = before - len(words)
if removed:
log.info(
"build-grid session %s: removed %d decorative %s-margin words "
"(strip x=%d-%d)",
session_id, removed, side, strip_x_lo, strip_x_hi,
)
return {"found": True, "side": side, "letters_detected": len(strip)}
return no_strip
def _filter_footer_words(
words: List[Dict],
img_h: int,
log: Any,
session_id: str,
) -> Optional[Dict]:
"""Remove isolated words in the bottom 5% of the page (page numbers).
Modifies *words* in place and returns a page_number metadata dict
if a page number was extracted, or None.
"""
if not words or img_h <= 0:
return None
footer_y = img_h * 0.95
footer_words = [
w for w in words
if w["top"] + w.get("height", 0) / 2 > footer_y
]
if not footer_words:
return None
# Only remove if footer has very few words (<= 3) with short text
total_text = "".join((w.get("text") or "").strip() for w in footer_words)
if len(footer_words) <= 3 and len(total_text) <= 10:
# Extract page number metadata before removing
page_number_info = {
"text": total_text.strip(),
"y_pct": round(footer_words[0]["top"] / img_h * 100, 1),
}
# Try to parse as integer
digits = "".join(c for c in total_text if c.isdigit())
if digits:
page_number_info["number"] = int(digits)
footer_set = set(id(w) for w in footer_words)
words[:] = [w for w in words if id(w) not in footer_set]
log.info(
"build-grid session %s: extracted page number '%s' and removed %d footer words",
session_id, total_text, len(footer_words),
)
return page_number_info
return None
def _filter_header_junk(
words: List[Dict],
img_h: int,
log: Any,
session_id: str,
) -> None:
"""Remove OCR junk from header illustrations above the real content.
Textbook pages often have decorative header graphics (illustrations,
icons) that OCR reads as low-confidence junk characters. Real content
typically starts further down the page.
Algorithm:
1. Find the "content start" -- the first Y position where a dense
horizontal row of 3+ high-confidence words begins.
2. Above that line, remove words with conf < 75 and text <= 3 chars.
These are almost certainly OCR artifacts from illustrations.
Modifies *words* in place.
"""
if not words or img_h <= 0:
return
# --- Find content start: first horizontal row with >=3 high-conf words ---
# Sort words by Y
sorted_by_y = sorted(words, key=lambda w: w["top"])
content_start_y = 0
_ROW_TOLERANCE = img_h * 0.02 # words within 2% of page height = same row
_MIN_ROW_WORDS = 3
_MIN_CONF = 80
i = 0
while i < len(sorted_by_y):
row_y = sorted_by_y[i]["top"]
# Collect words in this row band
row_words = []
j = i
while j < len(sorted_by_y) and sorted_by_y[j]["top"] - row_y < _ROW_TOLERANCE:
row_words.append(sorted_by_y[j])
j += 1
# Count high-confidence words with real text (> 1 char)
high_conf = [
w for w in row_words
if w.get("conf", 0) >= _MIN_CONF
and len((w.get("text") or "").strip()) > 1
]
if len(high_conf) >= _MIN_ROW_WORDS:
content_start_y = row_y
break
i = j if j > i else i + 1
if content_start_y <= 0:
return # no clear content start found
# --- Remove low-conf short junk above content start ---
junk = [
w for w in words
if w["top"] + w.get("height", 0) < content_start_y
and w.get("conf", 0) < 75
and len((w.get("text") or "").strip()) <= 3
]
if not junk:
return
junk_set = set(id(w) for w in junk)
before = len(words)
words[:] = [w for w in words if id(w) not in junk_set]
removed = before - len(words)
if removed:
log.info(
"build-grid session %s: removed %d header junk words above y=%d "
"(content start)",
session_id, removed, content_start_y,
)
@@ -0,0 +1,499 @@
"""
Grid Editor header/heading detection and colspan (merged cell) detection.
Split from grid_editor_helpers.py. Pure computation, no HTTP/DB side effects.
Lizenz: Apache 2.0 | DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
"""
import logging
import re
from typing import Dict, List, Optional
from cv_ocr_engines import _text_has_garbled_ipa
logger = logging.getLogger(__name__)
def _detect_heading_rows_by_color(zones_data: List[Dict], img_w: int, img_h: int) -> int:
"""Detect heading rows by color + height after color annotation.
A row is a heading if:
1. ALL word_boxes have color_name != 'black' (typically 'blue')
2. Mean word height > 1.2x median height of all words in the zone
Detected heading rows are merged into a single spanning cell.
Returns count of headings detected.
"""
heading_count = 0
for z in zones_data:
cells = z.get("cells", [])
rows = z.get("rows", [])
columns = z.get("columns", [])
if not cells or not rows or len(columns) < 2:
continue
# Compute median word height across the zone
all_heights = []
for cell in cells:
for wb in cell.get("word_boxes") or []:
h = wb.get("height", 0)
if h > 0:
all_heights.append(h)
if not all_heights:
continue
all_heights_sorted = sorted(all_heights)
median_h = all_heights_sorted[len(all_heights_sorted) // 2]
heading_row_indices = []
for row in rows:
if row.get("is_header"):
continue # already detected as header
ri = row["index"]
row_cells = [c for c in cells if c.get("row_index") == ri]
row_wbs = [
wb for cell in row_cells
for wb in cell.get("word_boxes") or []
]
if not row_wbs:
continue
# Condition 1: ALL words are non-black
all_colored = all(
wb.get("color_name", "black") != "black"
for wb in row_wbs
)
if not all_colored:
continue
# Condition 2: mean height > 1.2x median
mean_h = sum(wb.get("height", 0) for wb in row_wbs) / len(row_wbs)
if mean_h <= median_h * 1.2:
continue
heading_row_indices.append(ri)
# Merge heading cells into spanning cells
for hri in heading_row_indices:
header_cells = [c for c in cells if c.get("row_index") == hri]
if len(header_cells) <= 1:
# Single cell -- just mark it as heading
if header_cells:
header_cells[0]["col_type"] = "heading"
heading_count += 1
# Mark row as header
for row in rows:
if row["index"] == hri:
row["is_header"] = True
continue
# Collect all word_boxes and text from all columns
all_wb = []
all_text_parts = []
for hc in sorted(header_cells, key=lambda c: c["col_index"]):
all_wb.extend(hc.get("word_boxes", []))
if hc.get("text", "").strip():
all_text_parts.append(hc["text"].strip())
# Remove all cells for this row, replace with one spanning cell
z["cells"] = [c for c in z["cells"] if c.get("row_index") != hri]
if all_wb:
x_min = min(wb["left"] for wb in all_wb)
y_min = min(wb["top"] for wb in all_wb)
x_max = max(wb["left"] + wb["width"] for wb in all_wb)
y_max = max(wb["top"] + wb["height"] for wb in all_wb)
# Use the actual starting col_index from the first cell
first_col = min(hc["col_index"] for hc in header_cells)
zone_idx = z.get("zone_index", 0)
z["cells"].append({
"cell_id": f"Z{zone_idx}_R{hri:02d}_C{first_col}",
"zone_index": zone_idx,
"row_index": hri,
"col_index": first_col,
"col_type": "heading",
"text": " ".join(all_text_parts),
"confidence": 0.0,
"bbox_px": {"x": x_min, "y": y_min,
"w": x_max - x_min, "h": y_max - y_min},
"bbox_pct": {
"x": round(x_min / img_w * 100, 2) if img_w else 0,
"y": round(y_min / img_h * 100, 2) if img_h else 0,
"w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0,
"h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0,
},
"word_boxes": all_wb,
"ocr_engine": "words_first",
"is_bold": True,
})
# Mark row as header
for row in rows:
if row["index"] == hri:
row["is_header"] = True
heading_count += 1
return heading_count
def _detect_heading_rows_by_single_cell(
zones_data: List[Dict], img_w: int, img_h: int,
) -> int:
"""Detect heading rows that have only a single content cell.
Black headings like "Theme" have normal color and height, so they are
missed by ``_detect_heading_rows_by_color``. The distinguishing signal
is that they occupy only one column while normal vocabulary rows fill
at least 2-3 columns.
A row qualifies as a heading if:
1. It is not already marked as a header/heading.
2. It has exactly ONE cell whose col_type starts with ``column_``
(excluding column_1 / page_ref which only carries page numbers).
3. That single cell is NOT in the last column (continuation/example
lines like "2. Ver\u00e4nderung, Wechsel" often sit alone in column_4).
4. The text does not start with ``[`` (IPA continuation).
5. The zone has >=3 columns and >=5 rows (avoids false positives in
tiny zones).
6. The majority of rows in the zone have >=2 content cells (ensures
we are in a multi-column vocab layout).
"""
heading_count = 0
for z in zones_data:
cells = z.get("cells", [])
rows = z.get("rows", [])
columns = z.get("columns", [])
if len(columns) < 3 or len(rows) < 5:
continue
# Determine the last col_index (example/sentence column)
col_indices = sorted(set(c.get("col_index", 0) for c in cells))
if not col_indices:
continue
last_col = col_indices[-1]
# Count content cells per row (column_* but not column_1/page_ref).
# Exception: column_1 cells that contain a dictionary article word
# (die/der/das etc.) ARE content -- they appear in dictionary layouts
# where the leftmost column holds grammatical articles.
_ARTICLE_WORDS = {
"die", "der", "das", "dem", "den", "des", "ein", "eine",
"the", "a", "an",
}
row_content_counts: Dict[int, int] = {}
for cell in cells:
ct = cell.get("col_type", "")
if not ct.startswith("column_"):
continue
if ct == "column_1":
ctext = (cell.get("text") or "").strip().lower()
if ctext not in _ARTICLE_WORDS:
continue
ri = cell.get("row_index", -1)
row_content_counts[ri] = row_content_counts.get(ri, 0) + 1
# Majority of rows must have >=2 content cells
multi_col_rows = sum(1 for cnt in row_content_counts.values() if cnt >= 2)
if multi_col_rows < len(rows) * 0.4:
continue
# Exclude first and last non-header rows -- these are typically
# page numbers or footer text, not headings.
non_header_rows = [r for r in rows if not r.get("is_header")]
if len(non_header_rows) < 3:
continue
first_ri = non_header_rows[0]["index"]
last_ri = non_header_rows[-1]["index"]
heading_row_indices = []
for row in rows:
if row.get("is_header"):
continue
ri = row["index"]
if ri == first_ri or ri == last_ri:
continue
row_cells = [c for c in cells if c.get("row_index") == ri]
content_cells = [
c for c in row_cells
if c.get("col_type", "").startswith("column_")
and (c.get("col_type") != "column_1"
or (c.get("text") or "").strip().lower() in _ARTICLE_WORDS)
]
if len(content_cells) != 1:
continue
cell = content_cells[0]
# Not in the last column (continuation/example lines)
if cell.get("col_index") == last_col:
continue
text = (cell.get("text") or "").strip()
if not text or text.startswith("["):
continue
# Continuation lines start with "(" -- e.g. "(usw.)", "(TV-Serie)"
if text.startswith("("):
continue
# Single cell NOT in the first content column is likely a
# continuation/overflow line, not a heading. Real headings
# ("Theme 1", "Unit 3: ...") appear in the first or second
# content column.
first_content_col = col_indices[0] if col_indices else 0
if cell.get("col_index", 0) > first_content_col + 1:
continue
# Skip garbled IPA without brackets (e.g. "ska:f -- ska:vz")
# but NOT text with real IPA symbols (e.g. "Theme [\u03b8\u02c8i\u02d0m]")
_REAL_IPA_CHARS = set("\u02c8\u02cc\u0259\u026a\u025b\u0252\u028a\u028c\u00e6\u0251\u0254\u0283\u0292\u03b8\u00f0\u014b")
if _text_has_garbled_ipa(text) and not any(c in _REAL_IPA_CHARS for c in text):
continue
# Guard: dictionary section headings are short (1-4 alpha chars
# like "A", "Ab", "Zi", "Sch"). Longer text that starts
# lowercase is a regular vocabulary word (e.g. "zentral") that
# happens to appear alone in its row.
alpha_only = re.sub(r'[^a-zA-Z\u00e4\u00f6\u00fc\u00c4\u00d6\u00dc\u00df\u1e9e]', '', text)
if len(alpha_only) > 4 and text[0].islower():
continue
heading_row_indices.append(ri)
# Guard: if >25% of eligible rows would become headings, the
# heuristic is misfiring (e.g. sparse single-column layout where
# most rows naturally have only 1 content cell).
eligible_rows = len(non_header_rows) - 2 # minus first/last excluded
if eligible_rows > 0 and len(heading_row_indices) > eligible_rows * 0.25:
logger.debug(
"Skipping single-cell heading detection for zone %s: "
"%d/%d rows would be headings (>25%%)",
z.get("zone_index"), len(heading_row_indices), eligible_rows,
)
continue
for hri in heading_row_indices:
header_cells = [c for c in cells if c.get("row_index") == hri]
if not header_cells:
continue
# Collect all word_boxes and text
all_wb = []
all_text_parts = []
for hc in sorted(header_cells, key=lambda c: c["col_index"]):
all_wb.extend(hc.get("word_boxes", []))
if hc.get("text", "").strip():
all_text_parts.append(hc["text"].strip())
first_col_idx = min(hc["col_index"] for hc in header_cells)
# Remove old cells for this row, add spanning heading cell
z["cells"] = [c for c in z["cells"] if c.get("row_index") != hri]
if all_wb:
x_min = min(wb["left"] for wb in all_wb)
y_min = min(wb["top"] for wb in all_wb)
x_max = max(wb["left"] + wb["width"] for wb in all_wb)
y_max = max(wb["top"] + wb["height"] for wb in all_wb)
else:
# Fallback to first cell bbox
bp = header_cells[0].get("bbox_px", {})
x_min = bp.get("x", 0)
y_min = bp.get("y", 0)
x_max = x_min + bp.get("w", 0)
y_max = y_min + bp.get("h", 0)
zone_idx = z.get("zone_index", 0)
z["cells"].append({
"cell_id": f"Z{zone_idx}_R{hri:02d}_C{first_col_idx}",
"zone_index": zone_idx,
"row_index": hri,
"col_index": first_col_idx,
"col_type": "heading",
"text": " ".join(all_text_parts),
"confidence": 0.0,
"bbox_px": {"x": x_min, "y": y_min,
"w": x_max - x_min, "h": y_max - y_min},
"bbox_pct": {
"x": round(x_min / img_w * 100, 2) if img_w else 0,
"y": round(y_min / img_h * 100, 2) if img_h else 0,
"w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0,
"h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0,
},
"word_boxes": all_wb,
"ocr_engine": "words_first",
"is_bold": False,
})
for row in rows:
if row["index"] == hri:
row["is_header"] = True
heading_count += 1
return heading_count
def _detect_header_rows(
rows: List[Dict],
zone_words: List[Dict],
zone_y: int,
columns: Optional[List[Dict]] = None,
skip_first_row_header: bool = False,
) -> List[int]:
"""Detect header rows: first-row heuristic + spanning header detection.
A "spanning header" is a row whose words stretch across multiple column
boundaries (e.g. "Unit4: Bonnie Scotland" centred across 4 columns).
"""
if len(rows) < 2:
return []
headers = []
if not skip_first_row_header:
first_row = rows[0]
second_row = rows[1]
# Gap between first and second row > 0.5x average row height
avg_h = sum(r["y_max"] - r["y_min"] for r in rows) / len(rows)
gap = second_row["y_min"] - first_row["y_max"]
if gap > avg_h * 0.5:
headers.append(0)
# Also check if first row words are taller than average (bold/header text)
all_heights = [w["height"] for w in zone_words]
median_h = sorted(all_heights)[len(all_heights) // 2] if all_heights else 20
first_row_words = [
w for w in zone_words
if first_row["y_min"] <= w["top"] + w["height"] / 2 <= first_row["y_max"]
]
if first_row_words:
first_h = max(w["height"] for w in first_row_words)
if first_h > median_h * 1.3:
if 0 not in headers:
headers.append(0)
# Note: Spanning-header detection (rows spanning all columns) has been
# disabled because it produces too many false positives on vocabulary
# worksheets where IPA transcriptions or short entries naturally span
# multiple columns with few words. The first-row heuristic above is
# sufficient for detecting real headers.
return headers
def _detect_colspan_cells(
zone_words: List[Dict],
columns: List[Dict],
rows: List[Dict],
cells: List[Dict],
img_w: int,
img_h: int,
) -> List[Dict]:
"""Detect and merge cells that span multiple columns (colspan).
A word-block (PaddleOCR phrase) that extends significantly past a column
boundary into the next column indicates a merged cell. This replaces
the incorrectly split cells with a single cell spanning multiple columns.
Works for both full-page scans and box zones.
"""
if len(columns) < 2 or not zone_words or not rows:
return cells
from cv_words_first import _assign_word_to_row
# Column boundaries (midpoints between adjacent columns)
col_boundaries = []
for ci in range(len(columns) - 1):
col_boundaries.append((columns[ci]["x_max"] + columns[ci + 1]["x_min"]) / 2)
def _cols_covered(w_left: float, w_right: float) -> List[int]:
"""Return list of column indices that a word-block covers."""
covered = []
for col in columns:
col_mid = (col["x_min"] + col["x_max"]) / 2
# Word covers a column if it extends past the column's midpoint
if w_left < col_mid < w_right:
covered.append(col["index"])
# Also include column if word starts within it
elif col["x_min"] <= w_left < col["x_max"]:
covered.append(col["index"])
return sorted(set(covered))
# Group original word-blocks by row
row_word_blocks: Dict[int, List[Dict]] = {}
for w in zone_words:
ri = _assign_word_to_row(w, rows)
row_word_blocks.setdefault(ri, []).append(w)
# For each row, check if any word-block spans multiple columns
rows_to_merge: Dict[int, List[Dict]] = {} # row_index -> list of spanning word-blocks
for ri, wblocks in row_word_blocks.items():
spanning = []
for w in wblocks:
w_left = w["left"]
w_right = w_left + w["width"]
covered = _cols_covered(w_left, w_right)
if len(covered) >= 2:
spanning.append({"word": w, "cols": covered})
if spanning:
rows_to_merge[ri] = spanning
if not rows_to_merge:
return cells
# Merge cells for spanning rows
new_cells = []
for cell in cells:
ri = cell.get("row_index", -1)
if ri not in rows_to_merge:
new_cells.append(cell)
continue
# Check if this cell's column is part of a spanning block
ci = cell.get("col_index", -1)
is_part_of_span = False
for span in rows_to_merge[ri]:
if ci in span["cols"]:
is_part_of_span = True
# Only emit the merged cell for the FIRST column in the span
if ci == span["cols"][0]:
# Use the ORIGINAL word-block text (not the split cell texts
# which may have broken words like "euros a" + "nd cents")
orig_word = span["word"]
merged_text = orig_word.get("text", "").strip()
all_wb = [orig_word]
# Compute merged bbox
if all_wb:
x_min = min(wb["left"] for wb in all_wb)
y_min = min(wb["top"] for wb in all_wb)
x_max = max(wb["left"] + wb["width"] for wb in all_wb)
y_max = max(wb["top"] + wb["height"] for wb in all_wb)
else:
x_min = y_min = x_max = y_max = 0
new_cells.append({
"cell_id": cell["cell_id"],
"row_index": ri,
"col_index": span["cols"][0],
"col_type": "spanning_header",
"colspan": len(span["cols"]),
"text": merged_text,
"confidence": cell.get("confidence", 0),
"bbox_px": {"x": x_min, "y": y_min,
"w": x_max - x_min, "h": y_max - y_min},
"bbox_pct": {
"x": round(x_min / img_w * 100, 2) if img_w else 0,
"y": round(y_min / img_h * 100, 2) if img_h else 0,
"w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0,
"h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0,
},
"word_boxes": all_wb,
"ocr_engine": cell.get("ocr_engine", ""),
"is_bold": cell.get("is_bold", False),
})
logger.info(
"colspan detected: row %d, cols %s -> merged %d cells (%r)",
ri, span["cols"], len(span["cols"]), merged_text[:50],
)
break
if not is_part_of_span:
new_cells.append(cell)
return new_cells
@@ -0,0 +1,58 @@
"""
Grid Editor helper functions barrel re-export module.
This file re-exports all public symbols from the split sub-modules
so that existing ``from grid_editor_helpers import ...`` statements
continue to work without changes.
Sub-modules:
- columns column detection, cross-column splitting, marker merging
- filters word/zone filtering, border ghosts, decorative margins
- headers header/heading detection, colspan detection
- zones vertical dividers, zone splitting/merging, zone grid building
Lizenz: Apache 2.0 (kommerziell nutzbar)
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
"""
# --- Re-export: columns ---------------------------------------------------
from .columns import ( # noqa: F401
_is_recognized_word,
_split_cross_column_words,
_cluster_columns_by_alignment,
_MARKER_CHARS,
_merge_inline_marker_columns,
)
# --- Re-export: filters ----------------------------------------------------
from .filters import ( # noqa: F401
_filter_border_strip_words,
_GRID_GHOST_CHARS,
_filter_border_ghosts,
_flatten_word_boxes,
_words_in_zone,
_get_content_bounds,
_filter_decorative_margin,
_filter_footer_words,
_filter_header_junk,
)
# --- Re-export: headers ----------------------------------------------------
from .headers import ( # noqa: F401
_detect_heading_rows_by_color,
_detect_heading_rows_by_single_cell,
_detect_header_rows,
_detect_colspan_cells,
)
# --- Re-export: zones -------------------------------------------------------
from .zones import ( # noqa: F401
_PIPE_RE_VSPLIT,
_detect_vertical_dividers,
_split_zone_at_vertical_dividers,
_merge_content_zones_across_boxes,
_build_zone_grid,
)
# --- Re-export from cv_words_first (used by cv_box_layout.py) ---------------
from cv_words_first import _cluster_rows # noqa: F401
@@ -0,0 +1,389 @@
"""
Grid Editor vertical divider detection, zone splitting/merging, zone grid building.
Split from grid_editor_helpers.py for maintainability.
All functions are pure computation no HTTP, DB, or session side effects.
Lizenz: Apache 2.0 (kommerziell nutzbar)
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
"""
import logging
import re
from typing import Any, Dict, List, Optional
from cv_vocab_types import PageZone
from cv_words_first import _cluster_rows, _build_cells
from .columns import (
_cluster_columns_by_alignment,
_merge_inline_marker_columns,
_split_cross_column_words,
)
from .headers import (
_detect_header_rows,
_detect_colspan_cells,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Vertical divider detection and zone splitting
# ---------------------------------------------------------------------------
_PIPE_RE_VSPLIT = re.compile(r"^\|+$")
def _detect_vertical_dividers(
words: List[Dict],
zone_x: int,
zone_w: int,
zone_y: int,
zone_h: int,
) -> List[float]:
"""Detect vertical divider lines from pipe word_boxes at consistent x.
Returns list of divider x-positions (empty if no dividers found).
"""
if not words or zone_w <= 0 or zone_h <= 0:
return []
# Collect pipe word_boxes
pipes = [
w for w in words
if _PIPE_RE_VSPLIT.match((w.get("text") or "").strip())
]
if len(pipes) < 5:
return []
# Cluster pipe x-centers by proximity
tolerance = max(15, int(zone_w * 0.02))
pipe_xs = sorted(w["left"] + w["width"] / 2 for w in pipes)
clusters: List[List[float]] = [[pipe_xs[0]]]
for x in pipe_xs[1:]:
if x - clusters[-1][-1] <= tolerance:
clusters[-1].append(x)
else:
clusters.append([x])
dividers: List[float] = []
for cluster in clusters:
if len(cluster) < 5:
continue
mean_x = sum(cluster) / len(cluster)
# Must be between 15% and 85% of zone width
rel_pos = (mean_x - zone_x) / zone_w
if rel_pos < 0.15 or rel_pos > 0.85:
continue
# Check vertical coverage: pipes must span >= 50% of zone height
cluster_pipes = [
w for w in pipes
if abs(w["left"] + w["width"] / 2 - mean_x) <= tolerance
]
ys = [w["top"] for w in cluster_pipes] + [w["top"] + w["height"] for w in cluster_pipes]
y_span = max(ys) - min(ys) if ys else 0
if y_span < zone_h * 0.5:
continue
dividers.append(mean_x)
return sorted(dividers)
def _split_zone_at_vertical_dividers(
zone: "PageZone",
divider_xs: List[float],
vsplit_group_id: int,
) -> List["PageZone"]:
"""Split a PageZone at vertical divider positions into sub-zones."""
boundaries = [zone.x] + divider_xs + [zone.x + zone.width]
hints = []
for i in range(len(boundaries) - 1):
if i == 0:
hints.append("left_of_vsplit")
elif i == len(boundaries) - 2:
hints.append("right_of_vsplit")
else:
hints.append("middle_of_vsplit")
sub_zones = []
for i in range(len(boundaries) - 1):
x_start = int(boundaries[i])
x_end = int(boundaries[i + 1])
sub = PageZone(
index=0, # re-indexed later
zone_type=zone.zone_type,
y=zone.y,
height=zone.height,
x=x_start,
width=x_end - x_start,
box=zone.box,
image_overlays=zone.image_overlays,
layout_hint=hints[i],
vsplit_group=vsplit_group_id,
)
sub_zones.append(sub)
return sub_zones
def _merge_content_zones_across_boxes(
zones: List,
content_x: int,
content_w: int,
) -> List:
"""Merge content zones separated by box zones into single zones.
Box zones become image_overlays on the merged content zone.
Pattern: [content, box*, content] -> [merged_content with overlay]
Box zones NOT between two content zones stay as standalone zones.
"""
if len(zones) < 3:
return zones
# Group consecutive runs of [content, box+, content]
result: List = []
i = 0
while i < len(zones):
z = zones[i]
if z.zone_type != "content":
result.append(z)
i += 1
continue
# Start of a potential merge group: content zone
group_contents = [z]
group_boxes = []
j = i + 1
# Absorb [box, content] pairs -- only absorb a box if it's
# confirmed to be followed by another content zone.
while j < len(zones):
if (zones[j].zone_type == "box"
and j + 1 < len(zones)
and zones[j + 1].zone_type == "content"):
group_boxes.append(zones[j])
group_contents.append(zones[j + 1])
j += 2
else:
break
if len(group_contents) >= 2 and group_boxes:
# Merge: create one large content zone spanning all
y_min = min(c.y for c in group_contents)
y_max = max(c.y + c.height for c in group_contents)
overlays = []
for bz in group_boxes:
overlay = {
"y": bz.y,
"height": bz.height,
"x": bz.x,
"width": bz.width,
}
if bz.box:
overlay["box"] = {
"x": bz.box.x,
"y": bz.box.y,
"width": bz.box.width,
"height": bz.box.height,
"confidence": bz.box.confidence,
"border_thickness": bz.box.border_thickness,
}
overlays.append(overlay)
merged = PageZone(
index=0, # re-indexed below
zone_type="content",
y=y_min,
height=y_max - y_min,
x=content_x,
width=content_w,
image_overlays=overlays,
)
result.append(merged)
i = j
else:
# No merge possible -- emit just the content zone
result.append(z)
i += 1
# Re-index zones
for idx, z in enumerate(result):
z.index = idx
logger.info(
"zone-merge: %d zones -> %d zones after merging across boxes",
len(zones), len(result),
)
return result
def _build_zone_grid(
zone_words: List[Dict],
zone_x: int,
zone_y: int,
zone_w: int,
zone_h: int,
zone_index: int,
img_w: int,
img_h: int,
global_columns: Optional[List[Dict]] = None,
skip_first_row_header: bool = False,
) -> Dict[str, Any]:
"""Build columns, rows, cells for a single zone from its words.
Args:
global_columns: If provided, use these pre-computed column boundaries
instead of detecting columns per zone. Used for content zones so
that all content zones (above/between/below boxes) share the same
column structure. Box zones always detect columns independently.
"""
if not zone_words:
return {
"columns": [],
"rows": [],
"cells": [],
"header_rows": [],
}
# Cluster rows first (needed for column alignment analysis)
rows = _cluster_rows(zone_words)
# Diagnostic logging for small/medium zones (box zones typically have 40-60 words)
if len(zone_words) <= 60:
import statistics as _st
_heights = [w['height'] for w in zone_words if w.get('height', 0) > 0]
_med_h = _st.median(_heights) if _heights else 20
_y_tol = max(_med_h * 0.5, 5)
logger.info(
"zone %d row-clustering: %d words, median_h=%.0f, y_tol=%.1f -> %d rows",
zone_index, len(zone_words), _med_h, _y_tol, len(rows),
)
for w in sorted(zone_words, key=lambda ww: (ww['top'], ww['left'])):
logger.info(
" zone %d word: y=%d x=%d h=%d w=%d '%s'",
zone_index, w['top'], w['left'], w['height'], w['width'],
w.get('text', '')[:40],
)
for r in rows:
logger.info(
" zone %d row %d: y_min=%d y_max=%d y_center=%.0f",
zone_index, r['index'], r['y_min'], r['y_max'], r['y_center'],
)
# Use global columns if provided, otherwise detect per zone
columns = global_columns if global_columns else _cluster_columns_by_alignment(zone_words, zone_w, rows)
# Merge inline marker columns (bullets, numbering) into adjacent text
if not global_columns:
columns = _merge_inline_marker_columns(columns, zone_words)
if not columns or not rows:
return {
"columns": [],
"rows": [],
"cells": [],
"header_rows": [],
}
# Split word boxes that straddle column boundaries (e.g. "sichzie"
# spanning Col 1 + Col 2). Must happen after column detection and
# before cell assignment.
# Keep original words for colspan detection (split destroys span info).
original_zone_words = zone_words
if len(columns) >= 2:
zone_words = _split_cross_column_words(zone_words, columns)
# Build cells
cells = _build_cells(zone_words, columns, rows, img_w, img_h)
# --- Detect colspan (merged cells spanning multiple columns) ---
# Uses the ORIGINAL (pre-split) words to detect word-blocks that span
# multiple columns. _split_cross_column_words would have destroyed
# this information by cutting words at column boundaries.
if len(columns) >= 2:
cells = _detect_colspan_cells(original_zone_words, columns, rows, cells, img_w, img_h)
# Prefix cell IDs with zone index
for cell in cells:
cell["cell_id"] = f"Z{zone_index}_{cell['cell_id']}"
cell["zone_index"] = zone_index
# Detect header rows (pass columns for spanning header detection)
header_rows = _detect_header_rows(rows, zone_words, zone_y, columns,
skip_first_row_header=skip_first_row_header)
# Merge cells in spanning header rows into a single col-0 cell
if header_rows and len(columns) >= 2:
for hri in header_rows:
header_cells = [c for c in cells if c["row_index"] == hri]
if len(header_cells) <= 1:
continue
# Collect all word_boxes and text from all columns
all_wb = []
all_text_parts = []
for hc in sorted(header_cells, key=lambda c: c["col_index"]):
all_wb.extend(hc.get("word_boxes", []))
if hc.get("text", "").strip():
all_text_parts.append(hc["text"].strip())
# Remove all header cells, replace with one spanning cell
cells = [c for c in cells if c["row_index"] != hri]
if all_wb:
x_min = min(wb["left"] for wb in all_wb)
y_min = min(wb["top"] for wb in all_wb)
x_max = max(wb["left"] + wb["width"] for wb in all_wb)
y_max = max(wb["top"] + wb["height"] for wb in all_wb)
cells.append({
"cell_id": f"R{hri:02d}_C0",
"row_index": hri,
"col_index": 0,
"col_type": "spanning_header",
"text": " ".join(all_text_parts),
"confidence": 0.0,
"bbox_px": {"x": x_min, "y": y_min,
"w": x_max - x_min, "h": y_max - y_min},
"bbox_pct": {
"x": round(x_min / img_w * 100, 2) if img_w else 0,
"y": round(y_min / img_h * 100, 2) if img_h else 0,
"w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0,
"h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0,
},
"word_boxes": all_wb,
"ocr_engine": "words_first",
"is_bold": True,
})
# Convert columns to output format with percentages
out_columns = []
for col in columns:
x_min = col["x_min"]
x_max = col["x_max"]
out_columns.append({
"index": col["index"],
"label": col["type"],
"x_min_px": round(x_min),
"x_max_px": round(x_max),
"x_min_pct": round(x_min / img_w * 100, 2) if img_w else 0,
"x_max_pct": round(x_max / img_w * 100, 2) if img_w else 0,
"bold": False,
})
# Convert rows to output format with percentages
out_rows = []
for row in rows:
out_rows.append({
"index": row["index"],
"y_min_px": round(row["y_min"]),
"y_max_px": round(row["y_max"]),
"y_min_pct": round(row["y_min"] / img_h * 100, 2) if img_h else 0,
"y_max_pct": round(row["y_max"] / img_h * 100, 2) if img_h else 0,
"is_header": row["index"] in header_rows,
})
return {
"columns": out_columns,
"rows": out_rows,
"cells": cells,
"header_rows": header_rows,
"_raw_columns": columns, # internal: for propagation to other zones
}