feat(pipeline): add review-verify endpoint — LLM decides DUPLIKAT/VERSCHIEDEN

Sends 67k review candidates to Haiku Batch API in pairs.
Each pair gets a DUPLIKAT/VERSCHIEDEN decision with reasoning.
Results stored in control_dedup_reviews.review_status.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-05-01 09:36:30 +02:00
parent f2104768a0
commit b8ff4e9290
@@ -2715,3 +2715,199 @@ async def get_quality_metrics(
}
finally:
db.close()
# =============================================================================
# REVIEW CANDIDATE VERIFICATION (Block B — LLM decides DUPLIKAT/VERSCHIEDEN)
# =============================================================================

# System prompt (German, intentionally untranslated — it is a runtime string
# sent to the model). It instructs the model to answer ONLY with a JSON array
# containing one flat {"pair_id", "decision", "reason"} object per pair;
# the result parser in _run_review_verify depends on exactly that shape.
_REVIEW_VERIFY_SYSTEM = """Du vergleichst Paare von Compliance Controls und entscheidest ob sie Duplikate sind.
Antworte NUR mit einem JSON-Array. Fuer jedes Paar ein Objekt:
{"pair_id": "...", "decision": "DUPLIKAT" oder "VERSCHIEDEN", "reason": "kurze Begruendung"}
DUPLIKAT = gleiche Anforderung, nur anders formuliert.
VERSCHIEDEN = unterschiedliche Anforderungen, auch wenn aehnliche Woerter vorkommen."""
class ReviewVerifyRequest(BaseModel):
    """Request body for POST /generate/review-verify."""
    # Maximum number of pending review rows to process; 0 = no LIMIT (all).
    limit: int = 0
    # Number of review pairs packed into a single LLM prompt/request.
    batch_size: int = 10
    # When True, only report how many rows/requests WOULD be sent — no batch
    # is submitted and nothing is written to the database.
    dry_run: bool = True
