Add OCR Kombi Pipeline: modular 11-step architecture with multi-page support
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 29s
CI / test-go-edu-search (push) Successful in 28s
CI / test-python-klausur (push) Failing after 2m24s
CI / test-python-agent-core (push) Successful in 22s
CI / test-nodejs-website (push) Successful in 20s

Phase 1 of the clean architecture refactor: Replaces the 751-line ocr-overlay
monolith with a modular pipeline. Each step gets its own component file.

Frontend: /ai/ocr-kombi route with 11 steps (Upload, Orientation, PageSplit,
Deskew, Dewarp, ContentCrop, OCR, Structure, GridBuild, GridReview, GroundTruth).
Session list supports document grouping for multi-page uploads.

Backend: New ocr_kombi/ module with multi-page PDF upload (splits PDF into N
sessions with shared document_group_id). DB migration adds document_group_id
and page_number columns.

Old /ai/ocr-overlay remains fully functional for A/B testing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-26 15:55:28 +01:00
parent d26233b5b3
commit d26a9f60ab
25 changed files with 1935 additions and 7 deletions

View File

@@ -46,6 +46,7 @@ from ocr_pipeline_api import router as ocr_pipeline_router, _cache as ocr_pipeli
from grid_editor_api import router as grid_editor_router
from orientation_crop_api import router as orientation_crop_router, set_cache_ref as set_orientation_crop_cache
from ocr_pipeline_session_store import init_ocr_pipeline_tables
from ocr_kombi.router import router as ocr_kombi_router
try:
from handwriting_htr_api import router as htr_router
except ImportError:
@@ -186,6 +187,7 @@ if htr_router:
app.include_router(htr_router) # Handwriting HTR (Klausur)
if dsfa_rag_router:
app.include_router(dsfa_rag_router) # DSFA RAG Corpus Search
app.include_router(ocr_kombi_router) # OCR Kombi Pipeline (modular)
# =============================================

View File

@@ -0,0 +1,12 @@
-- Migration: Add document_group_id and page_number for multi-page document grouping.
-- A document_group_id groups multiple sessions that belong to the same scanned document.
-- page_number is the 1-based page index within the group.
ALTER TABLE ocr_pipeline_sessions
ADD COLUMN IF NOT EXISTS document_group_id UUID,
ADD COLUMN IF NOT EXISTS page_number INT;
-- Index for efficient group lookups
CREATE INDEX IF NOT EXISTS idx_ocr_sessions_document_group
ON ocr_pipeline_sessions (document_group_id)
WHERE document_group_id IS NOT NULL;

View File

@@ -0,0 +1 @@
"""OCR Kombi Pipeline - modular step-based OCR processing."""

View File

@@ -0,0 +1,19 @@
"""
Composite router for the OCR Kombi pipeline.
Aggregates step-specific sub-routers into one router for main.py to include.
"""
from fastapi import APIRouter
from .step_upload import router as upload_router
router = APIRouter(prefix="/api/v1/ocr-kombi", tags=["ocr-kombi"])
# Include step-specific routes
router.include_router(upload_router)
# Future steps will be added here:
# from .step_orientation import router as orientation_router
# router.include_router(orientation_router)
# ...

View File

