Files
breakpilot-lehrer/klausur-service/backend/worksheet/editor_reconstruct.py
Benjamin Admin eecb5472dd
Some checks failed
CI / go-lint (push) Has been skipped
CI / python-lint (push) Has been skipped
CI / nodejs-lint (push) Has been skipped
CI / test-go-school (push) Successful in 1m7s
CI / test-go-edu-search (push) Successful in 46s
CI / test-python-klausur (push) Failing after 2m32s
CI / test-python-agent-core (push) Successful in 33s
CI / test-nodejs-website (push) Successful in 34s
Fix: Update all old-style imports inside packages to new paths
65 files in klausur-service packages + 3 in backend-lehrer packages
had stale imports referencing deleted shim modules.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-26 00:19:13 +02:00

256 lines
8.3 KiB
Python

"""
Worksheet Editor Reconstruct — Document reconstruction from vocab sessions.
"""
import io
import uuid
import base64
import logging
from typing import List, Dict
import numpy as np
from .editor_models import (
ReconstructRequest,
ReconstructResponse,
)
logger = logging.getLogger(__name__)
def _index_vocab_terms(page_vocab: List[Dict]) -> list:
    """Pair each vocabulary entry with its usable, lower-cased search terms.

    Only non-blank string values of the ``english`` / ``german`` keys are kept.
    An empty term must never be searched for: ``"" in text`` is always true in
    Python, so a vocabulary entry with a missing translation would otherwise
    match every OCR region on the page.

    Returns a list of ``(entry, terms)`` tuples in the original entry order;
    entries without any usable term are dropped.
    """
    indexed = []
    for entry in page_vocab:
        terms = [
            term.lower()
            for term in (entry.get("english"), entry.get("german"))
            if isinstance(term, str) and term.strip()
        ]
        if terms:
            indexed.append((entry, terms))
    return indexed


