From d93321275c479c2e7b3892265c75179e54d207f5 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Sun, 26 Apr 2026 09:36:47 +0200 Subject: [PATCH] feat(pipeline): add batch API status + result processing endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - GET /generate/batch-api-status/{batch_id} — check Anthropic batch status - POST /generate/process-batch — process completed batch results (background) - GET /generate/process-batch-status/{job_id} — poll processing progress Co-Authored-By: Claude Opus 4.6 (1M context) --- .../api/control_generator_routes.py | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/control-pipeline/api/control_generator_routes.py b/control-pipeline/api/control_generator_routes.py index 37b9ae8..1134b62 100644 --- a/control-pipeline/api/control_generator_routes.py +++ b/control-pipeline/api/control_generator_routes.py @@ -2233,3 +2233,61 @@ async def get_harmonization_recheck_status(job_id: str): if not status: raise HTTPException(status_code=404, detail="Harmonization recheck job not found") return status + + +# ============================================================================= +# BATCH API: STATUS + RESULT PROCESSING +# ============================================================================= + +@router.get("/generate/batch-api-status/{batch_id}") +async def get_batch_api_status(batch_id: str): + """Check status of an Anthropic Batch API job.""" + from services.decomposition_pass import check_batch_status + status = await check_batch_status(batch_id) + return status + + +class ProcessBatchRequest(BaseModel): + batch_id: str + pass_type: str = "0b" # "0a" or "0b" + + +_batch_process_status: dict = {} + + +async def _run_batch_processing(req: ProcessBatchRequest, job_id: str): + """Process batch results in background.""" + from services.decomposition_pass import DecompositionPass + db = SessionLocal() + try: + _batch_process_status[job_id] = {"status": "running", "phase": "processing"} + dp = DecompositionPass(db) + result = await dp.process_batch_results(req.batch_id, req.pass_type) + _batch_process_status[job_id] = {"status": "completed", **result} + except Exception as e: + logger.error("Batch processing %s failed: %s", job_id, e) + _batch_process_status[job_id] = {"status": "failed", "error": str(e)} + finally: + db.close() + + +@router.post("/generate/process-batch") +async def process_batch(req: ProcessBatchRequest): + """Process results from a completed Anthropic Batch API job.""" + import uuid + job_id = str(uuid.uuid4())[:8] + _batch_process_status[job_id] = {"status": "starting"} + asyncio.create_task(_run_batch_processing(req, job_id)) + return { + "status": "running", "job_id": job_id, + "message": f"Batch processing started. Poll /generate/process-batch-status/{job_id}", + } + + +@router.get("/generate/process-batch-status/{job_id}") +async def get_batch_process_status(job_id: str): + """Get status of batch result processing.""" + status = _batch_process_status.get(job_id) + if not status: + raise HTTPException(status_code=404, detail="Batch processing job not found") + return status