From 079df5a47c01b33508c79c3cd9177a88dc0a7008 Mon Sep 17 00:00:00 2001 From: zhayujie Date: Sun, 29 Mar 2026 14:38:11 +0800 Subject: [PATCH] feat: support batch skill install from zip and github --- cli/commands/skill.py | 562 ++++++++++++++++++++++--------------- plugins/cow_cli/cow_cli.py | 247 ++-------------- 2 files changed, 355 insertions(+), 454 deletions(-) diff --git a/cli/commands/skill.py b/cli/commands/skill.py index 01065a72..51187262 100644 --- a/cli/commands/skill.py +++ b/cli/commands/skill.py @@ -8,6 +8,8 @@ import hashlib import shutil import zipfile import tempfile +from dataclasses import dataclass, field +from typing import Optional, List from urllib.parse import urlparse @@ -22,6 +24,24 @@ from cli.utils import ( SKILL_HUB_API, ) + +# ====================================================================== +# Public types for the core install API (used by CLI and chat plugin) +# ====================================================================== + +class SkillInstallError(Exception): + """Raised when skill installation fails.""" + pass + + +@dataclass +class InstallResult: + """Result of a skill installation operation.""" + installed: List[str] = field(default_factory=list) + messages: List[str] = field(default_factory=list) + error: Optional[str] = None + + _SAFE_NAME_RE = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_\-]{0,63}$") _GITHUB_URL_RE = re.compile( r"^https?://github\.com/([^/]+)/([^/]+?)(?:\.git)?(?:/(?:tree|blob)/([^/]+)(?:/(.+))?)?/?$" @@ -216,6 +236,51 @@ def _scan_skills_in_repo(repo_root: str) -> list: return found +def _scan_skills_in_dir(directory: str) -> list: + """Scan immediate subdirectories for SKILL.md files. + + Unlike _scan_skills_in_repo which checks conventional locations, + this scans all direct children of the given directory. + Returns a list of (skill_name, skill_dir_path) tuples. + """ + found = [] + if not os.path.isdir(directory): + return found + for entry in os.listdir(directory): + if entry.startswith(".") or entry in _SKILL_SCAN_SKIP: + continue + entry_path = os.path.join(directory, entry) + if os.path.isdir(entry_path) and os.path.isfile(os.path.join(entry_path, "SKILL.md")): + fm = _parse_skill_frontmatter( + _read_file_text(os.path.join(entry_path, "SKILL.md")) + ) + name = fm.get("name") or entry + found.append((name, entry_path)) + return found + + +def _batch_install_skills(discovered, spec, skills_dir, source, result: InstallResult): + """Install a list of discovered skills into skills_dir.""" + result.messages.append(f"Found {len(discovered)} skill(s) in {spec}:") + for sname, sdir in discovered: + safe_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', sname)[:64] + if not _SAFE_NAME_RE.match(safe_name): + result.messages.append(f" Skipping '{sname}' (invalid name)") + continue + target_dir = os.path.join(skills_dir, safe_name) + if os.path.exists(target_dir): + shutil.rmtree(target_dir) + shutil.copytree(sdir, target_dir) + _register_installed_skill(safe_name, source=source) + result.installed.append(safe_name) + result.messages.append(f" + {safe_name}") + + if result.installed: + result.messages.append(f"{len(result.installed)} skill(s) installed from {spec}.") + else: + result.messages.append("No valid skills found.") + + def _read_file_text(path: str) -> str: """Read a file as UTF-8 text, returning empty string on failure.""" try: @@ -225,16 +290,11 @@ def _read_file_text(path: str) -> str: return "" -def _install_local(path: str): - """Install skill(s) from a local directory. - - If the path contains SKILL.md directly, install it as a single skill. - Otherwise scan for skills/ subdirectories with SKILL.md. - """ +def _install_local(path: str, result: InstallResult): + """Install skill(s) from a local directory.""" path = os.path.abspath(os.path.expanduser(path)) if not os.path.isdir(path): - click.echo(f"Error: '{path}' is not a directory.", err=True) - sys.exit(1) + raise SkillInstallError(f"'{path}' is not a directory.") skills_dir = get_skills_dir() os.makedirs(skills_dir, exist_ok=True) @@ -243,41 +303,21 @@ def _install_local(path: str): fm = _parse_skill_frontmatter(_read_file_text(os.path.join(path, "SKILL.md"))) skill_name = fm.get("name") or os.path.basename(path) skill_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', skill_name)[:64] - _validate_skill_name(skill_name) + _check_skill_name(skill_name) target_dir = os.path.join(skills_dir, skill_name) if os.path.exists(target_dir): shutil.rmtree(target_dir) shutil.copytree(path, target_dir) _register_installed_skill(skill_name, source="local") - _print_install_success(skill_name, "local") + result.installed.append(skill_name) + result.messages.append(f"Installed '{skill_name}' from local path.") return - discovered = _scan_skills_in_repo(path) + discovered = _scan_skills_in_repo(path) or _scan_skills_in_dir(path) if not discovered: - click.echo(f"Error: No skills found in '{path}'.", err=True) - sys.exit(1) + raise SkillInstallError(f"No skills found in '{path}'.") - click.echo(f"Found {len(discovered)} skill(s) in {path}:\n") - installed_names = [] - for sname, sdir in discovered: - safe_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', sname)[:64] - if not _SAFE_NAME_RE.match(safe_name): - click.echo(click.style(f" ✗ Skipping '{sname}' (invalid name)", fg="yellow")) - continue - target_dir = os.path.join(skills_dir, safe_name) - if os.path.exists(target_dir): - shutil.rmtree(target_dir) - shutil.copytree(sdir, target_dir) - _register_installed_skill(safe_name, source="local") - installed_names.append(safe_name) - - if installed_names: - click.echo("") - for n in installed_names: - _print_install_success(n, "local") - click.echo(f"\n {len(installed_names)} skill(s) installed from local path.") - else: - click.echo("No valid skills found in directory.") + _batch_install_skills(discovered, path, skills_dir, "local", result) def _register_installed_skill(name: str, source: str = "cowhub"): @@ -345,39 +385,38 @@ def _read_skill_description(skill_dir: str) -> str: return "" -def _install_url(url: str): +def _install_url(url: str, result: InstallResult): """Install a skill from a direct SKILL.md URL.""" - click.echo(f"Downloading SKILL.md from {url} ...") + result.messages.append(f"Downloading SKILL.md from {url} ...") try: resp = requests.get(url, timeout=30) resp.raise_for_status() except Exception as e: - click.echo(f"Error: Failed to download SKILL.md: {e}", err=True) - sys.exit(1) + raise SkillInstallError(f"Failed to download SKILL.md: {e}") content = resp.text fm = _parse_skill_frontmatter(content) skill_name = fm.get("name") if not skill_name: - click.echo("Error: SKILL.md missing 'name' field in frontmatter.", err=True) - sys.exit(1) + raise SkillInstallError("SKILL.md missing 'name' field in frontmatter.") skill_name = skill_name.strip() - _validate_skill_name(skill_name) + _check_skill_name(skill_name) skills_dir = get_skills_dir() os.makedirs(skills_dir, exist_ok=True) skill_dir = os.path.join(skills_dir, skill_name) if os.path.isdir(skill_dir): - click.echo(f"Skill '{skill_name}' already exists. Overwriting SKILL.md ...") + result.messages.append(f"Skill '{skill_name}' already exists. Overwriting SKILL.md ...") os.makedirs(skill_dir, exist_ok=True) with open(os.path.join(skill_dir, "SKILL.md"), "w", encoding="utf-8") as f: f.write(content) _register_installed_skill(skill_name, source="url") - _print_install_success(skill_name, "url") + result.installed.append(skill_name) + result.messages.append(f"Installed '{skill_name}' from URL.") def _print_install_success(name: str, source: str): @@ -410,6 +449,20 @@ def _validate_github_spec(spec: str): sys.exit(1) +def _check_skill_name(name: str): + """Raise SkillInstallError if name is invalid.""" + if not _SAFE_NAME_RE.match(name): + raise SkillInstallError( + f"Invalid skill name '{name}'. Use only letters, digits, hyphens, and underscores." + ) + + +def _check_github_spec(spec: str): + """Raise SkillInstallError if spec is not owner/repo.""" + if not re.match(r"^[a-zA-Z0-9_\-]+/[a-zA-Z0-9_.\-]+$", spec): + raise SkillInstallError(f"Invalid GitHub spec '{spec}'. Expected format: owner/repo") + + def _safe_extractall(zf: zipfile.ZipFile, dest: str): """Extract zip while guarding against Zip Slip (path traversal).""" dest = os.path.realpath(dest) @@ -441,6 +494,18 @@ def _verify_checksum(content: bytes, expected: str): return True +def _check_checksum(content: bytes, expected: str): + """Raise SkillInstallError on SHA-256 checksum mismatch.""" + if not expected: + return + actual = hashlib.sha256(content).hexdigest() + if actual != expected.lower(): + raise SkillInstallError( + f"Checksum mismatch! Expected: {expected}, Actual: {actual}. " + "The downloaded package may have been tampered with." + ) + + @click.group() def skill(): """Manage CowAgent skills.""" @@ -612,7 +677,98 @@ def search(query): # ------------------------------------------------------------------ -# cow skill install +# Core install function — reusable from CLI and chat plugin +# ------------------------------------------------------------------ + +def install_skill(name: str) -> InstallResult: + """Core install logic, usable from CLI and chat plugin. + + Accepts all formats: Skill Hub name, owner/repo, GitHub/GitLab URL, + git@ SSH, local path, SKILL.md URL. + Returns InstallResult with installed skill names and messages. + """ + result = InstallResult() + try: + _route_install(name, result) + except SkillInstallError as e: + result.error = str(e) + return result + + +def _route_install(name: str, result: InstallResult): + """Dispatch to the appropriate installer based on input format.""" + # --- Local path --- + if name.startswith(("./", "../", "/", "~/")): + _install_local(name, result) + return + + # --- Direct SKILL.md URL --- + if name.startswith(("http://", "https://")) and name.rstrip("/").endswith("SKILL.md"): + dir_url = re.sub(r'/SKILL\.md/?$', '', name) + gh = _parse_github_url(dir_url) + if gh: + owner, repo, branch, subpath = gh + _install_github(f"{owner}/{repo}", result, subpath=subpath, skill_name=( + subpath.rstrip("/").split("/")[-1] if subpath else repo + ), branch=branch) + return + _install_url(name, result) + return + + # --- Full GitHub URL --- + parsed = _parse_github_url(name) + if parsed: + owner, repo, branch, subpath = parsed + _install_github(f"{owner}/{repo}", result, subpath=subpath, branch=branch) + return + + # --- Full GitLab URL --- + gl = _parse_gitlab_url(name) + if gl: + owner, repo, branch, subpath = gl + _install_gitlab(f"{owner}/{repo}", result, subpath=subpath, branch=branch) + return + + # --- git@host:owner/repo.git SSH URL --- + ssh = _parse_git_ssh_url(name) + if ssh: + host, owner, repo = ssh + _install_git_clone(name, result, display_name=f"{owner}/{repo}") + return + + # --- github: prefix --- + if name.startswith("github:"): + raw = name[7:] + subpath = None + if "#" in raw: + raw, subpath = raw.split("#", 1) + _check_github_spec(raw) + _install_github(raw, result, subpath=subpath) + return + + # --- clawhub: prefix --- + if name.startswith("clawhub:"): + skill_name = name[8:] + _check_skill_name(skill_name) + _install_hub(skill_name, result, provider="clawhub") + return + + # --- owner/repo or owner/repo#subpath shorthand --- + if re.match(r"^[a-zA-Z0-9_\-]+/[a-zA-Z0-9_.\-]+(?:#.+)?$", name): + subpath = None + spec = name + if "#" in spec: + spec, subpath = spec.split("#", 1) + _install_github(spec, result, subpath=subpath) + return + + # --- Fallback: Skill Hub by name --- + _check_skill_name(name) + _install_hub(name, result) + + +# ------------------------------------------------------------------ +# cow skill install (CLI thin wrapper) # ------------------------------------------------------------------ @skill.command() @click.argument("name") @@ -641,82 +797,20 @@ def install(name): cow skill install https://example.com/path/to/SKILL.md """ - # --- Local path --- - if name.startswith(("./", "../", "/", "~/")): - _install_local(name) - return - - # --- Direct SKILL.md URL --- - if name.startswith(("http://", "https://")) and name.rstrip("/").endswith("SKILL.md"): - dir_url = re.sub(r'/SKILL\.md/?$', '', name) - gh = _parse_github_url(dir_url) - if gh: - owner, repo, branch, subpath = gh - _install_github(f"{owner}/{repo}", subpath=subpath, skill_name=( - subpath.rstrip("/").split("/")[-1] if subpath else repo - ), branch=branch) - return - _install_url(name) - return - - # --- Full GitHub URL --- - parsed = _parse_github_url(name) - if parsed: - owner, repo, branch, subpath = parsed - _install_github(f"{owner}/{repo}", subpath=subpath, branch=branch) - return - - # --- Full GitLab URL --- - gl = _parse_gitlab_url(name) - if gl: - owner, repo, branch, subpath = gl - _install_gitlab(f"{owner}/{repo}", subpath=subpath, branch=branch) - return - - # --- git@host:owner/repo.git SSH URL --- - ssh = _parse_git_ssh_url(name) - if ssh: - host, owner, repo = ssh - _install_git_clone(name, f"{owner}/{repo}") - return - - # --- github: prefix --- - if name.startswith("github:"): - raw = name[7:] - subpath = None - if "#" in raw: - raw, subpath = raw.split("#", 1) - _validate_github_spec(raw) - _install_github(raw, subpath=subpath) - return - - # --- clawhub: prefix --- - if name.startswith("clawhub:"): - skill_name = name[8:] - _validate_skill_name(skill_name) - _install_hub(skill_name, provider="clawhub") - return - - # --- owner/repo or owner/repo#subpath shorthand --- - if re.match(r"^[a-zA-Z0-9_\-]+/[a-zA-Z0-9_.\-]+(?:#.+)?$", name): - subpath = None - spec = name - if "#" in spec: - spec, subpath = spec.split("#", 1) - _install_github(spec, subpath=subpath) - return - - # --- Fallback: Skill Hub by name --- - _validate_skill_name(name) - _install_hub(name) + result = install_skill(name) + for msg in result.messages: + click.echo(msg) + if result.error: + click.echo(f"Error: {result.error}", err=True) + sys.exit(1) -def _install_hub(name, provider=None): +def _install_hub(name, result: InstallResult, provider=None): """Install a skill from Skill Hub.""" skills_dir = get_skills_dir() os.makedirs(skills_dir, exist_ok=True) - click.echo(f"Fetching skill info for '{name}'...") + result.messages.append(f"Fetching skill info for '{name}'...") try: body = {} @@ -730,13 +824,12 @@ def _install_hub(name, provider=None): resp.raise_for_status() except requests.HTTPError as e: if e.response is not None and e.response.status_code == 404: - click.echo(f"Error: Skill '{name}' not found on Skill Hub.", err=True) - else: - click.echo(f"Error: Failed to fetch skill: {e}", err=True) - sys.exit(1) + raise SkillInstallError(f"Skill '{name}' not found on Skill Hub.") + raise SkillInstallError(f"Failed to fetch skill: {e}") + except SkillInstallError: + raise except Exception as e: - click.echo(f"Error: Failed to connect to Skill Hub: {e}", err=True) - sys.exit(1) + raise SkillInstallError(f"Failed to connect to Skill Hub: {e}") content_type = resp.headers.get("Content-Type", "") @@ -749,12 +842,12 @@ def _install_hub(name, provider=None): parsed_url = _parse_github_url(source_url) if parsed_url: owner, repo, branch, subpath = parsed_url - click.echo(f"Source: GitHub ({source_url})") - _install_github(f"{owner}/{repo}", subpath=subpath, skill_name=name, branch=branch) + result.messages.append(f"Source: GitHub ({source_url})") + _install_github(f"{owner}/{repo}", result, subpath=subpath, skill_name=name, branch=branch) else: - _validate_github_spec(source_url) - click.echo(f"Source: GitHub ({source_url})") - _install_github(source_url, skill_name=name) + _check_github_spec(source_url) + result.messages.append(f"Source: GitHub ({source_url})") + _install_github(source_url, result, skill_name=name) return if source_type == "registry": @@ -762,25 +855,25 @@ def _install_hub(name, provider=None): if download_url: parsed = urlparse(download_url) if parsed.scheme != "https": - click.echo(f"Error: Refusing to download from non-HTTPS URL.", err=True) - sys.exit(1) - provider = data.get("source_provider", "registry") + raise SkillInstallError("Refusing to download from non-HTTPS URL.") + src_provider = data.get("source_provider", "registry") expected_checksum = data.get("checksum") or data.get("sha256") - click.echo(f"Source: {provider}") - click.echo("Downloading skill package...") + result.messages.append(f"Source: {src_provider}") + result.messages.append("Downloading skill package...") try: dl_resp = requests.get(download_url, timeout=60, allow_redirects=True) dl_resp.raise_for_status() except Exception as e: - click.echo(f"Error: Failed to download from {provider}: {e}", err=True) - sys.exit(1) - _verify_checksum(dl_resp.content, expected_checksum) - _install_zip_bytes(dl_resp.content, name, skills_dir) - _register_installed_skill(name, source=provider) - _print_install_success(name, provider) + raise SkillInstallError(f"Failed to download from {src_provider}: {e}") + _check_checksum(dl_resp.content, expected_checksum) + installed_before = len(result.installed) + _install_zip_bytes(dl_resp.content, name, skills_dir, result=result, source_label=src_provider) + if len(result.installed) == installed_before: + _register_installed_skill(name, source=src_provider) + result.installed.append(name) + result.messages.append(f"Installed '{name}' from {src_provider}.") else: - click.echo(f"Error: Unsupported registry provider.", err=True) - sys.exit(1) + raise SkillInstallError("Unsupported registry provider.") return if "redirect" in data: @@ -788,81 +881,76 @@ def _install_hub(name, provider=None): parsed_url = _parse_github_url(source_url) if parsed_url: owner, repo, branch, subpath = parsed_url - click.echo(f"Source: GitHub ({source_url})") - _install_github(f"{owner}/{repo}", subpath=subpath, skill_name=name, branch=branch) + result.messages.append(f"Source: GitHub ({source_url})") + _install_github(f"{owner}/{repo}", result, subpath=subpath, skill_name=name, branch=branch) else: - _validate_github_spec(source_url) - click.echo(f"Source: GitHub ({source_url})") - _install_github(source_url, skill_name=name) + _check_github_spec(source_url) + result.messages.append(f"Source: GitHub ({source_url})") + _install_github(source_url, result, skill_name=name) return elif "application/zip" in content_type: - click.echo("Downloading skill package...") + result.messages.append("Downloading skill package...") expected_checksum = resp.headers.get("X-Checksum-Sha256") - _verify_checksum(resp.content, expected_checksum) - _install_zip_bytes(resp.content, name, skills_dir) - _register_installed_skill(name) - _print_install_success(name, "cowhub") + _check_checksum(resp.content, expected_checksum) + installed_before = len(result.installed) + _install_zip_bytes(resp.content, name, skills_dir, result=result, source_label="cowhub") + if len(result.installed) == installed_before: + _register_installed_skill(name) + result.installed.append(name) + result.messages.append(f"Installed '{name}' from Skill Hub.") return - click.echo(f"Error: Unexpected response from Skill Hub.", err=True) - sys.exit(1) + raise SkillInstallError("Unexpected response from Skill Hub.") -def _install_github(spec, subpath=None, skill_name=None, branch="main", source="github"): +def _install_github(spec, result: InstallResult, subpath=None, skill_name=None, branch="main", source="github"): """Install skill(s) from a GitHub repo. Strategy: zip download first (no API rate limit), Contents API as fallback. - - When subpath is given, install that single skill directory. - When subpath is None, scan the repo for all skills/ subdirectories containing - SKILL.md and batch-install them. - - spec format: owner/repo or owner/repo#path """ if "#" in spec and not subpath: spec, subpath = spec.split("#", 1) - _validate_github_spec(spec) + _check_github_spec(spec) skills_dir = get_skills_dir() os.makedirs(skills_dir, exist_ok=True) owner, repo = spec.split("/", 1) - # --- Primary: zip download (no rate limit) --- - click.echo(f"Downloading from GitHub: {spec} (branch: {branch})...") + result.messages.append(f"Downloading from GitHub: {spec} (branch: {branch})...") tmp_dir = None repo_root = None try: tmp_dir, repo_root = _download_repo_zip(spec, branch) except Exception: - click.echo("Zip download failed, falling back to Contents API...") + result.messages.append("Zip download failed, falling back to Contents API...") if repo_root: try: - _install_from_repo_root(repo_root, spec, subpath, skill_name, skills_dir, source) + _install_from_repo_root(repo_root, spec, subpath, skill_name, skills_dir, source, result) return - except SystemExit: + except SkillInstallError: raise except Exception as e: - click.echo(f"Error processing zip: {e}", err=True) - click.echo("Falling back to Contents API...") + result.messages.append(f"Error processing zip: {e}") + result.messages.append("Falling back to Contents API...") finally: if tmp_dir: shutil.rmtree(tmp_dir, ignore_errors=True) - # --- Fallback: Contents API (rate-limited but works for single skill) --- if not subpath: - click.echo("Error: Zip download failed and batch install requires zip.", err=True) - click.echo(f" Try again or specify a subpath: cow skill install {spec}#skills/", err=True) - sys.exit(1) + raise SkillInstallError( + f"Zip download failed and batch install requires zip. " + f"Try again or specify a subpath: {spec}#skills/" + ) if not skill_name: skill_name = subpath.rstrip("/").split("/")[-1] - _validate_skill_name(skill_name) + _check_skill_name(skill_name) - click.echo(f"Downloading via Contents API: {spec}/{subpath} ...") + result.messages.append(f"Downloading via Contents API: {spec}/{subpath} ...") target_dir = os.path.join(skills_dir, skill_name) try: with tempfile.TemporaryDirectory() as api_tmp: @@ -873,116 +961,109 @@ def _install_github(spec, subpath=None, skill_name=None, branch="main", source=" shutil.rmtree(target_dir) shutil.copytree(api_dest, target_dir) _register_installed_skill(skill_name, source=source) - _print_install_success(skill_name, source) + result.installed.append(skill_name) + result.messages.append(f"Installed '{skill_name}' from GitHub.") except Exception as e: - click.echo(f"Error: Contents API also failed: {e}", err=True) - sys.exit(1) + raise SkillInstallError(f"Contents API also failed: {e}") -def _install_from_repo_root(repo_root, spec, subpath, skill_name, skills_dir, source): +def _install_from_repo_root(repo_root, spec, subpath, skill_name, skills_dir, source, result: InstallResult): """Install skill(s) from an already-extracted repo root directory.""" if subpath: source_dir = os.path.join(repo_root, subpath.strip("/")) if not os.path.isdir(source_dir): - click.echo(f"Error: Path '{subpath}' not found in repository.", err=True) - sys.exit(1) + raise SkillInstallError(f"Path '{subpath}' not found in repository.") - if not skill_name: - fm = _parse_skill_frontmatter( - _read_file_text(os.path.join(source_dir, "SKILL.md")) - ) - skill_name = fm.get("name") or subpath.rstrip("/").split("/")[-1] - _validate_skill_name(skill_name) + if os.path.isfile(os.path.join(source_dir, "SKILL.md")): + if not skill_name: + fm = _parse_skill_frontmatter( + _read_file_text(os.path.join(source_dir, "SKILL.md")) + ) + skill_name = fm.get("name") or subpath.rstrip("/").split("/")[-1] + _check_skill_name(skill_name) - target_dir = os.path.join(skills_dir, skill_name) - if os.path.exists(target_dir): - shutil.rmtree(target_dir) - shutil.copytree(source_dir, target_dir) - _register_installed_skill(skill_name, source=source) - _print_install_success(skill_name, source) + target_dir = os.path.join(skills_dir, skill_name) + if os.path.exists(target_dir): + shutil.rmtree(target_dir) + shutil.copytree(source_dir, target_dir) + _register_installed_skill(skill_name, source=source) + result.installed.append(skill_name) + result.messages.append(f"Installed '{skill_name}' from {source}.") + return + + discovered = _scan_skills_in_dir(source_dir) + if discovered: + _batch_install_skills(discovered, spec, skills_dir, source, result) + return + + raise SkillInstallError(f"No SKILL.md found in '{subpath}' or its subdirectories.") else: - # Auto-discover all skills in the repo discovered = _scan_skills_in_repo(repo_root) if not discovered: if skill_name: - _validate_skill_name(skill_name) + _check_skill_name(skill_name) else: skill_name = spec.split("/")[-1] - _validate_skill_name(skill_name) + _check_skill_name(skill_name) target_dir = os.path.join(skills_dir, skill_name) if os.path.exists(target_dir): shutil.rmtree(target_dir) shutil.copytree(repo_root, target_dir) _register_installed_skill(skill_name, source=source) - _print_install_success(skill_name, source) + result.installed.append(skill_name) + result.messages.append(f"Installed '{skill_name}' from {source}.") return - click.echo(f"Found {len(discovered)} skill(s) in {spec}:\n") - installed_names = [] - for sname, sdir in discovered: - safe_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', sname)[:64] - if not _SAFE_NAME_RE.match(safe_name): - click.echo(click.style(f" ✗ Skipping '{sname}' (invalid name)", fg="yellow")) - continue - target_dir = os.path.join(skills_dir, safe_name) - if os.path.exists(target_dir): - shutil.rmtree(target_dir) - shutil.copytree(sdir, target_dir) - _register_installed_skill(safe_name, source=source) - installed_names.append(safe_name) - - if installed_names: - click.echo("") - for n in installed_names: - _print_install_success(n, source) - click.echo(f"\n {len(installed_names)} skill(s) installed from {spec}.") - else: - click.echo("No valid skills found in repository.") + _batch_install_skills(discovered, spec, skills_dir, source, result) -def _install_gitlab(spec, subpath=None, branch="main"): +def _install_gitlab(spec, result: InstallResult, subpath=None, branch="main"): """Install skill(s) from a GitLab repo via zip download.""" - _validate_github_spec(spec) + _check_github_spec(spec) skills_dir = get_skills_dir() os.makedirs(skills_dir, exist_ok=True) - click.echo(f"Downloading from GitLab: {spec} (branch: {branch})...") + result.messages.append(f"Downloading from GitLab: {spec} (branch: {branch})...") try: tmp_dir, repo_root = _download_repo_zip(spec, branch, host="gitlab") except Exception as e: - click.echo(f"Error: Failed to download from GitLab: {e}", err=True) - sys.exit(1) + raise SkillInstallError(f"Failed to download from GitLab: {e}") try: - _install_from_repo_root(repo_root, spec, subpath, None, skills_dir, "gitlab") + _install_from_repo_root(repo_root, spec, subpath, None, skills_dir, "gitlab", result) finally: shutil.rmtree(tmp_dir, ignore_errors=True) -def _install_git_clone(git_url: str, display_name: str = ""): +def _install_git_clone(git_url: str, result: InstallResult, display_name: str = ""): """Install skill(s) from any git URL via shallow clone.""" skills_dir = get_skills_dir() os.makedirs(skills_dir, exist_ok=True) - click.echo(f"Cloning {display_name or git_url} ...") + result.messages.append(f"Cloning {display_name or git_url} ...") try: tmp_dir, repo_root = _clone_repo(git_url) except RuntimeError as e: - click.echo(f"Error: {e}", err=True) - sys.exit(1) + raise SkillInstallError(str(e)) try: - _install_from_repo_root(repo_root, display_name or git_url, None, None, skills_dir, "git") + _install_from_repo_root(repo_root, display_name or git_url, None, None, skills_dir, "git", result) finally: shutil.rmtree(tmp_dir, ignore_errors=True) -def _install_zip_bytes(content, name, skills_dir): - """Extract a zip archive into the skills directory.""" +def _install_zip_bytes(content, name, skills_dir, result: InstallResult = None, source_label: str = "zip"): + """Extract a zip archive and install skill(s). + + Supports three scenarios: + 1. Root contains SKILL.md → single skill install + 2. Contains multiple skill dirs (skills/, or immediate children with SKILL.md) → batch install + 3. Fallback → treat the entire archive as a single skill named `name` + """ with tempfile.TemporaryDirectory() as tmp_dir: zip_path = os.path.join(tmp_dir, "package.zip") with open(zip_path, "wb") as f: @@ -993,14 +1074,35 @@ def _install_zip_bytes(content, name, skills_dir): _safe_extractall(zf, extract_dir) top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")] - source = extract_dir + pkg_root = extract_dir if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])): - source = os.path.join(extract_dir, top_items[0]) + pkg_root = os.path.join(extract_dir, top_items[0]) + + discovered = _scan_skills_in_repo(pkg_root) or _scan_skills_in_dir(pkg_root) + + if discovered and len(discovered) > 1 and result is not None: + _batch_install_skills(discovered, name, skills_dir, source_label, result) + return + + if discovered and len(discovered) == 1: + sname, sdir = discovered[0] + safe_name = re.sub(r'[^a-zA-Z0-9_\-]', '-', sname)[:64] + if not _SAFE_NAME_RE.match(safe_name): + safe_name = name + target = os.path.join(skills_dir, safe_name) + if os.path.exists(target): + shutil.rmtree(target) + shutil.copytree(sdir, target) + _register_installed_skill(safe_name, source=source_label) + if result is not None: + result.installed.append(safe_name) + result.messages.append(f"Installed '{safe_name}' from {source_label}.") + return target = os.path.join(skills_dir, name) if os.path.exists(target): shutil.rmtree(target) - shutil.copytree(source, target) + shutil.copytree(pkg_root, target) diff --git a/plugins/cow_cli/cow_cli.py b/plugins/cow_cli/cow_cli.py index 7d2fce09..97f722d8 100644 --- a/plugins/cow_cli/cow_cli.py +++ b/plugins/cow_cli/cow_cli.py @@ -599,239 +599,38 @@ class CowCliPlugin(Plugin): return "请指定要安装的技能: /skill install <名称>" try: - from cli.utils import get_skills_dir, SKILL_HUB_API - from cli.commands.skill import _parse_github_url, _download_github_dir - import requests - import shutil - import zipfile - import tempfile + from cli.commands.skill import install_skill + result = install_skill(name) - skills_dir = get_skills_dir() - os.makedirs(skills_dir, exist_ok=True) + if result.error: + return f"安装失败: {result.error}" - if name.startswith(("http://", "https://")) and name.rstrip("/").endswith("SKILL.md"): - import re as re_mod - dir_url = re_mod.sub(r'/SKILL\.md/?$', '', name) - gh = _parse_github_url(dir_url) - if gh: - owner, repo, branch, subpath = gh - spec = f"{owner}/{repo}" - skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo - return self._skill_install_github( - spec, skills_dir, subpath=subpath, skill_name=skill_name, branch=branch - ) - return self._skill_install_url(name, skills_dir) + if not result.installed: + return "\n".join(result.messages) if result.messages else "未找到可安装的技能" - parsed = _parse_github_url(name) - if parsed: - owner, repo, branch, subpath = parsed - spec = f"{owner}/{repo}" - skill_name = subpath.rstrip("/").split("/")[-1] if subpath else repo - return self._skill_install_github( - spec, skills_dir, subpath=subpath, skill_name=skill_name, branch=branch - ) - - provider = None - if name.startswith("github:"): - name = name[7:] - elif name.startswith("clawhub:"): - name = name[8:] - provider = "clawhub" - - body = {} - if provider: - body["provider"] = provider - resp = requests.post( - f"{SKILL_HUB_API}/skills/{name}/download", - json=body, - timeout=15, - ) - resp.raise_for_status() - - content_type = resp.headers.get("Content-Type", "") - - if "application/json" in content_type: - data = resp.json() - source_type = data.get("source_type") - if source_type == "github" or "redirect" in data: - source_url = data.get("source_url", "") - parsed_url = _parse_github_url(source_url) - if parsed_url: - owner, repo, branch, subpath = parsed_url - return self._skill_install_github( - f"{owner}/{repo}", skills_dir, subpath=subpath, - skill_name=name, branch=branch - ) - return self._skill_install_github(source_url, skills_dir, skill_name=name) - if source_type == "registry": - download_url = data.get("download_url") - if not download_url: - return f"此技能来自不支持的注册表,无法自动安装。" - from urllib.parse import urlparse - if urlparse(download_url).scheme != "https": - return "安装失败: 下载地址不安全 (非 HTTPS)" - provider = data.get("source_provider", "registry") - try: - dl_resp = requests.get(download_url, timeout=60, allow_redirects=True) - dl_resp.raise_for_status() - except Exception as e: - return f"从 {provider} 下载失败: {e}" - self._extract_zip(dl_resp.content, name, skills_dir) - self._register_skill(name, source=provider) - return self._format_install_success(name, provider) - - elif "application/zip" in content_type: - self._extract_zip(resp.content, name, skills_dir) - self._register_skill(name, source="cowhub") - return self._format_install_success(name, "cowhub") - - return "技能商店返回了未预期的响应格式" - - except requests.HTTPError as e: - if e.response is not None and e.response.status_code == 404: - return f"技能 '{name}' 未在技能商店中找到" - return f"安装失败: {e}" + return self._format_install_result(result) except Exception as e: return f"安装失败: {e}" - def _skill_install_url(self, url: str, skills_dir: str) -> str: - """Install a skill from a direct SKILL.md URL.""" - import requests - from cli.commands.skill import _parse_skill_frontmatter - - try: - resp = requests.get(url, timeout=30) - resp.raise_for_status() - except Exception as e: - return f"下载 SKILL.md 失败: {e}" - - content = resp.text - fm = _parse_skill_frontmatter(content) - skill_name = fm.get("name") - if not skill_name: - return "SKILL.md 中未找到 name 字段,无法安装" - - skill_name = skill_name.strip() - skill_dir = os.path.join(skills_dir, skill_name) - os.makedirs(skill_dir, exist_ok=True) - - with open(os.path.join(skill_dir, "SKILL.md"), "w", encoding="utf-8") as f: - f.write(content) - - self._register_skill(skill_name, source="url") - return self._format_install_success(skill_name, "url") - - def _skill_install_github(self, spec: str, skills_dir: str, - subpath: str = None, skill_name: str = None, - branch: str = "main") -> str: - import requests - import shutil - import zipfile - import tempfile - from cli.commands.skill import _download_github_dir - - if "#" in spec and not subpath: - spec, subpath = spec.split("#", 1) - if not skill_name: - skill_name = subpath.rstrip("/").split("/")[-1] if subpath else spec.split("/")[-1] - - owner, repo = spec.split("/", 1) - target_dir = os.path.join(skills_dir, skill_name) - - # For subpath installs, try Contents API first - if subpath: - try: - with tempfile.TemporaryDirectory() as tmp_dir: - api_dest = os.path.join(tmp_dir, skill_name) - os.makedirs(api_dest) - _download_github_dir(owner, repo, branch, subpath.strip("/"), api_dest) - if os.path.exists(target_dir): - shutil.rmtree(target_dir) - shutil.copytree(api_dest, target_dir) - self._register_skill(skill_name, source="github") - return self._format_install_success(skill_name, "github") - except Exception: - pass # fall through to zip download - - # Fallback: download full repo zip - zip_url = f"https://github.com/{spec}/archive/refs/heads/{branch}.zip" - try: - resp = requests.get(zip_url, timeout=60, allow_redirects=True) - resp.raise_for_status() - except Exception as e: - return f"从 GitHub 下载失败: {e}" - - with tempfile.TemporaryDirectory() as tmp_dir: - zip_path = os.path.join(tmp_dir, "repo.zip") - with open(zip_path, "wb") as f: - f.write(resp.content) - - extract_dir = os.path.join(tmp_dir, "extracted") - with zipfile.ZipFile(zip_path, "r") as zf: - zf.extractall(extract_dir) - - top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")] - repo_root = extract_dir - if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])): - repo_root = os.path.join(extract_dir, top_items[0]) - - if subpath: - source_dir = os.path.join(repo_root, subpath.strip("/")) - if not os.path.isdir(source_dir): - return f"路径 '{subpath}' 在仓库中不存在" - else: - source_dir = repo_root - - if os.path.exists(target_dir): - shutil.rmtree(target_dir) - shutil.copytree(source_dir, target_dir) - - self._register_skill(skill_name, source="github") - return self._format_install_success(skill_name, "github") - - def _extract_zip(self, content: bytes, name: str, skills_dir: str): - import zipfile - import tempfile - import shutil - - with tempfile.TemporaryDirectory() as tmp_dir: - zip_path = os.path.join(tmp_dir, "package.zip") - with open(zip_path, "wb") as f: - f.write(content) - - extract_dir = os.path.join(tmp_dir, "extracted") - with zipfile.ZipFile(zip_path, "r") as zf: - zf.extractall(extract_dir) - - top_items = [d for d in os.listdir(extract_dir) if not d.startswith(".")] - source = extract_dir - if len(top_items) == 1 and os.path.isdir(os.path.join(extract_dir, top_items[0])): - source = os.path.join(extract_dir, top_items[0]) - - target = os.path.join(skills_dir, name) - if os.path.exists(target): - shutil.rmtree(target) - shutil.copytree(source, target) - @staticmethod - def _register_skill(name: str, source: str = "cowhub"): - try: - from cli.commands.skill import _register_installed_skill - _register_installed_skill(name, source=source) - except Exception: - pass - - @staticmethod - def _format_install_success(name: str, source: str) -> str: + def _format_install_result(result) -> str: + """Format InstallResult into a chat-friendly message.""" from cli.commands.skill import _read_skill_description from cli.utils import get_skills_dir - desc = _read_skill_description(os.path.join(get_skills_dir(), name)) - lines = [f"✅ {name}"] - if desc: - if len(desc) > 60: - desc = desc[:57] + "…" - lines.append(f" {desc}") - lines.append(f" 来源: {source}") + skills_dir = get_skills_dir() + + lines = [] + for skill_name in result.installed: + desc = _read_skill_description(os.path.join(skills_dir, skill_name)) + lines.append(f"✅ {skill_name}") + if desc: + if len(desc) > 60: + desc = desc[:57] + "…" + lines.append(f" {desc}") + + if len(result.installed) > 1: + lines.append(f"\n共安装 {len(result.installed)} 个技能") + return "\n".join(lines) def _skill_uninstall(self, name: str) -> str: