feat(control-pipeline): add anchor backfill endpoint + normalize target_audience
- Add POST /v1/canonical/generate/backfill-anchors endpoint for batch-populating open_anchors on controls generated with skip_web_search=true
- Uses AnchorFinder Stage A (RAG search) to find OWASP/NIST/ENISA refs
- Background job with progress tracking (same pattern as other backfills)
- Promotes needs_review controls that gain anchors to draft state
- Target audience normalization (enterprise/authority/provider → JSON arrays) already applied via SQL

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -37,6 +37,8 @@ from services.control_generator import (
|
|||||||
)
|
)
|
||||||
from services.citation_backfill import CitationBackfill, BackfillResult
|
from services.citation_backfill import CitationBackfill, BackfillResult
|
||||||
from services.rag_client import get_rag_client
|
from services.rag_client import get_rag_client
|
||||||
|
from services.anchor_finder import AnchorFinder, OpenAnchor
|
||||||
|
from services.control_generator import GeneratedControl as _GeneratedControl
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
router = APIRouter(prefix="/v1/canonical", tags=["control-generator"])
|
router = APIRouter(prefix="/v1/canonical", tags=["control-generator"])
|
||||||
@@ -1100,3 +1102,167 @@ async def get_source_type_backfill_status(backfill_id: str):
|
|||||||
if not status:
|
if not status:
|
||||||
raise HTTPException(status_code=404, detail="Source-type backfill job not found")
|
raise HTTPException(status_code=404, detail="Source-type backfill job not found")
|
||||||
return status
|
return status
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# ANCHOR BACKFILL
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
class AnchorBackfillRequest(BaseModel):
    # Request body for the anchor backfill endpoint.

    # Preview mode: anchors are searched and counted, but nothing is written.
    dry_run: bool = True

    # Maximum number of controls to process; 0 = all controls without anchors.
    limit: int = 0

    # Number of controls handled between commits / status refreshes.
    batch_size: int = 50

    # Stage A only (RAG), no DuckDuckGo.
    skip_web: bool = True

    # Also backfill needs_review controls (not just draft ones).
    include_needs_review: bool = True
|
||||||
|
|
||||||
|
|
||||||
|
# Registry of anchor backfill jobs, keyed by backfill_id. Each value is the
# most recent status dict written by the background job; the status endpoint
# returns it as-is.
# NOTE(review): module-level dict, so presumably per-process and lost on
# restart; nothing visible here ever removes finished jobs — confirm that is
# acceptable for the deployment (single worker, bounded number of runs).
_anchor_backfill_status: dict = {}
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_control_tags(raw_tags) -> list:
    """Return *raw_tags* as a list; anything unparseable becomes an empty list."""
    # Tags may come back from the DB as a JSON string or as an already-decoded list.
    if isinstance(raw_tags, str):
        try:
            raw_tags = json.loads(raw_tags)
        except (json.JSONDecodeError, ValueError):
            return []
    return raw_tags if isinstance(raw_tags, list) else []


