fix: parse tool_use/tool_result messages and add OpenCode SQLite backend (#1401)

* fix: parse tool_use/tool_result messages and add OpenCode SQLite backend

  - Claude: reclassify user messages containing tool_result as "tool" role
  - Codex: handle function_call and function_call_output payload types
  - Gemini: support array content and toolCalls extraction, filter info/error types
  - OpenCode: add SQLite session scan, load and delete alongside legacy JSON
  - utils: extend parse_timestamp_to_ms for integer timestamps, extract tool_use/tool_result in shared extract_text

* fix: address remaining issues from tool_use/tool_result parsing commit
  - Claude: fix role misclassification for mixed user+tool_result messages (any → all)
  - OpenCode: extract duplicate part text logic into extract_part_text()
  - OpenCode: add path validation for SQLite delete to prevent foreign DB access
  - OpenCode: wrap SQLite deletion in transaction for atomicity
  - openclaw_config: remove redundant as_deref() on Option<&str>
This commit is contained in:
BlueOcean
2026-03-22 22:02:35 +08:00
committed by GitHub
parent 117dbf1386
commit bd3cfb7741
7 changed files with 858 additions and 35 deletions
+1 -1
View File
@@ -294,7 +294,7 @@ impl OpenClawConfigDocument {
if let Some(existing) = key_value_pairs
.iter_mut()
.find(|pair| json5_key_name(&pair.key).as_deref() == Some(key))
.find(|pair| json5_key_name(&pair.key) == Some(key))
{
existing.value = new_value;
return Ok(());
+10
View File
@@ -69,6 +69,11 @@ pub fn scan_sessions() -> Vec<SessionMeta> {
}
pub fn load_messages(provider_id: &str, source_path: &str) -> Result<Vec<SessionMessage>, String> {
// OpenCode SQLite sessions use a "sqlite:" prefixed source_path
if provider_id == "opencode" && source_path.starts_with("sqlite:") {
return opencode::load_messages_sqlite(source_path);
}
let path = Path::new(source_path);
match provider_id {
"codex" => codex::load_messages(path),
@@ -85,6 +90,11 @@ pub fn delete_session(
session_id: &str,
source_path: &str,
) -> Result<bool, String> {
// OpenCode SQLite sessions bypass the file-based deletion path
if provider_id == "opencode" && source_path.starts_with("sqlite:") {
return opencode::delete_session_sqlite(session_id, source_path);
}
let root = provider_root(provider_id)?;
delete_session_with_root(provider_id, session_id, Path::new(source_path), &root)
}
@@ -52,11 +52,25 @@ pub fn load_messages(path: &Path) -> Result<Vec<SessionMessage>, String> {
None => continue,
};
let role = message
let mut role = message
.get("role")
.and_then(Value::as_str)
.unwrap_or("unknown")
.to_string();
// Claude wraps tool_result inside user messages; reclassify as "tool" role
if role == "user" {
if let Some(Value::Array(items)) = message.get("content") {
let all_tool_results = !items.is_empty()
&& items.iter().all(|item| {
item.get("type").and_then(Value::as_str) == Some("tool_result")
});
if all_tool_results {
role = "tool".to_string();
}
}
}
let content = message.get("content").map(extract_text).unwrap_or_default();
if content.trim().is_empty() {
continue;
@@ -268,4 +282,58 @@ mod tests {
assert!(!path.exists());
assert!(!sidecar.exists());
}
#[test]
fn load_messages_tool_use_shows_as_assistant() {
let temp = tempdir().expect("tempdir");
let path = temp.path().join("session.jsonl");
std::fs::write(
&path,
concat!(
"{\"message\":{\"role\":\"assistant\",\"content\":[{\"type\":\"tool_use\",\"id\":\"toolu_1\",\"name\":\"Write\",\"input\":{\"file_path\":\"a.txt\"}}]},\"timestamp\":\"2026-03-06T10:00:00Z\"}\n",
"{\"message\":{\"role\":\"user\",\"content\":[{\"type\":\"tool_result\",\"tool_use_id\":\"toolu_1\",\"content\":\"File written\"}]},\"timestamp\":\"2026-03-06T10:00:01Z\"}\n",
),
)
.expect("write");
let msgs = load_messages(&path).expect("load");
assert_eq!(msgs.len(), 2);
assert_eq!(msgs[0].role, "assistant");
assert!(msgs[0].content.contains("[Tool: Write]"));
assert_eq!(msgs[1].role, "tool");
assert_eq!(msgs[1].content, "File written");
}
#[test]
fn load_messages_mixed_text_and_tool_use() {
let temp = tempdir().expect("tempdir");
let path = temp.path().join("session.jsonl");
std::fs::write(
&path,
"{\"message\":{\"role\":\"assistant\",\"content\":[{\"type\":\"text\",\"text\":\"Let me help.\"},{\"type\":\"tool_use\",\"id\":\"toolu_1\",\"name\":\"Read\",\"input\":{}}]},\"timestamp\":\"2026-03-06T10:00:00Z\"}\n",
)
.expect("write");
let msgs = load_messages(&path).expect("load");
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].role, "assistant");
assert!(msgs[0].content.contains("Let me help."));
assert!(msgs[0].content.contains("[Tool: Read]"));
}
#[test]
fn load_messages_mixed_user_tool_result_and_text_stays_user() {
let temp = tempdir().expect("tempdir");
let path = temp.path().join("session.jsonl");
std::fs::write(
&path,
"{\"message\":{\"role\":\"user\",\"content\":[{\"type\":\"tool_result\",\"tool_use_id\":\"toolu_1\",\"content\":\"result\"},{\"type\":\"text\",\"text\":\"Please continue\"}]},\"timestamp\":\"2026-03-06T10:00:00Z\"}\n",
)
.expect("write");
let msgs = load_messages(&path).expect("load");
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].role, "user");
assert!(msgs[0].content.contains("Please continue"));
}
}
@@ -59,16 +59,37 @@ pub fn load_messages(path: &Path) -> Result<Vec<SessionMessage>, String> {
None => continue,
};
if payload.get("type").and_then(Value::as_str) != Some("message") {
continue;
}
let payload_type = payload.get("type").and_then(Value::as_str).unwrap_or("");
// Codex uses separate payload types for tool interactions
let (role, content) = match payload_type {
"message" => {
let role = payload
.get("role")
.and_then(Value::as_str)
.unwrap_or("unknown")
.to_string();
let content = payload.get("content").map(extract_text).unwrap_or_default();
(role, content)
}
"function_call" => {
let name = payload
.get("name")
.and_then(Value::as_str)
.unwrap_or("unknown");
("assistant".to_string(), format!("[Tool: {name}]"))
}
"function_call_output" => {
let output = payload
.get("output")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
("tool".to_string(), output)
}
_ => continue,
};
let role = payload
.get("role")
.and_then(Value::as_str)
.unwrap_or("unknown")
.to_string();
let content = payload.get("content").map(extract_text).unwrap_or_default();
if content.trim().is_empty() {
continue;
}
@@ -239,4 +260,36 @@ mod tests {
assert!(!path.exists());
}
#[test]
fn load_messages_includes_function_call_and_output() {
let temp = tempdir().expect("tempdir");
let path = temp.path().join("session.jsonl");
std::fs::write(
&path,
concat!(
"{\"timestamp\":\"2026-03-06T21:50:12Z\",\"type\":\"session_meta\",\"payload\":{\"id\":\"test-id\",\"cwd\":\"/tmp\"}}\n",
"{\"timestamp\":\"2026-03-06T21:50:13Z\",\"type\":\"response_item\",\"payload\":{\"type\":\"message\",\"role\":\"user\",\"content\":\"list files\"}}\n",
"{\"timestamp\":\"2026-03-06T21:50:14Z\",\"type\":\"response_item\",\"payload\":{\"type\":\"function_call\",\"name\":\"shell\",\"arguments\":\"{\\\"cmd\\\":[\\\"ls\\\"]}\",\"call_id\":\"call_1\"}}\n",
"{\"timestamp\":\"2026-03-06T21:50:15Z\",\"type\":\"response_item\",\"payload\":{\"type\":\"function_call_output\",\"call_id\":\"call_1\",\"output\":\"file1.txt\\nfile2.txt\"}}\n",
"{\"timestamp\":\"2026-03-06T21:50:16Z\",\"type\":\"response_item\",\"payload\":{\"type\":\"message\",\"role\":\"assistant\",\"content\":[{\"type\":\"output_text\",\"text\":\"Done.\"}]}}\n",
),
)
.expect("write");
let msgs = load_messages(&path).expect("load");
assert_eq!(msgs.len(), 4);
assert_eq!(msgs[0].role, "user");
assert_eq!(msgs[0].content, "list files");
assert_eq!(msgs[1].role, "assistant");
assert!(msgs[1].content.contains("[Tool: shell]"));
assert_eq!(msgs[2].role, "tool");
assert!(msgs[2].content.contains("file1.txt"));
assert_eq!(msgs[3].role, "assistant");
assert_eq!(msgs[3].content, "Done.");
}
}
@@ -60,21 +60,47 @@ pub fn load_messages(path: &Path) -> Result<Vec<SessionMessage>, String> {
let mut result = Vec::new();
for msg in messages {
let content = match msg.get("content").and_then(Value::as_str) {
Some(c) if !c.trim().is_empty() => c.to_string(),
_ => continue,
let role = match msg.get("type").and_then(Value::as_str) {
Some("gemini") => "assistant",
Some("user") => "user",
Some("info") | Some("error") => continue,
Some(_) | None => continue,
};
let role = match msg.get("type").and_then(Value::as_str) {
Some("gemini") => "assistant".to_string(),
Some("user") => "user".to_string(),
Some(other) => other.to_string(),
None => continue,
// Gemini content may be a plain string or an array of {text: ...} objects
let mut content = match msg.get("content") {
Some(Value::String(s)) => s.to_string(),
Some(Value::Array(items)) => items
.iter()
.filter_map(|item| item.get("text").and_then(Value::as_str))
.collect::<Vec<_>>()
.join("\n"),
_ => String::new(),
};
// Append tool call names from the optional toolCalls array
if let Some(Value::Array(calls)) = msg.get("toolCalls") {
for call in calls {
if let Some(name) = call.get("name").and_then(Value::as_str) {
if !content.is_empty() {
content.push('\n');
}
content.push_str(&format!("[Tool: {name}]"));
}
}
}
if content.trim().is_empty() {
continue;
}
let ts = msg.get("timestamp").and_then(parse_timestamp_to_ms);
result.push(SessionMessage { role, content, ts });
result.push(SessionMessage {
role: role.to_string(),
content,
ts,
});
}
Ok(result)
@@ -172,4 +198,55 @@ mod tests {
assert!(!path.exists());
}
#[test]
fn load_messages_handles_array_content() {
let temp = tempdir().expect("tempdir");
let path = temp.path().join("session.json");
std::fs::write(
&path,
r#"{
"sessionId": "test",
"messages": [
{"id":"1","timestamp":"2026-03-06T10:00:00Z","type":"user","content":[{"text":"hello"}]},
{"id":"2","timestamp":"2026-03-06T10:00:01Z","type":"gemini","content":"world"},
{"id":"3","timestamp":"2026-03-06T10:00:02Z","type":"info","content":"system info"},
{"id":"4","timestamp":"2026-03-06T10:00:03Z","type":"error","content":"MCP ERROR"}
]
}"#,
)
.expect("write");
let msgs = load_messages(&path).expect("load");
assert_eq!(msgs.len(), 2);
assert_eq!(msgs[0].role, "user");
assert_eq!(msgs[0].content, "hello");
assert_eq!(msgs[1].role, "assistant");
assert_eq!(msgs[1].content, "world");
}
#[test]
fn load_messages_includes_tool_calls() {
let temp = tempdir().expect("tempdir");
let path = temp.path().join("session.json");
std::fs::write(
&path,
r#"{
"sessionId": "test",
"messages": [
{"id":"1","timestamp":"2026-03-10T08:24:50Z","type":"gemini","content":"","toolCalls":[{"id":"call_1","name":"web_search","args":{"query":"test"}}]},
{"id":"2","timestamp":"2026-03-10T08:25:00Z","type":"gemini","content":"Here are the results.","toolCalls":[{"id":"call_2","name":"web_fetch","args":{"url":"http://example.com"}}]}
]
}"#,
)
.expect("write");
let msgs = load_messages(&path).expect("load");
assert_eq!(msgs.len(), 2);
assert_eq!(msgs[0].role, "assistant");
assert!(msgs[0].content.contains("[Tool: web_search]"));
assert_eq!(msgs[1].role, "assistant");
assert!(msgs[1].content.contains("Here are the results."));
assert!(msgs[1].content.contains("[Tool: web_fetch]"));
}
}
@@ -1,5 +1,6 @@
use std::path::{Path, PathBuf};
use rusqlite::Connection;
use serde_json::Value;
use crate::session_manager::{SessionMessage, SessionMeta};
@@ -8,22 +9,59 @@ use super::utils::{parse_timestamp_to_ms, path_basename, truncate_summary};
const PROVIDER_ID: &str = "opencode";
/// Return the OpenCode data directory.
/// Return the OpenCode base directory (`$XDG_DATA_HOME/opencode`).
///
/// Respects `XDG_DATA_HOME` on all platforms; falls back to
/// `~/.local/share/opencode/storage/`.
pub(crate) fn get_opencode_data_dir() -> PathBuf {
/// `~/.local/share/opencode/`.
pub(crate) fn get_opencode_base_dir() -> PathBuf {
if let Ok(xdg) = std::env::var("XDG_DATA_HOME") {
if !xdg.is_empty() {
return PathBuf::from(xdg).join("opencode").join("storage");
return PathBuf::from(xdg).join("opencode");
}
}
dirs::home_dir()
.map(|h| h.join(".local/share/opencode/storage"))
.unwrap_or_else(|| PathBuf::from(".local/share/opencode/storage"))
.map(|h| h.join(".local/share/opencode"))
.unwrap_or_else(|| PathBuf::from(".local/share/opencode"))
}
/// Return the OpenCode JSON storage directory (legacy flat-file layout).
pub(crate) fn get_opencode_data_dir() -> PathBuf {
get_opencode_base_dir().join("storage")
}
fn get_opencode_db_path() -> PathBuf {
get_opencode_base_dir().join("opencode.db")
}
/// Scan sessions from both the legacy JSON files and the newer SQLite database,
/// merging results with SQLite taking precedence on ID conflicts.
pub fn scan_sessions() -> Vec<SessionMeta> {
let json_sessions = scan_sessions_json();
let sqlite_sessions = scan_sessions_sqlite();
if sqlite_sessions.is_empty() {
return json_sessions;
}
if json_sessions.is_empty() {
return sqlite_sessions;
}
// Deduplicate: keep SQLite version when the same session_id exists in both
let sqlite_ids: std::collections::HashSet<String> = sqlite_sessions
.iter()
.map(|s| s.session_id.clone())
.collect();
let mut merged = sqlite_sessions;
for s in json_sessions {
if !sqlite_ids.contains(&s.session_id) {
merged.push(s);
}
}
merged
}
fn scan_sessions_json() -> Vec<SessionMeta> {
let storage = get_opencode_data_dir();
let session_dir = storage.join("session");
if !session_dir.exists() {
@@ -42,6 +80,81 @@ pub fn scan_sessions() -> Vec<SessionMeta> {
sessions
}
/// Parse a SQLite source reference in the format `sqlite:<db_path>:<session_id>`.
///
/// Uses `rfind(":ses_")` to split the path from the session ID because the
/// db path itself may contain colons (e.g. `C:\Users\...` on Windows).
/// This relies on the OpenCode convention that session IDs start with `ses_`.
fn parse_sqlite_source(source: &str) -> Option<(PathBuf, String)> {
let rest = source.strip_prefix("sqlite:")?;
let sep = rest.rfind(":ses_")?;
let db_path = PathBuf::from(&rest[..sep]);
let session_id = rest[sep + 1..].to_string();
Some((db_path, session_id))
}
fn scan_sessions_sqlite() -> Vec<SessionMeta> {
let db_path = get_opencode_db_path();
if !db_path.exists() {
return Vec::new();
}
let conn = match Connection::open_with_flags(
&db_path,
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
) {
Ok(c) => c,
Err(_) => return Vec::new(),
};
let mut stmt = match conn.prepare(
"SELECT id, title, directory, time_created, time_updated FROM session ORDER BY time_updated DESC",
) {
Ok(s) => s,
Err(_) => return Vec::new(),
};
let db_display = db_path.display().to_string();
let iter = match stmt.query_map([], |row| {
let session_id: String = row.get(0)?;
let title: String = row.get(1)?;
let directory: String = row.get(2)?;
let created: i64 = row.get(3)?;
let updated: i64 = row.get(4)?;
Ok((session_id, title, directory, created, updated))
}) {
Ok(rows) => rows,
Err(_) => return Vec::new(),
};
let mut sessions = Vec::new();
for row in iter.flatten() {
let (session_id, title, directory, created, updated) = row;
let display_title = if title.is_empty() {
path_basename(&directory)
} else {
Some(title)
};
sessions.push(SessionMeta {
provider_id: PROVIDER_ID.to_string(),
session_id: session_id.clone(),
title: display_title.clone(),
summary: display_title,
project_dir: if directory.is_empty() {
None
} else {
Some(directory)
},
created_at: Some(created),
last_active_at: Some(updated),
source_path: Some(format!("sqlite:{}:{}", db_display, session_id)),
resume_command: Some(format!("opencode session resume {session_id}")),
});
}
sessions
}
pub fn load_messages(path: &Path) -> Result<Vec<SessionMessage>, String> {
// `path` is the message directory: storage/message/{sessionID}/
if !path.is_dir() {
@@ -111,6 +224,95 @@ pub fn load_messages(path: &Path) -> Result<Vec<SessionMessage>, String> {
Ok(messages)
}
/// Load messages from the OpenCode SQLite database for a given source reference.
/// Joins the `message` and `part` tables in memory to reconstruct full messages.
pub fn load_messages_sqlite(source: &str) -> Result<Vec<SessionMessage>, String> {
let (db_path, session_id) = parse_sqlite_source(source)
.ok_or_else(|| format!("Invalid SQLite source reference: {source}"))?;
let conn = Connection::open_with_flags(
&db_path,
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
)
.map_err(|e| format!("Failed to open OpenCode database: {e}"))?;
let mut msg_stmt = conn
.prepare(
"SELECT id, time_created, data FROM message WHERE session_id = ?1 ORDER BY time_created ASC",
)
.map_err(|e| format!("Failed to prepare message query: {e}"))?;
let msg_rows = msg_stmt
.query_map([session_id.as_str()], |row| {
let id: String = row.get(0)?;
let ts: i64 = row.get(1)?;
let data: String = row.get(2)?;
Ok((id, ts, data))
})
.map_err(|e| format!("Failed to query messages: {e}"))?;
let mut part_stmt = conn
.prepare(
"SELECT message_id, data FROM part WHERE session_id = ?1 ORDER BY time_created ASC",
)
.map_err(|e| format!("Failed to prepare part query: {e}"))?;
let part_rows = part_stmt
.query_map([session_id.as_str()], |row| {
let message_id: String = row.get(0)?;
let data: String = row.get(1)?;
Ok((message_id, data))
})
.map_err(|e| format!("Failed to query parts: {e}"))?;
let mut parts_map: std::collections::HashMap<String, Vec<String>> =
std::collections::HashMap::new();
for part in part_rows.flatten() {
let (message_id, data) = part;
parts_map.entry(message_id).or_default().push(data);
}
let mut messages = Vec::new();
for row in msg_rows.flatten() {
let (msg_id, ts, data) = row;
let msg_value: Value = match serde_json::from_str(&data) {
Ok(v) => v,
Err(_) => continue,
};
let role = msg_value
.get("role")
.and_then(Value::as_str)
.unwrap_or("unknown")
.to_string();
let mut texts = Vec::new();
if let Some(parts) = parts_map.get(&msg_id) {
for part_data in parts {
let part_value: Value = match serde_json::from_str(part_data) {
Ok(v) => v,
Err(_) => continue,
};
if let Some(text) = extract_part_text(&part_value) {
texts.push(text);
}
}
}
let content = texts.join("\n");
if content.trim().is_empty() {
continue;
}
messages.push(SessionMessage {
role,
content,
ts: Some(ts),
});
}
Ok(messages)
}
pub fn delete_session(storage: &Path, path: &Path, session_id: &str) -> Result<bool, String> {
if path.file_name().and_then(|name| name.to_str()) != Some(session_id) {
return Err(format!(
@@ -176,6 +378,48 @@ pub fn delete_session(storage: &Path, path: &Path, session_id: &str) -> Result<b
Ok(true)
}
/// Delete a session from the OpenCode SQLite database.
pub fn delete_session_sqlite(session_id: &str, source: &str) -> Result<bool, String> {
let (db_path, ref_session_id) = parse_sqlite_source(source)
.ok_or_else(|| format!("Invalid SQLite source reference: {source}"))?;
let db_path = db_path
.canonicalize()
.map_err(|e| format!("Failed to canonicalize SQLite database path: {e}"))?;
let expected_db_path = get_opencode_db_path()
.canonicalize()
.map_err(|e| format!("Failed to canonicalize expected OpenCode database path: {e}"))?;
if ref_session_id != session_id {
return Err(format!(
"OpenCode SQLite session ID mismatch: expected {session_id}, found {ref_session_id}"
));
}
if db_path != expected_db_path {
return Err("SQLite path does not match expected OpenCode database".to_string());
}
let conn =
Connection::open(&db_path).map_err(|e| format!("Failed to open OpenCode database: {e}"))?;
let tx = conn
.unchecked_transaction()
.map_err(|e| format!("Failed to begin transaction: {e}"))?;
tx.execute("DELETE FROM part WHERE session_id = ?1", [session_id])
.map_err(|e| format!("Failed to delete OpenCode parts: {e}"))?;
tx.execute("DELETE FROM message WHERE session_id = ?1", [session_id])
.map_err(|e| format!("Failed to delete OpenCode messages: {e}"))?;
let deleted = tx
.execute("DELETE FROM session WHERE id = ?1", [session_id])
.map_err(|e| format!("Failed to delete OpenCode session: {e}"))?;
tx.commit()
.map_err(|e| format!("Failed to commit session deletion: {e}"))?;
Ok(deleted > 0)
}
fn parse_session(storage: &Path, path: &Path) -> Option<SessionMeta> {
let data = std::fs::read_to_string(path).ok()?;
let value: Value = serde_json::from_str(&data).ok()?;
@@ -286,6 +530,24 @@ fn get_first_user_summary(storage: &Path, session_id: &str) -> Option<String> {
}
/// Collect text content from all parts in a part directory.
fn extract_part_text(part_value: &Value) -> Option<String> {
match part_value.get("type").and_then(Value::as_str) {
Some("text") => part_value
.get("text")
.and_then(Value::as_str)
.filter(|t| !t.trim().is_empty())
.map(|t| t.to_string()),
Some("tool") => {
let tool = part_value
.get("tool")
.and_then(Value::as_str)
.unwrap_or("unknown");
Some(format!("[Tool: {tool}]"))
}
_ => None,
}
}
fn collect_parts_text(part_dir: &Path) -> String {
if !part_dir.is_dir() {
return String::new();
@@ -305,15 +567,8 @@ fn collect_parts_text(part_dir: &Path) -> String {
Err(_) => continue,
};
// Only include text-type parts
if value.get("type").and_then(Value::as_str) != Some("text") {
continue;
}
if let Some(text) = value.get("text").and_then(Value::as_str) {
if !text.trim().is_empty() {
texts.push(text.to_string());
}
if let Some(text) = extract_part_text(&value) {
texts.push(text);
}
}
@@ -370,8 +625,47 @@ fn remove_dir_all_if_exists(path: &Path) -> std::io::Result<()> {
#[cfg(test)]
mod tests {
use super::*;
use rusqlite::Connection;
use std::sync::{Mutex, OnceLock};
use tempfile::tempdir;
fn opencode_env_lock() -> &'static Mutex<()> {
static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
LOCK.get_or_init(|| Mutex::new(()))
}
fn create_sqlite_schema(conn: &Connection) {
conn.execute_batch(
"
PRAGMA foreign_keys = ON;
CREATE TABLE session (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
directory TEXT NOT NULL,
time_created INTEGER NOT NULL,
time_updated INTEGER NOT NULL
);
CREATE TABLE message (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
time_created INTEGER NOT NULL,
data TEXT NOT NULL,
FOREIGN KEY(session_id) REFERENCES session(id) ON DELETE CASCADE
);
CREATE TABLE part (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
message_id TEXT NOT NULL,
time_created INTEGER NOT NULL,
data TEXT NOT NULL,
FOREIGN KEY(session_id) REFERENCES session(id) ON DELETE CASCADE,
FOREIGN KEY(message_id) REFERENCES message(id) ON DELETE CASCADE
);
",
)
.expect("create sqlite schema");
}
#[test]
fn delete_session_removes_session_diff_messages_and_parts() {
let temp = tempdir().expect("tempdir");
@@ -432,4 +726,272 @@ mod tests {
.join(format!("{project_id}.json"))
.exists());
}
#[test]
fn load_messages_includes_tool_parts() {
let temp = tempdir().expect("tempdir");
let storage = temp.path();
let session_id = "ses_test";
let msg_id = "msg_1";
let msg_dir = storage.join("message").join(session_id);
let part_dir = storage.join("part").join(msg_id);
std::fs::create_dir_all(&msg_dir).expect("create msg dir");
std::fs::create_dir_all(&part_dir).expect("create part dir");
std::fs::write(
msg_dir.join(format!("{msg_id}.json")),
r#"{"id":"msg_1","role":"assistant","time":{"created":"2026-03-06T10:00:00Z"}}"#,
)
.expect("write msg");
std::fs::write(
part_dir.join("prt_1.json"),
r#"{"id":"prt_1","type":"tool","tool":"bash","state":{"status":"completed","input":{"command":"ls"},"output":"file.txt"}}"#,
)
.expect("write tool part");
std::fs::write(
part_dir.join("prt_2.json"),
r#"{"id":"prt_2","type":"text","text":"Here are the files."}"#,
)
.expect("write text part");
let msgs = load_messages(&msg_dir).expect("load");
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].role, "assistant");
assert!(msgs[0].content.contains("[Tool: bash]"));
assert!(msgs[0].content.contains("Here are the files."));
}
#[test]
fn parse_sqlite_source_accepts_valid_references() {
let parsed = parse_sqlite_source("sqlite:/tmp/opencode.db:ses_123").expect("valid source");
assert_eq!(parsed.0, PathBuf::from("/tmp/opencode.db"));
assert_eq!(parsed.1, "ses_123");
}
#[test]
fn parse_sqlite_source_rejects_invalid_references() {
assert!(parse_sqlite_source("/tmp/opencode.db:ses_123").is_none());
assert!(parse_sqlite_source("sqlite:/tmp/opencode.db:msg_123").is_none());
assert!(parse_sqlite_source("sqlite:/tmp/opencode.db").is_none());
}
#[test]
#[allow(deprecated)] // set_var/remove_var deprecated since Rust 1.81; safe here under mutex
fn scan_sessions_sqlite_reads_temp_database() {
let _guard = opencode_env_lock().lock().expect("lock");
let temp = tempdir().expect("tempdir");
let original_xdg = std::env::var_os("XDG_DATA_HOME");
std::env::set_var("XDG_DATA_HOME", temp.path());
let base_dir = temp.path().join("opencode");
std::fs::create_dir_all(&base_dir).expect("create base dir");
let db_path = base_dir.join("opencode.db");
let conn = Connection::open(&db_path).expect("open sqlite db");
create_sqlite_schema(&conn);
conn.execute(
"INSERT INTO session (id, title, directory, time_created, time_updated) VALUES (?1, ?2, ?3, ?4, ?5)",
("ses_1", "", "/tmp/project-a", 1_771_061_953_033_i64, 1_771_061_954_033_i64),
)
.expect("insert session 1");
conn.execute(
"INSERT INTO session (id, title, directory, time_created, time_updated) VALUES (?1, ?2, ?3, ?4, ?5)",
("ses_2", "Named Session", "/tmp/project-b", 1_771_061_950_000_i64, 1_771_061_955_000_i64),
)
.expect("insert session 2");
drop(conn);
let sessions = scan_sessions_sqlite();
#[allow(deprecated)]
if let Some(value) = original_xdg {
std::env::set_var("XDG_DATA_HOME", value);
} else {
std::env::remove_var("XDG_DATA_HOME");
}
assert_eq!(sessions.len(), 2);
assert_eq!(sessions[0].session_id, "ses_2");
assert_eq!(sessions[0].title.as_deref(), Some("Named Session"));
assert_eq!(sessions[1].session_id, "ses_1");
assert_eq!(sessions[1].title.as_deref(), Some("project-a"));
assert_eq!(sessions[1].project_dir.as_deref(), Some("/tmp/project-a"));
let expected_source = format!("sqlite:{}:ses_1", db_path.display());
assert_eq!(
sessions[1].source_path.as_deref(),
Some(expected_source.as_str())
);
}
#[test]
fn load_messages_sqlite_reads_messages_and_parts() {
let temp = tempdir().expect("tempdir");
let db_path = temp.path().join("opencode.db");
let conn = Connection::open(&db_path).expect("open sqlite db");
create_sqlite_schema(&conn);
conn.execute(
"INSERT INTO session (id, title, directory, time_created, time_updated) VALUES (?1, ?2, ?3, ?4, ?5)",
("ses_1", "Session", "/tmp/project-a", 1000_i64, 3000_i64),
)
.expect("insert session");
conn.execute(
"INSERT INTO message (id, session_id, time_created, data) VALUES (?1, ?2, ?3, ?4)",
("msg_1", "ses_1", 1000_i64, r#"{"role":"user"}"#),
)
.expect("insert message 1");
conn.execute(
"INSERT INTO message (id, session_id, time_created, data) VALUES (?1, ?2, ?3, ?4)",
("msg_2", "ses_1", 2000_i64, r#"{"role":"assistant"}"#),
)
.expect("insert message 2");
conn.execute(
"INSERT INTO part (id, session_id, message_id, time_created, data) VALUES (?1, ?2, ?3, ?4, ?5)",
("prt_1", "ses_1", "msg_1", 1000_i64, r#"{"type":"text","text":"Hello"}"#),
)
.expect("insert part 1");
conn.execute(
"INSERT INTO part (id, session_id, message_id, time_created, data) VALUES (?1, ?2, ?3, ?4, ?5)",
(
"prt_2",
"ses_1",
"msg_2",
2000_i64,
r#"{"type":"tool","tool":"bash"}"#,
),
)
.expect("insert part 2");
conn.execute(
"INSERT INTO part (id, session_id, message_id, time_created, data) VALUES (?1, ?2, ?3, ?4, ?5)",
(
"prt_3",
"ses_1",
"msg_2",
2001_i64,
r#"{"type":"text","text":"Done"}"#,
),
)
.expect("insert part 3");
drop(conn);
let source = format!("sqlite:{}:ses_1", db_path.display());
let messages = load_messages_sqlite(&source).expect("load sqlite messages");
assert_eq!(messages.len(), 2);
assert_eq!(messages[0].role, "user");
assert_eq!(messages[0].content, "Hello");
assert_eq!(messages[0].ts, Some(1000));
assert_eq!(messages[1].role, "assistant");
assert_eq!(messages[1].content, "[Tool: bash]\nDone");
assert_eq!(messages[1].ts, Some(2000));
}
#[test]
fn delete_session_sqlite_removes_session() {
let _guard = opencode_env_lock().lock().expect("lock");
let temp = tempdir().expect("tempdir");
let original_xdg = std::env::var_os("XDG_DATA_HOME");
#[allow(deprecated)]
std::env::set_var("XDG_DATA_HOME", temp.path());
let base_dir = temp.path().join("opencode");
std::fs::create_dir_all(&base_dir).expect("create base dir");
let db_path = base_dir.join("opencode.db");
let conn = Connection::open(&db_path).expect("open sqlite db");
create_sqlite_schema(&conn);
conn.execute(
"INSERT INTO session (id, title, directory, time_created, time_updated) VALUES (?1, ?2, ?3, ?4, ?5)",
("ses_1", "Session", "/tmp/project-a", 1000_i64, 3000_i64),
)
.expect("insert session");
conn.execute(
"INSERT INTO message (id, session_id, time_created, data) VALUES (?1, ?2, ?3, ?4)",
("msg_1", "ses_1", 1000_i64, r#"{"role":"user"}"#),
)
.expect("insert message");
conn.execute(
"INSERT INTO part (id, session_id, message_id, time_created, data) VALUES (?1, ?2, ?3, ?4, ?5)",
("prt_1", "ses_1", "msg_1", 1000_i64, r#"{"type":"text","text":"Hello"}"#),
)
.expect("insert part");
drop(conn);
let source = format!("sqlite:{}:ses_1", db_path.display());
let deleted = delete_session_sqlite("ses_1", &source).expect("delete sqlite session");
assert!(deleted);
let conn = Connection::open(&db_path).expect("re-open sqlite db");
let remaining_sessions: i64 = conn
.query_row(
"SELECT COUNT(*) FROM session WHERE id = 'ses_1'",
[],
|row| row.get(0),
)
.expect("count sessions");
let remaining_messages: i64 = conn
.query_row(
"SELECT COUNT(*) FROM message WHERE session_id = 'ses_1'",
[],
|row| row.get(0),
)
.expect("count messages");
let remaining_parts: i64 = conn
.query_row(
"SELECT COUNT(*) FROM part WHERE session_id = 'ses_1'",
[],
|row| row.get(0),
)
.expect("count parts");
assert_eq!(remaining_sessions, 0);
assert_eq!(remaining_messages, 0);
assert_eq!(remaining_parts, 0);
#[allow(deprecated)]
if let Some(value) = original_xdg {
std::env::set_var("XDG_DATA_HOME", value);
} else {
std::env::remove_var("XDG_DATA_HOME");
}
}
#[test]
fn delete_session_sqlite_rejects_foreign_db_path() {
let _guard = opencode_env_lock().lock().expect("lock");
let temp = tempdir().expect("tempdir");
let original_xdg = std::env::var_os("XDG_DATA_HOME");
#[allow(deprecated)]
std::env::set_var("XDG_DATA_HOME", temp.path());
let expected_base_dir = temp.path().join("opencode");
std::fs::create_dir_all(&expected_base_dir).expect("create expected base dir");
let expected_db_path = expected_base_dir.join("opencode.db");
Connection::open(&expected_db_path).expect("create expected sqlite db");
let db_path = temp.path().join("foreign.db");
let conn = Connection::open(&db_path).expect("open sqlite db");
create_sqlite_schema(&conn);
conn.execute(
"INSERT INTO session (id, title, directory, time_created, time_updated) VALUES (?1, ?2, ?3, ?4, ?5)",
("ses_1", "Session", "/tmp/project", 1000_i64, 3000_i64),
)
.expect("insert session");
drop(conn);
let source = format!("sqlite:{}:ses_1", db_path.display());
let err = delete_session_sqlite("ses_1", &source).expect_err("should reject foreign db");
assert!(err.contains("expected OpenCode database"));
#[allow(deprecated)]
if let Some(value) = original_xdg {
std::env::set_var("XDG_DATA_HOME", value);
} else {
std::env::remove_var("XDG_DATA_HOME");
}
}
}
@@ -46,6 +46,15 @@ pub fn read_head_tail_lines(
}
pub fn parse_timestamp_to_ms(value: &Value) -> Option<i64> {
// Integer: milliseconds (>1e12) or seconds
if let Some(n) = value.as_i64() {
return Some(if n > 1_000_000_000_000 { n } else { n * 1000 });
}
if let Some(n) = value.as_f64() {
let n = n as i64;
return Some(if n > 1_000_000_000_000 { n } else { n * 1000 });
}
// RFC3339 string
let raw = value.as_str()?;
DateTime::parse_from_rfc3339(raw)
.ok()
@@ -71,6 +80,28 @@ pub fn extract_text(content: &Value) -> String {
}
fn extract_text_from_item(item: &Value) -> Option<String> {
let item_type = item.get("type").and_then(Value::as_str).unwrap_or("");
// tool_use: show tool name
if item_type == "tool_use" {
let name = item
.get("name")
.and_then(Value::as_str)
.unwrap_or("unknown");
return Some(format!("[Tool: {name}]"));
}
// tool_result: extract nested content
if item_type == "tool_result" {
if let Some(content) = item.get("content") {
let text = extract_text(content);
if !text.is_empty() {
return Some(text);
}
}
return None;
}
if let Some(text) = item.get("text").and_then(|v| v.as_str()) {
return Some(text.to_string());
}
@@ -119,3 +150,25 @@ pub fn path_basename(value: &str) -> Option<String> {
.filter(|segment| !segment.is_empty())?;
Some(last.to_string())
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn parse_timestamp_to_ms_supports_integers_and_rfc3339() {
assert_eq!(
parse_timestamp_to_ms(&json!(1_771_061_953_033_i64)),
Some(1_771_061_953_033)
);
assert_eq!(
parse_timestamp_to_ms(&json!(1_771_061_953_i64)),
Some(1_771_061_953_000)
);
assert_eq!(
parse_timestamp_to_ms(&json!("1970-01-01T00:00:01Z")),
Some(1_000)
);
}
}