feat: 解析器重构

This commit is contained in:
digua
2025-12-01 17:22:34 +08:00
parent e7de9cc57d
commit b30abc336d
26 changed files with 2366 additions and 676 deletions
+26 -7
View File
@@ -37,10 +37,12 @@ import {
getCheckInAnalysis,
} from './queryAdvanced'
import { parseFile, detectFormat } from '../parser'
import { streamImport, streamParseFileInfo } from './streamImport'
import type { FileParseInfo } from '../../../src/types/chat'
/**
* 解析文件获取基本信息(在 Worker 线程中执行,不阻塞主进程)
* @deprecated 使用 streamParseFileInfo 替代
*/
function parseFileInfo(filePath: string): FileParseInfo {
const format = detectFormat(filePath)
@@ -70,9 +72,9 @@ interface WorkerMessage {
payload: any
}
// 消息类型到处理函数的映射
const handlers: Record<string, (payload: any) => any> = {
// 文件解析(合并功能使用)
// 同步消息处理器
const syncHandlers: Record<string, (payload: any) => any> = {
// 文件解析(合并功能使用,已废弃)
parseFileInfo: (p) => parseFileInfo(p.filePath),
// 基础查询
@@ -111,17 +113,34 @@ const handlers: Record<string, (payload: any) => any> = {
getCheckInAnalysis: (p) => getCheckInAnalysis(p.sessionId, p.filter),
}
// 异步消息处理器(流式操作)
const asyncHandlers: Record<string, (payload: any, requestId: string) => Promise<any>> = {
// 流式导入
streamImport: (p, id) => streamImport(p.filePath, id),
// 流式解析文件信息(用于合并预览)
streamParseFileInfo: (p, id) => streamParseFileInfo(p.filePath, id),
}
// 处理消息
parentPort?.on('message', (message: WorkerMessage) => {
parentPort?.on('message', async (message: WorkerMessage) => {
const { id, type, payload } = message
try {
const handler = handlers[type]
if (!handler) {
// 检查是否是异步处理器
const asyncHandler = asyncHandlers[type]
if (asyncHandler) {
const result = await asyncHandler(payload, id)
parentPort?.postMessage({ id, success: true, result })
return
}
// 同步处理器
const syncHandler = syncHandlers[type]
if (!syncHandler) {
throw new Error(`Unknown message type: ${type}`)
}
const result = handler(payload)
const result = syncHandler(payload)
parentPort?.postMessage({ id, success: true, result })
} catch (error) {
parentPort?.postMessage({
+4 -1
View File
@@ -31,6 +31,9 @@ export {
getAllSessions,
getSession,
closeDatabase,
// 文件解析 API(异步,用于合并功能)
// 文件解析 API(已废弃,使用流式版本)
parseFileInfo,
// 流式导入 API
streamImport,
streamParseFileInfo,
} from './workerManager'
+382
View File
@@ -0,0 +1,382 @@
/**
* 流式导入模块
* 在 Worker 线程中流式解析文件并批量写入数据库
*/
import Database from 'better-sqlite3'
import * as fs from 'fs'
import * as path from 'path'
import { parentPort } from 'worker_threads'
import {
streamParseFile,
detectFormat,
getPreprocessor,
needsPreprocess,
type ParseProgress,
type ParsedMeta,
type ParsedMember,
type ParsedMessage,
getFileSize,
} from '../parser'
/**
 * Result of a streaming import run.
 */
export interface StreamImportResult {
  // True when parsing finished and the transaction was committed.
  success: boolean
  // ID of the newly created session database (present only on success).
  sessionId?: string
  // Human-readable failure description (present only on failure).
  error?: string
}
import { getDbDir } from './dbCore'
/**
 * Forward a parse-progress update to the main process.
 *
 * Silently does nothing when not running inside a worker thread
 * (`parentPort` is null in the main thread).
 *
 * @param requestId ID of the request this progress belongs to
 * @param progress  progress payload produced by the parser
 */
function sendProgress(requestId: string, progress: ParseProgress): void {
  const port = parentPort
  if (!port) return
  port.postMessage({ id: requestId, type: 'progress', payload: progress })
}
/**
 * Build a unique session ID of the form `chat_<epochMillis>_<random>`.
 *
 * Uniqueness relies on the millisecond timestamp plus up to six
 * base-36 characters of randomness.
 */
function generateSessionId(): string {
  const suffix = Math.random().toString(36).slice(2, 8)
  return `chat_${Date.now()}_${suffix}`
}
/**
 * Absolute path of the SQLite file backing the given session.
 *
 * @param sessionId session ID; doubles as the database file name
 */
function getDbPath(sessionId: string): string {
  return path.join(getDbDir(), sessionId + '.db')
}
/**
 * Create the per-session SQLite database file and initialize its schema.
 *
 * Tables: `meta` (chat-level info), `member` (unique senders keyed by
 * platform_id), `member_name_history` (nickname intervals; open interval has
 * end_ts NULL) and `message` (one row per message), plus indexes on the
 * columns the query layer filters by.
 *
 * @param sessionId session ID; used as the database file name
 * @returns an open better-sqlite3 database handle (caller must close it)
 */
function createDatabase(sessionId: string): Database.Database {
  const dbDir = getDbDir()
  // Ensure the target directory exists before opening the file.
  if (!fs.existsSync(dbDir)) {
    fs.mkdirSync(dbDir, { recursive: true })
  }
  const dbPath = getDbPath(sessionId)
  const db = new Database(dbPath)
  // WAL + synchronous=NORMAL favors bulk-insert throughput over durability.
  db.pragma('journal_mode = WAL')
  db.pragma('synchronous = NORMAL')
  db.exec(`
CREATE TABLE IF NOT EXISTS meta (
name TEXT NOT NULL,
platform TEXT NOT NULL,
type TEXT NOT NULL,
imported_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS member (
id INTEGER PRIMARY KEY AUTOINCREMENT,
platform_id TEXT NOT NULL UNIQUE,
name TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS member_name_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
member_id INTEGER NOT NULL,
name TEXT NOT NULL,
start_ts INTEGER NOT NULL,
end_ts INTEGER,
FOREIGN KEY(member_id) REFERENCES member(id)
);
CREATE TABLE IF NOT EXISTS message (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sender_id INTEGER NOT NULL,
ts INTEGER NOT NULL,
type INTEGER NOT NULL,
content TEXT,
FOREIGN KEY(sender_id) REFERENCES member(id)
);
CREATE INDEX IF NOT EXISTS idx_message_ts ON message(ts);
CREATE INDEX IF NOT EXISTS idx_message_sender ON message(sender_id);
CREATE INDEX IF NOT EXISTS idx_member_name_history_member_id ON member_name_history(member_id);
`)
  return db
}
/**
 * Stream-import a chat log file into a freshly created per-session database.
 *
 * The file is parsed incrementally (with optional preprocessing for large
 * files), members and messages are written inside one big transaction for
 * throughput, nickname changes are tracked into `member_name_history`, and
 * parse progress is forwarded to the main process under `requestId`.
 *
 * @param filePath  path of the chat log file to import
 * @param requestId request ID used to route progress messages back
 * @returns the new sessionId on success, or an error description on failure
 */
export async function streamImport(filePath: string, requestId: string): Promise<StreamImportResult> {
  // Detect format; abort early when the file is not recognized.
  const formatFeature = detectFormat(filePath)
  if (!formatFeature) {
    return { success: false, error: '无法识别文件格式' }
  }
  console.log(`[StreamImport] 开始导入: ${filePath}, 格式: ${formatFeature.name}`)

  // Preprocess step: shrink the file first when the format requires it.
  let actualFilePath = filePath
  let tempFilePath: string | null = null
  const preprocessor = getPreprocessor(filePath)
  if (preprocessor && needsPreprocess(filePath)) {
    console.log(`[StreamImport] 文件需要预处理,开始精简...`)
    sendProgress(requestId, {
      stage: 'parsing',
      bytesRead: 0,
      totalBytes: 0,
      messagesProcessed: 0,
      percentage: 0,
      message: '预处理:精简大文件中...',
    })
    try {
      tempFilePath = await preprocessor.preprocess(filePath, (progress) => {
        sendProgress(requestId, {
          ...progress,
          message: progress.message || '预处理中...',
        })
      })
      actualFilePath = tempFilePath
      console.log(`[StreamImport] 预处理完成: ${tempFilePath}`)
    } catch (err) {
      console.error(`[StreamImport] 预处理失败:`, err)
      return {
        success: false,
        error: `预处理失败: ${err instanceof Error ? err.message : String(err)}`,
      }
    }
  }

  const sessionId = generateSessionId()
  const db = createDatabase(sessionId)

  // Prepared statements, reused for every row.
  const insertMeta = db.prepare(`
    INSERT INTO meta (name, platform, type, imported_at) VALUES (?, ?, ?, ?)
  `)
  const insertMember = db.prepare(`
    INSERT OR IGNORE INTO member (platform_id, name) VALUES (?, ?)
  `)
  const getMemberId = db.prepare(`SELECT id FROM member WHERE platform_id = ?`)
  const insertMessage = db.prepare(`
    INSERT INTO message (sender_id, ts, type, content) VALUES (?, ?, ?, ?)
  `)
  const insertNameHistory = db.prepare(`
    INSERT INTO member_name_history (member_id, name, start_ts, end_ts) VALUES (?, ?, ?, ?)
  `)
  const updateMemberName = db.prepare(`UPDATE member SET name = ? WHERE platform_id = ?`)
  const updateNameHistoryEndTs = db.prepare(`
    UPDATE member_name_history SET end_ts = ? WHERE member_id = ? AND end_ts IS NULL
  `)

  // platformId -> database row id of each known member.
  const memberIdMap = new Map<string, number>()
  // Last known nickname per member, used to detect renames.
  const nicknameTracker = new Map<string, { currentName: string; lastSeenTs: number }>()
  // meta must be written at most once even if the parser reports it repeatedly.
  let metaInserted = false
  // Set on failure so `finally` knows to delete the database files.
  let importFailed = false

  // One transaction for the whole import: far faster than per-row commits.
  db.exec('BEGIN TRANSACTION')
  try {
    await streamParseFile(actualFilePath, {
      batchSize: 5000,
      onProgress: (progress) => {
        // Relay parser progress to the main process.
        sendProgress(requestId, progress)
      },
      onMeta: (meta: ParsedMeta) => {
        if (!metaInserted) {
          insertMeta.run(meta.name, meta.platform, meta.type, Math.floor(Date.now() / 1000))
          metaInserted = true
          console.log(`[StreamImport] Meta 已写入: ${meta.name}`)
        }
      },
      onMembers: (members: ParsedMember[]) => {
        console.log(`[StreamImport] 收到 ${members.length} 个成员`)
        for (const member of members) {
          insertMember.run(member.platformId, member.name)
          const row = getMemberId.get(member.platformId) as { id: number } | undefined
          if (row) {
            memberIdMap.set(member.platformId, row.id)
          }
        }
      },
      onMessageBatch: (messages: ParsedMessage[]) => {
        for (const msg of messages) {
          // Validation: skip rows that would violate NOT NULL constraints.
          if (!msg.senderPlatformId || !msg.senderName) {
            console.warn('[StreamImport] 跳过无效消息:缺少发送者信息')
            continue
          }
          if (msg.timestamp === undefined || msg.timestamp === null || isNaN(msg.timestamp)) {
            console.warn('[StreamImport] 跳过无效消息:缺少时间戳')
            continue
          }
          if (msg.type === undefined || msg.type === null) {
            console.warn('[StreamImport] 跳过无效消息:缺少消息类型')
            continue
          }
          // Lazily create members that were absent from the member list.
          if (!memberIdMap.has(msg.senderPlatformId)) {
            const memberName = msg.senderName || msg.senderPlatformId
            insertMember.run(msg.senderPlatformId, memberName)
            const row = getMemberId.get(msg.senderPlatformId) as { id: number } | undefined
            if (row) {
              memberIdMap.set(msg.senderPlatformId, row.id)
            }
          }
          const senderId = memberIdMap.get(msg.senderPlatformId)
          if (senderId === undefined) continue
          insertMessage.run(senderId, msg.timestamp, msg.type, msg.content)
          // Nickname tracking: open a new history interval on each rename.
          const senderName = msg.senderName || msg.senderPlatformId
          const tracker = nicknameTracker.get(msg.senderPlatformId)
          if (!tracker) {
            nicknameTracker.set(msg.senderPlatformId, {
              currentName: senderName,
              lastSeenTs: msg.timestamp,
            })
            insertNameHistory.run(senderId, senderName, msg.timestamp, null)
          } else if (tracker.currentName !== senderName) {
            // Close the previous open interval, then start a new one.
            updateNameHistoryEndTs.run(msg.timestamp, senderId)
            insertNameHistory.run(senderId, senderName, msg.timestamp, null)
            tracker.currentName = senderName
            tracker.lastSeenTs = msg.timestamp
          } else {
            tracker.lastSeenTs = msg.timestamp
          }
        }
      },
    })

    // Persist each member's latest nickname.
    for (const [platformId, tracker] of nicknameTracker.entries()) {
      updateMemberName.run(tracker.currentName, platformId)
    }

    db.exec('COMMIT')
    console.log(`[StreamImport] 导入完成: ${sessionId}`)
    return { success: true, sessionId }
  } catch (error) {
    db.exec('ROLLBACK')
    // FIX: file deletion is deferred to `finally`, AFTER db.close().
    // Previously the db file was unlinked while still open, which fails on
    // Windows and always left the -wal/-shm companion files behind.
    importFailed = true
    console.error(`[StreamImport] 导入失败:`, error)
    return {
      success: false,
      error: error instanceof Error ? error.message : String(error),
    }
  } finally {
    db.close()
    if (importFailed) {
      // Remove the database plus its WAL/SHM companions now that it is closed.
      const dbPath = getDbPath(sessionId)
      for (const file of [dbPath, `${dbPath}-wal`, `${dbPath}-shm`]) {
        if (fs.existsSync(file)) {
          fs.unlinkSync(file)
        }
      }
    }
    // Remove the preprocessor's temporary file, if one was created.
    if (tempFilePath && preprocessor) {
      preprocessor.cleanup(tempFilePath)
      console.log(`[StreamImport] 已清理临时文件: ${tempFilePath}`)
    }
  }
}
/**
 * Stream-parse a chat file and collect summary statistics without writing to
 * any database. Used to preview a file for the merge feature.
 *
 * @param filePath  path of the chat log file
 * @param requestId request ID used to route progress messages back
 * @returns chat name, detected format/platform, message & member counts, size
 * @throws Error when the file format cannot be detected
 */
export async function streamParseFileInfo(
  filePath: string,
  requestId: string
): Promise<{
  name: string
  format: string
  platform: string
  messageCount: number
  memberCount: number
  fileSize: number
}> {
  const formatFeature = detectFormat(filePath)
  if (!formatFeature) {
    throw new Error('无法识别文件格式')
  }

  const fileSize = fs.statSync(filePath).size

  // Emit an initial progress event immediately so the UI shows activity.
  sendProgress(requestId, {
    stage: 'parsing',
    bytesRead: 0,
    totalBytes: fileSize,
    messagesProcessed: 0,
    percentage: 0,
    message: '正在读取文件...',
  })

  let name = '未知群聊'
  let platform = formatFeature.platform
  let messageCount = 0
  const memberSet = new Set<string>()

  // Files above 100 MB use a smaller batch so progress updates stay frequent.
  const batchSize = fileSize > 100 * 1024 * 1024 ? 2000 : 5000
  await streamParseFile(filePath, {
    batchSize,
    onProgress: (progress) => sendProgress(requestId, progress),
    onMeta: (meta) => {
      name = meta.name
      platform = meta.platform
    },
    onMembers: (members) => {
      members.forEach((m) => memberSet.add(m.platformId))
    },
    onMessageBatch: (messages) => {
      messageCount += messages.length
      messages.forEach((msg) => memberSet.add(msg.senderPlatformId))
    },
  })

  return {
    name,
    format: formatFeature.name,
    platform,
    messageCount,
    memberCount: memberSet.size,
    fileSize,
  }
}
+83 -8
View File
@@ -7,6 +7,7 @@ import { Worker } from 'worker_threads'
import { app } from 'electron'
import * as path from 'path'
import * as fs from 'fs'
import type { ParseProgress } from '../parser'
// Worker 实例
let worker: Worker | null = null
@@ -17,6 +18,7 @@ const pendingRequests = new Map<
{
resolve: (value: any) => void
reject: (error: Error) => void
onProgress?: (progress: ParseProgress) => void // 进度回调
}
>()
@@ -86,17 +88,26 @@ export function initWorker(): void {
// 监听 Worker 消息
worker.on('message', (message) => {
const { id, success, result, error } = message
const { id, type, success, result, error, payload } = message
const pending = pendingRequests.get(id)
if (pending) {
pendingRequests.delete(id)
if (!pending) return
if (success) {
pending.resolve(result)
} else {
pending.reject(new Error(error))
// 处理进度消息(不删除 pending,因为还没完成)
if (type === 'progress') {
if (pending.onProgress) {
pending.onProgress(payload)
}
return
}
// 处理完成或错误消息
pendingRequests.delete(id)
if (success) {
pending.resolve(result)
} else {
pending.reject(new Error(error))
}
})
@@ -130,7 +141,6 @@ export function initWorker(): void {
function sendToWorker<T>(type: string, payload: any): Promise<T> {
return new Promise((resolve, reject) => {
if (!worker) {
// 尝试初始化 Worker
try {
initWorker()
} catch (error) {
@@ -155,6 +165,42 @@ function sendToWorker<T>(type: string, payload: any): Promise<T> {
})
}
/**
 * Send a message to the worker and wait for the response, forwarding any
 * intermediate progress events to `onProgress`. Intended for long-running
 * streaming operations (import / parse preview).
 *
 * @param type       worker message type
 * @param payload    message payload
 * @param onProgress optional callback invoked for each progress event
 * @param timeoutMs  overall deadline; the promise rejects when exceeded
 */
function sendToWorkerWithProgress<T>(
  type: string,
  payload: any,
  onProgress?: (progress: ParseProgress) => void,
  timeoutMs: number = 600000 // default: 10-minute timeout
): Promise<T> {
  return new Promise((resolve, reject) => {
    if (!worker) {
      try {
        initWorker()
      } catch (error) {
        reject(new Error('Worker not initialized'))
        return
      }
    }
    const id = `req_${++requestIdCounter}`
    // Fail the request if no completion arrives within the deadline.
    const timer = setTimeout(() => {
      if (pendingRequests.has(id)) {
        pendingRequests.delete(id)
        reject(new Error(`Worker request timeout: ${type}`))
      }
    }, timeoutMs)
    // FIX: wrap resolve/reject so the timer is cleared once the request
    // settles; previously every call left a live timer (up to 10 minutes)
    // behind even after the request completed.
    pendingRequests.set(id, {
      resolve: (value: any) => {
        clearTimeout(timer)
        resolve(value)
      },
      reject: (err: Error) => {
        clearTimeout(timer)
        reject(err)
      },
      onProgress,
    })
    worker!.postMessage({ id, type, payload })
  })
}
/**
* 关闭 Worker
*/
@@ -261,11 +307,40 @@ export async function closeDatabase(sessionId: string): Promise<void> {
/**
* 解析文件获取基本信息(在 Worker 线程中执行)
* @deprecated 使用 streamParseFileInfo 替代
*/
export async function parseFileInfo(filePath: string): Promise<any> {
return sendToWorker('parseFileInfo', { filePath })
}
/**
 * Stream-parse a file for summary info on the worker thread (merge preview).
 * Progress events from the worker are forwarded to `onProgress`.
 */
export async function streamParseFileInfo(
  filePath: string,
  onProgress?: (progress: ParseProgress) => void
): Promise<{
  name: string
  format: string
  platform: string
  messageCount: number
  memberCount: number
}> {
  const payload = { filePath }
  return await sendToWorkerWithProgress(payload && 'streamParseFileInfo', payload, onProgress)
}
/**
 * Stream-import a chat log file on the worker thread.
 *
 * @param filePath   file to import
 * @param onProgress optional progress callback
 */
export async function streamImport(
  filePath: string,
  onProgress?: (progress: ParseProgress) => void
): Promise<{ success: boolean; sessionId?: string; error?: string }> {
  return await sendToWorkerWithProgress('streamImport', { filePath }, onProgress)
}
/**
* 获取数据库目录(供外部使用)
*/