fix(pipeline): make dedup Phase 2 resilient — paginated, timeout, per-control error handling
- Paginated DB queries (100 rows/page) instead of loading all 166k rows
- Individual timeout (30s) per embedding + qdrant call
- Per-control try/except — one failure doesn't kill the job
- Sequential processing (no asyncio.gather) for stability
- Progress logging every 500 controls

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
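For context, the core pattern the commit applies is keyset pagination plus a per-item timeout and per-item try/except. The sketch below is illustrative only, with in-memory stand-ins (Row, fetch_page, process_one are hypothetical names, not the pipeline's API) for the real DB query, embedding, and Qdrant calls; the 100-row page size and 30s timeout mirror the values used in the diff.

    import asyncio
    from dataclasses import dataclass

    @dataclass
    class Row:
        id: str

    async def process_one(row: Row) -> None:
        """Stand-in for the real embed + Qdrant search + link step."""
        await asyncio.sleep(0)

    def fetch_page(all_rows: list[Row], after_id: str, limit: int) -> list[Row]:
        """Keyset page: rows with id greater than the cursor, ordered by id."""
        return [r for r in sorted(all_rows, key=lambda r: r.id) if r.id > after_id][:limit]

    async def run(all_rows: list[Row], page_size: int = 100, timeout_s: float = 30.0) -> int:
        errors = 0
        last_id = ""                      # keyset cursor, advances page by page
        while True:
            page = fetch_page(all_rows, after_id=last_id, limit=page_size)
            if not page:
                break
            last_id = page[-1].id
            for row in page:              # sequential, no asyncio.gather
                try:
                    # bound each external call so one hang cannot stall the whole job
                    await asyncio.wait_for(process_one(row), timeout=timeout_s)
                except asyncio.TimeoutError:
                    errors += 1           # count and move on
                except Exception:
                    errors += 1           # one failing row never kills the run
        return errors

    # asyncio.run(run([Row("c-001"), Row("c-002")]))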
@@ -330,112 +330,140 @@ class BatchDedupRunner:
     async def _run_cross_group_pass(self):
         """Phase 2: Find cross-group duplicates among surviving masters.
 
-        After Phase 1, ~52k masters remain. Many have similar semantics
-        despite different merge_group_hints (e.g. different German spellings).
-        This pass embeds all masters and finds near-duplicates via Qdrant.
+        Paginated DB queries + individual error handling per control.
+        Never loads all rows into memory at once.
         """
         logger.info("BatchDedup Phase 2: Cross-group pass starting...")
 
-        rows = self.db.execute(text("""
-            SELECT id::text, control_id, title,
-                   generation_metadata->>'merge_group_hint' as merge_group_hint
-            FROM canonical_controls
+        # Count total
+        total_row = self.db.execute(text("""
+            SELECT COUNT(*) FROM canonical_controls
             WHERE decomposition_method = 'pass0b'
               AND release_state != 'duplicate'
               AND release_state != 'deprecated'
-            ORDER BY control_id
-        """)).fetchall()
+        """)).fetchone()
+        total = total_row[0] if total_row else 0
 
-        self._progress_total = len(rows)
+        self._progress_total = total
         self._progress_count = 0
-        logger.info("BatchDedup Cross-group: %d masters to check", len(rows))
+        logger.info("BatchDedup Cross-group: %d masters to check", total)
         cross_linked = 0
         cross_review = 0
 
-        # Process in parallel batches for embedding + Qdrant search
-        PARALLEL_BATCH = 10
+        # Paginated processing — 100 rows per DB query
+        DB_PAGE = 100
+        last_control_id = ""
 
-        async def _embed_and_search(r):
-            """Embed one control and search Qdrant — safe for asyncio.gather."""
-            hint = r[3] or ""
-            parts = hint.split(":", 2)
-            action = parts[0] if len(parts) > 0 else ""
-            obj = parts[1] if len(parts) > 1 else ""
-            canonical = canonicalize_text(action, obj, r[2])
-            embedding = await get_embedding(canonical)
-            if not embedding:
-                return None
-            results = await qdrant_search_cross_regulation(
-                embedding, top_k=5, collection=self.collection,
-            )
-            return (r, results)
-
-        for batch_start in range(0, len(rows), PARALLEL_BATCH):
-            batch = rows[batch_start:batch_start + PARALLEL_BATCH]
-            tasks = [_embed_and_search(r) for r in batch]
-            results_batch = await asyncio.gather(*tasks, return_exceptions=True)
-
-            for res in results_batch:
-                if res is None or isinstance(res, Exception):
-                    if isinstance(res, Exception):
-                        logger.error("BatchDedup embed/search error: %s", res)
-                        self.stats["errors"] += 1
-                    continue
-
-                r, results = res
-                ctrl_uuid = r[0]
-                hint = r[3] or ""
-
-                if not results:
-                    continue
-
-                for match in results:
-                    match_score = match.get("score", 0.0)
-                    match_payload = match.get("payload", {})
-                    match_uuid = match_payload.get("control_uuid", "")
-
-                    if match_uuid == ctrl_uuid:
-                        continue
-
-                    if match_score > LINK_THRESHOLD:
-                        try:
-                            self.db.execute(text("""
-                                UPDATE canonical_controls
-                                SET release_state = 'duplicate', merged_into_uuid = CAST(:master AS uuid)
-                                WHERE id = CAST(:dup AS uuid)
-                                  AND release_state != 'duplicate'
-                            """), {"master": match_uuid, "dup": ctrl_uuid})
-
-                            self.db.execute(text("""
-                                INSERT INTO control_parent_links
-                                    (control_uuid, parent_control_uuid, link_type, confidence)
-                                VALUES (CAST(:cu AS uuid), CAST(:pu AS uuid), 'cross_regulation', :conf)
-                                ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING
-                            """), {"cu": match_uuid, "pu": ctrl_uuid, "conf": match_score})
-
-                            transferred = self._transfer_parent_links(match_uuid, ctrl_uuid)
-                            self.stats["parent_links_transferred"] += transferred
-
-                            self.db.commit()
-                            cross_linked += 1
-                        except Exception as e:
-                            logger.error("BatchDedup cross-group link error %s→%s: %s",
-                                         ctrl_uuid, match_uuid, e)
-                            self.db.rollback()
-                            self.stats["errors"] += 1
-                        break
-                    elif match_score > REVIEW_THRESHOLD:
-                        self._write_review(
-                            {"control_id": r[1], "title": r[2], "objective": "",
-                             "merge_group_hint": hint, "pattern_id": None},
-                            match_payload, match_score,
-                        )
-                        cross_review += 1
-                        break
-
-            processed = min(batch_start + PARALLEL_BATCH, len(rows))
-            self._progress_count = processed
-            if processed % 500 < PARALLEL_BATCH:
-                logger.info("BatchDedup Cross-group: %d/%d checked, %d linked, %d review",
-                            processed, len(rows), cross_linked, cross_review)
+        while True:
+            rows = self.db.execute(text("""
+                SELECT id::text, control_id, title,
+                       generation_metadata->>'merge_group_hint' as merge_group_hint
+                FROM canonical_controls
+                WHERE decomposition_method = 'pass0b'
+                  AND release_state != 'duplicate'
+                  AND release_state != 'deprecated'
+                  AND control_id > :last_id
+                ORDER BY control_id
+                LIMIT :page_size
+            """), {"last_id": last_control_id, "page_size": DB_PAGE}).fetchall()
+
+            if not rows:
+                break
+
+            last_control_id = rows[-1][1]
+
+            # Process each control individually (no asyncio.gather — more stable)
+            for r in rows:
+                try:
+                    hint = r[3] or ""
+                    parts = hint.split(":", 2)
+                    action = parts[0] if len(parts) > 0 else ""
+                    obj = parts[1] if len(parts) > 1 else ""
+                    canonical = canonicalize_text(action, obj, r[2])
+
+                    # Timeout per embedding call
+                    try:
+                        embedding = await asyncio.wait_for(
+                            get_embedding(canonical), timeout=30.0
+                        )
+                    except asyncio.TimeoutError:
+                        self.stats["errors"] += 1
+                        continue
+
+                    if not embedding:
+                        continue
+
+                    try:
+                        results = await asyncio.wait_for(
+                            qdrant_search_cross_regulation(
+                                embedding, top_k=5, collection=self.collection,
+                            ), timeout=30.0
+                        )
+                    except asyncio.TimeoutError:
+                        self.stats["errors"] += 1
+                        continue
+
+                    ctrl_uuid = r[0]
+
+                    for match in (results or []):
+                        match_score = match.get("score", 0.0)
+                        match_payload = match.get("payload", {})
+                        match_uuid = match_payload.get("control_uuid", "")
+
+                        if match_uuid == ctrl_uuid:
+                            continue
+
+                        if match_score > LINK_THRESHOLD:
+                            try:
+                                self.db.execute(text("""
+                                    UPDATE canonical_controls
+                                    SET release_state = 'duplicate', merged_into_uuid = CAST(:master AS uuid)
+                                    WHERE id = CAST(:dup AS uuid)
+                                      AND release_state != 'duplicate'
+                                """), {"master": match_uuid, "dup": ctrl_uuid})
+
+                                self.db.execute(text("""
+                                    INSERT INTO control_parent_links
+                                        (control_uuid, parent_control_uuid, link_type, confidence)
+                                    VALUES (CAST(:cu AS uuid), CAST(:pu AS uuid), 'cross_regulation', :conf)
+                                    ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING
+                                """), {"cu": match_uuid, "pu": ctrl_uuid, "conf": match_score})
+
+                                transferred = self._transfer_parent_links(match_uuid, ctrl_uuid)
+                                self.stats["parent_links_transferred"] += transferred
+                                self.db.commit()
+                                cross_linked += 1
+                            except Exception as e:
+                                logger.error("BatchDedup cross-group link error %s→%s: %s",
+                                             ctrl_uuid, match_uuid, e)
+                                try:
+                                    self.db.rollback()
+                                except Exception:
+                                    pass
+                                self.stats["errors"] += 1
+                            break
+                        elif match_score > REVIEW_THRESHOLD:
+                            self._write_review(
+                                {"control_id": r[1], "title": r[2], "objective": "",
+                                 "merge_group_hint": hint, "pattern_id": None},
+                                match_payload, match_score,
+                            )
+                            cross_review += 1
+                            break
+
+                except Exception as e:
+                    logger.error("BatchDedup cross-group control %s error: %s", r[1], e)
+                    self.stats["errors"] += 1
+                    try:
+                        self.db.rollback()
+                    except Exception:
+                        pass
+
+                self._progress_count += 1
+
+            # Log progress every page
+            processed = self._progress_count
+            if processed % 500 < DB_PAGE:
+                logger.info("BatchDedup Cross-group: %d/%d checked, %d linked, %d review",
+                            processed, total, cross_linked, cross_review)