Files
breakpilot-lehrer/klausur-service/backend/orientation_crop_api.py
Benjamin Admin 2763631711
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 28s
CI / test-go-edu-search (push) Successful in 27s
CI / test-python-klausur (push) Failing after 1m59s
CI / test-python-agent-core (push) Successful in 17s
CI / test-nodejs-website (push) Successful in 18s
feat: Orientierung + Zuschneiden als Schritte 1-2 in OCR-Pipeline
Zwei neue Wizard-Schritte vor Begradigung:
- Step 1: Orientierungserkennung (0/90/180/270° via Tesseract OSD)
- Step 2: Seitenrand-Erkennung und Zuschnitt (Scannerraender entfernen)

Backend:
- orientation_crop_api.py: POST /orientation, POST /crop, POST /crop/skip
- page_crop.py: detect_and_crop_page() mit Format-Erkennung (A4/A5/Letter)
- Session-Store: orientation_result, crop_result Felder
- Pipeline nutzt zugeschnittenes Bild fuer Deskew/Dewarp

Frontend:
- StepOrientation.tsx: Upload + Auto-Orientierung + Vorher/Nachher
- StepCrop.tsx: Auto-Crop + Format-Badge + Ueberspringen-Option
- Pipeline-Stepper: 10 Schritte (war 8)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 23:55:23 +01:00

331 lines
9.9 KiB
Python

