feat(control-pipeline): add batch-dedup endpoint + source_citation JSONB migration
- Add POST /v1/canonical/generate/batch-dedup endpoint for Pass 0b atomic controls deduplication (Phase 1: intra-group, Phase 2: cross-group) - source_citation column migrated from TEXT to JSONB (5,459 rows converted) - migrate_jsonb.py script added for generation_metadata conversion Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1336,6 +1336,72 @@ async def get_repair_backfill_status(backfill_id: str):
|
|||||||
return status
|
return status
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# BATCH DEDUP
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
class BatchDedupRequest(BaseModel):
|
||||||
|
dry_run: bool = True
|
||||||
|
hint_filter: Optional[str] = None # Only process groups matching this hint prefix
|
||||||
|
|
||||||
|
|
||||||
|
_batch_dedup_status: dict = {}
|
||||||
|
|
||||||
|
|
||||||
|
async def _run_batch_dedup(req: BatchDedupRequest, dedup_id: str):
|
||||||
|
"""Run batch dedup in background."""
|
||||||
|
from services.batch_dedup_runner import BatchDedupRunner
|
||||||
|
|
||||||
|
db = SessionLocal()
|
||||||
|
try:
|
||||||
|
runner = BatchDedupRunner(db)
|
||||||
|
_batch_dedup_status[dedup_id] = {"status": "running", "phase": "starting"}
|
||||||
|
|
||||||
|
stats = await runner.run(dry_run=req.dry_run, hint_filter=req.hint_filter)
|
||||||
|
|
||||||
|
_batch_dedup_status[dedup_id] = {
|
||||||
|
"status": "completed",
|
||||||
|
"dry_run": req.dry_run,
|
||||||
|
**stats,
|
||||||
|
}
|
||||||
|
logger.info("BatchDedup %s completed: %s", dedup_id, stats)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("BatchDedup %s failed: %s", dedup_id, e)
|
||||||
|
_batch_dedup_status[dedup_id] = {"status": "failed", "error": str(e)}
|
||||||
|
finally:
|
||||||
|
db.close()
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/generate/batch-dedup")
|
||||||
|
async def start_batch_dedup(req: BatchDedupRequest):
|
||||||
|
"""Run batch deduplication on Pass 0b atomic controls.
|
||||||
|
|
||||||
|
Phase 1: Intra-group dedup (same merge_group_hint → pick best, link rest)
|
||||||
|
Phase 2: Cross-group dedup (embed masters, search Qdrant for similar)
|
||||||
|
|
||||||
|
Default is dry_run=True (preview only, no DB changes).
|
||||||
|
"""
|
||||||
|
import uuid
|
||||||
|
dedup_id = str(uuid.uuid4())[:8]
|
||||||
|
_batch_dedup_status[dedup_id] = {"status": "starting"}
|
||||||
|
asyncio.create_task(_run_batch_dedup(req, dedup_id))
|
||||||
|
return {
|
||||||
|
"status": "running",
|
||||||
|
"dedup_id": dedup_id,
|
||||||
|
"message": f"BatchDedup started. Poll /generate/batch-dedup-status/{dedup_id}",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/generate/batch-dedup-status/{dedup_id}")
|
||||||
|
async def get_batch_dedup_status(dedup_id: str):
|
||||||
|
"""Get status of a batch dedup job."""
|
||||||
|
status = _batch_dedup_status.get(dedup_id)
|
||||||
|
if not status:
|
||||||
|
raise HTTPException(status_code=404, detail="BatchDedup job not found")
|
||||||
|
return status
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
# ANCHOR BACKFILL
|
# ANCHOR BACKFILL
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
|
|||||||
Reference in New Issue
Block a user