use compliance_core::error::CoreError; use compliance_core::models::graph::{CodeEdge, CodeNode, GraphBuildRun, ImpactAnalysis}; use futures_util::TryStreamExt; use mongodb::bson::doc; use mongodb::options::IndexOptions; use mongodb::{Collection, Database, IndexModel}; use tracing::info; /// MongoDB persistence layer for the code knowledge graph pub struct GraphStore { nodes: Collection, edges: Collection, builds: Collection, impacts: Collection, } impl GraphStore { pub fn new(db: &Database) -> Self { Self { nodes: db.collection("graph_nodes"), edges: db.collection("graph_edges"), builds: db.collection("graph_builds"), impacts: db.collection("impact_analyses"), } } /// Ensure indexes are created pub async fn ensure_indexes(&self) -> Result<(), CoreError> { // graph_nodes: compound index on (repo_id, graph_build_id) self.nodes .create_index( IndexModel::builder() .keys(doc! { "repo_id": 1, "graph_build_id": 1 }) .build(), ) .await?; // graph_nodes: index on qualified_name for lookups self.nodes .create_index( IndexModel::builder() .keys(doc! { "qualified_name": 1 }) .build(), ) .await?; // graph_edges: compound index on (repo_id, graph_build_id) self.edges .create_index( IndexModel::builder() .keys(doc! { "repo_id": 1, "graph_build_id": 1 }) .build(), ) .await?; // graph_builds: compound index on (repo_id, started_at DESC) self.builds .create_index( IndexModel::builder() .keys(doc! { "repo_id": 1, "started_at": -1 }) .build(), ) .await?; // impact_analyses: compound index on (repo_id, finding_id) self.impacts .create_index( IndexModel::builder() .keys(doc! { "repo_id": 1, "finding_id": 1 }) .options(IndexOptions::builder().unique(true).build()) .build(), ) .await?; Ok(()) } /// Store a complete graph build result pub async fn store_graph( &self, build_run: &GraphBuildRun, nodes: &[CodeNode], edges: &[CodeEdge], ) -> Result { // Insert the build run let result = self.builds.insert_one(build_run).await?; let build_id = result .inserted_id .as_object_id() .map(|oid| oid.to_hex()) .unwrap_or_default(); // Insert nodes in batches if !nodes.is_empty() { let batch_size = 1000; for chunk in nodes.chunks(batch_size) { self.nodes.insert_many(chunk.to_vec()).await?; } } // Insert edges in batches if !edges.is_empty() { let batch_size = 1000; for chunk in edges.chunks(batch_size) { self.edges.insert_many(chunk.to_vec()).await?; } } info!( build_id = %build_id, nodes = nodes.len(), edges = edges.len(), "Graph stored to MongoDB" ); Ok(build_id) } /// Delete previous graph data for a repo before storing new graph pub async fn delete_repo_graph(&self, repo_id: &str) -> Result<(), CoreError> { let filter = doc! { "repo_id": repo_id }; self.nodes.delete_many(filter.clone()).await?; self.edges.delete_many(filter.clone()).await?; self.impacts.delete_many(filter).await?; Ok(()) } /// Store an impact analysis result pub async fn store_impact(&self, impact: &ImpactAnalysis) -> Result<(), CoreError> { let filter = doc! { "repo_id": &impact.repo_id, "finding_id": &impact.finding_id, }; let opts = mongodb::options::ReplaceOptions::builder() .upsert(true) .build(); self.impacts .replace_one(filter, impact) .with_options(opts) .await?; Ok(()) } /// Get the latest graph build for a repo pub async fn get_latest_build( &self, repo_id: &str, ) -> Result, CoreError> { let filter = doc! { "repo_id": repo_id }; let opts = mongodb::options::FindOneOptions::builder() .sort(doc! { "started_at": -1 }) .build(); let result = self.builds.find_one(filter).with_options(opts).await?; Ok(result) } /// Get all nodes for a repo's latest graph build pub async fn get_nodes( &self, repo_id: &str, graph_build_id: &str, ) -> Result, CoreError> { let filter = doc! { "repo_id": repo_id, "graph_build_id": graph_build_id, }; let cursor = self.nodes.find(filter).await?; let nodes: Vec = cursor.try_collect().await?; Ok(nodes) } /// Get all edges for a repo's latest graph build pub async fn get_edges( &self, repo_id: &str, graph_build_id: &str, ) -> Result, CoreError> { let filter = doc! { "repo_id": repo_id, "graph_build_id": graph_build_id, }; let cursor = self.edges.find(filter).await?; let edges: Vec = cursor.try_collect().await?; Ok(edges) } /// Get impact analysis for a finding pub async fn get_impact( &self, repo_id: &str, finding_id: &str, ) -> Result, CoreError> { let filter = doc! { "repo_id": repo_id, "finding_id": finding_id, }; let result = self.impacts.find_one(filter).await?; Ok(result) } /// Get nodes grouped by community pub async fn get_communities( &self, repo_id: &str, graph_build_id: &str, ) -> Result, CoreError> { let filter = doc! { "repo_id": repo_id, "graph_build_id": graph_build_id, }; let cursor = self.nodes.find(filter).await?; let nodes: Vec = cursor.try_collect().await?; let mut communities: std::collections::HashMap> = std::collections::HashMap::new(); for node in &nodes { if let Some(cid) = node.community_id { communities .entry(cid) .or_default() .push(node.qualified_name.clone()); } } let mut result: Vec = communities .into_iter() .map(|(id, members)| CommunityInfo { community_id: id, member_count: members.len() as u32, members, }) .collect(); result.sort_by_key(|c| c.community_id); Ok(result) } } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct CommunityInfo { pub community_id: u32, pub member_count: u32, pub members: Vec, }