feat: improved OCR pipeline session manager with categories, thumbnails, pipeline logging
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 39s
CI / test-go-edu-search (push) Successful in 27s
CI / test-python-klausur (push) Failing after 1m48s
CI / test-python-agent-core (push) Successful in 17s
CI / test-nodejs-website (push) Successful in 20s
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 39s
CI / test-go-edu-search (push) Successful in 27s
CI / test-python-klausur (push) Failing after 1m48s
CI / test-python-agent-core (push) Successful in 17s
CI / test-nodejs-website (push) Successful in 20s
- Add document_category (10 types) and pipeline_log JSONB columns
- Session list: thumbnails, copyable IDs, category/doc_type badges
- Inline category dropdown, bulk delete, pipeline step logging
- New endpoints: thumbnail, delete-all, pipeline-log, categories
- Cleared all 22 old test sessions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -66,6 +66,7 @@ from cv_vocab_pipeline import (
|
||||
)
|
||||
from ocr_pipeline_session_store import (
|
||||
create_session_db,
|
||||
delete_all_sessions_db,
|
||||
delete_session_db,
|
||||
get_session_db,
|
||||
get_session_image,
|
||||
@@ -151,8 +152,15 @@ class DewarpGroundTruthRequest(BaseModel):
|
||||
notes: Optional[str] = None
|
||||
|
||||
|
||||
class RenameSessionRequest(BaseModel):
    """Request body carrying the new display name for a session."""
    # New session name (required, no validation beyond type).
    name: str
|
||||
# Closed set of document categories a session may be tagged with.
# Validated in update_session (400 on anything else) and exposed,
# sorted, via the GET /categories endpoint.
VALID_DOCUMENT_CATEGORIES = {
    'vokabelseite', 'buchseite', 'arbeitsblatt', 'klausurseite',
    'mathearbeit', 'statistik', 'zeitung', 'formular', 'handschrift', 'sonstiges',
}
|
||||
|
||||
|
||||
class UpdateSessionRequest(BaseModel):
    """Request body for partial session updates; omitted fields are left unchanged."""
    # Optional new display name.
    name: Optional[str] = None
    # Optional category; must be a member of VALID_DOCUMENT_CATEGORIES.
    document_category: Optional[str] = None
|
||||
|
||||
|
||||
class ManualColumnsRequest(BaseModel):
|
||||
@@ -281,6 +289,8 @@ async def get_session_info(session_id: str):
|
||||
"image_height": img_h,
|
||||
"original_image_url": f"/api/v1/ocr-pipeline/sessions/{session_id}/image/original",
|
||||
"current_step": session.get("current_step", 1),
|
||||
"document_category": session.get("document_category"),
|
||||
"doc_type": session.get("doc_type"),
|
||||
}
|
||||
|
||||
if session.get("deskew_result"):
|
||||
@@ -293,17 +303,31 @@ async def get_session_info(session_id: str):
|
||||
result["row_result"] = session["row_result"]
|
||||
if session.get("word_result"):
|
||||
result["word_result"] = session["word_result"]
|
||||
if session.get("doc_type_result"):
|
||||
result["doc_type_result"] = session["doc_type_result"]
|
||||
|
||||
return result
|
||||
|
||||
|
||||
@router.put("/sessions/{session_id}")
async def update_session(session_id: str, req: UpdateSessionRequest):
    """Update a session's name and/or document category.

    Only the fields present in the request body are changed. Raises
    400 when the body is empty or the category is not one of
    VALID_DOCUMENT_CATEGORIES, and 404 when the session does not exist.
    """
    kwargs: Dict[str, Any] = {}
    if req.name is not None:
        kwargs["name"] = req.name
    if req.document_category is not None:
        # Reject unknown categories up front so bad data never reaches the DB.
        if req.document_category not in VALID_DOCUMENT_CATEGORIES:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid category '{req.document_category}'. Valid: {sorted(VALID_DOCUMENT_CATEGORIES)}",
            )
        kwargs["document_category"] = req.document_category
    if not kwargs:
        raise HTTPException(status_code=400, detail="Nothing to update")
    updated = await update_session_db(session_id, **kwargs)
    if not updated:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
    # Echo back only the fields that were actually changed.
    return {"session_id": session_id, **kwargs}
|
||||
|
||||
|
||||
@router.delete("/sessions/{session_id}")
|
||||
@@ -316,6 +340,78 @@ async def delete_session(session_id: str):
|
||||
return {"session_id": session_id, "deleted": True}
|
||||
|
||||
|
||||
@router.delete("/sessions")
async def delete_all_sessions():
    """Delete ALL sessions (cleanup)."""
    # Drop the in-memory cache first so no stale entry outlives its DB row.
    _cache.clear()
    removed = await delete_all_sessions_db()
    return {"deleted_count": removed}
