# Provenance (taken from the patch header this module was delivered in):
#   Commit:  143e41ec76d93725438604d3de71ea3357eff624
#   From:    Benjamin Admin
#   Date:    Wed, 18 Mar 2026 08:46:49 +0100
#   Subject: [PATCH] add: ocr_pipeline_overlays.py for overlay rendering functions
#
#   Extracted 4 overlay functions (_get_structure_overlay, _get_columns_overlay,
#   _get_rows_overlay, _get_words_overlay) that were missing from the initial
#   split. Provides render_overlay() dispatcher used by sessions module.
#
#   Co-Authored-By: Claude Opus 4.6
#   File:    klausur-service/backend/ocr_pipeline_overlays.py (new file)
"""
Overlay image rendering for OCR pipeline.

Generates visual overlays for structure, columns, rows, and words
detection results.

License: Apache 2.0
PRIVACY: All processing happens locally.
"""

import logging
from dataclasses import asdict
from typing import Any, Dict, List, Optional, Tuple

import cv2
import numpy as np
from fastapi import HTTPException
from fastapi.responses import Response

from ocr_pipeline_common import (
    _cache,
    _get_base_image_png,
    _load_session_to_cache,
    _get_cached,
)
from ocr_pipeline_session_store import get_session_db, get_session_image
from cv_color_detect import _COLOR_HEX, _COLOR_RANGES
from cv_box_detect import detect_boxes
from ocr_pipeline_rows import _draw_box_exclusion_overlay

logger = logging.getLogger(__name__)

# All colour tuples in this module are in OpenCV channel order (B, G, R).
_BGR = Tuple[int, int, int]

_FALLBACK_BGR: _BGR = (200, 200, 200)
_FALLBACK_GRAY_BGR: _BGR = (128, 114, 107)  # BGR of "#6b7280"

# Background colour hex (as stored in the structure result) -> BGR.
_BG_HEX_TO_BGR: Dict[str, _BGR] = {
    "#dc2626": (38, 38, 220),     # red
    "#2563eb": (235, 99, 37),     # blue
    "#16a34a": (74, 163, 22),     # green
    "#ea580c": (12, 88, 234),     # orange
    "#9333ea": (234, 51, 147),    # purple
    "#ca8a04": (4, 138, 202),     # yellow
    "#6b7280": (128, 114, 107),   # gray
}

# Structure zone type -> outline colour.
_ZONE_COLORS: Dict[str, _BGR] = {
    "content": (200, 200, 200),   # light gray
    "box": (255, 180, 0),         # blue-ish
}

# Colour-range name (keys of _COLOR_RANGES) -> contour colour.
_COLOR_REGION_BGR: Dict[str, _BGR] = {
    "red": (0, 0, 255),
    "orange": (0, 140, 255),
    "yellow": (0, 200, 255),
    "green": (0, 200, 0),
    "blue": (255, 150, 0),
    "purple": (200, 0, 200),
}

# Graphic element shape -> short label text.
_SHAPE_ICONS: Dict[str, str] = {
    "image": "IMAGE",
    "illustration": "ILLUST",
}

# Column region type -> fill/border colour.
_COLUMN_COLORS: Dict[str, _BGR] = {
    "column_en": (255, 180, 0),        # Blue
    "column_de": (0, 200, 0),          # Green
    "column_example": (0, 140, 255),   # Orange
    "column_text": (200, 200, 0),      # Cyan/Turquoise
    "page_ref": (200, 0, 200),         # Purple
    "column_marker": (0, 0, 220),      # Red
    "column_ignore": (180, 180, 180),  # Light Gray
    "header": (128, 128, 128),         # Gray
    "footer": (128, 128, 128),         # Gray
    "margin_top": (100, 100, 100),     # Dark Gray
    "margin_bottom": (100, 100, 100),  # Dark Gray
}

# Row type -> fill/border colour.
_ROW_COLORS: Dict[str, _BGR] = {
    "content": (255, 180, 0),          # Blue
    "header": (128, 128, 128),         # Gray
    "footer": (128, 128, 128),         # Gray
    "margin_top": (100, 100, 100),     # Dark Gray
    "margin_bottom": (100, 100, 100),  # Dark Gray
}

