Add RAG embedding and AI chat feature
Some checks failed
CI / Security Audit (push) Has been cancelled
CI / Tests (push) Has been cancelled
CI / Clippy (push) Has been cancelled
CI / Format (push) Failing after 3s

Implement end-to-end RAG pipeline: AST-aware code chunking, LiteLLM
embedding generation, MongoDB vector storage with brute-force cosine
similarity fallback for self-hosted instances, and a chat API with
RAG-augmented responses. Add dedicated /chat/:repo_id dashboard page
with embedding build controls, message history, and source reference
cards.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Sharang Parnerkar
2026-03-04 23:29:40 +01:00
parent db454867f3
commit 89c30a62dd
25 changed files with 1692 additions and 25 deletions

View File

@@ -20,6 +20,7 @@ impl ComplianceAgent {
config.litellm_url.clone(),
config.litellm_api_key.clone(),
config.litellm_model.clone(),
config.litellm_embed_model.clone(),
));
Self {
config,

View File

@@ -0,0 +1,238 @@
use std::sync::Arc;
use axum::extract::{Extension, Path};
use axum::http::StatusCode;
use axum::Json;
use mongodb::bson::doc;
use compliance_core::models::chat::{ChatRequest, ChatResponse, SourceReference};
use compliance_core::models::embedding::EmbeddingBuildRun;
use compliance_graph::graph::embedding_store::EmbeddingStore;
use crate::agent::ComplianceAgent;
use crate::rag::pipeline::RagPipeline;
use super::ApiResponse;
type AgentExt = Extension<Arc<ComplianceAgent>>;
/// POST /api/v1/chat/:repo_id — Send a chat message with RAG context
pub async fn chat(
Extension(agent): AgentExt,
Path(repo_id): Path<String>,
Json(req): Json<ChatRequest>,
) -> Result<Json<ApiResponse<ChatResponse>>, StatusCode> {
let pipeline = RagPipeline::new(agent.llm.clone(), agent.db.inner());
// Step 1: Embed the user's message
let query_vectors = agent
.llm
.embed(vec![req.message.clone()])
.await
.map_err(|e| {
tracing::error!("Failed to embed query: {e}");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let query_embedding = query_vectors.into_iter().next().ok_or_else(|| {
tracing::error!("Empty embedding response");
StatusCode::INTERNAL_SERVER_ERROR
})?;
// Step 2: Vector search — retrieve top 8 chunks
let search_results = pipeline
.store()
.vector_search(&repo_id, query_embedding, 8, 0.5)
.await
.map_err(|e| {
tracing::error!("Vector search failed: {e}");
StatusCode::INTERNAL_SERVER_ERROR
})?;
// Step 3: Build system prompt with code context
let mut context_parts = Vec::new();
let mut sources = Vec::new();
for (embedding, score) in &search_results {
context_parts.push(format!(
"--- {} ({}, {}:L{}-L{}) ---\n{}",
embedding.qualified_name,
embedding.kind,
embedding.file_path,
embedding.start_line,
embedding.end_line,
embedding.content,
));
// Truncate snippet for the response
let snippet: String = embedding
.content
.lines()
.take(10)
.collect::<Vec<_>>()
.join("\n");
sources.push(SourceReference {
file_path: embedding.file_path.clone(),
qualified_name: embedding.qualified_name.clone(),
start_line: embedding.start_line,
end_line: embedding.end_line,
language: embedding.language.clone(),
snippet,
score: *score,
});
}
let code_context = if context_parts.is_empty() {
"No relevant code context found.".to_string()
} else {
context_parts.join("\n\n")
};
let system_prompt = format!(
"You are an expert code assistant for a software repository. \
Answer the user's question based on the code context below. \
Reference specific files and functions when relevant. \
If the context doesn't contain enough information, say so.\n\n\
## Code Context\n\n{code_context}"
);
// Step 4: Build messages array with history
let mut messages: Vec<(String, String)> = Vec::new();
messages.push(("system".to_string(), system_prompt));
for msg in &req.history {
messages.push((msg.role.clone(), msg.content.clone()));
}
messages.push(("user".to_string(), req.message));
// Step 5: Call LLM
let response_text = agent
.llm
.chat_with_messages(messages, Some(0.3))
.await
.map_err(|e| {
tracing::error!("LLM chat failed: {e}");
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(ApiResponse {
data: ChatResponse {
message: response_text,
sources,
},
total: None,
page: None,
}))
}
/// POST /api/v1/chat/:repo_id/build-embeddings — Trigger embedding build
pub async fn build_embeddings(
Extension(agent): AgentExt,
Path(repo_id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let agent_clone = (*agent).clone();
tokio::spawn(async move {
let repo = match agent_clone
.db
.repositories()
.find_one(doc! { "_id": mongodb::bson::oid::ObjectId::parse_str(&repo_id).ok() })
.await
{
Ok(Some(r)) => r,
_ => {
tracing::error!("Repository {repo_id} not found for embedding build");
return;
}
};
// Get latest graph build
let build = match agent_clone
.db
.graph_builds()
.find_one(doc! { "repo_id": &repo_id })
.sort(doc! { "started_at": -1 })
.await
{
Ok(Some(b)) => b,
_ => {
tracing::error!("[{repo_id}] No graph build found — build graph first");
return;
}
};
let graph_build_id = build
.id
.map(|id| id.to_hex())
.unwrap_or_else(|| "unknown".to_string());
// Get nodes
let nodes: Vec<compliance_core::models::graph::CodeNode> = match agent_clone
.db
.graph_nodes()
.find(doc! { "repo_id": &repo_id })
.await
{
Ok(cursor) => {
use futures_util::StreamExt;
let mut items = Vec::new();
let mut cursor = cursor;
while let Some(Ok(item)) = cursor.next().await {
items.push(item);
}
items
}
Err(e) => {
tracing::error!("[{repo_id}] Failed to fetch nodes: {e}");
return;
}
};
let git_ops = crate::pipeline::git::GitOps::new(&agent_clone.config.git_clone_base_path);
let repo_path = match git_ops.clone_or_fetch(&repo.git_url, &repo.name) {
Ok(p) => p,
Err(e) => {
tracing::error!("Failed to clone repo for embedding build: {e}");
return;
}
};
let pipeline = RagPipeline::new(agent_clone.llm.clone(), agent_clone.db.inner());
match pipeline
.build_embeddings(&repo_id, &repo_path, &graph_build_id, &nodes)
.await
{
Ok(run) => {
tracing::info!(
"[{repo_id}] Embedding build complete: {}/{} chunks",
run.embedded_chunks,
run.total_chunks
);
}
Err(e) => {
tracing::error!("[{repo_id}] Embedding build failed: {e}");
}
}
});
Ok(Json(
serde_json::json!({ "status": "embedding_build_triggered" }),
))
}
/// GET /api/v1/chat/:repo_id/status — Get latest embedding build status
pub async fn embedding_status(
Extension(agent): AgentExt,
Path(repo_id): Path<String>,
) -> Result<Json<ApiResponse<Option<EmbeddingBuildRun>>>, StatusCode> {
let store = EmbeddingStore::new(agent.db.inner());
let build = store.get_latest_build(&repo_id).await.map_err(|e| {
tracing::error!("Failed to get embedding status: {e}");
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(Json(ApiResponse {
data: build,
total: None,
page: None,
}))
}

View File

@@ -1,3 +1,4 @@
pub mod chat;
pub mod dast;
pub mod graph;

View File

@@ -23,10 +23,7 @@ pub fn build_router() -> Router {
.route("/api/v1/issues", get(handlers::list_issues))
.route("/api/v1/scan-runs", get(handlers::list_scan_runs))
// Graph API endpoints
.route(
"/api/v1/graph/{repo_id}",
get(handlers::graph::get_graph),
)
.route("/api/v1/graph/{repo_id}", get(handlers::graph::get_graph))
.route(
"/api/v1/graph/{repo_id}/nodes",
get(handlers::graph::get_nodes),
@@ -52,14 +49,8 @@ pub fn build_router() -> Router {
post(handlers::graph::trigger_build),
)
// DAST API endpoints
.route(
"/api/v1/dast/targets",
get(handlers::dast::list_targets),
)
.route(
"/api/v1/dast/targets",
post(handlers::dast::add_target),
)
.route("/api/v1/dast/targets", get(handlers::dast::list_targets))
.route("/api/v1/dast/targets", post(handlers::dast::add_target))
.route(
"/api/v1/dast/targets/{id}/scan",
post(handlers::dast::trigger_scan),
@@ -68,12 +59,19 @@ pub fn build_router() -> Router {
"/api/v1/dast/scan-runs",
get(handlers::dast::list_scan_runs),
)
.route(
"/api/v1/dast/findings",
get(handlers::dast::list_findings),
)
.route("/api/v1/dast/findings", get(handlers::dast::list_findings))
.route(
"/api/v1/dast/findings/{id}",
get(handlers::dast::get_finding),
)
// Chat / RAG API endpoints
.route("/api/v1/chat/{repo_id}", post(handlers::chat::chat))
.route(
"/api/v1/chat/{repo_id}/build-embeddings",
post(handlers::chat::build_embeddings),
)
.route(
"/api/v1/chat/{repo_id}/status",
get(handlers::chat::embedding_status),
)
}

View File

@@ -24,6 +24,8 @@ pub fn load_config() -> Result<AgentConfig, AgentError> {
.unwrap_or_else(|| "http://localhost:4000".to_string()),
litellm_api_key: SecretString::from(env_var_opt("LITELLM_API_KEY").unwrap_or_default()),
litellm_model: env_var_opt("LITELLM_MODEL").unwrap_or_else(|| "gpt-4o".to_string()),
litellm_embed_model: env_var_opt("LITELLM_EMBED_MODEL")
.unwrap_or_else(|| "text-embedding-3-small".to_string()),
github_token: env_secret_opt("GITHUB_TOKEN"),
github_webhook_secret: env_secret_opt("GITHUB_WEBHOOK_SECRET"),
gitlab_url: env_var_opt("GITLAB_URL"),

View File

@@ -127,11 +127,7 @@ impl Database {
// dast_targets: index on repo_id
self.dast_targets()
.create_index(
IndexModel::builder()
.keys(doc! { "repo_id": 1 })
.build(),
)
.create_index(IndexModel::builder().keys(doc! { "repo_id": 1 }).build())
.await?;
// dast_scan_runs: compound (target_id, started_at DESC)
@@ -152,6 +148,24 @@ impl Database {
)
.await?;
// code_embeddings: compound (repo_id, graph_build_id)
self.code_embeddings()
.create_index(
IndexModel::builder()
.keys(doc! { "repo_id": 1, "graph_build_id": 1 })
.build(),
)
.await?;
// embedding_builds: compound (repo_id, started_at DESC)
self.embedding_builds()
.create_index(
IndexModel::builder()
.keys(doc! { "repo_id": 1, "started_at": -1 })
.build(),
)
.await?;
tracing::info!("Database indexes ensured");
Ok(())
}
@@ -210,6 +224,17 @@ impl Database {
self.inner.collection("dast_findings")
}
// Embedding collections
pub fn code_embeddings(&self) -> Collection<compliance_core::models::embedding::CodeEmbedding> {
self.inner.collection("code_embeddings")
}
pub fn embedding_builds(
&self,
) -> Collection<compliance_core::models::embedding::EmbeddingBuildRun> {
self.inner.collection("embedding_builds")
}
#[allow(dead_code)]
pub fn raw_collection(&self, name: &str) -> Collection<mongodb::bson::Document> {
self.inner.collection(name)

View File

@@ -8,6 +8,7 @@ pub struct LlmClient {
base_url: String,
api_key: SecretString,
model: String,
embed_model: String,
http: reqwest::Client,
}
@@ -42,16 +43,46 @@ struct ChatResponseMessage {
content: String,
}
/// Request body for the embeddings API
#[derive(Serialize)]
struct EmbeddingRequest {
model: String,
input: Vec<String>,
}
/// Response from the embeddings API
#[derive(Deserialize)]
struct EmbeddingResponse {
data: Vec<EmbeddingData>,
}
/// A single embedding result
#[derive(Deserialize)]
struct EmbeddingData {
embedding: Vec<f64>,
index: usize,
}
impl LlmClient {
pub fn new(base_url: String, api_key: SecretString, model: String) -> Self {
pub fn new(
base_url: String,
api_key: SecretString,
model: String,
embed_model: String,
) -> Self {
Self {
base_url,
api_key,
model,
embed_model,
http: reqwest::Client::new(),
}
}
pub fn embed_model(&self) -> &str {
&self.embed_model
}
pub async fn chat(
&self,
system_prompt: &str,
@@ -169,4 +200,49 @@ impl LlmClient {
.map(|c| c.message.content.clone())
.ok_or_else(|| AgentError::Other("Empty response from LiteLLM".to_string()))
}
/// Generate embeddings for a batch of texts
pub async fn embed(&self, texts: Vec<String>) -> Result<Vec<Vec<f64>>, AgentError> {
let url = format!("{}/v1/embeddings", self.base_url.trim_end_matches('/'));
let request_body = EmbeddingRequest {
model: self.embed_model.clone(),
input: texts,
};
let mut req = self
.http
.post(&url)
.header("content-type", "application/json")
.json(&request_body);
let key = self.api_key.expose_secret();
if !key.is_empty() {
req = req.header("Authorization", format!("Bearer {key}"));
}
let resp = req
.send()
.await
.map_err(|e| AgentError::Other(format!("Embedding request failed: {e}")))?;
if !resp.status().is_success() {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
return Err(AgentError::Other(format!(
"Embedding API returned {status}: {body}"
)));
}
let body: EmbeddingResponse = resp
.json()
.await
.map_err(|e| AgentError::Other(format!("Failed to parse embedding response: {e}")))?;
// Sort by index to maintain input order
let mut data = body.data;
data.sort_by_key(|d| d.index);
Ok(data.into_iter().map(|d| d.embedding).collect())
}
}

View File

@@ -7,6 +7,7 @@ mod database;
mod error;
mod llm;
mod pipeline;
mod rag;
mod scheduler;
#[allow(dead_code)]
mod trackers;

View File

@@ -0,0 +1 @@
pub mod pipeline;

View File

@@ -0,0 +1,164 @@
use std::path::Path;
use std::sync::Arc;
use chrono::Utc;
use compliance_core::models::embedding::{CodeEmbedding, EmbeddingBuildRun, EmbeddingBuildStatus};
use compliance_core::models::graph::CodeNode;
use compliance_graph::graph::chunking::extract_chunks;
use compliance_graph::graph::embedding_store::EmbeddingStore;
use tracing::{error, info};
use crate::error::AgentError;
use crate::llm::LlmClient;
/// RAG pipeline for building embeddings and performing retrieval
pub struct RagPipeline {
llm: Arc<LlmClient>,
embedding_store: EmbeddingStore,
}
impl RagPipeline {
pub fn new(llm: Arc<LlmClient>, db: &mongodb::Database) -> Self {
Self {
llm,
embedding_store: EmbeddingStore::new(db),
}
}
pub fn store(&self) -> &EmbeddingStore {
&self.embedding_store
}
/// Build embeddings for all code nodes in a repository
pub async fn build_embeddings(
&self,
repo_id: &str,
repo_path: &Path,
graph_build_id: &str,
nodes: &[CodeNode],
) -> Result<EmbeddingBuildRun, AgentError> {
let embed_model = self.llm.embed_model().to_string();
let mut build =
EmbeddingBuildRun::new(repo_id.to_string(), graph_build_id.to_string(), embed_model);
// Step 1: Extract chunks
let chunks = extract_chunks(repo_path, nodes, 2048);
build.total_chunks = chunks.len() as u32;
info!(
"[{repo_id}] Extracted {} chunks for embedding",
chunks.len()
);
// Store the initial build record
self.embedding_store
.store_build(&build)
.await
.map_err(|e| AgentError::Other(format!("Failed to store build: {e}")))?;
if chunks.is_empty() {
build.status = EmbeddingBuildStatus::Completed;
build.completed_at = Some(Utc::now());
self.embedding_store
.update_build(
repo_id,
graph_build_id,
EmbeddingBuildStatus::Completed,
0,
None,
)
.await
.map_err(|e| AgentError::Other(format!("Failed to update build: {e}")))?;
return Ok(build);
}
// Step 2: Delete old embeddings for this repo
self.embedding_store
.delete_repo_embeddings(repo_id)
.await
.map_err(|e| AgentError::Other(format!("Failed to delete old embeddings: {e}")))?;
// Step 3: Batch embed (small batches to stay within model limits)
let batch_size = 20;
let mut all_embeddings = Vec::new();
let mut embedded_count = 0u32;
for batch_start in (0..chunks.len()).step_by(batch_size) {
let batch_end = (batch_start + batch_size).min(chunks.len());
let batch_chunks = &chunks[batch_start..batch_end];
// Prepare texts: context_header + content
let texts: Vec<String> = batch_chunks
.iter()
.map(|c| format!("{}\n{}", c.context_header, c.content))
.collect();
match self.llm.embed(texts).await {
Ok(vectors) => {
for (chunk, embedding) in batch_chunks.iter().zip(vectors) {
all_embeddings.push(CodeEmbedding {
id: None,
repo_id: repo_id.to_string(),
graph_build_id: graph_build_id.to_string(),
qualified_name: chunk.qualified_name.clone(),
kind: chunk.kind.clone(),
file_path: chunk.file_path.clone(),
start_line: chunk.start_line,
end_line: chunk.end_line,
language: chunk.language.clone(),
content: chunk.content.clone(),
context_header: chunk.context_header.clone(),
embedding,
token_estimate: chunk.token_estimate,
created_at: Utc::now(),
});
}
embedded_count += batch_chunks.len() as u32;
}
Err(e) => {
error!("[{repo_id}] Embedding batch failed: {e}");
build.status = EmbeddingBuildStatus::Failed;
build.error_message = Some(e.to_string());
build.completed_at = Some(Utc::now());
let _ = self
.embedding_store
.update_build(
repo_id,
graph_build_id,
EmbeddingBuildStatus::Failed,
embedded_count,
Some(e.to_string()),
)
.await;
return Ok(build);
}
}
}
// Step 4: Store all embeddings
self.embedding_store
.store_embeddings(&all_embeddings)
.await
.map_err(|e| AgentError::Other(format!("Failed to store embeddings: {e}")))?;
// Step 5: Update build status
build.status = EmbeddingBuildStatus::Completed;
build.embedded_chunks = embedded_count;
build.completed_at = Some(Utc::now());
self.embedding_store
.update_build(
repo_id,
graph_build_id,
EmbeddingBuildStatus::Completed,
embedded_count,
None,
)
.await
.map_err(|e| AgentError::Other(format!("Failed to update build: {e}")))?;
info!(
"[{repo_id}] Embedding build complete: {embedded_count}/{} chunks",
build.total_chunks
);
Ok(build)
}
}