From f061b777b7b530eaf81dda301c729f3fd4f4f299 Mon Sep 17 00:00:00 2001 From: Jason Date: Wed, 29 Apr 2026 22:38:29 +0800 Subject: [PATCH] feat(usage): add Hermes Agent tracking + fix zero-cost bug + perf Hermes: - Parse ~/.hermes/state.db sessions (incl. profiles/*/state.db) into proxy_request_logs with data_source='hermes_session', WAL-aware incremental sync, Hermes-reported cost preferred over model_pricing fallback Zero-cost bug (dashboard showed \$0 totals): - GPT-5.5 family default pricing (~83% of affected rows used GPT-5.5) - find_model_pricing_row: ASCII-lowercase normalization so "OpenAI/GPT-5.5@HIGH" matches seeded "gpt-5.5" - Startup cost backfill in async task: scan rows where total_cost <= 0 but tokens > 0, recompute via model_pricing in a single transaction Performance: - Add (app_type, created_at DESC) covering index for dashboard range queries - Add expression index on COALESCE(data_source, 'proxy') so dedup EXISTS subqueries use index lookup instead of full scan; drop superseded idx_request_logs_dedup_lookup Refactor: - row_to_request_log_detail helper (3-way de-dup; fixes cost_multiplier \"1\" vs \"1.0\" drift between callers) - Promote get_sync_state/update_sync_state to shared session_usage module (4 copies -> 1) - run_step helper in lib.rs replaces 9 if-let-Err blocks - maybe_backfill_log_costs returns bool to skip duplicate total_cost parsing in caller --- src-tauri/src/commands/usage.rs | 13 + src-tauri/src/database/schema.rs | 39 +- src-tauri/src/lib.rs | 86 +- src-tauri/src/services/mod.rs | 1 + src-tauri/src/services/session_usage.rs | 12 +- src-tauri/src/services/session_usage_codex.rs | 35 +- .../src/services/session_usage_gemini.rs | 35 +- .../src/services/session_usage_hermes.rs | 892 ++++++++++++++++++ src-tauri/src/services/usage_stats.rs | 315 +++++-- src/components/usage/DataSourceBar.tsx | 1 + src/components/usage/RequestLogTable.tsx | 1 + src/components/usage/UsageDashboard.tsx | 1 + src/i18n/locales/en.json | 6 +- src/i18n/locales/ja.json | 6 +- src/i18n/locales/zh.json | 6 +- src/types/usage.ts | 2 +- 16 files changed, 1248 insertions(+), 203 deletions(-) create mode 100644 src-tauri/src/services/session_usage_hermes.rs diff --git a/src-tauri/src/commands/usage.rs b/src-tauri/src/commands/usage.rs index 8c9047310..6e46acf2d 100644 --- a/src-tauri/src/commands/usage.rs +++ b/src-tauri/src/commands/usage.rs @@ -220,6 +220,19 @@ pub fn sync_session_usage( } } + // 同步 Hermes 使用数据 + match crate::services::session_usage_hermes::sync_hermes_usage(&state.db) { + Ok(hermes_result) => { + result.imported += hermes_result.imported; + result.skipped += hermes_result.skipped; + result.files_scanned += hermes_result.files_scanned; + result.errors.extend(hermes_result.errors); + } + Err(e) => { + result.errors.push(format!("Hermes 同步失败: {e}")); + } + } + Ok(result) } diff --git a/src-tauri/src/database/schema.rs b/src-tauri/src/database/schema.rs index 208f79413..d28ccdca5 100644 --- a/src-tauri/src/database/schema.rs +++ b/src-tauri/src/database/schema.rs @@ -214,7 +214,7 @@ impl Database { [], ) .map_err(|e| AppError::Database(e.to_string()))?; - Self::create_request_logs_dedup_index_if_supported(conn)?; + Self::create_request_logs_usage_indexes_if_supported(conn)?; // 11. Model Pricing 表 conn.execute( @@ -1108,7 +1108,7 @@ impl Database { "data_source", "TEXT NOT NULL DEFAULT 'proxy'", )?; - Self::create_request_logs_dedup_index_if_supported(conn)?; + Self::create_request_logs_usage_indexes_if_supported(conn)?; } // 2. 创建会话日志同步状态表 @@ -1298,6 +1298,13 @@ impl Database { "0.30", "3.75", ), + // GPT-5.5 系列 + ("gpt-5.5", "GPT-5.5", "5", "30", "0.50", "0"), + ("gpt-5.5-low", "GPT-5.5", "5", "30", "0.50", "0"), + ("gpt-5.5-medium", "GPT-5.5", "5", "30", "0.50", "0"), + ("gpt-5.5-high", "GPT-5.5", "5", "30", "0.50", "0"), + ("gpt-5.5-xhigh", "GPT-5.5", "5", "30", "0.50", "0"), + ("gpt-5.5-minimal", "GPT-5.5", "5", "30", "0.50", "0"), // GPT-5.4 系列 ("gpt-5.4", "GPT-5.4", "2.50", "15", "0.25", "0"), ("gpt-5.4-mini", "GPT-5.4 Mini", "0.75", "4.50", "0.075", "0"), @@ -1910,11 +1917,22 @@ impl Database { Ok(()) } - fn create_request_logs_dedup_index_if_supported(conn: &Connection) -> Result<(), AppError> { + fn create_request_logs_usage_indexes_if_supported(conn: &Connection) -> Result<(), AppError> { if !Self::table_exists(conn, "proxy_request_logs")? { return Ok(()); } + let has_app_type = Self::has_column(conn, "proxy_request_logs", "app_type")?; + let has_created_at = Self::has_column(conn, "proxy_request_logs", "created_at")?; + if has_app_type && has_created_at { + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_request_logs_app_created_at + ON proxy_request_logs(app_type, created_at DESC)", + [], + ) + .map_err(|e| AppError::Database(format!("创建使用量应用时间索引失败: {e}")))?; + } + let required_columns = [ "app_type", "data_source", @@ -1930,13 +1948,20 @@ impl Database { } } + conn.execute("DROP INDEX IF EXISTS idx_request_logs_dedup_lookup", []) + .map_err(|e| AppError::Database(format!("删除旧使用量去重索引失败: {e}")))?; + + // 查询层为了兼容历史 NULL data_source 行,会使用 + // COALESCE(data_source, 'proxy')。普通 data_source 索引无法匹配该表达式, + // 会让跨源去重子查询退化成大量扫描;表达式索引让 SQLite 能按同一表达式查找。 conn.execute( - "CREATE INDEX IF NOT EXISTS idx_request_logs_dedup_lookup - ON proxy_request_logs(app_type, data_source, input_tokens, output_tokens, - cache_read_tokens, created_at, cache_creation_tokens)", + "CREATE INDEX IF NOT EXISTS idx_request_logs_dedup_lookup_expr + ON proxy_request_logs(app_type, COALESCE(data_source, 'proxy'), input_tokens, + output_tokens, cache_read_tokens, created_at, + cache_creation_tokens)", [], ) - .map_err(|e| AppError::Database(format!("创建使用量去重索引失败: {e}")))?; + .map_err(|e| AppError::Database(format!("创建使用量去重表达式索引失败: {e}")))?; Ok(()) } diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 9e7b937ce..d7e7498ba 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -924,28 +924,35 @@ pub fn run() { tauri::async_runtime::spawn(async move { const SESSION_SYNC_INTERVAL_SECS: u64 = 60; + fn run_step(name: &str, result: Result) { + if let Err(e) = result { + log::warn!("{name} failed: {e}"); + } + } + + let db = &db_for_session_sync; + // 首次同步 - if let Err(e) = - crate::services::session_usage::sync_claude_session_logs( - &db_for_session_sync, - ) - { - log::warn!("Session usage initial sync failed: {e}"); - } - if let Err(e) = - crate::services::session_usage_codex::sync_codex_usage( - &db_for_session_sync, - ) - { - log::warn!("Codex usage initial sync failed: {e}"); - } - if let Err(e) = - crate::services::session_usage_gemini::sync_gemini_usage( - &db_for_session_sync, - ) - { - log::warn!("Gemini usage initial sync failed: {e}"); - } + run_step( + "Usage cost startup backfill", + db.backfill_missing_usage_costs(), + ); + run_step( + "Session usage initial sync", + crate::services::session_usage::sync_claude_session_logs(db), + ); + run_step( + "Codex usage initial sync", + crate::services::session_usage_codex::sync_codex_usage(db), + ); + run_step( + "Gemini usage initial sync", + crate::services::session_usage_gemini::sync_gemini_usage(db), + ); + run_step( + "Hermes usage initial sync", + crate::services::session_usage_hermes::sync_hermes_usage(db), + ); // 定期同步 let mut interval = tokio::time::interval(std::time::Duration::from_secs( @@ -954,27 +961,22 @@ pub fn run() { interval.tick().await; // skip immediate first tick loop { interval.tick().await; - if let Err(e) = - crate::services::session_usage::sync_claude_session_logs( - &db_for_session_sync, - ) - { - log::warn!("Session usage periodic sync failed: {e}"); - } - if let Err(e) = - crate::services::session_usage_codex::sync_codex_usage( - &db_for_session_sync, - ) - { - log::warn!("Codex usage periodic sync failed: {e}"); - } - if let Err(e) = - crate::services::session_usage_gemini::sync_gemini_usage( - &db_for_session_sync, - ) - { - log::warn!("Gemini usage periodic sync failed: {e}"); - } + run_step( + "Session usage periodic sync", + crate::services::session_usage::sync_claude_session_logs(db), + ); + run_step( + "Codex usage periodic sync", + crate::services::session_usage_codex::sync_codex_usage(db), + ); + run_step( + "Gemini usage periodic sync", + crate::services::session_usage_gemini::sync_gemini_usage(db), + ); + run_step( + "Hermes usage periodic sync", + crate::services::session_usage_hermes::sync_hermes_usage(db), + ); } }); }); diff --git a/src-tauri/src/services/mod.rs b/src-tauri/src/services/mod.rs index d9fb5077c..20c13302f 100644 --- a/src-tauri/src/services/mod.rs +++ b/src-tauri/src/services/mod.rs @@ -12,6 +12,7 @@ pub mod proxy; pub mod session_usage; pub mod session_usage_codex; pub mod session_usage_gemini; +pub mod session_usage_hermes; pub mod skill; pub mod speedtest; pub mod stream_check; diff --git a/src-tauri/src/services/session_usage.rs b/src-tauri/src/services/session_usage.rs index 902b78cbc..5f9f0f618 100644 --- a/src-tauri/src/services/session_usage.rs +++ b/src-tauri/src/services/session_usage.rs @@ -308,8 +308,10 @@ fn sync_single_file(db: &Database, file_path: &Path) -> Result<(u32, u32), AppEr Ok((imported, skipped)) } -/// 获取文件的同步状态 -fn get_sync_state(db: &Database, file_path: &str) -> Result<(i64, i64), AppError> { +/// 获取 session_log_sync 表中某条目的同步进度。 +/// +/// Shared by all session_usage_* parsers. +pub(crate) fn get_sync_state(db: &Database, file_path: &str) -> Result<(i64, i64), AppError> { let conn = lock_conn!(db.conn); let result = conn.query_row( "SELECT last_modified, last_line_offset FROM session_log_sync WHERE file_path = ?1", @@ -319,8 +321,10 @@ fn get_sync_state(db: &Database, file_path: &str) -> Result<(i64, i64), AppError Ok(result.unwrap_or((0, 0))) } -/// 更新文件的同步状态 -fn update_sync_state( +/// 更新 session_log_sync 表中某条目的同步进度。 +/// +/// Shared by all session_usage_* parsers. +pub(crate) fn update_sync_state( db: &Database, file_path: &str, last_modified: i64, diff --git a/src-tauri/src/services/session_usage_codex.rs b/src-tauri/src/services/session_usage_codex.rs index b666634ff..ac4f81c09 100644 --- a/src-tauri/src/services/session_usage_codex.rs +++ b/src-tauri/src/services/session_usage_codex.rs @@ -18,7 +18,7 @@ use crate::database::{lock_conn, Database}; use crate::error::AppError; use crate::proxy::usage::calculator::{CostCalculator, ModelPricing}; use crate::proxy::usage::parser::TokenUsage; -use crate::services::session_usage::SessionSyncResult; +use crate::services::session_usage::{get_sync_state, update_sync_state, SessionSyncResult}; use crate::services::usage_stats::{should_skip_session_insert, DedupKey}; use rust_decimal::Decimal; use std::fs; @@ -538,39 +538,6 @@ fn insert_codex_session_entry( Ok(true) } -/// 获取文件的同步状态 -fn get_sync_state(db: &Database, file_path: &str) -> Result<(i64, i64), AppError> { - let conn = lock_conn!(db.conn); - let result = conn.query_row( - "SELECT last_modified, last_line_offset FROM session_log_sync WHERE file_path = ?1", - rusqlite::params![file_path], - |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)), - ); - Ok(result.unwrap_or((0, 0))) -} - -/// 更新文件的同步状态 -fn update_sync_state( - db: &Database, - file_path: &str, - last_modified: i64, - last_offset: i64, -) -> Result<(), AppError> { - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .map(|d| d.as_secs() as i64) - .unwrap_or(0); - - let conn = lock_conn!(db.conn); - conn.execute( - "INSERT OR REPLACE INTO session_log_sync (file_path, last_modified, last_line_offset, last_synced_at) - VALUES (?1, ?2, ?3, ?4)", - rusqlite::params![file_path, last_modified, last_offset, now], - ) - .map_err(|e| AppError::Database(format!("更新同步状态失败: {e}")))?; - Ok(()) -} - /// ��找 Codex 模型定价(带归一化) fn find_codex_pricing(conn: &rusqlite::Connection, model_id: &str) -> Option { let normalized = normalize_codex_model(model_id); diff --git a/src-tauri/src/services/session_usage_gemini.rs b/src-tauri/src/services/session_usage_gemini.rs index e0049f91b..ef68d20da 100644 --- a/src-tauri/src/services/session_usage_gemini.rs +++ b/src-tauri/src/services/session_usage_gemini.rs @@ -18,7 +18,7 @@ use crate::error::AppError; use crate::gemini_config::get_gemini_dir; use crate::proxy::usage::calculator::{CostCalculator, ModelPricing}; use crate::proxy::usage::parser::TokenUsage; -use crate::services::session_usage::SessionSyncResult; +use crate::services::session_usage::{get_sync_state, update_sync_state, SessionSyncResult}; use crate::services::usage_stats::{should_skip_session_insert, DedupKey}; use rust_decimal::Decimal; use std::fs; @@ -356,39 +356,6 @@ fn insert_gemini_session_entry( Ok(conn.changes() > 0) } -/// 获取文件的同步状态 -fn get_sync_state(db: &Database, file_path: &str) -> Result<(i64, i64), AppError> { - let conn = lock_conn!(db.conn); - let result = conn.query_row( - "SELECT last_modified, last_line_offset FROM session_log_sync WHERE file_path = ?1", - rusqlite::params![file_path], - |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)), - ); - Ok(result.unwrap_or((0, 0))) -} - -/// 更新文件的同步状态 -fn update_sync_state( - db: &Database, - file_path: &str, - last_modified: i64, - last_offset: i64, -) -> Result<(), AppError> { - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .map(|d| d.as_secs() as i64) - .unwrap_or(0); - - let conn = lock_conn!(db.conn); - conn.execute( - "INSERT OR REPLACE INTO session_log_sync (file_path, last_modified, last_line_offset, last_synced_at) - VALUES (?1, ?2, ?3, ?4)", - rusqlite::params![file_path, last_modified, last_offset, now], - ) - .map_err(|e| AppError::Database(format!("更新同步状态失败: {e}")))?; - Ok(()) -} - /// 查找 Gemini 模型定价 fn find_gemini_pricing(conn: &rusqlite::Connection, model_id: &str) -> Option { // 精确匹配 diff --git a/src-tauri/src/services/session_usage_hermes.rs b/src-tauri/src/services/session_usage_hermes.rs new file mode 100644 index 000000000..cec315c97 --- /dev/null +++ b/src-tauri/src/services/session_usage_hermes.rs @@ -0,0 +1,892 @@ +//! Hermes Agent session usage tracking. +//! +//! Reads token and cost totals from Hermes' local SQLite state database and +//! imports one usage row per session into `proxy_request_logs`. + +use crate::database::{lock_conn, Database}; +use crate::error::AppError; +use crate::hermes_config::get_hermes_dir; +use crate::proxy::usage::calculator::{CostCalculator, ModelPricing}; +use crate::proxy::usage::parser::TokenUsage; +use crate::services::session_usage::{get_sync_state, update_sync_state, SessionSyncResult}; +use crate::services::usage_stats::{has_matching_proxy_usage_log, DedupKey}; +use rusqlite::{Connection, OpenFlags}; +use rust_decimal::Decimal; +use std::collections::HashSet; +use std::fs; +use std::path::{Path, PathBuf}; +use std::time::SystemTime; + +const HERMES_APP_TYPE: &str = "hermes"; +const HERMES_PROVIDER_ID: &str = "_hermes_session"; +const HERMES_DATA_SOURCE: &str = "hermes_session"; +const HERMES_SYNC_PREFIX: &str = "sqlite:"; +const WATERMARK_OVERLAP_MS: i64 = 300_000; +const ACTIVE_SESSION_LOOKBACK_MS: i64 = 24 * 60 * 60 * 1000; + +#[derive(Debug)] +struct HermesSessionUsage { + session_id: String, + model: String, + timestamp_ms: i64, + input_tokens: u32, + output_tokens: u32, + cache_read_tokens: u32, + cache_creation_tokens: u32, + total_cost_usd: Option, +} + +/// Sync Hermes usage data from state.db and profile state databases. +pub fn sync_hermes_usage(db: &Database) -> Result { + let hermes_dir = get_hermes_dir(); + let db_paths = collect_hermes_db_paths(&hermes_dir); + + let mut result = SessionSyncResult { + imported: 0, + skipped: 0, + files_scanned: db_paths.len() as u32, + errors: vec![], + }; + + for db_path in &db_paths { + match sync_single_hermes_db(db, db_path) { + Ok((imported, skipped)) => { + result.imported += imported; + result.skipped += skipped; + } + Err(e) => { + let msg = format!("Hermes 数据库同步失败 {}: {e}", db_path.display()); + log::warn!("[HERMES-SYNC] {msg}"); + result.errors.push(msg); + } + } + } + + if result.imported > 0 { + log::info!( + "[HERMES-SYNC] 同步完成: 导入 {} 条, 跳过 {} 条, 扫描 {} 个数据库", + result.imported, + result.skipped, + result.files_scanned + ); + } + + Ok(result) +} + +fn collect_hermes_db_paths(hermes_dir: &Path) -> Vec { + let mut paths = Vec::new(); + + let root_db = hermes_dir.join("state.db"); + if root_db.is_file() { + paths.push(root_db); + } + + let profiles_dir = hermes_dir.join("profiles"); + if let Ok(entries) = fs::read_dir(&profiles_dir) { + for entry in entries.flatten() { + let profile_db = entry.path().join("state.db"); + if profile_db.is_file() { + paths.push(profile_db); + } + } + } + + paths +} + +fn sync_single_hermes_db(db: &Database, db_path: &Path) -> Result<(u32, u32), AppError> { + let sync_key = hermes_sync_key(db_path); + let file_modified = sqlite_db_modified_ms(db_path); + let (last_modified, last_watermark) = get_sync_state(db, &sync_key)?; + + if file_modified <= last_modified { + return Ok((0, 0)); + } + + let conn = Connection::open_with_flags( + db_path, + OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX, + ) + .map_err(|e| AppError::Database(format!("打开 Hermes 数据库失败: {e}")))?; + + ensure_sessions_table(&conn)?; + let columns = get_table_columns(&conn, "sessions")?; + validate_hermes_columns(&columns)?; + + let threshold = last_watermark.saturating_sub(WATERMARK_OVERLAP_MS); + let sessions = load_hermes_sessions(&conn, &columns, threshold)?; + + let mut imported = 0; + let mut skipped = 0; + let mut max_watermark = last_watermark; + + for session in &sessions { + max_watermark = max_watermark.max(session.timestamp_ms); + match insert_hermes_session_entry(db, session) { + Ok(true) => imported += 1, + Ok(false) => skipped += 1, + Err(e) => { + log::warn!("[HERMES-SYNC] 插入失败 ({}): {e}", session.session_id); + skipped += 1; + } + } + } + + update_sync_state(db, &sync_key, file_modified, max_watermark)?; + + Ok((imported, skipped)) +} + +fn hermes_sync_key(db_path: &Path) -> String { + let absolute = db_path + .canonicalize() + .unwrap_or_else(|_| db_path.to_path_buf()); + format!("{HERMES_SYNC_PREFIX}{}", absolute.display()) +} + +fn sqlite_db_modified_ms(db_path: &Path) -> i64 { + [ + db_path.to_path_buf(), + sqlite_sidecar_path(db_path, "-wal"), + sqlite_sidecar_path(db_path, "-shm"), + ] + .iter() + .filter_map(|path| fs::metadata(path).ok()) + .filter_map(|metadata| metadata.modified().ok()) + .filter_map(system_time_to_ms) + .max() + .unwrap_or(0) +} + +fn sqlite_sidecar_path(db_path: &Path, suffix: &str) -> PathBuf { + let mut raw = db_path.as_os_str().to_os_string(); + raw.push(suffix); + PathBuf::from(raw) +} + +fn system_time_to_ms(time: SystemTime) -> Option { + time.duration_since(SystemTime::UNIX_EPOCH) + .ok() + .map(|duration| duration.as_millis() as i64) +} + +fn ensure_sessions_table(conn: &Connection) -> Result<(), AppError> { + let has_sessions = conn + .query_row( + "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='sessions'", + [], + |row| row.get::<_, bool>(0), + ) + .unwrap_or(false); + + if has_sessions { + Ok(()) + } else { + Err(AppError::Database( + "Hermes 数据库缺少 sessions 表".to_string(), + )) + } +} + +fn get_table_columns(conn: &Connection, table: &str) -> Result, AppError> { + let mut stmt = conn + .prepare(&format!("PRAGMA table_info({table})")) + .map_err(|e| AppError::Database(format!("读取 Hermes 表结构失败: {e}")))?; + + let rows = stmt + .query_map([], |row| row.get::<_, String>(1)) + .map_err(|e| AppError::Database(format!("读取 Hermes 字段失败: {e}")))?; + + let mut columns = HashSet::new(); + for row in rows { + columns.insert(row.map_err(|e| AppError::Database(e.to_string()))?); + } + Ok(columns) +} + +fn validate_hermes_columns(columns: &HashSet) -> Result<(), AppError> { + if !columns.contains("id") { + return Err(AppError::Database( + "Hermes sessions 表缺少 id 字段".to_string(), + )); + } + + if !columns.contains("started_at") && !columns.contains("ended_at") { + return Err(AppError::Database( + "Hermes sessions 表缺少 started_at/ended_at 字段".to_string(), + )); + } + + Ok(()) +} + +fn load_hermes_sessions( + conn: &Connection, + columns: &HashSet, + threshold_ms: i64, +) -> Result, AppError> { + let timestamp_expr = timestamp_expr(columns); + let timestamp_ms_expr = + format!("CAST(ROUND(CAST(({timestamp_expr}) AS REAL) * 1000.0) AS INTEGER)"); + let active_session_clause = if columns.contains("ended_at") && columns.contains("started_at") { + format!( + " OR (ended_at IS NULL AND {timestamp_ms_expr} > ?1 - {ACTIVE_SESSION_LOOKBACK_MS})" + ) + } else { + String::new() + }; + + let sql = format!( + "SELECT + CAST(id AS TEXT) AS id, + {model_expr} AS model, + {timestamp_ms_expr} AS timestamp_ms, + {input_expr} AS input_tokens, + {output_expr} AS output_tokens, + {reasoning_expr} AS reasoning_tokens, + {cache_read_expr} AS cache_read_tokens, + {cache_write_expr} AS cache_write_tokens, + {actual_cost_expr} AS actual_cost_usd, + {estimated_cost_expr} AS estimated_cost_usd + FROM sessions + WHERE ({timestamp_ms_expr} > ?1{active_session_clause}) + ORDER BY {timestamp_ms_expr} ASC", + model_expr = text_expr(columns, "model", "unknown"), + input_expr = int_expr(columns, "input_tokens"), + output_expr = int_expr(columns, "output_tokens"), + reasoning_expr = int_expr(columns, "reasoning_tokens"), + cache_read_expr = int_expr(columns, "cache_read_tokens"), + cache_write_expr = int_expr(columns, "cache_write_tokens"), + actual_cost_expr = cost_expr(columns, "actual_cost_usd"), + estimated_cost_expr = cost_expr(columns, "estimated_cost_usd"), + ); + + let mut stmt = conn + .prepare(&sql) + .map_err(|e| AppError::Database(format!("准备 Hermes 查询失败: {e}")))?; + + let rows = stmt + .query_map([threshold_ms], |row| { + let output_tokens = get_i64(row, 4).saturating_add(get_i64(row, 5)); + let actual_cost: Option = row.get(8)?; + let estimated_cost: Option = row.get(9)?; + + Ok(HermesSessionUsage { + session_id: row.get(0)?, + model: row + .get::<_, Option>(1)? + .unwrap_or_else(|| "unknown".to_string()), + timestamp_ms: get_i64(row, 2), + input_tokens: to_u32(get_i64(row, 3)), + output_tokens: to_u32(output_tokens), + cache_read_tokens: to_u32(get_i64(row, 6)), + cache_creation_tokens: to_u32(get_i64(row, 7)), + total_cost_usd: actual_cost + .filter(|s| !s.trim().is_empty()) + .or_else(|| estimated_cost.filter(|s| !s.trim().is_empty())), + }) + }) + .map_err(|e| AppError::Database(format!("查询 Hermes sessions 失败: {e}")))?; + + let mut sessions = Vec::new(); + for row in rows { + let session = row.map_err(|e| AppError::Database(e.to_string()))?; + if session.session_id.trim().is_empty() || session.timestamp_ms <= 0 { + continue; + } + sessions.push(session); + } + + Ok(sessions) +} + +fn timestamp_expr(columns: &HashSet) -> String { + match (columns.contains("ended_at"), columns.contains("started_at")) { + (true, true) => "COALESCE(ended_at, started_at)".to_string(), + (true, false) => "ended_at".to_string(), + (false, true) => "started_at".to_string(), + (false, false) => "NULL".to_string(), + } +} + +fn text_expr(columns: &HashSet, column: &str, default: &str) -> String { + if columns.contains(column) { + format!("COALESCE(NULLIF(TRIM(CAST({column} AS TEXT)), ''), '{default}')") + } else { + format!("'{default}'") + } +} + +fn int_expr(columns: &HashSet, column: &str) -> String { + if columns.contains(column) { + format!("CAST(COALESCE({column}, 0) AS INTEGER)") + } else { + "0".to_string() + } +} + +fn cost_expr(columns: &HashSet, column: &str) -> String { + if columns.contains(column) { + format!("NULLIF(TRIM(CAST({column} AS TEXT)), '')") + } else { + "NULL".to_string() + } +} + +fn get_i64(row: &rusqlite::Row<'_>, index: usize) -> i64 { + row.get::<_, Option>(index).ok().flatten().unwrap_or(0) +} + +fn to_u32(value: i64) -> u32 { + value.max(0).min(u32::MAX as i64) as u32 +} + +fn insert_hermes_session_entry( + db: &Database, + session: &HermesSessionUsage, +) -> Result { + let conn = lock_conn!(db.conn); + let created_at = session.timestamp_ms / 1000; + let request_id = format!("hermes-session-{}", session.session_id); + + let dedup_key = DedupKey { + app_type: HERMES_APP_TYPE, + model: &session.model, + input_tokens: session.input_tokens, + output_tokens: session.output_tokens, + cache_read_tokens: session.cache_read_tokens, + cache_creation_tokens: session.cache_creation_tokens, + created_at, + }; + // Hermes rows use UPSERT so repeated request_ids can refresh estimated cost + // into actual cost. This check only prevents double-counting when a matching + // proxy log already covers the same usage window. + if has_matching_proxy_usage_log(&conn, &dedup_key)? { + return Ok(false); + } + + let (input_cost, output_cost, cache_read_cost, cache_creation_cost, total_cost) = + match &session.total_cost_usd { + Some(total) => ( + "0".to_string(), + "0".to_string(), + "0".to_string(), + "0".to_string(), + total.clone(), + ), + None => calculate_fallback_cost(&conn, session), + }; + + conn.execute( + "INSERT INTO proxy_request_logs ( + request_id, provider_id, app_type, model, request_model, + input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, + input_cost_usd, output_cost_usd, cache_read_cost_usd, cache_creation_cost_usd, total_cost_usd, + latency_ms, first_token_ms, status_code, error_message, session_id, + provider_type, is_streaming, cost_multiplier, created_at, data_source + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24) + ON CONFLICT(request_id) DO UPDATE SET + model = excluded.model, + request_model = excluded.request_model, + input_tokens = excluded.input_tokens, + output_tokens = excluded.output_tokens, + cache_read_tokens = excluded.cache_read_tokens, + cache_creation_tokens = excluded.cache_creation_tokens, + input_cost_usd = excluded.input_cost_usd, + output_cost_usd = excluded.output_cost_usd, + cache_read_cost_usd = excluded.cache_read_cost_usd, + cache_creation_cost_usd = excluded.cache_creation_cost_usd, + total_cost_usd = excluded.total_cost_usd, + created_at = excluded.created_at + WHERE model != excluded.model + OR request_model != excluded.request_model + OR input_tokens != excluded.input_tokens + OR output_tokens != excluded.output_tokens + OR cache_read_tokens != excluded.cache_read_tokens + OR cache_creation_tokens != excluded.cache_creation_tokens + OR input_cost_usd != excluded.input_cost_usd + OR output_cost_usd != excluded.output_cost_usd + OR cache_read_cost_usd != excluded.cache_read_cost_usd + OR cache_creation_cost_usd != excluded.cache_creation_cost_usd + OR total_cost_usd != excluded.total_cost_usd + OR created_at != excluded.created_at", + rusqlite::params![ + request_id, + HERMES_PROVIDER_ID, + HERMES_APP_TYPE, + session.model, + session.model, + session.input_tokens, + session.output_tokens, + session.cache_read_tokens, + session.cache_creation_tokens, + input_cost, + output_cost, + cache_read_cost, + cache_creation_cost, + total_cost, + 0i64, + Option::::None, + 200i64, + Option::::None, + session.session_id, + Some(HERMES_DATA_SOURCE), + 1i64, + "1.0", + created_at, + HERMES_DATA_SOURCE, + ], + ) + .map_err(|e| AppError::Database(format!("插入 Hermes 会话用量失败: {e}")))?; + + Ok(conn.changes() > 0) +} + +fn calculate_fallback_cost( + conn: &Connection, + session: &HermesSessionUsage, +) -> (String, String, String, String, String) { + let usage = TokenUsage { + input_tokens: session.input_tokens, + output_tokens: session.output_tokens, + cache_read_tokens: session.cache_read_tokens, + cache_creation_tokens: session.cache_creation_tokens, + model: Some(session.model.clone()), + message_id: None, + }; + + match find_hermes_pricing(conn, &session.model) { + Some(pricing) => { + let cost = CostCalculator::calculate(&usage, &pricing, Decimal::from(1)); + ( + cost.input_cost.to_string(), + cost.output_cost.to_string(), + cost.cache_read_cost.to_string(), + cost.cache_creation_cost.to_string(), + cost.total_cost.to_string(), + ) + } + None => ( + "0".to_string(), + "0".to_string(), + "0".to_string(), + "0".to_string(), + "0".to_string(), + ), + } +} + +fn find_hermes_pricing(conn: &Connection, model_id: &str) -> Option { + let candidates = pricing_candidates(model_id); + for candidate in &candidates { + if let Some(pricing) = try_find_pricing(conn, candidate) { + return Some(pricing); + } + } + + for candidate in &candidates { + let pattern = format!("{candidate}%"); + if let Ok((input, output, cache_read, cache_creation)) = conn.query_row( + "SELECT input_cost_per_million, output_cost_per_million, + cache_read_cost_per_million, cache_creation_cost_per_million + FROM model_pricing WHERE model_id LIKE ?1 LIMIT 1", + rusqlite::params![pattern], + |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, String>(1)?, + row.get::<_, String>(2)?, + row.get::<_, String>(3)?, + )) + }, + ) { + if let Ok(pricing) = + ModelPricing::from_strings(&input, &output, &cache_read, &cache_creation) + { + return Some(pricing); + } + } + } + + None +} + +fn pricing_candidates(model_id: &str) -> Vec { + let mut candidates = Vec::new(); + let raw = model_id.trim(); + if !raw.is_empty() { + candidates.push(raw.to_string()); + if let Some((_, tail)) = raw.rsplit_once('/') { + if !tail.is_empty() && tail != raw { + candidates.push(tail.to_string()); + } + } + } + candidates +} + +fn try_find_pricing(conn: &Connection, model_id: &str) -> Option { + conn.query_row( + "SELECT input_cost_per_million, output_cost_per_million, + cache_read_cost_per_million, cache_creation_cost_per_million + FROM model_pricing WHERE model_id = ?1", + rusqlite::params![model_id], + |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, String>(1)?, + row.get::<_, String>(2)?, + row.get::<_, String>(3)?, + )) + }, + ) + .ok() + .and_then(|(input, output, cache_read, cache_creation)| { + ModelPricing::from_strings(&input, &output, &cache_read, &cache_creation).ok() + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::services::session_usage::get_data_source_breakdown; + use crate::services::usage_stats::LogFilters; + use std::thread; + use std::time::Duration; + use tempfile::tempdir; + + fn create_hermes_db(path: &Path) -> Connection { + let conn = Connection::open(path).expect("open hermes db"); + conn.execute( + "CREATE TABLE sessions ( + id TEXT PRIMARY KEY, + model TEXT, + started_at REAL, + ended_at REAL, + input_tokens INTEGER, + output_tokens INTEGER, + cache_read_tokens INTEGER, + cache_write_tokens INTEGER, + reasoning_tokens INTEGER, + actual_cost_usd REAL, + estimated_cost_usd REAL + )", + [], + ) + .expect("create sessions"); + conn + } + + fn insert_session( + conn: &Connection, + id: &str, + ended_at: f64, + actual_cost: Option, + estimated_cost: Option, + ) { + conn.execute( + "INSERT INTO sessions ( + id, model, started_at, ended_at, input_tokens, output_tokens, + cache_read_tokens, cache_write_tokens, reasoning_tokens, + actual_cost_usd, estimated_cost_usd + ) VALUES (?1, 'openrouter/anthropic/claude-opus-4-7', ?2, ?3, 10, 20, 3, 4, 7, ?4, ?5)", + rusqlite::params![id, ended_at - 10.0, ended_at, actual_cost, estimated_cost], + ) + .expect("insert session"); + } + + fn insert_active_session(conn: &Connection, id: &str, started_at: f64) { + conn.execute( + "INSERT INTO sessions ( + id, model, started_at, ended_at, input_tokens, output_tokens, + cache_read_tokens, cache_write_tokens, reasoning_tokens, + actual_cost_usd, estimated_cost_usd + ) VALUES (?1, 'openrouter/anthropic/claude-opus-4-7', ?2, NULL, 10, 20, 3, 4, 7, 0.1, NULL)", + rusqlite::params![id, started_at], + ) + .expect("insert active session"); + } + + #[test] + fn test_ensure_sessions_table_errors_when_missing() { + let conn = Connection::open_in_memory().expect("open memory db"); + let err = ensure_sessions_table(&conn).expect_err("missing sessions should error"); + assert!(err.to_string().contains("缺少 sessions 表")); + } + + #[test] + fn test_validate_hermes_columns_rejects_missing_id() { + let columns = HashSet::from(["started_at".to_string()]); + let err = validate_hermes_columns(&columns).expect_err("missing id should error"); + assert!(err.to_string().contains("缺少 id 字段")); + } + + #[test] + fn test_validate_hermes_columns_rejects_missing_timestamp() { + let columns = HashSet::from(["id".to_string()]); + let err = validate_hermes_columns(&columns).expect_err("missing timestamp should error"); + assert!(err.to_string().contains("缺少 started_at/ended_at 字段")); + } + + #[test] + fn test_sync_hermes_session_imports_tokens_and_actual_cost() -> Result<(), AppError> { + let temp = tempdir().expect("tempdir"); + let db_path = temp.path().join("state.db"); + let hermes_conn = create_hermes_db(&db_path); + insert_session(&hermes_conn, "s1", 1000.0, Some(0.123456), Some(0.5)); + drop(hermes_conn); + + let db = Database::memory()?; + let (imported, skipped) = sync_single_hermes_db(&db, &db_path)?; + assert_eq!((imported, skipped), (1, 0)); + + let conn = lock_conn!(db.conn); + let row: (String, String, i64, i64, i64, String, String) = conn.query_row( + "SELECT app_type, data_source, input_tokens, output_tokens, cache_creation_tokens, total_cost_usd, provider_id + FROM proxy_request_logs WHERE request_id = 'hermes-session-s1'", + [], + |row| { + Ok(( + row.get(0)?, + row.get(1)?, + row.get(2)?, + row.get(3)?, + row.get(4)?, + row.get(5)?, + row.get(6)?, + )) + }, + )?; + assert_eq!(row.0, "hermes"); + assert_eq!(row.1, HERMES_DATA_SOURCE); + assert_eq!(row.2, 10); + assert_eq!(row.3, 27); + assert_eq!(row.4, 4); + assert_eq!(row.5, "0.123456"); + assert_eq!(row.6, HERMES_PROVIDER_ID); + Ok(()) + } + + #[test] + fn test_load_hermes_sessions_limits_active_zombie_sessions() -> Result<(), AppError> { + let temp = tempdir().expect("tempdir"); + let db_path = temp.path().join("state.db"); + let hermes_conn = create_hermes_db(&db_path); + insert_active_session(&hermes_conn, "old-active", 1_000.0); + insert_active_session(&hermes_conn, "recent-active", 150_000.0); + + let columns = get_table_columns(&hermes_conn, "sessions")?; + let sessions = load_hermes_sessions(&hermes_conn, &columns, 200_000_000)?; + let ids: Vec<&str> = sessions + .iter() + .map(|session| session.session_id.as_str()) + .collect(); + + assert!(!ids.contains(&"old-active")); + assert!(ids.contains(&"recent-active")); + Ok(()) + } + + #[test] + fn test_sync_state_is_per_database_and_watermark_incremental() -> Result<(), AppError> { + let temp = tempdir().expect("tempdir"); + let db_path = temp.path().join("state.db"); + let hermes_conn = create_hermes_db(&db_path); + insert_session(&hermes_conn, "s1", 1000.0, Some(0.1), None); + drop(hermes_conn); + + let db = Database::memory()?; + let (imported, _) = sync_single_hermes_db(&db, &db_path)?; + assert_eq!(imported, 1); + + let sync_key = hermes_sync_key(&db_path); + { + let conn = lock_conn!(db.conn); + let count: i64 = conn.query_row( + "SELECT COUNT(*) FROM session_log_sync WHERE file_path = ?1", + [sync_key.as_str()], + |row| row.get(0), + )?; + assert_eq!(count, 1); + } + + thread::sleep(Duration::from_millis(5)); + let hermes_conn = Connection::open(&db_path).expect("reopen hermes db"); + insert_session(&hermes_conn, "s2", 1100.0, Some(0.2), None); + drop(hermes_conn); + + let (imported, skipped) = sync_single_hermes_db(&db, &db_path)?; + assert_eq!(imported, 1); + assert!(skipped >= 1); + + let conn = lock_conn!(db.conn); + let count: i64 = conn.query_row("SELECT COUNT(*) FROM session_log_sync", [], |row| { + row.get(0) + })?; + assert_eq!(count, 1); + let max_offset: i64 = conn.query_row( + "SELECT last_line_offset FROM session_log_sync WHERE file_path = ?1", + [sync_key.as_str()], + |row| row.get(0), + )?; + assert_eq!(max_offset, 1_100_000); + Ok(()) + } + + #[test] + fn test_sqlite_mtime_includes_wal_and_shm_sidecars() { + let temp = tempdir().expect("tempdir"); + let db_path = temp.path().join("state.db"); + fs::write(&db_path, b"db").expect("write db"); + let db_mtime = sqlite_db_modified_ms(&db_path); + thread::sleep(Duration::from_millis(20)); + let wal_path = sqlite_sidecar_path(&db_path, "-wal"); + fs::write(&wal_path, b"wal").expect("write wal"); + let with_wal = sqlite_db_modified_ms(&db_path); + assert!(with_wal >= db_mtime); + assert!(with_wal > 0); + } + + #[test] + fn test_fallback_cost_uses_model_pricing_when_hermes_cost_missing() -> Result<(), AppError> { + let temp = tempdir().expect("tempdir"); + let db_path = temp.path().join("state.db"); + let hermes_conn = create_hermes_db(&db_path); + insert_session(&hermes_conn, "s1", 1000.0, None, None); + drop(hermes_conn); + + let db = Database::memory()?; + { + let conn = lock_conn!(db.conn); + conn.execute( + "INSERT OR REPLACE INTO model_pricing ( + model_id, display_name, input_cost_per_million, output_cost_per_million, + cache_read_cost_per_million, cache_creation_cost_per_million + ) VALUES ('claude-opus-4-7', 'Claude Opus 4.7', '10', '20', '1', '5')", + [], + )?; + } + + let (imported, _) = sync_single_hermes_db(&db, &db_path)?; + assert_eq!(imported, 1); + + let conn = lock_conn!(db.conn); + let total: String = conn.query_row( + "SELECT total_cost_usd FROM proxy_request_logs WHERE request_id = 'hermes-session-s1'", + [], + |row| row.get(0), + )?; + assert_ne!(total, "0"); + Ok(()) + } + + #[test] + fn test_estimated_cost_used_when_actual_cost_missing() -> Result<(), AppError> { + let temp = tempdir().expect("tempdir"); + let db_path = temp.path().join("state.db"); + let hermes_conn = create_hermes_db(&db_path); + insert_session(&hermes_conn, "s1", 1000.0, None, Some(0.456789)); + drop(hermes_conn); + + let db = Database::memory()?; + let (imported, _) = sync_single_hermes_db(&db, &db_path)?; + assert_eq!(imported, 1); + + let conn = lock_conn!(db.conn); + let total: String = conn.query_row( + "SELECT total_cost_usd FROM proxy_request_logs WHERE request_id = 'hermes-session-s1'", + [], + |row| row.get(0), + )?; + assert_eq!(total, "0.456789"); + Ok(()) + } + + #[test] + fn test_matching_proxy_log_skips_hermes_session_insert() -> Result<(), AppError> { + let temp = tempdir().expect("tempdir"); + let db_path = temp.path().join("state.db"); + let hermes_conn = create_hermes_db(&db_path); + insert_session(&hermes_conn, "s1", 1000.0, Some(0.1), None); + drop(hermes_conn); + + let db = Database::memory()?; + { + let conn = lock_conn!(db.conn); + conn.execute( + "INSERT INTO proxy_request_logs ( + request_id, provider_id, app_type, model, request_model, + input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, + total_cost_usd, latency_ms, status_code, created_at, data_source + ) VALUES ('proxy-1', 'p1', 'hermes', 'openrouter/anthropic/claude-opus-4-7', + 'openrouter/anthropic/claude-opus-4-7', 10, 27, 3, 4, '0.1', 100, 200, 1000, 'proxy')", + [], + )?; + } + + let (imported, skipped) = sync_single_hermes_db(&db, &db_path)?; + assert_eq!(imported, 0); + assert_eq!(skipped, 1); + + let conn = lock_conn!(db.conn); + let count: i64 = conn.query_row("SELECT COUNT(*) FROM proxy_request_logs", [], |row| { + row.get(0) + })?; + assert_eq!(count, 1); + Ok(()) + } + + #[test] + fn test_hermes_provider_name_data_source_and_app_filter() -> Result<(), AppError> { + let temp = tempdir().expect("tempdir"); + let db_path = temp.path().join("state.db"); + let hermes_conn = create_hermes_db(&db_path); + insert_session(&hermes_conn, "s1", 1000.0, Some(0.1), None); + drop(hermes_conn); + + let db = Database::memory()?; + sync_single_hermes_db(&db, &db_path)?; + + let provider_stats = db.get_provider_stats(None, None, Some("hermes"))?; + assert_eq!(provider_stats.len(), 1); + assert_eq!(provider_stats[0].provider_id, HERMES_PROVIDER_ID); + assert_eq!(provider_stats[0].provider_name, "Hermes (Session)"); + + let sources = get_data_source_breakdown(&db)?; + assert!(sources + .iter() + .any(|source| source.data_source == HERMES_DATA_SOURCE && source.request_count == 1)); + + let logs = db.get_request_logs( + &LogFilters { + app_type: Some("hermes".to_string()), + ..Default::default() + }, + 0, + 20, + )?; + assert_eq!(logs.total, 1); + assert_eq!( + logs.data[0].provider_name.as_deref(), + Some("Hermes (Session)") + ); + Ok(()) + } + + #[test] + fn test_collect_hermes_profile_databases() { + let temp = tempdir().expect("tempdir"); + fs::write(temp.path().join("state.db"), b"root").expect("write root"); + let profile_dir = temp.path().join("profiles").join("work"); + fs::create_dir_all(&profile_dir).expect("profile dir"); + fs::write(profile_dir.join("state.db"), b"profile").expect("write profile"); + + let paths = collect_hermes_db_paths(temp.path()); + assert_eq!(paths.len(), 2); + assert!(paths.iter().any(|path| path.ends_with("state.db"))); + assert!(paths + .iter() + .any(|path| path.to_string_lossy().contains("profiles/work/state.db"))); + } +} diff --git a/src-tauri/src/services/usage_stats.rs b/src-tauri/src/services/usage_stats.rs index 9faef593b..4cc929630 100644 --- a/src-tauri/src/services/usage_stats.rs +++ b/src-tauri/src/services/usage_stats.rs @@ -117,29 +117,84 @@ pub struct RequestLogDetail { pub data_source: Option, } +/// 把 24 列的查询结果映射为 `RequestLogDetail`。 +/// +/// 调用方的 SELECT **必须**按以下顺序返回 24 列: +/// `request_id, provider_id, provider_name, app_type, model, request_model, +/// cost_multiplier, input_tokens, output_tokens, cache_read_tokens, +/// cache_creation_tokens, input_cost_usd, output_cost_usd, cache_read_cost_usd, +/// cache_creation_cost_usd, total_cost_usd, is_streaming, latency_ms, +/// first_token_ms, duration_ms, status_code, error_message, created_at, +/// data_source` +/// +/// 不需要 provider_name 时(如 backfill)SELECT `NULL AS provider_name` 占位即可。 +fn row_to_request_log_detail(row: &rusqlite::Row<'_>) -> rusqlite::Result { + Ok(RequestLogDetail { + request_id: row.get(0)?, + provider_id: row.get(1)?, + provider_name: row.get(2)?, + app_type: row.get(3)?, + model: row.get(4)?, + request_model: row.get(5)?, + cost_multiplier: row + .get::<_, Option>(6)? + .unwrap_or_else(|| "1".to_string()), + input_tokens: row.get::<_, i64>(7)? as u32, + output_tokens: row.get::<_, i64>(8)? as u32, + cache_read_tokens: row.get::<_, i64>(9)? as u32, + cache_creation_tokens: row.get::<_, i64>(10)? as u32, + input_cost_usd: row.get(11)?, + output_cost_usd: row.get(12)?, + cache_read_cost_usd: row.get(13)?, + cache_creation_cost_usd: row.get(14)?, + total_cost_usd: row.get(15)?, + is_streaming: row.get::<_, i64>(16)? != 0, + latency_ms: row.get::<_, i64>(17)? as u64, + first_token_ms: row.get::<_, Option>(18)?.map(|v| v as u64), + duration_ms: row.get::<_, Option>(19)?.map(|v| v as u64), + status_code: row.get::<_, i64>(20)? as u16, + error_message: row.get(21)?, + created_at: row.get(22)?, + data_source: row.get(23)?, + }) +} + /// SQL fragment: resolve provider_name with fallback for session-based entries. -/// Session logs use placeholder provider_ids (_session, _codex_session, _gemini_session) -/// that don't exist in the providers table — this COALESCE gives them readable names. +/// Session logs use placeholder provider_ids (e.g., `_session`, `__session`) +/// that don't exist in the providers table — the CASE expression below is the +/// authoritative mapping from placeholder to readable name. fn provider_name_coalesce(log_alias: &str, provider_alias: &str) -> String { format!( "COALESCE({provider_alias}.name, CASE {log_alias}.provider_id \ WHEN '_session' THEN 'Claude (Session)' \ WHEN '_codex_session' THEN 'Codex (Session)' \ WHEN '_gemini_session' THEN 'Gemini (Session)' \ + WHEN '_hermes_session' THEN 'Hermes (Session)' \ ELSE {log_alias}.provider_id END)" ) } pub(crate) const SESSION_PROXY_DEDUP_WINDOW_SECONDS: i64 = 10 * 60; +/// SQL 片段:把指定别名的 `data_source` 包成 COALESCE,NULL 视作 'proxy'。 +/// +/// 防御 schema v9 之前可能写入的 NULL data_source 行(见 +/// `tests::create_legacy_nullable_logs_table`)。所有用到 data_source 的查询 +/// 都应通过此 helper 生成片段,避免遗漏。 +fn data_source_expr(log_alias: &str) -> String { + format!("COALESCE({log_alias}.data_source, 'proxy')") +} + pub(crate) fn effective_usage_log_filter(log_alias: &str) -> String { + let data_source = data_source_expr(log_alias); + let proxy_data_source = data_source_expr("proxy_dedup"); format!( "NOT ( - {log_alias}.data_source IN ('session_log', 'codex_session', 'gemini_session') + {data_source} IN ('session_log', 'codex_session', 'gemini_session', 'hermes_session') AND EXISTS ( SELECT 1 FROM proxy_request_logs proxy_dedup - WHERE proxy_dedup.data_source = 'proxy' + WHERE {proxy_data_source} = 'proxy' AND proxy_dedup.app_type = {log_alias}.app_type AND proxy_dedup.status_code >= 200 AND proxy_dedup.status_code < 300 @@ -150,7 +205,7 @@ pub(crate) fn effective_usage_log_filter(log_alias: &str) -> String { proxy_dedup.cache_creation_tokens = {log_alias}.cache_creation_tokens OR ( {log_alias}.cache_creation_tokens = 0 - AND {log_alias}.data_source IN ('codex_session', 'gemini_session') + AND {data_source} IN ('codex_session', 'gemini_session') ) ) AND proxy_dedup.created_at BETWEEN @@ -212,11 +267,12 @@ pub(crate) fn has_matching_proxy_usage_log( let allow_missing_cache_creation = matches!(key.app_type, "codex" | "gemini") && key.cache_creation_tokens == 0; - conn.query_row( + let l_data_source = data_source_expr("l"); + let sql = format!( "SELECT EXISTS ( SELECT 1 FROM proxy_request_logs l - WHERE l.data_source = 'proxy' + WHERE {l_data_source} = 'proxy' AND l.app_type = ?1 AND l.status_code >= 200 AND l.status_code < 300 @@ -230,7 +286,11 @@ pub(crate) fn has_matching_proxy_usage_log( OR LOWER(l.model) = 'unknown' OR LOWER(?2) = 'unknown' ) - )", + )" + ); + + conn.query_row( + &sql, params![ key.app_type, key.model, @@ -1043,36 +1103,7 @@ impl Database { let mut stmt = conn.prepare(&sql)?; let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect(); - let rows = stmt.query_map(params_refs.as_slice(), |row| { - Ok(RequestLogDetail { - request_id: row.get(0)?, - provider_id: row.get(1)?, - provider_name: row.get(2)?, - app_type: row.get(3)?, - model: row.get(4)?, - request_model: row.get(5)?, - cost_multiplier: row - .get::<_, Option>(6)? - .unwrap_or_else(|| "1".to_string()), - input_tokens: row.get::<_, i64>(7)? as u32, - output_tokens: row.get::<_, i64>(8)? as u32, - cache_read_tokens: row.get::<_, i64>(9)? as u32, - cache_creation_tokens: row.get::<_, i64>(10)? as u32, - input_cost_usd: row.get(11)?, - output_cost_usd: row.get(12)?, - cache_read_cost_usd: row.get(13)?, - cache_creation_cost_usd: row.get(14)?, - total_cost_usd: row.get(15)?, - is_streaming: row.get::<_, i64>(16)? != 0, - latency_ms: row.get::<_, i64>(17)? as u64, - first_token_ms: row.get::<_, Option>(18)?.map(|v| v as u64), - duration_ms: row.get::<_, Option>(19)?.map(|v| v as u64), - status_code: row.get::<_, i64>(20)? as u16, - error_message: row.get(21)?, - created_at: row.get(22)?, - data_source: row.get(23)?, - }) - })?; + let rows = stmt.query_map(params_refs.as_slice(), row_to_request_log_detail)?; let mut logs = Vec::new(); let mut provider_cache = HashMap::new(); @@ -1116,36 +1147,7 @@ impl Database { LEFT JOIN providers p ON l.provider_id = p.id AND l.app_type = p.app_type WHERE l.request_id = ?" ); - let result = conn.query_row(&detail_sql, [request_id], |row| { - Ok(RequestLogDetail { - request_id: row.get(0)?, - provider_id: row.get(1)?, - provider_name: row.get(2)?, - app_type: row.get(3)?, - model: row.get(4)?, - request_model: row.get(5)?, - cost_multiplier: row - .get::<_, Option>(6)? - .unwrap_or_else(|| "1".to_string()), - input_tokens: row.get::<_, i64>(7)? as u32, - output_tokens: row.get::<_, i64>(8)? as u32, - cache_read_tokens: row.get::<_, i64>(9)? as u32, - cache_creation_tokens: row.get::<_, i64>(10)? as u32, - input_cost_usd: row.get(11)?, - output_cost_usd: row.get(12)?, - cache_read_cost_usd: row.get(13)?, - cache_creation_cost_usd: row.get(14)?, - total_cost_usd: row.get(15)?, - is_streaming: row.get::<_, i64>(16)? != 0, - latency_ms: row.get::<_, i64>(17)? as u64, - first_token_ms: row.get::<_, Option>(18)?.map(|v| v as u64), - duration_ms: row.get::<_, Option>(19)?.map(|v| v as u64), - status_code: row.get::<_, i64>(20)? as u16, - error_message: row.get(21)?, - created_at: row.get(22)?, - data_source: row.get(23)?, - }) - }); + let result = conn.query_row(&detail_sql, [request_id], row_to_request_log_detail); match result { Ok(mut detail) => { @@ -1276,27 +1278,80 @@ struct PricingInfo { } impl Database { + /// Recalculate stored zero-cost usage rows once pricing becomes available. + pub(crate) fn backfill_missing_usage_costs(&self) -> Result { + let conn = lock_conn!(self.conn); + Self::backfill_missing_usage_costs_on_conn(&conn) + } + + fn backfill_missing_usage_costs_on_conn(conn: &Connection) -> Result { + let mut logs = { + let mut stmt = conn.prepare( + "SELECT request_id, provider_id, NULL AS provider_name, app_type, model, request_model, + cost_multiplier, + input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, + input_cost_usd, output_cost_usd, cache_read_cost_usd, + cache_creation_cost_usd, total_cost_usd, is_streaming, latency_ms, + first_token_ms, duration_ms, status_code, error_message, created_at, + data_source + FROM proxy_request_logs + WHERE CAST(total_cost_usd AS REAL) <= 0 + AND (input_tokens > 0 OR output_tokens > 0 + OR cache_read_tokens > 0 OR cache_creation_tokens > 0)", + )?; + + let rows = stmt.query_map([], row_to_request_log_detail)?; + rows.collect::, _>>()? + }; + + if logs.is_empty() { + return Ok(0); + } + + let tx = conn + .unchecked_transaction() + .map_err(|e| AppError::Database(format!("启动用量成本回填事务失败: {e}")))?; + + let mut updated = 0u64; + let mut provider_cache = HashMap::new(); + let mut pricing_cache = HashMap::new(); + for log in &mut logs { + if Self::maybe_backfill_log_costs(&tx, log, &mut provider_cache, &mut pricing_cache)? { + updated += 1; + } + } + tx.commit() + .map_err(|e| AppError::Database(format!("提交用量成本回填事务失败: {e}")))?; + + if updated > 0 { + log::info!("已回填 {updated} 条缺失的用量成本"); + } + + Ok(updated) + } + + /// 尝试为单条 log 回填成本字段。返回是否实际写入(true=已 UPDATE,false=跳过)。 fn maybe_backfill_log_costs( conn: &Connection, log: &mut RequestLogDetail, provider_cache: &mut HashMap<(String, String), rust_decimal::Decimal>, pricing_cache: &mut HashMap, - ) -> Result<(), AppError> { - let total_cost = rust_decimal::Decimal::from_str(&log.total_cost_usd) + ) -> Result { + let existing_cost = rust_decimal::Decimal::from_str(&log.total_cost_usd) .unwrap_or(rust_decimal::Decimal::ZERO); - let has_cost = total_cost > rust_decimal::Decimal::ZERO; + let has_cost = existing_cost > rust_decimal::Decimal::ZERO; let has_usage = log.input_tokens > 0 || log.output_tokens > 0 || log.cache_read_tokens > 0 || log.cache_creation_tokens > 0; if has_cost || !has_usage { - return Ok(()); + return Ok(false); } let pricing = match Self::get_model_pricing_cached(conn, pricing_cache, &log.model)? { Some(info) => info, - None => return Ok(()), + None => return Ok(false), }; let multiplier = Self::get_cost_multiplier_cached( conn, @@ -1352,7 +1407,7 @@ impl Database { ) .map_err(|e| AppError::Database(format!("更新请求成本失败: {e}")))?; - Ok(()) + Ok(true) } fn get_cost_multiplier_cached( @@ -1431,7 +1486,8 @@ pub(crate) fn find_model_pricing_row( .next() .unwrap_or(model_id) .trim() - .replace('@', "-"); + .replace('@', "-") + .to_ascii_lowercase(); // 精确匹配清洗后的名称 let exact = conn @@ -1514,6 +1570,110 @@ mod tests { Ok(()) } + fn create_legacy_nullable_logs_table(conn: &Connection) -> Result<(), AppError> { + conn.execute( + "CREATE TABLE proxy_request_logs ( + request_id TEXT PRIMARY KEY, + app_type TEXT NOT NULL, + model TEXT NOT NULL, + input_tokens INTEGER NOT NULL, + output_tokens INTEGER NOT NULL, + cache_read_tokens INTEGER NOT NULL, + cache_creation_tokens INTEGER NOT NULL, + status_code INTEGER NOT NULL, + created_at INTEGER NOT NULL, + data_source TEXT + )", + [], + )?; + Ok(()) + } + + #[test] + fn test_effective_filter_keeps_legacy_null_data_source_proxy_rows() -> Result<(), AppError> { + let conn = Connection::open_in_memory()?; + create_legacy_nullable_logs_table(&conn)?; + conn.execute( + "INSERT INTO proxy_request_logs ( + request_id, app_type, model, input_tokens, output_tokens, + cache_read_tokens, cache_creation_tokens, status_code, created_at, data_source + ) VALUES ('legacy-proxy', 'codex', 'gpt-5.5', 10, 2, 1, 0, 200, 1000, NULL)", + [], + )?; + + let filter = effective_usage_log_filter("l"); + let sql = format!("SELECT COUNT(*) FROM proxy_request_logs l WHERE {filter}"); + let count: i64 = conn.query_row(&sql, [], |row| row.get(0))?; + assert_eq!(count, 1); + + Ok(()) + } + + #[test] + fn test_matching_proxy_log_treats_legacy_null_data_source_as_proxy() -> Result<(), AppError> { + let conn = Connection::open_in_memory()?; + create_legacy_nullable_logs_table(&conn)?; + conn.execute( + "INSERT INTO proxy_request_logs ( + request_id, app_type, model, input_tokens, output_tokens, + cache_read_tokens, cache_creation_tokens, status_code, created_at, data_source + ) VALUES ('legacy-proxy', 'codex', 'gpt-5.5', 10, 2, 1, 0, 200, 1000, NULL)", + [], + )?; + + let key = DedupKey { + app_type: "codex", + model: "gpt-5.5", + input_tokens: 10, + output_tokens: 2, + cache_read_tokens: 1, + cache_creation_tokens: 0, + created_at: 1000, + }; + assert!(has_matching_proxy_usage_log(&conn, &key)?); + + Ok(()) + } + + #[test] + fn test_backfill_missing_usage_costs_uses_new_gpt_5_5_pricing() -> Result<(), AppError> { + let db = Database::memory()?; + + { + let conn = lock_conn!(db.conn); + insert_usage_log( + &conn, + "codex-gpt-5-5-zero-cost", + "codex", + "_codex_session", + "gpt-5.5", + "codex_session", + 1000, + 1_000_000, + 1_000_000, + 0, + 0, + 200, + "0", + )?; + } + + assert_eq!(db.backfill_missing_usage_costs()?, 1); + + let conn = lock_conn!(db.conn); + let (input_cost, output_cost, total_cost): (String, String, String) = conn.query_row( + "SELECT input_cost_usd, output_cost_usd, total_cost_usd + FROM proxy_request_logs WHERE request_id = 'codex-gpt-5-5-zero-cost'", + [], + |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)), + )?; + assert_eq!(input_cost, "5.000000"); + assert_eq!(output_cost, "30.000000"); + assert_eq!(total_cost, "35.000000"); + + Ok(()) + } + #[test] fn test_get_usage_summary() -> Result<(), AppError> { let db = Database::memory()?; @@ -2436,6 +2596,11 @@ mod tests { result.is_some(), "带 @ 分隔符的模型 gpt-5.2-codex@low 应能匹配到 gpt-5.2-codex-low" ); + let result = find_model_pricing_row(&conn, "OpenAI/GPT-5.5@HIGH")?; + assert!( + result.is_some(), + "大小写混合的 GPT-5.5 模型应能归一化匹配到 gpt-5.5-high" + ); // 测试不存在的模型 let result = find_model_pricing_row(&conn, "unknown-model-123")?; diff --git a/src/components/usage/DataSourceBar.tsx b/src/components/usage/DataSourceBar.tsx index 7dabc51a8..88e856d7a 100644 --- a/src/components/usage/DataSourceBar.tsx +++ b/src/components/usage/DataSourceBar.tsx @@ -17,6 +17,7 @@ const DATA_SOURCE_ICONS: Record = { codex_db: , codex_session: , gemini_session: , + hermes_session: , }; export function DataSourceBar({ refreshIntervalMs }: DataSourceBarProps) { diff --git a/src/components/usage/RequestLogTable.tsx b/src/components/usage/RequestLogTable.tsx index f1f2835f9..0842a1855 100644 --- a/src/components/usage/RequestLogTable.tsx +++ b/src/components/usage/RequestLogTable.tsx @@ -141,6 +141,7 @@ export function RequestLogTable({ Claude Codex Gemini + Hermes diff --git a/src/components/usage/UsageDashboard.tsx b/src/components/usage/UsageDashboard.tsx index 97bc1a80c..ef5233a6f 100644 --- a/src/components/usage/UsageDashboard.tsx +++ b/src/components/usage/UsageDashboard.tsx @@ -35,6 +35,7 @@ const APP_FILTER_OPTIONS: AppTypeFilter[] = [ "claude", "codex", "gemini", + "hermes", ]; export function UsageDashboard() { diff --git a/src/i18n/locales/en.json b/src/i18n/locales/en.json index 766e018b6..26e4b09ed 100644 --- a/src/i18n/locales/en.json +++ b/src/i18n/locales/en.json @@ -1120,7 +1120,8 @@ "all": "All", "claude": "Claude Code", "codex": "Codex", - "gemini": "Gemini" + "gemini": "Gemini", + "hermes": "Hermes" }, "dataSources": "Data Sources", "dataSource": { @@ -1128,7 +1129,8 @@ "session_log": "Session Log", "codex_db": "Codex DB", "codex_session": "Codex Session", - "gemini_session": "Gemini Session" + "gemini_session": "Gemini Session", + "hermes_session": "Hermes Session" }, "sessionSync": { "trigger": "Sync session logs", diff --git a/src/i18n/locales/ja.json b/src/i18n/locales/ja.json index a4e8e412f..a351f0d16 100644 --- a/src/i18n/locales/ja.json +++ b/src/i18n/locales/ja.json @@ -1120,7 +1120,8 @@ "all": "すべて", "claude": "Claude Code", "codex": "Codex", - "gemini": "Gemini" + "gemini": "Gemini", + "hermes": "Hermes" }, "dataSources": "データソース", "dataSource": { @@ -1128,7 +1129,8 @@ "session_log": "セッションログ", "codex_db": "Codex DB", "codex_session": "Codex セッション", - "gemini_session": "Gemini セッション" + "gemini_session": "Gemini セッション", + "hermes_session": "Hermes セッション" }, "sessionSync": { "trigger": "セッションログを同期", diff --git a/src/i18n/locales/zh.json b/src/i18n/locales/zh.json index ae148fe8a..19e1c21cd 100644 --- a/src/i18n/locales/zh.json +++ b/src/i18n/locales/zh.json @@ -1120,7 +1120,8 @@ "all": "全部", "claude": "Claude Code", "codex": "Codex", - "gemini": "Gemini" + "gemini": "Gemini", + "hermes": "Hermes" }, "dataSources": "数据来源", "dataSource": { @@ -1128,7 +1129,8 @@ "session_log": "会话日志", "codex_db": "Codex 数据库", "codex_session": "Codex 会话日志", - "gemini_session": "Gemini 会话日志" + "gemini_session": "Gemini 会话日志", + "hermes_session": "Hermes 会话日志" }, "sessionSync": { "trigger": "同步会话日志", diff --git a/src/types/usage.ts b/src/types/usage.ts index c595c1cf7..b48aa152d 100644 --- a/src/types/usage.ts +++ b/src/types/usage.ts @@ -129,7 +129,7 @@ export interface UsageRangeSelection { customEndDate?: number; } -export type AppTypeFilter = "all" | "claude" | "codex" | "gemini"; +export type AppTypeFilter = "all" | "claude" | "codex" | "gemini" | "hermes"; export interface StatsFilters { timeRange: UsageRangePreset;