@@ -1,11 +1,17 @@
//! Codex 使用数据导入
//! Codex 会话日志使用追踪
//!
//! 从 ~/.codex/state_5.sqlite 的 threads 表读取 token 使用数据,
//! 导入到 proxy_request_logs 表 。
//! 从 ~/.codex/sessions/ 下的 JSONL 会话文件中提取精确 token 使用数据,
//! 替代原有的 state_5.sqlite 估算方案 。
//!
//! ## 限制
//! - Thread 级粒度(非请求级)
//! - 只有总 token 数,使用估算比例分 input/output
//! ## 数据流
//! ```text
//! ~/.codex/sessions/YYYY/MM/DD/*.jsonl → 增量解析 → delta 计算 → 费用计算 → proxy_request_logs 表
//! ```
//!
//! ## 解析的事件类型
//! - `session_meta` → 提取 session_id
//! - `turn_context` → 提取当前 model
//! - `event_msg` (type=token_count) → 提取累计 token 用量,计算 delta
use crate ::codex_config ::get_codex_config_dir ;
use crate ::database ::{ lock_conn , Database } ;
@@ -14,203 +20,510 @@ use crate::proxy::usage::calculator::{CostCalculator, ModelPricing};
use crate ::proxy ::usage ::parser ::TokenUsage ;
use crate ::services ::session_usage ::SessionSyncResult ;
use rust_decimal ::Decimal ;
use std ::fs ;
use std ::io ::{ BufRead , BufReader } ;
use std ::path ::{ Path , PathBuf } ;
use std ::time ::SystemTime ;
/// 默认 input/output 比例(70% input, 30% output )
const DEFAULT_INPUT_RATIO : f64 = 0.7 ;
/// 累计 token 用量(跟踪 total_token_usage 字段 )
#[ derive(Debug, Clone, Default) ]
struct CumulativeTokens {
input : u64 ,
cached_input : u64 ,
output : u64 ,
}
/// 同步 Codex 使用数据
/// 单次 API 调用的 token 增量
#[ derive(Debug) ]
struct DeltaTokens {
input : u32 ,
cached_input : u32 ,
output : u32 ,
}
impl DeltaTokens {
fn is_zero ( & self ) -> bool {
self . input = = 0 & & self . cached_input = = 0 & & self . output = = 0
}
}
/// 单文件解析时的运行状态
struct FileParseState {
session_id : Option < String > ,
current_model : String ,
prev_total : Option < CumulativeTokens > ,
event_index : u32 ,
}
/// 计算两次累计值之间的 delta
fn compute_delta ( prev : & Option < CumulativeTokens > , current : & CumulativeTokens ) -> DeltaTokens {
match prev {
None = > DeltaTokens {
input : current . input as u32 ,
cached_input : current . cached_input as u32 ,
output : current . output as u32 ,
} ,
Some ( p ) = > DeltaTokens {
input : current . input . saturating_sub ( p . input ) as u32 ,
cached_input : current . cached_input . saturating_sub ( p . cached_input ) as u32 ,
output : current . output . saturating_sub ( p . output ) as u32 ,
} ,
}
}
/// 从 JSON Value 中提取累计 token 用量
fn parse_cumulative_tokens ( total_usage : & serde_json ::Value ) -> Option < CumulativeTokens > {
if total_usage . is_null ( ) | | ! total_usage . is_object ( ) {
return None ;
}
Some ( CumulativeTokens {
input : total_usage
. get ( " input_tokens " )
. and_then ( | v | v . as_u64 ( ) )
. unwrap_or ( 0 ) ,
cached_input : total_usage
. get ( " cached_input_tokens " )
. or_else ( | | total_usage . get ( " cache_read_input_tokens " ) )
. and_then ( | v | v . as_u64 ( ) )
. unwrap_or ( 0 ) ,
output : total_usage
. get ( " output_tokens " )
. and_then ( | v | v . as_u64 ( ) )
. unwrap_or ( 0 ) ,
} )
}
/// 同步 Codex 使用数据(从 JSONL 会话日志)
pub fn sync_codex_usage ( db : & Database ) -> Result < SessionSyncResult , AppError > {
let codex_dir = get_codex_config_dir ( ) ;
let state_db_path = codex_dir . join ( " state_5.sqlite " ) ;
if ! state_db_path . exists ( ) {
return Ok ( SessionSyncResult {
imported : 0 ,
skipped : 0 ,
files_scanned : 0 ,
errors : vec ! [ ] ,
} ) ;
}
let files = collect_codex_session_files ( & codex_dir ) ;
let mut result = SessionSyncResult {
imported : 0 ,
skipped : 0 ,
files_scanned : 1 ,
files_scanned : files . len ( ) as u32 ,
errors : vec ! [ ] ,
} ;
// 只读打开 Codex SQLite 数据库
let codex_conn = match rusqlite ::Connection ::open_with_flags (
& state_db_path ,
rusqlite ::OpenFlags ::SQLITE_OPEN_READ_ONLY | rusqlite ::OpenFlags ::SQLITE_OPEN_NO_MUTEX ,
) {
Ok ( c ) = > c ,
Err ( e ) = > {
result . errors . push ( format! ( " 无法打开 Codex 数据库: {e} " ) ) ;
return Ok ( result ) ;
}
} ;
if files . is_empty ( ) {
return Ok ( result ) ;
}
// 查询所有 thread
let mut stmt = match codex_conn . prepare (
" SELECT id, model, model_provider, tokens_used, created_at, title
FROM threads
WHERE tokens_used > 0
ORDER BY created_at DESC " ,
) {
Ok ( s ) = > s ,
Err ( e ) = > {
result
. errors
. push ( format! ( " 查询 Codex threads 失败: {e} " ) ) ;
return Ok ( result ) ;
}
} ;
let rows = match stmt . query_map ( [ ] , | row | {
Ok ( (
row . get ::< _ , String > ( 0 ) ? , // id
row . get ::< _ , Option < String > > ( 1 ) ? , // model (nullable)
row . get ::< _ , String > ( 2 ) ? , // model_provider
row . get ::< _ , i64 > ( 3 ) ? , // tokens_used
row . get ::< _ , i64 > ( 4 ) ? , // created_at (unix seconds)
row . get ::< _ , String > ( 5 ) ? , // title
) )
} ) {
Ok ( r ) = > r ,
Err ( e ) = > {
result . errors . push ( format! ( " 遍历 threads 失败: {e} " ) ) ;
return Ok ( result ) ;
}
} ;
let conn = lock_conn! ( db . conn ) ;
for row_result in rows {
let ( thread_id , model , _provider , tokens_used , created_at , _title ) = match row_result {
Ok ( r ) = > r ,
Err ( e ) = > {
result . errors . push ( format! ( " 读取行失败: {e} " ) ) ;
continue ;
for file_path in & files {
match sync_single_codex_file ( db , file_path ) {
Ok ( ( imported , skipped ) ) = > {
result . imported + = imported ;
result . skipped + = skipped ;
}
} ;
let request_id = format! ( " codex_thread: {thread_id} " ) ;
let model = model . unwrap_or_else ( | | " unknown " . to_string ( ) ) ;
// 检查是否已存在
let exists : bool = conn
. query_row (
" SELECT COUNT(*) FROM proxy_request_logs WHERE request_id = ?1 " ,
rusqlite ::params! [ request_id ] ,
| row | row . get ::< _ , i64 > ( 0 ) . map ( | c | c > 0 ) ,
)
. unwrap_or ( false ) ;
if exists {
result . skipped + = 1 ;
continue ;
}
// 使用估算比例分配 input/output
let input_tokens = ( tokens_used as f64 * DEFAULT_INPUT_RATIO ) as u32 ;
let output_tokens = tokens_used as u32 - input_tokens ;
let usage = TokenUsage {
input_tokens ,
output_tokens ,
cache_read_tokens : 0 ,
cache_creation_tokens : 0 ,
model : Some ( model . clone ( ) ) ,
} ;
// 查找定价
let pricing = find_codex_pricing ( & conn , & model ) ;
let multiplier = Decimal ::from ( 1 ) ;
let ( input_cost , output_cost , cache_read_cost , cache_creation_cost , total_cost ) =
match pricing {
Some ( p ) = > {
let cost = CostCalculator ::calculate ( & usage , & p , multiplier ) ;
(
cost . input_cost . to_string ( ) ,
cost . output_cost . to_string ( ) ,
cost . cache_read_cost . to_string ( ) ,
cost . cache_creation_cost . to_string ( ) ,
cost . total_cost . to_string ( ) ,
)
}
None = > (
" 0 " . to_string ( ) ,
" 0 " . to_string ( ) ,
" 0 " . to_string ( ) ,
" 0 " . to_string ( ) ,
" 0 " . to_string ( ) ,
) ,
} ;
let insert_result = conn . execute (
" INSERT OR IGNORE INTO proxy_request_logs (
request_id, provider_id, app_type, model, request_model,
input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens,
input_cost_usd, output_cost_usd, cache_read_cost_usd, cache_creation_cost_usd, total_cost_usd,
latency_ms, first_token_ms, status_code, error_message, session_id,
provider_type, is_streaming, cost_multiplier, created_at, data_source
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24) " ,
rusqlite ::params! [
request_id ,
" _codex_session " ,
" codex " ,
model ,
model ,
input_tokens ,
output_tokens ,
0 i64 , // cache_read_tokens
0 i64 , // cache_creation_tokens
input_cost ,
output_cost ,
cache_read_cost ,
cache_creation_cost ,
total_cost ,
0 i64 , // latency_ms
Option ::< i64 > ::None ,
200 i64 , // status_code
Option ::< String > ::None ,
Some ( thread_id ) , // session_id = thread_id
Some ( " codex_db " ) ,
0 i64 , // is_streaming: unknown
" 1.0 " ,
created_at ,
" codex_db " ,
] ,
) ;
match insert_result {
Ok ( changed ) if changed > 0 = > result . imported + = 1 ,
Ok ( _ ) = > result . skipped + = 1 ,
Err ( e ) = > {
result . errors . push ( format! ( " 插入失败: {e} " ) ) ;
result . skipped + = 1 ;
let msg = format! ( " Codex 会话文件解析失败 {} : {e} " , file_path . display ( ) ) ;
log ::warn! ( " [CODEX-SYNC] {msg} " ) ;
result . errors . push ( msg ) ;
}
}
}
if result . imported > 0 {
log ::info! (
" [CODEX-SYNC] 同步完成: 导入 {} 条, 跳过 {} 条 " ,
" [CODEX-SYNC] 同步完成: 导入 {} 条, 跳过 {} 条, � � 描 {} 个文件 " ,
result . imported ,
result . skipped
result . skipped ,
result . files_scanned
) ;
}
Ok ( result )
}
/// 收集所有 Codex 会话 JSONL 文件
fn collect_codex_session_files ( codex_dir : & Path ) -> Vec < PathBuf > {
let mut files = Vec ::new ( ) ;
// 1. 扫描 sessions/YYYY/MM/DD/*.jsonl(日期分区目录� �
let sessions_dir = codex_dir . join ( " sessions " ) ;
if sessions_dir . is_dir ( ) {
collect_jsonl_recursive ( & sessions_dir , & mut files , 0 , 3 ) ;
}
// 2. 扫描 archived_sessions/*.jsonl(扁平归档目录)
let archived_dir = codex_dir . join ( " archived_sessions " ) ;
if archived_dir . is_dir ( ) {
if let Ok ( entries ) = fs ::read_dir ( & archived_dir ) {
for entry in entries . flatten ( ) {
let path = entry . path ( ) ;
if path . extension ( ) . and_then ( | e | e . to_str ( ) ) = = Some ( " jsonl " ) {
files . push ( path ) ;
}
}
}
}
files
}
/// 递归扫描目录下的 .jsonl 文件(限制最大深度)
fn collect_jsonl_recursive ( dir : & Path , files : & mut Vec < PathBuf > , depth : u32 , max_depth : u32 ) {
let entries = match fs ::read_dir ( dir ) {
Ok ( e ) = > e ,
Err ( _ ) = > return ,
} ;
for entry in entries . flatten ( ) {
let path = entry . path ( ) ;
if path . is_dir ( ) & & depth < max_depth {
collect_jsonl_recursive ( & path , files , depth + 1 , max_depth ) ;
} else if path . extension ( ) . and_then ( | e | e . to_str ( ) ) = = Some ( " jsonl " ) {
files . push ( path ) ;
}
}
}
/// 同步单� � Codex JSONL 文件,返回 (imported, skipped)
fn sync_single_codex_file ( db : & Database , file_path : & Path ) -> Result < ( u32 , u32 ) , AppError > {
let file_path_str = file_path . to_string_lossy ( ) . to_string ( ) ;
// 获取文件元数据
let metadata = fs ::metadata ( file_path )
. map_err ( | e | AppError ::Config ( format! ( " 无法读取文� � � 元数据: {e} " ) ) ) ? ;
let file_modified = metadata
. modified ( )
. ok ( )
. and_then ( | t | t . duration_since ( SystemTime ::UNIX_EPOCH ) . ok ( ) )
. map ( | d | d . as_secs ( ) as i64 )
. unwrap_or ( 0 ) ;
// 检查同步状态
let ( last_modified , last_offset ) = get_sync_state ( db , & file_path_str ) ? ;
// 文件未变化则跳过
if file_modified < = last_modified {
return Ok ( ( 0 , 0 ) ) ;
}
// 打开文件逐行解析
let file =
fs ::File ::open ( file_path ) . map_err ( | e | AppError ::Config ( format! ( " 无法打开文件: {e} " ) ) ) ? ;
let reader = BufReader ::new ( file ) ;
let mut state = FileParseState {
session_id : None ,
current_model : " unknown " . to_string ( ) ,
prev_total : None ,
event_index : 0 ,
} ;
let mut line_offset : i64 = 0 ;
let mut imported : u32 = 0 ;
let mut skipped : u32 = 0 ;
for line_result in reader . lines ( ) {
line_offset + = 1 ;
let line = match line_result {
Ok ( l ) = > l ,
Err ( _ ) = > continue , // 容忍不完整的最后一行
} ;
if line . trim ( ) . is_empty ( ) {
continue ;
}
// 快速过滤:在 JSON 反序列化前跳过无关行
let is_event_msg = line . contains ( " \" event_msg \" " ) ;
let is_turn_context = line . contains ( " \" turn_context \" " ) ;
let is_session_meta = line . contains ( " \" session_meta \" " ) ;
if ! is_event_msg & & ! is_turn_context & & ! is_session_meta {
continue ;
}
if is_event_msg & & ! line . contains ( " \" token_count \" " ) {
continue ;
}
let value : serde_json ::Value = match serde_json ::from_str ( & line ) {
Ok ( v ) = > v ,
Err ( _ ) = > continue ,
} ;
let event_type = match value . get ( " type " ) . and_then ( | t | t . as_str ( ) ) {
Some ( t ) = > t ,
None = > continue ,
} ;
match event_type {
" session_meta " = > {
if state . session_id . is_none ( ) {
let payload = value . get ( " payload " ) ;
state . session_id = payload
. and_then ( | p | {
p . get ( " session_id " )
. or_else ( | | p . get ( " sessionId " ) )
. or_else ( | | p . get ( " id " ) )
} )
. and_then ( | v | v . as_str ( ) )
. map ( | s | s . to_string ( ) ) ;
}
}
" turn_context " = > {
if let Some ( payload ) = value . get ( " payload " ) {
// model 可能在 payload.model 或 payload.info.model
if let Some ( model ) = payload
. get ( " model " )
. or_else ( | | payload . get ( " info " ) . and_then ( | info | info . get ( " model " ) ) )
. and_then ( | v | v . as_str ( ) )
{
state . current_model = model . to_string ( ) ;
}
}
}
" event_msg " = > {
let payload = match value . get ( " payload " ) {
Some ( p ) = > p ,
None = > continue ,
} ;
// 只处理 token_count 类型
if payload . get ( " type " ) . and_then ( | t | t . as_str ( ) ) ! = Some ( " token_count " ) {
continue ;
}
let info = match payload . get ( " info " ) {
Some ( i ) if ! i . is_null ( ) = > i ,
_ = > continue , // info 为 null 的首个事件跳� �
} ;
// 提取模型(token_count 事件也可能携带 model)
if let Some ( model ) = info
. get ( " model " )
. or_else ( | | info . get ( " model_name " ) )
. or_else ( | | payload . get ( " model " ) )
. and_then ( | v | v . as_str ( ) )
{
state . current_model = model . to_string ( ) ;
}
// 优先用 total_token_usage(累计值),fallback 到 last_token_usage(增量值)
let ( cumulative , is_total ) = if let Some ( total ) = info . get ( " total_token_usage " ) {
( parse_cumulative_tokens ( total ) , true )
} else if let Some ( last ) = info . get ( " last_token_usage " ) {
( parse_cumulative_tokens ( last ) , false )
} else {
continue ;
} ;
let cumulative = match cumulative {
Some ( c ) = > c ,
None = > continue ,
} ;
let delta = if is_total {
// 累计值模式:计算与上次的 delta
let d = compute_delta ( & state . prev_total , & cumulative ) ;
state . prev_total = Some ( cumulative ) ;
d
} else {
// 增量值模式:直接使用 last_token_usage 的值
DeltaTokens {
input : cumulative . input as u32 ,
cached_input : cumulative . cached_input as u32 ,
output : cumulative . output as u32 ,
}
} ;
if delta . is_zero ( ) {
continue ; // 跳过 task 边界的零 delta 事件
}
state . event_index + = 1 ;
// 跳过已处理的行(但仍需解析以恢复状态)
if line_offset < = last_offset {
continue ;
}
// 生成唯一 request_id
let session_id_str = state . session_id . as_deref ( ) . unwrap_or ( " unknown " ) ;
let request_id = format! ( " codex_session: {} : {} " , session_id_str , state . event_index ) ;
// 提取时间戳
let timestamp = value
. get ( " timestamp " )
. and_then ( | v | v . as_str ( ) )
. map ( | s | s . to_string ( ) ) ;
match insert_codex_session_entry (
db ,
& request_id ,
& delta ,
& state . current_model ,
state . session_id . as_deref ( ) ,
timestamp . as_deref ( ) ,
) {
Ok ( true ) = > imported + = 1 ,
Ok ( false ) = > skipped + = 1 ,
Err ( e ) = > {
log ::warn! ( " [CODEX-SYNC] 插入失败 ({}): {e} " , request_id ) ;
skipped + = 1 ;
}
}
}
_ = > { }
}
}
// 更新同步状态
update_sync_state ( db , & file_path_str , file_modified , line_offset ) ? ;
Ok ( ( imported , skipped ) )
}
/// 插入单条 Codex 会话记录到 proxy_request_logs
fn insert_codex_session_entry (
db : & Database ,
request_id : & str ,
delta : & DeltaTokens ,
model : & str ,
session_id : Option < & str > ,
timestamp : Option < & str > ,
) -> Result < bool , AppError > {
let conn = lock_conn! ( db . conn ) ;
// 检查是否已存在
let exists : bool = conn
. query_row (
" SELECT COUNT(*) FROM proxy_request_logs WHERE request_id = ?1 " ,
rusqlite ::params! [ request_id ] ,
| row | row . get ::< _ , i64 > ( 0 ) . map ( | c | c > 0 ) ,
)
. unwrap_or ( false ) ;
if exists {
return Ok ( false ) ;
}
// 解析时间戳
let created_at = timestamp
. and_then ( | ts | {
chrono ::DateTime ::parse_from_rfc3339 ( ts )
. ok ( )
. map ( | dt | dt . timestamp ( ) )
} )
. unwrap_or_else ( | | {
SystemTime ::now ( )
. duration_since ( SystemTime ::UNIX_EPOCH )
. map ( | d | d . as_secs ( ) as i64 )
. unwrap_or ( 0 )
} ) ;
// 计算费用
let usage = TokenUsage {
input_tokens : delta . input ,
output_tokens : delta . output ,
cache_read_tokens : delta . cached_input ,
cache_creation_tokens : 0 ,
model : Some ( model . to_string ( ) ) ,
} ;
let pricing = find_codex_pricing ( & conn , model ) ;
let multiplier = Decimal ::from ( 1 ) ;
let ( input_cost , output_cost , cache_read_cost , cache_creation_cost , total_cost ) = match pricing
{
Some ( p ) = > {
let cost = CostCalculator ::calculate ( & usage , & p , multiplier ) ;
(
cost . input_cost . to_string ( ) ,
cost . output_cost . to_string ( ) ,
cost . cache_read_cost . to_string ( ) ,
cost . cache_creation_cost . to_string ( ) ,
cost . total_cost . to_string ( ) ,
)
}
None = > (
" 0 " . to_string ( ) ,
" 0 " . to_string ( ) ,
" 0 " . to_string ( ) ,
" 0 " . to_string ( ) ,
" 0 " . to_string ( ) ,
) ,
} ;
conn . execute (
" INSERT OR IGNORE INTO proxy_request_logs (
request_id, provider_id, app_type, model, request_model,
input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens,
input_cost_usd, output_cost_usd, cache_read_cost_usd, cache_creation_cost_usd, total_cost_usd,
latency_ms, first_token_ms, status_code, error_message, session_id,
provider_type, is_streaming, cost_multiplier, created_at, data_source
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24) " ,
rusqlite ::params! [
request_id ,
" _codex_session " , // provider_id
" codex " , // app_type
model ,
model , // request_model = model
delta . input ,
delta . output ,
delta . cached_input ,
0 i64 , // cache_creation_tokens: Codex 日志无此数据
input_cost ,
output_cost ,
cache_read_cost ,
cache_creation_cost ,
total_cost ,
0 i64 , // latency_ms
Option ::< i64 > ::None , // first_token_ms
200 i64 , // status_code
Option ::< String > ::None , // error_message
session_id . map ( | s | s . to_string ( ) ) ,
Some ( " codex_session " ) , // provider_type
1 i64 , // is_streaming
" 1.0 " , // cost_multiplier
created_at ,
" codex_session " , // data_source
] ,
)
. map_err ( | e | AppError ::Database ( format! ( " 插入 Codex 会话日志失败: {e} " ) ) ) ? ;
Ok ( true )
}
/// 获取文件的同步状态
fn get_sync_state ( db : & Database , file_path : & str ) -> Result < ( i64 , i64 ) , AppError > {
let conn = lock_conn! ( db . conn ) ;
let result = conn . query_row (
" SELECT last_modified, last_line_offset FROM session_log_sync WHERE file_path = ?1 " ,
rusqlite ::params! [ file_path ] ,
| row | Ok ( ( row . get ::< _ , i64 > ( 0 ) ? , row . get ::< _ , i64 > ( 1 ) ? ) ) ,
) ;
Ok ( result . unwrap_or ( ( 0 , 0 ) ) )
}
/// 更新文件的同步状态
fn update_sync_state (
db : & Database ,
file_path : & str ,
last_modified : i64 ,
last_offset : i64 ,
) -> Result < ( ) , AppError > {
let now = SystemTime ::now ( )
. duration_since ( SystemTime ::UNIX_EPOCH )
. map ( | d | d . as_secs ( ) as i64 )
. unwrap_or ( 0 ) ;
let conn = lock_conn! ( db . conn ) ;
conn . execute (
" INSERT OR REPLACE INTO session_log_sync (file_path, last_modified, last_line_offset, last_synced_at)
VALUES (?1, ?2, ?3, ?4) " ,
rusqlite ::params! [ file_path , last_modified , last_offset , now ] ,
)
. map_err ( | e | AppError ::Database ( format! ( " 更新同步状态失败: {e} " ) ) ) ? ;
Ok ( ( ) )
}
/// 查找 Codex 模型定价
fn find_codex_pricing (
conn : & rusqlite ::Connection ,
model_id : & str ,
) -> Option < ModelPricing > {
// 精确匹配
fn find_codex_pricing ( conn : & rusqlite ::Connection , model_id : & str ) -> Option < ModelPricing > {
// 精确匹� �
let result = conn . query_row (
" SELECT input_cost_per_million, output_cost_per_million,
cache_read_cost_per_million, cache_creation_cost_per_million
@@ -252,3 +565,117 @@ fn find_codex_pricing(
}
}
}
#[ cfg(test) ]
mod tests {
use super ::* ;
#[ test ]
fn test_delta_first_event ( ) {
let prev = None ;
let current = CumulativeTokens {
input : 17934 ,
cached_input : 9600 ,
output : 454 ,
} ;
let delta = compute_delta ( & prev , & current ) ;
assert_eq! ( delta . input , 17934 ) ;
assert_eq! ( delta . cached_input , 9600 ) ;
assert_eq! ( delta . output , 454 ) ;
assert! ( ! delta . is_zero ( ) ) ;
}
#[ test ]
fn test_delta_subsequent_event ( ) {
let prev = Some ( CumulativeTokens {
input : 17934 ,
cached_input : 9600 ,
output : 454 ,
} ) ;
let current = CumulativeTokens {
input : 36722 ,
cached_input : 27904 ,
output : 804 ,
} ;
let delta = compute_delta ( & prev , & current ) ;
assert_eq! ( delta . input , 36722 - 17934 ) ;
assert_eq! ( delta . cached_input , 27904 - 9600 ) ;
assert_eq! ( delta . output , 804 - 454 ) ;
}
#[ test ]
fn test_delta_zero_at_task_boundary ( ) {
let prev = Some ( CumulativeTokens {
input : 58346 ,
cached_input : 46976 ,
output : 1045 ,
} ) ;
// task 边界:相同的累计值
let current = CumulativeTokens {
input : 58346 ,
cached_input : 46976 ,
output : 1045 ,
} ;
let delta = compute_delta ( & prev , & current ) ;
assert! ( delta . is_zero ( ) ) ;
}
#[ test ]
fn test_delta_saturating_sub ( ) {
// 异常情况:当前值小于前值(不应发生,但需防护)
let prev = Some ( CumulativeTokens {
input : 100 ,
cached_input : 50 ,
output : 30 ,
} ) ;
let current = CumulativeTokens {
input : 80 ,
cached_input : 40 ,
output : 20 ,
} ;
let delta = compute_delta ( & prev , & current ) ;
assert_eq! ( delta . input , 0 ) ;
assert_eq! ( delta . cached_input , 0 ) ;
assert_eq! ( delta . output , 0 ) ;
assert! ( delta . is_zero ( ) ) ;
}
#[ test ]
fn test_parse_cumulative_tokens_valid ( ) {
let json : serde_json ::Value = serde_json ::json! ( {
" input_tokens " : 17934 ,
" cached_input_tokens " : 9600 ,
" output_tokens " : 454 ,
" reasoning_output_tokens " : 233 ,
" total_tokens " : 18388
} ) ;
let tokens = parse_cumulative_tokens ( & json ) . unwrap ( ) ;
assert_eq! ( tokens . input , 17934 ) ;
assert_eq! ( tokens . cached_input , 9600 ) ;
assert_eq! ( tokens . output , 454 ) ;
}
#[ test ]
fn test_parse_cumulative_tokens_null ( ) {
let json = serde_json ::Value ::Null ;
assert! ( parse_cumulative_tokens ( & json ) . is_none ( ) ) ;
}
#[ test ]
fn test_parse_cumulative_tokens_alt_field_names ( ) {
// 某些版本可能使用 cache_read_input_tokens 而非 cached_input_tokens
let json : serde_json ::Value = serde_json ::json! ( {
" input_tokens " : 1000 ,
" cache_read_input_tokens " : 500 ,
" output_tokens " : 200
} ) ;
let tokens = parse_cumulative_tokens ( & json ) . unwrap ( ) ;
assert_eq! ( tokens . cached_input , 500 ) ;
}
#[ test ]
fn test_collect_codex_session_files_nonexistent ( ) {
let files = collect_codex_session_files ( Path ::new ( " /nonexistent/path " ) ) ;
assert! ( files . is_empty ( ) ) ;
}
}