klausur-service (11 files): - cv_gutter_repair, ocr_pipeline_regression, upload_api - ocr_pipeline_sessions, smart_spell, nru_worksheet_generator - ocr_pipeline_overlays, mail/aggregator, zeugnis_api - cv_syllable_detect, self_rag backend-lehrer (17 files): - classroom_engine/suggestions, generators/quiz_generator - worksheets_api, llm_gateway/comparison, state_engine_api - classroom/models (→ 4 submodules), services/file_processor - alerts_agent/api/wizard+digests+routes, content_generators/pdf - classroom/routes/sessions, llm_gateway/inference - classroom_engine/analytics, auth/keycloak_auth - alerts_agent/processing/rule_engine, ai_processor/print_versions agent-core (5 files): - brain/memory_store, brain/knowledge_graph, brain/context_manager - orchestrator/supervisor, sessions/session_manager admin-lehrer (5 components): - GridOverlay, StepGridReview, DevOpsPipelineSidebar - DataFlowDiagram, sbom/wizard/page website (2 files): - DependencyMap, lehrer/abitur-archiv Other: nibis_ingestion, grid_detection_service, export-doclayout-onnx Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
365 lines
12 KiB
Python
365 lines
12 KiB
Python
"""
|
|
Grid Detection Service v4

Detects table/grid structure from OCR bounding-box data.
Converts pixel coordinates to percentage and mm coordinates (A4 format).
Supports deskew correction, column detection, and multi-line cell handling.

License: Apache 2.0 (commercial use permitted)
|
|
"""
|
|
|
|
import math
|
|
import logging
|
|
from typing import List
|
|
|
|
from .grid_detection_models import (
|
|
A4_WIDTH_MM,
|
|
A4_HEIGHT_MM,
|
|
COLUMN_MARGIN_MM,
|
|
CellStatus,
|
|
ColumnType,
|
|
OCRRegion,
|
|
GridCell,
|
|
GridResult,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class GridDetectionService:
    """Detect grid/table structure from OCR bounding-box regions.

    Region coordinates are percentages of the page (see ``detect_grid``).
    ``convert_tesseract_regions`` turns pixel-based Tesseract word boxes
    into that representation.
    """

    def __init__(self, y_tolerance_pct: float = 1.5, padding_pct: float = 0.3,
                 column_margin_mm: float = COLUMN_MARGIN_MM):
        """Store the tuning parameters.

        Args:
            y_tolerance_pct: Maximum distance (percent) between a region's
                vertical center and the row anchor for the region to be
                placed in that row.
            padding_pct: Padding (percent) applied to detected row and
                column boundaries.
            column_margin_mm: Column margin in millimetres. Only stored on
                the instance; no method of this class reads it.
        """
        self.y_tolerance_pct = y_tolerance_pct
        self.padding_pct = padding_pct
        self.column_margin_mm = column_margin_mm

    def calculate_deskew_angle(self, regions: List[OCRRegion]) -> float:
        """Calculate the page skew angle from OCR region positions.

        Regions whose left edges lie within 3 percent of each other are
        treated as one vertically aligned group. A least-squares line
        ``x = f(y)`` is fitted through the largest such group and its slope
        is converted to an angle using the A4 aspect ratio.

        Sign convention: the angle is positive when left edges drift to the
        right going down the page (x grows with y).

        Returns:
            Angle in degrees, rounded to 2 decimals and clamped to +/-5.
            ``0.0`` when fewer than 3 regions are given, when no group of
            at least 3 aligned regions exists, or when all regions of the
            chosen group share the same y.
        """
        if len(regions) < 3:
            return 0.0

        # Sweep the regions left-to-right and cut a new group whenever a
        # left edge is further than the tolerance from the group's first one.
        x_tolerance = 3.0  # percent
        aligned_groups: List[List[OCRRegion]] = []
        current_group: List[OCRRegion] = []
        for region in sorted(regions, key=lambda r: r.x):
            if current_group and abs(region.x - current_group[0].x) > x_tolerance:
                if len(current_group) >= 3:
                    aligned_groups.append(current_group)
                current_group = [region]
            else:
                current_group.append(region)
        if len(current_group) >= 3:
            aligned_groups.append(current_group)

        if not aligned_groups:
            return 0.0

        # The largest aligned group gives the most reliable estimate.
        best_group = sorted(max(aligned_groups, key=len), key=lambda r: r.y)

        # Ordinary least squares with X as a function of Y.
        n = len(best_group)
        sum_y = sum(r.y for r in best_group)
        sum_x = sum(r.x for r in best_group)
        sum_xy = sum(r.x * r.y for r in best_group)
        sum_y2 = sum(r.y ** 2 for r in best_group)

        denom = n * sum_y2 - sum_y ** 2
        if denom == 0:
            # All points share one y: the slope is undefined.
            return 0.0

        slope = (n * sum_xy - sum_y * sum_x) / denom

        # The slope is dx/dy in percent space. One percent of the width is
        # physically shorter than one percent of the height on A4
        # (210/297 ~ 0.707), so scale before taking the arctangent.
        aspect = A4_WIDTH_MM / A4_HEIGHT_MM
        angle_deg = math.degrees(math.atan(slope * aspect))

        return max(-5.0, min(5.0, round(angle_deg, 2)))

    def apply_deskew_to_regions(self, regions: List[OCRRegion], angle: float) -> List[OCRRegion]:
        """Apply deskew correction to region coordinates.

        Rotates every region center around the page center so that a skew
        of ``angle`` degrees (as reported by ``calculate_deskew_angle``) is
        removed. Region width and height are left untouched.

        The rotation is carried out in millimetre space (A4) rather than in
        percent space: percent units are not isotropic, so rotating them
        directly would not match the aspect-corrected angle. With the sign
        convention of ``calculate_deskew_angle`` (positive = x grows with
        y), the drift is cancelled by the standard rotation matrix applied
        with ``+angle``.

        Args:
            regions: Regions with percentage-based coordinates.
            angle: Skew angle in degrees.

        Returns:
            The input list itself when ``abs(angle) < 0.01``; otherwise a
            new list of new ``OCRRegion`` objects with x/y rounded to
            2 decimals.
        """
        if abs(angle) < 0.01:
            return regions

        theta = math.radians(angle)
        cos_t = math.cos(theta)
        sin_t = math.sin(theta)

        # Page center in percent, and percent -> mm scale factors.
        cx, cy = 50.0, 50.0
        mm_per_pct_x = A4_WIDTH_MM / 100.0
        mm_per_pct_y = A4_HEIGHT_MM / 100.0

        result = []
        for r in regions:
            # Offset of the region center from the page center, in mm.
            dx_mm = (r.center_x - cx) * mm_per_pct_x
            dy_mm = (r.center_y - cy) * mm_per_pct_y

            # Standard 2D rotation, then back to percent coordinates.
            new_cx = (dx_mm * cos_t - dy_mm * sin_t) / mm_per_pct_x + cx
            new_cy = (dx_mm * sin_t + dy_mm * cos_t) / mm_per_pct_y + cy

            result.append(OCRRegion(
                text=r.text,
                confidence=r.confidence,
                x=round(new_cx - r.width / 2, 2),
                y=round(new_cy - r.height / 2, 2),
                width=r.width,
                height=r.height,
            ))

        return result

    def _group_regions_into_rows(self, regions: List[OCRRegion]) -> List[List[OCRRegion]]:
        """Group regions by Y position into rows.

        Regions are visited in order of their top edge. A region joins the
        current row while its vertical center is within
        ``self.y_tolerance_pct`` of the center of the row's first region;
        otherwise it starts a new row. Each row is sorted left-to-right.
        """
        if not regions:
            return []

        ordered = sorted(regions, key=lambda r: r.y)
        rows: List[List[OCRRegion]] = []
        current_row = [ordered[0]]
        anchor_y = ordered[0].center_y

        for region in ordered[1:]:
            if abs(region.center_y - anchor_y) <= self.y_tolerance_pct:
                current_row.append(region)
                continue
            current_row.sort(key=lambda r: r.x)
            rows.append(current_row)
            current_row = [region]
            anchor_y = region.center_y

        current_row.sort(key=lambda r: r.x)
        rows.append(current_row)
        return rows

    def _detect_column_boundaries(self, rows: List[List[OCRRegion]]) -> List[float]:
        """Detect column boundaries from row data.

        The left edges of all regions are clustered: a gap of more than
        5 percent between consecutive sorted x values starts a new cluster.
        Each cluster's smallest x minus ``self.padding_pct`` becomes a
        column start, and ``100.0`` is appended as the right page boundary.

        Returns:
            Ascending boundary list (column starts plus the right edge),
            or an empty list when there are no regions.
        """
        all_x = sorted(r.x for row in rows for r in row)
        if not all_x:
            return []

        # Gap-based clustering over the sorted left edges.
        min_gap = 5.0  # percent
        clusters: List[List[float]] = [[all_x[0]]]
        for x in all_x[1:]:
            if x - clusters[-1][-1] > min_gap:
                clusters.append([x])
            else:
                clusters[-1].append(x)

        boundaries = [min(cluster) - self.padding_pct for cluster in clusters]
        boundaries.append(100.0)  # right page edge
        return boundaries

    def _assign_column_types(self, boundaries: List[float]) -> List[str]:
        """Assign column types based on position.

        The first three columns are labelled ENGLISH, GERMAN and EXAMPLE
        (in that order); any further column is UNKNOWN.

        Returns:
            One ``ColumnType`` value per column, i.e.
            ``len(boundaries) - 1`` entries (never fewer than zero).
        """
        num_cols = max(0, len(boundaries) - 1)
        type_map = [ColumnType.ENGLISH, ColumnType.GERMAN, ColumnType.EXAMPLE]
        return [
            type_map[i].value if i < len(type_map) else ColumnType.UNKNOWN.value
            for i in range(num_cols)
        ]

    def detect_grid(self, regions: List[OCRRegion]) -> GridResult:
        """Detect grid structure from OCR regions.

        Pipeline: estimate and remove page skew, group regions into rows,
        derive column boundaries, then build one ``GridCell`` per
        (row, column) pair. A region belongs to the column that contains
        its horizontal center. Cells with regions get the joined text, the
        mean confidence and the tight bounding box of their regions; cells
        without regions are marked EMPTY and span the row/column box.

        Args:
            regions: List of OCR regions with percentage-based coordinates.

        Returns:
            GridResult with detected rows, columns, and cells, plus
            boundary lists, the deskew angle and recognition statistics.
        """
        if not regions:
            return GridResult(stats={"recognized": 0, "problematic": 0, "empty": 0, "manual": 0, "total": 0, "coverage": 0.0})

        # Step 1: Calculate and apply deskew
        deskew_angle = self.calculate_deskew_angle(regions)
        corrected_regions = self.apply_deskew_to_regions(regions, deskew_angle)

        # Step 2: Group into rows
        rows = self._group_regions_into_rows(corrected_regions)

        # Step 3: Detect column boundaries
        col_boundaries = self._detect_column_boundaries(rows)
        column_types = self._assign_column_types(col_boundaries)
        num_cols = max(1, len(col_boundaries) - 1)
        # Enum form of the per-column types, resolved once instead of per cell.
        column_type_enums = [ColumnType(value) for value in column_types]

        # Step 4: Build cell grid
        num_rows = len(rows)
        row_boundaries = []
        cells = []

        recognized = 0
        problematic = 0
        empty = 0

        for row_idx, row_regions in enumerate(rows):
            # Row Y boundary
            if row_regions:
                row_y = min(r.y for r in row_regions) - self.padding_pct
                row_bottom = max(r.bottom for r in row_regions) + self.padding_pct
            else:
                # Defensive fallback: spread region-less rows evenly.
                row_y = row_idx / num_rows * 100
                row_bottom = (row_idx + 1) / num_rows * 100

            row_boundaries.append(row_y)
            row_height = row_bottom - row_y

            row_cells = []
            for col_idx in range(num_cols):
                col_x = col_boundaries[col_idx]
                col_right = col_boundaries[col_idx + 1] if col_idx + 1 < len(col_boundaries) else 100.0
                col_width = col_right - col_x
                if col_idx < len(column_type_enums):
                    col_type = column_type_enums[col_idx]
                else:
                    col_type = ColumnType.UNKNOWN

                # Columns are half-open [col_x, col_right), except the last
                # one, which also takes regions centered at or beyond the
                # right edge so that no OCR text is silently dropped.
                is_last_col = col_idx == num_cols - 1
                cell_regions = [
                    r for r in row_regions
                    if col_x <= r.center_x and (is_last_col or r.center_x < col_right)
                ]

                if cell_regions:
                    text = " ".join(r.text for r in cell_regions)
                    avg_conf = sum(r.confidence for r in cell_regions) / len(cell_regions)
                    status = CellStatus.RECOGNIZED if avg_conf >= 0.5 else CellStatus.PROBLEMATIC
                    # Use actual bounding box from regions
                    actual_x = min(r.x for r in cell_regions)
                    actual_y = min(r.y for r in cell_regions)
                    actual_right = max(r.right for r in cell_regions)
                    actual_bottom = max(r.bottom for r in cell_regions)

                    cell = GridCell(
                        row=row_idx,
                        col=col_idx,
                        x=actual_x,
                        y=actual_y,
                        width=actual_right - actual_x,
                        height=actual_bottom - actual_y,
                        text=text,
                        confidence=round(avg_conf, 3),
                        status=status,
                        column_type=col_type,
                        logical_row=row_idx,
                        logical_col=col_idx,
                    )

                    if status == CellStatus.RECOGNIZED:
                        recognized += 1
                    else:
                        problematic += 1
                else:
                    cell = GridCell(
                        row=row_idx,
                        col=col_idx,
                        x=col_x,
                        y=row_y,
                        width=col_width,
                        height=row_height,
                        status=CellStatus.EMPTY,
                        column_type=col_type,
                        logical_row=row_idx,
                        logical_col=col_idx,
                    )
                    empty += 1

                row_cells.append(cell)
            cells.append(row_cells)

        # Add final row boundary
        if rows and rows[-1]:
            row_boundaries.append(max(r.bottom for r in rows[-1]) + self.padding_pct)
        else:
            row_boundaries.append(100.0)

        total = num_rows * num_cols
        # Share of cells that contain any OCR text.
        coverage = (recognized + problematic) / max(total, 1)

        return GridResult(
            rows=num_rows,
            columns=num_cols,
            cells=cells,
            column_types=column_types,
            column_boundaries=col_boundaries,
            row_boundaries=row_boundaries,
            deskew_angle=deskew_angle,
            stats={
                "recognized": recognized,
                "problematic": problematic,
                "empty": empty,
                "manual": 0,
                "total": total,
                "coverage": round(coverage, 3),
            },
        )

    def convert_tesseract_regions(self, tess_words: List[dict],
                                  image_width: int, image_height: int) -> List[OCRRegion]:
        """Convert Tesseract word data (pixels) to OCRRegions (percentages).

        Each word dict must provide ``text``, ``left``, ``top``, ``width``
        and ``height``; ``conf`` is optional and defaults to 50. The
        confidence is read on a 0-100 scale, converted to 0-1 and clamped
        to that range, so a ``conf`` of -1 yields ``0.0`` instead of a
        negative confidence. Numeric strings are accepted for ``conf``.

        Args:
            tess_words: Word list from tesseract_vocab_extractor.extract_bounding_boxes.
            image_width: Image width in pixels.
            image_height: Image height in pixels.

        Returns:
            List of OCRRegion with percentage-based coordinates; empty when
            there are no words or an image dimension is not positive.
        """
        if not tess_words or image_width <= 0 or image_height <= 0:
            return []

        regions = []
        for w in tess_words:
            confidence = float(w.get("conf", 50)) / 100.0
            regions.append(OCRRegion(
                text=w["text"],
                confidence=max(0.0, min(1.0, confidence)),
                x=w["left"] / image_width * 100,
                y=w["top"] / image_height * 100,
                width=w["width"] / image_width * 100,
                height=w["height"] / image_height * 100,
            ))

        return regions
|