from pathlib import Path
from typing import List
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import HTMLResponse
import shutil
from ai_processor import (
dummy_process_scan,
describe_scan_with_ai,
analyze_scan_structure_with_ai,
build_clean_html_from_analysis,
remove_handwriting_from_scan, # B: Handschriftentfernung (muss in ai_processor.py existieren)
)
app = FastAPI()
BASE_DIR = Path.home() / "Arbeitsblaetter"
EINGANG_DIR = BASE_DIR / "Eingang"
BEREINIGT_DIR = BASE_DIR / "Bereinigt"
EDITIERBAR_DIR = BASE_DIR / "Editierbar"
NEU_GENERIERT_DIR = BASE_DIR / "Neu_generiert"
VALID_SUFFIXES = {".jpg", ".jpeg", ".png", ".pdf", ".JPG", ".JPEG", ".PNG", ".PDF"}
for d in [EINGANG_DIR, BEREINIGT_DIR, EDITIERBAR_DIR, NEU_GENERIERT_DIR]:
d.mkdir(parents=True, exist_ok=True)
def is_valid_input_file(path: Path) -> bool:
return path.is_file() and not path.name.startswith(".") and path.suffix in VALID_SUFFIXES
@app.get("/")
def home():
return {
"status": "OK",
"message": "Deine lokale App läuft!",
"base_dir": str(BASE_DIR),
}
# --- Upload-Bereich ---
@app.post("/upload-scan")
async def upload_scan(file: UploadFile = File(...)):
"""
Einfache Variante: eine einzelne Datei hochladen.
"""
target_path = EINGANG_DIR / file.filename
with target_path.open("wb") as buffer:
shutil.copyfileobj(file.file, buffer)
return {
"status": "OK",
"message": "Scan gespeichert",
"saved_as": str(target_path),
}
@app.get("/upload-form", response_class=HTMLResponse)
def upload_form():
"""
Einfache HTML-Seite für Upload mehrerer Dateien.
Kann z.B. vom Handy unter http://DEINE-IP:8000/upload-form aufgerufen werden.
"""
return """
Arbeitsblätter hochladen
Arbeitsblätter hochladen
Wähle ein oder mehrere eingescannt Arbeitsblätter (JPG/PNG/PDF) aus und lade sie hoch.
"""
@app.post("/upload-multi")
async def upload_multi(files: List[UploadFile] = File(...)):
"""
Mehrere Dateien auf einmal hochladen.
Alle Dateien werden im Eingang-Ordner gespeichert.
"""
saved = []
for file in files:
target_path = EINGANG_DIR / file.filename
with target_path.open("wb") as buffer:
shutil.copyfileobj(file.file, buffer)
saved.append(str(target_path))
return {
"status": "OK",
"message": "Dateien gespeichert",
"saved_as": saved,
}
# --- Dateiliste ---
@app.get("/eingang-dateien")
def list_eingang_files():
files = [f.name for f in EINGANG_DIR.iterdir() if is_valid_input_file(f)]
return {"eingang": files}
# --- B: Dummy- und Handschrift-Verarbeitung ---
@app.post("/process-all")
def process_all_scans():
processed = []
skipped = []
for f in EINGANG_DIR.iterdir():
if is_valid_input_file(f):
result_path = dummy_process_scan(f)
processed.append(result_path.name)
else:
skipped.append(f.name)
return {
"status": "OK",
"message": "Dummy-Verarbeitung abgeschlossen",
"processed_files": processed,
"skipped": skipped,
}
@app.post("/remove-handwriting-all")
def remove_handwriting_all():
"""
Entfernt bei allen geeigneten Bilddateien im Eingang-Ordner möglichst die Handschrift
und legt bereinigte Bilder im Bereinigt-Ordner ab.
"""
cleaned = []
errors = []
skipped = []
for f in EINGANG_DIR.iterdir():
if not is_valid_input_file(f):
skipped.append(f.name)
continue
# Nur JPG/PNG für die Bild-Handschriftentfernung
if f.suffix.lower() not in {".jpg", ".jpeg", ".png"}:
skipped.append(f.name)
continue
try:
out_path = remove_handwriting_from_scan(f)
cleaned.append(out_path.name)
except Exception as e:
errors.append({"file": f.name, "error": str(e)})
status = "OK"
message = "Bereinigte Bilder erzeugt."
if errors and not cleaned:
status = "ERROR"
message = "Keine bereinigten Bilder erzeugt, nur Fehler."
elif errors and cleaned:
status = "PARTIAL"
message = "Einige bereinigte Bilder erzeugt, aber auch Fehler."
return {
"status": status,
"message": message,
"cleaned": cleaned,
"errors": errors,
"skipped": skipped,
}
# --- Beschreiben / Analysieren / Clean-HTML ---
@app.post("/describe-all")
def describe_all_scans():
described = []
errors = []
skipped = []
for f in EINGANG_DIR.iterdir():
if not is_valid_input_file(f):
skipped.append(f.name)
continue
try:
out_path = describe_scan_with_ai(f)
described.append(out_path.name)
except Exception as e:
errors.append({"file": f.name, "error": str(e)})
status = "OK"
message = "Beschreibungen erstellt"
if errors and not described:
status = "ERROR"
message = "Keine Beschreibungen erstellt, nur Fehler."
elif errors and described:
status = "PARTIAL"
message = "Einige Beschreibungen erstellt, aber auch Fehler."
return {
"status": status,
"message": message,
"described": described,
"errors": errors,
"skipped": skipped,
}
@app.post("/analyze-all")
def analyze_all_scans():
analyzed = []
errors = []
skipped = []
for f in EINGANG_DIR.iterdir():
if not is_valid_input_file(f):
skipped.append(f.name)
continue
try:
out_path = analyze_scan_structure_with_ai(f)
analyzed.append(out_path.name)
except Exception as e:
errors.append({"file": f.name, "error": str(e)})
status = "OK"
message = "Analysen erstellt"
if errors and not analyzed:
status = "ERROR"
message = "Keine Analysen erstellt, nur Fehler."
elif errors and analyzed:
status = "PARTIAL"
message = "Einige Analysen erstellt, aber auch Fehler."
return {
"status": status,
"message": message,
"analyzed": analyzed,
"errors": errors,
"skipped": skipped,
}
@app.post("/generate-clean")
def generate_clean_worksheets():
"""
Nimmt alle *_analyse.json-Dateien und erzeugt *_clean.html-Arbeitsblätter.
"""
generated = []
errors = []
for f in BEREINIGT_DIR.iterdir():
if f.is_file() and f.suffix == ".json" and f.name.endswith("_analyse.json"):
try:
out_path = build_clean_html_from_analysis(f)
generated.append(out_path.name)
except Exception as e:
errors.append({"file": f.name, "error": str(e)})
status = "OK"
message = "Clean-HTML-Arbeitsblätter erzeugt"
if errors and not generated:
status = "ERROR"
message = "Keine HTML-Arbeitsblätter erzeugt, nur Fehler."
elif errors and generated:
status = "PARTIAL"
message = "Einige HTML-Arbeitsblätter erzeugt, aber auch Fehler."
return {
"status": status,
"message": message,
"generated": generated,
"errors": errors,
}
@app.get("/bereinigt-dateien")
def list_bereinigt_files():
files = [f.name for f in BEREINIGT_DIR.iterdir() if f.is_file()]
return {"bereinigt": files}