Files
certifai/src/infrastructure/chat.rs
Sharang Parnerkar 1d7aebf37c
Some checks failed
CI / Format (push) Successful in 3s
CI / Clippy (push) Successful in 2m47s
CI / Security Audit (push) Successful in 1m35s
CI / Tests (push) Successful in 3m54s
CI / E2E Tests (push) Failing after 16s
CI / Deploy (push) Has been skipped
test: added more tests (#16)
Co-authored-by: Sharang Parnerkar <parnerkarsharang@gmail.com>
Reviewed-on: #16
2026-02-25 10:01:56 +00:00

742 lines
23 KiB
Rust

//! Chat CRUD server functions for session and message persistence.
//!
//! Each function extracts the user's `sub` from the tower-sessions session
//! to scope all queries to the authenticated user. The `ServerState` provides
//! access to the MongoDB [`Database`](super::database::Database).
use crate::models::{ChatMessage, ChatSession};
use dioxus::prelude::*;
/// Build a `ChatSession` from a raw BSON document.
///
/// The `_id` ObjectId is rendered as a hex string. All fields are read
/// leniently: a missing or non-string value becomes an empty string, any
/// `namespace` other than `"News"` maps to `General`, and an absent or
/// empty `article_url` becomes `None`.
#[cfg(feature = "server")]
pub(crate) fn doc_to_chat_session(doc: &mongodb::bson::Document) -> ChatSession {
    use crate::models::ChatNamespace;

    // Lenient string accessor: absent or wrongly-typed keys yield "".
    let text = |key: &str| doc.get_str(key).unwrap_or_default().to_string();

    let namespace = match doc.get_str("namespace") {
        Ok("News") => ChatNamespace::News,
        _ => ChatNamespace::General,
    };

    // An empty string is stored when there is no article; surface it as `None`.
    let article_url = doc
        .get_str("article_url")
        .ok()
        .filter(|raw| !raw.is_empty())
        .map(str::to_owned);

    ChatSession {
        id: doc
            .get_object_id("_id")
            .map(|oid| oid.to_hex())
            .unwrap_or_default(),
        user_sub: text("user_sub"),
        title: text("title"),
        namespace,
        provider: text("provider"),
        model: text("model"),
        created_at: text("created_at"),
        updated_at: text("updated_at"),
        article_url,
    }
}
/// Build a `ChatMessage` from a raw BSON document.
///
/// The `_id` ObjectId is rendered as a hex string. Missing or non-string
/// fields become empty strings, and any `role` other than `"Assistant"` or
/// `"System"` maps to `User`. Attachments are not read from the document;
/// the returned message always carries an empty list.
#[cfg(feature = "server")]
pub(crate) fn doc_to_chat_message(doc: &mongodb::bson::Document) -> ChatMessage {
    use crate::models::ChatRole;

    // Lenient string accessor: absent or wrongly-typed keys yield "".
    let text = |key: &str| doc.get_str(key).unwrap_or_default().to_string();

    let role = match doc.get_str("role") {
        Ok("Assistant") => ChatRole::Assistant,
        Ok("System") => ChatRole::System,
        _ => ChatRole::User,
    };

    ChatMessage {
        id: doc
            .get_object_id("_id")
            .map(|oid| oid.to_hex())
            .unwrap_or_default(),
        session_id: text("session_id"),
        role,
        content: text("content"),
        attachments: Vec::new(),
        timestamp: text("timestamp"),
    }
}
/// Helper: resolve the `sub` of the user stored in the current session.
///
/// # Errors
///
/// Returns `ServerFnError` if the session cannot be extracted from the
/// request, the session store read fails, or no logged-in user is stored
/// in the session.
#[cfg(feature = "server")]
async fn require_user_sub() -> Result<String, ServerFnError> {
    use crate::infrastructure::auth::LOGGED_IN_USER_SESS_KEY;
    use crate::infrastructure::state::UserStateInner;
    use dioxus_fullstack::FullstackContext;

    let session: tower_sessions::Session = FullstackContext::extract().await?;

    let stored: Option<UserStateInner> = session
        .get(LOGGED_IN_USER_SESS_KEY)
        .await
        .map_err(|e| ServerFnError::new(format!("session read failed: {e}")))?;

    match stored {
        Some(user) => Ok(user.sub),
        None => Err(ServerFnError::new("not authenticated")),
    }
}
/// Helper: pull the shared [`ServerState`] out of the request context.
///
/// # Errors
///
/// Returns `ServerFnError` if the state cannot be extracted.
#[cfg(feature = "server")]
async fn require_state() -> Result<crate::infrastructure::ServerState, ServerFnError> {
    use dioxus_fullstack::FullstackContext;

    let state = FullstackContext::extract().await?;
    Ok(state)
}
/// List all chat sessions for the authenticated user, ordered by
/// `updated_at` descending (most recent first).
///
/// # Errors
///
/// Returns `ServerFnError` if authentication or the database query fails.
#[server(endpoint = "list-chat-sessions")]
pub async fn list_chat_sessions() -> Result<Vec<ChatSession>, ServerFnError> {
use mongodb::bson::doc;
use mongodb::options::FindOptions;
let user_sub = require_user_sub().await?;
let state = require_state().await?;
let opts = FindOptions::builder()
.sort(doc! { "updated_at": -1 })
.build();
let mut cursor = state
.db
.raw_collection("chat_sessions")
.find(doc! { "user_sub": &user_sub })
.with_options(opts)
.await
.map_err(|e| ServerFnError::new(format!("db error: {e}")))?;
let mut sessions = Vec::new();
use futures::TryStreamExt;
while let Some(raw_doc) = cursor
.try_next()
.await
.map_err(|e| ServerFnError::new(format!("cursor error: {e}")))?
{
sessions.push(doc_to_chat_session(&raw_doc));
}
Ok(sessions)
}
/// Insert a new chat session for the authenticated user and return it with
/// the ID MongoDB assigned.
///
/// # Arguments
///
/// * `title` - Display title for the session
/// * `namespace` - Namespace string; `"News"` selects the news namespace,
///   anything else is treated as `"General"`
/// * `provider` - LLM provider name (e.g. "ollama")
/// * `model` - Model ID (e.g. "llama3.1:8b")
/// * `article_url` - Source article URL; an empty string means "no article"
///
/// # Errors
///
/// Returns `ServerFnError` if authentication or the insert fails.
#[server(endpoint = "create-chat-session")]
pub async fn create_chat_session(
    title: String,
    namespace: String,
    provider: String,
    model: String,
    article_url: String,
) -> Result<ChatSession, ServerFnError> {
    use crate::models::ChatNamespace;

    let user_sub = require_user_sub().await?;
    let state = require_state().await?;

    let ns = match namespace.as_str() {
        "News" => ChatNamespace::News,
        _ => ChatNamespace::General,
    };
    // The wire format uses "" for "no article"; store that as `None`.
    let url = Some(article_url).filter(|candidate| !candidate.is_empty());
    let now = chrono::Utc::now().to_rfc3339();

    let mut session = ChatSession {
        // Left empty here; filled in from the insert result below.
        id: String::new(),
        user_sub,
        title,
        namespace: ns,
        provider,
        model,
        created_at: now.clone(),
        updated_at: now,
        article_url: url,
    };

    let inserted = state
        .db
        .chat_sessions()
        .insert_one(&session)
        .await
        .map_err(|e| ServerFnError::new(format!("insert failed: {e}")))?;

    // Hand the caller the generated ObjectId as a hex string.
    session.id = inserted
        .inserted_id
        .as_object_id()
        .map(|oid| oid.to_hex())
        .unwrap_or_default();

    Ok(session)
}
/// Change the title of a chat session owned by the authenticated user.
///
/// The session's `updated_at` timestamp is refreshed in the same update.
///
/// # Arguments
///
/// * `session_id` - The MongoDB document ID of the session (hex string)
/// * `new_title` - The new title to set
///
/// # Errors
///
/// Returns `ServerFnError` if authentication fails, `session_id` is not a
/// valid ObjectId, no session with that ID belongs to the user, or the
/// update fails.
#[server(endpoint = "rename-chat-session")]
pub async fn rename_chat_session(
    session_id: String,
    new_title: String,
) -> Result<(), ServerFnError> {
    use mongodb::bson::{doc, oid::ObjectId};

    let user_sub = require_user_sub().await?;
    let state = require_state().await?;

    let oid = ObjectId::parse_str(&session_id)
        .map_err(|e| ServerFnError::new(format!("invalid session id: {e}")))?;

    // Matching on `user_sub` as well keeps users from renaming each other's sessions.
    let filter = doc! { "_id": oid, "user_sub": &user_sub };
    let update = doc! {
        "$set": { "title": &new_title, "updated_at": chrono::Utc::now().to_rfc3339() }
    };

    let outcome = state
        .db
        .chat_sessions()
        .update_one(filter, update)
        .await
        .map_err(|e| ServerFnError::new(format!("update failed: {e}")))?;

    match outcome.matched_count {
        0 => Err(ServerFnError::new("session not found or not owned by user")),
        _ => Ok(()),
    }
}
/// Delete a chat session owned by the authenticated user, together with
/// all of its messages.
///
/// Messages are removed only after the user-scoped session delete has
/// actually removed a document, so a caller cannot wipe the messages of a
/// session that belongs to someone else by guessing its ID.
///
/// # Arguments
///
/// * `session_id` - The MongoDB document ID of the session (hex string)
///
/// # Errors
///
/// Returns `ServerFnError` if authentication fails, `session_id` is not a
/// valid ObjectId, no session with that ID belongs to the user, or either
/// delete operation fails.
#[server(endpoint = "delete-chat-session")]
pub async fn delete_chat_session(session_id: String) -> Result<(), ServerFnError> {
    use mongodb::bson::{doc, oid::ObjectId};

    let user_sub = require_user_sub().await?;
    let state = require_state().await?;

    let oid = ObjectId::parse_str(&session_id)
        .map_err(|e| ServerFnError::new(format!("invalid session id: {e}")))?;

    // Delete the session (scoped to user). The filter doubles as the
    // ownership check: nothing matches unless the caller owns the session.
    let deleted = state
        .db
        .chat_sessions()
        .delete_one(doc! { "_id": oid, "user_sub": &user_sub })
        .await
        .map_err(|e| ServerFnError::new(format!("delete session failed: {e}")))?;

    // Without this guard the message delete below would run for ANY session
    // id, including sessions owned by other users.
    if deleted.deleted_count == 0 {
        return Err(ServerFnError::new("session not found or not owned by user"));
    }

    // Delete all messages belonging to this session
    state
        .db
        .chat_messages()
        .delete_many(doc! { "session_id": &session_id })
        .await
        .map_err(|e| ServerFnError::new(format!("delete messages failed: {e}")))?;

    Ok(())
}
/// Load all messages for a chat session, ordered by timestamp ascending.
///
/// # Arguments
///
/// * `session_id` - The MongoDB document ID of the session
///
/// # Errors
///
/// Returns `ServerFnError` if authentication or the query fails.
#[server(endpoint = "list-chat-messages")]
pub async fn list_chat_messages(session_id: String) -> Result<Vec<ChatMessage>, ServerFnError> {
use mongodb::bson::doc;
use mongodb::options::FindOptions;
// Verify the user owns this session
let user_sub = require_user_sub().await?;
let state = require_state().await?;
// Verify the user owns this session using ObjectId for _id matching
use mongodb::bson::oid::ObjectId;
let session_oid = ObjectId::parse_str(&session_id)
.map_err(|e| ServerFnError::new(format!("invalid session id: {e}")))?;
let session_exists = state
.db
.raw_collection("chat_sessions")
.count_documents(doc! { "_id": session_oid, "user_sub": &user_sub })
.await
.map_err(|e| ServerFnError::new(format!("db error: {e}")))?;
if session_exists == 0 {
return Err(ServerFnError::new("session not found or not owned by user"));
}
let opts = FindOptions::builder().sort(doc! { "timestamp": 1 }).build();
let mut cursor = state
.db
.raw_collection("chat_messages")
.find(doc! { "session_id": &session_id })
.with_options(opts)
.await
.map_err(|e| ServerFnError::new(format!("db error: {e}")))?;
let mut messages = Vec::new();
use futures::TryStreamExt;
while let Some(raw_doc) = cursor
.try_next()
.await
.map_err(|e| ServerFnError::new(format!("cursor error: {e}")))?
{
messages.push(doc_to_chat_message(&raw_doc));
}
Ok(messages)
}
/// Persist a single chat message and return it with the MongoDB-generated ID.
///
/// Also updates the parent session's `updated_at` timestamp.
///
/// # Arguments
///
/// * `session_id` - The session this message belongs to
/// * `role` - Message role string: `"user"`, `"assistant"`, or `"system"`
/// * `content` - Message text content
///
/// # Errors
///
/// Returns `ServerFnError` if authentication or the insert fails.
#[server(endpoint = "save-chat-message")]
pub async fn save_chat_message(
session_id: String,
role: String,
content: String,
) -> Result<ChatMessage, ServerFnError> {
use crate::models::ChatRole;
use mongodb::bson::{doc, oid::ObjectId};
let _user_sub = require_user_sub().await?;
let state = require_state().await?;
let chat_role = match role.as_str() {
"assistant" => ChatRole::Assistant,
"system" => ChatRole::System,
_ => ChatRole::User,
};
let now = chrono::Utc::now().to_rfc3339();
let message = ChatMessage {
id: String::new(),
session_id: session_id.clone(),
role: chat_role,
content,
attachments: Vec::new(),
timestamp: now.clone(),
};
let result = state
.db
.chat_messages()
.insert_one(&message)
.await
.map_err(|e| ServerFnError::new(format!("insert failed: {e}")))?;
let id = result
.inserted_id
.as_object_id()
.map(|oid| oid.to_hex())
.unwrap_or_default();
// Update session's updated_at timestamp
if let Ok(session_oid) = ObjectId::parse_str(&session_id) {
let _ = state
.db
.chat_sessions()
.update_one(
doc! { "_id": session_oid },
doc! { "$set": { "updated_at": &now } },
)
.await;
}
Ok(ChatMessage { id, ..message })
}
/// Non-streaming chat completion (fallback for article panel).
///
/// Sends the full conversation history to the LLM provider configured on
/// the session and returns the complete response. Used where SSE streaming
/// is not needed (e.g. dashboard article follow-up panel).
///
/// The session is looked up scoped to the authenticated user, so one user
/// cannot run completions against another user's session configuration.
///
/// # Arguments
///
/// * `session_id` - The chat session ID (loads provider/model config)
/// * `messages_json` - Conversation history as JSON string:
///   `[{"role":"user","content":"..."},...]`
///
/// # Errors
///
/// Returns `ServerFnError` if authentication fails, `session_id` is not a
/// valid ObjectId, no session with that ID belongs to the user,
/// `messages_json` is not a JSON array, the LLM request fails or returns a
/// non-success status, or the response carries no message content.
#[server(endpoint = "chat-complete")]
pub async fn chat_complete(
    session_id: String,
    messages_json: String,
) -> Result<String, ServerFnError> {
    use mongodb::bson::{doc, oid::ObjectId};

    let user_sub = require_user_sub().await?;
    let state = require_state().await?;

    // Load the session to get provider/model. Filtering on `user_sub` makes
    // this lookup double as the ownership check.
    let session_oid = ObjectId::parse_str(&session_id)
        .map_err(|e| ServerFnError::new(format!("invalid session id: {e}")))?;
    let session_doc = state
        .db
        .raw_collection("chat_sessions")
        .find_one(doc! { "_id": session_oid, "user_sub": &user_sub })
        .await
        .map_err(|e| ServerFnError::new(format!("db error: {e}")))?
        .ok_or_else(|| ServerFnError::new("session not found"))?;
    let session = doc_to_chat_session(&session_doc);

    // Resolve provider URL and model
    let (base_url, model) = resolve_provider_url(
        &state.services.ollama_url,
        &state.services.ollama_model,
        &session.provider,
        &session.model,
    );

    // Parse messages from JSON; they are forwarded to the provider as-is.
    let chat_msgs: Vec<serde_json::Value> = serde_json::from_str(&messages_json)
        .map_err(|e| ServerFnError::new(format!("invalid messages JSON: {e}")))?;
    let body = serde_json::json!({
        "model": model,
        "messages": chat_msgs,
        "stream": false,
    });

    let client = reqwest::Client::new();
    // Trim a trailing slash so the joined URL never contains "//v1".
    let url = format!("{}/v1/chat/completions", base_url.trim_end_matches('/'));
    let resp = client
        .post(&url)
        .header("content-type", "application/json")
        .json(&body)
        .send()
        .await
        .map_err(|e| ServerFnError::new(format!("LLM request failed: {e}")))?;

    if !resp.status().is_success() {
        let status = resp.status();
        // The body is only used for diagnostics; fall back to "" if unreadable.
        let text = resp.text().await.unwrap_or_default();
        return Err(ServerFnError::new(format!("LLM returned {status}: {text}")));
    }

    let json: serde_json::Value = resp
        .json()
        .await
        .map_err(|e| ServerFnError::new(format!("parse error: {e}")))?;

    // Indexing a `Value` with a missing key/index yields `Null`, so a
    // malformed response ends up in the error branch instead of panicking.
    json["choices"][0]["message"]["content"]
        .as_str()
        .map(String::from)
        .ok_or_else(|| ServerFnError::new("empty LLM response"))
}
/// Resolve the base URL for a provider, falling back to Ollama defaults.
///
/// # Arguments
///
/// * `ollama_url` - Default Ollama base URL from config
/// * `ollama_model` - Default Ollama model from config
/// * `provider` - Provider name (e.g. "openai", "anthropic", "huggingface")
/// * `model` - Model ID (may be empty for Ollama default)
///
/// # Returns
///
/// A `(base_url, model)` tuple resolved for the given provider.
#[cfg(feature = "server")]
pub(crate) fn resolve_provider_url(
    ollama_url: &str,
    ollama_model: &str,
    provider: &str,
    model: &str,
) -> (String, String) {
    // Hosted providers have a fixed endpoint and always take `model` verbatim,
    // even when it is empty.
    let hosted_base = match provider {
        "openai" => Some(String::from("https://api.openai.com")),
        "anthropic" => Some(String::from("https://api.anthropic.com")),
        "huggingface" => Some(format!(
            "https://api-inference.huggingface.co/models/{model}"
        )),
        _ => None,
    };
    if let Some(base_url) = hosted_base {
        return (base_url, model.to_owned());
    }

    // Every other provider name is routed to Ollama; only here does an empty
    // model fall back to the configured default.
    let effective_model = if model.is_empty() {
        ollama_model
    } else {
        model
    };
    (ollama_url.to_owned(), effective_model.to_owned())
}
#[cfg(test)]
mod tests {
    // -----------------------------------------------------------------------
    // BSON document conversion tests (server feature required)
    // -----------------------------------------------------------------------
    #[cfg(feature = "server")]
    mod server_tests {
        use super::super::{doc_to_chat_message, doc_to_chat_session, resolve_provider_url};
        use crate::models::{ChatNamespace, ChatRole};
        use mongodb::bson::{doc, oid::ObjectId, Document};
        use pretty_assertions::assert_eq;

        // -- doc_to_chat_session --

        /// A fully-populated session document plus the ObjectId it was built with.
        fn sample_session_doc() -> (ObjectId, Document) {
            let oid = ObjectId::new();
            let doc = doc! {
                "_id": oid,
                "user_sub": "user-42",
                "title": "Test Session",
                "namespace": "News",
                "provider": "openai",
                "model": "gpt-4",
                "created_at": "2025-01-01T00:00:00Z",
                "updated_at": "2025-01-02T00:00:00Z",
                "article_url": "https://example.com/article",
            };
            (oid, doc)
        }

        #[test]
        fn doc_to_chat_session_extracts_id_as_hex() {
            let (oid, doc) = sample_session_doc();
            let session = doc_to_chat_session(&doc);
            assert_eq!(session.id, oid.to_hex());
        }

        #[test]
        fn doc_to_chat_session_missing_id_yields_empty_string() {
            // No `_id` at all: the conversion must not panic and falls back to "".
            let session = doc_to_chat_session(&doc! { "title": "t" });
            assert_eq!(session.id, "");
        }

        #[test]
        fn doc_to_chat_session_maps_news_namespace() {
            let (_, doc) = sample_session_doc();
            let session = doc_to_chat_session(&doc);
            assert_eq!(session.namespace, ChatNamespace::News);
        }

        #[test]
        fn doc_to_chat_session_defaults_to_general_for_unknown() {
            let mut doc = sample_session_doc().1;
            doc.insert("namespace", "SomethingElse");
            let session = doc_to_chat_session(&doc);
            assert_eq!(session.namespace, ChatNamespace::General);
        }

        #[test]
        fn doc_to_chat_session_defaults_to_general_when_namespace_missing() {
            let mut doc = sample_session_doc().1;
            doc.remove("namespace");
            let session = doc_to_chat_session(&doc);
            assert_eq!(session.namespace, ChatNamespace::General);
        }

        #[test]
        fn doc_to_chat_session_extracts_all_string_fields() {
            let (_, doc) = sample_session_doc();
            let session = doc_to_chat_session(&doc);
            assert_eq!(session.user_sub, "user-42");
            assert_eq!(session.title, "Test Session");
            assert_eq!(session.provider, "openai");
            assert_eq!(session.model, "gpt-4");
            assert_eq!(session.created_at, "2025-01-01T00:00:00Z");
            assert_eq!(session.updated_at, "2025-01-02T00:00:00Z");
        }

        #[test]
        fn doc_to_chat_session_handles_missing_article_url() {
            let oid = ObjectId::new();
            let doc = doc! {
                "_id": oid,
                "user_sub": "u",
                "title": "t",
                "provider": "ollama",
                "model": "m",
                "created_at": "c",
                "updated_at": "u",
            };
            let session = doc_to_chat_session(&doc);
            assert_eq!(session.article_url, None);
        }

        #[test]
        fn doc_to_chat_session_filters_empty_article_url() {
            let oid = ObjectId::new();
            let doc = doc! {
                "_id": oid,
                "user_sub": "u",
                "title": "t",
                "namespace": "News",
                "provider": "ollama",
                "model": "m",
                "created_at": "c",
                "updated_at": "u",
                "article_url": "",
            };
            let session = doc_to_chat_session(&doc);
            assert_eq!(session.article_url, None);
        }

        // -- doc_to_chat_message --

        /// A fully-populated message document plus the ObjectId it was built with.
        fn sample_message_doc() -> (ObjectId, Document) {
            let oid = ObjectId::new();
            let doc = doc! {
                "_id": oid,
                "session_id": "sess-1",
                "role": "Assistant",
                "content": "Hello there!",
                "timestamp": "2025-01-01T12:00:00Z",
            };
            (oid, doc)
        }

        #[test]
        fn doc_to_chat_message_extracts_id_as_hex() {
            let (oid, doc) = sample_message_doc();
            let msg = doc_to_chat_message(&doc);
            assert_eq!(msg.id, oid.to_hex());
        }

        #[test]
        fn doc_to_chat_message_maps_assistant_role() {
            let (_, doc) = sample_message_doc();
            let msg = doc_to_chat_message(&doc);
            assert_eq!(msg.role, ChatRole::Assistant);
        }

        #[test]
        fn doc_to_chat_message_maps_system_role() {
            let mut doc = sample_message_doc().1;
            doc.insert("role", "System");
            let msg = doc_to_chat_message(&doc);
            assert_eq!(msg.role, ChatRole::System);
        }

        #[test]
        fn doc_to_chat_message_defaults_to_user_for_unknown() {
            let mut doc = sample_message_doc().1;
            doc.insert("role", "SomethingElse");
            let msg = doc_to_chat_message(&doc);
            assert_eq!(msg.role, ChatRole::User);
        }

        #[test]
        fn doc_to_chat_message_defaults_to_user_when_role_missing() {
            let mut doc = sample_message_doc().1;
            doc.remove("role");
            let msg = doc_to_chat_message(&doc);
            assert_eq!(msg.role, ChatRole::User);
        }

        #[test]
        fn doc_to_chat_message_extracts_content_and_timestamp() {
            let (_, doc) = sample_message_doc();
            let msg = doc_to_chat_message(&doc);
            assert_eq!(msg.content, "Hello there!");
            assert_eq!(msg.timestamp, "2025-01-01T12:00:00Z");
            assert_eq!(msg.session_id, "sess-1");
        }

        #[test]
        fn doc_to_chat_message_attachments_always_empty() {
            let (_, doc) = sample_message_doc();
            let msg = doc_to_chat_message(&doc);
            assert!(msg.attachments.is_empty());
        }

        // -- resolve_provider_url --

        const TEST_OLLAMA_URL: &str = "http://localhost:11434";
        const TEST_OLLAMA_MODEL: &str = "llama3.1:8b";

        #[test]
        fn resolve_openai_returns_api_openai() {
            let (url, model) =
                resolve_provider_url(TEST_OLLAMA_URL, TEST_OLLAMA_MODEL, "openai", "gpt-4o");
            assert_eq!(url, "https://api.openai.com");
            assert_eq!(model, "gpt-4o");
        }

        #[test]
        fn resolve_anthropic_returns_api_anthropic() {
            let (url, model) = resolve_provider_url(
                TEST_OLLAMA_URL,
                TEST_OLLAMA_MODEL,
                "anthropic",
                "claude-3-opus",
            );
            assert_eq!(url, "https://api.anthropic.com");
            assert_eq!(model, "claude-3-opus");
        }

        #[test]
        fn resolve_huggingface_returns_model_url() {
            let (url, model) = resolve_provider_url(
                TEST_OLLAMA_URL,
                TEST_OLLAMA_MODEL,
                "huggingface",
                "meta-llama/Llama-2-7b",
            );
            assert_eq!(
                url,
                "https://api-inference.huggingface.co/models/meta-llama/Llama-2-7b"
            );
            assert_eq!(model, "meta-llama/Llama-2-7b");
        }

        #[test]
        fn resolve_unknown_defaults_to_ollama() {
            let (url, model) =
                resolve_provider_url(TEST_OLLAMA_URL, TEST_OLLAMA_MODEL, "ollama", "mistral:7b");
            assert_eq!(url, TEST_OLLAMA_URL);
            assert_eq!(model, "mistral:7b");
        }

        #[test]
        fn resolve_unrecognised_provider_defaults_to_ollama() {
            // A provider name that matches no arm at all must still land on Ollama.
            let (url, model) = resolve_provider_url(
                TEST_OLLAMA_URL,
                TEST_OLLAMA_MODEL,
                "no-such-provider",
                "mistral:7b",
            );
            assert_eq!(url, TEST_OLLAMA_URL);
            assert_eq!(model, "mistral:7b");
        }

        #[test]
        fn resolve_empty_model_falls_back_to_server_default() {
            let (url, model) =
                resolve_provider_url(TEST_OLLAMA_URL, TEST_OLLAMA_MODEL, "ollama", "");
            assert_eq!(url, TEST_OLLAMA_URL);
            assert_eq!(model, TEST_OLLAMA_MODEL);
        }
    }
}