feat: add Gemini CLI session log usage tracking

Parse ~/.gemini/tmp/*/chats/session-*.json for precise per-message
token data (input/output/cached/thoughts). Integrates with existing
background sync and manual sync button alongside Claude and Codex.
This commit is contained in:
Jason
2026-04-06 19:48:06 +08:00
parent 8ad1bb7924
commit f5d7064d57
8 changed files with 525 additions and 3 deletions
+13
View File
@@ -187,6 +187,19 @@ pub fn sync_session_usage(
}
}
// 同步 Gemini 使用数据
match crate::services::session_usage_gemini::sync_gemini_usage(&state.db) {
Ok(gemini_result) => {
result.imported += gemini_result.imported;
result.skipped += gemini_result.skipped;
result.files_scanned += gemini_result.files_scanned;
result.errors.extend(gemini_result.errors);
}
Err(e) => {
result.errors.push(format!("Gemini 同步失败: {e}"));
}
}
Ok(result)
}
+14
View File
@@ -817,6 +817,13 @@ pub fn run() {
{
log::warn!("Codex usage initial sync failed: {e}");
}
if let Err(e) =
crate::services::session_usage_gemini::sync_gemini_usage(
&db_for_session_sync,
)
{
log::warn!("Gemini usage initial sync failed: {e}");
}
// 定期同步
let mut interval = tokio::time::interval(std::time::Duration::from_secs(
@@ -839,6 +846,13 @@ pub fn run() {
{
log::warn!("Codex usage periodic sync failed: {e}");
}
if let Err(e) =
crate::services::session_usage_gemini::sync_gemini_usage(
&db_for_session_sync,
)
{
log::warn!("Gemini usage periodic sync failed: {e}");
}
}
});
});
+1
View File
@@ -11,6 +11,7 @@ pub mod provider;
pub mod proxy;
pub mod session_usage;
pub mod session_usage_codex;
pub mod session_usage_gemini;
pub mod skill;
pub mod speedtest;
pub mod stream_check;
@@ -0,0 +1,490 @@
//! Gemini CLI 会话日志使用追踪
//!
//! 从 ~/.gemini/tmp/<project_hash>/chats/session-*.json 中提取精确 token 使用数据。
//!
//! ## 数据流
//! ```text
//! ~/.gemini/tmp/*/chats/session-*.json → 全量解析 → 费用计算 → proxy_request_logs 表
//! ```
//!
//! ## 与 Claude/Codex 解析器的差异
//! - JSON 格式(非 JSONL):每个文件是单个 JSON 对象,包含 messages 数组
//! - 无需 delta 计算:tokens 字段是 per-message 独立值
//! - 无需状态恢复:不依赖前一条消息的累计值
//! - 天然去重:每条消息有唯一 id 字段
use crate::database::{lock_conn, Database};
use crate::error::AppError;
use crate::gemini_config::get_gemini_dir;
use crate::proxy::usage::calculator::{CostCalculator, ModelPricing};
use crate::proxy::usage::parser::TokenUsage;
use crate::services::session_usage::SessionSyncResult;
use rust_decimal::Decimal;
use std::fs;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
/// 从 Gemini message 中提取的 token 数据
#[derive(Debug)]
struct GeminiTokens {
input: u32,
output: u32,
cached: u32,
thoughts: u32,
}
/// 同步 Gemini 使用数据(从 JSON 会话日志)
pub fn sync_gemini_usage(db: &Database) -> Result<SessionSyncResult, AppError> {
let gemini_dir = get_gemini_dir();
let files = collect_gemini_session_files(&gemini_dir);
let mut result = SessionSyncResult {
imported: 0,
skipped: 0,
files_scanned: files.len() as u32,
errors: vec![],
};
if files.is_empty() {
return Ok(result);
}
for file_path in &files {
match sync_single_gemini_file(db, file_path) {
Ok((imported, skipped)) => {
result.imported += imported;
result.skipped += skipped;
}
Err(e) => {
let msg = format!("Gemini 会话文件解析失败 {}: {e}", file_path.display());
log::warn!("[GEMINI-SYNC] {msg}");
result.errors.push(msg);
}
}
}
if result.imported > 0 {
log::info!(
"[GEMINI-SYNC] 同步完成: 导入 {} 条, 跳过 {} 条, 扫描 {} 个文件",
result.imported,
result.skipped,
result.files_scanned
);
}
Ok(result)
}
/// 收集所有 Gemini 会话 JSON 文件
fn collect_gemini_session_files(gemini_dir: &Path) -> Vec<PathBuf> {
let mut files = Vec::new();
let tmp_dir = gemini_dir.join("tmp");
if !tmp_dir.is_dir() {
return files;
}
// 遍历 tmp/<project_hash>/chats/session-*.json
let project_dirs = match fs::read_dir(&tmp_dir) {
Ok(entries) => entries,
Err(_) => return files,
};
for entry in project_dirs.flatten() {
let chats_dir = entry.path().join("chats");
if !chats_dir.is_dir() {
continue;
}
let chat_files = match fs::read_dir(&chats_dir) {
Ok(entries) => entries,
Err(_) => continue,
};
for file_entry in chat_files.flatten() {
let path = file_entry.path();
if path.extension().and_then(|e| e.to_str()) == Some("json") {
files.push(path);
}
}
}
files
}
/// 同步单个 Gemini 会话 JSON 文件,返回 (imported, skipped)
fn sync_single_gemini_file(db: &Database, file_path: &Path) -> Result<(u32, u32), AppError> {
let file_path_str = file_path.to_string_lossy().to_string();
// 获取文件元数据
let metadata = fs::metadata(file_path)
.map_err(|e| AppError::Config(format!("无法读取文件元数据: {e}")))?;
let file_modified = metadata
.modified()
.ok()
.and_then(|t| t.duration_since(SystemTime::UNIX_EPOCH).ok())
.map(|d| d.as_secs() as i64)
.unwrap_or(0);
// 检查同步状态
let (last_modified, _last_offset) = get_sync_state(db, &file_path_str)?;
// 文件未变化则跳过
if file_modified <= last_modified {
return Ok((0, 0));
}
// 读取并解析整个 JSON 文件
let content = fs::read_to_string(file_path)
.map_err(|e| AppError::Config(format!("无法读取文件: {e}")))?;
let value: serde_json::Value = serde_json::from_str(&content)
.map_err(|e| AppError::Config(format!("JSON 解析失败: {e}")))?;
// 提取顶层 sessionId
let session_id = value
.get("sessionId")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
// 遍历 messages 数组
let messages = match value.get("messages").and_then(|v| v.as_array()) {
Some(msgs) => msgs,
None => return Ok((0, 0)),
};
let mut imported: u32 = 0;
let mut skipped: u32 = 0;
let mut gemini_msg_count: i64 = 0;
for msg in messages {
// 只处理 type == "gemini" 的消息
if msg.get("type").and_then(|t| t.as_str()) != Some("gemini") {
continue;
}
// 提取 tokens 对象
let tokens_obj = match msg.get("tokens") {
Some(t) if t.is_object() => t,
_ => continue,
};
let tokens = parse_gemini_tokens(tokens_obj);
if tokens.input == 0 && tokens.output == 0 && tokens.thoughts == 0 {
continue; // 跳过全零的空 token 消息
}
gemini_msg_count += 1;
// 提取消息 ID 和模型
let message_id = msg
.get("id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let model = msg
.get("model")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let timestamp = msg.get("timestamp").and_then(|v| v.as_str());
// 生成唯一 request_id
let session_id_str = session_id.as_deref().unwrap_or("unknown");
let request_id = format!("gemini_session:{session_id_str}:{message_id}");
match insert_gemini_session_entry(
db,
&request_id,
&tokens,
model,
session_id.as_deref(),
timestamp,
) {
Ok(true) => imported += 1,
Ok(false) => skipped += 1,
Err(e) => {
log::warn!("[GEMINI-SYNC] 插入失败 ({}): {e}", request_id);
skipped += 1;
}
}
}
// 更新同步状态
update_sync_state(db, &file_path_str, file_modified, gemini_msg_count)?;
Ok((imported, skipped))
}
/// 从 tokens JSON 对象中提取 token 数据
fn parse_gemini_tokens(tokens: &serde_json::Value) -> GeminiTokens {
GeminiTokens {
input: tokens
.get("input")
.and_then(|v| v.as_u64())
.unwrap_or(0) as u32,
output: tokens
.get("output")
.and_then(|v| v.as_u64())
.unwrap_or(0) as u32,
cached: tokens
.get("cached")
.and_then(|v| v.as_u64())
.unwrap_or(0) as u32,
thoughts: tokens
.get("thoughts")
.and_then(|v| v.as_u64())
.unwrap_or(0) as u32,
}
}
/// 插入单条 Gemini 会话记录到 proxy_request_logs
fn insert_gemini_session_entry(
db: &Database,
request_id: &str,
tokens: &GeminiTokens,
model: &str,
session_id: Option<&str>,
timestamp: Option<&str>,
) -> Result<bool, AppError> {
let conn = lock_conn!(db.conn);
// 检查是否已存在
let exists: bool = conn
.query_row(
"SELECT COUNT(*) FROM proxy_request_logs WHERE request_id = ?1",
rusqlite::params![request_id],
|row| row.get::<_, i64>(0).map(|c| c > 0),
)
.unwrap_or(false);
if exists {
return Ok(false);
}
// 解析时间戳
let created_at = timestamp
.and_then(|ts| {
chrono::DateTime::parse_from_rfc3339(ts)
.ok()
.map(|dt| dt.timestamp())
})
.unwrap_or_else(|| {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0)
});
// 合并 thoughts 到 output(思考 token 按输出计费)
let output_tokens = tokens.output + tokens.thoughts;
// 计算费用
let usage = TokenUsage {
input_tokens: tokens.input,
output_tokens,
cache_read_tokens: tokens.cached,
cache_creation_tokens: 0,
model: Some(model.to_string()),
};
let pricing = find_gemini_pricing(&conn, model);
let multiplier = Decimal::from(1);
let (input_cost, output_cost, cache_read_cost, cache_creation_cost, total_cost) = match pricing
{
Some(p) => {
let cost = CostCalculator::calculate(&usage, &p, multiplier);
(
cost.input_cost.to_string(),
cost.output_cost.to_string(),
cost.cache_read_cost.to_string(),
cost.cache_creation_cost.to_string(),
cost.total_cost.to_string(),
)
}
None => (
"0".to_string(),
"0".to_string(),
"0".to_string(),
"0".to_string(),
"0".to_string(),
),
};
conn.execute(
"INSERT OR IGNORE INTO proxy_request_logs (
request_id, provider_id, app_type, model, request_model,
input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens,
input_cost_usd, output_cost_usd, cache_read_cost_usd, cache_creation_cost_usd, total_cost_usd,
latency_ms, first_token_ms, status_code, error_message, session_id,
provider_type, is_streaming, cost_multiplier, created_at, data_source
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24)",
rusqlite::params![
request_id,
"_gemini_session", // provider_id
"gemini", // app_type
model,
model, // request_model = model
tokens.input,
output_tokens,
tokens.cached,
0i64, // cache_creation_tokens
input_cost,
output_cost,
cache_read_cost,
cache_creation_cost,
total_cost,
0i64, // latency_ms
Option::<i64>::None, // first_token_ms
200i64, // status_code
Option::<String>::None, // error_message
session_id.map(|s| s.to_string()),
Some("gemini_session"), // provider_type
1i64, // is_streaming
"1.0", // cost_multiplier
created_at,
"gemini_session", // data_source
],
)
.map_err(|e| AppError::Database(format!("插入 Gemini 会话日志失败: {e}")))?;
Ok(true)
}
/// 获取文件的同步状态
fn get_sync_state(db: &Database, file_path: &str) -> Result<(i64, i64), AppError> {
let conn = lock_conn!(db.conn);
let result = conn.query_row(
"SELECT last_modified, last_line_offset FROM session_log_sync WHERE file_path = ?1",
rusqlite::params![file_path],
|row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)),
);
Ok(result.unwrap_or((0, 0)))
}
/// 更新文件的同步状态
fn update_sync_state(
db: &Database,
file_path: &str,
last_modified: i64,
last_offset: i64,
) -> Result<(), AppError> {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0);
let conn = lock_conn!(db.conn);
conn.execute(
"INSERT OR REPLACE INTO session_log_sync (file_path, last_modified, last_line_offset, last_synced_at)
VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![file_path, last_modified, last_offset, now],
)
.map_err(|e| AppError::Database(format!("更新同步状态失败: {e}")))?;
Ok(())
}
/// 查找 Gemini 模型定价
fn find_gemini_pricing(conn: &rusqlite::Connection, model_id: &str) -> Option<ModelPricing> {
// 精确匹配
if let Some(pricing) = try_find_pricing(conn, model_id) {
return Some(pricing);
}
// LIKE 模糊匹配(兜底)
let pattern = format!("{model_id}%");
conn.query_row(
"SELECT input_cost_per_million, output_cost_per_million,
cache_read_cost_per_million, cache_creation_cost_per_million
FROM model_pricing WHERE model_id LIKE ?1 LIMIT 1",
rusqlite::params![pattern],
|row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, String>(2)?,
row.get::<_, String>(3)?,
))
},
)
.ok()
.and_then(|(i, o, cr, cc)| ModelPricing::from_strings(&i, &o, &cr, &cc).ok())
}
/// 精确匹配定价查询
fn try_find_pricing(conn: &rusqlite::Connection, model_id: &str) -> Option<ModelPricing> {
conn.query_row(
"SELECT input_cost_per_million, output_cost_per_million,
cache_read_cost_per_million, cache_creation_cost_per_million
FROM model_pricing WHERE model_id = ?1",
rusqlite::params![model_id],
|row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, String>(2)?,
row.get::<_, String>(3)?,
))
},
)
.ok()
.and_then(|(i, o, cr, cc)| ModelPricing::from_strings(&i, &o, &cr, &cc).ok())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_collect_gemini_session_files_nonexistent() {
let files = collect_gemini_session_files(Path::new("/nonexistent/path"));
assert!(files.is_empty());
}
#[test]
fn test_parse_gemini_tokens() {
let json: serde_json::Value = serde_json::json!({
"input": 8522,
"output": 29,
"cached": 3138,
"thoughts": 405,
"tool": 0,
"total": 8956
});
let tokens = parse_gemini_tokens(&json);
assert_eq!(tokens.input, 8522);
assert_eq!(tokens.output, 29);
assert_eq!(tokens.cached, 3138);
assert_eq!(tokens.thoughts, 405);
// output + thoughts = 29 + 405 = 434(用于计费)
assert_eq!(tokens.output + tokens.thoughts, 434);
}
#[test]
fn test_parse_gemini_tokens_missing_fields() {
// 缺少某些字段时应返回 0
let json: serde_json::Value = serde_json::json!({
"input": 100,
"output": 50
});
let tokens = parse_gemini_tokens(&json);
assert_eq!(tokens.input, 100);
assert_eq!(tokens.output, 50);
assert_eq!(tokens.cached, 0);
assert_eq!(tokens.thoughts, 0);
}
#[test]
fn test_parse_gemini_tokens_all_zero() {
let json: serde_json::Value = serde_json::json!({
"input": 0,
"output": 0,
"cached": 0,
"thoughts": 0,
"tool": 0,
"total": 0
});
let tokens = parse_gemini_tokens(&json);
assert_eq!(tokens.input, 0);
assert_eq!(tokens.output, 0);
// 全零会被 sync 逻辑跳过
}
}
+1
View File
@@ -16,6 +16,7 @@ const DATA_SOURCE_ICONS: Record<string, React.ReactNode> = {
session_log: <FileText className="h-3.5 w-3.5" />,
codex_db: <Database className="h-3.5 w-3.5" />,
codex_session: <FileText className="h-3.5 w-3.5" />,
gemini_session: <FileText className="h-3.5 w-3.5" />,
};
export function DataSourceBar({ refreshIntervalMs }: DataSourceBarProps) {
+2 -1
View File
@@ -1024,7 +1024,8 @@
"proxy": "Proxy",
"session_log": "Session Log",
"codex_db": "Codex DB",
"codex_session": "Codex Session"
"codex_session": "Codex Session",
"gemini_session": "Gemini Session"
},
"sessionSync": {
"trigger": "Sync session logs",
+2 -1
View File
@@ -1024,7 +1024,8 @@
"proxy": "プロキシ",
"session_log": "セッションログ",
"codex_db": "Codex DB",
"codex_session": "Codex セッション"
"codex_session": "Codex セッション",
"gemini_session": "Gemini セッション"
},
"sessionSync": {
"trigger": "セッションログを同期",
+2 -1
View File
@@ -1024,7 +1024,8 @@
"proxy": "代理",
"session_log": "会话日志",
"codex_db": "Codex 数据库",
"codex_session": "Codex 会话日志"
"codex_session": "Codex 会话日志",
"gemini_session": "Gemini 会话日志"
},
"sessionSync": {
"trigger": "同步会话日志",