From 46fa07e4a91adc71661c451e3b3d24f8391aa283 Mon Sep 17 00:00:00 2001 From: zhayujie Date: Mon, 2 Feb 2026 11:48:53 +0800 Subject: [PATCH] feat: optimize agent configuration and memory --- agent/prompt/builder.py | 23 +- agent/prompt/workspace.py | 9 +- agent/protocol/agent.py | 29 +- agent/protocol/agent_stream.py | 224 +++++++-- agent/skills/loader.py | 2 +- agent/skills/manager.py | 2 +- agent/tools/__init__.py | 33 +- agent/tools/memory/memory_get.py | 2 +- agent/tools/scheduler/integration.py | 4 +- agent/tools/scheduler/scheduler_service.py | 4 +- app.py | 17 + bridge/agent_bridge.py | 49 +- channel/chat_channel.py | 12 +- channel/dingtalk/dingtalk_channel.py | 518 +++++++++++++++++++-- channel/dingtalk/dingtalk_message.py | 176 +++++-- channel/feishu/feishu_channel.py | 249 ++++++---- channel/feishu/feishu_message.py | 28 +- channel/file_cache.py | 100 ++++ channel/web/web_channel.py | 10 +- config-template.json | 35 +- config.py | 6 +- models/claudeapi/claude_api_bot.py | 2 +- models/gemini/google_gemini_bot.py | 25 +- plugins/agent/agent.py | 2 +- plugins/banwords/banwords.py | 2 +- plugins/dungeon/dungeon.py | 2 +- plugins/finish/finish.py | 2 +- plugins/godcmd/godcmd.py | 2 +- plugins/hello/hello.py | 2 +- plugins/keyword/keyword.py | 4 +- plugins/linkai/linkai.py | 2 +- plugins/plugin.py | 2 - plugins/plugin_manager.py | 6 +- plugins/role/role.py | 2 +- plugins/tool/__init__.py | 1 - skills/linkai-agent/config.json.template | 12 +- 36 files changed, 1245 insertions(+), 355 deletions(-) create mode 100644 channel/file_cache.py delete mode 100644 plugins/tool/__init__.py diff --git a/agent/prompt/builder.py b/agent/prompt/builder.py index 1727c0b..c9ae35b 100644 --- a/agent/prompt/builder.py +++ b/agent/prompt/builder.py @@ -308,21 +308,20 @@ def _build_memory_section(memory_manager: Any, tools: Optional[List[Any]], langu "在回答关于以前的工作、决定、日期、人物、偏好或待办事项的任何问题之前:", "", "1. 不确定记忆文件位置 → 先用 `memory_search` 通过关键词和语义检索相关内容", - "2. 已知文件位置 → 直接用 `memory_get` 读取相应的行", - "3. search 无结果 → 尝试用 `memory_get` 读取最近两天的记忆文件", + "2. 已知文件位置 → 直接用 `memory_get` 读取相应的行 (例如:MEMORY.md, memory/YYYY-MM-DD.md)", + "3. search 无结果 → 尝试用 `memory_get` 读取MEMORY.md及最近两天记忆文件", "", "**记忆文件结构**:", - "- `MEMORY.md`: 长期记忆(已自动加载,无需主动读取)", + "- `MEMORY.md`: 长期记忆(核心信息、偏好、决策等)", "- `memory/YYYY-MM-DD.md`: 每日记忆,记录当天的事件和对话信息", "", - "**使用原则**:", - "- 自然使用记忆,就像你本来就知道; 不用刻意提起或列举记忆,除非用户提起相关内容", + "**写入记忆**:", + "- 追加内容 → `edit` 工具,oldText 留空", + "- 修改内容 → `edit` 工具,oldText 填写要替换的文本", + "- 新建文件 → `write` 工具", + "- **禁止写入敏感信息**:API密钥、令牌等敏感信息严禁写入记忆文件", "", - "**写入记忆的正确方式**:", - "- 追加到现有文件末尾 → 用 `edit` 工具,oldText 留空", - " 例: edit(path=memory/2026-02-01.md, oldText=\"\", newText=\"\\n## 新内容\\n...\")", - "- 修改文件中的某段文字 → 用 `edit` 工具,oldText 填写要替换的文本", - "- 创建新文件 → 用 `write`", + "**使用原则**: 自然使用记忆,就像你本来就知道;不用刻意提起,除非用户问起。", "", ] @@ -392,8 +391,8 @@ def _build_workspace_section(workspace_dir: str, language: str, is_first_convers "", "**交流规范**:", "", - "- 在所有对话中,无需提及技术细节(如 SOUL.md、USER.md 等文件名,工具名称,配置等),除非用户明确询问", - "- 用自然表达如「我已记住」而非「已更新 SOUL.md」", + "- 在对话中,非必要不输出工作空间技术细节(如 SOUL.md、USER.md等文件名称,工具名称,配置等),除非用户明确询问", + "- 例如用自然表达如「我已记住」而非「已更新 MEMORY.md」", "", ] diff --git a/agent/prompt/workspace.py b/agent/prompt/workspace.py index a096706..37d9a96 100644 --- a/agent/prompt/workspace.py +++ b/agent/prompt/workspace.py @@ -64,7 +64,7 @@ def ensure_workspace(workspace_dir: str, create_templates: bool = True) -> Works _create_template_if_missing(agents_path, _get_agents_template()) _create_template_if_missing(memory_path, _get_memory_template()) - logger.info(f"[Workspace] Initialized workspace at: {workspace_dir}") + logger.debug(f"[Workspace] Initialized workspace at: {workspace_dir}") return WorkspaceFiles( soul_path=soul_path, @@ -270,14 +270,9 @@ def _get_agents_template() -> str: 2. **动态记忆 → MEMORY.md**(爱好、偏好、决策、目标、项目、教训、待办事项) 3. **当天对话 → memory/YYYY-MM-DD.md**(今天聊的内容) -**重要**: -- 爱好(唱歌、篮球等)→ MEMORY.md,不是 USER.md -- 近期计划(下周要做什么)→ MEMORY.md,不是 USER.md -- USER.md 只存放不会变的基本信息 - ## 安全 -- 永远不要泄露私人数据 +- 永远不要泄露秘钥等私人数据 - 不要在未经询问的情况下运行破坏性命令 - 当有疑问时,先问 diff --git a/agent/protocol/agent.py b/agent/protocol/agent.py index 6759ebc..1b031e0 100644 --- a/agent/protocol/agent.py +++ b/agent/protocol/agent.py @@ -1,5 +1,6 @@ import json import time +import threading from common.log import logger from agent.protocol.models import LLMRequest, LLMModel @@ -43,6 +44,7 @@ class Agent: self.output_mode = output_mode self.last_usage = None # Store last API response usage info self.messages = [] # Unified message history for stream mode + self.messages_lock = threading.Lock() # Lock for thread-safe message operations self.memory_manager = memory_manager # Memory manager for auto memory flush self.workspace_dir = workspace_dir # Workspace directory self.enable_skills = enable_skills # Skills enabled flag @@ -57,7 +59,7 @@ class Agent: try: from agent.skills import SkillManager self.skill_manager = SkillManager(workspace_dir=workspace_dir) - logger.info(f"Initialized SkillManager with {len(self.skill_manager.skills)} skills") + logger.debug(f"Initialized SkillManager with {len(self.skill_manager.skills)} skills") except Exception as e: logger.warning(f"Failed to initialize SkillManager: {e}") @@ -335,7 +337,8 @@ class Agent: """ # Clear history if requested if clear_history: - self.messages = [] + with self.messages_lock: + self.messages = [] # Get model to use if not self.model: @@ -344,7 +347,17 @@ class Agent: # Get full system prompt with skills full_system_prompt = self.get_full_system_prompt(skill_filter=skill_filter) - # Create stream executor with agent's message history + # Create a copy of messages for this execution to avoid concurrent modification + # Record the original length to track which messages are new + with self.messages_lock: + messages_copy = self.messages.copy() + original_length = len(self.messages) + + # Get max_context_turns from config + from config import conf + max_context_turns = conf().get("agent_max_context_turns", 30) + + # Create stream executor with copied message history executor = AgentStreamExecutor( agent=self, model=self.model, @@ -352,14 +365,18 @@ class Agent: tools=self.tools, max_turns=self.max_steps, on_event=on_event, - messages=self.messages # Pass agent's message history + messages=messages_copy, # Pass copied message history + max_context_turns=max_context_turns ) # Execute response = executor.run_stream(user_message) - # Update agent's message history from executor - self.messages = executor.messages + # Append only the NEW messages from this execution (thread-safe) + # This allows concurrent requests to both contribute to history + with self.messages_lock: + new_messages = executor.messages[original_length:] + self.messages.extend(new_messages) # Store executor reference for agent_bridge to access files_to_send self.stream_executor = executor diff --git a/agent/protocol/agent_stream.py b/agent/protocol/agent_stream.py index 49812b9..2b1c883 100644 --- a/agent/protocol/agent_stream.py +++ b/agent/protocol/agent_stream.py @@ -31,7 +31,8 @@ class AgentStreamExecutor: tools: List[BaseTool], max_turns: int = 50, on_event: Optional[Callable] = None, - messages: Optional[List[Dict]] = None + messages: Optional[List[Dict]] = None, + max_context_turns: int = 30 ): """ Initialize stream executor @@ -44,6 +45,7 @@ class AgentStreamExecutor: max_turns: Maximum number of turns on_event: Event callback function messages: Optional existing message history (for persistent conversations) + max_context_turns: Maximum number of conversation turns to keep in context """ self.agent = agent self.model = model @@ -52,6 +54,7 @@ class AgentStreamExecutor: self.tools = {tool.name: tool for tool in tools} if isinstance(tools, list) else tools self.max_turns = max_turns self.on_event = on_event + self.max_context_turns = max_context_turns # Message history - use provided messages or create new list self.messages = messages if messages is not None else [] @@ -147,10 +150,7 @@ class AgentStreamExecutor: Final response text """ # Log user message with model info - logger.info(f"{'='*50}") - logger.info(f"🤖 Model: {self.model.model}") - logger.info(f"👤 用户: {user_message}") - logger.info(f"{'='*50}") + logger.info(f"🤖 {self.model.model} | 👤 {user_message}") # Add user message (Claude format - use content blocks for consistency) self.messages.append({ @@ -171,7 +171,7 @@ class AgentStreamExecutor: try: while turn < self.max_turns: turn += 1 - logger.info(f"第 {turn} 轮") + logger.debug(f"第 {turn} 轮") self._emit_event("turn_start", {"turn": turn}) # Check if memory flush is needed (before calling LLM) @@ -238,7 +238,7 @@ class AgentStreamExecutor: else: logger.info(f"💭 {assistant_msg[:150]}{'...' if len(assistant_msg) > 150 else ''}") - logger.info(f"✅ 完成 (无工具调用)") + logger.debug(f"✅ 完成 (无工具调用)") self._emit_event("turn_end", { "turn": turn, "has_tool_calls": False @@ -350,11 +350,37 @@ class AgentStreamExecutor: }) if turn >= self.max_turns: - logger.warning(f"⚠️ 已达到最大轮数限制: {self.max_turns}") - if not final_response: + logger.warning(f"⚠️ 已达到最大决策步数限制: {self.max_turns}") + + # Force model to summarize without tool calls + logger.info(f"[Agent] Requesting summary from LLM after reaching max steps...") + + # Add a system message to force summary + self.messages.append({ + "role": "user", + "content": [{ + "type": "text", + "text": f"你已经执行了{turn}个决策步骤,达到了单次运行的最大步数限制。请总结一下你目前的执行过程和结果,告诉用户当前的进展情况。不要再调用工具,直接用文字回复。" + }] + }) + + # Call LLM one more time to get summary (without retry to avoid loops) + try: + summary_response, summary_tools = self._call_llm_stream(retry_on_empty=False) + if summary_response: + final_response = summary_response + logger.info(f"💭 Summary: {summary_response[:150]}{'...' if len(summary_response) > 150 else ''}") + else: + # Fallback if model still doesn't respond + final_response = ( + f"我已经执行了{turn}个决策步骤,达到了单次运行的步数上限。" + "任务可能还未完全完成,建议你将任务拆分成更小的步骤,或者换一种方式描述需求。" + ) + except Exception as e: + logger.warning(f"Failed to get summary from LLM: {e}") final_response = ( - "抱歉,我在处理你的请求时遇到了一些困难,尝试了多次仍未能完成。" - "请尝试简化你的问题,或换一种方式描述。" + f"我已经执行了{turn}个决策步骤,达到了单次运行的步数上限。" + "任务可能还未完全完成,建议你将任务拆分成更小的步骤,或者换一种方式描述需求。" ) except Exception as e: @@ -363,7 +389,7 @@ class AgentStreamExecutor: raise finally: - logger.info(f"🏁 完成({turn}轮)") + logger.debug(f"🏁 完成({turn}轮)") self._emit_event("agent_end", {"final_response": final_response}) # 每轮对话结束后增加计数(用户消息+AI回复=1轮) @@ -783,54 +809,174 @@ class AgentStreamExecutor: logger.warning(f"⚠️ Removing incomplete tool_use message from history") self.messages.pop() + def _identify_complete_turns(self) -> List[Dict]: + """ + 识别完整的对话轮次 + + 一个完整轮次包括: + 1. 用户消息(text) + 2. AI 回复(可能包含 tool_use) + 3. 工具结果(tool_result,如果有) + 4. 后续 AI 回复(如果有) + + Returns: + List of turns, each turn is a dict with 'messages' list + """ + turns = [] + current_turn = {'messages': []} + + for msg in self.messages: + role = msg.get('role') + content = msg.get('content', []) + + if role == 'user': + # 检查是否是用户查询(不是工具结果) + is_user_query = False + if isinstance(content, list): + is_user_query = any( + block.get('type') == 'text' + for block in content + if isinstance(block, dict) + ) + elif isinstance(content, str): + is_user_query = True + + if is_user_query: + # 开始新轮次 + if current_turn['messages']: + turns.append(current_turn) + current_turn = {'messages': [msg]} + else: + # 工具结果,属于当前轮次 + current_turn['messages'].append(msg) + else: + # AI 回复,属于当前轮次 + current_turn['messages'].append(msg) + + # 添加最后一个轮次 + if current_turn['messages']: + turns.append(current_turn) + + return turns + + def _estimate_turn_tokens(self, turn: Dict) -> int: + """估算一个轮次的 tokens""" + return sum( + self.agent._estimate_message_tokens(msg) + for msg in turn['messages'] + ) + def _trim_messages(self): """ - Trim message history to stay within context limits. - Uses agent's context management configuration. + 智能清理消息历史,保持对话完整性 + + 使用完整轮次作为清理单位,确保: + 1. 不会在对话中间截断 + 2. 工具调用链(tool_use + tool_result)保持完整 + 3. 每轮对话都是完整的(用户消息 + AI回复 + 工具调用) """ if not self.messages or not self.agent: return + # Step 1: 识别完整轮次 + turns = self._identify_complete_turns() + + if not turns: + return + + # Step 2: 轮次限制 - 保留最近 N 轮 + if len(turns) > self.max_context_turns: + removed_turns = len(turns) - self.max_context_turns + turns = turns[-self.max_context_turns:] # 保留最近的轮次 + + logger.info( + f"💾 上下文轮次超限: {len(turns) + removed_turns} > {self.max_context_turns}," + f"移除最早的 {removed_turns} 轮完整对话" + ) + + # Step 3: Token 限制 - 保留完整轮次 # Get context window from agent (based on model) context_window = self.agent._get_model_context_window() - # Reserve 10% for response generation - reserve_tokens = int(context_window * 0.1) - max_tokens = context_window - reserve_tokens + # Use configured max_context_tokens if available + if hasattr(self.agent, 'max_context_tokens') and self.agent.max_context_tokens: + max_tokens = self.agent.max_context_tokens + else: + # Reserve 10% for response generation + reserve_tokens = int(context_window * 0.1) + max_tokens = context_window - reserve_tokens - # Estimate current tokens - current_tokens = sum(self.agent._estimate_message_tokens(msg) for msg in self.messages) - - # Add system prompt tokens + # Estimate system prompt tokens system_tokens = self.agent._estimate_message_tokens({"role": "system", "content": self.system_prompt}) - current_tokens += system_tokens + available_tokens = max_tokens - system_tokens - # If under limit, no need to trim - if current_tokens <= max_tokens: + # Calculate current tokens + current_tokens = sum(self._estimate_turn_tokens(turn) for turn in turns) + + # If under limit, reconstruct messages and return + if current_tokens + system_tokens <= max_tokens: + # Reconstruct message list from turns + new_messages = [] + for turn in turns: + new_messages.extend(turn['messages']) + + old_count = len(self.messages) + self.messages = new_messages + + # Log if we removed messages due to turn limit + if old_count > len(self.messages): + logger.info(f" 重建消息列表: {old_count} -> {len(self.messages)} 条消息") return - # Keep messages from newest, accumulating tokens - available_tokens = max_tokens - system_tokens - kept_messages = [] + # Token limit exceeded - keep complete turns from newest + logger.info( + f"🔄 上下文tokens超限: ~{current_tokens + system_tokens} > {max_tokens}," + f"将按完整轮次移除最早的对话" + ) + + # 从最新轮次开始,反向累加(保持完整轮次) + kept_turns = [] accumulated_tokens = 0 - - for msg in reversed(self.messages): - msg_tokens = self.agent._estimate_message_tokens(msg) - if accumulated_tokens + msg_tokens <= available_tokens: - kept_messages.insert(0, msg) - accumulated_tokens += msg_tokens + min_turns = 3 # 尽量保留至少 3 轮,但不强制(避免超出 token 限制) + + for i, turn in enumerate(reversed(turns)): + turn_tokens = self._estimate_turn_tokens(turn) + turns_from_end = i + 1 + + # 检查是否超出限制 + if accumulated_tokens + turn_tokens <= available_tokens: + kept_turns.insert(0, turn) + accumulated_tokens += turn_tokens else: + # 超出限制 + # 如果还没有保留足够的轮次,且这是最后的机会,尝试保留 + if len(kept_turns) < min_turns and turns_from_end <= min_turns: + # 检查是否严重超出(超出 20% 以上则放弃) + overflow_ratio = (accumulated_tokens + turn_tokens - available_tokens) / available_tokens + if overflow_ratio < 0.2: # 允许最多超出 20% + kept_turns.insert(0, turn) + accumulated_tokens += turn_tokens + logger.debug(f" 为保留最少轮次,允许超出 {overflow_ratio*100:.1f}%") + continue + # 停止保留更早的轮次 break - + + # 重建消息列表 + new_messages = [] + for turn in kept_turns: + new_messages.extend(turn['messages']) + old_count = len(self.messages) - self.messages = kept_messages + old_turn_count = len(turns) + self.messages = new_messages new_count = len(self.messages) - + new_turn_count = len(kept_turns) + if old_count > new_count: logger.info( - f"Context trimmed: {old_count} -> {new_count} messages " - f"(~{current_tokens} -> ~{system_tokens + accumulated_tokens} tokens, " - f"limit: {max_tokens})" + f" 移除了 {old_turn_count - new_turn_count} 轮对话 " + f"({old_count} -> {new_count} 条消息," + f"~{current_tokens + system_tokens} -> ~{accumulated_tokens + system_tokens} tokens)" ) def _prepare_messages(self) -> List[Dict[str, Any]]: diff --git a/agent/skills/loader.py b/agent/skills/loader.py index 5d67c47..cc77d32 100644 --- a/agent/skills/loader.py +++ b/agent/skills/loader.py @@ -259,7 +259,7 @@ class SkillLoader: for diag in all_diagnostics[:5]: # Log first 5 logger.debug(f" - {diag}") - logger.info(f"Loaded {len(skill_map)} skills from all sources") + logger.debug(f"Loaded {len(skill_map)} skills from all sources") return skill_map diff --git a/agent/skills/manager.py b/agent/skills/manager.py index acc3b17..bf9593f 100644 --- a/agent/skills/manager.py +++ b/agent/skills/manager.py @@ -59,7 +59,7 @@ class SkillManager: extra_dirs=self.extra_dirs, ) - logger.info(f"SkillManager: Loaded {len(self.skills)} skills") + logger.debug(f"SkillManager: Loaded {len(self.skills)} skills") def get_skill(self, name: str) -> Optional[SkillEntry]: """ diff --git a/agent/tools/__init__.py b/agent/tools/__init__.py index fe37aec..3cba117 100644 --- a/agent/tools/__init__.py +++ b/agent/tools/__init__.py @@ -46,32 +46,6 @@ def _import_optional_tools(): except Exception as e: logger.error(f"[Tools] Scheduler tool failed to load: {e}") - # Google Search (requires requests) - try: - from agent.tools.google_search.google_search import GoogleSearch - tools['GoogleSearch'] = GoogleSearch - except ImportError as e: - logger.warning(f"[Tools] GoogleSearch tool not loaded - missing dependency: {e}") - except Exception as e: - logger.error(f"[Tools] GoogleSearch tool failed to load: {e}") - - # File Save (may have dependencies) - try: - from agent.tools.file_save.file_save import FileSave - tools['FileSave'] = FileSave - except ImportError as e: - logger.warning(f"[Tools] FileSave tool not loaded - missing dependency: {e}") - except Exception as e: - logger.error(f"[Tools] FileSave tool failed to load: {e}") - - # Terminal (basic, should work) - try: - from agent.tools.terminal.terminal import Terminal - tools['Terminal'] = Terminal - except ImportError as e: - logger.warning(f"[Tools] Terminal tool not loaded - missing dependency: {e}") - except Exception as e: - logger.error(f"[Tools] Terminal tool failed to load: {e}") return tools @@ -102,7 +76,7 @@ def _import_browser_tool(): # Dynamically set BrowserTool -BrowserTool = _import_browser_tool() +# BrowserTool = _import_browser_tool() # Export all tools (including optional ones that might be None) __all__ = [ @@ -119,10 +93,7 @@ __all__ = [ 'EnvConfig', 'SchedulerTool', # Optional tools (may be None if dependencies not available) - 'GoogleSearch', - 'FileSave', - 'Terminal', - 'BrowserTool' + # 'BrowserTool' ] """ diff --git a/agent/tools/memory/memory_get.py b/agent/tools/memory/memory_get.py index bf5b708..5febb10 100644 --- a/agent/tools/memory/memory_get.py +++ b/agent/tools/memory/memory_get.py @@ -20,7 +20,7 @@ class MemoryGetTool(BaseTool): "properties": { "path": { "type": "string", - "description": "Relative path to the memory file (e.g. 'memory/2026-01-01.md')" + "description": "Relative path to the memory file (e.g. 'MEMORY.md', 'memory/2026-01-01.md')" }, "start_line": { "type": "integer", diff --git a/agent/tools/scheduler/integration.py b/agent/tools/scheduler/integration.py index 8b54ccd..1882195 100644 --- a/agent/tools/scheduler/integration.py +++ b/agent/tools/scheduler/integration.py @@ -36,7 +36,7 @@ def init_scheduler(agent_bridge) -> bool: # Create task store _task_store = TaskStore(store_path) - logger.info(f"[Scheduler] Task store initialized: {store_path}") + logger.debug(f"[Scheduler] Task store initialized: {store_path}") # Create execute callback def execute_task_callback(task: dict): @@ -65,7 +65,7 @@ def init_scheduler(agent_bridge) -> bool: _scheduler_service = SchedulerService(_task_store, execute_task_callback) _scheduler_service.start() - logger.info("[Scheduler] Scheduler service initialized and started") + logger.debug("[Scheduler] Scheduler service initialized and started") return True except Exception as e: diff --git a/agent/tools/scheduler/scheduler_service.py b/agent/tools/scheduler/scheduler_service.py index 248c776..286fbc6 100644 --- a/agent/tools/scheduler/scheduler_service.py +++ b/agent/tools/scheduler/scheduler_service.py @@ -39,7 +39,7 @@ class SchedulerService: self.running = True self.thread = threading.Thread(target=self._run_loop, daemon=True) self.thread.start() - logger.info("[Scheduler] Service started") + logger.debug("[Scheduler] Service started") def stop(self): """Stop the scheduler service""" @@ -54,7 +54,7 @@ class SchedulerService: def _run_loop(self): """Main scheduler loop""" - logger.info("[Scheduler] Scheduler loop started") + logger.debug("[Scheduler] Scheduler loop started") while self.running: try: diff --git a/app.py b/app.py index 022fdd7..ad0cfca 100644 --- a/app.py +++ b/app.py @@ -59,6 +59,23 @@ def run(): os.environ["WECHATY_LOG"] = "warn" start_channel(channel_name) + + # 打印系统运行成功信息 + logger.info("") + logger.info("=" * 50) + if conf().get("agent", False): + logger.info("✅ System started successfully!") + logger.info("🐮 Cow Agent is running") + logger.info(f" Channel: {channel_name}") + logger.info(f" Model: {conf().get('model', 'unknown')}") + logger.info(f" Workspace: {conf().get('agent_workspace', '~/cow')}") + else: + logger.info("✅ System started successfully!") + logger.info("🤖 ChatBot is running") + logger.info(f" Channel: {channel_name}") + logger.info(f" Model: {conf().get('model', 'unknown')}") + logger.info("=" * 50) + logger.info("") while True: time.sleep(1) diff --git a/bridge/agent_bridge.py b/bridge/agent_bridge.py index a9884e5..43aefc7 100644 --- a/bridge/agent_bridge.py +++ b/bridge/agent_bridge.py @@ -230,12 +230,7 @@ class AgentBridge: # Log skill loading details if agent.skill_manager: - logger.info(f"[AgentBridge] SkillManager initialized:") - logger.info(f"[AgentBridge] - Managed dir: {agent.skill_manager.managed_skills_dir}") - logger.info(f"[AgentBridge] - Workspace dir: {agent.skill_manager.workspace_dir}") - logger.info(f"[AgentBridge] - Total skills: {len(agent.skill_manager.skills)}") - for skill_name in agent.skill_manager.skills.keys(): - logger.info(f"[AgentBridge] * {skill_name}") + logger.debug(f"[AgentBridge] SkillManager initialized with {len(agent.skill_manager.skills)} skills") return agent @@ -469,14 +464,19 @@ class AgentBridge: logger.info("[AgentBridge] System prompt built successfully") + # Get cost control parameters from config + max_steps = conf().get("agent_max_steps", 20) + max_context_tokens = conf().get("agent_max_context_tokens", 50000) + # Create agent with configured tools and workspace agent = self.create_agent( system_prompt=system_prompt, tools=tools, - max_steps=50, + max_steps=max_steps, output_mode="logger", workspace_dir=workspace_root, # Pass workspace to agent for skills loading - enable_skills=True # Enable skills auto-loading + enable_skills=True, # Enable skills auto-loading + max_context_tokens=max_context_tokens ) # Attach memory manager to agent if available @@ -507,7 +507,7 @@ class AgentBridge: try: from dotenv import load_dotenv load_dotenv(env_file, override=True) - logger.info(f"[AgentBridge] Loaded environment variables from {env_file} for session {session_id}") + logger.debug(f"[AgentBridge] Loaded environment variables from {env_file} for session {session_id}") except ImportError: logger.warning(f"[AgentBridge] python-dotenv not installed, skipping .env file loading for session {session_id}") except Exception as e: @@ -543,12 +543,12 @@ class AgentBridge: api_key=openai_api_key, api_base=openai_api_base or "https://api.openai.com/v1" ) - logger.info(f"[AgentBridge] OpenAI embedding initialized for session {session_id}") + logger.debug(f"[AgentBridge] OpenAI embedding initialized for session {session_id}") except Exception as embed_error: logger.warning(f"[AgentBridge] OpenAI embedding failed for session {session_id}: {embed_error}") logger.info(f"[AgentBridge] Using keyword-only search for session {session_id}") else: - logger.info(f"[AgentBridge] No OpenAI API key, using keyword-only search for session {session_id}") + logger.debug(f"[AgentBridge] No OpenAI API key, using keyword-only search for session {session_id}") # 创建 memory config memory_config = MemoryConfig(workspace_root=workspace_root) @@ -564,15 +564,15 @@ class AgentBridge: if loop.is_running(): # 如果事件循环正在运行,创建任务 asyncio.create_task(memory_manager.sync()) - logger.info(f"[AgentBridge] Memory sync scheduled for session {session_id}") + logger.debug(f"[AgentBridge] Memory sync scheduled for session {session_id}") else: # 如果没有运行的循环,直接执行 loop.run_until_complete(memory_manager.sync()) - logger.info(f"[AgentBridge] Memory synced successfully for session {session_id}") + logger.debug(f"[AgentBridge] Memory synced successfully for session {session_id}") except RuntimeError: # 没有事件循环,创建新的 asyncio.run(memory_manager.sync()) - logger.info(f"[AgentBridge] Memory synced successfully for session {session_id}") + logger.debug(f"[AgentBridge] Memory synced successfully for session {session_id}") except Exception as sync_error: logger.warning(f"[AgentBridge] Memory sync failed for session {session_id}: {sync_error}") @@ -619,7 +619,7 @@ class AgentBridge: from agent.tools.scheduler.integration import init_scheduler if init_scheduler(self): self.scheduler_initialized = True - logger.info(f"[AgentBridge] Scheduler service initialized for session {session_id}") + logger.debug(f"[AgentBridge] Scheduler service initialized for session {session_id}") except Exception as e: logger.warning(f"[AgentBridge] Failed to initialize scheduler for session {session_id}: {e}") @@ -651,7 +651,7 @@ class AgentBridge: try: from agent.skills import SkillManager skill_manager = SkillManager(workspace_dir=workspace_root) - logger.info(f"[AgentBridge] Initialized SkillManager with {len(skill_manager.skills)} skills for session {session_id}") + logger.debug(f"[AgentBridge] Initialized SkillManager with {len(skill_manager.skills)} skills for session {session_id}") except Exception as e: logger.warning(f"[AgentBridge] Failed to initialize SkillManager for session {session_id}: {e}") @@ -716,15 +716,20 @@ class AgentBridge: if is_first: mark_conversation_started(workspace_root) + # Get cost control parameters from config + max_steps = conf().get("agent_max_steps", 20) + max_context_tokens = conf().get("agent_max_context_tokens", 50000) + # Create agent for this session agent = self.create_agent( system_prompt=system_prompt, tools=tools, - max_steps=50, + max_steps=max_steps, output_mode="logger", workspace_dir=workspace_root, skill_manager=skill_manager, - enable_skills=True + enable_skills=True, + max_context_tokens=max_context_tokens ) if memory_manager: @@ -893,16 +898,16 @@ class AgentBridge: for config_key, env_key in key_mapping.items(): # Skip if already in .env file if env_key in existing_env_vars: - logger.debug(f"[AgentBridge] Skipping {env_key} - already in .env") continue # Get value from config.json value = conf().get(config_key, "") if value and value.strip(): # Only migrate non-empty values keys_to_migrate[env_key] = value.strip() - logger.debug(f"[AgentBridge] Will migrate {env_key} from config.json") - else: - logger.debug(f"[AgentBridge] Skipping {env_key} - no value in config.json") + + # Log summary if there are keys to skip + if existing_env_vars: + logger.debug(f"[AgentBridge] {len(existing_env_vars)} env vars already in .env") # Write new keys to .env file if keys_to_migrate: diff --git a/channel/chat_channel.py b/channel/chat_channel.py index bceaeef..af3607d 100644 --- a/channel/chat_channel.py +++ b/channel/chat_channel.py @@ -173,11 +173,11 @@ class ChatChannel(Channel): def _handle(self, context: Context): if context is None or not context.content: return - logger.debug("[chat_channel] ready to handle context: {}".format(context)) + logger.debug("[chat_channel] handling context: {}".format(context)) # reply的构建步骤 reply = self._generate_reply(context) - logger.debug("[chat_channel] ready to decorate reply: {}".format(reply)) + logger.debug("[chat_channel] decorating reply: {}".format(reply)) # reply的包装步骤 if reply and reply.content: @@ -195,7 +195,7 @@ class ChatChannel(Channel): ) reply = e_context["reply"] if not e_context.is_pass(): - logger.debug("[chat_channel] ready to handle context: type={}, content={}".format(context.type, context.content)) + logger.debug("[chat_channel] type={}, content={}".format(context.type, context.content)) if context.type == ContextType.TEXT or context.type == ContextType.IMAGE_CREATE: # 文字和图片消息 context["channel"] = e_context["channel"] reply = super().build_reply_content(context.content, context) @@ -289,7 +289,7 @@ class ChatChannel(Channel): ) reply = e_context["reply"] if not e_context.is_pass() and reply and reply.type: - logger.debug("[chat_channel] ready to send reply: {}, context: {}".format(reply, context)) + logger.debug("[chat_channel] sending reply: {}, context: {}".format(reply, context)) # 如果是文本回复,尝试提取并发送图片 if reply.type == ReplyType.TEXT: @@ -343,7 +343,9 @@ class ChatChannel(Channel): logger.info(f"[chat_channel] Extracted {len(media_items)} media item(s) from reply") # 先发送文本(保持原文本不变) + logger.info(f"[chat_channel] Sending text content before media: {reply.content[:100]}...") self._send(reply, context) + logger.info(f"[chat_channel] Text sent, now sending {len(media_items)} media item(s)") # 然后逐个发送媒体文件 for i, (url, media_type) in enumerate(media_items): @@ -381,7 +383,7 @@ class ChatChannel(Channel): logger.error(f"[chat_channel] Failed to send {media_type} {url}: {e}") else: # 没有媒体文件,正常发送文本 - self._send(reply, context) + self._send(reply, context) def _send(self, reply: Reply, context: Context, retry_cnt=0): try: diff --git a/channel/dingtalk/dingtalk_channel.py b/channel/dingtalk/dingtalk_channel.py index 67f90a3..1b1e71b 100644 --- a/channel/dingtalk/dingtalk_channel.py +++ b/channel/dingtalk/dingtalk_channel.py @@ -8,6 +8,7 @@ import copy import json # -*- coding=utf-8 -*- import logging +import os import time import requests @@ -102,7 +103,7 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler): self.logger = self.setup_logger() # 历史消息id暂存,用于幂等控制 self.receivedMsgs = ExpiredDict(conf().get("expires_in_seconds", 3600)) - logger.info("[DingTalk] client_id={}, client_secret={} ".format( + logger.debug("[DingTalk] client_id={}, client_secret={} ".format( self.dingtalk_client_id, self.dingtalk_client_secret)) # 无需群校验和前缀 conf()["group_name_white_list"] = ["ALL_GROUP"] @@ -118,6 +119,7 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler): credential = dingtalk_stream.Credential(self.dingtalk_client_id, self.dingtalk_client_secret) client = dingtalk_stream.DingTalkStreamClient(credential) client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, self) + logger.info("[DingTalk] ✅ Stream connected, ready to receive messages") client.start_forever() def get_access_token(self): @@ -242,21 +244,241 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler): except Exception as e: logger.error(f"[DingTalk] Error sending group message: {e}") return False + + def upload_media(self, file_path: str, media_type: str = "image") -> str: + """ + 上传媒体文件到钉钉 + + Args: + file_path: 本地文件路径或URL + media_type: 媒体类型 (image, video, voice, file) + + Returns: + media_id,如果上传失败返回 None + """ + access_token = self.get_access_token() + if not access_token: + logger.error("[DingTalk] Cannot upload media: no access token") + return None + + # 处理 file:// URL + if file_path.startswith("file://"): + file_path = file_path[7:] + + # 如果是 HTTP URL,先下载 + if file_path.startswith("http://") or file_path.startswith("https://"): + try: + import uuid + response = requests.get(file_path, timeout=(5, 60)) + if response.status_code != 200: + logger.error(f"[DingTalk] Failed to download file from URL: {file_path}") + return None + + # 保存到临时文件 + file_name = os.path.basename(file_path) or f"media_{uuid.uuid4()}" + workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow")) + tmp_dir = os.path.join(workspace_root, "tmp") + os.makedirs(tmp_dir, exist_ok=True) + temp_file = os.path.join(tmp_dir, file_name) + + with open(temp_file, "wb") as f: + f.write(response.content) + + file_path = temp_file + logger.info(f"[DingTalk] Downloaded file to {file_path}") + except Exception as e: + logger.error(f"[DingTalk] Error downloading file: {e}") + return None + + if not os.path.exists(file_path): + logger.error(f"[DingTalk] File not found: {file_path}") + return None + + # 上传到钉钉 + # 钉钉上传媒体文件 API: https://open.dingtalk.com/document/orgapp/upload-media-files + url = "https://oapi.dingtalk.com/media/upload" + params = { + "access_token": access_token, + "type": media_type + } + + try: + with open(file_path, "rb") as f: + files = {"media": (os.path.basename(file_path), f)} + response = requests.post(url, params=params, files=files, timeout=(5, 60)) + result = response.json() + + if result.get("errcode") == 0: + media_id = result.get("media_id") + logger.info(f"[DingTalk] Media uploaded successfully, media_id={media_id}") + return media_id + else: + logger.error(f"[DingTalk] Failed to upload media: {result}") + return None + except Exception as e: + logger.error(f"[DingTalk] Error uploading media: {e}") + return None + + def send_image_with_media_id(self, access_token: str, media_id: str, incoming_message, is_group: bool) -> bool: + """ + 发送图片消息(使用 media_id) + + Args: + access_token: 访问令牌 + media_id: 媒体ID + incoming_message: 钉钉消息对象 + is_group: 是否为群聊 + + Returns: + 是否发送成功 + """ + headers = { + "x-acs-dingtalk-access-token": access_token, + 'Content-Type': 'application/json' + } + + msg_param = { + "photoURL": media_id # 钉钉图片消息使用 photoURL 字段 + } + + body = { + "robotCode": incoming_message.robot_code, + "msgKey": "sampleImageMsg", + "msgParam": json.dumps(msg_param), + } + + if is_group: + # 群聊 + url = "https://api.dingtalk.com/v1.0/robot/groupMessages/send" + body["openConversationId"] = incoming_message.conversation_id + else: + # 单聊 + url = "https://api.dingtalk.com/v1.0/robot/oToMessages/batchSend" + body["userIds"] = [incoming_message.sender_staff_id] + + try: + response = requests.post(url=url, headers=headers, json=body, timeout=10) + result = response.json() + + logger.info(f"[DingTalk] Image send result: {response.text}") + + if response.status_code == 200: + return True + else: + logger.error(f"[DingTalk] Send image error: {response.text}") + return False + except Exception as e: + logger.error(f"[DingTalk] Send image exception: {e}") + return False + + def send_image_message(self, receiver: str, media_id: str, is_group: bool, robot_code: str) -> bool: + """ + 发送图片消息 + + Args: + receiver: 接收者ID (user_id 或 conversation_id) + media_id: 媒体ID + is_group: 是否为群聊 + robot_code: 机器人编码 + + Returns: + 是否发送成功 + """ + access_token = self.get_access_token() + if not access_token: + logger.error("[DingTalk] Cannot send image: no access token") + return False + + if not robot_code: + logger.error("[DingTalk] Cannot send image: robot_code is required") + return False + + if is_group: + # 发送群聊图片 + url = "https://api.dingtalk.com/v1.0/robot/groupMessages/send" + headers = { + "x-acs-dingtalk-access-token": access_token, + "Content-Type": "application/json" + } + data = { + "msgParam": json.dumps({"mediaId": media_id}), + "msgKey": "sampleImageMsg", + "openConversationId": receiver, + "robotCode": robot_code + } + else: + # 发送单聊图片 + url = "https://api.dingtalk.com/v1.0/robot/oToMessages/batchSend" + headers = { + "x-acs-dingtalk-access-token": access_token, + "Content-Type": "application/json" + } + data = { + "msgParam": json.dumps({"mediaId": media_id}), + "msgKey": "sampleImageMsg", + "userIds": [receiver], + "robotCode": robot_code + } + + try: + response = requests.post(url, headers=headers, json=data, timeout=10) + result = response.json() + + if response.status_code == 200: + logger.info(f"[DingTalk] Image message sent successfully") + return True + else: + logger.error(f"[DingTalk] Failed to send image message: {result}") + return False + except Exception as e: + logger.error(f"[DingTalk] Error sending image message: {e}") + return False + + def get_image_download_url(self, download_code: str) -> str: + """ + 获取图片下载地址 + 使用钉钉 API: https://open.dingtalk.com/document/orgapp/download-the-robot-to-receive-the-file + """ + access_token = self.get_access_token() + if not access_token: + logger.error("[DingTalk] Cannot get access token for image download") + return None + + url = f"https://oapi.dingtalk.com/robot/messageFiles/download" + params = { + "access_token": access_token, + "downloadCode": download_code + } + + try: + response = requests.get(url, params=params, timeout=10) + if response.status_code == 200: + # 返回图片的直接下载 URL(实际上这个 API 直接返回文件内容) + # 我们需要保存文件并返回本地路径 + logger.info(f"[DingTalk] Successfully got image download URL for code: {download_code}") + # 返回一个特殊的 URL,包含 download_code,后续会用它来下载 + return f"dingtalk://download/{download_code}" + else: + logger.error(f"[DingTalk] Failed to get image download URL: {response.text}") + return None + except Exception as e: + logger.error(f"[DingTalk] Exception getting image download URL: {e}") + return None async def process(self, callback: dingtalk_stream.CallbackMessage): try: incoming_message = dingtalk_stream.ChatbotMessage.from_dict(callback.data) # Debug: 打印完整的 event 数据 - logger.info(f"[DingTalk] ===== Incoming Message Debug =====") - logger.info(f"[DingTalk] callback.data keys: {callback.data.keys() if hasattr(callback.data, 'keys') else 'N/A'}") - logger.info(f"[DingTalk] incoming_message attributes: {dir(incoming_message)}") - logger.info(f"[DingTalk] robot_code: {getattr(incoming_message, 'robot_code', 'N/A')}") - logger.info(f"[DingTalk] chatbot_corp_id: {getattr(incoming_message, 'chatbot_corp_id', 'N/A')}") - logger.info(f"[DingTalk] chatbot_user_id: {getattr(incoming_message, 'chatbot_user_id', 'N/A')}") - logger.info(f"[DingTalk] conversation_id: {getattr(incoming_message, 'conversation_id', 'N/A')}") - logger.info(f"[DingTalk] Raw callback.data: {callback.data}") - logger.info(f"[DingTalk] =====================================") + logger.debug(f"[DingTalk] ===== Incoming Message Debug =====") + logger.debug(f"[DingTalk] callback.data keys: {callback.data.keys() if hasattr(callback.data, 'keys') else 'N/A'}") + logger.debug(f"[DingTalk] incoming_message attributes: {dir(incoming_message)}") + logger.debug(f"[DingTalk] robot_code: {getattr(incoming_message, 'robot_code', 'N/A')}") + logger.debug(f"[DingTalk] chatbot_corp_id: {getattr(incoming_message, 'chatbot_corp_id', 'N/A')}") + logger.debug(f"[DingTalk] chatbot_user_id: {getattr(incoming_message, 'chatbot_user_id', 'N/A')}") + logger.debug(f"[DingTalk] conversation_id: {getattr(incoming_message, 'conversation_id', 'N/A')}") + logger.debug(f"[DingTalk] Raw callback.data: {callback.data}") + logger.debug(f"[DingTalk] =====================================") image_download_handler = self # 传入方法所在的类实例 dingtalk_msg = DingTalkMessage(incoming_message, image_download_handler) @@ -267,7 +489,8 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler): self.handle_single(dingtalk_msg) return AckMessage.STATUS_OK, 'OK' except Exception as e: - logger.error(f"dingtalk process error={e}") + logger.error(f"[DingTalk] process error: {e}") + logger.exception(e) # 打印完整堆栈跟踪 return AckMessage.STATUS_SYSTEM_EXCEPTION, 'ERROR' @time_checker @@ -286,6 +509,43 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler): logger.debug("[DingTalk]receive text msg: {}".format(cmsg.content)) else: logger.debug("[DingTalk]receive other msg: {}".format(cmsg.content)) + + # 处理文件缓存逻辑 + from channel.file_cache import get_file_cache + file_cache = get_file_cache() + + # 单聊的 session_id 就是 sender_id + session_id = cmsg.from_user_id + + # 如果是单张图片消息,缓存起来 + if cmsg.ctype == ContextType.IMAGE: + if hasattr(cmsg, 'image_path') and cmsg.image_path: + file_cache.add(session_id, cmsg.image_path, file_type='image') + logger.info(f"[DingTalk] Image cached for session {session_id}, waiting for user query...") + # 单张图片不直接处理,等待用户提问 + return + + # 如果是文本消息,检查是否有缓存的文件 + if cmsg.ctype == ContextType.TEXT: + cached_files = file_cache.get(session_id) + if cached_files: + # 将缓存的文件附加到文本消息中 + file_refs = [] + for file_info in cached_files: + file_path = file_info['path'] + file_type = file_info['type'] + if file_type == 'image': + file_refs.append(f"[图片: {file_path}]") + elif file_type == 'video': + file_refs.append(f"[视频: {file_path}]") + else: + file_refs.append(f"[文件: {file_path}]") + + cmsg.content = cmsg.content + "\n" + "\n".join(file_refs) + logger.info(f"[DingTalk] Attached {len(cached_files)} cached file(s) to user query") + # 清除缓存 + file_cache.clear(session_id) + context = self._compose_context(cmsg.ctype, cmsg.content, isgroup=False, msg=cmsg) if context: self.produce(context) @@ -307,6 +567,46 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler): logger.debug("[DingTalk]receive text msg: {}".format(cmsg.content)) else: logger.debug("[DingTalk]receive other msg: {}".format(cmsg.content)) + + # 处理文件缓存逻辑 + from channel.file_cache import get_file_cache + file_cache = get_file_cache() + + # 群聊的 session_id + if conf().get("group_shared_session", True): + session_id = cmsg.other_user_id # conversation_id + else: + session_id = cmsg.from_user_id + "_" + cmsg.other_user_id + + # 如果是单张图片消息,缓存起来 + if cmsg.ctype == ContextType.IMAGE: + if hasattr(cmsg, 'image_path') and cmsg.image_path: + file_cache.add(session_id, cmsg.image_path, file_type='image') + logger.info(f"[DingTalk] Image cached for session {session_id}, waiting for user query...") + # 单张图片不直接处理,等待用户提问 + return + + # 如果是文本消息,检查是否有缓存的文件 + if cmsg.ctype == ContextType.TEXT: + cached_files = file_cache.get(session_id) + if cached_files: + # 将缓存的文件附加到文本消息中 + file_refs = [] + for file_info in cached_files: + file_path = file_info['path'] + file_type = file_info['type'] + if file_type == 'image': + file_refs.append(f"[图片: {file_path}]") + elif file_type == 'video': + file_refs.append(f"[视频: {file_path}]") + else: + file_refs.append(f"[文件: {file_path}]") + + cmsg.content = cmsg.content + "\n" + "\n".join(file_refs) + logger.info(f"[DingTalk] Attached {len(cached_files)} cached file(s) to user query") + # 清除缓存 + file_cache.clear(session_id) + context = self._compose_context(cmsg.ctype, cmsg.content, isgroup=True, msg=cmsg) context['no_need_at'] = True if context: @@ -314,6 +614,7 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler): def send(self, reply: Reply, context: Context): + logger.info(f"[DingTalk] send() called with reply.type={reply.type}, content_length={len(str(reply.content))}") receiver = context["receiver"] # Check if msg exists (for scheduled tasks, msg might be None) @@ -357,29 +658,184 @@ class DingTalkChanel(ChatChannel, dingtalk_stream.ChatbotHandler): isgroup = msg.is_group incoming_message = msg.incoming_message - - if conf().get("dingtalk_card_enabled"): - logger.info("[Dingtalk] sendMsg={}, receiver={}".format(reply, receiver)) - def reply_with_text(): - self.reply_text(reply.content, incoming_message) - def reply_with_at_text(): - self.reply_text("📢 您有一条新的消息,请查看。", incoming_message) - def reply_with_ai_markdown(): - button_list, markdown_content = self.generate_button_markdown_content(context, reply) - self.reply_ai_markdown_button(incoming_message, markdown_content, button_list, "", "📌 内容由AI生成", "",[incoming_message.sender_staff_id]) - - if reply.type in [ReplyType.IMAGE_URL, ReplyType.IMAGE, ReplyType.TEXT]: - if isgroup: - reply_with_ai_markdown() - reply_with_at_text() + robot_code = self._robot_code or conf().get("dingtalk_robot_code") + + # 处理图片和视频发送 + if reply.type == ReplyType.IMAGE_URL: + logger.info(f"[DingTalk] Sending image: {reply.content}") + + # 如果有附加的文本内容,先发送文本 + if hasattr(reply, 'text_content') and reply.text_content: + self.reply_text(reply.text_content, incoming_message) + import time + time.sleep(0.3) # 短暂延迟,确保文本先到达 + + media_id = self.upload_media(reply.content, media_type="image") + if media_id: + # 使用主动发送 API 发送图片 + access_token = self.get_access_token() + if access_token: + success = self.send_image_with_media_id( + access_token, + media_id, + incoming_message, + isgroup + ) + if not success: + logger.error("[DingTalk] Failed to send image message") + self.reply_text("抱歉,图片发送失败", incoming_message) else: - reply_with_ai_markdown() + logger.error("[DingTalk] Cannot get access token") + self.reply_text("抱歉,图片发送失败(无法获取token)", incoming_message) else: - # 暂不支持其它类型消息回复 - reply_with_text() - else: - self.reply_text(reply.content, incoming_message) + logger.error("[DingTalk] Failed to upload image") + self.reply_text("抱歉,图片上传失败", incoming_message) + return + + elif reply.type == ReplyType.FILE: + # 如果有附加的文本内容,先发送文本 + if hasattr(reply, 'text_content') and reply.text_content: + self.reply_text(reply.text_content, incoming_message) + import time + time.sleep(0.3) # 短暂延迟,确保文本先到达 + + # 判断是否为视频文件 + file_path = reply.content + if file_path.startswith("file://"): + file_path = file_path[7:] + + is_video = file_path.lower().endswith(('.mp4', '.avi', '.mov', '.wmv', '.flv')) + + access_token = self.get_access_token() + if not access_token: + logger.error("[DingTalk] Cannot get access token") + self.reply_text("抱歉,文件发送失败(无法获取token)", incoming_message) + return + + if is_video: + logger.info(f"[DingTalk] Sending video: {reply.content}") + media_id = self.upload_media(reply.content, media_type="video") + if media_id: + # 发送视频消息 + msg_param = { + "duration": "30", # TODO: 获取实际视频时长 + "videoMediaId": media_id, + "videoType": "mp4", + "height": "400", + "width": "600", + } + success = self._send_file_message( + access_token, + incoming_message, + "sampleVideo", + msg_param, + isgroup + ) + if not success: + self.reply_text("抱歉,视频发送失败", incoming_message) + else: + logger.error("[DingTalk] Failed to upload video") + self.reply_text("抱歉,视频上传失败", incoming_message) + else: + # 其他文件类型 + logger.info(f"[DingTalk] Sending file: {reply.content}") + media_id = self.upload_media(reply.content, media_type="file") + if media_id: + file_name = os.path.basename(file_path) + file_base, file_extension = os.path.splitext(file_name) + msg_param = { + "mediaId": media_id, + "fileName": file_name, + "fileType": file_extension[1:] if file_extension else "file" + } + success = self._send_file_message( + access_token, + incoming_message, + "sampleFile", + msg_param, + isgroup + ) + if not success: + self.reply_text("抱歉,文件发送失败", incoming_message) + else: + logger.error("[DingTalk] Failed to upload file") + self.reply_text("抱歉,文件上传失败", incoming_message) + return + + # 处理文本消息 + elif reply.type == ReplyType.TEXT: + logger.info(f"[DingTalk] Sending text message, length={len(reply.content)}") + if conf().get("dingtalk_card_enabled"): + logger.info("[Dingtalk] sendMsg={}, receiver={}".format(reply, receiver)) + def reply_with_text(): + self.reply_text(reply.content, incoming_message) + def reply_with_at_text(): + self.reply_text("📢 您有一条新的消息,请查看。", incoming_message) + def reply_with_ai_markdown(): + button_list, markdown_content = self.generate_button_markdown_content(context, reply) + self.reply_ai_markdown_button(incoming_message, markdown_content, button_list, "", "📌 内容由AI生成", "",[incoming_message.sender_staff_id]) + if reply.type in [ReplyType.IMAGE_URL, ReplyType.IMAGE, ReplyType.TEXT]: + if isgroup: + reply_with_ai_markdown() + reply_with_at_text() + else: + reply_with_ai_markdown() + else: + # 暂不支持其它类型消息回复 + reply_with_text() + else: + self.reply_text(reply.content, incoming_message) + return + + def _send_file_message(self, access_token: str, incoming_message, msg_key: str, msg_param: dict, is_group: bool) -> bool: + """ + 发送文件/视频消息的通用方法 + + Args: + access_token: 访问令牌 + incoming_message: 钉钉消息对象 + msg_key: 消息类型 (sampleFile, sampleVideo, sampleAudio) + msg_param: 消息参数 + is_group: 是否为群聊 + + Returns: + 是否发送成功 + """ + headers = { + "x-acs-dingtalk-access-token": access_token, + 'Content-Type': 'application/json' + } + + body = { + "robotCode": incoming_message.robot_code, + "msgKey": msg_key, + "msgParam": json.dumps(msg_param), + } + + if is_group: + # 群聊 + url = "https://api.dingtalk.com/v1.0/robot/groupMessages/send" + body["openConversationId"] = incoming_message.conversation_id + else: + # 单聊 + url = "https://api.dingtalk.com/v1.0/robot/oToMessages/batchSend" + body["userIds"] = [incoming_message.sender_staff_id] + + try: + response = requests.post(url=url, headers=headers, json=body, timeout=10) + result = response.json() + + logger.info(f"[DingTalk] File send result: {response.text}") + + if response.status_code == 200: + return True + else: + logger.error(f"[DingTalk] Send file error: {response.text}") + return False + except Exception as e: + logger.error(f"[DingTalk] Send file exception: {e}") + return False def generate_button_markdown_content(self, context, reply): image_url = context.kwargs.get("image_url") diff --git a/channel/dingtalk/dingtalk_message.py b/channel/dingtalk/dingtalk_message.py index 2196a1f..9754f32 100644 --- a/channel/dingtalk/dingtalk_message.py +++ b/channel/dingtalk/dingtalk_message.py @@ -1,4 +1,5 @@ import os +import re import requests from dingtalk_stream import ChatbotMessage @@ -8,6 +9,7 @@ from channel.chat_message import ChatMessage # -*- coding=utf-8 -*- from common.log import logger from common.tmp_dir import TmpDir +from config import conf class DingTalkMessage(ChatMessage): @@ -37,15 +39,67 @@ class DingTalkMessage(ChatMessage): self.content = event.extensions['content']['recognition'].strip() self.ctype = ContextType.TEXT elif (self.message_type == 'picture') or (self.message_type == 'richText'): - self.ctype = ContextType.IMAGE # 钉钉图片类型或富文本类型消息处理 image_list = event.get_image_list() - if len(image_list) > 0: + + if self.message_type == 'picture' and len(image_list) > 0: + # 单张图片消息:下载到工作空间,用于文件缓存 + self.ctype = ContextType.IMAGE download_code = image_list[0] download_url = image_download_handler.get_image_download_url(download_code) - self.content = download_image_file(download_url, TmpDir().path()) + + # 下载到工作空间 tmp 目录 + workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow")) + tmp_dir = os.path.join(workspace_root, "tmp") + os.makedirs(tmp_dir, exist_ok=True) + + image_path = download_image_file(download_url, tmp_dir) + if image_path: + self.content = image_path + self.image_path = image_path # 保存图片路径用于缓存 + logger.info(f"[DingTalk] Downloaded single image to {image_path}") + else: + self.content = "[图片下载失败]" + self.image_path = None + + elif self.message_type == 'richText' and len(image_list) > 0: + # 富文本消息:下载所有图片并附加到文本中 + self.ctype = ContextType.TEXT + + # 下载到工作空间 tmp 目录 + workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow")) + tmp_dir = os.path.join(workspace_root, "tmp") + os.makedirs(tmp_dir, exist_ok=True) + + # 提取富文本中的文本内容 + text_content = "" + if self.rich_text_content: + # rich_text_content 是一个 RichTextContent 对象,需要从中提取文本 + text_list = event.get_text_list() + if text_list: + text_content = "".join(text_list).strip() + + # 下载所有图片 + image_paths = [] + for download_code in image_list: + download_url = image_download_handler.get_image_download_url(download_code) + image_path = download_image_file(download_url, tmp_dir) + if image_path: + image_paths.append(image_path) + + # 构建消息内容:文本 + 图片路径 + content_parts = [] + if text_content: + content_parts.append(text_content) + for img_path in image_paths: + content_parts.append(f"[图片: {img_path}]") + + self.content = "\n".join(content_parts) if content_parts else "[富文本消息]" + logger.info(f"[DingTalk] Received richText with {len(image_paths)} image(s): {self.content}") else: - logger.debug(f"[Dingtalk] messageType :{self.message_type} , imageList isEmpty") + self.ctype = ContextType.IMAGE + self.content = "[未找到图片]" + logger.debug(f"[DingTalk] messageType: {self.message_type}, imageList isEmpty") if self.is_group: self.from_user_id = event.conversation_id @@ -59,27 +113,95 @@ class DingTalkMessage(ChatMessage): def download_image_file(image_url, temp_dir): - headers = { - 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36' - } - # 设置代理 - # self.proxies - # , proxies=self.proxies - response = requests.get(image_url, headers=headers, stream=True, timeout=60 * 5) - if response.status_code == 200: - - # 生成文件名 - file_name = image_url.split("/")[-1].split("?")[0] - - # 检查临时目录是否存在,如果不存在则创建 - if not os.path.exists(temp_dir): - os.makedirs(temp_dir) - - # 将文件保存到临时目录 - file_path = os.path.join(temp_dir, file_name) - with open(file_path, 'wb') as file: - file.write(response.content) - return file_path + """ + 下载图片文件 + 支持两种方式: + 1. 普通 HTTP(S) URL + 2. 钉钉 downloadCode: dingtalk://download/{download_code} + """ + # 检查临时目录是否存在,如果不存在则创建 + if not os.path.exists(temp_dir): + os.makedirs(temp_dir) + + # 处理钉钉 downloadCode + if image_url.startswith("dingtalk://download/"): + download_code = image_url.replace("dingtalk://download/", "") + logger.info(f"[DingTalk] Downloading image with downloadCode: {download_code[:20]}...") + + # 需要从外部传入 access_token,这里先用一个临时方案 + # 从 config 获取 dingtalk_client_id 和 dingtalk_client_secret + from config import conf + client_id = conf().get("dingtalk_client_id") + client_secret = conf().get("dingtalk_client_secret") + + if not client_id or not client_secret: + logger.error("[DingTalk] Missing dingtalk_client_id or dingtalk_client_secret") + return None + + # 获取 access_token + token_url = "https://oapi.dingtalk.com/gettoken" + token_params = { + "appkey": client_id, + "appsecret": client_secret + } + + try: + token_response = requests.get(token_url, params=token_params, timeout=10) + token_data = token_response.json() + + if token_data.get("errcode") == 0: + access_token = token_data.get("access_token") + + # 下载图片 + download_url = f"https://oapi.dingtalk.com/robot/messageFiles/download" + download_params = { + "access_token": access_token, + "downloadCode": download_code + } + + response = requests.get(download_url, params=download_params, stream=True, timeout=60) + if response.status_code == 200: + # 生成文件名(使用 download_code 的 hash,避免特殊字符) + import hashlib + file_hash = hashlib.md5(download_code.encode()).hexdigest()[:16] + file_name = f"{file_hash}.png" + file_path = os.path.join(temp_dir, file_name) + + with open(file_path, 'wb') as file: + file.write(response.content) + + logger.info(f"[DingTalk] Image downloaded successfully: {file_path}") + return file_path + else: + logger.error(f"[DingTalk] Failed to download image: {response.status_code}") + return None + else: + logger.error(f"[DingTalk] Failed to get access token: {token_data}") + return None + except Exception as e: + logger.error(f"[DingTalk] Exception downloading image: {e}") + return None + + # 普通 HTTP(S) URL else: - logger.info(f"[Dingtalk] Failed to download image file, {response.content}") - return None + headers = { + 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36' + } + + try: + response = requests.get(image_url, headers=headers, stream=True, timeout=60 * 5) + if response.status_code == 200: + # 生成文件名 + file_name = image_url.split("/")[-1].split("?")[0] + + # 将文件保存到临时目录 + file_path = os.path.join(temp_dir, file_name) + with open(file_path, 'wb') as file: + file.write(response.content) + return file_path + else: + logger.info(f"[Dingtalk] Failed to download image file, {response.content}") + return None + except Exception as e: + logger.error(f"[Dingtalk] Exception downloading image: {e}") + return None diff --git a/channel/feishu/feishu_channel.py b/channel/feishu/feishu_channel.py index 0def10b..0197c93 100644 --- a/channel/feishu/feishu_channel.py +++ b/channel/feishu/feishu_channel.py @@ -55,7 +55,7 @@ class FeiShuChanel(ChatChannel): super().__init__() # 历史消息id暂存,用于幂等控制 self.receivedMsgs = ExpiredDict(60 * 60 * 7.1) - logger.info("[FeiShu] app_id={}, app_secret={}, verification_token={}, event_mode={}".format( + logger.debug("[FeiShu] app_id={}, app_secret={}, verification_token={}, event_mode={}".format( self.feishu_app_id, self.feishu_app_secret, self.feishu_token, self.feishu_event_mode)) # 无需群校验和前缀 conf()["group_name_white_list"] = ["ALL_GROUP"] @@ -74,7 +74,7 @@ class FeiShuChanel(ChatChannel): def _startup_webhook(self): """启动HTTP服务器接收事件(webhook模式)""" - logger.info("[FeiShu] Starting in webhook mode...") + logger.debug("[FeiShu] Starting in webhook mode...") urls = ( '/', 'channel.feishu.feishu_channel.FeishuController' ) @@ -84,7 +84,7 @@ class FeiShuChanel(ChatChannel): def _startup_websocket(self): """启动长连接接收事件(websocket模式)""" - logger.info("[FeiShu] Starting in websocket mode...") + logger.debug("[FeiShu] Starting in websocket mode...") # 创建事件处理器 def handle_message_event(data: lark.im.v1.P2ImMessageReceiveV1) -> None: @@ -118,7 +118,7 @@ class FeiShuChanel(ChatChannel): # 在新线程中启动客户端,避免阻塞主线程 def start_client(): try: - logger.info("[FeiShu] Websocket client starting...") + logger.debug("[FeiShu] Websocket client starting...") ws_client.start() except Exception as e: logger.error(f"[FeiShu] Websocket client error: {e}", exc_info=True) @@ -127,7 +127,7 @@ class FeiShuChanel(ChatChannel): ws_thread.start() # 保持主线程运行 - logger.info("[FeiShu] Websocket mode started, waiting for events...") + logger.info("[FeiShu] ✅ Websocket connected, ready to receive messages") ws_thread.join() def _handle_message_event(self, event: dict): @@ -173,6 +173,48 @@ class FeiShuChanel(ChatChannel): if not feishu_msg: return + # 处理文件缓存逻辑 + from channel.file_cache import get_file_cache + file_cache = get_file_cache() + + # 获取 session_id(用于缓存关联) + if is_group: + if conf().get("group_shared_session", True): + session_id = msg.get("chat_id") # 群共享会话 + else: + session_id = feishu_msg.from_user_id + "_" + msg.get("chat_id") + else: + session_id = feishu_msg.from_user_id + + # 如果是单张图片消息,缓存起来 + if feishu_msg.ctype == ContextType.IMAGE: + if hasattr(feishu_msg, 'image_path') and feishu_msg.image_path: + file_cache.add(session_id, feishu_msg.image_path, file_type='image') + logger.info(f"[FeiShu] Image cached for session {session_id}, waiting for user query...") + # 单张图片不直接处理,等待用户提问 + return + + # 如果是文本消息,检查是否有缓存的文件 + if feishu_msg.ctype == ContextType.TEXT: + cached_files = file_cache.get(session_id) + if cached_files: + # 将缓存的文件附加到文本消息中 + file_refs = [] + for file_info in cached_files: + file_path = file_info['path'] + file_type = file_info['type'] + if file_type == 'image': + file_refs.append(f"[图片: {file_path}]") + elif file_type == 'video': + file_refs.append(f"[视频: {file_path}]") + else: + file_refs.append(f"[文件: {file_path}]") + + feishu_msg.content = feishu_msg.content + "\n" + "\n".join(file_refs) + logger.info(f"[FeiShu] Attached {len(cached_files)} cached file(s) to user query") + # 清除缓存 + file_cache.clear(session_id) + context = self._compose_context( feishu_msg.ctype, feishu_msg.content, @@ -183,7 +225,7 @@ class FeiShuChanel(ChatChannel): ) if context: self.produce(context) - logger.info(f"[FeiShu] query={feishu_msg.content}, type={feishu_msg.ctype}") + logger.debug(f"[FeiShu] query={feishu_msg.content}, type={feishu_msg.ctype}") def send(self, reply: Reply, context: Context): msg = context.get("msg") @@ -197,7 +239,7 @@ class FeiShuChanel(ChatChannel): "Content-Type": "application/json", } msg_type = "text" - logger.info(f"[FeiShu] start send reply message, type={context.type}, content={reply.content}") + logger.debug(f"[FeiShu] sending reply, type={context.type}, content={reply.content[:100]}...") reply_content = reply.content content_key = "text" if reply.type == ReplyType.IMAGE_URL: @@ -217,14 +259,20 @@ class FeiShuChanel(ChatChannel): is_video = file_path.lower().endswith(('.mp4', '.avi', '.mov', '.wmv', '.flv')) if is_video: - # 视频使用 media 类型 - file_key = self._upload_video_url(reply.content, access_token) - if not file_key: + # 视频使用 media 类型,需要上传并获取 file_key 和 duration + video_info = self._upload_video_url(reply.content, access_token) + if not video_info or not video_info.get('file_key'): logger.warning("[FeiShu] upload video failed") return - reply_content = file_key + + # media 类型需要特殊的 content 格式 msg_type = "media" - content_key = "file_key" + # 注意:media 类型的 content 不使用 content_key,而是完整的 JSON 对象 + reply_content = { + "file_key": video_info['file_key'], + "duration": video_info.get('duration', 0) # 视频时长(毫秒) + } + content_key = None # media 类型不使用单一的 key else: # 其他文件使用 file 类型 file_key = self._upload_file_url(reply.content, access_token) @@ -243,7 +291,7 @@ class FeiShuChanel(ChatChannel): url = f"https://open.feishu.cn/open-apis/im/v1/messages/{msg.msg_id}/reply" data = { "msg_type": msg_type, - "content": json.dumps({content_key: reply_content}) + "content": json.dumps(reply_content) if content_key is None else json.dumps({content_key: reply_content}) } res = requests.post(url=url, headers=headers, json=data, timeout=(5, 10)) else: @@ -253,7 +301,7 @@ class FeiShuChanel(ChatChannel): data = { "receive_id": context.get("receiver"), "msg_type": msg_type, - "content": json.dumps({content_key: reply_content}) + "content": json.dumps(reply_content) if content_key is None else json.dumps({content_key: reply_content}) } res = requests.post(url=url, headers=headers, params=params, json=data, timeout=(5, 10)) res = res.json() @@ -336,103 +384,128 @@ class FeiShuChanel(ChatChannel): os.remove(temp_name) return upload_response.json().get("data").get("image_key") + def _get_video_duration(self, file_path: str) -> int: + """ + 获取视频时长(毫秒) + + Args: + file_path: 视频文件路径 + + Returns: + 视频时长(毫秒),如果获取失败返回0 + """ + try: + import subprocess + + # 使用 ffprobe 获取视频时长 + cmd = [ + 'ffprobe', + '-v', 'error', + '-show_entries', 'format=duration', + '-of', 'default=noprint_wrappers=1:nokey=1', + file_path + ] + + result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) + if result.returncode == 0: + duration_seconds = float(result.stdout.strip()) + duration_ms = int(duration_seconds * 1000) + logger.info(f"[FeiShu] Video duration: {duration_seconds:.2f}s ({duration_ms}ms)") + return duration_ms + else: + logger.warning(f"[FeiShu] Failed to get video duration via ffprobe: {result.stderr}") + return 0 + except FileNotFoundError: + logger.warning("[FeiShu] ffprobe not found, video duration will be 0. Install ffmpeg to fix this.") + return 0 + except Exception as e: + logger.warning(f"[FeiShu] Failed to get video duration: {e}") + return 0 + def _upload_video_url(self, video_url, access_token): """ - Upload video to Feishu and return file_key (for media type messages) + Upload video to Feishu and return video info (file_key and duration) Supports: - file:// URLs for local files - http(s):// URLs (download then upload) + + Returns: + dict with 'file_key' and 'duration' (milliseconds), or None if failed """ - # For file:// URLs (local files), upload directly - if video_url.startswith("file://"): - local_path = video_url[7:] # Remove file:// prefix - if not os.path.exists(local_path): - logger.error(f"[FeiShu] local video file not found: {local_path}") - return None + local_path = None + temp_file = None + + try: + # For file:// URLs (local files), upload directly + if video_url.startswith("file://"): + local_path = video_url[7:] # Remove file:// prefix + if not os.path.exists(local_path): + logger.error(f"[FeiShu] local video file not found: {local_path}") + return None + else: + # For HTTP URLs, download first + logger.info(f"[FeiShu] Downloading video from URL: {video_url}") + response = requests.get(video_url, timeout=(5, 60)) + if response.status_code != 200: + logger.error(f"[FeiShu] download video failed, status={response.status_code}") + return None + + # Save to temp file + import uuid + file_name = os.path.basename(video_url) or "video.mp4" + temp_file = str(uuid.uuid4()) + "_" + file_name + + with open(temp_file, "wb") as file: + file.write(response.content) + + logger.info(f"[FeiShu] Video downloaded, size={len(response.content)} bytes") + local_path = temp_file + # Get video duration + duration = self._get_video_duration(local_path) + + # Upload to Feishu file_name = os.path.basename(local_path) file_ext = os.path.splitext(file_name)[1].lower() - - # Determine file type for Feishu API (for media messages) - # Media type only supports mp4 - file_type_map = { - '.mp4': 'mp4', - } - file_type = file_type_map.get(file_ext, 'mp4') # Default to mp4 - - # Upload video to Feishu (use file upload API, but send as media type) - upload_url = "https://open.feishu.cn/open-apis/im/v1/files" - data = {'file_type': file_type, 'file_name': file_name} - headers = {'Authorization': f'Bearer {access_token}'} - - try: - with open(local_path, "rb") as file: - upload_response = requests.post( - upload_url, - files={"file": file}, - data=data, - headers=headers, - timeout=(5, 60) # 5s connect, 60s read timeout (videos are larger) - ) - logger.info(f"[FeiShu] upload video response, status={upload_response.status_code}, res={upload_response.content}") - - response_data = upload_response.json() - if response_data.get("code") == 0: - return response_data.get("data").get("file_key") - else: - logger.error(f"[FeiShu] upload video failed: {response_data}") - return None - except Exception as e: - logger.error(f"[FeiShu] upload video exception: {e}") - return None - - # For HTTP URLs, download first then upload - try: - logger.info(f"[FeiShu] Downloading video from URL: {video_url}") - response = requests.get(video_url, timeout=(5, 60)) - if response.status_code != 200: - logger.error(f"[FeiShu] download video failed, status={response.status_code}") - return None - - # Save to temp file - import uuid - file_name = os.path.basename(video_url) or "video.mp4" - temp_name = str(uuid.uuid4()) + "_" + file_name - - with open(temp_name, "wb") as file: - file.write(response.content) - - logger.info(f"[FeiShu] Video downloaded, size={len(response.content)} bytes, uploading...") - - # Upload - file_ext = os.path.splitext(file_name)[1].lower() - file_type_map = { - '.mp4': 'mp4', - } + file_type_map = {'.mp4': 'mp4'} file_type = file_type_map.get(file_ext, 'mp4') upload_url = "https://open.feishu.cn/open-apis/im/v1/files" data = {'file_type': file_type, 'file_name': file_name} headers = {'Authorization': f'Bearer {access_token}'} - with open(temp_name, "rb") as file: - upload_response = requests.post(upload_url, files={"file": file}, data=data, headers=headers, timeout=(5, 60)) - logger.info(f"[FeiShu] upload video, res={upload_response.content}") + with open(local_path, "rb") as file: + upload_response = requests.post( + upload_url, + files={"file": file}, + data=data, + headers=headers, + timeout=(5, 60) + ) + logger.info(f"[FeiShu] upload video response, status={upload_response.status_code}, res={upload_response.content}") response_data = upload_response.json() - os.remove(temp_name) # Clean up temp file - if response_data.get("code") == 0: - return response_data.get("data").get("file_key") + file_key = response_data.get("data").get("file_key") + return { + 'file_key': file_key, + 'duration': duration + } else: logger.error(f"[FeiShu] upload video failed: {response_data}") return None + except Exception as e: - logger.error(f"[FeiShu] upload video from URL exception: {e}") - # Clean up temp file if exists - if 'temp_name' in locals() and os.path.exists(temp_name): - os.remove(temp_name) + logger.error(f"[FeiShu] upload video exception: {e}") return None + + finally: + # Clean up temp file + if temp_file and os.path.exists(temp_file): + try: + os.remove(temp_file) + except Exception as e: + logger.warning(f"[FeiShu] Failed to remove temp file {temp_file}: {e}") def _upload_file_url(self, file_url, access_token): """ diff --git a/channel/feishu/feishu_message.py b/channel/feishu/feishu_message.py index 7b6408f..cb309f1 100644 --- a/channel/feishu/feishu_message.py +++ b/channel/feishu/feishu_message.py @@ -25,13 +25,33 @@ class FeishuMessage(ChatMessage): content = json.loads(msg.get('content')) self.content = content.get("text").strip() elif msg_type == "image": - # 单张图片消息,不处理和存储 + # 单张图片消息:下载并缓存,等待用户提问时一起发送 self.ctype = ContextType.IMAGE content = json.loads(msg.get("content")) image_key = content.get("image_key") - # 仅记录图片key,不下载 - self.content = f"[图片: {image_key}]" - logger.info(f"[FeiShu] Received single image message, key={image_key}, skipped download") + + # 下载图片到工作空间临时目录 + workspace_root = os.path.expanduser(conf().get("agent_workspace", "~/cow")) + tmp_dir = os.path.join(workspace_root, "tmp") + os.makedirs(tmp_dir, exist_ok=True) + image_path = os.path.join(tmp_dir, f"{image_key}.png") + + # 下载图片 + url = f"https://open.feishu.cn/open-apis/im/v1/messages/{msg.get('message_id')}/resources/{image_key}" + headers = {"Authorization": "Bearer " + access_token} + params = {"type": "image"} + response = requests.get(url=url, headers=headers, params=params) + + if response.status_code == 200: + with open(image_path, "wb") as f: + f.write(response.content) + logger.info(f"[FeiShu] Downloaded single image, key={image_key}, path={image_path}") + self.content = image_path + self.image_path = image_path # 保存图片路径 + else: + logger.error(f"[FeiShu] Failed to download single image, key={image_key}, status={response.status_code}") + self.content = f"[图片下载失败: {image_key}]" + self.image_path = None elif msg_type == "post": # 富文本消息,可能包含图片、文本等多种元素 content = json.loads(msg.get("content")) diff --git a/channel/file_cache.py b/channel/file_cache.py new file mode 100644 index 0000000..b258b66 --- /dev/null +++ b/channel/file_cache.py @@ -0,0 +1,100 @@ +""" +文件缓存管理器 +用于缓存单独发送的文件消息(图片、视频、文档等),在用户提问时自动附加 +""" +import time +import logging + +logger = logging.getLogger(__name__) + + +class FileCache: + """文件缓存管理器,按 session_id 缓存文件,TTL=2分钟""" + + def __init__(self, ttl=120): + """ + Args: + ttl: 缓存过期时间(秒),默认2分钟 + """ + self.cache = {} + self.ttl = ttl + + def add(self, session_id: str, file_path: str, file_type: str = "image"): + """ + 添加文件到缓存 + + Args: + session_id: 会话ID + file_path: 文件本地路径 + file_type: 文件类型(image, video, file 等) + """ + if session_id not in self.cache: + self.cache[session_id] = { + 'files': [], + 'timestamp': time.time() + } + + # 添加文件(去重) + file_info = {'path': file_path, 'type': file_type} + if file_info not in self.cache[session_id]['files']: + self.cache[session_id]['files'].append(file_info) + logger.info(f"[FileCache] Added {file_type} to cache for session {session_id}: {file_path}") + + def get(self, session_id: str) -> list: + """ + 获取缓存的文件列表 + + Args: + session_id: 会话ID + + Returns: + 文件信息列表 [{'path': '...', 'type': 'image'}, ...],如果没有或已过期返回空列表 + """ + if session_id not in self.cache: + return [] + + item = self.cache[session_id] + + # 检查是否过期 + if time.time() - item['timestamp'] > self.ttl: + logger.info(f"[FileCache] Cache expired for session {session_id}, clearing...") + del self.cache[session_id] + return [] + + return item['files'] + + def clear(self, session_id: str): + """ + 清除指定会话的缓存 + + Args: + session_id: 会话ID + """ + if session_id in self.cache: + logger.info(f"[FileCache] Cleared cache for session {session_id}") + del self.cache[session_id] + + def cleanup_expired(self): + """清理所有过期的缓存""" + current_time = time.time() + expired_sessions = [] + + for session_id, item in self.cache.items(): + if current_time - item['timestamp'] > self.ttl: + expired_sessions.append(session_id) + + for session_id in expired_sessions: + del self.cache[session_id] + logger.debug(f"[FileCache] Cleaned up expired cache for session {session_id}") + + if expired_sessions: + logger.info(f"[FileCache] Cleaned up {len(expired_sessions)} expired cache(s)") + + +# 全局单例 +_file_cache = FileCache() + + +def get_file_cache() -> FileCache: + """获取全局文件缓存实例""" + return _file_cache diff --git a/channel/web/web_channel.py b/channel/web/web_channel.py index 1486c1d..f2f075d 100644 --- a/channel/web/web_channel.py +++ b/channel/web/web_channel.py @@ -200,12 +200,12 @@ class WebChannel(ChatChannel): logger.info("""[WebChannel] 当前channel为web,可修改 config.json 配置文件中的 channel_type 字段进行切换。全部可用类型为: 1. web: 网页 2. terminal: 终端 - 3. wechatmp: 个人公众号 - 4. wechatmp_service: 企业公众号 + 3. feishu: 飞书 + 4. dingtalk: 钉钉 5. wechatcom_app: 企微自建应用 - 6. dingtalk: 钉钉 - 7. feishu: 飞书""") - logger.info(f"Web对话网页已运行, 请使用浏览器访问 http://localhost:{port}/chat (本地运行) 或 http://ip:{port}/chat (服务器运行)") + 6. wechatmp: 个人公众号 + 7. wechatmp_service: 企业公众号""") + logger.info(f"✅ Web对话网页已运行, 请使用浏览器访问 http://localhost:{port}/chat (本地运行) 或 http://ip:{port}/chat (服务器运行)") # 确保静态文件目录存在 static_dir = os.path.join(os.path.dirname(__file__), 'static') diff --git a/config-template.json b/config-template.json index ec9701a..3b42f31 100644 --- a/config-template.json +++ b/config-template.json @@ -1,36 +1,23 @@ { "channel_type": "web", - "model": "", + "model": "claude-sonnet-4-5", "open_ai_api_key": "YOUR API KEY", + "open_ai_api_base": "https://api.openai.com/v1", "claude_api_key": "YOUR API KEY", - "text_to_image": "dall-e-2", + "claude_api_base": "https://api.anthropic.com/v1", + "gemini_api_key": "YOUR API KEY", + "gemini_api_base": "https://generativelanguage.googleapis.com", "voice_to_text": "openai", "text_to_voice": "openai", - "proxy": "", - "hot_reload": false, - "single_chat_prefix": [ - "bot", - "@bot" - ], - "single_chat_reply_prefix": "[bot] ", - "group_chat_prefix": [ - "@bot" - ], - "group_name_white_list": [ - "Agent测试群", - "ChatGPT测试群2" - ], - "image_create_prefix": [""], + "voice_reply_voice": false, "speech_recognition": true, "group_speech_recognition": false, - "voice_reply_voice": false, - "conversation_max_tokens": 2500, - "expires_in_seconds": 3600, - "character_desc": "你是基于大语言模型的AI智能助手,旨在回答并解决人们的任何问题,并且可以使用多种语言与人交流。", - "temperature": 0.7, - "subscribe_msg": "感谢您的关注!\n这里是AI智能助手,可以自由对话。\n支持语音对话。\n支持图片输入。\n支持图片输出,画字开头的消息将按要求创作图片。\n支持tool、角色扮演和文字冒险等丰富的插件。\n输入{trigger_prefix}#help 查看详细指令。", + "proxy": "", "use_linkai": false, "linkai_api_key": "", "linkai_app_code": "", - "agent": false + "agent": true, + "agent_max_context_tokens": 40000, + "agent_max_context_turns": 30, + "agent_max_steps": 20 } diff --git a/config.py b/config.py index 6b9ec32..ecd3b72 100644 --- a/config.py +++ b/config.py @@ -15,6 +15,8 @@ available_setting = { "open_ai_api_key": "", # openai api key # openai apibase,当use_azure_chatgpt为true时,需要设置对应的api base "open_ai_api_base": "https://api.openai.com/v1", + "claude_api_base": "https://api.anthropic.com/v1", # claude api base + "gemini_api_base": "https://generativelanguage.googleapis.com", # gemini api base "proxy": "", # openai使用的代理 # chatgpt模型, 当use_azure_chatgpt为true时,其名称为Azure上model deployment名称 "model": "gpt-3.5-turbo", # 可选择: gpt-4o, pt-4o-mini, gpt-4-turbo, claude-3-sonnet, wenxin, moonshot, qwen-turbo, xunfei, glm-4, minimax, gemini等模型,全部可选模型详见common/const.py文件 @@ -204,13 +206,13 @@ class Config(dict): def __getitem__(self, key): # 跳过以下划线开头的注释字段 if not key.startswith("_") and key not in available_setting: - raise Exception("key {} not in available_setting".format(key)) + logger.warning("[Config] key '{}' not in available_setting, may not take effect".format(key)) return super().__getitem__(key) def __setitem__(self, key, value): # 跳过以下划线开头的注释字段 if not key.startswith("_") and key not in available_setting: - raise Exception("key {} not in available_setting".format(key)) + logger.warning("[Config] key '{}' not in available_setting, may not take effect".format(key)) return super().__setitem__(key, value) def get(self, key, default=None): diff --git a/models/claudeapi/claude_api_bot.py b/models/claudeapi/claude_api_bot.py index 4eb70c0..ed4bbc2 100644 --- a/models/claudeapi/claude_api_bot.py +++ b/models/claudeapi/claude_api_bot.py @@ -31,7 +31,7 @@ class ClaudeAPIBot(Bot, OpenAIImage): def __init__(self): super().__init__() self.api_key = conf().get("claude_api_key") - self.api_base = conf().get("open_ai_api_base") or "https://api.anthropic.com/v1" + self.api_base = conf().get("claude_api_base") or "https://api.anthropic.com/v1" self.proxy = conf().get("proxy", None) self.sessions = SessionManager(BaiduWenxinSession, model=conf().get("model") or "text-davinci-003") diff --git a/models/gemini/google_gemini_bot.py b/models/gemini/google_gemini_bot.py index cf46ef8..95d4b68 100644 --- a/models/gemini/google_gemini_bot.py +++ b/models/gemini/google_gemini_bot.py @@ -33,14 +33,11 @@ class GoogleGeminiBot(Bot): if self.model == "gemini": self.model = "gemini-pro" - # 支持自定义API base地址,复用open_ai_api_base配置 - self.api_base = conf().get("open_ai_api_base", "").strip() + # 支持自定义API base地址 + self.api_base = conf().get("gemini_api_base", "").strip() if self.api_base: # 移除末尾的斜杠 self.api_base = self.api_base.rstrip('/') - # 如果配置的是OpenAI的地址,则使用默认的Gemini地址 - if "api.openai.com" in self.api_base or not self.api_base: - self.api_base = "https://generativelanguage.googleapis.com" logger.info(f"[Gemini] Using custom API base: {self.api_base}") else: self.api_base = "https://generativelanguage.googleapis.com" @@ -254,7 +251,6 @@ class GoogleGeminiBot(Bot): gemini_tools = self._convert_tools_to_gemini_rest_format(tools) if gemini_tools: payload["tools"] = gemini_tools - logger.debug(f"[Gemini] Added {len(tools)} tools to request") # Make REST API call base_url = f"{self.api_base}/v1beta" @@ -267,8 +263,6 @@ class GoogleGeminiBot(Bot): "Content-Type": "application/json" } - logger.debug(f"[Gemini] REST API call: {endpoint}") - response = requests.post( endpoint, headers=headers, @@ -339,8 +333,6 @@ class GoogleGeminiBot(Bot): logger.warning(f"[Gemini] Skipping tool without name: {tool}") continue - logger.debug(f"[Gemini] Converting tool: {name}") - function_declarations.append({ "name": name, "description": description, @@ -464,7 +456,6 @@ class GoogleGeminiBot(Bot): try: chunk_data = json.loads(line) chunk_count += 1 - logger.debug(f"[Gemini] Stream chunk: {json.dumps(chunk_data, ensure_ascii=False)[:200]}") candidates = chunk_data.get("candidates", []) if not candidates: @@ -489,7 +480,6 @@ class GoogleGeminiBot(Bot): for part in parts: if "text" in part and part["text"]: has_content = True - logger.debug(f"[Gemini] Streaming text: {part['text'][:50]}...") yield { "id": f"chatcmpl-{time.time()}", "object": "chat.completion.chunk", @@ -505,7 +495,7 @@ class GoogleGeminiBot(Bot): # Collect function calls if "functionCall" in part: fc = part["functionCall"] - logger.debug(f"[Gemini] Function call detected: {fc.get('name')}") + logger.info(f"[Gemini] Function call: {fc.get('name')}") all_tool_calls.append({ "index": len(all_tool_calls), # Add index to differentiate multiple tool calls "id": f"call_{int(time.time() * 1000000)}_{len(all_tool_calls)}", @@ -522,7 +512,6 @@ class GoogleGeminiBot(Bot): # Send tool calls if any were collected if all_tool_calls and not has_sent_tool_calls: - logger.debug(f"[Gemini] Stream detected {len(all_tool_calls)} tool calls") yield { "id": f"chatcmpl-{time.time()}", "object": "chat.completion.chunk", @@ -536,14 +525,6 @@ class GoogleGeminiBot(Bot): } has_sent_tool_calls = True - # Log summary (only if there's something interesting) - if not has_content and not all_tool_calls: - logger.debug(f"[Gemini] Stream complete: has_content={has_content}, tool_calls={len(all_tool_calls)}") - elif all_tool_calls: - logger.debug(f"[Gemini] Stream complete: {len(all_tool_calls)} tool calls") - else: - logger.debug(f"[Gemini] Stream complete: text response") - # 如果返回空响应,记录详细警告 if not has_content and not all_tool_calls: logger.warning(f"[Gemini] ⚠️ Empty response detected!") diff --git a/plugins/agent/agent.py b/plugins/agent/agent.py index 07a40c1..a314f4a 100644 --- a/plugins/agent/agent.py +++ b/plugins/agent/agent.py @@ -32,7 +32,7 @@ class AgentPlugin(Plugin): self.config = self._load_config() self.tool_manager = ToolManager() self.tool_manager.load_tools(config_dict=self.config.get("tools")) - logger.info("[agent] inited") + logger.debug("[agent] inited") def _load_config(self) -> Dict: """Load configuration from config.yaml file.""" diff --git a/plugins/banwords/banwords.py b/plugins/banwords/banwords.py index 2a33a5a..6d73fef 100644 --- a/plugins/banwords/banwords.py +++ b/plugins/banwords/banwords.py @@ -49,7 +49,7 @@ class Banwords(Plugin): if conf.get("reply_filter", True): self.handlers[Event.ON_DECORATE_REPLY] = self.on_decorate_reply self.reply_action = conf.get("reply_action", "ignore") - logger.info("[Banwords] inited") + logger.debug("[Banwords] inited") except Exception as e: logger.warn("[Banwords] init failed, ignore or see https://github.com/zhayujie/chatgpt-on-wechat/tree/master/plugins/banwords .") raise e diff --git a/plugins/dungeon/dungeon.py b/plugins/dungeon/dungeon.py index dce62cd..b3eda23 100644 --- a/plugins/dungeon/dungeon.py +++ b/plugins/dungeon/dungeon.py @@ -53,7 +53,7 @@ class Dungeon(Plugin): def __init__(self): super().__init__() self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context - logger.info("[Dungeon] inited") + logger.debug("[Dungeon] inited") # 目前没有设计session过期事件,这里先暂时使用过期字典 if conf().get("expires_in_seconds"): self.games = ExpiredDict(conf().get("expires_in_seconds")) diff --git a/plugins/finish/finish.py b/plugins/finish/finish.py index a3c87ea..d976b53 100644 --- a/plugins/finish/finish.py +++ b/plugins/finish/finish.py @@ -20,7 +20,7 @@ class Finish(Plugin): def __init__(self): super().__init__() self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context - logger.info("[Finish] inited") + logger.debug("[Finish] inited") def on_handle_context(self, e_context: EventContext): if e_context["context"].type != ContextType.TEXT: diff --git a/plugins/godcmd/godcmd.py b/plugins/godcmd/godcmd.py index ecf519c..f1e956c 100644 --- a/plugins/godcmd/godcmd.py +++ b/plugins/godcmd/godcmd.py @@ -207,7 +207,7 @@ class Godcmd(Plugin): self.isrunning = True # 机器人是否运行中 self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context - logger.info("[Godcmd] inited") + logger.debug("[Godcmd] inited") def on_handle_context(self, e_context: EventContext): context_type = e_context["context"].type diff --git a/plugins/hello/hello.py b/plugins/hello/hello.py index 23de861..5e4162b 100644 --- a/plugins/hello/hello.py +++ b/plugins/hello/hello.py @@ -35,7 +35,7 @@ class Hello(Plugin): self.group_welc_prompt = self.config.get("group_welc_prompt", self.group_welc_prompt) self.group_exit_prompt = self.config.get("group_exit_prompt", self.group_exit_prompt) self.patpat_prompt = self.config.get("patpat_prompt", self.patpat_prompt) - logger.info("[Hello] inited") + logger.debug("[Hello] inited") self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context except Exception as e: logger.error(f"[Hello]初始化异常:{e}") diff --git a/plugins/keyword/keyword.py b/plugins/keyword/keyword.py index 281b8af..a43834e 100644 --- a/plugins/keyword/keyword.py +++ b/plugins/keyword/keyword.py @@ -37,9 +37,9 @@ class Keyword(Plugin): # 加载关键词 self.keyword = conf["keyword"] - logger.info("[keyword] {}".format(self.keyword)) + logger.debug("[keyword] {}".format(self.keyword)) self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context - logger.info("[keyword] inited.") + logger.debug("[keyword] inited.") except Exception as e: logger.warn("[keyword] init failed, ignore or see https://github.com/zhayujie/chatgpt-on-wechat/tree/master/plugins/keyword .") raise e diff --git a/plugins/linkai/linkai.py b/plugins/linkai/linkai.py index 8bdb13d..17183a6 100644 --- a/plugins/linkai/linkai.py +++ b/plugins/linkai/linkai.py @@ -32,7 +32,7 @@ class LinkAI(Plugin): self.sum_config = {} if self.config: self.sum_config = self.config.get("summary") - logger.info(f"[LinkAI] inited, config={self.config}") + logger.debug(f"[LinkAI] inited, config={self.config}") def on_handle_context(self, e_context: EventContext): """ diff --git a/plugins/plugin.py b/plugins/plugin.py index 3847c11..900501a 100644 --- a/plugins/plugin.py +++ b/plugins/plugin.py @@ -18,14 +18,12 @@ class Plugin: if not plugin_conf: # 全局配置不存在,则获取插件目录下的配置 plugin_config_path = os.path.join(self.path, "config.json") - logger.debug(f"loading plugin config, plugin_config_path={plugin_config_path}, exist={os.path.exists(plugin_config_path)}") if os.path.exists(plugin_config_path): with open(plugin_config_path, "r", encoding="utf-8") as f: plugin_conf = json.load(f) # 写入全局配置内存 write_plugin_config({self.name: plugin_conf}) - logger.debug(f"loading plugin config, plugin_name={self.name}, conf={plugin_conf}") return plugin_conf def save_config(self, config: dict): diff --git a/plugins/plugin_manager.py b/plugins/plugin_manager.py index ae9a1e1..22e7301 100644 --- a/plugins/plugin_manager.py +++ b/plugins/plugin_manager.py @@ -38,7 +38,7 @@ class PluginManager: if self.current_plugin_path == None: raise Exception("Plugin path not set") self.plugins[name.upper()] = plugincls - logger.info("Plugin %s_v%s registered, path=%s" % (name, plugincls.version, plugincls.path)) + logger.debug("Plugin %s_v%s registered, path=%s" % (name, plugincls.version, plugincls.path)) return wrapper @@ -47,7 +47,7 @@ class PluginManager: json.dump(self.pconf, f, indent=4, ensure_ascii=False) def load_config(self): - logger.info("Loading plugins config...") + logger.debug("Loading plugins config...") modified = False if os.path.exists("./plugins/plugins.json"): @@ -85,7 +85,7 @@ class PluginManager: logger.error(e) def scan_plugins(self): - logger.info("Scaning plugins ...") + logger.debug("Scanning plugins ...") plugins_dir = "./plugins" raws = [self.plugins[name] for name in self.plugins] for plugin_name in os.listdir(plugins_dir): diff --git a/plugins/role/role.py b/plugins/role/role.py index 87f5a14..b4e392d 100644 --- a/plugins/role/role.py +++ b/plugins/role/role.py @@ -66,7 +66,7 @@ class Role(Plugin): raise Exception("no role found") self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context self.roleplays = {} - logger.info("[Role] inited") + logger.debug("[Role] inited") except Exception as e: if isinstance(e, FileNotFoundError): logger.warn(f"[Role] init failed, {config_path} not found, ignore or see https://github.com/zhayujie/chatgpt-on-wechat/tree/master/plugins/role .") diff --git a/plugins/tool/__init__.py b/plugins/tool/__init__.py deleted file mode 100644 index 8c9d8dd..0000000 --- a/plugins/tool/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .tool import * diff --git a/skills/linkai-agent/config.json.template b/skills/linkai-agent/config.json.template index b22522d..8cfc7a9 100644 --- a/skills/linkai-agent/config.json.template +++ b/skills/linkai-agent/config.json.template @@ -1,14 +1,14 @@ { "apps": [ { - "app_code": "your_app_code_2", - "app_name": "知识库助手", - "app_description": "基于特定领域知识库提供智能问答的知识助手" + "app_code": "G7z6vKwp", + "app_name": "LinkAI客服助手", + "app_description": "当用户需要了解LinkAI平台相关问题时才选择该助手,基于LinkAI知识库进行回答" }, { - "app_code": "your_workflow_code", - "app_name": "数据分析工作流", - "app_description": "用于数据分析任务的工作流程" + "app_code": "SFY5x7JR", + "app_name": "内容创作助手", + "app_description": "当用户需要创作图片或视频时才使用该助手,支持Nano Banana、Seedream、即梦、Veo、可灵等多种模型" } ] }