Files
breakpilot-lehrer/klausur-service/backend/ocr_pipeline_sessions.py
Benjamin Admin 9c5e950c99
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 42s
CI / test-nodejs-website (push) Successful in 36s
CI / test-python-klausur (push) Failing after 10m2s
CI / test-go-edu-search (push) Failing after 10m9s
CI / test-python-agent-core (push) Failing after 14m58s
Fix multi-page PDF upload: include session_id for first page
The frontend expects session_id in the upload response, but multi-page
PDFs returned only document_group_id + pages[]. Now includes session_id
pointing to the first page for backwards compatibility.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 16:26:25 +02:00

598 lines
21 KiB
Python

"""
OCR Pipeline Sessions API - Session management and image serving endpoints.
Extracted from ocr_pipeline_api.py for modularity.
Handles: CRUD for sessions, thumbnails, pipeline logs, categories,
image serving (with overlay dispatch), and document type detection.
Lizenz: Apache 2.0
DATENSCHUTZ: Alle Verarbeitung erfolgt lokal.
"""
import logging
import time
import uuid
from typing import Any, Dict, Optional
import cv2
import numpy as np
from fastapi import APIRouter, File, Form, HTTPException, Query, UploadFile
from fastapi.responses import Response
from cv_vocab_pipeline import (
create_ocr_image,
detect_document_type,
render_image_high_res,
render_pdf_high_res,
)
from ocr_pipeline_common import (
VALID_DOCUMENT_CATEGORIES,
UpdateSessionRequest,
_append_pipeline_log,
_cache,
_get_base_image_png,
_get_cached,
_load_session_to_cache,
)
from ocr_pipeline_overlays import render_overlay
from ocr_pipeline_session_store import (
create_session_db,
delete_all_sessions_db,
delete_session_db,
get_session_db,
get_session_image,
get_sub_sessions,
list_sessions_db,
update_session_db,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Router the endpoints below register on; routes share the
# /api/v1/ocr-pipeline prefix and the "ocr-pipeline" tag.
router = APIRouter(
    prefix="/api/v1/ocr-pipeline",
    tags=["ocr-pipeline"],
)
# ---------------------------------------------------------------------------
# Session Management Endpoints
# ---------------------------------------------------------------------------
@router.get("/sessions")
async def list_sessions(include_sub_sessions: bool = False):
    """Return the stored OCR pipeline sessions.

    Sub-sessions (box regions) are left out unless the caller passes
    ``?include_sub_sessions=true``.
    """
    return {
        "sessions": await list_sessions_db(
            include_sub_sessions=include_sub_sessions,
        ),
    }
@router.post("/sessions")
async def create_session(
    file: UploadFile = File(...),
    name: Optional[str] = Form(None),
):
    """Upload a PDF or image file and create a pipeline session.

    For multi-page PDFs (> 1 page), each page becomes its own session
    grouped under a ``document_group_id``; that case is delegated to
    ``_create_multi_page_sessions`` and its response is returned as-is.

    Args:
        file: The uploaded PDF or image.
        name: Optional display name; falls back to the uploaded filename.

    Returns:
        For a single page: the new session id, filename, name, image
        dimensions and the URL of the original image.

    Raises:
        HTTPException: 400 if the PDF cannot be opened or the file cannot
            be rendered, 500 if the rendered image cannot be PNG-encoded.
    """
    file_data = await file.read()
    filename = file.filename or "upload"
    content_type = file.content_type or ""
    is_pdf = content_type == "application/pdf" or filename.lower().endswith(".pdf")
    session_name = name or filename

    # --- Multi-page PDF handling ---
    if is_pdf:
        try:
            import fitz  # PyMuPDF

            pdf_doc = fitz.open(stream=file_data, filetype="pdf")
            try:
                page_count = pdf_doc.page_count
            finally:
                # Release the document handle even if reading the page
                # count fails.
                pdf_doc.close()
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Could not read PDF: {e}") from e

        if page_count > 1:
            return await _create_multi_page_sessions(
                file_data, filename, session_name, page_count,
            )

    # --- Single page (image or 1-page PDF) ---
    session_id = str(uuid.uuid4())
    try:
        if is_pdf:
            img_bgr = render_pdf_high_res(file_data, page_number=0, zoom=3.0)
        else:
            img_bgr = render_image_high_res(file_data)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Could not process file: {e}") from e

    # Encode original as PNG bytes
    success, png_buf = cv2.imencode(".png", img_bgr)
    if not success:
        raise HTTPException(status_code=500, detail="Failed to encode image")
    original_png = png_buf.tobytes()

    # Persist to DB
    await create_session_db(
        session_id=session_id,
        name=session_name,
        filename=filename,
        original_png=original_png,
    )

    # Cache BGR array for immediate processing
    _cache[session_id] = {
        "id": session_id,
        "filename": filename,
        "name": session_name,
        "original_bgr": img_bgr,
        "oriented_bgr": None,
        "cropped_bgr": None,
        "deskewed_bgr": None,
        "dewarped_bgr": None,
        "orientation_result": None,
        "crop_result": None,
        "deskew_result": None,
        "dewarp_result": None,
        "ground_truth": {},
        "current_step": 1,
    }

    image_height, image_width = img_bgr.shape[:2]
    logger.info(f"OCR Pipeline: created session {session_id} from {filename} "
                f"({image_width}x{image_height})")

    return {
        "session_id": session_id,
        "filename": filename,
        "name": session_name,
        "image_width": image_width,
        "image_height": image_height,
        "original_image_url": f"/api/v1/ocr-pipeline/sessions/{session_id}/image/original",
    }
async def _create_multi_page_sessions(
    pdf_data: bytes,
    filename: str,
    base_name: str,
    page_count: int,
) -> dict:
    """Create one session per PDF page, grouped by document_group_id.

    Pages that cannot be rendered or PNG-encoded are logged and skipped;
    the remaining pages are still created.

    Args:
        pdf_data: Raw bytes of the uploaded PDF.
        filename: Uploaded filename, stored on every page session.
        base_name: Display name; each page is named
            ``"<base_name> — Seite <n>"``.
        page_count: Number of pages to iterate over.

    Returns:
        A dict with ``session_id`` (the first successfully created page,
        kept for frontends that expect a single session id),
        ``document_group_id``, ``filename``, ``name``, ``page_count`` (the
        value passed in, not the number of sessions created) and ``pages``
        (one entry per created session).

    Raises:
        HTTPException: 400 if not a single page could be turned into a
            session.
    """
    document_group_id = str(uuid.uuid4())
    pages = []

    for page_idx in range(page_count):
        page_number = page_idx + 1
        session_id = str(uuid.uuid4())
        page_name = f"{base_name} — Seite {page_number}"

        try:
            img_bgr = render_pdf_high_res(pdf_data, page_number=page_idx, zoom=3.0)
        except Exception as e:
            logger.warning(f"Failed to render PDF page {page_number}: {e}")
            continue

        ok, png_buf = cv2.imencode(".png", img_bgr)
        if not ok:
            # Previously skipped silently; leave a trace for diagnosis.
            logger.warning(f"Failed to encode PDF page {page_number} as PNG")
            continue
        page_png = png_buf.tobytes()

        await create_session_db(
            session_id=session_id,
            name=page_name,
            filename=filename,
            original_png=page_png,
            document_group_id=document_group_id,
            page_number=page_number,
        )

        # Cache BGR array for immediate processing
        _cache[session_id] = {
            "id": session_id,
            "filename": filename,
            "name": page_name,
            "original_bgr": img_bgr,
            "oriented_bgr": None,
            "cropped_bgr": None,
            "deskewed_bgr": None,
            "dewarped_bgr": None,
            "orientation_result": None,
            "crop_result": None,
            "deskew_result": None,
            "dewarp_result": None,
            "ground_truth": {},
            "current_step": 1,
        }

        h, w = img_bgr.shape[:2]
        pages.append({
            "session_id": session_id,
            "name": page_name,
            "page_number": page_number,
            "image_width": w,
            "image_height": h,
            "original_image_url": f"/api/v1/ocr-pipeline/sessions/{session_id}/image/original",
        })
        logger.info(
            f"OCR Pipeline: created page session {session_id} "
            f"(page {page_number}/{page_count}) from {filename} ({w}x{h})"
        )

    if not pages:
        # Nothing was created: report a failure instead of a success
        # response carrying session_id=None and an empty pages list.
        raise HTTPException(
            status_code=400,
            detail="Could not process file: no PDF page could be rendered",
        )

    # Include session_id pointing to first page for backwards compatibility
    # (frontends that expect a single session_id will navigate to page 1)
    return {
        "session_id": pages[0]["session_id"],
        "document_group_id": document_group_id,
        "filename": filename,
        "name": base_name,
        "page_count": page_count,
        "pages": pages,
    }
@router.get("/sessions/{session_id}")
async def get_session_info(session_id: str):
    """Get session info including deskew/dewarp/column results for step navigation.

    Returns:
        A dict with the session's basic fields (id, filename, name, image
        dimensions, original image URL, current step, category, doc type).
        Each stored step result is added only when it is present and
        truthy. ``grid_editor_result`` is reduced to a summary. Sub-session
        linkage is reported either as ``parent_session_id``/``box_index``
        or as a ``sub_sessions`` list.

    Raises:
        HTTPException: 404 if the session does not exist.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    # Get image dimensions from original PNG. Both stay 0 when the image is
    # missing or cannot be decoded. (The former one-liner evaluated
    # img.shape[1] before its None check and crashed on undecodable data.)
    img_w, img_h = 0, 0
    original_png = await get_session_image(session_id, "original")
    if original_png:
        arr = np.frombuffer(original_png, dtype=np.uint8)
        img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
        if img is not None:
            img_h, img_w = img.shape[:2]

    result = {
        "session_id": session["id"],
        "filename": session.get("filename", ""),
        "name": session.get("name", ""),
        "image_width": img_w,
        "image_height": img_h,
        "original_image_url": f"/api/v1/ocr-pipeline/sessions/{session_id}/image/original",
        "current_step": session.get("current_step", 1),
        "document_category": session.get("document_category"),
        "doc_type": session.get("doc_type"),
    }

    # Step results are copied through verbatim, only when present.
    passthrough_keys = (
        "orientation_result",
        "crop_result",
        "deskew_result",
        "dewarp_result",
        "column_result",
        "row_result",
        "word_result",
        "doc_type_result",
        "structure_result",
    )
    for key in passthrough_keys:
        if session.get(key):
            result[key] = session[key]

    if session.get("grid_editor_result"):
        # Include summary only to keep response small
        gr = session["grid_editor_result"]
        result["grid_editor_result"] = {
            "summary": gr.get("summary", {}),
            "zones_count": len(gr.get("zones", [])),
            "edited": gr.get("edited", False),
        }

    if session.get("ground_truth"):
        result["ground_truth"] = session["ground_truth"]

    # Box sub-session info (zone_type='box' from column detection — NOT page-split)
    if session.get("parent_session_id"):
        result["parent_session_id"] = session["parent_session_id"]
        result["box_index"] = session.get("box_index")
    else:
        # Check for box sub-sessions (column detection creates these)
        subs = await get_sub_sessions(session_id)
        if subs:
            result["sub_sessions"] = [
                {"id": s["id"], "name": s.get("name"), "box_index": s.get("box_index")}
                for s in subs
            ]

    return result
@router.put("/sessions/{session_id}")
async def update_session(session_id: str, req: UpdateSessionRequest):
    """Change a session's name and/or document category.

    Responds with 400 for an unknown category or an empty update, and
    with 404 when the session does not exist.
    """
    changes: Dict[str, Any] = {}

    if req.name is not None:
        changes["name"] = req.name

    category = req.document_category
    if category is not None:
        if category not in VALID_DOCUMENT_CATEGORIES:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid category '{category}'. Valid: {sorted(VALID_DOCUMENT_CATEGORIES)}",
            )
        changes["document_category"] = category

    if not changes:
        raise HTTPException(status_code=400, detail="Nothing to update")

    if not await update_session_db(session_id, **changes):
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    return {"session_id": session_id, **changes}
@router.delete("/sessions/{session_id}")
async def delete_session(session_id: str):
    """Remove one session from the cache and the database.

    The cache entry is dropped first, regardless of whether the database
    row exists; a missing row yields a 404.
    """
    _cache.pop(session_id, None)

    if not await delete_session_db(session_id):
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    return {"session_id": session_id, "deleted": True}
@router.delete("/sessions")
async def delete_all_sessions():
    """Wipe every session (cleanup): empty the cache, then the database."""
    _cache.clear()
    return {"deleted_count": await delete_all_sessions_db()}
@router.post("/sessions/{session_id}/create-box-sessions")
async def create_box_sessions(session_id: str):
    """Create sub-sessions for each detected box region.

    Crops box regions from the cropped/dewarped image and creates
    independent sub-sessions that can be processed through the pipeline.

    Returns:
        ``{"session_id", "sub_sessions", ...}``. When no box zones exist or
        sub-sessions already exist, a ``message`` is included and nothing
        is created; otherwise ``total`` holds the number of sub-sessions
        created.

    Raises:
        HTTPException: 404 if the session does not exist; 400 if column
            detection has not run or no base image is stored; 500 if the
            base image cannot be decoded.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    column_result = session.get("column_result")
    if not column_result:
        raise HTTPException(status_code=400, detail="Column detection must be completed first")

    zones = column_result.get("zones") or []
    box_zones = [z for z in zones if z.get("zone_type") == "box" and z.get("box")]
    if not box_zones:
        return {"session_id": session_id, "sub_sessions": [], "message": "No boxes detected"}

    # Check for existing sub-sessions
    existing = await get_sub_sessions(session_id)
    if existing:
        return {
            "session_id": session_id,
            "sub_sessions": [{"id": s["id"], "box_index": s.get("box_index")} for s in existing],
            "message": f"{len(existing)} sub-session(s) already exist",
        }

    # Load base image: prefer the cropped image, fall back to dewarped
    base_png = await get_session_image(session_id, "cropped")
    if not base_png:
        base_png = await get_session_image(session_id, "dewarped")
    if not base_png:
        raise HTTPException(status_code=400, detail="No base image available")

    arr = np.frombuffer(base_png, dtype=np.uint8)
    img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    if img is None:
        raise HTTPException(status_code=500, detail="Failed to decode image")

    img_h, img_w = img.shape[:2]
    parent_name = session.get("name", "Session")
    sub_filename = session.get("filename", "box-crop.png")
    pad = 5  # small padding around each box, in pixels
    created = []

    for i, zone in enumerate(box_zones):
        box = zone["box"]
        bx, by = box["x"], box["y"]
        bw, bh = box["width"], box["height"]

        # Crop box region with small padding, clamped to the image bounds
        y1 = max(0, by - pad)
        y2 = min(img_h, by + bh + pad)
        x1 = max(0, bx - pad)
        x2 = min(img_w, bx + bw + pad)
        crop = img[y1:y2, x1:x2]

        # A box lying outside the image yields an empty crop. cv2.imencode
        # raises on empty input instead of reporting failure, so skip it
        # here rather than aborting the whole request.
        if crop.size == 0:
            logger.warning(f"Skipping empty box {i} crop for session {session_id}")
            continue

        # Encode as PNG
        success, png_buf = cv2.imencode(".png", crop)
        if not success:
            logger.warning(f"Failed to encode box {i} crop for session {session_id}")
            continue

        sub_id = str(uuid.uuid4())
        sub_name = f"{parent_name} — Box {i + 1}"

        await create_session_db(
            session_id=sub_id,
            name=sub_name,
            filename=sub_filename,
            original_png=png_buf.tobytes(),
            parent_session_id=session_id,
            box_index=i,
        )

        # Cache the BGR for immediate processing
        # Promote original to cropped so column/row/word detection finds it
        box_bgr = crop.copy()
        _cache[sub_id] = {
            "id": sub_id,
            "filename": sub_filename,
            "name": sub_name,
            "parent_session_id": session_id,
            "original_bgr": box_bgr,
            "oriented_bgr": None,
            "cropped_bgr": box_bgr,
            "deskewed_bgr": None,
            "dewarped_bgr": None,
            "orientation_result": None,
            "crop_result": None,
            "deskew_result": None,
            "dewarp_result": None,
            "ground_truth": {},
            "current_step": 1,
        }

        created.append({
            "id": sub_id,
            "name": sub_name,
            "box_index": i,
            "box": box,
            "image_width": crop.shape[1],
            "image_height": crop.shape[0],
        })
        logger.info(f"Created box sub-session {sub_id} for session {session_id} "
                    f"(box {i}, {crop.shape[1]}x{crop.shape[0]})")

    return {
        "session_id": session_id,
        "sub_sessions": created,
        "total": len(created),
    }
@router.get("/sessions/{session_id}/thumbnail")
async def get_session_thumbnail(session_id: str, size: int = Query(default=80, ge=16, le=400)):
    """Return a small thumbnail of the original image.

    The image is scaled so that its longer side equals ``size`` pixels
    while keeping the aspect ratio.

    Args:
        session_id: Session whose original image is used.
        size: Target length of the longer side (16..400, default 80).

    Returns:
        A PNG response that clients may cache for one hour.

    Raises:
        HTTPException: 404 if no original image is stored; 500 if the
            image cannot be decoded or the thumbnail cannot be encoded.
    """
    original_png = await get_session_image(session_id, "original")
    if not original_png:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found or no image")

    arr = np.frombuffer(original_png, dtype=np.uint8)
    img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    if img is None:
        raise HTTPException(status_code=500, detail="Failed to decode image")

    h, w = img.shape[:2]
    scale = size / max(h, w)
    # Clamp to at least 1 px: for very elongated images the shorter side
    # would otherwise truncate to 0, which cv2.resize rejects.
    new_w = max(1, int(w * scale))
    new_h = max(1, int(h * scale))
    thumb = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_AREA)

    ok, png_bytes = cv2.imencode(".png", thumb)
    if not ok:
        raise HTTPException(status_code=500, detail="Failed to encode image")

    return Response(content=png_bytes.tobytes(), media_type="image/png",
                    headers={"Cache-Control": "public, max-age=3600"})
