use mongodb::bson::doc; use tokio_cron_scheduler::{Job, JobScheduler}; use compliance_core::models::ScanTrigger; use crate::agent::ComplianceAgent; use crate::error::AgentError; pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> { let sched = JobScheduler::new() .await .map_err(|e| AgentError::Scheduler(format!("Failed to create scheduler: {e}")))?; // Periodic scan job let scan_agent = agent.clone(); let scan_schedule = agent.config.scan_schedule.clone(); let scan_job = Job::new_async(scan_schedule.as_str(), move |_uuid, _lock| { let agent = scan_agent.clone(); Box::pin(async move { tracing::info!("Scheduled scan triggered"); scan_all_repos(&agent).await; }) }) .map_err(|e| AgentError::Scheduler(format!("Failed to create scan job: {e}")))?; sched .add(scan_job) .await .map_err(|e| AgentError::Scheduler(format!("Failed to add scan job: {e}")))?; // CVE monitor job (daily) let cve_agent = agent.clone(); let cve_schedule = agent.config.cve_monitor_schedule.clone(); let cve_job = Job::new_async(cve_schedule.as_str(), move |_uuid, _lock| { let agent = cve_agent.clone(); Box::pin(async move { tracing::info!("CVE monitor triggered"); monitor_cves(&agent).await; }) }) .map_err(|e| AgentError::Scheduler(format!("Failed to create CVE monitor job: {e}")))?; sched .add(cve_job) .await .map_err(|e| AgentError::Scheduler(format!("Failed to add CVE monitor job: {e}")))?; sched .start() .await .map_err(|e| AgentError::Scheduler(format!("Failed to start scheduler: {e}")))?; tracing::info!( "Scheduler started: scans='{}', CVE monitor='{}'", agent.config.scan_schedule, agent.config.cve_monitor_schedule, ); // Keep scheduler alive loop { tokio::time::sleep(tokio::time::Duration::from_secs(3600)).await; } } async fn scan_all_repos(agent: &ComplianceAgent) { use futures_util::StreamExt; let cursor = match agent.db.repositories().find(doc! {}).await { Ok(c) => c, Err(e) => { tracing::error!("Failed to list repos for scheduled scan: {e}"); return; } }; let repos: Vec<_> = cursor.filter_map(|r| async { r.ok() }).collect().await; for repo in repos { let repo_id = repo.id.map(|id| id.to_hex()).unwrap_or_default(); if let Err(e) = agent.run_scan(&repo_id, ScanTrigger::Scheduled).await { tracing::error!("Scheduled scan failed for {}: {e}", repo.name); } } } async fn monitor_cves(agent: &ComplianceAgent) { use compliance_core::models::notification::{parse_severity, CveNotification}; use compliance_core::models::SbomEntry; use futures_util::StreamExt; // Fetch all SBOM entries grouped by repo let cursor = match agent.db.sbom_entries().find(doc! {}).await { Ok(c) => c, Err(e) => { tracing::error!("CVE monitor: failed to list SBOM entries: {e}"); return; } }; let entries: Vec = cursor.filter_map(|r| async { r.ok() }).collect().await; if entries.is_empty() { tracing::debug!("CVE monitor: no SBOM entries, skipping"); return; } tracing::info!( "CVE monitor: checking {} dependencies for new CVEs", entries.len() ); // Build a repo_id → repo_name lookup let repo_ids: std::collections::HashSet = entries.iter().map(|e| e.repo_id.clone()).collect(); let mut repo_names: std::collections::HashMap = std::collections::HashMap::new(); for rid in &repo_ids { if let Ok(oid) = mongodb::bson::oid::ObjectId::parse_str(rid) { if let Ok(Some(repo)) = agent.db.repositories().find_one(doc! { "_id": oid }).await { repo_names.insert(rid.clone(), repo.name.clone()); } } } // Use the existing CveScanner to query OSV.dev let nvd_key = agent.config.nvd_api_key.as_ref().map(|k| { use secrecy::ExposeSecret; k.expose_secret().to_string() }); let scanner = crate::pipeline::cve::CveScanner::new( agent.http.clone(), agent.config.searxng_url.clone(), nvd_key, ); // Group entries by repo for scanning let mut entries_by_repo: std::collections::HashMap> = std::collections::HashMap::new(); for entry in entries { entries_by_repo .entry(entry.repo_id.clone()) .or_default() .push(entry); } let mut new_notifications = 0u32; for (repo_id, mut repo_entries) in entries_by_repo { let repo_name = repo_names .get(&repo_id) .cloned() .unwrap_or_else(|| repo_id.clone()); // Scan dependencies for CVEs let alerts = match scanner.scan_dependencies(&repo_id, &mut repo_entries).await { Ok(a) => a, Err(e) => { tracing::warn!("CVE monitor: scan failed for {repo_name}: {e}"); continue; } }; // Upsert CVE alerts (existing logic) for alert in &alerts { let filter = doc! { "cve_id": &alert.cve_id, "repo_id": &alert.repo_id }; let update = doc! { "$setOnInsert": mongodb::bson::to_bson(alert).unwrap_or_default() }; let _ = agent .db .cve_alerts() .update_one(filter, update) .upsert(true) .await; } // Update SBOM entries with discovered vulnerabilities for entry in &repo_entries { if entry.known_vulnerabilities.is_empty() { continue; } if let Some(entry_id) = &entry.id { let _ = agent .db .sbom_entries() .update_one( doc! { "_id": entry_id }, doc! { "$set": { "known_vulnerabilities": mongodb::bson::to_bson(&entry.known_vulnerabilities).unwrap_or_default(), "updated_at": mongodb::bson::DateTime::now(), }}, ) .await; } } // Create notifications for NEW CVEs (dedup against existing notifications) for alert in &alerts { let filter = doc! { "cve_id": &alert.cve_id, "repo_id": &alert.repo_id, "package_name": &alert.affected_package, "package_version": &alert.affected_version, }; // Only insert if not already exists (upsert with $setOnInsert) let severity = parse_severity(alert.severity.as_deref(), alert.cvss_score); let mut notification = CveNotification::new( alert.cve_id.clone(), repo_id.clone(), repo_name.clone(), alert.affected_package.clone(), alert.affected_version.clone(), severity, ); notification.cvss_score = alert.cvss_score; notification.summary = alert.summary.clone(); notification.url = Some(format!("https://osv.dev/vulnerability/{}", alert.cve_id)); let update = doc! { "$setOnInsert": mongodb::bson::to_bson(¬ification).unwrap_or_default() }; match agent .db .cve_notifications() .update_one(filter, update) .upsert(true) .await { Ok(result) if result.upserted_id.is_some() => { new_notifications += 1; } Err(e) => { tracing::warn!("CVE monitor: failed to create notification: {e}"); } _ => {} // Already exists } } } if new_notifications > 0 { tracing::info!("CVE monitor: created {new_notifications} new notification(s)"); } else { tracing::info!("CVE monitor: no new CVEs found"); } }