# Per-column-index palette for the cell-based words overlay.
_CELL_PALETTE: List[_BGR] = [
    (255, 180, 0),    # Blue
    (0, 200, 0),      # Green
    (0, 140, 255),    # Orange
    (200, 100, 200),  # Purple
    (200, 200, 0),    # Cyan
    (100, 200, 200),  # Yellow-ish
]

# Legacy (entry-based) words overlay: column type -> colour / entry field.
_LEGACY_COLUMN_COLORS: Dict[str, _BGR] = {
    "column_en": (255, 180, 0),
    "column_de": (0, 200, 0),
    "column_example": (0, 140, 255),
}
_LEGACY_COLUMN_FIELDS: Dict[str, str] = {
    "column_en": "english",
    "column_de": "german",
    "column_example": "example",
}

_BOX_ZONE_BGR: _BGR = (0, 200, 255)  # Yellow


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------


async def _require_session(session_id: str) -> Dict[str, Any]:
    """Return the stored session or raise HTTP 404 if it does not exist."""
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
    return session


async def _load_base_image(session_id: str) -> np.ndarray:
    """Fetch the session's base PNG and decode it into a BGR image.

    Raises:
        HTTPException: 404 if no base image is available, 500 if the bytes
            cannot be decoded.
    """
    base_png = await _get_base_image_png(session_id)
    if not base_png:
        raise HTTPException(status_code=404, detail="No base image available")

    img = cv2.imdecode(np.frombuffer(base_png, dtype=np.uint8), cv2.IMREAD_COLOR)
    if img is None:
        raise HTTPException(status_code=500, detail="Failed to decode image")
    return img


def _png_response(img: np.ndarray) -> Response:
    """Encode *img* as PNG and wrap it in an ``image/png`` response.

    Raises:
        HTTPException: 500 if OpenCV reports that encoding failed.
    """
    success, png_buf = cv2.imencode(".png", img)
    if not success:
        raise HTTPException(status_code=500, detail="Failed to encode overlay image")
    return Response(content=png_buf.tobytes(), media_type="image/png")


def _draw_dashed_hline(img: np.ndarray, x_start: int, x_end: int, y: int,
                       color: _BGR, dash_len: int, thickness: int) -> None:
    """Draw a horizontal dashed line (dash and gap are both *dash_len* px)."""
    for seg_x in range(x_start, x_end, dash_len * 2):
        seg_end = min(seg_x + dash_len, x_end)
        cv2.line(img, (seg_x, y), (seg_end, y), color, thickness)


def _draw_dashed_vline(img: np.ndarray, x: int, y_start: int, y_end: int,
                       color: _BGR, dash_len: int, thickness: int) -> None:
    """Draw a vertical dashed line (dash and gap are both *dash_len* px)."""
    for seg_y in range(y_start, y_end, dash_len * 2):
        seg_end = min(seg_y + dash_len, y_end)
        cv2.line(img, (x, seg_y), (x, seg_end), color, thickness)


def _draw_dashed_rect(img: np.ndarray, x: int, y: int, w: int, h: int,
                      color: _BGR, dash_len: int, thickness: int) -> None:
    """Draw the four edges of a rectangle as dashed lines."""
    _draw_dashed_hline(img, x, x + w, y, color, dash_len, thickness)
    _draw_dashed_hline(img, x, x + w, y + h, color, dash_len, thickness)
    _draw_dashed_vline(img, x, y, y + h, color, dash_len, thickness)
    _draw_dashed_vline(img, x + w, y, y + h, color, dash_len, thickness)


def _confidence_color(conf: Optional[float]) -> _BGR:
    """Map a 0-100 confidence to a text colour (>=70, >=50, otherwise).

    A missing (``None``) confidence is treated as 0.
    """
    conf = conf or 0
    if conf >= 70:
        return (0, 180, 0)
    if conf >= 50:
        return (0, 180, 220)
    return (0, 0, 220)


# ---------------------------------------------------------------------------
# Dispatcher
# ---------------------------------------------------------------------------


