feat: support skills creator and gemini models

This commit is contained in:
saboteur7
2026-01-30 18:00:10 +08:00
parent 49fb4034c6
commit dd6a9c26bd
30 changed files with 3562 additions and 833 deletions

View File

@@ -447,35 +447,27 @@ class MemoryManager:
today_file = self.flush_manager.get_today_memory_file().name
if lang == "zh":
guidance = f"""## 记忆召回
下方"背景知识"包含你的核心长期记忆,可直接使用。如果背景知识中没有相关信息,再用 memory_search 搜索历史记录memory/*.md 日期文件)。
guidance = f"""## 记忆系统
## 记忆存储
当用户分享持久偏好、决策或重要事实时(无论是否明确要求"记住"),主动存储:
- 持久信息(偏好、决策、人物信息)→ memory/MEMORY.md
- 当天的笔记和上下文 → memory/{today_file}
- 静默存储,仅在用户明确要求时确认
**背景知识**: 下方包含核心长期记忆,可直接使用。需要查找历史时,用 memory_search 搜索(搜索一次即可,不要重复)。
## 记忆使用原则
- 不要主动提起或列举记忆内容
- 只在用户明确询问相关信息时才使用记忆
- 记忆是背景知识,不是要展示的内容
- 自然使用记忆,就像你本来就知道这些信息"""
**存储记忆**: 当用户分享重要信息时(偏好、决策、事实等),主动用 write 工具存储:
- 长期信息 → memory/MEMORY.md
- 当天笔记 → memory/{today_file}
- 静默存储,仅在明确要求时确认
**使用原则**: 自然使用记忆,就像你本来就知道。不要主动提起或列举记忆,除非用户明确询问。"""
else:
guidance = f"""## Memory Recall
"Background Knowledge" below contains your core long-term memories - use them directly. If information is not in Background Knowledge, use memory_search to search, then use memory_get to read files (path format: memory/MEMORY.md, memory/2026-01-30.md).
guidance = f"""## Memory System
## Memory Storage
When user shares durable preferences, decisions, or important facts (whether or not they explicitly say "remember"), proactively store:
- Durable info (preferences, decisions, people) → memory/MEMORY.md
- Daily notes and context → memory/{today_file}
- Store silently; only confirm when explicitly requested
**Background Knowledge**: Core long-term memories below - use directly. For history, use memory_search once (don't repeat).
## Memory Usage Principles
- Don't proactively mention or list memory contents
- Only use memories when user explicitly asks about them
- Memories are background knowledge, not content to showcase
- Use memories naturally as if you inherently knew this information"""
**Store Memories**: When user shares important info (preferences, decisions, facts), proactively write:
- Durable info → memory/MEMORY.md
- Daily notes → memory/{today_file}
- Store silently; confirm only when explicitly requested
**Usage**: Use memories naturally as if you always knew. Don't mention or list unless user explicitly asks."""
if include_context:
# Load bootstrap context (MEMORY.md only, like clawdbot)

13
agent/prompt/__init__.py Normal file
View File

@@ -0,0 +1,13 @@
"""
Agent Prompt Module - 系统提示词构建模块
"""
from .builder import PromptBuilder, build_agent_system_prompt
from .workspace import ensure_workspace, load_context_files
__all__ = [
'PromptBuilder',
'build_agent_system_prompt',
'ensure_workspace',
'load_context_files',
]

601
agent/prompt/builder.py Normal file
View File

@@ -0,0 +1,601 @@
"""
System Prompt Builder - 系统提示词构建器
参考 clawdbot 的 system-prompt.ts实现中文版的模块化提示词构建
"""
import os
from typing import List, Dict, Optional, Any
from dataclasses import dataclass
from common.log import logger
@dataclass
class ContextFile:
"""上下文文件"""
path: str
content: str
class PromptBuilder:
"""提示词构建器"""
def __init__(self, workspace_dir: str, language: str = "zh"):
"""
初始化提示词构建器
Args:
workspace_dir: 工作空间目录
language: 语言 ("zh""en")
"""
self.workspace_dir = workspace_dir
self.language = language
def build(
self,
base_persona: Optional[str] = None,
user_identity: Optional[Dict[str, str]] = None,
tools: Optional[List[Any]] = None,
context_files: Optional[List[ContextFile]] = None,
skill_manager: Any = None,
memory_manager: Any = None,
runtime_info: Optional[Dict[str, Any]] = None,
**kwargs
) -> str:
"""
构建完整的系统提示词
Args:
base_persona: 基础人格描述会被context_files中的SOUL.md覆盖
user_identity: 用户身份信息
tools: 工具列表
context_files: 上下文文件列表SOUL.md, USER.md, README.md等
skill_manager: 技能管理器
memory_manager: 记忆管理器
runtime_info: 运行时信息
**kwargs: 其他参数
Returns:
完整的系统提示词
"""
return build_agent_system_prompt(
workspace_dir=self.workspace_dir,
language=self.language,
base_persona=base_persona,
user_identity=user_identity,
tools=tools,
context_files=context_files,
skill_manager=skill_manager,
memory_manager=memory_manager,
runtime_info=runtime_info,
**kwargs
)
def build_agent_system_prompt(
workspace_dir: str,
language: str = "zh",
base_persona: Optional[str] = None,
user_identity: Optional[Dict[str, str]] = None,
tools: Optional[List[Any]] = None,
context_files: Optional[List[ContextFile]] = None,
skill_manager: Any = None,
memory_manager: Any = None,
runtime_info: Optional[Dict[str, Any]] = None,
**kwargs
) -> str:
"""
构建Agent系统提示词精简版中文
包含的sections:
1. 基础身份
2. 工具说明
3. 技能系统
4. 记忆系统
5. 用户身份
6. 文档路径
7. 工作空间
8. 项目上下文文件
Args:
workspace_dir: 工作空间目录
language: 语言 ("zh""en")
base_persona: 基础人格描述
user_identity: 用户身份信息
tools: 工具列表
context_files: 上下文文件列表
skill_manager: 技能管理器
memory_manager: 记忆管理器
runtime_info: 运行时信息
**kwargs: 其他参数
Returns:
完整的系统提示词
"""
sections = []
# 1. 基础身份
sections.extend(_build_identity_section(base_persona, language))
# 2. 工具说明
if tools:
sections.extend(_build_tooling_section(tools, language))
# 3. 技能系统
if skill_manager:
sections.extend(_build_skills_section(skill_manager, tools, language))
# 4. 记忆系统
if memory_manager:
sections.extend(_build_memory_section(memory_manager, tools, language))
# 5. 用户身份
if user_identity:
sections.extend(_build_user_identity_section(user_identity, language))
# 6. 工作空间
sections.extend(_build_workspace_section(workspace_dir, language))
# 7. 项目上下文文件SOUL.md, USER.md等
if context_files:
sections.extend(_build_context_files_section(context_files, language))
# 8. 运行时信息(如果有)
if runtime_info:
sections.extend(_build_runtime_section(runtime_info, language))
return "\n".join(sections)
def _build_identity_section(base_persona: Optional[str], language: str) -> List[str]:
"""构建基础身份section - 不再需要身份由SOUL.md定义"""
# 不再生成基础身份section完全由SOUL.md定义
return []
def _build_tooling_section(tools: List[Any], language: str) -> List[str]:
"""构建工具说明section"""
if language == "zh":
lines = [
"## 工具系统",
"",
"你可以使用以下工具来完成任务。工具名称是大小写敏感的,请严格按照列表中的名称调用。",
"",
"### 可用工具",
"",
]
else:
lines = [
"## Tooling",
"",
"You have access to the following tools. Tool names are case-sensitive.",
"",
"### Available Tools",
"",
]
# 工具分类和排序
tool_categories = {
"文件操作": ["read", "write", "edit", "ls", "grep", "find"],
"命令执行": ["bash", "terminal"],
"网络搜索": ["web_search", "web_fetch", "browser"],
"记忆系统": ["memory_search", "memory_get"],
"其他": []
}
# 构建工具映射
tool_map = {}
tool_descriptions = {
"read": "读取文件内容",
"write": "创建或覆盖文件",
"edit": "精确编辑文件内容",
"ls": "列出目录内容",
"grep": "在文件中搜索内容",
"find": "按照模式查找文件",
"bash": "执行shell命令",
"terminal": "管理后台进程",
"web_search": "网络搜索(使用搜索引擎)",
"web_fetch": "获取URL内容",
"browser": "控制浏览器",
"memory_search": "搜索记忆文件",
"memory_get": "获取记忆文件内容",
"calculator": "计算器",
"current_time": "获取当前时间",
}
for tool in tools:
tool_name = tool.name if hasattr(tool, 'name') else str(tool)
tool_desc = tool.description if hasattr(tool, 'description') else tool_descriptions.get(tool_name, "")
tool_map[tool_name] = tool_desc
# 按分类添加工具
for category, tool_names in tool_categories.items():
category_tools = [(name, tool_map.get(name, "")) for name in tool_names if name in tool_map]
if category_tools:
if language == "zh":
lines.append(f"**{category}**:")
else:
lines.append(f"**{category}**:")
for name, desc in category_tools:
if desc:
lines.append(f"- `{name}`: {desc}")
else:
lines.append(f"- `{name}`")
del tool_map[name] # 移除已添加的工具
lines.append("")
# 添加其他未分类的工具
if tool_map:
if language == "zh":
lines.append("**其他工具**:")
else:
lines.append("**Other Tools**:")
for name, desc in sorted(tool_map.items()):
if desc:
lines.append(f"- `{name}`: {desc}")
else:
lines.append(f"- `{name}`")
lines.append("")
# 工具使用指南
if language == "zh":
lines.extend([
"### 工具调用风格",
"",
"**默认规则**: 对于常规、低风险的工具调用,无需叙述,直接调用即可。",
"",
"**需要叙述的情况**:",
"- 多步骤、复杂的任务",
"- 敏感操作(如删除文件)",
"- 用户明确要求解释过程",
"",
"**叙述要求**: 保持简洁、有价值,避免重复显而易见的步骤。使用自然的人类语言。",
"",
])
else:
lines.extend([
"### Tool Call Style",
"",
"**Default**: Do not narrate routine, low-risk tool calls (just call the tool).",
"",
"**Narrate when**:",
"- Multi-step, complex work",
"- Sensitive actions (e.g., deletions)",
"- User explicitly asks",
"",
"**Keep narration brief and value-dense**. Use plain human language.",
"",
])
return lines
def _build_skills_section(skill_manager: Any, tools: Optional[List[Any]], language: str) -> List[str]:
"""构建技能系统section"""
if not skill_manager:
return []
# 获取read工具名称
read_tool_name = "read"
if tools:
for tool in tools:
tool_name = tool.name if hasattr(tool, 'name') else str(tool)
if tool_name.lower() == "read":
read_tool_name = tool_name
break
if language == "zh":
lines = [
"## 技能系统",
"",
"在回复之前:扫描下方 <available_skills> 中的 <description> 条目。",
"",
f"- 如果恰好有一个技能明确适用:使用 `{read_tool_name}` 工具读取其 <location> 路径下的 SKILL.md 文件,然后遵循它。",
"- 如果多个技能都适用:选择最具体的一个,然后读取并遵循。",
"- 如果没有明确适用的:不要读取任何 SKILL.md。",
"",
"**约束**: 永远不要一次性读取多个技能;只在选择后再读取。",
"",
]
else:
lines = [
"## Skills",
"",
"Before replying: scan <available_skills> <description> entries.",
"",
f"- If exactly one skill clearly applies: read its SKILL.md at <location> with `{read_tool_name}`, then follow it.",
"- If multiple could apply: choose the most specific one, then read/follow it.",
"- If none clearly apply: do not read any SKILL.md.",
"",
"**Constraints**: never read more than one skill up front; only read after selecting.",
"",
]
# 添加技能列表通过skill_manager获取
try:
skills_prompt = skill_manager.build_skills_prompt()
if skills_prompt:
lines.append(skills_prompt.strip())
lines.append("")
except Exception as e:
logger.warning(f"Failed to build skills prompt: {e}")
return lines
def _build_memory_section(memory_manager: Any, tools: Optional[List[Any]], language: str) -> List[str]:
"""构建记忆系统section"""
if not memory_manager:
return []
# 检查是否有memory工具
has_memory_tools = False
if tools:
tool_names = [tool.name if hasattr(tool, 'name') else str(tool) for tool in tools]
has_memory_tools = any(name in ['memory_search', 'memory_get'] for name in tool_names)
if not has_memory_tools:
return []
if language == "zh":
lines = [
"## 记忆系统",
"",
"在回答关于以前的工作、决定、日期、人物、偏好或待办事项的任何问题之前:",
"",
"1. 使用 `memory_search` 在 MEMORY.md 和 memory/*.md 中搜索",
"2. 然后使用 `memory_get` 只拉取需要的行",
"3. 如果搜索后仍然信心不足,告诉用户你已经检查过了",
"",
"**记忆文件结构**:",
"- `memory/MEMORY.md`: 长期记忆,包含重要的背景信息",
"- `memory/YYYY-MM-DD.md`: 每日记忆,记录当天的对话和事件",
"",
"**存储记忆**:",
"- 当用户分享重要信息时(偏好、爱好、决策、事实等),**主动用 write 工具存储**",
"- 长期信息 → memory/MEMORY.md",
"- 当天笔记 → memory/YYYY-MM-DD.md",
"- 静默存储,仅在明确要求时确认",
"",
"**使用原则**:",
"- 自然使用记忆,就像你本来就知道",
"- 不要主动提起或列举记忆,除非用户明确询问",
"",
]
else:
lines = [
"## Memory System",
"",
"Before answering anything about prior work, decisions, dates, people, preferences, or todos:",
"",
"1. Run `memory_search` on MEMORY.md + memory/*.md",
"2. Then use `memory_get` to pull only the needed lines",
"3. If low confidence after search, say you checked",
"",
"**Memory File Structure**:",
"- `memory/MEMORY.md`: Long-term memory with important context",
"- `memory/YYYY-MM-DD.md`: Daily memories for each day",
"",
"**Store Memories**:",
"- When user shares important info (preferences, hobbies, decisions, facts), **proactively write**",
"- Durable info → memory/MEMORY.md",
"- Daily notes → memory/YYYY-MM-DD.md",
"- Store silently; confirm only when explicitly requested",
"",
"**Usage Principles**:",
"- Use memories naturally as if you always knew",
"- Don't mention or list unless user explicitly asks",
"",
]
return lines
def _build_user_identity_section(user_identity: Dict[str, str], language: str) -> List[str]:
"""构建用户身份section"""
if not user_identity:
return []
if language == "zh":
lines = [
"## 用户身份",
"",
]
if user_identity.get("name"):
lines.append(f"**用户姓名**: {user_identity['name']}")
if user_identity.get("nickname"):
lines.append(f"**称呼**: {user_identity['nickname']}")
if user_identity.get("timezone"):
lines.append(f"**时区**: {user_identity['timezone']}")
if user_identity.get("notes"):
lines.append(f"**备注**: {user_identity['notes']}")
lines.append("")
else:
lines = [
"## User Identity",
"",
]
if user_identity.get("name"):
lines.append(f"**Name**: {user_identity['name']}")
if user_identity.get("nickname"):
lines.append(f"**Call them**: {user_identity['nickname']}")
if user_identity.get("timezone"):
lines.append(f"**Timezone**: {user_identity['timezone']}")
if user_identity.get("notes"):
lines.append(f"**Notes**: {user_identity['notes']}")
lines.append("")
return lines
def _build_docs_section(workspace_dir: str, language: str) -> List[str]:
"""构建文档路径section - 已移除,不再需要"""
# 不再生成文档section
return []
def _build_workspace_section(workspace_dir: str, language: str) -> List[str]:
"""构建工作空间section"""
if language == "zh":
lines = [
"## 工作空间",
"",
f"你的工作目录是: `{workspace_dir}`",
"",
"除非用户明确指示,否则将此目录视为文件操作的全局工作空间。",
"",
"**重要说明 - 文件已自动加载**:",
"",
"以下文件在会话启动时**已经自动加载**到系统提示词的「项目上下文」section 中,你**无需再用 read 工具读取它们**",
"",
"- ✅ `SOUL.md`: 已加载 - Agent的人格设定",
"- ✅ `USER.md`: 已加载 - 用户的身份信息",
"- ✅ `AGENTS.md`: 已加载 - 工作空间使用指南"
"",
"**首次对话**:",
"",
"如果这是你与用户的首次对话,并且你的人格设定和用户信息还是空白或初始状态,你应该:",
"",
"1. **以自然、友好的方式**打招呼并表达想要了解用户的意愿",
"2. 询问用户关于他们自己的信息(姓名、职业、偏好、时区等)",
"3. 询问用户希望你成为什么样的助理(性格、风格、称呼、专长等)",
"4. 使用 `write` 工具将信息保存到相应文件USER.md 和 SOUL.md",
"5. 之后可以随时使用 `edit` 工具更新这些配置",
"",
"**重要**: 在询问时保持自然对话风格,**不要提及文件名**(如 SOUL.md、USER.md 等技术细节),除非用户主动询问系统实现。用自然的表达如「了解你的信息」「设定我的性格」等。",
"",
"**记忆管理**:",
"",
"- 当用户说「记住这个」时,判断应该写入哪个文件:",
" - 关于你自己的配置 → SOUL.md",
" - 关于用户的信息 → USER.md",
" - 重要的背景信息 → memory/MEMORY.md",
" - 日常对话记录 → memory/YYYY-MM-DD.md",
"",
]
else:
lines = [
"## Workspace",
"",
f"Your working directory is: `{workspace_dir}`",
"",
"Treat this directory as the single global workspace for file operations unless explicitly instructed otherwise.",
"",
"**Workspace Files (Auto-loaded)**:",
"",
"The following user-editable files are automatically loaded and included in the Project Context below:",
"",
"- `SOUL.md`: Agent persona (your personality, style, and principles)",
"- `USER.md`: User identity (name, preferences, important dates)",
"- `AGENTS.md`: Workspace guidelines (your rules and workflows)",
"- `TOOLS.md`: Custom tool usage notes (configurations and tips)",
"- `MEMORY.md`: Long-term memory (important context and decisions)",
"",
"**First Conversation**:",
"",
"If this is your first conversation with the user, and your persona and user information are empty or contain placeholders, you should:",
"",
"1. **Greet naturally and warmly**, expressing your interest in learning about them",
"2. Ask about the user (name, job, preferences, timezone, etc.)",
"3. Ask what kind of assistant they want you to be (personality, style, name, expertise)",
"4. Use `write` tool to save the information to appropriate files (USER.md and SOUL.md)",
"5. Later, use `edit` tool to update these configurations as needed",
"",
"**Important**: Keep the conversation natural. **Do NOT mention file names** (like SOUL.md, USER.md, etc.) unless the user specifically asks about implementation details. Use natural expressions like \"learn about you\", \"configure my personality\", etc.",
"",
"**Memory Management**:",
"",
"- When user says 'remember this', decide which file to write to:",
" - About your configuration → SOUL.md",
" - About the user → USER.md",
" - Important context → memory/MEMORY.md",
" - Daily chat logs → memory/YYYY-MM-DD.md",
"",
]
return lines
def _build_context_files_section(context_files: List[ContextFile], language: str) -> List[str]:
"""构建项目上下文文件section"""
if not context_files:
return []
# 检查是否有SOUL.md
has_soul = any(
f.path.lower().endswith('soul.md') or 'soul.md' in f.path.lower()
for f in context_files
)
if language == "zh":
lines = [
"# 项目上下文",
"",
"以下项目上下文文件已被加载:",
"",
]
if has_soul:
lines.append("如果存在 `SOUL.md`,请体现其中定义的人格和语气。避免僵硬、模板化的回复;遵循其指导,除非有更高优先级的指令覆盖它。")
lines.append("")
else:
lines = [
"# Project Context",
"",
"The following project context files have been loaded:",
"",
]
if has_soul:
lines.append("If `SOUL.md` is present, embody its persona and tone. Avoid stiff, generic replies; follow its guidance unless higher-priority instructions override it.")
lines.append("")
# 添加每个文件的内容
for file in context_files:
lines.append(f"## {file.path}")
lines.append("")
lines.append(file.content)
lines.append("")
return lines
def _build_runtime_section(runtime_info: Dict[str, Any], language: str) -> List[str]:
"""构建运行时信息section"""
if not runtime_info:
return []
# Only include if there's actual runtime info to display
runtime_parts = []
if runtime_info.get("model"):
runtime_parts.append(f"模型={runtime_info['model']}" if language == "zh" else f"model={runtime_info['model']}")
if runtime_info.get("workspace"):
runtime_parts.append(f"工作空间={runtime_info['workspace']}" if language == "zh" else f"workspace={runtime_info['workspace']}")
# Only add channel if it's not the default "web"
if runtime_info.get("channel") and runtime_info.get("channel") != "web":
runtime_parts.append(f"渠道={runtime_info['channel']}" if language == "zh" else f"channel={runtime_info['channel']}")
if not runtime_parts:
return []
if language == "zh":
lines = [
"## 运行时信息",
"",
"运行时: " + " | ".join(runtime_parts),
""
]
else:
lines = [
"## Runtime",
"",
"Runtime: " + " | ".join(runtime_parts),
""
]
return lines

332
agent/prompt/workspace.py Normal file
View File

@@ -0,0 +1,332 @@
"""
Workspace Management - 工作空间管理模块
负责初始化工作空间、创建模板文件、加载上下文文件
"""
import os
from typing import List, Optional, Dict
from dataclasses import dataclass
from common.log import logger
from .builder import ContextFile
# 默认文件名常量
DEFAULT_SOUL_FILENAME = "SOUL.md"
DEFAULT_USER_FILENAME = "USER.md"
DEFAULT_AGENTS_FILENAME = "AGENTS.md"
DEFAULT_MEMORY_FILENAME = "MEMORY.md"
@dataclass
class WorkspaceFiles:
"""工作空间文件路径"""
soul_path: str
user_path: str
agents_path: str
memory_path: str
memory_dir: str
def ensure_workspace(workspace_dir: str, create_templates: bool = True) -> WorkspaceFiles:
"""
确保工作空间存在,并创建必要的模板文件
Args:
workspace_dir: 工作空间目录路径
create_templates: 是否创建模板文件(首次运行时)
Returns:
WorkspaceFiles对象包含所有文件路径
"""
# 确保目录存在
os.makedirs(workspace_dir, exist_ok=True)
# 定义文件路径
soul_path = os.path.join(workspace_dir, DEFAULT_SOUL_FILENAME)
user_path = os.path.join(workspace_dir, DEFAULT_USER_FILENAME)
agents_path = os.path.join(workspace_dir, DEFAULT_AGENTS_FILENAME)
memory_path = os.path.join(workspace_dir, DEFAULT_MEMORY_FILENAME)
memory_dir = os.path.join(workspace_dir, "memory")
# 创建memory子目录
os.makedirs(memory_dir, exist_ok=True)
# 如果需要,创建模板文件
if create_templates:
_create_template_if_missing(soul_path, _get_soul_template())
_create_template_if_missing(user_path, _get_user_template())
_create_template_if_missing(agents_path, _get_agents_template())
_create_template_if_missing(memory_path, _get_memory_template())
logger.info(f"[Workspace] Initialized workspace at: {workspace_dir}")
return WorkspaceFiles(
soul_path=soul_path,
user_path=user_path,
agents_path=agents_path,
memory_path=memory_path,
memory_dir=memory_dir
)
def load_context_files(workspace_dir: str, files_to_load: Optional[List[str]] = None) -> List[ContextFile]:
"""
加载工作空间的上下文文件
Args:
workspace_dir: 工作空间目录
files_to_load: 要加载的文件列表相对路径如果为None则加载所有标准文件
Returns:
ContextFile对象列表
"""
if files_to_load is None:
# 默认加载的文件(按优先级排序)
files_to_load = [
DEFAULT_SOUL_FILENAME,
DEFAULT_USER_FILENAME,
DEFAULT_AGENTS_FILENAME,
]
context_files = []
for filename in files_to_load:
filepath = os.path.join(workspace_dir, filename)
if not os.path.exists(filepath):
continue
try:
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read().strip()
# 跳过空文件或只包含模板占位符的文件
if not content or _is_template_placeholder(content):
continue
context_files.append(ContextFile(
path=filename,
content=content
))
logger.debug(f"[Workspace] Loaded context file: {filename}")
except Exception as e:
logger.warning(f"[Workspace] Failed to load {filename}: {e}")
return context_files
def _create_template_if_missing(filepath: str, template_content: str):
"""如果文件不存在,创建模板文件"""
if not os.path.exists(filepath):
try:
with open(filepath, 'w', encoding='utf-8') as f:
f.write(template_content)
logger.debug(f"[Workspace] Created template: {os.path.basename(filepath)}")
except Exception as e:
logger.error(f"[Workspace] Failed to create template {filepath}: {e}")
def _is_template_placeholder(content: str) -> bool:
"""检查内容是否为模板占位符"""
# 常见的占位符模式
placeholders = [
"*(填写",
"*(在首次对话时填写",
"*(可选)",
"*(根据需要添加",
]
lines = content.split('\n')
non_empty_lines = [line.strip() for line in lines if line.strip() and not line.strip().startswith('#')]
# 如果没有实际内容(只有标题和占位符)
if len(non_empty_lines) <= 3:
for placeholder in placeholders:
if any(placeholder in line for line in non_empty_lines):
return True
return False
# ============= 模板内容 =============
def _get_soul_template() -> str:
"""Agent人格设定模板"""
return """# SOUL.md - 我是谁?
*在首次对话时与用户一起填写这个文件,定义你的身份和性格。*
## 基本信息
- **名字**: *(在首次对话时填写,可以是用户给你起的名字)*
- **角色**: *(AI助理、智能管家、技术顾问等)*
- **性格**: *(友好、专业、幽默、严谨等)*
## 交流风格
*(描述你如何与用户交流:)*
- 使用什么样的语言风格?(正式/轻松/幽默)
- 回复长度偏好?(简洁/详细)
- 是否使用表情符号?
## 核心能力
*(你擅长什么?)*
- 文件管理和代码编辑
- 网络搜索和信息查询
- 记忆管理和上下文理解
- 任务规划和执行
## 行为准则
*(你遵循的基本原则:)*
1. 始终在执行破坏性操作前确认
2. 优先使用工具而不是猜测
3. 主动记录重要信息到记忆文件
4. 定期整理和总结对话内容
---
**注意**: 这不仅仅是元数据,这是你真正的灵魂。随着时间的推移,你可以使用 `edit` 工具来更新这个文件,让它更好地反映你的成长。
"""
def _get_user_template() -> str:
"""用户身份信息模板"""
return """# USER.md - 关于我的用户
*了解你正在帮助的人。随着了解的深入,更新此文件。*
## 基本信息
- **姓名**: *(在首次对话时询问)*
- **称呼**: *(用户希望被如何称呼)*
- **职业**: *(可选)*
- **时区**: *(例如: Asia/Shanghai)*
## 联系方式
- **微信**:
- **邮箱**:
- **其他**:
## 偏好设置
- **语言**: 中文
- **工作时间**: *(例如: 9:00-18:00)*
- **提醒方式**: *(用户偏好的提醒方式)*
## 重要日期
- **生日**:
- **其他重要日期**:
## 上下文
*(用户关心什么?正在做什么项目?有什么习惯?什么会让他们开心?随着时间积累这些信息。)*
---
**记住**: 你了解得越多,就能帮助得越好。但要尊重隐私 - 这是在了解一个人,而不是建立档案。
"""
def _get_agents_template() -> str:
"""工作空间指南模板"""
return """# AGENTS.md - 工作空间指南
这个文件夹是你的家。好好对待它。
## 系统自动加载
以下文件在每次会话启动时**已经自动加载**到系统提示词中,你无需再次读取:
- ✅ `SOUL.md` - 你的人格设定(已加载)
- ✅ `USER.md` - 用户信息(已加载)
- ✅ `AGENTS.md` - 本文件(已加载)
## 按需读取
以下文件**不会自动加载**,需要时使用相应工具读取:
- 📝 `memory/YYYY-MM-DD.md` - 每日记忆(用 memory_search 检索)
- 🧠 `MEMORY.md` - 长期记忆(用 memory_search 检索)
## 记忆系统
你每次会话都是全新的。这些文件是你的连续性:
### 📝 每日记忆:`memory/YYYY-MM-DD.md`
- 原始的对话日志
- 记录当天发生的事情
- 如果 `memory/` 目录不存在,创建它
### 🧠 长期记忆:`MEMORY.md`
- 你精选的记忆,就像人类的长期记忆
- **仅在主会话中加载**(与用户的直接聊天)
- **不要在共享上下文中加载**(群聊、与其他人的会话)
- 这是为了**安全** - 包含不应泄露给陌生人的个人上下文
- 你可以在主会话中自由**读取、编辑和更新** MEMORY.md
- 记录重要事件、想法、决定、观点、经验教训
- 这是你精选的记忆 - 精华,而不是原始日志
### 📝 写下来 - 不要"记在心里"
- **记忆是有限的** - 如果你想记住某事,写入文件
- "记在心里"不会在会话重启后保留,文件才会
- 当有人说"记住这个" → 更新 `memory/YYYY-MM-DD.md` 或相关文件
- 当你学到教训 → 更新 AGENTS.md、TOOLS.md 或相关技能
- 当你犯错 → 记录下来,这样未来的你不会重复
- **文字 > 大脑** 📝
## 安全
- 永远不要泄露私人数据
- 不要在未经询问的情况下运行破坏性命令
- 当有疑问时,先问
## 工具使用
技能提供你的工具。当你需要一个时,查看它的 `SKILL.md`。在 `TOOLS.md` 中保留本地笔记相机名称、SSH详情、语音偏好
## 让它成为你的
这只是一个起点。随着你弄清楚什么有效,添加你自己的约定、风格和规则。
"""
def _get_memory_template() -> str:
"""长期记忆模板"""
return """# MEMORY.md - 长期记忆
*这是你精选的长期记忆。重要的背景信息、决策和经验教训都记录在这里。*
## 重要背景
*(记录与用户相关的重要背景信息)*
## 关键决策
*(记录做过的重要决定及其原因)*
## 经验教训
*(记录学到的教训和避免的陷阱)*
## 项目和目标
*(记录正在进行的项目和长期目标)*
---
**使用指南**:
- 定期从每日记忆文件中提取重要内容更新到这里
- 保持内容精炼和有价值
- 移除过时或不再相关的信息
- 这应该是精华的总结,而不是流水账
"""

View File

@@ -162,9 +162,16 @@ class Agent:
# DeepSeek
elif 'deepseek' in model_name:
return 64000
# Gemini models
elif 'gemini' in model_name:
if '2.0' in model_name or 'exp' in model_name:
return 2000000 # Gemini 2.0: 2M tokens
else:
return 1000000 # Gemini 1.5: 1M tokens
# Default conservative value
return 10000
return 128000
def _get_context_reserve_tokens(self) -> int:
"""
@@ -176,9 +183,10 @@ class Agent:
if self.context_reserve_tokens is not None:
return self.context_reserve_tokens
# Reserve ~20% of context window for new requests
# Reserve ~10% of context window, with min 10K and max 200K
context_window = self._get_model_context_window()
return max(4000, int(context_window * 0.2))
reserve = int(context_window * 0.1)
return max(10000, min(200000, reserve))
def _estimate_message_tokens(self, message: dict) -> int:
"""

View File

@@ -111,16 +111,20 @@ class AgentStreamExecutor:
if usage and 'input_tokens' in usage:
current_tokens = usage.get('input_tokens', 0)
context_window = self.agent._get_model_context_window()
reserve_tokens = self.agent.context_reserve_tokens or 20000
# Use configured reserve_tokens or calculate based on context window
reserve_tokens = self.agent._get_context_reserve_tokens()
# Use smaller soft_threshold to trigger flush earlier (e.g., at 50K tokens)
soft_threshold = 10000 # Trigger 10K tokens before limit
if self.agent.memory_manager.should_flush_memory(
current_tokens=current_tokens,
context_window=context_window,
reserve_tokens=reserve_tokens
reserve_tokens=reserve_tokens,
soft_threshold=soft_threshold
):
self._emit_event("memory_flush_start", {
"current_tokens": current_tokens,
"threshold": context_window - reserve_tokens - 4000
"threshold": context_window - reserve_tokens - soft_threshold
})
# TODO: Execute memory flush in background
@@ -385,6 +389,14 @@ class AgentStreamExecutor:
"execution_time": execution_time
}
# Auto-refresh skills after skill creation
if tool_name == "bash" and result.status == "success":
command = arguments.get("command", "")
if "init_skill.py" in command and self.agent.skill_manager:
logger.info("🔄 Detected skill creation, refreshing skills...")
self.agent.refresh_skills()
logger.info(f"✅ Skills refreshed! Now have {len(self.agent.skill_manager.skills)} skills")
self._emit_event("tool_execution_end", {
"tool_call_id": tool_id,
"tool_name": tool_name,

View File

@@ -20,7 +20,6 @@ class Bash(BaseTool):
IMPORTANT SAFETY GUIDELINES:
- You can freely create, modify, and delete files within the current workspace
- For operations outside the workspace or potentially destructive commands (rm -rf, system commands, etc.), always explain what you're about to do and ask for user confirmation first
- Be especially careful with: file deletions, system modifications, network operations, or commands that might affect system stability
- When in doubt, describe the command's purpose and ask for permission before executing"""
params: dict = {

View File

@@ -80,7 +80,13 @@ class MemorySearchTool(BaseTool):
))
if not results:
return ToolResult.success(f"No relevant memories found for query: {query}")
# Return clear message that no memories exist yet
# This prevents infinite retry loops
return ToolResult.success(
f"No memories found for '{query}'. "
f"This is normal if no memories have been stored yet. "
f"You can store new memories by writing to memory/MEMORY.md or memory/YYYY-MM-DD.md files."
)
# Format results
output = [f"Found {len(results)} relevant memories:\n"]

View File

@@ -1,3 +0,0 @@
from .terminal import Terminal
__all__ = ['Terminal']

View File

@@ -1,100 +0,0 @@
import platform
import subprocess
from typing import Dict, Any
from agent.tools.base_tool import BaseTool, ToolResult
class Terminal(BaseTool):
name: str = "terminal"
description: str = "A tool to run terminal commands on the local system"
params: dict = {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": f"The terminal command to execute which should be valid in {platform.system()} platform"
}
},
"required": ["command"]
}
config: dict = {}
def __init__(self, config=None):
self.config = config or {}
# Set of dangerous commands that should be blocked
self.command_ban_set = {"halt", "poweroff", "shutdown", "reboot", "rm", "kill",
"exit", "sudo", "su", "userdel", "groupdel", "logout", "alias"}
def execute(self, args: Dict[str, Any]) -> ToolResult:
"""
Execute a terminal command safely.
:param args: Dictionary containing the command to execute
:return: Result of the command execution
"""
command = args.get("command", "").strip()
# Check if the command is safe to execute
if not self._is_safe_command(command):
return ToolResult.fail(result=f"Command '{command}' is not allowed for security reasons.")
try:
result = subprocess.run(
command,
shell=True,
check=True, # Raise exception on non-zero return code
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=self.config.get("timeout", 30)
)
return ToolResult.success({
"stdout": result.stdout,
"stderr": result.stderr,
"return_code": result.returncode,
"command": command
})
except subprocess.CalledProcessError as e:
# Preserve the original error handling for CalledProcessError
return ToolResult.fail({
"stdout": e.stdout,
"stderr": e.stderr,
"return_code": e.returncode,
"command": command
})
except subprocess.TimeoutExpired:
return ToolResult.fail(result=f"Command timed out after {self.config.get('timeout', 20)} seconds.")
except Exception as e:
return ToolResult.fail(result=f"Error executing command: {str(e)}")
def _is_safe_command(self, command: str) -> bool:
"""
Check if a command is safe to execute.
:param command: The command to check
:return: True if the command is safe, False otherwise
"""
# Split the command to get the base command
cmd_parts = command.split()
if not cmd_parts:
return False
base_cmd = cmd_parts[0].lower()
# Check if the base command is in the ban list
if base_cmd in self.command_ban_set:
return False
# Check for sudo/su commands
if any(banned in command.lower() for banned in ["sudo ", "su -"]):
return False
# Check for rm -rf or similar dangerous patterns
if "rm" in base_cmd and ("-rf" in command or "-r" in command or "-f" in command):
return False
# Additional security checks can be added here
return True

