feat(usage): add Hermes Agent tracking + fix zero-cost bug + perf

Hermes:
- Parse ~/.hermes/state.db sessions (incl. profiles/*/state.db) into
  proxy_request_logs with data_source='hermes_session', WAL-aware
  incremental sync, Hermes-reported cost preferred over model_pricing
  fallback

Zero-cost bug (dashboard showed $0 totals):
- GPT-5.5 family default pricing (~83% of affected rows used GPT-5.5)
- find_model_pricing_row: ASCII-lowercase normalization so
  "OpenAI/GPT-5.5@HIGH" matches seeded "gpt-5.5"
- Startup cost backfill in async task: scan rows where total_cost <= 0
  but tokens > 0, recompute via model_pricing in a single transaction

Performance:
- Add (app_type, created_at DESC) covering index for dashboard range
  queries
- Add expression index on COALESCE(data_source, 'proxy') so dedup EXISTS
  subqueries use index lookup instead of full scan; drop superseded
  idx_request_logs_dedup_lookup

Refactor:
- row_to_request_log_detail helper (3-way de-dup; fixes cost_multiplier
  "1" vs "1.0" drift between callers)
- Promote get_sync_state/update_sync_state to shared session_usage
  module (4 copies -> 1)
- run_step helper in lib.rs replaces 9 if-let-Err blocks
- maybe_backfill_log_costs returns bool to skip duplicate total_cost
  parsing in caller
This commit is contained in:
Jason
2026-04-29 22:38:29 +08:00
parent 2ee7cb4101
commit f061b777b7
16 changed files with 1248 additions and 203 deletions
+13
View File
@@ -220,6 +220,19 @@ pub fn sync_session_usage(
}
}
// 同步 Hermes 使用数据
match crate::services::session_usage_hermes::sync_hermes_usage(&state.db) {
Ok(hermes_result) => {
result.imported += hermes_result.imported;
result.skipped += hermes_result.skipped;
result.files_scanned += hermes_result.files_scanned;
result.errors.extend(hermes_result.errors);
}
Err(e) => {
result.errors.push(format!("Hermes 同步失败: {e}"));
}
}
Ok(result)
}
+32 -7
View File
@@ -214,7 +214,7 @@ impl Database {
[],
)
.map_err(|e| AppError::Database(e.to_string()))?;
Self::create_request_logs_dedup_index_if_supported(conn)?;
Self::create_request_logs_usage_indexes_if_supported(conn)?;
// 11. Model Pricing 表
conn.execute(
@@ -1108,7 +1108,7 @@ impl Database {
"data_source",
"TEXT NOT NULL DEFAULT 'proxy'",
)?;
Self::create_request_logs_dedup_index_if_supported(conn)?;
Self::create_request_logs_usage_indexes_if_supported(conn)?;
}
// 2. 创建会话日志同步状态表
@@ -1298,6 +1298,13 @@ impl Database {
"0.30",
"3.75",
),
// GPT-5.5 系列
("gpt-5.5", "GPT-5.5", "5", "30", "0.50", "0"),
("gpt-5.5-low", "GPT-5.5", "5", "30", "0.50", "0"),
("gpt-5.5-medium", "GPT-5.5", "5", "30", "0.50", "0"),
("gpt-5.5-high", "GPT-5.5", "5", "30", "0.50", "0"),
("gpt-5.5-xhigh", "GPT-5.5", "5", "30", "0.50", "0"),
("gpt-5.5-minimal", "GPT-5.5", "5", "30", "0.50", "0"),
// GPT-5.4 系列
("gpt-5.4", "GPT-5.4", "2.50", "15", "0.25", "0"),
("gpt-5.4-mini", "GPT-5.4 Mini", "0.75", "4.50", "0.075", "0"),
@@ -1910,11 +1917,22 @@ impl Database {
Ok(())
}
fn create_request_logs_dedup_index_if_supported(conn: &Connection) -> Result<(), AppError> {
fn create_request_logs_usage_indexes_if_supported(conn: &Connection) -> Result<(), AppError> {
if !Self::table_exists(conn, "proxy_request_logs")? {
return Ok(());
}
let has_app_type = Self::has_column(conn, "proxy_request_logs", "app_type")?;
let has_created_at = Self::has_column(conn, "proxy_request_logs", "created_at")?;
if has_app_type && has_created_at {
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_request_logs_app_created_at
ON proxy_request_logs(app_type, created_at DESC)",
[],
)
.map_err(|e| AppError::Database(format!("创建使用量应用时间索引失败: {e}")))?;
}
let required_columns = [
"app_type",
"data_source",
@@ -1930,13 +1948,20 @@ impl Database {
}
}
conn.execute("DROP INDEX IF EXISTS idx_request_logs_dedup_lookup", [])
.map_err(|e| AppError::Database(format!("删除旧使用量去重索引失败: {e}")))?;
// 查询层为了兼容历史 NULL data_source 行,会使用
// COALESCE(data_source, 'proxy')。普通 data_source 索引无法匹配该表达式,
// 会让跨源去重子查询退化成大量扫描;表达式索引让 SQLite 能按同一表达式查找。
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_request_logs_dedup_lookup
ON proxy_request_logs(app_type, data_source, input_tokens, output_tokens,
cache_read_tokens, created_at, cache_creation_tokens)",
"CREATE INDEX IF NOT EXISTS idx_request_logs_dedup_lookup_expr
ON proxy_request_logs(app_type, COALESCE(data_source, 'proxy'), input_tokens,
output_tokens, cache_read_tokens, created_at,
cache_creation_tokens)",
[],
)
.map_err(|e| AppError::Database(format!("创建使用量去重索引失败: {e}")))?;
.map_err(|e| AppError::Database(format!("创建使用量去重表达式索引失败: {e}")))?;
Ok(())
}
+44 -42
View File
@@ -924,28 +924,35 @@ pub fn run() {
tauri::async_runtime::spawn(async move {
const SESSION_SYNC_INTERVAL_SECS: u64 = 60;
// Reports a failed background step as a warning tagged with `name`.
// A successful result is simply dropped; nothing is returned to the caller.
fn run_step<T>(name: &str, result: Result<T, crate::error::AppError>) {
    match result {
        Ok(_) => {}
        Err(e) => log::warn!("{name} failed: {e}"),
    }
}
let db = &db_for_session_sync;
// 首次同步
if let Err(e) =
crate::services::session_usage::sync_claude_session_logs(
&db_for_session_sync,
)
{
log::warn!("Session usage initial sync failed: {e}");
}
if let Err(e) =
crate::services::session_usage_codex::sync_codex_usage(
&db_for_session_sync,
)
{
log::warn!("Codex usage initial sync failed: {e}");
}
if let Err(e) =
crate::services::session_usage_gemini::sync_gemini_usage(
&db_for_session_sync,
)
{
log::warn!("Gemini usage initial sync failed: {e}");
}
run_step(
"Usage cost startup backfill",
db.backfill_missing_usage_costs(),
);
run_step(
"Session usage initial sync",
crate::services::session_usage::sync_claude_session_logs(db),
);
run_step(
"Codex usage initial sync",
crate::services::session_usage_codex::sync_codex_usage(db),
);
run_step(
"Gemini usage initial sync",
crate::services::session_usage_gemini::sync_gemini_usage(db),
);
run_step(
"Hermes usage initial sync",
crate::services::session_usage_hermes::sync_hermes_usage(db),
);
// 定期同步
let mut interval = tokio::time::interval(std::time::Duration::from_secs(
@@ -954,27 +961,22 @@ pub fn run() {
interval.tick().await; // skip immediate first tick
loop {
interval.tick().await;
if let Err(e) =
crate::services::session_usage::sync_claude_session_logs(
&db_for_session_sync,
)
{
log::warn!("Session usage periodic sync failed: {e}");
}
if let Err(e) =
crate::services::session_usage_codex::sync_codex_usage(
&db_for_session_sync,
)
{
log::warn!("Codex usage periodic sync failed: {e}");
}
if let Err(e) =
crate::services::session_usage_gemini::sync_gemini_usage(
&db_for_session_sync,
)
{
log::warn!("Gemini usage periodic sync failed: {e}");
}
run_step(
"Session usage periodic sync",
crate::services::session_usage::sync_claude_session_logs(db),
);
run_step(
"Codex usage periodic sync",
crate::services::session_usage_codex::sync_codex_usage(db),
);
run_step(
"Gemini usage periodic sync",
crate::services::session_usage_gemini::sync_gemini_usage(db),
);
run_step(
"Hermes usage periodic sync",
crate::services::session_usage_hermes::sync_hermes_usage(db),
);
}
});
});
+1
View File
@@ -12,6 +12,7 @@ pub mod proxy;
pub mod session_usage;
pub mod session_usage_codex;
pub mod session_usage_gemini;
pub mod session_usage_hermes;
pub mod skill;
pub mod speedtest;
pub mod stream_check;
+8 -4
View File
@@ -308,8 +308,10 @@ fn sync_single_file(db: &Database, file_path: &Path) -> Result<(u32, u32), AppEr
Ok((imported, skipped))
}
/// 获取文件的同步状态
fn get_sync_state(db: &Database, file_path: &str) -> Result<(i64, i64), AppError> {
/// 获取 session_log_sync 表中某条目的同步进度。
///
/// Shared by all session_usage_* parsers.
pub(crate) fn get_sync_state(db: &Database, file_path: &str) -> Result<(i64, i64), AppError> {
let conn = lock_conn!(db.conn);
let result = conn.query_row(
"SELECT last_modified, last_line_offset FROM session_log_sync WHERE file_path = ?1",
@@ -319,8 +321,10 @@ fn get_sync_state(db: &Database, file_path: &str) -> Result<(i64, i64), AppError
Ok(result.unwrap_or((0, 0)))
}
/// 更新文件的同步状态
fn update_sync_state(
/// 更新 session_log_sync 表中某条目的同步进度。
///
/// Shared by all session_usage_* parsers.
pub(crate) fn update_sync_state(
db: &Database,
file_path: &str,
last_modified: i64,
+1 -34
View File
@@ -18,7 +18,7 @@ use crate::database::{lock_conn, Database};
use crate::error::AppError;
use crate::proxy::usage::calculator::{CostCalculator, ModelPricing};
use crate::proxy::usage::parser::TokenUsage;
use crate::services::session_usage::SessionSyncResult;
use crate::services::session_usage::{get_sync_state, update_sync_state, SessionSyncResult};
use crate::services::usage_stats::{should_skip_session_insert, DedupKey};
use rust_decimal::Decimal;
use std::fs;
@@ -538,39 +538,6 @@ fn insert_codex_session_entry(
Ok(true)
}
/// Reads the stored sync state for `file_path` from `session_log_sync`.
///
/// Returns `(last_modified, last_line_offset)`. Any error from the query
/// itself (including "no matching row") is mapped to `(0, 0)`, so a missing
/// entry and a failed read are indistinguishable to the caller.
fn get_sync_state(db: &Database, file_path: &str) -> Result<(i64, i64), AppError> {
let conn = lock_conn!(db.conn);
let result = conn.query_row(
"SELECT last_modified, last_line_offset FROM session_log_sync WHERE file_path = ?1",
rusqlite::params![file_path],
|row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)),
);
// Fall back to the "never synced" state instead of surfacing the query error.
Ok(result.unwrap_or((0, 0)))
}
/// Stores the sync state for `file_path` in `session_log_sync`.
///
/// Writes `last_modified` and `last_offset` together with the current time
/// (seconds since the Unix epoch, or 0 if the clock reads before the epoch)
/// as `last_synced_at`. An existing row for the same `file_path` is replaced.
///
/// # Errors
/// Returns `AppError::Database` when the write fails.
fn update_sync_state(
db: &Database,
file_path: &str,
last_modified: i64,
last_offset: i64,
) -> Result<(), AppError> {
// Timestamp of this sync run, in whole seconds.
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0);
let conn = lock_conn!(db.conn);
conn.execute(
"INSERT OR REPLACE INTO session_log_sync (file_path, last_modified, last_line_offset, last_synced_at)
VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![file_path, last_modified, last_offset, now],
)
.map_err(|e| AppError::Database(format!("更新同步状态失败: {e}")))?;
Ok(())
}
/// 找 Codex 模型定价(带归一化)
fn find_codex_pricing(conn: &rusqlite::Connection, model_id: &str) -> Option<ModelPricing> {
let normalized = normalize_codex_model(model_id);
+1 -34
View File
@@ -18,7 +18,7 @@ use crate::error::AppError;
use crate::gemini_config::get_gemini_dir;
use crate::proxy::usage::calculator::{CostCalculator, ModelPricing};
use crate::proxy::usage::parser::TokenUsage;
use crate::services::session_usage::SessionSyncResult;
use crate::services::session_usage::{get_sync_state, update_sync_state, SessionSyncResult};
use crate::services::usage_stats::{should_skip_session_insert, DedupKey};
use rust_decimal::Decimal;
use std::fs;
@@ -356,39 +356,6 @@ fn insert_gemini_session_entry(
Ok(conn.changes() > 0)
}
/// Reads the stored sync state for `file_path` from `session_log_sync`.
///
/// Returns `(last_modified, last_line_offset)`. Any error from the query
/// itself (including "no matching row") is mapped to `(0, 0)`, so a missing
/// entry and a failed read are indistinguishable to the caller.
fn get_sync_state(db: &Database, file_path: &str) -> Result<(i64, i64), AppError> {
let conn = lock_conn!(db.conn);
let result = conn.query_row(
"SELECT last_modified, last_line_offset FROM session_log_sync WHERE file_path = ?1",
rusqlite::params![file_path],
|row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)),
);
// Fall back to the "never synced" state instead of surfacing the query error.
Ok(result.unwrap_or((0, 0)))
}
/// Stores the sync state for `file_path` in `session_log_sync`.
///
/// Writes `last_modified` and `last_offset` together with the current time
/// (seconds since the Unix epoch, or 0 if the clock reads before the epoch)
/// as `last_synced_at`. An existing row for the same `file_path` is replaced.
///
/// # Errors
/// Returns `AppError::Database` when the write fails.
fn update_sync_state(
db: &Database,
file_path: &str,
last_modified: i64,
last_offset: i64,
) -> Result<(), AppError> {
// Timestamp of this sync run, in whole seconds.
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0);
let conn = lock_conn!(db.conn);
conn.execute(
"INSERT OR REPLACE INTO session_log_sync (file_path, last_modified, last_line_offset, last_synced_at)
VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![file_path, last_modified, last_offset, now],
)
.map_err(|e| AppError::Database(format!("更新同步状态失败: {e}")))?;
Ok(())
}
/// 查找 Gemini 模型定价
fn find_gemini_pricing(conn: &rusqlite::Connection, model_id: &str) -> Option<ModelPricing> {
// 精确匹配
@@ -0,0 +1,892 @@
//! Hermes Agent session usage tracking.
//!
//! Reads token and cost totals from Hermes' local SQLite state database and
//! imports one usage row per session into `proxy_request_logs`.
use crate::database::{lock_conn, Database};
use crate::error::AppError;
use crate::hermes_config::get_hermes_dir;
use crate::proxy::usage::calculator::{CostCalculator, ModelPricing};
use crate::proxy::usage::parser::TokenUsage;
use crate::services::session_usage::{get_sync_state, update_sync_state, SessionSyncResult};
use crate::services::usage_stats::{has_matching_proxy_usage_log, DedupKey};
use rusqlite::{Connection, OpenFlags};
use rust_decimal::Decimal;
use std::collections::HashSet;
use std::fs;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
// Value written to `proxy_request_logs.app_type` for Hermes rows.
const HERMES_APP_TYPE: &str = "hermes";
// Value written to `proxy_request_logs.provider_id` for Hermes rows.
const HERMES_PROVIDER_ID: &str = "_hermes_session";
// Value written to both `data_source` and `provider_type` for Hermes rows.
const HERMES_DATA_SOURCE: &str = "hermes_session";
// Prefix of the `session_log_sync.file_path` key used for Hermes databases.
const HERMES_SYNC_PREFIX: &str = "sqlite:";
// Amount (ms) subtracted from the stored watermark before each load, so
// sessions slightly older than the previous maximum are re-read.
const WATERMARK_OVERLAP_MS: i64 = 300_000;
// Sessions with `ended_at IS NULL` are still loaded when their timestamp is at
// most this far (ms, 24 hours) behind the load threshold.
const ACTIVE_SESSION_LOOKBACK_MS: i64 = 24 * 60 * 60 * 1000;
/// One row of the Hermes `sessions` table, reduced to the fields needed for a
/// `proxy_request_logs` entry.
#[derive(Debug)]
struct HermesSessionUsage {
// `sessions.id`, cast to text by the loading query.
session_id: String,
// Model name; "unknown" when the column is missing, NULL, or blank.
model: String,
// Session timestamp in milliseconds, derived from `ended_at`/`started_at`.
timestamp_ms: i64,
input_tokens: u32,
// Sum of the `output_tokens` and `reasoning_tokens` columns.
output_tokens: u32,
cache_read_tokens: u32,
// Filled from the Hermes `cache_write_tokens` column.
cache_creation_tokens: u32,
// Hermes-reported cost as text: `actual_cost_usd` when non-blank, otherwise
// `estimated_cost_usd`; `None` when neither is available.
total_cost_usd: Option<String>,
}
/// Imports usage from every Hermes state database under the Hermes directory.
///
/// Each database (the root `state.db` plus one per profile) is synced
/// independently; a failure in one is logged, recorded in `errors`, and does
/// not stop the remaining databases from being processed.
///
/// Returns the aggregated import/skip counts, the number of databases found,
/// and the collected error messages. The function itself always returns `Ok`.
pub fn sync_hermes_usage(db: &Database) -> Result<SessionSyncResult, AppError> {
    let db_paths = collect_hermes_db_paths(&get_hermes_dir());

    let mut summary = SessionSyncResult {
        imported: 0,
        skipped: 0,
        files_scanned: db_paths.len() as u32,
        errors: Vec::new(),
    };

    for path in db_paths.iter() {
        let (imported, skipped) = match sync_single_hermes_db(db, path) {
            Ok(counts) => counts,
            Err(e) => {
                let msg = format!("Hermes 数据库同步失败 {}: {e}", path.display());
                log::warn!("[HERMES-SYNC] {msg}");
                summary.errors.push(msg);
                continue;
            }
        };
        summary.imported += imported;
        summary.skipped += skipped;
    }

    // Only report runs that actually imported something, to keep the log quiet.
    if summary.imported > 0 {
        log::info!(
            "[HERMES-SYNC] 同步完成: 导入 {} 条, 跳过 {} 条, 扫描 {} 个数据库",
            summary.imported,
            summary.skipped,
            summary.files_scanned
        );
    }

    Ok(summary)
}
/// Lists the Hermes state databases that exist under `hermes_dir`.
///
/// The root `state.db` comes first (when it is a regular file), followed by
/// `profiles/<name>/state.db` for every profile directory entry that contains
/// one. An unreadable or missing `profiles` directory contributes nothing.
fn collect_hermes_db_paths(hermes_dir: &Path) -> Vec<PathBuf> {
    let root_db = Some(hermes_dir.join("state.db")).filter(|candidate| candidate.is_file());

    let profile_dbs = fs::read_dir(hermes_dir.join("profiles"))
        .into_iter()
        // First flatten: skip the directory when it cannot be read.
        .flatten()
        // Second flatten: skip individual entries that fail to read.
        .flatten()
        .map(|entry| entry.path().join("state.db"))
        .filter(|candidate| candidate.is_file());

    root_db.into_iter().chain(profile_dbs).collect()
}
/// Syncs one Hermes `state.db` into `proxy_request_logs`.
///
/// The database is skipped when neither it nor its WAL/SHM sidecars changed
/// since the last recorded sync. Otherwise it is opened read-only and every
/// session newer than the stored watermark (minus `WATERMARK_OVERLAP_MS`) is
/// upserted.
///
/// Returns `(imported, skipped)`. A session whose insert fails is logged and
/// counted as skipped.
///
/// When any insert fails, the stored sync state is deliberately held back:
/// the modification time is left unchanged and the watermark stops before the
/// first failed session. The next run therefore retries the failed sessions
/// instead of stepping over them; the upsert makes re-processing the already
/// imported ones harmless.
///
/// # Errors
/// Returns `AppError` when the sync state cannot be read or written, the
/// Hermes database cannot be opened, or its schema is unusable.
fn sync_single_hermes_db(db: &Database, db_path: &Path) -> Result<(u32, u32), AppError> {
    let sync_key = hermes_sync_key(db_path);
    // Captured before reading so that writes racing with this sync are picked
    // up by the next run rather than being masked by a newer stored mtime.
    let file_modified = sqlite_db_modified_ms(db_path);
    let (last_modified, last_watermark) = get_sync_state(db, &sync_key)?;
    if file_modified <= last_modified {
        return Ok((0, 0));
    }

    let conn = Connection::open_with_flags(
        db_path,
        OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
    )
    .map_err(|e| AppError::Database(format!("打开 Hermes 数据库失败: {e}")))?;
    ensure_sessions_table(&conn)?;
    let columns = get_table_columns(&conn, "sessions")?;
    validate_hermes_columns(&columns)?;

    let threshold = last_watermark.saturating_sub(WATERMARK_OVERLAP_MS);
    let sessions = load_hermes_sessions(&conn, &columns, threshold)?;

    let mut imported = 0;
    let mut skipped = 0;
    let mut max_watermark = last_watermark;
    let mut had_failure = false;
    // Sessions arrive ordered by ascending timestamp, so once one insert fails
    // the watermark must not move any further.
    for session in &sessions {
        match insert_hermes_session_entry(db, session) {
            Ok(true) => imported += 1,
            Ok(false) => skipped += 1,
            Err(e) => {
                log::warn!("[HERMES-SYNC] 插入失败 ({}): {e}", session.session_id);
                skipped += 1;
                had_failure = true;
            }
        }
        if !had_failure {
            max_watermark = max_watermark.max(session.timestamp_ms);
        }
    }

    // Keep the old mtime after a failure so the early-exit check above does
    // not suppress the retry on the next run.
    let recorded_modified = if had_failure {
        last_modified
    } else {
        file_modified
    };
    update_sync_state(db, &sync_key, recorded_modified, max_watermark)?;
    Ok((imported, skipped))
}
/// Builds the `session_log_sync` key for a Hermes database.
///
/// The key is `HERMES_SYNC_PREFIX` followed by the canonical path; when the
/// path cannot be canonicalized it is used as given.
fn hermes_sync_key(db_path: &Path) -> String {
    let resolved = match db_path.canonicalize() {
        Ok(real_path) => real_path,
        Err(_) => db_path.to_path_buf(),
    };
    format!("{HERMES_SYNC_PREFIX}{}", resolved.display())
}
/// Returns the newest modification time (ms since the Unix epoch) among the
/// database file and its `-wal` / `-shm` sidecars.
///
/// Files that are missing or whose metadata cannot be read are ignored; the
/// result is 0 when no timestamp could be obtained at all.
fn sqlite_db_modified_ms(db_path: &Path) -> i64 {
    let candidates = [
        db_path.to_path_buf(),
        sqlite_sidecar_path(db_path, "-wal"),
        sqlite_sidecar_path(db_path, "-shm"),
    ];

    let mut newest: Option<i64> = None;
    for candidate in candidates.iter() {
        let stamp = fs::metadata(candidate)
            .ok()
            .and_then(|metadata| metadata.modified().ok())
            .and_then(system_time_to_ms);
        if let Some(ms) = stamp {
            newest = Some(newest.map_or(ms, |best| best.max(ms)));
        }
    }
    newest.unwrap_or(0)
}
/// Appends `suffix` verbatim to the full path of `db_path`
/// (e.g. `state.db` + `-wal` -> `state.db-wal`).
fn sqlite_sidecar_path(db_path: &Path, suffix: &str) -> PathBuf {
    let mut sidecar = db_path.to_path_buf().into_os_string();
    sidecar.push(suffix);
    sidecar.into()
}
/// Converts `time` to whole milliseconds since the Unix epoch.
///
/// Returns `None` when `time` lies before the epoch or when the millisecond
/// count does not fit in an `i64` (a checked conversion, so an out-of-range
/// value can never wrap into a bogus timestamp).
fn system_time_to_ms(time: SystemTime) -> Option<i64> {
    let since_epoch = time.duration_since(SystemTime::UNIX_EPOCH).ok()?;
    i64::try_from(since_epoch.as_millis()).ok()
}
/// Verifies that the Hermes database contains a `sessions` table.
///
/// # Errors
/// Returns `AppError::Database` when the table is absent. A failure of the
/// lookup query itself is reported with its own message instead of being
/// misreported as a missing table.
fn ensure_sessions_table(conn: &Connection) -> Result<(), AppError> {
    let has_sessions = conn
        .query_row(
            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='sessions'",
            [],
            |row| row.get::<_, bool>(0),
        )
        .map_err(|e| AppError::Database(format!("检查 Hermes sessions 表失败: {e}")))?;

    if has_sessions {
        Ok(())
    } else {
        Err(AppError::Database(
            "Hermes 数据库缺少 sessions 表".to_string(),
        ))
    }
}
/// Returns the set of column names of `table`, as reported by
/// `PRAGMA table_info`.
///
/// `table` is interpolated into the statement text, so it must be a trusted
/// identifier.
///
/// # Errors
/// Returns `AppError::Database` when the pragma cannot be prepared, executed,
/// or one of its rows cannot be read.
fn get_table_columns(conn: &Connection, table: &str) -> Result<HashSet<String>, AppError> {
    let pragma = format!("PRAGMA table_info({table})");
    let mut stmt = conn
        .prepare(&pragma)
        .map_err(|e| AppError::Database(format!("读取 Hermes 表结构失败: {e}")))?;

    // Column 1 of each `table_info` row holds the column name.
    let names = stmt
        .query_map([], |row| row.get::<_, String>(1))
        .map_err(|e| AppError::Database(format!("读取 Hermes 字段失败: {e}")))?;

    let columns = names
        .collect::<Result<HashSet<String>, _>>()
        .map_err(|e| AppError::Database(e.to_string()))?;
    Ok(columns)
}
/// Checks that the `sessions` table has the minimum columns needed for import:
/// `id`, plus at least one of `started_at` / `ended_at`.
///
/// # Errors
/// Returns `AppError::Database` naming the first missing requirement.
fn validate_hermes_columns(columns: &HashSet<String>) -> Result<(), AppError> {
    let has = |name: &str| columns.contains(name);

    let problem = if !has("id") {
        Some("Hermes sessions 表缺少 id 字段")
    } else if !(has("started_at") || has("ended_at")) {
        Some("Hermes sessions 表缺少 started_at/ended_at 字段")
    } else {
        None
    };

    match problem {
        Some(message) => Err(AppError::Database(message.to_string())),
        None => Ok(()),
    }
}
/// Loads Hermes sessions whose timestamp (in ms) is greater than `threshold_ms`.
///
/// The query is assembled from `columns` so that optional columns missing from
/// the Hermes schema are replaced by neutral defaults. The session timestamp
/// is taken from `ended_at` / `started_at` (see `timestamp_expr`) and
/// multiplied by 1000, i.e. the stored values are treated as seconds.
///
/// When both timestamp columns exist, sessions with `ended_at IS NULL` are
/// additionally loaded as long as their timestamp is newer than
/// `threshold_ms - ACTIVE_SESSION_LOOKBACK_MS`.
///
/// Rows are returned in ascending timestamp order. Rows with a blank id or a
/// non-positive timestamp are dropped.
///
/// # Errors
/// Returns `AppError::Database` when the statement cannot be prepared or
/// executed, or when a row fails to decode.
fn load_hermes_sessions(
conn: &Connection,
columns: &HashSet<String>,
threshold_ms: i64,
) -> Result<Vec<HermesSessionUsage>, AppError> {
let timestamp_expr = timestamp_expr(columns);
// Seconds (possibly fractional) -> integer milliseconds.
let timestamp_ms_expr =
format!("CAST(ROUND(CAST(({timestamp_expr}) AS REAL) * 1000.0) AS INTEGER)");
// Extra predicate for sessions that have not ended yet; only expressible
// when both timestamp columns are present.
let active_session_clause = if columns.contains("ended_at") && columns.contains("started_at") {
format!(
" OR (ended_at IS NULL AND {timestamp_ms_expr} > ?1 - {ACTIVE_SESSION_LOOKBACK_MS})"
)
} else {
String::new()
};
// Result column order (used by index below):
// 0 id, 1 model, 2 timestamp_ms, 3 input, 4 output, 5 reasoning,
// 6 cache_read, 7 cache_write, 8 actual_cost, 9 estimated_cost.
let sql = format!(
"SELECT
CAST(id AS TEXT) AS id,
{model_expr} AS model,
{timestamp_ms_expr} AS timestamp_ms,
{input_expr} AS input_tokens,
{output_expr} AS output_tokens,
{reasoning_expr} AS reasoning_tokens,
{cache_read_expr} AS cache_read_tokens,
{cache_write_expr} AS cache_write_tokens,
{actual_cost_expr} AS actual_cost_usd,
{estimated_cost_expr} AS estimated_cost_usd
FROM sessions
WHERE ({timestamp_ms_expr} > ?1{active_session_clause})
ORDER BY {timestamp_ms_expr} ASC",
model_expr = text_expr(columns, "model", "unknown"),
input_expr = int_expr(columns, "input_tokens"),
output_expr = int_expr(columns, "output_tokens"),
reasoning_expr = int_expr(columns, "reasoning_tokens"),
cache_read_expr = int_expr(columns, "cache_read_tokens"),
cache_write_expr = int_expr(columns, "cache_write_tokens"),
actual_cost_expr = cost_expr(columns, "actual_cost_usd"),
estimated_cost_expr = cost_expr(columns, "estimated_cost_usd"),
);
let mut stmt = conn
.prepare(&sql)
.map_err(|e| AppError::Database(format!("准备 Hermes 查询失败: {e}")))?;
let rows = stmt
.query_map([threshold_ms], |row| {
// Reasoning tokens are folded into the output token count.
let output_tokens = get_i64(row, 4).saturating_add(get_i64(row, 5));
let actual_cost: Option<String> = row.get(8)?;
let estimated_cost: Option<String> = row.get(9)?;
Ok(HermesSessionUsage {
session_id: row.get(0)?,
model: row
.get::<_, Option<String>>(1)?
.unwrap_or_else(|| "unknown".to_string()),
timestamp_ms: get_i64(row, 2),
input_tokens: to_u32(get_i64(row, 3)),
output_tokens: to_u32(output_tokens),
cache_read_tokens: to_u32(get_i64(row, 6)),
cache_creation_tokens: to_u32(get_i64(row, 7)),
// Prefer the actual cost; fall back to the estimate. Blank strings
// count as absent.
total_cost_usd: actual_cost
.filter(|s| !s.trim().is_empty())
.or_else(|| estimated_cost.filter(|s| !s.trim().is_empty())),
})
})
.map_err(|e| AppError::Database(format!("查询 Hermes sessions 失败: {e}")))?;
let mut sessions = Vec::new();
for row in rows {
let session = row.map_err(|e| AppError::Database(e.to_string()))?;
// Skip rows that cannot be keyed or placed on the timeline.
if session.session_id.trim().is_empty() || session.timestamp_ms <= 0 {
continue;
}
sessions.push(session);
}
Ok(sessions)
}
/// Picks the SQL expression used as a session's timestamp.
///
/// Prefers `ended_at` with `started_at` as fallback when both columns exist,
/// uses whichever single column is present otherwise, and yields `NULL` when
/// neither exists.
fn timestamp_expr(columns: &HashSet<String>) -> String {
    let has_ended = columns.contains("ended_at");
    let has_started = columns.contains("started_at");

    let expr = if has_ended && has_started {
        "COALESCE(ended_at, started_at)"
    } else if has_ended {
        "ended_at"
    } else if has_started {
        "started_at"
    } else {
        "NULL"
    };
    expr.to_string()
}
/// Builds a SQL expression that reads `column` as trimmed text, substituting
/// the quoted `default` when the value is NULL or blank — or a bare quoted
/// `default` when the column does not exist.
///
/// `column` and `default` are interpolated as-is and must be trusted values.
fn text_expr(columns: &HashSet<String>, column: &str, default: &str) -> String {
    if !columns.contains(column) {
        return format!("'{default}'");
    }
    format!("COALESCE(NULLIF(TRIM(CAST({column} AS TEXT)), ''), '{default}')")
}
/// Builds a SQL expression that reads `column` as an integer with NULL mapped
/// to 0, or the literal `0` when the column does not exist.
fn int_expr(columns: &HashSet<String>, column: &str) -> String {
    if !columns.contains(column) {
        return String::from("0");
    }
    format!("CAST(COALESCE({column}, 0) AS INTEGER)")
}
/// Builds a SQL expression that reads `column` as trimmed text with blank
/// values turned into NULL, or the literal `NULL` when the column does not
/// exist.
fn cost_expr(columns: &HashSet<String>, column: &str) -> String {
    if !columns.contains(column) {
        return String::from("NULL");
    }
    format!("NULLIF(TRIM(CAST({column} AS TEXT)), '')")
}
/// Reads column `index` as an integer, yielding 0 when the value is NULL or
/// cannot be read as an `i64`.
fn get_i64(row: &rusqlite::Row<'_>, index: usize) -> i64 {
    match row.get::<_, Option<i64>>(index) {
        Ok(Some(value)) => value,
        _ => 0,
    }
}
/// Converts `value` to `u32`, saturating at both ends: negative inputs become
/// 0 and inputs above `u32::MAX` become `u32::MAX`.
fn to_u32(value: i64) -> u32 {
    u32::try_from(value.max(0)).unwrap_or(u32::MAX)
}
/// Upserts one Hermes session into `proxy_request_logs`.
///
/// The row is keyed by `request_id = "hermes-session-<session id>"`, so a
/// session that was imported before is updated in place, and only when at
/// least one of its model, token, cost, or timestamp values has changed.
///
/// Returns `Ok(true)` when a row was inserted or updated, and `Ok(false)` when
/// nothing changed or when a matching proxy log already covers this usage.
///
/// # Errors
/// Returns `AppError` when the dedup lookup or the upsert fails.
fn insert_hermes_session_entry(
db: &Database,
session: &HermesSessionUsage,
) -> Result<bool, AppError> {
let conn = lock_conn!(db.conn);
// `created_at` is stored in whole seconds.
let created_at = session.timestamp_ms / 1000;
let request_id = format!("hermes-session-{}", session.session_id);
let dedup_key = DedupKey {
app_type: HERMES_APP_TYPE,
model: &session.model,
input_tokens: session.input_tokens,
output_tokens: session.output_tokens,
cache_read_tokens: session.cache_read_tokens,
cache_creation_tokens: session.cache_creation_tokens,
created_at,
};
// Hermes rows use UPSERT so repeated request_ids can refresh estimated cost
// into actual cost. This check only prevents double-counting when a matching
// proxy log already covers the same usage window.
if has_matching_proxy_usage_log(&conn, &dedup_key)? {
return Ok(false);
}
// A Hermes-reported total is stored as-is with zeroed per-component costs;
// without one, the cost is computed from the local model pricing table.
let (input_cost, output_cost, cache_read_cost, cache_creation_cost, total_cost) =
match &session.total_cost_usd {
Some(total) => (
"0".to_string(),
"0".to_string(),
"0".to_string(),
"0".to_string(),
total.clone(),
),
None => calculate_fallback_cost(&conn, session),
};
// The WHERE clause on the conflict branch makes an unchanged session a
// no-op, which is what lets `conn.changes()` below report "skipped".
conn.execute(
"INSERT INTO proxy_request_logs (
request_id, provider_id, app_type, model, request_model,
input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens,
input_cost_usd, output_cost_usd, cache_read_cost_usd, cache_creation_cost_usd, total_cost_usd,
latency_ms, first_token_ms, status_code, error_message, session_id,
provider_type, is_streaming, cost_multiplier, created_at, data_source
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24)
ON CONFLICT(request_id) DO UPDATE SET
model = excluded.model,
request_model = excluded.request_model,
input_tokens = excluded.input_tokens,
output_tokens = excluded.output_tokens,
cache_read_tokens = excluded.cache_read_tokens,
cache_creation_tokens = excluded.cache_creation_tokens,
input_cost_usd = excluded.input_cost_usd,
output_cost_usd = excluded.output_cost_usd,
cache_read_cost_usd = excluded.cache_read_cost_usd,
cache_creation_cost_usd = excluded.cache_creation_cost_usd,
total_cost_usd = excluded.total_cost_usd,
created_at = excluded.created_at
WHERE model != excluded.model
OR request_model != excluded.request_model
OR input_tokens != excluded.input_tokens
OR output_tokens != excluded.output_tokens
OR cache_read_tokens != excluded.cache_read_tokens
OR cache_creation_tokens != excluded.cache_creation_tokens
OR input_cost_usd != excluded.input_cost_usd
OR output_cost_usd != excluded.output_cost_usd
OR cache_read_cost_usd != excluded.cache_read_cost_usd
OR cache_creation_cost_usd != excluded.cache_creation_cost_usd
OR total_cost_usd != excluded.total_cost_usd
OR created_at != excluded.created_at",
// Parameters are positional and must stay aligned with the column list:
// ?15 latency_ms, ?16 first_token_ms, ?17 status_code, ?18 error_message,
// ?19 session_id, ?20 provider_type, ?21 is_streaming, ?22 cost_multiplier.
rusqlite::params![
request_id,
HERMES_PROVIDER_ID,
HERMES_APP_TYPE,
session.model,
session.model,
session.input_tokens,
session.output_tokens,
session.cache_read_tokens,
session.cache_creation_tokens,
input_cost,
output_cost,
cache_read_cost,
cache_creation_cost,
total_cost,
0i64,
Option::<i64>::None,
200i64,
Option::<String>::None,
session.session_id,
Some(HERMES_DATA_SOURCE),
1i64,
"1.0",
created_at,
HERMES_DATA_SOURCE,
],
)
.map_err(|e| AppError::Database(format!("插入 Hermes 会话用量失败: {e}")))?;
Ok(conn.changes() > 0)
}
/// Computes the cost of a session from the local `model_pricing` table, for
/// sessions where Hermes reported no cost of its own.
///
/// Returns `(input, output, cache_read, cache_creation, total)` as decimal
/// strings, calculated with a multiplier of 1. Every component is `"0"` when
/// no pricing entry matches the session's model.
fn calculate_fallback_cost(
    conn: &Connection,
    session: &HermesSessionUsage,
) -> (String, String, String, String, String) {
    let pricing = match find_hermes_pricing(conn, &session.model) {
        Some(found) => found,
        None => {
            let zero = || "0".to_string();
            return (zero(), zero(), zero(), zero(), zero());
        }
    };

    let usage = TokenUsage {
        input_tokens: session.input_tokens,
        output_tokens: session.output_tokens,
        cache_read_tokens: session.cache_read_tokens,
        cache_creation_tokens: session.cache_creation_tokens,
        model: Some(session.model.clone()),
        message_id: None,
    };
    let cost = CostCalculator::calculate(&usage, &pricing, Decimal::from(1));

    (
        cost.input_cost.to_string(),
        cost.output_cost.to_string(),
        cost.cache_read_cost.to_string(),
        cost.cache_creation_cost.to_string(),
        cost.total_cost.to_string(),
    )
}
/// Looks up pricing for a Hermes model id.
///
/// Every candidate from `pricing_candidates` is first tried as an exact
/// `model_id` match. If none matches, each candidate is tried as a prefix of
/// `model_id`; among several prefix matches the shortest id wins (ties broken
/// alphabetically), so the result does not depend on row order.
///
/// LIKE wildcards (`%`, `_`) inside the model id are escaped so they are
/// matched literally instead of acting as patterns.
///
/// Returns `None` when nothing matches or the stored prices cannot be parsed.
fn find_hermes_pricing(conn: &Connection, model_id: &str) -> Option<ModelPricing> {
    let candidates = pricing_candidates(model_id);

    // Pass 1: exact matches, in candidate priority order.
    if let Some(pricing) = candidates
        .iter()
        .find_map(|candidate| try_find_pricing(conn, candidate))
    {
        return Some(pricing);
    }

    // Pass 2: prefix matches, in candidate priority order.
    candidates.iter().find_map(|candidate| {
        let escaped = candidate
            .replace('\\', "\\\\")
            .replace('%', "\\%")
            .replace('_', "\\_");
        let pattern = format!("{escaped}%");

        conn.query_row(
            "SELECT input_cost_per_million, output_cost_per_million,
                    cache_read_cost_per_million, cache_creation_cost_per_million
             FROM model_pricing
             WHERE model_id LIKE ?1 ESCAPE '\\'
             ORDER BY LENGTH(model_id), model_id
             LIMIT 1",
            rusqlite::params![pattern],
            |row| {
                Ok((
                    row.get::<_, String>(0)?,
                    row.get::<_, String>(1)?,
                    row.get::<_, String>(2)?,
                    row.get::<_, String>(3)?,
                ))
            },
        )
        .ok()
        .and_then(|(input, output, cache_read, cache_creation)| {
            ModelPricing::from_strings(&input, &output, &cache_read, &cache_creation).ok()
        })
    })
}
/// Produces the model ids to try when looking up pricing, in priority order.
///
/// For a non-blank `model_id` the list contains:
/// 1. the trimmed id itself,
/// 2. the part after the last `/` (provider prefixes stripped), if non-empty,
/// 3. ASCII-lowercased forms of the above that are not already listed.
///
/// The lowercase forms let ids such as `OpenAI/GPT-5.5` reach an exact match
/// against lowercase pricing ids (`gpt-5.5`). A blank id yields an empty list.
fn pricing_candidates(model_id: &str) -> Vec<String> {
    let raw = model_id.trim();
    if raw.is_empty() {
        return Vec::new();
    }

    let mut candidates = vec![raw.to_string()];
    if let Some((_, tail)) = raw.rsplit_once('/') {
        if !tail.is_empty() {
            candidates.push(tail.to_string());
        }
    }

    // Append lowercase variants after the verbatim ones so existing exact
    // matches keep their priority.
    let lowered: Vec<String> = candidates
        .iter()
        .map(|candidate| candidate.to_ascii_lowercase())
        .collect();
    for candidate in lowered {
        if !candidates.contains(&candidate) {
            candidates.push(candidate);
        }
    }

    candidates
}
/// Fetches the pricing row whose `model_id` equals `model_id` exactly.
///
/// Returns `None` when no such row exists, the query fails, or the stored
/// price strings cannot be parsed.
fn try_find_pricing(conn: &Connection, model_id: &str) -> Option<ModelPricing> {
    let fetched = conn.query_row(
        "SELECT input_cost_per_million, output_cost_per_million,
cache_read_cost_per_million, cache_creation_cost_per_million
FROM model_pricing WHERE model_id = ?1",
        rusqlite::params![model_id],
        |row| {
            let input: String = row.get(0)?;
            let output: String = row.get(1)?;
            let cache_read: String = row.get(2)?;
            let cache_creation: String = row.get(3)?;
            Ok((input, output, cache_read, cache_creation))
        },
    );

    match fetched {
        Ok((input, output, cache_read, cache_creation)) => {
            ModelPricing::from_strings(&input, &output, &cache_read, &cache_creation).ok()
        }
        Err(_) => None,
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::services::session_usage::get_data_source_breakdown;
use crate::services::usage_stats::LogFilters;
use std::thread;
use std::time::Duration;
use tempfile::tempdir;
fn create_hermes_db(path: &Path) -> Connection {
let conn = Connection::open(path).expect("open hermes db");
conn.execute(
"CREATE TABLE sessions (
id TEXT PRIMARY KEY,
model TEXT,
started_at REAL,
ended_at REAL,
input_tokens INTEGER,
output_tokens INTEGER,
cache_read_tokens INTEGER,
cache_write_tokens INTEGER,
reasoning_tokens INTEGER,
actual_cost_usd REAL,
estimated_cost_usd REAL
)",
[],
)
.expect("create sessions");
conn
}
fn insert_session(
conn: &Connection,
id: &str,
ended_at: f64,
actual_cost: Option<f64>,
estimated_cost: Option<f64>,
) {
conn.execute(
"INSERT INTO sessions (
id, model, started_at, ended_at, input_tokens, output_tokens,
cache_read_tokens, cache_write_tokens, reasoning_tokens,
actual_cost_usd, estimated_cost_usd
) VALUES (?1, 'openrouter/anthropic/claude-opus-4-7', ?2, ?3, 10, 20, 3, 4, 7, ?4, ?5)",
rusqlite::params![id, ended_at - 10.0, ended_at, actual_cost, estimated_cost],
)
.expect("insert session");
}
fn insert_active_session(conn: &Connection, id: &str, started_at: f64) {
conn.execute(
"INSERT INTO sessions (
id, model, started_at, ended_at, input_tokens, output_tokens,
cache_read_tokens, cache_write_tokens, reasoning_tokens,
actual_cost_usd, estimated_cost_usd
) VALUES (?1, 'openrouter/anthropic/claude-opus-4-7', ?2, NULL, 10, 20, 3, 4, 7, 0.1, NULL)",
rusqlite::params![id, started_at],
)
.expect("insert active session");
}
#[test]
fn test_ensure_sessions_table_errors_when_missing() {
let conn = Connection::open_in_memory().expect("open memory db");
let err = ensure_sessions_table(&conn).expect_err("missing sessions should error");
assert!(err.to_string().contains("缺少 sessions 表"));
}
#[test]
fn test_validate_hermes_columns_rejects_missing_id() {
let columns = HashSet::from(["started_at".to_string()]);
let err = validate_hermes_columns(&columns).expect_err("missing id should error");
assert!(err.to_string().contains("缺少 id 字段"));
}
#[test]
fn test_validate_hermes_columns_rejects_missing_timestamp() {
let columns = HashSet::from(["id".to_string()]);
let err = validate_hermes_columns(&columns).expect_err("missing timestamp should error");
assert!(err.to_string().contains("缺少 started_at/ended_at 字段"));
}
#[test]
fn test_sync_hermes_session_imports_tokens_and_actual_cost() -> Result<(), AppError> {
let temp = tempdir().expect("tempdir");
let db_path = temp.path().join("state.db");
let hermes_conn = create_hermes_db(&db_path);
insert_session(&hermes_conn, "s1", 1000.0, Some(0.123456), Some(0.5));
drop(hermes_conn);
let db = Database::memory()?;
let (imported, skipped) = sync_single_hermes_db(&db, &db_path)?;
assert_eq!((imported, skipped), (1, 0));
let conn = lock_conn!(db.conn);
let row: (String, String, i64, i64, i64, String, String) = conn.query_row(
"SELECT app_type, data_source, input_tokens, output_tokens, cache_creation_tokens, total_cost_usd, provider_id
FROM proxy_request_logs WHERE request_id = 'hermes-session-s1'",
[],
|row| {
Ok((
row.get(0)?,
row.get(1)?,
row.get(2)?,
row.get(3)?,
row.get(4)?,
row.get(5)?,
row.get(6)?,
))
},
)?;
assert_eq!(row.0, "hermes");
assert_eq!(row.1, HERMES_DATA_SOURCE);
assert_eq!(row.2, 10);
assert_eq!(row.3, 27);
assert_eq!(row.4, 4);
assert_eq!(row.5, "0.123456");
assert_eq!(row.6, HERMES_PROVIDER_ID);
Ok(())
}
#[test]
fn test_load_hermes_sessions_limits_active_zombie_sessions() -> Result<(), AppError> {
let temp = tempdir().expect("tempdir");
let db_path = temp.path().join("state.db");
let hermes_conn = create_hermes_db(&db_path);
insert_active_session(&hermes_conn, "old-active", 1_000.0);
insert_active_session(&hermes_conn, "recent-active", 150_000.0);
let columns = get_table_columns(&hermes_conn, "sessions")?;
let sessions = load_hermes_sessions(&hermes_conn, &columns, 200_000_000)?;
let ids: Vec<&str> = sessions
.iter()
.map(|session| session.session_id.as_str())
.collect();
assert!(!ids.contains(&"old-active"));
assert!(ids.contains(&"recent-active"));
Ok(())
}
#[test]
fn test_sync_state_is_per_database_and_watermark_incremental() -> Result<(), AppError> {
let temp = tempdir().expect("tempdir");
let db_path = temp.path().join("state.db");
let hermes_conn = create_hermes_db(&db_path);
insert_session(&hermes_conn, "s1", 1000.0, Some(0.1), None);
drop(hermes_conn);
let db = Database::memory()?;
let (imported, _) = sync_single_hermes_db(&db, &db_path)?;
assert_eq!(imported, 1);
let sync_key = hermes_sync_key(&db_path);
{
let conn = lock_conn!(db.conn);
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM session_log_sync WHERE file_path = ?1",
[sync_key.as_str()],
|row| row.get(0),
)?;
assert_eq!(count, 1);
}
thread::sleep(Duration::from_millis(5));
let hermes_conn = Connection::open(&db_path).expect("reopen hermes db");
insert_session(&hermes_conn, "s2", 1100.0, Some(0.2), None);
drop(hermes_conn);
let (imported, skipped) = sync_single_hermes_db(&db, &db_path)?;
assert_eq!(imported, 1);
assert!(skipped >= 1);
let conn = lock_conn!(db.conn);
let count: i64 = conn.query_row("SELECT COUNT(*) FROM session_log_sync", [], |row| {
row.get(0)
})?;
assert_eq!(count, 1);
let max_offset: i64 = conn.query_row(
"SELECT last_line_offset FROM session_log_sync WHERE file_path = ?1",
[sync_key.as_str()],
|row| row.get(0),
)?;
assert_eq!(max_offset, 1_100_000);
Ok(())
}
/// After a `-wal` sidecar is written next to the database file, the reported
/// modification time must not go backwards and must be non-zero.
#[test]
fn test_sqlite_mtime_includes_wal_and_shm_sidecars() {
    let scratch = tempdir().expect("tempdir");
    let main_db = scratch.path().join("state.db");
    fs::write(&main_db, b"db").expect("write db");
    let before_wal = sqlite_db_modified_ms(&main_db);

    // Give the sidecar a later timestamp than the main file.
    thread::sleep(Duration::from_millis(20));
    let wal_file = sqlite_sidecar_path(&main_db, "-wal");
    fs::write(&wal_file, b"wal").expect("write wal");

    let after_wal = sqlite_db_modified_ms(&main_db);
    assert!(after_wal >= before_wal);
    assert!(after_wal > 0);
}
/// When the Hermes session row is inserted without either optional cost value,
/// the imported log must still end up with a total cost other than "0" once a
/// `model_pricing` row for `claude-opus-4-7` exists.
#[test]
fn test_fallback_cost_uses_model_pricing_when_hermes_cost_missing() -> Result<(), AppError> {
    let workdir = tempdir().expect("tempdir");
    let hermes_path = workdir.path().join("state.db");
    {
        // Both optional cost arguments are `None` for this session.
        let seed = create_hermes_db(&hermes_path);
        insert_session(&seed, "s1", 1000.0, None, None);
    }

    let db = Database::memory()?;
    {
        // Scoped so the lock is released before the sync runs.
        let conn = lock_conn!(db.conn);
        conn.execute(
            concat!(
                "INSERT OR REPLACE INTO model_pricing (\n",
                "model_id, display_name, input_cost_per_million, output_cost_per_million,\n",
                "cache_read_cost_per_million, cache_creation_cost_per_million\n",
                ") VALUES ('claude-opus-4-7', 'Claude Opus 4.7', '10', '20', '1', '5')",
            ),
            [],
        )?;
    }

    let (imported, _) = sync_single_hermes_db(&db, &hermes_path)?;
    assert_eq!(imported, 1);

    let conn = lock_conn!(db.conn);
    let stored_total: String = conn.query_row(
        "SELECT total_cost_usd FROM proxy_request_logs WHERE request_id = 'hermes-session-s1'",
        [],
        |row| row.get(0),
    )?;
    assert_ne!(stored_total, "0");
    Ok(())
}
/// A session inserted with only the second optional cost value (0.456789) must
/// be imported with exactly that value as `total_cost_usd`.
#[test]
fn test_estimated_cost_used_when_actual_cost_missing() -> Result<(), AppError> {
    let workdir = tempdir().expect("tempdir");
    let hermes_path = workdir.path().join("state.db");
    {
        // NOTE(review): judging by the test name, the 4th argument is the actual
        // cost and the 5th the estimated cost — confirm against `insert_session`.
        let seed = create_hermes_db(&hermes_path);
        insert_session(&seed, "s1", 1000.0, None, Some(0.456789));
    }

    let db = Database::memory()?;
    let (imported, _) = sync_single_hermes_db(&db, &hermes_path)?;
    assert_eq!(imported, 1);

    let conn = lock_conn!(db.conn);
    let stored_total: String = conn.query_row(
        "SELECT total_cost_usd FROM proxy_request_logs WHERE request_id = 'hermes-session-s1'",
        [],
        |row| row.get(0),
    )?;
    assert_eq!(stored_total, "0.456789");
    Ok(())
}
/// If a `proxy` row already exists for the `hermes` app, syncing the Hermes
/// session must skip it: nothing imported, one skipped, and the log table
/// still holds exactly one row.
#[test]
fn test_matching_proxy_log_skips_hermes_session_insert() -> Result<(), AppError> {
    let workdir = tempdir().expect("tempdir");
    let hermes_path = workdir.path().join("state.db");
    {
        let seed = create_hermes_db(&hermes_path);
        insert_session(&seed, "s1", 1000.0, Some(0.1), None);
    }

    let db = Database::memory()?;
    {
        // Pre-existing proxy record; scoped so the lock is released before the sync.
        // NOTE(review): its token counts / timestamp presumably mirror what
        // `insert_session` writes for "s1" — confirm against that helper.
        let conn = lock_conn!(db.conn);
        conn.execute(
            concat!(
                "INSERT INTO proxy_request_logs (\n",
                "request_id, provider_id, app_type, model, request_model,\n",
                "input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens,\n",
                "total_cost_usd, latency_ms, status_code, created_at, data_source\n",
                ") VALUES ('proxy-1', 'p1', 'hermes', 'openrouter/anthropic/claude-opus-4-7',\n",
                "'openrouter/anthropic/claude-opus-4-7', 10, 27, 3, 4, '0.1', 100, 200, 1000, 'proxy')",
            ),
            [],
        )?;
    }

    let (imported, skipped) = sync_single_hermes_db(&db, &hermes_path)?;
    assert_eq!(imported, 0);
    assert_eq!(skipped, 1);

    let conn = lock_conn!(db.conn);
    let log_rows: i64 =
        conn.query_row("SELECT COUNT(*) FROM proxy_request_logs", [], |row| row.get(0))?;
    assert_eq!(log_rows, 1);
    Ok(())
}
/// After one Hermes session is synced, the provider stats, the data-source
/// breakdown and the request-log listing filtered by app type `hermes` must
/// each report exactly one entry, labelled "Hermes (Session)".
#[test]
fn test_hermes_provider_name_data_source_and_app_filter() -> Result<(), AppError> {
    let workdir = tempdir().expect("tempdir");
    let hermes_path = workdir.path().join("state.db");
    {
        let seed = create_hermes_db(&hermes_path);
        insert_session(&seed, "s1", 1000.0, Some(0.1), None);
    }

    let db = Database::memory()?;
    sync_single_hermes_db(&db, &hermes_path)?;

    // Provider statistics restricted to the hermes app.
    let by_provider = db.get_provider_stats(None, None, Some("hermes"))?;
    assert_eq!(by_provider.len(), 1);
    let only_provider = &by_provider[0];
    assert_eq!(only_provider.provider_id, HERMES_PROVIDER_ID);
    assert_eq!(only_provider.provider_name, "Hermes (Session)");

    // Data-source breakdown must contain the Hermes source with one request.
    let breakdown = get_data_source_breakdown(&db)?;
    assert!(breakdown
        .iter()
        .any(|entry| entry.data_source == HERMES_DATA_SOURCE && entry.request_count == 1));

    // Request-log listing filtered by app type.
    let hermes_only = LogFilters {
        app_type: Some("hermes".to_string()),
        ..Default::default()
    };
    let page = db.get_request_logs(&hermes_only, 0, 20)?;
    assert_eq!(page.total, 1);
    assert_eq!(
        page.data[0].provider_name.as_deref(),
        Some("Hermes (Session)")
    );
    Ok(())
}
/// `collect_hermes_db_paths` must find both the root `state.db` and the one
/// under `profiles/<name>/`.
#[test]
fn test_collect_hermes_profile_databases() {
    let temp = tempdir().expect("tempdir");
    fs::write(temp.path().join("state.db"), b"root").expect("write root");
    let profile_dir = temp.path().join("profiles").join("work");
    fs::create_dir_all(&profile_dir).expect("profile dir");
    fs::write(profile_dir.join("state.db"), b"profile").expect("write profile");

    let paths = collect_hermes_db_paths(temp.path());
    assert_eq!(paths.len(), 2);

    // Build the expected suffix from path components instead of a literal
    // "profiles/work/state.db" string: a substring match on '/' fails on
    // platforms whose native separator is '\'.
    let profile_suffix = profile_dir
        .strip_prefix(temp.path())
        .expect("profile dir is under temp dir")
        .join("state.db");

    // `Path::ends_with` compares whole components, so this is separator-agnostic.
    assert!(paths.iter().any(|path| path.ends_with(&profile_suffix)));
    // The root database must be a distinct entry: it ends with `state.db`
    // but is not the profile database (which also ends with `state.db`).
    assert!(paths
        .iter()
        .any(|path| path.ends_with("state.db") && !path.ends_with(&profile_suffix)));
}
}
+240 -75
View File
@@ -117,29 +117,84 @@ pub struct RequestLogDetail {
pub data_source: Option<String>,
}
/// Maps a 24-column query row onto a `RequestLogDetail`.
///
/// The caller's SELECT **must** return these 24 columns in exactly this order:
/// `request_id, provider_id, provider_name, app_type, model, request_model,
/// cost_multiplier, input_tokens, output_tokens, cache_read_tokens,
/// cache_creation_tokens, input_cost_usd, output_cost_usd, cache_read_cost_usd,
/// cache_creation_cost_usd, total_cost_usd, is_streaming, latency_ms,
/// first_token_ms, duration_ms, status_code, error_message, created_at,
/// data_source`
///
/// When `provider_name` is not needed (e.g. for backfill), selecting
/// `NULL AS provider_name` as a placeholder is sufficient.
fn row_to_request_log_detail(row: &rusqlite::Row<'_>) -> rusqlite::Result<RequestLogDetail> {
    // Integer columns are read as i64 and narrowed below with `as`, which wraps
    // rather than fails on out-of-range values.
    let int_at = |idx: usize| row.get::<_, i64>(idx);
    let opt_int_at = |idx: usize| row.get::<_, Option<i64>>(idx);

    // Columns are read strictly in index order so the first failing column
    // determines the returned error.
    let request_id = row.get(0)?;
    let provider_id = row.get(1)?;
    let provider_name = row.get(2)?;
    let app_type = row.get(3)?;
    let model = row.get(4)?;
    let request_model = row.get(5)?;
    // A NULL multiplier falls back to "1".
    let cost_multiplier = match row.get::<_, Option<String>>(6)? {
        Some(stored) => stored,
        None => "1".to_string(),
    };
    let input_tokens = int_at(7)? as u32;
    let output_tokens = int_at(8)? as u32;
    let cache_read_tokens = int_at(9)? as u32;
    let cache_creation_tokens = int_at(10)? as u32;
    let input_cost_usd = row.get(11)?;
    let output_cost_usd = row.get(12)?;
    let cache_read_cost_usd = row.get(13)?;
    let cache_creation_cost_usd = row.get(14)?;
    let total_cost_usd = row.get(15)?;
    let is_streaming = int_at(16)? != 0;
    let latency_ms = int_at(17)? as u64;
    let first_token_ms = opt_int_at(18)?.map(|ms| ms as u64);
    let duration_ms = opt_int_at(19)?.map(|ms| ms as u64);
    let status_code = int_at(20)? as u16;
    let error_message = row.get(21)?;
    let created_at = row.get(22)?;
    let data_source = row.get(23)?;

    Ok(RequestLogDetail {
        request_id,
        provider_id,
        provider_name,
        app_type,
        model,
        request_model,
        cost_multiplier,
        input_tokens,
        output_tokens,
        cache_read_tokens,
        cache_creation_tokens,
        input_cost_usd,
        output_cost_usd,
        cache_read_cost_usd,
        cache_creation_cost_usd,
        total_cost_usd,
        is_streaming,
        latency_ms,
        first_token_ms,
        duration_ms,
        status_code,
        error_message,
        created_at,
        data_source,
    })
}
/// SQL fragment: resolve `provider_name`, with a fallback for session-based entries.
///
/// Session logs use placeholder provider ids (e.g. `_session`, `_<app>_session`)
/// that don't exist in the providers table. The table below is the authoritative
/// mapping from placeholder id to readable name; any other id falls through to
/// the raw `provider_id`.
///
/// `log_alias` / `provider_alias` are the SQL aliases of the log table and the
/// providers table in the surrounding query.
fn provider_name_coalesce(log_alias: &str, provider_alias: &str) -> String {
    const SESSION_PROVIDER_NAMES: [(&str, &str); 4] = [
        ("_session", "Claude (Session)"),
        ("_codex_session", "Codex (Session)"),
        ("_gemini_session", "Gemini (Session)"),
        ("_hermes_session", "Hermes (Session)"),
    ];

    // One `WHEN … THEN … ` arm per placeholder, each with a trailing space.
    let when_arms: String = SESSION_PROVIDER_NAMES
        .iter()
        .map(|(placeholder, label)| format!("WHEN '{placeholder}' THEN '{label}' "))
        .collect();

    format!(
        "COALESCE({provider_alias}.name, CASE {log_alias}.provider_id \
         {when_arms}ELSE {log_alias}.provider_id END)"
    )
}
/// Ten minutes expressed in seconds (`10 * 60`).
///
/// NOTE(review): judging by the name, this is the time window within which a
/// session-derived row and a proxy row are treated as the same request during
/// de-duplication — confirm against the call sites.
pub(crate) const SESSION_PROXY_DEDUP_WINDOW_SECONDS: i64 = 10 * 60;
/// SQL fragment: wraps the `data_source` column of the given table alias in a
/// `COALESCE`, so that NULL is treated as `'proxy'`.
///
/// This guards against rows with a NULL `data_source` that may have been
/// written before schema v9 (see `tests::create_legacy_nullable_logs_table`).
/// Every query that looks at `data_source` should build its fragment through
/// this helper so no call site is missed.
fn data_source_expr(log_alias: &str) -> String {
    ["COALESCE(", log_alias, ".data_source, 'proxy')"].concat()
}
pub(crate) fn effective_usage_log_filter(log_alias: &str) -> String {
let data_source = data_source_expr(log_alias);
let proxy_data_source = data_source_expr("proxy_dedup");
format!(
"NOT (
{log_alias}.data_source IN ('session_log', 'codex_session', 'gemini_session')
{data_source} IN ('session_log', 'codex_session', 'gemini_session', 'hermes_session')
AND EXISTS (
SELECT 1
FROM proxy_request_logs proxy_dedup
WHERE proxy_dedup.data_source = 'proxy'
WHERE {proxy_data_source} = 'proxy'
AND proxy_dedup.app_type = {log_alias}.app_type
AND proxy_dedup.status_code >= 200
AND proxy_dedup.status_code < 300
@@ -150,7 +205,7 @@ pub(crate) fn effective_usage_log_filter(log_alias: &str) -> String {
proxy_dedup.cache_creation_tokens = {log_alias}.cache_creation_tokens
OR (
{log_alias}.cache_creation_tokens = 0
AND {log_alias}.data_source IN ('codex_session', 'gemini_session')
AND {data_source} IN ('codex_session', 'gemini_session')
)
)
AND proxy_dedup.created_at BETWEEN
@@ -212,11 +267,12 @@ pub(crate) fn has_matching_proxy_usage_log(
let allow_missing_cache_creation =
matches!(key.app_type, "codex" | "gemini") && key.cache_creation_tokens == 0;
conn.query_row(
let l_data_source = data_source_expr("l");
let sql = format!(
"SELECT EXISTS (
SELECT 1
FROM proxy_request_logs l
WHERE l.data_source = 'proxy'
WHERE {l_data_source} = 'proxy'
AND l.app_type = ?1
AND l.status_code >= 200
AND l.status_code < 300
@@ -230,7 +286,11 @@ pub(crate) fn has_matching_proxy_usage_log(
OR LOWER(l.model) = 'unknown'
OR LOWER(?2) = 'unknown'
)
)",
)"
);
conn.query_row(
&sql,
params![
key.app_type,
key.model,
@@ -1043,36 +1103,7 @@ impl Database {
let mut stmt = conn.prepare(&sql)?;
let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
let rows = stmt.query_map(params_refs.as_slice(), |row| {
Ok(RequestLogDetail {
request_id: row.get(0)?,
provider_id: row.get(1)?,
provider_name: row.get(2)?,
app_type: row.get(3)?,
model: row.get(4)?,
request_model: row.get(5)?,
cost_multiplier: row
.get::<_, Option<String>>(6)?
.unwrap_or_else(|| "1".to_string()),
input_tokens: row.get::<_, i64>(7)? as u32,
output_tokens: row.get::<_, i64>(8)? as u32,
cache_read_tokens: row.get::<_, i64>(9)? as u32,
cache_creation_tokens: row.get::<_, i64>(10)? as u32,
input_cost_usd: row.get(11)?,
output_cost_usd: row.get(12)?,
cache_read_cost_usd: row.get(13)?,
cache_creation_cost_usd: row.get(14)?,
total_cost_usd: row.get(15)?,
is_streaming: row.get::<_, i64>(16)? != 0,
latency_ms: row.get::<_, i64>(17)? as u64,
first_token_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
duration_ms: row.get::<_, Option<i64>>(19)?.map(|v| v as u64),
status_code: row.get::<_, i64>(20)? as u16,
error_message: row.get(21)?,
created_at: row.get(22)?,
data_source: row.get(23)?,
})
})?;
let rows = stmt.query_map(params_refs.as_slice(), row_to_request_log_detail)?;
let mut logs = Vec::new();
let mut provider_cache = HashMap::new();
@@ -1116,36 +1147,7 @@ impl Database {
LEFT JOIN providers p ON l.provider_id = p.id AND l.app_type = p.app_type
WHERE l.request_id = ?"
);
let result = conn.query_row(&detail_sql, [request_id], |row| {
Ok(RequestLogDetail {
request_id: row.get(0)?,
provider_id: row.get(1)?,
provider_name: row.get(2)?,
app_type: row.get(3)?,
model: row.get(4)?,
request_model: row.get(5)?,
cost_multiplier: row
.get::<_, Option<String>>(6)?
.unwrap_or_else(|| "1".to_string()),
input_tokens: row.get::<_, i64>(7)? as u32,
output_tokens: row.get::<_, i64>(8)? as u32,
cache_read_tokens: row.get::<_, i64>(9)? as u32,
cache_creation_tokens: row.get::<_, i64>(10)? as u32,
input_cost_usd: row.get(11)?,
output_cost_usd: row.get(12)?,
cache_read_cost_usd: row.get(13)?,
cache_creation_cost_usd: row.get(14)?,
total_cost_usd: row.get(15)?,
is_streaming: row.get::<_, i64>(16)? != 0,
latency_ms: row.get::<_, i64>(17)? as u64,
first_token_ms: row.get::<_, Option<i64>>(18)?.map(|v| v as u64),
duration_ms: row.get::<_, Option<i64>>(19)?.map(|v| v as u64),
status_code: row.get::<_, i64>(20)? as u16,
error_message: row.get(21)?,
created_at: row.get(22)?,
data_source: row.get(23)?,
})
});
let result = conn.query_row(&detail_sql, [request_id], row_to_request_log_detail);
match result {
Ok(mut detail) => {
@@ -1276,27 +1278,80 @@ struct PricingInfo {
}
impl Database {
/// Recalculates stored zero-cost usage rows once pricing becomes available.
///
/// Takes the database connection lock and delegates to
/// `backfill_missing_usage_costs_on_conn`; returns the number of rows updated.
pub(crate) fn backfill_missing_usage_costs(&self) -> Result<u64, AppError> {
    let guard = lock_conn!(self.conn);
    let updated = Self::backfill_missing_usage_costs_on_conn(&guard)?;
    Ok(updated)
}
/// Backfills cost columns for every log row whose total cost is `<= 0` while at
/// least one token counter is positive.
///
/// Candidate rows are loaded first; if there are none, no transaction is
/// opened. Otherwise all updates run inside a single transaction, and the
/// number of rows actually written is returned.
fn backfill_missing_usage_costs_on_conn(conn: &Connection) -> Result<u64, AppError> {
    // Column order matches what `row_to_request_log_detail` expects;
    // `provider_name` is selected as a NULL placeholder.
    const ZERO_COST_ROWS_SQL: &str = concat!(
        "SELECT request_id, provider_id, NULL AS provider_name, app_type, model, request_model,\n",
        "cost_multiplier,\n",
        "input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens,\n",
        "input_cost_usd, output_cost_usd, cache_read_cost_usd,\n",
        "cache_creation_cost_usd, total_cost_usd, is_streaming, latency_ms,\n",
        "first_token_ms, duration_ms, status_code, error_message, created_at,\n",
        "data_source\n",
        "FROM proxy_request_logs\n",
        "WHERE CAST(total_cost_usd AS REAL) <= 0\n",
        "AND (input_tokens > 0 OR output_tokens > 0\n",
        "OR cache_read_tokens > 0 OR cache_creation_tokens > 0)",
    );

    let mut stmt = conn.prepare(ZERO_COST_ROWS_SQL)?;
    let mut candidates = stmt
        .query_map([], row_to_request_log_detail)?
        .collect::<Result<Vec<_>, _>>()?;
    // Release the prepared statement before the transaction starts.
    drop(stmt);

    if candidates.is_empty() {
        return Ok(0);
    }

    let tx = conn
        .unchecked_transaction()
        .map_err(|e| AppError::Database(format!("启动用量成本回填事务失败: {e}")))?;

    // Caches are shared across all rows so repeated lookups are avoided.
    let mut provider_cache = HashMap::new();
    let mut pricing_cache = HashMap::new();
    let mut updated = 0u64;
    for entry in candidates.iter_mut() {
        let wrote =
            Self::maybe_backfill_log_costs(&tx, entry, &mut provider_cache, &mut pricing_cache)?;
        updated += u64::from(wrote);
    }

    tx.commit()
        .map_err(|e| AppError::Database(format!("提交用量成本回填事务失败: {e}")))?;

    if updated > 0 {
        log::info!("已回填 {updated} 条缺失的用量成本");
    }
    Ok(updated)
}
/// Tries to backfill the cost fields of a single log row. Returns whether a write actually happened (`true` = row was UPDATEd, `false` = skipped).
fn maybe_backfill_log_costs(
conn: &Connection,
log: &mut RequestLogDetail,
provider_cache: &mut HashMap<(String, String), rust_decimal::Decimal>,
pricing_cache: &mut HashMap<String, PricingInfo>,
) -> Result<(), AppError> {
let total_cost = rust_decimal::Decimal::from_str(&log.total_cost_usd)
) -> Result<bool, AppError> {
let existing_cost = rust_decimal::Decimal::from_str(&log.total_cost_usd)
.unwrap_or(rust_decimal::Decimal::ZERO);
let has_cost = total_cost > rust_decimal::Decimal::ZERO;
let has_cost = existing_cost > rust_decimal::Decimal::ZERO;
let has_usage = log.input_tokens > 0
|| log.output_tokens > 0
|| log.cache_read_tokens > 0
|| log.cache_creation_tokens > 0;
if has_cost || !has_usage {
return Ok(());
return Ok(false);
}
let pricing = match Self::get_model_pricing_cached(conn, pricing_cache, &log.model)? {
Some(info) => info,
None => return Ok(()),
None => return Ok(false),
};
let multiplier = Self::get_cost_multiplier_cached(
conn,
@@ -1352,7 +1407,7 @@ impl Database {
)
.map_err(|e| AppError::Database(format!("更新请求成本失败: {e}")))?;
Ok(())
Ok(true)
}
fn get_cost_multiplier_cached(
@@ -1431,7 +1486,8 @@ pub(crate) fn find_model_pricing_row(
.next()
.unwrap_or(model_id)
.trim()
.replace('@', "-");
.replace('@', "-")
.to_ascii_lowercase();
// 精确匹配清洗后的名称
let exact = conn
@@ -1514,6 +1570,110 @@ mod tests {
Ok(())
}
/// Creates a reduced `proxy_request_logs` table whose `data_source` column is
/// nullable, used by tests that exercise rows with a NULL `data_source`.
fn create_legacy_nullable_logs_table(conn: &Connection) -> Result<(), AppError> {
    const LEGACY_LOGS_DDL: &str = concat!(
        "CREATE TABLE proxy_request_logs (\n",
        "request_id TEXT PRIMARY KEY,\n",
        "app_type TEXT NOT NULL,\n",
        "model TEXT NOT NULL,\n",
        "input_tokens INTEGER NOT NULL,\n",
        "output_tokens INTEGER NOT NULL,\n",
        "cache_read_tokens INTEGER NOT NULL,\n",
        "cache_creation_tokens INTEGER NOT NULL,\n",
        "status_code INTEGER NOT NULL,\n",
        "created_at INTEGER NOT NULL,\n",
        "data_source TEXT\n",
        ")",
    );

    // The affected-row count returned by `execute` is not needed.
    let _affected = conn.execute(LEGACY_LOGS_DDL, [])?;
    Ok(())
}
/// A row whose `data_source` is NULL must still pass the effective-usage filter.
#[test]
fn test_effective_filter_keeps_legacy_null_data_source_proxy_rows() -> Result<(), AppError> {
    let conn = Connection::open_in_memory()?;
    create_legacy_nullable_logs_table(&conn)?;

    // Single legacy row with `data_source` left NULL.
    conn.execute(
        concat!(
            "INSERT INTO proxy_request_logs (\n",
            "request_id, app_type, model, input_tokens, output_tokens,\n",
            "cache_read_tokens, cache_creation_tokens, status_code, created_at, data_source\n",
            ") VALUES ('legacy-proxy', 'codex', 'gpt-5.5', 10, 2, 1, 0, 200, 1000, NULL)",
        ),
        [],
    )?;

    let filter = effective_usage_log_filter("l");
    let count_sql = format!("SELECT COUNT(*) FROM proxy_request_logs l WHERE {filter}");
    let kept_rows: i64 = conn.query_row(&count_sql, [], |row| row.get(0))?;
    assert_eq!(kept_rows, 1);
    Ok(())
}
/// A row whose `data_source` is NULL must be found by
/// `has_matching_proxy_usage_log` when the dedup key carries the same app,
/// model, token counts and timestamp.
#[test]
fn test_matching_proxy_log_treats_legacy_null_data_source_as_proxy() -> Result<(), AppError> {
    let conn = Connection::open_in_memory()?;
    create_legacy_nullable_logs_table(&conn)?;

    // Single legacy row with `data_source` left NULL.
    conn.execute(
        concat!(
            "INSERT INTO proxy_request_logs (\n",
            "request_id, app_type, model, input_tokens, output_tokens,\n",
            "cache_read_tokens, cache_creation_tokens, status_code, created_at, data_source\n",
            ") VALUES ('legacy-proxy', 'codex', 'gpt-5.5', 10, 2, 1, 0, 200, 1000, NULL)",
        ),
        [],
    )?;

    // Key fields mirror the values of the row inserted above.
    let probe = DedupKey {
        app_type: "codex",
        model: "gpt-5.5",
        input_tokens: 10,
        output_tokens: 2,
        cache_read_tokens: 1,
        cache_creation_tokens: 0,
        created_at: 1000,
    };
    let found = has_matching_proxy_usage_log(&conn, &probe)?;
    assert!(found);
    Ok(())
}
/// A zero-cost `gpt-5.5` row with token usage must be repriced by the backfill:
/// exactly one row is updated and the stored costs become 5 / 30 / 35.
#[test]
fn test_backfill_missing_usage_costs_uses_new_gpt_5_5_pricing() -> Result<(), AppError> {
    let db = Database::memory()?;
    {
        // Scoped so the lock is released before the backfill takes it.
        let conn = lock_conn!(db.conn);
        insert_usage_log(
            &conn,
            "codex-gpt-5-5-zero-cost",
            "codex",
            "_codex_session",
            "gpt-5.5",
            "codex_session",
            1000,
            1_000_000,
            1_000_000,
            0,
            0,
            200,
            "0",
        )?;
    }

    let updated_rows = db.backfill_missing_usage_costs()?;
    assert_eq!(updated_rows, 1);

    let conn = lock_conn!(db.conn);
    let stored: (String, String, String) = conn.query_row(
        concat!(
            "SELECT input_cost_usd, output_cost_usd, total_cost_usd\n",
            "FROM proxy_request_logs WHERE request_id = 'codex-gpt-5-5-zero-cost'",
        ),
        [],
        |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
    )?;
    let (input_cost, output_cost, total_cost) = stored;
    assert_eq!(input_cost, "5.000000");
    assert_eq!(output_cost, "30.000000");
    assert_eq!(total_cost, "35.000000");
    Ok(())
}
#[test]
fn test_get_usage_summary() -> Result<(), AppError> {
let db = Database::memory()?;
@@ -2436,6 +2596,11 @@ mod tests {
result.is_some(),
"带 @ 分隔符的模型 gpt-5.2-codex@low 应能匹配到 gpt-5.2-codex-low"
);
let result = find_model_pricing_row(&conn, "OpenAI/GPT-5.5@HIGH")?;
assert!(
result.is_some(),
"大小写混合的 GPT-5.5 模型应能归一化匹配到 gpt-5.5-high"
);
// 测试不存在的模型
let result = find_model_pricing_row(&conn, "unknown-model-123")?;
+1
View File
@@ -17,6 +17,7 @@ const DATA_SOURCE_ICONS: Record<string, React.ReactNode> = {
codex_db: <Database className="h-3.5 w-3.5" />,
codex_session: <FileText className="h-3.5 w-3.5" />,
gemini_session: <FileText className="h-3.5 w-3.5" />,
hermes_session: <Database className="h-3.5 w-3.5" />,
};
export function DataSourceBar({ refreshIntervalMs }: DataSourceBarProps) {
+1
View File
@@ -141,6 +141,7 @@ export function RequestLogTable({
<SelectItem value="claude">Claude</SelectItem>
<SelectItem value="codex">Codex</SelectItem>
<SelectItem value="gemini">Gemini</SelectItem>
<SelectItem value="hermes">Hermes</SelectItem>
</SelectContent>
</Select>
+1
View File
@@ -35,6 +35,7 @@ const APP_FILTER_OPTIONS: AppTypeFilter[] = [
"claude",
"codex",
"gemini",
"hermes",
];
export function UsageDashboard() {
+4 -2
View File
@@ -1120,7 +1120,8 @@
"all": "All",
"claude": "Claude Code",
"codex": "Codex",
"gemini": "Gemini"
"gemini": "Gemini",
"hermes": "Hermes"
},
"dataSources": "Data Sources",
"dataSource": {
@@ -1128,7 +1129,8 @@
"session_log": "Session Log",
"codex_db": "Codex DB",
"codex_session": "Codex Session",
"gemini_session": "Gemini Session"
"gemini_session": "Gemini Session",
"hermes_session": "Hermes Session"
},
"sessionSync": {
"trigger": "Sync session logs",
+4 -2
View File
@@ -1120,7 +1120,8 @@
"all": "すべて",
"claude": "Claude Code",
"codex": "Codex",
"gemini": "Gemini"
"gemini": "Gemini",
"hermes": "Hermes"
},
"dataSources": "データソース",
"dataSource": {
@@ -1128,7 +1129,8 @@
"session_log": "セッションログ",
"codex_db": "Codex DB",
"codex_session": "Codex セッション",
"gemini_session": "Gemini セッション"
"gemini_session": "Gemini セッション",
"hermes_session": "Hermes セッション"
},
"sessionSync": {
"trigger": "セッションログを同期",
+4 -2
View File
@@ -1120,7 +1120,8 @@
"all": "全部",
"claude": "Claude Code",
"codex": "Codex",
"gemini": "Gemini"
"gemini": "Gemini",
"hermes": "Hermes"
},
"dataSources": "数据来源",
"dataSource": {
@@ -1128,7 +1129,8 @@
"session_log": "会话日志",
"codex_db": "Codex 数据库",
"codex_session": "Codex 会话日志",
"gemini_session": "Gemini 会话日志"
"gemini_session": "Gemini 会话日志",
"hermes_session": "Hermes 会话日志"
},
"sessionSync": {
"trigger": "同步会话日志",
+1 -1
View File
@@ -129,7 +129,7 @@ export interface UsageRangeSelection {
customEndDate?: number;
}
export type AppTypeFilter = "all" | "claude" | "codex" | "gemini";
export type AppTypeFilter = "all" | "claude" | "codex" | "gemini" | "hermes";
export interface StatsFilters {
timeRange: UsageRangePreset;