""" Grid Editor helper functions — filters, detectors, and zone grid building. Extracted from grid_editor_api.py for maintainability. All functions are pure computation — no HTTP, DB, or session side effects. Lizenz: Apache 2.0 (kommerziell nutzbar) DATENSCHUTZ: Alle Verarbeitung erfolgt lokal. """ import logging import re from typing import Any, Dict, List, Optional, Tuple import cv2 import numpy as np from cv_vocab_types import PageZone from cv_words_first import _cluster_rows, _build_cells from cv_ocr_engines import _text_has_garbled_ipa logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Cross-column word splitting # --------------------------------------------------------------------------- _spell_cache: Optional[Any] = None _spell_loaded = False def _is_recognized_word(text: str) -> bool: """Check if *text* is a recognized German or English word. Uses the spellchecker library (same as cv_syllable_detect.py). Returns True for real words like "oder", "Kabel", "Zeitung". Returns False for OCR merge artifacts like "sichzie", "dasZimmer". """ global _spell_cache, _spell_loaded if not text or len(text) < 2: return False if not _spell_loaded: _spell_loaded = True try: from spellchecker import SpellChecker _spell_cache = SpellChecker(language="de") except Exception: pass if _spell_cache is None: return False return text.lower() in _spell_cache def _split_cross_column_words( words: List[Dict], columns: List[Dict], ) -> List[Dict]: """Split word boxes that span across column boundaries. When OCR merges adjacent words from different columns (e.g. "sichzie" spanning Col 1 and Col 2, or "dasZimmer" crossing the boundary), split the word box at the column boundary so each piece is assigned to the correct column. 
Only splits when: - The word has significant overlap (>15% of its width) on both sides - AND the word is not a recognized real word (OCR merge artifact), OR the word contains a case transition (lowercase→uppercase) near the boundary indicating two merged words like "dasZimmer". """ if len(columns) < 2: return words # Column boundaries = midpoints between adjacent column edges boundaries = [] for i in range(len(columns) - 1): boundary = (columns[i]["x_max"] + columns[i + 1]["x_min"]) / 2 boundaries.append(boundary) new_words: List[Dict] = [] split_count = 0 for w in words: w_left = w["left"] w_width = w["width"] w_right = w_left + w_width text = (w.get("text") or "").strip() if not text or len(text) < 4 or w_width < 10: new_words.append(w) continue # Find the first boundary this word straddles significantly split_boundary = None for b in boundaries: if w_left < b < w_right: left_part = b - w_left right_part = w_right - b # Both sides must have at least 15% of the word width if left_part > w_width * 0.15 and right_part > w_width * 0.15: split_boundary = b break if split_boundary is None: new_words.append(w) continue # Compute approximate split position in the text. left_width = split_boundary - w_left split_ratio = left_width / w_width approx_pos = len(text) * split_ratio # Strategy 1: look for a case transition (lowercase→uppercase) near # the approximate split point — e.g. "dasZimmer" splits at 'Z'. split_char = None search_lo = max(1, int(approx_pos) - 3) search_hi = min(len(text), int(approx_pos) + 2) for i in range(search_lo, search_hi): if text[i - 1].islower() and text[i].isupper(): split_char = i break # Strategy 2: if no case transition, only split if the whole word # is NOT a real word (i.e. it's an OCR merge artifact like "sichzie"). # Real words like "oder", "Kabel", "Zeitung" must not be split. 
if split_char is None: clean = re.sub(r"[,;:.!?]+$", "", text) # strip trailing punct if _is_recognized_word(clean): new_words.append(w) continue # Not a real word — use floor of proportional position split_char = max(1, min(len(text) - 1, int(approx_pos))) left_text = text[:split_char].rstrip() right_text = text[split_char:].lstrip() if len(left_text) < 2 or len(right_text) < 2: new_words.append(w) continue right_width = w_width - round(left_width) new_words.append({ **w, "text": left_text, "width": round(left_width), }) new_words.append({ **w, "text": right_text, "left": round(split_boundary), "width": right_width, }) split_count += 1 logger.info( "split cross-column word %r → %r + %r at boundary %.0f", text, left_text, right_text, split_boundary, ) if split_count: logger.info("split %d cross-column word(s)", split_count) return new_words def _filter_border_strip_words(words: List[Dict]) -> Tuple[List[Dict], int]: """Remove page-border decoration strip words BEFORE column detection. Scans from each page edge inward to find the first significant x-gap (>30 px). If the edge cluster contains <15 % of total words, those words are removed as border-strip artifacts (alphabet letters, illustration fragments). Must run BEFORE ``_build_zone_grid`` so that column detection only sees real content words and doesn't produce inflated row counts. 
""" if len(words) < 10: return words, 0 sorted_words = sorted(words, key=lambda w: w.get("left", 0)) total = len(sorted_words) # -- Left-edge scan (running max right-edge) -- left_count = 0 running_right = 0 for gi in range(total - 1): running_right = max( running_right, sorted_words[gi].get("left", 0) + sorted_words[gi].get("width", 0), ) if sorted_words[gi + 1].get("left", 0) - running_right > 30: left_count = gi + 1 break # -- Right-edge scan (running min left) -- right_count = 0 running_left = sorted_words[-1].get("left", 0) for gi in range(total - 1, 0, -1): running_left = min(running_left, sorted_words[gi].get("left", 0)) prev_right = ( sorted_words[gi - 1].get("left", 0) + sorted_words[gi - 1].get("width", 0) ) if running_left - prev_right > 30: right_count = total - gi break # Validate candidate strip: real border decorations are mostly short # words (alphabet letters like "A", "Bb", stray marks). Multi-word # content like "der Ranzen" or "die Schals" (continuation of German # translations) must NOT be removed. def _is_decorative_strip(candidates: List[Dict]) -> bool: if not candidates: return False short = sum(1 for w in candidates if len((w.get("text") or "").strip()) <= 2) return short / len(candidates) >= 0.45 strip_ids: set = set() if left_count > 0 and left_count / total < 0.20: candidates = sorted_words[:left_count] if _is_decorative_strip(candidates): strip_ids = {id(w) for w in candidates} elif right_count > 0 and right_count / total < 0.20: candidates = sorted_words[total - right_count:] if _is_decorative_strip(candidates): strip_ids = {id(w) for w in candidates} if not strip_ids: return words, 0 return [w for w in words if id(w) not in strip_ids], len(strip_ids) def _cluster_columns_by_alignment( words: List[Dict], zone_w: int, rows: List[Dict], ) -> List[Dict[str, Any]]: """Detect columns by clustering left-edge alignment across rows. Hybrid approach: 1. 
Group words by row, find "group start" positions within each row (words preceded by a large gap or first word in row) 2. Cluster group-start left-edges by X-proximity across rows 3. Filter by row coverage (how many rows have a group start here) 4. Merge nearby clusters 5. Build column boundaries This filters out mid-phrase word positions (e.g. IPA transcriptions, second words in multi-word entries) by only considering positions where a new word group begins within a row. """ if not words or not rows: return [] total_rows = len(rows) if total_rows == 0: return [] # --- Group words by row --- row_words: Dict[int, List[Dict]] = {} for w in words: y_center = w["top"] + w["height"] / 2 best = min(rows, key=lambda r: abs(r["y_center"] - y_center)) row_words.setdefault(best["index"], []).append(w) # --- Compute adaptive gap threshold for group-start detection --- all_gaps: List[float] = [] for ri, rw_list in row_words.items(): sorted_rw = sorted(rw_list, key=lambda w: w["left"]) for i in range(len(sorted_rw) - 1): right = sorted_rw[i]["left"] + sorted_rw[i]["width"] gap = sorted_rw[i + 1]["left"] - right if gap > 0: all_gaps.append(gap) if all_gaps: sorted_gaps = sorted(all_gaps) median_gap = sorted_gaps[len(sorted_gaps) // 2] heights = [w["height"] for w in words if w.get("height", 0) > 0] median_h = sorted(heights)[len(heights) // 2] if heights else 25 # Column boundary: gap > 3× median gap or > 1.5× median word height gap_threshold = max(median_gap * 3, median_h * 1.5, 30) else: gap_threshold = 50 # --- Find group-start positions (left-edges that begin a new column) --- start_positions: List[tuple] = [] # (left_edge, row_index) for ri, rw_list in row_words.items(): sorted_rw = sorted(rw_list, key=lambda w: w["left"]) # First word in row is always a group start start_positions.append((sorted_rw[0]["left"], ri)) for i in range(1, len(sorted_rw)): right_prev = sorted_rw[i - 1]["left"] + sorted_rw[i - 1]["width"] gap = sorted_rw[i]["left"] - right_prev if gap >= 
gap_threshold: start_positions.append((sorted_rw[i]["left"], ri)) start_positions.sort(key=lambda x: x[0]) logger.info( "alignment columns: %d group-start positions from %d words " "(gap_threshold=%.0f, %d rows)", len(start_positions), len(words), gap_threshold, total_rows, ) if not start_positions: x_min = min(w["left"] for w in words) x_max = max(w["left"] + w["width"] for w in words) return [{"index": 0, "type": "column_text", "x_min": x_min, "x_max": x_max}] # --- Cluster group-start positions by X-proximity --- tolerance = max(10, int(zone_w * 0.01)) clusters: List[Dict[str, Any]] = [] cur_edges = [start_positions[0][0]] cur_rows = {start_positions[0][1]} for left, row_idx in start_positions[1:]: if left - cur_edges[-1] <= tolerance: cur_edges.append(left) cur_rows.add(row_idx) else: clusters.append({ "mean_x": int(sum(cur_edges) / len(cur_edges)), "min_edge": min(cur_edges), "max_edge": max(cur_edges), "count": len(cur_edges), "distinct_rows": len(cur_rows), "row_coverage": len(cur_rows) / total_rows, }) cur_edges = [left] cur_rows = {row_idx} clusters.append({ "mean_x": int(sum(cur_edges) / len(cur_edges)), "min_edge": min(cur_edges), "max_edge": max(cur_edges), "count": len(cur_edges), "distinct_rows": len(cur_rows), "row_coverage": len(cur_rows) / total_rows, }) # --- Filter by row coverage --- # These thresholds must be high enough to avoid false columns in flowing # text (random inter-word gaps) while still detecting real columns in # vocabulary worksheets (which typically have >80% row coverage). 
MIN_COVERAGE_PRIMARY = 0.35 MIN_COVERAGE_SECONDARY = 0.12 MIN_WORDS_SECONDARY = 4 MIN_DISTINCT_ROWS = 3 # Content boundary for left-margin detection content_x_min = min(w["left"] for w in words) content_x_max = max(w["left"] + w["width"] for w in words) content_span = content_x_max - content_x_min primary = [ c for c in clusters if c["row_coverage"] >= MIN_COVERAGE_PRIMARY and c["distinct_rows"] >= MIN_DISTINCT_ROWS ] primary_ids = {id(c) for c in primary} secondary = [ c for c in clusters if id(c) not in primary_ids and c["row_coverage"] >= MIN_COVERAGE_SECONDARY and c["count"] >= MIN_WORDS_SECONDARY and c["distinct_rows"] >= MIN_DISTINCT_ROWS ] # Tertiary: narrow left-margin columns (page refs, markers) that have # too few rows for secondary but are clearly left-aligned and separated # from the main content. These appear at the far left or far right and # have a large gap to the nearest significant cluster. used_ids = {id(c) for c in primary} | {id(c) for c in secondary} sig_xs = [c["mean_x"] for c in primary + secondary] MIN_DISTINCT_ROWS_TERTIARY = max(MIN_DISTINCT_ROWS + 1, 4) MIN_COVERAGE_TERTIARY = 0.05 # at least 5% of rows tertiary = [] for c in clusters: if id(c) in used_ids: continue if c["distinct_rows"] < MIN_DISTINCT_ROWS_TERTIARY: continue if c["row_coverage"] < MIN_COVERAGE_TERTIARY: continue # Must be near left or right content margin (within 15%) rel_pos = (c["mean_x"] - content_x_min) / content_span if content_span else 0.5 if not (rel_pos < 0.15 or rel_pos > 0.85): continue # Must have significant gap to nearest significant cluster if sig_xs: min_dist = min(abs(c["mean_x"] - sx) for sx in sig_xs) if min_dist < max(30, content_span * 0.02): continue tertiary.append(c) if tertiary: for c in tertiary: logger.info( " tertiary (margin) cluster: x=%d (range %d-%d), %d words, %d rows (%.0f%%)", c["mean_x"], c["min_edge"], c["max_edge"], c["count"], c["distinct_rows"], c["row_coverage"] * 100, ) significant = sorted(primary + secondary + tertiary, 
key=lambda c: c["mean_x"]) for c in significant: logger.info( " significant cluster: x=%d (range %d-%d), %d words, %d rows (%.0f%%)", c["mean_x"], c["min_edge"], c["max_edge"], c["count"], c["distinct_rows"], c["row_coverage"] * 100, ) logger.info( "alignment columns: %d clusters, %d primary, %d secondary → %d significant", len(clusters), len(primary), len(secondary), len(significant), ) if not significant: # Fallback: single column covering all content x_min = min(w["left"] for w in words) x_max = max(w["left"] + w["width"] for w in words) return [{"index": 0, "type": "column_text", "x_min": x_min, "x_max": x_max}] # --- Merge nearby clusters --- merge_distance = max(25, int(zone_w * 0.03)) merged = [significant[0].copy()] for s in significant[1:]: if s["mean_x"] - merged[-1]["mean_x"] < merge_distance: prev = merged[-1] total = prev["count"] + s["count"] prev["mean_x"] = ( prev["mean_x"] * prev["count"] + s["mean_x"] * s["count"] ) // total prev["count"] = total prev["min_edge"] = min(prev["min_edge"], s["min_edge"]) prev["max_edge"] = max(prev["max_edge"], s["max_edge"]) prev["distinct_rows"] = max(prev["distinct_rows"], s["distinct_rows"]) else: merged.append(s.copy()) logger.info( "alignment columns: %d after merge (distance=%d)", len(merged), merge_distance, ) # --- Build column boundaries --- margin = max(5, int(zone_w * 0.005)) content_x_min = min(w["left"] for w in words) content_x_max = max(w["left"] + w["width"] for w in words) columns: List[Dict[str, Any]] = [] for i, cluster in enumerate(merged): x_min = max(content_x_min, cluster["min_edge"] - margin) if i + 1 < len(merged): x_max = merged[i + 1]["min_edge"] - margin else: x_max = content_x_max columns.append({ "index": i, "type": f"column_{i + 1}" if len(merged) > 1 else "column_text", "x_min": x_min, "x_max": x_max, }) return columns # Characters that are typically OCR artefacts from box border lines. # Intentionally excludes ! (red markers) and . , ; (real punctuation). 
_GRID_GHOST_CHARS = set("|1lI[](){}/\\-—–_~=+") def _filter_border_ghosts( words: List[Dict], boxes: List, ) -> tuple: """Remove words sitting on box borders that are OCR artefacts. Returns (filtered_words, removed_count). """ if not boxes or not words: return words, 0 # Build border bands from detected boxes x_bands: List[tuple] = [] y_bands: List[tuple] = [] for b in boxes: bt = ( b.border_thickness if hasattr(b, "border_thickness") else b.get("border_thickness", 3) ) # Skip borderless boxes (images/graphics) — no border line to produce ghosts if bt == 0: continue bx = b.x if hasattr(b, "x") else b.get("x", 0) by = b.y if hasattr(b, "y") else b.get("y", 0) bw = b.width if hasattr(b, "width") else b.get("w", b.get("width", 0)) bh = b.height if hasattr(b, "height") else b.get("h", b.get("height", 0)) margin = max(bt * 2, 10) + 6 x_bands.append((bx - margin, bx + margin)) x_bands.append((bx + bw - margin, bx + bw + margin)) y_bands.append((by - margin, by + margin)) y_bands.append((by + bh - margin, by + bh + margin)) def _is_ghost(w: Dict) -> bool: text = (w.get("text") or "").strip() if not text: return False # Check if any word edge (not just center) touches a border band w_left = w["left"] w_right = w["left"] + w["width"] w_top = w["top"] w_bottom = w["top"] + w["height"] on_border = ( any(lo <= w_left <= hi or lo <= w_right <= hi for lo, hi in x_bands) or any(lo <= w_top <= hi or lo <= w_bottom <= hi for lo, hi in y_bands) ) if not on_border: return False if len(text) == 1 and text in _GRID_GHOST_CHARS: return True return False filtered = [w for w in words if not _is_ghost(w)] return filtered, len(words) - len(filtered) _MARKER_CHARS = set("•*·-–—|~=+#>→►▸▪◆○●□■✓✗✔✘") def _merge_inline_marker_columns( columns: List[Dict], words: List[Dict], ) -> List[Dict]: """Merge narrow marker columns (bullets, numbering) into adjacent text. Bullet points (•, *, -) and numbering (1., 2.) create narrow columns at the left edge of a zone. 
These are inline markers that indent text, not real separate columns. Merge them with their right neighbour. Does NOT merge columns containing alphabetic words like "to", "in", "der", "die", "das" — those are legitimate content columns. """ if len(columns) < 2: return columns merged: List[Dict] = [] skip: set = set() for i, col in enumerate(columns): if i in skip: continue # Find words in this column col_words = [ w for w in words if col["x_min"] <= w["left"] + w["width"] / 2 < col["x_max"] ] col_width = col["x_max"] - col["x_min"] # Narrow column with mostly short words → MIGHT be inline markers if col_words and col_width < 80: avg_len = sum(len(w.get("text", "")) for w in col_words) / len(col_words) if avg_len <= 2 and i + 1 < len(columns): # Check if words are actual markers (symbols/numbers) vs # real alphabetic words like "to", "in", "der", "die" texts = [(w.get("text") or "").strip() for w in col_words] alpha_count = sum( 1 for t in texts if t and t[0].isalpha() and t not in _MARKER_CHARS ) alpha_ratio = alpha_count / len(texts) if texts else 0 # If ≥50% of words are alphabetic, this is a real column if alpha_ratio >= 0.5: logger.info( " kept narrow column %d (w=%d, avg_len=%.1f, " "alpha=%.0f%%) — contains real words", i, col_width, avg_len, alpha_ratio * 100, ) else: # Merge into next column next_col = columns[i + 1].copy() next_col["x_min"] = col["x_min"] merged.append(next_col) skip.add(i + 1) logger.info( " merged inline marker column %d (w=%d, avg_len=%.1f) " "into column %d", i, col_width, avg_len, i + 1, ) continue merged.append(col) # Re-index for i, col in enumerate(merged): col["index"] = i col["type"] = f"column_{i + 1}" if len(merged) > 1 else "column_text" return merged def _flatten_word_boxes(cells: List[Dict]) -> List[Dict]: """Extract all word_boxes from cells into a flat list of word dicts.""" words: List[Dict] = [] for cell in cells: for wb in cell.get("word_boxes") or []: if wb.get("text", "").strip(): words.append({ "text": wb["text"], 
"left": wb["left"], "top": wb["top"], "width": wb["width"], "height": wb["height"], "conf": wb.get("conf", 0), }) return words def _words_in_zone( words: List[Dict], zone_y: int, zone_h: int, zone_x: int, zone_w: int, ) -> List[Dict]: """Filter words whose Y-center falls within a zone's bounds.""" zone_y_end = zone_y + zone_h zone_x_end = zone_x + zone_w result = [] for w in words: cy = w["top"] + w["height"] / 2 cx = w["left"] + w["width"] / 2 if zone_y <= cy <= zone_y_end and zone_x <= cx <= zone_x_end: result.append(w) return result # --------------------------------------------------------------------------- # Vertical divider detection and zone splitting # --------------------------------------------------------------------------- _PIPE_RE_VSPLIT = re.compile(r"^\|+$") def _detect_vertical_dividers( words: List[Dict], zone_x: int, zone_w: int, zone_y: int, zone_h: int, ) -> List[float]: """Detect vertical divider lines from pipe word_boxes at consistent x. Returns list of divider x-positions (empty if no dividers found). 
""" if not words or zone_w <= 0 or zone_h <= 0: return [] # Collect pipe word_boxes pipes = [ w for w in words if _PIPE_RE_VSPLIT.match((w.get("text") or "").strip()) ] if len(pipes) < 5: return [] # Cluster pipe x-centers by proximity tolerance = max(15, int(zone_w * 0.02)) pipe_xs = sorted(w["left"] + w["width"] / 2 for w in pipes) clusters: List[List[float]] = [[pipe_xs[0]]] for x in pipe_xs[1:]: if x - clusters[-1][-1] <= tolerance: clusters[-1].append(x) else: clusters.append([x]) dividers: List[float] = [] for cluster in clusters: if len(cluster) < 5: continue mean_x = sum(cluster) / len(cluster) # Must be between 15% and 85% of zone width rel_pos = (mean_x - zone_x) / zone_w if rel_pos < 0.15 or rel_pos > 0.85: continue # Check vertical coverage: pipes must span >= 50% of zone height cluster_pipes = [ w for w in pipes if abs(w["left"] + w["width"] / 2 - mean_x) <= tolerance ] ys = [w["top"] for w in cluster_pipes] + [w["top"] + w["height"] for w in cluster_pipes] y_span = max(ys) - min(ys) if ys else 0 if y_span < zone_h * 0.5: continue dividers.append(mean_x) return sorted(dividers) def _split_zone_at_vertical_dividers( zone: "PageZone", divider_xs: List[float], vsplit_group_id: int, ) -> List["PageZone"]: """Split a PageZone at vertical divider positions into sub-zones.""" from cv_vocab_types import PageZone boundaries = [zone.x] + divider_xs + [zone.x + zone.width] hints = [] for i in range(len(boundaries) - 1): if i == 0: hints.append("left_of_vsplit") elif i == len(boundaries) - 2: hints.append("right_of_vsplit") else: hints.append("middle_of_vsplit") sub_zones = [] for i in range(len(boundaries) - 1): x_start = int(boundaries[i]) x_end = int(boundaries[i + 1]) sub = PageZone( index=0, # re-indexed later zone_type=zone.zone_type, y=zone.y, height=zone.height, x=x_start, width=x_end - x_start, box=zone.box, image_overlays=zone.image_overlays, layout_hint=hints[i], vsplit_group=vsplit_group_id, ) sub_zones.append(sub) return sub_zones def 
_merge_content_zones_across_boxes( zones: List, content_x: int, content_w: int, ) -> List: """Merge content zones separated by box zones into single zones. Box zones become image_overlays on the merged content zone. Pattern: [content, box*, content] → [merged_content with overlay] Box zones NOT between two content zones stay as standalone zones. """ if len(zones) < 3: return zones # Group consecutive runs of [content, box+, content] result: List = [] i = 0 while i < len(zones): z = zones[i] if z.zone_type != "content": result.append(z) i += 1 continue # Start of a potential merge group: content zone group_contents = [z] group_boxes = [] j = i + 1 # Absorb [box, content] pairs — only absorb a box if it's # confirmed to be followed by another content zone. while j < len(zones): if (zones[j].zone_type == "box" and j + 1 < len(zones) and zones[j + 1].zone_type == "content"): group_boxes.append(zones[j]) group_contents.append(zones[j + 1]) j += 2 else: break if len(group_contents) >= 2 and group_boxes: # Merge: create one large content zone spanning all y_min = min(c.y for c in group_contents) y_max = max(c.y + c.height for c in group_contents) overlays = [] for bz in group_boxes: overlay = { "y": bz.y, "height": bz.height, "x": bz.x, "width": bz.width, } if bz.box: overlay["box"] = { "x": bz.box.x, "y": bz.box.y, "width": bz.box.width, "height": bz.box.height, "confidence": bz.box.confidence, "border_thickness": bz.box.border_thickness, } overlays.append(overlay) merged = PageZone( index=0, # re-indexed below zone_type="content", y=y_min, height=y_max - y_min, x=content_x, width=content_w, image_overlays=overlays, ) result.append(merged) i = j else: # No merge possible — emit just the content zone result.append(z) i += 1 # Re-index zones for idx, z in enumerate(result): z.index = idx logger.info( "zone-merge: %d zones → %d zones after merging across boxes", len(zones), len(result), ) return result def _detect_heading_rows_by_color(zones_data: List[Dict], img_w: int, 
img_h: int) -> int: """Detect heading rows by color + height after color annotation. A row is a heading if: 1. ALL word_boxes have color_name != 'black' (typically 'blue') 2. Mean word height > 1.2x median height of all words in the zone Detected heading rows are merged into a single spanning cell. Returns count of headings detected. """ heading_count = 0 for z in zones_data: cells = z.get("cells", []) rows = z.get("rows", []) columns = z.get("columns", []) if not cells or not rows or len(columns) < 2: continue # Compute median word height across the zone all_heights = [] for cell in cells: for wb in cell.get("word_boxes") or []: h = wb.get("height", 0) if h > 0: all_heights.append(h) if not all_heights: continue all_heights_sorted = sorted(all_heights) median_h = all_heights_sorted[len(all_heights_sorted) // 2] heading_row_indices = [] for row in rows: if row.get("is_header"): continue # already detected as header ri = row["index"] row_cells = [c for c in cells if c.get("row_index") == ri] row_wbs = [ wb for cell in row_cells for wb in cell.get("word_boxes") or [] ] if not row_wbs: continue # Condition 1: ALL words are non-black all_colored = all( wb.get("color_name", "black") != "black" for wb in row_wbs ) if not all_colored: continue # Condition 2: mean height > 1.2x median mean_h = sum(wb.get("height", 0) for wb in row_wbs) / len(row_wbs) if mean_h <= median_h * 1.2: continue heading_row_indices.append(ri) # Merge heading cells into spanning cells for hri in heading_row_indices: header_cells = [c for c in cells if c.get("row_index") == hri] if len(header_cells) <= 1: # Single cell — just mark it as heading if header_cells: header_cells[0]["col_type"] = "heading" heading_count += 1 # Mark row as header for row in rows: if row["index"] == hri: row["is_header"] = True continue # Collect all word_boxes and text from all columns all_wb = [] all_text_parts = [] for hc in sorted(header_cells, key=lambda c: c["col_index"]): all_wb.extend(hc.get("word_boxes", [])) if 
hc.get("text", "").strip(): all_text_parts.append(hc["text"].strip()) # Remove all cells for this row, replace with one spanning cell z["cells"] = [c for c in z["cells"] if c.get("row_index") != hri] if all_wb: x_min = min(wb["left"] for wb in all_wb) y_min = min(wb["top"] for wb in all_wb) x_max = max(wb["left"] + wb["width"] for wb in all_wb) y_max = max(wb["top"] + wb["height"] for wb in all_wb) # Use the actual starting col_index from the first cell first_col = min(hc["col_index"] for hc in header_cells) zone_idx = z.get("zone_index", 0) z["cells"].append({ "cell_id": f"Z{zone_idx}_R{hri:02d}_C{first_col}", "zone_index": zone_idx, "row_index": hri, "col_index": first_col, "col_type": "heading", "text": " ".join(all_text_parts), "confidence": 0.0, "bbox_px": {"x": x_min, "y": y_min, "w": x_max - x_min, "h": y_max - y_min}, "bbox_pct": { "x": round(x_min / img_w * 100, 2) if img_w else 0, "y": round(y_min / img_h * 100, 2) if img_h else 0, "w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0, "h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0, }, "word_boxes": all_wb, "ocr_engine": "words_first", "is_bold": True, }) # Mark row as header for row in rows: if row["index"] == hri: row["is_header"] = True heading_count += 1 return heading_count def _detect_heading_rows_by_single_cell( zones_data: List[Dict], img_w: int, img_h: int, ) -> int: """Detect heading rows that have only a single content cell. Black headings like "Theme" have normal color and height, so they are missed by ``_detect_heading_rows_by_color``. The distinguishing signal is that they occupy only one column while normal vocabulary rows fill at least 2-3 columns. A row qualifies as a heading if: 1. It is not already marked as a header/heading. 2. It has exactly ONE cell whose col_type starts with ``column_`` (excluding column_1 / page_ref which only carries page numbers). 3. That single cell is NOT in the last column (continuation/example lines like "2. 
Veränderung, Wechsel" often sit alone in column_4). 4. The text does not start with ``[`` (IPA continuation). 5. The zone has ≥3 columns and ≥5 rows (avoids false positives in tiny zones). 6. The majority of rows in the zone have ≥2 content cells (ensures we are in a multi-column vocab layout). """ heading_count = 0 for z in zones_data: cells = z.get("cells", []) rows = z.get("rows", []) columns = z.get("columns", []) if len(columns) < 3 or len(rows) < 5: continue # Determine the last col_index (example/sentence column) col_indices = sorted(set(c.get("col_index", 0) for c in cells)) if not col_indices: continue last_col = col_indices[-1] # Count content cells per row (column_* but not column_1/page_ref). # Exception: column_1 cells that contain a dictionary article word # (die/der/das etc.) ARE content — they appear in dictionary layouts # where the leftmost column holds grammatical articles. _ARTICLE_WORDS = { "die", "der", "das", "dem", "den", "des", "ein", "eine", "the", "a", "an", } row_content_counts: Dict[int, int] = {} for cell in cells: ct = cell.get("col_type", "") if not ct.startswith("column_"): continue if ct == "column_1": ctext = (cell.get("text") or "").strip().lower() if ctext not in _ARTICLE_WORDS: continue ri = cell.get("row_index", -1) row_content_counts[ri] = row_content_counts.get(ri, 0) + 1 # Majority of rows must have ≥2 content cells multi_col_rows = sum(1 for cnt in row_content_counts.values() if cnt >= 2) if multi_col_rows < len(rows) * 0.4: continue # Exclude first and last non-header rows — these are typically # page numbers or footer text, not headings. 
non_header_rows = [r for r in rows if not r.get("is_header")] if len(non_header_rows) < 3: continue first_ri = non_header_rows[0]["index"] last_ri = non_header_rows[-1]["index"] heading_row_indices = [] for row in rows: if row.get("is_header"): continue ri = row["index"] if ri == first_ri or ri == last_ri: continue row_cells = [c for c in cells if c.get("row_index") == ri] content_cells = [ c for c in row_cells if c.get("col_type", "").startswith("column_") and (c.get("col_type") != "column_1" or (c.get("text") or "").strip().lower() in _ARTICLE_WORDS) ] if len(content_cells) != 1: continue cell = content_cells[0] # Not in the last column (continuation/example lines) if cell.get("col_index") == last_col: continue text = (cell.get("text") or "").strip() if not text or text.startswith("["): continue # Skip garbled IPA without brackets (e.g. "ska:f – ska:vz") # but NOT text with real IPA symbols (e.g. "Theme [θˈiːm]") _REAL_IPA_CHARS = set("ˈˌəɪɛɒʊʌæɑɔʃʒθðŋ") if _text_has_garbled_ipa(text) and not any(c in _REAL_IPA_CHARS for c in text): continue # Guard: dictionary section headings are short (1-4 alpha chars # like "A", "Ab", "Zi", "Sch"). Longer text that starts # lowercase is a regular vocabulary word (e.g. "zentral") that # happens to appear alone in its row. alpha_only = re.sub(r'[^a-zA-ZäöüÄÖÜßẞ]', '', text) if len(alpha_only) > 4 and text[0].islower(): continue heading_row_indices.append(ri) # Guard: if >25% of eligible rows would become headings, the # heuristic is misfiring (e.g. sparse single-column layout where # most rows naturally have only 1 content cell). 
eligible_rows = len(non_header_rows) - 2 # minus first/last excluded if eligible_rows > 0 and len(heading_row_indices) > eligible_rows * 0.25: logger.debug( "Skipping single-cell heading detection for zone %s: " "%d/%d rows would be headings (>25%%)", z.get("zone_index"), len(heading_row_indices), eligible_rows, ) continue for hri in heading_row_indices: header_cells = [c for c in cells if c.get("row_index") == hri] if not header_cells: continue # Collect all word_boxes and text all_wb = [] all_text_parts = [] for hc in sorted(header_cells, key=lambda c: c["col_index"]): all_wb.extend(hc.get("word_boxes", [])) if hc.get("text", "").strip(): all_text_parts.append(hc["text"].strip()) first_col_idx = min(hc["col_index"] for hc in header_cells) # Remove old cells for this row, add spanning heading cell z["cells"] = [c for c in z["cells"] if c.get("row_index") != hri] if all_wb: x_min = min(wb["left"] for wb in all_wb) y_min = min(wb["top"] for wb in all_wb) x_max = max(wb["left"] + wb["width"] for wb in all_wb) y_max = max(wb["top"] + wb["height"] for wb in all_wb) else: # Fallback to first cell bbox bp = header_cells[0].get("bbox_px", {}) x_min = bp.get("x", 0) y_min = bp.get("y", 0) x_max = x_min + bp.get("w", 0) y_max = y_min + bp.get("h", 0) zone_idx = z.get("zone_index", 0) z["cells"].append({ "cell_id": f"Z{zone_idx}_R{hri:02d}_C{first_col_idx}", "zone_index": zone_idx, "row_index": hri, "col_index": first_col_idx, "col_type": "heading", "text": " ".join(all_text_parts), "confidence": 0.0, "bbox_px": {"x": x_min, "y": y_min, "w": x_max - x_min, "h": y_max - y_min}, "bbox_pct": { "x": round(x_min / img_w * 100, 2) if img_w else 0, "y": round(y_min / img_h * 100, 2) if img_h else 0, "w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0, "h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0, }, "word_boxes": all_wb, "ocr_engine": "words_first", "is_bold": False, }) for row in rows: if row["index"] == hri: row["is_header"] = True heading_count += 1 
    return heading_count


def _detect_header_rows(
    rows: List[Dict],
    zone_words: List[Dict],
    zone_y: int,
    columns: Optional[List[Dict]] = None,
    skip_first_row_header: bool = False,
) -> List[int]:
    """Detect header rows: first-row heuristic + spanning header detection.

    A "spanning header" is a row whose words stretch across multiple column
    boundaries (e.g. "Unit4: Bonnie Scotland" centred across 4 columns).

    Args:
        rows: Row dicts with "y_min"/"y_max" bounds, ordered top to bottom.
        zone_words: Word dicts ("top", "height", ...) belonging to this zone.
        zone_y: Unused in the current implementation; kept for interface
            compatibility with callers.
        columns: Unused since spanning-header detection was disabled (see
            note at the bottom); kept for interface compatibility.
        skip_first_row_header: When True, the first-row heuristics below are
            skipped entirely and no header is reported.

    Returns:
        List of row indices considered headers (currently either [] or [0]).
    """
    # A single-row zone cannot have a separate header row.
    if len(rows) < 2:
        return []

    headers = []

    if not skip_first_row_header:
        first_row = rows[0]
        second_row = rows[1]

        # Heuristic 1: gap between first and second row > 0.5x average row
        # height suggests the first row is a visually separated header.
        avg_h = sum(r["y_max"] - r["y_min"] for r in rows) / len(rows)
        gap = second_row["y_min"] - first_row["y_max"]
        if gap > avg_h * 0.5:
            headers.append(0)

        # Heuristic 2: first-row words noticeably taller than the zone's
        # median word height (bold/header text). "Median" here is the upper
        # median via index len//2; 20 is the fallback for an empty list.
        all_heights = [w["height"] for w in zone_words]
        median_h = sorted(all_heights)[len(all_heights) // 2] if all_heights else 20
        # A word belongs to the first row when its vertical centre falls
        # inside the row's y-band.
        first_row_words = [
            w for w in zone_words
            if first_row["y_min"] <= w["top"] + w["height"] / 2 <= first_row["y_max"]
        ]
        if first_row_words:
            first_h = max(w["height"] for w in first_row_words)
            if first_h > median_h * 1.3:
                if 0 not in headers:
                    headers.append(0)

    # Note: Spanning-header detection (rows spanning all columns) has been
    # disabled because it produces too many false positives on vocabulary
    # worksheets where IPA transcriptions or short entries naturally span
    # multiple columns with few words. The first-row heuristic above is
    # sufficient for detecting real headers.

    return headers


def _build_zone_grid(
    zone_words: List[Dict],
    zone_x: int,
    zone_y: int,
    zone_w: int,
    zone_h: int,
    zone_index: int,
    img_w: int,
    img_h: int,
    global_columns: Optional[List[Dict]] = None,
    skip_first_row_header: bool = False,
) -> Dict[str, Any]:
    """Build columns, rows, cells for a single zone from its words.

    Args:
        global_columns: If provided, use these pre-computed column boundaries
            instead of detecting columns per zone.
            Used for content zones so that all content zones (above/between/below
            boxes) share the same column structure. Box zones always detect
            columns independently.
    """
    if not zone_words:
        return {
            "columns": [],
            "rows": [],
            "cells": [],
            "header_rows": [],
        }

    # Cluster rows first (needed for column alignment analysis)
    rows = _cluster_rows(zone_words)

    # Diagnostic logging for small/medium zones (box zones typically have 40-60 words)
    if len(zone_words) <= 60:
        import statistics as _st
        _heights = [w['height'] for w in zone_words if w.get('height', 0) > 0]
        _med_h = _st.median(_heights) if _heights else 20
        # Mirrors the row-clustering tolerance so the log explains the result.
        _y_tol = max(_med_h * 0.5, 5)
        logger.info(
            "zone %d row-clustering: %d words, median_h=%.0f, y_tol=%.1f → %d rows",
            zone_index, len(zone_words), _med_h, _y_tol, len(rows),
        )
        for w in sorted(zone_words, key=lambda ww: (ww['top'], ww['left'])):
            logger.info(
                " zone %d word: y=%d x=%d h=%d w=%d '%s'",
                zone_index, w['top'], w['left'], w['height'], w['width'],
                w.get('text', '')[:40],
            )
        for r in rows:
            logger.info(
                " zone %d row %d: y_min=%d y_max=%d y_center=%.0f",
                zone_index, r['index'], r['y_min'], r['y_max'], r['y_center'],
            )

    # Use global columns if provided, otherwise detect per zone
    columns = global_columns if global_columns else _cluster_columns_by_alignment(zone_words, zone_w, rows)

    # Merge inline marker columns (bullets, numbering) into adjacent text.
    # Only for per-zone detection — shared global columns stay untouched.
    if not global_columns:
        columns = _merge_inline_marker_columns(columns, zone_words)

    if not columns or not rows:
        return {
            "columns": [],
            "rows": [],
            "cells": [],
            "header_rows": [],
        }

    # Split word boxes that straddle column boundaries (e.g. "sichzie"
    # spanning Col 1 + Col 2). Must happen after column detection and
    # before cell assignment.
    if len(columns) >= 2:
        zone_words = _split_cross_column_words(zone_words, columns)

    # Build cells
    cells = _build_cells(zone_words, columns, rows, img_w, img_h)

    # Prefix cell IDs with zone index so IDs are unique across zones
    for cell in cells:
        cell["cell_id"] = f"Z{zone_index}_{cell['cell_id']}"
        cell["zone_index"] = zone_index

    # Detect header rows (columns is still passed although spanning-header
    # detection inside _detect_header_rows is currently disabled)
    header_rows = _detect_header_rows(rows, zone_words, zone_y, columns, skip_first_row_header=skip_first_row_header)

    # Merge cells in spanning header rows into a single col-0 cell
    if header_rows and len(columns) >= 2:
        for hri in header_rows:
            header_cells = [c for c in cells if c["row_index"] == hri]
            if len(header_cells) <= 1:
                continue
            # Collect all word_boxes and text from all columns, left to right
            all_wb = []
            all_text_parts = []
            for hc in sorted(header_cells, key=lambda c: c["col_index"]):
                all_wb.extend(hc.get("word_boxes", []))
                if hc.get("text", "").strip():
                    all_text_parts.append(hc["text"].strip())
            # Remove all header cells, replace with one spanning cell.
            # NOTE(review): if no header cell has word_boxes, the row's cells
            # are removed without any replacement — confirm this is intended.
            cells = [c for c in cells if c["row_index"] != hri]
            if all_wb:
                # Bounding box = union of all collected word boxes
                x_min = min(wb["left"] for wb in all_wb)
                y_min = min(wb["top"] for wb in all_wb)
                x_max = max(wb["left"] + wb["width"] for wb in all_wb)
                y_max = max(wb["top"] + wb["height"] for wb in all_wb)
                # NOTE(review): unlike the cells prefixed above, this
                # synthesized cell's id carries no "Z{zone_index}_" prefix and
                # the dict has no "zone_index" key — verify downstream
                # consumers tolerate this inconsistency.
                cells.append({
                    "cell_id": f"R{hri:02d}_C0",
                    "row_index": hri,
                    "col_index": 0,
                    "col_type": "spanning_header",
                    "text": " ".join(all_text_parts),
                    "confidence": 0.0,
                    "bbox_px": {"x": x_min, "y": y_min, "w": x_max - x_min, "h": y_max - y_min},
                    "bbox_pct": {
                        "x": round(x_min / img_w * 100, 2) if img_w else 0,
                        "y": round(y_min / img_h * 100, 2) if img_h else 0,
                        "w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0,
                        "h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0,
                    },
                    "word_boxes": all_wb,
                    "ocr_engine": "words_first",
                    "is_bold": True,
                })

    # Convert columns to output format with percentages
    out_columns = []
    for col in columns:
        x_min = col["x_min"]
        x_max = col["x_max"]
        out_columns.append({
            "index": col["index"],
            "label": col["type"],
            "x_min_px": round(x_min),
            "x_max_px": round(x_max),
            "x_min_pct": round(x_min / img_w * 100, 2) if img_w else 0,
            "x_max_pct": round(x_max / img_w * 100, 2) if img_w else 0,
            "bold": False,
        })

    # Convert rows to output format with percentages
    out_rows = []
    for row in rows:
        out_rows.append({
            "index": row["index"],
            "y_min_px": round(row["y_min"]),
            "y_max_px": round(row["y_max"]),
            "y_min_pct": round(row["y_min"] / img_h * 100, 2) if img_h else 0,
            "y_max_pct": round(row["y_max"] / img_h * 100, 2) if img_h else 0,
            "is_header": row["index"] in header_rows,
        })

    return {
        "columns": out_columns,
        "rows": out_rows,
        "cells": cells,
        "header_rows": header_rows,
        "_raw_columns": columns,  # internal: for propagation to other zones
    }


def _get_content_bounds(words: List[Dict]) -> tuple:
    """Get content bounds from word positions.

    Returns (x_min, y_min, width, height) of the union of all word boxes,
    or (0, 0, 0, 0) for an empty word list.
    """
    if not words:
        return 0, 0, 0, 0
    x_min = min(w["left"] for w in words)
    y_min = min(w["top"] for w in words)
    x_max = max(w["left"] + w["width"] for w in words)
    y_max = max(w["top"] + w["height"] for w in words)
    return x_min, y_min, x_max - x_min, y_max - y_min


def _filter_decorative_margin(
    words: List[Dict],
    img_w: int,
    log: Any,
    session_id: str,
) -> Dict[str, Any]:
    """Remove words that belong to a decorative alphabet strip on a margin.

    Some vocabulary worksheets have a vertical A–Z alphabet graphic along
    the left or right edge. OCR reads each letter as an isolated short
    word. These decorative elements are not content and confuse
    column/row detection.

    Detection criteria (phase 1 — find the strip using short words):
    - Words are in the outer 30% of the page (left or right)
    - Candidate words are at most 2 characters long
    - At least 6 such words form a vertical strip (≥6 distinct 20px Y buckets)
    - Horizontal spread of the strip is small (≤ 80px)

    Phase 2 — once a strip is confirmed, also remove any short word (≤3
    chars) in the same narrow x-range.
    This catches multi-char OCR artifacts like "Vv" that belong to the
    same decorative element.

    Modifies *words* in place.

    Returns:
        Dict with 'found' (bool), 'side' (str), 'letters_detected' (int).
    """
    no_strip: Dict[str, Any] = {"found": False, "side": "", "letters_detected": 0}
    if not words or img_w <= 0:
        return no_strip

    margin_cutoff = img_w * 0.30

    # Phase 1: find candidate strips using short words (1-2 chars).
    # OCR often reads alphabet sidebar letters as pairs ("Aa", "Bb")
    # rather than singles, so accept ≤2-char words as strip candidates.
    # Membership is decided by the word's horizontal centre, not its edge.
    left_strip = [
        w for w in words
        if len((w.get("text") or "").strip()) <= 2
        and w["left"] + w.get("width", 0) / 2 < margin_cutoff
    ]
    right_strip = [
        w for w in words
        if len((w.get("text") or "").strip()) <= 2
        and w["left"] + w.get("width", 0) / 2 > img_w - margin_cutoff
    ]

    for strip, side in [(left_strip, "left"), (right_strip, "right")]:
        if len(strip) < 6:
            continue

        # Check vertical distribution: should have many distinct Y positions.
        # Word centres are bucketed to 20px so near-identical rows collapse.
        y_centers = sorted(set(
            int(w["top"] + w.get("height", 0) / 2) // 20 * 20  # bucket
            for w in strip
        ))
        if len(y_centers) < 6:
            continue

        # Check horizontal compactness — a real alphabet strip is a narrow
        # column, not words scattered across the margin.
        x_positions = [w["left"] for w in strip]
        x_min = min(x_positions)
        x_max = max(x_positions)
        x_spread = x_max - x_min
        if x_spread > 80:
            continue

        # Phase 2: strip confirmed — also collect short words in same x-range
        # Expand x-range slightly to catch neighbors (e.g. "Vv" next to "U")
        strip_x_lo = x_min - 20
        strip_x_hi = x_max + 60  # word width + tolerance
        all_strip_words = [
            w for w in words
            if len((w.get("text") or "").strip()) <= 3
            and strip_x_lo <= w["left"] <= strip_x_hi
            # Same margin-side test as phase 1, by horizontal centre
            and (w["left"] + w.get("width", 0) / 2 < margin_cutoff
                 if side == "left"
                 else w["left"] + w.get("width", 0) / 2 > img_w - margin_cutoff)
        ]

        # Remove by object identity so duplicate dict contents are safe
        strip_set = set(id(w) for w in all_strip_words)
        before = len(words)
        words[:] = [w for w in words if id(w) not in strip_set]
        removed = before - len(words)
        if removed:
            log.info(
                "build-grid session %s: removed %d decorative %s-margin words "
                "(strip x=%d-%d)",
                session_id, removed, side, strip_x_lo, strip_x_hi,
            )
        # Only one strip is ever removed per call — first matching side wins.
        return {"found": True, "side": side, "letters_detected": len(strip)}

    return no_strip


def _filter_footer_words(
    words: List[Dict],
    img_h: int,
    log: Any,
    session_id: str,
) -> Optional[Dict]:
    """Remove isolated words in the bottom 5% of the page (page numbers).

    Modifies *words* in place and returns a page_number metadata dict if
    a page number was extracted, or None.
""" if not words or img_h <= 0: return None footer_y = img_h * 0.95 footer_words = [ w for w in words if w["top"] + w.get("height", 0) / 2 > footer_y ] if not footer_words: return None # Only remove if footer has very few words (≤ 3) with short text total_text = "".join((w.get("text") or "").strip() for w in footer_words) if len(footer_words) <= 3 and len(total_text) <= 10: # Extract page number metadata before removing page_number_info = { "text": total_text.strip(), "y_pct": round(footer_words[0]["top"] / img_h * 100, 1), } # Try to parse as integer digits = "".join(c for c in total_text if c.isdigit()) if digits: page_number_info["number"] = int(digits) footer_set = set(id(w) for w in footer_words) words[:] = [w for w in words if id(w) not in footer_set] log.info( "build-grid session %s: extracted page number '%s' and removed %d footer words", session_id, total_text, len(footer_words), ) return page_number_info return None def _filter_header_junk( words: List[Dict], img_h: int, log: Any, session_id: str, ) -> None: """Remove OCR junk from header illustrations above the real content. Textbook pages often have decorative header graphics (illustrations, icons) that OCR reads as low-confidence junk characters. Real content typically starts further down the page. Algorithm: 1. Find the "content start" — the first Y position where a dense horizontal row of 3+ high-confidence words begins. 2. Above that line, remove words with conf < 75 and text ≤ 3 chars. These are almost certainly OCR artifacts from illustrations. Modifies *words* in place. 
""" if not words or img_h <= 0: return # --- Find content start: first horizontal row with ≥3 high-conf words --- # Sort words by Y sorted_by_y = sorted(words, key=lambda w: w["top"]) content_start_y = 0 _ROW_TOLERANCE = img_h * 0.02 # words within 2% of page height = same row _MIN_ROW_WORDS = 3 _MIN_CONF = 80 i = 0 while i < len(sorted_by_y): row_y = sorted_by_y[i]["top"] # Collect words in this row band row_words = [] j = i while j < len(sorted_by_y) and sorted_by_y[j]["top"] - row_y < _ROW_TOLERANCE: row_words.append(sorted_by_y[j]) j += 1 # Count high-confidence words with real text (> 1 char) high_conf = [ w for w in row_words if w.get("conf", 0) >= _MIN_CONF and len((w.get("text") or "").strip()) > 1 ] if len(high_conf) >= _MIN_ROW_WORDS: content_start_y = row_y break i = j if j > i else i + 1 if content_start_y <= 0: return # no clear content start found # --- Remove low-conf short junk above content start --- junk = [ w for w in words if w["top"] + w.get("height", 0) < content_start_y and w.get("conf", 0) < 75 and len((w.get("text") or "").strip()) <= 3 ] if not junk: return junk_set = set(id(w) for w in junk) before = len(words) words[:] = [w for w in words if id(w) not in junk_set] removed = before - len(words) if removed: log.info( "build-grid session %s: removed %d header junk words above y=%d " "(content start)", session_id, removed, content_start_y, )