All checks were successful
CI / Format (push) Successful in 4s
CI / Clippy (push) Successful in 4m19s
CI / Security Audit (push) Successful in 1m44s
CI / Tests (push) Successful in 5m15s
CI / Detect Changes (push) Successful in 5s
CI / Deploy Agent (push) Successful in 2s
CI / Deploy Dashboard (push) Successful in 2s
CI / Deploy Docs (push) Has been skipped
CI / Deploy MCP (push) Successful in 2s
232 lines
6.8 KiB
Rust
232 lines
6.8 KiB
Rust
use std::path::Path;
|
|
|
|
use compliance_core::models::{Finding, ScanType, Severity};
|
|
use compliance_core::traits::{ScanOutput, Scanner};
|
|
use compliance_core::CoreError;
|
|
|
|
use crate::pipeline::dedup;
|
|
|
|
/// Static-analysis scanner backed by the external `semgrep` command-line tool.
///
/// The type carries no state; all behavior lives in its [`Scanner`] implementation.
pub struct SemgrepScanner;
|
|
|
|
impl Scanner for SemgrepScanner {
|
|
fn name(&self) -> &str {
|
|
"semgrep"
|
|
}
|
|
|
|
fn scan_type(&self) -> ScanType {
|
|
ScanType::Sast
|
|
}
|
|
|
|
#[tracing::instrument(skip_all)]
|
|
async fn scan(&self, repo_path: &Path, repo_id: &str) -> Result<ScanOutput, CoreError> {
|
|
let output = tokio::process::Command::new("semgrep")
|
|
.args(["--config=auto", "--json", "--quiet"])
|
|
.arg(repo_path)
|
|
.output()
|
|
.await
|
|
.map_err(|e| CoreError::Scanner {
|
|
scanner: "semgrep".to_string(),
|
|
source: Box::new(e),
|
|
})?;
|
|
|
|
if !output.status.success() && output.stdout.is_empty() {
|
|
let stderr = String::from_utf8_lossy(&output.stderr);
|
|
tracing::warn!("Semgrep exited with {}: {stderr}", output.status);
|
|
return Ok(ScanOutput::default());
|
|
}
|
|
|
|
let result: SemgrepOutput = serde_json::from_slice(&output.stdout)?;
|
|
let findings = result
|
|
.results
|
|
.into_iter()
|
|
.map(|r| {
|
|
let severity = match r.extra.severity.as_str() {
|
|
"ERROR" => Severity::High,
|
|
"WARNING" => Severity::Medium,
|
|
"INFO" => Severity::Low,
|
|
_ => Severity::Info,
|
|
};
|
|
|
|
let fingerprint = dedup::compute_fingerprint(&[
|
|
repo_id,
|
|
&r.check_id,
|
|
&r.path,
|
|
&r.start.line.to_string(),
|
|
]);
|
|
|
|
let mut finding = Finding::new(
|
|
repo_id.to_string(),
|
|
fingerprint,
|
|
"semgrep".to_string(),
|
|
ScanType::Sast,
|
|
r.extra.message.clone(),
|
|
r.extra.message,
|
|
severity,
|
|
);
|
|
finding.rule_id = Some(r.check_id);
|
|
finding.file_path = Some(r.path);
|
|
finding.line_number = Some(r.start.line);
|
|
finding.code_snippet = Some(r.extra.lines);
|
|
finding.cwe = r
|
|
.extra
|
|
.metadata
|
|
.and_then(|m| m.get("cwe").and_then(|v| v.as_str()).map(|s| s.to_string()));
|
|
finding
|
|
})
|
|
.collect();
|
|
|
|
Ok(ScanOutput {
|
|
findings,
|
|
sbom_entries: Vec::new(),
|
|
})
|
|
}
|
|
}
|
|
|
|
#[derive(serde::Deserialize)]
|
|
struct SemgrepOutput {
|
|
results: Vec<SemgrepResult>,
|
|
}
|
|
|
|
#[derive(serde::Deserialize)]
|
|
struct SemgrepResult {
|
|
check_id: String,
|
|
path: String,
|
|
start: SemgrepPosition,
|
|
extra: SemgrepExtra,
|
|
}
|
|
|
|
#[derive(serde::Deserialize)]
|
|
struct SemgrepPosition {
|
|
line: u32,
|
|
}
|
|
|
|
#[derive(serde::Deserialize)]
|
|
struct SemgrepExtra {
|
|
message: String,
|
|
severity: String,
|
|
lines: String,
|
|
#[serde(default)]
|
|
metadata: Option<serde_json::Value>,
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes a JSON fixture, panicking if it is not valid semgrep output.
    fn parse(raw: &str) -> SemgrepOutput {
        serde_json::from_str(raw).unwrap()
    }

