From 444c123ad0589b4fe5510422e1d13737b8078954 Mon Sep 17 00:00:00 2001 From: lif <1835304752@qq.com> Date: Thu, 23 Apr 2026 11:15:01 +0800 Subject: [PATCH] [codex] Stabilize Codex OAuth cache routing (#2218) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Stabilize Codex OAuth cache routing Codex OAuth-backed Claude proxy requests now reuse a client-provided session identity for prompt cache routing and send Codex-like session headers when that identity exists. Generated proxy UUIDs are intentionally excluded so they do not fragment cache locality.\n\nThe same path exposed two runtime issues during validation: rustls needed an explicit process crypto provider, and Codex OAuth can return Responses SSE even when the original Claude request is non-streaming. Those are handled so cache-routed requests can complete instead of panicking or being parsed as JSON.\n\nConstraint: Official Codex uses conversation identity and Responses session headers for prompt cache routing.\nRejected: Always use generated proxy session IDs | generated IDs change per request and reduce cache reuse.\nConfidence: medium\nScope-risk: moderate\nDirective: Do not remove the client-provided-session guard unless generated session IDs become stable per conversation.\nTested: cargo test codex_oauth\nTested: Local dev app health check on 127.0.0.1:15721\nTested: Local proxy logs showed cache_read_tokens after restart\nNot-tested: Full cargo test without local cc-switch port conflict\nRelated: #2217 * feat(proxy): aggregate forced Codex OAuth SSE into JSON for non-streaming clients Narrow override on top of #2235's streaming fallback. Codex OAuth always forces upstream openai_responses into SSE, even when the original Claude request is stream:false. #2235 handles this by routing such responses through the streaming transform so the client receives text/event-stream — that avoids the 422 that JSON parsing would produce, and it also protects any other provider that unexpectedly returns SSE (the response.is_sse() guard). But for Claude SDK callers that sent stream:false, returning SSE still violates the Anthropic non-streaming contract. This commit adds an override on exactly one combination — non-streaming client + codex_oauth + openai_responses — to aggregate the upstream Responses SSE into a synthetic Responses JSON and then run the regular responses_to_anthropic non-streaming transform. All other paths, including the generic response.is_sse() fallback, remain on the streaming path from #2235. The aggregator reuses proxy::sse::take_sse_block / strip_sse_field, which support both \n\n and \r\n\r\n delimiters; a hand-rolled split("\n\n") would silently fail on real HTTPS upstreams. Tests cover the happy path, CRLF delimiters, response.failed errors, and the missing response.completed defensive branch. --------- Co-authored-by: Jason --- src-tauri/src/lib.rs | 2 + src-tauri/src/provider.rs | 3 +- src-tauri/src/proxy/forwarder.rs | 66 ++++++++- src-tauri/src/proxy/handler_context.rs | 4 + src-tauri/src/proxy/handlers.rs | 180 +++++++++++++++++++++--- src-tauri/src/proxy/providers/claude.rs | 136 ++++++++++++++++-- 6 files changed, 357 insertions(+), 34 deletions(-) diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 72cff929a..ac2b227e9 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -273,6 +273,8 @@ pub fn run() { .plugin(tauri_plugin_opener::init()) .plugin(tauri_plugin_store::Builder::new().build()) .setup(|app| { + let _ = rustls::crypto::ring::default_provider().install_default(); + // 预先刷新 Store 覆盖配置,确保后续路径读取正确(日志/数据库等) app_store::refresh_app_config_dir_override(app.handle()); panic_hook::init_app_config_dir(crate::config::get_app_config_dir()); diff --git a/src-tauri/src/provider.rs b/src-tauri/src/provider.rs index af3af3f4b..efcd1c5dc 100644 --- a/src-tauri/src/provider.rs +++ b/src-tauri/src/provider.rs @@ -258,7 +258,8 @@ pub struct ProviderMeta { pub is_full_url: Option, /// Prompt cache key for OpenAI Responses-compatible endpoints. /// When set, injected into converted Responses requests to improve cache hit rate. - /// If not set, provider ID is used automatically during Claude -> Responses conversion. + /// If not set, Codex OAuth uses the current session ID; other Claude -> Responses + /// conversions fall back to provider ID. #[serde(rename = "promptCacheKey", skip_serializing_if = "Option::is_none")] pub prompt_cache_key: Option, /// 累加模式应用中,该 provider 是否已写入 live config。 diff --git a/src-tauri/src/proxy/forwarder.rs b/src-tauri/src/proxy/forwarder.rs index f31bafcb7..48f2af19f 100644 --- a/src-tauri/src/proxy/forwarder.rs +++ b/src-tauri/src/proxy/forwarder.rs @@ -55,6 +55,8 @@ pub struct RequestForwarder { current_provider_id_at_start: String, /// 代理会话 ID(用于 Gemini Native shadow replay) session_id: String, + /// Session ID 是否由客户端提供;生成值不能作为上游缓存身份。 + session_client_provided: bool, /// 整流器配置 rectifier_config: RectifierConfig, /// 优化器配置 @@ -77,6 +79,7 @@ impl RequestForwarder { app_handle: Option, current_provider_id_at_start: String, session_id: String, + session_client_provided: bool, _streaming_first_byte_timeout: u64, _streaming_idle_timeout: u64, rectifier_config: RectifierConfig, @@ -92,6 +95,7 @@ impl RequestForwarder { app_handle, current_provider_id_at_start, session_id, + session_client_provided, rectifier_config, optimizer_config, copilot_optimizer_config, @@ -966,7 +970,8 @@ impl RequestForwarder { mapped_body, provider, api_format, - Some(&self.session_id), + self.session_client_provided + .then_some(self.session_id.as_str()), Some(self.gemini_shadow.as_ref()), )? } else { @@ -984,6 +989,7 @@ impl RequestForwarder { // Codex OAuth 需要注入的 ChatGPT-Account-Id(在动态 token 获取期间填充) let mut codex_oauth_account_id: Option = None; + let mut should_send_codex_oauth_session_headers = false; // 获取认证头(提前准备,用于内联替换) let mut auth_headers = if let Some(mut auth) = adapter.extract_auth(provider) { @@ -1065,6 +1071,7 @@ impl RequestForwarder { match token_result { Ok(token) => { auth = AuthInfo::new(token, AuthStrategy::CodexOAuth); + should_send_codex_oauth_session_headers = true; // 解析使用的 account_id(用于注入 ChatGPT-Account-Id header) codex_oauth_account_id = match account_id { Some(id) => Some(id), @@ -1102,6 +1109,13 @@ impl RequestForwarder { } } + let codex_oauth_session_headers = + if should_send_codex_oauth_session_headers && self.session_client_provided { + build_codex_oauth_session_headers(&self.session_id) + } else { + Vec::new() + }; + // --- Copilot 优化器:动态 header 注入 --- if let Some((ref classification, ref det_request_id, ref interaction_id)) = copilot_optimization @@ -1342,6 +1356,12 @@ impl RequestForwarder { ); } + // Codex OAuth 反代尽量对齐官方 Codex CLI 的会话路由信号。 + // 只发送客户端提供的 session_id;生成的 UUID 每次不同,反而会破坏前缀缓存。 + for (name, value) in codex_oauth_session_headers { + ordered_headers.insert(name, value); + } + // 序列化请求体 let body_bytes = serde_json::to_vec(&filtered_body) .map_err(|e| ProxyError::Internal(format!("Failed to serialize request body: {e}")))?; @@ -1773,6 +1793,28 @@ fn append_query_to_full_url(base_url: &str, query: Option<&str>) -> String { } } +fn build_codex_oauth_session_headers( + session_id: &str, +) -> Vec<(http::HeaderName, http::HeaderValue)> { + let session_id = session_id.trim(); + if session_id.is_empty() { + return Vec::new(); + } + + let mut headers = Vec::new(); + if let Ok(value) = http::HeaderValue::from_str(session_id) { + headers.push((http::HeaderName::from_static("session_id"), value.clone())); + headers.push((http::HeaderName::from_static("x-client-request-id"), value)); + } + + let window_id = format!("{session_id}:0"); + if let Ok(value) = http::HeaderValue::from_str(&window_id) { + headers.push((http::HeaderName::from_static("x-codex-window-id"), value)); + } + + headers +} + fn should_force_identity_encoding( endpoint: &str, body: &Value, @@ -1882,6 +1924,28 @@ mod tests { assert_eq!(summary, "line1 line2..."); } + #[test] + fn codex_oauth_session_headers_match_codex_cache_identity() { + let headers = build_codex_oauth_session_headers("session-123"); + let mut map = HeaderMap::new(); + for (name, value) in headers { + map.insert(name, value); + } + + assert_eq!( + map.get("session_id"), + Some(&HeaderValue::from_static("session-123")) + ); + assert_eq!( + map.get("x-client-request-id"), + Some(&HeaderValue::from_static("session-123")) + ); + assert_eq!( + map.get("x-codex-window-id"), + Some(&HeaderValue::from_static("session-123:0")) + ); + } + #[test] fn rewrite_claude_transform_endpoint_strips_beta_for_chat_completions() { let (endpoint, passthrough_query) = rewrite_claude_transform_endpoint( diff --git a/src-tauri/src/proxy/handler_context.rs b/src-tauri/src/proxy/handler_context.rs index da2413ec4..151a8bc10 100644 --- a/src-tauri/src/proxy/handler_context.rs +++ b/src-tauri/src/proxy/handler_context.rs @@ -57,6 +57,8 @@ pub struct RequestContext { pub app_type: AppType, /// Session ID(从客户端请求提取或新生成) pub session_id: String, + /// Session ID 是否由客户端提供。生成的 UUID 不能作为上游缓存 key,否则每个请求都会换 key。 + pub session_client_provided: bool, /// 整流器配置 pub rectifier_config: RectifierConfig, /// 优化器配置 @@ -161,6 +163,7 @@ impl RequestContext { app_type_str, app_type, session_id, + session_client_provided: session_result.client_provided, rectifier_config, optimizer_config, copilot_optimizer_config, @@ -223,6 +226,7 @@ impl RequestContext { state.app_handle.clone(), self.current_provider_id.clone(), self.session_id.clone(), + self.session_client_provided, first_byte_timeout, idle_timeout, self.rectifier_config.clone(), diff --git a/src-tauri/src/proxy/handlers.rs b/src-tauri/src/proxy/handlers.rs index 39c7243bc..ca5632c71 100644 --- a/src-tauri/src/proxy/handlers.rs +++ b/src-tauri/src/proxy/handlers.rs @@ -25,6 +25,7 @@ use super::{ SseUsageCollector, }, server::ProxyState, + sse::{strip_sse_field, take_sse_block}, types::*, usage::parser::TokenUsage, ProxyError, @@ -151,16 +152,29 @@ async fn handle_claude_transform( api_format: &str, ) -> Result { let status = response.status(); - let use_streaming = should_use_claude_transform_streaming( - is_stream, - response.is_sse(), - api_format, - ctx.provider - .meta - .as_ref() - .and_then(|meta| meta.provider_type.as_deref()) - == Some("codex_oauth"), - ); + let is_codex_oauth = ctx + .provider + .meta + .as_ref() + .and_then(|meta| meta.provider_type.as_deref()) + == Some("codex_oauth"); + // Codex OAuth 会把 openai_responses 响应强制升级为 SSE,即使客户端发的是 stream:false。 + // should_use_claude_transform_streaming 默认会把这个组合路由到流式转换器——虽然能避免 + // JSON parse 报 422,但会让非流客户端收到 text/event-stream,违反 Anthropic 非流语义。 + // 这里为这个特定组合打开 override:把上游 SSE 聚合成 Anthropic JSON 回给客户端,其它 + // 场景(任意上游 is_sse、非 Codex OAuth 等)仍沿用原有流式兜底。 + let aggregate_codex_oauth_responses_sse = + !is_stream && is_codex_oauth && api_format == "openai_responses"; + let use_streaming = if aggregate_codex_oauth_responses_sse { + false + } else { + should_use_claude_transform_streaming( + is_stream, + response.is_sse(), + api_format, + is_codex_oauth, + ) + }; let tool_schema_hints = transform_gemini::extract_anthropic_tool_schema_hints(original_body); let tool_schema_hints = (!tool_schema_hints.is_empty()).then_some(tool_schema_hints); @@ -255,10 +269,14 @@ async fn handle_claude_transform( let body_str = String::from_utf8_lossy(&body_bytes); - let upstream_response: Value = serde_json::from_slice(&body_bytes).map_err(|e| { - log::error!("[Claude] 解析上游响应失败: {e}, body: {body_str}"); - ProxyError::TransformError(format!("Failed to parse upstream response: {e}")) - })?; + let upstream_response: Value = if aggregate_codex_oauth_responses_sse { + responses_sse_to_response_value(&body_str)? + } else { + serde_json::from_slice(&body_bytes).map_err(|e| { + log::error!("[Claude] 解析上游响应失败: {e}, body: {body_str}"); + ProxyError::TransformError(format!("Failed to parse upstream response: {e}")) + })? + }; // 根据 api_format 选择非流式转换器 let anthropic_response = if api_format == "openai_responses" { @@ -577,9 +595,79 @@ fn should_use_claude_transform_streaming( api_format: &str, is_codex_oauth: bool, ) -> bool { - requested_streaming - || upstream_is_sse - || (is_codex_oauth && api_format == "openai_responses") + requested_streaming || upstream_is_sse || (is_codex_oauth && api_format == "openai_responses") +} + +/// 把 OpenAI Responses SSE 流聚合成一个完整的 Responses JSON 对象,供下游转成 Anthropic +/// 非流响应。仅在 Codex OAuth 把 `stream:false` 强制升级为 SSE 的场景下调用。 +/// +/// 复用 `proxy::sse` 的 `take_sse_block`/`strip_sse_field`:`take_sse_block` 同时支持 +/// `\n\n` 与 `\r\n\r\n` 两种分隔符,`strip_sse_field` 兼容带/不带空格的字段写法。 +fn responses_sse_to_response_value(body: &str) -> Result { + let mut buffer = body.to_string(); + let mut completed_response: Option = None; + let mut output_items = Vec::new(); + + while let Some(block) = take_sse_block(&mut buffer) { + let mut event_name = ""; + let mut data_lines: Vec<&str> = Vec::new(); + + for line in block.lines() { + if let Some(evt) = strip_sse_field(line, "event") { + event_name = evt.trim(); + } else if let Some(d) = strip_sse_field(line, "data") { + data_lines.push(d); + } + } + + if data_lines.is_empty() { + continue; + } + + let data_str = data_lines.join("\n"); + if data_str.trim() == "[DONE]" { + continue; + } + + let data: Value = serde_json::from_str(&data_str).map_err(|e| { + ProxyError::TransformError(format!("Failed to parse upstream SSE event: {e}")) + })?; + + match event_name { + "response.output_item.done" => { + if let Some(item) = data.get("item") { + output_items.push(item.clone()); + } + } + "response.completed" => { + completed_response = Some(data.get("response").cloned().unwrap_or(data)); + } + "response.failed" => { + let message = data + .pointer("/response/error/message") + .and_then(|v| v.as_str()) + .unwrap_or("response.failed event received"); + return Err(ProxyError::TransformError(message.to_string())); + } + _ => {} + } + } + + let mut response = completed_response.ok_or_else(|| { + ProxyError::TransformError("No response.completed event in upstream SSE".to_string()) + })?; + + if !output_items.is_empty() { + if let Some(obj) = response.as_object_mut() { + obj.insert("output".to_string(), Value::Array(output_items)); + } else { + return Err(ProxyError::TransformError( + "response.completed payload is not an object".to_string(), + )); + } + } + + Ok(response) } // ============================================================================ @@ -665,7 +753,8 @@ async fn log_usage( #[cfg(test)] mod tests { - use super::should_use_claude_transform_streaming; + use super::{responses_sse_to_response_value, should_use_claude_transform_streaming}; + use crate::proxy::ProxyError; #[test] fn codex_oauth_responses_force_streaming_even_if_client_sent_false() { @@ -696,4 +785,59 @@ mod tests { false, )); } + + #[test] + fn responses_sse_to_response_value_collects_output_items() { + let sse = r#"event: response.output_item.done +data: {"type":"response.output_item.done","item":{"type":"message","role":"assistant","content":[{"type":"output_text","text":"hello"}]}} + +event: response.completed +data: {"type":"response.completed","response":{"id":"resp_1","status":"completed","model":"gpt-5.4","output":[],"usage":{"input_tokens":10,"output_tokens":2}}} + +"#; + + let response = responses_sse_to_response_value(sse).unwrap(); + + assert_eq!(response["id"], "resp_1"); + assert_eq!(response["output"][0]["type"], "message"); + assert_eq!(response["output"][0]["content"][0]["text"], "hello"); + } + + #[test] + fn responses_sse_to_response_value_handles_crlf_delimiters() { + // 真实 HTTP SSE 按规范使用 \r\n\r\n 分隔事件;take_sse_block 必须同时处理两种分隔符, + // 否则此路径在任何标准上游(含 Codex OAuth HTTPS 后端)下都会 TransformError。 + let sse = "event: response.output_item.done\r\n\ +data: {\"type\":\"response.output_item.done\",\"item\":{\"type\":\"message\",\"role\":\"assistant\",\"content\":[{\"type\":\"output_text\",\"text\":\"hi\"}]}}\r\n\ +\r\n\ +event: response.completed\r\n\ +data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp_crlf\",\"status\":\"completed\",\"model\":\"gpt-5.4\",\"output\":[],\"usage\":{\"input_tokens\":5,\"output_tokens\":1}}}\r\n\ +\r\n"; + + let response = responses_sse_to_response_value(sse).unwrap(); + + assert_eq!(response["id"], "resp_crlf"); + assert_eq!(response["output"][0]["type"], "message"); + assert_eq!(response["output"][0]["content"][0]["text"], "hi"); + } + + #[test] + fn responses_sse_to_response_value_returns_err_on_response_failed() { + let sse = "event: response.failed\n\ +data: {\"type\":\"response.failed\",\"response\":{\"error\":{\"message\":\"upstream blew up\"}}}\n\n"; + + let err = responses_sse_to_response_value(sse).unwrap_err(); + match err { + ProxyError::TransformError(msg) => assert!(msg.contains("upstream blew up")), + other => panic!("expected TransformError, got {other:?}"), + } + } + + #[test] + fn responses_sse_to_response_value_errors_when_no_completed_event() { + let sse = "event: response.output_item.done\n\ +data: {\"type\":\"response.output_item.done\",\"item\":{\"type\":\"message\"}}\n\n"; + + assert!(responses_sse_to_response_value(sse).is_err()); + } } diff --git a/src-tauri/src/proxy/providers/claude.rs b/src-tauri/src/proxy/providers/claude.rs index 604c9ebfe..be454c65f 100644 --- a/src-tauri/src/proxy/providers/claude.rs +++ b/src-tauri/src/proxy/providers/claude.rs @@ -89,6 +89,12 @@ pub fn transform_claude_request_for_api_format( session_id: Option<&str>, shadow_store: Option<&super::gemini_shadow::GeminiShadowStore>, ) -> Result { + let is_codex_oauth = provider + .meta + .as_ref() + .and_then(|m| m.provider_type.as_deref()) + == Some("codex_oauth"); + // Copilot 场景:优先从 metadata.user_id 提取 session ID 作为 cache key // 格式: "uuid_sessionId" → 提取 "_" 后面的部分作为 session 标识 // 同一会话的请求共享 cache key,提升 Copilot 缓存命中率 @@ -118,28 +124,33 @@ pub fn transform_claude_request_for_api_format( .filter(|s| !s.is_empty()) .map(|s| s.to_string()) }) + } else if is_codex_oauth { + session_id + .map(str::trim) + .filter(|s| !s.is_empty()) + .map(ToString::to_string) } else { None }; - let cache_key = session_cache_key - .as_deref() - .or_else(|| { - provider - .meta - .as_ref() - .and_then(|m| m.prompt_cache_key.as_deref()) - }) - .unwrap_or(&provider.id); + let explicit_cache_key = provider + .meta + .as_ref() + .and_then(|m| m.prompt_cache_key.as_deref()); + let cache_key = if is_codex_oauth { + explicit_cache_key + .or(session_cache_key.as_deref()) + .unwrap_or(&provider.id) + } else { + session_cache_key + .as_deref() + .or(explicit_cache_key) + .unwrap_or(&provider.id) + }; match api_format { "openai_responses" => { // Codex OAuth (ChatGPT Plus/Pro 反代) 需要在请求体里强制 store: false // + include: ["reasoning.encrypted_content"],由 transform 层统一处理。 - let is_codex_oauth = provider - .meta - .as_ref() - .and_then(|m| m.provider_type.as_deref()) - == Some("codex_oauth"); super::transform_responses::anthropic_to_responses( body, Some(cache_key), @@ -1222,6 +1233,103 @@ mod tests { assert!(transformed.get("max_output_tokens").is_some()); } + #[test] + fn test_transform_claude_request_for_codex_oauth_uses_session_cache_key() { + let provider = create_provider_with_meta( + json!({ + "env": { + "ANTHROPIC_BASE_URL": "https://chatgpt.com/backend-api/codex" + } + }), + ProviderMeta { + api_format: Some("openai_responses".to_string()), + provider_type: Some("codex_oauth".to_string()), + ..ProviderMeta::default() + }, + ); + let body = json!({ + "model": "gpt-5.4", + "messages": [{ "role": "user", "content": "hello" }], + "max_tokens": 128 + }); + + let transformed = transform_claude_request_for_api_format( + body, + &provider, + "openai_responses", + Some("session-123"), + None, + ) + .unwrap(); + + assert_eq!(transformed["prompt_cache_key"], "session-123"); + } + + #[test] + fn test_transform_claude_request_for_codex_oauth_without_session_falls_back_to_provider_id() { + let provider = create_provider_with_meta( + json!({ + "env": { + "ANTHROPIC_BASE_URL": "https://chatgpt.com/backend-api/codex" + } + }), + ProviderMeta { + api_format: Some("openai_responses".to_string()), + provider_type: Some("codex_oauth".to_string()), + ..ProviderMeta::default() + }, + ); + let body = json!({ + "model": "gpt-5.4", + "messages": [{ "role": "user", "content": "hello" }], + "max_tokens": 128 + }); + + let transformed = transform_claude_request_for_api_format( + body, + &provider, + "openai_responses", + None, + None, + ) + .unwrap(); + + assert_eq!(transformed["prompt_cache_key"], provider.id); + } + + #[test] + fn test_transform_claude_request_for_codex_oauth_keeps_explicit_cache_key() { + let provider = create_provider_with_meta( + json!({ + "env": { + "ANTHROPIC_BASE_URL": "https://chatgpt.com/backend-api/codex" + } + }), + ProviderMeta { + api_format: Some("openai_responses".to_string()), + provider_type: Some("codex_oauth".to_string()), + prompt_cache_key: Some("explicit-cache-key".to_string()), + ..ProviderMeta::default() + }, + ); + let body = json!({ + "model": "gpt-5.4", + "messages": [{ "role": "user", "content": "hello" }], + "max_tokens": 128 + }); + + let transformed = transform_claude_request_for_api_format( + body, + &provider, + "openai_responses", + Some("session-123"), + None, + ) + .unwrap(); + + assert_eq!(transformed["prompt_cache_key"], "explicit-cache-key"); + } + #[test] fn test_transform_claude_request_for_api_format_gemini_native() { let provider = create_provider_with_meta(