e3aabe7d18
First slice of the M7.2 tenant-isolation work. Adds a `DatabasePool`
that hands out per-tenant `Database` handles physically scoped to
`<prefix>_<tenant_id>` Mongo databases. Isolation is at the driver,
not at "we hope we filter" — a handle for tenant A literally cannot
see tenant B's documents because it's connected to a different db.
What's in this PR
- DatabasePool::connect — pings the cluster, prepares per-tenant lazy
handles.
- DatabasePool::for_tenant(&TenantContext) — returns a Database scoped
to that tenant. ensure_indexes runs once per tenant per process via
a DashMap-backed marker; failure rolls the marker back so the next
request retries.
- tenant_db_name — `<prefix>_<sanitized_tenant_id>` if it fits in
Mongo's 63-byte db-name cap, else `<prefix>_<sha256-16hex>` fallback.
- Sanitizer rewrites the Mongo-disallowed chars (`/ \ . " $ <space>
NUL`) so any future tenant_id shape works.
- ComplianceAgent gains a `db_pool: DatabasePool` field next to the
existing `db: Database`. Handlers / pipelines / webhooks still use
`db` — they migrate to `db_pool.for_tenant(&ctx)` in M7.2-B/C and
`db` goes away in M7.2-D.
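The `tenant_db_name` rule above (sanitize, check the 63-byte cap, fall back to a hash) can be sketched with the standard library alone. Several details here are assumptions and do not come from the PR:

- Each disallowed character is replaced with `_`.
- The hash is computed over the raw, unsanitized tenant_id.
- The prefix is short enough that the hashed form always fits.

SHA-256 is inlined only to keep the sketch dependency-free. Production code would use a vetted crate such as `sha2`.

```rust
/// Characters MongoDB disallows in database names on Unix-like hosts (the PR's list).
const DISALLOWED: [char; 7] = ['/', '\\', '.', '"', '$', ' ', '\0'];
/// Maximum database-name length in bytes (the 63-byte cap cited in the PR).
const MAX_DB_NAME_BYTES: usize = 63;

/// Replace every disallowed character with `_` (assumed replacement char).
fn sanitize(tenant_id: &str) -> String {
    tenant_id
        .chars()
        .map(|c| if DISALLOWED.contains(&c) { '_' } else { c })
        .collect()
}

/// `<prefix>_<sanitized>` if it fits, else `<prefix>_<first 16 hex chars of SHA-256>`.
fn tenant_db_name(prefix: &str, tenant_id: &str) -> String {
    let candidate = format!("{}_{}", prefix, sanitize(tenant_id));
    if candidate.len() <= MAX_DB_NAME_BYTES {
        return candidate;
    }
    // Assumption: hash the raw id so the suffix is stable for a given tenant.
    let digest = sha256(tenant_id.as_bytes());
    let hex: String = digest[..8].iter().map(|b| format!("{:02x}", b)).collect();
    format!("{}_{}", prefix, hex)
}

/// Minimal SHA-256 (FIPS 180-4), std only. Illustration, not a vetted implementation.
fn sha256(data: &[u8]) -> [u8; 32] {
    const K: [u32; 64] = [
        0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, 0x3956c25b, 0x59f111f1, 0x923f82a4, 0xab1c5ed5,
        0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3, 0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174,
        0xe49b69c1, 0xefbe4786, 0x0fc19dc6, 0x240ca1cc, 0x2de92c6f, 0x4a7484aa, 0x5cb0a9dc, 0x76f988da,
        0x983e5152, 0xa831c66d, 0xb00327c8, 0xbf597fc7, 0xc6e00bf3, 0xd5a79147, 0x06ca6351, 0x14292967,
        0x27b70a85, 0x2e1b2138, 0x4d2c6dfc, 0x53380d13, 0x650a7354, 0x766a0abb, 0x81c2c92e, 0x92722c85,
        0xa2bfe8a1, 0xa81a664b, 0xc24b8b70, 0xc76c51a3, 0xd192e819, 0xd6990624, 0xf40e3585, 0x106aa070,
        0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5, 0x391c0cb3, 0x4ed8aa4a, 0x5b9cca4f, 0x682e6ff3,
        0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208, 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2,
    ];
    let mut h: [u32; 8] = [
        0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a, 0x510e527f, 0x9b05688c, 0x1f83d9ab, 0x5be0cd19,
    ];
    // Padding: 0x80, zeros up to 56 mod 64, then the bit length as a big-endian u64.
    let mut msg = data.to_vec();
    msg.push(0x80);
    while msg.len() % 64 != 56 {
        msg.push(0);
    }
    msg.extend_from_slice(&((data.len() as u64) * 8).to_be_bytes());

    for chunk in msg.chunks(64) {
        // Message schedule: 16 words from the block, 48 derived words.
        let mut w = [0u32; 64];
        for i in 0..16 {
            w[i] = u32::from_be_bytes([chunk[4 * i], chunk[4 * i + 1], chunk[4 * i + 2], chunk[4 * i + 3]]);
        }
        for i in 16..64 {
            let s0 = w[i - 15].rotate_right(7) ^ w[i - 15].rotate_right(18) ^ (w[i - 15] >> 3);
            let s1 = w[i - 2].rotate_right(17) ^ w[i - 2].rotate_right(19) ^ (w[i - 2] >> 10);
            w[i] = w[i - 16].wrapping_add(s0).wrapping_add(w[i - 7]).wrapping_add(s1);
        }
        // 64 compression rounds over the working variables.
        let [mut a, mut b, mut c, mut d, mut e, mut f, mut g, mut hh] = h;
        for i in 0..64 {
            let s1 = e.rotate_right(6) ^ e.rotate_right(11) ^ e.rotate_right(25);
            let ch = (e & f) ^ (!e & g);
            let t1 = hh.wrapping_add(s1).wrapping_add(ch).wrapping_add(K[i]).wrapping_add(w[i]);
            let s0 = a.rotate_right(2) ^ a.rotate_right(13) ^ a.rotate_right(22);
            let maj = (a & b) ^ (a & c) ^ (b & c);
            let t2 = s0.wrapping_add(maj);
            (hh, g, f, e) = (g, f, e, d.wrapping_add(t1));
            (d, c, b, a) = (c, b, a, t1.wrapping_add(t2));
        }
        for (x, y) in h.iter_mut().zip([a, b, c, d, e, f, g, hh]) {
            *x = x.wrapping_add(y);
        }
    }
    let mut out = [0u8; 32];
    for (i, word) in h.iter().enumerate() {
        out[4 * i..4 * i + 4].copy_from_slice(&word.to_be_bytes());
    }
    out
}
```

Because the replacement is lossy, this sketch maps both `acme.eu` and `acme/eu` to `<prefix>_acme_eu`. If tenant IDs can ever contain these characters, an escaping scheme or an always-hashed suffix would keep such tenants in separate databases.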
Test plan
- cargo fmt --all clean
- cargo clippy --workspace --exclude compliance-dashboard -- -D warnings
clean
- cargo test -p compliance-core --lib — 7 pass
- cargo test -p compliance-agent --lib — 228 pass
- cargo test -p compliance-agent --test tenant_isolation — 4 pass
against live mongo on 27017:
* pool_isolates_tenants_at_driver_level: writes for acme + globex,
reads through each tenant's handle; each sees exactly its own
data with no filter doc anywhere.
* for_tenant_is_idempotent_index_creation: the second and third
calls for the same tenant do not error.
* tenant_db_name_sanitizes_unsafe_characters
* tenant_db_name_falls_back_to_hash_when_too_long: a 100-byte
tenant_id collapses to a stable 16-hex-char (8-byte) SHA-256 suffix.
Why per-tenant DB vs `tenant_id` field + filter
- Driver-level isolation; impossible to forget the filter on one of
the 184 query call-sites in compliance-agent.
- Handlers don't change shape at migration — `agent.db.findings()`
becomes `db.findings()` after pulling `db` from
`agent.db_pool.for_tenant(&ctx)`.
- GDPR delete = `db.dropDatabase()`.
- On-prem deploy = the same code path, with one tenant.
- Trade-off accepted: index storage is duplicated per tenant; Mongo's
~thousand-db ceiling is well above the tens to hundreds of tenants
we're targeting.
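The handler flow above ties into the once-per-tenant index marker from the `for_tenant` bullet. Handlers ask the pool for a tenant handle, and that handle can only address its own db. The marker is set, `ensure_indexes` runs once per tenant per process, and the marker is rolled back on failure. Below is a synchronous, std-only sketch of that flow. Everything in it is hypothetical:

- `Pool` and `TenantDb` stand in for `DatabasePool` and `Database`.
- A `Mutex<HashSet<String>>` stands in for the DashMap marker.
- The async index build is a plain closure.
- The db name skips the sanitize/hash rule.

```rust
use std::collections::HashSet;
use std::sync::Mutex;

/// Hypothetical stand-in for a tenant-scoped `Database`: it only knows its db name.
#[derive(Debug, Clone, PartialEq)]
pub struct TenantDb {
    pub name: String,
}

/// Hypothetical stand-in for `DatabasePool`; `F` plays the role of `ensure_indexes`.
pub struct Pool<F: Fn(&str) -> Result<(), String>> {
    prefix: String,
    /// Db names whose indexes were ensured (or are being ensured) in this process.
    indexed: Mutex<HashSet<String>>,
    ensure_indexes: F,
}

impl<F: Fn(&str) -> Result<(), String>> Pool<F> {
    pub fn new(prefix: &str, ensure_indexes: F) -> Self {
        Pool {
            prefix: prefix.to_string(),
            indexed: Mutex::new(HashSet::new()),
            ensure_indexes,
        }
    }

    /// Return a handle scoped to `<prefix>_<tenant_id>`, ensuring indexes on first use.
    pub fn for_tenant(&self, tenant_id: &str) -> Result<TenantDb, String> {
        // Simplified naming; the real rule also sanitizes and may hash.
        let name = format!("{}_{}", self.prefix, tenant_id);
        // Set the marker first; `insert` returns true only for the first caller.
        // The guard is a temporary, so the lock is released before the build runs.
        let first = self.indexed.lock().expect("marker lock poisoned").insert(name.clone());
        if first {
            if let Err(err) = (self.ensure_indexes)(name.as_str()) {
                // Roll the marker back so the next request retries the build.
                self.indexed.lock().expect("marker lock poisoned").remove(&name);
                return Err(err);
            }
        }
        Ok(TenantDb { name })
    }
}
```

In this sketch the marker is set before the build runs. A concurrent `for_tenant` call for the same tenant therefore gets a handle without waiting for the in-flight build. The rollback only helps requests that arrive after a failure.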
Caveats
- Existing `agent.db` continues to point at the single legacy db.
Handlers / pipelines that use it are unscoped until M7.2-B/C
migrate them.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
156 lines
5.3 KiB
Rust
use std::sync::Arc;

use dashmap::DashMap;
use tokio::sync::{broadcast, watch, Semaphore};

use compliance_core::models::pentest::PentestEvent;
use compliance_core::AgentConfig;

use crate::database::{Database, DatabasePool};
use crate::llm::LlmClient;
use crate::pipeline::orchestrator::PipelineOrchestrator;

/// Default maximum concurrent pentest sessions.
const DEFAULT_MAX_CONCURRENT_SESSIONS: usize = 5;

#[derive(Clone)]
pub struct ComplianceAgent {
    pub config: AgentConfig,
    /// Transitional single-database handle. Used by handlers that have
    /// not yet been migrated to `db_pool.for_tenant(&ctx)` (M7.2-B/C).
    /// Will be removed once every call site is tenant-scoped (M7.2-D).
    pub db: Database,
    /// Per-tenant Mongo broker introduced in M7.2-A. Handlers should
    /// prefer this and obtain a tenant-scoped [`Database`] from it.
    pub db_pool: DatabasePool,
    pub llm: Arc<LlmClient>,
    pub http: reqwest::Client,
    /// Per-session broadcast senders for SSE streaming.
    pub session_streams: Arc<DashMap<String, broadcast::Sender<PentestEvent>>>,
    /// Per-session pause controls (true = paused).
    pub session_pause: Arc<DashMap<String, watch::Sender<bool>>>,
    /// Semaphore limiting concurrent pentest sessions.
    pub session_semaphore: Arc<Semaphore>,
}

impl ComplianceAgent {
    pub fn new(config: AgentConfig, db: Database, db_pool: DatabasePool) -> Self {
        let llm = Arc::new(LlmClient::new(
            config.litellm_url.clone(),
            config.litellm_api_key.clone(),
            config.litellm_model.clone(),
            config.litellm_embed_model.clone(),
        ));
        // If the builder fails, fall back to a default client (without these timeouts).
        let http = reqwest::Client::builder()
            .timeout(std::time::Duration::from_secs(30))
            .connect_timeout(std::time::Duration::from_secs(10))
            .build()
            .unwrap_or_default();
        Self {
            config,
            db,
            db_pool,
            llm,
            http,
            session_streams: Arc::new(DashMap::new()),
            session_pause: Arc::new(DashMap::new()),
            session_semaphore: Arc::new(Semaphore::new(DEFAULT_MAX_CONCURRENT_SESSIONS)),
        }
    }

    /// Run the scan pipeline for a repository.
    pub async fn run_scan(
        &self,
        repo_id: &str,
        trigger: compliance_core::models::ScanTrigger,
    ) -> Result<(), crate::error::AgentError> {
        let orchestrator = PipelineOrchestrator::new(
            self.config.clone(),
            self.db.clone(),
            self.llm.clone(),
            self.http.clone(),
        );
        orchestrator.run(repo_id, trigger).await
    }

    /// Run a PR review: scan the diff and post review comments.
    pub async fn run_pr_review(
        &self,
        repo_id: &str,
        pr_number: u64,
        base_sha: &str,
        head_sha: &str,
    ) -> Result<(), crate::error::AgentError> {
        let repo = self
            .db
            .repositories()
            .find_one(mongodb::bson::doc! {
                "_id": mongodb::bson::oid::ObjectId::parse_str(repo_id)
                    .map_err(|e| crate::error::AgentError::Other(e.to_string()))?
            })
            .await?
            .ok_or_else(|| {
                crate::error::AgentError::Other(format!("Repository {repo_id} not found"))
            })?;

        let orchestrator = PipelineOrchestrator::new(
            self.config.clone(),
            self.db.clone(),
            self.llm.clone(),
            self.http.clone(),
        );
        orchestrator
            .run_pr_review(&repo, repo_id, pr_number, base_sha, head_sha)
            .await
    }

    // ── Session stream management ──────────────────────────────────

    /// Register a broadcast sender for a session. Returns the sender.
    pub fn register_session_stream(&self, session_id: &str) -> broadcast::Sender<PentestEvent> {
        // The initial receiver is dropped; consumers attach via `subscribe_session`.
        let (tx, _) = broadcast::channel(256);
        self.session_streams
            .insert(session_id.to_string(), tx.clone());
        tx
    }

    /// Subscribe to a session's broadcast stream.
    pub fn subscribe_session(&self, session_id: &str) -> Option<broadcast::Receiver<PentestEvent>> {
        self.session_streams
            .get(session_id)
            .map(|tx| tx.subscribe())
    }

    // ── Session pause/resume management ────────────────────────────

    /// Register a pause control for a session. Returns the watch receiver.
    pub fn register_pause_control(&self, session_id: &str) -> watch::Receiver<bool> {
        let (tx, rx) = watch::channel(false);
        self.session_pause.insert(session_id.to_string(), tx);
        rx
    }

    /// Pause a session. Returns `false` if no pause control is registered
    /// or every receiver has been dropped.
    pub fn pause_session(&self, session_id: &str) -> bool {
        if let Some(tx) = self.session_pause.get(session_id) {
            tx.send(true).is_ok()
        } else {
            false
        }
    }

    /// Resume a session. Returns `false` under the same conditions as
    /// [`Self::pause_session`].
    pub fn resume_session(&self, session_id: &str) -> bool {
        if let Some(tx) = self.session_pause.get(session_id) {
            tx.send(false).is_ok()
        } else {
            false
        }
    }

    /// Clean up all per-session resources.
    pub fn cleanup_session(&self, session_id: &str) {
        self.session_streams.remove(session_id);
        self.session_pause.remove(session_id);
    }
}