feat(control-pipeline): add repair backfill endpoint for missing title/objective/requirements

POST /v1/canonical/generate/backfill-repair uses Anthropic API to
generate missing fields from available context (source text, other
fields). Handles 1,470 controls with incomplete data.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-04-22 07:06:19 +02:00
parent 24e57f558e
commit 8b7671d310

View File

@@ -1104,6 +1104,238 @@ async def get_source_type_backfill_status(backfill_id: str):
return status
# =============================================================================
# REPAIR BACKFILL — Fix controls with missing title/objective/requirements
# =============================================================================
class RepairBackfillRequest(BaseModel):
    """Request body that configures a repair backfill run."""

    # Preview mode: when True the run proposes repairs but writes nothing
    # to the database.
    dry_run: bool = True
    # Maximum number of controls to load for repair; 0 means "no limit".
    limit: int = 0
    # Number of controls bundled into a single LLM request.
    batch_size: int = 10
# In-memory registry of repair backfill jobs, keyed by backfill_id. Each value
# is the latest status snapshot written by _run_repair_backfill.
_repair_backfill_status: dict = {}

# Text forms of ``requirements`` (selected via ``::text``) that count as empty.
_REPAIR_EMPTY_REQUIREMENTS = ("[]", "null", "None")


def _repair_missing_fields(row) -> list:
    """Return the names of the repairable fields that are empty on *row*.

    A field counts as missing when it is falsy or holds the literal string
    "None"; ``requirements`` additionally counts as missing for "[]" / "null".
    """
    missing = []
    if not row.title or row.title == "None":
        missing.append("title")
    if not row.objective or row.objective == "None":
        missing.append("objective")
    if not row.requirements or row.requirements in _REPAIR_EMPTY_REQUIREMENTS:
        missing.append("requirements")
    return missing


def _build_repair_entry(position: int, row, missing: list) -> str:
    """Render one control as a prompt section.

    *position* is the 1-based index of the control inside its batch; the model
    echoes it back as ``control_index`` so answers can be matched to rows.
    """
    available = []
    if "title" not in missing:
        available.append(f"Titel: {row.title}")
    if "objective" not in missing:
        available.append(f"Objective: {row.objective[:500]}")
    if "requirements" not in missing:
        available.append(f"Requirements: {row.requirements[:500]}")
    if row.source_original_text and len(row.source_original_text) > 20:
        available.append(f"Quelltext: {row.source_original_text[:800]}")
    if row.category:
        available.append(f"Kategorie: {row.category}")
    # One context item per line (previously joined with the literal text
    # 'chr(10)', which glued everything into a single garbled line).
    context = "\n".join(available)
    return (
        f"--- CONTROL {position}: {row.control_id} ---\n"
        f"Fehlend: {', '.join(missing)}\n"
        f"{context}\n"
    )


def _build_repair_prompt(batch) -> str:
    """Build the user prompt asking the model to fill the gaps of *batch*."""
    entries = "\n".join(
        _build_repair_entry(position, row, _repair_missing_fields(row))
        for position, row in enumerate(batch, start=1)
    )
    return f"""Repariere die folgenden {len(batch)} Compliance-Controls. Fuer jedes Control fehlen bestimmte Felder.
Regeln:
- title: Kurzer, praegnanter deutscher Titel (max 80 Zeichen). Erster Buchstabe gross.
- objective: 1-2 Saetze die das Ziel des Controls beschreiben.
- requirements: JSON-Array mit 2-5 konkreten Anforderungen als Strings.
- Nur die fehlenden Felder generieren. Bestehende Felder NICHT aendern.
- Wenn nicht genug Kontext vorhanden ist, schreibe "SKIP" als Wert.
Antworte mit einem JSON-Array. Jedes Objekt hat:
- control_index: 1-basierter Index
- title: (nur wenn fehlend, sonst null)
- objective: (nur wenn fehlend, sonst null)
- requirements: (nur wenn fehlend, sonst null — als JSON-Array von Strings)
{entries}"""


def _usable_repair_text(value):
    """Return *value* stripped if it is a real text answer, else None.

    Non-strings, blank strings and the "SKIP" marker are all rejected.
    """
    if not isinstance(value, str):
        return None
    cleaned = value.strip()
    if not cleaned or cleaned == "SKIP":
        return None
    return cleaned