|
||||
|
||||
|
||||
@router.get("/sessions/{session_id}/thumbnail")
async def get_session_thumbnail(session_id: str, size: int = Query(default=80, ge=16, le=400)):
    """Return a small PNG thumbnail of the session's original image.

    The longest edge is scaled to ``size`` pixels. Raises 404 when the
    session or its image is missing, 500 when decode/encode fails.
    """
    original_png = await get_session_image(session_id, "original")
    if not original_png:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found or no image")
    arr = np.frombuffer(original_png, dtype=np.uint8)
    img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    if img is None:
        raise HTTPException(status_code=500, detail="Failed to decode image")
    h, w = img.shape[:2]
    scale = size / max(h, w)
    # Clamp to >= 1 px: int() truncation can yield 0 for extreme aspect
    # ratios (e.g. very wide banner scans), and cv2.resize rejects 0 dims.
    new_w, new_h = max(1, int(w * scale)), max(1, int(h * scale))
    thumb = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_AREA)
    ok, png_bytes = cv2.imencode(".png", thumb)
    if not ok:
        # Don't return an empty body if PNG encoding fails.
        raise HTTPException(status_code=500, detail="Failed to encode thumbnail")
    return Response(content=png_bytes.tobytes(), media_type="image/png",
                    headers={"Cache-Control": "public, max-age=3600"})
|
||||
|
||||
|
||||
@router.get("/sessions/{session_id}/pipeline-log")
async def get_pipeline_log(session_id: str):
    """Get the pipeline execution log for a session."""
    record = await get_session_db(session_id)
    if not record:
        raise HTTPException(status_code=404, detail=f"Session {session_id} not found")
    # Sessions created before pipeline logging existed have no log yet;
    # present those as an empty step list rather than null.
    log = record.get("pipeline_log")
    if not log:
        log = {"steps": []}
    return {"session_id": session_id, "pipeline_log": log}
