use mongodb::bson::doc; use tokio_cron_scheduler::{Job, JobScheduler}; use compliance_core::models::ScanTrigger; use crate::agent::ComplianceAgent; use crate::error::AgentError; pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> { let sched = JobScheduler::new() .await .map_err(|e| AgentError::Scheduler(format!("Failed to create scheduler: {e}")))?; // Periodic scan job let scan_agent = agent.clone(); let scan_schedule = agent.config.scan_schedule.clone(); let scan_job = Job::new_async(scan_schedule.as_str(), move |_uuid, _lock| { let agent = scan_agent.clone(); Box::pin(async move { tracing::info!("Scheduled scan triggered"); scan_all_repos(&agent).await; }) }) .map_err(|e| AgentError::Scheduler(format!("Failed to create scan job: {e}")))?; sched .add(scan_job) .await .map_err(|e| AgentError::Scheduler(format!("Failed to add scan job: {e}")))?; // CVE monitor job (daily) let cve_agent = agent.clone(); let cve_schedule = agent.config.cve_monitor_schedule.clone(); let cve_job = Job::new_async(cve_schedule.as_str(), move |_uuid, _lock| { let agent = cve_agent.clone(); Box::pin(async move { tracing::info!("CVE monitor triggered"); monitor_cves(&agent).await; }) }) .map_err(|e| AgentError::Scheduler(format!("Failed to create CVE monitor job: {e}")))?; sched .add(cve_job) .await .map_err(|e| AgentError::Scheduler(format!("Failed to add CVE monitor job: {e}")))?; sched .start() .await .map_err(|e| AgentError::Scheduler(format!("Failed to start scheduler: {e}")))?; tracing::info!( "Scheduler started: scans='{}', CVE monitor='{}'", agent.config.scan_schedule, agent.config.cve_monitor_schedule, ); // Keep scheduler alive loop { tokio::time::sleep(tokio::time::Duration::from_secs(3600)).await; } } async fn scan_all_repos(agent: &ComplianceAgent) { use futures_util::StreamExt; let cursor = match agent.db.repositories().find(doc! {}).await { Ok(c) => c, Err(e) => { tracing::error!("Failed to list repos for scheduled scan: {e}"); return; } }; let repos: Vec<_> = cursor.filter_map(|r| async { r.ok() }).collect().await; for repo in repos { let repo_id = repo.id.map(|id| id.to_hex()).unwrap_or_default(); if let Err(e) = agent.run_scan(&repo_id, ScanTrigger::Scheduled).await { tracing::error!("Scheduled scan failed for {}: {e}", repo.name); } } } async fn monitor_cves(agent: &ComplianceAgent) { use futures_util::StreamExt; // Re-scan all SBOM entries for new CVEs let cursor = match agent.db.sbom_entries().find(doc! {}).await { Ok(c) => c, Err(e) => { tracing::error!("Failed to list SBOM entries for CVE monitoring: {e}"); return; } }; let entries: Vec<_> = cursor.filter_map(|r| async { r.ok() }).collect().await; if entries.is_empty() { return; } tracing::info!("CVE monitor: checking {} dependencies", entries.len()); // The actual CVE checking is handled by the CveScanner in the pipeline // This is a simplified version that just logs the activity }