feat(control-pipeline): add harmonization recheck endpoint
POST /generate/harmonization-recheck verifies promoted controls against the Qdrant dedup collection via Embedding + LLM. Runs as a stable asyncio background task inside the container (no docker exec issues). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1956,3 +1956,190 @@ async def get_anchor_backfill_status(backfill_id: str):
|
|||||||
if not status:
|
if not status:
|
||||||
raise HTTPException(status_code=404, detail="Anchor backfill job not found")
|
raise HTTPException(status_code=404, detail="Anchor backfill job not found")
|
||||||
return status
|
return status
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# HARMONIZATION RECHECK — verify promoted controls against Qdrant
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
class HarmonizationRecheckRequest(BaseModel):
    """Request body for the harmonization recheck job."""

    # When true, detected duplicates are only counted; no rows are updated.
    dry_run: bool = True
    # Lower bound (inclusive) on canonical_controls.updated_at; the string is
    # cast to a SQL timestamp by the recheck query.
    since: str = "2026-04-24 08:30:00"
    # Upper bound (exclusive) on canonical_controls.updated_at.
    until: str = "2026-04-24 09:00:00"
|
||||||
|
|
||||||
|
|
||||||
|
_harmonization_recheck_status: dict = {}
|
||||||
|
|
||||||
|
|
||||||
|
async def _run_harmonization_recheck(req: HarmonizationRecheckRequest, job_id: str):
    """Re-check promoted controls via Embedding + LLM dedup.

    Selects draft controls whose ``updated_at`` lies in
    ``[req.since, req.until)``, embeds title + objective prefix, searches the
    Qdrant dedup collection and classifies every control:

    * best score >= ``AUTO_DUP``: duplicate, no further check
    * ``THRESHOLD`` <= best score < ``AUTO_DUP``: the LLM decides
    * no hit other than the control itself: unique

    Unless ``req.dry_run`` is set, duplicates get
    ``release_state = 'duplicate'``; updates are committed every 100 rows and
    once more at the end.

    Args:
        req: Time window and dry-run flag for this run.
        job_id: Key under which progress is published in
            ``_harmonization_recheck_status``.

    Returns:
        None. Progress and the final result are written to
        ``_harmonization_recheck_status[job_id]``. Errors raised inside the
        run are caught: per-row errors are counted, a failure of the run as a
        whole is recorded as status ``"failed"``.
    """
    import os

    import httpx

    QDRANT_URL = os.getenv("QDRANT_URL", "http://qdrant:6333")
    EMBEDDING_URL = os.getenv("EMBEDDING_URL", "http://embedding-service:8087")
    OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434")
    OLLAMA_MODEL = os.getenv("CONTROL_GEN_OLLAMA_MODEL", "qwen3.5:35b-a3b")
    AUTO_DUP = 0.92   # at or above this score a hit is a duplicate without LLM check
    THRESHOLD = 0.85  # minimum score for a hit to be considered at all
    COLLECTION = "atomic_controls_dedup"

    # NOTE(review): SessionLocal looks like a synchronous SQLAlchemy session,
    # in which case the DB calls below block the event loop — confirm.
    db = SessionLocal()
    try:
        rows = db.execute(text("""
            SELECT id::text, control_id, title, objective
            FROM compliance.canonical_controls
            WHERE release_state = 'draft'
              AND updated_at >= CAST(:since AS timestamp)
              AND updated_at < CAST(:until AS timestamp)
            ORDER BY control_id
        """), {"since": req.since, "until": req.until}).fetchall()

        total = len(rows)
        # Insertion order defines the key order of the published status dict.
        counts = {"unique": 0, "duplicate": 0, "llm_calls": 0, "no_match": 0, "errors": 0}

        def publish(state: str, processed: int) -> None:
            """Replace the job's status entry with a fresh snapshot."""
            _harmonization_recheck_status[job_id] = {
                "status": state, "total": total, "processed": processed,
                **counts, "dry_run": req.dry_run,
            }

        def mark_duplicate(control_uuid: str) -> None:
            """Flag the control as duplicate (skipped in dry-run) and count it."""
            if not req.dry_run:
                db.execute(text(
                    "UPDATE compliance.canonical_controls "
                    "SET release_state = 'duplicate', updated_at = NOW() "
                    "WHERE id = CAST(:id AS uuid)"
                ), {"id": control_uuid})
            counts["duplicate"] += 1

        async def check_row(client, row) -> None:
            """Classify one control and update the counters accordingly."""
            search_text = f"{row.title or ''} {(row.objective or '')[:200]}"

            # Get embedding
            resp = await client.post(f"{EMBEDDING_URL}/embed",
                                     json={"texts": [search_text]})
            if resp.status_code != 200:
                counts["errors"] += 1
                return
            emb = resp.json().get("embeddings", [[]])[0]
            if not emb:
                counts["errors"] += 1
                return

            # Search Qdrant. "control_uuid" must be part of the requested
            # payload fields, otherwise the self-exclusion below can never
            # match and a control would be flagged as a duplicate of itself.
            resp = await client.post(
                f"{QDRANT_URL}/collections/{COLLECTION}/points/search",
                json={"vector": emb, "limit": 3,
                      "score_threshold": THRESHOLD,
                      "with_payload": {"include": ["control_id", "title", "control_uuid"]}})
            if resp.status_code != 200:
                # A failed search says nothing about uniqueness.
                counts["errors"] += 1
                logger.warning("Harmonization recheck: Qdrant search for %s returned HTTP %s",
                               row[1], resp.status_code)
                return

            # Exclude self
            hits = [h for h in resp.json().get("result", [])
                    if (h.get("payload") or {}).get("control_uuid") != row[0]]
            if not hits:
                counts["no_match"] += 1
                counts["unique"] += 1
                return

            best_score = hits[0].get("score", 0.0)
            best_title = (hits[0].get("payload") or {}).get("title", "")

            if best_score >= AUTO_DUP:
                mark_duplicate(row[0])
                return
            if best_score < THRESHOLD:
                counts["unique"] += 1
                return

            # LLM verification of borderline matches. A failure here is
            # best-effort: the control stays unique and the error is counted.
            try:
                resp = await client.post(f"{OLLAMA_URL}/api/chat", timeout=30.0, json={
                    "model": OLLAMA_MODEL, "stream": False, "think": False,
                    "options": {"num_predict": 200},
                    "messages": [
                        {"role": "system", "content": (
                            "Vergleiche zwei Controls: DUPLIKAT oder VERSCHIEDEN. "
                            'Antworte NUR mit JSON: {"verdict":"DUPLIKAT" oder "VERSCHIEDEN","reason":"..."}'
                        )},
                        {"role": "user", "content": (
                            f"Control A:\n{row.title or ''}\n\n"
                            f"Control B:\n{best_title}\n\nDuplikat?"
                        )},
                    ],
                })
                counts["llm_calls"] += 1
                content = resp.json().get("message", {}).get("content", "")
                parsed = _parse_llm_json(content)
                if parsed and "DUPLIKAT" in str(parsed.get("verdict", "")).upper():
                    mark_duplicate(row[0])
                else:
                    counts["unique"] += 1
            except Exception as e:
                counts["unique"] += 1
                counts["errors"] += 1
                logger.warning("Harmonization recheck LLM check failed for %s: %s", row[1], e)

        publish("running", 0)

        # One client for the whole run instead of up to three per row.
        async with httpx.AsyncClient(timeout=15.0) as client:
            for i, row in enumerate(rows):
                try:
                    await check_row(client, row)
                except Exception as e:
                    counts["errors"] += 1
                    logger.warning("Harmonization recheck error %s: %s", row[1], e)

                # Reached for every row: the per-row work returns from the
                # helper instead of using `continue`, so no checkpoint is lost.
                if (i + 1) % 100 == 0:
                    if not req.dry_run:
                        db.commit()
                    publish("running", i + 1)

        if not req.dry_run:
            db.commit()

        publish("completed", total)
        logger.info("Harmonization recheck %s: %d total, %d unique, %d dup, %d llm, %d err",
                    job_id, total, counts["unique"], counts["duplicate"],
                    counts["llm_calls"], counts["errors"])

    except Exception as e:
        logger.error("Harmonization recheck %s failed: %s", job_id, e)
        _harmonization_recheck_status[job_id] = {"status": "failed", "error": str(e)}
    finally:
        db.close()
