Qdrant collections use regulation_id (not regulation_code), regulation_name_de, guideline_name, download_url etc. Also search bp_compliance_datenschutz collection where OWASP/ENISA docs live. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
254 lines
8.9 KiB
Python
254 lines
8.9 KiB
Python
"""
Anchor Finder — finds open-source references (OWASP, NIST, ENISA) for controls.

Two-stage search:
  Stage A: Direct Qdrant vector search for open-source chunks matching the control topic
  Stage B: Web search via DuckDuckGo Instant Answer API (no API key needed),
           used only when Stage A finds too few anchors

Only open-source references (Rule 1+2) are accepted as anchors.
"""
|
|
|
|
import logging
import os
from dataclasses import dataclass
from typing import List, Optional
from urllib.parse import urlsplit

import httpx

from .control_generator import (
    GeneratedControl,
    REGULATION_LICENSE_MAP,
    _RULE2_PREFIXES,
    _RULE3_PREFIXES,
    _classify_regulation,
)
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Service endpoints; each one can be overridden through the environment.
QDRANT_URL = os.environ.get("QDRANT_URL", "http://qdrant:6333")
EMBEDDING_URL = os.environ.get("EMBEDDING_URL", "http://embedding-service:8087")

# Values of a classification's "rule" field that may be referenced as open
# anchors (Rule 1+2).
_OPEN_SOURCE_RULES = {1, 2}

# Qdrant collections that are searched for anchors (open-source frameworks).
_ANCHOR_COLLECTIONS = ["bp_compliance_ce", "bp_compliance_datenschutz"]
|
|
|
|
|
|
@dataclass
class OpenAnchor:
    """One open-source reference attached to a generated control."""

    # Display name of the framework or source (e.g. "OWASP", "NIST").
    framework: str
    # Locator inside the framework: an article/category from the search
    # payload, or a text snippet / URL for web results. May be empty.
    ref: str
    # Link to the referenced material; may be empty when none is known.
    url: str
