feat(control-generator): 7-stage pipeline for RAG→LLM→Controls generation
Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 45s
CI/CD / test-python-document-crawler (push) Has been cancelled
CI/CD / test-python-dsms-gateway (push) Has been cancelled
CI/CD / validate-canonical-controls (push) Has been cancelled
CI/CD / deploy-hetzner (push) Has been cancelled
CI/CD / test-python-backend-compliance (push) Has been cancelled
Some checks failed
CI/CD / go-lint (push) Has been skipped
CI/CD / python-lint (push) Has been skipped
CI/CD / nodejs-lint (push) Has been skipped
CI/CD / test-go-ai-compliance (push) Successful in 45s
CI/CD / test-python-document-crawler (push) Has been cancelled
CI/CD / test-python-dsms-gateway (push) Has been cancelled
CI/CD / validate-canonical-controls (push) Has been cancelled
CI/CD / deploy-hetzner (push) Has been cancelled
CI/CD / test-python-backend-compliance (push) Has been cancelled
Implements the Control Generator Pipeline that systematically generates canonical security controls from 150k+ RAG chunks across all compliance collections (BSI, NIST, OWASP, ENISA, EU laws, German laws). Three license rules enforced throughout: - Rule 1 (free_use): Laws/Public Domain — original text preserved - Rule 2 (citation_required): CC-BY/CC-BY-SA — text with citation - Rule 3 (restricted): BSI/ISO — full reformulation, no source traces New files: - Migration 046: job tracking, chunk tracking, blocked sources tables - control_generator.py: 7-stage pipeline (scan→classify→structure/reform→harmonize→anchor→store→mark) - anchor_finder.py: RAG + DuckDuckGo open-source reference search - control_generator_routes.py: REST API (generate, review, stats, blocked-sources) - test_control_generator.py: license mapping, rule enforcement, anchor filtering tests Modified: - __init__.py: register control_generator_router - route.ts: proxy generator/review/stats endpoints - page.tsx: Generator modal, stats panel, state filter, review queue, license badges Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
188
backend-compliance/compliance/services/anchor_finder.py
Normal file
188
backend-compliance/compliance/services/anchor_finder.py
Normal file
@@ -0,0 +1,188 @@
|
||||
"""
|
||||
Anchor Finder — finds open-source references (OWASP, NIST, ENISA) for controls.
|
||||
|
||||
Two-stage search:
|
||||
Stage A: RAG-internal search for open-source chunks matching the control topic
|
||||
Stage B: Web search via DuckDuckGo Instant Answer API (no API key needed)
|
||||
|
||||
Only open-source references (Rule 1+2) are accepted as anchors.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
|
||||
import httpx
|
||||
|
||||
from .rag_client import ComplianceRAGClient, get_rag_client
|
||||
from .control_generator import (
|
||||
GeneratedControl,
|
||||
REGULATION_LICENSE_MAP,
|
||||
_RULE2_PREFIXES,
|
||||
_RULE3_PREFIXES,
|
||||
_classify_regulation,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Regulation codes that are safe to reference as open anchors (Rule 1+2)
|
||||
_OPEN_SOURCE_RULES = {1, 2}
|
||||
|
||||
|
||||
@dataclass
|
||||
class OpenAnchor:
|
||||
framework: str
|
||||
ref: str
|
||||
url: str
|
||||
|
||||
|
||||
class AnchorFinder:
|
||||
"""Finds open-source references to anchor generated controls."""
|
||||
|
||||
def __init__(self, rag_client: ComplianceRAGClient | None = None):
|
||||
self.rag = rag_client or get_rag_client()
|
||||
|
||||
async def find_anchors(
|
||||
self,
|
||||
control: GeneratedControl,
|
||||
skip_web: bool = False,
|
||||
min_anchors: int = 2,
|
||||
) -> list[OpenAnchor]:
|
||||
"""Find open-source anchors for a control."""
|
||||
# Stage A: RAG-internal search
|
||||
anchors = await self._search_rag_for_open_anchors(control)
|
||||
|
||||
# Stage B: Web search if not enough anchors
|
||||
if len(anchors) < min_anchors and not skip_web:
|
||||
web_anchors = await self._search_web(control)
|
||||
# Deduplicate by framework+ref
|
||||
existing_keys = {(a.framework, a.ref) for a in anchors}
|
||||
for wa in web_anchors:
|
||||
if (wa.framework, wa.ref) not in existing_keys:
|
||||
anchors.append(wa)
|
||||
|
||||
return anchors
|
||||
|
||||
async def _search_rag_for_open_anchors(self, control: GeneratedControl) -> list[OpenAnchor]:
|
||||
"""Search RAG for chunks from open sources matching the control topic."""
|
||||
# Build search query from control title + first 3 tags
|
||||
tags_str = " ".join(control.tags[:3]) if control.tags else ""
|
||||
query = f"{control.title} {tags_str}".strip()
|
||||
|
||||
results = await self.rag.search(
|
||||
query=query,
|
||||
collection="bp_compliance_ce",
|
||||
top_k=15,
|
||||
)
|
||||
|
||||
anchors: list[OpenAnchor] = []
|
||||
seen: set[str] = set()
|
||||
|
||||
for r in results:
|
||||
if not r.regulation_code:
|
||||
continue
|
||||
|
||||
# Only accept open-source references
|
||||
license_info = _classify_regulation(r.regulation_code)
|
||||
if license_info.get("rule") not in _OPEN_SOURCE_RULES:
|
||||
continue
|
||||
|
||||
# Build reference key for dedup
|
||||
ref = r.article or r.category or ""
|
||||
key = f"{r.regulation_code}:{ref}"
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
|
||||
framework_name = license_info.get("name", r.regulation_name or r.regulation_short or r.regulation_code)
|
||||
url = r.source_url or self._build_reference_url(r.regulation_code, ref)
|
||||
|
||||
anchors.append(OpenAnchor(
|
||||
framework=framework_name,
|
||||
ref=ref,
|
||||
url=url,
|
||||
))
|
||||
|
||||
if len(anchors) >= 5:
|
||||
break
|
||||
|
||||
return anchors
|
||||
|
||||
async def _search_web(self, control: GeneratedControl) -> list[OpenAnchor]:
|
||||
"""Search DuckDuckGo Instant Answer API for open references."""
|
||||
keywords = f"{control.title} security control OWASP NIST"
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||
resp = await client.get(
|
||||
"https://api.duckduckgo.com/",
|
||||
params={
|
||||
"q": keywords,
|
||||
"format": "json",
|
||||
"no_html": "1",
|
||||
"skip_disambig": "1",
|
||||
},
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
return []
|
||||
|
||||
data = resp.json()
|
||||
anchors: list[OpenAnchor] = []
|
||||
|
||||
# Parse RelatedTopics
|
||||
for topic in data.get("RelatedTopics", [])[:10]:
|
||||
url = topic.get("FirstURL", "")
|
||||
text = topic.get("Text", "")
|
||||
|
||||
if not url:
|
||||
continue
|
||||
|
||||
# Only accept known open-source domains
|
||||
framework = self._identify_framework_from_url(url)
|
||||
if framework:
|
||||
anchors.append(OpenAnchor(
|
||||
framework=framework,
|
||||
ref=text[:100] if text else url,
|
||||
url=url,
|
||||
))
|
||||
|
||||
if len(anchors) >= 3:
|
||||
break
|
||||
|
||||
return anchors
|
||||
|
||||
except Exception as e:
|
||||
logger.warning("Web anchor search failed: %s", e)
|
||||
return []
|
||||
|
||||
@staticmethod
|
||||
def _identify_framework_from_url(url: str) -> str | None:
|
||||
"""Identify if a URL belongs to a known open-source framework."""
|
||||
url_lower = url.lower()
|
||||
if "owasp.org" in url_lower:
|
||||
return "OWASP"
|
||||
if "nist.gov" in url_lower or "csrc.nist.gov" in url_lower:
|
||||
return "NIST"
|
||||
if "enisa.europa.eu" in url_lower:
|
||||
return "ENISA"
|
||||
if "cisa.gov" in url_lower:
|
||||
return "CISA"
|
||||
if "eur-lex.europa.eu" in url_lower:
|
||||
return "EU Law"
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _build_reference_url(regulation_code: str, ref: str) -> str:
|
||||
"""Build a reference URL for known frameworks."""
|
||||
code = regulation_code.lower()
|
||||
if code.startswith("owasp"):
|
||||
return "https://owasp.org/www-project-application-security-verification-standard/"
|
||||
if code.startswith("nist"):
|
||||
return "https://csrc.nist.gov/publications"
|
||||
if code.startswith("enisa"):
|
||||
return "https://www.enisa.europa.eu/publications"
|
||||
if code.startswith("eu_"):
|
||||
return "https://eur-lex.europa.eu/"
|
||||
if code == "cisa_secure_by_design":
|
||||
return "https://www.cisa.gov/securebydesign"
|
||||
return ""
|
||||
Reference in New Issue
Block a user