feat: 支持旧版QQ txt版本的讨论组格式

This commit is contained in:
digua
2025-12-23 23:03:45 +08:00
parent 1f7c5fef06
commit 616e7e38ea
12 changed files with 218 additions and 28 deletions
@@ -123,7 +123,7 @@ export const feature: FormatFeature = {
// ==================== 解析器实现 ====================
async function* parseChatLabJsonl(options: ParseOptions): AsyncGenerator<ParseEvent, void, unknown> {
const { filePath, batchSize = 5000, onProgress } = options
const { filePath, batchSize = 5000, onProgress, onLog } = options
const totalBytes = getFileSize(filePath)
let bytesRead = 0
@@ -134,6 +134,9 @@ async function* parseChatLabJsonl(options: ParseOptions): AsyncGenerator<ParseEv
yield { type: 'progress', data: initialProgress }
onProgress?.(initialProgress)
// 记录解析开始
onLog?.('info', `开始解析 ChatLab JSONL 文件,大小: ${(totalBytes / 1024 / 1024).toFixed(2)} MB`)
// 用于收集成员和消息
const members: ParsedMember[] = []
const memberMap = new Map<string, ParsedMember>()
@@ -258,11 +261,15 @@ async function* parseChatLabJsonl(options: ParseOptions): AsyncGenerator<ParseEv
yield { type: 'progress', data: doneProgress }
onProgress?.(doneProgress)
// 记录解析摘要
const memberCount = members.length > 0 ? members.length : memberMap.size
onLog?.('info', `解析完成: ${messagesProcessed} 条消息, ${memberCount} 个成员`)
yield {
type: 'done',
data: {
messageCount: messagesProcessed,
memberCount: members.length > 0 ? members.length : memberMap.size,
memberCount,
},
}
}
+9 -2
View File
@@ -76,7 +76,7 @@ interface ChatLabMember {
// ==================== 解析器实现 ====================
async function* parseChatLab(options: ParseOptions): AsyncGenerator<ParseEvent, void, unknown> {
const { filePath, batchSize = 5000, onProgress } = options
const { filePath, batchSize = 5000, onProgress, onLog } = options
const totalBytes = getFileSize(filePath)
let bytesRead = 0
@@ -87,6 +87,9 @@ async function* parseChatLab(options: ParseOptions): AsyncGenerator<ParseEvent,
yield { type: 'progress', data: initialProgress }
onProgress?.(initialProgress)
// 记录解析开始
onLog?.('info', `开始解析 ChatLab 格式文件,大小: ${(totalBytes / 1024 / 1024).toFixed(2)} MB`)
// 读取文件头获取 meta 和 members 信息
const headContent = readFileHeadBytes(filePath, 200000)
@@ -246,11 +249,15 @@ async function* parseChatLab(options: ParseOptions): AsyncGenerator<ParseEvent,
yield { type: 'progress', data: doneProgress }
onProgress?.(doneProgress)
// 记录解析摘要
const memberCount = members.length > 0 ? members.length : memberMapFromMessages.size
onLog?.('info', `解析完成: ${messagesProcessed} 条消息, ${memberCount} 个成员`)
yield {
type: 'done',
data: {
messageCount: messagesProcessed,
memberCount: members.length > 0 ? members.length : memberMapFromMessages.size,
memberCount,
},
}
}
+36 -10
View File
@@ -50,17 +50,19 @@ export const feature: FormatFeature = {
priority: 30,
extensions: ['.txt'],
signatures: {
head: [/消息记录(此消息记录为文本格式/, /消息对象:/],
// 支持群聊导出和多人聊天(讨论组)导出
head: [/消息记录(此消息记录为文本格式/, /消息对象:/, /多人聊天/],
},
}
// ==================== 消息头正则 ====================
// 匹配2019-07-16 18:15:05 夜喵大人🐱(642163903)
// 2019-07-16 18:15:11 铛🔔<ppbaozi@gmail.com>
const MESSAGE_HEADER_REGEX = /^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.+?)(?:\(([^)]+)\)|<([^>]+)>)$/
// Message header line. Three shapes are accepted:
//   group chat:        2019-07-16 18:15:05 地瓜(23333233)
//   e-mail account:    2019-07-16 18:15:11 土豆<example@xx.com>
//   discussion group:  2017-08-29 20:28:30 番茄   (nickname only, no ID)
// Capture groups: 1 = timestamp, 2 = nickname, 3 = ID inside parentheses,
// 4 = address inside angle brackets. The trailing ID/address part is optional,
// so groups 3 and 4 can both be undefined for the discussion-group shape.
const MESSAGE_HEADER_REGEX = /^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (.+?)(?:\(([^)]+)\)|<([^>]+)>)?$/
// 匹配群名:消息对象:杭州FE
// Matches the chat-name line, e.g. 消息对象:xxx — capture group 1 is the name.
const GROUP_NAME_REGEX = /^消息对象:(.+)$/
// ==================== 消息类型判断 ====================
@@ -141,17 +143,21 @@ const lastValidNickname = new Map<string, string>()
// ==================== 解析器实现 ====================
async function* parseTxt(options: ParseOptions): AsyncGenerator<ParseEvent, void, unknown> {
const { filePath, batchSize = 5000, onProgress } = options
const { filePath, batchSize = 5000, onProgress, onLog } = options
const totalBytes = getFileSize(filePath)
let bytesRead = 0
let messagesProcessed = 0
let skippedLines = 0 // 跳过的无效行计数
// 发送初始进度
const initialProgress = createProgress('parsing', 0, totalBytes, 0, '开始解析...')
yield { type: 'progress', data: initialProgress }
onProgress?.(initialProgress)
// 记录解析开始
onLog?.('info', `开始解析 QQ TXT 文件,大小: ${(totalBytes / 1024 / 1024).toFixed(2)} MB`)
// 收集数据
let groupName = '未知群聊'
const memberMap = new Map<string, MemberInfo>()
@@ -218,17 +224,19 @@ async function* parseTxt(options: ParseOptions): AsyncGenerator<ParseEvent, void
const timeStr = headerMatch[1]
const rawNickname = headerMatch[2].trim()
let nickname = cleanNickname(rawNickname) // 清理前缀污染
const platformId = headerMatch[3] || headerMatch[4] // (id) 或 <email>
// platformId: (id) 或 <email>,如果没有则使用昵称(讨论组格式)
let platformId = headerMatch[3] || headerMatch[4] || nickname
// 如果昵称和 ID 相同,可能是系统故障,使用之前记录的昵称
if (nickname === platformId) {
if (nickname === platformId && headerMatch[3]) {
// 只有当确实有 ID 时才检查昵称覆盖
const previousNickname = lastValidNickname.get(platformId)
if (previousNickname) {
nickname = previousNickname
}
// 如果没有之前的记录,保持使用 ID 作为昵称
} else {
// 记录有效昵称(昵称 != ID)
} else if (headerMatch[3] || headerMatch[4]) {
// 记录有效昵称(有 ID 且昵称 != ID)
lastValidNickname.set(platformId, nickname)
}
@@ -262,6 +270,18 @@ async function* parseTxt(options: ParseOptions): AsyncGenerator<ParseEvent, void
if (line.startsWith('消息记录') || line.startsWith('消息分组')) continue
currentMessage.contentLines.push(line)
} else {
// 没有当前消息时,检查是否是需要跳过的行
const trimmed = line.trim()
if (
trimmed &&
!trimmed.startsWith('=====') &&
!trimmed.startsWith('消息记录') &&
!trimmed.startsWith('消息分组')
) {
// 这是一个无法解析的非空行
skippedLines++
}
}
}
@@ -295,6 +315,12 @@ async function* parseTxt(options: ParseOptions): AsyncGenerator<ParseEvent, void
yield { type: 'progress', data: doneProgress }
onProgress?.(doneProgress)
// 记录解析摘要
onLog?.('info', `解析完成: ${messagesProcessed} 条消息, ${memberMap.size} 个成员`)
if (skippedLines > 0) {
onLog?.('info', `跳过 ${skippedLines} 行无法解析的内容`)
}
yield {
type: 'done',
data: { messageCount: messagesProcessed, memberCount: memberMap.size },
@@ -227,17 +227,21 @@ function convertMessageType(
// ==================== 解析器实现 ====================
async function* parseV4(options: ParseOptions): AsyncGenerator<ParseEvent, void, unknown> {
const { filePath, batchSize = 5000, onProgress } = options
const { filePath, batchSize = 5000, onProgress, onLog } = options
const totalBytes = getFileSize(filePath)
let bytesRead = 0
let messagesProcessed = 0
let skippedMessages = 0 // 跳过的无效消息计数
// 发送初始进度
const initialProgress = createProgress('parsing', 0, totalBytes, 0, '开始解析...')
yield { type: 'progress', data: initialProgress }
onProgress?.(initialProgress)
// 记录解析开始
onLog?.('info', `开始解析 QQ Chat Exporter 文件,大小: ${(totalBytes / 1024 / 1024).toFixed(2)} MB`)
// 读取文件头获取 meta 信息(增加到 500KB 以包含 chatInfo.avatar)
const headContent = readFileHeadBytes(filePath, 500000)
@@ -283,7 +287,10 @@ async function* parseV4(options: ParseOptions): AsyncGenerator<ParseEvent, void,
// 获取 platformId
const platformId =
value.sender.uin || value.sender.uid || value.rawMessage?.senderUin || value.rawMessage?.senderUid
if (!platformId) return
if (!platformId) {
skippedMessages++
return
}
// 获取名字信息
const raw = value.rawMessage
@@ -301,7 +308,10 @@ async function* parseV4(options: ParseOptions): AsyncGenerator<ParseEvent, void,
// 解析时间戳
const timestamp = parseTimestamp(value.timestamp)
if (timestamp === null || !isValidYear(timestamp)) return
if (timestamp === null || !isValidYear(timestamp)) {
skippedMessages++
return
}
// 消息类型
const type = value.isSystemMessage
@@ -389,6 +399,12 @@ async function* parseV4(options: ParseOptions): AsyncGenerator<ParseEvent, void,
yield { type: 'progress', data: doneProgress }
onProgress?.(doneProgress)
// 记录解析摘要
onLog?.('info', `解析完成: ${messagesProcessed} 条消息, ${memberMap.size} 个成员`)
if (skippedMessages > 0) {
onLog?.('info', `跳过 ${skippedMessages} 条无效消息(缺少发送者ID或时间戳无效)`)
}
yield {
type: 'done',
data: { messageCount: messagesProcessed, memberCount: memberMap.size },
@@ -208,7 +208,7 @@ function extractTextContent(wechatType: number, content: string | null): string
// ==================== 解析器实现 ====================
async function* parseWechatDefault(options: ParseOptions): AsyncGenerator<ParseEvent, void, unknown> {
const { filePath, batchSize = 5000, onProgress } = options
const { filePath, batchSize = 5000, onProgress, onLog } = options
const totalBytes = getFileSize(filePath)
let bytesRead = 0
@@ -219,6 +219,9 @@ async function* parseWechatDefault(options: ParseOptions): AsyncGenerator<ParseE
yield { type: 'progress', data: initialProgress }
onProgress?.(initialProgress)
// 记录解析开始
onLog?.('info', `开始解析微信导出文件,大小: ${(totalBytes / 1024 / 1024).toFixed(2)} MB`)
// 从文件名提取对方名称
const otherName = extractNameFromFilePath(filePath)
const selfPlatformId = 'self'
@@ -317,6 +320,9 @@ async function* parseWechatDefault(options: ParseOptions): AsyncGenerator<ParseE
yield { type: 'progress', data: doneProgress }
onProgress?.(doneProgress)
// 记录解析摘要
onLog?.('info', `解析完成: ${messagesProcessed} 条消息, ${memberMap.size} 个成员`)
yield {
type: 'done',
data: { messageCount: messagesProcessed, memberCount: memberMap.size },
@@ -147,7 +147,7 @@ interface MemberInfo {
// ==================== 解析器实现 ====================
async function* parseEchotrace(options: ParseOptions): AsyncGenerator<ParseEvent, void, unknown> {
const { filePath, batchSize = 5000, onProgress } = options
const { filePath, batchSize = 5000, onProgress, onLog } = options
const totalBytes = getFileSize(filePath)
let bytesRead = 0
@@ -158,6 +158,9 @@ async function* parseEchotrace(options: ParseOptions): AsyncGenerator<ParseEvent
yield { type: 'progress', data: initialProgress }
onProgress?.(initialProgress)
// 记录解析开始
onLog?.('info', `开始解析 Echotrace 微信导出文件,大小: ${(totalBytes / 1024 / 1024).toFixed(2)} MB`)
// 读取文件头获取 session 信息
const headContent = readFileHeadBytes(filePath, 2000)
@@ -485,6 +488,9 @@ async function* parseEchotrace(options: ParseOptions): AsyncGenerator<ParseEvent
yield { type: 'progress', data: doneProgress }
onProgress?.(doneProgress)
// 记录解析摘要
onLog?.('info', `解析完成: ${messagesProcessed} 条消息, ${memberMap.size} 个成员`)
yield {
type: 'done',
data: { messageCount: messagesProcessed, memberCount: memberMap.size },
@@ -500,7 +506,7 @@ export const parser_: Parser = {
// ==================== 预处理器(预留) ====================
import { echotracePreprocessor } from './echotrace-preprocessor'
import { echotracePreprocessor } from './ycccccccy-echotrace-preprocessor'
export const preprocessor = echotracePreprocessor
// ==================== 导出格式模块 ====================
+4 -2
View File
@@ -219,6 +219,8 @@ export interface StreamParseCallbacks {
onMeta: (meta: ParsedMeta) => void
onMembers: (members: ParsedMember[]) => void
onMessageBatch: (messages: ParsedMessage[]) => void
/** 日志回调(可选) */
onLog?: (level: 'info' | 'error', message: string) => void
}
export interface StreamParseOptions extends StreamParseCallbacks {
@@ -234,9 +236,9 @@ export async function streamParseFile(
filePath: string,
callbacks: Omit<StreamParseOptions, 'filePath'>
): Promise<void> {
const { onProgress, onMeta, onMembers, onMessageBatch, batchSize = 5000 } = callbacks
const { onProgress, onMeta, onMembers, onMessageBatch, onLog, batchSize = 5000 } = callbacks
for await (const event of parseFile({ filePath, batchSize, onProgress })) {
for await (const event of parseFile({ filePath, batchSize, onProgress, onLog })) {
switch (event.type) {
case 'meta':
onMeta(event.data)
+5
View File
@@ -90,6 +90,9 @@ export interface FormatFeature {
// ==================== 解析层:解析器接口 ====================
/** Severity of a parser log message: either informational or an error. */
export type LogLevel = 'info' | 'error'
/**
* 解析选项
*/
@@ -100,6 +103,8 @@ export interface ParseOptions {
batchSize?: number
/** 进度回调(可选,用于外部监听) */
onProgress?: (progress: ParseProgress) => void
/** 日志回调(可选,用于记录解析过程中的信息和错误;级别仅有 'info' 与 'error') */
onLog?: (level: LogLevel, message: string) => void
}
/**
+12 -1
View File
@@ -15,5 +15,16 @@ export {
type TimeFilter,
} from './dbCore'
export { initPerfLog, logPerf, logPerfDetail, resetPerfLog, getCurrentLogFile } from './perfLogger'
export {
initPerfLog,
logPerf,
logPerfDetail,
resetPerfLog,
getCurrentLogFile,
logError,
logInfo,
logSummary,
getErrorCount,
LogLevel,
} from './perfLogger'
+75 -3
View File
@@ -1,17 +1,26 @@
/**
* 性能日志模块
* 实时记录导入过程的性能指标
* 导入日志模块
* 实时记录导入过程的性能指标、错误和一般信息(日志级别仅有 ERROR 与 INFO)
*/
import * as fs from 'fs'
import * as path from 'path'
import { getDbDir } from './dbCore'
/**
 * Severity levels for import-log entries.
 * The string value of each member is the text placed between brackets
 * in a written log line, so changing a value changes the log format.
 */
export enum LogLevel {
ERROR = 'ERROR',
INFO = 'INFO',
}
// 状态
let lastLogTime = Date.now()
let lastMessageCount = 0
let currentLogFile: string | null = null
// 统计计数器
let errorCount = 0
/**
* 获取性能日志目录
*/
@@ -32,7 +41,7 @@ export function initPerfLog(sessionId: string): void {
const logDir = getLogDir()
currentLogFile = path.join(logDir, `import_${sessionId}_${Date.now()}.log`)
// 写入头部
fs.writeFileSync(currentLogFile, `=== 导入性能日志 ===\n开始时间: ${new Date().toISOString()}\n\n`, 'utf-8')
fs.writeFileSync(currentLogFile, `=== 导入日志 ===\n开始时间: ${new Date().toISOString()}\n\n`, 'utf-8')
} catch {
// 忽略初始化失败
}
@@ -98,6 +107,7 @@ export function resetPerfLog(): void {
lastLogTime = Date.now()
lastMessageCount = 0
currentLogFile = null
errorCount = 0
}
/**
@@ -106,3 +116,65 @@ export function resetPerfLog(): void {
export function getCurrentLogFile(): string | null {
return currentLogFile
}
// ==================== 通用日志函数 ====================
/**
 * Appends one `[ISO timestamp] [LEVEL] message` entry to the active log file.
 *
 * Does nothing when no log file is currently set. Logging is best effort:
 * a failed append is swallowed so that it can never interrupt the caller.
 *
 * @param level   Severity tag printed in the entry.
 * @param message Text of the entry.
 */
function writeLogLine(level: LogLevel, message: string): void {
  const target = currentLogFile
  if (target) {
    const stamp = new Date().toISOString()
    const entry = `[${stamp}] [${level}] ${message}\n`
    try {
      fs.appendFileSync(target, entry, 'utf-8')
    } catch {
      // Deliberately ignored: a write failure must not break the import.
    }
  }
}
/**
 * Records an error entry in the import log and increments the error counter.
 *
 * The optional `error` contributes a `: <detail>` suffix, where the detail is
 * `error.message` for an `Error` and `String(error)` for any other non-nullish
 * value. The suffix is omitted when the detail is empty or when `message`
 * already contains it, so a caller that pre-formats the message with the
 * error text does not produce a duplicated "X: X" line.
 *
 * @param message Description of what failed.
 * @param error   Optional thrown value (an `Error` or anything else caught).
 */
export function logError(message: string, error?: unknown): void {
  errorCount++

  let detail = ''
  if (error instanceof Error) {
    detail = error.message
  } else if (error !== undefined && error !== null) {
    detail = String(error)
  }

  // Skipping a detail that is already part of the message loses no information.
  const suffix = detail !== '' && !message.includes(detail) ? `: ${detail}` : ''
  writeLogLine(LogLevel.ERROR, `${message}${suffix}`)
}
/**
 * Records an informational entry in the import log.
 *
 * @param message Text of the entry.
 */
export function logInfo(message: string): void {
  const level = LogLevel.INFO
  writeLogLine(level, message)
}
/**
 * Reports how many errors have been logged since the counter was last reset.
 *
 * @returns The current value of the module-level error counter.
 */
export function getErrorCount(): number {
  const loggedErrors = errorCount
  return loggedErrors
}
/**
 * Appends the closing summary section to the active log file.
 *
 * Intended to be called once an import has finished. Does nothing when no log
 * file is set, and a failed append is ignored (best effort).
 *
 * @param totalMessages Number of messages imported.
 * @param totalMembers  Number of members imported.
 */
export function logSummary(totalMessages: number, totalMembers: number): void {
  const target = currentLogFile
  if (!target) return

  // The empty first and last items yield the leading and trailing newline.
  const sections = [
    '',
    '=== 导入摘要 ===',
    `结束时间: ${new Date().toISOString()}`,
    `总消息数: ${totalMessages.toLocaleString()}`,
    `总成员数: ${totalMembers.toLocaleString()}`,
    `错误数: ${errorCount}`,
    '',
  ]
  const report = sections.join('\n')

  try {
    fs.appendFileSync(target, report, 'utf-8')
  } catch {
    // Deliberately ignored: the summary is optional output.
  }
}
+34 -2
View File
@@ -18,7 +18,15 @@ import {
type ParsedMessage,
} from '../../parser'
import { getDbDir } from '../core'
import { initPerfLog, logPerf, logPerfDetail, resetPerfLog } from '../core'
import {
initPerfLog,
logPerf,
logPerfDetail,
resetPerfLog,
logInfo,
logError,
logSummary,
} from '../core'
/** 流式导入结果 */
export interface StreamImportResult {
@@ -210,6 +218,11 @@ export async function streamImport(filePath: string, requestId: string): Promise
resetPerfLog()
const sessionId = generateSessionId()
initPerfLog(sessionId)
// 记录导入开始信息
logInfo(`文件路径: ${filePath}`)
logInfo(`检测到格式: ${formatFeature.name} (${formatFeature.id})`)
logInfo(`平台: ${formatFeature.platform}`)
logPerf('开始导入', 0)
// 预处理:如果格式需要且文件较大,先精简
@@ -218,6 +231,7 @@ export async function streamImport(filePath: string, requestId: string): Promise
const preprocessor = getPreprocessor(filePath)
if (preprocessor && needsPreprocess(filePath)) {
logInfo('文件需要预处理,开始精简大文件...')
sendProgress(requestId, {
stage: 'parsing',
bytesRead: 0,
@@ -235,10 +249,13 @@ export async function streamImport(filePath: string, requestId: string): Promise
})
})
actualFilePath = tempFilePath
logInfo(`预处理完成,临时文件: ${tempFilePath}`)
} catch (err) {
const errorMsg = `预处理失败: ${err instanceof Error ? err.message : String(err)}`
logError(errorMsg, err instanceof Error ? err : undefined)
return {
success: false,
error: `预处理失败: ${err instanceof Error ? err.message : String(err)}`,
error: errorMsg,
}
}
}
@@ -351,6 +368,15 @@ export async function streamImport(filePath: string, requestId: string): Promise
sendProgress(requestId, progress)
},
onLog: (level, message) => {
// 将解析器日志写入导入日志文件
if (level === 'error') {
logError(message)
} else {
logInfo(message)
}
},
onMeta: (meta: ParsedMeta) => {
if (!metaInserted) {
insertMeta.run(
@@ -625,8 +651,14 @@ export async function streamImport(filePath: string, requestId: string): Promise
logPerf('WAL checkpoint 完成', totalMessageCount)
logPerf('导入完成', totalMessageCount)
// 写入日志摘要
logSummary(totalMessageCount, memberIdMap.size)
return { success: true, sessionId }
} catch (error) {
// 记录错误日志
logError('导入失败', error instanceof Error ? error : undefined)
// 回滚当前事务
if (inTransaction) {
try {