Feat/provider icon color (#385)

* feat(ui): add color prop support to ProviderIcon component

* feat(health): add stream check core functionality

Add new stream-based health check module to replace model_test:
- Add stream_check command layer with single and batch provider checks
- Add stream_check DAO layer for config and log persistence
- Add stream_check service layer with retry mechanism and health status evaluation
- Add frontend HealthStatusIndicator component
- Add frontend useStreamCheck hook

This provides more comprehensive health checking capabilities.

* refactor(health): replace model_test with stream_check

Replace model_test module with stream_check across the codebase:
- Remove model_test command and service modules
- Update command registry in lib.rs to use stream_check commands
- Update module exports in commands/mod.rs and services/mod.rs
- Remove frontend useModelTest hook
- Update stream_check command implementation

This refactoring provides clearer naming and better separation of concerns.

* refactor(db): clean up unused database tables and optimize schema

Remove deprecated and unused database tables:
- Remove proxy_usage table (replaced by proxy_request_logs)
- Remove usage_daily_stats table (aggregation done on-the-fly)
- Rename model_test_logs to stream_check_logs with updated schema
- Remove related DAO methods for proxy_usage
- Update usage_stats service to use proxy_request_logs only
- Refactor usage_script to work with new schema

This simplifies the database schema and removes redundant data storage.

* refactor(ui): update frontend components for stream check

Update frontend components to use stream check API:
- Refactor ModelTestConfigPanel to use stream check config
- Update API layer to use stream_check commands
- Add HealthStatus type and StreamCheckResult interface
- Update ProviderList to use new health check integration
- Update AutoFailoverConfigPanel with stream check references
- Improve UI layout and configuration options

This completes the frontend migration from model_test to stream_check.

* feat(health): add configurable test models and reasoning effort support

Enhance stream check service with configurable test models:
- Add claude_model, codex_model, gemini_model to StreamCheckConfig
- Support reasoning effort syntax (model@level or model#level)
- Parse and apply reasoning_effort for OpenAI-compatible models
- Remove hardcoded model names from check functions
- Add unit tests for model parsing logic
- Remove obsolete model_test source files

This allows users to customize which models are used for health checks.
This commit is contained in:
YoVinchen
2025-12-11 17:20:44 +08:00
committed by GitHub
parent 038b74b844
commit 395783e22a
21 changed files with 883 additions and 1102 deletions

View File

@@ -6,13 +6,13 @@ mod env;
mod import_export;
mod mcp;
mod misc;
mod model_test;
mod plugin;
mod prompt;
mod provider;
mod proxy;
mod settings;
pub mod skill;
mod stream_check;
mod usage;
pub use config::*;
@@ -21,11 +21,11 @@ pub use env::*;
pub use import_export::*;
pub use mcp::*;
pub use misc::*;
pub use model_test::*;
pub use plugin::*;
pub use prompt::*;
pub use provider::*;
pub use proxy::*;
pub use settings::*;
pub use skill::*;
pub use stream_check::*;
pub use usage::*;

View File

@@ -1,128 +0,0 @@
//! 模型测试相关命令
use crate::app_config::AppType;
use crate::error::AppError;
use crate::services::model_test::{
ModelTestConfig, ModelTestLog, ModelTestResult, ModelTestService,
};
use crate::store::AppState;
use tauri::State;
/// Test model availability for a single provider (legacy `model_test` command).
///
/// Loads the persisted test config, resolves the provider by id for the given
/// app type, runs the test via `ModelTestService`, and best-effort persists a
/// log row. Returns the test result to the frontend.
#[tauri::command]
pub async fn test_provider_model(
state: State<'_, AppState>,
app_type: AppType,
provider_id: String,
) -> Result<ModelTestResult, AppError> {
// Load the stored test configuration (defaults when never saved).
let config = state.db.get_model_test_config()?;
// Resolve the provider; the error string is user-facing (Chinese UI text).
let providers = state.db.get_all_providers(app_type.as_str())?;
let provider = providers
.get(&provider_id)
.ok_or_else(|| AppError::Message(format!("供应商 {provider_id} 不存在")))?;
// Run the actual model test.
let result = ModelTestService::test_provider(&app_type, provider, &config).await?;
// Persist a log entry; logging failures are deliberately ignored (`let _`).
let _ = state.db.save_model_test_log(
&provider_id,
&provider.name,
app_type.as_str(),
&result.model_used,
&config.test_prompt,
&result,
);
Ok(result)
}
/// Batch-test all providers of the given app type (legacy `model_test` command).
///
/// Per-provider errors are converted into a failed `ModelTestResult` rather
/// than aborting the whole batch, so the returned vector covers every tested
/// provider. When `proxy_targets_only` is set, providers whose
/// `is_proxy_target` is unset or false are skipped.
#[tauri::command]
pub async fn test_all_providers_model(
state: State<'_, AppState>,
app_type: AppType,
proxy_targets_only: bool,
) -> Result<Vec<(String, ModelTestResult)>, AppError> {
let config = state.db.get_model_test_config()?;
let providers = state.db.get_all_providers(app_type.as_str())?;
let mut results = Vec::new();
for (id, provider) in providers {
// Skip non-proxy-target providers when only proxy targets are requested.
if proxy_targets_only && !provider.is_proxy_target.unwrap_or(false) {
continue;
}
match ModelTestService::test_provider(&app_type, &provider, &config).await {
Ok(result) => {
// Best-effort logging; a failed insert must not fail the batch.
let _ = state.db.save_model_test_log(
&id,
&provider.name,
app_type.as_str(),
&result.model_used,
&config.test_prompt,
&result,
);
results.push((id, result));
}
Err(e) => {
// Fold hard errors into a failed result so the batch continues.
// Note: unlike the success path, failures are not written to the log.
let error_result = ModelTestResult {
success: false,
message: e.to_string(),
response_time_ms: None,
http_status: None,
model_used: String::new(),
tested_at: chrono::Utc::now().timestamp(),
};
results.push((id, error_result));
}
}
}
Ok(results)
}
/// Fetch the persisted model-test configuration (defaults when never saved).
#[tauri::command]
pub fn get_model_test_config(state: State<'_, AppState>) -> Result<ModelTestConfig, AppError> {
state.db.get_model_test_config()
}
/// Persist the model-test configuration supplied by the frontend.
#[tauri::command]
pub fn save_model_test_config(
state: State<'_, AppState>,
config: ModelTestConfig,
) -> Result<(), AppError> {
state.db.save_model_test_config(&config)
}
/// Fetch model-test log rows, optionally filtered by app type and provider.
///
/// `limit` defaults to 50 most-recent entries when not supplied.
#[tauri::command]
pub fn get_model_test_logs(
state: State<'_, AppState>,
app_type: Option<String>,
provider_id: Option<String>,
limit: Option<u32>,
) -> Result<Vec<ModelTestLog>, AppError> {
state.db.get_model_test_logs(
app_type.as_deref(),
provider_id.as_deref(),
limit.unwrap_or(50),
)
}
/// Delete old test-log rows, keeping the most recent `keep_count` (default 100).
/// Returns the number of rows deleted.
#[tauri::command]
pub fn cleanup_model_test_logs(
state: State<'_, AppState>,
keep_count: Option<u32>,
) -> Result<u64, AppError> {
state.db.cleanup_model_test_logs(keep_count.unwrap_or(100))
}

View File

@@ -0,0 +1,89 @@
//! 流式健康检查命令
use crate::app_config::AppType;
use crate::error::AppError;
use crate::services::stream_check::{
HealthStatus, StreamCheckConfig, StreamCheckResult, StreamCheckService,
};
use crate::store::AppState;
use tauri::State;
/// Stream-based health check for a single provider.
///
/// Resolves the provider for the given app type, runs the retrying stream
/// check, records the outcome in the stream-check log (logging failures are
/// ignored), and returns the result to the frontend.
#[tauri::command]
pub async fn stream_check_provider(
    state: State<'_, AppState>,
    app_type: AppType,
    provider_id: String,
) -> Result<StreamCheckResult, AppError> {
    let app = app_type.as_str();
    let cfg = state.db.get_stream_check_config()?;
    let all = state.db.get_all_providers(app)?;
    // User-facing error text is intentionally kept in Chinese.
    let target = match all.get(&provider_id) {
        Some(p) => p,
        None => return Err(AppError::Message(format!("供应商 {provider_id} 不存在"))),
    };
    let outcome = StreamCheckService::check_with_retry(&app_type, target, &cfg).await?;
    // Best-effort logging: a failed insert must not fail the check itself.
    let _log_id = state
        .db
        .save_stream_check_log(&provider_id, &target.name, app, &outcome);
    Ok(outcome)
}
/// Batch stream-based health check across all providers of an app type.
///
/// Per-provider failures are folded into a `Failed` result instead of
/// aborting the batch; every checked provider is both logged and returned.
/// When `proxy_targets_only` is set, non-proxy-target providers are skipped.
#[tauri::command]
pub async fn stream_check_all_providers(
state: State<'_, AppState>,
app_type: AppType,
proxy_targets_only: bool,
) -> Result<Vec<(String, StreamCheckResult)>, AppError> {
let config = state.db.get_stream_check_config()?;
let providers = state.db.get_all_providers(app_type.as_str())?;
let mut results = Vec::new();
for (id, provider) in providers {
if proxy_targets_only && !provider.is_proxy_target.unwrap_or(false) {
continue;
}
// Convert hard errors into a synthetic failed result so the loop continues.
let result = StreamCheckService::check_with_retry(&app_type, &provider, &config)
.await
.unwrap_or_else(|e| StreamCheckResult {
status: HealthStatus::Failed,
success: false,
message: e.to_string(),
response_time_ms: None,
http_status: None,
model_used: String::new(),
tested_at: chrono::Utc::now().timestamp(),
retry_count: 0,
});
// Best-effort logging; insert failures are ignored.
let _ = state
.db
.save_stream_check_log(&id, &provider.name, app_type.as_str(), &result);
results.push((id, result));
}
Ok(results)
}
/// Fetch the persisted stream-check configuration (defaults when never saved).
#[tauri::command]
pub fn get_stream_check_config(state: State<'_, AppState>) -> Result<StreamCheckConfig, AppError> {
state.db.get_stream_check_config()
}
/// Persist the stream-check configuration supplied by the frontend.
#[tauri::command]
pub fn save_stream_check_config(
state: State<'_, AppState>,
config: StreamCheckConfig,
) -> Result<(), AppError> {
state.db.save_stream_check_config(&config)
}

View File

@@ -8,5 +8,6 @@ pub mod providers;
pub mod proxy;
pub mod settings;
pub mod skills;
pub mod stream_check;
// 所有 DAO 方法都通过 Database impl 提供,无需单独导出

View File

@@ -210,83 +210,6 @@ impl Database {
Ok(())
}
// ==================== Proxy Usage (可选) ====================
/// Insert one proxy usage record into the `proxy_usage` table.
///
/// NOTE(review): declared `async` but contains no `.await` and holds a
/// synchronous connection lock for the whole call — presumably async only to
/// match the caller's signature; confirm before relying on non-blocking use.
#[allow(dead_code)]
pub async fn record_proxy_usage(&self, record: &ProxyUsageRecord) -> Result<(), AppError> {
let conn = lock_conn!(self.conn);
conn.execute(
"INSERT INTO proxy_usage
(provider_id, app_type, endpoint, request_tokens, response_tokens,
status_code, latency_ms, error, timestamp)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
rusqlite::params![
&record.provider_id,
&record.app_type,
&record.endpoint,
record.request_tokens,
record.response_tokens,
// Widened to i64 because SQLite stores integers as 64-bit.
record.status_code as i64,
record.latency_ms as i64,
&record.error,
&record.timestamp,
],
)
.map_err(|e| AppError::Database(e.to_string()))?;
Ok(())
}
/// Query the most recent `limit` usage records for one provider/app pair,
/// ordered by timestamp descending.
///
/// NOTE(review): `async` with no `.await`; executes synchronously under the
/// connection lock.
#[allow(dead_code)]
pub async fn get_recent_usage(
&self,
provider_id: &str,
app_type: &str,
limit: usize,
) -> Result<Vec<ProxyUsageRecord>, AppError> {
let conn = lock_conn!(self.conn);
let mut stmt = conn
.prepare(
"SELECT provider_id, app_type, endpoint, request_tokens, response_tokens,
status_code, latency_ms, error, timestamp
FROM proxy_usage
WHERE provider_id = ?1 AND app_type = ?2
ORDER BY timestamp DESC
LIMIT ?3",
)
.map_err(|e| AppError::Database(e.to_string()))?;
let rows = stmt
.query_map(
rusqlite::params![provider_id, app_type, limit as i64],
|row| {
Ok(ProxyUsageRecord {
provider_id: row.get(0)?,
app_type: row.get(1)?,
endpoint: row.get(2)?,
request_tokens: row.get(3)?,
response_tokens: row.get(4)?,
// SQLite integers come back as i64; narrow to the struct types.
status_code: row.get::<_, i64>(5)? as u16,
latency_ms: row.get::<_, i64>(6)? as u64,
error: row.get(7)?,
timestamp: row.get(8)?,
})
},
)
.map_err(|e| AppError::Database(e.to_string()))?;
let mut records = Vec::new();
for row in rows {
records.push(row.map_err(|e| AppError::Database(e.to_string()))?);
}
Ok(records)
}
// ==================== Circuit Breaker Config ====================
/// 获取熔断器配置

View File

@@ -0,0 +1,57 @@
//! 流式健康检查日志 DAO
use crate::database::{lock_conn, Database};
use crate::error::AppError;
use crate::services::stream_check::{StreamCheckConfig, StreamCheckResult};
impl Database {
/// Insert one stream-check result into `stream_check_logs` and return the
/// rowid of the inserted row.
pub fn save_stream_check_log(
&self,
provider_id: &str,
provider_name: &str,
app_type: &str,
result: &StreamCheckResult,
) -> Result<i64, AppError> {
let conn = lock_conn!(self.conn);
conn.execute(
"INSERT INTO stream_check_logs
(provider_id, provider_name, app_type, status, success, message,
response_time_ms, http_status, model_used, retry_count, tested_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
rusqlite::params![
provider_id,
provider_name,
app_type,
// Lowercased Debug name matches HealthStatus's serde
// `rename_all = "lowercase"` representation ("operational" etc.).
format!("{:?}", result.status).to_lowercase(),
result.success,
result.message,
result.response_time_ms.map(|t| t as i64),
result.http_status.map(|s| s as i64),
result.model_used,
result.retry_count as i64,
result.tested_at,
],
)
.map_err(|e| AppError::Database(e.to_string()))?;
Ok(conn.last_insert_rowid())
}
/// Load the stream-check config from the settings store; fall back to
/// `StreamCheckConfig::default()` when no value has been saved yet.
pub fn get_stream_check_config(&self) -> Result<StreamCheckConfig, AppError> {
match self.get_setting("stream_check_config")? {
Some(json) => serde_json::from_str(&json)
.map_err(|e| AppError::Message(format!("解析配置失败: {e}"))),
None => Ok(StreamCheckConfig::default()),
}
}
/// Serialize the stream-check config to JSON and store it under the
/// `stream_check_config` settings key.
pub fn save_stream_check_config(&self, config: &StreamCheckConfig) -> Result<(), AppError> {
let json = serde_json::to_string(config)
.map_err(|e| AppError::Message(format!("序列化配置失败: {e}")))?;
self.set_setting("stream_check_config", &json)
}
}

View File

@@ -173,40 +173,7 @@ impl Database {
)
.map_err(|e| AppError::Database(e.to_string()))?;
// 10. Proxy Usage 表 (代理使用统计,可选)
conn.execute(
"CREATE TABLE IF NOT EXISTS proxy_usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
provider_id TEXT NOT NULL,
app_type TEXT NOT NULL,
endpoint TEXT NOT NULL,
request_tokens INTEGER,
response_tokens INTEGER,
status_code INTEGER NOT NULL,
latency_ms INTEGER NOT NULL,
error TEXT,
timestamp TEXT NOT NULL
)",
[],
)
.map_err(|e| AppError::Database(e.to_string()))?;
// 为 proxy_usage 创建索引
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_proxy_usage_timestamp
ON proxy_usage(timestamp)",
[],
)
.map_err(|e| AppError::Database(e.to_string()))?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_proxy_usage_provider
ON proxy_usage(provider_id, app_type)",
[],
)
.map_err(|e| AppError::Database(e.to_string()))?;
// 11. Proxy Request Logs 表 (详细请求日志)
// 10. Proxy Request Logs 表 (详细请求日志)
conn.execute(
"CREATE TABLE IF NOT EXISTS proxy_request_logs (
request_id TEXT PRIMARY KEY,
@@ -272,7 +239,7 @@ impl Database {
)
.map_err(|e| AppError::Database(e.to_string()))?;
// 12. Model Pricing 表 (模型定价)
// 11. Model Pricing 表 (模型定价)
conn.execute(
"CREATE TABLE IF NOT EXISTS model_pricing (
model_id TEXT PRIMARY KEY,
@@ -286,38 +253,20 @@ impl Database {
)
.map_err(|e| AppError::Database(e.to_string()))?;
// 13. Usage Daily Stats 表 (每日聚合统计)
// 12. Stream Check Logs 表 (流式健康检查日志)
conn.execute(
"CREATE TABLE IF NOT EXISTS usage_daily_stats (
date TEXT NOT NULL,
provider_id TEXT NOT NULL,
app_type TEXT NOT NULL,
model TEXT NOT NULL,
request_count INTEGER NOT NULL DEFAULT 0,
total_input_tokens INTEGER NOT NULL DEFAULT 0,
total_output_tokens INTEGER NOT NULL DEFAULT 0,
total_cost_usd TEXT NOT NULL DEFAULT '0',
success_count INTEGER NOT NULL DEFAULT 0,
error_count INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (date, provider_id, app_type, model)
)",
[],
)
.map_err(|e| AppError::Database(e.to_string()))?;
// 14. Model Test Logs 表 (模型测试日志,独立于代理使用统计)
conn.execute(
"CREATE TABLE IF NOT EXISTS model_test_logs (
"CREATE TABLE IF NOT EXISTS stream_check_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
provider_id TEXT NOT NULL,
provider_name TEXT NOT NULL,
app_type TEXT NOT NULL,
model TEXT NOT NULL,
prompt TEXT NOT NULL,
status TEXT NOT NULL,
success INTEGER NOT NULL,
message TEXT NOT NULL,
response_time_ms INTEGER,
http_status INTEGER,
model_used TEXT,
retry_count INTEGER DEFAULT 0,
tested_at INTEGER NOT NULL
)",
[],
@@ -325,20 +274,13 @@ impl Database {
.map_err(|e| AppError::Database(e.to_string()))?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_model_test_logs_provider
ON model_test_logs(provider_id, app_type)",
"CREATE INDEX IF NOT EXISTS idx_stream_check_logs_provider
ON stream_check_logs(app_type, provider_id, tested_at DESC)",
[],
)
.map_err(|e| AppError::Database(e.to_string()))?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_model_test_logs_tested_at
ON model_test_logs(tested_at DESC)",
[],
)
.map_err(|e| AppError::Database(e.to_string()))?;
// 15. Circuit Breaker Config 表 (熔断器配置)
// 13. Circuit Breaker Config 表 (熔断器配置)
conn.execute(
"CREATE TABLE IF NOT EXISTS circuit_breaker_config (
id INTEGER PRIMARY KEY CHECK (id = 1),
@@ -574,24 +516,6 @@ impl Database {
[],
)?;
// usage_daily_stats 表
conn.execute(
"CREATE TABLE IF NOT EXISTS usage_daily_stats (
date TEXT NOT NULL,
provider_id TEXT NOT NULL,
app_type TEXT NOT NULL,
model TEXT NOT NULL,
request_count INTEGER NOT NULL DEFAULT 0,
total_input_tokens INTEGER NOT NULL DEFAULT 0,
total_output_tokens INTEGER NOT NULL DEFAULT 0,
total_cost_usd TEXT NOT NULL DEFAULT '0',
success_count INTEGER NOT NULL DEFAULT 0,
error_count INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (date, provider_id, app_type, model)
)",
[],
)?;
// 清空并重新插入模型定价
conn.execute("DELETE FROM model_pricing", [])
.map_err(|e| AppError::Database(format!("清空模型定价失败: {e}")))?;

