fix(sync): prevent data loss on small pages and validate retry import results

P1: Small terminal pages with hasMore=false were incorrectly treated as
empty based on file size alone, causing valid messages to be silently
dropped. Now checks actual message content via fileContainsMessages().

P1: Retry import results were not validated, so import failures (including
needFullResync) were masked as success. Now properly handles all failure
cases and tracks retryHasMore to exit the retry loop correctly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
digua
2026-05-23 15:37:24 +08:00
committed by digua
parent 92c8a795f9
commit 53e0d21ec7
2 changed files with 293 additions and 5 deletions
+221
View File
@@ -0,0 +1,221 @@
import { afterEach, describe, it } from 'node:test'
import assert from 'node:assert/strict'
import * as fs from 'fs'
import * as os from 'os'
import * as path from 'path'
import { PullEngine } from './pull-engine'
import type { DataSource, DataImporter, FetchParams, HttpFetcher, ImportSession, SyncNotifier } from './types'
// Paths of the fixture files created by the current test; emptied by the hook below.
const tempFiles: string[] = []

// After each test, take every recorded path off the list and delete the file
// if it is still on disk. Deletion is best effort: failures are ignored.
afterEach(() => {
  const pending = tempFiles.splice(0)
  pending.forEach((filePath) => {
    try {
      if (fs.existsSync(filePath)) fs.unlinkSync(filePath)
    } catch {
      /* ignore */
    }
  })
})
/**
 * Serializes `data` as JSON into a new file under the OS temp directory and
 * records the path in `tempFiles`.
 *
 * The file name is built from the process id and the current length of
 * `tempFiles`.
 *
 * @param data Value to serialize with `JSON.stringify`.
 * @returns Path of the file that was written.
 */
function writeTempJson(data: unknown): string {
  const sequence = tempFiles.length
  const fileName = `chatlab-pull-engine-test-${process.pid}-${sequence}.json`
  const target = path.join(os.tmpdir(), fileName)
  const serialized = JSON.stringify(data)
  fs.writeFileSync(target, serialized, 'utf-8')
  tempFiles.push(target)
  return target
}
/**
 * Runs `fn` while `globalThis.setTimeout` is replaced by a stub that invokes
 * its callback synchronously, so code that waits on timers does not actually
 * wait.
 *
 * The stub ignores the requested delay, forwards any extra arguments to the
 * callback (as the real `setTimeout(callback, delay, ...args)` does) and
 * returns `0` as the timer id. The original `setTimeout` is restored once `fn`
 * settles, whether it resolves or rejects.
 *
 * @param fn Async operation to run with the stubbed timer.
 * @returns Whatever `fn` resolves to; a rejection from `fn` propagates.
 */
async function withImmediateTimers<T>(fn: () => Promise<T>): Promise<T> {
  // View the global object through a narrow shape so the stub can be installed
  // without matching every overload of the real setTimeout signature.
  const timerHost = globalThis as { setTimeout: unknown }
  const originalSetTimeout = timerHost.setTimeout
  timerHost.setTimeout = (callback: (...args: unknown[]) => void, _delay?: number, ...args: unknown[]) => {
    callback(...args)
    return 0
  }
  try {
    return await fn()
  } finally {
    timerHost.setTimeout = originalSetTimeout
  }
}
/**
 * Builds a fresh `DataSource` fixture with fixed field values and an empty
 * session list. Each call returns a new object.
 */
function createDataSource(): DataSource {
  const source: DataSource = {
    id: 'ds_test',
    name: 'Test Source',
    baseUrl: 'http://example.test/api/v1',
    token: '',
    intervalMinutes: 10,
    pullLimit: 1000,
    enabled: true,
    createdAt: 1,
    sessions: [],
  }
  return source
}
/**
 * Builds a fresh `ImportSession` fixture: status `idle`, `lastPullAt` of 100,
 * no error text and no new messages. Each call returns a new object.
 */
function createSession(): ImportSession {
  const session: ImportSession = {
    id: 'is_test',
    name: 'Test Session',
    remoteSessionId: 'remote_session',
    targetSessionId: 'local_session',
    lastPullAt: 100,
    lastStatus: 'idle',
    lastError: '',
    lastNewMessages: 0,
  }
  return session
}
/**
 * Builds a `PullEngine` wired to in-memory test doubles.
 *
 * - fetcher: hands out the paths in `options.files` one per call, in order,
 *   from a private copy of the array; throws once the list is exhausted.
 * - importer: reports every session as existing and resolves every import to
 *   `options.importResult`.
 * - notifier: appends each pull result to `options.pullResults` when provided.
 * - dsManager: always returns `options.dataSource` and appends each session
 *   update to `options.sessionUpdates` when provided.
 *
 * @param options Fixture inputs and optional arrays that capture side effects.
 * @returns The engine under test.
 */
