feat: AI-driven automated penetration testing (#12)
Some checks failed
CI / Clippy (push) Failing after 1m51s
CI / Security Audit (push) Successful in 2m1s
CI / Tests (push) Has been skipped
CI / Detect Changes (push) Has been skipped
CI / Deploy Agent (push) Has been skipped
CI / Deploy Dashboard (push) Has been skipped
CI / Deploy Docs (push) Has been skipped
CI / Format (push) Failing after 42s
CI / Deploy MCP (push) Has been skipped
Some checks failed
CI / Clippy (push) Failing after 1m51s
CI / Security Audit (push) Successful in 2m1s
CI / Tests (push) Has been skipped
CI / Detect Changes (push) Has been skipped
CI / Deploy Agent (push) Has been skipped
CI / Deploy Dashboard (push) Has been skipped
CI / Deploy Docs (push) Has been skipped
CI / Format (push) Failing after 42s
CI / Deploy MCP (push) Has been skipped
This commit was merged in pull request #12.
This commit is contained in:
130
compliance-dast/src/tools/auth_bypass.rs
Normal file
130
compliance-dast/src/tools/auth_bypass.rs
Normal file
@@ -0,0 +1,130 @@
|
||||
use compliance_core::error::CoreError;
|
||||
use compliance_core::traits::dast_agent::{DastAgent, DastContext, DiscoveredEndpoint, EndpointParameter};
|
||||
use compliance_core::traits::pentest_tool::{PentestTool, PentestToolContext, PentestToolResult};
|
||||
use serde_json::json;
|
||||
|
||||
use crate::agents::auth_bypass::AuthBypassAgent;
|
||||
|
||||
/// PentestTool wrapper around the existing AuthBypassAgent.
|
||||
pub struct AuthBypassTool {
|
||||
http: reqwest::Client,
|
||||
agent: AuthBypassAgent,
|
||||
}
|
||||
|
||||
impl AuthBypassTool {
|
||||
pub fn new(http: reqwest::Client) -> Self {
|
||||
let agent = AuthBypassAgent::new(http.clone());
|
||||
Self { http, agent }
|
||||
}
|
||||
|
||||
fn parse_endpoints(input: &serde_json::Value) -> Vec<DiscoveredEndpoint> {
|
||||
let mut endpoints = Vec::new();
|
||||
if let Some(arr) = input.get("endpoints").and_then(|v| v.as_array()) {
|
||||
for ep in arr {
|
||||
let url = ep.get("url").and_then(|v| v.as_str()).unwrap_or_default().to_string();
|
||||
let method = ep.get("method").and_then(|v| v.as_str()).unwrap_or("GET").to_string();
|
||||
let mut parameters = Vec::new();
|
||||
if let Some(params) = ep.get("parameters").and_then(|v| v.as_array()) {
|
||||
for p in params {
|
||||
parameters.push(EndpointParameter {
|
||||
name: p.get("name").and_then(|v| v.as_str()).unwrap_or_default().to_string(),
|
||||
location: p.get("location").and_then(|v| v.as_str()).unwrap_or("query").to_string(),
|
||||
param_type: p.get("param_type").and_then(|v| v.as_str()).map(String::from),
|
||||
example_value: p.get("example_value").and_then(|v| v.as_str()).map(String::from),
|
||||
});
|
||||
}
|
||||
}
|
||||
endpoints.push(DiscoveredEndpoint {
|
||||
url,
|
||||
method,
|
||||
parameters,
|
||||
content_type: ep.get("content_type").and_then(|v| v.as_str()).map(String::from),
|
||||
requires_auth: ep.get("requires_auth").and_then(|v| v.as_bool()).unwrap_or(false),
|
||||
});
|
||||
}
|
||||
}
|
||||
endpoints
|
||||
}
|
||||
}
|
||||
|
||||
impl PentestTool for AuthBypassTool {
|
||||
fn name(&self) -> &str {
|
||||
"auth_bypass_scanner"
|
||||
}
|
||||
|
||||
fn description(&self) -> &str {
|
||||
"Tests endpoints for authentication bypass vulnerabilities. Tries accessing protected \
|
||||
endpoints without credentials, with manipulated tokens, and with common default credentials."
|
||||
}
|
||||
|
||||
fn input_schema(&self) -> serde_json::Value {
|
||||
json!({
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"endpoints": {
|
||||
"type": "array",
|
||||
"description": "Endpoints to test for authentication bypass",
|
||||
"items": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"url": { "type": "string" },
|
||||
"method": { "type": "string", "enum": ["GET", "POST", "PUT", "PATCH", "DELETE"] },
|
||||
"parameters": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": { "type": "string" },
|
||||
"location": { "type": "string" },
|
||||
"param_type": { "type": "string" },
|
||||
"example_value": { "type": "string" }
|
||||
},
|
||||
"required": ["name"]
|
||||
}
|
||||
},
|
||||
"requires_auth": { "type": "boolean", "description": "Whether this endpoint requires authentication" }
|
||||
},
|
||||
"required": ["url", "method"]
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": ["endpoints"]
|
||||
})
|
||||
}
|
||||
|
||||
fn execute<'a>(
|
||||
&'a self,
|
||||
input: serde_json::Value,
|
||||
context: &'a PentestToolContext,
|
||||
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<PentestToolResult, CoreError>> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
let endpoints = Self::parse_endpoints(&input);
|
||||
if endpoints.is_empty() {
|
||||
return Ok(PentestToolResult {
|
||||
summary: "No endpoints provided to test.".to_string(),
|
||||
findings: Vec::new(),
|
||||
data: json!({}),
|
||||
});
|
||||
}
|
||||
|
||||
let dast_context = DastContext {
|
||||
endpoints,
|
||||
technologies: Vec::new(),
|
||||
sast_hints: Vec::new(),
|
||||
};
|
||||
|
||||
let findings = self.agent.run(&context.target, &dast_context).await?;
|
||||
let count = findings.len();
|
||||
|
||||
Ok(PentestToolResult {
|
||||
summary: if count > 0 {
|
||||
format!("Found {count} authentication bypass vulnerabilities.")
|
||||
} else {
|
||||
"No authentication bypass vulnerabilities detected.".to_string()
|
||||
},
|
||||
findings,
|
||||
data: json!({ "endpoints_tested": dast_context.endpoints.len() }),
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user