Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 27s
CI / test-go-edu-search (push) Successful in 36s
CI / test-python-klausur (push) Failing after 2m9s
CI / test-python-agent-core (push) Successful in 17s
CI / test-nodejs-website (push) Successful in 21s
_filter_footer_words now returns page number info (text, y_pct, number) instead of just removing footer words. The page number is included in the grid result as `page_number` and displayed in the frontend summary bar as "S. 233". This preserves page numbers for later page concatenation in the customer frontend while still removing them from the grid content. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1417 lines
51 KiB
Python
1417 lines
51 KiB
Python
"""
|
||
Grid Editor helper functions — filters, detectors, and zone grid building.
|
||
|
||
Extracted from grid_editor_api.py for maintainability.
|
||
All functions are pure computation — no HTTP, DB, or session side effects.
|
||
|
||
Lizenz: Apache 2.0 (kommerziell nutzbar)
|
||
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
|
||
"""
|
||
|
||
import logging
|
||
import re
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
import cv2
|
||
import numpy as np
|
||
|
||
from cv_vocab_types import PageZone
|
||
from cv_words_first import _cluster_rows, _build_cells
|
||
from cv_ocr_engines import _text_has_garbled_ipa
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _filter_border_strip_words(words: List[Dict]) -> Tuple[List[Dict], int]:
|
||
"""Remove page-border decoration strip words BEFORE column detection.
|
||
|
||
Scans from each page edge inward to find the first significant x-gap
|
||
(>30 px). If the edge cluster contains <15 % of total words, those
|
||
words are removed as border-strip artifacts (alphabet letters,
|
||
illustration fragments).
|
||
|
||
Must run BEFORE ``_build_zone_grid`` so that column detection only
|
||
sees real content words and doesn't produce inflated row counts.
|
||
"""
|
||
if len(words) < 10:
|
||
return words, 0
|
||
|
||
sorted_words = sorted(words, key=lambda w: w.get("left", 0))
|
||
total = len(sorted_words)
|
||
|
||
# -- Left-edge scan (running max right-edge) --
|
||
left_count = 0
|
||
running_right = 0
|
||
for gi in range(total - 1):
|
||
running_right = max(
|
||
running_right,
|
||
sorted_words[gi].get("left", 0) + sorted_words[gi].get("width", 0),
|
||
)
|
||
if sorted_words[gi + 1].get("left", 0) - running_right > 30:
|
||
left_count = gi + 1
|
||
break
|
||
|
||
# -- Right-edge scan (running min left) --
|
||
right_count = 0
|
||
running_left = sorted_words[-1].get("left", 0)
|
||
for gi in range(total - 1, 0, -1):
|
||
running_left = min(running_left, sorted_words[gi].get("left", 0))
|
||
prev_right = (
|
||
sorted_words[gi - 1].get("left", 0)
|
||
+ sorted_words[gi - 1].get("width", 0)
|
||
)
|
||
if running_left - prev_right > 30:
|
||
right_count = total - gi
|
||
break
|
||
|
||
# Validate candidate strip: real border decorations are mostly short
|
||
# words (alphabet letters like "A", "Bb", stray marks). Multi-word
|
||
# content like "der Ranzen" or "die Schals" (continuation of German
|
||
# translations) must NOT be removed.
|
||
def _is_decorative_strip(candidates: List[Dict]) -> bool:
|
||
if not candidates:
|
||
return False
|
||
short = sum(1 for w in candidates if len((w.get("text") or "").strip()) <= 2)
|
||
return short / len(candidates) >= 0.45
|
||
|
||
strip_ids: set = set()
|
||
if left_count > 0 and left_count / total < 0.20:
|
||
candidates = sorted_words[:left_count]
|
||
if _is_decorative_strip(candidates):
|
||
strip_ids = {id(w) for w in candidates}
|
||
elif right_count > 0 and right_count / total < 0.20:
|
||
candidates = sorted_words[total - right_count:]
|
||
if _is_decorative_strip(candidates):
|
||
strip_ids = {id(w) for w in candidates}
|
||
|
||
if not strip_ids:
|
||
return words, 0
|
||
|
||
return [w for w in words if id(w) not in strip_ids], len(strip_ids)
|
||
|
||
|
||
def _cluster_columns_by_alignment(
    words: List[Dict],
    zone_w: int,
    rows: List[Dict],
) -> List[Dict[str, Any]]:
    """Detect columns by clustering left-edge alignment across rows.

    Hybrid approach:
    1. Group words by row, find "group start" positions within each row
       (words preceded by a large gap or first word in row)
    2. Cluster group-start left-edges by X-proximity across rows
    3. Filter by row coverage (how many rows have a group start here)
    4. Merge nearby clusters
    5. Build column boundaries

    This filters out mid-phrase word positions (e.g. IPA transcriptions,
    second words in multi-word entries) by only considering positions
    where a new word group begins within a row.

    Args:
        words: word dicts with ``left``, ``top``, ``width``, ``height``
            pixel fields.
        zone_w: zone width in pixels; drives the proximity/merge tolerances.
        rows: row dicts with ``index`` and ``y_center`` from row clustering.

    Returns:
        List of column dicts with ``index``, ``type`` and pixel bounds
        ``x_min``/``x_max``; empty list when there is nothing to cluster.
    """
    if not words or not rows:
        return []

    total_rows = len(rows)
    if total_rows == 0:  # unreachable after the guard above; kept as a defensive check
        return []

    # --- Group words by row ---
    row_words: Dict[int, List[Dict]] = {}
    for w in words:
        y_center = w["top"] + w["height"] / 2
        # Assign each word to the row whose y_center is nearest.
        best = min(rows, key=lambda r: abs(r["y_center"] - y_center))
        row_words.setdefault(best["index"], []).append(w)

    # --- Compute adaptive gap threshold for group-start detection ---
    all_gaps: List[float] = []
    for ri, rw_list in row_words.items():
        sorted_rw = sorted(rw_list, key=lambda w: w["left"])
        for i in range(len(sorted_rw) - 1):
            right = sorted_rw[i]["left"] + sorted_rw[i]["width"]
            gap = sorted_rw[i + 1]["left"] - right
            if gap > 0:  # overlapping boxes yield negative gaps; ignore them
                all_gaps.append(gap)

    if all_gaps:
        sorted_gaps = sorted(all_gaps)
        median_gap = sorted_gaps[len(sorted_gaps) // 2]
        heights = [w["height"] for w in words if w.get("height", 0) > 0]
        median_h = sorted(heights)[len(heights) // 2] if heights else 25
        # Column boundary: gap > 3× median gap or > 1.5× median word height
        gap_threshold = max(median_gap * 3, median_h * 1.5, 30)
    else:
        gap_threshold = 50  # fallback when no positive gaps were observed

    # --- Find group-start positions (left-edges that begin a new column) ---
    start_positions: List[tuple] = []  # (left_edge, row_index)
    for ri, rw_list in row_words.items():
        sorted_rw = sorted(rw_list, key=lambda w: w["left"])
        # First word in row is always a group start
        start_positions.append((sorted_rw[0]["left"], ri))
        for i in range(1, len(sorted_rw)):
            right_prev = sorted_rw[i - 1]["left"] + sorted_rw[i - 1]["width"]
            gap = sorted_rw[i]["left"] - right_prev
            if gap >= gap_threshold:
                start_positions.append((sorted_rw[i]["left"], ri))

    start_positions.sort(key=lambda x: x[0])

    logger.info(
        "alignment columns: %d group-start positions from %d words "
        "(gap_threshold=%.0f, %d rows)",
        len(start_positions), len(words), gap_threshold, total_rows,
    )

    if not start_positions:
        # No starts at all → one column covering the whole content span.
        x_min = min(w["left"] for w in words)
        x_max = max(w["left"] + w["width"] for w in words)
        return [{"index": 0, "type": "column_text", "x_min": x_min, "x_max": x_max}]

    # --- Cluster group-start positions by X-proximity ---
    tolerance = max(10, int(zone_w * 0.01))
    clusters: List[Dict[str, Any]] = []
    cur_edges = [start_positions[0][0]]
    cur_rows = {start_positions[0][1]}

    for left, row_idx in start_positions[1:]:
        if left - cur_edges[-1] <= tolerance:
            cur_edges.append(left)
            cur_rows.add(row_idx)
        else:
            clusters.append({
                "mean_x": int(sum(cur_edges) / len(cur_edges)),
                "min_edge": min(cur_edges),
                "max_edge": max(cur_edges),
                "count": len(cur_edges),
                "distinct_rows": len(cur_rows),
                "row_coverage": len(cur_rows) / total_rows,
            })
            cur_edges = [left]
            cur_rows = {row_idx}
    # Flush the trailing cluster.
    clusters.append({
        "mean_x": int(sum(cur_edges) / len(cur_edges)),
        "min_edge": min(cur_edges),
        "max_edge": max(cur_edges),
        "count": len(cur_edges),
        "distinct_rows": len(cur_rows),
        "row_coverage": len(cur_rows) / total_rows,
    })

    # --- Filter by row coverage ---
    # These thresholds must be high enough to avoid false columns in flowing
    # text (random inter-word gaps) while still detecting real columns in
    # vocabulary worksheets (which typically have >80% row coverage).
    MIN_COVERAGE_PRIMARY = 0.35
    MIN_COVERAGE_SECONDARY = 0.12
    MIN_WORDS_SECONDARY = 4
    MIN_DISTINCT_ROWS = 3

    # Content boundary for left-margin detection
    content_x_min = min(w["left"] for w in words)
    content_x_max = max(w["left"] + w["width"] for w in words)
    content_span = content_x_max - content_x_min

    primary = [
        c for c in clusters
        if c["row_coverage"] >= MIN_COVERAGE_PRIMARY
        and c["distinct_rows"] >= MIN_DISTINCT_ROWS
    ]
    primary_ids = {id(c) for c in primary}
    secondary = [
        c for c in clusters
        if id(c) not in primary_ids
        and c["row_coverage"] >= MIN_COVERAGE_SECONDARY
        and c["count"] >= MIN_WORDS_SECONDARY
        and c["distinct_rows"] >= MIN_DISTINCT_ROWS
    ]

    # Tertiary: narrow left-margin columns (page refs, markers) that have
    # too few rows for secondary but are clearly left-aligned and separated
    # from the main content. These appear at the far left or far right and
    # have a large gap to the nearest significant cluster.
    used_ids = {id(c) for c in primary} | {id(c) for c in secondary}
    sig_xs = [c["mean_x"] for c in primary + secondary]

    MIN_DISTINCT_ROWS_TERTIARY = max(MIN_DISTINCT_ROWS + 1, 4)
    MIN_COVERAGE_TERTIARY = 0.05  # at least 5% of rows
    tertiary = []
    for c in clusters:
        if id(c) in used_ids:
            continue
        if c["distinct_rows"] < MIN_DISTINCT_ROWS_TERTIARY:
            continue
        if c["row_coverage"] < MIN_COVERAGE_TERTIARY:
            continue
        # Must be near left or right content margin (within 15%)
        rel_pos = (c["mean_x"] - content_x_min) / content_span if content_span else 0.5
        if not (rel_pos < 0.15 or rel_pos > 0.85):
            continue
        # Must have significant gap to nearest significant cluster
        if sig_xs:
            min_dist = min(abs(c["mean_x"] - sx) for sx in sig_xs)
            if min_dist < max(30, content_span * 0.02):
                continue
        tertiary.append(c)

    if tertiary:
        for c in tertiary:
            logger.info(
                " tertiary (margin) cluster: x=%d (range %d-%d), %d words, %d rows (%.0f%%)",
                c["mean_x"], c["min_edge"], c["max_edge"],
                c["count"], c["distinct_rows"], c["row_coverage"] * 100,
            )

    significant = sorted(primary + secondary + tertiary, key=lambda c: c["mean_x"])

    for c in significant:
        logger.info(
            " significant cluster: x=%d (range %d-%d), %d words, %d rows (%.0f%%)",
            c["mean_x"], c["min_edge"], c["max_edge"],
            c["count"], c["distinct_rows"], c["row_coverage"] * 100,
        )
    logger.info(
        "alignment columns: %d clusters, %d primary, %d secondary → %d significant",
        len(clusters), len(primary), len(secondary), len(significant),
    )

    if not significant:
        # Fallback: single column covering all content
        x_min = min(w["left"] for w in words)
        x_max = max(w["left"] + w["width"] for w in words)
        return [{"index": 0, "type": "column_text", "x_min": x_min, "x_max": x_max}]

    # --- Merge nearby clusters ---
    merge_distance = max(25, int(zone_w * 0.03))
    merged = [significant[0].copy()]
    for s in significant[1:]:
        if s["mean_x"] - merged[-1]["mean_x"] < merge_distance:
            prev = merged[-1]
            total = prev["count"] + s["count"]
            # Count-weighted average keeps the merged x near the denser cluster.
            prev["mean_x"] = (
                prev["mean_x"] * prev["count"] + s["mean_x"] * s["count"]
            ) // total
            prev["count"] = total
            prev["min_edge"] = min(prev["min_edge"], s["min_edge"])
            prev["max_edge"] = max(prev["max_edge"], s["max_edge"])
            prev["distinct_rows"] = max(prev["distinct_rows"], s["distinct_rows"])
        else:
            merged.append(s.copy())

    logger.info(
        "alignment columns: %d after merge (distance=%d)",
        len(merged), merge_distance,
    )

    # --- Build column boundaries ---
    margin = max(5, int(zone_w * 0.005))
    content_x_min = min(w["left"] for w in words)
    content_x_max = max(w["left"] + w["width"] for w in words)

    columns: List[Dict[str, Any]] = []
    for i, cluster in enumerate(merged):
        x_min = max(content_x_min, cluster["min_edge"] - margin)
        if i + 1 < len(merged):
            # Column ends just before the next column's earliest start edge.
            x_max = merged[i + 1]["min_edge"] - margin
        else:
            x_max = content_x_max

        columns.append({
            "index": i,
            "type": f"column_{i + 1}" if len(merged) > 1 else "column_text",
            "x_min": x_min,
            "x_max": x_max,
        })

    return columns
|
||
|
||
|
||
# Characters that are typically OCR artefacts from box border lines
# (vertical rules misread as |/1/l/I or brackets, horizontal rules as
# dashes/underscores).  Intentionally excludes ! (red markers) and
# . , ; (real punctuation).
_GRID_GHOST_CHARS = set("|1lI[](){}/\\-—–_~=+")
|
||
|
||
|
||
def _filter_border_ghosts(
|
||
words: List[Dict],
|
||
boxes: List,
|
||
) -> tuple:
|
||
"""Remove words sitting on box borders that are OCR artefacts.
|
||
|
||
Returns (filtered_words, removed_count).
|
||
"""
|
||
if not boxes or not words:
|
||
return words, 0
|
||
|
||
# Build border bands from detected boxes
|
||
x_bands: List[tuple] = []
|
||
y_bands: List[tuple] = []
|
||
for b in boxes:
|
||
bt = (
|
||
b.border_thickness
|
||
if hasattr(b, "border_thickness")
|
||
else b.get("border_thickness", 3)
|
||
)
|
||
# Skip borderless boxes (images/graphics) — no border line to produce ghosts
|
||
if bt == 0:
|
||
continue
|
||
bx = b.x if hasattr(b, "x") else b.get("x", 0)
|
||
by = b.y if hasattr(b, "y") else b.get("y", 0)
|
||
bw = b.width if hasattr(b, "width") else b.get("w", b.get("width", 0))
|
||
bh = b.height if hasattr(b, "height") else b.get("h", b.get("height", 0))
|
||
margin = max(bt * 2, 10) + 6
|
||
x_bands.append((bx - margin, bx + margin))
|
||
x_bands.append((bx + bw - margin, bx + bw + margin))
|
||
y_bands.append((by - margin, by + margin))
|
||
y_bands.append((by + bh - margin, by + bh + margin))
|
||
|
||
def _is_ghost(w: Dict) -> bool:
|
||
text = (w.get("text") or "").strip()
|
||
if not text:
|
||
return False
|
||
# Check if any word edge (not just center) touches a border band
|
||
w_left = w["left"]
|
||
w_right = w["left"] + w["width"]
|
||
w_top = w["top"]
|
||
w_bottom = w["top"] + w["height"]
|
||
on_border = (
|
||
any(lo <= w_left <= hi or lo <= w_right <= hi for lo, hi in x_bands)
|
||
or any(lo <= w_top <= hi or lo <= w_bottom <= hi for lo, hi in y_bands)
|
||
)
|
||
if not on_border:
|
||
return False
|
||
if len(text) == 1 and text in _GRID_GHOST_CHARS:
|
||
return True
|
||
return False
|
||
|
||
filtered = [w for w in words if not _is_ghost(w)]
|
||
return filtered, len(words) - len(filtered)
|
||
|
||
|
||
# Bullet/numbering/marker glyphs.  Used to distinguish inline-marker
# columns (bullets, arrows, checkmarks) from real alphabetic content
# columns in _merge_inline_marker_columns.
_MARKER_CHARS = set("•*·-–—|~=+#>→►▸▪◆○●□■✓✗✔✘")
|
||
|
||
|
||
def _merge_inline_marker_columns(
|
||
columns: List[Dict],
|
||
words: List[Dict],
|
||
) -> List[Dict]:
|
||
"""Merge narrow marker columns (bullets, numbering) into adjacent text.
|
||
|
||
Bullet points (•, *, -) and numbering (1., 2.) create narrow columns
|
||
at the left edge of a zone. These are inline markers that indent text,
|
||
not real separate columns. Merge them with their right neighbour.
|
||
|
||
Does NOT merge columns containing alphabetic words like "to", "in",
|
||
"der", "die", "das" — those are legitimate content columns.
|
||
"""
|
||
if len(columns) < 2:
|
||
return columns
|
||
|
||
merged: List[Dict] = []
|
||
skip: set = set()
|
||
|
||
for i, col in enumerate(columns):
|
||
if i in skip:
|
||
continue
|
||
|
||
# Find words in this column
|
||
col_words = [
|
||
w for w in words
|
||
if col["x_min"] <= w["left"] + w["width"] / 2 < col["x_max"]
|
||
]
|
||
col_width = col["x_max"] - col["x_min"]
|
||
|
||
# Narrow column with mostly short words → MIGHT be inline markers
|
||
if col_words and col_width < 80:
|
||
avg_len = sum(len(w.get("text", "")) for w in col_words) / len(col_words)
|
||
if avg_len <= 2 and i + 1 < len(columns):
|
||
# Check if words are actual markers (symbols/numbers) vs
|
||
# real alphabetic words like "to", "in", "der", "die"
|
||
texts = [(w.get("text") or "").strip() for w in col_words]
|
||
alpha_count = sum(
|
||
1 for t in texts
|
||
if t and t[0].isalpha() and t not in _MARKER_CHARS
|
||
)
|
||
alpha_ratio = alpha_count / len(texts) if texts else 0
|
||
|
||
# If ≥50% of words are alphabetic, this is a real column
|
||
if alpha_ratio >= 0.5:
|
||
logger.info(
|
||
" kept narrow column %d (w=%d, avg_len=%.1f, "
|
||
"alpha=%.0f%%) — contains real words",
|
||
i, col_width, avg_len, alpha_ratio * 100,
|
||
)
|
||
else:
|
||
# Merge into next column
|
||
next_col = columns[i + 1].copy()
|
||
next_col["x_min"] = col["x_min"]
|
||
merged.append(next_col)
|
||
skip.add(i + 1)
|
||
logger.info(
|
||
" merged inline marker column %d (w=%d, avg_len=%.1f) "
|
||
"into column %d",
|
||
i, col_width, avg_len, i + 1,
|
||
)
|
||
continue
|
||
|
||
merged.append(col)
|
||
|
||
# Re-index
|
||
for i, col in enumerate(merged):
|
||
col["index"] = i
|
||
col["type"] = f"column_{i + 1}" if len(merged) > 1 else "column_text"
|
||
|
||
return merged
|
||
|
||
|
||
def _flatten_word_boxes(cells: List[Dict]) -> List[Dict]:
|
||
"""Extract all word_boxes from cells into a flat list of word dicts."""
|
||
words: List[Dict] = []
|
||
for cell in cells:
|
||
for wb in cell.get("word_boxes") or []:
|
||
if wb.get("text", "").strip():
|
||
words.append({
|
||
"text": wb["text"],
|
||
"left": wb["left"],
|
||
"top": wb["top"],
|
||
"width": wb["width"],
|
||
"height": wb["height"],
|
||
"conf": wb.get("conf", 0),
|
||
})
|
||
return words
|
||
|
||
|
||
def _words_in_zone(
|
||
words: List[Dict],
|
||
zone_y: int,
|
||
zone_h: int,
|
||
zone_x: int,
|
||
zone_w: int,
|
||
) -> List[Dict]:
|
||
"""Filter words whose Y-center falls within a zone's bounds."""
|
||
zone_y_end = zone_y + zone_h
|
||
zone_x_end = zone_x + zone_w
|
||
result = []
|
||
for w in words:
|
||
cy = w["top"] + w["height"] / 2
|
||
cx = w["left"] + w["width"] / 2
|
||
if zone_y <= cy <= zone_y_end and zone_x <= cx <= zone_x_end:
|
||
result.append(w)
|
||
return result
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Vertical divider detection and zone splitting
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Matches word_boxes consisting solely of pipe characters — the typical
# OCR residue of a vertical divider line.
_PIPE_RE_VSPLIT = re.compile(r"^\|+$")
|
||
|
||
|
||
def _detect_vertical_dividers(
|
||
words: List[Dict],
|
||
zone_x: int,
|
||
zone_w: int,
|
||
zone_y: int,
|
||
zone_h: int,
|
||
) -> List[float]:
|
||
"""Detect vertical divider lines from pipe word_boxes at consistent x.
|
||
|
||
Returns list of divider x-positions (empty if no dividers found).
|
||
"""
|
||
if not words or zone_w <= 0 or zone_h <= 0:
|
||
return []
|
||
|
||
# Collect pipe word_boxes
|
||
pipes = [
|
||
w for w in words
|
||
if _PIPE_RE_VSPLIT.match((w.get("text") or "").strip())
|
||
]
|
||
if len(pipes) < 5:
|
||
return []
|
||
|
||
# Cluster pipe x-centers by proximity
|
||
tolerance = max(15, int(zone_w * 0.02))
|
||
pipe_xs = sorted(w["left"] + w["width"] / 2 for w in pipes)
|
||
|
||
clusters: List[List[float]] = [[pipe_xs[0]]]
|
||
for x in pipe_xs[1:]:
|
||
if x - clusters[-1][-1] <= tolerance:
|
||
clusters[-1].append(x)
|
||
else:
|
||
clusters.append([x])
|
||
|
||
dividers: List[float] = []
|
||
for cluster in clusters:
|
||
if len(cluster) < 5:
|
||
continue
|
||
mean_x = sum(cluster) / len(cluster)
|
||
# Must be between 15% and 85% of zone width
|
||
rel_pos = (mean_x - zone_x) / zone_w
|
||
if rel_pos < 0.15 or rel_pos > 0.85:
|
||
continue
|
||
# Check vertical coverage: pipes must span >= 50% of zone height
|
||
cluster_pipes = [
|
||
w for w in pipes
|
||
if abs(w["left"] + w["width"] / 2 - mean_x) <= tolerance
|
||
]
|
||
ys = [w["top"] for w in cluster_pipes] + [w["top"] + w["height"] for w in cluster_pipes]
|
||
y_span = max(ys) - min(ys) if ys else 0
|
||
if y_span < zone_h * 0.5:
|
||
continue
|
||
dividers.append(mean_x)
|
||
|
||
return sorted(dividers)
|
||
|
||
|
||
def _split_zone_at_vertical_dividers(
    zone: "PageZone",
    divider_xs: List[float],
    vsplit_group_id: int,
) -> List["PageZone"]:
    """Split a PageZone at vertical divider positions into sub-zones.

    Args:
        zone: the zone to split; y/height/box/image_overlays are copied
            onto every sub-zone, only x/width differ.
        divider_xs: divider x-positions (pixels) inside the zone, e.g. as
            returned by ``_detect_vertical_dividers``.
        vsplit_group_id: group id stamped on every sub-zone so that the
            parts can be re-associated later.

    Returns:
        One PageZone per horizontal slice, tagged with a layout_hint of
        "left_of_vsplit" / "middle_of_vsplit" / "right_of_vsplit".
        Sub-zone indices are all 0 and are expected to be re-indexed by
        the caller.
    """
    from cv_vocab_types import PageZone

    # Slice boundaries: zone edges plus each divider x.
    boundaries = [zone.x] + divider_xs + [zone.x + zone.width]

    # One hint per slice: first → left, last → right, everything else middle.
    hints = []
    for i in range(len(boundaries) - 1):
        if i == 0:
            hints.append("left_of_vsplit")
        elif i == len(boundaries) - 2:
            hints.append("right_of_vsplit")
        else:
            hints.append("middle_of_vsplit")

    sub_zones = []
    for i in range(len(boundaries) - 1):
        x_start = int(boundaries[i])
        x_end = int(boundaries[i + 1])
        sub = PageZone(
            index=0,  # re-indexed later
            zone_type=zone.zone_type,
            y=zone.y,
            height=zone.height,
            x=x_start,
            width=x_end - x_start,
            box=zone.box,
            image_overlays=zone.image_overlays,
            layout_hint=hints[i],
            vsplit_group=vsplit_group_id,
        )
        sub_zones.append(sub)

    return sub_zones
|
||
|
||
|
||
def _merge_content_zones_across_boxes(
    zones: List,
    content_x: int,
    content_w: int,
) -> List:
    """Merge content zones separated by box zones into single zones.

    Box zones become image_overlays on the merged content zone.
    Pattern: [content, box*, content] → [merged_content with overlay]
    Box zones NOT between two content zones stay as standalone zones.

    Args:
        zones: PageZone-like objects in top-to-bottom order.
        content_x: x origin assigned to merged zones.
        content_w: width assigned to merged zones.

    Returns:
        A new zone list with ``index`` renumbered sequentially; the
        original zone objects are reused (and re-indexed) where not merged.
    """
    if len(zones) < 3:
        # A [content, box, content] pattern needs at least three zones.
        return zones

    # Group consecutive runs of [content, box+, content]
    result: List = []
    i = 0
    while i < len(zones):
        z = zones[i]
        if z.zone_type != "content":
            result.append(z)
            i += 1
            continue

        # Start of a potential merge group: content zone
        group_contents = [z]
        group_boxes = []
        j = i + 1
        # Absorb [box, content] pairs — only absorb a box if it's
        # confirmed to be followed by another content zone.
        while j < len(zones):
            if (zones[j].zone_type == "box"
                    and j + 1 < len(zones)
                    and zones[j + 1].zone_type == "content"):
                group_boxes.append(zones[j])
                group_contents.append(zones[j + 1])
                j += 2
            else:
                break

        if len(group_contents) >= 2 and group_boxes:
            # Merge: create one large content zone spanning all
            y_min = min(c.y for c in group_contents)
            y_max = max(c.y + c.height for c in group_contents)
            overlays = []
            for bz in group_boxes:
                overlay = {
                    "y": bz.y,
                    "height": bz.height,
                    "x": bz.x,
                    "width": bz.width,
                }
                if bz.box:
                    # Preserve the detected box geometry + metadata so the
                    # frontend can still draw the original box.
                    overlay["box"] = {
                        "x": bz.box.x,
                        "y": bz.box.y,
                        "width": bz.box.width,
                        "height": bz.box.height,
                        "confidence": bz.box.confidence,
                        "border_thickness": bz.box.border_thickness,
                    }
                overlays.append(overlay)

            merged = PageZone(
                index=0,  # re-indexed below
                zone_type="content",
                y=y_min,
                height=y_max - y_min,
                x=content_x,
                width=content_w,
                image_overlays=overlays,
            )
            result.append(merged)
            i = j  # skip everything the group consumed
        else:
            # No merge possible — emit just the content zone
            result.append(z)
            i += 1

    # Re-index zones
    for idx, z in enumerate(result):
        z.index = idx

    logger.info(
        "zone-merge: %d zones → %d zones after merging across boxes",
        len(zones), len(result),
    )
    return result
|
||
|
||
|
||
def _detect_heading_rows_by_color(zones_data: List[Dict], img_w: int, img_h: int) -> int:
    """Detect heading rows by color + height after color annotation.

    A row is a heading if:
    1. ALL word_boxes have color_name != 'black' (typically 'blue')
    2. Mean word height > 1.2x median height of all words in the zone

    Detected heading rows are merged into a single spanning cell.
    Mutates the zone dicts (``cells``/``rows``) in place.

    Args:
        zones_data: zone dicts with ``cells``, ``rows``, ``columns``.
        img_w / img_h: page dimensions in pixels for bbox_pct computation.

    Returns count of headings detected.
    """
    heading_count = 0

    for z in zones_data:
        cells = z.get("cells", [])
        rows = z.get("rows", [])
        columns = z.get("columns", [])
        # Heading merge only makes sense for multi-column grids with content.
        if not cells or not rows or len(columns) < 2:
            continue

        # Compute median word height across the zone
        all_heights = []
        for cell in cells:
            for wb in cell.get("word_boxes") or []:
                h = wb.get("height", 0)
                if h > 0:
                    all_heights.append(h)
        if not all_heights:
            continue
        all_heights_sorted = sorted(all_heights)
        median_h = all_heights_sorted[len(all_heights_sorted) // 2]

        heading_row_indices = []
        for row in rows:
            if row.get("is_header"):
                continue  # already detected as header
            ri = row["index"]
            row_cells = [c for c in cells if c.get("row_index") == ri]
            row_wbs = [
                wb for cell in row_cells
                for wb in cell.get("word_boxes") or []
            ]
            if not row_wbs:
                continue

            # Condition 1: ALL words are non-black
            all_colored = all(
                wb.get("color_name", "black") != "black"
                for wb in row_wbs
            )
            if not all_colored:
                continue

            # Condition 2: mean height > 1.2x median
            mean_h = sum(wb.get("height", 0) for wb in row_wbs) / len(row_wbs)
            if mean_h <= median_h * 1.2:
                continue

            heading_row_indices.append(ri)

        # Merge heading cells into spanning cells
        for hri in heading_row_indices:
            # NOTE: `cells` still references the ORIGINAL list even after
            # z["cells"] is rebound below.  Safe because each hri touches a
            # distinct row, but keep in mind when modifying this loop.
            header_cells = [c for c in cells if c.get("row_index") == hri]
            if len(header_cells) <= 1:
                # Single cell — just mark it as heading
                if header_cells:
                    header_cells[0]["col_type"] = "heading"
                heading_count += 1
                # Mark row as header
                for row in rows:
                    if row["index"] == hri:
                        row["is_header"] = True
                continue

            # Collect all word_boxes and text from all columns
            all_wb = []
            all_text_parts = []
            for hc in sorted(header_cells, key=lambda c: c["col_index"]):
                all_wb.extend(hc.get("word_boxes", []))
                if hc.get("text", "").strip():
                    all_text_parts.append(hc["text"].strip())

            # Remove all cells for this row, replace with one spanning cell
            z["cells"] = [c for c in z["cells"] if c.get("row_index") != hri]

            if all_wb:
                x_min = min(wb["left"] for wb in all_wb)
                y_min = min(wb["top"] for wb in all_wb)
                x_max = max(wb["left"] + wb["width"] for wb in all_wb)
                y_max = max(wb["top"] + wb["height"] for wb in all_wb)

                # Use the actual starting col_index from the first cell
                first_col = min(hc["col_index"] for hc in header_cells)
                zone_idx = z.get("zone_index", 0)
                z["cells"].append({
                    "cell_id": f"Z{zone_idx}_R{hri:02d}_C{first_col}",
                    "zone_index": zone_idx,
                    "row_index": hri,
                    "col_index": first_col,
                    "col_type": "heading",
                    "text": " ".join(all_text_parts),
                    "confidence": 0.0,
                    "bbox_px": {"x": x_min, "y": y_min,
                                "w": x_max - x_min, "h": y_max - y_min},
                    "bbox_pct": {
                        "x": round(x_min / img_w * 100, 2) if img_w else 0,
                        "y": round(y_min / img_h * 100, 2) if img_h else 0,
                        "w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0,
                        "h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0,
                    },
                    "word_boxes": all_wb,
                    "ocr_engine": "words_first",
                    "is_bold": True,
                })

            # Mark row as header
            for row in rows:
                if row["index"] == hri:
                    row["is_header"] = True
            heading_count += 1

    return heading_count
|
||
|
||
|
||
def _detect_heading_rows_by_single_cell(
    zones_data: List[Dict], img_w: int, img_h: int,
) -> int:
    """Detect heading rows that have only a single content cell.

    Black headings like "Theme" have normal color and height, so they are
    missed by ``_detect_heading_rows_by_color``. The distinguishing signal
    is that they occupy only one column while normal vocabulary rows fill
    at least 2-3 columns.

    A row qualifies as a heading if:
    1. It is not already marked as a header/heading.
    2. It has exactly ONE cell whose col_type starts with ``column_``
       (excluding column_1 / page_ref which only carries page numbers).
    3. That single cell is NOT in the last column (continuation/example
       lines like "2. Veränderung, Wechsel" often sit alone in column_4).
    4. The text does not start with ``[`` (IPA continuation).
    5. The zone has ≥3 columns and ≥5 rows (avoids false positives in
       tiny zones).
    6. The majority of rows in the zone have ≥2 content cells (ensures
       we are in a multi-column vocab layout).

    Mutates the zone dicts in place (cells replaced with spanning heading
    cells, rows flagged ``is_header``). Returns count of headings detected.
    """
    heading_count = 0

    for z in zones_data:
        cells = z.get("cells", [])
        rows = z.get("rows", [])
        columns = z.get("columns", [])
        # Rule 5: only apply in reasonably large multi-column zones.
        if len(columns) < 3 or len(rows) < 5:
            continue

        # Determine the last col_index (example/sentence column)
        col_indices = sorted(set(c.get("col_index", 0) for c in cells))
        if not col_indices:
            continue
        last_col = col_indices[-1]

        # Count content cells per row (column_* but not column_1/page_ref).
        # Exception: column_1 cells that contain a dictionary article word
        # (die/der/das etc.) ARE content — they appear in dictionary layouts
        # where the leftmost column holds grammatical articles.
        _ARTICLE_WORDS = {
            "die", "der", "das", "dem", "den", "des", "ein", "eine",
            "the", "a", "an",
        }
        row_content_counts: Dict[int, int] = {}
        for cell in cells:
            ct = cell.get("col_type", "")
            if not ct.startswith("column_"):
                continue
            if ct == "column_1":
                ctext = (cell.get("text") or "").strip().lower()
                if ctext not in _ARTICLE_WORDS:
                    continue
            ri = cell.get("row_index", -1)
            row_content_counts[ri] = row_content_counts.get(ri, 0) + 1

        # Rule 6: majority of rows must have ≥2 content cells
        multi_col_rows = sum(1 for cnt in row_content_counts.values() if cnt >= 2)
        if multi_col_rows < len(rows) * 0.4:
            continue

        # Exclude first and last non-header rows — these are typically
        # page numbers or footer text, not headings.
        non_header_rows = [r for r in rows if not r.get("is_header")]
        if len(non_header_rows) < 3:
            continue
        first_ri = non_header_rows[0]["index"]
        last_ri = non_header_rows[-1]["index"]

        heading_row_indices = []
        for row in rows:
            if row.get("is_header"):
                continue
            ri = row["index"]
            if ri == first_ri or ri == last_ri:
                continue
            row_cells = [c for c in cells if c.get("row_index") == ri]
            # Rule 2: content cells, with the column_1 article exception.
            content_cells = [
                c for c in row_cells
                if c.get("col_type", "").startswith("column_")
                and (c.get("col_type") != "column_1"
                     or (c.get("text") or "").strip().lower() in _ARTICLE_WORDS)
            ]
            if len(content_cells) != 1:
                continue
            cell = content_cells[0]
            # Rule 3: not in the last column (continuation/example lines)
            if cell.get("col_index") == last_col:
                continue
            text = (cell.get("text") or "").strip()
            # Rule 4: no empty text, no IPA continuation starting with "[".
            if not text or text.startswith("["):
                continue
            # Skip garbled IPA without brackets (e.g. "ska:f – ska:vz")
            # but NOT text with real IPA symbols (e.g. "Theme [θˈiːm]")
            _REAL_IPA_CHARS = set("ˈˌəɪɛɒʊʌæɑɔʃʒθðŋ")
            if _text_has_garbled_ipa(text) and not any(c in _REAL_IPA_CHARS for c in text):
                continue

            heading_row_indices.append(ri)

        # Guard: if >25% of eligible rows would become headings, the
        # heuristic is misfiring (e.g. sparse single-column layout where
        # most rows naturally have only 1 content cell).
        eligible_rows = len(non_header_rows) - 2  # minus first/last excluded
        if eligible_rows > 0 and len(heading_row_indices) > eligible_rows * 0.25:
            logger.debug(
                "Skipping single-cell heading detection for zone %s: "
                "%d/%d rows would be headings (>25%%)",
                z.get("zone_index"), len(heading_row_indices), eligible_rows,
            )
            continue

        for hri in heading_row_indices:
            # NOTE: `cells` keeps referencing the original list even after
            # z["cells"] is rebound below; OK since each hri is distinct.
            header_cells = [c for c in cells if c.get("row_index") == hri]
            if not header_cells:
                continue

            # Collect all word_boxes and text
            all_wb = []
            all_text_parts = []
            for hc in sorted(header_cells, key=lambda c: c["col_index"]):
                all_wb.extend(hc.get("word_boxes", []))
                if hc.get("text", "").strip():
                    all_text_parts.append(hc["text"].strip())

            first_col_idx = min(hc["col_index"] for hc in header_cells)

            # Remove old cells for this row, add spanning heading cell
            z["cells"] = [c for c in z["cells"] if c.get("row_index") != hri]

            if all_wb:
                x_min = min(wb["left"] for wb in all_wb)
                y_min = min(wb["top"] for wb in all_wb)
                x_max = max(wb["left"] + wb["width"] for wb in all_wb)
                y_max = max(wb["top"] + wb["height"] for wb in all_wb)
            else:
                # Fallback to first cell bbox
                bp = header_cells[0].get("bbox_px", {})
                x_min = bp.get("x", 0)
                y_min = bp.get("y", 0)
                x_max = x_min + bp.get("w", 0)
                y_max = y_min + bp.get("h", 0)

            zone_idx = z.get("zone_index", 0)
            z["cells"].append({
                "cell_id": f"Z{zone_idx}_R{hri:02d}_C{first_col_idx}",
                "zone_index": zone_idx,
                "row_index": hri,
                "col_index": first_col_idx,
                "col_type": "heading",
                "text": " ".join(all_text_parts),
                "confidence": 0.0,
                "bbox_px": {"x": x_min, "y": y_min,
                            "w": x_max - x_min, "h": y_max - y_min},
                "bbox_pct": {
                    "x": round(x_min / img_w * 100, 2) if img_w else 0,
                    "y": round(y_min / img_h * 100, 2) if img_h else 0,
                    "w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0,
                    "h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0,
                },
                "word_boxes": all_wb,
                "ocr_engine": "words_first",
                "is_bold": False,
            })

            for row in rows:
                if row["index"] == hri:
                    row["is_header"] = True
            heading_count += 1

    return heading_count
|
||
|
||
|
||
def _detect_header_rows(
|
||
rows: List[Dict],
|
||
zone_words: List[Dict],
|
||
zone_y: int,
|
||
columns: Optional[List[Dict]] = None,
|
||
skip_first_row_header: bool = False,
|
||
) -> List[int]:
|
||
"""Detect header rows: first-row heuristic + spanning header detection.
|
||
|
||
A "spanning header" is a row whose words stretch across multiple column
|
||
boundaries (e.g. "Unit4: Bonnie Scotland" centred across 4 columns).
|
||
"""
|
||
if len(rows) < 2:
|
||
return []
|
||
|
||
headers = []
|
||
|
||
if not skip_first_row_header:
|
||
first_row = rows[0]
|
||
second_row = rows[1]
|
||
|
||
# Gap between first and second row > 0.5x average row height
|
||
avg_h = sum(r["y_max"] - r["y_min"] for r in rows) / len(rows)
|
||
gap = second_row["y_min"] - first_row["y_max"]
|
||
if gap > avg_h * 0.5:
|
||
headers.append(0)
|
||
|
||
# Also check if first row words are taller than average (bold/header text)
|
||
all_heights = [w["height"] for w in zone_words]
|
||
median_h = sorted(all_heights)[len(all_heights) // 2] if all_heights else 20
|
||
first_row_words = [
|
||
w for w in zone_words
|
||
if first_row["y_min"] <= w["top"] + w["height"] / 2 <= first_row["y_max"]
|
||
]
|
||
if first_row_words:
|
||
first_h = max(w["height"] for w in first_row_words)
|
||
if first_h > median_h * 1.3:
|
||
if 0 not in headers:
|
||
headers.append(0)
|
||
|
||
# Note: Spanning-header detection (rows spanning all columns) has been
|
||
# disabled because it produces too many false positives on vocabulary
|
||
# worksheets where IPA transcriptions or short entries naturally span
|
||
# multiple columns with few words. The first-row heuristic above is
|
||
# sufficient for detecting real headers.
|
||
|
||
return headers
|
||
|
||
|
||
def _build_zone_grid(
    zone_words: List[Dict],
    zone_x: int,
    zone_y: int,
    zone_w: int,
    zone_h: int,
    zone_index: int,
    img_w: int,
    img_h: int,
    global_columns: Optional[List[Dict]] = None,
    skip_first_row_header: bool = False,
) -> Dict[str, Any]:
    """Build columns, rows, cells for a single zone from its words.

    Args:
        zone_words: OCR word dicts (``left``/``top``/``width``/``height``/
            ``text``) belonging to this zone.
        zone_x, zone_y, zone_w, zone_h: Zone geometry in pixels (``zone_x``,
            ``zone_w``, ``zone_h`` are currently unused here but kept for
            call-site compatibility; ``zone_y`` is forwarded to header
            detection).
        zone_index: Index used to prefix cell IDs (``Z{n}_...``).
        img_w, img_h: Full image dimensions for percentage conversion.
        global_columns: If provided, use these pre-computed column boundaries
            instead of detecting columns per zone. Used for content zones so
            that all content zones (above/between/below boxes) share the same
            column structure. Box zones always detect columns independently.
        skip_first_row_header: Forwarded to ``_detect_header_rows``.

    Returns:
        Dict with ``columns``, ``rows``, ``cells``, ``header_rows`` and the
        internal ``_raw_columns`` (pixel-space columns for propagation to
        other zones). Empty lists when the zone has no words/columns/rows.
    """
    if not zone_words:
        return {
            "columns": [],
            "rows": [],
            "cells": [],
            "header_rows": [],
        }

    # Cluster rows first (needed for column alignment analysis)
    rows = _cluster_rows(zone_words)

    # Diagnostic logging for small/medium zones (box zones typically have 40-60 words)
    if len(zone_words) <= 60:
        import statistics as _st
        _heights = [w['height'] for w in zone_words if w.get('height', 0) > 0]
        _med_h = _st.median(_heights) if _heights else 20
        _y_tol = max(_med_h * 0.5, 5)
        logger.info(
            "zone %d row-clustering: %d words, median_h=%.0f, y_tol=%.1f → %d rows",
            zone_index, len(zone_words), _med_h, _y_tol, len(rows),
        )
        for w in sorted(zone_words, key=lambda ww: (ww['top'], ww['left'])):
            logger.info(
                "  zone %d word: y=%d x=%d h=%d w=%d '%s'",
                zone_index, w['top'], w['left'], w['height'], w['width'],
                w.get('text', '')[:40],
            )
        for r in rows:
            logger.info(
                "  zone %d row %d: y_min=%d y_max=%d y_center=%.0f",
                zone_index, r['index'], r['y_min'], r['y_max'], r['y_center'],
            )

    # Use global columns if provided, otherwise detect per zone
    columns = global_columns if global_columns else _cluster_columns_by_alignment(zone_words, zone_w, rows)

    # Merge inline marker columns (bullets, numbering) into adjacent text.
    # Only for per-zone detection — global columns are already merged.
    if not global_columns:
        columns = _merge_inline_marker_columns(columns, zone_words)

    if not columns or not rows:
        return {
            "columns": [],
            "rows": [],
            "cells": [],
            "header_rows": [],
        }

    # Build cells
    cells = _build_cells(zone_words, columns, rows, img_w, img_h)

    # Prefix cell IDs with zone index
    for cell in cells:
        cell["cell_id"] = f"Z{zone_index}_{cell['cell_id']}"
        cell["zone_index"] = zone_index

    # Detect header rows (pass columns for spanning header detection)
    header_rows = _detect_header_rows(rows, zone_words, zone_y, columns,
                                      skip_first_row_header=skip_first_row_header)

    # Merge cells in spanning header rows into a single col-0 cell
    if header_rows and len(columns) >= 2:
        for hri in header_rows:
            header_cells = [c for c in cells if c["row_index"] == hri]
            if len(header_cells) <= 1:
                continue
            # Collect all word_boxes and text from all columns
            all_wb = []
            all_text_parts = []
            for hc in sorted(header_cells, key=lambda c: c["col_index"]):
                all_wb.extend(hc.get("word_boxes", []))
                if hc.get("text", "").strip():
                    all_text_parts.append(hc["text"].strip())
            # Remove all header cells, replace with one spanning cell
            cells = [c for c in cells if c["row_index"] != hri]
            if all_wb:
                x_min = min(wb["left"] for wb in all_wb)
                y_min = min(wb["top"] for wb in all_wb)
                x_max = max(wb["left"] + wb["width"] for wb in all_wb)
                y_max = max(wb["top"] + wb["height"] for wb in all_wb)
                # FIX: use the same Z-prefixed cell_id scheme and zone_index
                # key as all other cells (they were prefixed above); the
                # previous bare "R{hri:02d}_C0" id broke the zone convention.
                cells.append({
                    "cell_id": f"Z{zone_index}_R{hri:02d}_C0",
                    "zone_index": zone_index,
                    "row_index": hri,
                    "col_index": 0,
                    "col_type": "spanning_header",
                    "text": " ".join(all_text_parts),
                    "confidence": 0.0,
                    "bbox_px": {"x": x_min, "y": y_min,
                                "w": x_max - x_min, "h": y_max - y_min},
                    "bbox_pct": {
                        "x": round(x_min / img_w * 100, 2) if img_w else 0,
                        "y": round(y_min / img_h * 100, 2) if img_h else 0,
                        "w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0,
                        "h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0,
                    },
                    "word_boxes": all_wb,
                    "ocr_engine": "words_first",
                    "is_bold": True,
                })

    # Convert columns to output format with percentages
    out_columns = []
    for col in columns:
        x_min = col["x_min"]
        x_max = col["x_max"]
        out_columns.append({
            "index": col["index"],
            "label": col["type"],
            "x_min_px": round(x_min),
            "x_max_px": round(x_max),
            "x_min_pct": round(x_min / img_w * 100, 2) if img_w else 0,
            "x_max_pct": round(x_max / img_w * 100, 2) if img_w else 0,
            "bold": False,
        })

    # Convert rows to output format with percentages
    out_rows = []
    for row in rows:
        out_rows.append({
            "index": row["index"],
            "y_min_px": round(row["y_min"]),
            "y_max_px": round(row["y_max"]),
            "y_min_pct": round(row["y_min"] / img_h * 100, 2) if img_h else 0,
            "y_max_pct": round(row["y_max"] / img_h * 100, 2) if img_h else 0,
            "is_header": row["index"] in header_rows,
        })

    return {
        "columns": out_columns,
        "rows": out_rows,
        "cells": cells,
        "header_rows": header_rows,
        "_raw_columns": columns,  # internal: for propagation to other zones
    }
|
||
|
||
|
||
def _get_content_bounds(words: List[Dict]) -> tuple:
|
||
"""Get content bounds from word positions."""
|
||
if not words:
|
||
return 0, 0, 0, 0
|
||
x_min = min(w["left"] for w in words)
|
||
y_min = min(w["top"] for w in words)
|
||
x_max = max(w["left"] + w["width"] for w in words)
|
||
y_max = max(w["top"] + w["height"] for w in words)
|
||
return x_min, y_min, x_max - x_min, y_max - y_min
|
||
|
||
|
||
def _filter_decorative_margin(
|
||
words: List[Dict],
|
||
img_w: int,
|
||
log: Any,
|
||
session_id: str,
|
||
) -> Dict[str, Any]:
|
||
"""Remove words that belong to a decorative alphabet strip on a margin.
|
||
|
||
Some vocabulary worksheets have a vertical A–Z alphabet graphic along
|
||
the left or right edge. OCR reads each letter as an isolated single-
|
||
character word. These decorative elements are not content and confuse
|
||
column/row detection.
|
||
|
||
Detection criteria (phase 1 — find the strip using single-char words):
|
||
- Words are in the outer 30% of the page (left or right)
|
||
- Nearly all words are single characters (letters or digits)
|
||
- At least 8 such words form a vertical strip (≥8 unique Y positions)
|
||
- Average horizontal spread of the strip is small (< 80px)
|
||
|
||
Phase 2 — once a strip is confirmed, also remove any short word (≤3
|
||
chars) in the same narrow x-range. This catches multi-char OCR
|
||
artifacts like "Vv" that belong to the same decorative element.
|
||
|
||
Modifies *words* in place.
|
||
|
||
Returns:
|
||
Dict with 'found' (bool), 'side' (str), 'letters_detected' (int).
|
||
"""
|
||
no_strip: Dict[str, Any] = {"found": False, "side": "", "letters_detected": 0}
|
||
if not words or img_w <= 0:
|
||
return no_strip
|
||
|
||
margin_cutoff = img_w * 0.30
|
||
# Phase 1: find candidate strips using short words (1-2 chars).
|
||
# OCR often reads alphabet sidebar letters as pairs ("Aa", "Bb")
|
||
# rather than singles, so accept ≤2-char words as strip candidates.
|
||
left_strip = [
|
||
w for w in words
|
||
if len((w.get("text") or "").strip()) <= 2
|
||
and w["left"] + w.get("width", 0) / 2 < margin_cutoff
|
||
]
|
||
right_strip = [
|
||
w for w in words
|
||
if len((w.get("text") or "").strip()) <= 2
|
||
and w["left"] + w.get("width", 0) / 2 > img_w - margin_cutoff
|
||
]
|
||
|
||
for strip, side in [(left_strip, "left"), (right_strip, "right")]:
|
||
if len(strip) < 6:
|
||
continue
|
||
# Check vertical distribution: should have many distinct Y positions
|
||
y_centers = sorted(set(
|
||
int(w["top"] + w.get("height", 0) / 2) // 20 * 20 # bucket
|
||
for w in strip
|
||
))
|
||
if len(y_centers) < 6:
|
||
continue
|
||
# Check horizontal compactness
|
||
x_positions = [w["left"] for w in strip]
|
||
x_min = min(x_positions)
|
||
x_max = max(x_positions)
|
||
x_spread = x_max - x_min
|
||
if x_spread > 80:
|
||
continue
|
||
|
||
# Phase 2: strip confirmed — also collect short words in same x-range
|
||
# Expand x-range slightly to catch neighbors (e.g. "Vv" next to "U")
|
||
strip_x_lo = x_min - 20
|
||
strip_x_hi = x_max + 60 # word width + tolerance
|
||
all_strip_words = [
|
||
w for w in words
|
||
if len((w.get("text") or "").strip()) <= 3
|
||
and strip_x_lo <= w["left"] <= strip_x_hi
|
||
and (w["left"] + w.get("width", 0) / 2 < margin_cutoff
|
||
if side == "left"
|
||
else w["left"] + w.get("width", 0) / 2 > img_w - margin_cutoff)
|
||
]
|
||
|
||
strip_set = set(id(w) for w in all_strip_words)
|
||
before = len(words)
|
||
words[:] = [w for w in words if id(w) not in strip_set]
|
||
removed = before - len(words)
|
||
if removed:
|
||
log.info(
|
||
"build-grid session %s: removed %d decorative %s-margin words "
|
||
"(strip x=%d-%d)",
|
||
session_id, removed, side, strip_x_lo, strip_x_hi,
|
||
)
|
||
return {"found": True, "side": side, "letters_detected": len(strip)}
|
||
|
||
return no_strip
|
||
|
||
|
||
def _filter_footer_words(
|
||
words: List[Dict],
|
||
img_h: int,
|
||
log: Any,
|
||
session_id: str,
|
||
) -> Optional[Dict]:
|
||
"""Remove isolated words in the bottom 5% of the page (page numbers).
|
||
|
||
Modifies *words* in place and returns a page_number metadata dict
|
||
if a page number was extracted, or None.
|
||
"""
|
||
if not words or img_h <= 0:
|
||
return None
|
||
footer_y = img_h * 0.95
|
||
footer_words = [
|
||
w for w in words
|
||
if w["top"] + w.get("height", 0) / 2 > footer_y
|
||
]
|
||
if not footer_words:
|
||
return None
|
||
# Only remove if footer has very few words (≤ 3) with short text
|
||
total_text = "".join((w.get("text") or "").strip() for w in footer_words)
|
||
if len(footer_words) <= 3 and len(total_text) <= 10:
|
||
# Extract page number metadata before removing
|
||
page_number_info = {
|
||
"text": total_text.strip(),
|
||
"y_pct": round(footer_words[0]["top"] / img_h * 100, 1),
|
||
}
|
||
# Try to parse as integer
|
||
digits = "".join(c for c in total_text if c.isdigit())
|
||
if digits:
|
||
page_number_info["number"] = int(digits)
|
||
|
||
footer_set = set(id(w) for w in footer_words)
|
||
words[:] = [w for w in words if id(w) not in footer_set]
|
||
log.info(
|
||
"build-grid session %s: extracted page number '%s' and removed %d footer words",
|
||
session_id, total_text, len(footer_words),
|
||
)
|
||
return page_number_info
|
||
return None
|
||
|
||
|
||
def _filter_header_junk(
|
||
words: List[Dict],
|
||
img_h: int,
|
||
log: Any,
|
||
session_id: str,
|
||
) -> None:
|
||
"""Remove OCR junk from header illustrations above the real content.
|
||
|
||
Textbook pages often have decorative header graphics (illustrations,
|
||
icons) that OCR reads as low-confidence junk characters. Real content
|
||
typically starts further down the page.
|
||
|
||
Algorithm:
|
||
1. Find the "content start" — the first Y position where a dense
|
||
horizontal row of 3+ high-confidence words begins.
|
||
2. Above that line, remove words with conf < 75 and text ≤ 3 chars.
|
||
These are almost certainly OCR artifacts from illustrations.
|
||
|
||
Modifies *words* in place.
|
||
"""
|
||
if not words or img_h <= 0:
|
||
return
|
||
|
||
# --- Find content start: first horizontal row with ≥3 high-conf words ---
|
||
# Sort words by Y
|
||
sorted_by_y = sorted(words, key=lambda w: w["top"])
|
||
content_start_y = 0
|
||
_ROW_TOLERANCE = img_h * 0.02 # words within 2% of page height = same row
|
||
_MIN_ROW_WORDS = 3
|
||
_MIN_CONF = 80
|
||
|
||
i = 0
|
||
while i < len(sorted_by_y):
|
||
row_y = sorted_by_y[i]["top"]
|
||
# Collect words in this row band
|
||
row_words = []
|
||
j = i
|
||
while j < len(sorted_by_y) and sorted_by_y[j]["top"] - row_y < _ROW_TOLERANCE:
|
||
row_words.append(sorted_by_y[j])
|
||
j += 1
|
||
# Count high-confidence words with real text (> 1 char)
|
||
high_conf = [
|
||
w for w in row_words
|
||
if w.get("conf", 0) >= _MIN_CONF
|
||
and len((w.get("text") or "").strip()) > 1
|
||
]
|
||
if len(high_conf) >= _MIN_ROW_WORDS:
|
||
content_start_y = row_y
|
||
break
|
||
i = j if j > i else i + 1
|
||
|
||
if content_start_y <= 0:
|
||
return # no clear content start found
|
||
|
||
# --- Remove low-conf short junk above content start ---
|
||
junk = [
|
||
w for w in words
|
||
if w["top"] + w.get("height", 0) < content_start_y
|
||
and w.get("conf", 0) < 75
|
||
and len((w.get("text") or "").strip()) <= 3
|
||
]
|
||
if not junk:
|
||
return
|
||
|
||
junk_set = set(id(w) for w in junk)
|
||
before = len(words)
|
||
words[:] = [w for w in words if id(w) not in junk_set]
|
||
removed = before - len(words)
|
||
if removed:
|
||
log.info(
|
||
"build-grid session %s: removed %d header junk words above y=%d "
|
||
"(content start)",
|
||
session_id, removed, content_start_y,
|
||
)
|
||
|