@@ -0,0 +1,132 @@
"""
Step 1: Upload — handles single images and multi-page PDFs.
Multi-page PDFs are split into individual PNG pages, each getting its own
session linked by a shared document_group_id.
"""
import io
import uuid
import logging
import time
from typing import Optional
from fastapi import APIRouter, UploadFile, File, Form, HTTPException
from ocr_pipeline_session_store import create_session_db, get_document_group_sessions
logger = logging.getLogger(__name__)
router = APIRouter()
def _pdf_to_pngs(pdf_bytes: bytes) -> list[bytes]:
"""Convert a PDF to a list of PNG byte buffers (one per page)."""
try:
import fitz # PyMuPDF
except ImportError:
raise HTTPException(
status_code=500,
detail="PDF-Verarbeitung nicht verfuegbar (PyMuPDF fehlt)"
)
doc = fitz.open(stream=pdf_bytes, filetype="pdf")
pages: list[bytes] = []
for page in doc:
# Render at 300 DPI for OCR quality
mat = fitz.Matrix(300 / 72, 300 / 72)
pix = page.get_pixmap(matrix=mat)
pages.append(pix.tobytes("png"))
doc.close()
return pages
@router.post("/upload")
async def upload_document(
file: UploadFile = File(...),
name: Optional[str] = Form(None),
document_category: Optional[str] = Form(None),
):
"""Upload a single image or multi-page PDF.
Single image: Creates 1 session with document_group_id + page_number=1.
Multi-page PDF: Creates N sessions with shared document_group_id,
page_number 1..N, and titles "Title — S. X".
"""
t0 = time.time()
file_bytes = await file.read()
filename = file.filename or "upload"
base_title = name or filename.rsplit(".", 1)[0]
is_pdf = (
filename.lower().endswith(".pdf")
or file.content_type == "application/pdf"
or file_bytes[:4] == b"%PDF"
)
group_id = str(uuid.uuid4())
created_sessions = []
if is_pdf:
pages = _pdf_to_pngs(file_bytes)
if not pages:
raise HTTPException(status_code=400, detail="PDF enthaelt keine Seiten")
for i, png_bytes in enumerate(pages, start=1):
session_id = str(uuid.uuid4())
page_title = f"{base_title} — S. {i}" if len(pages) > 1 else base_title
session = await create_session_db(
session_id=session_id,
name=page_title,
filename=filename,
original_png=png_bytes,
document_group_id=group_id,
page_number=i,
)
created_sessions.append({
"session_id": session["id"],
"name": session["name"],
"page_number": i,
})
else:
# Single image
session_id = str(uuid.uuid4())
session = await create_session_db(
session_id=session_id,
name=base_title,
filename=filename,
original_png=file_bytes,
document_group_id=group_id,
page_number=1,
)
created_sessions.append({
"session_id": session["id"],
"name": session["name"],
"page_number": 1,
})
duration = round(time.time() - t0, 2)
logger.info(
"Upload complete: %d page(s), group=%s, %.2fs",
len(created_sessions), group_id, duration,
)
return {
"document_group_id": group_id,
"page_count": len(created_sessions),
"sessions": created_sessions,
"duration_seconds": duration,
}
@router.get("/documents/{group_id}")
async def get_document_group(group_id: str):
"""Get all sessions in a document group, sorted by page_number."""
sessions = await get_document_group_sessions(group_id)
if not sessions:
raise HTTPException(status_code=404, detail="Dokumentgruppe nicht gefunden")
return {
"document_group_id": group_id,
"page_count": len(sessions),
"sessions": sessions,
}

View File

