feat: implement Hermes session manager with SQLite + JSONL support (Phase 6)

- Add hermes.rs session provider with dual-source scanning:
  SQLite (state.db) as primary, JSONL transcripts as fallback
- Dynamic schema discovery via PRAGMA table_info for SQLite resilience
- Use read_head_tail_lines for efficient JSONL metadata extraction
  (head 30 lines for metadata, tail 10 for last_active_at)
- Support both flat and nested JSONL message formats
- Add SQLite session loading and transactional deletion
- Register hermes in parallel session scan (thread::scope)
- Add "hermes" to frontend ProviderFilter type
- 7 unit tests covering JSONL parsing, SQLite source parsing, deletion
This commit is contained in:
Jason
2026-04-15 21:17:36 +08:00
parent 240969d8c7
commit e8953c286f
4 changed files with 621 additions and 7 deletions
+15 -6
View File
@@ -4,7 +4,7 @@ pub mod terminal;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use providers::{claude, codex, gemini, openclaw, opencode};
use providers::{claude, codex, gemini, hermes, openclaw, opencode};
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
@@ -56,18 +56,20 @@ pub struct DeleteSessionOutcome {
}
pub fn scan_sessions() -> Vec<SessionMeta> {
let (r1, r2, r3, r4, r5) = std::thread::scope(|s| {
let (r1, r2, r3, r4, r5, r6) = std::thread::scope(|s| {
let h1 = s.spawn(codex::scan_sessions);
let h2 = s.spawn(claude::scan_sessions);
let h3 = s.spawn(opencode::scan_sessions);
let h4 = s.spawn(openclaw::scan_sessions);
let h5 = s.spawn(gemini::scan_sessions);
let h6 = s.spawn(hermes::scan_sessions);
(
h1.join().unwrap_or_default(),
h2.join().unwrap_or_default(),
h3.join().unwrap_or_default(),
h4.join().unwrap_or_default(),
h5.join().unwrap_or_default(),
h6.join().unwrap_or_default(),
)
});
@@ -77,6 +79,7 @@ pub fn scan_sessions() -> Vec<SessionMeta> {
sessions.extend(r3);
sessions.extend(r4);
sessions.extend(r5);
sessions.extend(r6);
sessions.sort_by(|a, b| {
let a_ts = a.last_active_at.or(a.created_at).unwrap_or(0);
@@ -88,10 +91,13 @@ pub fn scan_sessions() -> Vec<SessionMeta> {
}
pub fn load_messages(provider_id: &str, source_path: &str) -> Result<Vec<SessionMessage>, String> {
// OpenCode SQLite sessions use a "sqlite:" prefixed source_path
// SQLite sessions use a "sqlite:" prefixed source_path
if provider_id == "opencode" && source_path.starts_with("sqlite:") {
return opencode::load_messages_sqlite(source_path);
}
if provider_id == "hermes" && source_path.starts_with("sqlite:") {
return hermes::load_messages_sqlite(source_path);
}
let path = Path::new(source_path);
match provider_id {
@@ -100,7 +106,7 @@ pub fn load_messages(provider_id: &str, source_path: &str) -> Result<Vec<Session
"opencode" => opencode::load_messages(path),
"openclaw" => openclaw::load_messages(path),
"gemini" => gemini::load_messages(path),
"hermes" => Err("Hermes session loading not yet implemented".to_string()),
"hermes" => hermes::load_messages(path),
_ => Err(format!("Unsupported provider: {provider_id}")),
}
}
@@ -110,10 +116,13 @@ pub fn delete_session(
session_id: &str,
source_path: &str,
) -> Result<bool, String> {
// OpenCode SQLite sessions bypass the file-based deletion path
// SQLite sessions bypass the file-based deletion path
if provider_id == "opencode" && source_path.starts_with("sqlite:") {
return opencode::delete_session_sqlite(session_id, source_path);
}
if provider_id == "hermes" && source_path.starts_with("sqlite:") {
return hermes::delete_session_sqlite(session_id, source_path);
}
let root = provider_root(provider_id)?;
delete_session_with_root(provider_id, session_id, Path::new(source_path), &root)
@@ -151,7 +160,7 @@ fn delete_session_with_root(
"opencode" => opencode::delete_session(&validated_root, &validated_source, session_id),
"openclaw" => openclaw::delete_session(&validated_root, &validated_source, session_id),
"gemini" => gemini::delete_session(&validated_root, &validated_source, session_id),
"hermes" => Err("Hermes session deletion not yet implemented".to_string()),
"hermes" => hermes::delete_session(&validated_root, &validated_source, session_id),
_ => Err(format!("Unsupported provider: {provider_id}")),
}
}
@@ -0,0 +1,603 @@
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use rusqlite::Connection;
use serde_json::Value;
use crate::hermes_config::get_hermes_dir;
use crate::session_manager::{SessionMessage, SessionMeta};
use super::utils::{
extract_text, parse_timestamp_to_ms, read_head_tail_lines, truncate_summary, TITLE_MAX_CHARS,
};
const PROVIDER_ID: &str = "hermes";
fn get_hermes_db_path() -> PathBuf {
get_hermes_dir().join("state.db")
}
fn get_hermes_sessions_dir() -> PathBuf {
get_hermes_dir().join("sessions")
}
/// Scan sessions from both SQLite database and JSONL transcript files,
/// with SQLite taking precedence on ID conflicts.
pub fn scan_sessions() -> Vec<SessionMeta> {
let sqlite_sessions = scan_sessions_sqlite();
let jsonl_sessions = scan_sessions_jsonl();
if sqlite_sessions.is_empty() {
return jsonl_sessions;
}
if jsonl_sessions.is_empty() {
return sqlite_sessions;
}
let sqlite_ids: std::collections::HashSet<String> = sqlite_sessions
.iter()
.map(|s| s.session_id.clone())
.collect();
let mut merged = sqlite_sessions;
for s in jsonl_sessions {
if !sqlite_ids.contains(&s.session_id) {
merged.push(s);
}
}
merged
}
// ── SQLite scanning ─────────────────────────────────────────────────
fn scan_sessions_sqlite() -> Vec<SessionMeta> {
let db_path = get_hermes_db_path();
if !db_path.exists() {
return Vec::new();
}
let conn = match Connection::open_with_flags(
&db_path,
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
) {
Ok(c) => c,
Err(_) => return Vec::new(),
};
// Check if sessions table exists
let has_sessions: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='sessions'",
[],
|row| row.get(0),
)
.unwrap_or(false);
if !has_sessions {
return Vec::new();
}
// Query sessions — use flexible column access via pragma
let columns = get_table_columns(&conn, "sessions");
let query = "SELECT * FROM sessions ORDER BY rowid DESC LIMIT 500";
let mut stmt = match conn.prepare(query) {
Ok(s) => s,
Err(_) => return Vec::new(),
};
let mut sessions = Vec::new();
let rows = match stmt.query_map([], |row| Ok(row_to_json(row, &columns))) {
Ok(r) => r,
Err(_) => return Vec::new(),
};
let db_source = format!("sqlite:{}", db_path.display());
for row_result in rows.flatten() {
if let Some(meta) = sqlite_row_to_session_meta(&row_result, &db_source) {
sessions.push(meta);
}
}
sessions
}
fn sqlite_row_to_session_meta(row: &Value, db_source: &str) -> Option<SessionMeta> {
let obj = row.as_object()?;
let session_id = obj.get("id").and_then(Value::as_str)?.to_string();
let title = obj
.get("title")
.and_then(Value::as_str)
.filter(|s| !s.is_empty())
.map(|s| truncate_summary(s, TITLE_MAX_CHARS).to_string());
let cwd = obj
.get("cwd")
.or_else(|| obj.get("directory"))
.and_then(Value::as_str)
.filter(|s| !s.is_empty())
.map(|s| s.to_string());
let started_at = obj
.get("started_at")
.or_else(|| obj.get("created_at"))
.and_then(parse_timestamp_to_ms);
let ended_at = obj
.get("ended_at")
.or_else(|| obj.get("updated_at"))
.and_then(parse_timestamp_to_ms);
let source_path = format!("{}#{}", db_source, session_id);
Some(SessionMeta {
provider_id: PROVIDER_ID.to_string(),
session_id,
title,
summary: None,
project_dir: cwd,
created_at: started_at,
last_active_at: ended_at.or(started_at),
source_path: Some(source_path),
resume_command: None,
})
}
/// Get column names for a table.
fn get_table_columns(conn: &Connection, table: &str) -> Vec<String> {
let query = format!("PRAGMA table_info({table})");
let mut stmt = match conn.prepare(&query) {
Ok(s) => s,
Err(_) => return Vec::new(),
};
let rows = match stmt.query_map([], |row| {
let name: String = row.get(1)?;
Ok(name)
}) {
Ok(r) => r,
Err(_) => return Vec::new(),
};
rows.flatten().collect()
}
/// Convert a SQLite row to a JSON Value using known column names.
fn row_to_json(row: &rusqlite::Row, columns: &[String]) -> Value {
let mut map = serde_json::Map::new();
for (i, col) in columns.iter().enumerate() {
// Try string first, then integer, then float, then null
if let Ok(val) = row.get::<_, String>(i) {
map.insert(col.clone(), Value::String(val));
} else if let Ok(val) = row.get::<_, i64>(i) {
map.insert(col.clone(), Value::Number(val.into()));
} else if let Ok(val) = row.get::<_, f64>(i) {
if let Some(n) = serde_json::Number::from_f64(val) {
map.insert(col.clone(), Value::Number(n));
}
} else {
map.insert(col.clone(), Value::Null);
}
}
Value::Object(map)
}
/// Load messages from the Hermes SQLite database.
pub fn load_messages_sqlite(source: &str) -> Result<Vec<SessionMessage>, String> {
let (db_path, session_id) = parse_sqlite_source(source)
.ok_or_else(|| format!("Invalid SQLite source reference: {source}"))?;
let conn = Connection::open_with_flags(
&db_path,
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
)
.map_err(|e| format!("Failed to open Hermes database: {e}"))?;
// Try querying with common column names
let query =
"SELECT role, content, created_at FROM messages WHERE session_id = ?1 ORDER BY created_at ASC";
let mut stmt = conn
.prepare(query)
.map_err(|e| format!("Failed to prepare messages query: {e}"))?;
let rows = stmt
.query_map([session_id.as_str()], |row| {
let role: String = row.get(0)?;
let content: String = row.get(1)?;
let ts: Option<i64> = row.get(2).ok();
Ok((role, content, ts))
})
.map_err(|e| format!("Failed to query messages: {e}"))?;
let mut messages = Vec::new();
for row in rows.flatten() {
let (role, content, ts) = row;
if content.trim().is_empty() {
continue;
}
let ts_ms = ts.and_then(|v| parse_timestamp_to_ms(&Value::Number(v.into())));
messages.push(SessionMessage {
role,
content,
ts: ts_ms,
});
}
Ok(messages)
}
/// Delete a session from the Hermes SQLite database.
pub fn delete_session_sqlite(session_id: &str, source: &str) -> Result<bool, String> {
let (db_path, ref_session_id) = parse_sqlite_source(source)
.ok_or_else(|| format!("Invalid SQLite source reference: {source}"))?;
let db_path = db_path
.canonicalize()
.map_err(|e| format!("Failed to canonicalize Hermes database path: {e}"))?;
let expected_db_path = get_hermes_db_path()
.canonicalize()
.map_err(|e| format!("Failed to canonicalize expected Hermes database path: {e}"))?;
if ref_session_id != session_id {
return Err(format!(
"Hermes SQLite session ID mismatch: expected {session_id}, found {ref_session_id}"
));
}
if db_path != expected_db_path {
return Err("SQLite path does not match expected Hermes database".to_string());
}
let conn =
Connection::open(&db_path).map_err(|e| format!("Failed to open Hermes database: {e}"))?;
let tx = conn
.unchecked_transaction()
.map_err(|e| format!("Failed to begin transaction: {e}"))?;
// Delete messages first (child records)
let _ = tx.execute("DELETE FROM messages WHERE session_id = ?1", [session_id]);
let deleted = tx
.execute("DELETE FROM sessions WHERE id = ?1", [session_id])
.map_err(|e| format!("Failed to delete Hermes session: {e}"))?;
tx.commit()
.map_err(|e| format!("Failed to commit session deletion: {e}"))?;
Ok(deleted > 0)
}
fn parse_sqlite_source(source: &str) -> Option<(PathBuf, String)> {
let rest = source.strip_prefix("sqlite:")?;
let hash_pos = rest.rfind('#')?;
let db_path = PathBuf::from(&rest[..hash_pos]);
let session_id = rest[hash_pos + 1..].to_string();
if session_id.is_empty() {
return None;
}
Some((db_path, session_id))
}
// ── JSONL scanning ──────────────────────────────────────────────────
fn scan_sessions_jsonl() -> Vec<SessionMeta> {
let sessions_dir = get_hermes_sessions_dir();
if !sessions_dir.exists() {
return Vec::new();
}
let entries = match std::fs::read_dir(&sessions_dir) {
Ok(e) => e,
Err(_) => return Vec::new(),
};
let mut sessions = Vec::new();
for entry in entries.flatten() {
let path = entry.path();
let ext = path.extension().and_then(|e| e.to_str());
if ext != Some("jsonl") && ext != Some("json") {
continue;
}
if let Some(meta) = parse_jsonl_session(&path) {
sessions.push(meta);
}
}
sessions
}
fn parse_jsonl_session(path: &Path) -> Option<SessionMeta> {
// Read head (metadata + first user message) and tail (last timestamp)
let (head, tail) = read_head_tail_lines(path, 30, 10).ok()?;
let mut first_user_msg: Option<String> = None;
let mut first_ts: Option<i64> = None;
let mut last_ts: Option<i64> = None;
let mut session_id: Option<String> = None;
let mut title: Option<String> = None;
let mut cwd: Option<String> = None;
// Process head lines for metadata and first user message
for line in &head {
if line.trim().is_empty() {
continue;
}
let value: Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
let ts = value
.get("timestamp")
.or_else(|| value.get("ts"))
.and_then(parse_timestamp_to_ms);
if first_ts.is_none() {
first_ts = ts;
}
last_ts = ts.or(last_ts);
let line_type = value.get("type").and_then(Value::as_str).unwrap_or("");
// Extract session metadata from session-type lines
if line_type == "session" || line_type == "init" {
if session_id.is_none() {
session_id = value
.get("id")
.or_else(|| value.get("sessionId"))
.and_then(Value::as_str)
.map(|s| s.to_string());
}
if title.is_none() {
title = value
.get("title")
.and_then(Value::as_str)
.filter(|s| !s.is_empty())
.map(|s| s.to_string());
}
if cwd.is_none() {
cwd = value
.get("cwd")
.or_else(|| value.get("directory"))
.and_then(Value::as_str)
.filter(|s| !s.is_empty())
.map(|s| s.to_string());
}
}
if first_user_msg.is_none() {
let role = value
.get("role")
.or_else(|| value.get("message").and_then(|m| m.get("role")))
.and_then(Value::as_str);
if role == Some("user") {
let content = value
.get("content")
.or_else(|| value.get("message").and_then(|m| m.get("content")));
if let Some(c) = content {
let text = extract_text(c);
if !text.trim().is_empty() {
first_user_msg = Some(truncate_summary(&text, TITLE_MAX_CHARS).to_string());
}
}
}
}
}
// Process tail lines for the most recent timestamp
for line in tail.iter().rev() {
if line.trim().is_empty() {
continue;
}
let value: Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
let ts = value
.get("timestamp")
.or_else(|| value.get("ts"))
.and_then(parse_timestamp_to_ms);
if let Some(t) = ts {
last_ts = Some(t);
break;
}
}
// Fall back to filename as session ID
let session_id = session_id.unwrap_or_else(|| {
path.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("unknown")
.to_string()
});
let source_path = path.to_string_lossy().to_string();
Some(SessionMeta {
provider_id: PROVIDER_ID.to_string(),
session_id,
title: title.or_else(|| first_user_msg.clone()),
summary: first_user_msg,
project_dir: cwd,
created_at: first_ts,
last_active_at: last_ts.or(first_ts),
source_path: Some(source_path),
resume_command: None,
})
}
/// Load messages from a Hermes JSONL transcript file.
pub fn load_messages(path: &Path) -> Result<Vec<SessionMessage>, String> {
let file = File::open(path).map_err(|e| format!("Failed to open session file: {e}"))?;
let reader = BufReader::new(file);
let mut messages = Vec::new();
for line in reader.lines() {
let line = match line {
Ok(l) => l,
Err(_) => continue,
};
if line.trim().is_empty() {
continue;
}
let value: Value = match serde_json::from_str(&line) {
Ok(v) => v,
Err(_) => continue,
};
// Support both flat messages and nested {type:"message", message:{...}} format
let (role_val, content_val, ts_val) =
if value.get("type").and_then(Value::as_str) == Some("message") {
let msg = match value.get("message") {
Some(m) => m,
None => continue,
};
(
msg.get("role"),
msg.get("content"),
value.get("timestamp").or_else(|| msg.get("ts")),
)
} else {
(
value.get("role"),
value.get("content"),
value.get("timestamp").or_else(|| value.get("ts")),
)
};
let role = match role_val.and_then(Value::as_str) {
Some(r) => r.to_string(),
None => continue,
};
let content = content_val.map(extract_text).unwrap_or_default();
if content.trim().is_empty() {
continue;
}
let ts = ts_val.and_then(parse_timestamp_to_ms);
messages.push(SessionMessage { role, content, ts });
}
Ok(messages)
}
/// Delete a Hermes JSONL session file.
pub fn delete_session(_root: &Path, path: &Path, _session_id: &str) -> Result<bool, String> {
std::fs::remove_file(path).map_err(|e| {
format!(
"Failed to delete Hermes session file {}: {e}",
path.display()
)
})?;
Ok(true)
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::tempdir;
#[test]
fn parse_sqlite_source_valid() {
let (path, id) = parse_sqlite_source("sqlite:/home/user/.hermes/state.db#session-123")
.expect("should parse");
assert_eq!(path, PathBuf::from("/home/user/.hermes/state.db"));
assert_eq!(id, "session-123");
}
#[test]
fn parse_sqlite_source_invalid() {
assert!(parse_sqlite_source("not-sqlite").is_none());
assert!(parse_sqlite_source("sqlite:").is_none());
assert!(parse_sqlite_source("sqlite:/path#").is_none());
}
#[test]
fn parse_jsonl_session_extracts_metadata() {
let dir = tempdir().expect("tempdir");
let path = dir.path().join("test-session.jsonl");
let mut f = File::create(&path).expect("create");
writeln!(
f,
r#"{{"type":"session","id":"s1","title":"My Session","cwd":"/home/user/project"}}"#
)
.unwrap();
writeln!(f, r#"{{"type":"message","message":{{"role":"user","content":"Hello world"}},"timestamp":"2026-01-01T00:00:00Z"}}"#).unwrap();
writeln!(f, r#"{{"type":"message","message":{{"role":"assistant","content":"Hi there"}},"timestamp":"2026-01-01T00:01:00Z"}}"#).unwrap();
f.flush().unwrap();
let meta = parse_jsonl_session(&path).expect("should parse");
assert_eq!(meta.session_id, "s1");
assert_eq!(meta.title.as_deref(), Some("My Session"));
assert_eq!(meta.project_dir.as_deref(), Some("/home/user/project"));
assert!(meta.created_at.is_some());
assert!(meta.last_active_at.is_some());
}
#[test]
fn parse_jsonl_session_fallback_to_filename() {
let dir = tempdir().expect("tempdir");
let path = dir.path().join("my-session.jsonl");
let mut f = File::create(&path).expect("create");
writeln!(f, r#"{{"role":"user","content":"Hello","ts":1700000000}}"#).unwrap();
f.flush().unwrap();
let meta = parse_jsonl_session(&path).expect("should parse");
assert_eq!(meta.session_id, "my-session");
assert!(meta.title.is_some()); // Falls back to first user message
}
#[test]
fn load_messages_flat_format() {
let dir = tempdir().expect("tempdir");
let path = dir.path().join("session.jsonl");
let mut f = File::create(&path).expect("create");
writeln!(
f,
r#"{{"role":"user","content":"What is Rust?","ts":1700000000}}"#
)
.unwrap();
writeln!(
f,
r#"{{"role":"assistant","content":"A systems programming language.","ts":1700000001}}"#
)
.unwrap();
f.flush().unwrap();
let msgs = load_messages(&path).expect("should load");
assert_eq!(msgs.len(), 2);
assert_eq!(msgs[0].role, "user");
assert_eq!(msgs[1].role, "assistant");
}
#[test]
fn load_messages_nested_format() {
let dir = tempdir().expect("tempdir");
let path = dir.path().join("session.jsonl");
let mut f = File::create(&path).expect("create");
writeln!(f, r#"{{"type":"session","id":"s1"}}"#).unwrap();
writeln!(f, r#"{{"type":"message","message":{{"role":"user","content":"Hello"}},"timestamp":"2026-01-01T00:00:00Z"}}"#).unwrap();
writeln!(f, r#"{{"type":"message","message":{{"role":"assistant","content":"Hi"}},"timestamp":"2026-01-01T00:01:00Z"}}"#).unwrap();
f.flush().unwrap();
let msgs = load_messages(&path).expect("should load");
assert_eq!(msgs.len(), 2);
assert_eq!(msgs[0].role, "user");
assert!(msgs[0].ts.is_some());
}
#[test]
fn delete_session_removes_file() {
let dir = tempdir().expect("tempdir");
let path = dir.path().join("session.jsonl");
File::create(&path).expect("create");
assert!(path.exists());
delete_session(dir.path(), &path, "session").expect("should delete");
assert!(!path.exists());
}
}
@@ -1,6 +1,7 @@
pub mod claude;
pub mod codex;
pub mod gemini;
pub mod hermes;
pub mod openclaw;
pub mod opencode;
mod utils;
@@ -62,7 +62,8 @@ type ProviderFilter =
| "claude"
| "opencode"
| "openclaw"
| "gemini";
| "gemini"
| "hermes";
export function SessionManagerPage({ appId }: { appId: string }) {
const { t } = useTranslation();