Files
compliance-scanner-agent/compliance-core/tests/models.rs
Sharang Parnerkar c461faa2fb
All checks were successful
CI / Check (push) Has been skipped
CI / Detect Changes (push) Successful in 7s
CI / Deploy Agent (push) Successful in 2s
CI / Deploy Dashboard (push) Successful in 2s
CI / Deploy Docs (push) Successful in 2s
CI / Deploy MCP (push) Successful in 2s
feat: pentest onboarding — streaming, browser automation, reports, user cleanup (#16)
Complete pentest feature overhaul: SSE streaming, session-persistent browser tool (CDP), AES-256 credential encryption, auto-screenshots in reports, code-level remediation correlation, SAST triage chunking, context window optimization, test user cleanup (Keycloak/Auth0/Okta), wizard dropdowns, attack chain improvements, architecture docs with Mermaid diagrams.

Co-authored-by: Sharang Parnerkar <parnerkarsharang@gmail.com>
Reviewed-on: #16
2026-03-17 20:32:20 +00:00

620 lines
19 KiB
Rust

use compliance_core::models::*;
// ─── Severity ───
/// Every `Severity` variant must render as its lowercase name through `to_string()`.
#[test]
fn severity_display_all_variants() {
    let expectations = [
        (Severity::Info, "info"),
        (Severity::Low, "low"),
        (Severity::Medium, "medium"),
        (Severity::High, "high"),
        (Severity::Critical, "critical"),
    ];
    for (level, rendered) in expectations {
        assert_eq!(level.to_string(), rendered);
    }
}
/// Severity levels must compare in ascending order: each level is strictly
/// less than the one that follows it.
#[test]
fn severity_ordering() {
    let ascending = [
        Severity::Info,
        Severity::Low,
        Severity::Medium,
        Severity::High,
        Severity::Critical,
    ];
    // Check every adjacent pair: Info < Low, Low < Medium, Medium < High, High < Critical.
    for pair in ascending.windows(2) {
        assert!(pair[0] < pair[1]);
    }
}
/// Serializing any `Severity` to JSON and parsing it back must yield the same variant.
#[test]
fn severity_serde_roundtrip() {
    let levels = [
        Severity::Info,
        Severity::Low,
        Severity::Medium,
        Severity::High,
        Severity::Critical,
    ];
    for original in levels {
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: Severity = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }
}
/// The lowercase JSON string `"critical"` must deserialize to `Severity::Critical`.
#[test]
fn severity_deserialize_lowercase() {
    let parsed = serde_json::from_str::<Severity>(r#""critical""#).unwrap();
    assert_eq!(parsed, Severity::Critical);
}
// ─── FindingStatus ───
/// Every `FindingStatus` variant must render as its snake_case name through `to_string()`.
#[test]
fn finding_status_display_all_variants() {
    let expectations = [
        (FindingStatus::Open, "open"),
        (FindingStatus::Triaged, "triaged"),
        (FindingStatus::FalsePositive, "false_positive"),
        (FindingStatus::Resolved, "resolved"),
        (FindingStatus::Ignored, "ignored"),
    ];
    for (status, rendered) in expectations {
        assert_eq!(status.to_string(), rendered);
    }
}
/// Serializing any `FindingStatus` to JSON and parsing it back must yield the same variant.
#[test]
fn finding_status_serde_roundtrip() {
    let statuses = [
        FindingStatus::Open,
        FindingStatus::Triaged,
        FindingStatus::FalsePositive,
        FindingStatus::Resolved,
        FindingStatus::Ignored,
    ];
    for original in statuses {
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: FindingStatus = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }
}
// ─── Finding ───
/// `Finding::new` must store its arguments verbatim, start in the `Open`
/// status, and leave every optional field checked here unset.
#[test]
fn finding_new_defaults() {
    let finding = Finding::new(
        "repo1".into(),
        "fp123".into(),
        "semgrep".into(),
        ScanType::Sast,
        "Test title".into(),
        "Test desc".into(),
        Severity::High,
    );

    // Values passed to the constructor.
    assert_eq!(finding.repo_id, "repo1");
    assert_eq!(finding.fingerprint, "fp123");
    assert_eq!(finding.scanner, "semgrep");
    assert_eq!(finding.scan_type, ScanType::Sast);
    assert_eq!(finding.severity, Severity::High);

    // Initial lifecycle state.
    assert_eq!(finding.status, FindingStatus::Open);

    // Optional fields that must not be populated by the constructor.
    assert!(finding.id.is_none());
    assert!(finding.rule_id.is_none());
    assert!(finding.confidence.is_none());
    assert!(finding.file_path.is_none());
    assert!(finding.remediation.is_none());
    assert!(finding.suggested_fix.is_none());
    assert!(finding.triage_action.is_none());
    assert!(finding.developer_feedback.is_none());
}
// ─── ScanType ───
/// Each listed `ScanType` variant must render as its snake_case name through `to_string()`.
#[test]
fn scan_type_display_all_variants() {
    let expectations = [
        (ScanType::Sast, "sast"),
        (ScanType::Sbom, "sbom"),
        (ScanType::Cve, "cve"),
        (ScanType::Gdpr, "gdpr"),
        (ScanType::OAuth, "oauth"),
        (ScanType::Graph, "graph"),
        (ScanType::Dast, "dast"),
        (ScanType::SecretDetection, "secret_detection"),
        (ScanType::Lint, "lint"),
        (ScanType::CodeReview, "code_review"),
    ];
    for (scan_type, rendered) in expectations {
        assert_eq!(scan_type.to_string(), rendered);
    }
}
/// A sample of `ScanType` variants (single-word and multi-word names) must
/// survive a JSON serialize/deserialize cycle unchanged.
#[test]
fn scan_type_serde_roundtrip() {
    let samples = [
        ScanType::Sast,
        ScanType::SecretDetection,
        ScanType::CodeReview,
    ];
    for original in samples {
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ScanType = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }
}
// ─── ScanRun ───
/// `ScanRun::new` must record the repo and trigger it was given and start as a
/// running scan in the `ChangeDetection` phase with nothing completed yet.
#[test]
fn scan_run_new_defaults() {
    let run = ScanRun::new("repo1".into(), ScanTrigger::Manual);

    // Constructor arguments.
    assert_eq!(run.repo_id, "repo1");
    assert_eq!(run.trigger, ScanTrigger::Manual);

    // Initial progress state.
    assert_eq!(run.status, ScanRunStatus::Running);
    assert_eq!(run.current_phase, ScanPhase::ChangeDetection);
    assert!(run.phases_completed.is_empty());
    assert_eq!(run.new_findings_count, 0);

    // Nothing has failed or finished yet.
    assert!(run.error_message.is_none());
    assert!(run.completed_at.is_none());
}
// ─── PentestStatus ───
/// The checked `PentestStatus` variants must render as their lowercase names.
#[test]
fn pentest_status_display() {
    let expectations = [
        (pentest::PentestStatus::Running, "running"),
        (pentest::PentestStatus::Paused, "paused"),
        (pentest::PentestStatus::Completed, "completed"),
        (pentest::PentestStatus::Failed, "failed"),
    ];
    for (status, rendered) in expectations {
        assert_eq!(status.to_string(), rendered);
    }
}
// ─── PentestStrategy ───
/// The checked `PentestStrategy` variants must render as their lowercase names.
#[test]
fn pentest_strategy_display() {
    let expectations = [
        (pentest::PentestStrategy::Quick, "quick"),
        (pentest::PentestStrategy::Comprehensive, "comprehensive"),
        (pentest::PentestStrategy::Targeted, "targeted"),
        (pentest::PentestStrategy::Aggressive, "aggressive"),
        (pentest::PentestStrategy::Stealth, "stealth"),
    ];
    for (strategy, rendered) in expectations {
        assert_eq!(strategy.to_string(), rendered);
    }
}
// ─── PentestSession ───
/// `PentestSession::new` must keep the target and strategy it was given,
/// start in the `Running` status with zeroed counters, and leave the
/// completion time and repo link unset.
#[test]
fn pentest_session_new_defaults() {
    let session = pentest::PentestSession::new("target1".into(), pentest::PentestStrategy::Quick);

    assert_eq!(session.target_id, "target1");
    assert_eq!(session.status, pentest::PentestStatus::Running);
    assert_eq!(session.strategy, pentest::PentestStrategy::Quick);

    // Counters start at zero.
    assert_eq!(session.tool_invocations, 0);
    assert_eq!(session.tool_successes, 0);
    assert_eq!(session.findings_count, 0);

    // Optional fields are unset.
    assert!(session.completed_at.is_none());
    assert!(session.repo_id.is_none());
}
/// A freshly created session, with its counters untouched, is expected to
/// report a success rate of exactly 100.0.
#[test]
fn pentest_session_success_rate_zero_invocations() {
    let session =
        pentest::PentestSession::new("t".into(), pentest::PentestStrategy::Comprehensive);
    let rate = session.success_rate();
    assert_eq!(rate, 100.0);
}
/// With 7 successes out of 10 invocations the success rate must equal 70.0
/// to within `f64::EPSILON`.
#[test]
fn pentest_session_success_rate_calculation() {
    let mut session =
        pentest::PentestSession::new("t".into(), pentest::PentestStrategy::Comprehensive);
    session.tool_invocations = 10;
    session.tool_successes = 7;

    // Absolute distance between the reported rate and the expected percentage.
    let deviation = (session.success_rate() - 70.0).abs();
    assert!(deviation < f64::EPSILON);
}
/// When every invocation succeeded (5 of 5) the success rate must be exactly 100.0.
#[test]
fn pentest_session_success_rate_all_success() {
    let mut session = pentest::PentestSession::new("t".into(), pentest::PentestStrategy::Quick);
    session.tool_invocations = 5;
    session.tool_successes = 5;

    let rate = session.success_rate();
    assert_eq!(rate, 100.0);
}
/// When no invocation succeeded (0 of 3) the success rate must be exactly 0.0.
#[test]
fn pentest_session_success_rate_none_success() {
    let mut session = pentest::PentestSession::new("t".into(), pentest::PentestStrategy::Quick);
    session.tool_invocations = 3;
    session.tool_successes = 0;

    let rate = session.success_rate();
    assert_eq!(rate, 0.0);
}
// ─── PentestMessage factories ───
/// `PentestMessage::user` must produce a message with the `"user"` role that
/// carries the given session id and content and has no node or tool-call data.
#[test]
fn pentest_message_user() {
    let message = pentest::PentestMessage::user("sess1".into(), "hello".into());

    assert_eq!(message.role, "user");
    assert_eq!(message.session_id, "sess1");
    assert_eq!(message.content, "hello");

    // Fields unrelated to a plain user message stay unset.
    assert!(message.attack_node_id.is_none());
    assert!(message.tool_calls.is_none());
}
/// `PentestMessage::assistant` must tag the message with the `"assistant"` role.
#[test]
fn pentest_message_assistant() {
    let message = pentest::PentestMessage::assistant("sess1".into(), "response".into());
    assert_eq!(message.role, "assistant");
}
/// `PentestMessage::tool_result` must tag the message with the `"tool_result"`
/// role and record the attack node id it was given.
#[test]
fn pentest_message_tool_result() {
    let message =
        pentest::PentestMessage::tool_result("sess1".into(), "output".into(), "node1".into());

    assert_eq!(message.role, "tool_result");
    assert_eq!(message.attack_node_id, Some("node1".to_string()));
}
// ─── AttackChainNode ───
/// `AttackChainNode::new` must keep the identifiers and tool name it was
/// given, start in the `Pending` status, and have no parents, findings,
/// risk score, or start time.
#[test]
fn attack_chain_node_new_defaults() {
    let node = pentest::AttackChainNode::new(
        "sess1".into(),
        "node1".into(),
        "recon".into(),
        serde_json::json!({"target": "example.com"}),
        "Starting recon".into(),
    );

    // Constructor arguments.
    assert_eq!(node.session_id, "sess1");
    assert_eq!(node.node_id, "node1");
    assert_eq!(node.tool_name, "recon");

    // Initial state.
    assert_eq!(node.status, pentest::AttackNodeStatus::Pending);
    assert!(node.parent_node_ids.is_empty());
    assert!(node.findings_produced.is_empty());
    assert!(node.risk_score.is_none());
    assert!(node.started_at.is_none());
}
// ─── DastTarget ───
/// `DastTarget::new` must keep the name, URL, and target type it was given
/// and apply the defaults asserted below: crawl depth 5, rate limit 10,
/// destructive tests disabled, and no exclusions, auth, or repo link.
#[test]
fn dast_target_new_defaults() {
    let target = dast::DastTarget::new(
        "My App".into(),
        "https://example.com".into(),
        dast::DastTargetType::WebApp,
    );

    // Constructor arguments.
    assert_eq!(target.name, "My App");
    assert_eq!(target.base_url, "https://example.com");
    assert_eq!(target.target_type, dast::DastTargetType::WebApp);

    // Default scan limits and safety settings.
    assert_eq!(target.max_crawl_depth, 5);
    assert_eq!(target.rate_limit, 10);
    assert!(!target.allow_destructive);

    // Nothing optional is configured.
    assert!(target.excluded_paths.is_empty());
    assert!(target.auth_config.is_none());
    assert!(target.repo_id.is_none());
}
/// The checked `DastTargetType` variants must render as the expected lowercase strings.
#[test]
fn dast_target_type_display() {
    let expectations = [
        (dast::DastTargetType::WebApp, "webapp"),
        (dast::DastTargetType::RestApi, "rest_api"),
        (dast::DastTargetType::GraphQl, "graphql"),
    ];
    for (target_type, rendered) in expectations {
        assert_eq!(target_type.to_string(), rendered);
    }
}
// ─── DastScanRun ───
/// Verifies the initial state of a freshly constructed `DastScanRun`.
///
/// A new run must report the `Running` status, start in the `Reconnaissance`
/// phase, have no completed phases, zeroed counters, and no completion time.
#[test]
fn dast_scan_run_new_defaults() {
    let sr = dast::DastScanRun::new("target1".into());
    assert_eq!(sr.status, dast::DastScanStatus::Running);
    assert_eq!(sr.current_phase, dast::DastScanPhase::Reconnaissance);
    assert!(sr.phases_completed.is_empty());
    assert_eq!(sr.endpoints_discovered, 0);
    assert_eq!(sr.findings_count, 0);
    // Compare the counter directly. The previous `!count > 0` form parses as
    // `(!count) > 0`, a bitwise NOT on an integer, which holds for almost any
    // value and therefore never detected a non-zero default.
    assert_eq!(sr.exploitable_count, 0);
    assert!(sr.completed_at.is_none());
}
/// The checked `DastScanPhase` variants must render as their lowercase names.
#[test]
fn dast_scan_phase_display() {
    let expectations = [
        (dast::DastScanPhase::Reconnaissance, "reconnaissance"),
        (dast::DastScanPhase::Crawling, "crawling"),
        (dast::DastScanPhase::Completed, "completed"),
    ];
    for (phase, rendered) in expectations {
        assert_eq!(phase.to_string(), rendered);
    }
}
// ─── DastVulnType ───
/// Each listed `DastVulnType` variant must render as its snake_case name through `to_string()`.
#[test]
fn dast_vuln_type_display_all_variants() {
    let expectations = [
        (dast::DastVulnType::SqlInjection, "sql_injection"),
        (dast::DastVulnType::Xss, "xss"),
        (dast::DastVulnType::AuthBypass, "auth_bypass"),
        (dast::DastVulnType::Ssrf, "ssrf"),
        (dast::DastVulnType::Idor, "idor"),
        (dast::DastVulnType::Other, "other"),
    ];
    for (vuln_type, rendered) in expectations {
        assert_eq!(vuln_type.to_string(), rendered);
    }
}
// ─── DastFinding ───
/// `DastFinding::new` must keep the vulnerability type and severity it was
/// given, start as non-exploitable with no evidence, and leave the session
/// and SAST links unset.
#[test]
fn dast_finding_new_defaults() {
    let finding = dast::DastFinding::new(
        "run1".into(),
        "target1".into(),
        dast::DastVulnType::Xss,
        "XSS in search".into(),
        "Reflected XSS".into(),
        Severity::High,
        "https://example.com/search".into(),
        "GET".into(),
    );

    // Constructor arguments.
    assert_eq!(finding.vuln_type, dast::DastVulnType::Xss);
    assert_eq!(finding.severity, Severity::High);

    // Defaults.
    assert!(!finding.exploitable);
    assert!(finding.evidence.is_empty());
    assert!(finding.session_id.is_none());
    assert!(finding.linked_sast_finding_id.is_none());
}
// ─── SbomEntry ───
/// `SbomEntry::new` must keep the package name, version, and package manager
/// it was given and start with no license, purl, or known vulnerabilities.
#[test]
fn sbom_entry_new_defaults() {
    let entry = SbomEntry::new(
        "repo1".into(),
        "lodash".into(),
        "4.17.21".into(),
        "npm".into(),
    );

    // Constructor arguments.
    assert_eq!(entry.name, "lodash");
    assert_eq!(entry.version, "4.17.21");
    assert_eq!(entry.package_manager, "npm");

    // Enrichment data is absent on a new entry.
    assert!(entry.license.is_none());
    assert!(entry.purl.is_none());
    assert!(entry.known_vulnerabilities.is_empty());
}
// ─── TrackedRepository ───
/// `TrackedRepository::new` must keep the name and git URL it was given,
/// default the branch to `main`, leave webhooks disabled while still
/// generating a 32-character secret, and start with no tracker and no findings.
#[test]
fn tracked_repository_new_defaults() {
    let repo = TrackedRepository::new("My Repo".into(), "https://github.com/org/repo.git".into());

    assert_eq!(repo.name, "My Repo");
    assert_eq!(repo.git_url, "https://github.com/org/repo.git");
    assert_eq!(repo.default_branch, "main");
    assert!(!repo.webhook_enabled);

    // A secret is generated up front; it should be 32 hex chars (UUID without dashes).
    assert!(repo.webhook_secret.is_some());
    let secret = repo.webhook_secret.as_ref().unwrap();
    assert_eq!(secret.len(), 32);

    assert!(repo.tracker_type.is_none());
    assert_eq!(repo.findings_count, 0);
}
// ─── ScanTrigger ───
/// Serializing each checked `ScanTrigger` to JSON and parsing it back must yield the same variant.
#[test]
fn scan_trigger_serde_roundtrip() {
    let triggers = [
        ScanTrigger::Scheduled,
        ScanTrigger::Webhook,
        ScanTrigger::Manual,
    ];
    for original in triggers {
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: ScanTrigger = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }
}
// ─── PentestEvent serde (tagged enum) ───
/// A `Thinking` event must serialize with a `"type":"thinking"` tag and
/// include its reasoning text in the JSON output.
#[test]
fn pentest_event_serde_thinking() {
    let thinking = pentest::PentestEvent::Thinking {
        reasoning: "analyzing target".into(),
    };
    let encoded = serde_json::to_string(&thinking).unwrap();

    let has_tag = encoded.contains(r#""type":"thinking""#);
    assert!(has_tag);
    let has_reasoning = encoded.contains("analyzing target");
    assert!(has_reasoning);
}
/// A `Finding` event must survive a JSON round trip as the same variant with
/// all three of its fields intact.
#[test]
fn pentest_event_serde_finding() {
    let original = pentest::PentestEvent::Finding {
        finding_id: "f1".into(),
        title: "XSS".into(),
        severity: "high".into(),
    };
    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: pentest::PentestEvent = serde_json::from_str(&encoded).unwrap();

    // Any other variant coming back means the tag was lost or misread.
    if let pentest::PentestEvent::Finding {
        finding_id,
        title,
        severity,
    } = decoded
    {
        assert_eq!(finding_id, "f1");
        assert_eq!(title, "XSS");
        assert_eq!(severity, "high");
    } else {
        panic!("wrong variant");
    }
}
// ─── PentestEvent Paused/Resumed ───
/// The field-less `Paused` event must serialize with a `"type":"paused"` tag
/// and deserialize back to `Paused`.
#[test]
fn pentest_event_serde_paused() {
    let encoded = serde_json::to_string(&pentest::PentestEvent::Paused).unwrap();
    assert!(encoded.contains(r#""type":"paused""#));

    let decoded: pentest::PentestEvent = serde_json::from_str(&encoded).unwrap();
    assert!(matches!(decoded, pentest::PentestEvent::Paused));
}
/// The field-less `Resumed` event must serialize with a `"type":"resumed"` tag
/// and deserialize back to `Resumed`.
#[test]
fn pentest_event_serde_resumed() {
    let encoded = serde_json::to_string(&pentest::PentestEvent::Resumed).unwrap();
    assert!(encoded.contains(r#""type":"resumed""#));

    let decoded: pentest::PentestEvent = serde_json::from_str(&encoded).unwrap();
    assert!(matches!(decoded, pentest::PentestEvent::Resumed));
}
// ─── PentestConfig serde ───
/// A fully populated `PentestConfig` must survive a JSON round trip.
///
/// Only a representative subset of fields is compared afterwards: the app
/// URL, the auth mode / username / cleanup flag, the scope exclusions, and
/// the environment.
#[test]
fn pentest_config_serde_roundtrip() {
    // Nested structures are built first so the top-level literal stays readable.
    let auth = pentest::PentestAuthConfig {
        mode: pentest::AuthMode::Manual,
        username: Some("admin".into()),
        password: Some("pass123".into()),
        registration_url: None,
        verification_email: None,
        imap_host: None,
        imap_port: None,
        imap_username: None,
        imap_password: None,
        cleanup_test_user: true,
    };
    let tester = pentest::TesterInfo {
        name: "Alice".into(),
        email: "alice@example.com".into(),
    };

    let original = pentest::PentestConfig {
        app_url: "https://example.com".into(),
        git_repo_url: Some("https://github.com/org/repo".into()),
        branch: Some("main".into()),
        commit_hash: None,
        app_type: Some("web".into()),
        rate_limit: Some(10),
        auth,
        custom_headers: [("X-Token".to_string(), "abc".to_string())]
            .into_iter()
            .collect(),
        strategy: Some("comprehensive".into()),
        allow_destructive: false,
        initial_instructions: Some("Test the login flow".into()),
        scope_exclusions: vec!["/admin".into()],
        disclaimer_accepted: true,
        disclaimer_accepted_at: Some(chrono::Utc::now()),
        environment: pentest::Environment::Staging,
        tester,
        max_duration_minutes: Some(30),
        skip_mode: false,
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: pentest::PentestConfig = serde_json::from_str(&encoded).unwrap();

    assert_eq!(decoded.app_url, "https://example.com");
    assert_eq!(decoded.auth.mode, pentest::AuthMode::Manual);
    assert_eq!(decoded.auth.username, Some("admin".into()));
    assert!(decoded.auth.cleanup_test_user);
    assert_eq!(decoded.scope_exclusions, vec!["/admin".to_string()]);
    assert_eq!(decoded.environment, pentest::Environment::Staging);
}
/// The default `PentestAuthConfig` must use `AuthMode::None`, carry no
/// credentials or mail settings, and have test-user cleanup turned off.
#[test]
fn pentest_auth_config_default() {
    let defaults = pentest::PentestAuthConfig::default();

    assert_eq!(defaults.mode, pentest::AuthMode::None);

    // No credentials or mailbox details are set.
    assert!(defaults.username.is_none());
    assert!(defaults.password.is_none());
    assert!(defaults.verification_email.is_none());
    assert!(defaults.imap_host.is_none());

    assert!(!defaults.cleanup_test_user);
}
// ─── TestUserRecord ───
/// The default `TestUserRecord` must have every identifying field unset and
/// must not be marked as cleaned up.
#[test]
fn test_user_record_default() {
    let record = pentest::TestUserRecord::default();

    assert!(record.username.is_none());
    assert!(record.email.is_none());
    assert!(record.provider_user_id.is_none());
    assert!(record.provider.is_none());

    assert!(!record.cleaned_up);
}
/// A populated `TestUserRecord` must keep its username, provider, and
/// cleanup flag across a JSON round trip.
#[test]
fn test_user_record_serde_roundtrip() {
    let original = pentest::TestUserRecord {
        username: Some("pentestuser".into()),
        email: Some("pentest+abc@scanner.example.com".into()),
        provider_user_id: Some("kc-uuid-123".into()),
        provider: Some(pentest::IdentityProvider::Keycloak),
        cleaned_up: false,
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: pentest::TestUserRecord = serde_json::from_str(&encoded).unwrap();

    assert_eq!(decoded.username, Some("pentestuser".into()));
    assert_eq!(decoded.provider, Some(pentest::IdentityProvider::Keycloak));
    assert!(!decoded.cleaned_up);
}
/// Each listed `IdentityProvider` variant must serialize to its lowercase
/// JSON string and deserialize back to the same variant.
#[test]
fn identity_provider_serde_all_variants() {
    let expectations = [
        (pentest::IdentityProvider::Keycloak, r#""keycloak""#),
        (pentest::IdentityProvider::Auth0, r#""auth0""#),
        (pentest::IdentityProvider::Okta, r#""okta""#),
        (pentest::IdentityProvider::Firebase, r#""firebase""#),
        (pentest::IdentityProvider::Custom, r#""custom""#),
    ];
    for (provider, wire_form) in expectations {
        let encoded = serde_json::to_string(&provider).unwrap();
        assert_eq!(encoded, wire_form);

        let decoded: pentest::IdentityProvider = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, provider);
    }
}
/// A new session has no test user. Once one is attached, it must survive a
/// BSON document round trip with its username and provider intact.
#[test]
fn pentest_session_with_test_user() {
    let mut session = pentest::PentestSession::new("t".into(), pentest::PentestStrategy::Quick);
    assert!(session.test_user.is_none());

    let record = pentest::TestUserRecord {
        username: Some("pentester".into()),
        email: Some("pentest+123@example.com".into()),
        provider_user_id: None,
        provider: Some(pentest::IdentityProvider::Auth0),
        cleaned_up: false,
    };
    session.test_user = Some(record);

    let document = bson::to_document(&session).unwrap();
    let restored: pentest::PentestSession = bson::from_document(document).unwrap();

    assert!(restored.test_user.is_some());
    let restored_user = restored.test_user.as_ref().unwrap();
    assert_eq!(restored_user.username, Some("pentester".into()));
    assert_eq!(
        restored_user.provider,
        Some(pentest::IdentityProvider::Auth0)
    );
}
// ─── Serde helpers (BSON datetime) ───
/// A `Finding`'s `created_at` timestamp must survive a BSON document round
/// trip to within one second.
#[test]
fn bson_datetime_roundtrip_via_finding() {
    let original = Finding::new(
        "repo1".into(),
        "fp".into(),
        "test".into(),
        ScanType::Sast,
        "t".into(),
        "d".into(),
        Severity::Low,
    );

    // Encode to a BSON document and decode again.
    let document = bson::to_document(&original).unwrap();
    let restored: Finding = bson::from_document(document).unwrap();

    // Allow up to one second of drift to account for millisecond precision.
    let drift_ms = (restored.created_at - original.created_at)
        .num_milliseconds()
        .abs();
    assert!(drift_ms < 1000);
}
/// An unset `completed_at` must still be unset after a BSON document round trip.
#[test]
fn opt_bson_datetime_roundtrip_with_none() {
    let session = pentest::PentestSession::new("t".into(), pentest::PentestStrategy::Quick);
    assert!(session.completed_at.is_none());

    let document = bson::to_document(&session).unwrap();
    let restored: pentest::PentestSession = bson::from_document(document).unwrap();

    assert!(restored.completed_at.is_none());
}
/// A populated `completed_at` must still be present after a BSON document round trip.
#[test]
fn opt_bson_datetime_roundtrip_with_some() {
    let mut session = pentest::PentestSession::new("t".into(), pentest::PentestStrategy::Quick);
    session.completed_at = Some(chrono::Utc::now());

    let document = bson::to_document(&session).unwrap();
    let restored: pentest::PentestSession = bson::from_document(document).unwrap();

    assert!(restored.completed_at.is_some());
}