# In-process job registry: job_id -> latest progress/result dict for that run.
# NOTE(review): process-local and never evicted — entries accumulate for the
# lifetime of the worker; lost on restart. Confirm this is acceptable.
_review_verify_status: dict[str, dict] = {}
async def _run_review_verify(req: ReviewVerifyRequest, job_id: str) -> None:
    """Background worker for /generate/review-verify.

    Loads pending rows from ``control_dedup_reviews``, packs them into
    prompts of ``req.batch_size`` pairs each, submits a single Anthropic
    batch, polls until it ends, then writes each DUPLIKAT/VERSCHIEDEN
    decision back into ``review_status``/``review_notes``.

    Progress and outcome are published to ``_review_verify_status[job_id]``;
    failures are recorded there as {"status": "failed", ...} rather than
    raised to the caller.
    """
    # Hoisted to function scope — these were previously (re-)imported inside
    # the per-result loop.
    import asyncio as aio
    import json as jmod
    import re

    from services.decomposition_pass import (
        create_anthropic_batch, fetch_batch_results, check_batch_status,
    )

    # Matches flat (non-nested) JSON objects in the model's reply text.
    # The system prompt promises flat objects, so nesting is not handled.
    json_obj_re = re.compile(r"\{[^}]+\}")

    db = SessionLocal()
    try:
        _review_verify_status[job_id] = {"status": "loading"}

        query = """
            SELECT r.id::text, r.candidate_control_id, r.candidate_title,
                   r.matched_control_id, c2.title as matched_title,
                   r.similarity_score
            FROM control_dedup_reviews r
            LEFT JOIN canonical_controls c2 ON c2.id = r.matched_control_uuid
            WHERE r.review_status = 'pending'
            ORDER BY r.similarity_score DESC
        """
        params: dict = {}
        if req.limit > 0:
            # Bound parameter instead of f-string interpolation into SQL.
            query += " LIMIT :limit"
            params["limit"] = req.limit
        rows = db.execute(text(query), params).fetchall()
        total = len(rows)
        _review_verify_status[job_id] = {"status": "preparing", "total": total}

        if total == 0:
            _review_verify_status[job_id] = {
                "status": "completed", "total": 0, "message": "No pending reviews",
            }
            return

        if req.dry_run:
            # Report the work that WOULD be submitted, then stop.
            _review_verify_status[job_id] = {
                "status": "dry_run", "total": total,
                "estimated_requests": (total + req.batch_size - 1) // req.batch_size,
            }
            return

        # --- Build one batch request per group of req.batch_size pairs ------
        api_requests: list[dict] = []
        pair_map: dict[str, list[dict]] = {}  # custom_id -> ordered pair metadata
        for i in range(0, total, req.batch_size):
            chunk = rows[i:i + req.batch_size]
            prompt = "Vergleiche diese Control-Paare:\n\n"
            chunk_pairs = []
            for r in chunk:
                pair_id = r[0][:8]  # short review id the model echoes back
                # Delimit control id and title explicitly — they were
                # previously concatenated with no separator, which made the
                # prompt text ambiguous.
                prompt += (
                    f"Paar {pair_id}:\n"
                    f"  A: [{r[1]}] {r[2]}\n"
                    f"  B: [{r[3]}] {r[4]}\n"
                    f"  Similarity: {r[5]:.3f}\n\n"
                )
                chunk_pairs.append({"review_id": r[0], "candidate_id": r[1]})
            custom_id = f"rv_b{i // req.batch_size:05d}"
            pair_map[custom_id] = chunk_pairs
            api_requests.append({
                "custom_id": custom_id,
                "params": {
                    "model": "claude-haiku-4-5-20251001",
                    # ~150 output tokens per pair, with a floor of 1024.
                    "max_tokens": max(1024, len(chunk) * 150),
                    "system": [{
                        "type": "text",
                        "text": _REVIEW_VERIFY_SYSTEM,
                        # Cache the shared system prompt across requests.
                        "cache_control": {"type": "ephemeral"},
                    }],
                    "messages": [{"role": "user", "content": prompt}],
                },
            })

        _review_verify_status[job_id] = {
            "status": "submitting", "total": total, "requests": len(api_requests),
        }
        batch_result = await create_anthropic_batch(api_requests)
        batch_id = batch_result.get("id", "")
        _review_verify_status[job_id] = {
            "status": "batch_submitted", "batch_id": batch_id,
            "total": total, "requests": len(api_requests),
        }

        # Poll every 10s for up to ~2 hours. If the batch has not ended by
        # then we still attempt the fetch (original behavior); any resulting
        # error is surfaced through the outer exception handler.
        for _ in range(720):
            await aio.sleep(10)
            status = await check_batch_status(batch_id)
            if status.get("processing_status") == "ended":
                break

        # --- Apply the model's decisions back to the review table -----------
        results = await fetch_batch_results(batch_id)
        duplicates = different = errors = 0
        for result in results:
            custom_id = result.get("custom_id", "")
            result_data = result.get("result", {})
            if result_data.get("type") != "succeeded":
                errors += 1
                continue
            content = result_data.get("message", {}).get("content", [])
            text_content = content[0].get("text", "") if content else ""
            try:
                pairs = pair_map.get(custom_id, [])
                # Prefer matching on the echoed pair_id; fall back to input
                # order (the original behavior) if the model omits or mangles
                # the id.
                by_pair_id = {p["review_id"][:8]: p for p in pairs}
                for j, match_str in enumerate(json_obj_re.findall(text_content)):
                    try:
                        parsed = jmod.loads(match_str)
                    except Exception:
                        continue
                    pair = by_pair_id.get(str(parsed.get("pair_id", ""))[:8])
                    if pair is None and j < len(pairs):
                        pair = pairs[j]
                    if pair is None:
                        continue
                    decision = parsed.get("decision", "").upper()
                    # Substring check kept from the original; the prompt
                    # constrains decisions to DUPLIKAT/VERSCHIEDEN.
                    is_duplicate = "DUPLIKAT" in decision
                    db.execute(text("""
                        UPDATE control_dedup_reviews
                        SET review_status = :status, review_notes = :notes
                        WHERE id = CAST(:rid AS uuid)
                    """), {
                        "rid": pair["review_id"],
                        "status": "duplicate" if is_duplicate else "different",
                        "notes": parsed.get("reason", ""),
                    })
                    if is_duplicate:
                        duplicates += 1
                    else:
                        different += 1
                db.commit()
            except Exception as e:
                logger.error("Review verify parse error: %s", e)
                errors += 1
                try:
                    db.rollback()
                except Exception:
                    pass

        _review_verify_status[job_id] = {
            "status": "completed", "batch_id": batch_id, "total": total,
            "duplicates": duplicates, "different": different, "errors": errors,
        }
    except Exception as e:
        logger.error("Review verify %s failed: %s", job_id, e)
        _review_verify_status[job_id] = {"status": "failed", "error": str(e)}
    finally:
        db.close()
@router.post("/generate/review-verify")
async def start_review_verify(req: ReviewVerifyRequest):
    """Kick off background LLM verification (DUPLIKAT/VERSCHIEDEN) of review
    candidates via the Haiku Batch API and return immediately with a job id.
    """
    import uuid as uuid_mod

    # Short, human-pasteable job handle (first 8 hex chars of a UUID4).
    new_job = str(uuid_mod.uuid4())[:8]
    _review_verify_status[new_job] = {"status": "starting"}
    asyncio.create_task(_run_review_verify(req, new_job))

    response = {"status": "running", "job_id": new_job}
    response["message"] = f"Poll /generate/review-verify-status/{new_job}"
    return response
@router.get("/generate/review-verify-status/{job_id}")
async def get_review_verify_status(job_id: str):
    """Return the latest progress/result dict for a review-verify job."""
    try:
        return _review_verify_status[job_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Review verify job not found") from None