Compare commits
8 Commits
| SHA1 |
|---|
| b96dda11fb |
| e67a13535a |
| df0063abc0 |
| 5cafd13f44 |
| 69209649a5 |
| d5439adc0d |
| bc7cdd35e4 |
| c062d834a1 |
@@ -0,0 +1,10 @@
+[advisories]
+ignore = [
+    # hickory-proto 0.25.x pulled in transitively via mongodb → hickory-resolver.
+    # MongoDB 3.x has not yet released with hickory-resolver 0.26.x, so we cannot
+    # upgrade past this without a mongodb release. Both are DNS-layer DoS vectors
+    # requiring a MITM/controlled DNS server against MongoDB's hostname resolution —
+    # not a realistic attack surface here. Revisit when mongodb bumps hickory.
+    "RUSTSEC-2026-0118", # NSEC3 loop, no fix available upstream
+    "RUSTSEC-2026-0119", # O(n²) name compression, fixed in hickory-proto >=0.26.1
+]
+48 -20
@@ -145,13 +145,20 @@ jobs:
     needs: [detect-changes]
     if: needs.detect-changes.outputs.agent == 'true'
     container:
-      image: alpine:latest
+      image: docker:27-cli
     steps:
-      - name: Trigger Coolify deploy
+      - name: Build, push and trigger orca redeploy
        run: |
-          apk add --no-cache curl
-          curl -sf "${{ secrets.COOLIFY_WEBHOOK_AGENT }}" \
-            -H "Authorization: Bearer ${{ secrets.COOLIFY_TOKEN }}"
+          apk add --no-cache git curl openssl
+          git init && git remote add origin "${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY}.git"
+          git fetch --depth=1 origin "${GITHUB_SHA}" && git checkout FETCH_HEAD
+          IMAGE=registry.meghsakha.com/compliance-agent
+          echo "${{ secrets.REGISTRY_PASSWORD }}" | docker login registry.meghsakha.com -u "${{ secrets.REGISTRY_USERNAME }}" --password-stdin
+          docker build -f Dockerfile.agent -t "$IMAGE:latest" -t "$IMAGE:${GITHUB_SHA}" .
+          docker push "$IMAGE:latest" && docker push "$IMAGE:${GITHUB_SHA}"
+          PAYLOAD=$(printf '{"ref":"refs/heads/main","repository":{"full_name":"sharang/compliance-scanner-agent"},"head_commit":{"id":"%s","message":"deploy agent"}}' "${GITHUB_SHA}")
+          SIG=$(printf '%s' "$PAYLOAD" | openssl dgst -sha256 -hmac "${{ secrets.ORCA_WEBHOOK_SECRET }}" | awk '{print $2}')
+          RESP=$(curl -fsS -w "\nHTTP %{http_code}" -X POST "http://46.225.100.82:6880/api/v1/webhooks/github" -H "Content-Type: application/json" -H "X-Hub-Signature-256: sha256=$SIG" -d "$PAYLOAD"); echo "$RESP"
 
   deploy-dashboard:
     name: Deploy Dashboard
@@ -159,13 +166,20 @@ jobs:
     needs: [detect-changes]
     if: needs.detect-changes.outputs.dashboard == 'true'
     container:
-      image: alpine:latest
+      image: docker:27-cli
     steps:
-      - name: Trigger Coolify deploy
+      - name: Build, push and trigger orca redeploy
        run: |
-          apk add --no-cache curl
-          curl -sf "${{ secrets.COOLIFY_WEBHOOK_DASHBOARD }}" \
-            -H "Authorization: Bearer ${{ secrets.COOLIFY_TOKEN }}"
+          apk add --no-cache git curl openssl
+          git init && git remote add origin "${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY}.git"
+          git fetch --depth=1 origin "${GITHUB_SHA}" && git checkout FETCH_HEAD
+          IMAGE=registry.meghsakha.com/compliance-dashboard
+          echo "${{ secrets.REGISTRY_PASSWORD }}" | docker login registry.meghsakha.com -u "${{ secrets.REGISTRY_USERNAME }}" --password-stdin
+          docker build -f Dockerfile.dashboard -t "$IMAGE:latest" -t "$IMAGE:${GITHUB_SHA}" .
+          docker push "$IMAGE:latest" && docker push "$IMAGE:${GITHUB_SHA}"
+          PAYLOAD=$(printf '{"ref":"refs/heads/main","repository":{"full_name":"sharang/compliance-scanner-agent"},"head_commit":{"id":"%s","message":"deploy dashboard"}}' "${GITHUB_SHA}")
+          SIG=$(printf '%s' "$PAYLOAD" | openssl dgst -sha256 -hmac "${{ secrets.ORCA_WEBHOOK_SECRET }}" | awk '{print $2}')
+          RESP=$(curl -fsS -w "\nHTTP %{http_code}" -X POST "http://46.225.100.82:6880/api/v1/webhooks/github" -H "Content-Type: application/json" -H "X-Hub-Signature-256: sha256=$SIG" -d "$PAYLOAD"); echo "$RESP"
 
   deploy-docs:
     name: Deploy Docs
@@ -173,13 +187,20 @@ jobs:
     needs: [detect-changes]
     if: needs.detect-changes.outputs.docs == 'true'
     container:
-      image: alpine:latest
+      image: docker:27-cli
    steps:
-      - name: Trigger Coolify deploy
+      - name: Build, push and trigger orca redeploy
        run: |
-          apk add --no-cache curl
-          curl -sf "${{ secrets.COOLIFY_WEBHOOK_DOCS }}" \
-            -H "Authorization: Bearer ${{ secrets.COOLIFY_TOKEN }}"
+          apk add --no-cache git curl openssl
+          git init && git remote add origin "${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY}.git"
+          git fetch --depth=1 origin "${GITHUB_SHA}" && git checkout FETCH_HEAD
+          IMAGE=registry.meghsakha.com/compliance-docs
+          echo "${{ secrets.REGISTRY_PASSWORD }}" | docker login registry.meghsakha.com -u "${{ secrets.REGISTRY_USERNAME }}" --password-stdin
+          docker build -f Dockerfile.docs -t "$IMAGE:latest" -t "$IMAGE:${GITHUB_SHA}" .
+          docker push "$IMAGE:latest" && docker push "$IMAGE:${GITHUB_SHA}"
+          PAYLOAD=$(printf '{"ref":"refs/heads/main","repository":{"full_name":"sharang/compliance-scanner-agent"},"head_commit":{"id":"%s","message":"deploy docs"}}' "${GITHUB_SHA}")
+          SIG=$(printf '%s' "$PAYLOAD" | openssl dgst -sha256 -hmac "${{ secrets.ORCA_WEBHOOK_SECRET }}" | awk '{print $2}')
+          RESP=$(curl -fsS -w "\nHTTP %{http_code}" -X POST "http://46.225.100.82:6880/api/v1/webhooks/github" -H "Content-Type: application/json" -H "X-Hub-Signature-256: sha256=$SIG" -d "$PAYLOAD"); echo "$RESP"
 
   deploy-mcp:
     name: Deploy MCP
@@ -187,10 +208,17 @@ jobs:
     needs: [detect-changes]
     if: needs.detect-changes.outputs.mcp == 'true'
     container:
-      image: alpine:latest
+      image: docker:27-cli
     steps:
-      - name: Trigger Coolify deploy
+      - name: Build, push and trigger orca redeploy
        run: |
-          apk add --no-cache curl
-          curl -sf "${{ secrets.COOLIFY_WEBHOOK_MCP }}" \
-            -H "Authorization: Bearer ${{ secrets.COOLIFY_TOKEN }}"
+          apk add --no-cache git curl openssl
+          git init && git remote add origin "${GITHUB_SERVER_URL}/${GITHUB_REPOSITORY}.git"
+          git fetch --depth=1 origin "${GITHUB_SHA}" && git checkout FETCH_HEAD
+          IMAGE=registry.meghsakha.com/compliance-mcp
+          echo "${{ secrets.REGISTRY_PASSWORD }}" | docker login registry.meghsakha.com -u "${{ secrets.REGISTRY_USERNAME }}" --password-stdin
+          docker build -f Dockerfile.mcp -t "$IMAGE:latest" -t "$IMAGE:${GITHUB_SHA}" .
+          docker push "$IMAGE:latest" && docker push "$IMAGE:${GITHUB_SHA}"
+          PAYLOAD=$(printf '{"ref":"refs/heads/main","repository":{"full_name":"sharang/compliance-scanner-agent"},"head_commit":{"id":"%s","message":"deploy mcp"}}' "${GITHUB_SHA}")
+          SIG=$(printf '%s' "$PAYLOAD" | openssl dgst -sha256 -hmac "${{ secrets.ORCA_WEBHOOK_SECRET }}" | awk '{print $2}')
+          RESP=$(curl -fsS -w "\nHTTP %{http_code}" -X POST "http://46.225.100.82:6880/api/v1/webhooks/github" -H "Content-Type: application/json" -H "X-Hub-Signature-256: sha256=$SIG" -d "$PAYLOAD"); echo "$RESP"
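
Each deploy job now builds and pushes its image itself and then notifies the orca deploy service with a synthetic GitHub-style push event: the JSON payload is signed with HMAC-SHA256 over the exact $PAYLOAD bytes (openssl dgst -sha256 -hmac) and the hex digest travels in the X-Hub-Signature-256 header. As a hedged illustration of the other side of that contract, the sketch below shows how a receiver such as the /api/v1/webhooks/github endpoint could validate the header; this is not code from this repository and assumes the hmac, sha2, and hex crates.

```rust
use hmac::{Hmac, Mac};
use sha2::Sha256;

/// Sketch only: validate an "X-Hub-Signature-256: sha256=<hex>" header against
/// the raw request body, mirroring what the workflow's openssl invocation signs.
fn signature_is_valid(secret: &[u8], body: &[u8], header: &str) -> bool {
    let Some(expected_hex) = header.strip_prefix("sha256=") else {
        return false;
    };
    let Ok(expected) = hex::decode(expected_hex) else {
        return false;
    };
    let Ok(mut mac) = Hmac::<Sha256>::new_from_slice(secret) else {
        return false;
    };
    mac.update(body);
    // Constant-time comparison of the computed digest against the header value.
    mac.verify_slice(&expected).is_ok()
}
```

Verification only succeeds if the received body is byte-for-byte identical to $PAYLOAD, so the workflow signs the same printf output it is about to send rather than a re-serialized copy.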
Generated
+6 -6
@@ -3524,9 +3524,9 @@ checksum = "224484c5d09285a7b8cb0a0c117e847ebd14cb6e4470ecf68cdb89c503b0edb9"
 
 [[package]]
 name = "mongodb"
-version = "3.5.1"
+version = "3.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "803dd859e8afa084c255a8effd8000ff86f7c8076a50cd6d8c99e8f3496f75c2"
+checksum = "1ef2c933617431ad0246fb5b43c425ebdae18c7f7259c87de0726d93b0e7e91b"
 dependencies = [
  "base64",
  "bitflags",
@@ -3570,9 +3570,9 @@ dependencies = [
 
 [[package]]
 name = "mongodb-internal-macros"
-version = "3.5.1"
+version = "3.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a973ef3dd3dbc6f6e65bbdecfd9ec5e781b9e7493b0f369a7c62e35d8e5ae2c8"
+checksum = "9e5758dc828eb2d02ec30563cba365609d56ddd833190b192beaee2b475a7bb3"
 dependencies = [
  "macro_magic",
  "proc-macro2",
@@ -4699,9 +4699,9 @@ dependencies = [
 
 [[package]]
 name = "rustls-webpki"
-version = "0.103.10"
+version = "0.103.13"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef"
+checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e"
 dependencies = [
  "ring",
  "rustls-pki-types",
@@ -44,3 +44,4 @@ RUN mkdir -p /data/compliance-scanner/ssh
 EXPOSE 3001 3002
 
+ENTRYPOINT ["compliance-agent"]

@@ -20,3 +20,4 @@ ENV IP=0.0.0.0
 EXPOSE 8080
 
+ENTRYPOINT ["./compliance-dashboard"]

@@ -12,3 +12,4 @@ RUN rm /etc/nginx/conf.d/default.conf
 COPY docs/nginx.conf /etc/nginx/conf.d/default.conf
 COPY --from=builder /app/.vitepress/dist /usr/share/nginx/html
 EXPOSE 80

@@ -14,3 +14,4 @@ EXPOSE 8090
 ENV MCP_PORT=8090
 
+ENTRYPOINT ["compliance-mcp"]
@@ -35,11 +35,16 @@ impl ComplianceAgent {
             config.litellm_model.clone(),
             config.litellm_embed_model.clone(),
         ));
+        let http = reqwest::Client::builder()
+            .timeout(std::time::Duration::from_secs(30))
+            .connect_timeout(std::time::Duration::from_secs(10))
+            .build()
+            .unwrap_or_default();
         Self {
             config,
             db,
             llm,
-            http: reqwest::Client::new(),
+            http,
             session_streams: Arc::new(DashMap::new()),
             session_pause: Arc::new(DashMap::new()),
             session_semaphore: Arc::new(Semaphore::new(DEFAULT_MAX_CONCURRENT_SESSIONS)),
@@ -19,12 +19,17 @@ impl LlmClient {
         model: String,
         embed_model: String,
     ) -> Self {
+        let http = reqwest::Client::builder()
+            .timeout(std::time::Duration::from_secs(300))
+            .connect_timeout(std::time::Duration::from_secs(10))
+            .build()
+            .unwrap_or_default();
         Self {
             base_url,
             api_key,
             model,
             embed_model,
-            http: reqwest::Client::new(),
+            http,
         }
     }
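
Both constructors now give the shared reqwest client an overall request timeout (30 s for the agent, 300 s for LLM calls) plus a 10 s connect timeout. Note that .unwrap_or_default() means a builder failure silently falls back to reqwest::Client::default(), which carries no timeouts at all. A minimal sketch of an equivalent but logged fallback, using a hypothetical http_client helper rather than code from this diff:

```rust
use std::time::Duration;

/// Sketch only: same effect as `.unwrap_or_default()` above, but the fallback
/// to a timeout-less default client is logged instead of being silent.
fn http_client(timeout_secs: u64) -> reqwest::Client {
    reqwest::Client::builder()
        .timeout(Duration::from_secs(timeout_secs))
        .connect_timeout(Duration::from_secs(10))
        .build()
        .unwrap_or_else(|e| {
            eprintln!("reqwest builder failed ({e}); using a default client without timeouts");
            reqwest::Client::default()
        })
}
```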
@@ -4,7 +4,7 @@ use compliance_agent::{agent, api, config, database, scheduler, ssh, webhooks};
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     match dotenvy::dotenv() {
         Ok(path) => eprintln!("[dotenv] Loaded from: {}", path.display()),
-        Err(e) => eprintln!("[dotenv] FAILED: {e}"),
+        Err(_) => eprintln!("[dotenv] No .env file found, using environment variables"),
     }
 
     let _telemetry_guard = compliance_core::telemetry::init_telemetry("compliance-agent");
@@ -19,7 +19,9 @@ impl Scanner for GitleaksScanner {
 
     #[tracing::instrument(skip_all)]
     async fn scan(&self, repo_path: &Path, repo_id: &str) -> Result<ScanOutput, CoreError> {
-        let output = tokio::process::Command::new("gitleaks")
+        let output = tokio::time::timeout(
+            std::time::Duration::from_secs(300),
+            tokio::process::Command::new("gitleaks")
             .args([
                 "detect",
                 "--source",
@@ -33,8 +35,13 @@ impl Scanner for GitleaksScanner {
                 "0",
             ])
             .current_dir(repo_path)
-            .output()
+            .output(),
+        )
         .await
+        .map_err(|_| CoreError::Scanner {
+            scanner: "gitleaks".to_string(),
+            source: "timed out after 5 minutes".into(),
+        })?
         .map_err(|e| CoreError::Scanner {
             scanner: "gitleaks".to_string(),
             source: Box::new(e),
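
The same wrapping pattern is applied to the syft and semgrep scanners further down: the child-process future moves inside tokio::time::timeout, the outer Elapsed error becomes a "timed out" scanner error, and the inner io::Error keeps its existing mapping. A condensed, self-contained sketch of the pattern follows; run_with_timeout is a hypothetical helper and the errors are simplified to String instead of CoreError::Scanner.

```rust
use std::time::Duration;
use tokio::process::Command;

/// Sketch only: timeout-wrapped subprocess execution, as used for the scanners.
async fn run_with_timeout(
    cmd: &str,
    args: &[&str],
    timeout_secs: u64,
) -> Result<std::process::Output, String> {
    tokio::time::timeout(
        Duration::from_secs(timeout_secs),
        Command::new(cmd).args(args).output(),
    )
    .await
    // Outer Result: Err means the subprocess exceeded its time budget.
    .map_err(|_| format!("{cmd} timed out after {timeout_secs}s"))?
    // Inner Result: the process failed to spawn or could not be awaited.
    .map_err(|e| format!("{cmd} failed to run: {e}"))
}
```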
@@ -174,19 +174,26 @@ impl PipelineOrchestrator {
                 k.expose_secret().to_string()
             }),
         );
-        let cve_alerts = match async {
+        let cve_alerts = match tokio::time::timeout(
+            std::time::Duration::from_secs(600),
+            async {
             cve_scanner
                 .scan_dependencies(&repo_id, &mut sbom_entries)
                 .await
         }
-        .instrument(tracing::info_span!("stage_cve_scanning"))
+            .instrument(tracing::info_span!("stage_cve_scanning")),
+        )
         .await
         {
-            Ok(alerts) => alerts,
-            Err(e) => {
+            Ok(Ok(alerts)) => alerts,
+            Ok(Err(e)) => {
                 tracing::warn!("[{repo_id}] CVE scanning failed: {e}");
                 Vec::new()
             }
+            Err(_) => {
+                tracing::warn!("[{repo_id}] CVE scanning timed out after 10 minutes");
+                Vec::new()
+            }
         };
 
         // Stage 4: Pattern Scanning (GDPR + OAuth)
@@ -5,16 +5,22 @@ use compliance_core::CoreError;
 
 #[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
 pub(super) async fn run_syft(repo_path: &Path, repo_id: &str) -> Result<Vec<SbomEntry>, CoreError> {
-    let output = tokio::process::Command::new("syft")
+    let output = tokio::time::timeout(
+        std::time::Duration::from_secs(300),
+        tokio::process::Command::new("syft")
         .arg(repo_path)
         .args(["-o", "cyclonedx-json"])
         // Enable remote license lookups for all ecosystems
         .env("SYFT_GOLANG_SEARCH_REMOTE_LICENSES", "true")
         .env("SYFT_JAVASCRIPT_SEARCH_REMOTE_LICENSES", "true")
         .env("SYFT_PYTHON_SEARCH_REMOTE_LICENSES", "true")
         .env("SYFT_JAVA_USE_NETWORK", "true")
-        .output()
+        .output(),
+    )
     .await
+    .map_err(|_| CoreError::Scanner {
+        scanner: "syft".to_string(),
+        source: "timed out after 5 minutes".into(),
+    })?
     .map_err(|e| CoreError::Scanner {
         scanner: "syft".to_string(),
         source: Box::new(e),
@@ -19,11 +19,26 @@ impl Scanner for SemgrepScanner {
 
     #[tracing::instrument(skip_all)]
     async fn scan(&self, repo_path: &Path, repo_id: &str) -> Result<ScanOutput, CoreError> {
-        let output = tokio::process::Command::new("semgrep")
-            .args(["--config=auto", "--json", "--quiet"])
+        let output = tokio::time::timeout(
+            std::time::Duration::from_secs(600),
+            tokio::process::Command::new("semgrep")
+                .args([
+                    "--config=auto",
+                    "--json",
+                    "--quiet",
+                    "--max-memory",
+                    "500",
+                    "--jobs",
+                    "1",
+                ])
             .arg(repo_path)
-            .output()
+                .output(),
+        )
         .await
+        .map_err(|_| CoreError::Scanner {
+            scanner: "semgrep".to_string(),
+            source: "timed out after 10 minutes".into(),
+        })?
         .map_err(|e| CoreError::Scanner {
             scanner: "semgrep".to_string(),
             source: Box::new(e),
@@ -6,11 +6,16 @@ use compliance_core::models::embedding::{CodeEmbedding, EmbeddingBuildRun, Embed
 use compliance_core::models::graph::CodeNode;
 use compliance_graph::graph::chunking::extract_chunks;
 use compliance_graph::graph::embedding_store::EmbeddingStore;
+use futures_util::stream::{FuturesUnordered, StreamExt};
 use tracing::{error, info};
 
 use crate::error::AgentError;
 use crate::llm::LlmClient;
 
+const EMBED_BATCH_SIZE: usize = 20;
+const EMBED_CONCURRENCY: usize = 4;
+const EMBED_FLUSH_EVERY: usize = 200;
+
 /// RAG pipeline for building embeddings and performing retrieval
 pub struct RagPipeline {
     llm: Arc<LlmClient>,
@@ -77,25 +82,33 @@ impl RagPipeline {
             .await
             .map_err(|e| AgentError::Other(format!("Failed to delete old embeddings: {e}")))?;
 
-        // Step 3: Batch embed (small batches to stay within model limits)
-        let batch_size = 20;
-        let mut all_embeddings = Vec::new();
+        // Step 3: Batch embed with bounded concurrency. Flush to Mongo and
+        // update progress periodically so the dashboard can show live status.
+        let mut pending = Vec::with_capacity(EMBED_FLUSH_EVERY);
+        let mut embedded_count = 0u32;
 
-        for batch_start in (0..chunks.len()).step_by(batch_size) {
-            let batch_end = (batch_start + batch_size).min(chunks.len());
-            let batch_chunks = &chunks[batch_start..batch_end];
-
-            // Prepare texts: context_header + content
-            let texts: Vec<String> = batch_chunks
-                .iter()
-                .map(|c| format!("{}\n{}", c.context_header, c.content))
+        // Build the list of batch indices to process.
+        let batches: Vec<(usize, usize)> = (0..chunks.len())
+            .step_by(EMBED_BATCH_SIZE)
+            .map(|start| (start, (start + EMBED_BATCH_SIZE).min(chunks.len())))
             .collect();
 
-        match self.llm.embed(texts).await {
-            Ok(vectors) => {
+        let mut batch_iter = batches.into_iter();
+        let mut in_flight = FuturesUnordered::new();
+
+        // Prime up to EMBED_CONCURRENCY batches.
+        for _ in 0..EMBED_CONCURRENCY {
+            if let Some((start, end)) = batch_iter.next() {
+                in_flight.push(self.embed_batch(&chunks[start..end], start, end));
+            }
+        }
+
+        while let Some(result) = in_flight.next().await {
+            match result {
+                Ok((start, end, vectors)) => {
+                    let batch_chunks = &chunks[start..end];
                 for (chunk, embedding) in batch_chunks.iter().zip(vectors) {
-                    all_embeddings.push(CodeEmbedding {
+                    pending.push(CodeEmbedding {
                         id: None,
                         repo_id: repo_id.to_string(),
                         graph_build_id: graph_build_id.to_string(),
@@ -113,9 +126,45 @@ impl RagPipeline {
                         });
                     }
+                    embedded_count += batch_chunks.len() as u32;
+
+                    // Flush pending embeddings to Mongo periodically and update progress.
+                    if pending.len() >= EMBED_FLUSH_EVERY {
+                        self.embedding_store
+                            .store_embeddings(&pending)
+                            .await
+                            .map_err(|e| {
+                                AgentError::Other(format!("Failed to store embeddings: {e}"))
+                            })?;
+                        pending.clear();
+                    }
+
+                    // Always update the progress counter on the build doc — even if
+                    // we haven't flushed embeddings yet — so the UI shows movement.
+                    if let Err(e) = self
+                        .embedding_store
+                        .update_build(
+                            repo_id,
+                            graph_build_id,
+                            EmbeddingBuildStatus::Running,
+                            embedded_count,
+                            None,
+                        )
+                        .await
+                    {
+                        error!("[{repo_id}] Failed to update build progress: {e}");
+                    }
+
+                    // Queue the next batch to keep concurrency saturated.
+                    if let Some((s, e)) = batch_iter.next() {
+                        in_flight.push(self.embed_batch(&chunks[s..e], s, e));
+                    }
+                }
                 Err(e) => {
                     error!("[{repo_id}] Embedding batch failed: {e}");
+                    // Flush whatever we have so partial progress isn't lost.
+                    if !pending.is_empty() {
+                        let _ = self.embedding_store.store_embeddings(&pending).await;
+                    }
                     build.status = EmbeddingBuildStatus::Failed;
                     build.error_message = Some(e.to_string());
                     build.completed_at = Some(Utc::now());
@@ -134,11 +183,13 @@ impl RagPipeline {
             }
         }
 
-        // Step 4: Store all embeddings
+        // Step 4: Flush any remaining embeddings
+        if !pending.is_empty() {
         self.embedding_store
-            .store_embeddings(&all_embeddings)
+            .store_embeddings(&pending)
             .await
             .map_err(|e| AgentError::Other(format!("Failed to store embeddings: {e}")))?;
+        }
 
         // Step 5: Update build status
         build.status = EmbeddingBuildStatus::Completed;
@@ -161,4 +212,21 @@ impl RagPipeline {
         );
         Ok(build)
     }
+
+    /// Embed one batch of chunks. Returns the (start, end, vectors) tuple so
+    /// out-of-order completion from `FuturesUnordered` can still be reconciled
+    /// against the original chunk slice.
+    async fn embed_batch(
+        &self,
+        batch_chunks: &[compliance_graph::graph::chunking::CodeChunk],
+        start: usize,
+        end: usize,
+    ) -> Result<(usize, usize, Vec<Vec<f64>>), AgentError> {
+        let texts: Vec<String> = batch_chunks
+            .iter()
+            .map(|c| format!("{}\n{}", c.context_header, c.content))
+            .collect();
+        let vectors = self.llm.embed(texts).await?;
+        Ok((start, end, vectors))
+    }
 }
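
The rewritten embedding loop keeps at most EMBED_CONCURRENCY embed_batch calls in flight: the window is primed up front, and every completed future both yields its vectors and pulls the next pending batch into the window. The sketch below shows just that sliding-window scheduling with FuturesUnordered, using a stand-in async work function and made-up batch numbers instead of the real embed_batch.

```rust
use futures_util::stream::{FuturesUnordered, StreamExt};

const CONCURRENCY: usize = 4;

// Stand-in for `embed_batch`: any async unit of work.
async fn work(batch: usize) -> usize {
    batch * 2
}

#[tokio::main]
async fn main() {
    let mut batches = (0..20).collect::<Vec<usize>>().into_iter();
    let mut in_flight = FuturesUnordered::new();

    // Prime the window with up to CONCURRENCY futures.
    for _ in 0..CONCURRENCY {
        if let Some(b) = batches.next() {
            in_flight.push(work(b));
        }
    }

    // Each completion frees a slot that is refilled immediately, so at most
    // CONCURRENCY batches are ever in flight and results arrive out of order.
    while let Some(result) = in_flight.next().await {
        println!("batch finished with result {result}");
        if let Some(b) = batches.next() {
            in_flight.push(work(b));
        }
    }
}
```

Because completions arrive out of order, the real embed_batch returns its (start, end) range alongside the vectors so each result can be matched back to the right chunk slice.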
@@ -32,7 +32,7 @@ pub fn AppShell() -> Element {
         // Not authenticated — redirect to Keycloak login
         rsx! {
             document::Script {
-                dangerous_inner_html: "window.location.href = '/auth';"
+                "window.location.href = '/auth';"
             }
         }
     }