cdfbb62f9d
Builds on PR M7.2-A. Every HTTP handler in compliance-agent/src/api/
now takes a TenantCtx extractor and pulls a tenant-scoped Database
from agent.db_pool.for_tenant(&ctx). The query bodies are unchanged:
`db.findings().find(doc! {...})` reads from the tenant's own physical
database, so a filter doc cannot leak data across tenants, because
every other tenant's data sits behind a different db handle.
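
The isolation argument above can be illustrated with a small, std-only sketch. It is not the agent's real code: `TenantCtx`, `Database` and `DbPool` here are simplified stand-ins, the in-memory `Vec` replaces MongoDB, and the `<base_name>_<tenant_id>` naming scheme is an assumption inferred from the `<db_name>_*` pattern mentioned for the test cleanup.

```rust
use std::collections::HashMap;

/// Stand-in for the real tenant context (hypothetical shape).
#[derive(Debug, Clone)]
pub struct TenantCtx {
    pub tenant_id: String,
}

/// Stand-in for one physical database holding a single "findings" collection.
#[derive(Debug)]
pub struct Database {
    pub name: String,
    findings: Vec<(String, String)>, // (severity, title)
}

impl Database {
    pub fn insert_finding(&mut self, severity: &str, title: &str) {
        self.findings.push((severity.to_string(), title.to_string()));
    }

    /// The filter only mentions severity. There is no tenant field that a
    /// handler could forget to add, because the handle itself is the scope.
    pub fn find_by_severity(&self, severity: &str) -> Vec<&str> {
        self.findings
            .iter()
            .filter(|(s, _)| s.as_str() == severity)
            .map(|(_, title)| title.as_str())
            .collect()
    }
}

/// Hypothetical pool that hands out one database per tenant.
#[derive(Debug)]
pub struct DbPool {
    base_name: String,
    databases: HashMap<String, Database>,
}

impl DbPool {
    pub fn new(base_name: &str) -> Self {
        Self {
            base_name: base_name.to_string(),
            databases: HashMap::new(),
        }
    }

    /// Assumed naming scheme: `<base_name>_<tenant_id>`.
    pub fn db_name(&self, ctx: &TenantCtx) -> String {
        format!("{}_{}", self.base_name, ctx.tenant_id)
    }

    /// Return the tenant's own database, creating it on first use.
    pub fn for_tenant(&mut self, ctx: &TenantCtx) -> &mut Database {
        let name = self.db_name(ctx);
        self.databases
            .entry(name.clone())
            .or_insert_with(|| Database {
                name,
                findings: Vec::new(),
            })
    }
}
```

Running the same severity filter against two tenants' handles returns only that tenant's rows, which is the property the handler migration relies on.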
Changes
- New `dto::tenant_db(&agent, &tenant) -> Result<Database, StatusCode>`
helper. Every migrated handler calls it at the top of the body
instead of `let db = &agent.db;`. 500 on the rare pool failure;
4xx auth failures are already handled by the M7.1 status gate.
- New `api::server::inject_dev_tenant` middleware mounted only when
Keycloak is NOT configured. Synthesizes a TenantContext with
tenant_id = $DEV_TENANT_ID (default `dev`) so `cargo run` against
a bare Mongo + no KC still serves the API. Logged loudly as
"DO NOT use in any environment with real customer data".
- Test harness: TestServer mounts inject_dev_tenant so existing E2E
tests reach handlers; cleanup() now drops every <db_name>_*
per-tenant database, not just the legacy <db_name>.
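
A rough sketch of two of the behaviours listed above: the `$DEV_TENANT_ID` fallback and the test cleanup's database selection. The function names are hypothetical, the empty-value fallback is an assumption (the description only states that the default is `dev`), and the real middleware and `cleanup()` talk to axum and MongoDB rather than plain strings.

```rust
/// Resolve the development tenant id from an optional `DEV_TENANT_ID` value.
/// Assumption: unset, empty, or whitespace-only values fall back to `dev`.
pub fn resolve_dev_tenant_id(raw: Option<&str>) -> String {
    match raw.map(str::trim) {
        Some(value) if !value.is_empty() => value.to_string(),
        _ => "dev".to_string(),
    }
}

/// Convenience wrapper that reads the value from the process environment.
pub fn dev_tenant_id_from_env() -> String {
    resolve_dev_tenant_id(std::env::var("DEV_TENANT_ID").ok().as_deref())
}

/// Select the databases a test cleanup would drop: the legacy `<db_name>`
/// plus every per-tenant `<db_name>_*` database. Other names are left alone.
pub fn databases_to_drop<'a>(db_name: &str, existing: &[&'a str]) -> Vec<&'a str> {
    let prefix = format!("{db_name}_");
    existing
        .iter()
        .copied()
        .filter(|name| *name == db_name || name.starts_with(prefix.as_str()))
        .collect()
}
```

Matching on `<db_name>_` rather than on `<db_name>` alone keeps an unrelated database such as `test_dbx` out of the drop list when the base name is `test_db`.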
Files migrated (handler count, all pass `cargo build`):
- chat.rs (3) — also rewires RagPipeline + EmbeddingStore to the
tenant DB's inner() so vector search is per-tenant
- dast.rs (5)
- findings.rs (5)
- graph.rs (7) — also rewires GraphStore inside trigger_build's
spawn to the tenant DB
- health.rs (1) — stats_overview migrated; public /health stays
un-scoped
- issues.rs (1)
- notifications.rs (5)
- pentest_handlers/session.rs (12) — both wizard + legacy paths,
plus pause/resume/stop/get_attack_chain/get_messages/
get_session_findings/lookup_repo. PentestOrchestrator now gets
the tenant DB clone in its spawn.
- pentest_handlers/export.rs (1) — fans out across sessions,
attack_chain_nodes, dast_findings, findings, sbom_entries,
graph_nodes from a single tenant_db acquisition
- pentest_handlers/stats.rs (1)
- pentest_handlers/stream.rs (1) — SSE handler verifies session
via the tenant DB before subscribing
- repos.rs (6)
- sbom.rs (5)
- scans.rs (1)
help_chat.rs has no DB queries and was skipped.
Test plan
- cargo fmt --all clean
- cargo clippy --workspace --exclude compliance-dashboard
-- -D warnings clean
- cargo test -p compliance-core --lib — 7 pass
- cargo test -p compliance-agent --lib — 228 pass
- cargo test -p compliance-agent --test tenant_isolation — 5 pass
(driver-level isolation still holds post-handler migration)
- cargo test -p compliance-agent --test tenant_status_middleware
— 6 pass
What's not yet migrated (PR-C / PR-D)
- scheduler.rs (6 sites), pipeline/orchestrator.rs (14),
pentest/orchestrator.rs (13), webhooks (gitea/github/gitlab),
trackers/jira.rs, pipeline/dedup.rs etc. — background paths
without a JWT-derived tenant context.
- agent.db is still in the ComplianceAgent struct as a transitional
handle for those paths. PR-D removes it once PR-C migrates the
background paths.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
184 lines
5.4 KiB
Rust
use axum::extract::Extension;
use axum::http::StatusCode;
use axum::Json;
use mongodb::bson::doc;
use serde::Deserialize;

use compliance_core::models::notification::CveNotification;
use compliance_core::tenant_ctx::TenantCtx;

use super::dto::{tenant_db, AgentExt, ApiResponse};

/// GET /api/v1/notifications — List CVE notifications (newest first)
#[tracing::instrument(skip_all)]
pub async fn list_notifications(
    Extension(agent): AgentExt,
    tenant: TenantCtx,
    axum::extract::Query(params): axum::extract::Query<NotificationFilter>,
) -> Result<Json<ApiResponse<Vec<CveNotification>>>, StatusCode> {
    let db = tenant_db(&agent, &tenant).await?;
    let mut filter = doc! {};

    // Filter by status (default: show new + read, exclude dismissed)
    match params.status.as_deref() {
        Some("all") => {}
        Some(s) => {
            filter.insert("status", s);
        }
        None => {
            filter.insert("status", doc! { "$in": ["new", "read"] });
        }
    }

    // Filter by severity
    if let Some(ref sev) = params.severity {
        filter.insert("severity", sev.as_str());
    }

    // Filter by repo
    if let Some(ref repo_id) = params.repo_id {
        filter.insert("repo_id", repo_id.as_str());
    }

    let page = params.page.unwrap_or(1).max(1);
    // Clamp to 1..=200: MongoDB treats a limit of 0 as "no limit", and a
    // negative value would wrap to a huge number when cast to u64 below.
    let limit = params.limit.unwrap_or(50).clamp(1, 200);
    // Saturate instead of overflowing on very large page numbers.
    let skip = (page - 1).saturating_mul(limit as u64);

    let total = db
        .cve_notifications()
        .count_documents(filter.clone())
        .await
        .unwrap_or(0);

    let notifications: Vec<CveNotification> = match db
        .cve_notifications()
        .find(filter)
        .sort(doc! { "created_at": -1 })
        .skip(skip)
        .limit(limit)
        .await
    {
        Ok(cursor) => {
            use futures_util::StreamExt;
            let mut items = Vec::new();
            let mut cursor = cursor;
            // Collect until the cursor is exhausted or yields its first error.
            while let Some(Ok(n)) = cursor.next().await {
                items.push(n);
            }
            items
        }
        Err(e) => {
            tracing::error!("Failed to list notifications: {e}");
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };

    Ok(Json(ApiResponse {
        data: notifications,
        total: Some(total),
        page: Some(page),
    }))
}

/// GET /api/v1/notifications/count — Count of unread notifications
#[tracing::instrument(skip_all)]
pub async fn notification_count(
    Extension(agent): AgentExt,
    tenant: TenantCtx,
) -> Result<Json<serde_json::Value>, StatusCode> {
    let db = tenant_db(&agent, &tenant).await?;
    let count = db
        .cve_notifications()
        .count_documents(doc! { "status": "new" })
        .await
        .unwrap_or(0);

    Ok(Json(serde_json::json!({ "count": count })))
}

/// PATCH /api/v1/notifications/:id/read — Mark a notification as read
#[tracing::instrument(skip_all, fields(id = %id))]
pub async fn mark_read(
    Extension(agent): AgentExt,
    tenant: TenantCtx,
    axum::extract::Path(id): axum::extract::Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> {
    let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
    let db = tenant_db(&agent, &tenant).await?;

    let result = db
        .cve_notifications()
        .update_one(
            doc! { "_id": oid },
            doc! { "$set": {
                "status": "read",
                "read_at": mongodb::bson::DateTime::now(),
            }},
        )
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    if result.matched_count == 0 {
        return Err(StatusCode::NOT_FOUND);
    }
    Ok(Json(serde_json::json!({ "status": "read" })))
}

/// PATCH /api/v1/notifications/:id/dismiss — Dismiss a notification
#[tracing::instrument(skip_all, fields(id = %id))]
pub async fn dismiss_notification(
    Extension(agent): AgentExt,
    tenant: TenantCtx,
    axum::extract::Path(id): axum::extract::Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> {
    let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
    let db = tenant_db(&agent, &tenant).await?;

    let result = db
        .cve_notifications()
        .update_one(
            doc! { "_id": oid },
            doc! { "$set": { "status": "dismissed" } },
        )
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    if result.matched_count == 0 {
        return Err(StatusCode::NOT_FOUND);
    }
    Ok(Json(serde_json::json!({ "status": "dismissed" })))
}

/// POST /api/v1/notifications/read-all — Mark all new notifications as read
#[tracing::instrument(skip_all)]
pub async fn mark_all_read(
    Extension(agent): AgentExt,
    tenant: TenantCtx,
) -> Result<Json<serde_json::Value>, StatusCode> {
    let db = tenant_db(&agent, &tenant).await?;
    let result = db
        .cve_notifications()
        .update_many(
            doc! { "status": "new" },
            doc! { "$set": {
                "status": "read",
                "read_at": mongodb::bson::DateTime::now(),
            }},
        )
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    Ok(Json(
        serde_json::json!({ "updated": result.modified_count }),
    ))
}

#[derive(Debug, Deserialize)]
pub struct NotificationFilter {
    pub status: Option<String>,
    pub severity: Option<String>,
    pub repo_id: Option<String>,
    pub page: Option<u64>,
    pub limit: Option<i64>,
}