diff --git a/electron/main/ipcMain.ts b/electron/main/ipcMain.ts index 596b8c1..6a5fe59 100644 --- a/electron/main/ipcMain.ts +++ b/electron/main/ipcMain.ts @@ -4,10 +4,11 @@ import * as fs from 'fs/promises' // 导入数据库核心模块(用于导入和删除操作) import * as databaseCore from './database/core' -// 导入 Worker 模块(用于异步分析查询) +// 导入 Worker 模块(用于异步分析查询和流式导入) import * as worker from './worker' // 导入解析器模块 import * as parser from './parser' +import { detectFormat, type ParseProgress } from './parser' // 导入合并模块 import * as merger from './merger' import type { MergeParams } from '../../src/types/chat' @@ -152,8 +153,9 @@ const mainIpcMain = (win: BrowserWindow) => { const filePath = filePaths[0] console.log('[IpcMain] File selected:', filePath) - // 检测文件格式 - const format = parser.detectFormat(filePath) + // 检测文件格式(使用流式检测,只读取文件开头) + const formatFeature = detectFormat(filePath) + const format = formatFeature?.name || null console.log('[IpcMain] Detected format:', format) if (!format) { return { error: '无法识别的文件格式' } @@ -167,47 +169,44 @@ const mainIpcMain = (win: BrowserWindow) => { }) /** - * 导入聊天记录 + * 导入聊天记录(流式版本) */ ipcMain.handle('chat:import', async (_, filePath: string) => { console.log('[IpcMain] chat:import called with:', filePath) try { - // 发送进度:开始解析 + // 发送进度:开始检测格式 win.webContents.send('chat:importProgress', { - stage: 'parsing', - progress: 10, - message: '正在解析文件...', + stage: 'detecting', + progress: 5, + message: '正在检测文件格式...', }) - console.log('[IpcMain] Parsing file...') - // 解析文件 - const parseResult = parser.parseFile(filePath) - console.log('[IpcMain] Parse result:', { - memberCount: parseResult.members.length, - messageCount: parseResult.messages.length, + // 使用流式导入(在 Worker 线程中执行) + const result = await worker.streamImport(filePath, (progress: ParseProgress) => { + // 转发进度到渲染进程 + win.webContents.send('chat:importProgress', { + stage: progress.stage, + progress: progress.percentage, + message: progress.message, + bytesRead: progress.bytesRead, + totalBytes: progress.totalBytes, + messagesProcessed: progress.messagesProcessed, + }) }) - // 发送进度:开始保存 - win.webContents.send('chat:importProgress', { - stage: 'saving', - progress: 50, - message: `正在保存 ${parseResult.messages.length} 条消息...`, - }) - - console.log('[IpcMain] Importing to database...') - // 导入到数据库(使用核心模块,同步操作) - const sessionId = databaseCore.importData(parseResult) - console.log('[IpcMain] Import successful, sessionId:', sessionId) - - // 发送进度:完成 - win.webContents.send('chat:importProgress', { - stage: 'done', - progress: 100, - message: '导入完成', - }) - - return { success: true, sessionId } + if (result.success) { + console.log('[IpcMain] Stream import successful, sessionId:', result.sessionId) + return { success: true, sessionId: result.sessionId } + } else { + console.error('[IpcMain] Stream import failed:', result.error) + win.webContents.send('chat:importProgress', { + stage: 'error', + progress: 0, + message: result.error, + }) + return { success: false, error: result.error } + } } catch (error) { console.error('[IpcMain] Import failed:', error) @@ -583,12 +582,18 @@ const mainIpcMain = (win: BrowserWindow) => { /** * 解析文件获取基本信息(用于合并预览) - * 使用 Worker 线程异步执行,不阻塞主进程 + * 使用流式解析,支持大文件 */ ipcMain.handle('merge:parseFileInfo', async (_, filePath: string) => { try { - // 使用 Worker 线程解析,避免阻塞 UI - return await worker.parseFileInfo(filePath) + // 使用流式解析,避免大文件 OOM + return await worker.streamParseFileInfo(filePath, (progress: ParseProgress) => { + // 可选:发送进度到渲染进程 + win.webContents.send('merge:parseProgress', { + filePath, + progress, + }) + }) } catch (error) { console.error('解析文件信息失败:', error) throw error diff --git a/electron/main/merger/index.ts b/electron/main/merger/index.ts index 85f86dc..d7499dc 100644 --- a/electron/main/merger/index.ts +++ b/electron/main/merger/index.ts @@ -6,7 +6,7 @@ import * as fs from 'fs' import * as path from 'path' import { app } from 'electron' -import { parseFile, detectFormat } from '../parser' +import { parseFileSync, detectFormat } from '../parser' import { importData } from '../database/core' import type { ParseResult, @@ -57,18 +57,19 @@ function generateOutputFilename(name: string): string { /** * 解析文件获取基本信息(用于预览) + * 注意:推荐使用 parser.parseFileInfo 获取更详细的信息 */ -export function parseFileInfo(filePath: string): FileParseInfo { +export async function parseFileInfo(filePath: string): Promise { const format = detectFormat(filePath) if (!format) { throw new Error('无法识别文件格式') } - const result = parseFile(filePath) + const result = await parseFileSync(filePath) return { name: result.meta.name, - format, + format: format.name, platform: result.meta.platform, messageCount: result.messages.length, memberCount: result.members.length, @@ -86,7 +87,7 @@ function getMessageKey(msg: ParsedMessage): string { * 检测合并冲突 * 规则:时间戳 + 用户名 + 字符长度,当两项相同但另一项不同时报告冲突 */ -export function checkConflicts(filePaths: string[]): ConflictCheckResult { +export async function checkConflicts(filePaths: string[]): Promise { const allMessages: Array<{ msg: ParsedMessage; source: string }> = [] const conflicts: MergeConflict[] = [] @@ -101,7 +102,7 @@ export function checkConflicts(filePaths: string[]): ConflictCheckResult { for (const filePath of filePaths) { const format = detectFormat(filePath) if (format) { - formats.push(format) + formats.push(format.name) } else { throw new Error(`无法识别文件格式: ${path.basename(filePath)}`) } @@ -118,7 +119,7 @@ export function checkConflicts(filePaths: string[]): ConflictCheckResult { // 解析所有文件 for (const filePath of filePaths) { - const result = parseFile(filePath) + const result = await parseFileSync(filePath) const sourceName = path.basename(filePath) console.log(`[Merger] 解析 ${sourceName}: ${result.messages.length} 条消息`) for (const msg of result.messages) { @@ -241,14 +242,14 @@ export function checkConflicts(filePaths: string[]): ConflictCheckResult { /** * 合并多个聊天记录文件 */ -export function mergeFiles(params: MergeParams): MergeResult { +export async function mergeFiles(params: MergeParams): Promise { try { const { filePaths, outputName, outputDir, conflictResolutions, andAnalyze } = params // 解析所有文件 const parseResults: Array<{ result: ParseResult; source: string }> = [] for (const filePath of filePaths) { - const result = parseFile(filePath) + const result = await parseFileSync(filePath) parseResults.push({ result, source: path.basename(filePath) }) } diff --git a/electron/main/parser/chatlabJsonParser.ts b/electron/main/parser/chatlabJsonParser.ts deleted file mode 100644 index d56ae59..0000000 --- a/electron/main/parser/chatlabJsonParser.ts +++ /dev/null @@ -1,84 +0,0 @@ -/** - * ChatLab 专属 JSON 格式解析器 - * 支持 ChatLab 工具导出的统一格式 - */ - -import type { ChatParser } from './types' -import { - ChatPlatform, - ChatType, - type ParseResult, - type ParsedMember, - type ParsedMessage, - type ChatLabFormat, -} from '../../../src/types/chat' - -/** - * ChatLab JSON 格式解析器 - */ -export const chatlabJsonParser: ChatParser = { - name: 'ChatLab JSON', - platform: 'chatlab', - - detect(content: string, filename: string): boolean { - // 检查文件扩展名 - if (!filename.toLowerCase().endsWith('.json') && !filename.toLowerCase().endsWith('.chatlab.json')) { - return false - } - - try { - const data = JSON.parse(content) - // 检查是否有 ChatLab 格式特征 - return ( - data.chatlab && - typeof data.chatlab.version === 'string' && - data.meta && - Array.isArray(data.members) && - Array.isArray(data.messages) - ) - } catch { - return false - } - }, - - parse(content: string, _filename: string): ParseResult { - let data: ChatLabFormat - try { - data = JSON.parse(content) - } catch (e) { - throw new Error(`JSON 解析失败: ${e}`) - } - - if (!data.chatlab || !data.meta || !Array.isArray(data.messages)) { - throw new Error('无效的 ChatLab JSON 格式') - } - - // 解析元信息 - const meta = { - name: data.meta.name, - platform: (data.meta.platform as ChatPlatform) || ChatPlatform.UNKNOWN, - type: (data.meta.type as ChatType) || ChatType.GROUP, - } - - // 解析成员 - const members: ParsedMember[] = data.members.map((m) => ({ - platformId: m.platformId, - name: m.name, - })) - - // 解析消息 - const messages: ParsedMessage[] = data.messages.map((msg) => ({ - senderPlatformId: msg.sender, - senderName: msg.name, - timestamp: msg.timestamp, - type: msg.type, - content: msg.content, - })) - - return { - meta, - members, - messages, - } - }, -} diff --git a/electron/main/parser/formats/chatlab.ts b/electron/main/parser/formats/chatlab.ts new file mode 100644 index 0000000..f498189 --- /dev/null +++ b/electron/main/parser/formats/chatlab.ts @@ -0,0 +1,220 @@ +/** + * ChatLab JSON 格式 + * ChatLab 专属的统一格式(.chatlab.json) + * + * 特征: + * - 文件头包含 "chatlab" 字段 + * - 有 version 版本号 + * - 消息结构已经是标准化的 + */ + +import * as fs from 'fs' +import { parser } from 'stream-json' +import { pick } from 'stream-json/filters/Pick' +import { streamValues } from 'stream-json/streamers/StreamValues' +import { chain } from 'stream-chain' +import { ChatPlatform, ChatType } from '../../../../src/types/chat' +import type { + FormatFeature, + FormatModule, + Parser, + ParseOptions, + ParseEvent, + ParsedMeta, + ParsedMember, + ParsedMessage, +} from '../types' +import { getFileSize, createProgress, readFileHeadBytes } from '../utils' + +// ==================== 特征定义 ==================== + +export const feature: FormatFeature = { + id: 'chatlab', + name: 'ChatLab JSON', + platform: ChatPlatform.UNKNOWN, // ChatLab 格式可能包含多平台数据 + priority: 1, // 最高优先级 + extensions: ['.json', '.chatlab.json'], + signatures: { + head: [/"chatlab"\s*:\s*\{/, /"version"\s*:\s*"/], + requiredFields: ['chatlab', 'meta', 'messages'], + }, +} + +// ==================== 消息结构 ==================== + +interface ChatLabMessage { + sender: string // platformId + name: string // 发送时昵称 + timestamp: number // 秒级时间戳 + type: number // MessageType + content: string | null +} + +interface ChatLabMember { + platformId: string + name: string + aliases?: string[] +} + +// ==================== 解析器实现 ==================== + +async function* parseChatLab(options: ParseOptions): AsyncGenerator { + const { filePath, batchSize = 5000, onProgress } = options + + const totalBytes = getFileSize(filePath) + let bytesRead = 0 + let messagesProcessed = 0 + + // 发送初始进度 + const initialProgress = createProgress('parsing', 0, totalBytes, 0, '开始解析...') + yield { type: 'progress', data: initialProgress } + onProgress?.(initialProgress) + + // 读取文件头获取 meta 和 members 信息 + const headContent = readFileHeadBytes(filePath, 200000) + + // 解析 meta + let meta: ParsedMeta = { + name: '未知群聊', + platform: ChatPlatform.UNKNOWN, + type: ChatType.GROUP, + } + try { + const metaMatch = headContent.match(/"meta"\s*:\s*(\{[^}]+\})/) + if (metaMatch) { + const metaObj = JSON.parse(metaMatch[1]) + meta = { + name: metaObj.name || '未知群聊', + platform: (metaObj.platform as ChatPlatform) || ChatPlatform.UNKNOWN, + type: (metaObj.type as ChatType) || ChatType.GROUP, + } + } + } catch { + // 使用默认值 + } + yield { type: 'meta', data: meta } + + // 解析 members(如果在文件开头能找到) + const members: ParsedMember[] = [] + try { + const membersMatch = headContent.match(/"members"\s*:\s*\[([\s\S]*?)\]/) + if (membersMatch) { + const membersJson = JSON.parse(`[${membersMatch[1]}]`) as ChatLabMember[] + for (const m of membersJson) { + members.push({ + platformId: m.platformId, + name: m.name, + }) + } + } + } catch { + // members 可能太大,稍后从消息中收集 + } + + // 收集成员和消息 + const memberMapFromMessages = new Map() + let messageBatch: ParsedMessage[] = [] + + // 流式解析 + await new Promise((resolve, reject) => { + const readStream = fs.createReadStream(filePath, { encoding: 'utf-8' }) + + readStream.on('data', (chunk: string | Buffer) => { + bytesRead += typeof chunk === 'string' ? Buffer.byteLength(chunk) : chunk.length + }) + + const pipeline = chain([readStream, parser(), pick({ filter: /^messages\.\d+$/ }), streamValues()]) + + // 用于收集批次的临时数组 + const batchCollector: ParsedMessage[] = [] + + pipeline.on('data', ({ value }: { value: ChatLabMessage }) => { + const msg = value + + // 如果前面没解析到 members,从消息中收集 + if (members.length === 0) { + memberMapFromMessages.set(msg.sender, { + platformId: msg.sender, + name: msg.name, + }) + } + + batchCollector.push({ + senderPlatformId: msg.sender, + senderName: msg.name, + timestamp: msg.timestamp, + type: msg.type, + content: msg.content, + }) + + messagesProcessed++ + + // 达到批次大小 + if (batchCollector.length >= batchSize) { + messageBatch.push(...batchCollector) + batchCollector.length = 0 + + const progress = createProgress( + 'parsing', + bytesRead, + totalBytes, + messagesProcessed, + `已处理 ${messagesProcessed} 条消息...` + ) + onProgress?.(progress) + } + }) + + pipeline.on('end', () => { + // 收集剩余消息 + if (batchCollector.length > 0) { + messageBatch.push(...batchCollector) + } + resolve() + }) + + pipeline.on('error', reject) + }) + + // 发送成员 + if (members.length > 0) { + yield { type: 'members', data: members } + } else if (memberMapFromMessages.size > 0) { + yield { type: 'members', data: Array.from(memberMapFromMessages.values()) } + } + + // 分批发送消息 + for (let i = 0; i < messageBatch.length; i += batchSize) { + const batch = messageBatch.slice(i, i + batchSize) + yield { type: 'messages', data: batch } + } + + // 完成 + const doneProgress = createProgress('done', totalBytes, totalBytes, messagesProcessed, '解析完成') + yield { type: 'progress', data: doneProgress } + onProgress?.(doneProgress) + + yield { + type: 'done', + data: { + messageCount: messagesProcessed, + memberCount: members.length > 0 ? members.length : memberMapFromMessages.size, + }, + } +} + +// ==================== 导出解析器 ==================== + +export const parser_: Parser = { + feature, + parse: parseChatLab, +} + +// ==================== 导出格式模块 ==================== + +const module_: FormatModule = { + feature, + parser: parser_, +} + +export default module_ diff --git a/electron/main/parser/formats/index.ts b/electron/main/parser/formats/index.ts new file mode 100644 index 0000000..a456289 --- /dev/null +++ b/electron/main/parser/formats/index.ts @@ -0,0 +1,23 @@ +/** + * 格式模块注册 + * 导出所有支持的格式 + */ + +import type { FormatModule } from '../types' + +// 导入所有格式模块 +import chatlab from './chatlab' +import shuakamiQqExporterV4 from './shuakami-qq-exporter-v4' +import shuakamiQqExporterLegacy from './shuakami-qq-exporter-legacy' + +/** + * 所有支持的格式模块(按优先级排序) + */ +export const formats: FormatModule[] = [ + chatlab, // 优先级 1 + shuakamiQqExporterV4, // 优先级 10 - shuakami/qq-chat-exporter V4 + shuakamiQqExporterLegacy, // 优先级 20 - shuakami/qq-chat-exporter Legacy +] + +// 按名称导出,方便单独使用 +export { chatlab, shuakamiQqExporterV4, shuakamiQqExporterLegacy } diff --git a/electron/main/parser/formats/shuakami-qq-exporter-legacy.ts b/electron/main/parser/formats/shuakami-qq-exporter-legacy.ts new file mode 100644 index 0000000..048fc55 --- /dev/null +++ b/electron/main/parser/formats/shuakami-qq-exporter-legacy.ts @@ -0,0 +1,271 @@ +/** + * shuakami/qq-chat-exporter Legacy 格式解析器 + * 适配项目: https://github.com/shuakami/qq-chat-exporter + * 版本: V1-V3(早期版本) + * + * 特征: + * - 时间戳使用毫秒数 + * - 没有 metadata.version 或版本号小于 4 + * - sender 中主要使用 uin 字段 + * + * 注意:此解析器仅适配 shuakami/qq-chat-exporter 项目导出的格式, + * 其他 QQ 聊天记录导出工具可能需要创建独立的解析器。 + */ + +import * as fs from 'fs' +import { parser } from 'stream-json' +import { pick } from 'stream-json/filters/Pick' +import { streamValues } from 'stream-json/streamers/StreamValues' +import { chain } from 'stream-chain' +import { ChatPlatform, ChatType, MessageType } from '../../../../src/types/chat' +import type { + FormatFeature, + FormatModule, + Parser, + ParseOptions, + ParseEvent, + ParsedMeta, + ParsedMember, + ParsedMessage, +} from '../types' +import { getFileSize, createProgress, readFileHeadBytes, parseTimestamp, isValidYear } from '../utils' + +// ==================== 特征定义 ==================== + +export const feature: FormatFeature = { + id: 'shuakami-qq-exporter-legacy', + name: 'shuakami/qq-chat-exporter (Legacy)', + platform: ChatPlatform.QQ, + priority: 20, // 低于 V4 + extensions: ['.json'], + signatures: { + head: [/QQChatExporter/, /"chatInfo"/], + requiredFields: ['chatInfo', 'messages'], + }, +} + +// ==================== 消息结构 ==================== + +interface LegacyMessage { + id?: string + timestamp: number // 毫秒时间戳 + sender: { + uid?: string + uin: string + name: string + } + type?: string + system?: boolean + recalled?: boolean + content: { + text: string + html?: string + resources?: Array<{ type: string }> + elements?: Array<{ type: string }> + } +} + +// ==================== 消息类型转换 ==================== + +function convertMessageType(qqType: string | undefined, content: LegacyMessage['content']): MessageType { + // 检查资源类型 + if (content.resources && content.resources.length > 0) { + const resourceType = content.resources[0].type + switch (resourceType) { + case 'image': + return MessageType.IMAGE + case 'video': + return MessageType.VIDEO + case 'voice': + case 'audio': + return MessageType.VOICE + case 'file': + return MessageType.FILE + } + } + + // 检查元素类型 + if (content.elements) { + for (const elem of content.elements) { + if (elem.type === 'market_face' || elem.type === 'face') { + return MessageType.EMOJI + } + } + } + + // 根据 type 字符串判断 + switch (qqType) { + case 'type_1': + return MessageType.TEXT + case 'type_17': + return MessageType.EMOJI + case 'type_3': + return MessageType.IMAGE + case 'type_7': + return MessageType.VOICE + default: + return MessageType.TEXT + } +} + +// ==================== 解析器实现 ==================== + +async function* parseLegacy(options: ParseOptions): AsyncGenerator { + const { filePath, batchSize = 5000, onProgress } = options + + const totalBytes = getFileSize(filePath) + let bytesRead = 0 + let messagesProcessed = 0 + + // 发送初始进度 + const initialProgress = createProgress('parsing', 0, totalBytes, 0, '开始解析...') + yield { type: 'progress', data: initialProgress } + onProgress?.(initialProgress) + + // 读取文件头获取 meta 信息 + const headContent = readFileHeadBytes(filePath, 100000) + + // 解析 chatInfo + let chatInfo = { name: '未知群聊', type: 'group' as const } + try { + const chatInfoMatch = headContent.match(/"chatInfo"\s*:\s*(\{[^}]+\})/) + if (chatInfoMatch) { + chatInfo = JSON.parse(chatInfoMatch[1]) + } + } catch { + // 使用默认值 + } + + // 发送 meta + const meta: ParsedMeta = { + name: chatInfo.name, + platform: ChatPlatform.QQ, + type: chatInfo.type === 'group' ? ChatType.GROUP : ChatType.PRIVATE, + } + yield { type: 'meta', data: meta } + + // 收集成员和消息 + const memberMap = new Map() + let messageBatch: ParsedMessage[] = [] + + // 流式解析 + await new Promise((resolve, reject) => { + const readStream = fs.createReadStream(filePath, { encoding: 'utf-8' }) + + readStream.on('data', (chunk: string | Buffer) => { + bytesRead += typeof chunk === 'string' ? Buffer.byteLength(chunk) : chunk.length + }) + + const pipeline = chain([readStream, parser(), pick({ filter: /^messages\.\d+$/ }), streamValues()]) + + const processMessage = (msg: LegacyMessage): ParsedMessage | null => { + // 获取 platformId + const platformId = msg.sender.uin || msg.sender.uid + if (!platformId) return null + + // 获取发送者名称 + const senderName = msg.sender.name || platformId + + // 更新成员 + memberMap.set(platformId, { platformId, name: senderName }) + + // 解析时间戳(毫秒) + const timestamp = parseTimestamp(msg.timestamp) + if (timestamp === null || !isValidYear(timestamp)) return null + + // 消息类型 + const type = msg.system ? MessageType.SYSTEM : convertMessageType(msg.type, msg.content) + + // 文本内容 + let textContent = msg.content?.text || '' + if (msg.recalled) { + textContent = '[已撤回] ' + textContent + } + + return { + senderPlatformId: platformId, + senderName, + timestamp, + type, + content: textContent || null, + } + } + + // 用于收集批次的临时数组 + const batchCollector: ParsedMessage[] = [] + + pipeline.on('data', ({ value }: { value: LegacyMessage }) => { + const parsed = processMessage(value) + if (parsed) { + batchCollector.push(parsed) + messagesProcessed++ + + // 达到批次大小 + if (batchCollector.length >= batchSize) { + messageBatch.push(...batchCollector) + batchCollector.length = 0 + + const progress = createProgress( + 'parsing', + bytesRead, + totalBytes, + messagesProcessed, + `已处理 ${messagesProcessed} 条消息...` + ) + onProgress?.(progress) + } + } + }) + + pipeline.on('end', () => { + // 收集剩余消息 + if (batchCollector.length > 0) { + messageBatch.push(...batchCollector) + } + resolve() + }) + + pipeline.on('error', reject) + }) + + // 发送成员 + yield { type: 'members', data: Array.from(memberMap.values()) } + + // 分批发送消息 + for (let i = 0; i < messageBatch.length; i += batchSize) { + const batch = messageBatch.slice(i, i + batchSize) + yield { type: 'messages', data: batch } + } + + // 完成 + const doneProgress = createProgress('done', totalBytes, totalBytes, messagesProcessed, '解析完成') + yield { type: 'progress', data: doneProgress } + onProgress?.(doneProgress) + + yield { + type: 'done', + data: { messageCount: messagesProcessed, memberCount: memberMap.size }, + } +} + +// ==================== 导出解析器 ==================== + +export const parser_: Parser = { + feature, + parse: parseLegacy, +} + +// ==================== 导出预处理器 ==================== + +import { qqPreprocessor } from './shuakami-qq-preprocessor' +export const preprocessor = qqPreprocessor + +// ==================== 导出格式模块 ==================== + +const module_: FormatModule = { + feature, + parser: parser_, + preprocessor: qqPreprocessor, +} + +export default module_ diff --git a/electron/main/parser/formats/shuakami-qq-exporter-v4.ts b/electron/main/parser/formats/shuakami-qq-exporter-v4.ts new file mode 100644 index 0000000..fc5b9ab --- /dev/null +++ b/electron/main/parser/formats/shuakami-qq-exporter-v4.ts @@ -0,0 +1,272 @@ +/** + * shuakami/qq-chat-exporter V4 格式解析器 + * 适配项目: https://github.com/shuakami/qq-chat-exporter + * 版本: V4.x + * + * 特征: + * - 时间戳使用 ISO 字符串格式(如 "2017-12-30T03:24:36.000Z") + * - metadata.version 为 "4.x.x" + * - sender 中有 uid 字段 + * + * 注意:此解析器仅适配 shuakami/qq-chat-exporter 项目导出的格式, + * 其他 QQ 聊天记录导出工具可能需要创建独立的解析器。 + */ + +import * as fs from 'fs' +import { parser } from 'stream-json' +import { pick } from 'stream-json/filters/Pick' +import { streamValues } from 'stream-json/streamers/StreamValues' +import { chain } from 'stream-chain' +import { ChatPlatform, ChatType, MessageType } from '../../../../src/types/chat' +import type { + FormatFeature, + FormatModule, + Parser, + ParseOptions, + ParseEvent, + ParsedMeta, + ParsedMember, + ParsedMessage, +} from '../types' +import { getFileSize, createProgress, readFileHeadBytes, parseTimestamp, isValidYear } from '../utils' + +// ==================== 特征定义 ==================== + +export const feature: FormatFeature = { + id: 'shuakami-qq-exporter-v4', + name: 'shuakami/qq-chat-exporter V4', + platform: ChatPlatform.QQ, + priority: 10, // 高优先级 + extensions: ['.json'], + signatures: { + head: [/QQChatExporter V4/, /"version"\s*:\s*"4\./], + requiredFields: ['metadata', 'chatInfo', 'messages'], + }, +} + +// ==================== 消息结构 ==================== + +interface V4Message { + messageId?: string + timestamp: string // ISO 格式 + sender: { + uid?: string + uin?: string + name: string + } + messageType?: number + isSystemMessage?: boolean + isRecalled?: boolean + content: { + text: string + html?: string + raw?: string + resources?: Array<{ type: string }> + elements?: Array<{ type: string }> + } +} + +// ==================== 消息类型转换 ==================== + +function convertMessageType(messageType: number | undefined, content: V4Message['content']): MessageType { + // 检查资源类型 + if (content.resources && content.resources.length > 0) { + const resourceType = content.resources[0].type + switch (resourceType) { + case 'image': + return MessageType.IMAGE + case 'video': + return MessageType.VIDEO + case 'voice': + case 'audio': + return MessageType.VOICE + case 'file': + return MessageType.FILE + } + } + + // 检查元素类型 + if (content.elements) { + for (const elem of content.elements) { + if (elem.type === 'market_face' || elem.type === 'face') { + return MessageType.EMOJI + } + } + } + + // 根据 messageType 判断 + switch (messageType) { + case 1: + return MessageType.TEXT + case 2: + return MessageType.IMAGE + case 3: + return MessageType.VOICE + case 7: + return MessageType.VIDEO + default: + return MessageType.TEXT + } +} + +// ==================== 解析器实现 ==================== + +async function* parseV4(options: ParseOptions): AsyncGenerator { + const { filePath, batchSize = 5000, onProgress } = options + + const totalBytes = getFileSize(filePath) + let bytesRead = 0 + let messagesProcessed = 0 + + // 发送初始进度 + const initialProgress = createProgress('parsing', 0, totalBytes, 0, '开始解析...') + yield { type: 'progress', data: initialProgress } + onProgress?.(initialProgress) + + // 读取文件头获取 meta 信息 + const headContent = readFileHeadBytes(filePath, 100000) + + // 解析 chatInfo + let chatInfo = { name: '未知群聊', type: 'group' as const } + try { + const chatInfoMatch = headContent.match(/"chatInfo"\s*:\s*(\{[^}]+\})/) + if (chatInfoMatch) { + chatInfo = JSON.parse(chatInfoMatch[1]) + } + } catch { + // 使用默认值 + } + + // 发送 meta + const meta: ParsedMeta = { + name: chatInfo.name, + platform: ChatPlatform.QQ, + type: chatInfo.type === 'group' ? ChatType.GROUP : ChatType.PRIVATE, + } + yield { type: 'meta', data: meta } + + // 收集成员和消息 + const memberMap = new Map() + let messageBatch: ParsedMessage[] = [] + + // 流式解析 + await new Promise((resolve, reject) => { + const readStream = fs.createReadStream(filePath, { encoding: 'utf-8' }) + + readStream.on('data', (chunk: string | Buffer) => { + bytesRead += typeof chunk === 'string' ? Buffer.byteLength(chunk) : chunk.length + }) + + const pipeline = chain([readStream, parser(), pick({ filter: /^messages\.\d+$/ }), streamValues()]) + + const processMessage = (msg: V4Message): ParsedMessage | null => { + // 获取 platformId + const platformId = msg.sender.uin || msg.sender.uid + if (!platformId) return null + + // 获取发送者名称 + const senderName = msg.sender.name || platformId + + // 更新成员 + memberMap.set(platformId, { platformId, name: senderName }) + + // 解析时间戳 + const timestamp = parseTimestamp(msg.timestamp) + if (timestamp === null || !isValidYear(timestamp)) return null + + // 消息类型 + const type = msg.isSystemMessage ? MessageType.SYSTEM : convertMessageType(msg.messageType, msg.content) + + // 文本内容 + let textContent = msg.content?.text || '' + if (msg.isRecalled) { + textContent = '[已撤回] ' + textContent + } + + return { + senderPlatformId: platformId, + senderName, + timestamp, + type, + content: textContent || null, + } + } + + // 用于收集批次的临时数组 + const batchCollector: ParsedMessage[] = [] + + pipeline.on('data', ({ value }: { value: V4Message }) => { + const parsed = processMessage(value) + if (parsed) { + batchCollector.push(parsed) + messagesProcessed++ + + // 达到批次大小 + if (batchCollector.length >= batchSize) { + messageBatch.push(...batchCollector) + batchCollector.length = 0 + + const progress = createProgress( + 'parsing', + bytesRead, + totalBytes, + messagesProcessed, + `已处理 ${messagesProcessed} 条消息...` + ) + onProgress?.(progress) + } + } + }) + + pipeline.on('end', () => { + // 收集剩余消息 + if (batchCollector.length > 0) { + messageBatch.push(...batchCollector) + } + resolve() + }) + + pipeline.on('error', reject) + }) + + // 发送成员 + yield { type: 'members', data: Array.from(memberMap.values()) } + + // 分批发送消息 + for (let i = 0; i < messageBatch.length; i += batchSize) { + const batch = messageBatch.slice(i, i + batchSize) + yield { type: 'messages', data: batch } + } + + // 完成 + const doneProgress = createProgress('done', totalBytes, totalBytes, messagesProcessed, '解析完成') + yield { type: 'progress', data: doneProgress } + onProgress?.(doneProgress) + + yield { + type: 'done', + data: { messageCount: messagesProcessed, memberCount: memberMap.size }, + } +} + +// ==================== 导出解析器 ==================== + +export const parser_: Parser = { + feature, + parse: parseV4, +} + +// ==================== 导出预处理器 ==================== + +import { qqPreprocessor } from './shuakami-qq-preprocessor' +export const preprocessor = qqPreprocessor + +// ==================== 导出格式模块 ==================== + +const module_: FormatModule = { + feature, + parser: parser_, + preprocessor: qqPreprocessor, +} + +export default module_ diff --git a/electron/main/parser/formats/shuakami-qq-preprocessor.ts b/electron/main/parser/formats/shuakami-qq-preprocessor.ts new file mode 100644 index 0000000..fc7ff83 --- /dev/null +++ b/electron/main/parser/formats/shuakami-qq-preprocessor.ts @@ -0,0 +1,256 @@ +/** + * shuakami/qq-chat-exporter 格式预处理器 + * 适配项目: https://github.com/shuakami/qq-chat-exporter + * + * 功能:移除 content.html、content.raw 等冗余字段,减小文件体积 + * 阈值:>50MB 自动触发预处理 + */ + +import * as fs from 'fs' +import * as path from 'path' +import * as os from 'os' +import { parser } from 'stream-json' +import { pick } from 'stream-json/filters/Pick' +import { streamArray } from 'stream-json/streamers/StreamArray' +import { chain } from 'stream-chain' +import type { ParseProgress, Preprocessor } from '../types' +import { getFileSize, createProgress } from '../utils' + +/** 预处理阈值:50MB */ +const PREPROCESS_THRESHOLD = 50 * 1024 * 1024 + +/** + * 获取临时目录 + */ +function getTempDir(): string { + return path.join(os.tmpdir(), 'chatlab') +} + +/** + * 确保目录存在 + */ +function ensureDir(dir: string): void { + if (!fs.existsSync(dir)) { + fs.mkdirSync(dir, { recursive: true }) + } +} + +/** + * QQ JSON 消息的精简结构 + */ +interface SlimQQMessage { + id?: string + messageId?: string + timestamp: number | string + sender: { + uid?: string + uin?: string + name: string + } + type?: string + messageType?: number + content: { + text: string + elements?: Array<{ type: string }> + resources?: Array<{ type: string }> + } + recalled?: boolean + isRecalled?: boolean + system?: boolean + isSystemMessage?: boolean +} + +/** + * 精简 QQ JSON 消息对象 + */ +function slimMessage(msg: Record): SlimQQMessage { + const sender = msg.sender as { uin?: string; uid?: string; name?: string } | undefined + const content = msg.content as Record | undefined + + const slimContent: SlimQQMessage['content'] = { + text: (content?.text as string) || '', + } + + if (content?.elements && Array.isArray(content.elements)) { + slimContent.elements = (content.elements as Array<{ type: string }>).map((e) => ({ + type: e.type, + })) + } + + if (content?.resources && Array.isArray(content.resources)) { + slimContent.resources = (content.resources as Array<{ type: string }>).map((r) => ({ + type: r.type, + })) + } + + const slimMsg: SlimQQMessage = { + timestamp: msg.timestamp as number | string, + sender: { name: sender?.name || '' }, + content: slimContent, + } + + // 旧格式字段 + if (msg.id) slimMsg.id = msg.id as string + if (msg.type) slimMsg.type = msg.type as string + if (msg.recalled) slimMsg.recalled = msg.recalled as boolean + if (msg.system) slimMsg.system = msg.system as boolean + + // V4 新格式字段 + if (msg.messageId) slimMsg.messageId = msg.messageId as string + if (msg.messageType !== undefined) slimMsg.messageType = msg.messageType as number + if (msg.isRecalled) slimMsg.isRecalled = msg.isRecalled as boolean + if (msg.isSystemMessage) slimMsg.isSystemMessage = msg.isSystemMessage as boolean + + // sender 字段 + if (sender?.uin) slimMsg.sender.uin = sender.uin + if (sender?.uid) slimMsg.sender.uid = sender.uid + + return slimMsg +} + +/** + * 预处理 QQ JSON 文件 + */ +async function preprocessQQJson(inputPath: string, onProgress?: (progress: ParseProgress) => void): Promise { + const totalBytes = getFileSize(inputPath) + let bytesRead = 0 + let messagesProcessed = 0 + + const tempDir = getTempDir() + ensureDir(tempDir) + const outputFilename = `slim_${Date.now()}_${path.basename(inputPath)}` + const outputPath = path.join(tempDir, outputFilename) + + onProgress?.(createProgress('parsing', 0, totalBytes, 0, '预处理:读取文件头...')) + + return new Promise((resolve, reject) => { + const headChunks: string[] = [] + let headSize = 0 + const maxHeadSize = 100000 + + const headStream = fs.createReadStream(inputPath, { encoding: 'utf-8' }) + let chatInfo: Record = { name: '未知群聊', type: 'group' } + let metadata: Record | undefined + + headStream.on('data', (chunk: string) => { + if (headSize < maxHeadSize) { + headChunks.push(chunk) + headSize += chunk.length + } else { + headStream.destroy() + } + }) + + headStream.on('close', () => { + const headContent = headChunks.join('') + + try { + const chatInfoMatch = headContent.match(/"chatInfo"\s*:\s*(\{[^}]+\})/) + if (chatInfoMatch) { + chatInfo = JSON.parse(chatInfoMatch[1]) + } + } catch { + // 使用默认值 + } + + try { + const metadataMatch = headContent.match(/"metadata"\s*:\s*(\{[^}]+\})/) + if (metadataMatch) { + metadata = JSON.parse(metadataMatch[1]) + } + } catch { + // 忽略 + } + + onProgress?.(createProgress('parsing', 0, totalBytes, 0, '预处理:开始精简消息...')) + + const readStream = fs.createReadStream(inputPath, { encoding: 'utf-8' }) + const writeStream = fs.createWriteStream(outputPath, { encoding: 'utf-8' }) + + readStream.on('data', (chunk: string | Buffer) => { + bytesRead += typeof chunk === 'string' ? Buffer.byteLength(chunk) : chunk.length + }) + + const header = { metadata, chatInfo, messages: [] } + const headerJson = JSON.stringify(header) + writeStream.write(headerJson.slice(0, -3) + '\n') + + let isFirstMessage = true + + const pipeline = chain([readStream, parser(), pick({ filter: /^messages\.\d+$/ }), streamArray()]) + + pipeline.on('data', ({ value }: { value: Record }) => { + const slimMsg = slimMessage(value) + const msgJson = JSON.stringify(slimMsg) + + if (isFirstMessage) { + writeStream.write(msgJson) + isFirstMessage = false + } else { + writeStream.write(',\n' + msgJson) + } + + messagesProcessed++ + + if (messagesProcessed % 10000 === 0) { + onProgress?.( + createProgress( + 'parsing', + bytesRead, + totalBytes, + messagesProcessed, + `预处理:已精简 ${messagesProcessed} 条消息...` + ) + ) + } + }) + + pipeline.on('end', () => { + writeStream.write('\n]}') + writeStream.end() + + writeStream.on('finish', () => { + onProgress?.(createProgress('done', totalBytes, totalBytes, messagesProcessed, '预处理完成')) + resolve(outputPath) + }) + }) + + pipeline.on('error', (err) => { + writeStream.destroy() + if (fs.existsSync(outputPath)) { + fs.unlinkSync(outputPath) + } + onProgress?.(createProgress('error', bytesRead, totalBytes, messagesProcessed, `预处理错误: ${err.message}`)) + reject(err) + }) + }) + + headStream.on('error', reject) + }) +} + +/** + * 清理临时文件 + */ +function cleanupTempFile(filePath: string): void { + try { + if (fs.existsSync(filePath) && filePath.includes(getTempDir())) { + fs.unlinkSync(filePath) + } + } catch { + // 忽略清理错误 + } +} + +/** + * QQ Chat Exporter 预处理器 + */ +export const qqPreprocessor: Preprocessor = { + needsPreprocess(filePath: string, fileSize: number): boolean { + return fileSize > PREPROCESS_THRESHOLD + }, + + preprocess: preprocessQQJson, + + cleanup: cleanupTempFile, +} diff --git a/electron/main/parser/index.ts b/electron/main/parser/index.ts index a018989..ea76eb8 100644 --- a/electron/main/parser/index.ts +++ b/electron/main/parser/index.ts @@ -1,74 +1,256 @@ /** - * Parser 模块入口 - * 自动检测文件格式并使用对应的解析器 + * Parser V2 - 模块入口 + * 三层架构:标准层、嗅探层、解析层 */ -import * as fs from 'fs' -import type { ChatParser } from './types' -import { chatlabJsonParser } from './chatlabJsonParser' -import { qqJsonParser } from './qqJsonParser' -import { qqTxtParser } from './qqTxtParser' -import type { ParseResult } from '../../../src/types/chat' +import { FormatSniffer, createSniffer } from './sniffer' +import { formats } from './formats' +import type { + ParseOptions, + ParseEvent, + ParseResult, + ParseProgress, + FormatFeature, + Parser, + ParsedMeta, + ParsedMember, + ParsedMessage, +} from './types' -// 注册所有解析器(按优先级排序) -const parsers: ChatParser[] = [ - chatlabJsonParser, // ChatLab 格式最优先 - qqJsonParser, // QQ JSON 格式 - qqTxtParser, // TXT 格式兜底 -] +// ==================== 全局嗅探器实例 ==================== -/** - * 自动检测文件格式并解析 - * @param filePath 文件路径 - * @returns 解析结果 - */ -export function parseFile(filePath: string): ParseResult { - // 读取文件内容 - const content = fs.readFileSync(filePath, 'utf-8') - const filename = filePath.split(/[/\\]/).pop() || '' +const sniffer = createSniffer() +sniffer.registerAll(formats) - // 尝试每个解析器 - for (const parser of parsers) { - if (parser.detect(content, filename)) { - console.log(`使用解析器: ${parser.name}`) - return parser.parse(content, filename) - } - } - - throw new Error(`无法识别文件格式: ${filename}`) -} +// ==================== 公共 API ==================== /** * 检测文件格式 * @param filePath 文件路径 - * @returns 解析器名称,如果无法识别则返回 null + * @returns 格式特征,如果无法识别则返回 null */ -export function detectFormat(filePath: string): string | null { - try { - const content = fs.readFileSync(filePath, 'utf-8') - const filename = filePath.split(/[/\\]/).pop() || '' - - for (const parser of parsers) { - if (parser.detect(content, filename)) { - return parser.name - } - } - } catch { - // 文件读取失败 - } - - return null +export function detectFormat(filePath: string): FormatFeature | null { + return sniffer.sniff(filePath) } /** - * 获取支持的格式列表 + * 获取文件对应的解析器 + * @param filePath 文件路径 + * @returns 解析器实例,如果无法识别则返回 null */ -export function getSupportedFormats(): Array<{ name: string; platform: string }> { - return parsers.map((p) => ({ - name: p.name, - platform: p.platform, - })) +export function getParser(filePath: string): Parser | null { + return sniffer.getParser(filePath) } -// 导出类型 -export type { ChatParser, ParseError } from './types' +/** + * 获取所有支持的格式 + */ +export function getSupportedFormats(): FormatFeature[] { + return sniffer.getSupportedFormats() +} + +/** + * 获取格式的预处理器(如果有) + */ +export function getPreprocessor(filePath: string) { + const feature = sniffer.sniff(filePath) + if (!feature) return null + + const module = formats.find((m) => m.feature.id === feature.id) + return module?.preprocessor || null +} + +/** + * 检查文件是否需要预处理 + */ +export function needsPreprocess(filePath: string): boolean { + const preprocessor = getPreprocessor(filePath) + if (!preprocessor) return false + + const fileSize = getFileSize(filePath) + return preprocessor.needsPreprocess(filePath, fileSize) +} + +/** + * 流式解析文件 + * @param options 解析选项 + * @yields 解析事件流 + */ +export async function* parseFile(options: ParseOptions): AsyncGenerator { + const parser = sniffer.getParser(options.filePath) + if (!parser) { + yield { type: 'error', data: new Error(`无法识别文件格式: ${options.filePath}`) } + return + } + + console.log(`[Parser V2] 使用解析器: ${parser.feature.name}`) + yield* parser.parse(options) +} + +/** + * 同步解析文件(收集所有事件为完整结果) + * 适用于不需要流式处理的场景(如合并工具) + * @param filePath 文件路径 + * @param onProgress 进度回调(可选) + */ +export async function parseFileSync( + filePath: string, + onProgress?: (progress: ParseProgress) => void +): Promise { + let meta: ParsedMeta | null = null + const members: ParsedMember[] = [] + const messages: ParsedMessage[] = [] + + for await (const event of parseFile({ filePath, onProgress })) { + switch (event.type) { + case 'meta': + meta = event.data + break + case 'members': + members.push(...event.data) + break + case 'messages': + messages.push(...event.data) + break + case 'progress': + onProgress?.(event.data) + break + case 'error': + throw event.data + } + } + + if (!meta) { + throw new Error('解析失败:未获取到元信息') + } + + return { meta, members, messages } +} + +/** + * 解析文件获取基本信息(只统计,不返回完整消息) + * 用于预览和合并工具 + */ +export async function parseFileInfo( + filePath: string, + onProgress?: (progress: ParseProgress) => void +): Promise<{ + name: string + format: string + platform: string + messageCount: number + memberCount: number + fileSize: number +}> { + const feature = sniffer.sniff(filePath) + if (!feature) { + throw new Error(`无法识别文件格式: ${filePath}`) + } + + let name = '未知群聊' + let platform = feature.platform + let messageCount = 0 + let memberCount = 0 + + for await (const event of parseFile({ filePath, onProgress })) { + switch (event.type) { + case 'meta': + name = event.data.name + platform = event.data.platform + break + case 'members': + memberCount += event.data.length + break + case 'messages': + messageCount += event.data.length + break + case 'progress': + onProgress?.(event.data) + break + case 'error': + throw event.data + } + } + + // 获取文件大小 + const fs = await import('fs') + const fileSize = fs.statSync(filePath).size + + return { + name, + format: feature.name, + platform, + messageCount, + memberCount, + fileSize, + } +} + +// ==================== 导出类型 ==================== + +export type { + ParseOptions, + ParseEvent, + ParseResult, + ParseProgress, + FormatFeature, + Parser, + ParsedMeta, + ParsedMember, + ParsedMessage, +} + +// ==================== 导出嗅探器(高级用法) ==================== + +export { FormatSniffer, createSniffer } + +// ==================== 导出工具函数 ==================== + +export { getFileSize, formatFileSize, parseTimestamp, isValidYear, createProgress, readFileHeadBytes } from './utils' + +// ==================== 回调模式 API ==================== + +/** + * 回调模式的解析选项 + */ +export interface StreamParseCallbacks { + onProgress: (progress: ParseProgress) => void + onMeta: (meta: ParsedMeta) => void + onMembers: (members: ParsedMember[]) => void + onMessageBatch: (messages: ParsedMessage[]) => void +} + +export interface StreamParseOptions extends StreamParseCallbacks { + filePath: string + batchSize?: number +} + +/** + * 回调模式的流式解析 + * 内部使用 AsyncGenerator,对外提供回调接口 + */ +export async function streamParseFile( + filePath: string, + callbacks: Omit +): Promise { + const { onProgress, onMeta, onMembers, onMessageBatch, batchSize = 5000 } = callbacks + + for await (const event of parseFile({ filePath, batchSize, onProgress })) { + switch (event.type) { + case 'meta': + onMeta(event.data) + break + case 'members': + onMembers(event.data) + break + case 'messages': + onMessageBatch(event.data) + break + case 'progress': + onProgress(event.data) + break + case 'error': + throw event.data + } + } +} diff --git a/electron/main/parser/qqJsonParser.ts b/electron/main/parser/qqJsonParser.ts deleted file mode 100644 index 1b9c9bb..0000000 --- a/electron/main/parser/qqJsonParser.ts +++ /dev/null @@ -1,212 +0,0 @@ -/** - * QQ Chat Exporter V4 JSON 格式解析器 - * 支持 https://github.com/shuakami/qq-chat-exporter 导出的 JSON 格式 - */ - -import type { ChatParser, ParseError } from './types' -import { - ChatPlatform, - ChatType, - MessageType, - type ParseResult, - type ParsedMember, - type ParsedMessage, -} from '../../../src/types/chat' - -/** - * QQ JSON 导出格式的消息结构 - */ -interface QQJsonMessage { - id: string - seq?: string - timestamp: number - time?: string - sender: { - uid?: string - uin: string - name: string - } - type: string - content: { - text: string - html?: string - elements?: Array<{ - type: string - data: Record - }> - resources?: Array<{ - type: string - filename?: string - size?: number - url?: string - }> - } - recalled?: boolean - system?: boolean -} - -/** - * QQ JSON 导出格式的根结构 - */ -interface QQJsonExport { - metadata?: { - name?: string - version?: string - } - chatInfo: { - name: string - type: 'private' | 'group' - } - statistics?: { - totalMessages?: number - senders?: Array<{ - uid?: string - name: string - messageCount: number - }> - } - messages: QQJsonMessage[] -} - -/** - * 将 QQ 消息类型转换为统一类型 - */ -function convertMessageType(qqType: string, content: QQJsonMessage['content']): MessageType { - // 检查是否有资源(图片等) - if (content.resources && content.resources.length > 0) { - const resourceType = content.resources[0].type - switch (resourceType) { - case 'image': - return MessageType.IMAGE - case 'video': - return MessageType.VIDEO - case 'voice': - case 'audio': - return MessageType.VOICE - case 'file': - return MessageType.FILE - } - } - - // 检查 elements 中是否有特殊类型 - if (content.elements) { - for (const elem of content.elements) { - if (elem.type === 'market_face' || elem.type === 'face') { - return MessageType.EMOJI - } - } - } - - // 根据 QQ 原始类型判断 - switch (qqType) { - case 'type_1': - return MessageType.TEXT - case 'type_17': // 表情包 - return MessageType.EMOJI - case 'type_3': // 图片 - return MessageType.IMAGE - case 'type_7': // 语音 - return MessageType.VOICE - default: - return MessageType.TEXT - } -} - -/** - * QQ JSON 格式解析器 - */ -export const qqJsonParser: ChatParser = { - name: 'QQ Chat Exporter JSON', - platform: 'qq', - - detect(content: string, filename: string): boolean { - // 检查文件扩展名 - if (!filename.toLowerCase().endsWith('.json')) { - return false - } - - try { - const data = JSON.parse(content) - // 检查是否有 QQ Chat Exporter 的特征 - return ( - data.chatInfo && - typeof data.chatInfo.name === 'string' && - Array.isArray(data.messages) && - (data.metadata?.name?.includes('QQChatExporter') || - // 兼容没有 metadata 的情况,检查消息结构 - (data.messages.length > 0 && data.messages[0].sender?.uin !== undefined)) - ) - } catch { - return false - } - }, - - parse(content: string, _filename: string): ParseResult { - let data: QQJsonExport - try { - data = JSON.parse(content) - } catch (e) { - throw new Error(`JSON 解析失败: ${e}`) as ParseError - } - - if (!data.chatInfo || !Array.isArray(data.messages)) { - throw new Error('无效的 QQ JSON 格式:缺少 chatInfo 或 messages') as ParseError - } - - // 解析元信息 - const meta = { - name: data.chatInfo.name, - platform: ChatPlatform.QQ, - type: data.chatInfo.type === 'group' ? ChatType.GROUP : ChatType.PRIVATE, - } - - // 收集成员信息(使用 Map 去重,保留最新昵称) - const memberMap = new Map() - - // 解析消息 - const messages: ParsedMessage[] = [] - - for (const msg of data.messages) { - const platformId = msg.sender.uin - - // 更新成员信息(保留最新昵称) - memberMap.set(platformId, { - platformId, - name: msg.sender.name || platformId, - }) - - // 转换时间戳(QQ 导出是毫秒,需要转为秒) - const timestamp = Math.floor(msg.timestamp / 1000) - - // 过滤掉不合理的年份(2000年以前) - if (new Date(msg.timestamp).getFullYear() < 2000) { - continue - } - - // 确定消息类型 - const type = msg.system ? MessageType.SYSTEM : convertMessageType(msg.type, msg.content) - - // 提取文本内容 - let textContent = msg.content.text || '' - - // 如果是撤回的消息,添加标记 - if (msg.recalled) { - textContent = '[已撤回] ' + textContent - } - - messages.push({ - senderPlatformId: platformId, - senderName: msg.sender.name || platformId, - timestamp, - type, - content: textContent || null, - }) - } - - return { - meta, - members: Array.from(memberMap.values()), - messages, - } - }, -} diff --git a/electron/main/parser/qqTxtParser.ts b/electron/main/parser/qqTxtParser.ts deleted file mode 100644 index ca21bf6..0000000 --- a/electron/main/parser/qqTxtParser.ts +++ /dev/null @@ -1,217 +0,0 @@ -/** - * QQ 原生导出 TXT 格式解析器 - * 支持 QQ 客户端导出的文本格式聊天记录 - */ - -import type { ChatParser } from './types' -import { - ChatPlatform, - ChatType, - MessageType, - type ParseResult, - type ParsedMember, - type ParsedMessage, -} from '../../../src/types/chat' - -/** - * 消息行正则表达式 - * 格式: 2017-02-25 10:40:20 昵称(QQ号) - * 或: 2017-02-25 10:40:20 (QQ号) - * 或: 2017-02-25 10:40:20 【管理员】昵称(QQ号) - * 或: 2017-02-25 10:40:20 昵称<邮箱> - */ -const MESSAGE_HEADER_REGEX = - /^(\d{4}-\d{2}-\d{2}\s+\d{1,2}:\d{2}:\d{2})\s+(?:【[^】]+】)?(.+?)(?:\((\d+)\)|<([^>]+)>)\s*$/ - -/** - * 群名提取正则 - * 格式: 消息对象:群名 - */ -const GROUP_NAME_REGEX = /^消息对象[::](.+)$/ - -/** - * 检测消息类型 - */ -function detectMessageType(content: string): MessageType { - const trimmed = content.trim() - - // 图片 - if (trimmed === '[图片]' || trimmed.startsWith('[图片]')) { - return MessageType.IMAGE - } - - // 表情 - if (/^\[.+\]$/.test(trimmed) || /^\[\[.+\]\]$/.test(trimmed)) { - return MessageType.EMOJI - } - - // 语音 - if (trimmed === '[语音]' || trimmed.startsWith('[语音]')) { - return MessageType.VOICE - } - - // 视频 - if (trimmed === '[视频]' || trimmed.startsWith('[视频]')) { - return MessageType.VIDEO - } - - // 文件 - if (trimmed === '[文件]' || trimmed.startsWith('[文件]')) { - return MessageType.FILE - } - - // 系统消息 - if ( - trimmed.includes('加入了群聊') || - trimmed.includes('退出了群聊') || - trimmed.includes('撤回了一条消息') || - trimmed.includes('被管理员') || - trimmed.includes('成为管理员') - ) { - return MessageType.SYSTEM - } - - return MessageType.TEXT -} - -/** - * 解析日期时间字符串为时间戳(秒) - */ -function parseDateTime(dateTimeStr: string): number { - // 格式: 2017-02-25 10:40:20 - const [datePart, timePart] = dateTimeStr.split(/\s+/) - const [year, month, day] = datePart.split('-').map(Number) - const [hour, minute, second] = timePart.split(':').map(Number) - - const date = new Date(year, month - 1, day, hour, minute, second) - return Math.floor(date.getTime() / 1000) -} - -/** - * QQ TXT 格式解析器 - */ -export const qqTxtParser: ChatParser = { - name: 'QQ Native TXT Export', - platform: 'qq', - - detect(content: string, filename: string): boolean { - // 检查文件扩展名 - if (!filename.toLowerCase().endsWith('.txt')) { - return false - } - - // 检查文件头特征 - const lines = content.split('\n').slice(0, 20) - const hasHeader = lines.some( - (line) => - line.includes('消息记录') || - line.includes('消息分组') || - line.includes('消息对象') || - line.includes('================================================================') - ) - - // 检查是否有符合格式的消息行 - const hasMessagePattern = lines.some((line) => MESSAGE_HEADER_REGEX.test(line.trim())) - - return hasHeader || hasMessagePattern - }, - - parse(content: string, _filename: string): ParseResult { - const lines = content.split('\n') - - // 提取群名 - let groupName = '未知对话' - for (const line of lines.slice(0, 20)) { - const match = line.trim().match(GROUP_NAME_REGEX) - if (match) { - groupName = match[1].trim() - break - } - } - - // 收集成员信息 - const memberMap = new Map() - - // 解析消息 - const messages: ParsedMessage[] = [] - - let currentSender: { platformId: string; name: string } | null = null - let currentTimestamp: number = 0 - let currentContent: string[] = [] - - // 处理当前累积的消息 - const flushMessage = (): void => { - if (currentSender && currentContent.length > 0) { - const content = currentContent.join('\n').trim() - if (content) { - messages.push({ - senderPlatformId: currentSender.platformId, - senderName: currentSender.name, - timestamp: currentTimestamp, - type: detectMessageType(content), - content, - }) - } - } - currentContent = [] - } - - for (const line of lines) { - const trimmedLine = line.trim() - - // 跳过空行和分隔线 - if (!trimmedLine || trimmedLine.startsWith('===') || trimmedLine.startsWith('消息')) { - continue - } - - // 尝试匹配消息头 - const headerMatch = trimmedLine.match(MESSAGE_HEADER_REGEX) - - if (headerMatch) { - // 先保存之前的消息 - flushMessage() - - const dateTimeStr = headerMatch[1] - const nameOrEmpty = headerMatch[2].trim() - const qqNumber = headerMatch[3] || headerMatch[4] // QQ号或邮箱 - - // 处理只有QQ号没有昵称的情况 - const name = nameOrEmpty || qqNumber - - // 更新成员信息(保留最新昵称) - memberMap.set(qqNumber, { - platformId: qqNumber, - name, - }) - - const timestamp = parseDateTime(dateTimeStr) - - // 过滤掉不合理的年份(2000年以前) - if (new Date(timestamp * 1000).getFullYear() < 2000) { - // 如果时间戳无效,标记为 null,后续不添加 - currentSender = null - currentTimestamp = 0 - } else { - currentSender = { platformId: qqNumber, name } - currentTimestamp = timestamp - } - } else { - // 这是消息内容行 - currentContent.push(trimmedLine) - } - } - - // 处理最后一条消息 - flushMessage() - - return { - meta: { - name: groupName, - platform: ChatPlatform.QQ, - type: ChatType.GROUP, // TXT 导出通常是群聊 - }, - members: Array.from(memberMap.values()), - messages, - } - }, -} diff --git a/electron/main/parser/sniffer.ts b/electron/main/parser/sniffer.ts new file mode 100644 index 0000000..d3b6efb --- /dev/null +++ b/electron/main/parser/sniffer.ts @@ -0,0 +1,170 @@ +/** + * Parser V2 - 嗅探层 + * 负责检测文件格式,匹配对应的解析器 + */ + +import * as fs from 'fs' +import * as path from 'path' +import type { FormatFeature, FormatModule, Parser } from './types' + +/** 文件头检测大小 (8KB) */ +const HEAD_SIZE = 8 * 1024 + +/** + * 读取文件头部内容 + */ +function readFileHead(filePath: string, size: number = HEAD_SIZE): string { + const fd = fs.openSync(filePath, 'r') + const buffer = Buffer.alloc(size) + const bytesRead = fs.readSync(fd, buffer, 0, size, 0) + fs.closeSync(fd) + return buffer.slice(0, bytesRead).toString('utf-8') +} + +/** + * 获取文件扩展名(小写) + */ +function getExtension(filePath: string): string { + return path.extname(filePath).toLowerCase() +} + +/** + * 检查文件头是否匹配签名 + */ +function matchHeadSignatures(headContent: string, patterns: RegExp[]): boolean { + return patterns.some((pattern) => pattern.test(headContent)) +} + +/** + * 检查必需字段是否存在 + */ +function matchRequiredFields(headContent: string, fields: string[]): boolean { + // 简单检查:字段名是否出现在文件头中 + // 对于 JSON 文件,检查 "fieldName" 是否存在 + return fields.every((field) => { + const pattern = new RegExp(`"${field.replace('.', '"\\s*:\\s*.*"')}"\\s*:`) + return pattern.test(headContent) || headContent.includes(`"${field}"`) + }) +} + +/** + * 格式嗅探器 + * 管理所有格式特征,负责检测文件格式 + */ +export class FormatSniffer { + private formats: FormatModule[] = [] + + /** + * 注册格式模块 + */ + register(module: FormatModule): void { + this.formats.push(module) + // 按优先级排序(优先级数字越小越靠前) + this.formats.sort((a, b) => a.feature.priority - b.feature.priority) + } + + /** + * 批量注册格式模块 + */ + registerAll(modules: FormatModule[]): void { + for (const module of modules) { + this.register(module) + } + } + + /** + * 嗅探文件格式 + * @param filePath 文件路径 + * @returns 匹配的格式特征,如果无法识别则返回 null + */ + sniff(filePath: string): FormatFeature | null { + const ext = getExtension(filePath) + const headContent = readFileHead(filePath) + + for (const { feature } of this.formats) { + if (this.matchFeature(feature, ext, headContent)) { + return feature + } + } + + return null + } + + /** + * 获取文件对应的解析器 + * @param filePath 文件路径 + * @returns 匹配的解析器,如果无法识别则返回 null + */ + getParser(filePath: string): Parser | null { + const ext = getExtension(filePath) + const headContent = readFileHead(filePath) + + for (const { feature, parser } of this.formats) { + if (this.matchFeature(feature, ext, headContent)) { + return parser + } + } + + return null + } + + /** + * 根据格式 ID 获取解析器 + */ + getParserById(formatId: string): Parser | null { + const module = this.formats.find((m) => m.feature.id === formatId) + return module?.parser || null + } + + /** + * 获取所有支持的格式 + */ + getSupportedFormats(): FormatFeature[] { + return this.formats.map((m) => m.feature) + } + + /** + * 检查特征是否匹配 + */ + private matchFeature(feature: FormatFeature, ext: string, headContent: string): boolean { + // 1. 检查扩展名 + if (!feature.extensions.includes(ext)) { + return false + } + + const { signatures } = feature + + // 2. 检查文件头签名(如果定义了) + if (signatures.head && signatures.head.length > 0) { + if (!matchHeadSignatures(headContent, signatures.head)) { + return false + } + } + + // 3. 检查必需字段(如果定义了) + if (signatures.requiredFields && signatures.requiredFields.length > 0) { + if (!matchRequiredFields(headContent, signatures.requiredFields)) { + return false + } + } + + // 4. 检查字段值模式(如果定义了) + if (signatures.fieldPatterns) { + for (const [, pattern] of Object.entries(signatures.fieldPatterns)) { + if (!pattern.test(headContent)) { + return false + } + } + } + + return true + } +} + +/** + * 创建并返回全局嗅探器实例 + */ +export function createSniffer(): FormatSniffer { + return new FormatSniffer() +} + diff --git a/electron/main/parser/types.ts b/electron/main/parser/types.ts index b691f3d..7f3ee9a 100644 --- a/electron/main/parser/types.ts +++ b/electron/main/parser/types.ts @@ -1,44 +1,156 @@ /** - * Parser 接口定义 + * Parser V2 - 类型定义 + * 三层架构:标准层、嗅探层、解析层 */ -import type { ParseResult } from '../../../src/types/chat' +import type { ChatPlatform, ChatType, ParsedMember, ParsedMessage } from '../../../src/types/chat' + +// ==================== 标准层:统一输出结构 ==================== /** - * 聊天记录解析器接口 + * 解析的元信息 */ -export interface ChatParser { - /** 解析器名称 */ +export interface ParsedMeta { name: string - - /** 支持的平台 */ - platform: string - - /** - * 检测文件内容是否匹配该解析器 - * @param content 文件内容 - * @param filename 文件名 - */ - detect(content: string, filename: string): boolean - - /** - * 解析文件内容 - * @param content 文件内容 - * @param filename 文件名 - */ - parse(content: string, filename: string): ParseResult + platform: ChatPlatform + type: ChatType } /** - * 解析错误 + * 解析进度 */ -export class ParseError extends Error { - constructor( - message: string, - public readonly parserName: string - ) { - super(message) - this.name = 'ParseError' - } +export interface ParseProgress { + /** 阶段 */ + stage: 'detecting' | 'parsing' | 'done' | 'error' + /** 已读取字节数 */ + bytesRead: number + /** 文件总字节数 */ + totalBytes: number + /** 已处理消息数 */ + messagesProcessed: number + /** 百分比 0-100 */ + percentage: number + /** 状态消息 */ + message: string } +/** + * 解析事件(AsyncGenerator 输出) + */ +export type ParseEvent = + | { type: 'meta'; data: ParsedMeta } + | { type: 'members'; data: ParsedMember[] } + | { type: 'messages'; data: ParsedMessage[] } + | { type: 'progress'; data: ParseProgress } + | { type: 'done'; data: { messageCount: number; memberCount: number } } + | { type: 'error'; data: Error } + +/** + * 完整解析结果(用于同步收集) + */ +export interface ParseResult { + meta: ParsedMeta + members: ParsedMember[] + messages: ParsedMessage[] +} + +// ==================== 嗅探层:特征定义 ==================== + +/** + * 格式特征签名 + */ +export interface FormatSignatures { + /** 文件头正则匹配(任意一个匹配即可) */ + head?: RegExp[] + /** 必须存在的 JSON 字段路径 */ + requiredFields?: string[] + /** 字段值模式匹配 */ + fieldPatterns?: Record +} + +/** + * 格式特征定义 + */ +export interface FormatFeature { + /** 唯一标识符 */ + id: string + /** 显示名称 */ + name: string + /** 平台 */ + platform: ChatPlatform + /** 优先级(数字越小越优先) */ + priority: number + /** 支持的文件扩展名 */ + extensions: string[] + /** 内容特征签名 */ + signatures: FormatSignatures +} + +// ==================== 解析层:解析器接口 ==================== + +/** + * 解析选项 + */ +export interface ParseOptions { + /** 文件路径 */ + filePath: string + /** 每批消息数量(默认 5000) */ + batchSize?: number + /** 进度回调(可选,用于外部监听) */ + onProgress?: (progress: ParseProgress) => void +} + +/** + * 解析器接口 + * 使用 AsyncGenerator 实现渐进式输出 + */ +export interface Parser { + /** 关联的格式特征 */ + readonly feature: FormatFeature + + /** + * 解析文件 + * @param options 解析选项 + * @yields 解析事件流 + */ + parse(options: ParseOptions): AsyncGenerator +} + +/** + * 预处理器接口 + * 用于大文件预处理,移除冗余字段 + */ +export interface Preprocessor { + /** 是否需要预处理 */ + needsPreprocess(filePath: string, fileSize: number): boolean + /** 执行预处理,返回临时文件路径 */ + preprocess(filePath: string, onProgress?: (progress: ParseProgress) => void): Promise + /** 清理临时文件 */ + cleanup(tempPath: string): void +} + +/** + * 格式模块导出结构 + * 每个格式文件同时导出 feature、parser,以及可选的 preprocessor + */ +export interface FormatModule { + feature: FormatFeature + parser: Parser + preprocessor?: Preprocessor +} + +// ==================== 工具类型 ==================== + +/** + * 创建进度对象的工具函数类型 + */ +export type CreateProgress = ( + stage: ParseProgress['stage'], + bytesRead: number, + totalBytes: number, + messagesProcessed: number, + message: string +) => ParseProgress + +// 重新导出共享类型 +export type { ParsedMember, ParsedMessage, ChatPlatform, ChatType } diff --git a/electron/main/parser/utils.ts b/electron/main/parser/utils.ts new file mode 100644 index 0000000..7953b7e --- /dev/null +++ b/electron/main/parser/utils.ts @@ -0,0 +1,75 @@ +/** + * Parser V2 - 工具函数 + */ + +import * as fs from 'fs' +import type { ParseProgress, CreateProgress } from './types' + +/** + * 获取文件大小 + */ +export function getFileSize(filePath: string): number { + return fs.statSync(filePath).size +} + +/** + * 读取文件头部指定字节数 + */ +export function readFileHeadBytes(filePath: string, size: number): string { + const fd = fs.openSync(filePath, 'r') + const buffer = Buffer.alloc(size) + const bytesRead = fs.readSync(fd, buffer, 0, size, 0) + fs.closeSync(fd) + return buffer.slice(0, bytesRead).toString('utf-8') +} + +/** + * 创建进度对象 + */ +export const createProgress: CreateProgress = (stage, bytesRead, totalBytes, messagesProcessed, message) => { + const percentage = totalBytes > 0 ? Math.min(99, Math.round((bytesRead / totalBytes) * 100)) : 0 + return { + stage, + bytesRead, + totalBytes, + messagesProcessed, + percentage: stage === 'done' ? 100 : percentage, + message, + } +} + +/** + * 解析 ISO 时间戳或毫秒时间戳 + * @returns 秒级时间戳,如果解析失败返回 null + */ +export function parseTimestamp(value: string | number): number | null { + if (typeof value === 'string') { + // ISO 字符串格式:2017-12-30T03:24:36.000Z + const parsed = Date.parse(value) + if (isNaN(parsed)) return null + return Math.floor(parsed / 1000) + } else if (typeof value === 'number') { + // 毫秒时间戳 + if (isNaN(value)) return null + return Math.floor(value / 1000) + } + return null +} + +/** + * 验证年份是否合理(2000年以后) + */ +export function isValidYear(timestampSeconds: number): boolean { + return new Date(timestampSeconds * 1000).getFullYear() >= 2000 +} + +/** + * 格式化文件大小 + */ +export function formatFileSize(bytes: number): string { + if (bytes < 1024) return `${bytes} B` + if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB` + if (bytes < 1024 * 1024 * 1024) return `${(bytes / (1024 * 1024)).toFixed(1)} MB` + return `${(bytes / (1024 * 1024 * 1024)).toFixed(2)} GB` +} + diff --git a/electron/main/worker/dbWorker.ts b/electron/main/worker/dbWorker.ts index 159b4f9..00b90b9 100644 --- a/electron/main/worker/dbWorker.ts +++ b/electron/main/worker/dbWorker.ts @@ -37,10 +37,12 @@ import { getCheckInAnalysis, } from './queryAdvanced' import { parseFile, detectFormat } from '../parser' +import { streamImport, streamParseFileInfo } from './streamImport' import type { FileParseInfo } from '../../../src/types/chat' /** * 解析文件获取基本信息(在 Worker 线程中执行,不阻塞主进程) + * @deprecated 使用 streamParseFileInfo 替代 */ function parseFileInfo(filePath: string): FileParseInfo { const format = detectFormat(filePath) @@ -70,9 +72,9 @@ interface WorkerMessage { payload: any } -// 消息类型到处理函数的映射 -const handlers: Record any> = { - // 文件解析(合并功能使用) +// 同步消息处理器 +const syncHandlers: Record any> = { + // 文件解析(合并功能使用,已废弃) parseFileInfo: (p) => parseFileInfo(p.filePath), // 基础查询 @@ -111,17 +113,34 @@ const handlers: Record any> = { getCheckInAnalysis: (p) => getCheckInAnalysis(p.sessionId, p.filter), } +// 异步消息处理器(流式操作) +const asyncHandlers: Record Promise> = { + // 流式导入 + streamImport: (p, id) => streamImport(p.filePath, id), + // 流式解析文件信息(用于合并预览) + streamParseFileInfo: (p, id) => streamParseFileInfo(p.filePath, id), +} + // 处理消息 -parentPort?.on('message', (message: WorkerMessage) => { +parentPort?.on('message', async (message: WorkerMessage) => { const { id, type, payload } = message try { - const handler = handlers[type] - if (!handler) { + // 检查是否是异步处理器 + const asyncHandler = asyncHandlers[type] + if (asyncHandler) { + const result = await asyncHandler(payload, id) + parentPort?.postMessage({ id, success: true, result }) + return + } + + // 同步处理器 + const syncHandler = syncHandlers[type] + if (!syncHandler) { throw new Error(`Unknown message type: ${type}`) } - const result = handler(payload) + const result = syncHandler(payload) parentPort?.postMessage({ id, success: true, result }) } catch (error) { parentPort?.postMessage({ diff --git a/electron/main/worker/index.ts b/electron/main/worker/index.ts index 69b2599..86e0cba 100644 --- a/electron/main/worker/index.ts +++ b/electron/main/worker/index.ts @@ -31,6 +31,9 @@ export { getAllSessions, getSession, closeDatabase, - // 文件解析 API(异步,用于合并功能) + // 文件解析 API(已废弃,使用流式版本) parseFileInfo, + // 流式导入 API + streamImport, + streamParseFileInfo, } from './workerManager' diff --git a/electron/main/worker/streamImport.ts b/electron/main/worker/streamImport.ts new file mode 100644 index 0000000..42b4404 --- /dev/null +++ b/electron/main/worker/streamImport.ts @@ -0,0 +1,382 @@ +/** + * 流式导入模块 + * 在 Worker 线程中流式解析文件并批量写入数据库 + */ + +import Database from 'better-sqlite3' +import * as fs from 'fs' +import * as path from 'path' +import { parentPort } from 'worker_threads' +import { + streamParseFile, + detectFormat, + getPreprocessor, + needsPreprocess, + type ParseProgress, + type ParsedMeta, + type ParsedMember, + type ParsedMessage, + getFileSize, +} from '../parser' + +/** 流式导入结果 */ +export interface StreamImportResult { + success: boolean + sessionId?: string + error?: string +} +import { getDbDir } from './dbCore' + +/** + * 发送进度到主进程 + */ +function sendProgress(requestId: string, progress: ParseProgress): void { + parentPort?.postMessage({ + id: requestId, + type: 'progress', + payload: progress, + }) +} + +/** + * 生成唯一的会话ID + */ +function generateSessionId(): string { + const timestamp = Date.now() + const random = Math.random().toString(36).substring(2, 8) + return `chat_${timestamp}_${random}` +} + +/** + * 获取数据库文件路径 + */ +function getDbPath(sessionId: string): string { + return path.join(getDbDir(), `${sessionId}.db`) +} + +/** + * 创建数据库并初始化表结构 + */ +function createDatabase(sessionId: string): Database.Database { + const dbDir = getDbDir() + if (!fs.existsSync(dbDir)) { + fs.mkdirSync(dbDir, { recursive: true }) + } + + const dbPath = getDbPath(sessionId) + const db = new Database(dbPath) + + db.pragma('journal_mode = WAL') + db.pragma('synchronous = NORMAL') + + db.exec(` + CREATE TABLE IF NOT EXISTS meta ( + name TEXT NOT NULL, + platform TEXT NOT NULL, + type TEXT NOT NULL, + imported_at INTEGER NOT NULL + ); + + CREATE TABLE IF NOT EXISTS member ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + platform_id TEXT NOT NULL UNIQUE, + name TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS member_name_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + member_id INTEGER NOT NULL, + name TEXT NOT NULL, + start_ts INTEGER NOT NULL, + end_ts INTEGER, + FOREIGN KEY(member_id) REFERENCES member(id) + ); + + CREATE TABLE IF NOT EXISTS message ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + sender_id INTEGER NOT NULL, + ts INTEGER NOT NULL, + type INTEGER NOT NULL, + content TEXT, + FOREIGN KEY(sender_id) REFERENCES member(id) + ); + + CREATE INDEX IF NOT EXISTS idx_message_ts ON message(ts); + CREATE INDEX IF NOT EXISTS idx_message_sender ON message(sender_id); + CREATE INDEX IF NOT EXISTS idx_member_name_history_member_id ON member_name_history(member_id); + `) + + return db +} + +/** + * 流式导入聊天记录 + * @param filePath 文件路径 + * @param requestId 请求ID(用于进度回调) + */ +export async function streamImport(filePath: string, requestId: string): Promise { + // 检测格式 + const formatFeature = detectFormat(filePath) + if (!formatFeature) { + return { success: false, error: '无法识别文件格式' } + } + + console.log(`[StreamImport] 开始导入: ${filePath}, 格式: ${formatFeature.name}`) + + // 预处理:如果格式需要且文件较大,先精简 + let actualFilePath = filePath + let tempFilePath: string | null = null + const preprocessor = getPreprocessor(filePath) + + if (preprocessor && needsPreprocess(filePath)) { + console.log(`[StreamImport] 文件需要预处理,开始精简...`) + sendProgress(requestId, { + stage: 'parsing', + bytesRead: 0, + totalBytes: 0, + messagesProcessed: 0, + percentage: 0, + message: '预处理:精简大文件中...', + }) + + try { + tempFilePath = await preprocessor.preprocess(filePath, (progress) => { + sendProgress(requestId, { + ...progress, + message: progress.message || '预处理中...', + }) + }) + actualFilePath = tempFilePath + console.log(`[StreamImport] 预处理完成: ${tempFilePath}`) + } catch (err) { + console.error(`[StreamImport] 预处理失败:`, err) + return { + success: false, + error: `预处理失败: ${err instanceof Error ? err.message : String(err)}`, + } + } + } + + const sessionId = generateSessionId() + const db = createDatabase(sessionId) + + // 准备语句 + const insertMeta = db.prepare(` + INSERT INTO meta (name, platform, type, imported_at) VALUES (?, ?, ?, ?) + `) + const insertMember = db.prepare(` + INSERT OR IGNORE INTO member (platform_id, name) VALUES (?, ?) + `) + const getMemberId = db.prepare(`SELECT id FROM member WHERE platform_id = ?`) + const insertMessage = db.prepare(` + INSERT INTO message (sender_id, ts, type, content) VALUES (?, ?, ?, ?) + `) + const insertNameHistory = db.prepare(` + INSERT INTO member_name_history (member_id, name, start_ts, end_ts) VALUES (?, ?, ?, ?) + `) + const updateMemberName = db.prepare(`UPDATE member SET name = ? WHERE platform_id = ?`) + const updateNameHistoryEndTs = db.prepare(` + UPDATE member_name_history SET end_ts = ? WHERE member_id = ? AND end_ts IS NULL + `) + + // 成员ID映射(platformId -> dbId) + const memberIdMap = new Map() + // 昵称追踪器 + const nicknameTracker = new Map() + // 是否已插入 meta + let metaInserted = false + + // 开始事务(整个导入作为一个大事务,提高性能) + db.exec('BEGIN TRANSACTION') + + try { + await streamParseFile(actualFilePath, { + batchSize: 5000, + + onProgress: (progress) => { + // 转发进度到主进程 + sendProgress(requestId, progress) + }, + + onMeta: (meta: ParsedMeta) => { + if (!metaInserted) { + insertMeta.run(meta.name, meta.platform, meta.type, Math.floor(Date.now() / 1000)) + metaInserted = true + console.log(`[StreamImport] Meta 已写入: ${meta.name}`) + } + }, + + onMembers: (members: ParsedMember[]) => { + console.log(`[StreamImport] 收到 ${members.length} 个成员`) + for (const member of members) { + insertMember.run(member.platformId, member.name) + const row = getMemberId.get(member.platformId) as { id: number } | undefined + if (row) { + memberIdMap.set(member.platformId, row.id) + } + } + }, + + onMessageBatch: (messages: ParsedMessage[]) => { + for (const msg of messages) { + // 数据验证:跳过无效消息 + if (!msg.senderPlatformId || !msg.senderName) { + console.warn('[StreamImport] 跳过无效消息:缺少发送者信息') + continue + } + if (msg.timestamp === undefined || msg.timestamp === null || isNaN(msg.timestamp)) { + console.warn('[StreamImport] 跳过无效消息:缺少时间戳') + continue + } + if (msg.type === undefined || msg.type === null) { + console.warn('[StreamImport] 跳过无效消息:缺少消息类型') + continue + } + + // 确保成员存在 + if (!memberIdMap.has(msg.senderPlatformId)) { + const memberName = msg.senderName || msg.senderPlatformId + insertMember.run(msg.senderPlatformId, memberName) + const row = getMemberId.get(msg.senderPlatformId) as { id: number } | undefined + if (row) { + memberIdMap.set(msg.senderPlatformId, row.id) + } + } + + const senderId = memberIdMap.get(msg.senderPlatformId) + if (senderId === undefined) continue + + // 插入消息 + insertMessage.run(senderId, msg.timestamp, msg.type, msg.content) + + // 追踪昵称变化 + const senderName = msg.senderName || msg.senderPlatformId + const tracker = nicknameTracker.get(msg.senderPlatformId) + if (!tracker) { + nicknameTracker.set(msg.senderPlatformId, { + currentName: senderName, + lastSeenTs: msg.timestamp, + }) + insertNameHistory.run(senderId, senderName, msg.timestamp, null) + } else if (tracker.currentName !== senderName) { + updateNameHistoryEndTs.run(msg.timestamp, senderId) + insertNameHistory.run(senderId, senderName, msg.timestamp, null) + tracker.currentName = senderName + tracker.lastSeenTs = msg.timestamp + } else { + tracker.lastSeenTs = msg.timestamp + } + } + }, + }) + + // 更新成员的最新昵称 + for (const [platformId, tracker] of nicknameTracker.entries()) { + updateMemberName.run(tracker.currentName, platformId) + } + + // 提交事务 + db.exec('COMMIT') + + console.log(`[StreamImport] 导入完成: ${sessionId}`) + return { success: true, sessionId } + } catch (error) { + // 回滚事务 + db.exec('ROLLBACK') + + // 删除失败的数据库文件 + const dbPath = getDbPath(sessionId) + if (fs.existsSync(dbPath)) { + fs.unlinkSync(dbPath) + } + + console.error(`[StreamImport] 导入失败:`, error) + return { + success: false, + error: error instanceof Error ? error.message : String(error), + } + } finally { + db.close() + + // 清理临时文件 + if (tempFilePath && preprocessor) { + preprocessor.cleanup(tempFilePath) + console.log(`[StreamImport] 已清理临时文件: ${tempFilePath}`) + } + } +} + +/** + * 流式解析文件获取基本信息(只统计,不写入数据库) + * 用于合并功能的预览 + */ +export async function streamParseFileInfo( + filePath: string, + requestId: string +): Promise<{ + name: string + format: string + platform: string + messageCount: number + memberCount: number + fileSize: number +}> { + const formatFeature = detectFormat(filePath) + if (!formatFeature) { + throw new Error('无法识别文件格式') + } + + // 获取文件大小 + const fileSize = fs.statSync(filePath).size + + // 立即发送初始进度,让用户知道已开始处理 + sendProgress(requestId, { + stage: 'parsing', + bytesRead: 0, + totalBytes: fileSize, + messagesProcessed: 0, + percentage: 0, + message: '正在读取文件...', + }) + + let name = '未知群聊' + let platform = formatFeature.platform + let messageCount = 0 + const memberSet = new Set() + + await streamParseFile(filePath, { + // 对于大文件使用更小的批次,以更频繁地更新进度 + batchSize: fileSize > 100 * 1024 * 1024 ? 2000 : 5000, + + onProgress: (progress) => { + sendProgress(requestId, progress) + }, + + onMeta: (meta) => { + name = meta.name + platform = meta.platform + }, + + onMembers: (members) => { + for (const m of members) { + memberSet.add(m.platformId) + } + }, + + onMessageBatch: (messages) => { + messageCount += messages.length + for (const msg of messages) { + memberSet.add(msg.senderPlatformId) + } + }, + }) + + return { + name, + format: formatFeature.name, + platform, + messageCount, + memberCount: memberSet.size, + fileSize, + } +} diff --git a/electron/main/worker/workerManager.ts b/electron/main/worker/workerManager.ts index 1e75e43..0ca64b4 100644 --- a/electron/main/worker/workerManager.ts +++ b/electron/main/worker/workerManager.ts @@ -7,6 +7,7 @@ import { Worker } from 'worker_threads' import { app } from 'electron' import * as path from 'path' import * as fs from 'fs' +import type { ParseProgress } from '../parser' // Worker 实例 let worker: Worker | null = null @@ -17,6 +18,7 @@ const pendingRequests = new Map< { resolve: (value: any) => void reject: (error: Error) => void + onProgress?: (progress: ParseProgress) => void // 进度回调 } >() @@ -86,17 +88,26 @@ export function initWorker(): void { // 监听 Worker 消息 worker.on('message', (message) => { - const { id, success, result, error } = message + const { id, type, success, result, error, payload } = message const pending = pendingRequests.get(id) - if (pending) { - pendingRequests.delete(id) + if (!pending) return - if (success) { - pending.resolve(result) - } else { - pending.reject(new Error(error)) + // 处理进度消息(不删除 pending,因为还没完成) + if (type === 'progress') { + if (pending.onProgress) { + pending.onProgress(payload) } + return + } + + // 处理完成或错误消息 + pendingRequests.delete(id) + + if (success) { + pending.resolve(result) + } else { + pending.reject(new Error(error)) } }) @@ -130,7 +141,6 @@ export function initWorker(): void { function sendToWorker(type: string, payload: any): Promise { return new Promise((resolve, reject) => { if (!worker) { - // 尝试初始化 Worker try { initWorker() } catch (error) { @@ -155,6 +165,42 @@ function sendToWorker(type: string, payload: any): Promise { }) } +/** + * 发送消息到 Worker 并等待响应(带进度回调) + * 用于流式导入等长时间操作 + */ +function sendToWorkerWithProgress( + type: string, + payload: any, + onProgress?: (progress: ParseProgress) => void, + timeoutMs: number = 600000 // 默认 10 分钟超时 +): Promise { + return new Promise((resolve, reject) => { + if (!worker) { + try { + initWorker() + } catch (error) { + reject(new Error('Worker not initialized')) + return + } + } + + const id = `req_${++requestIdCounter}` + + pendingRequests.set(id, { resolve, reject, onProgress }) + + worker!.postMessage({ id, type, payload }) + + // 设置超时 + setTimeout(() => { + if (pendingRequests.has(id)) { + pendingRequests.delete(id) + reject(new Error(`Worker request timeout: ${type}`)) + } + }, timeoutMs) + }) +} + /** * 关闭 Worker */ @@ -261,11 +307,40 @@ export async function closeDatabase(sessionId: string): Promise { /** * 解析文件获取基本信息(在 Worker 线程中执行) + * @deprecated 使用 streamParseFileInfo 替代 */ export async function parseFileInfo(filePath: string): Promise { return sendToWorker('parseFileInfo', { filePath }) } +/** + * 流式解析文件获取基本信息(用于合并预览) + */ +export async function streamParseFileInfo( + filePath: string, + onProgress?: (progress: ParseProgress) => void +): Promise<{ + name: string + format: string + platform: string + messageCount: number + memberCount: number +}> { + return sendToWorkerWithProgress('streamParseFileInfo', { filePath }, onProgress) +} + +/** + * 流式导入聊天记录 + * @param filePath 文件路径 + * @param onProgress 进度回调 + */ +export async function streamImport( + filePath: string, + onProgress?: (progress: ParseProgress) => void +): Promise<{ success: boolean; sessionId?: string; error?: string }> { + return sendToWorkerWithProgress('streamImport', { filePath }, onProgress) +} + /** * 获取数据库目录(供外部使用) */ diff --git a/electron/preload/index.d.ts b/electron/preload/index.d.ts index ef6d7bf..38bb4ab 100644 --- a/electron/preload/index.d.ts +++ b/electron/preload/index.d.ts @@ -76,6 +76,7 @@ interface MergeApi { parseFileInfo: (filePath: string) => Promise checkConflicts: (filePaths: string[]) => Promise mergeFiles: (params: MergeParams) => Promise + onParseProgress: (callback: (data: { filePath: string; progress: ImportProgress }) => void) => () => void } declare global { diff --git a/electron/preload/index.ts b/electron/preload/index.ts index c5dad11..b765cd4 100644 --- a/electron/preload/index.ts +++ b/electron/preload/index.ts @@ -42,7 +42,7 @@ const api = { } }, receive: (channel: string, func: (...args: unknown[]) => void) => { - const validChannels = ['show-message', 'chat:importProgress'] + const validChannels = ['show-message', 'chat:importProgress', 'merge:parseProgress'] if (validChannels.includes(channel)) { // Deliberately strip event as it includes `sender` ipcRenderer.on(channel, (_event, ...args) => func(...args)) @@ -301,6 +301,19 @@ const mergeApi = { mergeFiles: (params: MergeParams): Promise => { return ipcRenderer.invoke('merge:mergeFiles', params) }, + + /** + * 监听解析进度(用于大文件) + */ + onParseProgress: (callback: (data: { filePath: string; progress: ImportProgress }) => void) => { + const handler = (_event: Electron.IpcRendererEvent, data: { filePath: string; progress: ImportProgress }) => { + callback(data) + } + ipcRenderer.on('merge:parseProgress', handler) + return () => { + ipcRenderer.removeListener('merge:parseProgress', handler) + } + }, } // 扩展 api,添加 dialog 功能 diff --git a/package.json b/package.json index 3e8b30f..2490b20 100644 --- a/package.json +++ b/package.json @@ -24,7 +24,8 @@ "@electron-toolkit/preload": "^3.0.1", "@electron-toolkit/utils": "^4.0.0", "better-sqlite3": "^12.4.6", - "electron-updater": "^6.6.2" + "electron-updater": "^6.6.2", + "stream-json": "^1.9.1" }, "devDependencies": { "@electron-toolkit/eslint-config": "^1.0.2", @@ -35,6 +36,7 @@ "@rushstack/eslint-patch": "^1.15.0", "@tailwindcss/vite": "^4.0.0", "@types/better-sqlite3": "^7.6.13", + "@types/stream-json": "^1.7.8", "@vitejs/plugin-vue": "^5.2.3", "@vue/eslint-config-prettier": "^10.2.0", "@vueuse/core": "^13.9.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 39948c2..81430f5 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -20,6 +20,9 @@ importers: electron-updater: specifier: ^6.6.2 version: 6.6.2 + stream-json: + specifier: ^1.9.1 + version: 1.9.1 devDependencies: '@electron-toolkit/eslint-config': specifier: ^1.0.2 @@ -45,6 +48,9 @@ importers: '@types/better-sqlite3': specifier: ^7.6.13 version: 7.6.13 + '@types/stream-json': + specifier: ^1.7.8 + version: 1.7.8 '@vitejs/plugin-vue': specifier: ^5.2.3 version: 5.2.4(vite@6.4.1(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.30.2))(vue@3.5.25(typescript@5.9.3)) @@ -976,6 +982,12 @@ packages: '@types/responselike@1.0.3': resolution: {integrity: sha512-H/+L+UkTV33uf49PH5pCAUBVPNj2nDBXTN+qS1dOwyyg24l3CcicicCA7ca+HMvJBZcFgl5r8e+RR6elsb4Lyw==} + '@types/stream-chain@2.1.0': + resolution: {integrity: sha512-guDyAl6s/CAzXUOWpGK2bHvdiopLIwpGu8v10+lb9hnQOyo4oj/ZUQFOvqFjKGsE3wJP1fpIesCcMvbXuWsqOg==} + + '@types/stream-json@1.7.8': + resolution: {integrity: sha512-MU1OB1eFLcYWd1LjwKXrxdoPtXSRzRmAnnxs4Js/ayB5O/NvHraWwuOaqMWIebpYwM6khFlsJOHEhI9xK/ab4Q==} + '@types/verror@1.10.11': resolution: {integrity: sha512-RlDm9K7+o5stv0Co8i8ZRGxDbrTxhJtgjqjFyVh/tXQyl/rYtTKlnTvZ88oSTeYREWurwx20Js4kTuKCsFkUtg==} @@ -3164,6 +3176,12 @@ packages: std-env@3.10.0: resolution: {integrity: sha512-5GS12FdOZNliM5mAOxFRg7Ir0pWz8MdpYm6AY6VPkGpbA7ZzmbzNcBJQ0GPvvyWgcY7QAhCgf9Uy89I03faLkg==} + stream-chain@2.2.5: + resolution: {integrity: sha512-1TJmBx6aSWqZ4tx7aTpBDXK0/e2hhcNSTV8+CbFJtDjbb+I1mZ8lHit0Grw9GRT+6JbIrrDd8esncgBi8aBXGA==} + + stream-json@1.9.1: + resolution: {integrity: sha512-uWkjJ+2Nt/LO9Z/JyKZbMusL8Dkh97uUBTv3AJQ74y07lVahLY4eEFsPsE97pxYBwr8nnjMAIch5eqI0gPShyw==} + string-width@4.2.3: resolution: {integrity: sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g==} engines: {node: '>=8'} @@ -4686,6 +4704,15 @@ snapshots: dependencies: '@types/node': 24.10.1 + '@types/stream-chain@2.1.0': + dependencies: + '@types/node': 24.10.1 + + '@types/stream-json@1.7.8': + dependencies: + '@types/node': 24.10.1 + '@types/stream-chain': 2.1.0 + '@types/verror@1.10.11': optional: true @@ -7122,6 +7149,12 @@ snapshots: std-env@3.10.0: {} + stream-chain@2.2.5: {} + + stream-json@1.9.1: + dependencies: + stream-chain: 2.2.5 + string-width@4.2.3: dependencies: emoji-regex: 8.0.0 diff --git a/src/components/tools/MergeTab.vue b/src/components/tools/MergeTab.vue index 33e7d79..0a5aa70 100644 --- a/src/components/tools/MergeTab.vue +++ b/src/components/tools/MergeTab.vue @@ -1,5 +1,5 @@