# =============================================================================
# LLM DEDUP REVIEW — Local LLM verifies borderline duplicates
# =============================================================================

class LLMDedupRequest(BaseModel):
    """Parameters for an LLM-based review of borderline dedup matches."""

    dry_run: bool = True  # preview only: no DB writes when True
    limit: int = 100  # max review entries to process; <= 0 means no limit
    min_score: float = 0.85  # Only review entries >= this score
    max_score: float = 0.91  # Only review entries < this score (0.91+ already handled)
    model: str = "qwen3.5:35b-a3b"  # Ollama model tag used for the verdict


# In-memory job registry: job_id -> progress/result dict.
# NOTE(review): process-local; presumably fine for a single-worker admin API —
# confirm the service is not deployed with multiple workers.
_llm_dedup_status: dict = {}

# Strong references to running background jobs. asyncio.create_task() keeps
# only a weak reference to its task, so without this a job could be
# garbage-collected mid-flight.
_llm_dedup_tasks: set = set()

# System prompt (German, matching the controls' language). Runtime string —
# kept verbatim; it requests strict JSON so _parse_llm_json can extract it.
_LLM_DEDUP_SYSTEM_PROMPT = (
    "Du bist ein Compliance-Experte. Vergleiche zwei Controls und entscheide: "
    "DUPLIKAT (gleiche Anforderung, nur anders formuliert) oder VERSCHIEDEN "
    "(unterschiedlicher Scope/Inhalt). Antworte NUR mit einem JSON: "
    '{"verdict": "DUPLIKAT" oder "VERSCHIEDEN", "reason": "kurze Begruendung"}'
)


def _control_context(title, objective, requirements) -> str:
    """Build the compact prompt text for one control.

    Title plus objective/requirements truncated to 300 chars each; empty
    requirement payloads ("[]"/"null" from ::text casts) are skipped.
    """
    ctx = title or ""
    if objective:
        ctx += f"\nObjective: {objective[:300]}"
    if requirements and requirements not in ("[]", "null"):
        ctx += f"\nRequirements: {requirements[:300]}"
    return ctx