|
||||
|
||||
|
||||
@router.get("/categories")
async def list_categories():
    """List valid document categories."""
    # Sorted so clients get a stable, deterministic ordering.
    ordered = sorted(VALID_DOCUMENT_CATEGORIES)
    return {"categories": ordered}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pipeline Log Helper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def _append_pipeline_log(
    session_id: str,
    step_name: str,
    metrics: Dict[str, Any],
    success: bool = True,
    duration_ms: Optional[int] = None,
):
    """Append a step entry to the session's pipeline_log JSONB.

    Best-effort: silently returns when the session no longer exists
    (e.g. deleted while a pipeline step was running).
    """
    record = await get_session_db(session_id)
    if not record:
        return
    log = record.get("pipeline_log")
    # Guard against missing or malformed stored logs (None, str, etc.).
    if not log or not isinstance(log, dict):
        log = {"steps": []}
    entry: Dict[str, Any] = {
        "step": step_name,
        "completed_at": datetime.utcnow().isoformat(),
        "success": success,
        "metrics": metrics,
    }
    if duration_ms is not None:
        entry["duration_ms"] = duration_ms
    steps = log.setdefault("steps", [])
    steps.append(entry)
    await update_session_db(session_id, pipeline_log=log)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Image Endpoints
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -448,6 +544,12 @@ async def auto_deskew(session_id: str):
|
||||
logger.info(f"OCR Pipeline: deskew session {session_id}: "
|
||||
f"hough={angle_hough:.2f} wa={angle_wa:.2f} -> {method_used} {angle_applied:.2f}")
|
||||
|
||||
await _append_pipeline_log(session_id, "deskew", {
|
||||
"angle_applied": round(angle_applied, 3),
|
||||
"confidence": round(confidence, 2),
|
||||
"method": method_used,
|
||||
}, duration_ms=int(duration * 1000))
|
||||
|
||||
return {
|
||||
"session_id": session_id,
|
||||
**deskew_result,
|
||||
@@ -680,6 +782,13 @@ async def auto_dewarp(
|
||||
f"method={dewarp_info['method']} shear={dewarp_info['shear_degrees']:.3f} "
|
||||
f"conf={dewarp_info['confidence']:.2f} ({duration:.2f}s)")
|
||||
|
||||
await _append_pipeline_log(session_id, "dewarp", {
|
||||
"shear_degrees": dewarp_info["shear_degrees"],
|
||||
"confidence": dewarp_info["confidence"],
|
||||
"method": dewarp_info["method"],
|
||||
"ensemble_methods": [d.get("method", "") for d in dewarp_info.get("detections", [])],
|
||||
}, duration_ms=int(duration * 1000))
|
||||
|
||||
return {
|
||||
"session_id": session_id,
|
||||
**dewarp_result,
|
||||
@@ -808,6 +917,13 @@ async def detect_type(session_id: str):
|
||||
logger.info(f"OCR Pipeline: detect-type session {session_id}: "
|
||||
f"{result.doc_type} (confidence={result.confidence}, {duration:.2f}s)")
|
||||
|
||||
await _append_pipeline_log(session_id, "detect_type", {
|
||||
"doc_type": result.doc_type,
|
||||
"pipeline": result.pipeline,
|
||||
"confidence": result.confidence,
|
||||
**{k: v for k, v in (result.features or {}).items() if isinstance(v, (int, float, str, bool))},
|
||||
}, duration_ms=int(duration * 1000))
|
||||
|
||||
return {"session_id": session_id, **result_dict}
|
||||
|
||||
|
||||
@@ -896,6 +1012,13 @@ async def detect_columns(session_id: str):
|
||||
logger.info(f"OCR Pipeline: columns session {session_id}: "
|
||||
f"{col_count} columns detected ({duration:.2f}s)")
|
||||
|
||||
img_w = dewarped_bgr.shape[1]
|
||||
await _append_pipeline_log(session_id, "columns", {
|
||||
"total_columns": len(columns),
|
||||
"column_widths_pct": [round(c["width"] / img_w * 100, 1) for c in columns],
|
||||
"column_types": [c["type"] for c in columns],
|
||||
}, duration_ms=int(duration * 1000))
|
||||
|
||||
return {
|
||||
"session_id": session_id,
|
||||
**column_result,
|
||||
@@ -1112,6 +1235,15 @@ async def detect_rows(session_id: str):
|
||||
logger.info(f"OCR Pipeline: rows session {session_id}: "
|
||||
f"{len(rows)} rows detected ({duration:.2f}s): {type_counts}")
|
||||
|
||||
content_rows = sum(1 for r in rows if r.row_type == "content")
|
||||
avg_height = round(sum(r.height for r in rows) / len(rows)) if rows else 0
|
||||
await _append_pipeline_log(session_id, "rows", {
|
||||
"total_rows": len(rows),
|
||||
"content_rows": content_rows,
|
||||
"artifact_rows_removed": type_counts.get("header", 0) + type_counts.get("footer", 0),
|
||||
"avg_row_height_px": avg_height,
|
||||
}, duration_ms=int(duration * 1000))
|
||||
|
||||
return {
|
||||
"session_id": session_id,
|
||||
**row_result,
|
||||
@@ -1369,6 +1501,15 @@ async def detect_words(
|
||||
f"layout={word_result['layout']}, "
|
||||
f"{len(cells)} cells ({duration:.2f}s), summary: {word_result['summary']}")
|
||||
|
||||
await _append_pipeline_log(session_id, "words", {
|
||||
"total_cells": len(cells),
|
||||
"non_empty_cells": word_result["summary"]["non_empty_cells"],
|
||||
"low_confidence_count": word_result["summary"]["low_confidence"],
|
||||
"ocr_engine": used_engine,
|
||||
"layout": word_result["layout"],
|
||||
"entry_count": word_result.get("entry_count", 0),
|
||||
}, duration_ms=int(duration * 1000))
|
||||
|
||||
return {
|
||||
"session_id": session_id,
|
||||
**word_result,
|
||||
@@ -1774,6 +1915,13 @@ async def run_llm_review(session_id: str, request: Request, stream: bool = False
|
||||
logger.info(f"LLM review session {session_id}: {len(result['changes'])} changes, "
|
||||
f"{result['duration_ms']}ms, model={result['model_used']}")
|
||||
|
||||
await _append_pipeline_log(session_id, "correction", {
|
||||
"engine": "llm",
|
||||
"model": result["model_used"],
|
||||
"total_entries": len(entries),
|
||||
"corrections_proposed": len(result["changes"]),
|
||||
}, duration_ms=result["duration_ms"])
|
||||
|
||||
return {
|
||||
"session_id": session_id,
|
||||
"changes": result["changes"],
|
||||
|
||||
@@ -66,7 +66,9 @@ async def init_ocr_pipeline_tables():
|
||||
ADD COLUMN IF NOT EXISTS clean_png BYTEA,
|
||||
ADD COLUMN IF NOT EXISTS handwriting_removal_meta JSONB,
|
||||
ADD COLUMN IF NOT EXISTS doc_type VARCHAR(50),
|
||||
ADD COLUMN IF NOT EXISTS doc_type_result JSONB
|
||||
ADD COLUMN IF NOT EXISTS doc_type_result JSONB,
|
||||
ADD COLUMN IF NOT EXISTS document_category VARCHAR(50),
|
||||
ADD COLUMN IF NOT EXISTS pipeline_log JSONB
|
||||
""")
|
||||
|
||||
|
||||
@@ -91,6 +93,7 @@ async def create_session_db(
|
||||
deskew_result, dewarp_result, column_result, row_result,
|
||||
word_result, ground_truth, auto_shear_degrees,
|
||||
doc_type, doc_type_result,
|
||||
document_category, pipeline_log,
|
||||
created_at, updated_at
|
||||
""", uuid.UUID(session_id), name, filename, original_png)
|
||||
|
||||
@@ -106,6 +109,7 @@ async def get_session_db(session_id: str) -> Optional[Dict[str, Any]]:
|
||||
deskew_result, dewarp_result, column_result, row_result,
|
||||
word_result, ground_truth, auto_shear_degrees,
|
||||
doc_type, doc_type_result,
|
||||
document_category, pipeline_log,
|
||||
created_at, updated_at
|
||||
FROM ocr_pipeline_sessions WHERE id = $1
|
||||
""", uuid.UUID(session_id))
|
||||
@@ -151,9 +155,10 @@ async def update_session_db(session_id: str, **kwargs) -> Optional[Dict[str, Any
|
||||
'deskew_result', 'dewarp_result', 'column_result', 'row_result',
|
||||
'word_result', 'ground_truth', 'auto_shear_degrees',
|
||||
'doc_type', 'doc_type_result',
|
||||
'document_category', 'pipeline_log',
|
||||
}
|
||||
|
||||
jsonb_fields = {'deskew_result', 'dewarp_result', 'column_result', 'row_result', 'word_result', 'ground_truth', 'handwriting_removal_meta', 'doc_type_result'}
|
||||
jsonb_fields = {'deskew_result', 'dewarp_result', 'column_result', 'row_result', 'word_result', 'ground_truth', 'handwriting_removal_meta', 'doc_type_result', 'pipeline_log'}
|
||||
|
||||
for key, value in kwargs.items():
|
||||
if key in allowed_fields:
|
||||
@@ -180,6 +185,7 @@ async def update_session_db(session_id: str, **kwargs) -> Optional[Dict[str, Any
|
||||
deskew_result, dewarp_result, column_result, row_result,
|
||||
word_result, ground_truth, auto_shear_degrees,
|
||||
doc_type, doc_type_result,
|
||||
document_category, pipeline_log,
|
||||
created_at, updated_at
|
||||
""", *values)
|
||||
|
||||
@@ -194,6 +200,7 @@ async def list_sessions_db(limit: int = 50) -> List[Dict[str, Any]]:
|
||||
async with pool.acquire() as conn:
|
||||
rows = await conn.fetch("""
|
||||
SELECT id, name, filename, status, current_step,
|
||||
document_category, doc_type,
|
||||
created_at, updated_at
|
||||
FROM ocr_pipeline_sessions
|
||||
ORDER BY created_at DESC
|
||||
@@ -213,6 +220,18 @@ async def delete_session_db(session_id: str) -> bool:
|
||||
return result == "DELETE 1"
|
||||
|
||||
|
||||
async def delete_all_sessions_db() -> int:
    """Delete all sessions. Returns number of deleted rows."""
    pool = await get_pool()
    async with pool.acquire() as conn:
        status = await conn.execute("DELETE FROM ocr_pipeline_sessions")
    # asyncpg returns the command tag, e.g. "DELETE 5"; parse the trailing
    # count defensively and fall back to 0 on anything unexpected.
    parts = status.split()
    if parts and parts[-1].isdigit():
        return int(parts[-1])
    return 0
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# HELPER
|
||||
# =============================================================================
|
||||
@@ -235,7 +254,7 @@ def _row_to_dict(row: asyncpg.Record) -> Dict[str, Any]:
|
||||
result[key] = result[key].isoformat()
|
||||
|
||||
# JSONB → parsed (asyncpg returns str for JSONB)
|
||||
for key in ['deskew_result', 'dewarp_result', 'column_result', 'row_result', 'word_result', 'ground_truth', 'doc_type_result']:
|
||||
for key in ['deskew_result', 'dewarp_result', 'column_result', 'row_result', 'word_result', 'ground_truth', 'doc_type_result', 'pipeline_log']:
|
||||
if key in result and result[key] is not None:
|
||||
if isinstance(result[key], str):
|
||||
result[key] = json.loads(result[key])
|
||||
|
||||
Reference in New Issue
Block a user