From 0f6dd1135e2b04b825602235d43930f453e83f07 Mon Sep 17 00:00:00 2001 From: Sharang Parnerkar <30073382+mighty840@users.noreply.github.com> Date: Wed, 17 Jun 2026 15:00:37 +0200 Subject: [PATCH] feat(m7.2-C): migrate background paths to per-tenant pool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the loop on M7.2 isolation for paths that don't have a JWT context: scheduler, webhooks, and the agent's `run_scan` / `run_pr_review` helpers all now take a `tenant_id` at the boundary and resolve to a tenant-scoped `Database` via `db_pool.for_tenant_id(...)`. Internal orchestrators (PipelineOrchestrator, PentestOrchestrator) and pipeline helpers were already DB-agnostic — they take `db: Database` at construction and don't care which tenant it points to. Changes - DatabasePool::for_tenant_id(&str) — same as for_tenant but accepts a bare tenant_id. Background paths don't have a full TenantContext. for_tenant is now a thin wrapper that delegates. - agent.run_scan(tenant_id, repo_id, trigger) — pulls the tenant database before constructing the PipelineOrchestrator. Was: run_scan(repo_id, trigger) reading agent.db. - agent.run_pr_review(tenant_id, repo_id, ...) — same shape. - Webhook routes change: /webhook/{tenant_id}/{platform}/{repo_id}. Tenant is part of the URL path because webhooks arrive without a JWT — they're authenticated via per-repo HMAC, not the tenant gate. The dashboard surfaces the full per-tenant URL when the repo is registered. All three handlers (gitea, github, gitlab) updated. - scheduler.rs — iterates tenants from $SCHEDULER_TENANT_IDS (comma-separated env), or DEV_TENANT_ID's `dev` default. Both scan_all_repos and monitor_cves now run once per configured tenant. M7.2-D will replace this static config with a pull from the tenant-registry. - api/handlers/repos.rs::trigger_scan now passes tenant.0.tenant_id. 
What's unchanged because it didn't need to change - PipelineOrchestrator, PentestOrchestrator: take `db: Database` at construction — they're tenant-DB-agnostic by design. The caller picks the tenant DB. - pipeline/{dedup,graph_build,issue_creation,sbom/mod}.rs, pentest/{context,report/html/*}.rs, trackers/jira.rs, llm/triage.rs: take `&Database` or `&mongodb::Database` as args, transitively tenant-scoped via the caller. Test plan - cargo fmt --all clean - cargo clippy --workspace --exclude compliance-dashboard -- -D warnings clean - cargo test -p compliance-core --lib — 7 pass - cargo test -p compliance-agent --lib — 228 pass - cargo test -p compliance-agent --test tenant_isolation — 5 pass - cargo test -p compliance-agent --test tenant_status_middleware — 6 pass What's left (PR-D) - Drop the transitional agent.db field — no remaining call sites (verified by `grep -rn "agent\.db\b" compliance-agent/src`). - main.rs / TestServer stop building the legacy Database; only the pool remains. - Add cross-tenant admin helpers (list tenants, drop tenant DB) on the pool for offboarding flows. - Pull tenants from the tenant-registry instead of an env var. 
Co-Authored-By: Claude Opus 4.7 --- compliance-agent/src/agent.rs | 23 ++--- compliance-agent/src/api/handlers/repos.rs | 7 +- compliance-agent/src/database.rs | 17 +++- compliance-agent/src/scheduler.rs | 99 +++++++++++++++++----- compliance-agent/src/webhooks/gitea.rs | 32 +++++-- compliance-agent/src/webhooks/github.rs | 32 +++++-- compliance-agent/src/webhooks/gitlab.rs | 32 +++++-- compliance-agent/src/webhooks/server.rs | 12 ++- 8 files changed, 182 insertions(+), 72 deletions(-) diff --git a/compliance-agent/src/agent.rs b/compliance-agent/src/agent.rs index bc66705..6ce21f1 100644 --- a/compliance-agent/src/agent.rs +++ b/compliance-agent/src/agent.rs @@ -60,28 +60,27 @@ impl ComplianceAgent { pub async fn run_scan( &self, + tenant_id: &str, repo_id: &str, trigger: compliance_core::models::ScanTrigger, ) -> Result<(), crate::error::AgentError> { - let orchestrator = PipelineOrchestrator::new( - self.config.clone(), - self.db.clone(), - self.llm.clone(), - self.http.clone(), - ); + let db = self.db_pool.for_tenant_id(tenant_id).await?; + let orchestrator = + PipelineOrchestrator::new(self.config.clone(), db, self.llm.clone(), self.http.clone()); orchestrator.run(repo_id, trigger).await } /// Run a PR review: scan the diff and post review comments. pub async fn run_pr_review( &self, + tenant_id: &str, repo_id: &str, pr_number: u64, base_sha: &str, head_sha: &str, ) -> Result<(), crate::error::AgentError> { - let repo = self - .db + let db = self.db_pool.for_tenant_id(tenant_id).await?; + let repo = db .repositories() .find_one(mongodb::bson::doc! 
{ "_id": mongodb::bson::oid::ObjectId::parse_str(repo_id) @@ -92,12 +91,8 @@ impl ComplianceAgent { crate::error::AgentError::Other(format!("Repository {repo_id} not found")) })?; - let orchestrator = PipelineOrchestrator::new( - self.config.clone(), - self.db.clone(), - self.llm.clone(), - self.http.clone(), - ); + let orchestrator = + PipelineOrchestrator::new(self.config.clone(), db, self.llm.clone(), self.http.clone()); orchestrator .run_pr_review(&repo, repo_id, pr_number, base_sha, head_sha) .await diff --git a/compliance-agent/src/api/handlers/repos.rs b/compliance-agent/src/api/handlers/repos.rs index f4b2397..5c58167 100644 --- a/compliance-agent/src/api/handlers/repos.rs +++ b/compliance-agent/src/api/handlers/repos.rs @@ -158,11 +158,16 @@ pub async fn get_ssh_public_key( #[tracing::instrument(skip_all, fields(repo_id = %id))] pub async fn trigger_scan( Extension(agent): AgentExt, + tenant: TenantCtx, Path(id): Path, ) -> Result, StatusCode> { let agent_clone = (*agent).clone(); + let tenant_id = tenant.0.tenant_id.clone(); tokio::spawn(async move { - if let Err(e) = agent_clone.run_scan(&id, ScanTrigger::Manual).await { + if let Err(e) = agent_clone + .run_scan(&tenant_id, &id, ScanTrigger::Manual) + .await + { tracing::error!("Manual scan failed for {id}: {e}"); } }); diff --git a/compliance-agent/src/database.rs b/compliance-agent/src/database.rs index 0d5e4f6..11d17fa 100644 --- a/compliance-agent/src/database.rs +++ b/compliance-agent/src/database.rs @@ -78,19 +78,28 @@ impl DatabasePool { /// first call per tenant (per process). Cheap on the hot path — /// subsequent calls skip the round-trip. pub async fn for_tenant(&self, ctx: &TenantContext) -> Result { - let db_name = self.tenant_db_name(&ctx.tenant_id); + self.for_tenant_id(&ctx.tenant_id).await + } + + /// Like [`Self::for_tenant`] but accepts a bare tenant_id. 
+ /// For background paths (scheduler, webhooks, pipeline orchestrators) + /// that don't have a full [`TenantContext`] but know which tenant + /// they're operating on (typically resolved from a URL path, a job + /// argument, or the registry). + pub async fn for_tenant_id(&self, tenant_id: &str) -> Result { + let db_name = self.tenant_db_name(tenant_id); let db = Database::from_database(self.client.database(&db_name)); // `DashMap::insert` returns the previous value; `None` means we // were the first writer for this tenant_id and own the // index-ensure work. - if self.ensured.insert(ctx.tenant_id.clone(), ()).is_none() { + if self.ensured.insert(tenant_id.to_string(), ()).is_none() { if let Err(e) = db.ensure_indexes().await { // Roll the marker back so the next request retries. - self.ensured.remove(&ctx.tenant_id); + self.ensured.remove(tenant_id); return Err(e); } tracing::debug!( - tenant_id = %ctx.tenant_id, + tenant_id = %tenant_id, db_name = %db_name, "Indexes ensured for tenant database" ); diff --git a/compliance-agent/src/scheduler.rs b/compliance-agent/src/scheduler.rs index 2ce907e..81581a1 100644 --- a/compliance-agent/src/scheduler.rs +++ b/compliance-agent/src/scheduler.rs @@ -4,8 +4,14 @@ use tokio_cron_scheduler::{Job, JobScheduler}; use compliance_core::models::ScanTrigger; use crate::agent::ComplianceAgent; +use crate::database::Database; use crate::error::AgentError; +/// Default tenant the scheduler runs against when `SCHEDULER_TENANT_IDS` +/// isn't set. Matches the dev-injector default so a bare `cargo run` has +/// the scheduler scanning whatever lives in `_dev`. 
+const DEFAULT_SCHEDULER_TENANT_ID: &str = "dev"; + pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> { let sched = JobScheduler::new() .await @@ -18,7 +24,9 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> let agent = scan_agent.clone(); Box::pin(async move { tracing::info!("Scheduled scan triggered"); - scan_all_repos(&agent).await; + for tenant_id in scheduler_tenants() { + scan_all_repos(&agent, &tenant_id).await; + } }) }) .map_err(|e| AgentError::Scheduler(format!("Failed to create scan job: {e}")))?; @@ -34,7 +42,9 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> let agent = cve_agent.clone(); Box::pin(async move { tracing::info!("CVE monitor triggered"); - monitor_cves(&agent).await; + for tenant_id in scheduler_tenants() { + monitor_cves(&agent, &tenant_id).await; + } }) }) .map_err(|e| AgentError::Scheduler(format!("Failed to create CVE monitor job: {e}")))?; @@ -48,8 +58,9 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> .await .map_err(|e| AgentError::Scheduler(format!("Failed to start scheduler: {e}")))?; + let tenants = scheduler_tenants(); tracing::info!( - "Scheduler started: scans='{}', CVE monitor='{}'", + "Scheduler started: scans='{}', CVE monitor='{}', tenants={tenants:?}", agent.config.scan_schedule, agent.config.cve_monitor_schedule, ); @@ -60,13 +71,47 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> } } -async fn scan_all_repos(agent: &ComplianceAgent) { +/// Tenants the scheduler iterates each tick. From `SCHEDULER_TENANT_IDS` +/// (comma-separated), or `DEFAULT_SCHEDULER_TENANT_ID` if unset. M7.2-D +/// will replace this with a pull from the tenant-registry. 
+fn scheduler_tenants() -> Vec<String> { + std::env::var("SCHEDULER_TENANT_IDS") + .ok() + .map(|s| { + s.split(',') + .map(str::trim) + .filter(|s| !s.is_empty()) + .map(String::from) + .collect::<Vec<_>>() + }) + .filter(|v| !v.is_empty()) + .unwrap_or_else(|| vec![DEFAULT_SCHEDULER_TENANT_ID.to_string()]) +} + +/// Resolve the per-tenant database. Logs and returns `None` on failure +/// so the loop in the caller can continue with other tenants. +async fn tenant_db(agent: &ComplianceAgent, tenant_id: &str) -> Option<Database> { + match agent.db_pool.for_tenant_id(tenant_id).await { + Ok(db) => Some(db), + Err(e) => { + tracing::error!("Scheduler: cannot open tenant database '{tenant_id}': {e}"); + None + } + } +} + +async fn scan_all_repos(agent: &ComplianceAgent, tenant_id: &str) { use futures_util::StreamExt; - let cursor = match agent.db.repositories().find(doc! {}).await { + let db = match tenant_db(agent, tenant_id).await { + Some(db) => db, + None => return, + }; + + let cursor = match db.repositories().find(doc! 
{}).await { Ok(c) => c, Err(e) => { - tracing::error!("Failed to list repos for scheduled scan: {e}"); + tracing::error!("Failed to list repos for tenant '{tenant_id}': {e}"); return; } }; @@ -75,33 +120,44 @@ async fn scan_all_repos(agent: &ComplianceAgent) { for repo in repos { let repo_id = repo.id.map(|id| id.to_hex()).unwrap_or_default(); - if let Err(e) = agent.run_scan(&repo_id, ScanTrigger::Scheduled).await { - tracing::error!("Scheduled scan failed for {}: {e}", repo.name); + if let Err(e) = agent + .run_scan(tenant_id, &repo_id, ScanTrigger::Scheduled) + .await + { + tracing::error!( + "Scheduled scan failed for {} (tenant '{tenant_id}'): {e}", + repo.name + ); } } } -async fn monitor_cves(agent: &ComplianceAgent) { +async fn monitor_cves(agent: &ComplianceAgent, tenant_id: &str) { use compliance_core::models::notification::{parse_severity, CveNotification}; use compliance_core::models::SbomEntry; use futures_util::StreamExt; + let db = match tenant_db(agent, tenant_id).await { + Some(db) => db, + None => return, + }; + // Fetch all SBOM entries grouped by repo - let cursor = match agent.db.sbom_entries().find(doc! {}).await { + let cursor = match db.sbom_entries().find(doc! 
{}).await { Ok(c) => c, Err(e) => { - tracing::error!("CVE monitor: failed to list SBOM entries: {e}"); + tracing::error!("CVE monitor: failed to list SBOM entries for '{tenant_id}': {e}"); return; } }; let entries: Vec<SbomEntry> = cursor.filter_map(|r| async { r.ok() }).collect().await; if entries.is_empty() { - tracing::debug!("CVE monitor: no SBOM entries, skipping"); + tracing::debug!("CVE monitor: no SBOM entries for tenant '{tenant_id}', skipping"); return; } tracing::info!( - "CVE monitor: checking {} dependencies for new CVEs", + "CVE monitor: checking {} dependencies for new CVEs (tenant '{tenant_id}')", entries.len() ); @@ -112,7 +168,7 @@ async fn monitor_cves(agent: &ComplianceAgent) { std::collections::HashMap::new(); for rid in &repo_ids { if let Ok(oid) = mongodb::bson::oid::ObjectId::parse_str(rid) { - if let Ok(Some(repo)) = agent.db.repositories().find_one(doc! { "_id": oid }).await { + if let Ok(Some(repo)) = db.repositories().find_one(doc! { "_id": oid }).await { repo_names.insert(rid.clone(), repo.name.clone()); } } @@ -160,8 +216,7 @@ async fn monitor_cves(agent: &ComplianceAgent) { for alert in &alerts { let filter = doc! { "cve_id": &alert.cve_id, "repo_id": &alert.repo_id }; let update = doc! { "$setOnInsert": mongodb::bson::to_bson(alert).unwrap_or_default() }; - let _ = agent - .db + let _ = db .cve_alerts() .update_one(filter, update) .upsert(true) @@ -174,8 +229,7 @@ async fn monitor_cves(agent: &ComplianceAgent) { continue; } if let Some(entry_id) = &entry.id { - let _ = agent - .db + let _ = db .sbom_entries() .update_one( doc! { "_id": entry_id }, @@ -213,8 +267,7 @@ async fn monitor_cves(agent: &ComplianceAgent) { let update = doc! 
{ "$setOnInsert": mongodb::bson::to_bson(&notification).unwrap_or_default() }; - match agent - .db + match db .cve_notifications() .update_one(filter, update) .upsert(true) @@ -232,8 +285,10 @@ async fn monitor_cves(agent: &ComplianceAgent) { } if new_notifications > 0 { - tracing::info!("CVE monitor: created {new_notifications} new notification(s)"); + tracing::info!( + "CVE monitor: created {new_notifications} new notification(s) for tenant '{tenant_id}'" + ); } else { - tracing::info!("CVE monitor: no new CVEs found"); + tracing::info!("CVE monitor: no new CVEs found for tenant '{tenant_id}'"); } } diff --git a/compliance-agent/src/webhooks/gitea.rs b/compliance-agent/src/webhooks/gitea.rs index ec58b2c..a77c161 100644 --- a/compliance-agent/src/webhooks/gitea.rs +++ b/compliance-agent/src/webhooks/gitea.rs @@ -14,24 +14,30 @@ type HmacSha256 = Hmac<Sha256>; pub async fn handle_gitea_webhook( Extension(agent): Extension<Arc<ComplianceAgent>>, - Path(repo_id): Path<String>, + Path((tenant_id, repo_id)): Path<(String, String)>, headers: HeaderMap, body: Bytes, ) -> StatusCode { - // Look up the repo to get its webhook secret + // Look up the repo in the tenant's database to get its webhook secret let oid = match mongodb::bson::oid::ObjectId::parse_str(&repo_id) { Ok(oid) => oid, Err(_) => return StatusCode::NOT_FOUND, }; - let repo = match agent - .db + let db = match agent.db_pool.for_tenant_id(&tenant_id).await { + Ok(db) => db, + Err(e) => { + tracing::warn!("Gitea webhook: cannot open tenant database '{tenant_id}': {e}"); + return StatusCode::NOT_FOUND; + } + }; + let repo = match db .repositories() .find_one(mongodb::bson::doc! 
{ "_id": oid }) .await { Ok(Some(repo)) => repo, _ => { - tracing::warn!("Gitea webhook: repo {repo_id} not found"); + tracing::warn!("Gitea webhook: repo {repo_id} not found in tenant '{tenant_id}'"); return StatusCode::NOT_FOUND; } }; @@ -66,15 +72,21 @@ pub async fn handle_gitea_webhook( "push" => { let agent_clone = (*agent).clone(); let repo_id = repo_id.clone(); + let tenant_id = tenant_id.clone(); tokio::spawn(async move { - tracing::info!("Gitea push webhook: triggering scan for {repo_id}"); - if let Err(e) = agent_clone.run_scan(&repo_id, ScanTrigger::Webhook).await { + tracing::info!( + "Gitea push webhook: triggering scan for {repo_id} in tenant {tenant_id}" + ); + if let Err(e) = agent_clone + .run_scan(&tenant_id, &repo_id, ScanTrigger::Webhook) + .await + { tracing::error!("Webhook-triggered scan failed: {e}"); } }); StatusCode::OK } - "pull_request" => handle_pull_request(agent, &repo_id, &payload).await, + "pull_request" => handle_pull_request(agent, &tenant_id, &repo_id, &payload).await, _ => { tracing::debug!("Gitea webhook: ignoring event '{event}'"); StatusCode::OK @@ -84,6 +96,7 @@ pub async fn handle_gitea_webhook( async fn handle_pull_request( agent: Arc<ComplianceAgent>, + tenant_id: &str, repo_id: &str, payload: &serde_json::Value, ) -> StatusCode { @@ -106,13 +119,14 @@ async fn handle_pull_request( } let repo_id = repo_id.to_string(); + let tenant_id = tenant_id.to_string(); let head_sha = head_sha.to_string(); let base_sha = base_sha.to_string(); let agent_clone = (*agent).clone(); tokio::spawn(async move { tracing::info!("Gitea PR webhook: reviewing PR #{pr_number} on {repo_id}"); if let Err(e) = agent_clone - .run_pr_review(&repo_id, pr_number, &base_sha, &head_sha) + .run_pr_review(&tenant_id, &repo_id, pr_number, &base_sha, &head_sha) .await { tracing::error!("PR review failed for #{pr_number}: {e}"); diff --git a/compliance-agent/src/webhooks/github.rs b/compliance-agent/src/webhooks/github.rs index 7273714..a9b7048 100644 --- 
a/compliance-agent/src/webhooks/github.rs +++ b/compliance-agent/src/webhooks/github.rs @@ -14,24 +14,30 @@ type HmacSha256 = Hmac<Sha256>; pub async fn handle_github_webhook( Extension(agent): Extension<Arc<ComplianceAgent>>, - Path(repo_id): Path<String>, + Path((tenant_id, repo_id)): Path<(String, String)>, headers: HeaderMap, body: Bytes, ) -> StatusCode { - // Look up the repo to get its webhook secret + // Look up the repo in the tenant's database to get its webhook secret let oid = match mongodb::bson::oid::ObjectId::parse_str(&repo_id) { Ok(oid) => oid, Err(_) => return StatusCode::NOT_FOUND, }; - let repo = match agent - .db + let db = match agent.db_pool.for_tenant_id(&tenant_id).await { + Ok(db) => db, + Err(e) => { + tracing::warn!("GitHub webhook: cannot open tenant database '{tenant_id}': {e}"); + return StatusCode::NOT_FOUND; + } + }; + let repo = match db .repositories() .find_one(mongodb::bson::doc! { "_id": oid }) .await { Ok(Some(repo)) => repo, _ => { - tracing::warn!("GitHub webhook: repo {repo_id} not found"); + tracing::warn!("GitHub webhook: repo {repo_id} not found in tenant '{tenant_id}'"); return StatusCode::NOT_FOUND; } }; @@ -66,15 +72,21 @@ pub async fn handle_github_webhook( "push" => { let agent_clone = (*agent).clone(); let repo_id = repo_id.clone(); + let tenant_id = tenant_id.clone(); tokio::spawn(async move { - tracing::info!("GitHub push webhook: triggering scan for {repo_id}"); - if let Err(e) = agent_clone.run_scan(&repo_id, ScanTrigger::Webhook).await { + tracing::info!( + "GitHub push webhook: triggering scan for {repo_id} in tenant {tenant_id}" + ); + if let Err(e) = agent_clone + .run_scan(&tenant_id, &repo_id, ScanTrigger::Webhook) + .await + { tracing::error!("Webhook-triggered scan failed: {e}"); } }); StatusCode::OK } - "pull_request" => handle_pull_request(agent, &repo_id, &payload).await, + "pull_request" => handle_pull_request(agent, &tenant_id, &repo_id, &payload).await, _ => { tracing::debug!("GitHub webhook: ignoring event '{event}'"); 
StatusCode::OK @@ -84,6 +96,7 @@ pub async fn handle_github_webhook( async fn handle_pull_request( agent: Arc<ComplianceAgent>, + tenant_id: &str, repo_id: &str, payload: &serde_json::Value, ) -> StatusCode { @@ -105,13 +118,14 @@ async fn handle_pull_request( } let repo_id = repo_id.to_string(); + let tenant_id = tenant_id.to_string(); let head_sha = head_sha.to_string(); let base_sha = base_sha.to_string(); let agent_clone = (*agent).clone(); tokio::spawn(async move { tracing::info!("GitHub PR webhook: reviewing PR #{pr_number} on {repo_id}"); if let Err(e) = agent_clone - .run_pr_review(&repo_id, pr_number, &base_sha, &head_sha) + .run_pr_review(&tenant_id, &repo_id, pr_number, &base_sha, &head_sha) .await { tracing::error!("PR review failed for #{pr_number}: {e}"); diff --git a/compliance-agent/src/webhooks/gitlab.rs b/compliance-agent/src/webhooks/gitlab.rs index b0a7219..c830a28 100644 --- a/compliance-agent/src/webhooks/gitlab.rs +++ b/compliance-agent/src/webhooks/gitlab.rs @@ -10,24 +10,30 @@ use crate::agent::ComplianceAgent; pub async fn handle_gitlab_webhook( Extension(agent): Extension<Arc<ComplianceAgent>>, - Path(repo_id): Path<String>, + Path((tenant_id, repo_id)): Path<(String, String)>, headers: HeaderMap, body: Bytes, ) -> StatusCode { - // Look up the repo to get its webhook secret + // Look up the repo in the tenant's database to get its webhook secret let oid = match mongodb::bson::oid::ObjectId::parse_str(&repo_id) { Ok(oid) => oid, Err(_) => return StatusCode::NOT_FOUND, }; - let repo = match agent - .db + let db = match agent.db_pool.for_tenant_id(&tenant_id).await { + Ok(db) => db, + Err(e) => { + tracing::warn!("GitLab webhook: cannot open tenant database '{tenant_id}': {e}"); + return StatusCode::NOT_FOUND; + } + }; + let repo = match db .repositories() .find_one(mongodb::bson::doc! 
{ "_id": oid }) .await { Ok(Some(repo)) => repo, _ => { - tracing::warn!("GitLab webhook: repo {repo_id} not found"); + tracing::warn!("GitLab webhook: repo {repo_id} not found in tenant '{tenant_id}'"); return StatusCode::NOT_FOUND; } }; @@ -59,15 +65,21 @@ pub async fn handle_gitlab_webhook( "push" => { let agent_clone = (*agent).clone(); let repo_id = repo_id.clone(); + let tenant_id = tenant_id.clone(); tokio::spawn(async move { - tracing::info!("GitLab push webhook: triggering scan for {repo_id}"); - if let Err(e) = agent_clone.run_scan(&repo_id, ScanTrigger::Webhook).await { + tracing::info!( + "GitLab push webhook: triggering scan for {repo_id} in tenant {tenant_id}" + ); + if let Err(e) = agent_clone + .run_scan(&tenant_id, &repo_id, ScanTrigger::Webhook) + .await + { tracing::error!("Webhook-triggered scan failed: {e}"); } }); StatusCode::OK } - "merge_request" => handle_merge_request(agent, &repo_id, &payload).await, + "merge_request" => handle_merge_request(agent, &tenant_id, &repo_id, &payload).await, _ => { tracing::debug!("GitLab webhook: ignoring event '{event_type}'"); StatusCode::OK @@ -77,6 +89,7 @@ pub async fn handle_gitlab_webhook( async fn handle_merge_request( agent: Arc<ComplianceAgent>, + tenant_id: &str, repo_id: &str, payload: &serde_json::Value, ) -> StatusCode { @@ -101,13 +114,14 @@ async fn handle_merge_request( } let repo_id = repo_id.to_string(); + let tenant_id = tenant_id.to_string(); let head_sha = head_sha.to_string(); let base_sha = base_sha.to_string(); let agent_clone = (*agent).clone(); tokio::spawn(async move { tracing::info!("GitLab MR webhook: reviewing MR !{mr_iid} on {repo_id}"); if let Err(e) = agent_clone - .run_pr_review(&repo_id, mr_iid, &base_sha, &head_sha) + .run_pr_review(&tenant_id, &repo_id, mr_iid, &base_sha, &head_sha) .await { tracing::error!("MR review failed for !{mr_iid}: {e}"); diff --git a/compliance-agent/src/webhooks/server.rs b/compliance-agent/src/webhooks/server.rs index b90f860..6e30084 100644 --- 
a/compliance-agent/src/webhooks/server.rs +++ b/compliance-agent/src/webhooks/server.rs @@ -9,17 +9,21 @@ use crate::webhooks::{gitea, github, gitlab}; pub async fn start_webhook_server(agent: &ComplianceAgent) -> Result<(), AgentError> { let app = Router::new() - // Per-repo webhook URLs: /webhook/{platform}/{repo_id} + // Per-tenant per-repo webhook URLs: /webhook/{tenant_id}/{platform}/{repo_id} + // The tenant_id is resolved from the URL path because webhooks + // arrive without a JWT — they're authenticated via per-repo HMAC, + // not via the tenant gate. The dashboard surfaces the full URL + // including the tenant_id when the repo is registered. .route( - "/webhook/github/{repo_id}", + "/webhook/{tenant_id}/github/{repo_id}", post(github::handle_github_webhook), ) .route( - "/webhook/gitlab/{repo_id}", + "/webhook/{tenant_id}/gitlab/{repo_id}", post(gitlab::handle_gitlab_webhook), ) .route( - "/webhook/gitea/{repo_id}", + "/webhook/{tenant_id}/gitea/{repo_id}", post(gitea::handle_gitea_webhook), ) .layer(Extension(Arc::new(agent.clone()))); -- 2.52.0