mirror of
https://github.com/zhayujie/chatgpt-on-wechat.git
synced 2026-04-21 03:18:39 +08:00
feat: support batch skill install from zip and github
This commit is contained in:
@@ -599,239 +599,38 @@ class CowCliPlugin(Plugin):
|
||||
return "请指定要安装的技能: /skill install <名称>"
|
||||
|
||||
try:
|
||||
from cli.utils import get_skills_dir, SKILL_HUB_API
|
||||
from cli.commands.skill import _parse_github_url, _download_github_dir
|
||||
import requests
|
||||
import shutil
|
||||
import zipfile
|
||||
import tempfile
|
||||
from cli.commands.skill import install_skill
|
||||
result = install_skill(name)
|
||||
|
||||
skills_dir = get_skills_dir()
|
||||
os.makedirs(skills_dir, exist_ok=True)
|
||||
if result.error:
|
||||
return f"安装失败: {result.error}"
|
||||
|
||||
if name.startswith(("http://", "https://")) and name.rstrip("/").endswith("SKILL.md"):
|
||||
import re as re_mod
|
||||
dir_url = re_mod.sub(r'/SKILL\.md/?$', '', name)
|
||||
gh = _parse_github_url(dir_url)
|
||||
if gh:
|
||||
owner, repo, branch, subpath = gh
|
||||
spec = f"{owner}/{repo}"
|
||||
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo
|
||||
return self._skill_install_github(
|
||||
spec, skills_dir, subpath=subpath, skill_name=skill_name, branch=branch
|
||||
)
|
||||
return self._skill_install_url(name, skills_dir)
|
||||
if not result.installed:
|
||||
return "\n".join(result.messages) if result.messages else "未找到可安装的技能"
|
||||
|
||||
parsed = _parse_github_url(name)
|
||||
if parsed:
|
||||
owner, repo, branch, subpath = parsed
|
||||
spec = f"{owner}/{repo}"
|
||||
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo
|
||||
return self._skill_install_github(
|
||||
spec, skills_dir, subpath=subpath, skill_name=skill_name, branch=branch
|
||||
)
|
||||
|
||||
provider = None
|
||||
if name.startswith("github:"):
|
||||
name = name[7:]
|
||||
elif name.startswith("clawhub:"):
|
||||
name = name[8:]
|
||||
provider = "clawhub"
|
||||
|
||||
body = {}
|
||||
if provider:
|
||||
body["provider"] = provider
|
||||
resp = requests.post(
|
||||
f"{SKILL_HUB_API}/skills/{name}/download",
|
||||
json=body,
|
||||
timeout=15,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
|
||||
content_type = resp.headers.get("Content-Type", "")
|
||||
|
||||
if "application/json" in content_type:
|
||||
data = resp.json()
|
||||
source_type = data.get("source_type")
|
||||
if source_type == "github" or "redirect" in data:
|
||||
source_url = data.get("source_url", "")
|
||||
parsed_url = _parse_github_url(source_url)
|
||||
if parsed_url:
|
||||
owner, repo, branch, subpath = parsed_url
|
||||
return self._skill_install_github(
|
||||
f"{owner}/{repo}", skills_dir, subpath=subpath,
|
||||
skill_name=name, branch=branch
|
||||
)
|
||||
return self._skill_install_github(source_url, skills_dir, skill_name=name)
|
||||
if source_type == "registry":
|
||||
download_url = data.get("download_url")
|
||||
if not download_url:
|
||||
return f"此技能来自不支持的注册表,无法自动安装。"
|
||||
from urllib.parse import urlparse
|
||||
if urlparse(download_url).scheme != "https":
|
||||
return "安装失败: 下载地址不安全 (非 HTTPS)"
|
||||
provider = data.get("source_provider", "registry")
|
||||
try:
|
||||
dl_resp = requests.get(download_url, timeout=60, allow_redirects=True)
|
||||
dl_resp.raise_for_status()
|
||||
except Exception as e:
|
||||
return f"从 {provider} 下载失败: {e}"
|
||||
self._extract_zip(dl_resp.content, name, skills_dir)
|
||||
self._register_skill(name, source=provider)
|
||||
return self._format_install_success(name, provider)
|
||||
|
||||
elif "application/zip" in content_type:
|
||||
self._extract_zip(resp.content, name, skills_dir)
|
||||
self._register_skill(name, source="cowhub")
|
||||
return self._format_install_success(name, "cowhub")
|
||||
|
||||
return "技能商店返回了未预期的响应格式"
|
||||
|
||||
except requests.HTTPError as e:
|
||||
if e.response is not None and e.response.status_code == 404:
|
||||
return f"技能 '{name}' 未在技能商店中找到"
|
||||
return f"安装失败: {e}"
|
||||
return self._format_install_result(result)
|
||||
except Exception as e:
|
||||
return f"安装失败: {e}"
|
||||
|
||||
def _skill_install_url(self, url: str, skills_dir: str) -> str:
|
||||
"""Install a skill from a direct SKILL.md URL."""
|
||||
import requests
|
||||
from cli.commands.skill import _parse_skill_frontmatter
|
||||
|
||||
try:
|
||||
resp = requests.get(url, timeout=30)
|
||||
resp.raise_for_status()
|
||||
except Exception as e:
|
||||
return f"下载 SKILL.md 失败: {e}"
|
||||
|
||||
content = resp.text
|
||||
fm = _parse_skill_frontmatter(content)
|
||||
skill_name = fm.get("name")
|
||||
if not skill_name:
|
||||
return "SKILL.md 中未找到 name 字段,无法安装"
|
||||
|
||||
skill_name = skill_name.strip()
|
||||
skill_dir = os.path.join(skills_dir, skill_name)
|
||||
os.makedirs(skill_dir, exist_ok=True)
|
||||
|
||||
with open(os.path.join(skill_dir, "SKILL.md"), "w", encoding="utf-8") as f:
|
||||
f.write(content)
|
||||
|
||||
self._register_skill(skill_name, source="url")
|
||||
return self._format_install_success(skill_name, "url")
|
||||
|
||||
def _skill_install_github(self, spec: str, skills_dir: str,
|
||||
subpath: str = None, skill_name: str = None,
|
||||
branch: str = "main") -> str:
|
||||
import requests
|
||||
import shutil
|
||||
import zipfile
|
||||
import tempfile
|
||||
from cli.commands.skill import _download_github_dir
|
||||
|
||||
if "#" in spec and not subpath:
|
||||
spec, subpath = spec.split("#", 1)
|
||||
if not skill_name:
|
||||
skill_name = subpath.rstrip("/").split("/")[-1] if subpath else spec.split("/")[-1]
|
||||
|
||||
owner, repo = spec.split("/", 1)
|
||||
target_dir = os.path.join(skills_dir, skill_name)
|
||||
|
||||
# For subpath installs, try Contents API first
|
||||
if subpath:
|
||||
try:
|
||||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||||
api_dest = os.path.join(tmp_dir, skill_name)
|
||||
os.makedirs(api_dest)
|
||||
_download_github_dir(owner, repo, branch, subpath.strip("/"), api_dest)
|
||||
if os.path.exists(target_dir):
|
||||
shutil.rmtree(target_dir)
|
||||
shutil.copytree(api_dest, target_dir)
|
||||
self._register_skill(skill_name, source="github")
|
||||
return self._format_install_success(skill_name, "github")
|
||||
except Exception:
|
||||
pass # fall through to zip download
|
||||
|
||||
# Fallback: download full repo zip
|
||||
zip_url = f"https://github.com/{spec}/archive/refs/heads/{branch}.zip"
|
||||
try:
|
||||
resp = requests.get(zip_url, timeout=60, allow_redirects=True)
|
||||
resp.raise_for_status()
|
||||
except Exception as e:
|
||||
return f"从 GitHub 下载失败: {e}"
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||||
zip_path = os.path.join(tmp_dir, "repo.zip")
|
||||
with open(zip_path, "wb") as f:
|
||||
f.write(resp.content)
|
||||
|
||||
extract_dir = os.path.join(tmp_dir, "extracted")
|
||||
with zipfile.ZipFile(zip_path, "r") as zf:
|
||||
zf.extractall(extract_dir)
|
||||
|
||||
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
|
||||
repo_root = extract_dir
|
||||
if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])):
|
||||
repo_root = os.path.join(extract_dir, top_items[0])
|
||||
|
||||
if subpath:
|
||||
source_dir = os.path.join(repo_root, subpath.strip("/"))
|
||||
if not os.path.isdir(source_dir):
|
||||
return f"路径 '{subpath}' 在仓库中不存在"
|
||||
else:
|
||||
source_dir = repo_root
|
||||
|
||||
if os.path.exists(target_dir):
|
||||
shutil.rmtree(target_dir)
|
||||
shutil.copytree(source_dir, target_dir)
|
||||
|
||||
self._register_skill(skill_name, source="github")
|
||||
return self._format_install_success(skill_name, "github")
|
||||
|
||||
def _extract_zip(self, content: bytes, name: str, skills_dir: str):
|
||||
import zipfile
|
||||
import tempfile
|
||||
import shutil
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp_dir:
|
||||
zip_path = os.path.join(tmp_dir, "package.zip")
|
||||
with open(zip_path, "wb") as f:
|
||||
f.write(content)
|
||||
|
||||
extract_dir = os.path.join(tmp_dir, "extracted")
|
||||
with zipfile.ZipFile(zip_path, "r") as zf:
|
||||
zf.extractall(extract_dir)
|
||||
|
||||
top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")]
|
||||
source = extract_dir
|
||||
if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])):
|
||||
source = os.path.join(extract_dir, top_items[0])
|
||||
|
||||
target = os.path.join(skills_dir, name)
|
||||
if os.path.exists(target):
|
||||
shutil.rmtree(target)
|
||||
shutil.copytree(source, target)
|
||||
|
||||
@staticmethod
|
||||
def _register_skill(name: str, source: str = "cowhub"):
|
||||
try:
|
||||
from cli.commands.skill import _register_installed_skill
|
||||
_register_installed_skill(name, source=source)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def _format_install_success(name: str, source: str) -> str:
|
||||
def _format_install_result(result) -> str:
|
||||
"""Format InstallResult into a chat-friendly message."""
|
||||
from cli.commands.skill import _read_skill_description
|
||||
from cli.utils import get_skills_dir
|
||||
desc = _read_skill_description(os.path.join(get_skills_dir(), name))
|
||||
lines = [f"✅ {name}"]
|
||||
if desc:
|
||||
if len(desc) > 60:
|
||||
desc = desc[:57] + "…"
|
||||
lines.append(f" {desc}")
|
||||
lines.append(f" 来源: {source}")
|
||||
skills_dir = get_skills_dir()
|
||||
|
||||
lines = []
|
||||
for skill_name in result.installed:
|
||||
desc = _read_skill_description(os.path.join(skills_dir, skill_name))
|
||||
lines.append(f"✅ {skill_name}")
|
||||
if desc:
|
||||
if len(desc) > 60:
|
||||
desc = desc[:57] + "…"
|
||||
lines.append(f" {desc}")
|
||||
|
||||
if len(result.installed) > 1:
|
||||
lines.append(f"\n共安装 {len(result.installed)} 个技能")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def _skill_uninstall(self, name: str) -> str:
|
||||
|
||||
Reference in New Issue
Block a user