Compare commits

..

12 Commits

Author SHA1 Message Date
zhayujie 7cce224499 feat: support multi-channel 2026-02-26 18:34:08 +08:00
zhayujie 925d728a86 fix: replace upsert syntax to support SQLite lower version 2026-02-26 10:44:04 +08:00
zhayujie 9917552b4b fix: improve web UI stability and conversation history restore
- Fix dark mode FOUC: apply theme in <head> before first paint, defer
  transition-colors to post-init to avoid animated flash on load
- Fix Safari IME Enter bug: defer compositionend reset via setTimeout(0)
- Fix history scroll: use requestAnimationFrame before scrollChatToBottom
- Limit restore turns to min(6, max_turns//3) on restart
- Fix load_messages cutoff to start at turn boundary, preventing orphaned
  tool_use/tool_result pairs from being sent to the LLM
- Merge all assistant messages within one user turn into a single bubble;
  render tool_calls in history using same CSS as live SSE view
- Handle empty choices list in stream chunks
2026-02-26 10:35:20 +08:00
zhayujie 29bfbecdc9 feat: persistent storage of conversation history 2026-02-25 18:01:39 +08:00
zhayujie 1a7a8c98d9 docs: add scam warning disclaimer 2026-02-25 01:34:16 +08:00
zhayujie cddb38ac3d Merge pull request #2673 from zhayujie/feat-web-console
feat: web console
2026-02-24 00:06:29 +08:00
zhayujie 394853c0fb feat: web console module display 2026-02-24 00:04:17 +08:00
zhayujie c0702c8b36 feat: web channel stream chat 2026-02-23 22:19:50 +08:00
zhayujie d610608391 feat: add cloud host config 2026-02-23 15:06:31 +08:00
zhayujie 9082eec91d feat: dark mode is used by default 2026-02-23 14:57:02 +08:00
zhayujie f1a1413b5f feat: web console upgrade 2026-02-21 17:56:31 +08:00
zhayujie c1e7f9af9b Merge pull request #2672 from zhayujie/feat-config-update
feat: cloud config update
2026-02-21 11:34:05 +08:00
16 changed files with 3021 additions and 1662 deletions
+6 -3
View File
@@ -24,8 +24,9 @@
## 声明
1. 本项目遵循 [MIT开源协议](/LICENSE),主要用于技术研究和学习,使用本项目时需遵守所在地法律法规、相关政策以及企业章程,禁止用于任何违法或侵犯他人权益的行为。任何个人、团队和企业,无论以何种方式使用该项目、对何对象提供服务,所产生的一切后果,本项目均不承担任何责任
2. 成本与安全:Agent模式下Token使用量高于普通对话模式,请根据效果及成本综合选择模型。Agent具有访问所在操作系统的能力,请谨慎选择项目部署环境。同时项目也会持续升级安全机制、并降低模型消耗成本
1. 本项目遵循 [MIT开源协议](/LICENSE),主要用于技术研究和学习,使用本项目时需遵守所在地法律法规、相关政策以及企业章程,禁止用于任何违法或侵犯他人权益的行为。任何个人、团队和企业,无论以何种方式使用该项目、对何对象提供服务,所产生的一切后果,本项目均不承担任何责任
2. 成本与安全:Agent模式下Token使用量高于普通对话模式,请根据效果及成本综合选择模型。Agent具有访问所在操作系统的能力,请谨慎选择项目部署环境。同时项目也会持续升级安全机制、并降低模型消耗成本
3. CowAgent项目专注于开源技术开发,不会参与、授权或发行任何加密货币。
## 演示
@@ -607,10 +608,12 @@ API Key创建:在 [控制台](https://aistudio.google.com/app/apikey?hl=zh-cn)
以下对可接入通道的配置方式进行说明,应用通道代码在项目的 `channel/` 目录下。
支持同时可接入多个通道,配置时可通过逗号进行分割,例如 `"channel_type": "feishu,dingtalk"`
<details>
<summary>1. Web</summary>
项目启动后默认运行Web通道,配置如下:
项目启动后默认运行Web控制台,配置如下:
```json
{
+1
View File
@@ -158,6 +158,7 @@ class ChatService:
logger.info(f"[ChatService] Agent run completed: session={session_id}")
class _StreamState:
"""Mutable state shared between the event callback and the run method."""
+12 -2
View File
@@ -1,11 +1,21 @@
"""
Memory module for AgentMesh
Provides long-term memory capabilities with hybrid search (vector + keyword)
Provides both long-term memory (vector/keyword search) and short-term
conversation history persistence (SQLite).
"""
from agent.memory.manager import MemoryManager
from agent.memory.config import MemoryConfig, get_default_memory_config, set_global_memory_config
from agent.memory.embedding import create_embedding_provider
from agent.memory.conversation_store import ConversationStore, get_conversation_store
__all__ = ['MemoryManager', 'MemoryConfig', 'get_default_memory_config', 'set_global_memory_config', 'create_embedding_provider']
__all__ = [
'MemoryManager',
'MemoryConfig',
'get_default_memory_config',
'set_global_memory_config',
'create_embedding_provider',
'ConversationStore',
'get_conversation_store',
]
+618
View File
@@ -0,0 +1,618 @@
"""
Conversation history persistence using SQLite.
Design:
- sessions table: per-session metadata (channel_type, last_active, msg_count)
- messages table: individual messages stored as JSON, append-only
- Pruning: age-based only (sessions not updated within N days are deleted)
- Thread-safe via a single in-process lock
Storage path: shared with long-term memory (see get_conversation_store),
by default ~/cow/memory/long-term/index.db
"""
from __future__ import annotations
import json
import sqlite3
import threading
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from common.log import logger
# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------
_DDL = """
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT PRIMARY KEY,
channel_type TEXT NOT NULL DEFAULT '',
created_at INTEGER NOT NULL,
last_active INTEGER NOT NULL,
msg_count INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
seq INTEGER NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
created_at INTEGER NOT NULL,
UNIQUE (session_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_messages_session
ON messages (session_id, seq);
CREATE INDEX IF NOT EXISTS idx_sessions_last_active
ON sessions (last_active);
"""
# Migration: add channel_type column to existing databases that predate it.
_MIGRATION_ADD_CHANNEL_TYPE = """
ALTER TABLE sessions ADD COLUMN channel_type TEXT NOT NULL DEFAULT '';
"""
DEFAULT_MAX_AGE_DAYS: int = 30
def _is_visible_user_message(content: Any) -> bool:
"""
Return True when a user-role message represents actual user input
(not an internal tool_result injected by the agent loop).
"""
if isinstance(content, str):
return bool(content.strip())
if isinstance(content, list):
return any(
isinstance(b, dict) and b.get("type") == "text"
for b in content
)
return False
def _extract_display_text(content: Any) -> str:
"""
Extract the human-readable text portion from a message content value.
Returns an empty string for tool_use / tool_result blocks.
"""
if isinstance(content, str):
return content.strip()
if isinstance(content, list):
parts = [
b.get("text", "")
for b in content
if isinstance(b, dict) and b.get("type") == "text"
]
return "\n".join(p for p in parts if p).strip()
return ""
def _extract_tool_calls(content: Any) -> List[Dict[str, Any]]:
"""
Extract tool_use blocks from an assistant message content.
Returns a list of {name, arguments} dicts (result filled in later).
"""
if not isinstance(content, list):
return []
return [
{"id": b.get("id", ""), "name": b.get("name", ""), "arguments": b.get("input", {})}
for b in content
if isinstance(b, dict) and b.get("type") == "tool_use"
]
def _extract_tool_results(content: Any) -> Dict[str, str]:
"""
Extract tool_result blocks from a user message, keyed by tool_use_id.
"""
if not isinstance(content, list):
return {}
results = {}
for b in content:
if not isinstance(b, dict) or b.get("type") != "tool_result":
continue
tool_id = b.get("tool_use_id", "")
result_content = b.get("content", "")
if isinstance(result_content, list):
result_content = "\n".join(
rb.get("text", "") for rb in result_content
if isinstance(rb, dict) and rb.get("type") == "text"
)
results[tool_id] = str(result_content)
return results
def _group_into_display_turns(
    rows: List[tuple],
) -> List[Dict[str, Any]]:
    """
    Convert raw (role, content_json, created_at) DB rows into display turns.

    One display turn = one visible user message + one merged assistant reply.
    All intermediate assistant messages (those carrying tool_use) and the final
    assistant text reply produced for the same user query are collapsed into a
    single assistant turn, exactly matching the live SSE rendering where tools
    and the final answer appear inside the same bubble.

    Grouping rules:
    - A visible user message starts a new group.
    - tool_result user messages are internal; their content is attached to the
      matching tool_use entry via tool_use_id and they never become own turns.
    - All assistant messages within a group are merged:
      * tool_use blocks → tool_calls list (result filled from tool_results)
      * text blocks → last non-empty text becomes the display content
    """
    # ------------------------------------------------------------------ #
    # Pass 1: split rows into groups, each starting with a visible user msg
    # ------------------------------------------------------------------ #
    # group = (user_row | None, [subsequent_rows])
    # user_row: (content, created_at)
    groups: List[tuple] = []
    cur_user: Optional[tuple] = None
    cur_rest: List[tuple] = []
    started = False
    for role, raw_content, created_at in rows:
        try:
            content = json.loads(raw_content)
        except Exception:
            # Non-JSON row: fall back to the raw stored string.
            content = raw_content
        if role == "user" and _is_visible_user_message(content):
            if started:
                groups.append((cur_user, cur_rest))
            cur_user = (content, created_at)
            cur_rest = []
            started = True
        else:
            # NOTE: rows accumulated before the first visible user message
            # (while started is False) are never appended to `groups` and
            # are therefore dropped from the display.
            cur_rest.append((role, content, created_at))
    if started:
        groups.append((cur_user, cur_rest))
    # ------------------------------------------------------------------ #
    # Pass 2: build display turns from each group
    # ------------------------------------------------------------------ #
    turns: List[Dict[str, Any]] = []
    for user_row, rest in groups:
        # User turn
        if user_row:
            content, created_at = user_row
            text = _extract_display_text(content)
            if text:
                turns.append({"role": "user", "content": text, "created_at": created_at})
        # Collect all tool_calls and tool_results from the rest of the group
        all_tool_calls: List[Dict[str, Any]] = []
        tool_results: Dict[str, str] = {}
        final_text = ""
        final_ts: Optional[int] = None
        for role, content, created_at in rest:
            if role == "user":
                # Internal tool_result message: harvest results, no own turn.
                tool_results.update(_extract_tool_results(content))
            elif role == "assistant":
                tcs = _extract_tool_calls(content)
                all_tool_calls.extend(tcs)
                t = _extract_display_text(content)
                # Last non-empty assistant text wins as the bubble content.
                if t:
                    final_text = t
                    final_ts = created_at
        # Attach tool results to their matching tool_call entries
        for tc in all_tool_calls:
            tc["result"] = tool_results.get(tc.get("id", ""), "")
        if final_text or all_tool_calls:
            turns.append({
                "role": "assistant",
                "content": final_text,
                "tool_calls": all_tool_calls,
                # Fall back to the user message timestamp when the group had
                # tool calls but no final text reply.
                "created_at": final_ts or (user_row[1] if user_row else 0),
            })
    return turns
class ConversationStore:
"""
SQLite-backed store for per-session conversation history.
Usage:
store = ConversationStore(db_path)
store.append_messages("user_123", new_messages, channel_type="feishu")
msgs = store.load_messages("user_123", max_turns=30)
"""
def __init__(self, db_path: Path):
self._db_path = db_path
self._lock = threading.Lock()
self._init_db()
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def load_messages(
self,
session_id: str,
max_turns: int = 30,
) -> List[Dict[str, Any]]:
"""
Load the most recent messages for a session, for injection into the LLM.
ALL message types (user text, assistant tool_use, tool_result) are returned
in their original JSON form so the LLM can reconstruct the full context.
max_turns is a *visible-turn* count: we count only user messages whose
content is actual user text (not tool_result blocks). This prevents
tool-heavy sessions from exhausting the turn budget prematurely.
Args:
session_id: Unique session identifier.
max_turns: Maximum number of visible user-assistant turns to keep.
Returns:
Chronologically ordered list of message dicts (role, content).
"""
with self._lock:
conn = self._connect()
try:
rows = conn.execute(
"""
SELECT seq, role, content
FROM messages
WHERE session_id = ?
ORDER BY seq DESC
""",
(session_id,),
).fetchall()
finally:
conn.close()
if not rows:
return []
# Walk newest-to-oldest counting *visible* user turns (actual user text,
# not tool_result injections). Record the seq of every visible user
# message so we can find a clean cut point later.
visible_turn_seqs: List[int] = [] # newest first
for seq, role, raw_content in rows:
if role != "user":
continue
try:
content = json.loads(raw_content)
except Exception:
content = raw_content
if _is_visible_user_message(content):
visible_turn_seqs.append(seq)
# Determine the seq of the oldest visible user message we want to keep.
# If the total turns fit within max_turns, keep everything.
if len(visible_turn_seqs) <= max_turns:
cutoff_seq = None # keep all
else:
# The Nth visible user message (0-indexed) is the oldest we keep.
cutoff_seq = visible_turn_seqs[max_turns - 1]
# Build result in chronological order, starting from cutoff.
# IMPORTANT: we start exactly at cutoff_seq (the visible user message),
# never mid-group, so tool_use / tool_result pairs are always complete.
result = []
for seq, role, raw_content in reversed(rows):
if cutoff_seq is not None and seq < cutoff_seq:
continue
try:
content = json.loads(raw_content)
except Exception:
content = raw_content
result.append({"role": role, "content": content})
return result
def append_messages(
self,
session_id: str,
messages: List[Dict[str, Any]],
channel_type: str = "",
) -> None:
"""
Append new messages to a session's history.
Seq numbers continue from the session's current maximum, so
concurrent callers on distinct sessions never collide.
Args:
session_id: Unique session identifier.
messages: List of message dicts to append.
channel_type: Source channel (e.g. "feishu", "web", "wechat").
Only written on session creation; ignored on update.
"""
if not messages:
return
now = int(time.time())
with self._lock:
conn = self._connect()
try:
with conn:
# INSERT OR IGNORE creates the row on first visit;
# the UPDATE always refreshes last_active.
# Avoids ON CONFLICT...DO UPDATE (requires SQLite >= 3.24).
conn.execute(
"""
INSERT OR IGNORE INTO sessions
(session_id, channel_type, created_at, last_active, msg_count)
VALUES (?, ?, ?, ?, 0)
""",
(session_id, channel_type, now, now),
)
conn.execute(
"UPDATE sessions SET last_active = ? WHERE session_id = ?",
(now, session_id),
)
# Determine starting seq for the new batch.
row = conn.execute(
"SELECT COALESCE(MAX(seq), -1) FROM messages WHERE session_id = ?",
(session_id,),
).fetchone()
next_seq = row[0] + 1
for msg in messages:
role = msg.get("role", "")
content = json.dumps(
msg.get("content", ""), ensure_ascii=False
)
conn.execute(
"""
INSERT OR IGNORE INTO messages
(session_id, seq, role, content, created_at)
VALUES (?, ?, ?, ?, ?)
""",
(session_id, next_seq, role, content, now),
)
next_seq += 1
conn.execute(
"""
UPDATE sessions
SET msg_count = (
SELECT COUNT(*) FROM messages WHERE session_id = ?
)
WHERE session_id = ?
""",
(session_id, session_id),
)
finally:
conn.close()
def clear_session(self, session_id: str) -> None:
"""Delete all messages and the session record for a given session_id."""
with self._lock:
conn = self._connect()
try:
with conn:
conn.execute(
"DELETE FROM messages WHERE session_id = ?", (session_id,)
)
conn.execute(
"DELETE FROM sessions WHERE session_id = ?", (session_id,)
)
finally:
conn.close()
def cleanup_old_sessions(self, max_age_days: Optional[int] = None) -> int:
"""
Delete sessions that have not been active within max_age_days.
Args:
max_age_days: Override the default retention period.
Returns:
Number of sessions deleted.
"""
try:
from config import conf
max_age = max_age_days or conf().get(
"conversation_max_age_days", DEFAULT_MAX_AGE_DAYS
)
except Exception:
max_age = max_age_days or DEFAULT_MAX_AGE_DAYS
cutoff = int(time.time()) - max_age * 86400
deleted = 0
with self._lock:
conn = self._connect()
try:
with conn:
stale = conn.execute(
"SELECT session_id FROM sessions WHERE last_active < ?",
(cutoff,),
).fetchall()
for (sid,) in stale:
conn.execute(
"DELETE FROM messages WHERE session_id = ?", (sid,)
)
conn.execute(
"DELETE FROM sessions WHERE session_id = ?", (sid,)
)
deleted += 1
finally:
conn.close()
if deleted:
logger.info(f"[ConversationStore] Pruned {deleted} expired sessions")
return deleted
def load_history_page(
self,
session_id: str,
page: int = 1,
page_size: int = 20,
) -> Dict[str, Any]:
"""
Load a page of conversation history for UI display, grouped into turns.
Each "turn" maps to one of:
- A user message (role="user", content=str)
- An assistant message (role="assistant", content=str,
tool_calls=[{name, arguments, result}] when tools were used)
Internal tool_result user messages are merged into the preceding
assistant entry's tool_calls list and never appear as standalone items.
Pages are numbered from 1 (most recent). Messages within a page are
returned in chronological order.
Returns:
{
"messages": [
{
"role": "user" | "assistant",
"content": str,
"tool_calls": [...], # assistant only, may be []
"created_at": int,
},
...
],
"total": <visible turn count>,
"page": <current page>,
"page_size": <page_size>,
"has_more": bool,
}
"""
page = max(1, page)
with self._lock:
conn = self._connect()
try:
rows = conn.execute(
"""
SELECT role, content, created_at
FROM messages
WHERE session_id = ?
ORDER BY seq ASC
""",
(session_id,),
).fetchall()
finally:
conn.close()
visible = _group_into_display_turns(rows)
total = len(visible)
offset = (page - 1) * page_size
page_items = list(reversed(visible))[offset: offset + page_size]
page_items = list(reversed(page_items))
return {
"messages": page_items,
"total": total,
"page": page,
"page_size": page_size,
"has_more": offset + page_size < total,
}
def get_stats(self) -> Dict[str, Any]:
"""Return basic stats keyed by channel_type, for monitoring."""
with self._lock:
conn = self._connect()
try:
total_sessions = conn.execute(
"SELECT COUNT(*) FROM sessions"
).fetchone()[0]
total_messages = conn.execute(
"SELECT COUNT(*) FROM messages"
).fetchone()[0]
by_channel = conn.execute(
"""
SELECT channel_type, COUNT(*) as cnt
FROM sessions
GROUP BY channel_type
ORDER BY cnt DESC
"""
).fetchall()
return {
"total_sessions": total_sessions,
"total_messages": total_messages,
"by_channel": {row[0] or "unknown": row[1] for row in by_channel},
}
finally:
conn.close()
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _init_db(self) -> None:
self._db_path.parent.mkdir(parents=True, exist_ok=True)
conn = self._connect()
try:
conn.executescript(_DDL)
conn.commit()
self._migrate(conn)
finally:
conn.close()
def _migrate(self, conn: sqlite3.Connection) -> None:
"""Apply incremental schema migrations on existing databases."""
cols = {
row[1]
for row in conn.execute("PRAGMA table_info(sessions)").fetchall()
}
if "channel_type" not in cols:
try:
conn.execute(_MIGRATION_ADD_CHANNEL_TYPE)
conn.commit()
logger.info("[ConversationStore] Migrated: added channel_type column")
except Exception as e:
logger.warning(f"[ConversationStore] Migration failed: {e}")
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(str(self._db_path), timeout=10)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
return conn
# ---------------------------------------------------------------------------
# Singleton
# ---------------------------------------------------------------------------

_store_instance: Optional[ConversationStore] = None
_store_lock = threading.Lock()


def get_conversation_store() -> ConversationStore:
    """
    Return the process-wide ConversationStore singleton.

    Reuses the long-term memory database so the project stays with a single
    SQLite file: ~/cow/memory/long-term/index.db

    The conversation tables (sessions / messages) are separate from the
    memory tables (memory_chunks / file_metadata) — no conflicts.
    """
    global _store_instance

    if _store_instance is None:
        with _store_lock:
            # Double-checked locking: re-test under the lock in case another
            # thread finished initialisation while we waited.
            if _store_instance is None:
                try:
                    from agent.memory.config import get_default_memory_config
                    db_path = get_default_memory_config().get_db_path()
                except Exception:
                    # Memory config unavailable: fall back to the default path.
                    from common.utils import expand_path
                    db_path = Path(expand_path("~/cow")) / "memory" / "long-term" / "index.db"
                _store_instance = ConversationStore(db_path)
                logger.debug(f"[ConversationStore] Using shared DB at: {db_path}")
    return _store_instance
+2 -2
View File
@@ -501,7 +501,7 @@ class AgentStreamExecutor:
# Prepare messages
messages = self._prepare_messages()
logger.debug(f"Sending {len(messages)} messages to LLM")
logger.info(f"Sending {len(messages)} messages to LLM")
# Prepare tool definitions (OpenAI/Claude format)
tools_schema = None
@@ -574,7 +574,7 @@ class AgentStreamExecutor:
raise Exception(f"{error_msg} (Status: {status_code}, Code: {error_code}, Type: {error_type})")
# Parse chunk
if isinstance(chunk, dict) and "choices" in chunk:
if isinstance(chunk, dict) and chunk.get("choices"):
choice = chunk["choices"][0]
delta = choice.get("delta", {})
+102 -57
View File
@@ -13,7 +13,6 @@ from plugins import *
import threading
# Global channel manager for restart support
_channel_mgr = None
@@ -21,92 +20,130 @@ def get_channel_manager():
return _channel_mgr
def _parse_channel_type(raw) -> list:
"""
Parse channel_type config value into a list of channel names.
Supports:
- single string: "feishu"
- comma-separated string: "feishu, dingtalk"
- list: ["feishu", "dingtalk"]
"""
if isinstance(raw, list):
return [ch.strip() for ch in raw if ch.strip()]
if isinstance(raw, str):
return [ch.strip() for ch in raw.split(",") if ch.strip()]
return []
class ChannelManager:
"""
Manage the lifecycle of a channel, supporting restart from sub-threads.
The channel.startup() runs in a daemon thread so that the main thread
remains available and a new channel can be started at any time.
Manage the lifecycle of multiple channels running concurrently.
Each channel.startup() runs in its own daemon thread.
The web channel is started as default console unless explicitly disabled.
"""
def __init__(self):
self._channel = None
self._channel_thread = None
self._channels = {} # channel_name -> channel instance
self._threads = {} # channel_name -> thread
self._primary_channel = None
self._lock = threading.Lock()
@property
def channel(self):
return self._channel
"""Return the primary (first non-web) channel for backward compatibility."""
return self._primary_channel
def start(self, channel_name: str, first_start: bool = False):
def get_channel(self, channel_name: str):
return self._channels.get(channel_name)
def start(self, channel_names: list, first_start: bool = False):
"""
Create and start a channel in a sub-thread.
Create and start one or more channels in sub-threads.
If first_start is True, plugins and linkai client will also be initialized.
"""
with self._lock:
channel = channel_factory.create_channel(channel_name)
self._channel = channel
channels = []
for name in channel_names:
ch = channel_factory.create_channel(name)
self._channels[name] = ch
channels.append((name, ch))
if self._primary_channel is None and name != "web":
self._primary_channel = ch
if self._primary_channel is None and channels:
self._primary_channel = channels[0][1]
if first_start:
if channel_name in ["wx", "wxy", "terminal", "wechatmp", "web",
"wechatmp_service", "wechatcom_app", "wework",
const.FEISHU, const.DINGTALK]:
PluginManager().load_plugins()
PluginManager().load_plugins()
if conf().get("use_linkai"):
try:
from common import cloud_client
threading.Thread(target=cloud_client.start, args=(channel, self), daemon=True).start()
except Exception as e:
threading.Thread(
target=cloud_client.start,
args=(self._primary_channel, self),
daemon=True,
).start()
except Exception:
pass
# Run channel.startup() in a daemon thread so we can restart later
self._channel_thread = threading.Thread(
target=self._run_channel, args=(channel,), daemon=True
)
self._channel_thread.start()
logger.debug(f"[ChannelManager] Channel '{channel_name}' started in sub-thread")
# Start web console first so its logs print cleanly,
# then start remaining channels after a brief pause.
web_entry = None
other_entries = []
for entry in channels:
if entry[0] == "web":
web_entry = entry
else:
other_entries.append(entry)
def _run_channel(self, channel):
ordered = ([web_entry] if web_entry else []) + other_entries
for i, (name, ch) in enumerate(ordered):
if i > 0 and name != "web":
time.sleep(0.1)
t = threading.Thread(target=self._run_channel, args=(name, ch), daemon=True)
self._threads[name] = t
t.start()
logger.debug(f"[ChannelManager] Channel '{name}' started in sub-thread")
def _run_channel(self, name: str, channel):
try:
channel.startup()
except Exception as e:
logger.error(f"[ChannelManager] Channel startup error: {e}")
logger.error(f"[ChannelManager] Channel '{name}' startup error: {e}")
logger.exception(e)
def stop(self):
def stop(self, channel_name: str = None):
"""
Stop the current channel. Since most channel startup() methods block
on an HTTP server or stream client, we stop by terminating the thread.
Stop channel(s). If channel_name is given, stop only that channel;
otherwise stop all channels.
"""
with self._lock:
if self._channel is None:
return
channel_type = getattr(self._channel, 'channel_type', 'unknown')
logger.info(f"[ChannelManager] Stopping channel '{channel_type}'...")
# Try graceful stop if channel implements it
try:
if hasattr(self._channel, 'stop'):
self._channel.stop()
except Exception as e:
logger.warning(f"[ChannelManager] Error during channel stop: {e}")
self._channel = None
self._channel_thread = None
names = [channel_name] if channel_name else list(self._channels.keys())
for name in names:
ch = self._channels.pop(name, None)
self._threads.pop(name, None)
if ch is None:
continue
logger.info(f"[ChannelManager] Stopping channel '{name}'...")
try:
if hasattr(ch, 'stop'):
ch.stop()
except Exception as e:
logger.warning(f"[ChannelManager] Error during channel '{name}' stop: {e}")
if channel_name and self._primary_channel is self._channels.get(channel_name):
self._primary_channel = None
def restart(self, new_channel_name: str):
"""
Restart the channel with a new channel type.
Restart a single channel with a new channel type.
Can be called from any thread (e.g. linkai config callback).
"""
logger.info(f"[ChannelManager] Restarting channel to '{new_channel_name}'...")
self.stop()
# Clear singleton cache so a fresh channel instance is created
self.stop(new_channel_name)
_clear_singleton_cache(new_channel_name)
time.sleep(1) # Brief pause to allow resources to release
self.start(new_channel_name, first_start=False)
time.sleep(1)
self.start([new_channel_name], first_start=False)
logger.info(f"[ChannelManager] Channel restarted to '{new_channel_name}' successfully")
@@ -130,14 +167,11 @@ def _clear_singleton_cache(channel_name: str):
module_path = cls_map.get(channel_name)
if not module_path:
return
# The singleton decorator stores instances in a closure dict keyed by class.
# We need to find the actual class and clear it from the closure.
try:
parts = module_path.rsplit(".", 1)
module_name, class_name = parts[0], parts[1]
import importlib
module = importlib.import_module(module_name)
# The module-level name is the wrapper function from @singleton
wrapper = getattr(module, class_name, None)
if wrapper and hasattr(wrapper, '__closure__') and wrapper.__closure__:
for cell in wrapper.__closure__:
@@ -176,17 +210,28 @@ def run():
# kill signal
sigterm_handler_wrap(signal.SIGTERM)
# create channel
channel_name = conf().get("channel_type", "wx")
# Parse channel_type into a list
raw_channel = conf().get("channel_type", "wx")
if "--cmd" in sys.argv:
channel_name = "terminal"
channel_names = ["terminal"]
else:
channel_names = _parse_channel_type(raw_channel)
if not channel_names:
channel_names = ["wx"]
if channel_name == "wxy":
if "wxy" in channel_names:
os.environ["WECHATY_LOG"] = "warn"
# Auto-start web console unless explicitly disabled
web_console_enabled = conf().get("web_console", True)
if web_console_enabled and "web" not in channel_names:
channel_names.append("web")
logger.info(f"[App] Starting channels: {channel_names}")
_channel_mgr = ChannelManager()
_channel_mgr.start(channel_name, first_start=True)
_channel_mgr.start(channel_names, first_start=True)
while True:
time.sleep(1)
+38 -1
View File
@@ -325,6 +325,10 @@ class AgentBridge:
logger.warning(f"[AgentBridge] Failed to attach context to scheduler: {e}")
break
# Record message count before execution so we can diff new messages
with agent.messages_lock:
pre_run_len = len(agent.messages)
try:
# Use agent's run_stream method with event handler
response = agent.run_stream(
@@ -336,9 +340,16 @@ class AgentBridge:
# Restore original tools
if context and context.get("is_scheduled_task"):
agent.tools = original_tools
# Log execution summary
event_handler.log_summary()
# Persist new messages generated during this run
if session_id:
channel_type = (context.get("channel_type") or "") if context else ""
with agent.messages_lock:
new_messages = agent.messages[pre_run_len:]
self._persist_messages(session_id, list(new_messages), channel_type)
# Check if there are files to send (from read tool)
if hasattr(agent, 'stream_executor') and hasattr(agent.stream_executor, 'files_to_send'):
@@ -475,6 +486,32 @@ class AgentBridge:
except Exception as e:
logger.warning(f"[AgentBridge] Failed to migrate API keys: {e}")
def _persist_messages(
self, session_id: str, new_messages: list, channel_type: str = ""
) -> None:
"""
Persist new messages to the conversation store after each agent run.
Failures are logged but never propagate — they must not interrupt replies.
"""
if not new_messages:
return
try:
from config import conf
if not conf().get("conversation_persistence", True):
return
except Exception:
pass
try:
from agent.memory import get_conversation_store
get_conversation_store().append_messages(
session_id, new_messages, channel_type=channel_type
)
except Exception as e:
logger.warning(
f"[AgentBridge] Failed to persist messages for session={session_id}: {e}"
)
def clear_session(self, session_id: str):
"""
Clear a specific session's agent and conversation history
+5 -5
View File
@@ -94,15 +94,15 @@ class AgentEventHandler:
def _send_to_channel(self, message):
"""
Try to send message to channel
Args:
message: Message to send
Try to send intermediate message to channel.
Skipped in SSE mode because thinking text is already streamed via on_event.
"""
if self.context and self.context.get("on_event"):
return
if self.channel:
try:
from bridge.reply import Reply, ReplyType
# Create a Reply object for the message
reply = Reply(ReplyType.TEXT, message)
self.channel._send(reply, self.context)
except Exception as e:
+40 -1
View File
@@ -118,8 +118,47 @@ class AgentInitializer:
# Attach memory manager
if memory_manager:
agent.memory_manager = memory_manager
# Restore persisted conversation history for this session
if session_id:
self._restore_conversation_history(agent, session_id)
return agent
    def _restore_conversation_history(self, agent, session_id: str) -> None:
        """
        Load persisted conversation messages from SQLite and inject them
        into the agent's in-memory message list.

        Only runs when conversation persistence is enabled (default: True).
        Respects agent_max_context_turns to limit how many turns are loaded.
        """
        from config import conf
        if not conf().get("conversation_persistence", True):
            return
        try:
            from agent.memory import get_conversation_store
            store = get_conversation_store()
            # On restore, load at most min(6, max(1, max_turns // 3)) turns so
            # that a long-running session does not immediately fill the context
            # window after a restart. The full max_turns budget is reserved for
            # the live conversation that follows.
            max_turns = conf().get("agent_max_context_turns", 30)
            restore_turns = min(6, max(1, max_turns // 3))
            saved = store.load_messages(session_id, max_turns=restore_turns)
            if saved:
                # Replace (not extend) the in-memory list under the lock.
                with agent.messages_lock:
                    agent.messages = saved
                logger.debug(
                    f"[AgentInitializer] Restored {len(saved)} messages "
                    f"({restore_turns} turns cap) for session={session_id}"
                )
        except Exception as e:
            # Best effort: a failed restore must not prevent agent creation.
            logger.warning(
                f"[AgentInitializer] Failed to restore conversation history for "
                f"session={session_id}: {e}"
            )
def _load_env_file(self):
"""Load environment variables from .env file"""
+4 -1
View File
@@ -57,11 +57,14 @@ class Channel(object):
if context and "channel_type" not in context:
context["channel_type"] = self.channel_type
# Read on_event callback injected by the channel (e.g. web SSE)
on_event = context.get("on_event") if context else None
# Use agent bridge to handle the query
return Bridge().fetch_agent_reply(
query=query,
context=context,
on_event=None,
on_event=on_event,
clear_history=False
)
except Exception as e:
+8 -3
View File
@@ -24,11 +24,16 @@ handler_pool = ThreadPoolExecutor(max_workers=8) # 处理消息的线程池
class ChatChannel(Channel):
name = None # 登录的用户名
user_id = None # 登录的用户id
futures = {} # 记录每个session_id提交到线程池的future对象, 用于重置会话时把没执行的future取消掉,正在执行的不会被取消
sessions = {} # 用于控制并发,每个session_id同时只能有一个context在处理
lock = threading.Lock() # 用于控制对sessions的访问
def __init__(self):
# Instance-level attributes so each channel subclass has its own
# independent session queue and lock. Previously these were class-level,
# which caused contexts from one channel (e.g. Feishu) to be consumed
# by another channel's consume() thread (e.g. Web), leading to errors
# like "No request_id found in context".
self.futures = {}
self.sessions = {}
self.lock = threading.Lock()
_thread = threading.Thread(target=self.consume)
_thread.setDaemon(True)
_thread.start()
+498 -1510
View File
File diff suppressed because it is too large Load Diff
+239
View File
@@ -0,0 +1,239 @@
/* =====================================================================
CowAgent Console Styles
===================================================================== */
/* Animations */
/* pulseDot drives the typing-indicator dots (sequential scale + fade). */
@keyframes pulseDot {
0%, 80%, 100% { transform: scale(0.6); opacity: 0.4; }
40% { transform: scale(1); opacity: 1; }
}
/* Scrollbar */
/* scrollbar-width/-color cover Firefox; ::-webkit-* rules cover Chromium/Safari. */
* { scrollbar-width: thin; scrollbar-color: #94a3b8 transparent; }
::-webkit-scrollbar { width: 6px; height: 6px; }
::-webkit-scrollbar-track { background: transparent; }
::-webkit-scrollbar-thumb { background: #94a3b8; border-radius: 3px; }
::-webkit-scrollbar-thumb:hover { background: #64748b; }
.dark ::-webkit-scrollbar-thumb { background: #475569; }
.dark ::-webkit-scrollbar-thumb:hover { background: #64748b; }
/* Sidebar */
.sidebar-item.active {
background: rgba(255, 255, 255, 0.08);
color: #FFFFFF;
}
.sidebar-item.active .item-icon { color: #4ABE6E; }
/* Menu Groups */
/* Collapse animates via max-height; the 500px open cap assumes short item lists. */
.menu-group-items { max-height: 0; overflow: hidden; transition: max-height 0.25s ease-out; }
.menu-group.open .menu-group-items { max-height: 500px; transition: max-height 0.35s ease-in; }
.menu-group .chevron { transition: transform 0.25s ease; }
.menu-group.open .chevron { transform: rotate(90deg); }
/* View Switching */
.view { display: none; height: 100%; }
.view.active { display: flex; flex-direction: column; }
/* Markdown Content */
.msg-content p { margin: 0.5em 0; line-height: 1.7; }
.msg-content p:first-child { margin-top: 0; }
.msg-content p:last-child { margin-bottom: 0; }
.msg-content h1, .msg-content h2, .msg-content h3,
.msg-content h4, .msg-content h5, .msg-content h6 {
margin-top: 1.2em; margin-bottom: 0.6em; font-weight: 600; line-height: 1.3;
}
.msg-content h1 { font-size: 1.4em; }
.msg-content h2 { font-size: 1.25em; }
.msg-content h3 { font-size: 1.1em; }
.msg-content ul, .msg-content ol { margin: 0.5em 0; padding-left: 1.8em; }
.msg-content li { margin: 0.25em 0; }
.msg-content pre {
border-radius: 8px; overflow-x: auto; margin: 0.8em 0;
background: #f1f5f9; padding: 1em;
}
.dark .msg-content pre { background: #111111; }
.msg-content code {
font-family: 'JetBrains Mono', 'Fira Code', Consolas, monospace;
font-size: 0.875em;
}
/* Inline code only: code inside <pre> keeps the block styling below. */
.msg-content :not(pre) > code {
background: rgba(74, 190, 110, 0.1); color: #1C6B3B;
padding: 2px 6px; border-radius: 4px;
}
.dark .msg-content :not(pre) > code {
background: rgba(74, 190, 110, 0.15); color: #74E9A4;
}
.msg-content pre code { background: transparent; padding: 0; color: inherit; }
.msg-content blockquote {
border-left: 3px solid #4ABE6E; padding: 0.5em 1em;
margin: 0.8em 0; background: rgba(74, 190, 110, 0.05); border-radius: 0 6px 6px 0;
}
.dark .msg-content blockquote { background: rgba(74, 190, 110, 0.08); }
.msg-content table { border-collapse: collapse; width: 100%; margin: 0.8em 0; }
.msg-content th, .msg-content td {
border: 1px solid #e2e8f0; padding: 8px 12px; text-align: left;
}
.dark .msg-content th, .dark .msg-content td { border-color: rgba(255,255,255,0.1); }
.msg-content th { background: #f1f5f9; font-weight: 600; }
.dark .msg-content th { background: #111111; }
.msg-content img { max-width: 100%; height: auto; border-radius: 8px; margin: 0.5em 0; }
.msg-content a { color: #35A85B; text-decoration: underline; }
.msg-content a:hover { color: #228547; }
.msg-content hr { border: none; height: 1px; background: #e2e8f0; margin: 1.2em 0; }
.dark .msg-content hr { background: rgba(255,255,255,0.1); }
/* SSE Streaming cursor */
/* Block cursor appended while tokens stream in; class is removed when done. */
@keyframes blink { 0%, 100% { opacity: 1; } 50% { opacity: 0; } }
.sse-streaming::after {
content: '▋';
display: inline-block;
margin-left: 2px;
color: #4ABE6E;
animation: blink 0.9s step-end infinite;
font-size: 0.85em;
vertical-align: middle;
}
/* Agent steps (thinking summaries + tool indicators) */
.agent-steps:empty { display: none; }
.agent-steps:not(:empty) {
margin-bottom: 0.625rem;
padding-bottom: 0.5rem;
border-bottom: 1px dashed rgba(0, 0, 0, 0.08);
}
.dark .agent-steps:not(:empty) { border-bottom-color: rgba(255, 255, 255, 0.08); }
.agent-step {
font-size: 0.75rem;
line-height: 1.4;
color: #94a3b8;
margin-bottom: 0.25rem;
}
.agent-step:last-child { margin-bottom: 0; }
/* Thinking step - collapsible */
.agent-thinking-step .thinking-header {
display: flex;
align-items: center;
gap: 0.375rem;
cursor: pointer;
user-select: none;
}
.agent-thinking-step .thinking-header.no-toggle { cursor: default; }
.agent-thinking-step .thinking-header:not(.no-toggle):hover { color: #64748b; }
.dark .agent-thinking-step .thinking-header:not(.no-toggle):hover { color: #cbd5e1; }
.agent-thinking-step .thinking-header i:first-child { font-size: 0.625rem; margin-top: 1px; }
.agent-thinking-step .thinking-chevron {
font-size: 0.5rem;
margin-left: auto;
transition: transform 0.2s ease;
opacity: 0.5;
}
.agent-thinking-step.expanded .thinking-chevron { transform: rotate(90deg); }
.agent-thinking-step .thinking-full {
display: none;
margin-top: 0.375rem;
margin-left: 1rem;
padding: 0.5rem;
background: rgba(0, 0, 0, 0.02);
border-radius: 6px;
border: 1px solid rgba(0, 0, 0, 0.04);
font-size: 0.75rem;
line-height: 1.5;
color: #94a3b8;
max-height: 200px;
overflow-y: auto;
}
.dark .agent-thinking-step .thinking-full {
background: rgba(255, 255, 255, 0.02);
border-color: rgba(255, 255, 255, 0.04);
}
.agent-thinking-step.expanded .thinking-full { display: block; }
.agent-thinking-step .thinking-full p { margin: 0.25em 0; }
.agent-thinking-step .thinking-full p:first-child { margin-top: 0; }
.agent-thinking-step .thinking-full p:last-child { margin-bottom: 0; }
/* Tool step - collapsible */
.agent-tool-step .tool-header {
display: flex;
align-items: center;
gap: 0.375rem;
cursor: pointer;
user-select: none;
padding: 1px 0;
border-radius: 4px;
}
.agent-tool-step .tool-header:hover { color: #64748b; }
.dark .agent-tool-step .tool-header:hover { color: #cbd5e1; }
.agent-tool-step .tool-icon { font-size: 0.625rem; }
.agent-tool-step .tool-chevron {
font-size: 0.5rem;
margin-left: auto;
transition: transform 0.2s ease;
opacity: 0.5;
}
.agent-tool-step.expanded .tool-chevron { transform: rotate(90deg); }
.agent-tool-step .tool-time {
font-size: 0.65rem;
opacity: 0.6;
margin-left: 0.25rem;
}
/* Tool detail panel */
.agent-tool-step .tool-detail {
display: none;
margin-top: 0.375rem;
margin-left: 1rem;
padding: 0.5rem;
background: rgba(0, 0, 0, 0.02);
border-radius: 6px;
border: 1px solid rgba(0, 0, 0, 0.04);
}
.dark .agent-tool-step .tool-detail {
background: rgba(255, 255, 255, 0.02);
border-color: rgba(255, 255, 255, 0.04);
}
.agent-tool-step.expanded .tool-detail { display: block; }
.tool-detail-section { margin-bottom: 0.375rem; }
.tool-detail-section:last-child { margin-bottom: 0; }
.tool-detail-label {
font-size: 0.625rem;
font-weight: 600;
text-transform: uppercase;
letter-spacing: 0.05em;
opacity: 0.6;
margin-bottom: 0.125rem;
}
.tool-detail-content {
font-family: 'JetBrains Mono', 'Fira Code', Consolas, monospace;
font-size: 0.7rem;
line-height: 1.5;
white-space: pre-wrap;
word-break: break-all;
max-height: 200px;
overflow-y: auto;
margin: 0;
padding: 0.25rem 0;
background: transparent;
color: inherit;
}
.tool-error-text { color: #f87171; }
/* Tool failed state */
.agent-tool-step.tool-failed .tool-name { color: #f87171; }
/* Chat Input */
/* Fixed 42px baseline; JS presumably grows the textarea up to 180px — confirm in app.js. */
#chat-input {
resize: none; height: 42px; max-height: 180px;
overflow-y: hidden;
transition: border-color 0.2s ease;
}
/* Placeholder Cards */
.placeholder-card {
transition: transform 0.2s ease, box-shadow 0.2s ease;
}
.placeholder-card:hover {
transform: translateY(-2px);
box-shadow: 0 8px 25px -5px rgba(0, 0, 0, 0.1);
}
File diff suppressed because it is too large Load Diff
+327 -76
View File
@@ -1,10 +1,15 @@
import sys
import time
import web
import json
import logging
import mimetypes
import os
import threading
import time
import uuid
import io
from queue import Queue, Empty
import web
from bridge.context import *
from bridge.reply import Reply, ReplyType
from channel.chat_channel import ChatChannel, check_prefix
@@ -12,20 +17,17 @@ from channel.chat_message import ChatMessage
from common.log import logger
from common.singleton import singleton
from config import conf
import os
import mimetypes # 添加这行来处理MIME类型
import threading
import logging
class WebMessage(ChatMessage):
def __init__(
self,
msg_id,
content,
ctype=ContextType.TEXT,
from_user_id="User",
to_user_id="Chatgpt",
other_user_id="Chatgpt",
self,
msg_id,
content,
ctype=ContextType.TEXT,
from_user_id="User",
to_user_id="Chatgpt",
other_user_id="Chatgpt",
):
self.msg_id = msg_id
self.ctype = ctype
@@ -39,7 +41,7 @@ class WebMessage(ChatMessage):
class WebChannel(ChatChannel):
NOT_SUPPORT_REPLYTYPE = [ReplyType.VOICE]
_instance = None
# def __new__(cls):
# if cls._instance is None:
# cls._instance = super(WebChannel, cls).__new__(cls)
@@ -47,12 +49,12 @@ class WebChannel(ChatChannel):
def __init__(self):
super().__init__()
self.msg_id_counter = 0 # 添加消息ID计数器
self.session_queues = {} # 存储session_id到队列的映射
self.request_to_session = {} # 存储request_idsession_id的映射
self.msg_id_counter = 0
self.session_queues = {} # session_id -> Queue (fallback polling)
self.request_to_session = {} # request_id -> session_id
self.sse_queues = {} # request_id -> Queue (SSE streaming)
self._http_server = None
def _generate_msg_id(self):
"""生成唯一的消息ID"""
self.msg_id_counter += 1
@@ -71,22 +73,30 @@ class WebChannel(ChatChannel):
if reply.type == ReplyType.IMAGE_URL:
time.sleep(0.5)
# 获取请求ID和会话ID
request_id = context.get("request_id", None)
if not request_id:
logger.error("No request_id found in context, cannot send message")
return
# 通过request_id获取session_id
session_id = self.request_to_session.get(request_id)
if not session_id:
logger.error(f"No session_id found for request {request_id}")
return
# 检查是否有会话队列
# SSE mode: push done event to SSE queue
if request_id in self.sse_queues:
content = reply.content if reply.content is not None else ""
self.sse_queues[request_id].put({
"type": "done",
"content": content,
"request_id": request_id,
"timestamp": time.time()
})
logger.debug(f"SSE done sent for request {request_id}")
return
# Fallback: polling mode
if session_id in self.session_queues:
# 创建响应数据,包含请求ID以区分不同请求的响应
response_data = {
"type": str(reply.type),
"content": reply.content,
@@ -94,69 +104,134 @@ class WebChannel(ChatChannel):
"request_id": request_id
}
self.session_queues[session_id].put(response_data)
logger.debug(f"Response sent to queue for session {session_id}, request {request_id}")
logger.debug(f"Response sent to poll queue for session {session_id}, request {request_id}")
else:
logger.warning(f"No response queue found for session {session_id}, response dropped")
except Exception as e:
logger.error(f"Error in send method: {e}")
def _make_sse_callback(self, request_id: str):
    """Build an on_event callback that pushes agent stream events into the SSE queue.

    :param request_id: id of the in-flight request; events are silently dropped
        once its SSE queue has been removed (stream finished or timed out).
    :return: callable(event: dict) suitable for the agent's ``on_event`` hook.
    """
    def on_event(event: dict):
        # The queue may already be gone if the SSE stream has ended.
        if request_id not in self.sse_queues:
            return
        q = self.sse_queues[request_id]
        event_type = event.get("type")
        data = event.get("data", {})
        if event_type == "message_update":
            delta = data.get("delta", "")
            if delta:
                q.put({"type": "delta", "content": delta})
        elif event_type == "tool_execution_start":
            tool_name = data.get("tool_name", "tool")
            arguments = data.get("arguments", {})
            q.put({"type": "tool_start", "tool": tool_name, "arguments": arguments})
        elif event_type == "tool_execution_end":
            tool_name = data.get("tool_name", "tool")
            status = data.get("status", "success")
            result = data.get("result", "")
            exec_time = data.get("execution_time", 0)
            # Truncate long results to avoid huge SSE payloads; append an
            # explicit marker so the client can tell the result was cut
            # (previously an empty string was appended, hiding truncation).
            result_str = str(result)
            if len(result_str) > 2000:
                result_str = result_str[:2000] + "..."
            q.put({
                "type": "tool_end",
                "tool": tool_name,
                "status": status,
                "result": result_str,
                "execution_time": round(exec_time, 2)
            })
    return on_event
def post_message(self):
"""
Handle incoming messages from users via POST request.
Returns a request_id for tracking this specific request.
"""
try:
data = web.data() # 获取原始POST数据
data = web.data()
json_data = json.loads(data)
session_id = json_data.get('session_id', f'session_{int(time.time())}')
prompt = json_data.get('message', '')
# 生成请求ID
use_sse = json_data.get('stream', True)
request_id = self._generate_request_id()
# 将请求ID与会话ID关联
self.request_to_session[request_id] = session_id
# 确保会话队列存在
if session_id not in self.session_queues:
self.session_queues[session_id] = Queue()
# Web channel 不需要前缀,确保消息能通过前缀检查
if use_sse:
self.sse_queues[request_id] = Queue()
trigger_prefixs = conf().get("single_chat_prefix", [""])
if check_prefix(prompt, trigger_prefixs) is None:
# 如果没有匹配到前缀,给消息加上第一个前缀
if trigger_prefixs:
prompt = trigger_prefixs[0] + prompt
logger.debug(f"[WebChannel] Added prefix to message: {prompt}")
# 创建消息对象
msg = WebMessage(self._generate_msg_id(), prompt)
msg.from_user_id = session_id # 使用会话ID作为用户ID
# 创建上下文,明确指定 isgroup=False
msg.from_user_id = session_id
context = self._compose_context(ContextType.TEXT, prompt, msg=msg, isgroup=False)
# 检查 context 是否为 None(可能被插件过滤等)
if context is None:
logger.warning(f"[WebChannel] Context is None for session {session_id}, message may be filtered")
if request_id in self.sse_queues:
del self.sse_queues[request_id]
return json.dumps({"status": "error", "message": "Message was filtered"})
# 覆盖必要的字段(_compose_context 会设置默认值,但我们需要使用实际的 session_id)
context["session_id"] = session_id
context["receiver"] = session_id
context["request_id"] = request_id
# 异步处理消息 - 只传递上下文
if use_sse:
context["on_event"] = self._make_sse_callback(request_id)
threading.Thread(target=self.produce, args=(context,)).start()
# 返回请求ID
return json.dumps({"status": "success", "request_id": request_id})
return json.dumps({"status": "success", "request_id": request_id, "stream": use_sse})
except Exception as e:
logger.error(f"Error processing message: {e}")
return json.dumps({"status": "error", "message": str(e)})
def stream_response(self, request_id: str):
    """
    SSE generator for a given request_id.
    Yields UTF-8 encoded bytes to avoid WSGI Latin-1 mangling.
    """
    if request_id not in self.sse_queues:
        yield b"data: {\"type\": \"error\", \"message\": \"invalid request_id\"}\n\n"
        return
    events_q = self.sse_queues[request_id]
    deadline = time.time() + 300  # hard cap: 5 minutes per request
    try:
        while time.time() < deadline:
            try:
                item = events_q.get(timeout=1)
            except Empty:
                # SSE comment line keeps intermediaries from closing an idle stream.
                yield b": keepalive\n\n"
                continue
            payload = json.dumps(item, ensure_ascii=False)
            yield f"data: {payload}\n\n".encode("utf-8")
            if item.get("type") == "done":
                break
    finally:
        # Always drop the queue so the on_event callback stops pushing.
        self.sse_queues.pop(request_id, None)
def poll_response(self):
"""
Poll for responses using the session_id.
@@ -165,28 +240,28 @@ class WebChannel(ChatChannel):
data = web.data()
json_data = json.loads(data)
session_id = json_data.get('session_id')
if not session_id or session_id not in self.session_queues:
return json.dumps({"status": "error", "message": "Invalid session ID"})
# 尝试从队列获取响应,不等待
try:
# 使用peek而不是get,这样如果前端没有成功处理,下次还能获取到
response = self.session_queues[session_id].get(block=False)
# 返回响应,包含请求ID以区分不同请求
return json.dumps({
"status": "success",
"status": "success",
"has_content": True,
"content": response["content"],
"request_id": response["request_id"],
"timestamp": response["timestamp"]
})
except Empty:
# 没有新响应
return json.dumps({"status": "success", "has_content": False})
except Exception as e:
logger.error(f"Error polling response: {e}")
return json.dumps({"status": "error", "message": str(e)})
@@ -199,9 +274,10 @@ class WebChannel(ChatChannel):
def startup(self):
port = conf().get("web_port", 9899)
# 打印可用渠道类型提示
logger.info("[WebChannel] 当前channel为web,可修改 config.json 配置文件中的 channel_type 字段进行切换。全部可用类型为:")
logger.info(
"[WebChannel] 全部可用通道如下,可修改 config.json 配置文件中的 channel_type 字段进行切换,多个通道用逗号分隔:")
logger.info("[WebChannel] 1. web - 网页")
logger.info("[WebChannel] 2. terminal - 终端")
logger.info("[WebChannel] 3. feishu - 飞书")
@@ -209,33 +285,40 @@ class WebChannel(ChatChannel):
logger.info("[WebChannel] 5. wechatcom_app - 企微自建应用")
logger.info("[WebChannel] 6. wechatmp - 个人公众号")
logger.info("[WebChannel] 7. wechatmp_service - 企业公众号")
logger.info(f"[WebChannel] 🌐 本地访问: http://localhost:{port}/chat")
logger.info(f"[WebChannel] 🌍 服务器访问: http://YOUR_IP:{port}/chat (请将YOUR_IP替换为服务器IP)")
logger.info("[WebChannel] ✅ Web对话网页已运行")
logger.info("[WebChannel] ✅ Web控制台已运行")
logger.info(f"[WebChannel] 🌐 本地访问: http://localhost:{port}")
logger.info(f"[WebChannel] 🌍 服务器访问: http://YOUR_IP:{port} (请将YOUR_IP替换为服务器IP)")
# 确保静态文件目录存在
static_dir = os.path.join(os.path.dirname(__file__), 'static')
if not os.path.exists(static_dir):
os.makedirs(static_dir)
logger.debug(f"[WebChannel] Created static directory: {static_dir}")
urls = (
'/', 'RootHandler',
'/message', 'MessageHandler',
'/poll', 'PollHandler',
'/stream', 'StreamHandler',
'/chat', 'ChatHandler',
'/config', 'ConfigHandler',
'/api/skills', 'SkillsHandler',
'/api/memory', 'MemoryHandler',
'/api/memory/content', 'MemoryContentHandler',
'/api/scheduler', 'SchedulerHandler',
'/api/history', 'HistoryHandler',
'/api/logs', 'LogsHandler',
'/assets/(.*)', 'AssetsHandler',
)
app = web.application(urls, globals(), autoreload=False)
# 完全禁用web.py的HTTP日志输出
web.httpserver.LogMiddleware.log = lambda self, status, environ: None
# 配置web.py的日志级别为ERROR
logging.getLogger("web").setLevel(logging.ERROR)
logging.getLogger("web.httpserver").setLevel(logging.ERROR)
# Build WSGI app with middleware (same as runsimple but without print)
func = web.httpserver.StaticMiddleware(app.wsgifunc())
func = web.httpserver.LogMiddleware(func)
@@ -272,6 +355,21 @@ class PollHandler:
return WebChannel().poll_response()
class StreamHandler:
    """Serves GET /stream?request_id=... as a Server-Sent-Events response."""

    def GET(self):
        request_id = web.input(request_id='').request_id
        if not request_id:
            raise web.badrequest()
        # SSE headers; X-Accel-Buffering disables nginx response buffering.
        web.header('Content-Type', 'text/event-stream; charset=utf-8')
        web.header('Cache-Control', 'no-cache')
        web.header('X-Accel-Buffering', 'no')
        web.header('Access-Control-Allow-Origin', '*')
        return WebChannel().stream_response(request_id)
class ChatHandler:
def GET(self):
# 正常返回聊天页面
@@ -282,28 +380,181 @@ class ChatHandler:
class ConfigHandler:
def GET(self):
"""返回前端需要的配置信息"""
"""Return configuration info for the web console."""
try:
use_agent = conf().get("agent", False)
local_config = conf()
use_agent = local_config.get("agent", False)
if use_agent:
title = "CowAgent"
subtitle = "我可以帮你解答问题、管理计算机、创造和执行技能,并通过长期记忆不断成长"
else:
title = "AI 助手"
subtitle = "我可以回答问题、提供信息或者帮助您完成各种任务"
title = "AI Assistant"
return json.dumps({
"status": "success",
"use_agent": use_agent,
"title": title,
"subtitle": subtitle
"model": local_config.get("model", ""),
"channel_type": local_config.get("channel_type", ""),
"agent_max_context_tokens": local_config.get("agent_max_context_tokens", ""),
"agent_max_context_turns": local_config.get("agent_max_context_turns", ""),
"agent_max_steps": local_config.get("agent_max_steps", ""),
})
except Exception as e:
logger.error(f"Error getting config: {e}")
return json.dumps({"status": "error", "message": str(e)})
def _get_workspace_root():
    """Resolve the agent workspace directory (default ~/cow, user-expanded)."""
    from common.utils import expand_path
    workspace = conf().get("agent_workspace", "~/cow")
    return expand_path(workspace)
class SkillsHandler:
    """GET /api/skills -> JSON listing of the agent's available skills."""

    def GET(self):
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            from agent.skills.service import SkillService
            from agent.skills.manager import SkillManager
            skills_dir = os.path.join(_get_workspace_root(), "skills")
            service = SkillService(SkillManager(custom_dir=skills_dir))
            return json.dumps({"status": "success", "skills": service.query()}, ensure_ascii=False)
        except Exception as e:
            logger.error(f"[WebChannel] Skills API error: {e}")
            return json.dumps({"status": "error", "message": str(e)})
class MemoryHandler:
    """GET /api/memory -> paginated listing of agent memory files."""

    def GET(self):
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            from agent.memory.service import MemoryService
            params = web.input(page='1', page_size='20')
            service = MemoryService(_get_workspace_root())
            listing = service.list_files(page=int(params.page), page_size=int(params.page_size))
            return json.dumps({"status": "success", **listing}, ensure_ascii=False)
        except Exception as e:
            logger.error(f"[WebChannel] Memory API error: {e}")
            return json.dumps({"status": "error", "message": str(e)})
class MemoryContentHandler:
    """GET /api/memory/content?filename=... -> contents of one memory file."""

    def GET(self):
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            from agent.memory.service import MemoryService
            params = web.input(filename='')
            if not params.filename:
                return json.dumps({"status": "error", "message": "filename required"})
            service = MemoryService(_get_workspace_root())
            content = service.get_content(params.filename)
            return json.dumps({"status": "success", **content}, ensure_ascii=False)
        except FileNotFoundError:
            # Missing file is an expected client error, not a server fault.
            return json.dumps({"status": "error", "message": "file not found"})
        except Exception as e:
            logger.error(f"[WebChannel] Memory content API error: {e}")
            return json.dumps({"status": "error", "message": str(e)})
class SchedulerHandler:
    """GET /api/scheduler -> all scheduled tasks from the workspace task store."""

    def GET(self):
        web.header('Content-Type', 'application/json; charset=utf-8')
        try:
            from agent.tools.scheduler.task_store import TaskStore
            store_path = os.path.join(_get_workspace_root(), "scheduler", "tasks.json")
            tasks = TaskStore(store_path).list_tasks()
            return json.dumps({"status": "success", "tasks": tasks}, ensure_ascii=False)
        except Exception as e:
            logger.error(f"[WebChannel] Scheduler API error: {e}")
            return json.dumps({"status": "error", "message": str(e)})
class HistoryHandler:
    """GET /api/history -> paginated conversation history for one session."""

    def GET(self):
        """
        Return paginated conversation history for a session.

        Query params:
            session_id (required)
            page       int, default 1 (1 = most recent messages)
            page_size  int, default 20
        """
        web.header('Content-Type', 'application/json; charset=utf-8')
        web.header('Access-Control-Allow-Origin', '*')
        try:
            params = web.input(session_id='', page='1', page_size='20')
            session_id = params.session_id.strip()
            if not session_id:
                return json.dumps({"status": "error", "message": "session_id required"})
            from agent.memory import get_conversation_store
            page_data = get_conversation_store().load_history_page(
                session_id=session_id,
                page=int(params.page),
                page_size=int(params.page_size),
            )
            return json.dumps({"status": "success", **page_data}, ensure_ascii=False)
        except Exception as e:
            logger.error(f"[WebChannel] History API error: {e}")
            return json.dumps({"status": "error", "message": str(e)})
class LogsHandler:
    """GET /api/logs -> SSE stream: last 200 lines of run.log, then a live tail."""

    def GET(self):
        """Stream the last N lines of run.log as SSE, then tail new lines."""
        web.header('Content-Type', 'text/event-stream; charset=utf-8')
        web.header('Cache-Control', 'no-cache')
        web.header('X-Accel-Buffering', 'no')
        from config import get_root
        log_path = os.path.join(get_root(), "run.log")

        def sse(obj):
            # Single encoder so every payload is valid JSON. Previously error
            # messages were f-string-interpolated into a JSON literal, which
            # broke whenever the message contained quotes or backslashes.
            return f"data: {json.dumps(obj, ensure_ascii=False)}\n\n".encode('utf-8')

        def generate():
            if not os.path.isfile(log_path):
                yield sse({"type": "error", "message": "run.log not found"})
                return
            # Initial snapshot: last 200 lines for immediate display.
            try:
                with open(log_path, 'r', encoding='utf-8', errors='replace') as f:
                    chunk = ''.join(f.readlines()[-200:])
                yield sse({"type": "init", "content": chunk})
            except Exception as e:
                yield sse({"type": "error", "message": str(e)})
                return
            # Live tail of newly appended lines, for at most 10 minutes.
            try:
                with open(log_path, 'r', encoding='utf-8', errors='replace') as f:
                    f.seek(0, 2)  # jump to end of file
                    deadline = time.time() + 600
                    while time.time() < deadline:
                        line = f.readline()
                        if line:
                            yield sse({"type": "line", "content": line})
                        else:
                            yield b": keepalive\n\n"
                            time.sleep(1)
            except GeneratorExit:
                # Client disconnected; stop tailing quietly.
                return
            except Exception:
                return

        return generate()
class AssetsHandler:
def GET(self, file_path): # 修改默认参数
try:
+3 -1
View File
@@ -160,7 +160,8 @@ available_setting = {
# chatgpt指令自定义触发词
"clear_memory_commands": ["#清除记忆"], # 重置会话指令,必须以#开头
# channel配置
"channel_type": "", # 通道类型,支持{wx,wxy,terminal,wechatmp,wechatmp_service,wechatcom_app,dingtalk}
"channel_type": "", # 通道类型,支持多渠道同时运行。单个: "feishu",多个: "feishu, dingtalk" 或 ["feishu", "dingtalk"]。可选值: web,feishu,dingtalk,wechatmp,wechatmp_service,wechatcom_app
"web_console": True, # 是否自动启动Web控制台(默认启动)。设为False可禁用
"subscribe_msg": "", # 订阅消息, 支持: wechatmp, wechatmp_service, wechatcom_app
"debug": False, # 是否开启debug模式,开启后会打印更多日志
"appdata_dir": "", # 数据目录
@@ -186,6 +187,7 @@ available_setting = {
"linkai_api_key": "",
"linkai_app_code": "",
"linkai_api_base": "https://api.link-ai.tech", # linkAI服务地址
"cloud_host": "client.link-ai.tech",
"minimax_api_key": "",
"Minimax_group_id": "",
"Minimax_base_url": "",