Merge pull request #2705 from zhayujie/feat-qq-channel

feat: add qq channel
This commit is contained in:
zhayujie
2026-03-17 17:26:39 +08:00
committed by GitHub
19 changed files with 1200 additions and 14 deletions

View File

@@ -7,7 +7,7 @@
[中文] | [<a href="docs/en/README.md">English</a>]
</p>
**CowAgent** 是基于大模型的超级AI助理能够主动思考和任务规划、操作计算机和外部资源、创造和执行Skills、拥有长期记忆并不断成长。CowAgent 支持灵活切换多种模型,能处理文本、语音、图片、文件等多模态消息,可接入网页、飞书、钉钉、企微智能机器人、企业微信应用、微信公众号中使用7*24小时运行于你的个人电脑或服务器中。
**CowAgent** 是基于大模型的超级AI助理能够主动思考和任务规划、操作计算机和外部资源、创造和执行Skills、拥有长期记忆并不断成长。CowAgent 支持灵活切换多种模型,能处理文本、语音、图片、文件等多模态消息,可接入网页、飞书、钉钉、企微智能机器人、QQ、企微自建应用、微信公众号中使用7*24小时运行于你的个人电脑或服务器中。
<p align="center">
<a href="https://cowagent.ai/">🌐 官网</a> &nbsp;·&nbsp;
@@ -143,7 +143,7 @@ pip3 install -r requirements-optional.txt
```bash
# config.json 文件内容示例
{
"channel_type": "web", # 接入渠道类型默认为web支持修改为:feishu,dingtalk,wecom_bot,wechatcom_app,wechatmp_service,wechatmp,terminal
"channel_type": "web", # 接入渠道类型默认为web支持修改为:feishu,dingtalk,wecom_bot,qq,wechatcom_app,wechatmp_service,wechatmp,terminal
"model": "MiniMax-M2.5", # 模型名称
"minimax_api_key": "", # MiniMax API Key
"zhipu_ai_api_key": "", # 智谱GLM API Key
@@ -702,7 +702,23 @@ API Key创建在 [控制台](https://aistudio.google.com/app/apikey?hl=zh-cn)
</details>
<details>
<summary>5. WeCom App - 企业微信应用</summary>
<summary>5. QQ - QQ 机器人</summary>
QQ 机器人使用 WebSocket 长连接模式,无需公网 IP 和域名,支持 QQ 单聊、群聊和频道消息:
```json
{
"channel_type": "qq",
"qq_app_id": "YOUR_APP_ID",
"qq_app_secret": "YOUR_APP_SECRET"
}
```
详细步骤和参数说明参考 [QQ 机器人接入](https://docs.cowagent.ai/channels/qq)
</details>
<details>
<summary>6. WeCom App - 企业微信应用</summary>
企业微信自建应用接入需在后台创建应用并启用消息回调,配置示例:
@@ -722,7 +738,7 @@ API Key创建在 [控制台](https://aistudio.google.com/app/apikey?hl=zh-cn)
</details>
<details>
<summary>6. WeChat MP - 微信公众号</summary>
<summary>7. WeChat MP - 微信公众号</summary>
本项目支持订阅号和服务号两种公众号,通过服务号(`wechatmp_service`)体验更佳。
@@ -757,7 +773,7 @@ API Key创建在 [控制台](https://aistudio.google.com/app/apikey?hl=zh-cn)
</details>
<details>
<summary>7. Terminal - 终端</summary>
<summary>8. Terminal - 终端</summary>
修改 `config.json` 中的 `channel_type` 字段:

View File

@@ -237,6 +237,8 @@ def _execute_send_message(task: dict, agent_bridge):
logger.warning(f"[Scheduler] Task {task['id']}: DingTalk single chat message missing sender_staff_id")
elif channel_type == "wecom_bot":
context["msg"] = None
elif channel_type == "qq":
context["msg"] = None
# Create reply
reply = Reply(ReplyType.TEXT, content)

1
app.py
View File

@@ -228,6 +228,7 @@ def _clear_singleton_cache(channel_name: str):
const.FEISHU: "channel.feishu.feishu_channel.FeiShuChanel",
const.DINGTALK: "channel.dingtalk.dingtalk_channel.DingTalkChanel",
const.WECOM_BOT: "channel.wecom_bot.wecom_bot_channel.WecomBotChannel",
const.QQ: "channel.qq.qq_channel.QQChannel",
}
module_path = cls_map.get(channel_name)
if not module_path:

View File

@@ -36,6 +36,9 @@ def create_channel(channel_type) -> Channel:
elif channel_type == const.WECOM_BOT:
from channel.wecom_bot.wecom_bot_channel import WecomBotChannel
ch = WecomBotChannel()
elif channel_type == const.QQ:
from channel.qq.qq_channel import QQChannel
ch = QQChannel()
else:
raise RuntimeError
ch.channel_type = channel_type

0
channel/qq/__init__.py Normal file
View File

735
channel/qq/qq_channel.py Normal file
View File

@@ -0,0 +1,735 @@
"""
QQ Bot channel via WebSocket long connection.
Supports:
- Group chat (@bot), single chat (C2C), guild channel, guild DM
- Text / image / file message send & receive
- Heartbeat keep-alive and auto-reconnect with session resume
"""
import base64
import json
import os
import threading
import time
import requests
import websocket
from bridge.context import Context, ContextType
from bridge.reply import Reply, ReplyType
from channel.chat_channel import ChatChannel, check_prefix
from channel.qq.qq_message import QQMessage
from common.expired_dict import ExpiredDict
from common.log import logger
from common.singleton import singleton
from config import conf
# Rich media file_type constants
QQ_FILE_TYPE_IMAGE = 1
QQ_FILE_TYPE_VIDEO = 2
QQ_FILE_TYPE_VOICE = 3
QQ_FILE_TYPE_FILE = 4
QQ_API_BASE = "https://api.sgroup.qq.com"
# Intents: GROUP_AND_C2C_EVENT(1<<25) | PUBLIC_GUILD_MESSAGES(1<<30)
DEFAULT_INTENTS = (1 << 25) | (1 << 30)
# OpCode constants
OP_DISPATCH = 0
OP_HEARTBEAT = 1
OP_IDENTIFY = 2
OP_RESUME = 6
OP_RECONNECT = 7
OP_INVALID_SESSION = 9
OP_HELLO = 10
OP_HEARTBEAT_ACK = 11
# Resumable error codes
RESUMABLE_CLOSE_CODES = {4008, 4009}
@singleton
class QQChannel(ChatChannel):
def __init__(self):
super().__init__()
self.app_id = ""
self.app_secret = ""
self._access_token = ""
self._token_expires_at = 0
self._ws = None
self._ws_thread = None
self._heartbeat_thread = None
self._connected = False
self._stop_event = threading.Event()
self._token_lock = threading.Lock()
self._session_id = None
self._last_seq = None
self._heartbeat_interval = 45000
self._can_resume = False
self.received_msgs = ExpiredDict(60 * 60 * 7.1)
self._msg_seq_counter = {}
conf()["group_name_white_list"] = ["ALL_GROUP"]
conf()["single_chat_prefix"] = [""]
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def startup(self):
self.app_id = conf().get("qq_app_id", "")
self.app_secret = conf().get("qq_app_secret", "")
if not self.app_id or not self.app_secret:
err = "[QQ] qq_app_id and qq_app_secret are required"
logger.error(err)
self.report_startup_error(err)
return
self._refresh_access_token()
if not self._access_token:
err = "[QQ] Failed to get initial access_token"
logger.error(err)
self.report_startup_error(err)
return
self._stop_event.clear()
self._start_ws()
def stop(self):
logger.info("[QQ] stop() called")
self._stop_event.set()
if self._ws:
try:
self._ws.close()
except Exception:
pass
self._ws = None
self._connected = False
# ------------------------------------------------------------------
# Access Token
# ------------------------------------------------------------------
def _refresh_access_token(self):
try:
resp = requests.post(
"https://bots.qq.com/app/getAppAccessToken",
json={"appId": self.app_id, "clientSecret": self.app_secret},
timeout=10,
)
resp.raise_for_status()
data = resp.json()
self._access_token = data.get("access_token", "")
expires_in = int(data.get("expires_in", 7200))
self._token_expires_at = time.time() + expires_in - 60
logger.debug(f"[QQ] Access token refreshed, expires_in={expires_in}s")
except Exception as e:
logger.error(f"[QQ] Failed to refresh access_token: {e}")
def _get_access_token(self) -> str:
with self._token_lock:
if time.time() >= self._token_expires_at:
self._refresh_access_token()
return self._access_token
def _get_auth_headers(self) -> dict:
return {
"Authorization": f"QQBot {self._get_access_token()}",
"Content-Type": "application/json",
}
# ------------------------------------------------------------------
# WebSocket connection
# ------------------------------------------------------------------
def _get_ws_url(self) -> str:
try:
resp = requests.get(
f"{QQ_API_BASE}/gateway",
headers=self._get_auth_headers(),
timeout=10,
)
resp.raise_for_status()
url = resp.json().get("url", "")
logger.debug(f"[QQ] Gateway URL: {url}")
return url
except Exception as e:
logger.error(f"[QQ] Failed to get gateway URL: {e}")
return ""
def _start_ws(self):
ws_url = self._get_ws_url()
if not ws_url:
logger.error("[QQ] Cannot start WebSocket without gateway URL")
self.report_startup_error("Failed to get gateway URL")
return
def _on_open(ws):
logger.debug("[QQ] WebSocket connected, waiting for Hello...")
def _on_message(ws, raw):
try:
data = json.loads(raw)
self._handle_ws_message(data)
except Exception as e:
logger.error(f"[QQ] Failed to handle ws message: {e}", exc_info=True)
def _on_error(ws, error):
logger.error(f"[QQ] WebSocket error: {error}")
def _on_close(ws, close_status_code, close_msg):
logger.warning(f"[QQ] WebSocket closed: status={close_status_code}, msg={close_msg}")
self._connected = False
if not self._stop_event.is_set():
if close_status_code in RESUMABLE_CLOSE_CODES and self._session_id:
self._can_resume = True
logger.info("[QQ] Will attempt resume in 3s...")
time.sleep(3)
else:
self._can_resume = False
logger.info("[QQ] Will reconnect in 5s...")
time.sleep(5)
if not self._stop_event.is_set():
self._start_ws()
self._ws = websocket.WebSocketApp(
ws_url,
on_open=_on_open,
on_message=_on_message,
on_error=_on_error,
on_close=_on_close,
)
def run_forever():
try:
self._ws.run_forever(ping_interval=0, reconnect=0)
except (SystemExit, KeyboardInterrupt):
logger.info("[QQ] WebSocket thread interrupted")
except Exception as e:
logger.error(f"[QQ] WebSocket run_forever error: {e}")
self._ws_thread = threading.Thread(target=run_forever, daemon=True)
self._ws_thread.start()
self._ws_thread.join()
def _ws_send(self, data: dict):
if self._ws:
self._ws.send(json.dumps(data, ensure_ascii=False))
# ------------------------------------------------------------------
# Identify & Resume & Heartbeat
# ------------------------------------------------------------------
def _send_identify(self):
self._ws_send({
"op": OP_IDENTIFY,
"d": {
"token": f"QQBot {self._get_access_token()}",
"intents": DEFAULT_INTENTS,
"shard": [0, 1],
"properties": {
"$os": "linux",
"$browser": "chatgpt-on-wechat",
"$device": "chatgpt-on-wechat",
},
},
})
logger.debug(f"[QQ] Identify sent with intents={DEFAULT_INTENTS}")
def _send_resume(self):
self._ws_send({
"op": OP_RESUME,
"d": {
"token": f"QQBot {self._get_access_token()}",
"session_id": self._session_id,
"seq": self._last_seq,
},
})
logger.debug(f"[QQ] Resume sent: session_id={self._session_id}, seq={self._last_seq}")
def _start_heartbeat(self, interval_ms: int):
if self._heartbeat_thread and self._heartbeat_thread.is_alive():
return
self._heartbeat_interval = interval_ms
interval_sec = interval_ms / 1000.0
def heartbeat_loop():
while not self._stop_event.is_set() and self._connected:
try:
self._ws_send({
"op": OP_HEARTBEAT,
"d": self._last_seq,
})
except Exception as e:
logger.warning(f"[QQ] Heartbeat send failed: {e}")
break
self._stop_event.wait(interval_sec)
self._heartbeat_thread = threading.Thread(target=heartbeat_loop, daemon=True)
self._heartbeat_thread.start()
# ------------------------------------------------------------------
# Incoming message dispatch
# ------------------------------------------------------------------
def _handle_ws_message(self, data: dict):
op = data.get("op")
d = data.get("d")
t = data.get("t")
s = data.get("s")
if s is not None:
self._last_seq = s
if op == OP_HELLO:
heartbeat_interval = d.get("heartbeat_interval", 45000) if d else 45000
logger.debug(f"[QQ] Received Hello, heartbeat_interval={heartbeat_interval}ms")
self._heartbeat_interval = heartbeat_interval
if self._can_resume and self._session_id:
self._send_resume()
else:
self._send_identify()
elif op == OP_HEARTBEAT_ACK:
logger.debug("[QQ] Heartbeat ACK received")
elif op == OP_HEARTBEAT:
self._ws_send({"op": OP_HEARTBEAT, "d": self._last_seq})
elif op == OP_RECONNECT:
logger.warning("[QQ] Server requested reconnect")
self._can_resume = True
if self._ws:
self._ws.close()
elif op == OP_INVALID_SESSION:
logger.warning("[QQ] Invalid session, re-identifying...")
self._session_id = None
self._can_resume = False
time.sleep(2)
self._send_identify()
elif op == OP_DISPATCH:
if t == "READY":
self._session_id = d.get("session_id", "")
user = d.get("user", {})
bot_name = user.get('username', '')
logger.info(f"[QQ] ✅ Connected successfully (bot={bot_name})")
self._connected = True
self._can_resume = False
self._start_heartbeat(self._heartbeat_interval)
self.report_startup_success()
elif t == "RESUMED":
logger.info("[QQ] Session resumed successfully")
self._connected = True
self._can_resume = False
self._start_heartbeat(self._heartbeat_interval)
elif t in ("GROUP_AT_MESSAGE_CREATE", "C2C_MESSAGE_CREATE",
"AT_MESSAGE_CREATE", "DIRECT_MESSAGE_CREATE"):
self._handle_msg_event(d, t)
elif t in ("GROUP_ADD_ROBOT", "FRIEND_ADD"):
logger.info(f"[QQ] Event: {t}")
else:
logger.debug(f"[QQ] Dispatch event: {t}")
# ------------------------------------------------------------------
# Message event handling
# ------------------------------------------------------------------
def _handle_msg_event(self, event_data: dict, event_type: str):
msg_id = event_data.get("id", "")
if self.received_msgs.get(msg_id):
logger.debug(f"[QQ] Duplicate msg filtered: {msg_id}")
return
self.received_msgs[msg_id] = True
try:
qq_msg = QQMessage(event_data, event_type)
except NotImplementedError as e:
logger.warning(f"[QQ] {e}")
return
except Exception as e:
logger.error(f"[QQ] Failed to parse message: {e}", exc_info=True)
return
is_group = qq_msg.is_group
from channel.file_cache import get_file_cache
file_cache = get_file_cache()
if is_group:
session_id = qq_msg.other_user_id
else:
session_id = qq_msg.from_user_id
if qq_msg.ctype == ContextType.IMAGE:
if hasattr(qq_msg, "image_path") and qq_msg.image_path:
file_cache.add(session_id, qq_msg.image_path, file_type="image")
logger.info(f"[QQ] Image cached for session {session_id}")
return
if qq_msg.ctype == ContextType.TEXT:
cached_files = file_cache.get(session_id)
if cached_files:
file_refs = []
for fi in cached_files:
ftype = fi["type"]
fpath = fi["path"]
if ftype == "image":
file_refs.append(f"[图片: {fpath}]")
elif ftype == "video":
file_refs.append(f"[视频: {fpath}]")
else:
file_refs.append(f"[文件: {fpath}]")
qq_msg.content = qq_msg.content + "\n" + "\n".join(file_refs)
logger.info(f"[QQ] Attached {len(cached_files)} cached file(s)")
file_cache.clear(session_id)
context = self._compose_context(
qq_msg.ctype,
qq_msg.content,
isgroup=is_group,
msg=qq_msg,
no_need_at=True,
)
if context:
self.produce(context)
# ------------------------------------------------------------------
# _compose_context
# ------------------------------------------------------------------
def _compose_context(self, ctype: ContextType, content, **kwargs):
context = Context(ctype, content)
context.kwargs = kwargs
if "channel_type" not in context:
context["channel_type"] = self.channel_type
if "origin_ctype" not in context:
context["origin_ctype"] = ctype
cmsg = context["msg"]
if cmsg.is_group:
context["session_id"] = cmsg.other_user_id
else:
context["session_id"] = cmsg.from_user_id
context["receiver"] = cmsg.other_user_id
if ctype == ContextType.TEXT:
img_match_prefix = check_prefix(content, conf().get("image_create_prefix"))
if img_match_prefix:
content = content.replace(img_match_prefix, "", 1)
context.type = ContextType.IMAGE_CREATE
else:
context.type = ContextType.TEXT
context.content = content.strip()
return context
# ------------------------------------------------------------------
# Send reply
# ------------------------------------------------------------------
def send(self, reply: Reply, context: Context):
msg = context.get("msg")
is_group = context.get("isgroup", False)
receiver = context.get("receiver", "")
if not msg:
# Active send (e.g. scheduled tasks), no original message to reply to
self._active_send_text(reply.content if reply.type == ReplyType.TEXT else str(reply.content),
receiver, is_group)
return
event_type = getattr(msg, "event_type", "")
msg_id = getattr(msg, "msg_id", "")
if reply.type == ReplyType.TEXT:
self._send_text(reply.content, msg, event_type, msg_id)
elif reply.type in (ReplyType.IMAGE_URL, ReplyType.IMAGE):
self._send_image(reply.content, msg, event_type, msg_id)
elif reply.type == ReplyType.FILE:
if hasattr(reply, "text_content") and reply.text_content:
self._send_text(reply.text_content, msg, event_type, msg_id)
time.sleep(0.3)
self._send_file(reply.content, msg, event_type, msg_id)
elif reply.type in (ReplyType.VIDEO, ReplyType.VIDEO_URL):
self._send_media(reply.content, msg, event_type, msg_id, QQ_FILE_TYPE_VIDEO)
else:
logger.warning(f"[QQ] Unsupported reply type: {reply.type}, falling back to text")
self._send_text(str(reply.content), msg, event_type, msg_id)
# ------------------------------------------------------------------
# Send helpers
# ------------------------------------------------------------------
def _get_next_msg_seq(self, msg_id: str) -> int:
seq = self._msg_seq_counter.get(msg_id, 1)
self._msg_seq_counter[msg_id] = seq + 1
return seq
def _build_msg_url_and_base_body(self, msg: QQMessage, event_type: str, msg_id: str):
"""Build the API URL and base body dict for sending a message."""
if event_type == "GROUP_AT_MESSAGE_CREATE":
group_openid = msg._rawmsg.get("group_openid", "")
url = f"{QQ_API_BASE}/v2/groups/{group_openid}/messages"
body = {
"msg_id": msg_id,
"msg_seq": self._get_next_msg_seq(msg_id),
}
return url, body, "group", group_openid
elif event_type == "C2C_MESSAGE_CREATE":
user_openid = msg._rawmsg.get("author", {}).get("user_openid", "") or msg.from_user_id
url = f"{QQ_API_BASE}/v2/users/{user_openid}/messages"
body = {
"msg_id": msg_id,
"msg_seq": self._get_next_msg_seq(msg_id),
}
return url, body, "c2c", user_openid
elif event_type == "AT_MESSAGE_CREATE":
channel_id = msg._rawmsg.get("channel_id", "")
url = f"{QQ_API_BASE}/channels/{channel_id}/messages"
body = {"msg_id": msg_id}
return url, body, "channel", channel_id
elif event_type == "DIRECT_MESSAGE_CREATE":
guild_id = msg._rawmsg.get("guild_id", "")
url = f"{QQ_API_BASE}/dms/{guild_id}/messages"
body = {"msg_id": msg_id}
return url, body, "dm", guild_id
return None, None, None, None
def _post_message(self, url: str, body: dict, event_type: str):
try:
resp = requests.post(url, json=body, headers=self._get_auth_headers(), timeout=10)
if resp.status_code in (200, 201, 202, 204):
logger.info(f"[QQ] Message sent successfully: event_type={event_type}")
else:
logger.error(f"[QQ] Failed to send message: status={resp.status_code}, "
f"body={resp.text}")
except Exception as e:
logger.error(f"[QQ] Send message error: {e}")
# ------------------------------------------------------------------
# Active send (no original message, e.g. scheduled tasks)
# ------------------------------------------------------------------
def _active_send_text(self, content: str, receiver: str, is_group: bool):
"""Send text without an original message (active push). QQ limits active messages to 4/month per user."""
if not receiver:
logger.warning("[QQ] No receiver for active send")
return
if is_group:
url = f"{QQ_API_BASE}/v2/groups/{receiver}/messages"
else:
url = f"{QQ_API_BASE}/v2/users/{receiver}/messages"
body = {
"content": content,
"msg_type": 0,
}
event_label = "GROUP_ACTIVE" if is_group else "C2C_ACTIVE"
self._post_message(url, body, event_label)
# ------------------------------------------------------------------
# Send text
# ------------------------------------------------------------------
def _send_text(self, content: str, msg: QQMessage, event_type: str, msg_id: str):
url, body, _, _ = self._build_msg_url_and_base_body(msg, event_type, msg_id)
if not url:
logger.warning(f"[QQ] Cannot send reply for event_type: {event_type}")
return
body["content"] = content
body["msg_type"] = 0
self._post_message(url, body, event_type)
# ------------------------------------------------------------------
# Rich media upload & send (image / video / file)
# ------------------------------------------------------------------
def _upload_rich_media(self, file_url: str, file_type: int, msg: QQMessage,
event_type: str) -> str:
"""
Upload media via QQ rich media API and return file_info.
For group: POST /v2/groups/{group_openid}/files
For c2c: POST /v2/users/{openid}/files
"""
if event_type == "GROUP_AT_MESSAGE_CREATE":
group_openid = msg._rawmsg.get("group_openid", "")
upload_url = f"{QQ_API_BASE}/v2/groups/{group_openid}/files"
elif event_type == "C2C_MESSAGE_CREATE":
user_openid = (msg._rawmsg.get("author", {}).get("user_openid", "")
or msg.from_user_id)
upload_url = f"{QQ_API_BASE}/v2/users/{user_openid}/files"
else:
logger.warning(f"[QQ] Rich media upload not supported for event_type: {event_type}")
return ""
upload_body = {
"file_type": file_type,
"url": file_url,
"srv_send_msg": False,
}
try:
resp = requests.post(
upload_url, json=upload_body,
headers=self._get_auth_headers(), timeout=30,
)
if resp.status_code in (200, 201):
data = resp.json()
file_info = data.get("file_info", "")
logger.info(f"[QQ] Rich media uploaded: file_type={file_type}, "
f"file_uuid={data.get('file_uuid', '')}")
return file_info
else:
logger.error(f"[QQ] Rich media upload failed: status={resp.status_code}, "
f"body={resp.text}")
return ""
except Exception as e:
logger.error(f"[QQ] Rich media upload error: {e}")
return ""
def _upload_rich_media_base64(self, file_path: str, file_type: int, msg: QQMessage,
event_type: str) -> str:
"""Upload local file via base64 file_data field."""
if event_type == "GROUP_AT_MESSAGE_CREATE":
group_openid = msg._rawmsg.get("group_openid", "")
upload_url = f"{QQ_API_BASE}/v2/groups/{group_openid}/files"
elif event_type == "C2C_MESSAGE_CREATE":
user_openid = (msg._rawmsg.get("author", {}).get("user_openid", "")
or msg.from_user_id)
upload_url = f"{QQ_API_BASE}/v2/users/{user_openid}/files"
else:
logger.warning(f"[QQ] Rich media upload not supported for event_type: {event_type}")
return ""
try:
with open(file_path, "rb") as f:
file_data = base64.b64encode(f.read()).decode("utf-8")
except Exception as e:
logger.error(f"[QQ] Failed to read file for upload: {e}")
return ""
upload_body = {
"file_type": file_type,
"file_data": file_data,
"srv_send_msg": False,
}
try:
resp = requests.post(
upload_url, json=upload_body,
headers=self._get_auth_headers(), timeout=30,
)
if resp.status_code in (200, 201):
data = resp.json()
file_info = data.get("file_info", "")
logger.info(f"[QQ] Rich media uploaded (base64): file_type={file_type}, "
f"file_uuid={data.get('file_uuid', '')}")
return file_info
else:
logger.error(f"[QQ] Rich media upload (base64) failed: status={resp.status_code}, "
f"body={resp.text}")
return ""
except Exception as e:
logger.error(f"[QQ] Rich media upload (base64) error: {e}")
return ""
def _send_media_msg(self, file_info: str, msg: QQMessage, event_type: str, msg_id: str):
"""Send a message with msg_type=7 (rich media) using file_info."""
url, body, _, _ = self._build_msg_url_and_base_body(msg, event_type, msg_id)
if not url:
return
body["msg_type"] = 7
body["media"] = {"file_info": file_info}
self._post_message(url, body, event_type)
def _send_image(self, img_path_or_url: str, msg: QQMessage, event_type: str, msg_id: str):
"""Send image reply. Supports URL and local file path."""
if event_type not in ("GROUP_AT_MESSAGE_CREATE", "C2C_MESSAGE_CREATE"):
self._send_text(str(img_path_or_url), msg, event_type, msg_id)
return
if img_path_or_url.startswith("file://"):
img_path_or_url = img_path_or_url[7:]
if img_path_or_url.startswith(("http://", "https://")):
file_info = self._upload_rich_media(
img_path_or_url, QQ_FILE_TYPE_IMAGE, msg, event_type)
elif os.path.exists(img_path_or_url):
file_info = self._upload_rich_media_base64(
img_path_or_url, QQ_FILE_TYPE_IMAGE, msg, event_type)
else:
logger.error(f"[QQ] Image not found: {img_path_or_url}")
self._send_text("[Image send failed]", msg, event_type, msg_id)
return
if file_info:
self._send_media_msg(file_info, msg, event_type, msg_id)
else:
self._send_text("[Image upload failed]", msg, event_type, msg_id)
def _send_file(self, file_path_or_url: str, msg: QQMessage, event_type: str, msg_id: str):
"""Send file reply."""
if event_type not in ("GROUP_AT_MESSAGE_CREATE", "C2C_MESSAGE_CREATE"):
self._send_text(str(file_path_or_url), msg, event_type, msg_id)
return
if file_path_or_url.startswith("file://"):
file_path_or_url = file_path_or_url[7:]
if file_path_or_url.startswith(("http://", "https://")):
file_info = self._upload_rich_media(
file_path_or_url, QQ_FILE_TYPE_FILE, msg, event_type)
elif os.path.exists(file_path_or_url):
file_info = self._upload_rich_media_base64(
file_path_or_url, QQ_FILE_TYPE_FILE, msg, event_type)
else:
logger.error(f"[QQ] File not found: {file_path_or_url}")
self._send_text("[File send failed]", msg, event_type, msg_id)
return
if file_info:
self._send_media_msg(file_info, msg, event_type, msg_id)
else:
self._send_text("[File upload failed]", msg, event_type, msg_id)
def _send_media(self, path_or_url: str, msg: QQMessage, event_type: str,
msg_id: str, file_type: int):
"""Generic media send for video/voice etc."""
if event_type not in ("GROUP_AT_MESSAGE_CREATE", "C2C_MESSAGE_CREATE"):
self._send_text(str(path_or_url), msg, event_type, msg_id)
return
if path_or_url.startswith("file://"):
path_or_url = path_or_url[7:]
if path_or_url.startswith(("http://", "https://")):
file_info = self._upload_rich_media(path_or_url, file_type, msg, event_type)
elif os.path.exists(path_or_url):
file_info = self._upload_rich_media_base64(path_or_url, file_type, msg, event_type)
else:
logger.error(f"[QQ] Media not found: {path_or_url}")
return
if file_info:
self._send_media_msg(file_info, msg, event_type, msg_id)
else:
logger.error(f"[QQ] Media upload failed: {path_or_url}")

123
channel/qq/qq_message.py Normal file
View File

@@ -0,0 +1,123 @@
import os
import requests
from bridge.context import ContextType
from channel.chat_message import ChatMessage
from common.log import logger
from common.utils import expand_path
from config import conf
def _get_tmp_dir() -> str:
"""Return the workspace tmp directory (absolute path), creating it if needed."""
ws_root = expand_path(conf().get("agent_workspace", "~/cow"))
tmp_dir = os.path.join(ws_root, "tmp")
os.makedirs(tmp_dir, exist_ok=True)
return tmp_dir
class QQMessage(ChatMessage):
"""Message wrapper for QQ Bot (websocket long-connection mode)."""
def __init__(self, event_data: dict, event_type: str):
super().__init__(event_data)
self.msg_id = event_data.get("id", "")
self.create_time = event_data.get("timestamp", "")
self.is_group = event_type in ("GROUP_AT_MESSAGE_CREATE",)
self.event_type = event_type
author = event_data.get("author", {})
from_user_id = author.get("member_openid", "") or author.get("id", "")
group_openid = event_data.get("group_openid", "")
content = event_data.get("content", "").strip()
attachments = event_data.get("attachments", [])
has_image = any(
a.get("content_type", "").startswith("image/") for a in attachments
) if attachments else False
if has_image and not content:
self.ctype = ContextType.IMAGE
img_attachment = next(
a for a in attachments if a.get("content_type", "").startswith("image/")
)
img_url = img_attachment.get("url", "")
if img_url and not img_url.startswith("http"):
img_url = "https://" + img_url
tmp_dir = _get_tmp_dir()
image_path = os.path.join(tmp_dir, f"qq_{self.msg_id}.png")
try:
resp = requests.get(img_url, timeout=30)
resp.raise_for_status()
with open(image_path, "wb") as f:
f.write(resp.content)
self.content = image_path
self.image_path = image_path
logger.info(f"[QQ] Image downloaded: {image_path}")
except Exception as e:
logger.error(f"[QQ] Failed to download image: {e}")
self.content = "[Image download failed]"
self.image_path = None
elif has_image and content:
self.ctype = ContextType.TEXT
image_paths = []
tmp_dir = _get_tmp_dir()
for idx, att in enumerate(attachments):
if not att.get("content_type", "").startswith("image/"):
continue
img_url = att.get("url", "")
if img_url and not img_url.startswith("http"):
img_url = "https://" + img_url
img_path = os.path.join(tmp_dir, f"qq_{self.msg_id}_{idx}.png")
try:
resp = requests.get(img_url, timeout=30)
resp.raise_for_status()
with open(img_path, "wb") as f:
f.write(resp.content)
image_paths.append(img_path)
except Exception as e:
logger.error(f"[QQ] Failed to download mixed image: {e}")
content_parts = [content]
for p in image_paths:
content_parts.append(f"[图片: {p}]")
self.content = "\n".join(content_parts)
else:
self.ctype = ContextType.TEXT
self.content = content
if event_type == "GROUP_AT_MESSAGE_CREATE":
self.from_user_id = from_user_id
self.to_user_id = ""
self.other_user_id = group_openid
self.actual_user_id = from_user_id
self.actual_user_nickname = from_user_id
elif event_type == "C2C_MESSAGE_CREATE":
user_openid = author.get("user_openid", "") or from_user_id
self.from_user_id = user_openid
self.to_user_id = ""
self.other_user_id = user_openid
self.actual_user_id = user_openid
elif event_type == "AT_MESSAGE_CREATE":
self.from_user_id = from_user_id
self.to_user_id = ""
channel_id = event_data.get("channel_id", "")
self.other_user_id = channel_id
self.actual_user_id = from_user_id
self.actual_user_nickname = author.get("username", from_user_id)
elif event_type == "DIRECT_MESSAGE_CREATE":
self.from_user_id = from_user_id
self.to_user_id = ""
guild_id = event_data.get("guild_id", "")
self.other_user_id = f"dm_{guild_id}_{from_user_id}"
self.actual_user_id = from_user_id
self.actual_user_nickname = author.get("username", from_user_id)
else:
raise NotImplementedError(f"Unsupported QQ event type: {event_type}")
logger.debug(f"[QQ] Message parsed: type={event_type}, ctype={self.ctype}, "
f"from={self.from_user_id}, content_len={len(self.content)}")

View File

@@ -615,6 +615,15 @@ class ChannelsHandler:
{"key": "wecom_bot_secret", "label": "Secret", "type": "secret"},
],
}),
("qq", {
"label": {"zh": "QQ 机器人", "en": "QQ Bot"},
"icon": "fa-comment",
"color": "blue",
"fields": [
{"key": "qq_app_id", "label": "App ID", "type": "text"},
{"key": "qq_app_secret", "label": "App Secret", "type": "secret"},
],
}),
("wechatcom_app", {
"label": {"zh": "企微自建应用", "en": "WeCom App"},
"icon": "fa-building",

View File

@@ -26,6 +26,8 @@ CHANNEL_ACTIONS = {"channel_create", "channel_update", "channel_delete"}
CREDENTIAL_MAP = {
"feishu": ("feishu_app_id", "feishu_app_secret"),
"dingtalk": ("dingtalk_client_id", "dingtalk_client_secret"),
"wecom_bot": ("wecom_bot_id", "wecom_bot_secret"),
"qq": ("qq_app_id", "qq_app_secret"),
"wechatmp": ("wechatmp_app_id", "wechatmp_app_secret"),
"wechatmp_service": ("wechatmp_app_id", "wechatmp_app_secret"),
"wechatcom_app": ("wechatcomapp_agent_id", "wechatcomapp_secret"),
@@ -669,6 +671,12 @@ def _build_config():
elif current_channel_type in ("wechatmp", "wechatmp_service"):
config["app_id"] = local_conf.get("wechatmp_app_id")
config["app_secret"] = local_conf.get("wechatmp_app_secret")
elif current_channel_type == "wecom_bot":
config["app_id"] = local_conf.get("wecom_bot_id")
config["app_secret"] = local_conf.get("wecom_bot_secret")
elif current_channel_type == "qq":
config["app_id"] = local_conf.get("qq_app_id")
config["app_secret"] = local_conf.get("qq_app_secret")
elif current_channel_type == "wechatcom_app":
config["app_id"] = local_conf.get("wechatcomapp_agent_id")
config["app_secret"] = local_conf.get("wechatcomapp_secret")

View File

@@ -187,3 +187,4 @@ MODEL_LIST = MODEL_LIST + GITEE_AI_MODEL_LIST + MODELSCOPE_MODEL_LIST
FEISHU = "feishu"
DINGTALK = "dingtalk"
WECOM_BOT = "wecom_bot"
QQ = "qq"

View File

@@ -381,6 +381,8 @@ def load_config():
"wechatmp_app_secret": "WECHATMP_APP_SECRET",
"wechatcomapp_agent_id": "WECHATCOMAPP_AGENT_ID",
"wechatcomapp_secret": "WECHATCOMAPP_SECRET",
"qq_app_id": "QQ_APP_ID",
"qq_app_secret": "QQ_APP_SECRET"
}
injected = 0
for conf_key, env_key in _CONFIG_TO_ENV.items():

View File

@@ -179,5 +179,7 @@ Agent支持在多种渠道中使用只需修改 `config.json` 中的 `channel
- **飞书接入**[飞书接入文档](https://docs.link-ai.tech/cow/multi-platform/feishu)
- **钉钉接入**[钉钉接入文档](https://docs.link-ai.tech/cow/multi-platform/dingtalk)
- **企业微信应用接入**[企微应用文档](https://docs.link-ai.tech/cow/multi-platform/wechat-com)
- **企微智能机器人**[企微智能机器人文档](https://docs.link-ai.tech/cow/multi-platform/wecom-bot)
- **QQ机器人**[QQ机器人文档](https://docs.link-ai.tech/cow/multi-platform/qq)
更多渠道配置参考:[通道说明](../README.md#通道说明)

88
docs/channels/qq.mdx Normal file
View File

@@ -0,0 +1,88 @@
---
title: QQ 机器人
description: 将 CowAgent 接入 QQ 机器人WebSocket 长连接模式)
---
> 通过 QQ 开放平台的机器人接口接入 CowAgent支持 QQ 单聊、QQ 群聊(@机器人)、频道消息和频道私信,无需公网 IP使用 WebSocket 长连接模式。
<Note>
QQ 机器人通过 QQ 开放平台创建,使用 WebSocket 长连接接收消息,通过 OpenAPI 发送消息,无需公网 IP 和域名。
</Note>
## 一、创建 QQ 机器人
> 进入[QQ 开放平台](https://q.qq.com)QQ扫码登录如果未注册开放平台账号请先完成[账号注册](https://q.qq.com/#/register)。
1.在 [QQ开放平台-机器人列表页](https://q.qq.com/#/apps),点击创建机器人:
<img src="https://cdn.link-ai.tech/doc/20260317162900.png" width="800"/>
2.填写机器人名称、头像等基本信息,完成创建:
<img src="https://cdn.link-ai.tech/doc/20260317163005.png" width="800"/>
3.点击进入机器人配置页面,选择**开发管理**菜单,完成以下步骤:
- 复制并记录 **AppID**机器人ID
- 生成并记录 **AppSecret**(机器人秘钥)
<img src="https://cdn.link-ai.tech/doc/20260317164955.png" width="800"/>
## 二、配置和运行
### 方式一Web 控制台接入
启动 Cow项目后打开 Web 控制台 (本地链接为: http://127.0.0.1:9899/ ),选择 **通道** 菜单,点击 **接入通道**,选择 **QQ 机器人**,填写上一步保存的 AppID 和 AppSecret点击接入即可。
<img src="https://cdn.link-ai.tech/doc/20260317165425.png" width="800"/>
### 方式二:配置文件接入
在 `config.json` 中添加以下配置:
```json
{
"channel_type": "qq",
"qq_app_id": "YOUR_APP_ID",
"qq_app_secret": "YOUR_APP_SECRET"
}
```
| 参数 | 说明 |
| --- | --- |
| `qq_app_id` | QQ 机器人的 AppID在开放平台开发管理中获取 |
| `qq_app_secret` | QQ 机器人的 AppSecret在开放平台开发管理中获取 |
配置完成后启动程序,日志显示 `[QQ] ✅ Connected successfully` 即表示连接成功。
## 三、使用
在 QQ开放平台 - 管理 - **使用范围和人员** 菜单中使用QQ客户端扫描 "添加到群和消息列表" 的二维码即可开始与QQ机器人的聊天
<img src="https://cdn.link-ai.tech/doc/20260317165947.png" width="800"/>
对话效果:
<img src="https://cdn.link-ai.tech/doc/20260317171508.png" width="800"/>
## 四、功能说明
> 注意若需在群聊及频道中使用QQ机器人需完成发布上架审核并在使用范围配置权限使用范围。
| 功能 | 支持情况 |
| --- | --- |
| QQ 单聊 | ✅ |
| QQ 群聊(@机器人) | ✅ |
| 频道消息(@机器人) | ✅ |
| 频道私信 | ✅ |
| 文本消息 | ✅ 收发 |
| 图片消息 | ✅ 收发(群聊和单聊) |
| 文件消息 | ✅ 发送(群聊和单聊) |
| 定时任务 | ✅ 主动推送(每月每用户限 4 条) |
## 五、注意事项
- **被动消息限制**QQ 单聊被动消息有效期为 60 分钟,每条消息最多回复 5 次QQ 群聊被动消息有效期为 5 分钟。
- **主动消息限制**:单聊和群聊每月主动消息上限为 4 条,在使用定时任务功能时需要注意这个限制
- **事件权限**:默认订阅 `GROUP_AND_C2C_EVENT`QQ群/单聊)和 `PUBLIC_GUILD_MESSAGES`(频道公域消息),如需其他事件类型请在开放平台申请权限。

View File

@@ -29,7 +29,7 @@ description: 将 CowAgent 接入企业微信智能机器人(长连接模式)
### 方式一Web 控制台接入
启动程序后打开 Web 控制台 (本地接为: http://127.0.0.1:9899/ ),选择 **通道** 菜单,点击 **接入通道**,选择 **企微智能机器人**,填写上一步保存的 Bot ID 和 Secret点击接入即可。
启动Cow项目后打开 Web 控制台 (本地接为: http://127.0.0.1:9899/ ),选择 **通道** 菜单,点击 **接入通道**,选择 **企微智能机器人**,填写上一步保存的 Bot ID 和 Secret点击接入即可。
<img src="https://cdn.link-ai.tech/doc/20260316181711.png" width="800"/>

View File

@@ -88,3 +88,11 @@ description: 将 CowAgent 接入企业微信自建应用
如需让外部个人微信用户使用,可在 **我的企业 → 微信插件** 中分享邀请关注二维码,个人微信扫码关注后即可与应用对话:
<img src="https://cdn.link-ai.tech/doc/20260228103232.png" width="520"/>
## 常见问题
需要确保已安装以下依赖:
```bash
pip install websocket-client pycryptodome
```

View File

@@ -157,6 +157,7 @@
"channels/feishu",
"channels/dingtalk",
"channels/wecom-bot",
"channels/qq",
"channels/wecom",
"channels/wechatmp"
]
@@ -300,6 +301,7 @@
"en/channels/feishu",
"en/channels/dingtalk",
"en/channels/wecom-bot",
"en/channels/qq",
"en/channels/wecom",
"en/channels/wechatmp"
]

88
docs/en/channels/qq.mdx Normal file
View File

@@ -0,0 +1,88 @@
---
title: QQ Bot
description: Connect CowAgent to QQ Bot (WebSocket long connection)
---
> Connect CowAgent via QQ Open Platform's bot API, supporting QQ direct messages, group chats (@bot), guild channel messages, and guild DMs. No public IP required — uses WebSocket long connection.
<Note>
QQ Bot is created through the QQ Open Platform. It uses WebSocket long connection to receive messages and OpenAPI to send messages. No public IP or domain is required.
</Note>
## 1. Create a QQ Bot
> Visit the [QQ Open Platform](https://q.qq.com), sign in with QQ. If you haven't registered, please complete [account registration](https://q.qq.com/#/register) first.
1.Go to the [QQ Open Platform - Bot List](https://q.qq.com/#/apps), and click **Create Bot**:
<img src="https://cdn.link-ai.tech/doc/20260317162900.png" width="800"/>
2.Fill in the bot name, avatar, and other basic information to complete the creation:
<img src="https://cdn.link-ai.tech/doc/20260317163005.png" width="800"/>
3.Enter the bot configuration page, go to **Development Management**, and complete the following steps:
- Copy and save the **AppID** (Bot ID)
- Generate and save the **AppSecret** (Bot Secret)
<img src="https://cdn.link-ai.tech/doc/20260317164955.png" width="800"/>
## 2. Configuration and Running
### Option A: Web Console
Start the program and open the Web console (local access: http://127.0.0.1:9899/). Go to the **Channels** tab, click **Connect Channel**, select **QQ Bot**, fill in the AppID and AppSecret from the previous step, and click Connect.
<img src="https://cdn.link-ai.tech/doc/20260317165425.png" width="800"/>
### Option B: Config File
Add the following to your `config.json`:
```json
{
"channel_type": "qq",
"qq_app_id": "YOUR_APP_ID",
"qq_app_secret": "YOUR_APP_SECRET"
}
```
| Parameter | Description |
| --- | --- |
| `qq_app_id` | AppID of the QQ Bot, found in Development Management on the open platform |
| `qq_app_secret` | AppSecret of the QQ Bot, found in Development Management on the open platform |
After configuration, start the program. The log message `[QQ] ✅ Connected successfully` indicates a successful connection.
## 3. Usage
In the QQ Open Platform, go to **Management → Usage Scope & Members**, scan the "Add to group and message list" QR code with your QQ client to start chatting with the bot:
<img src="https://cdn.link-ai.tech/doc/20260317165947.png" width="800"/>
Chat example:
<img src="https://cdn.link-ai.tech/doc/20260317171508.png" width="800"/>
## 4. Supported Features
> Note: To use the QQ bot in group chats and guild channels, you need to complete the publishing review and configure usage scope permissions.
| Feature | Status |
| --- | --- |
| QQ Direct Messages | ✅ |
| QQ Group Chat (@bot) | ✅ |
| Guild Channel (@bot) | ✅ |
| Guild DM | ✅ |
| Text Messages | ✅ Send & Receive |
| Image Messages | ✅ Send & Receive (group & direct) |
| File Messages | ✅ Send (group & direct) |
| Scheduled Tasks | ✅ Active push (4 per user per month) |
## 5. Notes
- **Passive message limits**: QQ direct message replies are valid for 60 minutes (max 5 replies per message); group chat replies are valid for 5 minutes.
- **Active message limits**: Both direct and group chats have a monthly limit of 4 active messages. Keep this in mind when using the scheduled tasks feature.
- **Event permissions**: By default, `GROUP_AND_C2C_EVENT` (QQ group/direct) and `PUBLIC_GUILD_MESSAGES` (guild public messages) are subscribed. Apply for additional permissions on the open platform if needed.

View File

@@ -88,3 +88,11 @@ Search for the app name you just created in WeCom to start chatting directly. Yo
To allow external personal WeChat users to use the app, go to **My Enterprise → WeChat Plugin**, share the invite QR code. After scanning and following, personal WeChat users can join and chat with the app:
<img src="https://cdn.link-ai.tech/doc/20260228103232.png" width="520"/>
## FAQ
Make sure the following dependencies are installed:
```bash
pip install websocket-client pycryptodome
```

106
run.sh
View File

@@ -409,19 +409,21 @@ select_channel() {
echo -e "${CYAN}${BOLD}=========================================${NC}"
echo -e "${YELLOW}1) Feishu (飞书)${NC}"
echo -e "${YELLOW}2) DingTalk (钉钉)${NC}"
echo -e "${YELLOW}3) WeCom (企微应用)${NC}"
echo -e "${YELLOW}4) Web (网页)${NC}"
echo -e "${YELLOW}3) WeCom Bot (企微智能机器人)${NC}"
echo -e "${YELLOW}4) QQ (QQ 机器人)${NC}"
echo -e "${YELLOW}5) WeCom App (企微自建应用)${NC}"
echo -e "${YELLOW}6) Web (网页)${NC}"
echo ""
while true; do
read -p "Enter your choice [press Enter for default: 1 - Feishu]: " channel_choice
channel_choice=${channel_choice:-1}
case "$channel_choice" in
1|2|3|4)
1|2|3|4|5|6)
break
;;
*)
echo -e "${RED}Invalid choice. Please enter 1-4.${NC}"
echo -e "${RED}Invalid choice. Please enter 1-6.${NC}"
;;
esac
done
@@ -456,9 +458,31 @@ configure_channel() {
ACCESS_INFO="DingTalk channel configured"
;;
3)
# WeCom
# WeCom Bot
CHANNEL_TYPE="wecom_bot"
echo -e "${GREEN}Configure WeCom Bot...${NC}"
read -p "Enter WeCom Bot ID: " wecom_bot_id
read -p "Enter WeCom Bot Secret: " wecom_bot_secret
WECOM_BOT_ID="$wecom_bot_id"
WECOM_BOT_SECRET="$wecom_bot_secret"
ACCESS_INFO="WeCom Bot channel configured"
;;
4)
# QQ
CHANNEL_TYPE="qq"
echo -e "${GREEN}Configure QQ Bot...${NC}"
read -p "Enter QQ App ID: " qq_app_id
read -p "Enter QQ App Secret: " qq_app_secret
QQ_APP_ID="$qq_app_id"
QQ_APP_SECRET="$qq_app_secret"
ACCESS_INFO="QQ Bot channel configured"
;;
5)
# WeCom App
CHANNEL_TYPE="wechatcom_app"
echo -e "${GREEN}Configure WeCom...${NC}"
echo -e "${GREEN}Configure WeCom App...${NC}"
read -p "Enter WeChat Corp ID: " corp_id
read -p "Enter WeChat Com App Token: " com_token
read -p "Enter WeChat Com App Secret: " com_secret
@@ -473,9 +497,9 @@ configure_channel() {
WECHATCOM_AGENT_ID="$com_agent_id"
WECHATCOM_AES_KEY="$com_aes_key"
WECHATCOM_PORT="$com_port"
ACCESS_INFO="WeCom channel configured on port ${com_port}"
ACCESS_INFO="WeCom App channel configured on port ${com_port}"
;;
4)
6)
# Web
CHANNEL_TYPE="web"
read -p "Enter web port [press Enter for default: 9899]: " web_port
@@ -600,6 +624,72 @@ EOF
"agent_max_context_turns": 30,
"agent_max_steps": 15
}
EOF
;;
wecom_bot)
cat > config.json <<EOF
{
"channel_type": "wecom_bot",
"wecom_bot_id": "${WECOM_BOT_ID}",
"wecom_bot_secret": "${WECOM_BOT_SECRET}",
"model": "${MODEL_NAME}",
"open_ai_api_key": "${OPENAI_KEY:-}",
"open_ai_api_base": "${OPENAI_BASE:-https://api.openai.com/v1}",
"claude_api_key": "${CLAUDE_KEY:-}",
"claude_api_base": "${CLAUDE_BASE:-https://api.anthropic.com/v1}",
"gemini_api_key": "${GEMINI_KEY:-}",
"gemini_api_base": "${GEMINI_BASE:-https://generativelanguage.googleapis.com}",
"zhipu_ai_api_key": "${ZHIPU_KEY:-}",
"moonshot_api_key": "${MOONSHOT_KEY:-}",
"ark_api_key": "${ARK_KEY:-}",
"dashscope_api_key": "${DASHSCOPE_KEY:-}",
"minimax_api_key": "${MINIMAX_KEY:-}",
"voice_to_text": "openai",
"text_to_voice": "openai",
"voice_reply_voice": false,
"speech_recognition": true,
"group_speech_recognition": false,
"use_linkai": ${USE_LINKAI:-false},
"linkai_api_key": "${LINKAI_KEY:-}",
"linkai_app_code": "",
"agent": true,
"agent_max_context_tokens": 40000,
"agent_max_context_turns": 30,
"agent_max_steps": 15
}
EOF
;;
qq)
cat > config.json <<EOF
{
"channel_type": "qq",
"qq_app_id": "${QQ_APP_ID}",
"qq_app_secret": "${QQ_APP_SECRET}",
"model": "${MODEL_NAME}",
"open_ai_api_key": "${OPENAI_KEY:-}",
"open_ai_api_base": "${OPENAI_BASE:-https://api.openai.com/v1}",
"claude_api_key": "${CLAUDE_KEY:-}",
"claude_api_base": "${CLAUDE_BASE:-https://api.anthropic.com/v1}",
"gemini_api_key": "${GEMINI_KEY:-}",
"gemini_api_base": "${GEMINI_BASE:-https://generativelanguage.googleapis.com}",
"zhipu_ai_api_key": "${ZHIPU_KEY:-}",
"moonshot_api_key": "${MOONSHOT_KEY:-}",
"ark_api_key": "${ARK_KEY:-}",
"dashscope_api_key": "${DASHSCOPE_KEY:-}",
"minimax_api_key": "${MINIMAX_KEY:-}",
"voice_to_text": "openai",
"text_to_voice": "openai",
"voice_reply_voice": false,
"speech_recognition": true,
"group_speech_recognition": false,
"use_linkai": ${USE_LINKAI:-false},
"linkai_api_key": "${LINKAI_KEY:-}",
"linkai_app_code": "",
"agent": true,
"agent_max_context_tokens": 40000,
"agent_max_context_turns": 30,
"agent_max_steps": 15
}
EOF
;;
wechatcom_app)