Add DAST, graph modules, toast notifications, and dashboard enhancements
Add DAST scanning and code knowledge graph features across the stack:

- compliance-dast and compliance-graph workspace crates
- Agent API handlers and routes for DAST targets/scans and graph builds
- Core models and traits for DAST and graph domains
- Dashboard pages for DAST targets/findings/overview and graph explorer/impact
- Toast notification system with auto-dismiss for async action feedback
- Button click animations and disabled states for better UX

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -8,6 +8,8 @@ workspace = true
|
||||
|
||||
[dependencies]
|
||||
compliance-core = { workspace = true, features = ["mongodb"] }
|
||||
compliance-graph = { path = "../compliance-graph" }
|
||||
compliance-dast = { path = "../compliance-dast" }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
|
||||
226
compliance-agent/src/api/handlers/dast.rs
Normal file
226
compliance-agent/src/api/handlers/dast.rs
Normal file
@@ -0,0 +1,226 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::extract::{Extension, Path, Query};
|
||||
use axum::http::StatusCode;
|
||||
use axum::Json;
|
||||
use mongodb::bson::doc;
|
||||
use serde::Deserialize;
|
||||
|
||||
use compliance_core::models::dast::{DastFinding, DastScanRun, DastTarget, DastTargetType};
|
||||
|
||||
use crate::agent::ComplianceAgent;
|
||||
|
||||
use super::{collect_cursor_async, ApiResponse, PaginationParams};
|
||||
|
||||
type AgentExt = Extension<Arc<ComplianceAgent>>;
|
||||
|
||||
/// Request body for `POST /api/v1/dast/targets`.
///
/// Only `name` and `base_url` are required; every other field has a serde
/// default so clients can send a minimal payload.
#[derive(Deserialize)]
pub struct AddTargetRequest {
    pub name: String,
    pub base_url: String,
    #[serde(default = "default_target_type")]
    pub target_type: DastTargetType,
    // Optional link back to a repository document.
    pub repo_id: Option<String>,
    // Paths the scanner must not touch; defaults to empty.
    #[serde(default)]
    pub excluded_paths: Vec<String>,
    #[serde(default = "default_crawl_depth")]
    pub max_crawl_depth: u32,
    #[serde(default = "default_rate_limit")]
    pub rate_limit: u32,
    // Destructive checks are opt-in; defaults to false.
    #[serde(default)]
    pub allow_destructive: bool,
}

