mirror of
https://github.com/farion1231/cc-switch.git
synced 2026-04-09 20:42:28 +08:00
refactor(proxy): remove is_proxy_target in favor of failover_queue
- Remove `is_proxy_target` field from Provider struct (Rust & TypeScript) - Remove related DAO methods: get_proxy_target_provider, set_proxy_target - Remove deprecated Tauri commands: get_proxy_targets, set_proxy_target - Add `is_available()` method to CircuitBreaker for availability checks without consuming HalfOpen probe permits (used in select_providers) - Keep `allow_request()` for actual request gating with permit tracking - Update stream_check to use failover_queue instead of is_proxy_target - Clean up commented-out reset circuit breaker button in ProviderActions - Remove unused useProxyTargets and useSetProxyTarget hooks
This commit is contained in:
@@ -86,19 +86,6 @@ pub fn switch_provider(
|
||||
.map_err(|e| e.to_string())
|
||||
}
|
||||
|
||||
/// 设置代理目标供应商
|
||||
#[tauri::command]
|
||||
pub fn set_proxy_target_provider(
|
||||
state: State<'_, AppState>,
|
||||
app: String,
|
||||
id: String,
|
||||
) -> Result<bool, String> {
|
||||
let app_type = AppType::from_str(&app).map_err(|e| e.to_string())?;
|
||||
ProviderService::set_proxy_target(state.inner(), app_type, &id)
|
||||
.map(|_| true)
|
||||
.map_err(|e| e.to_string())
|
||||
}
|
||||
|
||||
fn import_default_config_internal(state: &AppState, app_type: AppType) -> Result<bool, AppError> {
|
||||
ProviderService::import_default_config(state, app_type)
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
//!
|
||||
//! 提供前端调用的 API 接口
|
||||
|
||||
use crate::provider::Provider;
|
||||
use crate::proxy::types::*;
|
||||
use crate::proxy::{CircuitBreakerConfig, CircuitBreakerStats};
|
||||
use crate::store::AppState;
|
||||
@@ -69,47 +68,6 @@ pub async fn switch_proxy_provider(
|
||||
|
||||
// ==================== 故障转移相关命令 ====================
|
||||
|
||||
/// 获取代理目标列表
|
||||
#[tauri::command]
|
||||
pub async fn get_proxy_targets(
|
||||
state: tauri::State<'_, AppState>,
|
||||
app_type: String,
|
||||
) -> Result<Vec<Provider>, String> {
|
||||
let db = &state.db;
|
||||
db.get_proxy_targets(&app_type)
|
||||
.await
|
||||
.map_err(|e| e.to_string())
|
||||
.map(|providers| providers.into_values().collect())
|
||||
}
|
||||
|
||||
/// 设置代理目标
|
||||
#[tauri::command]
|
||||
pub async fn set_proxy_target(
|
||||
state: tauri::State<'_, AppState>,
|
||||
provider_id: String,
|
||||
app_type: String,
|
||||
enabled: bool,
|
||||
) -> Result<(), String> {
|
||||
let db = &state.db;
|
||||
|
||||
// 设置代理目标状态
|
||||
db.set_proxy_target(&provider_id, &app_type, enabled)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
// 如果是禁用代理目标,重置健康状态
|
||||
if !enabled {
|
||||
log::info!(
|
||||
"Resetting health status for provider {provider_id} (app: {app_type}) after disabling proxy target"
|
||||
);
|
||||
if let Err(e) = db.reset_provider_health(&provider_id, &app_type).await {
|
||||
log::warn!("Failed to reset provider health: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 获取供应商健康状态
|
||||
#[tauri::command]
|
||||
pub async fn get_provider_health(
|
||||
|
||||
@@ -6,6 +6,7 @@ use crate::services::stream_check::{
|
||||
HealthStatus, StreamCheckConfig, StreamCheckResult, StreamCheckService,
|
||||
};
|
||||
use crate::store::AppState;
|
||||
use std::collections::HashSet;
|
||||
use tauri::State;
|
||||
|
||||
/// 流式健康检查(单个供应商)
|
||||
@@ -44,10 +45,28 @@ pub async fn stream_check_all_providers(
|
||||
let providers = state.db.get_all_providers(app_type.as_str())?;
|
||||
|
||||
let mut results = Vec::new();
|
||||
let allowed_ids: Option<HashSet<String>> = if proxy_targets_only {
|
||||
let mut ids = HashSet::new();
|
||||
if let Ok(Some(current_id)) = state.db.get_current_provider(app_type.as_str()) {
|
||||
ids.insert(current_id);
|
||||
}
|
||||
if let Ok(queue) = state.db.get_failover_queue(app_type.as_str()) {
|
||||
for item in queue {
|
||||
if item.enabled {
|
||||
ids.insert(item.provider_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(ids)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
for (id, provider) in providers {
|
||||
if proxy_targets_only && !provider.is_proxy_target.unwrap_or(false) {
|
||||
continue;
|
||||
if let Some(ids) = &allowed_ids {
|
||||
if !ids.contains(&id) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
let result = StreamCheckService::check_with_retry(&app_type, &provider, &config)
|
||||
|
||||
@@ -17,7 +17,7 @@ impl Database {
|
||||
) -> Result<IndexMap<String, Provider>, AppError> {
|
||||
let conn = lock_conn!(self.conn);
|
||||
let mut stmt = conn.prepare(
|
||||
"SELECT id, name, settings_config, website_url, category, created_at, sort_index, notes, icon, icon_color, meta, is_proxy_target
|
||||
"SELECT id, name, settings_config, website_url, category, created_at, sort_index, notes, icon, icon_color, meta
|
||||
FROM providers WHERE app_type = ?1
|
||||
ORDER BY COALESCE(sort_index, 999999), created_at ASC, id ASC"
|
||||
).map_err(|e| AppError::Database(e.to_string()))?;
|
||||
@@ -35,7 +35,6 @@ impl Database {
|
||||
let icon: Option<String> = row.get(8)?;
|
||||
let icon_color: Option<String> = row.get(9)?;
|
||||
let meta_str: String = row.get(10)?;
|
||||
let is_proxy_target: bool = row.get(11)?;
|
||||
|
||||
let settings_config =
|
||||
serde_json::from_str(&settings_config_str).unwrap_or(serde_json::Value::Null);
|
||||
@@ -55,7 +54,6 @@ impl Database {
|
||||
meta: Some(meta),
|
||||
icon,
|
||||
icon_color,
|
||||
is_proxy_target: Some(is_proxy_target),
|
||||
},
|
||||
))
|
||||
})
|
||||
@@ -131,7 +129,7 @@ impl Database {
|
||||
) -> Result<Option<Provider>, AppError> {
|
||||
let conn = lock_conn!(self.conn);
|
||||
let result = conn.query_row(
|
||||
"SELECT name, settings_config, website_url, category, created_at, sort_index, notes, icon, icon_color, meta, is_proxy_target
|
||||
"SELECT name, settings_config, website_url, category, created_at, sort_index, notes, icon, icon_color, meta
|
||||
FROM providers WHERE id = ?1 AND app_type = ?2",
|
||||
params![id, app_type],
|
||||
|row| {
|
||||
@@ -145,7 +143,6 @@ impl Database {
|
||||
let icon: Option<String> = row.get(7)?;
|
||||
let icon_color: Option<String> = row.get(8)?;
|
||||
let meta_str: String = row.get(9)?;
|
||||
let is_proxy_target: bool = row.get(10)?;
|
||||
|
||||
let settings_config = serde_json::from_str(&settings_config_str).unwrap_or(serde_json::Value::Null);
|
||||
let meta: ProviderMeta = serde_json::from_str(&meta_str).unwrap_or_default();
|
||||
@@ -162,7 +159,6 @@ impl Database {
|
||||
meta: Some(meta),
|
||||
icon,
|
||||
icon_color,
|
||||
is_proxy_target: Some(is_proxy_target),
|
||||
})
|
||||
},
|
||||
);
|
||||
@@ -174,26 +170,6 @@ impl Database {
|
||||
}
|
||||
}
|
||||
|
||||
/// 获取代理目标供应商 ID
|
||||
pub fn get_proxy_target_provider(&self, app_type: &str) -> Result<Option<String>, AppError> {
|
||||
let conn = lock_conn!(self.conn);
|
||||
let mut stmt = conn
|
||||
.prepare("SELECT id FROM providers WHERE app_type = ?1 AND is_proxy_target = 1 LIMIT 1")
|
||||
.map_err(|e| AppError::Database(e.to_string()))?;
|
||||
|
||||
let mut rows = stmt
|
||||
.query(params![app_type])
|
||||
.map_err(|e| AppError::Database(e.to_string()))?;
|
||||
|
||||
if let Some(row) = rows.next().map_err(|e| AppError::Database(e.to_string()))? {
|
||||
Ok(Some(
|
||||
row.get(0).map_err(|e| AppError::Database(e.to_string()))?,
|
||||
))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// 保存供应商(新增或更新)
|
||||
///
|
||||
/// 注意:更新模式下不同步 endpoints,因为编辑模式下端点通过单独的 API 管理
|
||||
@@ -208,17 +184,17 @@ impl Database {
|
||||
let mut meta_clone = provider.meta.clone().unwrap_or_default();
|
||||
let endpoints = std::mem::take(&mut meta_clone.custom_endpoints);
|
||||
|
||||
// 检查是否存在(用于判断新增/更新,以及保留 is_current 和 is_proxy_target)
|
||||
let existing: Option<(bool, bool)> = tx
|
||||
// 检查是否存在(用于判断新增/更新,以及保留 is_current)
|
||||
let existing: Option<bool> = tx
|
||||
.query_row(
|
||||
"SELECT is_current, is_proxy_target FROM providers WHERE id = ?1 AND app_type = ?2",
|
||||
"SELECT is_current FROM providers WHERE id = ?1 AND app_type = ?2",
|
||||
params![provider.id, app_type],
|
||||
|row| Ok((row.get(0)?, row.get(1)?)),
|
||||
|row| row.get(0),
|
||||
)
|
||||
.ok();
|
||||
|
||||
let is_update = existing.is_some();
|
||||
let (is_current, is_proxy_target) = existing.unwrap_or((false, false));
|
||||
let is_current = existing.unwrap_or(false);
|
||||
|
||||
if is_update {
|
||||
// 更新模式:使用 UPDATE 避免触发 ON DELETE CASCADE
|
||||
@@ -234,9 +210,8 @@ impl Database {
|
||||
icon = ?8,
|
||||
icon_color = ?9,
|
||||
meta = ?10,
|
||||
is_current = ?11,
|
||||
is_proxy_target = ?12
|
||||
WHERE id = ?13 AND app_type = ?14",
|
||||
is_current = ?11
|
||||
WHERE id = ?12 AND app_type = ?13",
|
||||
params![
|
||||
provider.name,
|
||||
serde_json::to_string(&provider.settings_config).unwrap(),
|
||||
@@ -249,7 +224,6 @@ impl Database {
|
||||
provider.icon_color,
|
||||
serde_json::to_string(&meta_clone).unwrap(),
|
||||
is_current,
|
||||
is_proxy_target,
|
||||
provider.id,
|
||||
app_type,
|
||||
],
|
||||
@@ -260,8 +234,8 @@ impl Database {
|
||||
tx.execute(
|
||||
"INSERT INTO providers (
|
||||
id, app_type, name, settings_config, website_url, category,
|
||||
created_at, sort_index, notes, icon, icon_color, meta, is_current, is_proxy_target
|
||||
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
|
||||
created_at, sort_index, notes, icon, icon_color, meta, is_current
|
||||
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
|
||||
params![
|
||||
provider.id,
|
||||
app_type,
|
||||
@@ -276,7 +250,6 @@ impl Database {
|
||||
provider.icon_color,
|
||||
serde_json::to_string(&meta_clone).unwrap(),
|
||||
is_current,
|
||||
is_proxy_target,
|
||||
],
|
||||
)
|
||||
.map_err(|e| AppError::Database(e.to_string()))?;
|
||||
@@ -332,157 +305,6 @@ impl Database {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 设置代理目标供应商
|
||||
pub fn set_proxy_target_provider(&self, app_type: &str, id: &str) -> Result<(), AppError> {
|
||||
let mut conn = lock_conn!(self.conn);
|
||||
let tx = conn
|
||||
.transaction()
|
||||
.map_err(|e| AppError::Database(e.to_string()))?;
|
||||
|
||||
// 重置所有为 0
|
||||
tx.execute(
|
||||
"UPDATE providers SET is_proxy_target = 0 WHERE app_type = ?1",
|
||||
params![app_type],
|
||||
)
|
||||
.map_err(|e| AppError::Database(e.to_string()))?;
|
||||
|
||||
// 设置新的代理目标供应商
|
||||
tx.execute(
|
||||
"UPDATE providers SET is_proxy_target = 1 WHERE id = ?1 AND app_type = ?2",
|
||||
params![id, app_type],
|
||||
)
|
||||
.map_err(|e| AppError::Database(e.to_string()))?;
|
||||
|
||||
tx.commit().map_err(|e| AppError::Database(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 设置单个供应商的代理目标状态(支持多个代理目标)
|
||||
pub async fn set_proxy_target(
|
||||
&self,
|
||||
provider_id: &str,
|
||||
app_type: &str,
|
||||
enabled: bool,
|
||||
) -> Result<(), AppError> {
|
||||
let conn = lock_conn!(self.conn);
|
||||
conn.execute(
|
||||
"UPDATE providers SET is_proxy_target = ?1
|
||||
WHERE id = ?2 AND app_type = ?3",
|
||||
params![if enabled { 1 } else { 0 }, provider_id, app_type],
|
||||
)
|
||||
.map_err(|e| AppError::Database(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 获取指定应用类型的所有代理目标供应商(按 sort_index 排序)
|
||||
pub async fn get_proxy_targets(
|
||||
&self,
|
||||
app_type: &str,
|
||||
) -> Result<IndexMap<String, Provider>, AppError> {
|
||||
let conn = lock_conn!(self.conn);
|
||||
let mut stmt = conn.prepare(
|
||||
"SELECT id, name, settings_config, website_url, category, created_at, sort_index, notes, icon, icon_color, meta, is_proxy_target
|
||||
FROM providers WHERE app_type = ?1 AND is_proxy_target = 1
|
||||
ORDER BY COALESCE(sort_index, 999999), created_at ASC, id ASC"
|
||||
).map_err(|e| AppError::Database(e.to_string()))?;
|
||||
|
||||
let provider_iter = stmt
|
||||
.query_map(params![app_type], |row| {
|
||||
let id: String = row.get(0)?;
|
||||
let name: String = row.get(1)?;
|
||||
let settings_config_str: String = row.get(2)?;
|
||||
let website_url: Option<String> = row.get(3)?;
|
||||
let category: Option<String> = row.get(4)?;
|
||||
let created_at: Option<i64> = row.get(5)?;
|
||||
let sort_index: Option<usize> = row.get(6)?;
|
||||
let notes: Option<String> = row.get(7)?;
|
||||
let icon: Option<String> = row.get(8)?;
|
||||
let icon_color: Option<String> = row.get(9)?;
|
||||
let meta_str: String = row.get(10)?;
|
||||
let is_proxy_target: bool = row.get(11)?;
|
||||
|
||||
let settings_config =
|
||||
serde_json::from_str(&settings_config_str).unwrap_or(serde_json::Value::Null);
|
||||
let meta: ProviderMeta = serde_json::from_str(&meta_str).unwrap_or_default();
|
||||
|
||||
Ok((
|
||||
id,
|
||||
Provider {
|
||||
id: "".to_string(),
|
||||
name,
|
||||
settings_config,
|
||||
website_url,
|
||||
category,
|
||||
created_at,
|
||||
sort_index,
|
||||
notes,
|
||||
meta: Some(meta),
|
||||
icon,
|
||||
icon_color,
|
||||
is_proxy_target: Some(is_proxy_target),
|
||||
},
|
||||
))
|
||||
})
|
||||
.map_err(|e| AppError::Database(e.to_string()))?;
|
||||
|
||||
let mut providers = IndexMap::new();
|
||||
for provider_res in provider_iter {
|
||||
let (id, mut provider) = provider_res.map_err(|e| AppError::Database(e.to_string()))?;
|
||||
provider.id = id.clone();
|
||||
|
||||
// 加载 endpoints
|
||||
let mut stmt_endpoints = conn.prepare(
|
||||
"SELECT url, added_at FROM provider_endpoints WHERE provider_id = ?1 AND app_type = ?2 ORDER BY added_at ASC, url ASC"
|
||||
).map_err(|e| AppError::Database(e.to_string()))?;
|
||||
|
||||
let endpoints_iter = stmt_endpoints
|
||||
.query_map(params![id, app_type], |row| {
|
||||
let url: String = row.get(0)?;
|
||||
let added_at: Option<i64> = row.get(1)?;
|
||||
Ok((
|
||||
url,
|
||||
crate::settings::CustomEndpoint {
|
||||
url: "".to_string(),
|
||||
added_at: added_at.unwrap_or(0),
|
||||
last_used: None,
|
||||
},
|
||||
))
|
||||
})
|
||||
.map_err(|e| AppError::Database(e.to_string()))?;
|
||||
|
||||
let mut custom_endpoints = HashMap::new();
|
||||
for ep_res in endpoints_iter {
|
||||
let (url, mut ep) = ep_res.map_err(|e| AppError::Database(e.to_string()))?;
|
||||
ep.url = url.clone();
|
||||
custom_endpoints.insert(url, ep);
|
||||
}
|
||||
|
||||
if let Some(meta) = &mut provider.meta {
|
||||
meta.custom_endpoints = custom_endpoints;
|
||||
}
|
||||
|
||||
providers.insert(id, provider);
|
||||
}
|
||||
|
||||
Ok(providers)
|
||||
}
|
||||
|
||||
/// 获取所有活跃的代理目标
|
||||
pub fn get_all_proxy_targets(&self) -> Result<Vec<(String, String, String)>, AppError> {
|
||||
let conn = lock_conn!(self.conn);
|
||||
let mut stmt = conn
|
||||
.prepare("SELECT app_type, name, id FROM providers WHERE is_proxy_target = 1")
|
||||
.map_err(|e| AppError::Database(e.to_string()))?;
|
||||
|
||||
let targets = stmt
|
||||
.query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))
|
||||
.map_err(|e| AppError::Database(e.to_string()))?
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map_err(|e| AppError::Database(e.to_string()))?;
|
||||
|
||||
Ok(targets)
|
||||
}
|
||||
|
||||
/// 更新供应商的 settings_config(仅更新配置,不改变其他字段)
|
||||
pub fn update_provider_settings_config(
|
||||
&self,
|
||||
|
||||
@@ -31,19 +31,12 @@ impl Database {
|
||||
icon_color TEXT,
|
||||
meta TEXT NOT NULL DEFAULT '{}',
|
||||
is_current BOOLEAN NOT NULL DEFAULT 0,
|
||||
is_proxy_target BOOLEAN NOT NULL DEFAULT 0,
|
||||
PRIMARY KEY (id, app_type)
|
||||
)",
|
||||
[],
|
||||
)
|
||||
.map_err(|e| AppError::Database(e.to_string()))?;
|
||||
|
||||
// 尝试添加 is_proxy_target 列(如果表已存在但缺少该列)
|
||||
let _ = conn.execute(
|
||||
"ALTER TABLE providers ADD COLUMN is_proxy_target BOOLEAN NOT NULL DEFAULT 0",
|
||||
[],
|
||||
);
|
||||
|
||||
// 2. Provider Endpoints 表
|
||||
conn.execute(
|
||||
"CREATE TABLE IF NOT EXISTS provider_endpoints (
|
||||
|
||||
@@ -245,7 +245,6 @@ fn dry_run_validates_schema_compatibility() {
|
||||
meta: None,
|
||||
icon: None,
|
||||
icon_color: None,
|
||||
is_proxy_target: Some(false),
|
||||
},
|
||||
);
|
||||
|
||||
|
||||
@@ -132,7 +132,6 @@ pub(crate) fn build_provider_from_request(
|
||||
meta,
|
||||
icon: request.icon.clone(),
|
||||
icon_color: None,
|
||||
is_proxy_target: None,
|
||||
};
|
||||
|
||||
Ok(provider)
|
||||
|
||||
@@ -145,6 +145,13 @@ fn test_build_gemini_provider_with_model() {
|
||||
content: None,
|
||||
description: None,
|
||||
enabled: None,
|
||||
usage_enabled: None,
|
||||
usage_script: None,
|
||||
usage_api_key: None,
|
||||
usage_base_url: None,
|
||||
usage_access_token: None,
|
||||
usage_user_id: None,
|
||||
usage_auto_interval: None,
|
||||
};
|
||||
|
||||
let provider = build_provider_from_request(&AppType::Gemini, &request).unwrap();
|
||||
@@ -191,6 +198,13 @@ fn test_build_gemini_provider_without_model() {
|
||||
content: None,
|
||||
description: None,
|
||||
enabled: None,
|
||||
usage_enabled: None,
|
||||
usage_script: None,
|
||||
usage_api_key: None,
|
||||
usage_base_url: None,
|
||||
usage_access_token: None,
|
||||
usage_user_id: None,
|
||||
usage_auto_interval: None,
|
||||
};
|
||||
|
||||
let provider = build_provider_from_request(&AppType::Gemini, &request).unwrap();
|
||||
@@ -232,6 +246,13 @@ fn test_parse_and_merge_config_claude() {
|
||||
content: None,
|
||||
description: None,
|
||||
enabled: None,
|
||||
usage_enabled: None,
|
||||
usage_script: None,
|
||||
usage_api_key: None,
|
||||
usage_base_url: None,
|
||||
usage_access_token: None,
|
||||
usage_user_id: None,
|
||||
usage_auto_interval: None,
|
||||
};
|
||||
|
||||
let merged = parse_and_merge_config(&request).unwrap();
|
||||
@@ -275,6 +296,13 @@ fn test_parse_and_merge_config_url_override() {
|
||||
content: None,
|
||||
description: None,
|
||||
enabled: None,
|
||||
usage_enabled: None,
|
||||
usage_script: None,
|
||||
usage_api_key: None,
|
||||
usage_base_url: None,
|
||||
usage_access_token: None,
|
||||
usage_user_id: None,
|
||||
usage_auto_interval: None,
|
||||
};
|
||||
|
||||
let merged = parse_and_merge_config(&request).unwrap();
|
||||
|
||||
@@ -581,7 +581,6 @@ pub fn run() {
|
||||
commands::update_provider,
|
||||
commands::delete_provider,
|
||||
commands::switch_provider,
|
||||
commands::set_proxy_target_provider,
|
||||
commands::import_default_config,
|
||||
commands::get_claude_config_status,
|
||||
commands::get_config_status,
|
||||
@@ -684,8 +683,6 @@ pub fn run() {
|
||||
commands::is_live_takeover_active,
|
||||
commands::switch_proxy_provider,
|
||||
// Proxy failover commands
|
||||
commands::get_proxy_targets,
|
||||
commands::set_proxy_target,
|
||||
commands::get_provider_health,
|
||||
commands::reset_circuit_breaker,
|
||||
commands::get_circuit_breaker_config,
|
||||
|
||||
@@ -36,10 +36,6 @@ pub struct Provider {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(rename = "iconColor")]
|
||||
pub icon_color: Option<String>,
|
||||
/// 是否为代理目标(数据库专用字段,不写入配置文件)
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(rename = "isProxyTarget")]
|
||||
pub is_proxy_target: Option<bool>,
|
||||
}
|
||||
|
||||
impl Provider {
|
||||
@@ -62,7 +58,6 @@ impl Provider {
|
||||
meta: None,
|
||||
icon: None,
|
||||
icon_color: None,
|
||||
is_proxy_target: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -99,6 +99,36 @@ impl CircuitBreaker {
|
||||
log::debug!("Circuit breaker config updated");
|
||||
}
|
||||
|
||||
/// 判断当前 Provider 是否“可被纳入候选链路”
|
||||
///
|
||||
/// 这个方法不会占用 HalfOpen 探测名额,仅用于路由选择阶段的“可用性判断”:
|
||||
/// - Closed / HalfOpen:可用(返回 true)
|
||||
/// - Open:若超时到达则切到 HalfOpen 并返回 true,否则返回 false
|
||||
///
|
||||
/// 注意:真正发起请求前仍需调用 `allow_request()` 来获取 HalfOpen 探测名额,
|
||||
/// 并在请求结束后通过 `record_success()` / `record_failure()` 释放。
|
||||
pub async fn is_available(&self) -> bool {
|
||||
let state = *self.state.read().await;
|
||||
let config = self.config.read().await;
|
||||
|
||||
match state {
|
||||
CircuitState::Closed | CircuitState::HalfOpen => true,
|
||||
CircuitState::Open => {
|
||||
if let Some(opened_at) = *self.last_opened_at.read().await {
|
||||
if opened_at.elapsed().as_secs() >= config.timeout_seconds {
|
||||
drop(config); // 释放读锁再转换状态
|
||||
log::info!(
|
||||
"Circuit breaker transitioning from Open to HalfOpen (timeout reached)"
|
||||
);
|
||||
self.transition_to_half_open().await;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// 检查是否允许请求通过
|
||||
pub async fn allow_request(&self) -> bool {
|
||||
let state = *self.state.read().await;
|
||||
|
||||
@@ -148,13 +148,33 @@ impl RequestForwarder {
|
||||
|
||||
let mut last_error = None;
|
||||
let mut failover_happened = false;
|
||||
let mut attempted_providers = 0usize;
|
||||
|
||||
// 依次尝试每个供应商
|
||||
for (attempt, provider) in providers.iter().enumerate() {
|
||||
for provider in providers.iter() {
|
||||
// 发起请求前先获取熔断器放行许可(HalfOpen 会占用探测名额)
|
||||
if !self
|
||||
.router
|
||||
.allow_provider_request(&provider.id, app_type_str)
|
||||
.await
|
||||
{
|
||||
log::debug!(
|
||||
"[{}] Provider {} 熔断器拒绝本次请求,跳过",
|
||||
app_type_str,
|
||||
provider.name
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
attempted_providers += 1;
|
||||
if attempted_providers > 1 {
|
||||
failover_happened = true;
|
||||
}
|
||||
|
||||
log::info!(
|
||||
"[{}] 尝试 {}/{} - 使用Provider: {} (sort_index: {})",
|
||||
app_type_str,
|
||||
attempt + 1,
|
||||
attempted_providers,
|
||||
providers.len(),
|
||||
provider.name,
|
||||
provider.sort_index.unwrap_or(999999)
|
||||
@@ -167,9 +187,6 @@ impl RequestForwarder {
|
||||
status.current_provider_id = Some(provider.id.clone());
|
||||
status.total_requests += 1;
|
||||
status.last_request_at = Some(chrono::Utc::now().to_rfc3339());
|
||||
if attempt > 0 {
|
||||
failover_happened = true;
|
||||
}
|
||||
}
|
||||
|
||||
let start = Instant::now();
|
||||
@@ -306,6 +323,21 @@ impl RequestForwarder {
|
||||
}
|
||||
}
|
||||
|
||||
if attempted_providers == 0 {
|
||||
// providers 列表非空,但全部被熔断器拒绝(典型:HalfOpen 探测名额被占用)
|
||||
{
|
||||
let mut status = self.status.write().await;
|
||||
status.failed_requests += 1;
|
||||
status.last_error = Some("所有供应商暂时不可用(熔断器限制)".to_string());
|
||||
if status.total_requests > 0 {
|
||||
status.success_rate = (status.success_requests as f32
|
||||
/ status.total_requests as f32)
|
||||
* 100.0;
|
||||
}
|
||||
}
|
||||
return Err(ProxyError::NoAvailableProvider);
|
||||
}
|
||||
|
||||
// 所有供应商都失败了
|
||||
{
|
||||
let mut status = self.status.write().await;
|
||||
|
||||
@@ -43,7 +43,7 @@ impl ProviderRouter {
|
||||
let circuit_key = format!("{}:{}", app_type, current.id);
|
||||
let breaker = self.get_or_create_circuit_breaker(&circuit_key).await;
|
||||
|
||||
if breaker.allow_request().await {
|
||||
if breaker.is_available().await {
|
||||
log::info!(
|
||||
"[{}] Current provider available: {} ({})",
|
||||
app_type,
|
||||
@@ -81,7 +81,7 @@ impl ProviderRouter {
|
||||
let circuit_key = format!("{}:{}", app_type, provider.id);
|
||||
let breaker = self.get_or_create_circuit_breaker(&circuit_key).await;
|
||||
|
||||
if breaker.allow_request().await {
|
||||
if breaker.is_available().await {
|
||||
log::info!(
|
||||
"[{}] Failover provider available: {} ({}) at queue position {}",
|
||||
app_type,
|
||||
@@ -116,6 +116,20 @@ impl ProviderRouter {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// 请求执行前获取熔断器“放行许可”
|
||||
///
|
||||
/// - Closed:直接放行
|
||||
/// - Open:超时到达后切到 HalfOpen 并放行一次探测
|
||||
/// - HalfOpen:按限流规则放行探测
|
||||
///
|
||||
/// 注意:调用方必须在请求结束后通过 `record_result()` 释放 HalfOpen 名额,
|
||||
/// 否则会导致该 Provider 长时间无法进入探测状态。
|
||||
pub async fn allow_provider_request(&self, provider_id: &str, app_type: &str) -> bool {
|
||||
let circuit_key = format!("{app_type}:{provider_id}");
|
||||
let breaker = self.get_or_create_circuit_breaker(&circuit_key).await;
|
||||
breaker.allow_request().await
|
||||
}
|
||||
|
||||
/// 记录供应商请求结果
|
||||
pub async fn record_result(
|
||||
&self,
|
||||
@@ -155,24 +169,6 @@ impl ProviderRouter {
|
||||
)
|
||||
.await?;
|
||||
|
||||
// 4. 如果连续失败达到熔断阈值,自动禁用代理目标
|
||||
if !success {
|
||||
let health = self.db.get_provider_health(provider_id, app_type).await?;
|
||||
|
||||
// 如果连续失败达到阈值,自动关闭该供应商的代理开关
|
||||
if health.consecutive_failures >= failure_threshold {
|
||||
log::warn!(
|
||||
"Provider {} has failed {} times (threshold: {}), auto-disabling proxy target",
|
||||
provider_id,
|
||||
health.consecutive_failures,
|
||||
failure_threshold
|
||||
);
|
||||
self.db
|
||||
.set_proxy_target(provider_id, app_type, false)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -261,6 +257,7 @@ impl ProviderRouter {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::database::Database;
|
||||
use serde_json::json;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_provider_router_creation() {
|
||||
@@ -271,4 +268,44 @@ mod tests {
|
||||
let breaker = router.get_or_create_circuit_breaker("claude:test").await;
|
||||
assert!(breaker.allow_request().await);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn select_providers_does_not_consume_half_open_permit() {
|
||||
let db = Arc::new(Database::memory().unwrap());
|
||||
|
||||
// 配置:让熔断器 Open 后立刻进入 HalfOpen(timeout_seconds=0),并用 1 次失败就打开熔断器
|
||||
db.update_circuit_breaker_config(&CircuitBreakerConfig {
|
||||
failure_threshold: 1,
|
||||
timeout_seconds: 0,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// 准备 2 个 Provider:A(当前)+ B(队列)
|
||||
let provider_a =
|
||||
Provider::with_id("a".to_string(), "Provider A".to_string(), json!({}), None);
|
||||
let provider_b =
|
||||
Provider::with_id("b".to_string(), "Provider B".to_string(), json!({}), None);
|
||||
|
||||
db.save_provider("claude", &provider_a).unwrap();
|
||||
db.save_provider("claude", &provider_b).unwrap();
|
||||
db.set_current_provider("claude", "a").unwrap();
|
||||
db.add_to_failover_queue("claude", "b").unwrap();
|
||||
|
||||
let router = ProviderRouter::new(db.clone());
|
||||
|
||||
// 让 B 进入 Open 状态(failure_threshold=1)
|
||||
router
|
||||
.record_result("b", "claude", false, Some("fail".to_string()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// select_providers 只做“可用性判断”,不应占用 HalfOpen 探测名额
|
||||
let providers = router.select_providers("claude").await.unwrap();
|
||||
assert_eq!(providers.len(), 2);
|
||||
|
||||
// 如果 select_providers 错误地消耗了 HalfOpen 名额,这里会返回 false(被限流拒绝)
|
||||
assert!(router.allow_provider_request("b", "claude").await);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -253,7 +253,6 @@ mod tests {
|
||||
meta: None,
|
||||
icon: None,
|
||||
icon_color: None,
|
||||
is_proxy_target: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -174,7 +174,6 @@ mod tests {
|
||||
meta: None,
|
||||
icon: None,
|
||||
icon_color: None,
|
||||
is_proxy_target: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -250,7 +250,6 @@ mod tests {
|
||||
meta: None,
|
||||
icon: None,
|
||||
icon_color: None,
|
||||
is_proxy_target: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -205,7 +205,6 @@ mod tests {
|
||||
meta: None,
|
||||
icon: None,
|
||||
icon_color: None,
|
||||
is_proxy_target: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -394,7 +394,6 @@ mod tests {
|
||||
meta: None,
|
||||
icon: None,
|
||||
icon_color: None,
|
||||
is_proxy_target: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -214,9 +214,6 @@ impl ProviderService {
|
||||
// Update database is_current
|
||||
state.db.set_current_provider(app_type.as_str(), id)?;
|
||||
|
||||
// 同时更新 is_proxy_target(代理路由器使用此字段选择供应商)
|
||||
state.db.set_proxy_target_provider(app_type.as_str(), id)?;
|
||||
|
||||
// Update local settings for consistency
|
||||
crate::settings::set_current_provider(&app_type, Some(id))?;
|
||||
|
||||
@@ -229,7 +226,7 @@ impl ProviderService {
|
||||
.map_err(|e| AppError::Message(format!("更新 Live 备份失败: {e}")))?;
|
||||
|
||||
// Note: No Live config write, no MCP sync
|
||||
// The proxy server will route requests to the new provider via is_proxy_target
|
||||
// The proxy server will route requests to the new provider via is_current
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@@ -287,18 +284,6 @@ impl ProviderService {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set proxy target provider
|
||||
pub fn set_proxy_target(state: &AppState, app_type: AppType, id: &str) -> Result<(), AppError> {
|
||||
// Check if provider exists
|
||||
let providers = state.db.get_all_providers(app_type.as_str())?;
|
||||
if !providers.contains_key(id) {
|
||||
return Err(AppError::Message(format!("供应商 {id} 不存在")));
|
||||
}
|
||||
|
||||
state.db.set_proxy_target_provider(app_type.as_str(), id)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Sync current provider to live configuration (re-export)
|
||||
pub fn sync_current_to_live(state: &AppState) -> Result<(), AppError> {
|
||||
sync_current_to_live(state)
|
||||
|
||||
@@ -77,25 +77,22 @@ impl ProxyService {
|
||||
|
||||
/// 启动代理服务器(带 Live 配置接管)
|
||||
pub async fn start_with_takeover(&self) -> Result<ProxyServerInfo, String> {
|
||||
// 1. 自动将各应用当前选中的供应商设置为代理目标
|
||||
self.setup_proxy_targets().await?;
|
||||
|
||||
// 2. 备份各应用的 Live 配置
|
||||
// 1. 备份各应用的 Live 配置
|
||||
self.backup_live_configs().await?;
|
||||
|
||||
// 3. 同步 Live 配置中的 Token 到数据库(确保代理能读到最新的 Token)
|
||||
// 2. 同步 Live 配置中的 Token 到数据库(确保代理能读到最新的 Token)
|
||||
self.sync_live_to_providers().await?;
|
||||
|
||||
// 4. 接管各应用的 Live 配置(写入代理地址,清空 Token)
|
||||
// 3. 接管各应用的 Live 配置(写入代理地址,清空 Token)
|
||||
self.takeover_live_configs().await?;
|
||||
|
||||
// 5. 设置接管状态
|
||||
// 4. 设置接管状态
|
||||
self.db
|
||||
.set_live_takeover_active(true)
|
||||
.await
|
||||
.map_err(|e| format!("设置接管状态失败: {e}"))?;
|
||||
|
||||
// 6. 启动代理服务器
|
||||
// 5. 启动代理服务器
|
||||
match self.start().await {
|
||||
Ok(info) => Ok(info),
|
||||
Err(e) => {
|
||||
@@ -108,31 +105,6 @@ impl ProxyService {
|
||||
}
|
||||
}
|
||||
|
||||
/// 自动设置代理目标:将各应用当前选中的供应商设置为代理目标
|
||||
async fn setup_proxy_targets(&self) -> Result<(), String> {
|
||||
let app_types = ["claude", "codex", "gemini"];
|
||||
|
||||
for app_type in app_types {
|
||||
// 获取当前选中的供应商
|
||||
if let Ok(Some(provider_id)) = self.db.get_current_provider(app_type) {
|
||||
// 设置为代理目标
|
||||
if let Err(e) = self.db.set_proxy_target(&provider_id, app_type, true).await {
|
||||
log::warn!("设置 {} 的代理目标 {} 失败: {}", app_type, provider_id, e);
|
||||
} else {
|
||||
log::info!(
|
||||
"已将 {} 的当前供应商 {} 设置为代理目标",
|
||||
app_type,
|
||||
provider_id
|
||||
);
|
||||
}
|
||||
} else {
|
||||
log::debug!("{} 没有当前供应商,跳过代理目标设置", app_type);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 同步 Live 配置中的 Token 到数据库
|
||||
///
|
||||
/// 在清空 Live Token 之前调用,确保数据库中的 Provider 配置有最新的 Token。
|
||||
|
||||
Reference in New Issue
Block a user