feat: 大文件合并和导入性能优化

This commit is contained in:
digua
2025-12-02 01:53:44 +08:00
parent b43191c7e0
commit 7ca4d0d7d8
7 changed files with 1247 additions and 137 deletions
+39 -27
View File
@@ -1,6 +1,7 @@
import { ipcMain, app, dialog, clipboard, shell, BrowserWindow } from 'electron' import { ipcMain, app, dialog, clipboard, shell, BrowserWindow } from 'electron'
import { autoUpdater } from 'electron-updater' import { autoUpdater } from 'electron-updater'
import * as fs from 'fs/promises' import * as fs from 'fs/promises'
import * as fsSync from 'fs'
// 导入数据库核心模块(用于导入和删除操作) // 导入数据库核心模块(用于导入和删除操作)
import * as databaseCore from './database/core' import * as databaseCore from './database/core'
@@ -8,36 +9,47 @@ import * as databaseCore from './database/core'
import * as worker from './worker' import * as worker from './worker'
// 导入解析器模块 // 导入解析器模块
import * as parser from './parser' import * as parser from './parser'
import { detectFormat, type ParseProgress, type ParseResult } from './parser' import { detectFormat, type ParseProgress } from './parser'
// 导入合并模块 // 导入合并模块
import * as merger from './merger' import * as merger from './merger'
import { deleteTempDatabase, cleanupAllTempDatabases } from './merger/tempCache'
import type { MergeParams } from '../../src/types/chat' import type { MergeParams } from '../../src/types/chat'
console.log('[IpcMain] Database, Worker and Parser modules imported') console.log('[IpcMain] Database, Worker and Parser modules imported')
// ==================== 解析结果缓存 ==================== // ==================== 临时数据库缓存 ====================
// 用于合并功能:缓存文件的完整解析结果,避免重复解析 // 用于合并功能:缓存文件对应的临时数据库路径
// 这样用户删除本地文件后仍然可以进行合并 // 这样用户删除本地文件后仍然可以进行合并(数据已存入临时数据库)
const parseResultCache = new Map<string, ParseResult>() const tempDbCache = new Map<string, string>()
/** /**
* 清理指定文件的缓存 * 清理指定文件的缓存(删除临时数据库)
*/ */
function clearParseCache(filePath: string): void { function clearTempDbCache(filePath: string): void {
parseResultCache.delete(filePath) const tempDbPath = tempDbCache.get(filePath)
if (tempDbPath) {
deleteTempDatabase(tempDbPath)
tempDbCache.delete(filePath)
}
} }
/** /**
* 清理所有缓存 * 清理所有缓存(删除所有临时数据库)
*/ */
function clearAllParseCache(): void { function clearAllTempDbCache(): void {
parseResultCache.clear() for (const tempDbPath of tempDbCache.values()) {
console.log('[IpcMain] 已清理所有解析缓存') deleteTempDatabase(tempDbPath)
}
tempDbCache.clear()
console.log('[IpcMain] 已清理所有临时数据库缓存')
} }
const mainIpcMain = (win: BrowserWindow) => { const mainIpcMain = (win: BrowserWindow) => {
console.log('[IpcMain] Registering IPC handlers...') console.log('[IpcMain] Registering IPC handlers...')
// 清理残留的临时数据库(上次崩溃可能残留)
cleanupAllTempDatabases()
// 初始化 Worker // 初始化 Worker
try { try {
worker.initWorker() worker.initWorker()
@@ -617,11 +629,11 @@ const mainIpcMain = (win: BrowserWindow) => {
/** /**
* 解析文件获取基本信息(用于合并预览) * 解析文件获取基本信息(用于合并预览)
* 使用流式解析获取进度,同时缓存完整解析结果 * 使用流式解析,数据写入临时数据库,避免内存溢出
*/ */
ipcMain.handle('merge:parseFileInfo', async (_, filePath: string) => { ipcMain.handle('merge:parseFileInfo', async (_, filePath: string) => {
try { try {
// 使用流式解析,避免大文件 OOM,同时获取完整解析结果 // 使用流式解析,写入临时数据库
const result = await worker.streamParseFileInfo(filePath, (progress: ParseProgress) => { const result = await worker.streamParseFileInfo(filePath, (progress: ParseProgress) => {
// 可选:发送进度到渲染进程 // 可选:发送进度到渲染进程
win.webContents.send('merge:parseProgress', { win.webContents.send('merge:parseProgress', {
@@ -630,14 +642,14 @@ const mainIpcMain = (win: BrowserWindow) => {
}) })
}) })
// 缓存完整解析结果(用于后续合并) // 缓存临时数据库路径(用于后续合并)
// 这样即使用户删除本地文件,也能继续合并 // 这样即使用户删除本地文件,也能继续合并(数据已在临时数据库中)
if (result.parseResult) { if (result.tempDbPath) {
parseResultCache.set(filePath, result.parseResult) tempDbCache.set(filePath, result.tempDbPath)
console.log(`[IpcMain] 已缓存解析结果: ${filePath}, 消息数: ${result.parseResult.messages.length}`) console.log(`[IpcMain] 已缓存临时数据库: ${filePath} -> ${result.tempDbPath}`)
} }
// 返回基本信息(不包含完整解析结果,减少 IPC 传输) // 返回基本信息
return { return {
name: result.name, name: result.name,
format: result.format, format: result.format,
@@ -653,11 +665,11 @@ const mainIpcMain = (win: BrowserWindow) => {
}) })
/** /**
* 检测合并冲突(使用缓存的解析结果 * 检测合并冲突(使用临时数据库
*/ */
ipcMain.handle('merge:checkConflicts', async (_, filePaths: string[]) => { ipcMain.handle('merge:checkConflicts', async (_, filePaths: string[]) => {
try { try {
return merger.checkConflictsWithCache(filePaths, parseResultCache) return merger.checkConflictsWithTempDb(filePaths, tempDbCache)
} catch (error) { } catch (error) {
console.error('检测冲突失败:', error) console.error('检测冲突失败:', error)
throw error throw error
@@ -665,15 +677,15 @@ const mainIpcMain = (win: BrowserWindow) => {
}) })
/** /**
* 执行合并(使用缓存的解析结果 * 执行合并(使用临时数据库
*/ */
ipcMain.handle('merge:mergeFiles', async (_, params: MergeParams) => { ipcMain.handle('merge:mergeFiles', async (_, params: MergeParams) => {
try { try {
const result = await merger.mergeFilesWithCache(params, parseResultCache) const result = await merger.mergeFilesWithTempDb(params, tempDbCache)
// 合并完成后清理缓存 // 合并完成后清理缓存
if (result.success) { if (result.success) {
for (const filePath of params.filePaths) { for (const filePath of params.filePaths) {
clearParseCache(filePath) clearTempDbCache(filePath)
} }
} }
return result return result
@@ -688,9 +700,9 @@ const mainIpcMain = (win: BrowserWindow) => {
*/ */
ipcMain.handle('merge:clearCache', async (_, filePath?: string) => { ipcMain.handle('merge:clearCache', async (_, filePath?: string) => {
if (filePath) { if (filePath) {
clearParseCache(filePath) clearTempDbCache(filePath)
} else { } else {
clearAllParseCache() clearAllTempDbCache()
} }
return true return true
}) })
+316 -23
View File
@@ -8,6 +8,7 @@ import * as path from 'path'
import { app } from 'electron' import { app } from 'electron'
import { parseFileSync, detectFormat } from '../parser' import { parseFileSync, detectFormat } from '../parser'
import { importData } from '../database/core' import { importData } from '../database/core'
import { TempDbReader } from './tempCache'
import type { import type {
ParseResult, ParseResult,
ParsedMessage, ParsedMessage,
@@ -23,6 +24,8 @@ import type {
ChatPlatform, ChatPlatform,
ChatType, ChatType,
MergeSource, MergeSource,
ParsedMeta,
ParsedMember,
} from '../../../src/types/chat' } from '../../../src/types/chat'
/** /**
@@ -192,6 +195,16 @@ export async function checkConflicts(filePaths: string[]): Promise<ConflictCheck
/** /**
* 内部函数:检测消息中的冲突 * 内部函数:检测消息中的冲突
*/ */
/**
 * Tells whether a message body is nothing but an image placeholder.
 * After trimming, the text must start with `[图片:` and end with `]`,
 * e.g. `[图片: xxx.jpg]` or `[图片: {xxx}.jpg]`.
 *
 * @param content Raw message text; may be undefined.
 * @returns true for an image placeholder, false otherwise (including empty/undefined input).
 */
function isImageOnlyMessage(content: string | undefined): boolean {
  // Any non-empty name is accepted between the prefix and the closing bracket.
  const imagePlaceholder = /^\[图片:\s*.+\]$/
  return content ? imagePlaceholder.test(content.trim()) : false
}
function detectConflictsInMessages( function detectConflictsInMessages(
allMessages: Array<{ msg: ParsedMessage; source: string }>, allMessages: Array<{ msg: ParsedMessage; source: string }>,
conflicts: MergeConflict[] conflicts: MergeConflict[]
@@ -214,6 +227,9 @@ function detectConflictsInMessages(
} }
console.log(`[Merger] 有多条消息的时间戳数: ${multiMsgTsCount}`) console.log(`[Merger] 有多条消息的时间戳数: ${multiMsgTsCount}`)
// 统计自动去重数量
let autoDeduplicatedCount = 0
// 检测每个时间戳内的冲突 // 检测每个时间戳内的冲突
for (const [ts, items] of timeGroups) { for (const [ts, items] of timeGroups) {
if (items.length < 2) continue if (items.length < 2) continue
@@ -239,23 +255,36 @@ function detectConflictsInMessages(
continue continue
} }
// 按内容长度分组 // 按内容分组(完全相同的内容会被分到一组,自动去重)
const lengthGroups = new Map<number, Array<{ msg: ParsedMessage; source: string }>>() const contentGroups = new Map<string, Array<{ msg: ParsedMessage; source: string }>>()
for (const item of senderItems) { for (const item of senderItems) {
const len = (item.msg.content || '').length const content = item.msg.content || ''
if (!lengthGroups.has(len)) { if (!contentGroups.has(content)) {
lengthGroups.set(len, []) contentGroups.set(content, [])
} }
lengthGroups.get(len)!.push(item) contentGroups.get(content)!.push(item)
} }
// 如果有多个不同长度的消息,说明可能是冲突 // 统计自动去重的消息(内容完全相同但来自不同文件)
if (lengthGroups.size > 1) { for (const [, contentItems] of contentGroups) {
const lengthEntries = Array.from(lengthGroups.entries()) if (contentItems.length > 1) {
for (let i = 0; i < lengthEntries.length - 1; i++) { const contentSources = new Set(contentItems.map((it) => it.source))
for (let j = i + 1; j < lengthEntries.length; j++) { if (contentSources.size > 1) {
const [len1, items1] = lengthEntries[i] // 内容相同但来自不同文件,自动去重
const [len2, items2] = lengthEntries[j] autoDeduplicatedCount += contentItems.length - 1
}
}
}
// 只有当有多个不同内容时才是真正的冲突
if (contentGroups.size > 1) {
const contentEntries = Array.from(contentGroups.entries())
// 检查这些不同内容是否来自不同文件
for (let i = 0; i < contentEntries.length - 1; i++) {
for (let j = i + 1; j < contentEntries.length; j++) {
const [content1, items1] = contentEntries[i]
const [content2, items2] = contentEntries[j]
// 找到两个来源不同的消息 // 找到两个来源不同的消息
const item1 = items1[0] const item1 = items1[0]
@@ -264,27 +293,29 @@ function detectConflictsInMessages(
// 如果找不到来自不同文件的消息,跳过 // 如果找不到来自不同文件的消息,跳过
if (!item2) continue if (!item2) continue
// 如果两边都是纯图片消息,自动跳过(不需要用户选择)
if (isImageOnlyMessage(content1) && isImageOnlyMessage(content2)) {
autoDeduplicatedCount++
continue
}
// 打印冲突详情 // 打印冲突详情
if (conflicts.length < 5) { if (conflicts.length < 5) {
console.log(`[Merger] 冲突 #${conflicts.length + 1}:`) console.log(`[Merger] 冲突 #${conflicts.length + 1}:`)
console.log(` 时间戳: ${ts} (${new Date(ts * 1000).toLocaleString()})`) console.log(` 时间戳: ${ts} (${new Date(ts * 1000).toLocaleString()})`)
console.log(` 发送者: ${sender} (${item1.msg.senderName})`) console.log(` 发送者: ${sender} (${item1.msg.senderName})`)
console.log( console.log(` 文件1: ${item1.source}, 长度: ${content1.length}, 内容: "${content1.slice(0, 50)}..."`)
` 文件1: ${item1.source}, 长度: ${len1}, 内容: "${(item1.msg.content || '').slice(0, 50)}..."` console.log(` 文件2: ${item2.source}, 长度: ${content2.length}, 内容: "${content2.slice(0, 50)}..."`)
)
console.log(
` 文件2: ${item2.source}, 长度: ${len2}, 内容: "${(item2.msg.content || '').slice(0, 50)}..."`
)
} }
conflicts.push({ conflicts.push({
id: `conflict_${ts}_${sender}_${conflicts.length}`, id: `conflict_${ts}_${sender}_${conflicts.length}`,
timestamp: ts, timestamp: ts,
sender: item1.msg.senderName || sender, sender: item1.msg.senderName || sender,
contentLength1: len1, contentLength1: content1.length,
contentLength2: len2, contentLength2: content2.length,
content1: item1.msg.content || '', content1: content1,
content2: item2.msg.content || '', content2: content2,
}) })
} }
} }
@@ -292,6 +323,8 @@ function detectConflictsInMessages(
} }
} }
console.log(`[Merger] 自动去重消息数(含图片冲突): ${autoDeduplicatedCount}`)
console.log(`[Merger] 检测到冲突数: ${conflicts.length}`) console.log(`[Merger] 检测到冲突数: ${conflicts.length}`)
// 计算去重后的消息数 // 计算去重后的消息数
@@ -531,3 +564,263 @@ function executeMerge(
} }
} }
} }
// ==================== 临时数据库版本(方案3:内存优化) ====================
/**
 * Detects merge conflicts across several files using their temporary databases.
 *
 * Every path in `filePaths` is looked up in `tempDbCache`; all messages of each
 * temp DB are read in batches of 10000 into one in-memory list, tagged with the
 * file's base name, and then passed to `detectConflictsInMessages`.
 *
 * @param filePaths   Source files selected for merging.
 * @param tempDbCache Map from source file path to its temporary database path.
 * @returns Whatever `detectConflictsInMessages` produces for the collected messages.
 * @throws Error when a file has no cached temp DB or the files report different platforms.
 */
export async function checkConflictsWithTempDb(
  filePaths: string[],
  tempDbCache: Map<string, string>
): Promise<ConflictCheckResult> {
  const taggedMessages: Array<{ msg: ParsedMessage; source: string }> = []
  const conflicts: MergeConflict[] = []

  console.log('[Merger] checkConflictsWithTempDb: 开始检测冲突')
  console.log(
    '[Merger] 文件列表:',
    filePaths.map((p) => path.basename(p))
  )
  console.log(
    '[Merger] 临时数据库缓存状态:',
    filePaths.map((p) => `${path.basename(p)}: ${tempDbCache.has(p) ? '已缓存' : '未缓存'}`)
  )

  // Every reader opened here is closed in the finally block, even on failure.
  const openReaders: TempDbReader[] = []

  try {
    for (const filePath of filePaths) {
      const sourceName = path.basename(filePath)
      const tempDbPath = tempDbCache.get(filePath)
      if (!tempDbPath) {
        throw new Error(`未找到文件的临时数据库: ${sourceName}`)
      }

      const reader = new TempDbReader(tempDbPath)
      openReaders.push(reader)

      const platformLabel = reader.getMeta()?.platform
      console.log(`[Merger] 从临时数据库读取: ${sourceName}, 平台: ${platformLabel}`)

      // Read in batches; note that all messages still end up in taggedMessages.
      reader.streamMessages(10000, (batch) => {
        batch.forEach((msg) => {
          taggedMessages.push({ msg, source: sourceName })
        })
      })
    }

    console.log(`[Merger] 总消息数: ${taggedMessages.length}`)

    // All inputs must come from the same platform/format.
    const distinctPlatforms = Array.from(new Set(openReaders.map((r) => r.getMeta()?.platform || 'unknown')))
    if (distinctPlatforms.length > 1) {
      throw new Error(
        `不支持合并不同格式的聊天记录。\n检测到的格式:${distinctPlatforms.join('、')}\n请确保所有文件使用相同的导出工具和格式。`
      )
    }
    console.log('[Merger] 格式检查通过:', distinctPlatforms[0])

    return detectConflictsInMessages(taggedMessages, conflicts)
  } finally {
    openReaders.forEach((reader) => {
      reader.close()
    })
  }
}
/**
 * Merges several chat exports into a single ChatLab JSON file, reading every
 * input from its temporary database instead of re-parsing the source file.
 *
 * Steps: open one reader per file, union the members (additional names become
 * aliases), stream messages in batches while dropping duplicates by
 * `getMessageKey`, sort by timestamp, write the JSON file, and optionally
 * import the merged data into the analysis database.
 *
 * @param params      Merge request (input paths, output name/dir, analyze flag).
 * @param tempDbCache Map from source file path to its temporary database path.
 * @returns `{ success: true, outputPath, sessionId }` on success (`sessionId` is
 *          only set when `andAnalyze` is truthy); otherwise `{ success: false, error }`.
 *          Errors raised while merging are caught and reported in the result.
 */
export async function mergeFilesWithTempDb(
  params: MergeParams,
  tempDbCache: Map<string, string>
): Promise<MergeResult> {
  // NOTE(review): params.conflictResolutions is not consulted anywhere in this
  // function. Duplicates are resolved purely by first-seen order (file order,
  // then reader order within a file). Confirm whether the user's conflict
  // choices are supposed to be applied here.
  const { filePaths, outputName, outputDir, andAnalyze } = params

  console.log('[Merger] mergeFilesWithTempDb: 开始合并')
  console.log(
    '[Merger] 临时数据库缓存状态:',
    filePaths.map((p) => `${path.basename(p)}: ${tempDbCache.has(p) ? '已缓存' : '未缓存'}`)
  )

  const readers: TempDbReader[] = []

  try {
    // Report an explicit error for an empty selection; previously this only
    // failed later with a TypeError when reading parseResults[0].
    if (filePaths.length === 0) {
      throw new Error('没有可合并的文件')
    }

    // Open every temporary database and load its meta + member list
    const parseResults: Array<{ meta: ParsedMeta; members: ParsedMember[]; source: string; reader: TempDbReader }> = []

    for (const filePath of filePaths) {
      const tempDbPath = tempDbCache.get(filePath)
      if (!tempDbPath) {
        throw new Error(`未找到文件的临时数据库: ${path.basename(filePath)}`)
      }

      const reader = new TempDbReader(tempDbPath)
      readers.push(reader)

      const meta = reader.getMeta()
      if (!meta) {
        throw new Error(`无法读取元信息: ${path.basename(filePath)}`)
      }

      const members = reader.getMembers()
      const sourceName = path.basename(filePath)

      console.log(`[Merger] 使用临时数据库: ${sourceName}`)
      parseResults.push({ meta, members, source: sourceName, reader })
    }

    // Union members by platformId; a differing name is recorded as an alias
    const memberMap = new Map<string, ChatLabMember>()
    for (const { members } of parseResults) {
      for (const member of members) {
        const existing = memberMap.get(member.platformId)
        if (existing) {
          if (existing.name !== member.name && !existing.aliases?.includes(member.name)) {
            existing.aliases = existing.aliases || []
            existing.aliases.push(member.name)
          }
        } else {
          memberMap.set(member.platformId, {
            platformId: member.platformId,
            name: member.name,
          })
        }
      }
    }

    // Stream messages and de-duplicate: the first message seen for a key wins
    const seenKeys = new Set<string>()
    const mergedMessages: ChatLabMessage[] = []
    const startTime = Date.now()

    for (const { reader, source } of parseResults) {
      const readerStartTime = Date.now()
      let readerCount = 0

      reader.streamMessages(10000, (messages) => {
        for (const msg of messages) {
          const key = getMessageKey(msg)

          // Skip anything whose key has already been emitted
          if (seenKeys.has(key)) {
            continue
          }
          seenKeys.add(key)

          mergedMessages.push({
            sender: msg.senderPlatformId,
            name: msg.senderName,
            timestamp: msg.timestamp,
            type: msg.type,
            content: msg.content,
          })
          readerCount++
        }
      })

      console.log(`[Merger] 处理 ${source}: ${readerCount} 条唯一消息, 耗时: ${Date.now() - readerStartTime}ms`)
    }

    // Sort chronologically
    const sortStartTime = Date.now()
    mergedMessages.sort((a, b) => a.timestamp - b.timestamp)
    console.log(`[Merger] 排序耗时: ${Date.now() - sortStartTime}ms`)
    console.log(`[Merger] 合并后消息数: ${mergedMessages.length}`)

    // Single platform is kept as-is; anything else is labelled 'mixed'
    const platforms = new Set(parseResults.map((r) => r.meta.platform))
    const platform = platforms.size === 1 ? parseResults[0].meta.platform : 'mixed'

    // Per-input provenance (message counts are the raw, pre-dedup totals)
    const sources: MergeSource[] = parseResults.map(({ reader, source, meta }) => ({
      filename: source,
      platform: meta.platform,
      messageCount: reader.getMessageCount(),
    }))

    // Assemble the ChatLab document
    const chatLabData: ChatLabFormat = {
      chatlab: {
        version: '1.0.0',
        exportedAt: Math.floor(Date.now() / 1000),
        generator: 'ChatLab Merge Tool',
      },
      meta: {
        name: outputName,
        platform: platform as ChatPlatform,
        type: parseResults[0].meta.type as ChatType,
        sources,
      },
      members: Array.from(memberMap.values()),
      messages: mergedMessages,
    }

    // Write compact (unformatted) JSON to keep serialization fast
    const targetDir = outputDir || getDefaultOutputDir()
    ensureOutputDir(targetDir)
    const filename = generateOutputFilename(outputName)
    const outputPath = path.join(targetDir, filename)

    const writeStartTime = Date.now()
    fs.writeFileSync(outputPath, JSON.stringify(chatLabData), 'utf-8')
    console.log(`[Merger] 写入文件耗时: ${Date.now() - writeStartTime}ms`)
    console.log(`[Merger] 总合并耗时: ${Date.now() - startTime}ms`)

    // Optionally import the merged result so it can be analysed right away
    let sessionId: string | undefined
    if (andAnalyze) {
      const importStartTime = Date.now()
      const parseResult: ParseResult = {
        meta: {
          name: chatLabData.meta.name,
          platform: chatLabData.meta.platform,
          type: chatLabData.meta.type,
        },
        members: chatLabData.members.map((m) => ({
          platformId: m.platformId,
          name: m.name,
        })),
        messages: chatLabData.messages.map((msg) => ({
          senderPlatformId: msg.sender,
          senderName: msg.name,
          timestamp: msg.timestamp,
          type: msg.type,
          content: msg.content,
        })),
      }
      sessionId = importData(parseResult)
      console.log(`[Merger] 导入数据库耗时: ${Date.now() - importStartTime}ms`)
    }

    return {
      success: true,
      outputPath,
      sessionId,
    }
  } catch (err) {
    return {
      success: false,
      error: err instanceof Error ? err.message : '合并失败',
    }
  } finally {
    // Always release the SQLite handles
    for (const reader of readers) {
      reader.close()
    }
  }
}
+351
View File
@@ -0,0 +1,351 @@
/**
* 临时数据库缓存管理器
* 用于合并功能:将解析结果存入临时 SQLite 数据库,避免内存溢出
*/
import Database from 'better-sqlite3'
import * as fs from 'fs'
import * as path from 'path'
import { app } from 'electron'
import type { ParseResult, ParsedMeta, ParsedMember, ParsedMessage } from '../../../src/types/chat'
// Memoized temp-database directory; stays null until getTempDir() first runs
let tempDir: string | null = null

/**
 * Resolves the directory that holds temporary merge databases and creates it
 * when missing. The result is cached in `tempDir` for subsequent calls.
 *
 * Uses `<documents>/ChatLab/temp`; if Electron cannot provide the documents
 * path, falls back to `<cwd>/temp`.
 *
 * @returns Path of the temp directory.
 */
function getTempDir(): string {
  if (tempDir) return tempDir

  let resolved: string
  try {
    resolved = path.join(app.getPath('documents'), 'ChatLab', 'temp')
  } catch (error) {
    console.error('[TempCache] Error getting documents path:', error)
    resolved = path.join(process.cwd(), 'temp')
  }
  tempDir = resolved

  // Create the directory on first use
  if (!fs.existsSync(resolved)) {
    fs.mkdirSync(resolved, { recursive: true })
  }

  return resolved
}
/**
 * Builds a unique temp-database path for a source file:
 * `<tempDir>/merge_<sanitized base name>_<timestamp>_<random>.db`.
 *
 * @param sourceFilePath Path of the chat export being parsed.
 * @returns Path inside the temp directory; the file itself is not created here.
 */
export function generateTempDbPath(sourceFilePath: string): string {
  const stem = path.basename(sourceFilePath, path.extname(sourceFilePath))
  // Replace characters that are unsafe in file names and cap the length at 50
  const sanitizedStem = stem.replace(/[/\\?%*:|"<>]/g, '_').substring(0, 50)
  const uniqueSuffix = `${Date.now()}_${Math.random().toString(36).substring(2, 8)}`
  return path.join(getTempDir(), `merge_${sanitizedStem}_${uniqueSuffix}.db`)
}
/**
 * Opens (creating if necessary) a temporary SQLite database at `dbPath` and
 * ensures the `meta`, `member` and `message` tables plus the two message
 * indexes exist. The connection uses WAL journaling with `synchronous = NORMAL`.
 *
 * @param dbPath File path of the temporary database.
 * @returns The open connection; it is not closed by this function.
 */
export function createTempDatabase(dbPath: string): Database.Database {
  // Schema text is kept verbatim; every statement is idempotent (IF NOT EXISTS)
  const schemaSql = `
CREATE TABLE IF NOT EXISTS meta (
name TEXT NOT NULL,
platform TEXT NOT NULL,
type TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS member (
platform_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
nickname TEXT
);
CREATE TABLE IF NOT EXISTS message (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sender_platform_id TEXT NOT NULL,
sender_name TEXT NOT NULL,
timestamp INTEGER NOT NULL,
type INTEGER NOT NULL,
content TEXT
);
CREATE INDEX IF NOT EXISTS idx_message_ts ON message(timestamp);
CREATE INDEX IF NOT EXISTS idx_message_sender ON message(sender_platform_id);
`

  const connection = new Database(dbPath)
  connection.pragma('journal_mode = WAL')
  connection.pragma('synchronous = NORMAL')
  connection.exec(schemaSql)
  return connection
}
/**
 * Writer for a temporary merge database.
 *
 * The constructor creates the database and opens one transaction; all
 * `write*` calls run inside it. Call `finish()` to commit or `abort()` to
 * roll back. Either call closes the connection, and closing is idempotent so
 * `abort()` may safely follow a failed `finish()`.
 */
export class TempDbWriter {
  private db: Database.Database
  private insertMeta: Database.Statement
  private insertMember: Database.Statement
  private insertMessage: Database.Statement
  // platformIds already inserted, so each member row is written only once
  private memberSet: Set<string> = new Set()
  private messageCount: number = 0
  // True once the connection has been closed by finish() or abort()
  private closed = false

  /**
   * @param dbPath Path of the temporary database file to create/open.
   */
  constructor(dbPath: string) {
    this.db = createTempDatabase(dbPath)

    // Prepared statements reused for every row
    this.insertMeta = this.db.prepare(`
INSERT INTO meta (name, platform, type) VALUES (?, ?, ?)
`)
    this.insertMember = this.db.prepare(`
INSERT OR IGNORE INTO member (platform_id, name, nickname) VALUES (?, ?, ?)
`)
    this.insertMessage = this.db.prepare(`
INSERT INTO message (sender_platform_id, sender_name, timestamp, type, content)
VALUES (?, ?, ?, ?, ?)
`)

    // One transaction spans the whole write session
    this.db.exec('BEGIN TRANSACTION')
  }

  /**
   * Inserts the chat meta row.
   */
  writeMeta(meta: ParsedMeta): void {
    this.insertMeta.run(meta.name, meta.platform, meta.type)
  }

  /**
   * Inserts members that have not been written yet (keyed by platformId).
   */
  writeMembers(members: ParsedMember[]): void {
    for (const m of members) {
      if (!this.memberSet.has(m.platformId)) {
        this.memberSet.add(m.platformId)
        this.insertMember.run(m.platformId, m.name, m.nickname || null)
      }
    }
  }

  /**
   * Inserts a batch of messages, creating a member row for any sender that
   * has not been seen before.
   */
  writeMessages(messages: ParsedMessage[]): void {
    for (const msg of messages) {
      // Make sure the sender exists in the member table
      if (!this.memberSet.has(msg.senderPlatformId)) {
        this.memberSet.add(msg.senderPlatformId)
        this.insertMember.run(msg.senderPlatformId, msg.senderName, null)
      }

      this.insertMessage.run(msg.senderPlatformId, msg.senderName, msg.timestamp, msg.type, msg.content || null)
      this.messageCount++
    }
  }

  /**
   * Commits the transaction and closes the database.
   *
   * @returns Number of messages written and number of distinct members seen.
   * @throws Whatever COMMIT throws; the connection is closed in that case too.
   */
  finish(): { messageCount: number; memberCount: number } {
    try {
      this.db.exec('COMMIT')
      return {
        messageCount: this.messageCount,
        memberCount: this.memberSet.size,
      }
    } finally {
      // Close even when COMMIT fails so the file handle is never leaked
      this.closeOnce()
    }
  }

  /**
   * Rolls back the transaction and closes the database.
   * Does nothing if the connection has already been closed.
   */
  abort(): void {
    if (this.closed) return
    try {
      this.db.exec('ROLLBACK')
    } catch {
      // Best effort: there may be no active transaction to roll back
    }
    this.closeOnce()
  }

  /**
   * Closes the connection exactly once.
   */
  private closeOnce(): void {
    if (this.closed) return
    this.closed = true
    this.db.close()
  }
}
/** Row shape selected from the `message` table of a temporary database. */
interface TempMessageRow {
  id: number
  sender_platform_id: string
  sender_name: string
  timestamp: number
  type: number
  content: string | null
}

/** Converts a raw `message` row into a ParsedMessage (NULL/empty content becomes undefined). */
function rowToParsedMessage(row: Omit<TempMessageRow, 'id'>): ParsedMessage {
  return {
    senderPlatformId: row.sender_platform_id,
    senderName: row.sender_name,
    timestamp: row.timestamp,
    type: row.type,
    content: row.content || undefined,
  }
}

/**
 * Read-only accessor for a temporary merge database.
 * Call `close()` when done to release the SQLite handle.
 */
export class TempDbReader {
  private db: Database.Database
  private dbPath: string

  /**
   * @param dbPath Path of an existing temporary database; opened read-only.
   */
  constructor(dbPath: string) {
    this.dbPath = dbPath
    this.db = new Database(dbPath, { readonly: true })
    this.db.pragma('journal_mode = WAL')
  }

  /**
   * Reads the chat meta row.
   *
   * @returns The first meta row, or null when the table is empty.
   */
  getMeta(): ParsedMeta | null {
    const row = this.db.prepare('SELECT * FROM meta LIMIT 1').get() as
      | { name: string; platform: string; type: string }
      | undefined
    if (!row) return null
    return {
      name: row.name,
      platform: row.platform,
      type: row.type as 'group' | 'private',
    }
  }

  /**
   * Reads every member row.
   */
  getMembers(): ParsedMember[] {
    const rows = this.db.prepare('SELECT * FROM member').all() as Array<{
      platform_id: string
      name: string
      nickname: string | null
    }>
    return rows.map((r) => ({
      platformId: r.platform_id,
      name: r.name,
      nickname: r.nickname || undefined,
    }))
  }

  /**
   * Counts the rows in the message table.
   */
  getMessageCount(): number {
    const row = this.db.prepare('SELECT COUNT(*) as count FROM message').get() as { count: number }
    return row.count
  }

  /**
   * Reads all messages in batches, ordered by timestamp (ties broken by row id).
   *
   * Pages are fetched with keyset pagination on `(timestamp, id)` rather than
   * OFFSET. OFFSET makes SQLite step over every previously returned row on
   * each page, and without a tie-breaker rows sharing a timestamp have no
   * guaranteed position across pages. Each page is a completed query, so the
   * callback runs while no statement is active on the connection.
   *
   * @param batchSize Maximum number of messages per callback; must be a positive integer.
   * @param callback  Invoked once per non-empty batch, in order.
   * @throws RangeError when `batchSize` is not a positive integer.
   */
  streamMessages(batchSize: number, callback: (messages: ParsedMessage[]) => void): void {
    // A zero limit would silently yield nothing and a negative one means "no limit" in SQLite
    if (!Number.isInteger(batchSize) || batchSize <= 0) {
      throw new RangeError(`batchSize must be a positive integer, got ${batchSize}`)
    }

    const firstPage = this.db.prepare(`
      SELECT id, sender_platform_id, sender_name, timestamp, type, content
      FROM message
      ORDER BY timestamp ASC, id ASC
      LIMIT ?
    `)
    const nextPage = this.db.prepare(`
      SELECT id, sender_platform_id, sender_name, timestamp, type, content
      FROM message
      WHERE (timestamp, id) > (?, ?)
      ORDER BY timestamp ASC, id ASC
      LIMIT ?
    `)

    let rows = firstPage.all(batchSize) as TempMessageRow[]
    while (rows.length > 0) {
      callback(rows.map(rowToParsedMessage))

      // A short page means the table is exhausted
      if (rows.length < batchSize) break

      // Resume strictly after the last row that was delivered
      const last = rows[rows.length - 1]
      rows = nextPage.all(last.timestamp, last.id, batchSize) as TempMessageRow[]
    }
  }

  /**
   * Loads every message into memory, ordered by timestamp (ties broken by row id).
   * For very large files prefer `streamMessages`.
   */
  getAllMessages(): ParsedMessage[] {
    const rows = this.db
      .prepare(
        `
        SELECT sender_platform_id, sender_name, timestamp, type, content
        FROM message
        ORDER BY timestamp ASC, id ASC
      `
      )
      .all() as Array<Omit<TempMessageRow, 'id'>>

    return rows.map(rowToParsedMessage)
  }

  /**
   * Closes the database connection.
   */
  close(): void {
    this.db.close()
  }

  /**
   * Returns the path this reader was opened with.
   */
  getPath(): string {
    return this.dbPath
  }
}
/**
 * Removes a temporary database file together with its `-wal` and `-shm`
 * companions. Missing files are skipped; any failure is logged, not thrown.
 *
 * @param dbPath Path of the main `.db` file.
 */
export function deleteTempDatabase(dbPath: string): void {
  try {
    // Main file first, then the WAL and shared-memory files
    for (const suffix of ['', '-wal', '-shm']) {
      const target = dbPath + suffix
      if (fs.existsSync(target)) {
        fs.unlinkSync(target)
      }
    }

    console.log(`[TempCache] 已删除临时数据库: ${dbPath}`)
  } catch (error) {
    console.error(`[TempCache] 删除临时数据库失败: ${dbPath}`, error)
  }
}
/**
 * Deletes every leftover temporary merge database in the temp directory.
 * Called at application start-up.
 *
 * Besides `merge_*.db` files this also picks up orphaned `merge_*.db-wal` and
 * `merge_*.db-shm` files whose main database is already gone; matching only
 * `.db` names would leave those behind forever. Failures are logged, not thrown.
 */
export function cleanupAllTempDatabases(): void {
  try {
    const dir = getTempDir()
    if (!fs.existsSync(dir)) return

    // Collect each database once, whether we saw the .db file or only a sidecar
    const dbPaths = new Set<string>()
    for (const file of fs.readdirSync(dir)) {
      if (!file.startsWith('merge_')) continue
      const dbFileName = file.replace(/-(?:wal|shm)$/, '')
      if (dbFileName.endsWith('.db')) {
        dbPaths.add(path.join(dir, dbFileName))
      }
    }

    // deleteTempDatabase removes the .db file and both sidecars, skipping missing ones
    for (const dbPath of dbPaths) {
      deleteTempDatabase(dbPath)
    }

    console.log('[TempCache] 已清理所有临时数据库')
  } catch (error) {
    console.error('[TempCache] 清理临时数据库失败:', error)
  }
}
+112
View File
@@ -0,0 +1,112 @@
/**
* 性能日志模块
* 实时记录导入过程的性能指标
*/
import * as fs from 'fs'
import * as path from 'path'
import { getDbDir } from './dbCore'
// Module-level state shared by the performance-logging functions
// Time of the previous logPerf call (or of module load / last reset), in ms since epoch
let lastLogTime = Date.now()
// Message total reported by the previous logPerf call
let lastMessageCount = 0
// Log file currently being appended to; null until initPerfLog succeeds or after a reset
let currentLogFile: string | null = null
/**
 * Returns the performance-log directory: a `logs` folder that sits next to
 * the directory reported by `getDbDir()`. The folder is created if missing.
 */
function getLogDir(): string {
  const logsPath = path.join(path.dirname(getDbDir()), 'logs')
  if (!fs.existsSync(logsPath)) {
    fs.mkdirSync(logsPath, { recursive: true })
  }
  return logsPath
}
/**
 * Starts a new log file named `import_<sessionId>_<timestamp>.log` in the log
 * directory and writes a header to it. Failures are logged to the console
 * and otherwise ignored.
 *
 * @param sessionId Identifier of the import session, embedded in the file name.
 */
export function initPerfLog(sessionId: string): void {
  try {
    const fileName = `import_${sessionId}_${Date.now()}.log`
    currentLogFile = path.join(getLogDir(), fileName)

    // Header block
    const header = `=== 导入性能日志 ===\n开始时间: ${new Date().toISOString()}\n\n`
    fs.writeFileSync(currentLogFile, header, 'utf-8')
    console.log(`[PerfLog] 日志文件: ${currentLogFile}`)
  } catch (e) {
    console.error('[PerfLog] 初始化日志文件失败:', e)
  }
}
/**
 * Records one performance sample: prints it to the console and, when a log
 * file is active, appends it there.
 *
 * Elapsed time and throughput are measured relative to the previous call
 * (or to the last reset / module load).
 *
 * @param event             Label for this sample.
 * @param messagesProcessed Running total of processed messages.
 * @param batchSize         Optional batch size; appended to the line when truthy.
 */
export function logPerf(event: string, messagesProcessed: number, batchSize?: number): void {
  const now = Date.now()
  const elapsedMs = now - lastLogTime
  const processedSinceLast = messagesProcessed - lastMessageCount
  // Messages per second since the previous sample; 0 when no time has elapsed
  const perSecond = elapsedMs > 0 ? Math.round((processedSinceLast / elapsedMs) * 1000) : 0

  // Heap usage in MB; stays 0 if it cannot be read
  let heapMb = 0
  try {
    heapMb = Math.round(process.memoryUsage().heapUsed / 1024 / 1024)
  } catch {
    // ignore
  }

  const fields = [
    `[${new Date().toISOString()}] ${event}`,
    `消息: ${messagesProcessed.toLocaleString()}`,
    `耗时: ${elapsedMs}ms`,
    `速度: ${perSecond.toLocaleString()}/秒`,
    `内存: ${heapMb}MB`,
  ]
  if (batchSize) {
    fields.push(`批次: ${batchSize}`)
  }
  const logLine = fields.join(' | ') + '\n'

  // Console output
  console.log(`[PerfLog] ${logLine.trim()}`)

  // Append to the active log file, if any
  if (currentLogFile) {
    try {
      fs.appendFileSync(currentLogFile, logLine, 'utf-8')
    } catch (e) {
      console.error('[PerfLog] 写入日志失败:', e)
    }
  }

  // Remember this sample as the baseline for the next one
  lastMessageCount = messagesProcessed
  lastLogTime = now
}
/**
 * Appends an indented detail line to the active log file.
 * Does nothing when no log file is active; write errors are ignored.
 *
 * @param detail Text to append (a two-space indent and a newline are added).
 */
export function logPerfDetail(detail: string): void {
  if (!currentLogFile) return

  try {
    fs.appendFileSync(currentLogFile, `  ${detail}\n`, 'utf-8')
  } catch {
    // ignore
  }
}
/**
 * Clears the logging state: detaches the current log file and restarts the
 * time and message-count baselines.
 */
export function resetPerfLog(): void {
  currentLogFile = null
  lastMessageCount = 0
  lastLogTime = Date.now()
}
/**
 * Returns the path of the log file currently being written.
 *
 * @returns The active log file path, or null when none is active.
 */
export function getCurrentLogFile(): string | null {
  const activeLogFile = currentLogFile
  return activeLogFile
}
+361 -73
View File
@@ -16,7 +16,6 @@ import {
type ParsedMeta, type ParsedMeta,
type ParsedMember, type ParsedMember,
type ParsedMessage, type ParsedMessage,
getFileSize,
} from '../parser' } from '../parser'
/** 流式导入结果 */ /** 流式导入结果 */
@@ -26,6 +25,70 @@ export interface StreamImportResult {
error?: string error?: string
} }
import { getDbDir } from './dbCore' import { getDbDir } from './dbCore'
import { initPerfLog, logPerf, logPerfDetail, resetPerfLog, getCurrentLogFile } from './perfLogger'
// ==================== 临时数据库相关(用于合并功能) ====================
/**
 * Returns the temp-database directory used in the worker: a `temp` folder
 * next to the directory reported by `getDbDir()`. Created if missing.
 *
 * NOTE(review): the main-process helper in merger/tempCache resolves its
 * directory from `app.getPath('documents')` instead; confirm both end up at
 * the same folder, otherwise start-up cleanup will not see these files.
 */
function getTempDir(): string {
  const tempPath = path.join(path.dirname(getDbDir()), 'temp')
  if (!fs.existsSync(tempPath)) {
    fs.mkdirSync(tempPath, { recursive: true })
  }
  return tempPath
}
/**
 * Builds a unique temp-database path for a source file:
 * `<tempDir>/merge_<sanitized base name>_<timestamp>_<random>.db`.
 *
 * @param sourceFilePath Path of the chat export being parsed.
 * @returns Path inside the temp directory; the file itself is not created here.
 */
function generateTempDbPath(sourceFilePath: string): string {
  const stem = path.basename(sourceFilePath, path.extname(sourceFilePath))
  // Replace characters that are unsafe in file names and cap the length at 50
  const sanitizedStem = stem.replace(/[/\\?%*:|"<>]/g, '_').substring(0, 50)
  const uniqueSuffix = `${Date.now()}_${Math.random().toString(36).substring(2, 8)}`
  return path.join(getTempDir(), `merge_${sanitizedStem}_${uniqueSuffix}.db`)
}
/**
 * Opens (creating if necessary) a temporary SQLite database at `dbPath` and
 * ensures the `meta`, `member` and `message` tables plus the two message
 * indexes exist. The connection uses WAL journaling with `synchronous = NORMAL`.
 *
 * @param dbPath File path of the temporary database.
 * @returns The open connection; it is not closed by this function.
 */
function createTempDatabase(dbPath: string): Database.Database {
  // Schema text is kept verbatim; every statement is idempotent (IF NOT EXISTS)
  const schemaSql = `
CREATE TABLE IF NOT EXISTS meta (
name TEXT NOT NULL,
platform TEXT NOT NULL,
type TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS member (
platform_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
nickname TEXT
);
CREATE TABLE IF NOT EXISTS message (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sender_platform_id TEXT NOT NULL,
sender_name TEXT NOT NULL,
timestamp INTEGER NOT NULL,
type INTEGER NOT NULL,
content TEXT
);
CREATE INDEX IF NOT EXISTS idx_message_ts ON message(timestamp);
CREATE INDEX IF NOT EXISTS idx_message_sender ON message(sender_platform_id);
`

  const connection = new Database(dbPath)
  connection.pragma('journal_mode = WAL')
  connection.pragma('synchronous = NORMAL')
  connection.exec(schemaSql)
  return connection
}
/** /**
* 发送进度到主进程 * 发送进度到主进程
@@ -55,9 +118,9 @@ function getDbPath(sessionId: string): string {
} }
/** /**
* 创建数据库并初始化表结构 * 创建数据库并初始化表结构(不含索引,用于快速导入)
*/ */
function createDatabase(sessionId: string): Database.Database { function createDatabaseWithoutIndexes(sessionId: string): Database.Database {
const dbDir = getDbDir() const dbDir = getDbDir()
if (!fs.existsSync(dbDir)) { if (!fs.existsSync(dbDir)) {
fs.mkdirSync(dbDir, { recursive: true }) fs.mkdirSync(dbDir, { recursive: true })
@@ -68,7 +131,10 @@ function createDatabase(sessionId: string): Database.Database {
db.pragma('journal_mode = WAL') db.pragma('journal_mode = WAL')
db.pragma('synchronous = NORMAL') db.pragma('synchronous = NORMAL')
// 增加缓存大小以提高写入性能
db.pragma('cache_size = -64000') // 64MB 缓存
// 创建表结构(不创建索引,导入完成后再创建)
db.exec(` db.exec(`
CREATE TABLE IF NOT EXISTS meta ( CREATE TABLE IF NOT EXISTS meta (
name TEXT NOT NULL, name TEXT NOT NULL,
@@ -101,13 +167,25 @@ function createDatabase(sessionId: string): Database.Database {
content TEXT, content TEXT,
FOREIGN KEY(sender_id) REFERENCES member(id) FOREIGN KEY(sender_id) REFERENCES member(id)
); );
`)
return db
}
/**
* 导入完成后创建索引
*/
function createIndexes(db: Database.Database): void {
console.log('[StreamImport] 开始创建索引...')
const startTime = Date.now()
db.exec(`
CREATE INDEX IF NOT EXISTS idx_message_ts ON message(ts); CREATE INDEX IF NOT EXISTS idx_message_ts ON message(ts);
CREATE INDEX IF NOT EXISTS idx_message_sender ON message(sender_id); CREATE INDEX IF NOT EXISTS idx_message_sender ON message(sender_id);
CREATE INDEX IF NOT EXISTS idx_member_name_history_member_id ON member_name_history(member_id); CREATE INDEX IF NOT EXISTS idx_member_name_history_member_id ON member_name_history(member_id);
`) `)
return db console.log(`[StreamImport] 索引创建完成,耗时: ${Date.now() - startTime}ms`)
} }
/** /**
@@ -124,6 +202,12 @@ export async function streamImport(filePath: string, requestId: string): Promise
console.log(`[StreamImport] 开始导入: ${filePath}, 格式: ${formatFeature.name}`) console.log(`[StreamImport] 开始导入: ${filePath}, 格式: ${formatFeature.name}`)
// 初始化性能日志(实时写入文件)
resetPerfLog()
const sessionId = generateSessionId()
initPerfLog(sessionId)
logPerf('开始导入', 0)
// 预处理:如果格式需要且文件较大,先精简 // 预处理:如果格式需要且文件较大,先精简
let actualFilePath = filePath let actualFilePath = filePath
let tempFilePath: string | null = null let tempFilePath: string | null = null
@@ -158,8 +242,7 @@ export async function streamImport(filePath: string, requestId: string): Promise
} }
} }
const sessionId = generateSessionId() const db = createDatabaseWithoutIndexes(sessionId)
const db = createDatabase(sessionId)
// 准备语句 // 准备语句
const insertMeta = db.prepare(` const insertMeta = db.prepare(`
@@ -176,19 +259,79 @@ export async function streamImport(filePath: string, requestId: string): Promise
INSERT INTO member_name_history (member_id, name, start_ts, end_ts) VALUES (?, ?, ?, ?) INSERT INTO member_name_history (member_id, name, start_ts, end_ts) VALUES (?, ?, ?, ?)
`) `)
const updateMemberName = db.prepare(`UPDATE member SET name = ? WHERE platform_id = ?`) const updateMemberName = db.prepare(`UPDATE member SET name = ? WHERE platform_id = ?`)
const updateNameHistoryEndTs = db.prepare(`
UPDATE member_name_history SET end_ts = ? WHERE member_id = ? AND end_ts IS NULL
`)
// 成员ID映射(platformId -> dbId // 成员ID映射(platformId -> dbId
const memberIdMap = new Map<string, number>() const memberIdMap = new Map<string, number>()
// 昵称追踪器 // 昵称追踪器(收集所有变化,最后批量写入)
const nicknameTracker = new Map<string, { currentName: string; lastSeenTs: number }>() const nicknameTracker = new Map<
string,
{
currentName: string
lastSeenTs: number
history: Array<{ name: string; startTs: number }>
}
>()
// 是否已插入 meta // 是否已插入 meta
let metaInserted = false let metaInserted = false
// 开始事务(整个导入作为一个大事务,提高性能 // 分批提交配置(每 50000 条消息提交一次
db.exec('BEGIN TRANSACTION') const BATCH_COMMIT_SIZE = 50000
// WAL checkpoint 间隔(每 200000 条执行一次 checkpoint)
const CHECKPOINT_INTERVAL = 200000
let messageCountInBatch = 0
let totalMessageCount = 0
let lastCheckpointCount = 0
let inTransaction = false
// Open a transaction unless one is already active, so repeated calls are harmless.
// Keeps the `inTransaction` flag in sync with the statement that was issued.
const beginTransaction = (): void => {
  if (inTransaction) return
  db.exec('BEGIN TRANSACTION')
  inTransaction = true
}
// Issue the `wal_checkpoint(TRUNCATE)` pragma on the import database.
// Best-effort: a failure is reported as a warning and never rethrown,
// so a failed checkpoint does not abort the caller.
const doCheckpoint = (): void => {
  try {
    db.pragma('wal_checkpoint(TRUNCATE)')
    const doneMessage = `[StreamImport] WAL checkpoint 完成,累计 ${totalMessageCount}`
    console.log(doneMessage)
  } catch (checkpointError) {
    console.warn('[StreamImport] WAL checkpoint 失败:', checkpointError)
  }
}
// Commit the open transaction (if any) and immediately start a fresh one.
// After a commit it also: records a perf entry, runs a WAL checkpoint once at
// least CHECKPOINT_INTERVAL messages have accumulated since the previous one,
// and reports write progress to the requester.
// When no transaction is open, it only opens a new one.
const commitAndBeginNew = (): void => {
  if (!inTransaction) {
    beginTransaction()
    return
  }

  db.exec('COMMIT')
  inTransaction = false

  // Perf log entry for this batch commit
  logPerf(`提交事务`, totalMessageCount, BATCH_COMMIT_SIZE)

  // Periodic checkpoint so the WAL file does not keep growing between commits
  const messagesSinceCheckpoint = totalMessageCount - lastCheckpointCount
  if (messagesSinceCheckpoint >= CHECKPOINT_INTERVAL) {
    doCheckpoint()
    logPerf('WAL checkpoint', totalMessageCount)
    lastCheckpointCount = totalMessageCount
  }

  console.log(`[StreamImport] 已提交事务,累计 ${totalMessageCount} 条消息`)

  // Report write progress; byte counters are not tracked at this stage
  const progressMessage = `正在写入数据库... 已处理 ${totalMessageCount.toLocaleString()}`
  sendProgress(requestId, {
    stage: 'importing',
    bytesRead: 0,
    totalBytes: 0,
    messagesProcessed: totalMessageCount,
    percentage: 100,
    message: progressMessage,
  })

  beginTransaction()
}
beginTransaction()
try { try {
await streamParseFile(actualFilePath, { await streamParseFile(actualFilePath, {
@@ -219,22 +362,29 @@ export async function streamImport(filePath: string, requestId: string): Promise
}, },
onMessageBatch: (messages: ParsedMessage[]) => { onMessageBatch: (messages: ParsedMessage[]) => {
// 分阶段计时
let memberLookupTime = 0
let memberInsertTime = 0
let messageInsertTime = 0
let nicknameTrackTime = 0
let memberLookupCount = 0
let memberInsertCount = 0
let nicknameChangeCount = 0
for (const msg of messages) { for (const msg of messages) {
// 数据验证:跳过无效消息 // 数据验证:跳过无效消息
if (!msg.senderPlatformId || !msg.senderName) { if (!msg.senderPlatformId || !msg.senderName) {
console.warn('[StreamImport] 跳过无效消息:缺少发送者信息')
continue continue
} }
if (msg.timestamp === undefined || msg.timestamp === null || isNaN(msg.timestamp)) { if (msg.timestamp === undefined || msg.timestamp === null || isNaN(msg.timestamp)) {
console.warn('[StreamImport] 跳过无效消息:缺少时间戳')
continue continue
} }
if (msg.type === undefined || msg.type === null) { if (msg.type === undefined || msg.type === null) {
console.warn('[StreamImport] 跳过无效消息:缺少消息类型')
continue continue
} }
// 确保成员存在 // 确保成员存在
let t0 = Date.now()
if (!memberIdMap.has(msg.senderPlatformId)) { if (!memberIdMap.has(msg.senderPlatformId)) {
const memberName = msg.senderName || msg.senderPlatformId const memberName = msg.senderName || msg.senderPlatformId
insertMember.run(msg.senderPlatformId, memberName, null) insertMember.run(msg.senderPlatformId, memberName, null)
@@ -242,48 +392,146 @@ export async function streamImport(filePath: string, requestId: string): Promise
if (row) { if (row) {
memberIdMap.set(msg.senderPlatformId, row.id) memberIdMap.set(msg.senderPlatformId, row.id)
} }
memberInsertCount++
memberInsertTime += Date.now() - t0
} else {
memberLookupCount++
memberLookupTime += Date.now() - t0
} }
const senderId = memberIdMap.get(msg.senderPlatformId) const senderId = memberIdMap.get(msg.senderPlatformId)
if (senderId === undefined) continue if (senderId === undefined) continue
// 插入消息 // 插入消息
t0 = Date.now()
insertMessage.run(senderId, msg.timestamp, msg.type, msg.content) insertMessage.run(senderId, msg.timestamp, msg.type, msg.content)
messageInsertTime += Date.now() - t0
messageCountInBatch++
totalMessageCount++
// 追踪昵称变化 // 追踪昵称变化(仅记录,不写入数据库,最后批量处理)
t0 = Date.now()
const senderName = msg.senderName || msg.senderPlatformId const senderName = msg.senderName || msg.senderPlatformId
const tracker = nicknameTracker.get(msg.senderPlatformId) const tracker = nicknameTracker.get(msg.senderPlatformId)
if (!tracker) { if (!tracker) {
nicknameTracker.set(msg.senderPlatformId, { nicknameTracker.set(msg.senderPlatformId, {
currentName: senderName, currentName: senderName,
lastSeenTs: msg.timestamp, lastSeenTs: msg.timestamp,
history: [{ name: senderName, startTs: msg.timestamp }],
}) })
insertNameHistory.run(senderId, senderName, msg.timestamp, null) nicknameChangeCount++
} else if (tracker.currentName !== senderName) { } else if (tracker.currentName !== senderName) {
updateNameHistoryEndTs.run(msg.timestamp, senderId) // 记录昵称变化(稍后批量写入)
insertNameHistory.run(senderId, senderName, msg.timestamp, null) tracker.history.push({ name: senderName, startTs: msg.timestamp })
tracker.currentName = senderName tracker.currentName = senderName
tracker.lastSeenTs = msg.timestamp tracker.lastSeenTs = msg.timestamp
nicknameChangeCount++
} else { } else {
tracker.lastSeenTs = msg.timestamp tracker.lastSeenTs = msg.timestamp
} }
nicknameTrackTime += Date.now() - t0
// 分批提交(每 50000 条)
if (messageCountInBatch >= BATCH_COMMIT_SIZE) {
// 记录详细分阶段耗时
const detail =
`[详细] 成员查找: ${memberLookupTime}ms (${memberLookupCount}次) | ` +
`成员插入: ${memberInsertTime}ms (${memberInsertCount}次) | ` +
`消息插入: ${messageInsertTime}ms | ` +
`昵称追踪: ${nicknameTrackTime}ms (变化${nicknameChangeCount}次)`
logPerfDetail(detail)
commitAndBeginNew()
messageCountInBatch = 0
// 重置计时
memberLookupTime = 0
memberInsertTime = 0
messageInsertTime = 0
nicknameTrackTime = 0
memberLookupCount = 0
memberInsertCount = 0
nicknameChangeCount = 0
}
} }
}, },
}) })
// 更新成员的最新昵称 // 提交最后的消息事务
for (const [platformId, tracker] of nicknameTracker.entries()) { if (inTransaction) {
updateMemberName.run(tracker.currentName, platformId) db.exec('COMMIT')
inTransaction = false
} }
// 提交事务 // 批量写入昵称历史(在索引创建前,写入速度更快)
db.exec('COMMIT') sendProgress(requestId, {
stage: 'importing',
bytesRead: 0,
totalBytes: 0,
messagesProcessed: totalMessageCount,
percentage: 100,
message: '正在写入昵称历史...',
})
logPerf('开始写入昵称历史', totalMessageCount)
console.log(`[StreamImport] 导入完成: ${sessionId}`) // 开始新事务
db.exec('BEGIN TRANSACTION')
let historyCount = 0
for (const [platformId, tracker] of nicknameTracker.entries()) {
const senderId = memberIdMap.get(platformId)
if (!senderId) continue
// 写入所有昵称历史
for (let i = 0; i < tracker.history.length; i++) {
const h = tracker.history[i]
const endTs = i < tracker.history.length - 1 ? tracker.history[i + 1].startTs : null
insertNameHistory.run(senderId, h.name, h.startTs, endTs)
historyCount++
}
// 更新成员最新昵称
updateMemberName.run(tracker.currentName, platformId)
}
db.exec('COMMIT')
logPerf(`昵称历史写入完成 (${historyCount}条)`, totalMessageCount)
// 创建索引(导入完成后批量创建,比边导入边更新快很多)
sendProgress(requestId, {
stage: 'importing',
bytesRead: 0,
totalBytes: 0,
messagesProcessed: totalMessageCount,
percentage: 100,
message: '正在创建索引...',
})
logPerf('开始创建索引', totalMessageCount)
createIndexes(db)
logPerf('索引创建完成', totalMessageCount)
// 最终 WAL checkpoint
sendProgress(requestId, {
stage: 'importing',
bytesRead: 0,
totalBytes: 0,
messagesProcessed: totalMessageCount,
percentage: 100,
message: '正在优化数据库...',
})
doCheckpoint()
logPerf('WAL checkpoint 完成', totalMessageCount)
logPerf('导入完成', totalMessageCount)
console.log(`[StreamImport] 导入完成: ${sessionId}, 总消息数: ${totalMessageCount}`)
return { success: true, sessionId } return { success: true, sessionId }
} catch (error) { } catch (error) {
// 回滚事务 // 回滚当前事务
db.exec('ROLLBACK') if (inTransaction) {
try {
db.exec('ROLLBACK')
} catch {
// 忽略回滚错误
}
}
// 删除失败的数据库文件 // 删除失败的数据库文件
const dbPath = getDbPath(sessionId) const dbPath = getDbPath(sessionId)
@@ -316,17 +564,13 @@ export interface StreamParseFileInfoResult {
messageCount: number messageCount: number
memberCount: number memberCount: number
fileSize: number fileSize: number
// 完整解析结果(用于后续合并,避免重复解析 // 临时数据库路径(用于后续合并,避免内存溢出
parseResult: { tempDbPath: string
meta: ParsedMeta
members: ParsedMember[]
messages: ParsedMessage[]
}
} }
/** /**
* 流式解析文件获取基本信息和完整解析结果 * 流式解析文件,写入临时数据库
* 用于合并功能的预览,同时缓存完整结果供后续合并使用 * 用于合并功能:解析结果存入临时 SQLite,避免内存溢出
*/ */
export async function streamParseFileInfo(filePath: string, requestId: string): Promise<StreamParseFileInfoResult> { export async function streamParseFileInfo(filePath: string, requestId: string): Promise<StreamParseFileInfoResult> {
const formatFeature = detectFormat(filePath) const formatFeature = detectFormat(filePath)
@@ -347,51 +591,95 @@ export async function streamParseFileInfo(filePath: string, requestId: string):
message: '正在读取文件...', message: '正在读取文件...',
}) })
// 创建临时数据库
const tempDbPath = generateTempDbPath(filePath)
const db = createTempDatabase(tempDbPath)
// 准备语句
const insertMeta = db.prepare('INSERT INTO meta (name, platform, type) VALUES (?, ?, ?)')
const insertMember = db.prepare('INSERT OR IGNORE INTO member (platform_id, name, nickname) VALUES (?, ?, ?)')
const insertMessage = db.prepare(`
INSERT INTO message (sender_platform_id, sender_name, timestamp, type, content)
VALUES (?, ?, ?, ?, ?)
`)
let meta: ParsedMeta = { name: '未知群聊', platform: formatFeature.platform, type: 'group' } let meta: ParsedMeta = { name: '未知群聊', platform: formatFeature.platform, type: 'group' }
const members: ParsedMember[] = []
const messages: ParsedMessage[] = []
const memberSet = new Set<string>() const memberSet = new Set<string>()
let messageCount = 0
let metaInserted = false
await streamParseFile(filePath, { // 开始事务
// 对于大文件使用更小的批次,以更频繁地更新进度 db.exec('BEGIN TRANSACTION')
batchSize: fileSize > 100 * 1024 * 1024 ? 2000 : 5000,
onProgress: (progress) => { try {
sendProgress(requestId, progress) await streamParseFile(filePath, {
}, // 对于大文件使用更小的批次,以更频繁地更新进度
batchSize: fileSize > 100 * 1024 * 1024 ? 2000 : 5000,
onMeta: (parsedMeta) => { onProgress: (progress) => {
meta = parsedMeta sendProgress(requestId, progress)
}, },
onMembers: (parsedMembers) => { onMeta: (parsedMeta) => {
for (const m of parsedMembers) { meta = parsedMeta
if (!memberSet.has(m.platformId)) { if (!metaInserted) {
memberSet.add(m.platformId) insertMeta.run(parsedMeta.name, parsedMeta.platform, parsedMeta.type)
members.push(m) metaInserted = true
} }
} },
},
onMessageBatch: (batch) => { onMembers: (parsedMembers) => {
messages.push(...batch) for (const m of parsedMembers) {
for (const msg of batch) { if (!memberSet.has(m.platformId)) {
memberSet.add(msg.senderPlatformId) memberSet.add(m.platformId)
} insertMember.run(m.platformId, m.name, m.nickname || null)
}, }
}) }
},
return { onMessageBatch: (batch) => {
name: meta.name, for (const msg of batch) {
format: formatFeature.name, // 确保成员存在
platform: meta.platform, if (!memberSet.has(msg.senderPlatformId)) {
messageCount: messages.length, memberSet.add(msg.senderPlatformId)
memberCount: memberSet.size, insertMember.run(msg.senderPlatformId, msg.senderName, null)
fileSize, }
parseResult: {
meta, insertMessage.run(msg.senderPlatformId, msg.senderName, msg.timestamp, msg.type, msg.content || null)
members, messageCount++
messages, }
}, },
})
// 提交事务
db.exec('COMMIT')
db.close()
console.log(`[StreamImport] 已写入临时数据库: ${tempDbPath}, 消息数: ${messageCount}`)
return {
name: meta.name,
format: formatFeature.name,
platform: meta.platform,
messageCount,
memberCount: memberSet.size,
fileSize,
tempDbPath,
}
} catch (error) {
// 回滚并清理
try {
db.exec('ROLLBACK')
} catch {
// 忽略回滚错误
}
db.close()
// 删除失败的临时数据库
if (fs.existsSync(tempDbPath)) {
fs.unlinkSync(tempDbPath)
}
throw error
} }
} }
+3 -12
View File
@@ -314,7 +314,8 @@ export async function parseFileInfo(filePath: string): Promise<any> {
} }
/** /**
* 流式解析文件获取基本信息和完整解析结果(用于合并预览 * 流式解析文件,写入临时数据库(用于合并功能
* 返回基本信息和临时数据库路径
*/ */
export async function streamParseFileInfo( export async function streamParseFileInfo(
filePath: string, filePath: string,
@@ -326,17 +327,7 @@ export async function streamParseFileInfo(
messageCount: number messageCount: number
memberCount: number memberCount: number
fileSize: number fileSize: number
parseResult: { tempDbPath: string
meta: { name: string; platform: string; type: string }
members: Array<{ platformId: string; name: string; nickname?: string }>
messages: Array<{
senderPlatformId: string
senderName: string
timestamp: number
type: number
content?: string
}>
}
}> { }> {
return sendToWorkerWithProgress('streamParseFileInfo', { filePath }, onProgress) return sendToWorkerWithProgress('streamParseFileInfo', { filePath }, onProgress)
} }
+65 -2
View File
@@ -48,6 +48,10 @@ const mergeProgress = ref(0)
const currentStep = ref<'select' | 'conflict' | 'done'>('select') const currentStep = ref<'select' | 'conflict' | 'done'>('select')
const outputFilePath = ref('') const outputFilePath = ref('')
// 分页相关
const currentPage = ref(1)
const pageSize = 20
// 解析进度监听 // 解析进度监听
let unsubscribeProgress: (() => void) | null = null let unsubscribeProgress: (() => void) | null = null
@@ -198,6 +202,7 @@ async function doMerge() {
if (checkResult.conflicts.length > 0) { if (checkResult.conflicts.length > 0) {
conflicts.value = checkResult.conflicts conflicts.value = checkResult.conflicts
currentPage.value = 1 // 重置分页
currentStep.value = 'conflict' currentStep.value = 'conflict'
isMerging.value = false isMerging.value = false
return return
@@ -305,6 +310,20 @@ function getStatusColor(status: FileInfo['status']): string {
// 计算已解决的冲突数 // 计算已解决的冲突数
const resolvedCount = computed(() => conflicts.value.filter((c) => c.resolution).length) const resolvedCount = computed(() => conflicts.value.filter((c) => c.resolution).length)
// Derived pagination state.
// totalPages: number of pages needed to show every conflict at `pageSize` per page.
const totalPages = computed(() => {
  const conflictTotal = conflicts.value.length
  return Math.ceil(conflictTotal / pageSize)
})
// paginatedConflicts: the slice of conflicts belonging to the current (1-based) page.
const paginatedConflicts = computed(() => {
  const offset = pageSize * (currentPage.value - 1)
  const pageEnd = offset + pageSize
  return conflicts.value.slice(offset, pageEnd)
})
// Pagination navigation: switch to `page` when it lies within 1..totalPages;
// any other request (including NaN) leaves the current page untouched.
function goToPage(page: number): void {
  const withinRange = page >= 1 && page <= totalPages.value
  if (!withinRange) return
  currentPage.value = page
}
// 批量选择所有冲突 // 批量选择所有冲突
function batchSelectAll(resolution: 'keep1' | 'keep2' | 'keepBoth') { function batchSelectAll(resolution: 'keep1' | 'keep2' | 'keepBoth') {
for (const conflict of conflicts.value) { for (const conflict of conflicts.value) {
@@ -493,14 +512,14 @@ const file2Name = computed(() => files.value[1]?.name || '文件 2')
<!-- 冲突列表 --> <!-- 冲突列表 -->
<div class="max-h-[400px] divide-y divide-gray-200 overflow-y-auto dark:divide-gray-800"> <div class="max-h-[400px] divide-y divide-gray-200 overflow-y-auto dark:divide-gray-800">
<div v-for="(conflict, index) in conflicts" :key="conflict.id" class="p-4"> <div v-for="(conflict, index) in paginatedConflicts" :key="conflict.id" class="p-4">
<!-- 冲突信息 --> <!-- 冲突信息 -->
<div class="mb-3 flex items-center justify-between"> <div class="mb-3 flex items-center justify-between">
<div class="flex items-center gap-2"> <div class="flex items-center gap-2">
<span <span
class="flex h-6 w-6 items-center justify-center rounded-full bg-gray-200 text-xs font-medium dark:bg-gray-700" class="flex h-6 w-6 items-center justify-center rounded-full bg-gray-200 text-xs font-medium dark:bg-gray-700"
> >
{{ index + 1 }} {{ (currentPage - 1) * pageSize + index + 1 }}
</span> </span>
<span class="text-sm text-gray-600 dark:text-gray-400">{{ conflict.sender }}</span> <span class="text-sm text-gray-600 dark:text-gray-400">{{ conflict.sender }}</span>
</div> </div>
@@ -564,6 +583,50 @@ const file2Name = computed(() => files.value[1]?.name || '文件 2')
</div> </div>
</div> </div>
<!-- 分页控件:仅当总页数 > 1(即冲突数超过每页 20 条)时显示 -->
<div
v-if="totalPages > 1"
class="flex items-center justify-center gap-2 border-t border-gray-200 bg-gray-50 px-5 py-3 dark:border-gray-800 dark:bg-gray-800/50"
>
<UButton
size="xs"
color="gray"
variant="ghost"
icon="i-heroicons-chevron-left"
:disabled="currentPage === 1"
@click="goToPage(currentPage - 1)"
/>
<div class="flex items-center gap-1">
<template v-for="page in totalPages" :key="page">
<UButton
v-if="page === 1 || page === totalPages || Math.abs(page - currentPage) <= 1"
size="xs"
:color="page === currentPage ? 'primary' : 'gray'"
:variant="page === currentPage ? 'soft' : 'ghost'"
@click="goToPage(page)"
>
{{ page }}
</UButton>
<span v-else-if="page === 2 && currentPage > 3" class="px-1 text-xs text-gray-400">...</span>
<span
v-else-if="page === totalPages - 1 && currentPage < totalPages - 2"
class="px-1 text-xs text-gray-400"
>
...
</span>
</template>
</div>
<UButton
size="xs"
color="gray"
variant="ghost"
icon="i-heroicons-chevron-right"
:disabled="currentPage === totalPages"
@click="goToPage(currentPage + 1)"
/>
<span class="ml-2 text-xs text-gray-500"> {{ currentPage }} / {{ totalPages }} </span>
</div>
<!-- 底部操作 --> <!-- 底部操作 -->
<div class="flex items-center justify-between border-t border-gray-200 px-5 py-4 dark:border-gray-800"> <div class="flex items-center justify-between border-t border-gray-200 px-5 py-4 dark:border-gray-800">
<UButton color="gray" variant="ghost" @click="currentStep = 'select'"> <UButton color="gray" variant="ghost" @click="currentStep = 'select'">