@router.get("/sessions/{session_id}/pipeline-log")
async def get_pipeline_log(session_id: str):
    """Return the stored pipeline execution log of a session.

    A session without a log yields ``{"steps": []}``; an unknown session
    yields a 404.
    """
    session = await get_session_db(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")

    pipeline_log = session.get("pipeline_log") or {"steps": []}
    return {"session_id": session_id, "pipeline_log": pipeline_log}
@router.get("/categories")
async def list_categories():
    """Return the valid document categories in sorted order."""
    categories = sorted(VALID_DOCUMENT_CATEGORIES)
    return {"categories": categories}
# ---------------------------------------------------------------------------
# Image Endpoints
# ---------------------------------------------------------------------------
@router.get("/sessions/{session_id}/image/{image_type}")
async def get_image(session_id: str, image_type: str):
    """Serve a session image as PNG.

    Supported ``image_type`` values: ``original``, ``oriented``,
    ``cropped``, ``deskewed``, ``dewarped``, ``binarized``, ``clean``, and
    the overlays ``structure-overlay``, ``columns-overlay``,
    ``rows-overlay`` and ``words-overlay``.

    Overlay types are delegated to ``render_overlay``. ``binarized`` is
    served from the in-memory cache when a cached PNG exists. ``cropped``
    and ``dewarped`` are loaded through ``_get_base_image_png``; every
    other type is read with ``get_session_image``.

    Raises:
        HTTPException: 400 for an unknown image type; 404 if the requested
            image is not available.
    """
    # Overlay image types mapped to the overlay kind render_overlay expects.
    overlay_kinds = {
        "structure-overlay": "structure",
        "columns-overlay": "columns",
        "rows-overlay": "rows",
        "words-overlay": "words",
    }
    stored_types = {"original", "oriented", "cropped", "deskewed", "dewarped", "binarized", "clean"}

    if image_type not in stored_types and image_type not in overlay_kinds:
        raise HTTPException(status_code=400, detail=f"Unknown image type: {image_type}")

    if image_type in overlay_kinds:
        return await render_overlay(overlay_kinds[image_type], session_id)

    # Try cache first for fast serving; only the binarized PNG is kept there
    # in a directly servable form.
    cached = _cache.get(session_id)
    if cached and image_type == "binarized" and cached.get("binarized_png"):
        return Response(content=cached["binarized_png"], media_type="image/png")

    # Load from DB — for cropped/dewarped, fall back through the chain
    if image_type in ("cropped", "dewarped"):
        data = await _get_base_image_png(session_id)
    else:
        data = await get_session_image(session_id, image_type)

    if not data:
        raise HTTPException(status_code=404, detail=f"Image '{image_type}' not available yet")

    return Response(content=data, media_type="image/png")
# ---------------------------------------------------------------------------
# Document Type Detection (between Dewarp and Columns)
# ---------------------------------------------------------------------------
@router.post("/sessions/{session_id}/detect-type")
async def detect_type(session_id: str):
    """Classify the document (vocab_table, full_text, generic_table).

    Uses the cropped image when one is cached and the dewarped image
    otherwise; responds with 400 if neither exists. The detection result
    is written to the database, stored on the cache entry, appended to the
    pipeline log and returned to the caller.
    """
    if session_id not in _cache:
        await _load_session_to_cache(session_id)
    entry = _get_cached(session_id)

    # Prefer the cropped image; fall back to the dewarped one.
    source_bgr = entry.get("cropped_bgr")
    if source_bgr is None:
        source_bgr = entry.get("dewarped_bgr")
    if source_bgr is None:
        raise HTTPException(status_code=400, detail="Crop or dewarp must be completed first")

    started = time.time()
    ocr_img = create_ocr_image(source_bgr)
    detection = detect_document_type(ocr_img, source_bgr)
    duration = time.time() - started

    result_dict = {
        "doc_type": detection.doc_type,
        "confidence": detection.confidence,
        "pipeline": detection.pipeline,
        "skip_steps": detection.skip_steps,
        "features": detection.features,
        "duration_seconds": round(duration, 2),
    }

    # Persist to DB, then mirror the result on the cache entry
    await update_session_db(
        session_id,
        doc_type=detection.doc_type,
        doc_type_result=result_dict,
    )
    entry["doc_type_result"] = result_dict

    logger.info(f"OCR Pipeline: detect-type session {session_id}: "
                f"{detection.doc_type} (confidence={detection.confidence}, {duration:.2f}s)")

    # Only scalar feature values are copied into the log entry.
    scalar_features = {
        k: v
        for k, v in (detection.features or {}).items()
        if isinstance(v, (int, float, str, bool))
    }
    log_payload = {
        "doc_type": detection.doc_type,
        "pipeline": detection.pipeline,
        "confidence": detection.confidence,
        **scalar_features,
    }
    await _append_pipeline_log(
        session_id, "detect_type", log_payload, duration_ms=int(duration * 1000),
    )

    return {"session_id": session_id, **result_dict}