feat(pipeline): Pass 0b prompt v4 + Haiku backfill endpoint

Prompt v4 adds 6 new fields to Pass 0b output:
- applicability: condition rules (same format as dependency engine)
- check_type: expanded to 10 granular types
- scanner_hint: search_terms + negative_indicators for MCP
- manual_review_required_if: escalation conditions
- evidence_type: code/process/hybrid
- provides_context: context variables this control creates

New endpoint POST /generate/backfill-extended:
- Backfills existing 9k controls via Haiku Batch API (~$1.50)
- Adds all 6 new fields to generation_metadata
- Supports dry_run mode

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Benjamin Admin
2026-04-26 23:14:59 +02:00
parent 96b8f25747
commit 5ef039a6bc
2 changed files with 285 additions and 4 deletions

View File

@@ -2321,6 +2321,238 @@ async def submit_pass0b(req: SubmitPass0bRequest):
db.close()
# =============================================================================
# HAIKU BACKFILL: Extended Fields (applicability, check_type, scanner_hint, etc.)
# =============================================================================
_BACKFILL_SYSTEM_PROMPT = """\
Du bist ein Security-Compliance-Experte. Du erhaeltst bestehende atomare Controls \
und ergaenzt fehlende Felder. Aendere NIEMALS bestehende Felder (title, assertion, \
objective, requirements, evidence, severity, category).
Ergaenze NUR diese 6 Felder:
1. applicability: Unter welchen Bedingungen gilt dieses Control?
Universell: {}
Bedingt: {"field": "context.SIGNAL", "op": "==", "value": true}
Zusammengesetzt: {"operator": "AND", "clauses": [{...}, {...}]}
Typische Felder: context.uses_oauth, context.has_public_api,
context.processes_personal_data, context.uses_ai_system,
context.has_employees, context.sells_online, context.uses_encryption
2. check_type: EINEN der 10 Werte:
technical_config_check, code_pattern_check, runtime_security_test,
document_policy_check, document_classification_check, document_contract_check,
evidence_artifact_check, process_verification, training_verification, interview_assessment
3. scanner_hint: {"search_terms": [...], "negative_indicators": [...]}
4. manual_review_required_if: ["Bedingung 1", "Bedingung 2"]
5. evidence_type: code|process|hybrid
6. provides_context: ["context.VARIABLE", ...] oder []
Antworte als JSON-Objekt mit Control-ID als Key."""
class BackfillExtendedRequest(BaseModel):
    """Request body for POST /generate/backfill-extended."""
    # Maximum number of controls to backfill; 0 means no limit (all matching rows).
    limit: int = 0
    # Number of controls packed into one Haiku batch request.
    batch_size: int = 10
    # When True (the default), only report what WOULD be done — no batch is submitted.
    dry_run: bool = True
