From 8669b408e9e9595f604143afc9d0f54709916475 Mon Sep 17 00:00:00 2001 From: Jason Date: Sun, 12 Apr 2026 00:10:01 +0800 Subject: [PATCH] fix(usage): deduplicate proxy and session log usage records Extract message_id from Claude API responses (msg_xxx) and use it to generate a shared request_id format (session:{msg_xxx}) between the proxy logger and session log sync. When session sync encounters the same request_id via INSERT OR IGNORE, it skips the duplicate. - Add message_id field to TokenUsage, extracted from Claude responses - Add TokenUsage::dedup_request_id() to generate shared request IDs - Define SESSION_REQUEST_ID_PREFIX constant to eliminate magic strings - Change proxy logger to INSERT OR REPLACE for richer-data-wins semantics --- src-tauri/src/proxy/handlers.rs | 2 +- src-tauri/src/proxy/response_processor.rs | 4 +- src-tauri/src/proxy/usage/calculator.rs | 4 ++ src-tauri/src/proxy/usage/logger.rs | 3 +- src-tauri/src/proxy/usage/parser.rs | 42 +++++++++++++++++-- src-tauri/src/services/session_usage.rs | 7 +++- src-tauri/src/services/session_usage_codex.rs | 1 + .../src/services/session_usage_gemini.rs | 1 + 8 files changed, 57 insertions(+), 7 deletions(-) diff --git a/src-tauri/src/proxy/handlers.rs b/src-tauri/src/proxy/handlers.rs index 08816246a..3a2bd4ca4 100644 --- a/src-tauri/src/proxy/handlers.rs +++ b/src-tauri/src/proxy/handlers.rs @@ -603,7 +603,7 @@ async fn log_usage( model }; - let request_id = uuid::Uuid::new_v4().to_string(); + let request_id = usage.dedup_request_id(); if let Err(e) = logger.log_with_calculation( request_id, diff --git a/src-tauri/src/proxy/response_processor.rs b/src-tauri/src/proxy/response_processor.rs index 3f4fe8425..1a185f93b 100644 --- a/src-tauri/src/proxy/response_processor.rs +++ b/src-tauri/src/proxy/response_processor.rs @@ -528,7 +528,7 @@ async fn log_usage_internal( model }; - let request_id = uuid::Uuid::new_v4().to_string(); + let request_id = usage.dedup_request_id(); log::debug!( "[{app_type}] 记录请求日志: id={request_id}, provider={provider_id}, model={model}, streaming={is_streaming}, status={status_code}, latency_ms={latency_ms}, first_token_ms={first_token_ms:?}, session={}, input={}, output={}, cache_read={}, cache_creation={}", @@ -784,6 +784,7 @@ mod tests { cache_read_tokens: 0, cache_creation_tokens: 0, model: None, + message_id: None, }; log_usage_internal( @@ -843,6 +844,7 @@ mod tests { cache_read_tokens: 0, cache_creation_tokens: 0, model: None, + message_id: None, }; log_usage_internal( diff --git a/src-tauri/src/proxy/usage/calculator.rs b/src-tauri/src/proxy/usage/calculator.rs index 340150237..6499ac0c7 100644 --- a/src-tauri/src/proxy/usage/calculator.rs +++ b/src-tauri/src/proxy/usage/calculator.rs @@ -114,6 +114,7 @@ mod tests { cache_read_tokens: 200, cache_creation_tokens: 100, model: None, + message_id: None, }; let pricing = ModelPricing::from_strings("3.0", "15.0", "0.3", "3.75").unwrap(); @@ -144,6 +145,7 @@ mod tests { cache_read_tokens: 0, cache_creation_tokens: 0, model: None, + message_id: None, }; let pricing = ModelPricing::from_strings("3.0", "15.0", "0", "0").unwrap(); @@ -165,6 +167,7 @@ mod tests { cache_read_tokens: 0, cache_creation_tokens: 0, model: None, + message_id: None, }; let multiplier = Decimal::from_str("1.0").unwrap(); @@ -181,6 +184,7 @@ mod tests { cache_read_tokens: 1, cache_creation_tokens: 1, model: None, + message_id: None, }; let pricing = ModelPricing::from_strings("0.075", "0.3", "0.01875", "0.075").unwrap(); diff --git a/src-tauri/src/proxy/usage/logger.rs b/src-tauri/src/proxy/usage/logger.rs index c1f3c3774..94f028017 100644 --- a/src-tauri/src/proxy/usage/logger.rs +++ b/src-tauri/src/proxy/usage/logger.rs @@ -73,7 +73,7 @@ impl<'a> UsageLogger<'a> { }); conn.execute( - "INSERT INTO proxy_request_logs ( + "INSERT OR REPLACE INTO proxy_request_logs ( request_id, provider_id, app_type, model, request_model, input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, input_cost_usd, output_cost_usd, cache_read_cost_usd, cache_creation_cost_usd, total_cost_usd, @@ -358,6 +358,7 @@ mod tests { cache_read_tokens: 0, cache_creation_tokens: 0, model: None, + message_id: None, }; logger.log_with_calculation( diff --git a/src-tauri/src/proxy/usage/parser.rs b/src-tauri/src/proxy/usage/parser.rs index 2541652c9..f2b797ee3 100644 --- a/src-tauri/src/proxy/usage/parser.rs +++ b/src-tauri/src/proxy/usage/parser.rs @@ -9,6 +9,9 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; +/// Session 日志 request_id 前缀,与 `session_usage.rs` 中的格式保持一致 +pub const SESSION_REQUEST_ID_PREFIX: &str = "session:"; + /// Token 使用量统计 #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct TokenUsage { @@ -18,6 +21,22 @@ pub struct TokenUsage { pub cache_creation_tokens: u32, /// 从响应中提取的实际模型名称(如果可用) pub model: Option, + /// 从响应中提取的消息 ID(用于跨源去重) + /// + /// Claude API: `msg_xxx`,与 session JSONL 中的 `message.id` 一致 + #[serde(skip)] + pub message_id: Option, +} + +impl TokenUsage { + /// 生成与 session 日志共享的 request_id,用于跨源去重。 + /// 有 message_id 时返回 `session:{id}`,否则回退到随机 UUID。 + pub fn dedup_request_id(&self) -> String { + self.message_id + .as_ref() + .map(|mid| format!("{SESSION_REQUEST_ID_PREFIX}{mid}")) + .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()) + } } /// API 类型 @@ -39,6 +58,10 @@ impl TokenUsage { .get("model") .and_then(|v| v.as_str()) .map(|s| s.to_string()); + let message_id = body + .get("id") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); Some(Self { input_tokens: usage.get("input_tokens")?.as_u64()? as u32, @@ -52,6 +75,7 @@ impl TokenUsage { .and_then(|v| v.as_u64()) .unwrap_or(0) as u32, model, + message_id, }) } @@ -60,18 +84,23 @@ impl TokenUsage { pub fn from_claude_stream_events(events: &[Value]) -> Option { let mut usage = Self::default(); let mut model: Option = None; + let mut message_id: Option = None; for event in events { if let Some(event_type) = event.get("type").and_then(|v| v.as_str()) { match event_type { "message_start" => { - // 从 message_start 提取模型名称 - if model.is_none() { - if let Some(message) = event.get("message") { + if let Some(message) = event.get("message") { + if model.is_none() { if let Some(m) = message.get("model").and_then(|v| v.as_str()) { model = Some(m.to_string()); } } + if message_id.is_none() { + if let Some(id) = message.get("id").and_then(|v| v.as_str()) { + message_id = Some(id.to_string()); + } + } } if let Some(msg_usage) = event.get("message").and_then(|m| m.get("usage")) { // 从 message_start 获取 input_tokens(原生 Claude API) @@ -137,6 +166,7 @@ impl TokenUsage { if usage.input_tokens > 0 || usage.output_tokens > 0 { usage.model = model; + usage.message_id = message_id; Some(usage) } else { None @@ -153,6 +183,7 @@ impl TokenUsage { cache_read_tokens: 0, cache_creation_tokens: 0, model: None, + message_id: None, }) } @@ -202,6 +233,7 @@ impl TokenUsage { .and_then(|v| v.as_u64()) .unwrap_or(0) as u32, model, + message_id: None, }) } @@ -245,6 +277,7 @@ impl TokenUsage { .and_then(|v| v.as_u64()) .unwrap_or(0) as u32, model, + message_id: None, }) } @@ -339,6 +372,7 @@ impl TokenUsage { cache_read_tokens: cached_tokens, cache_creation_tokens: 0, model, + message_id: None, }) } @@ -383,6 +417,7 @@ impl TokenUsage { .unwrap_or(0) as u32, cache_creation_tokens: 0, model, + message_id: None, }) } @@ -433,6 +468,7 @@ impl TokenUsage { cache_read_tokens: total_cache_read, cache_creation_tokens: 0, model, + message_id: None, }) } else { None diff --git a/src-tauri/src/services/session_usage.rs b/src-tauri/src/services/session_usage.rs index d821f35d2..2b9c5af9f 100644 --- a/src-tauri/src/services/session_usage.rs +++ b/src-tauri/src/services/session_usage.rs @@ -278,7 +278,11 @@ fn sync_single_file(db: &Database, file_path: &Path) -> Result<(u32, u32), AppEr continue; } - let request_id = format!("session:{}", msg.message_id); + let request_id = format!( + "{}{}", + crate::proxy::usage::parser::SESSION_REQUEST_ID_PREFIX, + msg.message_id + ); // 跳过 output_tokens 为 0 的无意义条目 if msg.output_tokens == 0 { @@ -379,6 +383,7 @@ fn insert_session_log_entry( cache_read_tokens: msg.cache_read_tokens, cache_creation_tokens: msg.cache_creation_tokens, model: Some(msg.model.clone()), + message_id: None, }; let pricing = find_model_pricing_for_session(&conn, &msg.model); diff --git a/src-tauri/src/services/session_usage_codex.rs b/src-tauri/src/services/session_usage_codex.rs index 13dff44b0..bd92e2ea1 100644 --- a/src-tauri/src/services/session_usage_codex.rs +++ b/src-tauri/src/services/session_usage_codex.rs @@ -474,6 +474,7 @@ fn insert_codex_session_entry( cache_read_tokens: delta.cached_input, cache_creation_tokens: 0, model: Some(model.to_string()), + message_id: None, }; let pricing = find_codex_pricing(&conn, model); diff --git a/src-tauri/src/services/session_usage_gemini.rs b/src-tauri/src/services/session_usage_gemini.rs index 8bac1fa4f..24ff25c37 100644 --- a/src-tauri/src/services/session_usage_gemini.rs +++ b/src-tauri/src/services/session_usage_gemini.rs @@ -261,6 +261,7 @@ fn insert_gemini_session_entry( cache_read_tokens: tokens.cached, cache_creation_tokens: 0, model: Some(model.to_string()), + message_id: None, }; let pricing = find_gemini_pricing(&conn, model);