jimeng-free-api/MONGODB_ARCHITECTURE_PLAN.md
2025-08-27 15:00:38 +08:00

23 KiB
Raw Blame History

MongoDB架构改造方案

项目概述

将现有的内存缓存架构改造为基于MongoDB的数据库持久化方案支持多服务器环境下的任务处理每个Node服务只处理分配给自己的任务。

适用范围

本方案仅适用于以下异步生成接口,不影响其他同步接口:

新策略适用接口

  • /v1/images/generations - 图片生成接口(异步)
  • /v1/video/generations - 视频生成接口(异步)
  • /v1/images/query - 图片任务查询接口
  • /v1/video/query - 视频任务查询接口

保持现状的接口

  • /v1/upload/images - 图片上传接口(同步,不使用缓存策略,保持现有实现)
  • 其他所有同步接口保持不变

1. 数据表设计

1.1 生成任务表 (jimeng_free_generation_tasks)

interface IGenerationTask extends Document {
    task_id: string;              // 任务唯一标识,主键
    task_type: 'image' | 'video'; // 任务类型
    server_id: string;            // 分配的服务器ID外部分配对应SERVICE_ID
    
    // 原始请求参数
    original_params: {
        model?: string;           // 模型名称
        prompt: string;           // 提示词
        negative_prompt?: string; // 负向提示词
        width?: number;           // 宽度
        height?: number;          // 高度
        sample_strength?: number; // 采样强度
        images?: Array<{          // 图片参数(视频生成用)
            url: string;
            width: number;
            height: number;
        }>;
        is_pro?: boolean;         // 是否专业版
        duration?: number;        // 时长(毫秒)
        ratio?: string;           // 比例
        response_format?: string; // 响应格式
    };
    
    // 生成过程中的内部参数
    internal_params: {
        refresh_token: string;    // 认证令牌
        component_id?: string;    // 组件ID
        history_id?: string;      // 即梦平台历史记录ID
        mapped_model?: string;    // 映射后的模型名
        submit_id?: string;       // 提交ID
    };
    
    // 任务状态和控制
    status: 'pending' | 'processing' | 'polling' | 'completed' | 'failed';
    retry_count: number;          // 当前重试次数
    max_retries: number;          // 最大重试次数默认3次
    
    // 轮询控制
    next_poll_at?: number;        // 下次轮询时间戳(用于控制轮询间隔)
    poll_interval: number;        // 轮询间隔默认10秒
    task_timeout: number;         // 任务超时时间图片1小时视频24小时
    
    // 时间戳
    created_at: number;           // 创建时间戳(秒)
    updated_at: number;           // 更新时间戳(秒)
    started_at?: number;          // 开始处理时间戳(秒)
    completed_at?: number;        // 完成时间戳(秒)
    
    // 错误信息
    error_message?: string;       // 错误描述
    fail_code?: string;           // 即梦平台返回的失败代码
}

数据库索引设计

// 复合索引 - 用于轮询查询
{ "server_id": 1, "status": 1, "next_poll_at": 1 }

// 单字段索引
{ "task_id": 1 }        // 主键查询
{ "created_at": 1 }     // 按创建时间清理
{ "updated_at": 1 }     // 按更新时间查询

1.2 生成结果表 (jimeng_free_generation_results)

interface IGenerationResult extends Document {
    task_id: string;              // 关联的任务ID主键
    task_type: 'image' | 'video'; // 任务类型
    server_id: string;            // 处理服务器ID
    
    // 生成结果
    status: 'success' | 'failed'; // 最终状态
    original_urls: string[];      // 原始URL数组即梦返回的地址
    tos_urls: string[];           // TOS URL数组上传后的地址
    
    // 处理元数据
    metadata: {
        generation_time?: number;     // 生成耗时(毫秒)
        tos_upload_time?: number;     // TOS上传耗时毫秒
        total_files: number;          // 文件总数
        successful_uploads: number;   // 成功上传数量
        tos_upload_errors?: string[]; // TOS上传错误信息
        fail_reason?: string;         // 失败原因
    };
    
    // 时间管理
    created_at: number;           // 创建时间戳(秒)
    expires_at: number;           // 过期时间戳用于自动清理默认24小时后
}

数据库索引设计

// 主键索引
{ "task_id": 1 }

// 过期清理索引
{ "expires_at": 1 }

// 服务器查询索引
{ "server_id": 1, "created_at": 1 }

2. 服务架构设计

