feat(channel): add wecom_bot channel

This commit is contained in:
zhayujie
2026-03-16 14:39:15 +08:00
parent c4b5f7fbae
commit d4480b695e
10 changed files with 995 additions and 20 deletions

View File

@@ -134,12 +134,13 @@ def _execute_agent_task(task: dict, agent_bridge):
elif channel_type == "dingtalk":
# DingTalk requires msg object, set to None for scheduled tasks
context["msg"] = None
# 如果是单聊,需要传递 sender_staff_id
if not is_group:
sender_staff_id = action.get("dingtalk_sender_staff_id")
if sender_staff_id:
context["dingtalk_sender_staff_id"] = sender_staff_id
elif channel_type == "wecom_bot":
context["msg"] = None
# Use Agent to execute the task
# Mark this as a scheduled task execution to prevent recursive task creation
context["is_scheduled_task"] = True
@@ -234,7 +235,9 @@ def _execute_send_message(task: dict, agent_bridge):
logger.debug(f"[Scheduler] DingTalk single chat: sender_staff_id={sender_staff_id}")
else:
logger.warning(f"[Scheduler] Task {task['id']}: DingTalk single chat message missing sender_staff_id")
elif channel_type == "wecom_bot":
context["msg"] = None
# Create reply
reply = Reply(ReplyType.TEXT, content)
@@ -327,31 +330,31 @@ def _execute_tool_call(task: dict, agent_bridge):
context["request_id"] = request_id
logger.debug(f"[Scheduler] Generated request_id for web channel: {request_id}")
elif channel_type == "feishu":
# Feishu channel: for scheduled tasks, send as new message (no msg_id to reply to)
context["receive_id_type"] = "chat_id" if is_group else "open_id"
context["msg"] = None
logger.debug(f"[Scheduler] Feishu: receive_id_type={context['receive_id_type']}, is_group={is_group}, receiver={receiver}")
elif channel_type == "wecom_bot":
context["msg"] = None
reply = Reply(ReplyType.TEXT, content)
# Get channel and send
from channel.channel_factory import create_channel
try:
channel = create_channel(channel_type)
if channel:
# For web channel, register the request_id to session mapping
if channel_type == "web" and hasattr(channel, 'request_to_session'):
channel.request_to_session[request_id] = receiver
logger.debug(f"[Scheduler] Registered request_id {request_id} -> session {receiver}")
channel.send(reply, context)
logger.info(f"[Scheduler] Task {task['id']} executed: sent tool result to {receiver}")
else:
logger.error(f"[Scheduler] Failed to create channel: {channel_type}")
except Exception as e:
logger.error(f"[Scheduler] Failed to send tool result: {e}")
except Exception as e:
logger.error(f"[Scheduler] Error in _execute_tool_call: {e}")
@@ -409,7 +412,9 @@ def _execute_skill_call(task: dict, agent_bridge):
elif channel_type == "feishu":
context["receive_id_type"] = "chat_id" if is_group else "open_id"
context["msg"] = None
elif channel_type == "wecom_bot":
context["msg"] = None
# Use Agent to execute the skill
try:
# Don't clear history - scheduler tasks use isolated session_id so they won't pollute user conversations

View File

