use compliance_core::models::Finding; use super::orchestrator::{GraphContext, PipelineOrchestrator}; use crate::error::AgentError; impl PipelineOrchestrator { /// Build the code knowledge graph for a repo and compute impact analyses pub(super) async fn build_code_graph( &self, repo_path: &std::path::Path, repo_id: &str, findings: &[Finding], ) -> Result { let graph_build_id = uuid::Uuid::new_v4().to_string(); let engine = compliance_graph::GraphEngine::new(50_000); let (mut code_graph, build_run) = engine .build_graph(repo_path, repo_id, &graph_build_id) .map_err(|e| AgentError::Other(format!("Graph build error: {e}")))?; // Apply community detection compliance_graph::graph::community::apply_communities(&mut code_graph); // Store graph in MongoDB let store = compliance_graph::graph::persistence::GraphStore::new(self.db.inner()); store .delete_repo_graph(repo_id) .await .map_err(|e| AgentError::Other(format!("Graph cleanup error: {e}")))?; store .store_graph(&build_run, &code_graph.nodes, &code_graph.edges) .await .map_err(|e| AgentError::Other(format!("Graph store error: {e}")))?; // Compute impact analysis for each finding let analyzer = compliance_graph::GraphEngine::impact_analyzer(&code_graph); let mut impacts = Vec::new(); for finding in findings { if let Some(file_path) = &finding.file_path { let impact = analyzer.analyze( repo_id, &finding.fingerprint, &graph_build_id, file_path, finding.line_number, ); store .store_impact(&impact) .await .map_err(|e| AgentError::Other(format!("Impact store error: {e}")))?; impacts.push(impact); } } Ok(GraphContext { node_count: build_run.node_count, edge_count: build_run.edge_count, community_count: build_run.community_count, impacts, }) } /// Trigger DAST scan if a target is configured for this repo pub(super) async fn maybe_trigger_dast(&self, repo_id: &str, scan_run_id: &str) { use futures_util::TryStreamExt; let filter = mongodb::bson::doc! { "repo_id": repo_id }; let targets: Vec = match self.db.dast_targets().find(filter).await { Ok(cursor) => cursor.try_collect().await.unwrap_or_default(), Err(_) => return, }; if targets.is_empty() { tracing::info!("[{repo_id}] No DAST targets configured, skipping"); return; } for target in targets { let db = self.db.clone(); let scan_run_id = scan_run_id.to_string(); tokio::spawn(async move { let orchestrator = compliance_dast::DastOrchestrator::new(100); match orchestrator.run_scan(&target, Vec::new()).await { Ok((mut scan_run, findings)) => { scan_run.sast_scan_run_id = Some(scan_run_id); if let Err(e) = db.dast_scan_runs().insert_one(&scan_run).await { tracing::error!("Failed to store DAST scan run: {e}"); } for finding in &findings { if let Err(e) = db.dast_findings().insert_one(finding).await { tracing::error!("Failed to store DAST finding: {e}"); } } tracing::info!("DAST scan complete: {} findings", findings.len()); } Err(e) => { tracing::error!("DAST scan failed: {e}"); } } }); } } }