Compare commits
7 Commits
038eaf783c
...
df30d4eae3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
df30d4eae3 | ||
|
|
2e6ab3a646 | ||
|
|
cc5ee74921 | ||
|
|
21d37b5da1 | ||
|
|
19cbbf310a | ||
|
|
fc0ab84e40 | ||
|
|
050d410ba0 |
@@ -1022,11 +1022,6 @@ def _insert_missing_ipa(text: str, pronunciation: str = 'british') -> str:
|
||||
(e.g. "scare skea" where "skea" is garbled /skɛə/). This scans the text
|
||||
for the headword, inserts correct [IPA], and strips the garbled fragments.
|
||||
|
||||
IMPORTANT: This function must only be called when ``_text_has_garbled_ipa``
|
||||
confirms that the text actually contains garbled phonetics. If the text
|
||||
is clean (e.g. just "scissors"), IPA must NOT be inserted — the original
|
||||
page had no phonetics on that line.
|
||||
|
||||
Only inserts for words that:
|
||||
- are standalone (not already followed by a bracket)
|
||||
- have an IPA entry in the dictionary
|
||||
@@ -1065,6 +1060,49 @@ def _insert_missing_ipa(text: str, pronunciation: str = 'british') -> str:
|
||||
# Fallback: try without hyphens (e.g. "second-hand" → "secondhand")
|
||||
if not ipa and '-' in clean:
|
||||
ipa = _lookup_ipa(clean.replace('-', ''), pronunciation)
|
||||
# Fallback 1: IPA-marker split for merged tokens where OCR
|
||||
# joined headword with its IPA (e.g. "schoolbagsku:lbæg").
|
||||
# Find the first IPA marker character (:, æ, ɪ, etc.), walk
|
||||
# backwards ≤3 chars for the onset consonant cluster, and
|
||||
# split into headword + OCR IPA.
|
||||
_IPA_SPLIT_CHARS = set(':əɪɛɒʊʌæɑɔʃʒθðŋˈˌ')
|
||||
if not ipa:
|
||||
first_marker = next(
|
||||
(p for p, ch in enumerate(w) if ch in _IPA_SPLIT_CHARS), -1,
|
||||
)
|
||||
if first_marker >= 3:
|
||||
split = first_marker
|
||||
while (split > 0
|
||||
and split > first_marker - 3
|
||||
and w[split - 1].isalpha()
|
||||
and w[split - 1].islower()):
|
||||
split -= 1
|
||||
if split >= 2:
|
||||
headword = w[:split]
|
||||
ocr_ipa = w[split:]
|
||||
hw_ipa = _lookup_ipa(headword, pronunciation)
|
||||
if hw_ipa:
|
||||
words[i] = f"{headword} [{hw_ipa}]"
|
||||
else:
|
||||
# Word not in dictionary — use OCR IPA
|
||||
words[i] = f"{headword} [{ocr_ipa}]"
|
||||
words = words[:i + 1]
|
||||
ipa = True # signal that we handled it
|
||||
break
|
||||
# Fallback 2: prefix matching for merged tokens WITHOUT IPA
|
||||
# markers (e.g. "Scotland'skotland"). Find longest dictionary
|
||||
# prefix using only alpha chars to avoid punctuation matches.
|
||||
if not ipa:
|
||||
alpha = re.sub(r'[^a-zA-Z]', '', clean)
|
||||
if len(alpha) > 5: # need at least 6 chars for meaningful split
|
||||
for end in range(len(alpha), 3, -1): # min prefix 4 chars
|
||||
prefix = alpha[:end]
|
||||
test_ipa = _lookup_ipa(prefix, pronunciation)
|
||||
if test_ipa:
|
||||
ipa = test_ipa
|
||||
w = prefix
|
||||
words[i] = prefix
|
||||
break
|
||||
if ipa:
|
||||
words[i] = f"{w} [{ipa}]"
|
||||
# Strip garbled OCR phonetics after the IPA bracket.
|
||||
@@ -1096,6 +1134,155 @@ def _insert_missing_ipa(text: str, pronunciation: str = 'british') -> str:
|
||||
return ' '.join(words)
|
||||
|
||||
|
||||
def _has_non_dict_trailing(text: str, pronunciation: str = 'british') -> bool:
    """Report whether a headword is followed only by non-dictionary tokens.

    This is an extra trigger for ``_insert_missing_ipa`` for the case where
    ``_text_has_garbled_ipa`` returns False because the garbled IPA looks
    like plain ASCII (e.g. "skea" for /skɛə/).

    Args:
        text: Cell text to inspect; it is split on whitespace.
        pronunciation: Variant passed through to ``_lookup_ipa``.

    Returns:
        True when the text has 2 to 6 tokens, a headword with an IPA entry is
        found before the last token, and none of the tokens after it is a
        delimiter, starts with an uppercase letter, or has an IPA entry of
        its own. False otherwise, including when IPA lookup is unavailable.
    """
    if not IPA_AVAILABLE:
        return False

    tokens = text.strip().split()
    if not 2 <= len(tokens) <= 6:
        return False

    # Locate the first token that the IPA dictionary knows (the headword).
    headword_pos = None
    for pos, token in enumerate(tokens):
        candidate = re.sub(r'[^a-zA-Z\'-]', '', token)
        if len(candidate) < 2:
            continue
        if candidate.lower() in _GRAMMAR_BRACKET_WORDS:
            continue
        if _lookup_ipa(candidate, pronunciation):
            headword_pos = pos
            break

    # No headword, or the headword is the last token: nothing trails it.
    if headword_pos is None or headword_pos >= len(tokens) - 1:
        return False

    # Any trailing delimiter, capitalised word or dictionary word means the
    # remainder is real content rather than garbled phonetics.
    for trailing in tokens[headword_pos + 1:]:
        if trailing in ('–', '—', '-', '/', '|', ',', ';'):
            return False
        letters = re.sub(r'[^a-zA-Z]', '', trailing)
        if not letters:
            continue
        if letters[0].isupper():
            return False
        if len(letters) >= 2 and _lookup_ipa(letters, pronunciation):
            return False
    return True