"""
Orientation & Crop API - Steps 1-2 of the OCR Pipeline.
Step 1: Orientation detection (fix 90/180/270 degree rotations)
Step 2: Page cropping (remove scanner borders, detect paper format)
These endpoints were extracted from the main pipeline to keep files manageable.
"""
import logging
import time
from typing import Any, Dict, Optional
import cv2
import numpy as np
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from cv_vocab_pipeline import detect_and_fix_orientation
from page_crop import detect_and_crop_page
from ocr_pipeline_session_store import (
get_session_db,
get_session_image,
update_session_db,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router carrying every endpoint declared in this file; all routes share
# the "/api/v1/ocr-pipeline" prefix and the "ocr-pipeline" tag.
router = APIRouter(prefix="/api/v1/ocr-pipeline", tags=["ocr-pipeline"])
# Reference to the shared cache from ocr_pipeline_api (set in main.py).
# Keyed by session_id; each value is the per-session cache entry built by
# _ensure_cached(). Starts as a private empty dict and is rebound by
# set_cache_ref().
_cache: Dict[str, Dict[str, Any]] = {}
def set_cache_ref(cache: Dict[str, Dict[str, Any]]):
    """Point this module at the session cache owned by ocr_pipeline_api.

    The given dict is stored by reference (not copied), so entries added
    here or by the owner are visible to both sides.

    Args:
        cache: Mapping of session id to that session's cache entry.
    """
    # Rebind the module-level name; every helper in this file reads it.
    global _cache
    _cache = cache
async def _ensure_cached(session_id: str) -> Dict[str, Any]:
    """Return the cache entry for a session, building it from the DB on a miss.

    On a miss the session row is loaded, merged into a new entry and each
    stored image (original, oriented, cropped, deskewed, dewarped) is
    decoded into a BGR array under ``<type>_bgr``. Image types without
    stored data stay ``None``.

    Args:
        session_id: Identifier of the OCR pipeline session.

    Returns:
        The (possibly freshly created) cache entry for the session.

    Raises:
        HTTPException: 404 when the session does not exist in the DB.
    """
    # Fast path: the session is already cached.
    try:
        return _cache[session_id]
    except KeyError:
        pass

    row = await get_session_db(session_id)
    if not row:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    image_types = ("original", "oriented", "cropped", "deskewed", "dewarped")

    # Session fields first, then the image slots reset to None so that
    # they always exist in the entry.
    entry: Dict[str, Any] = {"id": session_id, **row}
    entry.update({f"{kind}_bgr": None for kind in image_types})

    for kind in image_types:
        encoded = await get_session_image(session_id, kind)
        if not encoded:
            continue
        raw = np.frombuffer(encoded, dtype=np.uint8)
        entry[f"{kind}_bgr"] = cv2.imdecode(raw, cv2.IMREAD_COLOR)

    _cache[session_id] = entry
    return entry
async def _append_pipeline_log(session_id: str, step: str, metrics: dict, duration_ms: int):
    """Append a completed-step entry to the session's pipeline log.

    Loads the session, appends one entry to ``pipeline_log["steps"]`` and
    writes the whole log back. Does nothing when the session is not found.

    Args:
        session_id: Identifier of the OCR pipeline session.
        step: Name of the pipeline step that finished (e.g. "orientation").
        metrics: Step-specific metrics stored verbatim in the entry.
        duration_ms: Duration of the step in milliseconds.
    """
    from datetime import datetime, timezone

    session = await get_session_db(session_id)
    if not session:
        return

    pipeline_log = session.get("pipeline_log") or {}
    # A stored log without a "steps" list must not crash the request.
    steps = pipeline_log.get("steps")
    if steps is None:
        steps = pipeline_log["steps"] = []

    # datetime.utcnow() is deprecated; build the same naive-UTC timestamp
    # from an aware "now" so the stored string format does not change.
    completed_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    steps.append({
        "step": step,
        "completed_at": completed_at,
        "success": True,
        "duration_ms": duration_ms,
        "metrics": metrics,
    })
    await update_session_db(session_id, pipeline_log=pipeline_log)
# ---------------------------------------------------------------------------
# Step 1: Orientation
# ---------------------------------------------------------------------------
@router.post("/sessions/{session_id}/orientation")
async def detect_orientation(session_id: str):
    """Detect and fix 90/180/270 degree rotations from scanners.

    Reads the session's original image, runs ``detect_and_fix_orientation``
    on a copy of it, stores the result in the cache and persists it as
    ``oriented_png`` together with ``orientation_result``. The session's
    ``current_step`` is set to 2 and an "orientation" entry is appended to
    the pipeline log.

    Args:
        session_id: Identifier of the OCR pipeline session.

    Returns:
        A dict with the session id, ``orientation_degrees``, ``corrected``,
        ``duration_seconds``, the oriented image's width and height, and
        the URL of the oriented image.

    Raises:
        HTTPException: 404 when the session is unknown, 400 when the
            session has no original image.
    """
    cached = await _ensure_cached(session_id)
    img_bgr = cached.get("original_bgr")
    if img_bgr is None:
        raise HTTPException(status_code=400, detail="Original image not available")

    t0 = time.time()
    # Work on a copy so the cached original is never modified.
    oriented_bgr, orientation_deg = detect_and_fix_orientation(img_bgr.copy())
    duration = time.time() - t0

    orientation_result = {
        "orientation_degrees": orientation_deg,
        "corrected": orientation_deg != 0,
        "duration_seconds": round(duration, 2),
    }

    # Encode oriented image
    success, png_buf = cv2.imencode(".png", oriented_bgr)
    if success:
        oriented_png = png_buf.tobytes()
    else:
        # Keep the original fallback (empty bytes) but make the failure
        # visible instead of silently persisting an empty image.
        logger.error(
            "OCR Pipeline: orientation session %s: PNG encoding failed, "
            "persisting empty image",
            session_id,
        )
        oriented_png = b""

    # Update cache
    cached["oriented_bgr"] = oriented_bgr
    cached["orientation_result"] = orientation_result

    # Persist to DB
    await update_session_db(
        session_id,
        oriented_png=oriented_png,
        orientation_result=orientation_result,
        current_step=2,
    )

    logger.info(
        "OCR Pipeline: orientation session %s: %d° (%s) in %.2fs",
        session_id, orientation_deg,
        "corrected" if orientation_deg else "no change",
        duration,
    )

    await _append_pipeline_log(session_id, "orientation", {
        "orientation_degrees": orientation_deg,
        "corrected": orientation_deg != 0,
    }, duration_ms=int(duration * 1000))

    h, w = oriented_bgr.shape[:2]
    return {
        "session_id": session_id,
        **orientation_result,
        "image_width": w,
        "image_height": h,
        "oriented_image_url": f"/api/v1/ocr-pipeline/sessions/{session_id}/image/oriented",
    }
# ---------------------------------------------------------------------------
# Step 2: Crop
# ---------------------------------------------------------------------------
@router.post("/sessions/{session_id}/crop")
async def auto_crop(session_id: str):
    """Auto-detect and crop scanner borders.

    Reads the oriented image (or the original if no oriented image is
    available), runs ``detect_and_crop_page`` on it, stores the result in
    the cache and persists it as ``cropped_png`` together with
    ``crop_result``. The session's ``current_step`` is set to 3 and a
    "crop" entry is appended to the pipeline log.

    Args:
        session_id: Identifier of the OCR pipeline session.

    Returns:
        A dict with the session id, every field of the crop info returned
        by ``detect_and_crop_page`` plus ``duration_seconds``, the cropped
        image's width and height, and the URL of the cropped image.

    Raises:
        HTTPException: 404 when the session is unknown, 400 when neither
            an oriented nor an original image is available.
    """
    cached = await _ensure_cached(session_id)

    # Use oriented image if available, else original. The images are
    # NumPy arrays, so `a or b` must not be used here: evaluating the
    # truth value of an array raises ValueError.
    img_bgr = cached.get("oriented_bgr")
    if img_bgr is None:
        img_bgr = cached.get("original_bgr")
    if img_bgr is None:
        raise HTTPException(status_code=400, detail="No image available for cropping")

    t0 = time.time()
    cropped_bgr, crop_info = detect_and_crop_page(img_bgr)
    duration = time.time() - t0
    crop_info["duration_seconds"] = round(duration, 2)

    # Encode cropped image
    success, png_buf = cv2.imencode(".png", cropped_bgr)
    if success:
        cropped_png = png_buf.tobytes()
    else:
        # Keep the original fallback (empty bytes) but make the failure
        # visible instead of silently persisting an empty image.
        logger.error(
            "OCR Pipeline: crop session %s: PNG encoding failed, "
            "persisting empty image",
            session_id,
        )
        cropped_png = b""

    # Update cache
    cached["cropped_bgr"] = cropped_bgr
    cached["crop_result"] = crop_info

    # Persist to DB
    await update_session_db(
        session_id,
        cropped_png=cropped_png,
        crop_result=crop_info,
        current_step=3,
    )

    logger.info(
        "OCR Pipeline: crop session %s: applied=%s format=%s in %.2fs",
        session_id, crop_info["crop_applied"],
        crop_info.get("detected_format", "?"),
        duration,
    )

    await _append_pipeline_log(session_id, "crop", {
        "crop_applied": crop_info["crop_applied"],
        "detected_format": crop_info.get("detected_format"),
        "format_confidence": crop_info.get("format_confidence"),
    }, duration_ms=int(duration * 1000))

    h, w = cropped_bgr.shape[:2]
    return {
        "session_id": session_id,
        **crop_info,
        "image_width": w,
        "image_height": h,
        "cropped_image_url": f"/api/v1/ocr-pipeline/sessions/{session_id}/image/cropped",
    }
class ManualCropRequest(BaseModel):
    """Request body for a manual crop rectangle.

    All four values are percentages (0-100) of the size of the image that
    is being cropped: ``x``/``width`` relative to the image width,
    ``y``/``height`` relative to the image height.
    """

    # Left edge of the rectangle, percentage 0-100
    x: float
    # Top edge of the rectangle, percentage 0-100
    y: float
    # Horizontal extent of the rectangle, percentage 0-100
    width: float
    # Vertical extent of the rectangle, percentage 0-100
    height: float
@router.post("/sessions/{session_id}/crop/manual")
async def manual_crop(session_id: str, req: ManualCropRequest):
    """Manually crop using percentage coordinates.

    Converts the requested percentage rectangle to pixels on the oriented
    image (or the original if no oriented image is available), clamps it
    to the image bounds (minimum size 1x1), crops, caches the result and
    persists it as ``cropped_png`` together with ``crop_result``. The
    session's ``current_step`` is set to 3.

    Args:
        session_id: Identifier of the OCR pipeline session.
        req: Crop rectangle in percent of the source image size.

    Returns:
        A dict with the session id, the crop result (pixel rectangle,
        percentage rectangle, original and cropped sizes, method
        "manual"), the cropped image's width and height, and the URL of
        the cropped image.

    Raises:
        HTTPException: 404 when the session is unknown, 400 when no image
            is available or a coordinate is NaN or infinite.
    """
    import math

    cached = await _ensure_cached(session_id)

    # The images are NumPy arrays, so `a or b` must not be used here:
    # evaluating the truth value of an array raises ValueError.
    img_bgr = cached.get("oriented_bgr")
    if img_bgr is None:
        img_bgr = cached.get("original_bgr")
    if img_bgr is None:
        raise HTTPException(status_code=400, detail="No image available for cropping")

    # int() cannot convert NaN or infinity; reject them as a client error
    # instead of failing with an unhandled exception.
    if not all(math.isfinite(v) for v in (req.x, req.y, req.width, req.height)):
        raise HTTPException(status_code=400, detail="Crop coordinates must be finite numbers")

    h, w = img_bgr.shape[:2]

    # Convert percentages to pixels
    px_x = int(w * req.x / 100.0)
    px_y = int(h * req.y / 100.0)
    px_w = int(w * req.width / 100.0)
    px_h = int(h * req.height / 100.0)

    # Clamp: origin inside the image, size at least 1 px and not
    # extending past the right/bottom edge.
    px_x = max(0, min(px_x, w - 1))
    px_y = max(0, min(px_y, h - 1))
    px_w = max(1, min(px_w, w - px_x))
    px_h = max(1, min(px_h, h - px_y))

    # Copy so the cropped image does not share memory with its source.
    cropped_bgr = img_bgr[px_y:px_y + px_h, px_x:px_x + px_w].copy()

    success, png_buf = cv2.imencode(".png", cropped_bgr)
    if success:
        cropped_png = png_buf.tobytes()
    else:
        # Keep the original fallback (empty bytes) but make the failure
        # visible instead of silently persisting an empty image.
        logger.error(
            "OCR Pipeline: manual crop session %s: PNG encoding failed, "
            "persisting empty image",
            session_id,
        )
        cropped_png = b""

    crop_result = {
        "crop_applied": True,
        "crop_rect": {"x": px_x, "y": px_y, "width": px_w, "height": px_h},
        "crop_rect_pct": {"x": round(req.x, 2), "y": round(req.y, 2),
                          "width": round(req.width, 2), "height": round(req.height, 2)},
        "original_size": {"width": w, "height": h},
        "cropped_size": {"width": px_w, "height": px_h},
        "method": "manual",
    }

    cached["cropped_bgr"] = cropped_bgr
    cached["crop_result"] = crop_result

    await update_session_db(
        session_id,
        cropped_png=cropped_png,
        crop_result=crop_result,
        current_step=3,
    )

    ch, cw = cropped_bgr.shape[:2]
    return {
        "session_id": session_id,
        **crop_result,
        "image_width": cw,
        "image_height": ch,
        "cropped_image_url": f"/api/v1/ocr-pipeline/sessions/{session_id}/image/cropped",
    }
@router.post("/sessions/{session_id}/crop/skip")
async def skip_crop(session_id: str):
    """Skip cropping — use oriented (or original) image as-is.

    Stores the oriented image (or the original if no oriented image is
    available) unchanged as the cropped image, both in the cache and as
    ``cropped_png``, with a ``crop_result`` marked as skipped. The
    session's ``current_step`` is set to 3.

    Args:
        session_id: Identifier of the OCR pipeline session.

    Returns:
        A dict with the session id, the crop result (``crop_applied``
        False, ``skipped`` True, identical original and cropped sizes),
        the image's width and height, and the URL of the cropped image.

    Raises:
        HTTPException: 404 when the session is unknown, 400 when neither
            an oriented nor an original image is available.
    """
    cached = await _ensure_cached(session_id)

    # The images are NumPy arrays, so `a or b` must not be used here:
    # evaluating the truth value of an array raises ValueError.
    img_bgr = cached.get("oriented_bgr")
    if img_bgr is None:
        img_bgr = cached.get("original_bgr")
    if img_bgr is None:
        raise HTTPException(status_code=400, detail="No image available")

    h, w = img_bgr.shape[:2]

    # Store the oriented image as cropped (identity crop)
    success, png_buf = cv2.imencode(".png", img_bgr)
    if success:
        cropped_png = png_buf.tobytes()
    else:
        # Keep the original fallback (empty bytes) but make the failure
        # visible instead of silently persisting an empty image.
        logger.error(
            "OCR Pipeline: skip crop session %s: PNG encoding failed, "
            "persisting empty image",
            session_id,
        )
        cropped_png = b""

    crop_result = {
        "crop_applied": False,
        "skipped": True,
        "original_size": {"width": w, "height": h},
        "cropped_size": {"width": w, "height": h},
    }

    cached["cropped_bgr"] = img_bgr
    cached["crop_result"] = crop_result

    await update_session_db(
        session_id,
        cropped_png=cropped_png,
        crop_result=crop_result,
        current_step=3,
    )

    return {
        "session_id": session_id,
        **crop_result,
        "image_width": w,
        "image_height": h,
        "cropped_image_url": f"/api/v1/ocr-pipeline/sessions/{session_id}/image/cropped",
    }