Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0f6dd1135e |
@@ -60,28 +60,27 @@ impl ComplianceAgent {
|
|||||||
|
|
||||||
pub async fn run_scan(
|
pub async fn run_scan(
|
||||||
&self,
|
&self,
|
||||||
|
tenant_id: &str,
|
||||||
repo_id: &str,
|
repo_id: &str,
|
||||||
trigger: compliance_core::models::ScanTrigger,
|
trigger: compliance_core::models::ScanTrigger,
|
||||||
) -> Result<(), crate::error::AgentError> {
|
) -> Result<(), crate::error::AgentError> {
|
||||||
let orchestrator = PipelineOrchestrator::new(
|
let db = self.db_pool.for_tenant_id(tenant_id).await?;
|
||||||
self.config.clone(),
|
let orchestrator =
|
||||||
self.db.clone(),
|
PipelineOrchestrator::new(self.config.clone(), db, self.llm.clone(), self.http.clone());
|
||||||
self.llm.clone(),
|
|
||||||
self.http.clone(),
|
|
||||||
);
|
|
||||||
orchestrator.run(repo_id, trigger).await
|
orchestrator.run(repo_id, trigger).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Run a PR review: scan the diff and post review comments.
|
/// Run a PR review: scan the diff and post review comments.
|
||||||
pub async fn run_pr_review(
|
pub async fn run_pr_review(
|
||||||
&self,
|
&self,
|
||||||
|
tenant_id: &str,
|
||||||
repo_id: &str,
|
repo_id: &str,
|
||||||
pr_number: u64,
|
pr_number: u64,
|
||||||
base_sha: &str,
|
base_sha: &str,
|
||||||
head_sha: &str,
|
head_sha: &str,
|
||||||
) -> Result<(), crate::error::AgentError> {
|
) -> Result<(), crate::error::AgentError> {
|
||||||
let repo = self
|
let db = self.db_pool.for_tenant_id(tenant_id).await?;
|
||||||
.db
|
let repo = db
|
||||||
.repositories()
|
.repositories()
|
||||||
.find_one(mongodb::bson::doc! {
|
.find_one(mongodb::bson::doc! {
|
||||||
"_id": mongodb::bson::oid::ObjectId::parse_str(repo_id)
|
"_id": mongodb::bson::oid::ObjectId::parse_str(repo_id)
|
||||||
@@ -92,12 +91,8 @@ impl ComplianceAgent {
|
|||||||
crate::error::AgentError::Other(format!("Repository {repo_id} not found"))
|
crate::error::AgentError::Other(format!("Repository {repo_id} not found"))
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let orchestrator = PipelineOrchestrator::new(
|
let orchestrator =
|
||||||
self.config.clone(),
|
PipelineOrchestrator::new(self.config.clone(), db, self.llm.clone(), self.http.clone());
|
||||||
self.db.clone(),
|
|
||||||
self.llm.clone(),
|
|
||||||
self.http.clone(),
|
|
||||||
);
|
|
||||||
orchestrator
|
orchestrator
|
||||||
.run_pr_review(&repo, repo_id, pr_number, base_sha, head_sha)
|
.run_pr_review(&repo, repo_id, pr_number, base_sha, head_sha)
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -158,11 +158,16 @@ pub async fn get_ssh_public_key(
|
|||||||
#[tracing::instrument(skip_all, fields(repo_id = %id))]
|
#[tracing::instrument(skip_all, fields(repo_id = %id))]
|
||||||
pub async fn trigger_scan(
|
pub async fn trigger_scan(
|
||||||
Extension(agent): AgentExt,
|
Extension(agent): AgentExt,
|
||||||
|
tenant: TenantCtx,
|
||||||
Path(id): Path<String>,
|
Path(id): Path<String>,
|
||||||
) -> Result<Json<serde_json::Value>, StatusCode> {
|
) -> Result<Json<serde_json::Value>, StatusCode> {
|
||||||
let agent_clone = (*agent).clone();
|
let agent_clone = (*agent).clone();
|
||||||
|
let tenant_id = tenant.0.tenant_id.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Err(e) = agent_clone.run_scan(&id, ScanTrigger::Manual).await {
|
if let Err(e) = agent_clone
|
||||||
|
.run_scan(&tenant_id, &id, ScanTrigger::Manual)
|
||||||
|
.await
|
||||||
|
{
|
||||||
tracing::error!("Manual scan failed for {id}: {e}");
|
tracing::error!("Manual scan failed for {id}: {e}");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -78,19 +78,28 @@ impl DatabasePool {
|
|||||||
/// first call per tenant (per process). Cheap on the hot path —
|
/// first call per tenant (per process). Cheap on the hot path —
|
||||||
/// subsequent calls skip the round-trip.
|
/// subsequent calls skip the round-trip.
|
||||||
pub async fn for_tenant(&self, ctx: &TenantContext) -> Result<Database, AgentError> {
|
pub async fn for_tenant(&self, ctx: &TenantContext) -> Result<Database, AgentError> {
|
||||||
let db_name = self.tenant_db_name(&ctx.tenant_id);
|
self.for_tenant_id(&ctx.tenant_id).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Like [`Self::for_tenant`] but accepts a bare tenant_id.
|
||||||
|
/// For background paths (scheduler, webhooks, pipeline orchestrators)
|
||||||
|
/// that don't have a full [`TenantContext`] but know which tenant
|
||||||
|
/// they're operating on (typically resolved from a URL path, a job
|
||||||
|
/// argument, or the registry).
|
||||||
|
pub async fn for_tenant_id(&self, tenant_id: &str) -> Result<Database, AgentError> {
|
||||||
|
let db_name = self.tenant_db_name(tenant_id);
|
||||||
let db = Database::from_database(self.client.database(&db_name));
|
let db = Database::from_database(self.client.database(&db_name));
|
||||||
// `DashMap::insert` returns the previous value; `None` means we
|
// `DashMap::insert` returns the previous value; `None` means we
|
||||||
// were the first writer for this tenant_id and own the
|
// were the first writer for this tenant_id and own the
|
||||||
// index-ensure work.
|
// index-ensure work.
|
||||||
if self.ensured.insert(ctx.tenant_id.clone(), ()).is_none() {
|
if self.ensured.insert(tenant_id.to_string(), ()).is_none() {
|
||||||
if let Err(e) = db.ensure_indexes().await {
|
if let Err(e) = db.ensure_indexes().await {
|
||||||
// Roll the marker back so the next request retries.
|
// Roll the marker back so the next request retries.
|
||||||
self.ensured.remove(&ctx.tenant_id);
|
self.ensured.remove(tenant_id);
|
||||||
return Err(e);
|
return Err(e);
|
||||||
}
|
}
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
tenant_id = %ctx.tenant_id,
|
tenant_id = %tenant_id,
|
||||||
db_name = %db_name,
|
db_name = %db_name,
|
||||||
"Indexes ensured for tenant database"
|
"Indexes ensured for tenant database"
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -4,8 +4,14 @@ use tokio_cron_scheduler::{Job, JobScheduler};
|
|||||||
use compliance_core::models::ScanTrigger;
|
use compliance_core::models::ScanTrigger;
|
||||||
|
|
||||||
use crate::agent::ComplianceAgent;
|
use crate::agent::ComplianceAgent;
|
||||||
|
use crate::database::Database;
|
||||||
use crate::error::AgentError;
|
use crate::error::AgentError;
|
||||||
|
|
||||||
|
/// Default tenant the scheduler runs against when `SCHEDULER_TENANT_IDS`
|
||||||
|
/// isn't set. Matches the dev-injector default so a bare `cargo run` has
|
||||||
|
/// the scheduler scanning whatever lives in `<prefix>_dev`.
|
||||||
|
const DEFAULT_SCHEDULER_TENANT_ID: &str = "dev";
|
||||||
|
|
||||||
pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> {
|
pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> {
|
||||||
let sched = JobScheduler::new()
|
let sched = JobScheduler::new()
|
||||||
.await
|
.await
|
||||||
@@ -18,7 +24,9 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError>
|
|||||||
let agent = scan_agent.clone();
|
let agent = scan_agent.clone();
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
tracing::info!("Scheduled scan triggered");
|
tracing::info!("Scheduled scan triggered");
|
||||||
scan_all_repos(&agent).await;
|
for tenant_id in scheduler_tenants() {
|
||||||
|
scan_all_repos(&agent, &tenant_id).await;
|
||||||
|
}
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
.map_err(|e| AgentError::Scheduler(format!("Failed to create scan job: {e}")))?;
|
.map_err(|e| AgentError::Scheduler(format!("Failed to create scan job: {e}")))?;
|
||||||
@@ -34,7 +42,9 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError>
|
|||||||
let agent = cve_agent.clone();
|
let agent = cve_agent.clone();
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
tracing::info!("CVE monitor triggered");
|
tracing::info!("CVE monitor triggered");
|
||||||
monitor_cves(&agent).await;
|
for tenant_id in scheduler_tenants() {
|
||||||
|
monitor_cves(&agent, &tenant_id).await;
|
||||||
|
}
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
.map_err(|e| AgentError::Scheduler(format!("Failed to create CVE monitor job: {e}")))?;
|
.map_err(|e| AgentError::Scheduler(format!("Failed to create CVE monitor job: {e}")))?;
|
||||||
@@ -48,8 +58,9 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError>
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| AgentError::Scheduler(format!("Failed to start scheduler: {e}")))?;
|
.map_err(|e| AgentError::Scheduler(format!("Failed to start scheduler: {e}")))?;
|
||||||
|
|
||||||
|
let tenants = scheduler_tenants();
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
"Scheduler started: scans='{}', CVE monitor='{}'",
|
"Scheduler started: scans='{}', CVE monitor='{}', tenants={tenants:?}",
|
||||||
agent.config.scan_schedule,
|
agent.config.scan_schedule,
|
||||||
agent.config.cve_monitor_schedule,
|
agent.config.cve_monitor_schedule,
|
||||||
);
|
);
|
||||||
@@ -60,13 +71,47 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn scan_all_repos(agent: &ComplianceAgent) {
|
/// Tenants the scheduler iterates each tick. From `SCHEDULER_TENANT_IDS`
|
||||||
|
/// (comma-separated), or `DEFAULT_SCHEDULER_TENANT_ID` if unset. M7.2-D
|
||||||
|
/// will replace this with a pull from the tenant-registry.
|
||||||
|
fn scheduler_tenants() -> Vec<String> {
|
||||||
|
std::env::var("SCHEDULER_TENANT_IDS")
|
||||||
|
.ok()
|
||||||
|
.map(|s| {
|
||||||
|
s.split(',')
|
||||||
|
.map(str::trim)
|
||||||
|
.filter(|s| !s.is_empty())
|
||||||
|
.map(String::from)
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
})
|
||||||
|
.filter(|v| !v.is_empty())
|
||||||
|
.unwrap_or_else(|| vec![DEFAULT_SCHEDULER_TENANT_ID.to_string()])
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Resolve the per-tenant database. Logs and returns `None` on failure
|
||||||
|
/// so the loop in the caller can continue with other tenants.
|
||||||
|
async fn tenant_db(agent: &ComplianceAgent, tenant_id: &str) -> Option<Database> {
|
||||||
|
match agent.db_pool.for_tenant_id(tenant_id).await {
|
||||||
|
Ok(db) => Some(db),
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Scheduler: cannot open tenant database '{tenant_id}': {e}");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn scan_all_repos(agent: &ComplianceAgent, tenant_id: &str) {
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
|
|
||||||
let cursor = match agent.db.repositories().find(doc! {}).await {
|
let db = match tenant_db(agent, tenant_id).await {
|
||||||
|
Some(db) => db,
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
|
||||||
|
let cursor = match db.repositories().find(doc! {}).await {
|
||||||
Ok(c) => c,
|
Ok(c) => c,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("Failed to list repos for scheduled scan: {e}");
|
tracing::error!("Failed to list repos for tenant '{tenant_id}': {e}");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -75,33 +120,44 @@ async fn scan_all_repos(agent: &ComplianceAgent) {
|
|||||||
|
|
||||||
for repo in repos {
|
for repo in repos {
|
||||||
let repo_id = repo.id.map(|id| id.to_hex()).unwrap_or_default();
|
let repo_id = repo.id.map(|id| id.to_hex()).unwrap_or_default();
|
||||||
if let Err(e) = agent.run_scan(&repo_id, ScanTrigger::Scheduled).await {
|
if let Err(e) = agent
|
||||||
tracing::error!("Scheduled scan failed for {}: {e}", repo.name);
|
.run_scan(tenant_id, &repo_id, ScanTrigger::Scheduled)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::error!(
|
||||||
|
"Scheduled scan failed for {} (tenant '{tenant_id}'): {e}",
|
||||||
|
repo.name
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn monitor_cves(agent: &ComplianceAgent) {
|
async fn monitor_cves(agent: &ComplianceAgent, tenant_id: &str) {
|
||||||
use compliance_core::models::notification::{parse_severity, CveNotification};
|
use compliance_core::models::notification::{parse_severity, CveNotification};
|
||||||
use compliance_core::models::SbomEntry;
|
use compliance_core::models::SbomEntry;
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
|
|
||||||
|
let db = match tenant_db(agent, tenant_id).await {
|
||||||
|
Some(db) => db,
|
||||||
|
None => return,
|
||||||
|
};
|
||||||
|
|
||||||
// Fetch all SBOM entries grouped by repo
|
// Fetch all SBOM entries grouped by repo
|
||||||
let cursor = match agent.db.sbom_entries().find(doc! {}).await {
|
let cursor = match db.sbom_entries().find(doc! {}).await {
|
||||||
Ok(c) => c,
|
Ok(c) => c,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("CVE monitor: failed to list SBOM entries: {e}");
|
tracing::error!("CVE monitor: failed to list SBOM entries for '{tenant_id}': {e}");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let entries: Vec<SbomEntry> = cursor.filter_map(|r| async { r.ok() }).collect().await;
|
let entries: Vec<SbomEntry> = cursor.filter_map(|r| async { r.ok() }).collect().await;
|
||||||
if entries.is_empty() {
|
if entries.is_empty() {
|
||||||
tracing::debug!("CVE monitor: no SBOM entries, skipping");
|
tracing::debug!("CVE monitor: no SBOM entries for tenant '{tenant_id}', skipping");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
"CVE monitor: checking {} dependencies for new CVEs",
|
"CVE monitor: checking {} dependencies for new CVEs (tenant '{tenant_id}')",
|
||||||
entries.len()
|
entries.len()
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -112,7 +168,7 @@ async fn monitor_cves(agent: &ComplianceAgent) {
|
|||||||
std::collections::HashMap::new();
|
std::collections::HashMap::new();
|
||||||
for rid in &repo_ids {
|
for rid in &repo_ids {
|
||||||
if let Ok(oid) = mongodb::bson::oid::ObjectId::parse_str(rid) {
|
if let Ok(oid) = mongodb::bson::oid::ObjectId::parse_str(rid) {
|
||||||
if let Ok(Some(repo)) = agent.db.repositories().find_one(doc! { "_id": oid }).await {
|
if let Ok(Some(repo)) = db.repositories().find_one(doc! { "_id": oid }).await {
|
||||||
repo_names.insert(rid.clone(), repo.name.clone());
|
repo_names.insert(rid.clone(), repo.name.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -160,8 +216,7 @@ async fn monitor_cves(agent: &ComplianceAgent) {
|
|||||||
for alert in &alerts {
|
for alert in &alerts {
|
||||||
let filter = doc! { "cve_id": &alert.cve_id, "repo_id": &alert.repo_id };
|
let filter = doc! { "cve_id": &alert.cve_id, "repo_id": &alert.repo_id };
|
||||||
let update = doc! { "$setOnInsert": mongodb::bson::to_bson(alert).unwrap_or_default() };
|
let update = doc! { "$setOnInsert": mongodb::bson::to_bson(alert).unwrap_or_default() };
|
||||||
let _ = agent
|
let _ = db
|
||||||
.db
|
|
||||||
.cve_alerts()
|
.cve_alerts()
|
||||||
.update_one(filter, update)
|
.update_one(filter, update)
|
||||||
.upsert(true)
|
.upsert(true)
|
||||||
@@ -174,8 +229,7 @@ async fn monitor_cves(agent: &ComplianceAgent) {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if let Some(entry_id) = &entry.id {
|
if let Some(entry_id) = &entry.id {
|
||||||
let _ = agent
|
let _ = db
|
||||||
.db
|
|
||||||
.sbom_entries()
|
.sbom_entries()
|
||||||
.update_one(
|
.update_one(
|
||||||
doc! { "_id": entry_id },
|
doc! { "_id": entry_id },
|
||||||
@@ -213,8 +267,7 @@ async fn monitor_cves(agent: &ComplianceAgent) {
|
|||||||
let update = doc! {
|
let update = doc! {
|
||||||
"$setOnInsert": mongodb::bson::to_bson(¬ification).unwrap_or_default()
|
"$setOnInsert": mongodb::bson::to_bson(¬ification).unwrap_or_default()
|
||||||
};
|
};
|
||||||
match agent
|
match db
|
||||||
.db
|
|
||||||
.cve_notifications()
|
.cve_notifications()
|
||||||
.update_one(filter, update)
|
.update_one(filter, update)
|
||||||
.upsert(true)
|
.upsert(true)
|
||||||
@@ -232,8 +285,10 @@ async fn monitor_cves(agent: &ComplianceAgent) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if new_notifications > 0 {
|
if new_notifications > 0 {
|
||||||
tracing::info!("CVE monitor: created {new_notifications} new notification(s)");
|
tracing::info!(
|
||||||
|
"CVE monitor: created {new_notifications} new notification(s) for tenant '{tenant_id}'"
|
||||||
|
);
|
||||||
} else {
|
} else {
|
||||||
tracing::info!("CVE monitor: no new CVEs found");
|
tracing::info!("CVE monitor: no new CVEs found for tenant '{tenant_id}'");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,24 +14,30 @@ type HmacSha256 = Hmac<Sha256>;
|
|||||||
|
|
||||||
pub async fn handle_gitea_webhook(
|
pub async fn handle_gitea_webhook(
|
||||||
Extension(agent): Extension<Arc<ComplianceAgent>>,
|
Extension(agent): Extension<Arc<ComplianceAgent>>,
|
||||||
Path(repo_id): Path<String>,
|
Path((tenant_id, repo_id)): Path<(String, String)>,
|
||||||
headers: HeaderMap,
|
headers: HeaderMap,
|
||||||
body: Bytes,
|
body: Bytes,
|
||||||
) -> StatusCode {
|
) -> StatusCode {
|
||||||
// Look up the repo to get its webhook secret
|
// Look up the repo in the tenant's database to get its webhook secret
|
||||||
let oid = match mongodb::bson::oid::ObjectId::parse_str(&repo_id) {
|
let oid = match mongodb::bson::oid::ObjectId::parse_str(&repo_id) {
|
||||||
Ok(oid) => oid,
|
Ok(oid) => oid,
|
||||||
Err(_) => return StatusCode::NOT_FOUND,
|
Err(_) => return StatusCode::NOT_FOUND,
|
||||||
};
|
};
|
||||||
let repo = match agent
|
let db = match agent.db_pool.for_tenant_id(&tenant_id).await {
|
||||||
.db
|
Ok(db) => db,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("Gitea webhook: cannot open tenant database '{tenant_id}': {e}");
|
||||||
|
return StatusCode::NOT_FOUND;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let repo = match db
|
||||||
.repositories()
|
.repositories()
|
||||||
.find_one(mongodb::bson::doc! { "_id": oid })
|
.find_one(mongodb::bson::doc! { "_id": oid })
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(Some(repo)) => repo,
|
Ok(Some(repo)) => repo,
|
||||||
_ => {
|
_ => {
|
||||||
tracing::warn!("Gitea webhook: repo {repo_id} not found");
|
tracing::warn!("Gitea webhook: repo {repo_id} not found in tenant '{tenant_id}'");
|
||||||
return StatusCode::NOT_FOUND;
|
return StatusCode::NOT_FOUND;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -66,15 +72,21 @@ pub async fn handle_gitea_webhook(
|
|||||||
"push" => {
|
"push" => {
|
||||||
let agent_clone = (*agent).clone();
|
let agent_clone = (*agent).clone();
|
||||||
let repo_id = repo_id.clone();
|
let repo_id = repo_id.clone();
|
||||||
|
let tenant_id = tenant_id.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
tracing::info!("Gitea push webhook: triggering scan for {repo_id}");
|
tracing::info!(
|
||||||
if let Err(e) = agent_clone.run_scan(&repo_id, ScanTrigger::Webhook).await {
|
"Gitea push webhook: triggering scan for {repo_id} in tenant {tenant_id}"
|
||||||
|
);
|
||||||
|
if let Err(e) = agent_clone
|
||||||
|
.run_scan(&tenant_id, &repo_id, ScanTrigger::Webhook)
|
||||||
|
.await
|
||||||
|
{
|
||||||
tracing::error!("Webhook-triggered scan failed: {e}");
|
tracing::error!("Webhook-triggered scan failed: {e}");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
StatusCode::OK
|
StatusCode::OK
|
||||||
}
|
}
|
||||||
"pull_request" => handle_pull_request(agent, &repo_id, &payload).await,
|
"pull_request" => handle_pull_request(agent, &tenant_id, &repo_id, &payload).await,
|
||||||
_ => {
|
_ => {
|
||||||
tracing::debug!("Gitea webhook: ignoring event '{event}'");
|
tracing::debug!("Gitea webhook: ignoring event '{event}'");
|
||||||
StatusCode::OK
|
StatusCode::OK
|
||||||
@@ -84,6 +96,7 @@ pub async fn handle_gitea_webhook(
|
|||||||
|
|
||||||
async fn handle_pull_request(
|
async fn handle_pull_request(
|
||||||
agent: Arc<ComplianceAgent>,
|
agent: Arc<ComplianceAgent>,
|
||||||
|
tenant_id: &str,
|
||||||
repo_id: &str,
|
repo_id: &str,
|
||||||
payload: &serde_json::Value,
|
payload: &serde_json::Value,
|
||||||
) -> StatusCode {
|
) -> StatusCode {
|
||||||
@@ -106,13 +119,14 @@ async fn handle_pull_request(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let repo_id = repo_id.to_string();
|
let repo_id = repo_id.to_string();
|
||||||
|
let tenant_id = tenant_id.to_string();
|
||||||
let head_sha = head_sha.to_string();
|
let head_sha = head_sha.to_string();
|
||||||
let base_sha = base_sha.to_string();
|
let base_sha = base_sha.to_string();
|
||||||
let agent_clone = (*agent).clone();
|
let agent_clone = (*agent).clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
tracing::info!("Gitea PR webhook: reviewing PR #{pr_number} on {repo_id}");
|
tracing::info!("Gitea PR webhook: reviewing PR #{pr_number} on {repo_id}");
|
||||||
if let Err(e) = agent_clone
|
if let Err(e) = agent_clone
|
||||||
.run_pr_review(&repo_id, pr_number, &base_sha, &head_sha)
|
.run_pr_review(&tenant_id, &repo_id, pr_number, &base_sha, &head_sha)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
tracing::error!("PR review failed for #{pr_number}: {e}");
|
tracing::error!("PR review failed for #{pr_number}: {e}");
|
||||||
|
|||||||
@@ -14,24 +14,30 @@ type HmacSha256 = Hmac<Sha256>;
|
|||||||
|
|
||||||
pub async fn handle_github_webhook(
|
pub async fn handle_github_webhook(
|
||||||
Extension(agent): Extension<Arc<ComplianceAgent>>,
|
Extension(agent): Extension<Arc<ComplianceAgent>>,
|
||||||
Path(repo_id): Path<String>,
|
Path((tenant_id, repo_id)): Path<(String, String)>,
|
||||||
headers: HeaderMap,
|
headers: HeaderMap,
|
||||||
body: Bytes,
|
body: Bytes,
|
||||||
) -> StatusCode {
|
) -> StatusCode {
|
||||||
// Look up the repo to get its webhook secret
|
// Look up the repo in the tenant's database to get its webhook secret
|
||||||
let oid = match mongodb::bson::oid::ObjectId::parse_str(&repo_id) {
|
let oid = match mongodb::bson::oid::ObjectId::parse_str(&repo_id) {
|
||||||
Ok(oid) => oid,
|
Ok(oid) => oid,
|
||||||
Err(_) => return StatusCode::NOT_FOUND,
|
Err(_) => return StatusCode::NOT_FOUND,
|
||||||
};
|
};
|
||||||
let repo = match agent
|
let db = match agent.db_pool.for_tenant_id(&tenant_id).await {
|
||||||
.db
|
Ok(db) => db,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("GitHub webhook: cannot open tenant database '{tenant_id}': {e}");
|
||||||
|
return StatusCode::NOT_FOUND;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let repo = match db
|
||||||
.repositories()
|
.repositories()
|
||||||
.find_one(mongodb::bson::doc! { "_id": oid })
|
.find_one(mongodb::bson::doc! { "_id": oid })
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(Some(repo)) => repo,
|
Ok(Some(repo)) => repo,
|
||||||
_ => {
|
_ => {
|
||||||
tracing::warn!("GitHub webhook: repo {repo_id} not found");
|
tracing::warn!("GitHub webhook: repo {repo_id} not found in tenant '{tenant_id}'");
|
||||||
return StatusCode::NOT_FOUND;
|
return StatusCode::NOT_FOUND;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -66,15 +72,21 @@ pub async fn handle_github_webhook(
|
|||||||
"push" => {
|
"push" => {
|
||||||
let agent_clone = (*agent).clone();
|
let agent_clone = (*agent).clone();
|
||||||
let repo_id = repo_id.clone();
|
let repo_id = repo_id.clone();
|
||||||
|
let tenant_id = tenant_id.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
tracing::info!("GitHub push webhook: triggering scan for {repo_id}");
|
tracing::info!(
|
||||||
if let Err(e) = agent_clone.run_scan(&repo_id, ScanTrigger::Webhook).await {
|
"GitHub push webhook: triggering scan for {repo_id} in tenant {tenant_id}"
|
||||||
|
);
|
||||||
|
if let Err(e) = agent_clone
|
||||||
|
.run_scan(&tenant_id, &repo_id, ScanTrigger::Webhook)
|
||||||
|
.await
|
||||||
|
{
|
||||||
tracing::error!("Webhook-triggered scan failed: {e}");
|
tracing::error!("Webhook-triggered scan failed: {e}");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
StatusCode::OK
|
StatusCode::OK
|
||||||
}
|
}
|
||||||
"pull_request" => handle_pull_request(agent, &repo_id, &payload).await,
|
"pull_request" => handle_pull_request(agent, &tenant_id, &repo_id, &payload).await,
|
||||||
_ => {
|
_ => {
|
||||||
tracing::debug!("GitHub webhook: ignoring event '{event}'");
|
tracing::debug!("GitHub webhook: ignoring event '{event}'");
|
||||||
StatusCode::OK
|
StatusCode::OK
|
||||||
@@ -84,6 +96,7 @@ pub async fn handle_github_webhook(
|
|||||||
|
|
||||||
async fn handle_pull_request(
|
async fn handle_pull_request(
|
||||||
agent: Arc<ComplianceAgent>,
|
agent: Arc<ComplianceAgent>,
|
||||||
|
tenant_id: &str,
|
||||||
repo_id: &str,
|
repo_id: &str,
|
||||||
payload: &serde_json::Value,
|
payload: &serde_json::Value,
|
||||||
) -> StatusCode {
|
) -> StatusCode {
|
||||||
@@ -105,13 +118,14 @@ async fn handle_pull_request(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let repo_id = repo_id.to_string();
|
let repo_id = repo_id.to_string();
|
||||||
|
let tenant_id = tenant_id.to_string();
|
||||||
let head_sha = head_sha.to_string();
|
let head_sha = head_sha.to_string();
|
||||||
let base_sha = base_sha.to_string();
|
let base_sha = base_sha.to_string();
|
||||||
let agent_clone = (*agent).clone();
|
let agent_clone = (*agent).clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
tracing::info!("GitHub PR webhook: reviewing PR #{pr_number} on {repo_id}");
|
tracing::info!("GitHub PR webhook: reviewing PR #{pr_number} on {repo_id}");
|
||||||
if let Err(e) = agent_clone
|
if let Err(e) = agent_clone
|
||||||
.run_pr_review(&repo_id, pr_number, &base_sha, &head_sha)
|
.run_pr_review(&tenant_id, &repo_id, pr_number, &base_sha, &head_sha)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
tracing::error!("PR review failed for #{pr_number}: {e}");
|
tracing::error!("PR review failed for #{pr_number}: {e}");
|
||||||
|
|||||||
@@ -10,24 +10,30 @@ use crate::agent::ComplianceAgent;
|
|||||||
|
|
||||||
pub async fn handle_gitlab_webhook(
|
pub async fn handle_gitlab_webhook(
|
||||||
Extension(agent): Extension<Arc<ComplianceAgent>>,
|
Extension(agent): Extension<Arc<ComplianceAgent>>,
|
||||||
Path(repo_id): Path<String>,
|
Path((tenant_id, repo_id)): Path<(String, String)>,
|
||||||
headers: HeaderMap,
|
headers: HeaderMap,
|
||||||
body: Bytes,
|
body: Bytes,
|
||||||
) -> StatusCode {
|
) -> StatusCode {
|
||||||
// Look up the repo to get its webhook secret
|
// Look up the repo in the tenant's database to get its webhook secret
|
||||||
let oid = match mongodb::bson::oid::ObjectId::parse_str(&repo_id) {
|
let oid = match mongodb::bson::oid::ObjectId::parse_str(&repo_id) {
|
||||||
Ok(oid) => oid,
|
Ok(oid) => oid,
|
||||||
Err(_) => return StatusCode::NOT_FOUND,
|
Err(_) => return StatusCode::NOT_FOUND,
|
||||||
};
|
};
|
||||||
let repo = match agent
|
let db = match agent.db_pool.for_tenant_id(&tenant_id).await {
|
||||||
.db
|
Ok(db) => db,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!("GitLab webhook: cannot open tenant database '{tenant_id}': {e}");
|
||||||
|
return StatusCode::NOT_FOUND;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let repo = match db
|
||||||
.repositories()
|
.repositories()
|
||||||
.find_one(mongodb::bson::doc! { "_id": oid })
|
.find_one(mongodb::bson::doc! { "_id": oid })
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(Some(repo)) => repo,
|
Ok(Some(repo)) => repo,
|
||||||
_ => {
|
_ => {
|
||||||
tracing::warn!("GitLab webhook: repo {repo_id} not found");
|
tracing::warn!("GitLab webhook: repo {repo_id} not found in tenant '{tenant_id}'");
|
||||||
return StatusCode::NOT_FOUND;
|
return StatusCode::NOT_FOUND;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -59,15 +65,21 @@ pub async fn handle_gitlab_webhook(
|
|||||||
"push" => {
|
"push" => {
|
||||||
let agent_clone = (*agent).clone();
|
let agent_clone = (*agent).clone();
|
||||||
let repo_id = repo_id.clone();
|
let repo_id = repo_id.clone();
|
||||||
|
let tenant_id = tenant_id.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
tracing::info!("GitLab push webhook: triggering scan for {repo_id}");
|
tracing::info!(
|
||||||
if let Err(e) = agent_clone.run_scan(&repo_id, ScanTrigger::Webhook).await {
|
"GitLab push webhook: triggering scan for {repo_id} in tenant {tenant_id}"
|
||||||
|
);
|
||||||
|
if let Err(e) = agent_clone
|
||||||
|
.run_scan(&tenant_id, &repo_id, ScanTrigger::Webhook)
|
||||||
|
.await
|
||||||
|
{
|
||||||
tracing::error!("Webhook-triggered scan failed: {e}");
|
tracing::error!("Webhook-triggered scan failed: {e}");
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
StatusCode::OK
|
StatusCode::OK
|
||||||
}
|
}
|
||||||
"merge_request" => handle_merge_request(agent, &repo_id, &payload).await,
|
"merge_request" => handle_merge_request(agent, &tenant_id, &repo_id, &payload).await,
|
||||||
_ => {
|
_ => {
|
||||||
tracing::debug!("GitLab webhook: ignoring event '{event_type}'");
|
tracing::debug!("GitLab webhook: ignoring event '{event_type}'");
|
||||||
StatusCode::OK
|
StatusCode::OK
|
||||||
@@ -77,6 +89,7 @@ pub async fn handle_gitlab_webhook(
|
|||||||
|
|
||||||
async fn handle_merge_request(
|
async fn handle_merge_request(
|
||||||
agent: Arc<ComplianceAgent>,
|
agent: Arc<ComplianceAgent>,
|
||||||
|
tenant_id: &str,
|
||||||
repo_id: &str,
|
repo_id: &str,
|
||||||
payload: &serde_json::Value,
|
payload: &serde_json::Value,
|
||||||
) -> StatusCode {
|
) -> StatusCode {
|
||||||
@@ -101,13 +114,14 @@ async fn handle_merge_request(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let repo_id = repo_id.to_string();
|
let repo_id = repo_id.to_string();
|
||||||
|
let tenant_id = tenant_id.to_string();
|
||||||
let head_sha = head_sha.to_string();
|
let head_sha = head_sha.to_string();
|
||||||
let base_sha = base_sha.to_string();
|
let base_sha = base_sha.to_string();
|
||||||
let agent_clone = (*agent).clone();
|
let agent_clone = (*agent).clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
tracing::info!("GitLab MR webhook: reviewing MR !{mr_iid} on {repo_id}");
|
tracing::info!("GitLab MR webhook: reviewing MR !{mr_iid} on {repo_id}");
|
||||||
if let Err(e) = agent_clone
|
if let Err(e) = agent_clone
|
||||||
.run_pr_review(&repo_id, mr_iid, &base_sha, &head_sha)
|
.run_pr_review(&tenant_id, &repo_id, mr_iid, &base_sha, &head_sha)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
tracing::error!("MR review failed for !{mr_iid}: {e}");
|
tracing::error!("MR review failed for !{mr_iid}: {e}");
|
||||||
|
|||||||
@@ -9,17 +9,21 @@ use crate::webhooks::{gitea, github, gitlab};
|
|||||||
|
|
||||||
pub async fn start_webhook_server(agent: &ComplianceAgent) -> Result<(), AgentError> {
|
pub async fn start_webhook_server(agent: &ComplianceAgent) -> Result<(), AgentError> {
|
||||||
let app = Router::new()
|
let app = Router::new()
|
||||||
// Per-repo webhook URLs: /webhook/{platform}/{repo_id}
|
// Per-tenant per-repo webhook URLs: /webhook/{tenant_id}/{platform}/{repo_id}
|
||||||
|
// The tenant_id is resolved from the URL path because webhooks
|
||||||
|
// arrive without a JWT — they're authenticated via per-repo HMAC,
|
||||||
|
// not via the tenant gate. The dashboard surfaces the full URL
|
||||||
|
// including the tenant_id when the repo is registered.
|
||||||
.route(
|
.route(
|
||||||
"/webhook/github/{repo_id}",
|
"/webhook/{tenant_id}/github/{repo_id}",
|
||||||
post(github::handle_github_webhook),
|
post(github::handle_github_webhook),
|
||||||
)
|
)
|
||||||
.route(
|
.route(
|
||||||
"/webhook/gitlab/{repo_id}",
|
"/webhook/{tenant_id}/gitlab/{repo_id}",
|
||||||
post(gitlab::handle_gitlab_webhook),
|
post(gitlab::handle_gitlab_webhook),
|
||||||
)
|
)
|
||||||
.route(
|
.route(
|
||||||
"/webhook/gitea/{repo_id}",
|
"/webhook/{tenant_id}/gitea/{repo_id}",
|
||||||
post(gitea::handle_gitea_webhook),
|
post(gitea::handle_gitea_webhook),
|
||||||
)
|
)
|
||||||
.layer(Extension(Arc::new(agent.clone())));
|
.layer(Extension(Arc::new(agent.clone())));
|
||||||
|
|||||||
Reference in New Issue
Block a user