Compare commits
1 Commits
main
..
7e07ebeb53
| Author | SHA1 | Date | |
|---|---|---|---|
| 7e07ebeb53 |
@@ -7,18 +7,11 @@ use crate::agent::ComplianceAgent;
|
|||||||
use crate::database::Database;
|
use crate::database::Database;
|
||||||
use crate::error::AgentError;
|
use crate::error::AgentError;
|
||||||
|
|
||||||
/// Default tenant the scheduler runs against when neither the tenant
|
/// Default tenant the scheduler runs against when `SCHEDULER_TENANT_IDS`
|
||||||
/// registry nor `SCHEDULER_TENANT_IDS` are configured. Matches the
|
/// isn't set. Matches the dev-injector default so a bare `cargo run` has
|
||||||
/// dev-injector default so a bare `cargo run` has the scheduler
|
/// the scheduler scanning whatever lives in `<prefix>_dev`.
|
||||||
/// scanning whatever lives in `<prefix>_dev`.
|
|
||||||
const DEFAULT_SCHEDULER_TENANT_ID: &str = "dev";
|
const DEFAULT_SCHEDULER_TENANT_ID: &str = "dev";
|
||||||
|
|
||||||
/// Request timeout when fetching the live tenant list from the
|
|
||||||
/// registry. Kept short — if the registry is slow we'd rather fall
|
|
||||||
/// back to env-configured ids and finish the tick than block the
|
|
||||||
/// scheduler loop.
|
|
||||||
const REGISTRY_FETCH_TIMEOUT_SECS: u64 = 5;
|
|
||||||
|
|
||||||
pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> {
|
pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> {
|
||||||
let sched = JobScheduler::new()
|
let sched = JobScheduler::new()
|
||||||
.await
|
.await
|
||||||
@@ -31,12 +24,7 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError>
|
|||||||
let agent = scan_agent.clone();
|
let agent = scan_agent.clone();
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
tracing::info!("Scheduled scan triggered");
|
tracing::info!("Scheduled scan triggered");
|
||||||
let tenants = scheduler_tenants(&agent).await;
|
for tenant_id in scheduler_tenants() {
|
||||||
tracing::debug!(
|
|
||||||
tenant_count = tenants.len(),
|
|
||||||
"Scheduled scan: tenants resolved"
|
|
||||||
);
|
|
||||||
for tenant_id in tenants {
|
|
||||||
scan_all_repos(&agent, &tenant_id).await;
|
scan_all_repos(&agent, &tenant_id).await;
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@@ -54,12 +42,7 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError>
|
|||||||
let agent = cve_agent.clone();
|
let agent = cve_agent.clone();
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
tracing::info!("CVE monitor triggered");
|
tracing::info!("CVE monitor triggered");
|
||||||
let tenants = scheduler_tenants(&agent).await;
|
for tenant_id in scheduler_tenants() {
|
||||||
tracing::debug!(
|
|
||||||
tenant_count = tenants.len(),
|
|
||||||
"CVE monitor: tenants resolved"
|
|
||||||
);
|
|
||||||
for tenant_id in tenants {
|
|
||||||
monitor_cves(&agent, &tenant_id).await;
|
monitor_cves(&agent, &tenant_id).await;
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@@ -75,14 +58,9 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError>
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| AgentError::Scheduler(format!("Failed to start scheduler: {e}")))?;
|
.map_err(|e| AgentError::Scheduler(format!("Failed to start scheduler: {e}")))?;
|
||||||
|
|
||||||
let tenants = scheduler_tenants(agent).await;
|
let tenants = scheduler_tenants();
|
||||||
let source = if agent.config.tenant_registry_url.is_some() {
|
|
||||||
"tenant-registry (env fallback)"
|
|
||||||
} else {
|
|
||||||
"env (SCHEDULER_TENANT_IDS)"
|
|
||||||
};
|
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
"Scheduler started: scans='{}', CVE monitor='{}', tenant source={source}, tenants={tenants:?}",
|
"Scheduler started: scans='{}', CVE monitor='{}', tenants={tenants:?}",
|
||||||
agent.config.scan_schedule,
|
agent.config.scan_schedule,
|
||||||
agent.config.cve_monitor_schedule,
|
agent.config.cve_monitor_schedule,
|
||||||
);
|
);
|
||||||
@@ -93,40 +71,10 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Tenants the scheduler iterates each tick.
|
/// Tenants the scheduler iterates each tick. From `SCHEDULER_TENANT_IDS`
|
||||||
///
|
/// (comma-separated), or `DEFAULT_SCHEDULER_TENANT_ID` if unset. M7.2-D
|
||||||
/// Resolution order:
|
/// will replace this with a pull from the tenant-registry.
|
||||||
/// 1. **Tenant registry** at `agent.config.tenant_registry_url`
|
fn scheduler_tenants() -> Vec<String> {
|
||||||
/// (`GET /v1/tenants`). Fresh on every tick — picks up newly
|
|
||||||
/// provisioned tenants without an agent restart.
|
|
||||||
/// 2. **`SCHEDULER_TENANT_IDS`** env (comma-separated) — fallback when
|
|
||||||
/// the registry is unreachable, the response is malformed, or no
|
|
||||||
/// registry URL is configured.
|
|
||||||
/// 3. **`DEFAULT_SCHEDULER_TENANT_ID`** (`"dev"`) — last-ditch fallback
|
|
||||||
/// so the scheduler keeps doing something useful in dev.
|
|
||||||
///
|
|
||||||
/// We never panic out of this function — the scheduler must keep
|
|
||||||
/// firing even if the registry is offline.
|
|
||||||
async fn scheduler_tenants(agent: &ComplianceAgent) -> Vec<String> {
|
|
||||||
if let Some(url) = agent.config.tenant_registry_url.as_deref() {
|
|
||||||
match fetch_tenants_from_registry(&agent.http, url).await {
|
|
||||||
Ok(v) if !v.is_empty() => return v,
|
|
||||||
Ok(_) => {
|
|
||||||
tracing::warn!("tenant-registry returned empty list; falling back to env");
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::warn!(
|
|
||||||
url = %url,
|
|
||||||
error = %e,
|
|
||||||
"tenant-registry fetch failed; falling back to env"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tenants_from_env()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn tenants_from_env() -> Vec<String> {
|
|
||||||
std::env::var("SCHEDULER_TENANT_IDS")
|
std::env::var("SCHEDULER_TENANT_IDS")
|
||||||
.ok()
|
.ok()
|
||||||
.map(|s| {
|
.map(|s| {
|
||||||
@@ -140,134 +88,6 @@ fn tenants_from_env() -> Vec<String> {
|
|||||||
.unwrap_or_else(|| vec![DEFAULT_SCHEDULER_TENANT_ID.to_string()])
|
.unwrap_or_else(|| vec![DEFAULT_SCHEDULER_TENANT_ID.to_string()])
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Shape we accept from the registry. Liberal in what we accept:
|
|
||||||
/// the registry can return any field shape as long as either `id` or
|
|
||||||
/// `tenant_id` is present. Other fields are ignored.
|
|
||||||
#[derive(serde::Deserialize)]
|
|
||||||
struct RegistryTenant {
|
|
||||||
#[serde(alias = "tenant_id")]
|
|
||||||
id: String,
|
|
||||||
/// Filter out non-running tenants if status is present. Missing
|
|
||||||
/// status defaults to "active" so older registry deployments keep
|
|
||||||
/// working.
|
|
||||||
#[serde(default = "default_status")]
|
|
||||||
status: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn default_status() -> String {
|
|
||||||
"active".to_string()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(serde::Deserialize)]
|
|
||||||
struct RegistryListResponse {
|
|
||||||
data: Vec<RegistryTenant>,
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn fetch_tenants_from_registry(
|
|
||||||
http: &reqwest::Client,
|
|
||||||
base_url: &str,
|
|
||||||
) -> Result<Vec<String>, String> {
|
|
||||||
let url = format!("{}/v1/tenants", base_url.trim_end_matches('/'));
|
|
||||||
let resp = http
|
|
||||||
.get(&url)
|
|
||||||
.timeout(std::time::Duration::from_secs(REGISTRY_FETCH_TIMEOUT_SECS))
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
.map_err(|e| format!("request failed: {e}"))?;
|
|
||||||
if !resp.status().is_success() {
|
|
||||||
return Err(format!("registry returned {}", resp.status()));
|
|
||||||
}
|
|
||||||
let body: RegistryListResponse = resp
|
|
||||||
.json()
|
|
||||||
.await
|
|
||||||
.map_err(|e| format!("invalid JSON: {e}"))?;
|
|
||||||
Ok(filter_active(body.data))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Frozen/Archived tenants don't need scheduled scans; the M7.1
|
|
||||||
/// status gate would 402/410 anyway. Skip them so we don't waste
|
|
||||||
/// cycles. Active / trial / demo / anything-else-unknown all run.
|
|
||||||
fn filter_active(rows: Vec<RegistryTenant>) -> Vec<String> {
|
|
||||||
rows.into_iter()
|
|
||||||
.filter(|t| !matches!(t.status.as_str(), "frozen" | "archived"))
|
|
||||||
.map(|t| t.id)
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
fn tenant(id: &str, status: &str) -> RegistryTenant {
|
|
||||||
RegistryTenant {
|
|
||||||
id: id.to_string(),
|
|
||||||
status: status.to_string(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn filter_active_keeps_running_skips_frozen_archived() {
|
|
||||||
let rows = vec![
|
|
||||||
tenant("a", "active"),
|
|
||||||
tenant("b", "trial"),
|
|
||||||
tenant("c", "demo"),
|
|
||||||
tenant("d", "frozen"),
|
|
||||||
tenant("e", "archived"),
|
|
||||||
tenant("f", "weird-but-not-known-dead"),
|
|
||||||
];
|
|
||||||
let out = filter_active(rows);
|
|
||||||
assert_eq!(out, vec!["a", "b", "c", "f"]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn deserialize_registry_response_accepts_id_or_tenant_id() {
|
|
||||||
let body = r#"{"data":[
|
|
||||||
{"id":"a","status":"active"},
|
|
||||||
{"tenant_id":"b","status":"trial"},
|
|
||||||
{"id":"c"}
|
|
||||||
]}"#;
|
|
||||||
let parsed: RegistryListResponse = serde_json::from_str(body).unwrap();
|
|
||||||
assert_eq!(parsed.data.len(), 3);
|
|
||||||
assert_eq!(parsed.data[0].id, "a");
|
|
||||||
assert_eq!(parsed.data[1].id, "b");
|
|
||||||
assert_eq!(parsed.data[2].id, "c");
|
|
||||||
// Default status for the third entry should be "active"
|
|
||||||
assert_eq!(parsed.data[2].status, "active");
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Combined into a single test: cargo runs tests in parallel and
|
|
||||||
/// env vars are process-global, so two separate tests touching
|
|
||||||
/// `SCHEDULER_TENANT_IDS` race each other. Doing both checks in
|
|
||||||
/// one test keeps them in a deterministic order.
|
|
||||||
#[test]
|
|
||||||
fn tenants_from_env_resolution() {
|
|
||||||
std::env::remove_var("SCHEDULER_TENANT_IDS");
|
|
||||||
assert_eq!(
|
|
||||||
tenants_from_env(),
|
|
||||||
vec![DEFAULT_SCHEDULER_TENANT_ID.to_string()],
|
|
||||||
"unset → default"
|
|
||||||
);
|
|
||||||
|
|
||||||
std::env::set_var("SCHEDULER_TENANT_IDS", "acme, globex ,,hello");
|
|
||||||
let out = tenants_from_env();
|
|
||||||
std::env::remove_var("SCHEDULER_TENANT_IDS");
|
|
||||||
assert_eq!(
|
|
||||||
out,
|
|
||||||
vec!["acme", "globex", "hello"],
|
|
||||||
"splits + trims + drops empty"
|
|
||||||
);
|
|
||||||
|
|
||||||
std::env::set_var("SCHEDULER_TENANT_IDS", "");
|
|
||||||
let out = tenants_from_env();
|
|
||||||
std::env::remove_var("SCHEDULER_TENANT_IDS");
|
|
||||||
assert_eq!(
|
|
||||||
out,
|
|
||||||
vec![DEFAULT_SCHEDULER_TENANT_ID.to_string()],
|
|
||||||
"empty → default"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Resolve the per-tenant database. Logs and returns `None` on failure
|
/// Resolve the per-tenant database. Logs and returns `None` on failure
|
||||||
/// so the loop in the caller can continue with other tenants.
|
/// so the loop in the caller can continue with other tenants.
|
||||||
async fn tenant_db(agent: &ComplianceAgent, tenant_id: &str) -> Option<Database> {
|
async fn tenant_db(agent: &ComplianceAgent, tenant_id: &str) -> Option<Database> {
|
||||||
|
|||||||
Reference in New Issue
Block a user