mirror of
https://github.com/zhayujie/chatgpt-on-wechat.git
synced 2026-03-19 21:38:18 +08:00
134 lines
5.1 KiB
Python
134 lines
5.1 KiB
Python
import asyncio
|
||
import json
|
||
import threading
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
|
||
import plugins
|
||
from bridge.context import ContextType
|
||
from bridge.reply import Reply, ReplyType
|
||
from channel.chat_message import ChatMessage
|
||
from common.log import logger
|
||
from config import conf, global_config
|
||
from plugins import *
|
||
from .midjourney import MJBot, TaskType
|
||
|
||
# 任务线程池
|
||
task_thread_pool = ThreadPoolExecutor(max_workers=4)
|
||
|
||
|
||
@plugins.register(
|
||
name="linkai",
|
||
desc="A plugin that supports knowledge base and midjourney drawing.",
|
||
version="0.1.0",
|
||
author="https://link-ai.tech",
|
||
)
|
||
class LinkAI(Plugin):
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context
|
||
self.config = super().load_config()
|
||
self.mj_bot = MJBot(self.config.get("midjourney"))
|
||
logger.info("[LinkAI] inited")
|
||
|
||
def on_handle_context(self, e_context: EventContext):
|
||
"""
|
||
消息处理逻辑
|
||
:param e_context: 消息上下文
|
||
"""
|
||
context = e_context['context']
|
||
if context.type not in [ContextType.TEXT, ContextType.IMAGE]:
|
||
# filter content no need solve
|
||
return
|
||
|
||
mj_type = self.mj_bot.judge_mj_task_type(e_context)
|
||
if mj_type:
|
||
# MJ作图任务处理
|
||
self.mj_bot.process_mj_task(mj_type, e_context)
|
||
return
|
||
|
||
if context.content.startswith(f"{_get_trigger_prefix()}linkai"):
|
||
# 应用管理功能
|
||
self._process_admin_cmd(e_context)
|
||
return
|
||
|
||
if self._is_chat_task(e_context):
|
||
# 文本对话任务处理
|
||
self._process_chat_task(e_context)
|
||
|
||
# 插件管理功能
|
||
def _process_admin_cmd(self, e_context: EventContext):
|
||
context = e_context['context']
|
||
cmd = context.content.split()
|
||
if len(cmd) == 1 or (len(cmd) == 2 and cmd[1] == "help"):
|
||
_set_reply_text(self.get_help_text(verbose=True), e_context, level=ReplyType.INFO)
|
||
return
|
||
if len(cmd) == 3 and cmd[1] == "app":
|
||
if not context.kwargs.get("isgroup"):
|
||
_set_reply_text("该指令需在群聊中使用", e_context, level=ReplyType.ERROR)
|
||
return
|
||
if e_context["context"]["session_id"] not in global_config["admin_users"]:
|
||
_set_reply_text("需要管理员权限执行", e_context, level=ReplyType.ERROR)
|
||
return
|
||
app_code = cmd[2]
|
||
group_name = context.kwargs.get("msg").from_user_nickname
|
||
group_mapping = self.config.get("group_app_map")
|
||
if group_mapping:
|
||
group_mapping[group_name] = app_code
|
||
else:
|
||
self.config["group_app_map"] = {group_name: app_code}
|
||
# 保存插件配置
|
||
super().save_config(self.config)
|
||
_set_reply_text(f"应用设置成功: {app_code}", e_context, level=ReplyType.INFO)
|
||
else:
|
||
_set_reply_text(f"指令错误,请输入{_get_trigger_prefix()}linkai help 获取帮助", e_context, level=ReplyType.INFO)
|
||
return
|
||
|
||
# LinkAI 对话任务处理
|
||
def _is_chat_task(self, e_context: EventContext):
|
||
context = e_context['context']
|
||
# 群聊应用管理
|
||
return self.config.get("group_app_map") and context.kwargs.get("isgroup")
|
||
|
||
def _process_chat_task(self, e_context: EventContext):
|
||
"""
|
||
处理LinkAI对话任务
|
||
:param e_context: 对话上下文
|
||
"""
|
||
context = e_context['context']
|
||
# 群聊应用管理
|
||
group_name = context.kwargs.get("msg").from_user_nickname
|
||
app_code = self._fetch_group_app_code(group_name)
|
||
if app_code:
|
||
context.kwargs['app_code'] = app_code
|
||
|
||
def _fetch_group_app_code(self, group_name: str) -> str:
|
||
"""
|
||
根据群聊名称获取对应的应用code
|
||
:param group_name: 群聊名称
|
||
:return: 应用code
|
||
"""
|
||
group_mapping = self.config.get("group_app_map")
|
||
if group_mapping:
|
||
app_code = group_mapping.get(group_name) or group_mapping.get("ALL_GROUP")
|
||
return app_code
|
||
|
||
def get_help_text(self, verbose=False, **kwargs):
|
||
trigger_prefix = _get_trigger_prefix()
|
||
help_text = "用于集成 LinkAI 提供的文本对话、知识库、绘画等能力。\n"
|
||
if not verbose:
|
||
return help_text
|
||
help_text += ""
|
||
help_text += f"{trigger_prefix}mj 描述词1,描述词2 ... : 利用描述词作画,参数请放在提示词之后。\n\n{trigger_prefix}mju ID 图片序号: 对指定ID消息中的第x张图片进行放大。\n例如:\n\"{trigger_prefix}mj a little cat, white --ar 9:16\"\n\"{trigger_prefix}mjimage a white cat --ar 9:16\"\n\"{trigger_prefix}mju 1105592717188272288 2\""
|
||
return help_text
|
||
|
||
|
||
# 静态方法
|
||
def _set_reply_text(content: str, e_context: EventContext, level: ReplyType = ReplyType.ERROR):
|
||
reply = Reply(level, content)
|
||
e_context["reply"] = reply
|
||
e_context.action = EventAction.BREAK_PASS
|
||
|
||
|
||
def _get_trigger_prefix():
|
||
return conf().get("plugin_trigger_prefix", "$")
|