fix(proxy): improve cache hit rate for Codex/Responses requests

prompt_cache_key was falling back to provider.id when the client did not
supply a session, which collapsed every conversation onto a single key
and defeated upstream prefix caching. Only emit the key when a real
client-provided session/thread identity is available; otherwise let the
upstream use its default matching behaviour.

Additional fixes that affect cache stability:
- Canonicalise (sort) JSON keys in outgoing request bodies and in
  tool_call arguments / tool_result content so semantically identical
  requests produce identical byte sequences for upstream prefix caches.
- Exempt JSON Schema property maps (properties, patternProperties,
  definitions, $defs) from the underscore-prefix filter so user-defined
  schema keys like _id and _meta survive.
- Add a [CacheTrace] debug log with stable hashes for instructions,
  tools, input and include to help diagnose cache misses.
- Thread session_id into the usage logger for request correlation.
This commit is contained in:
Jason
2026-05-11 08:41:32 +08:00
parent aec055a1d1
commit 00a789e7a3
9 changed files with 392 additions and 31 deletions
+2 -2
View File
@@ -309,8 +309,8 @@ pub struct ProviderMeta {
pub is_full_url: Option<bool>,
/// Prompt cache key for OpenAI Responses-compatible endpoints.
/// When set, injected into converted Responses requests to improve cache hit rate.
/// If not set, Codex OAuth uses the current session ID; other Claude -> Responses
/// conversions fall back to provider ID.
/// If not set, Claude -> Responses conversions use a client-provided session/thread
/// identity when available; generated session IDs are not sent upstream.
#[serde(rename = "promptCacheKey", skip_serializing_if = "Option::is_none")]
pub prompt_cache_key: Option<String>,
/// Codex OAuth FAST mode: inject `service_tier = "priority"` for ChatGPT Codex requests.
+49 -7
View File
@@ -6,6 +6,8 @@
//! - 以 `_` 开头的字段被视为私有参数,会被递归过滤
//! - 支持白名单机制,允许透传特定的 `_` 前缀字段
//! - 支持嵌套对象和数组的深度过滤
//! - JSON Schema 的 properties / patternProperties / definitions / $defs 名称
//! 是用户定义的字段名,不按私有参数过滤
//!
//! ## 使用场景
//! - `_internal_id`: 内部追踪 ID
@@ -65,29 +67,35 @@ pub fn filter_private_params(body: Value) -> Value {
/// ```
pub fn filter_private_params_with_whitelist(body: Value, whitelist: &[String]) -> Value {
let whitelist_set: HashSet<&str> = whitelist.iter().map(|s| s.as_str()).collect();
filter_recursive_with_whitelist(body, &mut Vec::new(), &whitelist_set)
filter_recursive_with_whitelist(body, &mut Vec::new(), &mut Vec::new(), &whitelist_set)
}
/// 递归过滤实现(支持白名单)
fn filter_recursive_with_whitelist(
value: Value,
path: &mut Vec<String>,
removed_keys: &mut Vec<String>,
whitelist: &HashSet<&str>,
) -> Value {
match value {
Value::Object(map) => {
let is_schema_name_map = path.last().is_some_and(|key| matches_schema_name_map(key));
let filtered: serde_json::Map<String, Value> = map
.into_iter()
.filter_map(|(key, val)| {
// 以 _ 开头且不在白名单中的字段被过滤
if key.starts_with('_') && !whitelist.contains(key.as_str()) {
if key.starts_with('_')
&& !whitelist.contains(key.as_str())
&& !is_schema_name_map
{
removed_keys.push(key);
None
} else {
Some((
key,
filter_recursive_with_whitelist(val, removed_keys, whitelist),
))
path.push(key.clone());
let filtered_value =
filter_recursive_with_whitelist(val, path, removed_keys, whitelist);
path.pop();
Some((key, filtered_value))
}
})
.collect();
@@ -102,13 +110,20 @@ fn filter_recursive_with_whitelist(
}
Value::Array(arr) => Value::Array(
arr.into_iter()
.map(|v| filter_recursive_with_whitelist(v, removed_keys, whitelist))
.map(|v| filter_recursive_with_whitelist(v, path, removed_keys, whitelist))
.collect(),
),
other => other,
}
}
/// Reports whether `key` is one of the JSON Schema keywords whose value is a map keyed by
/// user-chosen names: `properties`, `patternProperties`, `definitions` or `$defs`.
///
/// The comparison is exact and case-sensitive.
fn matches_schema_name_map(key: &str) -> bool {
    const SCHEMA_NAME_MAP_KEYWORDS: [&str; 4] =
        ["properties", "patternProperties", "definitions", "$defs"];
    SCHEMA_NAME_MAP_KEYWORDS.contains(&key)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -282,6 +297,33 @@ mod tests {
assert!(data.get("normal").is_some());
}
/// Keys directly inside a schema `properties` map are kept even when they start with `_`,
/// while `_`-prefixed keys elsewhere in the schema are still removed.
#[test]
fn test_preserves_json_schema_property_names_with_underscore() {
    let request = json!({
        "tools": [
            {
                "name": "lookup",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "_id": {"type": "string", "_internal_note": "remove"},
                        "_meta": {"type": "object"}
                    },
                    "_private_schema_note": "remove"
                }
            }
        ]
    });

    let filtered = filter_private_params(request);
    let schema = &filtered["tools"][0]["input_schema"];
    let properties = &schema["properties"];

    // Property names survive the filter...
    assert!(properties.get("_id").is_some());
    assert!(properties.get("_meta").is_some());
    // ...but private keys inside a property schema, or on the schema itself, do not.
    assert!(properties["_id"].get("_internal_note").is_none());
    assert!(schema.get("_private_schema_note").is_none());
}
#[test]
fn test_empty_whitelist_same_as_default() {
let input = json!({
+149 -1
View File
@@ -7,6 +7,7 @@ use super::{
body_filter::filter_private_params_with_whitelist,
error::*,
failover_switch::FailoverSwitchManager,
json_canonical::{canonicalize_value, short_value_hash},
log_codes::fwd as log_fwd,
provider_router::ProviderRouter,
providers::{
@@ -1008,7 +1009,15 @@ impl RequestForwarder {
// 过滤私有参数(以 `_` 开头的字段),防止内部信息泄露到上游
// 默认使用空白名单,过滤所有 _ 前缀字段
let filtered_body = filter_private_params_with_whitelist(request_body, &[]);
let filtered_body = prepare_upstream_request_body(request_body);
log_prompt_cache_trace(
app_type,
provider,
&effective_endpoint,
resolved_claude_api_format.as_deref(),
&filtered_body,
self.session_client_provided,
);
let force_identity_encoding = needs_transform
|| should_force_identity_encoding(&effective_endpoint, &filtered_body, headers);
@@ -1976,6 +1985,65 @@ fn summarize_text_for_log(text: &str, max_chars: usize) -> String {
format!("{truncated}...")
}
/// Produces the body forwarded upstream: private-parameter filtering is applied with an
/// empty whitelist, and the filtered value is then passed through `canonicalize_value`.
fn prepare_upstream_request_body(request_body: Value) -> Value {
    let without_private_fields = filter_private_params_with_whitelist(request_body, &[]);
    canonicalize_value(without_private_fields)
}
fn log_prompt_cache_trace(
app_type: &AppType,
provider: &Provider,
endpoint: &str,
api_format: Option<&str>,
body: &Value,
session_client_provided: bool,
) {
if !log::log_enabled!(log::Level::Debug) {
return;
}
let prompt_cache_key = body
.get("prompt_cache_key")
.and_then(|value| value.as_str())
.map(|key| format!("present(len={})", key.len()))
.unwrap_or_else(|| "absent".to_string());
let store = body
.get("store")
.map(value_for_log)
.unwrap_or_else(|| "absent".to_string());
let stream = body
.get("stream")
.map(value_for_log)
.unwrap_or_else(|| "absent".to_string());
log::debug!(
"[CacheTrace] app={}, provider={}, endpoint={}, api_format={}, session_client_provided={}, prompt_cache_key={}, store={}, stream={}, instructions_hash={}, tools_hash={}, input_hash={}, include_hash={}, body_hash={}",
app_type.as_str(),
provider.id,
endpoint,
api_format.unwrap_or("native"),
session_client_provided,
prompt_cache_key,
store,
stream,
short_value_hash(body.get("instructions")),
short_value_hash(body.get("tools")),
short_value_hash(body.get("input")),
short_value_hash(body.get("include")),
short_value_hash(Some(body)),
);
}
/// Renders a JSON value compactly for log output.
///
/// Scalars are printed as-is (strings without surrounding quotes); arrays and objects are
/// reduced to their element count so large payloads never reach the log.
fn value_for_log(value: &Value) -> String {
    match value {
        Value::Null => String::from("null"),
        Value::Bool(flag) => flag.to_string(),
        Value::Number(number) => number.to_string(),
        Value::String(text) => text.to_owned(),
        Value::Array(items) => format!("array(len={})", items.len()),
        Value::Object(fields) => format!("object(len={})", fields.len()),
    }
}
#[cfg(test)]
mod tests {
use super::*;
@@ -2068,6 +2136,86 @@ mod tests {
assert_eq!(summary, "line1 line2...");
}
/// Two tool definitions that differ only in object key order must yield the same
/// canonical string and the same short hash.
#[test]
fn canonical_json_sorts_object_keys_for_cache_trace_hashes() {
    let shuffled = json!({
        "tools": [
            {
                "parameters": {
                    "properties": {
                        "b": {"type": "string"},
                        "a": {"type": "number"}
                    },
                    "type": "object"
                },
                "name": "lookup"
            }
        ]
    });
    let ordered = json!({
        "tools": [
            {
                "name": "lookup",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "a": {"type": "number"},
                        "b": {"type": "string"}
                    }
                }
            }
        ]
    });

    let shuffled_text = crate::proxy::json_canonical::canonical_json_string(&shuffled);
    let ordered_text = crate::proxy::json_canonical::canonical_json_string(&ordered);
    assert_eq!(shuffled_text, ordered_text);

    let shuffled_hash = short_value_hash(Some(&shuffled));
    let ordered_hash = short_value_hash(Some(&ordered));
    assert_eq!(shuffled_hash, ordered_hash);
}
/// The prepared body drops `_`-prefixed private fields, keeps `_`-prefixed schema property
/// names, and serializes with keys in sorted order.
#[test]
fn prepare_upstream_request_body_filters_private_fields_and_canonicalizes_order() {
    let raw_request = json!({
        "z": 1,
        "_internal": "drop",
        "tools": [
            {
                "name": "lookup",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "_id": {
                            "_private_note": "drop",
                            "type": "string"
                        },
                        "b": {"type": "number"},
                        "a": {"type": "string"}
                    }
                }
            }
        ],
        "a": 2
    });

    let upstream_body = prepare_upstream_request_body(raw_request);
    let properties = &upstream_body["tools"][0]["parameters"]["properties"];

    assert!(upstream_body.get("_internal").is_none());
    assert!(properties.get("_id").is_some());
    assert!(properties["_id"].get("_private_note").is_none());

    let serialized = serde_json::to_string(&upstream_body).unwrap();
    assert_eq!(
        serialized,
        r#"{"a":2,"tools":[{"name":"lookup","parameters":{"properties":{"_id":{"type":"string"},"a":{"type":"string"},"b":{"type":"number"}},"type":"object"}}],"z":1}"#
    );
}
#[test]
fn codex_oauth_session_headers_match_codex_cache_identity() {
let headers = build_codex_oauth_session_headers("session-123");
+7 -1
View File
@@ -277,6 +277,7 @@ async fn handle_claude_transform(
let model = ctx.request_model.clone();
let status_code = status.as_u16();
let start_time = ctx.start_time;
let session_id = ctx.session_id.clone();
SseUsageCollector::new(start_time, move |events, first_token_ms| {
if let Some(usage) = TokenUsage::from_claude_stream_events(&events) {
@@ -284,6 +285,7 @@ async fn handle_claude_transform(
let state = state.clone();
let provider_id = provider_id.clone();
let model = model.clone();
let session_id = session_id.clone();
tokio::spawn(async move {
log_usage(
@@ -297,6 +299,7 @@ async fn handle_claude_transform(
first_token_ms,
true,
status_code,
Some(session_id),
)
.await;
});
@@ -383,6 +386,7 @@ async fn handle_claude_transform(
let state = state.clone();
let provider_id = ctx.provider.id.clone();
let model = model.to_string();
let session_id = ctx.session_id.clone();
async move {
log_usage(
&state,
@@ -395,6 +399,7 @@ async fn handle_claude_transform(
None,
false,
status.as_u16(),
Some(session_id),
)
.await;
}
@@ -789,6 +794,7 @@ async fn log_usage(
first_token_ms: Option<u64>,
is_streaming: bool,
status_code: u16,
session_id: Option<String>,
) {
use super::usage::logger::UsageLogger;
@@ -816,7 +822,7 @@ async fn log_usage(
latency_ms,
first_token_ms,
status_code,
None,
session_id,
None, // provider_type
is_streaming,
) {
+102
View File
@@ -0,0 +1,102 @@
//! Stable JSON helpers for cache-sensitive request bodies.
use serde_json::Value;
use sha2::{Digest, Sha256};
/// Returns `value` with the keys of every object, at every depth, arranged in ascending
/// string order. Array element order and scalar values are left untouched.
pub(crate) fn canonicalize_value(value: Value) -> Value {
    match value {
        Value::Object(map) => {
            let mut pairs: Vec<(String, Value)> = map.into_iter().collect();
            pairs.sort_by(|a, b| a.0.cmp(&b.0));
            // Re-insert in sorted order so the resulting map iterates in sorted order.
            let ordered: serde_json::Map<String, Value> = pairs
                .into_iter()
                .map(|(name, child)| (name, canonicalize_value(child)))
                .collect();
            Value::Object(ordered)
        }
        Value::Array(items) => Value::Array(items.into_iter().map(canonicalize_value).collect()),
        scalar => scalar,
    }
}
/// Serializes `value` as compact JSON with every object's keys in ascending string order.
///
/// Values that differ only in object key order produce the same string. Array order is
/// preserved and no whitespace is emitted.
pub(crate) fn canonical_json_string(value: &Value) -> String {
    let mut out = String::new();
    write_canonical_json(value, &mut out);
    out
}

/// Appends the canonical JSON text of `value` to `out`.
///
/// Writing into one shared buffer avoids building and re-copying an intermediate
/// `String` at every nesting level.
fn write_canonical_json(value: &Value, out: &mut String) {
    match value {
        Value::Null => out.push_str("null"),
        Value::Bool(flag) => out.push_str(if *flag { "true" } else { "false" }),
        Value::Number(number) => out.push_str(&number.to_string()),
        Value::String(text) => out.push_str(
            &serde_json::to_string(text)
                .expect("serializing a JSON string for canonical output should not fail"),
        ),
        Value::Array(items) => {
            out.push('[');
            for (index, item) in items.iter().enumerate() {
                if index > 0 {
                    out.push(',');
                }
                write_canonical_json(item, out);
            }
            out.push(']');
        }
        Value::Object(map) => {
            let mut entries = map.iter().collect::<Vec<_>>();
            // Keys are unique within an object, so an unstable sort yields the same order.
            entries.sort_unstable_by_key(|(key, _)| *key);
            out.push('{');
            for (index, (key, child)) in entries.into_iter().enumerate() {
                if index > 0 {
                    out.push(',');
                }
                // Go through the JSON string serializer so the key is quoted and escaped.
                out.push_str(&serde_json::to_string(key).expect(
                    "serializing a JSON object key for canonical output should not fail",
                ));
                out.push(':');
                write_canonical_json(child, out);
            }
            out.push('}');
        }
    }
}
/// Returns a short hash of the canonical JSON form of `value`, or the literal `"absent"`
/// when no value is supplied.
pub(crate) fn short_value_hash(value: Option<&Value>) -> String {
    match value {
        Some(present) => short_sha256_hex(canonical_json_string(present).as_bytes()),
        None => "absent".to_string(),
    }
}
/// Hashes `bytes` with SHA-256 and returns the first 8 digest bytes as 16 lowercase
/// hexadecimal characters.
pub(crate) fn short_sha256_hex(bytes: &[u8]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let digest = Sha256::digest(bytes);
    let mut hex = String::with_capacity(16);
    for &byte in digest.iter().take(8) {
        // High nibble first, then low nibble, matching `{:02x}` formatting.
        hex.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        hex.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    hex
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Nested objects, including objects inside arrays, canonicalize to the same string
    /// and the same short hash regardless of key order.
    #[test]
    fn canonical_json_string_sorts_nested_object_keys() {
        let shuffled = json!({
            "b": 2,
            "a": {
                "d": true,
                "c": [3, {"z": 1, "y": 2}]
            }
        });
        let ordered = json!({
            "a": {
                "c": [3, {"y": 2, "z": 1}],
                "d": true
            },
            "b": 2
        });

        let shuffled_text = canonical_json_string(&shuffled);
        let ordered_text = canonical_json_string(&ordered);
        assert_eq!(shuffled_text, ordered_text);

        let shuffled_hash = short_value_hash(Some(&shuffled));
        let ordered_hash = short_value_hash(Some(&ordered));
        assert_eq!(shuffled_hash, ordered_hash);
    }

    /// After canonicalization the default serializer emits keys in sorted order.
    #[test]
    fn canonicalize_value_sorts_map_storage_order() {
        let canonical = canonicalize_value(json!({"b": 2, "a": 1}));
        let serialized = serde_json::to_string(&canonical).unwrap();
        assert_eq!(serialized, r#"{"a":1,"b":2}"#);
    }
}
+1
View File
@@ -17,6 +17,7 @@ mod handlers;
mod health;
pub mod http_client;
pub mod hyper_client;
pub(crate) mod json_canonical;
pub mod log_codes;
pub mod model_mapper;
pub mod provider_router;
+76 -14
View File
@@ -154,37 +154,37 @@ pub fn transform_claude_request_for_api_format(
.filter(|s| !s.is_empty())
.map(|s| s.to_string())
})
} else if is_codex_oauth {
} else {
session_id
.map(str::trim)
.filter(|s| !s.is_empty())
.map(ToString::to_string)
} else {
None
};
let explicit_cache_key = provider
.meta
.as_ref()
.and_then(|m| m.prompt_cache_key.as_deref());
let cache_key = if is_codex_oauth {
explicit_cache_key
.or(session_cache_key.as_deref())
.unwrap_or(&provider.id)
let (cache_key, cache_key_source) = if let Some(key) = explicit_cache_key {
(Some(key), "explicit")
} else if let Some(key) = session_cache_key.as_deref() {
(Some(key), "session")
} else {
session_cache_key
.as_deref()
.or(explicit_cache_key)
.unwrap_or(&provider.id)
(None, "none")
};
match api_format {
"openai_responses" => {
log::debug!(
"[Cache] OpenAI Responses prompt_cache_key source={cache_key_source}, provider={}, codex_oauth={is_codex_oauth}, has_key={}",
provider.id,
cache_key.is_some()
);
// Codex OAuth (ChatGPT Plus/Pro 反代) 需要在请求体里强制 store: false
// + include: ["reasoning.encrypted_content"],由 transform 层统一处理。
let codex_fast_mode = provider.codex_fast_mode_enabled();
super::transform_responses::anthropic_to_responses(
body,
Some(cache_key),
cache_key,
is_codex_oauth,
codex_fast_mode,
)
@@ -1445,7 +1445,7 @@ mod tests {
}
#[test]
fn test_transform_claude_request_for_codex_oauth_without_session_falls_back_to_provider_id() {
fn test_transform_claude_request_for_codex_oauth_without_session_omits_cache_key() {
let provider = create_provider_with_meta(
json!({
"env": {
@@ -1473,7 +1473,69 @@ mod tests {
)
.unwrap();
assert_eq!(transformed["prompt_cache_key"], provider.id);
assert!(transformed.get("prompt_cache_key").is_none());
}
/// With no explicit `prompt_cache_key` on the provider, a supplied session id is used as
/// the `prompt_cache_key` of the converted Responses request.
#[test]
fn test_transform_claude_request_for_responses_uses_session_cache_key() {
    let settings = json!({
        "env": {
            "ANTHROPIC_BASE_URL": "https://api.openai.example.com"
        }
    });
    let meta = ProviderMeta {
        api_format: Some("openai_responses".to_string()),
        ..ProviderMeta::default()
    };
    let provider = create_provider_with_meta(settings, meta);

    let claude_request = json!({
        "model": "gpt-5.4",
        "messages": [{ "role": "user", "content": "hello" }],
        "max_tokens": 128
    });

    let responses_request = transform_claude_request_for_api_format(
        claude_request,
        &provider,
        "openai_responses",
        Some("claude-session-123"),
        None,
    )
    .unwrap();

    assert_eq!(responses_request["prompt_cache_key"], "claude-session-123");
}
/// With neither an explicit `prompt_cache_key` nor a session id, the converted Responses
/// request carries no `prompt_cache_key` field at all.
#[test]
fn test_transform_claude_request_for_responses_without_session_omits_cache_key() {
    let settings = json!({
        "env": {
            "ANTHROPIC_BASE_URL": "https://api.openai.example.com"
        }
    });
    let meta = ProviderMeta {
        api_format: Some("openai_responses".to_string()),
        ..ProviderMeta::default()
    };
    let provider = create_provider_with_meta(settings, meta);

    let claude_request = json!({
        "model": "gpt-5.4",
        "messages": [{ "role": "user", "content": "hello" }],
        "max_tokens": 128
    });

    let responses_request = transform_claude_request_for_api_format(
        claude_request,
        &provider,
        "openai_responses",
        None,
        None,
    )
    .unwrap();

    assert!(responses_request.get("prompt_cache_key").is_none());
}
#[test]
+3 -3
View File
@@ -3,7 +3,7 @@
//! 实现 Anthropic ↔ OpenAI 格式转换,用于 OpenRouter 支持
//! 参考: anthropic-proxy-rs
use crate::proxy::error::ProxyError;
use crate::proxy::{error::ProxyError, json_canonical::canonical_json_string};
use serde_json::{json, Value};
const ANTHROPIC_BILLING_HEADER_PREFIX: &str = "x-anthropic-billing-header:";
@@ -371,7 +371,7 @@ fn convert_message_to_openai(
"type": "function",
"function": {
"name": name,
"arguments": serde_json::to_string(&input).unwrap_or_default()
"arguments": canonical_json_string(&input)
}
}));
}
@@ -384,7 +384,7 @@ fn convert_message_to_openai(
let content_val = block.get("content");
let content_str = match content_val {
Some(Value::String(s)) => s.clone(),
Some(v) => serde_json::to_string(v).unwrap_or_default(),
Some(v) => canonical_json_string(v),
None => String::new(),
};
result.push(json!({
@@ -8,7 +8,7 @@
//! - system prompt 使用 `instructions` 字段而非 system role message
//! - usage 字段命名与 Anthropic 一致 (input_tokens/output_tokens)
use crate::proxy::error::ProxyError;
use crate::proxy::{error::ProxyError, json_canonical::canonical_json_string};
use serde_json::{json, Value};
pub(crate) fn sanitize_anthropic_tool_use_input(name: &str, input: Value) -> Value {
@@ -441,7 +441,7 @@ fn convert_messages_to_input(messages: &[Value]) -> Result<Vec<Value>, ProxyErro
"type": "function_call",
"call_id": id,
"name": name,
"arguments": serde_json::to_string(&arguments).unwrap_or_default()
"arguments": canonical_json_string(&arguments)
}));
}
@@ -462,7 +462,7 @@ fn convert_messages_to_input(messages: &[Value]) -> Result<Vec<Value>, ProxyErro
.unwrap_or("");
let output = match block.get("content") {
Some(Value::String(s)) => s.clone(),
Some(v) => serde_json::to_string(v).unwrap_or_default(),
Some(v) => canonical_json_string(v),
None => String::new(),
};