From 642a8587b5ce428b7ffcf5651016816fda41caeb Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Wed, 22 Apr 2026 08:44:31 +0200 Subject: [PATCH] feat(control-pipeline): add batch-dedup endpoint + source_citation JSONB migration - Add POST /v1/canonical/generate/batch-dedup endpoint for Pass 0b atomic controls deduplication (Phase 1: intra-group, Phase 2: cross-group) - source_citation column migrated from TEXT to JSONB (5,459 rows converted) - migrate_jsonb.py script added for generation_metadata conversion Co-Authored-By: Claude Opus 4.6 (1M context) --- .../api/control_generator_routes.py | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/control-pipeline/api/control_generator_routes.py b/control-pipeline/api/control_generator_routes.py index 2f2cfe8..7692303 100644 --- a/control-pipeline/api/control_generator_routes.py +++ b/control-pipeline/api/control_generator_routes.py @@ -1336,6 +1336,72 @@ async def get_repair_backfill_status(backfill_id: str): return status +# ============================================================================= +# BATCH DEDUP +# ============================================================================= + +class BatchDedupRequest(BaseModel): + dry_run: bool = True + hint_filter: Optional[str] = None # Only process groups matching this hint prefix + + +_batch_dedup_status: dict = {} + + +async def _run_batch_dedup(req: BatchDedupRequest, dedup_id: str): + """Run batch dedup in background.""" + from services.batch_dedup_runner import BatchDedupRunner + + db = SessionLocal() + try: + runner = BatchDedupRunner(db) + _batch_dedup_status[dedup_id] = {"status": "running", "phase": "starting"} + + stats = await runner.run(dry_run=req.dry_run, hint_filter=req.hint_filter) + + _batch_dedup_status[dedup_id] = { + "status": "completed", + "dry_run": req.dry_run, + **stats, + } + logger.info("BatchDedup %s completed: %s", dedup_id, stats) + + except Exception as e: + logger.error("BatchDedup %s failed: %s", dedup_id, e) + _batch_dedup_status[dedup_id] = {"status": "failed", "error": str(e)} + finally: + db.close() + + +@router.post("/generate/batch-dedup") +async def start_batch_dedup(req: BatchDedupRequest): + """Run batch deduplication on Pass 0b atomic controls. + + Phase 1: Intra-group dedup (same merge_group_hint → pick best, link rest) + Phase 2: Cross-group dedup (embed masters, search Qdrant for similar) + + Default is dry_run=True (preview only, no DB changes). + """ + import uuid + dedup_id = str(uuid.uuid4())[:8] + _batch_dedup_status[dedup_id] = {"status": "starting"} + asyncio.create_task(_run_batch_dedup(req, dedup_id)) + return { + "status": "running", + "dedup_id": dedup_id, + "message": f"BatchDedup started. Poll /generate/batch-dedup-status/{dedup_id}", + } + + +@router.get("/generate/batch-dedup-status/{dedup_id}") +async def get_batch_dedup_status(dedup_id: str): + """Get status of a batch dedup job.""" + status = _batch_dedup_status.get(dedup_id) + if not status: + raise HTTPException(status_code=404, detail="BatchDedup job not found") + return status + + # ============================================================================= # ANCHOR BACKFILL # =============================================================================