|
|
|
|
|
|
class AnchorFinder:
    """Finds open-source references to anchor generated controls.

    Candidates come from two stages: a vector search over the Qdrant
    collections listed in ``_ANCHOR_COLLECTIONS`` and, when that yields too
    few results, the DuckDuckGo Instant Answer API. All network failures are
    logged and treated as "no results" (best effort).
    """

    # Maximum number of anchors returned by the Qdrant stage.
    _MAX_QDRANT_ANCHORS = 5
    # Number of nearest neighbours requested from each Qdrant collection.
    _QDRANT_HITS_PER_COLLECTION = 20
    # Maximum number of anchors returned by the web stage.
    _MAX_WEB_ANCHORS = 3
    # Number of DuckDuckGo "RelatedTopics" entries that are inspected.
    _MAX_WEB_TOPICS = 10

    # Registered domains accepted as open sources, with their framework label.
    _FRAMEWORK_DOMAINS = (
        ("owasp.org", "OWASP"),
        ("nist.gov", "NIST"),
        ("enisa.europa.eu", "ENISA"),
        ("cisa.gov", "CISA"),
        ("eur-lex.europa.eu", "EU Law"),
    )

    # Regulation-code prefixes and the generic landing page used as fallback.
    _REFERENCE_URL_PREFIXES = (
        ("owasp", "https://owasp.org/www-project-application-security-verification-standard/"),
        ("nist", "https://csrc.nist.gov/publications"),
        ("enisa", "https://www.enisa.europa.eu/publications"),
        ("eu_", "https://eur-lex.europa.eu/"),
    )

    def __init__(self, rag_client=None):
        """Create a finder.

        Args:
            rag_client: Kept for backwards compatibility; no longer used.
        """

    async def find_anchors(
        self,
        control: "GeneratedControl",
        skip_web: bool = False,
        min_anchors: int = 2,
    ) -> "List[OpenAnchor]":
        """Find open-source anchors for a control.

        Args:
            control: Control whose title and tags drive the search.
            skip_web: When true, never fall back to the web search.
            min_anchors: The web search runs only if the Qdrant stage found
                fewer anchors than this.

        Returns:
            Anchors from the Qdrant stage, followed by any web anchors whose
            (framework, ref) pair has not been seen yet.
        """
        # Stage A: direct Qdrant vector search.
        anchors = await self._search_qdrant_for_open_anchors(control)
        if skip_web or len(anchors) >= min_anchors:
            return anchors

        # Stage B: web search, deduplicated by framework+ref. The key set is
        # updated while appending so that duplicate web hits are dropped too.
        seen_keys = {(a.framework, a.ref) for a in anchors}
        for candidate in await self._search_web(control):
            key = (candidate.framework, candidate.ref)
            if key in seen_keys:
                continue
            seen_keys.add(key)
            anchors.append(candidate)
        return anchors

    async def _get_embedding(self, text: str) -> list:
        """Get the embedding vector for *text* via the embedding service.

        Returns an empty list when the request fails or the response carries
        no embeddings.
        """
        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                resp = await client.post(
                    f"{EMBEDDING_URL}/embed",
                    json={"texts": [text]},
                )
                resp.raise_for_status()
                embeddings = resp.json().get("embeddings", [])
                return embeddings[0] if embeddings else []
        except Exception as e:
            # Best effort: the caller treats an empty vector as "no search".
            logger.warning("Embedding request failed: %s", e)
            return []

    async def _search_qdrant_for_open_anchors(self, control: "GeneratedControl") -> "List[OpenAnchor]":
        """Search Qdrant directly for chunks from open sources matching the control topic.

        The query is the control title plus its first three tags. Returns at
        most ``_MAX_QDRANT_ANCHORS`` anchors, unique per regulation+reference.
        """
        tags_str = " ".join(control.tags[:3]) if control.tags else ""
        query = f"{control.title} {tags_str}".strip()

        embedding = await self._get_embedding(query)
        if not embedding:
            return []

        anchors: "List[OpenAnchor]" = []
        seen: set = set()

        for collection in _ANCHOR_COLLECTIONS:
            for hit in await self._query_collection(collection, embedding):
                # Qdrant may return a null payload; skip anything that is not
                # a mapping instead of crashing on .get().
                payload = hit.get("payload") if isinstance(hit, dict) else None
                if not isinstance(payload, dict):
                    continue

                anchor = self._anchor_from_payload(payload, seen)
                if anchor is None:
                    continue

                anchors.append(anchor)
                if len(anchors) >= self._MAX_QDRANT_ANCHORS:
                    return anchors

        return anchors

    async def _query_collection(self, collection: str, embedding: list) -> list:
        """Run one vector search against *collection* and return its raw hits.

        Any HTTP error status or exception is logged and yields an empty list.
        """
        try:
            async with httpx.AsyncClient(timeout=15.0) as client:
                resp = await client.post(
                    f"{QDRANT_URL}/collections/{collection}/points/search",
                    json={
                        "vector": embedding,
                        "limit": self._QDRANT_HITS_PER_COLLECTION,
                        "with_payload": True,
                        "with_vector": False,
                    },
                )
                if resp.status_code != 200:
                    logger.warning("Qdrant search %s failed: %d", collection, resp.status_code)
                    return []
                # "result" may be missing or null; normalise both to [].
                return resp.json().get("result") or []
        except Exception as e:
            logger.warning("Qdrant search error for %s: %s", collection, e)
            return []

    def _anchor_from_payload(self, payload: dict, seen: set) -> "Optional[OpenAnchor]":
        """Turn one Qdrant payload into an anchor, or None if it is rejected.

        A payload is rejected when it has no regulation identifier, when its
        classification is not an open-source rule, or when its
        regulation+reference key is already in *seen*. Accepted keys are added
        to *seen*.
        """
        metadata = payload.get("metadata")
        if not isinstance(metadata, dict):
            metadata = {}

        # Qdrant payloads use regulation_id (not regulation_code)
        regulation_code = (
            payload.get("regulation_id", "")
            or payload.get("regulation_code", "")
            or metadata.get("regulation_id", "")
        )
        if not regulation_code:
            return None

        # Only accept open-source references
        license_info = _classify_regulation(regulation_code)
        if license_info.get("rule") not in _OPEN_SOURCE_RULES:
            return None

        # Build reference key for dedup
        ref = payload.get("article", "") or payload.get("category", "") or ""
        key = f"{regulation_code}:{ref}"
        if key in seen:
            return None
        seen.add(key)

        reg_name = (
            payload.get("regulation_name_de", "")
            or payload.get("regulation_name_en", "")
            or payload.get("guideline_name", "")
        )
        reg_short = payload.get("regulation_short", "")
        source_url = (
            payload.get("download_url", "")
            or payload.get("source_url", "")
            or payload.get("source", "")
        )

        # Fall back through the payload names when the classification has no
        # usable name (missing, None or empty).
        framework_name = license_info.get("name") or reg_name or reg_short or regulation_code
        url = source_url or self._build_reference_url(regulation_code, ref)

        return OpenAnchor(framework=framework_name, ref=ref, url=url)

    async def _search_web(self, control: "GeneratedControl") -> "List[OpenAnchor]":
        """Search DuckDuckGo Instant Answer API for open references.

        Only "RelatedTopics" entries whose URL belongs to a known open-source
        domain are kept. Returns at most ``_MAX_WEB_ANCHORS`` anchors, or an
        empty list on any failure.
        """
        keywords = f"{control.title} security control OWASP NIST"
        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                resp = await client.get(
                    "https://api.duckduckgo.com/",
                    params={
                        "q": keywords,
                        "format": "json",
                        "no_html": "1",
                        "skip_disambig": "1",
                    },
                )
                if resp.status_code != 200:
                    return []

                data = resp.json()
                anchors: "List[OpenAnchor]" = []

                topics = data.get("RelatedTopics") or []
                for topic in topics[: self._MAX_WEB_TOPICS]:
                    if not isinstance(topic, dict):
                        continue
                    url = topic.get("FirstURL", "")
                    text = topic.get("Text", "")
                    if not url:
                        continue

                    # Only accept known open-source domains
                    framework = self._identify_framework_from_url(url)
                    if not framework:
                        continue

                    anchors.append(OpenAnchor(
                        framework=framework,
                        ref=text[:100] if text else url,
                        url=url,
                    ))
                    if len(anchors) >= self._MAX_WEB_ANCHORS:
                        break

                return anchors

        except Exception as e:
            logger.warning("Web anchor search failed: %s", e)
            return []

    @staticmethod
    def _identify_framework_from_url(url: str) -> Optional[str]:
        """Identify if a URL belongs to a known open-source framework.

        The decision is made on the URL's host name only: the host must equal
        a known domain or be a subdomain of it. A domain that merely appears
        in the path, the query string or as part of a longer host name does
        not count.

        Returns:
            The framework label, or None for unknown or unparsable URLs.
        """
        if not url:
            return None

        candidate = url.strip()
        # urlsplit only recognises a host after "//"; tolerate URLs that were
        # given without a scheme.
        if "://" not in candidate:
            candidate = "//" + candidate
        try:
            host = urlsplit(candidate).hostname or ""
        except ValueError:
            # Raised for malformed authorities such as a broken IPv6 literal.
            return None

        host = host.rstrip(".")
        for domain, framework in AnchorFinder._FRAMEWORK_DOMAINS:
            if host == domain or host.endswith("." + domain):
                return framework
        return None

    @staticmethod
    def _build_reference_url(regulation_code: str, ref: str) -> str:
        """Build a reference URL for known frameworks.

        Args:
            regulation_code: Regulation identifier; matched case-insensitively.
            ref: Reference inside the regulation. Currently unused; kept for
                interface compatibility.

        Returns:
            A generic landing page for the framework, or "" if it is unknown.
        """
        code = regulation_code.lower()
        for prefix, landing_page in AnchorFinder._REFERENCE_URL_PREFIXES:
            if code.startswith(prefix):
                return landing_page
        if code == "cisa_secure_by_design":
            return "https://www.cisa.gov/securebydesign"
        return ""
|