async def reconstruct_document_logic(request: ReconstructRequest) -> ReconstructResponse:
    """
    Reconstruct a document from a vocab session into Fabric.js canvas format.

    This function:
    1. Loads the original PDF from the vocab session
    2. Renders the requested page to an image and runs OCR with positions
    3. Creates Fabric.js canvas JSON with positioned text elements
    4. Tags text elements that contain a vocabulary term of that page
    5. Optionally crops detected image regions and embeds them as data URLs

    Args:
        request: Carries ``session_id``, the 1-based ``page_number`` and the
            ``include_images`` flag read by this function.

    Returns:
        ReconstructResponse with the serialized canvas, the page size, the
        number of canvas elements and the number of text regions that were
        matched to a vocabulary entry.

    Raises:
        HTTPException: 404 if the session is unknown; 400 if the session has
            no PDF data or the page number is out of range; 500 if the page
            could not be rendered to an image.
    """
    import json

    # Imported lazily, as in the original module layout.
    from fastapi import HTTPException
    from vocab.worksheet.api import _sessions, convert_pdf_page_to_image

    # --- Validate session and page -------------------------------------
    if request.session_id not in _sessions:
        raise HTTPException(status_code=404, detail=f"Session {request.session_id} not found")
    session = _sessions[request.session_id]

    if not session.get("pdf_data"):
        raise HTTPException(status_code=400, detail="Session has no PDF data")
    pdf_data = session["pdf_data"]

    page_count = session.get("pdf_page_count", 1)
    if request.page_number < 1 or request.page_number > page_count:
        raise HTTPException(
            status_code=400,
            detail=f"Page {request.page_number} not found. PDF has {page_count} pages."
        )

    vocabulary = session.get("vocabulary", [])
    page_vocab = [v for v in vocabulary if v.get("source_page") == request.page_number]
    logger.info("Reconstructing page %s from session %s", request.page_number, request.session_id)
    logger.info("Found %d vocabulary items for this page", len(page_vocab))

    # --- Render page and run OCR ---------------------------------------
    image_bytes = await convert_pdf_page_to_image(pdf_data, request.page_number)
    if not image_bytes:
        raise HTTPException(status_code=500, detail="Failed to convert PDF page to image")

    from PIL import Image
    img = Image.open(io.BytesIO(image_bytes))
    img_width, img_height = img.size

    from hybrid_vocab_extractor import run_paddle_ocr
    ocr_regions, _raw_text = run_paddle_ocr(image_bytes)
    logger.info("OCR found %d text regions", len(ocr_regions))

    # Target canvas size in pixels; OCR coordinates are scaled into this box.
    A4_WIDTH = 794
    A4_HEIGHT = 1123
    scale_x = A4_WIDTH / img_width
    scale_y = A4_HEIGHT / img_height

    # 1. White, non-interactive background rectangle
    fabric_objects = [{
        "type": "rect", "left": 0, "top": 0,
        "width": A4_WIDTH, "height": A4_HEIGHT,
        "fill": "#ffffff", "selectable": False,
        "evented": False, "isBackground": True
    }]

    # 2. Order regions top-to-bottom, then left-to-right
    sorted_regions = sorted(ocr_regions, key=lambda r: (r.y1, r.x1))

    # Normalise vocabulary terms once instead of once per OCR region.
    vocab_index = _index_vocab_terms(page_vocab)
    header_band = img_height * 0.15

    # 3./4. Create one text object per region, flagging headers and vocabulary
    vocab_matched = 0
    for region in sorted_regions:
        region_height = region.y2 - region.y1
        # Header heuristic: tall text (> 30 source px) in the top 15% of the page.
        is_header = region.y1 < header_band and region_height > 30

        base_font_size = max(10, min(32, int(region_height * scale_y * 0.8)))
        if is_header:
            base_font_size = max(base_font_size, 24)

        # First vocabulary entry whose term occurs in the region text wins.
        region_text_lower = region.text.lower()
        vocab_match = None
        for entry, terms in vocab_index:
            if any(term in region_text_lower for term in terms):
                vocab_match = entry
                vocab_matched += 1
                break

        text_obj = {
            "type": "i-text",
            "id": f"text_{uuid.uuid4().hex[:8]}",
            "left": int(region.x1 * scale_x),
            "top": int(region.y1 * scale_y),
            "text": region.text,
            "fontFamily": "Arial",
            "fontSize": base_font_size,
            "fontWeight": "bold" if is_header else "normal",
            "fill": "#000000",
            "originX": "left", "originY": "top",
        }
        if vocab_match is not None:
            text_obj["isVocabulary"] = True
            text_obj["vocabularyId"] = vocab_match.get("id")
            text_obj["english"] = vocab_match.get("english")
            text_obj["german"] = vocab_match.get("german")
        fabric_objects.append(text_obj)

    # 5. If include_images, detect image regions and embed them as PNG data URLs
    if request.include_images:
        image_regions = await _detect_image_regions(image_bytes, ocr_regions, img_width, img_height)
        for img_region in image_regions:
            img_x1 = int(img_region["x1"])
            img_y1 = int(img_region["y1"])
            img_x2 = int(img_region["x2"])
            img_y2 = int(img_region["y2"])

            # Crop in source-image pixels; only the placement is scaled.
            cropped = img.crop((img_x1, img_y1, img_x2, img_y2))
            buffer = io.BytesIO()
            cropped.save(buffer, format='PNG')
            encoded = base64.b64encode(buffer.getvalue()).decode('utf-8')
            img_base64 = f"data:image/png;base64,{encoded}"

            fabric_objects.append({
                "type": "image",
                "id": f"img_{uuid.uuid4().hex[:8]}",
                "left": int(img_x1 * scale_x),
                "top": int(img_y1 * scale_y),
                "width": int((img_x2 - img_x1) * scale_x),
                "height": int((img_y2 - img_y1) * scale_y),
                "src": img_base64,
                "scaleX": 1, "scaleY": 1,
            })

    canvas_data = {
        "version": "6.0.0",
        "objects": fabric_objects,
        "background": "#ffffff"
    }
    return ReconstructResponse(
        canvas_json=json.dumps(canvas_data),
        page_width=A4_WIDTH,
        page_height=A4_HEIGHT,
        elements_count=len(fabric_objects),
        vocabulary_matched=vocab_matched,
        message=f"Reconstructed page {request.page_number} with {len(fabric_objects)} elements, "
                f"{vocab_matched} vocabulary items matched"
    )
