feat(proxy): sync UI when failover succeeds

Add FailoverSwitchManager to handle provider switching after successful
failover. This ensures the UI reflects the actual provider in use:

- Create failover_switch.rs with deduplication and async switching logic
- Pass AppHandle through ProxyService -> ProxyServer -> RequestForwarder
- Update is_current in database when failover succeeds
- Emit provider-switched event for frontend refresh
- Update tray menu and live backup synchronously

The switching runs asynchronously via tokio::spawn to avoid blocking
API responses while still providing immediate UI feedback.
This commit is contained in:
Jason
2025-12-15 16:28:00 +08:00
parent 1172209f49
commit 9196d07925
7 changed files with 210 additions and 4 deletions
+5
View File
@@ -332,6 +332,11 @@ pub fn run() {
let app_state = AppState::new(db);
// 设置 AppHandle 用于代理故障转移时的 UI 更新
app_state
.proxy_service
.set_app_handle(app.handle().clone());
// ============================================================
// 按表独立判断的导入逻辑(各类数据独立检查,互不影响)
// ============================================================
+148
View File
@@ -0,0 +1,148 @@
//! 故障转移切换模块
//!
//! 处理故障转移成功后的供应商切换逻辑,包括:
//! - 去重控制(避免多个请求同时触发)
//! - 数据库更新
//! - 托盘菜单更新
//! - 前端事件发射
//! - Live 备份更新
use crate::database::Database;
use crate::error::AppError;
use std::collections::HashSet;
use std::str::FromStr;
use std::sync::Arc;
use tauri::{Emitter, Manager};
use tokio::sync::RwLock;
/// 故障转移切换管理器
///
/// 负责处理故障转移成功后的供应商切换,确保 UI 能够直观反映当前使用的供应商。
#[derive(Clone)]
pub struct FailoverSwitchManager {
/// 正在处理中的切换(key = "app_type:provider_id"
pending_switches: Arc<RwLock<HashSet<String>>>,
db: Arc<Database>,
}
impl FailoverSwitchManager {
pub fn new(db: Arc<Database>) -> Self {
Self {
pending_switches: Arc::new(RwLock::new(HashSet::new())),
db,
}
}
/// 尝试执行故障转移切换
///
/// 如果相同的切换已在进行中,则跳过;否则执行切换逻辑。
///
/// # Returns
/// - `Ok(true)` - 切换成功执行
/// - `Ok(false)` - 切换已在进行中,跳过
/// - `Err(e)` - 切换过程中发生错误
pub async fn try_switch(
&self,
app_handle: Option<&tauri::AppHandle>,
app_type: &str,
provider_id: &str,
provider_name: &str,
) -> Result<bool, AppError> {
let switch_key = format!("{}:{}", app_type, provider_id);
// 去重检查:如果相同切换已在进行中,跳过
{
let mut pending = self.pending_switches.write().await;
if pending.contains(&switch_key) {
log::debug!(
"[Failover] 切换已在进行中,跳过: {} -> {}",
app_type,
provider_id
);
return Ok(false);
}
pending.insert(switch_key.clone());
}
// 执行切换(确保最后清理 pending 标记)
let result = self
.do_switch(app_handle, app_type, provider_id, provider_name)
.await;
// 清理 pending 标记
{
let mut pending = self.pending_switches.write().await;
pending.remove(&switch_key);
}
result
}
async fn do_switch(
&self,
app_handle: Option<&tauri::AppHandle>,
app_type: &str,
provider_id: &str,
provider_name: &str,
) -> Result<bool, AppError> {
log::info!(
"[Failover] 开始切换供应商: {} -> {} ({})",
app_type,
provider_name,
provider_id
);
// 1. 更新数据库 is_current
self.db.set_current_provider(app_type, provider_id)?;
// 2. 更新本地 settings(设备级)
let app_type_enum = crate::app_config::AppType::from_str(app_type)
.map_err(|_| AppError::Message(format!("无效的应用类型: {}", app_type)))?;
crate::settings::set_current_provider(&app_type_enum, Some(provider_id))?;
// 3. 更新托盘菜单和发射事件
if let Some(app) = app_handle {
// 更新托盘菜单
if let Some(app_state) = app.try_state::<crate::store::AppState>() {
// 更新 Live 备份(确保代理停止时恢复正确配置)
if let Ok(Some(provider)) = self.db.get_provider_by_id(provider_id, app_type) {
if let Err(e) = app_state
.proxy_service
.update_live_backup_from_provider(app_type, &provider)
.await
{
log::warn!("[Failover] 更新 Live 备份失败: {e}");
}
}
// 重建托盘菜单
if let Ok(new_menu) = crate::tray::create_tray_menu(app, app_state.inner()) {
if let Some(tray) = app.tray_by_id("main") {
if let Err(e) = tray.set_menu(Some(new_menu)) {
log::error!("[Failover] 更新托盘菜单失败: {e}");
}
}
}
}
// 发射事件到前端
let event_data = serde_json::json!({
"appType": app_type,
"providerId": provider_id,
"source": "failover" // 标识来源是故障转移
});
if let Err(e) = app.emit("provider-switched", event_data) {
log::error!("[Failover] 发射供应商切换事件失败: {e}");
}
}
log::info!(
"[Failover] 供应商切换完成: {} -> {} ({})",
app_type,
provider_name,
provider_id
);
Ok(true)
}
}
+23
View File
@@ -4,6 +4,7 @@
use super::{
error::*,
failover_switch::FailoverSwitchManager,
provider_router::ProviderRouter,
providers::{get_adapter, ProviderAdapter},
types::ProxyStatus,
@@ -24,6 +25,10 @@ pub struct RequestForwarder {
max_retries: u8,
status: Arc<RwLock<ProxyStatus>>,
current_providers: Arc<RwLock<std::collections::HashMap<String, (String, String)>>>,
/// 故障转移切换管理器
failover_manager: Arc<FailoverSwitchManager>,
/// AppHandle,用于发射事件和更新托盘
app_handle: Option<tauri::AppHandle>,
}
impl RequestForwarder {
@@ -33,6 +38,8 @@ impl RequestForwarder {
max_retries: u8,
status: Arc<RwLock<ProxyStatus>>,
current_providers: Arc<RwLock<std::collections::HashMap<String, (String, String)>>>,
failover_manager: Arc<FailoverSwitchManager>,
app_handle: Option<tauri::AppHandle>,
) -> Self {
let mut client_builder = Client::builder();
if timeout_secs > 0 {
@@ -49,6 +56,8 @@ impl RequestForwarder {
max_retries,
status,
current_providers,
failover_manager,
app_handle,
}
}
@@ -201,6 +210,20 @@ impl RequestForwarder {
provider.name,
latency
);
// 异步触发供应商切换,更新 UI 和托盘菜单
let fm = self.failover_manager.clone();
let ah = self.app_handle.clone();
let pid = provider.id.clone();
let pname = provider.name.clone();
let at = app_type_str.to_string();
tokio::spawn(async move {
if let Err(e) = fm.try_switch(ah.as_ref(), &at, &pid, &pname).await
{
log::error!("[Failover] 切换供应商失败: {e}");
}
});
}
// 重新计算成功率
if status.total_requests > 0 {
+2
View File
@@ -131,6 +131,8 @@ impl RequestContext {
self.config.max_retries,
state.status.clone(),
state.current_providers.clone(),
state.failover_manager.clone(),
state.app_handle.clone(),
)
}
+1
View File
@@ -4,6 +4,7 @@
pub mod circuit_breaker;
pub mod error;
pub(crate) mod failover_switch;
mod forwarder;
pub mod handler_config;
pub mod handler_context;
+17 -2
View File
@@ -2,7 +2,10 @@
//!
//! 基于Axum的HTTP服务器,处理代理请求
use super::{handlers, provider_router::ProviderRouter, types::*, ProxyError};
use super::{
failover_switch::FailoverSwitchManager, handlers, provider_router::ProviderRouter, types::*,
ProxyError,
};
use crate::database::Database;
use axum::{
routing::{get, post},
@@ -25,6 +28,10 @@ pub struct ProxyState {
pub current_providers: Arc<RwLock<std::collections::HashMap<String, (String, String)>>>,
/// 共享的 ProviderRouter(持有熔断器状态,跨请求保持)
pub provider_router: Arc<ProviderRouter>,
/// AppHandle,用于发射事件和更新托盘菜单
pub app_handle: Option<tauri::AppHandle>,
/// 故障转移切换管理器
pub failover_manager: Arc<FailoverSwitchManager>,
}
/// 代理HTTP服务器
@@ -37,9 +44,15 @@ pub struct ProxyServer {
}
impl ProxyServer {
pub fn new(config: ProxyConfig, db: Arc<Database>) -> Self {
pub fn new(
config: ProxyConfig,
db: Arc<Database>,
app_handle: Option<tauri::AppHandle>,
) -> Self {
// 创建共享的 ProviderRouter(熔断器状态将跨所有请求保持)
let provider_router = Arc::new(ProviderRouter::new(db.clone()));
// 创建故障转移切换管理器
let failover_manager = Arc::new(FailoverSwitchManager::new(db.clone()));
let state = ProxyState {
db,
@@ -48,6 +61,8 @@ impl ProxyServer {
start_time: Arc::new(RwLock::new(None)),
current_providers: Arc::new(RwLock::new(std::collections::HashMap::new())),
provider_router,
app_handle,
failover_manager,
};
Self {
+14 -2
View File
@@ -17,6 +17,8 @@ use tokio::sync::RwLock;
pub struct ProxyService {
db: Arc<Database>,
server: Arc<RwLock<Option<ProxyServer>>>,
/// AppHandle,用于传递给 ProxyServer 以支持故障转移时的 UI 更新
app_handle: Arc<RwLock<Option<tauri::AppHandle>>>,
}
impl ProxyService {
@@ -24,9 +26,17 @@ impl ProxyService {
Self {
db,
server: Arc::new(RwLock::new(None)),
app_handle: Arc::new(RwLock::new(None)),
}
}
/// 设置 AppHandle(在应用初始化时调用)
pub fn set_app_handle(&self, handle: tauri::AppHandle) {
futures::executor::block_on(async {
*self.app_handle.write().await = Some(handle);
});
}
/// 启动代理服务器
pub async fn start(&self) -> Result<ProxyServerInfo, String> {
// 1. 获取配置
@@ -45,7 +55,8 @@ impl ProxyService {
}
// 4. 创建并启动服务器
let server = ProxyServer::new(config.clone(), self.db.clone());
let app_handle = self.app_handle.read().await.clone();
let server = ProxyServer::new(config.clone(), self.db.clone(), app_handle);
let info = server
.start()
.await
@@ -682,7 +693,8 @@ impl ProxyService {
.map_err(|e| format!("重启前停止代理服务器失败: {e}"))?;
}
let new_server = ProxyServer::new(new_config, self.db.clone());
let app_handle = self.app_handle.read().await.clone();
let new_server = ProxyServer::new(new_config, self.db.clone(), app_handle);
new_server
.start()
.await