From 7005b1856126c56b48c3e059b1012c3b3b6ca63a Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Mon, 9 Mar 2026 15:06:23 +0100 Subject: [PATCH] feat: generische Box-Erkennung fuer zonenbasierte Spaltenerkennung - Neue Datei cv_box_detect.py: 2-Stufen-Algorithmus (Linien + Farbe) - DetectedBox/PageZone Dataclasses in cv_vocab_types.py - detect_column_geometry_zoned() in cv_layout.py - API-Endpoints erweitert: zones/boxes_detected im column_result - Overlay-Funktionen zeichnen Box-Grenzen als gestrichelte Rechtecke - Fix: numpy array or-Verknuepfung an 7 Stellen in ocr_pipeline_api.py - 12 Unit-Tests fuer Box-Erkennung und Zone-Splitting Co-Authored-By: Claude Opus 4.6 --- klausur-service/backend/cv_box_detect.py | 369 ++++++++++++++++++ klausur-service/backend/cv_layout.py | 132 +++++++ klausur-service/backend/cv_vocab_pipeline.py | 1 + klausur-service/backend/cv_vocab_types.py | 24 ++ klausur-service/backend/ocr_pipeline_api.py | 83 +++- .../backend/tests/test_cv_box_detect.py | 226 +++++++++++ 6 files changed, 821 insertions(+), 14 deletions(-) create mode 100644 klausur-service/backend/cv_box_detect.py create mode 100644 klausur-service/backend/tests/test_cv_box_detect.py diff --git a/klausur-service/backend/cv_box_detect.py b/klausur-service/backend/cv_box_detect.py new file mode 100644 index 0000000..f6c4a4a --- /dev/null +++ b/klausur-service/backend/cv_box_detect.py @@ -0,0 +1,369 @@ +""" +Embedded box detection and page zone splitting for the CV vocabulary pipeline. + +Detects boxes (grammar tips, exercises, etc.) that span the page width and +interrupt the normal column layout. Splits the page into vertical zones so +that column detection can run independently per zone. + +Two-stage algorithm: + 1. Morphological line detection — finds bordered boxes via horizontal lines. + 2. Color/saturation fallback — finds shaded boxes without visible borders. + +Lizenz: Apache 2.0 (kommerziell nutzbar) +DATENSCHUTZ: Alle Verarbeitung erfolgt lokal. +""" + +import logging +from typing import List, Optional, Tuple + +import cv2 +import numpy as np + +from cv_vocab_types import DetectedBox, PageZone + +logger = logging.getLogger(__name__) + +__all__ = [ + "detect_boxes", + "split_page_into_zones", +] + + +# --------------------------------------------------------------------------- +# Stage 1: Morphological line detection +# --------------------------------------------------------------------------- + +def _detect_boxes_by_lines( + gray: np.ndarray, + content_x: int, + content_w: int, + content_y: int, + content_h: int, +) -> List[DetectedBox]: + """Find boxes defined by pairs of long horizontal border lines. + + Args: + gray: Grayscale image (full page). + content_x, content_w: Horizontal content bounds. + content_y, content_h: Vertical content bounds. + + Returns: + List of DetectedBox for each detected bordered box. + """ + h, w = gray.shape[:2] + + # Binarize: dark pixels → white on black background + _, binary = cv2.threshold(gray, 180, 255, cv2.THRESH_BINARY_INV) + + # Horizontal morphology kernel — at least 50% of content width + kernel_w = max(50, content_w // 2) + kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (kernel_w, 1)) + lines_img = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel) + + # Horizontal projection: count line pixels per row + h_proj = np.sum(lines_img[:, content_x:content_x + content_w] > 0, axis=1) + line_threshold = content_w * 0.30 + + # Group consecutive rows with enough line pixels into line segments + line_segments: List[Tuple[int, int]] = [] # (y_start, y_end) + seg_start: Optional[int] = None + for y in range(h): + if h_proj[y] >= line_threshold: + if seg_start is None: + seg_start = y + else: + if seg_start is not None: + line_segments.append((seg_start, y)) + seg_start = None + if seg_start is not None: + line_segments.append((seg_start, h)) + + if len(line_segments) < 2: + return [] + + # Pair lines into boxes: top-line + bottom-line + # Minimum box height: 30px. Maximum: 70% of content height. + min_box_h = 30 + max_box_h = int(content_h * 0.70) + + boxes: List[DetectedBox] = [] + used = set() + for i, (top_start, top_end) in enumerate(line_segments): + if i in used: + continue + for j in range(i + 1, len(line_segments)): + if j in used: + continue + bot_start, bot_end = line_segments[j] + box_y = top_start + box_h = bot_end - top_start + if box_h < min_box_h or box_h > max_box_h: + continue + + # Estimate border thickness from line segment heights + border_top = top_end - top_start + border_bot = bot_end - bot_start + + box = DetectedBox( + x=content_x, + y=box_y, + width=content_w, + height=box_h, + confidence=0.8, + border_thickness=max(border_top, border_bot), + ) + boxes.append(box) + used.add(i) + used.add(j) + break # move to next top-line candidate + + return boxes + + +# --------------------------------------------------------------------------- +# Stage 2: Color / saturation fallback +# --------------------------------------------------------------------------- + +def _detect_boxes_by_color( + img_bgr: np.ndarray, + content_x: int, + content_w: int, + content_y: int, + content_h: int, +) -> List[DetectedBox]: + """Find boxes with shaded/colored background (no visible border lines). + + Args: + img_bgr: BGR color image (full page). + content_x, content_w: Horizontal content bounds. + content_y, content_h: Vertical content bounds. + + Returns: + List of DetectedBox for each detected shaded box. + """ + h, w = img_bgr.shape[:2] + + hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV) + gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY) + + # Mask: pixels that are saturated OR noticeably darker than white + sat_mask = hsv[:, :, 1] > 25 + dark_mask = gray < 220 + combined = (sat_mask | dark_mask).astype(np.uint8) * 255 + + # Close small gaps in the mask + kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (15, 15)) + combined = cv2.morphologyEx(combined, cv2.MORPH_CLOSE, kernel) + + contours, _ = cv2.findContours(combined, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + + min_area = content_w * content_h * 0.05 + min_box_h = 30 + max_box_h = int(content_h * 0.70) + min_width_ratio = 0.60 + + boxes: List[DetectedBox] = [] + for cnt in contours: + area = cv2.contourArea(cnt) + if area < min_area: + continue + + # Approximate to polygon — check if roughly rectangular + peri = cv2.arcLength(cnt, True) + approx = cv2.approxPolyDP(cnt, 0.04 * peri, True) + if len(approx) < 4 or len(approx) > 8: + continue + + bx, by, bw, bh = cv2.boundingRect(cnt) + + # Width filter: must span most of the page + if bw < content_w * min_width_ratio: + continue + + # Height filter + if bh < min_box_h or bh > max_box_h: + continue + + boxes.append(DetectedBox( + x=bx, + y=by, + width=bw, + height=bh, + confidence=0.6, + border_thickness=0, + )) + + return boxes + + +# --------------------------------------------------------------------------- +# Validation +# --------------------------------------------------------------------------- + +def _validate_box( + box: DetectedBox, + gray: np.ndarray, + content_w: int, + content_h: int, + median_row_gap: int, +) -> bool: + """Validate that a detected box is genuine (not a table-row separator etc.).""" + # Must span > 60% of content width + if box.width < content_w * 0.60: + return False + + # Height constraints + if box.height < 30 or box.height > content_h * 0.70: + return False + + # Must not be confused with a table-row separator: + # real boxes are at least 3x the median row gap + if median_row_gap > 0 and box.height < median_row_gap * 3: + return False + + # Must contain some text (ink density check) + roi = gray[box.y:box.y + box.height, box.x:box.x + box.width] + if roi.size == 0: + return False + ink_ratio = np.sum(roi < 128) / roi.size + if ink_ratio < 0.002: # nearly empty → not a real content box + return False + + return True + + +# --------------------------------------------------------------------------- +# Public API: detect_boxes +# --------------------------------------------------------------------------- + +def detect_boxes( + img_bgr: np.ndarray, + content_x: int, + content_w: int, + content_y: int, + content_h: int, + median_row_gap: int = 0, +) -> List[DetectedBox]: + """Detect embedded boxes on a page image. + + Runs line-based detection first, then color-based fallback if no + bordered boxes are found. + + Args: + img_bgr: BGR color image (full page or cropped). + content_x, content_w: Horizontal content bounds. + content_y, content_h: Vertical content bounds. + median_row_gap: Median row gap height (for filtering out table separators). + + Returns: + List of validated DetectedBox instances, sorted by y position. + """ + gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY) + + # Stage 1: Line-based detection + boxes = _detect_boxes_by_lines(gray, content_x, content_w, content_y, content_h) + + # Stage 2: Color fallback if no bordered boxes found + if not boxes: + boxes = _detect_boxes_by_color(img_bgr, content_x, content_w, content_y, content_h) + + # Validate + validated = [b for b in boxes if _validate_box(b, gray, content_w, content_h, median_row_gap)] + + # Sort top to bottom + validated.sort(key=lambda b: b.y) + + if validated: + logger.info(f"BoxDetect: {len(validated)} box(es) detected " + f"(from {len(boxes)} candidates)") + else: + logger.debug("BoxDetect: no boxes detected") + + return validated + + +# --------------------------------------------------------------------------- +# Zone Splitting +# --------------------------------------------------------------------------- + +def split_page_into_zones( + content_x: int, + content_y: int, + content_w: int, + content_h: int, + boxes: List[DetectedBox], + min_zone_height: int = 40, +) -> List[PageZone]: + """Split a page into vertical zones based on detected boxes. + + Regions above, between, and below boxes become 'content' zones; + box regions become 'box' zones. + + Args: + content_x, content_y, content_w, content_h: Content area bounds. + boxes: Detected boxes, sorted by y position. + min_zone_height: Minimum height for a content zone to be kept. + + Returns: + List of PageZone, ordered top to bottom. + """ + if not boxes: + # Single zone: entire content area + return [PageZone( + index=0, + zone_type='content', + y=content_y, + height=content_h, + x=content_x, + width=content_w, + )] + + zones: List[PageZone] = [] + zone_idx = 0 + cursor_y = content_y + content_bottom = content_y + content_h + + for box in boxes: + # Content zone above this box + gap_above = box.y - cursor_y + if gap_above >= min_zone_height: + zones.append(PageZone( + index=zone_idx, + zone_type='content', + y=cursor_y, + height=gap_above, + x=content_x, + width=content_w, + )) + zone_idx += 1 + + # Box zone + zones.append(PageZone( + index=zone_idx, + zone_type='box', + y=box.y, + height=box.height, + x=box.x, + width=box.width, + box=box, + )) + zone_idx += 1 + + cursor_y = box.y + box.height + + # Content zone below last box + remaining = content_bottom - cursor_y + if remaining >= min_zone_height: + zones.append(PageZone( + index=zone_idx, + zone_type='content', + y=cursor_y, + height=remaining, + x=content_x, + width=content_w, + )) + + logger.info(f"ZoneSplit: {len(zones)} zones from {len(boxes)} box(es): " + f"{[z.zone_type for z in zones]}") + + return zones diff --git a/klausur-service/backend/cv_layout.py b/klausur-service/backend/cv_layout.py index 47713a1..22ae93a 100644 --- a/klausur-service/backend/cv_layout.py +++ b/klausur-service/backend/cv_layout.py @@ -13,10 +13,12 @@ import numpy as np from cv_vocab_types import ( ColumnGeometry, + DetectedBox, DocumentTypeResult, ENGLISH_FUNCTION_WORDS, GERMAN_FUNCTION_WORDS, PageRegion, + PageZone, RowGeometry, ) @@ -3034,3 +3036,133 @@ def analyze_layout_by_words(ocr_img: np.ndarray, dewarped_bgr: np.ndarray) -> Li f"{[(r.type, r.x, r.width, r.classification_confidence) for r in regions if r.type not in ('header','footer','margin_top','margin_bottom')]}") return regions + + +# --------------------------------------------------------------------------- +# Zone-aware column geometry detection +# --------------------------------------------------------------------------- + +def detect_column_geometry_zoned( + ocr_img: np.ndarray, + dewarped_bgr: np.ndarray, +) -> Optional[Tuple[ + List[ColumnGeometry], # flat column list (all zones) + int, int, int, int, # left_x, right_x, top_y, bottom_y + List[Dict], # word_dicts + np.ndarray, # inv + List[Dict], # zones (serializable) + List[DetectedBox], # detected boxes +]]: + """Zone-aware column geometry detection. + + 1. Finds content bounds. + 2. Runs box detection. + 3. If boxes found: splits page into zones, runs detect_column_geometry() + per content zone on the corresponding sub-image. + 4. If no boxes: delegates entirely to detect_column_geometry() (backward compat). + + Returns: + Extended tuple: (geometries, left_x, right_x, top_y, bottom_y, + word_dicts, inv, zones_data, boxes) + or None if detection fails. + """ + from cv_box_detect import detect_boxes, split_page_into_zones + + # First run normal detection to get content bounds and word data + geo_result = detect_column_geometry(ocr_img, dewarped_bgr) + if geo_result is None: + return None + + geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv = geo_result + content_w = right_x - left_x + content_h = bottom_y - top_y + + # Detect boxes in the image + boxes = detect_boxes( + dewarped_bgr, left_x, content_w, top_y, content_h, + ) + + if not boxes: + # No boxes — single zone, backward compatible + zone_data = [{ + "index": 0, + "zone_type": "content", + "y": top_y, + "height": content_h, + "x": left_x, + "width": content_w, + "columns": [], # filled later by caller + }] + return (geometries, left_x, right_x, top_y, bottom_y, + word_dicts, inv, zone_data, boxes) + + # Split into zones + zones = split_page_into_zones(left_x, top_y, content_w, content_h, boxes) + + # Run column detection per content zone + all_geometries: List[ColumnGeometry] = [] + zones_data: List[Dict] = [] + + for zone in zones: + zone_dict: Dict = { + "index": zone.index, + "zone_type": zone.zone_type, + "y": zone.y, + "height": zone.height, + "x": zone.x, + "width": zone.width, + "columns": [], + } + + if zone.box is not None: + zone_dict["box"] = { + "x": zone.box.x, + "y": zone.box.y, + "width": zone.box.width, + "height": zone.box.height, + "confidence": zone.box.confidence, + "border_thickness": zone.box.border_thickness, + } + + if zone.zone_type == 'content' and zone.height >= 40: + # Extract sub-image for this zone + zone_y_end = zone.y + zone.height + sub_ocr = ocr_img[zone.y:zone_y_end, :] + sub_bgr = dewarped_bgr[zone.y:zone_y_end, :] + + sub_result = detect_column_geometry(sub_ocr, sub_bgr) + if sub_result is not None: + sub_geoms, sub_lx, sub_rx, sub_ty, sub_by, _sub_words, _sub_inv = sub_result + + # Offset column y-coordinates back to absolute page coords + for g in sub_geoms: + g.y += zone.y + + zone_cols = [] + for g in sub_geoms: + zone_cols.append({ + "index": g.index, + "x": g.x, + "y": g.y, + "width": g.width, + "height": g.height, + "word_count": g.word_count, + "width_ratio": g.width_ratio, + "zone_index": zone.index, + }) + zone_dict["columns"] = zone_cols + all_geometries.extend(sub_geoms) + else: + logger.debug(f"ZonedColumns: zone {zone.index} column detection returned None") + + zones_data.append(zone_dict) + + # If per-zone detection produced no columns, fall back to the original + if not all_geometries: + all_geometries = geometries + + logger.info(f"ZonedColumns: {len(boxes)} box(es), {len(zones)} zone(s), " + f"{len(all_geometries)} total columns") + + return (all_geometries, left_x, right_x, top_y, bottom_y, + word_dicts, inv, zones_data, boxes) diff --git a/klausur-service/backend/cv_vocab_pipeline.py b/klausur-service/backend/cv_vocab_pipeline.py index 940381b..1b3eeb3 100644 --- a/klausur-service/backend/cv_vocab_pipeline.py +++ b/klausur-service/backend/cv_vocab_pipeline.py @@ -19,6 +19,7 @@ from cv_preprocessing import * # noqa: F401,F403 from cv_layout import * # noqa: F401,F403 from cv_ocr_engines import * # noqa: F401,F403 from cv_cell_grid import * # noqa: F401,F403 +from cv_box_detect import * # noqa: F401,F403 from cv_review import * # noqa: F401,F403 # Private names used by consumers — not covered by wildcard re-exports. diff --git a/klausur-service/backend/cv_vocab_types.py b/klausur-service/backend/cv_vocab_types.py index 74a6b9c..4673ae3 100644 --- a/klausur-service/backend/cv_vocab_types.py +++ b/klausur-service/backend/cv_vocab_types.py @@ -154,3 +154,27 @@ class DocumentTypeResult: pipeline: str # 'cell_first' | 'full_page' skip_steps: List[str] = field(default_factory=list) # e.g. ['columns', 'rows'] features: Dict[str, Any] = field(default_factory=dict) # debug info + + +@dataclass +class DetectedBox: + """An embedded box (e.g. grammar tip, exercise) detected on the page.""" + x: int # absolute pixel position + y: int + width: int + height: int + confidence: float # 0.0-1.0 + border_thickness: int = 0 + + +@dataclass +class PageZone: + """A horizontal zone of the page — either normal content or a detected box.""" + index: int # 0-based, top to bottom + zone_type: str # 'content' | 'box' + y: int # absolute pixel y + height: int + x: int + width: int + box: Optional[DetectedBox] = None + columns: List[ColumnGeometry] = field(default_factory=list) diff --git a/klausur-service/backend/ocr_pipeline_api.py b/klausur-service/backend/ocr_pipeline_api.py index a65455c..f10ccec 100644 --- a/klausur-service/backend/ocr_pipeline_api.py +++ b/klausur-service/backend/ocr_pipeline_api.py @@ -57,6 +57,7 @@ from cv_vocab_pipeline import ( deskew_image_iterative, deskew_two_pass, detect_column_geometry, + detect_column_geometry_zoned, detect_document_type, detect_row_geometry, expand_narrow_columns, @@ -1001,7 +1002,7 @@ async def detect_type(session_id: str): await _load_session_to_cache(session_id) cached = _get_cached(session_id) - img_bgr = cached.get("cropped_bgr") or cached.get("dewarped_bgr") + img_bgr = cached.get("cropped_bgr") if cached.get("cropped_bgr") is not None else cached.get("dewarped_bgr") if img_bgr is None: raise HTTPException(status_code=400, detail="Crop or dewarp must be completed first") @@ -1052,7 +1053,7 @@ async def detect_columns(session_id: str): await _load_session_to_cache(session_id) cached = _get_cached(session_id) - img_bgr = cached.get("cropped_bgr") or cached.get("dewarped_bgr") + img_bgr = cached.get("cropped_bgr") if cached.get("cropped_bgr") is not None else cached.get("dewarped_bgr") if img_bgr is None: raise HTTPException(status_code=400, detail="Crop or dewarp must be completed before column detection") @@ -1062,21 +1063,26 @@ async def detect_columns(session_id: str): ocr_img = create_ocr_image(img_bgr) h, w = ocr_img.shape[:2] - # Phase A: Geometry detection (returns word_dicts + inv for reuse) - geo_result = detect_column_geometry(ocr_img, img_bgr) + # Phase A: Zone-aware geometry detection + zoned_result = detect_column_geometry_zoned(ocr_img, img_bgr) - if geo_result is None: + if zoned_result is None: # Fallback to projection-based layout layout_img = create_layout_image(img_bgr) regions = analyze_layout(layout_img, ocr_img) + zones_data = None + boxes_detected = 0 else: - geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv = geo_result + geometries, left_x, right_x, top_y, bottom_y, word_dicts, inv, zones_data, boxes = zoned_result content_w = right_x - left_x + boxes_detected = len(boxes) # Cache intermediates for row detection (avoids second Tesseract run) cached["_word_dicts"] = word_dicts cached["_inv"] = inv cached["_content_bounds"] = (left_x, right_x, top_y, bottom_y) + cached["_zones_data"] = zones_data + cached["_boxes_detected"] = boxes_detected # Detect header/footer early so sub-column clustering ignores them header_y, footer_y = _detect_header_footer_gaps(inv, w, h) if inv is not None else (None, None) @@ -1106,8 +1112,13 @@ async def detect_columns(session_id: str): "columns": columns, "classification_methods": methods, "duration_seconds": round(duration, 2), + "boxes_detected": boxes_detected, } + # Add zone data when boxes are present + if zones_data and boxes_detected > 0: + column_result["zones"] = zones_data + # Persist to DB — also invalidate downstream results (rows, words) await update_session_db( session_id, @@ -1124,13 +1135,14 @@ async def detect_columns(session_id: str): col_count = len([c for c in columns if c["type"].startswith("column")]) logger.info(f"OCR Pipeline: columns session {session_id}: " - f"{col_count} columns detected ({duration:.2f}s)") + f"{col_count} columns detected, {boxes_detected} box(es) ({duration:.2f}s)") img_w = img_bgr.shape[1] await _append_pipeline_log(session_id, "columns", { "total_columns": len(columns), "column_widths_pct": [round(c["width"] / img_w * 100, 1) for c in columns], "column_types": [c["type"] for c in columns], + "boxes_detected": boxes_detected, }, duration_ms=int(duration * 1000)) return { @@ -1266,6 +1278,27 @@ async def _get_columns_overlay(session_id: str) -> Response: # Blend overlay at 20% opacity cv2.addWeighted(overlay, 0.2, img, 0.8, 0, img) + # Draw detected box boundaries as dashed rectangles + zones = column_result.get("zones", []) + for zone in zones: + if zone.get("zone_type") == "box" and zone.get("box"): + box = zone["box"] + bx, by = box["x"], box["y"] + bw, bh = box["width"], box["height"] + box_color = (0, 200, 255) # Yellow (BGR) + # Draw dashed rectangle by drawing short line segments + dash_len = 15 + for edge_x in range(bx, bx + bw, dash_len * 2): + end_x = min(edge_x + dash_len, bx + bw) + cv2.line(img, (edge_x, by), (end_x, by), box_color, 2) + cv2.line(img, (edge_x, by + bh), (end_x, by + bh), box_color, 2) + for edge_y in range(by, by + bh, dash_len * 2): + end_y = min(edge_y + dash_len, by + bh) + cv2.line(img, (bx, edge_y), (bx, end_y), box_color, 2) + cv2.line(img, (bx + bw, edge_y), (bx + bw, end_y), box_color, 2) + cv2.putText(img, "BOX", (bx + 10, by + bh - 10), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, box_color, 2) + success, result_png = cv2.imencode(".png", img) if not success: raise HTTPException(status_code=500, detail="Failed to encode overlay image") @@ -1284,7 +1317,7 @@ async def detect_rows(session_id: str): await _load_session_to_cache(session_id) cached = _get_cached(session_id) - dewarped_bgr = cached.get("cropped_bgr") or cached.get("dewarped_bgr") + dewarped_bgr = cached.get("cropped_bgr") if cached.get("cropped_bgr") is not None else cached.get("dewarped_bgr") if dewarped_bgr is None: raise HTTPException(status_code=400, detail="Crop or dewarp must be completed before row detection") @@ -1315,7 +1348,7 @@ async def detect_rows(session_id: str): # Build serializable result (exclude words to keep payload small) rows_data = [] for r in rows: - rows_data.append({ + rd = { "index": r.index, "x": r.x, "y": r.y, @@ -1324,7 +1357,9 @@ async def detect_rows(session_id: str): "word_count": r.word_count, "row_type": r.row_type, "gap_before": r.gap_before, - }) + "zone_index": 0, + } + rows_data.append(rd) type_counts = {} for r in rows: @@ -1456,7 +1491,7 @@ async def detect_words( await _load_session_to_cache(session_id) cached = _get_cached(session_id) - dewarped_bgr = cached.get("cropped_bgr") or cached.get("dewarped_bgr") + dewarped_bgr = cached.get("cropped_bgr") if cached.get("cropped_bgr") is not None else cached.get("dewarped_bgr") if dewarped_bgr is None: logger.warning("detect_words: no cropped/dewarped image for session %s (cache keys: %s)", session_id, [k for k in cached.keys() if k.endswith('_bgr')]) @@ -1560,6 +1595,10 @@ async def detect_words( ) duration = time.time() - t0 + # Add zone_index to each cell (default 0 for backward compatibility) + for cell in cells: + cell.setdefault("zone_index", 0) + # Layout detection col_types = {c['type'] for c in columns_meta} is_vocab = bool(col_types & {'column_en', 'column_de'}) @@ -2749,6 +2788,22 @@ async def _get_rows_overlay(session_id: str) -> Response: # Blend overlay at 15% opacity cv2.addWeighted(overlay, 0.15, img, 0.85, 0, img) + # Draw zone separator lines if zones exist + column_result = session.get("column_result") or {} + zones = column_result.get("zones", []) + if zones: + img_w_px = img.shape[1] + zone_color = (0, 200, 255) # Yellow (BGR) + dash_len = 20 + for zone in zones: + if zone.get("zone_type") == "box": + zy = zone["y"] + zh = zone["height"] + for line_y in [zy, zy + zh]: + for sx in range(0, img_w_px, dash_len * 2): + ex = min(sx + dash_len, img_w_px) + cv2.line(img, (sx, line_y), (ex, line_y), zone_color, 2) + success, result_png = cv2.imencode(".png", img) if not success: raise HTTPException(status_code=500, detail="Failed to encode overlay image") @@ -3182,7 +3237,7 @@ async def run_auto(session_id: str, req: RunAutoRequest, request: Request): yield await _auto_sse_event("columns", "start", {}) try: t0 = time.time() - col_img = cached.get("cropped_bgr") or cached.get("dewarped_bgr") + col_img = cached.get("cropped_bgr") if cached.get("cropped_bgr") is not None else cached.get("dewarped_bgr") if col_img is None: raise ValueError("Cropped/dewarped image not available") @@ -3243,7 +3298,7 @@ async def run_auto(session_id: str, req: RunAutoRequest, request: Request): yield await _auto_sse_event("rows", "start", {}) try: t0 = time.time() - row_img = cached.get("cropped_bgr") or cached.get("dewarped_bgr") + row_img = cached.get("cropped_bgr") if cached.get("cropped_bgr") is not None else cached.get("dewarped_bgr") session = await get_session_db(session_id) column_result = session.get("column_result") or cached.get("column_result") if not column_result or not column_result.get("columns"): @@ -3321,7 +3376,7 @@ async def run_auto(session_id: str, req: RunAutoRequest, request: Request): yield await _auto_sse_event("words", "start", {"engine": req.ocr_engine}) try: t0 = time.time() - word_img = cached.get("cropped_bgr") or cached.get("dewarped_bgr") + word_img = cached.get("cropped_bgr") if cached.get("cropped_bgr") is not None else cached.get("dewarped_bgr") session = await get_session_db(session_id) column_result = session.get("column_result") or cached.get("column_result") diff --git a/klausur-service/backend/tests/test_cv_box_detect.py b/klausur-service/backend/tests/test_cv_box_detect.py new file mode 100644 index 0000000..0f3c73f --- /dev/null +++ b/klausur-service/backend/tests/test_cv_box_detect.py @@ -0,0 +1,226 @@ +""" +Tests for cv_box_detect.py — box detection and page zone splitting. + +Lizenz: Apache 2.0 +""" + +import numpy as np +import pytest + +import cv2 + +from cv_box_detect import detect_boxes, split_page_into_zones +from cv_vocab_types import DetectedBox, PageZone + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _white_image(width: int = 1200, height: int = 1800) -> np.ndarray: + """Create a plain white BGR image.""" + return np.ones((height, width, 3), dtype=np.uint8) * 255 + + +def _draw_bordered_box(img: np.ndarray, x: int, y: int, w: int, h: int, + thickness: int = 3, fill_text: bool = True) -> np.ndarray: + """Draw a bordered box (rectangle) on the image with some inner text.""" + cv2.rectangle(img, (x, y), (x + w, y + h), (0, 0, 0), thickness) + if fill_text: + # Add some dark text inside so the box passes ink-density validation + cv2.putText(img, "Grammar Tip: Use the present perfect.", + (x + 20, y + h // 2), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 1) + return img + + +def _draw_colored_box(img: np.ndarray, x: int, y: int, w: int, h: int, + color: tuple = (200, 230, 255)) -> np.ndarray: + """Draw a shaded/colored box (no border lines) with some inner text.""" + cv2.rectangle(img, (x, y), (x + w, y + h), color, -1) + cv2.putText(img, "Exercise: Fill in the blanks.", + (x + 20, y + h // 2), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 1) + return img + + +# --------------------------------------------------------------------------- +# detect_boxes tests +# --------------------------------------------------------------------------- + +class TestDetectBoxes: + """Tests for the detect_boxes() function.""" + + def test_no_boxes_returns_empty(self): + """A plain white image should produce no detected boxes.""" + img = _white_image() + boxes = detect_boxes(img, content_x=50, content_w=1100, + content_y=50, content_h=1700) + assert boxes == [] + + def test_single_border_box(self): + """A single bordered rectangle should be detected.""" + img = _white_image() + _draw_bordered_box(img, x=60, y=500, w=1080, h=200, thickness=3) + + boxes = detect_boxes(img, content_x=50, content_w=1100, + content_y=50, content_h=1700) + + assert len(boxes) >= 1 + box = boxes[0] + assert isinstance(box, DetectedBox) + assert box.confidence > 0 + # Box should roughly be in the right area + assert 400 <= box.y <= 600 + assert box.height >= 100 + + def test_colored_box_fallback(self): + """A colored box without border lines should be detected by color fallback.""" + img = _white_image() + _draw_colored_box(img, x=60, y=600, w=1080, h=180, color=(140, 200, 240)) + + boxes = detect_boxes(img, content_x=50, content_w=1100, + content_y=50, content_h=1700) + + assert len(boxes) >= 1 + box = boxes[0] + assert isinstance(box, DetectedBox) + # Color-detected boxes have lower confidence + assert box.confidence > 0 + + def test_box_too_small_filtered(self): + """A box shorter than 30px should be filtered out.""" + img = _white_image() + # Draw a thin horizontal band (20px high) — should not count as a box + _draw_bordered_box(img, x=60, y=500, w=1080, h=20, thickness=1) + + boxes = detect_boxes(img, content_x=50, content_w=1100, + content_y=50, content_h=1700) + + assert len(boxes) == 0 + + def test_box_too_narrow_filtered(self): + """A box narrower than 60% of content width should be filtered out.""" + img = _white_image() + # Draw a narrow box (only 400px wide on a 1100px content area = 36%) + _draw_bordered_box(img, x=60, y=500, w=400, h=200, thickness=3) + + boxes = detect_boxes(img, content_x=50, content_w=1100, + content_y=50, content_h=1700) + + assert len(boxes) == 0 + + def test_boxes_sorted_by_y(self): + """Multiple boxes should be returned sorted top to bottom.""" + img = _white_image() + _draw_bordered_box(img, x=60, y=1000, w=1080, h=150, thickness=3) + _draw_bordered_box(img, x=60, y=400, w=1080, h=150, thickness=3) + + boxes = detect_boxes(img, content_x=50, content_w=1100, + content_y=50, content_h=1700) + + if len(boxes) >= 2: + assert boxes[0].y <= boxes[1].y + + +# --------------------------------------------------------------------------- +# split_page_into_zones tests +# --------------------------------------------------------------------------- + +class TestSplitPageIntoZones: + """Tests for the split_page_into_zones() function.""" + + def test_split_zones_no_boxes(self): + """Without boxes, should return a single content zone.""" + zones = split_page_into_zones( + content_x=50, content_y=100, content_w=1100, content_h=1600, + boxes=[], + ) + + assert len(zones) == 1 + assert zones[0].zone_type == 'content' + assert zones[0].y == 100 + assert zones[0].height == 1600 + + def test_split_zones_one_box(self): + """One box should create up to 3 zones: above, box, below.""" + box = DetectedBox(x=50, y=500, width=1100, height=200, + confidence=0.8, border_thickness=3) + zones = split_page_into_zones( + content_x=50, content_y=100, content_w=1100, content_h=1600, + boxes=[box], + ) + + # Should have 3 zones: content above, box, content below + assert len(zones) == 3 + assert zones[0].zone_type == 'content' + assert zones[0].y == 100 + assert zones[0].height == 400 # 500 - 100 + + assert zones[1].zone_type == 'box' + assert zones[1].y == 500 + assert zones[1].height == 200 + assert zones[1].box is not None + + assert zones[2].zone_type == 'content' + assert zones[2].y == 700 # 500 + 200 + assert zones[2].height == 1000 # (100+1600) - 700 + + def test_split_zones_two_boxes(self): + """Two boxes should create up to 5 zones.""" + box1 = DetectedBox(x=50, y=400, width=1100, height=150, + confidence=0.8, border_thickness=3) + box2 = DetectedBox(x=50, y=900, width=1100, height=150, + confidence=0.8, border_thickness=3) + zones = split_page_into_zones( + content_x=50, content_y=100, content_w=1100, content_h=1600, + boxes=[box1, box2], + ) + + assert len(zones) == 5 + types = [z.zone_type for z in zones] + assert types == ['content', 'box', 'content', 'box', 'content'] + + def test_split_zones_min_height(self): + """Content zones smaller than min_zone_height should be dropped.""" + # Box very close to the top — gap above is only 10px + box = DetectedBox(x=50, y=110, width=1100, height=200, + confidence=0.8, border_thickness=3) + zones = split_page_into_zones( + content_x=50, content_y=100, content_w=1100, content_h=1600, + boxes=[box], + min_zone_height=40, + ) + + # Gap above box is only 10px < 40px min → should be skipped + assert zones[0].zone_type == 'box' + # Remaining should be content below the box + assert any(z.zone_type == 'content' for z in zones) + + def test_zone_indices_sequential(self): + """Zone indices should be sequential starting from 0.""" + box = DetectedBox(x=50, y=500, width=1100, height=200, + confidence=0.8, border_thickness=3) + zones = split_page_into_zones( + content_x=50, content_y=100, content_w=1100, content_h=1600, + boxes=[box], + ) + + indices = [z.index for z in zones] + assert indices == list(range(len(zones))) + + def test_backward_compat_no_boxes(self): + """Without boxes, result should be identical: single zone covering full area.""" + zones = split_page_into_zones( + content_x=50, content_y=100, content_w=1100, content_h=1600, + boxes=[], + ) + + assert len(zones) == 1 + z = zones[0] + assert z.zone_type == 'content' + assert z.x == 50 + assert z.y == 100 + assert z.width == 1100 + assert z.height == 1600 + assert z.box is None