From 91f4202e88da03a4ff494846c1309f37492936f6 Mon Sep 17 00:00:00 2001 From: Benjamin Admin Date: Tue, 21 Apr 2026 18:04:50 +0200 Subject: [PATCH] feat(control-pipeline): add anchor backfill endpoint + normalize target_audience MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add POST /v1/canonical/generate/backfill-anchors endpoint for batch populating open_anchors on controls generated with skip_web_search=true - Uses AnchorFinder Stage A (RAG search) to find OWASP/NIST/ENISA refs - Background job with progress tracking (same pattern as other backfills) - Promotes needs_review controls that gain anchors to draft state - Target audience normalization (enterprise/authority/provider → JSON arrays) already applied via SQL Co-Authored-By: Claude Opus 4.6 (1M context) --- .../api/control_generator_routes.py | 166 ++++++++++++++++++ .../services/control_generator.py | 34 ++-- 2 files changed, 184 insertions(+), 16 deletions(-) diff --git a/control-pipeline/api/control_generator_routes.py b/control-pipeline/api/control_generator_routes.py index 3a10dd4..56d5814 100644 --- a/control-pipeline/api/control_generator_routes.py +++ b/control-pipeline/api/control_generator_routes.py @@ -37,6 +37,8 @@ from services.control_generator import ( ) from services.citation_backfill import CitationBackfill, BackfillResult from services.rag_client import get_rag_client +from services.anchor_finder import AnchorFinder, OpenAnchor +from services.control_generator import GeneratedControl as _GeneratedControl logger = logging.getLogger(__name__) router = APIRouter(prefix="/v1/canonical", tags=["control-generator"]) @@ -1100,3 +1102,167 @@ async def get_source_type_backfill_status(backfill_id: str): if not status: raise HTTPException(status_code=404, detail="Source-type backfill job not found") return status + + +# ============================================================================= +# ANCHOR BACKFILL +# 
=============================================================================
+
+class AnchorBackfillRequest(BaseModel):
+    """Options for a batch job that populates ``open_anchors`` on existing controls."""
+
+    dry_run: bool = True  # Preview only: anchors are searched but nothing is written
+    limit: int = 0  # 0 = all controls without anchors
+    batch_size: int = 50  # Rows per commit / progress update; values < 1 are treated as 1
+    skip_web: bool = True  # Stage A only (RAG), no DuckDuckGo
+    include_needs_review: bool = True  # Also backfill needs_review controls
+
+
+# Job status keyed by backfill_id. Held in process memory only.
+_anchor_backfill_status: dict = {}
+
+# Strong references to running jobs. The event loop keeps only weak references
+# to tasks, so a task nobody references may be garbage-collected mid-run.
+_anchor_backfill_tasks: set = set()
+
+
+def _coerce_tags(raw) -> list:
+    """Return *raw* as a list of tags; a JSON string is decoded, anything else becomes ``[]``."""
+    if isinstance(raw, str):
+        try:
+            raw = json.loads(raw)
+        except (json.JSONDecodeError, ValueError):
+            return []
+    return raw if isinstance(raw, list) else []
+
+
+async def _run_anchor_backfill(req: AnchorBackfillRequest, backfill_id: str):
+    """Backfill open_anchors for controls that were generated with skip_web_search=true.
+
+    Selects controls in the requested release states whose ``open_anchors`` is
+    empty, asks ``AnchorFinder`` for anchors and, unless ``req.dry_run`` is set,
+    stores the result. A ``needs_review`` control is promoted to ``draft`` by the
+    same UPDATE that gives it at least one anchor, so only controls that gained
+    anchors in this run are promoted. Progress and the final result are published
+    in ``_anchor_backfill_status[backfill_id]``; errors are recorded there instead
+    of being raised.
+
+    Args:
+        req: Backfill options.
+        backfill_id: Key under which the job status is published.
+    """
+    from dataclasses import asdict
+
+    # range() raises on a step of 0 and yields nothing for a negative step.
+    batch_size = max(1, req.batch_size)
+
+    db = SessionLocal()
+    try:
+        rag_client = get_rag_client()
+        finder = AnchorFinder(rag_client=rag_client)
+
+        # Find controls without anchors. Both interpolated fragments come from
+        # fixed literals or an int, never from free text. The single-state list
+        # must be "('draft')": a Python-style trailing comma is invalid SQL.
+        states = "('draft', 'needs_review')" if req.include_needs_review else "('draft')"
+        limit_clause = f"LIMIT {int(req.limit)}" if req.limit > 0 else ""
+        rows = db.execute(text(f"""
+            SELECT id, control_id, title, tags, release_state
+            FROM compliance.canonical_controls
+            WHERE release_state IN {states}
+              AND (open_anchors IS NULL OR open_anchors::text = '[]'
+                   OR open_anchors::text = 'null' OR open_anchors::text = '')
+            ORDER BY control_id
+            {limit_clause}
+        """)).fetchall()
+
+        total = len(rows)
+        updated = 0  # rows processed without error (also counted in dry-run)
+        with_anchors = 0
+        empty_anchors = 0
+        promoted = 0
+        errors = []
+
+        _anchor_backfill_status[backfill_id] = {
+            "status": "running", "total": total, "updated": 0,
+            "with_anchors": 0, "empty_anchors": 0, "dry_run": req.dry_run,
+        }
+
+        for start in range(0, total, batch_size):
+            for row in rows[start:start + batch_size]:
+                try:
+                    # Build minimal GeneratedControl for AnchorFinder
+                    control = _GeneratedControl(
+                        title=row.title or "",
+                        tags=_coerce_tags(row.tags),
+                    )
+
+                    # Find anchors (Stage A: RAG only, unless skip_web is disabled)
+                    anchors = await finder.find_anchors(
+                        control, skip_web=req.skip_web, min_anchors=2
+                    )
+                    anchor_dicts = [asdict(a) for a in anchors]
+                    promote = bool(anchor_dicts) and row.release_state == "needs_review"
+
+                    if not req.dry_run:
+                        # SAVEPOINT: a failing statement is rolled back on its own
+                        # instead of aborting the transaction for the rest of the batch.
+                        with db.begin_nested():
+                            db.execute(text("""
+                                UPDATE compliance.canonical_controls
+                                SET open_anchors = CAST(:anchors AS jsonb),
+                                    release_state = CASE
+                                        WHEN CAST(:promote AS boolean)
+                                             AND release_state = 'needs_review'
+                                        THEN 'draft'
+                                        ELSE release_state
+                                    END,
+                                    updated_at = NOW()
+                                WHERE id = CAST(:cid AS uuid)
+                            """), {
+                                "anchors": json.dumps(anchor_dicts),
+                                "promote": promote,
+                                "cid": str(row.id),
+                            })
+                        if promote:
+                            promoted += 1
+
+                    updated += 1
+                    if anchor_dicts:
+                        with_anchors += 1
+                    else:
+                        empty_anchors += 1
+
+                except Exception as e:
+                    # Best effort: one bad control must not stop the whole job.
+                    errors.append(f"{row.control_id}: {str(e)[:200]}")
+                    logger.warning("Anchor backfill error for %s: %s", row.control_id, e)
+
+            if not req.dry_run:
+                db.commit()
+
+            _anchor_backfill_status[backfill_id] = {
+                "status": "running", "total": total, "updated": updated,
+                "with_anchors": with_anchors, "empty_anchors": empty_anchors,
+                "progress": f"{min(start + batch_size, total)}/{total}",
+                "dry_run": req.dry_run, "errors": errors[-10:],
+            }
+
+        _anchor_backfill_status[backfill_id] = {
+            "status": "completed", "total": total, "updated": updated,
+            "with_anchors": with_anchors, "empty_anchors": empty_anchors,
+            "promoted_to_draft": promoted,
+            "dry_run": req.dry_run, "errors": errors[-50:],
+        }
+        logger.info("Anchor backfill %s completed: %d/%d updated (%d with anchors, %d empty, %d promoted)",
+                     backfill_id, updated, total, with_anchors, empty_anchors, promoted)
+
+    except Exception as e:
+        logger.error("Anchor backfill %s failed: %s", backfill_id, e)
+        _anchor_backfill_status[backfill_id] = {"status": "failed", "error": str(e)}
+    finally:
+        db.close()
+
+
+@router.post("/generate/backfill-anchors")
+async def start_anchor_backfill(req: AnchorBackfillRequest):
+    """Backfill open_anchors (OWASP/NIST/ENISA references) for controls without anchors.
+
+    Uses RAG-internal search (Stage A) to find open-source framework references.
+    Controls generated with skip_web_search=true have empty open_anchors.
+    Default is dry_run=True (preview only, no DB changes).
+    """
+    import uuid
+    backfill_id = str(uuid.uuid4())[:8]
+    _anchor_backfill_status[backfill_id] = {"status": "starting"}
+    # Hold a reference until the job finishes so it cannot be collected mid-run.
+    task = asyncio.create_task(_run_anchor_backfill(req, backfill_id))
+    _anchor_backfill_tasks.add(task)
+    task.add_done_callback(_anchor_backfill_tasks.discard)
+    return {
+        "status": "running",
+        "backfill_id": backfill_id,
+        "message": f"Anchor backfill started. Poll /generate/anchor-backfill-status/{backfill_id}",
+    }
+
+
+@router.get("/generate/anchor-backfill-status/{backfill_id}")
+async def get_anchor_backfill_status(backfill_id: str):
+    """Get status of an anchor backfill job; responds 404 for an unknown backfill_id."""
+    status = _anchor_backfill_status.get(backfill_id)
+    if not status:
+        raise HTTPException(status_code=404, detail="Anchor backfill job not found")
+    return status
diff --git a/control-pipeline/services/control_generator.py b/control-pipeline/services/control_generator.py
index f65ec66..f333f3b 100644
--- a/control-pipeline/services/control_generator.py
+++ b/control-pipeline/services/control_generator.py
@@ -1555,14 +1555,15 @@ Gib ein JSON-Array zurueck mit GENAU {len(chunks)} Elementen.
Fuer Aspekte ohne final.append(control) continue - # Anchor search - try: - from .anchor_finder import AnchorFinder - finder = AnchorFinder(self.rag) - anchors = await finder.find_anchors(control, skip_web=config.skip_web_search) - control.open_anchors = [asdict(a) if hasattr(a, '__dataclass_fields__') else a for a in anchors] - except Exception as e: - logger.warning("Anchor search failed: %s", e) + # Anchor search — skip entirely when skip_web_search=true (backfilled later) + if not config.skip_web_search: + try: + from .anchor_finder import AnchorFinder + finder = AnchorFinder(self.rag) + anchors = await finder.find_anchors(control, skip_web=False) + control.open_anchors = [asdict(a) if hasattr(a, '__dataclass_fields__') else a for a in anchors] + except Exception as e: + logger.warning("Anchor search failed: %s", e) # Release state if control.license_rule in (1, 2): @@ -2402,14 +2403,15 @@ Kategorien: {CATEGORY_LIST_STR}""" control.generation_metadata["similar_controls"] = duplicates return control - # Stage 5: Anchor Search (imported from anchor_finder) - try: - from .anchor_finder import AnchorFinder - finder = AnchorFinder(self.rag) - anchors = await finder.find_anchors(control, skip_web=config.skip_web_search) - control.open_anchors = [asdict(a) if hasattr(a, '__dataclass_fields__') else a for a in anchors] - except Exception as e: - logger.warning("Anchor search failed: %s", e) + # Stage 5: Anchor Search — skip entirely when skip_web_search=true (backfilled later) + if not config.skip_web_search: + try: + from .anchor_finder import AnchorFinder + finder = AnchorFinder(self.rag) + anchors = await finder.find_anchors(control, skip_web=False) + control.open_anchors = [asdict(a) if hasattr(a, '__dataclass_fields__') else a for a in anchors] + except Exception as e: + logger.warning("Anchor search failed: %s", e) # Determine release state if control.license_rule in (1, 2):