Files
breakpilot-lehrer/klausur-service/backend/vocab_worksheet_analysis_api.py
Benjamin Admin 9ba420fa91
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 42s
CI / test-go-edu-search (push) Successful in 34s
CI / test-python-klausur (push) Failing after 2m51s
CI / test-python-agent-core (push) Successful in 21s
CI / test-nodejs-website (push) Successful in 29s
Fix: Remove broken getKlausurApiUrl and clean up empty lines
sed replacement left orphaned hostname references in story page
and empty lines in getApiBase functions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-24 16:02:04 +02:00

473 lines
17 KiB
Python

"""
Vocabulary Worksheet Analysis API - OCR export, ground truth labeling,
extract-with-boxes, deskewed images, and learning unit generation.
The two large handlers (compare_ocr_methods, analyze_grid) live in
vocab_worksheet_compare_api.py and are included via compare_router.
"""
from fastapi import APIRouter, Body, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional, Dict, Any
from datetime import datetime
import os
import io
import json
import logging
def _get_sessions():
    """Return the session store object exported by ``vocab_worksheet_api``.

    The handlers in this module index the returned object by session id and
    treat each value as a dict. The import is performed on every call
    instead of at module load (presumably to sidestep a circular import
    with ``vocab_worksheet_api`` -- TODO confirm).
    """
    from vocab_worksheet_api import _sessions as session_store
    return session_store
def _get_local_storage_path():
    """Return ``LOCAL_STORAGE_PATH`` as defined by ``vocab_worksheet_api``.

    The value is looked up lazily at call time rather than imported at module
    load (presumably to sidestep a circular import -- TODO confirm).
    """
    from vocab_worksheet_api import LOCAL_STORAGE_PATH as storage_root
    return storage_root
from vocab_worksheet_generation import convert_pdf_page_to_image
# Optional dependency: Tesseract-based bounding-box extractor.
# If the import fails, TESSERACT_AVAILABLE is forced to False and
# extract_bounding_boxes is left undefined, so callers must check the flag
# before using it.
try:
    from tesseract_vocab_extractor import (
        extract_bounding_boxes, TESSERACT_AVAILABLE,
    )
except ImportError:
    TESSERACT_AVAILABLE = False
# Optional dependency: grid detection service.
# GRID_SERVICE_AVAILABLE records whether GridDetectionService could be imported.
try:
    from services.grid_detection_service import GridDetectionService
    GRID_SERVICE_AVAILABLE = True
except ImportError:
    GRID_SERVICE_AVAILABLE = False
# Module-level logger and the router all endpoints in this file attach to.
logger = logging.getLogger(__name__)
analysis_router = APIRouter()
def _ocr_export_dir():
    """Return the ``ocr-exports`` directory path below the local storage root."""
    storage_root = _get_local_storage_path()
    return os.path.join(storage_root, "ocr-exports")
def _ground_truth_dir():
    """Return the ``ground-truth`` directory path below the local storage root."""
    storage_root = _get_local_storage_path()
    return os.path.join(storage_root, "ground-truth")