View File

@@ -674,13 +674,11 @@ pub fn run() {
commands::update_model_pricing,
commands::delete_model_pricing,
commands::check_provider_limits,
// Model testing
commands::test_provider_model,
commands::test_all_providers_model,
commands::get_model_test_config,
commands::save_model_test_config,
commands::get_model_test_logs,
commands::cleanup_model_test_logs,
// Stream health check
commands::stream_check_provider,
commands::stream_check_all_providers,
commands::get_stream_check_config,
commands::save_stream_check_config,
commands::get_tool_versions,
]);

View File

@@ -108,20 +108,6 @@ pub struct ProviderHealth {
pub updated_at: String,
}
/// One proxy usage statistics record (row of the removed `proxy_usage` table).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProxyUsageRecord {
pub provider_id: String,
pub app_type: String,
/// Endpoint path the proxied request hit.
pub endpoint: String,
/// Token counts may be absent when the upstream response lacked usage info.
pub request_tokens: Option<i32>,
pub response_tokens: Option<i32>,
pub status_code: u16,
pub latency_ms: u64,
/// Error description, if the request failed.
pub error: Option<String>,
/// Stored as text — format defined by the writer; TODO confirm (RFC 3339?).
pub timestamp: String,
}
/// Live 配置备份记录
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiveBackup {

View File

@@ -2,18 +2,16 @@ pub mod config;
pub mod env_checker;
pub mod env_manager;
pub mod mcp;
pub mod model_test;
pub mod prompt;
pub mod provider;
pub mod proxy;
pub mod skill;
pub mod speedtest;
pub mod stream_check;
pub mod usage_stats;
pub use config::ConfigService;
pub use mcp::McpService;
#[allow(unused_imports)]
pub use model_test::{ModelTestConfig, ModelTestLog, ModelTestResult, ModelTestService};
pub use prompt::PromptService;
pub use provider::{ProviderService, ProviderSortUpdate};
pub use proxy::ProxyService;

View File

@@ -1,510 +0,0 @@
//! 模型测试服务
//!
//! 提供独立的模型可用性测试功能,复用现有 Provider 适配器逻辑,
//! 但不影响正常代理数据流程。测试结果记录到独立的日志表。
use crate::app_config::AppType;
use crate::database::Database;
use crate::error::AppError;
use crate::provider::Provider;
use crate::proxy::providers::{get_adapter, AuthInfo, ProviderAdapter};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::time::{Duration, Instant};
/// Model-test configuration (legacy `model_test` module).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ModelTestConfig {
/// Default test model for Claude.
pub claude_model: String,
/// Default test model for Codex/OpenAI.
pub codex_model: String,
/// Default test model for Gemini.
pub gemini_model: String,
/// Prompt sent in every test request.
pub test_prompt: String,
/// Request timeout in seconds.
pub timeout_secs: u64,
}
impl Default for ModelTestConfig {
fn default() -> Self {
Self {
claude_model: "claude-haiku-4-5-20251001".to_string(),
codex_model: "gpt-5.1-low".to_string(),
gemini_model: "gemini-3-pro-low".to_string(),
test_prompt: "ping".to_string(),
timeout_secs: 15,
}
}
}
/// Result of a single model test, returned to the frontend.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ModelTestResult {
pub success: bool,
pub message: String,
pub response_time_ms: Option<u64>,
pub http_status: Option<u16>,
/// Model name used for the test; empty when the test errored before dispatch.
pub model_used: String,
/// Unix timestamp (seconds) when the test ran.
pub tested_at: i64,
}
/// One persisted model-test log row.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ModelTestLog {
pub id: i64,
pub provider_id: String,
pub provider_name: String,
pub app_type: String,
pub model: String,
pub prompt: String,
pub success: bool,
pub message: String,
pub response_time_ms: Option<i64>,
pub http_status: Option<i64>,
pub tested_at: i64,
}
/// Stateless model-test service; all methods are associated functions.
pub struct ModelTestService;
impl ModelTestService {
/// Test model availability for one provider.
///
/// Builds an independent HTTP client, dispatches to the per-app-type test
/// helper, and always returns `Ok` with a success/failure `ModelTestResult` —
/// an `Err` is only produced for setup failures (e.g. client construction).
pub async fn test_provider(
app_type: &AppType,
provider: &Provider,
config: &ModelTestConfig,
) -> Result<ModelTestResult, AppError> {
let start = Instant::now();
let adapter = get_adapter(app_type);
// Build a dedicated HTTP client, independent of the proxy service.
let client = Client::builder()
.timeout(Duration::from_secs(config.timeout_secs))
.build()
.map_err(|e| AppError::Message(format!("创建 HTTP 客户端失败: {e}")))?;
// Pick the configured test model for this app type.
let model = match app_type {
AppType::Claude => &config.claude_model,
AppType::Codex => &config.codex_model,
AppType::Gemini => &config.gemini_model,
};
let result = match app_type {
AppType::Claude => {
Self::test_claude(
&client,
provider,
adapter.as_ref(),
model,
&config.test_prompt,
)
.await
}
AppType::Codex => {
Self::test_codex(
&client,
provider,
adapter.as_ref(),
model,
&config.test_prompt,
)
.await
}
AppType::Gemini => {
Self::test_gemini(
&client,
provider,
adapter.as_ref(),
model,
&config.test_prompt,
)
.await
}
};
// Measured wall-clock time covers the whole request, success or failure.
let response_time = start.elapsed().as_millis() as u64;
let tested_at = chrono::Utc::now().timestamp();
match result {
Ok((status, msg)) => Ok(ModelTestResult {
success: true,
message: msg,
response_time_ms: Some(response_time),
http_status: Some(status),
model_used: model.clone(),
tested_at,
}),
// Test-level failures are reported as a failed result, not an Err.
Err(e) => Ok(ModelTestResult {
success: false,
message: e.to_string(),
response_time_ms: Some(response_time),
http_status: None,
model_used: model.clone(),
tested_at,
}),
}
}
/// Test a Claude provider via the Anthropic Messages API.
///
/// Returns `(http_status, message)` on success, or an error whose text is
/// shown to the user (Chinese UI strings).
async fn test_claude(
client: &Client,
provider: &Provider,
adapter: &dyn ProviderAdapter,
model: &str,
prompt: &str,
) -> Result<(u16, String), AppError> {
let base_url = adapter
.extract_base_url(provider)
.map_err(|e| AppError::Message(format!("提取 base_url 失败: {e}")))?;
let auth = adapter
.extract_auth(provider)
.ok_or_else(|| AppError::Message("未找到 API Key".to_string()))?;
// Join the URL carefully to avoid a duplicated `/v1` segment.
let base = base_url.trim_end_matches('/');
let url = if base.ends_with("/v1") {
format!("{base}/messages")
} else {
format!("{base}/v1/messages")
};
// Minimal request: one user message, max_tokens = 1 keeps it cheap.
let body = json!({
"model": model,
"max_tokens": 1,
"messages": [{
"role": "user",
"content": prompt
}]
});
let mut request = client.post(&url).json(&body);
request = Self::add_claude_auth(request, &auth);
// Map transport errors to user-facing categories (timeout / connect / other).
let response = request.send().await.map_err(|e| {
if e.is_timeout() {
AppError::Message("请求超时".to_string())
} else if e.is_connect() {
AppError::Message(format!("连接失败: {e}"))
} else {
AppError::Message(e.to_string())
}
})?;
let status = response.status().as_u16();
if response.status().is_success() {
// Read text first, then try JSON — tolerates streaming-style bodies.
let text = response.text().await.unwrap_or_default();
if let Ok(data) = serde_json::from_str::<Value>(&text) {
if data.get("type").is_some()
|| data.get("content").is_some()
|| data.get("id").is_some()
{
return Ok((status, "模型测试成功".to_string()));
}
}
// Any 2xx counts as success even if the body is not parseable JSON.
Ok((status, "模型测试成功".to_string()))
} else {
let error_text = response.text().await.unwrap_or_default();
Err(AppError::Message(format!("HTTP {status}: {error_text}")))
}
}
/// Test a Codex provider via the OpenAI Chat Completions API.
///
/// Returns `(http_status, message)` on success; any 2xx response is treated
/// as a pass even when the body cannot be parsed as JSON.
async fn test_codex(
client: &Client,
provider: &Provider,
adapter: &dyn ProviderAdapter,
model: &str,
prompt: &str,
) -> Result<(u16, String), AppError> {
let base_url = adapter
.extract_base_url(provider)
.map_err(|e| AppError::Message(format!("提取 base_url 失败: {e}")))?;
let auth = adapter
.extract_auth(provider)
.ok_or_else(|| AppError::Message("未找到 API Key".to_string()))?;
// Join the URL carefully to avoid a duplicated `/v1` segment.
let base = base_url.trim_end_matches('/');
let url = if base.ends_with("/v1") {
format!("{base}/chat/completions")
} else {
format!("{base}/v1/chat/completions")
};
// Minimal non-streaming request; max_tokens = 1 keeps it cheap.
let body = json!({
"model": model,
"messages": [{
"role": "user",
"content": prompt
}],
"max_tokens": 1,
"stream": false
});
let request = client
.post(&url)
.header("Authorization", format!("Bearer {}", auth.api_key))
.header("Content-Type", "application/json")
.json(&body);
// Map transport errors to user-facing categories (timeout / connect / other).
let response = request.send().await.map_err(|e| {
if e.is_timeout() {
AppError::Message("请求超时".to_string())
} else if e.is_connect() {
AppError::Message(format!("连接失败: {e}"))
} else {
AppError::Message(e.to_string())
}
})?;
let status = response.status().as_u16();
if response.status().is_success() {
// Read text first, then try JSON.
let text = response.text().await.unwrap_or_default();
if let Ok(data) = serde_json::from_str::<Value>(&text) {
if data.get("choices").is_some() || data.get("id").is_some() {
return Ok((status, "模型测试成功".to_string()));
}
}
// Any 2xx counts as success even if the body is not parseable JSON.
Ok((status, "模型测试成功".to_string()))
} else {
let error_text = response.text().await.unwrap_or_default();
Err(AppError::Message(format!("HTTP {status}: {error_text}")))
}
}
/// Test a Gemini provider via the Google Generative AI API.
///
/// Unlike the Claude/Codex helpers, a 2xx response whose body lacks a
/// `candidates` field is treated as a failure ("响应格式异常").
/// NOTE(review): the API key is passed as a URL query parameter, so it may
/// appear in logs of intermediate proxies — confirm this is acceptable.
async fn test_gemini(
client: &Client,
provider: &Provider,
adapter: &dyn ProviderAdapter,
model: &str,
prompt: &str,
) -> Result<(u16, String), AppError> {
let base_url = adapter
.extract_base_url(provider)
.map_err(|e| AppError::Message(format!("提取 base_url 失败: {e}")))?;
let auth = adapter
.extract_auth(provider)
.ok_or_else(|| AppError::Message("未找到 API Key".to_string()))?;
let url = format!(
"{}/v1beta/models/{}:generateContent?key={}",
base_url.trim_end_matches('/'),
model,
auth.api_key
);
// Minimal request; maxOutputTokens = 1 keeps it cheap.
let body = json!({
"contents": [{
"parts": [{
"text": prompt
}]
}],
"generationConfig": {
"maxOutputTokens": 1
}
});
let request = client
.post(&url)
.header("Content-Type", "application/json")
.json(&body);
// Map transport errors to user-facing categories (timeout / connect / other).
let response = request.send().await.map_err(|e| {
if e.is_timeout() {
AppError::Message("请求超时".to_string())
} else if e.is_connect() {
AppError::Message(format!("连接失败: {e}"))
} else {
AppError::Message(e.to_string())
}
})?;
let status = response.status().as_u16();
if response.status().is_success() {
let data: Value = response
.json()
.await
.map_err(|e| AppError::Message(format!("解析响应失败: {e}")))?;
if data.get("candidates").is_some() {
Ok((status, "模型测试成功".to_string()))
} else {
Err(AppError::Message("响应格式异常".to_string()))
}
} else {
let error_text = response.text().await.unwrap_or_default();
Err(AppError::Message(format!("HTTP {status}: {error_text}")))
}
}
/// Attach the Anthropic authentication and content headers to a request
/// builder: `x-api-key`, the pinned `anthropic-version`, and JSON content type.
fn add_claude_auth(
    request: reqwest::RequestBuilder,
    auth: &AuthInfo,
) -> reqwest::RequestBuilder {
    let with_key = request.header("x-api-key", &auth.api_key);
    let with_version = with_key.header("anthropic-version", "2023-06-01");
    with_version.header("Content-Type", "application/json")
}
}
// ===== 数据库操作 =====
impl Database {
/// Insert one model-test result into `model_test_logs` and return the rowid
/// of the inserted row.
pub fn save_model_test_log(
&self,
provider_id: &str,
provider_name: &str,
app_type: &str,
model: &str,
prompt: &str,
result: &ModelTestResult,
) -> Result<i64, AppError> {
let conn = self
.conn
.lock()
.map_err(|e| AppError::Database(format!("获取数据库连接失败: {e}")))?;
conn.execute(
"INSERT INTO model_test_logs
(provider_id, provider_name, app_type, model, prompt, success, message, response_time_ms, http_status, tested_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
rusqlite::params![
provider_id,
provider_name,
app_type,
model,
prompt,
result.success,
result.message,
result.response_time_ms.map(|t| t as i64),
result.http_status.map(|s| s as i64),
result.tested_at,
],
)
.map_err(|e| AppError::Database(e.to_string()))?;
Ok(conn.last_insert_rowid())
}
/// Fetch model-test log rows, newest first, optionally filtered by app type
/// and/or provider id, capped at `limit` rows.
pub fn get_model_test_logs(
&self,
app_type: Option<&str>,
provider_id: Option<&str>,
limit: u32,
) -> Result<Vec<ModelTestLog>, AppError> {
let conn = self
.conn
.lock()
.map_err(|e| AppError::Database(format!("获取数据库连接失败: {e}")))?;
// Build the WHERE clause dynamically; only placeholders are appended,
// so filter values are still bound as parameters (no SQL injection).
let mut sql = String::from(
"SELECT id, provider_id, provider_name, app_type, model, prompt, success, message, response_time_ms, http_status, tested_at
FROM model_test_logs WHERE 1=1"
);
let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
if let Some(at) = app_type {
sql.push_str(" AND app_type = ?");
params.push(Box::new(at.to_string()));
}
if let Some(pid) = provider_id {
sql.push_str(" AND provider_id = ?");
params.push(Box::new(pid.to_string()));
}
sql.push_str(" ORDER BY tested_at DESC LIMIT ?");
params.push(Box::new(limit as i64));
let params_refs: Vec<&dyn rusqlite::ToSql> = params.iter().map(|p| p.as_ref()).collect();
let mut stmt = conn
.prepare(&sql)
.map_err(|e| AppError::Database(e.to_string()))?;
let logs = stmt
.query_map(params_refs.as_slice(), |row| {
Ok(ModelTestLog {
id: row.get(0)?,
provider_id: row.get(1)?,
provider_name: row.get(2)?,
app_type: row.get(3)?,
model: row.get(4)?,
prompt: row.get(5)?,
success: row.get(6)?,
message: row.get(7)?,
response_time_ms: row.get(8)?,
http_status: row.get(9)?,
tested_at: row.get(10)?,
})
})
.map_err(|e| AppError::Database(e.to_string()))?
.collect::<Result<Vec<_>, _>>()
.map_err(|e| AppError::Database(e.to_string()))?;
Ok(logs)
}
/// Load the model-test config from the settings store; fall back to
/// `ModelTestConfig::default()` when no value has been saved yet.
pub fn get_model_test_config(&self) -> Result<ModelTestConfig, AppError> {
match self.get_setting("model_test_config")? {
Some(json) => serde_json::from_str(&json)
.map_err(|e| AppError::Message(format!("解析模型测试配置失败: {e}"))),
None => Ok(ModelTestConfig::default()),
}
}
/// Serialize the model-test config to JSON and store it under the
/// `model_test_config` settings key.
pub fn save_model_test_config(&self, config: &ModelTestConfig) -> Result<(), AppError> {
let json = serde_json::to_string(config)
.map_err(|e| AppError::Message(format!("序列化模型测试配置失败: {e}")))?;
self.set_setting("model_test_config", &json)
}
/// Delete old test logs, keeping only the `keep_count` most recent rows
/// (by `tested_at`). Returns the number of rows deleted.
pub fn cleanup_model_test_logs(&self, keep_count: u32) -> Result<u64, AppError> {
let conn = self
.conn
.lock()
.map_err(|e| AppError::Database(format!("获取数据库连接失败: {e}")))?;
let deleted = conn
.execute(
"DELETE FROM model_test_logs WHERE id NOT IN (
SELECT id FROM model_test_logs ORDER BY tested_at DESC LIMIT ?
)",
rusqlite::params![keep_count as i64],
)
.map_err(|e| AppError::Database(e.to_string()))?;
Ok(deleted as u64)
}
}

