Files
breakpilot-lehrer/klausur-service/backend/ocr_pipeline_api.py
Benjamin Admin d552fd8b6b
All checks were successful
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 38s
CI / test-go-edu-search (push) Successful in 29s
CI / test-python-klausur (push) Successful in 1m46s
CI / test-python-agent-core (push) Successful in 17s
CI / test-nodejs-website (push) Successful in 22s
feat: OCR Pipeline mit 6-Schritt-Wizard fuer Seitenrekonstruktion
Neue Route /ai/ocr-pipeline mit schrittweiser Begradigung (Deskew),
Raster-Overlay und Ground Truth. Schritte 2-6 als Platzhalter.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 15:38:08 +01:00

302 lines
10 KiB
Python

"""
OCR Pipeline API - Schrittweise Seitenrekonstruktion.
Zerlegt den OCR-Prozess in 6 einzelne Schritte:
1. Deskewing - Scan begradigen
2. Spaltenerkennung - Unsichtbare Spalten finden
3. Worterkennung - OCR mit Bounding Boxes
4. Koordinatenzuweisung - Exakte Positionen
5. Seitenrekonstruktion - Seite nachbauen
6. Ground Truth Validierung - Gesamtpruefung
Lizenz: Apache 2.0
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
"""
import io
import logging
import time
import uuid
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
import cv2
import numpy as np
from fastapi import APIRouter, File, HTTPException, UploadFile
from fastapi.responses import Response
from pydantic import BaseModel
from cv_vocab_pipeline import (
create_ocr_image,
deskew_image,
deskew_image_by_word_alignment,
render_image_high_res,
render_pdf_high_res,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Every endpoint declared on this router is served under /api/v1/ocr-pipeline.
router = APIRouter(prefix="/api/v1/ocr-pipeline", tags=["ocr-pipeline"])
# ---------------------------------------------------------------------------
# In-memory session store (24h TTL)
# ---------------------------------------------------------------------------
# Maps session id -> session dict. This is a plain module-level dict, so its
# contents live only inside this process and are gone after a restart.
_sessions: Dict[str, Dict[str, Any]] = {}
# Maximum session age in hours; _cleanup_expired() removes older sessions.
SESSION_TTL_HOURS = 24
def _cleanup_expired():
    """Drop every session that is older than SESSION_TTL_HOURS.

    A session without a "created_at" entry is treated as if it had been
    created right now, so it is always kept.
    """
    oldest_allowed = datetime.utcnow() - timedelta(hours=SESSION_TTL_HOURS)
    stale_ids = []
    for session_id, data in _sessions.items():
        created = data.get("created_at", datetime.utcnow())
        if created < oldest_allowed:
            stale_ids.append(session_id)
    # Deletion runs as a second pass so the dict is never mutated while it
    # is being iterated.
    for sid in stale_ids:
        del _sessions[sid]
        logger.info(f"OCR Pipeline: expired session {sid}")
def _get_session(session_id: str) -> Dict[str, Any]:
    """Look up a session by its id.

    Args:
        session_id: Key into the in-memory session store.

    Returns:
        The stored session dict.

    Raises:
        HTTPException: 404 when nothing (or an empty dict) is stored under
            ``session_id``.
    """
    found = _sessions.get(session_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
# ---------------------------------------------------------------------------
# Pydantic Models
# ---------------------------------------------------------------------------
# Request body of the manual deskew endpoint.
class ManualDeskewRequest(BaseModel):
    # Rotation angle for the original image. manual_deskew clamps it to
    # [-5.0, 5.0] before handing it to cv2.getRotationMatrix2D.
    angle: float
# Request body for saving ground-truth feedback on the deskew step.
class DeskewGroundTruthRequest(BaseModel):
    # True when the deskew result is reported as correct.
    is_correct: bool
    # Optional corrected angle; the endpoint stores it but does not apply it.
    corrected_angle: Optional[float] = None
    # Optional free-text remarks.
    notes: Optional[str] = None
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@router.post("/sessions")
async def create_session(file: UploadFile = File(...)):
    """Upload a PDF or image file and create a pipeline session.

    The upload is rendered to a BGR image (page 0 when it is a PDF), encoded
    once as PNG for serving, and stored in the in-memory session store.
    Expired sessions are purged before the new one is created.

    Args:
        file: The uploaded PDF or image. It is treated as a PDF when its
            content type is ``application/pdf`` or its name ends in ``.pdf``.

    Returns:
        A dict with the new session id, the filename, the rendered image
        width/height and the URL under which the original image is served.

    Raises:
        HTTPException: 400 if the upload is empty or cannot be rendered,
            500 if the rendered image cannot be encoded as PNG.
    """
    _cleanup_expired()
    file_data = await file.read()
    if not file_data:
        # Fail early with a clear message instead of relying on the renderer
        # to choke on zero bytes.
        raise HTTPException(status_code=400, detail="Could not process file: upload is empty")
    filename = file.filename or "upload"
    content_type = file.content_type or ""
    session_id = str(uuid.uuid4())
    is_pdf = content_type == "application/pdf" or filename.lower().endswith(".pdf")
    try:
        if is_pdf:
            img_bgr = render_pdf_high_res(file_data, page_number=0, zoom=3.0)
        else:
            img_bgr = render_image_high_res(file_data)
    except Exception as e:
        # Chain the cause so the original traceback stays visible in logs.
        raise HTTPException(status_code=400, detail=f"Could not process file: {e}") from e
    # Checked outside the try block so this HTTPException is not re-wrapped
    # by the broad handler above. Without it, a renderer that returns no
    # image would make cv2.imencode fail with an unhandled error.
    if img_bgr is None or img_bgr.size == 0:
        raise HTTPException(status_code=400, detail="Could not process file: no image data")
    # Encode original as PNG bytes for serving
    success, png_buf = cv2.imencode(".png", img_bgr)
    if not success:
        raise HTTPException(status_code=500, detail="Failed to encode image")
    _sessions[session_id] = {
        "id": session_id,
        "filename": filename,
        "created_at": datetime.utcnow(),
        "original_bgr": img_bgr,
        "original_png": png_buf.tobytes(),
        "deskewed_bgr": None,
        "deskewed_png": None,
        "binarized_png": None,
        "deskew_result": None,
        "ground_truth": {},
        "current_step": 1,
    }
    logger.info(f"OCR Pipeline: created session {session_id} from {filename} "
                f"({img_bgr.shape[1]}x{img_bgr.shape[0]})")
    return {
        "session_id": session_id,
        "filename": filename,
        "image_width": img_bgr.shape[1],
        "image_height": img_bgr.shape[0],
        "original_image_url": f"/api/v1/ocr-pipeline/sessions/{session_id}/image/original",
    }
@router.post("/sessions/{session_id}/deskew")
async def auto_deskew(session_id: str):
    """Run both deskew methods and pick the best one.

    Both the Hough-line method and the word-alignment method are run on the
    session's original image. A method that raises is logged and treated as
    "no rotation" (angle 0.0, image unchanged). The chosen result is stored
    in the session together with its PNG encoding and a binarized PNG.

    Args:
        session_id: Id of an existing pipeline session.

    Returns:
        A dict with the session id, both detected angles, the applied angle,
        the method used, a confidence value, the processing duration and the
        URLs of the deskewed and binarized images.

    Raises:
        HTTPException: 404 if the session does not exist.
    """
    session = _get_session(session_id)
    img_bgr = session["original_bgr"]
    # perf_counter is monotonic, so clock adjustments cannot distort the
    # measured duration.
    t0 = time.perf_counter()
    # Method 1: Hough Lines
    try:
        deskewed_hough, angle_hough = deskew_image(img_bgr.copy())
    except Exception as e:
        logger.warning(f"Hough deskew failed: {e}")
        deskewed_hough, angle_hough = img_bgr, 0.0
    # Method 2: Word Alignment (needs image bytes). The PNG encoding of the
    # original image was stored when the session was created, so it is
    # reused instead of encoding the full-resolution image a second time.
    orig_bytes = session["original_png"]
    try:
        deskewed_wa_bytes, angle_wa = deskew_image_by_word_alignment(orig_bytes)
    except Exception as e:
        logger.warning(f"Word alignment deskew failed: {e}")
        deskewed_wa_bytes, angle_wa = orig_bytes, 0.0
    duration = time.perf_counter() - t0
    # The helpers may return numpy scalars; plain floats keep the response
    # JSON-serializable regardless of the scalar type.
    angle_hough = float(angle_hough)
    angle_wa = float(angle_wa)
    # Pick method with larger detected angle (more correction needed = more skew found)
    # If both are ~0, prefer word alignment as it's more robust
    if abs(angle_wa) >= abs(angle_hough) or abs(angle_hough) < 0.1:
        method_used = "word_alignment"
        angle_applied = angle_wa
        # Decode word alignment result to BGR
        wa_array = np.frombuffer(deskewed_wa_bytes, dtype=np.uint8)
        deskewed_bgr = cv2.imdecode(wa_array, cv2.IMREAD_COLOR)
        if deskewed_bgr is None:
            # Undecodable word-alignment output: fall back to the Hough result.
            deskewed_bgr = deskewed_hough
            method_used = "hough"
            angle_applied = angle_hough
    else:
        method_used = "hough"
        angle_applied = angle_hough
        deskewed_bgr = deskewed_hough
    # Encode deskewed as PNG; if encoding fails the original PNG is served.
    success, deskewed_png_buf = cv2.imencode(".png", deskewed_bgr)
    deskewed_png = deskewed_png_buf.tobytes() if success else session["original_png"]
    # Create binarized version (best effort; None when it cannot be produced)
    try:
        binarized = create_ocr_image(deskewed_bgr)
        success_bin, bin_buf = cv2.imencode(".png", binarized)
        binarized_png = bin_buf.tobytes() if success_bin else None
    except Exception as e:
        logger.warning(f"Binarization failed: {e}")
        binarized_png = None
    # Confidence: higher angle = lower confidence that we got it right
    # (never below 0.5).
    confidence = max(0.5, 1.0 - abs(angle_applied) / 5.0)
    deskew_result = {
        "angle_hough": round(angle_hough, 3),
        "angle_word_alignment": round(angle_wa, 3),
        "angle_applied": round(angle_applied, 3),
        "method_used": method_used,
        "confidence": round(confidence, 2),
        "duration_seconds": round(duration, 2),
    }
    session["deskewed_bgr"] = deskewed_bgr
    session["deskewed_png"] = deskewed_png
    session["binarized_png"] = binarized_png
    session["deskew_result"] = deskew_result
    logger.info(f"OCR Pipeline: deskew session {session_id}: "
                f"hough={angle_hough:.2f}° wa={angle_wa:.2f}° → {method_used} {angle_applied:.2f}°")
    return {
        "session_id": session_id,
        **deskew_result,
        "deskewed_image_url": f"/api/v1/ocr-pipeline/sessions/{session_id}/image/deskewed",
        "binarized_image_url": f"/api/v1/ocr-pipeline/sessions/{session_id}/image/binarized",
    }
@router.post("/sessions/{session_id}/deskew/manual")
async def manual_deskew(session_id: str, req: ManualDeskewRequest):
    """Apply a manual rotation angle to the original image.

    The requested angle is clamped to [-5.0, 5.0] and the original image is
    rotated around its centre. The result replaces the session's deskewed
    image, its PNG encoding and its binarized PNG.

    Args:
        session_id: Id of an existing pipeline session.
        req: Request body carrying the rotation angle.

    Returns:
        A dict with the session id, the applied (clamped) angle, the method
        name ``"manual"`` and the URLs of the deskewed and binarized images.

    Raises:
        HTTPException: 404 if the session does not exist, 400 if the angle
            is NaN.
    """
    import math  # local import: the file's import block does not provide math

    session = _get_session(session_id)
    if math.isnan(req.angle):
        # max()/min() would silently turn NaN into 5.0; reject it instead.
        raise HTTPException(status_code=400, detail="angle must be a number")
    img_bgr = session["original_bgr"]
    angle = max(-5.0, min(5.0, req.angle))
    h, w = img_bgr.shape[:2]
    center = (w // 2, h // 2)
    M = cv2.getRotationMatrix2D(center, angle, 1.0)
    rotated = cv2.warpAffine(img_bgr, M, (w, h),
                             flags=cv2.INTER_LINEAR,
                             borderMode=cv2.BORDER_REPLICATE)
    # If encoding fails the original PNG is served as the deskewed image.
    success, png_buf = cv2.imencode(".png", rotated)
    deskewed_png = png_buf.tobytes() if success else session["original_png"]
    # Binarize (best effort); failures are logged the same way auto_deskew
    # logs them rather than being swallowed silently.
    try:
        binarized = create_ocr_image(rotated)
        success_bin, bin_buf = cv2.imencode(".png", binarized)
        binarized_png = bin_buf.tobytes() if success_bin else None
    except Exception as e:
        logger.warning(f"Binarization failed: {e}")
        binarized_png = None
    session["deskewed_bgr"] = rotated
    session["deskewed_png"] = deskewed_png
    session["binarized_png"] = binarized_png
    # Keep whatever an earlier deskew run recorded and override only the
    # applied angle and the method.
    session["deskew_result"] = {
        **(session.get("deskew_result") or {}),
        "angle_applied": round(angle, 3),
        "method_used": "manual",
    }
    logger.info(f"OCR Pipeline: manual deskew session {session_id}: {angle:.2f}°")
    return {
        "session_id": session_id,
        "angle_applied": round(angle, 3),
        "method_used": "manual",
        "deskewed_image_url": f"/api/v1/ocr-pipeline/sessions/{session_id}/image/deskewed",
        # Same key auto_deskew returns, so clients can treat both alike.
        "binarized_image_url": f"/api/v1/ocr-pipeline/sessions/{session_id}/image/binarized",
    }
@router.get("/sessions/{session_id}/image/{image_type}")
async def get_image(session_id: str, image_type: str):
    """Return one of the session's PNG images.

    Args:
        session_id: Id of an existing pipeline session.
        image_type: One of ``original``, ``deskewed`` or ``binarized``.

    Returns:
        A ``Response`` carrying the PNG bytes with media type ``image/png``.

    Raises:
        HTTPException: 404 if the session does not exist or the requested
            image has not been produced yet, 400 for an unknown image type.
    """
    session = _get_session(session_id)
    # Image type from the URL -> key under which the PNG bytes are stored.
    key_by_type = {
        "original": "original_png",
        "deskewed": "deskewed_png",
        "binarized": "binarized_png",
    }
    session_key = key_by_type.get(image_type)
    if session_key is None:
        raise HTTPException(status_code=400, detail=f"Unknown image type: {image_type}")
    payload = session.get(session_key)
    if not payload:
        raise HTTPException(status_code=404, detail=f"Image '{image_type}' not available yet")
    return Response(content=payload, media_type="image/png")
@router.post("/sessions/{session_id}/ground-truth/deskew")
async def save_deskew_ground_truth(session_id: str, req: DeskewGroundTruthRequest):
    """Store ground-truth feedback for the deskew step in the session.

    The feedback is saved under ``session["ground_truth"]["deskew"]``
    together with a UTC timestamp and the session's current deskew result.

    Args:
        session_id: Id of an existing pipeline session.
        req: The feedback to store.

    Returns:
        A dict with the session id and the stored ground-truth record.

    Raises:
        HTTPException: 404 if the session does not exist.
    """
    session = _get_session(session_id)
    feedback = dict(
        is_correct=req.is_correct,
        corrected_angle=req.corrected_angle,
        notes=req.notes,
        saved_at=datetime.utcnow().isoformat(),
        deskew_result=session.get("deskew_result"),
    )
    session["ground_truth"]["deskew"] = feedback
    logger.info(f"OCR Pipeline: ground truth deskew session {session_id}: "
                f"correct={req.is_correct}, corrected_angle={req.corrected_angle}")
    return {"session_id": session_id, "ground_truth": feedback}