use mongodb::bson::doc; use tokio_cron_scheduler::{Job, JobScheduler}; use compliance_core::models::ScanTrigger; use crate::agent::ComplianceAgent; use crate::database::Database; use crate::error::AgentError; /// Default tenant the scheduler runs against when `SCHEDULER_TENANT_IDS` /// isn't set. Matches the dev-injector default so a bare `cargo run` has /// the scheduler scanning whatever lives in `_dev`. const DEFAULT_SCHEDULER_TENANT_ID: &str = "dev"; pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> { let sched = JobScheduler::new() .await .map_err(|e| AgentError::Scheduler(format!("Failed to create scheduler: {e}")))?; // Periodic scan job let scan_agent = agent.clone(); let scan_schedule = agent.config.scan_schedule.clone(); let scan_job = Job::new_async(scan_schedule.as_str(), move |_uuid, _lock| { let agent = scan_agent.clone(); Box::pin(async move { tracing::info!("Scheduled scan triggered"); for tenant_id in scheduler_tenants() { scan_all_repos(&agent, &tenant_id).await; } }) }) .map_err(|e| AgentError::Scheduler(format!("Failed to create scan job: {e}")))?; sched .add(scan_job) .await .map_err(|e| AgentError::Scheduler(format!("Failed to add scan job: {e}")))?; // CVE monitor job (daily) let cve_agent = agent.clone(); let cve_schedule = agent.config.cve_monitor_schedule.clone(); let cve_job = Job::new_async(cve_schedule.as_str(), move |_uuid, _lock| { let agent = cve_agent.clone(); Box::pin(async move { tracing::info!("CVE monitor triggered"); for tenant_id in scheduler_tenants() { monitor_cves(&agent, &tenant_id).await; } }) }) .map_err(|e| AgentError::Scheduler(format!("Failed to create CVE monitor job: {e}")))?; sched .add(cve_job) .await .map_err(|e| AgentError::Scheduler(format!("Failed to add CVE monitor job: {e}")))?; sched .start() .await .map_err(|e| AgentError::Scheduler(format!("Failed to start scheduler: {e}")))?; let tenants = scheduler_tenants(); tracing::info!( "Scheduler started: scans='{}', CVE monitor='{}', tenants={tenants:?}", agent.config.scan_schedule, agent.config.cve_monitor_schedule, ); // Keep scheduler alive loop { tokio::time::sleep(tokio::time::Duration::from_secs(3600)).await; } } /// Tenants the scheduler iterates each tick. From `SCHEDULER_TENANT_IDS` /// (comma-separated), or `DEFAULT_SCHEDULER_TENANT_ID` if unset. M7.2-D /// will replace this with a pull from the tenant-registry. fn scheduler_tenants() -> Vec { std::env::var("SCHEDULER_TENANT_IDS") .ok() .map(|s| { s.split(',') .map(str::trim) .filter(|s| !s.is_empty()) .map(String::from) .collect::>() }) .filter(|v| !v.is_empty()) .unwrap_or_else(|| vec![DEFAULT_SCHEDULER_TENANT_ID.to_string()]) } /// Resolve the per-tenant database. Logs and returns `None` on failure /// so the loop in the caller can continue with other tenants. async fn tenant_db(agent: &ComplianceAgent, tenant_id: &str) -> Option { match agent.db_pool.for_tenant_id(tenant_id).await { Ok(db) => Some(db), Err(e) => { tracing::error!("Scheduler: cannot open tenant database '{tenant_id}': {e}"); None } } } async fn scan_all_repos(agent: &ComplianceAgent, tenant_id: &str) { use futures_util::StreamExt; let db = match tenant_db(agent, tenant_id).await { Some(db) => db, None => return, }; let cursor = match db.repositories().find(doc! {}).await { Ok(c) => c, Err(e) => { tracing::error!("Failed to list repos for tenant '{tenant_id}': {e}"); return; } }; let repos: Vec<_> = cursor.filter_map(|r| async { r.ok() }).collect().await; for repo in repos { let repo_id = repo.id.map(|id| id.to_hex()).unwrap_or_default(); if let Err(e) = agent .run_scan(tenant_id, &repo_id, ScanTrigger::Scheduled) .await { tracing::error!( "Scheduled scan failed for {} (tenant '{tenant_id}'): {e}", repo.name ); } } } async fn monitor_cves(agent: &ComplianceAgent, tenant_id: &str) { use compliance_core::models::notification::{parse_severity, CveNotification}; use compliance_core::models::SbomEntry; use futures_util::StreamExt; let db = match tenant_db(agent, tenant_id).await { Some(db) => db, None => return, }; // Fetch all SBOM entries grouped by repo let cursor = match db.sbom_entries().find(doc! {}).await { Ok(c) => c, Err(e) => { tracing::error!("CVE monitor: failed to list SBOM entries for '{tenant_id}': {e}"); return; } }; let entries: Vec = cursor.filter_map(|r| async { r.ok() }).collect().await; if entries.is_empty() { tracing::debug!("CVE monitor: no SBOM entries for tenant '{tenant_id}', skipping"); return; } tracing::info!( "CVE monitor: checking {} dependencies for new CVEs (tenant '{tenant_id}')", entries.len() ); // Build a repo_id → repo_name lookup let repo_ids: std::collections::HashSet = entries.iter().map(|e| e.repo_id.clone()).collect(); let mut repo_names: std::collections::HashMap = std::collections::HashMap::new(); for rid in &repo_ids { if let Ok(oid) = mongodb::bson::oid::ObjectId::parse_str(rid) { if let Ok(Some(repo)) = db.repositories().find_one(doc! { "_id": oid }).await { repo_names.insert(rid.clone(), repo.name.clone()); } } } // Use the existing CveScanner to query OSV.dev let nvd_key = agent.config.nvd_api_key.as_ref().map(|k| { use secrecy::ExposeSecret; k.expose_secret().to_string() }); let scanner = crate::pipeline::cve::CveScanner::new( agent.http.clone(), agent.config.searxng_url.clone(), nvd_key, ); // Group entries by repo for scanning let mut entries_by_repo: std::collections::HashMap> = std::collections::HashMap::new(); for entry in entries { entries_by_repo .entry(entry.repo_id.clone()) .or_default() .push(entry); } let mut new_notifications = 0u32; for (repo_id, mut repo_entries) in entries_by_repo { let repo_name = repo_names .get(&repo_id) .cloned() .unwrap_or_else(|| repo_id.clone()); // Scan dependencies for CVEs let alerts = match scanner.scan_dependencies(&repo_id, &mut repo_entries).await { Ok(a) => a, Err(e) => { tracing::warn!("CVE monitor: scan failed for {repo_name}: {e}"); continue; } }; // Upsert CVE alerts (existing logic) for alert in &alerts { let filter = doc! { "cve_id": &alert.cve_id, "repo_id": &alert.repo_id }; let update = doc! { "$setOnInsert": mongodb::bson::to_bson(alert).unwrap_or_default() }; let _ = db .cve_alerts() .update_one(filter, update) .upsert(true) .await; } // Update SBOM entries with discovered vulnerabilities for entry in &repo_entries { if entry.known_vulnerabilities.is_empty() { continue; } if let Some(entry_id) = &entry.id { let _ = db .sbom_entries() .update_one( doc! { "_id": entry_id }, doc! { "$set": { "known_vulnerabilities": mongodb::bson::to_bson(&entry.known_vulnerabilities).unwrap_or_default(), "updated_at": mongodb::bson::DateTime::now(), }}, ) .await; } } // Create notifications for NEW CVEs (dedup against existing notifications) for alert in &alerts { let filter = doc! { "cve_id": &alert.cve_id, "repo_id": &alert.repo_id, "package_name": &alert.affected_package, "package_version": &alert.affected_version, }; // Only insert if not already exists (upsert with $setOnInsert) let severity = parse_severity(alert.severity.as_deref(), alert.cvss_score); let mut notification = CveNotification::new( alert.cve_id.clone(), repo_id.clone(), repo_name.clone(), alert.affected_package.clone(), alert.affected_version.clone(), severity, ); notification.cvss_score = alert.cvss_score; notification.summary = alert.summary.clone(); notification.url = Some(format!("https://osv.dev/vulnerability/{}", alert.cve_id)); let update = doc! { "$setOnInsert": mongodb::bson::to_bson(¬ification).unwrap_or_default() }; match db .cve_notifications() .update_one(filter, update) .upsert(true) .await { Ok(result) if result.upserted_id.is_some() => { new_notifications += 1; } Err(e) => { tracing::warn!("CVE monitor: failed to create notification: {e}"); } _ => {} // Already exists } } } if new_notifications > 0 { tracing::info!( "CVE monitor: created {new_notifications} new notification(s) for tenant '{tenant_id}'" ); } else { tracing::info!("CVE monitor: no new CVEs found for tenant '{tenant_id}'"); } }