@@ -36,6 +36,30 @@ PPT_SUFFIXES: Set[str] = {".ppt", ".pptx"}
ALL_DOC_SUFFIXES = PDF_SUFFIXES | WORD_SUFFIXES | TEXT_SUFFIXES | SPREADSHEET_SUFFIXES | PPT_SUFFIXES
_CHARSET_RE = re.compile(r'charset\s*=\s*["\']?\s*([\w\-]+)', re.IGNORECASE)
_META_CHARSET_RE = re.compile(rb'<meta[^>]+charset\s*=\s*["\']?\s*([\w\-]+)', re.IGNORECASE)
_META_HTTP_EQUIV_RE = re.compile(
rb'<meta[^>]+http-equiv\s*=\s*["\']?Content-Type["\']?[^>]+content\s*=\s*["\'][^"\']*charset=([\w\-]+)',
re.IGNORECASE,
)
def _extract_charset_from_content_type(content_type: str) -> Optional[str]:
"""Extract charset from Content-Type header value."""
m = _CHARSET_RE.search(content_type)
return m.group(1) if m else None
def _extract_charset_from_html_meta(raw_bytes: bytes) -> Optional[str]:
"""Extract charset from HTML <meta> tags in the first few KB of raw bytes."""
m = _META_CHARSET_RE.search(raw_bytes)
if m:
return m.group(1).decode("ascii", errors="ignore")
m = _META_HTTP_EQUIV_RE.search(raw_bytes)
if m:
return m.group(1).decode("ascii", errors="ignore")
return None
def _get_url_suffix(url: str) -> str:
"""Extract file extension from URL path, ignoring query params."""
@@ -114,14 +138,7 @@ class WebFetch(BaseTool):
if self._is_binary_content_type(content_type) and not _is_document_url(url):
return self._handle_download_by_content_type(url, response, content_type)
# Fix encoding: use apparent_encoding to auto-detect, but keep Windows encodings as-is
if response.apparent_encoding and response.apparent_encoding.lower().startswith("windows"):
response.encoding = response.encoding
else:
response.encoding = response.apparent_encoding
if not response.encoding:
response.encoding = "utf-8"
response.encoding = self._detect_encoding(response)
html = response.text
title = self._extract_title(html)
text = self._extract_text(html)
@@ -306,6 +323,35 @@ class WebFetch(BaseTool):
return "\n\n".join(text_parts)
# ---- Encoding detection ----
@staticmethod
def _detect_encoding(response: requests.Response) -> str:
"""Detect response encoding with priority: Content-Type header > HTML meta > chardet > utf-8."""
# 1. Check Content-Type header for explicit charset
content_type = response.headers.get("Content-Type", "")
charset = _extract_charset_from_content_type(content_type)
if charset:
return charset
# 2. Scan raw bytes for HTML meta charset declaration
raw = response.content[:4096]
charset = _extract_charset_from_html_meta(raw)
if charset:
return charset
# 3. Use apparent_encoding (chardet-based detection) if confident enough
apparent = response.apparent_encoding
if apparent:
apparent_lower = apparent.lower()
# Trust CJK / Windows encodings detected by chardet
trusted_prefixes = ("utf", "gb", "big5", "euc", "shift_jis", "iso-2022", "windows", "ascii")
if any(apparent_lower.startswith(p) for p in trusted_prefixes):
return apparent
# 4. Fallback
return "utf-8"
# ---- Helper methods ----
def _ensure_tmp_dir(self) -> str:

1
app.py
View File

@@ -227,6 +227,7 @@ def _clear_singleton_cache(channel_name: str):
"wechatcom_app": "channel.wechatcom.wechatcomapp_channel.WechatComAppChannel",
const.FEISHU: "channel.feishu.feishu_channel.FeiShuChanel",
const.DINGTALK: "channel.dingtalk.dingtalk_channel.DingTalkChanel",
const.WECOM_BOT: "channel.wecom_bot.wecom_bot_channel.WecomBotChannel",
}
module_path = cls_map.get(channel_name)
if not module_path:

View File

@@ -33,6 +33,9 @@ def create_channel(channel_type) -> Channel:
elif channel_type == const.DINGTALK:
from channel.dingtalk.dingtalk_channel import DingTalkChanel
ch = DingTalkChanel()
elif channel_type == const.WECOM_BOT:
from channel.wecom_bot.wecom_bot_channel import WecomBotChannel
ch = WecomBotChannel()
else:
raise RuntimeError
ch.channel_type = channel_type

View File

View File

