feat(control-pipeline): add LLM dedup endpoint for borderline review queue

POST /v1/canonical/generate/llm-dedup uses local Ollama (qwen3.5:35b-a3b)
to verify borderline duplicate matches (score 0.85-0.91). More accurate
than embedding similarity for compliance controls with subtle scope
differences (e.g. "documented" vs "implemented").

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-04-22 20:15:46 +02:00
parent c93796554a
commit 076a6cd567

View File

@@ -1402,6 +1402,187 @@ async def get_batch_dedup_status(dedup_id: str):
return status return status
# =============================================================================
# LLM DEDUP REVIEW — Local LLM verifies borderline duplicates
# =============================================================================
class LLMDedupRequest(BaseModel):
dry_run: bool = True
limit: int = 100
min_score: float = 0.85 # Only review entries >= this score
max_score: float = 0.91 # Only review entries < this score (0.91+ already handled)
model: str = "qwen3.5:35b-a3b"
_llm_dedup_status: dict = {}
async def _run_llm_dedup(req: LLMDedupRequest, job_id: str):
"""Use local LLM to verify borderline dedup matches."""
import httpx
import os
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
db = SessionLocal()
try:
# Load review entries in the score band
limit_clause = f"LIMIT {req.limit}" if req.limit > 0 else ""
rows = db.execute(text(f"""
SELECT r.id, r.candidate_control_id, r.candidate_title,
r.matched_control_id, r.similarity_score,
c1.objective as candidate_objective,
c1.requirements::text as candidate_requirements,
c2.title as matched_title,
c2.objective as matched_objective,
c2.requirements::text as matched_requirements
FROM compliance.control_dedup_reviews r
LEFT JOIN compliance.canonical_controls c1 ON c1.control_id = r.candidate_control_id
LEFT JOIN compliance.canonical_controls c2 ON c2.control_id = r.matched_control_id
WHERE r.dedup_stage = 'batch_dedup'
AND r.similarity_score >= :min_score
AND r.similarity_score < :max_score
ORDER BY r.similarity_score DESC
{limit_clause}
"""), {"min_score": req.min_score, "max_score": req.max_score}).fetchall()
total = len(rows)
duplicates = 0
different = 0
errors = 0
results = []
_llm_dedup_status[job_id] = {
"status": "running", "total": total, "processed": 0,
"duplicates": 0, "different": 0, "errors": 0,
"dry_run": req.dry_run,
}
for i, row in enumerate(rows):
try:
# Build comparison prompt
candidate_ctx = row.candidate_title or ""
if row.candidate_objective:
candidate_ctx += f"\nObjective: {row.candidate_objective[:300]}"
if row.candidate_requirements and row.candidate_requirements not in ("[]", "null"):
candidate_ctx += f"\nRequirements: {row.candidate_requirements[:300]}"
matched_ctx = row.matched_title or ""
if row.matched_objective:
matched_ctx += f"\nObjective: {row.matched_objective[:300]}"
if row.matched_requirements and row.matched_requirements not in ("[]", "null"):
matched_ctx += f"\nRequirements: {row.matched_requirements[:300]}"
prompt = f"Control A ({row.candidate_control_id}):\n{candidate_ctx}\n\nControl B ({row.matched_control_id}):\n{matched_ctx}\n\nSind diese Controls Duplikate?"
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
f"{OLLAMA_URL}/api/chat",
json={
"model": req.model,
"stream": False,
"messages": [
{"role": "system", "content": "Du bist ein Compliance-Experte. Vergleiche zwei Controls und entscheide: DUPLIKAT (gleiche Anforderung, nur anders formuliert) oder VERSCHIEDEN (unterschiedlicher Scope/Inhalt). Antworte NUR mit einem JSON: {\"verdict\": \"DUPLIKAT\" oder \"VERSCHIEDEN\", \"reason\": \"kurze Begruendung\"}"},
{"role": "user", "content": prompt},
],
},
)
if resp.status_code != 200:
errors += 1
continue
content = resp.json().get("message", {}).get("content", "")
# Parse verdict from response
parsed = _parse_llm_json(content)
if not parsed:
errors += 1
continue
verdict = parsed.get("verdict", "").upper()
reason = parsed.get("reason", "")
if "DUPLIKAT" in verdict:
duplicates += 1
if not req.dry_run:
# Mark as duplicate
db.execute(text("""
UPDATE compliance.canonical_controls
SET release_state = 'duplicate',
merged_into_uuid = (SELECT id FROM compliance.canonical_controls WHERE control_id = :matched LIMIT 1),
updated_at = NOW()
WHERE control_id = :candidate AND release_state = 'draft'
"""), {"candidate": row.candidate_control_id, "matched": row.matched_control_id})
else:
different += 1
results.append({
"candidate": row.candidate_control_id,
"matched": row.matched_control_id,
"score": float(row.similarity_score),
"verdict": verdict,
"reason": reason[:100],
})
except Exception as e:
errors += 1
logger.warning("LLM dedup error for %s: %s", row.candidate_control_id, e)
if not req.dry_run and (i + 1) % 50 == 0:
db.commit()
_llm_dedup_status[job_id] = {
"status": "running", "total": total, "processed": i + 1,
"duplicates": duplicates, "different": different, "errors": errors,
"dry_run": req.dry_run,
}
if not req.dry_run:
db.commit()
_llm_dedup_status[job_id] = {
"status": "completed", "total": total, "processed": total,
"duplicates": duplicates, "different": different, "errors": errors,
"dry_run": req.dry_run, "results": results[-50:],
}
logger.info("LLM dedup %s completed: %d dup, %d diff, %d err out of %d",
job_id, duplicates, different, errors, total)
except Exception as e:
logger.error("LLM dedup %s failed: %s", job_id, e)
_llm_dedup_status[job_id] = {"status": "failed", "error": str(e)}
finally:
db.close()
@router.post("/generate/llm-dedup")
async def start_llm_dedup(req: LLMDedupRequest):
"""Use local LLM to verify borderline dedup matches from the review queue.
Sends each candidate+matched pair to the local Ollama LLM for a
DUPLIKAT/VERSCHIEDEN verdict. Much more accurate than embedding similarity.
Default is dry_run=True (preview only, no DB changes).
"""
import uuid
job_id = str(uuid.uuid4())[:8]
_llm_dedup_status[job_id] = {"status": "starting"}
asyncio.create_task(_run_llm_dedup(req, job_id))
return {
"status": "running",
"job_id": job_id,
"message": f"LLM dedup started. Poll /generate/llm-dedup-status/{job_id}",
}
@router.get("/generate/llm-dedup-status/{job_id}")
async def get_llm_dedup_status(job_id: str):
"""Get status of an LLM dedup job."""
status = _llm_dedup_status.get(job_id)
if not status:
raise HTTPException(status_code=404, detail="LLM dedup job not found")
return status
# ============================================================================= # =============================================================================
# ANCHOR BACKFILL # ANCHOR BACKFILL
# ============================================================================= # =============================================================================