# -----------------------------------------------------------------------------
# Provenance: reconstructed from a `git format-patch` e-mail whose line breaks
# had been collapsed.
#
#   Commit:  8b7671d31047d582197aa24ce5d6b259f676b52c
#   From:    Benjamin Admin
#   Date:    Wed, 22 Apr 2026 07:06:19 +0200
#   Subject: feat(control-pipeline): add repair backfill endpoint for missing
#            title/objective/requirements
#
#   POST /v1/canonical/generate/backfill-repair uses Anthropic API to generate
#   missing fields from available context (source text, other fields).
#   Handles 1,470 controls with incomplete data.
#
#   Co-Authored-By: Claude Opus 4.6 (1M context)
#
# Target file: control-pipeline/api/control_generator_routes.py; the section is
# inserted after `get_source_type_backfill_status` and before the
# "ANCHOR BACKFILL" section.
#
# The code below uses `asyncio`, `json`, `logger`, `text`, `SessionLocal`,
# `router`, `HTTPException`, `BaseModel` and `_parse_llm_json` without
# importing them; they are expected to be provided by the target module.
# -----------------------------------------------------------------------------


# =============================================================================
# REPAIR BACKFILL — Fix controls with missing title/objective/requirements
# =============================================================================

class RepairBackfillRequest(BaseModel):
    """Request body for the repair backfill endpoint."""

    # When true, no UPDATE is executed and nothing is committed (preview only).
    dry_run: bool = True
    # Maximum number of controls to select.
    limit: int = 0  # 0 = all
    # Number of controls sent to the LLM in one request.
    batch_size: int = 10


# Latest status dict per backfill id. In-memory only.
_repair_backfill_status: dict = {}

# Strong references to running jobs. The event loop only keeps weak references
# to tasks, so a task that is not referenced anywhere may be garbage-collected
# before it finishes.
_repair_backfill_tasks: set = set()

# Textual forms of `requirements::text` that count as "no requirements".
_REPAIR_EMPTY_REQUIREMENTS = ("[]", "null", "None")


def _repair_text_missing(value) -> bool:
    """Return True when a title/objective value counts as missing."""
    return not value or value == "None"


def _repair_requirements_missing(value) -> bool:
    """Return True when the requirements text counts as missing."""
    return not value or value in _REPAIR_EMPTY_REQUIREMENTS


def _repair_usable_text(value) -> bool:
    """Return True when an LLM-provided text value may be written to the DB."""
    return isinstance(value, str) and bool(value.strip()) and value.strip() != "SKIP"


def _build_repair_entry(position: int, row) -> str:
    """Render one control (1-based *position* in its batch) for the prompt.

    Lists which fields are missing and every piece of context the row already
    has, one item per line.
    """
    available = []
    if not _repair_text_missing(row.title):
        available.append(f"Titel: {row.title}")
    if not _repair_text_missing(row.objective):
        available.append(f"Objective: {row.objective[:500]}")
    if not _repair_requirements_missing(row.requirements):
        available.append(f"Requirements: {row.requirements[:500]}")
    if row.source_original_text and len(row.source_original_text) > 20:
        available.append(f"Quelltext: {row.source_original_text[:800]}")
    if row.category:
        available.append(f"Kategorie: {row.category}")

    missing = []
    if _repair_text_missing(row.title):
        missing.append("title")
    if _repair_text_missing(row.objective):
        missing.append("objective")
    if _repair_requirements_missing(row.requirements):
        missing.append("requirements")

    # Context items are separated by real newlines. (The original joined them
    # with the literal text 'chr(10)', which garbled the prompt.)
    context = "\n".join(available)
    return (
        f"--- CONTROL {position}: {row.control_id} ---\n"
        f"Fehlend: {', '.join(missing)}\n"
        f"{context}\n"
    )


def _build_repair_prompt(batch) -> str:
    """Build the user prompt asking the LLM to fill in the batch's missing fields."""
    entries = "\n".join(
        _build_repair_entry(position, row)
        for position, row in enumerate(batch, start=1)
    )
    return f"""Repariere die folgenden {len(batch)} Compliance-Controls. Fuer jedes Control fehlen bestimmte Felder.

Regeln:
- title: Kurzer, praegnanter deutscher Titel (max 80 Zeichen). Erster Buchstabe gross.
- objective: 1-2 Saetze die das Ziel des Controls beschreiben.
- requirements: JSON-Array mit 2-5 konkreten Anforderungen als Strings.
- Nur die fehlenden Felder generieren. Bestehende Felder NICHT aendern.
- Wenn nicht genug Kontext vorhanden ist, schreibe "SKIP" als Wert.

Antworte mit einem JSON-Array. Jedes Objekt hat:
- control_index: 1-basierter Index
- title: (nur wenn fehlend, sonst null)
- objective: (nur wenn fehlend, sonst null)
- requirements: (nur wenn fehlend, sonst null — als JSON-Array von Strings)

{entries}"""