# In-memory job registry: job_id -> latest progress/result payload.
# NOTE(review): process-local — jobs are lost on restart and not visible to
# other workers; confirm this is acceptable for the deployment model.
_backfill_extended_status: dict = {}
async def _run_backfill_extended(req: BackfillExtendedRequest, job_id: str):
    """Backfill the six extended fields on draft Pass-0b controls via the
    Haiku Batch API.

    Runs as a fire-and-forget background task. All progress and the final
    outcome are published to the module-level ``_backfill_extended_status``
    dict under ``job_id``; this function never raises — any failure is
    recorded there instead.

    Args:
        req: limit / batch_size / dry_run options for this run.
        job_id: key under which status updates are published.
    """
    from services.decomposition_pass import (
        create_anthropic_batch, fetch_batch_results, check_batch_status,
    )
    # Hoisted so they are not re-imported on every result iteration.
    import asyncio
    import json as json_mod
    import re

    # Guard: batch_size <= 0 would make range(0, total, step) raise ValueError.
    batch_size = max(1, req.batch_size)

    db = SessionLocal()
    try:
        _backfill_extended_status[job_id] = {"status": "loading_controls"}
        # Draft pass0b controls whose 'applicability' has never been filled.
        query = """
            SELECT id::text, control_id, title,
                   generation_metadata->>'assertion' as assertion,
                   objective, category, severity,
                   generation_metadata->>'merge_group_hint' as merge_key
            FROM canonical_controls
            WHERE release_state = 'draft'
              AND generation_metadata->>'decomposition_method' = 'pass0b'
              AND (generation_metadata->>'applicability' IS NULL
                   OR generation_metadata->>'applicability' = ''
                   OR generation_metadata->>'applicability' = '{}')
        """
        params: dict = {}
        if req.limit > 0:
            # Bound parameter instead of f-string interpolation into SQL.
            query += " LIMIT :row_limit"
            params["row_limit"] = req.limit
        rows = db.execute(text(query), params).fetchall()
        total = len(rows)
        _backfill_extended_status[job_id] = {
            "status": "preparing_batches", "total_controls": total,
        }
        if total == 0:
            _backfill_extended_status[job_id] = {
                "status": "completed", "total_controls": 0, "message": "Nothing to backfill",
            }
            return
        if req.dry_run:
            # Report the work that WOULD be done without submitting anything.
            _backfill_extended_status[job_id] = {
                "status": "dry_run_complete", "total_controls": total,
                "estimated_requests": (total + batch_size - 1) // batch_size,
            }
            return
        # Build one batch request per group of `batch_size` controls.
        requests = []
        for i in range(0, total, batch_size):
            batch = rows[i:i + batch_size]
            controls_text = ""
            for r in batch:
                controls_text += f"""
Control-ID: {r[1]}
Titel: {r[2]}
Assertion: {r[3] or ''}
Objective: {r[4] or ''}
Kategorie: {r[5] or ''}
Severity: {r[6] or ''}
Merge-Key: {r[7] or ''}
---
"""
            prompt = f"Ergaenze die fehlenden 6 Felder fuer diese {len(batch)} Controls:\n{controls_text}"
            batch_idx = i // batch_size
            requests.append({
                "custom_id": f"bf_ext_b{batch_idx:05d}",
                "params": {
                    "model": "claude-haiku-4-5-20251001",
                    # Scale the output budget with the number of controls.
                    "max_tokens": max(2048, len(batch) * 400),
                    "system": [
                        {
                            "type": "text",
                            "text": _BACKFILL_SYSTEM_PROMPT,
                            # Identical system prompt across requests -> cacheable.
                            "cache_control": {"type": "ephemeral"},
                        }
                    ],
                    "messages": [{"role": "user", "content": prompt}],
                },
            })
        _backfill_extended_status[job_id] = {
            "status": "submitting_batch", "total_controls": total,
            "total_requests": len(requests),
        }
        batch_result = await create_anthropic_batch(requests)
        batch_id = batch_result.get("id", "")
        _backfill_extended_status[job_id] = {
            "status": "batch_submitted", "batch_id": batch_id,
            "total_controls": total, "total_requests": len(requests),
        }
        # Poll until the batch ends; if it is still running after the polling
        # window, fail the job rather than fetching partial results.
        for _ in range(360):  # 360 * 10s = max 1 hour
            await asyncio.sleep(10)
            status = await check_batch_status(batch_id)
            if status.get("processing_status") == "ended":
                break
        else:
            _backfill_extended_status[job_id] = {
                "status": "failed", "batch_id": batch_id,
                "error": "Batch did not finish within the 1 hour polling window",
            }
            return
        # Merge each per-control result into generation_metadata.
        results = await fetch_batch_results(batch_id)
        updated = 0
        errors = 0
        for result in results:
            result_data = result.get("result", {})
            if result_data.get("type") != "succeeded":
                errors += 1
                continue
            message = result_data.get("message", {})
            content = message.get("content", [])
            text_content = content[0].get("text", "") if content else ""
            try:
                # Prefer a clean JSON body; fall back to extracting the first
                # {...} span (e.g. when wrapped in a markdown code fence).
                parsed = json_mod.loads(text_content) if text_content.strip().startswith("{") else {}
                if not parsed:
                    json_match = re.search(r'\{[\s\S]*\}', text_content)
                    if json_match:
                        parsed = json_mod.loads(json_match.group())
                if not parsed:
                    # Model produced no usable JSON object at all.
                    errors += 1
                    continue
                for control_id, fields in parsed.items():
                    if not isinstance(fields, dict):
                        continue
                    # jsonb `||` merge: the right-hand side wins, so these six
                    # keys overwrite any existing values in generation_metadata.
                    db.execute(text("""
                        UPDATE canonical_controls
                        SET generation_metadata = generation_metadata || CAST(:new_fields AS jsonb),
                            updated_at = NOW()
                        WHERE control_id = :cid
                          AND release_state = 'draft'
                    """), {
                        "cid": control_id,
                        "new_fields": json_mod.dumps({
                            "applicability": fields.get("applicability", {}),
                            "check_type": fields.get("check_type", ""),
                            "scanner_hint": fields.get("scanner_hint", {}),
                            "manual_review_required_if": fields.get("manual_review_required_if", []),
                            "evidence_type": fields.get("evidence_type", ""),
                            "provides_context": fields.get("provides_context", []),
                        }),
                    })
                    updated += 1
                # Commit once per batch result; one bad result rolls back only
                # its own batch, not previously committed ones.
                db.commit()
            except Exception as e:
                logger.error("Backfill parse error: %s", e)
                errors += 1
                try:
                    db.rollback()
                except Exception:
                    pass
        _backfill_extended_status[job_id] = {
            "status": "completed", "batch_id": batch_id,
            "total_controls": total, "updated": updated, "errors": errors,
        }
    except Exception as e:
        logger.error("Backfill extended %s failed: %s", job_id, e)
        _backfill_extended_status[job_id] = {"status": "failed", "error": str(e)}
    finally:
        db.close()
@router.post("/generate/backfill-extended")
async def start_backfill_extended(req: BackfillExtendedRequest):
    """Backfill extended fields (applicability, scanner_hint, etc.) via Haiku Batch API."""
    from uuid import uuid4

    # Short random job handle; clients poll the status endpoint with it.
    job_id = uuid4().hex[:8]
    _backfill_extended_status[job_id] = {"status": "starting"}
    # Fire-and-forget: the backfill runs in the background while we return.
    asyncio.create_task(_run_backfill_extended(req, job_id))
    return {
        "status": "running",
        "job_id": job_id,
        "message": f"Backfill started. Poll /generate/backfill-extended-status/{job_id}",
    }
@router.get("/generate/backfill-extended-status/{job_id}")
async def get_backfill_extended_status(job_id: str):
    """Return the current progress record for a backfill job, or 404 if unknown."""
    # Missing (and falsy/empty) entries are both treated as "not found".
    if not (job_state := _backfill_extended_status.get(job_id)):
        raise HTTPException(status_code=404, detail="Backfill job not found")
    return job_state
@router.get("/generate/quality-metrics")
async def get_quality_metrics(
since_hours: int = Query(default=0, description="Only count controls created in last N hours (0=all)"),