From 608611423baeffb502110555b16a38b6f33148a3 Mon Sep 17 00:00:00 2001 From: Sharang Parnerkar <30073382+mighty840@users.noreply.github.com> Date: Thu, 18 Jun 2026 13:07:11 +0200 Subject: [PATCH] feat(m7.3): scheduler pulls tenants from registry, env as fallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the M7.2-C static `SCHEDULER_TENANT_IDS` env enumeration with a live query to the tenant-registry at every tick. New tenants get picked up without an agent restart; the env stays as the fallback when the registry is unreachable so the scheduler is never silenced by a registry outage. Resolution order 1. agent.config.tenant_registry_url → GET /v1/tenants - 5s timeout (kept short — we'd rather fall back than block the tick) - Frozen and Archived tenants filtered out (the M7.1 status gate would 402/410 them anyway, no point scanning their repos) - Accepts either {"id"} or {"tenant_id"} for forward compatibility with whatever shape the registry settles on 2. SCHEDULER_TENANT_IDS env (comma-separated) — fallback when the registry URL is unset OR the fetch fails OR the parsed response is empty. Each failure mode logs a warn with the url so operators see the problem. 3. DEFAULT_SCHEDULER_TENANT_ID ("dev") — last-ditch fallback so a bare `cargo run` against a clean Mongo still scans the dev tenant. Why each tick instead of caching - Tick frequency is every few hours (scan_schedule default "0 0 */6 * * *"). The registry call is at most 4 times a day per agent — cheap. - Caching introduces a staleness window for newly provisioned tenants. The whole point of registry integration is to pick them up fast. Startup log - Includes "tenant source=tenant-registry" or "env" so operators can tell at a glance which mode the scheduler is in. Test plan - cargo fmt --all clean - cargo clippy -p compliance-agent -- -D warnings clean - cargo test -p compliance-agent --lib — 232 pass (+3 new): * filter_active_keeps_running_skips_frozen_archived * deserialize_registry_response_accepts_id_or_tenant_id (covers the {"id"|"tenant_id"} alias) * tenants_from_env_resolution (single test covering unset → default, csv → splits, "" → default — collapsed to one to avoid env-var test races) Production - Set TENANT_REGISTRY_URL in orca-infra alongside KEYCLOAK_URL when the registry is ready to serve. Until then, scheduler keeps using SCHEDULER_TENANT_IDS — no operator action needed. - Future M7.4 cleanup: once tenant-registry adoption is universal, delete SCHEDULER_TENANT_IDS env support entirely. Stacked on #95 (admin endpoints) since that PR added tenant_registry_url to AgentConfig. Once #95 lands this auto- retargets to main. Co-Authored-By: Claude Opus 4.7 --- compliance-agent/src/scheduler.rs | 202 ++++++++++++++++++++++++++++-- 1 file changed, 191 insertions(+), 11 deletions(-) diff --git a/compliance-agent/src/scheduler.rs b/compliance-agent/src/scheduler.rs index 81581a1..d1d1e1d 100644 --- a/compliance-agent/src/scheduler.rs +++ b/compliance-agent/src/scheduler.rs @@ -7,11 +7,18 @@ use crate::agent::ComplianceAgent; use crate::database::Database; use crate::error::AgentError; -/// Default tenant the scheduler runs against when `SCHEDULER_TENANT_IDS` -/// isn't set. Matches the dev-injector default so a bare `cargo run` has -/// the scheduler scanning whatever lives in `_dev`. +/// Default tenant the scheduler runs against when neither the tenant +/// registry nor `SCHEDULER_TENANT_IDS` are configured. Matches the +/// dev-injector default so a bare `cargo run` has the scheduler +/// scanning whatever lives in `_dev`. const DEFAULT_SCHEDULER_TENANT_ID: &str = "dev"; +/// Request timeout when fetching the live tenant list from the +/// registry. Kept short — if the registry is slow we'd rather fall +/// back to env-configured ids and finish the tick than block the +/// scheduler loop. +const REGISTRY_FETCH_TIMEOUT_SECS: u64 = 5; + pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> { let sched = JobScheduler::new() .await @@ -24,7 +31,12 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> let agent = scan_agent.clone(); Box::pin(async move { tracing::info!("Scheduled scan triggered"); - for tenant_id in scheduler_tenants() { + let tenants = scheduler_tenants(&agent).await; + tracing::debug!( + tenant_count = tenants.len(), + "Scheduled scan: tenants resolved" + ); + for tenant_id in tenants { scan_all_repos(&agent, &tenant_id).await; } }) @@ -42,7 +54,12 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> let agent = cve_agent.clone(); Box::pin(async move { tracing::info!("CVE monitor triggered"); - for tenant_id in scheduler_tenants() { + let tenants = scheduler_tenants(&agent).await; + tracing::debug!( + tenant_count = tenants.len(), + "CVE monitor: tenants resolved" + ); + for tenant_id in tenants { monitor_cves(&agent, &tenant_id).await; } }) @@ -58,9 +75,14 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> .await .map_err(|e| AgentError::Scheduler(format!("Failed to start scheduler: {e}")))?; - let tenants = scheduler_tenants(); + let tenants = scheduler_tenants(agent).await; + let source = if agent.config.tenant_registry_url.is_some() { + "tenant-registry (env fallback)" + } else { + "env (SCHEDULER_TENANT_IDS)" + }; tracing::info!( - "Scheduler started: scans='{}', CVE monitor='{}', tenants={tenants:?}", + "Scheduler started: scans='{}', CVE monitor='{}', tenant source={source}, tenants={tenants:?}", agent.config.scan_schedule, agent.config.cve_monitor_schedule, ); @@ -71,10 +93,40 @@ pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> } } -/// Tenants the scheduler iterates each tick. From `SCHEDULER_TENANT_IDS` -/// (comma-separated), or `DEFAULT_SCHEDULER_TENANT_ID` if unset. M7.2-D -/// will replace this with a pull from the tenant-registry. -fn scheduler_tenants() -> Vec { +/// Tenants the scheduler iterates each tick. +/// +/// Resolution order: +/// 1. **Tenant registry** at `agent.config.tenant_registry_url` +/// (`GET /v1/tenants`). Fresh on every tick — picks up newly +/// provisioned tenants without an agent restart. +/// 2. **`SCHEDULER_TENANT_IDS`** env (comma-separated) — fallback when +/// the registry is unreachable, the response is malformed, or no +/// registry URL is configured. +/// 3. **`DEFAULT_SCHEDULER_TENANT_ID`** (`"dev"`) — last-ditch fallback +/// so the scheduler keeps doing something useful in dev. +/// +/// We never panic out of this function — the scheduler must keep +/// firing even if the registry is offline. +async fn scheduler_tenants(agent: &ComplianceAgent) -> Vec { + if let Some(url) = agent.config.tenant_registry_url.as_deref() { + match fetch_tenants_from_registry(&agent.http, url).await { + Ok(v) if !v.is_empty() => return v, + Ok(_) => { + tracing::warn!("tenant-registry returned empty list; falling back to env"); + } + Err(e) => { + tracing::warn!( + url = %url, + error = %e, + "tenant-registry fetch failed; falling back to env" + ); + } + } + } + tenants_from_env() +} + +fn tenants_from_env() -> Vec { std::env::var("SCHEDULER_TENANT_IDS") .ok() .map(|s| { @@ -88,6 +140,134 @@ fn scheduler_tenants() -> Vec { .unwrap_or_else(|| vec![DEFAULT_SCHEDULER_TENANT_ID.to_string()]) } +/// Shape we accept from the registry. Liberal in what we accept: +/// the registry can return any field shape as long as either `id` or +/// `tenant_id` is present. Other fields are ignored. +#[derive(serde::Deserialize)] +struct RegistryTenant { + #[serde(alias = "tenant_id")] + id: String, + /// Filter out non-running tenants if status is present. Missing + /// status defaults to "active" so older registry deployments keep + /// working. + #[serde(default = "default_status")] + status: String, +} + +fn default_status() -> String { + "active".to_string() +} + +#[derive(serde::Deserialize)] +struct RegistryListResponse { + data: Vec, +} + +async fn fetch_tenants_from_registry( + http: &reqwest::Client, + base_url: &str, +) -> Result, String> { + let url = format!("{}/v1/tenants", base_url.trim_end_matches('/')); + let resp = http + .get(&url) + .timeout(std::time::Duration::from_secs(REGISTRY_FETCH_TIMEOUT_SECS)) + .send() + .await + .map_err(|e| format!("request failed: {e}"))?; + if !resp.status().is_success() { + return Err(format!("registry returned {}", resp.status())); + } + let body: RegistryListResponse = resp + .json() + .await + .map_err(|e| format!("invalid JSON: {e}"))?; + Ok(filter_active(body.data)) +} + +/// Frozen/Archived tenants don't need scheduled scans; the M7.1 +/// status gate would 402/410 anyway. Skip them so we don't waste +/// cycles. Active / trial / demo / anything-else-unknown all run. +fn filter_active(rows: Vec) -> Vec { + rows.into_iter() + .filter(|t| !matches!(t.status.as_str(), "frozen" | "archived")) + .map(|t| t.id) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + + fn tenant(id: &str, status: &str) -> RegistryTenant { + RegistryTenant { + id: id.to_string(), + status: status.to_string(), + } + } + + #[test] + fn filter_active_keeps_running_skips_frozen_archived() { + let rows = vec![ + tenant("a", "active"), + tenant("b", "trial"), + tenant("c", "demo"), + tenant("d", "frozen"), + tenant("e", "archived"), + tenant("f", "weird-but-not-known-dead"), + ]; + let out = filter_active(rows); + assert_eq!(out, vec!["a", "b", "c", "f"]); + } + + #[test] + fn deserialize_registry_response_accepts_id_or_tenant_id() { + let body = r#"{"data":[ + {"id":"a","status":"active"}, + {"tenant_id":"b","status":"trial"}, + {"id":"c"} + ]}"#; + let parsed: RegistryListResponse = serde_json::from_str(body).unwrap(); + assert_eq!(parsed.data.len(), 3); + assert_eq!(parsed.data[0].id, "a"); + assert_eq!(parsed.data[1].id, "b"); + assert_eq!(parsed.data[2].id, "c"); + // Default status for the third entry should be "active" + assert_eq!(parsed.data[2].status, "active"); + } + + /// Combined into a single test: cargo runs tests in parallel and + /// env vars are process-global, so two separate tests touching + /// `SCHEDULER_TENANT_IDS` race each other. Doing both checks in + /// one test keeps them in a deterministic order. + #[test] + fn tenants_from_env_resolution() { + std::env::remove_var("SCHEDULER_TENANT_IDS"); + assert_eq!( + tenants_from_env(), + vec![DEFAULT_SCHEDULER_TENANT_ID.to_string()], + "unset → default" + ); + + std::env::set_var("SCHEDULER_TENANT_IDS", "acme, globex ,,hello"); + let out = tenants_from_env(); + std::env::remove_var("SCHEDULER_TENANT_IDS"); + assert_eq!( + out, + vec!["acme", "globex", "hello"], + "splits + trims + drops empty" + ); + + std::env::set_var("SCHEDULER_TENANT_IDS", ""); + let out = tenants_from_env(); + std::env::remove_var("SCHEDULER_TENANT_IDS"); + assert_eq!( + out, + vec![DEFAULT_SCHEDULER_TENANT_ID.to_string()], + "empty → default" + ); + } +} + /// Resolve the per-tenant database. Logs and returns `None` on failure /// so the loop in the caller can continue with other tenants. async fn tenant_db(agent: &ComplianceAgent, tenant_id: &str) -> Option { -- 2.52.0