diff --git a/klausur-service/backend/cv_vocab_pipeline.py b/klausur-service/backend/cv_vocab_pipeline.py index 6be9209..6aaabc9 100644 --- a/klausur-service/backend/cv_vocab_pipeline.py +++ b/klausur-service/backend/cv_vocab_pipeline.py @@ -875,11 +875,147 @@ def analyze_layout(layout_img: np.ndarray, ocr_img: np.ndarray) -> List[PageRegi # --- Phase A: Geometry Detection --- -def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Optional[Tuple[List[ColumnGeometry], int, int, int, int]]: - """Detect column geometry by clustering left-aligned word positions. +def _detect_columns_by_clustering( + word_dicts: List[Dict], + left_edges: List[int], + edge_word_indices: List[int], + content_w: int, + content_h: int, + left_x: int, + right_x: int, + top_y: int, + bottom_y: int, +) -> Optional[Tuple[List[ColumnGeometry], int, int, int, int]]: + """Fallback: detect columns by clustering left-aligned word positions. - Phase A of the two-phase column detection. Returns untyped column - geometries with their words for subsequent content-based classification. + Used when the primary gap-based algorithm finds fewer than 2 gaps. + """ + tolerance = max(10, int(content_w * 0.01)) + sorted_pairs = sorted(zip(left_edges, edge_word_indices), key=lambda p: p[0]) + + clusters = [] + cluster_widxs = [] + cur_edges = [sorted_pairs[0][0]] + cur_widxs = [sorted_pairs[0][1]] + for edge, widx in sorted_pairs[1:]: + if edge - cur_edges[-1] <= tolerance: + cur_edges.append(edge) + cur_widxs.append(widx) + else: + clusters.append(cur_edges) + cluster_widxs.append(cur_widxs) + cur_edges = [edge] + cur_widxs = [widx] + clusters.append(cur_edges) + cluster_widxs.append(cur_widxs) + + MIN_Y_COVERAGE_PRIMARY = 0.30 + MIN_Y_COVERAGE_SECONDARY = 0.15 + MIN_WORDS_SECONDARY = 5 + + cluster_infos = [] + for c_edges, c_widxs in zip(clusters, cluster_widxs): + if len(c_edges) < 2: + continue + y_positions = [word_dicts[idx]['top'] for idx in c_widxs] + y_span = max(y_positions) - min(y_positions) + y_coverage = y_span / content_h if content_h > 0 else 0.0 + cluster_infos.append({ + 'mean_x': int(np.mean(c_edges)), + 'count': len(c_edges), + 'min_edge': min(c_edges), + 'max_edge': max(c_edges), + 'y_min': min(y_positions), + 'y_max': max(y_positions), + 'y_coverage': y_coverage, + }) + + primary = [c for c in cluster_infos if c['y_coverage'] >= MIN_Y_COVERAGE_PRIMARY] + primary_set = set(id(c) for c in primary) + secondary = [c for c in cluster_infos + if id(c) not in primary_set + and c['y_coverage'] >= MIN_Y_COVERAGE_SECONDARY + and c['count'] >= MIN_WORDS_SECONDARY] + significant = sorted(primary + secondary, key=lambda c: c['mean_x']) + + if len(significant) < 3: + logger.info("ColumnGeometry clustering fallback: < 3 significant clusters") + return None + + merge_distance = max(30, int(content_w * 0.06)) + merged = [significant[0].copy()] + for s in significant[1:]: + if s['mean_x'] - merged[-1]['mean_x'] < merge_distance: + prev = merged[-1] + total = prev['count'] + s['count'] + avg_x = (prev['mean_x'] * prev['count'] + s['mean_x'] * s['count']) // total + prev['mean_x'] = avg_x + prev['count'] = total + prev['min_edge'] = min(prev['min_edge'], s['min_edge']) + prev['max_edge'] = max(prev['max_edge'], s['max_edge']) + else: + merged.append(s.copy()) + + if len(merged) < 3: + logger.info("ColumnGeometry clustering fallback: < 3 merged clusters") + return None + + logger.info(f"ColumnGeometry clustering fallback: {len(merged)} columns from clustering") + + margin_px = max(6, int(content_w * 0.003)) + return _build_geometries_from_starts( + [(max(0, left_x + m['min_edge'] - margin_px), m['count']) for m in merged], + word_dicts, left_x, right_x, top_y, bottom_y, content_w, content_h, + ) + + +def _build_geometries_from_starts( + col_starts: List[Tuple[int, int]], + word_dicts: List[Dict], + left_x: int, + right_x: int, + top_y: int, + bottom_y: int, + content_w: int, + content_h: int, +) -> Tuple[List[ColumnGeometry], int, int, int, int]: + """Build ColumnGeometry objects from a list of (abs_start_x, word_count) pairs.""" + geometries = [] + for i, (start_x, count) in enumerate(col_starts): + if i + 1 < len(col_starts): + col_width = col_starts[i + 1][0] - start_x + else: + col_width = right_x - start_x + + col_left_rel = start_x - left_x + col_right_rel = col_left_rel + col_width + col_words = [w for w in word_dicts + if col_left_rel <= w['left'] < col_right_rel] + + geometries.append(ColumnGeometry( + index=i, + x=start_x, + y=top_y, + width=col_width, + height=content_h, + word_count=len(col_words), + words=col_words, + width_ratio=col_width / content_w if content_w > 0 else 0.0, + )) + + logger.info(f"ColumnGeometry: {len(geometries)} columns: " + f"{[(g.index, g.x, g.width, g.word_count) for g in geometries]}") + return (geometries, left_x, right_x, top_y, bottom_y) + + +def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Optional[Tuple[List[ColumnGeometry], int, int, int, int]]: + """Detect column geometry using whitespace-gap analysis with word validation. + + Phase A of the two-phase column detection. Uses vertical projection + profiles to find whitespace gaps between columns, then validates that + no gap cuts through a word bounding box. + + Falls back to clustering-based detection if fewer than 2 gaps are found. Args: ocr_img: Binarized grayscale image for layout analysis. @@ -887,11 +1023,11 @@ def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Opt Returns: Tuple of (geometries, left_x, right_x, top_y, bottom_y) or None if - fewer than 3 clusters are found (signals fallback needed). + detection fails entirely. """ h, w = ocr_img.shape[:2] - # --- Find content bounds --- + # --- Step 1: Find content bounds --- inv = cv2.bitwise_not(ocr_img) left_x, right_x, top_y, bottom_y = _find_content_bounds(inv) content_w = right_x - left_x @@ -905,7 +1041,7 @@ def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Opt logger.info(f"ColumnGeometry: content bounds x=[{left_x}..{right_x}] ({content_w}px), " f"y=[{top_y}..{bottom_y}] ({content_h}px)") - # --- Get word bounding boxes from Tesseract --- + # --- Step 2: Get word bounding boxes from Tesseract --- content_roi = dewarped_bgr[top_y:bottom_y, left_x:right_x] pil_img = Image.fromarray(cv2.cvtColor(content_roi, cv2.COLOR_BGR2RGB)) @@ -915,10 +1051,9 @@ def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Opt logger.warning(f"ColumnGeometry: Tesseract image_to_data failed: {e}") return None - # Collect words with their full info word_dicts = [] left_edges = [] - edge_word_indices = [] # Track which word_dicts index each edge belongs to + edge_word_indices = [] n_words = len(data['text']) for i in range(n_words): conf = int(data['conf'][i]) if str(data['conf'][i]).lstrip('-').isdigit() else -1 @@ -942,146 +1077,171 @@ def detect_column_geometry(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Opt logger.info(f"ColumnGeometry: {len(left_edges)} words detected in content area") - # --- Cluster left edges (tracking word indices per cluster) --- - tolerance = max(10, int(content_w * 0.01)) + # --- Step 3: Vertical projection profile --- + content_strip = inv[top_y:bottom_y, left_x:right_x] + v_proj = np.sum(content_strip, axis=0).astype(float) + v_proj_norm = v_proj / (content_h * 255) if content_h > 0 else v_proj - # Sort edges while keeping word index association - sorted_pairs = sorted(zip(left_edges, edge_word_indices), key=lambda p: p[0]) + # Smooth the projection to avoid noise-induced micro-gaps + kernel_size = max(5, content_w // 80) + if kernel_size % 2 == 0: + kernel_size += 1 # keep odd for symmetry + v_smooth = np.convolve(v_proj_norm, np.ones(kernel_size) / kernel_size, mode='same') - clusters = [] # list of lists of edge x-values - cluster_widxs = [] # parallel list of lists of word_dicts indices - cur_edges = [sorted_pairs[0][0]] - cur_widxs = [sorted_pairs[0][1]] - for edge, widx in sorted_pairs[1:]: - if edge - cur_edges[-1] <= tolerance: - cur_edges.append(edge) - cur_widxs.append(widx) + # --- Step 4: Find whitespace gaps --- + # Threshold: areas with very little ink density are gaps + median_density = float(np.median(v_smooth[v_smooth > 0])) if np.any(v_smooth > 0) else 0.01 + gap_threshold = max(median_density * 0.15, 0.005) + + in_gap = v_smooth < gap_threshold + MIN_GAP_WIDTH = max(8, content_w // 200) # min ~8px or 0.5% of content width + + # Collect contiguous gap regions + raw_gaps = [] # (start_x_rel, end_x_rel) relative to content ROI + gap_start = None + for x in range(len(in_gap)): + if in_gap[x]: + if gap_start is None: + gap_start = x else: - clusters.append(cur_edges) - cluster_widxs.append(cur_widxs) - cur_edges = [edge] - cur_widxs = [widx] - clusters.append(cur_edges) - cluster_widxs.append(cur_widxs) + if gap_start is not None: + gap_width = x - gap_start + if gap_width >= MIN_GAP_WIDTH: + raw_gaps.append((gap_start, x)) + gap_start = None + # Handle gap at the right edge + if gap_start is not None: + gap_width = len(in_gap) - gap_start + if gap_width >= MIN_GAP_WIDTH: + raw_gaps.append((gap_start, len(in_gap))) - # --- Enrich clusters with Y-span info and apply verticality filter --- - MIN_Y_COVERAGE_PRIMARY = 0.30 # Primary columns span >= 30% of page height - MIN_Y_COVERAGE_SECONDARY = 0.15 # Secondary columns span >= 15% - MIN_WORDS_SECONDARY = 5 # Secondary columns need >= 5 words + logger.info(f"ColumnGeometry: {len(raw_gaps)} raw gaps found (threshold={gap_threshold:.4f}, " + f"min_width={MIN_GAP_WIDTH}px): " + f"{[(g[0]+left_x, g[1]+left_x, g[1]-g[0]) for g in raw_gaps]}") - cluster_infos = [] - for c_edges, c_widxs in zip(clusters, cluster_widxs): - if len(c_edges) < 2: - continue - y_positions = [word_dicts[idx]['top'] for idx in c_widxs] - y_span = max(y_positions) - min(y_positions) - y_coverage = y_span / content_h if content_h > 0 else 0.0 + # --- Step 5: Validate gaps against word bounding boxes --- + validated_gaps = [] + for gap_start_rel, gap_end_rel in raw_gaps: + # Check if any word overlaps with this gap region + overlapping = False + for wd in word_dicts: + word_left = wd['left'] + word_right = wd['left'] + wd['width'] + if word_left < gap_end_rel and word_right > gap_start_rel: + overlapping = True + break - cluster_infos.append({ - 'mean_x': int(np.mean(c_edges)), - 'count': len(c_edges), - 'min_edge': min(c_edges), - 'max_edge': max(c_edges), - 'y_min': min(y_positions), - 'y_max': max(y_positions), - 'y_coverage': y_coverage, - }) - - _ci_summary = [(ci['mean_x']+left_x, ci['count'], format(ci['y_coverage'], '.0%')) for ci in cluster_infos[:12]] - logger.info(f"ColumnGeometry: {len(cluster_infos)} clusters with >=2 words " - f"(from {len(clusters)} total), y_coverage: {_ci_summary}") - - # Primary: good vertical coverage - primary = [c for c in cluster_infos if c['y_coverage'] >= MIN_Y_COVERAGE_PRIMARY] - # Secondary: moderate coverage with enough words - primary_set = set(id(c) for c in primary) - secondary = [c for c in cluster_infos - if id(c) not in primary_set - and c['y_coverage'] >= MIN_Y_COVERAGE_SECONDARY - and c['count'] >= MIN_WORDS_SECONDARY] - - significant = sorted(primary + secondary, key=lambda c: c['mean_x']) - - _sig_summary = [(s['mean_x']+left_x, s['count'], format(s['y_coverage'], '.0%')) for s in significant[:10]] - logger.info(f"ColumnGeometry: {len(significant)} significant clusters " - f"(primary={len(primary)}, secondary={len(secondary)}): {_sig_summary}") - - if len(significant) < 3: - logger.info("ColumnGeometry: < 3 clusters after verticality filter, signaling fallback") - return None - - # --- Merge clusters that are very close --- - # 6% of content width: on a typical 5-col vocab page (~1500px wide), - # this is ~90px, which merges sub-alignments within a single column - # while keeping real column boundaries (~300px apart) separate. - merge_distance = max(30, int(content_w * 0.06)) - merged = [significant[0].copy()] - for s in significant[1:]: - if s['mean_x'] - merged[-1]['mean_x'] < merge_distance: - prev = merged[-1] - total = prev['count'] + s['count'] - avg_x = (prev['mean_x'] * prev['count'] + s['mean_x'] * s['count']) // total - prev['mean_x'] = avg_x - prev['count'] = total - prev['min_edge'] = min(prev['min_edge'], s['min_edge']) - prev['max_edge'] = max(prev['max_edge'], s['max_edge']) - prev['y_min'] = min(prev['y_min'], s['y_min']) - prev['y_max'] = max(prev['y_max'], s['y_max']) - prev['y_coverage'] = (prev['y_max'] - prev['y_min']) / content_h if content_h > 0 else 0.0 + if not overlapping: + validated_gaps.append((gap_start_rel, gap_end_rel)) else: - merged.append(s.copy()) + # Try to shift the gap to avoid the overlapping word(s) + # Find the tightest word boundaries within the gap region + min_word_left = content_w + max_word_right = 0 + for wd in word_dicts: + word_left = wd['left'] + word_right = wd['left'] + wd['width'] + if word_left < gap_end_rel and word_right > gap_start_rel: + min_word_left = min(min_word_left, word_left) + max_word_right = max(max_word_right, word_right) - # --- Post-merge: absorb tiny clusters (< 5% content width) into neighbors --- - i = 0 - absorbed_count = 0 - while i < len(merged) and len(merged) > 3: - if i + 1 < len(merged): - cluster_w = merged[i + 1]['mean_x'] - merged[i]['mean_x'] - else: - cluster_w = content_w - (merged[i]['mean_x'] - merged[0]['mean_x']) - if cluster_w / content_w < 0.05: - # Absorb into neighbor (prefer left) - if i > 0: - target = merged[i - 1] + # Try gap before the overlapping words + if min_word_left - gap_start_rel >= MIN_GAP_WIDTH: + validated_gaps.append((gap_start_rel, min_word_left)) + logger.debug(f"ColumnGeometry: gap shifted left to avoid word at {min_word_left}") + # Try gap after the overlapping words + elif gap_end_rel - max_word_right >= MIN_GAP_WIDTH: + validated_gaps.append((max_word_right, gap_end_rel)) + logger.debug(f"ColumnGeometry: gap shifted right to avoid word at {max_word_right}") else: - target = merged[i + 1] - target['count'] += merged[i]['count'] - target['min_edge'] = min(target['min_edge'], merged[i]['min_edge']) - target['max_edge'] = max(target['max_edge'], merged[i]['max_edge']) - target['y_min'] = min(target['y_min'], merged[i]['y_min']) - target['y_max'] = max(target['y_max'], merged[i]['y_max']) - target['y_coverage'] = (target['y_max'] - target['y_min']) / content_h if content_h > 0 else 0.0 - del merged[i] - absorbed_count += 1 - else: - i += 1 - if absorbed_count: - logger.info(f"ColumnGeometry: absorbed {absorbed_count} tiny clusters (< 5% width)") + logger.debug(f"ColumnGeometry: gap [{gap_start_rel}..{gap_end_rel}] " + f"discarded (word overlap, no room to shift)") - _merged_summary = [(m['mean_x']+left_x, m['count'], format(m['y_coverage'], '.0%')) for m in merged] - logger.info(f"ColumnGeometry: {len(merged)} clusters after merging (dist={merge_distance}px): {_merged_summary}") + logger.info(f"ColumnGeometry: {len(validated_gaps)} gaps after word validation: " + f"{[(g[0]+left_x, g[1]+left_x, g[1]-g[0]) for g in validated_gaps]}") - if len(merged) < 3: - logger.info("ColumnGeometry: < 3 merged clusters, signaling fallback") - return None + # --- Step 6: Fallback to clustering if too few gaps --- + if len(validated_gaps) < 2: + logger.info("ColumnGeometry: < 2 gaps found, falling back to clustering") + return _detect_columns_by_clustering( + word_dicts, left_edges, edge_word_indices, + content_w, content_h, left_x, right_x, top_y, bottom_y, + ) - # --- Derive column boundaries --- - margin_px = max(6, int(content_w * 0.003)) # ~2mm margin before column start + # --- Step 7: Derive column boundaries from gaps --- + # Sort gaps by position + validated_gaps.sort(key=lambda g: g[0]) + # Identify margin gaps (first and last) vs interior gaps + # A margin gap touches the edge of the content area (within 2% tolerance) + edge_tolerance = max(10, int(content_w * 0.02)) + + is_left_margin = validated_gaps[0][0] <= edge_tolerance + is_right_margin = validated_gaps[-1][1] >= content_w - edge_tolerance + + # Interior gaps define column boundaries + # Column starts at the end of a gap, ends at the start of the next gap col_starts = [] - for m in merged: - abs_start = max(0, left_x + m['min_edge'] - margin_px) - col_starts.append((abs_start, m['count'])) - # Calculate column widths and assign words to columns - geometries = [] - for i, (start_x, count) in enumerate(col_starts): + if is_left_margin: + # First column starts after the left margin gap + first_gap_end = validated_gaps[0][1] + interior_gaps = validated_gaps[1:] + else: + # No left margin gap — first column starts at content left edge + first_gap_end = 0 + interior_gaps = validated_gaps[:] + + if is_right_margin: + # Last gap is right margin — don't use it as column start + interior_gaps_for_boundaries = interior_gaps[:-1] + right_boundary = validated_gaps[-1][0] # last column ends at right margin gap start + else: + interior_gaps_for_boundaries = interior_gaps + right_boundary = content_w + + # First column + col_starts.append(left_x + first_gap_end) + + # Columns between interior gaps + for gap_start_rel, gap_end_rel in interior_gaps_for_boundaries: + col_starts.append(left_x + gap_end_rel) + + # Count words per column region (for logging) + col_start_counts = [] + for i, start_x in enumerate(col_starts): if i + 1 < len(col_starts): - col_width = col_starts[i + 1][0] - start_x + next_start = col_starts[i + 1] + elif is_right_margin: + next_start = left_x + right_boundary else: - col_width = right_x - start_x + next_start = right_x - # Assign words to this column based on left edge + col_left_rel = start_x - left_x + col_right_rel = next_start - left_x + n_words_in_col = sum(1 for w in word_dicts + if col_left_rel <= w['left'] < col_right_rel) + col_start_counts.append((start_x, n_words_in_col)) + + logger.info(f"ColumnGeometry: {len(col_starts)} columns from {len(validated_gaps)} gaps " + f"(left_margin={is_left_margin}, right_margin={is_right_margin}): " + f"{col_start_counts}") + + # --- Step 8: Build ColumnGeometry objects --- + # Determine right edge for each column + all_boundaries = [] + for i, start_x in enumerate(col_starts): + if i + 1 < len(col_starts): + end_x = col_starts[i + 1] + elif is_right_margin: + end_x = left_x + right_boundary + else: + end_x = right_x + all_boundaries.append((start_x, end_x)) + + geometries = [] + for i, (start_x, end_x) in enumerate(all_boundaries): + col_width = end_x - start_x col_left_rel = start_x - left_x col_right_rel = col_left_rel + col_width col_words = [w for w in word_dicts