diff --git a/electron/main/api/pullScheduler.ts b/electron/main/api/pullScheduler.ts index 99115df..901a7a4 100644 --- a/electron/main/api/pullScheduler.ts +++ b/electron/main/api/pullScheduler.ts @@ -9,12 +9,7 @@ import * as crypto from 'crypto' import { net, BrowserWindow } from 'electron' import { getTempDir } from '../paths' import * as worker from '../worker/workerManager' -import { - loadDataSources, - type DataSource, - type ImportSession, - updateImportSession, -} from './dataSource' +import { loadDataSources, type DataSource, type ImportSession, updateImportSession } from './dataSource' import { getImportingStatus } from './routes/import' import { apiLogger } from './logger' @@ -80,7 +75,12 @@ function parseSyncFromFile(filePath: string): SyncMeta | null { try { const obj = JSON.parse(lines[i]) if (obj._type === 'sync') { - return { hasMore: !!obj.hasMore, nextSince: obj.nextSince, nextOffset: obj.nextOffset, watermark: obj.watermark } + return { + hasMore: !!obj.hasMore, + nextSince: obj.nextSince, + nextOffset: obj.nextOffset, + watermark: obj.watermark, + } } } catch { continue @@ -188,11 +188,7 @@ interface ImportResult { needFullResync?: boolean } -async function importTempFile( - baseUrl: string, - sess: ImportSession, - tempFile: string -): Promise { +async function importTempFile(baseUrl: string, sess: ImportSession, tempFile: string): Promise { let targetId = sess.targetSessionId if (!targetId) { const derived = deriveLocalSessionId(baseUrl, sess.remoteSessionId) @@ -237,6 +233,7 @@ async function importTempFile( // ==================== Core pull loop (per ImportSession) ==================== const MAX_PAGES_PER_PULL = 50 +const DEFAULT_PULL_LIMIT = 1000 interface PullSessionResult { success: boolean @@ -244,11 +241,7 @@ interface PullSessionResult { error?: string } -async function executePullSession( - sourceId: string, - ds: DataSource, - sess: ImportSession -): Promise { +async function executePullSession(sourceId: string, ds: DataSource, sess: ImportSession): Promise { if (getImportingStatus()) { apiLogger.info(`[Pull] Skipping "${sess.name}": import in progress`) return { success: false, newMessageCount: 0, error: 'Import in progress' } @@ -266,7 +259,12 @@ async function executePullSession( try { while (pageCount < MAX_PAGES_PER_PULL) { pageCount++ - const tempFile = await fetchToTempFile(ds.baseUrl, sess.remoteSessionId, ds.token, { since, offset, end }) + const tempFile = await fetchToTempFile(ds.baseUrl, sess.remoteSessionId, ds.token, { + since, + offset, + end, + limit: DEFAULT_PULL_LIMIT, + }) try { const stat = fs.statSync(tempFile) @@ -390,7 +388,9 @@ function startTimer(ds: DataSource): void { }, intervalMs) timers.set(ds.id, timer) - apiLogger.info(`[Pull] Timer started for source ${ds.baseUrl} (${ds.sessions.length} sessions, every ${ds.intervalMinutes}min)`) + apiLogger.info( + `[Pull] Timer started for source ${ds.baseUrl} (${ds.sessions.length} sessions, every ${ds.intervalMinutes}min)` + ) } export function stopTimer(id: string): void { @@ -431,10 +431,7 @@ export function reloadTimer(dsId: string): void { } } -export async function triggerPull( - sourceId: string, - sessionId?: string -): Promise<{ success: boolean; error?: string }> { +export async function triggerPull(sourceId: string, sessionId?: string): Promise<{ success: boolean; error?: string }> { const ds = loadDataSources().find((s) => s.id === sourceId) if (!ds) return { success: false, error: 'Data source not found' }