# Services: Admin-Lehrer, Backend-Lehrer, Studio v2, Website, Klausur-Service,
# School-Service, Voice-Service, Geo-Service, BreakPilot Drive, Agent-Core
# Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
# File listing: 384 lines, 11 KiB, Python
"""
Inpainting Service for Worksheet Cleanup

Removes handwriting from scanned worksheets using inpainting techniques.
Supports multiple backends:
1. OpenCV (Telea/NS algorithms) - Fast, CPU-based baseline
2. LaMa (Large Mask Inpainting) - Optional, better quality

PRIVACY (Datenschutz): All processing happens locally on the Mac Mini.
"""

import io
import logging
import time
from dataclasses import dataclass
from enum import Enum
from typing import Tuple, Optional

import numpy as np
from PIL import Image

# OpenCV is an optional dependency: importing this module must succeed
# without it, and only the functions that actually inpaint require it.
try:
    import cv2
except ImportError:
    # Keep the name bound so later references fail predictably, and expose
    # a flag that callers can test before attempting any OpenCV work.
    cv2 = None
    CV2_AVAILABLE = False
else:
    CV2_AVAILABLE = True

logger = logging.getLogger(__name__)


class InpaintingMethod(str, Enum):
    """Inpainting backends that a caller can request.

    The ``str`` mixin makes each member compare equal to its plain string
    value, so ``InpaintingMethod.LAMA == "lama"`` holds.
    """

    OPENCV_TELEA = "opencv_telea"  # Fast, good for small regions
    OPENCV_NS = "opencv_ns"  # Navier-Stokes, slower but smoother
    LAMA = "lama"  # LaMa deep learning (if available)
    AUTO = "auto"  # Automatically select best method


@dataclass
class InpaintingResult:
    """Outcome of a single inpainting operation."""

    # Cleaned image in BGR channel order; None when the operation failed.
    image: Optional[np.ndarray]
    # Name of the backend that actually produced the image ("none" on failure).
    method_used: str
    # Processing time in milliseconds.
    processing_time_ms: float
    # True when an image was produced, False when an error was caught.
    success: bool
    # Text of the caught exception when success is False.
    error_message: Optional[str] = None


# Global LaMa model (lazy loaded): stays None until the first LaMa
# inpainting call creates it.
_lama_model = None
# Cached result of the LaMa import probe: None means "not checked yet".
_lama_available: Optional[bool] = None


def check_lama_available() -> bool:
    """Report whether the optional LaMa backend can be imported.

    The import of ``lama_cleaner`` is attempted at most once per process;
    the outcome is stored in the module-level ``_lama_available`` flag and
    returned directly on every later call.

    Returns:
        True if ``lama_cleaner.model_manager.ModelManager`` imported
        successfully, False otherwise.
    """
    global _lama_available

    if _lama_available is not None:
        return _lama_available

    try:
        # Imported purely as a probe; the name itself is not used here.
        from lama_cleaner.model_manager import ModelManager  # noqa: F401
    except ImportError:
        _lama_available = False
        logger.info("LaMa not available, will use OpenCV fallback")
    except Exception as e:
        # An installed package can still fail while importing; treat any
        # such failure as "unavailable" instead of crashing the caller.
        _lama_available = False
        logger.warning("LaMa check failed: %s", e)
    else:
        _lama_available = True
        logger.info("LaMa inpainting is available")

    return _lama_available