|
||||
|
||||
|
||||
def _strip_post_bracket_garbled(
|
||||
text: str, pronunciation: str = 'british',
|
||||
) -> str:
|
||||
"""Strip garbled IPA fragments that trail after proper [IPA] brackets.
|
||||
|
||||
E.g. ``sea [sˈiː] si:`` → ``sea [sˈiː]``
|
||||
``seat [sˈiːt] si:t`` → ``seat [sˈiːt]``
|
||||
"""
|
||||
if ']' not in text:
|
||||
return text
|
||||
last_bracket = text.rfind(']')
|
||||
if last_bracket >= len(text) - 1:
|
||||
return text
|
||||
before = text[:last_bracket + 1].rstrip()
|
||||
after = text[last_bracket + 1:].strip()
|
||||
if not after:
|
||||
return text
|
||||
after_words = after.split()
|
||||
kept: List[str] = []
|
||||
for idx, w in enumerate(after_words):
|
||||
# Delimiter — keep rest
|
||||
if w in ('–', '—', '-', '/', '|', ',', ';'):
|
||||
kept.extend(after_words[idx:])
|
||||
break
|
||||
# Contains IPA markers (length mark, IPA chars) — garbled, skip
|
||||
if ':' in w or any(c in w for c in 'əɪɛɒʊʌæɑɔʃʒθðŋˈˌ'):
|
||||
continue
|
||||
clean = re.sub(r'[^a-zA-Z]', '', w)
|
||||
# Uppercase — likely German, keep rest
|
||||
if clean and clean[0].isupper():
|
||||
kept.extend(after_words[idx:])
|
||||
break
|
||||
# Known English word — keep rest
|
||||
if clean and len(clean) >= 2 and _lookup_ipa(clean, pronunciation):
|
||||
kept.extend(after_words[idx:])
|
||||
break
|
||||
# Unknown short word — likely garbled, skip
|
||||
if kept:
|
||||
return before + ' ' + ' '.join(kept)
|
||||
return before
|
||||
|
||||
|
||||
def fix_ipa_continuation_cell(
    garbled_text: str,
    headword_text: str,
    pronunciation: str = 'british',
) -> str:
    """Rebuild the IPA of a continuation row from the previous row's headwords.

    Continuation rows sit below the headword and hold only the printed
    phonetic transcription, which OCR garbles into fragments such as
    ``ska:f – ska:vz`` (should be ``[skˈɑːf] – [skˈɑːvz]``).

    Args:
        garbled_text: The OCR-garbled IPA text from the continuation row.
        headword_text: The headword text from the previous row
            (e.g. ``scarf – scarves``). Existing ``[...]`` brackets are
            ignored.
        pronunciation: ``'british'`` or ``'american'``.

    Returns:
        One ``[...]`` group per headword part that produced at least one
        lookup hit, joined with `` – ``. ``garbled_text`` is returned
        unchanged when IPA lookup is unavailable, an argument is empty, or
        no word could be looked up.
    """
    if not (IPA_AVAILABLE and garbled_text and headword_text):
        return garbled_text

    # Remove IPA brackets already present in the headword cell.
    bare_headwords = re.sub(r'\[[^\]]*\]', '', headword_text).strip()
    if not bare_headwords:
        return garbled_text

    # Split on en/em dashes, or on a hyphen surrounded by whitespace:
    #   "scarf – scarves"  → ["scarf", "scarves"]
    #   "see - saw - seen" → ["see", "saw", "seen"]
    bracketed: List[str] = []
    for segment in re.split(r'\s*[–—]\s*|\s+-\s+', bare_headwords):
        # A segment may hold several words, e.g. "secondary school".
        transcriptions: List[str] = []
        for token in segment.split():
            candidate = re.sub(r'[^a-zA-Z\'-]', '', token)
            if len(candidate) < 2:
                continue
            # Grammar words such as a leading "to" get no transcription.
            if candidate.lower() in _GRAMMAR_BRACKET_WORDS:
                continue
            found = _lookup_ipa(candidate, pronunciation)
            if found:
                transcriptions.append(found)
        if transcriptions:
            bracketed.append('[' + ' '.join(transcriptions) + ']')

    if not bracketed:
        return garbled_text

    result = ' – '.join(bracketed)
    logger.debug(
        "fix_ipa_continuation: '%s' → '%s' (headwords: '%s')",
        garbled_text, result, headword_text,
    )
    return result
