Compare commits

..

1 Commit

Author SHA1 Message Date
Sharang Parnerkar 4d5eedcc8b fix: add HTTP timeout to reqwest client and CVE stage timeout
CI / Check (pull_request) Successful in 9m39s
CI / Detect Changes (pull_request) Has been skipped
CI / Deploy Agent (pull_request) Has been skipped
CI / Deploy Dashboard (pull_request) Has been skipped
CI / Deploy Docs (pull_request) Has been skipped
CI / Deploy MCP (pull_request) Has been skipped
Without a timeout on the reqwest client, sequential NVD API calls
for each CVE alert could hang indefinitely. With 1098 SBOM entries
producing hundreds of alerts, this would stall the scan pipeline.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-13 08:48:03 +02:00
2 changed files with 20 additions and 93 deletions
+1 -6
@@ -19,17 +19,12 @@ impl LlmClient {
         model: String,
         embed_model: String,
     ) -> Self {
-        let http = reqwest::Client::builder()
-            .timeout(std::time::Duration::from_secs(300))
-            .connect_timeout(std::time::Duration::from_secs(10))
-            .build()
-            .unwrap_or_default();
         Self {
             base_url,
             api_key,
             model,
             embed_model,
-            http,
+            http: reqwest::Client::new(),
         }
     }
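For reference, the builder-based client construction that the commit message describes is the one visible in this hunk; reqwest::Client::new(), by contrast, applies no request timeout at all, so a single stalled NVD response can hang a sequential scan loop indefinitely. A minimal, self-contained sketch of that construction (the build_http_client function name is hypothetical; the 300-second request timeout and 10-second connect timeout are the values shown in the hunk):

use std::time::Duration;

// Sketch: an HTTP client with an overall per-request timeout and a connect
// timeout, falling back to the default client if the builder fails.
fn build_http_client() -> reqwest::Client {
    reqwest::Client::builder()
        .timeout(Duration::from_secs(300))        // cap each request end to end
        .connect_timeout(Duration::from_secs(10)) // fail fast on unreachable hosts
        .build()
        .unwrap_or_default()
}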
+19 -87
@@ -6,16 +6,11 @@ use compliance_core::models::embedding::{CodeEmbedding, EmbeddingBuildRun, Embed
 use compliance_core::models::graph::CodeNode;
 use compliance_graph::graph::chunking::extract_chunks;
 use compliance_graph::graph::embedding_store::EmbeddingStore;
-use futures_util::stream::{FuturesUnordered, StreamExt};
 use tracing::{error, info};
 use crate::error::AgentError;
 use crate::llm::LlmClient;
-const EMBED_BATCH_SIZE: usize = 20;
-const EMBED_CONCURRENCY: usize = 4;
-const EMBED_FLUSH_EVERY: usize = 200;
 /// RAG pipeline for building embeddings and performing retrieval
 pub struct RagPipeline {
     llm: Arc<LlmClient>,
@@ -82,33 +77,25 @@ impl RagPipeline {
             .await
             .map_err(|e| AgentError::Other(format!("Failed to delete old embeddings: {e}")))?;
-        // Step 3: Batch embed with bounded concurrency. Flush to Mongo and
-        // update progress periodically so the dashboard can show live status.
-        let mut pending = Vec::with_capacity(EMBED_FLUSH_EVERY);
+        // Step 3: Batch embed (small batches to stay within model limits)
+        let batch_size = 20;
+        let mut all_embeddings = Vec::new();
         let mut embedded_count = 0u32;
-        // Build the list of batch indices to process.
-        let batches: Vec<(usize, usize)> = (0..chunks.len())
-            .step_by(EMBED_BATCH_SIZE)
-            .map(|start| (start, (start + EMBED_BATCH_SIZE).min(chunks.len())))
-            .collect();
-        let mut batch_iter = batches.into_iter();
-        let mut in_flight = FuturesUnordered::new();
-        // Prime up to EMBED_CONCURRENCY batches.
-        for _ in 0..EMBED_CONCURRENCY {
-            if let Some((start, end)) = batch_iter.next() {
-                in_flight.push(self.embed_batch(&chunks[start..end], start, end));
-            }
-        }
-        while let Some(result) = in_flight.next().await {
-            match result {
-                Ok((start, end, vectors)) => {
-                    let batch_chunks = &chunks[start..end];
+        for batch_start in (0..chunks.len()).step_by(batch_size) {
+            let batch_end = (batch_start + batch_size).min(chunks.len());
+            let batch_chunks = &chunks[batch_start..batch_end];
+            // Prepare texts: context_header + content
+            let texts: Vec<String> = batch_chunks
+                .iter()
+                .map(|c| format!("{}\n{}", c.context_header, c.content))
+                .collect();
+            match self.llm.embed(texts).await {
+                Ok(vectors) => {
                     for (chunk, embedding) in batch_chunks.iter().zip(vectors) {
-                        pending.push(CodeEmbedding {
+                        all_embeddings.push(CodeEmbedding {
                             id: None,
                             repo_id: repo_id.to_string(),
                             graph_build_id: graph_build_id.to_string(),
@@ -126,45 +113,9 @@ impl RagPipeline {
                         });
                     }
                     embedded_count += batch_chunks.len() as u32;
-                    // Flush pending embeddings to Mongo periodically and update progress.
-                    if pending.len() >= EMBED_FLUSH_EVERY {
-                        self.embedding_store
-                            .store_embeddings(&pending)
-                            .await
-                            .map_err(|e| {
-                                AgentError::Other(format!("Failed to store embeddings: {e}"))
-                            })?;
-                        pending.clear();
-                    }
-                    // Always update the progress counter on the build doc — even if
-                    // we haven't flushed embeddings yet — so the UI shows movement.
-                    if let Err(e) = self
-                        .embedding_store
-                        .update_build(
-                            repo_id,
-                            graph_build_id,
-                            EmbeddingBuildStatus::Running,
-                            embedded_count,
-                            None,
-                        )
-                        .await
-                    {
-                        error!("[{repo_id}] Failed to update build progress: {e}");
-                    }
-                    // Queue the next batch to keep concurrency saturated.
-                    if let Some((s, e)) = batch_iter.next() {
-                        in_flight.push(self.embed_batch(&chunks[s..e], s, e));
-                    }
                 }
                 Err(e) => {
                     error!("[{repo_id}] Embedding batch failed: {e}");
-                    // Flush whatever we have so partial progress isn't lost.
-                    if !pending.is_empty() {
-                        let _ = self.embedding_store.store_embeddings(&pending).await;
-                    }
                     build.status = EmbeddingBuildStatus::Failed;
                     build.error_message = Some(e.to_string());
                     build.completed_at = Some(Utc::now());
@@ -183,13 +134,11 @@ impl RagPipeline {
             }
         }
-        // Step 4: Flush any remaining embeddings
-        if !pending.is_empty() {
-            self.embedding_store
-                .store_embeddings(&pending)
-                .await
-                .map_err(|e| AgentError::Other(format!("Failed to store embeddings: {e}")))?;
-        }
+        // Step 4: Store all embeddings
+        self.embedding_store
+            .store_embeddings(&all_embeddings)
+            .await
+            .map_err(|e| AgentError::Other(format!("Failed to store embeddings: {e}")))?;
         // Step 5: Update build status
         build.status = EmbeddingBuildStatus::Completed;
@@ -212,21 +161,4 @@ impl RagPipeline {
         );
         Ok(build)
     }
-
-    /// Embed one batch of chunks. Returns the (start, end, vectors) tuple so
-    /// out-of-order completion from `FuturesUnordered` can still be reconciled
-    /// against the original chunk slice.
-    async fn embed_batch(
-        &self,
-        batch_chunks: &[compliance_graph::graph::chunking::CodeChunk],
-        start: usize,
-        end: usize,
-    ) -> Result<(usize, usize, Vec<Vec<f64>>), AgentError> {
-        let texts: Vec<String> = batch_chunks
-            .iter()
-            .map(|c| format!("{}\n{}", c.context_header, c.content))
-            .collect();
-        let vectors = self.llm.embed(texts).await?;
-        Ok((start, end, vectors))
-    }
 }
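The lines removed in this comparison implement the bounded-concurrency variant: batch boundaries are precomputed as (start, end) index pairs, up to EMBED_CONCURRENCY embed calls are kept in flight in a FuturesUnordered, and each completed batch immediately queues the next one so the concurrency limit stays saturated. A self-contained sketch of that pattern follows, with a stub embed function standing in for LlmClient::embed and the batch/concurrency constants taken from the removed lines (tokio and futures-util are assumed as dependencies):

use futures_util::stream::{FuturesUnordered, StreamExt};

const EMBED_BATCH_SIZE: usize = 20;
const EMBED_CONCURRENCY: usize = 4;

// Stub standing in for LlmClient::embed: returns one vector per input text.
async fn embed(texts: Vec<String>) -> Result<Vec<Vec<f64>>, String> {
    Ok(texts.iter().map(|t| vec![t.len() as f64]).collect())
}

// Embed one batch, returning (start, end, vectors) so out-of-order
// completions can be matched back to their chunk range.
async fn embed_batch(
    texts: Vec<String>,
    start: usize,
    end: usize,
) -> Result<(usize, usize, Vec<Vec<f64>>), String> {
    let vectors = embed(texts).await?;
    Ok((start, end, vectors))
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let chunks: Vec<String> = (0..100).map(|i| format!("chunk {i}")).collect();

    // Precompute batch boundaries as (start, end) index pairs.
    let batches: Vec<(usize, usize)> = (0..chunks.len())
        .step_by(EMBED_BATCH_SIZE)
        .map(|s| (s, (s + EMBED_BATCH_SIZE).min(chunks.len())))
        .collect();
    let mut batch_iter = batches.into_iter();

    // Prime up to EMBED_CONCURRENCY batches.
    let mut in_flight = FuturesUnordered::new();
    for _ in 0..EMBED_CONCURRENCY {
        if let Some((s, e)) = batch_iter.next() {
            in_flight.push(embed_batch(chunks[s..e].to_vec(), s, e));
        }
    }

    // Drain completions; each finished batch queues the next one so the
    // number of in-flight requests stays at the concurrency limit.
    let mut embedded = 0usize;
    while let Some(result) = in_flight.next().await {
        let (start, end, _vectors) = result?;
        embedded += end - start;
        if let Some((s, e)) = batch_iter.next() {
            in_flight.push(embed_batch(chunks[s..e].to_vec(), s, e));
        }
    }

    println!("embedded {embedded} chunks");
    Ok(())
}

Because FuturesUnordered yields results in completion order rather than submission order, the (start, end) pair returned by embed_batch is what lets each vector batch be matched back to its chunk range; the in-repo version additionally flushes accumulated embeddings every EMBED_FLUSH_EVERY items and updates the build document so the dashboard can show live progress.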