All checks were successful
CI / Tests (push) Successful in 5m17s
CI / Detect Changes (push) Successful in 3s
CI / Deploy Agent (push) Successful in 3s
CI / Deploy Dashboard (push) Has been skipped
CI / Deploy Docs (push) Has been skipped
CI / Deploy MCP (push) Has been skipped
CI / Format (push) Successful in 4s
CI / Clippy (push) Successful in 4m38s
CI / Security Audit (push) Successful in 1m50s
Add repo_id, finding_id, and filter fields to tracing::instrument attributes for better trace correlation in SigNoz. Replace all silently swallowed errors (Err(_) => Vec::new()) with tracing::warn! logging across mod.rs, dast.rs, graph.rs handlers. Add stage-level spans with .instrument() to pipeline orchestrator for visibility into scan phases. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
358 lines
11 KiB
Rust
358 lines
11 KiB
Rust
use std::sync::Arc;
|
|
|
|
use axum::extract::{Extension, Path, Query};
|
|
use axum::http::StatusCode;
|
|
use axum::Json;
|
|
use mongodb::bson::doc;
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
use compliance_core::models::graph::{CodeEdge, CodeNode, GraphBuildRun, ImpactAnalysis};
|
|
|
|
use crate::agent::ComplianceAgent;
|
|
|
|
use super::{collect_cursor_async, ApiResponse};
|
|
|
|
/// Shared application state: the compliance agent, injected by Axum's `Extension` layer.
type AgentExt = Extension<Arc<ComplianceAgent>>;
|
|
|
|
/// Response payload for `GET /api/v1/graph/:repo_id`: the latest build run
/// plus the nodes and edges it produced.
#[derive(Serialize)]
pub struct GraphData {
    // Most recent build run for the repo; `None` when no build has run yet.
    pub build: Option<GraphBuildRun>,
    // Graph nodes (disconnected nodes are filtered out by the handler).
    pub nodes: Vec<CodeNode>,
    // Graph edges keyed by node qualified names (source/target).
    pub edges: Vec<CodeEdge>,
}
|
|
|
|
/// Query-string parameters for the symbol-search endpoint.
#[derive(Deserialize)]
pub struct SearchParams {
    // Text to match (case-insensitively) against node names.
    pub q: String,
    // Maximum number of results to return; defaults to 50 when omitted.
    #[serde(default = "default_search_limit")]
    pub limit: usize,
}
|
|
|
|
/// Default result cap for symbol search when `limit` is not supplied
/// (referenced by the `#[serde(default = …)]` attribute on `SearchParams`).
fn default_search_limit() -> usize {
    50
}
|
|
|
|
/// GET /api/v1/graph/:repo_id — Full graph data
|
|
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
|
|
pub async fn get_graph(
|
|
Extension(agent): AgentExt,
|
|
Path(repo_id): Path<String>,
|
|
) -> Result<Json<ApiResponse<GraphData>>, StatusCode> {
|
|
let db = &agent.db;
|
|
|
|
// Get latest build
|
|
let build: Option<GraphBuildRun> = db
|
|
.graph_builds()
|
|
.find_one(doc! { "repo_id": &repo_id })
|
|
.sort(doc! { "started_at": -1 })
|
|
.await
|
|
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
|
|
|
let (nodes, edges) = if build.is_some() {
|
|
// Filter by repo_id only — delete_repo_graph clears old data before each rebuild,
|
|
// so there is only one set of nodes/edges per repo.
|
|
let filter = doc! { "repo_id": &repo_id };
|
|
|
|
let all_nodes: Vec<CodeNode> = match db.graph_nodes().find(filter.clone()).await {
|
|
Ok(cursor) => collect_cursor_async(cursor).await,
|
|
Err(e) => {
|
|
tracing::warn!("Failed to fetch graph nodes: {e}");
|
|
Vec::new()
|
|
}
|
|
};
|
|
let edges: Vec<CodeEdge> = match db.graph_edges().find(filter).await {
|
|
Ok(cursor) => collect_cursor_async(cursor).await,
|
|
Err(e) => {
|
|
tracing::warn!("Failed to fetch graph edges: {e}");
|
|
Vec::new()
|
|
}
|
|
};
|
|
|
|
// Remove disconnected nodes (no edges) to keep the graph clean
|
|
let connected: std::collections::HashSet<&str> = edges
|
|
.iter()
|
|
.flat_map(|e| [e.source.as_str(), e.target.as_str()])
|
|
.collect();
|
|
let nodes = all_nodes
|
|
.into_iter()
|
|
.filter(|n| connected.contains(n.qualified_name.as_str()))
|
|
.collect();
|
|
|
|
(nodes, edges)
|
|
} else {
|
|
(Vec::new(), Vec::new())
|
|
};
|
|
|
|
Ok(Json(ApiResponse {
|
|
data: GraphData {
|
|
build,
|
|
nodes,
|
|
edges,
|
|
},
|
|
total: None,
|
|
page: None,
|
|
}))
|
|
}
|
|
|
|
/// GET /api/v1/graph/:repo_id/nodes — List nodes (paginated)
|
|
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
|
|
pub async fn get_nodes(
|
|
Extension(agent): AgentExt,
|
|
Path(repo_id): Path<String>,
|
|
) -> Result<Json<ApiResponse<Vec<CodeNode>>>, StatusCode> {
|
|
let db = &agent.db;
|
|
let filter = doc! { "repo_id": &repo_id };
|
|
|
|
let nodes: Vec<CodeNode> = match db.graph_nodes().find(filter).await {
|
|
Ok(cursor) => collect_cursor_async(cursor).await,
|
|
Err(e) => {
|
|
tracing::warn!("Failed to fetch graph nodes: {e}");
|
|
Vec::new()
|
|
}
|
|
};
|
|
|
|
let total = nodes.len() as u64;
|
|
Ok(Json(ApiResponse {
|
|
data: nodes,
|
|
total: Some(total),
|
|
page: None,
|
|
}))
|
|
}
|
|
|
|
/// GET /api/v1/graph/:repo_id/communities — List detected communities
|
|
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
|
|
pub async fn get_communities(
|
|
Extension(agent): AgentExt,
|
|
Path(repo_id): Path<String>,
|
|
) -> Result<Json<ApiResponse<Vec<CommunityInfo>>>, StatusCode> {
|
|
let db = &agent.db;
|
|
let filter = doc! { "repo_id": &repo_id };
|
|
|
|
let nodes: Vec<CodeNode> = match db.graph_nodes().find(filter).await {
|
|
Ok(cursor) => collect_cursor_async(cursor).await,
|
|
Err(e) => {
|
|
tracing::warn!("Failed to fetch graph nodes for communities: {e}");
|
|
Vec::new()
|
|
}
|
|
};
|
|
|
|
let mut communities: std::collections::HashMap<u32, Vec<String>> =
|
|
std::collections::HashMap::new();
|
|
for node in &nodes {
|
|
if let Some(cid) = node.community_id {
|
|
communities
|
|
.entry(cid)
|
|
.or_default()
|
|
.push(node.qualified_name.clone());
|
|
}
|
|
}
|
|
|
|
let mut result: Vec<CommunityInfo> = communities
|
|
.into_iter()
|
|
.map(|(id, members)| CommunityInfo {
|
|
community_id: id,
|
|
member_count: members.len() as u32,
|
|
members,
|
|
})
|
|
.collect();
|
|
result.sort_by_key(|c| c.community_id);
|
|
|
|
let total = result.len() as u64;
|
|
Ok(Json(ApiResponse {
|
|
data: result,
|
|
total: Some(total),
|
|
page: None,
|
|
}))
|
|
}
|
|
|
|
/// One detected community: its id and the qualified names of member nodes.
#[derive(Serialize)]
pub struct CommunityInfo {
    // Community id assigned during graph analysis.
    pub community_id: u32,
    // Convenience count, always equal to `members.len()`.
    pub member_count: u32,
    // Qualified names of the nodes in this community.
    pub members: Vec<String>,
}
|
|
|
|
/// GET /api/v1/graph/:repo_id/impact/:finding_id — Impact analysis
|
|
#[tracing::instrument(skip_all, fields(repo_id = %repo_id, finding_id = %finding_id))]
|
|
pub async fn get_impact(
|
|
Extension(agent): AgentExt,
|
|
Path((repo_id, finding_id)): Path<(String, String)>,
|
|
) -> Result<Json<ApiResponse<Option<ImpactAnalysis>>>, StatusCode> {
|
|
let db = &agent.db;
|
|
let filter = doc! { "repo_id": &repo_id, "finding_id": &finding_id };
|
|
|
|
let impact = db
|
|
.impact_analyses()
|
|
.find_one(filter)
|
|
.await
|
|
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
|
|
|
Ok(Json(ApiResponse {
|
|
data: impact,
|
|
total: None,
|
|
page: None,
|
|
}))
|
|
}
|
|
|
|
/// GET /api/v1/graph/:repo_id/search — BM25 symbol search
|
|
#[tracing::instrument(skip_all, fields(repo_id = %repo_id, query = %params.q))]
|
|
pub async fn search_symbols(
|
|
Extension(agent): AgentExt,
|
|
Path(repo_id): Path<String>,
|
|
Query(params): Query<SearchParams>,
|
|
) -> Result<Json<ApiResponse<Vec<CodeNode>>>, StatusCode> {
|
|
let db = &agent.db;
|
|
|
|
// Simple text search on qualified_name and name fields
|
|
let filter = doc! {
|
|
"repo_id": &repo_id,
|
|
"name": { "$regex": ¶ms.q, "$options": "i" },
|
|
};
|
|
|
|
let nodes: Vec<CodeNode> = match db
|
|
.graph_nodes()
|
|
.find(filter)
|
|
.limit(params.limit as i64)
|
|
.await
|
|
{
|
|
Ok(cursor) => collect_cursor_async(cursor).await,
|
|
Err(e) => {
|
|
tracing::warn!("Failed to search graph nodes: {e}");
|
|
Vec::new()
|
|
}
|
|
};
|
|
|
|
let total = nodes.len() as u64;
|
|
Ok(Json(ApiResponse {
|
|
data: nodes,
|
|
total: Some(total),
|
|
page: None,
|
|
}))
|
|
}
|
|
|
|
/// GET /api/v1/graph/:repo_id/file-content — Read source file from cloned repo
|
|
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
|
|
pub async fn get_file_content(
|
|
Extension(agent): AgentExt,
|
|
Path(repo_id): Path<String>,
|
|
Query(params): Query<FileContentParams>,
|
|
) -> Result<Json<ApiResponse<FileContent>>, StatusCode> {
|
|
let db = &agent.db;
|
|
|
|
// Look up the repository to get repo name
|
|
let repo = db
|
|
.repositories()
|
|
.find_one(doc! { "_id": mongodb::bson::oid::ObjectId::parse_str(&repo_id).ok() })
|
|
.await
|
|
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
|
|
.ok_or(StatusCode::NOT_FOUND)?;
|
|
|
|
let base_path = std::path::Path::new(&agent.config.git_clone_base_path);
|
|
let file_path = base_path.join(&repo.name).join(¶ms.path);
|
|
|
|
// Security: ensure we don't escape the repo directory
|
|
let canonical = file_path
|
|
.canonicalize()
|
|
.map_err(|_| StatusCode::NOT_FOUND)?;
|
|
let base_canonical = base_path
|
|
.join(&repo.name)
|
|
.canonicalize()
|
|
.map_err(|_| StatusCode::NOT_FOUND)?;
|
|
if !canonical.starts_with(&base_canonical) {
|
|
return Err(StatusCode::FORBIDDEN);
|
|
}
|
|
|
|
let content = std::fs::read_to_string(&canonical).map_err(|_| StatusCode::NOT_FOUND)?;
|
|
|
|
// Cap at 10,000 lines
|
|
let truncated: String = content.lines().take(10_000).collect::<Vec<_>>().join("\n");
|
|
|
|
let language = params.path.rsplit('.').next().unwrap_or("").to_string();
|
|
|
|
Ok(Json(ApiResponse {
|
|
data: FileContent {
|
|
content: truncated,
|
|
path: params.path,
|
|
language,
|
|
},
|
|
total: None,
|
|
page: None,
|
|
}))
|
|
}
|
|
|
|
/// Query-string parameters for the file-content endpoint.
#[derive(Deserialize)]
pub struct FileContentParams {
    // File path relative to the repo root inside the local clone.
    pub path: String,
}
|
|
|
|
/// Response payload for the file-content endpoint.
#[derive(Serialize)]
pub struct FileContent {
    // File text, capped at 10,000 lines by the handler.
    pub content: String,
    // Echo of the requested repo-relative path.
    pub path: String,
    // Language hint derived from the file extension.
    pub language: String,
}
|
|
|
|
/// POST /api/v1/graph/:repo_id/build — Trigger graph rebuild
|
|
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
|
|
pub async fn trigger_build(
|
|
Extension(agent): AgentExt,
|
|
Path(repo_id): Path<String>,
|
|
) -> Result<Json<serde_json::Value>, StatusCode> {
|
|
let agent_clone = (*agent).clone();
|
|
tokio::spawn(async move {
|
|
let repo = match agent_clone
|
|
.db
|
|
.repositories()
|
|
.find_one(doc! { "_id": mongodb::bson::oid::ObjectId::parse_str(&repo_id).ok() })
|
|
.await
|
|
{
|
|
Ok(Some(r)) => r,
|
|
_ => {
|
|
tracing::error!("Repository {repo_id} not found for graph build");
|
|
return;
|
|
}
|
|
};
|
|
|
|
let creds = crate::pipeline::git::RepoCredentials {
|
|
ssh_key_path: Some(agent_clone.config.ssh_key_path.clone()),
|
|
auth_token: repo.auth_token.clone(),
|
|
auth_username: repo.auth_username.clone(),
|
|
};
|
|
let git_ops =
|
|
crate::pipeline::git::GitOps::new(&agent_clone.config.git_clone_base_path, creds);
|
|
let repo_path = match git_ops.clone_or_fetch(&repo.git_url, &repo.name) {
|
|
Ok(p) => p,
|
|
Err(e) => {
|
|
tracing::error!("Failed to clone repo for graph build: {e}");
|
|
return;
|
|
}
|
|
};
|
|
|
|
let graph_build_id = uuid::Uuid::new_v4().to_string();
|
|
let engine = compliance_graph::GraphEngine::new(50_000);
|
|
|
|
match engine.build_graph(&repo_path, &repo_id, &graph_build_id) {
|
|
Ok((code_graph, build_run)) => {
|
|
let store =
|
|
compliance_graph::graph::persistence::GraphStore::new(agent_clone.db.inner());
|
|
let _ = store.delete_repo_graph(&repo_id).await;
|
|
let _ = store
|
|
.store_graph(&build_run, &code_graph.nodes, &code_graph.edges)
|
|
.await;
|
|
tracing::info!(
|
|
"[{repo_id}] Graph rebuild complete: {} nodes, {} edges",
|
|
build_run.node_count,
|
|
build_run.edge_count
|
|
);
|
|
}
|
|
Err(e) => {
|
|
tracing::error!("[{repo_id}] Graph rebuild failed: {e}");
|
|
}
|
|
}
|
|
});
|
|
|
|
Ok(Json(
|
|
serde_json::json!({ "status": "graph_build_triggered" }),
|
|
))
|
|
}
|