""" Grid Editor — column detection, cross-column splitting, marker merging. Split from grid_editor_helpers.py for maintainability. All functions are pure computation — no HTTP, DB, or session side effects. Lizenz: Apache 2.0 (kommerziell nutzbar) DATENSCHUTZ: Alle Verarbeitung erfolgt lokal. """ import logging import re from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Cross-column word splitting # --------------------------------------------------------------------------- _spell_cache: Optional[Any] = None _spell_loaded = False def _is_recognized_word(text: str) -> bool: """Check if *text* is a recognized German or English word. Uses the spellchecker library (same as cv_syllable_detect.py). Returns True for real words like "oder", "Kabel", "Zeitung". Returns False for OCR merge artifacts like "sichzie", "dasZimmer". """ global _spell_cache, _spell_loaded if not text or len(text) < 2: return False if not _spell_loaded: _spell_loaded = True try: from spellchecker import SpellChecker _spell_cache = SpellChecker(language="de") except Exception: pass if _spell_cache is None: return False return text.lower() in _spell_cache def _split_cross_column_words( words: List[Dict], columns: List[Dict], ) -> List[Dict]: """Split word boxes that span across column boundaries. When OCR merges adjacent words from different columns (e.g. "sichzie" spanning Col 1 and Col 2, or "dasZimmer" crossing the boundary), split the word box at the column boundary so each piece is assigned to the correct column. Only splits when: - The word has significant overlap (>15% of its width) on both sides - AND the word is not a recognized real word (OCR merge artifact), OR the word contains a case transition (lowercase->uppercase) near the boundary indicating two merged words like "dasZimmer". """ if len(columns) < 2: return words # Column boundaries = midpoints between adjacent column edges boundaries = [] for i in range(len(columns) - 1): boundary = (columns[i]["x_max"] + columns[i + 1]["x_min"]) / 2 boundaries.append(boundary) new_words: List[Dict] = [] split_count = 0 for w in words: w_left = w["left"] w_width = w["width"] w_right = w_left + w_width text = (w.get("text") or "").strip() if not text or len(text) < 4 or w_width < 10: new_words.append(w) continue # Find the first boundary this word straddles significantly split_boundary = None for b in boundaries: if w_left < b < w_right: left_part = b - w_left right_part = w_right - b # Both sides must have at least 15% of the word width if left_part > w_width * 0.15 and right_part > w_width * 0.15: split_boundary = b break if split_boundary is None: new_words.append(w) continue # Compute approximate split position in the text. left_width = split_boundary - w_left split_ratio = left_width / w_width approx_pos = len(text) * split_ratio # Strategy 1: look for a case transition (lowercase->uppercase) near # the approximate split point — e.g. "dasZimmer" splits at 'Z'. split_char = None search_lo = max(1, int(approx_pos) - 3) search_hi = min(len(text), int(approx_pos) + 2) for i in range(search_lo, search_hi): if text[i - 1].islower() and text[i].isupper(): split_char = i break # Strategy 2: if no case transition, only split if the whole word # is NOT a real word (i.e. it's an OCR merge artifact like "sichzie"). # Real words like "oder", "Kabel", "Zeitung" must not be split. if split_char is None: clean = re.sub(r"[,;:.!?]+$", "", text) # strip trailing punct if _is_recognized_word(clean): new_words.append(w) continue # Not a real word — use floor of proportional position split_char = max(1, min(len(text) - 1, int(approx_pos))) left_text = text[:split_char].rstrip() right_text = text[split_char:].lstrip() if len(left_text) < 2 or len(right_text) < 2: new_words.append(w) continue right_width = w_width - round(left_width) new_words.append({ **w, "text": left_text, "width": round(left_width), }) new_words.append({ **w, "text": right_text, "left": round(split_boundary), "width": right_width, }) split_count += 1 logger.info( "split cross-column word %r -> %r + %r at boundary %.0f", text, left_text, right_text, split_boundary, ) if split_count: logger.info("split %d cross-column word(s)", split_count) return new_words def _cluster_columns_by_alignment( words: List[Dict], zone_w: int, rows: List[Dict], ) -> List[Dict[str, Any]]: """Detect columns by clustering left-edge alignment across rows. Hybrid approach: 1. Group words by row, find "group start" positions within each row (words preceded by a large gap or first word in row) 2. Cluster group-start left-edges by X-proximity across rows 3. Filter by row coverage (how many rows have a group start here) 4. Merge nearby clusters 5. Build column boundaries This filters out mid-phrase word positions (e.g. IPA transcriptions, second words in multi-word entries) by only considering positions where a new word group begins within a row. """ if not words or not rows: return [] total_rows = len(rows) if total_rows == 0: return [] # --- Group words by row --- row_words: Dict[int, List[Dict]] = {} for w in words: y_center = w["top"] + w["height"] / 2 best = min(rows, key=lambda r: abs(r["y_center"] - y_center)) row_words.setdefault(best["index"], []).append(w) # --- Compute adaptive gap threshold for group-start detection --- all_gaps: List[float] = [] for ri, rw_list in row_words.items(): sorted_rw = sorted(rw_list, key=lambda w: w["left"]) for i in range(len(sorted_rw) - 1): right = sorted_rw[i]["left"] + sorted_rw[i]["width"] gap = sorted_rw[i + 1]["left"] - right if gap > 0: all_gaps.append(gap) if all_gaps: sorted_gaps = sorted(all_gaps) median_gap = sorted_gaps[len(sorted_gaps) // 2] heights = [w["height"] for w in words if w.get("height", 0) > 0] median_h = sorted(heights)[len(heights) // 2] if heights else 25 # For small word counts (boxes, sub-zones): PaddleOCR returns # multi-word blocks, so ALL inter-word gaps are potential column # boundaries. Use a low threshold based on word height — any gap # wider than ~1x median word height is a column separator. if len(words) <= 60: gap_threshold = max(median_h * 1.0, 25) logger.info( "alignment columns (small zone): gap_threshold=%.0f " "(median_h=%.0f, %d words, %d gaps: %s)", gap_threshold, median_h, len(words), len(sorted_gaps), [int(g) for g in sorted_gaps[:10]], ) else: # Standard approach for large zones (full pages) gap_threshold = max(median_gap * 3, median_h * 1.5, 30) # Cap at 25% of zone width max_gap = zone_w * 0.25 if gap_threshold > max_gap > 30: logger.info("alignment columns: capping gap_threshold %.0f -> %.0f (25%% of zone_w=%d)", gap_threshold, max_gap, zone_w) gap_threshold = max_gap else: gap_threshold = 50 # --- Find group-start positions (left-edges that begin a new column) --- start_positions: List[tuple] = [] # (left_edge, row_index) for ri, rw_list in row_words.items(): sorted_rw = sorted(rw_list, key=lambda w: w["left"]) # First word in row is always a group start start_positions.append((sorted_rw[0]["left"], ri)) for i in range(1, len(sorted_rw)): right_prev = sorted_rw[i - 1]["left"] + sorted_rw[i - 1]["width"] gap = sorted_rw[i]["left"] - right_prev if gap >= gap_threshold: start_positions.append((sorted_rw[i]["left"], ri)) start_positions.sort(key=lambda x: x[0]) logger.info( "alignment columns: %d group-start positions from %d words " "(gap_threshold=%.0f, %d rows)", len(start_positions), len(words), gap_threshold, total_rows, ) if not start_positions: x_min = min(w["left"] for w in words) x_max = max(w["left"] + w["width"] for w in words) return [{"index": 0, "type": "column_text", "x_min": x_min, "x_max": x_max}] # --- Cluster group-start positions by X-proximity --- tolerance = max(10, int(zone_w * 0.01)) clusters: List[Dict[str, Any]] = [] cur_edges = [start_positions[0][0]] cur_rows = {start_positions[0][1]} for left, row_idx in start_positions[1:]: if left - cur_edges[-1] <= tolerance: cur_edges.append(left) cur_rows.add(row_idx) else: clusters.append({ "mean_x": int(sum(cur_edges) / len(cur_edges)), "min_edge": min(cur_edges), "max_edge": max(cur_edges), "count": len(cur_edges), "distinct_rows": len(cur_rows), "row_coverage": len(cur_rows) / total_rows, }) cur_edges = [left] cur_rows = {row_idx} clusters.append({ "mean_x": int(sum(cur_edges) / len(cur_edges)), "min_edge": min(cur_edges), "max_edge": max(cur_edges), "count": len(cur_edges), "distinct_rows": len(cur_rows), "row_coverage": len(cur_rows) / total_rows, }) # --- Filter by row coverage --- # These thresholds must be high enough to avoid false columns in flowing # text (random inter-word gaps) while still detecting real columns in # vocabulary worksheets (which typically have >80% row coverage). MIN_COVERAGE_PRIMARY = 0.35 MIN_COVERAGE_SECONDARY = 0.12 MIN_WORDS_SECONDARY = 4 MIN_DISTINCT_ROWS = 3 # Content boundary for left-margin detection content_x_min = min(w["left"] for w in words) content_x_max = max(w["left"] + w["width"] for w in words) content_span = content_x_max - content_x_min primary = [ c for c in clusters if c["row_coverage"] >= MIN_COVERAGE_PRIMARY and c["distinct_rows"] >= MIN_DISTINCT_ROWS ] primary_ids = {id(c) for c in primary} secondary = [ c for c in clusters if id(c) not in primary_ids and c["row_coverage"] >= MIN_COVERAGE_SECONDARY and c["count"] >= MIN_WORDS_SECONDARY and c["distinct_rows"] >= MIN_DISTINCT_ROWS ] # Tertiary: narrow left-margin columns (page refs, markers) that have # too few rows for secondary but are clearly left-aligned and separated # from the main content. These appear at the far left or far right and # have a large gap to the nearest significant cluster. used_ids = {id(c) for c in primary} | {id(c) for c in secondary} sig_xs = [c["mean_x"] for c in primary + secondary] # Tertiary: clusters that are clearly to the LEFT of the first # significant column (or RIGHT of the last). If words consistently # start at a position left of the established first column boundary, # they MUST be a separate column — regardless of how few rows they # cover. The only requirement is a clear spatial gap. MIN_COVERAGE_TERTIARY = 0.02 # at least 1 row effectively tertiary = [] for c in clusters: if id(c) in used_ids: continue if c["distinct_rows"] < 1: continue if c["row_coverage"] < MIN_COVERAGE_TERTIARY: continue # Must be near left or right content margin (within 15%) rel_pos = (c["mean_x"] - content_x_min) / content_span if content_span else 0.5 if not (rel_pos < 0.15 or rel_pos > 0.85): continue # Must have significant gap to nearest significant cluster if sig_xs: min_dist = min(abs(c["mean_x"] - sx) for sx in sig_xs) if min_dist < max(30, content_span * 0.02): continue tertiary.append(c) if tertiary: for c in tertiary: logger.info( " tertiary (margin) cluster: x=%d (range %d-%d), %d words, %d rows (%.0f%%)", c["mean_x"], c["min_edge"], c["max_edge"], c["count"], c["distinct_rows"], c["row_coverage"] * 100, ) significant = sorted(primary + secondary + tertiary, key=lambda c: c["mean_x"]) for c in significant: logger.info( " significant cluster: x=%d (range %d-%d), %d words, %d rows (%.0f%%)", c["mean_x"], c["min_edge"], c["max_edge"], c["count"], c["distinct_rows"], c["row_coverage"] * 100, ) logger.info( "alignment columns: %d clusters, %d primary, %d secondary -> %d significant", len(clusters), len(primary), len(secondary), len(significant), ) if not significant: # Fallback: single column covering all content x_min = min(w["left"] for w in words) x_max = max(w["left"] + w["width"] for w in words) return [{"index": 0, "type": "column_text", "x_min": x_min, "x_max": x_max}] # --- Merge nearby clusters --- merge_distance = max(25, int(zone_w * 0.03)) merged = [significant[0].copy()] for s in significant[1:]: if s["mean_x"] - merged[-1]["mean_x"] < merge_distance: prev = merged[-1] total = prev["count"] + s["count"] prev["mean_x"] = ( prev["mean_x"] * prev["count"] + s["mean_x"] * s["count"] ) // total prev["count"] = total prev["min_edge"] = min(prev["min_edge"], s["min_edge"]) prev["max_edge"] = max(prev["max_edge"], s["max_edge"]) prev["distinct_rows"] = max(prev["distinct_rows"], s["distinct_rows"]) else: merged.append(s.copy()) logger.info( "alignment columns: %d after merge (distance=%d)", len(merged), merge_distance, ) # --- Build column boundaries --- margin = max(5, int(zone_w * 0.005)) content_x_min = min(w["left"] for w in words) content_x_max = max(w["left"] + w["width"] for w in words) columns: List[Dict[str, Any]] = [] for i, cluster in enumerate(merged): x_min = max(content_x_min, cluster["min_edge"] - margin) if i + 1 < len(merged): x_max = merged[i + 1]["min_edge"] - margin else: x_max = content_x_max columns.append({ "index": i, "type": f"column_{i + 1}" if len(merged) > 1 else "column_text", "x_min": x_min, "x_max": x_max, }) return columns _MARKER_CHARS = set("*-+#>") def _merge_inline_marker_columns( columns: List[Dict], words: List[Dict], ) -> List[Dict]: """Merge narrow marker columns (bullets, numbering) into adjacent text. Bullet points (*, -) and numbering (1., 2.) create narrow columns at the left edge of a zone. These are inline markers that indent text, not real separate columns. Merge them with their right neighbour. Does NOT merge columns containing alphabetic words like "to", "in", "der", "die", "das" — those are legitimate content columns. """ if len(columns) < 2: return columns merged: List[Dict] = [] skip: set = set() for i, col in enumerate(columns): if i in skip: continue # Find words in this column col_words = [ w for w in words if col["x_min"] <= w["left"] + w["width"] / 2 < col["x_max"] ] col_width = col["x_max"] - col["x_min"] # Narrow column with mostly short words -> MIGHT be inline markers if col_words and col_width < 80: avg_len = sum(len(w.get("text", "")) for w in col_words) / len(col_words) if avg_len <= 2 and i + 1 < len(columns): # Check if words are actual markers (symbols/numbers) vs # real alphabetic words like "to", "in", "der", "die" texts = [(w.get("text") or "").strip() for w in col_words] alpha_count = sum( 1 for t in texts if t and t[0].isalpha() and t not in _MARKER_CHARS ) alpha_ratio = alpha_count / len(texts) if texts else 0 # If >=50% of words are alphabetic, this is a real column if alpha_ratio >= 0.5: logger.info( " kept narrow column %d (w=%d, avg_len=%.1f, " "alpha=%.0f%%) -- contains real words", i, col_width, avg_len, alpha_ratio * 100, ) else: # Merge into next column next_col = columns[i + 1].copy() next_col["x_min"] = col["x_min"] merged.append(next_col) skip.add(i + 1) logger.info( " merged inline marker column %d (w=%d, avg_len=%.1f) " "into column %d", i, col_width, avg_len, i + 1, ) continue merged.append(col) # Re-index for i, col in enumerate(merged): col["index"] = i col["type"] = f"column_{i + 1}" if len(merged) > 1 else "column_text" return merged