async def render_overlay(overlay_type: str, session_id: str) -> Response:
    """Dispatch to the appropriate overlay renderer.

    Args:
        overlay_type: One of ``"structure"``, ``"columns"``, ``"rows"``,
            ``"words"``.
        session_id: Identifier of the OCR pipeline session to render.

    Returns:
        A PNG ``Response`` produced by the selected renderer.

    Raises:
        HTTPException: 400 for an unknown *overlay_type*; renderers raise
            404/500 for missing data or image decode/encode failures.
    """
    renderers = {
        "structure": _get_structure_overlay,
        "columns": _get_columns_overlay,
        "rows": _get_rows_overlay,
        "words": _get_words_overlay,
    }
    renderer = renderers.get(overlay_type)
    if renderer is None:
        raise HTTPException(status_code=400, detail=f"Unknown overlay type: {overlay_type}")
    return await renderer(session_id)


# ---------------------------------------------------------------------------
# Structure overlay
# ---------------------------------------------------------------------------


def _detect_structure(img: np.ndarray) -> Dict[str, Any]:
    """Run box/zone detection on *img* and return it in structure-result form.

    Used when the session has no stored ``structure_result``. The content
    area is the image shrunk by a 3% margin (of the shorter side) on every
    edge.
    """
    # The original code called split_page_into_zones without importing it,
    # so this path always raised NameError. Imported lazily so that a wrong
    # location only affects this fallback path, not module import.
    # NOTE(review): assumed to live next to detect_boxes in cv_box_detect —
    # confirm against that module.
    from cv_box_detect import split_page_into_zones

    h, w = img.shape[:2]
    margin = int(min(w, h) * 0.03)
    content_w_px = w - 2 * margin
    content_h_px = h - 2 * margin

    boxes = detect_boxes(img, margin, content_w_px, margin, content_h_px)
    zones = split_page_into_zones(margin, margin, content_w_px, content_h_px, boxes)
    return {
        "boxes": [
            {"x": b.x, "y": b.y, "w": b.width, "h": b.height,
             "confidence": b.confidence, "border_thickness": b.border_thickness}
            for b in boxes
        ],
        "zones": [
            {"index": z.index, "zone_type": z.zone_type,
             "y": z.y, "h": z.height, "x": z.x, "w": z.width}
            for z in zones
        ],
    }


