diff --git a/control-pipeline/api/control_generator_routes.py b/control-pipeline/api/control_generator_routes.py index 5a46979..13d420d 100644 --- a/control-pipeline/api/control_generator_routes.py +++ b/control-pipeline/api/control_generator_routes.py @@ -2715,3 +2715,199 @@ async def get_quality_metrics( } finally: db.close() + + +# ============================================================================= +# REVIEW CANDIDATE VERIFICATION (Block B — LLM decides DUPLIKAT/VERSCHIEDEN) +# ============================================================================= + +_REVIEW_VERIFY_SYSTEM = """Du vergleichst Paare von Compliance Controls und entscheidest ob sie Duplikate sind. +Antworte NUR mit einem JSON-Array. Fuer jedes Paar ein Objekt: +{"pair_id": "...", "decision": "DUPLIKAT" oder "VERSCHIEDEN", "reason": "kurze Begruendung"} +DUPLIKAT = gleiche Anforderung, nur anders formuliert. +VERSCHIEDEN = unterschiedliche Anforderungen, auch wenn aehnliche Woerter vorkommen.""" + + +class ReviewVerifyRequest(BaseModel): + limit: int = 0 + batch_size: int = 10 + dry_run: bool = True + + +_review_verify_status: dict = {} + + +async def _run_review_verify(req: ReviewVerifyRequest, job_id: str): + from services.decomposition_pass import ( + create_anthropic_batch, fetch_batch_results, check_batch_status, + ) + import asyncio as aio + db = SessionLocal() + try: + _review_verify_status[job_id] = {"status": "loading"} + + query = """ + SELECT r.id::text, r.candidate_control_id, r.candidate_title, + r.matched_control_id, c2.title as matched_title, + r.similarity_score + FROM control_dedup_reviews r + LEFT JOIN canonical_controls c2 ON c2.id = r.matched_control_uuid + WHERE r.review_status = 'pending' + ORDER BY r.similarity_score DESC + """ + if req.limit > 0: + query += f" LIMIT {req.limit}" + + rows = db.execute(text(query)).fetchall() + total = len(rows) + _review_verify_status[job_id] = {"status": "preparing", "total": total} + + if total == 0: + _review_verify_status[job_id] = { + "status": "completed", "total": 0, "message": "No pending reviews", + } + return + + if req.dry_run: + _review_verify_status[job_id] = { + "status": "dry_run", "total": total, + "estimated_requests": (total + req.batch_size - 1) // req.batch_size, + } + return + + # Build batch requests + api_requests = [] + pair_map = {} + for i in range(0, total, req.batch_size): + batch = rows[i:i + req.batch_size] + prompt = "Vergleiche diese Control-Paare:\n\n" + batch_pairs = [] + for r in batch: + pair_id = r[0][:8] + prompt += ( + f"Paar {pair_id}:\n" + f" A: {r[1]} — {r[2]}\n" + f" B: {r[3]} — {r[4]}\n" + f" Similarity: {r[5]:.3f}\n\n" + ) + batch_pairs.append({"review_id": r[0], "candidate_id": r[1]}) + + batch_idx = i // req.batch_size + custom_id = f"rv_b{batch_idx:05d}" + pair_map[custom_id] = batch_pairs + api_requests.append({ + "custom_id": custom_id, + "params": { + "model": "claude-haiku-4-5-20251001", + "max_tokens": max(1024, len(batch) * 150), + "system": [{ + "type": "text", + "text": _REVIEW_VERIFY_SYSTEM, + "cache_control": {"type": "ephemeral"}, + }], + "messages": [{"role": "user", "content": prompt}], + }, + }) + + _review_verify_status[job_id] = { + "status": "submitting", "total": total, "requests": len(api_requests), + } + batch_result = await create_anthropic_batch(api_requests) + batch_id = batch_result.get("id", "") + _review_verify_status[job_id] = { + "status": "batch_submitted", "batch_id": batch_id, + "total": total, "requests": len(api_requests), + } + + # Poll for completion + for _ in range(720): + await aio.sleep(10) + status = await check_batch_status(batch_id) + if status.get("processing_status") == "ended": + break + + # Process results + results = await fetch_batch_results(batch_id) + duplicates = 0 + different = 0 + errors = 0 + + for result in results: + custom_id = result.get("custom_id", "") + result_data = result.get("result", {}) + if result_data.get("type") != "succeeded": + errors += 1 + continue + + content = result_data.get("message", {}).get("content", []) + text_content = content[0].get("text", "") if content else "" + + try: + import json as jmod + import re + json_matches = re.findall(r'\{[^}]+\}', text_content) + pairs = pair_map.get(custom_id, []) + + for j, match_str in enumerate(json_matches): + try: + parsed = jmod.loads(match_str) + except Exception: + continue + + decision = parsed.get("decision", "").upper() + if j < len(pairs): + review_id = pairs[j]["review_id"] + if "DUPLIKAT" in decision: + db.execute(text(""" + UPDATE control_dedup_reviews + SET review_status = 'duplicate', review_notes = :notes + WHERE id = CAST(:rid AS uuid) + """), {"rid": review_id, "notes": parsed.get("reason", "")}) + duplicates += 1 + else: + db.execute(text(""" + UPDATE control_dedup_reviews + SET review_status = 'different', review_notes = :notes + WHERE id = CAST(:rid AS uuid) + """), {"rid": review_id, "notes": parsed.get("reason", "")}) + different += 1 + + db.commit() + except Exception as e: + logger.error("Review verify parse error: %s", e) + errors += 1 + try: + db.rollback() + except Exception: + pass + + _review_verify_status[job_id] = { + "status": "completed", "batch_id": batch_id, "total": total, + "duplicates": duplicates, "different": different, "errors": errors, + } + except Exception as e: + logger.error("Review verify %s failed: %s", job_id, e) + _review_verify_status[job_id] = {"status": "failed", "error": str(e)} + finally: + db.close() + + +@router.post("/generate/review-verify") +async def start_review_verify(req: ReviewVerifyRequest): + """LLM-verify review candidates (DUPLIKAT/VERSCHIEDEN) via Haiku Batch.""" + import uuid as uuid_mod + job_id = str(uuid_mod.uuid4())[:8] + _review_verify_status[job_id] = {"status": "starting"} + asyncio.create_task(_run_review_verify(req, job_id)) + return { + "status": "running", "job_id": job_id, + "message": f"Poll /generate/review-verify-status/{job_id}", + } + + +@router.get("/generate/review-verify-status/{job_id}") +async def get_review_verify_status(job_id: str): + status = _review_verify_status.get(job_id) + if not status: + raise HTTPException(status_code=404, detail="Review verify job not found") + return status