"""
Anchor Finder — finds open-source references (OWASP, NIST, ENISA) for controls.

Two-stage search:
  Stage A: RAG-internal search for open-source chunks matching the control topic
  Stage B: Web search via DuckDuckGo Instant Answer API (no API key needed)

Only open-source references (Rule 1+2) are accepted as anchors.
"""

import logging
from dataclasses import dataclass
from typing import List, Optional
from urllib.parse import urlparse

import httpx

from .rag_client import ComplianceRAGClient, get_rag_client
from .control_generator import (
    GeneratedControl,
    REGULATION_LICENSE_MAP,
    _RULE2_PREFIXES,
    _RULE3_PREFIXES,
    _classify_regulation,
)

logger = logging.getLogger(__name__)

# Regulation codes that are safe to reference as open anchors (Rule 1+2)
_OPEN_SOURCE_RULES = {1, 2}

# Stage A (RAG) tuning: how many reranked hits to request and how many
# anchors to keep from them.
_RAG_TOP_K = 15
_MAX_RAG_ANCHORS = 5

# Stage B (web) tuning: request timeout, how many related topics to inspect,
# how many anchors to keep, and the maximum length of a text-derived ref.
_WEB_TIMEOUT_SECONDS = 10.0
_MAX_WEB_TOPICS = 10
_MAX_WEB_ANCHORS = 3
_MAX_WEB_REF_CHARS = 100

# Registrable domains accepted as open-source frameworks, checked in order.
# A URL matches when its host equals the domain or is a subdomain of it.
_OPEN_FRAMEWORK_DOMAINS = (
    ("owasp.org", "OWASP"),
    ("nist.gov", "NIST"),
    ("enisa.europa.eu", "ENISA"),
    ("cisa.gov", "CISA"),
    ("eur-lex.europa.eu", "EU Law"),
)


@dataclass
class OpenAnchor:
    """A reference into an openly usable framework that backs a control."""

    # Display name of the framework (e.g. "OWASP", "NIST").
    framework: str
    # Reference inside the framework: an article/category from RAG, or a
    # text snippet / URL for web results.
    ref: str
    # Link to the referenced material; may be empty when none is known.
    url: str


