feat(m7.2-B): migrate API handlers to per-tenant database pool
CI / Check (pull_request) Successful in 8m9s
CI / Detect Changes (pull_request) Has been skipped
CI / Deploy Agent (pull_request) Has been skipped
CI / Deploy Dashboard (pull_request) Has been skipped
CI / Deploy Docs (pull_request) Has been skipped
CI / Deploy MCP (pull_request) Has been skipped
CI / Check (pull_request) Successful in 8m9s
CI / Detect Changes (pull_request) Has been skipped
CI / Deploy Agent (pull_request) Has been skipped
CI / Deploy Dashboard (pull_request) Has been skipped
CI / Deploy Docs (pull_request) Has been skipped
CI / Deploy MCP (pull_request) Has been skipped
Builds on PR M7.2-A. Every HTTP handler in compliance-agent/src/api/
now takes a TenantCtx extractor and pulls a tenant-scoped Database
from agent.db_pool.for_tenant(&ctx). The query bodies are unchanged —
`db.findings().find(doc! {...})` reads from the tenant's own physical
database, so the filter doc cannot leak data across tenants because
the wrong tenant's data is literally on a different db handle.
Changes
- New `dto::tenant_db(&agent, &tenant) -> Result<Database, StatusCode>`
helper. Every migrated handler calls it at the top of the body
instead of `let db = &agent.db;`. 500 on the rare pool failure;
4xx auth failures are already handled by the M7.1 status gate.
- New `api::server::inject_dev_tenant` middleware mounted only when
Keycloak is NOT configured. Synthesizes a TenantContext with
tenant_id = $DEV_TENANT_ID (default `dev`) so `cargo run` against
a bare Mongo + no KC still serves the API. Logged loudly as
"DO NOT use in any environment with real customer data".
- Test harness: TestServer mounts inject_dev_tenant so existing E2E
tests reach handlers; cleanup() now drops every <db_name>_*
per-tenant database, not just the legacy <db_name>.
Files migrated (handler count, all pass `cargo build`):
- chat.rs (3) — also rewires RagPipeline + EmbeddingStore to the
tenant DB's inner() so vector search is per-tenant
- dast.rs (5)
- findings.rs (5)
- graph.rs (7) — also rewires GraphStore inside trigger_build's
spawn to the tenant DB
- health.rs (1) — stats_overview migrated; public /health stays
un-scoped
- issues.rs (1)
- notifications.rs (5)
- pentest_handlers/session.rs (12) — both wizard + legacy paths,
plus pause/resume/stop/get_attack_chain/get_messages/
get_session_findings/lookup_repo. PentestOrchestrator now gets
the tenant DB clone in its spawn.
- pentest_handlers/export.rs (1) — fans out across sessions,
attack_chain_nodes, dast_findings, findings, sbom_entries,
graph_nodes from a single tenant_db acquisition
- pentest_handlers/stats.rs (1)
- pentest_handlers/stream.rs (1) — SSE handler verifies session
via the tenant DB before subscribing
- repos.rs (6)
- sbom.rs (5)
- scans.rs (1)
help_chat.rs has no DB queries and was skipped.
Test plan
- cargo fmt --all clean
- cargo clippy --workspace --exclude compliance-dashboard
-- -D warnings clean
- cargo test -p compliance-core --lib — 7 pass
- cargo test -p compliance-agent --lib — 228 pass
- cargo test -p compliance-agent --test tenant_isolation — 5 pass
(driver-level isolation still holds post-handler migration)
- cargo test -p compliance-agent --test tenant_status_middleware
— 6 pass
What's not yet migrated (PR-C / PR-D)
- scheduler.rs (6 sites), pipeline/orchestrator.rs (14),
pentest/orchestrator.rs (13), webhooks (gitea/github/gitlab),
trackers/jira.rs, pipeline/dedup.rs etc. — background paths
without a JWT-derived tenant context.
- agent.db is still in the ComplianceAgent struct as a transitional
handle for those paths. PR-D removes it once PR-C migrates the
background paths.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -7,9 +7,11 @@ use mongodb::bson::doc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use compliance_core::models::graph::{CodeEdge, CodeNode, GraphBuildRun, ImpactAnalysis};
|
||||
use compliance_core::tenant_ctx::TenantCtx;
|
||||
|
||||
use crate::agent::ComplianceAgent;
|
||||
|
||||
use super::dto::tenant_db;
|
||||
use super::{collect_cursor_async, ApiResponse};
|
||||
|
||||
type AgentExt = Extension<Arc<ComplianceAgent>>;
|
||||
@@ -36,9 +38,11 @@ fn default_search_limit() -> usize {
|
||||
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
|
||||
pub async fn get_graph(
|
||||
Extension(agent): AgentExt,
|
||||
tenant: TenantCtx,
|
||||
Path(repo_id): Path<String>,
|
||||
) -> Result<Json<ApiResponse<GraphData>>, StatusCode> {
|
||||
let db = &agent.db;
|
||||
let db = tenant_db(&agent, &tenant).await?;
|
||||
let db = &db;
|
||||
|
||||
// Get latest build
|
||||
let build: Option<GraphBuildRun> = db
|
||||
@@ -98,9 +102,11 @@ pub async fn get_graph(
|
||||
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
|
||||
pub async fn get_nodes(
|
||||
Extension(agent): AgentExt,
|
||||
tenant: TenantCtx,
|
||||
Path(repo_id): Path<String>,
|
||||
) -> Result<Json<ApiResponse<Vec<CodeNode>>>, StatusCode> {
|
||||
let db = &agent.db;
|
||||
let db = tenant_db(&agent, &tenant).await?;
|
||||
let db = &db;
|
||||
let filter = doc! { "repo_id": &repo_id };
|
||||
|
||||
let nodes: Vec<CodeNode> = match db.graph_nodes().find(filter).await {
|
||||
@@ -123,9 +129,11 @@ pub async fn get_nodes(
|
||||
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
|
||||
pub async fn get_communities(
|
||||
Extension(agent): AgentExt,
|
||||
tenant: TenantCtx,
|
||||
Path(repo_id): Path<String>,
|
||||
) -> Result<Json<ApiResponse<Vec<CommunityInfo>>>, StatusCode> {
|
||||
let db = &agent.db;
|
||||
let db = tenant_db(&agent, &tenant).await?;
|
||||
let db = &db;
|
||||
let filter = doc! { "repo_id": &repo_id };
|
||||
|
||||
let nodes: Vec<CodeNode> = match db.graph_nodes().find(filter).await {
|
||||
@@ -176,9 +184,11 @@ pub struct CommunityInfo {
|
||||
#[tracing::instrument(skip_all, fields(repo_id = %repo_id, finding_id = %finding_id))]
|
||||
pub async fn get_impact(
|
||||
Extension(agent): AgentExt,
|
||||
tenant: TenantCtx,
|
||||
Path((repo_id, finding_id)): Path<(String, String)>,
|
||||
) -> Result<Json<ApiResponse<Option<ImpactAnalysis>>>, StatusCode> {
|
||||
let db = &agent.db;
|
||||
let db = tenant_db(&agent, &tenant).await?;
|
||||
let db = &db;
|
||||
let filter = doc! { "repo_id": &repo_id, "finding_id": &finding_id };
|
||||
|
||||
let impact = db
|
||||
@@ -198,10 +208,12 @@ pub async fn get_impact(
|
||||
#[tracing::instrument(skip_all, fields(repo_id = %repo_id, query = %params.q))]
|
||||
pub async fn search_symbols(
|
||||
Extension(agent): AgentExt,
|
||||
tenant: TenantCtx,
|
||||
Path(repo_id): Path<String>,
|
||||
Query(params): Query<SearchParams>,
|
||||
) -> Result<Json<ApiResponse<Vec<CodeNode>>>, StatusCode> {
|
||||
let db = &agent.db;
|
||||
let db = tenant_db(&agent, &tenant).await?;
|
||||
let db = &db;
|
||||
|
||||
// Simple text search on qualified_name and name fields
|
||||
let filter = doc! {
|
||||
@@ -234,10 +246,12 @@ pub async fn search_symbols(
|
||||
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
|
||||
pub async fn get_file_content(
|
||||
Extension(agent): AgentExt,
|
||||
tenant: TenantCtx,
|
||||
Path(repo_id): Path<String>,
|
||||
Query(params): Query<FileContentParams>,
|
||||
) -> Result<Json<ApiResponse<FileContent>>, StatusCode> {
|
||||
let db = &agent.db;
|
||||
let db = tenant_db(&agent, &tenant).await?;
|
||||
let db = &db;
|
||||
|
||||
// Look up the repository to get repo name
|
||||
let repo = db
|
||||
@@ -296,12 +310,13 @@ pub struct FileContent {
|
||||
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
|
||||
pub async fn trigger_build(
|
||||
Extension(agent): AgentExt,
|
||||
tenant: TenantCtx,
|
||||
Path(repo_id): Path<String>,
|
||||
) -> Result<Json<serde_json::Value>, StatusCode> {
|
||||
let db = tenant_db(&agent, &tenant).await?;
|
||||
let agent_clone = (*agent).clone();
|
||||
tokio::spawn(async move {
|
||||
let repo = match agent_clone
|
||||
.db
|
||||
let repo = match db
|
||||
.repositories()
|
||||
.find_one(doc! { "_id": mongodb::bson::oid::ObjectId::parse_str(&repo_id).ok() })
|
||||
.await
|
||||
@@ -333,8 +348,7 @@ pub async fn trigger_build(
|
||||
|
||||
match engine.build_graph(&repo_path, &repo_id, &graph_build_id) {
|
||||
Ok((code_graph, build_run)) => {
|
||||
let store =
|
||||
compliance_graph::graph::persistence::GraphStore::new(agent_clone.db.inner());
|
||||
let store = compliance_graph::graph::persistence::GraphStore::new(db.inner());
|
||||
let _ = store.delete_repo_graph(&repo_id).await;
|
||||
let _ = store
|
||||
.store_graph(&build_run, &code_graph.nodes, &code_graph.edges)
|
||||
|
||||
Reference in New Issue
Block a user