    /// A fully populated result round-trips every modelled field.
    #[test]
    fn deserialize_semgrep_output() {
        let parsed = parse(
            r#"{
                "results": [
                    {
                        "check_id": "python.lang.security.audit.exec-detected",
                        "path": "src/main.py",
                        "start": {"line": 15},
                        "extra": {
                            "message": "Detected use of exec()",
                            "severity": "ERROR",
                            "lines": "exec(user_input)",
                            "metadata": {"cwe": "CWE-78"}
                        }
                    }
                ]
            }"#,
        );
        assert_eq!(parsed.results.len(), 1);

        let hit = &parsed.results[0];
        assert_eq!(hit.check_id, "python.lang.security.audit.exec-detected");
        assert_eq!(hit.path, "src/main.py");
        assert_eq!(hit.start.line, 15);
        assert_eq!(hit.extra.message, "Detected use of exec()");
        assert_eq!(hit.extra.severity, "ERROR");
        assert_eq!(hit.extra.lines, "exec(user_input)");

        let cwe = hit
            .extra
            .metadata
            .as_ref()
            .unwrap()
            .get("cwe")
            .unwrap()
            .as_str();
        assert_eq!(cwe, Some("CWE-78"));
    }

    /// An empty `results` array yields an empty vector.
    #[test]
    fn deserialize_semgrep_empty_results() {
        let parsed = parse(r#"{"results": []}"#);
        assert!(parsed.results.is_empty());
    }

    /// A result without a `metadata` key deserializes with `metadata == None`.
    #[test]
    fn deserialize_semgrep_no_metadata() {
        let parsed = parse(
            r#"{
                "results": [
                    {
                        "check_id": "rule-1",
                        "path": "app.py",
                        "start": {"line": 1},
                        "extra": {
                            "message": "found something",
                            "severity": "WARNING",
                            "lines": "import os"
                        }
                    }
                ]
            }"#,
        );
        assert!(parsed.results[0].extra.metadata.is_none());
    }

    /// Checks the expected label-to-severity table.
    ///
    /// NOTE: this test uses its own copy of the mapping and compares severity
    /// names as strings; it does not call the scanner itself.
    #[test]
    fn semgrep_severity_mapping() {
        fn classify(level: &str) -> &'static str {
            match level {
                "ERROR" => "High",
                "WARNING" => "Medium",
                "INFO" => "Low",
                _ => "Info",
            }
        }

        let table = [
            ("ERROR", "High"),
            ("WARNING", "Medium"),
            ("INFO", "Low"),
            ("UNKNOWN", "Info"),
        ];
        for (input, expected) in table {
            assert_eq!(classify(input), expected, "Severity for '{input}'");
        }
    }

    /// Several results are all kept, in document order.
    #[test]
    fn deserialize_semgrep_multiple_results() {
        let parsed = parse(
            r#"{
                "results": [
                    {
                        "check_id": "rule-a",
                        "path": "a.py",
                        "start": {"line": 1},
                        "extra": {
                            "message": "msg a",
                            "severity": "ERROR",
                            "lines": "line a"
                        }
                    },
                    {
                        "check_id": "rule-b",
                        "path": "b.py",
                        "start": {"line": 99},
                        "extra": {
                            "message": "msg b",
                            "severity": "INFO",
                            "lines": "line b"
                        }
                    }
                ]
            }"#,
        );
        assert_eq!(parsed.results.len(), 2);
        assert_eq!(parsed.results[1].start.line, 99);
    }
}
|