@@ -0,0 +1,697 @@
"""
WeCom (企业微信) AI Bot channel via WebSocket long connection.
Supports:
- Single chat and group chat (text / image / file input & output)
- Scheduled task push via aibot_send_msg
- Heartbeat keep-alive and auto-reconnect
"""
import base64
import hashlib
import json
import math
import os
import threading
import time
import uuid
import requests
import websocket
from bridge.context import Context, ContextType
from bridge.reply import Reply, ReplyType
from channel.chat_channel import ChatChannel, check_prefix
from channel.wecom_bot.wecom_bot_message import WecomBotMessage
from common.expired_dict import ExpiredDict
from common.log import logger
from common.singleton import singleton
from config import conf
WECOM_WS_URL = "wss://openws.work.weixin.qq.com"
HEARTBEAT_INTERVAL = 30
MEDIA_CHUNK_SIZE = 512 * 1024 # 512KB per chunk (before base64 encoding)
@singleton
class WecomBotChannel(ChatChannel):
def __init__(self):
super().__init__()
self.bot_id = ""
self.bot_secret = ""
self.received_msgs = ExpiredDict(60 * 60 * 7.1)
self._ws = None
self._ws_thread = None
self._heartbeat_thread = None
self._connected = False
self._stop_event = threading.Event()
self._pending_responses = {} # req_id -> (threading.Event, result_holder)
self._pending_lock = threading.Lock()
conf()["group_name_white_list"] = ["ALL_GROUP"]
conf()["single_chat_prefix"] = [""]
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def startup(self):
self.bot_id = conf().get("wecom_bot_id", "")
self.bot_secret = conf().get("wecom_bot_secret", "")
if not self.bot_id or not self.bot_secret:
err = "[WecomBot] wecom_bot_id and wecom_bot_secret are required"
logger.error(err)
self.report_startup_error(err)
return
self._stop_event.clear()
self._start_ws()
def stop(self):
logger.info("[WecomBot] stop() called")
self._stop_event.set()
if self._ws:
try:
self._ws.close()
except Exception:
pass
self._ws = None
self._connected = False
# ------------------------------------------------------------------
# WebSocket connection
# ------------------------------------------------------------------
def _start_ws(self):
def _on_open(ws):
logger.info("[WecomBot] WebSocket connected, sending subscribe...")
self._send_subscribe()
def _on_message(ws, raw):
try:
data = json.loads(raw)
self._handle_ws_message(data)
except Exception as e:
logger.error(f"[WecomBot] Failed to handle ws message: {e}", exc_info=True)
def _on_error(ws, error):
logger.error(f"[WecomBot] WebSocket error: {error}")
def _on_close(ws, close_status_code, close_msg):
logger.warning(f"[WecomBot] WebSocket closed: status={close_status_code}, msg={close_msg}")
self._connected = False
if not self._stop_event.is_set():
logger.info("[WecomBot] Will reconnect in 5s...")
time.sleep(5)
if not self._stop_event.is_set():
self._start_ws()
self._ws = websocket.WebSocketApp(
WECOM_WS_URL,
on_open=_on_open,
on_message=_on_message,
on_error=_on_error,
on_close=_on_close,
)
def run_forever():
try:
self._ws.run_forever(ping_interval=0, reconnect=0)
except (SystemExit, KeyboardInterrupt):
logger.info("[WecomBot] WebSocket thread interrupted")
except Exception as e:
logger.error(f"[WecomBot] WebSocket run_forever error: {e}")
self._ws_thread = threading.Thread(target=run_forever, daemon=True)
self._ws_thread.start()
self._ws_thread.join()
def _ws_send(self, data: dict):
if self._ws:
self._ws.send(json.dumps(data, ensure_ascii=False))
def _gen_req_id(self) -> str:
return uuid.uuid4().hex[:16]
# ------------------------------------------------------------------
# Subscribe & heartbeat
# ------------------------------------------------------------------
def _send_subscribe(self):
self._ws_send({
"cmd": "aibot_subscribe",
"headers": {"req_id": self._gen_req_id()},
"body": {
"bot_id": self.bot_id,
"secret": self.bot_secret,
},
})
def _start_heartbeat(self):
if self._heartbeat_thread and self._heartbeat_thread.is_alive():
return
def heartbeat_loop():
while not self._stop_event.is_set() and self._connected:
try:
self._ws_send({
"cmd": "ping",
"headers": {"req_id": self._gen_req_id()},
})
except Exception as e:
logger.warning(f"[WecomBot] Heartbeat send failed: {e}")
break
self._stop_event.wait(HEARTBEAT_INTERVAL)
self._heartbeat_thread = threading.Thread(target=heartbeat_loop, daemon=True)
self._heartbeat_thread.start()
# ------------------------------------------------------------------
# Incoming message dispatch
# ------------------------------------------------------------------
def _send_and_wait(self, data: dict, timeout: float = 15) -> dict:
"""Send a ws message and wait for the matching response by req_id."""
req_id = data.get("headers", {}).get("req_id", "")
event = threading.Event()
holder = {"data": None}
with self._pending_lock:
self._pending_responses[req_id] = (event, holder)
self._ws_send(data)
event.wait(timeout=timeout)
with self._pending_lock:
self._pending_responses.pop(req_id, None)
return holder["data"] or {}
def _handle_ws_message(self, data: dict):
cmd = data.get("cmd", "")
errcode = data.get("errcode")
req_id = data.get("headers", {}).get("req_id", "")
# Check if this is a response to a pending request
if req_id:
with self._pending_lock:
pending = self._pending_responses.get(req_id)
if pending:
event, holder = pending
holder["data"] = data
event.set()
return
# Subscribe response
if errcode is not None and cmd == "":
if errcode == 0:
logger.info("[WecomBot] Subscribe success")
self._connected = True
self._start_heartbeat()
self.report_startup_success()
else:
errmsg = data.get("errmsg", "unknown error")
logger.error(f"[WecomBot] Subscribe failed: errcode={errcode}, errmsg={errmsg}")
self.report_startup_error(errmsg)
return
if cmd == "aibot_msg_callback":
self._handle_msg_callback(data)
elif cmd == "aibot_event_callback":
self._handle_event_callback(data)
elif cmd == "":
if errcode and errcode != 0:
logger.warning(f"[WecomBot] Response error: {data}")
# ------------------------------------------------------------------
# Message callback
# ------------------------------------------------------------------
def _handle_msg_callback(self, data: dict):
body = data.get("body", {})
req_id = data.get("headers", {}).get("req_id", "")
msg_id = body.get("msgid", "")
if self.received_msgs.get(msg_id):
logger.debug(f"[WecomBot] Duplicate msg filtered: {msg_id}")
return
self.received_msgs[msg_id] = True
chattype = body.get("chattype", "single")
is_group = chattype == "group"
try:
wecom_msg = WecomBotMessage(body, is_group=is_group)
except NotImplementedError as e:
logger.warning(f"[WecomBot] {e}")
return
except Exception as e:
logger.error(f"[WecomBot] Failed to parse message: {e}", exc_info=True)
return
wecom_msg.req_id = req_id
# File cache logic (same pattern as feishu)
from channel.file_cache import get_file_cache
file_cache = get_file_cache()
if is_group:
if conf().get("group_shared_session", True):
session_id = body.get("chatid", "")
else:
session_id = wecom_msg.from_user_id + "_" + body.get("chatid", "")
else:
session_id = wecom_msg.from_user_id
if wecom_msg.ctype == ContextType.IMAGE:
if hasattr(wecom_msg, "image_path") and wecom_msg.image_path:
file_cache.add(session_id, wecom_msg.image_path, file_type="image")
logger.info(f"[WecomBot] Image cached for session {session_id}")
return
if wecom_msg.ctype == ContextType.FILE:
wecom_msg.prepare()
file_cache.add(session_id, wecom_msg.content, file_type="file")
logger.info(f"[WecomBot] File cached for session {session_id}: {wecom_msg.content}")
return
if wecom_msg.ctype == ContextType.TEXT:
cached_files = file_cache.get(session_id)
if cached_files:
file_refs = []
for fi in cached_files:
ftype = fi["type"]
fpath = fi["path"]
if ftype == "image":
file_refs.append(f"[图片: {fpath}]")
elif ftype == "video":
file_refs.append(f"[视频: {fpath}]")
else:
file_refs.append(f"[文件: {fpath}]")
wecom_msg.content = wecom_msg.content + "\n" + "\n".join(file_refs)
logger.info(f"[WecomBot] Attached {len(cached_files)} cached file(s)")
file_cache.clear(session_id)
context = self._compose_context(
wecom_msg.ctype,
wecom_msg.content,
isgroup=is_group,
msg=wecom_msg,
no_need_at=True,
)
if context:
self.produce(context)
# ------------------------------------------------------------------
# Event callback
# ------------------------------------------------------------------
def _handle_event_callback(self, data: dict):
body = data.get("body", {})
event = body.get("event", {})
event_type = event.get("eventtype", "")
if event_type == "enter_chat":
logger.info(f"[WecomBot] User entered chat: {body.get('from', {}).get('userid')}")
elif event_type == "disconnected_event":
logger.warning("[WecomBot] Received disconnected_event, another connection took over")
else:
logger.debug(f"[WecomBot] Event: {event_type}")
# ------------------------------------------------------------------
# _compose_context (same pattern as feishu)
# ------------------------------------------------------------------
def _compose_context(self, ctype: ContextType, content, **kwargs):
context = Context(ctype, content)
context.kwargs = kwargs
if "channel_type" not in context:
context["channel_type"] = self.channel_type
if "origin_ctype" not in context:
context["origin_ctype"] = ctype
cmsg = context["msg"]
if cmsg.is_group:
if conf().get("group_shared_session", True):
context["session_id"] = cmsg.other_user_id
else:
context["session_id"] = f"{cmsg.from_user_id}:{cmsg.other_user_id}"
else:
context["session_id"] = cmsg.from_user_id
context["receiver"] = cmsg.other_user_id
if ctype == ContextType.TEXT:
img_match_prefix = check_prefix(content, conf().get("image_create_prefix"))
if img_match_prefix:
content = content.replace(img_match_prefix, "", 1)
context.type = ContextType.IMAGE_CREATE
else:
context.type = ContextType.TEXT
context.content = content.strip()
return context
# ------------------------------------------------------------------
# Send reply
# ------------------------------------------------------------------
def send(self, reply: Reply, context: Context):
msg = context.get("msg")
is_group = context.get("isgroup", False)
receiver = context.get("receiver", "")
# Determine req_id for responding or use send_msg for scheduled push
req_id = getattr(msg, "req_id", None) if msg else None
if reply.type == ReplyType.TEXT:
self._send_text(reply.content, receiver, is_group, req_id)
elif reply.type in (ReplyType.IMAGE_URL, ReplyType.IMAGE):
self._send_image(reply.content, receiver, is_group, req_id)
elif reply.type == ReplyType.FILE:
if hasattr(reply, "text_content") and reply.text_content:
self._send_text(reply.text_content, receiver, is_group, req_id)
time.sleep(0.3)
self._send_file(reply.content, receiver, is_group, req_id)
elif reply.type == ReplyType.VIDEO or reply.type == ReplyType.VIDEO_URL:
self._send_file(reply.content, receiver, is_group, req_id, media_type="video")
else:
logger.warning(f"[WecomBot] Unsupported reply type: {reply.type}, falling back to text")
self._send_text(str(reply.content), receiver, is_group, req_id)
# ------------------------------------------------------------------
# Respond message (via websocket)
# ------------------------------------------------------------------
def _send_text(self, content: str, receiver: str, is_group: bool, req_id: str = None):
"""Send text/markdown reply."""
if req_id:
stream_id = uuid.uuid4().hex[:16]
self._ws_send({
"cmd": "aibot_respond_msg",
"headers": {"req_id": req_id},
"body": {
"msgtype": "stream",
"stream": {
"id": stream_id,
"finish": True,
"content": content,
},
},
})
else:
self._active_send_markdown(content, receiver, is_group)
def _send_image(self, img_path_or_url: str, receiver: str, is_group: bool, req_id: str = None):
"""Send image reply. Converts to JPG/PNG and compresses if >10MB."""
local_path = img_path_or_url
if local_path.startswith("file://"):
local_path = local_path[7:]
if local_path.startswith(("http://", "https://")):
try:
resp = requests.get(local_path, timeout=30)
resp.raise_for_status()
ct = resp.headers.get("Content-Type", "")
if "jpeg" in ct or "jpg" in ct:
ext = ".jpg"
elif "webp" in ct:
ext = ".webp"
elif "gif" in ct:
ext = ".gif"
else:
ext = ".png"
tmp_path = f"/tmp/wecom_img_{uuid.uuid4().hex[:8]}{ext}"
with open(tmp_path, "wb") as f:
f.write(resp.content)
logger.info(f"[WecomBot] Image downloaded: size={len(resp.content)}, "
f"content-type={ct}, path={tmp_path}")
local_path = tmp_path
except Exception as e:
logger.error(f"[WecomBot] Failed to download image for sending: {e}")
self._send_text("[Image send failed]", receiver, is_group, req_id)
return
if not os.path.exists(local_path):
logger.error(f"[WecomBot] Image file not found: {local_path}")
return
max_image_size = 10 * 1024 * 1024 # 10MB limit for image type
local_path = self._ensure_image_format(local_path)
if not local_path:
self._send_text("[Image format conversion failed]", receiver, is_group, req_id)
return
if os.path.getsize(local_path) > max_image_size:
local_path = self._compress_image(local_path, max_image_size)
if not local_path:
self._send_text("[Image too large]", receiver, is_group, req_id)
return
file_size = os.path.getsize(local_path)
logger.info(f"[WecomBot] Uploading image: path={local_path}, size={file_size} bytes")
media_id = self._upload_media(local_path, "image")
if not media_id:
logger.error("[WecomBot] Failed to upload image")
self._send_text("[Image upload failed]", receiver, is_group, req_id)
return
if req_id:
self._ws_send({
"cmd": "aibot_respond_msg",
"headers": {"req_id": req_id},
"body": {
"msgtype": "image",
"image": {"media_id": media_id},
},
})
else:
self._ws_send({
"cmd": "aibot_send_msg",
"headers": {"req_id": self._gen_req_id()},
"body": {
"chatid": receiver,
"chat_type": 2 if is_group else 1,
"msgtype": "image",
"image": {"media_id": media_id},
},
})
@staticmethod
def _ensure_image_format(file_path: str) -> str:
"""Ensure image is JPG or PNG (the only formats wecom supports). Convert if needed."""
try:
from PIL import Image
img = Image.open(file_path)
fmt = (img.format or "").upper()
if fmt in ("JPEG", "PNG"):
# Already a supported format, but make sure the filename extension matches
ext = os.path.splitext(file_path)[1].lower()
if fmt == "JPEG" and ext in (".jpg", ".jpeg"):
return file_path
if fmt == "PNG" and ext == ".png":
return file_path
# Extension doesn't match — rename/copy with correct extension
correct_ext = ".jpg" if fmt == "JPEG" else ".png"
out_path = f"/tmp/wecom_fmt_{uuid.uuid4().hex[:8]}{correct_ext}"
img.save(out_path, fmt)
logger.info(f"[WecomBot] Image renamed: {file_path} -> {out_path} ({fmt})")
return out_path
# Unsupported format (WebP, GIF, BMP, etc.) — convert to PNG
if img.mode == "RGBA":
out_path = f"/tmp/wecom_fmt_{uuid.uuid4().hex[:8]}.png"
img.save(out_path, "PNG")
else:
out_path = f"/tmp/wecom_fmt_{uuid.uuid4().hex[:8]}.jpg"
img.convert("RGB").save(out_path, "JPEG", quality=90)
logger.info(f"[WecomBot] Image converted from {fmt} -> {out_path}")
return out_path
except Exception as e:
logger.error(f"[WecomBot] Image format check failed: {e}")
return file_path
@staticmethod
def _compress_image(file_path: str, max_bytes: int) -> str:
"""Compress image to fit within max_bytes. Returns new path or empty string."""
try:
from PIL import Image
img = Image.open(file_path)
if img.mode == "RGBA":
img = img.convert("RGB")
out_path = f"/tmp/wecom_compressed_{uuid.uuid4().hex[:8]}.jpg"
quality = 85
while quality >= 30:
img.save(out_path, "JPEG", quality=quality, optimize=True)
if os.path.getsize(out_path) <= max_bytes:
logger.info(f"[WecomBot] Image compressed: quality={quality}, "
f"size={os.path.getsize(out_path)} bytes")
return out_path
quality -= 10
# Still too large — resize
ratio = (max_bytes / os.path.getsize(out_path)) ** 0.5
new_size = (int(img.width * ratio), int(img.height * ratio))
img = img.resize(new_size, Image.LANCZOS)
img.save(out_path, "JPEG", quality=70, optimize=True)
if os.path.getsize(out_path) <= max_bytes:
logger.info(f"[WecomBot] Image compressed with resize: {new_size}, "
f"size={os.path.getsize(out_path)} bytes")
return out_path
logger.error(f"[WecomBot] Cannot compress image below {max_bytes} bytes")
return ""
except Exception as e:
logger.error(f"[WecomBot] Image compression failed: {e}")
return ""
def _send_file(self, file_path: str, receiver: str, is_group: bool,
req_id: str = None, media_type: str = "file"):
"""Send file/video reply by uploading media first."""
local_path = file_path
if local_path.startswith("file://"):
local_path = local_path[7:]
if local_path.startswith(("http://", "https://")):
try:
resp = requests.get(local_path, timeout=60)
resp.raise_for_status()
ext = os.path.splitext(local_path)[1] or ".bin"
tmp_path = f"/tmp/wecom_file_{uuid.uuid4().hex[:8]}{ext}"
with open(tmp_path, "wb") as f:
f.write(resp.content)
local_path = tmp_path
except Exception as e:
logger.error(f"[WecomBot] Failed to download file for sending: {e}")
return
if not os.path.exists(local_path):
logger.error(f"[WecomBot] File not found: {local_path}")
return
media_id = self._upload_media(local_path, media_type)
if not media_id:
logger.error(f"[WecomBot] Failed to upload {media_type}")
return
if req_id:
self._ws_send({
"cmd": "aibot_respond_msg",
"headers": {"req_id": req_id},
"body": {
"msgtype": media_type,
media_type: {"media_id": media_id},
},
})
else:
self._ws_send({
"cmd": "aibot_send_msg",
"headers": {"req_id": self._gen_req_id()},
"body": {
"chatid": receiver,
"chat_type": 2 if is_group else 1,
"msgtype": media_type,
media_type: {"media_id": media_id},
},
})
def _active_send_markdown(self, content: str, receiver: str, is_group: bool):
"""Proactively send markdown message (for scheduled tasks, no req_id)."""
self._ws_send({
"cmd": "aibot_send_msg",
"headers": {"req_id": self._gen_req_id()},
"body": {
"chatid": receiver,
"chat_type": 2 if is_group else 1,
"msgtype": "markdown",
"markdown": {"content": content},
},
})
# ------------------------------------------------------------------
# Media upload (chunked)
# ------------------------------------------------------------------
def _upload_media(self, file_path: str, media_type: str = "file") -> str:
"""
Upload a local file to wecom bot via chunked upload protocol.
Returns media_id on success, empty string on failure.
"""
if not os.path.exists(file_path):
logger.error(f"[WecomBot] Upload file not found: {file_path}")
return ""
file_size = os.path.getsize(file_path)
if file_size < 5:
logger.error(f"[WecomBot] File too small: {file_size} bytes")
return ""
filename = os.path.basename(file_path)
total_chunks = math.ceil(file_size / MEDIA_CHUNK_SIZE)
if total_chunks > 100:
logger.error(f"[WecomBot] Too many chunks: {total_chunks} > 100")
return ""
file_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for block in iter(lambda: f.read(8192), b""):
file_md5.update(block)
md5_hex = file_md5.hexdigest()
# 1. Init upload
init_resp = self._send_and_wait({
"cmd": "aibot_upload_media_init",
"headers": {"req_id": self._gen_req_id()},
"body": {
"type": media_type,
"filename": filename,
"total_size": file_size,
"total_chunks": total_chunks,
"md5": md5_hex,
},
}, timeout=15)
if init_resp.get("errcode") != 0:
logger.error(f"[WecomBot] Upload init failed: {init_resp}")
return ""
upload_id = init_resp.get("body", {}).get("upload_id")
if not upload_id:
logger.error("[WecomBot] Failed to get upload_id")
return ""
# 2. Upload chunks
with open(file_path, "rb") as f:
for idx in range(total_chunks):
chunk = f.read(MEDIA_CHUNK_SIZE)
b64_data = base64.b64encode(chunk).decode("utf-8")
chunk_resp = self._send_and_wait({
"cmd": "aibot_upload_media_chunk",
"headers": {"req_id": self._gen_req_id()},
"body": {
"upload_id": upload_id,
"chunk_index": idx,
"base64_data": b64_data,
},
}, timeout=30)
if chunk_resp.get("errcode") != 0:
logger.error(f"[WecomBot] Chunk {idx} upload failed: {chunk_resp}")
return ""
# 3. Finish upload
finish_resp = self._send_and_wait({
"cmd": "aibot_upload_media_finish",
"headers": {"req_id": self._gen_req_id()},
"body": {"upload_id": upload_id},
}, timeout=30)
if finish_resp.get("errcode") != 0:
logger.error(f"[WecomBot] Upload finish failed: {finish_resp}")
return ""
media_id = finish_resp.get("body", {}).get("media_id", "")
if media_id:
logger.info(f"[WecomBot] Media uploaded: media_id={media_id}")
else:
logger.error("[WecomBot] Failed to get media_id from finish response")
return media_id