def _collect_repair_updates(row, item: dict):
    """Translate one model answer into SQL SET fragments and bind parameters.

    Only fields that are actually missing on *row* are ever touched, no matter
    what the model returned. Returns ``(assignments, params)``; an empty
    ``assignments`` list means there is nothing usable for this control.
    """
    missing = _repair_missing_fields(row)
    assignments = []
    params = {"cid": str(row.id)}

    new_title = _usable_repair_text(item.get("title"))
    if new_title and "title" in missing:
        assignments.append("title = :title")
        params["title"] = new_title

    new_objective = _usable_repair_text(item.get("objective"))
    if new_objective and "objective" in missing:
        assignments.append("objective = :objective")
        params["objective"] = new_objective

    new_requirements = item.get("requirements")
    if (
        "requirements" in missing
        and isinstance(new_requirements, list)
        and new_requirements
    ):
        assignments.append("requirements = CAST(:requirements AS jsonb)")
        params["requirements"] = json.dumps(new_requirements)

    return assignments, params


async def _repair_batch(client, db, batch, headers: dict, model: str, dry_run: bool):
    """Ask the model to repair one batch and apply the usable answers.

    Returns ``(repaired, skipped)`` for the batch. Raises on a non-200 API
    response, an unparseable answer, or any database error; the caller is
    expected to roll back and record the error. Commits once per batch unless
    *dry_run* is set.
    """
    payload = {
        "model": model,
        "max_tokens": 4096,
        "system": "Du bist ein Compliance-Experte. Repariere unvollstaendige Controls. Antworte NUR mit validem JSON.",
        "messages": [{"role": "user", "content": _build_repair_prompt(batch)}],
    }
    resp = await client.post(
        "https://api.anthropic.com/v1/messages",
        headers=headers,
        json=payload,
    )
    if resp.status_code != 200:
        raise RuntimeError(f"API {resp.status_code}")

    # Guard against an empty "content" list, which would otherwise IndexError.
    blocks = resp.json().get("content") or [{}]
    parsed = _parse_llm_json(blocks[0].get("text", ""))
    if not parsed:
        raise RuntimeError("JSON parse failed")

    # The model sometimes answers with a single object instead of an array.
    items = parsed if isinstance(parsed, list) else [parsed]

    repaired = 0
    skipped = 0
    seen = set()
    for item in items:
        if not isinstance(item, dict):
            continue
        try:
            idx = int(item.get("control_index", 0)) - 1
        except (TypeError, ValueError):
            continue
        # Ignore out-of-range indexes and repeated answers for the same control.
        if idx < 0 or idx >= len(batch) or idx in seen:
            continue
        seen.add(idx)

        assignments, params = _collect_repair_updates(batch[idx], item)
        if not assignments:
            skipped += 1
            continue

        if not dry_run:
            assignments.append("updated_at = NOW()")
            db.execute(text(f"""
                UPDATE compliance.canonical_controls
                SET {', '.join(assignments)}
                WHERE id = CAST(:cid AS uuid)
            """), params)
        repaired += 1

    if not dry_run:
        db.commit()
    return repaired, skipped


