Files
compliance-scanner-agent/compliance-agent/src/api/handlers/graph.rs
T
Sharang Parnerkar cdfbb62f9d
CI / Check (pull_request) Successful in 8m9s
CI / Detect Changes (pull_request) Has been skipped
CI / Deploy Agent (pull_request) Has been skipped
CI / Deploy Dashboard (pull_request) Has been skipped
CI / Deploy Docs (pull_request) Has been skipped
CI / Deploy MCP (pull_request) Has been skipped
feat(m7.2-B): migrate API handlers to per-tenant database pool
Builds on PR M7.2-A. Every HTTP handler in compliance-agent/src/api/
now takes a TenantCtx extractor and pulls a tenant-scoped Database
from agent.db_pool.for_tenant(&ctx). The query bodies are unchanged —
`db.findings().find(doc! {...})` reads from the tenant's own physical
database, so the filter doc cannot leak data across tenants because
the wrong tenant's data is literally on a different db handle.

Changes
- New `dto::tenant_db(&agent, &tenant) -> Result<Database, StatusCode>`
  helper. Every migrated handler calls it at the top of the body
  instead of `let db = &agent.db;`. 500 on the rare pool failure;
  4xx auth failures are already handled by the M7.1 status gate.
- New `api::server::inject_dev_tenant` middleware mounted only when
  Keycloak is NOT configured. Synthesizes a TenantContext with
  tenant_id = $DEV_TENANT_ID (default `dev`) so `cargo run` against
  a bare Mongo + no KC still serves the API. Logged loudly as
  "DO NOT use in any environment with real customer data".
- Test harness: TestServer mounts inject_dev_tenant so existing E2E
  tests reach handlers; cleanup() now drops every <db_name>_*
  per-tenant database, not just the legacy <db_name>.

Files migrated (handler count, all pass `cargo build`):
- chat.rs (3) — also rewires RagPipeline + EmbeddingStore to the
  tenant DB's inner() so vector search is per-tenant
- dast.rs (5)
- findings.rs (5)
- graph.rs (7) — also rewires GraphStore inside trigger_build's
  spawn to the tenant DB
- health.rs (1) — stats_overview migrated; public /health stays
  un-scoped
- issues.rs (1)
- notifications.rs (5)
- pentest_handlers/session.rs (12) — both wizard + legacy paths,
  plus pause/resume/stop/get_attack_chain/get_messages/
  get_session_findings/lookup_repo. PentestOrchestrator now gets
  the tenant DB clone in its spawn.
- pentest_handlers/export.rs (1) — fans out across sessions,
  attack_chain_nodes, dast_findings, findings, sbom_entries,
  graph_nodes from a single tenant_db acquisition
- pentest_handlers/stats.rs (1)
- pentest_handlers/stream.rs (1) — SSE handler verifies session
  via the tenant DB before subscribing
- repos.rs (6)
- sbom.rs (5)
- scans.rs (1)

help_chat.rs has no DB queries and was skipped.

Test plan
- cargo fmt --all clean
- cargo clippy --workspace --exclude compliance-dashboard
  -- -D warnings clean
- cargo test -p compliance-core --lib — 7 pass
- cargo test -p compliance-agent --lib — 228 pass
- cargo test -p compliance-agent --test tenant_isolation — 5 pass
  (driver-level isolation still holds post-handler migration)
- cargo test -p compliance-agent --test tenant_status_middleware
  — 6 pass

What's not yet migrated (PR-C / PR-D)
- scheduler.rs (6 sites), pipeline/orchestrator.rs (14),
  pentest/orchestrator.rs (13), webhooks (gitea/github/gitlab),
  trackers/jira.rs, pipeline/dedup.rs etc. — background paths
  without a JWT-derived tenant context.
- agent.db is still in the ComplianceAgent struct as a transitional
  handle for those paths. PR-D removes it once PR-C migrates the
  background paths.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-17 13:28:33 +02:00

372 lines
11 KiB
Rust

