fix(proxy): parse SSE fields with optional spaces in streaming handlers (#1664)

* fix(proxy): handle SSE fields with or without spaces

* refactor(proxy): deduplicate SSE field parsing
This commit is contained in:
tgbdhs
2026-03-25 22:06:21 +08:00
committed by GitHub
parent 90812e7f3a
commit eaf83f4fbe
6 changed files with 91 additions and 8 deletions
+1
View File
@@ -22,6 +22,7 @@ pub mod response_handler;
pub mod response_processor;
pub(crate) mod server;
pub mod session;
pub(crate) mod sse;
pub mod thinking_budget_rectifier;
pub mod thinking_optimizer;
pub mod thinking_rectifier;
+8 -3
View File
@@ -2,6 +2,7 @@
//!
//! 实现 OpenAI SSE → Anthropic SSE 格式转换
use crate::proxy::sse::strip_sse_field;
use bytes::Bytes;
use futures::stream::{Stream, StreamExt};
use serde::{Deserialize, Serialize};
@@ -118,7 +119,7 @@ pub fn create_anthropic_sse_stream(
}
for l in line.lines() {
if let Some(data) = l.strip_prefix("data: ") {
if let Some(data) = strip_sse_field(l, "data") {
if data.trim() == "[DONE]" {
log::debug!("[Claude/OpenRouter] <<< OpenAI SSE: [DONE]");
let event = json!({"type": "message_stop"});
@@ -609,7 +610,9 @@ mod tests {
let events: Vec<Value> = merged
.split("\n\n")
.filter_map(|block| {
let data = block.lines().find_map(|line| line.strip_prefix("data: "))?;
let data = block
.lines()
.find_map(|line| strip_sse_field(line, "data"))?;
serde_json::from_str::<Value>(data).ok()
})
.collect();
@@ -694,7 +697,9 @@ mod tests {
let events: Vec<Value> = merged
.split("\n\n")
.filter_map(|block| {
let data = block.lines().find_map(|line| line.strip_prefix("data: "))?;
let data = block
.lines()
.find_map(|line| strip_sse_field(line, "data"))?;
serde_json::from_str::<Value>(data).ok()
})
.collect();
@@ -9,6 +9,7 @@
//! 与 Chat Completions 的 delta chunk 模型完全不同,需要独立的状态机处理。
use super::transform_responses::{build_anthropic_usage_from_responses, map_responses_stop_reason};
use crate::proxy::sse::strip_sse_field;
use bytes::Bytes;
use futures::stream::{Stream, StreamExt};
use serde_json::{json, Value};
@@ -133,9 +134,9 @@ pub fn create_anthropic_sse_stream_from_responses(
let mut data_parts: Vec<String> = Vec::new();
for line in block.lines() {
if let Some(evt) = line.strip_prefix("event: ") {
if let Some(evt) = strip_sse_field(line, "event") {
event_type = Some(evt.trim().to_string());
} else if let Some(d) = line.strip_prefix("data: ") {
} else if let Some(d) = strip_sse_field(line, "data") {
data_parts.push(d.to_string());
}
}
@@ -810,7 +811,9 @@ mod tests {
let events: Vec<Value> = merged
.split("\n\n")
.filter_map(|block| {
let data = block.lines().find_map(|line| line.strip_prefix("data: "))?;
let data = block
.lines()
.find_map(|line| strip_sse_field(line, "data"))?;
serde_json::from_str::<Value>(data).ok()
})
.collect();
+22 -1
View File
@@ -5,6 +5,7 @@
use super::session::ProxySession;
use super::usage::parser::TokenUsage;
use super::ProxyError;
use crate::proxy::sse::strip_sse_field;
use bytes::Bytes;
use futures::stream::{Stream, StreamExt};
use serde_json::Value;
@@ -90,7 +91,7 @@ impl StreamHandler {
buffer = buffer[pos + 2..].to_string();
for line in event_text.lines() {
if let Some(data) = line.strip_prefix("data: ") {
if let Some(data) = strip_sse_field(line, "data") {
if data.trim() != "[DONE]" {
if let Ok(json) = serde_json::from_str::<Value>(data) {
let mut guard = events.lock().await;
@@ -211,4 +212,24 @@ mod tests {
let handler = StreamHandler::new(30);
assert_eq!(handler.idle_timeout, Duration::from_secs(30));
}
#[test]
fn test_strip_sse_field_accepts_optional_space() {
assert_eq!(
super::strip_sse_field("data: {\"ok\":true}", "data"),
Some("{\"ok\":true}")
);
assert_eq!(
super::strip_sse_field("data:{\"ok\":true}", "data"),
Some("{\"ok\":true}")
);
assert_eq!(
super::strip_sse_field("event: message_start", "event"),
Some("message_start")
);
assert_eq!(
super::strip_sse_field("event:message_start", "event"),
Some("message_start")
);
}
}
+23 -1
View File
@@ -6,6 +6,7 @@ use super::{
handler_config::UsageParserConfig,
handler_context::{RequestContext, StreamingTimeoutConfig},
server::ProxyState,
sse::strip_sse_field,
usage::parser::TokenUsage,
ProxyError,
};
@@ -527,7 +528,7 @@ pub fn create_logged_passthrough_stream(
if !event_text.trim().is_empty() {
// 提取 data 部分并尝试解析为 JSON
for line in event_text.lines() {
if let Some(data) = line.strip_prefix("data: ") {
if let Some(data) = strip_sse_field(line, "data") {
if data.trim() != "[DONE]" {
if let Ok(json_value) = serde_json::from_str::<Value>(data) {
if let Some(c) = &collector {
@@ -591,6 +592,27 @@ mod tests {
use std::sync::Arc;
use tokio::sync::RwLock;
#[test]
fn test_strip_sse_field_accepts_optional_space() {
assert_eq!(
super::strip_sse_field("data: {\"ok\":true}", "data"),
Some("{\"ok\":true}")
);
assert_eq!(
super::strip_sse_field("data:{\"ok\":true}", "data"),
Some("{\"ok\":true}")
);
assert_eq!(
super::strip_sse_field("event: message_start", "event"),
Some("message_start")
);
assert_eq!(
super::strip_sse_field("event:message_start", "event"),
Some("message_start")
);
assert_eq!(super::strip_sse_field("id:1", "data"), None);
}
fn build_state(db: Arc<Database>) -> ProxyState {
ProxyState {
db: db.clone(),
+31
View File
@@ -0,0 +1,31 @@
#[inline]
pub(crate) fn strip_sse_field<'a>(line: &'a str, field: &str) -> Option<&'a str> {
line.strip_prefix(&format!("{field}: "))
.or_else(|| line.strip_prefix(&format!("{field}:")))
}
#[cfg(test)]
mod tests {
use super::strip_sse_field;
#[test]
fn strip_sse_field_accepts_optional_space() {
assert_eq!(
strip_sse_field("data: {\"ok\":true}", "data"),
Some("{\"ok\":true}")
);
assert_eq!(
strip_sse_field("data:{\"ok\":true}", "data"),
Some("{\"ok\":true}")
);
assert_eq!(
strip_sse_field("event: message_start", "event"),
Some("message_start")
);
assert_eq!(
strip_sse_field("event:message_start", "event"),
Some("message_start")
);
assert_eq!(strip_sse_field("id:1", "data"), None);
}
}