use std::sync::Arc; use mongodb::bson::doc; use tracing::Instrument; use compliance_core::models::*; use compliance_core::traits::Scanner; use compliance_core::AgentConfig; use crate::database::Database; use crate::error::AgentError; use crate::llm::LlmClient; use crate::pipeline::cve::CveScanner; use crate::pipeline::git::GitOps; use crate::pipeline::gitleaks::GitleaksScanner; use crate::pipeline::lint::LintScanner; use crate::pipeline::patterns::{GdprPatternScanner, OAuthPatternScanner}; use crate::pipeline::sbom::SbomScanner; use crate::pipeline::semgrep::SemgrepScanner; /// Context from graph analysis passed to LLM triage for enhanced filtering #[derive(Debug)] #[allow(dead_code)] pub struct GraphContext { pub node_count: u32, pub edge_count: u32, pub community_count: u32, pub impacts: Vec, } pub struct PipelineOrchestrator { pub(super) config: AgentConfig, pub(super) db: Database, pub(super) llm: Arc, pub(super) http: reqwest::Client, } impl PipelineOrchestrator { pub fn new( config: AgentConfig, db: Database, llm: Arc, http: reqwest::Client, ) -> Self { Self { config, db, llm, http, } } #[tracing::instrument(skip_all, fields(repo_id = %repo_id, trigger = ?trigger))] pub async fn run(&self, repo_id: &str, trigger: ScanTrigger) -> Result<(), AgentError> { // Look up the repository let repo = self .db .repositories() .find_one(doc! { "_id": mongodb::bson::oid::ObjectId::parse_str(repo_id).map_err(|e| AgentError::Other(e.to_string()))? }) .await? .ok_or_else(|| AgentError::Other(format!("Repository {repo_id} not found")))?; // Create scan run let scan_run = ScanRun::new(repo_id.to_string(), trigger); let insert = self.db.scan_runs().insert_one(&scan_run).await?; let scan_run_id = insert .inserted_id .as_object_id() .map(|id| id.to_hex()) .unwrap_or_default(); let result = self.run_pipeline(&repo, &scan_run_id).await; // Update scan run status match &result { Ok(count) => { self.db .scan_runs() .update_one( doc! { "_id": &insert.inserted_id }, doc! { "$set": { "status": "completed", "current_phase": "completed", "new_findings_count": *count as i64, "completed_at": mongodb::bson::DateTime::now(), } }, ) .await?; } Err(e) => { tracing::error!(repo_id, error = %e, "Scan pipeline failed"); self.db .scan_runs() .update_one( doc! { "_id": &insert.inserted_id }, doc! { "$set": { "status": "failed", "error_message": e.to_string(), "completed_at": mongodb::bson::DateTime::now(), } }, ) .await?; } } result.map(|_| ()) } #[tracing::instrument(skip_all, fields(repo_id = repo.name.as_str()))] async fn run_pipeline( &self, repo: &TrackedRepository, scan_run_id: &str, ) -> Result { let repo_id = repo.id.as_ref().map(|id| id.to_hex()).unwrap_or_default(); // Stage 0: Change detection tracing::info!("[{repo_id}] Stage 0: Change detection"); let creds = GitOps::make_repo_credentials(&self.config, repo); let git_ops = GitOps::new(&self.config.git_clone_base_path, creds); let repo_path = git_ops.clone_or_fetch(&repo.git_url, &repo.name)?; if !GitOps::has_new_commits(&repo_path, repo.last_scanned_commit.as_deref())? { tracing::info!("[{repo_id}] No new commits, skipping scan"); return Ok(0); } let current_sha = GitOps::get_head_sha(&repo_path)?; let mut all_findings: Vec = Vec::new(); // Stage 1: Semgrep SAST tracing::info!("[{repo_id}] Stage 1: Semgrep SAST"); self.update_phase(scan_run_id, "sast").await; match async { let semgrep = SemgrepScanner; semgrep.scan(&repo_path, &repo_id).await } .instrument(tracing::info_span!("stage_sast")) .await { Ok(output) => all_findings.extend(output.findings), Err(e) => tracing::warn!("[{repo_id}] Semgrep failed: {e}"), } // Stage 2: SBOM Generation tracing::info!("[{repo_id}] Stage 2: SBOM Generation"); self.update_phase(scan_run_id, "sbom_generation").await; let mut sbom_entries = match async { let sbom_scanner = SbomScanner; sbom_scanner.scan(&repo_path, &repo_id).await } .instrument(tracing::info_span!("stage_sbom_generation")) .await { Ok(output) => output.sbom_entries, Err(e) => { tracing::warn!("[{repo_id}] SBOM generation failed: {e}"); Vec::new() } }; // Stage 3: CVE Scanning tracing::info!("[{repo_id}] Stage 3: CVE Scanning"); self.update_phase(scan_run_id, "cve_scanning").await; let cve_scanner = CveScanner::new( self.http.clone(), self.config.searxng_url.clone(), self.config.nvd_api_key.as_ref().map(|k| { use secrecy::ExposeSecret; k.expose_secret().to_string() }), ); let cve_alerts = match async { cve_scanner .scan_dependencies(&repo_id, &mut sbom_entries) .await } .instrument(tracing::info_span!("stage_cve_scanning")) .await { Ok(alerts) => alerts, Err(e) => { tracing::warn!("[{repo_id}] CVE scanning failed: {e}"); Vec::new() } }; // Stage 4: Pattern Scanning (GDPR + OAuth) tracing::info!("[{repo_id}] Stage 4: Pattern Scanning"); self.update_phase(scan_run_id, "pattern_scanning").await; { let pattern_findings = async { let mut findings = Vec::new(); let gdpr = GdprPatternScanner::new(); match gdpr.scan(&repo_path, &repo_id).await { Ok(output) => findings.extend(output.findings), Err(e) => tracing::warn!("[{repo_id}] GDPR pattern scan failed: {e}"), } let oauth = OAuthPatternScanner::new(); match oauth.scan(&repo_path, &repo_id).await { Ok(output) => findings.extend(output.findings), Err(e) => tracing::warn!("[{repo_id}] OAuth pattern scan failed: {e}"), } findings } .instrument(tracing::info_span!("stage_pattern_scanning")) .await; all_findings.extend(pattern_findings); } // Stage 4a: Secret Detection (Gitleaks) tracing::info!("[{repo_id}] Stage 4a: Secret Detection"); self.update_phase(scan_run_id, "secret_detection").await; match async { let gitleaks = GitleaksScanner; gitleaks.scan(&repo_path, &repo_id).await } .instrument(tracing::info_span!("stage_secret_detection")) .await { Ok(output) => all_findings.extend(output.findings), Err(e) => tracing::warn!("[{repo_id}] Gitleaks failed: {e}"), } // Stage 4b: Lint Scanning tracing::info!("[{repo_id}] Stage 4b: Lint Scanning"); self.update_phase(scan_run_id, "lint_scanning").await; match async { let lint = LintScanner; lint.scan(&repo_path, &repo_id).await } .instrument(tracing::info_span!("stage_lint_scanning")) .await { Ok(output) => all_findings.extend(output.findings), Err(e) => tracing::warn!("[{repo_id}] Lint scanning failed: {e}"), } // Stage 4.5: Graph Building tracing::info!("[{repo_id}] Stage 4.5: Graph Building"); self.update_phase(scan_run_id, "graph_building").await; let graph_context = match async { self.build_code_graph(&repo_path, &repo_id, &all_findings) .await } .instrument(tracing::info_span!("stage_graph_building")) .await { Ok(ctx) => Some(ctx), Err(e) => { tracing::warn!("[{repo_id}] Graph building failed: {e}"); None } }; // Stage 5: LLM Triage (enhanced with graph context) tracing::info!( "[{repo_id}] Stage 5: LLM Triage ({} findings)", all_findings.len() ); self.update_phase(scan_run_id, "llm_triage").await; let triaged = crate::llm::triage::triage_findings( &self.llm, &mut all_findings, graph_context.as_ref(), ) .await; tracing::info!("[{repo_id}] Triaged: {triaged} findings passed confidence threshold"); // Dedup against existing findings and insert new ones let mut new_count = 0u32; let mut new_findings: Vec = Vec::new(); for mut finding in all_findings { finding.scan_run_id = Some(scan_run_id.to_string()); // Check if fingerprint already exists let existing = self .db .findings() .find_one(doc! { "fingerprint": &finding.fingerprint }) .await?; if existing.is_none() { let result = self.db.findings().insert_one(&finding).await?; finding.id = result.inserted_id.as_object_id(); new_findings.push(finding); new_count += 1; } } // Remove stale SBOM entries for this repo before reinserting if !sbom_entries.is_empty() { self.db .sbom_entries() .delete_many(doc! { "repo_id": &repo.id }) .await?; } // Persist SBOM entries for entry in &sbom_entries { let filter = doc! { "repo_id": &entry.repo_id, "name": &entry.name, "version": &entry.version, }; let update = mongodb::bson::to_document(entry) .map(|d| doc! { "$set": d }) .unwrap_or_else(|_| doc! {}); self.db .sbom_entries() .update_one(filter, update) .upsert(true) .await?; } // Persist CVE alerts (upsert by cve_id + repo_id) for alert in &cve_alerts { let filter = doc! { "cve_id": &alert.cve_id, "repo_id": &alert.repo_id, }; let update = mongodb::bson::to_document(alert) .map(|d| doc! { "$set": d }) .unwrap_or_else(|_| doc! {}); self.db .cve_alerts() .update_one(filter, update) .upsert(true) .await?; } // Stage 6: Issue Creation tracing::info!("[{repo_id}] Stage 6: Issue Creation"); self.update_phase(scan_run_id, "issue_creation").await; if let Err(e) = self .create_tracker_issues(repo, &repo_id, &new_findings) .await { tracing::warn!("[{repo_id}] Issue creation failed: {e}"); } // Stage 7: Update repository self.db .repositories() .update_one( doc! { "_id": repo.id }, doc! { "$set": { "last_scanned_commit": ¤t_sha, "updated_at": mongodb::bson::DateTime::now(), }, "$inc": { "findings_count": new_count as i64 }, }, ) .await?; // Stage 8: DAST (async, optional — only if a DastTarget is configured) tracing::info!("[{repo_id}] Stage 8: Checking for DAST targets"); self.update_phase(scan_run_id, "dast_scanning").await; self.maybe_trigger_dast(&repo_id, scan_run_id).await; tracing::info!("[{repo_id}] Scan complete: {new_count} new findings"); Ok(new_count) } pub(super) async fn update_phase(&self, scan_run_id: &str, phase: &str) { if let Ok(oid) = mongodb::bson::oid::ObjectId::parse_str(scan_run_id) { let _ = self .db .scan_runs() .update_one( doc! { "_id": oid }, doc! { "$set": { "current_phase": phase }, "$push": { "phases_completed": phase }, }, ) .await; } } } /// Extract the scheme + host from a git URL. /// e.g. "https://gitea.example.com/owner/repo.git" -> "https://gitea.example.com" /// e.g. "ssh://git@gitea.example.com:22/owner/repo.git" -> "https://gitea.example.com" pub(super) fn extract_base_url(git_url: &str) -> Option { if let Some(rest) = git_url.strip_prefix("https://") { let host = rest.split('/').next()?; Some(format!("https://{host}")) } else if let Some(rest) = git_url.strip_prefix("http://") { let host = rest.split('/').next()?; Some(format!("http://{host}")) } else if let Some(rest) = git_url.strip_prefix("ssh://") { // ssh://git@host:port/path -> extract host let after_at = rest.find('@').map(|i| &rest[i + 1..]).unwrap_or(rest); let host = after_at.split(&[':', '/'][..]).next()?; Some(format!("https://{host}")) } else if let Some(at_pos) = git_url.find('@') { // SCP-style: git@host:owner/repo.git let after_at = &git_url[at_pos + 1..]; let host = after_at.split(':').next()?; Some(format!("https://{host}")) } else { None } }