use std::sync::Arc;
use axum::extract::{Extension, Path, Query};
use axum::http::StatusCode;
use axum::Json;
use mongodb::bson::doc;
use serde::{Deserialize, Serialize};
use compliance_core::models::graph::{CodeEdge, CodeNode, GraphBuildRun, ImpactAnalysis};
use compliance_core::tenant_ctx::TenantCtx;
use crate::agent::ComplianceAgent;
use super::dto::tenant_db;
use super::{collect_cursor_async, ApiResponse};
/// Shorthand for the axum `Extension` extractor wrapping the shared `Arc<ComplianceAgent>`.
type AgentExt = Extension<Arc<ComplianceAgent>>;
/// Response payload of `get_graph`: the latest build record together with its graph.
#[derive(Serialize)]
pub struct GraphData {
    /// Most recent build run recorded for the repository, or `None` when there is none.
    pub build: Option<GraphBuildRun>,
    /// Graph nodes returned for the repository.
    pub nodes: Vec<CodeNode>,
    /// Graph edges returned for the repository.
    pub edges: Vec<CodeEdge>,
}
/// Query-string parameters accepted by `search_symbols`.
#[derive(Deserialize)]
pub struct SearchParams {
    /// Search text applied to the node `name` field.
    pub q: String,
    /// Maximum number of nodes to return; `default_search_limit` supplies the value
    /// when the parameter is omitted.
    #[serde(default = "default_search_limit")]
    pub limit: usize,
}
/// Serde default for `SearchParams::limit`, used when the request omits `limit`.
fn default_search_limit() -> usize {
    const DEFAULT_SEARCH_LIMIT: usize = 50;
    DEFAULT_SEARCH_LIMIT
}
/// GET /api/v1/graph/:repo_id — Full graph data
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn get_graph(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>,
) -> Result<Json<ApiResponse<GraphData>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
// Get latest build
let build: Option<GraphBuildRun> = db
.graph_builds()
.find_one(doc! { "repo_id": &repo_id })
.sort(doc! { "started_at": -1 })
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let (nodes, edges) = if build.is_some() {
// Filter by repo_id only — delete_repo_graph clears old data before each rebuild,
// so there is only one set of nodes/edges per repo.
let filter = doc! { "repo_id": &repo_id };
let all_nodes: Vec<CodeNode> = match db.graph_nodes().find(filter.clone()).await {
Ok(cursor) => collect_cursor_async(cursor).await,
Err(e) => {
tracing::warn!("Failed to fetch graph nodes: {e}");
Vec::new()
}
};
let edges: Vec<CodeEdge> = match db.graph_edges().find(filter).await {
Ok(cursor) => collect_cursor_async(cursor).await,
Err(e) => {
tracing::warn!("Failed to fetch graph edges: {e}");
Vec::new()
}
};
// Remove disconnected nodes (no edges) to keep the graph clean
let connected: std::collections::HashSet<&str> = edges
.iter()
.flat_map(|e| [e.source.as_str(), e.target.as_str()])
.collect();
let nodes = all_nodes
.into_iter()
.filter(|n| connected.contains(n.qualified_name.as_str()))
.collect();
(nodes, edges)
} else {
(Vec::new(), Vec::new())
};
Ok(Json(ApiResponse {
data: GraphData {
build,
nodes,
edges,
},
total: None,
page: None,
}))
}
/// GET /api/v1/graph/:repo_id/nodes — List nodes (paginated)
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn get_nodes(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>,
) -> Result<Json<ApiResponse<Vec<CodeNode>>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let filter = doc! { "repo_id": &repo_id };
let nodes: Vec<CodeNode> = match db.graph_nodes().find(filter).await {
Ok(cursor) => collect_cursor_async(cursor).await,
Err(e) => {
tracing::warn!("Failed to fetch graph nodes: {e}");
Vec::new()
}
};
let total = nodes.len() as u64;
Ok(Json(ApiResponse {
data: nodes,
total: Some(total),
page: None,
}))
}
/// GET /api/v1/graph/:repo_id/communities — List detected communities
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn get_communities(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>,
) -> Result<Json<ApiResponse<Vec<CommunityInfo>>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let filter = doc! { "repo_id": &repo_id };
let nodes: Vec<CodeNode> = match db.graph_nodes().find(filter).await {
Ok(cursor) => collect_cursor_async(cursor).await,
Err(e) => {
tracing::warn!("Failed to fetch graph nodes for communities: {e}");
Vec::new()
}
};
let mut communities: std::collections::HashMap<u32, Vec<String>> =
std::collections::HashMap::new();
for node in &nodes {
if let Some(cid) = node.community_id {
communities
.entry(cid)
.or_default()
.push(node.qualified_name.clone());
}
}
let mut result: Vec<CommunityInfo> = communities
.into_iter()
.map(|(id, members)| CommunityInfo {
community_id: id,
member_count: members.len() as u32,
members,
})
.collect();
result.sort_by_key(|c| c.community_id);
let total = result.len() as u64;
Ok(Json(ApiResponse {
data: result,
total: Some(total),
page: None,
}))
}
/// Summary of one community as returned by `get_communities`.
#[derive(Serialize)]
pub struct CommunityInfo {
    /// Identifier shared by all nodes in this community.
    pub community_id: u32,
    /// Number of entries in `members`.
    pub member_count: u32,
    /// Qualified names of the nodes that belong to this community.
    pub members: Vec<String>,
}
/// GET /api/v1/graph/:repo_id/impact/:finding_id — Impact analysis
#[tracing::instrument(skip_all, fields(repo_id = %repo_id, finding_id = %finding_id))]
pub async fn get_impact(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path((repo_id, finding_id)): Path<(String, String)>,
) -> Result<Json<ApiResponse<Option<ImpactAnalysis>>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let filter = doc! { "repo_id": &repo_id, "finding_id": &finding_id };
let impact = db
.impact_analyses()
.find_one(filter)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(ApiResponse {
data: impact,
total: None,
page: None,
}))
}
/// GET /api/v1/graph/:repo_id/search — BM25 symbol search
#[tracing::instrument(skip_all, fields(repo_id = %repo_id, query = %params.q))]
pub async fn search_symbols(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>,
Query(params): Query<SearchParams>,
) -> Result<Json<ApiResponse<Vec<CodeNode>>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
// Simple text search on qualified_name and name fields
let filter = doc! {
"repo_id": &repo_id,
"name": { "$regex": &params.q, "$options": "i" },
};
let nodes: Vec<CodeNode> = match db
.graph_nodes()
.find(filter)
.limit(params.limit as i64)
.await
{
Ok(cursor) => collect_cursor_async(cursor).await,
Err(e) => {
tracing::warn!("Failed to search graph nodes: {e}");
Vec::new()
}
};
let total = nodes.len() as u64;
Ok(Json(ApiResponse {
data: nodes,
total: Some(total),
page: None,
}))
}
/// GET /api/v1/graph/:repo_id/file-content — Read source file from cloned repo
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn get_file_content(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>,
Query(params): Query<FileContentParams>,
) -> Result<Json<ApiResponse<FileContent>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
// Look up the repository to get repo name
let repo = db
.repositories()
.find_one(doc! { "_id": mongodb::bson::oid::ObjectId::parse_str(&repo_id).ok() })
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.ok_or(StatusCode::NOT_FOUND)?;
let base_path = std::path::Path::new(&agent.config.git_clone_base_path);
let file_path = base_path.join(&repo.name).join(&params.path);
// Security: ensure we don't escape the repo directory
let canonical = file_path
.canonicalize()
.map_err(|_| StatusCode::NOT_FOUND)?;
let base_canonical = base_path
.join(&repo.name)
.canonicalize()
.map_err(|_| StatusCode::NOT_FOUND)?;
if !canonical.starts_with(&base_canonical) {
return Err(StatusCode::FORBIDDEN);
}
let content = std::fs::read_to_string(&canonical).map_err(|_| StatusCode::NOT_FOUND)?;
// Cap at 10,000 lines
let truncated: String = content.lines().take(10_000).collect::<Vec<_>>().join("\n");
let language = params.path.rsplit('.').next().unwrap_or("").to_string();
Ok(Json(ApiResponse {
data: FileContent {
content: truncated,
path: params.path,
language,
},
total: None,
page: None,
}))
}
/// Query-string parameters accepted by `get_file_content`.
#[derive(Deserialize)]
pub struct FileContentParams {
    /// Path of the file to read, joined onto the repository's clone directory.
    pub path: String,
}
/// Response payload of `get_file_content`.
#[derive(Serialize)]
pub struct FileContent {
    /// File text, capped at 10,000 lines.
    pub content: String,
    /// The `path` value from the request, echoed back.
    pub path: String,
    /// Language hint taken from the suffix after the last `.` of the requested path.
    pub language: String,
}
/// POST /api/v1/graph/:repo_id/build — Trigger graph rebuild
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn trigger_build(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let agent_clone = (*agent).clone();
tokio::spawn(async move {
let repo = match db
.repositories()
.find_one(doc! { "_id": mongodb::bson::oid::ObjectId::parse_str(&repo_id).ok() })
.await
{
Ok(Some(r)) => r,
_ => {
tracing::error!("Repository {repo_id} not found for graph build");
return;
}
};
let creds = crate::pipeline::git::RepoCredentials {
ssh_key_path: Some(agent_clone.config.ssh_key_path.clone()),
auth_token: repo.auth_token.clone(),
auth_username: repo.auth_username.clone(),
};
let git_ops =
crate::pipeline::git::GitOps::new(&agent_clone.config.git_clone_base_path, creds);
let repo_path = match git_ops.clone_or_fetch(&repo.git_url, &repo.name) {
Ok(p) => p,
Err(e) => {
tracing::error!("Failed to clone repo for graph build: {e}");
return;
}
};
let graph_build_id = uuid::Uuid::new_v4().to_string();
let engine = compliance_graph::GraphEngine::new(50_000);
match engine.build_graph(&repo_path, &repo_id, &graph_build_id) {
Ok((code_graph, build_run)) => {
let store = compliance_graph::graph::persistence::GraphStore::new(db.inner());
let _ = store.delete_repo_graph(&repo_id).await;
let _ = store
.store_graph(&build_run, &code_graph.nodes, &code_graph.edges)
.await;
tracing::info!(
"[{repo_id}] Graph rebuild complete: {} nodes, {} edges",
build_run.node_count,
build_run.edge_count
);
}
Err(e) => {
tracing::error!("[{repo_id}] Graph rebuild failed: {e}");
}
}
});
Ok(Json(
serde_json::json!({ "status": "graph_build_triggered" }),
))
}