diff --git a/control-pipeline/api/control_generator_routes.py b/control-pipeline/api/control_generator_routes.py index 37b9ae8..1134b62 100644 --- a/control-pipeline/api/control_generator_routes.py +++ b/control-pipeline/api/control_generator_routes.py @@ -2233,3 +2233,61 @@ async def get_harmonization_recheck_status(job_id: str): if not status: raise HTTPException(status_code=404, detail="Harmonization recheck job not found") return status + + +# ============================================================================= +# BATCH API: STATUS + RESULT PROCESSING +# ============================================================================= + +@router.get("/generate/batch-api-status/{batch_id}") +async def get_batch_api_status(batch_id: str): + """Check status of an Anthropic Batch API job.""" + from services.decomposition_pass import check_batch_status + status = await check_batch_status(batch_id) + return status + + +class ProcessBatchRequest(BaseModel): + batch_id: str + pass_type: str = "0b" # "0a" or "0b" + + +_batch_process_status: dict = {} + + +async def _run_batch_processing(req: ProcessBatchRequest, job_id: str): + """Process batch results in background.""" + from services.decomposition_pass import DecompositionPass + db = SessionLocal() + try: + _batch_process_status[job_id] = {"status": "running", "phase": "processing"} + dp = DecompositionPass(db) + result = await dp.process_batch_results(req.batch_id, req.pass_type) + _batch_process_status[job_id] = {"status": "completed", **result} + except Exception as e: + logger.error("Batch processing %s failed: %s", job_id, e) + _batch_process_status[job_id] = {"status": "failed", "error": str(e)} + finally: + db.close() + + +@router.post("/generate/process-batch") +async def process_batch(req: ProcessBatchRequest): + """Process results from a completed Anthropic Batch API job.""" + import uuid + job_id = str(uuid.uuid4())[:8] + _batch_process_status[job_id] = {"status": "starting"} + asyncio.create_task(_run_batch_processing(req, job_id)) + return { + "status": "running", "job_id": job_id, + "message": f"Batch processing started. Poll /generate/process-batch-status/{job_id}", + } + + +@router.get("/generate/process-batch-status/{job_id}") +async def get_batch_process_status(job_id: str): + """Get status of batch result processing.""" + status = _batch_process_status.get(job_id) + if not status: + raise HTTPException(status_code=404, detail="Batch processing job not found") + return status