diff --git a/compliance-agent/src/agent.rs b/compliance-agent/src/agent.rs index 61e73e6..bc66705 100644 --- a/compliance-agent/src/agent.rs +++ b/compliance-agent/src/agent.rs @@ -6,7 +6,7 @@ use tokio::sync::{broadcast, watch, Semaphore}; use compliance_core::models::pentest::PentestEvent; use compliance_core::AgentConfig; -use crate::database::Database; +use crate::database::{Database, DatabasePool}; use crate::llm::LlmClient; use crate::pipeline::orchestrator::PipelineOrchestrator; @@ -16,7 +16,13 @@ const DEFAULT_MAX_CONCURRENT_SESSIONS: usize = 5; #[derive(Clone)] pub struct ComplianceAgent { pub config: AgentConfig, + /// Transitional single-database handle. Used by handlers that have + /// not yet been migrated to `db_pool.for_tenant(&ctx)` (M7.2-B/C). + /// Will be removed once every call site is tenant-scoped (M7.2-D). pub db: Database, + /// Per-tenant Mongo broker introduced in M7.2-A. Handlers should + /// prefer this and obtain a tenant-scoped [`Database`] from it. + pub db_pool: DatabasePool, pub llm: Arc, pub http: reqwest::Client, /// Per-session broadcast senders for SSE streaming. @@ -28,7 +34,7 @@ pub struct ComplianceAgent { } impl ComplianceAgent { - pub fn new(config: AgentConfig, db: Database) -> Self { + pub fn new(config: AgentConfig, db: Database, db_pool: DatabasePool) -> Self { let llm = Arc::new(LlmClient::new( config.litellm_url.clone(), config.litellm_api_key.clone(), @@ -43,6 +49,7 @@ impl ComplianceAgent { Self { config, db, + db_pool, llm, http, session_streams: Arc::new(DashMap::new()), diff --git a/compliance-agent/src/database.rs b/compliance-agent/src/database.rs index 80d3b04..0d5e4f6 100644 --- a/compliance-agent/src/database.rs +++ b/compliance-agent/src/database.rs @@ -1,11 +1,151 @@ +use std::sync::Arc; + +use dashmap::DashMap; use mongodb::bson::doc; use mongodb::options::IndexOptions; use mongodb::{Client, Collection, IndexModel}; +use sha2::{Digest, Sha256}; use compliance_core::models::*; +use compliance_core::TenantContext; use crate::error::AgentError; +/// Mongo enforces a 63-byte cap on database names (older clusters: 64 +/// on Linux, 63 on Windows; we target the conservative limit). +const MAX_DB_NAME_LEN: usize = 63; + +/// Hex length of the SHA-256 truncation used for the hash fallback +/// tenant DB name (16 bytes → 32 hex chars). 16 bytes gives ~2^64 +/// birthday-collision resistance — at our 10s-100s tenant scale this +/// is effectively impossible to hit. +const HASH_HEX_LEN: usize = 32; + +/// Largest `db_prefix` that still guarantees the hash-fallback name +/// fits in the 63-byte cap: `prefix + "_" + 32 hex chars`. +const MAX_PREFIX_LEN: usize = MAX_DB_NAME_LEN - 1 - HASH_HEX_LEN; + +/// Per-tenant Mongo connection broker (M7.2 isolation model). +/// +/// Holds one [`Client`] and hands out [`Database`] handles physically +/// scoped to `_`. The driver is the isolation +/// boundary — a handle for tenant A cannot see tenant B's documents +/// because it is connected to a different database, not because of an +/// application-level filter. +/// +/// Index creation runs idempotently the first time each tenant is seen +/// in the process's lifetime. Mongo's `createIndex` is itself idempotent +/// by index name; the in-memory `ensured` set just skips the round-trip. +#[derive(Clone, Debug)] +pub struct DatabasePool { + client: Client, + db_prefix: String, + ensured: Arc>, +} + +impl DatabasePool { + /// Connect to the cluster and prepare to hand out tenant databases + /// named `_`. + /// + /// Validates `db_prefix.len() <= MAX_PREFIX_LEN` so the + /// hash-fallback path is provably within Mongo's 63-byte db-name + /// cap. Refuses to construct a pool that could ever produce an + /// over-long name. + pub async fn connect(uri: &str, db_prefix: &str) -> Result { + if db_prefix.len() > MAX_PREFIX_LEN { + return Err(AgentError::Other(format!( + "db_prefix '{db_prefix}' is {} chars; max is {MAX_PREFIX_LEN} so the \ + hash-fallback tenant DB name fits Mongo's {MAX_DB_NAME_LEN}-byte cap", + db_prefix.len() + ))); + } + let client = Client::with_uri_str(uri).await?; + client + .database("admin") + .run_command(doc! { "ping": 1 }) + .await?; + tracing::info!( + "MongoDB cluster reachable; per-tenant pool ready (db prefix '{db_prefix}')" + ); + Ok(Self { + client, + db_prefix: db_prefix.to_string(), + ensured: Arc::new(DashMap::new()), + }) + } + + /// Return a [`Database`] scoped to this tenant. Ensures indexes on + /// first call per tenant (per process). Cheap on the hot path — + /// subsequent calls skip the round-trip. + pub async fn for_tenant(&self, ctx: &TenantContext) -> Result { + let db_name = self.tenant_db_name(&ctx.tenant_id); + let db = Database::from_database(self.client.database(&db_name)); + // `DashMap::insert` returns the previous value; `None` means we + // were the first writer for this tenant_id and own the + // index-ensure work. + if self.ensured.insert(ctx.tenant_id.clone(), ()).is_none() { + if let Err(e) = db.ensure_indexes().await { + // Roll the marker back so the next request retries. + self.ensured.remove(&ctx.tenant_id); + return Err(e); + } + tracing::debug!( + tenant_id = %ctx.tenant_id, + db_name = %db_name, + "Indexes ensured for tenant database" + ); + } + Ok(db) + } + + /// Compute the Mongo database name for a tenant. Public for tests + /// and tenant offboarding (`pool.client().database(name).drop()`). + /// + /// Format: `_` if it fits the 63-byte + /// cap, else `_`. The + /// `db_prefix` length invariant established at [`Self::connect`] + /// guarantees the hash-fallback name always fits — no runtime + /// assertion needed. + /// + /// Collision resistance: the hash fallback is a 16-byte SHA-256 + /// truncation, which gives ~2^64 birthday-collision resistance. At + /// our 10s–100s tenant scale the probability of two tenant_ids + /// colliding is effectively zero. (8-byte truncation would have + /// been ~2^32 — too close for comfort on a regulated product.) + pub fn tenant_db_name(&self, tenant_id: &str) -> String { + let sanitized = sanitize_tenant_id(tenant_id); + let natural = format!("{}_{}", self.db_prefix, sanitized); + if natural.len() <= MAX_DB_NAME_LEN { + natural + } else { + let mut hasher = Sha256::new(); + hasher.update(tenant_id.as_bytes()); + let digest = hasher.finalize(); + let suffix = hex::encode(&digest[..HASH_HEX_LEN / 2]); + format!("{}_{}", self.db_prefix, suffix) + } + } + + /// Raw client handle. Reserved for cross-tenant admin flows that + /// must opt in explicitly (tenant listing, drop-on-offboard). + pub fn client(&self) -> &Client { + &self.client + } +} + +/// Mongo database names disallow `/`, `\`, `.`, `"`, `$`, ` `, and NUL. +/// breakpilot-dev tenant_ids are UUIDs so this is belt-and-braces, but +/// it lets the pool tolerate any future tenant_id shape without surprise. +fn sanitize_tenant_id(tenant_id: &str) -> String { + tenant_id + .chars() + .map(|c| match c { + '/' | '\\' | '.' | '"' | '$' | ' ' | '\0' => '_', + c => c, + }) + .collect() +} + #[derive(Clone, Debug)] pub struct Database { inner: mongodb::Database, @@ -20,6 +160,12 @@ impl Database { Ok(Self { inner: db }) } + /// Wrap an already-resolved Mongo database. Used by [`DatabasePool`] + /// to hand out tenant-scoped handles without a fresh client per tenant. + pub(crate) fn from_database(inner: mongodb::Database) -> Self { + Self { inner } + } + pub async fn ensure_indexes(&self) -> Result<(), AgentError> { // repositories: unique git_url self.repositories() diff --git a/compliance-agent/src/main.rs b/compliance-agent/src/main.rs index 64e862f..1270b22 100644 --- a/compliance-agent/src/main.rs +++ b/compliance-agent/src/main.rs @@ -28,7 +28,13 @@ async fn main() -> Result<(), Box> { let db = database::Database::connect(&config.mongodb_uri, &config.mongodb_database).await?; db.ensure_indexes().await?; - let agent = agent::ComplianceAgent::new(config.clone(), db.clone()); + // M7.2-A: per-tenant pool. Uses `mongodb_database` as the db-name + // prefix so tenant databases land as `_` next to + // the legacy single-tenant database. + let db_pool = + database::DatabasePool::connect(&config.mongodb_uri, &config.mongodb_database).await?; + + let agent = agent::ComplianceAgent::new(config.clone(), db.clone(), db_pool); tracing::info!("Starting scheduler..."); let scheduler_agent = agent.clone(); diff --git a/compliance-agent/tests/common/mod.rs b/compliance-agent/tests/common/mod.rs index 1cbe46f..756ef34 100644 --- a/compliance-agent/tests/common/mod.rs +++ b/compliance-agent/tests/common/mod.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use compliance_agent::agent::ComplianceAgent; use compliance_agent::api; -use compliance_agent::database::Database; +use compliance_agent::database::{Database, DatabasePool}; use compliance_core::AgentConfig; use secrecy::SecretString; @@ -33,6 +33,10 @@ impl TestServer { .expect("Failed to connect to MongoDB — is it running?"); db.ensure_indexes().await.expect("Failed to create indexes"); + let db_pool = DatabasePool::connect(&mongodb_uri, &db_name) + .await + .expect("Failed to build DatabasePool"); + let config = AgentConfig { mongodb_uri: mongodb_uri.clone(), mongodb_database: db_name.clone(), @@ -69,7 +73,7 @@ impl TestServer { pentest_imap_password: None, }; - let agent = ComplianceAgent::new(config, db); + let agent = ComplianceAgent::new(config, db, db_pool); // Build the router with the agent extension let app = api::routes::build_router() diff --git a/compliance-agent/tests/tenant_isolation.rs b/compliance-agent/tests/tenant_isolation.rs new file mode 100644 index 0000000..a12b57d --- /dev/null +++ b/compliance-agent/tests/tenant_isolation.rs @@ -0,0 +1,234 @@ +//! M7.2-A — `DatabasePool` isolation proof. +//! +//! Two `TenantContext`s, two databases, one client. Insert on A, query +//! on B → empty. Insert on B, query on A → only A's docs. Proves that +//! the per-tenant database split actually isolates at the driver level +//! and not at "we hope we filter." +//! +//! Requires MongoDB. Set `TEST_MONGODB_URI` to override the default +//! `mongodb://root:example@localhost:27017/?authSource=admin`. + +#![allow(clippy::expect_used, clippy::unwrap_used)] + +use compliance_agent::database::DatabasePool; +use compliance_core::models::TrackedRepository; +use compliance_core::{OrgRole, TenantContext, TenantStatus}; +use mongodb::bson::doc; + +fn ctx(tenant_id: &str, slug: &str) -> TenantContext { + TenantContext { + tenant_id: tenant_id.to_string(), + tenant_slug: slug.to_string(), + org_roles: vec![OrgRole::ItAdmin], + products: vec!["compliance-scanner".to_string()], + plan: "starter".to_string(), + status: TenantStatus::Active, + user_id: "u-1".to_string(), + user_name: None, + } +} + +fn fixture_repo(name: &str, git_url: &str) -> TrackedRepository { + TrackedRepository { + id: None, + name: name.to_string(), + git_url: git_url.to_string(), + default_branch: "main".to_string(), + local_path: None, + scan_schedule: None, + webhook_enabled: false, + webhook_secret: None, + tracker_type: None, + tracker_owner: None, + tracker_repo: None, + tracker_token: None, + auth_token: None, + auth_username: None, + last_scanned_commit: None, + findings_count: 0, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + } +} + +#[tokio::test] +async fn pool_isolates_tenants_at_driver_level() { + let uri = std::env::var("TEST_MONGODB_URI") + .unwrap_or_else(|_| "mongodb://root:example@localhost:27017/?authSource=admin".into()); + // Unique per run so parallel test invocations don't collide. Kept + // short because Mongo caps db names at 63 bytes (prefix + tenant_id). + let prefix = format!("m72a_{}", short_id()); + + let pool = DatabasePool::connect(&uri, &prefix) + .await + .expect("Failed to connect to MongoDB — is it running?"); + + let acme = ctx("00000000-0000-0000-0000-00000000acme", "acme"); + let globex = ctx("00000000-0000-0000-0000-0000globex000", "globex"); + + let acme_db = pool.for_tenant(&acme).await.expect("acme db"); + let globex_db = pool.for_tenant(&globex).await.expect("globex db"); + + // Write distinct repos into each tenant's database. + acme_db + .repositories() + .insert_one(fixture_repo("acme-app", "git@example.com:acme/app.git")) + .await + .expect("insert acme"); + globex_db + .repositories() + .insert_one(fixture_repo( + "globex-platform", + "git@example.com:globex/platform.git", + )) + .await + .expect("insert globex"); + + // The point of the whole exercise: acme can ONLY see acme's repo + // and globex can ONLY see globex's, with no filter doc anywhere + // because the isolation is at the database handle, not in the query. + let acme_seen = collect(&acme_db).await; + let globex_seen = collect(&globex_db).await; + + assert_eq!(acme_seen.len(), 1, "acme should see exactly its own repo"); + assert_eq!(acme_seen[0].name, "acme-app"); + assert_eq!( + globex_seen.len(), + 1, + "globex should see exactly its own repo" + ); + assert_eq!(globex_seen[0].name, "globex-platform"); + + // Sanity: the two databases really are different by name. + let acme_db_name = pool.tenant_db_name(&acme.tenant_id); + let globex_db_name = pool.tenant_db_name(&globex.tenant_id); + assert_ne!(acme_db_name, globex_db_name); + assert!(acme_db_name.starts_with(&prefix)); + + // Cleanup — drop both per-tenant databases. + pool.client() + .database(&acme_db_name) + .drop() + .await + .expect("drop acme"); + pool.client() + .database(&globex_db_name) + .drop() + .await + .expect("drop globex"); +} + +#[tokio::test] +async fn for_tenant_is_idempotent_index_creation() { + let uri = std::env::var("TEST_MONGODB_URI") + .unwrap_or_else(|_| "mongodb://root:example@localhost:27017/?authSource=admin".into()); + let prefix = format!("m72a_{}", short_id()); + let pool = DatabasePool::connect(&uri, &prefix).await.expect("connect"); + + let acme = ctx("00000000-0000-0000-0000-00000000acme", "acme"); + + // Second call must not fail (ensure_indexes already ran, in-memory + // marker is set, Mongo's createIndex is idempotent by name anyway). + let _ = pool.for_tenant(&acme).await.expect("first call"); + let _ = pool.for_tenant(&acme).await.expect("second call"); + let _ = pool.for_tenant(&acme).await.expect("third call"); + + // Cleanup + let db_name = pool.tenant_db_name(&acme.tenant_id); + pool.client().database(&db_name).drop().await.expect("drop"); +} + +#[tokio::test] +async fn tenant_db_name_sanitizes_unsafe_characters() { + let uri = std::env::var("TEST_MONGODB_URI") + .unwrap_or_else(|_| "mongodb://root:example@localhost:27017/?authSource=admin".into()); + let pool = DatabasePool::connect(&uri, "m72a_sanitize") + .await + .expect("connect"); + + // Mongo db names cannot contain `/ \ . " $ NUL`. The pool + // must rewrite these without exploding on connect. + let funky = "te/n.a\\nt$id\" with spaces"; + let name = pool.tenant_db_name(funky); + for c in ['/', '\\', '.', '"', '$', ' '] { + assert!( + !name.contains(c), + "sanitized db name still contains {c:?}: {name}" + ); + } +} + +#[tokio::test] +async fn tenant_db_name_falls_back_to_hash_when_too_long() { + let uri = std::env::var("TEST_MONGODB_URI") + .unwrap_or_else(|_| "mongodb://root:example@localhost:27017/?authSource=admin".into()); + let pool = DatabasePool::connect(&uri, "m72a_long") + .await + .expect("connect"); + + // 100-byte tenant_id would overflow the 63-byte db-name cap with + // any reasonable prefix. The pool must hash it down. + let huge = "x".repeat(100); + let name = pool.tenant_db_name(&huge); + assert!(name.len() <= 63, "hashed name should fit: {name}"); + assert!(name.starts_with("m72a_long_")); + // The hash suffix is 32 hex chars (16-byte SHA-256 truncation). + let suffix = name.trim_start_matches("m72a_long_"); + assert_eq!( + suffix.len(), + 32, + "expected 32-hex suffix (16-byte hash), got {suffix:?}" + ); + assert!(suffix.chars().all(|c| c.is_ascii_hexdigit())); + + // Stable: same input → same output. + assert_eq!(name, pool.tenant_db_name(&huge)); + + // Different inputs → different outputs (collision check on a tiny + // sample — full birthday-resistance is a proof not a test). + let huge2 = "y".repeat(100); + assert_ne!(pool.tenant_db_name(&huge), pool.tenant_db_name(&huge2)); +} + +#[tokio::test] +async fn connect_rejects_overlong_db_prefix() { + let uri = std::env::var("TEST_MONGODB_URI") + .unwrap_or_else(|_| "mongodb://root:example@localhost:27017/?authSource=admin".into()); + + // MAX_PREFIX_LEN is 30 (= 63 - 1 - 32). A 31-char prefix MUST be + // rejected at construction so the hash-fallback path can never + // produce an over-long db name at runtime. + let too_long = "a".repeat(31); + let err = DatabasePool::connect(&uri, &too_long).await.unwrap_err(); + let msg = format!("{err}"); + assert!( + msg.contains("max is 30") || msg.contains(&too_long), + "error should explain the cap: {msg}" + ); + + // Exactly 30 chars is the inclusive bound — must succeed. + let just_right = "a".repeat(30); + let _ = DatabasePool::connect(&uri, &just_right) + .await + .expect("30-char prefix should be accepted"); +} + +/// Short UUID slug for keeping test prefixes well under Mongo's 63-byte +/// db-name cap. +fn short_id() -> String { + uuid::Uuid::new_v4().simple().to_string()[..8].to_string() +} + +/// Drain a `repositories` find cursor on the given tenant database. +async fn collect(db: &compliance_agent::database::Database) -> Vec { + let mut cursor = db + .repositories() + .find(doc! {}) + .await + .expect("find repositories"); + let mut out = Vec::new(); + while cursor.advance().await.expect("advance") { + out.push(cursor.deserialize_current().expect("deserialize")); + } + out +}