|
||||||
|
|
||||||
|
|
||||||
|
# Strong references to in-flight recheck tasks. The event loop only keeps weak
# references to tasks, so a task nobody else references may be
# garbage-collected before it finishes.
_harmonization_recheck_tasks: set = set()


@router.post("/generate/harmonization-recheck")
async def start_harmonization_recheck(req: HarmonizationRecheckRequest):
    """Re-check promoted controls against Qdrant dedup collection.

    Uses Embedding + LLM verification for borderline matches.

    Registers a new job with status ``"starting"``, schedules the recheck as
    a background asyncio task and returns immediately.

    Args:
        req: Time window and dry-run flag forwarded to the background job.

    Returns:
        A dict with ``status``, the generated ``job_id`` and a ``message``
        naming the status endpoint to poll.
    """
    import uuid

    job_id = str(uuid.uuid4())[:8]
    _harmonization_recheck_status[job_id] = {"status": "starting"}

    task = asyncio.create_task(_run_harmonization_recheck(req, job_id))
    # Keep the task alive until it is done, then drop the reference.
    _harmonization_recheck_tasks.add(task)
    task.add_done_callback(_harmonization_recheck_tasks.discard)

    return {
        "status": "running", "job_id": job_id,
        "message": f"Harmonization recheck started. Poll /generate/harmonization-recheck-status/{job_id}",
    }
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/generate/harmonization-recheck-status/{job_id}")
async def get_harmonization_recheck_status(job_id: str):
    """Return the latest status snapshot of a harmonization recheck job.

    Raises:
        HTTPException: 404 if no (non-empty) status is stored for ``job_id``.
    """
    job_status = _harmonization_recheck_status.get(job_id)
    if job_status:
        return job_status
    raise HTTPException(status_code=404, detail="Harmonization recheck job not found")
|
||||||
|
|||||||
Reference in New Issue
Block a user