"""
Grid Detection Service v4

Detects table/grid structure from OCR bounding-box data.
Converts pixel coordinates to percentage and mm coordinates (A4 format).
Supports deskew correction, column detection, and multi-line cell handling.

License: Apache 2.0 (commercial use permitted)
"""

import logging
import math
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)

# A4 dimensions
A4_WIDTH_MM = 210.0
A4_HEIGHT_MM = 297.0

# Column margin (1mm)
COLUMN_MARGIN_MM = 1.0
COLUMN_MARGIN_PCT = (COLUMN_MARGIN_MM / A4_WIDTH_MM) * 100


class CellStatus(str, Enum):
    """Recognition state of a single grid cell."""

    EMPTY = "empty"
    RECOGNIZED = "recognized"
    PROBLEMATIC = "problematic"
    MANUAL = "manual"


class ColumnType(str, Enum):
    """Semantic role assigned to a grid column."""

    ENGLISH = "english"
    GERMAN = "german"
    EXAMPLE = "example"
    UNKNOWN = "unknown"


@dataclass
class OCRRegion:
    """A word/phrase detected by OCR with bounding box coordinates in percentage (0-100)."""

    text: str
    confidence: float
    x: float  # X position as percentage of page width
    y: float  # Y position as percentage of page height
    width: float  # Width as percentage of page width
    height: float  # Height as percentage of page height

    @property
    def x_mm(self) -> float:
        """Left edge in millimetres on an A4 page (1 decimal)."""
        return round(self.x / 100 * A4_WIDTH_MM, 1)

    @property
    def y_mm(self) -> float:
        """Top edge in millimetres on an A4 page (1 decimal)."""
        return round(self.y / 100 * A4_HEIGHT_MM, 1)

    @property
    def width_mm(self) -> float:
        """Width in millimetres on an A4 page (1 decimal)."""
        return round(self.width / 100 * A4_WIDTH_MM, 1)

    @property
    def height_mm(self) -> float:
        """Height in millimetres on an A4 page (2 decimals)."""
        return round(self.height / 100 * A4_HEIGHT_MM, 2)

    @property
    def center_x(self) -> float:
        """Horizontal centre of the box, in percent of page width."""
        return self.x + self.width / 2

    @property
    def center_y(self) -> float:
        """Vertical centre of the box, in percent of page height."""
        return self.y + self.height / 2

    @property
    def right(self) -> float:
        """Right edge of the box, in percent of page width."""
        return self.x + self.width

    @property
    def bottom(self) -> float:
        """Bottom edge of the box, in percent of page height."""
        return self.y + self.height


@dataclass
class GridCell:
    """A cell in the detected grid with coordinates in percentage (0-100)."""

    row: int
    col: int
    x: float
    y: float
    width: float
    height: float
    text: str = ""
    confidence: float = 0.0
    status: CellStatus = CellStatus.EMPTY
    column_type: ColumnType = ColumnType.UNKNOWN
    logical_row: int = 0
    logical_col: int = 0
    is_continuation: bool = False

    @property
    def x_mm(self) -> float:
        """Left edge in millimetres on an A4 page (1 decimal)."""
        return round(self.x / 100 * A4_WIDTH_MM, 1)

    @property
    def y_mm(self) -> float:
        """Top edge in millimetres on an A4 page (1 decimal)."""
        return round(self.y / 100 * A4_HEIGHT_MM, 1)

    @property
    def width_mm(self) -> float:
        """Width in millimetres on an A4 page (1 decimal)."""
        return round(self.width / 100 * A4_WIDTH_MM, 1)

    @property
    def height_mm(self) -> float:
        """Height in millimetres on an A4 page (2 decimals)."""
        return round(self.height / 100 * A4_HEIGHT_MM, 2)

    def to_dict(self) -> dict:
        """Serialize the cell, including derived mm coordinates, to a plain dict."""
        return {
            "row": self.row,
            "col": self.col,
            "x": round(self.x, 2),
            "y": round(self.y, 2),
            "width": round(self.width, 2),
            "height": round(self.height, 2),
            "x_mm": self.x_mm,
            "y_mm": self.y_mm,
            "width_mm": self.width_mm,
            "height_mm": self.height_mm,
            "text": self.text,
            "confidence": self.confidence,
            "status": self.status.value,
            "column_type": self.column_type.value,
            "logical_row": self.logical_row,
            "logical_col": self.logical_col,
            "is_continuation": self.is_continuation,
        }


