fix(dashboard): attach Keycloak token on agent API calls (#90)
CI / Check (push) Has been skipped
CI / Detect Changes (push) Successful in 6s
CI / Deploy Agent (push) Successful in 4m8s
CI / Deploy Dashboard (push) Successful in 4m58s
CI / Deploy Docs (push) Has been skipped
CI / Deploy MCP (push) Has been skipped
CI / Check (push) Has been skipped
CI / Detect Changes (push) Successful in 6s
CI / Deploy Agent (push) Successful in 4m8s
CI / Deploy Dashboard (push) Successful in 4m58s
CI / Deploy Docs (push) Has been skipped
CI / Deploy MCP (push) Has been skipped
This commit was merged in pull request #90.
This commit is contained in:
@@ -4,8 +4,14 @@ use tokio_cron_scheduler::{Job, JobScheduler};
|
||||
use compliance_core::models::ScanTrigger;
|
||||
|
||||
use crate::agent::ComplianceAgent;
|
||||
use crate::database::Database;
|
||||
use crate::error::AgentError;
|
||||
|
||||
/// Default tenant the scheduler runs against when `SCHEDULER_TENANT_IDS`
|
||||
/// isn't set. Matches the dev-injector default so a bare `cargo run` has
|
||||
/// the scheduler scanning whatever lives in `<prefix>_dev`.
|
||||
const DEFAULT_SCHEDULER_TENANT_ID: &str = "dev";
|
||||
|
||||
pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> {
|
||||
let sched = JobScheduler::new()
|
||||
.await
|
||||
@@ -18,7 +24,9 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError>
|
||||
let agent = scan_agent.clone();
|
||||
Box::pin(async move {
|
||||
tracing::info!("Scheduled scan triggered");
|
||||
scan_all_repos(&agent).await;
|
||||
for tenant_id in scheduler_tenants() {
|
||||
scan_all_repos(&agent, &tenant_id).await;
|
||||
}
|
||||
})
|
||||
})
|
||||
.map_err(|e| AgentError::Scheduler(format!("Failed to create scan job: {e}")))?;
|
||||
@@ -34,7 +42,9 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError>
|
||||
let agent = cve_agent.clone();
|
||||
Box::pin(async move {
|
||||
tracing::info!("CVE monitor triggered");
|
||||
monitor_cves(&agent).await;
|
||||
for tenant_id in scheduler_tenants() {
|
||||
monitor_cves(&agent, &tenant_id).await;
|
||||
}
|
||||
})
|
||||
})
|
||||
.map_err(|e| AgentError::Scheduler(format!("Failed to create CVE monitor job: {e}")))?;
|
||||
@@ -48,8 +58,9 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError>
|
||||
.await
|
||||
.map_err(|e| AgentError::Scheduler(format!("Failed to start scheduler: {e}")))?;
|
||||
|
||||
let tenants = scheduler_tenants();
|
||||
tracing::info!(
|
||||
"Scheduler started: scans='{}', CVE monitor='{}'",
|
||||
"Scheduler started: scans='{}', CVE monitor='{}', tenants={tenants:?}",
|
||||
agent.config.scan_schedule,
|
||||
agent.config.cve_monitor_schedule,
|
||||
);
|
||||
@@ -60,13 +71,47 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError>
|
||||
}
|
||||
}
|
||||
|
||||
async fn scan_all_repos(agent: &ComplianceAgent) {
|
||||
/// Tenants the scheduler iterates each tick. From `SCHEDULER_TENANT_IDS`
|
||||
/// (comma-separated), or `DEFAULT_SCHEDULER_TENANT_ID` if unset. M7.2-D
|
||||
/// will replace this with a pull from the tenant-registry.
|
||||
fn scheduler_tenants() -> Vec<String> {
|
||||
std::env::var("SCHEDULER_TENANT_IDS")
|
||||
.ok()
|
||||
.map(|s| {
|
||||
s.split(',')
|
||||
.map(str::trim)
|
||||
.filter(|s| !s.is_empty())
|
||||
.map(String::from)
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.filter(|v| !v.is_empty())
|
||||
.unwrap_or_else(|| vec![DEFAULT_SCHEDULER_TENANT_ID.to_string()])
|
||||
}
|
||||
|
||||
/// Resolve the per-tenant database. Logs and returns `None` on failure
|
||||
/// so the loop in the caller can continue with other tenants.
|
||||
async fn tenant_db(agent: &ComplianceAgent, tenant_id: &str) -> Option<Database> {
|
||||
match agent.db_pool.for_tenant_id(tenant_id).await {
|
||||
Ok(db) => Some(db),
|
||||
Err(e) => {
|
||||
tracing::error!("Scheduler: cannot open tenant database '{tenant_id}': {e}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn scan_all_repos(agent: &ComplianceAgent, tenant_id: &str) {
|
||||
use futures_util::StreamExt;
|
||||
|
||||
let cursor = match agent.db.repositories().find(doc! {}).await {
|
||||
let db = match tenant_db(agent, tenant_id).await {
|
||||
Some(db) => db,
|
||||
None => return,
|
||||
};
|
||||
|
||||
let cursor = match db.repositories().find(doc! {}).await {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to list repos for scheduled scan: {e}");
|
||||
tracing::error!("Failed to list repos for tenant '{tenant_id}': {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -75,33 +120,44 @@ async fn scan_all_repos(agent: &ComplianceAgent) {
|
||||
|
||||
for repo in repos {
|
||||
let repo_id = repo.id.map(|id| id.to_hex()).unwrap_or_default();
|
||||
if let Err(e) = agent.run_scan(&repo_id, ScanTrigger::Scheduled).await {
|
||||
tracing::error!("Scheduled scan failed for {}: {e}", repo.name);
|
||||
if let Err(e) = agent
|
||||
.run_scan(tenant_id, &repo_id, ScanTrigger::Scheduled)
|
||||
.await
|
||||
{
|
||||
tracing::error!(
|
||||
"Scheduled scan failed for {} (tenant '{tenant_id}'): {e}",
|
||||
repo.name
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn monitor_cves(agent: &ComplianceAgent) {
|
||||
async fn monitor_cves(agent: &ComplianceAgent, tenant_id: &str) {
|
||||
use compliance_core::models::notification::{parse_severity, CveNotification};
|
||||
use compliance_core::models::SbomEntry;
|
||||
use futures_util::StreamExt;
|
||||
|
||||
let db = match tenant_db(agent, tenant_id).await {
|
||||
Some(db) => db,
|
||||
None => return,
|
||||
};
|
||||
|
||||
// Fetch all SBOM entries grouped by repo
|
||||
let cursor = match agent.db.sbom_entries().find(doc! {}).await {
|
||||
let cursor = match db.sbom_entries().find(doc! {}).await {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
tracing::error!("CVE monitor: failed to list SBOM entries: {e}");
|
||||
tracing::error!("CVE monitor: failed to list SBOM entries for '{tenant_id}': {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
let entries: Vec<SbomEntry> = cursor.filter_map(|r| async { r.ok() }).collect().await;
|
||||
if entries.is_empty() {
|
||||
tracing::debug!("CVE monitor: no SBOM entries, skipping");
|
||||
tracing::debug!("CVE monitor: no SBOM entries for tenant '{tenant_id}', skipping");
|
||||
return;
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
"CVE monitor: checking {} dependencies for new CVEs",
|
||||
"CVE monitor: checking {} dependencies for new CVEs (tenant '{tenant_id}')",
|
||||
entries.len()
|
||||
);
|
||||
|
||||
@@ -112,7 +168,7 @@ async fn monitor_cves(agent: &ComplianceAgent) {
|
||||
std::collections::HashMap::new();
|
||||
for rid in &repo_ids {
|
||||
if let Ok(oid) = mongodb::bson::oid::ObjectId::parse_str(rid) {
|
||||
if let Ok(Some(repo)) = agent.db.repositories().find_one(doc! { "_id": oid }).await {
|
||||
if let Ok(Some(repo)) = db.repositories().find_one(doc! { "_id": oid }).await {
|
||||
repo_names.insert(rid.clone(), repo.name.clone());
|
||||
}
|
||||
}
|
||||
@@ -160,8 +216,7 @@ async fn monitor_cves(agent: &ComplianceAgent) {
|
||||
for alert in &alerts {
|
||||
let filter = doc! { "cve_id": &alert.cve_id, "repo_id": &alert.repo_id };
|
||||
let update = doc! { "$setOnInsert": mongodb::bson::to_bson(alert).unwrap_or_default() };
|
||||
let _ = agent
|
||||
.db
|
||||
let _ = db
|
||||
.cve_alerts()
|
||||
.update_one(filter, update)
|
||||
.upsert(true)
|
||||
@@ -174,8 +229,7 @@ async fn monitor_cves(agent: &ComplianceAgent) {
|
||||
continue;
|
||||
}
|
||||
if let Some(entry_id) = &entry.id {
|
||||
let _ = agent
|
||||
.db
|
||||
let _ = db
|
||||
.sbom_entries()
|
||||
.update_one(
|
||||
doc! { "_id": entry_id },
|
||||
@@ -213,8 +267,7 @@ async fn monitor_cves(agent: &ComplianceAgent) {
|
||||
let update = doc! {
|
||||
"$setOnInsert": mongodb::bson::to_bson(¬ification).unwrap_or_default()
|
||||
};
|
||||
match agent
|
||||
.db
|
||||
match db
|
||||
.cve_notifications()
|
||||
.update_one(filter, update)
|
||||
.upsert(true)
|
||||
@@ -232,8 +285,10 @@ async fn monitor_cves(agent: &ComplianceAgent) {
|
||||
}
|
||||
|
||||
if new_notifications > 0 {
|
||||
tracing::info!("CVE monitor: created {new_notifications} new notification(s)");
|
||||
tracing::info!(
|
||||
"CVE monitor: created {new_notifications} new notification(s) for tenant '{tenant_id}'"
|
||||
);
|
||||
} else {
|
||||
tracing::info!("CVE monitor: no new CVEs found");
|
||||
tracing::info!("CVE monitor: no new CVEs found for tenant '{tenant_id}'");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user