fix: add db.rollback() to batch dedup error handlers

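After a SQL error, the transaction of a SQLAlchemy session is left in a
failed state. Until rollback() is called, all subsequent queries on the
same session fail with InFailedSqlTransaction. Added try/except with
rollback in _mark_duplicate, _mark_duplicate_to, _write_review, and the
cross-group pass, and added a rollback to the existing error handler of
the main phase1 loop. The three helper methods log, roll back, and
re-raise; the cross-group pass and the phase1 loop roll back and count
the error without re-raising.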

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-03-24 08:41:36 +01:00
parent 770f0b5ab0
commit 9282850138

View File

@@ -141,6 +141,10 @@ class BatchDedupRunner:
except Exception as e: except Exception as e:
logger.error("BatchDedup Phase 1 error on hint %s: %s", hint, e) logger.error("BatchDedup Phase 1 error on hint %s: %s", hint, e)
self.stats["errors"] += 1 self.stats["errors"] += 1
try:
self.db.rollback()
except Exception:
pass
logger.info( logger.info(
"BatchDedup Phase 1 done: %d masters, %d linked, %d review", "BatchDedup Phase 1 done: %d masters, %d linked, %d review",
@@ -372,26 +376,32 @@ class BatchDedupRunner:
# Simple check: different control UUID is enough # Simple check: different control UUID is enough
if match_score > LINK_THRESHOLD: if match_score > LINK_THRESHOLD:
# Mark the worse one as duplicate # Mark the worse one as duplicate
self.db.execute(text(""" try:
UPDATE canonical_controls self.db.execute(text("""
SET release_state = 'duplicate', merged_into_uuid = CAST(:master AS uuid) UPDATE canonical_controls
WHERE id = CAST(:dup AS uuid) SET release_state = 'duplicate', merged_into_uuid = CAST(:master AS uuid)
AND release_state != 'duplicate' WHERE id = CAST(:dup AS uuid)
"""), {"master": match_uuid, "dup": uuid}) AND release_state != 'duplicate'
"""), {"master": match_uuid, "dup": uuid})
self.db.execute(text(""" self.db.execute(text("""
INSERT INTO control_parent_links INSERT INTO control_parent_links
(control_uuid, parent_control_uuid, link_type, confidence) (control_uuid, parent_control_uuid, link_type, confidence)
VALUES (CAST(:cu AS uuid), CAST(:pu AS uuid), 'cross_regulation', :conf) VALUES (CAST(:cu AS uuid), CAST(:pu AS uuid), 'cross_regulation', :conf)
ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING
"""), {"cu": match_uuid, "pu": uuid, "conf": match_score}) """), {"cu": match_uuid, "pu": uuid, "conf": match_score})
# Transfer parent links # Transfer parent links
transferred = self._transfer_parent_links(match_uuid, uuid) transferred = self._transfer_parent_links(match_uuid, uuid)
self.stats["parent_links_transferred"] += transferred self.stats["parent_links_transferred"] += transferred
self.db.commit() self.db.commit()
cross_linked += 1 cross_linked += 1
except Exception as e:
logger.error("BatchDedup cross-group link error %s%s: %s",
uuid, match_uuid, e)
self.db.rollback()
self.stats["errors"] += 1
break # Only one cross-link per control break # Only one cross-link per control
elif match_score > REVIEW_THRESHOLD: elif match_score > REVIEW_THRESHOLD:
self._write_review( self._write_review(
@@ -474,43 +484,55 @@ class BatchDedupRunner:
async def _mark_duplicate(self, master: dict, candidate: dict, confidence: float): async def _mark_duplicate(self, master: dict, candidate: dict, confidence: float):
"""Mark candidate as duplicate of master, transfer parent links.""" """Mark candidate as duplicate of master, transfer parent links."""
self.db.execute(text(""" try:
UPDATE canonical_controls self.db.execute(text("""
SET release_state = 'duplicate', merged_into_uuid = CAST(:master AS uuid) UPDATE canonical_controls
WHERE id = CAST(:cand AS uuid) SET release_state = 'duplicate', merged_into_uuid = CAST(:master AS uuid)
"""), {"master": master["uuid"], "cand": candidate["uuid"]}) WHERE id = CAST(:cand AS uuid)
"""), {"master": master["uuid"], "cand": candidate["uuid"]})
self.db.execute(text(""" self.db.execute(text("""
INSERT INTO control_parent_links INSERT INTO control_parent_links
(control_uuid, parent_control_uuid, link_type, confidence) (control_uuid, parent_control_uuid, link_type, confidence)
VALUES (CAST(:master AS uuid), CAST(:cand_parent AS uuid), 'dedup_merge', :conf) VALUES (CAST(:master AS uuid), CAST(:cand_parent AS uuid), 'dedup_merge', :conf)
ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING
"""), {"master": master["uuid"], "cand_parent": candidate["uuid"], "conf": confidence}) """), {"master": master["uuid"], "cand_parent": candidate["uuid"], "conf": confidence})
transferred = self._transfer_parent_links(master["uuid"], candidate["uuid"]) transferred = self._transfer_parent_links(master["uuid"], candidate["uuid"])
self.stats["parent_links_transferred"] += transferred self.stats["parent_links_transferred"] += transferred
self.db.commit() self.db.commit()
except Exception as e:
logger.error("BatchDedup _mark_duplicate error %s%s: %s",
candidate["uuid"], master["uuid"], e)
self.db.rollback()
raise
async def _mark_duplicate_to(self, master_uuid: str, candidate: dict, confidence: float): async def _mark_duplicate_to(self, master_uuid: str, candidate: dict, confidence: float):
"""Mark candidate as duplicate of a Qdrant-matched master.""" """Mark candidate as duplicate of a Qdrant-matched master."""
self.db.execute(text(""" try:
UPDATE canonical_controls self.db.execute(text("""
SET release_state = 'duplicate', merged_into_uuid = CAST(:master AS uuid) UPDATE canonical_controls
WHERE id = CAST(:cand AS uuid) SET release_state = 'duplicate', merged_into_uuid = CAST(:master AS uuid)
"""), {"master": master_uuid, "cand": candidate["uuid"]}) WHERE id = CAST(:cand AS uuid)
"""), {"master": master_uuid, "cand": candidate["uuid"]})
self.db.execute(text(""" self.db.execute(text("""
INSERT INTO control_parent_links INSERT INTO control_parent_links
(control_uuid, parent_control_uuid, link_type, confidence) (control_uuid, parent_control_uuid, link_type, confidence)
VALUES (CAST(:master AS uuid), CAST(:cand_parent AS uuid), 'dedup_merge', :conf) VALUES (CAST(:master AS uuid), CAST(:cand_parent AS uuid), 'dedup_merge', :conf)
ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING
"""), {"master": master_uuid, "cand_parent": candidate["uuid"], "conf": confidence}) """), {"master": master_uuid, "cand_parent": candidate["uuid"], "conf": confidence})
transferred = self._transfer_parent_links(master_uuid, candidate["uuid"]) transferred = self._transfer_parent_links(master_uuid, candidate["uuid"])
self.stats["parent_links_transferred"] += transferred self.stats["parent_links_transferred"] += transferred
self.db.commit() self.db.commit()
except Exception as e:
logger.error("BatchDedup _mark_duplicate_to error %s%s: %s",
candidate["uuid"], master_uuid, e)
self.db.rollback()
raise
def _transfer_parent_links(self, master_uuid: str, duplicate_uuid: str) -> int: def _transfer_parent_links(self, master_uuid: str, duplicate_uuid: str) -> int:
"""Move existing parent links from duplicate to master.""" """Move existing parent links from duplicate to master."""
@@ -549,26 +571,31 @@ class BatchDedupRunner:
def _write_review(self, candidate: dict, matched_payload: dict, score: float): def _write_review(self, candidate: dict, matched_payload: dict, score: float):
"""Write a dedup review entry for borderline matches.""" """Write a dedup review entry for borderline matches."""
self.db.execute(text(""" try:
INSERT INTO control_dedup_reviews self.db.execute(text("""
(candidate_control_id, candidate_title, candidate_objective, INSERT INTO control_dedup_reviews
matched_control_uuid, matched_control_id, (candidate_control_id, candidate_title, candidate_objective,
similarity_score, dedup_stage, dedup_details) matched_control_uuid, matched_control_id,
VALUES (:ccid, :ct, :co, CAST(:mcu AS uuid), :mci, similarity_score, dedup_stage, dedup_details)
:ss, 'batch_dedup', :dd::jsonb) VALUES (:ccid, :ct, :co, CAST(:mcu AS uuid), :mci,
"""), { :ss, 'batch_dedup', :dd::jsonb)
"ccid": candidate["control_id"], """), {
"ct": candidate["title"], "ccid": candidate["control_id"],
"co": candidate.get("objective", ""), "ct": candidate["title"],
"mcu": matched_payload.get("control_uuid"), "co": candidate.get("objective", ""),
"mci": matched_payload.get("control_id"), "mcu": matched_payload.get("control_uuid"),
"ss": score, "mci": matched_payload.get("control_id"),
"dd": json.dumps({ "ss": score,
"merge_group_hint": candidate.get("merge_group_hint", ""), "dd": json.dumps({
"pattern_id": candidate.get("pattern_id"), "merge_group_hint": candidate.get("merge_group_hint", ""),
}), "pattern_id": candidate.get("pattern_id"),
}) }),
self.db.commit() })
self.db.commit()
except Exception as e:
logger.error("BatchDedup _write_review error: %s", e)
self.db.rollback()
raise
# ── Progress ───────────────────────────────────────────────────────── # ── Progress ─────────────────────────────────────────────────────────