async def _run_anchor_backfill(req: AnchorBackfillRequest, backfill_id: str):
    """Backfill open_anchors for controls that were generated with skip_web_search=true.

    Selects controls in ``draft`` (and optionally ``needs_review``) state whose
    ``open_anchors`` column is empty, runs ``AnchorFinder.find_anchors`` for each
    of them and, unless ``req.dry_run`` is set, writes the result back. Progress
    is published to ``_anchor_backfill_status[backfill_id]`` after every batch.
    Finally, ``needs_review`` controls that have anchors are promoted to ``draft``.

    Args:
        req: Backfill options (dry run, limit, batch size, web search, states).
        backfill_id: Key under which progress/result is stored in
            ``_anchor_backfill_status``.

    Returns:
        None. The outcome is reported through ``_anchor_backfill_status``; any
        unexpected exception is logged and recorded as ``status: "failed"``
        instead of being raised.
    """
    from dataclasses import asdict

    db = SessionLocal()
    try:
        rag_client = get_rag_client()
        finder = AnchorFinder(rag_client=rag_client)

        # Find controls without anchors.
        # NOTE: the single-state list must be "('draft')" — a Python-style
        # trailing comma ("('draft',)") is a syntax error in SQL.
        states = "('draft', 'needs_review')" if req.include_needs_review else "('draft')"
        # req.limit is a validated int, so interpolating it is safe; int() keeps
        # that guarantee explicit at the point where SQL text is built.
        limit_clause = f"LIMIT {int(req.limit)}" if req.limit > 0 else ""
        rows = db.execute(text(f"""
            SELECT id, control_id, title, tags
            FROM compliance.canonical_controls
            WHERE release_state IN {states}
              AND (open_anchors IS NULL OR open_anchors::text = '[]'
                   OR open_anchors::text = 'null' OR open_anchors::text = '')
            ORDER BY control_id
            {limit_clause}
        """)).fetchall()

        total = len(rows)
        updated = 0
        with_anchors = 0
        empty_anchors = 0
        errors = []

        # A batch size of 0 would make range() raise and a negative one would
        # silently process nothing, so fall back to one row per batch.
        batch_size = max(1, req.batch_size)

        _anchor_backfill_status[backfill_id] = {
            "status": "running", "total": total, "updated": 0,
            "with_anchors": 0, "empty_anchors": 0, "dry_run": req.dry_run,
        }

        for i in range(0, total, batch_size):
            batch = rows[i:i + batch_size]

            for row in batch:
                try:
                    tags = _parse_control_tags(row.tags)

                    # Build minimal GeneratedControl for AnchorFinder
                    control = _GeneratedControl(
                        title=row.title or "",
                        tags=tags,
                    )

                    # Find anchors (Stage A: RAG only, unless skip_web is disabled)
                    anchors = await finder.find_anchors(
                        control, skip_web=req.skip_web, min_anchors=2
                    )

                    anchor_dicts = [asdict(a) for a in anchors]

                    if not req.dry_run:
                        # SAVEPOINT per row: a failing UPDATE would otherwise
                        # leave the whole transaction aborted, making every
                        # later row of the batch (and its commit) fail too.
                        with db.begin_nested():
                            db.execute(text("""
                                UPDATE compliance.canonical_controls
                                SET open_anchors = CAST(:anchors AS jsonb),
                                    updated_at = NOW()
                                WHERE id = CAST(:cid AS uuid)
                            """), {"anchors": json.dumps(anchor_dicts), "cid": str(row.id)})

                    updated += 1
                    if anchor_dicts:
                        with_anchors += 1
                    else:
                        empty_anchors += 1

                except Exception as e:
                    # Best effort: record the failure and continue with the next control.
                    errors.append(f"{row.control_id}: {str(e)[:200]}")
                    logger.warning("Anchor backfill error for %s: %s", row.control_id, e)

            if not req.dry_run:
                db.commit()

            _anchor_backfill_status[backfill_id] = {
                "status": "running", "total": total, "updated": updated,
                "with_anchors": with_anchors, "empty_anchors": empty_anchors,
                "progress": f"{min(i + batch_size, total)}/{total}",
                "dry_run": req.dry_run, "errors": errors[-10:],
            }

        # Promote needs_review controls that gained anchors to draft.
        # NOTE(review): this matches every needs_review control that has
        # anchors, not only the rows touched by this run — confirm that
        # promoting previously-anchored controls is intended.
        promoted = 0
        if not req.dry_run and req.include_needs_review:
            result = db.execute(text("""
                UPDATE compliance.canonical_controls
                SET release_state = 'draft', updated_at = NOW()
                WHERE release_state = 'needs_review'
                  AND open_anchors IS NOT NULL
                  AND open_anchors::text != '[]'
                  AND open_anchors::text != 'null'
                  AND open_anchors::text != ''
                RETURNING id
            """))
            promoted = result.rowcount
            db.commit()

        _anchor_backfill_status[backfill_id] = {
            "status": "completed", "total": total, "updated": updated,
            "with_anchors": with_anchors, "empty_anchors": empty_anchors,
            "promoted_to_draft": promoted,
            "dry_run": req.dry_run, "errors": errors[-50:],
        }
        logger.info("Anchor backfill %s completed: %d/%d updated (%d with anchors, %d empty, %d promoted)",
                    backfill_id, updated, total, with_anchors, empty_anchors, promoted)

    except Exception as e:
        # Top-level boundary of the background job: never let the task die
        # silently, surface the failure through the status registry instead.
        logger.error("Anchor backfill %s failed: %s", backfill_id, e)
        _anchor_backfill_status[backfill_id] = {"status": "failed", "error": str(e)}
    finally:
        db.close()