def _draw_structure_zones(img: np.ndarray, zones: List[Dict[str, Any]]) -> None:
    """Draw dashed top/bottom boundaries and a label for every zone."""
    for zone in zones:
        zx, zy = zone["x"], zone["y"]
        zw, zh = zone["w"], zone["h"]
        color = _ZONE_COLORS.get(zone["zone_type"], _FALLBACK_BGR)

        # Only the horizontal edges are drawn for zones.
        _draw_dashed_hline(img, zx, zx + zw, zy, color, 12, 1)
        _draw_dashed_hline(img, zx, zx + zw, zy + zh, color, 12, 1)

        zone_label = f"Zone {zone['index']} ({zone['zone_type']})"
        cv2.putText(img, zone_label, (zx + 5, zy + 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 1)


def _draw_structure_boxes(img: np.ndarray, overlay: np.ndarray,
                          boxes: List[Dict[str, Any]]) -> None:
    """Draw detected boxes: solid fill on *overlay*, border + label on *img*."""
    for box_data in boxes:
        bx, by = box_data["x"], box_data["y"]
        bw, bh = box_data["w"], box_data["h"]
        # "or 0" guards against keys that are present but stored as None.
        conf = box_data.get("confidence") or 0
        thickness = box_data.get("border_thickness") or 0
        bg_name = box_data.get("bg_color_name", "")
        fill_bgr = _BG_HEX_TO_BGR.get(box_data.get("bg_color_hex", "#6b7280"),
                                      _FALLBACK_GRAY_BGR)

        # Fill goes on the overlay copy so it can be alpha-blended later.
        cv2.rectangle(overlay, (bx, by), (bx + bw, by + bh), fill_bgr, -1)
        cv2.rectangle(img, (bx, by), (bx + bw, by + bh), fill_bgr, 3)

        label = "BOX"
        if bg_name and bg_name not in ("unknown", "white"):
            label += f" ({bg_name})"
        if thickness > 0:
            label += f" border={thickness}px"
        label += f" {int(conf * 100)}%"

        # White halo first, coloured text on top, for readability.
        cv2.putText(img, label, (bx + 8, by + 22),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.55, (255, 255, 255), 2)
        cv2.putText(img, label, (bx + 8, by + 22),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.55, fill_bgr, 1)


def _draw_color_regions(img: np.ndarray, hsv: np.ndarray) -> None:
    """Outline regions of *hsv* that fall inside any ``_COLOR_RANGES`` entry."""
    h, w = hsv.shape[:2]
    for color_name, ranges in _COLOR_RANGES.items():
        mask = np.zeros((h, w), dtype=np.uint8)
        for lower, upper in ranges:
            mask = cv2.bitwise_or(mask, cv2.inRange(hsv, lower, upper))

        # Skip colours with fewer than 100 matching pixels in total.
        if np.count_nonzero(mask) < 100:
            continue

        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        draw_color = _COLOR_REGION_BGR.get(color_name, _FALLBACK_BGR)
        for cnt in contours:
            if cv2.contourArea(cnt) < 20:
                continue
            cv2.drawContours(img, [cnt], -1, draw_color, 2)


def _draw_graphics(img: np.ndarray, graphics: List[Dict[str, Any]]) -> None:
    """Draw a dashed bounding box and a labelled tag for each graphic element."""
    for gfx in graphics:
        gx, gy = gfx["x"], gfx["y"]
        gw, gh = gfx["w"], gfx["h"]
        shape = gfx.get("shape", "icon")
        conf = gfx.get("confidence") or 0
        gfx_bgr = _BG_HEX_TO_BGR.get(gfx.get("color_hex", "#6b7280"), _FALLBACK_GRAY_BGR)

        _draw_dashed_rect(img, gx, gy, gw, gh, gfx_bgr, 6, 2)

        icon = _SHAPE_ICONS.get(shape, shape.upper()[:5])
        label = f"{icon} {int(conf * 100)}%"
        (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)
        lx = gx + 2
        # Keep the label inside the image when the element touches the top.
        ly = max(gy - 4, th + 4)
        # White background for readability.
        cv2.rectangle(img, (lx - 1, ly - th - 2), (lx + tw + 2, ly + 3), (255, 255, 255), -1)
        cv2.putText(img, label, (lx, ly), cv2.FONT_HERSHEY_SIMPLEX, 0.4, gfx_bgr, 1)


async def _get_structure_overlay(session_id: str) -> Response:
    """Generate overlay image showing detected boxes, zones, and color regions.

    Uses the session's stored ``structure_result`` when present; otherwise
    runs box/zone detection on the fly.

    Raises:
        HTTPException: 404 if no base image is available, 500 if the image
            cannot be decoded or the overlay cannot be encoded.
    """
    img = await _load_base_image(session_id)

    # Take the HSV view from the pristine image before anything is drawn on
    # it, instead of decoding the PNG a second time.
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)

    session = await get_session_db(session_id)
    structure = (session or {}).get("structure_result")
    if not structure:
        structure = _detect_structure(img)

    overlay = img.copy()

    _draw_structure_zones(img, structure.get("zones", []))
    _draw_structure_boxes(img, overlay, structure.get("boxes", []))

    # Blend box fills at 15% opacity.
    cv2.addWeighted(overlay, 0.15, img, 0.85, 0, img)

    _draw_color_regions(img, hsv)
    _draw_graphics(img, structure.get("graphics", []))

    return _png_response(img)


# ---------------------------------------------------------------------------
# Column overlay
# ---------------------------------------------------------------------------


