Compare commits

..

7 Commits

Author SHA1 Message Date
Sharang Parnerkar dcec519565 fix(dashboard): attach Keycloak token on agent API calls
CI / Check (pull_request) Successful in 8m12s
CI / Detect Changes (pull_request) Has been skipped
CI / Deploy Agent (pull_request) Has been skipped
CI / Deploy Dashboard (pull_request) Has been skipped
CI / Deploy Docs (pull_request) Has been skipped
CI / Deploy MCP (pull_request) Has been skipped
Symptom: "unable to load repositories" — agent returns
"Missing authorization header" 401 on every protected endpoint
because the dashboard's server functions were calling reqwest::get
without an Authorization header. The Keycloak OIDC flow
(auth_login / auth_callback) was already wired up and storing the
access_token in tower-sessions, but the access_token was never
threaded into outbound calls.

Fix
- New `infrastructure::agent_client` module exposes:
  - `agent_request(method, path) -> RequestBuilder`
  - `agent_get(path) -> RequestBuilder` (sugar for GET)
  Both pull the session's access_token (via FullstackContext extract)
  and attach `Authorization: Bearer <token>`. When Keycloak is not
  configured the helper short-circuits — matching the dashboard's
  require_auth middleware which short-circuits in the same state.
- Migrated every #[server] function in:
  - chat, dast, findings, graph, issues, notifications, pentest,
    repositories, sbom, scans, stats
  - 57 call sites total, all replaced.
- Left as-is:
  - `infrastructure::server::webhook_proxy` — forwards to the agent's
    separate webhook server (port 3002), which is HMAC-authenticated,
    not JWT-authenticated.
  - `infrastructure::auth::auth_callback` — performs the KC token
    exchange itself; bearer auth would be circular.

Test plan
- cargo fmt --all clean
- cargo clippy -p compliance-dashboard --features server -- -D warnings
  clean
- cargo check -p compliance-dashboard --features server clean
- cargo check -p compliance-dashboard (web target) implicit via build
- Manual: after deploy, dashboard's repositories page loads without
  401; calls now carry Authorization: Bearer header to the agent.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-17 20:31:21 +02:00
Sharang Parnerkar 08c4ec4cff feat(m7.2-D): drop transitional agent.db, add admin helpers
CI / Check (pull_request) Successful in 9m27s
CI / Detect Changes (pull_request) Has been skipped
CI / Deploy Agent (pull_request) Has been skipped
CI / Deploy Dashboard (pull_request) Has been skipped
CI / Deploy Docs (pull_request) Has been skipped
CI / Deploy MCP (pull_request) Has been skipped
Final slice of M7.2. Removes the transitional single-database handle
that M7.2-A introduced alongside the pool, so the compliance-agent
now has a single source of truth for storage: every code path obtains
a tenant-scoped Database from `agent.db_pool.for_tenant_id(...)` or
`for_tenant(&ctx)`. There is no shared "default" database anywhere.

Changes
- ComplianceAgent: `db: Database` field removed. ComplianceAgent::new
  now takes only `(config, db_pool)`. Verified by an earlier grep
  during M7.2-C that no remaining call site reads `agent.db`.
- main.rs: stops constructing the legacy Database. Only the pool is
  built at startup.
- TestServer: same — drops Database::connect/ensure_indexes, builds
  only the pool. cleanup() now drops every `<db_name>_*` per-tenant
  database (no longer touches a bare `<db_name>`).
- DatabasePool::list_tenant_db_names() — lists Mongo databases
  matching the pool's prefix. For admin endpoints + scheduler tenant
  enumeration in a future M7.3 (this PR keeps SCHEDULER_TENANT_IDS
  env config — registry integration is a separate concern).
- DatabasePool::drop_tenant(&str) — idempotent tenant offboarding.
  Drops the per-tenant database and evicts the in-memory `ensured`
  marker so a later re-provision re-runs ensure_indexes.

Test plan
- cargo fmt --all clean
- cargo clippy --workspace --exclude compliance-dashboard
  -- -D warnings clean
- cargo test -p compliance-core --lib — 7 pass
- cargo test -p compliance-agent --lib — 228 pass
- cargo test -p compliance-agent --test tenant_isolation — 6 pass
  including new `admin_helpers_list_and_drop_tenant_dbs`
- cargo test -p compliance-agent --test tenant_status_middleware
  — 6 pass

M7.2 closeout state after this lands
- M7.1 (auth + status) — done
- M7.2-A (pool) — done
- M7.2-B (handlers) — done
- M7.2-C (background paths) — done
- M7.2-D (legacy db removed, admin helpers) — done (this PR)
- Future M7.3: scheduler pulls tenants from tenant-registry instead
  of SCHEDULER_TENANT_IDS env; cross-tenant admin HTTP endpoints
  built on list_tenant_db_names / drop_tenant.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-17 15:05:27 +02:00
Sharang Parnerkar 0f6dd1135e feat(m7.2-C): migrate background paths to per-tenant pool
CI / Check (pull_request) Successful in 10m33s
CI / Detect Changes (pull_request) Has been skipped
CI / Deploy Agent (pull_request) Has been skipped
CI / Deploy Dashboard (pull_request) Has been skipped
CI / Deploy Docs (pull_request) Has been skipped
CI / Deploy MCP (pull_request) Has been skipped
Closes the loop on M7.2 isolation for paths that don't have a JWT
context: scheduler, webhooks, and the agent's `run_scan` / `run_pr_review`
helpers all now take a `tenant_id` at the boundary and resolve to a
tenant-scoped `Database` via `db_pool.for_tenant_id(...)`. Internal
orchestrators (PipelineOrchestrator, PentestOrchestrator) and pipeline
helpers were already DB-agnostic — they take `db: Database` at
construction and don't care which tenant it points to.

Changes
- DatabasePool::for_tenant_id(&str) — same as for_tenant but accepts
  a bare tenant_id. Background paths don't have a full TenantContext.
  for_tenant is now a thin wrapper that delegates.
- agent.run_scan(tenant_id, repo_id, trigger) — pulls the tenant
  database before constructing the PipelineOrchestrator. Was:
  run_scan(repo_id, trigger) reading agent.db.
- agent.run_pr_review(tenant_id, repo_id, ...) — same shape.
- Webhook routes change: /webhook/{tenant_id}/{platform}/{repo_id}.
  Tenant is part of the URL path because webhooks arrive without a
  JWT — they're authenticated via per-repo HMAC, not the tenant gate.
  The dashboard surfaces the full per-tenant URL when the repo is
  registered. All three handlers (gitea, github, gitlab) updated.
- scheduler.rs — iterates tenants from $SCHEDULER_TENANT_IDS
  (comma-separated env), or DEV_TENANT_ID's `dev` default. Both
  scan_all_repos and monitor_cves now run once per configured
  tenant. M7.2-D will replace this static config with a pull from
  the tenant-registry.
- api/handlers/repos.rs::trigger_scan now passes tenant.0.tenant_id.

What's unchanged because it didn't need to change
- PipelineOrchestrator, PentestOrchestrator: take `db: Database` at
  construction — they're tenant-DB-agnostic by design. The caller
  picks the tenant DB.