|
||||||
|
|
||||||
|
|
||||||
|
# Strong references to in-flight anchor backfill tasks. The event loop only
# keeps weak references to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_anchor_backfill_tasks: set = set()


@router.post("/generate/backfill-anchors")
async def start_anchor_backfill(req: AnchorBackfillRequest):
    """Backfill open_anchors (OWASP/NIST/ENISA references) for controls without anchors.

    Uses RAG-internal search (Stage A) to find open-source framework references.
    Controls generated with skip_web_search=true have empty open_anchors.
    Default is dry_run=True (preview only, no DB changes).

    The work runs as a background task; the response only contains the
    backfill_id to poll the status endpoint with.
    """
    import uuid

    backfill_id = str(uuid.uuid4())[:8]
    _anchor_backfill_status[backfill_id] = {"status": "starting"}

    # Keep the task referenced until it is done, then drop it again.
    task = asyncio.create_task(_run_anchor_backfill(req, backfill_id))
    _anchor_backfill_tasks.add(task)
    task.add_done_callback(_anchor_backfill_tasks.discard)

    return {
        "status": "running",
        "backfill_id": backfill_id,
        "message": f"Anchor backfill started. Poll /generate/anchor-backfill-status/{backfill_id}",
    }
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/generate/anchor-backfill-status/{backfill_id}")
async def get_anchor_backfill_status(backfill_id: str):
    """Get status of an anchor backfill job."""
    # Look the job up in the status registry; a missing (or empty) entry
    # is reported as 404.
    job_status = _anchor_backfill_status.get(backfill_id)
    if job_status:
        return job_status
    raise HTTPException(status_code=404, detail="Anchor backfill job not found")
|
||||||
|
|||||||
@@ -1555,11 +1555,12 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Elementen. Fuer Aspekte ohne
|
|||||||
final.append(control)
|
final.append(control)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Anchor search
|
# Anchor search — skip entirely when skip_web_search=true (backfilled later)
|
||||||
|
if not config.skip_web_search:
|
||||||
try:
|
try:
|
||||||
from .anchor_finder import AnchorFinder
|
from .anchor_finder import AnchorFinder
|
||||||
finder = AnchorFinder(self.rag)
|
finder = AnchorFinder(self.rag)
|
||||||
anchors = await finder.find_anchors(control, skip_web=config.skip_web_search)
|
anchors = await finder.find_anchors(control, skip_web=False)
|
||||||
control.open_anchors = [asdict(a) if hasattr(a, '__dataclass_fields__') else a for a in anchors]
|
control.open_anchors = [asdict(a) if hasattr(a, '__dataclass_fields__') else a for a in anchors]
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("Anchor search failed: %s", e)
|
logger.warning("Anchor search failed: %s", e)
|
||||||
@@ -2402,11 +2403,12 @@ Kategorien: {CATEGORY_LIST_STR}"""
|
|||||||
control.generation_metadata["similar_controls"] = duplicates
|
control.generation_metadata["similar_controls"] = duplicates
|
||||||
return control
|
return control
|
||||||
|
|
||||||
# Stage 5: Anchor Search (imported from anchor_finder)
|
# Stage 5: Anchor Search — skip entirely when skip_web_search=true (backfilled later)
|
||||||
|
if not config.skip_web_search:
|
||||||
try:
|
try:
|
||||||
from .anchor_finder import AnchorFinder
|
from .anchor_finder import AnchorFinder
|
||||||
finder = AnchorFinder(self.rag)
|
finder = AnchorFinder(self.rag)
|
||||||
anchors = await finder.find_anchors(control, skip_web=config.skip_web_search)
|
anchors = await finder.find_anchors(control, skip_web=False)
|
||||||
control.open_anchors = [asdict(a) if hasattr(a, '__dataclass_fields__') else a for a in anchors]
|
control.open_anchors = [asdict(a) if hasattr(a, '__dataclass_fields__') else a for a in anchors]
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("Anchor search failed: %s", e)
|
logger.warning("Anchor search failed: %s", e)
|
||||||
|
|||||||
Reference in New Issue
Block a user