Add MCP media tools, real-time subscriptions, and UI

Major MCP extension: adds tools for media content retrieval, OCR, real-time message subscription with webhook push, chat activity analysis, user profile, and shared file search. Implements persistent subscription management, new prompt templates, and real-time resource endpoints. Updates UI to display active MCP subscriptions, enhances CSV export with MessageID, and documents all new features in the changelog. Removes Dockerfile and docker-compose.yml.
This commit is contained in:
lx1056758714-glitch
2025-12-18 21:37:55 +08:00
parent ec41b0d2be
commit 428dabe05a
19 changed files with 1254 additions and 99 deletions

View File

@@ -501,6 +501,53 @@ func (a *App) initMenu() {
Selected: a.selectAccountSelected,
}
mcpSubscriptions := &menu.Item{
Index: 9,
Name: "查看 MCP 订阅",
Description: "查看当前正在订阅的实时消息流及推送状态",
Selected: func(i *menu.Item) {
subs := a.m.GetMCPSubscriptions()
lastPushTime, lastPushTalker := a.m.GetMCPStatus()
status := "[red]未启动[white]"
if a.ctx.HTTPEnabled {
status = "[green]运行中[white]"
}
text := fmt.Sprintf("推送服务状态: %s\n", status)
text += fmt.Sprintf("推送服务地址: [cyan]%s[white]\n", a.ctx.HTTPAddr)
if !lastPushTime.IsZero() {
text += fmt.Sprintf("最近推送时间: [green]%s[white] (%s)\n", lastPushTime.Format("15:04:05"), lastPushTalker)
} else {
text += "最近推送时间: [gray]暂无推送[white]\n"
}
text += "[yellow]订阅信息已持久化保存到本地[white]\n\n"
if len(subs) == 0 {
text += "当前无活跃订阅。"
} else {
text += "活跃订阅列表:\n"
for _, sub := range subs {
statusStr := "[gray]等待中[white]"
if sub.LastStatus == "Success" {
statusStr = "[green]成功[white]"
} else if sub.LastStatus == "Failed" || sub.LastStatus == "Error" {
statusStr = fmt.Sprintf("[red]失败: %s[white]", sub.LastError)
}
text += fmt.Sprintf("- %s\n 状态: %s\n 推送地址: %s\n 订阅时间: %s\n", sub.Talker, statusStr, sub.WebhookURL, sub.LastTime.Format("2006-01-02 15:04:05"))
}
}
modal := tview.NewModal().
SetText(text).
AddButtons([]string{"返回"}).
SetDoneFunc(func(buttonIndex int, buttonLabel string) {
a.mainPages.RemovePage("modal")
})
a.mainPages.AddPage("modal", modal, true, true)
a.SetFocus(modal)
},
}
a.menu.AddItem(getDataKey)
a.menu.AddItem(restartAndGetDataKey)
a.menu.AddItem(decryptData)
@@ -508,9 +555,10 @@ func (a *App) initMenu() {
a.menu.AddItem(autoDecrypt)
a.menu.AddItem(setting)
a.menu.AddItem(selectAccount)
a.menu.AddItem(mcpSubscriptions)
a.menu.AddItem(&menu.Item{
Index: 9,
Index: 10,
Name: "退出",
Description: "退出程序",
Selected: func(i *menu.Item) {

View File

@@ -91,14 +91,26 @@ func (s *Service) GetMessages(start, end time.Time, talker string, sender string
return s.db.GetMessages(start, end, talker, sender, keyword, limit, offset)
}
func (s *Service) GetMessage(talker string, seq int64) (*model.Message, error) {
return s.db.GetMessage(talker, seq)
}
func (s *Service) GetContacts(key string, limit, offset int) (*wechatdb.GetContactsResp, error) {
return s.db.GetContacts(key, limit, offset)
}
func (s *Service) GetContact(key string) (*model.Contact, error) {
return s.db.GetContact(key)
}
func (s *Service) GetChatRooms(key string, limit, offset int) (*wechatdb.GetChatRoomsResp, error) {
return s.db.GetChatRooms(key, limit, offset)
}
func (s *Service) GetChatRoom(key string) (*model.ChatRoom, error) {
return s.db.GetChatRoom(key)
}
// GetSession retrieves session information
func (s *Service) GetSessions(key string, limit, offset int) (*wechatdb.GetSessionsResp, error) {
return s.db.GetSessions(key, limit, offset)

View File

@@ -3,7 +3,13 @@ package http
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"os"
"path/filepath"
"sort"
"strings"
"time"
@@ -13,21 +19,132 @@ import (
"github.com/sjzar/chatlog/internal/chatlog/conf"
"github.com/sjzar/chatlog/internal/errors"
"github.com/sjzar/chatlog/internal/model"
"github.com/sjzar/chatlog/pkg/util"
"github.com/sjzar/chatlog/pkg/util/dat2img"
"github.com/sjzar/chatlog/pkg/util/silk"
"github.com/sjzar/chatlog/pkg/version"
)
func (s *Service) initMCPServer() {
s.mcpServer = server.NewMCPServer(conf.AppName, version.Version)
s.mcpServer = server.NewMCPServer(conf.AppName, version.Version,
server.WithResourceCapabilities(true, true),
server.WithToolCapabilities(true),
server.WithPromptCapabilities(true),
)
s.mcpServer.AddTool(ContactTool, s.handleMCPContact)
s.mcpServer.AddTool(ChatRoomTool, s.handleMCPChatRoom)
s.mcpServer.AddTool(RecentChatTool, s.handleMCPRecentChat)
s.mcpServer.AddTool(ChatLogTool, s.handleMCPChatLog)
s.mcpServer.AddTool(CurrentTimeTool, s.handleMCPCurrentTime)
s.mcpSSEServer = server.NewSSEServer(s.mcpServer)
s.mcpServer.AddTool(GetMediaContentTool, s.handleMCPGetMediaContent)
s.mcpServer.AddTool(OCRImageMessageTool, s.handleMCPOCRImageMessage)
s.mcpServer.AddTool(SubscribeNewMessagesTool, s.handleMCPSubscribeNewMessages)
s.mcpServer.AddTool(UnsubscribeNewMessagesTool, s.handleMCPUnsubscribeNewMessages)
s.mcpServer.AddTool(GetActiveSubscriptionsTool, s.handleMCPGetActiveSubscriptions)
s.mcpServer.AddTool(SendWebhookNotificationTool, s.handleMCPSendWebhookNotification)
s.mcpServer.AddTool(AnalyzeChatActivityTool, s.handleMCPAnalyzeChatActivity)
s.mcpServer.AddTool(GetUserProfileTool, s.handleMCPGetUserProfile)
s.mcpServer.AddTool(SearchSharedFilesTool, s.handleMCPSearchSharedFiles)
s.mcpServer.AddResourceTemplate(ChatLogRealtimeResource, s.handleMCPRealtimeResource)
s.mcpServer.AddPrompt(ChatSummaryDailyPrompt, s.handleMCPChatSummaryDaily)
s.mcpServer.AddPrompt(ConflictDetectorPrompt, s.handleMCPConflictDetector)
s.mcpServer.AddPrompt(RelationshipMilestonesPrompt, s.handleMCPRelationshipMilestones)
s.mcpSSEServer = server.NewSSEServer(s.mcpServer,
server.WithSSEEndpoint("/sse"),
server.WithMessageEndpoint("/message"),
)
s.mcpStreamableServer = server.NewStreamableHTTPServer(s.mcpServer)
go s.startMCPMessageMonitor()
}
var ChatSummaryDailyPrompt = mcp.NewPrompt(
"chat_summary_daily",
mcp.WithPromptDescription("生成每日聊天摘要模板。"),
mcp.WithArgument("date", mcp.ArgumentDescription("摘要日期 (YYYY-MM-DD)"), mcp.RequiredArgument()),
mcp.WithArgument("talker", mcp.ArgumentDescription("对话方 ID"), mcp.RequiredArgument()),
)
var ConflictDetectorPrompt = mcp.NewPrompt(
"conflict_detector",
mcp.WithPromptDescription("情绪与冲突检测模板。"),
mcp.WithArgument("talker", mcp.ArgumentDescription("对话方 ID"), mcp.RequiredArgument()),
)
var RelationshipMilestonesPrompt = mcp.NewPrompt(
"relationship_milestones",
mcp.WithPromptDescription("关系里程碑回顾模板。"),
mcp.WithArgument("talker", mcp.ArgumentDescription("对话方 ID"), mcp.RequiredArgument()),
)
var SearchSharedFilesTool = mcp.NewTool(
"search_shared_files",
mcp.WithDescription(`专门搜索聊天记录中发送的文件元数据。当用户想找某个特定的共享文件时使用。`),
mcp.WithString("talker", mcp.Description("对话方 ID"), mcp.Required()),
mcp.WithString("keyword", mcp.Description("文件名搜索关键词")),
)
var ChatLogRealtimeResource = mcp.NewResourceTemplate(
"chatlog://realtime/{talker}",
"实时消息流",
mcp.WithTemplateDescription("特定对话方的实时消息更新流。当收到资源更新通知时,读取此资源以获取最新消息。"),
mcp.WithTemplateMIMEType("text/plain"),
)
var AnalyzeChatActivityTool = mcp.NewTool(
"analyze_chat_activity",
mcp.WithDescription(`统计特定时间段内对话方的活跃度,包括发言频率、活跃时段等。用于分析某人的社交习惯或群聊热度。`),
mcp.WithString("time", mcp.Description("时间范围 (例如: 2023-04-01~2023-04-18)"), mcp.Required()),
mcp.WithString("talker", mcp.Description("对话方 ID"), mcp.Required()),
)
var GetUserProfileTool = mcp.NewTool(
"get_user_profile",
mcp.WithDescription(`获取联系人或群组的详细资料,包括备注、属性、群成员(如果是群组)等背景信息。用于更深入地了解对话方。`),
mcp.WithString("key", mcp.Description("联系人或群组的 ID 或名称"), mcp.Required()),
)
var SubscribeNewMessagesTool = mcp.NewTool(
"subscribe_new_messages",
mcp.WithDescription(`订阅特定联系人或群组的实时消息。订阅后,系统将监控新消息并在发现新消息时向指定的 Webhook 地址推送。建议在需要实时跟进对话时使用。`),
mcp.WithString("talker", mcp.Description("要订阅的对话方 ID联系人或群组"), mcp.Required()),
mcp.WithString("webhook_url", mcp.Description("推送新消息通知的 Webhook 地址"), mcp.Required()),
)
var UnsubscribeNewMessagesTool = mcp.NewTool(
"unsubscribe_new_messages",
mcp.WithDescription(`取消订阅特定联系人或群组的实时消息。取消后,将不再收到该对话方的新消息通知。`),
mcp.WithString("talker", mcp.Description("要取消订阅的对话方 ID联系人或群组"), mcp.Required()),
)
var GetActiveSubscriptionsTool = mcp.NewTool(
"get_active_subscriptions",
mcp.WithDescription(`获取当前正在订阅的活跃实时消息流列表。用于查看当前监控了哪些联系人或群组,方便管理或取消订阅。`),
)
var SendWebhookNotificationTool = mcp.NewTool(
"send_webhook_notification",
mcp.WithDescription(`触发外部 Webhook 通知。当模型完成聊天记录分析、发现重要事项或需要提醒外部系统时使用此工具。`),
mcp.WithString("url", mcp.Description("Webhook 接收地址"), mcp.Required()),
mcp.WithString("message", mcp.Description("要发送的通知内容或分析结果"), mcp.Required()),
mcp.WithString("level", mcp.Description("通知级别 (info, warn, error)")),
)
var OCRImageMessageTool = mcp.NewTool(
"ocr_image_message",
mcp.WithDescription(`对特定图片消息进行 OCR 解析以提取其中的文字。`),
mcp.WithString("talker", mcp.Description("消息所在的对话方(联系人 ID 或群 ID"), mcp.Required()),
mcp.WithNumber("message_id", mcp.Description("消息的唯一 ID (Seq)"), mcp.Required()),
)
var GetMediaContentTool = mcp.NewTool(
"get_media_content",
mcp.WithDescription(`根据消息 ID 获取解码后的媒体文件内容(图片或语音)。当聊天记录中显示 [图片] 或 [语音] 且用户需要查看具体内容或进行分析时使用此工具。`),
mcp.WithString("talker", mcp.Description("消息所在的对话方(联系人 ID 或群 ID"), mcp.Required()),
mcp.WithNumber("message_id", mcp.Description("消息的唯一 ID (Seq)"), mcp.Required()),
)
var ContactTool = mcp.NewTool(
"query_contact",
mcp.WithDescription(`查询用户的联系人信息。可以通过姓名、备注名或ID进行查询返回匹配的联系人列表。当用户询问某人的联系方式、想了解联系人信息或需要查找特定联系人时使用此工具。参数为空时将返回联系人列表`),
@@ -95,8 +212,9 @@ var ChatLogTool = mcp.NewTool(
- 我是否分析了所有上下文后再回答?
- 如果上述任一问题答案为"否",则必须纠正流程
返回格式:"昵称(ID) 时间\n消息内容\n昵称(ID) 时间\n消息内容"
查询多个Talker时返回格式为"昵称(ID)\n[TalkerName(Talker)] 时间\n消息内容"
返回格式:"昵称(ID) [MessageID] 时间\n消息内容\n昵称(ID) [MessageID] 时间\n消息内容"
消息内容包含 [图片] 或 [语音] 时,可以使用 get_media_content 或 ocr_image_message 工具,并传入对应的 [MessageID] 来获取具体内容
当查询多个Talker时返回格式为"昵称(ID)\n[TalkerName(Talker)] [MessageID] 时间\n消息内容"
重要提示:
1. 当用户询问特定时间段内的聊天记录时,必须使用正确的时间格式,特别是包含小时和分钟的查询
@@ -326,3 +444,701 @@ func (s *Service) handleMCPCurrentTime(ctx context.Context, request mcp.CallTool
},
}, nil
}
type GetMediaContentRequest struct {
Talker string `json:"talker"`
MessageID int64 `json:"message_id"`
}
func (s *Service) handleMCPGetMediaContent(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
var req GetMediaContentRequest
if err := request.BindArguments(&req); err != nil {
return errors.ErrMCPTool(err), nil
}
msg, err := s.db.GetMessage(req.Talker, req.MessageID)
if err != nil {
log.Error().Err(err).Msg("Failed to get message")
return errors.ErrMCPTool(err), nil
}
switch msg.Type {
case model.MessageTypeImage:
return s.handleMCPGetImage(ctx, msg)
case model.MessageTypeVoice:
return s.handleMCPGetVoice(ctx, msg)
default:
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("暂不支持的消息类型: %d", msg.Type),
},
},
}, nil
}
}
func (s *Service) handleMCPOCRImageMessage(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
var req GetMediaContentRequest
if err := request.BindArguments(&req); err != nil {
return errors.ErrMCPTool(err), nil
}
msg, err := s.db.GetMessage(req.Talker, req.MessageID)
if err != nil {
log.Error().Err(err).Msg("Failed to get message")
return errors.ErrMCPTool(err), nil
}
if msg.Type != model.MessageTypeImage {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: "该消息不是图片消息,无法进行 OCR 解析。",
},
},
}, nil
}
result, err := s.handleMCPGetImage(ctx, msg)
if err != nil {
return result, err
}
// 在结果中添加一条提示信息
result.Content = append([]mcp.Content{
mcp.TextContent{
Type: "text",
Text: "已提取图片数据,请直接分析该图片内容并提取文字 (OCR)。",
},
}, result.Content...)
return result, nil
}
func (s *Service) handleMCPGetImage(ctx context.Context, msg *model.Message) (*mcp.CallToolResult, error) {
key, ok := msg.Contents["md5"].(string)
if !ok {
// 尝试从 path 获取
key, _ = msg.Contents["path"].(string)
}
if key == "" {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: "无法找到图片标识符",
},
},
}, nil
}
media, err := s.db.GetMedia("image", key)
if err != nil {
return errors.ErrMCPTool(err), nil
}
absolutePath := filepath.Join(s.conf.GetDataDir(), media.Path)
b, err := os.ReadFile(absolutePath)
if err != nil {
return errors.ErrMCPTool(err), nil
}
var data []byte
var mimeType string
if strings.HasSuffix(strings.ToLower(media.Path), ".dat") {
out, ext, err := dat2img.Dat2Image(b)
if err != nil {
return errors.ErrMCPTool(err), nil
}
data = out
switch ext {
case "png":
mimeType = "image/png"
case "gif":
mimeType = "image/gif"
case "bmp":
mimeType = "image/bmp"
default:
mimeType = "image/jpeg"
}
} else {
data = b
ext := strings.ToLower(filepath.Ext(media.Path))
switch ext {
case ".png":
mimeType = "image/png"
case ".gif":
mimeType = "image/gif"
case ".bmp":
mimeType = "image/bmp"
default:
mimeType = "image/jpeg"
}
}
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.ImageContent{
Type: "image",
Data: base64.StdEncoding.EncodeToString(data),
MIMEType: mimeType,
},
},
}, nil
}
func (s *Service) handleMCPGetVoice(ctx context.Context, msg *model.Message) (*mcp.CallToolResult, error) {
key, ok := msg.Contents["voice"].(string)
if !ok {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: "无法找到语音标识符",
},
},
}, nil
}
media, err := s.db.GetMedia("voice", key)
if err != nil {
return errors.ErrMCPTool(err), nil
}
out, err := silk.Silk2MP3(media.Data)
if err != nil {
// 如果转换失败,返回 base64 编码的原始数据
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("语音转换失败: %v。原始语音数据(base64): %s", err, base64.StdEncoding.EncodeToString(media.Data)),
},
},
}, nil
}
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("语音已转换为 MP3 格式。数据(base64): %s", base64.StdEncoding.EncodeToString(out)),
},
},
}, nil
}
type SubscribeNewMessagesRequest struct {
Talker string `json:"talker"`
WebhookURL string `json:"webhook_url"`
}
func (s *Service) handleMCPSubscribeNewMessages(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
var req SubscribeNewMessagesRequest
if err := request.BindArguments(&req); err != nil {
return errors.ErrMCPTool(err), nil
}
if req.Talker == "" || req.WebhookURL == "" {
return errors.ErrMCPTool(fmt.Errorf("talker and webhook_url are required")), nil
}
// 初始化订阅
s.mcpSubMu.Lock()
s.mcpSubscriptions[req.Talker] = &Subscription{
Talker: req.Talker,
WebhookURL: req.WebhookURL,
LastTime: time.Now(),
}
s.mcpSubMu.Unlock()
s.saveSubscriptions()
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("已成功订阅 %s 的实时消息。推送地址: %s", req.Talker, req.WebhookURL),
},
},
}, nil
}
func (s *Service) handleMCPUnsubscribeNewMessages(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
var req SubscribeNewMessagesRequest
if err := request.BindArguments(&req); err != nil {
return errors.ErrMCPTool(err), nil
}
if req.Talker == "" {
return errors.ErrMCPTool(fmt.Errorf("talker is required")), nil
}
// 删除订阅
s.mcpSubMu.Lock()
delete(s.mcpSubscriptions, req.Talker)
s.mcpSubMu.Unlock()
s.saveSubscriptions()
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("已成功取消订阅 %s 的实时消息。", req.Talker),
},
},
}, nil
}
func (s *Service) handleMCPGetActiveSubscriptions(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
s.mcpSubMu.RLock()
defer s.mcpSubMu.RUnlock()
if len(s.mcpSubscriptions) == 0 {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: "当前没有活跃的订阅。",
},
},
}, nil
}
buf := &bytes.Buffer{}
buf.WriteString("当前活跃订阅列表:\n")
for talker, sub := range s.mcpSubscriptions {
buf.WriteString(fmt.Sprintf("- %s (Webhook: %s)\n", talker, sub.WebhookURL))
}
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: buf.String(),
},
},
}, nil
}
type SendWebhookNotificationRequest struct {
URL string `json:"url"`
Message string `json:"message"`
Level string `json:"level"`
}
func (s *Service) handleMCPSendWebhookNotification(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
var req SendWebhookNotificationRequest
if err := request.BindArguments(&req); err != nil {
return errors.ErrMCPTool(err), nil
}
payload := map[string]interface{}{
"message": req.Message,
"level": req.Level,
"timestamp": time.Now().Format(time.RFC3339),
"source": "chatlog-mcp",
}
body, err := json.Marshal(payload)
if err != nil {
return errors.ErrMCPTool(err), nil
}
httpReq, err := http.NewRequestWithContext(ctx, "POST", req.URL, bytes.NewBuffer(body))
if err != nil {
return errors.ErrMCPTool(err), nil
}
httpReq.Header.Set("Content-Type", "application/json")
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(httpReq)
if err != nil {
return errors.ErrMCPTool(err), nil
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return errors.ErrMCPTool(fmt.Errorf("webhook returned status %d", resp.StatusCode)), nil
}
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: "Webhook 通知发送成功。",
},
},
}, nil
}
type AnalyzeChatActivityRequest struct {
Time string `json:"time"`
Talker string `json:"talker"`
}
func (s *Service) handleMCPAnalyzeChatActivity(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
var req AnalyzeChatActivityRequest
if err := request.BindArguments(&req); err != nil {
return errors.ErrMCPTool(err), nil
}
start, end, ok := util.TimeRangeOf(req.Time)
if !ok {
return errors.ErrMCPTool(fmt.Errorf("invalid time format")), nil
}
messages, err := s.db.GetMessages(start, end, req.Talker, "", "", 0, 0)
if err != nil {
return errors.ErrMCPTool(err), nil
}
if len(messages) == 0 {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: "该时间段内没有聊天记录。",
},
},
}, nil
}
// 统计逻辑
totalCount := len(messages)
senderStats := make(map[string]int)
hourStats := make(map[int]int)
typeStats := make(map[int64]int)
for _, m := range messages {
sender := m.SenderName
if sender == "" {
sender = m.Sender
}
senderStats[sender]++
hourStats[m.Time.Hour()]++
typeStats[m.Type]++
}
buf := &bytes.Buffer{}
buf.WriteString(fmt.Sprintf("分析报告 (%s - %s)\n", start.Format(time.DateOnly), end.Format(time.DateOnly)))
buf.WriteString(fmt.Sprintf("总消息数: %d\n\n", totalCount))
buf.WriteString("发言频率排行:\n")
type senderStat struct {
Name string
Count int
}
ss := make([]senderStat, 0, len(senderStats))
for name, count := range senderStats {
ss = append(ss, senderStat{name, count})
}
sort.Slice(ss, func(i, j int) bool { return ss[i].Count > ss[j].Count })
for i, s := range ss {
if i >= 10 {
break
} // 只显示前 10
percentage := float64(s.Count) / float64(totalCount) * 100
buf.WriteString(fmt.Sprintf("- %s: %d (%.1f%%)\n", s.Name, s.Count, percentage))
}
buf.WriteString("\n活跃时段分布:\n")
for h := 0; h < 24; h++ {
if count, ok := hourStats[h]; ok {
buf.WriteString(fmt.Sprintf("%02d:00: %s (%d)\n", h, strings.Repeat("█", (count*20+totalCount-1)/totalCount), count))
}
}
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: buf.String(),
},
},
}, nil
}
type GetUserProfileRequest struct {
Key string `json:"key"`
}
func (s *Service) handleMCPGetUserProfile(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
var req GetUserProfileRequest
if err := request.BindArguments(&req); err != nil {
return errors.ErrMCPTool(err), nil
}
buf := &bytes.Buffer{}
// 尝试作为群聊获取
if chatRoom, err := s.db.GetChatRoom(req.Key); err == nil {
buf.WriteString(fmt.Sprintf("【群聊资料】\n"))
buf.WriteString(fmt.Sprintf("ID: %s\n", chatRoom.Name))
buf.WriteString(fmt.Sprintf("名称: %s\n", chatRoom.NickName))
if chatRoom.Remark != "" {
buf.WriteString(fmt.Sprintf("备注: %s\n", chatRoom.Remark))
}
buf.WriteString(fmt.Sprintf("群主: %s\n", chatRoom.Owner))
buf.WriteString(fmt.Sprintf("成员数: %d\n", len(chatRoom.Users)))
buf.WriteString("\n部分成员列表:\n")
for i, user := range chatRoom.Users {
if i >= 20 {
buf.WriteString("... 等等\n")
break
}
displayName := chatRoom.User2DisplayName[user.UserName]
buf.WriteString(fmt.Sprintf("- %s (%s)\n", displayName, user.UserName))
}
} else if contact, err := s.db.GetContact(req.Key); err == nil {
// 尝试作为联系人获取
buf.WriteString(fmt.Sprintf("【联系人资料】\n"))
buf.WriteString(fmt.Sprintf("ID: %s\n", contact.UserName))
buf.WriteString(fmt.Sprintf("昵称: %s\n", contact.NickName))
if contact.Remark != "" {
buf.WriteString(fmt.Sprintf("备注: %s\n", contact.Remark))
}
if contact.Alias != "" {
buf.WriteString(fmt.Sprintf("微信号: %s\n", contact.Alias))
}
buf.WriteString(fmt.Sprintf("是否好友: %v\n", contact.IsFriend))
} else {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("未找到相关联系人或群组: %s", req.Key),
},
},
}, nil
}
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: buf.String(),
},
},
}, nil
}
type SearchSharedFilesRequest struct {
Talker string `json:"talker"`
Keyword string `json:"keyword"`
}
func (s *Service) handleMCPSearchSharedFiles(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
var req SearchSharedFilesRequest
if err := request.BindArguments(&req); err != nil {
return errors.ErrMCPTool(err), nil
}
// 查找 MessageTypeShare (49) 且 MessageSubTypeFile (6)
messages, err := s.db.GetMessages(time.Time{}, time.Now(), req.Talker, "", req.Keyword, 50, 0)
if err != nil {
return errors.ErrMCPTool(err), nil
}
buf := &bytes.Buffer{}
count := 0
for _, m := range messages {
if m.Type == model.MessageTypeShare && m.SubType == model.MessageSubTypeFile {
title, _ := m.Contents["title"].(string)
buf.WriteString(fmt.Sprintf("[%d] %s - %s\n", m.Seq, m.Time.Format("2006-01-02 15:04"), title))
count++
}
}
if count == 0 {
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: "未找到相关共享文件。",
},
},
}, nil
}
return &mcp.CallToolResult{
Content: []mcp.Content{
mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("找到 %d 个文件:\n%s", count, buf.String()),
},
},
}, nil
}
func (s *Service) handleMCPChatSummaryDaily(ctx context.Context, request mcp.GetPromptRequest) (*mcp.GetPromptResult, error) {
date := request.Params.Arguments["date"]
talker := request.Params.Arguments["talker"]
return mcp.NewGetPromptResult(
"每日聊天摘要指令",
[]mcp.PromptMessage{
mcp.NewPromptMessage(mcp.RoleUser, mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("请分析并在总结 %s 在 %s 的聊天内容。请先使用 query_chat_log 获取当天的完整记录,然后从关键话题、重要决策、待办事项三个维度进行总结。", talker, date),
}),
},
), nil
}
func (s *Service) handleMCPConflictDetector(ctx context.Context, request mcp.GetPromptRequest) (*mcp.GetPromptResult, error) {
talker := request.Params.Arguments["talker"]
return mcp.NewGetPromptResult(
"情绪与冲突检测指令",
[]mcp.PromptMessage{
mcp.NewPromptMessage(mcp.RoleUser, mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("请分析与 %s 最近的聊天记录,识别是否存在潜在的情绪波动或冲突。请关注语气变化、负面词汇频率以及争议性话题。", talker),
}),
},
), nil
}
func (s *Service) handleMCPRelationshipMilestones(ctx context.Context, request mcp.GetPromptRequest) (*mcp.GetPromptResult, error) {
talker := request.Params.Arguments["talker"]
return mcp.NewGetPromptResult(
"关系里程碑回顾指令",
[]mcp.PromptMessage{
mcp.NewPromptMessage(mcp.RoleUser, mcp.TextContent{
Type: "text",
Text: fmt.Sprintf("请回顾与 %s 的历史聊天记录,找出重要的关系里程碑(如:初次相识、重大合作达成、共同解决的危机等)。", talker),
}),
},
), nil
}
func (s *Service) handleMCPRealtimeResource(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) {
uri := request.Params.URI
talker := ""
// 尝试从路径解析 chatlog://realtime/{talker}
if strings.HasPrefix(uri, "chatlog://realtime/") {
talker = strings.TrimPrefix(uri, "chatlog://realtime/")
}
if talker == "" {
return nil, fmt.Errorf("talker not found in URI: %s", uri)
}
// 获取最近消息,取较多数量以确保能涵盖最新动态,然后截取最后 10 条
now := time.Now()
start := now.Add(-10 * time.Minute)
messages, err := s.db.GetMessages(start, now, talker, "", "", 100, 0)
if err != nil {
return nil, err
}
if len(messages) > 10 {
messages = messages[len(messages)-10:]
}
buf := &bytes.Buffer{}
if len(messages) == 0 {
buf.WriteString("目前没有新消息")
} else {
for _, m := range messages {
buf.WriteString(m.PlainText(false, "15:04:05", ""))
buf.WriteString("\n")
}
}
return []mcp.ResourceContents{
mcp.TextResourceContents{
URI: uri,
MIMEType: "text/plain",
Text: buf.String(),
},
}, nil
}
func (s *Service) startMCPMessageMonitor() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.mcpSubMu.RLock()
// 创建副本以减少锁持有时间
subscriptions := make([]*Subscription, 0, len(s.mcpSubscriptions))
for _, sub := range s.mcpSubscriptions {
subscriptions = append(subscriptions, &Subscription{
Talker: sub.Talker,
WebhookURL: sub.WebhookURL,
LastTime: sub.LastTime,
LastStatus: sub.LastStatus,
LastError: sub.LastError,
})
}
s.mcpSubMu.RUnlock()
for _, sub := range subscriptions {
now := time.Now()
messages, err := s.db.GetMessages(sub.LastTime, now, sub.Talker, "", "", 10, 0)
if err != nil {
log.Error().Err(err).Str("talker", sub.Talker).Msg("Monitor failed to get messages")
continue
}
if len(messages) > 0 {
// 准备推送内容
buf := &bytes.Buffer{}
for _, m := range messages {
buf.WriteString(m.PlainText(false, "15:04:05", ""))
buf.WriteString("\n")
}
payload := map[string]interface{}{
"message": buf.String(),
"talker": sub.Talker,
"timestamp": time.Now().Format(time.RFC3339),
"source": "chatlog-monitor",
}
body, _ := json.Marshal(payload)
go func(talker, url string, data []byte) {
httpReq, err := http.NewRequest("POST", url, bytes.NewBuffer(data))
if err != nil {
s.updateSubscriptionStatus(talker, "Error", err.Error())
return
}
httpReq.Header.Set("Content-Type", "application/json")
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(httpReq)
if err != nil {
s.updateSubscriptionStatus(talker, "Failed", err.Error())
return
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
s.updateSubscriptionStatus(talker, "Failed", fmt.Sprintf("Status %d", resp.StatusCode))
} else {
s.updateSubscriptionStatus(talker, "Success", "")
}
}(sub.Talker, sub.WebhookURL, body)
// 更新最后一次检查的时间为最新消息的时间 + 1秒
s.mcpSubMu.Lock()
if currentSub, ok := s.mcpSubscriptions[sub.Talker]; ok {
currentSub.LastTime = messages[len(messages)-1].Time.Add(time.Second)
}
s.lastPushTime = time.Now()
s.lastPushTalker = sub.Talker
s.mcpSubMu.Unlock()
log.Info().Str("talker", sub.Talker).Int("count", len(messages)).Str("webhook", sub.WebhookURL).Msg("Sent Webhook push notification")
}
}
}
}
}

