feat(pipeline): add batch API status + result processing endpoints

- GET /generate/batch-api-status/{batch_id} — check Anthropic batch status
- POST /generate/process-batch — process completed batch results (background)
- GET /generate/process-batch-status/{job_id} — poll processing progress

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-04-26 09:36:47 +02:00
parent 629b9d9ca5
commit d93321275c

View File

@@ -2233,3 +2233,61 @@ async def get_harmonization_recheck_status(job_id: str):
if not status:
raise HTTPException(status_code=404, detail="Harmonization recheck job not found")
return status
# =============================================================================
# BATCH API: STATUS + RESULT PROCESSING
# =============================================================================
@router.get("/generate/batch-api-status/{batch_id}")
async def get_batch_api_status(batch_id: str):
"""Check status of an Anthropic Batch API job."""
from services.decomposition_pass import check_batch_status
status = await check_batch_status(batch_id)
return status
class ProcessBatchRequest(BaseModel):
batch_id: str
pass_type: str = "0b" # "0a" or "0b"
_batch_process_status: dict = {}
async def _run_batch_processing(req: ProcessBatchRequest, job_id: str):
"""Process batch results in background."""
from services.decomposition_pass import DecompositionPass
db = SessionLocal()
try:
_batch_process_status[job_id] = {"status": "running", "phase": "processing"}
dp = DecompositionPass(db)
result = await dp.process_batch_results(req.batch_id, req.pass_type)
_batch_process_status[job_id] = {"status": "completed", **result}
except Exception as e:
logger.error("Batch processing %s failed: %s", job_id, e)
_batch_process_status[job_id] = {"status": "failed", "error": str(e)}
finally:
db.close()
@router.post("/generate/process-batch")
async def process_batch(req: ProcessBatchRequest):
"""Process results from a completed Anthropic Batch API job."""
import uuid
job_id = str(uuid.uuid4())[:8]
_batch_process_status[job_id] = {"status": "starting"}
asyncio.create_task(_run_batch_processing(req, job_id))
return {
"status": "running", "job_id": job_id,
"message": f"Batch processing started. Poll /generate/process-batch-status/{job_id}",
}
@router.get("/generate/process-batch-status/{job_id}")
async def get_batch_process_status(job_id: str):
"""Get status of batch result processing."""
status = _batch_process_status.get(job_id)
if not status:
raise HTTPException(status_code=404, detail="Batch processing job not found")
return status