diff --git a/klausur-service/backend/cv_box_layout.py b/klausur-service/backend/cv_box_layout.py index 94ac51b..e1dd6fa 100644 --- a/klausur-service/backend/cv_box_layout.py +++ b/klausur-service/backend/cv_box_layout.py @@ -257,49 +257,8 @@ def build_box_zone_grid( global_columns=None, # detect columns independently ) - # --- Detect spanning cells --- - # If a row has fewer word-blocks than columns, some cells span multiple - # columns. Detect this and mark them as spanning_header so the frontend - # renders them correctly (single cell across the row). - columns = result.get("columns", []) - cells = result.get("cells", []) - rows = result.get("rows", []) - - if len(columns) >= 2: - # Group original words by row - from grid_editor_helpers import _cluster_rows as _cr - row_data = _cr(zone_words) - row_word_map: Dict[int, List[Dict]] = {} - for w in zone_words: - yc = w["top"] + w["height"] / 2 - best = min(row_data, key=lambda r: abs(r["y_center"] - yc)) - row_word_map.setdefault(best["index"], []).append(w) - - for row in rows: - ri = row.get("index", row.get("row_index", -1)) - rw = row_word_map.get(ri, []) - row_cells = [c for c in cells if c.get("row_index") == ri] - - # If this row has fewer word-blocks than columns, it's a spanning row - if 0 < len(rw) < len(columns): - # Merge all cell texts and mark as spanning - merged_text = " ".join( - c.get("text", "") for c in sorted(row_cells, key=lambda c: c.get("col_index", 0)) - ).strip() - # Remove existing cells for this row - cells = [c for c in cells if c.get("row_index") != ri] - # Add single spanning cell - cells.append({ - "cell_id": f"Z{zone_index}_R{ri:02d}_C0", - "row_index": ri, - "col_index": 0, - "col_type": "spanning_header", - "text": merged_text, - "word_boxes": rw, - }) - logger.info("Box zone %d row %d: merged %d cells into spanning cell", zone_index, ri, len(row_cells)) - - result["cells"] = cells + # Colspan detection is now handled generically by _detect_colspan_cells + # in grid_editor_helpers.py (called inside _build_zone_grid). result["box_layout_type"] = layout_type result["box_grid_reviewed"] = False diff --git a/klausur-service/backend/grid_editor_helpers.py b/klausur-service/backend/grid_editor_helpers.py index cb8b89c..90c55b9 100644 --- a/klausur-service/backend/grid_editor_helpers.py +++ b/klausur-service/backend/grid_editor_helpers.py @@ -1218,6 +1218,141 @@ def _detect_header_rows( return headers +def _detect_colspan_cells( + zone_words: List[Dict], + columns: List[Dict], + rows: List[Dict], + cells: List[Dict], + img_w: int, + img_h: int, +) -> List[Dict]: + """Detect and merge cells that span multiple columns (colspan). + + A word-block (PaddleOCR phrase) that extends significantly past a column + boundary into the next column indicates a merged cell. This replaces + the incorrectly split cells with a single cell spanning multiple columns. + + Works for both full-page scans and box zones. + """ + if len(columns) < 2 or not zone_words or not rows: + return cells + + from cv_words_first import _assign_word_to_row + + # Column boundaries (midpoints between adjacent columns) + col_boundaries = [] + for ci in range(len(columns) - 1): + col_boundaries.append((columns[ci]["x_max"] + columns[ci + 1]["x_min"]) / 2) + + def _cols_covered(w_left: float, w_right: float) -> List[int]: + """Return list of column indices that a word-block covers.""" + covered = [] + for col in columns: + col_mid = (col["x_min"] + col["x_max"]) / 2 + # Word covers a column if it extends past the column's midpoint + if w_left < col_mid < w_right: + covered.append(col["index"]) + # Also include column if word starts within it + elif col["x_min"] <= w_left < col["x_max"]: + covered.append(col["index"]) + return sorted(set(covered)) + + # Group original word-blocks by row + row_word_blocks: Dict[int, List[Dict]] = {} + for w in zone_words: + ri = _assign_word_to_row(w, rows) + row_word_blocks.setdefault(ri, []).append(w) + + # For each row, check if any word-block spans multiple columns + rows_to_merge: Dict[int, List[Dict]] = {} # row_index → list of spanning word-blocks + + for ri, wblocks in row_word_blocks.items(): + spanning = [] + for w in wblocks: + w_left = w["left"] + w_right = w_left + w["width"] + covered = _cols_covered(w_left, w_right) + if len(covered) >= 2: + spanning.append({"word": w, "cols": covered}) + if spanning: + rows_to_merge[ri] = spanning + + if not rows_to_merge: + return cells + + # Merge cells for spanning rows + new_cells = [] + for cell in cells: + ri = cell.get("row_index", -1) + if ri not in rows_to_merge: + new_cells.append(cell) + continue + + # Check if this cell's column is part of a spanning block + ci = cell.get("col_index", -1) + is_part_of_span = False + for span in rows_to_merge[ri]: + if ci in span["cols"]: + is_part_of_span = True + # Only emit the merged cell for the FIRST column in the span + if ci == span["cols"][0]: + # Collect all cells in this span + span_cells = [c for c in cells + if c.get("row_index") == ri + and c.get("col_index") in span["cols"]] + # Merge texts (skip if same text repeated) + texts = [] + for sc in sorted(span_cells, key=lambda c: c.get("col_index", 0)): + t = sc.get("text", "").strip() + if t and t not in texts: + texts.append(t) + merged_text = " ".join(texts) + + # Collect all word_boxes + all_wb = [] + for sc in span_cells: + all_wb.extend(sc.get("word_boxes", [])) + + # Compute merged bbox + if all_wb: + x_min = min(wb["left"] for wb in all_wb) + y_min = min(wb["top"] for wb in all_wb) + x_max = max(wb["left"] + wb["width"] for wb in all_wb) + y_max = max(wb["top"] + wb["height"] for wb in all_wb) + else: + x_min = y_min = x_max = y_max = 0 + + new_cells.append({ + "cell_id": cell["cell_id"], + "row_index": ri, + "col_index": span["cols"][0], + "col_type": "spanning_header", + "colspan": len(span["cols"]), + "text": merged_text, + "confidence": cell.get("confidence", 0), + "bbox_px": {"x": x_min, "y": y_min, + "w": x_max - x_min, "h": y_max - y_min}, + "bbox_pct": { + "x": round(x_min / img_w * 100, 2) if img_w else 0, + "y": round(y_min / img_h * 100, 2) if img_h else 0, + "w": round((x_max - x_min) / img_w * 100, 2) if img_w else 0, + "h": round((y_max - y_min) / img_h * 100, 2) if img_h else 0, + }, + "word_boxes": all_wb, + "ocr_engine": cell.get("ocr_engine", ""), + "is_bold": cell.get("is_bold", False), + }) + logger.info( + "colspan detected: row %d, cols %s → merged %d cells (%r)", + ri, span["cols"], len(span_cells), merged_text[:50], + ) + break + if not is_part_of_span: + new_cells.append(cell) + + return new_cells + + def _build_zone_grid( zone_words: List[Dict], zone_x: int, @@ -1295,6 +1430,13 @@ def _build_zone_grid( # Build cells cells = _build_cells(zone_words, columns, rows, img_w, img_h) + # --- Detect colspan (merged cells spanning multiple columns) --- + # A word-block that extends across column boundaries indicates a merged + # cell (like Excel cell-merge). Detect these and replace the split + # cells with a single spanning cell. + if len(columns) >= 2: + cells = _detect_colspan_cells(zone_words, columns, rows, cells, img_w, img_h) + # Prefix cell IDs with zone index for cell in cells: cell["cell_id"] = f"Z{zone_index}_{cell['cell_id']}"