All checks were successful
CI / Format (push) Successful in 4s
CI / Clippy (push) Successful in 4m19s
CI / Security Audit (push) Successful in 1m44s
CI / Detect Changes (push) Successful in 5s
CI / Tests (push) Successful in 5m15s
CI / Deploy Agent (push) Successful in 2s
CI / Deploy Dashboard (push) Successful in 2s
CI / Deploy Docs (push) Has been skipped
CI / Deploy MCP (push) Successful in 2s
252 lines
8.6 KiB
Rust
252 lines
8.6 KiB
Rust
use compliance_core::error::CoreError;
|
|
use compliance_core::traits::dast_agent::{
|
|
DastAgent, DastContext, DiscoveredEndpoint, EndpointParameter,
|
|
};
|
|
use compliance_core::traits::pentest_tool::{PentestTool, PentestToolContext, PentestToolResult};
|
|
use serde_json::json;
|
|
|
|
use crate::agents::xss::XssAgent;
|
|
|
|
/// PentestTool wrapper around the existing XssAgent.
|
|
pub struct XssTool {
|
|
_http: reqwest::Client,
|
|
agent: XssAgent,
|
|
}
|
|
|
|
impl XssTool {
|
|
pub fn new(http: reqwest::Client) -> Self {
|
|
let agent = XssAgent::new(http.clone());
|
|
Self { _http: http, agent }
|
|
}
|
|
|
|
fn parse_endpoints(input: &serde_json::Value) -> Vec<DiscoveredEndpoint> {
|
|
let mut endpoints = Vec::new();
|
|
if let Some(arr) = input.get("endpoints").and_then(|v| v.as_array()) {
|
|
for ep in arr {
|
|
let url = ep
|
|
.get("url")
|
|
.and_then(|v| v.as_str())
|
|
.unwrap_or_default()
|
|
.to_string();
|
|
let method = ep
|
|
.get("method")
|
|
.and_then(|v| v.as_str())
|
|
.unwrap_or("GET")
|
|
.to_string();
|
|
let mut parameters = Vec::new();
|
|
if let Some(params) = ep.get("parameters").and_then(|v| v.as_array()) {
|
|
for p in params {
|
|
parameters.push(EndpointParameter {
|
|
name: p
|
|
.get("name")
|
|
.and_then(|v| v.as_str())
|
|
.unwrap_or_default()
|
|
.to_string(),
|
|
location: p
|
|
.get("location")
|
|
.and_then(|v| v.as_str())
|
|
.unwrap_or("query")
|
|
.to_string(),
|
|
param_type: p
|
|
.get("param_type")
|
|
.and_then(|v| v.as_str())
|
|
.map(String::from),
|
|
example_value: p
|
|
.get("example_value")
|
|
.and_then(|v| v.as_str())
|
|
.map(String::from),
|
|
});
|
|
}
|
|
}
|
|
endpoints.push(DiscoveredEndpoint {
|
|
url,
|
|
method,
|
|
parameters,
|
|
content_type: ep
|
|
.get("content_type")
|
|
.and_then(|v| v.as_str())
|
|
.map(String::from),
|
|
requires_auth: ep
|
|
.get("requires_auth")
|
|
.and_then(|v| v.as_bool())
|
|
.unwrap_or(false),
|
|
});
|
|
}
|
|
}
|
|
endpoints
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use serde_json::json;
|
|
|
|
#[test]
|
|
fn parse_endpoints_basic() {
|
|
let input = json!({
|
|
"endpoints": [
|
|
{
|
|
"url": "https://example.com/search",
|
|
"method": "GET",
|
|
"parameters": [
|
|
{ "name": "q", "location": "query" }
|
|
]
|
|
}
|
|
]
|
|
});
|
|
let endpoints = XssTool::parse_endpoints(&input);
|
|
assert_eq!(endpoints.len(), 1);
|
|
assert_eq!(endpoints[0].url, "https://example.com/search");
|
|
assert_eq!(endpoints[0].method, "GET");
|
|
assert_eq!(endpoints[0].parameters.len(), 1);
|
|
assert_eq!(endpoints[0].parameters[0].name, "q");
|
|
assert_eq!(endpoints[0].parameters[0].location, "query");
|
|
}
|
|
|
|
#[test]
|
|
fn parse_endpoints_empty() {
|
|
let input = json!({ "endpoints": [] });
|
|
assert!(XssTool::parse_endpoints(&input).is_empty());
|
|
}
|
|
|
|
#[test]
|
|
fn parse_endpoints_missing_key() {
|
|
let input = json!({});
|
|
assert!(XssTool::parse_endpoints(&input).is_empty());
|
|
}
|
|
|
|
#[test]
|
|
fn parse_endpoints_defaults() {
|
|
let input = json!({
|
|
"endpoints": [
|
|
{ "url": "https://example.com/api", "parameters": [] }
|
|
]
|
|
});
|
|
let endpoints = XssTool::parse_endpoints(&input);
|
|
assert_eq!(endpoints[0].method, "GET"); // default
|
|
assert!(!endpoints[0].requires_auth); // default false
|
|
}
|
|
|
|
#[test]
|
|
fn parse_endpoints_full_params() {
|
|
let input = json!({
|
|
"endpoints": [{
|
|
"url": "https://example.com",
|
|
"method": "POST",
|
|
"content_type": "application/json",
|
|
"requires_auth": true,
|
|
"parameters": [{
|
|
"name": "body",
|
|
"location": "body",
|
|
"param_type": "string",
|
|
"example_value": "test"
|
|
}]
|
|
}]
|
|
});
|
|
let endpoints = XssTool::parse_endpoints(&input);
|
|
assert_eq!(endpoints[0].method, "POST");
|
|
assert_eq!(
|
|
endpoints[0].content_type.as_deref(),
|
|
Some("application/json")
|
|
);
|
|
assert!(endpoints[0].requires_auth);
|
|
assert_eq!(
|
|
endpoints[0].parameters[0].param_type.as_deref(),
|
|
Some("string")
|
|
);
|
|
assert_eq!(
|
|
endpoints[0].parameters[0].example_value.as_deref(),
|
|
Some("test")
|
|
);
|
|
}
|
|
}
|
|
|
|
impl PentestTool for XssTool {
|
|
fn name(&self) -> &str {
|
|
"xss_scanner"
|
|
}
|
|
|
|
fn description(&self) -> &str {
|
|
"Tests endpoints for Cross-Site Scripting (XSS) vulnerabilities including reflected, \
|
|
stored, and DOM-based XSS. Provide endpoints with parameters to test."
|
|
}
|
|
|
|
fn input_schema(&self) -> serde_json::Value {
|
|
json!({
|
|
"type": "object",
|
|
"properties": {
|
|
"endpoints": {
|
|
"type": "array",
|
|
"description": "Endpoints to test for XSS",
|
|
"items": {
|
|
"type": "object",
|
|
"properties": {
|
|
"url": { "type": "string" },
|
|
"method": { "type": "string", "enum": ["GET", "POST", "PUT", "PATCH", "DELETE"] },
|
|
"parameters": {
|
|
"type": "array",
|
|
"items": {
|
|
"type": "object",
|
|
"properties": {
|
|
"name": { "type": "string" },
|
|
"location": { "type": "string", "enum": ["query", "body", "header", "path", "cookie"] },
|
|
"param_type": { "type": "string" },
|
|
"example_value": { "type": "string" }
|
|
},
|
|
"required": ["name"]
|
|
}
|
|
}
|
|
},
|
|
"required": ["url", "method", "parameters"]
|
|
}
|
|
},
|
|
"custom_payloads": {
|
|
"type": "array",
|
|
"description": "Optional additional XSS payloads to test",
|
|
"items": { "type": "string" }
|
|
}
|
|
},
|
|
"required": ["endpoints"]
|
|
})
|
|
}
|
|
|
|
fn execute<'a>(
|
|
&'a self,
|
|
input: serde_json::Value,
|
|
context: &'a PentestToolContext,
|
|
) -> std::pin::Pin<
|
|
Box<dyn std::future::Future<Output = Result<PentestToolResult, CoreError>> + Send + 'a>,
|
|
> {
|
|
Box::pin(async move {
|
|
let endpoints = Self::parse_endpoints(&input);
|
|
if endpoints.is_empty() {
|
|
return Ok(PentestToolResult {
|
|
summary: "No endpoints provided to test.".to_string(),
|
|
findings: Vec::new(),
|
|
data: json!({}),
|
|
});
|
|
}
|
|
|
|
let dast_context = DastContext {
|
|
endpoints,
|
|
technologies: Vec::new(),
|
|
sast_hints: Vec::new(),
|
|
};
|
|
|
|
let findings = self.agent.run(&context.target, &dast_context).await?;
|
|
let count = findings.len();
|
|
|
|
Ok(PentestToolResult {
|
|
summary: if count > 0 {
|
|
format!("Found {count} XSS vulnerabilities.")
|
|
} else {
|
|
"No XSS vulnerabilities detected.".to_string()
|
|
},
|
|
findings,
|
|
data: json!({ "endpoints_tested": dast_context.endpoints.len() }),
|
|
})
|
|
})
|
|
}
|
|
}
|