""" Column type classification for OCR layout analysis. Entry point: classify_column_types() with 4-level fallback chain. Also provides positional_column_regions() and _build_margin_regions(). Position-based classifiers (Level 2+3) in cv_layout_classify_position.py. """ import logging from typing import Dict, List, Optional import numpy as np from cv_vocab_types import ColumnGeometry, PageRegion from cv_layout_scoring import ( _score_language, _score_role, _score_dictionary_signals, _classify_dictionary_columns, ) from cv_layout_classify_position import ( _classify_by_position_enhanced, _classify_by_position_fallback, ) logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Margin Region Building # --------------------------------------------------------------------------- def _build_margin_regions( all_regions: List[PageRegion], left_x: int, right_x: int, img_w: int, top_y: int, content_h: int, ) -> List[PageRegion]: """Create margin_left / margin_right PageRegions from content bounds. Margins represent the space between the image edge and the first/last content column. They are used downstream for faithful page reconstruction but are skipped during OCR. """ margins: List[PageRegion] = [] # Minimum gap (px) to create a margin region _min_gap = 5 if left_x > _min_gap: margins.append(PageRegion( type='margin_left', x=0, y=top_y, width=left_x, height=content_h, classification_confidence=1.0, classification_method='content_bounds', )) # Right margin: from end of last content column to image edge non_margin = [r for r in all_regions if r.type not in ('margin_left', 'margin_right', 'header', 'footer', 'margin_top', 'margin_bottom')] if non_margin: last_col_end = max(r.x + r.width for r in non_margin) else: last_col_end = right_x if img_w - last_col_end > _min_gap: margins.append(PageRegion( type='margin_right', x=last_col_end, y=top_y, width=img_w - last_col_end, height=content_h, classification_confidence=1.0, classification_method='content_bounds', )) if margins: logger.info(f"Margins: {[(m.type, m.x, m.width) for m in margins]} " f"(left_x={left_x}, right_x={right_x}, img_w={img_w})") return margins # --------------------------------------------------------------------------- # Positional Column Regions # --------------------------------------------------------------------------- def positional_column_regions( geometries: List[ColumnGeometry], content_w: int, content_h: int, left_x: int, ) -> List[PageRegion]: """Classify columns by position only (no language scoring). Structural columns (page_ref, column_marker) are identified by geometry. Remaining content columns are labelled left->right as column_en, column_de, column_example. The names are purely positional -- no language analysis. """ structural: List[PageRegion] = [] content_cols: List[ColumnGeometry] = [] for g in geometries: rel_x = g.x - left_x # page_ref: narrow column in the leftmost 20% region if g.width_ratio < 0.12 and (rel_x / content_w if content_w else 0) < 0.20: structural.append(PageRegion( type='page_ref', x=g.x, y=g.y, width=g.width, height=content_h, classification_confidence=0.95, classification_method='positional', )) # column_marker: very narrow, few words elif g.width_ratio < 0.06 and g.word_count <= 15: structural.append(PageRegion( type='column_marker', x=g.x, y=g.y, width=g.width, height=content_h, classification_confidence=0.95, classification_method='positional', )) # empty or near-empty narrow column -> treat as margin/structural elif g.word_count <= 2 and g.width_ratio < 0.15: structural.append(PageRegion( type='column_marker', x=g.x, y=g.y, width=g.width, height=content_h, classification_confidence=0.85, classification_method='positional', )) else: content_cols.append(g) # Single content column -> plain text page if len(content_cols) == 1: g = content_cols[0] return structural + [PageRegion( type='column_text', x=g.x, y=g.y, width=g.width, height=content_h, classification_confidence=0.9, classification_method='positional', )] # No content columns if not content_cols: return structural # Sort content columns left->right and assign positional labels content_cols.sort(key=lambda g: g.x) # With exactly 2 content columns: if the left one is very wide (>35%), # it likely contains EN+DE combined, so the right one is examples. if (len(content_cols) == 2 and content_cols[0].width_ratio > 0.35 and content_cols[1].width_ratio > 0.20): labels = ['column_en', 'column_example'] else: labels = ['column_en', 'column_de', 'column_example'] regions = list(structural) for i, g in enumerate(content_cols): label = labels[i] if i < len(labels) else 'column_example' regions.append(PageRegion( type=label, x=g.x, y=g.y, width=g.width, height=content_h, classification_confidence=0.95, classification_method='positional', )) logger.info(f"PositionalColumns: {len(structural)} structural, " f"{len(content_cols)} content -> " f"{[r.type for r in regions]}") return regions # --------------------------------------------------------------------------- # Main Classification Entry Point # --------------------------------------------------------------------------- def classify_column_types(geometries: List[ColumnGeometry], content_w: int, top_y: int, img_w: int, img_h: int, bottom_y: int, left_x: int = 0, right_x: int = 0, inv: Optional[np.ndarray] = None, document_category: Optional[str] = None, margin_strip_detected: bool = False) -> List[PageRegion]: """Classify column types using a 3-level fallback chain. Level 0: Dictionary detection (if signals are strong enough) Level 1: Content-based (language + role scoring) Level 2: Position + language (old rules enhanced with language detection) Level 3: Pure position (exact old code, no regression) Args: geometries: List of ColumnGeometry from Phase A. content_w: Total content width. top_y: Top Y of content area. img_w: Full image width. img_h: Full image height. bottom_y: Bottom Y of content area. left_x: Left content bound (from _find_content_bounds). right_x: Right content bound (from _find_content_bounds). document_category: User-selected category (e.g. 'woerterbuch'). margin_strip_detected: Whether a decorative A-Z margin strip was found. Returns: List of PageRegion with types, confidence, and method. """ # _add_header_footer lives in cv_layout (avoids circular import at module # level). Lazy-import here so the module can be tested independently when # cv_layout hasn't been modified yet. from cv_layout_detection import _add_header_footer # noqa: E402 content_h = bottom_y - top_y def _with_margins(result: List[PageRegion]) -> List[PageRegion]: """Append margin_left / margin_right regions to *result*.""" margins = _build_margin_regions(result, left_x, right_x, img_w, top_y, content_h) return result + margins # Special case: single column -> plain text page if len(geometries) == 1: geom = geometries[0] return _with_margins([PageRegion( type='column_text', x=geom.x, y=geom.y, width=geom.width, height=geom.height, classification_confidence=0.9, classification_method='content', )]) # --- Pre-filter: first/last columns with very few words -> column_ignore --- # Sub-columns from _detect_sub_columns() are exempt: they intentionally # have few words (page refs, markers) and should not be discarded. ignore_regions = [] active_geometries = [] for idx, g in enumerate(geometries): if (idx == 0 or idx == len(geometries) - 1) and g.word_count < 8 and not g.is_sub_column: ignore_regions.append(PageRegion( type='column_ignore', x=g.x, y=g.y, width=g.width, height=content_h, classification_confidence=0.95, classification_method='content', )) logger.info(f"ClassifyColumns: column {idx} (x={g.x}, words={g.word_count}) -> column_ignore (edge, few words)") else: active_geometries.append(g) # Re-index active geometries for classification for new_idx, g in enumerate(active_geometries): g.index = new_idx geometries = active_geometries # Handle edge case: all columns ignored or only 1 left if len(geometries) == 0: return _with_margins(ignore_regions) if len(geometries) == 1: geom = geometries[0] ignore_regions.append(PageRegion( type='column_text', x=geom.x, y=geom.y, width=geom.width, height=geom.height, classification_confidence=0.9, classification_method='content', )) return _with_margins(ignore_regions) # --- Score all columns --- lang_scores = [_score_language(g.words) for g in geometries] role_scores = [_score_role(g) for g in geometries] logger.info(f"ClassifyColumns: language scores: " f"{[(g.index, ls) for g, ls in zip(geometries, lang_scores)]}") logger.info(f"ClassifyColumns: role scores: " f"{[(g.index, rs) for g, rs in zip(geometries, role_scores)]}") # --- Level 0: Dictionary detection --- dict_signals = _score_dictionary_signals( geometries, document_category=document_category, margin_strip_detected=margin_strip_detected, ) if dict_signals["is_dictionary"]: regions = _classify_dictionary_columns( geometries, dict_signals, lang_scores, content_h, ) if regions is not None: logger.info("ClassifyColumns: Level 0 (dictionary) succeeded, confidence=%.3f", dict_signals["confidence"]) _add_header_footer(regions, top_y, bottom_y, img_w, img_h, inv=inv) return _with_margins(ignore_regions + regions) # --- Level 1: Content-based classification --- regions = _classify_by_content(geometries, lang_scores, role_scores, content_w, content_h) if regions is not None: logger.info("ClassifyColumns: Level 1 (content-based) succeeded") _add_header_footer(regions, top_y, bottom_y, img_w, img_h, inv=inv) return _with_margins(ignore_regions + regions) # --- Level 2: Position + language enhanced --- regions = _classify_by_position_enhanced(geometries, lang_scores, content_w, content_h) if regions is not None: logger.info("ClassifyColumns: Level 2 (position+language) succeeded") _add_header_footer(regions, top_y, bottom_y, img_w, img_h, inv=inv) return _with_margins(ignore_regions + regions) # --- Level 3: Pure position fallback (old code, no regression) --- logger.info("ClassifyColumns: Level 3 (position fallback)") regions = _classify_by_position_fallback(geometries, content_w, content_h) _add_header_footer(regions, top_y, bottom_y, img_w, img_h, inv=inv) return _with_margins(ignore_regions + regions) # --------------------------------------------------------------------------- # Level 1: Content-Based Classification # --------------------------------------------------------------------------- def _classify_by_content(geometries: List[ColumnGeometry], lang_scores: List[Dict[str, float]], role_scores: List[Dict[str, float]], content_w: int, content_h: int) -> Optional[List[PageRegion]]: """Level 1: Classify columns purely by content analysis. Requires clear language signals to distinguish EN/DE columns. Returns None if language signals are too weak. """ regions = [] assigned = set() # Step 1: Assign structural roles first (reference, marker) # left_20_threshold: only the leftmost ~20% of content area qualifies for page_ref left_20_threshold = geometries[0].x + content_w * 0.20 if geometries else 0 for i, (geom, rs, ls) in enumerate(zip(geometries, role_scores, lang_scores)): is_left_side = geom.x < left_20_threshold has_strong_language = ls['eng'] > 0.3 or ls['deu'] > 0.3 if rs['reference'] >= 0.5 and geom.width_ratio < 0.12 and is_left_side and not has_strong_language: regions.append(PageRegion( type='page_ref', x=geom.x, y=geom.y, width=geom.width, height=content_h, classification_confidence=rs['reference'], classification_method='content', )) assigned.add(i) elif rs['marker'] >= 0.7 and geom.width_ratio < 0.06: regions.append(PageRegion( type='column_marker', x=geom.x, y=geom.y, width=geom.width, height=content_h, classification_confidence=rs['marker'], classification_method='content', )) assigned.add(i) elif geom.width_ratio < 0.05 and not is_left_side: # Narrow column on the right side -> marker, not page_ref regions.append(PageRegion( type='column_marker', x=geom.x, y=geom.y, width=geom.width, height=content_h, classification_confidence=0.8, classification_method='content', )) assigned.add(i) # Step 2: Among remaining columns, find EN and DE by language scores remaining = [(i, geometries[i], lang_scores[i], role_scores[i]) for i in range(len(geometries)) if i not in assigned] if len(remaining) < 2: # Not enough columns for EN/DE pair if len(remaining) == 1: i, geom, ls, rs = remaining[0] regions.append(PageRegion( type='column_text', x=geom.x, y=geom.y, width=geom.width, height=content_h, classification_confidence=0.6, classification_method='content', )) regions.sort(key=lambda r: r.x) return regions # Check if we have enough language signal en_candidates = [(i, g, ls) for i, g, ls, rs in remaining if ls['eng'] > ls['deu'] and ls['eng'] > 0.05] de_candidates = [(i, g, ls) for i, g, ls, rs in remaining if ls['deu'] > ls['eng'] and ls['deu'] > 0.05] # Position tiebreaker: when language signals are weak, use left=EN, right=DE if (not en_candidates or not de_candidates) and len(remaining) >= 2: max_eng = max(ls['eng'] for _, _, ls, _ in remaining) max_deu = max(ls['deu'] for _, _, ls, _ in remaining) if max_eng < 0.15 and max_deu < 0.15: # Both signals weak -- fall back to positional: left=EN, right=DE sorted_remaining = sorted(remaining, key=lambda x: x[1].x) best_en = (sorted_remaining[0][0], sorted_remaining[0][1], sorted_remaining[0][2]) best_de = (sorted_remaining[1][0], sorted_remaining[1][1], sorted_remaining[1][2]) logger.info("ClassifyColumns: Level 1 using position tiebreaker (weak signals) - left=EN, right=DE") en_conf = 0.4 de_conf = 0.4 regions.append(PageRegion( type='column_en', x=best_en[1].x, y=best_en[1].y, width=best_en[1].width, height=content_h, classification_confidence=en_conf, classification_method='content', )) assigned.add(best_en[0]) regions.append(PageRegion( type='column_de', x=best_de[1].x, y=best_de[1].y, width=best_de[1].width, height=content_h, classification_confidence=de_conf, classification_method='content', )) assigned.add(best_de[0]) # Assign remaining as example for i, geom, ls, rs in remaining: if i not in assigned: regions.append(PageRegion( type='column_example', x=geom.x, y=geom.y, width=geom.width, height=content_h, classification_confidence=0.4, classification_method='content', )) regions.sort(key=lambda r: r.x) return regions if not en_candidates or not de_candidates: # Language signals too weak for content-based classification logger.info("ClassifyColumns: Level 1 failed - no clear EN/DE language split") return None # Pick the best EN and DE candidates best_en = max(en_candidates, key=lambda x: x[2]['eng']) best_de = max(de_candidates, key=lambda x: x[2]['deu']) # Position-aware EN selection: in typical textbooks the layout is EN | DE | Example. # Example sentences contain English function words ("the", "a", "is") which inflate # the eng score of the Example column. When the best EN candidate sits to the RIGHT # of the DE column and there is another EN candidate to the LEFT, prefer the left one # -- it is almost certainly the real vocabulary column. if best_de[2]['deu'] > 0.5 and best_en[1].x > best_de[1].x and len(en_candidates) > 1: left_of_de = [c for c in en_candidates if c[1].x < best_de[1].x] if left_of_de: alt_en = max(left_of_de, key=lambda x: x[2]['eng']) logger.info( f"ClassifyColumns: Level 1 position fix -- best EN col {best_en[0]} " f"(eng={best_en[2]['eng']:.3f}) is right of DE col {best_de[0]}; " f"preferring left col {alt_en[0]} (eng={alt_en[2]['eng']:.3f})") best_en = alt_en if best_en[0] == best_de[0]: # Same column scored highest for both -- ambiguous logger.info("ClassifyColumns: Level 1 failed - same column highest for EN and DE") return None en_conf = best_en[2]['eng'] de_conf = best_de[2]['deu'] regions.append(PageRegion( type='column_en', x=best_en[1].x, y=best_en[1].y, width=best_en[1].width, height=content_h, classification_confidence=round(en_conf, 2), classification_method='content', )) assigned.add(best_en[0]) regions.append(PageRegion( type='column_de', x=best_de[1].x, y=best_de[1].y, width=best_de[1].width, height=content_h, classification_confidence=round(de_conf, 2), classification_method='content', )) assigned.add(best_de[0]) # Step 3: Remaining columns -> example or text based on role scores for i, geom, ls, rs in remaining: if i in assigned: continue if rs['sentence'] > 0.4: regions.append(PageRegion( type='column_example', x=geom.x, y=geom.y, width=geom.width, height=content_h, classification_confidence=round(rs['sentence'], 2), classification_method='content', )) else: regions.append(PageRegion( type='column_example', x=geom.x, y=geom.y, width=geom.width, height=content_h, classification_confidence=0.5, classification_method='content', )) regions.sort(key=lambda r: r.x) return regions