View File

@@ -0,0 +1,216 @@
import os
import re
import base64
import requests
from bridge.context import ContextType
from channel.chat_message import ChatMessage
from common.log import logger
from common.utils import expand_path
from config import conf
from Crypto.Cipher import AES
MAGIC_SIGNATURES = [
(b"%PDF", ".pdf"),
(b"\x89PNG\r\n\x1a\n", ".png"),
(b"\xff\xd8\xff", ".jpg"),
(b"GIF87a", ".gif"),
(b"GIF89a", ".gif"),
(b"RIFF", ".webp"), # RIFF....WEBP, further checked below
(b"PK\x03\x04", ".zip"), # zip / docx / xlsx / pptx
(b"\x1f\x8b", ".gz"),
(b"Rar!\x1a\x07", ".rar"),
(b"7z\xbc\xaf\x27\x1c", ".7z"),
(b"\x00\x00\x00", ".mp4"), # ftyp box, further checked below
(b"#!AMR", ".amr"),
]
OFFICE_ZIP_MARKERS = {
b"word/": ".docx",
b"xl/": ".xlsx",
b"ppt/": ".pptx",
}
def _guess_ext_from_bytes(data: bytes) -> str:
"""Guess file extension from file content magic bytes."""
if not data or len(data) < 8:
return ""
for sig, ext in MAGIC_SIGNATURES:
if data[:len(sig)] == sig:
if ext == ".webp" and data[8:12] != b"WEBP":
continue
if ext == ".mp4":
if b"ftyp" not in data[4:12]:
continue
if ext == ".zip":
for marker, office_ext in OFFICE_ZIP_MARKERS.items():
if marker in data[:2000]:
return office_ext
return ".zip"
return ext
return ""
def _decrypt_media(url: str, aeskey: str) -> bytes:
"""
Download and decrypt AES-256-CBC encrypted media from wecom bot.
Returns decrypted bytes.
"""
resp = requests.get(url, timeout=30)
resp.raise_for_status()
encrypted = resp.content
key = base64.b64decode(aeskey + "=" * (-len(aeskey) % 4))
if len(key) != 32:
raise ValueError(f"Invalid AES key length: {len(key)}, expected 32")
iv = key[:16]
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted = cipher.decrypt(encrypted)
pad_len = decrypted[-1]
if pad_len > 32:
raise ValueError(f"Invalid PKCS7 padding length: {pad_len}")
return decrypted[:-pad_len]
def _get_tmp_dir() -> str:
"""Return the workspace tmp directory (absolute path), creating it if needed."""
ws_root = expand_path(conf().get("agent_workspace", "~/cow"))
tmp_dir = os.path.join(ws_root, "tmp")
os.makedirs(tmp_dir, exist_ok=True)
return tmp_dir
class WecomBotMessage(ChatMessage):
"""Message wrapper for wecom bot (websocket long-connection mode)."""
def __init__(self, msg_body: dict, is_group: bool = False):
super().__init__(msg_body)
self.msg_id = msg_body.get("msgid")
self.create_time = msg_body.get("create_time")
self.is_group = is_group
msg_type = msg_body.get("msgtype")
from_userid = msg_body.get("from", {}).get("userid", "")
chat_id = msg_body.get("chatid", "")
bot_id = msg_body.get("aibotid", "")
if msg_type == "text":
self.ctype = ContextType.TEXT
content = msg_body.get("text", {}).get("content", "")
if is_group:
content = re.sub(r"@\S+\s*", "", content).strip()
self.content = content
elif msg_type == "voice":
self.ctype = ContextType.TEXT
self.content = msg_body.get("voice", {}).get("content", "")
elif msg_type == "image":
self.ctype = ContextType.IMAGE
image_info = msg_body.get("image", {})
image_url = image_info.get("url", "")
aeskey = image_info.get("aeskey", "")
tmp_dir = _get_tmp_dir()
image_path = os.path.join(tmp_dir, f"wecom_{self.msg_id}.png")
try:
data = _decrypt_media(image_url, aeskey)
with open(image_path, "wb") as f:
f.write(data)
self.content = image_path
self.image_path = image_path
logger.info(f"[WecomBot] Image downloaded: {image_path}")
except Exception as e:
logger.error(f"[WecomBot] Failed to download image: {e}")
self.content = "[Image download failed]"
self.image_path = None
elif msg_type == "mixed":
self.ctype = ContextType.TEXT
text_parts = []
image_paths = []
mixed_items = msg_body.get("mixed", {}).get("msg_item", [])
tmp_dir = _get_tmp_dir()
for idx, item in enumerate(mixed_items):
item_type = item.get("msgtype")
if item_type == "text":
txt = item.get("text", {}).get("content", "")
if is_group:
txt = re.sub(r"@\S+\s*", "", txt).strip()
if txt:
text_parts.append(txt)
elif item_type == "image":
img_info = item.get("image", {})
img_url = img_info.get("url", "")
img_aeskey = img_info.get("aeskey", "")
img_path = os.path.join(tmp_dir, f"wecom_{self.msg_id}_{idx}.png")
try:
img_data = _decrypt_media(img_url, img_aeskey)
with open(img_path, "wb") as f:
f.write(img_data)
image_paths.append(img_path)
except Exception as e:
logger.error(f"[WecomBot] Failed to download mixed image: {e}")
content_parts = text_parts[:]
for p in image_paths:
content_parts.append(f"[图片: {p}]")
self.content = "\n".join(content_parts) if content_parts else "[Mixed message]"
elif msg_type == "file":
self.ctype = ContextType.FILE
file_info = msg_body.get("file", {})
file_url = file_info.get("url", "")
aeskey = file_info.get("aeskey", "")
tmp_dir = _get_tmp_dir()
base_path = os.path.join(tmp_dir, f"wecom_{self.msg_id}")
self.content = base_path
def _download_file():
try:
data = _decrypt_media(file_url, aeskey)
ext = _guess_ext_from_bytes(data)
final_path = base_path + ext
with open(final_path, "wb") as f:
f.write(data)
self.content = final_path
logger.info(f"[WecomBot] File downloaded: {final_path}")
except Exception as e:
logger.error(f"[WecomBot] Failed to download file: {e}")
self._prepare_fn = _download_file
elif msg_type == "video":
self.ctype = ContextType.FILE
video_info = msg_body.get("video", {})
video_url = video_info.get("url", "")
aeskey = video_info.get("aeskey", "")
tmp_dir = _get_tmp_dir()
self.content = os.path.join(tmp_dir, f"wecom_{self.msg_id}.mp4")
def _download_video():
try:
data = _decrypt_media(video_url, aeskey)
with open(self.content, "wb") as f:
f.write(data)
logger.info(f"[WecomBot] Video downloaded: {self.content}")
except Exception as e:
logger.error(f"[WecomBot] Failed to download video: {e}")
self._prepare_fn = _download_video
else:
raise NotImplementedError(f"Unsupported message type: {msg_type}")
self.from_user_id = from_userid
self.to_user_id = bot_id
if is_group:
self.other_user_id = chat_id
self.actual_user_id = from_userid
self.actual_user_nickname = from_userid
else:
self.other_user_id = from_userid
self.actual_user_id = from_userid