2.1 任务处理流程

graph TD
    A[外部请求] --> B[创建任务记录]
    B --> C[返回任务ID]
    C --> D[定时轮询服务]
    D --> E{检查任务状态}
    E -->|pending| F[开始处理任务]
    E -->|processing| G[调用即梦API]
    E -->|polling| H[检查即梦结果]
    E -->|completed/failed| I[跳过]
    
    F --> J[更新为processing]
    J --> K[调用生成API]
    K --> L{API调用结果}
    L -->|成功| M[更新为polling]
    L -->|失败| N[重试或标记失败]
    
    M --> O[设置下次轮询时间]
    H --> P{即梦任务完成?}
    P -->|是| Q[处理结果]
    P -->|否| R[更新轮询时间]
    
    Q --> S[上传TOS]
    S --> T[创建结果记录]
    T --> U[标记任务完成]
    
    V[查询接口] --> W[查找结果记录]
    W --> X{结果存在?}
    X -->|是| Y[返回结果并清理]
    X -->|否| Z[返回进行中状态]

2.2 定时轮询服务TaskPollingService

class TaskPollingService {
    private static instance: TaskPollingService;
    private currentServerId: string;
    private pollInterval: NodeJS.Timeout | null = null;
    private isRunning: boolean = false;
    private maxConcurrentTasks: number; // 从环境变量获取默认3个
    
    /**
     * 启动轮询服务
     * 注意与HeartbeatService分离使用不同的定时器
     */
    async start(): Promise<void> {
        this.currentServerId = process.env.SERVICE_ID || 'jimeng-free-api';
        this.maxConcurrentTasks = parseInt(process.env.MAX_CONCURRENT_TASKS || '3');
        
        // 每5秒轮询一次与心跳服务的60秒区分开
        this.pollInterval = setInterval(async () => {
            await this.processTasks();
        }, 5000);
        
        this.isRunning = true;
        logger.info(`Task polling service started for server: ${this.currentServerId}`);
    }
    
    /**
     * 主处理方法
     */
    private async processTasks(): Promise<void> {
        try {
            const currentTime = Math.floor(Date.now() / 1000);
            
            // 1. 处理待处理任务
            await this.processPendingTasks(currentTime);
            
            // 2. 检查轮询任务
            await this.checkPollingTasks(currentTime);
            
            // 3. 检查和处理超时任务
            await this.checkTimeoutTasks(currentTime);
            
        } catch (error) {
            logger.error('Task polling error:', error);
        }
    }
    
    /**
     * 检查超时任务
     */
    private async checkTimeoutTasks(currentTime: number): Promise<void> {
        try {
            const timeoutCount = await DatabaseCleanupService.cleanupTimeoutTasks();
            if (timeoutCount > 0) {
                logger.warn(`Marked ${timeoutCount} tasks as failed due to timeout`);
            }
        } catch (error) {
            logger.error('Failed to check timeout tasks:', error);
        }
    }
}

2.3 与心跳服务的协调

  • HeartbeatService: 每60秒发送一次心跳管理服务器在线状态
  • TaskPollingService: 每5秒轮询一次任务处理任务状态变更
  • 独立运行: 两个服务使用不同的定时器,互不干扰
  • 共享配置: 都使用相同的SERVICE_ID标识服务器

3. 核心方法重构

3.1 新的生成方法(保持接口兼容)

// 新的数据库驱动方法
class DatabaseGenerationService {
    
    /**
     * 图片生成 - 数据库版本
     */
    async generateImagesV2(
        model: string,
        taskId: string, 
        prompt: string,
        params: any,
        refreshToken: string
    ): Promise<void> {
        const currentServerId = process.env.SERVICE_ID || 'jimeng-free-api';
        
        // 创建任务记录
        const imageTimeout = parseInt(process.env.IMAGE_TASK_TIMEOUT || '3600');
        
        await GenerationTask.create({
            task_id: taskId,
            task_type: 'image',
            server_id: currentServerId,
            original_params: { model, prompt, ...params },
            internal_params: { refresh_token: refreshToken },
            status: 'pending',
            retry_count: 0,
            max_retries: 3,
            poll_interval: 10,
            task_timeout: imageTimeout,
            created_at: Math.floor(Date.now() / 1000),
            updated_at: Math.floor(Date.now() / 1000)
        });
        
        logger.info(`Image task created: ${taskId} for server: ${currentServerId}`);
    }
    
