Files
chatgpt-on-wechat/channel/wecom_bot/wecom_bot_channel.py
2026-03-16 19:03:18 +08:00

768 lines
29 KiB
Python

"""
WeCom (企业微信) AI Bot channel via WebSocket long connection.
Supports:
- Single chat and group chat (text / image / file input & output)
- Scheduled task push via aibot_send_msg
- Heartbeat keep-alive and auto-reconnect
"""
import base64
import hashlib
import json
import math
import os
import threading
import time
import uuid
import requests
import websocket
from bridge.context import Context, ContextType
from bridge.reply import Reply, ReplyType
from channel.chat_channel import ChatChannel, check_prefix
from channel.wecom_bot.wecom_bot_message import WecomBotMessage
from common.expired_dict import ExpiredDict
from common.log import logger
from common.singleton import singleton
from config import conf
WECOM_WS_URL = "wss://openws.work.weixin.qq.com"
HEARTBEAT_INTERVAL = 30
MEDIA_CHUNK_SIZE = 512 * 1024 # 512KB per chunk (before base64 encoding)
@singleton
class WecomBotChannel(ChatChannel):
def __init__(self):
super().__init__()
self.bot_id = ""
self.bot_secret = ""
self.received_msgs = ExpiredDict(60 * 60 * 7.1)
self._ws = None
self._ws_thread = None
self._heartbeat_thread = None
self._connected = False
self._stop_event = threading.Event()
self._pending_responses = {} # req_id -> (threading.Event, result_holder)
self._pending_lock = threading.Lock()
self._stream_states = {} # req_id -> {"stream_id": str, "content": str}
conf()["group_name_white_list"] = ["ALL_GROUP"]
conf()["single_chat_prefix"] = [""]
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def startup(self):
self.bot_id = conf().get("wecom_bot_id", "")
self.bot_secret = conf().get("wecom_bot_secret", "")
if not self.bot_id or not self.bot_secret:
err = "[WecomBot] wecom_bot_id and wecom_bot_secret are required"
logger.error(err)
self.report_startup_error(err)
return
self._stop_event.clear()
self._start_ws()
def stop(self):
logger.info("[WecomBot] stop() called")
self._stop_event.set()
if self._ws:
try:
self._ws.close()
except Exception:
pass
self._ws = None
self._connected = False
# ------------------------------------------------------------------
# WebSocket connection
# ------------------------------------------------------------------
def _start_ws(self):
def _on_open(ws):
logger.info("[WecomBot] WebSocket connected, sending subscribe...")
self._send_subscribe()
def _on_message(ws, raw):
try:
data = json.loads(raw)
self._handle_ws_message(data)
except Exception as e:
logger.error(f"[WecomBot] Failed to handle ws message: {e}", exc_info=True)
def _on_error(ws, error):
logger.error(f"[WecomBot] WebSocket error: {error}")
def _on_close(ws, close_status_code, close_msg):
logger.warning(f"[WecomBot] WebSocket closed: status={close_status_code}, msg={close_msg}")
self._connected = False
if not self._stop_event.is_set():
logger.info("[WecomBot] Will reconnect in 5s...")
time.sleep(5)
if not self._stop_event.is_set():
self._start_ws()
self._ws = websocket.WebSocketApp(
WECOM_WS_URL,
on_open=_on_open,
on_message=_on_message,
on_error=_on_error,
on_close=_on_close,
)
def run_forever():
try:
self._ws.run_forever(ping_interval=0, reconnect=0)
except (SystemExit, KeyboardInterrupt):
logger.info("[WecomBot] WebSocket thread interrupted")
except Exception as e:
logger.error(f"[WecomBot] WebSocket run_forever error: {e}")
self._ws_thread = threading.Thread(target=run_forever, daemon=True)
self._ws_thread.start()
self._ws_thread.join()
def _ws_send(self, data: dict):
if self._ws:
self._ws.send(json.dumps(data, ensure_ascii=False))
def _gen_req_id(self) -> str:
return uuid.uuid4().hex[:16]
# ------------------------------------------------------------------
# Subscribe & heartbeat
# ------------------------------------------------------------------
def _send_subscribe(self):
self._ws_send({
"cmd": "aibot_subscribe",
"headers": {"req_id": self._gen_req_id()},
"body": {
"bot_id": self.bot_id,
"secret": self.bot_secret,
},
})
def _start_heartbeat(self):
if self._heartbeat_thread and self._heartbeat_thread.is_alive():
return
def heartbeat_loop():
while not self._stop_event.is_set() and self._connected:
try:
self._ws_send({
"cmd": "ping",
"headers": {"req_id": self._gen_req_id()},
})
except Exception as e:
logger.warning(f"[WecomBot] Heartbeat send failed: {e}")
break
self._stop_event.wait(HEARTBEAT_INTERVAL)
self._heartbeat_thread = threading.Thread(target=heartbeat_loop, daemon=True)
self._heartbeat_thread.start()
# ------------------------------------------------------------------
# Incoming message dispatch
# ------------------------------------------------------------------
def _send_and_wait(self, data: dict, timeout: float = 15) -> dict:
"""Send a ws message and wait for the matching response by req_id."""
req_id = data.get("headers", {}).get("req_id", "")
event = threading.Event()
holder = {"data": None}
with self._pending_lock:
self._pending_responses[req_id] = (event, holder)
self._ws_send(data)
event.wait(timeout=timeout)
with self._pending_lock:
self._pending_responses.pop(req_id, None)
return holder["data"] or {}
def _handle_ws_message(self, data: dict):
cmd = data.get("cmd", "")
errcode = data.get("errcode")
req_id = data.get("headers", {}).get("req_id", "")
# Check if this is a response to a pending request
if req_id:
with self._pending_lock:
pending = self._pending_responses.get(req_id)
if pending:
event, holder = pending
holder["data"] = data
event.set()
return
# Subscribe response (only handle once before connected)
if errcode is not None and cmd == "":
if not self._connected:
if errcode == 0:
logger.info("[WecomBot] ✅ Subscribe success")
self._connected = True
self._start_heartbeat()
self.report_startup_success()
else:
errmsg = data.get("errmsg", "unknown error")
logger.error(f"[WecomBot] Subscribe failed: errcode={errcode}, errmsg={errmsg}")
self.report_startup_error(errmsg)
return
if cmd == "aibot_msg_callback":
self._handle_msg_callback(data)
elif cmd == "aibot_event_callback":
self._handle_event_callback(data)
elif cmd == "":
if errcode and errcode != 0:
logger.warning(f"[WecomBot] Response error: {data}")
# ------------------------------------------------------------------
# Message callback
# ------------------------------------------------------------------
def _handle_msg_callback(self, data: dict):
body = data.get("body", {})
req_id = data.get("headers", {}).get("req_id", "")
msg_id = body.get("msgid", "")
if self.received_msgs.get(msg_id):
logger.debug(f"[WecomBot] Duplicate msg filtered: {msg_id}")
return
self.received_msgs[msg_id] = True
chattype = body.get("chattype", "single")
is_group = chattype == "group"
try:
wecom_msg = WecomBotMessage(body, is_group=is_group)
except NotImplementedError as e:
logger.warning(f"[WecomBot] {e}")
return
except Exception as e:
logger.error(f"[WecomBot] Failed to parse message: {e}", exc_info=True)
return
wecom_msg.req_id = req_id
# File cache logic (same pattern as feishu)
from channel.file_cache import get_file_cache
file_cache = get_file_cache()
if is_group:
if conf().get("group_shared_session", True):
session_id = body.get("chatid", "")
else:
session_id = wecom_msg.from_user_id + "_" + body.get("chatid", "")
else:
session_id = wecom_msg.from_user_id
if wecom_msg.ctype == ContextType.IMAGE:
if hasattr(wecom_msg, "image_path") and wecom_msg.image_path:
file_cache.add(session_id, wecom_msg.image_path, file_type="image")
logger.info(f"[WecomBot] Image cached for session {session_id}")
return
if wecom_msg.ctype == ContextType.FILE:
wecom_msg.prepare()
file_cache.add(session_id, wecom_msg.content, file_type="file")
logger.info(f"[WecomBot] File cached for session {session_id}: {wecom_msg.content}")
return
if wecom_msg.ctype == ContextType.TEXT:
cached_files = file_cache.get(session_id)
if cached_files:
file_refs = []
for fi in cached_files:
ftype = fi["type"]
fpath = fi["path"]
if ftype == "image":
file_refs.append(f"[图片: {fpath}]")
elif ftype == "video":
file_refs.append(f"[视频: {fpath}]")
else:
file_refs.append(f"[文件: {fpath}]")
wecom_msg.content = wecom_msg.content + "\n" + "\n".join(file_refs)
logger.info(f"[WecomBot] Attached {len(cached_files)} cached file(s)")
file_cache.clear(session_id)
context = self._compose_context(
wecom_msg.ctype,
wecom_msg.content,
isgroup=is_group,
msg=wecom_msg,
no_need_at=True,
)
if context:
if req_id:
context["on_event"] = self._make_stream_callback(req_id)
self.produce(context)
# ------------------------------------------------------------------
# Event callback
# ------------------------------------------------------------------
def _handle_event_callback(self, data: dict):
body = data.get("body", {})
event = body.get("event", {})
event_type = event.get("eventtype", "")
if event_type == "enter_chat":
logger.info(f"[WecomBot] User entered chat: {body.get('from', {}).get('userid')}")
elif event_type == "disconnected_event":
logger.warning("[WecomBot] Received disconnected_event, another connection took over")
else:
logger.debug(f"[WecomBot] Event: {event_type}")
# ------------------------------------------------------------------
# Stream callback (for agent on_event)
# ------------------------------------------------------------------
def _make_stream_callback(self, req_id: str):
"""Build an on_event callback that pushes agent stream deltas to wecom via stream message.
All intermediate segments (thinking before tool calls) and the final answer
are accumulated into a single stream message, separated by '---'.
"""
stream_id = uuid.uuid4().hex[:16]
self._stream_states[req_id] = {
"stream_id": stream_id,
"committed": "", # finalized content from previous segments
"current": "", # current segment being streamed
}
def _push_stream(state: dict):
"""Push current stream content to wecom."""
self._ws_send({
"cmd": "aibot_respond_msg",
"headers": {"req_id": req_id},
"body": {
"msgtype": "stream",
"stream": {
"id": state["stream_id"],
"finish": False,
"content": state["committed"] + state["current"],
},
},
})
def on_event(event: dict):
event_type = event.get("type")
data = event.get("data", {})
state = self._stream_states.get(req_id)
if not state:
return
if event_type == "turn_start":
state["current"] = ""
elif event_type == "message_update":
delta = data.get("delta", "")
if delta:
state["current"] += delta
_push_stream(state)
elif event_type == "message_end":
tool_calls = data.get("tool_calls", [])
if tool_calls:
if state["current"].strip():
state["committed"] += state["current"].strip() + "\n\n---\n\n"
state["current"] = ""
else:
state["committed"] += state["current"]
state["current"] = ""
return on_event
# ------------------------------------------------------------------
# _compose_context (same pattern as feishu)
# ------------------------------------------------------------------
def _compose_context(self, ctype: ContextType, content, **kwargs):
context = Context(ctype, content)
context.kwargs = kwargs
if "channel_type" not in context:
context["channel_type"] = self.channel_type
if "origin_ctype" not in context:
context["origin_ctype"] = ctype
cmsg = context["msg"]
if cmsg.is_group:
if conf().get("group_shared_session", True):
context["session_id"] = cmsg.other_user_id
else:
context["session_id"] = f"{cmsg.from_user_id}:{cmsg.other_user_id}"
else:
context["session_id"] = cmsg.from_user_id
context["receiver"] = cmsg.other_user_id
if ctype == ContextType.TEXT:
img_match_prefix = check_prefix(content, conf().get("image_create_prefix"))
if img_match_prefix:
content = content.replace(img_match_prefix, "", 1)
context.type = ContextType.IMAGE_CREATE
else:
context.type = ContextType.TEXT
context.content = content.strip()
return context
# ------------------------------------------------------------------
# Send reply
# ------------------------------------------------------------------
def send(self, reply: Reply, context: Context):
msg = context.get("msg")
is_group = context.get("isgroup", False)
receiver = context.get("receiver", "")
# Determine req_id for responding or use send_msg for scheduled push
req_id = getattr(msg, "req_id", None) if msg else None
if reply.type == ReplyType.TEXT:
self._send_text(reply.content, receiver, is_group, req_id)
elif reply.type in (ReplyType.IMAGE_URL, ReplyType.IMAGE):
self._send_image(reply.content, receiver, is_group, req_id)
elif reply.type == ReplyType.FILE:
if hasattr(reply, "text_content") and reply.text_content:
self._send_text(reply.text_content, receiver, is_group, req_id)
time.sleep(0.3)
self._send_file(reply.content, receiver, is_group, req_id)
elif reply.type == ReplyType.VIDEO or reply.type == ReplyType.VIDEO_URL:
self._send_file(reply.content, receiver, is_group, req_id, media_type="video")
else:
logger.warning(f"[WecomBot] Unsupported reply type: {reply.type}, falling back to text")
self._send_text(str(reply.content), receiver, is_group, req_id)
# ------------------------------------------------------------------
# Respond message (via websocket)
# ------------------------------------------------------------------
def _send_text(self, content: str, receiver: str, is_group: bool, req_id: str = None):
"""Send text/markdown reply. Reuses stream state if available (streaming mode)."""
if req_id:
state = self._stream_states.pop(req_id, None)
if state:
final_content = state["committed"]
stream_id = state["stream_id"]
else:
final_content = content
stream_id = uuid.uuid4().hex[:16]
self._ws_send({
"cmd": "aibot_respond_msg",
"headers": {"req_id": req_id},
"body": {
"msgtype": "stream",
"stream": {
"id": stream_id,
"finish": True,
"content": final_content,
},
},
})
else:
self._active_send_markdown(content, receiver, is_group)
def _send_image(self, img_path_or_url: str, receiver: str, is_group: bool, req_id: str = None):
"""Send image reply. Converts to JPG/PNG and compresses if >2MB."""
local_path = img_path_or_url
if local_path.startswith("file://"):
local_path = local_path[7:]
if local_path.startswith(("http://", "https://")):
try:
resp = requests.get(local_path, timeout=30)
resp.raise_for_status()
ct = resp.headers.get("Content-Type", "")
if "jpeg" in ct or "jpg" in ct:
ext = ".jpg"
elif "webp" in ct:
ext = ".webp"
elif "gif" in ct:
ext = ".gif"
else:
ext = ".png"
tmp_path = f"/tmp/wecom_img_{uuid.uuid4().hex[:8]}{ext}"
with open(tmp_path, "wb") as f:
f.write(resp.content)
logger.info(f"[WecomBot] Image downloaded: size={len(resp.content)}, "
f"content-type={ct}, path={tmp_path}")
local_path = tmp_path
except Exception as e:
logger.error(f"[WecomBot] Failed to download image for sending: {e}")
self._send_text("[Image send failed]", receiver, is_group, req_id)
return
if not os.path.exists(local_path):
logger.error(f"[WecomBot] Image file not found: {local_path}")
return
max_image_size = 2 * 1024 * 1024 # 2MB limit for image upload
local_path = self._ensure_image_format(local_path)
if not local_path:
self._send_text("[Image format conversion failed]", receiver, is_group, req_id)
return
if os.path.getsize(local_path) > max_image_size:
local_path = self._compress_image(local_path, max_image_size)
if not local_path:
self._send_text("[Image too large]", receiver, is_group, req_id)
return
file_size = os.path.getsize(local_path)
logger.info(f"[WecomBot] Uploading image: path={local_path}, size={file_size} bytes")
media_id = self._upload_media(local_path, "image")
if not media_id:
logger.error("[WecomBot] Failed to upload image")
self._send_text("[Image upload failed]", receiver, is_group, req_id)
return
if req_id:
self._ws_send({
"cmd": "aibot_respond_msg",
"headers": {"req_id": req_id},
"body": {
"msgtype": "image",
"image": {"media_id": media_id},
},
})
else:
self._ws_send({
"cmd": "aibot_send_msg",
"headers": {"req_id": self._gen_req_id()},
"body": {
"chatid": receiver,
"chat_type": 2 if is_group else 1,
"msgtype": "image",
"image": {"media_id": media_id},
},
})
@staticmethod
def _ensure_image_format(file_path: str) -> str:
"""Ensure image is JPG or PNG (the only formats wecom supports). Convert if needed."""
try:
from PIL import Image
img = Image.open(file_path)
fmt = (img.format or "").upper()
if fmt in ("JPEG", "PNG"):
# Already a supported format, but make sure the filename extension matches
ext = os.path.splitext(file_path)[1].lower()
if fmt == "JPEG" and ext in (".jpg", ".jpeg"):
return file_path
if fmt == "PNG" and ext == ".png":
return file_path
# Extension doesn't match — rename/copy with correct extension
correct_ext = ".jpg" if fmt == "JPEG" else ".png"
out_path = f"/tmp/wecom_fmt_{uuid.uuid4().hex[:8]}{correct_ext}"
img.save(out_path, fmt)
logger.info(f"[WecomBot] Image renamed: {file_path} -> {out_path} ({fmt})")
return out_path
# Unsupported format (WebP, GIF, BMP, etc.) — convert to PNG
if img.mode == "RGBA":
out_path = f"/tmp/wecom_fmt_{uuid.uuid4().hex[:8]}.png"
img.save(out_path, "PNG")
else:
out_path = f"/tmp/wecom_fmt_{uuid.uuid4().hex[:8]}.jpg"
img.convert("RGB").save(out_path, "JPEG", quality=90)
logger.info(f"[WecomBot] Image converted from {fmt} -> {out_path}")
return out_path
except Exception as e:
logger.error(f"[WecomBot] Image format check failed: {e}")
return file_path
@staticmethod
def _compress_image(file_path: str, max_bytes: int) -> str:
"""Compress image to fit within max_bytes. Returns new path or empty string."""
try:
from PIL import Image
img = Image.open(file_path)
if img.mode == "RGBA":
img = img.convert("RGB")
out_path = f"/tmp/wecom_compressed_{uuid.uuid4().hex[:8]}.jpg"
quality = 85
while quality >= 30:
img.save(out_path, "JPEG", quality=quality, optimize=True)
if os.path.getsize(out_path) <= max_bytes:
logger.info(f"[WecomBot] Image compressed: quality={quality}, "
f"size={os.path.getsize(out_path)} bytes")
return out_path
quality -= 10
# Still too large — resize
ratio = (max_bytes / os.path.getsize(out_path)) ** 0.5
new_size = (int(img.width * ratio), int(img.height * ratio))
img = img.resize(new_size, Image.LANCZOS)
img.save(out_path, "JPEG", quality=70, optimize=True)
if os.path.getsize(out_path) <= max_bytes:
logger.info(f"[WecomBot] Image compressed with resize: {new_size}, "
f"size={os.path.getsize(out_path)} bytes")
return out_path
logger.error(f"[WecomBot] Cannot compress image below {max_bytes} bytes")
return ""
except Exception as e:
logger.error(f"[WecomBot] Image compression failed: {e}")
return ""
def _send_file(self, file_path: str, receiver: str, is_group: bool,
req_id: str = None, media_type: str = "file"):
"""Send file/video reply by uploading media first."""
local_path = file_path
if local_path.startswith("file://"):
local_path = local_path[7:]
if local_path.startswith(("http://", "https://")):
try:
resp = requests.get(local_path, timeout=60)
resp.raise_for_status()
ext = os.path.splitext(local_path)[1] or ".bin"
tmp_path = f"/tmp/wecom_file_{uuid.uuid4().hex[:8]}{ext}"
with open(tmp_path, "wb") as f:
f.write(resp.content)
local_path = tmp_path
except Exception as e:
logger.error(f"[WecomBot] Failed to download file for sending: {e}")
return
if not os.path.exists(local_path):
logger.error(f"[WecomBot] File not found: {local_path}")
return
media_id = self._upload_media(local_path, media_type)
if not media_id:
logger.error(f"[WecomBot] Failed to upload {media_type}")
return
if req_id:
self._ws_send({
"cmd": "aibot_respond_msg",
"headers": {"req_id": req_id},
"body": {
"msgtype": media_type,
media_type: {"media_id": media_id},
},
})
else:
self._ws_send({
"cmd": "aibot_send_msg",
"headers": {"req_id": self._gen_req_id()},
"body": {
"chatid": receiver,
"chat_type": 2 if is_group else 1,
"msgtype": media_type,
media_type: {"media_id": media_id},
},
})
def _active_send_markdown(self, content: str, receiver: str, is_group: bool):
"""Proactively send markdown message (for scheduled tasks, no req_id)."""
self._ws_send({
"cmd": "aibot_send_msg",
"headers": {"req_id": self._gen_req_id()},
"body": {
"chatid": receiver,
"chat_type": 2 if is_group else 1,
"msgtype": "markdown",
"markdown": {"content": content},
},
})
# ------------------------------------------------------------------
# Media upload (chunked)
# ------------------------------------------------------------------
def _upload_media(self, file_path: str, media_type: str = "file") -> str:
"""
Upload a local file to wecom bot via chunked upload protocol.
Returns media_id on success, empty string on failure.
"""
if not os.path.exists(file_path):
logger.error(f"[WecomBot] Upload file not found: {file_path}")
return ""
file_size = os.path.getsize(file_path)
if file_size < 5:
logger.error(f"[WecomBot] File too small: {file_size} bytes")
return ""
filename = os.path.basename(file_path)
total_chunks = math.ceil(file_size / MEDIA_CHUNK_SIZE)
if total_chunks > 100:
logger.error(f"[WecomBot] Too many chunks: {total_chunks} > 100")
return ""
file_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for block in iter(lambda: f.read(8192), b""):
file_md5.update(block)
md5_hex = file_md5.hexdigest()
# 1. Init upload
init_resp = self._send_and_wait({
"cmd": "aibot_upload_media_init",
"headers": {"req_id": self._gen_req_id()},
"body": {
"type": media_type,
"filename": filename,
"total_size": file_size,
"total_chunks": total_chunks,
"md5": md5_hex,
},
}, timeout=15)
if init_resp.get("errcode") != 0:
logger.error(f"[WecomBot] Upload init failed: {init_resp}")
return ""
upload_id = init_resp.get("body", {}).get("upload_id")
if not upload_id:
logger.error("[WecomBot] Failed to get upload_id")
return ""
# 2. Upload chunks
with open(file_path, "rb") as f:
for idx in range(total_chunks):
chunk = f.read(MEDIA_CHUNK_SIZE)
b64_data = base64.b64encode(chunk).decode("utf-8")
chunk_resp = self._send_and_wait({
"cmd": "aibot_upload_media_chunk",
"headers": {"req_id": self._gen_req_id()},
"body": {
"upload_id": upload_id,
"chunk_index": idx,
"base64_data": b64_data,
},
}, timeout=30)
if chunk_resp.get("errcode") != 0:
logger.error(f"[WecomBot] Chunk {idx} upload failed: {chunk_resp}")
return ""
# 3. Finish upload
finish_resp = self._send_and_wait({
"cmd": "aibot_upload_media_finish",
"headers": {"req_id": self._gen_req_id()},
"body": {"upload_id": upload_id},
}, timeout=30)
if finish_resp.get("errcode") != 0:
logger.error(f"[WecomBot] Upload finish failed: {finish_resp}")
return ""
media_id = finish_resp.get("body", {}).get("media_id", "")
if media_id:
logger.info(f"[WecomBot] Media uploaded: media_id={media_id}")
else:
logger.error("[WecomBot] Failed to get media_id from finish response")
return media_id