formatting code

This commit is contained in:
lanvent
2023-04-17 01:00:08 +08:00
parent 3b8972ce1f
commit 8f72e8c3e6
92 changed files with 1850 additions and 1188 deletions

View File

@@ -1 +1 @@
from .godcmd import *
from .godcmd import *

View File

@@ -1,4 +1,4 @@
{
"password": "",
"admin_users": []
}
"password": "",
"admin_users": []
}

View File

@@ -6,14 +6,16 @@ import random
import string
import traceback
from typing import Tuple
import plugins
from bridge.bridge import Bridge
from bridge.context import ContextType
from bridge.reply import Reply, ReplyType
from config import conf, load_config
import plugins
from plugins import *
from common import const
from common.log import logger
from config import conf, load_config
from plugins import *
# 定义指令集
COMMANDS = {
"help": {
@@ -41,7 +43,7 @@ COMMANDS = {
},
"id": {
"alias": ["id", "用户"],
"desc": "获取用户id", # wechaty和wechatmp的用户id不会变化可用于绑定管理员
"desc": "获取用户id", # wechaty和wechatmp的用户id不会变化可用于绑定管理员
},
"reset": {
"alias": ["reset", "重置会话"],
@@ -114,18 +116,20 @@ ADMIN_COMMANDS = {
"desc": "开启机器调试日志",
},
}
# 定义帮助函数
def get_help_text(isadmin, isgroup):
help_text = "通用指令:\n"
for cmd, info in COMMANDS.items():
if cmd=="auth": #不提示认证指令
if cmd == "auth": # 不提示认证指令
continue
if cmd=="id" and conf().get("channel_type","wx") not in ["wxy","wechatmp"]:
if cmd == "id" and conf().get("channel_type", "wx") not in ["wxy", "wechatmp"]:
continue
alias=["#"+a for a in info['alias'][:1]]
alias = ["#" + a for a in info["alias"][:1]]
help_text += f"{','.join(alias)} "
if 'args' in info:
args=[a for a in info['args']]
if "args" in info:
args = [a for a in info["args"]]
help_text += f"{' '.join(args)}"
help_text += f": {info['desc']}\n"
@@ -135,39 +139,48 @@ def get_help_text(isadmin, isgroup):
for plugin in plugins:
if plugins[plugin].enabled and not plugins[plugin].hidden:
namecn = plugins[plugin].namecn
help_text += "\n%s:"%namecn
help_text += PluginManager().instances[plugin].get_help_text(verbose=False).strip()
help_text += "\n%s:" % namecn
help_text += (
PluginManager().instances[plugin].get_help_text(verbose=False).strip()
)
if ADMIN_COMMANDS and isadmin:
help_text += "\n\n管理员指令:\n"
for cmd, info in ADMIN_COMMANDS.items():
alias=["#"+a for a in info['alias'][:1]]
alias = ["#" + a for a in info["alias"][:1]]
help_text += f"{','.join(alias)} "
if 'args' in info:
args=[a for a in info['args']]
if "args" in info:
args = [a for a in info["args"]]
help_text += f"{' '.join(args)}"
help_text += f": {info['desc']}\n"
return help_text
@plugins.register(name="Godcmd", desire_priority=999, hidden=True, desc="为你的机器人添加指令集,有用户和管理员两种角色,加载顺序请放在首位,初次运行后插件目录会生成配置文件, 填充管理员密码后即可认证", version="1.0", author="lanvent")
class Godcmd(Plugin):
@plugins.register(
name="Godcmd",
desire_priority=999,
hidden=True,
desc="为你的机器人添加指令集,有用户和管理员两种角色,加载顺序请放在首位,初次运行后插件目录会生成配置文件, 填充管理员密码后即可认证",
version="1.0",
author="lanvent",
)
class Godcmd(Plugin):
def __init__(self):
super().__init__()
curdir=os.path.dirname(__file__)
config_path=os.path.join(curdir,"config.json")
gconf=None
curdir = os.path.dirname(__file__)
config_path = os.path.join(curdir, "config.json")
gconf = None
if not os.path.exists(config_path):
gconf={"password":"","admin_users":[]}
with open(config_path,"w") as f:
json.dump(gconf,f,indent=4)
gconf = {"password": "", "admin_users": []}
with open(config_path, "w") as f:
json.dump(gconf, f, indent=4)
else:
with open(config_path,"r") as f:
gconf=json.load(f)
with open(config_path, "r") as f:
gconf = json.load(f)
if gconf["password"] == "":
self.temp_password = "".join(random.sample(string.digits, 4))
logger.info("[Godcmd] 因未设置口令,本次的临时口令为%s"%self.temp_password)
logger.info("[Godcmd] 因未设置口令,本次的临时口令为%s" % self.temp_password)
else:
self.temp_password = None
custom_commands = conf().get("clear_memory_commands", [])
@@ -178,41 +191,42 @@ class Godcmd(Plugin):
COMMANDS["reset"]["alias"].append(custom_command)
self.password = gconf["password"]
self.admin_users = gconf["admin_users"] # 预存的管理员账号这些账号不需要认证。itchat的用户名每次都会变不可用
self.isrunning = True # 机器人是否运行中
self.admin_users = gconf[
"admin_users"
] # 预存的管理员账号这些账号不需要认证。itchat的用户名每次都会变不可用
self.isrunning = True # 机器人是否运行中
self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context
logger.info("[Godcmd] inited")
def on_handle_context(self, e_context: EventContext):
context_type = e_context['context'].type
context_type = e_context["context"].type
if context_type != ContextType.TEXT:
if not self.isrunning:
e_context.action = EventAction.BREAK_PASS
return
content = e_context['context'].content
content = e_context["context"].content
logger.debug("[Godcmd] on_handle_context. content: %s" % content)
if content.startswith("#"):
# msg = e_context['context']['msg']
channel = e_context['channel']
user = e_context['context']['receiver']
session_id = e_context['context']['session_id']
isgroup = e_context['context'].get("isgroup", False)
channel = e_context["channel"]
user = e_context["context"]["receiver"]
session_id = e_context["context"]["session_id"]
isgroup = e_context["context"].get("isgroup", False)
bottype = Bridge().get_bot_type("chat")
bot = Bridge().get_bot("chat")
# 将命令和参数分割
command_parts = content[1:].strip().split()
cmd = command_parts[0]
args = command_parts[1:]
isadmin=False
isadmin = False
if user in self.admin_users:
isadmin=True
ok=False
result="string"
if any(cmd in info['alias'] for info in COMMANDS.values()):
cmd = next(c for c, info in COMMANDS.items() if cmd in info['alias'])
isadmin = True
ok = False
result = "string"
if any(cmd in info["alias"] for info in COMMANDS.values()):
cmd = next(c for c, info in COMMANDS.items() if cmd in info["alias"])
if cmd == "auth":
ok, result = self.authenticate(user, args, isadmin, isgroup)
elif cmd == "help" or cmd == "helpp":
@@ -224,10 +238,14 @@ class Godcmd(Plugin):
query_name = args[0].upper()
# search name and namecn
for name, plugincls in plugins.items():
if not plugincls.enabled :
if not plugincls.enabled:
continue
if query_name == name or query_name == plugincls.namecn:
ok, result = True, PluginManager().instances[name].get_help_text(isgroup=isgroup, isadmin=isadmin, verbose=True)
ok, result = True, PluginManager().instances[
name
].get_help_text(
isgroup=isgroup, isadmin=isadmin, verbose=True
)
break
if not ok:
result = "插件不存在或未启用"
@@ -236,14 +254,14 @@ class Godcmd(Plugin):
elif cmd == "set_openai_api_key":
if len(args) == 1:
user_data = conf().get_user_data(user)
user_data['openai_api_key'] = args[0]
user_data["openai_api_key"] = args[0]
ok, result = True, "你的OpenAI私有api_key已设置为" + args[0]
else:
ok, result = False, "请提供一个api_key"
elif cmd == "reset_openai_api_key":
try:
user_data = conf().get_user_data(user)
user_data.pop('openai_api_key')
user_data.pop("openai_api_key")
ok, result = True, "你的OpenAI私有api_key已清除"
except Exception as e:
ok, result = False, "你没有设置私有api_key"
@@ -255,12 +273,16 @@ class Godcmd(Plugin):
else:
ok, result = False, "当前对话机器人不支持重置会话"
logger.debug("[Godcmd] command: %s by %s" % (cmd, user))
elif any(cmd in info['alias'] for info in ADMIN_COMMANDS.values()):
elif any(cmd in info["alias"] for info in ADMIN_COMMANDS.values()):
if isadmin:
if isgroup:
ok, result = False, "群聊不可执行管理员指令"
else:
cmd = next(c for c, info in ADMIN_COMMANDS.items() if cmd in info['alias'])
cmd = next(
c
for c, info in ADMIN_COMMANDS.items()
if cmd in info["alias"]
)
if cmd == "stop":
self.isrunning = False
ok, result = True, "服务已暂停"
@@ -278,13 +300,13 @@ class Godcmd(Plugin):
else:
ok, result = False, "当前对话机器人不支持重置会话"
elif cmd == "debug":
logger.setLevel('DEBUG')
logger.setLevel("DEBUG")
ok, result = True, "DEBUG模式已开启"
elif cmd == "plist":
plugins = PluginManager().list_plugins()
ok = True
result = "插件列表:\n"
for name,plugincls in plugins.items():
for name, plugincls in plugins.items():
result += f"{plugincls.name}_v{plugincls.version} {plugincls.priority} - "
if plugincls.enabled:
result += "已启用\n"
@@ -294,16 +316,20 @@ class Godcmd(Plugin):
new_plugins = PluginManager().scan_plugins()
ok, result = True, "插件扫描完成"
PluginManager().activate_plugins()
if len(new_plugins) >0 :
if len(new_plugins) > 0:
result += "\n发现新插件:\n"
result += "\n".join([f"{p.name}_v{p.version}" for p in new_plugins])
else :
result +=", 未发现新插件"
result += "\n".join(
[f"{p.name}_v{p.version}" for p in new_plugins]
)
else:
result += ", 未发现新插件"
elif cmd == "setpri":
if len(args) != 2:
ok, result = False, "请提供插件名和优先级"
else:
ok = PluginManager().set_plugin_priority(args[0], int(args[1]))
ok = PluginManager().set_plugin_priority(
args[0], int(args[1])
)
if ok:
result = "插件" + args[0] + "优先级已设置为" + args[1]
else:
@@ -350,42 +376,42 @@ class Godcmd(Plugin):
else:
ok, result = False, "需要管理员权限才能执行该指令"
else:
trigger_prefix = conf().get('plugin_trigger_prefix',"$")
if trigger_prefix == "#": # 跟插件聊天指令前缀相同,继续递交
trigger_prefix = conf().get("plugin_trigger_prefix", "$")
if trigger_prefix == "#": # 跟插件聊天指令前缀相同,继续递交
return
ok, result = False, f"未知指令:{cmd}\n查看指令列表请输入#help \n"
reply = Reply()
if ok:
reply.type = ReplyType.INFO
else:
reply.type = ReplyType.ERROR
reply.content = result
e_context['reply'] = reply
e_context["reply"] = reply
e_context.action = EventAction.BREAK_PASS # 事件结束并跳过处理context的默认逻辑
e_context.action = EventAction.BREAK_PASS # 事件结束并跳过处理context的默认逻辑
elif not self.isrunning:
e_context.action = EventAction.BREAK_PASS
def authenticate(self, userid, args, isadmin, isgroup) -> Tuple[bool,str] :
def authenticate(self, userid, args, isadmin, isgroup) -> Tuple[bool, str]:
if isgroup:
return False,"请勿在群聊中认证"
return False, "请勿在群聊中认证"
if isadmin:
return False,"管理员账号无需认证"
return False, "管理员账号无需认证"
if len(args) != 1:
return False,"请提供口令"
return False, "请提供口令"
password = args[0]
if password == self.password:
self.admin_users.append(userid)
return True,"认证成功"
return True, "认证成功"
elif password == self.temp_password:
self.admin_users.append(userid)
return True,"认证成功,请尽快设置口令"
return True, "认证成功,请尽快设置口令"
else:
return False,"认证失败"
return False, "认证失败"
def get_help_text(self, isadmin = False, isgroup = False, **kwargs):
return get_help_text(isadmin, isgroup)
def get_help_text(self, isadmin=False, isgroup=False, **kwargs):
return get_help_text(isadmin, isgroup)