    /**
     * 视频生成 - 数据库版本
     */
    async generateVideoV2(
        taskId: string,
        prompt: string, 
        params: any,
        refreshToken: string
    ): Promise<void> {
        const currentServerId = process.env.SERVICE_ID || 'jimeng-free-api';
        
        const videoTimeout = parseInt(process.env.VIDEO_TASK_TIMEOUT || '86400');
        
        await GenerationTask.create({
            task_id: taskId,
            task_type: 'video',
            server_id: currentServerId,
            original_params: { prompt, ...params },
            internal_params: { refresh_token: refreshToken },
            status: 'pending',
            retry_count: 0,
            max_retries: 3,
            poll_interval: 15, // 视频轮询间隔更长
            task_timeout: videoTimeout,
            created_at: Math.floor(Date.now() / 1000),
            updated_at: Math.floor(Date.now() / 1000)
        });
        
        logger.info(`Video task created: ${taskId} for server: ${currentServerId}`);
    }
    
    /**
     * 查询任务结果
     */
    async queryTaskResult(taskId: string): Promise<any> {
        // 1. 先查询结果表
        const result = await GenerationResult.findOne({ task_id: taskId });
        
        if (result) {
            // 找到结果,返回并清理
            const response = {
                created: Math.floor(Date.now() / 1000),
                data: {
                    task_id: taskId,
                    url: result.tos_urls.join(','),
                    status: result.status === 'success' ? -1 : -2
                }
            };
            
            // 删除结果记录(一次性消费)
            await GenerationResult.deleteOne({ task_id: taskId });
            
            return response;
        }
        
        // 2. 查询任务状态
        const task = await GenerationTask.findOne({ task_id: taskId });
        
        if (!task) {
            return {
                created: Math.floor(Date.now() / 1000),
                data: { task_id: taskId, url: "", status: 0 } // 任务不存在
            };
        }
        
        // 3. 根据任务状态返回
        const statusMap = {
            'pending': 0,
            'processing': 0,
            'polling': 0,
            'failed': -2,
            'completed': -1  // 这种情况理论上不会出现因为completed会生成result
        };
        
        return {
            created: Math.floor(Date.now() / 1000),
            data: { 
                task_id: taskId, 
                url: "", 
                status: statusMap[task.status] || 0 
            }
        };
    }
}

3.2 兼容性切换机制

// 通过环境变量控制新旧方法
const USE_DATABASE_MODE = process.env.USE_DATABASE_MODE === 'true';

// 统一的调用入口
export async function generateImages(
    model: string,
    taskId: string,
    prompt: string,
    params: any,
    refreshToken: string
) {
    if (USE_DATABASE_MODE) {
        return await DatabaseGenerationService.generateImagesV2(
            model, taskId, prompt, params, refreshToken
        );
    } else {
        // 调用原有方法
        return await OriginalImageController.generateImages(
            model, taskId, prompt, params, refreshToken
        );
    }
}

4. 环境变量配置

4.1 新增环境变量

# 数据库模式开关
USE_DATABASE_MODE=false              # 默认false保持兼容性

# 任务处理配置
MAX_CONCURRENT_TASKS=3               # 最大并发任务数2核2G建议3个
TASK_POLL_INTERVAL=5                 # 任务轮询间隔(秒)
IMAGE_TASK_TIMEOUT=3600              # 图片任务超时时间默认1小时
VIDEO_TASK_TIMEOUT=86400             # 视频任务超时时间默认24小时
RESULT_EXPIRE_TIME=86400             # 结果过期时间默认24小时

# 服务标识(已有)
SERVICE_ID=jimeng-api-3302           # 服务唯一标识

4.2 配置文件更新

ecosystem.config.json 中为每个实例配置不同的 SERVICE_ID

{
  "apps": [
    {
      "name": "jimeng-api-3302",
      "env": {
        "SERVICE_ID": "jimeng-api-3302",
        "PORT": 3302
      }
    },
    {
      "name": "jimeng-api-3303", 
      "env": {
        "SERVICE_ID": "jimeng-api-3303",
        "PORT": 3303
      }
    }
  ]
}

5. 数据库操作优化

5.1 原子操作设计

// 原子更新任务状态,避免并发冲突
async function atomicUpdateTaskStatus(
    taskId: string, 
    fromStatus: string, 
    toStatus: string,
    updateData: any = {}
): Promise<boolean> {
    const result = await GenerationTask.updateOne(
        { 
            task_id: taskId, 
            status: fromStatus 
        },
        { 
            status: toStatus,
            updated_at: Math.floor(Date.now() / 1000),
            ...updateData
        }
    );
    
    return result.modifiedCount > 0;
}

