Files
breakpilot-lehrer/klausur-service/backend/services/grid_detection_service.py
Benjamin Admin bd4b956e3c [split-required] Split final 43 files (500-668 LOC) to complete refactoring
klausur-service (11 files):
- cv_gutter_repair, ocr_pipeline_regression, upload_api
- ocr_pipeline_sessions, smart_spell, nru_worksheet_generator
- ocr_pipeline_overlays, mail/aggregator, zeugnis_api
- cv_syllable_detect, self_rag

backend-lehrer (17 files):
- classroom_engine/suggestions, generators/quiz_generator
- worksheets_api, llm_gateway/comparison, state_engine_api
- classroom/models (→ 4 submodules), services/file_processor
- alerts_agent/api/wizard+digests+routes, content_generators/pdf
- classroom/routes/sessions, llm_gateway/inference
- classroom_engine/analytics, auth/keycloak_auth
- alerts_agent/processing/rule_engine, ai_processor/print_versions

agent-core (5 files):
- brain/memory_store, brain/knowledge_graph, brain/context_manager
- orchestrator/supervisor, sessions/session_manager

admin-lehrer (5 components):
- GridOverlay, StepGridReview, DevOpsPipelineSidebar
- DataFlowDiagram, sbom/wizard/page

website (2 files):
- DependencyMap, lehrer/abitur-archiv

Other: nibis_ingestion, grid_detection_service, export-doclayout-onnx

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-25 09:41:42 +02:00

365 lines
12 KiB
Python

