From d31fccbe0e2e113d083a04cd049d736c20dd9f0d Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Fri, 24 Apr 2026 13:25:56 +0200 Subject: [PATCH] feat(control-pipeline): add harmonization recheck endpoint POST /generate/harmonization-recheck verifies promoted controls against Qdrant dedup collection via Embedding + LLM. Runs as stable asyncio background task inside the container (no docker exec issues). Co-Authored-By: Claude Opus 4.6 (1M context) --- .../api/control_generator_routes.py | 187 ++++++++++++++++++ 1 file changed, 187 insertions(+) diff --git a/control-pipeline/api/control_generator_routes.py b/control-pipeline/api/control_generator_routes.py index 4770462..d544b03 100644 --- a/control-pipeline/api/control_generator_routes.py +++ b/control-pipeline/api/control_generator_routes.py @@ -1956,3 +1956,190 @@ async def get_anchor_backfill_status(backfill_id: str): if not status: raise HTTPException(status_code=404, detail="Anchor backfill job not found") return status + + +# ============================================================================= +# HARMONIZATION RECHECK — verify promoted controls against Qdrant +# ============================================================================= + +class HarmonizationRecheckRequest(BaseModel): + dry_run: bool = True + since: str = "2026-04-24 08:30:00" # timestamp filter for promoted controls + until: str = "2026-04-24 09:00:00" + + +_harmonization_recheck_status: dict = {} + + +async def _run_harmonization_recheck(req: HarmonizationRecheckRequest, job_id: str): + """Re-check promoted controls via Embedding + LLM dedup.""" + import os + import httpx + + QDRANT_URL = os.getenv("QDRANT_URL", "http://qdrant:6333") + EMBEDDING_URL = os.getenv("EMBEDDING_URL", "http://embedding-service:8087") + OLLAMA_URL = os.getenv("OLLAMA_URL", "http://host.docker.internal:11434") + OLLAMA_MODEL = os.getenv("CONTROL_GEN_OLLAMA_MODEL", "qwen3.5:35b-a3b") + AUTO_DUP = 0.92 + THRESHOLD = 0.85 + COLLECTION = "atomic_controls_dedup" + + db = SessionLocal() + try: + rows = db.execute(text(""" + SELECT id::text, control_id, title, objective + FROM compliance.canonical_controls + WHERE release_state = 'draft' + AND updated_at >= CAST(:since AS timestamp) + AND updated_at < CAST(:until AS timestamp) + ORDER BY control_id + """), {"since": req.since, "until": req.until}).fetchall() + + total = len(rows) + unique = 0 + duplicate = 0 + llm_calls = 0 + no_match = 0 + errors = 0 + + _harmonization_recheck_status[job_id] = { + "status": "running", "total": total, "processed": 0, + "unique": 0, "duplicate": 0, "llm_calls": 0, "dry_run": req.dry_run, + } + + for i, row in enumerate(rows): + try: + search_text = f"{row.title or ''} {(row.objective or '')[:200]}" + + # Get embedding + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.post(f"{EMBEDDING_URL}/embed", + json={"texts": [search_text]}) + if resp.status_code != 200: + errors += 1 + continue + emb = resp.json().get("embeddings", [[]])[0] + if not emb: + errors += 1 + continue + + # Search Qdrant + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.post( + f"{QDRANT_URL}/collections/{COLLECTION}/points/search", + json={"vector": emb, "limit": 3, + "score_threshold": THRESHOLD, + "with_payload": {"include": ["control_id", "title"]}}) + results = resp.json().get("result", []) if resp.status_code == 200 else [] + # Exclude self + results = [r for r in results + if r.get("payload", {}).get("control_uuid") != row[0]] + + if not results: + no_match += 1 + unique += 1 + continue + + best_score = results[0].get("score", 0.0) + best_title = results[0].get("payload", {}).get("title", "") + + if best_score >= AUTO_DUP: + if not req.dry_run: + db.execute(text( + "UPDATE compliance.canonical_controls " + "SET release_state = 'duplicate', updated_at = NOW() " + "WHERE id = CAST(:id AS uuid)" + ), {"id": row[0]}) + duplicate += 1 + + elif best_score >= THRESHOLD: + # LLM verification + try: + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post(f"{OLLAMA_URL}/api/chat", json={ + "model": OLLAMA_MODEL, "stream": False, "think": False, + "options": {"num_predict": 200}, + "messages": [ + {"role": "system", "content": ( + "Vergleiche zwei Controls: DUPLIKAT oder VERSCHIEDEN. " + 'Antworte NUR mit JSON: {"verdict":"DUPLIKAT" oder "VERSCHIEDEN","reason":"..."}' + )}, + {"role": "user", "content": ( + f"Control A:\n{row.title or ''}\n\n" + f"Control B:\n{best_title}\n\nDuplikat?" + )}, + ], + }) + llm_calls += 1 + content = resp.json().get("message", {}).get("content", "") + parsed = _parse_llm_json(content) + if parsed and "DUPLIKAT" in str(parsed.get("verdict", "")).upper(): + if not req.dry_run: + db.execute(text( + "UPDATE compliance.canonical_controls " + "SET release_state = 'duplicate', updated_at = NOW() " + "WHERE id = CAST(:id AS uuid)" + ), {"id": row[0]}) + duplicate += 1 + else: + unique += 1 + except Exception: + unique += 1 + errors += 1 + else: + unique += 1 + + except Exception as e: + errors += 1 + logger.warning("Harmonization recheck error %s: %s", row[1], e) + + if (i + 1) % 100 == 0: + if not req.dry_run: + db.commit() + _harmonization_recheck_status[job_id] = { + "status": "running", "total": total, "processed": i + 1, + "unique": unique, "duplicate": duplicate, + "llm_calls": llm_calls, "no_match": no_match, + "errors": errors, "dry_run": req.dry_run, + } + + if not req.dry_run: + db.commit() + + _harmonization_recheck_status[job_id] = { + "status": "completed", "total": total, "processed": total, + "unique": unique, "duplicate": duplicate, + "llm_calls": llm_calls, "no_match": no_match, + "errors": errors, "dry_run": req.dry_run, + } + logger.info("Harmonization recheck %s: %d total, %d unique, %d dup, %d llm, %d err", + job_id, total, unique, duplicate, llm_calls, errors) + + except Exception as e: + logger.error("Harmonization recheck %s failed: %s", job_id, e) + _harmonization_recheck_status[job_id] = {"status": "failed", "error": str(e)} + finally: + db.close() + + +@router.post("/generate/harmonization-recheck") +async def start_harmonization_recheck(req: HarmonizationRecheckRequest): + """Re-check promoted controls against Qdrant dedup collection. + Uses Embedding + LLM verification for borderline matches. + """ + import uuid + job_id = str(uuid.uuid4())[:8] + _harmonization_recheck_status[job_id] = {"status": "starting"} + asyncio.create_task(_run_harmonization_recheck(req, job_id)) + return { + "status": "running", "job_id": job_id, + "message": f"Harmonization recheck started. Poll /generate/harmonization-recheck-status/{job_id}", + } + + +@router.get("/generate/harmonization-recheck-status/{job_id}") +async def get_harmonization_recheck_status(job_id: str): + status = _harmonization_recheck_status.get(job_id) + if not status: + raise HTTPException(status_code=404, detail="Harmonization recheck job not found") + return status