async def _get_columns_overlay(session_id: str) -> Response:
    """Generate cropped (or dewarped) image with column borders drawn on it.

    Raises:
        HTTPException: 404 if the session, its column data, or its base
            image is missing; 500 on image decode/encode failure.
    """
    session = await _require_session(session_id)

    column_result = session.get("column_result")
    if not column_result or not column_result.get("columns"):
        raise HTTPException(status_code=404, detail="No column data available")

    img = await _load_base_image(session_id)

    overlay = img.copy()
    for col in column_result["columns"]:
        x, y = col["x"], col["y"]
        w, h = col["width"], col["height"]
        color = _COLUMN_COLORS.get(col.get("type", ""), _FALLBACK_BGR)

        # Fill on the overlay copy (blended below), solid border on the image.
        cv2.rectangle(overlay, (x, y), (x + w, y + h), color, -1)
        cv2.rectangle(img, (x, y), (x + w, y + h), color, 3)

        # Label; the confidence is appended only when it is below 1.0.
        label = col.get("type", "unknown").replace("column_", "").upper()
        conf = col.get("classification_confidence")
        if conf is not None and conf < 1.0:
            label = f"{label} {int(conf * 100)}%"
        cv2.putText(img, label, (x + 10, y + 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)

    # Blend overlay at 20% opacity.
    cv2.addWeighted(overlay, 0.2, img, 0.8, 0, img)

    # Draw detected box boundaries as dashed rectangles.
    zones = column_result.get("zones") or []
    for zone in zones:
        if zone.get("zone_type") != "box" or not zone.get("box"):
            continue
        box = zone["box"]
        bx, by = box["x"], box["y"]
        bw, bh = box["width"], box["height"]
        _draw_dashed_rect(img, bx, by, bw, bh, _BOX_ZONE_BGR, 15, 2)
        cv2.putText(img, "BOX", (bx + 10, by + bh - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, _BOX_ZONE_BGR, 2)

    # Red semi-transparent overlay for box zones.
    _draw_box_exclusion_overlay(img, zones)

    return _png_response(img)


# ---------------------------------------------------------------------------
# Row overlay
# ---------------------------------------------------------------------------


async def _get_rows_overlay(session_id: str) -> Response:
    """Generate cropped (or dewarped) image with row bands drawn on it.

    Raises:
        HTTPException: 404 if the session, its row data, or its base image
            is missing; 500 on image decode/encode failure.
    """
    session = await _require_session(session_id)

    row_result = session.get("row_result")
    if not row_result or not row_result.get("rows"):
        raise HTTPException(status_code=404, detail="No row data available")

    img = await _load_base_image(session_id)

    overlay = img.copy()
    for row in row_result["rows"]:
        x, y = row["x"], row["y"]
        w, h = row["width"], row["height"]
        row_type = row.get("row_type", "content")
        color = _ROW_COLORS.get(row_type, _FALLBACK_BGR)

        cv2.rectangle(overlay, (x, y), (x + w, y + h), color, -1)
        cv2.rectangle(img, (x, y), (x + w, y + h), color, 2)

        label = f"R{row.get('index', 0)} {row_type.upper()}"
        word_count = row.get("word_count", 0)
        if word_count:
            label = f"{label} ({word_count}w)"
        cv2.putText(img, label, (x + 5, y + 18),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)

    # Blend overlay at 15% opacity.
    cv2.addWeighted(overlay, 0.15, img, 0.85, 0, img)

    # Zone separators: full-width dashed lines at the top and bottom of
    # every "box" zone from the column result.
    column_result = session.get("column_result") or {}
    zones = column_result.get("zones") or []
    img_w_px = img.shape[1]
    for zone in zones:
        if zone.get("zone_type") != "box":
            continue
        zy, zh = zone["y"], zone["height"]
        for line_y in (zy, zy + zh):
            _draw_dashed_hline(img, 0, img_w_px, line_y, _BOX_ZONE_BGR, 20, 2)

    # Red semi-transparent overlay for box zones.
    _draw_box_exclusion_overlay(img, zones)

    return _png_response(img)


# ---------------------------------------------------------------------------
# Words overlay
# ---------------------------------------------------------------------------


def _draw_cell_grid(img: np.ndarray, overlay: np.ndarray,
                    cells: List[Dict[str, Any]]) -> None:
    """Draw the cell-based words overlay, coloured by column index."""
    for cell in cells:
        bbox = cell.get("bbox_px", {})
        cx, cy = bbox.get("x", 0), bbox.get("y", 0)
        cw, ch = bbox.get("w", 0), bbox.get("h", 0)
        if cw <= 0 or ch <= 0:
            continue

        color = _CELL_PALETTE[cell.get("col_index", 0) % len(_CELL_PALETTE)]

        cv2.rectangle(img, (cx, cy), (cx + cw, cy + ch), color, 1)
        cv2.rectangle(overlay, (cx, cy), (cx + cw, cy + ch), color, -1)

        # Cell-ID label (top-left corner); str() because putText needs text.
        cell_id = str(cell.get("cell_id") or "")
        cv2.putText(img, cell_id, (cx + 2, cy + 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.28, color, 1)

        # Recognised text (bottom of cell), coloured by confidence.
        text = cell.get("text", "")
        if text:
            label = text.replace('\n', ' ')[:30]
            cv2.putText(img, label, (cx + 3, cy + ch - 4),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.35,
                        _confidence_color(cell.get("confidence", 0)), 1)


def _draw_legacy_entries(img: np.ndarray, overlay: np.ndarray,
                         session: Dict[str, Any],
                         word_result: Dict[str, Any]) -> None:
    """Draw the entry-based words overlay used by old sessions.

    The grid is the cross product of ``column_*`` columns and ``content``
    rows; entry text is looked up by ``row_index`` against the position of
    each row within the content rows.
    """
    column_result = session.get("column_result")
    row_result = session.get("row_result")

    columns: List[Dict[str, Any]] = []
    if column_result and column_result.get("columns"):
        columns = [c for c in column_result["columns"]
                   if c.get("type", "").startswith("column_")]

    content_rows: List[Dict[str, Any]] = []
    if row_result and row_result.get("rows"):
        content_rows = [r for r in row_result["rows"]
                        if r.get("row_type") == "content"]

    # Grid rectangles.
    for col in columns:
        color = _LEGACY_COLUMN_COLORS.get(col.get("type", ""), _FALLBACK_BGR)
        cx, cw = col["x"], col["width"]
        for row in content_rows:
            ry, rh = row["y"], row["height"]
            cv2.rectangle(img, (cx, ry), (cx + cw, ry + rh), color, 1)
            cv2.rectangle(overlay, (cx, ry), (cx + cw, ry + rh), color, -1)

    # Later entries with the same row_index overwrite earlier ones.
    entry_by_row: Dict[int, Dict] = {
        entry.get("row_index", -1): entry for entry in word_result["entries"]
    }

    for row_idx, row in enumerate(content_rows):
        entry = entry_by_row.get(row_idx)
        if not entry:
            continue
        text_color = _confidence_color(entry.get("confidence", 0))
        ry, rh = row["y"], row["height"]
        for col in columns:
            field = _LEGACY_COLUMN_FIELDS.get(col.get("type", ""), "")
            text = entry.get(field, "") if field else ""
            if not text:
                continue
            label = text.replace('\n', ' ')[:30]
            cv2.putText(img, label, (col["x"] + 3, ry + rh - 4),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.35, text_color, 1)


async def _get_words_overlay(session_id: str) -> Response:
    """Generate cropped (or dewarped) image with cell grid drawn on it.

    Supports both the cell-based format (``word_result["cells"]``) and the
    legacy entry-based format (``word_result["entries"]``).

    Raises:
        HTTPException: 404 if the session, its word data, or its base image
            is missing; 500 on image decode/encode failure.
    """
    session = await _require_session(session_id)

    word_result = session.get("word_result")
    if not word_result:
        raise HTTPException(status_code=404, detail="No word data available")

    cells = word_result.get("cells")
    if not cells and not word_result.get("entries"):
        raise HTTPException(status_code=404, detail="No word data available")

    img = await _load_base_image(session_id)
    overlay = img.copy()

    if cells:
        _draw_cell_grid(img, overlay, cells)
    else:
        _draw_legacy_entries(img, overlay, session, word_result)

    # Blend overlay at 10% opacity.
    cv2.addWeighted(overlay, 0.1, img, 0.9, 0, img)

    # Red semi-transparent overlay for box zones.
    column_result = session.get("column_result") or {}
    zones = column_result.get("zones") or []
    _draw_box_exclusion_overlay(img, zones)

    return _png_response(img)