fix: Pull 拉取时指定 limit=1000 避免远端数据源一次性导出过多数据导致卡顿

之前 fetchToTempFile 未传 limit 参数,远端可能尝试一次返回全部消息。
现在每次请求限制最多 1000 条,通过 sync 分页续拉机制自动获取完整数据。

Made-with: Cursor
This commit is contained in:
digua
2026-04-22 21:07:28 +08:00
committed by digua
parent 770f44fc9e
commit 3474e00508
+20 -23
View File
@@ -9,12 +9,7 @@ import * as crypto from 'crypto'
import { net, BrowserWindow } from 'electron'
import { getTempDir } from '../paths'
import * as worker from '../worker/workerManager'
import {
loadDataSources,
type DataSource,
type ImportSession,
updateImportSession,
} from './dataSource'
import { loadDataSources, type DataSource, type ImportSession, updateImportSession } from './dataSource'
import { getImportingStatus } from './routes/import'
import { apiLogger } from './logger'
@@ -80,7 +75,12 @@ function parseSyncFromFile(filePath: string): SyncMeta | null {
try {
const obj = JSON.parse(lines[i])
if (obj._type === 'sync') {
return { hasMore: !!obj.hasMore, nextSince: obj.nextSince, nextOffset: obj.nextOffset, watermark: obj.watermark }
return {
hasMore: !!obj.hasMore,
nextSince: obj.nextSince,
nextOffset: obj.nextOffset,
watermark: obj.watermark,
}
}
} catch {
continue
@@ -188,11 +188,7 @@ interface ImportResult {
needFullResync?: boolean
}
async function importTempFile(
baseUrl: string,
sess: ImportSession,
tempFile: string
): Promise<ImportResult> {
async function importTempFile(baseUrl: string, sess: ImportSession, tempFile: string): Promise<ImportResult> {
let targetId = sess.targetSessionId
if (!targetId) {
const derived = deriveLocalSessionId(baseUrl, sess.remoteSessionId)
@@ -237,6 +233,7 @@ async function importTempFile(
// ==================== Core pull loop (per ImportSession) ====================
const MAX_PAGES_PER_PULL = 50
const DEFAULT_PULL_LIMIT = 1000
interface PullSessionResult {
success: boolean
@@ -244,11 +241,7 @@ interface PullSessionResult {
error?: string
}
async function executePullSession(
sourceId: string,
ds: DataSource,
sess: ImportSession
): Promise<PullSessionResult> {
async function executePullSession(sourceId: string, ds: DataSource, sess: ImportSession): Promise<PullSessionResult> {
if (getImportingStatus()) {
apiLogger.info(`[Pull] Skipping "${sess.name}": import in progress`)
return { success: false, newMessageCount: 0, error: 'Import in progress' }
@@ -266,7 +259,12 @@ async function executePullSession(
try {
while (pageCount < MAX_PAGES_PER_PULL) {
pageCount++
const tempFile = await fetchToTempFile(ds.baseUrl, sess.remoteSessionId, ds.token, { since, offset, end })
const tempFile = await fetchToTempFile(ds.baseUrl, sess.remoteSessionId, ds.token, {
since,
offset,
end,
limit: DEFAULT_PULL_LIMIT,
})
try {
const stat = fs.statSync(tempFile)
@@ -390,7 +388,9 @@ function startTimer(ds: DataSource): void {
}, intervalMs)
timers.set(ds.id, timer)
apiLogger.info(`[Pull] Timer started for source ${ds.baseUrl} (${ds.sessions.length} sessions, every ${ds.intervalMinutes}min)`)
apiLogger.info(
`[Pull] Timer started for source ${ds.baseUrl} (${ds.sessions.length} sessions, every ${ds.intervalMinutes}min)`
)
}
export function stopTimer(id: string): void {
@@ -431,10 +431,7 @@ export function reloadTimer(dsId: string): void {
}
}
export async function triggerPull(
sourceId: string,
sessionId?: string
): Promise<{ success: boolean; error?: string }> {
export async function triggerPull(sourceId: string, sessionId?: string): Promise<{ success: boolean; error?: string }> {
const ds = loadDataSources().find((s) => s.id === sourceId)
if (!ds) return { success: false, error: 'Data source not found' }