View File

@@ -134,7 +134,7 @@ func (s *Service) handleChatlog(c *gin.Context) {
c.Writer.Flush()
csvWriter := csv.NewWriter(c.Writer)
csvWriter.Write([]string{"Time", "SenderName", "Sender", "TalkerName", "Talker", "Content"})
csvWriter.Write([]string{"MessageID", "Time", "SenderName", "Sender", "TalkerName", "Talker", "Content"})
for _, m := range messages {
csvWriter.Write(m.CSV(c.Request.Host))
}

View File

@@ -2,7 +2,11 @@ package http
import (
"context"
"encoding/json"
"net/http"
"os"
"path/filepath"
"sync"
"time"
"github.com/gin-gonic/gin"
@@ -13,6 +17,14 @@ import (
"github.com/sjzar/chatlog/internal/errors"
)
type Subscription struct {
Talker string `json:"talker"`
WebhookURL string `json:"webhook_url"`
LastTime time.Time `json:"last_time"`
LastStatus string `json:"last_status"`
LastError string `json:"last_error"`
}
type Service struct {
conf Config
db *database.Service
@@ -23,6 +35,15 @@ type Service struct {
mcpServer *server.MCPServer
mcpSSEServer *server.SSEServer
mcpStreamableServer *server.StreamableHTTPServer
// MCP 实时消息订阅
mcpSubscriptions map[string]*Subscription
mcpSubMu sync.RWMutex
lastPushTime time.Time
lastPushTalker string
subscriptionPath string
}
type Config interface {
@@ -49,16 +70,51 @@ func NewService(conf Config, db *database.Service) *Service {
)
s := &Service{
conf: conf,
db: db,
router: router,
conf: conf,
db: db,
router: router,
mcpSubscriptions: make(map[string]*Subscription),
subscriptionPath: filepath.Join(conf.GetDataDir(), "subscriptions.json"),
}
s.loadSubscriptions()
s.initMCPServer()
s.initRouter()
return s
}
func (s *Service) saveSubscriptions() {
s.mcpSubMu.RLock()
defer s.mcpSubMu.RUnlock()
data, err := json.MarshalIndent(s.mcpSubscriptions, "", " ")
if err != nil {
log.Error().Err(err).Msg("Failed to marshal subscriptions")
return
}
if err := os.WriteFile(s.subscriptionPath, data, 0644); err != nil {
log.Error().Err(err).Msg("Failed to save subscriptions")
}
}
func (s *Service) loadSubscriptions() {
s.mcpSubMu.Lock()
defer s.mcpSubMu.Unlock()
data, err := os.ReadFile(s.subscriptionPath)
if err != nil {
if !os.IsNotExist(err) {
log.Error().Err(err).Msg("Failed to read subscriptions file")
}
return
}
if err := json.Unmarshal(data, &s.mcpSubscriptions); err != nil {
log.Error().Err(err).Msg("Failed to unmarshal subscriptions")
}
}
func (s *Service) Start() error {
s.server = &http.Server{
@@ -108,6 +164,38 @@ func (s *Service) Stop() error {
return nil
}
func (s *Service) updateSubscriptionStatus(talker, status, errMsg string) {
s.mcpSubMu.Lock()
if sub, ok := s.mcpSubscriptions[talker]; ok {
sub.LastStatus = status
sub.LastError = errMsg
}
s.mcpSubMu.Unlock()
s.saveSubscriptions()
}
func (s *Service) GetRouter() *gin.Engine {
return s.router
}
func (s *Service) GetMCPSubscriptions() []*Subscription {
s.mcpSubMu.RLock()
defer s.mcpSubMu.RUnlock()
res := make([]*Subscription, 0, len(s.mcpSubscriptions))
for _, sub := range s.mcpSubscriptions {
res = append(res, &Subscription{
Talker: sub.Talker,
WebhookURL: sub.WebhookURL,
LastTime: sub.LastTime,
LastStatus: sub.LastStatus,
LastError: sub.LastError,
})
}
return res
}
func (s *Service) GetMCPStatus() (time.Time, string) {
s.mcpSubMu.RLock()
defer s.mcpSubMu.RUnlock()
return s.lastPushTime, s.lastPushTalker
}

View File

@@ -420,6 +420,20 @@ func (m *Manager) GetLatestSession() (*model.Session, error) {
return nil, nil
}
func (m *Manager) GetMCPSubscriptions() []*http.Subscription {
if m.http == nil {
return nil
}
return m.http.GetMCPSubscriptions()
}
func (m *Manager) GetMCPStatus() (time.Time, string) {
if m.http == nil {
return time.Time{}, ""
}
return m.http.GetMCPStatus()
}
func (m *Manager) CommandKey(configPath string, pid int, force bool, showXorKey bool) (string, error) {
var err error

View File

@@ -9,6 +9,7 @@ var (
ErrTalkerEmpty = New(nil, http.StatusBadRequest, "talker empty").WithStack()
ErrKeyEmpty = New(nil, http.StatusBadRequest, "key empty").WithStack()
ErrMediaNotFound = New(nil, http.StatusNotFound, "media not found").WithStack()
ErrMessageNotFound = New(nil, http.StatusNotFound, "message not found").WithStack()
ErrKeyLengthMust32 = New(nil, http.StatusBadRequest, "key length must be 32 bytes").WithStack()
)

View File

@@ -105,7 +105,8 @@ const (
type Message struct {
Version string `json:"-"` // 消息版本,内部判断
Seq int64 `json:"seq"` // 消息序号10位时间戳 + 3位序号
Seq int64 `json:"seq"` // 唯一序列号 (timestamp * 1000000 + local_id)
ID int64 `json:"id"` // 冗余 ID 字段,确保某些客户端能正确解析
Time time.Time `json:"time"` // 消息创建时间10位时间戳
Talker string `json:"talker"` // 聊天对象,微信 ID or 群 ID
TalkerName string `json:"talkerName"` // 聊天对象名称
@@ -318,6 +319,8 @@ func (m *Message) PlainText(showChatRoom bool, timeFormat string, host string) s
}
buf.WriteString(" ")
buf.WriteString(fmt.Sprintf("[%d] ", m.Seq))
if m.IsChatRoom && showChatRoom {
buf.WriteString("[")
if m.TalkerName != "" {
@@ -345,6 +348,9 @@ func (m *Message) PlainTextContent() string {
case MessageTypeText:
return m.Content
case MessageTypeImage:
if host, _ := m.Contents["host"].(string); host == "" {
return "[图片]"
}
keylist := make([]string, 0)
if m.Contents["md5"] != nil {
if md5, ok := m.Contents["md5"].(string); ok {
@@ -363,6 +369,9 @@ func (m *Message) PlainTextContent() string {
}
return fmt.Sprintf("![图片](http://%s/image/%s)", m.Contents["host"], strings.Join(keylist, ","))
case MessageTypeVoice:
if host, _ := m.Contents["host"].(string); host == "" {
return "[语音]"
}
if voice, ok := m.Contents["voice"]; ok {
return fmt.Sprintf("[语音](http://%s/voice/%s)", m.Contents["host"], voice)
}
@@ -370,6 +379,9 @@ func (m *Message) PlainTextContent() string {
case MessageTypeCard:
return "[名片]"
case MessageTypeVideo:
if host, _ := m.Contents["host"].(string); host == "" {
return "[视频]"
}
keylist := make([]string, 0)
if m.Contents["md5"] != nil {
if md5, ok := m.Contents["md5"].(string); ok {
@@ -411,6 +423,9 @@ func (m *Message) PlainTextContent() string {
case MessageSubTypeLink, MessageSubTypeLink2:
return fmt.Sprintf("[链接|%s](%s)", m.Contents["title"], m.Contents["url"])
case MessageSubTypeFile:
if host, _ := m.Contents["host"].(string); host == "" {
return fmt.Sprintf("[文件|%s]", m.Contents["title"])
}
return fmt.Sprintf("[文件|%s](http://%s/file/%s)", m.Contents["title"], m.Contents["host"], m.Contents["md5"])
case MessageSubTypeGIF:
return "[GIF表情]"
@@ -532,6 +547,7 @@ func (m *Message) PlainTextContent() string {
func (m *Message) CSV(host string) []string {
m.SetContent("host", host)
return []string{
fmt.Sprintf("%d", m.Seq),
m.Time.Format("2006-01-02 15:04:05"),
m.SenderName,
m.Sender,

View File

@@ -34,6 +34,7 @@ import (
// WCDB_CT_source INTEGER DEFAULT NULL
// )
type MessageV4 struct {
LocalID int64 `json:"local_id"` // 本地唯一 ID
SortSeq int64 `json:"sort_seq"` // 消息序号10位时间戳 + 3位序号
ServerID int64 `json:"server_id"` // 消息 ID用于关联 voice
LocalType int64 `json:"local_type"` // 消息类型
@@ -46,8 +47,10 @@ type MessageV4 struct {
func (m *MessageV4) Wrap(talker string) *Message {
uniqueID := (m.CreateTime * 1000000) + m.LocalID
_m := &Message{
Seq: m.SortSeq,
Seq: uniqueID,
ID: uniqueID,
Time: time.Unix(m.CreateTime, 0),
Talker: talker,
IsChatRoom: strings.HasSuffix(talker, "@chatroom"),

View File

@@ -15,6 +15,7 @@ type DataSource interface {
// 消息
GetMessages(ctx context.Context, startTime, endTime time.Time, talker string, sender string, keyword string, limit, offset int) ([]*model.Message, error)
GetMessage(ctx context.Context, talker string, seq int64) (*model.Message, error)
// 联系人
GetContacts(ctx context.Context, key string, limit, offset int) ([]*model.Contact, error)

View File

@@ -254,7 +254,7 @@ func (ds *DataSource) GetMessages(ctx context.Context, startTime, endTime time.T
log.Debug().Msgf("Start time: %d, End time: %d", startTime.Unix(), endTime.Unix())
query := fmt.Sprintf(`
SELECT m.sort_seq, m.server_id, m.local_type, n.user_name, m.create_time, m.message_content, m.packed_info_data, m.status
SELECT m.local_id, m.sort_seq, m.server_id, m.local_type, n.user_name, m.create_time, m.message_content, m.packed_info_data, m.status
FROM %s m
LEFT JOIN Name2Id n ON m.real_sender_id = n.rowid
WHERE %s
@@ -276,6 +276,7 @@ func (ds *DataSource) GetMessages(ctx context.Context, startTime, endTime time.T
for rows.Next() {
var msg model.MessageV4
err := rows.Scan(
&msg.LocalID,
&msg.SortSeq,
&msg.ServerID,
&msg.LocalType,
@@ -363,6 +364,63 @@ func (ds *DataSource) GetMessages(ctx context.Context, startTime, endTime time.T
return filteredMessages, nil
}
func (ds *DataSource) GetMessage(ctx context.Context, talker string, seq int64) (*model.Message, error) {
if talker == "" {
return nil, errors.ErrTalkerEmpty
}
// Seq = (create_time * 1000000) + local_id
createTime := seq / 1000000
localID := seq % 1000000
t := time.Unix(createTime, 0)
dbInfos := ds.getDBInfosForTimeRange(t, t.Add(time.Second))
if len(dbInfos) == 0 {
return nil, errors.TimeRangeNotFound(t, t.Add(time.Second))
}
_talkerMd5Bytes := md5.Sum([]byte(talker))
talkerMd5 := hex.EncodeToString(_talkerMd5Bytes[:])
tableName := "Msg_" + talkerMd5
for _, dbInfo := range dbInfos {
db, err := ds.dbm.OpenDB(dbInfo.FilePath)
if err != nil {
continue
}
query := fmt.Sprintf(`
SELECT m.local_id, m.sort_seq, m.server_id, m.local_type, n.user_name, m.create_time, m.message_content, m.packed_info_data, m.status
FROM %s m
LEFT JOIN Name2Id n ON m.real_sender_id = n.rowid
WHERE m.local_id = ?
`, tableName)
var msg model.MessageV4
err = db.QueryRowContext(ctx, query, localID).Scan(
&msg.LocalID,
&msg.SortSeq,
&msg.ServerID,
&msg.LocalType,
&msg.UserName,
&msg.CreateTime,
&msg.MessageContent,
&msg.PackedInfoData,
&msg.Status,
)
if err != nil {
if err == sql.ErrNoRows {
continue
}
return nil, errors.QueryFailed("", err)
}
return msg.Wrap(talker), nil
}
return nil, errors.ErrMessageNotFound
}
// 联系人
func (ds *DataSource) GetContacts(ctx context.Context, key string, limit, offset int) ([]*model.Contact, error) {
var query string

View File

@@ -28,6 +28,22 @@ func (r *Repository) GetMessages(ctx context.Context, startTime, endTime time.Ti
return messages, nil
}
// GetMessage 获取单条消息
func (r *Repository) GetMessage(ctx context.Context, talker string, seq int64) (*model.Message, error) {
// 如果传入的是昵称,尝试转换为 ID
if contact, _ := r.GetContact(ctx, talker); contact != nil {
talker = contact.UserName
}
msg, err := r.ds.GetMessage(ctx, talker, seq)
if err != nil {
return nil, err
}
r.enrichMessage(msg)
return msg, nil
}
// EnrichMessages 补充消息的额外信息
func (r *Repository) EnrichMessages(ctx context.Context, messages []*model.Message) error {
for _, msg := range messages {

View File

@@ -70,6 +70,10 @@ func (w *DB) GetMessages(start, end time.Time, talker string, sender string, key
return messages, nil
}
func (w *DB) GetMessage(talker string, seq int64) (*model.Message, error) {
return w.repo.GetMessage(context.Background(), talker, seq)
}
type GetContactsResp struct {
Items []*model.Contact `json:"items"`
}
@@ -87,6 +91,10 @@ func (w *DB) GetContacts(key string, limit, offset int) (*GetContactsResp, error
}, nil
}
func (w *DB) GetContact(key string) (*model.Contact, error) {
return w.repo.GetContact(context.Background(), key)
}
type GetChatRoomsResp struct {
Items []*model.ChatRoom `json:"items"`
}
@@ -104,6 +112,10 @@ func (w *DB) GetChatRooms(key string, limit, offset int) (*GetChatRoomsResp, err
}, nil
}
func (w *DB) GetChatRoom(key string) (*model.ChatRoom, error) {
return w.repo.GetChatRoom(context.Background(), key)
}
type GetSessionsResp struct {
Items []*model.Session `json:"items"`
}