async def _run_llm_dedup(req: LLMDedupRequest, job_id: str) -> None:
    """Background worker: verify borderline dedup matches with a local LLM.

    Loads review entries from ``compliance.control_dedup_reviews`` whose
    similarity score lies in ``[req.min_score, req.max_score)``, asks the
    local Ollama model for a DUPLIKAT/VERSCHIEDEN verdict per pair, and —
    unless ``req.dry_run`` — marks confirmed duplicates in
    ``compliance.canonical_controls``. Progress and the final result are
    published to ``_llm_dedup_status[job_id]``.
    """
    import os

    import httpx

    ollama_url = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")

    # NOTE(review): SessionLocal looks like a synchronous SQLAlchemy session;
    # its calls block the event loop while this coroutine runs. Acceptable
    # for a low-volume admin job — confirm before scaling this up.
    db = SessionLocal()
    try:
        # int() is redundant with pydantic's coercion but guarantees the
        # interpolated LIMIT clause can never carry injected SQL (LIMIT is
        # not a portable bind-parameter position).
        limit_clause = f"LIMIT {int(req.limit)}" if req.limit > 0 else ""
        rows = db.execute(text(f"""
            SELECT r.id, r.candidate_control_id, r.candidate_title,
                   r.matched_control_id, r.similarity_score,
                   c1.objective as candidate_objective,
                   c1.requirements::text as candidate_requirements,
                   c2.title as matched_title,
                   c2.objective as matched_objective,
                   c2.requirements::text as matched_requirements
            FROM compliance.control_dedup_reviews r
            LEFT JOIN compliance.canonical_controls c1 ON c1.control_id = r.candidate_control_id
            LEFT JOIN compliance.canonical_controls c2 ON c2.control_id = r.matched_control_id
            WHERE r.dedup_stage = 'batch_dedup'
              AND r.similarity_score >= :min_score
              AND r.similarity_score < :max_score
            ORDER BY r.similarity_score DESC
            {limit_clause}
        """), {"min_score": req.min_score, "max_score": req.max_score}).fetchall()

        total = len(rows)
        duplicates = 0
        different = 0
        errors = 0
        results: list[dict] = []

        _llm_dedup_status[job_id] = {
            "status": "running", "total": total, "processed": 0,
            "duplicates": 0, "different": 0, "errors": 0,
            "dry_run": req.dry_run,
        }

        # One HTTP client for the whole run (connection reuse) instead of
        # constructing a new AsyncClient per row.
        async with httpx.AsyncClient(timeout=30.0) as client:
            for i, row in enumerate(rows):
                try:
                    candidate_ctx = _control_context(
                        row.candidate_title,
                        row.candidate_objective,
                        row.candidate_requirements,
                    )
                    matched_ctx = _control_context(
                        row.matched_title,
                        row.matched_objective,
                        row.matched_requirements,
                    )

                    prompt = (
                        f"Control A ({row.candidate_control_id}):\n{candidate_ctx}\n\n"
                        f"Control B ({row.matched_control_id}):\n{matched_ctx}\n\n"
                        "Sind diese Controls Duplikate?"
                    )

                    resp = await client.post(
                        f"{ollama_url}/api/chat",
                        json={
                            "model": req.model,
                            "stream": False,
                            "messages": [
                                {"role": "system", "content": _LLM_DEDUP_SYSTEM_PROMPT},
                                {"role": "user", "content": prompt},
                            ],
                        },
                    )

                    if resp.status_code != 200:
                        errors += 1
                        continue

                    content = resp.json().get("message", {}).get("content", "")
                    # Parse the strict-JSON verdict out of the LLM response.
                    parsed = _parse_llm_json(content)
                    if not parsed:
                        errors += 1
                        continue

                    verdict = parsed.get("verdict", "").upper()
                    reason = parsed.get("reason", "")

                    if "DUPLIKAT" in verdict:
                        duplicates += 1
                        if not req.dry_run:
                            # Only drafts are folded away — released controls
                            # are never auto-merged by this job.
                            db.execute(text("""
                                UPDATE compliance.canonical_controls
                                SET release_state = 'duplicate',
                                    merged_into_uuid = (SELECT id FROM compliance.canonical_controls WHERE control_id = :matched LIMIT 1),
                                    updated_at = NOW()
                                WHERE control_id = :candidate AND release_state = 'draft'
                            """), {"candidate": row.candidate_control_id, "matched": row.matched_control_id})
                    else:
                        different += 1

                    results.append({
                        "candidate": row.candidate_control_id,
                        "matched": row.matched_control_id,
                        "score": float(row.similarity_score),
                        "verdict": verdict,
                        "reason": reason[:100],
                    })

                except Exception as e:
                    errors += 1
                    logger.warning("LLM dedup error for %s: %s", row.candidate_control_id, e)

                # Periodic commit so a crash mid-run keeps earlier verdicts.
                if not req.dry_run and (i + 1) % 50 == 0:
                    db.commit()

                _llm_dedup_status[job_id] = {
                    "status": "running", "total": total, "processed": i + 1,
                    "duplicates": duplicates, "different": different, "errors": errors,
                    "dry_run": req.dry_run,
                }

        if not req.dry_run:
            db.commit()

        # Final status keeps only the last 50 per-pair results to bound size.
        _llm_dedup_status[job_id] = {
            "status": "completed", "total": total, "processed": total,
            "duplicates": duplicates, "different": different, "errors": errors,
            "dry_run": req.dry_run, "results": results[-50:],
        }
        logger.info("LLM dedup %s completed: %d dup, %d diff, %d err out of %d",
                    job_id, duplicates, different, errors, total)

    except Exception as e:
        logger.error("LLM dedup %s failed: %s", job_id, e)
        _llm_dedup_status[job_id] = {"status": "failed", "error": str(e)}
    finally:
        db.close()


@router.post("/generate/llm-dedup")
async def start_llm_dedup(req: LLMDedupRequest):
    """Use local LLM to verify borderline dedup matches from the review queue.

    Sends each candidate+matched pair to the local Ollama LLM for a
    DUPLIKAT/VERSCHIEDEN verdict. Much more accurate than embedding similarity.
    Default is dry_run=True (preview only, no DB changes).

    Returns the job id; poll /generate/llm-dedup-status/{job_id} for progress.
    """
    import uuid

    job_id = str(uuid.uuid4())[:8]
    _llm_dedup_status[job_id] = {"status": "starting"}
    task = asyncio.create_task(_run_llm_dedup(req, job_id))
    # Hold a strong reference until the task finishes; create_task alone only
    # keeps a weak one and the job could otherwise be collected mid-run.
    _llm_dedup_tasks.add(task)
    task.add_done_callback(_llm_dedup_tasks.discard)
    return {
        "status": "running",
        "job_id": job_id,
        "message": f"LLM dedup started. Poll /generate/llm-dedup-status/{job_id}",
    }


@router.get("/generate/llm-dedup-status/{job_id}")
async def get_llm_dedup_status(job_id: str):
    """Get status of an LLM dedup job.

    Raises:
        HTTPException: 404 if no job with this id is known to this process.
    """
    status = _llm_dedup_status.get(job_id)
    if not status:
        raise HTTPException(status_code=404, detail="LLM dedup job not found")
    return status