/// Serde default: treat unspecified targets as web applications.
fn default_target_type() -> DastTargetType {
    DastTargetType::WebApp
}
/// Serde default: crawl at most 5 levels deep.
fn default_crawl_depth() -> u32 {
    5
}
/// Serde default: rate limit of 10 (interpretation is up to the scanner).
fn default_rate_limit() -> u32 {
    10
}
|
||||
|
||||
/// GET /api/v1/dast/targets — List DAST targets
|
||||
pub async fn list_targets(
|
||||
Extension(agent): AgentExt,
|
||||
Query(params): Query<PaginationParams>,
|
||||
) -> Result<Json<ApiResponse<Vec<DastTarget>>>, StatusCode> {
|
||||
let db = &agent.db;
|
||||
let skip = (params.page.saturating_sub(1)) * params.limit as u64;
|
||||
let total = db
|
||||
.dast_targets()
|
||||
.count_documents(doc! {})
|
||||
.await
|
||||
.unwrap_or(0);
|
||||
|
||||
let targets = match db
|
||||
.dast_targets()
|
||||
.find(doc! {})
|
||||
.skip(skip)
|
||||
.limit(params.limit)
|
||||
.await
|
||||
{
|
||||
Ok(cursor) => collect_cursor_async(cursor).await,
|
||||
Err(_) => Vec::new(),
|
||||
};
|
||||
|
||||
Ok(Json(ApiResponse {
|
||||
data: targets,
|
||||
total: Some(total),
|
||||
page: Some(params.page),
|
||||
}))
|
||||
}
|
||||
|
||||
/// POST /api/v1/dast/targets — Add a new DAST target
|
||||
pub async fn add_target(
|
||||
Extension(agent): AgentExt,
|
||||
Json(req): Json<AddTargetRequest>,
|
||||
) -> Result<Json<ApiResponse<DastTarget>>, StatusCode> {
|
||||
let mut target = DastTarget::new(req.name, req.base_url, req.target_type);
|
||||
target.repo_id = req.repo_id;
|
||||
target.excluded_paths = req.excluded_paths;
|
||||
target.max_crawl_depth = req.max_crawl_depth;
|
||||
target.rate_limit = req.rate_limit;
|
||||
target.allow_destructive = req.allow_destructive;
|
||||
|
||||
agent
|
||||
.db
|
||||
.dast_targets()
|
||||
.insert_one(&target)
|
||||
.await
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
Ok(Json(ApiResponse {
|
||||
data: target,
|
||||
total: None,
|
||||
page: None,
|
||||
}))
|
||||
}
|
||||
|
||||
/// POST /api/v1/dast/targets/:id/scan — Trigger DAST scan
|
||||
pub async fn trigger_scan(
|
||||
Extension(agent): AgentExt,
|
||||
Path(id): Path<String>,
|
||||
) -> Result<Json<serde_json::Value>, StatusCode> {
|
||||
let oid =
|
||||
mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
|
||||
|
||||
let target = agent
|
||||
.db
|
||||
.dast_targets()
|
||||
.find_one(doc! { "_id": oid })
|
||||
.await
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
|
||||
.ok_or(StatusCode::NOT_FOUND)?;
|
||||
|
||||
let db = agent.db.clone();
|
||||
tokio::spawn(async move {
|
||||
let orchestrator = compliance_dast::DastOrchestrator::new(100);
|
||||
match orchestrator.run_scan(&target, Vec::new()).await {
|
||||
Ok((scan_run, findings)) => {
|
||||
if let Err(e) = db.dast_scan_runs().insert_one(&scan_run).await {
|
||||
tracing::error!("Failed to store DAST scan run: {e}");
|
||||
}
|
||||
for finding in &findings {
|
||||
if let Err(e) = db.dast_findings().insert_one(finding).await {
|
||||
tracing::error!("Failed to store DAST finding: {e}");
|
||||
}
|
||||
}
|
||||
tracing::info!("DAST scan complete: {} findings", findings.len());
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("DAST scan failed: {e}");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Json(serde_json::json!({ "status": "dast_scan_triggered" })))
|
||||
}
|
||||
|
||||
/// GET /api/v1/dast/scan-runs — List DAST scan runs
|
||||
pub async fn list_scan_runs(
|
||||
Extension(agent): AgentExt,
|
||||
Query(params): Query<PaginationParams>,
|
||||
) -> Result<Json<ApiResponse<Vec<DastScanRun>>>, StatusCode> {
|
||||
let db = &agent.db;
|
||||
let skip = (params.page.saturating_sub(1)) * params.limit as u64;
|
||||
let total = db
|
||||
.dast_scan_runs()
|
||||
.count_documents(doc! {})
|
||||
.await
|
||||
.unwrap_or(0);
|
||||
|
||||
let runs = match db
|
||||
.dast_scan_runs()
|
||||
.find(doc! {})
|
||||
.sort(doc! { "started_at": -1 })
|
||||
.skip(skip)
|
||||
.limit(params.limit)
|
||||
.await
|
||||
{
|
||||
Ok(cursor) => collect_cursor_async(cursor).await,
|
||||
Err(_) => Vec::new(),
|
||||
};
|
||||
|
||||
Ok(Json(ApiResponse {
|
||||
data: runs,
|
||||
total: Some(total),
|
||||
page: Some(params.page),
|
||||
}))
|
||||
}
|
||||
|
||||
/// GET /api/v1/dast/findings — List DAST findings
|
||||
pub async fn list_findings(
|
||||
Extension(agent): AgentExt,
|
||||
Query(params): Query<PaginationParams>,
|
||||
) -> Result<Json<ApiResponse<Vec<DastFinding>>>, StatusCode> {
|
||||
let db = &agent.db;
|
||||
let skip = (params.page.saturating_sub(1)) * params.limit as u64;
|
||||
let total = db
|
||||
.dast_findings()
|
||||
.count_documents(doc! {})
|
||||
.await
|
||||
.unwrap_or(0);
|
||||
|
||||
let findings = match db
|
||||
.dast_findings()
|
||||
.find(doc! {})
|
||||
.sort(doc! { "created_at": -1 })
|
||||
.skip(skip)
|
||||
.limit(params.limit)
|
||||
.await
|
||||
{
|
||||
Ok(cursor) => collect_cursor_async(cursor).await,
|
||||
Err(_) => Vec::new(),
|
||||
};
|
||||
|
||||
Ok(Json(ApiResponse {
|
||||
data: findings,
|
||||
total: Some(total),
|
||||
page: Some(params.page),
|
||||
}))
|
||||
}
|
||||
|
||||
/// GET /api/v1/dast/findings/:id — Finding detail with evidence
|
||||
pub async fn get_finding(
|
||||
Extension(agent): AgentExt,
|
||||
Path(id): Path<String>,
|
||||
) -> Result<Json<ApiResponse<DastFinding>>, StatusCode> {
|
||||
let oid =
|
||||
mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
|
||||
|
||||
let finding = agent
|
||||
.db
|
||||
.dast_findings()
|
||||
.find_one(doc! { "_id": oid })
|
||||
.await
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
|
||||
.ok_or(StatusCode::NOT_FOUND)?;
|
||||
|
||||
Ok(Json(ApiResponse {
|
||||
data: finding,
|
||||
total: None,
|
||||
page: None,
|
||||
}))
|
||||
}
|
||||
256
compliance-agent/src/api/handlers/graph.rs
Normal file
256
compliance-agent/src/api/handlers/graph.rs
Normal file
@@ -0,0 +1,256 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::extract::{Extension, Path, Query};
|
||||
use axum::http::StatusCode;
|
||||
use axum::Json;
|
||||
use mongodb::bson::doc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use compliance_core::models::graph::{CodeEdge, CodeNode, GraphBuildRun, ImpactAnalysis};
|
||||
|
||||
use crate::agent::ComplianceAgent;
|
||||
|
||||
use super::{collect_cursor_async, ApiResponse};
|
||||
|
||||
type AgentExt = Extension<Arc<ComplianceAgent>>;
|
||||
|
||||
/// Response payload for `GET /api/v1/graph/:repo_id`: the latest build
/// record (if any) plus that build's nodes and edges.
#[derive(Serialize)]
pub struct GraphData {
    // `None` when the repo has never had a graph build; nodes/edges are
    // then empty as well.
    pub build: Option<GraphBuildRun>,
    pub nodes: Vec<CodeNode>,
    pub edges: Vec<CodeEdge>,
}

/// Query parameters for `GET /api/v1/graph/:repo_id/search`.
#[derive(Deserialize)]
pub struct SearchParams {
    // Text matched (case-insensitively) against node names.
    pub q: String,
    #[serde(default = "default_search_limit")]
    pub limit: usize,
}

/// Serde default: return at most 50 search results.
fn default_search_limit() -> usize {
    50
}
|
||||
|
||||
/// GET /api/v1/graph/:repo_id — Full graph data
|
||||
pub async fn get_graph(
|
||||
Extension(agent): AgentExt,
|
||||
Path(repo_id): Path<String>,
|
||||
) -> Result<Json<ApiResponse<GraphData>>, StatusCode> {
|
||||
let db = &agent.db;
|
||||
|
||||
// Get latest build
|
||||
let build: Option<GraphBuildRun> = db
|
||||
.graph_builds()
|
||||
.find_one(doc! { "repo_id": &repo_id })
|
||||
.sort(doc! { "started_at": -1 })
|
||||
.await
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
let (nodes, edges) = if let Some(ref b) = build {
|
||||
let build_id = b.id.map(|oid| oid.to_hex()).unwrap_or_default();
|
||||
let filter = doc! { "repo_id": &repo_id, "graph_build_id": &build_id };
|
||||
|
||||
let nodes: Vec<CodeNode> = match db.graph_nodes().find(filter.clone()).await {
|
||||
Ok(cursor) => collect_cursor_async(cursor).await,
|
||||
Err(_) => Vec::new(),
|
||||
};
|
||||
let edges: Vec<CodeEdge> = match db.graph_edges().find(filter).await {
|
||||
Ok(cursor) => collect_cursor_async(cursor).await,
|
||||
Err(_) => Vec::new(),
|
||||
};
|
||||
(nodes, edges)
|
||||
} else {
|
||||
(Vec::new(), Vec::new())
|
||||
};
|
||||
|
||||
Ok(Json(ApiResponse {
|
||||
data: GraphData {
|
||||
build,
|
||||
nodes,
|
||||
edges,
|
||||
},
|
||||
total: None,
|
||||
page: None,
|
||||
}))
|
||||
}
|
||||
|
||||
/// GET /api/v1/graph/:repo_id/nodes — List nodes (paginated)
|
||||
pub async fn get_nodes(
|
||||
Extension(agent): AgentExt,
|
||||
Path(repo_id): Path<String>,
|
||||
) -> Result<Json<ApiResponse<Vec<CodeNode>>>, StatusCode> {
|
||||
let db = &agent.db;
|
||||
let filter = doc! { "repo_id": &repo_id };
|
||||
|
||||
let nodes: Vec<CodeNode> = match db.graph_nodes().find(filter).await {
|
||||
Ok(cursor) => collect_cursor_async(cursor).await,
|
||||
Err(_) => Vec::new(),
|
||||
};
|
||||
|
||||
let total = nodes.len() as u64;
|
||||
Ok(Json(ApiResponse {
|
||||
data: nodes,
|
||||
total: Some(total),
|
||||
page: None,
|
||||
}))
|
||||
}
|
||||
|
||||
/// GET /api/v1/graph/:repo_id/communities — List detected communities
|
||||
pub async fn get_communities(
|
||||
Extension(agent): AgentExt,
|
||||
Path(repo_id): Path<String>,
|
||||
) -> Result<Json<ApiResponse<Vec<CommunityInfo>>>, StatusCode> {
|
||||
let db = &agent.db;
|
||||
let filter = doc! { "repo_id": &repo_id };
|
||||
|
||||
let nodes: Vec<CodeNode> = match db.graph_nodes().find(filter).await {
|
||||
Ok(cursor) => collect_cursor_async(cursor).await,
|
||||
Err(_) => Vec::new(),
|
||||
};
|
||||
|
||||
let mut communities: std::collections::HashMap<u32, Vec<String>> =
|
||||
std::collections::HashMap::new();
|
||||
for node in &nodes {
|
||||
if let Some(cid) = node.community_id {
|
||||
communities
|
||||
.entry(cid)
|
||||
.or_default()
|
||||
.push(node.qualified_name.clone());
|
||||
}
|
||||
}
|
||||
|
||||
let mut result: Vec<CommunityInfo> = communities
|
||||
.into_iter()
|
||||
.map(|(id, members)| CommunityInfo {
|
||||
community_id: id,
|
||||
member_count: members.len() as u32,
|
||||
members,
|
||||
})
|
||||
.collect();
|
||||
result.sort_by_key(|c| c.community_id);
|
||||
|
||||
let total = result.len() as u64;
|
||||
Ok(Json(ApiResponse {
|
||||
data: result,
|
||||
total: Some(total),
|
||||
page: None,
|
||||
}))
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct CommunityInfo {
|
||||
pub community_id: u32,
|
||||
pub member_count: u32,
|
||||
pub members: Vec<String>,
|
||||
}
|
||||
|
||||
/// GET /api/v1/graph/:repo_id/impact/:finding_id — Impact analysis
|
||||
pub async fn get_impact(
|
||||
Extension(agent): AgentExt,
|
||||
Path((repo_id, finding_id)): Path<(String, String)>,
|
||||
) -> Result<Json<ApiResponse<Option<ImpactAnalysis>>>, StatusCode> {
|
||||
let db = &agent.db;
|
||||
let filter = doc! { "repo_id": &repo_id, "finding_id": &finding_id };
|
||||
|
||||
let impact = db
|
||||
.impact_analyses()
|
||||
.find_one(filter)
|
||||
.await
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
Ok(Json(ApiResponse {
|
||||
data: impact,
|
||||
total: None,
|
||||
page: None,
|
||||
}))
|
||||
}
|
||||
|
||||
/// GET /api/v1/graph/:repo_id/search — BM25 symbol search
|
||||
pub async fn search_symbols(
|
||||
Extension(agent): AgentExt,
|
||||
Path(repo_id): Path<String>,
|
||||
Query(params): Query<SearchParams>,
|
||||
) -> Result<Json<ApiResponse<Vec<CodeNode>>>, StatusCode> {
|
||||
let db = &agent.db;
|
||||
|
||||
// Simple text search on qualified_name and name fields
|
||||
let filter = doc! {
|
||||
"repo_id": &repo_id,
|
||||
"name": { "$regex": ¶ms.q, "$options": "i" },
|
||||
};
|
||||
|
||||
let nodes: Vec<CodeNode> = match db
|
||||
.graph_nodes()
|
||||
.find(filter)
|
||||
.limit(params.limit as i64)
|
||||
.await
|
||||
{
|
||||
Ok(cursor) => collect_cursor_async(cursor).await,
|
||||
Err(_) => Vec::new(),
|
||||
};
|
||||
|
||||
let total = nodes.len() as u64;
|
||||
Ok(Json(ApiResponse {
|
||||
data: nodes,
|
||||
total: Some(total),
|
||||
page: None,
|
||||
}))
|
||||
}
|
||||
|
||||
/// POST /api/v1/graph/:repo_id/build — Trigger graph rebuild
|
||||
pub async fn trigger_build(
|
||||
Extension(agent): AgentExt,
|
||||
Path(repo_id): Path<String>,
|
||||
) -> Result<Json<serde_json::Value>, StatusCode> {
|
||||
let agent_clone = (*agent).clone();
|
||||
tokio::spawn(async move {
|
||||
let repo = match agent_clone
|
||||
.db
|
||||
.repositories()
|
||||
.find_one(doc! { "_id": mongodb::bson::oid::ObjectId::parse_str(&repo_id).ok() })
|
||||
.await
|
||||
{
|
||||
Ok(Some(r)) => r,
|
||||
_ => {
|
||||
tracing::error!("Repository {repo_id} not found for graph build");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let git_ops = crate::pipeline::git::GitOps::new(&agent_clone.config.git_clone_base_path);
|
||||
let repo_path = match git_ops.clone_or_fetch(&repo.git_url, &repo.name) {
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to clone repo for graph build: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let graph_build_id = uuid::Uuid::new_v4().to_string();
|
||||
let engine = compliance_graph::GraphEngine::new(50_000);
|
||||
|
||||
match engine.build_graph(&repo_path, &repo_id, &graph_build_id) {
|
||||
Ok((code_graph, build_run)) => {
|
||||
let store =
|
||||
compliance_graph::graph::persistence::GraphStore::new(agent_clone.db.inner());
|
||||
let _ = store.delete_repo_graph(&repo_id).await;
|
||||
let _ = store
|
||||
.store_graph(&build_run, &code_graph.nodes, &code_graph.edges)
|
||||
.await;
|
||||
tracing::info!(
|
||||
"[{repo_id}] Graph rebuild complete: {} nodes, {} edges",
|
||||
build_run.node_count,
|
||||
build_run.edge_count
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("[{repo_id}] Graph rebuild failed: {e}");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Json(
|
||||
serde_json::json!({ "status": "graph_build_triggered" }),
|
||||
))
|
||||
}
|
||||
@@ -1,3 +1,6 @@
|
||||
pub mod dast;
|
||||
pub mod graph;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
#[allow(unused_imports)]
|
||||
@@ -410,8 +413,11 @@ async fn collect_cursor_async<T: serde::de::DeserializeOwned + Unpin + Send>(
|
||||
) -> Vec<T> {
|
||||
use futures_util::StreamExt;
|
||||
let mut items = Vec::new();
|
||||
while let Some(Ok(item)) = cursor.next().await {
|
||||
items.push(item);
|
||||
while let Some(result) = cursor.next().await {
|
||||
match result {
|
||||
Ok(item) => items.push(item),
|
||||
Err(e) => tracing::warn!("Failed to deserialize document: {e}"),
|
||||
}
|
||||
}
|
||||
items
|
||||
}
|
||||
|
||||
@@ -22,4 +22,54 @@ pub fn build_router() -> Router {
|
||||
.route("/api/v1/sbom", get(handlers::list_sbom))
|
||||
.route("/api/v1/issues", get(handlers::list_issues))
|
||||
.route("/api/v1/scan-runs", get(handlers::list_scan_runs))
|
||||
// Graph API endpoints
|
||||
.route(
|
||||
"/api/v1/graph/{repo_id}",
|
||||
get(handlers::graph::get_graph),
|
||||
)
|
||||
.route(
|
||||
"/api/v1/graph/{repo_id}/nodes",
|
||||
get(handlers::graph::get_nodes),
|
||||
)
|
||||
.route(
|
||||
"/api/v1/graph/{repo_id}/communities",
|
||||
get(handlers::graph::get_communities),
|
||||
)
|
||||
.route(
|
||||
"/api/v1/graph/{repo_id}/impact/{finding_id}",
|
||||
get(handlers::graph::get_impact),
|
||||
)
|
||||
.route(
|
||||
"/api/v1/graph/{repo_id}/search",
|
||||
get(handlers::graph::search_symbols),
|
||||
)
|
||||
.route(
|
||||
"/api/v1/graph/{repo_id}/build",
|
||||
post(handlers::graph::trigger_build),
|
||||
)
|
||||
// DAST API endpoints
|
||||
.route(
|
||||
"/api/v1/dast/targets",
|
||||
get(handlers::dast::list_targets),
|
||||
)
|
||||
.route(
|
||||
"/api/v1/dast/targets",
|
||||
post(handlers::dast::add_target),
|
||||
)
|
||||
.route(
|
||||
"/api/v1/dast/targets/{id}/scan",
|
||||
post(handlers::dast::trigger_scan),
|
||||
)
|
||||
.route(
|
||||
"/api/v1/dast/scan-runs",
|
||||
get(handlers::dast::list_scan_runs),
|
||||
)
|
||||
.route(
|
||||
"/api/v1/dast/findings",
|
||||
get(handlers::dast::list_findings),
|
||||
)
|
||||
.route(
|
||||
"/api/v1/dast/findings/{id}",
|
||||
get(handlers::dast::get_finding),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -88,6 +88,70 @@ impl Database {
|
||||
)
|
||||
.await?;
|
||||
|
||||
// graph_nodes: compound (repo_id, graph_build_id)
|
||||
self.graph_nodes()
|
||||
.create_index(
|
||||
IndexModel::builder()
|
||||
.keys(doc! { "repo_id": 1, "graph_build_id": 1 })
|
||||
.build(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// graph_edges: compound (repo_id, graph_build_id)
|
||||
self.graph_edges()
|
||||
.create_index(
|
||||
IndexModel::builder()
|
||||
.keys(doc! { "repo_id": 1, "graph_build_id": 1 })
|
||||
.build(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// graph_builds: compound (repo_id, started_at DESC)
|
||||
self.graph_builds()
|
||||
.create_index(
|
||||
IndexModel::builder()
|
||||
.keys(doc! { "repo_id": 1, "started_at": -1 })
|
||||
.build(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// impact_analyses: unique (repo_id, finding_id)
|
||||
self.impact_analyses()
|
||||
.create_index(
|
||||
IndexModel::builder()
|
||||
.keys(doc! { "repo_id": 1, "finding_id": 1 })
|
||||
.options(IndexOptions::builder().unique(true).build())
|
||||
.build(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// dast_targets: index on repo_id
|
||||
self.dast_targets()
|
||||
.create_index(
|
||||
IndexModel::builder()
|
||||
.keys(doc! { "repo_id": 1 })
|
||||
.build(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// dast_scan_runs: compound (target_id, started_at DESC)
|
||||
self.dast_scan_runs()
|
||||
.create_index(
|
||||
IndexModel::builder()
|
||||
.keys(doc! { "target_id": 1, "started_at": -1 })
|
||||
.build(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// dast_findings: compound (scan_run_id, vuln_type)
|
||||
self.dast_findings()
|
||||
.create_index(
|
||||
IndexModel::builder()
|
||||
.keys(doc! { "scan_run_id": 1, "vuln_type": 1 })
|
||||
.build(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
tracing::info!("Database indexes ensured");
|
||||
Ok(())
|
||||
}
|
||||
@@ -116,8 +180,43 @@ impl Database {
|
||||
self.inner.collection("tracker_issues")
|
||||
}
|
||||
|
||||
// Graph collections

/// Typed handle to the `graph_nodes` collection.
pub fn graph_nodes(&self) -> Collection<compliance_core::models::graph::CodeNode> {
    self.inner.collection("graph_nodes")
}

/// Typed handle to the `graph_edges` collection.
pub fn graph_edges(&self) -> Collection<compliance_core::models::graph::CodeEdge> {
    self.inner.collection("graph_edges")
}

/// Typed handle to the `graph_builds` collection (one document per build run).
pub fn graph_builds(&self) -> Collection<compliance_core::models::graph::GraphBuildRun> {
    self.inner.collection("graph_builds")
}

/// Typed handle to the `impact_analyses` collection (per-finding impact data).
pub fn impact_analyses(&self) -> Collection<compliance_core::models::graph::ImpactAnalysis> {
    self.inner.collection("impact_analyses")
}
|
||||
|
||||
// DAST collections

/// Typed handle to the `dast_targets` collection.
pub fn dast_targets(&self) -> Collection<DastTarget> {
    self.inner.collection("dast_targets")
}

/// Typed handle to the `dast_scan_runs` collection.
pub fn dast_scan_runs(&self) -> Collection<DastScanRun> {
    self.inner.collection("dast_scan_runs")
}

/// Typed handle to the `dast_findings` collection.
pub fn dast_findings(&self) -> Collection<DastFinding> {
    self.inner.collection("dast_findings")
}
|
||||
|
||||
/// Untyped (BSON-document) handle to an arbitrary collection, by name.
#[allow(dead_code)]
pub fn raw_collection(&self, name: &str) -> Collection<mongodb::bson::Document> {
    self.inner.collection(name)
}

/// Get the raw MongoDB database handle (for graph persistence)
pub fn inner(&self) -> &mongodb::Database {
    &self.inner
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::sync::Arc;
|
||||
use compliance_core::models::{Finding, FindingStatus};
|
||||
|
||||
use crate::llm::LlmClient;
|
||||
use crate::pipeline::orchestrator::GraphContext;
|
||||
|
||||
const TRIAGE_SYSTEM_PROMPT: &str = r#"You are a security finding triage expert. Analyze the following security finding and determine:
|
||||
1. Is this a true positive? (yes/no)
|
||||
@@ -12,11 +13,15 @@ const TRIAGE_SYSTEM_PROMPT: &str = r#"You are a security finding triage expert.
|
||||
Respond in JSON format:
|
||||
{"true_positive": true/false, "confidence": N, "remediation": "..."}"#;
|
||||
|
||||
pub async fn triage_findings(llm: &Arc<LlmClient>, findings: &mut Vec<Finding>) -> usize {
|
||||
pub async fn triage_findings(
|
||||
llm: &Arc<LlmClient>,
|
||||
findings: &mut Vec<Finding>,
|
||||
graph_context: Option<&GraphContext>,
|
||||
) -> usize {
|
||||
let mut passed = 0;
|
||||
|
||||
for finding in findings.iter_mut() {
|
||||
let user_prompt = format!(
|
||||
let mut user_prompt = format!(
|
||||
"Scanner: {}\nRule: {}\nSeverity: {}\nTitle: {}\nDescription: {}\nFile: {}\nLine: {}\nCode: {}",
|
||||
finding.scanner,
|
||||
finding.rule_id.as_deref().unwrap_or("N/A"),
|
||||
@@ -28,6 +33,37 @@ pub async fn triage_findings(llm: &Arc<LlmClient>, findings: &mut Vec<Finding>)
|
||||
finding.code_snippet.as_deref().unwrap_or("N/A"),
|
||||
);
|
||||
|
||||
// Enrich with graph context if available
|
||||
if let Some(ctx) = graph_context {
|
||||
if let Some(impact) = ctx
|
||||
.impacts
|
||||
.iter()
|
||||
.find(|i| i.finding_id == finding.fingerprint)
|
||||
{
|
||||
user_prompt.push_str(&format!(
|
||||
"\n\n--- Code Graph Context ---\n\
|
||||
Blast radius: {} nodes affected\n\
|
||||
Entry points affected: {}\n\
|
||||
Direct callers: {}\n\
|
||||
Communities affected: {}\n\
|
||||
Call chains: {}",
|
||||
impact.blast_radius,
|
||||
if impact.affected_entry_points.is_empty() {
|
||||
"none".to_string()
|
||||
} else {
|
||||
impact.affected_entry_points.join(", ")
|
||||
},
|
||||
if impact.direct_callers.is_empty() {
|
||||
"none".to_string()
|
||||
} else {
|
||||
impact.direct_callers.join(", ")
|
||||
},
|
||||
impact.affected_communities.len(),
|
||||
impact.call_chains.len(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
match llm
|
||||
.chat(TRIAGE_SYSTEM_PROMPT, &user_prompt, Some(0.1))
|
||||
.await
|
||||
|
||||
@@ -15,6 +15,16 @@ use crate::pipeline::patterns::{GdprPatternScanner, OAuthPatternScanner};
|
||||
use crate::pipeline::sbom::SbomScanner;
|
||||
use crate::pipeline::semgrep::SemgrepScanner;
|
||||
|
||||
/// Context from graph analysis passed to LLM triage for enhanced filtering
#[derive(Debug)]
#[allow(dead_code)]
pub struct GraphContext {
    // Totals reported by the graph build run.
    pub node_count: u32,
    pub edge_count: u32,
    pub community_count: u32,
    // Per-finding impact analyses (blast radius, affected entry points, …)
    // computed against the freshly built graph.
    pub impacts: Vec<compliance_core::models::graph::ImpactAnalysis>,
}
|
||||
|
||||
pub struct PipelineOrchestrator {
|
||||
config: AgentConfig,
|
||||
db: Database,
|
||||
@@ -172,13 +182,30 @@ impl PipelineOrchestrator {
|
||||
Err(e) => tracing::warn!("[{repo_id}] OAuth pattern scan failed: {e}"),
|
||||
}
|
||||
|
||||
// Stage 5: LLM Triage
|
||||
// Stage 4.5: Graph Building
|
||||
tracing::info!("[{repo_id}] Stage 4.5: Graph Building");
|
||||
self.update_phase(scan_run_id, "graph_building").await;
|
||||
let graph_context = match self.build_code_graph(&repo_path, &repo_id, &all_findings).await
|
||||
{
|
||||
Ok(ctx) => Some(ctx),
|
||||
Err(e) => {
|
||||
tracing::warn!("[{repo_id}] Graph building failed: {e}");
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
// Stage 5: LLM Triage (enhanced with graph context)
|
||||
tracing::info!(
|
||||
"[{repo_id}] Stage 5: LLM Triage ({} findings)",
|
||||
all_findings.len()
|
||||
);
|
||||
self.update_phase(scan_run_id, "llm_triage").await;
|
||||
let triaged = crate::llm::triage::triage_findings(&self.llm, &mut all_findings).await;
|
||||
let triaged = crate::llm::triage::triage_findings(
|
||||
&self.llm,
|
||||
&mut all_findings,
|
||||
graph_context.as_ref(),
|
||||
)
|
||||
.await;
|
||||
tracing::info!("[{repo_id}] Triaged: {triaged} findings passed confidence threshold");
|
||||
|
||||
// Dedup against existing findings and insert new ones
|
||||
@@ -250,10 +277,121 @@ impl PipelineOrchestrator {
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Stage 8: DAST (async, optional — only if a DastTarget is configured)
|
||||
tracing::info!("[{repo_id}] Stage 8: Checking for DAST targets");
|
||||
self.update_phase(scan_run_id, "dast_scanning").await;
|
||||
self.maybe_trigger_dast(&repo_id, scan_run_id).await;
|
||||
|
||||
tracing::info!("[{repo_id}] Scan complete: {new_count} new findings");
|
||||
Ok(new_count)
|
||||
}
|
||||
|
||||
/// Build the code knowledge graph for a repo and compute impact analyses.
///
/// Pipeline: build the graph from the working copy → run community
/// detection → replace the repo's stored graph in MongoDB → compute and
/// store an impact analysis for each finding that has a file path.
///
/// # Errors
/// Fails if the graph cannot be built or any persistence step errors;
/// errors are wrapped into `AgentError::Other` with a stage-specific prefix.
async fn build_code_graph(
    &self,
    repo_path: &std::path::Path,
    repo_id: &str,
    findings: &[Finding],
) -> Result<GraphContext, AgentError> {
    // Fresh id groups this build's nodes/edges together in storage.
    let graph_build_id = uuid::Uuid::new_v4().to_string();
    let engine = compliance_graph::GraphEngine::new(50_000);

    let (mut code_graph, build_run) = engine
        .build_graph(repo_path, repo_id, &graph_build_id)
        .map_err(|e| AgentError::Other(format!("Graph build error: {e}")))?;

    // Apply community detection before persisting so community ids are stored.
    compliance_graph::graph::community::apply_communities(&mut code_graph);

    // Store graph in MongoDB: old graph is deleted first, then the new one
    // written — order matters so stale nodes/edges don't linger.
    let store = compliance_graph::graph::persistence::GraphStore::new(self.db.inner());
    store
        .delete_repo_graph(repo_id)
        .await
        .map_err(|e| AgentError::Other(format!("Graph cleanup error: {e}")))?;
    store
        .store_graph(&build_run, &code_graph.nodes, &code_graph.edges)
        .await
        .map_err(|e| AgentError::Other(format!("Graph store error: {e}")))?;

    // Compute impact analysis for each finding that can be located in a file.
    let analyzer = compliance_graph::GraphEngine::impact_analyzer(&code_graph);
    let mut impacts = Vec::new();

    for finding in findings {
        if let Some(file_path) = &finding.file_path {
            let impact = analyzer.analyze(
                repo_id,
                &finding.fingerprint,
                &graph_build_id,
                file_path,
                finding.line_number,
            );
            // Persist each analysis as it is computed; a storage failure
            // aborts the whole stage.
            store
                .store_impact(&impact)
                .await
                .map_err(|e| AgentError::Other(format!("Impact store error: {e}")))?;
            impacts.push(impact);
        }
    }

    Ok(GraphContext {
        node_count: build_run.node_count,
        edge_count: build_run.edge_count,
        community_count: build_run.community_count,
        impacts,
    })
}
|
||||
|
||||
/// Trigger DAST scans for every target configured for this repo.
///
/// Fire-and-forget: each target's scan runs on its own detached task and
/// the pipeline does not wait for results — failures only surface in logs.
async fn maybe_trigger_dast(&self, repo_id: &str, scan_run_id: &str) {
    use futures_util::TryStreamExt;

    let filter = mongodb::bson::doc! { "repo_id": repo_id };
    // A failed query is treated the same as "no targets": silently return.
    let targets: Vec<compliance_core::models::DastTarget> = match self
        .db
        .dast_targets()
        .find(filter)
        .await
    {
        Ok(cursor) => cursor.try_collect().await.unwrap_or_default(),
        Err(_) => return,
    };

    if targets.is_empty() {
        tracing::info!("[{repo_id}] No DAST targets configured, skipping");
        return;
    }

    for target in targets {
        // Each spawned task needs its own db handle and scan-run id.
        let db = self.db.clone();
        let scan_run_id = scan_run_id.to_string();
        tokio::spawn(async move {
            let orchestrator = compliance_dast::DastOrchestrator::new(100);
            match orchestrator.run_scan(&target, Vec::new()).await {
                Ok((mut scan_run, findings)) => {
                    // Link the DAST run back to the SAST scan that spawned it.
                    scan_run.sast_scan_run_id = Some(scan_run_id);
                    if let Err(e) = db.dast_scan_runs().insert_one(&scan_run).await {
                        tracing::error!("Failed to store DAST scan run: {e}");
                    }
                    for finding in &findings {
                        if let Err(e) = db.dast_findings().insert_one(finding).await {
                            tracing::error!("Failed to store DAST finding: {e}");
                        }
                    }
                    tracing::info!(
                        "DAST scan complete: {} findings",
                        findings.len()
                    );
                }
                Err(e) => {
                    tracing::error!("DAST scan failed: {e}");
                }
            }
        });
    }
}
|
||||
|
||||
async fn update_phase(&self, scan_run_id: &str, phase: &str) {
|
||||
if let Ok(oid) = mongodb::bson::oid::ObjectId::parse_str(scan_run_id) {
|
||||
let _ = self
|
||||
|
||||
Reference in New Issue
Block a user