feat: 增强 CipherTalk MCP 协作能力与流式支持

This commit is contained in:
ILoveBingLu
2026-04-03 21:13:06 +08:00
parent 961e232c11
commit 4a0efe8904
8 changed files with 883 additions and 27 deletions
+25 -7
View File
@@ -1,10 +1,19 @@
import { getMcpConfigSnapshot, getMcpHealthPayload, getMcpStatusPayload } from './runtime'
import { McpReadService } from './readService'
import type { McpToolName } from './types'
import type { McpStreamPartialPayloadMap, McpStreamProgressPayload, McpToolName } from './types'
const readService = new McpReadService()
export async function executeMcpTool(toolName: McpToolName, args: Record<string, unknown> = {}) {
type ExecuteStreamReporter = {
progress?: (payload: McpStreamProgressPayload) => void | Promise<void>
partial?: <K extends keyof McpStreamPartialPayloadMap>(toolName: K, payload: McpStreamPartialPayloadMap[K]) => void | Promise<void>
}
export async function executeMcpTool(
toolName: McpToolName,
args: Record<string, unknown> = {},
reporter?: ExecuteStreamReporter
) {
switch (toolName) {
case 'health_check': {
const payload = getMcpHealthPayload()
@@ -14,6 +23,15 @@ export async function executeMcpTool(toolName: McpToolName, args: Record<string,
const payload = getMcpStatusPayload()
return { summary: 'CipherTalk MCP status loaded.', payload }
}
case 'resolve_session': {
const payload = await readService.resolveSession(args as any, reporter)
return {
summary: payload.recommended
? `Resolved ${payload.query} to ${payload.recommended.displayName}.`
: `Found ${payload.candidates.length} candidates for ${payload.query}.`,
payload
}
}
case 'get_global_statistics': {
const payload = await readService.getGlobalStatistics(args as any)
return { summary: 'Loaded global statistics.', payload }
@@ -27,26 +45,26 @@ export async function executeMcpTool(toolName: McpToolName, args: Record<string,
return { summary: 'Loaded activity distribution.', payload }
}
case 'list_sessions': {
const payload = await readService.listSessions(args as any)
const payload = await readService.listSessions(args as any, reporter)
return { summary: `Loaded ${payload.items.length} sessions.`, payload }
}
case 'get_messages': {
const defaults = getMcpConfigSnapshot()
const payload = await readService.getMessages(args as any, defaults.mcpExposeMediaPaths)
const payload = await readService.getMessages(args as any, defaults.mcpExposeMediaPaths, reporter)
return { summary: `Loaded ${payload.items.length} messages.`, payload }
}
case 'list_contacts': {
const payload = await readService.listContacts(args as any)
const payload = await readService.listContacts(args as any, reporter)
return { summary: `Loaded ${payload.items.length} contacts.`, payload }
}
case 'search_messages': {
const defaults = getMcpConfigSnapshot()
const payload = await readService.searchMessages(args as any, defaults.mcpExposeMediaPaths)
const payload = await readService.searchMessages(args as any, defaults.mcpExposeMediaPaths, reporter)
return { summary: `Loaded ${payload.hits.length} message hits.`, payload }
}
case 'get_session_context': {
const defaults = getMcpConfigSnapshot()
const payload = await readService.getSessionContext(args as any, defaults.mcpExposeMediaPaths)
const payload = await readService.getSessionContext(args as any, defaults.mcpExposeMediaPaths, reporter)
return { summary: `Loaded ${payload.items.length} context messages.`, payload }
}
default:
+104 -4
View File
@@ -2,7 +2,7 @@ import * as http from 'http'
import type { Socket } from 'net'
import { McpToolError } from './result'
import { executeMcpTool } from './dispatcher'
import { MCP_TOOL_NAMES, type McpToolName } from './types'
import { MCP_TOOL_NAMES, type McpStreamEvent, type McpStreamPartialPayloadMap, type McpStreamProgressPayload, type McpToolName } from './types'
type ProxySettings = {
host: '127.0.0.1'
@@ -145,6 +145,11 @@ class McpProxyService {
res.end(JSON.stringify(payload))
}
private sendSse(res: http.ServerResponse, event: McpStreamEvent): void {
res.write(`event: ${event.event}\n`)
res.write(`data: ${JSON.stringify(event.data)}\n\n`)
}
private isAuthorized(req: http.IncomingMessage): boolean {
if (!this.settings.token) return true
@@ -194,7 +199,7 @@ class McpProxyService {
return
}
if (method !== 'POST' || !pathname.startsWith('/tool/')) {
if ((method !== 'POST' && method !== 'GET') || !pathname.startsWith('/tool/')) {
this.sendJson(res, 404, {
success: false,
error: {
@@ -206,7 +211,9 @@ class McpProxyService {
return
}
const toolName = pathname.slice('/tool/'.length) as McpToolName
const isStreamRequest = pathname.endsWith('/stream')
const toolPath = isStreamRequest ? pathname.slice(0, -'/stream'.length) : pathname
const toolName = toolPath.slice('/tool/'.length) as McpToolName
if (!MCP_TOOL_NAMES.includes(toolName)) {
this.sendJson(res, 404, {
success: false,
@@ -220,8 +227,12 @@ class McpProxyService {
}
try {
const body = await this.readJson(req)
const body = method === 'POST' ? await this.readJson(req) : {}
const args = body.args && typeof body.args === 'object' ? body.args as Record<string, unknown> : {}
if (isStreamRequest) {
await this.handleStreamRequest(toolName, args, requestId, res)
return
}
const startedAt = Date.now()
const result = await executeMcpTool(toolName, args)
this.logger?.info('McpProxy', '内部 MCP 代理查询成功', {
@@ -254,6 +265,95 @@ class McpProxyService {
})
}
}
private async handleStreamRequest(
toolName: McpToolName,
args: Record<string, unknown>,
requestId: string,
res: http.ServerResponse
): Promise<void> {
const startedAt = Date.now()
let chunkIndex = 0
res.writeHead(200, {
'Content-Type': 'text/event-stream; charset=utf-8',
'Cache-Control': 'no-store',
Connection: 'keep-alive'
})
this.sendSse(res, {
event: 'meta',
data: {
toolName,
requestId,
startedAt
}
})
try {
const result = await executeMcpTool(toolName, args, {
progress: async (payload: McpStreamProgressPayload) => {
this.sendSse(res, {
event: 'progress',
data: payload
})
},
partial: async <K extends keyof McpStreamPartialPayloadMap>(partialToolName: K, payload: McpStreamPartialPayloadMap[K]) => {
chunkIndex += 1
this.sendSse(res, {
event: 'partial',
data: {
toolName: partialToolName,
chunkIndex,
payload
}
})
}
})
this.sendSse(res, {
event: 'progress',
data: {
stage: 'completed',
message: `Completed ${toolName}.`
}
})
this.sendSse(res, {
event: 'complete',
data: {
toolName,
summary: result.summary,
payload: result.payload,
completedAt: Date.now()
}
})
res.end()
} catch (error) {
const payload = error instanceof McpToolError
? error.toShape()
: {
code: 'INTERNAL_ERROR' as const,
message: String(error)
}
this.sendSse(res, {
event: 'progress',
data: {
stage: 'failed',
message: `Failed ${toolName}.`
}
})
this.sendSse(res, {
event: 'error',
data: {
...payload,
toolName,
failedAt: Date.now()
}
})
res.end()
}
}
}
export const mcpProxyService = new McpProxyService()
+321 -13
View File
@@ -24,8 +24,12 @@ import {
type McpMessageMatchField,
type McpSearchMatchMode,
type McpMessagesPayload,
type McpStreamPartialPayloadMap,
type McpStreamProgressPayload,
type McpSearchHit,
type McpSearchMessagesPayload,
type McpResolveSessionPayload,
type McpResolvedSessionCandidate,
type McpSessionContextPayload,
type McpSessionItem,
type McpSessionKind,
@@ -48,6 +52,11 @@ const listSessionsArgsSchema = z.object({
unreadOnly: z.boolean().optional()
})
const resolveSessionArgsSchema = z.object({
query: z.string().trim().min(1),
limit: z.number().int().positive().optional()
})
const getMessagesArgsSchema = z.object({
sessionId: z.string().trim().min(1),
offset: z.number().int().nonnegative().optional(),
@@ -116,6 +125,7 @@ const getSessionContextArgsSchema = z.object({
})
type ListSessionsArgs = z.infer<typeof listSessionsArgsSchema>
type ResolveSessionArgs = z.infer<typeof resolveSessionArgsSchema>
type GetMessagesArgs = z.infer<typeof getMessagesArgsSchema>
type ListContactsArgs = z.infer<typeof listContactsArgsSchema>
type SearchMessagesArgs = z.infer<typeof searchMessagesArgsSchema>
@@ -137,6 +147,7 @@ type McpSessionLookupEntry = {
session: McpSessionRef
aliases: string[]
}
type ScoredSessionCandidate = { entry: McpSessionLookupEntry; score: number }
type SearchRawHit = {
session: McpSessionRef
message: Message
@@ -144,6 +155,10 @@ type SearchRawHit = {
excerpt: string
score: number
}
type McpStreamReporter = {
progress?: (payload: McpStreamProgressPayload) => void | Promise<void>
partial?: <K extends keyof McpStreamPartialPayloadMap>(toolName: K, payload: McpStreamPartialPayloadMap[K]) => void | Promise<void>
}
function toTimestampMs(value?: number | null): number {
if (!value || !Number.isFinite(value) || value <= 0) return 0
@@ -471,6 +486,18 @@ function formatSessionCandidateHint(rawInput: string, candidates: McpSessionLook
return `${rawInput}”匹配到多个候选,请改用更具体的信息重试:\n${preview}`
}
async function reportProgress(reporter: McpStreamReporter | undefined, payload: McpStreamProgressPayload): Promise<void> {
await reporter?.progress?.(payload)
}
async function reportPartial<K extends keyof McpStreamPartialPayloadMap>(
reporter: McpStreamReporter | undefined,
toolName: K,
payload: McpStreamPartialPayloadMap[K]
): Promise<void> {
await reporter?.partial?.(toolName, payload)
}
async function getContactCatalog(): Promise<{ items: McpContactRef[]; map: Map<string, McpContactRef> }> {
const result = await chatService.getContacts()
if (!result.success) {
@@ -499,9 +526,11 @@ function tryResolveContactRef(
const exact = contactMap.get(normalized)
if (exact) return exact
const partialMatches = Array.from(new Set(contactMap.values().filter((contact) =>
buildContactSearchKeys(contact).some((value) => normalizeQuery(value).includes(normalized))
)))
const partialMatches = Array.from(new Set(
Array.from(contactMap.values()).filter((contact) =>
buildContactSearchKeys(contact).some((value) => normalizeQuery(value).includes(normalized))
)
)) as McpContactRef[]
return partialMatches.length === 1 ? partialMatches[0] : null
}
@@ -510,7 +539,7 @@ function findSessionCandidates(
rawInput: string,
sessions: McpSessionItem[],
contacts: McpContactRef[]
): Array<{ entry: McpSessionLookupEntry; score: number }> {
): ScoredSessionCandidate[] {
const query = normalizeQuery(rawInput)
if (!query) return []
@@ -523,6 +552,85 @@ function findSessionCandidates(
.sort((a, b) => b.score - a.score || a.entry.session.displayName.localeCompare(b.entry.session.displayName, 'zh-CN'))
}
function toCandidateConfidence(score: number): 'high' | 'medium' | 'low' {
if (score >= 1000 || score >= 820) return 'high'
if (score >= 640) return 'medium'
return 'low'
}
function buildCandidateEvidence(candidate: ScoredSessionCandidate, query: string): string[] {
const normalizedQuery = normalizeQuery(query)
const evidence: string[] = []
for (const alias of candidate.entry.aliases) {
const normalizedAlias = normalizeQuery(alias)
if (!normalizedAlias) continue
if (normalizedAlias === normalizedQuery) {
evidence.push(`Exact alias match: ${alias}`)
} else if (normalizedAlias.startsWith(normalizedQuery)) {
evidence.push(`Prefix alias match: ${alias}`)
} else if (normalizedAlias.includes(normalizedQuery)) {
evidence.push(`Fuzzy alias match: ${alias}`)
} else if (isSubsequence(normalizedQuery, normalizedAlias)) {
evidence.push(`Subsequence alias match: ${alias}`)
}
if (evidence.length >= 3) break
}
if (candidate.score >= 1000) {
evidence.push('High-confidence score from exact resolution.')
} else if (candidate.score >= 820) {
evidence.push('High-confidence score from strong fuzzy match.')
} else if (candidate.score >= 640) {
evidence.push('Medium-confidence score from partial fuzzy match.')
}
return uniqueStrings(evidence).slice(0, 4)
}
function toResolvedCandidate(candidate: ScoredSessionCandidate, query: string): McpResolvedSessionCandidate {
return {
...candidate.entry.session,
score: candidate.score,
confidence: toCandidateConfidence(candidate.score),
aliases: candidate.entry.aliases,
evidence: buildCandidateEvidence(candidate, query)
}
}
function buildSearchSessionSummaries(hits: McpSearchHit[]): McpSearchMessagesPayload['sessionSummaries'] {
const grouped = new Map<string, {
session: McpSessionRef
hitCount: number
topScore: number
sampleExcerpts: string[]
}>()
for (const hit of hits) {
const key = hit.session.sessionId
const existing = grouped.get(key)
if (!existing) {
grouped.set(key, {
session: hit.session,
hitCount: 1,
topScore: hit.score,
sampleExcerpts: hit.excerpt ? [hit.excerpt] : []
})
continue
}
existing.hitCount += 1
existing.topScore = Math.max(existing.topScore, hit.score)
if (hit.excerpt && existing.sampleExcerpts.length < 2 && !existing.sampleExcerpts.includes(hit.excerpt)) {
existing.sampleExcerpts.push(hit.excerpt)
}
}
return Array.from(grouped.values())
.sort((a, b) => b.hitCount - a.hitCount || b.topScore - a.topScore)
}
function resolveSessionRefStrict(
rawInput: string,
sessions: McpSessionItem[],
@@ -551,6 +659,49 @@ function resolveSessionRefStrict(
))
}
async function resolveSessionRefStrictWithProgress(
rawInput: string,
sessions: McpSessionItem[],
sessionMap: Map<string, McpSessionRef>,
contacts: McpContactRef[],
contactMap: Map<string, McpContactRef>,
reporter?: McpStreamReporter
): Promise<McpSessionRef> {
await reportProgress(reporter, {
stage: 'resolving_input',
message: `Resolving session reference from "${rawInput}".`
})
await reportProgress(reporter, {
stage: 'searching_contacts',
message: 'Searching contacts and aliases.'
})
await reportProgress(reporter, {
stage: 'searching_sessions',
message: 'Searching sessions and recent conversation entries.'
})
const direct = resolveSessionRef(rawInput, sessionMap, contactMap)
if (sessionMap.has(direct.sessionId)) {
await reportProgress(reporter, {
stage: 'resolving_candidates',
message: `Resolved to ${direct.displayName}.`,
candidates: [direct],
candidateCount: 1
})
return direct
}
const candidates = findSessionCandidates(rawInput, sessions, contacts)
await reportProgress(reporter, {
stage: 'resolving_candidates',
message: candidates.length > 0 ? `Found ${candidates.length} candidate sessions.` : 'No candidate sessions found.',
candidates: candidates.slice(0, 5).map((item) => item.entry.session),
candidateCount: candidates.length
})
return resolveSessionRefStrict(rawInput, sessions, sessionMap, contacts, contactMap)
}
function resolveSessionRef(
rawSessionId: string,
sessionMap: Map<string, McpSessionRef>,
@@ -845,7 +996,71 @@ function messageMatchesFilters(
}
export class McpReadService {
async listSessions(rawArgs: ListSessionsArgs): Promise<McpSessionsPayload> {
async resolveSession(rawArgs: ResolveSessionArgs, reporter?: McpStreamReporter): Promise<McpResolveSessionPayload> {
const args = resolveSessionArgsSchema.safeParse(rawArgs)
if (!args.success) {
throw new McpToolError('BAD_REQUEST', 'Invalid resolve_session arguments.', args.error.message)
}
const limit = Math.min(args.data.limit ?? 5, 10)
const [{ items: sessions, map: sessionMap }, { items: contacts, map: contactMap }] = await Promise.all([
getSessionCatalog(),
getContactCatalog()
])
await reportProgress(reporter, {
stage: 'resolving_input',
message: `Resolving session candidates for "${args.data.query}".`
})
const direct = resolveSessionRef(args.data.query, sessionMap, contactMap)
let candidates = findSessionCandidates(args.data.query, sessions, contacts)
if (sessionMap.has(direct.sessionId) && !candidates.some((item) => item.entry.session.sessionId === direct.sessionId)) {
const directEntry = buildSessionLookupEntries(sessions, contacts).find((entry) => entry.session.sessionId === direct.sessionId)
if (directEntry) {
candidates = [{ entry: directEntry, score: 1000 }, ...candidates]
}
}
const dedupedCandidates = Array.from(new Map(
candidates.map((candidate) => [candidate.entry.session.sessionId, candidate])
).values()).slice(0, limit)
await reportProgress(reporter, {
stage: 'resolving_candidates',
message: dedupedCandidates.length > 0 ? `Found ${dedupedCandidates.length} session candidates.` : 'No session candidates found.',
candidates: dedupedCandidates.map((candidate) => candidate.entry.session),
candidateCount: dedupedCandidates.length
})
const recommended = dedupedCandidates[0] ? toResolvedCandidate(dedupedCandidates[0], args.data.query) : undefined
const exact = Boolean(recommended && recommended.score >= 1000)
const resolved = Boolean(recommended && (dedupedCandidates.length === 1 || recommended.confidence === 'high'))
const payload: McpResolveSessionPayload = {
query: args.data.query,
resolved,
exact,
recommended,
candidates: dedupedCandidates.map((candidate) => toResolvedCandidate(candidate, args.data.query)),
suggestedNextAction: resolved
? 'get_session_context'
: dedupedCandidates.length > 0
? 'search_messages'
: 'list_contacts',
message: resolved
? `Resolved "${args.data.query}" to ${recommended?.displayName}.`
: dedupedCandidates.length > 0
? `Found ${dedupedCandidates.length} plausible session candidates for "${args.data.query}".`
: `No session candidates found for "${args.data.query}".`
}
await reportPartial(reporter, 'resolve_session', payload)
return payload
}
async listSessions(rawArgs: ListSessionsArgs, reporter?: McpStreamReporter): Promise<McpSessionsPayload> {
const args = listSessionsArgsSchema.safeParse(rawArgs)
if (!args.success) {
throw new McpToolError('BAD_REQUEST', 'Invalid list_sessions arguments.', args.error.message)
@@ -855,6 +1070,10 @@ export class McpReadService {
const offset = Math.max(0, args.data.offset ?? 0)
const limit = Math.min(args.data.limit ?? 100, MAX_LIST_LIMIT)
const unreadOnly = Boolean(args.data.unreadOnly)
await reportProgress(reporter, {
stage: 'searching_sessions',
message: query ? `Searching sessions for "${args.data.q}".` : 'Listing sessions.'
})
const [{ items: sessionItems }, { map: contactMap }] = await Promise.all([
getSessionCatalog(),
@@ -887,6 +1106,13 @@ export class McpReadService {
const total = sessions.length
const items = sessions.slice(offset, offset + limit)
await reportPartial(reporter, 'list_sessions', {
items,
total,
offset,
limit,
hasMore: offset + items.length < total
})
return {
items,
@@ -897,7 +1123,7 @@ export class McpReadService {
}
}
async listContacts(rawArgs: ListContactsArgs): Promise<McpContactsPayload> {
async listContacts(rawArgs: ListContactsArgs, reporter?: McpStreamReporter): Promise<McpContactsPayload> {
const args = listContactsArgsSchema.safeParse(rawArgs)
if (!args.success) {
throw new McpToolError('BAD_REQUEST', 'Invalid list_contacts arguments.', args.error.message)
@@ -907,6 +1133,10 @@ export class McpReadService {
const offset = Math.max(0, args.data.offset ?? 0)
const limit = Math.min(args.data.limit ?? 100, MAX_LIST_LIMIT)
const typeSet = args.data.types?.length ? new Set(args.data.types) : null
await reportProgress(reporter, {
stage: 'searching_contacts',
message: query ? `Searching contacts for "${args.data.q}".` : 'Listing contacts.'
})
const result = await chatService.getContacts()
if (!result.success) {
@@ -936,6 +1166,13 @@ export class McpReadService {
const total = contacts.length
const items = contacts.slice(offset, offset + limit)
await reportPartial(reporter, 'list_contacts', {
items,
total,
offset,
limit,
hasMore: offset + items.length < total
})
return {
items,
@@ -1012,7 +1249,7 @@ export class McpReadService {
}
}
async getMessages(rawArgs: GetMessagesArgs, defaultIncludeMediaPaths: boolean): Promise<McpMessagesPayload> {
async getMessages(rawArgs: GetMessagesArgs, defaultIncludeMediaPaths: boolean, reporter?: McpStreamReporter): Promise<McpMessagesPayload> {
const args = getMessagesArgsSchema.safeParse(rawArgs)
if (!args.success) {
throw new McpToolError('BAD_REQUEST', 'Invalid get_messages arguments.', args.error.message)
@@ -1035,8 +1272,14 @@ export class McpReadService {
getSessionCatalog(),
getContactCatalog()
])
const session = resolveSessionRefStrict(rawSessionId, sessions, sessionMap, contacts, contactMap)
const session = await resolveSessionRefStrictWithProgress(rawSessionId, sessions, sessionMap, contacts, contactMap, reporter)
const sessionId = session.sessionId
await reportProgress(reporter, {
stage: 'scanning_messages',
message: `Scanning messages in ${session.displayName}.`,
sessionsScanned: 1,
messagesScanned: 0
})
const matched: Message[] = []
let scanOffset = 0
@@ -1064,6 +1307,12 @@ export class McpReadService {
scanOffset += part.length
scanned += part.length
await reportProgress(reporter, {
stage: 'scanning_messages',
message: `Scanned ${scanned} messages in ${session.displayName}.`,
sessionsScanned: 1,
messagesScanned: scanned
})
if (!result.hasMore) {
reachedEnd = true
@@ -1075,6 +1324,12 @@ export class McpReadService {
const page = matched.slice(offset, offset + limit)
const items = await normalizeMessages(sessionId, page, { includeMediaPaths, includeRaw })
await reportPartial(reporter, 'get_messages', {
items,
offset,
limit,
hasMore: reachedEnd ? matched.length > offset + items.length : true
})
return {
items,
@@ -1084,7 +1339,7 @@ export class McpReadService {
}
}
async searchMessages(rawArgs: SearchMessagesArgs, defaultIncludeMediaPaths: boolean): Promise<McpSearchMessagesPayload> {
async searchMessages(rawArgs: SearchMessagesArgs, defaultIncludeMediaPaths: boolean, reporter?: McpStreamReporter): Promise<McpSearchMessagesPayload> {
const args = searchMessagesArgsSchema.safeParse(rawArgs)
if (!args.success) {
throw new McpToolError('BAD_REQUEST', 'Invalid search_messages arguments.', args.error.message)
@@ -1108,7 +1363,7 @@ export class McpReadService {
}
const targetSessions = sessionIdCandidates.length > 0
? sessionIdCandidates.map((sessionId) => resolveSessionRefStrict(sessionId, sessions, sessionMap, contacts, contactMap))
? await Promise.all(sessionIdCandidates.map((sessionId) => resolveSessionRefStrictWithProgress(sessionId, sessions, sessionMap, contacts, contactMap, reporter)))
: sessions.map((session) => ({
sessionId: session.sessionId,
displayName: session.displayName,
@@ -1126,6 +1381,12 @@ export class McpReadService {
let truncated = false
let bestScore = Number.NEGATIVE_INFINITY
let hitTargetReached = false
await reportProgress(reporter, {
stage: 'scanning_messages',
message: `Searching ${targetSessions.length} sessions for "${args.data.query}".`,
sessionsScanned: 0,
messagesScanned: 0
})
for (const session of targetSessions) {
sessionsScanned += 1
@@ -1159,6 +1420,12 @@ export class McpReadService {
sessionOffset += part.length
sessionScanned += part.length
messagesScanned += part.length
await reportProgress(reporter, {
stage: 'scanning_messages',
message: `Scanned ${messagesScanned} messages across ${sessionsScanned} sessions.`,
sessionsScanned,
messagesScanned
})
for (const message of part) {
if (!messageMatchesFilters(message, {
@@ -1187,6 +1454,12 @@ export class McpReadService {
if (rawHits.length >= limit * 3) {
hitTargetReached = true
await reportProgress(reporter, {
stage: 'streaming_hits',
message: `Collected ${rawHits.length} candidate hits.`,
sessionsScanned,
messagesScanned
})
if (roundBestScore === Number.NEGATIVE_INFINITY || roundBestScore <= bestScoreBeforeBatch) {
truncated = true
break
@@ -1218,17 +1491,27 @@ export class McpReadService {
matchedField: hit.matchedField,
score: hit.score
})))
const sessionSummaries = buildSearchSessionSummaries(hits)
await reportPartial(reporter, 'search_messages', {
hits,
limit,
sessionsScanned,
messagesScanned,
truncated,
sessionSummaries
})
return {
hits,
limit,
sessionsScanned,
messagesScanned,
truncated
truncated,
sessionSummaries
}
}
async getSessionContext(rawArgs: GetSessionContextArgs, defaultIncludeMediaPaths: boolean): Promise<McpSessionContextPayload> {
async getSessionContext(rawArgs: GetSessionContextArgs, defaultIncludeMediaPaths: boolean, reporter?: McpStreamReporter): Promise<McpSessionContextPayload> {
const args = getSessionContextArgsSchema.safeParse(rawArgs)
if (!args.success) {
throw new McpToolError('BAD_REQUEST', 'Invalid get_session_context arguments.', args.error.message)
@@ -1238,13 +1521,18 @@ export class McpReadService {
getSessionCatalog(),
getContactCatalog()
])
const session = resolveSessionRefStrict(args.data.sessionId, sessions, sessionMap, contacts, contactMap)
const session = await resolveSessionRefStrictWithProgress(args.data.sessionId, sessions, sessionMap, contacts, contactMap, reporter)
const resolvedSessionId = session.sessionId
const includeRaw = args.data.includeRaw ?? false
const includeMediaPaths = args.data.includeMediaPaths ?? defaultIncludeMediaPaths
if (args.data.mode === 'latest') {
const latestLimit = Math.min(args.data.beforeLimit ?? 30, MAX_CONTEXT_LIMIT)
await reportProgress(reporter, {
stage: 'scanning_messages',
message: `Loading latest context for ${session.displayName}.`,
sessionsScanned: 1
})
const result = await chatService.getMessages(resolvedSessionId, 0, latestLimit)
if (!result.success) {
mapChatError(result.error)
@@ -1254,6 +1542,13 @@ export class McpReadService {
includeMediaPaths,
includeRaw
})
await reportPartial(reporter, 'get_session_context', {
session,
mode: 'latest',
items: messages,
hasMoreBefore: Boolean(result.hasMore),
hasMoreAfter: false
})
return {
session,
@@ -1267,6 +1562,11 @@ export class McpReadService {
const anchorCursor = args.data.anchorCursor!
const beforeLimit = Math.min(args.data.beforeLimit ?? 20, MAX_CONTEXT_LIMIT)
const afterLimit = Math.min(args.data.afterLimit ?? 20, MAX_CONTEXT_LIMIT)
await reportProgress(reporter, {
stage: 'scanning_messages',
message: `Loading context around anchor in ${session.displayName}.`,
sessionsScanned: 1
})
const [beforeResult, anchorResult, afterResult] = await Promise.all([
chatService.getMessagesBefore(
@@ -1315,6 +1615,14 @@ export class McpReadService {
includeRaw
})
])
await reportPartial(reporter, 'get_session_context', {
session,
mode: 'around',
anchor: anchorItem,
items: [...beforeItems, anchorItem, ...afterItems],
hasMoreBefore: Boolean(beforeResult.hasMore),
hasMoreAfter: Boolean(afterResult.hasMore)
})
return {
session,
+112
View File
@@ -10,6 +10,8 @@ import type {
McpGlobalStatisticsPayload,
McpHealthPayload,
McpMessagesPayload,
McpResolveSessionPayload,
McpStreamEvent,
McpSearchMessagesPayload,
McpSessionContextPayload,
McpSessionsPayload,
@@ -36,10 +38,29 @@ type ProxyEnvelopeError = {
}
}
type StreamToolOptions = {
signal?: AbortSignal
onEvent?: (event: McpStreamEvent) => void
}
function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms))
}
function findSseDelimiterIndex(buffer: string): { index: number; length: number } | null {
const crlfIndex = buffer.indexOf('\r\n\r\n')
if (crlfIndex >= 0) {
return { index: crlfIndex, length: 4 }
}
const lfIndex = buffer.indexOf('\n\n')
if (lfIndex >= 0) {
return { index: lfIndex, length: 2 }
}
return null
}
function getSpawnEnv(): NodeJS.ProcessEnv {
const env = { ...process.env }
delete env.ELECTRON_RUN_AS_NODE
@@ -153,6 +174,82 @@ export class McpReadService {
return payload.data
}
async streamTool(
toolName: McpToolName,
args: Record<string, unknown> = {},
options: StreamToolOptions = {}
): Promise<unknown> {
const proxyConfig = await this.ensureProxyReady(toolName !== 'health_check')
const response = await fetch(`${proxyConfig.url}/tool/${toolName}/stream`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream',
...(proxyConfig.token ? { Authorization: `Bearer ${proxyConfig.token}` } : {})
},
body: JSON.stringify({ args }),
signal: options.signal
})
if (!response.ok || !response.body) {
return this.callProxy(toolName, args)
}
const decoder = new TextDecoder()
const reader = response.body.getReader()
let buffer = ''
let finalPayload: unknown
const flushEvent = async (rawBlock: string) => {
const lines = rawBlock.split(/\r?\n/)
let eventName = 'message'
const dataLines: string[] = []
for (const line of lines) {
if (line.startsWith('event:')) {
eventName = line.slice('event:'.length).trim()
} else if (line.startsWith('data:')) {
dataLines.push(line.slice('data:'.length).trim())
}
}
if (dataLines.length === 0) return
const parsed = JSON.parse(dataLines.join('\n')) as McpStreamEvent['data']
const event = { event: eventName, data: parsed } as McpStreamEvent
options.onEvent?.(event)
if (event.event === 'error') {
throw new McpToolError(event.data.code, event.data.message, event.data.hint)
}
if (event.event === 'complete') {
finalPayload = event.data.payload
}
}
while (true) {
const { value, done } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
let delimiter = findSseDelimiterIndex(buffer)
while (delimiter) {
const block = buffer.slice(0, delimiter.index).trim()
buffer = buffer.slice(delimiter.index + delimiter.length)
if (block) {
await flushEvent(block)
}
delimiter = findSseDelimiterIndex(buffer)
}
}
if (buffer.trim()) {
await flushEvent(buffer.trim())
}
return finalPayload
}
async healthCheck(): Promise<McpHealthPayload> {
const proxyConfig = await this.ensureProxyReady(false)
const response = await fetch(`${proxyConfig.url}/status`, {
@@ -169,6 +266,10 @@ export class McpReadService {
return this.callProxy<McpStatusPayload>('get_status')
}
async resolveSession(rawArgs: Record<string, unknown>): Promise<McpResolveSessionPayload> {
return this.callProxy<McpResolveSessionPayload>('resolve_session', rawArgs)
}
async listSessions(rawArgs: Record<string, unknown>): Promise<McpSessionsPayload> {
return this.callProxy<McpSessionsPayload>('list_sessions', rawArgs)
}
@@ -209,4 +310,15 @@ export class McpReadService {
includeMediaPaths: rawArgs.includeMediaPaths ?? defaultIncludeMediaPaths
})
}
async streamSearchMessages(
rawArgs: Record<string, unknown>,
defaultIncludeMediaPaths: boolean,
options: StreamToolOptions = {}
): Promise<McpSearchMessagesPayload> {
return this.streamTool('search_messages', {
...rawArgs,
includeMediaPaths: rawArgs.includeMediaPaths ?? defaultIncludeMediaPaths
}, options) as Promise<McpSearchMessagesPayload>
}
}
+19 -3
View File
@@ -31,6 +31,22 @@ export function registerCipherTalkMcpTools(server: any) {
}
})
server.registerTool('resolve_session', {
title: 'Resolve Session',
description: 'Resolve a fuzzy person/session clue into the most likely chat session, returning candidates, confidence, and recommended next action.',
inputSchema: {
query: z.string().trim().min(1).describe('Fuzzy person or session clue. Can be a partial name, nickname, remark fragment, institution fragment, or sessionId.'),
limit: z.number().int().positive().optional().describe('Maximum number of candidates to return.')
}
}, async (args: unknown) => {
try {
const payload = await readService.resolveSession((args || {}) as any)
return createToolSuccess(payload.message, payload)
} catch (error) {
return createToolError(error)
}
})
server.registerTool('get_global_statistics', {
title: 'Get Global Statistics',
description: 'Return global private-chat statistics for agent-side analysis.',
@@ -82,7 +98,7 @@ export function registerCipherTalkMcpTools(server: any) {
server.registerTool('list_sessions', {
title: 'List Sessions',
description: 'List chat sessions with search and pagination.',
description: 'List chat sessions with search and pagination. Use as a fuzzy discovery entry point when the user only remembers part of a name, remark, institution, or recent clue.',
inputSchema: {
q: z.string().optional().describe('Optional search keyword.'),
offset: z.number().int().nonnegative().optional().describe('Pagination offset.'),
@@ -124,7 +140,7 @@ export function registerCipherTalkMcpTools(server: any) {
server.registerTool('list_contacts', {
title: 'List Contacts',
description: 'List contacts, groups, and official accounts for agent-side resolution.',
description: 'List contacts, groups, and official accounts for agent-side resolution. Use as a broad fuzzy lookup entry point before guessing a specific sessionId.',
inputSchema: {
q: z.string().optional().describe('Optional search keyword.'),
offset: z.number().int().nonnegative().optional().describe('Pagination offset.'),
@@ -142,7 +158,7 @@ export function registerCipherTalkMcpTools(server: any) {
server.registerTool('search_messages', {
title: 'Search Messages',
description: 'Search messages across one or more sessions and return agent-friendly hits.',
description: 'Search messages across one or more sessions and return agent-friendly hits. Use for broad clue hunting when the target session or keyword is still uncertain.',
inputSchema: {
query: z.string().trim().min(1).describe('Required full-text query.'),
sessionId: z.string().trim().min(1).optional().describe('Single session identifier to search. Accepts sessionId, contactId, display name, remark, or nickname when uniquely resolvable.'),
+110
View File
@@ -1,6 +1,7 @@
export const MCP_TOOL_NAMES = [
'health_check',
'get_status',
'resolve_session',
'list_sessions',
'get_messages',
'list_contacts',
@@ -49,6 +50,16 @@ export type McpToolName = (typeof MCP_TOOL_NAMES)[number]
export type McpContactKind = (typeof MCP_CONTACT_KINDS)[number]
export type McpMessageKind = (typeof MCP_MESSAGE_KINDS)[number]
export type McpSearchMatchMode = 'substring' | 'exact'
export type McpStreamEventType = 'meta' | 'progress' | 'partial' | 'complete' | 'error'
export type McpStreamProgressStage =
| 'resolving_input'
| 'searching_contacts'
| 'searching_sessions'
| 'resolving_candidates'
| 'scanning_messages'
| 'streaming_hits'
| 'completed'
| 'failed'
export type McpLaunchMode = 'dev' | 'packaged'
export type McpLauncherMode = 'dev-runner' | 'packaged-launcher' | 'direct'
@@ -122,6 +133,23 @@ export interface McpSessionsPayload {
hasMore: boolean
}
export interface McpResolvedSessionCandidate extends McpSessionRef {
score: number
confidence: 'high' | 'medium' | 'low'
aliases: string[]
evidence: string[]
}
export interface McpResolveSessionPayload {
query: string
resolved: boolean
exact: boolean
recommended?: McpResolvedSessionCandidate
candidates: McpResolvedSessionCandidate[]
suggestedNextAction: 'get_messages' | 'get_session_context' | 'search_messages' | 'list_contacts' | 'list_sessions'
message: string
}
export interface McpContactItem {
contactId: string
sessionId?: string
@@ -196,6 +224,12 @@ export interface McpSearchMessagesPayload {
sessionsScanned: number
messagesScanned: number
truncated: boolean
sessionSummaries?: Array<{
session: McpSessionRef
hitCount: number
topScore: number
sampleExcerpts: string[]
}>
}
export interface McpTimeRange {
@@ -256,3 +290,79 @@ export interface McpSessionContextPayload {
hasMoreBefore: boolean
hasMoreAfter: boolean
}
export interface McpStreamMetaPayload {
toolName: McpToolName
requestId?: string
startedAt: number
}
export interface McpStreamProgressPayload {
stage: McpStreamProgressStage
message?: string
sessionsScanned?: number
messagesScanned?: number
candidates?: Array<Pick<McpSessionRef, 'sessionId' | 'displayName' | 'kind'>>
candidateCount?: number
truncated?: boolean
}
export interface McpStreamPartialPayloadMap {
resolve_session: Partial<McpResolveSessionPayload>
list_sessions: Partial<McpSessionsPayload>
list_contacts: Partial<McpContactsPayload>
get_messages: Partial<McpMessagesPayload>
search_messages: Partial<McpSearchMessagesPayload>
get_session_context: Partial<McpSessionContextPayload>
}
export type McpStreamPartialPayload =
| McpStreamPartialPayloadMap['list_sessions']
| McpStreamPartialPayloadMap['list_contacts']
| McpStreamPartialPayloadMap['get_messages']
| McpStreamPartialPayloadMap['search_messages']
| McpStreamPartialPayloadMap['get_session_context']
export interface McpStreamMetaEvent {
event: 'meta'
data: McpStreamMetaPayload
}
export interface McpStreamProgressEvent {
event: 'progress'
data: McpStreamProgressPayload
}
export interface McpStreamPartialEvent {
event: 'partial'
data: {
toolName: McpToolName
chunkIndex: number
payload: McpStreamPartialPayload
}
}
export interface McpStreamCompleteEvent {
event: 'complete'
data: {
toolName: McpToolName
summary: string
payload: unknown
completedAt: number
}
}
export interface McpStreamErrorEvent {
event: 'error'
data: McpErrorShape & {
toolName: McpToolName
failedAt: number
}
}
export type McpStreamEvent =
| McpStreamMetaEvent
| McpStreamProgressEvent
| McpStreamPartialEvent
| McpStreamCompleteEvent
| McpStreamErrorEvent
+79
View File
@@ -0,0 +1,79 @@
---
name: ct-mcp-copilot
description: Use CipherTalk MCP as an AI copilot for contact lookup, session resolution, message search, context retrieval, and chat analytics. Trigger when the user provides partial, fuzzy, mistaken, or incomplete clues such as nicknames, remarks, organization fragments, typo-prone names, or half-remembered keywords, or wants the AI to proactively dig for more data instead of stopping after one failed query.
---
# ct-mcp-copilot
Use CipherTalk MCP like a patient investigator, not like a rigid database client.
## Core behavior
1. Start broad, then narrow.
2. Treat `list_contacts` and `list_sessions` as fuzzy entry points.
3. Assume the user may remember only part of the truth.
4. Do not stop after the first miss.
5. When multiple candidates exist, compare them and keep shrinking the set.
## Default routing
1. If the user describes a person loosely, start with both `list_contacts` and `list_sessions`.
2. When the clue is especially fuzzy or typo-prone, prefer `resolve_session` first to get candidates, confidence, and the recommended next action.
3. If the target is still unclear, compare remark, nickname, display name, recent timestamp, and session kind.
4. Once one session becomes the best candidate, switch to `get_messages` or `get_session_context`.
5. If the user wants more clues or the session is still uncertain, use `search_messages` across multiple sessions or globally.
6. Use analytics tools only after the target scope is reasonably stable.
## Fuzzy clue strategy
When the user gives weak clues such as a nickname fragment, an organization fragment, a possibly mistyped name, or a half-remembered phrase:
- Search contacts and sessions in parallel.
- Use fragment matches, nickname matches, remark matches, and organization-name matches.
- Prefer candidates with recent activity when the user implies recency.
- If a keyword is uncertain, search globally before concluding there is no evidence.
- If one query misses, reformulate the clue and try another route.
## Candidate handling
When there are multiple plausible candidates:
- Do not pretend the result is unique.
- Read `resolve_session.candidates[*].evidence` before choosing.
- Compare the top candidates using recent message preview, session kind, and contact aliases.
- Explain which candidate is currently strongest and why.
- If needed, inspect each candidates latest context before answering.
When `resolve_session` returns a recommendation:
- Treat `recommended.confidence` as a hint, not a blind verdict.
- Use `recommended.evidence` to explain why this candidate is strongest.
- If confidence is only `medium` or `low`, verify with `get_session_context` or `search_messages` before committing.
When `search_messages` returns global or multi-session hits:
- Read `sessionSummaries` first.
- Use `sessionSummaries` to see which session is accumulating the strongest evidence.
- Use `sampleExcerpts` to decide whether to keep narrowing, switch sessions, or confirm the lead.
## Battle report
After each meaningful exploration round, produce a very short battle report for yourself or the user:
- “战报:已锁定 3 个候选,下一步按备注和最近消息区分。”
- “战报:会话还不唯一,准备全局搜关键词补证据。”
- “战报:已确认目标会话,开始拉最近上下文。”
Keep it short. It should help trace the reasoning, not overshadow the answer.
## Never do this
- Do not conclude “没有数据” after a single failed query.
- Do not insist on exact `sessionId` when fuzzy resolution is possible.
- Do not ignore `hint` or candidate summaries returned by MCP.
- Do not ignore `evidence` on resolved candidates or `sessionSummaries` on search results.
- Do not lock onto a candidate while ambiguity is still obvious.
## References
- Read [references/queries.md](references/queries.md) when you need concrete fuzzy-query playbooks, fallback chains, or battle-report examples.
+113
View File
@@ -0,0 +1,113 @@
# CipherTalk MCP Query Playbook
## Quick playbooks
### 1. User gives a vague person clue
Example:
- “帮我查那个昵称像英文名的人最近聊了什么”
- “那个带组织备注的人”
Use this order:
1. `list_contacts(q=<clue>)`
2. `list_sessions(q=<clue>)`
3. If the clue is weak, run `resolve_session(query=<clue>)`
4. Read `recommended`, `confidence`, and `evidence`
5. Compare candidates
6. `get_session_context` on the best candidate
7. If still uncertain, `search_messages` with a related keyword
Battle report:
- “战报:联系人和会话都已起底,先核对最近上下文。”
### 2. User may remember the name wrong
Example:
- “名字可能记错了,只记得一半”
Use this order:
1. Search multiple variants in contacts and sessions
2. Run `resolve_session` on the variants if needed
3. Prefer overlap between results
4. If multiple hits remain, inspect latest context for top candidates
5. Tell the user which one looks most plausible and why
Battle report:
- “战报:记忆有偏差,先用别名和片段交叉排嫌疑人。”
### 3. User wants more data, not just one answer
Example:
- “你自己多查点”
- “再深挖一点”
Use this order:
1. Resolve the most likely session
2. Read `resolve_session.recommended.evidence`
3. Pull latest context
4. Search related keywords globally or across nearby candidates
5. Use `search_messages.sessionSummaries` to see which session owns most of the evidence
6. Add timing clues, active hours, or contact rankings if useful
Battle report:
- “战报:主目标已确认,开始扩线索,不只看单条聊天。”
### 4. Keyword is weak or typo-prone
Example:
- user only remembers half a phrase
- user remembers a nickname that might be wrong
Use this order:
1. Fuzzy person lookup first
2. Global `search_messages`
3. Read `sessionSummaries` before digging into raw hits
4. Narrow back to candidate sessions
5. Re-run `search_messages` inside the best session(s)
Battle report:
- “战报:关键词不稳,先撒网再回收。”
## Candidate comparison checklist
When choosing among multiple sessions, compare:
- contact remark
- nickname
- display name
- recent message preview
- last active timestamp
- session kind
## Answer style
When the evidence is strong:
- state the likely target directly
- mention the evidence briefly
- prefer quoting `resolve_session.evidence` or `sessionSummaries` over hand-wavy justification
When the evidence is mixed:
- name the top candidate
- mention 1-2 backup candidates
- say what you checked to distinguish them
- mention which evidence is still missing
When evidence is still weak:
- say it is not fully locked yet
- continue querying instead of stopping early
- say which next tool call is most likely to break the tie