feat: box zone artifact filter, spanning headers, parenthesis fix
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 28s
CI / test-go-edu-search (push) Successful in 27s
CI / test-python-klausur (push) Failing after 1m59s
CI / test-python-agent-core (push) Successful in 22s
CI / test-nodejs-website (push) Successful in 19s
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 28s
CI / test-go-edu-search (push) Successful in 27s
CI / test-python-klausur (push) Failing after 1m59s
CI / test-python-agent-core (push) Successful in 22s
CI / test-nodejs-website (push) Successful in 19s
1. Filter recovered single-char artifacts (!, ?, •, ·) from box zones,
where they are decorative noise, not real text markers.
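2. Detect spanning header rows (e.g. "Unit4: Bonnie Scotland") that
stretch across multiple columns: rows of at most 6 words that are either
entirely colored text overlapping 2+ columns, or at most 4 words
overlapping 3+ columns. Merge the cells of every detected header row
(including first-row headers) into a single spanning cell in column 0.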
3. Fix missing opening parentheses: when cell text contains ")" but no
"(" at all, prepend "(" to the text.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -415,9 +415,13 @@ def _detect_header_rows(
|
|||||||
rows: List[Dict],
|
rows: List[Dict],
|
||||||
zone_words: List[Dict],
|
zone_words: List[Dict],
|
||||||
zone_y: int,
|
zone_y: int,
|
||||||
|
columns: Optional[List[Dict]] = None,
|
||||||
) -> List[int]:
|
) -> List[int]:
|
||||||
"""Heuristic: the first row is a header if it has bold/large text or
|
"""Detect header rows: first-row heuristic + spanning header detection.
|
||||||
there's a significant gap after it."""
|
|
||||||
|
A "spanning header" is a row whose words stretch across multiple column
|
||||||
|
boundaries (e.g. "Unit4: Bonnie Scotland" centred across 4 columns).
|
||||||
|
"""
|
||||||
if len(rows) < 2:
|
if len(rows) < 2:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
@@ -425,25 +429,60 @@ def _detect_header_rows(
|
|||||||
first_row = rows[0]
|
first_row = rows[0]
|
||||||
second_row = rows[1]
|
second_row = rows[1]
|
||||||
|
|
||||||
# Gap between first and second row > 1.5x average row height
|
# Gap between first and second row > 0.5x average row height
|
||||||
avg_h = sum(r["y_max"] - r["y_min"] for r in rows) / len(rows)
|
avg_h = sum(r["y_max"] - r["y_min"] for r in rows) / len(rows)
|
||||||
gap = second_row["y_min"] - first_row["y_max"]
|
gap = second_row["y_min"] - first_row["y_max"]
|
||||||
if gap > avg_h * 0.5:
|
if gap > avg_h * 0.5:
|
||||||
headers.append(0)
|
headers.append(0)
|
||||||
|
|
||||||
# Also check if first row words are taller than average (bold/header text)
|
# Also check if first row words are taller than average (bold/header text)
|
||||||
|
all_heights = [w["height"] for w in zone_words]
|
||||||
|
median_h = sorted(all_heights)[len(all_heights) // 2] if all_heights else 20
|
||||||
first_row_words = [
|
first_row_words = [
|
||||||
w for w in zone_words
|
w for w in zone_words
|
||||||
if first_row["y_min"] <= w["top"] + w["height"] / 2 <= first_row["y_max"]
|
if first_row["y_min"] <= w["top"] + w["height"] / 2 <= first_row["y_max"]
|
||||||
]
|
]
|
||||||
if first_row_words:
|
if first_row_words:
|
||||||
first_h = max(w["height"] for w in first_row_words)
|
first_h = max(w["height"] for w in first_row_words)
|
||||||
all_heights = [w["height"] for w in zone_words]
|
|
||||||
median_h = sorted(all_heights)[len(all_heights) // 2] if all_heights else first_h
|
|
||||||
if first_h > median_h * 1.3:
|
if first_h > median_h * 1.3:
|
||||||
if 0 not in headers:
|
if 0 not in headers:
|
||||||
headers.append(0)
|
headers.append(0)
|
||||||
|
|
||||||
|
# Spanning header detection: rows with few words that cross column
|
||||||
|
# boundaries and don't fit the normal multi-column pattern.
|
||||||
|
if columns and len(columns) >= 2:
|
||||||
|
# Typical data row has words in 2+ columns; a spanning header has
|
||||||
|
# words that sit in the middle columns without matching the pattern.
|
||||||
|
for row in rows:
|
||||||
|
ri = row["index"]
|
||||||
|
if ri in headers:
|
||||||
|
continue
|
||||||
|
row_words = [
|
||||||
|
w for w in zone_words
|
||||||
|
if row["y_min"] <= w["top"] + w["height"] / 2 <= row["y_max"]
|
||||||
|
]
|
||||||
|
if not row_words or len(row_words) > 6:
|
||||||
|
continue # too many words to be a header
|
||||||
|
# Check if all row words are colored (common for section headers)
|
||||||
|
all_colored = all(
|
||||||
|
w.get("color_name") and w.get("color_name") != "black"
|
||||||
|
for w in row_words
|
||||||
|
)
|
||||||
|
# Check if words span across the middle columns (not in col 0)
|
||||||
|
word_x_min = min(w["left"] for w in row_words)
|
||||||
|
word_x_max = max(w["left"] + w["width"] for w in row_words)
|
||||||
|
first_col_end = columns[0]["x_max"] if columns else 0
|
||||||
|
# Header if: colored text that starts after the first column
|
||||||
|
# or spans more than 2 columns
|
||||||
|
cols_spanned = sum(
|
||||||
|
1 for c in columns
|
||||||
|
if word_x_min < c["x_max"] and word_x_max > c["x_min"]
|
||||||
|
)
|
||||||
|
if all_colored and cols_spanned >= 2:
|
||||||
|
headers.append(ri)
|
||||||
|
elif cols_spanned >= 3 and len(row_words) <= 4:
|
||||||
|
headers.append(ri)
|
||||||
|
|
||||||
return headers
|
return headers
|
||||||
|
|
||||||
|
|
||||||
@@ -522,8 +561,48 @@ def _build_zone_grid(
|
|||||||
cell["cell_id"] = f"Z{zone_index}_{cell['cell_id']}"
|
cell["cell_id"] = f"Z{zone_index}_{cell['cell_id']}"
|
||||||
cell["zone_index"] = zone_index
|
cell["zone_index"] = zone_index
|
||||||
|
|
||||||
# Detect header rows
|
# Detect header rows (pass columns for spanning header detection)
|
||||||
header_rows = _detect_header_rows(rows, zone_words, zone_y)
|
header_rows = _detect_header_rows(rows, zone_words, zone_y, columns)
|
||||||
|
|
||||||
|
# Merge cells in spanning header rows into a single col-0 cell
|
||||||
|
if header_rows and len(columns) >= 2:
|
||||||
|
for hri in header_rows:
|
||||||
|
header_cells = [c for c in cells if c["row_index"] == hri]
|
||||||
|
if len(header_cells) <= 1:
|
||||||
|
continue
|
||||||
|
# Collect all word_boxes and text from all columns
|
||||||
|
all_wb = []
|
||||||
|
all_text_parts = []
|
||||||
|
for hc in sorted(header_cells, key=lambda c: c["col_index"]):
|
||||||
|
all_wb.extend(hc.get("word_boxes", []))
|
||||||
|
if hc.get("text", "").strip():
|
||||||
|
all_text_parts.append(hc["text"].strip())
|
||||||
|
# Remove all header cells, replace with one spanning cell
|
||||||
|
cells = [c for c in cells if c["row_index"] != hri]
|
||||||
|
if all_wb:
|
||||||
|
x_min = min(wb["left"] for wb in all_wb)
|
||||||
|
y_min = min(wb["top"] for wb in all_wb)
|
||||||
|
x_max = max(wb["left"] + wb["width"] for wb in all_wb)
|
||||||
|
y_max = max(wb["top"] + wb["height"] for wb in all_wb)
|
||||||
|
cells.append({
|
||||||
|
"cell_id": f"R{hri:02d}_C0",
|
||||||
|
"row_index": hri,
|
||||||
|
"col_index": 0,
|
||||||
|
"col_type": "spanning_header",
|
||||||
|
"text": " ".join(all_text_parts),
|
||||||
|
"confidence": 0.0,
|
||||||
|
"bbox_px": {"x": x_min, "y": y_min,
|
||||||
|
"w": x_max - x_min, "h": y_max - y_min},
|
||||||
|
"bbox_pct": {
|
||||||
|
"x": round(x_min / img_w * 100, 2) if img_w else 0,
|
||||||
|
"y": round(y_min / img_h * 100, 2) if img_h else 0,
|
||||||
|
"w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0,
|
||||||
|
"h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0,
|
||||||
|
},
|
||||||
|
"word_boxes": all_wb,
|
||||||
|
"ocr_engine": "words_first",
|
||||||
|
"is_bold": True,
|
||||||
|
})
|
||||||
|
|
||||||
# Convert columns to output format with percentages
|
# Convert columns to output format with percentages
|
||||||
out_columns = []
|
out_columns = []
|
||||||
@@ -716,10 +795,29 @@ async def build_grid(session_id: str):
|
|||||||
# First pass: build grids per zone independently
|
# First pass: build grids per zone independently
|
||||||
zone_grids: List[Dict] = []
|
zone_grids: List[Dict] = []
|
||||||
|
|
||||||
|
_RECOVERED_NOISE = {"!", "?", "•", "·"}
|
||||||
|
|
||||||
for pz in page_zones:
|
for pz in page_zones:
|
||||||
zone_words = _words_in_zone(
|
zone_words = _words_in_zone(
|
||||||
all_words, pz.y, pz.height, pz.x, pz.width
|
all_words, pz.y, pz.height, pz.x, pz.width
|
||||||
)
|
)
|
||||||
|
# In box zones, filter out recovered single-char artifacts
|
||||||
|
# (decorative elements like !, ?, • from color recovery)
|
||||||
|
if pz.zone_type == "box":
|
||||||
|
before = len(zone_words)
|
||||||
|
zone_words = [
|
||||||
|
w for w in zone_words
|
||||||
|
if not (
|
||||||
|
w.get("recovered")
|
||||||
|
and w.get("text", "").strip() in _RECOVERED_NOISE
|
||||||
|
)
|
||||||
|
]
|
||||||
|
removed = before - len(zone_words)
|
||||||
|
if removed:
|
||||||
|
logger.info(
|
||||||
|
"build-grid: filtered %d recovered artifacts from box zone %d",
|
||||||
|
removed, pz.index,
|
||||||
|
)
|
||||||
grid = _build_zone_grid(
|
grid = _build_zone_grid(
|
||||||
zone_words, pz.x, pz.y, pz.width, pz.height,
|
zone_words, pz.x, pz.y, pz.width, pz.height,
|
||||||
pz.index, img_w, img_h,
|
pz.index, img_w, img_h,
|
||||||
@@ -863,6 +961,15 @@ async def build_grid(session_id: str):
|
|||||||
all_wb.extend(cell.get("word_boxes", []))
|
all_wb.extend(cell.get("word_boxes", []))
|
||||||
detect_word_colors(img_bgr, all_wb)
|
detect_word_colors(img_bgr, all_wb)
|
||||||
|
|
||||||
|
# 5b. Fix unmatched parentheses in cell text
|
||||||
|
# OCR often misses opening "(" while detecting closing ")".
|
||||||
|
# If a cell's text has ")" without a matching "(", prepend "(".
|
||||||
|
for z in zones_data:
|
||||||
|
for cell in z.get("cells", []):
|
||||||
|
text = cell.get("text", "")
|
||||||
|
if ")" in text and "(" not in text:
|
||||||
|
cell["text"] = "(" + text
|
||||||
|
|
||||||
duration = time.time() - t0
|
duration = time.time() - t0
|
||||||
|
|
||||||
# 6. Build result
|
# 6. Build result
|
||||||
|
|||||||
Reference in New Issue
Block a user