fix(usage): deduplicate proxy and session log usage records

Extract message_id from Claude API responses (msg_xxx) and use it to
generate a shared request_id format (session:{msg_xxx}) between the
proxy logger and session log sync. When session sync later tries to insert a
row with the same request_id, its INSERT OR IGNORE skips the duplicate.

- Add message_id field to TokenUsage, extracted from Claude responses
- Add TokenUsage::dedup_request_id() to generate shared request IDs
- Define SESSION_REQUEST_ID_PREFIX constant to eliminate magic strings
- Change proxy logger to INSERT OR REPLACE for richer-data-wins semantics
This commit is contained in:
Jason
2026-04-12 00:10:01 +08:00
parent bb7c83c214
commit 8669b408e9
8 changed files with 57 additions and 7 deletions
+1 -1
View File
@@ -603,7 +603,7 @@ async fn log_usage(
model
};
let request_id = uuid::Uuid::new_v4().to_string();
let request_id = usage.dedup_request_id();
if let Err(e) = logger.log_with_calculation(
request_id,
+3 -1
View File
@@ -528,7 +528,7 @@ async fn log_usage_internal(
model
};
let request_id = uuid::Uuid::new_v4().to_string();
let request_id = usage.dedup_request_id();
log::debug!(
"[{app_type}] 记录请求日志: id={request_id}, provider={provider_id}, model={model}, streaming={is_streaming}, status={status_code}, latency_ms={latency_ms}, first_token_ms={first_token_ms:?}, session={}, input={}, output={}, cache_read={}, cache_creation={}",
@@ -784,6 +784,7 @@ mod tests {
cache_read_tokens: 0,
cache_creation_tokens: 0,
model: None,
message_id: None,
};
log_usage_internal(
@@ -843,6 +844,7 @@ mod tests {
cache_read_tokens: 0,
cache_creation_tokens: 0,
model: None,
message_id: None,
};
log_usage_internal(
+4
View File
@@ -114,6 +114,7 @@ mod tests {
cache_read_tokens: 200,
cache_creation_tokens: 100,
model: None,
message_id: None,
};
let pricing = ModelPricing::from_strings("3.0", "15.0", "0.3", "3.75").unwrap();
@@ -144,6 +145,7 @@ mod tests {
cache_read_tokens: 0,
cache_creation_tokens: 0,
model: None,
message_id: None,
};
let pricing = ModelPricing::from_strings("3.0", "15.0", "0", "0").unwrap();
@@ -165,6 +167,7 @@ mod tests {
cache_read_tokens: 0,
cache_creation_tokens: 0,
model: None,
message_id: None,
};
let multiplier = Decimal::from_str("1.0").unwrap();
@@ -181,6 +184,7 @@ mod tests {
cache_read_tokens: 1,
cache_creation_tokens: 1,
model: None,
message_id: None,
};
let pricing = ModelPricing::from_strings("0.075", "0.3", "0.01875", "0.075").unwrap();
+2 -1
View File
@@ -73,7 +73,7 @@ impl<'a> UsageLogger<'a> {
});
conn.execute(
"INSERT INTO proxy_request_logs (
"INSERT OR REPLACE INTO proxy_request_logs (
request_id, provider_id, app_type, model, request_model,
input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens,
input_cost_usd, output_cost_usd, cache_read_cost_usd, cache_creation_cost_usd, total_cost_usd,
@@ -358,6 +358,7 @@ mod tests {
cache_read_tokens: 0,
cache_creation_tokens: 0,
model: None,
message_id: None,
};
logger.log_with_calculation(
+39 -3
View File
@@ -9,6 +9,9 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
/// Prefix of the `request_id` used for session-log records; must stay consistent with the format used in `session_usage.rs`.
pub const SESSION_REQUEST_ID_PREFIX: &str = "session:";
/// Token 使用量统计
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TokenUsage {
@@ -18,6 +21,22 @@ pub struct TokenUsage {
pub cache_creation_tokens: u32,
/// 从响应中提取的实际模型名称(如果可用)
pub model: Option<String>,
/// 从响应中提取的消息 ID(用于跨源去重)
///
/// Claude API: `msg_xxx`,与 session JSONL 中的 `message.id` 一致
#[serde(skip)]
pub message_id: Option<String>,
}
impl TokenUsage {
    /// Builds the `request_id` shared with the session-log sync, used for
    /// cross-source deduplication.
    ///
    /// Returns `session:{message_id}` when a non-blank `message_id` is present;
    /// otherwise falls back to a freshly generated random UUID (v4).
    ///
    /// A blank (empty or whitespace-only) `message_id` is treated as missing:
    /// every such response would otherwise map to the same bare-prefix key and
    /// collide with one another instead of being recorded individually.
    pub fn dedup_request_id(&self) -> String {
        match self.message_id.as_deref() {
            Some(mid) if !mid.trim().is_empty() => {
                format!("{SESSION_REQUEST_ID_PREFIX}{mid}")
            }
            // No usable upstream id: nothing to deduplicate against, so use a unique key.
            _ => uuid::Uuid::new_v4().to_string(),
        }
    }
}
/// API 类型
@@ -39,6 +58,10 @@ impl TokenUsage {
.get("model")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let message_id = body
.get("id")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
Some(Self {
input_tokens: usage.get("input_tokens")?.as_u64()? as u32,
@@ -52,6 +75,7 @@ impl TokenUsage {
.and_then(|v| v.as_u64())
.unwrap_or(0) as u32,
model,
message_id,
})
}
@@ -60,18 +84,23 @@ impl TokenUsage {
pub fn from_claude_stream_events(events: &[Value]) -> Option<Self> {
let mut usage = Self::default();
let mut model: Option<String> = None;
let mut message_id: Option<String> = None;
for event in events {
if let Some(event_type) = event.get("type").and_then(|v| v.as_str()) {
match event_type {
"message_start" => {
// 从 message_start 提取模型名称
if model.is_none() {
if let Some(message) = event.get("message") {
if let Some(message) = event.get("message") {
if model.is_none() {
if let Some(m) = message.get("model").and_then(|v| v.as_str()) {
model = Some(m.to_string());
}
}
if message_id.is_none() {
if let Some(id) = message.get("id").and_then(|v| v.as_str()) {
message_id = Some(id.to_string());
}
}
}
if let Some(msg_usage) = event.get("message").and_then(|m| m.get("usage")) {
// 从 message_start 获取 input_tokens(原生 Claude API
@@ -137,6 +166,7 @@ impl TokenUsage {
if usage.input_tokens > 0 || usage.output_tokens > 0 {
usage.model = model;
usage.message_id = message_id;
Some(usage)
} else {
None
@@ -153,6 +183,7 @@ impl TokenUsage {
cache_read_tokens: 0,
cache_creation_tokens: 0,
model: None,
message_id: None,
})
}
@@ -202,6 +233,7 @@ impl TokenUsage {
.and_then(|v| v.as_u64())
.unwrap_or(0) as u32,
model,
message_id: None,
})
}
@@ -245,6 +277,7 @@ impl TokenUsage {
.and_then(|v| v.as_u64())
.unwrap_or(0) as u32,
model,
message_id: None,
})
}
@@ -339,6 +372,7 @@ impl TokenUsage {
cache_read_tokens: cached_tokens,
cache_creation_tokens: 0,
model,
message_id: None,
})
}
@@ -383,6 +417,7 @@ impl TokenUsage {
.unwrap_or(0) as u32,
cache_creation_tokens: 0,
model,
message_id: None,
})
}
@@ -433,6 +468,7 @@ impl TokenUsage {
cache_read_tokens: total_cache_read,
cache_creation_tokens: 0,
model,
message_id: None,
})
} else {
None
+6 -1
View File
@@ -278,7 +278,11 @@ fn sync_single_file(db: &Database, file_path: &Path) -> Result<(u32, u32), AppEr
continue;
}
let request_id = format!("session:{}", msg.message_id);
let request_id = format!(
"{}{}",
crate::proxy::usage::parser::SESSION_REQUEST_ID_PREFIX,
msg.message_id
);
// 跳过 output_tokens 为 0 的无意义条目
if msg.output_tokens == 0 {
@@ -379,6 +383,7 @@ fn insert_session_log_entry(
cache_read_tokens: msg.cache_read_tokens,
cache_creation_tokens: msg.cache_creation_tokens,
model: Some(msg.model.clone()),
message_id: None,
};
let pricing = find_model_pricing_for_session(&conn, &msg.model);
@@ -474,6 +474,7 @@ fn insert_codex_session_entry(
cache_read_tokens: delta.cached_input,
cache_creation_tokens: 0,
model: Some(model.to_string()),
message_id: None,
};
let pricing = find_codex_pricing(&conn, model);
@@ -261,6 +261,7 @@ fn insert_gemini_session_entry(
cache_read_tokens: tokens.cached,
cache_creation_tokens: 0,
model: Some(model.to_string()),
message_id: None,
};
let pricing = find_gemini_pricing(&conn, model);