From a42f31fe52ffe20c75888f545e491c392ea504ae Mon Sep 17 00:00:00 2001 From: zhayujie Date: Mon, 16 Mar 2026 17:46:05 +0800 Subject: [PATCH] feat: support wecom_bot stream card --- channel/wecom_bot/wecom_bot_channel.py | 96 ++++++++++++++++++++++---- 1 file changed, 83 insertions(+), 13 deletions(-) diff --git a/channel/wecom_bot/wecom_bot_channel.py b/channel/wecom_bot/wecom_bot_channel.py index da9def7..b331fbc 100644 --- a/channel/wecom_bot/wecom_bot_channel.py +++ b/channel/wecom_bot/wecom_bot_channel.py @@ -48,6 +48,7 @@ class WecomBotChannel(ChatChannel): self._stop_event = threading.Event() self._pending_responses = {} # req_id -> (threading.Event, result_holder) self._pending_lock = threading.Lock() + self._stream_states = {} # req_id -> {"stream_id": str, "content": str} conf()["group_name_white_list"] = ["ALL_GROUP"] conf()["single_chat_prefix"] = [""] @@ -200,17 +201,18 @@ class WecomBotChannel(ChatChannel): event.set() return - # Subscribe response + # Subscribe response (only handle once before connected) if errcode is not None and cmd == "": - if errcode == 0: - logger.info("[WecomBot] Subscribe success") - self._connected = True - self._start_heartbeat() - self.report_startup_success() - else: - errmsg = data.get("errmsg", "unknown error") - logger.error(f"[WecomBot] Subscribe failed: errcode={errcode}, errmsg={errmsg}") - self.report_startup_error(errmsg) + if not self._connected: + if errcode == 0: + logger.info("[WecomBot] Subscribe success") + self._connected = True + self._start_heartbeat() + self.report_startup_success() + else: + errmsg = data.get("errmsg", "unknown error") + logger.error(f"[WecomBot] Subscribe failed: errcode={errcode}, errmsg={errmsg}") + self.report_startup_error(errmsg) return if cmd == "aibot_msg_callback": @@ -298,6 +300,8 @@ class WecomBotChannel(ChatChannel): no_need_at=True, ) if context: + if req_id: + context["on_event"] = self._make_stream_callback(req_id) self.produce(context) # ------------------------------------------------------------------ @@ -316,6 +320,66 @@ class WecomBotChannel(ChatChannel): else: logger.debug(f"[WecomBot] Event: {event_type}") + # ------------------------------------------------------------------ + # Stream callback (for agent on_event) + # ------------------------------------------------------------------ + + def _make_stream_callback(self, req_id: str): + """Build an on_event callback that pushes agent stream deltas to wecom via stream message. + + All intermediate segments (thinking before tool calls) and the final answer + are accumulated into a single stream message, separated by '---'. + """ + stream_id = uuid.uuid4().hex[:16] + self._stream_states[req_id] = { + "stream_id": stream_id, + "committed": "", # finalized content from previous segments + "current": "", # current segment being streamed + } + + def _push_stream(state: dict): + """Push current stream content to wecom.""" + self._ws_send({ + "cmd": "aibot_respond_msg", + "headers": {"req_id": req_id}, + "body": { + "msgtype": "stream", + "stream": { + "id": state["stream_id"], + "finish": False, + "content": state["committed"] + state["current"], + }, + }, + }) + + def on_event(event: dict): + event_type = event.get("type") + data = event.get("data", {}) + state = self._stream_states.get(req_id) + if not state: + return + + if event_type == "turn_start": + state["current"] = "" + + elif event_type == "message_update": + delta = data.get("delta", "") + if delta: + state["current"] += delta + _push_stream(state) + + elif event_type == "message_end": + tool_calls = data.get("tool_calls", []) + if tool_calls: + if state["current"].strip(): + state["committed"] += state["current"].strip() + "\n\n---\n\n" + state["current"] = "" + else: + state["committed"] += state["current"] + state["current"] = "" + + return on_event + # ------------------------------------------------------------------ # _compose_context (same pattern as feishu) # ------------------------------------------------------------------ @@ -383,9 +447,15 @@ class WecomBotChannel(ChatChannel): # ------------------------------------------------------------------ def _send_text(self, content: str, receiver: str, is_group: bool, req_id: str = None): - """Send text/markdown reply.""" + """Send text/markdown reply. Reuses stream state if available (streaming mode).""" if req_id: - stream_id = uuid.uuid4().hex[:16] + state = self._stream_states.pop(req_id, None) + if state: + final_content = state["committed"] + content + stream_id = state["stream_id"] + else: + final_content = content + stream_id = uuid.uuid4().hex[:16] self._ws_send({ "cmd": "aibot_respond_msg", "headers": {"req_id": req_id}, @@ -394,7 +464,7 @@ class WecomBotChannel(ChatChannel): "stream": { "id": stream_id, "finish": True, - "content": content, + "content": final_content, }, }, })