View File

@@ -186,3 +186,4 @@ MODEL_LIST = MODEL_LIST + GITEE_AI_MODEL_LIST + MODELSCOPE_MODEL_LIST
# channel
FEISHU = "feishu"
DINGTALK = "dingtalk"
WECOM_BOT = "wecom_bot"

View File

@@ -150,11 +150,14 @@ available_setting = {
"dingtalk_client_id": "", # 钉钉机器人Client ID
"dingtalk_client_secret": "", # 钉钉机器人Client Secret
"dingtalk_card_enabled": False,
# 企微智能机器人配置(长连接模式)
"wecom_bot_id": "", # 企微智能机器人BotID
"wecom_bot_secret": "", # 企微智能机器人长连接Secret
# chatgpt指令自定义触发词
"clear_memory_commands": ["#清除记忆"], # 重置会话指令,必须以#开头
# channel配置
"channel_type": "", # 通道类型,支持多渠道同时运行。单个: "feishu",多个: "feishu, dingtalk" 或 ["feishu", "dingtalk"]。可选值: web,feishu,dingtalk,wechatmp,wechatmp_service,wechatcom_app
"channel_type": "", # 通道类型,支持多渠道同时运行。单个: "feishu",多个: "feishu, dingtalk" 或 ["feishu", "dingtalk"]。可选值: web,feishu,dingtalk,wecom_bot,wechatmp,wechatmp_service,wechatcom_app
"web_console": True, # 是否自动启动Web控制台默认启动。设为False可禁用
"subscribe_msg": "", # 订阅消息, 支持: wechatmp, wechatmp_service, wechatcom_app
"debug": False, # 是否开启debug模式开启后会打印更多日志

View File

@@ -22,3 +22,6 @@ dashscope
lark-oapi
# dingtalk
dingtalk_stream
# wecom bot websocket mode
websocket-client
pycryptodome