//! MongoDB persistence layer for the code knowledge graph: stores graph
//! nodes, edges, build runs, and impact analyses.
use compliance_core::error::CoreError;
use compliance_core::models::graph::{CodeEdge, CodeNode, GraphBuildRun, ImpactAnalysis};
use futures_util::TryStreamExt;
use mongodb::bson::doc;
use mongodb::options::IndexOptions;
use mongodb::{Collection, Database, IndexModel};
use tracing::info;
/// MongoDB persistence layer for the code knowledge graph
pub struct GraphStore {
    // "graph_nodes" collection: one document per code entity.
    nodes: Collection<CodeNode>,
    // "graph_edges" collection: relationships between nodes.
    edges: Collection<CodeEdge>,
    // "graph_builds" collection: one record per graph build run.
    builds: Collection<GraphBuildRun>,
    // "impact_analyses" collection: one analysis per (repo_id, finding_id),
    // enforced by the unique index created in `ensure_indexes`.
    impacts: Collection<ImpactAnalysis>,
}
impl GraphStore {
|
|
pub fn new(db: &Database) -> Self {
|
|
Self {
|
|
nodes: db.collection("graph_nodes"),
|
|
edges: db.collection("graph_edges"),
|
|
builds: db.collection("graph_builds"),
|
|
impacts: db.collection("impact_analyses"),
|
|
}
|
|
}
|
|
|
|
/// Ensure indexes are created
|
|
pub async fn ensure_indexes(&self) -> Result<(), CoreError> {
|
|
// graph_nodes: compound index on (repo_id, graph_build_id)
|
|
self.nodes
|
|
.create_index(
|
|
IndexModel::builder()
|
|
.keys(doc! { "repo_id": 1, "graph_build_id": 1 })
|
|
.build(),
|
|
)
|
|
.await?;
|
|
|
|
// graph_nodes: index on qualified_name for lookups
|
|
self.nodes
|
|
.create_index(
|
|
IndexModel::builder()
|
|
.keys(doc! { "qualified_name": 1 })
|
|
.build(),
|
|
)
|
|
.await?;
|
|
|
|
// graph_edges: compound index on (repo_id, graph_build_id)
|
|
self.edges
|
|
.create_index(
|
|
IndexModel::builder()
|
|
.keys(doc! { "repo_id": 1, "graph_build_id": 1 })
|
|
.build(),
|
|
)
|
|
.await?;
|
|
|
|
// graph_builds: compound index on (repo_id, started_at DESC)
|
|
self.builds
|
|
.create_index(
|
|
IndexModel::builder()
|
|
.keys(doc! { "repo_id": 1, "started_at": -1 })
|
|
.build(),
|
|
)
|
|
.await?;
|
|
|
|
// impact_analyses: compound index on (repo_id, finding_id)
|
|
self.impacts
|
|
.create_index(
|
|
IndexModel::builder()
|
|
.keys(doc! { "repo_id": 1, "finding_id": 1 })
|
|
.options(IndexOptions::builder().unique(true).build())
|
|
.build(),
|
|
)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Store a complete graph build result
|
|
pub async fn store_graph(
|
|
&self,
|
|
build_run: &GraphBuildRun,
|
|
nodes: &[CodeNode],
|
|
edges: &[CodeEdge],
|
|
) -> Result<String, CoreError> {
|
|
// Insert the build run
|
|
let result = self.builds.insert_one(build_run).await?;
|
|
let build_id = result
|
|
.inserted_id
|
|
.as_object_id()
|
|
.map(|oid| oid.to_hex())
|
|
.unwrap_or_default();
|
|
|
|
// Insert nodes in batches
|
|
if !nodes.is_empty() {
|
|
let batch_size = 1000;
|
|
for chunk in nodes.chunks(batch_size) {
|
|
self.nodes.insert_many(chunk.to_vec()).await?;
|
|
}
|
|
}
|
|
|
|
// Insert edges in batches
|
|
if !edges.is_empty() {
|
|
let batch_size = 1000;
|
|
for chunk in edges.chunks(batch_size) {
|
|
self.edges.insert_many(chunk.to_vec()).await?;
|
|
}
|
|
}
|
|
|
|
info!(
|
|
build_id = %build_id,
|
|
nodes = nodes.len(),
|
|
edges = edges.len(),
|
|
"Graph stored to MongoDB"
|
|
);
|
|
|
|
Ok(build_id)
|
|
}
|
|
|
|
/// Delete previous graph data for a repo before storing new graph
|
|
pub async fn delete_repo_graph(&self, repo_id: &str) -> Result<(), CoreError> {
|
|
let filter = doc! { "repo_id": repo_id };
|
|
self.nodes.delete_many(filter.clone()).await?;
|
|
self.edges.delete_many(filter.clone()).await?;
|
|
self.impacts.delete_many(filter).await?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Store an impact analysis result
|
|
pub async fn store_impact(&self, impact: &ImpactAnalysis) -> Result<(), CoreError> {
|
|
let filter = doc! {
|
|
"repo_id": &impact.repo_id,
|
|
"finding_id": &impact.finding_id,
|
|
};
|
|
|
|
let opts = mongodb::options::ReplaceOptions::builder()
|
|
.upsert(true)
|
|
.build();
|
|
|
|
self.impacts
|
|
.replace_one(filter, impact)
|
|
.with_options(opts)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Get the latest graph build for a repo
|
|
pub async fn get_latest_build(
|
|
&self,
|
|
repo_id: &str,
|
|
) -> Result<Option<GraphBuildRun>, CoreError> {
|
|
let filter = doc! { "repo_id": repo_id };
|
|
let opts = mongodb::options::FindOneOptions::builder()
|
|
.sort(doc! { "started_at": -1 })
|
|
.build();
|
|
|
|
let result = self.builds.find_one(filter).with_options(opts).await?;
|
|
Ok(result)
|
|
}
|
|
|
|
/// Get all nodes for a repo's latest graph build
|
|
pub async fn get_nodes(
|
|
&self,
|
|
repo_id: &str,
|
|
graph_build_id: &str,
|
|
) -> Result<Vec<CodeNode>, CoreError> {
|
|
let filter = doc! {
|
|
"repo_id": repo_id,
|
|
"graph_build_id": graph_build_id,
|
|
};
|
|
|
|
let cursor = self.nodes.find(filter).await?;
|
|
let nodes: Vec<CodeNode> = cursor.try_collect().await?;
|
|
Ok(nodes)
|
|
}
|
|
|
|
/// Get all edges for a repo's latest graph build
|
|
pub async fn get_edges(
|
|
&self,
|
|
repo_id: &str,
|
|
graph_build_id: &str,
|
|
) -> Result<Vec<CodeEdge>, CoreError> {
|
|
let filter = doc! {
|
|
"repo_id": repo_id,
|
|
"graph_build_id": graph_build_id,
|
|
};
|
|
|
|
let cursor = self.edges.find(filter).await?;
|
|
let edges: Vec<CodeEdge> = cursor.try_collect().await?;
|
|
Ok(edges)
|
|
}
|
|
|
|
/// Get impact analysis for a finding
|
|
pub async fn get_impact(
|
|
&self,
|
|
repo_id: &str,
|
|
finding_id: &str,
|
|
) -> Result<Option<ImpactAnalysis>, CoreError> {
|
|
let filter = doc! {
|
|
"repo_id": repo_id,
|
|
"finding_id": finding_id,
|
|
};
|
|
|
|
let result = self.impacts.find_one(filter).await?;
|
|
Ok(result)
|
|
}
|
|
|
|
/// Get nodes grouped by community
|
|
pub async fn get_communities(
|
|
&self,
|
|
repo_id: &str,
|
|
graph_build_id: &str,
|
|
) -> Result<Vec<CommunityInfo>, CoreError> {
|
|
let filter = doc! {
|
|
"repo_id": repo_id,
|
|
"graph_build_id": graph_build_id,
|
|
};
|
|
|
|
let cursor = self.nodes.find(filter).await?;
|
|
let nodes: Vec<CodeNode> = cursor.try_collect().await?;
|
|
|
|
let mut communities: std::collections::HashMap<u32, Vec<String>> =
|
|
std::collections::HashMap::new();
|
|
|
|
for node in &nodes {
|
|
if let Some(cid) = node.community_id {
|
|
communities
|
|
.entry(cid)
|
|
.or_default()
|
|
.push(node.qualified_name.clone());
|
|
}
|
|
}
|
|
|
|
let mut result: Vec<CommunityInfo> = communities
|
|
.into_iter()
|
|
.map(|(id, members)| CommunityInfo {
|
|
community_id: id,
|
|
member_count: members.len() as u32,
|
|
members,
|
|
})
|
|
.collect();
|
|
|
|
result.sort_by_key(|c| c.community_id);
|
|
Ok(result)
|
|
}
|
|
}
|
|
|
|
/// One community of graph nodes, as returned by `GraphStore::get_communities`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CommunityInfo {
    /// Community identifier taken from `CodeNode::community_id`.
    pub community_id: u32,
    /// Number of members; equals `members.len()`.
    pub member_count: u32,
    /// Qualified names of the nodes belonging to this community.
    pub members: Vec<String>,
}