feat: pentest feature improvements — streaming, pause/resume, encryption, browser tool, reports, docs

- True SSE streaming via broadcast channels (DashMap per session)
- Session pause/resume with watch channels + dashboard buttons
- AES-256-GCM credential encryption at rest (PENTEST_ENCRYPTION_KEY)
- Concurrency limiter (Semaphore, max 5 sessions, 429 on overflow)
- Browser tool: headless Chrome CDP automation (navigate, click, fill, screenshot, evaluate)
- Report code-level correlation: SAST findings, code graph, SBOM linked per DAST finding
- Split html.rs (1919 LOC) into html/ module directory (8 files)
- Wizard: target/repo dropdowns from existing data, SSH key display, close button on all steps
- Auth: auto-register with optional registration URL (Playwright discovery), plus-addressing email, IMAP overrides
- Attack chain: tool input/output in detail panel, running node pulse animation
- Architecture docs with Mermaid diagrams + 8 screenshots

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Sharang Parnerkar
2026-03-17 00:07:50 +01:00
parent 11e1c5f438
commit a912ec9ad9
45 changed files with 5927 additions and 2133 deletions

View File

@@ -2,7 +2,7 @@ use std::sync::Arc;
use std::time::Duration;
use mongodb::bson::doc;
use tokio::sync::broadcast;
use tokio::sync::{broadcast, watch};
use compliance_core::models::dast::DastTarget;
use compliance_core::models::pentest::*;
@@ -22,29 +22,27 @@ pub struct PentestOrchestrator {
pub(crate) llm: Arc<LlmClient>,
pub(crate) db: Database,
pub(crate) event_tx: broadcast::Sender<PentestEvent>,
pub(crate) pause_rx: Option<watch::Receiver<bool>>,
}
impl PentestOrchestrator {
pub fn new(llm: Arc<LlmClient>, db: Database) -> Self {
let (event_tx, _) = broadcast::channel(256);
/// Create a new orchestrator with an externally-provided broadcast sender
/// and an optional pause receiver.
pub fn new(
llm: Arc<LlmClient>,
db: Database,
event_tx: broadcast::Sender<PentestEvent>,
pause_rx: Option<watch::Receiver<bool>>,
) -> Self {
Self {
tool_registry: ToolRegistry::new(),
llm,
db,
event_tx,
pause_rx,
}
}
#[allow(dead_code)]
pub fn subscribe(&self) -> broadcast::Receiver<PentestEvent> {
self.event_tx.subscribe()
}
#[allow(dead_code)]
pub fn event_sender(&self) -> broadcast::Sender<PentestEvent> {
self.event_tx.clone()
}
/// Run a pentest session with timeout and automatic failure marking on errors.
pub async fn run_session_guarded(
&self,
@@ -54,8 +52,18 @@ impl PentestOrchestrator {
) {
let session_id = session.id;
// Use config-specified timeout or default
let timeout_duration = session
.config
.as_ref()
.and_then(|c| c.max_duration_minutes)
.map(|m| Duration::from_secs(m as u64 * 60))
.unwrap_or(SESSION_TIMEOUT);
let timeout_minutes = timeout_duration.as_secs() / 60;
match tokio::time::timeout(
SESSION_TIMEOUT,
timeout_duration,
self.run_session(session, target, initial_message),
)
.await
@@ -72,12 +80,10 @@ impl PentestOrchestrator {
});
}
Err(_) => {
tracing::warn!(?session_id, "Pentest session timed out after 30 minutes");
self.mark_session_failed(session_id, "Session timed out after 30 minutes")
.await;
let _ = self.event_tx.send(PentestEvent::Error {
message: "Session timed out after 30 minutes".to_string(),
});
let msg = format!("Session timed out after {timeout_minutes} minutes");
tracing::warn!(?session_id, "{msg}");
self.mark_session_failed(session_id, &msg).await;
let _ = self.event_tx.send(PentestEvent::Error { message: msg });
}
}
}
@@ -103,6 +109,45 @@ impl PentestOrchestrator {
}
}
/// Check if the session is paused; if so, update DB status and wait until resumed.
async fn wait_if_paused(&self, session: &PentestSession) {
let Some(ref pause_rx) = self.pause_rx else {
return;
};
let mut rx = pause_rx.clone();
if !*rx.borrow() {
return;
}
// We are paused — update DB status
if let Some(sid) = session.id {
let _ = self
.db
.pentest_sessions()
.update_one(doc! { "_id": sid }, doc! { "$set": { "status": "paused" }})
.await;
}
let _ = self.event_tx.send(PentestEvent::Paused);
// Wait until unpaused
while *rx.borrow() {
if rx.changed().await.is_err() {
break;
}
}
// Resumed — update DB status back to running
if let Some(sid) = session.id {
let _ = self
.db
.pentest_sessions()
.update_one(doc! { "_id": sid }, doc! { "$set": { "status": "running" }})
.await;
}
let _ = self.event_tx.send(PentestEvent::Resumed);
}
async fn run_session(
&self,
session: &PentestSession,
@@ -175,6 +220,9 @@ impl PentestOrchestrator {
let mut prev_node_ids: Vec<String> = Vec::new();
for _iteration in 0..max_iterations {
// Check pause state at top of each iteration
self.wait_if_paused(session).await;
let response = self
.llm
.chat_with_tools(messages.clone(), &tool_defs, Some(0.2), Some(8192))
@@ -417,6 +465,21 @@ impl PentestOrchestrator {
.await;
}
// If cleanup_test_user is requested, append a cleanup instruction
if session
.config
.as_ref()
.is_some_and(|c| c.auth.cleanup_test_user)
{
let cleanup_msg = PentestMessage::user(
session_id.clone(),
"Testing is complete. Now please clean up: navigate to the application and delete \
the test user account that was created during this session. Confirm once done."
.to_string(),
);
let _ = self.db.pentest_messages().insert_one(&cleanup_msg).await;
}
let _ = self.event_tx.send(PentestEvent::Complete {
summary: format!(
"Pentest complete. {} findings from {} tool invocations.",