Initial commit: Compliance Scanner Agent

Autonomous security and compliance scanning agent for git repositories.
Features: SAST (Semgrep), SBOM (Syft), CVE monitoring (OSV.dev/NVD),
GDPR/OAuth pattern detection, LLM triage, issue creation (GitHub/GitLab/Jira),
PR reviews, and Dioxus fullstack dashboard.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Sharang Parnerkar
2026-03-02 13:30:17 +01:00
commit 0867e401bc
97 changed files with 11750 additions and 0 deletions

View File

@@ -0,0 +1,35 @@
# Manifest for the compliance-agent service binary (scanners, LLM triage,
# REST API, webhook server, scheduler).
[package]
name = "compliance-agent"
version = "0.1.0"
edition = "2021"
# Inherit lint configuration from the workspace root.
[lints]
workspace = true
[dependencies]
# Versions managed centrally by the workspace.
compliance-core = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
chrono = { workspace = true }
mongodb = { workspace = true }
reqwest = { workspace = true }
thiserror = { workspace = true }
sha2 = { workspace = true }
hex = { workspace = true }
uuid = { workspace = true }
secrecy = { workspace = true }
regex = { workspace = true }
# Crate-local dependencies pinned here rather than in the workspace.
# axum + tower-http serve the REST API and webhook endpoints.
axum = "0.8"
tower-http = { version = "0.6", features = ["cors", "trace"] }
# git2: repo cloning; octocrab: GitHub API; tokio-cron-scheduler: periodic scans.
git2 = "0.20"
octocrab = "0.44"
tokio-cron-scheduler = "0.13"
dotenvy = "0.15"
hmac = "0.12"
walkdir = "2"
base64 = "0.22"
urlencoding = "2"
futures-util = "0.3"

View File

@@ -0,0 +1,45 @@
use std::sync::Arc;
use compliance_core::AgentConfig;
use crate::database::Database;
use crate::llm::LlmClient;
use crate::pipeline::orchestrator::PipelineOrchestrator;
/// Top-level handle bundling everything a scan or API handler needs:
/// configuration, database handle, shared LLM client, and a shared HTTP
/// client. `Clone` is cheap enough to hand a copy to each spawned task
/// (the LLM client is behind an `Arc`; the others are clonable handles).
#[derive(Clone)]
pub struct ComplianceAgent {
pub config: AgentConfig,
pub db: Database,
pub llm: Arc<LlmClient>,
pub http: reqwest::Client,
}
impl ComplianceAgent {
/// Build an agent from loaded configuration and an already-connected
/// database; constructs the LLM client from the LiteLLM settings.
pub fn new(config: AgentConfig, db: Database) -> Self {
let llm = Arc::new(LlmClient::new(
config.litellm_url.clone(),
config.litellm_api_key.clone(),
config.litellm_model.clone(),
));
Self {
config,
db,
llm,
http: reqwest::Client::new(),
}
}
/// Run the full scan pipeline for one repository.
///
/// A fresh `PipelineOrchestrator` is built per invocation from clones of
/// the agent's shared members, then delegated to.
pub async fn run_scan(
&self,
repo_id: &str,
trigger: compliance_core::models::ScanTrigger,
) -> Result<(), crate::error::AgentError> {
let orchestrator = PipelineOrchestrator::new(
self.config.clone(),
self.db.clone(),
self.llm.clone(),
self.http.clone(),
);
orchestrator.run(repo_id, trigger).await
}
}

View File

@@ -0,0 +1,334 @@
use std::sync::Arc;
#[allow(unused_imports)]
use axum::extract::{Extension, Path, Query};
use axum::http::StatusCode;
use axum::Json;
use mongodb::bson::doc;
use serde::{Deserialize, Serialize};
use compliance_core::models::*;
use crate::agent::ComplianceAgent;
/// Common `?page=&limit=` query parameters for list endpoints.
#[derive(Deserialize)]
pub struct PaginationParams {
// 1-based page index; defaults to 1 when absent.
#[serde(default = "default_page")]
pub page: u64,
// Max documents per page; defaults to 50. i64 because the MongoDB
// driver's `limit()` takes i64.
#[serde(default = "default_limit")]
pub limit: i64,
}
// Serde `default = "..."` requires named functions, not literals.
fn default_page() -> u64 { 1 }
fn default_limit() -> i64 { 50 }
/// Query parameters for GET /api/v1/findings: optional equality filters
/// plus the standard pagination fields.
#[derive(Deserialize)]
pub struct FindingsFilter {
// Each filter, when present, becomes an exact-match condition on the
// corresponding document field (see `list_findings`).
#[serde(default)]
pub repo_id: Option<String>,
#[serde(default)]
pub severity: Option<String>,
#[serde(default)]
pub scan_type: Option<String>,
#[serde(default)]
pub status: Option<String>,
#[serde(default = "default_page")]
pub page: u64,
#[serde(default = "default_limit")]
pub limit: i64,
}
/// Uniform JSON envelope for API responses; `total`/`page` are only
/// serialized for paginated list endpoints.
#[derive(Serialize)]
pub struct ApiResponse<T: Serialize> {
pub data: T,
#[serde(skip_serializing_if = "Option::is_none")]
pub total: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub page: Option<u64>,
}
/// Aggregate counters for the dashboard overview endpoint.
#[derive(Serialize)]
pub struct OverviewStats {
pub total_repositories: u64,
pub total_findings: u64,
// Per-severity finding counts (matched against the stored severity string).
pub critical_findings: u64,
pub high_findings: u64,
pub medium_findings: u64,
pub low_findings: u64,
pub total_sbom_entries: u64,
pub total_cve_alerts: u64,
pub total_issues: u64,
// Most recent scan runs, newest first (10 max; see `stats_overview`).
pub recent_scans: Vec<ScanRun>,
}
/// Request body for POST /api/v1/repositories. The tracker_* and
/// scan_schedule fields are optional and copied verbatim onto the new
/// `TrackedRepository`.
#[derive(Deserialize)]
pub struct AddRepositoryRequest {
pub name: String,
pub git_url: String,
// Branch to scan; defaults to "main" when omitted.
#[serde(default = "default_branch")]
pub default_branch: String,
pub tracker_type: Option<TrackerType>,
pub tracker_owner: Option<String>,
pub tracker_repo: Option<String>,
pub scan_schedule: Option<String>,
}
fn default_branch() -> String { "main".to_string() }
/// Request body for PATCH /api/v1/findings/{id}/status: the new status
/// string to store (not validated against an enum here).
#[derive(Deserialize)]
pub struct UpdateStatusRequest {
pub status: String,
}
// Shorthands used by the handlers below.
type AgentExt = Extension<Arc<ComplianceAgent>>;
type ApiResult<T> = Result<Json<ApiResponse<T>>, StatusCode>;
/// GET /api/v1/health — liveness probe; always returns `{"status":"ok"}`.
pub async fn health() -> Json<serde_json::Value> {
    let payload = serde_json::json!({ "status": "ok" });
    Json(payload)
}
/// GET /api/v1/stats/overview — dashboard aggregate counters plus the ten
/// most recent scan runs.
///
/// Every count degrades to 0 (and the scan list to empty) on database
/// errors, so a partially unavailable DB still renders a dashboard instead
/// of returning a 5xx.
pub async fn stats_overview(Extension(agent): AgentExt) -> ApiResult<OverviewStats> {
let db = &agent.db;
// Independent counters, issued sequentially; each error collapses to 0.
let total_repositories = db.repositories().count_documents(doc! {}).await.unwrap_or(0);
let total_findings = db.findings().count_documents(doc! {}).await.unwrap_or(0);
let critical_findings = db.findings().count_documents(doc! { "severity": "critical" }).await.unwrap_or(0);
let high_findings = db.findings().count_documents(doc! { "severity": "high" }).await.unwrap_or(0);
let medium_findings = db.findings().count_documents(doc! { "severity": "medium" }).await.unwrap_or(0);
let low_findings = db.findings().count_documents(doc! { "severity": "low" }).await.unwrap_or(0);
let total_sbom_entries = db.sbom_entries().count_documents(doc! {}).await.unwrap_or(0);
let total_cve_alerts = db.cve_alerts().count_documents(doc! {}).await.unwrap_or(0);
let total_issues = db.tracker_issues().count_documents(doc! {}).await.unwrap_or(0);
// Last 10 scan runs, newest first.
let recent_scans: Vec<ScanRun> = match db
.scan_runs()
.find(doc! {})
.sort(doc! { "started_at": -1 })
.limit(10)
.await
{
Ok(cursor) => collect_cursor_async(cursor).await,
Err(_) => Vec::new(),
};
Ok(Json(ApiResponse {
data: OverviewStats {
total_repositories,
total_findings,
critical_findings,
high_findings,
medium_findings,
low_findings,
total_sbom_entries,
total_cve_alerts,
total_issues,
recent_scans,
},
total: None,
page: None,
}))
}
/// GET /api/v1/repositories — paginated list of tracked repositories.
/// Database errors degrade to an empty page rather than a 5xx.
pub async fn list_repositories(
    Extension(agent): AgentExt,
    Query(params): Query<PaginationParams>,
) -> ApiResult<Vec<TrackedRepository>> {
    let collection = agent.db.repositories();
    // Translate 1-based page into a document offset; page 0 clamps to 0.
    let offset = (params.page.saturating_sub(1)) * params.limit as u64;
    let total = collection.count_documents(doc! {}).await.unwrap_or(0);
    let data = if let Ok(cursor) = collection.find(doc! {}).skip(offset).limit(params.limit).await {
        collect_cursor_async(cursor).await
    } else {
        Vec::new()
    };
    Ok(Json(ApiResponse {
        total: Some(total),
        page: Some(params.page),
        data,
    }))
}
pub async fn add_repository(
Extension(agent): AgentExt,
Json(req): Json<AddRepositoryRequest>,
) -> Result<Json<ApiResponse<TrackedRepository>>, StatusCode> {
let mut repo = TrackedRepository::new(req.name, req.git_url);
repo.default_branch = req.default_branch;
repo.tracker_type = req.tracker_type;
repo.tracker_owner = req.tracker_owner;
repo.tracker_repo = req.tracker_repo;
repo.scan_schedule = req.scan_schedule;
agent
.db
.repositories()
.insert_one(&repo)
.await
.map_err(|_| StatusCode::CONFLICT)?;
Ok(Json(ApiResponse {
data: repo,
total: None,
page: None,
}))
}
/// POST /api/v1/repositories/{id}/scan — fire-and-forget: spawns the scan
/// on a background task and returns immediately.
///
/// NOTE(review): `id` is not validated against tracked repositories here;
/// an unknown id still returns "scan_triggered" and the spawned task only
/// logs the failure later.
pub async fn trigger_scan(
Extension(agent): AgentExt,
Path(id): Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> {
// Clone the agent out of the Arc so the task owns its own copy.
let agent_clone = (*agent).clone();
tokio::spawn(async move {
if let Err(e) = agent_clone.run_scan(&id, ScanTrigger::Manual).await {
tracing::error!("Manual scan failed for {id}: {e}");
}
});
Ok(Json(serde_json::json!({ "status": "scan_triggered" })))
}
/// GET /api/v1/findings — paginated findings list with optional
/// exact-match filters on repo_id/severity/scan_type/status.
/// `total` reflects the filtered count, not the page size.
pub async fn list_findings(
Extension(agent): AgentExt,
Query(filter): Query<FindingsFilter>,
) -> ApiResult<Vec<Finding>> {
let db = &agent.db;
// Build the filter document from whichever query params were provided.
let mut query = doc! {};
if let Some(repo_id) = &filter.repo_id {
query.insert("repo_id", repo_id);
}
if let Some(severity) = &filter.severity {
query.insert("severity", severity);
}
if let Some(scan_type) = &filter.scan_type {
query.insert("scan_type", scan_type);
}
if let Some(status) = &filter.status {
query.insert("status", status);
}
let skip = (filter.page.saturating_sub(1)) * filter.limit as u64;
let total = db.findings().count_documents(query.clone()).await.unwrap_or(0);
// Newest findings first; query errors degrade to an empty page.
let findings = match db.findings().find(query).sort(doc! { "created_at": -1 }).skip(skip).limit(filter.limit).await {
Ok(cursor) => collect_cursor_async(cursor).await,
Err(_) => Vec::new(),
};
Ok(Json(ApiResponse {
data: findings,
total: Some(total),
page: Some(filter.page),
}))
}
/// GET /api/v1/findings/{id} — fetch a single finding by its ObjectId.
/// 400 for a malformed id, 404 when absent, 500 on database failure.
pub async fn get_finding(
    Extension(agent): AgentExt,
    Path(id): Path<String>,
) -> Result<Json<ApiResponse<Finding>>, StatusCode> {
    let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
    let lookup = agent.db.findings().find_one(doc! { "_id": oid }).await;
    match lookup {
        Ok(Some(finding)) => Ok(Json(ApiResponse {
            data: finding,
            total: None,
            page: None,
        })),
        Ok(None) => Err(StatusCode::NOT_FOUND),
        Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
    }
}
/// PATCH /api/v1/findings/{id}/status — set a finding's status string and
/// bump its `updated_at` timestamp.
///
/// NOTE(review): the status value is stored as-is, without validating it
/// against the finding-status enum; an update matching zero documents still
/// returns "updated".
pub async fn update_finding_status(
Extension(agent): AgentExt,
Path(id): Path<String>,
Json(req): Json<UpdateStatusRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let oid = mongodb::bson::oid::ObjectId::parse_str(&id).map_err(|_| StatusCode::BAD_REQUEST)?;
agent
.db
.findings()
.update_one(
doc! { "_id": oid },
doc! { "$set": { "status": &req.status, "updated_at": mongodb::bson::DateTime::now() } },
)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(serde_json::json!({ "status": "updated" })))
}
/// GET /api/v1/sbom — paginated SBOM entries across all repositories.
/// Database errors degrade to an empty page rather than a 5xx.
pub async fn list_sbom(
    Extension(agent): AgentExt,
    Query(params): Query<PaginationParams>,
) -> ApiResult<Vec<SbomEntry>> {
    let collection = agent.db.sbom_entries();
    let offset = (params.page.saturating_sub(1)) * params.limit as u64;
    let total = collection.count_documents(doc! {}).await.unwrap_or(0);
    let data = collection
        .find(doc! {})
        .skip(offset)
        .limit(params.limit)
        .await
        .map(|cursor| cursor)
        .ok();
    let data = match data {
        Some(cursor) => collect_cursor_async(cursor).await,
        None => Vec::new(),
    };
    Ok(Json(ApiResponse {
        total: Some(total),
        page: Some(params.page),
        data,
    }))
}
/// GET /api/v1/issues — paginated tracker issues, newest first.
/// Database errors degrade to an empty page rather than a 5xx.
pub async fn list_issues(
    Extension(agent): AgentExt,
    Query(params): Query<PaginationParams>,
) -> ApiResult<Vec<TrackerIssue>> {
    let collection = agent.db.tracker_issues();
    let offset = (params.page.saturating_sub(1)) * params.limit as u64;
    let total = collection.count_documents(doc! {}).await.unwrap_or(0);
    let find = collection
        .find(doc! {})
        .sort(doc! { "created_at": -1 })
        .skip(offset)
        .limit(params.limit)
        .await;
    let data = if let Ok(cursor) = find {
        collect_cursor_async(cursor).await
    } else {
        Vec::new()
    };
    Ok(Json(ApiResponse {
        total: Some(total),
        page: Some(params.page),
        data,
    }))
}
/// GET /api/v1/scan-runs — paginated scan history, most recent first.
/// Database errors degrade to an empty page rather than a 5xx.
pub async fn list_scan_runs(
    Extension(agent): AgentExt,
    Query(params): Query<PaginationParams>,
) -> ApiResult<Vec<ScanRun>> {
    let collection = agent.db.scan_runs();
    let offset = (params.page.saturating_sub(1)) * params.limit as u64;
    let total = collection.count_documents(doc! {}).await.unwrap_or(0);
    let find = collection
        .find(doc! {})
        .sort(doc! { "started_at": -1 })
        .skip(offset)
        .limit(params.limit)
        .await;
    let data = if let Ok(cursor) = find {
        collect_cursor_async(cursor).await
    } else {
        Vec::new()
    };
    Ok(Json(ApiResponse {
        total: Some(total),
        page: Some(params.page),
        data,
    }))
}
async fn collect_cursor_async<T: serde::de::DeserializeOwned + Unpin + Send>(
mut cursor: mongodb::Cursor<T>,
) -> Vec<T> {
use futures_util::StreamExt;
let mut items = Vec::new();
while let Some(Ok(item)) = cursor.next().await {
items.push(item);
}
items
}