"""
Grid Detection Service v4
Detects table/grid structure from OCR bounding-box data.
Converts pixel coordinates to percentage and mm coordinates (A4 format).
Supports deskew correction, column detection, and multi-line cell handling.
Lizenz: Apache 2.0 (kommerziell nutzbar)
"""
import math
import logging
from typing import List
from .grid_detection_models import (
A4_WIDTH_MM,
A4_HEIGHT_MM,
COLUMN_MARGIN_MM,
CellStatus,
ColumnType,
OCRRegion,
GridCell,
GridResult,
)
logger = logging.getLogger(__name__)
class GridDetectionService:
    """Detect grid/table structure from OCR bounding-box regions.

    Region coordinates handled by this class are percentages of the page:
    ``x`` is measured from the left edge and ``y`` from the top edge (this is
    how ``convert_tesseract_regions`` fills them from Tesseract's
    ``left``/``top`` values).
    """

    # Regions whose left edge lies within this many percent of the first
    # region of a group are treated as one vertically aligned group.
    DESKEW_X_TOLERANCE_PCT = 3.0
    # The angle returned by calculate_deskew_angle is clamped to +/- this value.
    MAX_DESKEW_DEG = 5.0
    # A horizontal gap (percent) larger than this starts a new column cluster.
    COLUMN_MIN_GAP_PCT = 5.0
    # Cells whose mean confidence is below this are flagged as problematic.
    MIN_RECOGNIZED_CONFIDENCE = 0.5

    def __init__(self, y_tolerance_pct: float = 1.5, padding_pct: float = 0.3,
                 column_margin_mm: float = COLUMN_MARGIN_MM):
        """Store the tuning parameters.

        Args:
            y_tolerance_pct: Maximum distance (percent) between a region's
                vertical centre and the row anchor for both to share a row.
            padding_pct: Padding (percent) applied around row and column
                boundaries.
            column_margin_mm: Stored on the instance; not read by any method
                of this class.
        """
        self.y_tolerance_pct = y_tolerance_pct
        self.padding_pct = padding_pct
        self.column_margin_mm = column_margin_mm

    def calculate_deskew_angle(self, regions: List[OCRRegion]) -> float:
        """Estimate the page skew angle from OCR region positions.

        The largest group of regions with similar left edges is taken as a
        text column, and a least-squares line ``x = f(y)`` is fitted through
        their left edges. The slope of that line, corrected for the A4 aspect
        ratio, gives the tilt.

        Args:
            regions: OCR regions with percentage-based coordinates.

        Returns:
            Angle in degrees, rounded to two decimals and clamped to
            +/- ``MAX_DESKEW_DEG``. ``0.0`` when fewer than three regions are
            given, no aligned group of at least three regions exists, or the
            group has no usable vertical spread.
        """
        if len(regions) < 3:
            return 0.0

        # Walk the regions left to right and collect runs whose left edge
        # stays within the tolerance of the run's first region.
        tolerance = self.DESKEW_X_TOLERANCE_PCT
        by_x = sorted(regions, key=lambda r: r.x)
        groups: List[List[OCRRegion]] = []
        group = [by_x[0]]
        for region in by_x[1:]:
            if abs(region.x - group[0].x) <= tolerance:
                group.append(region)
                continue
            if len(group) >= 3:
                groups.append(group)
            group = [region]
        if len(group) >= 3:
            groups.append(group)
        if not groups:
            return 0.0

        # Sorting by y does not change the fit; it only fixes the order in
        # which the floating-point sums below are accumulated.
        best = sorted(max(groups, key=len), key=lambda r: r.y)

        # Least-squares slope of x as a function of y.
        n = len(best)
        sum_y = sum(r.y for r in best)
        sum_x = sum(r.x for r in best)
        sum_xy = sum(r.x * r.y for r in best)
        sum_y2 = sum(r.y ** 2 for r in best)
        denom = n * sum_y2 - sum_y ** 2
        # denom equals n^2 * var(y). An exact "== 0" test misses the case
        # where all y values are (nearly) identical and rounding leaves a
        # tiny or negative residue, which would yield a meaningless slope.
        if denom <= 1e-9 * max(n * sum_y2, 1.0):
            return 0.0
        slope = (n * sum_xy - sum_y * sum_x) / denom

        # slope is dx/dy in percent space; scale by width/height to get the
        # physical slope on an A4 page (210/297 ~ 0.707).
        aspect = A4_WIDTH_MM / A4_HEIGHT_MM
        angle_deg = round(math.degrees(math.atan(slope * aspect)), 2)
        limit = self.MAX_DESKEW_DEG
        return max(-limit, min(limit, angle_deg))

    def apply_deskew_to_regions(self, regions: List[OCRRegion], angle: float) -> List[OCRRegion]:
        """Rotate region positions so the measured skew is cancelled.

        ``angle`` is expected to follow the convention of
        ``calculate_deskew_angle``: positive when left edges drift to the
        right with increasing ``y``. Each region's centre is rotated around
        the page centre (50 %, 50 %); width and height are kept.

        The rotation is carried out in physical proportions (A4 aspect
        ratio), because the angle was derived in physical space while the
        coordinates are percentages of a non-square page. With
        ``tan(angle) = (dx/dy) * W/H`` the transform
        ``x' = x*cos - y*(H/W)*sin`` removes the dependence of ``x'`` on
        ``y`` for an aligned column, i.e. the column becomes vertical.

        Args:
            regions: OCR regions with percentage-based coordinates.
            angle: Skew angle in degrees.

        Returns:
            The input list itself when ``abs(angle) < 0.01``; otherwise a new
            list of ``OCRRegion`` objects with corrected ``x``/``y`` (rounded
            to two decimals).
        """
        if abs(angle) < 0.01:
            return regions

        theta = math.radians(angle)
        cos_t = math.cos(theta)
        sin_t = math.sin(theta)
        aspect = A4_WIDTH_MM / A4_HEIGHT_MM
        cx = cy = 50.0  # page centre in percent

        result = []
        for r in regions:
            dx = r.center_x - cx
            dy = r.center_y - cy
            # Rotation matrix applied in mm space, expressed in percent:
            # the off-diagonal terms pick up the H/W and W/H factors.
            new_cx = cx + dx * cos_t - dy * sin_t / aspect
            new_cy = cy + dx * sin_t * aspect + dy * cos_t
            result.append(OCRRegion(
                text=r.text,
                confidence=r.confidence,
                x=round(new_cx - r.width / 2, 2),
                y=round(new_cy - r.height / 2, 2),
                width=r.width,
                height=r.height,
            ))
        return result

    def _group_regions_into_rows(self, regions: List[OCRRegion]) -> List[List[OCRRegion]]:
        """Group regions into rows by their vertical centre.

        Regions are visited in order of ``center_y``. A region joins the
        current row while its centre is within ``y_tolerance_pct`` of the
        centre of the row's first region; otherwise it starts a new row.
        Ordering by ``center_y`` (the same key used for the comparison) keeps
        a tall region from being visited between two short regions of one
        line and splitting that line.

        Returns:
            Rows from top to bottom, each sorted left to right by ``x``.
        """
        if not regions:
            return []

        ordered = sorted(regions, key=lambda r: r.center_y)
        rows: List[List[OCRRegion]] = []
        row = [ordered[0]]
        anchor_y = ordered[0].center_y
        for region in ordered[1:]:
            if abs(region.center_y - anchor_y) <= self.y_tolerance_pct:
                row.append(region)
            else:
                rows.append(sorted(row, key=lambda r: r.x))
                row = [region]
                anchor_y = region.center_y
        rows.append(sorted(row, key=lambda r: r.x))
        return rows

    def _detect_column_boundaries(self, rows: List[List[OCRRegion]]) -> List[float]:
        """Derive column boundaries from the left edges of all regions.

        Left edges are sorted and split into clusters wherever two
        neighbouring values are more than ``COLUMN_MIN_GAP_PCT`` apart.

        Returns:
            The left boundary of every cluster (its smallest ``x`` minus
            ``padding_pct``) followed by ``100.0`` as the right page edge;
            an empty list when there are no regions.
        """
        left_edges = sorted(r.x for row in rows for r in row)
        if not left_edges:
            return []

        min_gap = self.COLUMN_MIN_GAP_PCT
        clusters: List[List[float]] = [[left_edges[0]]]
        for x in left_edges[1:]:
            if x - clusters[-1][-1] > min_gap:
                clusters.append([x])
            else:
                clusters[-1].append(x)

        # Each cluster is sorted, so its first element is its minimum.
        boundaries = [cluster[0] - self.padding_pct for cluster in clusters]
        boundaries.append(100.0)
        return boundaries

    def _assign_column_types(self, boundaries: List[float]) -> List[str]:
        """Assign a column type to every column purely by position.

        The first three columns are ENGLISH, GERMAN and EXAMPLE; any further
        column is UNKNOWN.

        Returns:
            One ``ColumnType`` value (the enum's ``.value``) per column,
            where the column count is ``len(boundaries) - 1`` (at least 0).
        """
        ordered_types = [ColumnType.ENGLISH, ColumnType.GERMAN, ColumnType.EXAMPLE]
        num_cols = max(0, len(boundaries) - 1)
        return [
            (ordered_types[i] if i < len(ordered_types) else ColumnType.UNKNOWN).value
            for i in range(num_cols)
        ]

    def detect_grid(self, regions: List[OCRRegion]) -> GridResult:
        """Detect grid structure from OCR regions.

        Pipeline: estimate and cancel skew, group regions into rows, cluster
        left edges into columns, then build one ``GridCell`` per row/column
        pair.

        Args:
            regions: List of OCR regions with percentage-based coordinates.

        Returns:
            GridResult with detected rows, columns, cells, boundaries, the
            deskew angle and recognition statistics. For empty input only
            the (all-zero) statistics are set.
        """
        if not regions:
            return GridResult(stats={
                "recognized": 0,
                "problematic": 0,
                "empty": 0,
                "manual": 0,
                "total": 0,
                "coverage": 0.0,
            })

        # Step 1: calculate and apply deskew
        deskew_angle = self.calculate_deskew_angle(regions)
        corrected_regions = self.apply_deskew_to_regions(regions, deskew_angle)

        # Step 2: group into rows
        rows = self._group_regions_into_rows(corrected_regions)

        # Step 3: detect columns
        col_boundaries = self._detect_column_boundaries(rows)
        column_types = self._assign_column_types(col_boundaries)
        if len(col_boundaries) >= 2:
            col_spans = list(zip(col_boundaries, col_boundaries[1:]))
        else:
            # Fewer than two boundaries: fall back to one column that runs
            # to the right page edge.
            left = col_boundaries[0] if col_boundaries else 0.0
            col_spans = [(left, 100.0)]
        num_cols = len(col_spans)
        col_enums = [
            ColumnType(column_types[i]) if i < len(column_types) else ColumnType.UNKNOWN
            for i in range(num_cols)
        ]

        # Step 4: build the cell grid
        num_rows = len(rows)
        row_boundaries = []
        cells = []
        recognized = 0
        problematic = 0
        empty = 0

        for row_idx, row_regions in enumerate(rows):
            if row_regions:
                row_y = min(r.y for r in row_regions) - self.padding_pct
                row_bottom = max(r.bottom for r in row_regions) + self.padding_pct
            else:
                # Row without regions: spread rows evenly over the page.
                row_y = row_idx / num_rows * 100
                row_bottom = (row_idx + 1) / num_rows * 100
            row_boundaries.append(row_y)

            row_cells = []
            for col_idx, (col_x, col_right) in enumerate(col_spans):
                # A region belongs to the column that contains its horizontal
                # centre. Regions whose centre lies outside every span are
                # not assigned to any cell.
                members = [r for r in row_regions if col_x <= r.center_x < col_right]

                if not members:
                    # Empty cell: use the row/column extents as its box.
                    cell = GridCell(
                        row=row_idx,
                        col=col_idx,
                        x=col_x,
                        y=row_y,
                        width=col_right - col_x,
                        height=row_bottom - row_y,
                        status=CellStatus.EMPTY,
                        column_type=col_enums[col_idx],
                        logical_row=row_idx,
                        logical_col=col_idx,
                    )
                    empty += 1
                else:
                    avg_conf = sum(r.confidence for r in members) / len(members)
                    if avg_conf >= self.MIN_RECOGNIZED_CONFIDENCE:
                        status = CellStatus.RECOGNIZED
                        recognized += 1
                    else:
                        status = CellStatus.PROBLEMATIC
                        problematic += 1

                    # Occupied cell: use the union of the member boxes.
                    box_x = min(r.x for r in members)
                    box_y = min(r.y for r in members)
                    box_right = max(r.right for r in members)
                    box_bottom = max(r.bottom for r in members)
                    cell = GridCell(
                        row=row_idx,
                        col=col_idx,
                        x=box_x,
                        y=box_y,
                        width=box_right - box_x,
                        height=box_bottom - box_y,
                        # Rows are sorted by x, so words are joined left to right.
                        text=" ".join(r.text for r in members),
                        confidence=round(avg_conf, 3),
                        status=status,
                        column_type=col_enums[col_idx],
                        logical_row=row_idx,
                        logical_col=col_idx,
                    )
                row_cells.append(cell)
            cells.append(row_cells)

        # Closing boundary below the last row
        if rows and rows[-1]:
            row_boundaries.append(max(r.bottom for r in rows[-1]) + self.padding_pct)
        else:
            row_boundaries.append(100.0)

        total = num_rows * num_cols
        coverage = (recognized + problematic) / max(total, 1)
        return GridResult(
            rows=num_rows,
            columns=num_cols,
            cells=cells,
            column_types=column_types,
            column_boundaries=col_boundaries,
            row_boundaries=row_boundaries,
            deskew_angle=deskew_angle,
            stats={
                "recognized": recognized,
                "problematic": problematic,
                "empty": empty,
                "manual": 0,
                "total": total,
                "coverage": round(coverage, 3),
            },
        )

    def convert_tesseract_regions(self, tess_words: List[dict],
                                  image_width: int, image_height: int) -> List[OCRRegion]:
        """Convert Tesseract word data (pixels) to OCRRegions (percentages).

        Args:
            tess_words: Word list from tesseract_vocab_extractor.extract_bounding_boxes.
                Each entry must provide ``text``, ``left``, ``top``, ``width``
                and ``height``; ``conf`` is optional and defaults to 50.
            image_width: Image width in pixels.
            image_height: Image height in pixels.

        Returns:
            List of OCRRegion with percentage-based coordinates and a
            confidence in the range 0.0-1.0. Empty when there are no words
            or an image dimension is not positive.
        """
        if not tess_words or image_width <= 0 or image_height <= 0:
            return []

        regions = []
        for w in tess_words:
            # Tesseract can report conf = -1 (and some wrappers deliver it as
            # a string); normalise to a float and keep it inside 0..1.
            confidence = float(w.get("conf", 50)) / 100.0
            confidence = min(1.0, max(0.0, confidence))
            regions.append(OCRRegion(
                text=w["text"],
                confidence=confidence,
                x=w["left"] / image_width * 100,
                y=w["top"] / image_height * 100,
                width=w["width"] / image_width * 100,
                height=w["height"] / image_height * 100,
            ))
        return regions