async def _detect_image_regions(
    image_bytes: bytes,
    ocr_regions: list,
    img_width: int,
    img_height: int
) -> List[Dict]:
    """
    Detect image/graphic regions in the document.

    Approach:
    1. Mask out every OCR text box (padded by 5 px)
    2. Run Canny edge detection and drop edges inside the text mask
    3. Take the bounding boxes of the remaining external contours
    4. Keep boxes larger than 50x50 px, smaller than 90% of the page in
       both dimensions, and with grayscale variance above 500
    5. Drop boxes that overlap an already accepted, larger box

    Args:
        image_bytes: Encoded page image.
        ocr_regions: Objects exposing ``x1``, ``y1``, ``x2``, ``y2``
            in pixel coordinates of the page image.
        img_width: Page image width in pixels.
        img_height: Page image height in pixels.

    Returns:
        At most 10 dicts with keys ``x1``, ``y1``, ``x2``, ``y2``, largest
        area first. Detection is best-effort: any failure inside the
        detection step is logged as a warning and yields an empty list.
    """
    from PIL import Image
    import cv2
    try:
        img = Image.open(io.BytesIO(image_bytes))
        img_array = np.array(img.convert('L'))

        # True = "not text"; text boxes are cleared to False below.
        text_mask = np.ones_like(img_array, dtype=bool)
        for region in ocr_regions:
            # Coerce to int: OCR coordinates may arrive as floats, and NumPy
            # rejects non-integer slice bounds (which would otherwise abort
            # the whole detection through the except branch below).
            x1 = max(0, int(region.x1) - 5)
            y1 = max(0, int(region.y1) - 5)
            x2 = min(img_width, int(region.x2) + 5)
            y2 = min(img_height, int(region.y2) + 5)
            text_mask[y1:y2, x1:x2] = False

        edges = cv2.Canny(img_array, 50, 150)
        edges[~text_mask] = 0  # ignore edges produced by text glyphs
        contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        image_regions = []
        for contour in contours:
            x, y, w, h = cv2.boundingRect(contour)
            if w <= 50 or h <= 50:
                continue  # too small to be a picture
            if w >= img_width * 0.9 or h >= img_height * 0.9:
                continue  # spans almost the whole page, e.g. a page border
            # Low-variance areas are treated as flat background, not images.
            if np.var(img_array[y:y + h, x:x + w]) > 500:
                image_regions.append({
                    "x1": x, "y1": y,
                    "x2": x + w, "y2": y + h
                })

        # Greedy de-duplication: visit boxes largest-first and keep a box only
        # if it does not intersect one that was already kept.
        by_area_desc = sorted(
            image_regions,
            key=lambda r: (r["x2"] - r["x1"]) * (r["y2"] - r["y1"]),
            reverse=True,
        )
        filtered_regions = []
        for region in by_area_desc:
            overlaps = any(
                not (region["x2"] < kept["x1"] or region["x1"] > kept["x2"] or
                     region["y2"] < kept["y1"] or region["y1"] > kept["y2"])
                for kept in filtered_regions
            )
            if not overlaps:
                filtered_regions.append(region)

        logger.info(f"Detected {len(filtered_regions)} image regions")
        return filtered_regions[:10]
    except Exception as e:
        # Best-effort step: the caller can still build a text-only canvas.
        logger.warning(f"Image region detection failed: {e}")
        return []