@dataclass
class GridResult:
    """Result of grid detection."""

    rows: int = 0
    columns: int = 0
    cells: List[List[GridCell]] = field(default_factory=list)
    column_types: List[str] = field(default_factory=list)
    column_boundaries: List[float] = field(default_factory=list)
    row_boundaries: List[float] = field(default_factory=list)
    deskew_angle: float = 0.0
    stats: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict:
        """Serialize the result (cells row by row) plus the A4 page dimensions."""
        return {
            "rows": self.rows,
            "columns": self.columns,
            "cells": [[c.to_dict() for c in row_cells] for row_cells in self.cells],
            "column_types": self.column_types,
            "column_boundaries": [round(b, 2) for b in self.column_boundaries],
            "row_boundaries": [round(b, 2) for b in self.row_boundaries],
            "deskew_angle": round(self.deskew_angle, 2),
            "stats": self.stats,
            "page_dimensions": {
                "width_mm": A4_WIDTH_MM,
                "height_mm": A4_HEIGHT_MM,
                "format": "A4",
            },
        }


class GridDetectionService:
    """Detect grid/table structure from OCR bounding-box regions."""

    def __init__(self, y_tolerance_pct: float = 1.5, padding_pct: float = 0.3,
                 column_margin_mm: float = COLUMN_MARGIN_MM):
        """Store the tuning parameters.

        Args:
            y_tolerance_pct: Maximum vertical-centre distance (percent of page
                height) for two regions to share a row.
            padding_pct: Padding (percent) added around detected rows/columns.
            column_margin_mm: Column margin in millimetres (stored only; not
                used by the methods in this class).
        """
        self.y_tolerance_pct = y_tolerance_pct
        self.padding_pct = padding_pct
        self.column_margin_mm = column_margin_mm

    def calculate_deskew_angle(self, regions: List[OCRRegion]) -> float:
        """Calculate page skew angle from OCR region positions.

        Uses left-edge alignment of regions to detect consistent tilt: the
        largest group of regions with similar X is fitted with a least-squares
        line X = f(Y). A positive angle means left edges drift to the right
        further down the page.

        Returns:
            Angle in degrees, rounded to 2 decimals and clamped to ±5°.
            0.0 when fewer than 3 regions are aligned or they have no
            vertical spread.
        """
        if len(regions) < 3:
            return 0.0

        # Group by similar X position (same column)
        sorted_by_x = sorted(regions, key=lambda r: r.x)

        # Find regions that are vertically aligned (similar X)
        x_tolerance = 3.0  # percent
        aligned_groups: List[List[OCRRegion]] = []
        current_group = [sorted_by_x[0]]
        for r in sorted_by_x[1:]:
            if abs(r.x - current_group[0].x) <= x_tolerance:
                current_group.append(r)
            else:
                if len(current_group) >= 3:
                    aligned_groups.append(current_group)
                current_group = [r]
        if len(current_group) >= 3:
            aligned_groups.append(current_group)

        if not aligned_groups:
            return 0.0

        # Use the largest aligned group to calculate skew
        best_group = max(aligned_groups, key=len)

        # Linear regression: X as function of Y. The mean-centred form avoids
        # the catastrophic cancellation of n*sum(y^2) - sum(y)^2 when all Y
        # values are (nearly) identical.
        n = len(best_group)
        mean_x = sum(r.x for r in best_group) / n
        mean_y = sum(r.y for r in best_group) / n
        var_y = sum((r.y - mean_y) ** 2 for r in best_group)
        if var_y < 1e-9:
            # No vertical spread -> slope is undefined.
            return 0.0
        cov_xy = sum((r.x - mean_x) * (r.y - mean_y) for r in best_group)
        slope = cov_xy / var_y

        # Convert slope to angle (slope is dx/dy in percent space)
        # Adjust for aspect ratio: A4 is 210/297 ≈ 0.707
        aspect = A4_WIDTH_MM / A4_HEIGHT_MM
        angle_deg = math.degrees(math.atan(slope * aspect))

        # Clamp to ±5°
        return max(-5.0, min(5.0, round(angle_deg, 2)))

    def apply_deskew_to_regions(self, regions: List[OCRRegion], angle: float) -> List[OCRRegion]:
        """Apply deskew correction to region coordinates.

        Rotates every region centre around the page centre so that a skew of
        ``angle`` degrees, as measured by ``calculate_deskew_angle``, is
        cancelled. The rotation is carried out in millimetre space (A4) and
        converted back to percent, because percent units are not isotropic
        (1% of width != 1% of height). Region width/height are kept as-is.

        Args:
            regions: Regions with percentage coordinates.
            angle: Skew angle in degrees (sign convention of
                ``calculate_deskew_angle``).

        Returns:
            New list of corrected regions, or the input list itself when the
            angle is negligible (|angle| < 0.01).
        """
        if abs(angle) < 0.01:
            return regions

        # With dx/dy = tan(angle) in mm space, rotating by +angle using the
        # matrix [cos -sin; sin cos] maps the skewed column direction
        # (tan(angle), 1) onto the vertical axis.
        theta = math.radians(angle)
        cos_a = math.cos(theta)
        sin_a = math.sin(theta)

        # Percent -> mm scale factors; page centre is (50%, 50%).
        x_scale = A4_WIDTH_MM / 100.0
        y_scale = A4_HEIGHT_MM / 100.0
        cx, cy = 50.0, 50.0

        result = []
        for r in regions:
            # Offset of the region centre from the page centre, in mm
            dx_mm = (r.center_x - cx) * x_scale
            dy_mm = (r.center_y - cy) * y_scale
            new_cx = (dx_mm * cos_a - dy_mm * sin_a) / x_scale + cx
            new_cy = (dx_mm * sin_a + dy_mm * cos_a) / y_scale + cy
            result.append(OCRRegion(
                text=r.text,
                confidence=r.confidence,
                x=round(new_cx - r.width / 2, 2),
                y=round(new_cy - r.height / 2, 2),
                width=r.width,
                height=r.height,
            ))
        return result

    def _group_regions_into_rows(self, regions: List[OCRRegion]) -> List[List[OCRRegion]]:
        """Group regions by vertical centre into rows.

        Regions are visited in order of ``center_y``; a region joins the
        current row when its centre is within ``y_tolerance_pct`` of the
        centre of the row's first region. Each row is sorted left to right.
        """
        if not regions:
            return []

        # Sort by the same key that is used for clustering; sorting by the top
        # edge would visit regions of differing heights out of centre order
        # and split rows that belong together.
        sorted_regions = sorted(regions, key=lambda r: r.center_y)
        rows: List[List[OCRRegion]] = []
        current_row = [sorted_regions[0]]
        current_y = sorted_regions[0].center_y

        for r in sorted_regions[1:]:
            if abs(r.center_y - current_y) <= self.y_tolerance_pct:
                current_row.append(r)
            else:
                current_row.sort(key=lambda reg: reg.x)
                rows.append(current_row)
                current_row = [r]
                current_y = r.center_y

        current_row.sort(key=lambda reg: reg.x)
        rows.append(current_row)
        return rows

    def _detect_column_boundaries(self, rows: List[List[OCRRegion]]) -> List[float]:
        """Detect column boundaries from row data.

        Left edges of all regions are clustered by gaps larger than 5% of the
        page width. Returns the (padded) start of every cluster followed by a
        final right boundary at 100.0, or an empty list when there is no data.
        """
        # Collect all X starting positions
        all_x = sorted(r.x for row in rows for r in row)
        if not all_x:
            return []

        # Gap-based clustering
        min_gap = 5.0  # percent
        clusters: List[List[float]] = []
        current = [all_x[0]]
        for x in all_x[1:]:
            if x - current[-1] > min_gap:
                clusters.append(current)
                current = [x]
            else:
                current.append(x)
        clusters.append(current)

        # Column boundaries: start of each cluster (values are sorted, so the
        # first element is the minimum)
        boundaries = [c[0] - self.padding_pct for c in clusters]
        # Add right boundary
        boundaries.append(100.0)
        return boundaries

    def _assign_column_types(self, boundaries: List[float]) -> List[str]:
        """Assign column types based on position.

        The first three columns are english/german/example; any further
        column is "unknown".
        """
        num_cols = max(0, len(boundaries) - 1)
        type_map = [ColumnType.ENGLISH, ColumnType.GERMAN, ColumnType.EXAMPLE]
        return [
            type_map[i].value if i < len(type_map) else ColumnType.UNKNOWN.value
            for i in range(num_cols)
        ]

    @staticmethod
    def _column_index(center_x: float, boundaries: List[float], num_cols: int) -> int:
        """Return the column whose horizontal span contains ``center_x``.

        Centres left of the first boundary map to column 0 and centres at or
        beyond the last boundary map to the last column, so no region is lost.
        """
        for idx in range(num_cols - 1):
            if center_x < boundaries[idx + 1]:
                return idx
        return num_cols - 1

    def detect_grid(self, regions: List[OCRRegion]) -> GridResult:
        """Detect grid structure from OCR regions.

        Args:
            regions: List of OCR regions with percentage-based coordinates.

        Returns:
            GridResult with detected rows, columns, and cells.
        """
        if not regions:
            return GridResult(stats={"recognized": 0, "problematic": 0, "empty": 0,
                                     "manual": 0, "total": 0, "coverage": 0.0})

        # Step 1: Calculate and apply deskew
        deskew_angle = self.calculate_deskew_angle(regions)
        corrected_regions = self.apply_deskew_to_regions(regions, deskew_angle)

        # Step 2: Group into rows
        rows = self._group_regions_into_rows(corrected_regions)

        # Step 3: Detect column boundaries
        col_boundaries = self._detect_column_boundaries(rows)
        column_types = self._assign_column_types(col_boundaries)
        num_cols = max(1, len(col_boundaries) - 1)

        # Step 4: Build cell grid
        num_rows = len(rows)
        row_boundaries = []
        cells = []
        recognized = 0
        problematic = 0
        empty = 0

        for row_idx, row_regions in enumerate(rows):
            # Row Y boundary
            if row_regions:
                row_y = min(r.y for r in row_regions) - self.padding_pct
                row_bottom = max(r.bottom for r in row_regions) + self.padding_pct
            else:
                row_y = row_idx / num_rows * 100
                row_bottom = (row_idx + 1) / num_rows * 100
            row_boundaries.append(row_y)
            row_height = row_bottom - row_y

            # Bucket the row's regions by column once, instead of rescanning
            # the whole row for every column.
            buckets: List[List[OCRRegion]] = [[] for _ in range(num_cols)]
            for r in row_regions:
                buckets[self._column_index(r.center_x, col_boundaries, num_cols)].append(r)

            row_cells = []
            for col_idx, cell_regions in enumerate(buckets):
                if col_idx < len(column_types):
                    column_type = ColumnType(column_types[col_idx])
                else:
                    column_type = ColumnType.UNKNOWN

                if cell_regions:
                    text = " ".join(r.text for r in cell_regions)
                    avg_conf = sum(r.confidence for r in cell_regions) / len(cell_regions)
                    status = CellStatus.RECOGNIZED if avg_conf >= 0.5 else CellStatus.PROBLEMATIC
                    # Use actual bounding box from regions
                    actual_x = min(r.x for r in cell_regions)
                    actual_y = min(r.y for r in cell_regions)
                    actual_right = max(r.right for r in cell_regions)
                    actual_bottom = max(r.bottom for r in cell_regions)
                    cell = GridCell(
                        row=row_idx,
                        col=col_idx,
                        x=actual_x,
                        y=actual_y,
                        width=actual_right - actual_x,
                        height=actual_bottom - actual_y,
                        text=text,
                        confidence=round(avg_conf, 3),
                        status=status,
                        column_type=column_type,
                        logical_row=row_idx,
                        logical_col=col_idx,
                    )
                    if status == CellStatus.RECOGNIZED:
                        recognized += 1
                    else:
                        problematic += 1
                else:
                    # Empty cell: fall back to the column/row slot geometry.
                    col_x = col_boundaries[col_idx] if col_idx < len(col_boundaries) else 0.0
                    if col_idx + 1 < len(col_boundaries):
                        col_right = col_boundaries[col_idx + 1]
                    else:
                        col_right = 100.0
                    cell = GridCell(
                        row=row_idx,
                        col=col_idx,
                        x=col_x,
                        y=row_y,
                        width=col_right - col_x,
                        height=row_height,
                        status=CellStatus.EMPTY,
                        column_type=column_type,
                        logical_row=row_idx,
                        logical_col=col_idx,
                    )
                    empty += 1
                row_cells.append(cell)
            cells.append(row_cells)

        # Add final row boundary
        if rows and rows[-1]:
            row_boundaries.append(max(r.bottom for r in rows[-1]) + self.padding_pct)
        else:
            row_boundaries.append(100.0)

        total = num_rows * num_cols
        coverage = (recognized + problematic) / max(total, 1)

        return GridResult(
            rows=num_rows,
            columns=num_cols,
            cells=cells,
            column_types=column_types,
            column_boundaries=col_boundaries,
            row_boundaries=row_boundaries,
            deskew_angle=deskew_angle,
            stats={
                "recognized": recognized,
                "problematic": problematic,
                "empty": empty,
                "manual": 0,
                "total": total,
                "coverage": round(coverage, 3),
            },
        )

    def convert_tesseract_regions(self, tess_words: List[dict], image_width: int,
                                  image_height: int) -> List[OCRRegion]:
        """Convert Tesseract word data (pixels) to OCRRegions (percentages).

        Args:
            tess_words: Word list from tesseract_vocab_extractor.extract_bounding_boxes.
                Each entry must provide "text", "left", "top", "width" and
                "height"; "conf" (0-100) is optional and defaults to 50.
            image_width: Image width in pixels.
            image_height: Image height in pixels.

        Returns:
            List of OCRRegion with percentage-based coordinates. Empty when
            there are no words or the image dimensions are not positive.
        """
        if not tess_words or image_width <= 0 or image_height <= 0:
            return []

        regions = []
        for w in tess_words:
            # Tesseract reports conf = -1 for boxes without recognized text;
            # clamp so downstream averages stay within [0, 1].
            confidence = min(1.0, max(0.0, float(w.get("conf", 50)) / 100.0))
            regions.append(OCRRegion(
                text=w["text"],
                confidence=confidence,
                x=w["left"] / image_width * 100,
                y=w["top"] / image_height * 100,
                width=w["width"] / image_width * 100,
                height=w["height"] / image_height * 100,
            ))
        return regions