feat(chat): added chat interface and connection to ollama (#10)
Co-authored-by: Sharang Parnerkar <parnerkarsharang@gmail.com> Reviewed-on: #10
This commit was merged in pull request #10.
This commit is contained in:
507
src/infrastructure/chat.rs
Normal file
507
src/infrastructure/chat.rs
Normal file
@@ -0,0 +1,507 @@
|
||||
//! Chat CRUD server functions for session and message persistence.
|
||||
//!
|
||||
//! Each function extracts the user's `sub` from the tower-sessions session
|
||||
//! to scope all queries to the authenticated user. The `ServerState` provides
|
||||
//! access to the MongoDB [`Database`](super::database::Database).
|
||||
|
||||
use crate::models::{ChatMessage, ChatSession};
|
||||
use dioxus::prelude::*;
|
||||
|
||||
/// Convert a raw BSON document to a `ChatSession`, extracting `_id` as a hex string.
|
||||
#[cfg(feature = "server")]
|
||||
pub(crate) fn doc_to_chat_session(doc: &mongodb::bson::Document) -> ChatSession {
|
||||
use crate::models::ChatNamespace;
|
||||
|
||||
let id = doc
|
||||
.get_object_id("_id")
|
||||
.map(|oid| oid.to_hex())
|
||||
.unwrap_or_default();
|
||||
let namespace = match doc.get_str("namespace").unwrap_or("General") {
|
||||
"News" => ChatNamespace::News,
|
||||
_ => ChatNamespace::General,
|
||||
};
|
||||
let article_url = doc
|
||||
.get_str("article_url")
|
||||
.ok()
|
||||
.map(String::from)
|
||||
.filter(|s| !s.is_empty());
|
||||
|
||||
ChatSession {
|
||||
id,
|
||||
user_sub: doc.get_str("user_sub").unwrap_or_default().to_string(),
|
||||
title: doc.get_str("title").unwrap_or_default().to_string(),
|
||||
namespace,
|
||||
provider: doc.get_str("provider").unwrap_or_default().to_string(),
|
||||
model: doc.get_str("model").unwrap_or_default().to_string(),
|
||||
created_at: doc.get_str("created_at").unwrap_or_default().to_string(),
|
||||
updated_at: doc.get_str("updated_at").unwrap_or_default().to_string(),
|
||||
article_url,
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert a raw BSON document to a `ChatMessage`, extracting `_id` as a hex string.
|
||||
#[cfg(feature = "server")]
|
||||
pub(crate) fn doc_to_chat_message(doc: &mongodb::bson::Document) -> ChatMessage {
|
||||
use crate::models::ChatRole;
|
||||
|
||||
let id = doc
|
||||
.get_object_id("_id")
|
||||
.map(|oid| oid.to_hex())
|
||||
.unwrap_or_default();
|
||||
let role = match doc.get_str("role").unwrap_or("User") {
|
||||
"Assistant" => ChatRole::Assistant,
|
||||
"System" => ChatRole::System,
|
||||
_ => ChatRole::User,
|
||||
};
|
||||
ChatMessage {
|
||||
id,
|
||||
session_id: doc.get_str("session_id").unwrap_or_default().to_string(),
|
||||
role,
|
||||
content: doc.get_str("content").unwrap_or_default().to_string(),
|
||||
attachments: Vec::new(),
|
||||
timestamp: doc.get_str("timestamp").unwrap_or_default().to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper: extract the authenticated user's `sub` from the session.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns `ServerFnError` if the session is missing or unreadable.
|
||||
#[cfg(feature = "server")]
|
||||
async fn require_user_sub() -> Result<String, ServerFnError> {
|
||||
use crate::infrastructure::auth::LOGGED_IN_USER_SESS_KEY;
|
||||
use crate::infrastructure::state::UserStateInner;
|
||||
use dioxus_fullstack::FullstackContext;
|
||||
|
||||
let session: tower_sessions::Session = FullstackContext::extract().await?;
|
||||
let user: UserStateInner = session
|
||||
.get(LOGGED_IN_USER_SESS_KEY)
|
||||
.await
|
||||
.map_err(|e| ServerFnError::new(format!("session read failed: {e}")))?
|
||||
.ok_or_else(|| ServerFnError::new("not authenticated"))?;
|
||||
Ok(user.sub)
|
||||
}
|
||||
|
||||
/// Helper: extract the [`ServerState`] from the request context.
|
||||
#[cfg(feature = "server")]
|
||||
async fn require_state() -> Result<crate::infrastructure::ServerState, ServerFnError> {
|
||||
dioxus_fullstack::FullstackContext::extract().await
|
||||
}
|
||||
|
||||
/// List all chat sessions for the authenticated user, ordered by
|
||||
/// `updated_at` descending (most recent first).
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns `ServerFnError` if authentication or the database query fails.
|
||||
#[server(endpoint = "list-chat-sessions")]
|
||||
pub async fn list_chat_sessions() -> Result<Vec<ChatSession>, ServerFnError> {
|
||||
use mongodb::bson::doc;
|
||||
use mongodb::options::FindOptions;
|
||||
|
||||
let user_sub = require_user_sub().await?;
|
||||
let state = require_state().await?;
|
||||
|
||||
let opts = FindOptions::builder()
|
||||
.sort(doc! { "updated_at": -1 })
|
||||
.build();
|
||||
|
||||
let mut cursor = state
|
||||
.db
|
||||
.raw_collection("chat_sessions")
|
||||
.find(doc! { "user_sub": &user_sub })
|
||||
.with_options(opts)
|
||||
.await
|
||||
.map_err(|e| ServerFnError::new(format!("db error: {e}")))?;
|
||||
|
||||
let mut sessions = Vec::new();
|
||||
use futures::TryStreamExt;
|
||||
while let Some(raw_doc) = cursor
|
||||
.try_next()
|
||||
.await
|
||||
.map_err(|e| ServerFnError::new(format!("cursor error: {e}")))?
|
||||
{
|
||||
sessions.push(doc_to_chat_session(&raw_doc));
|
||||
}
|
||||
|
||||
Ok(sessions)
|
||||
}
|
||||
|
||||
/// Create a new chat session and return it with the MongoDB-generated ID.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `title` - Display title for the session
|
||||
/// * `namespace` - Namespace string: `"General"` or `"News"`
|
||||
/// * `provider` - LLM provider name (e.g. "ollama")
|
||||
/// * `model` - Model ID (e.g. "llama3.1:8b")
|
||||
/// * `article_url` - Source article URL (only for `News` namespace, empty if none)
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns `ServerFnError` if authentication or the insert fails.
|
||||
#[server(endpoint = "create-chat-session")]
|
||||
pub async fn create_chat_session(
|
||||
title: String,
|
||||
namespace: String,
|
||||
provider: String,
|
||||
model: String,
|
||||
article_url: String,
|
||||
) -> Result<ChatSession, ServerFnError> {
|
||||
use crate::models::ChatNamespace;
|
||||
|
||||
let user_sub = require_user_sub().await?;
|
||||
let state = require_state().await?;
|
||||
|
||||
let ns = if namespace == "News" {
|
||||
ChatNamespace::News
|
||||
} else {
|
||||
ChatNamespace::General
|
||||
};
|
||||
|
||||
let url = if article_url.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(article_url)
|
||||
};
|
||||
|
||||
let now = chrono::Utc::now().to_rfc3339();
|
||||
let session = ChatSession {
|
||||
id: String::new(), // MongoDB will generate _id
|
||||
user_sub,
|
||||
title,
|
||||
namespace: ns,
|
||||
provider,
|
||||
model,
|
||||
created_at: now.clone(),
|
||||
updated_at: now,
|
||||
article_url: url,
|
||||
};
|
||||
|
||||
let result = state
|
||||
.db
|
||||
.chat_sessions()
|
||||
.insert_one(&session)
|
||||
.await
|
||||
.map_err(|e| ServerFnError::new(format!("insert failed: {e}")))?;
|
||||
|
||||
// Return the session with the generated ID
|
||||
let id = result
|
||||
.inserted_id
|
||||
.as_object_id()
|
||||
.map(|oid| oid.to_hex())
|
||||
.unwrap_or_default();
|
||||
|
||||
Ok(ChatSession { id, ..session })
|
||||
}
|
||||
|
||||
/// Rename a chat session.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `session_id` - The MongoDB document ID of the session
|
||||
/// * `new_title` - The new title to set
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns `ServerFnError` if authentication, the session is not found,
|
||||
/// or the update fails.
|
||||
#[server(endpoint = "rename-chat-session")]
|
||||
pub async fn rename_chat_session(
|
||||
session_id: String,
|
||||
new_title: String,
|
||||
) -> Result<(), ServerFnError> {
|
||||
use mongodb::bson::{doc, oid::ObjectId};
|
||||
|
||||
let user_sub = require_user_sub().await?;
|
||||
let state = require_state().await?;
|
||||
|
||||
let oid = ObjectId::parse_str(&session_id)
|
||||
.map_err(|e| ServerFnError::new(format!("invalid session id: {e}")))?;
|
||||
|
||||
let result = state
|
||||
.db
|
||||
.chat_sessions()
|
||||
.update_one(
|
||||
doc! { "_id": oid, "user_sub": &user_sub },
|
||||
doc! { "$set": { "title": &new_title, "updated_at": chrono::Utc::now().to_rfc3339() } },
|
||||
)
|
||||
.await
|
||||
.map_err(|e| ServerFnError::new(format!("update failed: {e}")))?;
|
||||
|
||||
if result.matched_count == 0 {
|
||||
return Err(ServerFnError::new("session not found or not owned by user"));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete a chat session and all its messages.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `session_id` - The MongoDB document ID of the session
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns `ServerFnError` if authentication or the delete fails.
|
||||
#[server(endpoint = "delete-chat-session")]
|
||||
pub async fn delete_chat_session(session_id: String) -> Result<(), ServerFnError> {
|
||||
use mongodb::bson::{doc, oid::ObjectId};
|
||||
|
||||
let user_sub = require_user_sub().await?;
|
||||
let state = require_state().await?;
|
||||
|
||||
let oid = ObjectId::parse_str(&session_id)
|
||||
.map_err(|e| ServerFnError::new(format!("invalid session id: {e}")))?;
|
||||
|
||||
// Delete the session (scoped to user)
|
||||
state
|
||||
.db
|
||||
.chat_sessions()
|
||||
.delete_one(doc! { "_id": oid, "user_sub": &user_sub })
|
||||
.await
|
||||
.map_err(|e| ServerFnError::new(format!("delete session failed: {e}")))?;
|
||||
|
||||
// Delete all messages belonging to this session
|
||||
state
|
||||
.db
|
||||
.chat_messages()
|
||||
.delete_many(doc! { "session_id": &session_id })
|
||||
.await
|
||||
.map_err(|e| ServerFnError::new(format!("delete messages failed: {e}")))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Load all messages for a chat session, ordered by timestamp ascending.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `session_id` - The MongoDB document ID of the session
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns `ServerFnError` if authentication or the query fails.
|
||||
#[server(endpoint = "list-chat-messages")]
|
||||
pub async fn list_chat_messages(session_id: String) -> Result<Vec<ChatMessage>, ServerFnError> {
|
||||
use mongodb::bson::doc;
|
||||
use mongodb::options::FindOptions;
|
||||
|
||||
// Verify the user owns this session
|
||||
let user_sub = require_user_sub().await?;
|
||||
let state = require_state().await?;
|
||||
|
||||
// Verify the user owns this session using ObjectId for _id matching
|
||||
use mongodb::bson::oid::ObjectId;
|
||||
let session_oid = ObjectId::parse_str(&session_id)
|
||||
.map_err(|e| ServerFnError::new(format!("invalid session id: {e}")))?;
|
||||
|
||||
let session_exists = state
|
||||
.db
|
||||
.raw_collection("chat_sessions")
|
||||
.count_documents(doc! { "_id": session_oid, "user_sub": &user_sub })
|
||||
.await
|
||||
.map_err(|e| ServerFnError::new(format!("db error: {e}")))?;
|
||||
|
||||
if session_exists == 0 {
|
||||
return Err(ServerFnError::new("session not found or not owned by user"));
|
||||
}
|
||||
|
||||
let opts = FindOptions::builder().sort(doc! { "timestamp": 1 }).build();
|
||||
|
||||
let mut cursor = state
|
||||
.db
|
||||
.raw_collection("chat_messages")
|
||||
.find(doc! { "session_id": &session_id })
|
||||
.with_options(opts)
|
||||
.await
|
||||
.map_err(|e| ServerFnError::new(format!("db error: {e}")))?;
|
||||
|
||||
let mut messages = Vec::new();
|
||||
use futures::TryStreamExt;
|
||||
while let Some(raw_doc) = cursor
|
||||
.try_next()
|
||||
.await
|
||||
.map_err(|e| ServerFnError::new(format!("cursor error: {e}")))?
|
||||
{
|
||||
messages.push(doc_to_chat_message(&raw_doc));
|
||||
}
|
||||
|
||||
Ok(messages)
|
||||
}
|
||||
|
||||
/// Persist a single chat message and return it with the MongoDB-generated ID.
|
||||
///
|
||||
/// Also updates the parent session's `updated_at` timestamp.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `session_id` - The session this message belongs to
|
||||
/// * `role` - Message role string: `"user"`, `"assistant"`, or `"system"`
|
||||
/// * `content` - Message text content
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns `ServerFnError` if authentication or the insert fails.
|
||||
#[server(endpoint = "save-chat-message")]
|
||||
pub async fn save_chat_message(
|
||||
session_id: String,
|
||||
role: String,
|
||||
content: String,
|
||||
) -> Result<ChatMessage, ServerFnError> {
|
||||
use crate::models::ChatRole;
|
||||
use mongodb::bson::{doc, oid::ObjectId};
|
||||
|
||||
let _user_sub = require_user_sub().await?;
|
||||
let state = require_state().await?;
|
||||
|
||||
let chat_role = match role.as_str() {
|
||||
"assistant" => ChatRole::Assistant,
|
||||
"system" => ChatRole::System,
|
||||
_ => ChatRole::User,
|
||||
};
|
||||
|
||||
let now = chrono::Utc::now().to_rfc3339();
|
||||
let message = ChatMessage {
|
||||
id: String::new(),
|
||||
session_id: session_id.clone(),
|
||||
role: chat_role,
|
||||
content,
|
||||
attachments: Vec::new(),
|
||||
timestamp: now.clone(),
|
||||
};
|
||||
|
||||
let result = state
|
||||
.db
|
||||
.chat_messages()
|
||||
.insert_one(&message)
|
||||
.await
|
||||
.map_err(|e| ServerFnError::new(format!("insert failed: {e}")))?;
|
||||
|
||||
let id = result
|
||||
.inserted_id
|
||||
.as_object_id()
|
||||
.map(|oid| oid.to_hex())
|
||||
.unwrap_or_default();
|
||||
|
||||
// Update session's updated_at timestamp
|
||||
if let Ok(session_oid) = ObjectId::parse_str(&session_id) {
|
||||
let _ = state
|
||||
.db
|
||||
.chat_sessions()
|
||||
.update_one(
|
||||
doc! { "_id": session_oid },
|
||||
doc! { "$set": { "updated_at": &now } },
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(ChatMessage { id, ..message })
|
||||
}
|
||||
|
||||
/// Non-streaming chat completion (fallback for article panel).
|
||||
///
|
||||
/// Sends the full conversation history to the configured LLM provider
|
||||
/// and returns the complete response. Used where SSE streaming is not
|
||||
/// needed (e.g. dashboard article follow-up panel).
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `session_id` - The chat session ID (loads provider/model config)
|
||||
/// * `messages_json` - Conversation history as JSON string:
|
||||
/// `[{"role":"user","content":"..."},...]`
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns `ServerFnError` if the LLM request fails.
|
||||
#[server(endpoint = "chat-complete")]
|
||||
pub async fn chat_complete(
|
||||
session_id: String,
|
||||
messages_json: String,
|
||||
) -> Result<String, ServerFnError> {
|
||||
use mongodb::bson::{doc, oid::ObjectId};
|
||||
|
||||
let _user_sub = require_user_sub().await?;
|
||||
let state = require_state().await?;
|
||||
|
||||
// Load the session to get provider/model
|
||||
let session_oid = ObjectId::parse_str(&session_id)
|
||||
.map_err(|e| ServerFnError::new(format!("invalid session id: {e}")))?;
|
||||
|
||||
let session_doc = state
|
||||
.db
|
||||
.raw_collection("chat_sessions")
|
||||
.find_one(doc! { "_id": session_oid })
|
||||
.await
|
||||
.map_err(|e| ServerFnError::new(format!("db error: {e}")))?
|
||||
.ok_or_else(|| ServerFnError::new("session not found"))?;
|
||||
let session = doc_to_chat_session(&session_doc);
|
||||
|
||||
// Resolve provider URL and model
|
||||
let (base_url, model) = resolve_provider_url(&state, &session.provider, &session.model);
|
||||
|
||||
// Parse messages from JSON
|
||||
let chat_msgs: Vec<serde_json::Value> = serde_json::from_str(&messages_json)
|
||||
.map_err(|e| ServerFnError::new(format!("invalid messages JSON: {e}")))?;
|
||||
|
||||
let body = serde_json::json!({
|
||||
"model": model,
|
||||
"messages": chat_msgs,
|
||||
"stream": false,
|
||||
});
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let url = format!("{}/v1/chat/completions", base_url.trim_end_matches('/'));
|
||||
|
||||
let resp = client
|
||||
.post(&url)
|
||||
.header("content-type", "application/json")
|
||||
.json(&body)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| ServerFnError::new(format!("LLM request failed: {e}")))?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
let status = resp.status();
|
||||
let text = resp.text().await.unwrap_or_default();
|
||||
return Err(ServerFnError::new(format!("LLM returned {status}: {text}")));
|
||||
}
|
||||
|
||||
let json: serde_json::Value = resp
|
||||
.json()
|
||||
.await
|
||||
.map_err(|e| ServerFnError::new(format!("parse error: {e}")))?;
|
||||
|
||||
json["choices"][0]["message"]["content"]
|
||||
.as_str()
|
||||
.map(String::from)
|
||||
.ok_or_else(|| ServerFnError::new("empty LLM response"))
|
||||
}
|
||||
|
||||
/// Resolve the base URL for a provider, falling back to server defaults.
|
||||
#[cfg(feature = "server")]
|
||||
fn resolve_provider_url(
|
||||
state: &crate::infrastructure::ServerState,
|
||||
provider: &str,
|
||||
model: &str,
|
||||
) -> (String, String) {
|
||||
match provider {
|
||||
"openai" => ("https://api.openai.com".to_string(), model.to_string()),
|
||||
"anthropic" => ("https://api.anthropic.com".to_string(), model.to_string()),
|
||||
"huggingface" => (
|
||||
format!("https://api-inference.huggingface.co/models/{}", model),
|
||||
model.to_string(),
|
||||
),
|
||||
// Default to Ollama
|
||||
_ => (
|
||||
state.services.ollama_url.clone(),
|
||||
if model.is_empty() {
|
||||
state.services.ollama_model.clone()
|
||||
} else {
|
||||
model.to_string()
|
||||
},
|
||||
),
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user