diff --git a/agent/tools/scheduler/integration.py b/agent/tools/scheduler/integration.py index cebc7c6..7d43236 100644 --- a/agent/tools/scheduler/integration.py +++ b/agent/tools/scheduler/integration.py @@ -134,12 +134,13 @@ def _execute_agent_task(task: dict, agent_bridge): elif channel_type == "dingtalk": # DingTalk requires msg object, set to None for scheduled tasks context["msg"] = None - # 如果是单聊,需要传递 sender_staff_id if not is_group: sender_staff_id = action.get("dingtalk_sender_staff_id") if sender_staff_id: context["dingtalk_sender_staff_id"] = sender_staff_id - + elif channel_type == "wecom_bot": + context["msg"] = None + # Use Agent to execute the task # Mark this as a scheduled task execution to prevent recursive task creation context["is_scheduled_task"] = True @@ -234,7 +235,9 @@ def _execute_send_message(task: dict, agent_bridge): logger.debug(f"[Scheduler] DingTalk single chat: sender_staff_id={sender_staff_id}") else: logger.warning(f"[Scheduler] Task {task['id']}: DingTalk single chat message missing sender_staff_id") - + elif channel_type == "wecom_bot": + context["msg"] = None + # Create reply reply = Reply(ReplyType.TEXT, content) @@ -327,31 +330,31 @@ def _execute_tool_call(task: dict, agent_bridge): context["request_id"] = request_id logger.debug(f"[Scheduler] Generated request_id for web channel: {request_id}") elif channel_type == "feishu": - # Feishu channel: for scheduled tasks, send as new message (no msg_id to reply to) context["receive_id_type"] = "chat_id" if is_group else "open_id" context["msg"] = None logger.debug(f"[Scheduler] Feishu: receive_id_type={context['receive_id_type']}, is_group={is_group}, receiver={receiver}") - + elif channel_type == "wecom_bot": + context["msg"] = None + reply = Reply(ReplyType.TEXT, content) - + # Get channel and send from channel.channel_factory import create_channel - + try: channel = create_channel(channel_type) if channel: - # For web channel, register the request_id to session mapping if channel_type == "web" and hasattr(channel, 'request_to_session'): channel.request_to_session[request_id] = receiver logger.debug(f"[Scheduler] Registered request_id {request_id} -> session {receiver}") - + channel.send(reply, context) logger.info(f"[Scheduler] Task {task['id']} executed: sent tool result to {receiver}") else: logger.error(f"[Scheduler] Failed to create channel: {channel_type}") except Exception as e: logger.error(f"[Scheduler] Failed to send tool result: {e}") - + except Exception as e: logger.error(f"[Scheduler] Error in _execute_tool_call: {e}") @@ -409,7 +412,9 @@ def _execute_skill_call(task: dict, agent_bridge): elif channel_type == "feishu": context["receive_id_type"] = "chat_id" if is_group else "open_id" context["msg"] = None - + elif channel_type == "wecom_bot": + context["msg"] = None + # Use Agent to execute the skill try: # Don't clear history - scheduler tasks use isolated session_id so they won't pollute user conversations diff --git a/agent/tools/web_fetch/web_fetch.py b/agent/tools/web_fetch/web_fetch.py index ca15ad3..83300b0 100644 --- a/agent/tools/web_fetch/web_fetch.py +++ b/agent/tools/web_fetch/web_fetch.py @@ -36,6 +36,30 @@ PPT_SUFFIXES: Set[str] = {".ppt", ".pptx"} ALL_DOC_SUFFIXES = PDF_SUFFIXES | WORD_SUFFIXES | TEXT_SUFFIXES | SPREADSHEET_SUFFIXES | PPT_SUFFIXES +_CHARSET_RE = re.compile(r'charset\s*=\s*["\']?\s*([\w\-]+)', re.IGNORECASE) +_META_CHARSET_RE = re.compile(rb']+charset\s*=\s*["\']?\s*([\w\-]+)', re.IGNORECASE) +_META_HTTP_EQUIV_RE = re.compile( + rb']+http-equiv\s*=\s*["\']?Content-Type["\']?[^>]+content\s*=\s*["\'][^"\']*charset=([\w\-]+)', + re.IGNORECASE, +) + + +def _extract_charset_from_content_type(content_type: str) -> Optional[str]: + """Extract charset from Content-Type header value.""" + m = _CHARSET_RE.search(content_type) + return m.group(1) if m else None + + +def _extract_charset_from_html_meta(raw_bytes: bytes) -> Optional[str]: + """Extract charset from HTML tags in the first few KB of raw bytes.""" + m = _META_CHARSET_RE.search(raw_bytes) + if m: + return m.group(1).decode("ascii", errors="ignore") + m = _META_HTTP_EQUIV_RE.search(raw_bytes) + if m: + return m.group(1).decode("ascii", errors="ignore") + return None + def _get_url_suffix(url: str) -> str: """Extract file extension from URL path, ignoring query params.""" @@ -114,14 +138,7 @@ class WebFetch(BaseTool): if self._is_binary_content_type(content_type) and not _is_document_url(url): return self._handle_download_by_content_type(url, response, content_type) - # Fix encoding: use apparent_encoding to auto-detect, but keep Windows encodings as-is - if response.apparent_encoding and response.apparent_encoding.lower().startswith("windows"): - response.encoding = response.encoding - else: - response.encoding = response.apparent_encoding - if not response.encoding: - response.encoding = "utf-8" - + response.encoding = self._detect_encoding(response) html = response.text title = self._extract_title(html) text = self._extract_text(html) @@ -306,6 +323,35 @@ class WebFetch(BaseTool): return "\n\n".join(text_parts) + # ---- Encoding detection ---- + + @staticmethod + def _detect_encoding(response: requests.Response) -> str: + """Detect response encoding with priority: Content-Type header > HTML meta > chardet > utf-8.""" + # 1. Check Content-Type header for explicit charset + content_type = response.headers.get("Content-Type", "") + charset = _extract_charset_from_content_type(content_type) + if charset: + return charset + + # 2. Scan raw bytes for HTML meta charset declaration + raw = response.content[:4096] + charset = _extract_charset_from_html_meta(raw) + if charset: + return charset + + # 3. Use apparent_encoding (chardet-based detection) if confident enough + apparent = response.apparent_encoding + if apparent: + apparent_lower = apparent.lower() + # Trust CJK / Windows encodings detected by chardet + trusted_prefixes = ("utf", "gb", "big5", "euc", "shift_jis", "iso-2022", "windows", "ascii") + if any(apparent_lower.startswith(p) for p in trusted_prefixes): + return apparent + + # 4. Fallback + return "utf-8" + # ---- Helper methods ---- def _ensure_tmp_dir(self) -> str: diff --git a/app.py b/app.py index f7b7eed..b26502f 100644 --- a/app.py +++ b/app.py @@ -227,6 +227,7 @@ def _clear_singleton_cache(channel_name: str): "wechatcom_app": "channel.wechatcom.wechatcomapp_channel.WechatComAppChannel", const.FEISHU: "channel.feishu.feishu_channel.FeiShuChanel", const.DINGTALK: "channel.dingtalk.dingtalk_channel.DingTalkChanel", + const.WECOM_BOT: "channel.wecom_bot.wecom_bot_channel.WecomBotChannel", } module_path = cls_map.get(channel_name) if not module_path: diff --git a/channel/channel_factory.py b/channel/channel_factory.py index 7c6e30f..3c3a6e8 100644 --- a/channel/channel_factory.py +++ b/channel/channel_factory.py @@ -33,6 +33,9 @@ def create_channel(channel_type) -> Channel: elif channel_type == const.DINGTALK: from channel.dingtalk.dingtalk_channel import DingTalkChanel ch = DingTalkChanel() + elif channel_type == const.WECOM_BOT: + from channel.wecom_bot.wecom_bot_channel import WecomBotChannel + ch = WecomBotChannel() else: raise RuntimeError ch.channel_type = channel_type diff --git a/channel/wecom_bot/__init__.py b/channel/wecom_bot/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/channel/wecom_bot/wecom_bot_channel.py b/channel/wecom_bot/wecom_bot_channel.py new file mode 100644 index 0000000..da9def7 --- /dev/null +++ b/channel/wecom_bot/wecom_bot_channel.py @@ -0,0 +1,697 @@ +""" +WeCom (企业微信) AI Bot channel via WebSocket long connection. + +Supports: +- Single chat and group chat (text / image / file input & output) +- Scheduled task push via aibot_send_msg +- Heartbeat keep-alive and auto-reconnect +""" + +import base64 +import hashlib +import json +import math +import os +import threading +import time +import uuid + +import requests +import websocket + +from bridge.context import Context, ContextType +from bridge.reply import Reply, ReplyType +from channel.chat_channel import ChatChannel, check_prefix +from channel.wecom_bot.wecom_bot_message import WecomBotMessage +from common.expired_dict import ExpiredDict +from common.log import logger +from common.singleton import singleton +from config import conf + +WECOM_WS_URL = "wss://openws.work.weixin.qq.com" +HEARTBEAT_INTERVAL = 30 +MEDIA_CHUNK_SIZE = 512 * 1024 # 512KB per chunk (before base64 encoding) + + +@singleton +class WecomBotChannel(ChatChannel): + + def __init__(self): + super().__init__() + self.bot_id = "" + self.bot_secret = "" + self.received_msgs = ExpiredDict(60 * 60 * 7.1) + self._ws = None + self._ws_thread = None + self._heartbeat_thread = None + self._connected = False + self._stop_event = threading.Event() + self._pending_responses = {} # req_id -> (threading.Event, result_holder) + self._pending_lock = threading.Lock() + + conf()["group_name_white_list"] = ["ALL_GROUP"] + conf()["single_chat_prefix"] = [""] + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + def startup(self): + self.bot_id = conf().get("wecom_bot_id", "") + self.bot_secret = conf().get("wecom_bot_secret", "") + + if not self.bot_id or not self.bot_secret: + err = "[WecomBot] wecom_bot_id and wecom_bot_secret are required" + logger.error(err) + self.report_startup_error(err) + return + + self._stop_event.clear() + self._start_ws() + + def stop(self): + logger.info("[WecomBot] stop() called") + self._stop_event.set() + if self._ws: + try: + self._ws.close() + except Exception: + pass + self._ws = None + self._connected = False + + # ------------------------------------------------------------------ + # WebSocket connection + # ------------------------------------------------------------------ + + def _start_ws(self): + def _on_open(ws): + logger.info("[WecomBot] WebSocket connected, sending subscribe...") + self._send_subscribe() + + def _on_message(ws, raw): + try: + data = json.loads(raw) + self._handle_ws_message(data) + except Exception as e: + logger.error(f"[WecomBot] Failed to handle ws message: {e}", exc_info=True) + + def _on_error(ws, error): + logger.error(f"[WecomBot] WebSocket error: {error}") + + def _on_close(ws, close_status_code, close_msg): + logger.warning(f"[WecomBot] WebSocket closed: status={close_status_code}, msg={close_msg}") + self._connected = False + if not self._stop_event.is_set(): + logger.info("[WecomBot] Will reconnect in 5s...") + time.sleep(5) + if not self._stop_event.is_set(): + self._start_ws() + + self._ws = websocket.WebSocketApp( + WECOM_WS_URL, + on_open=_on_open, + on_message=_on_message, + on_error=_on_error, + on_close=_on_close, + ) + + def run_forever(): + try: + self._ws.run_forever(ping_interval=0, reconnect=0) + except (SystemExit, KeyboardInterrupt): + logger.info("[WecomBot] WebSocket thread interrupted") + except Exception as e: + logger.error(f"[WecomBot] WebSocket run_forever error: {e}") + + self._ws_thread = threading.Thread(target=run_forever, daemon=True) + self._ws_thread.start() + self._ws_thread.join() + + def _ws_send(self, data: dict): + if self._ws: + self._ws.send(json.dumps(data, ensure_ascii=False)) + + def _gen_req_id(self) -> str: + return uuid.uuid4().hex[:16] + + # ------------------------------------------------------------------ + # Subscribe & heartbeat + # ------------------------------------------------------------------ + + def _send_subscribe(self): + self._ws_send({ + "cmd": "aibot_subscribe", + "headers": {"req_id": self._gen_req_id()}, + "body": { + "bot_id": self.bot_id, + "secret": self.bot_secret, + }, + }) + + def _start_heartbeat(self): + if self._heartbeat_thread and self._heartbeat_thread.is_alive(): + return + + def heartbeat_loop(): + while not self._stop_event.is_set() and self._connected: + try: + self._ws_send({ + "cmd": "ping", + "headers": {"req_id": self._gen_req_id()}, + }) + except Exception as e: + logger.warning(f"[WecomBot] Heartbeat send failed: {e}") + break + self._stop_event.wait(HEARTBEAT_INTERVAL) + + self._heartbeat_thread = threading.Thread(target=heartbeat_loop, daemon=True) + self._heartbeat_thread.start() + + # ------------------------------------------------------------------ + # Incoming message dispatch + # ------------------------------------------------------------------ + + def _send_and_wait(self, data: dict, timeout: float = 15) -> dict: + """Send a ws message and wait for the matching response by req_id.""" + req_id = data.get("headers", {}).get("req_id", "") + event = threading.Event() + holder = {"data": None} + with self._pending_lock: + self._pending_responses[req_id] = (event, holder) + self._ws_send(data) + event.wait(timeout=timeout) + with self._pending_lock: + self._pending_responses.pop(req_id, None) + return holder["data"] or {} + + def _handle_ws_message(self, data: dict): + cmd = data.get("cmd", "") + errcode = data.get("errcode") + req_id = data.get("headers", {}).get("req_id", "") + + # Check if this is a response to a pending request + if req_id: + with self._pending_lock: + pending = self._pending_responses.get(req_id) + if pending: + event, holder = pending + holder["data"] = data + event.set() + return + + # Subscribe response + if errcode is not None and cmd == "": + if errcode == 0: + logger.info("[WecomBot] Subscribe success") + self._connected = True + self._start_heartbeat() + self.report_startup_success() + else: + errmsg = data.get("errmsg", "unknown error") + logger.error(f"[WecomBot] Subscribe failed: errcode={errcode}, errmsg={errmsg}") + self.report_startup_error(errmsg) + return + + if cmd == "aibot_msg_callback": + self._handle_msg_callback(data) + elif cmd == "aibot_event_callback": + self._handle_event_callback(data) + elif cmd == "": + if errcode and errcode != 0: + logger.warning(f"[WecomBot] Response error: {data}") + + # ------------------------------------------------------------------ + # Message callback + # ------------------------------------------------------------------ + + def _handle_msg_callback(self, data: dict): + body = data.get("body", {}) + req_id = data.get("headers", {}).get("req_id", "") + msg_id = body.get("msgid", "") + + if self.received_msgs.get(msg_id): + logger.debug(f"[WecomBot] Duplicate msg filtered: {msg_id}") + return + self.received_msgs[msg_id] = True + + chattype = body.get("chattype", "single") + is_group = chattype == "group" + + try: + wecom_msg = WecomBotMessage(body, is_group=is_group) + except NotImplementedError as e: + logger.warning(f"[WecomBot] {e}") + return + except Exception as e: + logger.error(f"[WecomBot] Failed to parse message: {e}", exc_info=True) + return + + wecom_msg.req_id = req_id + + # File cache logic (same pattern as feishu) + from channel.file_cache import get_file_cache + file_cache = get_file_cache() + + if is_group: + if conf().get("group_shared_session", True): + session_id = body.get("chatid", "") + else: + session_id = wecom_msg.from_user_id + "_" + body.get("chatid", "") + else: + session_id = wecom_msg.from_user_id + + if wecom_msg.ctype == ContextType.IMAGE: + if hasattr(wecom_msg, "image_path") and wecom_msg.image_path: + file_cache.add(session_id, wecom_msg.image_path, file_type="image") + logger.info(f"[WecomBot] Image cached for session {session_id}") + return + + if wecom_msg.ctype == ContextType.FILE: + wecom_msg.prepare() + file_cache.add(session_id, wecom_msg.content, file_type="file") + logger.info(f"[WecomBot] File cached for session {session_id}: {wecom_msg.content}") + return + + if wecom_msg.ctype == ContextType.TEXT: + cached_files = file_cache.get(session_id) + if cached_files: + file_refs = [] + for fi in cached_files: + ftype = fi["type"] + fpath = fi["path"] + if ftype == "image": + file_refs.append(f"[图片: {fpath}]") + elif ftype == "video": + file_refs.append(f"[视频: {fpath}]") + else: + file_refs.append(f"[文件: {fpath}]") + wecom_msg.content = wecom_msg.content + "\n" + "\n".join(file_refs) + logger.info(f"[WecomBot] Attached {len(cached_files)} cached file(s)") + file_cache.clear(session_id) + + context = self._compose_context( + wecom_msg.ctype, + wecom_msg.content, + isgroup=is_group, + msg=wecom_msg, + no_need_at=True, + ) + if context: + self.produce(context) + + # ------------------------------------------------------------------ + # Event callback + # ------------------------------------------------------------------ + + def _handle_event_callback(self, data: dict): + body = data.get("body", {}) + event = body.get("event", {}) + event_type = event.get("eventtype", "") + + if event_type == "enter_chat": + logger.info(f"[WecomBot] User entered chat: {body.get('from', {}).get('userid')}") + elif event_type == "disconnected_event": + logger.warning("[WecomBot] Received disconnected_event, another connection took over") + else: + logger.debug(f"[WecomBot] Event: {event_type}") + + # ------------------------------------------------------------------ + # _compose_context (same pattern as feishu) + # ------------------------------------------------------------------ + + def _compose_context(self, ctype: ContextType, content, **kwargs): + context = Context(ctype, content) + context.kwargs = kwargs + if "channel_type" not in context: + context["channel_type"] = self.channel_type + if "origin_ctype" not in context: + context["origin_ctype"] = ctype + + cmsg = context["msg"] + + if cmsg.is_group: + if conf().get("group_shared_session", True): + context["session_id"] = cmsg.other_user_id + else: + context["session_id"] = f"{cmsg.from_user_id}:{cmsg.other_user_id}" + else: + context["session_id"] = cmsg.from_user_id + + context["receiver"] = cmsg.other_user_id + + if ctype == ContextType.TEXT: + img_match_prefix = check_prefix(content, conf().get("image_create_prefix")) + if img_match_prefix: + content = content.replace(img_match_prefix, "", 1) + context.type = ContextType.IMAGE_CREATE + else: + context.type = ContextType.TEXT + context.content = content.strip() + + return context + + # ------------------------------------------------------------------ + # Send reply + # ------------------------------------------------------------------ + + def send(self, reply: Reply, context: Context): + msg = context.get("msg") + is_group = context.get("isgroup", False) + receiver = context.get("receiver", "") + + # Determine req_id for responding or use send_msg for scheduled push + req_id = getattr(msg, "req_id", None) if msg else None + + if reply.type == ReplyType.TEXT: + self._send_text(reply.content, receiver, is_group, req_id) + elif reply.type in (ReplyType.IMAGE_URL, ReplyType.IMAGE): + self._send_image(reply.content, receiver, is_group, req_id) + elif reply.type == ReplyType.FILE: + if hasattr(reply, "text_content") and reply.text_content: + self._send_text(reply.text_content, receiver, is_group, req_id) + time.sleep(0.3) + self._send_file(reply.content, receiver, is_group, req_id) + elif reply.type == ReplyType.VIDEO or reply.type == ReplyType.VIDEO_URL: + self._send_file(reply.content, receiver, is_group, req_id, media_type="video") + else: + logger.warning(f"[WecomBot] Unsupported reply type: {reply.type}, falling back to text") + self._send_text(str(reply.content), receiver, is_group, req_id) + + # ------------------------------------------------------------------ + # Respond message (via websocket) + # ------------------------------------------------------------------ + + def _send_text(self, content: str, receiver: str, is_group: bool, req_id: str = None): + """Send text/markdown reply.""" + if req_id: + stream_id = uuid.uuid4().hex[:16] + self._ws_send({ + "cmd": "aibot_respond_msg", + "headers": {"req_id": req_id}, + "body": { + "msgtype": "stream", + "stream": { + "id": stream_id, + "finish": True, + "content": content, + }, + }, + }) + else: + self._active_send_markdown(content, receiver, is_group) + + def _send_image(self, img_path_or_url: str, receiver: str, is_group: bool, req_id: str = None): + """Send image reply. Converts to JPG/PNG and compresses if >10MB.""" + local_path = img_path_or_url + if local_path.startswith("file://"): + local_path = local_path[7:] + + if local_path.startswith(("http://", "https://")): + try: + resp = requests.get(local_path, timeout=30) + resp.raise_for_status() + ct = resp.headers.get("Content-Type", "") + if "jpeg" in ct or "jpg" in ct: + ext = ".jpg" + elif "webp" in ct: + ext = ".webp" + elif "gif" in ct: + ext = ".gif" + else: + ext = ".png" + tmp_path = f"/tmp/wecom_img_{uuid.uuid4().hex[:8]}{ext}" + with open(tmp_path, "wb") as f: + f.write(resp.content) + logger.info(f"[WecomBot] Image downloaded: size={len(resp.content)}, " + f"content-type={ct}, path={tmp_path}") + local_path = tmp_path + except Exception as e: + logger.error(f"[WecomBot] Failed to download image for sending: {e}") + self._send_text("[Image send failed]", receiver, is_group, req_id) + return + + if not os.path.exists(local_path): + logger.error(f"[WecomBot] Image file not found: {local_path}") + return + + max_image_size = 10 * 1024 * 1024 # 10MB limit for image type + local_path = self._ensure_image_format(local_path) + if not local_path: + self._send_text("[Image format conversion failed]", receiver, is_group, req_id) + return + + if os.path.getsize(local_path) > max_image_size: + local_path = self._compress_image(local_path, max_image_size) + if not local_path: + self._send_text("[Image too large]", receiver, is_group, req_id) + return + + file_size = os.path.getsize(local_path) + logger.info(f"[WecomBot] Uploading image: path={local_path}, size={file_size} bytes") + media_id = self._upload_media(local_path, "image") + if not media_id: + logger.error("[WecomBot] Failed to upload image") + self._send_text("[Image upload failed]", receiver, is_group, req_id) + return + + if req_id: + self._ws_send({ + "cmd": "aibot_respond_msg", + "headers": {"req_id": req_id}, + "body": { + "msgtype": "image", + "image": {"media_id": media_id}, + }, + }) + else: + self._ws_send({ + "cmd": "aibot_send_msg", + "headers": {"req_id": self._gen_req_id()}, + "body": { + "chatid": receiver, + "chat_type": 2 if is_group else 1, + "msgtype": "image", + "image": {"media_id": media_id}, + }, + }) + + @staticmethod + def _ensure_image_format(file_path: str) -> str: + """Ensure image is JPG or PNG (the only formats wecom supports). Convert if needed.""" + try: + from PIL import Image + img = Image.open(file_path) + fmt = (img.format or "").upper() + if fmt in ("JPEG", "PNG"): + # Already a supported format, but make sure the filename extension matches + ext = os.path.splitext(file_path)[1].lower() + if fmt == "JPEG" and ext in (".jpg", ".jpeg"): + return file_path + if fmt == "PNG" and ext == ".png": + return file_path + # Extension doesn't match — rename/copy with correct extension + correct_ext = ".jpg" if fmt == "JPEG" else ".png" + out_path = f"/tmp/wecom_fmt_{uuid.uuid4().hex[:8]}{correct_ext}" + img.save(out_path, fmt) + logger.info(f"[WecomBot] Image renamed: {file_path} -> {out_path} ({fmt})") + return out_path + + # Unsupported format (WebP, GIF, BMP, etc.) — convert to PNG + if img.mode == "RGBA": + out_path = f"/tmp/wecom_fmt_{uuid.uuid4().hex[:8]}.png" + img.save(out_path, "PNG") + else: + out_path = f"/tmp/wecom_fmt_{uuid.uuid4().hex[:8]}.jpg" + img.convert("RGB").save(out_path, "JPEG", quality=90) + logger.info(f"[WecomBot] Image converted from {fmt} -> {out_path}") + return out_path + except Exception as e: + logger.error(f"[WecomBot] Image format check failed: {e}") + return file_path + + @staticmethod + def _compress_image(file_path: str, max_bytes: int) -> str: + """Compress image to fit within max_bytes. Returns new path or empty string.""" + try: + from PIL import Image + img = Image.open(file_path) + if img.mode == "RGBA": + img = img.convert("RGB") + + out_path = f"/tmp/wecom_compressed_{uuid.uuid4().hex[:8]}.jpg" + quality = 85 + while quality >= 30: + img.save(out_path, "JPEG", quality=quality, optimize=True) + if os.path.getsize(out_path) <= max_bytes: + logger.info(f"[WecomBot] Image compressed: quality={quality}, " + f"size={os.path.getsize(out_path)} bytes") + return out_path + quality -= 10 + + # Still too large — resize + ratio = (max_bytes / os.path.getsize(out_path)) ** 0.5 + new_size = (int(img.width * ratio), int(img.height * ratio)) + img = img.resize(new_size, Image.LANCZOS) + img.save(out_path, "JPEG", quality=70, optimize=True) + if os.path.getsize(out_path) <= max_bytes: + logger.info(f"[WecomBot] Image compressed with resize: {new_size}, " + f"size={os.path.getsize(out_path)} bytes") + return out_path + + logger.error(f"[WecomBot] Cannot compress image below {max_bytes} bytes") + return "" + except Exception as e: + logger.error(f"[WecomBot] Image compression failed: {e}") + return "" + + def _send_file(self, file_path: str, receiver: str, is_group: bool, + req_id: str = None, media_type: str = "file"): + """Send file/video reply by uploading media first.""" + local_path = file_path + if local_path.startswith("file://"): + local_path = local_path[7:] + + if local_path.startswith(("http://", "https://")): + try: + resp = requests.get(local_path, timeout=60) + resp.raise_for_status() + ext = os.path.splitext(local_path)[1] or ".bin" + tmp_path = f"/tmp/wecom_file_{uuid.uuid4().hex[:8]}{ext}" + with open(tmp_path, "wb") as f: + f.write(resp.content) + local_path = tmp_path + except Exception as e: + logger.error(f"[WecomBot] Failed to download file for sending: {e}") + return + + if not os.path.exists(local_path): + logger.error(f"[WecomBot] File not found: {local_path}") + return + + media_id = self._upload_media(local_path, media_type) + if not media_id: + logger.error(f"[WecomBot] Failed to upload {media_type}") + return + + if req_id: + self._ws_send({ + "cmd": "aibot_respond_msg", + "headers": {"req_id": req_id}, + "body": { + "msgtype": media_type, + media_type: {"media_id": media_id}, + }, + }) + else: + self._ws_send({ + "cmd": "aibot_send_msg", + "headers": {"req_id": self._gen_req_id()}, + "body": { + "chatid": receiver, + "chat_type": 2 if is_group else 1, + "msgtype": media_type, + media_type: {"media_id": media_id}, + }, + }) + + def _active_send_markdown(self, content: str, receiver: str, is_group: bool): + """Proactively send markdown message (for scheduled tasks, no req_id).""" + self._ws_send({ + "cmd": "aibot_send_msg", + "headers": {"req_id": self._gen_req_id()}, + "body": { + "chatid": receiver, + "chat_type": 2 if is_group else 1, + "msgtype": "markdown", + "markdown": {"content": content}, + }, + }) + + # ------------------------------------------------------------------ + # Media upload (chunked) + # ------------------------------------------------------------------ + + def _upload_media(self, file_path: str, media_type: str = "file") -> str: + """ + Upload a local file to wecom bot via chunked upload protocol. + Returns media_id on success, empty string on failure. + """ + if not os.path.exists(file_path): + logger.error(f"[WecomBot] Upload file not found: {file_path}") + return "" + + file_size = os.path.getsize(file_path) + if file_size < 5: + logger.error(f"[WecomBot] File too small: {file_size} bytes") + return "" + + filename = os.path.basename(file_path) + total_chunks = math.ceil(file_size / MEDIA_CHUNK_SIZE) + if total_chunks > 100: + logger.error(f"[WecomBot] Too many chunks: {total_chunks} > 100") + return "" + + file_md5 = hashlib.md5() + with open(file_path, "rb") as f: + for block in iter(lambda: f.read(8192), b""): + file_md5.update(block) + md5_hex = file_md5.hexdigest() + + # 1. Init upload + init_resp = self._send_and_wait({ + "cmd": "aibot_upload_media_init", + "headers": {"req_id": self._gen_req_id()}, + "body": { + "type": media_type, + "filename": filename, + "total_size": file_size, + "total_chunks": total_chunks, + "md5": md5_hex, + }, + }, timeout=15) + + if init_resp.get("errcode") != 0: + logger.error(f"[WecomBot] Upload init failed: {init_resp}") + return "" + + upload_id = init_resp.get("body", {}).get("upload_id") + if not upload_id: + logger.error("[WecomBot] Failed to get upload_id") + return "" + + # 2. Upload chunks + with open(file_path, "rb") as f: + for idx in range(total_chunks): + chunk = f.read(MEDIA_CHUNK_SIZE) + b64_data = base64.b64encode(chunk).decode("utf-8") + chunk_resp = self._send_and_wait({ + "cmd": "aibot_upload_media_chunk", + "headers": {"req_id": self._gen_req_id()}, + "body": { + "upload_id": upload_id, + "chunk_index": idx, + "base64_data": b64_data, + }, + }, timeout=30) + if chunk_resp.get("errcode") != 0: + logger.error(f"[WecomBot] Chunk {idx} upload failed: {chunk_resp}") + return "" + + # 3. Finish upload + finish_resp = self._send_and_wait({ + "cmd": "aibot_upload_media_finish", + "headers": {"req_id": self._gen_req_id()}, + "body": {"upload_id": upload_id}, + }, timeout=30) + + if finish_resp.get("errcode") != 0: + logger.error(f"[WecomBot] Upload finish failed: {finish_resp}") + return "" + + media_id = finish_resp.get("body", {}).get("media_id", "") + if media_id: + logger.info(f"[WecomBot] Media uploaded: media_id={media_id}") + else: + logger.error("[WecomBot] Failed to get media_id from finish response") + return media_id diff --git a/channel/wecom_bot/wecom_bot_message.py b/channel/wecom_bot/wecom_bot_message.py new file mode 100644 index 0000000..16b7dfd --- /dev/null +++ b/channel/wecom_bot/wecom_bot_message.py @@ -0,0 +1,216 @@ +import os +import re +import base64 +import requests + +from bridge.context import ContextType +from channel.chat_message import ChatMessage +from common.log import logger +from common.utils import expand_path +from config import conf +from Crypto.Cipher import AES + + +MAGIC_SIGNATURES = [ + (b"%PDF", ".pdf"), + (b"\x89PNG\r\n\x1a\n", ".png"), + (b"\xff\xd8\xff", ".jpg"), + (b"GIF87a", ".gif"), + (b"GIF89a", ".gif"), + (b"RIFF", ".webp"), # RIFF....WEBP, further checked below + (b"PK\x03\x04", ".zip"), # zip / docx / xlsx / pptx + (b"\x1f\x8b", ".gz"), + (b"Rar!\x1a\x07", ".rar"), + (b"7z\xbc\xaf\x27\x1c", ".7z"), + (b"\x00\x00\x00", ".mp4"), # ftyp box, further checked below + (b"#!AMR", ".amr"), +] + +OFFICE_ZIP_MARKERS = { + b"word/": ".docx", + b"xl/": ".xlsx", + b"ppt/": ".pptx", +} + + +def _guess_ext_from_bytes(data: bytes) -> str: + """Guess file extension from file content magic bytes.""" + if not data or len(data) < 8: + return "" + for sig, ext in MAGIC_SIGNATURES: + if data[:len(sig)] == sig: + if ext == ".webp" and data[8:12] != b"WEBP": + continue + if ext == ".mp4": + if b"ftyp" not in data[4:12]: + continue + if ext == ".zip": + for marker, office_ext in OFFICE_ZIP_MARKERS.items(): + if marker in data[:2000]: + return office_ext + return ".zip" + return ext + return "" + + +def _decrypt_media(url: str, aeskey: str) -> bytes: + """ + Download and decrypt AES-256-CBC encrypted media from wecom bot. + Returns decrypted bytes. + """ + resp = requests.get(url, timeout=30) + resp.raise_for_status() + encrypted = resp.content + + key = base64.b64decode(aeskey + "=" * (-len(aeskey) % 4)) + if len(key) != 32: + raise ValueError(f"Invalid AES key length: {len(key)}, expected 32") + + iv = key[:16] + cipher = AES.new(key, AES.MODE_CBC, iv) + decrypted = cipher.decrypt(encrypted) + + pad_len = decrypted[-1] + if pad_len > 32: + raise ValueError(f"Invalid PKCS7 padding length: {pad_len}") + return decrypted[:-pad_len] + + +def _get_tmp_dir() -> str: + """Return the workspace tmp directory (absolute path), creating it if needed.""" + ws_root = expand_path(conf().get("agent_workspace", "~/cow")) + tmp_dir = os.path.join(ws_root, "tmp") + os.makedirs(tmp_dir, exist_ok=True) + return tmp_dir + + +class WecomBotMessage(ChatMessage): + """Message wrapper for wecom bot (websocket long-connection mode).""" + + def __init__(self, msg_body: dict, is_group: bool = False): + super().__init__(msg_body) + self.msg_id = msg_body.get("msgid") + self.create_time = msg_body.get("create_time") + self.is_group = is_group + + msg_type = msg_body.get("msgtype") + from_userid = msg_body.get("from", {}).get("userid", "") + chat_id = msg_body.get("chatid", "") + bot_id = msg_body.get("aibotid", "") + + if msg_type == "text": + self.ctype = ContextType.TEXT + content = msg_body.get("text", {}).get("content", "") + if is_group: + content = re.sub(r"@\S+\s*", "", content).strip() + self.content = content + + elif msg_type == "voice": + self.ctype = ContextType.TEXT + self.content = msg_body.get("voice", {}).get("content", "") + + elif msg_type == "image": + self.ctype = ContextType.IMAGE + image_info = msg_body.get("image", {}) + image_url = image_info.get("url", "") + aeskey = image_info.get("aeskey", "") + tmp_dir = _get_tmp_dir() + image_path = os.path.join(tmp_dir, f"wecom_{self.msg_id}.png") + + try: + data = _decrypt_media(image_url, aeskey) + with open(image_path, "wb") as f: + f.write(data) + self.content = image_path + self.image_path = image_path + logger.info(f"[WecomBot] Image downloaded: {image_path}") + except Exception as e: + logger.error(f"[WecomBot] Failed to download image: {e}") + self.content = "[Image download failed]" + self.image_path = None + + elif msg_type == "mixed": + self.ctype = ContextType.TEXT + text_parts = [] + image_paths = [] + mixed_items = msg_body.get("mixed", {}).get("msg_item", []) + tmp_dir = _get_tmp_dir() + + for idx, item in enumerate(mixed_items): + item_type = item.get("msgtype") + if item_type == "text": + txt = item.get("text", {}).get("content", "") + if is_group: + txt = re.sub(r"@\S+\s*", "", txt).strip() + if txt: + text_parts.append(txt) + elif item_type == "image": + img_info = item.get("image", {}) + img_url = img_info.get("url", "") + img_aeskey = img_info.get("aeskey", "") + img_path = os.path.join(tmp_dir, f"wecom_{self.msg_id}_{idx}.png") + try: + img_data = _decrypt_media(img_url, img_aeskey) + with open(img_path, "wb") as f: + f.write(img_data) + image_paths.append(img_path) + except Exception as e: + logger.error(f"[WecomBot] Failed to download mixed image: {e}") + + content_parts = text_parts[:] + for p in image_paths: + content_parts.append(f"[图片: {p}]") + self.content = "\n".join(content_parts) if content_parts else "[Mixed message]" + + elif msg_type == "file": + self.ctype = ContextType.FILE + file_info = msg_body.get("file", {}) + file_url = file_info.get("url", "") + aeskey = file_info.get("aeskey", "") + tmp_dir = _get_tmp_dir() + base_path = os.path.join(tmp_dir, f"wecom_{self.msg_id}") + self.content = base_path + + def _download_file(): + try: + data = _decrypt_media(file_url, aeskey) + ext = _guess_ext_from_bytes(data) + final_path = base_path + ext + with open(final_path, "wb") as f: + f.write(data) + self.content = final_path + logger.info(f"[WecomBot] File downloaded: {final_path}") + except Exception as e: + logger.error(f"[WecomBot] Failed to download file: {e}") + self._prepare_fn = _download_file + + elif msg_type == "video": + self.ctype = ContextType.FILE + video_info = msg_body.get("video", {}) + video_url = video_info.get("url", "") + aeskey = video_info.get("aeskey", "") + tmp_dir = _get_tmp_dir() + self.content = os.path.join(tmp_dir, f"wecom_{self.msg_id}.mp4") + + def _download_video(): + try: + data = _decrypt_media(video_url, aeskey) + with open(self.content, "wb") as f: + f.write(data) + logger.info(f"[WecomBot] Video downloaded: {self.content}") + except Exception as e: + logger.error(f"[WecomBot] Failed to download video: {e}") + self._prepare_fn = _download_video + + else: + raise NotImplementedError(f"Unsupported message type: {msg_type}") + + self.from_user_id = from_userid + self.to_user_id = bot_id + if is_group: + self.other_user_id = chat_id + self.actual_user_id = from_userid + self.actual_user_nickname = from_userid + else: + self.other_user_id = from_userid + self.actual_user_id = from_userid diff --git a/common/const.py b/common/const.py index 3ac7ad7..507b246 100644 --- a/common/const.py +++ b/common/const.py @@ -186,3 +186,4 @@ MODEL_LIST = MODEL_LIST + GITEE_AI_MODEL_LIST + MODELSCOPE_MODEL_LIST # channel FEISHU = "feishu" DINGTALK = "dingtalk" +WECOM_BOT = "wecom_bot" diff --git a/config.py b/config.py index 0bba5ba..af2fd02 100644 --- a/config.py +++ b/config.py @@ -150,11 +150,14 @@ available_setting = { "dingtalk_client_id": "", # 钉钉机器人Client ID "dingtalk_client_secret": "", # 钉钉机器人Client Secret "dingtalk_card_enabled": False, + # 企微智能机器人配置(长连接模式) + "wecom_bot_id": "", # 企微智能机器人BotID + "wecom_bot_secret": "", # 企微智能机器人长连接Secret # chatgpt指令自定义触发词 "clear_memory_commands": ["#清除记忆"], # 重置会话指令,必须以#开头 # channel配置 - "channel_type": "", # 通道类型,支持多渠道同时运行。单个: "feishu",多个: "feishu, dingtalk" 或 ["feishu", "dingtalk"]。可选值: web,feishu,dingtalk,wechatmp,wechatmp_service,wechatcom_app + "channel_type": "", # 通道类型,支持多渠道同时运行。单个: "feishu",多个: "feishu, dingtalk" 或 ["feishu", "dingtalk"]。可选值: web,feishu,dingtalk,wecom_bot,wechatmp,wechatmp_service,wechatcom_app "web_console": True, # 是否自动启动Web控制台(默认启动)。设为False可禁用 "subscribe_msg": "", # 订阅消息, 支持: wechatmp, wechatmp_service, wechatcom_app "debug": False, # 是否开启debug模式,开启后会打印更多日志 diff --git a/requirements.txt b/requirements.txt index cccb8e7..32e2da3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -22,3 +22,6 @@ dashscope lark-oapi # dingtalk dingtalk_stream +# wecom bot websocket mode +websocket-client +pycryptodome