- pipeline/{dedup,graph_build,issue_creation,sbom/mod}.rs,
  pentest/{context,report/html/*}.rs, trackers/jira.rs, llm/triage.rs:
  take `&Database` or `&mongodb::Database` as args, transitively
  tenant-scoped via the caller.

Test plan
- cargo fmt --all clean
- cargo clippy --workspace --exclude compliance-dashboard
  -- -D warnings clean
- cargo test -p compliance-core --lib — 7 pass
- cargo test -p compliance-agent --lib — 228 pass
- cargo test -p compliance-agent --test tenant_isolation — 5 pass
- cargo test -p compliance-agent --test tenant_status_middleware
  — 6 pass

What's left (PR-D)
- Drop the transitional agent.db field — no remaining call sites
  (verified by `grep -rn "agent\.db\b" compliance-agent/src`).
- main.rs / TestServer stop building the legacy Database; only the
  pool remains.
- Add cross-tenant admin helpers (list tenants, drop tenant DB) on
  the pool for offboarding flows.
- Pull tenants from the tenant-registry instead of an env var.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-17 15:00:37 +02:00
Sharang Parnerkar cdfbb62f9d feat(m7.2-B): migrate API handlers to per-tenant database pool
CI / Check (pull_request) Successful in 8m9s
CI / Detect Changes (pull_request) Has been skipped
CI / Deploy Agent (pull_request) Has been skipped
CI / Deploy Dashboard (pull_request) Has been skipped
CI / Deploy Docs (pull_request) Has been skipped
CI / Deploy MCP (pull_request) Has been skipped
Builds on PR M7.2-A. Every HTTP handler in compliance-agent/src/api/
now takes a TenantCtx extractor and pulls a tenant-scoped Database
from agent.db_pool.for_tenant(&ctx). The query bodies are unchanged —
`db.findings().find(doc! {...})` reads from the tenant's own physical
database, so the filter doc cannot leak data across tenants because
the wrong tenant's data is literally on a different db handle.

Changes
- New `dto::tenant_db(&agent, &tenant) -> Result<Database, StatusCode>`
  helper. Every migrated handler calls it at the top of the body
  instead of `let db = &agent.db;`. 500 on the rare pool failure;
  4xx auth failures are already handled by the M7.1 status gate.
- New `api::server::inject_dev_tenant` middleware mounted only when
  Keycloak is NOT configured. Synthesizes a TenantContext with
  tenant_id = $DEV_TENANT_ID (default `dev`) so `cargo run` against
  a bare Mongo + no KC still serves the API. Logged loudly as
  "DO NOT use in any environment with real customer data".
- Test harness: TestServer mounts inject_dev_tenant so existing E2E
  tests reach handlers; cleanup() now drops every <db_name>_*
  per-tenant database, not just the legacy <db_name>.

Files migrated (handler count, all pass `cargo build`):
- chat.rs (3) — also rewires RagPipeline + EmbeddingStore to the
  tenant DB's inner() so vector search is per-tenant
- dast.rs (5)
- findings.rs (5)
- graph.rs (7) — also rewires GraphStore inside trigger_build's
  spawn to the tenant DB
- health.rs (1) — stats_overview migrated; public /health stays
  un-scoped
- issues.rs (1)
- notifications.rs (5)
- pentest_handlers/session.rs (12) — both wizard + legacy paths,
  plus pause/resume/stop/get_attack_chain/get_messages/
  get_session_findings/lookup_repo. PentestOrchestrator now gets
  the tenant DB clone in its spawn.
- pentest_handlers/export.rs (1) — fans out across sessions,
  attack_chain_nodes, dast_findings, findings, sbom_entries,
  graph_nodes from a single tenant_db acquisition
- pentest_handlers/stats.rs (1)
- pentest_handlers/stream.rs (1) — SSE handler verifies session
  via the tenant DB before subscribing
- repos.rs (6)
- sbom.rs (5)
- scans.rs (1)

help_chat.rs has no DB queries and was skipped.

Test plan
- cargo fmt --all clean
- cargo clippy --workspace --exclude compliance-dashboard
  -- -D warnings clean
- cargo test -p compliance-core --lib — 7 pass
- cargo test -p compliance-agent --lib — 228 pass
- cargo test -p compliance-agent --test tenant_isolation — 5 pass
  (driver-level isolation still holds post-handler migration)
- cargo test -p compliance-agent --test tenant_status_middleware
  — 6 pass

What's not yet migrated (PR-C / PR-D)
- scheduler.rs (6 sites), pipeline/orchestrator.rs (14),
  pentest/orchestrator.rs (13), webhooks (gitea/github/gitlab),
  trackers/jira.rs, pipeline/dedup.rs etc. — background paths
  without a JWT-derived tenant context.
- agent.db is still in the ComplianceAgent struct as a transitional
  handle for those paths. PR-D removes it once PR-C migrates the
  background paths.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-17 13:28:33 +02:00
Sharang Parnerkar 003835764e fixup(m7.2-A): validate db_prefix at connect, bump hash to 16 bytes
CI / Check (pull_request) Successful in 8m29s
CI / Detect Changes (pull_request) Has been skipped
CI / Deploy Agent (pull_request) Has been skipped
CI / Deploy Dashboard (pull_request) Has been skipped
CI / Deploy Docs (pull_request) Has been skipped
CI / Deploy MCP (pull_request) Has been skipped
Addresses review feedback on the hash-fallback path.

The original `debug_assert!(hashed.len() <= MAX_DB_NAME_LEN)` was a
runtime hack that vanished in release builds. With an 8-byte hash
truncation (~2^32 birthday-collision resistance), two tenant_ids
hashing to the same suffix would silently share a database — no
panic, no rollback, just cross-tenant data leak. Not acceptable for
a regulated-industry product.

Changes:
- Bump hash truncation 8 → 16 bytes (32 hex chars). 2^64 birthday
  resistance — collision-impossible at our scale.
- Add MAX_PREFIX_LEN (= 30) and validate db_prefix.len() at
  `DatabasePool::connect`. The runtime hash-fallback arithmetic is
  now provably within Mongo's 63-byte cap; drop the debug_assert!.
- New test `connect_rejects_overlong_db_prefix` exercises the
  inclusive bound (30 passes, 31 fails).
- Existing hash-fallback test now asserts a 32-char hex suffix +
  basic distinctness for two different inputs.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-17 13:16:46 +02:00
Sharang Parnerkar e3aabe7d18 feat(m7.2-A): introduce per-tenant DatabasePool
CI / Check (pull_request) Successful in 8m40s
CI / Detect Changes (pull_request) Has been skipped
CI / Deploy Agent (pull_request) Has been skipped
CI / Deploy Dashboard (pull_request) Has been skipped
CI / Deploy Docs (pull_request) Has been skipped
CI / Deploy MCP (pull_request) Has been skipped
First slice of the M7.2 tenant-isolation work. Adds a `DatabasePool`
that hands out per-tenant `Database` handles physically scoped to
`<prefix>_<tenant_id>` Mongo databases. Isolation is at the driver,
not at "we hope we filter" — a handle for tenant A literally cannot
see tenant B's documents because it's connected to a different db.

What's in this PR
- DatabasePool::connect — pings the cluster, prepares per-tenant lazy
  handles.
- DatabasePool::for_tenant(&TenantContext) — returns a Database scoped
  to that tenant. ensure_indexes runs once per tenant per process via
  a DashMap-backed marker; failure rolls the marker back so the next
  request retries.
- tenant_db_name — `<prefix>_<sanitized_tenant_id>` if it fits in
  Mongo's 63-byte db-name cap, else `<prefix>_<sha256-16hex>` fallback.
- Sanitizer rewrites the Mongo-disallowed chars (`/ \ . " $ <space>
  NUL`) so any future tenant_id shape works.
- ComplianceAgent gains a `db_pool: DatabasePool` field next to the
  existing `db: Database`. Handlers / pipelines / webhooks still use
  `db` — they migrate to `db_pool.for_tenant(&ctx)` in M7.2-B/C and
  `db` goes away in M7.2-D.

Test plan
- cargo fmt --all clean
- cargo clippy --workspace --exclude compliance-dashboard -- -D warnings
  clean
- cargo test -p compliance-core --lib — 7 pass
- cargo test -p compliance-agent --lib — 228 pass
- cargo test -p compliance-agent --test tenant_isolation — 4 pass
  against live mongo on 27017:
    * pool_isolates_tenants_at_driver_level — writes for acme + globex,
      reads through each tenant's handle; each sees exactly its own
      data with no filter doc anywhere.
    * for_tenant_is_idempotent_index_creation — second + third call
      for the same tenant do not error.
    * tenant_db_name_sanitizes_unsafe_characters
    * tenant_db_name_falls_back_to_hash_when_too_long — 100-byte
      tenant_id collapses to a stable 8-byte hex suffix.

Why per-tenant DB vs `tenant_id` field + filter
- Driver-level isolation; impossible to forget the filter on one of
  the 184 query call-sites in compliance-agent.
- Handlers don't change shape at migration — `agent.db.findings()`
  becomes `db.findings()` after pulling `db` from
  `agent.db_pool.for_tenant(&ctx)`.
- GDPR delete = `db.dropDatabase()`.
- On-prem deploy = the same code path, with one tenant.
- Trade-off accepted: index storage duplicated per tenant; Mongo's
  ~thousand-db ceiling is way above the 10s-100s tenants we're
  targeting.

Caveats
- Existing `agent.db` continues to point at the single legacy db.
  Handlers / pipelines that use it are unscoped until M7.2-B/C
  migrate them.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-17 11:58:24 +02:00
sharang 183234f9af feat(m7.1): wire compliance-agent to compliance-core auth + status gate (#85)
CI / Check (push) Has been skipped
CI / Detect Changes (push) Successful in 5s
CI / Deploy Agent (push) Successful in 8m38s
CI / Deploy Dashboard (push) Successful in 7m30s
CI / Deploy Docs (push) Has been skipped
CI / Deploy MCP (push) Successful in 1m55s
2026-06-17 09:36:52 +00:00
39 changed files with 1422 additions and 751 deletions
+16 -18
View File
@@ -6,7 +6,7 @@ use tokio::sync::{broadcast, watch, Semaphore};
use compliance_core::models::pentest::PentestEvent; use compliance_core::models::pentest::PentestEvent;
use compliance_core::AgentConfig; use compliance_core::AgentConfig;
use crate::database::Database; use crate::database::DatabasePool;
use crate::llm::LlmClient; use crate::llm::LlmClient;
use crate::pipeline::orchestrator::PipelineOrchestrator; use crate::pipeline::orchestrator::PipelineOrchestrator;
@@ -16,7 +16,10 @@ const DEFAULT_MAX_CONCURRENT_SESSIONS: usize = 5;
#[derive(Clone)] #[derive(Clone)]
pub struct ComplianceAgent { pub struct ComplianceAgent {
pub config: AgentConfig, pub config: AgentConfig,
pub db: Database, /// Per-tenant Mongo broker. Every code path must obtain a
/// tenant-scoped [`crate::database::Database`] from this pool —
/// there is no single shared database any more.
pub db_pool: DatabasePool,
pub llm: Arc<LlmClient>, pub llm: Arc<LlmClient>,
pub http: reqwest::Client, pub http: reqwest::Client,
/// Per-session broadcast senders for SSE streaming. /// Per-session broadcast senders for SSE streaming.
@@ -28,7 +31,7 @@ pub struct ComplianceAgent {
} }
impl ComplianceAgent { impl ComplianceAgent {
pub fn new(config: AgentConfig, db: Database) -> Self { pub fn new(config: AgentConfig, db_pool: DatabasePool) -> Self {
let llm = Arc::new(LlmClient::new( let llm = Arc::new(LlmClient::new(
config.litellm_url.clone(), config.litellm_url.clone(),
config.litellm_api_key.clone(), config.litellm_api_key.clone(),
@@ -42,7 +45,7 @@ impl ComplianceAgent {
.unwrap_or_default(); .unwrap_or_default();
Self { Self {
config, config,
db, db_pool,
llm, llm,
http, http,
session_streams: Arc::new(DashMap::new()), session_streams: Arc::new(DashMap::new()),
@@ -53,28 +56,27 @@ impl ComplianceAgent {
pub async fn run_scan( pub async fn run_scan(
&self, &self,
tenant_id: &str,
repo_id: &str, repo_id: &str,
trigger: compliance_core::models::ScanTrigger, trigger: compliance_core::models::ScanTrigger,
) -> Result<(), crate::error::AgentError> { ) -> Result<(), crate::error::AgentError> {
let orchestrator = PipelineOrchestrator::new( let db = self.db_pool.for_tenant_id(tenant_id).await?;
self.config.clone(), let orchestrator =
self.db.clone(), PipelineOrchestrator::new(self.config.clone(), db, self.llm.clone(), self.http.clone());
self.llm.clone(),
self.http.clone(),
);
orchestrator.run(repo_id, trigger).await orchestrator.run(repo_id, trigger).await
} }
/// Run a PR review: scan the diff and post review comments. /// Run a PR review: scan the diff and post review comments.
pub async fn run_pr_review( pub async fn run_pr_review(
&self, &self,
tenant_id: &str,
repo_id: &str, repo_id: &str,
pr_number: u64, pr_number: u64,
base_sha: &str, base_sha: &str,
head_sha: &str, head_sha: &str,
) -> Result<(), crate::error::AgentError> { ) -> Result<(), crate::error::AgentError> {
let repo = self let db = self.db_pool.for_tenant_id(tenant_id).await?;
.db let repo = db
.repositories() .repositories()
.find_one(mongodb::bson::doc! { .find_one(mongodb::bson::doc! {
"_id": mongodb::bson::oid::ObjectId::parse_str(repo_id) "_id": mongodb::bson::oid::ObjectId::parse_str(repo_id)
@@ -85,12 +87,8 @@ impl ComplianceAgent {
crate::error::AgentError::Other(format!("Repository {repo_id} not found")) crate::error::AgentError::Other(format!("Repository {repo_id} not found"))
})?; })?;
let orchestrator = PipelineOrchestrator::new( let orchestrator =
self.config.clone(), PipelineOrchestrator::new(self.config.clone(), db, self.llm.clone(), self.http.clone());
self.db.clone(),
self.llm.clone(),
self.http.clone(),
);
orchestrator orchestrator
.run_pr_review(&repo, repo_id, pr_number, base_sha, head_sha) .run_pr_review(&repo, repo_id, pr_number, base_sha, head_sha)
.await .await
+30 -26
View File
@@ -7,11 +7,13 @@ use mongodb::bson::doc;
use compliance_core::models::chat::{ChatRequest, ChatResponse, SourceReference}; use compliance_core::models::chat::{ChatRequest, ChatResponse, SourceReference};
use compliance_core::models::embedding::EmbeddingBuildRun; use compliance_core::models::embedding::EmbeddingBuildRun;
use compliance_core::tenant_ctx::TenantCtx;
use compliance_graph::graph::embedding_store::EmbeddingStore; use compliance_graph::graph::embedding_store::EmbeddingStore;
use crate::agent::ComplianceAgent; use crate::agent::ComplianceAgent;
use crate::rag::pipeline::RagPipeline; use crate::rag::pipeline::RagPipeline;
use super::dto::tenant_db;
use super::ApiResponse; use super::ApiResponse;
type AgentExt = Extension<Arc<ComplianceAgent>>; type AgentExt = Extension<Arc<ComplianceAgent>>;
@@ -20,10 +22,12 @@ type AgentExt = Extension<Arc<ComplianceAgent>>;
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))] #[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn chat( pub async fn chat(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>, Path(repo_id): Path<String>,
Json(req): Json<ChatRequest>, Json(req): Json<ChatRequest>,
) -> Result<Json<ApiResponse<ChatResponse>>, StatusCode> { ) -> Result<Json<ApiResponse<ChatResponse>>, StatusCode> {
let pipeline = RagPipeline::new(agent.llm.clone(), agent.db.inner()); let db = tenant_db(&agent, &tenant).await?;
let pipeline = RagPipeline::new(agent.llm.clone(), db.inner());
// Step 1: Embed the user's message // Step 1: Embed the user's message
let query_vectors = agent let query_vectors = agent
@@ -133,12 +137,15 @@ pub async fn chat(
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))] #[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn build_embeddings( pub async fn build_embeddings(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>, Path(repo_id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> { ) -> Result<Json<serde_json::Value>, StatusCode> {
// Resolve the tenant DB up front so we can move it into the spawn;
// the JWT/dev context isn't available inside detached tasks.
let db = tenant_db(&agent, &tenant).await?;
let agent_clone = (*agent).clone(); let agent_clone = (*agent).clone();
tokio::spawn(async move { tokio::spawn(async move {
let repo = match agent_clone let repo = match db
.db
.repositories() .repositories()
.find_one(doc! { "_id": mongodb::bson::oid::ObjectId::parse_str(&repo_id).ok() }) .find_one(doc! { "_id": mongodb::bson::oid::ObjectId::parse_str(&repo_id).ok() })
.await .await
@@ -151,8 +158,7 @@ pub async fn build_embeddings(
}; };
// Get latest graph build // Get latest graph build
let build = match agent_clone let build = match db
.db
.graph_builds() .graph_builds()
.find_one(doc! { "repo_id": &repo_id }) .find_one(doc! { "repo_id": &repo_id })
.sort(doc! { "started_at": -1 }) .sort(doc! { "started_at": -1 })
@@ -171,26 +177,22 @@ pub async fn build_embeddings(
.unwrap_or_else(|| "unknown".to_string()); .unwrap_or_else(|| "unknown".to_string());
// Get nodes // Get nodes
let nodes: Vec<compliance_core::models::graph::CodeNode> = match agent_clone let nodes: Vec<compliance_core::models::graph::CodeNode> =
.db match db.graph_nodes().find(doc! { "repo_id": &repo_id }).await {
.graph_nodes() Ok(cursor) => {
.find(doc! { "repo_id": &repo_id }) use futures_util::StreamExt;
.await let mut items = Vec::new();
{ let mut cursor = cursor;
Ok(cursor) => { while let Some(Ok(item)) = cursor.next().await {
use futures_util::StreamExt; items.push(item);
let mut items = Vec::new(); }
let mut cursor = cursor; items
while let Some(Ok(item)) = cursor.next().await {
items.push(item);
} }
items Err(e) => {
} tracing::error!("[{repo_id}] Failed to fetch nodes: {e}");
Err(e) => { return;
tracing::error!("[{repo_id}] Failed to fetch nodes: {e}"); }
return; };
}
};
let creds = crate::pipeline::git::RepoCredentials { let creds = crate::pipeline::git::RepoCredentials {
ssh_key_path: Some(agent_clone.config.ssh_key_path.clone()), ssh_key_path: Some(agent_clone.config.ssh_key_path.clone()),
@@ -207,7 +209,7 @@ pub async fn build_embeddings(
} }
}; };
let pipeline = RagPipeline::new(agent_clone.llm.clone(), agent_clone.db.inner()); let pipeline = RagPipeline::new(agent_clone.llm.clone(), db.inner());
match pipeline match pipeline
.build_embeddings(&repo_id, &repo_path, &graph_build_id, &nodes) .build_embeddings(&repo_id, &repo_path, &graph_build_id, &nodes)
.await .await
@@ -234,9 +236,11 @@ pub async fn build_embeddings(
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))] #[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn embedding_status( pub async fn embedding_status(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>, Path(repo_id): Path<String>,
) -> Result<Json<ApiResponse<Option<EmbeddingBuildRun>>>, StatusCode> { ) -> Result<Json<ApiResponse<Option<EmbeddingBuildRun>>>, StatusCode> {
let store = EmbeddingStore::new(agent.db.inner()); let db = tenant_db(&agent, &tenant).await?;
let store = EmbeddingStore::new(db.inner());
let build = store.get_latest_build(&repo_id).await.map_err(|e| { let build = store.get_latest_build(&repo_id).await.map_err(|e| {
tracing::error!("Failed to get embedding status: {e}"); tracing::error!("Failed to get embedding status: {e}");
StatusCode::INTERNAL_SERVER_ERROR StatusCode::INTERNAL_SERVER_ERROR
+20 -11
View File
@@ -7,9 +7,11 @@ use mongodb::bson::doc;
use serde::Deserialize; use serde::Deserialize;
use compliance_core::models::dast::{DastFinding, DastScanRun, DastTarget, DastTargetType}; use compliance_core::models::dast::{DastFinding, DastScanRun, DastTarget, DastTargetType};
use compliance_core::tenant_ctx::TenantCtx;
use crate::agent::ComplianceAgent; use crate::agent::ComplianceAgent;
use super::dto::tenant_db;
use super::{collect_cursor_async, ApiResponse, PaginationParams}; use super::{collect_cursor_async, ApiResponse, PaginationParams};
type AgentExt = Extension<Arc<ComplianceAgent>>; type AgentExt = Extension<Arc<ComplianceAgent>>;
@@ -45,9 +47,11 @@ fn default_rate_limit() -> u32 {
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn list_targets( pub async fn list_targets(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<PaginationParams>, Query(params): Query<PaginationParams>,
) -> Result<Json<ApiResponse<Vec<DastTarget>>>, StatusCode> { ) -> Result<Json<ApiResponse<Vec<DastTarget>>>, StatusCode> {
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let skip = (params.page.saturating_sub(1)) * params.limit as u64; let skip = (params.page.saturating_sub(1)) * params.limit as u64;
let total = db let total = db
.dast_targets() .dast_targets()
@@ -80,6 +84,7 @@ pub async fn list_targets(
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn add_target( pub async fn add_target(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Json(req): Json<AddTargetRequest>, Json(req): Json<AddTargetRequest>,
) -> Result<Json<ApiResponse<DastTarget>>, StatusCode> { ) -> Result<Json<ApiResponse<DastTarget>>, StatusCode> {
let mut target = DastTarget::new(req.name, req.base_url, req.target_type); let mut target = DastTarget::new(req.name, req.base_url, req.target_type);
@@ -89,9 +94,8 @@ pub async fn add_target(
target.rate_limit = req.rate_limit; target.rate_limit = req.rate_limit;
target.allow_destructive = req.allow_destructive; target.allow_destructive = req.allow_destructive;
agent let db = tenant_db(&agent, &tenant).await?;
.db db.dast_targets()
.dast_targets()
.insert_one(&target) .insert_one(&target)
.await .await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
@@ -107,19 +111,19 @@ pub async fn add_target(
#[tracing::instrument(skip_all, fields(target_id = %id))] #[tracing::instrument(skip_all, fields(target_id = %id))]
pub async fn trigger_scan( pub async fn trigger_scan(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>, Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> { ) -> Result<Json<serde_json::Value>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?; let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let target = agent let target = db
.db
.dast_targets() .dast_targets()
.find_one(doc! { "_id": oid }) .find_one(doc! { "_id": oid })
.await .await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.ok_or(StatusCode::NOT_FOUND)?; .ok_or(StatusCode::NOT_FOUND)?;
let db = agent.db.clone();
tokio::spawn(async move { tokio::spawn(async move {
let orchestrator = compliance_dast::DastOrchestrator::new(100); let orchestrator = compliance_dast::DastOrchestrator::new(100);
match orchestrator.run_scan(&target, Vec::new()).await { match orchestrator.run_scan(&target, Vec::new()).await {
@@ -147,9 +151,11 @@ pub async fn trigger_scan(
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn list_scan_runs( pub async fn list_scan_runs(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<PaginationParams>, Query(params): Query<PaginationParams>,
) -> Result<Json<ApiResponse<Vec<DastScanRun>>>, StatusCode> { ) -> Result<Json<ApiResponse<Vec<DastScanRun>>>, StatusCode> {
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let skip = (params.page.saturating_sub(1)) * params.limit as u64; let skip = (params.page.saturating_sub(1)) * params.limit as u64;
let total = db let total = db
.dast_scan_runs() .dast_scan_runs()
@@ -183,9 +189,11 @@ pub async fn list_scan_runs(
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn list_findings( pub async fn list_findings(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<PaginationParams>, Query(params): Query<PaginationParams>,
) -> Result<Json<ApiResponse<Vec<DastFinding>>>, StatusCode> { ) -> Result<Json<ApiResponse<Vec<DastFinding>>>, StatusCode> {
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let skip = (params.page.saturating_sub(1)) * params.limit as u64; let skip = (params.page.saturating_sub(1)) * params.limit as u64;
let total = db let total = db
.dast_findings() .dast_findings()
@@ -219,12 +227,13 @@ pub async fn list_findings(
#[tracing::instrument(skip_all, fields(finding_id = %id))] #[tracing::instrument(skip_all, fields(finding_id = %id))]
pub async fn get_finding( pub async fn get_finding(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>, Path(id): Path<String>,
) -> Result<Json<ApiResponse<DastFinding>>, StatusCode> { ) -> Result<Json<ApiResponse<DastFinding>>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?; let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let finding = agent let finding = db
.db
.dast_findings() .dast_findings()
.find_one(doc! { "_id": oid }) .find_one(doc! { "_id": oid })
.await .await
+21
View File
@@ -180,6 +180,27 @@ pub struct SbomVersionDiff {
pub(crate) type AgentExt = axum::extract::Extension<std::sync::Arc<crate::agent::ComplianceAgent>>; pub(crate) type AgentExt = axum::extract::Extension<std::sync::Arc<crate::agent::ComplianceAgent>>;
pub(crate) type ApiResult<T> = Result<axum::Json<ApiResponse<T>>, axum::http::StatusCode>; pub(crate) type ApiResult<T> = Result<axum::Json<ApiResponse<T>>, axum::http::StatusCode>;
/// Resolve a tenant-scoped [`Database`] from the request's
/// [`TenantContext`] (inserted by the M7.1 JWT middleware, or by the
/// dev fallback in unsecured environments). The pool ensures the
/// tenant's indexes idempotently.
///
/// Returns 500 on the rare path where Mongo refuses the database
/// handle — the M7.1 auth/status middleware already rejects every
/// other failure mode with 4xx before we get here.
pub(crate) async fn tenant_db(
agent: &crate::agent::ComplianceAgent,
tenant: &compliance_core::tenant_ctx::TenantCtx,
) -> Result<crate::database::Database, axum::http::StatusCode> {
agent.db_pool.for_tenant(&tenant.0).await.map_err(|e| {
tracing::error!(
tenant_id = %tenant.0.tenant_id,
"Failed to acquire tenant database: {e}"
);
axum::http::StatusCode::INTERNAL_SERVER_ERROR
})
}
pub(crate) async fn collect_cursor_async<T: serde::de::DeserializeOwned + Unpin + Send>( pub(crate) async fn collect_cursor_async<T: serde::de::DeserializeOwned + Unpin + Send>(
mut cursor: mongodb::Cursor<T>, mut cursor: mongodb::Cursor<T>,
) -> Vec<T> { ) -> Vec<T> {
+16 -11
View File
@@ -5,13 +5,16 @@ use mongodb::bson::doc;
use super::dto::*; use super::dto::*;
use compliance_core::models::Finding; use compliance_core::models::Finding;
use compliance_core::tenant_ctx::TenantCtx;
#[tracing::instrument(skip_all, fields(repo_id = ?filter.repo_id, severity = ?filter.severity, scan_type = ?filter.scan_type))] #[tracing::instrument(skip_all, fields(repo_id = ?filter.repo_id, severity = ?filter.severity, scan_type = ?filter.scan_type))]
pub async fn list_findings( pub async fn list_findings(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Query(filter): Query<FindingsFilter>, Query(filter): Query<FindingsFilter>,
) -> ApiResult<Vec<Finding>> { ) -> ApiResult<Vec<Finding>> {
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let mut query = doc! {}; let mut query = doc! {};
if let Some(repo_id) = &filter.repo_id { if let Some(repo_id) = &filter.repo_id {
query.insert("repo_id", repo_id); query.insert("repo_id", repo_id);
@@ -81,11 +84,12 @@ pub async fn list_findings(
#[tracing::instrument(skip_all, fields(finding_id = %id))] #[tracing::instrument(skip_all, fields(finding_id = %id))]
pub async fn get_finding( pub async fn get_finding(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>, Path(id): Path<String>,
) -> Result<Json<ApiResponse<Finding>>, StatusCode> { ) -> Result<Json<ApiResponse<Finding>>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?; let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let finding = agent let db = tenant_db(&agent, &tenant).await?;
.db let finding = db
.findings() .findings()
.find_one(doc! { "_id": oid }) .find_one(doc! { "_id": oid })
.await .await
@@ -102,14 +106,14 @@ pub async fn get_finding(
#[tracing::instrument(skip_all, fields(finding_id = %id))] #[tracing::instrument(skip_all, fields(finding_id = %id))]
pub async fn update_finding_status( pub async fn update_finding_status(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>, Path(id): Path<String>,
Json(req): Json<UpdateStatusRequest>, Json(req): Json<UpdateStatusRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> { ) -> Result<Json<serde_json::Value>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?; let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
agent db.findings()
.db
.findings()
.update_one( .update_one(
doc! { "_id": oid }, doc! { "_id": oid },
doc! { "$set": { "status": &req.status, "updated_at": mongodb::bson::DateTime::now() } }, doc! { "$set": { "status": &req.status, "updated_at": mongodb::bson::DateTime::now() } },
@@ -123,6 +127,7 @@ pub async fn update_finding_status(
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn bulk_update_finding_status( pub async fn bulk_update_finding_status(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Json(req): Json<BulkUpdateStatusRequest>, Json(req): Json<BulkUpdateStatusRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> { ) -> Result<Json<serde_json::Value>, StatusCode> {
let oids: Vec<mongodb::bson::oid::ObjectId> = req let oids: Vec<mongodb::bson::oid::ObjectId> = req
@@ -135,8 +140,8 @@ pub async fn bulk_update_finding_status(
return Err(StatusCode::BAD_REQUEST); return Err(StatusCode::BAD_REQUEST);
} }
let result = agent let db = tenant_db(&agent, &tenant).await?;
.db let result = db
.findings() .findings()
.update_many( .update_many(
doc! { "_id": { "$in": oids } }, doc! { "_id": { "$in": oids } },
@@ -153,14 +158,14 @@ pub async fn bulk_update_finding_status(
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn update_finding_feedback( pub async fn update_finding_feedback(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>, Path(id): Path<String>,
Json(req): Json<UpdateFeedbackRequest>, Json(req): Json<UpdateFeedbackRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> { ) -> Result<Json<serde_json::Value>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?; let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
agent db.findings()
.db
.findings()
.update_one( .update_one(
doc! { "_id": oid }, doc! { "_id": oid },
doc! { "$set": { "developer_feedback": &req.feedback, "updated_at": mongodb::bson::DateTime::now() } }, doc! { "$set": { "developer_feedback": &req.feedback, "updated_at": mongodb::bson::DateTime::now() } },
+24 -10
View File
@@ -7,9 +7,11 @@ use mongodb::bson::doc;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use compliance_core::models::graph::{CodeEdge, CodeNode, GraphBuildRun, ImpactAnalysis}; use compliance_core::models::graph::{CodeEdge, CodeNode, GraphBuildRun, ImpactAnalysis};
use compliance_core::tenant_ctx::TenantCtx;
use crate::agent::ComplianceAgent; use crate::agent::ComplianceAgent;
use super::dto::tenant_db;
use super::{collect_cursor_async, ApiResponse}; use super::{collect_cursor_async, ApiResponse};
type AgentExt = Extension<Arc<ComplianceAgent>>; type AgentExt = Extension<Arc<ComplianceAgent>>;
@@ -36,9 +38,11 @@ fn default_search_limit() -> usize {
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))] #[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn get_graph( pub async fn get_graph(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>, Path(repo_id): Path<String>,
) -> Result<Json<ApiResponse<GraphData>>, StatusCode> { ) -> Result<Json<ApiResponse<GraphData>>, StatusCode> {
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
// Get latest build // Get latest build
let build: Option<GraphBuildRun> = db let build: Option<GraphBuildRun> = db
@@ -98,9 +102,11 @@ pub async fn get_graph(
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))] #[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn get_nodes( pub async fn get_nodes(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>, Path(repo_id): Path<String>,
) -> Result<Json<ApiResponse<Vec<CodeNode>>>, StatusCode> { ) -> Result<Json<ApiResponse<Vec<CodeNode>>>, StatusCode> {
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let filter = doc! { "repo_id": &repo_id }; let filter = doc! { "repo_id": &repo_id };
let nodes: Vec<CodeNode> = match db.graph_nodes().find(filter).await { let nodes: Vec<CodeNode> = match db.graph_nodes().find(filter).await {
@@ -123,9 +129,11 @@ pub async fn get_nodes(
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))] #[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn get_communities( pub async fn get_communities(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>, Path(repo_id): Path<String>,
) -> Result<Json<ApiResponse<Vec<CommunityInfo>>>, StatusCode> { ) -> Result<Json<ApiResponse<Vec<CommunityInfo>>>, StatusCode> {
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let filter = doc! { "repo_id": &repo_id }; let filter = doc! { "repo_id": &repo_id };
let nodes: Vec<CodeNode> = match db.graph_nodes().find(filter).await { let nodes: Vec<CodeNode> = match db.graph_nodes().find(filter).await {
@@ -176,9 +184,11 @@ pub struct CommunityInfo {
#[tracing::instrument(skip_all, fields(repo_id = %repo_id, finding_id = %finding_id))] #[tracing::instrument(skip_all, fields(repo_id = %repo_id, finding_id = %finding_id))]
pub async fn get_impact( pub async fn get_impact(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path((repo_id, finding_id)): Path<(String, String)>, Path((repo_id, finding_id)): Path<(String, String)>,
) -> Result<Json<ApiResponse<Option<ImpactAnalysis>>>, StatusCode> { ) -> Result<Json<ApiResponse<Option<ImpactAnalysis>>>, StatusCode> {
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let filter = doc! { "repo_id": &repo_id, "finding_id": &finding_id }; let filter = doc! { "repo_id": &repo_id, "finding_id": &finding_id };
let impact = db let impact = db
@@ -198,10 +208,12 @@ pub async fn get_impact(
#[tracing::instrument(skip_all, fields(repo_id = %repo_id, query = %params.q))] #[tracing::instrument(skip_all, fields(repo_id = %repo_id, query = %params.q))]
pub async fn search_symbols( pub async fn search_symbols(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>, Path(repo_id): Path<String>,
Query(params): Query<SearchParams>, Query(params): Query<SearchParams>,
) -> Result<Json<ApiResponse<Vec<CodeNode>>>, StatusCode> { ) -> Result<Json<ApiResponse<Vec<CodeNode>>>, StatusCode> {
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
// Simple text search on qualified_name and name fields // Simple text search on qualified_name and name fields
let filter = doc! { let filter = doc! {
@@ -234,10 +246,12 @@ pub async fn search_symbols(
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))] #[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn get_file_content( pub async fn get_file_content(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>, Path(repo_id): Path<String>,
Query(params): Query<FileContentParams>, Query(params): Query<FileContentParams>,
) -> Result<Json<ApiResponse<FileContent>>, StatusCode> { ) -> Result<Json<ApiResponse<FileContent>>, StatusCode> {
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
// Look up the repository to get repo name // Look up the repository to get repo name
let repo = db let repo = db
@@ -296,12 +310,13 @@ pub struct FileContent {
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))] #[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn trigger_build( pub async fn trigger_build(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>, Path(repo_id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> { ) -> Result<Json<serde_json::Value>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let agent_clone = (*agent).clone(); let agent_clone = (*agent).clone();
tokio::spawn(async move { tokio::spawn(async move {
let repo = match agent_clone let repo = match db
.db
.repositories() .repositories()
.find_one(doc! { "_id": mongodb::bson::oid::ObjectId::parse_str(&repo_id).ok() }) .find_one(doc! { "_id": mongodb::bson::oid::ObjectId::parse_str(&repo_id).ok() })
.await .await
@@ -333,8 +348,7 @@ pub async fn trigger_build(
match engine.build_graph(&repo_path, &repo_id, &graph_build_id) { match engine.build_graph(&repo_path, &repo_id, &graph_build_id) {
Ok((code_graph, build_run)) => { Ok((code_graph, build_run)) => {
let store = let store = compliance_graph::graph::persistence::GraphStore::new(db.inner());
compliance_graph::graph::persistence::GraphStore::new(agent_clone.db.inner());
let _ = store.delete_repo_graph(&repo_id).await; let _ = store.delete_repo_graph(&repo_id).await;
let _ = store let _ = store
.store_graph(&build_run, &code_graph.nodes, &code_graph.edges) .store_graph(&build_run, &code_graph.nodes, &code_graph.edges)
+7 -2
View File
@@ -3,6 +3,7 @@ use mongodb::bson::doc;
use super::dto::*; use super::dto::*;
use compliance_core::models::ScanRun; use compliance_core::models::ScanRun;
use compliance_core::tenant_ctx::TenantCtx;
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn health() -> Json<serde_json::Value> { pub async fn health() -> Json<serde_json::Value> {
@@ -10,8 +11,12 @@ pub async fn health() -> Json<serde_json::Value> {
} }
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn stats_overview(axum::extract::Extension(agent): AgentExt) -> ApiResult<OverviewStats> { pub async fn stats_overview(
let db = &agent.db; axum::extract::Extension(agent): AgentExt,
tenant: TenantCtx,
) -> ApiResult<OverviewStats> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let total_repositories = db let total_repositories = db
.repositories() .repositories()
+4 -1
View File
@@ -4,13 +4,16 @@ use mongodb::bson::doc;
use super::dto::*; use super::dto::*;
use compliance_core::models::TrackerIssue; use compliance_core::models::TrackerIssue;
use compliance_core::tenant_ctx::TenantCtx;
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn list_issues( pub async fn list_issues(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<PaginationParams>, Query(params): Query<PaginationParams>,
) -> ApiResult<Vec<TrackerIssue>> { ) -> ApiResult<Vec<TrackerIssue>> {
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let skip = (params.page.saturating_sub(1)) * params.limit as u64; let skip = (params.page.saturating_sub(1)) * params.limit as u64;
let total = db let total = db
.tracker_issues() .tracker_issues()
@@ -5,15 +5,18 @@ use mongodb::bson::doc;
use serde::Deserialize; use serde::Deserialize;
use compliance_core::models::notification::CveNotification; use compliance_core::models::notification::CveNotification;
use compliance_core::tenant_ctx::TenantCtx;
use super::dto::{AgentExt, ApiResponse}; use super::dto::{tenant_db, AgentExt, ApiResponse};
/// GET /api/v1/notifications — List CVE notifications (newest first) /// GET /api/v1/notifications — List CVE notifications (newest first)
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn list_notifications( pub async fn list_notifications(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
axum::extract::Query(params): axum::extract::Query<NotificationFilter>, axum::extract::Query(params): axum::extract::Query<NotificationFilter>,
) -> Result<Json<ApiResponse<Vec<CveNotification>>>, StatusCode> { ) -> Result<Json<ApiResponse<Vec<CveNotification>>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let mut filter = doc! {}; let mut filter = doc! {};
// Filter by status (default: show new + read, exclude dismissed) // Filter by status (default: show new + read, exclude dismissed)
@@ -41,15 +44,13 @@ pub async fn list_notifications(
let limit = params.limit.unwrap_or(50).min(200); let limit = params.limit.unwrap_or(50).min(200);
let skip = (page - 1) * limit as u64; let skip = (page - 1) * limit as u64;
let total = agent let total = db
.db
.cve_notifications() .cve_notifications()
.count_documents(filter.clone()) .count_documents(filter.clone())
.await .await
.unwrap_or(0); .unwrap_or(0);
let notifications: Vec<CveNotification> = match agent let notifications: Vec<CveNotification> = match db
.db
.cve_notifications() .cve_notifications()
.find(filter) .find(filter)
.sort(doc! { "created_at": -1 }) .sort(doc! { "created_at": -1 })
@@ -83,9 +84,10 @@ pub async fn list_notifications(
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn notification_count( pub async fn notification_count(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
) -> Result<Json<serde_json::Value>, StatusCode> { ) -> Result<Json<serde_json::Value>, StatusCode> {
let count = agent let db = tenant_db(&agent, &tenant).await?;
.db let count = db
.cve_notifications() .cve_notifications()
.count_documents(doc! { "status": "new" }) .count_documents(doc! { "status": "new" })
.await .await
@@ -98,12 +100,13 @@ pub async fn notification_count(
#[tracing::instrument(skip_all, fields(id = %id))] #[tracing::instrument(skip_all, fields(id = %id))]
pub async fn mark_read( pub async fn mark_read(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
axum::extract::Path(id): axum::extract::Path<String>, axum::extract::Path(id): axum::extract::Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> { ) -> Result<Json<serde_json::Value>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?; let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let result = agent let result = db
.db
.cve_notifications() .cve_notifications()
.update_one( .update_one(
doc! { "_id": oid }, doc! { "_id": oid },
@@ -125,12 +128,13 @@ pub async fn mark_read(
#[tracing::instrument(skip_all, fields(id = %id))] #[tracing::instrument(skip_all, fields(id = %id))]
pub async fn dismiss_notification( pub async fn dismiss_notification(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
axum::extract::Path(id): axum::extract::Path<String>, axum::extract::Path(id): axum::extract::Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> { ) -> Result<Json<serde_json::Value>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?; let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let result = agent let result = db
.db
.cve_notifications() .cve_notifications()
.update_one( .update_one(
doc! { "_id": oid }, doc! { "_id": oid },
@@ -149,9 +153,10 @@ pub async fn dismiss_notification(
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn mark_all_read( pub async fn mark_all_read(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
) -> Result<Json<serde_json::Value>, StatusCode> { ) -> Result<Json<serde_json::Value>, StatusCode> {
let result = agent let db = tenant_db(&agent, &tenant).await?;
.db let result = db
.cve_notifications() .cve_notifications()
.update_many( .update_many(
doc! { "status": "new" }, doc! { "status": "new" },
@@ -13,10 +13,11 @@ use compliance_core::models::dast::DastFinding;
use compliance_core::models::finding::Finding; use compliance_core::models::finding::Finding;
use compliance_core::models::pentest::*; use compliance_core::models::pentest::*;
use compliance_core::models::sbom::SbomEntry; use compliance_core::models::sbom::SbomEntry;
use compliance_core::tenant_ctx::TenantCtx;
use crate::agent::ComplianceAgent; use crate::agent::ComplianceAgent;
use super::super::dto::collect_cursor_async; use super::super::dto::{collect_cursor_async, tenant_db};
type AgentExt = Extension<Arc<ComplianceAgent>>; type AgentExt = Extension<Arc<ComplianceAgent>>;
@@ -35,11 +36,15 @@ pub struct ExportBody {
#[tracing::instrument(skip_all, fields(session_id = %id))] #[tracing::instrument(skip_all, fields(session_id = %id))]
pub async fn export_session_report( pub async fn export_session_report(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>, Path(id): Path<String>,
Json(body): Json<ExportBody>, Json(body): Json<ExportBody>,
) -> Result<axum::response::Response, (StatusCode, String)> { ) -> Result<axum::response::Response, (StatusCode, String)> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id) let oid = mongodb::bson::oid::ObjectId::parse_str(&id)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid session ID".to_string()))?; .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid session ID".to_string()))?;
let db = tenant_db(&agent, &tenant)
.await
.map_err(|s| (s, "failed to acquire tenant database".to_string()))?;
if body.password.len() < 8 { if body.password.len() < 8 {
return Err(( return Err((
@@ -49,8 +54,7 @@ pub async fn export_session_report(
} }
// Fetch session // Fetch session
let session = agent let session = db
.db
.pentest_sessions() .pentest_sessions()
.find_one(doc! { "_id": oid }) .find_one(doc! { "_id": oid })
.await .await
@@ -64,9 +68,7 @@ pub async fn export_session_report(
// Resolve target name // Resolve target name
let target = if let Ok(tid) = mongodb::bson::oid::ObjectId::parse_str(&session.target_id) { let target = if let Ok(tid) = mongodb::bson::oid::ObjectId::parse_str(&session.target_id) {
agent db.dast_targets()
.db
.dast_targets()
.find_one(doc! { "_id": tid }) .find_one(doc! { "_id": tid })
.await .await
.ok() .ok()
@@ -84,8 +86,7 @@ pub async fn export_session_report(
.unwrap_or_default(); .unwrap_or_default();
// Fetch attack chain nodes // Fetch attack chain nodes
let nodes: Vec<AttackChainNode> = match agent let nodes: Vec<AttackChainNode> = match db
.db
.attack_chain_nodes() .attack_chain_nodes()
.find(doc! { "session_id": &id }) .find(doc! { "session_id": &id })
.sort(doc! { "started_at": 1 }) .sort(doc! { "started_at": 1 })
@@ -96,8 +97,7 @@ pub async fn export_session_report(
}; };
// Fetch DAST findings for this session, then deduplicate // Fetch DAST findings for this session, then deduplicate
let raw_findings: Vec<DastFinding> = match agent let raw_findings: Vec<DastFinding> = match db
.db
.dast_findings() .dast_findings()
.find(doc! { "session_id": &id }) .find(doc! { "session_id": &id })
.sort(doc! { "severity": -1, "created_at": -1 }) .sort(doc! { "severity": -1, "created_at": -1 })
@@ -122,8 +122,7 @@ pub async fn export_session_report(
.or_else(|| target.as_ref().and_then(|t| t.repo_id.clone())); .or_else(|| target.as_ref().and_then(|t| t.repo_id.clone()));
let (sast_findings, sbom_entries, code_context) = if let Some(ref rid) = repo_id { let (sast_findings, sbom_entries, code_context) = if let Some(ref rid) = repo_id {
let sast: Vec<Finding> = match agent let sast: Vec<Finding> = match db
.db
.findings() .findings()
.find(doc! { .find(doc! {
"repo_id": rid, "repo_id": rid,
@@ -143,8 +142,7 @@ pub async fn export_session_report(
Err(_) => Vec::new(), Err(_) => Vec::new(),
}; };
let sbom: Vec<SbomEntry> = match agent let sbom: Vec<SbomEntry> = match db
.db
.sbom_entries() .sbom_entries()
.find(doc! { .find(doc! {
"repo_id": rid, "repo_id": rid,
@@ -164,8 +162,7 @@ pub async fn export_session_report(
}; };
// Build code context from graph nodes // Build code context from graph nodes
let code_ctx: Vec<CodeContextHint> = match agent let code_ctx: Vec<CodeContextHint> = match db
.db
.graph_nodes() .graph_nodes()
.find(doc! { "repo_id": rid, "is_entry_point": true }) .find(doc! { "repo_id": rid, "is_entry_point": true })
.limit(50) .limit(50)
@@ -7,11 +7,12 @@ use mongodb::bson::doc;
use serde::Deserialize; use serde::Deserialize;
use compliance_core::models::pentest::*; use compliance_core::models::pentest::*;
use compliance_core::tenant_ctx::TenantCtx;
use crate::agent::ComplianceAgent; use crate::agent::ComplianceAgent;
use crate::pentest::PentestOrchestrator; use crate::pentest::PentestOrchestrator;
use super::super::dto::{collect_cursor_async, ApiResponse, PaginationParams}; use super::super::dto::{collect_cursor_async, tenant_db, ApiResponse, PaginationParams};
type AgentExt = Extension<Arc<ComplianceAgent>>; type AgentExt = Extension<Arc<ComplianceAgent>>;
@@ -43,6 +44,7 @@ pub struct LookupRepoQuery {
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn create_session( pub async fn create_session(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Json(req): Json<CreateSessionRequest>, Json(req): Json<CreateSessionRequest>,
) -> Result<Json<ApiResponse<PentestSession>>, (StatusCode, String)> { ) -> Result<Json<ApiResponse<PentestSession>>, (StatusCode, String)> {
// Try to acquire a concurrency permit // Try to acquire a concurrency permit
@@ -57,6 +59,10 @@ pub async fn create_session(
) )
})?; })?;
let db = tenant_db(&agent, &tenant)
.await
.map_err(|s| (s, "failed to acquire tenant database".to_string()))?;
if let Some(ref config) = req.config { if let Some(ref config) = req.config {
// ── Wizard path ────────────────────────────────────────────── // ── Wizard path ──────────────────────────────────────────────
if !config.disclaimer_accepted { if !config.disclaimer_accepted {
@@ -67,8 +73,7 @@ pub async fn create_session(
} }
// Look up or auto-create DastTarget by app_url // Look up or auto-create DastTarget by app_url
let target = match agent let target = match db
.db
.dast_targets() .dast_targets()
.find_one(doc! { "base_url": &config.app_url }) .find_one(doc! { "base_url": &config.app_url })
.await .await
@@ -87,7 +92,7 @@ pub async fn create_session(
} }
t.allow_destructive = config.allow_destructive; t.allow_destructive = config.allow_destructive;
t.excluded_paths = config.scope_exclusions.clone(); t.excluded_paths = config.scope_exclusions.clone();
let res = agent.db.dast_targets().insert_one(&t).await.map_err(|e| { let res = db.dast_targets().insert_one(&t).await.map_err(|e| {
( (
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to create target: {e}"), format!("Failed to create target: {e}"),
@@ -110,8 +115,7 @@ pub async fn create_session(
// Resolve repo_id from git_repo_url if provided // Resolve repo_id from git_repo_url if provided
if let Some(ref git_url) = config.git_repo_url { if let Some(ref git_url) = config.git_repo_url {
if let Ok(Some(repo)) = agent if let Ok(Some(repo)) = db
.db
.repositories() .repositories()
.find_one(doc! { "git_url": git_url }) .find_one(doc! { "git_url": git_url })
.await .await
@@ -120,8 +124,7 @@ pub async fn create_session(
} }
} }
let insert_result = agent let insert_result = db
.db
.pentest_sessions() .pentest_sessions()
.insert_one(&session) .insert_one(&session)
.await .await
@@ -212,8 +215,7 @@ pub async fn create_session(
// Persist encrypted credentials to DB // Persist encrypted credentials to DB
if session_for_task.config.is_some() { if session_for_task.config.is_some() {
if let Some(sid) = session.id { if let Some(sid) = session.id {
let _ = agent let _ = db
.db
.pentest_sessions() .pentest_sessions()
.update_one( .update_one(
doc! { "_id": sid }, doc! { "_id": sid },
@@ -245,12 +247,13 @@ pub async fn create_session(
}); });
let llm = agent.llm.clone(); let llm = agent.llm.clone();
let db = agent.db.clone(); let db_for_orchestrator = db.clone();
let session_clone = session.clone(); let session_clone = session.clone();
let target_clone = target.clone(); let target_clone = target.clone();
let agent_ref = agent.clone(); let agent_ref = agent.clone();
tokio::spawn(async move { tokio::spawn(async move {
let orchestrator = PentestOrchestrator::new(llm, db, event_tx, Some(pause_rx)); let orchestrator =
PentestOrchestrator::new(llm, db_for_orchestrator, event_tx, Some(pause_rx));
orchestrator orchestrator
.run_session_guarded(&session_clone, &target_clone, &initial_message) .run_session_guarded(&session_clone, &target_clone, &initial_message)
.await; .await;
@@ -292,8 +295,7 @@ pub async fn create_session(
) )
})?; })?;
let target = agent let target = db
.db
.dast_targets() .dast_targets()
.find_one(doc! { "_id": oid }) .find_one(doc! { "_id": oid })
.await .await
@@ -310,8 +312,7 @@ pub async fn create_session(
let mut session = PentestSession::new(target_id, strategy); let mut session = PentestSession::new(target_id, strategy);
session.repo_id = target.repo_id.clone(); session.repo_id = target.repo_id.clone();
let insert_result = agent let insert_result = db
.db
.pentest_sessions() .pentest_sessions()
.insert_one(&session) .insert_one(&session)
.await .await
@@ -338,12 +339,13 @@ pub async fn create_session(
}); });
let llm = agent.llm.clone(); let llm = agent.llm.clone();
let db = agent.db.clone(); let db_for_orchestrator = db.clone();
let session_clone = session.clone(); let session_clone = session.clone();
let target_clone = target.clone(); let target_clone = target.clone();
let agent_ref = agent.clone(); let agent_ref = agent.clone();
tokio::spawn(async move { tokio::spawn(async move {
let orchestrator = PentestOrchestrator::new(llm, db, event_tx, Some(pause_rx)); let orchestrator =
PentestOrchestrator::new(llm, db_for_orchestrator, event_tx, Some(pause_rx));
orchestrator orchestrator
.run_session_guarded(&session_clone, &target_clone, &initial_message) .run_session_guarded(&session_clone, &target_clone, &initial_message)
.await; .await;
@@ -373,10 +375,11 @@ fn parse_strategy(s: &str) -> PentestStrategy {
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn lookup_repo( pub async fn lookup_repo(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<LookupRepoQuery>, Query(params): Query<LookupRepoQuery>,
) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> { ) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
let repo = agent let db = tenant_db(&agent, &tenant).await?;
.db let repo = db
.repositories() .repositories()
.find_one(doc! { "git_url": &params.url }) .find_one(doc! { "git_url": &params.url })
.await .await
@@ -402,9 +405,11 @@ pub async fn lookup_repo(
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn list_sessions( pub async fn list_sessions(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<PaginationParams>, Query(params): Query<PaginationParams>,
) -> Result<Json<ApiResponse<Vec<PentestSession>>>, StatusCode> { ) -> Result<Json<ApiResponse<Vec<PentestSession>>>, StatusCode> {
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let skip = (params.page.saturating_sub(1)) * params.limit as u64; let skip = (params.page.saturating_sub(1)) * params.limit as u64;
let total = db let total = db
.pentest_sessions() .pentest_sessions()
@@ -438,12 +443,13 @@ pub async fn list_sessions(
#[tracing::instrument(skip_all, fields(session_id = %id))] #[tracing::instrument(skip_all, fields(session_id = %id))]
pub async fn get_session( pub async fn get_session(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>, Path(id): Path<String>,
) -> Result<Json<ApiResponse<PentestSession>>, StatusCode> { ) -> Result<Json<ApiResponse<PentestSession>>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?; let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let mut session = agent let mut session = db
.db
.pentest_sessions() .pentest_sessions()
.find_one(doc! { "_id": oid }) .find_one(doc! { "_id": oid })
.await .await
@@ -471,15 +477,18 @@ pub async fn get_session(
#[tracing::instrument(skip_all, fields(session_id = %id))] #[tracing::instrument(skip_all, fields(session_id = %id))]
pub async fn send_message( pub async fn send_message(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>, Path(id): Path<String>,
Json(req): Json<SendMessageRequest>, Json(req): Json<SendMessageRequest>,
) -> Result<Json<ApiResponse<PentestMessage>>, (StatusCode, String)> { ) -> Result<Json<ApiResponse<PentestMessage>>, (StatusCode, String)> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id) let oid = mongodb::bson::oid::ObjectId::parse_str(&id)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid session ID".to_string()))?; .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid session ID".to_string()))?;
let db = tenant_db(&agent, &tenant)
.await
.map_err(|s| (s, "failed to acquire tenant database".to_string()))?;
// Verify session exists and is running // Verify session exists and is running
let session = agent let session = db
.db
.pentest_sessions() .pentest_sessions()
.find_one(doc! { "_id": oid }) .find_one(doc! { "_id": oid })
.await .await
@@ -506,8 +515,7 @@ pub async fn send_message(
) )
})?; })?;
let target = agent let target = db
.db
.dast_targets() .dast_targets()
.find_one(doc! { "_id": target_oid }) .find_one(doc! { "_id": target_oid })
.await .await
@@ -527,13 +535,13 @@ pub async fn send_message(
// Store user message // Store user message
let session_id = id.clone(); let session_id = id.clone();
let user_msg = PentestMessage::user(session_id.clone(), req.message.clone()); let user_msg = PentestMessage::user(session_id.clone(), req.message.clone());
let _ = agent.db.pentest_messages().insert_one(&user_msg).await; let _ = db.pentest_messages().insert_one(&user_msg).await;
let response_msg = user_msg.clone(); let response_msg = user_msg.clone();
// Spawn orchestrator to continue the session // Spawn orchestrator to continue the session
let llm = agent.llm.clone(); let llm = agent.llm.clone();
let db = agent.db.clone(); let db_for_orchestrator = db.clone();
let message = req.message.clone(); let message = req.message.clone();
// Use existing broadcast sender if available, otherwise create a new one // Use existing broadcast sender if available, otherwise create a new one
@@ -548,7 +556,7 @@ pub async fn send_message(
.unwrap_or_else(|| agent.register_session_stream(&session_id)); .unwrap_or_else(|| agent.register_session_stream(&session_id));
tokio::spawn(async move { tokio::spawn(async move {
let orchestrator = PentestOrchestrator::new(llm, db, event_tx, None); let orchestrator = PentestOrchestrator::new(llm, db_for_orchestrator, event_tx, None);
orchestrator orchestrator
.run_session_guarded(&session, &target, &message) .run_session_guarded(&session, &target, &message)
.await; .await;
@@ -565,13 +573,16 @@ pub async fn send_message(
#[tracing::instrument(skip_all, fields(session_id = %id))] #[tracing::instrument(skip_all, fields(session_id = %id))]
pub async fn stop_session( pub async fn stop_session(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>, Path(id): Path<String>,
) -> Result<Json<ApiResponse<PentestSession>>, (StatusCode, String)> { ) -> Result<Json<ApiResponse<PentestSession>>, (StatusCode, String)> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id) let oid = mongodb::bson::oid::ObjectId::parse_str(&id)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid session ID".to_string()))?; .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid session ID".to_string()))?;
let db = tenant_db(&agent, &tenant)
.await
.map_err(|s| (s, "failed to acquire tenant database".to_string()))?;
let session = agent let session = db
.db
.pentest_sessions() .pentest_sessions()
.find_one(doc! { "_id": oid }) .find_one(doc! { "_id": oid })
.await .await
@@ -590,9 +601,7 @@ pub async fn stop_session(
)); ));
} }
agent db.pentest_sessions()
.db
.pentest_sessions()
.update_one( .update_one(
doc! { "_id": oid }, doc! { "_id": oid },
doc! { "$set": { doc! { "$set": {
@@ -612,8 +621,7 @@ pub async fn stop_session(
// Clean up session resources // Clean up session resources
agent.cleanup_session(&id); agent.cleanup_session(&id);
let updated = agent let updated = db
.db
.pentest_sessions() .pentest_sessions()
.find_one(doc! { "_id": oid }) .find_one(doc! { "_id": oid })
.await .await
@@ -641,13 +649,16 @@ pub async fn stop_session(
#[tracing::instrument(skip_all, fields(session_id = %id))] #[tracing::instrument(skip_all, fields(session_id = %id))]
pub async fn pause_session( pub async fn pause_session(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>, Path(id): Path<String>,
) -> Result<Json<ApiResponse<serde_json::Value>>, (StatusCode, String)> { ) -> Result<Json<ApiResponse<serde_json::Value>>, (StatusCode, String)> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id) let oid = mongodb::bson::oid::ObjectId::parse_str(&id)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid session ID".to_string()))?; .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid session ID".to_string()))?;
let db = tenant_db(&agent, &tenant)
.await
.map_err(|s| (s, "failed to acquire tenant database".to_string()))?;
let session = agent let session = db
.db
.pentest_sessions() .pentest_sessions()
.find_one(doc! { "_id": oid }) .find_one(doc! { "_id": oid })
.await .await
@@ -684,13 +695,16 @@ pub async fn pause_session(
#[tracing::instrument(skip_all, fields(session_id = %id))] #[tracing::instrument(skip_all, fields(session_id = %id))]
pub async fn resume_session( pub async fn resume_session(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>, Path(id): Path<String>,
) -> Result<Json<ApiResponse<serde_json::Value>>, (StatusCode, String)> { ) -> Result<Json<ApiResponse<serde_json::Value>>, (StatusCode, String)> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id) let oid = mongodb::bson::oid::ObjectId::parse_str(&id)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid session ID".to_string()))?; .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid session ID".to_string()))?;
let db = tenant_db(&agent, &tenant)
.await
.map_err(|s| (s, "failed to acquire tenant database".to_string()))?;
let session = agent let session = db
.db
.pentest_sessions() .pentest_sessions()
.find_one(doc! { "_id": oid }) .find_one(doc! { "_id": oid })
.await .await
@@ -727,12 +741,13 @@ pub async fn resume_session(
#[tracing::instrument(skip_all, fields(session_id = %id))] #[tracing::instrument(skip_all, fields(session_id = %id))]
pub async fn get_attack_chain( pub async fn get_attack_chain(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>, Path(id): Path<String>,
) -> Result<Json<ApiResponse<Vec<AttackChainNode>>>, StatusCode> { ) -> Result<Json<ApiResponse<Vec<AttackChainNode>>>, StatusCode> {
let _oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?; let _oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let nodes = match agent let nodes = match db
.db
.attack_chain_nodes() .attack_chain_nodes()
.find(doc! { "session_id": &id }) .find(doc! { "session_id": &id })
.sort(doc! { "started_at": 1 }) .sort(doc! { "started_at": 1 })
@@ -757,21 +772,21 @@ pub async fn get_attack_chain(
#[tracing::instrument(skip_all, fields(session_id = %id))] #[tracing::instrument(skip_all, fields(session_id = %id))]
pub async fn get_messages( pub async fn get_messages(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>, Path(id): Path<String>,
Query(params): Query<PaginationParams>, Query(params): Query<PaginationParams>,
) -> Result<Json<ApiResponse<Vec<PentestMessage>>>, StatusCode> { ) -> Result<Json<ApiResponse<Vec<PentestMessage>>>, StatusCode> {
let _oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?; let _oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let skip = (params.page.saturating_sub(1)) * params.limit as u64; let skip = (params.page.saturating_sub(1)) * params.limit as u64;
let total = agent let total = db
.db
.pentest_messages() .pentest_messages()
.count_documents(doc! { "session_id": &id }) .count_documents(doc! { "session_id": &id })
.await .await
.unwrap_or(0); .unwrap_or(0);
let messages = match agent let messages = match db
.db
.pentest_messages() .pentest_messages()
.find(doc! { "session_id": &id }) .find(doc! { "session_id": &id })
.sort(doc! { "created_at": 1 }) .sort(doc! { "created_at": 1 })
@@ -797,21 +812,21 @@ pub async fn get_messages(
#[tracing::instrument(skip_all, fields(session_id = %id))] #[tracing::instrument(skip_all, fields(session_id = %id))]
pub async fn get_session_findings( pub async fn get_session_findings(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>, Path(id): Path<String>,
Query(params): Query<PaginationParams>, Query(params): Query<PaginationParams>,
) -> Result<Json<ApiResponse<Vec<compliance_core::models::dast::DastFinding>>>, StatusCode> { ) -> Result<Json<ApiResponse<Vec<compliance_core::models::dast::DastFinding>>>, StatusCode> {
let _oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?; let _oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let skip = (params.page.saturating_sub(1)) * params.limit as u64; let skip = (params.page.saturating_sub(1)) * params.limit as u64;
let total = agent let total = db
.db
.dast_findings() .dast_findings()
.count_documents(doc! { "session_id": &id }) .count_documents(doc! { "session_id": &id })
.await .await
.unwrap_or(0); .unwrap_or(0);
let findings = match agent let findings = match db
.db
.dast_findings() .dast_findings()
.find(doc! { "session_id": &id }) .find(doc! { "session_id": &id })
.sort(doc! { "created_at": -1 }) .sort(doc! { "created_at": -1 })
@@ -6,10 +6,11 @@ use axum::Json;
use mongodb::bson::doc; use mongodb::bson::doc;
use compliance_core::models::pentest::*; use compliance_core::models::pentest::*;
use compliance_core::tenant_ctx::TenantCtx;
use crate::agent::ComplianceAgent; use crate::agent::ComplianceAgent;
use super::super::dto::{collect_cursor_async, ApiResponse}; use super::super::dto::{collect_cursor_async, tenant_db, ApiResponse};
type AgentExt = Extension<Arc<ComplianceAgent>>; type AgentExt = Extension<Arc<ComplianceAgent>>;
@@ -17,8 +18,10 @@ type AgentExt = Extension<Arc<ComplianceAgent>>;
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn pentest_stats( pub async fn pentest_stats(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
) -> Result<Json<ApiResponse<PentestStats>>, StatusCode> { ) -> Result<Json<ApiResponse<PentestStats>>, StatusCode> {
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let running_sessions = db let running_sessions = db
.pentest_sessions() .pentest_sessions()
@@ -11,10 +11,11 @@ use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use compliance_core::models::pentest::*; use compliance_core::models::pentest::*;
use compliance_core::tenant_ctx::TenantCtx;
use crate::agent::ComplianceAgent; use crate::agent::ComplianceAgent;
use super::super::dto::collect_cursor_async; use super::super::dto::{collect_cursor_async, tenant_db};
type AgentExt = Extension<Arc<ComplianceAgent>>; type AgentExt = Extension<Arc<ComplianceAgent>>;
@@ -25,13 +26,14 @@ type AgentExt = Extension<Arc<ComplianceAgent>>;
#[tracing::instrument(skip_all, fields(session_id = %id))] #[tracing::instrument(skip_all, fields(session_id = %id))]
pub async fn session_stream( pub async fn session_stream(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>, Path(id): Path<String>,
) -> Result<Sse<impl futures_util::Stream<Item = Result<Event, Infallible>>>, StatusCode> { ) -> Result<Sse<impl futures_util::Stream<Item = Result<Event, Infallible>>>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?; let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
// Verify session exists // Verify session exists
let _session = agent let _session = db
.db
.pentest_sessions() .pentest_sessions()
.find_one(doc! { "_id": oid }) .find_one(doc! { "_id": oid })
.await .await
@@ -43,8 +45,7 @@ pub async fn session_stream(
let mut initial_events: Vec<Result<Event, Infallible>> = Vec::new(); let mut initial_events: Vec<Result<Event, Infallible>> = Vec::new();
// Fetch recent messages for this session // Fetch recent messages for this session
let messages: Vec<PentestMessage> = match agent let messages: Vec<PentestMessage> = match db
.db
.pentest_messages() .pentest_messages()
.find(doc! { "session_id": &id }) .find(doc! { "session_id": &id })
.sort(doc! { "created_at": 1 }) .sort(doc! { "created_at": 1 })
@@ -56,8 +57,7 @@ pub async fn session_stream(
}; };
// Fetch recent attack chain nodes // Fetch recent attack chain nodes
let nodes: Vec<AttackChainNode> = match agent let nodes: Vec<AttackChainNode> = match db
.db
.attack_chain_nodes() .attack_chain_nodes()
.find(doc! { "session_id": &id }) .find(doc! { "session_id": &id })
.sort(doc! { "started_at": 1 }) .sort(doc! { "started_at": 1 })
@@ -94,8 +94,7 @@ pub async fn session_stream(
} }
// Add current session status event // Add current session status event
let session = agent let session = db
.db
.pentest_sessions() .pentest_sessions()
.find_one(doc! { "_id": oid }) .find_one(doc! { "_id": oid })
.await .await
+28 -17
View File
@@ -5,13 +5,16 @@ use mongodb::bson::doc;
use super::dto::*; use super::dto::*;
use compliance_core::models::*; use compliance_core::models::*;
use compliance_core::tenant_ctx::TenantCtx;
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn list_repositories( pub async fn list_repositories(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<PaginationParams>, Query(params): Query<PaginationParams>,
) -> ApiResult<Vec<TrackedRepository>> { ) -> ApiResult<Vec<TrackedRepository>> {
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let skip = (params.page.saturating_sub(1)) * params.limit as u64; let skip = (params.page.saturating_sub(1)) * params.limit as u64;
let total = db let total = db
.repositories() .repositories()
@@ -43,6 +46,7 @@ pub async fn list_repositories(
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn add_repository( pub async fn add_repository(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Json(req): Json<AddRepositoryRequest>, Json(req): Json<AddRepositoryRequest>,
) -> Result<Json<ApiResponse<TrackedRepository>>, (StatusCode, String)> { ) -> Result<Json<ApiResponse<TrackedRepository>>, (StatusCode, String)> {
// Validate repository access before saving // Validate repository access before saving
@@ -69,17 +73,15 @@ pub async fn add_repository(
repo.tracker_token = req.tracker_token; repo.tracker_token = req.tracker_token;
repo.scan_schedule = req.scan_schedule; repo.scan_schedule = req.scan_schedule;
agent let db = tenant_db(&agent, &tenant)
.db
.repositories()
.insert_one(&repo)
.await .await
.map_err(|_| { .map_err(|s| (s, "failed to acquire tenant database".to_string()))?;
( db.repositories().insert_one(&repo).await.map_err(|_| {
StatusCode::CONFLICT, (
"Repository already exists".to_string(), StatusCode::CONFLICT,
) "Repository already exists".to_string(),
})?; )
})?;
Ok(Json(ApiResponse { Ok(Json(ApiResponse {
data: repo, data: repo,
@@ -91,10 +93,12 @@ pub async fn add_repository(
#[tracing::instrument(skip_all, fields(repo_id = %id))] #[tracing::instrument(skip_all, fields(repo_id = %id))]
pub async fn update_repository( pub async fn update_repository(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>, Path(id): Path<String>,
Json(req): Json<UpdateRepositoryRequest>, Json(req): Json<UpdateRepositoryRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> { ) -> Result<Json<serde_json::Value>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?; let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let mut set_doc = doc! { "updated_at": mongodb::bson::DateTime::now() }; let mut set_doc = doc! { "updated_at": mongodb::bson::DateTime::now() };
@@ -126,8 +130,7 @@ pub async fn update_repository(
set_doc.insert("scan_schedule", schedule); set_doc.insert("scan_schedule", schedule);
} }
let result = agent let result = db
.db
.repositories() .repositories()
.update_one(doc! { "_id": oid }, doc! { "$set": set_doc }) .update_one(doc! { "_id": oid }, doc! { "$set": set_doc })
.await .await
@@ -155,11 +158,16 @@ pub async fn get_ssh_public_key(
#[tracing::instrument(skip_all, fields(repo_id = %id))] #[tracing::instrument(skip_all, fields(repo_id = %id))]
pub async fn trigger_scan( pub async fn trigger_scan(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>, Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> { ) -> Result<Json<serde_json::Value>, StatusCode> {
let agent_clone = (*agent).clone(); let agent_clone = (*agent).clone();
let tenant_id = tenant.0.tenant_id.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = agent_clone.run_scan(&id, ScanTrigger::Manual).await { if let Err(e) = agent_clone
.run_scan(&tenant_id, &id, ScanTrigger::Manual)
.await
{
tracing::error!("Manual scan failed for {id}: {e}"); tracing::error!("Manual scan failed for {id}: {e}");
} }
}); });
@@ -170,11 +178,12 @@ pub async fn trigger_scan(
/// Return the webhook secret for a repository (used by dashboard to display it) /// Return the webhook secret for a repository (used by dashboard to display it)
pub async fn get_webhook_config( pub async fn get_webhook_config(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>, Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> { ) -> Result<Json<serde_json::Value>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?; let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let repo = agent let db = tenant_db(&agent, &tenant).await?;
.db let repo = db
.repositories() .repositories()
.find_one(doc! { "_id": oid }) .find_one(doc! { "_id": oid })
.await .await
@@ -196,10 +205,12 @@ pub async fn get_webhook_config(
#[tracing::instrument(skip_all, fields(repo_id = %id))] #[tracing::instrument(skip_all, fields(repo_id = %id))]
pub async fn delete_repository( pub async fn delete_repository(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>, Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> { ) -> Result<Json<serde_json::Value>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?; let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
// Delete the repository // Delete the repository
let result = db let result = db
+16 -5
View File
@@ -6,6 +6,7 @@ use mongodb::bson::doc;
use super::dto::*; use super::dto::*;
use compliance_core::models::SbomEntry; use compliance_core::models::SbomEntry;
use compliance_core::tenant_ctx::TenantCtx;
const COPYLEFT_LICENSES: &[&str] = &[ const COPYLEFT_LICENSES: &[&str] = &[
"GPL-2.0", "GPL-2.0",
@@ -29,8 +30,10 @@ const COPYLEFT_LICENSES: &[&str] = &[
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn sbom_filters( pub async fn sbom_filters(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
) -> Result<Json<serde_json::Value>, StatusCode> { ) -> Result<Json<serde_json::Value>, StatusCode> {
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let managers: Vec<String> = db let managers: Vec<String> = db
.sbom_entries() .sbom_entries()
@@ -61,9 +64,11 @@ pub async fn sbom_filters(
#[tracing::instrument(skip_all, fields(repo_id = ?filter.repo_id, package_manager = ?filter.package_manager))] #[tracing::instrument(skip_all, fields(repo_id = ?filter.repo_id, package_manager = ?filter.package_manager))]
pub async fn list_sbom( pub async fn list_sbom(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Query(filter): Query<SbomFilter>, Query(filter): Query<SbomFilter>,
) -> ApiResult<Vec<SbomEntry>> { ) -> ApiResult<Vec<SbomEntry>> {
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let mut query = doc! {}; let mut query = doc! {};
if let Some(repo_id) = &filter.repo_id { if let Some(repo_id) = &filter.repo_id {
@@ -120,9 +125,11 @@ pub async fn list_sbom(
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn export_sbom( pub async fn export_sbom(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<SbomExportParams>, Query(params): Query<SbomExportParams>,
) -> Result<impl IntoResponse, StatusCode> { ) -> Result<impl IntoResponse, StatusCode> {
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let entries: Vec<SbomEntry> = match db let entries: Vec<SbomEntry> = match db
.sbom_entries() .sbom_entries()
.find(doc! { "repo_id": &params.repo_id }) .find(doc! { "repo_id": &params.repo_id })
@@ -236,9 +243,11 @@ pub async fn export_sbom(
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn license_summary( pub async fn license_summary(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<SbomFilter>, Query(params): Query<SbomFilter>,
) -> ApiResult<Vec<LicenseSummary>> { ) -> ApiResult<Vec<LicenseSummary>> {
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let mut query = doc! {}; let mut query = doc! {};
if let Some(repo_id) = &params.repo_id { if let Some(repo_id) = &params.repo_id {
query.insert("repo_id", repo_id); query.insert("repo_id", repo_id);
@@ -285,9 +294,11 @@ pub async fn license_summary(
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn sbom_diff( pub async fn sbom_diff(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<SbomDiffParams>, Query(params): Query<SbomDiffParams>,
) -> ApiResult<SbomDiffResult> { ) -> ApiResult<SbomDiffResult> {
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let entries_a: Vec<SbomEntry> = match db let entries_a: Vec<SbomEntry> = match db
.sbom_entries() .sbom_entries()
+4 -1
View File
@@ -4,13 +4,16 @@ use mongodb::bson::doc;
use super::dto::*; use super::dto::*;
use compliance_core::models::ScanRun; use compliance_core::models::ScanRun;
use compliance_core::tenant_ctx::TenantCtx;
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn list_scan_runs( pub async fn list_scan_runs(
Extension(agent): AgentExt, Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<PaginationParams>, Query(params): Query<PaginationParams>,
) -> ApiResult<Vec<ScanRun>> { ) -> ApiResult<Vec<ScanRun>> {
let db = &agent.db; let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let skip = (params.page.saturating_sub(1)) * params.limit as u64; let skip = (params.page.saturating_sub(1)) * params.limit as u64;
let total = db.scan_runs().count_documents(doc! {}).await.unwrap_or(0); let total = db.scan_runs().count_documents(doc! {}).await.unwrap_or(0);
+44 -1
View File
@@ -1,6 +1,9 @@
use std::sync::Arc; use std::sync::Arc;
use axum::extract::Request;
use axum::http::HeaderValue; use axum::http::HeaderValue;
use axum::middleware::Next;
use axum::response::Response;
use axum::{middleware, Extension}; use axum::{middleware, Extension};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tower_http::cors::CorsLayer; use tower_http::cors::CorsLayer;
@@ -8,11 +11,44 @@ use tower_http::set_header::SetResponseHeaderLayer;
use tower_http::trace::TraceLayer; use tower_http::trace::TraceLayer;
use compliance_core::auth::{require_jwt_auth, require_tenant_status, JwksState}; use compliance_core::auth::{require_jwt_auth, require_tenant_status, JwksState};
use compliance_core::{TenantContext, TenantStatus};
use crate::agent::ComplianceAgent; use crate::agent::ComplianceAgent;
use crate::api::routes; use crate::api::routes;
use crate::error::AgentError; use crate::error::AgentError;
/// Synthetic tenant id used when Keycloak isn't configured (local dev,
/// `cargo run` against a bare Mongo). Lets the handler stack stay
/// uniformly tenant-scoped without the operator having to spin up KC
/// just to poke at the API. Override via `DEV_TENANT_ID`.
const DEFAULT_DEV_TENANT_ID: &str = "dev";
/// Inject a synthetic [`TenantContext`] for any request that lacks one.
/// Only mounted when Keycloak is NOT configured; with KC, the real
/// `require_jwt_auth` middleware owns this and we never reach here
/// without a context.
///
/// Public so the integration-test harness can mount it without
/// duplicating the synthetic-context shape.
pub async fn inject_dev_tenant(mut request: Request, next: Next) -> Response {
if request.extensions().get::<TenantContext>().is_none() {
let tenant_id =
std::env::var("DEV_TENANT_ID").unwrap_or_else(|_| DEFAULT_DEV_TENANT_ID.to_string());
let ctx = TenantContext {
tenant_slug: tenant_id.clone(),
tenant_id,
org_roles: vec![],
products: vec![],
plan: "dev".to_string(),
status: TenantStatus::Active,
user_id: "dev-user".to_string(),
user_name: None,
};
request.extensions_mut().insert(ctx);
}
next.run(request).await
}
pub async fn start_api_server(agent: ComplianceAgent, port: u16) -> Result<(), AgentError> { pub async fn start_api_server(agent: ComplianceAgent, port: u16) -> Result<(), AgentError> {
let mut app = routes::build_router() let mut app = routes::build_router()
.layer(Extension(Arc::new(agent.clone()))) .layer(Extension(Arc::new(agent.clone())))
@@ -53,7 +89,14 @@ pub async fn start_api_server(agent: ComplianceAgent, port: u16) -> Result<(), A
.layer(middleware::from_fn(require_jwt_auth)) .layer(middleware::from_fn(require_jwt_auth))
.layer(Extension(jwks_state)); .layer(Extension(jwks_state));
} else { } else {
tracing::warn!("Keycloak not configured - API endpoints are unprotected"); let tenant_id =
std::env::var("DEV_TENANT_ID").unwrap_or_else(|_| DEFAULT_DEV_TENANT_ID.to_string());
tracing::warn!(
tenant_id = %tenant_id,
"Keycloak not configured — running unauthenticated against the dev tenant. \
DO NOT use in any environment with real customer data."
);
app = app.layer(middleware::from_fn(inject_dev_tenant));
} }
let addr = format!("0.0.0.0:{port}"); let addr = format!("0.0.0.0:{port}");
+192
View File
@@ -1,11 +1,197 @@
use std::sync::Arc;
use dashmap::DashMap;
use mongodb::bson::doc; use mongodb::bson::doc;
use mongodb::options::IndexOptions; use mongodb::options::IndexOptions;
use mongodb::{Client, Collection, IndexModel}; use mongodb::{Client, Collection, IndexModel};
use sha2::{Digest, Sha256};
use compliance_core::models::*; use compliance_core::models::*;
use compliance_core::TenantContext;
use crate::error::AgentError; use crate::error::AgentError;
/// Mongo enforces a 63-byte cap on database names (older clusters: 64
/// on Linux, 63 on Windows; we target the conservative limit).
const MAX_DB_NAME_LEN: usize = 63;
/// Hex length of the SHA-256 truncation used for the hash fallback
/// tenant DB name (16 bytes → 32 hex chars). 16 bytes gives ~2^64
/// birthday-collision resistance — at our 10s-100s tenant scale this
/// is effectively impossible to hit.
const HASH_HEX_LEN: usize = 32;
/// Largest `db_prefix` that still guarantees the hash-fallback name
/// fits in the 63-byte cap: `prefix + "_" + 32 hex chars`.
const MAX_PREFIX_LEN: usize = MAX_DB_NAME_LEN - 1 - HASH_HEX_LEN;
/// Per-tenant Mongo connection broker (M7.2 isolation model).
///
/// Holds one [`Client`] and hands out [`Database`] handles physically
/// scoped to `<db_prefix>_<tenant_id>`. The driver is the isolation
/// boundary — a handle for tenant A cannot see tenant B's documents
/// because it is connected to a different database, not because of an
/// application-level filter.
///
/// Index creation runs idempotently the first time each tenant is seen
/// in the process's lifetime. Mongo's `createIndex` is itself idempotent
/// by index name; the in-memory `ensured` set just skips the round-trip.
#[derive(Clone, Debug)]
pub struct DatabasePool {
client: Client,
db_prefix: String,
ensured: Arc<DashMap<String, ()>>,
}
impl DatabasePool {
/// Connect to the cluster and prepare to hand out tenant databases
/// named `<db_prefix>_<tenant_id>`.
///
/// Validates `db_prefix.len() <= MAX_PREFIX_LEN` so the
/// hash-fallback path is provably within Mongo's 63-byte db-name
/// cap. Refuses to construct a pool that could ever produce an
/// over-long name.
pub async fn connect(uri: &str, db_prefix: &str) -> Result<Self, AgentError> {
if db_prefix.len() > MAX_PREFIX_LEN {
return Err(AgentError::Other(format!(
"db_prefix '{db_prefix}' is {} chars; max is {MAX_PREFIX_LEN} so the \
hash-fallback tenant DB name fits Mongo's {MAX_DB_NAME_LEN}-byte cap",
db_prefix.len()
)));
}
let client = Client::with_uri_str(uri).await?;
client
.database("admin")
.run_command(doc! { "ping": 1 })
.await?;
tracing::info!(
"MongoDB cluster reachable; per-tenant pool ready (db prefix '{db_prefix}')"
);
Ok(Self {
client,
db_prefix: db_prefix.to_string(),
ensured: Arc::new(DashMap::new()),
})
}
/// Return a [`Database`] scoped to this tenant. Ensures indexes on
/// first call per tenant (per process). Cheap on the hot path —
/// subsequent calls skip the round-trip.
pub async fn for_tenant(&self, ctx: &TenantContext) -> Result<Database, AgentError> {
self.for_tenant_id(&ctx.tenant_id).await
}
/// Like [`Self::for_tenant`] but accepts a bare tenant_id.
/// For background paths (scheduler, webhooks, pipeline orchestrators)
/// that don't have a full [`TenantContext`] but know which tenant
/// they're operating on (typically resolved from a URL path, a job
/// argument, or the registry).
pub async fn for_tenant_id(&self, tenant_id: &str) -> Result<Database, AgentError> {
let db_name = self.tenant_db_name(tenant_id);
let db = Database::from_database(self.client.database(&db_name));
// `DashMap::insert` returns the previous value; `None` means we
// were the first writer for this tenant_id and own the
// index-ensure work.
if self.ensured.insert(tenant_id.to_string(), ()).is_none() {
if let Err(e) = db.ensure_indexes().await {
// Roll the marker back so the next request retries.
self.ensured.remove(tenant_id);
return Err(e);
}
tracing::debug!(
tenant_id = %tenant_id,
db_name = %db_name,
"Indexes ensured for tenant database"
);
}
Ok(db)
}
/// Compute the Mongo database name for a tenant. Public for tests
/// and tenant offboarding (`pool.client().database(name).drop()`).
///
/// Format: `<prefix>_<sanitized_tenant_id>` if it fits the 63-byte
/// cap, else `<prefix>_<sha256-16-byte-hex-of-tenant_id>`. The
/// `db_prefix` length invariant established at [`Self::connect`]
/// guarantees the hash-fallback name always fits — no runtime
/// assertion needed.
///
/// Collision resistance: the hash fallback is a 16-byte SHA-256
/// truncation, which gives ~2^64 birthday-collision resistance. At
/// our 10s100s tenant scale the probability of two tenant_ids
/// colliding is effectively zero. (8-byte truncation would have
/// been ~2^32 — too close for comfort on a regulated product.)
pub fn tenant_db_name(&self, tenant_id: &str) -> String {
let sanitized = sanitize_tenant_id(tenant_id);
let natural = format!("{}_{}", self.db_prefix, sanitized);
if natural.len() <= MAX_DB_NAME_LEN {
natural
} else {
let mut hasher = Sha256::new();
hasher.update(tenant_id.as_bytes());
let digest = hasher.finalize();
let suffix = hex::encode(&digest[..HASH_HEX_LEN / 2]);
format!("{}_{}", self.db_prefix, suffix)
}
}
/// Raw client handle. Reserved for cross-tenant admin flows that
/// must opt in explicitly (tenant listing, drop-on-offboard).
pub fn client(&self) -> &Client {
&self.client
}
/// List every Mongo database currently belonging to this pool,
/// identified by the `<db_prefix>_` prefix. The result is the raw
/// database names — opening one for offboarding/cleanup goes
/// through [`Self::client`].
///
/// Note: hashed-fallback names (very long tenant_ids) lose the
/// original tenant_id at the cluster level — we know a database
/// exists for *some* tenant but not which one. In practice
/// tenant_ids are UUIDs (36 chars) and never hit the fallback,
/// so this is a theoretical concern, not an operational one.
pub async fn list_tenant_db_names(&self) -> Result<Vec<String>, AgentError> {
let prefix = format!("{}_", self.db_prefix);
let names = self.client.list_database_names().await?;
Ok(names
.into_iter()
.filter(|n| n.starts_with(&prefix))
.collect())
}
/// Drop the database for a specific tenant. Used by GDPR delete
/// and tenant offboarding. Idempotent — dropping a non-existent
/// database is a no-op at the driver level.
///
/// Also evicts the tenant from the in-memory `ensured` set so a
/// later re-provision triggers fresh `ensure_indexes`.
pub async fn drop_tenant(&self, tenant_id: &str) -> Result<(), AgentError> {
let db_name = self.tenant_db_name(tenant_id);
self.client.database(&db_name).drop().await?;
self.ensured.remove(tenant_id);
tracing::info!(
tenant_id = %tenant_id,
db_name = %db_name,
"Dropped tenant database"
);
Ok(())
}
}
/// Mongo database names disallow `/`, `\`, `.`, `"`, `$`, ` `, and NUL.
/// breakpilot-dev tenant_ids are UUIDs so this is belt-and-braces, but
/// it lets the pool tolerate any future tenant_id shape without surprise.
fn sanitize_tenant_id(tenant_id: &str) -> String {
tenant_id
.chars()
.map(|c| match c {
'/' | '\\' | '.' | '"' | '$' | ' ' | '\0' => '_',
c => c,
})
.collect()
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Database { pub struct Database {
inner: mongodb::Database, inner: mongodb::Database,
@@ -20,6 +206,12 @@ impl Database {
Ok(Self { inner: db }) Ok(Self { inner: db })
} }
/// Wrap an already-resolved Mongo database. Used by [`DatabasePool`]
/// to hand out tenant-scoped handles without a fresh client per tenant.
pub(crate) fn from_database(inner: mongodb::Database) -> Self {
Self { inner }
}
pub async fn ensure_indexes(&self) -> Result<(), AgentError> { pub async fn ensure_indexes(&self) -> Result<(), AgentError> {
// repositories: unique git_url // repositories: unique git_url
self.repositories() self.repositories()
+6 -3
View File
@@ -25,10 +25,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
} }
tracing::info!("Connecting to MongoDB..."); tracing::info!("Connecting to MongoDB...");
let db = database::Database::connect(&config.mongodb_uri, &config.mongodb_database).await?; // Per-tenant pool only — the agent has no shared "default" database
db.ensure_indexes().await?; // after M7.2-D. `mongodb_database` is now the db-name prefix used
// for tenant databases (`<prefix>_<tenant_id>`).
let db_pool =
database::DatabasePool::connect(&config.mongodb_uri, &config.mongodb_database).await?;
let agent = agent::ComplianceAgent::new(config.clone(), db.clone()); let agent = agent::ComplianceAgent::new(config.clone(), db_pool);
tracing::info!("Starting scheduler..."); tracing::info!("Starting scheduler...");
let scheduler_agent = agent.clone(); let scheduler_agent = agent.clone();
+77 -22
View File
@@ -4,8 +4,14 @@ use tokio_cron_scheduler::{Job, JobScheduler};
use compliance_core::models::ScanTrigger; use compliance_core::models::ScanTrigger;
use crate::agent::ComplianceAgent; use crate::agent::ComplianceAgent;
use crate::database::Database;
use crate::error::AgentError; use crate::error::AgentError;
/// Default tenant the scheduler runs against when `SCHEDULER_TENANT_IDS`
/// isn't set. Matches the dev-injector default so a bare `cargo run` has
/// the scheduler scanning whatever lives in `<prefix>_dev`.
const DEFAULT_SCHEDULER_TENANT_ID: &str = "dev";
pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> { pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> {
let sched = JobScheduler::new() let sched = JobScheduler::new()
.await .await
@@ -18,7 +24,9 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError>
let agent = scan_agent.clone(); let agent = scan_agent.clone();
Box::pin(async move { Box::pin(async move {
tracing::info!("Scheduled scan triggered"); tracing::info!("Scheduled scan triggered");
scan_all_repos(&agent).await; for tenant_id in scheduler_tenants() {
scan_all_repos(&agent, &tenant_id).await;
}
}) })
}) })
.map_err(|e| AgentError::Scheduler(format!("Failed to create scan job: {e}")))?; .map_err(|e| AgentError::Scheduler(format!("Failed to create scan job: {e}")))?;
@@ -34,7 +42,9 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError>
let agent = cve_agent.clone(); let agent = cve_agent.clone();
Box::pin(async move { Box::pin(async move {
tracing::info!("CVE monitor triggered"); tracing::info!("CVE monitor triggered");
monitor_cves(&agent).await; for tenant_id in scheduler_tenants() {
monitor_cves(&agent, &tenant_id).await;
}
}) })
}) })
.map_err(|e| AgentError::Scheduler(format!("Failed to create CVE monitor job: {e}")))?; .map_err(|e| AgentError::Scheduler(format!("Failed to create CVE monitor job: {e}")))?;
@@ -48,8 +58,9 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError>
.await .await
.map_err(|e| AgentError::Scheduler(format!("Failed to start scheduler: {e}")))?; .map_err(|e| AgentError::Scheduler(format!("Failed to start scheduler: {e}")))?;
let tenants = scheduler_tenants();
tracing::info!( tracing::info!(
"Scheduler started: scans='{}', CVE monitor='{}'", "Scheduler started: scans='{}', CVE monitor='{}', tenants={tenants:?}",
agent.config.scan_schedule, agent.config.scan_schedule,
agent.config.cve_monitor_schedule, agent.config.cve_monitor_schedule,
); );
@@ -60,13 +71,47 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError>
} }
} }
async fn scan_all_repos(agent: &ComplianceAgent) { /// Tenants the scheduler iterates each tick. From `SCHEDULER_TENANT_IDS`
/// (comma-separated), or `DEFAULT_SCHEDULER_TENANT_ID` if unset. M7.2-D
/// will replace this with a pull from the tenant-registry.
fn scheduler_tenants() -> Vec<String> {
std::env::var("SCHEDULER_TENANT_IDS")
.ok()
.map(|s| {
s.split(',')
.map(str::trim)
.filter(|s| !s.is_empty())
.map(String::from)
.collect::<Vec<_>>()
})
.filter(|v| !v.is_empty())
.unwrap_or_else(|| vec![DEFAULT_SCHEDULER_TENANT_ID.to_string()])
}
/// Resolve the per-tenant database. Logs and returns `None` on failure
/// so the loop in the caller can continue with other tenants.
async fn tenant_db(agent: &ComplianceAgent, tenant_id: &str) -> Option<Database> {
match agent.db_pool.for_tenant_id(tenant_id).await {
Ok(db) => Some(db),
Err(e) => {
tracing::error!("Scheduler: cannot open tenant database '{tenant_id}': {e}");
None
}
}
}
async fn scan_all_repos(agent: &ComplianceAgent, tenant_id: &str) {
use futures_util::StreamExt; use futures_util::StreamExt;
let cursor = match agent.db.repositories().find(doc! {}).await { let db = match tenant_db(agent, tenant_id).await {
Some(db) => db,
None => return,
};
let cursor = match db.repositories().find(doc! {}).await {
Ok(c) => c, Ok(c) => c,
Err(e) => { Err(e) => {
tracing::error!("Failed to list repos for scheduled scan: {e}"); tracing::error!("Failed to list repos for tenant '{tenant_id}': {e}");
return; return;
} }
}; };
@@ -75,33 +120,44 @@ async fn scan_all_repos(agent: &ComplianceAgent) {
for repo in repos { for repo in repos {
let repo_id = repo.id.map(|id| id.to_hex()).unwrap_or_default(); let repo_id = repo.id.map(|id| id.to_hex()).unwrap_or_default();
if let Err(e) = agent.run_scan(&repo_id, ScanTrigger::Scheduled).await { if let Err(e) = agent
tracing::error!("Scheduled scan failed for {}: {e}", repo.name); .run_scan(tenant_id, &repo_id, ScanTrigger::Scheduled)
.await
{
tracing::error!(
"Scheduled scan failed for {} (tenant '{tenant_id}'): {e}",
repo.name
);
} }
} }
} }
async fn monitor_cves(agent: &ComplianceAgent) { async fn monitor_cves(agent: &ComplianceAgent, tenant_id: &str) {
use compliance_core::models::notification::{parse_severity, CveNotification}; use compliance_core::models::notification::{parse_severity, CveNotification};
use compliance_core::models::SbomEntry; use compliance_core::models::SbomEntry;
use futures_util::StreamExt; use futures_util::StreamExt;
let db = match tenant_db(agent, tenant_id).await {
Some(db) => db,
None => return,
};
// Fetch all SBOM entries grouped by repo // Fetch all SBOM entries grouped by repo
let cursor = match agent.db.sbom_entries().find(doc! {}).await { let cursor = match db.sbom_entries().find(doc! {}).await {
Ok(c) => c, Ok(c) => c,
Err(e) => { Err(e) => {
tracing::error!("CVE monitor: failed to list SBOM entries: {e}"); tracing::error!("CVE monitor: failed to list SBOM entries for '{tenant_id}': {e}");
return; return;
} }
}; };
let entries: Vec<SbomEntry> = cursor.filter_map(|r| async { r.ok() }).collect().await; let entries: Vec<SbomEntry> = cursor.filter_map(|r| async { r.ok() }).collect().await;
if entries.is_empty() { if entries.is_empty() {
tracing::debug!("CVE monitor: no SBOM entries, skipping"); tracing::debug!("CVE monitor: no SBOM entries for tenant '{tenant_id}', skipping");
return; return;
} }
tracing::info!( tracing::info!(
"CVE monitor: checking {} dependencies for new CVEs", "CVE monitor: checking {} dependencies for new CVEs (tenant '{tenant_id}')",
entries.len() entries.len()
); );
@@ -112,7 +168,7 @@ async fn monitor_cves(agent: &ComplianceAgent) {
std::collections::HashMap::new(); std::collections::HashMap::new();
for rid in &repo_ids { for rid in &repo_ids {
if let Ok(oid) = mongodb::bson::oid::ObjectId::parse_str(rid) { if let Ok(oid) = mongodb::bson::oid::ObjectId::parse_str(rid) {
if let Ok(Some(repo)) = agent.db.repositories().find_one(doc! { "_id": oid }).await { if let Ok(Some(repo)) = db.repositories().find_one(doc! { "_id": oid }).await {
repo_names.insert(rid.clone(), repo.name.clone()); repo_names.insert(rid.clone(), repo.name.clone());
} }
} }
@@ -160,8 +216,7 @@ async fn monitor_cves(agent: &ComplianceAgent) {
for alert in &alerts { for alert in &alerts {
let filter = doc! { "cve_id": &alert.cve_id, "repo_id": &alert.repo_id }; let filter = doc! { "cve_id": &alert.cve_id, "repo_id": &alert.repo_id };
let update = doc! { "$setOnInsert": mongodb::bson::to_bson(alert).unwrap_or_default() }; let update = doc! { "$setOnInsert": mongodb::bson::to_bson(alert).unwrap_or_default() };
let _ = agent let _ = db
.db
.cve_alerts() .cve_alerts()
.update_one(filter, update) .update_one(filter, update)
.upsert(true) .upsert(true)
@@ -174,8 +229,7 @@ async fn monitor_cves(agent: &ComplianceAgent) {
continue; continue;
} }
if let Some(entry_id) = &entry.id { if let Some(entry_id) = &entry.id {
let _ = agent let _ = db
.db
.sbom_entries() .sbom_entries()
.update_one( .update_one(
doc! { "_id": entry_id }, doc! { "_id": entry_id },
@@ -213,8 +267,7 @@ async fn monitor_cves(agent: &ComplianceAgent) {
let update = doc! { let update = doc! {
"$setOnInsert": mongodb::bson::to_bson(&notification).unwrap_or_default() "$setOnInsert": mongodb::bson::to_bson(&notification).unwrap_or_default()
}; };
match agent match db
.db
.cve_notifications() .cve_notifications()
.update_one(filter, update) .update_one(filter, update)
.upsert(true) .upsert(true)
@@ -232,8 +285,10 @@ async fn monitor_cves(agent: &ComplianceAgent) {
} }
if new_notifications > 0 { if new_notifications > 0 {
tracing::info!("CVE monitor: created {new_notifications} new notification(s)"); tracing::info!(
"CVE monitor: created {new_notifications} new notification(s) for tenant '{tenant_id}'"
);
} else { } else {
tracing::info!("CVE monitor: no new CVEs found"); tracing::info!("CVE monitor: no new CVEs found for tenant '{tenant_id}'");
} }
} }
+23 -9
View File
@@ -14,24 +14,30 @@ type HmacSha256 = Hmac<Sha256>;
pub async fn handle_gitea_webhook( pub async fn handle_gitea_webhook(
Extension(agent): Extension<Arc<ComplianceAgent>>, Extension(agent): Extension<Arc<ComplianceAgent>>,
Path(repo_id): Path<String>, Path((tenant_id, repo_id)): Path<(String, String)>,
headers: HeaderMap, headers: HeaderMap,
body: Bytes, body: Bytes,
) -> StatusCode { ) -> StatusCode {
// Look up the repo to get its webhook secret // Look up the repo in the tenant's database to get its webhook secret
let oid = match mongodb::bson::oid::ObjectId::parse_str(&repo_id) { let oid = match mongodb::bson::oid::ObjectId::parse_str(&repo_id) {
Ok(oid) => oid, Ok(oid) => oid,
Err(_) => return StatusCode::NOT_FOUND, Err(_) => return StatusCode::NOT_FOUND,
}; };
let repo = match agent let db = match agent.db_pool.for_tenant_id(&tenant_id).await {
.db Ok(db) => db,
Err(e) => {
tracing::warn!("Gitea webhook: cannot open tenant database '{tenant_id}': {e}");
return StatusCode::NOT_FOUND;
}
};
let repo = match db
.repositories() .repositories()
.find_one(mongodb::bson::doc! { "_id": oid }) .find_one(mongodb::bson::doc! { "_id": oid })
.await .await
{ {
Ok(Some(repo)) => repo, Ok(Some(repo)) => repo,
_ => { _ => {
tracing::warn!("Gitea webhook: repo {repo_id} not found"); tracing::warn!("Gitea webhook: repo {repo_id} not found in tenant '{tenant_id}'");
return StatusCode::NOT_FOUND; return StatusCode::NOT_FOUND;
} }
}; };
@@ -66,15 +72,21 @@ pub async fn handle_gitea_webhook(
"push" => { "push" => {
let agent_clone = (*agent).clone(); let agent_clone = (*agent).clone();
let repo_id = repo_id.clone(); let repo_id = repo_id.clone();
let tenant_id = tenant_id.clone();
tokio::spawn(async move { tokio::spawn(async move {
tracing::info!("Gitea push webhook: triggering scan for {repo_id}"); tracing::info!(
if let Err(e) = agent_clone.run_scan(&repo_id, ScanTrigger::Webhook).await { "Gitea push webhook: triggering scan for {repo_id} in tenant {tenant_id}"
);
if let Err(e) = agent_clone
.run_scan(&tenant_id, &repo_id, ScanTrigger::Webhook)
.await
{
tracing::error!("Webhook-triggered scan failed: {e}"); tracing::error!("Webhook-triggered scan failed: {e}");
} }
}); });
StatusCode::OK StatusCode::OK
} }
"pull_request" => handle_pull_request(agent, &repo_id, &payload).await, "pull_request" => handle_pull_request(agent, &tenant_id, &repo_id, &payload).await,
_ => { _ => {
tracing::debug!("Gitea webhook: ignoring event '{event}'"); tracing::debug!("Gitea webhook: ignoring event '{event}'");
StatusCode::OK StatusCode::OK
@@ -84,6 +96,7 @@ pub async fn handle_gitea_webhook(
async fn handle_pull_request( async fn handle_pull_request(
agent: Arc<ComplianceAgent>, agent: Arc<ComplianceAgent>,
tenant_id: &str,
repo_id: &str, repo_id: &str,
payload: &serde_json::Value, payload: &serde_json::Value,
) -> StatusCode { ) -> StatusCode {
@@ -106,13 +119,14 @@ async fn handle_pull_request(
} }
let repo_id = repo_id.to_string(); let repo_id = repo_id.to_string();
let tenant_id = tenant_id.to_string();
let head_sha = head_sha.to_string(); let head_sha = head_sha.to_string();
let base_sha = base_sha.to_string(); let base_sha = base_sha.to_string();
let agent_clone = (*agent).clone(); let agent_clone = (*agent).clone();
tokio::spawn(async move { tokio::spawn(async move {
tracing::info!("Gitea PR webhook: reviewing PR #{pr_number} on {repo_id}"); tracing::info!("Gitea PR webhook: reviewing PR #{pr_number} on {repo_id}");
if let Err(e) = agent_clone if let Err(e) = agent_clone
.run_pr_review(&repo_id, pr_number, &base_sha, &head_sha) .run_pr_review(&tenant_id, &repo_id, pr_number, &base_sha, &head_sha)
.await .await
{ {
tracing::error!("PR review failed for #{pr_number}: {e}"); tracing::error!("PR review failed for #{pr_number}: {e}");
+23 -9
View File
@@ -14,24 +14,30 @@ type HmacSha256 = Hmac<Sha256>;
pub async fn handle_github_webhook( pub async fn handle_github_webhook(
Extension(agent): Extension<Arc<ComplianceAgent>>, Extension(agent): Extension<Arc<ComplianceAgent>>,
Path(repo_id): Path<String>, Path((tenant_id, repo_id)): Path<(String, String)>,
headers: HeaderMap, headers: HeaderMap,
body: Bytes, body: Bytes,
) -> StatusCode { ) -> StatusCode {
// Look up the repo to get its webhook secret // Look up the repo in the tenant's database to get its webhook secret
let oid = match mongodb::bson::oid::ObjectId::parse_str(&repo_id) { let oid = match mongodb::bson::oid::ObjectId::parse_str(&repo_id) {
Ok(oid) => oid, Ok(oid) => oid,
Err(_) => return StatusCode::NOT_FOUND, Err(_) => return StatusCode::NOT_FOUND,
}; };
let repo = match agent let db = match agent.db_pool.for_tenant_id(&tenant_id).await {
.db Ok(db) => db,
Err(e) => {
tracing::warn!("GitHub webhook: cannot open tenant database '{tenant_id}': {e}");
return StatusCode::NOT_FOUND;
}
};
let repo = match db
.repositories() .repositories()
.find_one(mongodb::bson::doc! { "_id": oid }) .find_one(mongodb::bson::doc! { "_id": oid })
.await .await
{ {
Ok(Some(repo)) => repo, Ok(Some(repo)) => repo,
_ => { _ => {
tracing::warn!("GitHub webhook: repo {repo_id} not found"); tracing::warn!("GitHub webhook: repo {repo_id} not found in tenant '{tenant_id}'");
return StatusCode::NOT_FOUND; return StatusCode::NOT_FOUND;
} }
}; };
@@ -66,15 +72,21 @@ pub async fn handle_github_webhook(
"push" => { "push" => {
let agent_clone = (*agent).clone(); let agent_clone = (*agent).clone();
let repo_id = repo_id.clone(); let repo_id = repo_id.clone();
let tenant_id = tenant_id.clone();
tokio::spawn(async move { tokio::spawn(async move {
tracing::info!("GitHub push webhook: triggering scan for {repo_id}"); tracing::info!(
if let Err(e) = agent_clone.run_scan(&repo_id, ScanTrigger::Webhook).await { "GitHub push webhook: triggering scan for {repo_id} in tenant {tenant_id}"
);
if let Err(e) = agent_clone
.run_scan(&tenant_id, &repo_id, ScanTrigger::Webhook)
.await
{
tracing::error!("Webhook-triggered scan failed: {e}"); tracing::error!("Webhook-triggered scan failed: {e}");
} }
}); });
StatusCode::OK StatusCode::OK
} }
"pull_request" => handle_pull_request(agent, &repo_id, &payload).await, "pull_request" => handle_pull_request(agent, &tenant_id, &repo_id, &payload).await,
_ => { _ => {
tracing::debug!("GitHub webhook: ignoring event '{event}'"); tracing::debug!("GitHub webhook: ignoring event '{event}'");
StatusCode::OK StatusCode::OK
@@ -84,6 +96,7 @@ pub async fn handle_github_webhook(
async fn handle_pull_request( async fn handle_pull_request(
agent: Arc<ComplianceAgent>, agent: Arc<ComplianceAgent>,
tenant_id: &str,
repo_id: &str, repo_id: &str,
payload: &serde_json::Value, payload: &serde_json::Value,
) -> StatusCode { ) -> StatusCode {
@@ -105,13 +118,14 @@ async fn handle_pull_request(
} }
let repo_id = repo_id.to_string(); let repo_id = repo_id.to_string();
let tenant_id = tenant_id.to_string();
let head_sha = head_sha.to_string(); let head_sha = head_sha.to_string();
let base_sha = base_sha.to_string(); let base_sha = base_sha.to_string();
let agent_clone = (*agent).clone(); let agent_clone = (*agent).clone();
tokio::spawn(async move { tokio::spawn(async move {
tracing::info!("GitHub PR webhook: reviewing PR #{pr_number} on {repo_id}"); tracing::info!("GitHub PR webhook: reviewing PR #{pr_number} on {repo_id}");
if let Err(e) = agent_clone if let Err(e) = agent_clone
.run_pr_review(&repo_id, pr_number, &base_sha, &head_sha) .run_pr_review(&tenant_id, &repo_id, pr_number, &base_sha, &head_sha)
.await .await
{ {
tracing::error!("PR review failed for #{pr_number}: {e}"); tracing::error!("PR review failed for #{pr_number}: {e}");
+23 -9
View File
@@ -10,24 +10,30 @@ use crate::agent::ComplianceAgent;
pub async fn handle_gitlab_webhook( pub async fn handle_gitlab_webhook(
Extension(agent): Extension<Arc<ComplianceAgent>>, Extension(agent): Extension<Arc<ComplianceAgent>>,
Path(repo_id): Path<String>, Path((tenant_id, repo_id)): Path<(String, String)>,
headers: HeaderMap, headers: HeaderMap,
body: Bytes, body: Bytes,
) -> StatusCode { ) -> StatusCode {
// Look up the repo to get its webhook secret // Look up the repo in the tenant's database to get its webhook secret
let oid = match mongodb::bson::oid::ObjectId::parse_str(&repo_id) { let oid = match mongodb::bson::oid::ObjectId::parse_str(&repo_id) {
Ok(oid) => oid, Ok(oid) => oid,
Err(_) => return StatusCode::NOT_FOUND, Err(_) => return StatusCode::NOT_FOUND,
}; };
let repo = match agent let db = match agent.db_pool.for_tenant_id(&tenant_id).await {
.db Ok(db) => db,
Err(e) => {
tracing::warn!("GitLab webhook: cannot open tenant database '{tenant_id}': {e}");
return StatusCode::NOT_FOUND;
}
};
let repo = match db
.repositories() .repositories()
.find_one(mongodb::bson::doc! { "_id": oid }) .find_one(mongodb::bson::doc! { "_id": oid })
.await .await
{ {
Ok(Some(repo)) => repo, Ok(Some(repo)) => repo,
_ => { _ => {
tracing::warn!("GitLab webhook: repo {repo_id} not found"); tracing::warn!("GitLab webhook: repo {repo_id} not found in tenant '{tenant_id}'");
return StatusCode::NOT_FOUND; return StatusCode::NOT_FOUND;
} }
}; };
@@ -59,15 +65,21 @@ pub async fn handle_gitlab_webhook(
"push" => { "push" => {
let agent_clone = (*agent).clone(); let agent_clone = (*agent).clone();
let repo_id = repo_id.clone(); let repo_id = repo_id.clone();
let tenant_id = tenant_id.clone();
tokio::spawn(async move { tokio::spawn(async move {
tracing::info!("GitLab push webhook: triggering scan for {repo_id}"); tracing::info!(
if let Err(e) = agent_clone.run_scan(&repo_id, ScanTrigger::Webhook).await { "GitLab push webhook: triggering scan for {repo_id} in tenant {tenant_id}"
);
if let Err(e) = agent_clone
.run_scan(&tenant_id, &repo_id, ScanTrigger::Webhook)
.await
{
tracing::error!("Webhook-triggered scan failed: {e}"); tracing::error!("Webhook-triggered scan failed: {e}");
} }
}); });
StatusCode::OK StatusCode::OK
} }
"merge_request" => handle_merge_request(agent, &repo_id, &payload).await, "merge_request" => handle_merge_request(agent, &tenant_id, &repo_id, &payload).await,
_ => { _ => {
tracing::debug!("GitLab webhook: ignoring event '{event_type}'"); tracing::debug!("GitLab webhook: ignoring event '{event_type}'");
StatusCode::OK StatusCode::OK
@@ -77,6 +89,7 @@ pub async fn handle_gitlab_webhook(
async fn handle_merge_request( async fn handle_merge_request(
agent: Arc<ComplianceAgent>, agent: Arc<ComplianceAgent>,
tenant_id: &str,
repo_id: &str, repo_id: &str,
payload: &serde_json::Value, payload: &serde_json::Value,
) -> StatusCode { ) -> StatusCode {
@@ -101,13 +114,14 @@ async fn handle_merge_request(
} }
let repo_id = repo_id.to_string(); let repo_id = repo_id.to_string();
let tenant_id = tenant_id.to_string();
let head_sha = head_sha.to_string(); let head_sha = head_sha.to_string();
let base_sha = base_sha.to_string(); let base_sha = base_sha.to_string();
let agent_clone = (*agent).clone(); let agent_clone = (*agent).clone();
tokio::spawn(async move { tokio::spawn(async move {
tracing::info!("GitLab MR webhook: reviewing MR !{mr_iid} on {repo_id}"); tracing::info!("GitLab MR webhook: reviewing MR !{mr_iid} on {repo_id}");
if let Err(e) = agent_clone if let Err(e) = agent_clone
.run_pr_review(&repo_id, mr_iid, &base_sha, &head_sha) .run_pr_review(&tenant_id, &repo_id, mr_iid, &base_sha, &head_sha)
.await .await
{ {
tracing::error!("MR review failed for !{mr_iid}: {e}"); tracing::error!("MR review failed for !{mr_iid}: {e}");
+8 -4
View File
@@ -9,17 +9,21 @@ use crate::webhooks::{gitea, github, gitlab};
pub async fn start_webhook_server(agent: &ComplianceAgent) -> Result<(), AgentError> { pub async fn start_webhook_server(agent: &ComplianceAgent) -> Result<(), AgentError> {
let app = Router::new() let app = Router::new()
// Per-repo webhook URLs: /webhook/{platform}/{repo_id} // Per-tenant per-repo webhook URLs: /webhook/{tenant_id}/{platform}/{repo_id}
// The tenant_id is resolved from the URL path because webhooks
// arrive without a JWT — they're authenticated via per-repo HMAC,
// not via the tenant gate. The dashboard surfaces the full URL
// including the tenant_id when the repo is registered.
.route( .route(
"/webhook/github/{repo_id}", "/webhook/{tenant_id}/github/{repo_id}",
post(github::handle_github_webhook), post(github::handle_github_webhook),
) )
.route( .route(
"/webhook/gitlab/{repo_id}", "/webhook/{tenant_id}/gitlab/{repo_id}",
post(gitlab::handle_gitlab_webhook), post(gitlab::handle_gitlab_webhook),
) )
.route( .route(
"/webhook/gitea/{repo_id}", "/webhook/{tenant_id}/gitea/{repo_id}",
post(gitea::handle_gitea_webhook), post(gitea::handle_gitea_webhook),
) )
.layer(Extension(Arc::new(agent.clone()))); .layer(Extension(Arc::new(agent.clone())));
+20 -8
View File
@@ -7,7 +7,7 @@ use std::sync::Arc;
use compliance_agent::agent::ComplianceAgent; use compliance_agent::agent::ComplianceAgent;
use compliance_agent::api; use compliance_agent::api;
use compliance_agent::database::Database; use compliance_agent::database::DatabasePool;
use compliance_core::AgentConfig; use compliance_core::AgentConfig;
use secrecy::SecretString; use secrecy::SecretString;
@@ -28,10 +28,9 @@ impl TestServer {
// Unique database name per test run to avoid collisions // Unique database name per test run to avoid collisions
let db_name = format!("test_{}", uuid::Uuid::new_v4().simple()); let db_name = format!("test_{}", uuid::Uuid::new_v4().simple());
let db = Database::connect(&mongodb_uri, &db_name) let db_pool = DatabasePool::connect(&mongodb_uri, &db_name)
.await .await
.expect("Failed to connect to MongoDB — is it running?"); .expect("Failed to build DatabasePool");
db.ensure_indexes().await.expect("Failed to create indexes");
let config = AgentConfig { let config = AgentConfig {
mongodb_uri: mongodb_uri.clone(), mongodb_uri: mongodb_uri.clone(),
@@ -69,11 +68,15 @@ impl TestServer {
pentest_imap_password: None, pentest_imap_password: None,
}; };
let agent = ComplianceAgent::new(config, db); let agent = ComplianceAgent::new(config, db_pool);
// Build the router with the agent extension // Build the router with the agent extension. After M7.2-B every
// handler takes a TenantCtx extractor; without KC in the test
// harness, the dev-tenant injector mounts a synthetic context so
// tests run end-to-end against `<db_name>_dev`.
let app = api::routes::build_router() let app = api::routes::build_router()
.layer(axum::extract::Extension(Arc::new(agent))) .layer(axum::extract::Extension(Arc::new(agent)))
.layer(axum::middleware::from_fn(api::server::inject_dev_tenant))
.layer(tower_http::cors::CorsLayer::permissive()); .layer(tower_http::cors::CorsLayer::permissive());
// Bind to port 0 to get a random available port // Bind to port 0 to get a random available port
@@ -156,10 +159,19 @@ impl TestServer {
&self.db_name &self.db_name
} }
/// Drop the test database on cleanup /// Drop every per-tenant database belonging to this test run.
/// Post-M7.2-D the agent never opens a `db_name` directly —
/// data lives only in `<db_name>_<tenant>` per-tenant databases.
pub async fn cleanup(&self) { pub async fn cleanup(&self) {
if let Ok(client) = mongodb::Client::with_uri_str(&self.mongodb_uri).await { if let Ok(client) = mongodb::Client::with_uri_str(&self.mongodb_uri).await {
client.database(&self.db_name).drop().await.ok(); if let Ok(names) = client.list_database_names().await {
let prefix = format!("{}_", self.db_name);
for name in names {
if name.starts_with(&prefix) {
client.database(&name).drop().await.ok();
}
}
}
} }
} }
} }
+298
View File
@@ -0,0 +1,298 @@
//! M7.2-A — `DatabasePool` isolation proof.
//!
//! Two `TenantContext`s, two databases, one client. Insert on A, query
//! on B → empty. Insert on B, query on A → only A's docs. Proves that
//! the per-tenant database split actually isolates at the driver level
//! and not at "we hope we filter."
//!
//! Requires MongoDB. Set `TEST_MONGODB_URI` to override the default
//! `mongodb://root:example@localhost:27017/?authSource=admin`.
#![allow(clippy::expect_used, clippy::unwrap_used)]
use compliance_agent::database::DatabasePool;
use compliance_core::models::TrackedRepository;
use compliance_core::{OrgRole, TenantContext, TenantStatus};
use mongodb::bson::doc;
fn ctx(tenant_id: &str, slug: &str) -> TenantContext {
TenantContext {
tenant_id: tenant_id.to_string(),
tenant_slug: slug.to_string(),
org_roles: vec![OrgRole::ItAdmin],
products: vec!["compliance-scanner".to_string()],
plan: "starter".to_string(),
status: TenantStatus::Active,
user_id: "u-1".to_string(),
user_name: None,
}
}
fn fixture_repo(name: &str, git_url: &str) -> TrackedRepository {
TrackedRepository {
id: None,
name: name.to_string(),
git_url: git_url.to_string(),
default_branch: "main".to_string(),
local_path: None,
scan_schedule: None,
webhook_enabled: false,
webhook_secret: None,
tracker_type: None,
tracker_owner: None,
tracker_repo: None,
tracker_token: None,
auth_token: None,
auth_username: None,
last_scanned_commit: None,
findings_count: 0,
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
}
}
#[tokio::test]
async fn pool_isolates_tenants_at_driver_level() {
let uri = std::env::var("TEST_MONGODB_URI")
.unwrap_or_else(|_| "mongodb://root:example@localhost:27017/?authSource=admin".into());
// Unique per run so parallel test invocations don't collide. Kept
// short because Mongo caps db names at 63 bytes (prefix + tenant_id).
let prefix = format!("m72a_{}", short_id());
let pool = DatabasePool::connect(&uri, &prefix)
.await
.expect("Failed to connect to MongoDB — is it running?");
let acme = ctx("00000000-0000-0000-0000-00000000acme", "acme");
let globex = ctx("00000000-0000-0000-0000-0000globex000", "globex");
let acme_db = pool.for_tenant(&acme).await.expect("acme db");
let globex_db = pool.for_tenant(&globex).await.expect("globex db");
// Write distinct repos into each tenant's database.
acme_db
.repositories()
.insert_one(fixture_repo("acme-app", "git@example.com:acme/app.git"))
.await
.expect("insert acme");
globex_db
.repositories()
.insert_one(fixture_repo(
"globex-platform",
"git@example.com:globex/platform.git",
))
.await
.expect("insert globex");
// The point of the whole exercise: acme can ONLY see acme's repo
// and globex can ONLY see globex's, with no filter doc anywhere
// because the isolation is at the database handle, not in the query.
let acme_seen = collect(&acme_db).await;
let globex_seen = collect(&globex_db).await;
assert_eq!(acme_seen.len(), 1, "acme should see exactly its own repo");
assert_eq!(acme_seen[0].name, "acme-app");
assert_eq!(
globex_seen.len(),
1,
"globex should see exactly its own repo"
);
assert_eq!(globex_seen[0].name, "globex-platform");
// Sanity: the two databases really are different by name.
let acme_db_name = pool.tenant_db_name(&acme.tenant_id);
let globex_db_name = pool.tenant_db_name(&globex.tenant_id);
assert_ne!(acme_db_name, globex_db_name);
assert!(acme_db_name.starts_with(&prefix));
// Cleanup — drop both per-tenant databases.
pool.client()
.database(&acme_db_name)
.drop()
.await
.expect("drop acme");
pool.client()
.database(&globex_db_name)
.drop()
.await
.expect("drop globex");
}
#[tokio::test]
async fn for_tenant_is_idempotent_index_creation() {
let uri = std::env::var("TEST_MONGODB_URI")
.unwrap_or_else(|_| "mongodb://root:example@localhost:27017/?authSource=admin".into());
let prefix = format!("m72a_{}", short_id());
let pool = DatabasePool::connect(&uri, &prefix).await.expect("connect");
let acme = ctx("00000000-0000-0000-0000-00000000acme", "acme");
// Second call must not fail (ensure_indexes already ran, in-memory
// marker is set, Mongo's createIndex is idempotent by name anyway).
let _ = pool.for_tenant(&acme).await.expect("first call");
let _ = pool.for_tenant(&acme).await.expect("second call");
let _ = pool.for_tenant(&acme).await.expect("third call");
// Cleanup
let db_name = pool.tenant_db_name(&acme.tenant_id);
pool.client().database(&db_name).drop().await.expect("drop");
}
#[tokio::test]
async fn tenant_db_name_sanitizes_unsafe_characters() {
let uri = std::env::var("TEST_MONGODB_URI")
.unwrap_or_else(|_| "mongodb://root:example@localhost:27017/?authSource=admin".into());
let pool = DatabasePool::connect(&uri, "m72a_sanitize")
.await
.expect("connect");
// Mongo db names cannot contain `/ \ . " $ <space> NUL`. The pool
// must rewrite these without exploding on connect.
let funky = "te/n.a\\nt$id\" with spaces";
let name = pool.tenant_db_name(funky);
for c in ['/', '\\', '.', '"', '$', ' '] {
assert!(
!name.contains(c),
"sanitized db name still contains {c:?}: {name}"
);
}
}
#[tokio::test]
async fn admin_helpers_list_and_drop_tenant_dbs() {
let uri = std::env::var("TEST_MONGODB_URI")
.unwrap_or_else(|_| "mongodb://root:example@localhost:27017/?authSource=admin".into());
let prefix = format!("m72d_{}", short_id());
let pool = DatabasePool::connect(&uri, &prefix).await.expect("connect");
let acme = ctx("00000000-0000-0000-0000-00000000acme", "acme");
let globex = ctx("00000000-0000-0000-0000-0000globex000", "globex");
// Provision two tenants and write a doc into each so the databases
// actually materialize on the cluster (Mongo lazily creates DBs).
let acme_db = pool.for_tenant(&acme).await.expect("acme db");
let globex_db = pool.for_tenant(&globex).await.expect("globex db");
acme_db
.repositories()
.insert_one(fixture_repo("acme-app", "git@example.com:acme/app.git"))
.await
.expect("insert acme");
globex_db
.repositories()
.insert_one(fixture_repo("globex-app", "git@example.com:globex/app.git"))
.await
.expect("insert globex");
// list_tenant_db_names sees both, filtered by prefix
let names = pool.list_tenant_db_names().await.expect("list tenants");
let acme_name = pool.tenant_db_name(&acme.tenant_id);
let globex_name = pool.tenant_db_name(&globex.tenant_id);
assert!(
names.contains(&acme_name),
"expected {acme_name} in {names:?}"
);
assert!(
names.contains(&globex_name),
"expected {globex_name} in {names:?}"
);
for name in &names {
assert!(name.starts_with(&format!("{prefix}_")));
}
// drop_tenant removes acme's DB
pool.drop_tenant(&acme.tenant_id)
.await
.expect("drop acme tenant");
let after = pool
.list_tenant_db_names()
.await
.expect("list tenants after drop");
assert!(
!after.contains(&acme_name),
"acme should be gone after drop, got {after:?}"
);
assert!(
after.contains(&globex_name),
"globex should still be present, got {after:?}"
);
// Cleanup remaining
pool.drop_tenant(&globex.tenant_id)
.await
.expect("drop globex tenant");
}
#[tokio::test]
async fn tenant_db_name_falls_back_to_hash_when_too_long() {
let uri = std::env::var("TEST_MONGODB_URI")
.unwrap_or_else(|_| "mongodb://root:example@localhost:27017/?authSource=admin".into());
let pool = DatabasePool::connect(&uri, "m72a_long")
.await
.expect("connect");
// 100-byte tenant_id would overflow the 63-byte db-name cap with
// any reasonable prefix. The pool must hash it down.
let huge = "x".repeat(100);
let name = pool.tenant_db_name(&huge);
assert!(name.len() <= 63, "hashed name should fit: {name}");
assert!(name.starts_with("m72a_long_"));
// The hash suffix is 32 hex chars (16-byte SHA-256 truncation).
let suffix = name.trim_start_matches("m72a_long_");
assert_eq!(
suffix.len(),
32,
"expected 32-hex suffix (16-byte hash), got {suffix:?}"
);
assert!(suffix.chars().all(|c| c.is_ascii_hexdigit()));
// Stable: same input → same output.
assert_eq!(name, pool.tenant_db_name(&huge));
// Different inputs → different outputs (collision check on a tiny
// sample — full birthday-resistance is a proof not a test).
let huge2 = "y".repeat(100);
assert_ne!(pool.tenant_db_name(&huge), pool.tenant_db_name(&huge2));
}
#[tokio::test]
async fn connect_rejects_overlong_db_prefix() {
let uri = std::env::var("TEST_MONGODB_URI")
.unwrap_or_else(|_| "mongodb://root:example@localhost:27017/?authSource=admin".into());
// MAX_PREFIX_LEN is 30 (= 63 - 1 - 32). A 31-char prefix MUST be
// rejected at construction so the hash-fallback path can never
// produce an over-long db name at runtime.
let too_long = "a".repeat(31);
let err = DatabasePool::connect(&uri, &too_long).await.unwrap_err();
let msg = format!("{err}");
assert!(
msg.contains("max is 30") || msg.contains(&too_long),
"error should explain the cap: {msg}"
);
// Exactly 30 chars is the inclusive bound — must succeed.
let just_right = "a".repeat(30);
let _ = DatabasePool::connect(&uri, &just_right)
.await
.expect("30-char prefix should be accepted");
}
/// Short UUID slug for keeping test prefixes well under Mongo's 63-byte
/// db-name cap.
fn short_id() -> String {
uuid::Uuid::new_v4().simple().to_string()[..8].to_string()
}
/// Drain a `repositories` find cursor on the given tenant database.
async fn collect(db: &compliance_agent::database::Database) -> Vec<TrackedRepository> {
let mut cursor = db
.repositories()
.find(doc! {})
.await
.expect("find repositories");
let mut out = Vec::new();
while cursor.advance().await.expect("advance") {
out.push(cursor.deserialize_current().expect("deserialize"));
}
out
}
@@ -0,0 +1,61 @@
//! Authenticated HTTP client for talking to the compliance-agent.
//!
//! Every dashboard server function that hits `comp-dev.meghsakha.com/api/v1/*`
//! must go through here so the Keycloak access token from the user's
//! session is attached as `Authorization: Bearer <token>`. Without it
//! the agent's M7.1 `require_jwt_auth` middleware rejects with 401
//! "Missing authorization header".
//!
//! When Keycloak is not configured (dev convenience), the helper
//! returns an unauthenticated builder — matching the agent's
//! pass-through behavior in the same state.
use dioxus::prelude::ServerFnError;
use dioxus_fullstack::FullstackContext;
use reqwest::Method;
use super::auth::LOGGED_IN_USER_SESS_KEY;
use super::server_state::ServerState;
use super::user_state::UserStateInner;
/// Build a `RequestBuilder` for `<agent_api_url><path>` with the
/// session's access token attached. `path` should include a leading
/// `/`, e.g. `"/api/v1/repositories"`.
pub async fn agent_request(
method: Method,
path: &str,
) -> Result<reqwest::RequestBuilder, ServerFnError> {
let state: ServerState = FullstackContext::extract().await?;
let url = format!("{}{}", state.agent_api_url, path);
let mut req = reqwest::Client::new().request(method, &url);
req = attach_token(req, &state).await?;
Ok(req)
}
/// Same as [`agent_request`] but for `GET`. Convenience for the common case.
pub async fn agent_get(path: &str) -> Result<reqwest::RequestBuilder, ServerFnError> {
agent_request(Method::GET, path).await
}
/// Attach the session's bearer token if Keycloak is configured AND the
/// session has a logged-in user. Otherwise leave the request as-is.
///
/// The Keycloak-disabled path mirrors the dashboard's `require_auth`
/// middleware, which short-circuits when `state.keycloak.is_none()`.
async fn attach_token(
req: reqwest::RequestBuilder,
state: &ServerState,
) -> Result<reqwest::RequestBuilder, ServerFnError> {
if state.keycloak.is_none() {
return Ok(req);
}
let session: tower_sessions::Session = FullstackContext::extract().await?;
let user: Option<UserStateInner> = session
.get(LOGGED_IN_USER_SESS_KEY)
.await
.map_err(|e| ServerFnError::new(format!("session read failed: {e}")))?;
Ok(match user {
Some(u) => req.bearer_auth(u.access_token),
None => req,
})
}
+26 -35
View File
@@ -61,23 +61,21 @@ pub async fn send_chat_message(
message: String, message: String,
history: Vec<ChatHistoryMessage>, history: Vec<ChatHistoryMessage>,
) -> Result<ChatApiResponse, ServerFnError> { ) -> Result<ChatApiResponse, ServerFnError> {
let state: super::server_state::ServerState = // Chat uses a longer timeout because the LLM round-trip can be slow;
dioxus_fullstack::FullstackContext::extract().await?; // agent_request doesn't expose a per-call timeout so we layer one on.
let resp = super::agent_client::agent_request(
let url = format!("{}/api/v1/chat/{repo_id}", state.agent_api_url); reqwest::Method::POST,
let client = reqwest::Client::builder() &format!("/api/v1/chat/{repo_id}"),
.timeout(std::time::Duration::from_secs(120)) )
.build() .await?
.map_err(|e| ServerFnError::new(e.to_string()))?; .timeout(std::time::Duration::from_secs(120))
let resp = client .json(&serde_json::json!({
.post(&url) "message": message,
.json(&serde_json::json!({ "history": history,
"message": message, }))
"history": history, .send()
})) .await
.send() .map_err(|e| ServerFnError::new(format!("Request failed: {e}")))?;
.await
.map_err(|e| ServerFnError::new(format!("Request failed: {e}")))?;
let text = resp let text = resp
.text() .text()
@@ -91,19 +89,14 @@ pub async fn send_chat_message(
#[server] #[server]
pub async fn trigger_embedding_build(repo_id: String) -> Result<(), ServerFnError> { pub async fn trigger_embedding_build(repo_id: String) -> Result<(), ServerFnError> {
let state: super::server_state::ServerState = super::agent_client::agent_request(
dioxus_fullstack::FullstackContext::extract().await?; reqwest::Method::POST,
&format!("/api/v1/chat/{repo_id}/build-embeddings"),
let url = format!( )
"{}/api/v1/chat/{repo_id}/build-embeddings", .await?
state.agent_api_url .send()
); .await
let client = reqwest::Client::new(); .map_err(|e| ServerFnError::new(e.to_string()))?;
client
.post(&url)
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
Ok(()) Ok(())
} }
@@ -111,11 +104,9 @@ pub async fn trigger_embedding_build(repo_id: String) -> Result<(), ServerFnErro
pub async fn fetch_embedding_status( pub async fn fetch_embedding_status(
repo_id: String, repo_id: String,
) -> Result<EmbeddingStatusResponse, ServerFnError> { ) -> Result<EmbeddingStatusResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get(&format!("/api/v1/chat/{repo_id}/status"))
dioxus_fullstack::FullstackContext::extract().await?; .await?
.send()
let url = format!("{}/api/v1/chat/{repo_id}/status", state.agent_api_url);
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let body: EmbeddingStatusResponse = resp let body: EmbeddingStatusResponse = resp
+22 -34
View File
@@ -26,10 +26,9 @@ pub struct DastFindingDetailResponse {
#[server] #[server]
pub async fn fetch_dast_targets() -> Result<DastTargetsResponse, ServerFnError> { pub async fn fetch_dast_targets() -> Result<DastTargetsResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get("/api/v1/dast/targets")
dioxus_fullstack::FullstackContext::extract().await?; .await?
let url = format!("{}/api/v1/dast/targets", state.agent_api_url); .send()
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let body: DastTargetsResponse = resp let body: DastTargetsResponse = resp
@@ -41,10 +40,9 @@ pub async fn fetch_dast_targets() -> Result<DastTargetsResponse, ServerFnError>
#[server] #[server]
pub async fn fetch_dast_scan_runs() -> Result<DastScanRunsResponse, ServerFnError> { pub async fn fetch_dast_scan_runs() -> Result<DastScanRunsResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get("/api/v1/dast/scan-runs")
dioxus_fullstack::FullstackContext::extract().await?; .await?
let url = format!("{}/api/v1/dast/scan-runs", state.agent_api_url); .send()
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let body: DastScanRunsResponse = resp let body: DastScanRunsResponse = resp
@@ -56,10 +54,9 @@ pub async fn fetch_dast_scan_runs() -> Result<DastScanRunsResponse, ServerFnErro
#[server] #[server]
pub async fn fetch_dast_findings() -> Result<DastFindingsResponse, ServerFnError> { pub async fn fetch_dast_findings() -> Result<DastFindingsResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get("/api/v1/dast/findings")
dioxus_fullstack::FullstackContext::extract().await?; .await?
let url = format!("{}/api/v1/dast/findings", state.agent_api_url); .send()
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let body: DastFindingsResponse = resp let body: DastFindingsResponse = resp
@@ -73,10 +70,9 @@ pub async fn fetch_dast_findings() -> Result<DastFindingsResponse, ServerFnError
pub async fn fetch_dast_finding_detail( pub async fn fetch_dast_finding_detail(
id: String, id: String,
) -> Result<DastFindingDetailResponse, ServerFnError> { ) -> Result<DastFindingDetailResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get(&format!("/api/v1/dast/findings/{id}"))
dioxus_fullstack::FullstackContext::extract().await?; .await?
let url = format!("{}/api/v1/dast/findings/{id}", state.agent_api_url); .send()
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let body: DastFindingDetailResponse = resp let body: DastFindingDetailResponse = resp
@@ -88,12 +84,8 @@ pub async fn fetch_dast_finding_detail(
#[server] #[server]
pub async fn add_dast_target(name: String, base_url: String) -> Result<(), ServerFnError> { pub async fn add_dast_target(name: String, base_url: String) -> Result<(), ServerFnError> {
let state: super::server_state::ServerState = super::agent_client::agent_request(reqwest::Method::POST, "/api/v1/dast/targets")
dioxus_fullstack::FullstackContext::extract().await?; .await?
let url = format!("{}/api/v1/dast/targets", state.agent_api_url);
let client = reqwest::Client::new();
client
.post(&url)
.json(&serde_json::json!({ .json(&serde_json::json!({
"name": name, "name": name,
"base_url": base_url, "base_url": base_url,
@@ -106,17 +98,13 @@ pub async fn add_dast_target(name: String, base_url: String) -> Result<(), Serve
#[server] #[server]
pub async fn trigger_dast_scan(target_id: String) -> Result<(), ServerFnError> { pub async fn trigger_dast_scan(target_id: String) -> Result<(), ServerFnError> {
let state: super::server_state::ServerState = super::agent_client::agent_request(
dioxus_fullstack::FullstackContext::extract().await?; reqwest::Method::POST,
let url = format!( &format!("/api/v1/dast/targets/{target_id}/scan"),
"{}/api/v1/dast/targets/{target_id}/scan", )
state.agent_api_url .await?
); .send()
let client = reqwest::Client::new(); .await
client .map_err(|e| ServerFnError::new(e.to_string()))?;
.post(&url)
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
Ok(()) Ok(())
} }
@@ -24,39 +24,35 @@ pub struct FindingsQuery {
#[server] #[server]
pub async fn fetch_findings(query: FindingsQuery) -> Result<FindingsListResponse, ServerFnError> { pub async fn fetch_findings(query: FindingsQuery) -> Result<FindingsListResponse, ServerFnError> {
let state: super::server_state::ServerState = let mut path = format!("/api/v1/findings?page={}&limit=20", query.page);
dioxus_fullstack::FullstackContext::extract().await?;
let mut url = format!(
"{}/api/v1/findings?page={}&limit=20",
state.agent_api_url, query.page
);
if !query.severity.is_empty() { if !query.severity.is_empty() {
url.push_str(&format!("&severity={}", query.severity)); path.push_str(&format!("&severity={}", query.severity));
} }
if !query.scan_type.is_empty() { if !query.scan_type.is_empty() {
url.push_str(&format!("&scan_type={}", query.scan_type)); path.push_str(&format!("&scan_type={}", query.scan_type));
} }
if !query.status.is_empty() { if !query.status.is_empty() {
url.push_str(&format!("&status={}", query.status)); path.push_str(&format!("&status={}", query.status));
} }
if !query.repo_id.is_empty() { if !query.repo_id.is_empty() {
url.push_str(&format!("&repo_id={}", query.repo_id)); path.push_str(&format!("&repo_id={}", query.repo_id));
} }
if !query.q.is_empty() { if !query.q.is_empty() {
url.push_str(&format!( path.push_str(&format!(
"&q={}", "&q={}",
url::form_urlencoded::byte_serialize(query.q.as_bytes()).collect::<String>() url::form_urlencoded::byte_serialize(query.q.as_bytes()).collect::<String>()
)); ));
} }
if !query.sort_by.is_empty() { if !query.sort_by.is_empty() {
url.push_str(&format!("&sort_by={}", query.sort_by)); path.push_str(&format!("&sort_by={}", query.sort_by));
} }
if !query.sort_order.is_empty() { if !query.sort_order.is_empty() {
url.push_str(&format!("&sort_order={}", query.sort_order)); path.push_str(&format!("&sort_order={}", query.sort_order));
} }
let resp = reqwest::get(&url) let resp = super::agent_client::agent_get(&path)
.await?
.send()
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let body: FindingsListResponse = resp let body: FindingsListResponse = resp
@@ -68,11 +64,9 @@ pub async fn fetch_findings(query: FindingsQuery) -> Result<FindingsListResponse
#[server] #[server]
pub async fn fetch_finding_detail(id: String) -> Result<Finding, ServerFnError> { pub async fn fetch_finding_detail(id: String) -> Result<Finding, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get(&format!("/api/v1/findings/{id}"))
dioxus_fullstack::FullstackContext::extract().await?; .await?
let url = format!("{}/api/v1/findings/{id}", state.agent_api_url); .send()
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let body: serde_json::Value = resp let body: serde_json::Value = resp
@@ -86,18 +80,15 @@ pub async fn fetch_finding_detail(id: String) -> Result<Finding, ServerFnError>
#[server] #[server]
pub async fn update_finding_status(id: String, status: String) -> Result<(), ServerFnError> { pub async fn update_finding_status(id: String, status: String) -> Result<(), ServerFnError> {
let state: super::server_state::ServerState = super::agent_client::agent_request(
dioxus_fullstack::FullstackContext::extract().await?; reqwest::Method::PATCH,
let url = format!("{}/api/v1/findings/{id}/status", state.agent_api_url); &format!("/api/v1/findings/{id}/status"),
)
let client = reqwest::Client::new(); .await?
client .json(&serde_json::json!({ "status": status }))
.patch(&url) .send()
.json(&serde_json::json!({ "status": status })) .await
.send() .map_err(|e| ServerFnError::new(e.to_string()))?;
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
Ok(()) Ok(())
} }
@@ -106,34 +97,25 @@ pub async fn bulk_update_finding_status(
ids: Vec<String>, ids: Vec<String>,
status: String, status: String,
) -> Result<(), ServerFnError> { ) -> Result<(), ServerFnError> {
let state: super::server_state::ServerState = super::agent_client::agent_request(reqwest::Method::PATCH, "/api/v1/findings/bulk-status")
dioxus_fullstack::FullstackContext::extract().await?; .await?
let url = format!("{}/api/v1/findings/bulk-status", state.agent_api_url);
let client = reqwest::Client::new();
client
.patch(&url)
.json(&serde_json::json!({ "ids": ids, "status": status })) .json(&serde_json::json!({ "ids": ids, "status": status }))
.send() .send()
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
Ok(()) Ok(())
} }
#[server] #[server]
pub async fn update_finding_feedback(id: String, feedback: String) -> Result<(), ServerFnError> { pub async fn update_finding_feedback(id: String, feedback: String) -> Result<(), ServerFnError> {
let state: super::server_state::ServerState = super::agent_client::agent_request(
dioxus_fullstack::FullstackContext::extract().await?; reqwest::Method::PATCH,
let url = format!("{}/api/v1/findings/{id}/feedback", state.agent_api_url); &format!("/api/v1/findings/{id}/feedback"),
)
let client = reqwest::Client::new(); .await?
client .json(&serde_json::json!({ "feedback": feedback }))
.patch(&url) .send()
.json(&serde_json::json!({ "feedback": feedback })) .await
.send() .map_err(|e| ServerFnError::new(e.to_string()))?;
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
Ok(()) Ok(())
} }
@@ -50,10 +50,9 @@ pub struct SearchResponse {
#[server] #[server]
pub async fn fetch_graph(repo_id: String) -> Result<GraphDataResponse, ServerFnError> { pub async fn fetch_graph(repo_id: String) -> Result<GraphDataResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get(&format!("/api/v1/graph/{repo_id}"))
dioxus_fullstack::FullstackContext::extract().await?; .await?
let url = format!("{}/api/v1/graph/{repo_id}", state.agent_api_url); .send()
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let body: GraphDataResponse = resp let body: GraphDataResponse = resp
@@ -68,15 +67,12 @@ pub async fn fetch_impact(
repo_id: String, repo_id: String,
finding_id: String, finding_id: String,
) -> Result<ImpactResponse, ServerFnError> { ) -> Result<ImpactResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp =
dioxus_fullstack::FullstackContext::extract().await?; super::agent_client::agent_get(&format!("/api/v1/graph/{repo_id}/impact/{finding_id}"))
let url = format!( .await?
"{}/api/v1/graph/{repo_id}/impact/{finding_id}", .send()
state.agent_api_url .await
); .map_err(|e| ServerFnError::new(e.to_string()))?;
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: ImpactResponse = resp let body: ImpactResponse = resp
.json() .json()
.await .await
@@ -86,10 +82,9 @@ pub async fn fetch_impact(
#[server] #[server]
pub async fn fetch_communities(repo_id: String) -> Result<CommunitiesResponse, ServerFnError> { pub async fn fetch_communities(repo_id: String) -> Result<CommunitiesResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get(&format!("/api/v1/graph/{repo_id}/communities"))
dioxus_fullstack::FullstackContext::extract().await?; .await?
let url = format!("{}/api/v1/graph/{repo_id}/communities", state.agent_api_url); .send()
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let body: CommunitiesResponse = resp let body: CommunitiesResponse = resp
@@ -104,15 +99,13 @@ pub async fn fetch_file_content(
repo_id: String, repo_id: String,
file_path: String, file_path: String,
) -> Result<FileContentResponse, ServerFnError> { ) -> Result<FileContentResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get(&format!(
dioxus_fullstack::FullstackContext::extract().await?; "/api/v1/graph/{repo_id}/file-content?path={file_path}"
let url = format!( ))
"{}/api/v1/graph/{repo_id}/file-content?path={file_path}", .await?
state.agent_api_url .send()
); .await
let resp = reqwest::get(&url) .map_err(|e| ServerFnError::new(e.to_string()))?;
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: FileContentResponse = resp let body: FileContentResponse = resp
.json() .json()
.await .await
@@ -122,15 +115,13 @@ pub async fn fetch_file_content(
#[server] #[server]
pub async fn search_nodes(repo_id: String, query: String) -> Result<SearchResponse, ServerFnError> { pub async fn search_nodes(repo_id: String, query: String) -> Result<SearchResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get(&format!(
dioxus_fullstack::FullstackContext::extract().await?; "/api/v1/graph/{repo_id}/search?q={query}&limit=50"
let url = format!( ))
"{}/api/v1/graph/{repo_id}/search?q={query}&limit=50", .await?
state.agent_api_url .send()
); .await
let resp = reqwest::get(&url) .map_err(|e| ServerFnError::new(e.to_string()))?;
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: SearchResponse = resp let body: SearchResponse = resp
.json() .json()
.await .await
@@ -140,14 +131,13 @@ pub async fn search_nodes(repo_id: String, query: String) -> Result<SearchRespon
#[server] #[server]
pub async fn trigger_graph_build(repo_id: String) -> Result<(), ServerFnError> { pub async fn trigger_graph_build(repo_id: String) -> Result<(), ServerFnError> {
let state: super::server_state::ServerState = super::agent_client::agent_request(
dioxus_fullstack::FullstackContext::extract().await?; reqwest::Method::POST,
let url = format!("{}/api/v1/graph/{repo_id}/build", state.agent_api_url); &format!("/api/v1/graph/{repo_id}/build"),
let client = reqwest::Client::new(); )
client .await?
.post(&url) .send()
.send() .await
.await .map_err(|e| ServerFnError::new(e.to_string()))?;
.map_err(|e| ServerFnError::new(e.to_string()))?;
Ok(()) Ok(())
} }
@@ -12,11 +12,9 @@ pub struct IssuesListResponse {
#[server] #[server]
pub async fn fetch_issues(page: u64) -> Result<IssuesListResponse, ServerFnError> { pub async fn fetch_issues(page: u64) -> Result<IssuesListResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get(&format!("/api/v1/issues?page={page}&limit=20"))
dioxus_fullstack::FullstackContext::extract().await?; .await?
let url = format!("{}/api/v1/issues?page={page}&limit=20", state.agent_api_url); .send()
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let body: IssuesListResponse = resp let body: IssuesListResponse = resp
@@ -18,6 +18,8 @@ pub mod stats;
// Server-only modules // Server-only modules
#[cfg(feature = "server")] #[cfg(feature = "server")]
mod agent_client;
#[cfg(feature = "server")]
mod auth; mod auth;
#[cfg(feature = "server")] #[cfg(feature = "server")]
mod auth_middleware; mod auth_middleware;
@@ -32,11 +32,9 @@ pub struct NotificationCountResponse {
#[server] #[server]
pub async fn fetch_notification_count() -> Result<u64, ServerFnError> { pub async fn fetch_notification_count() -> Result<u64, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get("/api/v1/notifications/count")
dioxus_fullstack::FullstackContext::extract().await?; .await?
.send()
let url = format!("{}/api/v1/notifications/count", state.agent_api_url);
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let body: NotificationCountResponse = resp let body: NotificationCountResponse = resp
@@ -48,11 +46,9 @@ pub async fn fetch_notification_count() -> Result<u64, ServerFnError> {
#[server] #[server]
pub async fn fetch_notifications() -> Result<NotificationListResponse, ServerFnError> { pub async fn fetch_notifications() -> Result<NotificationListResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get("/api/v1/notifications?limit=20")
dioxus_fullstack::FullstackContext::extract().await?; .await?
.send()
let url = format!("{}/api/v1/notifications?limit=20", state.agent_api_url);
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let body: NotificationListResponse = resp let body: NotificationListResponse = resp
@@ -64,12 +60,8 @@ pub async fn fetch_notifications() -> Result<NotificationListResponse, ServerFnE
#[server] #[server]
pub async fn mark_all_notifications_read() -> Result<(), ServerFnError> { pub async fn mark_all_notifications_read() -> Result<(), ServerFnError> {
let state: super::server_state::ServerState = super::agent_client::agent_request(reqwest::Method::POST, "/api/v1/notifications/read-all")
dioxus_fullstack::FullstackContext::extract().await?; .await?
let url = format!("{}/api/v1/notifications/read-all", state.agent_api_url);
reqwest::Client::new()
.post(&url)
.send() .send()
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
@@ -78,14 +70,13 @@ pub async fn mark_all_notifications_read() -> Result<(), ServerFnError> {
#[server] #[server]
pub async fn dismiss_notification(id: String) -> Result<(), ServerFnError> { pub async fn dismiss_notification(id: String) -> Result<(), ServerFnError> {
let state: super::server_state::ServerState = super::agent_client::agent_request(
dioxus_fullstack::FullstackContext::extract().await?; reqwest::Method::PATCH,
&format!("/api/v1/notifications/{id}/dismiss"),
let url = format!("{}/api/v1/notifications/{id}/dismiss", state.agent_api_url); )
reqwest::Client::new() .await?
.patch(&url) .send()
.send() .await
.await .map_err(|e| ServerFnError::new(e.to_string()))?;
.map_err(|e| ServerFnError::new(e.to_string()))?;
Ok(()) Ok(())
} }
+145 -184
View File
@@ -32,12 +32,10 @@ pub struct AttackChainResponse {
#[server] #[server]
pub async fn fetch_pentest_sessions() -> Result<PentestSessionsResponse, ServerFnError> { pub async fn fetch_pentest_sessions() -> Result<PentestSessionsResponse, ServerFnError> {
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
// Fetch sessions // Fetch sessions
let url = format!("{}/api/v1/pentest/sessions", state.agent_api_url); let resp = super::agent_client::agent_get("/api/v1/pentest/sessions")
let resp = reqwest::get(&url) .await?
.send()
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let mut body: PentestSessionsResponse = resp let mut body: PentestSessionsResponse = resp
@@ -46,31 +44,32 @@ pub async fn fetch_pentest_sessions() -> Result<PentestSessionsResponse, ServerF
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
// Fetch DAST targets to resolve target names // Fetch DAST targets to resolve target names
let targets_url = format!("{}/api/v1/dast/targets", state.agent_api_url); if let Ok(tresp_builder) = super::agent_client::agent_get("/api/v1/dast/targets").await {
if let Ok(tresp) = reqwest::get(&targets_url).await { if let Ok(tresp) = tresp_builder.send().await {
if let Ok(tbody) = tresp.json::<serde_json::Value>().await { if let Ok(tbody) = tresp.json::<serde_json::Value>().await {
let targets = tbody.get("data").and_then(|v| v.as_array()); let targets = tbody.get("data").and_then(|v| v.as_array());
if let Some(targets) = targets { if let Some(targets) = targets {
// Build target_id -> name lookup // Build target_id -> name lookup
let target_map: std::collections::HashMap<String, String> = targets let target_map: std::collections::HashMap<String, String> = targets
.iter() .iter()
.filter_map(|t| { .filter_map(|t| {
let id = t.get("_id")?.get("$oid")?.as_str()?.to_string(); let id = t.get("_id")?.get("$oid")?.as_str()?.to_string();
let name = t.get("name")?.as_str()?.to_string(); let name = t.get("name")?.as_str()?.to_string();
Some((id, name)) Some((id, name))
}) })
.collect(); .collect();
// Enrich sessions with target_name // Enrich sessions with target_name
for session in body.data.iter_mut() { for session in body.data.iter_mut() {
if let Some(tid) = session.get("target_id").and_then(|v| v.as_str()) { if let Some(tid) = session.get("target_id").and_then(|v| v.as_str()) {
if let Some(name) = target_map.get(tid) { if let Some(name) = target_map.get(tid) {
session.as_object_mut().map(|obj| { session.as_object_mut().map(|obj| {
obj.insert( obj.insert(
"target_name".to_string(), "target_name".to_string(),
serde_json::Value::String(name.clone()), serde_json::Value::String(name.clone()),
) )
}); });
}
} }
} }
} }
@@ -83,10 +82,9 @@ pub async fn fetch_pentest_sessions() -> Result<PentestSessionsResponse, ServerF
#[server] #[server]
pub async fn fetch_pentest_session(id: String) -> Result<PentestSessionResponse, ServerFnError> { pub async fn fetch_pentest_session(id: String) -> Result<PentestSessionResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get(&format!("/api/v1/pentest/sessions/{id}"))
dioxus_fullstack::FullstackContext::extract().await?; .await?
let url = format!("{}/api/v1/pentest/sessions/{id}", state.agent_api_url); .send()
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let mut body: PentestSessionResponse = resp let mut body: PentestSessionResponse = resp
@@ -96,26 +94,27 @@ pub async fn fetch_pentest_session(id: String) -> Result<PentestSessionResponse,
// Resolve target name from targets list // Resolve target name from targets list
if let Some(tid) = body.data.get("target_id").and_then(|v| v.as_str()) { if let Some(tid) = body.data.get("target_id").and_then(|v| v.as_str()) {
let targets_url = format!("{}/api/v1/dast/targets", state.agent_api_url); if let Ok(tresp_builder) = super::agent_client::agent_get("/api/v1/dast/targets").await {
if let Ok(tresp) = reqwest::get(&targets_url).await { if let Ok(tresp) = tresp_builder.send().await {
if let Ok(tbody) = tresp.json::<serde_json::Value>().await { if let Ok(tbody) = tresp.json::<serde_json::Value>().await {
if let Some(targets) = tbody.get("data").and_then(|v| v.as_array()) { if let Some(targets) = tbody.get("data").and_then(|v| v.as_array()) {
for t in targets { for t in targets {
let t_id = t let t_id = t
.get("_id") .get("_id")
.and_then(|v| v.get("$oid")) .and_then(|v| v.get("$oid"))
.and_then(|v| v.as_str()) .and_then(|v| v.as_str())
.unwrap_or(""); .unwrap_or("");
if t_id == tid { if t_id == tid {
if let Some(name) = t.get("name").and_then(|v| v.as_str()) { if let Some(name) = t.get("name").and_then(|v| v.as_str()) {
body.data.as_object_mut().map(|obj| { body.data.as_object_mut().map(|obj| {
obj.insert( obj.insert(
"target_name".to_string(), "target_name".to_string(),
serde_json::Value::String(name.to_string()), serde_json::Value::String(name.to_string()),
) )
}); });
}
break;
} }
break;
} }
} }
} }
@@ -130,15 +129,12 @@ pub async fn fetch_pentest_session(id: String) -> Result<PentestSessionResponse,
pub async fn fetch_pentest_messages( pub async fn fetch_pentest_messages(
session_id: String, session_id: String,
) -> Result<PentestMessagesResponse, ServerFnError> { ) -> Result<PentestMessagesResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp =
dioxus_fullstack::FullstackContext::extract().await?; super::agent_client::agent_get(&format!("/api/v1/pentest/sessions/{session_id}/messages"))
let url = format!( .await?
"{}/api/v1/pentest/sessions/{session_id}/messages", .send()
state.agent_api_url .await
); .map_err(|e| ServerFnError::new(e.to_string()))?;
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: PentestMessagesResponse = resp let body: PentestMessagesResponse = resp
.json() .json()
.await .await
@@ -148,10 +144,9 @@ pub async fn fetch_pentest_messages(
#[server] #[server]
pub async fn fetch_pentest_stats() -> Result<PentestStatsResponse, ServerFnError> { pub async fn fetch_pentest_stats() -> Result<PentestStatsResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get("/api/v1/pentest/stats")
dioxus_fullstack::FullstackContext::extract().await?; .await?
let url = format!("{}/api/v1/pentest/stats", state.agent_api_url); .send()
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let body: PentestStatsResponse = resp let body: PentestStatsResponse = resp
@@ -163,15 +158,13 @@ pub async fn fetch_pentest_stats() -> Result<PentestStatsResponse, ServerFnError
#[server] #[server]
pub async fn fetch_attack_chain(session_id: String) -> Result<AttackChainResponse, ServerFnError> { pub async fn fetch_attack_chain(session_id: String) -> Result<AttackChainResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get(&format!(
dioxus_fullstack::FullstackContext::extract().await?; "/api/v1/pentest/sessions/{session_id}/attack-chain"
let url = format!( ))
"{}/api/v1/pentest/sessions/{session_id}/attack-chain", .await?
state.agent_api_url .send()
); .await
let resp = reqwest::get(&url) .map_err(|e| ServerFnError::new(e.to_string()))?;
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: AttackChainResponse = resp let body: AttackChainResponse = resp
.json() .json()
.await .await
@@ -185,20 +178,17 @@ pub async fn create_pentest_session(
strategy: String, strategy: String,
message: String, message: String,
) -> Result<PentestSessionResponse, ServerFnError> { ) -> Result<PentestSessionResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp =
dioxus_fullstack::FullstackContext::extract().await?; super::agent_client::agent_request(reqwest::Method::POST, "/api/v1/pentest/sessions")
let url = format!("{}/api/v1/pentest/sessions", state.agent_api_url); .await?
let client = reqwest::Client::new(); .json(&serde_json::json!({
let resp = client "target_id": target_id,
.post(&url) "strategy": strategy,
.json(&serde_json::json!({ "message": message,
"target_id": target_id, }))
"strategy": strategy, .send()
"message": message, .await
})) .map_err(|e| ServerFnError::new(e.to_string()))?;
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: PentestSessionResponse = resp let body: PentestSessionResponse = resp
.json() .json()
.await .await
@@ -211,18 +201,15 @@ pub async fn create_pentest_session(
pub async fn create_pentest_session_wizard( pub async fn create_pentest_session_wizard(
config_json: String, config_json: String,
) -> Result<PentestSessionResponse, ServerFnError> { ) -> Result<PentestSessionResponse, ServerFnError> {
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/pentest/sessions", state.agent_api_url);
let config: serde_json::Value = let config: serde_json::Value =
serde_json::from_str(&config_json).map_err(|e| ServerFnError::new(e.to_string()))?; serde_json::from_str(&config_json).map_err(|e| ServerFnError::new(e.to_string()))?;
let client = reqwest::Client::new(); let resp =
let resp = client super::agent_client::agent_request(reqwest::Method::POST, "/api/v1/pentest/sessions")
.post(&url) .await?
.json(&serde_json::json!({ "config": config })) .json(&serde_json::json!({ "config": config }))
.send() .send()
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
if !resp.status().is_success() { if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default(); let text = resp.text().await.unwrap_or_default();
return Err(ServerFnError::new(format!( return Err(ServerFnError::new(format!(
@@ -239,8 +226,6 @@ pub async fn create_pentest_session_wizard(
/// Look up a tracked repository by its git URL /// Look up a tracked repository by its git URL
#[server] #[server]
pub async fn lookup_repo_by_url(url: String) -> Result<serde_json::Value, ServerFnError> { pub async fn lookup_repo_by_url(url: String) -> Result<serde_json::Value, ServerFnError> {
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let encoded_url: String = url let encoded_url: String = url
.bytes() .bytes()
.flat_map(|b| { .flat_map(|b| {
@@ -251,13 +236,12 @@ pub async fn lookup_repo_by_url(url: String) -> Result<serde_json::Value, Server
} }
}) })
.collect(); .collect();
let api_url = format!( let resp =
"{}/api/v1/pentest/lookup-repo?url={}", super::agent_client::agent_get(&format!("/api/v1/pentest/lookup-repo?url={encoded_url}"))
state.agent_api_url, encoded_url .await?
); .send()
let resp = reqwest::get(&api_url) .await
.await .map_err(|e| ServerFnError::new(e.to_string()))?;
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: serde_json::Value = resp let body: serde_json::Value = resp
.json() .json()
.await .await
@@ -270,21 +254,17 @@ pub async fn send_pentest_message(
session_id: String, session_id: String,
message: String, message: String,
) -> Result<PentestMessagesResponse, ServerFnError> { ) -> Result<PentestMessagesResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_request(
dioxus_fullstack::FullstackContext::extract().await?; reqwest::Method::POST,
let url = format!( &format!("/api/v1/pentest/sessions/{session_id}/chat"),
"{}/api/v1/pentest/sessions/{session_id}/chat", )
state.agent_api_url .await?
); .json(&serde_json::json!({
let client = reqwest::Client::new(); "message": message,
let resp = client }))
.post(&url) .send()
.json(&serde_json::json!({ .await
"message": message, .map_err(|e| ServerFnError::new(e.to_string()))?;
}))
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: PentestMessagesResponse = resp let body: PentestMessagesResponse = resp
.json() .json()
.await .await
@@ -294,35 +274,27 @@ pub async fn send_pentest_message(
#[server] #[server]
pub async fn stop_pentest_session(session_id: String) -> Result<(), ServerFnError> { pub async fn stop_pentest_session(session_id: String) -> Result<(), ServerFnError> {
let state: super::server_state::ServerState = super::agent_client::agent_request(
dioxus_fullstack::FullstackContext::extract().await?; reqwest::Method::POST,
let url = format!( &format!("/api/v1/pentest/sessions/{session_id}/stop"),
"{}/api/v1/pentest/sessions/{session_id}/stop", )
state.agent_api_url .await?
); .send()
let client = reqwest::Client::new(); .await
client .map_err(|e| ServerFnError::new(e.to_string()))?;
.post(&url)
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
Ok(()) Ok(())
} }
#[server] #[server]
pub async fn pause_pentest_session(session_id: String) -> Result<(), ServerFnError> { pub async fn pause_pentest_session(session_id: String) -> Result<(), ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_request(
dioxus_fullstack::FullstackContext::extract().await?; reqwest::Method::POST,
let url = format!( &format!("/api/v1/pentest/sessions/{session_id}/pause"),
"{}/api/v1/pentest/sessions/{session_id}/pause", )
state.agent_api_url .await?
); .send()
let client = reqwest::Client::new(); .await
let resp = client .map_err(|e| ServerFnError::new(e.to_string()))?;
.post(&url)
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
if !resp.status().is_success() { if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default(); let text = resp.text().await.unwrap_or_default();
return Err(ServerFnError::new(format!("Pause failed: {text}"))); return Err(ServerFnError::new(format!("Pause failed: {text}")));
@@ -332,18 +304,14 @@ pub async fn pause_pentest_session(session_id: String) -> Result<(), ServerFnErr
#[server] #[server]
pub async fn resume_pentest_session(session_id: String) -> Result<(), ServerFnError> { pub async fn resume_pentest_session(session_id: String) -> Result<(), ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_request(
dioxus_fullstack::FullstackContext::extract().await?; reqwest::Method::POST,
let url = format!( &format!("/api/v1/pentest/sessions/{session_id}/resume"),
"{}/api/v1/pentest/sessions/{session_id}/resume", )
state.agent_api_url .await?
); .send()
let client = reqwest::Client::new(); .await
let resp = client .map_err(|e| ServerFnError::new(e.to_string()))?;
.post(&url)
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
if !resp.status().is_success() { if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default(); let text = resp.text().await.unwrap_or_default();
return Err(ServerFnError::new(format!("Resume failed: {text}"))); return Err(ServerFnError::new(format!("Resume failed: {text}")));
@@ -355,15 +323,12 @@ pub async fn resume_pentest_session(session_id: String) -> Result<(), ServerFnEr
pub async fn fetch_pentest_findings( pub async fn fetch_pentest_findings(
session_id: String, session_id: String,
) -> Result<DastFindingsResponse, ServerFnError> { ) -> Result<DastFindingsResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp =
dioxus_fullstack::FullstackContext::extract().await?; super::agent_client::agent_get(&format!("/api/v1/pentest/sessions/{session_id}/findings"))
let url = format!( .await?
"{}/api/v1/pentest/sessions/{session_id}/findings", .send()
state.agent_api_url .await
); .map_err(|e| ServerFnError::new(e.to_string()))?;
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: DastFindingsResponse = resp let body: DastFindingsResponse = resp
.json() .json()
.await .await
@@ -385,23 +350,19 @@ pub async fn export_pentest_report(
requester_name: String, requester_name: String,
requester_email: String, requester_email: String,
) -> Result<ExportReportResponse, ServerFnError> { ) -> Result<ExportReportResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_request(
dioxus_fullstack::FullstackContext::extract().await?; reqwest::Method::POST,
let url = format!( &format!("/api/v1/pentest/sessions/{session_id}/export"),
"{}/api/v1/pentest/sessions/{session_id}/export", )
state.agent_api_url .await?
); .json(&serde_json::json!({
let client = reqwest::Client::new(); "password": password,
let resp = client "requester_name": requester_name,
.post(&url) "requester_email": requester_email,
.json(&serde_json::json!({ }))
"password": password, .send()
"requester_name": requester_name, .await
"requester_email": requester_email, .map_err(|e| ServerFnError::new(e.to_string()))?;
}))
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
if !resp.status().is_success() { if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default(); let text = resp.text().await.unwrap_or_default();
return Err(ServerFnError::new(format!("Export failed: {text}"))); return Err(ServerFnError::new(format!("Export failed: {text}")));
@@ -12,14 +12,10 @@ pub struct RepositoryListResponse {
#[server] #[server]
pub async fn fetch_repositories(page: u64) -> Result<RepositoryListResponse, ServerFnError> { pub async fn fetch_repositories(page: u64) -> Result<RepositoryListResponse, ServerFnError> {
let state: super::server_state::ServerState = let path = format!("/api/v1/repositories?page={page}&limit=20");
dioxus_fullstack::FullstackContext::extract().await?; let resp = super::agent_client::agent_get(&path)
let url = format!( .await?
"{}/api/v1/repositories?page={page}&limit=20", .send()
state.agent_api_url
);
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let body: RepositoryListResponse = resp let body: RepositoryListResponse = resp
@@ -41,10 +37,6 @@ pub async fn add_repository(
tracker_repo: Option<String>, tracker_repo: Option<String>,
tracker_token: Option<String>, tracker_token: Option<String>,
) -> Result<(), ServerFnError> { ) -> Result<(), ServerFnError> {
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/repositories", state.agent_api_url);
let mut body = serde_json::json!({ let mut body = serde_json::json!({
"name": name, "name": name,
"git_url": git_url, "git_url": git_url,
@@ -69,9 +61,8 @@ pub async fn add_repository(
body["tracker_token"] = serde_json::Value::String(tk); body["tracker_token"] = serde_json::Value::String(tk);
} }
let client = reqwest::Client::new(); let resp = super::agent_client::agent_request(reqwest::Method::POST, "/api/v1/repositories")
let resp = client .await?
.post(&url)
.json(&body) .json(&body)
.send() .send()
.await .await
@@ -100,10 +91,6 @@ pub async fn update_repository(
tracker_token: Option<String>, tracker_token: Option<String>,
scan_schedule: Option<String>, scan_schedule: Option<String>,
) -> Result<(), ServerFnError> { ) -> Result<(), ServerFnError> {
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/repositories/{repo_id}", state.agent_api_url);
let mut body = serde_json::Map::new(); let mut body = serde_json::Map::new();
if let Some(v) = name.filter(|s| !s.is_empty()) { if let Some(v) = name.filter(|s| !s.is_empty()) {
body.insert("name".into(), serde_json::Value::String(v)); body.insert("name".into(), serde_json::Value::String(v));
@@ -133,13 +120,15 @@ pub async fn update_repository(
body.insert("scan_schedule".into(), serde_json::Value::String(v)); body.insert("scan_schedule".into(), serde_json::Value::String(v));
} }
let client = reqwest::Client::new(); let resp = super::agent_client::agent_request(
let resp = client reqwest::Method::PATCH,
.patch(&url) &format!("/api/v1/repositories/{repo_id}"),
.json(&body) )
.send() .await?
.await .json(&body)
.map_err(|e| ServerFnError::new(e.to_string()))?; .send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
if !resp.status().is_success() { if !resp.status().is_success() {
let text = resp.text().await.unwrap_or_default(); let text = resp.text().await.unwrap_or_default();
@@ -153,11 +142,9 @@ pub async fn update_repository(
#[server] #[server]
pub async fn fetch_ssh_public_key() -> Result<String, ServerFnError> { pub async fn fetch_ssh_public_key() -> Result<String, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get("/api/v1/settings/ssh-public-key")
dioxus_fullstack::FullstackContext::extract().await?; .await?
let url = format!("{}/api/v1/settings/ssh-public-key", state.agent_api_url); .send()
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
@@ -179,16 +166,14 @@ pub async fn fetch_ssh_public_key() -> Result<String, ServerFnError> {
#[server] #[server]
pub async fn delete_repository(repo_id: String) -> Result<(), ServerFnError> { pub async fn delete_repository(repo_id: String) -> Result<(), ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_request(
dioxus_fullstack::FullstackContext::extract().await?; reqwest::Method::DELETE,
let url = format!("{}/api/v1/repositories/{repo_id}", state.agent_api_url); &format!("/api/v1/repositories/{repo_id}"),
)
let client = reqwest::Client::new(); .await?
let resp = client .send()
.delete(&url) .await
.send() .map_err(|e| ServerFnError::new(e.to_string()))?;
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
if !resp.status().is_success() { if !resp.status().is_success() {
let body = resp.text().await.unwrap_or_default(); let body = resp.text().await.unwrap_or_default();
@@ -202,16 +187,14 @@ pub async fn delete_repository(repo_id: String) -> Result<(), ServerFnError> {
#[server] #[server]
pub async fn trigger_repo_scan(repo_id: String) -> Result<(), ServerFnError> { pub async fn trigger_repo_scan(repo_id: String) -> Result<(), ServerFnError> {
let state: super::server_state::ServerState = super::agent_client::agent_request(
dioxus_fullstack::FullstackContext::extract().await?; reqwest::Method::POST,
let url = format!("{}/api/v1/repositories/{repo_id}/scan", state.agent_api_url); &format!("/api/v1/repositories/{repo_id}/scan"),
)
let client = reqwest::Client::new(); .await?
client .send()
.post(&url) .await
.send() .map_err(|e| ServerFnError::new(e.to_string()))?;
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
Ok(()) Ok(())
} }
@@ -224,16 +207,12 @@ pub struct WebhookConfigResponse {
#[server] #[server]
pub async fn fetch_webhook_config(repo_id: String) -> Result<WebhookConfigResponse, ServerFnError> { pub async fn fetch_webhook_config(repo_id: String) -> Result<WebhookConfigResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp =
dioxus_fullstack::FullstackContext::extract().await?; super::agent_client::agent_get(&format!("/api/v1/repositories/{repo_id}/webhook-config"))
let url = format!( .await?
"{}/api/v1/repositories/{repo_id}/webhook-config", .send()
state.agent_api_url .await
); .map_err(|e| ServerFnError::new(e.to_string()))?;
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: WebhookConfigResponse = resp let body: WebhookConfigResponse = resp
.json() .json()
.await .await
@@ -244,11 +223,9 @@ pub async fn fetch_webhook_config(repo_id: String) -> Result<WebhookConfigRespon
/// Check if a repository has any running scans /// Check if a repository has any running scans
#[server] #[server]
pub async fn check_repo_scanning(repo_id: String) -> Result<bool, ServerFnError> { pub async fn check_repo_scanning(repo_id: String) -> Result<bool, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get("/api/v1/scan-runs?page=1&limit=1")
dioxus_fullstack::FullstackContext::extract().await?; .await?
let url = format!("{}/api/v1/scan-runs?page=1&limit=1", state.agent_api_url); .send()
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let body: serde_json::Value = resp let body: serde_json::Value = resp
+20 -35
View File
@@ -87,11 +87,9 @@ pub struct SbomFiltersResponse {
#[server] #[server]
pub async fn fetch_sbom_filters() -> Result<SbomFiltersResponse, ServerFnError> { pub async fn fetch_sbom_filters() -> Result<SbomFiltersResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get("/api/v1/sbom/filters")
dioxus_fullstack::FullstackContext::extract().await?; .await?
.send()
let url = format!("{}/api/v1/sbom/filters", state.agent_api_url);
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let text = resp let text = resp
@@ -112,9 +110,6 @@ pub async fn fetch_sbom_filtered(
license: Option<String>, license: Option<String>,
page: u64, page: u64,
) -> Result<SbomListResponse, ServerFnError> { ) -> Result<SbomListResponse, ServerFnError> {
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let mut params = vec![format!("page={page}"), "limit=50".to_string()]; let mut params = vec![format!("page={page}"), "limit=50".to_string()];
if let Some(r) = &repo_id { if let Some(r) = &repo_id {
if !r.is_empty() { if !r.is_empty() {
@@ -140,9 +135,10 @@ pub async fn fetch_sbom_filtered(
} }
} }
let url = format!("{}/api/v1/sbom?{}", state.agent_api_url, params.join("&")); let path = format!("/api/v1/sbom?{}", params.join("&"));
let resp = super::agent_client::agent_get(&path)
let resp = reqwest::get(&url) .await?
.send()
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let text = resp let text = resp
@@ -156,15 +152,10 @@ pub async fn fetch_sbom_filtered(
#[server] #[server]
pub async fn fetch_sbom_export(repo_id: String, format: String) -> Result<String, ServerFnError> { pub async fn fetch_sbom_export(repo_id: String, format: String) -> Result<String, ServerFnError> {
let state: super::server_state::ServerState = let path = format!("/api/v1/sbom/export?repo_id={repo_id}&format={format}");
dioxus_fullstack::FullstackContext::extract().await?; let resp = super::agent_client::agent_get(&path)
.await?
let url = format!( .send()
"{}/api/v1/sbom/export?repo_id={}&format={}",
state.agent_api_url, repo_id, format
);
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let text = resp let text = resp
@@ -178,17 +169,16 @@ pub async fn fetch_sbom_export(repo_id: String, format: String) -> Result<String
pub async fn fetch_license_summary( pub async fn fetch_license_summary(
repo_id: Option<String>, repo_id: Option<String>,
) -> Result<LicenseSummaryResponse, ServerFnError> { ) -> Result<LicenseSummaryResponse, ServerFnError> {
let state: super::server_state::ServerState = let mut path = "/api/v1/sbom/licenses".to_string();
dioxus_fullstack::FullstackContext::extract().await?;
let mut url = format!("{}/api/v1/sbom/licenses", state.agent_api_url);
if let Some(r) = &repo_id { if let Some(r) = &repo_id {
if !r.is_empty() { if !r.is_empty() {
url = format!("{url}?repo_id={r}"); path = format!("{path}?repo_id={r}");
} }
} }
let resp = reqwest::get(&url) let resp = super::agent_client::agent_get(&path)
.await?
.send()
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let text = resp let text = resp
@@ -205,15 +195,10 @@ pub async fn fetch_sbom_diff(
repo_a: String, repo_a: String,
repo_b: String, repo_b: String,
) -> Result<SbomDiffResponse, ServerFnError> { ) -> Result<SbomDiffResponse, ServerFnError> {
let state: super::server_state::ServerState = let path = format!("/api/v1/sbom/diff?repo_a={repo_a}&repo_b={repo_b}");
dioxus_fullstack::FullstackContext::extract().await?; let resp = super::agent_client::agent_get(&path)
.await?
let url = format!( .send()
"{}/api/v1/sbom/diff?repo_a={}&repo_b={}",
state.agent_api_url, repo_a, repo_b
);
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let text = resp let text = resp
@@ -12,14 +12,9 @@ pub struct ScansListResponse {
#[server] #[server]
pub async fn fetch_scan_runs(page: u64) -> Result<ScansListResponse, ServerFnError> { pub async fn fetch_scan_runs(page: u64) -> Result<ScansListResponse, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get(&format!("/api/v1/scan-runs?page={page}&limit=20"))
dioxus_fullstack::FullstackContext::extract().await?; .await?
let url = format!( .send()
"{}/api/v1/scan-runs?page={page}&limit=20",
state.agent_api_url
);
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let body: ScansListResponse = resp let body: ScansListResponse = resp
@@ -16,11 +16,9 @@ pub struct OverviewStats {
#[server] #[server]
pub async fn fetch_overview_stats() -> Result<OverviewStats, ServerFnError> { pub async fn fetch_overview_stats() -> Result<OverviewStats, ServerFnError> {
let state: super::server_state::ServerState = let resp = super::agent_client::agent_get("/api/v1/stats/overview")
dioxus_fullstack::FullstackContext::extract().await?; .await?
let url = format!("{}/api/v1/stats/overview", state.agent_api_url); .send()
let resp = reqwest::get(&url)
.await .await
.map_err(|e| ServerFnError::new(e.to_string()))?; .map_err(|e| ServerFnError::new(e.to_string()))?;
let body: serde_json::Value = resp let body: serde_json::Value = resp