[codex] Stabilize Codex OAuth cache routing (#2218)

* Stabilize Codex OAuth cache routing

Codex OAuth-backed Claude proxy requests now reuse a client-provided session identity for prompt cache routing and send Codex-like session headers when that identity exists. Generated proxy UUIDs are intentionally excluded so they do not fragment cache locality.\n\nThe same path exposed two runtime issues during validation: rustls needed an explicit process crypto provider, and Codex OAuth can return Responses SSE even when the original Claude request is non-streaming. Those are handled so cache-routed requests can complete instead of panicking or being parsed as JSON.\n\nConstraint: Official Codex uses conversation identity and Responses session headers for prompt cache routing.\nRejected: Always use generated proxy session IDs | generated IDs change per request and reduce cache reuse.\nConfidence: medium\nScope-risk: moderate\nDirective: Do not remove the client-provided-session guard unless generated session IDs become stable per conversation.\nTested: cargo test codex_oauth\nTested: Local dev app health check on 127.0.0.1:15721\nTested: Local proxy logs showed cache_read_tokens after restart\nNot-tested: Full cargo test without local cc-switch port conflict\nRelated: #2217

* feat(proxy): aggregate forced Codex OAuth SSE into JSON for non-streaming clients

Narrow override on top of #2235's streaming fallback.

Codex OAuth always forces upstream openai_responses into SSE, even
when the original Claude request is stream:false. #2235 handles this
by routing such responses through the streaming transform so the
client receives text/event-stream — that avoids the 422 that JSON
parsing would produce, and it also protects any other provider that
unexpectedly returns SSE (the response.is_sse() guard).

But for Claude SDK callers that sent stream:false, returning SSE
still violates the Anthropic non-streaming contract. This commit
adds an override on exactly one combination — non-streaming client
+ codex_oauth + openai_responses — to aggregate the upstream
Responses SSE into a synthetic Responses JSON and then run the
regular responses_to_anthropic non-streaming transform. All other
paths, including the generic response.is_sse() fallback, remain
on the streaming path from #2235.

The aggregator reuses proxy::sse::take_sse_block / strip_sse_field,
which support both \n\n and \r\n\r\n delimiters; a hand-rolled
split("\n\n") would silently fail on real HTTPS upstreams.

Tests cover the happy path, CRLF delimiters, response.failed
errors, and the missing response.completed defensive branch.

---------

Co-authored-by: Jason <farion1231@gmail.com>
This commit is contained in:
lif
2026-04-23 11:15:01 +08:00
committed by GitHub
parent 3ad7d0b1cc
commit 444c123ad0
6 changed files with 357 additions and 34 deletions
+2
View File
@@ -273,6 +273,8 @@ pub fn run() {
.plugin(tauri_plugin_opener::init())
.plugin(tauri_plugin_store::Builder::new().build())
.setup(|app| {
let _ = rustls::crypto::ring::default_provider().install_default();
// 预先刷新 Store 覆盖配置,确保后续路径读取正确(日志/数据库等)
app_store::refresh_app_config_dir_override(app.handle());
panic_hook::init_app_config_dir(crate::config::get_app_config_dir());
+2 -1
View File
@@ -258,7 +258,8 @@ pub struct ProviderMeta {
pub is_full_url: Option<bool>,
/// Prompt cache key for OpenAI Responses-compatible endpoints.
/// When set, injected into converted Responses requests to improve cache hit rate.
/// If not set, provider ID is used automatically during Claude -> Responses conversion.
/// If not set, Codex OAuth uses the current session ID; other Claude -> Responses
/// conversions fall back to provider ID.
#[serde(rename = "promptCacheKey", skip_serializing_if = "Option::is_none")]
pub prompt_cache_key: Option<String>,
/// 累加模式应用中,该 provider 是否已写入 live config。
+65 -1
View File
@@ -55,6 +55,8 @@ pub struct RequestForwarder {
current_provider_id_at_start: String,
/// 代理会话 ID(用于 Gemini Native shadow replay
session_id: String,
/// Session ID 是否由客户端提供;生成值不能作为上游缓存身份。
session_client_provided: bool,
/// 整流器配置
rectifier_config: RectifierConfig,
/// 优化器配置
@@ -77,6 +79,7 @@ impl RequestForwarder {
app_handle: Option<tauri::AppHandle>,
current_provider_id_at_start: String,
session_id: String,
session_client_provided: bool,
_streaming_first_byte_timeout: u64,
_streaming_idle_timeout: u64,
rectifier_config: RectifierConfig,
@@ -92,6 +95,7 @@ impl RequestForwarder {
app_handle,
current_provider_id_at_start,
session_id,
session_client_provided,
rectifier_config,
optimizer_config,
copilot_optimizer_config,
@@ -966,7 +970,8 @@ impl RequestForwarder {
mapped_body,
provider,
api_format,
Some(&self.session_id),
self.session_client_provided
.then_some(self.session_id.as_str()),
Some(self.gemini_shadow.as_ref()),
)?
} else {
@@ -984,6 +989,7 @@ impl RequestForwarder {
// Codex OAuth 需要注入的 ChatGPT-Account-Id(在动态 token 获取期间填充)
let mut codex_oauth_account_id: Option<String> = None;
let mut should_send_codex_oauth_session_headers = false;
// 获取认证头(提前准备,用于内联替换)
let mut auth_headers = if let Some(mut auth) = adapter.extract_auth(provider) {
@@ -1065,6 +1071,7 @@ impl RequestForwarder {
match token_result {
Ok(token) => {
auth = AuthInfo::new(token, AuthStrategy::CodexOAuth);
should_send_codex_oauth_session_headers = true;
// 解析使用的 account_id(用于注入 ChatGPT-Account-Id header
codex_oauth_account_id = match account_id {
Some(id) => Some(id),
@@ -1102,6 +1109,13 @@ impl RequestForwarder {
}
}
let codex_oauth_session_headers =
if should_send_codex_oauth_session_headers && self.session_client_provided {
build_codex_oauth_session_headers(&self.session_id)
} else {
Vec::new()
};
// --- Copilot 优化器:动态 header 注入 ---
if let Some((ref classification, ref det_request_id, ref interaction_id)) =
copilot_optimization
@@ -1342,6 +1356,12 @@ impl RequestForwarder {
);
}
// Codex OAuth 反代尽量对齐官方 Codex CLI 的会话路由信号。
// 只发送客户端提供的 session_id;生成的 UUID 每次不同,反而会破坏前缀缓存。
for (name, value) in codex_oauth_session_headers {
ordered_headers.insert(name, value);
}
// 序列化请求体
let body_bytes = serde_json::to_vec(&filtered_body)
.map_err(|e| ProxyError::Internal(format!("Failed to serialize request body: {e}")))?;
@@ -1773,6 +1793,28 @@ fn append_query_to_full_url(base_url: &str, query: Option<&str>) -> String {
}
}
fn build_codex_oauth_session_headers(
session_id: &str,
) -> Vec<(http::HeaderName, http::HeaderValue)> {
let session_id = session_id.trim();
if session_id.is_empty() {
return Vec::new();
}
let mut headers = Vec::new();
if let Ok(value) = http::HeaderValue::from_str(session_id) {
headers.push((http::HeaderName::from_static("session_id"), value.clone()));
headers.push((http::HeaderName::from_static("x-client-request-id"), value));
}
let window_id = format!("{session_id}:0");
if let Ok(value) = http::HeaderValue::from_str(&window_id) {
headers.push((http::HeaderName::from_static("x-codex-window-id"), value));
}
headers
}
fn should_force_identity_encoding(
endpoint: &str,
body: &Value,
@@ -1882,6 +1924,28 @@ mod tests {
assert_eq!(summary, "line1 line2...");
}
#[test]
fn codex_oauth_session_headers_match_codex_cache_identity() {
let headers = build_codex_oauth_session_headers("session-123");
let mut map = HeaderMap::new();
for (name, value) in headers {
map.insert(name, value);
}
assert_eq!(
map.get("session_id"),
Some(&HeaderValue::from_static("session-123"))
);
assert_eq!(
map.get("x-client-request-id"),
Some(&HeaderValue::from_static("session-123"))
);
assert_eq!(
map.get("x-codex-window-id"),
Some(&HeaderValue::from_static("session-123:0"))
);
}
#[test]
fn rewrite_claude_transform_endpoint_strips_beta_for_chat_completions() {
let (endpoint, passthrough_query) = rewrite_claude_transform_endpoint(
+4
View File
@@ -57,6 +57,8 @@ pub struct RequestContext {
pub app_type: AppType,
/// Session ID(从客户端请求提取或新生成)
pub session_id: String,
/// Session ID 是否由客户端提供。生成的 UUID 不能作为上游缓存 key,否则每个请求都会换 key。
pub session_client_provided: bool,
/// 整流器配置
pub rectifier_config: RectifierConfig,
/// 优化器配置
@@ -161,6 +163,7 @@ impl RequestContext {
app_type_str,
app_type,
session_id,
session_client_provided: session_result.client_provided,
rectifier_config,
optimizer_config,
copilot_optimizer_config,
@@ -223,6 +226,7 @@ impl RequestContext {
state.app_handle.clone(),
self.current_provider_id.clone(),
self.session_id.clone(),
self.session_client_provided,
first_byte_timeout,
idle_timeout,
self.rectifier_config.clone(),
+162 -18
View File
@@ -25,6 +25,7 @@ use super::{
SseUsageCollector,
},
server::ProxyState,
sse::{strip_sse_field, take_sse_block},
types::*,
usage::parser::TokenUsage,
ProxyError,
@@ -151,16 +152,29 @@ async fn handle_claude_transform(
api_format: &str,
) -> Result<axum::response::Response, ProxyError> {
let status = response.status();
let use_streaming = should_use_claude_transform_streaming(
is_stream,
response.is_sse(),
api_format,
ctx.provider
.meta
.as_ref()
.and_then(|meta| meta.provider_type.as_deref())
== Some("codex_oauth"),
);
let is_codex_oauth = ctx
.provider
.meta
.as_ref()
.and_then(|meta| meta.provider_type.as_deref())
== Some("codex_oauth");
// Codex OAuth 会把 openai_responses 响应强制升级为 SSE,即使客户端发的是 stream:false。
// should_use_claude_transform_streaming 默认会把这个组合路由到流式转换器——虽然能避免
// JSON parse 报 422,但会让非流客户端收到 text/event-stream,违反 Anthropic 非流语义。
// 这里为这个特定组合打开 override:把上游 SSE 聚合成 Anthropic JSON 回给客户端,其它
// 场景(任意上游 is_sse、非 Codex OAuth 等)仍沿用原有流式兜底。
let aggregate_codex_oauth_responses_sse =
!is_stream && is_codex_oauth && api_format == "openai_responses";
let use_streaming = if aggregate_codex_oauth_responses_sse {
false
} else {
should_use_claude_transform_streaming(
is_stream,
response.is_sse(),
api_format,
is_codex_oauth,
)
};
let tool_schema_hints = transform_gemini::extract_anthropic_tool_schema_hints(original_body);
let tool_schema_hints = (!tool_schema_hints.is_empty()).then_some(tool_schema_hints);
@@ -255,10 +269,14 @@ async fn handle_claude_transform(
let body_str = String::from_utf8_lossy(&body_bytes);
let upstream_response: Value = serde_json::from_slice(&body_bytes).map_err(|e| {
log::error!("[Claude] 解析上游响应失败: {e}, body: {body_str}");
ProxyError::TransformError(format!("Failed to parse upstream response: {e}"))
})?;
let upstream_response: Value = if aggregate_codex_oauth_responses_sse {
responses_sse_to_response_value(&body_str)?
} else {
serde_json::from_slice(&body_bytes).map_err(|e| {
log::error!("[Claude] 解析上游响应失败: {e}, body: {body_str}");
ProxyError::TransformError(format!("Failed to parse upstream response: {e}"))
})?
};
// 根据 api_format 选择非流式转换器
let anthropic_response = if api_format == "openai_responses" {
@@ -577,9 +595,79 @@ fn should_use_claude_transform_streaming(
api_format: &str,
is_codex_oauth: bool,
) -> bool {
requested_streaming
|| upstream_is_sse
|| (is_codex_oauth && api_format == "openai_responses")
requested_streaming || upstream_is_sse || (is_codex_oauth && api_format == "openai_responses")
}
/// 把 OpenAI Responses SSE 流聚合成一个完整的 Responses JSON 对象,供下游转成 Anthropic
/// 非流响应。仅在 Codex OAuth 把 `stream:false` 强制升级为 SSE 的场景下调用。
///
/// 复用 `proxy::sse` 的 `take_sse_block`/`strip_sse_field``take_sse_block` 同时支持
/// `\n\n` 与 `\r\n\r\n` 两种分隔符,`strip_sse_field` 兼容带/不带空格的字段写法。
fn responses_sse_to_response_value(body: &str) -> Result<Value, ProxyError> {
let mut buffer = body.to_string();
let mut completed_response: Option<Value> = None;
let mut output_items = Vec::new();
while let Some(block) = take_sse_block(&mut buffer) {
let mut event_name = "";
let mut data_lines: Vec<&str> = Vec::new();
for line in block.lines() {
if let Some(evt) = strip_sse_field(line, "event") {
event_name = evt.trim();
} else if let Some(d) = strip_sse_field(line, "data") {
data_lines.push(d);
}
}
if data_lines.is_empty() {
continue;
}
let data_str = data_lines.join("\n");
if data_str.trim() == "[DONE]" {
continue;
}
let data: Value = serde_json::from_str(&data_str).map_err(|e| {
ProxyError::TransformError(format!("Failed to parse upstream SSE event: {e}"))
})?;
match event_name {
"response.output_item.done" => {
if let Some(item) = data.get("item") {
output_items.push(item.clone());
}
}
"response.completed" => {
completed_response = Some(data.get("response").cloned().unwrap_or(data));
}
"response.failed" => {
let message = data
.pointer("/response/error/message")
.and_then(|v| v.as_str())
.unwrap_or("response.failed event received");
return Err(ProxyError::TransformError(message.to_string()));
}
_ => {}
}
}
let mut response = completed_response.ok_or_else(|| {
ProxyError::TransformError("No response.completed event in upstream SSE".to_string())
})?;
if !output_items.is_empty() {
if let Some(obj) = response.as_object_mut() {
obj.insert("output".to_string(), Value::Array(output_items));
} else {
return Err(ProxyError::TransformError(
"response.completed payload is not an object".to_string(),
));
}
}
Ok(response)
}
// ============================================================================
@@ -665,7 +753,8 @@ async fn log_usage(
#[cfg(test)]
mod tests {
use super::should_use_claude_transform_streaming;
use super::{responses_sse_to_response_value, should_use_claude_transform_streaming};
use crate::proxy::ProxyError;
#[test]
fn codex_oauth_responses_force_streaming_even_if_client_sent_false() {
@@ -696,4 +785,59 @@ mod tests {
false,
));
}
#[test]
fn responses_sse_to_response_value_collects_output_items() {
let sse = r#"event: response.output_item.done
data: {"type":"response.output_item.done","item":{"type":"message","role":"assistant","content":[{"type":"output_text","text":"hello"}]}}
event: response.completed
data: {"type":"response.completed","response":{"id":"resp_1","status":"completed","model":"gpt-5.4","output":[],"usage":{"input_tokens":10,"output_tokens":2}}}
"#;
let response = responses_sse_to_response_value(sse).unwrap();
assert_eq!(response["id"], "resp_1");
assert_eq!(response["output"][0]["type"], "message");
assert_eq!(response["output"][0]["content"][0]["text"], "hello");
}
#[test]
fn responses_sse_to_response_value_handles_crlf_delimiters() {
// 真实 HTTP SSE 按规范使用 \r\n\r\n 分隔事件;take_sse_block 必须同时处理两种分隔符,
// 否则此路径在任何标准上游(含 Codex OAuth HTTPS 后端)下都会 TransformError。
let sse = "event: response.output_item.done\r\n\
data: {\"type\":\"response.output_item.done\",\"item\":{\"type\":\"message\",\"role\":\"assistant\",\"content\":[{\"type\":\"output_text\",\"text\":\"hi\"}]}}\r\n\
\r\n\
event: response.completed\r\n\
data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp_crlf\",\"status\":\"completed\",\"model\":\"gpt-5.4\",\"output\":[],\"usage\":{\"input_tokens\":5,\"output_tokens\":1}}}\r\n\
\r\n";
let response = responses_sse_to_response_value(sse).unwrap();
assert_eq!(response["id"], "resp_crlf");
assert_eq!(response["output"][0]["type"], "message");
assert_eq!(response["output"][0]["content"][0]["text"], "hi");
}
#[test]
fn responses_sse_to_response_value_returns_err_on_response_failed() {
let sse = "event: response.failed\n\
data: {\"type\":\"response.failed\",\"response\":{\"error\":{\"message\":\"upstream blew up\"}}}\n\n";
let err = responses_sse_to_response_value(sse).unwrap_err();
match err {
ProxyError::TransformError(msg) => assert!(msg.contains("upstream blew up")),
other => panic!("expected TransformError, got {other:?}"),
}
}
#[test]
fn responses_sse_to_response_value_errors_when_no_completed_event() {
let sse = "event: response.output_item.done\n\
data: {\"type\":\"response.output_item.done\",\"item\":{\"type\":\"message\"}}\n\n";
assert!(responses_sse_to_response_value(sse).is_err());
}
}
+122 -14
View File
@@ -89,6 +89,12 @@ pub fn transform_claude_request_for_api_format(
session_id: Option<&str>,
shadow_store: Option<&super::gemini_shadow::GeminiShadowStore>,
) -> Result<serde_json::Value, ProxyError> {
let is_codex_oauth = provider
.meta
.as_ref()
.and_then(|m| m.provider_type.as_deref())
== Some("codex_oauth");
// Copilot 场景:优先从 metadata.user_id 提取 session ID 作为 cache key
// 格式: "uuid_sessionId" → 提取 "_" 后面的部分作为 session 标识
// 同一会话的请求共享 cache key,提升 Copilot 缓存命中率
@@ -118,28 +124,33 @@ pub fn transform_claude_request_for_api_format(
.filter(|s| !s.is_empty())
.map(|s| s.to_string())
})
} else if is_codex_oauth {
session_id
.map(str::trim)
.filter(|s| !s.is_empty())
.map(ToString::to_string)
} else {
None
};
let cache_key = session_cache_key
.as_deref()
.or_else(|| {
provider
.meta
.as_ref()
.and_then(|m| m.prompt_cache_key.as_deref())
})
.unwrap_or(&provider.id);
let explicit_cache_key = provider
.meta
.as_ref()
.and_then(|m| m.prompt_cache_key.as_deref());
let cache_key = if is_codex_oauth {
explicit_cache_key
.or(session_cache_key.as_deref())
.unwrap_or(&provider.id)
} else {
session_cache_key
.as_deref()
.or(explicit_cache_key)
.unwrap_or(&provider.id)
};
match api_format {
"openai_responses" => {
// Codex OAuth (ChatGPT Plus/Pro 反代) 需要在请求体里强制 store: false
// + include: ["reasoning.encrypted_content"],由 transform 层统一处理。
let is_codex_oauth = provider
.meta
.as_ref()
.and_then(|m| m.provider_type.as_deref())
== Some("codex_oauth");
super::transform_responses::anthropic_to_responses(
body,
Some(cache_key),
@@ -1222,6 +1233,103 @@ mod tests {
assert!(transformed.get("max_output_tokens").is_some());
}
#[test]
fn test_transform_claude_request_for_codex_oauth_uses_session_cache_key() {
let provider = create_provider_with_meta(
json!({
"env": {
"ANTHROPIC_BASE_URL": "https://chatgpt.com/backend-api/codex"
}
}),
ProviderMeta {
api_format: Some("openai_responses".to_string()),
provider_type: Some("codex_oauth".to_string()),
..ProviderMeta::default()
},
);
let body = json!({
"model": "gpt-5.4",
"messages": [{ "role": "user", "content": "hello" }],
"max_tokens": 128
});
let transformed = transform_claude_request_for_api_format(
body,
&provider,
"openai_responses",
Some("session-123"),
None,
)
.unwrap();
assert_eq!(transformed["prompt_cache_key"], "session-123");
}
#[test]
fn test_transform_claude_request_for_codex_oauth_without_session_falls_back_to_provider_id() {
let provider = create_provider_with_meta(
json!({
"env": {
"ANTHROPIC_BASE_URL": "https://chatgpt.com/backend-api/codex"
}
}),
ProviderMeta {
api_format: Some("openai_responses".to_string()),
provider_type: Some("codex_oauth".to_string()),
..ProviderMeta::default()
},
);
let body = json!({
"model": "gpt-5.4",
"messages": [{ "role": "user", "content": "hello" }],
"max_tokens": 128
});
let transformed = transform_claude_request_for_api_format(
body,
&provider,
"openai_responses",
None,
None,
)
.unwrap();
assert_eq!(transformed["prompt_cache_key"], provider.id);
}
#[test]
fn test_transform_claude_request_for_codex_oauth_keeps_explicit_cache_key() {
let provider = create_provider_with_meta(
json!({
"env": {
"ANTHROPIC_BASE_URL": "https://chatgpt.com/backend-api/codex"
}
}),
ProviderMeta {
api_format: Some("openai_responses".to_string()),
provider_type: Some("codex_oauth".to_string()),
prompt_cache_key: Some("explicit-cache-key".to_string()),
..ProviderMeta::default()
},
);
let body = json!({
"model": "gpt-5.4",
"messages": [{ "role": "user", "content": "hello" }],
"max_tokens": 128
});
let transformed = transform_claude_request_for_api_format(
body,
&provider,
"openai_responses",
Some("session-123"),
None,
)
.unwrap();
assert_eq!(transformed["prompt_cache_key"], "explicit-cache-key");
}
#[test]
fn test_transform_claude_request_for_api_format_gemini_native() {
let provider = create_provider_with_meta(