"""Batch Dedup Runner — Orchestrates deduplication of ~85k atomic controls.

Reduces Pass 0b controls from ~85k to ~18-25k unique Master Controls by:
  1. Intra-Pattern Dedup: Group by pattern_id + merge_group_hint, pick best master
  2. Cross-Regulation Dedup: Find near-duplicates across pattern boundaries

Reuses the existing 4-Stage Pipeline from control_dedup.py. Only adds
batch orchestration, quality scoring, and parent-link transfer logic.

Usage:
    runner = BatchDedupRunner(db)
    stats = await runner.run(dry_run=True)   # preview
    stats = await runner.run(dry_run=False)  # execute
    stats = await runner.run(pattern_filter="CP-AUTH-001")  # single pattern
"""

import json
import logging
import time
from collections import defaultdict
from typing import Optional

from sqlalchemy import text

from compliance.services.control_dedup import (
    ControlDedupChecker,
    DedupResult,
    canonicalize_text,
    ensure_qdrant_collection,
    get_embedding,
    normalize_action,
    normalize_object,
    qdrant_search,
    qdrant_search_cross_regulation,
    qdrant_upsert,
    CROSS_REG_LINK_THRESHOLD,
    LINK_THRESHOLD,
    REVIEW_THRESHOLD,
)

logger = logging.getLogger(__name__)

DEDUP_COLLECTION = "atomic_controls_dedup"

# Emit a progress log line roughly every this many controls within a pattern.
_PROGRESS_LOG_INTERVAL = 100


# ── Helpers ──────────────────────────────────────────────────────────────

def _count_items(value) -> int:
    """Return the number of entries in a list-like control field.

    ``value`` may already be a container or a JSON-encoded string. Anything
    that is missing, unparsable, or does not decode to a container (JSON
    ``null``, numbers, plain strings) counts as zero entries instead of
    raising or counting characters.
    """
    if not value:
        return 0
    if isinstance(value, str):
        try:
            value = json.loads(value)
        except (json.JSONDecodeError, TypeError):
            return 0
    if isinstance(value, (list, tuple, dict)):
        return len(value)
    return 0


def _split_hint(hint) -> tuple:
    """Split a merge_group_hint into ``(action, object)``.

    The hint format is "action_type:norm_obj:trigger_key"; missing parts
    (or a missing hint) yield empty strings.
    """
    parts = (hint or "").split(":", 2)
    action = parts[0]
    obj = parts[1] if len(parts) > 1 else ""
    return action, obj


# ── Quality Score ────────────────────────────────────────────────────────

def quality_score(control: dict) -> float:
    """Score a control by richness of requirements, tests, evidence, and objective.

    Weights: 2.0 per requirement, 1.5 per test procedure entry, 1.0 per
    evidence entry, plus up to 3.0 for objective length (1.0 per 200 chars).

    Higher score = better candidate for master control.
    """
    score = 0.0
    score += _count_items(control.get("requirements")) * 2.0
    score += _count_items(control.get("test_procedure")) * 1.5
    score += _count_items(control.get("evidence")) * 1.0

    objective = control.get("objective") or ""
    score += min(len(objective) / 200, 3.0)
    return score


# ── Batch Dedup Runner ───────────────────────────────────────────────────