5.2 批量查询优化

// 批量获取待处理任务
async function getBatchPendingTasks(serverId: string, limit: number): Promise<IGenerationTask[]> {
    const currentTime = Math.floor(Date.now() / 1000);
    
    return await GenerationTask.find({
        server_id: serverId,
        status: 'pending'
    })
    .sort({ created_at: 1 }) // 先入先出
    .limit(limit);
}

6. 监控和日志

6.1 任务状态监控

class TaskMonitor {
    static async getTaskStats(serverId?: string): Promise<any> {
        const filter = serverId ? { server_id: serverId } : {};
        
        const stats = await GenerationTask.aggregate([
            { $match: filter },
            {
                $group: {
                    _id: "$status",
                    count: { $sum: 1 }
                }
            }
        ]);
        
        return stats.reduce((acc, curr) => {
            acc[curr._id] = curr.count;
            return acc;
        }, {});
    }
    
    static async getServerLoad(serverId: string): Promise<number> {
        return await GenerationTask.countDocuments({
            server_id: serverId,
            status: { $in: ['processing', 'polling'] }
        });
    }
}

6.2 专用日志文件

// 创建专用的任务轮询日志
const TASK_POLLING_LOG_PATH = path.resolve("./logs/task_polling.log");

function taskLog(message: string) {
    const timestamp = formatInTimeZone(new Date(), timeZone, "yyyy-MM-dd HH:mm:ss.SSS");
    const logMessage = `[TaskPolling][${timestamp}] ${message}`;
    
    fs.ensureDirSync(path.dirname(TASK_POLLING_LOG_PATH));
    fs.appendFileSync(TASK_POLLING_LOG_PATH, logMessage + "\n");
}

7. 数据清理策略

7.1 自动清理任务

class DatabaseCleanupService {
    /**
     * 清理过期的结果记录
     */
    static async cleanupExpiredResults(): Promise<number> {
        const currentTime = Math.floor(Date.now() / 1000);
        
        const result = await GenerationResult.deleteMany({
            expires_at: { $lt: currentTime }
        });
        
        return result.deletedCount;
    }
    
    /**
     * 清理超时的任务
     */
    static async cleanupTimeoutTasks(): Promise<number> {
        const currentTime = Math.floor(Date.now() / 1000);
        
        // 查找超时的任务并标记为失败
        const timeoutTasks = await GenerationTask.find({
            status: { $in: ['processing', 'polling'] },
            $expr: {
                $gt: [
                    { $subtract: [currentTime, '$started_at'] },
                    '$task_timeout'
                ]
            }
        });
        
        // 批量更新超时任务为失败状态
        const timeoutTaskIds = timeoutTasks.map(task => task.task_id);
        
        if (timeoutTaskIds.length > 0) {
            await GenerationTask.updateMany(
                { task_id: { $in: timeoutTaskIds } },
                { 
                    status: 'failed',
                    error_message: 'Task timeout',
                    updated_at: currentTime,
                    completed_at: currentTime
                }
            );
            
            // 为超时任务创建失败结果记录
            const failedResults = timeoutTasks.map(task => ({
                task_id: task.task_id,
                task_type: task.task_type,
                server_id: task.server_id,
                status: 'failed',
                original_urls: [],
                tos_urls: [],
                metadata: {
                    total_files: 0,
                    successful_uploads: 0,
                    fail_reason: 'Task timeout after ' + task.task_timeout + ' seconds'
                },
                created_at: currentTime,
                expires_at: currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400')
            }));
            
            if (failedResults.length > 0) {
                await GenerationResult.insertMany(failedResults);
            }
        }
        
        return timeoutTaskIds.length;
    }
}

7.2 定期清理任务

集成到轮询服务中,每小时执行一次清理:

// 在TaskPollingService中添加
private cleanupCounter = 0;

private async processTasks(): Promise<void> {
    // ... 现有逻辑 ...
    
    // 每720次轮询1小时执行一次清理
    this.cleanupCounter++;
    if (this.cleanupCounter >= 720) {
        await this.performCleanup();
        this.cleanupCounter = 0;
    }
}

8. 迁移计划