View File

@@ -0,0 +1,5 @@
// REST API layer: request handlers, route table, and server bootstrap.
pub mod handlers;
pub mod routes;
pub mod server;
// Re-exported so callers can write `api::start_api_server(...)`.
pub use server::start_api_server;

View File

@@ -0,0 +1,19 @@
use axum::routing::{get, patch, post};
use axum::Router;
use crate::api::handlers;
/// Assemble the versioned REST route table. The shared `ComplianceAgent`
/// reaches handlers through an `Extension` layer added by the server.
pub fn build_router() -> Router {
    Router::new()
        .route("/api/v1/health", get(handlers::health))
        .route("/api/v1/stats/overview", get(handlers::stats_overview))
        // Same path, two verbs: list via GET, create via POST.
        .route(
            "/api/v1/repositories",
            get(handlers::list_repositories).post(handlers::add_repository),
        )
        .route("/api/v1/repositories/{id}/scan", post(handlers::trigger_scan))
        .route("/api/v1/findings", get(handlers::list_findings))
        .route("/api/v1/findings/{id}", get(handlers::get_finding))
        .route(
            "/api/v1/findings/{id}/status",
            patch(handlers::update_finding_status),
        )
        .route("/api/v1/sbom", get(handlers::list_sbom))
        .route("/api/v1/issues", get(handlers::list_issues))
        .route("/api/v1/scan-runs", get(handlers::list_scan_runs))
}

View File

@@ -0,0 +1,28 @@
use std::sync::Arc;
use axum::Extension;
use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;
use crate::agent::ComplianceAgent;
use crate::api::routes;
use crate::error::AgentError;
/// Bind and run the REST API server on `0.0.0.0:{port}`; blocks until the
/// server exits.
///
/// Layer order matters to axum middleware; CORS here is fully permissive
/// (any origin) — acceptable for an internal dashboard API, revisit before
/// exposing publicly.
pub async fn start_api_server(agent: ComplianceAgent, port: u16) -> Result<(), AgentError> {
let app = routes::build_router()
.layer(Extension(Arc::new(agent)))
.layer(CorsLayer::permissive())
.layer(TraceLayer::new_for_http());
let addr = format!("0.0.0.0:{port}");
let listener = tokio::net::TcpListener::bind(&addr)
.await
.map_err(|e| AgentError::Other(format!("Failed to bind to {addr}: {e}")))?;
tracing::info!("REST API listening on {addr}");
axum::serve(listener, app)
.await
.map_err(|e| AgentError::Other(format!("API server error: {e}")))?;
Ok(())
}

View File

@@ -0,0 +1,43 @@
use compliance_core::AgentConfig;
use secrecy::SecretString;
use crate::error::AgentError;
/// Read a required environment variable, mapping absence (or non-UTF-8)
/// to a configuration error naming the missing key.
fn env_var(key: &str) -> Result<String, AgentError> {
std::env::var(key).map_err(|_| AgentError::Config(format!("Missing env var: {key}")))
}
/// Read an optional environment variable; treats an unset OR empty value
/// as `None` so `FOO=` in an env file behaves like "not configured".
fn env_var_opt(key: &str) -> Option<String> {
    match std::env::var(key) {
        Ok(value) if !value.is_empty() => Some(value),
        _ => None,
    }
}
/// Read an optional env var and wrap it in `SecretString` so the value is
/// redacted from Debug output and not accidentally logged.
fn env_secret_opt(key: &str) -> Option<SecretString> {
env_var_opt(key).map(SecretString::from)
}
/// Build the agent configuration from environment variables.
///
/// Only MONGODB_URI is mandatory; everything else is optional or has a
/// default. Schedules use a 6-field cron expression (seconds first —
/// presumably the tokio-cron-scheduler format; confirm against the
/// scheduler module).
pub fn load_config() -> Result<AgentConfig, AgentError> {
Ok(AgentConfig {
mongodb_uri: env_var("MONGODB_URI")?,
mongodb_database: env_var_opt("MONGODB_DATABASE").unwrap_or_else(|| "compliance_scanner".to_string()),
litellm_url: env_var_opt("LITELLM_URL").unwrap_or_else(|| "http://localhost:4000".to_string()),
// Empty string means "no auth header" (see LlmClient).
litellm_api_key: SecretString::from(env_var_opt("LITELLM_API_KEY").unwrap_or_default()),
litellm_model: env_var_opt("LITELLM_MODEL").unwrap_or_else(|| "gpt-4o".to_string()),
// Tracker integrations: all optional; features degrade when unset.
github_token: env_secret_opt("GITHUB_TOKEN"),
github_webhook_secret: env_secret_opt("GITHUB_WEBHOOK_SECRET"),
gitlab_url: env_var_opt("GITLAB_URL"),
gitlab_token: env_secret_opt("GITLAB_TOKEN"),
gitlab_webhook_secret: env_secret_opt("GITLAB_WEBHOOK_SECRET"),
jira_url: env_var_opt("JIRA_URL"),
jira_email: env_var_opt("JIRA_EMAIL"),
jira_api_token: env_secret_opt("JIRA_API_TOKEN"),
jira_project_key: env_var_opt("JIRA_PROJECT_KEY"),
searxng_url: env_var_opt("SEARXNG_URL"),
nvd_api_key: env_secret_opt("NVD_API_KEY"),
// Unparseable AGENT_PORT values silently fall back to 3001.
agent_port: env_var_opt("AGENT_PORT")
.and_then(|p| p.parse().ok())
.unwrap_or(3001),
// Defaults: scan every 6 hours, CVE monitor daily at midnight.
scan_schedule: env_var_opt("SCAN_SCHEDULE").unwrap_or_else(|| "0 0 */6 * * *".to_string()),
cve_monitor_schedule: env_var_opt("CVE_MONITOR_SCHEDULE").unwrap_or_else(|| "0 0 0 * * *".to_string()),
git_clone_base_path: env_var_opt("GIT_CLONE_BASE_PATH").unwrap_or_else(|| "/tmp/compliance-scanner/repos".to_string()),
})
}

View File

@@ -0,0 +1,122 @@
use mongodb::bson::doc;
use mongodb::{Client, Collection, IndexModel};
use mongodb::options::IndexOptions;
use compliance_core::models::*;
use crate::error::AgentError;
/// Thin typed wrapper around a `mongodb::Database` handle exposing one
/// accessor per collection. Cloning clones the underlying driver handle.
#[derive(Clone, Debug)]
pub struct Database {
inner: mongodb::Database,
}
impl Database {
/// Connect to MongoDB and verify reachability with a `ping` before
/// returning, so startup fails fast on a bad URI.
pub async fn connect(uri: &str, db_name: &str) -> Result<Self, AgentError> {
let client = Client::with_uri_str(uri).await?;
let db = client.database(db_name);
db.run_command(doc! { "ping": 1 }).await?;
tracing::info!("Connected to MongoDB database '{db_name}'");
Ok(Self { inner: db })
}
/// Create the indexes the application relies on. Idempotent: MongoDB
/// treats re-creating an identical index as a no-op. The unique indexes
/// (git_url, fingerprint, cve_id+repo_id, finding_id) are what turns
/// duplicate inserts into errors elsewhere in the code.
pub async fn ensure_indexes(&self) -> Result<(), AgentError> {
// repositories: unique git_url
self.repositories()
.create_index(
IndexModel::builder()
.keys(doc! { "git_url": 1 })
.options(IndexOptions::builder().unique(true).build())
.build(),
)
.await?;
// findings: unique fingerprint
self.findings()
.create_index(
IndexModel::builder()
.keys(doc! { "fingerprint": 1 })
.options(IndexOptions::builder().unique(true).build())
.build(),
)
.await?;
// findings: repo_id + severity compound
self.findings()
.create_index(
IndexModel::builder()
.keys(doc! { "repo_id": 1, "severity": 1 })
.build(),
)
.await?;
// scan_runs: repo_id + started_at descending
self.scan_runs()
.create_index(
IndexModel::builder()
.keys(doc! { "repo_id": 1, "started_at": -1 })
.build(),
)
.await?;
// sbom_entries: compound
self.sbom_entries()
.create_index(
IndexModel::builder()
.keys(doc! { "repo_id": 1, "name": 1, "version": 1 })
.build(),
)
.await?;
// cve_alerts: unique cve_id + repo_id
self.cve_alerts()
.create_index(
IndexModel::builder()
.keys(doc! { "cve_id": 1, "repo_id": 1 })
.options(IndexOptions::builder().unique(true).build())
.build(),
)
.await?;
// tracker_issues: unique finding_id
self.tracker_issues()
.create_index(
IndexModel::builder()
.keys(doc! { "finding_id": 1 })
.options(IndexOptions::builder().unique(true).build())
.build(),
)
.await?;
tracing::info!("Database indexes ensured");
Ok(())
}
// Typed collection accessors. Each call returns a fresh (cheap) handle.
pub fn repositories(&self) -> Collection<TrackedRepository> {
self.inner.collection("repositories")
}
pub fn findings(&self) -> Collection<Finding> {
self.inner.collection("findings")
}
pub fn scan_runs(&self) -> Collection<ScanRun> {
self.inner.collection("scan_runs")
}
pub fn sbom_entries(&self) -> Collection<SbomEntry> {
self.inner.collection("sbom_entries")
}
pub fn cve_alerts(&self) -> Collection<CveAlert> {
self.inner.collection("cve_alerts")
}
pub fn tracker_issues(&self) -> Collection<TrackerIssue> {
self.inner.collection("tracker_issues")
}
/// Untyped access for ad-hoc queries against any collection.
pub fn raw_collection(&self, name: &str) -> Collection<mongodb::bson::Document> {
self.inner.collection(name)
}
}

View File