class BatchDedupRunner:
    """Batch dedup orchestrator for existing Pass 0b atomic controls."""

    def __init__(self, db, collection: str = DEDUP_COLLECTION):
        self.db = db
        self.collection = collection
        self.stats = {
            "total_controls": 0,
            "patterns_processed": 0,
            "sub_groups_processed": 0,
            "masters": 0,
            "linked": 0,
            "review": 0,
            "new_controls": 0,
            "parent_links_transferred": 0,
            "cross_reg_linked": 0,
            "errors": 0,
            "skipped_title_identical": 0,
        }
        self._progress_pattern = ""
        self._progress_count = 0

    async def run(
        self,
        dry_run: bool = False,
        pattern_filter: Optional[str] = None,
    ) -> dict:
        """Run the full batch dedup pipeline.

        Args:
            dry_run: If True, compute stats but don't modify DB.
            pattern_filter: If set, only process this pattern_id.

        Returns:
            Stats dict with counts (plus ``elapsed_seconds``).
        """
        start = time.monotonic()
        logger.info("BatchDedup starting (dry_run=%s, pattern_filter=%s)",
                    dry_run, pattern_filter)

        # Ensure Qdrant collection
        await ensure_qdrant_collection(collection=self.collection)

        # Phase 1: Intra-pattern dedup
        groups = self._load_pattern_groups(pattern_filter)
        for pattern_id, controls in groups:
            try:
                await self._process_pattern_group(pattern_id, controls, dry_run)
                self.stats["patterns_processed"] += 1
            except Exception as e:
                # Top-level boundary: one broken pattern must not abort the batch.
                logger.error("BatchDedup error on pattern %s: %s",
                             pattern_id, e, exc_info=True)
                self.stats["errors"] += 1
                # Discard the failed transaction so the next pattern can use
                # the session again.
                self._rollback_quietly()

        # Phase 2: Cross-regulation dedup (skip in dry_run for speed)
        if not dry_run:
            await self._run_cross_regulation_pass()

        elapsed = time.monotonic() - start
        self.stats["elapsed_seconds"] = round(elapsed, 1)
        logger.info("BatchDedup completed in %.1fs: %s", elapsed, self.stats)
        return self.stats

    def _rollback_quietly(self) -> None:
        """Roll back the DB session, logging (not raising) if that fails too."""
        try:
            self.db.rollback()
        except Exception:
            logger.exception("BatchDedup rollback failed")

    def _load_pattern_groups(self, pattern_filter: Optional[str] = None) -> list:
        """Load all Pass 0b controls grouped by pattern_id, largest first.

        Returns a list of ``(pattern_id, [control_dict, ...])`` tuples.
        """
        conditions = [
            "decomposition_method = 'pass0b'",
            "release_state != 'deprecated'",
            "release_state != 'duplicate'",
        ]
        params = {}
        if pattern_filter:
            conditions.append("pattern_id = :pf")
            params["pf"] = pattern_filter

        # Only fixed fragments are interpolated; user input goes through binds.
        where = " AND ".join(conditions)
        rows = self.db.execute(text(f"""
            SELECT id::text, control_id, title, objective,
                   pattern_id, requirements::text, test_procedure::text,
                   evidence::text, release_state,
                   generation_metadata->>'merge_group_hint' as merge_group_hint,
                   generation_metadata->>'action_object_class' as action_object_class
            FROM canonical_controls
            WHERE {where}
            ORDER BY pattern_id, control_id
        """), params).fetchall()

        # Group by pattern_id
        by_pattern = defaultdict(list)
        for r in rows:
            by_pattern[r[4]].append({
                "uuid": r[0],
                "control_id": r[1],
                "title": r[2],
                "objective": r[3],
                "pattern_id": r[4],
                "requirements": r[5],
                "test_procedure": r[6],
                "evidence": r[7],
                "release_state": r[8],
                "merge_group_hint": r[9] or "",
                "action_object_class": r[10] or "",
            })

        self.stats["total_controls"] = len(rows)

        # Sort patterns by group size (descending) for progress visibility
        sorted_groups = sorted(by_pattern.items(), key=lambda x: len(x[1]), reverse=True)
        logger.info("BatchDedup loaded %d controls in %d patterns",
                    len(rows), len(sorted_groups))
        return sorted_groups

    def _sub_group_by_merge_hint(self, controls: list) -> dict:
        """Group controls by merge_group_hint composite key."""
        groups = defaultdict(list)
        for c in controls:
            hint = c["merge_group_hint"]
            if hint:
                groups[hint].append(c)
            else:
                # No hint → each control is its own group
                groups[f"__no_hint_{c['uuid']}"].append(c)
        return dict(groups)

    async def _process_pattern_group(
        self,
        pattern_id: str,
        controls: list,
        dry_run: bool,
    ):
        """Process all controls within a single pattern_id.

        Within each merge-hint sub-group the highest ``quality_score`` control
        becomes the master; every other control is checked against it.
        """
        self._progress_pattern = pattern_id
        self._progress_count = 0
        total = len(controls)
        next_log_at = _PROGRESS_LOG_INTERVAL

        for group in self._sub_group_by_merge_hint(controls).values():
            # Sort by quality score (best first); a single control is its own master.
            ranked = sorted(group, key=quality_score, reverse=True)
            master, candidates = ranked[0], ranked[1:]

            self.stats["masters"] += 1
            if not dry_run:
                await self._embed_and_index(master)
            self._progress_count += 1

            for candidate in candidates:
                await self._check_and_link(master, candidate, pattern_id, dry_run)
                self._progress_count += 1

            if candidates:
                self.stats["sub_groups_processed"] += 1

            # The counter advances by a whole group at a time, so log whenever
            # it crosses the next interval rather than only on exact multiples.
            if self._progress_count >= next_log_at:
                logger.info(
                    "BatchDedup [%s] %d/%d — masters=%d, linked=%d, review=%d",
                    pattern_id, self._progress_count, total,
                    self.stats["masters"], self.stats["linked"], self.stats["review"],
                )
                next_log_at = (
                    self._progress_count // _PROGRESS_LOG_INTERVAL + 1
                ) * _PROGRESS_LOG_INTERVAL

    async def _check_and_link(
        self,
        master: dict,
        candidate: dict,
        pattern_id: str,
        dry_run: bool,
    ):
        """Check if candidate is a duplicate of master and link if so.

        Outcomes (each updates ``self.stats``): direct link on identical
        title, link to the best Qdrant hit, review entry for borderline
        scores, or indexing the candidate as a new master.
        """
        hint = candidate["merge_group_hint"]

        # Short-circuit: identical titles within same merge_group → direct link
        cand_title = (candidate["title"] or "").strip().lower()
        master_title = (master["title"] or "").strip().lower()
        if hint and hint == master["merge_group_hint"] and cand_title == master_title:
            self.stats["linked"] += 1
            self.stats["skipped_title_identical"] += 1
            if not dry_run:
                await self._mark_duplicate(master, candidate, confidence=1.0)
            return

        # Build canonical text and get embedding for candidate
        action, obj = _split_hint(hint)
        canonical = canonicalize_text(action, obj, candidate["title"])
        embedding = await get_embedding(canonical)

        if not embedding:
            # Can't embed → keep as new control (indexing retries the embedding)
            self.stats["new_controls"] += 1
            if not dry_run:
                await self._embed_and_index(candidate)
            return

        # Search the dedup collection for similar controls
        results = await qdrant_search(
            embedding, pattern_id, top_k=5, collection=self.collection,
        )

        # Ignore hits that cannot be linked to: entries without a control_uuid
        # and the candidate itself (present when the collection was populated
        # by an earlier run).
        hits = [
            hit for hit in (results or [])
            if (hit.get("payload") or {}).get("control_uuid")
            not in (None, "", candidate["uuid"])
        ]

        if not hits:
            # No matches → new master; reuse the embedding we already have
            self.stats["new_controls"] += 1
            if not dry_run:
                await self._index_with_embedding(candidate, embedding)
            return

        best = hits[0]
        best_score = best.get("score", 0.0)
        best_payload = best.get("payload") or {}

        # Same action+object (since same merge_group_hint) → use standard thresholds
        if best_score > LINK_THRESHOLD:
            self.stats["linked"] += 1
            if not dry_run:
                # Link to the matched master (which may differ from our `master`)
                await self._mark_duplicate_to(
                    master_uuid=best_payload["control_uuid"],
                    candidate=candidate,
                    confidence=best_score,
                )
        elif best_score > REVIEW_THRESHOLD:
            self.stats["review"] += 1
            if not dry_run:
                self._write_review(candidate, best_payload, best_score)
        else:
            # Below threshold → becomes a new master
            self.stats["new_controls"] += 1
            if not dry_run:
                await self._index_with_embedding(candidate, embedding)

    async def _embed_and_index(self, control: dict):
        """Compute embedding and index a control in the dedup Qdrant collection.

        Does nothing when no embedding could be obtained.
        """
        action, obj = _split_hint(control["merge_group_hint"])
        embedding = await get_embedding(canonicalize_text(action, obj, control["title"]))
        if not embedding:
            return
        await self._index_with_embedding(control, embedding)

    async def _index_with_embedding(self, control: dict, embedding: list):
        """Index a control with a pre-computed embedding."""
        action, obj = _split_hint(control["merge_group_hint"])
        await qdrant_upsert(
            point_id=control["uuid"],
            embedding=embedding,
            payload={
                "control_uuid": control["uuid"],
                "control_id": control["control_id"],
                "title": control["title"],
                "pattern_id": control["pattern_id"],
                "action_normalized": normalize_action(action),
                "object_normalized": normalize_object(obj),
                "canonical_text": canonicalize_text(action, obj, control["title"]),
            },
            collection=self.collection,
        )

    async def _mark_duplicate(self, master: dict, candidate: dict, confidence: float):
        """Mark candidate as duplicate of master, transfer parent links."""
        await self._mark_duplicate_to(
            master_uuid=master["uuid"],
            candidate=candidate,
            confidence=confidence,
        )

    async def _mark_duplicate_to(self, master_uuid: str, candidate: dict, confidence: float):
        """Mark candidate as duplicate of the control identified by master_uuid.

        Sets the candidate's release_state to 'duplicate', records a
        'dedup_merge' link, transfers the candidate's decomposition parent
        links to the master, and commits.
        """
        if not master_uuid or master_uuid == candidate["uuid"]:
            # A control can never be merged into itself or into "nothing".
            logger.warning("BatchDedup refusing to merge %s into %r",
                           candidate["uuid"], master_uuid)
            return

        self.db.execute(text("""
            UPDATE canonical_controls
            SET release_state = 'duplicate', merged_into_uuid = CAST(:master AS uuid)
            WHERE id = CAST(:cand AS uuid)
        """), {"master": master_uuid, "cand": candidate["uuid"]})

        # Add dedup_merge link
        self.db.execute(text("""
            INSERT INTO control_parent_links
                (control_uuid, parent_control_uuid, link_type, confidence)
            VALUES (CAST(:master AS uuid), CAST(:cand_parent AS uuid), 'dedup_merge', :conf)
            ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING
        """), {"master": master_uuid, "cand_parent": candidate["uuid"], "conf": confidence})

        # Transfer parent links from candidate to master
        transferred = self._transfer_parent_links(master_uuid, candidate["uuid"])
        self.stats["parent_links_transferred"] += transferred

        self.db.commit()

    def _transfer_parent_links(self, master_uuid: str, duplicate_uuid: str) -> int:
        """Copy 'decomposition' parent links from duplicate to master.

        Returns the number of link inserts attempted; inserts that hit an
        existing (control_uuid, parent_control_uuid) pair are ignored by the
        database but still counted.
        """
        # Find decomposition links in which the duplicate is the child control
        rows = self.db.execute(text("""
            SELECT parent_control_uuid::text, link_type, confidence,
                   source_regulation, source_article, obligation_candidate_id::text
            FROM control_parent_links
            WHERE control_uuid = CAST(:dup AS uuid)
              AND link_type = 'decomposition'
        """), {"dup": duplicate_uuid}).fetchall()

        transferred = 0
        for r in rows:
            parent_uuid = r[0]
            # Skip self-references
            if parent_uuid == master_uuid:
                continue
            self.db.execute(text("""
                INSERT INTO control_parent_links
                    (control_uuid, parent_control_uuid, link_type, confidence,
                     source_regulation, source_article, obligation_candidate_id)
                VALUES (CAST(:cu AS uuid), CAST(:pu AS uuid), :lt, :conf,
                        :sr, :sa, CAST(:oci AS uuid))
                ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING
            """), {
                "cu": master_uuid,
                "pu": parent_uuid,
                "lt": r[1],
                # Only a missing confidence defaults to 1.0; a stored 0.0 is kept.
                "conf": float(r[2]) if r[2] is not None else 1.0,
                "sr": r[3],
                "sa": r[4],
                "oci": r[5],
            })
            transferred += 1

        return transferred

    def _write_review(self, candidate: dict, matched_payload: dict, score: float):
        """Write a dedup review entry for borderline matches."""
        # CAST(... AS jsonb) instead of the `::jsonb` shorthand: a `:name::type`
        # sequence clashes with text() bind-parameter parsing.
        self.db.execute(text("""
            INSERT INTO control_dedup_reviews
                (candidate_control_id, candidate_title, candidate_objective,
                 matched_control_uuid, matched_control_id,
                 similarity_score, dedup_stage, dedup_details)
            VALUES (:ccid, :ct, :co, CAST(:mcu AS uuid), :mci,
                    :ss, 'batch_dedup', CAST(:dd AS jsonb))
        """), {
            "ccid": candidate["control_id"],
            "ct": candidate["title"],
            "co": candidate.get("objective", ""),
            "mcu": matched_payload.get("control_uuid"),
            "mci": matched_payload.get("control_id"),
            "ss": score,
            "dd": json.dumps({
                "merge_group_hint": candidate["merge_group_hint"],
                "pattern_id": candidate["pattern_id"],
            }),
        })
        self.db.commit()

    @staticmethod
    def _pick_cross_reg_match(results, control_uuid: str, pattern_id):
        """Return ``(matched_uuid, score)`` for the first usable cross-pattern hit.

        Hits are taken in the order returned by the search. A hit is skipped
        when it has no control_uuid, is the control itself, or belongs to the
        same pattern. Returns None when no remaining hit scores above
        CROSS_REG_LINK_THRESHOLD.
        """
        for hit in results or []:
            payload = hit.get("payload") or {}
            other_uuid = payload.get("control_uuid")
            if not other_uuid or other_uuid == control_uuid:
                continue
            if payload.get("pattern_id") == pattern_id:
                continue
            score = hit.get("score", 0.0)
            if score > CROSS_REG_LINK_THRESHOLD:
                return other_uuid, score
        return None

    async def _run_cross_regulation_pass(self):
        """Phase 2: Find cross-regulation duplicates among surviving masters."""
        logger.info("BatchDedup Phase 2: Cross-regulation pass starting...")

        # Load all non-duplicate pass0b controls that are now masters
        rows = self.db.execute(text("""
            SELECT id::text, control_id, title, pattern_id,
                   generation_metadata->>'merge_group_hint' as merge_group_hint
            FROM canonical_controls
            WHERE decomposition_method = 'pass0b'
              AND release_state != 'duplicate'
              AND release_state != 'deprecated'
            ORDER BY control_id
        """)).fetchall()

        logger.info("BatchDedup Cross-reg: %d masters to check", len(rows))
        cross_linked = 0

        for checked, r in enumerate(rows, start=1):
            control_uuid, title, pattern_id = r[0], r[2], r[3]
            action, obj = _split_hint(r[4])

            embedding = await get_embedding(canonicalize_text(action, obj, title))
            if embedding:
                results = await qdrant_search_cross_regulation(
                    embedding, top_k=5, collection=self.collection,
                )
                # The control itself is normally in the collection and would be
                # the top hit, so look past it for a different-pattern match.
                match = self._pick_cross_reg_match(results, control_uuid, pattern_id)
                if match is not None:
                    matched_uuid, score = match
                    # Cross-regulation link
                    self.db.execute(text("""
                        INSERT INTO control_parent_links
                            (control_uuid, parent_control_uuid, link_type, confidence)
                        VALUES (CAST(:cu AS uuid), CAST(:pu AS uuid), 'cross_regulation', :conf)
                        ON CONFLICT (control_uuid, parent_control_uuid) DO NOTHING
                    """), {
                        "cu": matched_uuid,
                        "pu": control_uuid,
                        "conf": score,
                    })
                    self.db.commit()
                    cross_linked += 1

            if checked % 500 == 0:
                logger.info("BatchDedup Cross-reg: %d/%d checked, %d linked",
                            checked, len(rows), cross_linked)

        self.stats["cross_reg_linked"] = cross_linked
        logger.info("BatchDedup Cross-reg complete: %d links created", cross_linked)

    def get_status(self) -> dict:
        """Return current progress stats (for status endpoint)."""
        return {
            "pattern": self._progress_pattern,
            "progress": self._progress_count,
            **self.stats,
        }