use std::sync::Arc; use mongodb::bson::doc; use compliance_core::models::*; use compliance_core::traits::Scanner; use compliance_core::AgentConfig; use crate::database::Database; use crate::error::AgentError; use crate::llm::LlmClient; use crate::pipeline::code_review::CodeReviewScanner; use crate::pipeline::cve::CveScanner; use crate::pipeline::git::GitOps; use crate::pipeline::gitleaks::GitleaksScanner; use crate::pipeline::lint::LintScanner; use crate::pipeline::patterns::{GdprPatternScanner, OAuthPatternScanner}; use crate::pipeline::sbom::SbomScanner; use crate::pipeline::semgrep::SemgrepScanner; /// Context from graph analysis passed to LLM triage for enhanced filtering #[derive(Debug)] #[allow(dead_code)] pub struct GraphContext { pub node_count: u32, pub edge_count: u32, pub community_count: u32, pub impacts: Vec, } pub struct PipelineOrchestrator { config: AgentConfig, db: Database, llm: Arc, http: reqwest::Client, } impl PipelineOrchestrator { pub fn new( config: AgentConfig, db: Database, llm: Arc, http: reqwest::Client, ) -> Self { Self { config, db, llm, http, } } pub async fn run(&self, repo_id: &str, trigger: ScanTrigger) -> Result<(), AgentError> { // Look up the repository let repo = self .db .repositories() .find_one(doc! { "_id": mongodb::bson::oid::ObjectId::parse_str(repo_id).map_err(|e| AgentError::Other(e.to_string()))? }) .await? .ok_or_else(|| AgentError::Other(format!("Repository {repo_id} not found")))?; // Create scan run let scan_run = ScanRun::new(repo_id.to_string(), trigger); let insert = self.db.scan_runs().insert_one(&scan_run).await?; let scan_run_id = insert .inserted_id .as_object_id() .map(|id| id.to_hex()) .unwrap_or_default(); let result = self.run_pipeline(&repo, &scan_run_id).await; // Update scan run status match &result { Ok(count) => { self.db .scan_runs() .update_one( doc! { "_id": &insert.inserted_id }, doc! { "$set": { "status": "completed", "current_phase": "completed", "new_findings_count": *count as i64, "completed_at": mongodb::bson::DateTime::now(), } }, ) .await?; } Err(e) => { self.db .scan_runs() .update_one( doc! { "_id": &insert.inserted_id }, doc! { "$set": { "status": "failed", "error_message": e.to_string(), "completed_at": mongodb::bson::DateTime::now(), } }, ) .await?; } } result.map(|_| ()) } async fn run_pipeline( &self, repo: &TrackedRepository, scan_run_id: &str, ) -> Result { let repo_id = repo.id.as_ref().map(|id| id.to_hex()).unwrap_or_default(); // Stage 0: Change detection tracing::info!("[{repo_id}] Stage 0: Change detection"); let git_ops = GitOps::new(&self.config.git_clone_base_path); let repo_path = git_ops.clone_or_fetch(&repo.git_url, &repo.name)?; if !GitOps::has_new_commits(&repo_path, repo.last_scanned_commit.as_deref())? { tracing::info!("[{repo_id}] No new commits, skipping scan"); return Ok(0); } let current_sha = GitOps::get_head_sha(&repo_path)?; let mut all_findings: Vec = Vec::new(); // Stage 1: Semgrep SAST tracing::info!("[{repo_id}] Stage 1: Semgrep SAST"); self.update_phase(scan_run_id, "sast").await; let semgrep = SemgrepScanner; match semgrep.scan(&repo_path, &repo_id).await { Ok(output) => all_findings.extend(output.findings), Err(e) => tracing::warn!("[{repo_id}] Semgrep failed: {e}"), } // Stage 2: SBOM Generation tracing::info!("[{repo_id}] Stage 2: SBOM Generation"); self.update_phase(scan_run_id, "sbom_generation").await; let sbom_scanner = SbomScanner; let mut sbom_entries = match sbom_scanner.scan(&repo_path, &repo_id).await { Ok(output) => output.sbom_entries, Err(e) => { tracing::warn!("[{repo_id}] SBOM generation failed: {e}"); Vec::new() } }; // Stage 3: CVE Scanning tracing::info!("[{repo_id}] Stage 3: CVE Scanning"); self.update_phase(scan_run_id, "cve_scanning").await; let cve_scanner = CveScanner::new( self.http.clone(), self.config.searxng_url.clone(), self.config.nvd_api_key.as_ref().map(|k| { use secrecy::ExposeSecret; k.expose_secret().to_string() }), ); let cve_alerts = match cve_scanner .scan_dependencies(&repo_id, &mut sbom_entries) .await { Ok(alerts) => alerts, Err(e) => { tracing::warn!("[{repo_id}] CVE scanning failed: {e}"); Vec::new() } }; // Stage 4: Pattern Scanning (GDPR + OAuth) tracing::info!("[{repo_id}] Stage 4: Pattern Scanning"); self.update_phase(scan_run_id, "pattern_scanning").await; let gdpr = GdprPatternScanner::new(); match gdpr.scan(&repo_path, &repo_id).await { Ok(output) => all_findings.extend(output.findings), Err(e) => tracing::warn!("[{repo_id}] GDPR pattern scan failed: {e}"), } let oauth = OAuthPatternScanner::new(); match oauth.scan(&repo_path, &repo_id).await { Ok(output) => all_findings.extend(output.findings), Err(e) => tracing::warn!("[{repo_id}] OAuth pattern scan failed: {e}"), } // Stage 4a: Secret Detection (Gitleaks) tracing::info!("[{repo_id}] Stage 4a: Secret Detection"); self.update_phase(scan_run_id, "secret_detection").await; let gitleaks = GitleaksScanner; match gitleaks.scan(&repo_path, &repo_id).await { Ok(output) => all_findings.extend(output.findings), Err(e) => tracing::warn!("[{repo_id}] Gitleaks failed: {e}"), } // Stage 4b: Lint Scanning tracing::info!("[{repo_id}] Stage 4b: Lint Scanning"); self.update_phase(scan_run_id, "lint_scanning").await; let lint = LintScanner; match lint.scan(&repo_path, &repo_id).await { Ok(output) => all_findings.extend(output.findings), Err(e) => tracing::warn!("[{repo_id}] Lint scanning failed: {e}"), } // Stage 4c: LLM Code Review (only on incremental scans) if let Some(old_sha) = &repo.last_scanned_commit { tracing::info!("[{repo_id}] Stage 4c: LLM Code Review"); self.update_phase(scan_run_id, "code_review").await; let reviewer = CodeReviewScanner::new(self.llm.clone()); let review_output = reviewer .review_diff(&repo_path, &repo_id, old_sha, ¤t_sha) .await; all_findings.extend(review_output.findings); } // Stage 4.5: Graph Building tracing::info!("[{repo_id}] Stage 4.5: Graph Building"); self.update_phase(scan_run_id, "graph_building").await; let graph_context = match self .build_code_graph(&repo_path, &repo_id, &all_findings) .await { Ok(ctx) => Some(ctx), Err(e) => { tracing::warn!("[{repo_id}] Graph building failed: {e}"); None } }; // Stage 5: LLM Triage (enhanced with graph context) tracing::info!( "[{repo_id}] Stage 5: LLM Triage ({} findings)", all_findings.len() ); self.update_phase(scan_run_id, "llm_triage").await; let triaged = crate::llm::triage::triage_findings( &self.llm, &mut all_findings, graph_context.as_ref(), ) .await; tracing::info!("[{repo_id}] Triaged: {triaged} findings passed confidence threshold"); // Dedup against existing findings and insert new ones let mut new_count = 0u32; for mut finding in all_findings { finding.scan_run_id = Some(scan_run_id.to_string()); // Check if fingerprint already exists let existing = self .db .findings() .find_one(doc! { "fingerprint": &finding.fingerprint }) .await?; if existing.is_none() { self.db.findings().insert_one(&finding).await?; new_count += 1; } } // Persist SBOM entries (upsert by repo_id + name + version) for entry in &sbom_entries { let filter = doc! { "repo_id": &entry.repo_id, "name": &entry.name, "version": &entry.version, }; let update = mongodb::bson::to_document(entry) .map(|d| doc! { "$set": d }) .unwrap_or_else(|_| doc! {}); self.db .sbom_entries() .update_one(filter, update) .upsert(true) .await?; } // Persist CVE alerts (upsert by cve_id + repo_id) for alert in &cve_alerts { let filter = doc! { "cve_id": &alert.cve_id, "repo_id": &alert.repo_id, }; let update = mongodb::bson::to_document(alert) .map(|d| doc! { "$set": d }) .unwrap_or_else(|_| doc! {}); self.db .cve_alerts() .update_one(filter, update) .upsert(true) .await?; } // Stage 6: Issue Creation tracing::info!("[{repo_id}] Stage 6: Issue Creation"); self.update_phase(scan_run_id, "issue_creation").await; // Issue creation is handled by the trackers module - deferred to agent // Stage 7: Update repository self.db .repositories() .update_one( doc! { "_id": repo.id }, doc! { "$set": { "last_scanned_commit": ¤t_sha, "updated_at": mongodb::bson::DateTime::now(), }, "$inc": { "findings_count": new_count as i64 }, }, ) .await?; // Stage 8: DAST (async, optional — only if a DastTarget is configured) tracing::info!("[{repo_id}] Stage 8: Checking for DAST targets"); self.update_phase(scan_run_id, "dast_scanning").await; self.maybe_trigger_dast(&repo_id, scan_run_id).await; tracing::info!("[{repo_id}] Scan complete: {new_count} new findings"); Ok(new_count) } /// Build the code knowledge graph for a repo and compute impact analyses async fn build_code_graph( &self, repo_path: &std::path::Path, repo_id: &str, findings: &[Finding], ) -> Result { let graph_build_id = uuid::Uuid::new_v4().to_string(); let engine = compliance_graph::GraphEngine::new(50_000); let (mut code_graph, build_run) = engine .build_graph(repo_path, repo_id, &graph_build_id) .map_err(|e| AgentError::Other(format!("Graph build error: {e}")))?; // Apply community detection compliance_graph::graph::community::apply_communities(&mut code_graph); // Store graph in MongoDB let store = compliance_graph::graph::persistence::GraphStore::new(self.db.inner()); store .delete_repo_graph(repo_id) .await .map_err(|e| AgentError::Other(format!("Graph cleanup error: {e}")))?; store .store_graph(&build_run, &code_graph.nodes, &code_graph.edges) .await .map_err(|e| AgentError::Other(format!("Graph store error: {e}")))?; // Compute impact analysis for each finding let analyzer = compliance_graph::GraphEngine::impact_analyzer(&code_graph); let mut impacts = Vec::new(); for finding in findings { if let Some(file_path) = &finding.file_path { let impact = analyzer.analyze( repo_id, &finding.fingerprint, &graph_build_id, file_path, finding.line_number, ); store .store_impact(&impact) .await .map_err(|e| AgentError::Other(format!("Impact store error: {e}")))?; impacts.push(impact); } } Ok(GraphContext { node_count: build_run.node_count, edge_count: build_run.edge_count, community_count: build_run.community_count, impacts, }) } /// Trigger DAST scan if a target is configured for this repo async fn maybe_trigger_dast(&self, repo_id: &str, scan_run_id: &str) { use futures_util::TryStreamExt; let filter = mongodb::bson::doc! { "repo_id": repo_id }; let targets: Vec = match self.db.dast_targets().find(filter).await { Ok(cursor) => cursor.try_collect().await.unwrap_or_default(), Err(_) => return, }; if targets.is_empty() { tracing::info!("[{repo_id}] No DAST targets configured, skipping"); return; } for target in targets { let db = self.db.clone(); let scan_run_id = scan_run_id.to_string(); tokio::spawn(async move { let orchestrator = compliance_dast::DastOrchestrator::new(100); match orchestrator.run_scan(&target, Vec::new()).await { Ok((mut scan_run, findings)) => { scan_run.sast_scan_run_id = Some(scan_run_id); if let Err(e) = db.dast_scan_runs().insert_one(&scan_run).await { tracing::error!("Failed to store DAST scan run: {e}"); } for finding in &findings { if let Err(e) = db.dast_findings().insert_one(finding).await { tracing::error!("Failed to store DAST finding: {e}"); } } tracing::info!("DAST scan complete: {} findings", findings.len()); } Err(e) => { tracing::error!("DAST scan failed: {e}"); } } }); } } async fn update_phase(&self, scan_run_id: &str, phase: &str) { if let Ok(oid) = mongodb::bson::oid::ObjectId::parse_str(scan_run_id) { let _ = self .db .scan_runs() .update_one( doc! { "_id": oid }, doc! { "$set": { "current_phase": phase }, "$push": { "phases_completed": phase }, }, ) .await; } } }