async def _request_repair_items(client, headers: dict, model: str, prompt: str):
    """Send one prompt to the Anthropic Messages API.

    Returns:
        ``(items, None)`` on success, where *items* is a list of parsed
        response objects, or ``(None, reason)`` when the API answered with a
        non-200 status or the answer could not be parsed as JSON.
    """
    payload = {
        "model": model,
        "max_tokens": 4096,
        "system": "Du bist ein Compliance-Experte. Repariere unvollstaendige Controls. Antworte NUR mit validem JSON.",
        "messages": [{"role": "user", "content": prompt}],
    }
    resp = await client.post(
        "https://api.anthropic.com/v1/messages",
        headers=headers,
        json=payload,
    )
    if resp.status_code != 200:
        return None, f"API {resp.status_code}"

    # Tolerate a missing or empty "content" list instead of raising IndexError.
    blocks = resp.json().get("content") or [{}]
    parsed = _parse_llm_json(blocks[0].get("text", ""))
    if not parsed:
        return None, "JSON parse failed"

    # Handle single object response
    return (parsed if isinstance(parsed, list) else [parsed]), None


def _collect_repair_updates(item: dict, row):
    """Work out which columns of *row* can be filled from the LLM *item*.

    Only fields that are missing on the row are ever touched; values of
    ``"SKIP"``, empty values and values of the wrong type are ignored.

    Returns:
        ``(assignments, params)`` — SQL ``SET`` fragments and their bind
        parameters. *assignments* is empty when nothing can be repaired.
    """
    assignments = []
    params = {}

    new_title = item.get("title")
    if _repair_usable_text(new_title) and _repair_text_missing(row.title):
        assignments.append("title = :title")
        params["title"] = new_title

    new_objective = item.get("objective")
    if _repair_usable_text(new_objective) and _repair_text_missing(row.objective):
        assignments.append("objective = :objective")
        params["objective"] = new_objective

    new_requirements = item.get("requirements")
    if (
        isinstance(new_requirements, list)
        and new_requirements
        and _repair_requirements_missing(row.requirements)
    ):
        assignments.append("requirements = CAST(:requirements AS jsonb)")
        params["requirements"] = json.dumps(new_requirements)

    return assignments, params


def _apply_repair_items(db, batch, items, dry_run: bool):
    """Apply the LLM answers for one batch (without committing).

    Malformed items (not an object, no usable ``control_index``, index outside
    the batch) are ignored rather than failing the whole batch.

    Returns:
        ``(repaired, skipped)`` counts for this batch. In a dry run the
        counts are computed but no UPDATE is executed.
    """
    repaired = 0
    skipped = 0
    for item in items:
        if not isinstance(item, dict):
            continue
        try:
            idx = int(item.get("control_index", 0)) - 1
        except (TypeError, ValueError):
            continue
        if not 0 <= idx < len(batch):
            continue

        row = batch[idx]
        assignments, params = _collect_repair_updates(item, row)
        if not assignments:
            skipped += 1
            continue

        if not dry_run:
            assignments.append("updated_at = NOW()")
            params["cid"] = str(row.id)
            # The SET fragments are fixed strings produced by
            # _collect_repair_updates; all values go through bind parameters.
            db.execute(text(f"""
                UPDATE compliance.canonical_controls
                SET {', '.join(assignments)}
                WHERE id = CAST(:cid AS uuid)
            """), params)

        repaired += 1
    return repaired, skipped


