feat: 增加查询缓存以加速访问

This commit is contained in:
digua
2026-04-07 22:14:54 +08:00
parent 9b8595925d
commit 2306b3dfd7
4 changed files with 99 additions and 4 deletions

View File

@@ -350,7 +350,9 @@ export function deleteSession(sessionId: string): boolean {
if (fs.existsSync(shmPath)) {
fs.unlinkSync(shmPath)
}
deleteSessionCache(sessionId, getCacheDir())
const cacheDir = getCacheDir()
deleteSessionCache(sessionId, cacheDir)
deleteSessionCache(sessionId, path.join(cacheDir, 'query'))
return true
} catch (error) {
console.error('[Database] Failed to delete session:', error)

View File

@@ -636,7 +636,9 @@ export function registerChatHandlers(ctx: IpcContext): void {
*/
ipcMain.handle('chat:updateMemberAliases', async (_, sessionId: string, memberId: number, aliases: string[]) => {
try {
return await worker.updateMemberAliases(sessionId, memberId, aliases)
const result = await worker.updateMemberAliases(sessionId, memberId, aliases)
if (result) worker.invalidateAnalysisCache(sessionId).catch(() => {})
return result
} catch (error) {
console.error('Failed to update member alias:', error)
return false
@@ -651,7 +653,9 @@ export function registerChatHandlers(ctx: IpcContext): void {
// 先关闭数据库连接
await worker.closeDatabase(sessionId)
// 执行删除
return await worker.deleteMember(sessionId, memberId)
const result = await worker.deleteMember(sessionId, memberId)
if (result) worker.invalidateAnalysisCache(sessionId).catch(() => {})
return result
} catch (error) {
console.error('Failed to delete member:', error)
return false
@@ -1029,6 +1033,8 @@ export function registerChatHandlers(ctx: IpcContext): void {
} catch (e) {
console.error('[IpcMain] Failed to incrementally generate session index:', e)
}
// 数据变更后清除分析缓存
worker.invalidateAnalysisCache(sessionId).catch(() => {})
}
return result

View File

@@ -9,8 +9,10 @@
* 4. 返回结果
*/
import * as path from 'path'
import { parentPort, workerData } from 'worker_threads'
import { initDbDir, closeDatabase, closeAllDatabases } from './core'
import { initDbDir, closeDatabase, closeAllDatabases, getCacheDir } from './core'
import { getCache, setCache, deleteSessionCache } from '../database/sessionCache'
import {
getAvailableYears,
getMemberActivity,
@@ -73,6 +75,57 @@ import { streamImport, streamParseFileInfo, analyzeIncrementalImport, incrementa
// 初始化数据库目录
initDbDir(workerData.dbDir, workerData.cacheDir)
// ==================== 分析结果缓存 ====================
const ANALYSIS_CACHE_PREFIX = 'analysis:'
function getQueryCacheDir(): string {
const cacheDir = getCacheDir()
return cacheDir ? path.join(cacheDir, 'query') : ''
}
const CACHEABLE_QUERIES = new Set([
'getAvailableYears',
'getMemberActivity',
'getHourlyActivity',
'getDailyActivity',
'getWeekdayActivity',
'getMonthlyActivity',
'getYearlyActivity',
'getMessageLengthDistribution',
'getMessageTypeDistribution',
'getTimeRange',
'getCatchphraseAnalysis',
'getMentionAnalysis',
'getMentionGraph',
'getLaughAnalysis',
'getClusterGraph',
'getWordFrequency',
])
function buildAnalysisCacheKey(type: string, payload: any): string {
const parts = [ANALYSIS_CACHE_PREFIX + type]
// 标准 filter 对象(大多数分析查询)
const filter = payload.filter || payload.timeFilter
if (filter) {
if (filter.startTs !== undefined) parts.push(`s${filter.startTs}`)
if (filter.endTs !== undefined) parts.push(`e${filter.endTs}`)
if (filter.memberId !== undefined && filter.memberId !== null) {
parts.push(`m${filter.memberId}`)
}
}
// 顶层 memberId如 getWordFrequency 直接传 memberId
if (payload.memberId !== undefined && payload.memberId !== null) parts.push(`m${payload.memberId}`)
if (payload.keywords) parts.push(`k${JSON.stringify(payload.keywords)}`)
if (payload.options) parts.push(`o${JSON.stringify(payload.options)}`)
// getWordFrequency 特有参数
if (payload.locale) parts.push(`l${payload.locale}`)
if (payload.topN) parts.push(`n${payload.topN}`)
if (payload.minLength) parts.push(`ml${payload.minLength}`)
if (payload.posTags) parts.push(`pt${JSON.stringify(payload.posTags)}`)
return parts.join(':')
}
// ==================== 消息处理 ====================
interface WorkerMessage {
@@ -167,6 +220,15 @@ const syncHandlers: Record<string, (payload: any) => any> = {
// 深度搜索LIKE 子串匹配)
deepSearchMessages: (p) => deepSearchMessages(p.sessionId, p.keywords, p.filter, p.limit, p.offset, p.senderId),
// 缓存管理
invalidateAnalysisCache: (p) => {
const queryCacheDir = getQueryCacheDir()
if (queryCacheDir && p.sessionId) {
deleteSessionCache(p.sessionId, queryCacheDir)
}
return true
},
}
// 异步消息处理器(流式操作)
@@ -201,6 +263,21 @@ parentPort?.on('message', async (message: WorkerMessage) => {
throw new Error(`Unknown message type: ${type}`)
}
// 可缓存查询先查缓存miss 后执行并写回
const queryCacheDir = getQueryCacheDir()
if (queryCacheDir && CACHEABLE_QUERIES.has(type) && payload.sessionId) {
const cacheKey = buildAnalysisCacheKey(type, payload)
const cached = getCache(payload.sessionId, cacheKey, queryCacheDir)
if (cached !== null) {
parentPort?.postMessage({ id, success: true, result: cached })
return
}
const result = syncHandler(payload)
setCache(payload.sessionId, cacheKey, result, queryCacheDir)
parentPort?.postMessage({ id, success: true, result })
return
}
const result = syncHandler(payload)
parentPort?.postMessage({ id, success: true, result })
} catch (error) {

View File

@@ -253,6 +253,16 @@ export async function pluginCompute<TOutput = any>(fnString: string, input: any)
return sendToWorker('pluginCompute', { fnString, input }, 120000)
}
// ==================== 缓存管理 ====================
/**
* 清除指定 session 的所有分析结果缓存
* 在数据变更(增量导入、成员删除/别名更新)后调用
*/
export async function invalidateAnalysisCache(sessionId: string): Promise<boolean> {
return sendToWorker('invalidateAnalysisCache', { sessionId })
}
// ==================== 导出的异步 API ====================
export async function getAvailableYears(sessionId: string): Promise<number[]> {