Phase 1: 准备阶段

  • 创建数据模型文件
  • 实现新的生成服务类
  • 添加轮询服务框架
  • 更新环境变量配置

Phase 2: 测试阶段

  • 单元测试编写
  • 本地环境测试
  • 小流量灰度测试USE_DATABASE_MODE=true

Phase 3: 切换阶段

  • 全量切换到数据库模式
  • 监控系统稳定性
  • 性能调优

Phase 4: 清理阶段

  • 移除旧的内存缓存代码
  • 清理不再使用的配置
  • 文档更新

9. 风险评估和应对

9.1 主要风险

  1. 数据库性能瓶颈

    • 风险高频轮询可能对MongoDB造成压力
    • 应对:优化索引、控制轮询频率、使用连接池
  2. 任务丢失风险

    • 风险:服务器宕机时正在处理的任务可能丢失
    • 应对:实现任务恢复机制、超时重试
  3. 状态不一致

    • 风险:并发更新可能导致状态不一致
    • 应对:使用原子操作、乐观锁

9.2 回滚方案

  • 保持 USE_DATABASE_MODE=false 可随时切回原有方案
  • 原有代码完整保留,确保回滚路径畅通
  • 数据库数据不影响原有内存缓存功能

10. 性能指标

10.1 目标指标

  • 任务处理延迟: < 10秒从pending到processing
  • 轮询响应时间: < 100ms
  • 数据库查询时间: < 50ms
  • 任务成功率: > 99%
  • 内存使用: < 512MB单实例

10.2 监控指标

  • 各状态任务数量统计
  • 平均任务处理时间
  • 服务器负载情况
  • 数据库连接状态
  • 错误率统计

总结

这个改造方案将现有的内存缓存架构平滑迁移到MongoDB数据库保持了接口兼容性支持多服务器部署实现了任务的持久化和可靠处理。每个服务器只需要关注自己的任务简化了架构复杂度提高了系统的可维护性和可扩展性。

核对清单

已完善的部分

  1. 接口范围明确

    • 明确了新策略适用的接口:/v1/images/generations/v1/video/generations 及对应的 query 接口
    • 明确了保持现状的接口:/v1/upload/images(同步接口,不使用缓存策略)
  2. 超时配置完善

    • 添加了任务级别的超时配置:task_timeout 字段
    • 区分了图片和视频的超时时间图片1小时视频24小时
    • 实现了超时任务自动标记为失败的机制
    • 在轮询服务中集成了超时检查逻辑
  3. 配置文件更新

    • 更新了 template.ecosystem.config.json,添加了所有新的环境变量
    • 更新了 template.docker-compose.yml,添加了数据库模式和超时配置
    • 为每个服务实例配置了独立的超时参数
  4. 数据库设计优化

    • 任务表增加了 task_timeout 字段
    • 超时清理逻辑会创建失败结果记录保证query接口的一致性
    • 实现了原子操作避免并发冲突

📋 环境变量配置总览

变量名 默认值 说明
USE_DATABASE_MODE false 数据库模式开关,控制新旧方法切换
MAX_CONCURRENT_TASKS 3 最大并发任务数2核2G服务器建议值
TASK_POLL_INTERVAL 5 轮询间隔(秒)
IMAGE_TASK_TIMEOUT 3600 图片任务超时时间1小时
VIDEO_TASK_TIMEOUT 86400 视频任务超时时间24小时
RESULT_EXPIRE_TIME 86400 结果过期时间24小时
SERVICE_ID - 服务唯一标识(必须配置)

🔄 任务状态流转确认

pending → processing → polling → completed/failed
   ↓         ↓          ↓
  超时      超时       超时
   ↓         ↓          ↓
 failed    failed    failed

🚫 不受影响的接口

  • /v1/upload/images - 同步上传接口,保持现有实现
  • 所有其他同步接口保持不变
  • 现有的 ImagesTaskCacheVideoTaskCache 内存缓存在 USE_DATABASE_MODE=false 时继续正常工作

⚠️ 注意事项

  1. 渐进式迁移:通过 USE_DATABASE_MODE 环境变量控制,可以随时回滚
  2. 超时处理超时任务会自动创建失败结果记录确保query接口行为一致
  3. 服务隔离:每个服务只处理自己的任务,通过 SERVICE_ID 区分
  4. 轮询分离任务轮询服务5秒间隔与心跳服务60秒间隔完全独立
  5. 配置灵活:所有关键参数都通过环境变量配置,支持不同环境的差异化设置