async def _run_repair_backfill(req: RepairBackfillRequest, backfill_id: str):
    """Repair controls with missing title, objective, or requirements using Anthropic API.

    Selects draft controls with at least one missing field, sends them to the
    LLM in batches of ``req.batch_size`` and writes the generated values back
    (unless ``req.dry_run``). Progress and the final result are published in
    ``_repair_backfill_status[backfill_id]``; the coroutine itself returns
    nothing and does not propagate exceptions.
    """
    import os
    import httpx

    api_key = os.getenv("ANTHROPIC_API_KEY", "")
    model = os.getenv("CONTROL_GEN_ANTHROPIC_MODEL", "claude-sonnet-4-6")

    if not api_key:
        _repair_backfill_status[backfill_id] = {
            "status": "failed", "error": "ANTHROPIC_API_KEY not set"
        }
        return

    headers = {
        "x-api-key": api_key,
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }
    # A batch size below 1 would make range() raise (0) or select nothing
    # (negative); fall back to one control per request instead.
    batch_size = max(1, req.batch_size)

    db = SessionLocal()
    try:
        # Find controls needing repair: missing title OR missing objective OR missing requirements
        query_params = {}
        limit_clause = ""
        if req.limit > 0:
            limit_clause = "LIMIT :row_limit"
            query_params["row_limit"] = req.limit
        rows = db.execute(text(f"""
            SELECT id, control_id, title, objective, requirements::text as requirements,
                   source_original_text, tags::text as tags, category
            FROM compliance.canonical_controls
            WHERE release_state = 'draft'
              AND (
                (title IS NULL OR title = 'None' OR title = '')
                OR (objective IS NULL OR objective = 'None' OR objective = '')
                OR (requirements IS NULL OR requirements::text = '[]' OR requirements::text = 'null')
              )
            ORDER BY control_id
            {limit_clause}
        """), query_params).fetchall()

        total = len(rows)
        repaired = 0
        skipped = 0
        errors = []

        _repair_backfill_status[backfill_id] = {
            "status": "running", "total": total, "repaired": 0, "skipped": 0,
            "dry_run": req.dry_run, "errors": [],
        }

        # One client for the whole job so connections can be reused.
        async with httpx.AsyncClient(timeout=120.0) as client:
            for i in range(0, total, batch_size):
                batch = rows[i:i + batch_size]

                try:
                    items, failure = await _request_repair_items(
                        client, headers, model, _build_repair_prompt(batch)
                    )
                    if failure:
                        errors.append(f"Batch {i}: {failure}")
                    else:
                        batch_repaired, batch_skipped = _apply_repair_items(
                            db, batch, items, req.dry_run
                        )
                        if not req.dry_run:
                            db.commit()
                        # Count only after the commit succeeded so that a
                        # rolled-back batch is never reported as repaired.
                        repaired += batch_repaired
                        skipped += batch_skipped
                except Exception as e:
                    # Best effort: one failing batch must not stop the job.
                    errors.append(f"Batch {i}: {str(e)[:200]}")
                    logger.warning("Repair backfill batch %d error: %s", i, e)
                    db.rollback()

                # Published after every batch, including failed ones, so that
                # polling clients see progress and errors promptly.
                _repair_backfill_status[backfill_id] = {
                    "status": "running", "total": total, "repaired": repaired, "skipped": skipped,
                    "progress": f"{min(i + batch_size, total)}/{total}",
                    "dry_run": req.dry_run, "errors": errors[-10:],
                }

        _repair_backfill_status[backfill_id] = {
            "status": "completed", "total": total, "repaired": repaired, "skipped": skipped,
            "dry_run": req.dry_run, "errors": errors[-50:],
        }
        logger.info("Repair backfill %s completed: %d/%d repaired, %d skipped",
                    backfill_id, repaired, total, skipped)

    except Exception as e:
        logger.error("Repair backfill %s failed: %s", backfill_id, e)
        _repair_backfill_status[backfill_id] = {"status": "failed", "error": str(e)}
    finally:
        db.close()


@router.post("/generate/backfill-repair")
async def start_repair_backfill(req: RepairBackfillRequest):
    """Repair controls with missing title, objective, or requirements using Anthropic API.

    Finds draft controls where title/objective/requirements are missing or empty,
    and generates the missing fields from available context (source text, other fields).
    Default is dry_run=True (preview only, no DB changes).

    Returns immediately with a ``backfill_id``; the work runs as a background
    task whose state can be polled via the status endpoint.
    """
    import uuid

    backfill_id = str(uuid.uuid4())[:8]
    _repair_backfill_status[backfill_id] = {"status": "starting"}

    # Keep a reference until the task is done; see _repair_backfill_tasks.
    task = asyncio.create_task(_run_repair_backfill(req, backfill_id))
    _repair_backfill_tasks.add(task)
    task.add_done_callback(_repair_backfill_tasks.discard)

    return {
        "status": "running",
        "backfill_id": backfill_id,
        "message": f"Repair backfill started. Poll /generate/repair-backfill-status/{backfill_id}",
    }


@router.get("/generate/repair-backfill-status/{backfill_id}")
async def get_repair_backfill_status(backfill_id: str):
    """Get status of a repair backfill job.

    Raises:
        HTTPException: 404 when no job with this id is known.
    """
    status = _repair_backfill_status.get(backfill_id)
    if not status:
        raise HTTPException(status_code=404, detail="Repair backfill job not found")
    return status


# =============================================================================
# ANCHOR BACKFILL
# =============================================================================