add: ocr_pipeline_overlays.py for overlay rendering functions

Extracted 4 overlay functions (_get_structure_overlay, _get_columns_overlay,
_get_rows_overlay, _get_words_overlay) that were missing from the initial
split. Provides render_overlay() dispatcher used by sessions module.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-18 08:46:49 +01:00
parent ec287fd12e
commit 143e41ec76

View File

@@ -0,0 +1,547 @@
"""
Overlay image rendering for OCR pipeline.
Generates visual overlays for structure, columns, rows, and words
detection results.
Lizenz: Apache 2.0
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
"""
import logging
from dataclasses import asdict
from typing import Any, Dict, List, Optional
import cv2
import numpy as np
from fastapi import HTTPException
from fastapi.responses import Response
from ocr_pipeline_common import (
_cache,
_get_base_image_png,
_load_session_to_cache,
_get_cached,
)
from ocr_pipeline_session_store import get_session_db, get_session_image
from cv_color_detect import _COLOR_HEX, _COLOR_RANGES
from cv_box_detect import detect_boxes
from ocr_pipeline_rows import _draw_box_exclusion_overlay
logger = logging.getLogger(__name__)
async def render_overlay(overlay_type: str, session_id: str) -> Response:
    """Render the overlay image of the requested kind for a session.

    Args:
        overlay_type: One of "structure", "columns", "rows" or "words".
        session_id: Identifier of the OCR pipeline session to render.

    Returns:
        The response produced by the matching overlay renderer.

    Raises:
        HTTPException: 400 if ``overlay_type`` is not a known overlay kind.
            Errors raised by the selected renderer propagate unchanged.
    """
    # Built at call time because the renderers are defined further down
    # in this module.
    renderers = {
        "structure": _get_structure_overlay,
        "columns": _get_columns_overlay,
        "rows": _get_rows_overlay,
        "words": _get_words_overlay,
    }
    renderer = renderers.get(overlay_type)
    if renderer is None:
        raise HTTPException(status_code=400, detail=f"Unknown overlay type: {overlay_type}")
    return await renderer(session_id)