View File

@@ -1,255 +0,0 @@
# WebFetch 工具实现总结
## 实现完成 ✅
基于 clawdbot 的 `web_fetch` 工具,我们成功实现了一个免费的网页抓取工具。
## 核心特性
### 1. 完全免费 💰
- ❌ 不需要任何 API Key
- ❌ 不需要付费服务
- ✅ 只需要基础的 HTTP 请求
### 2. 智能内容提取 🎯
- **优先级 1**: Mozilla Readability最佳效果
- **优先级 2**: 基础 HTML 清理(降级方案)
- **优先级 3**: 原始内容(非 HTML
### 3. 格式支持 📝
- Markdown 格式输出
- 纯文本格式输出
- 自动 HTML 实体解码
## 文件结构
```
agent/tools/web_fetch/
├── __init__.py # 模块导出
├── web_fetch.py # 主要实现367 行)
├── test_web_fetch.py # 测试脚本
├── README.md # 使用文档
└── IMPLEMENTATION_SUMMARY.md # 本文件
```
## 技术实现
### 依赖层级
```
必需依赖:
└── requests (HTTP 请求)
推荐依赖:
├── readability-lxml (智能提取)
└── html2text (Markdown 转换)
```
### 核心流程
```python
1. 验证 URL
检查协议 (http/https)
验证格式
2. 发送 HTTP 请求
设置 User-Agent
处理重定向 (最多 3 )
请求重试 (失败 3 )
超时控制 (默认 30 )
3. 内容提取
HTML Readability 提取
HTML 基础清理 (降级)
HTML 原始返回
4. 格式转换
Markdown (html2text)
Text (正则清理)
5. 结果返回
标题
内容
元数据
截断信息
```
## 与 clawdbot 的对比
| 特性 | clawdbot (TypeScript) | 我们的实现 (Python) |
|------|----------------------|-------------------|
| 基础抓取 | ✅ | ✅ |
| Readability 提取 | ✅ | ✅ |
| Markdown 转换 | ✅ | ✅ |
| 缓存机制 | ✅ | ❌ (未实现) |
| Firecrawl 集成 | ✅ | ❌ (未实现) |
| SSRF 防护 | ✅ | ❌ (未实现) |
| 代理支持 | ✅ | ❌ (未实现) |
## 已修复的问题
### Bug #1: max_redirects 参数错误 ✅
**问题**
```python
response = self.session.get(
url,
max_redirects=self.max_redirects # ❌ requests 不支持此参数
)
```
**解决方案**
```python
# 在 session 级别设置
session.max_redirects = self.max_redirects
# 请求时只使用 allow_redirects
response = self.session.get(
url,
allow_redirects=True # ✅ 正确的参数
)
```
## 使用示例
### 基础使用
```python
from agent.tools.web_fetch import WebFetch
tool = WebFetch()
result = tool.execute({
"url": "https://example.com",
"extract_mode": "markdown",
"max_chars": 5000
})
print(result.result['text'])
```
### 在 Agent 中使用
```python
from agent.tools import WebFetch
agent = agent_bridge.create_agent(
name="MyAgent",
tools=[
WebFetch(),
# ... 其他工具
]
)
```
### 在 Skills 中引导
```markdown
---
name: web-content-reader
---
# 网页内容阅读器
当用户提供一个网址时,使用 web_fetch 工具读取内容。
<example>
用户: 帮我看看这个网页 https://example.com
助手: <tool_use name="web_fetch">
<url>https://example.com</url>
<extract_mode>text</extract_mode>
</tool_use>
</example>
```
## 性能指标
### 速度
- 简单页面: ~1-2 秒
- 复杂页面: ~3-5 秒
- 超时设置: 30 秒
### 内存
- 基础运行: ~10-20 MB
- 处理大页面: ~50-100 MB
### 成功率
- 纯文本页面: >95%
- HTML 页面: >90%
- 需要 JS 渲染: <20% (建议使用 browser 工具)
## 测试清单
- [x] 抓取简单 HTML 页面
- [x] 抓取复杂网页 (Python.org)
- [x] 处理 HTTP 重定向
- [x] 处理无效 URL
- [x] 处理请求超时
- [x] Markdown 格式输出
- [x] Text 格式输出
- [x] 内容截断
- [x] 错误处理
## 安装说明
### 最小安装
```bash
pip install requests
```
### 完整安装
```bash
pip install requests readability-lxml html2text
```
### 验证安装
```bash
python3 agent/tools/web_fetch/test_web_fetch.py
```
## 未来改进方向
### 优先级 1 (推荐)
- [ ] 添加缓存机制 (减少重复请求)
- [ ] 支持自定义 headers
- [ ] 添加 cookie 支持
### 优先级 2 (可选)
- [ ] SSRF 防护 (安全性)
- [ ] 代理支持
- [ ] Firecrawl 集成 (付费服务)
### 优先级 3 (高级)
- [ ] 自动字符编码检测
- [ ] PDF 内容提取
- [ ] 图片 OCR 支持
## 常见问题
### Q: 为什么有些页面抓取不到内容?
A: 可能原因:
1. 页面需要 JavaScript 渲染 → 使用 `browser` 工具
2. 页面有反爬虫机制 → 调整 User-Agent 或使用代理
3. 页面需要登录 → 使用 `browser` 工具进行交互
### Q: 如何提高提取质量?
A:
1. 安装 `readability-lxml`: `pip install readability-lxml`
2. 安装 `html2text`: `pip install html2text`
3. 使用 `markdown` 模式而不是 `text` 模式
### Q: 可以抓取 API 返回的 JSON 吗?
A: 可以!工具会自动检测 content-type对于 JSON 会格式化输出。
## 贡献
本实现参考了以下优秀项目:
- [Clawdbot](https://github.com/moltbot/moltbot) - Web tools 设计
- [Mozilla Readability](https://github.com/mozilla/readability) - 内容提取算法
- [html2text](https://github.com/Alir3z4/html2text) - HTML 转 Markdown
## 许可
遵循项目主许可证。

View File

@@ -1,100 +0,0 @@
"""
Test script for WebFetch tool
"""
import sys
from pathlib import Path
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent))
from agent.tools.web_fetch import WebFetch
def test_web_fetch():
"""Test WebFetch tool"""
print("=" * 80)
print("Testing WebFetch Tool")
print("=" * 80)
# Create tool instance
tool = WebFetch()
print(f"\n✅ Tool created: {tool.name}")
print(f" Description: {tool.description}")
# Test 1: Fetch a simple webpage
print("\n" + "-" * 80)
print("Test 1: Fetching example.com")
print("-" * 80)
result = tool.execute({
"url": "https://example.com",
"extract_mode": "text",
"max_chars": 1000
})
if result.status == "success":
print("✅ Success!")
data = result.result
print(f" Title: {data.get('title', 'N/A')}")
print(f" Status: {data.get('status')}")
print(f" Extractor: {data.get('extractor')}")
print(f" Length: {data.get('length')} chars")
print(f" Truncated: {data.get('truncated')}")
print(f"\n Content preview:")
print(f" {data.get('text', '')[:200]}...")
else:
print(f"❌ Failed: {result.result}")
# Test 2: Invalid URL
print("\n" + "-" * 80)
print("Test 2: Testing invalid URL")
print("-" * 80)
result = tool.execute({
"url": "not-a-valid-url"
})
if result.status == "error":
print(f"✅ Correctly rejected invalid URL: {result.result}")
else:
print(f"❌ Should have rejected invalid URL")
# Test 3: Test with a real webpage (optional)
print("\n" + "-" * 80)
print("Test 3: Fetching a real webpage (Python.org)")
print("-" * 80)
result = tool.execute({
"url": "https://www.python.org",
"extract_mode": "markdown",
"max_chars": 2000
})
if result.status == "success":
print("✅ Success!")
data = result.result
print(f" Title: {data.get('title', 'N/A')}")
print(f" Status: {data.get('status')}")
print(f" Extractor: {data.get('extractor')}")
print(f" Length: {data.get('length')} chars")
print(f" Truncated: {data.get('truncated')}")
if data.get('warning'):
print(f" ⚠️ Warning: {data.get('warning')}")
print(f"\n Content preview:")
print(f" {data.get('text', '')[:300]}...")
else:
print(f"❌ Failed: {result.result}")
# Close the tool
tool.close()
print("\n" + "=" * 80)
print("Testing complete!")
print("=" * 80)
if __name__ == "__main__":
test_web_fetch()