feat: support multi-channel

This commit is contained in:
zhayujie
2026-02-26 18:34:08 +08:00
parent 925d728a86
commit 7cce224499
7 changed files with 153 additions and 96 deletions

View File

@@ -1,9 +1,15 @@
import sys
import time
import web
import json
import logging
import mimetypes
import os
import threading
import time
import uuid
from queue import Queue, Empty
import web
from bridge.context import *
from bridge.reply import Reply, ReplyType
from channel.chat_channel import ChatChannel, check_prefix
@@ -11,20 +17,17 @@ from channel.chat_message import ChatMessage
from common.log import logger
from common.singleton import singleton
from config import conf
import os
import mimetypes
import threading
import logging
class WebMessage(ChatMessage):
def __init__(
self,
msg_id,
content,
ctype=ContextType.TEXT,
from_user_id="User",
to_user_id="Chatgpt",
other_user_id="Chatgpt",
self,
msg_id,
content,
ctype=ContextType.TEXT,
from_user_id="User",
to_user_id="Chatgpt",
other_user_id="Chatgpt",
):
self.msg_id = msg_id
self.ctype = ctype
@@ -38,7 +41,7 @@ class WebMessage(ChatMessage):
class WebChannel(ChatChannel):
NOT_SUPPORT_REPLYTYPE = [ReplyType.VOICE]
_instance = None
# def __new__(cls):
# if cls._instance is None:
# cls._instance = super(WebChannel, cls).__new__(cls)
@@ -47,12 +50,11 @@ class WebChannel(ChatChannel):
def __init__(self):
super().__init__()
self.msg_id_counter = 0
self.session_queues = {} # session_id -> Queue (fallback polling)
self.request_to_session = {} # request_id -> session_id
self.sse_queues = {} # request_id -> Queue (SSE streaming)
self.session_queues = {} # session_id -> Queue (fallback polling)
self.request_to_session = {} # request_id -> session_id
self.sse_queues = {} # request_id -> Queue (SSE streaming)
self._http_server = None
def _generate_msg_id(self):
"""生成唯一的消息ID"""
self.msg_id_counter += 1
@@ -111,6 +113,7 @@ class WebChannel(ChatChannel):
def _make_sse_callback(self, request_id: str):
"""Build an on_event callback that pushes agent stream events into the SSE queue."""
def on_event(event: dict):
if request_id not in self.sse_queues:
return
@@ -237,28 +240,28 @@ class WebChannel(ChatChannel):
data = web.data()
json_data = json.loads(data)
session_id = json_data.get('session_id')
if not session_id or session_id not in self.session_queues:
return json.dumps({"status": "error", "message": "Invalid session ID"})
# 尝试从队列获取响应,不等待
try:
# 使用peek而不是get这样如果前端没有成功处理下次还能获取到
response = self.session_queues[session_id].get(block=False)
# 返回响应包含请求ID以区分不同请求
return json.dumps({
"status": "success",
"status": "success",
"has_content": True,
"content": response["content"],
"request_id": response["request_id"],
"timestamp": response["timestamp"]
})
except Empty:
# 没有新响应
return json.dumps({"status": "success", "has_content": False})
except Exception as e:
logger.error(f"Error polling response: {e}")
return json.dumps({"status": "error", "message": str(e)})
@@ -271,9 +274,10 @@ class WebChannel(ChatChannel):
def startup(self):
port = conf().get("web_port", 9899)
# 打印可用渠道类型提示
logger.info("[WebChannel] 当前channel为web可修改 config.json 配置文件中的 channel_type 字段进行切换。全部可用类型为:")
logger.info(
"[WebChannel] 全部可用通道如下,可修改 config.json 配置文件中的 channel_type 字段进行切换,多个通道用逗号分隔:")
logger.info("[WebChannel] 1. web - 网页")
logger.info("[WebChannel] 2. terminal - 终端")
logger.info("[WebChannel] 3. feishu - 飞书")
@@ -281,16 +285,16 @@ class WebChannel(ChatChannel):
logger.info("[WebChannel] 5. wechatcom_app - 企微自建应用")
logger.info("[WebChannel] 6. wechatmp - 个人公众号")
logger.info("[WebChannel] 7. wechatmp_service - 企业公众号")
logger.info("[WebChannel] ✅ Web控制台已运行")
logger.info(f"[WebChannel] 🌐 本地访问: http://localhost:{port}")
logger.info(f"[WebChannel] 🌍 服务器访问: http://YOUR_IP:{port} (请将YOUR_IP替换为服务器IP)")
logger.info("[WebChannel] ✅ Web对话网页已运行")
# 确保静态文件目录存在
static_dir = os.path.join(os.path.dirname(__file__), 'static')
if not os.path.exists(static_dir):
os.makedirs(static_dir)
logger.debug(f"[WebChannel] Created static directory: {static_dir}")
urls = (
'/', 'RootHandler',
'/message', 'MessageHandler',
@@ -307,14 +311,14 @@ class WebChannel(ChatChannel):
'/assets/(.*)', 'AssetsHandler',
)
app = web.application(urls, globals(), autoreload=False)
# 完全禁用web.py的HTTP日志输出
web.httpserver.LogMiddleware.log = lambda self, status, environ: None
# 配置web.py的日志级别为ERROR
logging.getLogger("web").setLevel(logging.ERROR)
logging.getLogger("web.httpserver").setLevel(logging.ERROR)
# Build WSGI app with middleware (same as runsimple but without print)
func = web.httpserver.StaticMiddleware(app.wsgifunc())
func = web.httpserver.LogMiddleware(func)