All checks were successful
CI / Check (pull_request) Successful in 9m47s
CI / Detect Changes (pull_request) Has been skipped
CI / Deploy Agent (pull_request) Has been skipped
CI / Deploy Dashboard (pull_request) Has been skipped
CI / Deploy Docs (pull_request) Has been skipped
CI / Deploy MCP (pull_request) Has been skipped
Implements the full CVE alerting pipeline: CVE Monitor (scheduler.rs): - Replaces stub monitor_cves with actual OSV.dev scanning of all SBOM entries - Runs hourly by default (CVE_MONITOR_SCHEDULE, was daily) - Creates CveNotification for each new CVE (deduped by cve_id+repo+package) - Updates SBOM entries with discovered vulnerabilities - Upserts CveAlert records Notification Model (compliance-core/models/notification.rs): - CveNotification with status lifecycle: new → read → dismissed - NotificationSeverity (Low/Medium/High/Critical) from CVSS scores - parse_severity helper for OSV/NVD severity mapping API Endpoints (5 new routes): - GET /api/v1/notifications — List with status/severity/repo filters - GET /api/v1/notifications/count — Unread count (for badge) - PATCH /api/v1/notifications/:id/read — Mark as read - PATCH /api/v1/notifications/:id/dismiss — Dismiss - POST /api/v1/notifications/read-all — Bulk mark read Dashboard Notification Bell: - Floating bell icon (top-right) with unread count badge - Dropdown panel showing CVE details: severity, CVSS, package, repo, summary - Dismiss individual notifications - Auto-marks as read when panel opens - Polls count every 30 seconds Also: - Fix Dockerfile.dashboard: revert to dioxus-cli 0.7.3 --locked - Add cve_notifications collection with unique + status indexes - MongoDB indexes for efficient notification queries Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
240 lines
8.2 KiB
Rust
240 lines
8.2 KiB
Rust
use mongodb::bson::doc;
|
|
use tokio_cron_scheduler::{Job, JobScheduler};
|
|
|
|
use compliance_core::models::ScanTrigger;
|
|
|
|
use crate::agent::ComplianceAgent;
|
|
use crate::error::AgentError;
|
|
|
|
pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> {
|
|
let sched = JobScheduler::new()
|
|
.await
|
|
.map_err(|e| AgentError::Scheduler(format!("Failed to create scheduler: {e}")))?;
|
|
|
|
// Periodic scan job
|
|
let scan_agent = agent.clone();
|
|
let scan_schedule = agent.config.scan_schedule.clone();
|
|
let scan_job = Job::new_async(scan_schedule.as_str(), move |_uuid, _lock| {
|
|
let agent = scan_agent.clone();
|
|
Box::pin(async move {
|
|
tracing::info!("Scheduled scan triggered");
|
|
scan_all_repos(&agent).await;
|
|
})
|
|
})
|
|
.map_err(|e| AgentError::Scheduler(format!("Failed to create scan job: {e}")))?;
|
|
sched
|
|
.add(scan_job)
|
|
.await
|
|
.map_err(|e| AgentError::Scheduler(format!("Failed to add scan job: {e}")))?;
|
|
|
|
// CVE monitor job (daily)
|
|
let cve_agent = agent.clone();
|
|
let cve_schedule = agent.config.cve_monitor_schedule.clone();
|
|
let cve_job = Job::new_async(cve_schedule.as_str(), move |_uuid, _lock| {
|
|
let agent = cve_agent.clone();
|
|
Box::pin(async move {
|
|
tracing::info!("CVE monitor triggered");
|
|
monitor_cves(&agent).await;
|
|
})
|
|
})
|
|
.map_err(|e| AgentError::Scheduler(format!("Failed to create CVE monitor job: {e}")))?;
|
|
sched
|
|
.add(cve_job)
|
|
.await
|
|
.map_err(|e| AgentError::Scheduler(format!("Failed to add CVE monitor job: {e}")))?;
|
|
|
|
sched
|
|
.start()
|
|
.await
|
|
.map_err(|e| AgentError::Scheduler(format!("Failed to start scheduler: {e}")))?;
|
|
|
|
tracing::info!(
|
|
"Scheduler started: scans='{}', CVE monitor='{}'",
|
|
agent.config.scan_schedule,
|
|
agent.config.cve_monitor_schedule,
|
|
);
|
|
|
|
// Keep scheduler alive
|
|
loop {
|
|
tokio::time::sleep(tokio::time::Duration::from_secs(3600)).await;
|
|
}
|
|
}
|
|
|
|
async fn scan_all_repos(agent: &ComplianceAgent) {
|
|
use futures_util::StreamExt;
|
|
|
|
let cursor = match agent.db.repositories().find(doc! {}).await {
|
|
Ok(c) => c,
|
|
Err(e) => {
|
|
tracing::error!("Failed to list repos for scheduled scan: {e}");
|
|
return;
|
|
}
|
|
};
|
|
|
|
let repos: Vec<_> = cursor.filter_map(|r| async { r.ok() }).collect().await;
|
|
|
|
for repo in repos {
|
|
let repo_id = repo.id.map(|id| id.to_hex()).unwrap_or_default();
|
|
if let Err(e) = agent.run_scan(&repo_id, ScanTrigger::Scheduled).await {
|
|
tracing::error!("Scheduled scan failed for {}: {e}", repo.name);
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn monitor_cves(agent: &ComplianceAgent) {
|
|
use compliance_core::models::notification::{parse_severity, CveNotification};
|
|
use compliance_core::models::SbomEntry;
|
|
use futures_util::StreamExt;
|
|
|
|
// Fetch all SBOM entries grouped by repo
|
|
let cursor = match agent.db.sbom_entries().find(doc! {}).await {
|
|
Ok(c) => c,
|
|
Err(e) => {
|
|
tracing::error!("CVE monitor: failed to list SBOM entries: {e}");
|
|
return;
|
|
}
|
|
};
|
|
let entries: Vec<SbomEntry> = cursor.filter_map(|r| async { r.ok() }).collect().await;
|
|
if entries.is_empty() {
|
|
tracing::debug!("CVE monitor: no SBOM entries, skipping");
|
|
return;
|
|
}
|
|
|
|
tracing::info!(
|
|
"CVE monitor: checking {} dependencies for new CVEs",
|
|
entries.len()
|
|
);
|
|
|
|
// Build a repo_id → repo_name lookup
|
|
let repo_ids: std::collections::HashSet<String> =
|
|
entries.iter().map(|e| e.repo_id.clone()).collect();
|
|
let mut repo_names: std::collections::HashMap<String, String> =
|
|
std::collections::HashMap::new();
|
|
for rid in &repo_ids {
|
|
if let Ok(oid) = mongodb::bson::oid::ObjectId::parse_str(rid) {
|
|
if let Ok(Some(repo)) = agent.db.repositories().find_one(doc! { "_id": oid }).await {
|
|
repo_names.insert(rid.clone(), repo.name.clone());
|
|
}
|
|
}
|
|
}
|
|
|
|
// Use the existing CveScanner to query OSV.dev
|
|
let nvd_key = agent.config.nvd_api_key.as_ref().map(|k| {
|
|
use secrecy::ExposeSecret;
|
|
k.expose_secret().to_string()
|
|
});
|
|
let scanner = crate::pipeline::cve::CveScanner::new(
|
|
agent.http.clone(),
|
|
agent.config.searxng_url.clone(),
|
|
nvd_key,
|
|
);
|
|
|
|
// Group entries by repo for scanning
|
|
let mut entries_by_repo: std::collections::HashMap<String, Vec<SbomEntry>> =
|
|
std::collections::HashMap::new();
|
|
for entry in entries {
|
|
entries_by_repo
|
|
.entry(entry.repo_id.clone())
|
|
.or_default()
|
|
.push(entry);
|
|
}
|
|
|
|
let mut new_notifications = 0u32;
|
|
|
|
for (repo_id, mut repo_entries) in entries_by_repo {
|
|
let repo_name = repo_names
|
|
.get(&repo_id)
|
|
.cloned()
|
|
.unwrap_or_else(|| repo_id.clone());
|
|
|
|
// Scan dependencies for CVEs
|
|
let alerts = match scanner.scan_dependencies(&repo_id, &mut repo_entries).await {
|
|
Ok(a) => a,
|
|
Err(e) => {
|
|
tracing::warn!("CVE monitor: scan failed for {repo_name}: {e}");
|
|
continue;
|
|
}
|
|
};
|
|
|
|
// Upsert CVE alerts (existing logic)
|
|
for alert in &alerts {
|
|
let filter = doc! { "cve_id": &alert.cve_id, "repo_id": &alert.repo_id };
|
|
let update = doc! { "$setOnInsert": mongodb::bson::to_bson(alert).unwrap_or_default() };
|
|
let _ = agent
|
|
.db
|
|
.cve_alerts()
|
|
.update_one(filter, update)
|
|
.upsert(true)
|
|
.await;
|
|
}
|
|
|
|
// Update SBOM entries with discovered vulnerabilities
|
|
for entry in &repo_entries {
|
|
if entry.known_vulnerabilities.is_empty() {
|
|
continue;
|
|
}
|
|
if let Some(entry_id) = &entry.id {
|
|
let _ = agent
|
|
.db
|
|
.sbom_entries()
|
|
.update_one(
|
|
doc! { "_id": entry_id },
|
|
doc! { "$set": {
|
|
"known_vulnerabilities": mongodb::bson::to_bson(&entry.known_vulnerabilities).unwrap_or_default(),
|
|
"updated_at": mongodb::bson::DateTime::now(),
|
|
}},
|
|
)
|
|
.await;
|
|
}
|
|
}
|
|
|
|
// Create notifications for NEW CVEs (dedup against existing notifications)
|
|
for alert in &alerts {
|
|
let filter = doc! {
|
|
"cve_id": &alert.cve_id,
|
|
"repo_id": &alert.repo_id,
|
|
"package_name": &alert.affected_package,
|
|
"package_version": &alert.affected_version,
|
|
};
|
|
// Only insert if not already exists (upsert with $setOnInsert)
|
|
let severity = parse_severity(alert.severity.as_deref(), alert.cvss_score);
|
|
let mut notification = CveNotification::new(
|
|
alert.cve_id.clone(),
|
|
repo_id.clone(),
|
|
repo_name.clone(),
|
|
alert.affected_package.clone(),
|
|
alert.affected_version.clone(),
|
|
severity,
|
|
);
|
|
notification.cvss_score = alert.cvss_score;
|
|
notification.summary = alert.summary.clone();
|
|
notification.url = Some(format!("https://osv.dev/vulnerability/{}", alert.cve_id));
|
|
|
|
let update = doc! {
|
|
"$setOnInsert": mongodb::bson::to_bson(¬ification).unwrap_or_default()
|
|
};
|
|
match agent
|
|
.db
|
|
.cve_notifications()
|
|
.update_one(filter, update)
|
|
.upsert(true)
|
|
.await
|
|
{
|
|
Ok(result) if result.upserted_id.is_some() => {
|
|
new_notifications += 1;
|
|
}
|
|
Err(e) => {
|
|
tracing::warn!("CVE monitor: failed to create notification: {e}");
|
|
}
|
|
_ => {} // Already exists
|
|
}
|
|
}
|
|
}
|
|
|
|
if new_notifications > 0 {
|
|
tracing::info!("CVE monitor: created {new_notifications} new notification(s)");
|
|
} else {
|
|
tracing::info!("CVE monitor: no new CVEs found");
|
|
}
|
|
}
|