class AnchorFinder:
    """Finds open-source references to anchor generated controls."""

    def __init__(self, rag_client: Optional[ComplianceRAGClient] = None):
        """Use ``rag_client`` if given, otherwise the client from ``get_rag_client()``."""
        self.rag = rag_client or get_rag_client()

    async def find_anchors(
        self,
        control: GeneratedControl,
        skip_web: bool = False,
        min_anchors: int = 2,
    ) -> List[OpenAnchor]:
        """Find open-source anchors for a control.

        Stage A (RAG search) always runs. Stage B (web search) runs only when
        Stage A produced fewer than ``min_anchors`` anchors and ``skip_web``
        is false.

        Args:
            control: The generated control to find anchors for.
            skip_web: When true, never perform the web search.
            min_anchors: Number of RAG anchors below which the web search
                is used as a fallback.

        Returns:
            RAG anchors followed by any web anchors whose
            ``(framework, ref)`` pair was not already present.
        """
        # Stage A: RAG-internal search
        anchors = await self._search_rag_for_open_anchors(control)
        if skip_web or len(anchors) >= min_anchors:
            return anchors

        # Stage B: web search, deduplicated by framework+ref. The key set is
        # updated while appending so that duplicate web results are dropped
        # as well, not only duplicates of RAG anchors.
        known_keys = {(a.framework, a.ref) for a in anchors}
        for candidate in await self._search_web(control):
            key = (candidate.framework, candidate.ref)
            if key in known_keys:
                continue
            known_keys.add(key)
            anchors.append(candidate)
        return anchors

    async def _search_rag_for_open_anchors(self, control: GeneratedControl) -> List[OpenAnchor]:
        """Search RAG for chunks from open sources matching the control topic.

        The query is the control title plus its first three tags. Results
        without a regulation code, or whose classification rule is not in
        ``_OPEN_SOURCE_RULES``, are discarded.

        Returns:
            At most ``_MAX_RAG_ANCHORS`` anchors, unique per
            ``regulation_code:ref``.
        """
        # Build search query from control title + first 3 tags
        tags_str = " ".join(control.tags[:3]) if control.tags else ""
        query = f"{control.title} {tags_str}".strip()

        results = await self.rag.search_with_rerank(
            query=query,
            collection="bp_compliance_ce",
            top_k=_RAG_TOP_K,
        )

        anchors: List[OpenAnchor] = []
        seen: set[str] = set()

        for hit in results:
            if not hit.regulation_code:
                continue

            # Only accept open-source references
            license_info = _classify_regulation(hit.regulation_code)
            if license_info.get("rule") not in _OPEN_SOURCE_RULES:
                continue

            # Build reference key for dedup
            ref = hit.article or hit.category or ""
            key = f"{hit.regulation_code}:{ref}"
            if key in seen:
                continue
            seen.add(key)

            # Prefer the name from the license classification; fall back to
            # whatever naming the RAG hit itself carries.
            framework_name = license_info.get(
                "name",
                hit.regulation_name or hit.regulation_short or hit.regulation_code,
            )
            url = hit.source_url or self._build_reference_url(hit.regulation_code, ref)

            anchors.append(OpenAnchor(framework=framework_name, ref=ref, url=url))
            if len(anchors) >= _MAX_RAG_ANCHORS:
                break

        return anchors

    async def _search_web(self, control: GeneratedControl) -> List[OpenAnchor]:
        """Search DuckDuckGo Instant Answer API for open references.

        Inspects up to ``_MAX_WEB_TOPICS`` entries of the response's
        ``RelatedTopics`` list and keeps those whose URL belongs to a known
        open-source framework domain.

        Returns:
            At most ``_MAX_WEB_ANCHORS`` anchors. An empty list is returned
            on a non-200 response or on any error; this stage is best-effort
            and never raises.
        """
        keywords = f"{control.title} security control OWASP NIST"
        try:
            async with httpx.AsyncClient(timeout=_WEB_TIMEOUT_SECONDS) as client:
                resp = await client.get(
                    "https://api.duckduckgo.com/",
                    params={
                        "q": keywords,
                        "format": "json",
                        "no_html": "1",
                        "skip_disambig": "1",
                    },
                )
                if resp.status_code != 200:
                    return []

                data = resp.json()
                anchors: List[OpenAnchor] = []

                # Parse RelatedTopics
                for topic in data.get("RelatedTopics", [])[:_MAX_WEB_TOPICS]:
                    # Skip malformed entries instead of letting one bad item
                    # abort the whole search via the broad handler below.
                    if not isinstance(topic, dict):
                        continue

                    url = topic.get("FirstURL", "")
                    text = topic.get("Text", "")
                    if not url:
                        continue

                    # Only accept known open-source domains
                    framework = self._identify_framework_from_url(url)
                    if not framework:
                        continue

                    anchors.append(OpenAnchor(
                        framework=framework,
                        ref=text[:_MAX_WEB_REF_CHARS] if text else url,
                        url=url,
                    ))
                    if len(anchors) >= _MAX_WEB_ANCHORS:
                        break

                return anchors
        except Exception as e:
            # Deliberately broad: a failed web lookup must not break anchor
            # finding, so it is logged and treated as "no results".
            logger.warning("Web anchor search failed: %s", e)
            return []

    @staticmethod
    def _identify_framework_from_url(url: str) -> Optional[str]:
        """Identify if a URL belongs to a known open-source framework.

        Matching is done on the parsed hostname, not on the raw string, so a
        framework domain appearing in a path, query string, or as the tail of
        an unrelated domain (``notowasp.org``) is not accepted.

        Args:
            url: Absolute URL, or a scheme-less ``host/path`` string.

        Returns:
            The framework name, or ``None`` when the host is not a known
            framework domain or the URL cannot be parsed.
        """
        candidate = url.strip().lower()
        # urlparse only fills in the host when a network location marker is
        # present, so give scheme-less input one.
        if "://" not in candidate:
            candidate = "//" + candidate
        try:
            host = urlparse(candidate).hostname or ""
        except ValueError:
            # Raised for malformed network locations (e.g. bad IPv6 brackets).
            return None

        for domain, framework in _OPEN_FRAMEWORK_DOMAINS:
            if host == domain or host.endswith("." + domain):
                return framework
        return None

    @staticmethod
    def _build_reference_url(regulation_code: str, ref: str) -> str:
        """Build a reference URL for known frameworks.

        Args:
            regulation_code: Regulation identifier; compared case-insensitively.
            ref: Reference within the regulation. Currently unused; the
                returned URL is a framework-level landing page.

        Returns:
            The landing-page URL for the framework, or ``""`` when the code
            is not recognized.
        """
        code = regulation_code.lower()
        if code.startswith("owasp"):
            return "https://owasp.org/www-project-application-security-verification-standard/"
        if code.startswith("nist"):
            return "https://csrc.nist.gov/publications"
        if code.startswith("enisa"):
            return "https://www.enisa.europa.eu/publications"
        if code.startswith("eu_"):
            return "https://eur-lex.europa.eu/"
        if code == "cisa_secure_by_design":
            return "https://www.cisa.gov/securebydesign"
        return ""