View File

@@ -0,0 +1,436 @@
//! 流式健康检查服务
//!
//! 使用流式 API 进行快速健康检查,只需接收首个 chunk 即判定成功。
use futures::StreamExt;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::time::{Duration, Instant};
use crate::app_config::AppType;
use crate::error::AppError;
use crate::provider::Provider;
use crate::proxy::providers::{get_adapter, AuthInfo};
/// Provider health status; serialized lowercase ("operational"/"degraded"/"failed").
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum HealthStatus {
Operational,
Degraded,
Failed,
}
/// Stream-check configuration, persisted as JSON in the settings store.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamCheckConfig {
/// Request timeout in seconds.
pub timeout_secs: u64,
/// Extra attempts after the first (0 = no retry).
pub max_retries: u32,
/// Responses slower than this (ms) are reported as Degraded.
pub degraded_threshold_ms: u64,
/// Test model for Claude.
pub claude_model: String,
/// Test model for Codex; supports reasoning-effort suffix (e.g. "@low").
pub codex_model: String,
/// Test model for Gemini.
pub gemini_model: String,
}
impl Default for StreamCheckConfig {
fn default() -> Self {
Self {
timeout_secs: 45,
max_retries: 2,
degraded_threshold_ms: 6000,
claude_model: "claude-haiku-4-5-20251001".to_string(),
codex_model: "gpt-5.1-codex@low".to_string(),
gemini_model: "gemini-3-pro-preview".to_string(),
}
}
}
/// Result of one stream check, returned to the frontend and logged to the DB.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamCheckResult {
pub status: HealthStatus,
pub success: bool,
pub message: String,
pub response_time_ms: Option<u64>,
pub http_status: Option<u16>,
/// Model used; empty when the check failed before dispatch.
pub model_used: String,
/// Unix timestamp (seconds) when the check ran.
pub tested_at: i64,
/// Number of retries consumed before this result was produced.
pub retry_count: u32,
}
/// Stateless stream-check service; all methods are associated functions.
pub struct StreamCheckService;
impl StreamCheckService {
/// Run the stream health check with up to `max_retries` additional attempts.
///
/// Retries only when `should_retry` classifies the failure as transient.
/// NOTE(review): `last_result` is only recorded on the retryable `Ok`
/// (non-success) path, never on the `Err` path; since every branch either
/// returns or continues, the trailing fallback below is effectively a
/// defensive default — confirm before restructuring.
pub async fn check_with_retry(
app_type: &AppType,
provider: &Provider,
config: &StreamCheckConfig,
) -> Result<StreamCheckResult, AppError> {
let mut last_result = None;
for attempt in 0..=config.max_retries {
let result = Self::check_once(app_type, provider, config).await;
match &result {
// Success: stamp how many retries were consumed and return.
Ok(r) if r.success => {
return Ok(StreamCheckResult {
retry_count: attempt,
..r.clone()
});
}
Ok(r) => {
// Failed but not an error: retry only transient failures.
if Self::should_retry(&r.message) && attempt < config.max_retries {
last_result = Some(r.clone());
continue;
}
return Ok(StreamCheckResult {
retry_count: attempt,
..r.clone()
});
}
Err(e) => {
if Self::should_retry(&e.to_string()) && attempt < config.max_retries {
continue;
}
return Err(AppError::Message(e.to_string()));
}
}
}
// Defensive fallback: synthesize a Failed result if the loop ever exits.
Ok(last_result.unwrap_or_else(|| StreamCheckResult {
status: HealthStatus::Failed,
success: false,
message: "检查失败".to_string(),
response_time_ms: None,
http_status: None,
model_used: String::new(),
tested_at: chrono::Utc::now().timestamp(),
retry_count: config.max_retries,
}))
}
/// Run one stream check attempt against the provider.
///
/// Extracts base URL and credentials via the app-type adapter, dispatches to
/// the per-app streaming helper, and converts the outcome into a
/// `StreamCheckResult` (errors become a Failed result, not an `Err`).
async fn check_once(
app_type: &AppType,
provider: &Provider,
config: &StreamCheckConfig,
) -> Result<StreamCheckResult, AppError> {
let start = Instant::now();
let adapter = get_adapter(app_type);
let base_url = adapter
.extract_base_url(provider)
.map_err(|e| AppError::Message(format!("提取 base_url 失败: {e}")))?;
let auth = adapter
.extract_auth(provider)
.ok_or_else(|| AppError::Message("未找到 API Key".to_string()))?;
// Dedicated client per attempt, independent of the proxy service.
let client = Client::builder()
.timeout(Duration::from_secs(config.timeout_secs))
.user_agent("cc-switch/1.0")
.build()
.map_err(|e| AppError::Message(format!("创建客户端失败: {e}")))?;
let result = match app_type {
AppType::Claude => {
Self::check_claude_stream(&client, &base_url, &auth, &config.claude_model).await
}
AppType::Codex => {
Self::check_codex_stream(&client, &base_url, &auth, &config.codex_model).await
}
AppType::Gemini => {
Self::check_gemini_stream(&client, &base_url, &auth, &config.gemini_model).await
}
};
// Wall-clock time to first chunk (or to failure).
let response_time = start.elapsed().as_millis() as u64;
let tested_at = chrono::Utc::now().timestamp();
match result {
Ok((status_code, model)) => {
// Latency above the configured threshold downgrades to Degraded.
let health_status =
Self::determine_status(response_time, config.degraded_threshold_ms);
Ok(StreamCheckResult {
status: health_status,
success: true,
message: "检查成功".to_string(),
response_time_ms: Some(response_time),
http_status: Some(status_code),
model_used: model,
tested_at,
retry_count: 0,
})
}
Err(e) => Ok(StreamCheckResult {
status: HealthStatus::Failed,
success: false,
message: e.to_string(),
response_time_ms: Some(response_time),
http_status: None,
model_used: String::new(),
tested_at,
retry_count: 0,
}),
}
}
/// Claude 流式检查
async fn check_claude_stream(
client: &Client,
base_url: &str,
auth: &AuthInfo,
model: &str,
) -> Result<(u16, String), AppError> {
let base = base_url.trim_end_matches('/');
let url = if base.ends_with("/v1") {
format!("{base}/messages")
} else {
format!("{base}/v1/messages")
};
let body = json!({
"model": model,
"max_tokens": 1,
"messages": [{ "role": "user", "content": "hi" }],
"stream": true
});
let response = client
.post(&url)
.header("x-api-key", &auth.api_key)
.header("anthropic-version", "2023-06-01")
.header("Content-Type", "application/json")
.json(&body)
.send()
.await
.map_err(Self::map_request_error)?;
let status = response.status().as_u16();
if !response.status().is_success() {
let error_text = response.text().await.unwrap_or_default();
return Err(AppError::Message(format!("HTTP {status}: {error_text}")));
}
// 流式读取:只需首个 chunk
let mut stream = response.bytes_stream();
if let Some(chunk) = stream.next().await {
match chunk {
Ok(_) => Ok((status, model.to_string())),
Err(e) => Err(AppError::Message(format!("读取流失败: {e}"))),
}
} else {
Err(AppError::Message("未收到响应数据".to_string()))
}
}
/// Codex 流式检查
async fn check_codex_stream(
client: &Client,
base_url: &str,
auth: &AuthInfo,
model: &str,
) -> Result<(u16, String), AppError> {
let base = base_url.trim_end_matches('/');
let url = if base.ends_with("/v1") {
format!("{base}/chat/completions")
} else {
format!("{base}/v1/chat/completions")
};
// 解析模型名和推理等级 (支持 model@level 或 model#level 格式)
let (actual_model, reasoning_effort) = Self::parse_model_with_effort(model);
let mut body = json!({
"model": actual_model,
"messages": [
{ "role": "system", "content": "" },
{ "role": "assistant", "content": "" },
{ "role": "user", "content": "hi" }
],
"max_tokens": 1,
"temperature": 0,
"stream": true
});
// 如果是推理模型,添加 reasoning_effort
if let Some(effort) = reasoning_effort {
body["reasoning_effort"] = json!(effort);
}
let response = client
.post(&url)
.header("Authorization", format!("Bearer {}", auth.api_key))
.header("Content-Type", "application/json")
.json(&body)
.send()
.await
.map_err(Self::map_request_error)?;
let status = response.status().as_u16();
if !response.status().is_success() {
let error_text = response.text().await.unwrap_or_default();
return Err(AppError::Message(format!("HTTP {status}: {error_text}")));
}
let mut stream = response.bytes_stream();
if let Some(chunk) = stream.next().await {
match chunk {
Ok(_) => Ok((status, model.to_string())),
Err(e) => Err(AppError::Message(format!("读取流失败: {e}"))),
}
} else {
Err(AppError::Message("未收到响应数据".to_string()))
}
}
/// Gemini 流式检查
async fn check_gemini_stream(
client: &Client,
base_url: &str,
auth: &AuthInfo,
model: &str,
) -> Result<(u16, String), AppError> {
let base = base_url.trim_end_matches('/');
let url = format!("{base}/v1/chat/completions");
let body = json!({
"model": model,
"messages": [{ "role": "user", "content": "hi" }],
"max_tokens": 1,
"temperature": 0,
"stream": true
});
let response = client
.post(&url)
.header("Authorization", format!("Bearer {}", auth.api_key))
.header("Content-Type", "application/json")
.json(&body)
.send()
.await
.map_err(Self::map_request_error)?;
let status = response.status().as_u16();
if !response.status().is_success() {
let error_text = response.text().await.unwrap_or_default();
return Err(AppError::Message(format!("HTTP {status}: {error_text}")));
}
let mut stream = response.bytes_stream();
if let Some(chunk) = stream.next().await {
match chunk {
Ok(_) => Ok((status, model.to_string())),
Err(e) => Err(AppError::Message(format!("读取流失败: {e}"))),
}
} else {
Err(AppError::Message("未收到响应数据".to_string()))
}
}
fn determine_status(latency_ms: u64, threshold: u64) -> HealthStatus {
if latency_ms <= threshold {
HealthStatus::Operational
} else {
HealthStatus::Degraded
}
}
/// 解析模型名和推理等级 (支持 model@level 或 model#level 格式)
/// 返回 (实际模型名, Option<推理等级>)
fn parse_model_with_effort(model: &str) -> (String, Option<String>) {
// 查找 @ 或 # 分隔符
if let Some(pos) = model.find('@').or_else(|| model.find('#')) {
let actual_model = model[..pos].to_string();
let effort = model[pos + 1..].to_string();
if !effort.is_empty() {
return (actual_model, Some(effort));
}
}
(model.to_string(), None)
}
fn should_retry(msg: &str) -> bool {
let lower = msg.to_lowercase();
lower.contains("timeout")
|| lower.contains("abort")
|| lower.contains("中断")
|| lower.contains("超时")
}
fn map_request_error(e: reqwest::Error) -> AppError {
if e.is_timeout() {
AppError::Message("请求超时".to_string())
} else if e.is_connect() {
AppError::Message(format!("连接失败: {e}"))
} else {
AppError::Message(e.to_string())
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_determine_status() {
        // Latency at or below the threshold is operational; above it, degraded.
        let cases = [
            (3000, 6000, HealthStatus::Operational),
            (6000, 6000, HealthStatus::Operational),
            (6001, 6000, HealthStatus::Degraded),
        ];
        for (latency, threshold, expected) in cases {
            assert_eq!(
                StreamCheckService::determine_status(latency, threshold),
                expected
            );
        }
    }

    #[test]
    fn test_should_retry() {
        // Transient failures (timeouts) are retried; auth errors are not.
        assert!(StreamCheckService::should_retry("请求超时"));
        assert!(StreamCheckService::should_retry("request timeout"));
        assert!(!StreamCheckService::should_retry("API Key 无效"));
    }

    #[test]
    fn test_default_config() {
        let cfg = StreamCheckConfig::default();
        assert_eq!(cfg.max_retries, 2);
        assert_eq!(cfg.timeout_secs, 45);
        assert_eq!(cfg.degraded_threshold_ms, 6000);
    }

    #[test]
    fn test_parse_model_with_effort() {
        // Both `@` and `#` separate the model name from the reasoning effort;
        // a bare model name yields no effort.
        let table = [
            ("gpt-5.1-codex@low", "gpt-5.1-codex", Some("low")),
            ("o1-preview#high", "o1-preview", Some("high")),
            ("gpt-4o-mini", "gpt-4o-mini", None),
        ];
        for (input, expected_model, expected_effort) in table {
            let (model, effort) = StreamCheckService::parse_model_with_effort(input);
            assert_eq!(model, expected_model);
            assert_eq!(effort, expected_effort.map(str::to_string));
        }
    }
}

View File

@@ -652,56 +652,6 @@ impl Database {
monthly_exceeded,
})
}
/// 更新每日统计聚合
///
/// 在请求完成后调用,更新 usage_daily_stats 表
#[allow(clippy::too_many_arguments)]
pub fn update_daily_stats(
&self,
provider_id: &str,
app_type: &str,
model: &str,
input_tokens: u32,
output_tokens: u32,
total_cost: &str,
is_success: bool,
) -> Result<(), AppError> {
let conn = lock_conn!(self.conn);
let date = Utc::now().format("%Y-%m-%d").to_string();
// 使用 UPSERT 更新或插入统计
conn.execute(
"INSERT INTO usage_daily_stats (
date, provider_id, app_type, model,
request_count, total_input_tokens, total_output_tokens,
total_cost_usd, success_count, error_count
) VALUES (?1, ?2, ?3, ?4, 1, ?5, ?6, ?7, ?8, ?9)
ON CONFLICT(date, provider_id, app_type, model) DO UPDATE SET
request_count = request_count + 1,
total_input_tokens = total_input_tokens + ?5,
total_output_tokens = total_output_tokens + ?6,
total_cost_usd = CAST(
CAST(total_cost_usd AS REAL) + CAST(?7 AS REAL) AS TEXT
),
success_count = success_count + ?8,
error_count = error_count + ?9",
params![
date,
provider_id,
app_type,
model,
input_tokens,
output_tokens,
total_cost,
if is_success { 1 } else { 0 },
if is_success { 0 } else { 1 },
],
)
.map_err(|e| AppError::Database(format!("更新每日统计失败: {e}")))?;
Ok(())
}
}
/// Provider 限额状态

View File

@@ -535,14 +535,8 @@ fn validate_request_url(request_url: &str, base_url: &str) -> Result<(), AppErro
(Some(request_port), Some(base_port)) => {
return Err(AppError::localized(
"usage_script.request_port_mismatch",
format!(
"请求端口 {} 必须与 base_url 端口 {} 匹配",
request_port, base_port
),
format!(
"Request port {} must match base_url port {}",
request_port, base_port
),
format!("请求端口 {request_port} 必须与 base_url 端口 {base_port} 匹配"),
format!("Request port {request_port} must match base_url port {base_port}"),
));
}
_ => {