Compare commits

..

1 Commits

Author SHA1 Message Date
Sharang Parnerkar f474699279 fix(core): JWKS refresh-on-failure in M7.1 auth middleware
CI / Check (pull_request) Successful in 8m17s
CI / Detect Changes (pull_request) Has been skipped
CI / Deploy Agent (pull_request) Has been skipped
CI / Deploy Dashboard (pull_request) Has been skipped
CI / Deploy Docs (pull_request) Has been skipped
CI / Deploy MCP (pull_request) Has been skipped
Without this, every Keycloak signing-key rotation produces a silent
401 storm against every request until the agent restarts — the cached
JWKS is held forever and never reconciled against KC.

Now: when `kid` isn't in the cached JWKS or the matching key fails
signature verification, we classify the failure as Stale, force a JWKS
refresh, and retry once. Anything else (expired, malformed, missing
tenant_id) is Permanent and short-circuits straight to 401.

* Splits the path into a pure `try_validate(token, header, kid, jwks)`
  helper returning a `ValidationError { Stale | Permanent }` enum.
* `fetch_or_get_jwks(state, force)` takes a force flag and holds the
  write lock across the network fetch so concurrent refreshers don't
  all hammer Keycloak when keys rotate (the second writer reuses what
  the first put in cache).
* Adds a unit test for the kid-not-found Stale classification.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-04 16:40:55 +02:00
44 changed files with 870 additions and 1556 deletions
Generated
-1
View File
@@ -687,7 +687,6 @@ dependencies = [
"tokio-cron-scheduler",
"tokio-stream",
"tokio-tungstenite 0.26.2",
"tower",
"tower-http",
"tracing",
"tracing-subscriber",
+2 -3
View File
@@ -7,7 +7,7 @@ edition = "2021"
workspace = true
[dependencies]
compliance-core = { workspace = true, features = ["mongodb", "telemetry", "axum"] }
compliance-core = { workspace = true, features = ["mongodb", "telemetry"] }
compliance-graph = { path = "../compliance-graph" }
compliance-dast = { path = "../compliance-dast" }
serde = { workspace = true }
@@ -44,8 +44,7 @@ dashmap = { workspace = true }
tokio-stream = { workspace = true }
[dev-dependencies]
compliance-core = { workspace = true, features = ["mongodb", "axum"] }
tower = { version = "0.5", features = ["util"] }
compliance-core = { workspace = true, features = ["mongodb"] }
reqwest = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
+18 -16
View File
@@ -6,7 +6,7 @@ use tokio::sync::{broadcast, watch, Semaphore};
use compliance_core::models::pentest::PentestEvent;
use compliance_core::AgentConfig;
use crate::database::DatabasePool;
use crate::database::Database;
use crate::llm::LlmClient;
use crate::pipeline::orchestrator::PipelineOrchestrator;
@@ -16,10 +16,7 @@ const DEFAULT_MAX_CONCURRENT_SESSIONS: usize = 5;
#[derive(Clone)]
pub struct ComplianceAgent {
pub config: AgentConfig,
/// Per-tenant Mongo broker. Every code path must obtain a
/// tenant-scoped [`crate::database::Database`] from this pool —
/// there is no single shared database any more.
pub db_pool: DatabasePool,
pub db: Database,
pub llm: Arc<LlmClient>,
pub http: reqwest::Client,
/// Per-session broadcast senders for SSE streaming.
@@ -31,7 +28,7 @@ pub struct ComplianceAgent {
}
impl ComplianceAgent {
pub fn new(config: AgentConfig, db_pool: DatabasePool) -> Self {
pub fn new(config: AgentConfig, db: Database) -> Self {
let llm = Arc::new(LlmClient::new(
config.litellm_url.clone(),
config.litellm_api_key.clone(),
@@ -45,7 +42,7 @@ impl ComplianceAgent {
.unwrap_or_default();
Self {
config,
db_pool,
db,
llm,
http,
session_streams: Arc::new(DashMap::new()),
@@ -56,27 +53,28 @@ impl ComplianceAgent {
pub async fn run_scan(
&self,
tenant_id: &str,
repo_id: &str,
trigger: compliance_core::models::ScanTrigger,
) -> Result<(), crate::error::AgentError> {
let db = self.db_pool.for_tenant_id(tenant_id).await?;
let orchestrator =
PipelineOrchestrator::new(self.config.clone(), db, self.llm.clone(), self.http.clone());
let orchestrator = PipelineOrchestrator::new(
self.config.clone(),
self.db.clone(),
self.llm.clone(),
self.http.clone(),
);
orchestrator.run(repo_id, trigger).await
}
/// Run a PR review: scan the diff and post review comments.
pub async fn run_pr_review(
&self,
tenant_id: &str,
repo_id: &str,
pr_number: u64,
base_sha: &str,
head_sha: &str,
) -> Result<(), crate::error::AgentError> {
let db = self.db_pool.for_tenant_id(tenant_id).await?;
let repo = db
let repo = self
.db
.repositories()
.find_one(mongodb::bson::doc! {
"_id": mongodb::bson::oid::ObjectId::parse_str(repo_id)
@@ -87,8 +85,12 @@ impl ComplianceAgent {
crate::error::AgentError::Other(format!("Repository {repo_id} not found"))
})?;
let orchestrator =
PipelineOrchestrator::new(self.config.clone(), db, self.llm.clone(), self.http.clone());
let orchestrator = PipelineOrchestrator::new(
self.config.clone(),
self.db.clone(),
self.llm.clone(),
self.http.clone(),
);
orchestrator
.run_pr_review(&repo, repo_id, pr_number, base_sha, head_sha)
.await
+113
View File
@@ -0,0 +1,113 @@
use std::sync::Arc;
use axum::{
extract::Request,
middleware::Next,
response::{IntoResponse, Response},
};
use jsonwebtoken::{decode, decode_header, jwk::JwkSet, DecodingKey, Validation};
use reqwest::StatusCode;
use serde::Deserialize;
use tokio::sync::RwLock;
/// Cached JWKS from Keycloak for token validation.
#[derive(Clone)]
pub struct JwksState {
pub jwks: Arc<RwLock<Option<JwkSet>>>,
pub jwks_url: String,
}
#[derive(Debug, Deserialize)]
struct Claims {
#[allow(dead_code)]
sub: String,
}
const PUBLIC_ENDPOINTS: &[&str] = &["/api/v1/health"];
/// Middleware that validates Bearer JWT tokens against Keycloak's JWKS.
///
/// Skips validation for health check endpoints.
/// If `JwksState` is not present as an extension (keycloak not configured),
/// all requests pass through.
pub async fn require_jwt_auth(request: Request, next: Next) -> Response {
let path = request.uri().path();
if PUBLIC_ENDPOINTS.contains(&path) {
return next.run(request).await;
}
let jwks_state = match request.extensions().get::<JwksState>() {
Some(s) => s.clone(),
None => return next.run(request).await,
};
let auth_header = match request.headers().get("authorization") {
Some(h) => h,
None => return (StatusCode::UNAUTHORIZED, "Missing authorization header").into_response(),
};
let token = match auth_header.to_str() {
Ok(s) if s.starts_with("Bearer ") => &s[7..],
_ => return (StatusCode::UNAUTHORIZED, "Invalid authorization header").into_response(),
};
match validate_token(token, &jwks_state).await {
Ok(()) => next.run(request).await,
Err(e) => {
tracing::warn!("JWT validation failed: {e}");
(StatusCode::UNAUTHORIZED, "Invalid token").into_response()
}
}
}
async fn validate_token(token: &str, state: &JwksState) -> Result<(), String> {
let header = decode_header(token).map_err(|e| format!("failed to decode JWT header: {e}"))?;
let kid = header
.kid
.ok_or_else(|| "JWT missing kid header".to_string())?;
let jwks = fetch_or_get_jwks(state).await?;
let jwk = jwks
.keys
.iter()
.find(|k| k.common.key_id.as_deref() == Some(&kid))
.ok_or_else(|| "no matching key found in JWKS".to_string())?;
let decoding_key =
DecodingKey::from_jwk(jwk).map_err(|e| format!("failed to create decoding key: {e}"))?;
let mut validation = Validation::new(header.alg);
validation.validate_exp = true;
validation.validate_aud = false;
decode::<Claims>(token, &decoding_key, &validation)
.map_err(|e| format!("token validation failed: {e}"))?;
Ok(())
}
async fn fetch_or_get_jwks(state: &JwksState) -> Result<JwkSet, String> {
{
let cached = state.jwks.read().await;
if let Some(ref jwks) = *cached {
return Ok(jwks.clone());
}
}
let resp = reqwest::get(&state.jwks_url)
.await
.map_err(|e| format!("failed to fetch JWKS: {e}"))?;
let jwks: JwkSet = resp
.json()
.await
.map_err(|e| format!("failed to parse JWKS: {e}"))?;
let mut cached = state.jwks.write().await;
*cached = Some(jwks.clone());
Ok(jwks)
}
+13 -17
View File
@@ -7,13 +7,11 @@ use mongodb::bson::doc;
use compliance_core::models::chat::{ChatRequest, ChatResponse, SourceReference};
use compliance_core::models::embedding::EmbeddingBuildRun;
use compliance_core::tenant_ctx::TenantCtx;
use compliance_graph::graph::embedding_store::EmbeddingStore;
use crate::agent::ComplianceAgent;
use crate::rag::pipeline::RagPipeline;
use super::dto::tenant_db;
use super::ApiResponse;
type AgentExt = Extension<Arc<ComplianceAgent>>;
@@ -22,12 +20,10 @@ type AgentExt = Extension<Arc<ComplianceAgent>>;
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn chat(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>,
Json(req): Json<ChatRequest>,
) -> Result<Json<ApiResponse<ChatResponse>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let pipeline = RagPipeline::new(agent.llm.clone(), db.inner());
let pipeline = RagPipeline::new(agent.llm.clone(), agent.db.inner());
// Step 1: Embed the user's message
let query_vectors = agent
@@ -137,15 +133,12 @@ pub async fn chat(
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn build_embeddings(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> {
// Resolve the tenant DB up front so we can move it into the spawn;
// the JWT/dev context isn't available inside detached tasks.
let db = tenant_db(&agent, &tenant).await?;
let agent_clone = (*agent).clone();
tokio::spawn(async move {
let repo = match db
let repo = match agent_clone
.db
.repositories()
.find_one(doc! { "_id": mongodb::bson::oid::ObjectId::parse_str(&repo_id).ok() })
.await
@@ -158,7 +151,8 @@ pub async fn build_embeddings(
};
// Get latest graph build
let build = match db
let build = match agent_clone
.db
.graph_builds()
.find_one(doc! { "repo_id": &repo_id })
.sort(doc! { "started_at": -1 })
@@ -177,8 +171,12 @@ pub async fn build_embeddings(
.unwrap_or_else(|| "unknown".to_string());
// Get nodes
let nodes: Vec<compliance_core::models::graph::CodeNode> =
match db.graph_nodes().find(doc! { "repo_id": &repo_id }).await {
let nodes: Vec<compliance_core::models::graph::CodeNode> = match agent_clone
.db
.graph_nodes()
.find(doc! { "repo_id": &repo_id })
.await
{
Ok(cursor) => {
use futures_util::StreamExt;
let mut items = Vec::new();
@@ -209,7 +207,7 @@ pub async fn build_embeddings(
}
};
let pipeline = RagPipeline::new(agent_clone.llm.clone(), db.inner());
let pipeline = RagPipeline::new(agent_clone.llm.clone(), agent_clone.db.inner());
match pipeline
.build_embeddings(&repo_id, &repo_path, &graph_build_id, &nodes)
.await
@@ -236,11 +234,9 @@ pub async fn build_embeddings(
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn embedding_status(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>,
) -> Result<Json<ApiResponse<Option<EmbeddingBuildRun>>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let store = EmbeddingStore::new(db.inner());
let store = EmbeddingStore::new(agent.db.inner());
let build = store.get_latest_build(&repo_id).await.map_err(|e| {
tracing::error!("Failed to get embedding status: {e}");
StatusCode::INTERNAL_SERVER_ERROR
+11 -20
View File
@@ -7,11 +7,9 @@ use mongodb::bson::doc;
use serde::Deserialize;
use compliance_core::models::dast::{DastFinding, DastScanRun, DastTarget, DastTargetType};
use compliance_core::tenant_ctx::TenantCtx;
use crate::agent::ComplianceAgent;
use super::dto::tenant_db;
use super::{collect_cursor_async, ApiResponse, PaginationParams};
type AgentExt = Extension<Arc<ComplianceAgent>>;
@@ -47,11 +45,9 @@ fn default_rate_limit() -> u32 {
#[tracing::instrument(skip_all)]
pub async fn list_targets(
Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<PaginationParams>,
) -> Result<Json<ApiResponse<Vec<DastTarget>>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
let skip = (params.page.saturating_sub(1)) * params.limit as u64;
let total = db
.dast_targets()
@@ -84,7 +80,6 @@ pub async fn list_targets(
#[tracing::instrument(skip_all)]
pub async fn add_target(
Extension(agent): AgentExt,
tenant: TenantCtx,
Json(req): Json<AddTargetRequest>,
) -> Result<Json<ApiResponse<DastTarget>>, StatusCode> {
let mut target = DastTarget::new(req.name, req.base_url, req.target_type);
@@ -94,8 +89,9 @@ pub async fn add_target(
target.rate_limit = req.rate_limit;
target.allow_destructive = req.allow_destructive;
let db = tenant_db(&agent, &tenant).await?;
db.dast_targets()
agent
.db
.dast_targets()
.insert_one(&target)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
@@ -111,19 +107,19 @@ pub async fn add_target(
#[tracing::instrument(skip_all, fields(target_id = %id))]
pub async fn trigger_scan(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let target = db
let target = agent
.db
.dast_targets()
.find_one(doc! { "_id": oid })
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.ok_or(StatusCode::NOT_FOUND)?;
let db = agent.db.clone();
tokio::spawn(async move {
let orchestrator = compliance_dast::DastOrchestrator::new(100);
match orchestrator.run_scan(&target, Vec::new()).await {
@@ -151,11 +147,9 @@ pub async fn trigger_scan(
#[tracing::instrument(skip_all)]
pub async fn list_scan_runs(
Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<PaginationParams>,
) -> Result<Json<ApiResponse<Vec<DastScanRun>>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
let skip = (params.page.saturating_sub(1)) * params.limit as u64;
let total = db
.dast_scan_runs()
@@ -189,11 +183,9 @@ pub async fn list_scan_runs(
#[tracing::instrument(skip_all)]
pub async fn list_findings(
Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<PaginationParams>,
) -> Result<Json<ApiResponse<Vec<DastFinding>>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
let skip = (params.page.saturating_sub(1)) * params.limit as u64;
let total = db
.dast_findings()
@@ -227,13 +219,12 @@ pub async fn list_findings(
#[tracing::instrument(skip_all, fields(finding_id = %id))]
pub async fn get_finding(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>,
) -> Result<Json<ApiResponse<DastFinding>>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let finding = db
let finding = agent
.db
.dast_findings()
.find_one(doc! { "_id": oid })
.await
-21
View File
@@ -180,27 +180,6 @@ pub struct SbomVersionDiff {
pub(crate) type AgentExt = axum::extract::Extension<std::sync::Arc<crate::agent::ComplianceAgent>>;
pub(crate) type ApiResult<T> = Result<axum::Json<ApiResponse<T>>, axum::http::StatusCode>;
/// Resolve a tenant-scoped [`Database`] from the request's
/// [`TenantContext`] (inserted by the M7.1 JWT middleware, or by the
/// dev fallback in unsecured environments). The pool ensures the
/// tenant's indexes idempotently.
///
/// Returns 500 on the rare path where Mongo refuses the database
/// handle — the M7.1 auth/status middleware already rejects every
/// other failure mode with 4xx before we get here.
pub(crate) async fn tenant_db(
agent: &crate::agent::ComplianceAgent,
tenant: &compliance_core::tenant_ctx::TenantCtx,
) -> Result<crate::database::Database, axum::http::StatusCode> {
agent.db_pool.for_tenant(&tenant.0).await.map_err(|e| {
tracing::error!(
tenant_id = %tenant.0.tenant_id,
"Failed to acquire tenant database: {e}"
);
axum::http::StatusCode::INTERNAL_SERVER_ERROR
})
}
pub(crate) async fn collect_cursor_async<T: serde::de::DeserializeOwned + Unpin + Send>(
mut cursor: mongodb::Cursor<T>,
) -> Vec<T> {
+11 -16
View File
@@ -5,16 +5,13 @@ use mongodb::bson::doc;
use super::dto::*;
use compliance_core::models::Finding;
use compliance_core::tenant_ctx::TenantCtx;
#[tracing::instrument(skip_all, fields(repo_id = ?filter.repo_id, severity = ?filter.severity, scan_type = ?filter.scan_type))]
pub async fn list_findings(
Extension(agent): AgentExt,
tenant: TenantCtx,
Query(filter): Query<FindingsFilter>,
) -> ApiResult<Vec<Finding>> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
let mut query = doc! {};
if let Some(repo_id) = &filter.repo_id {
query.insert("repo_id", repo_id);
@@ -84,12 +81,11 @@ pub async fn list_findings(
#[tracing::instrument(skip_all, fields(finding_id = %id))]
pub async fn get_finding(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>,
) -> Result<Json<ApiResponse<Finding>>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let finding = db
let finding = agent
.db
.findings()
.find_one(doc! { "_id": oid })
.await
@@ -106,14 +102,14 @@ pub async fn get_finding(
#[tracing::instrument(skip_all, fields(finding_id = %id))]
pub async fn update_finding_status(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>,
Json(req): Json<UpdateStatusRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
db.findings()
agent
.db
.findings()
.update_one(
doc! { "_id": oid },
doc! { "$set": { "status": &req.status, "updated_at": mongodb::bson::DateTime::now() } },
@@ -127,7 +123,6 @@ pub async fn update_finding_status(
#[tracing::instrument(skip_all)]
pub async fn bulk_update_finding_status(
Extension(agent): AgentExt,
tenant: TenantCtx,
Json(req): Json<BulkUpdateStatusRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let oids: Vec<mongodb::bson::oid::ObjectId> = req
@@ -140,8 +135,8 @@ pub async fn bulk_update_finding_status(
return Err(StatusCode::BAD_REQUEST);
}
let db = tenant_db(&agent, &tenant).await?;
let result = db
let result = agent
.db
.findings()
.update_many(
doc! { "_id": { "$in": oids } },
@@ -158,14 +153,14 @@ pub async fn bulk_update_finding_status(
#[tracing::instrument(skip_all)]
pub async fn update_finding_feedback(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>,
Json(req): Json<UpdateFeedbackRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
db.findings()
agent
.db
.findings()
.update_one(
doc! { "_id": oid },
doc! { "$set": { "developer_feedback": &req.feedback, "updated_at": mongodb::bson::DateTime::now() } },
+10 -24
View File
@@ -7,11 +7,9 @@ use mongodb::bson::doc;
use serde::{Deserialize, Serialize};
use compliance_core::models::graph::{CodeEdge, CodeNode, GraphBuildRun, ImpactAnalysis};
use compliance_core::tenant_ctx::TenantCtx;
use crate::agent::ComplianceAgent;
use super::dto::tenant_db;
use super::{collect_cursor_async, ApiResponse};
type AgentExt = Extension<Arc<ComplianceAgent>>;
@@ -38,11 +36,9 @@ fn default_search_limit() -> usize {
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn get_graph(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>,
) -> Result<Json<ApiResponse<GraphData>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
// Get latest build
let build: Option<GraphBuildRun> = db
@@ -102,11 +98,9 @@ pub async fn get_graph(
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn get_nodes(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>,
) -> Result<Json<ApiResponse<Vec<CodeNode>>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
let filter = doc! { "repo_id": &repo_id };
let nodes: Vec<CodeNode> = match db.graph_nodes().find(filter).await {
@@ -129,11 +123,9 @@ pub async fn get_nodes(
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn get_communities(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>,
) -> Result<Json<ApiResponse<Vec<CommunityInfo>>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
let filter = doc! { "repo_id": &repo_id };
let nodes: Vec<CodeNode> = match db.graph_nodes().find(filter).await {
@@ -184,11 +176,9 @@ pub struct CommunityInfo {
#[tracing::instrument(skip_all, fields(repo_id = %repo_id, finding_id = %finding_id))]
pub async fn get_impact(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path((repo_id, finding_id)): Path<(String, String)>,
) -> Result<Json<ApiResponse<Option<ImpactAnalysis>>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
let filter = doc! { "repo_id": &repo_id, "finding_id": &finding_id };
let impact = db
@@ -208,12 +198,10 @@ pub async fn get_impact(
#[tracing::instrument(skip_all, fields(repo_id = %repo_id, query = %params.q))]
pub async fn search_symbols(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>,
Query(params): Query<SearchParams>,
) -> Result<Json<ApiResponse<Vec<CodeNode>>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
// Simple text search on qualified_name and name fields
let filter = doc! {
@@ -246,12 +234,10 @@ pub async fn search_symbols(
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn get_file_content(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>,
Query(params): Query<FileContentParams>,
) -> Result<Json<ApiResponse<FileContent>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
// Look up the repository to get repo name
let repo = db
@@ -310,13 +296,12 @@ pub struct FileContent {
#[tracing::instrument(skip_all, fields(repo_id = %repo_id))]
pub async fn trigger_build(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(repo_id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let agent_clone = (*agent).clone();
tokio::spawn(async move {
let repo = match db
let repo = match agent_clone
.db
.repositories()
.find_one(doc! { "_id": mongodb::bson::oid::ObjectId::parse_str(&repo_id).ok() })
.await
@@ -348,7 +333,8 @@ pub async fn trigger_build(
match engine.build_graph(&repo_path, &repo_id, &graph_build_id) {
Ok((code_graph, build_run)) => {
let store = compliance_graph::graph::persistence::GraphStore::new(db.inner());
let store =
compliance_graph::graph::persistence::GraphStore::new(agent_clone.db.inner());
let _ = store.delete_repo_graph(&repo_id).await;
let _ = store
.store_graph(&build_run, &code_graph.nodes, &code_graph.edges)
+2 -7
View File
@@ -3,7 +3,6 @@ use mongodb::bson::doc;
use super::dto::*;
use compliance_core::models::ScanRun;
use compliance_core::tenant_ctx::TenantCtx;
#[tracing::instrument(skip_all)]
pub async fn health() -> Json<serde_json::Value> {
@@ -11,12 +10,8 @@ pub async fn health() -> Json<serde_json::Value> {
}
#[tracing::instrument(skip_all)]
pub async fn stats_overview(
axum::extract::Extension(agent): AgentExt,
tenant: TenantCtx,
) -> ApiResult<OverviewStats> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
pub async fn stats_overview(axum::extract::Extension(agent): AgentExt) -> ApiResult<OverviewStats> {
let db = &agent.db;
let total_repositories = db
.repositories()
+1 -4
View File
@@ -4,16 +4,13 @@ use mongodb::bson::doc;
use super::dto::*;
use compliance_core::models::TrackerIssue;
use compliance_core::tenant_ctx::TenantCtx;
#[tracing::instrument(skip_all)]
pub async fn list_issues(
Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<PaginationParams>,
) -> ApiResult<Vec<TrackerIssue>> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
let skip = (params.page.saturating_sub(1)) * params.limit as u64;
let total = db
.tracker_issues()
@@ -5,18 +5,15 @@ use mongodb::bson::doc;
use serde::Deserialize;
use compliance_core::models::notification::CveNotification;
use compliance_core::tenant_ctx::TenantCtx;
use super::dto::{tenant_db, AgentExt, ApiResponse};
use super::dto::{AgentExt, ApiResponse};
/// GET /api/v1/notifications — List CVE notifications (newest first)
#[tracing::instrument(skip_all)]
pub async fn list_notifications(
Extension(agent): AgentExt,
tenant: TenantCtx,
axum::extract::Query(params): axum::extract::Query<NotificationFilter>,
) -> Result<Json<ApiResponse<Vec<CveNotification>>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let mut filter = doc! {};
// Filter by status (default: show new + read, exclude dismissed)
@@ -44,13 +41,15 @@ pub async fn list_notifications(
let limit = params.limit.unwrap_or(50).min(200);
let skip = (page - 1) * limit as u64;
let total = db
let total = agent
.db
.cve_notifications()
.count_documents(filter.clone())
.await
.unwrap_or(0);
let notifications: Vec<CveNotification> = match db
let notifications: Vec<CveNotification> = match agent
.db
.cve_notifications()
.find(filter)
.sort(doc! { "created_at": -1 })
@@ -84,10 +83,9 @@ pub async fn list_notifications(
#[tracing::instrument(skip_all)]
pub async fn notification_count(
Extension(agent): AgentExt,
tenant: TenantCtx,
) -> Result<Json<serde_json::Value>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let count = db
let count = agent
.db
.cve_notifications()
.count_documents(doc! { "status": "new" })
.await
@@ -100,13 +98,12 @@ pub async fn notification_count(
#[tracing::instrument(skip_all, fields(id = %id))]
pub async fn mark_read(
Extension(agent): AgentExt,
tenant: TenantCtx,
axum::extract::Path(id): axum::extract::Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let result = db
let result = agent
.db
.cve_notifications()
.update_one(
doc! { "_id": oid },
@@ -128,13 +125,12 @@ pub async fn mark_read(
#[tracing::instrument(skip_all, fields(id = %id))]
pub async fn dismiss_notification(
Extension(agent): AgentExt,
tenant: TenantCtx,
axum::extract::Path(id): axum::extract::Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let result = db
let result = agent
.db
.cve_notifications()
.update_one(
doc! { "_id": oid },
@@ -153,10 +149,9 @@ pub async fn dismiss_notification(
#[tracing::instrument(skip_all)]
pub async fn mark_all_read(
Extension(agent): AgentExt,
tenant: TenantCtx,
) -> Result<Json<serde_json::Value>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let result = db
let result = agent
.db
.cve_notifications()
.update_many(
doc! { "status": "new" },
@@ -13,11 +13,10 @@ use compliance_core::models::dast::DastFinding;
use compliance_core::models::finding::Finding;
use compliance_core::models::pentest::*;
use compliance_core::models::sbom::SbomEntry;
use compliance_core::tenant_ctx::TenantCtx;
use crate::agent::ComplianceAgent;
use super::super::dto::{collect_cursor_async, tenant_db};
use super::super::dto::collect_cursor_async;
type AgentExt = Extension<Arc<ComplianceAgent>>;
@@ -36,15 +35,11 @@ pub struct ExportBody {
#[tracing::instrument(skip_all, fields(session_id = %id))]
pub async fn export_session_report(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>,
Json(body): Json<ExportBody>,
) -> Result<axum::response::Response, (StatusCode, String)> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid session ID".to_string()))?;
let db = tenant_db(&agent, &tenant)
.await
.map_err(|s| (s, "failed to acquire tenant database".to_string()))?;
if body.password.len() < 8 {
return Err((
@@ -54,7 +49,8 @@ pub async fn export_session_report(
}
// Fetch session
let session = db
let session = agent
.db
.pentest_sessions()
.find_one(doc! { "_id": oid })
.await
@@ -68,7 +64,9 @@ pub async fn export_session_report(
// Resolve target name
let target = if let Ok(tid) = mongodb::bson::oid::ObjectId::parse_str(&session.target_id) {
db.dast_targets()
agent
.db
.dast_targets()
.find_one(doc! { "_id": tid })
.await
.ok()
@@ -86,7 +84,8 @@ pub async fn export_session_report(
.unwrap_or_default();
// Fetch attack chain nodes
let nodes: Vec<AttackChainNode> = match db
let nodes: Vec<AttackChainNode> = match agent
.db
.attack_chain_nodes()
.find(doc! { "session_id": &id })
.sort(doc! { "started_at": 1 })
@@ -97,7 +96,8 @@ pub async fn export_session_report(
};
// Fetch DAST findings for this session, then deduplicate
let raw_findings: Vec<DastFinding> = match db
let raw_findings: Vec<DastFinding> = match agent
.db
.dast_findings()
.find(doc! { "session_id": &id })
.sort(doc! { "severity": -1, "created_at": -1 })
@@ -122,7 +122,8 @@ pub async fn export_session_report(
.or_else(|| target.as_ref().and_then(|t| t.repo_id.clone()));
let (sast_findings, sbom_entries, code_context) = if let Some(ref rid) = repo_id {
let sast: Vec<Finding> = match db
let sast: Vec<Finding> = match agent
.db
.findings()
.find(doc! {
"repo_id": rid,
@@ -142,7 +143,8 @@ pub async fn export_session_report(
Err(_) => Vec::new(),
};
let sbom: Vec<SbomEntry> = match db
let sbom: Vec<SbomEntry> = match agent
.db
.sbom_entries()
.find(doc! {
"repo_id": rid,
@@ -162,7 +164,8 @@ pub async fn export_session_report(
};
// Build code context from graph nodes
let code_ctx: Vec<CodeContextHint> = match db
let code_ctx: Vec<CodeContextHint> = match agent
.db
.graph_nodes()
.find(doc! { "repo_id": rid, "is_entry_point": true })
.limit(50)
@@ -7,12 +7,11 @@ use mongodb::bson::doc;
use serde::Deserialize;
use compliance_core::models::pentest::*;
use compliance_core::tenant_ctx::TenantCtx;
use crate::agent::ComplianceAgent;
use crate::pentest::PentestOrchestrator;
use super::super::dto::{collect_cursor_async, tenant_db, ApiResponse, PaginationParams};
use super::super::dto::{collect_cursor_async, ApiResponse, PaginationParams};
type AgentExt = Extension<Arc<ComplianceAgent>>;
@@ -44,7 +43,6 @@ pub struct LookupRepoQuery {
#[tracing::instrument(skip_all)]
pub async fn create_session(
Extension(agent): AgentExt,
tenant: TenantCtx,
Json(req): Json<CreateSessionRequest>,
) -> Result<Json<ApiResponse<PentestSession>>, (StatusCode, String)> {
// Try to acquire a concurrency permit
@@ -59,10 +57,6 @@ pub async fn create_session(
)
})?;
let db = tenant_db(&agent, &tenant)
.await
.map_err(|s| (s, "failed to acquire tenant database".to_string()))?;
if let Some(ref config) = req.config {
// ── Wizard path ──────────────────────────────────────────────
if !config.disclaimer_accepted {
@@ -73,7 +67,8 @@ pub async fn create_session(
}
// Look up or auto-create DastTarget by app_url
let target = match db
let target = match agent
.db
.dast_targets()
.find_one(doc! { "base_url": &config.app_url })
.await
@@ -92,7 +87,7 @@ pub async fn create_session(
}
t.allow_destructive = config.allow_destructive;
t.excluded_paths = config.scope_exclusions.clone();
let res = db.dast_targets().insert_one(&t).await.map_err(|e| {
let res = agent.db.dast_targets().insert_one(&t).await.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to create target: {e}"),
@@ -115,7 +110,8 @@ pub async fn create_session(
// Resolve repo_id from git_repo_url if provided
if let Some(ref git_url) = config.git_repo_url {
if let Ok(Some(repo)) = db
if let Ok(Some(repo)) = agent
.db
.repositories()
.find_one(doc! { "git_url": git_url })
.await
@@ -124,7 +120,8 @@ pub async fn create_session(
}
}
let insert_result = db
let insert_result = agent
.db
.pentest_sessions()
.insert_one(&session)
.await
@@ -215,7 +212,8 @@ pub async fn create_session(
// Persist encrypted credentials to DB
if session_for_task.config.is_some() {
if let Some(sid) = session.id {
let _ = db
let _ = agent
.db
.pentest_sessions()
.update_one(
doc! { "_id": sid },
@@ -247,13 +245,12 @@ pub async fn create_session(
});
let llm = agent.llm.clone();
let db_for_orchestrator = db.clone();
let db = agent.db.clone();
let session_clone = session.clone();
let target_clone = target.clone();
let agent_ref = agent.clone();
tokio::spawn(async move {
let orchestrator =
PentestOrchestrator::new(llm, db_for_orchestrator, event_tx, Some(pause_rx));
let orchestrator = PentestOrchestrator::new(llm, db, event_tx, Some(pause_rx));
orchestrator
.run_session_guarded(&session_clone, &target_clone, &initial_message)
.await;
@@ -295,7 +292,8 @@ pub async fn create_session(
)
})?;
let target = db
let target = agent
.db
.dast_targets()
.find_one(doc! { "_id": oid })
.await
@@ -312,7 +310,8 @@ pub async fn create_session(
let mut session = PentestSession::new(target_id, strategy);
session.repo_id = target.repo_id.clone();
let insert_result = db
let insert_result = agent
.db
.pentest_sessions()
.insert_one(&session)
.await
@@ -339,13 +338,12 @@ pub async fn create_session(
});
let llm = agent.llm.clone();
let db_for_orchestrator = db.clone();
let db = agent.db.clone();
let session_clone = session.clone();
let target_clone = target.clone();
let agent_ref = agent.clone();
tokio::spawn(async move {
let orchestrator =
PentestOrchestrator::new(llm, db_for_orchestrator, event_tx, Some(pause_rx));
let orchestrator = PentestOrchestrator::new(llm, db, event_tx, Some(pause_rx));
orchestrator
.run_session_guarded(&session_clone, &target_clone, &initial_message)
.await;
@@ -375,11 +373,10 @@ fn parse_strategy(s: &str) -> PentestStrategy {
#[tracing::instrument(skip_all)]
pub async fn lookup_repo(
Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<LookupRepoQuery>,
) -> Result<Json<ApiResponse<serde_json::Value>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let repo = db
let repo = agent
.db
.repositories()
.find_one(doc! { "git_url": &params.url })
.await
@@ -405,11 +402,9 @@ pub async fn lookup_repo(
#[tracing::instrument(skip_all)]
pub async fn list_sessions(
Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<PaginationParams>,
) -> Result<Json<ApiResponse<Vec<PentestSession>>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
let skip = (params.page.saturating_sub(1)) * params.limit as u64;
let total = db
.pentest_sessions()
@@ -443,13 +438,12 @@ pub async fn list_sessions(
#[tracing::instrument(skip_all, fields(session_id = %id))]
pub async fn get_session(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>,
) -> Result<Json<ApiResponse<PentestSession>>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let mut session = db
let mut session = agent
.db
.pentest_sessions()
.find_one(doc! { "_id": oid })
.await
@@ -477,18 +471,15 @@ pub async fn get_session(
#[tracing::instrument(skip_all, fields(session_id = %id))]
pub async fn send_message(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>,
Json(req): Json<SendMessageRequest>,
) -> Result<Json<ApiResponse<PentestMessage>>, (StatusCode, String)> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid session ID".to_string()))?;
let db = tenant_db(&agent, &tenant)
.await
.map_err(|s| (s, "failed to acquire tenant database".to_string()))?;
// Verify session exists and is running
let session = db
let session = agent
.db
.pentest_sessions()
.find_one(doc! { "_id": oid })
.await
@@ -515,7 +506,8 @@ pub async fn send_message(
)
})?;
let target = db
let target = agent
.db
.dast_targets()
.find_one(doc! { "_id": target_oid })
.await
@@ -535,13 +527,13 @@ pub async fn send_message(
// Store user message
let session_id = id.clone();
let user_msg = PentestMessage::user(session_id.clone(), req.message.clone());
let _ = db.pentest_messages().insert_one(&user_msg).await;
let _ = agent.db.pentest_messages().insert_one(&user_msg).await;
let response_msg = user_msg.clone();
// Spawn orchestrator to continue the session
let llm = agent.llm.clone();
let db_for_orchestrator = db.clone();
let db = agent.db.clone();
let message = req.message.clone();
// Use existing broadcast sender if available, otherwise create a new one
@@ -556,7 +548,7 @@ pub async fn send_message(
.unwrap_or_else(|| agent.register_session_stream(&session_id));
tokio::spawn(async move {
let orchestrator = PentestOrchestrator::new(llm, db_for_orchestrator, event_tx, None);
let orchestrator = PentestOrchestrator::new(llm, db, event_tx, None);
orchestrator
.run_session_guarded(&session, &target, &message)
.await;
@@ -573,16 +565,13 @@ pub async fn send_message(
#[tracing::instrument(skip_all, fields(session_id = %id))]
pub async fn stop_session(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>,
) -> Result<Json<ApiResponse<PentestSession>>, (StatusCode, String)> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid session ID".to_string()))?;
let db = tenant_db(&agent, &tenant)
.await
.map_err(|s| (s, "failed to acquire tenant database".to_string()))?;
let session = db
let session = agent
.db
.pentest_sessions()
.find_one(doc! { "_id": oid })
.await
@@ -601,7 +590,9 @@ pub async fn stop_session(
));
}
db.pentest_sessions()
agent
.db
.pentest_sessions()
.update_one(
doc! { "_id": oid },
doc! { "$set": {
@@ -621,7 +612,8 @@ pub async fn stop_session(
// Clean up session resources
agent.cleanup_session(&id);
let updated = db
let updated = agent
.db
.pentest_sessions()
.find_one(doc! { "_id": oid })
.await
@@ -649,16 +641,13 @@ pub async fn stop_session(
#[tracing::instrument(skip_all, fields(session_id = %id))]
pub async fn pause_session(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>,
) -> Result<Json<ApiResponse<serde_json::Value>>, (StatusCode, String)> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid session ID".to_string()))?;
let db = tenant_db(&agent, &tenant)
.await
.map_err(|s| (s, "failed to acquire tenant database".to_string()))?;
let session = db
let session = agent
.db
.pentest_sessions()
.find_one(doc! { "_id": oid })
.await
@@ -695,16 +684,13 @@ pub async fn pause_session(
#[tracing::instrument(skip_all, fields(session_id = %id))]
pub async fn resume_session(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>,
) -> Result<Json<ApiResponse<serde_json::Value>>, (StatusCode, String)> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid session ID".to_string()))?;
let db = tenant_db(&agent, &tenant)
.await
.map_err(|s| (s, "failed to acquire tenant database".to_string()))?;
let session = db
let session = agent
.db
.pentest_sessions()
.find_one(doc! { "_id": oid })
.await
@@ -741,13 +727,12 @@ pub async fn resume_session(
#[tracing::instrument(skip_all, fields(session_id = %id))]
pub async fn get_attack_chain(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>,
) -> Result<Json<ApiResponse<Vec<AttackChainNode>>>, StatusCode> {
let _oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let nodes = match db
let nodes = match agent
.db
.attack_chain_nodes()
.find(doc! { "session_id": &id })
.sort(doc! { "started_at": 1 })
@@ -772,21 +757,21 @@ pub async fn get_attack_chain(
#[tracing::instrument(skip_all, fields(session_id = %id))]
pub async fn get_messages(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>,
Query(params): Query<PaginationParams>,
) -> Result<Json<ApiResponse<Vec<PentestMessage>>>, StatusCode> {
let _oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let skip = (params.page.saturating_sub(1)) * params.limit as u64;
let total = db
let total = agent
.db
.pentest_messages()
.count_documents(doc! { "session_id": &id })
.await
.unwrap_or(0);
let messages = match db
let messages = match agent
.db
.pentest_messages()
.find(doc! { "session_id": &id })
.sort(doc! { "created_at": 1 })
@@ -812,21 +797,21 @@ pub async fn get_messages(
#[tracing::instrument(skip_all, fields(session_id = %id))]
pub async fn get_session_findings(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>,
Query(params): Query<PaginationParams>,
) -> Result<Json<ApiResponse<Vec<compliance_core::models::dast::DastFinding>>>, StatusCode> {
let _oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let skip = (params.page.saturating_sub(1)) * params.limit as u64;
let total = db
let total = agent
.db
.dast_findings()
.count_documents(doc! { "session_id": &id })
.await
.unwrap_or(0);
let findings = match db
let findings = match agent
.db
.dast_findings()
.find(doc! { "session_id": &id })
.sort(doc! { "created_at": -1 })
@@ -6,11 +6,10 @@ use axum::Json;
use mongodb::bson::doc;
use compliance_core::models::pentest::*;
use compliance_core::tenant_ctx::TenantCtx;
use crate::agent::ComplianceAgent;
use super::super::dto::{collect_cursor_async, tenant_db, ApiResponse};
use super::super::dto::{collect_cursor_async, ApiResponse};
type AgentExt = Extension<Arc<ComplianceAgent>>;
@@ -18,10 +17,8 @@ type AgentExt = Extension<Arc<ComplianceAgent>>;
#[tracing::instrument(skip_all)]
pub async fn pentest_stats(
Extension(agent): AgentExt,
tenant: TenantCtx,
) -> Result<Json<ApiResponse<PentestStats>>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
let running_sessions = db
.pentest_sessions()
@@ -11,11 +11,10 @@ use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;
use compliance_core::models::pentest::*;
use compliance_core::tenant_ctx::TenantCtx;
use crate::agent::ComplianceAgent;
use super::super::dto::{collect_cursor_async, tenant_db};
use super::super::dto::collect_cursor_async;
type AgentExt = Extension<Arc<ComplianceAgent>>;
@@ -26,14 +25,13 @@ type AgentExt = Extension<Arc<ComplianceAgent>>;
#[tracing::instrument(skip_all, fields(session_id = %id))]
pub async fn session_stream(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>,
) -> Result<Sse<impl futures_util::Stream<Item = Result<Event, Infallible>>>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
// Verify session exists
let _session = db
let _session = agent
.db
.pentest_sessions()
.find_one(doc! { "_id": oid })
.await
@@ -45,7 +43,8 @@ pub async fn session_stream(
let mut initial_events: Vec<Result<Event, Infallible>> = Vec::new();
// Fetch recent messages for this session
let messages: Vec<PentestMessage> = match db
let messages: Vec<PentestMessage> = match agent
.db
.pentest_messages()
.find(doc! { "session_id": &id })
.sort(doc! { "created_at": 1 })
@@ -57,7 +56,8 @@ pub async fn session_stream(
};
// Fetch recent attack chain nodes
let nodes: Vec<AttackChainNode> = match db
let nodes: Vec<AttackChainNode> = match agent
.db
.attack_chain_nodes()
.find(doc! { "session_id": &id })
.sort(doc! { "started_at": 1 })
@@ -94,7 +94,8 @@ pub async fn session_stream(
}
// Add current session status event
let session = db
let session = agent
.db
.pentest_sessions()
.find_one(doc! { "_id": oid })
.await
+12 -23
View File
@@ -5,16 +5,13 @@ use mongodb::bson::doc;
use super::dto::*;
use compliance_core::models::*;
use compliance_core::tenant_ctx::TenantCtx;
#[tracing::instrument(skip_all)]
pub async fn list_repositories(
Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<PaginationParams>,
) -> ApiResult<Vec<TrackedRepository>> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
let skip = (params.page.saturating_sub(1)) * params.limit as u64;
let total = db
.repositories()
@@ -46,7 +43,6 @@ pub async fn list_repositories(
#[tracing::instrument(skip_all)]
pub async fn add_repository(
Extension(agent): AgentExt,
tenant: TenantCtx,
Json(req): Json<AddRepositoryRequest>,
) -> Result<Json<ApiResponse<TrackedRepository>>, (StatusCode, String)> {
// Validate repository access before saving
@@ -73,10 +69,12 @@ pub async fn add_repository(
repo.tracker_token = req.tracker_token;
repo.scan_schedule = req.scan_schedule;
let db = tenant_db(&agent, &tenant)
agent
.db
.repositories()
.insert_one(&repo)
.await
.map_err(|s| (s, "failed to acquire tenant database".to_string()))?;
db.repositories().insert_one(&repo).await.map_err(|_| {
.map_err(|_| {
(
StatusCode::CONFLICT,
"Repository already exists".to_string(),
@@ -93,12 +91,10 @@ pub async fn add_repository(
#[tracing::instrument(skip_all, fields(repo_id = %id))]
pub async fn update_repository(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>,
Json(req): Json<UpdateRepositoryRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let mut set_doc = doc! { "updated_at": mongodb::bson::DateTime::now() };
@@ -130,7 +126,8 @@ pub async fn update_repository(
set_doc.insert("scan_schedule", schedule);
}
let result = db
let result = agent
.db
.repositories()
.update_one(doc! { "_id": oid }, doc! { "$set": set_doc })
.await
@@ -158,16 +155,11 @@ pub async fn get_ssh_public_key(
#[tracing::instrument(skip_all, fields(repo_id = %id))]
pub async fn trigger_scan(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let agent_clone = (*agent).clone();
let tenant_id = tenant.0.tenant_id.clone();
tokio::spawn(async move {
if let Err(e) = agent_clone
.run_scan(&tenant_id, &id, ScanTrigger::Manual)
.await
{
if let Err(e) = agent_clone.run_scan(&id, ScanTrigger::Manual).await {
tracing::error!("Manual scan failed for {id}: {e}");
}
});
@@ -178,12 +170,11 @@ pub async fn trigger_scan(
/// Return the webhook secret for a repository (used by dashboard to display it)
pub async fn get_webhook_config(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let repo = db
let repo = agent
.db
.repositories()
.find_one(doc! { "_id": oid })
.await
@@ -205,12 +196,10 @@ pub async fn get_webhook_config(
#[tracing::instrument(skip_all, fields(repo_id = %id))]
pub async fn delete_repository(
Extension(agent): AgentExt,
tenant: TenantCtx,
Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
// Delete the repository
let result = db
+5 -16
View File
@@ -6,7 +6,6 @@ use mongodb::bson::doc;
use super::dto::*;
use compliance_core::models::SbomEntry;
use compliance_core::tenant_ctx::TenantCtx;
const COPYLEFT_LICENSES: &[&str] = &[
"GPL-2.0",
@@ -30,10 +29,8 @@ const COPYLEFT_LICENSES: &[&str] = &[
#[tracing::instrument(skip_all)]
pub async fn sbom_filters(
Extension(agent): AgentExt,
tenant: TenantCtx,
) -> Result<Json<serde_json::Value>, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
let managers: Vec<String> = db
.sbom_entries()
@@ -64,11 +61,9 @@ pub async fn sbom_filters(
#[tracing::instrument(skip_all, fields(repo_id = ?filter.repo_id, package_manager = ?filter.package_manager))]
pub async fn list_sbom(
Extension(agent): AgentExt,
tenant: TenantCtx,
Query(filter): Query<SbomFilter>,
) -> ApiResult<Vec<SbomEntry>> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
let mut query = doc! {};
if let Some(repo_id) = &filter.repo_id {
@@ -125,11 +120,9 @@ pub async fn list_sbom(
#[tracing::instrument(skip_all)]
pub async fn export_sbom(
Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<SbomExportParams>,
) -> Result<impl IntoResponse, StatusCode> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
let entries: Vec<SbomEntry> = match db
.sbom_entries()
.find(doc! { "repo_id": &params.repo_id })
@@ -243,11 +236,9 @@ pub async fn export_sbom(
#[tracing::instrument(skip_all)]
pub async fn license_summary(
Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<SbomFilter>,
) -> ApiResult<Vec<LicenseSummary>> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
let mut query = doc! {};
if let Some(repo_id) = &params.repo_id {
query.insert("repo_id", repo_id);
@@ -294,11 +285,9 @@ pub async fn license_summary(
#[tracing::instrument(skip_all)]
pub async fn sbom_diff(
Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<SbomDiffParams>,
) -> ApiResult<SbomDiffResult> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
let entries_a: Vec<SbomEntry> = match db
.sbom_entries()
+1 -4
View File
@@ -4,16 +4,13 @@ use mongodb::bson::doc;
use super::dto::*;
use compliance_core::models::ScanRun;
use compliance_core::tenant_ctx::TenantCtx;
#[tracing::instrument(skip_all)]
pub async fn list_scan_runs(
Extension(agent): AgentExt,
tenant: TenantCtx,
Query(params): Query<PaginationParams>,
) -> ApiResult<Vec<ScanRun>> {
let db = tenant_db(&agent, &tenant).await?;
let db = &db;
let db = &agent.db;
let skip = (params.page.saturating_sub(1)) * params.limit as u64;
let total = db.scan_runs().count_documents(doc! {}).await.unwrap_or(0);
+1
View File
@@ -1,3 +1,4 @@
pub mod auth_middleware;
pub mod handlers;
pub mod routes;
pub mod server;
+4 -52
View File
@@ -1,54 +1,17 @@
use std::sync::Arc;
use axum::extract::Request;
use axum::http::HeaderValue;
use axum::middleware::Next;
use axum::response::Response;
use axum::{middleware, Extension};
use tokio::sync::RwLock;
use tower_http::cors::CorsLayer;
use tower_http::set_header::SetResponseHeaderLayer;
use tower_http::trace::TraceLayer;
use compliance_core::auth::{require_jwt_auth, require_tenant_status, JwksState};
use compliance_core::{TenantContext, TenantStatus};
use crate::agent::ComplianceAgent;
use crate::api::auth_middleware::{require_jwt_auth, JwksState};
use crate::api::routes;
use crate::error::AgentError;
/// Synthetic tenant id used when Keycloak isn't configured (local dev,
/// `cargo run` against a bare Mongo). Lets the handler stack stay
/// uniformly tenant-scoped without the operator having to spin up KC
/// just to poke at the API. Override via `DEV_TENANT_ID`.
const DEFAULT_DEV_TENANT_ID: &str = "dev";
/// Inject a synthetic [`TenantContext`] for any request that lacks one.
/// Only mounted when Keycloak is NOT configured; with KC, the real
/// `require_jwt_auth` middleware owns this and we never reach here
/// without a context.
///
/// Public so the integration-test harness can mount it without
/// duplicating the synthetic-context shape.
pub async fn inject_dev_tenant(mut request: Request, next: Next) -> Response {
if request.extensions().get::<TenantContext>().is_none() {
let tenant_id =
std::env::var("DEV_TENANT_ID").unwrap_or_else(|_| DEFAULT_DEV_TENANT_ID.to_string());
let ctx = TenantContext {
tenant_slug: tenant_id.clone(),
tenant_id,
org_roles: vec![],
products: vec![],
plan: "dev".to_string(),
status: TenantStatus::Active,
user_id: "dev-user".to_string(),
user_name: None,
};
request.extensions_mut().insert(ctx);
}
next.run(request).await
}
pub async fn start_api_server(agent: ComplianceAgent, port: u16) -> Result<(), AgentError> {
let mut app = routes::build_router()
.layer(Extension(Arc::new(agent.clone())))
@@ -81,22 +44,11 @@ pub async fn start_api_server(agent: ComplianceAgent, port: u16) -> Result<(), A
jwks_url,
};
tracing::info!("Keycloak JWT auth enabled for realm '{kc_realm}'");
// Layers execute outermost-first. Extension(jwks_state) must run
// before require_jwt_auth so the middleware can read it; the
// status gate runs after JWT so TenantContext is in extensions.
app = app
.layer(middleware::from_fn(require_tenant_status))
.layer(middleware::from_fn(require_jwt_auth))
.layer(Extension(jwks_state));
.layer(Extension(jwks_state))
.layer(middleware::from_fn(require_jwt_auth));
} else {
let tenant_id =
std::env::var("DEV_TENANT_ID").unwrap_or_else(|_| DEFAULT_DEV_TENANT_ID.to_string());
tracing::warn!(
tenant_id = %tenant_id,
"Keycloak not configured — running unauthenticated against the dev tenant. \
DO NOT use in any environment with real customer data."
);
app = app.layer(middleware::from_fn(inject_dev_tenant));
tracing::warn!("Keycloak not configured - API endpoints are unprotected");
}
let addr = format!("0.0.0.0:{port}");
-192
View File
@@ -1,197 +1,11 @@
use std::sync::Arc;
use dashmap::DashMap;
use mongodb::bson::doc;
use mongodb::options::IndexOptions;
use mongodb::{Client, Collection, IndexModel};
use sha2::{Digest, Sha256};
use compliance_core::models::*;
use compliance_core::TenantContext;
use crate::error::AgentError;
/// Mongo enforces a 63-byte cap on database names (older clusters: 64
/// on Linux, 63 on Windows; we target the conservative limit).
const MAX_DB_NAME_LEN: usize = 63;
/// Hex length of the SHA-256 truncation used for the hash fallback
/// tenant DB name (16 bytes → 32 hex chars). 16 bytes gives ~2^64
/// birthday-collision resistance — at our 10s-100s tenant scale this
/// is effectively impossible to hit.
const HASH_HEX_LEN: usize = 32;
/// Largest `db_prefix` that still guarantees the hash-fallback name
/// fits in the 63-byte cap: `prefix + "_" + 32 hex chars`.
const MAX_PREFIX_LEN: usize = MAX_DB_NAME_LEN - 1 - HASH_HEX_LEN;
/// Per-tenant Mongo connection broker (M7.2 isolation model).
///
/// Holds one [`Client`] and hands out [`Database`] handles physically
/// scoped to `<db_prefix>_<tenant_id>`. The driver is the isolation
/// boundary — a handle for tenant A cannot see tenant B's documents
/// because it is connected to a different database, not because of an
/// application-level filter.
///
/// Index creation runs idempotently the first time each tenant is seen
/// in the process's lifetime. Mongo's `createIndex` is itself idempotent
/// by index name; the in-memory `ensured` set just skips the round-trip.
#[derive(Clone, Debug)]
pub struct DatabasePool {
client: Client,
db_prefix: String,
ensured: Arc<DashMap<String, ()>>,
}
impl DatabasePool {
/// Connect to the cluster and prepare to hand out tenant databases
/// named `<db_prefix>_<tenant_id>`.
///
/// Validates `db_prefix.len() <= MAX_PREFIX_LEN` so the
/// hash-fallback path is provably within Mongo's 63-byte db-name
/// cap. Refuses to construct a pool that could ever produce an
/// over-long name.
pub async fn connect(uri: &str, db_prefix: &str) -> Result<Self, AgentError> {
if db_prefix.len() > MAX_PREFIX_LEN {
return Err(AgentError::Other(format!(
"db_prefix '{db_prefix}' is {} chars; max is {MAX_PREFIX_LEN} so the \
hash-fallback tenant DB name fits Mongo's {MAX_DB_NAME_LEN}-byte cap",
db_prefix.len()
)));
}
let client = Client::with_uri_str(uri).await?;
client
.database("admin")
.run_command(doc! { "ping": 1 })
.await?;
tracing::info!(
"MongoDB cluster reachable; per-tenant pool ready (db prefix '{db_prefix}')"
);
Ok(Self {
client,
db_prefix: db_prefix.to_string(),
ensured: Arc::new(DashMap::new()),
})
}
/// Return a [`Database`] scoped to this tenant. Ensures indexes on
/// first call per tenant (per process). Cheap on the hot path —
/// subsequent calls skip the round-trip.
pub async fn for_tenant(&self, ctx: &TenantContext) -> Result<Database, AgentError> {
self.for_tenant_id(&ctx.tenant_id).await
}
/// Like [`Self::for_tenant`] but accepts a bare tenant_id.
/// For background paths (scheduler, webhooks, pipeline orchestrators)
/// that don't have a full [`TenantContext`] but know which tenant
/// they're operating on (typically resolved from a URL path, a job
/// argument, or the registry).
pub async fn for_tenant_id(&self, tenant_id: &str) -> Result<Database, AgentError> {
let db_name = self.tenant_db_name(tenant_id);
let db = Database::from_database(self.client.database(&db_name));
// `DashMap::insert` returns the previous value; `None` means we
// were the first writer for this tenant_id and own the
// index-ensure work.
if self.ensured.insert(tenant_id.to_string(), ()).is_none() {
if let Err(e) = db.ensure_indexes().await {
// Roll the marker back so the next request retries.
self.ensured.remove(tenant_id);
return Err(e);
}
tracing::debug!(
tenant_id = %tenant_id,
db_name = %db_name,
"Indexes ensured for tenant database"
);
}
Ok(db)
}
/// Compute the Mongo database name for a tenant. Public for tests
/// and tenant offboarding (`pool.client().database(name).drop()`).
///
/// Format: `<prefix>_<sanitized_tenant_id>` if it fits the 63-byte
/// cap, else `<prefix>_<sha256-16-byte-hex-of-tenant_id>`. The
/// `db_prefix` length invariant established at [`Self::connect`]
/// guarantees the hash-fallback name always fits — no runtime
/// assertion needed.
///
/// Collision resistance: the hash fallback is a 16-byte SHA-256
/// truncation, which gives ~2^64 birthday-collision resistance. At
/// our 10s100s tenant scale the probability of two tenant_ids
/// colliding is effectively zero. (8-byte truncation would have
/// been ~2^32 — too close for comfort on a regulated product.)
pub fn tenant_db_name(&self, tenant_id: &str) -> String {
let sanitized = sanitize_tenant_id(tenant_id);
let natural = format!("{}_{}", self.db_prefix, sanitized);
if natural.len() <= MAX_DB_NAME_LEN {
natural
} else {
let mut hasher = Sha256::new();
hasher.update(tenant_id.as_bytes());
let digest = hasher.finalize();
let suffix = hex::encode(&digest[..HASH_HEX_LEN / 2]);
format!("{}_{}", self.db_prefix, suffix)
}
}
/// Raw client handle. Reserved for cross-tenant admin flows that
/// must opt in explicitly (tenant listing, drop-on-offboard).
pub fn client(&self) -> &Client {
&self.client
}
/// List every Mongo database currently belonging to this pool,
/// identified by the `<db_prefix>_` prefix. The result is the raw
/// database names — opening one for offboarding/cleanup goes
/// through [`Self::client`].
///
/// Note: hashed-fallback names (very long tenant_ids) lose the
/// original tenant_id at the cluster level — we know a database
/// exists for *some* tenant but not which one. In practice
/// tenant_ids are UUIDs (36 chars) and never hit the fallback,
/// so this is a theoretical concern, not an operational one.
pub async fn list_tenant_db_names(&self) -> Result<Vec<String>, AgentError> {
let prefix = format!("{}_", self.db_prefix);
let names = self.client.list_database_names().await?;
Ok(names
.into_iter()
.filter(|n| n.starts_with(&prefix))
.collect())
}
/// Drop the database for a specific tenant. Used by GDPR delete
/// and tenant offboarding. Idempotent — dropping a non-existent
/// database is a no-op at the driver level.
///
/// Also evicts the tenant from the in-memory `ensured` set so a
/// later re-provision triggers fresh `ensure_indexes`.
pub async fn drop_tenant(&self, tenant_id: &str) -> Result<(), AgentError> {
let db_name = self.tenant_db_name(tenant_id);
self.client.database(&db_name).drop().await?;
self.ensured.remove(tenant_id);
tracing::info!(
tenant_id = %tenant_id,
db_name = %db_name,
"Dropped tenant database"
);
Ok(())
}
}
/// Mongo database names disallow `/`, `\`, `.`, `"`, `$`, ` `, and NUL.
/// breakpilot-dev tenant_ids are UUIDs so this is belt-and-braces, but
/// it lets the pool tolerate any future tenant_id shape without surprise.
fn sanitize_tenant_id(tenant_id: &str) -> String {
tenant_id
.chars()
.map(|c| match c {
'/' | '\\' | '.' | '"' | '$' | ' ' | '\0' => '_',
c => c,
})
.collect()
}
#[derive(Clone, Debug)]
pub struct Database {
inner: mongodb::Database,
@@ -206,12 +20,6 @@ impl Database {
Ok(Self { inner: db })
}
/// Wrap an already-resolved Mongo database. Used by [`DatabasePool`]
/// to hand out tenant-scoped handles without a fresh client per tenant.
pub(crate) fn from_database(inner: mongodb::Database) -> Self {
Self { inner }
}
pub async fn ensure_indexes(&self) -> Result<(), AgentError> {
// repositories: unique git_url
self.repositories()
+3 -6
View File
@@ -25,13 +25,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
tracing::info!("Connecting to MongoDB...");
// Per-tenant pool only — the agent has no shared "default" database
// after M7.2-D. `mongodb_database` is now the db-name prefix used
// for tenant databases (`<prefix>_<tenant_id>`).
let db_pool =
database::DatabasePool::connect(&config.mongodb_uri, &config.mongodb_database).await?;
let db = database::Database::connect(&config.mongodb_uri, &config.mongodb_database).await?;
db.ensure_indexes().await?;
let agent = agent::ComplianceAgent::new(config.clone(), db_pool);
let agent = agent::ComplianceAgent::new(config.clone(), db.clone());
tracing::info!("Starting scheduler...");
let scheduler_agent = agent.clone();
+22 -77
View File
@@ -4,14 +4,8 @@ use tokio_cron_scheduler::{Job, JobScheduler};
use compliance_core::models::ScanTrigger;
use crate::agent::ComplianceAgent;
use crate::database::Database;
use crate::error::AgentError;
/// Default tenant the scheduler runs against when `SCHEDULER_TENANT_IDS`
/// isn't set. Matches the dev-injector default so a bare `cargo run` has
/// the scheduler scanning whatever lives in `<prefix>_dev`.
const DEFAULT_SCHEDULER_TENANT_ID: &str = "dev";
pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> {
let sched = JobScheduler::new()
.await
@@ -24,9 +18,7 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError>
let agent = scan_agent.clone();
Box::pin(async move {
tracing::info!("Scheduled scan triggered");
for tenant_id in scheduler_tenants() {
scan_all_repos(&agent, &tenant_id).await;
}
scan_all_repos(&agent).await;
})
})
.map_err(|e| AgentError::Scheduler(format!("Failed to create scan job: {e}")))?;
@@ -42,9 +34,7 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError>
let agent = cve_agent.clone();
Box::pin(async move {
tracing::info!("CVE monitor triggered");
for tenant_id in scheduler_tenants() {
monitor_cves(&agent, &tenant_id).await;
}
monitor_cves(&agent).await;
})
})
.map_err(|e| AgentError::Scheduler(format!("Failed to create CVE monitor job: {e}")))?;
@@ -58,9 +48,8 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError>
.await
.map_err(|e| AgentError::Scheduler(format!("Failed to start scheduler: {e}")))?;
let tenants = scheduler_tenants();
tracing::info!(
"Scheduler started: scans='{}', CVE monitor='{}', tenants={tenants:?}",
"Scheduler started: scans='{}', CVE monitor='{}'",
agent.config.scan_schedule,
agent.config.cve_monitor_schedule,
);
@@ -71,47 +60,13 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError>
}
}
/// Tenants the scheduler iterates each tick. From `SCHEDULER_TENANT_IDS`
/// (comma-separated), or `DEFAULT_SCHEDULER_TENANT_ID` if unset. M7.2-D
/// will replace this with a pull from the tenant-registry.
fn scheduler_tenants() -> Vec<String> {
std::env::var("SCHEDULER_TENANT_IDS")
.ok()
.map(|s| {
s.split(',')
.map(str::trim)
.filter(|s| !s.is_empty())
.map(String::from)
.collect::<Vec<_>>()
})
.filter(|v| !v.is_empty())
.unwrap_or_else(|| vec![DEFAULT_SCHEDULER_TENANT_ID.to_string()])
}
/// Resolve the per-tenant database. Logs and returns `None` on failure
/// so the loop in the caller can continue with other tenants.
async fn tenant_db(agent: &ComplianceAgent, tenant_id: &str) -> Option<Database> {
match agent.db_pool.for_tenant_id(tenant_id).await {
Ok(db) => Some(db),
Err(e) => {
tracing::error!("Scheduler: cannot open tenant database '{tenant_id}': {e}");
None
}
}
}
async fn scan_all_repos(agent: &ComplianceAgent, tenant_id: &str) {
async fn scan_all_repos(agent: &ComplianceAgent) {
use futures_util::StreamExt;
let db = match tenant_db(agent, tenant_id).await {
Some(db) => db,
None => return,
};
let cursor = match db.repositories().find(doc! {}).await {
let cursor = match agent.db.repositories().find(doc! {}).await {
Ok(c) => c,
Err(e) => {
tracing::error!("Failed to list repos for tenant '{tenant_id}': {e}");
tracing::error!("Failed to list repos for scheduled scan: {e}");
return;
}
};
@@ -120,44 +75,33 @@ async fn scan_all_repos(agent: &ComplianceAgent, tenant_id: &str) {
for repo in repos {
let repo_id = repo.id.map(|id| id.to_hex()).unwrap_or_default();
if let Err(e) = agent
.run_scan(tenant_id, &repo_id, ScanTrigger::Scheduled)
.await
{
tracing::error!(
"Scheduled scan failed for {} (tenant '{tenant_id}'): {e}",
repo.name
);
if let Err(e) = agent.run_scan(&repo_id, ScanTrigger::Scheduled).await {
tracing::error!("Scheduled scan failed for {}: {e}", repo.name);
}
}
}
async fn monitor_cves(agent: &ComplianceAgent, tenant_id: &str) {
async fn monitor_cves(agent: &ComplianceAgent) {
use compliance_core::models::notification::{parse_severity, CveNotification};
use compliance_core::models::SbomEntry;
use futures_util::StreamExt;
let db = match tenant_db(agent, tenant_id).await {
Some(db) => db,
None => return,
};
// Fetch all SBOM entries grouped by repo
let cursor = match db.sbom_entries().find(doc! {}).await {
let cursor = match agent.db.sbom_entries().find(doc! {}).await {
Ok(c) => c,
Err(e) => {
tracing::error!("CVE monitor: failed to list SBOM entries for '{tenant_id}': {e}");
tracing::error!("CVE monitor: failed to list SBOM entries: {e}");
return;
}
};
let entries: Vec<SbomEntry> = cursor.filter_map(|r| async { r.ok() }).collect().await;
if entries.is_empty() {
tracing::debug!("CVE monitor: no SBOM entries for tenant '{tenant_id}', skipping");
tracing::debug!("CVE monitor: no SBOM entries, skipping");
return;
}
tracing::info!(
"CVE monitor: checking {} dependencies for new CVEs (tenant '{tenant_id}')",
"CVE monitor: checking {} dependencies for new CVEs",
entries.len()
);
@@ -168,7 +112,7 @@ async fn monitor_cves(agent: &ComplianceAgent, tenant_id: &str) {
std::collections::HashMap::new();
for rid in &repo_ids {
if let Ok(oid) = mongodb::bson::oid::ObjectId::parse_str(rid) {
if let Ok(Some(repo)) = db.repositories().find_one(doc! { "_id": oid }).await {
if let Ok(Some(repo)) = agent.db.repositories().find_one(doc! { "_id": oid }).await {
repo_names.insert(rid.clone(), repo.name.clone());
}
}
@@ -216,7 +160,8 @@ async fn monitor_cves(agent: &ComplianceAgent, tenant_id: &str) {
for alert in &alerts {
let filter = doc! { "cve_id": &alert.cve_id, "repo_id": &alert.repo_id };
let update = doc! { "$setOnInsert": mongodb::bson::to_bson(alert).unwrap_or_default() };
let _ = db
let _ = agent
.db
.cve_alerts()
.update_one(filter, update)
.upsert(true)
@@ -229,7 +174,8 @@ async fn monitor_cves(agent: &ComplianceAgent, tenant_id: &str) {
continue;
}
if let Some(entry_id) = &entry.id {
let _ = db
let _ = agent
.db
.sbom_entries()
.update_one(
doc! { "_id": entry_id },
@@ -267,7 +213,8 @@ async fn monitor_cves(agent: &ComplianceAgent, tenant_id: &str) {
let update = doc! {
"$setOnInsert": mongodb::bson::to_bson(&notification).unwrap_or_default()
};
match db
match agent
.db
.cve_notifications()
.update_one(filter, update)
.upsert(true)
@@ -285,10 +232,8 @@ async fn monitor_cves(agent: &ComplianceAgent, tenant_id: &str) {
}
if new_notifications > 0 {
tracing::info!(
"CVE monitor: created {new_notifications} new notification(s) for tenant '{tenant_id}'"
);
tracing::info!("CVE monitor: created {new_notifications} new notification(s)");
} else {
tracing::info!("CVE monitor: no new CVEs found for tenant '{tenant_id}'");
tracing::info!("CVE monitor: no new CVEs found");
}
}
+9 -23
View File
@@ -14,30 +14,24 @@ type HmacSha256 = Hmac<Sha256>;
pub async fn handle_gitea_webhook(
Extension(agent): Extension<Arc<ComplianceAgent>>,
Path((tenant_id, repo_id)): Path<(String, String)>,
Path(repo_id): Path<String>,
headers: HeaderMap,
body: Bytes,
) -> StatusCode {
// Look up the repo in the tenant's database to get its webhook secret
// Look up the repo to get its webhook secret
let oid = match mongodb::bson::oid::ObjectId::parse_str(&repo_id) {
Ok(oid) => oid,
Err(_) => return StatusCode::NOT_FOUND,
};
let db = match agent.db_pool.for_tenant_id(&tenant_id).await {
Ok(db) => db,
Err(e) => {
tracing::warn!("Gitea webhook: cannot open tenant database '{tenant_id}': {e}");
return StatusCode::NOT_FOUND;
}
};
let repo = match db
let repo = match agent
.db
.repositories()
.find_one(mongodb::bson::doc! { "_id": oid })
.await
{
Ok(Some(repo)) => repo,
_ => {
tracing::warn!("Gitea webhook: repo {repo_id} not found in tenant '{tenant_id}'");
tracing::warn!("Gitea webhook: repo {repo_id} not found");
return StatusCode::NOT_FOUND;
}
};
@@ -72,21 +66,15 @@ pub async fn handle_gitea_webhook(
"push" => {
let agent_clone = (*agent).clone();
let repo_id = repo_id.clone();
let tenant_id = tenant_id.clone();
tokio::spawn(async move {
tracing::info!(
"Gitea push webhook: triggering scan for {repo_id} in tenant {tenant_id}"
);
if let Err(e) = agent_clone
.run_scan(&tenant_id, &repo_id, ScanTrigger::Webhook)
.await
{
tracing::info!("Gitea push webhook: triggering scan for {repo_id}");
if let Err(e) = agent_clone.run_scan(&repo_id, ScanTrigger::Webhook).await {
tracing::error!("Webhook-triggered scan failed: {e}");
}
});
StatusCode::OK
}
"pull_request" => handle_pull_request(agent, &tenant_id, &repo_id, &payload).await,
"pull_request" => handle_pull_request(agent, &repo_id, &payload).await,
_ => {
tracing::debug!("Gitea webhook: ignoring event '{event}'");
StatusCode::OK
@@ -96,7 +84,6 @@ pub async fn handle_gitea_webhook(
async fn handle_pull_request(
agent: Arc<ComplianceAgent>,
tenant_id: &str,
repo_id: &str,
payload: &serde_json::Value,
) -> StatusCode {
@@ -119,14 +106,13 @@ async fn handle_pull_request(
}
let repo_id = repo_id.to_string();
let tenant_id = tenant_id.to_string();
let head_sha = head_sha.to_string();
let base_sha = base_sha.to_string();
let agent_clone = (*agent).clone();
tokio::spawn(async move {
tracing::info!("Gitea PR webhook: reviewing PR #{pr_number} on {repo_id}");
if let Err(e) = agent_clone
.run_pr_review(&tenant_id, &repo_id, pr_number, &base_sha, &head_sha)
.run_pr_review(&repo_id, pr_number, &base_sha, &head_sha)
.await
{
tracing::error!("PR review failed for #{pr_number}: {e}");
+9 -23
View File
@@ -14,30 +14,24 @@ type HmacSha256 = Hmac<Sha256>;
pub async fn handle_github_webhook(
Extension(agent): Extension<Arc<ComplianceAgent>>,
Path((tenant_id, repo_id)): Path<(String, String)>,
Path(repo_id): Path<String>,
headers: HeaderMap,
body: Bytes,
) -> StatusCode {
// Look up the repo in the tenant's database to get its webhook secret
// Look up the repo to get its webhook secret
let oid = match mongodb::bson::oid::ObjectId::parse_str(&repo_id) {
Ok(oid) => oid,
Err(_) => return StatusCode::NOT_FOUND,
};
let db = match agent.db_pool.for_tenant_id(&tenant_id).await {
Ok(db) => db,
Err(e) => {
tracing::warn!("GitHub webhook: cannot open tenant database '{tenant_id}': {e}");
return StatusCode::NOT_FOUND;
}
};
let repo = match db
let repo = match agent
.db
.repositories()
.find_one(mongodb::bson::doc! { "_id": oid })
.await
{
Ok(Some(repo)) => repo,
_ => {
tracing::warn!("GitHub webhook: repo {repo_id} not found in tenant '{tenant_id}'");
tracing::warn!("GitHub webhook: repo {repo_id} not found");
return StatusCode::NOT_FOUND;
}
};
@@ -72,21 +66,15 @@ pub async fn handle_github_webhook(
"push" => {
let agent_clone = (*agent).clone();
let repo_id = repo_id.clone();
let tenant_id = tenant_id.clone();
tokio::spawn(async move {
tracing::info!(
"GitHub push webhook: triggering scan for {repo_id} in tenant {tenant_id}"
);
if let Err(e) = agent_clone
.run_scan(&tenant_id, &repo_id, ScanTrigger::Webhook)
.await
{
tracing::info!("GitHub push webhook: triggering scan for {repo_id}");
if let Err(e) = agent_clone.run_scan(&repo_id, ScanTrigger::Webhook).await {
tracing::error!("Webhook-triggered scan failed: {e}");
}
});
StatusCode::OK
}
"pull_request" => handle_pull_request(agent, &tenant_id, &repo_id, &payload).await,
"pull_request" => handle_pull_request(agent, &repo_id, &payload).await,
_ => {
tracing::debug!("GitHub webhook: ignoring event '{event}'");
StatusCode::OK
@@ -96,7 +84,6 @@ pub async fn handle_github_webhook(
async fn handle_pull_request(
agent: Arc<ComplianceAgent>,
tenant_id: &str,
repo_id: &str,
payload: &serde_json::Value,
) -> StatusCode {
@@ -118,14 +105,13 @@ async fn handle_pull_request(
}
let repo_id = repo_id.to_string();
let tenant_id = tenant_id.to_string();
let head_sha = head_sha.to_string();
let base_sha = base_sha.to_string();
let agent_clone = (*agent).clone();
tokio::spawn(async move {
tracing::info!("GitHub PR webhook: reviewing PR #{pr_number} on {repo_id}");
if let Err(e) = agent_clone
.run_pr_review(&tenant_id, &repo_id, pr_number, &base_sha, &head_sha)
.run_pr_review(&repo_id, pr_number, &base_sha, &head_sha)
.await
{
tracing::error!("PR review failed for #{pr_number}: {e}");
+9 -23
View File
@@ -10,30 +10,24 @@ use crate::agent::ComplianceAgent;
pub async fn handle_gitlab_webhook(
Extension(agent): Extension<Arc<ComplianceAgent>>,
Path((tenant_id, repo_id)): Path<(String, String)>,
Path(repo_id): Path<String>,
headers: HeaderMap,
body: Bytes,
) -> StatusCode {
// Look up the repo in the tenant's database to get its webhook secret
// Look up the repo to get its webhook secret
let oid = match mongodb::bson::oid::ObjectId::parse_str(&repo_id) {
Ok(oid) => oid,
Err(_) => return StatusCode::NOT_FOUND,
};
let db = match agent.db_pool.for_tenant_id(&tenant_id).await {
Ok(db) => db,
Err(e) => {
tracing::warn!("GitLab webhook: cannot open tenant database '{tenant_id}': {e}");
return StatusCode::NOT_FOUND;
}
};
let repo = match db
let repo = match agent
.db
.repositories()
.find_one(mongodb::bson::doc! { "_id": oid })
.await
{
Ok(Some(repo)) => repo,
_ => {
tracing::warn!("GitLab webhook: repo {repo_id} not found in tenant '{tenant_id}'");
tracing::warn!("GitLab webhook: repo {repo_id} not found");
return StatusCode::NOT_FOUND;
}
};
@@ -65,21 +59,15 @@ pub async fn handle_gitlab_webhook(
"push" => {
let agent_clone = (*agent).clone();
let repo_id = repo_id.clone();
let tenant_id = tenant_id.clone();
tokio::spawn(async move {
tracing::info!(
"GitLab push webhook: triggering scan for {repo_id} in tenant {tenant_id}"
);
if let Err(e) = agent_clone
.run_scan(&tenant_id, &repo_id, ScanTrigger::Webhook)
.await
{
tracing::info!("GitLab push webhook: triggering scan for {repo_id}");
if let Err(e) = agent_clone.run_scan(&repo_id, ScanTrigger::Webhook).await {
tracing::error!("Webhook-triggered scan failed: {e}");
}
});
StatusCode::OK
}
"merge_request" => handle_merge_request(agent, &tenant_id, &repo_id, &payload).await,
"merge_request" => handle_merge_request(agent, &repo_id, &payload).await,
_ => {
tracing::debug!("GitLab webhook: ignoring event '{event_type}'");
StatusCode::OK
@@ -89,7 +77,6 @@ pub async fn handle_gitlab_webhook(
async fn handle_merge_request(
agent: Arc<ComplianceAgent>,
tenant_id: &str,
repo_id: &str,
payload: &serde_json::Value,
) -> StatusCode {
@@ -114,14 +101,13 @@ async fn handle_merge_request(
}
let repo_id = repo_id.to_string();
let tenant_id = tenant_id.to_string();
let head_sha = head_sha.to_string();
let base_sha = base_sha.to_string();
let agent_clone = (*agent).clone();
tokio::spawn(async move {
tracing::info!("GitLab MR webhook: reviewing MR !{mr_iid} on {repo_id}");
if let Err(e) = agent_clone
.run_pr_review(&tenant_id, &repo_id, mr_iid, &base_sha, &head_sha)
.run_pr_review(&repo_id, mr_iid, &base_sha, &head_sha)
.await
{
tracing::error!("MR review failed for !{mr_iid}: {e}");
+4 -8
View File
@@ -9,21 +9,17 @@ use crate::webhooks::{gitea, github, gitlab};
pub async fn start_webhook_server(agent: &ComplianceAgent) -> Result<(), AgentError> {
let app = Router::new()
// Per-tenant per-repo webhook URLs: /webhook/{tenant_id}/{platform}/{repo_id}
// The tenant_id is resolved from the URL path because webhooks
// arrive without a JWT — they're authenticated via per-repo HMAC,
// not via the tenant gate. The dashboard surfaces the full URL
// including the tenant_id when the repo is registered.
// Per-repo webhook URLs: /webhook/{platform}/{repo_id}
.route(
"/webhook/{tenant_id}/github/{repo_id}",
"/webhook/github/{repo_id}",
post(github::handle_github_webhook),
)
.route(
"/webhook/{tenant_id}/gitlab/{repo_id}",
"/webhook/gitlab/{repo_id}",
post(gitlab::handle_gitlab_webhook),
)
.route(
"/webhook/{tenant_id}/gitea/{repo_id}",
"/webhook/gitea/{repo_id}",
post(gitea::handle_gitea_webhook),
)
.layer(Extension(Arc::new(agent.clone())));
+8 -20
View File
@@ -7,7 +7,7 @@ use std::sync::Arc;
use compliance_agent::agent::ComplianceAgent;
use compliance_agent::api;
use compliance_agent::database::DatabasePool;
use compliance_agent::database::Database;
use compliance_core::AgentConfig;
use secrecy::SecretString;
@@ -28,9 +28,10 @@ impl TestServer {
// Unique database name per test run to avoid collisions
let db_name = format!("test_{}", uuid::Uuid::new_v4().simple());
let db_pool = DatabasePool::connect(&mongodb_uri, &db_name)
let db = Database::connect(&mongodb_uri, &db_name)
.await
.expect("Failed to build DatabasePool");
.expect("Failed to connect to MongoDB — is it running?");
db.ensure_indexes().await.expect("Failed to create indexes");
let config = AgentConfig {
mongodb_uri: mongodb_uri.clone(),
@@ -68,15 +69,11 @@ impl TestServer {
pentest_imap_password: None,
};
let agent = ComplianceAgent::new(config, db_pool);
let agent = ComplianceAgent::new(config, db);
// Build the router with the agent extension. After M7.2-B every
// handler takes a TenantCtx extractor; without KC in the test
// harness, the dev-tenant injector mounts a synthetic context so
// tests run end-to-end against `<db_name>_dev`.
// Build the router with the agent extension
let app = api::routes::build_router()
.layer(axum::extract::Extension(Arc::new(agent)))
.layer(axum::middleware::from_fn(api::server::inject_dev_tenant))
.layer(tower_http::cors::CorsLayer::permissive());
// Bind to port 0 to get a random available port
@@ -159,19 +156,10 @@ impl TestServer {
&self.db_name
}
/// Drop every per-tenant database belonging to this test run.
/// Post-M7.2-D the agent never opens a `db_name` directly —
/// data lives only in `<db_name>_<tenant>` per-tenant databases.
/// Drop the test database on cleanup
pub async fn cleanup(&self) {
if let Ok(client) = mongodb::Client::with_uri_str(&self.mongodb_uri).await {
if let Ok(names) = client.list_database_names().await {
let prefix = format!("{}_", self.db_name);
for name in names {
if name.starts_with(&prefix) {
client.database(&name).drop().await.ok();
}
}
}
client.database(&self.db_name).drop().await.ok();
}
}
}
-298
View File
@@ -1,298 +0,0 @@
//! M7.2-A — `DatabasePool` isolation proof.
//!
//! Two `TenantContext`s, two databases, one client. Insert on A, query
//! on B → empty. Insert on B, query on A → only A's docs. Proves that
//! the per-tenant database split actually isolates at the driver level
//! and not at "we hope we filter."
//!
//! Requires MongoDB. Set `TEST_MONGODB_URI` to override the default
//! `mongodb://root:example@localhost:27017/?authSource=admin`.
#![allow(clippy::expect_used, clippy::unwrap_used)]
use compliance_agent::database::DatabasePool;
use compliance_core::models::TrackedRepository;
use compliance_core::{OrgRole, TenantContext, TenantStatus};
use mongodb::bson::doc;
fn ctx(tenant_id: &str, slug: &str) -> TenantContext {
TenantContext {
tenant_id: tenant_id.to_string(),
tenant_slug: slug.to_string(),
org_roles: vec![OrgRole::ItAdmin],
products: vec!["compliance-scanner".to_string()],
plan: "starter".to_string(),
status: TenantStatus::Active,
user_id: "u-1".to_string(),
user_name: None,
}
}
fn fixture_repo(name: &str, git_url: &str) -> TrackedRepository {
TrackedRepository {
id: None,
name: name.to_string(),
git_url: git_url.to_string(),
default_branch: "main".to_string(),
local_path: None,
scan_schedule: None,
webhook_enabled: false,
webhook_secret: None,
tracker_type: None,
tracker_owner: None,
tracker_repo: None,
tracker_token: None,
auth_token: None,
auth_username: None,
last_scanned_commit: None,
findings_count: 0,
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
}
}
#[tokio::test]
async fn pool_isolates_tenants_at_driver_level() {
let uri = std::env::var("TEST_MONGODB_URI")
.unwrap_or_else(|_| "mongodb://root:example@localhost:27017/?authSource=admin".into());
// Unique per run so parallel test invocations don't collide. Kept
// short because Mongo caps db names at 63 bytes (prefix + tenant_id).
let prefix = format!("m72a_{}", short_id());
let pool = DatabasePool::connect(&uri, &prefix)
.await
.expect("Failed to connect to MongoDB — is it running?");
let acme = ctx("00000000-0000-0000-0000-00000000acme", "acme");
let globex = ctx("00000000-0000-0000-0000-0000globex000", "globex");
let acme_db = pool.for_tenant(&acme).await.expect("acme db");
let globex_db = pool.for_tenant(&globex).await.expect("globex db");
// Write distinct repos into each tenant's database.
acme_db
.repositories()
.insert_one(fixture_repo("acme-app", "git@example.com:acme/app.git"))
.await
.expect("insert acme");
globex_db
.repositories()
.insert_one(fixture_repo(
"globex-platform",
"git@example.com:globex/platform.git",
))
.await
.expect("insert globex");
// The point of the whole exercise: acme can ONLY see acme's repo
// and globex can ONLY see globex's, with no filter doc anywhere
// because the isolation is at the database handle, not in the query.
let acme_seen = collect(&acme_db).await;
let globex_seen = collect(&globex_db).await;
assert_eq!(acme_seen.len(), 1, "acme should see exactly its own repo");
assert_eq!(acme_seen[0].name, "acme-app");
assert_eq!(
globex_seen.len(),
1,
"globex should see exactly its own repo"
);
assert_eq!(globex_seen[0].name, "globex-platform");
// Sanity: the two databases really are different by name.
let acme_db_name = pool.tenant_db_name(&acme.tenant_id);
let globex_db_name = pool.tenant_db_name(&globex.tenant_id);
assert_ne!(acme_db_name, globex_db_name);
assert!(acme_db_name.starts_with(&prefix));
// Cleanup — drop both per-tenant databases.
pool.client()
.database(&acme_db_name)
.drop()
.await
.expect("drop acme");
pool.client()
.database(&globex_db_name)
.drop()
.await
.expect("drop globex");
}
#[tokio::test]
async fn for_tenant_is_idempotent_index_creation() {
let uri = std::env::var("TEST_MONGODB_URI")
.unwrap_or_else(|_| "mongodb://root:example@localhost:27017/?authSource=admin".into());
let prefix = format!("m72a_{}", short_id());
let pool = DatabasePool::connect(&uri, &prefix).await.expect("connect");
let acme = ctx("00000000-0000-0000-0000-00000000acme", "acme");
// Second call must not fail (ensure_indexes already ran, in-memory
// marker is set, Mongo's createIndex is idempotent by name anyway).
let _ = pool.for_tenant(&acme).await.expect("first call");
let _ = pool.for_tenant(&acme).await.expect("second call");
let _ = pool.for_tenant(&acme).await.expect("third call");
// Cleanup
let db_name = pool.tenant_db_name(&acme.tenant_id);
pool.client().database(&db_name).drop().await.expect("drop");
}
#[tokio::test]
async fn tenant_db_name_sanitizes_unsafe_characters() {
let uri = std::env::var("TEST_MONGODB_URI")
.unwrap_or_else(|_| "mongodb://root:example@localhost:27017/?authSource=admin".into());
let pool = DatabasePool::connect(&uri, "m72a_sanitize")
.await
.expect("connect");
// Mongo db names cannot contain `/ \ . " $ <space> NUL`. The pool
// must rewrite these without exploding on connect.
let funky = "te/n.a\\nt$id\" with spaces";
let name = pool.tenant_db_name(funky);
for c in ['/', '\\', '.', '"', '$', ' '] {
assert!(
!name.contains(c),
"sanitized db name still contains {c:?}: {name}"
);
}
}
#[tokio::test]
async fn admin_helpers_list_and_drop_tenant_dbs() {
let uri = std::env::var("TEST_MONGODB_URI")
.unwrap_or_else(|_| "mongodb://root:example@localhost:27017/?authSource=admin".into());
let prefix = format!("m72d_{}", short_id());
let pool = DatabasePool::connect(&uri, &prefix).await.expect("connect");
let acme = ctx("00000000-0000-0000-0000-00000000acme", "acme");
let globex = ctx("00000000-0000-0000-0000-0000globex000", "globex");
// Provision two tenants and write a doc into each so the databases
// actually materialize on the cluster (Mongo lazily creates DBs).
let acme_db = pool.for_tenant(&acme).await.expect("acme db");
let globex_db = pool.for_tenant(&globex).await.expect("globex db");
acme_db
.repositories()
.insert_one(fixture_repo("acme-app", "git@example.com:acme/app.git"))
.await
.expect("insert acme");
globex_db
.repositories()
.insert_one(fixture_repo("globex-app", "git@example.com:globex/app.git"))
.await
.expect("insert globex");
// list_tenant_db_names sees both, filtered by prefix
let names = pool.list_tenant_db_names().await.expect("list tenants");
let acme_name = pool.tenant_db_name(&acme.tenant_id);
let globex_name = pool.tenant_db_name(&globex.tenant_id);
assert!(
names.contains(&acme_name),
"expected {acme_name} in {names:?}"
);
assert!(
names.contains(&globex_name),
"expected {globex_name} in {names:?}"
);
for name in &names {
assert!(name.starts_with(&format!("{prefix}_")));
}
// drop_tenant removes acme's DB
pool.drop_tenant(&acme.tenant_id)
.await
.expect("drop acme tenant");
let after = pool
.list_tenant_db_names()
.await
.expect("list tenants after drop");
assert!(
!after.contains(&acme_name),
"acme should be gone after drop, got {after:?}"
);
assert!(
after.contains(&globex_name),
"globex should still be present, got {after:?}"
);
// Cleanup remaining
pool.drop_tenant(&globex.tenant_id)
.await
.expect("drop globex tenant");
}
#[tokio::test]
async fn tenant_db_name_falls_back_to_hash_when_too_long() {
let uri = std::env::var("TEST_MONGODB_URI")
.unwrap_or_else(|_| "mongodb://root:example@localhost:27017/?authSource=admin".into());
let pool = DatabasePool::connect(&uri, "m72a_long")
.await
.expect("connect");
// 100-byte tenant_id would overflow the 63-byte db-name cap with
// any reasonable prefix. The pool must hash it down.
let huge = "x".repeat(100);
let name = pool.tenant_db_name(&huge);
assert!(name.len() <= 63, "hashed name should fit: {name}");
assert!(name.starts_with("m72a_long_"));
// The hash suffix is 32 hex chars (16-byte SHA-256 truncation).
let suffix = name.trim_start_matches("m72a_long_");
assert_eq!(
suffix.len(),
32,
"expected 32-hex suffix (16-byte hash), got {suffix:?}"
);
assert!(suffix.chars().all(|c| c.is_ascii_hexdigit()));
// Stable: same input → same output.
assert_eq!(name, pool.tenant_db_name(&huge));
// Different inputs → different outputs (collision check on a tiny
// sample — full birthday-resistance is a proof not a test).
let huge2 = "y".repeat(100);
assert_ne!(pool.tenant_db_name(&huge), pool.tenant_db_name(&huge2));
}
#[tokio::test]
async fn connect_rejects_overlong_db_prefix() {
let uri = std::env::var("TEST_MONGODB_URI")
.unwrap_or_else(|_| "mongodb://root:example@localhost:27017/?authSource=admin".into());
// MAX_PREFIX_LEN is 30 (= 63 - 1 - 32). A 31-char prefix MUST be
// rejected at construction so the hash-fallback path can never
// produce an over-long db name at runtime.
let too_long = "a".repeat(31);
let err = DatabasePool::connect(&uri, &too_long).await.unwrap_err();
let msg = format!("{err}");
assert!(
msg.contains("max is 30") || msg.contains(&too_long),
"error should explain the cap: {msg}"
);
// Exactly 30 chars is the inclusive bound — must succeed.
let just_right = "a".repeat(30);
let _ = DatabasePool::connect(&uri, &just_right)
.await
.expect("30-char prefix should be accepted");
}
/// Short UUID slug for keeping test prefixes well under Mongo's 63-byte
/// db-name cap.
fn short_id() -> String {
uuid::Uuid::new_v4().simple().to_string()[..8].to_string()
}
/// Drain a `repositories` find cursor on the given tenant database.
async fn collect(db: &compliance_agent::database::Database) -> Vec<TrackedRepository> {
let mut cursor = db
.repositories()
.find(doc! {})
.await
.expect("find repositories");
let mut out = Vec::new();
while cursor.advance().await.expect("advance") {
out.push(cursor.deserialize_current().expect("deserialize"));
}
out
}
@@ -1,122 +0,0 @@
//! M7.1 — integration tests for `compliance_core::auth::require_tenant_status`.
//!
//! Exercises the middleware end-to-end through an Axum router so we
//! catch wiring bugs (extension propagation, method matching) that pure
//! unit tests would miss.
#![allow(clippy::expect_used, clippy::unwrap_used)]
use axum::{
body::Body,
extract::Request,
http::{Method, StatusCode},
middleware::{from_fn, Next},
response::Response,
routing::{get, post},
Router,
};
use compliance_core::{auth::require_tenant_status, TenantContext, TenantStatus};
use tower::ServiceExt;
fn ctx_with(status: TenantStatus) -> TenantContext {
TenantContext {
tenant_id: "t-1".to_string(),
tenant_slug: "acme".to_string(),
org_roles: vec![],
products: vec![],
plan: "starter".to_string(),
status,
user_id: "u-1".to_string(),
user_name: None,
}
}
fn router_with_ctx(ctx: Option<TenantContext>) -> Router {
let injector = move |mut req: Request, next: Next| {
let ctx = ctx.clone();
async move {
if let Some(c) = ctx {
req.extensions_mut().insert(c);
}
next.run(req).await
}
};
Router::new()
.route("/r", get(|| async { "read" }))
.route("/w", post(|| async { "write" }))
.layer(from_fn(require_tenant_status))
.layer(from_fn(injector))
}
async fn call(router: Router, method: Method, path: &str) -> Response {
let req = Request::builder()
.method(method)
.uri(path)
.body(Body::empty())
.expect("request build");
router.oneshot(req).await.expect("oneshot")
}
#[tokio::test]
async fn active_tenant_can_read_and_write() {
let r = router_with_ctx(Some(ctx_with(TenantStatus::Active)));
assert_eq!(
call(r.clone(), Method::GET, "/r").await.status(),
StatusCode::OK
);
assert_eq!(call(r, Method::POST, "/w").await.status(), StatusCode::OK);
}
#[tokio::test]
async fn trial_tenant_can_read_and_write() {
let r = router_with_ctx(Some(ctx_with(TenantStatus::Trial)));
assert_eq!(
call(r.clone(), Method::GET, "/r").await.status(),
StatusCode::OK
);
assert_eq!(call(r, Method::POST, "/w").await.status(), StatusCode::OK);
}
#[tokio::test]
async fn demo_tenant_can_read_and_write() {
let r = router_with_ctx(Some(ctx_with(TenantStatus::Demo)));
assert_eq!(
call(r.clone(), Method::GET, "/r").await.status(),
StatusCode::OK
);
assert_eq!(call(r, Method::POST, "/w").await.status(), StatusCode::OK);
}
#[tokio::test]
async fn frozen_tenant_can_read_but_not_write() {
let r = router_with_ctx(Some(ctx_with(TenantStatus::Frozen)));
assert_eq!(
call(r.clone(), Method::GET, "/r").await.status(),
StatusCode::OK
);
assert_eq!(
call(r, Method::POST, "/w").await.status(),
StatusCode::PAYMENT_REQUIRED
);
}
#[tokio::test]
async fn archived_tenant_is_gone_on_every_method() {
let r = router_with_ctx(Some(ctx_with(TenantStatus::Archived)));
assert_eq!(
call(r.clone(), Method::GET, "/r").await.status(),
StatusCode::GONE
);
assert_eq!(call(r, Method::POST, "/w").await.status(), StatusCode::GONE);
}
#[tokio::test]
async fn no_context_passes_through() {
let r = router_with_ctx(None);
assert_eq!(
call(r.clone(), Method::GET, "/r").await.status(),
StatusCode::OK
);
assert_eq!(call(r, Method::POST, "/w").await.status(), StatusCode::OK);
}
@@ -1,61 +0,0 @@
//! Authenticated HTTP client for talking to the compliance-agent.
//!
//! Every dashboard server function that hits `comp-dev.meghsakha.com/api/v1/*`
//! must go through here so the Keycloak access token from the user's
//! session is attached as `Authorization: Bearer <token>`. Without it
//! the agent's M7.1 `require_jwt_auth` middleware rejects with 401
//! "Missing authorization header".
//!
//! When Keycloak is not configured (dev convenience), the helper
//! returns an unauthenticated builder — matching the agent's
//! pass-through behavior in the same state.
use dioxus::prelude::ServerFnError;
use dioxus_fullstack::FullstackContext;
use reqwest::Method;
use super::auth::LOGGED_IN_USER_SESS_KEY;
use super::server_state::ServerState;
use super::user_state::UserStateInner;
/// Build a `RequestBuilder` for `<agent_api_url><path>` with the
/// session's access token attached. `path` should include a leading
/// `/`, e.g. `"/api/v1/repositories"`.
pub async fn agent_request(
method: Method,
path: &str,
) -> Result<reqwest::RequestBuilder, ServerFnError> {
let state: ServerState = FullstackContext::extract().await?;
let url = format!("{}{}", state.agent_api_url, path);
let mut req = reqwest::Client::new().request(method, &url);
req = attach_token(req, &state).await?;
Ok(req)
}
/// Same as [`agent_request`] but for `GET`. Convenience for the common case.
pub async fn agent_get(path: &str) -> Result<reqwest::RequestBuilder, ServerFnError> {
agent_request(Method::GET, path).await
}
/// Attach the session's bearer token if Keycloak is configured AND the
/// session has a logged-in user. Otherwise leave the request as-is.
///
/// The Keycloak-disabled path mirrors the dashboard's `require_auth`
/// middleware, which short-circuits when `state.keycloak.is_none()`.
async fn attach_token(
req: reqwest::RequestBuilder,
state: &ServerState,
) -> Result<reqwest::RequestBuilder, ServerFnError> {
if state.keycloak.is_none() {
return Ok(req);
}
let session: tower_sessions::Session = FullstackContext::extract().await?;
let user: Option<UserStateInner> = session
.get(LOGGED_IN_USER_SESS_KEY)
.await
.map_err(|e| ServerFnError::new(format!("session read failed: {e}")))?;
Ok(match user {
Some(u) => req.bearer_auth(u.access_token),
None => req,
})
}
+24 -15
View File
@@ -61,14 +61,16 @@ pub async fn send_chat_message(
message: String,
history: Vec<ChatHistoryMessage>,
) -> Result<ChatApiResponse, ServerFnError> {
// Chat uses a longer timeout because the LLM round-trip can be slow;
// agent_request doesn't expose a per-call timeout so we layer one on.
let resp = super::agent_client::agent_request(
reqwest::Method::POST,
&format!("/api/v1/chat/{repo_id}"),
)
.await?
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/chat/{repo_id}", state.agent_api_url);
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(120))
.build()
.map_err(|e| ServerFnError::new(e.to_string()))?;
let resp = client
.post(&url)
.json(&serde_json::json!({
"message": message,
"history": history,
@@ -89,11 +91,16 @@ pub async fn send_chat_message(
#[server]
pub async fn trigger_embedding_build(repo_id: String) -> Result<(), ServerFnError> {
super::agent_client::agent_request(
reqwest::Method::POST,
&format!("/api/v1/chat/{repo_id}/build-embeddings"),
)
.await?
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!(
"{}/api/v1/chat/{repo_id}/build-embeddings",
state.agent_api_url
);
let client = reqwest::Client::new();
client
.post(&url)
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
@@ -104,9 +111,11 @@ pub async fn trigger_embedding_build(repo_id: String) -> Result<(), ServerFnErro
pub async fn fetch_embedding_status(
repo_id: String,
) -> Result<EmbeddingStatusResponse, ServerFnError> {
let resp = super::agent_client::agent_get(&format!("/api/v1/chat/{repo_id}/status"))
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/chat/{repo_id}/status", state.agent_api_url);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: EmbeddingStatusResponse = resp
+31 -19
View File
@@ -26,9 +26,10 @@ pub struct DastFindingDetailResponse {
#[server]
pub async fn fetch_dast_targets() -> Result<DastTargetsResponse, ServerFnError> {
let resp = super::agent_client::agent_get("/api/v1/dast/targets")
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/dast/targets", state.agent_api_url);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: DastTargetsResponse = resp
@@ -40,9 +41,10 @@ pub async fn fetch_dast_targets() -> Result<DastTargetsResponse, ServerFnError>
#[server]
pub async fn fetch_dast_scan_runs() -> Result<DastScanRunsResponse, ServerFnError> {
let resp = super::agent_client::agent_get("/api/v1/dast/scan-runs")
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/dast/scan-runs", state.agent_api_url);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: DastScanRunsResponse = resp
@@ -54,9 +56,10 @@ pub async fn fetch_dast_scan_runs() -> Result<DastScanRunsResponse, ServerFnErro
#[server]
pub async fn fetch_dast_findings() -> Result<DastFindingsResponse, ServerFnError> {
let resp = super::agent_client::agent_get("/api/v1/dast/findings")
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/dast/findings", state.agent_api_url);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: DastFindingsResponse = resp
@@ -70,9 +73,10 @@ pub async fn fetch_dast_findings() -> Result<DastFindingsResponse, ServerFnError
pub async fn fetch_dast_finding_detail(
id: String,
) -> Result<DastFindingDetailResponse, ServerFnError> {
let resp = super::agent_client::agent_get(&format!("/api/v1/dast/findings/{id}"))
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/dast/findings/{id}", state.agent_api_url);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: DastFindingDetailResponse = resp
@@ -84,8 +88,12 @@ pub async fn fetch_dast_finding_detail(
#[server]
pub async fn add_dast_target(name: String, base_url: String) -> Result<(), ServerFnError> {
super::agent_client::agent_request(reqwest::Method::POST, "/api/v1/dast/targets")
.await?
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/dast/targets", state.agent_api_url);
let client = reqwest::Client::new();
client
.post(&url)
.json(&serde_json::json!({
"name": name,
"base_url": base_url,
@@ -98,11 +106,15 @@ pub async fn add_dast_target(name: String, base_url: String) -> Result<(), Serve
#[server]
pub async fn trigger_dast_scan(target_id: String) -> Result<(), ServerFnError> {
super::agent_client::agent_request(
reqwest::Method::POST,
&format!("/api/v1/dast/targets/{target_id}/scan"),
)
.await?
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!(
"{}/api/v1/dast/targets/{target_id}/scan",
state.agent_api_url
);
let client = reqwest::Client::new();
client
.post(&url)
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
@@ -24,35 +24,39 @@ pub struct FindingsQuery {
#[server]
pub async fn fetch_findings(query: FindingsQuery) -> Result<FindingsListResponse, ServerFnError> {
let mut path = format!("/api/v1/findings?page={}&limit=20", query.page);
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let mut url = format!(
"{}/api/v1/findings?page={}&limit=20",
state.agent_api_url, query.page
);
if !query.severity.is_empty() {
path.push_str(&format!("&severity={}", query.severity));
url.push_str(&format!("&severity={}", query.severity));
}
if !query.scan_type.is_empty() {
path.push_str(&format!("&scan_type={}", query.scan_type));
url.push_str(&format!("&scan_type={}", query.scan_type));
}
if !query.status.is_empty() {
path.push_str(&format!("&status={}", query.status));
url.push_str(&format!("&status={}", query.status));
}
if !query.repo_id.is_empty() {
path.push_str(&format!("&repo_id={}", query.repo_id));
url.push_str(&format!("&repo_id={}", query.repo_id));
}
if !query.q.is_empty() {
path.push_str(&format!(
url.push_str(&format!(
"&q={}",
url::form_urlencoded::byte_serialize(query.q.as_bytes()).collect::<String>()
));
}
if !query.sort_by.is_empty() {
path.push_str(&format!("&sort_by={}", query.sort_by));
url.push_str(&format!("&sort_by={}", query.sort_by));
}
if !query.sort_order.is_empty() {
path.push_str(&format!("&sort_order={}", query.sort_order));
url.push_str(&format!("&sort_order={}", query.sort_order));
}
let resp = super::agent_client::agent_get(&path)
.await?
.send()
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: FindingsListResponse = resp
@@ -64,9 +68,11 @@ pub async fn fetch_findings(query: FindingsQuery) -> Result<FindingsListResponse
#[server]
pub async fn fetch_finding_detail(id: String) -> Result<Finding, ServerFnError> {
let resp = super::agent_client::agent_get(&format!("/api/v1/findings/{id}"))
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/findings/{id}", state.agent_api_url);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: serde_json::Value = resp
@@ -80,15 +86,18 @@ pub async fn fetch_finding_detail(id: String) -> Result<Finding, ServerFnError>
#[server]
pub async fn update_finding_status(id: String, status: String) -> Result<(), ServerFnError> {
super::agent_client::agent_request(
reqwest::Method::PATCH,
&format!("/api/v1/findings/{id}/status"),
)
.await?
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/findings/{id}/status", state.agent_api_url);
let client = reqwest::Client::new();
client
.patch(&url)
.json(&serde_json::json!({ "status": status }))
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
Ok(())
}
@@ -97,25 +106,34 @@ pub async fn bulk_update_finding_status(
ids: Vec<String>,
status: String,
) -> Result<(), ServerFnError> {
super::agent_client::agent_request(reqwest::Method::PATCH, "/api/v1/findings/bulk-status")
.await?
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/findings/bulk-status", state.agent_api_url);
let client = reqwest::Client::new();
client
.patch(&url)
.json(&serde_json::json!({ "ids": ids, "status": status }))
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
Ok(())
}
#[server]
pub async fn update_finding_feedback(id: String, feedback: String) -> Result<(), ServerFnError> {
super::agent_client::agent_request(
reqwest::Method::PATCH,
&format!("/api/v1/findings/{id}/feedback"),
)
.await?
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/findings/{id}/feedback", state.agent_api_url);
let client = reqwest::Client::new();
client
.patch(&url)
.json(&serde_json::json!({ "feedback": feedback }))
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
Ok(())
}
@@ -50,9 +50,10 @@ pub struct SearchResponse {
#[server]
pub async fn fetch_graph(repo_id: String) -> Result<GraphDataResponse, ServerFnError> {
let resp = super::agent_client::agent_get(&format!("/api/v1/graph/{repo_id}"))
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/graph/{repo_id}", state.agent_api_url);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: GraphDataResponse = resp
@@ -67,10 +68,13 @@ pub async fn fetch_impact(
repo_id: String,
finding_id: String,
) -> Result<ImpactResponse, ServerFnError> {
let resp =
super::agent_client::agent_get(&format!("/api/v1/graph/{repo_id}/impact/{finding_id}"))
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!(
"{}/api/v1/graph/{repo_id}/impact/{finding_id}",
state.agent_api_url
);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: ImpactResponse = resp
@@ -82,9 +86,10 @@ pub async fn fetch_impact(
#[server]
pub async fn fetch_communities(repo_id: String) -> Result<CommunitiesResponse, ServerFnError> {
let resp = super::agent_client::agent_get(&format!("/api/v1/graph/{repo_id}/communities"))
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/graph/{repo_id}/communities", state.agent_api_url);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: CommunitiesResponse = resp
@@ -99,11 +104,13 @@ pub async fn fetch_file_content(
repo_id: String,
file_path: String,
) -> Result<FileContentResponse, ServerFnError> {
let resp = super::agent_client::agent_get(&format!(
"/api/v1/graph/{repo_id}/file-content?path={file_path}"
))
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!(
"{}/api/v1/graph/{repo_id}/file-content?path={file_path}",
state.agent_api_url
);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: FileContentResponse = resp
@@ -115,11 +122,13 @@ pub async fn fetch_file_content(
#[server]
pub async fn search_nodes(repo_id: String, query: String) -> Result<SearchResponse, ServerFnError> {
let resp = super::agent_client::agent_get(&format!(
"/api/v1/graph/{repo_id}/search?q={query}&limit=50"
))
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!(
"{}/api/v1/graph/{repo_id}/search?q={query}&limit=50",
state.agent_api_url
);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: SearchResponse = resp
@@ -131,11 +140,12 @@ pub async fn search_nodes(repo_id: String, query: String) -> Result<SearchRespon
#[server]
pub async fn trigger_graph_build(repo_id: String) -> Result<(), ServerFnError> {
super::agent_client::agent_request(
reqwest::Method::POST,
&format!("/api/v1/graph/{repo_id}/build"),
)
.await?
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/graph/{repo_id}/build", state.agent_api_url);
let client = reqwest::Client::new();
client
.post(&url)
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
@@ -12,9 +12,11 @@ pub struct IssuesListResponse {
#[server]
pub async fn fetch_issues(page: u64) -> Result<IssuesListResponse, ServerFnError> {
let resp = super::agent_client::agent_get(&format!("/api/v1/issues?page={page}&limit=20"))
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/issues?page={page}&limit=20", state.agent_api_url);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: IssuesListResponse = resp
@@ -18,8 +18,6 @@ pub mod stats;
// Server-only modules
#[cfg(feature = "server")]
mod agent_client;
#[cfg(feature = "server")]
mod auth;
#[cfg(feature = "server")]
mod auth_middleware;
@@ -32,9 +32,11 @@ pub struct NotificationCountResponse {
#[server]
pub async fn fetch_notification_count() -> Result<u64, ServerFnError> {
let resp = super::agent_client::agent_get("/api/v1/notifications/count")
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/notifications/count", state.agent_api_url);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: NotificationCountResponse = resp
@@ -46,9 +48,11 @@ pub async fn fetch_notification_count() -> Result<u64, ServerFnError> {
#[server]
pub async fn fetch_notifications() -> Result<NotificationListResponse, ServerFnError> {
let resp = super::agent_client::agent_get("/api/v1/notifications?limit=20")
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/notifications?limit=20", state.agent_api_url);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: NotificationListResponse = resp
@@ -60,8 +64,12 @@ pub async fn fetch_notifications() -> Result<NotificationListResponse, ServerFnE
#[server]
pub async fn mark_all_notifications_read() -> Result<(), ServerFnError> {
super::agent_client::agent_request(reqwest::Method::POST, "/api/v1/notifications/read-all")
.await?
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/notifications/read-all", state.agent_api_url);
reqwest::Client::new()
.post(&url)
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
@@ -70,11 +78,12 @@ pub async fn mark_all_notifications_read() -> Result<(), ServerFnError> {
#[server]
pub async fn dismiss_notification(id: String) -> Result<(), ServerFnError> {
super::agent_client::agent_request(
reqwest::Method::PATCH,
&format!("/api/v1/notifications/{id}/dismiss"),
)
.await?
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/notifications/{id}/dismiss", state.agent_api_url);
reqwest::Client::new()
.patch(&url)
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
@@ -32,10 +32,12 @@ pub struct AttackChainResponse {
#[server]
pub async fn fetch_pentest_sessions() -> Result<PentestSessionsResponse, ServerFnError> {
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
// Fetch sessions
let resp = super::agent_client::agent_get("/api/v1/pentest/sessions")
.await?
.send()
let url = format!("{}/api/v1/pentest/sessions", state.agent_api_url);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let mut body: PentestSessionsResponse = resp
@@ -44,8 +46,8 @@ pub async fn fetch_pentest_sessions() -> Result<PentestSessionsResponse, ServerF
.map_err(|e| ServerFnError::new(e.to_string()))?;
// Fetch DAST targets to resolve target names
if let Ok(tresp_builder) = super::agent_client::agent_get("/api/v1/dast/targets").await {
if let Ok(tresp) = tresp_builder.send().await {
let targets_url = format!("{}/api/v1/dast/targets", state.agent_api_url);
if let Ok(tresp) = reqwest::get(&targets_url).await {
if let Ok(tbody) = tresp.json::<serde_json::Value>().await {
let targets = tbody.get("data").and_then(|v| v.as_array());
if let Some(targets) = targets {
@@ -75,16 +77,16 @@ pub async fn fetch_pentest_sessions() -> Result<PentestSessionsResponse, ServerF
}
}
}
}
Ok(body)
}
#[server]
pub async fn fetch_pentest_session(id: String) -> Result<PentestSessionResponse, ServerFnError> {
let resp = super::agent_client::agent_get(&format!("/api/v1/pentest/sessions/{id}"))
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/pentest/sessions/{id}", state.agent_api_url);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let mut body: PentestSessionResponse = resp
@@ -94,8 +96,8 @@ pub async fn fetch_pentest_session(id: String) -> Result<PentestSessionResponse,
// Resolve target name from targets list
if let Some(tid) = body.data.get("target_id").and_then(|v| v.as_str()) {
if let Ok(tresp_builder) = super::agent_client::agent_get("/api/v1/dast/targets").await {
if let Ok(tresp) = tresp_builder.send().await {
let targets_url = format!("{}/api/v1/dast/targets", state.agent_api_url);
if let Ok(tresp) = reqwest::get(&targets_url).await {
if let Ok(tbody) = tresp.json::<serde_json::Value>().await {
if let Some(targets) = tbody.get("data").and_then(|v| v.as_array()) {
for t in targets {
@@ -120,7 +122,6 @@ pub async fn fetch_pentest_session(id: String) -> Result<PentestSessionResponse,
}
}
}
}
Ok(body)
}
@@ -129,10 +130,13 @@ pub async fn fetch_pentest_session(id: String) -> Result<PentestSessionResponse,
pub async fn fetch_pentest_messages(
session_id: String,
) -> Result<PentestMessagesResponse, ServerFnError> {
let resp =
super::agent_client::agent_get(&format!("/api/v1/pentest/sessions/{session_id}/messages"))
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!(
"{}/api/v1/pentest/sessions/{session_id}/messages",
state.agent_api_url
);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: PentestMessagesResponse = resp
@@ -144,9 +148,10 @@ pub async fn fetch_pentest_messages(
#[server]
pub async fn fetch_pentest_stats() -> Result<PentestStatsResponse, ServerFnError> {
let resp = super::agent_client::agent_get("/api/v1/pentest/stats")
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/pentest/stats", state.agent_api_url);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: PentestStatsResponse = resp
@@ -158,11 +163,13 @@ pub async fn fetch_pentest_stats() -> Result<PentestStatsResponse, ServerFnError
#[server]
pub async fn fetch_attack_chain(session_id: String) -> Result<AttackChainResponse, ServerFnError> {
let resp = super::agent_client::agent_get(&format!(
"/api/v1/pentest/sessions/{session_id}/attack-chain"
))
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!(
"{}/api/v1/pentest/sessions/{session_id}/attack-chain",
state.agent_api_url
);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: AttackChainResponse = resp
@@ -178,9 +185,12 @@ pub async fn create_pentest_session(
strategy: String,
message: String,
) -> Result<PentestSessionResponse, ServerFnError> {
let resp =
super::agent_client::agent_request(reqwest::Method::POST, "/api/v1/pentest/sessions")
.await?
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/pentest/sessions", state.agent_api_url);
let client = reqwest::Client::new();
let resp = client
.post(&url)
.json(&serde_json::json!({
"target_id": target_id,
"strategy": strategy,
@@ -201,11 +211,14 @@ pub async fn create_pentest_session(
pub async fn create_pentest_session_wizard(
config_json: String,
) -> Result<PentestSessionResponse, ServerFnError> {
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/pentest/sessions", state.agent_api_url);
let config: serde_json::Value =
serde_json::from_str(&config_json).map_err(|e| ServerFnError::new(e.to_string()))?;
let resp =
super::agent_client::agent_request(reqwest::Method::POST, "/api/v1/pentest/sessions")
.await?
let client = reqwest::Client::new();
let resp = client
.post(&url)
.json(&serde_json::json!({ "config": config }))
.send()
.await
@@ -226,6 +239,8 @@ pub async fn create_pentest_session_wizard(
/// Look up a tracked repository by its git URL
#[server]
pub async fn lookup_repo_by_url(url: String) -> Result<serde_json::Value, ServerFnError> {
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let encoded_url: String = url
.bytes()
.flat_map(|b| {
@@ -236,10 +251,11 @@ pub async fn lookup_repo_by_url(url: String) -> Result<serde_json::Value, Server
}
})
.collect();
let resp =
super::agent_client::agent_get(&format!("/api/v1/pentest/lookup-repo?url={encoded_url}"))
.await?
.send()
let api_url = format!(
"{}/api/v1/pentest/lookup-repo?url={}",
state.agent_api_url, encoded_url
);
let resp = reqwest::get(&api_url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: serde_json::Value = resp
@@ -254,11 +270,15 @@ pub async fn send_pentest_message(
session_id: String,
message: String,
) -> Result<PentestMessagesResponse, ServerFnError> {
let resp = super::agent_client::agent_request(
reqwest::Method::POST,
&format!("/api/v1/pentest/sessions/{session_id}/chat"),
)
.await?
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!(
"{}/api/v1/pentest/sessions/{session_id}/chat",
state.agent_api_url
);
let client = reqwest::Client::new();
let resp = client
.post(&url)
.json(&serde_json::json!({
"message": message,
}))
@@ -274,11 +294,15 @@ pub async fn send_pentest_message(
#[server]
pub async fn stop_pentest_session(session_id: String) -> Result<(), ServerFnError> {
super::agent_client::agent_request(
reqwest::Method::POST,
&format!("/api/v1/pentest/sessions/{session_id}/stop"),
)
.await?
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!(
"{}/api/v1/pentest/sessions/{session_id}/stop",
state.agent_api_url
);
let client = reqwest::Client::new();
client
.post(&url)
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
@@ -287,11 +311,15 @@ pub async fn stop_pentest_session(session_id: String) -> Result<(), ServerFnErro
#[server]
pub async fn pause_pentest_session(session_id: String) -> Result<(), ServerFnError> {
let resp = super::agent_client::agent_request(
reqwest::Method::POST,
&format!("/api/v1/pentest/sessions/{session_id}/pause"),
)
.await?
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!(
"{}/api/v1/pentest/sessions/{session_id}/pause",
state.agent_api_url
);
let client = reqwest::Client::new();
let resp = client
.post(&url)
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
@@ -304,11 +332,15 @@ pub async fn pause_pentest_session(session_id: String) -> Result<(), ServerFnErr
#[server]
pub async fn resume_pentest_session(session_id: String) -> Result<(), ServerFnError> {
let resp = super::agent_client::agent_request(
reqwest::Method::POST,
&format!("/api/v1/pentest/sessions/{session_id}/resume"),
)
.await?
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!(
"{}/api/v1/pentest/sessions/{session_id}/resume",
state.agent_api_url
);
let client = reqwest::Client::new();
let resp = client
.post(&url)
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
@@ -323,10 +355,13 @@ pub async fn resume_pentest_session(session_id: String) -> Result<(), ServerFnEr
pub async fn fetch_pentest_findings(
session_id: String,
) -> Result<DastFindingsResponse, ServerFnError> {
let resp =
super::agent_client::agent_get(&format!("/api/v1/pentest/sessions/{session_id}/findings"))
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!(
"{}/api/v1/pentest/sessions/{session_id}/findings",
state.agent_api_url
);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: DastFindingsResponse = resp
@@ -350,11 +385,15 @@ pub async fn export_pentest_report(
requester_name: String,
requester_email: String,
) -> Result<ExportReportResponse, ServerFnError> {
let resp = super::agent_client::agent_request(
reqwest::Method::POST,
&format!("/api/v1/pentest/sessions/{session_id}/export"),
)
.await?
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!(
"{}/api/v1/pentest/sessions/{session_id}/export",
state.agent_api_url
);
let client = reqwest::Client::new();
let resp = client
.post(&url)
.json(&serde_json::json!({
"password": password,
"requester_name": requester_name,
@@ -12,10 +12,14 @@ pub struct RepositoryListResponse {
#[server]
pub async fn fetch_repositories(page: u64) -> Result<RepositoryListResponse, ServerFnError> {
let path = format!("/api/v1/repositories?page={page}&limit=20");
let resp = super::agent_client::agent_get(&path)
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!(
"{}/api/v1/repositories?page={page}&limit=20",
state.agent_api_url
);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: RepositoryListResponse = resp
@@ -37,6 +41,10 @@ pub async fn add_repository(
tracker_repo: Option<String>,
tracker_token: Option<String>,
) -> Result<(), ServerFnError> {
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/repositories", state.agent_api_url);
let mut body = serde_json::json!({
"name": name,
"git_url": git_url,
@@ -61,8 +69,9 @@ pub async fn add_repository(
body["tracker_token"] = serde_json::Value::String(tk);
}
let resp = super::agent_client::agent_request(reqwest::Method::POST, "/api/v1/repositories")
.await?
let client = reqwest::Client::new();
let resp = client
.post(&url)
.json(&body)
.send()
.await
@@ -91,6 +100,10 @@ pub async fn update_repository(
tracker_token: Option<String>,
scan_schedule: Option<String>,
) -> Result<(), ServerFnError> {
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/repositories/{repo_id}", state.agent_api_url);
let mut body = serde_json::Map::new();
if let Some(v) = name.filter(|s| !s.is_empty()) {
body.insert("name".into(), serde_json::Value::String(v));
@@ -120,11 +133,9 @@ pub async fn update_repository(
body.insert("scan_schedule".into(), serde_json::Value::String(v));
}
let resp = super::agent_client::agent_request(
reqwest::Method::PATCH,
&format!("/api/v1/repositories/{repo_id}"),
)
.await?
let client = reqwest::Client::new();
let resp = client
.patch(&url)
.json(&body)
.send()
.await
@@ -142,9 +153,11 @@ pub async fn update_repository(
#[server]
pub async fn fetch_ssh_public_key() -> Result<String, ServerFnError> {
let resp = super::agent_client::agent_get("/api/v1/settings/ssh-public-key")
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/settings/ssh-public-key", state.agent_api_url);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
@@ -166,11 +179,13 @@ pub async fn fetch_ssh_public_key() -> Result<String, ServerFnError> {
#[server]
pub async fn delete_repository(repo_id: String) -> Result<(), ServerFnError> {
let resp = super::agent_client::agent_request(
reqwest::Method::DELETE,
&format!("/api/v1/repositories/{repo_id}"),
)
.await?
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/repositories/{repo_id}", state.agent_api_url);
let client = reqwest::Client::new();
let resp = client
.delete(&url)
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
@@ -187,11 +202,13 @@ pub async fn delete_repository(repo_id: String) -> Result<(), ServerFnError> {
#[server]
pub async fn trigger_repo_scan(repo_id: String) -> Result<(), ServerFnError> {
super::agent_client::agent_request(
reqwest::Method::POST,
&format!("/api/v1/repositories/{repo_id}/scan"),
)
.await?
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/repositories/{repo_id}/scan", state.agent_api_url);
let client = reqwest::Client::new();
client
.post(&url)
.send()
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
@@ -207,10 +224,14 @@ pub struct WebhookConfigResponse {
#[server]
pub async fn fetch_webhook_config(repo_id: String) -> Result<WebhookConfigResponse, ServerFnError> {
let resp =
super::agent_client::agent_get(&format!("/api/v1/repositories/{repo_id}/webhook-config"))
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!(
"{}/api/v1/repositories/{repo_id}/webhook-config",
state.agent_api_url
);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: WebhookConfigResponse = resp
@@ -223,9 +244,11 @@ pub async fn fetch_webhook_config(repo_id: String) -> Result<WebhookConfigRespon
/// Check if a repository has any running scans
#[server]
pub async fn check_repo_scanning(repo_id: String) -> Result<bool, ServerFnError> {
let resp = super::agent_client::agent_get("/api/v1/scan-runs?page=1&limit=1")
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/scan-runs?page=1&limit=1", state.agent_api_url);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: serde_json::Value = resp
+35 -20
View File
@@ -87,9 +87,11 @@ pub struct SbomFiltersResponse {
#[server]
pub async fn fetch_sbom_filters() -> Result<SbomFiltersResponse, ServerFnError> {
let resp = super::agent_client::agent_get("/api/v1/sbom/filters")
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/sbom/filters", state.agent_api_url);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let text = resp
@@ -110,6 +112,9 @@ pub async fn fetch_sbom_filtered(
license: Option<String>,
page: u64,
) -> Result<SbomListResponse, ServerFnError> {
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let mut params = vec![format!("page={page}"), "limit=50".to_string()];
if let Some(r) = &repo_id {
if !r.is_empty() {
@@ -135,10 +140,9 @@ pub async fn fetch_sbom_filtered(
}
}
let path = format!("/api/v1/sbom?{}", params.join("&"));
let resp = super::agent_client::agent_get(&path)
.await?
.send()
let url = format!("{}/api/v1/sbom?{}", state.agent_api_url, params.join("&"));
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let text = resp
@@ -152,10 +156,15 @@ pub async fn fetch_sbom_filtered(
#[server]
pub async fn fetch_sbom_export(repo_id: String, format: String) -> Result<String, ServerFnError> {
let path = format!("/api/v1/sbom/export?repo_id={repo_id}&format={format}");
let resp = super::agent_client::agent_get(&path)
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!(
"{}/api/v1/sbom/export?repo_id={}&format={}",
state.agent_api_url, repo_id, format
);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let text = resp
@@ -169,16 +178,17 @@ pub async fn fetch_sbom_export(repo_id: String, format: String) -> Result<String
pub async fn fetch_license_summary(
repo_id: Option<String>,
) -> Result<LicenseSummaryResponse, ServerFnError> {
let mut path = "/api/v1/sbom/licenses".to_string();
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let mut url = format!("{}/api/v1/sbom/licenses", state.agent_api_url);
if let Some(r) = &repo_id {
if !r.is_empty() {
path = format!("{path}?repo_id={r}");
url = format!("{url}?repo_id={r}");
}
}
let resp = super::agent_client::agent_get(&path)
.await?
.send()
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let text = resp
@@ -195,10 +205,15 @@ pub async fn fetch_sbom_diff(
repo_a: String,
repo_b: String,
) -> Result<SbomDiffResponse, ServerFnError> {
let path = format!("/api/v1/sbom/diff?repo_a={repo_a}&repo_b={repo_b}");
let resp = super::agent_client::agent_get(&path)
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!(
"{}/api/v1/sbom/diff?repo_a={}&repo_b={}",
state.agent_api_url, repo_a, repo_b
);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let text = resp
@@ -12,9 +12,14 @@ pub struct ScansListResponse {
#[server]
pub async fn fetch_scan_runs(page: u64) -> Result<ScansListResponse, ServerFnError> {
let resp = super::agent_client::agent_get(&format!("/api/v1/scan-runs?page={page}&limit=20"))
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!(
"{}/api/v1/scan-runs?page={page}&limit=20",
state.agent_api_url
);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: ScansListResponse = resp
@@ -16,9 +16,11 @@ pub struct OverviewStats {
#[server]
pub async fn fetch_overview_stats() -> Result<OverviewStats, ServerFnError> {
let resp = super::agent_client::agent_get("/api/v1/stats/overview")
.await?
.send()
let state: super::server_state::ServerState =
dioxus_fullstack::FullstackContext::extract().await?;
let url = format!("{}/api/v1/stats/overview", state.agent_api_url);
let resp = reqwest::get(&url)
.await
.map_err(|e| ServerFnError::new(e.to_string()))?;
let body: serde_json::Value = resp