From 12b4c61bac2e7a6c1493e7357b3677d7f85b198a Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Tue, 24 Mar 2026 14:39:33 +0100 Subject: [PATCH] refactor: extract grid helpers + generic CV-gated syllable insertion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Extracted 1367 lines of helper functions from grid_editor_api.py (3051→1620 lines) into grid_editor_helpers.py (filters, detectors, zone grid building). 2. Created cv_syllable_detect.py with generic CV+pyphen logic: - Checks EVERY word_box for vertical pipe lines (not just first word) - No article-column dependency — works with any dictionary layout - CV morphological detection gates pyphen insertion 3. Grid editor scroll: calc(100vh-200px) for reliable scrolling. Co-Authored-By: Claude Opus 4.6 --- klausur-service/backend/cv_syllable_detect.py | 155 ++ klausur-service/backend/grid_editor_api.py | 1487 +---------------- .../backend/grid_editor_helpers.py | 1389 +++++++++++++++ 3 files changed, 1572 insertions(+), 1459 deletions(-) create mode 100644 klausur-service/backend/cv_syllable_detect.py create mode 100644 klausur-service/backend/grid_editor_helpers.py diff --git a/klausur-service/backend/cv_syllable_detect.py b/klausur-service/backend/cv_syllable_detect.py new file mode 100644 index 0000000..fc3bdb8 --- /dev/null +++ b/klausur-service/backend/cv_syllable_detect.py @@ -0,0 +1,155 @@ +""" +CV-based syllable divider detection and insertion for dictionary pages. + +Two-step approach: + 1. CV: morphological vertical line detection checks if a word_box image + contains thin, isolated pipe-like vertical lines (syllable dividers). + 2. pyphen: inserts syllable breaks at linguistically correct positions + for words where CV confirmed the presence of dividers. + +Lizenz: Apache 2.0 (kommerziell nutzbar) +DATENSCHUTZ: Alle Verarbeitung erfolgt lokal. 
+""" + +import logging +import re +from typing import Any, Dict, List + +import cv2 +import numpy as np + +logger = logging.getLogger(__name__) + + +def _word_has_pipe_lines(img_gray: np.ndarray, wb: Dict) -> bool: + """CV check: does this word_box image show thin vertical pipe dividers? + + Uses morphological opening with a tall thin kernel to isolate vertical + structures, then filters for thin (≤4px), isolated contours that are + NOT at the word edges (those would be l, I, 1 etc.). + """ + x = wb.get("left", 0) + y = wb.get("top", 0) + w = wb.get("width", 0) + h = wb.get("height", 0) + if w < 30 or h < 12: + return False + ih, iw = img_gray.shape[:2] + y1, y2 = max(0, y), min(ih, y + h) + x1, x2 = max(0, x), min(iw, x + w) + roi = img_gray[y1:y2, x1:x2] + if roi.size == 0: + return False + rh, rw = roi.shape + + # Binarize (ink = white on black background) + _, binary = cv2.threshold( + roi, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU + ) + + # Morphological opening: keep only tall vertical structures (≥55% height) + kern_h = max(int(rh * 0.55), 8) + kernel = np.ones((kern_h, 1), np.uint8) + vertical = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel) + + # Find surviving contours + contours, _ = cv2.findContours( + vertical, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE + ) + + margin = max(int(rw * 0.08), 3) + for cnt in contours: + cx, cy, cw, ch = cv2.boundingRect(cnt) + if cw > 4: + continue # too wide for a pipe + if cx < margin or cx + cw > rw - margin: + continue # at word edge — likely l, I, 1 + # Check isolation: adjacent columns should be mostly empty (ink-free) + left_zone = binary[cy:cy + ch, max(0, cx - 3):cx] + right_zone = binary[cy:cy + ch, cx + cw:min(rw, cx + cw + 3)] + left_ink = np.mean(left_zone) if left_zone.size else 255 + right_ink = np.mean(right_zone) if right_zone.size else 255 + if left_ink < 80 and right_ink < 80: + return True # isolated thin vertical line = pipe divider + return False + + +# IPA/phonetic bracket pattern — don't 
hyphenate transcriptions +_IPA_RE = re.compile(r'[\[\]ˈˌːʃʒθðŋɑɒæɔəɛɜɪʊʌ]') + + +def insert_syllable_dividers( + zones_data: List[Dict], + img_bgr: np.ndarray, + session_id: str, +) -> int: + """Insert pipe syllable dividers into dictionary cells where CV confirms them. + + For each cell on a dictionary page: + 1. Check if ANY word_box has CV-detected pipe lines + 2. If yes, apply pyphen to EACH word (≥4 chars) in the cell + 3. Try DE hyphenation first, then EN + + Returns the number of cells modified. + """ + try: + import pyphen + except ImportError: + logger.warning("pyphen not installed — skipping syllable insertion") + return 0 + + _hyph_de = pyphen.Pyphen(lang='de_DE') + _hyph_en = pyphen.Pyphen(lang='en_US') + img_gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY) + + insertions = 0 + for z in zones_data: + for cell in z.get("cells", []): + ct = cell.get("col_type", "") + if not ct.startswith("column_"): + continue + text = cell.get("text", "") + if not text or "|" in text: + continue + if _IPA_RE.search(text): + continue + + # CV gate: check if ANY word_box in this cell has pipe lines + wbs = cell.get("word_boxes") or [] + if not any(_word_has_pipe_lines(img_gray, wb) for wb in wbs): + continue + + # Apply pyphen to each significant word in the cell + tokens = re.split(r'(\s+|[,;]+\s*)', text) + new_tokens = [] + changed = False + for tok in tokens: + # Skip whitespace/punctuation separators + if re.match(r'^[\s,;]+$', tok): + new_tokens.append(tok) + continue + # Only hyphenate words ≥ 4 alpha chars + clean = re.sub(r'[().\-]', '', tok) + if len(clean) < 4 or not re.search(r'[a-zA-ZäöüÄÖÜß]', clean): + new_tokens.append(tok) + continue + # Try DE first, then EN + hyph = _hyph_de.inserted(tok, hyphen='|') + if '|' not in hyph: + hyph = _hyph_en.inserted(tok, hyphen='|') + if '|' in hyph and hyph != tok: + new_tokens.append(hyph) + changed = True + else: + new_tokens.append(tok) + if changed: + cell["text"] = ''.join(new_tokens) + insertions += 1 + + if 
insertions: + logger.info( + "build-grid session %s: inserted syllable dividers in %d cells " + "(CV-validated)", + session_id, insertions, + ) + return insertions diff --git a/klausur-service/backend/grid_editor_api.py b/klausur-service/backend/grid_editor_api.py index 5c4eaa8..8eb1bfc 100644 --- a/klausur-service/backend/grid_editor_api.py +++ b/klausur-service/backend/grid_editor_api.py @@ -25,13 +25,34 @@ from cv_graphic_detect import detect_graphic_elements from cv_vocab_types import PageZone from cv_color_detect import detect_word_colors, recover_colored_text from cv_ocr_engines import fix_cell_phonetics, fix_ipa_continuation_cell, _text_has_garbled_ipa, _lookup_ipa, _words_to_reading_order_text, _group_words_into_lines -from cv_words_first import _cluster_rows, _build_cells from ocr_pipeline_session_store import ( get_session_db, get_session_image, update_session_db, ) +from grid_editor_helpers import ( + _filter_border_strip_words, + _cluster_columns_by_alignment, + _GRID_GHOST_CHARS, + _filter_border_ghosts, + _MARKER_CHARS, + _merge_inline_marker_columns, + _flatten_word_boxes, + _words_in_zone, + _PIPE_RE_VSPLIT, + _detect_vertical_dividers, + _split_zone_at_vertical_dividers, + _merge_content_zones_across_boxes, + _detect_heading_rows_by_color, + _detect_heading_rows_by_single_cell, + _detect_header_rows, + _build_zone_grid, + _get_content_bounds, + _filter_decorative_margin, + _filter_footer_words, + _filter_header_junk, +) logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/v1/ocr-pipeline", tags=["grid-editor"]) @@ -41,1373 +62,6 @@ router = APIRouter(prefix="/api/v1/ocr-pipeline", tags=["grid-editor"]) # Helpers # --------------------------------------------------------------------------- -def _filter_border_strip_words(words: List[Dict]) -> Tuple[List[Dict], int]: - """Remove page-border decoration strip words BEFORE column detection. - - Scans from each page edge inward to find the first significant x-gap - (>30 px). 
If the edge cluster contains <15 % of total words, those - words are removed as border-strip artifacts (alphabet letters, - illustration fragments). - - Must run BEFORE ``_build_zone_grid`` so that column detection only - sees real content words and doesn't produce inflated row counts. - """ - if len(words) < 10: - return words, 0 - - sorted_words = sorted(words, key=lambda w: w.get("left", 0)) - total = len(sorted_words) - - # -- Left-edge scan (running max right-edge) -- - left_count = 0 - running_right = 0 - for gi in range(total - 1): - running_right = max( - running_right, - sorted_words[gi].get("left", 0) + sorted_words[gi].get("width", 0), - ) - if sorted_words[gi + 1].get("left", 0) - running_right > 30: - left_count = gi + 1 - break - - # -- Right-edge scan (running min left) -- - right_count = 0 - running_left = sorted_words[-1].get("left", 0) - for gi in range(total - 1, 0, -1): - running_left = min(running_left, sorted_words[gi].get("left", 0)) - prev_right = ( - sorted_words[gi - 1].get("left", 0) - + sorted_words[gi - 1].get("width", 0) - ) - if running_left - prev_right > 30: - right_count = total - gi - break - - # Validate candidate strip: real border decorations are mostly short - # words (alphabet letters like "A", "Bb", stray marks). Multi-word - # content like "der Ranzen" or "die Schals" (continuation of German - # translations) must NOT be removed. 
- def _is_decorative_strip(candidates: List[Dict]) -> bool: - if not candidates: - return False - short = sum(1 for w in candidates if len((w.get("text") or "").strip()) <= 2) - return short / len(candidates) >= 0.45 - - strip_ids: set = set() - if left_count > 0 and left_count / total < 0.20: - candidates = sorted_words[:left_count] - if _is_decorative_strip(candidates): - strip_ids = {id(w) for w in candidates} - elif right_count > 0 and right_count / total < 0.20: - candidates = sorted_words[total - right_count:] - if _is_decorative_strip(candidates): - strip_ids = {id(w) for w in candidates} - - if not strip_ids: - return words, 0 - - return [w for w in words if id(w) not in strip_ids], len(strip_ids) - - -def _cluster_columns_by_alignment( - words: List[Dict], - zone_w: int, - rows: List[Dict], -) -> List[Dict[str, Any]]: - """Detect columns by clustering left-edge alignment across rows. - - Hybrid approach: - 1. Group words by row, find "group start" positions within each row - (words preceded by a large gap or first word in row) - 2. Cluster group-start left-edges by X-proximity across rows - 3. Filter by row coverage (how many rows have a group start here) - 4. Merge nearby clusters - 5. Build column boundaries - - This filters out mid-phrase word positions (e.g. IPA transcriptions, - second words in multi-word entries) by only considering positions - where a new word group begins within a row. 
- """ - if not words or not rows: - return [] - - total_rows = len(rows) - if total_rows == 0: - return [] - - # --- Group words by row --- - row_words: Dict[int, List[Dict]] = {} - for w in words: - y_center = w["top"] + w["height"] / 2 - best = min(rows, key=lambda r: abs(r["y_center"] - y_center)) - row_words.setdefault(best["index"], []).append(w) - - # --- Compute adaptive gap threshold for group-start detection --- - all_gaps: List[float] = [] - for ri, rw_list in row_words.items(): - sorted_rw = sorted(rw_list, key=lambda w: w["left"]) - for i in range(len(sorted_rw) - 1): - right = sorted_rw[i]["left"] + sorted_rw[i]["width"] - gap = sorted_rw[i + 1]["left"] - right - if gap > 0: - all_gaps.append(gap) - - if all_gaps: - sorted_gaps = sorted(all_gaps) - median_gap = sorted_gaps[len(sorted_gaps) // 2] - heights = [w["height"] for w in words if w.get("height", 0) > 0] - median_h = sorted(heights)[len(heights) // 2] if heights else 25 - # Column boundary: gap > 3× median gap or > 1.5× median word height - gap_threshold = max(median_gap * 3, median_h * 1.5, 30) - else: - gap_threshold = 50 - - # --- Find group-start positions (left-edges that begin a new column) --- - start_positions: List[tuple] = [] # (left_edge, row_index) - for ri, rw_list in row_words.items(): - sorted_rw = sorted(rw_list, key=lambda w: w["left"]) - # First word in row is always a group start - start_positions.append((sorted_rw[0]["left"], ri)) - for i in range(1, len(sorted_rw)): - right_prev = sorted_rw[i - 1]["left"] + sorted_rw[i - 1]["width"] - gap = sorted_rw[i]["left"] - right_prev - if gap >= gap_threshold: - start_positions.append((sorted_rw[i]["left"], ri)) - - start_positions.sort(key=lambda x: x[0]) - - logger.info( - "alignment columns: %d group-start positions from %d words " - "(gap_threshold=%.0f, %d rows)", - len(start_positions), len(words), gap_threshold, total_rows, - ) - - if not start_positions: - x_min = min(w["left"] for w in words) - x_max = max(w["left"] + 
w["width"] for w in words) - return [{"index": 0, "type": "column_text", "x_min": x_min, "x_max": x_max}] - - # --- Cluster group-start positions by X-proximity --- - tolerance = max(10, int(zone_w * 0.01)) - clusters: List[Dict[str, Any]] = [] - cur_edges = [start_positions[0][0]] - cur_rows = {start_positions[0][1]} - - for left, row_idx in start_positions[1:]: - if left - cur_edges[-1] <= tolerance: - cur_edges.append(left) - cur_rows.add(row_idx) - else: - clusters.append({ - "mean_x": int(sum(cur_edges) / len(cur_edges)), - "min_edge": min(cur_edges), - "max_edge": max(cur_edges), - "count": len(cur_edges), - "distinct_rows": len(cur_rows), - "row_coverage": len(cur_rows) / total_rows, - }) - cur_edges = [left] - cur_rows = {row_idx} - clusters.append({ - "mean_x": int(sum(cur_edges) / len(cur_edges)), - "min_edge": min(cur_edges), - "max_edge": max(cur_edges), - "count": len(cur_edges), - "distinct_rows": len(cur_rows), - "row_coverage": len(cur_rows) / total_rows, - }) - - # --- Filter by row coverage --- - # These thresholds must be high enough to avoid false columns in flowing - # text (random inter-word gaps) while still detecting real columns in - # vocabulary worksheets (which typically have >80% row coverage). 
- MIN_COVERAGE_PRIMARY = 0.35 - MIN_COVERAGE_SECONDARY = 0.12 - MIN_WORDS_SECONDARY = 4 - MIN_DISTINCT_ROWS = 3 - - # Content boundary for left-margin detection - content_x_min = min(w["left"] for w in words) - content_x_max = max(w["left"] + w["width"] for w in words) - content_span = content_x_max - content_x_min - - primary = [ - c for c in clusters - if c["row_coverage"] >= MIN_COVERAGE_PRIMARY - and c["distinct_rows"] >= MIN_DISTINCT_ROWS - ] - primary_ids = {id(c) for c in primary} - secondary = [ - c for c in clusters - if id(c) not in primary_ids - and c["row_coverage"] >= MIN_COVERAGE_SECONDARY - and c["count"] >= MIN_WORDS_SECONDARY - and c["distinct_rows"] >= MIN_DISTINCT_ROWS - ] - - # Tertiary: narrow left-margin columns (page refs, markers) that have - # too few rows for secondary but are clearly left-aligned and separated - # from the main content. These appear at the far left or far right and - # have a large gap to the nearest significant cluster. - used_ids = {id(c) for c in primary} | {id(c) for c in secondary} - sig_xs = [c["mean_x"] for c in primary + secondary] - - MIN_DISTINCT_ROWS_TERTIARY = max(MIN_DISTINCT_ROWS + 1, 4) - MIN_COVERAGE_TERTIARY = 0.05 # at least 5% of rows - tertiary = [] - for c in clusters: - if id(c) in used_ids: - continue - if c["distinct_rows"] < MIN_DISTINCT_ROWS_TERTIARY: - continue - if c["row_coverage"] < MIN_COVERAGE_TERTIARY: - continue - # Must be near left or right content margin (within 15%) - rel_pos = (c["mean_x"] - content_x_min) / content_span if content_span else 0.5 - if not (rel_pos < 0.15 or rel_pos > 0.85): - continue - # Must have significant gap to nearest significant cluster - if sig_xs: - min_dist = min(abs(c["mean_x"] - sx) for sx in sig_xs) - if min_dist < max(30, content_span * 0.02): - continue - tertiary.append(c) - - if tertiary: - for c in tertiary: - logger.info( - " tertiary (margin) cluster: x=%d (range %d-%d), %d words, %d rows (%.0f%%)", - c["mean_x"], c["min_edge"], c["max_edge"], - 
c["count"], c["distinct_rows"], c["row_coverage"] * 100, - ) - - significant = sorted(primary + secondary + tertiary, key=lambda c: c["mean_x"]) - - for c in significant: - logger.info( - " significant cluster: x=%d (range %d-%d), %d words, %d rows (%.0f%%)", - c["mean_x"], c["min_edge"], c["max_edge"], - c["count"], c["distinct_rows"], c["row_coverage"] * 100, - ) - logger.info( - "alignment columns: %d clusters, %d primary, %d secondary → %d significant", - len(clusters), len(primary), len(secondary), len(significant), - ) - - if not significant: - # Fallback: single column covering all content - x_min = min(w["left"] for w in words) - x_max = max(w["left"] + w["width"] for w in words) - return [{"index": 0, "type": "column_text", "x_min": x_min, "x_max": x_max}] - - # --- Merge nearby clusters --- - merge_distance = max(25, int(zone_w * 0.03)) - merged = [significant[0].copy()] - for s in significant[1:]: - if s["mean_x"] - merged[-1]["mean_x"] < merge_distance: - prev = merged[-1] - total = prev["count"] + s["count"] - prev["mean_x"] = ( - prev["mean_x"] * prev["count"] + s["mean_x"] * s["count"] - ) // total - prev["count"] = total - prev["min_edge"] = min(prev["min_edge"], s["min_edge"]) - prev["max_edge"] = max(prev["max_edge"], s["max_edge"]) - prev["distinct_rows"] = max(prev["distinct_rows"], s["distinct_rows"]) - else: - merged.append(s.copy()) - - logger.info( - "alignment columns: %d after merge (distance=%d)", - len(merged), merge_distance, - ) - - # --- Build column boundaries --- - margin = max(5, int(zone_w * 0.005)) - content_x_min = min(w["left"] for w in words) - content_x_max = max(w["left"] + w["width"] for w in words) - - columns: List[Dict[str, Any]] = [] - for i, cluster in enumerate(merged): - x_min = max(content_x_min, cluster["min_edge"] - margin) - if i + 1 < len(merged): - x_max = merged[i + 1]["min_edge"] - margin - else: - x_max = content_x_max - - columns.append({ - "index": i, - "type": f"column_{i + 1}" if len(merged) > 1 else 
"column_text", - "x_min": x_min, - "x_max": x_max, - }) - - return columns - - -# Characters that are typically OCR artefacts from box border lines. -# Intentionally excludes ! (red markers) and . , ; (real punctuation). -_GRID_GHOST_CHARS = set("|1lI[](){}/\\-—–_~=+") - - -def _filter_border_ghosts( - words: List[Dict], - boxes: List, -) -> tuple: - """Remove words sitting on box borders that are OCR artefacts. - - Returns (filtered_words, removed_count). - """ - if not boxes or not words: - return words, 0 - - # Build border bands from detected boxes - x_bands: List[tuple] = [] - y_bands: List[tuple] = [] - for b in boxes: - bt = ( - b.border_thickness - if hasattr(b, "border_thickness") - else b.get("border_thickness", 3) - ) - # Skip borderless boxes (images/graphics) — no border line to produce ghosts - if bt == 0: - continue - bx = b.x if hasattr(b, "x") else b.get("x", 0) - by = b.y if hasattr(b, "y") else b.get("y", 0) - bw = b.width if hasattr(b, "width") else b.get("w", b.get("width", 0)) - bh = b.height if hasattr(b, "height") else b.get("h", b.get("height", 0)) - margin = max(bt * 2, 10) + 6 - x_bands.append((bx - margin, bx + margin)) - x_bands.append((bx + bw - margin, bx + bw + margin)) - y_bands.append((by - margin, by + margin)) - y_bands.append((by + bh - margin, by + bh + margin)) - - def _is_ghost(w: Dict) -> bool: - text = (w.get("text") or "").strip() - if not text: - return False - # Check if any word edge (not just center) touches a border band - w_left = w["left"] - w_right = w["left"] + w["width"] - w_top = w["top"] - w_bottom = w["top"] + w["height"] - on_border = ( - any(lo <= w_left <= hi or lo <= w_right <= hi for lo, hi in x_bands) - or any(lo <= w_top <= hi or lo <= w_bottom <= hi for lo, hi in y_bands) - ) - if not on_border: - return False - if len(text) == 1 and text in _GRID_GHOST_CHARS: - return True - return False - - filtered = [w for w in words if not _is_ghost(w)] - return filtered, len(words) - len(filtered) - - 
-_MARKER_CHARS = set("•*·-–—|~=+#>→►▸▪◆○●□■✓✗✔✘") - - -def _merge_inline_marker_columns( - columns: List[Dict], - words: List[Dict], -) -> List[Dict]: - """Merge narrow marker columns (bullets, numbering) into adjacent text. - - Bullet points (•, *, -) and numbering (1., 2.) create narrow columns - at the left edge of a zone. These are inline markers that indent text, - not real separate columns. Merge them with their right neighbour. - - Does NOT merge columns containing alphabetic words like "to", "in", - "der", "die", "das" — those are legitimate content columns. - """ - if len(columns) < 2: - return columns - - merged: List[Dict] = [] - skip: set = set() - - for i, col in enumerate(columns): - if i in skip: - continue - - # Find words in this column - col_words = [ - w for w in words - if col["x_min"] <= w["left"] + w["width"] / 2 < col["x_max"] - ] - col_width = col["x_max"] - col["x_min"] - - # Narrow column with mostly short words → MIGHT be inline markers - if col_words and col_width < 80: - avg_len = sum(len(w.get("text", "")) for w in col_words) / len(col_words) - if avg_len <= 2 and i + 1 < len(columns): - # Check if words are actual markers (symbols/numbers) vs - # real alphabetic words like "to", "in", "der", "die" - texts = [(w.get("text") or "").strip() for w in col_words] - alpha_count = sum( - 1 for t in texts - if t and t[0].isalpha() and t not in _MARKER_CHARS - ) - alpha_ratio = alpha_count / len(texts) if texts else 0 - - # If ≥50% of words are alphabetic, this is a real column - if alpha_ratio >= 0.5: - logger.info( - " kept narrow column %d (w=%d, avg_len=%.1f, " - "alpha=%.0f%%) — contains real words", - i, col_width, avg_len, alpha_ratio * 100, - ) - else: - # Merge into next column - next_col = columns[i + 1].copy() - next_col["x_min"] = col["x_min"] - merged.append(next_col) - skip.add(i + 1) - logger.info( - " merged inline marker column %d (w=%d, avg_len=%.1f) " - "into column %d", - i, col_width, avg_len, i + 1, - ) - continue - - 
merged.append(col) - - # Re-index - for i, col in enumerate(merged): - col["index"] = i - col["type"] = f"column_{i + 1}" if len(merged) > 1 else "column_text" - - return merged - - -def _flatten_word_boxes(cells: List[Dict]) -> List[Dict]: - """Extract all word_boxes from cells into a flat list of word dicts.""" - words: List[Dict] = [] - for cell in cells: - for wb in cell.get("word_boxes") or []: - if wb.get("text", "").strip(): - words.append({ - "text": wb["text"], - "left": wb["left"], - "top": wb["top"], - "width": wb["width"], - "height": wb["height"], - "conf": wb.get("conf", 0), - }) - return words - - -def _words_in_zone( - words: List[Dict], - zone_y: int, - zone_h: int, - zone_x: int, - zone_w: int, -) -> List[Dict]: - """Filter words whose Y-center falls within a zone's bounds.""" - zone_y_end = zone_y + zone_h - zone_x_end = zone_x + zone_w - result = [] - for w in words: - cy = w["top"] + w["height"] / 2 - cx = w["left"] + w["width"] / 2 - if zone_y <= cy <= zone_y_end and zone_x <= cx <= zone_x_end: - result.append(w) - return result - - -# --------------------------------------------------------------------------- -# Vertical divider detection and zone splitting -# --------------------------------------------------------------------------- - -_PIPE_RE_VSPLIT = re.compile(r"^\|+$") - - -def _detect_vertical_dividers( - words: List[Dict], - zone_x: int, - zone_w: int, - zone_y: int, - zone_h: int, -) -> List[float]: - """Detect vertical divider lines from pipe word_boxes at consistent x. - - Returns list of divider x-positions (empty if no dividers found). 
- """ - if not words or zone_w <= 0 or zone_h <= 0: - return [] - - # Collect pipe word_boxes - pipes = [ - w for w in words - if _PIPE_RE_VSPLIT.match((w.get("text") or "").strip()) - ] - if len(pipes) < 5: - return [] - - # Cluster pipe x-centers by proximity - tolerance = max(15, int(zone_w * 0.02)) - pipe_xs = sorted(w["left"] + w["width"] / 2 for w in pipes) - - clusters: List[List[float]] = [[pipe_xs[0]]] - for x in pipe_xs[1:]: - if x - clusters[-1][-1] <= tolerance: - clusters[-1].append(x) - else: - clusters.append([x]) - - dividers: List[float] = [] - for cluster in clusters: - if len(cluster) < 5: - continue - mean_x = sum(cluster) / len(cluster) - # Must be between 15% and 85% of zone width - rel_pos = (mean_x - zone_x) / zone_w - if rel_pos < 0.15 or rel_pos > 0.85: - continue - # Check vertical coverage: pipes must span >= 50% of zone height - cluster_pipes = [ - w for w in pipes - if abs(w["left"] + w["width"] / 2 - mean_x) <= tolerance - ] - ys = [w["top"] for w in cluster_pipes] + [w["top"] + w["height"] for w in cluster_pipes] - y_span = max(ys) - min(ys) if ys else 0 - if y_span < zone_h * 0.5: - continue - dividers.append(mean_x) - - return sorted(dividers) - - -def _split_zone_at_vertical_dividers( - zone: "PageZone", - divider_xs: List[float], - vsplit_group_id: int, -) -> List["PageZone"]: - """Split a PageZone at vertical divider positions into sub-zones.""" - from cv_vocab_types import PageZone - - boundaries = [zone.x] + divider_xs + [zone.x + zone.width] - hints = [] - for i in range(len(boundaries) - 1): - if i == 0: - hints.append("left_of_vsplit") - elif i == len(boundaries) - 2: - hints.append("right_of_vsplit") - else: - hints.append("middle_of_vsplit") - - sub_zones = [] - for i in range(len(boundaries) - 1): - x_start = int(boundaries[i]) - x_end = int(boundaries[i + 1]) - sub = PageZone( - index=0, # re-indexed later - zone_type=zone.zone_type, - y=zone.y, - height=zone.height, - x=x_start, - width=x_end - x_start, - box=zone.box, 
- image_overlays=zone.image_overlays, - layout_hint=hints[i], - vsplit_group=vsplit_group_id, - ) - sub_zones.append(sub) - - return sub_zones - - -def _merge_content_zones_across_boxes( - zones: List, - content_x: int, - content_w: int, -) -> List: - """Merge content zones separated by box zones into single zones. - - Box zones become image_overlays on the merged content zone. - Pattern: [content, box*, content] → [merged_content with overlay] - Box zones NOT between two content zones stay as standalone zones. - """ - if len(zones) < 3: - return zones - - # Group consecutive runs of [content, box+, content] - result: List = [] - i = 0 - while i < len(zones): - z = zones[i] - if z.zone_type != "content": - result.append(z) - i += 1 - continue - - # Start of a potential merge group: content zone - group_contents = [z] - group_boxes = [] - j = i + 1 - # Absorb [box, content] pairs — only absorb a box if it's - # confirmed to be followed by another content zone. - while j < len(zones): - if (zones[j].zone_type == "box" - and j + 1 < len(zones) - and zones[j + 1].zone_type == "content"): - group_boxes.append(zones[j]) - group_contents.append(zones[j + 1]) - j += 2 - else: - break - - if len(group_contents) >= 2 and group_boxes: - # Merge: create one large content zone spanning all - y_min = min(c.y for c in group_contents) - y_max = max(c.y + c.height for c in group_contents) - overlays = [] - for bz in group_boxes: - overlay = { - "y": bz.y, - "height": bz.height, - "x": bz.x, - "width": bz.width, - } - if bz.box: - overlay["box"] = { - "x": bz.box.x, - "y": bz.box.y, - "width": bz.box.width, - "height": bz.box.height, - "confidence": bz.box.confidence, - "border_thickness": bz.box.border_thickness, - } - overlays.append(overlay) - - merged = PageZone( - index=0, # re-indexed below - zone_type="content", - y=y_min, - height=y_max - y_min, - x=content_x, - width=content_w, - image_overlays=overlays, - ) - result.append(merged) - i = j - else: - # No merge possible — 
emit just the content zone - result.append(z) - i += 1 - - # Re-index zones - for idx, z in enumerate(result): - z.index = idx - - logger.info( - "zone-merge: %d zones → %d zones after merging across boxes", - len(zones), len(result), - ) - return result - - -def _detect_heading_rows_by_color(zones_data: List[Dict], img_w: int, img_h: int) -> int: - """Detect heading rows by color + height after color annotation. - - A row is a heading if: - 1. ALL word_boxes have color_name != 'black' (typically 'blue') - 2. Mean word height > 1.2x median height of all words in the zone - - Detected heading rows are merged into a single spanning cell. - Returns count of headings detected. - """ - heading_count = 0 - - for z in zones_data: - cells = z.get("cells", []) - rows = z.get("rows", []) - columns = z.get("columns", []) - if not cells or not rows or len(columns) < 2: - continue - - # Compute median word height across the zone - all_heights = [] - for cell in cells: - for wb in cell.get("word_boxes") or []: - h = wb.get("height", 0) - if h > 0: - all_heights.append(h) - if not all_heights: - continue - all_heights_sorted = sorted(all_heights) - median_h = all_heights_sorted[len(all_heights_sorted) // 2] - - heading_row_indices = [] - for row in rows: - if row.get("is_header"): - continue # already detected as header - ri = row["index"] - row_cells = [c for c in cells if c.get("row_index") == ri] - row_wbs = [ - wb for cell in row_cells - for wb in cell.get("word_boxes") or [] - ] - if not row_wbs: - continue - - # Condition 1: ALL words are non-black - all_colored = all( - wb.get("color_name", "black") != "black" - for wb in row_wbs - ) - if not all_colored: - continue - - # Condition 2: mean height > 1.2x median - mean_h = sum(wb.get("height", 0) for wb in row_wbs) / len(row_wbs) - if mean_h <= median_h * 1.2: - continue - - heading_row_indices.append(ri) - - # Merge heading cells into spanning cells - for hri in heading_row_indices: - header_cells = [c for c in cells if 
def _detect_heading_rows_by_single_cell(
    zones_data: List[Dict], img_w: int, img_h: int,
) -> int:
    """Detect heading rows that have only a single content cell.

    Black headings like "Theme" have normal color and height, so they are
    missed by ``_detect_heading_rows_by_color``. The distinguishing signal
    is that they occupy only one column while normal vocabulary rows fill
    at least 2-3 columns.

    A row qualifies as a heading if:
    1. It is not already marked as a header/heading.
    2. It has exactly ONE cell whose col_type starts with ``column_``
       (excluding column_1 / page_ref which only carries page numbers).
    3. That single cell is NOT in the last column (continuation/example
       lines like "2. Veränderung, Wechsel" often sit alone in column_4).
    4. The text does not start with ``[`` (IPA continuation).
    5. The zone has ≥3 columns and ≥5 rows (avoids false positives in
       tiny zones).
    6. The majority of rows in the zone have ≥2 content cells (ensures
       we are in a multi-column vocab layout).

    Args:
        zones_data: Zone dicts (each with "cells", "rows", "columns").
            Modified IN PLACE: a detected heading row has all its cells
            replaced by one spanning ``heading`` cell and its row dict
            gets ``is_header = True``.
        img_w: Page width in px, used for bbox percentage conversion.
        img_h: Page height in px, used for bbox percentage conversion.

    Returns:
        Total number of heading rows detected across all zones.
    """
    heading_count = 0

    for z in zones_data:
        cells = z.get("cells", [])
        rows = z.get("rows", [])
        columns = z.get("columns", [])
        if len(columns) < 3 or len(rows) < 5:
            continue

        # Determine the last col_index (example/sentence column)
        col_indices = sorted(set(c.get("col_index", 0) for c in cells))
        if not col_indices:
            continue
        last_col = col_indices[-1]

        # Count content cells per row (column_* but not column_1/page_ref).
        # Exception: column_1 cells that contain a dictionary article word
        # (die/der/das etc.) ARE content — they appear in dictionary layouts
        # where the leftmost column holds grammatical articles.
        _ARTICLE_WORDS = {
            "die", "der", "das", "dem", "den", "des", "ein", "eine",
            "the", "a", "an",
        }
        row_content_counts: Dict[int, int] = {}
        for cell in cells:
            ct = cell.get("col_type", "")
            if not ct.startswith("column_"):
                continue
            if ct == "column_1":
                ctext = (cell.get("text") or "").strip().lower()
                if ctext not in _ARTICLE_WORDS:
                    continue
            ri = cell.get("row_index", -1)
            row_content_counts[ri] = row_content_counts.get(ri, 0) + 1

        # Majority of rows must have ≥2 content cells
        multi_col_rows = sum(1 for cnt in row_content_counts.values() if cnt >= 2)
        if multi_col_rows < len(rows) * 0.4:
            continue

        # Exclude first and last non-header rows — these are typically
        # page numbers or footer text, not headings.
        non_header_rows = [r for r in rows if not r.get("is_header")]
        if len(non_header_rows) < 3:
            continue
        first_ri = non_header_rows[0]["index"]
        last_ri = non_header_rows[-1]["index"]

        heading_row_indices = []
        for row in rows:
            if row.get("is_header"):
                continue
            ri = row["index"]
            if ri == first_ri or ri == last_ri:
                continue
            row_cells = [c for c in cells if c.get("row_index") == ri]
            content_cells = [
                c for c in row_cells
                if c.get("col_type", "").startswith("column_")
                and (c.get("col_type") != "column_1"
                     or (c.get("text") or "").strip().lower() in _ARTICLE_WORDS)
            ]
            if len(content_cells) != 1:
                continue
            cell = content_cells[0]
            # Not in the last column (continuation/example lines)
            if cell.get("col_index") == last_col:
                continue
            text = (cell.get("text") or "").strip()
            if not text or text.startswith("["):
                continue
            # Skip garbled IPA without brackets (e.g. "ska:f – ska:vz")
            # but NOT text with real IPA symbols (e.g. "Theme [θˈiːm]")
            _REAL_IPA_CHARS = set("ˈˌəɪɛɒʊʌæɑɔʃʒθðŋ")
            if _text_has_garbled_ipa(text) and not any(c in _REAL_IPA_CHARS for c in text):
                continue
            heading_row_indices.append(ri)

        for hri in heading_row_indices:
            header_cells = [c for c in cells if c.get("row_index") == hri]
            if not header_cells:
                continue

            # Collect all word_boxes and text
            all_wb = []
            all_text_parts = []
            for hc in sorted(header_cells, key=lambda c: c["col_index"]):
                all_wb.extend(hc.get("word_boxes", []))
                if hc.get("text", "").strip():
                    all_text_parts.append(hc["text"].strip())

            first_col_idx = min(hc["col_index"] for hc in header_cells)

            # Remove old cells for this row, add spanning heading cell
            z["cells"] = [c for c in z["cells"] if c.get("row_index") != hri]

            if all_wb:
                x_min = min(wb["left"] for wb in all_wb)
                y_min = min(wb["top"] for wb in all_wb)
                x_max = max(wb["left"] + wb["width"] for wb in all_wb)
                y_max = max(wb["top"] + wb["height"] for wb in all_wb)
            else:
                # Fallback to first cell bbox when OCR gave no word boxes
                bp = header_cells[0].get("bbox_px", {})
                x_min = bp.get("x", 0)
                y_min = bp.get("y", 0)
                x_max = x_min + bp.get("w", 0)
                y_max = y_min + bp.get("h", 0)

            zone_idx = z.get("zone_index", 0)
            z["cells"].append({
                "cell_id": f"Z{zone_idx}_R{hri:02d}_C{first_col_idx}",
                "zone_index": zone_idx,
                "row_index": hri,
                "col_index": first_col_idx,
                "col_type": "heading",
                "text": " ".join(all_text_parts),
                "confidence": 0.0,
                "bbox_px": {"x": x_min, "y": y_min,
                            "w": x_max - x_min, "h": y_max - y_min},
                "bbox_pct": {
                    "x": round(x_min / img_w * 100, 2) if img_w else 0,
                    "y": round(y_min / img_h * 100, 2) if img_h else 0,
                    "w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0,
                    "h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0,
                },
                "word_boxes": all_wb,
                "ocr_engine": "words_first",
                # Unlike color-detected headings, single-cell headings are
                # plain black text, so they are not flagged bold.
                "is_bold": False,
            })

            for row in rows:
                if row["index"] == hri:
                    row["is_header"] = True
            heading_count += 1

    return heading_count
def _detect_header_rows(
    rows: List[Dict],
    zone_words: List[Dict],
    zone_y: int,
    columns: Optional[List[Dict]] = None,
    skip_first_row_header: bool = False,
) -> List[int]:
    """Detect header rows: first-row heuristic + spanning header detection.

    A "spanning header" is a row whose words stretch across multiple column
    boundaries (e.g. "Unit4: Bonnie Scotland" centred across 4 columns).

    Args:
        rows: Row dicts with "y_min"/"y_max" (from ``_cluster_rows``).
        zone_words: All word dicts of the zone ("top", "height", ...).
        zone_y: Zone top offset in px (currently unused by the heuristics).
        columns: Column dicts; kept for the (disabled) spanning detection.
        skip_first_row_header: Suppress the first-row heuristic entirely.

    Returns:
        List of row indices considered headers (currently only ``[0]``
        or ``[]``).
    """
    if len(rows) < 2:
        return []

    headers = []

    if not skip_first_row_header:
        first_row = rows[0]
        second_row = rows[1]

        # Gap between first and second row > 0.5x average row height
        avg_h = sum(r["y_max"] - r["y_min"] for r in rows) / len(rows)
        gap = second_row["y_min"] - first_row["y_max"]
        if gap > avg_h * 0.5:
            headers.append(0)

        # Also check if first row words are taller than average (bold/header text)
        all_heights = [w["height"] for w in zone_words]
        median_h = sorted(all_heights)[len(all_heights) // 2] if all_heights else 20
        first_row_words = [
            w for w in zone_words
            if first_row["y_min"] <= w["top"] + w["height"] / 2 <= first_row["y_max"]
        ]
        if first_row_words:
            first_h = max(w["height"] for w in first_row_words)
            if first_h > median_h * 1.3:
                if 0 not in headers:
                    headers.append(0)

    # Note: Spanning-header detection (rows spanning all columns) has been
    # disabled because it produces too many false positives on vocabulary
    # worksheets where IPA transcriptions or short entries naturally span
    # multiple columns with few words. The first-row heuristic above is
    # sufficient for detecting real headers.

    return headers


def _build_zone_grid(
    zone_words: List[Dict],
    zone_x: int,
    zone_y: int,
    zone_w: int,
    zone_h: int,
    zone_index: int,
    img_w: int,
    img_h: int,
    global_columns: Optional[List[Dict]] = None,
    skip_first_row_header: bool = False,
) -> Dict[str, Any]:
    """Build columns, rows, cells for a single zone from its words.

    Args:
        global_columns: If provided, use these pre-computed column boundaries
            instead of detecting columns per zone. Used for content zones so
            that all content zones (above/between/below boxes) share the same
            column structure. Box zones always detect columns independently.
        skip_first_row_header: Forwarded to ``_detect_header_rows``.

    Returns:
        Dict with "columns", "rows", "cells", "header_rows"; on the success
        path it additionally carries "_raw_columns" (internal, for
        propagating the detected column layout to other zones).
    """
    if not zone_words:
        return {
            "columns": [],
            "rows": [],
            "cells": [],
            "header_rows": [],
        }

    # Cluster rows first (needed for column alignment analysis)
    rows = _cluster_rows(zone_words)

    # Diagnostic logging for small/medium zones (box zones typically have 40-60 words)
    if len(zone_words) <= 60:
        import statistics as _st
        _heights = [w['height'] for w in zone_words if w.get('height', 0) > 0]
        _med_h = _st.median(_heights) if _heights else 20
        _y_tol = max(_med_h * 0.5, 5)
        logger.info(
            "zone %d row-clustering: %d words, median_h=%.0f, y_tol=%.1f → %d rows",
            zone_index, len(zone_words), _med_h, _y_tol, len(rows),
        )
        for w in sorted(zone_words, key=lambda ww: (ww['top'], ww['left'])):
            logger.info(
                "  zone %d word: y=%d x=%d h=%d w=%d '%s'",
                zone_index, w['top'], w['left'], w['height'], w['width'],
                w.get('text', '')[:40],
            )
        for r in rows:
            logger.info(
                "  zone %d row %d: y_min=%d y_max=%d y_center=%.0f",
                zone_index, r['index'], r['y_min'], r['y_max'], r['y_center'],
            )

    # Use global columns if provided, otherwise detect per zone
    columns = global_columns if global_columns else _cluster_columns_by_alignment(zone_words, zone_w, rows)

    # Merge inline marker columns (bullets, numbering) into adjacent text
    if not global_columns:
        columns = _merge_inline_marker_columns(columns, zone_words)

    if not columns or not rows:
        return {
            "columns": [],
            "rows": [],
            "cells": [],
            "header_rows": [],
        }

    # Build cells
    cells = _build_cells(zone_words, columns, rows, img_w, img_h)

    # Prefix cell IDs with zone index
    for cell in cells:
        cell["cell_id"] = f"Z{zone_index}_{cell['cell_id']}"
        cell["zone_index"] = zone_index

    # Detect header rows (pass columns for spanning header detection)
    header_rows = _detect_header_rows(rows, zone_words, zone_y, columns,
                                      skip_first_row_header=skip_first_row_header)

    # Merge cells in spanning header rows into a single col-0 cell
    if header_rows and len(columns) >= 2:
        for hri in header_rows:
            header_cells = [c for c in cells if c["row_index"] == hri]
            if len(header_cells) <= 1:
                continue
            # Collect all word_boxes and text from all columns
            all_wb = []
            all_text_parts = []
            for hc in sorted(header_cells, key=lambda c: c["col_index"]):
                all_wb.extend(hc.get("word_boxes", []))
                if hc.get("text", "").strip():
                    all_text_parts.append(hc["text"].strip())
            # Remove all header cells, replace with one spanning cell
            cells = [c for c in cells if c["row_index"] != hri]
            if all_wb:
                x_min = min(wb["left"] for wb in all_wb)
                y_min = min(wb["top"] for wb in all_wb)
                x_max = max(wb["left"] + wb["width"] for wb in all_wb)
                y_max = max(wb["top"] + wb["height"] for wb in all_wb)
                cells.append({
                    "cell_id": f"R{hri:02d}_C0",
                    "row_index": hri,
                    "col_index": 0,
                    "col_type": "spanning_header",
                    "text": " ".join(all_text_parts),
                    "confidence": 0.0,
                    "bbox_px": {"x": x_min, "y": y_min,
                                "w": x_max - x_min, "h": y_max - y_min},
                    "bbox_pct": {
                        "x": round(x_min / img_w * 100, 2) if img_w else 0,
                        "y": round(y_min / img_h * 100, 2) if img_h else 0,
                        "w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0,
                        "h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0,
                    },
                    "word_boxes": all_wb,
                    "ocr_engine": "words_first",
                    "is_bold": True,
                })

    # Convert columns to output format with percentages
    out_columns = []
    for col in columns:
        x_min = col["x_min"]
        x_max = col["x_max"]
        out_columns.append({
            "index": col["index"],
            "label": col["type"],
            "x_min_px": round(x_min),
            "x_max_px": round(x_max),
            "x_min_pct": round(x_min / img_w * 100, 2) if img_w else 0,
            "x_max_pct": round(x_max / img_w * 100, 2) if img_w else 0,
            "bold": False,
        })

    # Convert rows to output format with percentages
    out_rows = []
    for row in rows:
        out_rows.append({
            "index": row["index"],
            "y_min_px": round(row["y_min"]),
            "y_max_px": round(row["y_max"]),
            "y_min_pct": round(row["y_min"] / img_h * 100, 2) if img_h else 0,
            "y_max_pct": round(row["y_max"] / img_h * 100, 2) if img_h else 0,
            "is_header": row["index"] in header_rows,
        })

    return {
        "columns": out_columns,
        "rows": out_rows,
        "cells": cells,
        "header_rows": header_rows,
        "_raw_columns": columns,  # internal: for propagation to other zones
    }


def _get_content_bounds(words: List[Dict]) -> tuple:
    """Get content bounds from word positions.

    Returns:
        Tuple ``(x_min, y_min, width, height)``; all zeros for no words.
    """
    if not words:
        return 0, 0, 0, 0
    x_min = min(w["left"] for w in words)
    y_min = min(w["top"] for w in words)
    x_max = max(w["left"] + w["width"] for w in words)
    y_max = max(w["top"] + w["height"] for w in words)
    return x_min, y_min, x_max - x_min, y_max - y_min
def _filter_decorative_margin(
    words: List[Dict],
    img_w: int,
    log: Any,
    session_id: str,
) -> Dict[str, Any]:
    """Remove words that belong to a decorative alphabet strip on a margin.

    Some vocabulary worksheets have a vertical A–Z alphabet graphic along
    the left or right edge. OCR reads each letter as an isolated short
    word. These decorative elements are not content and confuse
    column/row detection.

    Detection criteria (phase 1 — find the strip using short words):
    - Words are in the outer 30% of the page (left or right)
    - Candidate words are short (1-2 characters)
    - At least 6 such words form a vertical strip (≥6 unique Y buckets)
    - Average horizontal spread of the strip is small (< 80px)

    Phase 2 — once a strip is confirmed, also remove any short word (≤3
    chars) in the same narrow x-range. This catches multi-char OCR
    artifacts like "Vv" that belong to the same decorative element.

    Modifies *words* in place.

    Returns:
        Dict with 'found' (bool), 'side' (str), 'letters_detected' (int).
    """
    no_strip: Dict[str, Any] = {"found": False, "side": "", "letters_detected": 0}
    if not words or img_w <= 0:
        return no_strip

    margin_cutoff = img_w * 0.30
    # Phase 1: find candidate strips using short words (1-2 chars).
    # OCR often reads alphabet sidebar letters as pairs ("Aa", "Bb")
    # rather than singles, so accept ≤2-char words as strip candidates.
    left_strip = [
        w for w in words
        if len((w.get("text") or "").strip()) <= 2
        and w["left"] + w.get("width", 0) / 2 < margin_cutoff
    ]
    right_strip = [
        w for w in words
        if len((w.get("text") or "").strip()) <= 2
        and w["left"] + w.get("width", 0) / 2 > img_w - margin_cutoff
    ]

    for strip, side in [(left_strip, "left"), (right_strip, "right")]:
        if len(strip) < 6:
            continue
        # Check vertical distribution: should have many distinct Y positions
        y_centers = sorted(set(
            int(w["top"] + w.get("height", 0) / 2) // 20 * 20  # bucket
            for w in strip
        ))
        if len(y_centers) < 6:
            continue
        # Check horizontal compactness
        x_positions = [w["left"] for w in strip]
        x_min = min(x_positions)
        x_max = max(x_positions)
        x_spread = x_max - x_min
        if x_spread > 80:
            continue

        # Phase 2: strip confirmed — also collect short words in same x-range
        # Expand x-range slightly to catch neighbors (e.g. "Vv" next to "U")
        strip_x_lo = x_min - 20
        strip_x_hi = x_max + 60  # word width + tolerance
        all_strip_words = [
            w for w in words
            if len((w.get("text") or "").strip()) <= 3
            and strip_x_lo <= w["left"] <= strip_x_hi
            and (w["left"] + w.get("width", 0) / 2 < margin_cutoff
                 if side == "left"
                 else w["left"] + w.get("width", 0) / 2 > img_w - margin_cutoff)
        ]

        strip_set = set(id(w) for w in all_strip_words)
        before = len(words)
        words[:] = [w for w in words if id(w) not in strip_set]
        removed = before - len(words)
        if removed:
            log.info(
                "build-grid session %s: removed %d decorative %s-margin words "
                "(strip x=%d-%d)",
                session_id, removed, side, strip_x_lo, strip_x_hi,
            )
        return {"found": True, "side": side, "letters_detected": len(strip)}

    return no_strip


def _filter_footer_words(
    words: List[Dict],
    img_h: int,
    log: Any,
    session_id: str,
) -> None:
    """Remove isolated words in the bottom 5% of the page (page numbers).

    Only fires when the footer band holds at most 3 words with ≤10 chars
    of combined text, so real content near the bottom edge is kept.

    Modifies *words* in place.
    """
    if not words or img_h <= 0:
        return
    footer_y = img_h * 0.95
    footer_words = [
        w for w in words
        if w["top"] + w.get("height", 0) / 2 > footer_y
    ]
    if not footer_words:
        return
    # Only remove if footer has very few words (≤ 3) with short text
    total_text = "".join((w.get("text") or "").strip() for w in footer_words)
    if len(footer_words) <= 3 and len(total_text) <= 10:
        footer_set = set(id(w) for w in footer_words)
        words[:] = [w for w in words if id(w) not in footer_set]
        log.info(
            "build-grid session %s: removed %d footer words ('%s')",
            session_id, len(footer_words), total_text,
        )


def _filter_header_junk(
    words: List[Dict],
    img_h: int,
    log: Any,
    session_id: str,
) -> None:
    """Remove OCR junk from header illustrations above the real content.

    Textbook pages often have decorative header graphics (illustrations,
    icons) that OCR reads as low-confidence junk characters. Real content
    typically starts further down the page.

    Algorithm:
    1. Find the "content start" — the first Y position where a dense
       horizontal row of 3+ high-confidence words begins.
    2. Above that line, remove words with conf < 75 and text ≤ 3 chars.
       These are almost certainly OCR artifacts from illustrations.

    Modifies *words* in place.
    """
    if not words or img_h <= 0:
        return

    # --- Find content start: first horizontal row with ≥3 high-conf words ---
    # Sort words by Y
    sorted_by_y = sorted(words, key=lambda w: w["top"])
    content_start_y = 0
    _ROW_TOLERANCE = img_h * 0.02  # words within 2% of page height = same row
    _MIN_ROW_WORDS = 3
    _MIN_CONF = 80

    i = 0
    while i < len(sorted_by_y):
        row_y = sorted_by_y[i]["top"]
        # Collect words in this row band
        row_words = []
        j = i
        while j < len(sorted_by_y) and sorted_by_y[j]["top"] - row_y < _ROW_TOLERANCE:
            row_words.append(sorted_by_y[j])
            j += 1
        # Count high-confidence words with real text (> 1 char)
        high_conf = [
            w for w in row_words
            if w.get("conf", 0) >= _MIN_CONF
            and len((w.get("text") or "").strip()) > 1
        ]
        if len(high_conf) >= _MIN_ROW_WORDS:
            content_start_y = row_y
            break
        i = j if j > i else i + 1

    if content_start_y <= 0:
        return  # no clear content start found

    # --- Remove low-conf short junk above content start ---
    junk = [
        w for w in words
        if w["top"] + w.get("height", 0) < content_start_y
        and w.get("conf", 0) < 75
        and len((w.get("text") or "").strip()) <= 3
    ]
    if not junk:
        return

    junk_set = set(id(w) for w in junk)
    before = len(words)
    words[:] = [w for w in words if id(w) not in junk_set]
    removed = before - len(words)
    if removed:
        log.info(
            "build-grid session %s: removed %d header junk words above y=%d "
            "(content start)",
            session_id, removed, content_start_y,
        )
_build_grid_core(session_id: str, session: dict) -> dict: logger.warning("Dictionary detection failed: %s", e) # --- Syllable divider insertion for dictionary pages --- - # Dictionary pages show syllable breaks as "|" (e.g. "Ka|me|rad"). - # OCR engines rarely detect "|", so we use a two-step approach: - # 1. CV: detect if a word_box image contains thin vertical pipe lines - # 2. pyphen: insert syllable breaks at linguistically correct positions - # Only the FIRST significant word per cell gets pipes (matching print layout). + # CV-validated: only inserts "|" where image shows thin vertical lines. + # See cv_syllable_detect.py for the detection + insertion logic. syllable_insertions = 0 if dict_detection.get("is_dictionary") and img_bgr is not None: try: - import pyphen - _hyph_de = pyphen.Pyphen(lang='de_DE') - _hyph_en = pyphen.Pyphen(lang='en_US') - _ipa_re = re.compile(r'[\[\]ˈˌːʃʒθðŋɑɒæɔəɛɜɪʊʌ]') - img_gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY) - - def _word_has_pipe_lines(wb: Dict) -> bool: - """CV check: does this word_box image show thin vertical dividers?""" - x = wb.get("left", 0) - y = wb.get("top", 0) - w = wb.get("width", 0) - h = wb.get("height", 0) - if w < 30 or h < 12: - return False - ih, iw = img_gray.shape[:2] - y1, y2 = max(0, y), min(ih, y + h) - x1, x2 = max(0, x), min(iw, x + w) - roi = img_gray[y1:y2, x1:x2] - if roi.size == 0: - return False - rh, rw = roi.shape - # Binarize (ink = white) - _, binary = cv2.threshold( - roi, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU - ) - # Morphological opening: keep only tall vertical structures - kern_h = max(int(rh * 0.55), 8) - kernel = np.ones((kern_h, 1), np.uint8) - vertical = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel) - # Find surviving contours - contours, _ = cv2.findContours( - vertical, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE - ) - margin = max(int(rw * 0.08), 3) - for cnt in contours: - cx, cy, cw, ch = cv2.boundingRect(cnt) - if cw > 4: - continue # too wide - if cx < 
margin or cx + cw > rw - margin: - continue # at word edge (l, I, 1) - # Check isolation: adjacent columns should be mostly empty - left_zone = binary[cy:cy + ch, max(0, cx - 3):cx] - right_zone = binary[cy:cy + ch, cx + cw:min(rw, cx + cw + 3)] - left_ink = np.mean(left_zone) if left_zone.size else 255 - right_ink = np.mean(right_zone) if right_zone.size else 255 - if left_ink < 80 and right_ink < 80: - return True # isolated thin vertical line = pipe - return False - - for z in zones_data: - for cell in z.get("cells", []): - ct = cell.get("col_type", "") - if not ct.startswith("column_"): - continue - text = cell.get("text", "") - if not text or "|" in text: - continue - if _ipa_re.search(text): - continue - # CV gate: check if ANY word_box in this cell has pipe lines - wbs = cell.get("word_boxes") or [] - has_pipes = any(_word_has_pipe_lines(wb) for wb in wbs) - if not has_pipes: - continue - # Apply pyphen to FIRST significant word only - # (dictionary layout: only headword gets pipes) - match = re.match(r'^(\s*)([\w\-äöüÄÖÜß]+)(.*)', text, re.DOTALL) - if not match: - continue - prefix, first_word, rest = match.groups() - if len(first_word) < 4: - continue - hyph = _hyph_de.inserted(first_word, hyphen='|') - if '|' not in hyph: - hyph = _hyph_en.inserted(first_word, hyphen='|') - if '|' in hyph and hyph != first_word: - cell["text"] = prefix + hyph + rest - syllable_insertions += 1 - if syllable_insertions: - logger.info( - "build-grid session %s: inserted syllable dividers in %d cells " - "(CV-validated)", - session_id, syllable_insertions, - ) - except ImportError: - logger.warning("pyphen not installed — skipping syllable insertion") + from cv_syllable_detect import insert_syllable_dividers + syllable_insertions = insert_syllable_dividers( + zones_data, img_bgr, session_id, + ) except Exception as e: logger.warning("Syllable insertion failed: %s", e) diff --git a/klausur-service/backend/grid_editor_helpers.py b/klausur-service/backend/grid_editor_helpers.py 
new file mode 100644 index 0000000..06a59e2 --- /dev/null +++ b/klausur-service/backend/grid_editor_helpers.py @@ -0,0 +1,1389 @@ +""" +Grid Editor helper functions — filters, detectors, and zone grid building. + +Extracted from grid_editor_api.py for maintainability. +All functions are pure computation — no HTTP, DB, or session side effects. + +Lizenz: Apache 2.0 (kommerziell nutzbar) +DATENSCHUTZ: Alle Verarbeitung erfolgt lokal. +""" + +import logging +import re +from typing import Any, Dict, List, Optional, Tuple + +import cv2 +import numpy as np + +from cv_words_first import _cluster_rows, _build_cells + +logger = logging.getLogger(__name__) + + +def _filter_border_strip_words(words: List[Dict]) -> Tuple[List[Dict], int]: + """Remove page-border decoration strip words BEFORE column detection. + + Scans from each page edge inward to find the first significant x-gap + (>30 px). If the edge cluster contains <15 % of total words, those + words are removed as border-strip artifacts (alphabet letters, + illustration fragments). + + Must run BEFORE ``_build_zone_grid`` so that column detection only + sees real content words and doesn't produce inflated row counts. 
+ """ + if len(words) < 10: + return words, 0 + + sorted_words = sorted(words, key=lambda w: w.get("left", 0)) + total = len(sorted_words) + + # -- Left-edge scan (running max right-edge) -- + left_count = 0 + running_right = 0 + for gi in range(total - 1): + running_right = max( + running_right, + sorted_words[gi].get("left", 0) + sorted_words[gi].get("width", 0), + ) + if sorted_words[gi + 1].get("left", 0) - running_right > 30: + left_count = gi + 1 + break + + # -- Right-edge scan (running min left) -- + right_count = 0 + running_left = sorted_words[-1].get("left", 0) + for gi in range(total - 1, 0, -1): + running_left = min(running_left, sorted_words[gi].get("left", 0)) + prev_right = ( + sorted_words[gi - 1].get("left", 0) + + sorted_words[gi - 1].get("width", 0) + ) + if running_left - prev_right > 30: + right_count = total - gi + break + + # Validate candidate strip: real border decorations are mostly short + # words (alphabet letters like "A", "Bb", stray marks). Multi-word + # content like "der Ranzen" or "die Schals" (continuation of German + # translations) must NOT be removed. 
+ def _is_decorative_strip(candidates: List[Dict]) -> bool: + if not candidates: + return False + short = sum(1 for w in candidates if len((w.get("text") or "").strip()) <= 2) + return short / len(candidates) >= 0.45 + + strip_ids: set = set() + if left_count > 0 and left_count / total < 0.20: + candidates = sorted_words[:left_count] + if _is_decorative_strip(candidates): + strip_ids = {id(w) for w in candidates} + elif right_count > 0 and right_count / total < 0.20: + candidates = sorted_words[total - right_count:] + if _is_decorative_strip(candidates): + strip_ids = {id(w) for w in candidates} + + if not strip_ids: + return words, 0 + + return [w for w in words if id(w) not in strip_ids], len(strip_ids) + + +def _cluster_columns_by_alignment( + words: List[Dict], + zone_w: int, + rows: List[Dict], +) -> List[Dict[str, Any]]: + """Detect columns by clustering left-edge alignment across rows. + + Hybrid approach: + 1. Group words by row, find "group start" positions within each row + (words preceded by a large gap or first word in row) + 2. Cluster group-start left-edges by X-proximity across rows + 3. Filter by row coverage (how many rows have a group start here) + 4. Merge nearby clusters + 5. Build column boundaries + + This filters out mid-phrase word positions (e.g. IPA transcriptions, + second words in multi-word entries) by only considering positions + where a new word group begins within a row. 
+ """ + if not words or not rows: + return [] + + total_rows = len(rows) + if total_rows == 0: + return [] + + # --- Group words by row --- + row_words: Dict[int, List[Dict]] = {} + for w in words: + y_center = w["top"] + w["height"] / 2 + best = min(rows, key=lambda r: abs(r["y_center"] - y_center)) + row_words.setdefault(best["index"], []).append(w) + + # --- Compute adaptive gap threshold for group-start detection --- + all_gaps: List[float] = [] + for ri, rw_list in row_words.items(): + sorted_rw = sorted(rw_list, key=lambda w: w["left"]) + for i in range(len(sorted_rw) - 1): + right = sorted_rw[i]["left"] + sorted_rw[i]["width"] + gap = sorted_rw[i + 1]["left"] - right + if gap > 0: + all_gaps.append(gap) + + if all_gaps: + sorted_gaps = sorted(all_gaps) + median_gap = sorted_gaps[len(sorted_gaps) // 2] + heights = [w["height"] for w in words if w.get("height", 0) > 0] + median_h = sorted(heights)[len(heights) // 2] if heights else 25 + # Column boundary: gap > 3× median gap or > 1.5× median word height + gap_threshold = max(median_gap * 3, median_h * 1.5, 30) + else: + gap_threshold = 50 + + # --- Find group-start positions (left-edges that begin a new column) --- + start_positions: List[tuple] = [] # (left_edge, row_index) + for ri, rw_list in row_words.items(): + sorted_rw = sorted(rw_list, key=lambda w: w["left"]) + # First word in row is always a group start + start_positions.append((sorted_rw[0]["left"], ri)) + for i in range(1, len(sorted_rw)): + right_prev = sorted_rw[i - 1]["left"] + sorted_rw[i - 1]["width"] + gap = sorted_rw[i]["left"] - right_prev + if gap >= gap_threshold: + start_positions.append((sorted_rw[i]["left"], ri)) + + start_positions.sort(key=lambda x: x[0]) + + logger.info( + "alignment columns: %d group-start positions from %d words " + "(gap_threshold=%.0f, %d rows)", + len(start_positions), len(words), gap_threshold, total_rows, + ) + + if not start_positions: + x_min = min(w["left"] for w in words) + x_max = max(w["left"] + 
w["width"] for w in words) + return [{"index": 0, "type": "column_text", "x_min": x_min, "x_max": x_max}] + + # --- Cluster group-start positions by X-proximity --- + tolerance = max(10, int(zone_w * 0.01)) + clusters: List[Dict[str, Any]] = [] + cur_edges = [start_positions[0][0]] + cur_rows = {start_positions[0][1]} + + for left, row_idx in start_positions[1:]: + if left - cur_edges[-1] <= tolerance: + cur_edges.append(left) + cur_rows.add(row_idx) + else: + clusters.append({ + "mean_x": int(sum(cur_edges) / len(cur_edges)), + "min_edge": min(cur_edges), + "max_edge": max(cur_edges), + "count": len(cur_edges), + "distinct_rows": len(cur_rows), + "row_coverage": len(cur_rows) / total_rows, + }) + cur_edges = [left] + cur_rows = {row_idx} + clusters.append({ + "mean_x": int(sum(cur_edges) / len(cur_edges)), + "min_edge": min(cur_edges), + "max_edge": max(cur_edges), + "count": len(cur_edges), + "distinct_rows": len(cur_rows), + "row_coverage": len(cur_rows) / total_rows, + }) + + # --- Filter by row coverage --- + # These thresholds must be high enough to avoid false columns in flowing + # text (random inter-word gaps) while still detecting real columns in + # vocabulary worksheets (which typically have >80% row coverage). 
+ MIN_COVERAGE_PRIMARY = 0.35 + MIN_COVERAGE_SECONDARY = 0.12 + MIN_WORDS_SECONDARY = 4 + MIN_DISTINCT_ROWS = 3 + + # Content boundary for left-margin detection + content_x_min = min(w["left"] for w in words) + content_x_max = max(w["left"] + w["width"] for w in words) + content_span = content_x_max - content_x_min + + primary = [ + c for c in clusters + if c["row_coverage"] >= MIN_COVERAGE_PRIMARY + and c["distinct_rows"] >= MIN_DISTINCT_ROWS + ] + primary_ids = {id(c) for c in primary} + secondary = [ + c for c in clusters + if id(c) not in primary_ids + and c["row_coverage"] >= MIN_COVERAGE_SECONDARY + and c["count"] >= MIN_WORDS_SECONDARY + and c["distinct_rows"] >= MIN_DISTINCT_ROWS + ] + + # Tertiary: narrow left-margin columns (page refs, markers) that have + # too few rows for secondary but are clearly left-aligned and separated + # from the main content. These appear at the far left or far right and + # have a large gap to the nearest significant cluster. + used_ids = {id(c) for c in primary} | {id(c) for c in secondary} + sig_xs = [c["mean_x"] for c in primary + secondary] + + MIN_DISTINCT_ROWS_TERTIARY = max(MIN_DISTINCT_ROWS + 1, 4) + MIN_COVERAGE_TERTIARY = 0.05 # at least 5% of rows + tertiary = [] + for c in clusters: + if id(c) in used_ids: + continue + if c["distinct_rows"] < MIN_DISTINCT_ROWS_TERTIARY: + continue + if c["row_coverage"] < MIN_COVERAGE_TERTIARY: + continue + # Must be near left or right content margin (within 15%) + rel_pos = (c["mean_x"] - content_x_min) / content_span if content_span else 0.5 + if not (rel_pos < 0.15 or rel_pos > 0.85): + continue + # Must have significant gap to nearest significant cluster + if sig_xs: + min_dist = min(abs(c["mean_x"] - sx) for sx in sig_xs) + if min_dist < max(30, content_span * 0.02): + continue + tertiary.append(c) + + if tertiary: + for c in tertiary: + logger.info( + " tertiary (margin) cluster: x=%d (range %d-%d), %d words, %d rows (%.0f%%)", + c["mean_x"], c["min_edge"], c["max_edge"], + 
c["count"], c["distinct_rows"], c["row_coverage"] * 100, + ) + + significant = sorted(primary + secondary + tertiary, key=lambda c: c["mean_x"]) + + for c in significant: + logger.info( + " significant cluster: x=%d (range %d-%d), %d words, %d rows (%.0f%%)", + c["mean_x"], c["min_edge"], c["max_edge"], + c["count"], c["distinct_rows"], c["row_coverage"] * 100, + ) + logger.info( + "alignment columns: %d clusters, %d primary, %d secondary → %d significant", + len(clusters), len(primary), len(secondary), len(significant), + ) + + if not significant: + # Fallback: single column covering all content + x_min = min(w["left"] for w in words) + x_max = max(w["left"] + w["width"] for w in words) + return [{"index": 0, "type": "column_text", "x_min": x_min, "x_max": x_max}] + + # --- Merge nearby clusters --- + merge_distance = max(25, int(zone_w * 0.03)) + merged = [significant[0].copy()] + for s in significant[1:]: + if s["mean_x"] - merged[-1]["mean_x"] < merge_distance: + prev = merged[-1] + total = prev["count"] + s["count"] + prev["mean_x"] = ( + prev["mean_x"] * prev["count"] + s["mean_x"] * s["count"] + ) // total + prev["count"] = total + prev["min_edge"] = min(prev["min_edge"], s["min_edge"]) + prev["max_edge"] = max(prev["max_edge"], s["max_edge"]) + prev["distinct_rows"] = max(prev["distinct_rows"], s["distinct_rows"]) + else: + merged.append(s.copy()) + + logger.info( + "alignment columns: %d after merge (distance=%d)", + len(merged), merge_distance, + ) + + # --- Build column boundaries --- + margin = max(5, int(zone_w * 0.005)) + content_x_min = min(w["left"] for w in words) + content_x_max = max(w["left"] + w["width"] for w in words) + + columns: List[Dict[str, Any]] = [] + for i, cluster in enumerate(merged): + x_min = max(content_x_min, cluster["min_edge"] - margin) + if i + 1 < len(merged): + x_max = merged[i + 1]["min_edge"] - margin + else: + x_max = content_x_max + + columns.append({ + "index": i, + "type": f"column_{i + 1}" if len(merged) > 1 else 
"column_text", + "x_min": x_min, + "x_max": x_max, + }) + + return columns + + +# Characters that are typically OCR artefacts from box border lines. +# Intentionally excludes ! (red markers) and . , ; (real punctuation). +_GRID_GHOST_CHARS = set("|1lI[](){}/\\-—–_~=+") + + +def _filter_border_ghosts( + words: List[Dict], + boxes: List, +) -> tuple: + """Remove words sitting on box borders that are OCR artefacts. + + Returns (filtered_words, removed_count). + """ + if not boxes or not words: + return words, 0 + + # Build border bands from detected boxes + x_bands: List[tuple] = [] + y_bands: List[tuple] = [] + for b in boxes: + bt = ( + b.border_thickness + if hasattr(b, "border_thickness") + else b.get("border_thickness", 3) + ) + # Skip borderless boxes (images/graphics) — no border line to produce ghosts + if bt == 0: + continue + bx = b.x if hasattr(b, "x") else b.get("x", 0) + by = b.y if hasattr(b, "y") else b.get("y", 0) + bw = b.width if hasattr(b, "width") else b.get("w", b.get("width", 0)) + bh = b.height if hasattr(b, "height") else b.get("h", b.get("height", 0)) + margin = max(bt * 2, 10) + 6 + x_bands.append((bx - margin, bx + margin)) + x_bands.append((bx + bw - margin, bx + bw + margin)) + y_bands.append((by - margin, by + margin)) + y_bands.append((by + bh - margin, by + bh + margin)) + + def _is_ghost(w: Dict) -> bool: + text = (w.get("text") or "").strip() + if not text: + return False + # Check if any word edge (not just center) touches a border band + w_left = w["left"] + w_right = w["left"] + w["width"] + w_top = w["top"] + w_bottom = w["top"] + w["height"] + on_border = ( + any(lo <= w_left <= hi or lo <= w_right <= hi for lo, hi in x_bands) + or any(lo <= w_top <= hi or lo <= w_bottom <= hi for lo, hi in y_bands) + ) + if not on_border: + return False + if len(text) == 1 and text in _GRID_GHOST_CHARS: + return True + return False + + filtered = [w for w in words if not _is_ghost(w)] + return filtered, len(words) - len(filtered) + + 
+_MARKER_CHARS = set("•*·-–—|~=+#>→►▸▪◆○●□■✓✗✔✘") + + +def _merge_inline_marker_columns( + columns: List[Dict], + words: List[Dict], +) -> List[Dict]: + """Merge narrow marker columns (bullets, numbering) into adjacent text. + + Bullet points (•, *, -) and numbering (1., 2.) create narrow columns + at the left edge of a zone. These are inline markers that indent text, + not real separate columns. Merge them with their right neighbour. + + Does NOT merge columns containing alphabetic words like "to", "in", + "der", "die", "das" — those are legitimate content columns. + """ + if len(columns) < 2: + return columns + + merged: List[Dict] = [] + skip: set = set() + + for i, col in enumerate(columns): + if i in skip: + continue + + # Find words in this column + col_words = [ + w for w in words + if col["x_min"] <= w["left"] + w["width"] / 2 < col["x_max"] + ] + col_width = col["x_max"] - col["x_min"] + + # Narrow column with mostly short words → MIGHT be inline markers + if col_words and col_width < 80: + avg_len = sum(len(w.get("text", "")) for w in col_words) / len(col_words) + if avg_len <= 2 and i + 1 < len(columns): + # Check if words are actual markers (symbols/numbers) vs + # real alphabetic words like "to", "in", "der", "die" + texts = [(w.get("text") or "").strip() for w in col_words] + alpha_count = sum( + 1 for t in texts + if t and t[0].isalpha() and t not in _MARKER_CHARS + ) + alpha_ratio = alpha_count / len(texts) if texts else 0 + + # If ≥50% of words are alphabetic, this is a real column + if alpha_ratio >= 0.5: + logger.info( + " kept narrow column %d (w=%d, avg_len=%.1f, " + "alpha=%.0f%%) — contains real words", + i, col_width, avg_len, alpha_ratio * 100, + ) + else: + # Merge into next column + next_col = columns[i + 1].copy() + next_col["x_min"] = col["x_min"] + merged.append(next_col) + skip.add(i + 1) + logger.info( + " merged inline marker column %d (w=%d, avg_len=%.1f) " + "into column %d", + i, col_width, avg_len, i + 1, + ) + continue + + 
merged.append(col) + + # Re-index + for i, col in enumerate(merged): + col["index"] = i + col["type"] = f"column_{i + 1}" if len(merged) > 1 else "column_text" + + return merged + + +def _flatten_word_boxes(cells: List[Dict]) -> List[Dict]: + """Extract all word_boxes from cells into a flat list of word dicts.""" + words: List[Dict] = [] + for cell in cells: + for wb in cell.get("word_boxes") or []: + if wb.get("text", "").strip(): + words.append({ + "text": wb["text"], + "left": wb["left"], + "top": wb["top"], + "width": wb["width"], + "height": wb["height"], + "conf": wb.get("conf", 0), + }) + return words + + +def _words_in_zone( + words: List[Dict], + zone_y: int, + zone_h: int, + zone_x: int, + zone_w: int, +) -> List[Dict]: + """Filter words whose Y-center falls within a zone's bounds.""" + zone_y_end = zone_y + zone_h + zone_x_end = zone_x + zone_w + result = [] + for w in words: + cy = w["top"] + w["height"] / 2 + cx = w["left"] + w["width"] / 2 + if zone_y <= cy <= zone_y_end and zone_x <= cx <= zone_x_end: + result.append(w) + return result + + +# --------------------------------------------------------------------------- +# Vertical divider detection and zone splitting +# --------------------------------------------------------------------------- + +_PIPE_RE_VSPLIT = re.compile(r"^\|+$") + + +def _detect_vertical_dividers( + words: List[Dict], + zone_x: int, + zone_w: int, + zone_y: int, + zone_h: int, +) -> List[float]: + """Detect vertical divider lines from pipe word_boxes at consistent x. + + Returns list of divider x-positions (empty if no dividers found). 
+ """ + if not words or zone_w <= 0 or zone_h <= 0: + return [] + + # Collect pipe word_boxes + pipes = [ + w for w in words + if _PIPE_RE_VSPLIT.match((w.get("text") or "").strip()) + ] + if len(pipes) < 5: + return [] + + # Cluster pipe x-centers by proximity + tolerance = max(15, int(zone_w * 0.02)) + pipe_xs = sorted(w["left"] + w["width"] / 2 for w in pipes) + + clusters: List[List[float]] = [[pipe_xs[0]]] + for x in pipe_xs[1:]: + if x - clusters[-1][-1] <= tolerance: + clusters[-1].append(x) + else: + clusters.append([x]) + + dividers: List[float] = [] + for cluster in clusters: + if len(cluster) < 5: + continue + mean_x = sum(cluster) / len(cluster) + # Must be between 15% and 85% of zone width + rel_pos = (mean_x - zone_x) / zone_w + if rel_pos < 0.15 or rel_pos > 0.85: + continue + # Check vertical coverage: pipes must span >= 50% of zone height + cluster_pipes = [ + w for w in pipes + if abs(w["left"] + w["width"] / 2 - mean_x) <= tolerance + ] + ys = [w["top"] for w in cluster_pipes] + [w["top"] + w["height"] for w in cluster_pipes] + y_span = max(ys) - min(ys) if ys else 0 + if y_span < zone_h * 0.5: + continue + dividers.append(mean_x) + + return sorted(dividers) + + +def _split_zone_at_vertical_dividers( + zone: "PageZone", + divider_xs: List[float], + vsplit_group_id: int, +) -> List["PageZone"]: + """Split a PageZone at vertical divider positions into sub-zones.""" + from cv_vocab_types import PageZone + + boundaries = [zone.x] + divider_xs + [zone.x + zone.width] + hints = [] + for i in range(len(boundaries) - 1): + if i == 0: + hints.append("left_of_vsplit") + elif i == len(boundaries) - 2: + hints.append("right_of_vsplit") + else: + hints.append("middle_of_vsplit") + + sub_zones = [] + for i in range(len(boundaries) - 1): + x_start = int(boundaries[i]) + x_end = int(boundaries[i + 1]) + sub = PageZone( + index=0, # re-indexed later + zone_type=zone.zone_type, + y=zone.y, + height=zone.height, + x=x_start, + width=x_end - x_start, + box=zone.box, 
+ image_overlays=zone.image_overlays, + layout_hint=hints[i], + vsplit_group=vsplit_group_id, + ) + sub_zones.append(sub) + + return sub_zones + + +def _merge_content_zones_across_boxes( + zones: List, + content_x: int, + content_w: int, +) -> List: + """Merge content zones separated by box zones into single zones. + + Box zones become image_overlays on the merged content zone. + Pattern: [content, box*, content] → [merged_content with overlay] + Box zones NOT between two content zones stay as standalone zones. + """ + if len(zones) < 3: + return zones + + # Group consecutive runs of [content, box+, content] + result: List = [] + i = 0 + while i < len(zones): + z = zones[i] + if z.zone_type != "content": + result.append(z) + i += 1 + continue + + # Start of a potential merge group: content zone + group_contents = [z] + group_boxes = [] + j = i + 1 + # Absorb [box, content] pairs — only absorb a box if it's + # confirmed to be followed by another content zone. + while j < len(zones): + if (zones[j].zone_type == "box" + and j + 1 < len(zones) + and zones[j + 1].zone_type == "content"): + group_boxes.append(zones[j]) + group_contents.append(zones[j + 1]) + j += 2 + else: + break + + if len(group_contents) >= 2 and group_boxes: + # Merge: create one large content zone spanning all + y_min = min(c.y for c in group_contents) + y_max = max(c.y + c.height for c in group_contents) + overlays = [] + for bz in group_boxes: + overlay = { + "y": bz.y, + "height": bz.height, + "x": bz.x, + "width": bz.width, + } + if bz.box: + overlay["box"] = { + "x": bz.box.x, + "y": bz.box.y, + "width": bz.box.width, + "height": bz.box.height, + "confidence": bz.box.confidence, + "border_thickness": bz.box.border_thickness, + } + overlays.append(overlay) + + merged = PageZone( + index=0, # re-indexed below + zone_type="content", + y=y_min, + height=y_max - y_min, + x=content_x, + width=content_w, + image_overlays=overlays, + ) + result.append(merged) + i = j + else: + # No merge possible — 
emit just the content zone + result.append(z) + i += 1 + + # Re-index zones + for idx, z in enumerate(result): + z.index = idx + + logger.info( + "zone-merge: %d zones → %d zones after merging across boxes", + len(zones), len(result), + ) + return result + + +def _detect_heading_rows_by_color(zones_data: List[Dict], img_w: int, img_h: int) -> int: + """Detect heading rows by color + height after color annotation. + + A row is a heading if: + 1. ALL word_boxes have color_name != 'black' (typically 'blue') + 2. Mean word height > 1.2x median height of all words in the zone + + Detected heading rows are merged into a single spanning cell. + Returns count of headings detected. + """ + heading_count = 0 + + for z in zones_data: + cells = z.get("cells", []) + rows = z.get("rows", []) + columns = z.get("columns", []) + if not cells or not rows or len(columns) < 2: + continue + + # Compute median word height across the zone + all_heights = [] + for cell in cells: + for wb in cell.get("word_boxes") or []: + h = wb.get("height", 0) + if h > 0: + all_heights.append(h) + if not all_heights: + continue + all_heights_sorted = sorted(all_heights) + median_h = all_heights_sorted[len(all_heights_sorted) // 2] + + heading_row_indices = [] + for row in rows: + if row.get("is_header"): + continue # already detected as header + ri = row["index"] + row_cells = [c for c in cells if c.get("row_index") == ri] + row_wbs = [ + wb for cell in row_cells + for wb in cell.get("word_boxes") or [] + ] + if not row_wbs: + continue + + # Condition 1: ALL words are non-black + all_colored = all( + wb.get("color_name", "black") != "black" + for wb in row_wbs + ) + if not all_colored: + continue + + # Condition 2: mean height > 1.2x median + mean_h = sum(wb.get("height", 0) for wb in row_wbs) / len(row_wbs) + if mean_h <= median_h * 1.2: + continue + + heading_row_indices.append(ri) + + # Merge heading cells into spanning cells + for hri in heading_row_indices: + header_cells = [c for c in cells if 
c.get("row_index") == hri] + if len(header_cells) <= 1: + # Single cell — just mark it as heading + if header_cells: + header_cells[0]["col_type"] = "heading" + heading_count += 1 + # Mark row as header + for row in rows: + if row["index"] == hri: + row["is_header"] = True + continue + + # Collect all word_boxes and text from all columns + all_wb = [] + all_text_parts = [] + for hc in sorted(header_cells, key=lambda c: c["col_index"]): + all_wb.extend(hc.get("word_boxes", [])) + if hc.get("text", "").strip(): + all_text_parts.append(hc["text"].strip()) + + # Remove all cells for this row, replace with one spanning cell + z["cells"] = [c for c in z["cells"] if c.get("row_index") != hri] + + if all_wb: + x_min = min(wb["left"] for wb in all_wb) + y_min = min(wb["top"] for wb in all_wb) + x_max = max(wb["left"] + wb["width"] for wb in all_wb) + y_max = max(wb["top"] + wb["height"] for wb in all_wb) + + # Use the actual starting col_index from the first cell + first_col = min(hc["col_index"] for hc in header_cells) + zone_idx = z.get("zone_index", 0) + z["cells"].append({ + "cell_id": f"Z{zone_idx}_R{hri:02d}_C{first_col}", + "zone_index": zone_idx, + "row_index": hri, + "col_index": first_col, + "col_type": "heading", + "text": " ".join(all_text_parts), + "confidence": 0.0, + "bbox_px": {"x": x_min, "y": y_min, + "w": x_max - x_min, "h": y_max - y_min}, + "bbox_pct": { + "x": round(x_min / img_w * 100, 2) if img_w else 0, + "y": round(y_min / img_h * 100, 2) if img_h else 0, + "w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0, + "h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0, + }, + "word_boxes": all_wb, + "ocr_engine": "words_first", + "is_bold": True, + }) + + # Mark row as header + for row in rows: + if row["index"] == hri: + row["is_header"] = True + heading_count += 1 + + return heading_count + + +def _detect_heading_rows_by_single_cell( + zones_data: List[Dict], img_w: int, img_h: int, +) -> int: + """Detect heading rows that have only a 
single content cell. + + Black headings like "Theme" have normal color and height, so they are + missed by ``_detect_heading_rows_by_color``. The distinguishing signal + is that they occupy only one column while normal vocabulary rows fill + at least 2-3 columns. + + A row qualifies as a heading if: + 1. It is not already marked as a header/heading. + 2. It has exactly ONE cell whose col_type starts with ``column_`` + (excluding column_1 / page_ref which only carries page numbers). + 3. That single cell is NOT in the last column (continuation/example + lines like "2. Veränderung, Wechsel" often sit alone in column_4). + 4. The text does not start with ``[`` (IPA continuation). + 5. The zone has ≥3 columns and ≥5 rows (avoids false positives in + tiny zones). + 6. The majority of rows in the zone have ≥2 content cells (ensures + we are in a multi-column vocab layout). + """ + heading_count = 0 + + for z in zones_data: + cells = z.get("cells", []) + rows = z.get("rows", []) + columns = z.get("columns", []) + if len(columns) < 3 or len(rows) < 5: + continue + + # Determine the last col_index (example/sentence column) + col_indices = sorted(set(c.get("col_index", 0) for c in cells)) + if not col_indices: + continue + last_col = col_indices[-1] + + # Count content cells per row (column_* but not column_1/page_ref). + # Exception: column_1 cells that contain a dictionary article word + # (die/der/das etc.) ARE content — they appear in dictionary layouts + # where the leftmost column holds grammatical articles. 
+ _ARTICLE_WORDS = { + "die", "der", "das", "dem", "den", "des", "ein", "eine", + "the", "a", "an", + } + row_content_counts: Dict[int, int] = {} + for cell in cells: + ct = cell.get("col_type", "") + if not ct.startswith("column_"): + continue + if ct == "column_1": + ctext = (cell.get("text") or "").strip().lower() + if ctext not in _ARTICLE_WORDS: + continue + ri = cell.get("row_index", -1) + row_content_counts[ri] = row_content_counts.get(ri, 0) + 1 + + # Majority of rows must have ≥2 content cells + multi_col_rows = sum(1 for cnt in row_content_counts.values() if cnt >= 2) + if multi_col_rows < len(rows) * 0.4: + continue + + # Exclude first and last non-header rows — these are typically + # page numbers or footer text, not headings. + non_header_rows = [r for r in rows if not r.get("is_header")] + if len(non_header_rows) < 3: + continue + first_ri = non_header_rows[0]["index"] + last_ri = non_header_rows[-1]["index"] + + heading_row_indices = [] + for row in rows: + if row.get("is_header"): + continue + ri = row["index"] + if ri == first_ri or ri == last_ri: + continue + row_cells = [c for c in cells if c.get("row_index") == ri] + content_cells = [ + c for c in row_cells + if c.get("col_type", "").startswith("column_") + and (c.get("col_type") != "column_1" + or (c.get("text") or "").strip().lower() in _ARTICLE_WORDS) + ] + if len(content_cells) != 1: + continue + cell = content_cells[0] + # Not in the last column (continuation/example lines) + if cell.get("col_index") == last_col: + continue + text = (cell.get("text") or "").strip() + if not text or text.startswith("["): + continue + # Skip garbled IPA without brackets (e.g. "ska:f – ska:vz") + # but NOT text with real IPA symbols (e.g. 
"Theme [θˈiːm]") + _REAL_IPA_CHARS = set("ˈˌəɪɛɒʊʌæɑɔʃʒθðŋ") + if _text_has_garbled_ipa(text) and not any(c in _REAL_IPA_CHARS for c in text): + continue + heading_row_indices.append(ri) + + for hri in heading_row_indices: + header_cells = [c for c in cells if c.get("row_index") == hri] + if not header_cells: + continue + + # Collect all word_boxes and text + all_wb = [] + all_text_parts = [] + for hc in sorted(header_cells, key=lambda c: c["col_index"]): + all_wb.extend(hc.get("word_boxes", [])) + if hc.get("text", "").strip(): + all_text_parts.append(hc["text"].strip()) + + first_col_idx = min(hc["col_index"] for hc in header_cells) + + # Remove old cells for this row, add spanning heading cell + z["cells"] = [c for c in z["cells"] if c.get("row_index") != hri] + + if all_wb: + x_min = min(wb["left"] for wb in all_wb) + y_min = min(wb["top"] for wb in all_wb) + x_max = max(wb["left"] + wb["width"] for wb in all_wb) + y_max = max(wb["top"] + wb["height"] for wb in all_wb) + else: + # Fallback to first cell bbox + bp = header_cells[0].get("bbox_px", {}) + x_min = bp.get("x", 0) + y_min = bp.get("y", 0) + x_max = x_min + bp.get("w", 0) + y_max = y_min + bp.get("h", 0) + + zone_idx = z.get("zone_index", 0) + z["cells"].append({ + "cell_id": f"Z{zone_idx}_R{hri:02d}_C{first_col_idx}", + "zone_index": zone_idx, + "row_index": hri, + "col_index": first_col_idx, + "col_type": "heading", + "text": " ".join(all_text_parts), + "confidence": 0.0, + "bbox_px": {"x": x_min, "y": y_min, + "w": x_max - x_min, "h": y_max - y_min}, + "bbox_pct": { + "x": round(x_min / img_w * 100, 2) if img_w else 0, + "y": round(y_min / img_h * 100, 2) if img_h else 0, + "w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0, + "h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0, + }, + "word_boxes": all_wb, + "ocr_engine": "words_first", + "is_bold": False, + }) + + for row in rows: + if row["index"] == hri: + row["is_header"] = True + heading_count += 1 + + return heading_count + 
def _detect_header_rows(
    rows: List[Dict],
    zone_words: List[Dict],
    zone_y: int,
    columns: Optional[List[Dict]] = None,
    skip_first_row_header: bool = False,
) -> List[int]:
    """Detect header rows using the first-row heuristic only.

    The first row is flagged as a header when it is separated from the
    second row by a large vertical gap, or when its words are markedly
    taller than the zone median (bold/header text).

    Note: ``zone_y`` and ``columns`` are currently unused — ``columns``
    was intended for spanning-header detection (rows stretching across
    multiple column boundaries), which is disabled; both parameters are
    kept for caller compatibility.

    Returns:
        List of header row indices (currently ``[]`` or ``[0]``).
    """
    if len(rows) < 2:
        return []

    headers = []

    if not skip_first_row_header:
        first_row = rows[0]
        second_row = rows[1]

        # Gap between first and second row > 0.5x average row height
        avg_h = sum(r["y_max"] - r["y_min"] for r in rows) / len(rows)
        gap = second_row["y_min"] - first_row["y_max"]
        if gap > avg_h * 0.5:
            headers.append(0)

        # Also check if first row words are taller than average (bold/header text)
        all_heights = [w["height"] for w in zone_words]
        median_h = sorted(all_heights)[len(all_heights) // 2] if all_heights else 20
        first_row_words = [
            w for w in zone_words
            if first_row["y_min"] <= w["top"] + w["height"] / 2 <= first_row["y_max"]
        ]
        if first_row_words:
            first_h = max(w["height"] for w in first_row_words)
            if first_h > median_h * 1.3:
                if 0 not in headers:
                    headers.append(0)

    # Note: Spanning-header detection (rows spanning all columns) has been
    # disabled because it produces too many false positives on vocabulary
    # worksheets where IPA transcriptions or short entries naturally span
    # multiple columns with few words. The first-row heuristic above is
    # sufficient for detecting real headers.

    return headers


def _build_zone_grid(
    zone_words: List[Dict],
    zone_x: int,
    zone_y: int,
    zone_w: int,
    zone_h: int,
    zone_index: int,
    img_w: int,
    img_h: int,
    global_columns: Optional[List[Dict]] = None,
    skip_first_row_header: bool = False,
) -> Dict[str, Any]:
    """Build columns, rows, cells for a single zone from its words.

    Args:
        global_columns: If provided, use these pre-computed column boundaries
            instead of detecting columns per zone. Used for content zones so
            that all content zones (above/between/below boxes) share the same
            column structure. Box zones always detect columns independently.

    Returns:
        Dict with ``columns``/``rows`` (output format, with px + pct
        coordinates), ``cells``, ``header_rows``, and — when the zone is
        non-empty — ``_raw_columns`` (internal column dicts, used by the
        caller to propagate this zone's columns to other zones).
    """
    if not zone_words:
        return {
            "columns": [],
            "rows": [],
            "cells": [],
            "header_rows": [],
        }

    # Cluster rows first (needed for column alignment analysis)
    rows = _cluster_rows(zone_words)

    # Diagnostic logging for small/medium zones (box zones typically have 40-60 words)
    if len(zone_words) <= 60:
        import statistics as _st
        _heights = [w['height'] for w in zone_words if w.get('height', 0) > 0]
        _med_h = _st.median(_heights) if _heights else 20
        _y_tol = max(_med_h * 0.5, 5)
        logger.info(
            "zone %d row-clustering: %d words, median_h=%.0f, y_tol=%.1f → %d rows",
            zone_index, len(zone_words), _med_h, _y_tol, len(rows),
        )
        for w in sorted(zone_words, key=lambda ww: (ww['top'], ww['left'])):
            logger.info(
                "  zone %d word: y=%d x=%d h=%d w=%d '%s'",
                zone_index, w['top'], w['left'], w['height'], w['width'],
                w.get('text', '')[:40],
            )
        for r in rows:
            logger.info(
                "  zone %d row %d: y_min=%d y_max=%d y_center=%.0f",
                zone_index, r['index'], r['y_min'], r['y_max'], r['y_center'],
            )

    # Use global columns if provided, otherwise detect per zone
    columns = global_columns if global_columns else _cluster_columns_by_alignment(zone_words, zone_w, rows)

    # Merge inline marker columns (bullets, numbering) into adjacent text
    if not global_columns:
        columns = _merge_inline_marker_columns(columns, zone_words)

    if not columns or not rows:
        return {
            "columns": [],
            "rows": [],
            "cells": [],
            "header_rows": [],
        }

    # Build cells
    cells = _build_cells(zone_words, columns, rows, img_w, img_h)

    # Prefix cell IDs with zone index
    for cell in cells:
        cell["cell_id"] = f"Z{zone_index}_{cell['cell_id']}"
        cell["zone_index"] = zone_index

    # Detect header rows (pass columns for spanning header detection)
    header_rows = _detect_header_rows(rows, zone_words, zone_y, columns,
                                      skip_first_row_header=skip_first_row_header)

    # Merge cells in spanning header rows into a single col-0 cell
    if header_rows and len(columns) >= 2:
        for hri in header_rows:
            header_cells = [c for c in cells if c["row_index"] == hri]
            if len(header_cells) <= 1:
                continue
            # Collect all word_boxes and text from all columns
            all_wb = []
            all_text_parts = []
            for hc in sorted(header_cells, key=lambda c: c["col_index"]):
                all_wb.extend(hc.get("word_boxes", []))
                if hc.get("text", "").strip():
                    all_text_parts.append(hc["text"].strip())
            # Remove all header cells, replace with one spanning cell
            cells = [c for c in cells if c["row_index"] != hri]
            if all_wb:
                x_min = min(wb["left"] for wb in all_wb)
                y_min = min(wb["top"] for wb in all_wb)
                x_max = max(wb["left"] + wb["width"] for wb in all_wb)
                y_max = max(wb["top"] + wb["height"] for wb in all_wb)
                cells.append({
                    "cell_id": f"R{hri:02d}_C0",
                    "row_index": hri,
                    "col_index": 0,
                    "col_type": "spanning_header",
                    "text": " ".join(all_text_parts),
                    "confidence": 0.0,
                    "bbox_px": {"x": x_min, "y": y_min,
                                "w": x_max - x_min, "h": y_max - y_min},
                    "bbox_pct": {
                        "x": round(x_min / img_w * 100, 2) if img_w else 0,
                        "y": round(y_min / img_h * 100, 2) if img_h else 0,
                        "w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0,
                        "h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0,
                    },
                    "word_boxes": all_wb,
                    "ocr_engine": "words_first",
                    "is_bold": True,
                })

    # Convert columns to output format with percentages
    out_columns = []
    for col in columns:
        x_min = col["x_min"]
        x_max = col["x_max"]
        out_columns.append({
            "index": col["index"],
            "label": col["type"],
            "x_min_px": round(x_min),
            "x_max_px": round(x_max),
            "x_min_pct": round(x_min / img_w * 100, 2) if img_w else 0,
            "x_max_pct": round(x_max / img_w * 100, 2) if img_w else 0,
            "bold": False,
        })

    # Convert rows to output format with percentages
    out_rows = []
    for row in rows:
        out_rows.append({
            "index": row["index"],
            "y_min_px": round(row["y_min"]),
            "y_max_px": round(row["y_max"]),
            "y_min_pct": round(row["y_min"] / img_h * 100, 2) if img_h else 0,
            "y_max_pct": round(row["y_max"] / img_h * 100, 2) if img_h else 0,
            "is_header": row["index"] in header_rows,
        })

    return {
        "columns": out_columns,
        "rows": out_rows,
        "cells": cells,
        "header_rows": header_rows,
        "_raw_columns": columns,  # internal: for propagation to other zones
    }


def _get_content_bounds(words: List[Dict]) -> tuple:
    """Get content bounds (x, y, width, height) from word positions.

    Returns ``(0, 0, 0, 0)`` for an empty word list.
    """
    if not words:
        return 0, 0, 0, 0
    x_min = min(w["left"] for w in words)
    y_min = min(w["top"] for w in words)
    x_max = max(w["left"] + w["width"] for w in words)
    y_max = max(w["top"] + w["height"] for w in words)
    return x_min, y_min, x_max - x_min, y_max - y_min


def _filter_decorative_margin(
    words: List[Dict],
    img_w: int,
    log: Any,
    session_id: str,
) -> Dict[str, Any]:
    """Remove words that belong to a decorative alphabet strip on a margin.

    Some vocabulary worksheets have a vertical A–Z alphabet graphic along
    the left or right edge. OCR reads each letter as an isolated short
    word. These decorative elements are not content and confuse
    column/row detection.

    Detection criteria (phase 1 — find the strip using short words):
    - Words are in the outer 30% of the page (left or right)
    - Words are short (≤2 chars — OCR often reads sidebar letters in
      pairs like "Aa", "Bb" rather than singles)
    - At least 6 such words, covering ≥6 distinct 20px-bucketed Y
      positions (a vertical strip)
    - Horizontal spread of the strip is small (≤ 80px)

    Phase 2 — once a strip is confirmed, also remove any short word (≤3
    chars) in the same narrow x-range. This catches multi-char OCR
    artifacts like "Vv" that belong to the same decorative element.

    Modifies *words* in place. Only the first confirmed strip (left
    checked before right) is removed per call.

    Returns:
        Dict with 'found' (bool), 'side' (str), 'letters_detected' (int).
    """
    no_strip: Dict[str, Any] = {"found": False, "side": "", "letters_detected": 0}
    if not words or img_w <= 0:
        return no_strip

    margin_cutoff = img_w * 0.30
    # Phase 1: find candidate strips using short words (1-2 chars).
    # OCR often reads alphabet sidebar letters as pairs ("Aa", "Bb")
    # rather than singles, so accept ≤2-char words as strip candidates.
    left_strip = [
        w for w in words
        if len((w.get("text") or "").strip()) <= 2
        and w["left"] + w.get("width", 0) / 2 < margin_cutoff
    ]
    right_strip = [
        w for w in words
        if len((w.get("text") or "").strip()) <= 2
        and w["left"] + w.get("width", 0) / 2 > img_w - margin_cutoff
    ]

    for strip, side in [(left_strip, "left"), (right_strip, "right")]:
        if len(strip) < 6:
            continue
        # Check vertical distribution: should have many distinct Y positions
        y_centers = sorted(set(
            int(w["top"] + w.get("height", 0) / 2) // 20 * 20  # bucket
            for w in strip
        ))
        if len(y_centers) < 6:
            continue
        # Check horizontal compactness
        x_positions = [w["left"] for w in strip]
        x_min = min(x_positions)
        x_max = max(x_positions)
        x_spread = x_max - x_min
        if x_spread > 80:
            continue

        # Phase 2: strip confirmed — also collect short words in same x-range
        # Expand x-range slightly to catch neighbors (e.g. "Vv" next to "U")
        strip_x_lo = x_min - 20
        strip_x_hi = x_max + 60  # word width + tolerance
        all_strip_words = [
            w for w in words
            if len((w.get("text") or "").strip()) <= 3
            and strip_x_lo <= w["left"] <= strip_x_hi
            and (w["left"] + w.get("width", 0) / 2 < margin_cutoff
                 if side == "left"
                 else w["left"] + w.get("width", 0) / 2 > img_w - margin_cutoff)
        ]

        strip_set = set(id(w) for w in all_strip_words)
        before = len(words)
        words[:] = [w for w in words if id(w) not in strip_set]
        removed = before - len(words)
        if removed:
            log.info(
                "build-grid session %s: removed %d decorative %s-margin words "
                "(strip x=%d-%d)",
                session_id, removed, side, strip_x_lo, strip_x_hi,
            )
        return {"found": True, "side": side, "letters_detected": len(strip)}

    return no_strip


def _filter_footer_words(
    words: List[Dict],
    img_h: int,
    log: Any,
    session_id: str,
) -> None:
    """Remove isolated words in the bottom 5% of the page (page numbers).

    Only removes the footer when it contains at most 3 words totalling
    ≤10 characters — larger footers are assumed to be real content.

    Modifies *words* in place.
    """
    if not words or img_h <= 0:
        return
    footer_y = img_h * 0.95
    footer_words = [
        w for w in words
        if w["top"] + w.get("height", 0) / 2 > footer_y
    ]
    if not footer_words:
        return
    # Only remove if footer has very few words (≤ 3) with short text
    total_text = "".join((w.get("text") or "").strip() for w in footer_words)
    if len(footer_words) <= 3 and len(total_text) <= 10:
        footer_set = set(id(w) for w in footer_words)
        words[:] = [w for w in words if id(w) not in footer_set]
        log.info(
            "build-grid session %s: removed %d footer words ('%s')",
            session_id, len(footer_words), total_text,
        )


def _filter_header_junk(
    words: List[Dict],
    img_h: int,
    log: Any,
    session_id: str,
) -> None:
    """Remove OCR junk from header illustrations above the real content.

    Textbook pages often have decorative header graphics (illustrations,
    icons) that OCR reads as low-confidence junk characters. Real content
    typically starts further down the page.

    Algorithm:
    1. Find the "content start" — the first Y position where a dense
       horizontal row of 3+ high-confidence (conf ≥ 80) words begins.
    2. Above that line, remove words with conf < 75 and text ≤ 3 chars.
       These are almost certainly OCR artifacts from illustrations.

    Modifies *words* in place.
    """
    if not words or img_h <= 0:
        return

    # --- Find content start: first horizontal row with ≥3 high-conf words ---
    # Sort words by Y
    sorted_by_y = sorted(words, key=lambda w: w["top"])
    content_start_y = 0
    _ROW_TOLERANCE = img_h * 0.02  # words within 2% of page height = same row
    _MIN_ROW_WORDS = 3
    _MIN_CONF = 80

    i = 0
    while i < len(sorted_by_y):
        row_y = sorted_by_y[i]["top"]
        # Collect words in this row band
        row_words = []
        j = i
        while j < len(sorted_by_y) and sorted_by_y[j]["top"] - row_y < _ROW_TOLERANCE:
            row_words.append(sorted_by_y[j])
            j += 1
        # Count high-confidence words with real text (> 1 char)
        high_conf = [
            w for w in row_words
            if w.get("conf", 0) >= _MIN_CONF
            and len((w.get("text") or "").strip()) > 1
        ]
        if len(high_conf) >= _MIN_ROW_WORDS:
            content_start_y = row_y
            break
        i = j if j > i else i + 1

    if content_start_y <= 0:
        return  # no clear content start found

    # --- Remove low-conf short junk above content start ---
    junk = [
        w for w in words
        if w["top"] + w.get("height", 0) < content_start_y
        and w.get("conf", 0) < 75
        and len((w.get("text") or "").strip()) <= 3
    ]
    if not junk:
        return

    junk_set = set(id(w) for w in junk)
    before = len(words)
    words[:] = [w for w in words if id(w) not in junk_set]
    removed = before - len(words)
    if removed:
        log.info(
            "build-grid session %s: removed %d header junk words above y=%d "
            "(content start)",
            session_id, removed, content_start_y,
        )