function createEngine(options: {
  files: string[]
  importResult: Awaited<ReturnType<DataImporter['importFile']>>
  dataSource: DataSource
  sessionUpdates?: Array<{ sessionId: string; updates: Partial<ImportSession> }>
  pullResults?: Array<{ status: 'success' | 'error'; detail: string }>
}): PullEngine {
  // Work on a copy so the caller's array is never consumed.
  const pendingPages = options.files.slice()

  const fetcher: HttpFetcher = {
    fetchToTempFile: async (
      _baseUrl: string,
      _remoteSessionId: string,
      _token: string,
      _params: FetchParams
    ): Promise<string> => {
      const nextPage = pendingPages.shift()
      if (nextPage) return nextPage
      throw new Error('Unexpected retry fetch')
    },
  }

  const importer: DataImporter = {
    sessionExists() {
      return true
    },
    async importFile() {
      return options.importResult
    },
  }

  const notifier: SyncNotifier = {
    onSessionListChanged() {},
    onPullResult(_sourceId, _sessionId, status, detail) {
      options.pullResults?.push({ status, detail })
    },
  }

  const dsManager = {
    get() {
      return options.dataSource
    },
    updateSession(_sourceId: string, sessionId: string, updates: Partial<ImportSession>) {
      options.sessionUpdates?.push({ sessionId, updates })
    },
  }

  // The stub only implements the two manager methods defined above, hence the cast.
  return new PullEngine({ fetcher, importer, notifier, dsManager: dsManager as any })
}
describe('PullEngine', () => {
  type PageMember = { platformId: string; accountName: string }
  type PageMessage = { sender: string; timestamp: number; type: number; content: string }

  // Writes one terminal export page (hasMore=false) to a temp file. Every page
  // in these tests shares the same envelope and differs only in members,
  // messages and the nextSince cursor.
  const writePage = (members: PageMember[], messages: PageMessage[], nextSince: number): string =>
    writeTempJson({
      chatlab: { version: '0.0.2', exportedAt: 100 },
      meta: { name: 'Test Session', platform: 'test', type: 'group' },
      members,
      messages,
      sync: { hasMore: false, nextSince },
    })

  const writeEmptyPage = (nextSince: number): string => writePage([], [], nextSince)

  it('imports a small final page instead of treating it as empty', async () => {
    const session = createSession()
    const dataSource = createDataSource()
    dataSource.sessions = [session]

    // A single short message: a small file that still carries real content.
    const smallFinalPage = writePage(
      [{ platformId: 'u1', accountName: 'Alice' }],
      [{ sender: 'u1', timestamp: 101, type: 0, content: 'hi' }],
      101
    )

    const engine = createEngine({
      files: [smallFinalPage],
      dataSource,
      importResult: {
        success: true,
        newMessageCount: 1,
        sessionId: session.targetSessionId,
      },
    })

    // Replace the importer's importFile with a counting version so the test
    // can check how many times the engine imported.
    let importCount = 0
    ;(engine as any).importer.importFile = async () => {
      importCount += 1
      return { success: true, newMessageCount: 1, sessionId: session.targetSessionId }
    }

    const result = await withImmediateTimers(() => engine.executePullSession(dataSource.id, dataSource, session))

    assert.equal(result.success, true)
    assert.equal(result.newMessageCount, 1)
    assert.equal(importCount, 1)
  })

  it('reports retry import failure instead of marking the pull successful', async () => {
    const session = createSession()
    const dataSource = createDataSource()
    dataSource.sessions = [session]

    // First response is empty.
    const emptyInitialPage = writeEmptyPage(100)
    // The next page carries a 1200-character message.
    const retryPage = writePage(
      [{ platformId: 'u1', accountName: 'Alice' }],
      [{ sender: 'u1', timestamp: 101, type: 0, content: 'x'.repeat(1200) }],
      101
    )
    // Four further empty pages are queued behind the retry page.
    const trailingEmptyPages = Array.from({ length: 4 }, () => writeEmptyPage(101))

    const sessionUpdates: Array<{ sessionId: string; updates: Partial<ImportSession> }> = []
    const pullResults: Array<{ status: 'success' | 'error'; detail: string }> = []

    const engine = createEngine({
      files: [emptyInitialPage, retryPage, ...trailingEmptyPages],
      dataSource,
      sessionUpdates,
      pullResults,
      importResult: {
        success: false,
        newMessageCount: 0,
        sessionId: session.targetSessionId,
        error: 'retry import failed',
      },
    })

    const result = await withImmediateTimers(() => engine.executePullSession(dataSource.id, dataSource, session))

    assert.equal(result.success, false)
    assert.equal(result.error, 'retry import failed')
    assert.equal(pullResults.at(-1)?.status, 'error')
    assert.equal(sessionUpdates.at(-1)?.updates.lastStatus, 'error')
  })
})
+72 -5
View File
@@ -77,6 +77,31 @@ export function parseSyncFromFile(filePath: string): SyncMeta | null {
}
}
/**
 * Reports whether an export file contains at least one message.
 *
 * - `.jsonl` files: true if any non-blank line parses to an object whose
 *   `_type` is `'message'`. Lines that are not valid JSON are skipped.
 * - Any other file: parsed as a single JSON document; true if its top-level
 *   `messages` property is a non-empty array.
 *
 * A leading UTF-8 byte order mark is removed before parsing. Node keeps it in
 * the decoded string and `JSON.parse` rejects it, which would otherwise make a
 * page with real messages look empty.
 *
 * @param filePath Path of the file to inspect.
 * @returns `true` if a message was found; `false` if none was found or the
 *   file could not be read or parsed.
 */
