# MongoDB架构改造方案 ## 项目概述 将现有的内存缓存架构改造为基于MongoDB的数据库持久化方案,支持多服务器环境下的任务处理,每个Node服务只处理分配给自己的任务。 ## 适用范围 本方案仅适用于以下异步生成接口,不影响其他同步接口: ### 新策略适用接口 - `/v1/images/generations` - 图片生成接口(异步) - `/v1/video/generations` - 视频生成接口(异步) - `/v1/images/query` - 图片任务查询接口 - `/v1/video/query` - 视频任务查询接口 ### 保持现状的接口 - `/v1/upload/images` - 图片上传接口(同步,不使用缓存策略,保持现有实现) - 其他所有同步接口保持不变 ## 1. 数据表设计 ### 1.1 生成任务表 (jimeng_free_generation_tasks) ```typescript interface IGenerationTask extends Document { task_id: string; // 任务唯一标识,主键 task_type: 'image' | 'video'; // 任务类型 server_id: string; // 分配的服务器ID(外部分配,对应SERVICE_ID) // 原始请求参数 original_params: { model?: string; // 模型名称 prompt: string; // 提示词 negative_prompt?: string; // 负向提示词 width?: number; // 宽度 height?: number; // 高度 sample_strength?: number; // 采样强度 images?: Array<{ // 图片参数(视频生成用) url: string; width: number; height: number; }>; is_pro?: boolean; // 是否专业版 duration?: number; // 时长(毫秒) ratio?: string; // 比例 response_format?: string; // 响应格式 }; // 生成过程中的内部参数 internal_params: { refresh_token: string; // 认证令牌 component_id?: string; // 组件ID history_id?: string; // 即梦平台历史记录ID mapped_model?: string; // 映射后的模型名 submit_id?: string; // 提交ID }; // 任务状态和控制 status: 'pending' | 'processing' | 'polling' | 'completed' | 'failed'; retry_count: number; // 当前重试次数 max_retries: number; // 最大重试次数(默认3次) // 轮询控制 next_poll_at?: number; // 下次轮询时间戳(用于控制轮询间隔) poll_interval: number; // 轮询间隔(秒,默认10秒) task_timeout: number; // 任务超时时间(秒,图片1小时,视频24小时) // 时间戳 created_at: number; // 创建时间戳(秒) updated_at: number; // 更新时间戳(秒) started_at?: number; // 开始处理时间戳(秒) completed_at?: number; // 完成时间戳(秒) // 错误信息 error_message?: string; // 错误描述 fail_code?: string; // 即梦平台返回的失败代码 } ``` #### 数据库索引设计 ```javascript // 复合索引 - 用于轮询查询 { "server_id": 1, "status": 1, "next_poll_at": 1 } // 单字段索引 { "task_id": 1 } // 主键查询 { "created_at": 1 } // 按创建时间清理 { "updated_at": 1 } // 按更新时间查询 ``` ### 1.2 生成结果表 (jimeng_free_generation_results) ```typescript interface IGenerationResult extends Document { task_id: string; // 关联的任务ID,主键 task_type: 'image' | 'video'; // 任务类型 server_id: string; // 处理服务器ID // 生成结果 status: 'success' | 'failed'; // 最终状态 original_urls: string[]; // 原始URL数组(即梦返回的地址) tos_urls: string[]; // TOS URL数组(上传后的地址) // 处理元数据 metadata: { generation_time?: number; // 生成耗时(毫秒) tos_upload_time?: number; // TOS上传耗时(毫秒) total_files: number; // 文件总数 successful_uploads: number; // 成功上传数量 tos_upload_errors?: string[]; // TOS上传错误信息 fail_reason?: string; // 失败原因 }; // 时间管理 created_at: number; // 创建时间戳(秒) expires_at: number; // 过期时间戳(用于自动清理,默认24小时后) } ``` #### 数据库索引设计 ```javascript // 主键索引 { "task_id": 1 } // 过期清理索引 { "expires_at": 1 } // 服务器查询索引 { "server_id": 1, "created_at": 1 } ``` ## 2. 服务架构设计 ### 2.1 任务处理流程 ```mermaid graph TD A[外部请求] --> B[创建任务记录] B --> C[返回任务ID] C --> D[定时轮询服务] D --> E{检查任务状态} E -->|pending| F[开始处理任务] E -->|processing| G[调用即梦API] E -->|polling| H[检查即梦结果] E -->|completed/failed| I[跳过] F --> J[更新为processing] J --> K[调用生成API] K --> L{API调用结果} L -->|成功| M[更新为polling] L -->|失败| N[重试或标记失败] M --> O[设置下次轮询时间] H --> P{即梦任务完成?} P -->|是| Q[处理结果] P -->|否| R[更新轮询时间] Q --> S[上传TOS] S --> T[创建结果记录] T --> U[标记任务完成] V[查询接口] --> W[查找结果记录] W --> X{结果存在?} X -->|是| Y[返回结果并清理] X -->|否| Z[返回进行中状态] ``` ### 2.2 定时轮询服务(TaskPollingService) ```typescript class TaskPollingService { private static instance: TaskPollingService; private currentServerId: string; private pollInterval: NodeJS.Timeout | null = null; private isRunning: boolean = false; private maxConcurrentTasks: number; // 从环境变量获取,默认3个 /** * 启动轮询服务 * 注意:与HeartbeatService分离,使用不同的定时器 */ async start(): Promise { this.currentServerId = process.env.SERVICE_ID || 'jimeng-free-api'; this.maxConcurrentTasks = parseInt(process.env.MAX_CONCURRENT_TASKS || '3'); // 每5秒轮询一次(与心跳服务的60秒区分开) this.pollInterval = setInterval(async () => { await this.processTasks(); }, 5000); this.isRunning = true; logger.info(`Task polling service started for server: ${this.currentServerId}`); } /** * 主处理方法 */ private async processTasks(): Promise { try { const currentTime = Math.floor(Date.now() / 1000); // 1. 处理待处理任务 await this.processPendingTasks(currentTime); // 2. 检查轮询任务 await this.checkPollingTasks(currentTime); // 3. 检查和处理超时任务 await this.checkTimeoutTasks(currentTime); } catch (error) { logger.error('Task polling error:', error); } } /** * 检查超时任务 */ private async checkTimeoutTasks(currentTime: number): Promise { try { const timeoutCount = await DatabaseCleanupService.cleanupTimeoutTasks(); if (timeoutCount > 0) { logger.warn(`Marked ${timeoutCount} tasks as failed due to timeout`); } } catch (error) { logger.error('Failed to check timeout tasks:', error); } } } ``` ### 2.3 与心跳服务的协调 - **HeartbeatService**: 每60秒发送一次心跳,管理服务器在线状态 - **TaskPollingService**: 每5秒轮询一次任务,处理任务状态变更 - **独立运行**: 两个服务使用不同的定时器,互不干扰 - **共享配置**: 都使用相同的SERVICE_ID标识服务器 ## 3. 核心方法重构 ### 3.1 新的生成方法(保持接口兼容) ```typescript // 新的数据库驱动方法 class DatabaseGenerationService { /** * 图片生成 - 数据库版本 */ async generateImagesV2( model: string, taskId: string, prompt: string, params: any, refreshToken: string ): Promise { const currentServerId = process.env.SERVICE_ID || 'jimeng-free-api'; // 创建任务记录 const imageTimeout = parseInt(process.env.IMAGE_TASK_TIMEOUT || '3600'); await GenerationTask.create({ task_id: taskId, task_type: 'image', server_id: currentServerId, original_params: { model, prompt, ...params }, internal_params: { refresh_token: refreshToken }, status: 'pending', retry_count: 0, max_retries: 3, poll_interval: 10, task_timeout: imageTimeout, created_at: Math.floor(Date.now() / 1000), updated_at: Math.floor(Date.now() / 1000) }); logger.info(`Image task created: ${taskId} for server: ${currentServerId}`); } /** * 视频生成 - 数据库版本 */ async generateVideoV2( taskId: string, prompt: string, params: any, refreshToken: string ): Promise { const currentServerId = process.env.SERVICE_ID || 'jimeng-free-api'; const videoTimeout = parseInt(process.env.VIDEO_TASK_TIMEOUT || '86400'); await GenerationTask.create({ task_id: taskId, task_type: 'video', server_id: currentServerId, original_params: { prompt, ...params }, internal_params: { refresh_token: refreshToken }, status: 'pending', retry_count: 0, max_retries: 3, poll_interval: 15, // 视频轮询间隔更长 task_timeout: videoTimeout, created_at: Math.floor(Date.now() / 1000), updated_at: Math.floor(Date.now() / 1000) }); logger.info(`Video task created: ${taskId} for server: ${currentServerId}`); } /** * 查询任务结果 */ async queryTaskResult(taskId: string): Promise { // 1. 先查询结果表 const result = await GenerationResult.findOne({ task_id: taskId }); if (result) { // 找到结果,返回并清理 const response = { created: Math.floor(Date.now() / 1000), data: { task_id: taskId, url: result.tos_urls.join(','), status: result.status === 'success' ? -1 : -2 } }; // 删除结果记录(一次性消费) await GenerationResult.deleteOne({ task_id: taskId }); return response; } // 2. 查询任务状态 const task = await GenerationTask.findOne({ task_id: taskId }); if (!task) { return { created: Math.floor(Date.now() / 1000), data: { task_id: taskId, url: "", status: 0 } // 任务不存在 }; } // 3. 根据任务状态返回 const statusMap = { 'pending': 0, 'processing': 0, 'polling': 0, 'failed': -2, 'completed': -1 // 这种情况理论上不会出现,因为completed会生成result }; return { created: Math.floor(Date.now() / 1000), data: { task_id: taskId, url: "", status: statusMap[task.status] || 0 } }; } } ``` ### 3.2 兼容性切换机制 ```typescript // 通过环境变量控制新旧方法 const USE_DATABASE_MODE = process.env.USE_DATABASE_MODE === 'true'; // 统一的调用入口 export async function generateImages( model: string, taskId: string, prompt: string, params: any, refreshToken: string ) { if (USE_DATABASE_MODE) { return await DatabaseGenerationService.generateImagesV2( model, taskId, prompt, params, refreshToken ); } else { // 调用原有方法 return await OriginalImageController.generateImages( model, taskId, prompt, params, refreshToken ); } } ``` ## 4. 环境变量配置 ### 4.1 新增环境变量 ```bash # 数据库模式开关 USE_DATABASE_MODE=false # 默认false,保持兼容性 # 任务处理配置 MAX_CONCURRENT_TASKS=3 # 最大并发任务数(2核2G建议3个) TASK_POLL_INTERVAL=5 # 任务轮询间隔(秒) IMAGE_TASK_TIMEOUT=3600 # 图片任务超时时间(秒,默认1小时) VIDEO_TASK_TIMEOUT=86400 # 视频任务超时时间(秒,默认24小时) RESULT_EXPIRE_TIME=86400 # 结果过期时间(秒,默认24小时) # 服务标识(已有) SERVICE_ID=jimeng-api-3302 # 服务唯一标识 ``` ### 4.2 配置文件更新 在 `ecosystem.config.json` 中为每个实例配置不同的 `SERVICE_ID`: ```json { "apps": [ { "name": "jimeng-api-3302", "env": { "SERVICE_ID": "jimeng-api-3302", "PORT": 3302 } }, { "name": "jimeng-api-3303", "env": { "SERVICE_ID": "jimeng-api-3303", "PORT": 3303 } } ] } ``` ## 5. 数据库操作优化 ### 5.1 原子操作设计 ```typescript // 原子更新任务状态,避免并发冲突 async function atomicUpdateTaskStatus( taskId: string, fromStatus: string, toStatus: string, updateData: any = {} ): Promise { const result = await GenerationTask.updateOne( { task_id: taskId, status: fromStatus }, { status: toStatus, updated_at: Math.floor(Date.now() / 1000), ...updateData } ); return result.modifiedCount > 0; } ``` ### 5.2 批量查询优化 ```typescript // 批量获取待处理任务 async function getBatchPendingTasks(serverId: string, limit: number): Promise { const currentTime = Math.floor(Date.now() / 1000); return await GenerationTask.find({ server_id: serverId, status: 'pending' }) .sort({ created_at: 1 }) // 先入先出 .limit(limit); } ``` ## 6. 监控和日志 ### 6.1 任务状态监控 ```typescript class TaskMonitor { static async getTaskStats(serverId?: string): Promise { const filter = serverId ? { server_id: serverId } : {}; const stats = await GenerationTask.aggregate([ { $match: filter }, { $group: { _id: "$status", count: { $sum: 1 } } } ]); return stats.reduce((acc, curr) => { acc[curr._id] = curr.count; return acc; }, {}); } static async getServerLoad(serverId: string): Promise { return await GenerationTask.countDocuments({ server_id: serverId, status: { $in: ['processing', 'polling'] } }); } } ``` ### 6.2 专用日志文件 ```typescript // 创建专用的任务轮询日志 const TASK_POLLING_LOG_PATH = path.resolve("./logs/task_polling.log"); function taskLog(message: string) { const timestamp = formatInTimeZone(new Date(), timeZone, "yyyy-MM-dd HH:mm:ss.SSS"); const logMessage = `[TaskPolling][${timestamp}] ${message}`; fs.ensureDirSync(path.dirname(TASK_POLLING_LOG_PATH)); fs.appendFileSync(TASK_POLLING_LOG_PATH, logMessage + "\n"); } ``` ## 7. 数据清理策略 ### 7.1 自动清理任务 ```typescript class DatabaseCleanupService { /** * 清理过期的结果记录 */ static async cleanupExpiredResults(): Promise { const currentTime = Math.floor(Date.now() / 1000); const result = await GenerationResult.deleteMany({ expires_at: { $lt: currentTime } }); return result.deletedCount; } /** * 清理超时的任务 */ static async cleanupTimeoutTasks(): Promise { const currentTime = Math.floor(Date.now() / 1000); // 查找超时的任务并标记为失败 const timeoutTasks = await GenerationTask.find({ status: { $in: ['processing', 'polling'] }, $expr: { $gt: [ { $subtract: [currentTime, '$started_at'] }, '$task_timeout' ] } }); // 批量更新超时任务为失败状态 const timeoutTaskIds = timeoutTasks.map(task => task.task_id); if (timeoutTaskIds.length > 0) { await GenerationTask.updateMany( { task_id: { $in: timeoutTaskIds } }, { status: 'failed', error_message: 'Task timeout', updated_at: currentTime, completed_at: currentTime } ); // 为超时任务创建失败结果记录 const failedResults = timeoutTasks.map(task => ({ task_id: task.task_id, task_type: task.task_type, server_id: task.server_id, status: 'failed', original_urls: [], tos_urls: [], metadata: { total_files: 0, successful_uploads: 0, fail_reason: 'Task timeout after ' + task.task_timeout + ' seconds' }, created_at: currentTime, expires_at: currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400') })); if (failedResults.length > 0) { await GenerationResult.insertMany(failedResults); } } return timeoutTaskIds.length; } } ``` ### 7.2 定期清理任务 集成到轮询服务中,每小时执行一次清理: ```typescript // 在TaskPollingService中添加 private cleanupCounter = 0; private async processTasks(): Promise { // ... 现有逻辑 ... // 每720次轮询(1小时)执行一次清理 this.cleanupCounter++; if (this.cleanupCounter >= 720) { await this.performCleanup(); this.cleanupCounter = 0; } } ``` ## 8. 迁移计划 ### Phase 1: 准备阶段 - [ ] 创建数据模型文件 - [ ] 实现新的生成服务类 - [ ] 添加轮询服务框架 - [ ] 更新环境变量配置 ### Phase 2: 测试阶段 - [ ] 单元测试编写 - [ ] 本地环境测试 - [ ] 小流量灰度测试(USE_DATABASE_MODE=true) ### Phase 3: 切换阶段 - [ ] 全量切换到数据库模式 - [ ] 监控系统稳定性 - [ ] 性能调优 ### Phase 4: 清理阶段 - [ ] 移除旧的内存缓存代码 - [ ] 清理不再使用的配置 - [ ] 文档更新 ## 9. 风险评估和应对 ### 9.1 主要风险 1. **数据库性能瓶颈** - 风险:高频轮询可能对MongoDB造成压力 - 应对:优化索引、控制轮询频率、使用连接池 2. **任务丢失风险** - 风险:服务器宕机时正在处理的任务可能丢失 - 应对:实现任务恢复机制、超时重试 3. **状态不一致** - 风险:并发更新可能导致状态不一致 - 应对:使用原子操作、乐观锁 ### 9.2 回滚方案 - 保持 `USE_DATABASE_MODE=false` 可随时切回原有方案 - 原有代码完整保留,确保回滚路径畅通 - 数据库数据不影响原有内存缓存功能 ## 10. 性能指标 ### 10.1 目标指标 - **任务处理延迟**: < 10秒(从pending到processing) - **轮询响应时间**: < 100ms - **数据库查询时间**: < 50ms - **任务成功率**: > 99% - **内存使用**: < 512MB(单实例) ### 10.2 监控指标 - 各状态任务数量统计 - 平均任务处理时间 - 服务器负载情况 - 数据库连接状态 - 错误率统计 --- ## 总结 这个改造方案将现有的内存缓存架构平滑迁移到MongoDB数据库,保持了接口兼容性,支持多服务器部署,实现了任务的持久化和可靠处理。每个服务器只需要关注自己的任务,简化了架构复杂度,提高了系统的可维护性和可扩展性。 ## 核对清单 ### ✅ 已完善的部分 1. **接口范围明确**: - ✅ 明确了新策略适用的接口:`/v1/images/generations`、`/v1/video/generations` 及对应的 query 接口 - ✅ 明确了保持现状的接口:`/v1/upload/images`(同步接口,不使用缓存策略) 2. **超时配置完善**: - ✅ 添加了任务级别的超时配置:`task_timeout` 字段 - ✅ 区分了图片和视频的超时时间:图片1小时,视频24小时 - ✅ 实现了超时任务自动标记为失败的机制 - ✅ 在轮询服务中集成了超时检查逻辑 3. **配置文件更新**: - ✅ 更新了 `template.ecosystem.config.json`,添加了所有新的环境变量 - ✅ 更新了 `template.docker-compose.yml`,添加了数据库模式和超时配置 - ✅ 为每个服务实例配置了独立的超时参数 4. **数据库设计优化**: - ✅ 任务表增加了 `task_timeout` 字段 - ✅ 超时清理逻辑会创建失败结果记录,保证query接口的一致性 - ✅ 实现了原子操作避免并发冲突 ### 📋 环境变量配置总览 | 变量名 | 默认值 | 说明 | |--------|--------|------| | `USE_DATABASE_MODE` | `false` | 数据库模式开关,控制新旧方法切换 | | `MAX_CONCURRENT_TASKS` | `3` | 最大并发任务数(2核2G服务器建议值) | | `TASK_POLL_INTERVAL` | `5` | 轮询间隔(秒) | | `IMAGE_TASK_TIMEOUT` | `3600` | 图片任务超时时间(1小时) | | `VIDEO_TASK_TIMEOUT` | `86400` | 视频任务超时时间(24小时) | | `RESULT_EXPIRE_TIME` | `86400` | 结果过期时间(24小时) | | `SERVICE_ID` | - | 服务唯一标识(必须配置) | ### 🔄 任务状态流转确认 ``` pending → processing → polling → completed/failed ↓ ↓ ↓ 超时 超时 超时 ↓ ↓ ↓ failed failed failed ``` ### 🚫 不受影响的接口 - `/v1/upload/images` - 同步上传接口,保持现有实现 - 所有其他同步接口保持不变 - 现有的 `ImagesTaskCache` 和 `VideoTaskCache` 内存缓存在 `USE_DATABASE_MODE=false` 时继续正常工作 ### ⚠️ 注意事项 1. **渐进式迁移**:通过 `USE_DATABASE_MODE` 环境变量控制,可以随时回滚 2. **超时处理**:超时任务会自动创建失败结果记录,确保query接口行为一致 3. **服务隔离**:每个服务只处理自己的任务,通过 `SERVICE_ID` 区分 4. **轮询分离**:任务轮询服务(5秒间隔)与心跳服务(60秒间隔)完全独立 5. **配置灵活**:所有关键参数都通过环境变量配置,支持不同环境的差异化设置