diff --git a/src-tauri/src/commands/usage.rs b/src-tauri/src/commands/usage.rs index 4e527ebbe..1f92456ef 100644 --- a/src-tauri/src/commands/usage.rs +++ b/src-tauri/src/commands/usage.rs @@ -166,6 +166,38 @@ pub fn delete_model_pricing(state: State<'_, AppState>, model_id: String) -> Res Ok(()) } +/// 手动触发会话日志同步 +#[tauri::command] +pub fn sync_session_usage( + state: State<'_, AppState>, +) -> Result { + // 同步 Claude 会话日志 + let mut result = crate::services::session_usage::sync_claude_session_logs(&state.db)?; + + // 同步 Codex 使用数据 + match crate::services::session_usage_codex::sync_codex_usage(&state.db) { + Ok(codex_result) => { + result.imported += codex_result.imported; + result.skipped += codex_result.skipped; + result.files_scanned += codex_result.files_scanned; + result.errors.extend(codex_result.errors); + } + Err(e) => { + result.errors.push(format!("Codex 同步失败: {e}")); + } + } + + Ok(result) +} + +/// 获取数据来源分布 +#[tauri::command] +pub fn get_usage_data_sources( + state: State<'_, AppState>, +) -> Result, AppError> { + crate::services::session_usage::get_data_source_breakdown(&state.db) +} + /// 模型定价信息 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] diff --git a/src-tauri/src/database/mod.rs b/src-tauri/src/database/mod.rs index d093c0971..1e5f1fe23 100644 --- a/src-tauri/src/database/mod.rs +++ b/src-tauri/src/database/mod.rs @@ -44,7 +44,7 @@ use std::sync::Mutex; /// 当前 Schema 版本号 /// 每次修改表结构时递增,并在 schema.rs 中添加相应的迁移逻辑 -pub(crate) const SCHEMA_VERSION: i32 = 7; +pub(crate) const SCHEMA_VERSION: i32 = 8; /// 安全地序列化 JSON,避免 unwrap panic pub(crate) fn to_json_string(value: &T) -> Result { diff --git a/src-tauri/src/database/schema.rs b/src-tauri/src/database/schema.rs index e85d8827a..b3e9d26e2 100644 --- a/src-tauri/src/database/schema.rs +++ b/src-tauri/src/database/schema.rs @@ -189,7 +189,8 @@ impl Database { total_cost_usd TEXT NOT NULL DEFAULT '0', latency_ms INTEGER NOT NULL, first_token_ms INTEGER, duration_ms INTEGER, status_code INTEGER NOT NULL, error_message TEXT, session_id TEXT, provider_type TEXT, is_streaming INTEGER NOT NULL DEFAULT 0, - cost_multiplier TEXT NOT NULL DEFAULT '1.0', created_at INTEGER NOT NULL + cost_multiplier TEXT NOT NULL DEFAULT '1.0', created_at INTEGER NOT NULL, + data_source TEXT NOT NULL DEFAULT 'proxy' )", []).map_err(|e| AppError::Database(e.to_string()))?; conn.execute("CREATE INDEX IF NOT EXISTS idx_request_logs_provider ON proxy_request_logs(provider_id, app_type)", []) @@ -271,6 +272,18 @@ impl Database { ) .map_err(|e| AppError::Database(e.to_string()))?; + // 18. Session Log Sync 表 (会话日志同步状态) + conn.execute( + "CREATE TABLE IF NOT EXISTS session_log_sync ( + file_path TEXT PRIMARY KEY, + last_modified INTEGER NOT NULL, + last_line_offset INTEGER NOT NULL DEFAULT 0, + last_synced_at INTEGER NOT NULL + )", + [], + ) + .map_err(|e| AppError::Database(e.to_string()))?; + // 尝试添加 live_takeover_active 列到 proxy_config 表 let _ = conn.execute( "ALTER TABLE proxy_config ADD COLUMN live_takeover_active INTEGER NOT NULL DEFAULT 0", @@ -400,6 +413,11 @@ impl Database { Self::migrate_v6_to_v7(conn)?; Self::set_user_version(conn, 7)?; } + 7 => { + log::info!("迁移数据库从 v7 到 v8(会话日志使用追踪)"); + Self::migrate_v7_to_v8(conn)?; + Self::set_user_version(conn, 8)?; + } _ => { return Err(AppError::Database(format!( "未知的数据库版本 {version},无法迁移到 {SCHEMA_VERSION}" @@ -1060,6 +1078,34 @@ impl Database { Ok(()) } + /// v7 -> v8: 会话日志使用追踪(无代理模式统计支持) + fn migrate_v7_to_v8(conn: &Connection) -> Result<(), AppError> { + // 1. 为 proxy_request_logs 添加 data_source 列,区分数据来源 + if Self::table_exists(conn, "proxy_request_logs")? { + Self::add_column_if_missing( + conn, + "proxy_request_logs", + "data_source", + "TEXT NOT NULL DEFAULT 'proxy'", + )?; + } + + // 2. 创建会话日志同步状态表 + conn.execute( + "CREATE TABLE IF NOT EXISTS session_log_sync ( + file_path TEXT PRIMARY KEY, + last_modified INTEGER NOT NULL, + last_line_offset INTEGER NOT NULL DEFAULT 0, + last_synced_at INTEGER NOT NULL + )", + [], + ) + .map_err(|e| AppError::Database(format!("创建 session_log_sync 表失败: {e}")))?; + + log::info!("v7 -> v8 迁移完成:已添加 data_source 列和 session_log_sync 表"); + Ok(()) + } + /// 插入默认模型定价数据 /// 格式: (model_id, display_name, input, output, cache_read, cache_creation) /// 注意: model_id 使用短横线格式(如 claude-haiku-4-5),与 API 返回的模型名称标准化后一致 diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index d7490e924..186215b89 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -796,6 +796,51 @@ pub fn run() { } } }); + + // Session log usage sync: 启动时同步一次,之后每 60 秒检查 + let db_for_session_sync = state.db.clone(); + tauri::async_runtime::spawn(async move { + const SESSION_SYNC_INTERVAL_SECS: u64 = 60; + + // 首次同步 + if let Err(e) = + crate::services::session_usage::sync_claude_session_logs( + &db_for_session_sync, + ) + { + log::warn!("Session usage initial sync failed: {e}"); + } + if let Err(e) = + crate::services::session_usage_codex::sync_codex_usage( + &db_for_session_sync, + ) + { + log::warn!("Codex usage initial sync failed: {e}"); + } + + // 定期同步 + let mut interval = tokio::time::interval(std::time::Duration::from_secs( + SESSION_SYNC_INTERVAL_SECS, + )); + interval.tick().await; // skip immediate first tick + loop { + interval.tick().await; + if let Err(e) = + crate::services::session_usage::sync_claude_session_logs( + &db_for_session_sync, + ) + { + log::warn!("Session usage periodic sync failed: {e}"); + } + if let Err(e) = + crate::services::session_usage_codex::sync_codex_usage( + &db_for_session_sync, + ) + { + log::warn!("Codex usage periodic sync failed: {e}"); + } + } + }); }); // Linux: 禁用 WebKitGTK 硬件加速,防止 EGL 初始化失败导致白屏 @@ -1025,6 +1070,9 @@ pub fn run() { commands::update_model_pricing, commands::delete_model_pricing, commands::check_provider_limits, + // Session usage sync + commands::sync_session_usage, + commands::get_usage_data_sources, // Stream health check commands::stream_check_provider, commands::stream_check_all_providers, diff --git a/src-tauri/src/services/mod.rs b/src-tauri/src/services/mod.rs index 64c1f641b..d64711245 100644 --- a/src-tauri/src/services/mod.rs +++ b/src-tauri/src/services/mod.rs @@ -9,6 +9,8 @@ pub mod omo; pub mod prompt; pub mod provider; pub mod proxy; +pub mod session_usage; +pub mod session_usage_codex; pub mod skill; pub mod speedtest; pub mod stream_check; diff --git a/src-tauri/src/services/session_usage.rs b/src-tauri/src/services/session_usage.rs new file mode 100644 index 000000000..dd4bbbb92 --- /dev/null +++ b/src-tauri/src/services/session_usage.rs @@ -0,0 +1,634 @@ +//! Claude Code 会话日志使用追踪 +//! +//! 从 ~/.claude/projects/ 下的 JSONL 会话文件中提取 token 使用数据, +//! 实现无代理模式下的使用统计。 +//! +//! ## 数据流 +//! ```text +//! ~/.claude/projects/*/*.jsonl → 增量解析 → 去重 → 费用计算 → proxy_request_logs 表 +//! ``` + +use crate::config::get_claude_config_dir; +use crate::database::{lock_conn, Database}; +use crate::error::AppError; +use crate::proxy::usage::calculator::{CostCalculator, ModelPricing}; +use crate::proxy::usage::parser::TokenUsage; +use rust_decimal::Decimal; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::fs; +use std::io::{BufRead, BufReader}; +use std::path::{Path, PathBuf}; +use std::time::SystemTime; + +/// 同步结果 +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SessionSyncResult { + pub imported: u32, + pub skipped: u32, + pub files_scanned: u32, + pub errors: Vec, +} + +/// 数据来源分布 +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct DataSourceSummary { + pub data_source: String, + pub request_count: u32, + pub total_cost_usd: String, +} + +/// 从 JSONL 中解析出的 assistant 消息使用数据 +#[derive(Debug)] +struct ParsedAssistantUsage { + message_id: String, + model: String, + input_tokens: u32, + output_tokens: u32, + cache_read_tokens: u32, + cache_creation_tokens: u32, + stop_reason: Option, + timestamp: Option, + session_id: Option, +} + +/// 同步 Claude Code 会话日志到使用统计数据库 +pub fn sync_claude_session_logs(db: &Database) -> Result { + let projects_dir = get_claude_config_dir().join("projects"); + if !projects_dir.exists() { + return Ok(SessionSyncResult { + imported: 0, + skipped: 0, + files_scanned: 0, + errors: vec![], + }); + } + + let mut result = SessionSyncResult { + imported: 0, + skipped: 0, + files_scanned: 0, + errors: vec![], + }; + + // 收集所有 .jsonl 文件 + let jsonl_files = collect_jsonl_files(&projects_dir); + + for file_path in &jsonl_files { + result.files_scanned += 1; + + match sync_single_file(db, file_path) { + Ok((imported, skipped)) => { + result.imported += imported; + result.skipped += skipped; + } + Err(e) => { + let msg = format!("{}: {e}", file_path.display()); + log::warn!("[SESSION-SYNC] 文件解析失败: {msg}"); + result.errors.push(msg); + } + } + } + + if result.imported > 0 { + log::info!( + "[SESSION-SYNC] 同步完成: 导入 {} 条, 跳过 {} 条, 扫描 {} 个文件", + result.imported, + result.skipped, + result.files_scanned + ); + } + + Ok(result) +} + +/// 收集目录下所有 .jsonl 文件 +fn collect_jsonl_files(projects_dir: &Path) -> Vec { + let mut files = Vec::new(); + + let entries = match fs::read_dir(projects_dir) { + Ok(e) => e, + Err(_) => return files, + }; + + for entry in entries.flatten() { + let path = entry.path(); + if !path.is_dir() { + continue; + } + // 每个项目目录下的 .jsonl 文件 + if let Ok(sub_entries) = fs::read_dir(&path) { + for sub_entry in sub_entries.flatten() { + let sub_path = sub_entry.path(); + if sub_path.extension().and_then(|e| e.to_str()) == Some("jsonl") { + files.push(sub_path); + } + } + } + } + + files +} + +/// 同步单个 JSONL 文件,返回 (imported, skipped) +fn sync_single_file(db: &Database, file_path: &Path) -> Result<(u32, u32), AppError> { + let file_path_str = file_path.to_string_lossy().to_string(); + + // 获取文件元数据 + let metadata = fs::metadata(file_path) + .map_err(|e| AppError::Config(format!("无法读取文件元数据: {e}")))?; + let file_modified = metadata + .modified() + .ok() + .and_then(|t| t.duration_since(SystemTime::UNIX_EPOCH).ok()) + .map(|d| d.as_secs() as i64) + .unwrap_or(0); + + // 检查同步状态 + let (last_modified, last_offset) = get_sync_state(db, &file_path_str)?; + + // 文件未变化则跳过 + if file_modified <= last_modified { + return Ok((0, 0)); + } + + // 从上次偏移位置开始增量解析 + let file = fs::File::open(file_path) + .map_err(|e| AppError::Config(format!("无法打开文件: {e}")))?; + let reader = BufReader::new(file); + + let mut line_offset: i64 = 0; + let mut messages: HashMap = HashMap::new(); + let mut current_session_id: Option = None; + + for line_result in reader.lines() { + line_offset += 1; + + // 跳过已处理的行 + if line_offset <= last_offset { + continue; + } + + let line = match line_result { + Ok(l) => l, + Err(_) => continue, // 容忍不完整的最后一行 + }; + + if line.trim().is_empty() { + continue; + } + + let value: serde_json::Value = match serde_json::from_str(&line) { + Ok(v) => v, + Err(_) => continue, + }; + + // 提取 session ID (从 system 或首条消息) + if current_session_id.is_none() { + if let Some(sid) = value.get("sessionId").and_then(|v| v.as_str()) { + current_session_id = Some(sid.to_string()); + } + } + + // 只处理 assistant 类型的消息 + if value.get("type").and_then(|t| t.as_str()) != Some("assistant") { + continue; + } + + let message = match value.get("message") { + Some(m) => m, + None => continue, + }; + + let msg_id = match message.get("id").and_then(|v| v.as_str()) { + Some(id) => id.to_string(), + None => continue, + }; + + let usage = match message.get("usage") { + Some(u) => u, + None => continue, + }; + + let parsed = ParsedAssistantUsage { + message_id: msg_id.clone(), + model: message + .get("model") + .and_then(|v| v.as_str()) + .unwrap_or("unknown") + .to_string(), + input_tokens: usage + .get("input_tokens") + .and_then(|v| v.as_u64()) + .unwrap_or(0) as u32, + output_tokens: usage + .get("output_tokens") + .and_then(|v| v.as_u64()) + .unwrap_or(0) as u32, + cache_read_tokens: usage + .get("cache_read_input_tokens") + .and_then(|v| v.as_u64()) + .unwrap_or(0) as u32, + cache_creation_tokens: usage + .get("cache_creation_input_tokens") + .and_then(|v| v.as_u64()) + .unwrap_or(0) as u32, + stop_reason: message + .get("stop_reason") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + timestamp: value + .get("timestamp") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()), + session_id: current_session_id.clone(), + }; + + // 按 message.id 去重:优先保留有 stop_reason 的条目,否则保留最新的 + let should_replace = match messages.get(&msg_id) { + None => true, + Some(existing) => { + // 新条目有 stop_reason 而旧条目没有 → 替换 + if parsed.stop_reason.is_some() && existing.stop_reason.is_none() { + true + } + // 两个都有或都没有 stop_reason → 取 output_tokens 更大的 + else if parsed.stop_reason.is_some() == existing.stop_reason.is_some() { + parsed.output_tokens > existing.output_tokens + } else { + false + } + } + }; + + if should_replace { + messages.insert(msg_id, parsed); + } + } + + // 写入数据库 + let mut imported: u32 = 0; + let mut skipped: u32 = 0; + + for (_, msg) in &messages { + // 只导入有 stop_reason 的最终条目(完整的 API 调用) + if msg.stop_reason.is_none() { + continue; + } + + let request_id = format!("session:{}", msg.message_id); + + // 跳过 output_tokens 为 0 的无意义条目 + if msg.output_tokens == 0 { + continue; + } + + match insert_session_log_entry(db, &request_id, msg) { + Ok(true) => imported += 1, + Ok(false) => skipped += 1, + Err(e) => { + log::warn!("[SESSION-SYNC] 插入失败 ({}): {e}", msg.message_id); + skipped += 1; + } + } + } + + // 更新同步状态 + update_sync_state(db, &file_path_str, file_modified, line_offset)?; + + Ok((imported, skipped)) +} + +/// 获取文件的同步状态 +fn get_sync_state(db: &Database, file_path: &str) -> Result<(i64, i64), AppError> { + let conn = lock_conn!(db.conn); + let result = conn.query_row( + "SELECT last_modified, last_line_offset FROM session_log_sync WHERE file_path = ?1", + rusqlite::params![file_path], + |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)), + ); + Ok(result.unwrap_or((0, 0))) +} + +/// 更新文件的同步状态 +fn update_sync_state( + db: &Database, + file_path: &str, + last_modified: i64, + last_offset: i64, +) -> Result<(), AppError> { + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map(|d| d.as_secs() as i64) + .unwrap_or(0); + + let conn = lock_conn!(db.conn); + conn.execute( + "INSERT OR REPLACE INTO session_log_sync (file_path, last_modified, last_line_offset, last_synced_at) + VALUES (?1, ?2, ?3, ?4)", + rusqlite::params![file_path, last_modified, last_offset, now], + ) + .map_err(|e| AppError::Database(format!("更新同步状态失败: {e}")))?; + Ok(()) +} + +/// 插入单条会话日志到 proxy_request_logs,返回是否成功插入 (true=新插入, false=已存在) +fn insert_session_log_entry( + db: &Database, + request_id: &str, + msg: &ParsedAssistantUsage, +) -> Result { + let conn = lock_conn!(db.conn); + + // 检查是否已存在 + let exists: bool = conn + .query_row( + "SELECT COUNT(*) FROM proxy_request_logs WHERE request_id = ?1", + rusqlite::params![request_id], + |row| row.get::<_, i64>(0).map(|c| c > 0), + ) + .unwrap_or(false); + + if exists { + return Ok(false); + } + + // 解析时间戳 + let created_at = msg + .timestamp + .as_ref() + .and_then(|ts| { + // 尝试解析 ISO 8601 时间戳 + chrono::DateTime::parse_from_rfc3339(ts) + .ok() + .map(|dt| dt.timestamp()) + }) + .unwrap_or_else(|| { + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map(|d| d.as_secs() as i64) + .unwrap_or(0) + }); + + // 计算费用 + let usage = TokenUsage { + input_tokens: msg.input_tokens, + output_tokens: msg.output_tokens, + cache_read_tokens: msg.cache_read_tokens, + cache_creation_tokens: msg.cache_creation_tokens, + model: Some(msg.model.clone()), + }; + + let pricing = find_model_pricing_for_session(&conn, &msg.model); + let multiplier = Decimal::from(1); + let (input_cost, output_cost, cache_read_cost, cache_creation_cost, total_cost) = + match pricing { + Some(p) => { + let cost = CostCalculator::calculate(&usage, &p, multiplier); + ( + cost.input_cost.to_string(), + cost.output_cost.to_string(), + cost.cache_read_cost.to_string(), + cost.cache_creation_cost.to_string(), + cost.total_cost.to_string(), + ) + } + None => ( + "0".to_string(), + "0".to_string(), + "0".to_string(), + "0".to_string(), + "0".to_string(), + ), + }; + + conn.execute( + "INSERT OR IGNORE INTO proxy_request_logs ( + request_id, provider_id, app_type, model, request_model, + input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, + input_cost_usd, output_cost_usd, cache_read_cost_usd, cache_creation_cost_usd, total_cost_usd, + latency_ms, first_token_ms, status_code, error_message, session_id, + provider_type, is_streaming, cost_multiplier, created_at, data_source + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24)", + rusqlite::params![ + request_id, + "_session", // provider_id: 标记为会话来源 + "claude", // app_type + msg.model, + msg.model, // request_model = model + msg.input_tokens, + msg.output_tokens, + msg.cache_read_tokens, + msg.cache_creation_tokens, + input_cost, + output_cost, + cache_read_cost, + cache_creation_cost, + total_cost, + 0i64, // latency_ms: 会话日志无此数据 + Option::::None, // first_token_ms + 200i64, // status_code: 有 stop_reason 说明请求成功 + Option::::None, // error_message + msg.session_id, + Some("session_log"), // provider_type + 1i64, // is_streaming: Claude Code 通常使用流式 + "1.0", // cost_multiplier + created_at, + "session_log", // data_source + ], + ) + .map_err(|e| AppError::Database(format!("插入会话日志失败: {e}")))?; + + Ok(true) +} + +/// 从 model_pricing 表查找模型定价(支持模糊匹配) +fn find_model_pricing_for_session( + conn: &rusqlite::Connection, + model_id: &str, +) -> Option { + // 精确匹配 + if let Ok(Some(pricing)) = try_find_pricing(conn, model_id) { + return Some(pricing); + } + + // 模糊匹配:去掉日期后缀 + // 例如 "claude-opus-4-6-20260206" -> "claude-opus-4-6" + let parts: Vec<&str> = model_id.rsplitn(2, '-').collect(); + if parts.len() == 2 { + if let Some(suffix) = parts.first() { + if suffix.len() == 8 && suffix.chars().all(|c| c.is_ascii_digit()) { + if let Ok(Some(pricing)) = try_find_pricing(conn, parts[1]) { + return Some(pricing); + } + } + } + } + + // 尝试 LIKE 匹配 + let pattern = format!("{model_id}%"); + let result = conn.query_row( + "SELECT input_cost_per_million, output_cost_per_million, + cache_read_cost_per_million, cache_creation_cost_per_million + FROM model_pricing WHERE model_id LIKE ?1 LIMIT 1", + rusqlite::params![pattern], + |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, String>(1)?, + row.get::<_, String>(2)?, + row.get::<_, String>(3)?, + )) + }, + ); + + match result { + Ok((input, output, cache_read, cache_creation)) => { + ModelPricing::from_strings(&input, &output, &cache_read, &cache_creation).ok() + } + Err(_) => None, + } +} + +fn try_find_pricing( + conn: &rusqlite::Connection, + model_id: &str, +) -> Result, AppError> { + let result = conn.query_row( + "SELECT input_cost_per_million, output_cost_per_million, + cache_read_cost_per_million, cache_creation_cost_per_million + FROM model_pricing WHERE model_id = ?1", + rusqlite::params![model_id], + |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, String>(1)?, + row.get::<_, String>(2)?, + row.get::<_, String>(3)?, + )) + }, + ); + + match result { + Ok((input, output, cache_read, cache_creation)) => { + ModelPricing::from_strings(&input, &output, &cache_read, &cache_creation) + .map(Some) + .map_err(|e| AppError::Database(format!("解析定价失败: {e}"))) + } + Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), + Err(e) => Err(AppError::Database(format!("查询定价失败: {e}"))), + } +} + +/// 查询数据来源分布统计 +pub fn get_data_source_breakdown(db: &Database) -> Result, AppError> { + let conn = lock_conn!(db.conn); + + let mut stmt = conn.prepare( + "SELECT COALESCE(data_source, 'proxy') as ds, COUNT(*) as cnt, + COALESCE(SUM(CAST(total_cost_usd AS REAL)), 0) as cost + FROM proxy_request_logs + GROUP BY ds + ORDER BY cnt DESC", + )?; + + let rows = stmt.query_map([], |row| { + Ok(DataSourceSummary { + data_source: row.get(0)?, + request_count: row.get::<_, i64>(1)? as u32, + total_cost_usd: format!("{:.6}", row.get::<_, f64>(2)?), + }) + })?; + + let mut summaries = Vec::new(); + for row in rows { + summaries.push(row.map_err(|e| AppError::Database(e.to_string()))?); + } + + Ok(summaries) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_usage_from_jsonl_line() { + let line = r#"{"type":"assistant","message":{"id":"msg_test123","model":"claude-opus-4-6","usage":{"input_tokens":3,"output_tokens":150,"cache_read_input_tokens":5000,"cache_creation_input_tokens":10000},"stop_reason":"end_turn"},"timestamp":"2026-04-05T12:00:00Z","sessionId":"session-abc"}"#; + + let value: serde_json::Value = serde_json::from_str(line).unwrap(); + assert_eq!( + value.get("type").and_then(|t| t.as_str()), + Some("assistant") + ); + + let message = value.get("message").unwrap(); + let usage = message.get("usage").unwrap(); + + assert_eq!(usage.get("input_tokens").unwrap().as_u64().unwrap(), 3); + assert_eq!(usage.get("output_tokens").unwrap().as_u64().unwrap(), 150); + assert_eq!( + usage + .get("cache_read_input_tokens") + .unwrap() + .as_u64() + .unwrap(), + 5000 + ); + assert_eq!( + usage + .get("cache_creation_input_tokens") + .unwrap() + .as_u64() + .unwrap(), + 10000 + ); + assert_eq!( + message.get("stop_reason").unwrap().as_str().unwrap(), + "end_turn" + ); + } + + #[test] + fn test_dedup_by_message_id() { + // 同一个 message.id 有多条,应该取 stop_reason 有值的那条 + let mut messages: HashMap = HashMap::new(); + + // 中间条目(无 stop_reason) + let intermediate = ParsedAssistantUsage { + message_id: "msg_1".to_string(), + model: "claude-opus-4-6".to_string(), + input_tokens: 3, + output_tokens: 26, + cache_read_tokens: 5000, + cache_creation_tokens: 10000, + stop_reason: None, + timestamp: Some("2026-04-05T12:00:00Z".to_string()), + session_id: None, + }; + messages.insert("msg_1".to_string(), intermediate); + + // 最终条目(有 stop_reason) + let final_entry = ParsedAssistantUsage { + message_id: "msg_1".to_string(), + model: "claude-opus-4-6".to_string(), + input_tokens: 3, + output_tokens: 1349, + cache_read_tokens: 5000, + cache_creation_tokens: 10000, + stop_reason: Some("end_turn".to_string()), + timestamp: Some("2026-04-05T12:00:00Z".to_string()), + session_id: None, + }; + + // 应该替换 + let should_replace = final_entry.stop_reason.is_some() + && messages.get("msg_1").unwrap().stop_reason.is_none(); + assert!(should_replace); + + messages.insert("msg_1".to_string(), final_entry); + assert_eq!(messages.get("msg_1").unwrap().output_tokens, 1349); + } +} diff --git a/src-tauri/src/services/session_usage_codex.rs b/src-tauri/src/services/session_usage_codex.rs new file mode 100644 index 000000000..4e2a3041b --- /dev/null +++ b/src-tauri/src/services/session_usage_codex.rs @@ -0,0 +1,254 @@ +//! Codex 使用数据导入 +//! +//! 从 ~/.codex/state_5.sqlite 的 threads 表读取 token 使用数据, +//! 导入到 proxy_request_logs 表。 +//! +//! ## 限制 +//! - Thread 级粒度(非请求级) +//! - 只有总 token 数,使用估算比例分 input/output + +use crate::codex_config::get_codex_config_dir; +use crate::database::{lock_conn, Database}; +use crate::error::AppError; +use crate::proxy::usage::calculator::{CostCalculator, ModelPricing}; +use crate::proxy::usage::parser::TokenUsage; +use crate::services::session_usage::SessionSyncResult; +use rust_decimal::Decimal; + +/// 默认 input/output 比例(70% input, 30% output) +const DEFAULT_INPUT_RATIO: f64 = 0.7; + +/// 同步 Codex 使用数据 +pub fn sync_codex_usage(db: &Database) -> Result { + let codex_dir = get_codex_config_dir(); + let state_db_path = codex_dir.join("state_5.sqlite"); + + if !state_db_path.exists() { + return Ok(SessionSyncResult { + imported: 0, + skipped: 0, + files_scanned: 0, + errors: vec![], + }); + } + + let mut result = SessionSyncResult { + imported: 0, + skipped: 0, + files_scanned: 1, + errors: vec![], + }; + + // 只读打开 Codex SQLite 数据库 + let codex_conn = match rusqlite::Connection::open_with_flags( + &state_db_path, + rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX, + ) { + Ok(c) => c, + Err(e) => { + result.errors.push(format!("无法打开 Codex 数据库: {e}")); + return Ok(result); + } + }; + + // 查询所有 thread + let mut stmt = match codex_conn.prepare( + "SELECT id, model, model_provider, tokens_used, created_at, title + FROM threads + WHERE tokens_used > 0 + ORDER BY created_at DESC", + ) { + Ok(s) => s, + Err(e) => { + result + .errors + .push(format!("查询 Codex threads 失败: {e}")); + return Ok(result); + } + }; + + let rows = match stmt.query_map([], |row| { + Ok(( + row.get::<_, String>(0)?, // id + row.get::<_, Option>(1)?, // model (nullable) + row.get::<_, String>(2)?, // model_provider + row.get::<_, i64>(3)?, // tokens_used + row.get::<_, i64>(4)?, // created_at (unix seconds) + row.get::<_, String>(5)?, // title + )) + }) { + Ok(r) => r, + Err(e) => { + result.errors.push(format!("遍历 threads 失败: {e}")); + return Ok(result); + } + }; + + let conn = lock_conn!(db.conn); + + for row_result in rows { + let (thread_id, model, _provider, tokens_used, created_at, _title) = match row_result { + Ok(r) => r, + Err(e) => { + result.errors.push(format!("读取行失败: {e}")); + continue; + } + }; + + let request_id = format!("codex_thread:{thread_id}"); + let model = model.unwrap_or_else(|| "unknown".to_string()); + + // 检查是否已存在 + let exists: bool = conn + .query_row( + "SELECT COUNT(*) FROM proxy_request_logs WHERE request_id = ?1", + rusqlite::params![request_id], + |row| row.get::<_, i64>(0).map(|c| c > 0), + ) + .unwrap_or(false); + + if exists { + result.skipped += 1; + continue; + } + + // 使用估算比例分配 input/output + let input_tokens = (tokens_used as f64 * DEFAULT_INPUT_RATIO) as u32; + let output_tokens = tokens_used as u32 - input_tokens; + + let usage = TokenUsage { + input_tokens, + output_tokens, + cache_read_tokens: 0, + cache_creation_tokens: 0, + model: Some(model.clone()), + }; + + // 查找定价 + let pricing = find_codex_pricing(&conn, &model); + let multiplier = Decimal::from(1); + let (input_cost, output_cost, cache_read_cost, cache_creation_cost, total_cost) = + match pricing { + Some(p) => { + let cost = CostCalculator::calculate(&usage, &p, multiplier); + ( + cost.input_cost.to_string(), + cost.output_cost.to_string(), + cost.cache_read_cost.to_string(), + cost.cache_creation_cost.to_string(), + cost.total_cost.to_string(), + ) + } + None => ( + "0".to_string(), + "0".to_string(), + "0".to_string(), + "0".to_string(), + "0".to_string(), + ), + }; + + let insert_result = conn.execute( + "INSERT OR IGNORE INTO proxy_request_logs ( + request_id, provider_id, app_type, model, request_model, + input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, + input_cost_usd, output_cost_usd, cache_read_cost_usd, cache_creation_cost_usd, total_cost_usd, + latency_ms, first_token_ms, status_code, error_message, session_id, + provider_type, is_streaming, cost_multiplier, created_at, data_source + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24)", + rusqlite::params![ + request_id, + "_codex_session", + "codex", + model, + model, + input_tokens, + output_tokens, + 0i64, // cache_read_tokens + 0i64, // cache_creation_tokens + input_cost, + output_cost, + cache_read_cost, + cache_creation_cost, + total_cost, + 0i64, // latency_ms + Option::::None, + 200i64, // status_code + Option::::None, + Some(thread_id), // session_id = thread_id + Some("codex_db"), + 0i64, // is_streaming: unknown + "1.0", + created_at, + "codex_db", + ], + ); + + match insert_result { + Ok(changed) if changed > 0 => result.imported += 1, + Ok(_) => result.skipped += 1, + Err(e) => { + result.errors.push(format!("插入失败: {e}")); + result.skipped += 1; + } + } + } + + if result.imported > 0 { + log::info!( + "[CODEX-SYNC] 同步完成: 导入 {} 条, 跳过 {} 条", + result.imported, + result.skipped + ); + } + + Ok(result) +} + +/// 查找 Codex 模型定价 +fn find_codex_pricing( + conn: &rusqlite::Connection, + model_id: &str, +) -> Option { + // 精确匹配 + let result = conn.query_row( + "SELECT input_cost_per_million, output_cost_per_million, + cache_read_cost_per_million, cache_creation_cost_per_million + FROM model_pricing WHERE model_id = ?1", + rusqlite::params![model_id], + |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, String>(1)?, + row.get::<_, String>(2)?, + row.get::<_, String>(3)?, + )) + }, + ); + + match result { + Ok((input, output, cache_read, cache_creation)) => { + ModelPricing::from_strings(&input, &output, &cache_read, &cache_creation).ok() + } + Err(_) => { + // 尝试 LIKE 匹配 + let pattern = format!("{model_id}%"); + conn.query_row( + "SELECT input_cost_per_million, output_cost_per_million, + cache_read_cost_per_million, cache_creation_cost_per_million + FROM model_pricing WHERE model_id LIKE ?1 LIMIT 1", + rusqlite::params![pattern], + |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, String>(1)?, + row.get::<_, String>(2)?, + row.get::<_, String>(3)?, + )) + }, + ) + .ok() + .and_then(|(i, o, cr, cc)| ModelPricing::from_strings(&i, &o, &cr, &cc).ok()) + } + } +} diff --git a/src-tauri/src/services/usage_stats.rs b/src-tauri/src/services/usage_stats.rs index af6f75270..d03eea482 100644 --- a/src-tauri/src/services/usage_stats.rs +++ b/src-tauri/src/services/usage_stats.rs @@ -113,6 +113,8 @@ pub struct RequestLogDetail { pub status_code: u16, pub error_message: Option, pub created_at: i64, + #[serde(skip_serializing_if = "Option::is_none")] + pub data_source: Option, } impl Database { @@ -588,7 +590,7 @@ impl Database { l.input_tokens, l.output_tokens, l.cache_read_tokens, l.cache_creation_tokens, l.input_cost_usd, l.output_cost_usd, l.cache_read_cost_usd, l.cache_creation_cost_usd, l.total_cost_usd, l.is_streaming, l.latency_ms, l.first_token_ms, l.duration_ms, - l.status_code, l.error_message, l.created_at + l.status_code, l.error_message, l.created_at, l.data_source FROM proxy_request_logs l LEFT JOIN providers p ON l.provider_id = p.id AND l.app_type = p.app_type {where_clause} @@ -625,6 +627,7 @@ impl Database { status_code: row.get::<_, i64>(20)? as u16, error_message: row.get(21)?, created_at: row.get(22)?, + data_source: row.get(23)?, }) })?; @@ -664,7 +667,7 @@ impl Database { input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, input_cost_usd, output_cost_usd, cache_read_cost_usd, cache_creation_cost_usd, total_cost_usd, is_streaming, latency_ms, first_token_ms, duration_ms, - status_code, error_message, created_at + status_code, error_message, created_at, l.data_source FROM proxy_request_logs l LEFT JOIN providers p ON l.provider_id = p.id AND l.app_type = p.app_type WHERE l.request_id = ?", @@ -694,6 +697,7 @@ impl Database { status_code: row.get::<_, i64>(20)? as u16, error_message: row.get(21)?, created_at: row.get(22)?, + data_source: row.get(23)?, }) }, ); diff --git a/src/components/usage/DataSourceBar.tsx b/src/components/usage/DataSourceBar.tsx new file mode 100644 index 000000000..e068151e0 --- /dev/null +++ b/src/components/usage/DataSourceBar.tsx @@ -0,0 +1,122 @@ +import { useTranslation } from "react-i18next"; +import { useQuery, useQueryClient } from "@tanstack/react-query"; +import { usageApi } from "@/lib/api/usage"; +import { usageKeys } from "@/lib/query/usage"; +import { Database, FileText, RefreshCw, Loader2 } from "lucide-react"; +import { Button } from "@/components/ui/button"; +import { useState } from "react"; +import { toast } from "sonner"; + +interface DataSourceBarProps { + refreshIntervalMs: number; +} + +const DATA_SOURCE_ICONS: Record = { + proxy: , + session_log: , + codex_db: , +}; + +export function DataSourceBar({ refreshIntervalMs }: DataSourceBarProps) { + const { t } = useTranslation(); + const queryClient = useQueryClient(); + const [syncing, setSyncing] = useState(false); + + const { data: sources } = useQuery({ + queryKey: [...usageKeys.all, "data-sources"], + queryFn: usageApi.getDataSourceBreakdown, + refetchInterval: refreshIntervalMs > 0 ? refreshIntervalMs : false, + refetchIntervalInBackground: false, + }); + + const handleSync = async () => { + setSyncing(true); + try { + const result = await usageApi.syncSessionUsage(); + if (result.imported > 0) { + toast.success( + t("usage.sessionSync.imported", { + count: result.imported, + defaultValue: "Imported {{count}} records from session logs", + }), + ); + // Refresh all usage data + queryClient.invalidateQueries({ queryKey: usageKeys.all }); + } else { + toast.info( + t("usage.sessionSync.upToDate", { + defaultValue: "Session logs are up to date", + }), + ); + } + } catch { + toast.error( + t("usage.sessionSync.failed", { + defaultValue: "Session sync failed", + }), + ); + } finally { + setSyncing(false); + } + }; + + if (!sources || sources.length === 0) { + return null; + } + + const hasNonProxy = sources.some((s) => s.dataSource !== "proxy"); + + return ( +
+ + {t("usage.dataSources", { defaultValue: "Data Sources" })}: + +
+ {sources.map((source) => ( +
+ {DATA_SOURCE_ICONS[source.dataSource] ?? ( + + )} + + {t(`usage.dataSource.${source.dataSource}`, { + defaultValue: source.dataSource, + })} + + + {source.requestCount.toLocaleString()} + +
+ ))} +
+ +
+ +
+
+ ); +} diff --git a/src/components/usage/RequestLogTable.tsx b/src/components/usage/RequestLogTable.tsx index 691a8e075..5d6bd0c17 100644 --- a/src/components/usage/RequestLogTable.tsx +++ b/src/components/usage/RequestLogTable.tsx @@ -371,13 +371,16 @@ export function RequestLogTable({ refreshIntervalMs }: RequestLogTableProps) { {t("usage.status")} + + {t("usage.source", { defaultValue: "Source" })} + {logs.length === 0 ? ( {t("usage.noData")} @@ -506,6 +509,21 @@ export function RequestLogTable({ refreshIntervalMs }: RequestLogTableProps) { {log.statusCode} + + {log.dataSource && log.dataSource !== "proxy" ? ( + + {t(`usage.dataSource.${log.dataSource}`, { + defaultValue: log.dataSource, + })} + + ) : ( + + {t("usage.dataSource.proxy", { + defaultValue: "Proxy", + })} + + )} + )) )} diff --git a/src/components/usage/UsageDashboard.tsx b/src/components/usage/UsageDashboard.tsx index 81060e357..f11ba536d 100644 --- a/src/components/usage/UsageDashboard.tsx +++ b/src/components/usage/UsageDashboard.tsx @@ -6,6 +6,7 @@ import { UsageTrendChart } from "./UsageTrendChart"; import { RequestLogTable } from "./RequestLogTable"; import { ProviderStatsTable } from "./ProviderStatsTable"; import { ModelStatsTable } from "./ModelStatsTable"; +import { DataSourceBar } from "./DataSourceBar"; import type { TimeRange } from "@/types/usage"; import { motion } from "framer-motion"; import { @@ -100,6 +101,8 @@ export function UsageDashboard() { + + diff --git a/src/i18n/locales/en.json b/src/i18n/locales/en.json index da49b30a6..8fc98c2ff 100644 --- a/src/i18n/locales/en.json +++ b/src/i18n/locales/en.json @@ -1018,6 +1018,21 @@ "unknownProvider": "Unknown Provider", "stream": "Stream", "nonStream": "Non-stream", + "source": "Source", + "dataSources": "Data Sources", + "dataSource": { + "proxy": "Proxy", + "session_log": "Session Log", + "codex_db": "Codex DB" + }, + "sessionSync": { + "trigger": "Sync session logs", + "import": "Import Sessions", + "resync": "Sync", + "imported": "Imported {{count}} records from session logs", + "upToDate": "Session logs are up to date", + "failed": "Session sync failed" + }, "totalRecords": "{{total}} records total", "modelPricing": "Model Pricing", "loadPricingError": "Failed to load pricing data", diff --git a/src/i18n/locales/ja.json b/src/i18n/locales/ja.json index bc13a3138..e27cc8411 100644 --- a/src/i18n/locales/ja.json +++ b/src/i18n/locales/ja.json @@ -1018,6 +1018,21 @@ "unknownProvider": "不明なプロバイダー", "stream": "ストリーム", "nonStream": "非ストリーム", + "source": "ソース", + "dataSources": "データソース", + "dataSource": { + "proxy": "プロキシ", + "session_log": "セッションログ", + "codex_db": "Codex DB" + }, + "sessionSync": { + "trigger": "セッションログを同期", + "import": "セッションをインポート", + "resync": "同期", + "imported": "セッションログから {{count}} 件のレコードをインポートしました", + "upToDate": "セッションログは最新です", + "failed": "セッション同期に失敗しました" + }, "totalRecords": "全 {{total}} 件", "modelPricing": "モデル料金", "loadPricingError": "料金データの読み込みに失敗しました", diff --git a/src/i18n/locales/zh.json b/src/i18n/locales/zh.json index 431e8354f..84f783c00 100644 --- a/src/i18n/locales/zh.json +++ b/src/i18n/locales/zh.json @@ -1018,6 +1018,21 @@ "unknownProvider": "未知供应商", "stream": "流", "nonStream": "非流", + "source": "来源", + "dataSources": "数据来源", + "dataSource": { + "proxy": "代理", + "session_log": "会话日志", + "codex_db": "Codex 数据库" + }, + "sessionSync": { + "trigger": "同步会话日志", + "import": "导入会话", + "resync": "同步", + "imported": "从会话日志导入了 {{count}} 条记录", + "upToDate": "会话日志已是最新", + "failed": "会话同步失败" + }, "totalRecords": "共 {{total}} 条记录", "modelPricing": "模型定价", "loadPricingError": "加载定价数据失败", diff --git a/src/lib/api/usage.ts b/src/lib/api/usage.ts index dbb38bc9b..981af57d2 100644 --- a/src/lib/api/usage.ts +++ b/src/lib/api/usage.ts @@ -9,6 +9,8 @@ import type { ModelPricing, ProviderLimitStatus, PaginatedLogs, + SessionSyncResult, + DataSourceSummary, } from "@/types/usage"; import type { UsageResult } from "@/types"; import type { AppId } from "./types"; @@ -115,4 +117,13 @@ export const usageApi = { ): Promise => { return invoke("check_provider_limits", { providerId, appType }); }, + + // Session usage sync + syncSessionUsage: async (): Promise => { + return invoke("sync_session_usage"); + }, + + getDataSourceBreakdown: async (): Promise => { + return invoke("get_usage_data_sources"); + }, }; diff --git a/src/types/usage.ts b/src/types/usage.ts index 61dc8774f..6b21e2b71 100644 --- a/src/types/usage.ts +++ b/src/types/usage.ts @@ -31,6 +31,20 @@ export interface RequestLog { statusCode: number; errorMessage?: string; createdAt: number; + dataSource?: string; +} + +export interface SessionSyncResult { + imported: number; + skipped: number; + filesScanned: number; + errors: string[]; +} + +export interface DataSourceSummary { + dataSource: string; + requestCount: number; + totalCostUsd: string; } export interface PaginatedLogs {