# =============================================================================
# OCR Export Endpoints (for cross-app OCR data sharing)
# =============================================================================
@analysis_router.post("/sessions/{session_id}/ocr-export/{page_number}")
async def save_ocr_export(session_id: str, page_number: int, data: Dict[str, Any] = Body(...)):
    """
    Save OCR export data for cross-app sharing (admin-v2 -> studio-v2).

    Both apps proxy to klausur-service via /klausur-api/, so this endpoint
    serves as shared storage accessible from both ports.

    The payload is written verbatim as JSON to
    ``<ocr-exports>/<session_id>_page<page_number>.json`` and ``latest.json``
    in the same directory is rewritten to point at this export.

    Args:
        session_id: Session identifier taken from the URL path.
        page_number: Page number taken from the URL path.
        data: Arbitrary JSON object from the request body.

    Returns:
        A confirmation dict echoing ``session_id`` and ``page_number``.
    """
    # Function-scope import: the module header only imports ``datetime``.
    from datetime import timezone

    logger.info(f"Saving OCR export for session {session_id}, page {page_number}")

    # Resolve the directory once and reuse it for both files.
    export_dir = _ocr_export_dir()
    os.makedirs(export_dir, exist_ok=True)

    # Save the export data
    export_path = os.path.join(export_dir, f"{session_id}_page{page_number}.json")
    with open(export_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # timestamp and drop tzinfo so the stored string keeps the exact same
    # naive ISO format that existing readers of latest.json already see.
    saved_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    # Update latest pointer
    latest_path = os.path.join(export_dir, "latest.json")
    with open(latest_path, 'w', encoding='utf-8') as f:
        json.dump({
            "session_id": session_id,
            "page_number": page_number,
            "saved_at": saved_at,
        }, f, ensure_ascii=False, indent=2)

    return {
        "success": True,
        "session_id": session_id,
        "page_number": page_number,
        "message": "OCR export saved successfully",
    }
@analysis_router.get("/sessions/{session_id}/ocr-export/{page_number}")
async def load_ocr_export(session_id: str, page_number: int):
    """Return the stored OCR export for one session page.

    Raises:
        HTTPException: 404 when no export file exists for this session/page.
    """
    export_path = os.path.join(_ocr_export_dir(), f"{session_id}_page{page_number}.json")
    if os.path.exists(export_path):
        with open(export_path, 'r', encoding='utf-8') as export_file:
            return json.load(export_file)
    raise HTTPException(status_code=404, detail="OCR export not found")
@analysis_router.get("/ocr-export/latest")
async def load_latest_ocr_export():
    """Return the OCR export that ``latest.json`` currently points at.

    Raises:
        HTTPException: 404 when there is no ``latest.json`` pointer, or when
            the export file it references does not exist.
    """
    export_dir = _ocr_export_dir()

    # Resolve the pointer file first.
    pointer_path = os.path.join(export_dir, "latest.json")
    if not os.path.exists(pointer_path):
        raise HTTPException(status_code=404, detail="No OCR exports found")
    with open(pointer_path, 'r', encoding='utf-8') as pointer_file:
        pointer = json.load(pointer_file)

    # Follow the pointer to the actual export file.
    session_id = pointer.get("session_id")
    page_number = pointer.get("page_number")
    export_path = os.path.join(export_dir, f"{session_id}_page{page_number}.json")
    if os.path.exists(export_path):
        with open(export_path, 'r', encoding='utf-8') as export_file:
            return json.load(export_file)
    raise HTTPException(status_code=404, detail="Latest OCR export file not found")
# =============================================================================
# Extract with Boxes & Deskewed Image
# =============================================================================
async def extract_entries_with_boxes(image_bytes: bytes, lang: str = "eng+deu") -> dict:
    """Build vocabulary entries with bounding boxes from a page image.

    Pipeline: Tesseract word boxes -> GridDetectionService regions -> grid
    cells -> one entry per grid row that contains any text.

    Returns:
        Dict with ``entries`` plus ``image_width`` / ``image_height``. Every
        entry carries row_index, english, german, example, confidence, bbox,
        bbox_en, bbox_de and bbox_ex. Per the original contract the bbox
        coordinates are percentages (0-100).

    Raises:
        HTTPException: 500 when Tesseract or GridDetectionService is missing.
    """
    if not TESSERACT_AVAILABLE:
        raise HTTPException(status_code=500, detail="Tesseract not available")
    if not GRID_SERVICE_AVAILABLE:
        raise HTTPException(status_code=500, detail="GridDetectionService not available")

    # Word-level OCR boxes and the image dimensions they refer to.
    ocr = await extract_bounding_boxes(image_bytes, lang=lang)
    words = ocr.get("words", [])
    img_w = ocr.get("image_width", 0)
    img_h = ocr.get("image_height", 0)

    def _payload(rows):
        # Every exit path returns the same envelope around the entry list.
        return {"entries": rows, "image_width": img_w, "image_height": img_h}

    def _box_or_zero(box):
        # Columns missing from a row are reported as an all-zero box.
        return box if box else {"x": 0, "y": 0, "w": 0, "h": 0}

    if not words or img_w == 0 or img_h == 0:
        return _payload([])

    detector = GridDetectionService()
    regions = detector.convert_tesseract_regions(words, img_w, img_h)
    if not regions:
        return _payload([])

    grid = detector.detect_grid(regions)
    if not grid.cells:
        return _payload([])

    from services.grid_detection_service import ColumnType

    entries = []
    for row_idx, row_cells in enumerate(grid.cells):
        texts = {"en": "", "de": "", "ex": ""}
        boxes = {"en": None, "de": None, "ex": None}
        conf_total = 0.0
        conf_cells = 0

        for cell in row_cells:
            cell_box = {
                "x": round(cell.x, 2),
                "y": round(cell.y, 2),
                "w": round(cell.width, 2),
                "h": round(cell.height, 2),
            }
            # Map the cell onto one of the three known columns, if any.
            if cell.column_type == ColumnType.ENGLISH:
                slot = "en"
            elif cell.column_type == ColumnType.GERMAN:
                slot = "de"
            elif cell.column_type == ColumnType.EXAMPLE:
                slot = "ex"
            else:
                slot = None
            if slot is not None:
                texts[slot] = cell.text.strip()
                boxes[slot] = cell_box
            # Any cell with text feeds the row confidence, whatever its column.
            if cell.text.strip():
                conf_total += cell.confidence
                conf_cells += 1

        # Rows without any text in the three columns are dropped.
        if not any(texts.values()):
            continue

        # Row box = union of the column boxes that were found.
        found = [box for box in boxes.values() if box is not None]
        if found:
            left = min(box["x"] for box in found)
            top = min(box["y"] for box in found)
            right = max(box["x"] + box["w"] for box in found)
            bottom = max(box["y"] + box["h"] for box in found)
            row_box = {
                "x": round(left, 2),
                "y": round(top, 2),
                "w": round(right - left, 2),
                "h": round(bottom - top, 2),
            }
        else:
            row_box = {"x": 0, "y": 0, "w": 100, "h": 3}

        # Mean cell confidence scaled by 100, one decimal place.
        if conf_cells > 0:
            avg_conf = round(conf_total / conf_cells * 100, 1)
        else:
            avg_conf = 0

        entries.append({
            "row_index": row_idx,
            "english": texts["en"],
            "german": texts["de"],
            "example": texts["ex"],
            "confidence": avg_conf,
            "bbox": row_box,
            "bbox_en": _box_or_zero(boxes["en"]),
            "bbox_de": _box_or_zero(boxes["de"]),
            "bbox_ex": _box_or_zero(boxes["ex"]),
        })

    return _payload(entries)
@analysis_router.post("/sessions/{session_id}/extract-with-boxes/{page_number}")
async def extract_with_boxes(session_id: str, page_number: int):
    """Run box-level vocabulary extraction on one PDF page of a session.

    The page is rendered to an image, deskewed when possible, passed to
    ``extract_entries_with_boxes`` and the results are cached on the session
    (``deskewed_images`` and ``gt_entries``, keyed by the page number as a
    string). ``page_number`` is 0-indexed.

    Raises:
        HTTPException: 404 for an unknown session; 400 when the session has
            no PDF or the page number is out of range.
    """
    logger.info(f"Extract with boxes for session {session_id}, page {page_number}")

    sessions = _get_sessions()
    if session_id not in sessions:
        raise HTTPException(status_code=404, detail="Session not found")
    session = sessions[session_id]

    pdf_data = session.get("pdf_data")
    if not pdf_data:
        raise HTTPException(status_code=400, detail="No PDF uploaded for this session")

    page_count = session.get("pdf_page_count", 1)
    if not 0 <= page_number < page_count:
        raise HTTPException(status_code=400, detail=f"Invalid page number. PDF has {page_count} pages (0-indexed).")

    # Render the page (thumbnail=False).
    image_data = await convert_pdf_page_to_image(pdf_data, page_number, thumbnail=False)

    # Best-effort deskew: any failure is logged and the undeskewed image is used.
    deskew_angle = 0.0
    try:
        from cv_vocab_pipeline import deskew_image_by_word_alignment, CV2_AVAILABLE
        if CV2_AVAILABLE:
            image_data, deskew_angle = deskew_image_by_word_alignment(image_data)
            logger.info(f"Deskew: {deskew_angle:.2f}° for page {page_number}")
    except Exception as e:
        logger.warning(f"Deskew failed for page {page_number}: {e}")

    page_key = str(page_number)

    # Keep the (possibly deskewed) image so it can be served later.
    session.setdefault("deskewed_images", {})[page_key] = image_data

    # OCR runs on the same image that was just cached.
    result = await extract_entries_with_boxes(image_data)
    found_entries = result["entries"]
    session.setdefault("gt_entries", {})[page_key] = found_entries

    return {
        "success": True,
        "entries": found_entries,
        "entry_count": len(found_entries),
        "image_width": result["image_width"],
        "image_height": result["image_height"],
        "deskew_angle": round(deskew_angle, 2),
        "deskewed": abs(deskew_angle) > 0.05,
    }
@analysis_router.get("/sessions/{session_id}/deskewed-image/{page_number}")
async def get_deskewed_image(session_id: str, page_number: int):
    """Stream the cached deskewed page image with media type ``image/png``.

    When the session holds no cached image for the page, the page is rendered
    from the session's PDF instead.

    Raises:
        HTTPException: 404 for an unknown session; 400 when a fallback render
            is needed but the session has no PDF.
    """
    sessions = _get_sessions()
    if session_id not in sessions:
        raise HTTPException(status_code=404, detail="Session not found")
    session = sessions[session_id]

    image_bytes = session.get("deskewed_images", {}).get(str(page_number))
    if not image_bytes:
        # Nothing cached for this page: render it from the PDF.
        pdf_data = session.get("pdf_data")
        if not pdf_data:
            raise HTTPException(status_code=400, detail="No PDF uploaded for this session")
        image_bytes = await convert_pdf_page_to_image(pdf_data, page_number, thumbnail=False)

    return StreamingResponse(io.BytesIO(image_bytes), media_type="image/png")
# =============================================================================
# Ground Truth Labeling
# =============================================================================
@analysis_router.post("/sessions/{session_id}/ground-truth/{page_number}")
async def save_ground_truth(session_id: str, page_number: int, data: dict = Body(...)):
    """Save ground truth labels for a page.

    Expects body with 'entries' list - each entry has english, german, example,
    status ('confirmed' | 'edited' | 'skipped'), and bbox fields.

    The entries are stored on the session under ``ground_truth`` (keyed by
    the page number as a string) and written as JSON to
    ``<ground-truth>/<session_id>_page<page_number>.json``.

    Returns:
        Dict with the number of saved entries, per-status counts and the path
        of the written file.

    Raises:
        HTTPException: 404 for an unknown session; 400 when ``entries`` is
            missing, empty, not a list, or contains non-object items.
    """
    logger.info(f"Save ground truth for session {session_id}, page {page_number}")

    sessions = _get_sessions()
    if session_id not in sessions:
        raise HTTPException(status_code=404, detail="Session not found")

    entries = data.get("entries", [])
    if not entries:
        raise HTTPException(status_code=400, detail="No entries provided")

    # Validate the shape BEFORE touching session state or disk. Without this
    # check a malformed payload was cached and persisted first, and only then
    # crashed with a 500 while counting statuses via e.get(...).
    if not isinstance(entries, list) or not all(isinstance(e, dict) for e in entries):
        raise HTTPException(status_code=400, detail="'entries' must be a list of objects")

    # Save in session
    session = sessions[session_id]
    if "ground_truth" not in session:
        session["ground_truth"] = {}
    session["ground_truth"][str(page_number)] = entries

    # Also save to disk
    gt_dir = _ground_truth_dir()
    os.makedirs(gt_dir, exist_ok=True)
    gt_path = os.path.join(gt_dir, f"{session_id}_page{page_number}.json")
    gt_data = {
        "session_id": session_id,
        "page_number": page_number,
        "saved_at": datetime.now().isoformat(),
        "entry_count": len(entries),
        "entries": entries,
    }
    with open(gt_path, 'w', encoding='utf-8') as f:
        json.dump(gt_data, f, ensure_ascii=False, indent=2)
    logger.info(f"Ground truth saved: {len(entries)} entries to {gt_path}")

    # Per-status tallies; entries with any other status are not counted.
    confirmed = sum(1 for e in entries if e.get("status") == "confirmed")
    edited = sum(1 for e in entries if e.get("status") == "edited")
    skipped = sum(1 for e in entries if e.get("status") == "skipped")

    return {
        "success": True,
        "saved_count": len(entries),
        "confirmed": confirmed,
        "edited": edited,
        "skipped": skipped,
        "file_path": gt_path,
    }
@analysis_router.get("/sessions/{session_id}/ground-truth/{page_number}")
async def load_ground_truth(session_id: str, page_number: int):
    """Return the ground truth entries stored for one page of a session.

    The session's ``ground_truth`` cache is consulted first; when it holds
    nothing (or an empty value) for the page, the JSON file on disk is read.
    The ``source`` field of the response is ``"cache"`` or ``"disk"``.

    Raises:
        HTTPException: 404 for an unknown session or when neither the cache
            nor the disk has ground truth for the page.
    """
    logger.info(f"Load ground truth for session {session_id}, page {page_number}")

    sessions = _get_sessions()
    if session_id not in sessions:
        raise HTTPException(status_code=404, detail="Session not found")

    # In-memory copy wins when present and non-empty.
    cached = sessions[session_id].get("ground_truth", {}).get(str(page_number))
    if cached:
        return {"success": True, "entries": cached, "source": "cache"}

    # Otherwise fall back to the persisted file.
    gt_path = os.path.join(_ground_truth_dir(), f"{session_id}_page{page_number}.json")
    if os.path.exists(gt_path):
        with open(gt_path, 'r', encoding='utf-8') as gt_file:
            stored = json.load(gt_file)
        return {"success": True, "entries": stored.get("entries", []), "source": "disk"}
    raise HTTPException(status_code=404, detail="No ground truth found for this page")
# ─── Learning Module Generation ─────────────────────────────────────────────
class GenerateLearningUnitRequest(BaseModel):
    # Request body of the generate-learning-unit endpoint; every field is optional.
    # Forwarded as the ``grade`` argument of create_learning_unit; None when omitted.
    grade: Optional[str] = None
    # When True, generate_learning_modules is called after the unit was created.
    generate_modules: bool = True
@analysis_router.post("/sessions/{session_id}/generate-learning-unit")
async def generate_learning_unit_endpoint(session_id: str, request: GenerateLearningUnitRequest = None):
    """
    Turn the vocabulary of a session into a Learning Unit.

    Steps:
    1. Read the vocabulary stored on the session.
    2. Call ``create_learning_unit`` from ``vocab_learn_bridge``.
    3. Unless ``generate_modules`` is false, call ``generate_learning_modules``;
       a failure there is logged and reported under ``generation`` instead of
       failing the request.

    Returns the dict produced by ``create_learning_unit``, extended with a
    ``generation`` key when module generation was attempted.

    Raises:
        HTTPException: 404 unknown session; 400 empty vocabulary or a
            ValueError from the bridge; 501 when ``vocab_learn_bridge``
            cannot be imported; 502 on a RuntimeError from the bridge.
    """
    options = GenerateLearningUnitRequest() if request is None else request

    sessions = _get_sessions()
    if session_id not in sessions:
        raise HTTPException(status_code=404, detail="Session not found")
    session = sessions[session_id]

    vocabulary = session.get("vocabulary", [])
    if not vocabulary:
        raise HTTPException(status_code=400, detail="No vocabulary in this session")

    try:
        from vocab_learn_bridge import create_learning_unit, generate_learning_modules

        # Step 1: create the unit itself.
        unit = await create_learning_unit(
            session_name=session["name"],
            vocabulary=vocabulary,
            grade=options.grade,
        )

        # Step 2: module generation is optional and never fails the request.
        if not options.generate_modules:
            return unit
        try:
            unit["generation"] = await generate_learning_modules(
                unit_id=unit["unit_id"],
                analysis_path=unit["analysis_path"],
            )
        except Exception as e:
            logger.warning(f"Module generation failed (unit created): {e}")
            unit["generation"] = {"status": "error", "reason": str(e)}
        return unit
    except ImportError:
        raise HTTPException(status_code=501, detail="vocab_learn_bridge module not available")
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except RuntimeError as e:
        raise HTTPException(status_code=502, detail=str(e))
# =============================================================================
# Include compare_ocr_methods & analyze_grid from companion module
# =============================================================================
# Imported at the end of the file, hence the E402 suppression (presumably so
# the companion module can import from this one without a cycle -- TODO confirm).
from vocab_worksheet_compare_api import compare_router # noqa: E402
# Mount the companion routes on this module's router.
analysis_router.include_router(compare_router)