async def _get_structure_overlay(session_id: str) -> Response:
    """Generate overlay image showing detected boxes, zones, and color regions.

    Layers are drawn onto the session's base image in this order:

    1. Dashed top/bottom zone boundaries with a "Zone <index> (<type>)" label.
    2. Detected boxes: solid border and label, plus a fill blended at 15%.
    3. Contours of colored regions found with the HSV ranges in
       ``_COLOR_RANGES`` (computed on the undrawn image).
    4. Dashed frames and labels around graphic elements, if the stored
       structure result contains a "graphics" list.

    If the session has no stored ``structure_result``, boxes and zones are
    detected on the fly inside a margin of 3% of the shorter image side.

    Args:
        session_id: Identifier of the OCR pipeline session.

    Returns:
        A ``Response`` with the PNG-encoded overlay (``image/png``).

    Raises:
        HTTPException: 404 if no base image is available; 500 if the base
            image cannot be decoded or the overlay cannot be encoded.
    """
    base_png = await _get_base_image_png(session_id)
    if not base_png:
        raise HTTPException(status_code=404, detail="No base image available")

    arr = np.frombuffer(base_png, dtype=np.uint8)
    img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    if img is None:
        raise HTTPException(status_code=500, detail="Failed to decode image")
    h, w = img.shape[:2]

    # Convert to HSV now, while ``img`` is still the untouched base image.
    # Color-region detection below must not see the annotations drawn later,
    # and doing it here avoids decoding the PNG a second time.
    hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)

    # Get structure result (run detection if not stored on the session)
    session = await get_session_db(session_id)
    structure = (session or {}).get("structure_result")
    if not structure:
        # NOTE(review): this name was used without being imported anywhere in
        # the module, so this branch raised NameError. It presumably lives in
        # cv_box_detect next to detect_boxes -- confirm the module path.
        from cv_box_detect import split_page_into_zones

        # Run detection on-the-fly
        margin = int(min(w, h) * 0.03)
        content_x, content_y = margin, margin
        content_w_px = w - 2 * margin
        content_h_px = h - 2 * margin
        boxes = detect_boxes(img, content_x, content_w_px, content_y, content_h_px)
        zones = split_page_into_zones(content_x, content_y, content_w_px, content_h_px, boxes)
        structure = {
            "boxes": [
                {"x": b.x, "y": b.y, "w": b.width, "h": b.height,
                 "confidence": b.confidence, "border_thickness": b.border_thickness}
                for b in boxes
            ],
            "zones": [
                {"index": z.index, "zone_type": z.zone_type,
                 "y": z.y, "h": z.height, "x": z.x, "w": z.width}
                for z in zones
            ],
        }

    # Separate layer for the box fills; blended into ``img`` further down.
    overlay = img.copy()

    # --- Draw zone boundaries ---
    zone_colors = {
        "content": (200, 200, 200),  # light gray
        "box": (255, 180, 0),        # blue-ish (BGR)
    }
    for zone in structure.get("zones", []):
        zx = zone["x"]
        zy = zone["y"]
        zw = zone["w"]
        zh = zone["h"]
        color = zone_colors.get(zone["zone_type"], (200, 200, 200))

        # Draw zone boundary as dashed line (top and bottom edges only)
        dash_len = 12
        for edge_x in range(zx, zx + zw, dash_len * 2):
            end_x = min(edge_x + dash_len, zx + zw)
            cv2.line(img, (edge_x, zy), (end_x, zy), color, 1)
            cv2.line(img, (edge_x, zy + zh), (end_x, zy + zh), color, 1)

        # Zone label
        zone_label = f"Zone {zone['index']} ({zone['zone_type']})"
        cv2.putText(img, zone_label, (zx + 5, zy + 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 1)

    # --- Draw detected boxes ---
    # Color map for box backgrounds (hex -> BGR)
    bg_hex_to_bgr = {
        "#dc2626": (38, 38, 220),     # red
        "#2563eb": (235, 99, 37),     # blue
        "#16a34a": (74, 163, 22),     # green
        "#ea580c": (12, 88, 234),     # orange
        "#9333ea": (234, 51, 147),    # purple
        "#ca8a04": (4, 138, 202),     # yellow
        "#6b7280": (128, 114, 107),   # gray
    }
    for box_data in structure.get("boxes", []):
        bx = box_data["x"]
        by = box_data["y"]
        bw = box_data["w"]
        bh = box_data["h"]
        conf = box_data.get("confidence", 0)
        thickness = box_data.get("border_thickness", 0)
        bg_hex = box_data.get("bg_color_hex", "#6b7280")
        bg_name = box_data.get("bg_color_name", "")

        # Box fill color (gray when the hex value is not in the map)
        fill_bgr = bg_hex_to_bgr.get(bg_hex, (128, 114, 107))

        # Fill goes on the separate layer so it can be blended
        cv2.rectangle(overlay, (bx, by), (bx + bw, by + bh), fill_bgr, -1)

        # Solid border
        border_color = fill_bgr
        cv2.rectangle(img, (bx, by), (bx + bw, by + bh), border_color, 3)

        # Label
        label = "BOX"
        if bg_name and bg_name not in ("unknown", "white"):
            label += f" ({bg_name})"
        if thickness > 0:
            label += f" border={thickness}px"
        label += f" {int(conf * 100)}%"
        # Thick white text underneath acts as an outline for the colored text
        cv2.putText(img, label, (bx + 8, by + 22),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.55, (255, 255, 255), 2)
        cv2.putText(img, label, (bx + 8, by + 22),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.55, border_color, 1)

    # Blend the fill layer at 15% opacity
    cv2.addWeighted(overlay, 0.15, img, 0.85, 0, img)

    # --- Draw color regions (HSV masks) ---
    color_bgr_map = {
        "red": (0, 0, 255),
        "orange": (0, 140, 255),
        "yellow": (0, 200, 255),
        "green": (0, 200, 0),
        "blue": (255, 150, 0),
        "purple": (200, 0, 200),
    }
    for color_name, ranges in _COLOR_RANGES.items():
        mask = np.zeros((h, w), dtype=np.uint8)
        for lower, upper in ranges:
            mask = cv2.bitwise_or(mask, cv2.inRange(hsv, lower, upper))

        # Only draw if there are significant colored pixels
        if np.sum(mask > 0) < 100:
            continue

        # Draw colored contours, skipping tiny specks
        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        draw_color = color_bgr_map.get(color_name, (200, 200, 200))
        for cnt in contours:
            area = cv2.contourArea(cnt)
            if area < 20:
                continue
            cv2.drawContours(img, [cnt], -1, draw_color, 2)

    # --- Draw graphic elements ---
    graphics_data = structure.get("graphics", [])
    shape_icons = {
        "image": "IMAGE",
        "illustration": "ILLUST",
    }
    for gfx in graphics_data:
        gx, gy = gfx["x"], gfx["y"]
        gw, gh = gfx["w"], gfx["h"]
        shape = gfx.get("shape", "icon")
        color_hex = gfx.get("color_hex", "#6b7280")
        conf = gfx.get("confidence", 0)

        # Pick draw color based on element color (BGR)
        gfx_bgr = bg_hex_to_bgr.get(color_hex, (128, 114, 107))

        # Draw bounding box (dashed style via short segments)
        dash = 6
        for seg_x in range(gx, gx + gw, dash * 2):
            end_x = min(seg_x + dash, gx + gw)
            cv2.line(img, (seg_x, gy), (end_x, gy), gfx_bgr, 2)
            cv2.line(img, (seg_x, gy + gh), (end_x, gy + gh), gfx_bgr, 2)
        for seg_y in range(gy, gy + gh, dash * 2):
            end_y = min(seg_y + dash, gy + gh)
            cv2.line(img, (gx, seg_y), (gx, end_y), gfx_bgr, 2)
            cv2.line(img, (gx + gw, seg_y), (gx + gw, end_y), gfx_bgr, 2)

        # Label: known shapes get a fixed tag, others their upper-cased name
        # truncated to five characters
        icon = shape_icons.get(shape, shape.upper()[:5])
        label = f"{icon} {int(conf * 100)}%"
        # White background for readability; the label is clamped so it
        # stays inside the image when the element touches the top edge
        (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)
        lx = gx + 2
        ly = max(gy - 4, th + 4)
        cv2.rectangle(img, (lx - 1, ly - th - 2), (lx + tw + 2, ly + 3), (255, 255, 255), -1)
        cv2.putText(img, label, (lx, ly), cv2.FONT_HERSHEY_SIMPLEX, 0.4, gfx_bgr, 1)

    # Encode result; check the success flag like the other overlay renderers
    success, png_buf = cv2.imencode(".png", img)
    if not success:
        raise HTTPException(status_code=500, detail="Failed to encode overlay image")
    return Response(content=png_buf.tobytes(), media_type="image/png")
async def _get_columns_overlay(session_id: str) -> Response:
    """Render the session's base image with the detected columns marked.

    Each column gets a solid border, a label (type plus classification
    confidence when below 1.0) and a fill blended at 20%. Zones of type
    "box" that carry box geometry are outlined with a dashed rectangle
    before the box exclusion overlay is applied.

    Args:
        session_id: Identifier of the OCR pipeline session.

    Returns:
        A ``Response`` with the PNG-encoded overlay (``image/png``).

    Raises:
        HTTPException: 404 if the session, its column data or the base image
            is missing; 500 if the image cannot be decoded or encoded.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    column_result = session.get("column_result")
    if not (column_result and column_result.get("columns")):
        raise HTTPException(status_code=404, detail="No column data available")

    # Best available base image (cropped > dewarped > original)
    png_bytes = await _get_base_image_png(session_id)
    if not png_bytes:
        raise HTTPException(status_code=404, detail="No base image available")

    img = cv2.imdecode(np.frombuffer(png_bytes, dtype=np.uint8), cv2.IMREAD_COLOR)
    if img is None:
        raise HTTPException(status_code=500, detail="Failed to decode image")

    font = cv2.FONT_HERSHEY_SIMPLEX
    gray = (128, 128, 128)
    dark_gray = (100, 100, 100)
    # Region type -> BGR color
    type_to_bgr = {
        "column_en": (255, 180, 0),        # blue
        "column_de": (0, 200, 0),          # green
        "column_example": (0, 140, 255),   # orange
        "column_text": (200, 200, 0),      # cyan / turquoise
        "page_ref": (200, 0, 200),         # purple
        "column_marker": (0, 0, 220),      # red
        "column_ignore": (180, 180, 180),  # light gray
        "header": gray,
        "footer": gray,
        "margin_top": dark_gray,
        "margin_bottom": dark_gray,
    }

    # Fills are painted on a copy and blended in afterwards
    fill_layer = img.copy()
    for column in column_result["columns"]:
        left, top = column["x"], column["y"]
        right, bottom = left + column["width"], top + column["height"]
        bgr = type_to_bgr.get(column.get("type", ""), (200, 200, 200))

        cv2.rectangle(fill_layer, (left, top), (right, bottom), bgr, -1)
        cv2.rectangle(img, (left, top), (right, bottom), bgr, 3)

        # Caption: type without the "column_" prefix, plus confidence if < 1.0
        caption = column.get("type", "unknown").replace("column_", "").upper()
        confidence = column.get("classification_confidence")
        if confidence is not None and confidence < 1.0:
            caption = f"{caption} {int(confidence * 100)}%"
        cv2.putText(img, caption, (left + 10, top + 30), font, 0.8, bgr, 2)

    # 20% fill, 80% annotated image
    cv2.addWeighted(fill_layer, 0.2, img, 0.8, 0, img)

    # Dashed yellow rectangles around detected boxes
    zones = column_result.get("zones") or []
    dash = 15
    box_bgr = (0, 200, 255)  # yellow (BGR)
    for zone in zones:
        if zone.get("zone_type") != "box" or not zone.get("box"):
            continue
        geometry = zone["box"]
        x0, y0 = geometry["x"], geometry["y"]
        x1, y1 = x0 + geometry["width"], y0 + geometry["height"]

        # Horizontal edges: one short segment on top and bottom per step
        for start in range(x0, x1, dash * 2):
            stop = min(start + dash, x1)
            for edge_y in (y0, y1):
                cv2.line(img, (start, edge_y), (stop, edge_y), box_bgr, 2)
        # Vertical edges: one short segment on left and right per step
        for start in range(y0, y1, dash * 2):
            stop = min(start + dash, y1)
            for edge_x in (x0, x1):
                cv2.line(img, (edge_x, start), (edge_x, stop), box_bgr, 2)

        cv2.putText(img, "BOX", (x0 + 10, y1 - 10), font, 0.7, box_bgr, 2)

    # Box exclusion overlay for box zones
    _draw_box_exclusion_overlay(img, zones)

    encoded_ok, encoded = cv2.imencode(".png", img)
    if not encoded_ok:
        raise HTTPException(status_code=500, detail="Failed to encode overlay image")
    return Response(content=encoded.tobytes(), media_type="image/png")
# ---------------------------------------------------------------------------
# Row and word overlay renderers
# ---------------------------------------------------------------------------
async def _get_rows_overlay(session_id: str) -> Response:
    """Render the session's base image with the detected row bands marked.

    Each row gets a border, a label of the form "R<index> <TYPE>" (plus the
    word count when non-zero) and a fill blended at 15%. For every zone of
    type "box" in the column result, dashed lines spanning the full image
    width are drawn at the zone's top and bottom, and the box exclusion
    overlay is applied last.

    Args:
        session_id: Identifier of the OCR pipeline session.

    Returns:
        A ``Response`` with the PNG-encoded overlay (``image/png``).

    Raises:
        HTTPException: 404 if the session, its row data or the base image
            is missing; 500 if the image cannot be decoded or encoded.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    row_result = session.get("row_result")
    if not (row_result and row_result.get("rows")):
        raise HTTPException(status_code=404, detail="No row data available")

    # Best available base image (cropped > dewarped > original)
    png_bytes = await _get_base_image_png(session_id)
    if not png_bytes:
        raise HTTPException(status_code=404, detail="No base image available")

    img = cv2.imdecode(np.frombuffer(png_bytes, dtype=np.uint8), cv2.IMREAD_COLOR)
    if img is None:
        raise HTTPException(status_code=500, detail="Failed to decode image")

    font = cv2.FONT_HERSHEY_SIMPLEX
    gray = (128, 128, 128)
    dark_gray = (100, 100, 100)
    # Row type -> BGR color
    band_bgr_by_type = {
        "content": (255, 180, 0),  # blue
        "header": gray,
        "footer": gray,
        "margin_top": dark_gray,
        "margin_bottom": dark_gray,
    }

    # Fills are painted on a copy and blended in afterwards
    fill_layer = img.copy()
    for band in row_result["rows"]:
        left, top = band["x"], band["y"]
        right, bottom = left + band["width"], top + band["height"]
        kind = band.get("row_type", "content")
        bgr = band_bgr_by_type.get(kind, (200, 200, 200))

        cv2.rectangle(fill_layer, (left, top), (right, bottom), bgr, -1)
        cv2.rectangle(img, (left, top), (right, bottom), bgr, 2)

        caption = f"R{band.get('index', 0)} {kind.upper()}"
        words = band.get("word_count", 0)
        if words:
            caption = f"{caption} ({words}w)"
        cv2.putText(img, caption, (left + 5, top + 18), font, 0.5, bgr, 1)

    # 15% fill, 85% annotated image
    cv2.addWeighted(fill_layer, 0.15, img, 0.85, 0, img)

    # Dashed full-width separators at the top and bottom of each box zone
    zones = (session.get("column_result") or {}).get("zones") or []
    page_width = img.shape[1]
    separator_bgr = (0, 200, 255)  # yellow (BGR)
    dash = 20
    for zone in zones:
        if zone.get("zone_type") != "box":
            continue
        zone_top = zone["y"]
        zone_bottom = zone_top + zone["height"]
        for y_line in (zone_top, zone_bottom):
            for x_start in range(0, page_width, dash * 2):
                x_stop = min(x_start + dash, page_width)
                cv2.line(img, (x_start, y_line), (x_stop, y_line), separator_bgr, 2)

    # Box exclusion overlay for box zones
    _draw_box_exclusion_overlay(img, zones)

    encoded_ok, encoded = cv2.imencode(".png", img)
    if not encoded_ok:
        raise HTTPException(status_code=500, detail="Failed to encode overlay image")
    return Response(content=encoded.tobytes(), media_type="image/png")
async def _get_words_overlay(session_id: str) -> Response:
    """Generate cropped (or dewarped) image with cell grid drawn on it.

    Two word-result formats are supported:

    * Cell-based (``word_result["cells"]``): every cell with a positive
      width and height gets a border and fill colored by its column index,
      its cell id in the top-left corner and its text (first 30 characters)
      along the bottom edge.
    * Legacy entry-based (``word_result["entries"]``): the grid is built
      from the session's ``column_*`` columns and "content" rows, and each
      entry's english/german/example text is drawn into the matching
      column of the row looked up by the entry's ``row_index``.

    Text is colored by confidence: green at >= 70, yellow at >= 50, red
    below (thresholds suggest a 0-100 scale -- TODO confirm). Fills are
    blended at 10% and the box exclusion overlay is applied last.

    Args:
        session_id: Identifier of the OCR pipeline session.

    Returns:
        A ``Response`` with the PNG-encoded overlay (``image/png``).

    Raises:
        HTTPException: 404 if the session, its word data or the base image
            is missing; 500 if the image cannot be decoded or encoded.
    """

    def text_color_for(confidence):
        """Map a confidence value to the BGR color used for cell text."""
        if confidence >= 70:
            return (0, 180, 0)      # green
        if confidence >= 50:
            return (0, 180, 220)    # yellow
        return (0, 0, 220)          # red

    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    word_result = session.get("word_result")
    if not word_result:
        raise HTTPException(status_code=404, detail="No word data available")

    # Support both new cell-based and legacy entry-based formats
    cells = word_result.get("cells")
    if not cells and not word_result.get("entries"):
        raise HTTPException(status_code=404, detail="No word data available")

    # Load best available base image (cropped > dewarped > original)
    base_png = await _get_base_image_png(session_id)
    if not base_png:
        raise HTTPException(status_code=404, detail="No base image available")

    arr = np.frombuffer(base_png, dtype=np.uint8)
    img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    if img is None:
        raise HTTPException(status_code=500, detail="Failed to decode image")

    # Separate layer for the cell fills; blended into ``img`` further down.
    overlay = img.copy()

    if cells:
        # New cell-based overlay: color by column index (palette wraps around)
        col_palette = [
            (255, 180, 0),    # Blue (BGR)
            (0, 200, 0),      # Green
            (0, 140, 255),    # Orange
            (200, 100, 200),  # Purple
            (200, 200, 0),    # Cyan
            (100, 200, 200),  # Yellow-ish
        ]
        for cell in cells:
            bbox = cell.get("bbox_px", {})
            cx = bbox.get("x", 0)
            cy = bbox.get("y", 0)
            cw = bbox.get("w", 0)
            ch = bbox.get("h", 0)
            # Cells without a usable bounding box are skipped
            if cw <= 0 or ch <= 0:
                continue

            col_idx = cell.get("col_index", 0)
            color = col_palette[col_idx % len(col_palette)]

            # Cell rectangle border
            cv2.rectangle(img, (cx, cy), (cx + cw, cy + ch), color, 1)
            # Fill goes on the separate layer so it can be blended
            cv2.rectangle(overlay, (cx, cy), (cx + cw, cy + ch), color, -1)

            # Cell-ID label (top-left corner)
            cell_id = cell.get("cell_id", "")
            cv2.putText(img, cell_id, (cx + 2, cy + 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.28, color, 1)

            # Text label (bottom of cell)
            text = cell.get("text", "")
            if text:
                text_color = text_color_for(cell.get("confidence", 0))
                label = text.replace('\n', ' ')[:30]
                cv2.putText(img, label, (cx + 3, cy + ch - 4),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.35, text_color, 1)
    else:
        # Legacy fallback: entry-based overlay (for old sessions)
        column_result = session.get("column_result")
        row_result = session.get("row_result")
        col_colors = {
            "column_en": (255, 180, 0),
            "column_de": (0, 200, 0),
            "column_example": (0, 140, 255),
        }
        # Column type -> key of the entry field shown in that column
        field_by_col_type = {
            "column_en": "english",
            "column_de": "german",
            "column_example": "example",
        }

        columns = []
        if column_result and column_result.get("columns"):
            columns = [c for c in column_result["columns"]
                       if c.get("type", "").startswith("column_")]
        content_rows_data = []
        if row_result and row_result.get("rows"):
            content_rows_data = [r for r in row_result["rows"]
                                 if r.get("row_type") == "content"]

        # Grid: one rectangle per (column, content row) pair
        for col in columns:
            col_type = col.get("type", "")
            color = col_colors.get(col_type, (200, 200, 200))
            cx, cw = col["x"], col["width"]
            for row in content_rows_data:
                ry, rh = row["y"], row["height"]
                cv2.rectangle(img, (cx, ry), (cx + cw, ry + rh), color, 1)
                cv2.rectangle(overlay, (cx, ry), (cx + cw, ry + rh), color, -1)

        # Index entries by row_index; a later entry with the same index wins
        entries = word_result["entries"]
        entry_by_row: Dict[int, Dict] = {}
        for entry in entries:
            entry_by_row[entry.get("row_index", -1)] = entry

        # Entries are matched to content rows by position in the filtered list
        for row_idx, row in enumerate(content_rows_data):
            entry = entry_by_row.get(row_idx)
            if not entry:
                continue
            text_color = text_color_for(entry.get("confidence", 0))
            ry, rh = row["y"], row["height"]
            for col in columns:
                col_type = col.get("type", "")
                cx, cw = col["x"], col["width"]
                field = field_by_col_type.get(col_type, "")
                text = entry.get(field, "") if field else ""
                if text:
                    label = text.replace('\n', ' ')[:30]
                    cv2.putText(img, label, (cx + 3, ry + rh - 4),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.35, text_color, 1)

    # Blend the fill layer at 10% opacity
    cv2.addWeighted(overlay, 0.1, img, 0.9, 0, img)

    # Box exclusion overlay for box zones
    column_result = session.get("column_result") or {}
    zones = column_result.get("zones") or []
    _draw_box_exclusion_overlay(img, zones)

    success, result_png = cv2.imencode(".png", img)
    if not success:
        raise HTTPException(status_code=500, detail="Failed to encode overlay image")
    return Response(content=result_png.tobytes(), media_type="image/png")