async def _run_repair_backfill(req: RepairBackfillRequest, backfill_id: str):
    """Repair controls with missing title, objective, or requirements using Anthropic API.

    Loads draft controls that lack at least one of the three fields, sends
    them to the Anthropic Messages API in batches of ``req.batch_size`` and,
    unless ``req.dry_run`` is set, writes the generated values back. Progress
    and the final result are published in ``_repair_backfill_status`` under
    *backfill_id*; the coroutine itself returns nothing and does not raise.

    Counters are only advanced for batches that finished successfully, so a
    rolled-back batch never shows up as "repaired".
    """
    import os
    import httpx

    api_key = os.getenv("ANTHROPIC_API_KEY", "")
    model = os.getenv("CONTROL_GEN_ANTHROPIC_MODEL", "claude-sonnet-4-6")

    if not api_key:
        _repair_backfill_status[backfill_id] = {
            "status": "failed", "error": "ANTHROPIC_API_KEY not set"
        }
        return

    # A zero step would crash range(); a negative one would silently do nothing.
    if req.batch_size < 1:
        _repair_backfill_status[backfill_id] = {
            "status": "failed", "error": "batch_size must be >= 1"
        }
        return

    headers = {
        "x-api-key": api_key,
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }

    db = SessionLocal()
    try:
        # Find controls needing repair: missing title OR missing objective OR missing requirements
        limit_clause = "LIMIT :limit" if req.limit > 0 else ""
        query_params = {"limit": req.limit} if req.limit > 0 else {}
        rows = db.execute(text(f"""
            SELECT id, control_id, title, objective, requirements::text as requirements,
                   source_original_text, tags::text as tags, category
            FROM compliance.canonical_controls
            WHERE release_state = 'draft'
              AND (
                (title IS NULL OR title = 'None' OR title = '')
                OR (objective IS NULL OR objective = 'None' OR objective = '')
                OR (requirements IS NULL OR requirements::text = '[]' OR requirements::text = 'null')
              )
            ORDER BY control_id
            {limit_clause}
        """), query_params).fetchall()

        total = len(rows)
        repaired = 0
        skipped = 0
        errors = []

        _repair_backfill_status[backfill_id] = {
            "status": "running", "total": total, "repaired": 0, "skipped": 0,
            "dry_run": req.dry_run, "errors": [],
        }

        # One client for the whole run so connections are reused across batches.
        async with httpx.AsyncClient(timeout=120.0) as client:
            for i in range(0, total, req.batch_size):
                batch = rows[i:i + req.batch_size]
                try:
                    batch_repaired, batch_skipped = await _repair_batch(
                        client, db, batch, headers, model, req.dry_run
                    )
                except Exception as e:
                    # Batch boundary: one failing batch must not abort the run.
                    errors.append(f"Batch {i}: {str(e)[:200]}")
                    logger.warning("Repair backfill batch %d error: %s", i, e)
                    db.rollback()
                else:
                    repaired += batch_repaired
                    skipped += batch_skipped

                # Refresh the snapshot after every batch, failed or not.
                _repair_backfill_status[backfill_id] = {
                    "status": "running", "total": total, "repaired": repaired, "skipped": skipped,
                    "progress": f"{min(i + req.batch_size, total)}/{total}",
                    "dry_run": req.dry_run, "errors": errors[-10:],
                }

        _repair_backfill_status[backfill_id] = {
            "status": "completed", "total": total, "repaired": repaired, "skipped": skipped,
            "dry_run": req.dry_run, "errors": errors[-50:],
        }
        logger.info("Repair backfill %s completed: %d/%d repaired, %d skipped",
                    backfill_id, repaired, total, skipped)

    except Exception as e:
        logger.error("Repair backfill %s failed: %s", backfill_id, e)
        _repair_backfill_status[backfill_id] = {"status": "failed", "error": str(e)}
    finally:
        db.close()
# Strong references to in-flight repair tasks. The event loop only keeps weak
# references to tasks, so a task nobody else references may be garbage
# collected before it finishes.
_repair_backfill_tasks: set = set()


@router.post("/generate/backfill-repair")
async def start_repair_backfill(req: RepairBackfillRequest):
    """Repair controls with missing title, objective, or requirements using Anthropic API.

    Finds draft controls where title/objective/requirements are missing or empty,
    and generates the missing fields from available context (source text, other fields).
    Default is dry_run=True (preview only, no DB changes).

    The work runs as a background task; the response only carries the
    ``backfill_id`` to poll for progress.
    """
    import uuid

    backfill_id = str(uuid.uuid4())[:8]
    _repair_backfill_status[backfill_id] = {"status": "starting"}

    # Hold on to the task until it is done, then drop the reference again.
    task = asyncio.create_task(_run_repair_backfill(req, backfill_id))
    _repair_backfill_tasks.add(task)
    task.add_done_callback(_repair_backfill_tasks.discard)

    return {
        "status": "running",
        "backfill_id": backfill_id,
        "message": f"Repair backfill started. Poll /generate/repair-backfill-status/{backfill_id}",
    }
@router.get("/generate/repair-backfill-status/{backfill_id}")
async def get_repair_backfill_status(backfill_id: str):
    """Return the latest status snapshot of a repair backfill job.

    Responds with HTTP 404 when no snapshot is registered for *backfill_id*.
    """
    snapshot = _repair_backfill_status.get(backfill_id)
    if snapshot:
        return snapshot
    raise HTTPException(status_code=404, detail="Repair backfill job not found")
# =============================================================================
# ANCHOR BACKFILL
# =============================================================================