From 86cadc59d70d61314e0e72f7cb68430f15fe0385 Mon Sep 17 00:00:00 2001 From: digua Date: Sun, 19 Apr 2026 21:48:16 +0800 Subject: [PATCH] =?UTF-8?q?feat(api):=20=E5=AE=9E=E7=8E=B0=20Import=20API?= =?UTF-8?q?=20v1=20=E5=AE=8C=E6=95=B4=E5=8D=8F=E8=AE=AE=E4=B8=8E=E5=B1=82?= =?UTF-8?q?=E7=BA=A7=E6=95=B0=E6=8D=AE=E6=BA=90=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .prettierignore | 1 + docs/cn/chatlab-format.md | 389 ------------ docs/cn/{ => standard}/chatlab-api.md | 218 +++---- docs/cn/standard/chatlab-format.md | 11 +- docs/cn/standard/chatlab-import.md | 465 ++++++++++++++ docs/cn/standard/chatlab-pull.md | 308 ++++++++++ docs/en/standard/chatlab-format.md | 12 +- docs/tw/chatlab-format.md | 8 +- docs/tw/standard/chatlab-format.md | 8 +- electron/main/api/auth.ts | 10 +- electron/main/api/config.ts | 5 +- electron/main/api/dataSource.ts | 141 ++++- electron/main/api/errors.ts | 12 + electron/main/api/index.ts | 13 +- electron/main/api/logger.ts | 65 ++ electron/main/api/pullDiscovery.ts | 68 +++ electron/main/api/pullScheduler.ts | 446 ++++++++++---- electron/main/api/routes/import.ts | 369 +++++++++-- electron/main/api/server.ts | 3 +- electron/main/ipc/api.ts | 78 ++- electron/main/worker/dbWorker.ts | 14 +- .../main/worker/import/incrementalImport.ts | 324 +++++++--- electron/main/worker/import/index.ts | 3 + electron/main/worker/import/streamImport.ts | 64 +- electron/main/worker/query/sessions.ts | 16 + electron/main/worker/workerManager.ts | 54 +- electron/preload/apis/api-server.ts | 74 ++- electron/preload/index.d.ts | 50 +- src/components/common/Sidebar.vue | 13 +- src/i18n/locales/en-US/settings.json | 38 +- src/i18n/locales/ja-JP/settings.json | 38 +- src/i18n/locales/zh-CN/settings.json | 38 +- src/i18n/locales/zh-TW/settings.json | 38 +- .../components/API/DataSourceAddModal.vue | 307 +++++++--- .../components/API/DataSourceEditModal.vue | 107 ++++ 
.../settings/components/ApiSettingsTab.vue | 578 +++++++++++------- src/pages/settings/index.vue | 4 +- src/stores/apiServer.ts | 95 ++- 38 files changed, 3274 insertions(+), 1211 deletions(-) delete mode 100644 docs/cn/chatlab-format.md rename docs/cn/{ => standard}/chatlab-api.md (71%) create mode 100644 docs/cn/standard/chatlab-import.md create mode 100644 docs/cn/standard/chatlab-pull.md create mode 100644 electron/main/api/logger.ts create mode 100644 electron/main/api/pullDiscovery.ts create mode 100644 src/pages/settings/components/API/DataSourceEditModal.vue diff --git a/.prettierignore b/.prettierignore index 9454f03c..345ab59e 100644 --- a/.prettierignore +++ b/.prettierignore @@ -4,3 +4,4 @@ pnpm-lock.yaml LICENSE tsconfig.json tsconfig.*.json +docs/**/*.md diff --git a/docs/cn/chatlab-format.md b/docs/cn/chatlab-format.md deleted file mode 100644 index bc44ff89..00000000 --- a/docs/cn/chatlab-format.md +++ /dev/null @@ -1,389 +0,0 @@ -# ChatLab 标准化格式规范 v0.0.2 - -ChatLab 定义了一套标准的聊天记录数据交换格式,用于支持多平台数据的统一导入和分析。 - -只要你将聊天记录转为该格式,那么就可以被 ChatLab 解析并使用其分析能力。 - -::: warning 注意该格式规范目前仍处于早期制定阶段,部分字段和结构可能会在后续版本中调整。::: - -## 概述 - -### 支持的文件格式 - -| 格式 | 扩展名 | 适用场景 | -| --------- | -------- | ------------------------------------------------- | -| **JSON** | `.json` | 中小型记录(<100 万条),结构清晰,易于阅读 | -| **JSONL** | `.jsonl` | 超大规模记录(>100 万条),流式处理,内存占用恒定 | - -### 格式对比 - -| 特性 | JSON | JSONL | -| ------------ | ---------------------- | ----------------------- | -| 内存占用 | 需加载完整结构 | 逐行处理,恒定 (~100MB) | -| 文件大小限制 | ~1GB(取决于内存) | 无实际限制 | -| 追加写入 | ❌ 需重写整个文件 | ✅ 直接追加行 | -| 错误恢复 | 单处错误整文件失效 | 可跳过错误行继续 | -| 可读性 | ⭐⭐⭐ 易于阅读 | ⭐⭐ 每行一条记录 | -| 推荐场景 | 小中型记录 (<100 万条) | 大型记录 (>100 万条) | - -## 快速说明 - -以下是一个**最小化**的 ChatLab 格式示例,只包含必要字段: - -```json -{ - "chatlab": { - "version": "0.0.2", - "exportedAt": 1703001600 - }, - "meta": { - "name": "我的群聊", - "platform": "qq", - "type": "group" - }, - "members": [ - { - "platformId": "123456", - "accountName": "张三" - } - ], - "messages": [ - { - 
"sender": "123456", - "accountName": "张三", - "timestamp": 1703001600, - "type": 0, - "content": "大家好!" - } - ] -} -``` - ---- - -## JSON 格式详细说明 - -### 文件头 (chatlab) - -| 字段 | 类型 | 必填 | 说明 | -| ------------- | ------ | ---- | ---------------------------- | -| `version` | string | ✅ | 格式版本号,当前为 `"0.0.2"` | -| `exportedAt` | number | ✅ | 导出时间(秒级 Unix 时间戳) | -| `generator` | string | - | 生成工具名称 | -| `description` | string | - | 描述信息 | - -### 元信息 (meta) - -| 字段 | 类型 | 必填 | 说明 | -| ------------- | ------------- | ---- | -------------------------------------------------------- | -| `name` | string | ✅ | 群名或对话名 | -| `platform` | string | ✅ | 平台标识,如 `qq` / `wechat` / `discord` / `whatsapp` 等 | -| `type` | string | ✅ | 聊天类型:`group`(群聊)/ `private`(私聊) | -| `groupId` | string | - | 群 ID(仅群聊) | -| `groupAvatar` | string | - | 群头像(Data URL 格式) | -| `ownerId` | string | - | 所有者/导出者的 platformId | -| `sources` | MergeSource[] | - | 合并来源(合并工具生成,见下方说明) | - -#### MergeSource 结构(合并来源) - -当使用合并工具合并多个聊天记录文件时,`sources` 字段会记录原始文件的来源信息: - -| 字段 | 类型 | 必填 | 说明 | -| -------------- | ------ | ---- | -------- | -| `filename` | string | ✅ | 原文件名 | -| `platform` | string | - | 原平台 | -| `messageCount` | number | ✅ | 消息数量 | - -### 成员 (members) - -| 字段 | 类型 | 必填 | 说明 | -| --------------- | ------------ | ---- | ------------------------- | -| `platformId` | string | ✅ | 用户唯一标识 | -| `accountName` | string | ✅ | 账号名称 | -| `groupNickname` | string | - | 群昵称(仅群聊) | -| `aliases` | string[] | - | 用户自定义别名 | -| `avatar` | string | - | 用户头像(Data URL 格式) | -| `roles` | MemberRole[] | - | 成员角色(可多个) | - -#### 角色 (roles) - -成员可以拥有一个或多个角色,用于标识群主、管理员等身份: - -| 字段 | 类型 | 必填 | 说明 | -| ------ | ------ | ---- | -------------------------------------- | -| `id` | string | ✅ | 角色标识:`owner` / `admin` / 自定义ID | -| `name` | string | - | 角色显示名称(自定义角色需要) | - -**标准角色 ID:** - -| ID | 说明 | -| ------- | ----------- | -| `owner` | 群主/创建者 | -| `admin` | 管理员 | - -**角色示例:** - -```json -// 群主 -"roles": [{ "id": "owner" }] - -// 管理员 
-"roles": [{ "id": "admin" }] - -// 多角色 -"roles": [ - { "id": "owner" }, - { "id": "tech-team", "name": "技术组" }, - { "id": "vip", "name": "VIP会员" } -] -``` - -### 消息 (messages) - -| 字段 | 类型 | 必填 | 说明 | -| ------------------- | -------------- | ---- | --------------------------------- | -| `sender` | string | ✅ | 发送者的 `platformId` | -| `accountName` | string | ✅ | 发送时的账号名称 | -| `groupNickname` | string | - | 发送时的群昵称 | -| `timestamp` | number | ✅ | 秒级 Unix 时间戳 | -| `type` | number | ✅ | 消息类型(见下方对照表) | -| `content` | string \| null | ✅ | 消息内容(非文本消息可为 `null`) | -| `platformMessageId` | string | - | 消息的平台原始 ID | -| `replyToMessageId` | string | - | 回复的目标消息 ID | - -#### 消息 ID 与回复关系说明 - -**`platformMessageId`**(消息的平台原始 ID): - -- 存储消息在原始平台上的唯一标识(如 Discord 的 snowflake ID、QQ 的消息 ID) -- 用于在查询时关联 `replyToMessageId`,以显示被回复消息的内容 -- 如果平台不提供消息 ID,可省略此字段 - -**`replyToMessageId`**(回复的目标消息 ID): - -- 存储被回复消息的**平台原始 ID** -- 通过与其他消息的 `platformMessageId` 关联,可查询被回复消息的内容和发送者 -- 仅当消息是回复类型时才有意义 -- 如果平台不支持或数据不包含回复关系,可省略此字段 - ---- - -## 消息类型对照表 - -::: tip 提示若您的聊天记录中有其他特殊类型需要支持,请提交 issue 说明情况,我们会评估是否加入标准消息类型中。::: - -### 基础消息类型 (0-19) - -| 值 | 名称 | 说明 | -| --- | -------- | ----------- | -| 0 | TEXT | 文本消息 | -| 1 | IMAGE | 图片 | -| 2 | VOICE | 语音 | -| 3 | VIDEO | 视频 | -| 4 | FILE | 文件 | -| 5 | EMOJI | 表情包/贴纸 | -| 7 | LINK | 链接/卡片 | -| 8 | LOCATION | 位置 | - -### 交互消息类型 (20-39) - -| 值 | 名称 | 说明 | -| --- | ---------- | ---------------------- | -| 20 | RED_PACKET | 红包 | -| 21 | TRANSFER | 转账 | -| 22 | POKE | 拍一拍/戳一戳 | -| 23 | CALL | 语音/视频通话 | -| 24 | SHARE | 分享(音乐、小程序等) | -| 25 | REPLY | 引用回复 | -| 26 | FORWARD | 转发消息 | -| 27 | CONTACT | 名片消息 | - -### 系统消息类型 (80+) - -| 值 | 名称 | 说明 | -| --- | ------ | ------------------------------ | -| 80 | SYSTEM | 系统消息(入群/退群/群公告等) | -| 81 | RECALL | 撤回消息 | -| 99 | OTHER | 其他/未知 | - -## 头像格式说明 - -头像字段 `avatar` 和 `groupAvatar` 支持两种格式: - -### 1. Data URL - -嵌入式格式,图片数据直接编码在文件中,离线可用: - -``` -data:image/jpeg;base64,/9j/4AAQSkZJRgABAQAAAQABAAD... 
-``` - -支持的图片 MIME 类型: - -- `image/jpeg` - JPEG 格式(推荐,体积较小) -- `image/png` - PNG 格式 -- `image/gif` - GIF 格式 -- `image/webp` - WebP 格式 - -### 2. 网络 URL - -外链格式,图片存储在网络服务器,体积更小但需网络访问: - -``` -https://example.com/avatars/user123.jpg -``` - -::: tip 建议 - -- 如果需要离线使用或长期存档,推荐使用 Data URL 格式 -- 导出 Data URL 时建议将头像压缩为 100×100 像素以内,以减小文件体积 -- 如果头像来自可靠的长期有效的 CDN,可使用网络 URL 以减小文件体积 ::: - -## 完整示例 - -### 群聊示例(含可选字段) - -```json -{ - "chatlab": { - "version": "0.0.2", - "exportedAt": 1703001600, - "generator": "My Converter Tool", - "description": "2024年技术交流群聊天记录备份" - }, - "meta": { - "name": "技术交流群", - "platform": "wechat", - "type": "group", - "groupId": "38988428513", - "groupAvatar": "data:image/jpeg;base64,/9j/4AAQSkZJRg...", - "ownerId": "abc123" - }, - "members": [ - { - "platformId": "abc123", - "accountName": "张三", - "groupNickname": "群主-张三", - "avatar": "data:image/jpeg;base64,/9j/4AAQSkZJRg...", - "roles": [{ "id": "owner" }] - }, - { - "platformId": "def456", - "accountName": "李四", - "groupNickname": "管理员", - "avatar": "data:image/jpeg;base64,/9j/4AAQSkZJRg...", - "roles": [{ "id": "admin" }] - } - ], - "messages": [ - { - "platformMessageId": "msg_001", - "sender": "abc123", - "accountName": "张三", - "groupNickname": "群主-张三", - "timestamp": 1703001600, - "type": 0, - "content": "大家好!欢迎加入技术交流群~" - }, - { - "platformMessageId": "msg_002", - "sender": "def456", - "accountName": "李四", - "groupNickname": "管理员", - "timestamp": 1703001610, - "type": 25, - "content": "收到!", - "replyToMessageId": "msg_001" - } - ] -} -``` - -### 私聊示例 - -```json -{ - "chatlab": { - "version": "0.0.2", - "exportedAt": 1703001600 - }, - "meta": { - "name": "与小明的对话", - "platform": "qq", - "type": "private" - }, - "members": [ - { - "platformId": "123456789", - "accountName": "我", - "avatar": "data:image/jpeg;base64,/9j/4AAQSkZJRg..." - }, - { - "platformId": "987654321", - "accountName": "小明", - "avatar": "data:image/jpeg;base64,/9j/4AAQSkZJRg..." 
- } - ], - "messages": [ - { - "sender": "123456789", - "accountName": "我", - "timestamp": 1703001600, - "type": 0, - "content": "在吗?" - } - ] -} -``` - -## JSONL 流式格式 - -JSONL(JSON Lines)格式适用于**超大规模聊天记录**(>100 万条),可避免内存溢出问题。 - -### 格式特点 - -- 每行一个 JSON 对象 -- 通过 `_type` 字段区分行类型:`header` / `member` / `message` -- 内存占用恒定(约 100MB),支持 GB 级文件 -- 支持流式写入,可边导出边追加 - -### 行类型说明 - -| `_type` | 说明 | 是否必需 | -| --------- | -------------------------------- | --------------- | -| `header` | 文件头,包含 `chatlab` 和 `meta` | ✅ 必须在第一行 | -| `member` | 成员信息 | - 可选 | -| `message` | 消息记录 | ✅ 至少一条 | - -### 完整示例 - -```jsonl -{"_type":"header","chatlab":{"version":"0.0.2","exportedAt":1703001600},"meta":{"name":"技术交流群","platform":"qq","type":"group"}} -{"_type":"member","platformId":"123456","accountName":"张三","groupNickname":"群主","roles":[{"id":"owner"}]} -{"_type":"member","platformId":"789012","accountName":"李四"} -{"_type":"message","platformMessageId":"msg_001","sender":"123456","accountName":"张三","groupNickname":"群主","timestamp":1703001600,"type":0,"content":"大家好!"} -{"_type":"message","sender":"789012","accountName":"李四","timestamp":1703001610,"type":0,"content":"你好!"} -{"_type":"message","sender":"123456","accountName":"张三","groupNickname":"群主","timestamp":1703001620,"type":1,"content":"[图片]"} -``` - -### 解析规则 - -1. **第一行必须是 header**:包含 `chatlab` 版本和 `meta` 元信息 -2. **成员行在消息之前**:可选,如果省略,成员信息会从消息中自动收集 -3. **消息按时间顺序排列**:建议按 `timestamp` 升序排列 -4. **每行独立完整**:单行解析错误可跳过继续处理 -5. 
**支持注释行**:以 `#` 开头的行会被跳过(可用于添加备注) - -::: warning 注意 - -- 每行必须是**有效的 JSON**(不能跨行) -- 行之间用换行符 `\n` 分隔 - -::: - -## 版本历史 - -| 版本 | 日期 | 变更 | -| ----- | ---------- | ------------------------------------------------------------------------------ | -| 0.0.1 | 2025-12-22 | 初始版本 | -| 0.0.2 | 2026-01-09 | 新增 roles、ownerId、platformMessageId、replyToMessageId 字段;新增 JSONL 格式 | diff --git a/docs/cn/chatlab-api.md b/docs/cn/standard/chatlab-api.md similarity index 71% rename from docs/cn/chatlab-api.md rename to docs/cn/standard/chatlab-api.md index b1ce8c74..9ecb37ff 100644 --- a/docs/cn/chatlab-api.md +++ b/docs/cn/standard/chatlab-api.md @@ -1,6 +1,21 @@ -# ChatLab API 文档 +--- +outline: deep +--- -ChatLab 提供本地 RESTful API 服务,允许外部工具、脚本和 MCP 等通过 HTTP 接口查询聊天记录、执行 SQL 查询、导入聊天数据。 +# ChatLab API + +> v1 + +ChatLab 提供本地 RESTful API 服务,允许外部工具、脚本和 MCP 等通过 HTTP 接口查询聊天记录、执行 SQL 查询、导出聊天数据。 + +::: tip 数据导入 + +如需通过 API 导入聊天数据,请参阅: + +- **[Push 导入协议](./chatlab-import.md)** — 外部系统主动将数据推送到 ChatLab +- **[Pull 远程数据源协议](./chatlab-pull.md)** — 第三方暴露标准端点,ChatLab 主动拉取数据 + +::: ## 快速开始 @@ -93,7 +108,7 @@ Token 可在 设置 → ChatLab API 页面查看和重新生成。 | GET | `/api/v1/status` | 服务状态 | | GET | `/api/v1/schema` | ChatLab Format JSON Schema | -### 数据查询(导出) +### 数据查询与导出 | 方法 | 路径 | 说明 | | ---- | ------------------------------------- | ------------------------ | @@ -107,10 +122,9 @@ Token 可在 设置 → ChatLab API 页面查看和重新生成。 ### 数据导入 -| 方法 | 路径 | 说明 | -| ---- | ----------------------------- | ------------------------ | -| POST | `/api/v1/import` | 导入聊天记录(新建会话) | -| POST | `/api/v1/sessions/:id/import` | 增量导入到指定会话 | +| 方法 | 路径 | 说明 | 文档 | +| ---- | ------------------------------------ | -------------------------------------------------------- | -------------------------------- | +| POST | `/api/v1/imports/:sessionId` | 导入消息到指定会话(首次自动创建,后续追加) | [Push 导入协议](./chatlab-import.md) | --- @@ -153,7 +167,8 @@ Token 可在 设置 → ChatLab API 页面查看和重新生成。 "platform": "qq", "type": "group", "messageCount": 58000, - "memberCount": 120 + 
"memberCount": 120, + "lastTimestamp": 1711468800 } ] } @@ -171,6 +186,36 @@ Token 可在 设置 → ChatLab API 页面查看和重新生成。 | ---- | ------ | ------- | | `id` | string | 会话 ID | +**响应示例:** + +```json +{ + "success": true, + "data": { + "id": "wechat_xxx@chatroom", + "name": "产品讨论群", + "platform": "wechat", + "type": "group", + "messageCount": 58000, + "memberCount": 86, + "firstTimestamp": 1609459200, + "lastTimestamp": 1711468800, + "lastPlatformMessageId": "msg_900000", + "groupId": "xxx@chatroom", + "importedAt": 1711469900 + } +} +``` + +| 字段 | 类型 | 说明 | +| ----------------------- | ------------ | ------------------------------------------------------ | +| `messageCount` | number | 会话内消息总数 | +| `memberCount` | number | 成员总数 | +| `firstTimestamp` | number\|null | 最早消息时间戳(秒级 Unix) | +| `lastTimestamp` | number\|null | 最新消息时间戳(秒级 Unix) | +| `lastPlatformMessageId` | string\|null | 最新一条有 platformMessageId 的消息 ID(用于增量边界) | +| `importedAt` | number | 最后一次导入时间 | + --- ### GET /api/v1/sessions/:id/messages @@ -186,8 +231,7 @@ Token 可在 设置 → ChatLab API 页面查看和重新生成。 | `startTime` | number | - | 起始时间戳(秒级 Unix) | | `endTime` | number | - | 结束时间戳(秒级 Unix) | | `keyword` | string | - | 关键词搜索 | -| `senderId` | string | - | 按发送者 platformId 筛选 | -| `type` | number | - | 按消息类型筛选 | +| `senderId` | string | - | 按发送者 ID 筛选 | **请求示例:** @@ -279,7 +323,7 @@ curl "http://127.0.0.1:5200/api/v1/sessions/abc123/messages?page=1&limit=50&keyw ```json { - "sql": "SELECT sender, COUNT(*) as count FROM messages GROUP BY sender ORDER BY count DESC LIMIT 10" + "sql": "SELECT sender_id, COUNT(*) as count FROM message GROUP BY sender_id ORDER BY count DESC LIMIT 10" } ``` @@ -289,16 +333,18 @@ curl "http://127.0.0.1:5200/api/v1/sessions/abc123/messages?page=1&limit=50&keyw { "success": true, "data": { - "columns": ["sender", "count"], + "columns": ["sender_id", "count"], "rows": [ - ["123456", 5800], - ["789012", 3200] + [1, 5800], + [2, 3200] ] } } ``` -> 关于数据库表结构,请参考 ChatLab 内部文档或使用 `SELECT * FROM 
sqlite_master WHERE type='table'` 查询。 +::: tip 提示 +使用 `SELECT * FROM sqlite_master WHERE type='table'` 查询可用的数据库表结构。 +::: --- @@ -332,104 +378,6 @@ curl "http://127.0.0.1:5200/api/v1/sessions/abc123/messages?page=1&limit=50&keyw --- -### POST /api/v1/import - -将聊天记录导入 ChatLab,**创建新会话**。 - -#### 支持的 Content-Type - -| Content-Type | 格式 | 适用场景 | Body 限制 | -| ---------------------- | ------------------- | ------------------------------ | ---------- | -| `application/json` | ChatLab Format JSON | 中小数据(快速测试、脚本集成) | **50MB** | -| `application/x-ndjson` | ChatLab JSONL 格式 | 大规模数据(生产级集成) | **无限制** | - -#### JSON 模式示例 - -```bash -curl -X POST http://127.0.0.1:5200/api/v1/import \ - -H "Authorization: Bearer YOUR_TOKEN" \ - -H "Content-Type: application/json" \ - -d '{ - "chatlab": { - "version": "0.0.2", - "exportedAt": 1711468800 - }, - "meta": { - "name": "导入测试", - "platform": "qq", - "type": "group" - }, - "members": [ - { "platformId": "123", "accountName": "测试用户" } - ], - "messages": [ - { - "sender": "123", - "accountName": "测试用户", - "timestamp": 1711468800, - "type": 0, - "content": "Hello World" - } - ] - }' -``` - -#### JSONL 模式示例 - -```bash -cat data.jsonl | curl -X POST http://127.0.0.1:5200/api/v1/import \ - -H "Authorization: Bearer YOUR_TOKEN" \ - -H "Content-Type: application/x-ndjson" \ - --data-binary @- -``` - -**响应:** - -```json -{ - "success": true, - "data": { - "mode": "new", - "sessionId": "session_xyz789" - } -} -``` - -> 关于 ChatLab Format 的详细规范,请参考 [ChatLab 标准化格式规范](./chatlab-format.md)。 - ---- - -### POST /api/v1/sessions/:id/import - -将聊天记录**增量导入**到已存在的会话。支持去重,相同消息不会重复插入。 - -**去重规则:** - -消息唯一键为 `timestamp + senderPlatformId + contentLength`。如果一条消息的时间戳、发送者和内容长度与已有消息完全相同,则视为重复并跳过。 - -**路径参数:** - -| 参数 | 类型 | 说明 | -| ---- | ------ | ----------- | -| `id` | string | 目标会话 ID | - -Content-Type 和请求体格式与 `POST /api/v1/import` 相同。 - -**响应:** - -```json -{ - "success": true, - "data": { - "mode": "incremental", - "sessionId": "session_abc123", - 
"newMessageCount": 150 - } -} -``` - ---- - ## 并发与限制 | 限制项 | 值 | 说明 | @@ -444,18 +392,20 @@ Content-Type 和请求体格式与 `POST /api/v1/import` 相同。 ## 错误码 -| 错误码 | HTTP 状态码 | 说明 | -| ------------------------ | ----------- | ------------------------------- | -| `UNAUTHORIZED` | 401 | Token 无效或缺失 | -| `SESSION_NOT_FOUND` | 404 | 会话不存在 | -| `INVALID_FORMAT` | 400 | 请求体不符合 ChatLab Format | -| `SQL_READONLY_VIOLATION` | 400 | SQL 不是 SELECT 语句 | -| `SQL_EXECUTION_ERROR` | 400 | SQL 执行出错 | -| `EXPORT_TOO_LARGE` | 400 | 消息数超过导出上限(10 万条) | -| `BODY_TOO_LARGE` | 413 | 请求体超过 50MB(仅 JSON 模式) | -| `IMPORT_IN_PROGRESS` | 409 | 有其他导入正在进行 | -| `IMPORT_FAILED` | 500 | 导入失败 | -| `SERVER_ERROR` | 500 | 服务内部错误 | +| 错误码 | HTTP 状态码 | 说明 | +| ------------------------ | ----------- | ----------------------------------- | +| `UNAUTHORIZED` | 401 | Token 无效或缺失 | +| `SESSION_NOT_FOUND` | 404 | 会话不存在 | +| `INVALID_FORMAT` | 400 | Content-Type 不支持或请求体格式错误 | +| `INVALID_PAYLOAD` | 400 | 必填字段缺失、类型错误或校验失败 | +| `SQL_READONLY_VIOLATION` | 400 | SQL 不是 SELECT 语句 | +| `SQL_EXECUTION_ERROR` | 400 | SQL 执行出错 | +| `EXPORT_TOO_LARGE` | 400 | 消息数超过导出上限(10 万条) | +| `BODY_TOO_LARGE` | 413 | 请求体超过 50MB(仅 JSON 模式) | +| `IMPORT_IN_PROGRESS` | 409 | 有其他导入正在进行 | +| `IDEMPOTENCY_CONFLICT` | 409 | 相同幂等键但请求体不一致 | +| `IMPORT_FAILED` | 500 | 导入失败 | +| `SERVER_ERROR` | 500 | 服务内部错误 | --- @@ -474,9 +424,9 @@ Content-Type 和请求体格式与 `POST /api/v1/import` 相同。 将 ChatLab API 接入 Claude Desktop 等 AI 工具,实现 AI 对聊天记录的直接查询和分析。 -### 2. 自动化脚本 +### 2. 自动化导入 -编写脚本定期从其他平台导出聊天记录,转换为 ChatLab Format 后通过 Push API 自动导入。 +编写脚本定期从其他平台导出聊天记录,转换为 ChatLab Format 后通过 [Push 导入协议](./chatlab-import.md) 自动导入。 ### 3. 数据分析 @@ -486,9 +436,9 @@ Content-Type 和请求体格式与 `POST /api/v1/import` 相同。 通过 `/export` 端点定期导出重要会话数据作为 JSON 备份。 -### 5. 定时拉取 +### 5. 
远程数据源 -在设置页配置外部数据源 URL,ChatLab 会按设定间隔自动拉取并导入新数据。 +在设置页配置外部数据源 URL,ChatLab 按 [Pull 远程数据源协议](./chatlab-pull.md) 自动拉取并导入新数据。 --- @@ -496,4 +446,12 @@ Content-Type 和请求体格式与 `POST /api/v1/import` 相同。 | 版本 | 说明 | | ---- | ------------------------------------------------------------------------------ | -| v1 | 初始版本,支持会话查询、消息搜索、SQL、导出、导入(JSON + JSONL)、Pull 调度器 | +| v1 | 支持会话查询、消息搜索、SQL、导出、Push 导入(JSON + JSONL)、Pull 远程数据源 | + +--- + +## 相关文档 + +- [ChatLab 标准化格式规范](./chatlab-format.md) — 数据交换格式定义 +- [Push 导入协议](./chatlab-import.md) — 外部系统主动推送数据到 ChatLab +- [Pull 远程数据源协议](./chatlab-pull.md) — 第三方暴露标准端点,ChatLab 主动拉取 diff --git a/docs/cn/standard/chatlab-format.md b/docs/cn/standard/chatlab-format.md index 68c148fc..fff67dcf 100644 --- a/docs/cn/standard/chatlab-format.md +++ b/docs/cn/standard/chatlab-format.md @@ -10,7 +10,9 @@ ChatLab 定义了一套标准的聊天记录数据交换格式,用于支持多 只要你将聊天记录转为该格式,那么就可以被 ChatLab 解析并使用其分析能力。 -::: warning 注意该格式规范目前仍处于早期制定阶段,部分字段和结构可能会在后续版本中调整。::: +::: warning 注意 +该格式规范目前仍处于早期制定阶段,部分字段和结构可能会在后续版本中调整。 +::: ## 概述 @@ -165,7 +167,9 @@ ChatLab 定义了一套标准的聊天记录数据交换格式,用于支持多 ## 消息类型对照表 -::: tip 提示若您的聊天记录中有其他特殊类型需要支持,请提交 issue 说明情况,我们会评估是否加入标准消息类型中。::: +::: tip 提示 +若您的聊天记录中有其他特殊类型需要支持,请提交 issue 说明情况,我们会评估是否加入标准消息类型中。 +::: ### 基础消息类型 (0-19) @@ -232,7 +236,8 @@ https://example.com/avatars/user123.jpg - 如果需要离线使用或长期存档,推荐使用 Data URL 格式 - 导出 Data URL 时建议将头像压缩为 100×100 像素以内,以减小文件体积 -- 如果头像来自可靠的长期有效的 CDN,可使用网络 URL 以减小文件体积 ::: +- 如果头像来自可靠的长期有效的 CDN,可使用网络 URL 以减小文件体积 +::: ## 完整示例 diff --git a/docs/cn/standard/chatlab-import.md b/docs/cn/standard/chatlab-import.md new file mode 100644 index 00000000..e0c42e2d --- /dev/null +++ b/docs/cn/standard/chatlab-import.md @@ -0,0 +1,465 @@ +--- +outline: deep +--- + +# Push 导入协议 + +> v1 + +本文档定义外部数据源向 ChatLab 推送聊天数据的标准导入协议。覆盖首次全量导入、历史回填、周期性增量同步三类场景。 + +::: tip 两种导入方式 + +- **Push 模式**(本文档):外部系统主动将数据推送到 ChatLab 的导入接口。适用于脚本集成、一次性文件导入等场景。 +- **[Pull 模式](./chatlab-pull.md)**:第三方暴露标准 HTTP 端点,ChatLab 主动拉取数据。**推荐的第三方集成方式。** + +两种模式底层共用同一套导入逻辑(去重、meta/members 
更新、FTS 索引),数据格式统一为 [ChatLab Format](./chatlab-format.md)。 + +::: + +## 设计原则 + +1. **统一入口**:首次导入和增量导入使用同一个端点,调用方无需区分。 +2. **接口最小化**:1 个导入接口 + 2 个查询接口覆盖全部 Push 场景。 +3. **双层幂等**:请求级幂等(Idempotency-Key)+ 记录级去重(platformMessageId / 内容哈希),承诺 **at-least-once + deterministic dedupe**。 +4. **同步优先**:小批量导入同步返回 `200 OK` 和写入结果。 +5. **默认自动更新**:meta 和 members 默认随导入请求自动更新,可通过 `options` 控制。 + +--- + +## 基础约定 + +### 服务地址 + +``` +Base URL:http://{host}:{port} (桌面端默认 127.0.0.1:5200) +Prefix: /api/v1 +``` + +### 认证 + +所有请求必须携带 Bearer Token: + +``` +Authorization: Bearer {token} +``` + +Token 在 ChatLab 设置页面生成,格式为 `clb_` + 64 字符 hex。 + +### Content-Type + +``` +application/json # 标准 JSON body(≤50MB) +application/x-ndjson # JSONL 流式(无大小限制) +``` + +--- + +## 接口总表 + +| 方法 | 路径 | 说明 | +| ------ | ---------------------------- | -------------------------------------------- | +| `POST` | `/api/v1/imports/:sessionId` | 导入消息到指定会话(首次自动创建,后续追加) | +| `GET` | `/api/v1/sessions/:id` | 查询会话状态(主要用于对账校验) | +| `GET` | `/api/v1/sessions` | 列出所有会话(发现目标 Session) | + +查询接口的详细说明请参阅 [ChatLab API 文档](./chatlab-api.md)。 + +--- + +## POST /api/v1/imports/:sessionId + +**唯一导入入口。** 会话不存在时自动创建,已存在时追加数据并自动更新 meta/members。 + +### Path 参数 + +| 参数 | 类型 | 说明 | +| ----------- | ------ | ------------------------------------------------------------------------------------ | +| `sessionId` | string | Session ID,由调用方生成并维护。同一聊天来源必须保持固定,跨批一致。生成策略见下方。 | + +**Session ID 生成策略:** + +| 优先级 | 场景 | 推荐格式 | 示例 | +| --- | --- | --- | --- | +| 1(首选) | 有平台原始 ID | `{platform}_{originalId}` | `wechat_xxx@chatroom`、`qq_123456789` | +| 2 | 文件导入(有结构化 ID) | `{platform}_{meta.groupId}` 或 `{platform}_{对方platformId}` | `wechat_xxx@chatroom` | +| 3 | 文件导入(无结构化标识) | `file_{SHA256(文件内容)[:16]}` | `file_a1b2c3d4e5f6a7b8` | +| 4(兜底) | 一次性导入 | `import_{UUID}` | `import_550e8400-e29b-41d4-a716-446655440000` | + +::: warning 注意 +- 不建议使用文件路径作为 sessionId 的输入——文件重命名会导致 sessionId 变化 +- 同一聊天来源的首次导入和增量导入**必须**使用相同的 sessionId +::: + +### 请求 Header + +| Header | 必填 | 说明 | +| --- | --- | 
--- | +| `Authorization` | 是 | `Bearer {token}` | +| `Content-Type` | 是 | `application/json` 或 `application/x-ndjson` | +| `Idempotency-Key` | 建议 | 当前批次的唯一标识,用于重试安全。建议格式:`{sessionId}-{batchIndex}-{windowStart}` | +| `X-Dry-Run` | 否 | 设为 `true` 时仅分析不写入,返回预估结果 | + +### 请求 Body(JSON 模式) + +```json +{ + "chatlab": { + "version": "0.0.2", + "exportedAt": 1711468800, + "generator": "YourSystem/1.0" + }, + "meta": { + "name": "产品讨论群", + "platform": "wechat", + "type": "group", + "groupId": "xxx@chatroom", + "groupAvatar": "data:image/jpeg;base64,...", + "ownerId": "wxid_owner" + }, + "members": [ + { + "platformId": "wxid_a", + "accountName": "张三", + "groupNickname": "产品", + "avatar": "data:image/jpeg;base64,...", + "roles": [{ "id": "owner" }] + } + ], + "messages": [ + { + "platformMessageId": "msg_1001", + "sender": "wxid_a", + "accountName": "张三", + "groupNickname": "产品", + "timestamp": 1711468800, + "type": 0, + "content": "Hello", + "replyToMessageId": "msg_1000" + } + ], + "options": { + "metaUpdateMode": "patch", + "memberUpdateMode": "upsert" + } +} +``` + +### options 对象(可选) + +| 字段 | 类型 | 默认值 | 可选值 | 说明 | +| --- | --- | --- | --- | --- | +| `metaUpdateMode` | string | `patch` | `patch` / `none` | `patch`:非空字段覆盖更新;`none`:跳过更新 | +| `memberUpdateMode` | string | `upsert` | `upsert` / `none` | `upsert`:新增+更新;`none`:跳过更新 | + +::: tip 提示 +回填历史数据时建议传 `"metaUpdateMode": "none"` 防止旧群名覆盖当前值。 +::: + +### 请求 Body(JSONL 模式) + +每行一个 JSON 对象,通过 `_type` 字段区分类型。行顺序:`header` → `member`(零或多行) → `message`(一或多行)。 + +```jsonl +{"_type":"header","chatlab":{"version":"0.0.2","exportedAt":1711468800,"generator":"YourSystem/1.0"},"meta":{"name":"产品讨论群","platform":"wechat","type":"group","groupId":"xxx@chatroom"},"options":{"metaUpdateMode":"patch","memberUpdateMode":"upsert"}} +{"_type":"member","platformId":"wxid_a","accountName":"张三","groupNickname":"产品"} +{"_type":"message","platformMessageId":"msg_1001","sender":"wxid_a","accountName":"张三","timestamp":1711468800,"type":0,"content":"Hello"} 
+{"_type":"message","platformMessageId":"msg_1002","sender":"wxid_b","accountName":"李四","timestamp":1711468860,"type":0,"content":"Hi"} +``` + +### 各块携带规则 + +| 块 | 会话首次创建 | 后续增量导入 | 语义 | +| ---------- | ------------ | ------------ | -------------------------------------------------------------- | +| `chatlab` | 必填 | 可选 | 首次用于创建会话;后续用于版本兼容判断 | +| `meta` | 必填 | 可选 | 首次作为初始值;后续携带则非空字段自动覆盖更新 | +| `members` | 必填 | 可选 | 首次写入全量成员;后续按 `platformId` 做 upsert | +| `messages` | 必填 | 必填 | 每批必须包含至少一条消息 | + +**Meta 自动更新规则:** + +- 调用方传了且值不为空的字段 → 覆盖更新 +- 调用方未传或值为空的字段 → 保持原值不变 +- `platform` 和 `type` 不允许增量更新(会话创建后不变) + +**Members Upsert 规则:** + +- 按 `platformId` 匹配:新 `platformId` → 插入为新成员;已存在的 → 更新非空字段 +- 不携带 `members` 时:消息中出现的未知 `sender` 仍会被自动创建为成员(仅含最小信息) + +--- + +### 字段定义 + +#### chatlab 对象 + +| 字段 | 类型 | 必填 | 说明 | +| ------------ | ------ | ---- | --------------------------------- | +| `version` | string | 是 | 格式版本号,当前为 `"0.0.2"` | +| `exportedAt` | number | 是 | 导出/生成时间(秒级 Unix 时间戳) | +| `generator` | string | 否 | 生成工具/系统名称 | + +#### meta 对象 + +| 字段 | 类型 | 首次必填 | 说明 | +| ------------- | ------ | -------- | -------------------------------------------- | +| `name` | string | 是 | 群名/会话名 | +| `platform` | string | 是 | 平台标识(见下方枚举) | +| `type` | string | 是 | 会话类型:`group`(群聊)/ `private`(私聊) | +| `groupId` | string | 否 | 群的平台原始 ID | +| `groupAvatar` | string | 否 | 群头像,base64 Data URL 或网络 URL | +| `ownerId` | string | 否 | 导出者/所有者的 platformId | + +**平台标识枚举:** + +| 值 | 平台 | +| ---------- | --------- | +| `wechat` | 微信 | +| `qq` | QQ | +| `telegram` | Telegram | +| `discord` | Discord | +| `whatsapp` | WhatsApp | +| `line` | LINE | +| `slack` | Slack | +| `unknown` | 未知/其他 | + +::: tip 提示 +如果你的平台不在上述列表中,可使用小写英文标识(如 `signal`、`matrix`),ChatLab 会按 `unknown` 的分析策略处理。 +::: + +#### members 数组元素 + +| 字段 | 类型 | 必填 | 说明 | +| --------------- | ------ | ---- | -------------------------------------- | +| `platformId` | string | 是 | 成员在平台的唯一标识(QQ号、微信ID等) | +| `accountName` | string | 建议 | 
账号名称(不随群变化的原始昵称) | +| `groupNickname` | string | 否 | 群内专属昵称 | +| `avatar` | string | 否 | 头像,base64 Data URL 或网络 URL | +| `roles` | array | 否 | 角色列表,见 [角色定义](./chatlab-format.md#角色-roles) | + +#### messages 数组元素 + +| 字段 | 类型 | 必填 | 说明 | +| ------------------- | ------------ | -------- | ------------------------------------------------------------------- | +| `sender` | string | 是 | 发送者的 `platformId` 或保留标识(如 `SYSTEM`) | +| `timestamp` | number | 是 | 消息时间戳,秒级 Unix 时间戳 | +| `type` | number | 是 | 消息类型枚举(见 [消息类型](./chatlab-format.md#消息类型对照表)) | +| `accountName` | string | 建议 | 发送时的账号名称 | +| `groupNickname` | string | 否 | 发送时的群昵称 | +| `content` | string\|null | 否 | 纯文本内容,非文本消息可为 null | +| `platformMessageId` | string | 强烈建议 | 消息的平台原始 ID,去重的首选依据 | +| `replyToMessageId` | string | 否 | 回复目标消息的 `platformMessageId` | + +**sender 字段规则:** + +1. 必须是稳定的 `platformId` 或保留标识 `SYSTEM` +2. 若 `sender` 对应的成员不在 `members` 中,ChatLab 自动补创建成员(仅含最小信息) +3. `SYSTEM` 用于系统消息(入群/退群/公告等),不计入成员统计 + +--- + +### 成功响应 + +```json +{ + "success": true, + "data": { + "sessionId": "wechat_xxx@chatroom", + "created": false, + "batch": { + "receivedCount": 5000, + "writtenCount": 4986, + "duplicateCount": 14 + }, + "session": { + "totalCount": 128640, + "memberCount": 86, + "firstTimestamp": 1609459200, + "lastTimestamp": 1711468800 + }, + "updates": { + "metaUpdated": true, + "membersAdded": 3, + "membersUpdated": 5 + } + } +} +``` + +| 字段 | 说明 | +| ------------------------ | ----------------------------------------------------- | +| `created` | `true` 表示本次请求触发了会话创建(首次导入) | +| `batch.receivedCount` | 本批收到的消息条数 | +| `batch.writtenCount` | 实际写入的条数 | +| `batch.duplicateCount` | 因去重跳过的条数 | +| `session.totalCount` | 写入后会话的累计消息总数 | +| `session.memberCount` | 会话的成员总数 | +| `session.firstTimestamp` | 会话内最早消息时间戳 | +| `session.lastTimestamp` | 会话内最新消息时间戳 | +| `updates.metaUpdated` | 本次请求是否触发了 meta 更新 | +| `updates.membersAdded` | 本次新增的成员数 | +| `updates.membersUpdated` | 本次更新的成员数 | + +--- + +## 去重语义 + +去重在单个 Session 
内生效,不跨会话。 + +**优先级:** + +1. 若消息提供了 `platformMessageId`,以此作为唯一键去重(高精度,推荐)。 +2. 若未提供 `platformMessageId`,退化为内容哈希去重:`sha256(timestamp + '\0' + sender + '\0' + contentTag + '\0' + normalizedContent)` + +| 层次 | 机制 | 适用范围 | 精度 | +| ------------------ | ----------------------------------------------- | ----------------------------- | -------- | +| 请求级幂等 | `Idempotency-Key` | 同一 HTTP 请求的重试 | 精确 | +| 消息级去重(主键) | `platformMessageId` | 跨批次、跨窗口的同一条消息 | 精确 | +| 消息级去重(降级) | 内容哈希(公式见上方优先级第 2 条) | 无 platformMessageId 时的兜底 | 最大努力 | + +::: warning 注意 +- 同一 `platformMessageId` 的消息不会被重复写入,即使 content 不同(以首次写入为准) +- 内容哈希去重在"同一人、同一秒、完全相同内容"时判定为重复,存在极小概率误判 +- **强烈建议**外部数据源提供 `platformMessageId`,这是最可靠的去重依据 +::: + +--- + +## 分批策略 + +### 默认批次大小 + +每批建议 **5000 条消息**。 + +| 约束 | 值 | 说明 | +| ------------------ | ------------ | ------------------------------- | +| JSON body 大小上限 | 50MB | 超过返回 `BODY_TOO_LARGE` (413) | +| JSONL body 大小 | 无限制 | 通过 stream 写入临时文件后处理 | +| 建议每批消息数 | 5000 条 | 兼顾性能和内存占用 | + +### 分批原则 + +- 按时间顺序分批,从旧到新推送 +- 批次间允许时间重叠(建议重叠 5~10 分钟),依靠去重吸收 +- 每批独立请求,任意一批失败不影响其他批次 +- 第一批必须携带 `chatlab` + `meta` + `members`(触发会话创建) +- 后续批次可仅携带 `messages` + +### 游标维护(由调用方负责) + +ChatLab 不为调用方维护游标。推荐结构: + +```json +{ + "sessionId": "wechat_xxx@chatroom", + "lastSyncedTimestamp": 1711468800, + "lastSyncedMessageId": "msg_900000" +} +``` + +每批成功后,将响应中的 `session.lastTimestamp` 更新到游标。失败时保持游标不变,用相同的 `Idempotency-Key` 重试。 + +### 并发约束 + +当前版本:**同一时刻仅允许一个导入任务**。并发请求会收到 `IMPORT_IN_PROGRESS` (409) 错误。 + +--- + +## 标准调用流程 + +### 首次全量导入(Bootstrap) + +``` +1. 准备全量聊天数据,按时间顺序切分为 N 批(每批 ≤5000 条) + +2. 第一批请求: + POST /api/v1/imports/wechat_xxx@chatroom + Body: { chatlab, meta, members, messages } + → 响应 created: true,会话创建成功 + +3. 第 2~N 批请求: + POST /api/v1/imports/wechat_xxx@chatroom + Body: { messages } + Idempotency-Key: {sessionId}-{batchIndex}-{windowStart} + +4. 每批成功后记录游标;失败用相同 Idempotency-Key 重试 + +5. 
全部批次完成后对账: + GET /api/v1/sessions/wechat_xxx@chatroom + → 校验 messageCount(应与导入响应中的 session.totalCount 一致)、firstTimestamp、lastTimestamp +``` + +### 历史回填(Backfill) + +``` +1. 确定需要补齐的时间区间 [start, end] +2. 按时间窗口拆分为多批 +3. 依次调用 POST /api/v1/imports/:sessionId + (建议设置 options.metaUpdateMode: "none" 防止旧群名覆盖) +4. 每批成功后更新本地游标 +5. 可选:GET /api/v1/sessions/:id 对账 +``` + +### 每日定时增量(Scheduled Sync) + +``` +1. 固定 Session ID + 定时器(如每小时/每天) +2. 以上次游标时间为基准,向更早方向回退 5~10 分钟作为窗口起点(与上一窗口重叠) +3. 拉取该时间窗口内的新消息 +4. 如有新成员或群名变更,携带 meta/members +5. 调用 POST /api/v1/imports/:sessionId +6. 依靠去重吸收重叠数据 +7. 更新本地游标 +``` + +--- + +## 媒体附件(协议预留) + +当前版本重点保证文本消息的导入稳定性。`attachments` 作为协议预留字段:调用方可在消息中携带该字段,ChatLab 当前接收但不承诺完整落库与渲染。 + +预留字段结构见 [ChatLab 格式规范](./chatlab-format.md)。 + +--- + +## 错误码与重试策略 + +| 错误码 | HTTP 状态 | 说明 | 可重试 | +| --- | --- | --- | --- | +| `UNAUTHORIZED` | 401 | Token 无效或缺失 | 否 | +| `INVALID_FORMAT` | 400 | Content-Type 不支持或请求体格式错误 | 否 | +| `INVALID_PAYLOAD` | 400 | 必填字段缺失、类型错误或校验失败 | 否 | +| `BODY_TOO_LARGE` | 413 | JSON body 超过 50MB | 否 | +| `IMPORT_IN_PROGRESS` | 409 | 当前有其他导入正在执行 | 是 | +| `IDEMPOTENCY_CONFLICT` | 409 | 相同幂等键但请求体不一致 | 否 | +| `IMPORT_FAILED` | 500 | 导入过程内部错误 | 是 | +| `SERVER_ERROR` | 500 | 服务内部错误 | 是 | + +**重试策略建议:** + +``` +最大重试次数:3 +退避策略:指数退避 + 随机抖动 + 第 1 次重试:5s + random(0, 1s) + 第 2 次重试:15s + random(0, 3s) + 第 3 次重试:45s + random(0, 5s) +重试时必须使用相同的 Idempotency-Key +``` + +--- + +## 版本与兼容性 + +- `chatlab.version`:`0.0.2` +- API 路径前缀:`/api/v1` +- **向后兼容**:新增字段均为可选,不破坏旧调用方 +- **字段废弃策略**:废弃字段先标记为 `deprecated`,保留两个版本后移除 +- **平台标识可扩展**:`platform` 字段不限于预定义枚举,可传入任意小写字母标识 + +--- + +## 相关文档 + +- [ChatLab API 文档](./chatlab-api.md) — 查询、导出和系统端点 +- [Pull 远程数据源协议](./chatlab-pull.md) — 第三方暴露标准端点,ChatLab 主动拉取 +- [ChatLab 标准化格式规范](./chatlab-format.md) — 数据交换格式定义 diff --git a/docs/cn/standard/chatlab-pull.md b/docs/cn/standard/chatlab-pull.md new file mode 100644 index 00000000..fceaacaa --- /dev/null +++ b/docs/cn/standard/chatlab-pull.md @@ -0,0 +1,308 @@ +--- +outline: deep +--- + +# Pull 远程数据源协议 + +> v1 + +本文档定义第三方数据源暴露标准 HTTP 
端点供 ChatLab 主动拉取数据的协议规范。这是 ChatLab 生态**推荐的第三方集成方式**。 + +::: tip 两种导入方式 + +- **[Push 模式](./chatlab-import.md)**:外部系统主动将数据推送到 ChatLab 的导入接口。适用于脚本集成、一次性文件导入。 +- **Pull 模式**(本文档):第三方暴露标准 HTTP 端点,ChatLab 主动拉取数据。**推荐的第三方集成方式。** + +::: + +## 为什么 Pull 是推荐方案 + +- 第三方工具是数据生产者,天然适合暴露数据;ChatLab 是数据消费者/分析者,天然适合主动获取 +- 用户只需在 ChatLab UI 中输入数据源地址,即可浏览、选择、同步——操作完全在 ChatLab 端完成 +- Push 模式需要第三方实现 HTTP 客户端逻辑(批次管理、重试、游标维护),门槛更高 +- Pull 协议定义的是**通用数据暴露标准**,不只服务 ChatLab,任何兼容工具都可以接入 + +**适用场景:** + +- 外部采集端运行在远程设备上,只需暴露 HTTP 接口 +- 用户希望在 ChatLab UI 上浏览可用对话、选择导入、点击"立即同步" +- 需要定时自动增量同步的长期运行场景 + +--- + +## 概述 + +Pull 模式的工作流程分为三个阶段: + +``` +1. 发现:ChatLab 获取数据源上的所有可用对话列表 +2. 拉取:用户选择对话后,ChatLab 拉取历史消息 +3. 同步:定时增量拉取新消息(可选 SSE 实时通知加速) +``` + +第三方数据源只需按本协议实现标准 HTTP 端点,ChatLab(以及未来任何兼容工具)即可自动完成发现、全量拉取和增量同步。 + +--- + +## 阶段一:发现可用对话 + +ChatLab 连接到远程数据源后,首先获取所有可拉取的对话列表。 + +### GET /sessions + +``` +GET {baseUrl}/sessions +Authorization: Bearer {token} ← 仅配置了 token 时携带 +Accept: application/json +``` + +**可选参数:** + +| 参数 | 类型 | 说明 | +| --------- | ------ | ------------------------ | +| `keyword` | string | 按对话名称模糊搜索 | +| `limit` | number | 返回条数限制(默认全部) | + +**响应:** + +```json +{ + "sessions": [ + { + "id": "xxx@chatroom", + "name": "产品讨论群", + "platform": "wechat", + "type": "group", + "messageCount": 58000, + "memberCount": 86, + "lastMessageAt": 1711468800 + }, + { + "id": "wxid_friend_a", + "name": "张三", + "platform": "wechat", + "type": "private", + "messageCount": 1200, + "memberCount": 2, + "lastMessageAt": 1711465200 + } + ] +} +``` + +| 字段 | 类型 | 必填 | 说明 | +| --------------- | ------ | ---- | ----------------------------------- | +| `id` | string | 是 | 对话在数据源中的唯一标识 | +| `name` | string | 是 | 对话名称(群名/联系人名) | +| `platform` | string | 是 | 平台标识(与 Push 模式相同) | +| `type` | string | 是 | `group` / `private` | +| `messageCount` | number | 否 | 消息总数(用于 ChatLab 展示预估量) | +| `memberCount` | number | 否 | 成员数 | +| `lastMessageAt` | number | 否 | 最新消息时间戳 | + +ChatLab 在 UI 中展示该列表,用户选择需要导入的对话。 + +--- + +## 阶段二:拉取对话数据 + 
+用户选定对话后,ChatLab 拉取指定对话的数据。 + +### GET /sessions/:id/messages + +``` +GET {baseUrl}/sessions/{sessionId}/messages?format=chatlab&since={timestamp} +Authorization: Bearer {token} +Accept: application/json, application/x-ndjson +``` + +| 参数 | 必填 | 说明 | +| ----------- | ---- | ------------------------------------------------------------------- | +| `sessionId` | 是 | 来自阶段一返回的对话 `id` | +| `format` | 是 | 固定为 `chatlab`,要求数据源返回 ChatLab 标准格式 | +| `since` | 否 | Unix 时间戳(秒级)。省略或为 `0` 时为全量拉取,大于 0 时为增量拉取 | +| `end` | 否 | Unix 时间戳(秒级)。指定时间范围上界,用于历史回填等分段拉取 | +| `limit` | 否 | 单次返回的最大消息数,用于分页 | +| `offset` | 否 | 分页偏移量,配合 `limit` 使用 | + +### 数据携带规则 + +- **首次全量**(`since` 为空或 0):**必须**包含 `chatlab` + `meta` + `members` + `messages` +- **增量同步**(`since > 0`):**必须**包含 `messages`。`meta` / `members` **仅在发生实际变更时携带**,未变更时不得携带,以避免历史快照覆盖当前状态 +- 无新数据时返回空 `messages` 数组 + +### 响应格式 + +响应为标准 [ChatLab Format](./chatlab-format.md)(JSON 或 JSONL),并附带 `sync` 同步元信息。 + +```json +{ + "chatlab": { "version": "0.0.2", "exportedAt": 1711468800 }, + "meta": { "name": "产品讨论群", "platform": "wechat", "type": "group" }, + "members": [ ... ], + "messages": [ ... 
], + "sync": { + "hasMore": true, + "nextSince": 1711468800, + "nextOffset": 5000, + "watermark": 1711470000 + } +} +``` + +### sync 同步元信息 + +| 字段 | 类型 | 说明 | +| ------------ | ------- | ------------------------------------------------------------------------------------------ | +| `hasMore` | boolean | 是否还有更多数据。为 `true` 时 ChatLab 自动续拉 | +| `nextSince` | number | 下一次请求建议使用的 `since` 值 | +| `nextOffset` | number | 下一次请求建议使用的 `offset` 值,用于分页续拉 | +| `watermark` | number | 本次拉取的快照上界时间戳,防止分页期间新数据写入导致的窗口漂移 | + +**sync 块的必要性规则:** + +| 数据源返回方式 | sync 块要求 | 说明 | +| -------------------------- | ----------- | ---------------------------------------------------------------- | +| 单次返回全部数据(不分页) | 可选 | ChatLab 视 `messages` 为完整结果 | +| 支持 `limit`/`offset` 分页 | **必须** | 至少包含 `hasMore`;若无法保证快照一致性,还必须包含 `watermark` | + +::: warning 注意 +若数据源支持分页但未返回 `sync` 块,ChatLab 不保证自动续拉和分页一致性——仅处理首次返回的数据。 +::: + +### 快照一致性要求 + +- 同一次分页拉取(通过 `limit`/`offset` 遍历)**应当**基于同一数据快照视图 +- 若数据源无法保证快照一致性,**必须**返回 `sync.watermark`,供 ChatLab 固定拉取窗口上界 + +### 分批拉取策略 + +对于大量历史数据(如数万条消息),推荐两种分批方式: + +1. **时间窗口分批**:通过 `since` + `end` 指定时间段,逐段拉取 +2. 
**分页分批**:通过 `limit` + `offset` 做数据库级分页,结合 `sync` 块自动续拉 + +ChatLab 会自动处理分批场景,通过去重机制保证不重复写入。 + +--- + +## 阶段三:定时增量同步 + +ChatLab 按用户配置的间隔,定期对已订阅的对话执行增量拉取: + +``` +GET {baseUrl}/sessions/{sessionId}/messages?format=chatlab&since={lastPullAt} +``` + +远程数据源返回 `since` 之后的增量消息。ChatLab 通过内部导入管道处理(去重、meta/members 更新、FTS 索引等全部复用 Push 模式逻辑)。 + +--- + +## 可选:SSE 实时通知 + +除定时轮询外,远程数据源还可**选择性地**实现 SSE(Server-Sent Events)端点,用于**通知 ChatLab 有新数据可拉取**。 + +::: warning 重要 +SSE 仅作为通知通道,不是数据同步主通道。ChatLab 不假设 SSE 事件可靠送达(网络断连、进程重启均可能丢失事件)。最终数据一致性始终由定时 Pull 保证。SSE 的作用是将增量同步延迟从"分钟级"降到"秒级"。 +::: + +### GET /push/messages + +``` +GET {baseUrl}/push/messages +Authorization: Bearer {token} +Accept: text/event-stream +``` + +**事件格式:** + +``` +event: message.new +data: {"eventId":"evt_001","sessionId":"xxx@chatroom","timestamp":1711468800} +``` + +| 字段 | 类型 | 必填 | 说明 | +| ------------------- | ------ | ---- | ------------------------------------------ | +| `eventId` | string | 是 | 事件唯一 ID,用于 ChatLab 去重已处理的通知 | +| `sessionId` | string | 是 | 有新消息的对话 ID | +| `timestamp` | number | 是 | 新消息的 Unix 时间戳(秒级) | +| `platformMessageId` | string | 否 | 新消息的平台 ID(如可获取) | + +ChatLab 接收到 SSE 事件后,**触发一次该 session 的增量拉取**(调用 `GET /sessions/:id/messages?format=chatlab&since={lastPullAt}`),而非直接将事件数据写入存储。 + +--- + +## 认证 + +远程数据源可选择是否要求认证。如果需要,使用 `Authorization: Bearer {token}` 机制。 + +::: tip SSE 认证 +部分数据源额外支持 `?access_token=TOKEN` 查询参数方式传递 Token(SSE 长连接场景推荐此方式,因为 EventSource API 不支持自定义 Header)。ChatLab 在连接 SSE 时也支持查询参数传 Token。 +::: + +--- + +## 实现指南 + +### 最小实现(2 个端点) + +只需实现以下两个端点即可接入 ChatLab: + +| 端点 | 说明 | +| --------------------------------------------------- | --------------------- | +| `GET /sessions` | 返回对话列表 | +| `GET /sessions/:id/messages?format=chatlab&since=X` | 返回 ChatLab 格式数据 | + +最小实现不需要分页、SSE 或复杂的 `sync` 块。ChatLab 会将响应中的 `messages` 视为完整数据。 + +### 增强实现 + +| 能力 | 说明 | +| -------------------------- | ------------------------------------------------------------ | +| `GET /push/messages` | SSE 实时通知(仅唤醒拉取,不传输完整数据) | +| 支持 
`limit`/`offset` 分页 | 大量历史数据的分页拉取 | +| 支持 `end` 参数 | 时间窗口分段拉取 | +| 响应包含 `sync` 块 | 提供 `hasMore`/`nextSince`/`nextOffset`/`watermark` 续拉信息 | + +### 数据格式 + +所有数据响应必须符合 [ChatLab 标准化格式规范](./chatlab-format.md)(JSON 或 JSONL)。首次全量响应必须包含 `chatlab`、`meta`、`members`、`messages` 四个标准块;增量响应按"数据携带规则"一节执行:必须包含 `messages`,`meta` / `members` 仅在发生实际变更时携带。 + +### 媒体文件 + +如果数据源的消息中包含媒体引用,`attachments` 中的 `filePath` 或 `dataUri` 可指向数据源的媒体服务端点。ChatLab 当前按"协议预留"处理,未来版本将支持从数据源拉取媒体文件。 + +--- + +## 示例场景 + +某采集端在手机上持续采集微信消息,暴露 `GET /sessions` 和 `GET /sessions/:id/messages` 两个端点。用户在 ChatLab 中操作: + +``` +1. 在 ChatLab 设置中添加远程数据源(输入采集端 URL + 可选 Token) + +2. ChatLab 调用 GET {baseUrl}/sessions + → 展示 86 个群和 200 个私聊 + +3. 用户选择其中 5 个群导入 + +4. ChatLab 立即执行全量拉取: + GET {baseUrl}/sessions/{id}/messages?format=chatlab&since=0 + → 如有 sync.hasMore=true,自动续拉直到全部完成 + +5. 之后每小时自动增量同步: + GET {baseUrl}/sessions/{id}/messages?format=chatlab&since={lastPullAt} + +6. 如果采集端实现了 SSE: + 收到 message.new 事件 → 立即触发增量拉取(不等定时器) + +7. 用户可随时在 ChatLab UI 点击"立即同步" +``` + +--- + +## 相关文档 + +- [ChatLab API 文档](./chatlab-api.md) — 查询、导出和系统端点 +- [Push 导入协议](./chatlab-import.md) — 外部系统主动推送数据到 ChatLab +- [ChatLab 标准化格式规范](./chatlab-format.md) — 数据交换格式定义 diff --git a/docs/en/standard/chatlab-format.md b/docs/en/standard/chatlab-format.md index 56398049..74d1aa00 100644 --- a/docs/en/standard/chatlab-format.md +++ b/docs/en/standard/chatlab-format.md @@ -8,7 +8,9 @@ ChatLab defines a standard chat record data exchange format to support unified i As long as you convert your chat records to this format, ChatLab can parse and analyze them. -::: warning Notice This format specification is still in its early development stage. Some fields and structures may be adjusted in future versions. ::: +::: warning Notice +This format specification is still in its early development stage. Some fields and structures may be adjusted in future versions. 
+::: ## Overview @@ -111,7 +113,9 @@ Here's a **minimal** ChatLab format example with only required fields: ## Message Type Reference -::: warning Tip If you have other special types in your chat records that need support, please submit an issue explaining your situation. We'll evaluate whether to add them to the standard message types. ::: +::: warning Tip +If you have other special types in your chat records that need support, please submit an issue explaining your situation. We'll evaluate whether to add them to the standard message types. +::: ### Basic Message Types (0-19) @@ -162,7 +166,9 @@ Supported image formats: - `image/gif` - GIF format - `image/webp` - WebP format -::: tip Suggestion When exporting, we recommend compressing avatars to 100×100 pixels or less to reduce file size. ::: +::: tip Suggestion +When exporting, we recommend compressing avatars to 100×100 pixels or less to reduce file size. +::: ## Complete Examples diff --git a/docs/tw/chatlab-format.md b/docs/tw/chatlab-format.md index bb9d094d..46c521f9 100644 --- a/docs/tw/chatlab-format.md +++ b/docs/tw/chatlab-format.md @@ -4,7 +4,9 @@ ChatLab 定義了一套標準的聊天記錄資料交換格式,用於支援多 只要你將聊天記錄轉為該格式,那麼就可以被 ChatLab 解析並使用其分析能力。 -::: warning 注意該格式規範目前仍處於早期制定階段,部分欄位和結構可能會在後續版本中調整。::: +::: warning 注意 +該格式規範目前仍處於早期制定階段,部分欄位和結構可能會在後續版本中調整。 +::: ## 概述 @@ -170,7 +172,9 @@ ChatLab 定義了一套標準的聊天記錄資料交換格式,用於支援多 ## 訊息類型對照表 -::: tip 提示若您的聊天記錄中有其他特殊類型需要支援,請提交 issue 說明情況,我們會評估是否加入標準訊息類型中。::: +::: tip 提示 +若您的聊天記錄中有其他特殊類型需要支援,請提交 issue 說明情況,我們會評估是否加入標準訊息類型中。 +::: ### 基礎訊息類型 (0-19) diff --git a/docs/tw/standard/chatlab-format.md b/docs/tw/standard/chatlab-format.md index 3eb51a9f..3b4b53c2 100644 --- a/docs/tw/standard/chatlab-format.md +++ b/docs/tw/standard/chatlab-format.md @@ -10,7 +10,9 @@ ChatLab 定義了一套標準的聊天記錄資料交換格式,用於支援多 只要你將聊天記錄轉為該格式,那麼就可以被 ChatLab 解析並使用其分析能力。 -::: warning 注意該格式規範目前仍處於早期制定階段,部分欄位和結構可能會在後續版本中調整。::: +::: warning 注意 +該格式規範目前仍處於早期制定階段,部分欄位和結構可能會在後續版本中調整。 +::: ## 概述 @@ -165,7 +167,9 @@ ChatLab 
定義了一套標準的聊天記錄資料交換格式,用於支援多 ## 訊息類型對照表 -::: tip 提示若您的聊天記錄中有其他特殊類型需要支援,請提交 issue 說明情況,我們會評估是否加入標準訊息類型中。::: +::: tip 提示 +若您的聊天記錄中有其他特殊類型需要支援,請提交 issue 說明情況,我們會評估是否加入標準訊息類型中。 +::: ### 基礎訊息類型 (0-19) diff --git a/electron/main/api/auth.ts b/electron/main/api/auth.ts index 7b1a1f6b..479ba1c7 100644 --- a/electron/main/api/auth.ts +++ b/electron/main/api/auth.ts @@ -3,9 +3,17 @@ */ import type { FastifyRequest, FastifyReply } from 'fastify' +import { timingSafeEqual } from 'crypto' import { loadConfig } from './config' import { unauthorized, errorResponse } from './errors' +function safeTokenCompare(a: string, b: string): boolean { + const bufA = Buffer.from(a) + const bufB = Buffer.from(b) + if (bufA.length !== bufB.length) return false + return timingSafeEqual(bufA, bufB) +} + export async function authHook(request: FastifyRequest, reply: FastifyReply): Promise { const authHeader = request.headers.authorization if (!authHeader || !authHeader.startsWith('Bearer ')) { @@ -17,7 +25,7 @@ export async function authHook(request: FastifyRequest, reply: FastifyReply): Pr const token = authHeader.slice(7) const config = loadConfig() - if (!config.token || token !== config.token) { + if (!config.token || !safeTokenCompare(token, config.token)) { const err = unauthorized() reply.code(err.statusCode).send(errorResponse(err)) return diff --git a/electron/main/api/config.ts b/electron/main/api/config.ts index c8a0dc8c..beba585a 100644 --- a/electron/main/api/config.ts +++ b/electron/main/api/config.ts @@ -7,6 +7,7 @@ import * as fs from 'fs' import * as path from 'path' import * as crypto from 'crypto' import { getSettingsDir, ensureDir } from '../paths' +import { apiLogger } from './logger' const CONFIG_FILE = 'api-server.json' @@ -41,7 +42,7 @@ export function loadConfig(): ApiServerConfig { return { ...DEFAULT_CONFIG, ...parsed } } } catch (err) { - console.error('[ApiConfig] Failed to load config:', err) + apiLogger.error('[Config] Failed to load config', err) } return { 
...DEFAULT_CONFIG } } @@ -51,7 +52,7 @@ export function saveConfig(config: ApiServerConfig): void { ensureDir(getSettingsDir()) fs.writeFileSync(getConfigPath(), JSON.stringify(config, null, 2), 'utf-8') } catch (err) { - console.error('[ApiConfig] Failed to save config:', err) + apiLogger.error('[Config] Failed to save config', err) } } diff --git a/electron/main/api/dataSource.ts b/electron/main/api/dataSource.ts index 907ae586..4f099ee9 100644 --- a/electron/main/api/dataSource.ts +++ b/electron/main/api/dataSource.ts @@ -1,37 +1,38 @@ /** - * ChatLab API — Data source configuration management + * ChatLab API — Data source configuration management (hierarchical model) * Persisted to userData/settings/data-sources.json + * + * DataSource (server) → ImportSession[] (subscribed conversations) */ import * as fs from 'fs' import * as path from 'path' import * as crypto from 'crypto' import { getSettingsDir, ensureDir } from '../paths' +import { apiLogger } from './logger' const CONFIG_FILE = 'data-sources.json' +export interface ImportSession { + id: string + name: string + remoteSessionId: string + targetSessionId: string + lastPullAt: number + lastStatus: 'idle' | 'success' | 'error' + lastError: string + lastNewMessages: number +} + export interface DataSource { id: string name: string - url: string - /** Remote API Bearer Token (optional) */ + baseUrl: string token: string - /** Pull interval in minutes */ intervalMinutes: number - /** Whether scheduled pulling is enabled */ enabled: boolean - /** Target session ID (empty string means create new each time) */ - targetSessionId: string - /** Last pull timestamp (seconds) */ - lastPullAt: number - /** Last pull status */ - lastStatus: 'idle' | 'success' | 'error' - /** Last error message */ - lastError: string - /** Number of new messages from last pull */ - lastNewMessages: number - /** Created timestamp (seconds) */ createdAt: number + sessions: ImportSession[] } function getConfigPath(): string { @@ -46,7 +47,7 
@@ export function loadDataSources(): DataSource[] { return JSON.parse(raw) as DataSource[] } } catch (err) { - console.error('[DataSource] Failed to load config:', err) + apiLogger.error('[DataSource] Failed to load config', err) } return [] } @@ -56,39 +57,59 @@ export function saveDataSources(sources: DataSource[]): void { ensureDir(getSettingsDir()) fs.writeFileSync(getConfigPath(), JSON.stringify(sources, null, 2), 'utf-8') } catch (err) { - console.error('[DataSource] Failed to save config:', err) + apiLogger.error('[DataSource] Failed to save config', err) } } -export function generateId(): string { - return `ds_${crypto.randomBytes(6).toString('hex')}` +export function generateId(prefix: string = 'ds'): string { + return `${prefix}_${crypto.randomBytes(6).toString('hex')}` } -export function addDataSource( - partial: Omit -): DataSource { +export function normalizeBaseUrl(input: string): string { + let url = input.trim().replace(/\/+$/, '') + if (url && !/^https?:\/\//i.test(url)) { + url = `http://${url}` + } + if (url && !url.endsWith('/api/v1')) { + url = url.replace(/\/api\/v1$/, '') + '/api/v1' + } + return url +} + +// ==================== DataSource CRUD ==================== + +export function addDataSource(partial: { name?: string; baseUrl: string; token: string; intervalMinutes: number }): DataSource { const sources = loadDataSources() const ds: DataSource = { - ...partial, - id: generateId(), - lastPullAt: 0, - lastStatus: 'idle', - lastError: '', - lastNewMessages: 0, + id: generateId('src'), + name: partial.name || '', + baseUrl: normalizeBaseUrl(partial.baseUrl), + token: partial.token, + intervalMinutes: partial.intervalMinutes, + enabled: true, createdAt: Math.floor(Date.now() / 1000), + sessions: [], } sources.push(ds) saveDataSources(sources) return ds } -export function updateDataSource(id: string, updates: Partial): DataSource | null { +export function updateDataSource( + id: string, + updates: Partial> +): DataSource | null { const 
sources = loadDataSources() const idx = sources.findIndex((s) => s.id === id) if (idx === -1) return null - sources[idx] = { ...sources[idx], ...updates, id } + const ds = sources[idx] + if (updates.name !== undefined) ds.name = updates.name + if (updates.baseUrl !== undefined) ds.baseUrl = normalizeBaseUrl(updates.baseUrl) + if (updates.token !== undefined) ds.token = updates.token + if (updates.intervalMinutes !== undefined) ds.intervalMinutes = updates.intervalMinutes + if (updates.enabled !== undefined) ds.enabled = updates.enabled saveDataSources(sources) - return sources[idx] + return ds } export function deleteDataSource(id: string): boolean { @@ -103,3 +124,59 @@ export function getDataSource(id: string): DataSource | null { const sources = loadDataSources() return sources.find((s) => s.id === id) || null } + +// ==================== ImportSession CRUD ==================== + +export function addImportSessions( + sourceId: string, + sessions: Array<{ name: string; remoteSessionId: string }> +): ImportSession[] { + const sources = loadDataSources() + const ds = sources.find((s) => s.id === sourceId) + if (!ds) return [] + + const added: ImportSession[] = [] + for (const sess of sessions) { + if (ds.sessions.some((s) => s.remoteSessionId === sess.remoteSessionId)) continue + const imp: ImportSession = { + id: generateId('sess'), + name: sess.name, + remoteSessionId: sess.remoteSessionId, + targetSessionId: '', + lastPullAt: 0, + lastStatus: 'idle', + lastError: '', + lastNewMessages: 0, + } + ds.sessions.push(imp) + added.push(imp) + } + saveDataSources(sources) + return added +} + +export function removeImportSession(sourceId: string, sessionId: string): boolean { + const sources = loadDataSources() + const ds = sources.find((s) => s.id === sourceId) + if (!ds) return false + const before = ds.sessions.length + ds.sessions = ds.sessions.filter((s) => s.id !== sessionId) + if (ds.sessions.length === before) return false + saveDataSources(sources) + return true 
+} + +export function updateImportSession( + sourceId: string, + sessionId: string, + updates: Partial +): ImportSession | null { + const sources = loadDataSources() + const ds = sources.find((s) => s.id === sourceId) + if (!ds) return null + const sess = ds.sessions.find((s) => s.id === sessionId) + if (!sess) return null + Object.assign(sess, updates, { id: sessionId }) + saveDataSources(sources) + return sess +} diff --git a/electron/main/api/errors.ts b/electron/main/api/errors.ts index 27e848f8..69cf74c2 100644 --- a/electron/main/api/errors.ts +++ b/electron/main/api/errors.ts @@ -6,11 +6,13 @@ export enum ApiErrorCode { UNAUTHORIZED = 'UNAUTHORIZED', SESSION_NOT_FOUND = 'SESSION_NOT_FOUND', INVALID_FORMAT = 'INVALID_FORMAT', + INVALID_PAYLOAD = 'INVALID_PAYLOAD', SQL_READONLY_VIOLATION = 'SQL_READONLY_VIOLATION', SQL_EXECUTION_ERROR = 'SQL_EXECUTION_ERROR', EXPORT_TOO_LARGE = 'EXPORT_TOO_LARGE', BODY_TOO_LARGE = 'BODY_TOO_LARGE', IMPORT_IN_PROGRESS = 'IMPORT_IN_PROGRESS', + IDEMPOTENCY_CONFLICT = 'IDEMPOTENCY_CONFLICT', IMPORT_FAILED = 'IMPORT_FAILED', SERVER_ERROR = 'SERVER_ERROR', } @@ -19,11 +21,13 @@ const HTTP_STATUS: Record = { [ApiErrorCode.UNAUTHORIZED]: 401, [ApiErrorCode.SESSION_NOT_FOUND]: 404, [ApiErrorCode.INVALID_FORMAT]: 400, + [ApiErrorCode.INVALID_PAYLOAD]: 400, [ApiErrorCode.SQL_READONLY_VIOLATION]: 400, [ApiErrorCode.SQL_EXECUTION_ERROR]: 400, [ApiErrorCode.EXPORT_TOO_LARGE]: 400, [ApiErrorCode.BODY_TOO_LARGE]: 413, [ApiErrorCode.IMPORT_IN_PROGRESS]: 409, + [ApiErrorCode.IDEMPOTENCY_CONFLICT]: 409, [ApiErrorCode.IMPORT_FAILED]: 500, [ApiErrorCode.SERVER_ERROR]: 500, } @@ -52,6 +56,14 @@ export function invalidFormat(message: string): ApiError { return new ApiError(ApiErrorCode.INVALID_FORMAT, message) } +export function invalidPayload(message: string): ApiError { + return new ApiError(ApiErrorCode.INVALID_PAYLOAD, message) +} + +export function idempotencyConflict(): ApiError { + return new ApiError(ApiErrorCode.IDEMPOTENCY_CONFLICT, 'Same 
Idempotency-Key with different request body hash') +} + export function sqlReadonlyViolation(): ApiError { return new ApiError(ApiErrorCode.SQL_READONLY_VIOLATION, 'Only SELECT queries are allowed') } diff --git a/electron/main/api/index.ts b/electron/main/api/index.ts index ca6dd0a0..f6ab6fa3 100644 --- a/electron/main/api/index.ts +++ b/electron/main/api/index.ts @@ -9,6 +9,7 @@ import { loadConfig, saveConfig, ensureToken, type ApiServerConfig } from './con import { registerSystemRoutes } from './routes/system' import { registerSessionRoutes } from './routes/sessions' import { registerImportRoutes } from './routes/import' +import { apiLogger } from './logger' let server: FastifyInstance | null = null let startedAt: number | null = null @@ -32,7 +33,7 @@ export function getStatus(): ApiServerStatus { export async function start(): Promise { if (server) { - console.log('[ChatLab API] Server already running') + apiLogger.info('Server already running') return } @@ -48,17 +49,17 @@ export async function start(): Promise { await server.listen({ port: config.port, host: '127.0.0.1' }) startedAt = Math.floor(Date.now() / 1000) - console.log(`[ChatLab API] Server started on http://127.0.0.1:${config.port}`) + apiLogger.info(`Server started on http://127.0.0.1:${config.port}`) } catch (err: any) { server = null startedAt = null if (err.code === 'EADDRINUSE') { lastError = `PORT_IN_USE:${config.port}` - console.warn(`[ChatLab API] Port ${config.port} is already in use`) + apiLogger.warn(`Port ${config.port} is already in use`) } else { lastError = err.message || 'Unknown error' - console.error('[ChatLab API] Failed to start:', err) + apiLogger.error('Failed to start', err) } throw err } @@ -70,12 +71,12 @@ export async function stop(): Promise { try { await server.close() } catch (err) { - console.error('[ChatLab API] Error closing server:', err) + apiLogger.error('Error closing server', err) } finally { server = null startedAt = null lastError = null - 
console.log('[ChatLab API] Server stopped') + apiLogger.info('Server stopped') } } diff --git a/electron/main/api/logger.ts b/electron/main/api/logger.ts new file mode 100644 index 00000000..a216fd92 --- /dev/null +++ b/electron/main/api/logger.ts @@ -0,0 +1,65 @@ +/** + * ChatLab API — Dedicated logger + * Writes to userData/logs/api.log alongside app.log + */ + +import * as fs from 'fs' +import * as path from 'path' +import { getLogsDir, ensureDir } from '../paths' + +let headerWritten = false + +function getLogPath(): string { + return path.join(getLogsDir(), 'api.log') +} + +function formatPathForLog(filePath: string): string { + return filePath.replace(/ /g, '\\ ') +} + +function ensureHeader(): void { + if (headerWritten) return + headerWritten = true + try { + const logPath = getLogPath() + ensureDir(getLogsDir()) + const isNew = !fs.existsSync(logPath) || fs.statSync(logPath).size === 0 + if (isNew) { + fs.writeFileSync(logPath, `Local Path: ${formatPathForLog(logPath)}\n\n`, 'utf-8') + } + } catch { + // silent + } +} + +function formatTime(): string { + const now = new Date() + const y = now.getFullYear() + const M = String(now.getMonth() + 1).padStart(2, '0') + const d = String(now.getDate()).padStart(2, '0') + const h = String(now.getHours()).padStart(2, '0') + const m = String(now.getMinutes()).padStart(2, '0') + const s = String(now.getSeconds()).padStart(2, '0') + return `${y}-${M}-${d} ${h}:${m}:${s}` +} + +function write(level: string, message: string, detail?: unknown): void { + try { + ensureHeader() + let line = `[${formatTime()}] [${level}] ${message}` + if (detail !== undefined) { + const extra = detail instanceof Error ? 
detail.stack || detail.message : JSON.stringify(detail) + line += ` ${extra}` + } + line += '\n' + fs.appendFileSync(getLogPath(), line, 'utf-8') + } catch { + // silent — never let logging break the app + } +} + +export const apiLogger = { + info: (msg: string, detail?: unknown) => write('INFO', msg, detail), + warn: (msg: string, detail?: unknown) => write('WARN', msg, detail), + error: (msg: string, detail?: unknown) => write('ERROR', msg, detail), +} diff --git a/electron/main/api/pullDiscovery.ts b/electron/main/api/pullDiscovery.ts new file mode 100644 index 00000000..a35bc49d --- /dev/null +++ b/electron/main/api/pullDiscovery.ts @@ -0,0 +1,68 @@ +/** + * ChatLab API — Pull discovery + * Fetches available sessions from a remote data source via GET /sessions + */ + +import { net } from 'electron' +import { normalizeBaseUrl } from './dataSource' + +export interface RemoteSession { + id: string + name: string + platform: string + type: string + messageCount?: number + memberCount?: number + lastMessageAt?: number +} + +/** + * Fetch available sessions from a remote data source. + * Calls GET {baseUrl}/sessions according to the Pull protocol. + */ +export function fetchRemoteSessions(baseUrl: string, token?: string): Promise { + return new Promise((resolve, reject) => { + const url = normalizeBaseUrl(baseUrl) + '/sessions?format=chatlab' + + const request = net.request(url) + if (token) { + request.setHeader('Authorization', `Bearer ${token}`) + } + request.setHeader('Accept', 'application/json') + + let body = '' + + request.on('response', (response) => { + if (response.statusCode !== 200) { + reject(new Error(`Remote server returned HTTP ${response.statusCode}`)) + return + } + + response.on('data', (chunk: Buffer) => { + body += chunk.toString('utf-8') + }) + + response.on('end', () => { + try { + const parsed = JSON.parse(body) + const sessions: RemoteSession[] = Array.isArray(parsed) + ? parsed + : (parsed.data?.sessions ?? parsed.sessions ?? 
[]) + resolve(sessions) + } catch (err) { + reject(new Error('Failed to parse remote sessions response')) + } + }) + + response.on('error', (err: Error) => { + reject(err) + }) + }) + + request.on('error', (err: Error) => { + reject(err) + }) + + request.end() + }) +} diff --git a/electron/main/api/pullScheduler.ts b/electron/main/api/pullScheduler.ts index fdf398a7..99115df8 100644 --- a/electron/main/api/pullScheduler.ts +++ b/electron/main/api/pullScheduler.ts @@ -1,6 +1,6 @@ /** - * ChatLab API — Pull scheduler - * Periodically fetches ChatLab Format data from external sources and imports it + * ChatLab API — Pull scheduler (hierarchical data source model) + * One timer per DataSource; each tick pulls all ImportSessions under it. */ import * as fs from 'fs' @@ -9,12 +9,20 @@ import * as crypto from 'crypto' import { net, BrowserWindow } from 'electron' import { getTempDir } from '../paths' import * as worker from '../worker/workerManager' -import { loadDataSources, saveDataSources, type DataSource } from './dataSource' +import { + loadDataSources, + type DataSource, + type ImportSession, + updateImportSession, +} from './dataSource' import { getImportingStatus } from './routes/import' +import { apiLogger } from './logger' const timers = new Map>() let initialized = false +// ==================== Helpers ==================== + function getTempFilePath(ext: string): string { const id = crypto.randomBytes(8).toString('hex') return path.join(getTempDir(), `pull-import-${id}${ext}`) @@ -30,8 +38,7 @@ function cleanupTempFile(filePath: string): void { function notifySessionListChanged(): void { try { - const wins = BrowserWindow.getAllWindows() - for (const win of wins) { + for (const win of BrowserWindow.getAllWindows()) { win.webContents.send('api:importCompleted') } } catch { @@ -39,29 +46,92 @@ function notifySessionListChanged(): void { } } -function notifyPullResult(dsId: string, status: 'success' | 'error', detail: string): void { +function notifyPullResult( + 
sourceId: string, + sessionId: string | undefined, + status: 'success' | 'error', + detail: string +): void { try { - const wins = BrowserWindow.getAllWindows() - for (const win of wins) { - win.webContents.send('api:pullResult', { dsId, status, detail }) + for (const win of BrowserWindow.getAllWindows()) { + win.webContents.send('api:pullResult', { sourceId, sessionId, status, detail }) } } catch { /* ignore */ } } -/** - * Fetch data from remote URL to a temporary file - */ -async function fetchToTempFile(ds: DataSource): Promise { - return new Promise((resolve, reject) => { - const url = ds.url.includes('?') ? `${ds.url}&since=${ds.lastPullAt}` : `${ds.url}?since=${ds.lastPullAt}` +// ==================== Sync block ==================== +interface SyncMeta { + hasMore: boolean + nextSince?: number + nextOffset?: number + watermark?: number +} + +function parseSyncFromFile(filePath: string): SyncMeta | null { + try { + const isJsonl = filePath.endsWith('.jsonl') + if (isJsonl) { + const content = fs.readFileSync(filePath, 'utf-8') + const lines = content.trimEnd().split('\n') + for (let i = lines.length - 1; i >= Math.max(0, lines.length - 5); i--) { + try { + const obj = JSON.parse(lines[i]) + if (obj._type === 'sync') { + return { hasMore: !!obj.hasMore, nextSince: obj.nextSince, nextOffset: obj.nextOffset, watermark: obj.watermark } + } + } catch { + continue + } + } + return null + } + + const raw = fs.readFileSync(filePath, 'utf-8') + const parsed = JSON.parse(raw) + if (parsed.sync && typeof parsed.sync === 'object') { + const s = parsed.sync + return { hasMore: !!s.hasMore, nextSince: s.nextSince, nextOffset: s.nextOffset, watermark: s.watermark } + } + return null + } catch { + return null + } +} + +// ==================== Fetch ==================== + +interface FetchParams { + since?: number + offset?: number + end?: number + limit?: number +} + +function buildPullUrl(baseUrl: string, remoteSessionId: string, params: FetchParams): string { + const base = 
`${baseUrl}/sessions/${remoteSessionId}/messages` + const qs: string[] = ['format=chatlab'] + if (params.since !== undefined && params.since > 0) qs.push(`since=${params.since}`) + if (params.offset !== undefined && params.offset > 0) qs.push(`offset=${params.offset}`) + if (params.end !== undefined && params.end > 0) qs.push(`end=${params.end}`) + if (params.limit !== undefined && params.limit > 0) qs.push(`limit=${params.limit}`) + + return base + '?' + qs.join('&') +} + +async function fetchToTempFile( + baseUrl: string, + remoteSessionId: string, + token: string, + params: FetchParams +): Promise { + return new Promise((resolve, reject) => { + const url = buildPullUrl(baseUrl, remoteSessionId, params) const request = net.request(url) - if (ds.token) { - request.setHeader('Authorization', `Bearer ${ds.token}`) - } + if (token) request.setHeader('Authorization', `Bearer ${token}`) request.setHeader('Accept', 'application/json, application/x-ndjson') const contentType = { value: '' } @@ -79,16 +149,8 @@ async function fetchToTempFile(ds: DataSource): Promise { tempFile = getTempFilePath(isJsonl ? 
'.jsonl' : '.json') writeStream = fs.createWriteStream(tempFile) - response.on('data', (chunk: Buffer) => { - writeStream!.write(chunk) - }) - - response.on('end', () => { - writeStream!.end(() => { - resolve(tempFile) - }) - }) - + response.on('data', (chunk: Buffer) => writeStream!.write(chunk)) + response.on('end', () => writeStream!.end(() => resolve(tempFile))) response.on('error', (err: Error) => { writeStream?.end() cleanupTempFile(tempFile) @@ -106,108 +168,232 @@ async function fetchToTempFile(ds: DataSource): Promise { }) } -/** - * Execute a single pull operation - */ -async function executePull(ds: DataSource): Promise { - if (getImportingStatus()) { - console.log(`[PullScheduler] Skipping pull for "${ds.name}": import in progress`) - return +// ==================== Import helper ==================== + +function deriveLocalSessionId(baseUrl: string, remoteSessionId: string): string { + const hash = crypto.createHash('sha256').update(`${baseUrl}\0${remoteSessionId}`).digest('hex').slice(0, 12) + return `remote_${hash}` +} + +function localSessionExists(sessionId: string): boolean { + const dbPath = path.join(worker.getDbDirectory(), `${sessionId}.db`) + return fs.existsSync(dbPath) +} + +interface ImportResult { + success: boolean + newMessageCount: number + sessionId?: string + error?: string + needFullResync?: boolean +} + +async function importTempFile( + baseUrl: string, + sess: ImportSession, + tempFile: string +): Promise { + let targetId = sess.targetSessionId + if (!targetId) { + const derived = deriveLocalSessionId(baseUrl, sess.remoteSessionId) + if (localSessionExists(derived)) { + targetId = derived + apiLogger.info(`[Pull] Reusing existing local session ${derived} for "${sess.name}"`) + } } - console.log(`[PullScheduler] Pulling from "${ds.name}" (${ds.url})`) - - const sources = loadDataSources() - const idx = sources.findIndex((s) => s.id === ds.id) - if (idx === -1) return - - let tempFile = '' - try { - tempFile = await 
fetchToTempFile(ds) - - // Skip empty responses - const stat = fs.statSync(tempFile) - if (stat.size === 0) { - console.log(`[PullScheduler] Empty response from "${ds.name}", skipping`) - sources[idx].lastPullAt = Math.floor(Date.now() / 1000) - sources[idx].lastStatus = 'success' - sources[idx].lastNewMessages = 0 - saveDataSources(sources) - return - } - - let result: any - if (ds.targetSessionId) { - result = await worker.incrementalImport(ds.targetSessionId, tempFile) - if (result.success) { - try { - await worker.generateIncrementalSessions(ds.targetSessionId) - } catch { - /* ignore */ - } - } - } else { - result = await worker.streamImport(tempFile) - } - - sources[idx].lastPullAt = Math.floor(Date.now() / 1000) - + if (targetId) { + apiLogger.info(`[Pull] Incremental import to session ${targetId}`) + const result = await worker.incrementalImport(targetId, tempFile) if (result.success) { - sources[idx].lastStatus = 'success' - sources[idx].lastNewMessages = result.newMessageCount ?? 
0 - sources[idx].lastError = '' - notifySessionListChanged() - notifyPullResult(ds.id, 'success', `Added ${sources[idx].lastNewMessages} new messages`) - } else { - sources[idx].lastStatus = 'error' - sources[idx].lastError = result.error || 'Import failed' - notifyPullResult(ds.id, 'error', sources[idx].lastError) + apiLogger.info(`[Pull] Incremental OK: +${result.newMessageCount} messages`) + try { + await worker.generateIncrementalSessions(targetId) + } catch { + /* ignore */ + } + return { success: true, newMessageCount: result.newMessageCount, sessionId: targetId } + } + if (result.error === 'error.session_not_found') { + apiLogger.warn(`[Pull] Session ${targetId} not found locally, need full resync`) + return { success: false, newMessageCount: 0, sessionId: targetId, needFullResync: true } + } + apiLogger.error(`[Pull] Incremental import failed: ${result.error}`) + return { success: false, newMessageCount: 0, sessionId: targetId, error: result.error } + } + + const externalId = deriveLocalSessionId(baseUrl, sess.remoteSessionId) + apiLogger.info(`[Pull] First import via streamImport for "${sess.name}" (externalId=${externalId})`) + const result = await worker.streamImport(tempFile, undefined, undefined, externalId) + if (result.success) { + const msgCount = result.diagnostics?.messagesWritten ?? 
0 + apiLogger.info(`[Pull] streamImport OK: session=${result.sessionId}, messages=${msgCount}`) + return { success: true, newMessageCount: msgCount, sessionId: result.sessionId } + } + apiLogger.error(`[Pull] streamImport failed: ${result.error}`) + return { success: false, newMessageCount: 0, error: result.error } +} + +// ==================== Core pull loop (per ImportSession) ==================== + +const MAX_PAGES_PER_PULL = 50 + +interface PullSessionResult { + success: boolean + newMessageCount: number + error?: string +} + +async function executePullSession( + sourceId: string, + ds: DataSource, + sess: ImportSession +): Promise { + if (getImportingStatus()) { + apiLogger.info(`[Pull] Skipping "${sess.name}": import in progress`) + return { success: false, newMessageCount: 0, error: 'Import in progress' } + } + + apiLogger.info(`[Pull] Pulling "${sess.name}" from ${ds.baseUrl}`) + + let totalNewMessages = 0 + let since = sess.lastPullAt + let offset = 0 + let end: number | undefined + let pageCount = 0 + let resyncAttempted = false + + try { + while (pageCount < MAX_PAGES_PER_PULL) { + pageCount++ + const tempFile = await fetchToTempFile(ds.baseUrl, sess.remoteSessionId, ds.token, { since, offset, end }) + + try { + const stat = fs.statSync(tempFile) + apiLogger.info(`[Pull] "${sess.name}" page ${pageCount}: fetched ${stat.size} bytes`) + if (stat.size === 0) { + cleanupTempFile(tempFile) + break + } + + const sync = parseSyncFromFile(tempFile) + const result = await importTempFile(ds.baseUrl, sess, tempFile) + cleanupTempFile(tempFile) + + if (result.needFullResync && !resyncAttempted) { + resyncAttempted = true + apiLogger.info(`[Pull] Resetting since=0 for "${sess.name}" full resync`) + since = 0 + offset = 0 + pageCount = 0 + sess.targetSessionId = '' + sess.lastPullAt = 0 + updateImportSession(sourceId, sess.id, { targetSessionId: '', lastPullAt: 0 }) + continue + } + + if (result.needFullResync) { + const errMsg = 'Full resync failed' + 
apiLogger.error(`[Pull] Full resync already attempted for "${sess.name}", aborting`) + updateImportSession(sourceId, sess.id, { + lastPullAt: Math.floor(Date.now() / 1000), + lastStatus: 'error', + lastError: errMsg, + }) + notifyPullResult(sourceId, sess.id, 'error', errMsg) + return { success: false, newMessageCount: 0, error: errMsg } + } + + if (!result.success) { + const errMsg = result.error || 'Import failed' + updateImportSession(sourceId, sess.id, { + lastPullAt: Math.floor(Date.now() / 1000), + lastStatus: 'error', + lastError: errMsg, + }) + notifyPullResult(sourceId, sess.id, 'error', errMsg) + return { success: false, newMessageCount: 0, error: errMsg } + } + + if (!sess.targetSessionId && result.sessionId) { + sess.targetSessionId = result.sessionId + updateImportSession(sourceId, sess.id, { targetSessionId: result.sessionId }) + } + + totalNewMessages += result.newMessageCount + + if (!sync || !sync.hasMore) break + + if (sync.nextSince !== undefined) since = sync.nextSince + if (sync.nextOffset !== undefined) offset = sync.nextOffset + else offset = 0 + if (sync.watermark !== undefined && !end) end = sync.watermark + } catch (importErr) { + cleanupTempFile(tempFile) + throw importErr + } } - saveDataSources(sources) + if (pageCount >= MAX_PAGES_PER_PULL) { + apiLogger.warn(`[Pull] "${sess.name}" reached page limit (${MAX_PAGES_PER_PULL}), data may be incomplete`) + } + + updateImportSession(sourceId, sess.id, { + lastPullAt: Math.floor(Date.now() / 1000), + lastStatus: 'success', + lastNewMessages: totalNewMessages, + lastError: '', + }) + if (totalNewMessages > 0) notifySessionListChanged() + notifyPullResult(sourceId, sess.id, 'success', `+${totalNewMessages} messages`) + return { success: true, newMessageCount: totalNewMessages } } catch (error: any) { - console.error(`[PullScheduler] Pull failed for "${ds.name}":`, error) - sources[idx].lastPullAt = Math.floor(Date.now() / 1000) - sources[idx].lastStatus = 'error' - sources[idx].lastError = 
error.message || 'Pull failed' - saveDataSources(sources) - notifyPullResult(ds.id, 'error', sources[idx].lastError) - } finally { - if (tempFile) cleanupTempFile(tempFile) + const errMsg = error.message || 'Pull failed' + apiLogger.error(`[Pull] Pull failed for "${sess.name}"`, error) + updateImportSession(sourceId, sess.id, { + lastPullAt: Math.floor(Date.now() / 1000), + lastStatus: 'error', + lastError: errMsg, + }) + notifyPullResult(sourceId, sess.id, 'error', errMsg) + return { success: false, newMessageCount: 0, error: errMsg } } } -/** - * Start timer for a data source - */ +async function pullAllSessions(ds: DataSource): Promise { + for (const sess of ds.sessions) { + await executePullSession(ds.id, ds, sess) + } +} + +// ==================== Timer management ==================== + function startTimer(ds: DataSource): void { stopTimer(ds.id) - - if (!ds.enabled || ds.intervalMinutes < 1) return + if (!ds.enabled || ds.intervalMinutes < 1 || ds.sessions.length === 0) return const intervalMs = ds.intervalMinutes * 60 * 1000 - // Execute immediately on start - executePull(ds).catch((err) => { - console.error(`[PullScheduler] Initial pull failed:`, err) + pullAllSessions(ds).catch((err) => { + apiLogger.error('[Pull] Initial pull failed', err) }) const timer = setInterval(() => { const current = loadDataSources().find((s) => s.id === ds.id) - if (!current || !current.enabled) { + if (!current || !current.enabled || current.sessions.length === 0) { stopTimer(ds.id) return } - executePull(current).catch((err) => { - console.error(`[PullScheduler] Scheduled pull failed:`, err) + pullAllSessions(current).catch((err) => { + apiLogger.error('[Pull] Scheduled pull failed', err) }) }, intervalMs) timers.set(ds.id, timer) - console.log(`[PullScheduler] Timer started for "${ds.name}" (every ${ds.intervalMinutes}min)`) + apiLogger.info(`[Pull] Timer started for source ${ds.baseUrl} (${ds.sessions.length} sessions, every ${ds.intervalMinutes}min)`) } -function 
stopTimer(id: string): void { +export function stopTimer(id: string): void { const timer = timers.get(id) if (timer) { clearInterval(timer) @@ -215,37 +401,28 @@ function stopTimer(id: string): void { } } -/** - * Initialize scheduler: start timers for all enabled data sources - */ export function initScheduler(): void { if (initialized) return initialized = true const sources = loadDataSources() for (const ds of sources) { - if (ds.enabled) { + if (ds.enabled && ds.sessions.length > 0) { startTimer(ds) } } - console.log(`[PullScheduler] Initialized with ${sources.filter((s) => s.enabled).length} active sources`) + apiLogger.info(`[Pull] Initialized with ${sources.filter((s) => s.enabled).length} active sources`) } -/** - * Stop all active timers - */ export function stopAllTimers(): void { for (const [id] of timers) { stopTimer(id) } initialized = false - console.log('[PullScheduler] All timers stopped') + apiLogger.info('[Pull] All timers stopped') } -/** - * Reload timer for a single data source (called after config changes) - */ export function reloadTimer(dsId: string): void { stopTimer(dsId) const ds = loadDataSources().find((s) => s.id === dsId) @@ -254,21 +431,32 @@ export function reloadTimer(dsId: string): void { } } -/** - * Manually trigger a single pull - */ -export async function triggerPull(dsId: string): Promise<{ success: boolean; error?: string }> { - const ds = loadDataSources().find((s) => s.id === dsId) +export async function triggerPull( + sourceId: string, + sessionId?: string +): Promise<{ success: boolean; error?: string }> { + const ds = loadDataSources().find((s) => s.id === sourceId) if (!ds) return { success: false, error: 'Data source not found' } - try { - await executePull(ds) - const updated = loadDataSources().find((s) => s.id === dsId) - if (updated?.lastStatus === 'error') { - return { success: false, error: updated.lastError } - } - return { success: true } - } catch (error: any) { - return { success: false, error: error.message 
} + if (sessionId) { + const sess = ds.sessions.find((s) => s.id === sessionId) + if (!sess) return { success: false, error: 'Session not found' } + const result = await executePullSession(sourceId, ds, sess) + return { success: result.success, error: result.error } } + + const errors: string[] = [] + for (const sess of ds.sessions) { + const result = await executePullSession(sourceId, ds, sess) + if (!result.success && result.error) errors.push(`${sess.name}: ${result.error}`) + } + if (errors.length > 0) { + return { success: false, error: errors.join('; ') } + } + return { success: true } +} + +/** Semantic alias for pulling all sessions in a source */ +export async function triggerPullAll(sourceId: string): Promise<{ success: boolean; error?: string }> { + return triggerPull(sourceId) } diff --git a/electron/main/api/routes/import.ts b/electron/main/api/routes/import.ts index dc75396c..7e58ddb3 100644 --- a/electron/main/api/routes/import.ts +++ b/electron/main/api/routes/import.ts @@ -1,7 +1,10 @@ /** * ChatLab API — Import routes (Push mode) * - * POST /api/v1/import Import to new session + * POST /api/v1/imports/:sessionId Unified import endpoint (auto-create or incremental) + * + * Legacy (deprecated, kept for backward compatibility): + * POST /api/v1/import Import to new session (auto-generated sessionId) * POST /api/v1/sessions/:id/import Incremental import to existing session * * Content-Type dispatch: @@ -19,15 +22,50 @@ import { getTempDir } from '../../paths' import * as worker from '../../worker/workerManager' import { successResponse, - sessionNotFound, importInProgress, importFailed, invalidFormat, + invalidPayload, + idempotencyConflict, errorResponse, } from '../errors' +import { apiLogger } from '../logger' let isImporting = false +// ==================== Idempotency cache ==================== + +interface IdempotencyCacheEntry { + bodyHash: string + status: 'pending' | 'success' + response: any + timestamp: number +} + +const IDEMPOTENCY_TTL_MS 
= 60 * 60 * 1000 // 1 hour +const IDEMPOTENCY_CLEANUP_INTERVAL_MS = 10 * 60 * 1000 // 10 minutes +const idempotencyCache = new Map() + +setInterval(() => { + const now = Date.now() + for (const [key, entry] of idempotencyCache) { + if (now - entry.timestamp > IDEMPOTENCY_TTL_MS) { + idempotencyCache.delete(key) + } + } +}, IDEMPOTENCY_CLEANUP_INTERVAL_MS) + +function computeBodyHash(body: unknown, tempFile?: string): string { + if (tempFile && fs.existsSync(tempFile)) { + const content = fs.readFileSync(tempFile) + return crypto.createHash('sha256').update(content).digest('hex') + } + return crypto + .createHash('sha256') + .update(JSON.stringify(body ?? '')) + .digest('hex') +} + function getTempFilePath(ext: string): string { const id = crypto.randomBytes(8).toString('hex') return path.join(getTempDir(), `api-import-${id}${ext}`) @@ -39,14 +77,10 @@ function cleanupTempFile(filePath: string): void { fs.unlinkSync(filePath) } } catch (err) { - console.error('[ChatLab API] Failed to cleanup temp file:', err) + apiLogger.error('Failed to cleanup temp file', err) } } -/** - * Notify renderer process to refresh session list. - * Lazy-requires electron to avoid circular dependency. 
- */ function notifySessionListChanged(): void { try { const wins = BrowserWindow.getAllWindows() @@ -58,11 +92,256 @@ function notifySessionListChanged(): void { } } +function idempotencySuccess(key: string | undefined, response: any): void { + if (!key) return + const entry = idempotencyCache.get(key) + if (entry) { + entry.status = 'success' + entry.response = response + } +} + +function idempotencyFail(key: string | undefined): void { + if (!key) return + idempotencyCache.delete(key) +} + export function getImportingStatus(): boolean { return isImporting } -async function handleImport(request: FastifyRequest, reply: FastifyReply, sessionId?: string): Promise { +/** + * 检查 session 是否已存在(快速文件检测) + */ +function sessionExists(sessionId: string): boolean { + try { + const dbDir = worker.getDbDirectory() + return fs.existsSync(path.join(dbDir, `${sessionId}.db`)) + } catch { + return false + } +} + +/** + * 将请求 body 写入临时文件,返回文件路径和解析后的 content type 信息 + */ +async function writeTempFile( + request: FastifyRequest, + isJson: boolean +): Promise<{ tempFile: string; error?: never } | { tempFile?: never; error: string }> { + if (isJson) { + const body = request.body + if (!body || typeof body !== 'object') { + return { error: 'Request body is not valid JSON' } + } + const tempFile = getTempFilePath('.json') + fs.writeFileSync(tempFile, JSON.stringify(body), 'utf-8') + return { tempFile } + } else { + const tempFile = getTempFilePath('.jsonl') + const writeStream = fs.createWriteStream(tempFile) + await pipeline(request.raw, writeStream) + return { tempFile } + } +} + +/** + * v3 统一导入处理:自动判断新建或增量 + */ +async function handleUnifiedImport(request: FastifyRequest, reply: FastifyReply, sessionId: string): Promise { + if (isImporting) { + const err = importInProgress() + reply.code(err.statusCode).send(errorResponse(err)) + return + } + + const contentType = (request.headers['content-type'] || '').toLowerCase() + const isJsonl = contentType.includes('application/x-ndjson') + 
const isJson = contentType.includes('application/json') + + if (!isJsonl && !isJson) { + const err = invalidFormat('Content-Type must be application/json or application/x-ndjson') + reply.code(err.statusCode).send(errorResponse(err)) + return + } + + const idempotencyKey = request.headers['idempotency-key'] as string | undefined + const isDryRun = (request.headers['x-dry-run'] as string)?.toLowerCase() === 'true' + + const cacheKey = idempotencyKey ? `${idempotencyKey}:${sessionId}:${isDryRun}` : undefined + + isImporting = true + let tempFile = '' + + try { + const writeResult = await writeTempFile(request, isJson) + if (writeResult.error) { + const err = invalidFormat(writeResult.error) + reply.code(err.statusCode).send(errorResponse(err)) + return + } + tempFile = writeResult.tempFile + + // Idempotency-Key check (after tempFile is written so we can hash JSONL content) + if (cacheKey) { + const bodyHash = isJsonl ? computeBodyHash(null, tempFile) : computeBodyHash(request.body) + const cached = idempotencyCache.get(cacheKey) + if (cached) { + if (cached.bodyHash !== bodyHash) { + const err = idempotencyConflict() + reply.code(err.statusCode).send(errorResponse(err)) + return + } + if (cached.status === 'pending') { + reply.code(409).send( + errorResponse({ + statusCode: 409, + code: 'IDEMPOTENCY_PENDING', + message: 'A request with this Idempotency-Key is still in progress. Please retry later.', + }) + ) + return + } + reply.send(cached.response) + return + } + idempotencyCache.set(cacheKey, { bodyHash, status: 'pending', response: null, timestamp: Date.now() }) + } + + const importOptions = + isJson && request.body && typeof request.body === 'object' ? 
(request.body as any).options : undefined + + const exists = sessionExists(sessionId) + + // X-Dry-Run: analyze only, no writes + if (isDryRun) { + let responsePayload: any + if (exists) { + const result = await worker.analyzeIncrementalImport(sessionId, tempFile) + if (result.error) { + idempotencyFail(cacheKey) + const err = invalidFormat(result.error) + reply.code(err.statusCode).send(errorResponse(err)) + return + } + responsePayload = successResponse({ + sessionId, + created: false, + dryRun: true, + analysis: { + totalInFile: result.totalInFile, + newMessageCount: result.newMessageCount, + duplicateCount: result.duplicateCount, + }, + }) + } else { + const result = await worker.analyzeNewImport(tempFile) + if (result.error) { + idempotencyFail(cacheKey) + const err = invalidFormat(result.error) + reply.code(err.statusCode).send(errorResponse(err)) + return + } + responsePayload = successResponse({ + sessionId, + created: true, + dryRun: true, + analysis: { + totalInFile: result.totalMessages, + newMessageCount: result.totalMessages, + duplicateCount: 0, + newMemberCount: result.totalMembers, + }, + }) + } + idempotencySuccess(cacheKey, responsePayload) + reply.send(responsePayload) + return + } + + if (exists) { + const result = await worker.incrementalImport(sessionId, tempFile, undefined, importOptions) + + if (result.success) { + try { + await worker.generateIncrementalSessions(sessionId) + } catch { + // non-blocking + } + notifySessionListChanged() + const responsePayload = successResponse({ + sessionId, + created: false, + batch: result.batch, + session: result.session, + updates: result.updates, + }) + idempotencySuccess(cacheKey, responsePayload) + reply.send(responsePayload) + } else { + idempotencyFail(cacheKey) + const err = importFailed(result.error || 'Incremental import failed') + reply.code(err.statusCode).send(errorResponse(err)) + } + } else { + const result = await worker.streamImport(tempFile, undefined, undefined, sessionId) + + if 
(result.success) { + notifySessionListChanged() + + const diag = result.diagnostics + let sessionInfo: any = undefined + try { + const s = await worker.getSession(result.sessionId!) + if (s) { + sessionInfo = { + totalCount: s.totalCount, + memberCount: s.memberCount, + firstTimestamp: s.firstTimestamp, + lastTimestamp: s.lastTimestamp, + } + } + } catch { + // non-blocking + } + + const responsePayload = successResponse({ + sessionId: result.sessionId, + created: true, + batch: diag + ? { + receivedCount: diag.messagesReceived, + writtenCount: diag.messagesWritten, + duplicateCount: diag.messagesSkipped, + } + : undefined, + session: sessionInfo, + }) + idempotencySuccess(cacheKey, responsePayload) + reply.send(responsePayload) + } else { + idempotencyFail(cacheKey) + const err = importFailed(result.error || 'Import failed') + reply.code(err.statusCode).send(errorResponse(err)) + } + } + } catch (error: any) { + idempotencyFail(cacheKey) + apiLogger.error('Import error', error) + const err = importFailed(error.message || 'Import process error') + reply.code(err.statusCode).send(errorResponse(err)) + } finally { + isImporting = false + if (tempFile) { + cleanupTempFile(tempFile) + } + } +} + +/** + * Legacy import handler (backward compatibility) + */ +async function handleLegacyImport(request: FastifyRequest, reply: FastifyReply, sessionId?: string): Promise { if (isImporting) { const err = importInProgress() reply.code(err.statusCode).send(errorResponse(err)) @@ -83,36 +362,28 @@ async function handleImport(request: FastifyRequest, reply: FastifyReply, sessio let tempFile = '' try { - if (isJson) { - // JSON mode: fastify already parsed body, write to temp file - const body = request.body - if (!body || typeof body !== 'object') { - const err = invalidFormat('Request body is not valid JSON') - reply.code(err.statusCode).send(errorResponse(err)) - return - } - - tempFile = getTempFilePath('.json') - fs.writeFileSync(tempFile, JSON.stringify(body), 'utf-8') - } 
else { - // JSONL mode: pipe raw stream to temp file - tempFile = getTempFilePath('.jsonl') - const writeStream = fs.createWriteStream(tempFile) - await pipeline(request.raw, writeStream) + const writeResult = await writeTempFile(request, isJson) + if (writeResult.error) { + const err = invalidFormat(writeResult.error) + reply.code(err.statusCode).send(errorResponse(err)) + return } - - let result: any + tempFile = writeResult.tempFile if (sessionId) { - // Incremental import to specified session const session = await worker.getSession(sessionId) if (!session) { + // Legacy route requires session to exist + const { sessionNotFound } = await import('../errors') const err = sessionNotFound(sessionId) reply.code(err.statusCode).send(errorResponse(err)) return } - result = await worker.incrementalImport(sessionId, tempFile) + const importOptions = + isJson && request.body && typeof request.body === 'object' ? (request.body as any).options : undefined + + const result = await worker.incrementalImport(sessionId, tempFile, undefined, importOptions) if (result.success) { try { @@ -120,44 +391,38 @@ async function handleImport(request: FastifyRequest, reply: FastifyReply, sessio } catch { // non-blocking } - notifySessionListChanged() - reply.send( successResponse({ - mode: 'incremental', sessionId, - newMessageCount: result.newMessageCount, + created: false, + batch: result.batch, + session: result.session, + updates: result.updates, }) ) - return } else { const err = importFailed(result.error || 'Incremental import failed') reply.code(err.statusCode).send(errorResponse(err)) - return } } else { - // New session import - result = await worker.streamImport(tempFile) + const result = await worker.streamImport(tempFile) if (result.success) { notifySessionListChanged() - reply.send( successResponse({ - mode: 'new', sessionId: result.sessionId, + created: true, }) ) - return } else { const err = importFailed(result.error || 'Import failed') 
reply.code(err.statusCode).send(errorResponse(err)) - return } } } catch (error: any) { - console.error('[ChatLab API] Import error:', error) + apiLogger.error('Import error', error) const err = importFailed(error.message || 'Import process error') reply.code(err.statusCode).send(errorResponse(err)) } finally { @@ -174,13 +439,25 @@ export function registerImportRoutes(server: FastifyInstance): void { done(null, undefined) }) - // POST /api/v1/import — Import to new session - server.post('/api/v1/import', async (request, reply) => { - await handleImport(request, reply) + const SESSION_ID_RE = /^[A-Za-z0-9._@-]{1,128}$/ + + // v3 unified endpoint + server.post<{ Params: { sessionId: string } }>('/api/v1/imports/:sessionId', async (request, reply) => { + const { sessionId } = request.params + if (!SESSION_ID_RE.test(sessionId)) { + const err = invalidPayload('sessionId must be 1-128 characters of [A-Za-z0-9._@-]') + reply.code(err.statusCode).send(errorResponse(err)) + return + } + await handleUnifiedImport(request, reply, sessionId) + }) + + // Legacy endpoints (deprecated, kept for backward compatibility) + server.post('/api/v1/import', async (request, reply) => { + await handleLegacyImport(request, reply) }) - // POST /api/v1/sessions/:id/import — Incremental import to existing session server.post<{ Params: { id: string } }>('/api/v1/sessions/:id/import', async (request, reply) => { - await handleImport(request, reply, request.params.id) + await handleLegacyImport(request, reply, request.params.id) }) } diff --git a/electron/main/api/server.ts b/electron/main/api/server.ts index 7b85db52..482f2534 100644 --- a/electron/main/api/server.ts +++ b/electron/main/api/server.ts @@ -5,6 +5,7 @@ import Fastify, { type FastifyInstance, type FastifyError } from 'fastify' import { authHook } from './auth' import { ApiError, ApiErrorCode, errorResponse, serverError } from './errors' +import { apiLogger } from './logger' const JSON_BODY_LIMIT = 50 * 1024 * 1024 // 50MB @@ -28,7 
+29,7 @@ export function createServer(): FastifyInstance { return } - console.error('[ChatLab API] Unhandled error:', error) + apiLogger.error('Unhandled error', error) const err = serverError(error.message) reply.code(err.statusCode).send(errorResponse(err)) }) diff --git a/electron/main/ipc/api.ts b/electron/main/ipc/api.ts index 0ca8fc30..285ab9ff 100644 --- a/electron/main/ipc/api.ts +++ b/electron/main/ipc/api.ts @@ -1,13 +1,22 @@ /** - * ChatLab API — IPC handlers for renderer process + * ChatLab API — IPC handlers for renderer process (hierarchical data source model) */ import { ipcMain } from 'electron' import type { IpcContext } from './types' import * as apiServer from '../api' import { loadConfig, regenerateToken, updateConfig } from '../api/config' -import { loadDataSources, addDataSource, updateDataSource, deleteDataSource, type DataSource } from '../api/dataSource' -import { initScheduler, stopAllTimers, reloadTimer, triggerPull } from '../api/pullScheduler' +import { + loadDataSources, + addDataSource, + updateDataSource, + deleteDataSource, + addImportSessions, + removeImportSession, + type DataSource, +} from '../api/dataSource' +import { initScheduler, stopAllTimers, stopTimer, reloadTimer, triggerPull, triggerPullAll } from '../api/pullScheduler' +import { fetchRemoteSessions } from '../api/pullDiscovery' export function registerApiHandlers(_ctx: IpcContext): void { // ==================== API Server Management ==================== @@ -50,33 +59,63 @@ export function registerApiHandlers(_ctx: IpcContext): void { ipcMain.handle( 'api:addDataSource', - ( - _event, - partial: Omit - ) => { + (_event, partial: { name?: string; baseUrl: string; token: string; intervalMinutes: number }) => { const ds = addDataSource(partial) - if (ds.enabled) { + return ds + } + ) + + ipcMain.handle( + 'api:updateDataSource', + (_event, id: string, updates: Partial>) => { + const ds = updateDataSource(id, updates) + if (ds) { reloadTimer(ds.id) } return ds } ) - 
ipcMain.handle('api:updateDataSource', (_event, id: string, updates: Partial) => { - const ds = updateDataSource(id, updates) - if (ds) { - reloadTimer(ds.id) - } - return ds - }) - ipcMain.handle('api:deleteDataSource', (_event, id: string) => { - reloadTimer(id) // stops timer + stopTimer(id) return deleteDataSource(id) }) - ipcMain.handle('api:triggerPull', async (_event, id: string) => { - return triggerPull(id) + // ==================== Import Session Management ==================== + + ipcMain.handle( + 'api:addImportSessions', + (_event, sourceId: string, sessions: Array<{ name: string; remoteSessionId: string }>) => { + const added = addImportSessions(sourceId, sessions) + reloadTimer(sourceId) + return added + } + ) + + ipcMain.handle('api:removeImportSession', (_event, sourceId: string, sessionId: string) => { + const result = removeImportSession(sourceId, sessionId) + reloadTimer(sourceId) + return result + }) + + // ==================== Sync ==================== + + ipcMain.handle('api:triggerPull', async (_event, sourceId: string, sessionId?: string) => { + return triggerPull(sourceId, sessionId) + }) + + ipcMain.handle('api:triggerPullAll', async (_event, sourceId: string) => { + return triggerPullAll(sourceId) + }) + + // ==================== Remote Discovery ==================== + + ipcMain.handle('api:fetchRemoteSessions', async (_event, baseUrl: string, token: string) => { + try { + return await fetchRemoteSessions(baseUrl, token || undefined) + } catch (err: any) { + throw new Error(err.message || 'Failed to fetch remote sessions') + } }) } @@ -95,7 +134,6 @@ export async function initApiServer(ctx: IpcContext): Promise { }) } - // Initialize Pull scheduler (independent of API server, pulls even if API is not running) initScheduler() } diff --git a/electron/main/worker/dbWorker.ts b/electron/main/worker/dbWorker.ts index 8c9b661d..73968257 100644 --- a/electron/main/worker/dbWorker.ts +++ b/electron/main/worker/dbWorker.ts @@ -73,7 +73,13 @@ 
import { segmentText, getPosTags, } from './query' -import { streamImport, streamParseFileInfo, analyzeIncrementalImport, incrementalImport } from './import' +import { + streamImport, + streamParseFileInfo, + analyzeIncrementalImport, + incrementalImport, + analyzeNewImport, +} from './import' import { initNlpDir } from '../nlp/segmenter' // 初始化数据库目录 @@ -251,12 +257,14 @@ const syncHandlers: Record any> = { // 异步消息处理器(流式操作) const asyncHandlers: Record Promise> = { // 流式导入 - streamImport: (p, id) => streamImport(p.filePath, id, p.formatOptions), + streamImport: (p, id) => streamImport(p.filePath, id, p.formatOptions, p.externalSessionId), // 流式解析文件信息(用于合并预览) streamParseFileInfo: (p, id) => streamParseFileInfo(p.filePath, id), // 增量导入 analyzeIncrementalImport: (p, id) => analyzeIncrementalImport(p.sessionId, p.filePath, id), - incrementalImport: (p, id) => incrementalImport(p.sessionId, p.filePath, id), + incrementalImport: (p, id) => incrementalImport(p.sessionId, p.filePath, id, p.options), + // Dry-run 分析(新会话) + analyzeNewImport: (p, id) => analyzeNewImport(p.filePath, id), // 导出筛选结果到文件(支持进度报告) exportFilterResultToFile: async (p, id) => exportFilterResultToFile(p, id), } diff --git a/electron/main/worker/import/incrementalImport.ts b/electron/main/worker/import/incrementalImport.ts index a4a7e6c7..36c0bcdf 100644 --- a/electron/main/worker/import/incrementalImport.ts +++ b/electron/main/worker/import/incrementalImport.ts @@ -9,6 +9,12 @@ import { streamParseFile, detectFormat } from '../../parser' import { sendProgress, getDbPath } from './utils' import { generateMessageKey } from './tempDb' +/** 导入选项(控制 meta/members 更新行为) */ +export interface ImportOptions { + metaUpdateMode?: 'patch' | 'none' + memberUpdateMode?: 'upsert' | 'none' +} + /** 增量导入分析结果 */ export interface IncrementalAnalyzeResult { newMessageCount: number @@ -18,10 +24,88 @@ export interface IncrementalAnalyzeResult { } /** 增量导入结果 */ +interface ErrorSample { + index: number + reason: string + 
detail: string +} + export interface IncrementalImportResult { success: boolean newMessageCount: number error?: string + batch?: { + receivedCount: number + writtenCount: number + duplicateCount: number + errorCount: number + errorReasonCounts: Record + errorSample: ErrorSample[] + } + session?: { + totalCount: number + memberCount: number + firstTimestamp: number + lastTimestamp: number + } + updates?: { + metaUpdated: boolean + membersAdded: number + membersUpdated: number + } +} + +/** + * 加载现有消息的去重集合(双路径:platformMessageId 优先 + 内容哈希兜底) + */ +function loadExistingDedup(db: Database.Database): { + existingPlatformMsgIds: Set + existingKeys: Set +} { + const existingPlatformMsgIds = new Set() + const existingKeys = new Set() + + // 加载已有的 platformMessageId 集合 + const pmidRows = db + .prepare('SELECT platform_message_id FROM message WHERE platform_message_id IS NOT NULL') + .all() as Array<{ platform_message_id: string }> + for (const row of pmidRows) { + existingPlatformMsgIds.add(row.platform_message_id) + } + + // 加载内容哈希集合(用于无 platformMessageId 的消息) + const hashRows = db + .prepare( + `SELECT ts, m.platform_id as sender_platform_id, content + FROM message msg + JOIN member m ON msg.sender_id = m.id` + ) + .all() as Array<{ ts: number; sender_platform_id: string; content: string | null }> + for (const row of hashRows) { + existingKeys.add(generateMessageKey(row.ts, row.sender_platform_id, row.content)) + } + + return { existingPlatformMsgIds, existingKeys } +} + +/** + * 双路径去重判断:platformMessageId 优先,内容哈希兜底 + * @returns true 表示是重复消息,应跳过 + */ +function isDuplicate( + msg: { platformMessageId?: string; timestamp: number; senderPlatformId: string; content: string | null }, + existingPlatformMsgIds: Set, + existingKeys: Set +): boolean { + if (msg.platformMessageId) { + if (existingPlatformMsgIds.has(msg.platformMessageId)) return true + existingPlatformMsgIds.add(msg.platformMessageId) + return false + } + const key = generateMessageKey(msg.timestamp, 
msg.senderPlatformId, msg.content) + if (existingKeys.has(key)) return true + existingKeys.add(key) + return false } /** @@ -32,13 +116,11 @@ export async function analyzeIncrementalImport( filePath: string, requestId: string ): Promise { - // 检测文件格式 const formatFeature = detectFormat(filePath) if (!formatFeature) { return { error: 'error.unrecognized_format', newMessageCount: 0, duplicateCount: 0, totalInFile: 0 } } - // 打开目标数据库获取现有消息的 key 集合 const dbPath = getDbPath(sessionId) if (!fs.existsSync(dbPath)) { return { error: 'error.session_not_found', newMessageCount: 0, duplicateCount: 0, totalInFile: 0 } @@ -47,25 +129,9 @@ export async function analyzeIncrementalImport( const db = new Database(dbPath, { readonly: true }) db.pragma('journal_mode = WAL') - // 获取现有消息的 key 集合 - const existingKeys = new Set() - const rows = db - .prepare( - ` - SELECT ts, m.platform_id as sender_platform_id, content - FROM message msg - JOIN member m ON msg.sender_id = m.id - ` - ) - .all() as Array<{ ts: number; sender_platform_id: string; content: string | null }> - - for (const row of rows) { - existingKeys.add(generateMessageKey(row.ts, row.sender_platform_id, row.content)) - } - + const { existingPlatformMsgIds, existingKeys } = loadExistingDedup(db) db.close() - // 解析新文件,统计新消息数 let totalInFile = 0 let newMessageCount = 0 let duplicateCount = 0 @@ -79,8 +145,7 @@ export async function analyzeIncrementalImport( onMessageBatch: (batch) => { for (const msg of batch) { totalInFile++ - const key = generateMessageKey(msg.timestamp, msg.senderPlatformId, msg.content) - if (existingKeys.has(key)) { + if (isDuplicate(msg, existingPlatformMsgIds, existingKeys)) { duplicateCount++ } else { newMessageCount++ @@ -102,40 +167,28 @@ export async function analyzeIncrementalImport( export async function incrementalImport( sessionId: string, filePath: string, - requestId: string + requestId: string, + options?: ImportOptions ): Promise { - // 检测文件格式 const formatFeature = detectFormat(filePath) if 
(!formatFeature) { return { success: false, newMessageCount: 0, error: 'error.unrecognized_format' } } - // 打开目标数据库 const dbPath = getDbPath(sessionId) if (!fs.existsSync(dbPath)) { return { success: false, newMessageCount: 0, error: 'error.session_not_found' } } + const metaUpdateMode = options?.metaUpdateMode ?? 'patch' + const memberUpdateMode = options?.memberUpdateMode ?? 'upsert' + const db = new Database(dbPath) db.pragma('journal_mode = WAL') db.pragma('synchronous = NORMAL') try { - // 获取现有消息的 key 集合 - const existingKeys = new Set() - const rows = db - .prepare( - ` - SELECT ts, m.platform_id as sender_platform_id, content - FROM message msg - JOIN member m ON msg.sender_id = m.id - ` - ) - .all() as Array<{ ts: number; sender_platform_id: string; content: string | null }> - - for (const row of rows) { - existingKeys.add(generateMessageKey(row.ts, row.sender_platform_id, row.content)) - } + const { existingPlatformMsgIds, existingKeys } = loadExistingDedup(db) // 获取现有成员映射 const memberIdMap = new Map() @@ -143,42 +196,98 @@ export async function incrementalImport( id: number platform_id: string }> + const initialMemberCount = existingMembers.length for (const m of existingMembers) { memberIdMap.set(m.platform_id, m.id) } - // 准备插入语句 - const insertMember = db.prepare(` - INSERT INTO member (platform_id, account_name, group_nickname, avatar) + // members upsert:新增+更新(含 roles 列) + const upsertMember = db.prepare(` + INSERT INTO member (platform_id, account_name, group_nickname, avatar, roles) + VALUES (?, ?, ?, ?, ?) 
+ ON CONFLICT(platform_id) DO UPDATE SET + account_name = COALESCE(NULLIF(excluded.account_name, ''), account_name), + group_nickname = COALESCE(NULLIF(excluded.group_nickname, ''), group_nickname), + avatar = COALESCE(NULLIF(excluded.avatar, ''), avatar), + roles = CASE WHEN excluded.roles != '[]' THEN excluded.roles ELSE roles END + `) + + // 仅新增的 member 插入(用于消息中发现的未知 sender,无 roles 信息) + const insertMemberMinimal = db.prepare(` + INSERT OR IGNORE INTO member (platform_id, account_name, group_nickname, avatar) VALUES (?, ?, ?, ?) `) + const getMemberId = db.prepare('SELECT id FROM member WHERE platform_id = ?') + const insertMessage = db.prepare(` - INSERT INTO message (sender_id, sender_account_name, sender_group_nickname, ts, type, content) - VALUES (?, ?, ?, ?, ?, ?) + INSERT INTO message (sender_id, sender_account_name, sender_group_nickname, ts, type, content, reply_to_message_id, platform_message_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + `) + + // meta 非空字段覆盖更新 + const updateMeta = db.prepare(` + UPDATE meta SET + name = COALESCE(NULLIF(?, ''), name), + group_id = COALESCE(NULLIF(?, ''), group_id), + group_avatar = COALESCE(NULLIF(?, ''), group_avatar), + owner_id = COALESCE(NULLIF(?, ''), owner_id), + imported_at = ? 
`) - // 开始事务 db.exec('BEGIN TRANSACTION') let newMessageCount = 0 + let duplicateCount = 0 let processedCount = 0 + let metaUpdated = false + let membersAdded = 0 + let membersUpdated = 0 + let errorCount = 0 + const errorReasonCounts: Record = {} + const errorSamples: ErrorSample[] = [] + const MAX_ERROR_SAMPLES = 5 const BATCH_SIZE = 5000 - // 解析新文件并写入 + function trackError(index: number, reason: string, detail: string) { + errorCount++ + errorReasonCounts[reason] = (errorReasonCounts[reason] || 0) + 1 + if (errorSamples.length < MAX_ERROR_SAMPLES) { + errorSamples.push({ index, reason, detail }) + } + } + + const newFtsEntries: Array<{ id: number; content: string | null }> = [] + await streamParseFile(filePath, { - onMeta: () => {}, + onMeta: (meta) => { + if (metaUpdateMode === 'none') return + updateMeta.run( + meta.name || '', + meta.groupId || '', + meta.groupAvatar || '', + meta.ownerId || '', + Math.floor(Date.now() / 1000) + ) + metaUpdated = true + }, onMembers: (members) => { - // 添加新成员 + if (memberUpdateMode === 'none') return for (const m of members) { - if (!memberIdMap.has(m.platformId)) { - const result = insertMember.run( - m.platformId, - m.accountName || null, - m.groupNickname || null, - m.avatar || null - ) - memberIdMap.set(m.platformId, result.lastInsertRowid as number) + const existed = memberIdMap.has(m.platformId) + upsertMember.run( + m.platformId, + m.accountName || null, + m.groupNickname || null, + m.avatar || null, + m.roles ? 
JSON.stringify(m.roles) : '[]' + ) + if (!existed) { + const row = getMemberId.get(m.platformId) as { id: number } | undefined + if (row) memberIdMap.set(m.platformId, row.id) + membersAdded++ + } else { + membersUpdated++ } } }, @@ -188,71 +297,103 @@ export async function incrementalImport( onMessageBatch: (batch) => { for (const msg of batch) { processedCount++ - const key = generateMessageKey(msg.timestamp, msg.senderPlatformId, msg.content) - // 跳过重复消息 - if (existingKeys.has(key)) { + // Message validation + if (!msg.senderPlatformId) { + trackError(processedCount, 'MISSING_SENDER', 'sender field is empty') + continue + } + if (msg.timestamp === undefined || msg.timestamp === null) { + trackError(processedCount, 'MISSING_TIMESTAMP', 'timestamp field is missing') + continue + } + if (typeof msg.timestamp !== 'number' || msg.timestamp <= 0 || !isFinite(msg.timestamp)) { + trackError(processedCount, 'INVALID_TIMESTAMP', `timestamp value: ${msg.timestamp}`) + continue + } + + if (isDuplicate(msg, existingPlatformMsgIds, existingKeys)) { + duplicateCount++ continue } - // 确保成员存在 let memberId = memberIdMap.get(msg.senderPlatformId) if (!memberId) { - const result = insertMember.run( + insertMemberMinimal.run( msg.senderPlatformId, msg.senderAccountName || null, msg.senderGroupNickname || null, null ) - memberId = result.lastInsertRowid as number - memberIdMap.set(msg.senderPlatformId, memberId) + const row = getMemberId.get(msg.senderPlatformId) as { id: number } | undefined + if (row) { + memberId = row.id + memberIdMap.set(msg.senderPlatformId, memberId) + membersAdded++ + } } + if (!memberId) continue - // 插入消息 - insertMessage.run( + const msgResult = insertMessage.run( memberId, msg.senderAccountName || null, msg.senderGroupNickname || null, msg.timestamp, msg.type, - msg.content || null + msg.content || null, + msg.replyToMessageId || null, + msg.platformMessageId || null ) - // 添加到已有 key 集合(防止文件内重复) - existingKeys.add(key) + newFtsEntries.push({ + id: 
Number(msgResult.lastInsertRowid), + content: msg.content || null, + }) newMessageCount++ } - // 定期发送进度 if (processedCount % BATCH_SIZE === 0) { sendProgress(requestId, { stage: 'saving', bytesRead: 0, totalBytes: 0, messagesProcessed: processedCount, - percentage: 50, // 实际进度难以计算,使用固定值 + percentage: 50, message: `已处理 ${processedCount} 条,新增 ${newMessageCount} 条`, }) } }, }) - // 提交事务 db.exec('COMMIT') - // 更新 imported_at 时间 - db.prepare('UPDATE meta SET imported_at = ?').run(Math.floor(Date.now() / 1000)) + // 若 onMeta 未触发但有新消息,仍需更新 imported_at + if (!metaUpdated) { + db.prepare('UPDATE meta SET imported_at = ?').run(Math.floor(Date.now() / 1000)) + } - // 重建 FTS5 索引(增量导入后需要重建以包含新消息) - if (newMessageCount > 0) { + // 增量 FTS 更新 + if (newFtsEntries.length > 0) { try { - const { rebuildFtsIndex } = await import('../query/fts') - rebuildFtsIndex(sessionId) + const { insertFtsEntries } = await import('../query/fts') + insertFtsEntries(sessionId, newFtsEntries) } catch { - // FTS 重建失败不影响导入流程 + // FTS 更新失败不影响导入流程 } } + // 查询 session 统计信息(用于 v1 响应) + const sessionStats = db + .prepare( + `SELECT + COUNT(*) as totalCount, + MIN(ts) as firstTimestamp, + MAX(ts) as lastTimestamp + FROM message` + ) + .get() as { totalCount: number; firstTimestamp: number; lastTimestamp: number } + const memberCountRow = db.prepare('SELECT COUNT(*) as count FROM member').get() as { count: number } + // 写入概览统计缓存文件 try { const { computeAndSetOverviewCache } = await import('../../database/sessionCache') @@ -273,13 +414,34 @@ export async function incrementalImport( message: `导入完成,新增 ${newMessageCount} 条消息`, }) - return { success: true, newMessageCount } + return { + success: true, + newMessageCount, + batch: { + receivedCount: processedCount, + writtenCount: newMessageCount, + duplicateCount, + errorCount, + errorReasonCounts, + errorSample: errorSamples, + }, + session: { + totalCount: sessionStats.totalCount, + memberCount: memberCountRow.count, + firstTimestamp: sessionStats.firstTimestamp, + 
lastTimestamp: sessionStats.lastTimestamp, + }, + updates: { + metaUpdated, + membersAdded, + membersUpdated, + }, + } } catch (error) { - // 回滚事务 try { db.exec('ROLLBACK') } catch { - // 忽略回滚错误 + // ignore rollback error } db.close() diff --git a/electron/main/worker/import/index.ts b/electron/main/worker/import/index.ts index 9fe56145..b266133a 100644 --- a/electron/main/worker/import/index.ts +++ b/electron/main/worker/import/index.ts @@ -7,14 +7,17 @@ export { streamImport, streamParseFileInfo, + analyzeNewImport, type StreamImportResult, type StreamParseFileInfoResult, + type AnalyzeNewImportResult, } from './streamImport' // 增量导入 export { analyzeIncrementalImport, incrementalImport, + type ImportOptions, type IncrementalAnalyzeResult, type IncrementalImportResult, } from './incrementalImport' diff --git a/electron/main/worker/import/streamImport.ts b/electron/main/worker/import/streamImport.ts index 99858bc8..5e136745 100644 --- a/electron/main/worker/import/streamImport.ts +++ b/electron/main/worker/import/streamImport.ts @@ -139,11 +139,13 @@ function createTempDatabase(dbPath: string): Database.Database { * @param filePath 文件路径 * @param requestId 请求ID(用于进度回调) * @param formatOptions 格式特定选项(如 Telegram 的 chatIndex) + * @param externalSessionId 外部指定的 sessionId(API 导入场景),省略时自动生成 */ export async function streamImport( filePath: string, requestId: string, - formatOptions?: Record + formatOptions?: Record, + externalSessionId?: string ): Promise { // 用户手动指定格式时,跳过自动检测直接使用指定的 Parser if (formatOptions?.formatId) { @@ -152,7 +154,7 @@ export async function streamImport( if (!feature) { return { success: false, error: 'error.unknown_format_id' } } - return streamImportSingle(filePath, requestId, feature, formatOptions) + return streamImportSingle(filePath, requestId, feature, formatOptions, externalSessionId) } // 检测所有匹配的格式(按优先级排序) @@ -163,12 +165,12 @@ export async function streamImport( // 如果有多个候选格式,使用 fallback 机制逐个尝试 if (candidates.length > 1) { - return 
streamImportWithFallback(filePath, requestId, candidates, formatOptions) + return streamImportWithFallback(filePath, requestId, candidates, formatOptions, externalSessionId) } // 单个候选格式,走常规逻辑(无额外开销) const formatFeature = candidates[0] - return streamImportSingle(filePath, requestId, formatFeature, formatOptions) + return streamImportSingle(filePath, requestId, formatFeature, formatOptions, externalSessionId) } /** @@ -179,7 +181,8 @@ async function streamImportWithFallback( filePath: string, requestId: string, candidates: FormatFeature[], - formatOptions?: Record + formatOptions?: Record, + externalSessionId?: string ): Promise { for (let i = 0; i < candidates.length; i++) { const candidate = candidates[i] @@ -187,7 +190,7 @@ async function streamImportWithFallback( console.log(`[StreamImport] Trying format ${i + 1}/${candidates.length}: ${candidate.name} (${candidate.id})`) - const result = await streamImportSingle(filePath, requestId, candidate, formatOptions) + const result = await streamImportSingle(filePath, requestId, candidate, formatOptions, externalSessionId) if (result.success) { if (i > 0) { @@ -218,11 +221,12 @@ async function streamImportSingle( filePath: string, requestId: string, formatFeature: FormatFeature, - formatOptions?: Record + formatOptions?: Record, + externalSessionId?: string ): Promise { // 初始化性能日志(实时写入文件) resetPerfLog() - const sessionId = generateSessionId() + const sessionId = externalSessionId || generateSessionId() initPerfLog(sessionId) // 记录导入开始信息 @@ -1029,3 +1033,47 @@ export async function streamParseFileInfo(filePath: string, requestId: string): throw error } } + +// ==================== Dry-run analysis for new sessions ==================== + +/** analyzeNewImport result — parse-only, no DB writes */ +export interface AnalyzeNewImportResult { + totalMessages: number + totalMembers: number + meta: { name: string; platform: string; type: string } | null + error?: string +} + +/** + * Parse file without writing to DB (dry-run for 
new sessions). + */ +export async function analyzeNewImport(filePath: string, requestId: string): Promise { + const formatFeature = detectFormat(filePath) + if (!formatFeature) { + return { totalMessages: 0, totalMembers: 0, meta: null, error: 'error.unrecognized_format' } + } + + let meta: { name: string; platform: string; type: string } | null = null + const memberSet = new Set() + let totalMessages = 0 + + await streamParseFile(filePath, { + onMeta: (parsedMeta) => { + meta = { name: parsedMeta.name, platform: parsedMeta.platform, type: parsedMeta.type } + }, + onMembers: (members) => { + for (const m of members) memberSet.add(m.platformId) + }, + onProgress: (progress) => { + sendProgress(requestId, progress) + }, + onMessageBatch: (batch) => { + for (const msg of batch) { + totalMessages++ + if (!memberSet.has(msg.senderPlatformId)) memberSet.add(msg.senderPlatformId) + } + }, + }) + + return { totalMessages, totalMembers: memberSet.size, meta } +} diff --git a/electron/main/worker/query/sessions.ts b/electron/main/worker/query/sessions.ts index 13b7bb5b..c6e0b745 100644 --- a/electron/main/worker/query/sessions.ts +++ b/electron/main/worker/query/sessions.ts @@ -226,6 +226,19 @@ export function getSession(sessionId: string): any | null { .get() as { count: number } ).count + // 时间范围(优先从 cache 获取) + const firstTimestamp = + overview?.firstMessageTs ?? + (db.prepare('SELECT MIN(ts) as v FROM message').get() as { v: number | null })?.v ?? + null + const lastTimestamp = + overview?.lastMessageTs ?? (db.prepare('SELECT MAX(ts) as v FROM message').get() as { v: number | null })?.v ?? 
null + + // 最新的 platform_message_id(用于增量边界参考) + const lastPmidRow = db + .prepare('SELECT platform_message_id FROM message WHERE platform_message_id IS NOT NULL ORDER BY ts DESC LIMIT 1') + .get() as { platform_message_id: string } | undefined + return { id: sessionId, name: meta.name, @@ -238,6 +251,9 @@ export function getSession(sessionId: string): any | null { groupId: meta.group_id || null, groupAvatar: meta.group_avatar || null, ownerId: meta.owner_id || null, + firstTimestamp, + lastTimestamp, + lastPlatformMessageId: lastPmidRow?.platform_message_id ?? null, } } diff --git a/electron/main/worker/workerManager.ts b/electron/main/worker/workerManager.ts index fc308dc8..1e153fc4 100644 --- a/electron/main/worker/workerManager.ts +++ b/electron/main/worker/workerManager.ts @@ -521,9 +521,10 @@ export async function streamParseFileInfo( export async function streamImport( filePath: string, onProgress?: (progress: ParseProgress) => void, - formatOptions?: Record + formatOptions?: Record, + externalSessionId?: string ): Promise { - return sendToWorkerWithProgress('streamImport', { filePath, formatOptions }, onProgress) + return sendToWorkerWithProgress('streamImport', { filePath, formatOptions, externalSessionId }, onProgress) } /** @@ -1066,6 +1067,14 @@ export async function analyzeIncrementalImport(sessionId: string, filePath: stri return sendToWorker('analyzeIncrementalImport', { sessionId, filePath }) } +/** + * 导入选项(控制 meta/members 更新行为) + */ +export interface ImportOptions { + metaUpdateMode?: 'patch' | 'none' + memberUpdateMode?: 'upsert' | 'none' +} + /** * 增量导入结果 */ @@ -1073,6 +1082,25 @@ export interface IncrementalImportResult { success: boolean newMessageCount: number error?: string + batch?: { + receivedCount: number + writtenCount: number + duplicateCount: number + errorCount: number + errorReasonCounts: Record + errorSample: Array<{ index: number; reason: string; detail: string }> + } + session?: { + totalCount: number + memberCount: number + 
firstTimestamp: number + lastTimestamp: number + } + updates?: { + metaUpdated: boolean + membersAdded: number + membersUpdated: number + } } /** @@ -1081,7 +1109,25 @@ export interface IncrementalImportResult { export async function incrementalImport( sessionId: string, filePath: string, - onProgress?: (progress: ParseProgress) => void + onProgress?: (progress: ParseProgress) => void, + options?: ImportOptions ): Promise { - return sendToWorkerWithProgress('incrementalImport', { sessionId, filePath }, onProgress) + return sendToWorkerWithProgress('incrementalImport', { sessionId, filePath, options }, onProgress) +} + +/** + * Dry-run analysis result for new sessions + */ +export interface AnalyzeNewImportResult { + totalMessages: number + totalMembers: number + meta: { name: string; platform: string; type: string } | null + error?: string +} + +/** + * Analyze a new import file without writing to DB (dry-run) + */ +export async function analyzeNewImport(filePath: string): Promise { + return sendToWorker('analyzeNewImport', { filePath }) } diff --git a/electron/preload/apis/api-server.ts b/electron/preload/apis/api-server.ts index 209e264e..9d3f5e41 100644 --- a/electron/preload/apis/api-server.ts +++ b/electron/preload/apis/api-server.ts @@ -1,5 +1,5 @@ /** - * ChatLab API 服务 Preload API + * ChatLab API 服务 Preload API (hierarchical data source model) */ import { ipcRenderer } from 'electron' @@ -18,19 +18,36 @@ export interface ApiServerStatus { error: string | null } -export interface DataSource { +export interface ImportSession { id: string name: string - url: string - token: string - intervalMinutes: number - enabled: boolean + remoteSessionId: string targetSessionId: string lastPullAt: number lastStatus: 'idle' | 'success' | 'error' lastError: string lastNewMessages: number +} + +export interface DataSource { + id: string + name: string + baseUrl: string + token: string + intervalMinutes: number + enabled: boolean createdAt: number + sessions: ImportSession[] 
+} + +export interface RemoteSession { + id: string + name: string + platform: string + type: string + messageCount?: number + memberCount?: number + lastMessageAt?: number } export const apiServerApi = { @@ -68,13 +85,14 @@ export const apiServerApi = { return ipcRenderer.invoke('api:getDataSources') }, - addDataSource: ( - partial: Omit - ): Promise => { + addDataSource: (partial: { name?: string; baseUrl: string; token: string; intervalMinutes: number }): Promise => { return ipcRenderer.invoke('api:addDataSource', partial) }, - updateDataSource: (id: string, updates: Partial): Promise => { + updateDataSource: ( + id: string, + updates: Partial> + ): Promise => { return ipcRenderer.invoke('api:updateDataSource', id, updates) }, @@ -82,12 +100,40 @@ export const apiServerApi = { return ipcRenderer.invoke('api:deleteDataSource', id) }, - triggerPull: (id: string): Promise<{ success: boolean; error?: string }> => { - return ipcRenderer.invoke('api:triggerPull', id) + // ==================== 导入会话管理 ==================== + + addImportSessions: ( + sourceId: string, + sessions: Array<{ name: string; remoteSessionId: string }> + ): Promise => { + return ipcRenderer.invoke('api:addImportSessions', sourceId, sessions) }, - onPullResult: (callback: (data: { dsId: string; status: string; detail: string }) => void): (() => void) => { - const handler = (_event: any, data: { dsId: string; status: string; detail: string }) => callback(data) + removeImportSession: (sourceId: string, sessionId: string): Promise => { + return ipcRenderer.invoke('api:removeImportSession', sourceId, sessionId) + }, + + // ==================== 同步 ==================== + + triggerPull: (sourceId: string, sessionId?: string): Promise<{ success: boolean; error?: string }> => { + return ipcRenderer.invoke('api:triggerPull', sourceId, sessionId) + }, + + triggerPullAll: (sourceId: string): Promise<{ success: boolean; error?: string }> => { + return ipcRenderer.invoke('api:triggerPullAll', sourceId) + }, + + 
fetchRemoteSessions: (baseUrl: string, token?: string): Promise => { + return ipcRenderer.invoke('api:fetchRemoteSessions', baseUrl, token || '') + }, + + onPullResult: ( + callback: (data: { sourceId: string; sessionId?: string; status: string; detail: string }) => void + ): (() => void) => { + const handler = ( + _event: any, + data: { sourceId: string; sessionId?: string; status: string; detail: string } + ) => callback(data) ipcRenderer.on('api:pullResult', handler) return () => ipcRenderer.removeListener('api:pullResult', handler) }, diff --git a/electron/preload/index.d.ts b/electron/preload/index.d.ts index 7c864d4e..fd65c820 100644 --- a/electron/preload/index.d.ts +++ b/electron/preload/index.d.ts @@ -1000,19 +1000,36 @@ interface ApiServerStatus { error: string | null } -interface DataSource { +interface ImportSession { id: string name: string - url: string - token: string - intervalMinutes: number - enabled: boolean + remoteSessionId: string targetSessionId: string lastPullAt: number lastStatus: 'idle' | 'success' | 'error' lastError: string lastNewMessages: number +} + +interface DataSource { + id: string + name: string + baseUrl: string + token: string + intervalMinutes: number + enabled: boolean createdAt: number + sessions: ImportSession[] +} + +interface RemoteSession { + id: string + name: string + platform: string + type: string + messageCount?: number + memberCount?: number + lastMessageAt?: number } interface ApiServerApi { @@ -1023,13 +1040,23 @@ interface ApiServerApi { regenerateToken: () => Promise onStartupError: (callback: (data: { error: string }) => void) => () => void getDataSources: () => Promise - addDataSource: ( - partial: Omit - ) => Promise - updateDataSource: (id: string, updates: Partial) => Promise + addDataSource: (partial: { name?: string; baseUrl: string; token: string; intervalMinutes: number }) => Promise + updateDataSource: ( + id: string, + updates: Partial> + ) => Promise deleteDataSource: (id: string) => Promise - 
triggerPull: (id: string) => Promise<{ success: boolean; error?: string }> - onPullResult: (callback: (data: { dsId: string; status: string; detail: string }) => void) => () => void + addImportSessions: ( + sourceId: string, + sessions: Array<{ name: string; remoteSessionId: string }> + ) => Promise + removeImportSession: (sourceId: string, sessionId: string) => Promise + triggerPull: (sourceId: string, sessionId?: string) => Promise<{ success: boolean; error?: string }> + triggerPullAll: (sourceId: string) => Promise<{ success: boolean; error?: string }> + fetchRemoteSessions: (baseUrl: string, token?: string) => Promise + onPullResult: ( + callback: (data: { sourceId: string; sessionId?: string; status: string; detail: string }) => void + ) => () => void onImportCompleted: (callback: () => void) => () => void } @@ -1188,4 +1215,5 @@ export { ApiServerConfig, ApiServerStatus, DataSource, + ImportSession, } diff --git a/src/components/common/Sidebar.vue b/src/components/common/Sidebar.vue index a0145973..844ee2eb 100644 --- a/src/components/common/Sidebar.vue +++ b/src/components/common/Sidebar.vue @@ -1,6 +1,6 @@