Compare commits


8 Commits

Author SHA1 Message Date
Sharang Parnerkar b96dda11fb fix: live progress + concurrency for embedding builds
CI / Check (pull_request) Successful in 10m34s
CI / Detect Changes (pull_request) Has been skipped
CI / Deploy Agent (pull_request) Has been skipped
CI / Deploy Dashboard (pull_request) Has been skipped
CI / Deploy Docs (pull_request) Has been skipped
CI / Deploy MCP (pull_request) Has been skipped
The embedding build progress was only written to MongoDB after every
batch completed (and the final flush + status update only happened at
the very end), so the dashboard would show "0/N chunks (0%)" for the
entire run, then jump straight to "complete." For a repo with 2k+
chunks this looked like the build was stuck.

Three fixes:
- pipeline: call update_build(Running, embedded_count, ...) after each
  batch so /api/v1/chat/:repo_id/status reflects real progress, and
  flush embeddings to Mongo every 200 records so a partial failure does
  not lose everything.
- pipeline: drive batches with FuturesUnordered at concurrency=4 so
  litellm requests overlap instead of going strictly serial (112
  sequential requests for a 2221-chunk repo were the wall-time floor).
- llm client: give the reqwest client a 300s request timeout and 10s
  connect timeout. Previously LlmClient used reqwest::Client::new()
  with no timeout, so a hung embedding call would block the build
  indefinitely.
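
The "112 sequential requests for a 2221-chunk repo" figure follows from the batch split. A standalone sketch checks the arithmetic (the `batch_ranges` helper is hypothetical, written here only to mirror the pipeline's `(start, end)` batching; it is not code from the diff):

```rust
/// Hypothetical helper mirroring the pipeline's batch split: chunks are
/// embedded in fixed-size batches, the last batch possibly short.
fn batch_ranges(total: usize, batch: usize) -> Vec<(usize, usize)> {
    (0..total)
        .step_by(batch)
        .map(|start| (start, (start + batch).min(total)))
        .collect()
}

fn main() {
    // 2221 chunks at 20 per batch -> 112 requests, matching the commit message.
    let batches = batch_ranges(2221, 20);
    println!("{} batches, last = {:?}", batches.len(), batches.last().unwrap());
    // With concurrency=4 and roughly uniform latency, wall time drops to
    // about ceil(112 / 4) = 28 batch round-trips instead of 112.
}
```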

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-13 11:39:37 +02:00
sharang e67a13535a fix: add HTTP timeout to reqwest client and CVE stage timeout (#79)
CI / Check (push) Has been skipped
CI / Detect Changes (push) Successful in 5s
CI / Deploy Agent (push) Successful in 8m26s
CI / Deploy Dashboard (push) Has been skipped
CI / Deploy Docs (push) Has been skipped
CI / Deploy MCP (push) Has been skipped
2026-05-13 07:30:26 +00:00
sharang df0063abc0 fix: scanner timeouts, semgrep memory cap, syft remote lookups, Script error (#78)
CI / Check (push) Has been skipped
CI / Detect Changes (push) Successful in 5s
CI / Deploy Agent (push) Successful in 9m41s
CI / Deploy Dashboard (push) Successful in 15m19s
CI / Deploy Docs (push) Has been skipped
CI / Deploy MCP (push) Successful in 3m7s
## Summary

- **Scan produces no results in Orca** — semgrep (`--config=auto`, unbounded memory) and syft (remote license network calls) were getting OOM-killed or hanging in resource-constrained Orca containers. Scan would "complete" with 0 findings/SBOMs silently because each scanner failure is caught and logged as a warning.
- **Dashboard Script error spam** — `document::Script` in Dioxus 0.7 needs a single text node child for inline scripts; `dangerous_inner_html` was invalid and spammed the error log on every unauthenticated page load.

## Changes

| File | Change |
|------|--------|
| `semgrep.rs` | Add `--max-memory 500 --jobs 1`; 10-minute timeout |
| `syft.rs` | Remove remote license lookup env vars; 5-minute timeout |
| `gitleaks.rs` | 5-minute timeout |
| `app_shell.rs` | Fix `dangerous_inner_html` → text child in `document::Script` |

## Test plan

- [ ] Trigger a scan on a repo in Orca — findings and SBOM entries should now appear
- [ ] Agent logs should show timeout/error warnings rather than silent empty results when tools are killed
- [ ] Navigate to dashboard unauthenticated — Script error gone from logs
- [ ] Verify scans work end-to-end with `docker compose up`

---------

Co-authored-by: Sharang Parnerkar <30073382+mighty840@users.noreply.github.com>
Reviewed-on: #78
2026-05-12 11:27:24 +00:00
Sharang Parnerkar 5cafd13f44 ci: log orca webhook response so deploy steps aren't silent
CI / Check (push) Has been skipped
CI / Detect Changes (push) Successful in 5s
CI / Deploy Agent (push) Has been skipped
CI / Deploy Dashboard (push) Has been skipped
CI / Deploy Docs (push) Has been skipped
CI / Deploy MCP (push) Has been skipped
Nightly E2E Tests / E2E Tests (push) Failing after 2m59s
2026-04-08 15:09:27 +02:00
Sharang Parnerkar 69209649a5 ci: trigger first orca build for all services
CI / Check (push) Has been skipped
CI / Detect Changes (push) Successful in 4s
CI / Deploy Agent (push) Successful in 7m5s
CI / Deploy Docs (push) Successful in 30s
CI / Deploy MCP (push) Successful in 1m31s
CI / Deploy Dashboard (push) Failing after 21m28s
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 10:10:07 +02:00
Sharang Parnerkar d5439adc0d ci: trigger build of dashboard, docs, mcp images for orca
CI / Check (push) Has been cancelled
CI / Detect Changes (push) Has been cancelled
CI / Deploy Agent (push) Has been cancelled
CI / Deploy Dashboard (push) Has been cancelled
CI / Deploy Docs (push) Has been cancelled
CI / Deploy MCP (push) Has been cancelled
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 10:09:49 +02:00
Sharang Parnerkar bc7cdd35e4 ci: replace coolify webhook with orca deploy
CI / Check (push) Has been cancelled
CI / Detect Changes (push) Has been cancelled
CI / Deploy Agent (push) Has been cancelled
CI / Deploy Dashboard (push) Has been cancelled
CI / Deploy Docs (push) Has been cancelled
CI / Deploy MCP (push) Has been cancelled
Each deploy job now builds the per-service image, pushes to the
private registry as :latest and :sha, then triggers an HMAC-signed
orca redeploy webhook. Coolify webhooks are no longer used.
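
The signing step follows the GitHub-style `X-Hub-Signature-256` scheme: HMAC-SHA256 over the exact payload bytes, hex-encoded. A minimal sketch with placeholder values (the real workflow uses `${{ secrets.ORCA_WEBHOOK_SECRET }}` and the CI commit SHA):

```shell
# Placeholder secret and payload -- illustration only, not the real values.
SECRET='example-webhook-secret'
PAYLOAD='{"ref":"refs/heads/main","head_commit":{"id":"abc123"}}'

# Sign the raw payload bytes; the receiver recomputes the HMAC over the
# request body, so any whitespace or key-order change invalidates it.
SIG=$(printf '%s' "$PAYLOAD" | openssl dgst -sha256 -hmac "$SECRET" | awk '{print $2}')
echo "X-Hub-Signature-256: sha256=$SIG"
```

Using `printf '%s'` rather than `echo` matters: `echo` appends a newline, which would change the signed bytes and make verification fail.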

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 10:06:11 +02:00
Sharang Parnerkar c062d834a1 fix: downgrade dotenv missing file from FAILED to info message
CI / Check (push) Has been skipped
CI / Detect Changes (push) Successful in 3s
CI / Deploy Agent (push) Successful in 2s
CI / Deploy Dashboard (push) Has been skipped
CI / Deploy Docs (push) Has been skipped
CI / Deploy MCP (push) Has been skipped
Nightly E2E Tests / E2E Tests (push) Failing after 2m16s
Non-fatal in Docker where env vars come from container config.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 15:33:24 +02:00
16 changed files with 255 additions and 100 deletions
+10
View File
@@ -0,0 +1,10 @@
[advisories]
ignore = [
# hickory-proto 0.25.x pulled in transitively via mongodb → hickory-resolver.
# MongoDB 3.x has not yet released with hickory-resolver 0.26.x, so we cannot
# upgrade past this without a mongodb release. Both are DNS-layer DoS vectors
# requiring a MITM/controlled DNS server against MongoDB's hostname resolution —
# not a realistic attack surface here. Revisit when mongodb bumps hickory.
"RUSTSEC-2026-0118", # NSEC3 loop, no fix available upstream
"RUSTSEC-2026-0119", # O(n²) name compression, fixed in hickory-proto >=0.26.1
]
+48 -20
View File
@@ -145,13 +145,20 @@ jobs:
    needs: [detect-changes]
    if: needs.detect-changes.outputs.agent == 'true'
    container:
-     image: alpine:latest
+     image: docker:27-cli
    steps:
-     - name: Trigger Coolify deploy
+     - name: Build, push and trigger orca redeploy
        run: |
-         apk add --no-cache curl
-         curl -sf "${{ secrets.COOLIFY_WEBHOOK_AGENT }}" \
-           -H "Authorization: Bearer ${{ secrets.COOLIFY_TOKEN }}"
+         apk add --no-cache git curl openssl
+         git init && git remote add origin "${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY}.git"
+         git fetch --depth=1 origin "${GITHUB_SHA}" && git checkout FETCH_HEAD
+         IMAGE=registry.meghsakha.com/compliance-agent
+         echo "${{ secrets.REGISTRY_PASSWORD }}" | docker login registry.meghsakha.com -u "${{ secrets.REGISTRY_USERNAME }}" --password-stdin
+         docker build -f Dockerfile.agent -t "$IMAGE:latest" -t "$IMAGE:${GITHUB_SHA}" .
+         docker push "$IMAGE:latest" && docker push "$IMAGE:${GITHUB_SHA}"
+         PAYLOAD=$(printf '{"ref":"refs/heads/main","repository":{"full_name":"sharang/compliance-scanner-agent"},"head_commit":{"id":"%s","message":"deploy agent"}}' "${GITHUB_SHA}")
+         SIG=$(printf '%s' "$PAYLOAD" | openssl dgst -sha256 -hmac "${{ secrets.ORCA_WEBHOOK_SECRET }}" | awk '{print $2}')
+         RESP=$(curl -fsS -w "\nHTTP %{http_code}" -X POST "http://46.225.100.82:6880/api/v1/webhooks/github" -H "Content-Type: application/json" -H "X-Hub-Signature-256: sha256=$SIG" -d "$PAYLOAD"); echo "$RESP"
deploy-dashboard:
name: Deploy Dashboard
@@ -159,13 +166,20 @@ jobs:
    needs: [detect-changes]
    if: needs.detect-changes.outputs.dashboard == 'true'
    container:
-     image: alpine:latest
+     image: docker:27-cli
    steps:
-     - name: Trigger Coolify deploy
+     - name: Build, push and trigger orca redeploy
        run: |
-         apk add --no-cache curl
-         curl -sf "${{ secrets.COOLIFY_WEBHOOK_DASHBOARD }}" \
-           -H "Authorization: Bearer ${{ secrets.COOLIFY_TOKEN }}"
+         apk add --no-cache git curl openssl
+         git init && git remote add origin "${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY}.git"
+         git fetch --depth=1 origin "${GITHUB_SHA}" && git checkout FETCH_HEAD
+         IMAGE=registry.meghsakha.com/compliance-dashboard
+         echo "${{ secrets.REGISTRY_PASSWORD }}" | docker login registry.meghsakha.com -u "${{ secrets.REGISTRY_USERNAME }}" --password-stdin
+         docker build -f Dockerfile.dashboard -t "$IMAGE:latest" -t "$IMAGE:${GITHUB_SHA}" .
+         docker push "$IMAGE:latest" && docker push "$IMAGE:${GITHUB_SHA}"
+         PAYLOAD=$(printf '{"ref":"refs/heads/main","repository":{"full_name":"sharang/compliance-scanner-agent"},"head_commit":{"id":"%s","message":"deploy dashboard"}}' "${GITHUB_SHA}")
+         SIG=$(printf '%s' "$PAYLOAD" | openssl dgst -sha256 -hmac "${{ secrets.ORCA_WEBHOOK_SECRET }}" | awk '{print $2}')
+         RESP=$(curl -fsS -w "\nHTTP %{http_code}" -X POST "http://46.225.100.82:6880/api/v1/webhooks/github" -H "Content-Type: application/json" -H "X-Hub-Signature-256: sha256=$SIG" -d "$PAYLOAD"); echo "$RESP"
deploy-docs:
name: Deploy Docs
@@ -173,13 +187,20 @@ jobs:
    needs: [detect-changes]
    if: needs.detect-changes.outputs.docs == 'true'
    container:
-     image: alpine:latest
+     image: docker:27-cli
    steps:
-     - name: Trigger Coolify deploy
+     - name: Build, push and trigger orca redeploy
        run: |
-         apk add --no-cache curl
-         curl -sf "${{ secrets.COOLIFY_WEBHOOK_DOCS }}" \
-           -H "Authorization: Bearer ${{ secrets.COOLIFY_TOKEN }}"
+         apk add --no-cache git curl openssl
+         git init && git remote add origin "${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY}.git"
+         git fetch --depth=1 origin "${GITHUB_SHA}" && git checkout FETCH_HEAD
+         IMAGE=registry.meghsakha.com/compliance-docs
+         echo "${{ secrets.REGISTRY_PASSWORD }}" | docker login registry.meghsakha.com -u "${{ secrets.REGISTRY_USERNAME }}" --password-stdin
+         docker build -f Dockerfile.docs -t "$IMAGE:latest" -t "$IMAGE:${GITHUB_SHA}" .
+         docker push "$IMAGE:latest" && docker push "$IMAGE:${GITHUB_SHA}"
+         PAYLOAD=$(printf '{"ref":"refs/heads/main","repository":{"full_name":"sharang/compliance-scanner-agent"},"head_commit":{"id":"%s","message":"deploy docs"}}' "${GITHUB_SHA}")
+         SIG=$(printf '%s' "$PAYLOAD" | openssl dgst -sha256 -hmac "${{ secrets.ORCA_WEBHOOK_SECRET }}" | awk '{print $2}')
+         RESP=$(curl -fsS -w "\nHTTP %{http_code}" -X POST "http://46.225.100.82:6880/api/v1/webhooks/github" -H "Content-Type: application/json" -H "X-Hub-Signature-256: sha256=$SIG" -d "$PAYLOAD"); echo "$RESP"
deploy-mcp:
name: Deploy MCP
@@ -187,10 +208,17 @@ jobs:
    needs: [detect-changes]
    if: needs.detect-changes.outputs.mcp == 'true'
    container:
-     image: alpine:latest
+     image: docker:27-cli
    steps:
-     - name: Trigger Coolify deploy
+     - name: Build, push and trigger orca redeploy
        run: |
-         apk add --no-cache curl
-         curl -sf "${{ secrets.COOLIFY_WEBHOOK_MCP }}" \
-           -H "Authorization: Bearer ${{ secrets.COOLIFY_TOKEN }}"
+         apk add --no-cache git curl openssl
+         git init && git remote add origin "${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY}.git"
+         git fetch --depth=1 origin "${GITHUB_SHA}" && git checkout FETCH_HEAD
+         IMAGE=registry.meghsakha.com/compliance-mcp
+         echo "${{ secrets.REGISTRY_PASSWORD }}" | docker login registry.meghsakha.com -u "${{ secrets.REGISTRY_USERNAME }}" --password-stdin
+         docker build -f Dockerfile.mcp -t "$IMAGE:latest" -t "$IMAGE:${GITHUB_SHA}" .
+         docker push "$IMAGE:latest" && docker push "$IMAGE:${GITHUB_SHA}"
+         PAYLOAD=$(printf '{"ref":"refs/heads/main","repository":{"full_name":"sharang/compliance-scanner-agent"},"head_commit":{"id":"%s","message":"deploy mcp"}}' "${GITHUB_SHA}")
+         SIG=$(printf '%s' "$PAYLOAD" | openssl dgst -sha256 -hmac "${{ secrets.ORCA_WEBHOOK_SECRET }}" | awk '{print $2}')
+         RESP=$(curl -fsS -w "\nHTTP %{http_code}" -X POST "http://46.225.100.82:6880/api/v1/webhooks/github" -H "Content-Type: application/json" -H "X-Hub-Signature-256: sha256=$SIG" -d "$PAYLOAD"); echo "$RESP"
Generated
+6 -6
View File
@@ -3524,9 +3524,9 @@ checksum = "224484c5d09285a7b8cb0a0c117e847ebd14cb6e4470ecf68cdb89c503b0edb9"
[[package]]
name = "mongodb"
-version = "3.5.1"
+version = "3.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "803dd859e8afa084c255a8effd8000ff86f7c8076a50cd6d8c99e8f3496f75c2"
+checksum = "1ef2c933617431ad0246fb5b43c425ebdae18c7f7259c87de0726d93b0e7e91b"
dependencies = [
 "base64",
 "bitflags",
@@ -3570,9 +3570,9 @@ dependencies = [
[[package]]
name = "mongodb-internal-macros"
-version = "3.5.1"
+version = "3.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a973ef3dd3dbc6f6e65bbdecfd9ec5e781b9e7493b0f369a7c62e35d8e5ae2c8"
+checksum = "9e5758dc828eb2d02ec30563cba365609d56ddd833190b192beaee2b475a7bb3"
dependencies = [
 "macro_magic",
 "proc-macro2",
@@ -4699,9 +4699,9 @@ dependencies = [
[[package]]
name = "rustls-webpki"
-version = "0.103.10"
+version = "0.103.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef"
+checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e"
dependencies = [
 "ring",
 "rustls-pki-types",
+1
View File
@@ -44,3 +44,4 @@ RUN mkdir -p /data/compliance-scanner/ssh
EXPOSE 3001 3002
ENTRYPOINT ["compliance-agent"]
+1
View File
@@ -20,3 +20,4 @@ ENV IP=0.0.0.0
EXPOSE 8080
ENTRYPOINT ["./compliance-dashboard"]
+1
View File
@@ -12,3 +12,4 @@ RUN rm /etc/nginx/conf.d/default.conf
COPY docs/nginx.conf /etc/nginx/conf.d/default.conf
COPY --from=builder /app/.vitepress/dist /usr/share/nginx/html
EXPOSE 80
+1
View File
@@ -14,3 +14,4 @@ EXPOSE 8090
ENV MCP_PORT=8090
ENTRYPOINT ["compliance-mcp"]
+6 -1
View File
@@ -35,11 +35,16 @@ impl ComplianceAgent {
config.litellm_model.clone(),
config.litellm_embed_model.clone(),
));
+let http = reqwest::Client::builder()
+    .timeout(std::time::Duration::from_secs(30))
+    .connect_timeout(std::time::Duration::from_secs(10))
+    .build()
+    .unwrap_or_default();
Self {
config,
db,
llm,
-http: reqwest::Client::new(),
+http,
session_streams: Arc::new(DashMap::new()),
session_pause: Arc::new(DashMap::new()),
session_semaphore: Arc::new(Semaphore::new(DEFAULT_MAX_CONCURRENT_SESSIONS)),
+6 -1
View File
@@ -19,12 +19,17 @@ impl LlmClient {
model: String,
embed_model: String,
) -> Self {
+let http = reqwest::Client::builder()
+    .timeout(std::time::Duration::from_secs(300))
+    .connect_timeout(std::time::Duration::from_secs(10))
+    .build()
+    .unwrap_or_default();
Self {
base_url,
api_key,
model,
embed_model,
-http: reqwest::Client::new(),
+http,
}
}
+1 -1
View File
@@ -4,7 +4,7 @@ use compliance_agent::{agent, api, config, database, scheduler, ssh, webhooks};
async fn main() -> Result<(), Box<dyn std::error::Error>> {
match dotenvy::dotenv() {
Ok(path) => eprintln!("[dotenv] Loaded from: {}", path.display()),
-Err(e) => eprintln!("[dotenv] FAILED: {e}"),
+Err(_) => eprintln!("[dotenv] No .env file found, using environment variables"),
}
let _telemetry_guard = compliance_core::telemetry::init_telemetry("compliance-agent");
+27 -20
View File
@@ -19,26 +19,33 @@ impl Scanner for GitleaksScanner {
#[tracing::instrument(skip_all)]
async fn scan(&self, repo_path: &Path, repo_id: &str) -> Result<ScanOutput, CoreError> {
-let output = tokio::process::Command::new("gitleaks")
-    .args([
-        "detect",
-        "--source",
-        ".",
-        "--report-format",
-        "json",
-        "--report-path",
-        "/dev/stdout",
-        "--no-banner",
-        "--exit-code",
-        "0",
-    ])
-    .current_dir(repo_path)
-    .output()
-    .await
-    .map_err(|e| CoreError::Scanner {
-        scanner: "gitleaks".to_string(),
-        source: Box::new(e),
-    })?;
+let output = tokio::time::timeout(
+    std::time::Duration::from_secs(300),
+    tokio::process::Command::new("gitleaks")
+        .args([
+            "detect",
+            "--source",
+            ".",
+            "--report-format",
+            "json",
+            "--report-path",
+            "/dev/stdout",
+            "--no-banner",
+            "--exit-code",
+            "0",
+        ])
+        .current_dir(repo_path)
+        .output(),
+)
+.await
+.map_err(|_| CoreError::Scanner {
+    scanner: "gitleaks".to_string(),
+    source: "timed out after 5 minutes".into(),
+})?
+.map_err(|e| CoreError::Scanner {
+    scanner: "gitleaks".to_string(),
+    source: Box::new(e),
+})?;
if output.stdout.is_empty() {
return Ok(ScanOutput::default());
+15 -8
View File
@@ -174,19 +174,26 @@ impl PipelineOrchestrator {
    k.expose_secret().to_string()
}),
);
-let cve_alerts = match async {
-    cve_scanner
-        .scan_dependencies(&repo_id, &mut sbom_entries)
-        .await
-}
-.instrument(tracing::info_span!("stage_cve_scanning"))
+let cve_alerts = match tokio::time::timeout(
+    std::time::Duration::from_secs(600),
+    async {
+        cve_scanner
+            .scan_dependencies(&repo_id, &mut sbom_entries)
+            .await
+    }
+    .instrument(tracing::info_span!("stage_cve_scanning")),
+)
.await
{
-Ok(alerts) => alerts,
-Err(e) => {
+Ok(Ok(alerts)) => alerts,
+Ok(Err(e)) => {
    tracing::warn!("[{repo_id}] CVE scanning failed: {e}");
    Vec::new()
}
+Err(_) => {
+    tracing::warn!("[{repo_id}] CVE scanning timed out after 10 minutes");
+    Vec::new()
+}
};
// Stage 4: Pattern Scanning (GDPR + OAuth)
+20 -14
View File
@@ -5,20 +5,26 @@ use compliance_core::CoreError;
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub(super) async fn run_syft(repo_path: &Path, repo_id: &str) -> Result<Vec<SbomEntry>, CoreError> {
-let output = tokio::process::Command::new("syft")
-    .arg(repo_path)
-    .args(["-o", "cyclonedx-json"])
-    // Enable remote license lookups for all ecosystems
-    .env("SYFT_GOLANG_SEARCH_REMOTE_LICENSES", "true")
-    .env("SYFT_JAVASCRIPT_SEARCH_REMOTE_LICENSES", "true")
-    .env("SYFT_PYTHON_SEARCH_REMOTE_LICENSES", "true")
-    .env("SYFT_JAVA_USE_NETWORK", "true")
-    .output()
-    .await
-    .map_err(|e| CoreError::Scanner {
-        scanner: "syft".to_string(),
-        source: Box::new(e),
-    })?;
+let output = tokio::time::timeout(
+    std::time::Duration::from_secs(300),
+    tokio::process::Command::new("syft")
+        .arg(repo_path)
+        .args(["-o", "cyclonedx-json"])
+        .env("SYFT_GOLANG_SEARCH_REMOTE_LICENSES", "true")
+        .env("SYFT_JAVASCRIPT_SEARCH_REMOTE_LICENSES", "true")
+        .env("SYFT_PYTHON_SEARCH_REMOTE_LICENSES", "true")
+        .env("SYFT_JAVA_USE_NETWORK", "true")
+        .output(),
+)
+.await
+.map_err(|_| CoreError::Scanner {
+    scanner: "syft".to_string(),
+    source: "timed out after 5 minutes".into(),
+})?
+.map_err(|e| CoreError::Scanner {
+    scanner: "syft".to_string(),
+    source: Box::new(e),
+})?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
+24 -9
View File
@@ -19,15 +19,30 @@ impl Scanner for SemgrepScanner {
#[tracing::instrument(skip_all)]
async fn scan(&self, repo_path: &Path, repo_id: &str) -> Result<ScanOutput, CoreError> {
-let output = tokio::process::Command::new("semgrep")
-    .args(["--config=auto", "--json", "--quiet"])
-    .arg(repo_path)
-    .output()
-    .await
-    .map_err(|e| CoreError::Scanner {
-        scanner: "semgrep".to_string(),
-        source: Box::new(e),
-    })?;
+let output = tokio::time::timeout(
+    std::time::Duration::from_secs(600),
+    tokio::process::Command::new("semgrep")
+        .args([
+            "--config=auto",
+            "--json",
+            "--quiet",
+            "--max-memory",
+            "500",
+            "--jobs",
+            "1",
+        ])
+        .arg(repo_path)
+        .output(),
+)
+.await
+.map_err(|_| CoreError::Scanner {
+    scanner: "semgrep".to_string(),
+    source: "timed out after 10 minutes".into(),
+})?
+.map_err(|e| CoreError::Scanner {
+    scanner: "semgrep".to_string(),
+    source: Box::new(e),
+})?;
if !output.status.success() && output.stdout.is_empty() {
let stderr = String::from_utf8_lossy(&output.stderr);
+87 -19
View File
@@ -6,11 +6,16 @@ use compliance_core::models::embedding::{CodeEmbedding, EmbeddingBuildRun, Embed
use compliance_core::models::graph::CodeNode;
use compliance_graph::graph::chunking::extract_chunks;
use compliance_graph::graph::embedding_store::EmbeddingStore;
+use futures_util::stream::{FuturesUnordered, StreamExt};
use tracing::{error, info};
use crate::error::AgentError;
use crate::llm::LlmClient;
+const EMBED_BATCH_SIZE: usize = 20;
+const EMBED_CONCURRENCY: usize = 4;
+const EMBED_FLUSH_EVERY: usize = 200;
/// RAG pipeline for building embeddings and performing retrieval
pub struct RagPipeline {
llm: Arc<LlmClient>,
@@ -77,25 +82,33 @@ impl RagPipeline {
.await
.map_err(|e| AgentError::Other(format!("Failed to delete old embeddings: {e}")))?;
-// Step 3: Batch embed (small batches to stay within model limits)
-let batch_size = 20;
-let mut all_embeddings = Vec::new();
+// Step 3: Batch embed with bounded concurrency. Flush to Mongo and
+// update progress periodically so the dashboard can show live status.
+let mut pending = Vec::with_capacity(EMBED_FLUSH_EVERY);
+let mut embedded_count = 0u32;
-for batch_start in (0..chunks.len()).step_by(batch_size) {
-let batch_end = (batch_start + batch_size).min(chunks.len());
-let batch_chunks = &chunks[batch_start..batch_end];
+// Build the list of batch indices to process.
+let batches: Vec<(usize, usize)> = (0..chunks.len())
+    .step_by(EMBED_BATCH_SIZE)
+    .map(|start| (start, (start + EMBED_BATCH_SIZE).min(chunks.len())))
+    .collect();
-// Prepare texts: context_header + content
-let texts: Vec<String> = batch_chunks
-    .iter()
-    .map(|c| format!("{}\n{}", c.context_header, c.content))
-    .collect();
+let mut batch_iter = batches.into_iter();
+let mut in_flight = FuturesUnordered::new();
-match self.llm.embed(texts).await {
-Ok(vectors) => {
+// Prime up to EMBED_CONCURRENCY batches.
+for _ in 0..EMBED_CONCURRENCY {
+    if let Some((start, end)) = batch_iter.next() {
+        in_flight.push(self.embed_batch(&chunks[start..end], start, end));
+    }
+}
+while let Some(result) = in_flight.next().await {
+match result {
+Ok((start, end, vectors)) => {
+let batch_chunks = &chunks[start..end];
for (chunk, embedding) in batch_chunks.iter().zip(vectors) {
-all_embeddings.push(CodeEmbedding {
+pending.push(CodeEmbedding {
id: None,
repo_id: repo_id.to_string(),
graph_build_id: graph_build_id.to_string(),
@@ -113,9 +126,45 @@
});
}
+embedded_count += batch_chunks.len() as u32;
+// Flush pending embeddings to Mongo periodically and update progress.
+if pending.len() >= EMBED_FLUSH_EVERY {
+    self.embedding_store
+        .store_embeddings(&pending)
+        .await
+        .map_err(|e| {
+            AgentError::Other(format!("Failed to store embeddings: {e}"))
+        })?;
+    pending.clear();
+}
+// Always update the progress counter on the build doc — even if
+// we haven't flushed embeddings yet — so the UI shows movement.
+if let Err(e) = self
+    .embedding_store
+    .update_build(
+        repo_id,
+        graph_build_id,
+        EmbeddingBuildStatus::Running,
+        embedded_count,
+        None,
+    )
+    .await
+{
+    error!("[{repo_id}] Failed to update build progress: {e}");
+}
+// Queue the next batch to keep concurrency saturated.
+if let Some((s, e)) = batch_iter.next() {
+    in_flight.push(self.embed_batch(&chunks[s..e], s, e));
+}
}
Err(e) => {
error!("[{repo_id}] Embedding batch failed: {e}");
+// Flush whatever we have so partial progress isn't lost.
+if !pending.is_empty() {
+    let _ = self.embedding_store.store_embeddings(&pending).await;
+}
build.status = EmbeddingBuildStatus::Failed;
build.error_message = Some(e.to_string());
build.completed_at = Some(Utc::now());
@@ -134,11 +183,13 @@
}
}
-// Step 4: Store all embeddings
-self.embedding_store
-    .store_embeddings(&all_embeddings)
-    .await
-    .map_err(|e| AgentError::Other(format!("Failed to store embeddings: {e}")))?;
+// Step 4: Flush any remaining embeddings
+if !pending.is_empty() {
+    self.embedding_store
+        .store_embeddings(&pending)
+        .await
+        .map_err(|e| AgentError::Other(format!("Failed to store embeddings: {e}")))?;
+}
// Step 5: Update build status
build.status = EmbeddingBuildStatus::Completed;
@@ -161,4 +212,21 @@
);
Ok(build)
}
+/// Embed one batch of chunks. Returns the (start, end, vectors) tuple so
+/// out-of-order completion from `FuturesUnordered` can still be reconciled
+/// against the original chunk slice.
+async fn embed_batch(
+    &self,
+    batch_chunks: &[compliance_graph::graph::chunking::CodeChunk],
+    start: usize,
+    end: usize,
+) -> Result<(usize, usize, Vec<Vec<f64>>), AgentError> {
+    let texts: Vec<String> = batch_chunks
+        .iter()
+        .map(|c| format!("{}\n{}", c.context_header, c.content))
+        .collect();
+    let vectors = self.llm.embed(texts).await?;
+    Ok((start, end, vectors))
+}
}
@@ -32,7 +32,7 @@ pub fn AppShell() -> Element {
// Not authenticated — redirect to Keycloak login
rsx! {
document::Script {
-dangerous_inner_html: "window.location.href = '/auth';"
+"window.location.href = '/auth';"
}
}
}
}