def inpaint_image(
    image_bytes: bytes,
    mask_bytes: bytes,
    method: InpaintingMethod = InpaintingMethod.AUTO,
    inpaint_radius: int = 3
) -> InpaintingResult:
    """
    Inpaint (remove) masked regions from an image.

    Errors raised while decoding or inpainting are caught, logged with
    their traceback and reported through ``success=False`` on the result;
    only a missing OpenCV installation raises.

    Args:
        image_bytes: Source image as encoded bytes (any format Pillow reads)
        mask_bytes: Binary mask where white (255) = regions to remove; it is
            resized to the image dimensions if they differ
        method: Inpainting method to use
        inpaint_radius: Radius for OpenCV inpainting (default 3)

    Returns:
        InpaintingResult with the cleaned image in BGR order, or with
        ``image=None`` and ``error_message`` set on failure

    Raises:
        ImportError: If OpenCV is not available
    """
    if not CV2_AVAILABLE:
        raise ImportError(
            "OpenCV (cv2) is required for inpainting. "
            "Install with: pip install opencv-python-headless"
        )

    started = time.perf_counter()

    try:
        # Let Pillow normalise the pixel layout first so palette, 1-bit,
        # grey+alpha and CMYK inputs all arrive as plain 8-bit RGB; any
        # alpha channel is dropped.
        with Image.open(io.BytesIO(image_bytes)) as img:
            img_rgb = np.array(img.convert("RGB"))

        # Convert to BGR for OpenCV
        img_bgr = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)

        # Load the mask as a single 8-bit channel regardless of its mode.
        with Image.open(io.BytesIO(mask_bytes)) as mask_img:
            mask_array = np.array(mask_img.convert("L"))

        # Ensure mask is binary
        _, mask_binary = cv2.threshold(
            mask_array, 127, 255, cv2.THRESH_BINARY
        )

        # Resize mask if dimensions don't match; nearest-neighbour keeps
        # the mask strictly 0/255.
        if mask_binary.shape[:2] != img_bgr.shape[:2]:
            mask_binary = cv2.resize(
                mask_binary,
                (img_bgr.shape[1], img_bgr.shape[0]),
                interpolation=cv2.INTER_NEAREST
            )

        # Select method
        if method == InpaintingMethod.AUTO:
            # Use LaMa if available and more than 5% of pixels are masked
            mask_ratio = np.count_nonzero(mask_binary) / mask_binary.size
            if check_lama_available() and mask_ratio > 0.05:
                method = InpaintingMethod.LAMA
            else:
                method = InpaintingMethod.OPENCV_TELEA

        # Perform inpainting
        if method == InpaintingMethod.LAMA:
            result_img, actual_method = _inpaint_lama(img_bgr, mask_binary)
        elif method == InpaintingMethod.OPENCV_NS:
            result_img = cv2.inpaint(
                img_bgr, mask_binary, inpaint_radius, cv2.INPAINT_NS
            )
            actual_method = "opencv_ns"
        else:  # OPENCV_TELEA (default)
            result_img = cv2.inpaint(
                img_bgr, mask_binary, inpaint_radius, cv2.INPAINT_TELEA
            )
            actual_method = "opencv_telea"

        processing_time = (time.perf_counter() - started) * 1000

        logger.info(
            "Inpainting completed: method=%s, time=%.1fms",
            actual_method,
            processing_time,
        )

        return InpaintingResult(
            image=result_img,
            method_used=actual_method,
            processing_time_ms=processing_time,
            success=True
        )

    except Exception as e:
        # Best-effort contract: report the failure instead of raising.
        logger.exception("Inpainting failed: %s", e)

        return InpaintingResult(
            image=None,
            method_used="none",
            processing_time_ms=0,
            success=False,
            error_message=str(e)
        )


def _inpaint_lama(
    img_bgr: np.ndarray,
    mask: np.ndarray,
    fallback_radius: int = 3
) -> Tuple[np.ndarray, str]:
    """
    Inpaint using LaMa (Large Mask Inpainting).

    Falls back to OpenCV Telea inpainting if LaMa cannot be imported,
    loaded or run.

    Args:
        img_bgr: Source image in BGR channel order
        mask: Single-channel mask, non-zero where pixels should be removed
        fallback_radius: Radius used by the OpenCV fallback (default 3)

    Returns:
        Tuple of (inpainted BGR image, name of the method actually used);
        the name is "lama" or "opencv_telea_fallback"
    """
    global _lama_model

    try:
        from lama_cleaner.model_manager import ModelManager
        from lama_cleaner.schema import Config, HDStrategy, LDMSampler

        # Initialize model if needed; it is kept in a module global so the
        # load happens only once.
        if _lama_model is None:
            logger.info("Loading LaMa model...")
            _lama_model = ModelManager(
                name="lama",
                device="cpu",  # Use CPU for Mac Mini compatibility
            )
            logger.info("LaMa model loaded")

        # Prepare config
        config = Config(
            ldm_steps=25,
            ldm_sampler=LDMSampler.plms,
            hd_strategy=HDStrategy.ORIGINAL,
            hd_strategy_crop_margin=32,
            hd_strategy_crop_trigger_size=800,
            hd_strategy_resize_limit=800,
        )

        # Convert BGR to RGB for LaMa
        img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)

        # Run inpainting
        lama_output = _lama_model(img_rgb, mask, config)

        # lama-cleaner documents its model call as "RGB in, BGR out" and
        # blends the result in floating point. So the output must NOT be
        # channel-swapped again; it only needs clipping back to 8-bit.
        # NOTE(review): confirm against the installed lama-cleaner version.
        result_bgr = np.clip(lama_output, 0, 255).astype(np.uint8)

        return result_bgr, "lama"

    except Exception as e:
        logger.warning(
            "LaMa inpainting failed, falling back to OpenCV: %s", e
        )
        # Fallback to OpenCV
        result = cv2.inpaint(
            img_bgr, mask, fallback_radius, cv2.INPAINT_TELEA
        )
        return result, "opencv_telea_fallback"


def inpaint_opencv_telea(
    image_bytes: bytes,
    mask_bytes: bytes,
    radius: int = 3
) -> bytes:
    """
    Simple OpenCV Telea inpainting - fastest option.

    Args:
        image_bytes: Source image
        mask_bytes: Binary mask (white = remove)
        radius: Inpainting radius

    Returns:
        Inpainted image as PNG bytes

    Raises:
        RuntimeError: If the underlying inpainting call reports failure
    """
    result = inpaint_image(
        image_bytes,
        mask_bytes,
        method=InpaintingMethod.OPENCV_TELEA,
        inpaint_radius=radius
    )

    # inpaint_image reports errors through the result object; turn that
    # into an exception so this helper can return raw bytes.
    if not result.success:
        raise RuntimeError(f"Inpainting failed: {result.error_message}")

    return image_to_png(result.image)