@@ -0,0 +1,41 @@
use compliance_core::CoreError;
use thiserror::Error;
/// Unified error type for the agent binary. `#[from]` conversions let `?`
/// propagate errors from every subsystem (core, MongoDB, git2, reqwest,
/// serde_json, std::io) without manual wrapping.
#[derive(Error, Debug)]
pub enum AgentError {
#[error(transparent)]
Core(#[from] CoreError),
#[error("Database error: {0}")]
Database(#[from] mongodb::error::Error),
#[error("Git error: {0}")]
Git(#[from] git2::Error),
#[error("HTTP error: {0}")]
Http(#[from] reqwest::Error),
#[error("JSON error: {0}")]
Json(#[from] serde_json::Error),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
// Stringly-typed variants for errors that carry only a message.
#[error("Scheduler error: {0}")]
Scheduler(String),
#[error("Configuration error: {0}")]
Config(String),
#[error("{0}")]
Other(String),
}
/// Map agent errors to HTTP status codes for API responses: a core
/// not-found becomes 404, everything else a generic 500.
impl From<AgentError> for axum::http::StatusCode {
    fn from(err: AgentError) -> Self {
        if matches!(err, AgentError::Core(CoreError::NotFound(_))) {
            Self::NOT_FOUND
        } else {
            Self::INTERNAL_SERVER_ERROR
        }
    }
}

View File

@@ -0,0 +1,157 @@
use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
use crate::error::AgentError;
/// Client for an OpenAI-compatible chat-completions endpoint (LiteLLM
/// proxy). The API key is held as a `SecretString`; an empty key disables
/// the Authorization header.
#[derive(Clone)]
pub struct LlmClient {
base_url: String,
api_key: SecretString,
model: String,
http: reqwest::Client,
}
// Minimal OpenAI-compatible chat-completions wire format. Only the fields
// this client actually sends and reads are modeled; everything else in the
// provider's response is ignored during deserialization.
#[derive(Serialize)]
struct ChatMessage {
role: String,
content: String,
}
#[derive(Serialize)]
struct ChatCompletionRequest {
model: String,
messages: Vec<ChatMessage>,
#[serde(skip_serializing_if = "Option::is_none")]
temperature: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
max_tokens: Option<u32>,
}
#[derive(Deserialize)]
struct ChatCompletionResponse {
choices: Vec<ChatChoice>,
}
#[derive(Deserialize)]
struct ChatChoice {
message: ChatResponseMessage,
}
#[derive(Deserialize)]
struct ChatResponseMessage {
content: String,
}
impl LlmClient {
    /// Create a client for an OpenAI-compatible chat-completions endpoint.
    pub fn new(base_url: String, api_key: SecretString, model: String) -> Self {
        Self {
            base_url,
            api_key,
            model,
            http: reqwest::Client::new(),
        }
    }
    /// Single system + user exchange; returns the first choice's content.
    ///
    /// Delegates to [`Self::chat_with_messages`] — previously both methods
    /// duplicated the entire request/response handling (~40 lines), so any
    /// fix had to be applied twice.
    pub async fn chat(
        &self,
        system_prompt: &str,
        user_prompt: &str,
        temperature: Option<f64>,
    ) -> Result<String, AgentError> {
        self.chat_with_messages(
            vec![
                ("system".to_string(), system_prompt.to_string()),
                ("user".to_string(), user_prompt.to_string()),
            ],
            temperature,
        )
        .await
    }
    /// Send an arbitrary `(role, content)` message list to the completions
    /// endpoint and return the first choice's content.
    ///
    /// Errors on transport failure, non-2xx status (body included in the
    /// message), unparseable response, or an empty `choices` array.
    pub async fn chat_with_messages(
        &self,
        messages: Vec<(String, String)>,
        temperature: Option<f64>,
    ) -> Result<String, AgentError> {
        let url = format!("{}/v1/chat/completions", self.base_url.trim_end_matches('/'));
        let request_body = ChatCompletionRequest {
            model: self.model.clone(),
            messages: messages
                .into_iter()
                .map(|(role, content)| ChatMessage { role, content })
                .collect(),
            temperature,
            max_tokens: Some(4096),
        };
        let mut req = self
            .http
            .post(&url)
            .header("content-type", "application/json")
            .json(&request_body);
        // An empty key means the proxy runs without auth; skip the header.
        let key = self.api_key.expose_secret();
        if !key.is_empty() {
            req = req.header("Authorization", format!("Bearer {key}"));
        }
        let resp = req.send().await.map_err(|e| {
            AgentError::Other(format!("LiteLLM request failed: {e}"))
        })?;
        if !resp.status().is_success() {
            let status = resp.status();
            let body = resp.text().await.unwrap_or_default();
            return Err(AgentError::Other(format!("LiteLLM returned {status}: {body}")));
        }
        let body: ChatCompletionResponse = resp.json().await.map_err(|e| {
            AgentError::Other(format!("Failed to parse LiteLLM response: {e}"))
        })?;
        body.choices
            .first()
            .map(|c| c.message.content.clone())
            .ok_or_else(|| AgentError::Other("Empty response from LiteLLM".to_string()))
    }
}

View File

@@ -0,0 +1,65 @@
use std::sync::Arc;
use compliance_core::models::Finding;
use crate::error::AgentError;
use crate::llm::LlmClient;
/// System prompt instructing the LLM to produce a Markdown issue body with
/// fixed sections (summary, evidence, impact, remediation, references).
const DESCRIPTION_SYSTEM_PROMPT: &str = r#"You are a security engineer writing issue descriptions for a bug tracker. Generate a clear, actionable issue body in Markdown format that includes:
1. **Summary**: 1-2 sentence overview
2. **Evidence**: Code location, snippet, and what was detected
3. **Impact**: What could happen if not fixed
4. **Remediation**: Step-by-step fix instructions
5. **References**: Relevant CWE/CVE links if applicable
Keep it concise and professional. Use code blocks for code snippets."#;
/// Ask the LLM for an issue `(title, body)` pair describing `finding`.
///
/// Title heuristic: the first line of the reply, with any leading '#'
/// heading markers stripped; remaining lines form the body. If that leaves
/// an empty body, the entire reply is used as the body. Falls back to the
/// finding's own title if the reply is empty.
pub async fn generate_issue_description(
llm: &Arc<LlmClient>,
finding: &Finding,
) -> Result<(String, String), AgentError> {
// Absent optional fields are rendered as "N/A" so the prompt shape is stable.
let user_prompt = format!(
"Generate an issue title and body for this finding:\n\
Scanner: {}\n\
Type: {}\n\
Severity: {}\n\
Rule: {}\n\
Title: {}\n\
Description: {}\n\
File: {}\n\
Line: {}\n\
Code:\n```\n{}\n```\n\
CWE: {}\n\
CVE: {}\n\
Remediation hint: {}",
finding.scanner,
finding.scan_type,
finding.severity,
finding.rule_id.as_deref().unwrap_or("N/A"),
finding.title,
finding.description,
finding.file_path.as_deref().unwrap_or("N/A"),
finding.line_number.map(|n| n.to_string()).unwrap_or_else(|| "N/A".to_string()),
finding.code_snippet.as_deref().unwrap_or("N/A"),
finding.cwe.as_deref().unwrap_or("N/A"),
finding.cve.as_deref().unwrap_or("N/A"),
finding.remediation.as_deref().unwrap_or("N/A"),
);
let response = llm.chat(DESCRIPTION_SYSTEM_PROMPT, &user_prompt, Some(0.3)).await?;
// Extract title from first line, rest is body
let mut lines = response.lines();
let title = lines
.next()
.unwrap_or(&finding.title)
.trim_start_matches('#')
.trim()
.to_string();
let body = lines.collect::<Vec<_>>().join("\n").trim().to_string();
let body = if body.is_empty() { response } else { body };
Ok((title, body))
}

View File

@@ -0,0 +1,27 @@
use std::sync::Arc;
use compliance_core::models::Finding;
use crate::error::AgentError;
use crate::llm::LlmClient;
/// System prompt asking the LLM for a drop-in replacement snippet only.
const FIX_SYSTEM_PROMPT: &str = r#"You are a security engineer. Given a security finding with code context, suggest a concrete code fix. Return ONLY the fixed code snippet that can directly replace the vulnerable code. Include brief inline comments explaining the fix."#;
/// Ask the LLM for a suggested code fix for `finding`; returns the raw
/// LLM reply (expected to be just the fixed snippet per the system prompt,
/// but not validated here).
pub async fn suggest_fix(
llm: &Arc<LlmClient>,
finding: &Finding,
) -> Result<String, AgentError> {
// File path stands in for language context; absent fields become "N/A".
let user_prompt = format!(
"Suggest a fix for this vulnerability:\n\
Language context from file: {}\n\
Rule: {}\n\
Description: {}\n\
Vulnerable code:\n```\n{}\n```",
finding.file_path.as_deref().unwrap_or("unknown"),
finding.rule_id.as_deref().unwrap_or("N/A"),
finding.description,
finding.code_snippet.as_deref().unwrap_or("N/A"),
);
llm.chat(FIX_SYSTEM_PROMPT, &user_prompt, Some(0.2)).await
}

View File

@@ -0,0 +1,10 @@
pub mod client;
#[allow(dead_code)]
pub mod descriptions;
#[allow(dead_code)]
pub mod fixes;
#[allow(dead_code)]
pub mod pr_review;
pub mod triage;
pub use client::LlmClient;

View File

@@ -0,0 +1,77 @@
use std::sync::Arc;
use compliance_core::models::Finding;
use compliance_core::traits::issue_tracker::ReviewComment;
use crate::error::AgentError;
use crate::llm::LlmClient;
const PR_REVIEW_SYSTEM_PROMPT: &str = r#"You are a security-focused code reviewer. Given a list of security findings in a PR diff, generate concise review comments. Each comment should:
1. Briefly explain the issue
2. Suggest a specific fix
3. Reference the relevant security standard (CWE, OWASP) if applicable
Be constructive and professional. Return JSON array:
[{"path": "file.rs", "line": 42, "body": "..."}]"#;
pub async fn generate_pr_review(
llm: &Arc<LlmClient>,
findings: &[Finding],
) -> Result<(String, Vec<ReviewComment>), AgentError> {
if findings.is_empty() {
return Ok(("No security issues found in this PR.".to_string(), Vec::new()));
}
let findings_text: Vec<String> = findings
.iter()
.map(|f| {
format!(
"- [{severity}] {title} in {file}:{line}\n Code: {code}\n Rule: {rule}",
severity = f.severity,
title = f.title,
file = f.file_path.as_deref().unwrap_or("unknown"),
line = f.line_number.map(|n| n.to_string()).unwrap_or_else(|| "?".to_string()),
code = f.code_snippet.as_deref().unwrap_or("N/A"),
rule = f.rule_id.as_deref().unwrap_or("N/A"),
)
})
.collect();
let user_prompt = format!(
"Generate review comments for these {} findings:\n{}",
findings.len(),
findings_text.join("\n"),
);
let response = llm.chat(PR_REVIEW_SYSTEM_PROMPT, &user_prompt, Some(0.3)).await?;
// Parse comments from LLM response
let comments: Vec<ReviewComment> = serde_json::from_str::<Vec<PrComment>>(&response)
.unwrap_or_default()
.into_iter()
.map(|c| ReviewComment {
path: c.path,
line: c.line,
body: c.body,
})
.collect();
let summary = format!(
"## Security Review\n\nFound **{}** potential security issue(s) in this PR.\n\n{}",
findings.len(),
findings
.iter()
.map(|f| format!("- **[{}]** {} in `{}`", f.severity, f.title, f.file_path.as_deref().unwrap_or("unknown")))
.collect::<Vec<_>>()
.join("\n"),
);
Ok((summary, comments))
}
#[derive(serde::Deserialize)]
struct PrComment {
path: String,
line: u32,
body: String,
}

View File

@@ -0,0 +1,73 @@
use std::sync::Arc;
use compliance_core::models::{Finding, FindingStatus};
use crate::llm::LlmClient;
const TRIAGE_SYSTEM_PROMPT: &str = r#"You are a security finding triage expert. Analyze the following security finding and determine:
1. Is this a true positive? (yes/no)
2. Confidence score (0-10, where 10 is highest confidence this is a real issue)
3. Brief remediation suggestion (1-2 sentences)
Respond in JSON format:
{"true_positive": true/false, "confidence": N, "remediation": "..."}"#;
pub async fn triage_findings(llm: &Arc<LlmClient>, findings: &mut Vec<Finding>) -> usize {
let mut passed = 0;
for finding in findings.iter_mut() {
let user_prompt = format!(
"Scanner: {}\nRule: {}\nSeverity: {}\nTitle: {}\nDescription: {}\nFile: {}\nLine: {}\nCode: {}",
finding.scanner,
finding.rule_id.as_deref().unwrap_or("N/A"),
finding.severity,
finding.title,
finding.description,
finding.file_path.as_deref().unwrap_or("N/A"),
finding.line_number.map(|n| n.to_string()).unwrap_or_else(|| "N/A".to_string()),
finding.code_snippet.as_deref().unwrap_or("N/A"),
);
match llm.chat(TRIAGE_SYSTEM_PROMPT, &user_prompt, Some(0.1)).await {
Ok(response) => {
if let Ok(result) = serde_json::from_str::<TriageResult>(&response) {
finding.confidence = Some(result.confidence);
if let Some(remediation) = result.remediation {
finding.remediation = Some(remediation);
}
if result.confidence >= 3.0 {
finding.status = FindingStatus::Triaged;
passed += 1;
} else {
finding.status = FindingStatus::FalsePositive;
}
} else {
// If LLM response doesn't parse, keep the finding
finding.status = FindingStatus::Triaged;
passed += 1;
tracing::warn!("Failed to parse triage response for {}: {response}", finding.fingerprint);
}
}
Err(e) => {
// On LLM error, keep the finding
tracing::warn!("LLM triage failed for {}: {e}", finding.fingerprint);
finding.status = FindingStatus::Triaged;
passed += 1;
}
}
}
// Remove false positives
findings.retain(|f| f.status != FindingStatus::FalsePositive);
passed
}
#[derive(serde::Deserialize)]
struct TriageResult {
#[serde(default)]
true_positive: bool,
#[serde(default)]
confidence: f64,
remediation: Option<String>,
}

View File

@@ -0,0 +1,53 @@
use tracing_subscriber::EnvFilter;
mod agent;
mod config;
mod database;
mod error;
mod api;
mod llm;
mod pipeline;
mod scheduler;
#[allow(dead_code)]
mod trackers;
mod webhooks;
/// Process entry point: wire up logging, config, database, the cron
/// scheduler, the webhook server, and (in the foreground) the REST API.
///
/// NOTE(review): the `EnvFilter` is built BEFORE `dotenvy::dotenv()` runs,
/// so a RUST_LOG set only in `.env` will not take effect — confirm whether
/// that ordering is intentional. Also, the `join!` at the bottom is only
/// reached after `start_api_server` returns (i.e. after the API exits).
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
.init();
// Load .env into the process environment (ignored if the file is absent).
dotenvy::dotenv().ok();
tracing::info!("Loading configuration...");
let config = config::load_config()?;
tracing::info!("Connecting to MongoDB...");
let db = database::Database::connect(&config.mongodb_uri, &config.mongodb_database).await?;
db.ensure_indexes().await?;
let agent = agent::ComplianceAgent::new(config.clone(), db.clone());
// Scheduler and webhook server run as background tasks; the REST API
// blocks the main task.
tracing::info!("Starting scheduler...");
let scheduler_agent = agent.clone();
let scheduler_handle = tokio::spawn(async move {
if let Err(e) = scheduler::start_scheduler(&scheduler_agent).await {
tracing::error!("Scheduler error: {e}");
}
});
tracing::info!("Starting webhook server...");
let webhook_agent = agent.clone();
let webhook_handle = tokio::spawn(async move {
if let Err(e) = webhooks::start_webhook_server(&webhook_agent).await {
tracing::error!("Webhook server error: {e}");
}
});
tracing::info!("Starting REST API on port {}...", config.agent_port);
api::start_api_server(agent, config.agent_port).await?;
let _ = tokio::join!(scheduler_handle, webhook_handle);
Ok(())
}

View File

@@ -0,0 +1,199 @@
use compliance_core::models::{CveAlert, CveSource, SbomEntry, VulnRef};
use compliance_core::CoreError;
/// Scanner that matches SBOM entries against OSV.dev, enriches alerts with
/// NVD CVSS scores, and (optionally) gathers web context via SearXNG.
pub struct CveScanner {
http: reqwest::Client,
// Optional integrations; features degrade gracefully when None.
searxng_url: Option<String>,
nvd_api_key: Option<String>,
}
impl CveScanner {
/// Construct a scanner sharing the caller's HTTP client. SearXNG URL and
/// NVD API key are optional; when absent the related enrichment is skipped.
pub fn new(http: reqwest::Client, searxng_url: Option<String>, nvd_api_key: Option<String>) -> Self {
Self { http, searxng_url, nvd_api_key }
}
/// Match SBOM entries against OSV.dev and build CVE alerts, then enrich
/// alerts with NVD CVSS base scores.
///
/// Relies on `query_osv_batch` returning results positionally aligned 1:1
/// with `entries` (it indexes via `entries.get_mut(idx)`). NOTE(review):
/// if the batch query skips entries without a purl, indices can misalign —
/// verify the alignment contract. NVD enrichment is one serial request per
/// alert; presumably fine for small alert counts, but NVD rate limits may
/// bite on large ones — confirm.
pub async fn scan_dependencies(
&self,
repo_id: &str,
entries: &mut [SbomEntry],
) -> Result<Vec<CveAlert>, CoreError> {
let mut alerts = Vec::new();
// Batch query OSV.dev
let osv_results = self.query_osv_batch(entries).await?;
for (idx, vulns) in osv_results.into_iter().enumerate() {
if let Some(entry) = entries.get_mut(idx) {
for vuln in &vulns {
// Record the vulnerability on the SBOM entry itself...
entry.known_vulnerabilities.push(VulnRef {
id: vuln.id.clone(),
source: "osv".to_string(),
severity: vuln.severity.clone(),
url: Some(format!("https://osv.dev/vulnerability/{}", vuln.id)),
});
// ...and raise a repo-level alert for it.
let mut alert = CveAlert::new(
vuln.id.clone(),
repo_id.to_string(),
entry.name.clone(),
entry.version.clone(),
CveSource::Osv,
);
alert.summary = vuln.summary.clone();
alerts.push(alert);
}
}
}
// Enrich with NVD CVSS scores
for alert in &mut alerts {
if let Ok(Some(cvss)) = self.query_nvd(&alert.cve_id).await {
alert.cvss_score = Some(cvss);
}
}
Ok(alerts)
}
/// Query OSV.dev's batch endpoint for known vulnerabilities.
///
/// Returns one `Vec<OsvVuln>` per input entry, aligned 1:1 with `entries`
/// (entries without a purl get an empty list), so callers may safely index
/// the result by entry position.
///
/// BUG FIX: the previous version filtered out purl-less entries when
/// building the request but returned results positioned by *request* index;
/// the caller indexed them by *entry* index, so whenever any entry lacked a
/// purl, vulnerabilities were attributed to the wrong packages. We now keep
/// an explicit entry-index mapping and re-align the response.
async fn query_osv_batch(&self, entries: &[SbomEntry]) -> Result<Vec<Vec<OsvVuln>>, CoreError> {
    // Indices of entries that have a purl — only these are queried.
    let indexed: Vec<usize> = entries
        .iter()
        .enumerate()
        .filter(|(_, e)| e.purl.is_some())
        .map(|(i, _)| i)
        .collect();
    // Result aligned with `entries`; starts all-empty.
    let mut aligned: Vec<Vec<OsvVuln>> = (0..entries.len()).map(|_| Vec::new()).collect();
    if indexed.is_empty() {
        return Ok(aligned);
    }
    let queries: Vec<_> = indexed
        .iter()
        .filter_map(|&i| {
            entries[i].purl.as_ref().map(|purl| {
                serde_json::json!({
                    "package": { "purl": purl }
                })
            })
        })
        .collect();
    let body = serde_json::json!({ "queries": queries });
    let resp = self
        .http
        .post("https://api.osv.dev/v1/querybatch")
        .json(&body)
        .send()
        .await
        .map_err(|e| CoreError::Http(format!("OSV.dev request failed: {e}")))?;
    if !resp.status().is_success() {
        let status = resp.status();
        let body = resp.text().await.unwrap_or_default();
        tracing::warn!("OSV.dev returned {status}: {body}");
        // Treat an OSV outage as "no known vulnerabilities" rather than
        // failing the whole scan.
        return Ok(aligned);
    }
    let result: OsvBatchResponse = resp.json().await.map_err(|e| {
        CoreError::Http(format!("Failed to parse OSV.dev response: {e}"))
    })?;
    // OSV returns results in request order; map each row back to the
    // original entry index it was queried for.
    for (&entry_idx, row) in indexed.iter().zip(result.results.into_iter()) {
        aligned[entry_idx] = row
            .vulns
            .unwrap_or_default()
            .into_iter()
            .map(|v| OsvVuln {
                id: v.id,
                summary: v.summary,
                severity: v.database_specific
                    .and_then(|d| d.get("severity").and_then(|s| s.as_str()).map(String::from)),
            })
            .collect();
    }
    Ok(aligned)
}
async fn query_nvd(&self, cve_id: &str) -> Result<Option<f64>, CoreError> {
if !cve_id.starts_with("CVE-") {
return Ok(None);
}
let url = format!("https://services.nvd.nist.gov/rest/json/cves/2.0?cveId={cve_id}");
let mut req = self.http.get(&url);
if let Some(key) = &self.nvd_api_key {
req = req.header("apiKey", key.as_str());
}
let resp = req.send().await.map_err(|e| {
CoreError::Http(format!("NVD request failed: {e}"))
})?;
if !resp.status().is_success() {
return Ok(None);
}
let body: serde_json::Value = resp.json().await.map_err(|e| {
CoreError::Http(format!("Failed to parse NVD response: {e}"))
})?;
// Extract CVSS v3.1 base score
let score = body["vulnerabilities"]
.as_array()
.and_then(|v| v.first())
.and_then(|v| v["cve"]["metrics"]["cvssMetricV31"].as_array())
.and_then(|m| m.first())
.and_then(|m| m["cvssData"]["baseScore"].as_f64());
Ok(score)
}
pub async fn search_context(&self, cve_id: &str) -> Result<Vec<String>, CoreError> {
let Some(searxng_url) = &self.searxng_url else {
return Ok(Vec::new());
};
let url = format!("{}/search?q={cve_id}&format=json&engines=duckduckgo", searxng_url.trim_end_matches('/'));
let resp = self.http.get(&url).send().await.map_err(|e| {
CoreError::Http(format!("SearXNG request failed: {e}"))
})?;
if !resp.status().is_success() {
return Ok(Vec::new());
}
let body: serde_json::Value = resp.json().await.unwrap_or_default();
let results = body["results"]
.as_array()
.map(|arr| {
arr.iter()
.take(5)
.filter_map(|r| r["url"].as_str().map(String::from))
.collect()
})
.unwrap_or_default();
Ok(results)
}
}
/// Response body of OSV.dev's `POST /v1/querybatch` endpoint.
#[derive(serde::Deserialize)]
struct OsvBatchResponse {
    // One result per submitted query, in query order.
    results: Vec<OsvBatchResult>,
}
/// Per-query result; `vulns` is absent when no vulnerabilities are known.
#[derive(serde::Deserialize)]
struct OsvBatchResult {
    vulns: Option<Vec<OsvVulnEntry>>,
}
/// Wire-format vulnerability record as returned by OSV.dev (fields we use).
#[derive(serde::Deserialize)]
struct OsvVulnEntry {
    id: String,
    summary: Option<String>,
    // Free-form ecosystem-specific blob; a "severity" key is extracted
    // from it when present.
    database_specific: Option<serde_json::Value>,
}
/// Internal, simplified view of an OSV vulnerability after parsing.
struct OsvVuln {
    id: String,
    summary: Option<String>,
    severity: Option<String>,
}

View File

@@ -0,0 +1,10 @@
use sha2::{Digest, Sha256};
/// Derive a stable hex-encoded SHA-256 fingerprint from an ordered list of
/// identifying parts.
///
/// Each part is followed by a `|` separator before hashing, so adjacent
/// parts cannot collapse into each other (["ab","c"] and ["a","bc"] hash
/// differently). Identical parts in identical order always produce the same
/// fingerprint, which is what finding deduplication relies on.
pub fn compute_fingerprint(parts: &[&str]) -> String {
    let digest = parts
        .iter()
        .fold(Sha256::new(), |mut hasher, part| {
            hasher.update(part.as_bytes());
            hasher.update(b"|");
            hasher
        })
        .finalize();
    hex::encode(digest)
}

View File

@@ -0,0 +1,100 @@
use std::path::{Path, PathBuf};
use git2::{FetchOptions, Repository};
use crate::error::AgentError;
/// Thin wrapper around git2 for cloning and updating tracked repositories
/// under a common base directory.
pub struct GitOps {
    // Directory under which each repository is checked out by name.
    base_path: PathBuf,
}
impl GitOps {
    /// Create a `GitOps` rooted at `base_path`; each repository lives at
    /// `base_path/<repo_name>`.
    pub fn new(base_path: &str) -> Self {
        Self {
            base_path: PathBuf::from(base_path),
        }
    }
    /// Ensure a local checkout of `git_url` exists at `base_path/repo_name`
    /// and is up to date, returning the checkout path.
    ///
    /// An existing checkout is detected by the presence of its `.git`
    /// directory rather than the directory itself: a previously failed clone
    /// can leave an empty or partial directory behind, and attempting to
    /// fetch inside it would fail on every subsequent run.
    pub fn clone_or_fetch(&self, git_url: &str, repo_name: &str) -> Result<PathBuf, AgentError> {
        let repo_path = self.base_path.join(repo_name);
        if repo_path.join(".git").exists() {
            self.fetch(&repo_path)?;
        } else {
            // create_dir_all is a no-op if a stale (non-repo) dir exists;
            // Repository::clone then populates it.
            std::fs::create_dir_all(&repo_path)?;
            Repository::clone(git_url, &repo_path)?;
            tracing::info!("Cloned {git_url} to {}", repo_path.display());
        }
        Ok(repo_path)
    }
    /// Fetch `origin` and move the current HEAD ref to the fetched commit,
    /// then force-checkout the working tree.
    ///
    /// NOTE(review): the ref update is forced (no ancestry check), so this
    /// follows upstream history rewrites as well as true fast-forwards.
    fn fetch(&self, repo_path: &Path) -> Result<(), AgentError> {
        let repo = Repository::open(repo_path)?;
        let mut remote = repo.find_remote("origin")?;
        let mut fetch_opts = FetchOptions::new();
        // Empty refspec list fetches the remote's configured default refspecs.
        remote.fetch(&[] as &[&str], Some(&mut fetch_opts), None)?;
        // Move HEAD's branch to the commit recorded in FETCH_HEAD.
        let fetch_head = repo.find_reference("FETCH_HEAD")?;
        let fetch_commit = repo.reference_to_annotated_commit(&fetch_head)?;
        let head_ref = repo.head()?;
        let head_name = head_ref.name().unwrap_or("HEAD");
        repo.reference(
            head_name,
            fetch_commit.id(),
            true,
            "fast-forward",
        )?;
        // Force-checkout so the working tree matches the updated ref.
        repo.checkout_head(Some(git2::build::CheckoutBuilder::default().force()))?;
        tracing::info!("Fetched and fast-forwarded {}", repo_path.display());
        Ok(())
    }
    /// Return the full hex SHA of the commit HEAD currently points to.
    pub fn get_head_sha(repo_path: &Path) -> Result<String, AgentError> {
        let repo = Repository::open(repo_path)?;
        let head = repo.head()?;
        let commit = head.peel_to_commit()?;
        Ok(commit.id().to_string())
    }
    /// True when HEAD differs from `last_sha`, or when no previous SHA is
    /// known (first scan).
    pub fn has_new_commits(repo_path: &Path, last_sha: Option<&str>) -> Result<bool, AgentError> {
        let current_sha = Self::get_head_sha(repo_path)?;
        match last_sha {
            Some(sha) if sha == current_sha => Ok(false),
            _ => Ok(true),
        }
    }
    /// List the file paths touched between two commits (new-side path of
    /// each diff delta).
    pub fn get_changed_files(
        repo_path: &Path,
        old_sha: &str,
        new_sha: &str,
    ) -> Result<Vec<String>, AgentError> {
        let repo = Repository::open(repo_path)?;
        let old_commit = repo.find_commit(git2::Oid::from_str(old_sha)?)?;
        let new_commit = repo.find_commit(git2::Oid::from_str(new_sha)?)?;
        let old_tree = old_commit.tree()?;
        let new_tree = new_commit.tree()?;
        let diff = repo.diff_tree_to_tree(Some(&old_tree), Some(&new_tree), None)?;
        let mut files = Vec::new();
        diff.foreach(
            &mut |delta, _| {
                if let Some(path) = delta.new_file().path() {
                    files.push(path.to_string_lossy().to_string());
                }
                true
            },
            None,
            None,
            None,
        )?;
        Ok(files)
    }
}

View File

@@ -0,0 +1,7 @@
// Scan pipeline — one module per stage, plus shared helpers.
pub mod cve; // OSV.dev / NVD dependency vulnerability lookups
pub mod dedup; // finding fingerprint computation
pub mod git; // clone/fetch and change detection via git2
pub mod orchestrator; // drives all stages end to end
pub mod patterns; // GDPR / OAuth regex pattern scanners
pub mod sbom; // syft + cargo-audit SBOM generation
pub mod semgrep; // Semgrep SAST integration

View File

@@ -0,0 +1,252 @@
use std::sync::Arc;
use mongodb::bson::doc;
use compliance_core::models::*;
use compliance_core::traits::Scanner;
use compliance_core::AgentConfig;
use crate::database::Database;
use crate::error::AgentError;
use crate::llm::LlmClient;
use crate::pipeline::cve::CveScanner;
use crate::pipeline::git::GitOps;
use crate::pipeline::patterns::{GdprPatternScanner, OAuthPatternScanner};
use crate::pipeline::sbom::SbomScanner;
use crate::pipeline::semgrep::SemgrepScanner;
/// Drives a repository scan end to end: change detection, SAST, SBOM
/// generation, CVE lookup, pattern scanning, LLM triage, persistence, and
/// repository bookkeeping.
pub struct PipelineOrchestrator {
    config: AgentConfig,
    db: Database,
    llm: Arc<LlmClient>,
    // Shared HTTP client handed to network-bound scanners (CVE lookups).
    http: reqwest::Client,
}
impl PipelineOrchestrator {
    /// Assemble an orchestrator from the shared agent services.
    pub fn new(
        config: AgentConfig,
        db: Database,
        llm: Arc<LlmClient>,
        http: reqwest::Client,
    ) -> Self {
        Self { config, db, llm, http }
    }
    /// Run a full scan for `repo_id`: creates a `ScanRun` record, executes
    /// the pipeline, then marks that record completed or failed.
    ///
    /// `repo_id` must be the hex string of the repository's MongoDB ObjectId.
    pub async fn run(
        &self,
        repo_id: &str,
        trigger: ScanTrigger,
    ) -> Result<(), AgentError> {
        // Look up the repository
        let repo = self
            .db
            .repositories()
            .find_one(doc! { "_id": mongodb::bson::oid::ObjectId::parse_str(repo_id).map_err(|e| AgentError::Other(e.to_string()))? })
            .await?
            .ok_or_else(|| AgentError::Other(format!("Repository {repo_id} not found")))?;
        // Create scan run
        let scan_run = ScanRun::new(repo_id.to_string(), trigger);
        let insert = self.db.scan_runs().insert_one(&scan_run).await?;
        // NOTE(review): if the driver ever returns a non-ObjectId inserted_id
        // this falls back to an empty string and later per-phase updates in
        // `update_phase` silently become no-ops — confirm acceptable.
        let scan_run_id = insert.inserted_id.as_object_id()
            .map(|id| id.to_hex())
            .unwrap_or_default();
        let result = self.run_pipeline(&repo, &scan_run_id).await;
        // Update scan run status
        match &result {
            Ok(count) => {
                self.db.scan_runs().update_one(
                    doc! { "_id": &insert.inserted_id },
                    doc! {
                        "$set": {
                            "status": "completed",
                            "current_phase": "completed",
                            "new_findings_count": *count as i64,
                            "completed_at": mongodb::bson::DateTime::now(),
                        }
                    },
                ).await?;
            }
            Err(e) => {
                self.db.scan_runs().update_one(
                    doc! { "_id": &insert.inserted_id },
                    doc! {
                        "$set": {
                            "status": "failed",
                            "error_message": e.to_string(),
                            "completed_at": mongodb::bson::DateTime::now(),
                        }
                    },
                ).await?;
            }
        }
        result.map(|_| ())
    }
    /// Execute the scan stages in order (change detection, SAST, SBOM, CVE,
    /// patterns, triage, persistence) and return the number of newly
    /// inserted findings. Individual scanner failures are logged and
    /// degrade to empty results; only infrastructure errors abort the run.
    async fn run_pipeline(
        &self,
        repo: &TrackedRepository,
        scan_run_id: &str,
    ) -> Result<u32, AgentError> {
        let repo_id = repo.id.as_ref()
            .map(|id| id.to_hex())
            .unwrap_or_default();
        // Stage 0: Change detection — skip the whole pipeline when HEAD
        // matches the last scanned commit.
        tracing::info!("[{repo_id}] Stage 0: Change detection");
        let git_ops = GitOps::new(&self.config.git_clone_base_path);
        let repo_path = git_ops.clone_or_fetch(&repo.git_url, &repo.name)?;
        if !GitOps::has_new_commits(&repo_path, repo.last_scanned_commit.as_deref())? {
            tracing::info!("[{repo_id}] No new commits, skipping scan");
            return Ok(0);
        }
        let current_sha = GitOps::get_head_sha(&repo_path)?;
        let mut all_findings: Vec<Finding> = Vec::new();
        // Stage 1: Semgrep SAST
        tracing::info!("[{repo_id}] Stage 1: Semgrep SAST");
        self.update_phase(scan_run_id, "sast").await;
        let semgrep = SemgrepScanner;
        match semgrep.scan(&repo_path, &repo_id).await {
            Ok(output) => all_findings.extend(output.findings),
            Err(e) => tracing::warn!("[{repo_id}] Semgrep failed: {e}"),
        }
        // Stage 2: SBOM Generation
        tracing::info!("[{repo_id}] Stage 2: SBOM Generation");
        self.update_phase(scan_run_id, "sbom_generation").await;
        let sbom_scanner = SbomScanner;
        let mut sbom_entries = match sbom_scanner.scan(&repo_path, &repo_id).await {
            Ok(output) => output.sbom_entries,
            Err(e) => {
                tracing::warn!("[{repo_id}] SBOM generation failed: {e}");
                Vec::new()
            }
        };
        // Stage 3: CVE Scanning — mutates sbom_entries in place to attach
        // known vulnerabilities.
        tracing::info!("[{repo_id}] Stage 3: CVE Scanning");
        self.update_phase(scan_run_id, "cve_scanning").await;
        let cve_scanner = CveScanner::new(
            self.http.clone(),
            self.config.searxng_url.clone(),
            self.config.nvd_api_key.as_ref().map(|k| {
                use secrecy::ExposeSecret;
                k.expose_secret().to_string()
            }),
        );
        let cve_alerts = match cve_scanner.scan_dependencies(&repo_id, &mut sbom_entries).await {
            Ok(alerts) => alerts,
            Err(e) => {
                tracing::warn!("[{repo_id}] CVE scanning failed: {e}");
                Vec::new()
            }
        };
        // Stage 4: Pattern Scanning (GDPR + OAuth)
        tracing::info!("[{repo_id}] Stage 4: Pattern Scanning");
        self.update_phase(scan_run_id, "pattern_scanning").await;
        let gdpr = GdprPatternScanner::new();
        match gdpr.scan(&repo_path, &repo_id).await {
            Ok(output) => all_findings.extend(output.findings),
            Err(e) => tracing::warn!("[{repo_id}] GDPR pattern scan failed: {e}"),
        }
        let oauth = OAuthPatternScanner::new();
        match oauth.scan(&repo_path, &repo_id).await {
            Ok(output) => all_findings.extend(output.findings),
            Err(e) => tracing::warn!("[{repo_id}] OAuth pattern scan failed: {e}"),
        }
        // Stage 5: LLM Triage — mutates findings in place.
        tracing::info!("[{repo_id}] Stage 5: LLM Triage ({} findings)", all_findings.len());
        self.update_phase(scan_run_id, "llm_triage").await;
        let triaged = crate::llm::triage::triage_findings(&self.llm, &mut all_findings).await;
        tracing::info!("[{repo_id}] Triaged: {triaged} findings passed confidence threshold");
        // Dedup against existing findings and insert new ones
        let mut new_count = 0u32;
        for mut finding in all_findings {
            finding.scan_run_id = Some(scan_run_id.to_string());
            // Check if fingerprint already exists
            let existing = self
                .db
                .findings()
                .find_one(doc! { "fingerprint": &finding.fingerprint })
                .await?;
            if existing.is_none() {
                self.db.findings().insert_one(&finding).await?;
                new_count += 1;
            }
        }
        // Persist SBOM entries (upsert by repo_id + name + version)
        for entry in &sbom_entries {
            let filter = doc! {
                "repo_id": &entry.repo_id,
                "name": &entry.name,
                "version": &entry.version,
            };
            // Serialization failure degrades to an empty $set (no-op update).
            let update = mongodb::bson::to_document(entry)
                .map(|d| doc! { "$set": d })
                .unwrap_or_else(|_| doc! {});
            self.db
                .sbom_entries()
                .update_one(filter, update)
                .upsert(true)
                .await?;
        }
        // Persist CVE alerts (upsert by cve_id + repo_id)
        for alert in &cve_alerts {
            let filter = doc! {
                "cve_id": &alert.cve_id,
                "repo_id": &alert.repo_id,
            };
            let update = mongodb::bson::to_document(alert)
                .map(|d| doc! { "$set": d })
                .unwrap_or_else(|_| doc! {});
            self.db
                .cve_alerts()
                .update_one(filter, update)
                .upsert(true)
                .await?;
        }
        // Stage 6: Issue Creation
        tracing::info!("[{repo_id}] Stage 6: Issue Creation");
        self.update_phase(scan_run_id, "issue_creation").await;
        // Issue creation is handled by the trackers module - deferred to agent
        // Stage 7: Update repository — record the scanned commit and bump
        // the running findings counter.
        self.db.repositories().update_one(
            doc! { "_id": repo.id },
            doc! {
                "$set": {
                    "last_scanned_commit": &current_sha,
                    "updated_at": mongodb::bson::DateTime::now(),
                },
                "$inc": { "findings_count": new_count as i64 },
            },
        ).await?;
        tracing::info!("[{repo_id}] Scan complete: {new_count} new findings");
        Ok(new_count)
    }
    /// Best-effort progress marker: set `current_phase` and append to
    /// `phases_completed` on the scan run. Errors (and unparsable ids) are
    /// deliberately ignored so progress tracking never aborts a scan.
    async fn update_phase(&self, scan_run_id: &str, phase: &str) {
        if let Ok(oid) = mongodb::bson::oid::ObjectId::parse_str(scan_run_id) {
            let _ = self.db.scan_runs().update_one(
                doc! { "_id": oid },
                doc! {
                    "$set": { "current_phase": phase },
                    "$push": { "phases_completed": phase },
                },
            ).await;
        }
    }
}

View File

@@ -0,0 +1,226 @@
use std::path::Path;
use compliance_core::models::{Finding, ScanType, Severity};
use compliance_core::traits::{ScanOutput, Scanner};
use compliance_core::CoreError;
use regex::Regex;
use crate::pipeline::dedup;
/// Line-oriented regex scanner for GDPR compliance smells (PII logging,
/// consent, retention, erasure).
pub struct GdprPatternScanner {
    patterns: Vec<PatternRule>,
}
/// Line-oriented regex scanner for common OAuth implementation mistakes.
pub struct OAuthPatternScanner {
    patterns: Vec<PatternRule>,
}
/// A single regex-based detection rule applied line by line to source files.
struct PatternRule {
    // Stable rule identifier; part of each finding's fingerprint.
    id: String,
    // Short, human-readable finding title.
    title: String,
    // Longer explanation attached to each finding.
    description: String,
    // Compiled regex matched against individual lines.
    pattern: Regex,
    // Severity assigned to findings produced by this rule.
    severity: Severity,
    // File extensions (without the dot) this rule applies to.
    file_extensions: Vec<String>,
}
impl GdprPatternScanner {
pub fn new() -> Self {
let patterns = vec![
PatternRule {
id: "gdpr-pii-logging".to_string(),
title: "PII data potentially logged".to_string(),
description: "Logging statements that may contain personally identifiable information (email, SSN, phone, IP address).".to_string(),
pattern: Regex::new(r#"(?i)(log|print|console\.|logger\.|tracing::)\s*[\.(].*\b(email|ssn|social.?security|phone.?number|ip.?addr|passport|date.?of.?birth|credit.?card)\b"#).unwrap_or_else(|_| Regex::new("^$").unwrap()),
severity: Severity::High,
file_extensions: vec!["rs", "py", "js", "ts", "java", "go", "rb"].into_iter().map(String::from).collect(),
},
PatternRule {
id: "gdpr-no-consent".to_string(),
title: "Data collection without apparent consent mechanism".to_string(),
description: "Data collection endpoint that doesn't reference consent or opt-in mechanisms.".to_string(),
pattern: Regex::new(r#"(?i)(collect|store|save|persist|record).*\b(personal|user.?data|pii|biometric)\b"#).unwrap_or_else(|_| Regex::new("^$").unwrap()),
severity: Severity::Medium,
file_extensions: vec!["rs", "py", "js", "ts", "java", "go"].into_iter().map(String::from).collect(),
},
PatternRule {
id: "gdpr-no-delete-endpoint".to_string(),
title: "Missing data deletion capability".to_string(),
description: "User data models or controllers without corresponding deletion endpoints (right to erasure).".to_string(),
pattern: Regex::new(r#"(?i)(class|struct|model)\s+User(?!.*[Dd]elete)"#).unwrap_or_else(|_| Regex::new("^$").unwrap()),
severity: Severity::Medium,
file_extensions: vec!["rs", "py", "js", "ts", "java", "go", "rb"].into_iter().map(String::from).collect(),
},
PatternRule {
id: "gdpr-hardcoded-retention".to_string(),
title: "Hardcoded data retention period".to_string(),
description: "Data retention periods should be configurable for GDPR compliance.".to_string(),
pattern: Regex::new(r#"(?i)(retention|ttl|expire|keep.?for)\s*[=:]\s*\d+"#).unwrap_or_else(|_| Regex::new("^$").unwrap()),
severity: Severity::Low,
file_extensions: vec!["rs", "py", "js", "ts", "java", "go", "yaml", "yml", "toml", "json"].into_iter().map(String::from).collect(),
},
];
Self { patterns }
}
}
impl Scanner for GdprPatternScanner {
    fn name(&self) -> &str {
        "gdpr-patterns"
    }
    fn scan_type(&self) -> ScanType {
        ScanType::Gdpr
    }
    /// Run every built-in GDPR rule over the source files in the checkout.
    async fn scan(&self, repo_path: &Path, repo_id: &str) -> Result<ScanOutput, CoreError> {
        let findings =
            scan_with_patterns(repo_path, repo_id, &self.patterns, ScanType::Gdpr, "gdpr-patterns")?;
        Ok(ScanOutput {
            findings,
            // Pattern scanning never contributes SBOM data.
            sbom_entries: Vec::new(),
        })
    }
}
impl OAuthPatternScanner {
pub fn new() -> Self {
let patterns = vec![
PatternRule {
id: "oauth-implicit-grant".to_string(),
title: "OAuth implicit grant flow detected".to_string(),
description: "Implicit grant flow is deprecated and insecure. Use authorization code flow with PKCE instead.".to_string(),
pattern: Regex::new(r#"(?i)(response_type\s*[=:]\s*["']?token|grant_type\s*[=:]\s*["']?implicit)"#).unwrap_or_else(|_| Regex::new("^$").unwrap()),
severity: Severity::High,
file_extensions: vec!["rs", "py", "js", "ts", "java", "go", "yaml", "yml", "json"].into_iter().map(String::from).collect(),
},
PatternRule {
id: "oauth-missing-pkce".to_string(),
title: "OAuth flow without PKCE".to_string(),
description: "Authorization code flow should use PKCE (code_challenge/code_verifier) for public clients.".to_string(),
pattern: Regex::new(r#"(?i)authorization.?code(?!.*code.?challenge)(?!.*pkce)"#).unwrap_or_else(|_| Regex::new("^$").unwrap()),
severity: Severity::Medium,
file_extensions: vec!["rs", "py", "js", "ts", "java", "go"].into_iter().map(String::from).collect(),
},
PatternRule {
id: "oauth-token-localstorage".to_string(),
title: "Token stored in localStorage".to_string(),
description: "Storing tokens in localStorage is vulnerable to XSS. Use httpOnly cookies or secure session storage.".to_string(),
pattern: Regex::new(r#"(?i)localStorage\.(set|get)Item\s*\(\s*["'].*token"#).unwrap_or_else(|_| Regex::new("^$").unwrap()),
severity: Severity::High,
file_extensions: vec!["js", "ts", "jsx", "tsx"].into_iter().map(String::from).collect(),
},
PatternRule {
id: "oauth-token-url".to_string(),
title: "Token passed in URL parameters".to_string(),
description: "Tokens in URLs can leak via referrer headers, server logs, and browser history.".to_string(),
pattern: Regex::new(r#"(?i)(access_token|bearer)\s*[=]\s*.*\b(url|query|param|href)\b"#).unwrap_or_else(|_| Regex::new("^$").unwrap()),
severity: Severity::High,
file_extensions: vec!["rs", "py", "js", "ts", "java", "go"].into_iter().map(String::from).collect(),
},
];
Self { patterns }
}
}
impl Scanner for OAuthPatternScanner {
    fn name(&self) -> &str {
        "oauth-patterns"
    }
    fn scan_type(&self) -> ScanType {
        ScanType::OAuth
    }
    /// Run every built-in OAuth rule over the source files in the checkout.
    async fn scan(&self, repo_path: &Path, repo_id: &str) -> Result<ScanOutput, CoreError> {
        let findings =
            scan_with_patterns(repo_path, repo_id, &self.patterns, ScanType::OAuth, "oauth-patterns")?;
        Ok(ScanOutput {
            findings,
            // Pattern scanning never contributes SBOM data.
            sbom_entries: Vec::new(),
        })
    }
}
/// Walk the repository, apply each rule to every line of the files its
/// extension list covers, and return one `Finding` per match.
///
/// Findings are fingerprinted on (repo, rule, file, line) so repeated scans
/// deduplicate against earlier runs.
fn scan_with_patterns(
    repo_path: &Path,
    repo_id: &str,
    patterns: &[PatternRule],
    scan_type: ScanType,
    scanner_name: &str,
) -> Result<Vec<Finding>, CoreError> {
    let mut findings = Vec::new();
    for entry in walkdir(repo_path)? {
        let path = entry.path();
        if !path.is_file() {
            continue;
        }
        let ext = path
            .extension()
            .and_then(|e| e.to_str())
            .unwrap_or("")
            .to_string();
        // Hoist the extension filter out of the per-line loop: files no rule
        // applies to are skipped before being read at all, and matching
        // files are checked only against the rules that cover them.
        let applicable: Vec<&PatternRule> = patterns
            .iter()
            .filter(|p| p.file_extensions.contains(&ext))
            .collect();
        if applicable.is_empty() {
            continue;
        }
        let content = match std::fs::read_to_string(path) {
            Ok(c) => c,
            Err(_) => continue, // skip binary / non-UTF-8 files
        };
        let relative_path = path
            .strip_prefix(repo_path)
            .unwrap_or(path)
            .to_string_lossy()
            .to_string();
        for pattern in applicable {
            for (line_num, line) in content.lines().enumerate() {
                if pattern.pattern.is_match(line) {
                    let fingerprint = dedup::compute_fingerprint(&[
                        repo_id,
                        &pattern.id,
                        &relative_path,
                        &(line_num + 1).to_string(),
                    ]);
                    let mut finding = Finding::new(
                        repo_id.to_string(),
                        fingerprint,
                        scanner_name.to_string(),
                        scan_type.clone(),
                        pattern.title.clone(),
                        pattern.description.clone(),
                        pattern.severity.clone(),
                    );
                    finding.rule_id = Some(pattern.id.clone());
                    finding.file_path = Some(relative_path.clone());
                    // Line numbers are 1-based for display.
                    finding.line_number = Some((line_num + 1) as u32);
                    finding.code_snippet = Some(line.to_string());
                    findings.push(finding);
                }
            }
        }
    }
    Ok(findings)
}
/// Recursive file walk, skipping `.git` and common vendored/build output
/// directories; unreadable entries are silently dropped.
fn walkdir(path: &Path) -> Result<Vec<walkdir::DirEntry>, CoreError> {
    let skip_dirs = [".git", "node_modules", "target", "vendor", ".venv", "__pycache__", "dist", "build"];
    let entries: Vec<_> = walkdir::WalkDir::new(path)
        .into_iter()
        .filter_entry(|e| {
            // Depth 0 is the walk root (the checkout itself): never skip it,
            // even when the checkout directory happens to be named e.g.
            // "build" or "target" — otherwise the whole repo is ignored.
            if e.depth() == 0 {
                return true;
            }
            let name = e.file_name().to_string_lossy();
            !skip_dirs.contains(&name.as_ref())
        })
        .filter_map(|e| e.ok())
        .collect();
    Ok(entries)
}

View File

@@ -0,0 +1,186 @@
use std::path::Path;
use compliance_core::models::{SbomEntry, ScanType, VulnRef};
use compliance_core::traits::{ScanOutput, Scanner};
use compliance_core::CoreError;
/// Software-bill-of-materials scanner backed by the external `syft` binary,
/// with `cargo audit` supplying vulnerability references for Rust repos.
pub struct SbomScanner;
impl Scanner for SbomScanner {
    fn name(&self) -> &str {
        "sbom"
    }
    fn scan_type(&self) -> ScanType {
        ScanType::Sbom
    }
    /// Build the SBOM: syft supplies the component inventory; for Rust
    /// repositories, cargo-audit contributes known-vulnerability references
    /// merged into the matching entries. Either tool failing degrades to a
    /// warning rather than aborting the scan.
    async fn scan(&self, repo_path: &Path, repo_id: &str) -> Result<ScanOutput, CoreError> {
        let mut entries = match run_syft(repo_path, repo_id).await {
            Ok(found) => found,
            Err(e) => {
                tracing::warn!("syft failed: {e}");
                Vec::new()
            }
        };
        match run_cargo_audit(repo_path, repo_id).await {
            Ok(vulns) => merge_audit_vulns(&mut entries, vulns),
            Err(e) => tracing::warn!("cargo-audit skipped: {e}"),
        }
        Ok(ScanOutput {
            // SBOM scanning produces dependency entries, never findings.
            findings: Vec::new(),
            sbom_entries: entries,
        })
    }
}
/// Invoke the external `syft` binary on the checkout and convert its
/// CycloneDX JSON output into `SbomEntry` records.
///
/// Errors when syft cannot be spawned, exits non-zero, or emits unparsable
/// JSON.
async fn run_syft(repo_path: &Path, repo_id: &str) -> Result<Vec<SbomEntry>, CoreError> {
    let output = tokio::process::Command::new("syft")
        .arg(repo_path)
        .args(["-o", "cyclonedx-json"])
        .output()
        .await
        .map_err(|e| CoreError::Scanner {
            scanner: "syft".to_string(),
            source: Box::new(e),
        })?;
    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(CoreError::Scanner {
            scanner: "syft".to_string(),
            source: format!("syft exited with {}: {stderr}", output.status).into(),
        });
    }
    let cdx: CycloneDxBom = serde_json::from_slice(&output.stdout)?;
    let entries = cdx
        .components
        .unwrap_or_default()
        .into_iter()
        .map(|c| {
            let mut entry = SbomEntry::new(
                repo_id.to_string(),
                c.name,
                c.version.unwrap_or_else(|| "unknown".to_string()),
                c.component_type.unwrap_or_else(|| "library".to_string()),
            );
            entry.purl = c.purl;
            // Take the first license's SPDX id, falling back to its
            // free-form name. Yield None — not Some("") as the previous
            // `unwrap_or_default` chain did — when neither is present.
            entry.license = c.licenses.and_then(|ls| {
                ls.into_iter()
                    .next()
                    .and_then(|l| l.license)
                    .and_then(|lic| lic.id.or(lic.name))
            });
            entry
        })
        .collect();
    Ok(entries)
}
/// Run `cargo audit --json` in the checkout and return the reported
/// advisories. Returns an empty list for non-Rust repos (no `Cargo.lock`)
/// and when the output cannot be parsed.
async fn run_cargo_audit(repo_path: &Path, _repo_id: &str) -> Result<Vec<AuditVuln>, CoreError> {
    // Only meaningful for Rust projects with a lockfile.
    if !repo_path.join("Cargo.lock").exists() {
        return Ok(Vec::new());
    }
    let output = tokio::process::Command::new("cargo")
        .args(["audit", "--json"])
        .current_dir(repo_path)
        .output()
        .await
        .map_err(|e| CoreError::Scanner {
            scanner: "cargo-audit".to_string(),
            source: Box::new(e),
        })?;
    // cargo-audit exits non-zero when vulnerabilities are found, so stdout
    // is parsed regardless of the exit status; unparsable output (e.g. the
    // subcommand is not installed) degrades to "no vulnerabilities".
    let Ok(result) = serde_json::from_slice::<CargoAuditOutput>(&output.stdout) else {
        return Ok(Vec::new());
    };
    let vulns = result
        .vulnerabilities
        .list
        .into_iter()
        .map(|entry| AuditVuln {
            package: entry.advisory.package,
            id: entry.advisory.id,
            url: entry.advisory.url,
        })
        .collect();
    Ok(vulns)
}
fn merge_audit_vulns(entries: &mut Vec<SbomEntry>, vulns: Vec<AuditVuln>) {
for vuln in vulns {
if let Some(entry) = entries.iter_mut().find(|e| e.name == vuln.package) {
entry.known_vulnerabilities.push(VulnRef {
id: vuln.id.clone(),
source: "cargo-audit".to_string(),
severity: None,
url: Some(vuln.url),
});
}
}
}
// CycloneDX JSON types (only the fields this scanner consumes).
/// Top-level CycloneDX BOM document emitted by `syft -o cyclonedx-json`.
#[derive(serde::Deserialize)]
struct CycloneDxBom {
    components: Option<Vec<CdxComponent>>,
}
/// One component (dependency) record in the BOM.
#[derive(serde::Deserialize)]
struct CdxComponent {
    name: String,
    version: Option<String>,
    // CycloneDX calls this "type" (e.g. "library"); renamed to avoid the keyword.
    #[serde(rename = "type")]
    component_type: Option<String>,
    // Package URL identifier, used for OSV.dev lookups downstream.
    purl: Option<String>,
    licenses: Option<Vec<CdxLicenseWrapper>>,
}
/// CycloneDX nests each license under a wrapper object.
#[derive(serde::Deserialize)]
struct CdxLicenseWrapper {
    license: Option<CdxLicense>,
}
/// License record: SPDX `id` and/or free-form `name`.
#[derive(serde::Deserialize)]
struct CdxLicense {
    id: Option<String>,
    name: Option<String>,
}
// Cargo audit types (subset of `cargo audit --json` output).
#[derive(serde::Deserialize)]
struct CargoAuditOutput {
    vulnerabilities: CargoAuditVulns,
}
#[derive(serde::Deserialize)]
struct CargoAuditVulns {
    list: Vec<CargoAuditEntry>,
}
#[derive(serde::Deserialize)]
struct CargoAuditEntry {
    advisory: CargoAuditAdvisory,
}
/// RUSTSEC advisory metadata attached to a vulnerable package.
#[derive(serde::Deserialize)]
struct CargoAuditAdvisory {
    id: String,
    package: String,
    url: String,
}
/// Internal, flattened view of a cargo-audit advisory.
struct AuditVuln {
    package: String,
    id: String,
    url: String,
}

View File

@@ -0,0 +1,110 @@
use std::path::Path;
use compliance_core::models::{Finding, ScanType, Severity};
use compliance_core::traits::{ScanOutput, Scanner};
use compliance_core::CoreError;
use crate::pipeline::dedup;
/// SAST scanner that shells out to the external `semgrep` CLI.
pub struct SemgrepScanner;
impl Scanner for SemgrepScanner {
    fn name(&self) -> &str {
        "semgrep"
    }
    fn scan_type(&self) -> ScanType {
        ScanType::Sast
    }
    /// Run `semgrep --config=auto` over the checkout and convert each JSON
    /// result into a `Finding`. A failed run with no output degrades to an
    /// empty result (warning logged) rather than aborting the pipeline.
    async fn scan(&self, repo_path: &Path, repo_id: &str) -> Result<ScanOutput, CoreError> {
        let output = tokio::process::Command::new("semgrep")
            .args(["--config=auto", "--json", "--quiet"])
            .arg(repo_path)
            .output()
            .await
            .map_err(|e| CoreError::Scanner {
                scanner: "semgrep".to_string(),
                source: Box::new(e),
            })?;
        // Semgrep exits non-zero when findings exist; only treat a failure
        // with *no* JSON output as a real error.
        if !output.status.success() && output.stdout.is_empty() {
            let stderr = String::from_utf8_lossy(&output.stderr);
            tracing::warn!("Semgrep exited with {}: {stderr}", output.status);
            return Ok(ScanOutput::default());
        }
        let result: SemgrepOutput = serde_json::from_slice(&output.stdout)?;
        let findings = result
            .results
            .into_iter()
            .map(|r| {
                let severity = match r.extra.severity.as_str() {
                    "ERROR" => Severity::High,
                    "WARNING" => Severity::Medium,
                    "INFO" => Severity::Low,
                    _ => Severity::Info,
                };
                // Fingerprint on (repo, rule, file, line) for cross-run dedup.
                let fingerprint = dedup::compute_fingerprint(&[
                    repo_id,
                    &r.check_id,
                    &r.path,
                    &r.start.line.to_string(),
                ]);
                // Semgrep provides one message; it serves as both title
                // and description.
                let mut finding = Finding::new(
                    repo_id.to_string(),
                    fingerprint,
                    "semgrep".to_string(),
                    ScanType::Sast,
                    r.extra.message.clone(),
                    r.extra.message,
                    severity,
                );
                finding.rule_id = Some(r.check_id);
                finding.file_path = Some(r.path);
                finding.line_number = Some(r.start.line);
                finding.code_snippet = Some(r.extra.lines);
                // Semgrep rule metadata emits "cwe" either as a single
                // string or as an array of strings depending on the rule;
                // accept both (the old `as_str()`-only path dropped arrays).
                finding.cwe = r.extra.metadata.and_then(|m| match m.get("cwe") {
                    Some(serde_json::Value::String(s)) => Some(s.clone()),
                    Some(serde_json::Value::Array(arr)) => {
                        arr.iter().find_map(|v| v.as_str()).map(String::from)
                    }
                    _ => None,
                });
                finding
            })
            .collect();
        Ok(ScanOutput {
            findings,
            sbom_entries: Vec::new(),
        })
    }
}
/// Top-level structure of `semgrep --json` output (fields we consume).
#[derive(serde::Deserialize)]
struct SemgrepOutput {
    results: Vec<SemgrepResult>,
}
/// One semgrep match: rule id, file, start position, and rule extras.
#[derive(serde::Deserialize)]
struct SemgrepResult {
    check_id: String,
    path: String,
    start: SemgrepPosition,
    extra: SemgrepExtra,
}
/// Source position; only the 1-based line is used.
#[derive(serde::Deserialize)]
struct SemgrepPosition {
    line: u32,
}
/// Rule-supplied details attached to a match.
#[derive(serde::Deserialize)]
struct SemgrepExtra {
    message: String,
    // One of "ERROR" / "WARNING" / "INFO" per semgrep's severity scale.
    severity: String,
    // The matched source line(s), used as the code snippet.
    lines: String,
    // Free-form rule metadata; "cwe" is extracted from it when present.
    #[serde(default)]
    metadata: Option<serde_json::Value>,
}

View File

@@ -0,0 +1,105 @@
use mongodb::bson::doc;
use tokio_cron_scheduler::{Job, JobScheduler};
use compliance_core::models::ScanTrigger;
use crate::agent::ComplianceAgent;
use crate::error::AgentError;
/// Build and start the cron scheduler with two jobs: periodic repository
/// scans (`scan_schedule`) and a CVE monitor pass (`cve_monitor_schedule`).
///
/// NOTE: under normal operation this function never returns — once the
/// scheduler is running it parks in an endless sleep loop to keep the jobs
/// alive, so callers should spawn it on a dedicated task.
pub async fn start_scheduler(agent: &ComplianceAgent) -> Result<(), AgentError> {
    let sched = JobScheduler::new()
        .await
        .map_err(|e| AgentError::Scheduler(format!("Failed to create scheduler: {e}")))?;
    // Periodic scan job
    let scan_agent = agent.clone();
    let scan_schedule = agent.config.scan_schedule.clone();
    let scan_job = Job::new_async(scan_schedule.as_str(), move |_uuid, _lock| {
        // Each tick clones the (cheaply cloneable) agent into its own future.
        let agent = scan_agent.clone();
        Box::pin(async move {
            tracing::info!("Scheduled scan triggered");
            scan_all_repos(&agent).await;
        })
    })
    .map_err(|e| AgentError::Scheduler(format!("Failed to create scan job: {e}")))?;
    sched.add(scan_job).await
        .map_err(|e| AgentError::Scheduler(format!("Failed to add scan job: {e}")))?;
    // CVE monitor job (daily)
    let cve_agent = agent.clone();
    let cve_schedule = agent.config.cve_monitor_schedule.clone();
    let cve_job = Job::new_async(cve_schedule.as_str(), move |_uuid, _lock| {
        let agent = cve_agent.clone();
        Box::pin(async move {
            tracing::info!("CVE monitor triggered");
            monitor_cves(&agent).await;
        })
    })
    .map_err(|e| AgentError::Scheduler(format!("Failed to create CVE monitor job: {e}")))?;
    sched.add(cve_job).await
        .map_err(|e| AgentError::Scheduler(format!("Failed to add CVE monitor job: {e}")))?;
    sched.start().await
        .map_err(|e| AgentError::Scheduler(format!("Failed to start scheduler: {e}")))?;
    tracing::info!(
        "Scheduler started: scans='{}', CVE monitor='{}'",
        agent.config.scan_schedule,
        agent.config.cve_monitor_schedule,
    );
    // Keep scheduler alive
    loop {
        tokio::time::sleep(tokio::time::Duration::from_secs(3600)).await;
    }
}
/// Run a scheduled scan for every tracked repository, sequentially.
/// One repository failing is logged and must not stop the rest.
async fn scan_all_repos(agent: &ComplianceAgent) {
    use futures_util::StreamExt;
    let mut cursor = match agent.db.repositories().find(doc! {}).await {
        Ok(cursor) => cursor,
        Err(e) => {
            tracing::error!("Failed to list repos for scheduled scan: {e}");
            return;
        }
    };
    while let Some(next) = cursor.next().await {
        // Documents that fail to deserialize are skipped, matching the
        // previous best-effort behavior.
        let Ok(repo) = next else { continue };
        let repo_id = repo.id.map(|id| id.to_hex()).unwrap_or_default();
        if let Err(e) = agent.run_scan(&repo_id, ScanTrigger::Scheduled).await {
            tracing::error!("Scheduled scan failed for {}: {e}", repo.name);
        }
    }
}
/// Daily CVE-monitor job body: loads the stored SBOM entries and logs how
/// many dependencies there are to check.
///
/// NOTE(review): this is currently a stub — the actual CVE lookups happen
/// inside the scan pipeline's `CveScanner`; this job does not trigger them.
async fn monitor_cves(agent: &ComplianceAgent) {
    use futures_util::StreamExt;
    // Re-scan all SBOM entries for new CVEs
    let cursor = match agent.db.sbom_entries().find(doc! {}).await {
        Ok(c) => c,
        Err(e) => {
            tracing::error!("Failed to list SBOM entries for CVE monitoring: {e}");
            return;
        }
    };
    // Undeserializable documents are silently dropped (best effort).
    let entries: Vec<_> = cursor
        .filter_map(|r| async { r.ok() })
        .collect()
        .await;
    if entries.is_empty() {
        return;
    }
    tracing::info!("CVE monitor: checking {} dependencies", entries.len());
    // The actual CVE checking is handled by the CveScanner in the pipeline
    // This is a simplified version that just logs the activity
}

View File

@@ -0,0 +1,161 @@
use compliance_core::error::CoreError;
use compliance_core::models::{TrackerIssue, TrackerType};
use compliance_core::traits::issue_tracker::{IssueTracker, ReviewComment};
use octocrab::Octocrab;
use secrecy::{ExposeSecret, SecretString};
/// GitHub issue-tracker integration backed by an authenticated Octocrab client.
pub struct GitHubTracker {
    client: Octocrab,
}
impl GitHubTracker {
    /// Build an Octocrab client authenticated with a personal access token.
    pub fn new(token: &SecretString) -> Result<Self, CoreError> {
        match Octocrab::builder()
            .personal_token(token.expose_secret().to_string())
            .build()
        {
            Ok(client) => Ok(Self { client }),
            Err(e) => Err(CoreError::IssueTracker(format!(
                "Failed to create GitHub client: {e}"
            ))),
        }
    }
}
impl IssueTracker for GitHubTracker {
    fn name(&self) -> &str {
        "github"
    }
    /// Open a new issue with optional labels and return its tracker handle.
    async fn create_issue(
        &self,
        owner: &str,
        repo: &str,
        title: &str,
        body: &str,
        labels: &[String],
    ) -> Result<TrackerIssue, CoreError> {
        let issues_handler = self.client.issues(owner, repo);
        let mut builder = issues_handler.create(title).body(body);
        if !labels.is_empty() {
            builder = builder.labels(labels.to_vec());
        }
        let issue = builder
            .send()
            .await
            .map_err(|e| CoreError::IssueTracker(format!("GitHub create issue failed: {e}")))?;
        Ok(TrackerIssue::new(
            String::new(),
            TrackerType::GitHub,
            issue.number.to_string(),
            issue.html_url.to_string(),
            title.to_string(),
        ))
    }
    /// Open or close an existing issue. Any status other than
    /// "closed"/"resolved" maps to GitHub's "open" state.
    async fn update_issue_status(
        &self,
        owner: &str,
        repo: &str,
        external_id: &str,
        status: &str,
    ) -> Result<(), CoreError> {
        let issue_number: u64 = external_id
            .parse()
            .map_err(|_| CoreError::IssueTracker("Invalid issue number".to_string()))?;
        let state_str = match status {
            "closed" | "resolved" => "closed",
            _ => "open",
        };
        // GitHub's "update an issue" endpoint requires PATCH; the previous
        // POST to /repos/{owner}/{repo}/issues/{n} is not the documented
        // method for editing an issue and is rejected by the API.
        let route = format!("/repos/{owner}/{repo}/issues/{issue_number}");
        let body = serde_json::json!({ "state": state_str });
        let _: serde_json::Value = self
            .client
            .patch(route, Some(&body))
            .await
            .map_err(|e| CoreError::IssueTracker(format!("GitHub update issue failed: {e}")))?;
        Ok(())
    }
    /// Append a comment to an existing issue.
    async fn add_comment(
        &self,
        owner: &str,
        repo: &str,
        external_id: &str,
        body: &str,
    ) -> Result<(), CoreError> {
        let issue_number: u64 = external_id
            .parse()
            .map_err(|_| CoreError::IssueTracker("Invalid issue number".to_string()))?;
        self.client
            .issues(owner, repo)
            .create_comment(issue_number, body)
            .await
            .map_err(|e| CoreError::IssueTracker(format!("GitHub add comment failed: {e}")))?;
        Ok(())
    }
    /// Submit a non-blocking ("COMMENT") review on a pull request with
    /// optional per-line comments.
    async fn create_pr_review(
        &self,
        owner: &str,
        repo: &str,
        pr_number: u64,
        body: &str,
        comments: Vec<ReviewComment>,
    ) -> Result<(), CoreError> {
        let review_comments: Vec<serde_json::Value> = comments
            .iter()
            .map(|c| {
                serde_json::json!({
                    "path": c.path,
                    "line": c.line,
                    "body": c.body,
                })
            })
            .collect();
        let review_body = serde_json::json!({
            "body": body,
            "event": "COMMENT",
            "comments": review_comments,
        });
        let route = format!("/repos/{owner}/{repo}/pulls/{pr_number}/reviews");
        self.client
            .post::<serde_json::Value, ()>(route, Some(&review_body))
            .await
            .map_err(|e| CoreError::IssueTracker(format!("GitHub PR review failed: {e}")))?;
        Ok(())
    }
    /// Search the repository's issues for one mentioning `fingerprint`
    /// (issues embed the finding fingerprint in their body) and return the
    /// first match, if any.
    async fn find_existing_issue(
        &self,
        owner: &str,
        repo: &str,
        fingerprint: &str,
    ) -> Result<Option<TrackerIssue>, CoreError> {
        let query = format!("repo:{owner}/{repo} is:issue {fingerprint}");
        let results = self
            .client
            .search()
            .issues_and_pull_requests(&query)
            .send()
            .await
            .map_err(|e| CoreError::IssueTracker(format!("GitHub search failed: {e}")))?;
        if let Some(issue) = results.items.first() {
            Ok(Some(TrackerIssue::new(
                String::new(),
                TrackerType::GitHub,
                issue.number.to_string(),
                issue.html_url.to_string(),
                issue.title.clone(),
            )))
        } else {
            Ok(None)
        }
    }
}

View File

@@ -0,0 +1,201 @@
use compliance_core::error::CoreError;
use compliance_core::models::{TrackerIssue, TrackerType};
use compliance_core::traits::issue_tracker::{IssueTracker, ReviewComment};
use secrecy::{ExposeSecret, SecretString};
/// Issue tracker backed by the GitLab REST API (v4).
///
/// Works against gitlab.com or a self-hosted instance, selected via `base_url`.
pub struct GitLabTracker {
    base_url: String,        // normalized without a trailing slash (see `new`)
    http: reqwest::Client,
    token: SecretString,     // access token, sent as the PRIVATE-TOKEN header
}
impl GitLabTracker {
    /// Creates a tracker for the GitLab instance at `base_url`.
    /// Any trailing slash on `base_url` is stripped so URL joining is uniform.
    pub fn new(base_url: String, token: SecretString) -> Self {
        let base_url = base_url.trim_end_matches('/').to_string();
        Self {
            base_url,
            http: reqwest::Client::new(),
            token,
        }
    }

    /// Joins a path (which must start with '/') onto the API v4 root.
    fn api_url(&self, path: &str) -> String {
        format!("{}/api/v4{}", self.base_url, path)
    }

    /// URL-encodes `owner/repo` for use as a GitLab project-id path segment
    /// (GitLab accepts the encoded full path in place of the numeric id).
    fn project_path(owner: &str, repo: &str) -> String {
        let full = format!("{owner}/{repo}");
        urlencoding::encode(&full).into_owned()
    }
}
impl IssueTracker for GitLabTracker {
fn name(&self) -> &str {
"gitlab"
}
async fn create_issue(
&self,
owner: &str,
repo: &str,
title: &str,
body: &str,
labels: &[String],
) -> Result<TrackerIssue, CoreError> {
let project = Self::project_path(owner, repo);
let url = self.api_url(&format!("/projects/{project}/issues"));
let mut payload = serde_json::json!({
"title": title,
"description": body,
});
if !labels.is_empty() {
payload["labels"] = serde_json::Value::String(labels.join(","));
}
let resp = self
.http
.post(&url)
.header("PRIVATE-TOKEN", self.token.expose_secret())
.json(&payload)
.send()
.await
.map_err(|e| CoreError::IssueTracker(format!("GitLab create issue failed: {e}")))?;
if !resp.status().is_success() {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
return Err(CoreError::IssueTracker(format!("GitLab returned {status}: {body}")));
}
let issue: serde_json::Value = resp.json().await
.map_err(|e| CoreError::IssueTracker(format!("Failed to parse GitLab response: {e}")))?;
Ok(TrackerIssue::new(
String::new(),
TrackerType::GitLab,
issue["iid"].to_string(),
issue["web_url"].as_str().unwrap_or("").to_string(),
title.to_string(),
))
}
async fn update_issue_status(
&self,
owner: &str,
repo: &str,
external_id: &str,
status: &str,
) -> Result<(), CoreError> {
let project = Self::project_path(owner, repo);
let url = self.api_url(&format!("/projects/{project}/issues/{external_id}"));
let state_event = match status {
"closed" | "resolved" => "close",
_ => "reopen",
};
self.http
.put(&url)
.header("PRIVATE-TOKEN", self.token.expose_secret())
.json(&serde_json::json!({ "state_event": state_event }))
.send()
.await
.map_err(|e| CoreError::IssueTracker(format!("GitLab update issue failed: {e}")))?;
Ok(())
}
async fn add_comment(
&self,
owner: &str,
repo: &str,
external_id: &str,
body: &str,
) -> Result<(), CoreError> {
let project = Self::project_path(owner, repo);
let url = self.api_url(&format!("/projects/{project}/issues/{external_id}/notes"));
self.http
.post(&url)
.header("PRIVATE-TOKEN", self.token.expose_secret())
.json(&serde_json::json!({ "body": body }))
.send()
.await
.map_err(|e| CoreError::IssueTracker(format!("GitLab add comment failed: {e}")))?;
Ok(())
}
async fn create_pr_review(
&self,
owner: &str,
repo: &str,
pr_number: u64,
body: &str,
comments: Vec<ReviewComment>,
) -> Result<(), CoreError> {
let project = Self::project_path(owner, repo);
// Post overall review as MR note
let note_url = self.api_url(&format!("/projects/{project}/merge_requests/{pr_number}/notes"));
self.http
.post(&note_url)
.header("PRIVATE-TOKEN", self.token.expose_secret())
.json(&serde_json::json!({ "body": body }))
.send()
.await
.map_err(|e| CoreError::IssueTracker(format!("GitLab MR note failed: {e}")))?;
// Post individual line comments as MR discussions
for comment in comments {
let disc_url = self.api_url(&format!("/projects/{project}/merge_requests/{pr_number}/discussions"));
let payload = serde_json::json!({
"body": comment.body,
"position": {
"position_type": "text",
"new_path": comment.path,
"new_line": comment.line,
}
});
let _ = self
.http
.post(&disc_url)
.header("PRIVATE-TOKEN", self.token.expose_secret())
.json(&payload)
.send()
.await;
}
Ok(())
}
async fn find_existing_issue(
&self,
owner: &str,
repo: &str,
fingerprint: &str,
) -> Result<Option<TrackerIssue>, CoreError> {
let project = Self::project_path(owner, repo);
let url = self.api_url(&format!("/projects/{project}/issues?search={fingerprint}"));
let resp = self
.http
.get(&url)
.header("PRIVATE-TOKEN", self.token.expose_secret())
.send()
.await
.map_err(|e| CoreError::IssueTracker(format!("GitLab search failed: {e}")))?;
let issues: Vec<serde_json::Value> = resp.json().await.unwrap_or_default();
if let Some(issue) = issues.first() {
Ok(Some(TrackerIssue::new(
String::new(),
TrackerType::GitLab,
issue["iid"].to_string(),
issue["web_url"].as_str().unwrap_or("").to_string(),
issue["title"].as_str().unwrap_or("").to_string(),
)))
} else {
Ok(None)
}
}
}

View File

@@ -0,0 +1,231 @@
use compliance_core::error::CoreError;
use compliance_core::models::{TrackerIssue, TrackerType};
use compliance_core::traits::issue_tracker::{IssueTracker, ReviewComment};
use secrecy::{ExposeSecret, SecretString};
/// Issue tracker backed by the Jira REST API (v3).
pub struct JiraTracker {
    base_url: String,        // Jira site root, normalized without a trailing slash
    email: String,           // account email, used for HTTP Basic auth
    api_token: SecretString, // API token paired with `email` for Basic auth
    project_key: String,     // project that new issues are created in
    http: reqwest::Client,
}
impl JiraTracker {
    /// Creates a tracker for the Jira site at `base_url`.
    /// A trailing slash on `base_url` is stripped so URL joining is uniform.
    pub fn new(base_url: String, email: String, api_token: SecretString, project_key: String) -> Self {
        let base_url = base_url.trim_end_matches('/').to_string();
        Self {
            base_url,
            email,
            api_token,
            project_key,
            http: reqwest::Client::new(),
        }
    }

    /// Builds the `Authorization: Basic …` header value from `email:token`.
    fn auth_header(&self) -> String {
        use base64::Engine;
        let raw = format!("{}:{}", self.email, self.api_token.expose_secret());
        let encoded = base64::engine::general_purpose::STANDARD.encode(raw);
        format!("Basic {encoded}")
    }
}
impl IssueTracker for JiraTracker {
    fn name(&self) -> &str {
        "jira"
    }

    /// Creates a Bug issue in `self.project_key` and returns its tracker handle.
    ///
    /// Jira ignores `owner`/`repo`; the project key supplied at construction
    /// time determines where the issue lands. The description is wrapped in
    /// the Atlassian Document Format (ADF) the v3 API requires.
    async fn create_issue(
        &self,
        _owner: &str,
        _repo: &str,
        title: &str,
        body: &str,
        labels: &[String],
    ) -> Result<TrackerIssue, CoreError> {
        let url = format!("{}/rest/api/3/issue", self.base_url);
        let mut payload = serde_json::json!({
            "fields": {
                "project": { "key": self.project_key },
                "summary": title,
                "description": {
                    "type": "doc",
                    "version": 1,
                    "content": [{
                        "type": "paragraph",
                        "content": [{
                            "type": "text",
                            "text": body,
                        }]
                    }]
                },
                "issuetype": { "name": "Bug" },
            }
        });
        if !labels.is_empty() {
            payload["fields"]["labels"] = serde_json::Value::Array(
                labels.iter().map(|l| serde_json::Value::String(l.clone())).collect(),
            );
        }
        let resp = self
            .http
            .post(&url)
            .header("Authorization", self.auth_header())
            .header("Content-Type", "application/json")
            .json(&payload)
            .send()
            .await
            .map_err(|e| CoreError::IssueTracker(format!("Jira create issue failed: {e}")))?;
        if !resp.status().is_success() {
            let status = resp.status();
            let body = resp.text().await.unwrap_or_default();
            return Err(CoreError::IssueTracker(format!("Jira returned {status}: {body}")));
        }
        let issue: serde_json::Value = resp.json().await
            .map_err(|e| CoreError::IssueTracker(format!("Failed to parse Jira response: {e}")))?;
        let key = issue["key"].as_str().unwrap_or("").to_string();
        let url = format!("{}/browse/{}", self.base_url, key);
        Ok(TrackerIssue::new(
            String::new(),
            TrackerType::Jira,
            key,
            url,
            title.to_string(),
        ))
    }

    /// Transitions a Jira issue toward the workflow state implied by `status`.
    ///
    /// Jira has no direct "set state" call: we list the issue's available
    /// transitions and fire the one whose name matches the target
    /// ("Done" / "In Progress" / "To Do"). When the workflow offers no
    /// matching transition this is a logged no-op.
    async fn update_issue_status(
        &self,
        _owner: &str,
        _repo: &str,
        external_id: &str,
        status: &str,
    ) -> Result<(), CoreError> {
        // Get available transitions for this issue's workflow.
        let url = format!("{}/rest/api/3/issue/{external_id}/transitions", self.base_url);
        let resp = self
            .http
            .get(&url)
            .header("Authorization", self.auth_header())
            .send()
            .await
            .map_err(|e| CoreError::IssueTracker(format!("Jira get transitions failed: {e}")))?;
        let body: serde_json::Value = resp.json().await.unwrap_or_default();
        let target = match status {
            "closed" | "resolved" => "Done",
            "in_progress" => "In Progress",
            _ => "To Do",
        };
        // Find the transition whose name matches the target (case-insensitive)
        // and take its id.
        let transition_id = body["transitions"]
            .as_array()
            .and_then(|ts| {
                ts.iter().find(|t| {
                    t["name"]
                        .as_str()
                        .map(|n| n.eq_ignore_ascii_case(target))
                        .unwrap_or(false)
                })
            })
            .and_then(|t| t["id"].as_str())
            .map(str::to_string);
        let Some(transition_id) = transition_id else {
            // Previously this fell through silently; make the skip visible.
            tracing::warn!("Jira: no transition matching '{target}' for issue {external_id}");
            return Ok(());
        };
        let resp = self
            .http
            .post(&format!("{}/rest/api/3/issue/{external_id}/transitions", self.base_url))
            .header("Authorization", self.auth_header())
            .json(&serde_json::json!({ "transition": { "id": transition_id } }))
            .send()
            .await
            .map_err(|e| CoreError::IssueTracker(format!("Jira transition failed: {e}")))?;
        // Surface HTTP-level failures instead of silently reporting success.
        if !resp.status().is_success() {
            return Err(CoreError::IssueTracker(format!(
                "Jira transition returned {}",
                resp.status()
            )));
        }
        Ok(())
    }

    /// Adds `body` as a comment on the issue, wrapped in ADF.
    async fn add_comment(
        &self,
        _owner: &str,
        _repo: &str,
        external_id: &str,
        body: &str,
    ) -> Result<(), CoreError> {
        let url = format!("{}/rest/api/3/issue/{external_id}/comment", self.base_url);
        let resp = self
            .http
            .post(&url)
            .header("Authorization", self.auth_header())
            .header("Content-Type", "application/json")
            .json(&serde_json::json!({
                "body": {
                    "type": "doc",
                    "version": 1,
                    "content": [{
                        "type": "paragraph",
                        "content": [{
                            "type": "text",
                            "text": body,
                        }]
                    }]
                }
            }))
            .send()
            .await
            .map_err(|e| CoreError::IssueTracker(format!("Jira add comment failed: {e}")))?;
        // A 4xx/5xx here previously went unnoticed; treat it as an error.
        if !resp.status().is_success() {
            return Err(CoreError::IssueTracker(format!(
                "Jira add comment returned {}",
                resp.status()
            )));
        }
        Ok(())
    }

    /// No-op: Jira has no native pull-request review concept.
    async fn create_pr_review(
        &self,
        _owner: &str,
        _repo: &str,
        _pr_number: u64,
        _body: &str,
        _comments: Vec<ReviewComment>,
    ) -> Result<(), CoreError> {
        // Jira doesn't have native PR reviews - this is a no-op
        tracing::info!("Jira doesn't support PR reviews natively, skipping");
        Ok(())
    }

    /// Searches the project for an issue whose text mentions `fingerprint`,
    /// used for de-duplicating findings. Returns the first hit, or `None`.
    async fn find_existing_issue(
        &self,
        _owner: &str,
        _repo: &str,
        fingerprint: &str,
    ) -> Result<Option<TrackerIssue>, CoreError> {
        // Escape backslashes and double quotes so an arbitrary fingerprint
        // cannot break out of the quoted JQL string literal.
        let escaped = fingerprint.replace('\\', "\\\\").replace('"', "\\\"");
        let jql = format!(
            "project = {} AND text ~ \"{}\"",
            self.project_key, escaped
        );
        let url = format!("{}/rest/api/3/search", self.base_url);
        let resp = self
            .http
            .get(&url)
            .header("Authorization", self.auth_header())
            .query(&[("jql", &jql), ("maxResults", &"1".to_string())])
            .send()
            .await
            .map_err(|e| CoreError::IssueTracker(format!("Jira search failed: {e}")))?;
        if !resp.status().is_success() {
            return Err(CoreError::IssueTracker(format!(
                "Jira search returned {}",
                resp.status()
            )));
        }
        let body: serde_json::Value = resp.json().await.unwrap_or_default();
        if let Some(issue) = body["issues"].as_array().and_then(|arr| arr.first()) {
            let key = issue["key"].as_str().unwrap_or("").to_string();
            let url = format!("{}/browse/{}", self.base_url, key);
            let title = issue["fields"]["summary"].as_str().unwrap_or("").to_string();
            Ok(Some(TrackerIssue::new(
                String::new(),
                TrackerType::Jira,
                key,
                url,
                title,
            )))
        } else {
            Ok(None)
        }
    }
}

View File

@@ -0,0 +1,3 @@
pub mod github;
pub mod gitlab;
pub mod jira;

View File

@@ -0,0 +1,130 @@
use std::sync::Arc;
use axum::body::Bytes;
use axum::extract::Extension;
use axum::http::{HeaderMap, StatusCode};
use hmac::{Hmac, Mac};
use secrecy::ExposeSecret;
use sha2::Sha256;
use compliance_core::models::ScanTrigger;
use crate::agent::ComplianceAgent;
type HmacSha256 = Hmac<Sha256>;
/// Axum handler for GitHub webhook deliveries.
///
/// When a webhook secret is configured, the `X-Hub-Signature-256` HMAC is
/// verified before the body is touched; with no secret configured the payload
/// is accepted as-is. `push` and `pull_request` events are dispatched; every
/// other event type is acknowledged and ignored.
pub async fn handle_github_webhook(
    Extension(agent): Extension<Arc<ComplianceAgent>>,
    headers: HeaderMap,
    body: Bytes,
) -> StatusCode {
    // Verify HMAC signature (skipped entirely if no secret is configured).
    if let Some(secret) = &agent.config.github_webhook_secret {
        let signature = headers
            .get("x-hub-signature-256")
            .and_then(|v| v.to_str().ok())
            .unwrap_or("");
        if !verify_signature(secret.expose_secret(), &body, signature) {
            tracing::warn!("GitHub webhook: invalid signature");
            return StatusCode::UNAUTHORIZED;
        }
    }
    let payload = match serde_json::from_slice::<serde_json::Value>(&body) {
        Ok(v) => v,
        Err(e) => {
            tracing::warn!("GitHub webhook: invalid JSON: {e}");
            return StatusCode::BAD_REQUEST;
        }
    };
    let event = headers
        .get("x-github-event")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("");
    match event {
        "push" => handle_push(agent, &payload).await,
        "pull_request" => handle_pull_request(agent, &payload).await,
        _ => {
            tracing::debug!("GitHub webhook: ignoring event '{event}'");
            StatusCode::OK
        }
    }
}
/// Reacts to a GitHub `push` event: if the pushed repository is tracked in
/// the database, a scan is kicked off on a background task; otherwise the
/// event is acknowledged and dropped.
async fn handle_push(agent: Arc<ComplianceAgent>, payload: &serde_json::Value) -> StatusCode {
    let repository = &payload["repository"];
    let repo_url = repository["clone_url"]
        .as_str()
        .or_else(|| repository["html_url"].as_str())
        .unwrap_or_default();
    if repo_url.is_empty() {
        return StatusCode::BAD_REQUEST;
    }
    // Find matching tracked repository (DB errors are treated as "not found").
    let lookup = agent
        .db
        .repositories()
        .find_one(mongodb::bson::doc! { "git_url": repo_url })
        .await;
    match lookup.ok().flatten() {
        Some(repo) => {
            let repo_id = repo.id.map(|id| id.to_hex()).unwrap_or_default();
            let agent_clone = (*agent).clone();
            // Spawn so the webhook responds immediately; the scan runs detached.
            tokio::spawn(async move {
                tracing::info!("GitHub push webhook: triggering scan for {repo_id}");
                if let Err(e) = agent_clone.run_scan(&repo_id, ScanTrigger::Webhook).await {
                    tracing::error!("Webhook-triggered scan failed: {e}");
                }
            });
        }
        None => tracing::debug!("GitHub push webhook: no tracked repo for {repo_url}"),
    }
    StatusCode::OK
}
/// Reacts to a GitHub `pull_request` event. Only `opened` and `synchronize`
/// actions are of interest; currently the event is just logged.
async fn handle_pull_request(
    _agent: Arc<ComplianceAgent>,
    payload: &serde_json::Value,
) -> StatusCode {
    let action = payload["action"].as_str().unwrap_or("");
    if !matches!(action, "opened" | "synchronize") {
        return StatusCode::OK;
    }
    let pr_number = payload["pull_request"]["number"].as_u64().unwrap_or(0);
    let repo_url = payload["repository"]["clone_url"].as_str().unwrap_or("");
    if pr_number == 0 || repo_url.is_empty() {
        return StatusCode::BAD_REQUEST;
    }
    tracing::info!("GitHub PR webhook: PR #{pr_number} {action} on {repo_url}");
    // PR review scan would be triggered here - runs incremental SAST on diff
    // and posts review comments via the GitHub tracker
    StatusCode::OK
}
/// Checks a GitHub `X-Hub-Signature-256` value (`sha256=<hex>`) against the
/// HMAC-SHA256 of `body` keyed with `secret`. Returns `false` on any malformed
/// input. `Mac::verify_slice` performs the comparison in constant time.
fn verify_signature(secret: &str, body: &[u8], signature: &str) -> bool {
    let hex_sig = signature.strip_prefix("sha256=").unwrap_or(signature);
    let Ok(expected) = hex::decode(hex_sig) else {
        return false;
    };
    let Ok(mut mac) = HmacSha256::new_from_slice(secret.as_bytes()) else {
        return false;
    };
    mac.update(body);
    mac.verify_slice(&expected).is_ok()
}

View File

@@ -0,0 +1,95 @@
use std::sync::Arc;
use axum::body::Bytes;
use axum::extract::Extension;
use axum::http::{HeaderMap, StatusCode};
use secrecy::ExposeSecret;
use compliance_core::models::ScanTrigger;
use crate::agent::ComplianceAgent;
/// Axum handler for GitLab webhook deliveries.
///
/// When a webhook secret is configured, the request is authenticated via the
/// `X-Gitlab-Token` header; with no secret configured the payload is accepted
/// as-is. `push` and `merge_request` events are dispatched; every other event
/// type is acknowledged and ignored.
pub async fn handle_gitlab_webhook(
    Extension(agent): Extension<Arc<ComplianceAgent>>,
    headers: HeaderMap,
    body: Bytes,
) -> StatusCode {
    // Verify GitLab token. Compare in constant time so response timing cannot
    // be used to recover the secret byte-by-byte (a plain `!=` on strings
    // short-circuits at the first mismatching byte).
    if let Some(secret) = &agent.config.gitlab_webhook_secret {
        let token = headers
            .get("x-gitlab-token")
            .and_then(|v| v.to_str().ok())
            .unwrap_or("");
        if !constant_time_eq(token.as_bytes(), secret.expose_secret().as_bytes()) {
            tracing::warn!("GitLab webhook: invalid token");
            return StatusCode::UNAUTHORIZED;
        }
    }
    let payload: serde_json::Value = match serde_json::from_slice(&body) {
        Ok(v) => v,
        Err(e) => {
            tracing::warn!("GitLab webhook: invalid JSON: {e}");
            return StatusCode::BAD_REQUEST;
        }
    };
    let event_type = payload["object_kind"].as_str().unwrap_or("");
    match event_type {
        "push" => handle_push(agent, &payload).await,
        "merge_request" => handle_merge_request(agent, &payload).await,
        _ => {
            tracing::debug!("GitLab webhook: ignoring event '{event_type}'");
            StatusCode::OK
        }
    }
}

/// Compares two byte slices without short-circuiting on the first mismatch.
///
/// The length check leaks only the secret's length, which is acceptable for
/// webhook tokens; the content comparison itself is constant time.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    a.iter().zip(b).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
}
/// Reacts to a GitLab `push` event: if the pushed project is tracked in the
/// database, a scan is kicked off on a background task.
async fn handle_push(agent: Arc<ComplianceAgent>, payload: &serde_json::Value) -> StatusCode {
    let project = &payload["project"];
    let repo_url = project["git_http_url"]
        .as_str()
        .or_else(|| project["web_url"].as_str())
        .unwrap_or_default();
    if repo_url.is_empty() {
        return StatusCode::BAD_REQUEST;
    }
    // DB errors are treated the same as "not tracked".
    let lookup = agent
        .db
        .repositories()
        .find_one(mongodb::bson::doc! { "git_url": repo_url })
        .await;
    if let Some(repo) = lookup.ok().flatten() {
        let repo_id = repo.id.map(|id| id.to_hex()).unwrap_or_default();
        let agent_clone = (*agent).clone();
        // Spawn so the webhook responds immediately; the scan runs detached.
        tokio::spawn(async move {
            tracing::info!("GitLab push webhook: triggering scan for {repo_id}");
            if let Err(e) = agent_clone.run_scan(&repo_id, ScanTrigger::Webhook).await {
                tracing::error!("Webhook-triggered scan failed: {e}");
            }
        });
    }
    StatusCode::OK
}
/// Reacts to a GitLab `merge_request` event. Only `open` and `update` actions
/// are of interest; currently the event is just logged.
async fn handle_merge_request(
    _agent: Arc<ComplianceAgent>,
    payload: &serde_json::Value,
) -> StatusCode {
    let attrs = &payload["object_attributes"];
    let action = attrs["action"].as_str().unwrap_or("");
    if matches!(action, "open" | "update") {
        let mr_iid = attrs["iid"].as_u64().unwrap_or(0);
        tracing::info!("GitLab MR webhook: MR !{mr_iid} {action}");
    }
    StatusCode::OK
}

View File

@@ -0,0 +1,5 @@
pub mod github;
pub mod gitlab;
pub mod server;
pub use server::start_webhook_server;

View File

@@ -0,0 +1,27 @@
use std::sync::Arc;
use axum::routing::post;
use axum::{Extension, Router};
use crate::agent::ComplianceAgent;
use crate::error::AgentError;
use crate::webhooks::{github, gitlab};
/// Binds the webhook HTTP server on 0.0.0.0:3002 and serves until it fails.
///
/// Routes:
/// - `POST /webhook/github` — GitHub push / pull_request events
/// - `POST /webhook/gitlab` — GitLab push / merge_request events
///
/// The agent is cloned into an `Arc` and shared with every handler via an
/// axum `Extension` layer.
pub async fn start_webhook_server(agent: &ComplianceAgent) -> Result<(), AgentError> {
    let addr = "0.0.0.0:3002";
    let listener = tokio::net::TcpListener::bind(addr)
        .await
        .map_err(|e| AgentError::Other(format!("Failed to bind webhook server: {e}")))?;
    let shared_agent = Arc::new(agent.clone());
    let router = Router::new()
        .route("/webhook/github", post(github::handle_github_webhook))
        .route("/webhook/gitlab", post(gitlab::handle_gitlab_webhook))
        .layer(Extension(shared_agent));
    tracing::info!("Webhook server listening on {addr}");
    axum::serve(listener, router)
        .await
        .map_err(|e| AgentError::Other(format!("Webhook server error: {e}")))?;
    Ok(())
}