Files
compliance-scanner-agent/compliance-agent/src/api/handlers/repos.rs
Sharang Parnerkar bae24f9cf8
All checks were successful
CI / Check (push) Has been skipped
CI / Detect Changes (push) Successful in 4s
CI / Deploy Agent (push) Successful in 9s
CI / Deploy Dashboard (push) Has been skipped
CI / Deploy Docs (push) Has been skipped
CI / Deploy MCP (push) Has been skipped
fix: cascade-delete DAST targets, pentests, and downstream data on repo delete (#50)
2026-03-30 07:11:23 +00:00

329 lines
11 KiB
Rust

use axum::extract::{Extension, Path, Query};
use axum::http::StatusCode;
use axum::Json;
use mongodb::bson::doc;
use super::dto::*;
use compliance_core::models::*;
/// Lists tracked repositories, one page at a time.
///
/// `params.page` is treated as 1-based (page 0 and page 1 both start at offset 0) and
/// `params.limit` is the page size. The response carries the page of repositories, the
/// total number of repository documents, and the requested page number.
///
/// Database failures are not surfaced as HTTP errors: a failed count is reported as `0`
/// and a failed query as an empty list. Both cases are logged at `warn` level.
#[tracing::instrument(skip_all)]
pub async fn list_repositories(
    Extension(agent): AgentExt,
    Query(params): Query<PaginationParams>,
) -> ApiResult<Vec<TrackedRepository>> {
    let db = &agent.db;

    // `page` and `limit` come straight from the query string. A plain `*` overflows for
    // large values (panic in debug builds, silent wrap-around in release), and a negative
    // limit would turn into an enormous unsigned page size when cast. Clamp and saturate.
    let page_size = params.limit.max(0) as u64;
    let skip = params.page.saturating_sub(1).saturating_mul(page_size);

    let total = match db.repositories().count_documents(doc! {}).await {
        Ok(count) => count,
        Err(e) => {
            tracing::warn!("Failed to count repositories: {e}");
            0
        }
    };

    let repos = match db
        .repositories()
        .find(doc! {})
        .skip(skip)
        .limit(params.limit)
        .await
    {
        Ok(cursor) => collect_cursor_async(cursor).await,
        Err(e) => {
            tracing::warn!("Failed to fetch repositories: {e}");
            Vec::new()
        }
    };

    Ok(Json(ApiResponse {
        data: repos,
        total: Some(total),
        page: Some(params.page),
    }))
}
#[tracing::instrument(skip_all)]
pub async fn add_repository(
Extension(agent): AgentExt,
Json(req): Json<AddRepositoryRequest>,
) -> Result<Json<ApiResponse<TrackedRepository>>, (StatusCode, String)> {
// Validate repository access before saving
let creds = crate::pipeline::git::RepoCredentials {
ssh_key_path: Some(agent.config.ssh_key_path.clone()),
auth_token: req.auth_token.clone(),
auth_username: req.auth_username.clone(),
};
if let Err(e) = crate::pipeline::git::GitOps::test_access(&req.git_url, &creds) {
return Err((
StatusCode::BAD_REQUEST,
format!("Cannot access repository: {e}"),
));
}
let mut repo = TrackedRepository::new(req.name, req.git_url);
repo.default_branch = req.default_branch;
repo.auth_token = req.auth_token;
repo.auth_username = req.auth_username;
repo.tracker_type = req.tracker_type;
repo.tracker_owner = req.tracker_owner;
repo.tracker_repo = req.tracker_repo;
repo.tracker_token = req.tracker_token;
repo.scan_schedule = req.scan_schedule;
agent
.db
.repositories()
.insert_one(&repo)
.await
.map_err(|_| {
(
StatusCode::CONFLICT,
"Repository already exists".to_string(),
)
})?;
Ok(Json(ApiResponse {
data: repo,
total: None,
page: None,
}))
}
/// Applies a partial update to the repository identified by `id`.
///
/// Only the fields present in the request body are written; `updated_at` is always
/// refreshed, even when no other field is supplied.
///
/// # Errors
///
/// * `400 Bad Request` — `id` is not a valid ObjectId string.
/// * `404 Not Found` — no repository document matched `id`.
/// * `500 Internal Server Error` — the update command failed (logged at `warn`).
#[tracing::instrument(skip_all, fields(repo_id = %id))]
pub async fn update_repository(
    Extension(agent): AgentExt,
    Path(id): Path<String>,
    Json(req): Json<UpdateRepositoryRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
    let Ok(oid) = mongodb::bson::oid::ObjectId::parse_str(&id) else {
        return Err(StatusCode::BAD_REQUEST);
    };

    // Collect the `$set` payload field by field; absent fields are left untouched.
    let mut changes = doc! { "updated_at": mongodb::bson::DateTime::now() };
    if let Some(value) = req.name.as_ref() {
        changes.insert("name", value);
    }
    if let Some(value) = req.default_branch.as_ref() {
        changes.insert("default_branch", value);
    }
    if let Some(value) = req.auth_token.as_ref() {
        changes.insert("auth_token", value);
    }
    if let Some(value) = req.auth_username.as_ref() {
        changes.insert("auth_username", value);
    }
    if let Some(value) = req.tracker_type.as_ref() {
        // Stored in its string form rather than as a nested document.
        changes.insert("tracker_type", value.to_string());
    }
    if let Some(value) = req.tracker_owner.as_ref() {
        changes.insert("tracker_owner", value);
    }
    if let Some(value) = req.tracker_repo.as_ref() {
        changes.insert("tracker_repo", value);
    }
    if let Some(value) = req.tracker_token.as_ref() {
        changes.insert("tracker_token", value);
    }
    if let Some(value) = req.scan_schedule.as_ref() {
        changes.insert("scan_schedule", value);
    }

    let outcome = agent
        .db
        .repositories()
        .update_one(doc! { "_id": oid }, doc! { "$set": changes })
        .await;

    let matched = match outcome {
        Ok(update) => update.matched_count,
        Err(e) => {
            tracing::warn!("Failed to update repository: {e}");
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };

    if matched == 0 {
        Err(StatusCode::NOT_FOUND)
    } else {
        Ok(Json(serde_json::json!({ "status": "updated" })))
    }
}
/// Returns the agent's SSH public key as `{ "public_key": ... }`.
///
/// The key is read from the configured SSH key path with `.pub` appended, and
/// surrounding whitespace is trimmed. Any read failure is reported as `404 Not Found`.
#[tracing::instrument(skip_all)]
pub async fn get_ssh_public_key(
    Extension(agent): AgentExt,
) -> Result<Json<serde_json::Value>, StatusCode> {
    let pub_key_file = format!("{}.pub", agent.config.ssh_key_path);
    match std::fs::read_to_string(&pub_key_file) {
        Ok(contents) => Ok(Json(
            serde_json::json!({ "public_key": contents.trim() }),
        )),
        Err(_) => Err(StatusCode::NOT_FOUND),
    }
}
/// Starts a manual scan of repository `id` in the background.
///
/// The scan runs on a detached Tokio task using a clone of the agent, so this handler
/// responds with `{ "status": "scan_triggered" }` immediately. A scan failure is only
/// logged at `error` level; it is never reported back to the caller.
///
/// NOTE(review): `id` is passed to `run_scan` without being validated here — confirm
/// that `run_scan` rejects malformed or unknown ids.
#[tracing::instrument(skip_all, fields(repo_id = %id))]
pub async fn trigger_scan(
    Extension(agent): AgentExt,
    Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> {
    let worker = (*agent).clone();

    let scan_task = async move {
        let outcome = worker.run_scan(&id, ScanTrigger::Manual).await;
        if let Err(e) = outcome {
            tracing::error!("Manual scan failed for {id}: {e}");
        }
    };
    // Fire and forget: the join handle is intentionally dropped.
    tokio::spawn(scan_task);

    let body = serde_json::json!({ "status": "scan_triggered" });
    Ok(Json(body))
}
/// Return the webhook secret for a repository (used by dashboard to display it)
pub async fn get_webhook_config(
Extension(agent): AgentExt,
Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let repo = agent
.db
.repositories()
.find_one(doc! { "_id": oid })
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.ok_or(StatusCode::NOT_FOUND)?;
let tracker_type = repo
.tracker_type
.as_ref()
.map(|t| t.to_string())
.unwrap_or_else(|| "gitea".to_string());
Ok(Json(serde_json::json!({
"webhook_secret": repo.webhook_secret,
"tracker_type": tracker_type,
})))
}
#[tracing::instrument(skip_all, fields(repo_id = %id))]
pub async fn delete_repository(
Extension(agent): AgentExt,
Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
let db = &agent.db;
// Delete the repository
let result = db
.repositories()
.delete_one(doc! { "_id": oid })
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
if result.deleted_count == 0 {
return Err(StatusCode::NOT_FOUND);
}
// Cascade delete all related data
let _ = db.findings().delete_many(doc! { "repo_id": &id }).await;
let _ = db.sbom_entries().delete_many(doc! { "repo_id": &id }).await;
let _ = db.scan_runs().delete_many(doc! { "repo_id": &id }).await;
let _ = db.cve_alerts().delete_many(doc! { "repo_id": &id }).await;
let _ = db
.tracker_issues()
.delete_many(doc! { "repo_id": &id })
.await;
let _ = db.graph_nodes().delete_many(doc! { "repo_id": &id }).await;
let _ = db.graph_edges().delete_many(doc! { "repo_id": &id }).await;
let _ = db.graph_builds().delete_many(doc! { "repo_id": &id }).await;
let _ = db
.impact_analyses()
.delete_many(doc! { "repo_id": &id })
.await;
let _ = db
.code_embeddings()
.delete_many(doc! { "repo_id": &id })
.await;
let _ = db
.embedding_builds()
.delete_many(doc! { "repo_id": &id })
.await;
// Cascade delete DAST targets linked to this repo, and all their downstream data
// (scan runs, findings, pentest sessions, attack chains, messages)
if let Ok(mut cursor) = db.dast_targets().find(doc! { "repo_id": &id }).await {
use futures_util::StreamExt;
while let Some(Ok(target)) = cursor.next().await {
let target_id = target.id.map(|oid| oid.to_hex()).unwrap_or_default();
if !target_id.is_empty() {
cascade_delete_dast_target(db, &target_id).await;
}
}
}
// Also delete pentest sessions linked directly to this repo (not via target)
if let Ok(mut cursor) = db.pentest_sessions().find(doc! { "repo_id": &id }).await {
use futures_util::StreamExt;
while let Some(Ok(session)) = cursor.next().await {
let session_id = session.id.map(|oid| oid.to_hex()).unwrap_or_default();
if !session_id.is_empty() {
let _ = db
.attack_chain_nodes()
.delete_many(doc! { "session_id": &session_id })
.await;
let _ = db
.pentest_messages()
.delete_many(doc! { "session_id": &session_id })
.await;
// Delete DAST findings produced by this session
let _ = db
.dast_findings()
.delete_many(doc! { "session_id": &session_id })
.await;
}
}
}
let _ = db
.pentest_sessions()
.delete_many(doc! { "repo_id": &id })
.await;
Ok(Json(serde_json::json!({ "status": "deleted" })))
}
/// Cascade-delete a DAST target and all its downstream data.
async fn cascade_delete_dast_target(db: &crate::database::Database, target_id: &str) {
// Delete pentest sessions for this target (and their attack chains + messages)
if let Ok(mut cursor) = db
.pentest_sessions()
.find(doc! { "target_id": target_id })
.await
{
use futures_util::StreamExt;
while let Some(Ok(session)) = cursor.next().await {
let session_id = session.id.map(|oid| oid.to_hex()).unwrap_or_default();
if !session_id.is_empty() {
let _ = db
.attack_chain_nodes()
.delete_many(doc! { "session_id": &session_id })
.await;
let _ = db
.pentest_messages()
.delete_many(doc! { "session_id": &session_id })
.await;
let _ = db
.dast_findings()
.delete_many(doc! { "session_id": &session_id })
.await;
}
}
}
let _ = db
.pentest_sessions()
.delete_many(doc! { "target_id": target_id })
.await;
// Delete DAST scan runs and their findings
let _ = db
.dast_findings()
.delete_many(doc! { "target_id": target_id })
.await;
let _ = db
.dast_scan_runs()
.delete_many(doc! { "target_id": target_id })
.await;
// Delete the target itself
if let Ok(oid) = mongodb::bson::oid::ObjectId::parse_str(target_id) {
let _ = db.dast_targets().delete_one(doc! { "_id": oid }).await;
}
}