def inpaint_opencv_ns(
    image_bytes: bytes,
    mask_bytes: bytes,
    radius: int = 3
) -> bytes:
    """
    OpenCV Navier-Stokes inpainting - smoother but slower.

    Args:
        image_bytes: Source image
        mask_bytes: Binary mask (white = remove)
        radius: Inpainting radius

    Returns:
        Inpainted image as PNG bytes

    Raises:
        RuntimeError: If the underlying inpainting call reports failure
    """
    result = inpaint_image(
        image_bytes,
        mask_bytes,
        method=InpaintingMethod.OPENCV_NS,
        inpaint_radius=radius
    )

    # inpaint_image reports errors through the result object; turn that
    # into an exception so this helper can return raw bytes.
    if not result.success:
        raise RuntimeError(f"Inpainting failed: {result.error_message}")

    return image_to_png(result.image)


def remove_handwriting(
    image_bytes: bytes,
    mask: Optional[np.ndarray] = None,
    method: InpaintingMethod = InpaintingMethod.AUTO
) -> Tuple[bytes, dict]:
    """
    High-level function to remove handwriting from an image.

    If no mask is provided, detects handwriting automatically.

    Args:
        image_bytes: Source image
        mask: Optional pre-computed mask; positive values mark pixels to
            remove
        method: Inpainting method

    Returns:
        Tuple of (cleaned image bytes, metadata dict). When the mask has no
        positive pixel the ORIGINAL bytes are returned unchanged (not
        re-encoded) with ``inpainting_performed`` set to False; otherwise
        the bytes are a PNG of the inpainted image.

    Raises:
        RuntimeError: If the inpainting step reports failure
    """
    # Imported here rather than at module level.
    # NOTE(review): presumably to avoid a circular import - confirm.
    from services.handwriting_detection import detect_handwriting, mask_to_png

    # Detect handwriting if no mask provided
    if mask is None:
        detection_result = detect_handwriting(image_bytes)
        mask = detection_result.mask
        detection_info = {
            "confidence": detection_result.confidence,
            "handwriting_ratio": detection_result.handwriting_ratio,
            "detection_method": detection_result.detection_method
        }
    else:
        detection_info = {"provided_mask": True}

    # Check if there's anything to inpaint
    if not np.any(mask > 0):
        logger.info("No handwriting detected, returning original image")
        return image_bytes, {
            "inpainting_performed": False,
            "reason": "no_handwriting_detected",
            **detection_info
        }

    # Convert mask to bytes for inpainting
    mask_bytes = mask_to_png(mask)

    # Perform inpainting
    result = inpaint_image(image_bytes, mask_bytes, method=method)

    if not result.success:
        raise RuntimeError(f"Inpainting failed: {result.error_message}")

    # Convert result to PNG
    result_bytes = image_to_png(result.image)

    metadata = {
        "inpainting_performed": True,
        "method_used": result.method_used,
        "processing_time_ms": result.processing_time_ms,
        **detection_info
    }

    return result_bytes, metadata


def image_to_png(img_bgr: np.ndarray) -> bytes:
    """
    Convert BGR image array to PNG bytes.

    The channel reordering is done with NumPy, so this helper works even
    when OpenCV is not installed.

    Args:
        img_bgr: Image array, either 2-D (grayscale, encoded as-is) or 3-D
            with channels in BGR order; a fourth (alpha) channel is dropped

    Returns:
        The image encoded as an optimized PNG
    """
    if img_bgr.ndim == 2:
        # Single-channel data has no channel order to fix.
        pixels = img_bgr
    else:
        # Walk the channel axis backwards from index 2 (2, 1, 0): this turns
        # BGR into RGB and ignores any extra channel. The copy makes the
        # reversed view contiguous before it is handed to Pillow.
        pixels = np.ascontiguousarray(img_bgr[..., 2::-1])

    buffer = io.BytesIO()
    Image.fromarray(pixels).save(buffer, format='PNG', optimize=True)
    return buffer.getvalue()


def dilate_mask(mask_bytes: bytes, iterations: int = 2) -> bytes:
    """
    Dilate a mask to expand the removal region.

    Useful to ensure complete handwriting removal including edges.

    Args:
        mask_bytes: Encoded mask image (any format Pillow reads)
        iterations: Number of times the 3x3 dilation is applied

    Returns:
        The dilated single-channel mask encoded as PNG bytes

    Raises:
        ImportError: If OpenCV is not available
    """
    if not CV2_AVAILABLE:
        raise ImportError(
            "OpenCV (cv2) is required for mask dilation. "
            "Install with: pip install opencv-python-headless"
        )

    # Let Pillow reduce the mask to one 8-bit channel so colour, palette,
    # 1-bit and alpha-carrying masks are all handled the same way.
    with Image.open(io.BytesIO(mask_bytes)) as mask_img:
        mask_array = np.array(mask_img.convert("L"))

    kernel = np.ones((3, 3), np.uint8)
    dilated = cv2.dilate(mask_array, kernel, iterations=iterations)

    buffer = io.BytesIO()
    Image.fromarray(dilated).save(buffer, format='PNG')
    return buffer.getvalue()