|
||||
|
||||
|
||||
def _insert_headword_ipa(text: str, pronunciation: str = 'british') -> str:
|
||||
"""Insert IPA for the first English headword in a long mixed-language line.
|
||||
|
||||
@@ -1174,11 +1361,15 @@ def fix_cell_phonetics(
|
||||
if col_type == 'column_en':
|
||||
# Full processing: replace garbled IPA, strip orphan brackets.
|
||||
new_text = _replace_phonetics_in_text(text, pronunciation, strip_orphans=True)
|
||||
if new_text == text and _text_has_garbled_ipa(text):
|
||||
# Only insert IPA when there IS garbled phonetics in the
|
||||
# text — never add IPA to clean text that had none on the
|
||||
# original page.
|
||||
new_text = _insert_missing_ipa(text, pronunciation)
|
||||
if new_text == text:
|
||||
# Insert IPA when garbled phonetics exist OR when trailing
|
||||
# non-dictionary words suggest garbled IPA in plain ASCII.
|
||||
if _text_has_garbled_ipa(text) or _has_non_dict_trailing(text, pronunciation):
|
||||
new_text = _insert_missing_ipa(text, pronunciation)
|
||||
# Strip trailing garbled fragments after proper [IPA] brackets
|
||||
# (e.g. "sea [sˈiː] si:" → "sea [sˈiː]")
|
||||
if ']' in new_text:
|
||||
new_text = _strip_post_bracket_garbled(new_text, pronunciation)
|
||||
else:
|
||||
# column_text: replace garbled IPA, no orphan stripping
|
||||
new_text = _replace_phonetics_in_text(text, pronunciation, strip_orphans=False)
|
||||
|
||||
@@ -178,3 +178,4 @@ class PageZone:
|
||||
width: int
|
||||
box: Optional[DetectedBox] = None
|
||||
columns: List[ColumnGeometry] = field(default_factory=list)
|
||||
image_overlays: List[Dict] = field(default_factory=list)
|
||||
|
||||
@@ -21,8 +21,9 @@ import numpy as np
|
||||
from fastapi import APIRouter, HTTPException, Request
|
||||
|
||||
from cv_box_detect import detect_boxes, split_page_into_zones
|
||||
from cv_vocab_types import PageZone
|
||||
from cv_color_detect import detect_word_colors, recover_colored_text
|
||||
from cv_ocr_engines import fix_cell_phonetics
|
||||
from cv_ocr_engines import fix_cell_phonetics, fix_ipa_continuation_cell, _text_has_garbled_ipa
|
||||
from cv_words_first import _cluster_rows, _build_cells
|
||||
from ocr_pipeline_session_store import (
|
||||
get_session_db,
|
||||
@@ -439,6 +440,217 @@ def _words_in_zone(
|
||||
return result
|
||||
|
||||
|
||||
def _merge_content_zones_across_boxes(
|
||||
zones: List,
|
||||
content_x: int,
|
||||
content_w: int,
|
||||
) -> List:
|
||||
"""Merge content zones separated by box zones into single zones.
|
||||
|
||||
Box zones become image_overlays on the merged content zone.
|
||||
Pattern: [content, box*, content] → [merged_content with overlay]
|
||||
Box zones NOT between two content zones stay as standalone zones.
|
||||
"""
|
||||
if len(zones) < 3:
|
||||
return zones
|
||||
|
||||
# Group consecutive runs of [content, box+, content]
|
||||
result: List = []
|
||||
i = 0
|
||||
while i < len(zones):
|
||||
z = zones[i]
|
||||
if z.zone_type != "content":
|
||||
result.append(z)
|
||||
i += 1
|
||||
continue
|
||||
|
||||
# Start of a potential merge group: content zone
|
||||
group_contents = [z]
|
||||
group_boxes = []
|
||||
j = i + 1
|
||||
# Absorb [box, content] pairs — only absorb a box if it's
|
||||
# confirmed to be followed by another content zone.
|
||||
while j < len(zones):
|
||||
if (zones[j].zone_type == "box"
|
||||
and j + 1 < len(zones)
|
||||
and zones[j + 1].zone_type == "content"):
|
||||
group_boxes.append(zones[j])
|
||||
group_contents.append(zones[j + 1])
|
||||
j += 2
|
||||
else:
|
||||
break
|
||||
|
||||
if len(group_contents) >= 2 and group_boxes:
|
||||
# Merge: create one large content zone spanning all
|
||||
y_min = min(c.y for c in group_contents)
|
||||
y_max = max(c.y + c.height for c in group_contents)
|
||||
overlays = []
|
||||
for bz in group_boxes:
|
||||
overlay = {
|
||||
"y": bz.y,
|
||||
"height": bz.height,
|
||||
"x": bz.x,
|
||||
"width": bz.width,
|
||||
}
|
||||
if bz.box:
|
||||
overlay["box"] = {
|
||||
"x": bz.box.x,
|
||||
"y": bz.box.y,
|
||||
"width": bz.box.width,
|
||||
"height": bz.box.height,
|
||||
"confidence": bz.box.confidence,
|
||||
"border_thickness": bz.box.border_thickness,
|
||||
}
|
||||
overlays.append(overlay)
|
||||
|
||||
merged = PageZone(
|
||||
index=0, # re-indexed below
|
||||
zone_type="content",
|
||||
y=y_min,
|
||||
height=y_max - y_min,
|
||||
x=content_x,
|
||||
width=content_w,
|
||||
image_overlays=overlays,
|
||||
)
|
||||
result.append(merged)
|
||||
i = j
|
||||
else:
|
||||
# No merge possible — emit just the content zone
|
||||
result.append(z)
|
||||
i += 1
|
||||
|
||||
# Re-index zones
|
||||
for idx, z in enumerate(result):
|
||||
z.index = idx
|
||||
|
||||
logger.info(
|
||||
"zone-merge: %d zones → %d zones after merging across boxes",
|
||||
len(zones), len(result),
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
def _detect_heading_rows_by_color(zones_data: List[Dict], img_w: int, img_h: int) -> int:
|
||||
"""Detect heading rows by color + height after color annotation.
|
||||
|
||||
A row is a heading if:
|
||||
1. ALL word_boxes have color_name != 'black' (typically 'blue')
|
||||
2. Mean word height > 1.2x median height of all words in the zone
|
||||
|
||||
Detected heading rows are merged into a single spanning cell.
|
||||
Returns count of headings detected.
|
||||
"""
|
||||
heading_count = 0
|
||||
|
||||
for z in zones_data:
|
||||
cells = z.get("cells", [])
|
||||
rows = z.get("rows", [])
|
||||
columns = z.get("columns", [])
|
||||
if not cells or not rows or len(columns) < 2:
|
||||
continue
|
||||
|
||||
# Compute median word height across the zone
|
||||
all_heights = []
|
||||
for cell in cells:
|
||||
for wb in cell.get("word_boxes") or []:
|
||||
h = wb.get("height", 0)
|
||||
if h > 0:
|
||||
all_heights.append(h)
|
||||
if not all_heights:
|
||||
continue
|
||||
all_heights_sorted = sorted(all_heights)
|
||||
median_h = all_heights_sorted[len(all_heights_sorted) // 2]
|
||||
|
||||
heading_row_indices = []
|
||||
for row in rows:
|
||||
if row.get("is_header"):
|
||||
continue # already detected as header
|
||||
ri = row["index"]
|
||||
row_cells = [c for c in cells if c.get("row_index") == ri]
|
||||
row_wbs = [
|
||||
wb for cell in row_cells
|
||||
for wb in cell.get("word_boxes") or []
|
||||
]
|
||||
if not row_wbs:
|
||||
continue
|
||||
|
||||
# Condition 1: ALL words are non-black
|
||||
all_colored = all(
|
||||
wb.get("color_name", "black") != "black"
|
||||
for wb in row_wbs
|
||||
)
|
||||
if not all_colored:
|
||||
continue
|
||||
|
||||
# Condition 2: mean height > 1.2x median
|
||||
mean_h = sum(wb.get("height", 0) for wb in row_wbs) / len(row_wbs)
|
||||
if mean_h <= median_h * 1.2:
|
||||
continue
|
||||
|
||||
heading_row_indices.append(ri)
|
||||
|
||||
# Merge heading cells into spanning cells
|
||||
for hri in heading_row_indices:
|
||||
header_cells = [c for c in cells if c.get("row_index") == hri]
|
||||
if len(header_cells) <= 1:
|
||||
# Single cell — just mark it as heading
|
||||
if header_cells:
|
||||
header_cells[0]["col_type"] = "heading"
|
||||
heading_count += 1
|
||||
# Mark row as header
|
||||
for row in rows:
|
||||
if row["index"] == hri:
|
||||
row["is_header"] = True
|
||||
continue
|
||||
|
||||
# Collect all word_boxes and text from all columns
|
||||
all_wb = []
|
||||
all_text_parts = []
|
||||
for hc in sorted(header_cells, key=lambda c: c["col_index"]):
|
||||
all_wb.extend(hc.get("word_boxes", []))
|
||||
if hc.get("text", "").strip():
|
||||
all_text_parts.append(hc["text"].strip())
|
||||
|
||||
# Remove all cells for this row, replace with one spanning cell
|
||||
z["cells"] = [c for c in z["cells"] if c.get("row_index") != hri]
|
||||
|
||||
if all_wb:
|
||||
x_min = min(wb["left"] for wb in all_wb)
|
||||
y_min = min(wb["top"] for wb in all_wb)
|
||||
x_max = max(wb["left"] + wb["width"] for wb in all_wb)
|
||||
y_max = max(wb["top"] + wb["height"] for wb in all_wb)
|
||||
|
||||
zone_idx = z.get("zone_index", 0)
|
||||
z["cells"].append({
|
||||
"cell_id": f"Z{zone_idx}_R{hri:02d}_C0",
|
||||
"zone_index": zone_idx,
|
||||
"row_index": hri,
|
||||
"col_index": 0,
|
||||
"col_type": "heading",
|
||||
"text": " ".join(all_text_parts),
|
||||
"confidence": 0.0,
|
||||
"bbox_px": {"x": x_min, "y": y_min,
|
||||
"w": x_max - x_min, "h": y_max - y_min},
|
||||
"bbox_pct": {
|
||||
"x": round(x_min / img_w * 100, 2) if img_w else 0,
|
||||
"y": round(y_min / img_h * 100, 2) if img_h else 0,
|
||||
"w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0,
|
||||
"h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0,
|
||||
},
|
||||
"word_boxes": all_wb,
|
||||
"ocr_engine": "words_first",
|
||||
"is_bold": True,
|
||||
})
|
||||
|
||||
# Mark row as header
|
||||
for row in rows:
|
||||
if row["index"] == hri:
|
||||
row["is_header"] = True
|
||||
heading_count += 1
|
||||
|
||||
return heading_count
|
||||
|
||||
|
||||
def _detect_header_rows(
|
||||
rows: List[Dict],
|
||||
zone_words: List[Dict],
|
||||
@@ -1023,6 +1235,11 @@ async def _build_grid_core(session_id: str, session: dict) -> dict:
|
||||
content_x, content_y, content_w, content_h, boxes
|
||||
)
|
||||
|
||||
# Merge content zones separated by box zones
|
||||
page_zones = _merge_content_zones_across_boxes(
|
||||
page_zones, content_x, content_w
|
||||
)
|
||||
|
||||
# --- Union columns from all content zones ---
|
||||
# Each content zone detects columns independently. Narrow
|
||||
# columns (page refs, markers) may appear in only one zone.
|
||||
@@ -1161,6 +1378,9 @@ async def _build_grid_core(session_id: str, session: dict) -> dict:
|
||||
"confidence": pz.box.confidence,
|
||||
}
|
||||
|
||||
if pz.image_overlays:
|
||||
zone_entry["image_overlays"] = pz.image_overlays
|
||||
|
||||
zones_data.append(zone_entry)
|
||||
|
||||
# 4. Fallback: no boxes detected → single zone with all words
|
||||
@@ -1282,6 +1502,11 @@ async def _build_grid_core(session_id: str, session: dict) -> dict:
|
||||
all_wb.extend(cell.get("word_boxes", []))
|
||||
detect_word_colors(img_bgr, all_wb)
|
||||
|
||||
# 5a. Heading detection by color + height (after color is available)
|
||||
heading_count = _detect_heading_rows_by_color(zones_data, img_w, img_h)
|
||||
if heading_count:
|
||||
logger.info("Detected %d heading rows by color+height", heading_count)
|
||||
|
||||
# 5b. Fix unmatched parentheses in cell text
|
||||
# OCR often misses opening "(" while detecting closing ")".
|
||||
# If a cell's text has ")" without a matching "(", prepend "(".
|
||||
@@ -1324,19 +1549,18 @@ async def _build_grid_core(session_id: str, session: dict) -> dict:
|
||||
if orig:
|
||||
cell["col_type"] = orig
|
||||
|
||||
# 5d. Remove IPA continuation rows — rows where the printed
|
||||
# 5d. Fix IPA continuation rows — rows where the printed
|
||||
# phonetic transcription wraps to a line below the headword.
|
||||
# These rows have text only in the English column (+ margin
|
||||
# noise) and fix_cell_phonetics did NOT insert IPA brackets
|
||||
# (because there's no real English word to look up).
|
||||
ipa_cont_rows: set = set()
|
||||
# These contain only garbled IPA in the EN column and nothing
|
||||
# in other columns. Replace garbled text with proper IPA
|
||||
# looked up from the headword in the previous row.
|
||||
ipa_cont_fixed = 0
|
||||
for z in zones_data:
|
||||
for row in z.get("rows", []):
|
||||
rows_sorted = sorted(z.get("rows", []), key=lambda r: r["index"])
|
||||
z_cells = z.get("cells", [])
|
||||
for idx, row in enumerate(rows_sorted):
|
||||
ri = row["index"]
|
||||
row_cells = [
|
||||
c for c in z.get("cells", [])
|
||||
if c.get("row_index") == ri
|
||||
]
|
||||
row_cells = [c for c in z_cells if c.get("row_index") == ri]
|
||||
en_cells = [
|
||||
c for c in row_cells
|
||||
if c.get("col_type") == en_col_type
|
||||
@@ -1347,41 +1571,38 @@ async def _build_grid_core(session_id: str, session: dict) -> dict:
|
||||
if c.get("col_type") != en_col_type
|
||||
and len((c.get("text") or "").strip()) >= 3
|
||||
]
|
||||
if en_cells and not other_cells:
|
||||
en_text = en_cells[0].get("text", "")
|
||||
# Strip any IPA brackets that fix_cell_phonetics
|
||||
# may have added for short dictionary matches
|
||||
# (e.g. "si" → "[si]") to check underlying text.
|
||||
text_bare = re.sub(r'\[[^\]]*\]', '', en_text).strip()
|
||||
# Garbled IPA typically contains ':' (length mark)
|
||||
# or starts with ' (stress mark), and has no word
|
||||
# with ≥3 letters that could be a real headword.
|
||||
has_headword = any(
|
||||
len(re.sub(r'[^a-zA-Z]', '', w)) >= 3
|
||||
for w in text_bare.split()
|
||||
) if text_bare else False
|
||||
looks_phonetic = (
|
||||
':' in text_bare
|
||||
or text_bare.startswith("'")
|
||||
or text_bare.startswith("\u2019")
|
||||
or not has_headword
|
||||
if not en_cells or other_cells:
|
||||
continue
|
||||
en_text = en_cells[0].get("text", "")
|
||||
if not _text_has_garbled_ipa(en_text):
|
||||
continue
|
||||
# Already has proper IPA brackets → already fixed
|
||||
if re.search(r'\[[^\]]*[ˈˌəɪɛɒʊʌæɑɔʃʒθðŋ][^\]]*\]', en_text):
|
||||
continue
|
||||
# Find headword in previous row
|
||||
if idx == 0:
|
||||
continue
|
||||
prev_ri = rows_sorted[idx - 1]["index"]
|
||||
prev_en = [
|
||||
c for c in z_cells
|
||||
if c.get("row_index") == prev_ri
|
||||
and c.get("col_type") == en_col_type
|
||||
]
|
||||
if not prev_en:
|
||||
continue
|
||||
prev_text = prev_en[0].get("text", "")
|
||||
fixed = fix_ipa_continuation_cell(
|
||||
en_text, prev_text, pronunciation="british",
|
||||
)
|
||||
if fixed != en_text:
|
||||
en_cells[0]["text"] = fixed
|
||||
ipa_cont_fixed += 1
|
||||
logger.info(
|
||||
"IPA continuation R%d: '%s' → '%s'",
|
||||
ri, en_text, fixed,
|
||||
)
|
||||
if looks_phonetic:
|
||||
ipa_cont_rows.add(ri)
|
||||
if ipa_cont_rows:
|
||||
for z in zones_data:
|
||||
z["rows"] = [
|
||||
r for r in z.get("rows", [])
|
||||
if r["index"] not in ipa_cont_rows
|
||||
]
|
||||
z["cells"] = [
|
||||
c for c in z.get("cells", [])
|
||||
if c.get("row_index") not in ipa_cont_rows
|
||||
]
|
||||
logger.info(
|
||||
"removed %d IPA continuation rows: %s",
|
||||
len(ipa_cont_rows), sorted(ipa_cont_rows),
|
||||
)
|
||||
if ipa_cont_fixed:
|
||||
logger.info("Fixed %d IPA continuation rows", ipa_cont_fixed)
|
||||
|
||||
duration = time.time() - t0
|
||||
|
||||
|
||||
360
klausur-service/backend/tests/test_grid_editor_api.py
Normal file
360
klausur-service/backend/tests/test_grid_editor_api.py
Normal file
@@ -0,0 +1,360 @@
|
||||
"""
|
||||
Tests for grid_editor_api zone merging and heading detection.
|
||||
|
||||
Covers:
|
||||
- _merge_content_zones_across_boxes: zone merging logic
|
||||
- _detect_heading_rows_by_color: heading detection by color + height
|
||||
"""
|
||||
|
||||
import sys
|
||||
sys.path.insert(0, '/app')
|
||||
|
||||
import pytest
|
||||
from cv_vocab_types import PageZone, DetectedBox
|
||||
from grid_editor_api import (
|
||||
_merge_content_zones_across_boxes,
|
||||
_detect_heading_rows_by_color,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _merge_content_zones_across_boxes
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMergeContentZonesAcrossBoxes:
    """Exercise ``_merge_content_zones_across_boxes`` on typical zone layouts."""

    @staticmethod
    def _content(index, y, height):
        # Full-width content zone as used by every scenario below.
        return PageZone(index=index, zone_type="content", y=y, height=height,
                        x=0, width=500)

    @staticmethod
    def _boxed(index, y, height, x=50, width=400, confidence=0.9):
        # Box zone whose DetectedBox has the same geometry as the zone.
        return PageZone(
            index=index, zone_type="box", y=y, height=height, x=x, width=width,
            box=DetectedBox(x=x, y=y, width=width, height=height,
                            confidence=confidence),
        )

    @staticmethod
    def _merge(zones):
        return _merge_content_zones_across_boxes(zones, 0, 500)

    def test_no_merge_when_less_than_3_zones(self):
        """Fewer than 3 zones → no merge possible."""
        result = self._merge([
            self._content(0, 0, 100),
            self._boxed(1, 100, 50),
        ])
        assert len(result) == 2
        assert result[0].zone_type == "content"
        assert result[1].zone_type == "box"

    def test_merge_content_box_content(self):
        """[content, box, content] → [merged_content with overlay]."""
        result = self._merge([
            self._content(0, 0, 100),
            self._boxed(1, 100, 50),
            self._content(2, 150, 200),
        ])
        assert len(result) == 1
        merged = result[0]
        assert merged.zone_type == "content"
        assert merged.y == 0
        assert merged.height == 350  # 0 to 350
        assert len(merged.image_overlays) == 1
        overlay = merged.image_overlays[0]
        assert overlay["y"] == 100
        assert overlay["height"] == 50

    def test_box_at_start_not_merged(self):
        """Box at the start (not between contents) stays separate."""
        result = self._merge([
            self._boxed(0, 0, 50),
            self._content(1, 50, 100),
            self._boxed(2, 150, 50),
            self._content(3, 200, 200),
        ])
        # Box at start stays, then content+box+content merges
        assert len(result) == 2
        assert result[0].zone_type == "box"
        assert result[1].zone_type == "content"
        assert len(result[1].image_overlays) == 1

    def test_consecutive_boxes_not_merged(self):
        """[content, box, box, content] → no merge (consecutive boxes rare in practice)."""
        result = self._merge([
            self._content(0, 0, 100),
            self._boxed(1, 100, 50),
            self._boxed(2, 150, 30, x=60, width=380, confidence=0.8),
            self._content(3, 180, 200),
        ])
        # Only a single box directly followed by content is absorbed, so two
        # boxes in a row break the pattern.
        assert len(result) == 4

    def test_zone_reindexing(self):
        """Zone indices are re-numbered after merging."""
        result = self._merge([
            self._content(0, 0, 100),
            self._boxed(1, 100, 50),
            self._content(2, 150, 200),
        ])
        assert result[0].index == 0

    def test_no_boxes_passthrough(self):
        """All-content zones pass through unchanged."""
        result = self._merge([
            self._content(0, 0, 100),
            self._content(1, 100, 100),
        ])
        assert len(result) == 2

    def test_typical_vocab_page_pattern(self):
        """Typical pattern: [box(VOCABULARY), content, box(image), content]
        → box stays, content+box+content merges."""
        result = self._merge([
            self._boxed(0, 10, 40, confidence=0.95),
            self._content(1, 60, 50),
            self._boxed(2, 120, 85, confidence=0.8),
            self._content(3, 210, 500),
        ])
        assert len(result) == 2
        header_box, merged = result
        assert header_box.zone_type == "box"  # VOCABULARY header box stays
        assert merged.zone_type == "content"  # merged content zone
        assert merged.y == 60
        assert merged.height == 710 - 60  # 60 to 710
        assert len(merged.image_overlays) == 1
        assert merged.image_overlays[0]["y"] == 120
        # Check reindexing
        assert header_box.index == 0
        assert merged.index == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _detect_heading_rows_by_color
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDetectHeadingRowsByColor:
|
||||
"""Test heading detection by color + height."""
|
||||
|
||||
def _make_word_box(self, text, left, top, width, height, color="black"):
|
||||
return {
|
||||
"text": text,
|
||||
"left": left,
|
||||
"top": top,
|
||||
"width": width,
|
||||
"height": height,
|
||||
"color_name": color,
|
||||
"conf": 90,
|
||||
}
|
||||
|
||||
def _make_zone(self, cells, rows, columns, zone_index=0,
|
||||
bbox_x=0, bbox_y=0, bbox_w=800, bbox_h=1000):
|
||||
return {
|
||||
"zone_index": zone_index,
|
||||
"zone_type": "content",
|
||||
"bbox_px": {"x": bbox_x, "y": bbox_y, "w": bbox_w, "h": bbox_h},
|
||||
"cells": cells,
|
||||
"rows": rows,
|
||||
"columns": columns,
|
||||
}
|
||||
|
||||
def test_blue_heading_detected(self):
|
||||
"""Row with all blue words + taller height → heading."""
|
||||
# Normal rows: height ~20
|
||||
normal_cells = []
|
||||
for ri in range(5):
|
||||
normal_cells.append({
|
||||
"cell_id": f"Z0_R{ri:02d}_C0",
|
||||
"zone_index": 0,
|
||||
"row_index": ri,
|
||||
"col_index": 0,
|
||||
"col_type": "column_1",
|
||||
"text": f"word_{ri}",
|
||||
"word_boxes": [
|
||||
self._make_word_box(f"word_{ri}", 10, 100 + ri * 30, 80, 20),
|
||||
],
|
||||
})
|
||||
normal_cells.append({
|
||||
"cell_id": f"Z0_R{ri:02d}_C1",
|
||||
"zone_index": 0,
|
||||
"row_index": ri,
|
||||
"col_index": 1,
|
||||
"col_type": "column_2",
|
||||
"text": f"translation_{ri}",
|
||||
"word_boxes": [
|
||||
self._make_word_box(f"translation_{ri}", 300, 100 + ri * 30, 100, 20),
|
||||
],
|
||||
})
|
||||
|
||||
# Heading row (index 2): blue, taller (height 25)
|
||||
heading_ri = 2
|
||||
for c in normal_cells:
|
||||
if c["row_index"] == heading_ri:
|
||||
for wb in c["word_boxes"]:
|
||||
wb["color_name"] = "blue"
|
||||
wb["height"] = 25 # > 1.2 * 20 = 24
|
||||
|
||||
rows = [
|
||||
{"index": ri, "y_min_px": 100 + ri * 30, "y_max_px": 120 + ri * 30, "is_header": False}
|
||||
for ri in range(5)
|
||||
]
|
||||
columns = [
|
||||
{"index": 0, "label": "column_1"},
|
||||
{"index": 1, "label": "column_2"},
|
||||
]
|
||||
|
||||
zones_data = [self._make_zone(normal_cells, rows, columns)]
|
||||
count = _detect_heading_rows_by_color(zones_data, 800, 1000)
|
||||
|
||||
assert count == 1
|
||||
# Check that row 2 is now marked as header
|
||||
assert rows[2]["is_header"] is True
|
||||
# Check that the heading cell was created
|
||||
heading_cells = [c for c in zones_data[0]["cells"] if c["row_index"] == heading_ri]
|
||||
assert len(heading_cells) == 1
|
||||
assert heading_cells[0]["col_type"] == "heading"
|
||||
assert "word_2" in heading_cells[0]["text"]
|
||||
assert "translation_2" in heading_cells[0]["text"]
|
||||
|
||||
def test_black_row_not_heading(self):
    """A tall row whose words are all black must not be counted as a heading."""
    box = self._make_word_box

    def build_cell(cell_id, row, col, col_type, text, word_box):
        # Assemble one grid cell dict holding a single word box.
        return {
            "cell_id": cell_id,
            "zone_index": 0,
            "row_index": row,
            "col_index": col,
            "col_type": col_type,
            "text": text,
            "word_boxes": [word_box],
        }

    # Row 0 uses height 25 and row 1 height 20; every box is black.
    cells = [
        build_cell("Z0_R00_C0", 0, 0, "column_1", "hello",
                   box("hello", 10, 100, 80, 25, "black")),
        build_cell("Z0_R00_C1", 0, 1, "column_2", "world",
                   box("world", 300, 100, 80, 25, "black")),
        build_cell("Z0_R01_C0", 1, 0, "column_1", "foo",
                   box("foo", 10, 130, 80, 20, "black")),
        build_cell("Z0_R01_C1", 1, 1, "column_2", "bar",
                   box("bar", 300, 130, 80, 20, "black")),
    ]
    rows = [
        {"index": idx, "y_min_px": y_min, "y_max_px": y_max, "is_header": False}
        for idx, (y_min, y_max) in enumerate([(100, 125), (130, 150)])
    ]
    columns = [
        {"index": idx, "label": label}
        for idx, label in enumerate(("column_1", "column_2"))
    ]

    zones_data = [self._make_zone(cells, rows, columns)]
    detected = _detect_heading_rows_by_color(zones_data, 800, 1000)

    assert detected == 0
|
||||
|
||||
def test_mixed_color_row_not_heading(self):
    """A row mixing blue and black words must not be counted as a heading."""
    box = self._make_word_box

    def build_cell(cell_id, row, col, col_type, text, word_box):
        # Assemble one grid cell dict holding a single word box.
        return {
            "cell_id": cell_id,
            "zone_index": 0,
            "row_index": row,
            "col_index": col,
            "col_type": col_type,
            "text": text,
            "word_boxes": [word_box],
        }

    # Row 0 pairs a blue word with a black one (both height 25);
    # row 1 is entirely black at height 20.
    cells = [
        build_cell("Z0_R00_C0", 0, 0, "column_1", "Unit",
                   box("Unit", 10, 100, 60, 25, "blue")),
        build_cell("Z0_R00_C1", 0, 1, "column_2", "normal",
                   box("normal", 300, 100, 80, 25, "black")),
        build_cell("Z0_R01_C0", 1, 0, "column_1", "foo",
                   box("foo", 10, 130, 80, 20, "black")),
        build_cell("Z0_R01_C1", 1, 1, "column_2", "bar",
                   box("bar", 300, 130, 80, 20, "black")),
    ]
    rows = [
        {"index": idx, "y_min_px": y_min, "y_max_px": y_max, "is_header": False}
        for idx, (y_min, y_max) in enumerate([(100, 125), (130, 150)])
    ]
    columns = [
        {"index": idx, "label": label}
        for idx, label in enumerate(("column_1", "column_2"))
    ]

    zones_data = [self._make_zone(cells, rows, columns)]
    detected = _detect_heading_rows_by_color(zones_data, 800, 1000)

    assert detected == 0
|
||||
|
||||
def test_colored_but_not_tall_not_heading(self):
    """An all-blue row at the same height as the others must not be a heading."""
    box = self._make_word_box

    def build_cell(cell_id, row, col, col_type, text, word_box):
        # Assemble one grid cell dict holding a single word box.
        return {
            "cell_id": cell_id,
            "zone_index": 0,
            "row_index": row,
            "col_index": col,
            "col_type": col_type,
            "text": text,
            "word_boxes": [word_box],
        }

    # Row 0 is blue, row 1 is black; every box has height 20.
    cells = [
        build_cell("Z0_R00_C0", 0, 0, "column_1", "Unit",
                   box("Unit", 10, 100, 60, 20, "blue")),
        build_cell("Z0_R00_C1", 0, 1, "column_2", "four",
                   box("four", 300, 100, 60, 20, "blue")),
        build_cell("Z0_R01_C0", 1, 0, "column_1", "foo",
                   box("foo", 10, 130, 80, 20, "black")),
        build_cell("Z0_R01_C1", 1, 1, "column_2", "bar",
                   box("bar", 300, 130, 80, 20, "black")),
    ]
    rows = [
        {"index": idx, "y_min_px": y_min, "y_max_px": y_max, "is_header": False}
        for idx, (y_min, y_max) in enumerate([(100, 120), (130, 150)])
    ]
    columns = [
        {"index": idx, "label": label}
        for idx, label in enumerate(("column_1", "column_2"))
    ]

    zones_data = [self._make_zone(cells, rows, columns)]
    detected = _detect_heading_rows_by_color(zones_data, 800, 1000)

    assert detected == 0
|
||||
|
||||
def test_single_column_zone_skipped(self):
    """A zone with fewer than two columns reports no heading rows."""
    # A lone blue cell with height 25 in a one-column zone.
    blue_box = self._make_word_box("Unit", 10, 100, 60, 25, "blue")
    only_cell = {
        "cell_id": "Z0_R00_C0",
        "zone_index": 0,
        "row_index": 0,
        "col_index": 0,
        "col_type": "column_1",
        "text": "Unit",
        "word_boxes": [blue_box],
    }
    only_row = {"index": 0, "y_min_px": 100, "y_max_px": 125, "is_header": False}
    only_column = {"index": 0, "label": "column_1"}

    zone = self._make_zone([only_cell], [only_row], [only_column])
    zones_data = [zone]
    detected = _detect_heading_rows_by_color(zones_data, 800, 1000)

    assert detected == 0
|
||||
|
||||
def test_already_header_skipped(self):
    """A row already flagged ``is_header`` is not counted again."""
    box = self._make_word_box

    def build_cell(cell_id, row, col, col_type, text, word_box):
        # Assemble one grid cell dict holding a single word box.
        return {
            "cell_id": cell_id,
            "zone_index": 0,
            "row_index": row,
            "col_index": col,
            "col_type": col_type,
            "text": text,
            "word_boxes": [word_box],
        }

    # Row 0 holds a single blue spanning-header cell (height 25);
    # row 1 is an ordinary black two-column row (height 20).
    cells = [
        build_cell("Z0_R00_C0", 0, 0, "spanning_header", "Header",
                   box("Header", 10, 100, 60, 25, "blue")),
        build_cell("Z0_R01_C0", 1, 0, "column_1", "foo",
                   box("foo", 10, 130, 80, 20, "black")),
        build_cell("Z0_R01_C1", 1, 1, "column_2", "bar",
                   box("bar", 300, 130, 80, 20, "black")),
    ]
    # Row 0 enters the detector with is_header already set.
    rows = [
        {"index": idx, "y_min_px": y_min, "y_max_px": y_max, "is_header": flagged}
        for idx, (y_min, y_max, flagged) in enumerate(
            [(100, 125, True), (130, 150, False)]
        )
    ]
    columns = [
        {"index": idx, "label": label}
        for idx, label in enumerate(("column_1", "column_2"))
    ]

    zones_data = [self._make_zone(cells, rows, columns)]
    detected = _detect_heading_rows_by_color(zones_data, 800, 1000)

    assert detected == 0
|
||||
Reference in New Issue
Block a user