function fileContainsMessages(filePath: string): boolean {
  try {
    const decoded = fs.readFileSync(filePath, 'utf-8')
    const text = decoded.charCodeAt(0) === 0xfeff ? decoded.slice(1) : decoded

    if (filePath.endsWith('.jsonl')) {
      for (const line of text.split('\n')) {
        const trimmed = line.trim()
        if (!trimmed) continue
        try {
          const record: unknown = JSON.parse(trimmed)
          if (typeof record === 'object' && record !== null && (record as { _type?: unknown })._type === 'message') {
            return true
          }
        } catch {
          // A malformed line must not hide valid message lines that follow it.
          continue
        }
      }
      return false
    }

    const parsed: unknown = JSON.parse(text)
    if (typeof parsed !== 'object' || parsed === null) return false
    const messages = (parsed as { messages?: unknown }).messages
    return Array.isArray(messages) && messages.length > 0
  } catch {
    // Unreadable or unparseable files are reported as containing no messages.
    return false
  }
}
function cleanupTempFile(filePath: string): void {
try {
if (fs.existsSync(filePath)) fs.unlinkSync(filePath)
@@ -177,12 +202,13 @@ export class PullEngine {
const stat = fs.statSync(tempFile)
this.logger.info(`[Pull] "${sess.name}" page ${pageCount}: fetched ${stat.size} bytes`)
const sync0 = parseSyncFromFile(tempFile)
if (stat.size < 1024) {
const sync0 = parseSyncFromFile(tempFile)
if (!sync0?.hasMore) {
if (!fileContainsMessages(tempFile) && sync0?.hasMore === false) {
cleanupTempFile(tempFile)
const retryDelays = [2000, 3000, 5000]
let retrySuccess = false
let retryHasMore = false
for (let ri = 0; ri < retryDelays.length; ri++) {
this.logger.info(
`[Pull] "${sess.name}" page ${pageCount} got empty response, retry ${ri + 1}/${retryDelays.length} after ${retryDelays[ri]}ms`
@@ -194,13 +220,52 @@ export class PullEngine {
})
const retryStat = fs.statSync(retryFile)
this.logger.info(`[Pull] "${sess.name}" retry ${ri + 1}: fetched ${retryStat.size} bytes`)
if (retryStat.size < 1024) {
const retrySync = parseSyncFromFile(retryFile)
if (retryStat.size < 1024 && !fileContainsMessages(retryFile) && retrySync?.hasMore === false) {
cleanupTempFile(retryFile)
continue
}
const retrySync = parseSyncFromFile(retryFile)
const retryResult = await this.importTempFile(ds.baseUrl, sess, retryFile)
cleanupTempFile(retryFile)
if (retryResult.needFullResync && !resyncAttempted) {
resyncAttempted = true
this.logger.info(`[Pull] Resetting since=0 for "${sess.name}" full resync`)
since = 0
pageCount = 0
sess.targetSessionId = ''
sess.lastPullAt = 0
this.dsManager.updateSession(sourceId, sess.id, { targetSessionId: '', lastPullAt: 0 })
retrySuccess = true
retryHasMore = true
break
}
if (retryResult.needFullResync) {
const errMsg = 'Full resync failed'
this.logger.error(`[Pull] Full resync already attempted for "${sess.name}", aborting`)
this.dsManager.updateSession(sourceId, sess.id, {
lastPullAt: Math.floor(Date.now() / 1000),
lastStatus: 'error',
lastError: errMsg,
})
this.notifier.onPullResult(sourceId, sess.id, 'error', errMsg)
this.markProgressDone(sess.id)
return { success: false, newMessageCount: 0, error: errMsg }
}
if (!retryResult.success) {
const errMsg = retryResult.error || 'Import failed'
this.dsManager.updateSession(sourceId, sess.id, {
lastPullAt: Math.floor(Date.now() / 1000),
lastStatus: 'error',
lastError: errMsg,
})
this.notifier.onPullResult(sourceId, sess.id, 'error', errMsg)
this.markProgressDone(sess.id)
return { success: false, newMessageCount: 0, error: errMsg }
}
if (retryResult.success && retryResult.sessionId && !sess.targetSessionId) {
sess.targetSessionId = retryResult.sessionId
this.dsManager.updateSession(sourceId, sess.id, { targetSessionId: retryResult.sessionId })
@@ -216,10 +281,12 @@ export class PullEngine {
if (retrySync?.hasMore && retrySync.nextSince !== undefined) {
since = retrySync.nextSince
}
retryHasMore = !!retrySync?.hasMore
retrySuccess = true
break
}
if (!retrySuccess) break
if (!retryHasMore) break
continue
}
}
@@ -229,7 +296,7 @@ export class PullEngine {
break
}
const sync = parseSyncFromFile(tempFile)
const sync = sync0
const result = await this.importTempFile(ds.baseUrl, sess, tempFile)
cleanupTempFile(tempFile)