@@ -76,7 +76,16 @@ async def init_ocr_pipeline_tables():
ADD COLUMN IF NOT EXISTS parent_session_id UUID REFERENCES ocr_pipeline_sessions(id) ON DELETE CASCADE,
ADD COLUMN IF NOT EXISTS box_index INT,
ADD COLUMN IF NOT EXISTS grid_editor_result JSONB,
ADD COLUMN IF NOT EXISTS structure_result JSONB
ADD COLUMN IF NOT EXISTS structure_result JSONB,
ADD COLUMN IF NOT EXISTS document_group_id UUID,
ADD COLUMN IF NOT EXISTS page_number INT
""")
# Index for document group lookups
await conn.execute("""
CREATE INDEX IF NOT EXISTS idx_ocr_sessions_document_group
ON ocr_pipeline_sessions (document_group_id)
WHERE document_group_id IS NOT NULL
""")
@@ -91,21 +100,26 @@ async def create_session_db(
original_png: bytes,
parent_session_id: Optional[str] = None,
box_index: Optional[int] = None,
document_group_id: Optional[str] = None,
page_number: Optional[int] = None,
) -> Dict[str, Any]:
"""Create a new OCR pipeline session.
Args:
parent_session_id: If set, this is a sub-session for a box region.
box_index: 0-based index of the box this sub-session represents.
document_group_id: Groups multi-page uploads into one document.
page_number: 1-based page index within the document group.
"""
pool = await get_pool()
parent_uuid = uuid.UUID(parent_session_id) if parent_session_id else None
group_uuid = uuid.UUID(document_group_id) if document_group_id else None
async with pool.acquire() as conn:
row = await conn.fetchrow("""
INSERT INTO ocr_pipeline_sessions (
id, name, filename, original_png, status, current_step,
parent_session_id, box_index
) VALUES ($1, $2, $3, $4, 'active', 1, $5, $6)
parent_session_id, box_index, document_group_id, page_number
) VALUES ($1, $2, $3, $4, 'active', 1, $5, $6, $7, $8)
RETURNING id, name, filename, status, current_step,
orientation_result, crop_result,
deskew_result, dewarp_result, column_result, row_result,
@@ -114,9 +128,10 @@ async def create_session_db(
document_category, pipeline_log,
grid_editor_result, structure_result,
parent_session_id, box_index,
document_group_id, page_number,
created_at, updated_at
""", uuid.UUID(session_id), name, filename, original_png,
parent_uuid, box_index)
parent_uuid, box_index, group_uuid, page_number)
return _row_to_dict(row)
@@ -134,6 +149,7 @@ async def get_session_db(session_id: str) -> Optional[Dict[str, Any]]:
document_category, pipeline_log,
grid_editor_result, structure_result,
parent_session_id, box_index,
document_group_id, page_number,
created_at, updated_at
FROM ocr_pipeline_sessions WHERE id = $1
""", uuid.UUID(session_id))
@@ -186,6 +202,7 @@ async def update_session_db(session_id: str, **kwargs) -> Optional[Dict[str, Any
'document_category', 'pipeline_log',
'grid_editor_result', 'structure_result',
'parent_session_id', 'box_index',
'document_group_id', 'page_number',
}
jsonb_fields = {'orientation_result', 'crop_result', 'deskew_result', 'dewarp_result', 'column_result', 'row_result', 'word_result', 'ground_truth', 'handwriting_removal_meta', 'doc_type_result', 'pipeline_log', 'grid_editor_result', 'structure_result'}
@@ -217,8 +234,9 @@ async def update_session_db(session_id: str, **kwargs) -> Optional[Dict[str, Any
word_result, ground_truth, auto_shear_degrees,
doc_type, doc_type_result,
document_category, pipeline_log,
grid_editor_result,
grid_editor_result, structure_result,
parent_session_id, box_index,
document_group_id, page_number,
created_at, updated_at
""", *values)
@@ -243,6 +261,7 @@ async def list_sessions_db(
SELECT id, name, filename, status, current_step,
document_category, doc_type,
parent_session_id, box_index,
document_group_id, page_number,
created_at, updated_at
FROM ocr_pipeline_sessions
{where}
@@ -261,6 +280,7 @@ async def get_sub_sessions(parent_session_id: str) -> List[Dict[str, Any]]:
SELECT id, name, filename, status, current_step,
document_category, doc_type,
parent_session_id, box_index,
document_group_id, page_number,
created_at, updated_at
FROM ocr_pipeline_sessions
WHERE parent_session_id = $1
@@ -270,6 +290,24 @@ async def get_sub_sessions(parent_session_id: str) -> List[Dict[str, Any]]:
return [_row_to_dict(row) for row in rows]
async def get_document_group_sessions(document_group_id: str) -> List[Dict[str, Any]]:
"""Get all sessions in a document group, ordered by page_number."""
pool = await get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch("""
SELECT id, name, filename, status, current_step,
document_category, doc_type,
parent_session_id, box_index,
document_group_id, page_number,
created_at, updated_at
FROM ocr_pipeline_sessions
WHERE document_group_id = $1
ORDER BY page_number ASC
""", uuid.UUID(document_group_id))
return [_row_to_dict(row) for row in rows]
async def list_ground_truth_sessions_db() -> List[Dict[str, Any]]:
"""List sessions that have a build_grid_reference in ground_truth."""
pool = await get_pool()
@@ -324,7 +362,7 @@ def _row_to_dict(row: asyncpg.Record) -> Dict[str, Any]:
result = dict(row)
# UUID → string
for key in ['id', 'session_id', 'parent_session_id']:
for key in ['id', 'session_id', 'parent_session_id', 'document_group_id']:
if key in result and result[key] is not None:
result[key] = str(result[key])