23 KiB
23 KiB
MongoDB架构改造方案
项目概述
将现有的内存缓存架构改造为基于MongoDB的数据库持久化方案,支持多服务器环境下的任务处理,每个Node服务只处理分配给自己的任务。
适用范围
本方案仅适用于以下异步生成接口,不影响其他同步接口:
新策略适用接口
/v1/images/generations- 图片生成接口(异步)/v1/video/generations- 视频生成接口(异步)/v1/images/query- 图片任务查询接口/v1/video/query- 视频任务查询接口
保持现状的接口
/v1/upload/images- 图片上传接口(同步,不使用缓存策略,保持现有实现)- 其他所有同步接口保持不变
1. 数据表设计
1.1 生成任务表 (jimeng_free_generation_tasks)
interface IGenerationTask extends Document {
task_id: string; // 任务唯一标识,主键
task_type: 'image' | 'video'; // 任务类型
server_id: string; // 分配的服务器ID(外部分配,对应SERVICE_ID)
// 原始请求参数
original_params: {
model?: string; // 模型名称
prompt: string; // 提示词
negative_prompt?: string; // 负向提示词
width?: number; // 宽度
height?: number; // 高度
sample_strength?: number; // 采样强度
images?: Array<{ // 图片参数(视频生成用)
url: string;
width: number;
height: number;
}>;
is_pro?: boolean; // 是否专业版
duration?: number; // 时长(毫秒)
ratio?: string; // 比例
response_format?: string; // 响应格式
};
// 生成过程中的内部参数
internal_params: {
refresh_token: string; // 认证令牌
component_id?: string; // 组件ID
history_id?: string; // 即梦平台历史记录ID
mapped_model?: string; // 映射后的模型名
submit_id?: string; // 提交ID
};
// 任务状态和控制
status: 'pending' | 'processing' | 'polling' | 'completed' | 'failed';
retry_count: number; // 当前重试次数
max_retries: number; // 最大重试次数(默认3次)
// 轮询控制
next_poll_at?: number; // 下次轮询时间戳(用于控制轮询间隔)
poll_interval: number; // 轮询间隔(秒,默认10秒)
task_timeout: number; // 任务超时时间(秒,图片1小时,视频24小时)
// 时间戳
created_at: number; // 创建时间戳(秒)
updated_at: number; // 更新时间戳(秒)
started_at?: number; // 开始处理时间戳(秒)
completed_at?: number; // 完成时间戳(秒)
// 错误信息
error_message?: string; // 错误描述
fail_code?: string; // 即梦平台返回的失败代码
}
数据库索引设计
// 复合索引 - 用于轮询查询
{ "server_id": 1, "status": 1, "next_poll_at": 1 }
// 单字段索引
{ "task_id": 1 } // 主键查询
{ "created_at": 1 } // 按创建时间清理
{ "updated_at": 1 } // 按更新时间查询
1.2 生成结果表 (jimeng_free_generation_results)
interface IGenerationResult extends Document {
task_id: string; // 关联的任务ID,主键
task_type: 'image' | 'video'; // 任务类型
server_id: string; // 处理服务器ID
// 生成结果
status: 'success' | 'failed'; // 最终状态
original_urls: string[]; // 原始URL数组(即梦返回的地址)
tos_urls: string[]; // TOS URL数组(上传后的地址)
// 处理元数据
metadata: {
generation_time?: number; // 生成耗时(毫秒)
tos_upload_time?: number; // TOS上传耗时(毫秒)
total_files: number; // 文件总数
successful_uploads: number; // 成功上传数量
tos_upload_errors?: string[]; // TOS上传错误信息
fail_reason?: string; // 失败原因
};
// 时间管理
created_at: number; // 创建时间戳(秒)
expires_at: number; // 过期时间戳(用于自动清理,默认24小时后)
}
数据库索引设计
// 主键索引
{ "task_id": 1 }
// 过期清理索引
{ "expires_at": 1 }
// 服务器查询索引
{ "server_id": 1, "created_at": 1 }
2. 服务架构设计
2.1 任务处理流程
graph TD
A[外部请求] --> B[创建任务记录]
B --> C[返回任务ID]
C --> D[定时轮询服务]
D --> E{检查任务状态}
E -->|pending| F[开始处理任务]
E -->|processing| G[调用即梦API]
E -->|polling| H[检查即梦结果]
E -->|completed/failed| I[跳过]
F --> J[更新为processing]
J --> K[调用生成API]
K --> L{API调用结果}
L -->|成功| M[更新为polling]
L -->|失败| N[重试或标记失败]
M --> O[设置下次轮询时间]
H --> P{即梦任务完成?}
P -->|是| Q[处理结果]
P -->|否| R[更新轮询时间]
Q --> S[上传TOS]
S --> T[创建结果记录]
T --> U[标记任务完成]
V[查询接口] --> W[查找结果记录]
W --> X{结果存在?}
X -->|是| Y[返回结果并清理]
X -->|否| Z[返回进行中状态]
2.2 定时轮询服务(TaskPollingService)
class TaskPollingService {
private static instance: TaskPollingService;
private currentServerId: string;
private pollInterval: NodeJS.Timeout | null = null;
private isRunning: boolean = false;
private maxConcurrentTasks: number; // 从环境变量获取,默认3个
/**
* 启动轮询服务
* 注意:与HeartbeatService分离,使用不同的定时器
*/
async start(): Promise<void> {
this.currentServerId = process.env.SERVICE_ID || 'jimeng-free-api';
this.maxConcurrentTasks = parseInt(process.env.MAX_CONCURRENT_TASKS || '3');
// 每5秒轮询一次(与心跳服务的60秒区分开)
this.pollInterval = setInterval(async () => {
await this.processTasks();
}, 5000);
this.isRunning = true;
logger.info(`Task polling service started for server: ${this.currentServerId}`);
}
/**
* 主处理方法
*/
private async processTasks(): Promise<void> {
try {
const currentTime = Math.floor(Date.now() / 1000);
// 1. 处理待处理任务
await this.processPendingTasks(currentTime);
// 2. 检查轮询任务
await this.checkPollingTasks(currentTime);
// 3. 检查和处理超时任务
await this.checkTimeoutTasks(currentTime);
} catch (error) {
logger.error('Task polling error:', error);
}
}
/**
* 检查超时任务
*/
private async checkTimeoutTasks(currentTime: number): Promise<void> {
try {
const timeoutCount = await DatabaseCleanupService.cleanupTimeoutTasks();
if (timeoutCount > 0) {
logger.warn(`Marked ${timeoutCount} tasks as failed due to timeout`);
}
} catch (error) {
logger.error('Failed to check timeout tasks:', error);
}
}
}
2.3 与心跳服务的协调
- HeartbeatService: 每60秒发送一次心跳,管理服务器在线状态
- TaskPollingService: 每5秒轮询一次任务,处理任务状态变更
- 独立运行: 两个服务使用不同的定时器,互不干扰
- 共享配置: 都使用相同的SERVICE_ID标识服务器
3. 核心方法重构
3.1 新的生成方法(保持接口兼容)
// 新的数据库驱动方法
class DatabaseGenerationService {
/**
* 图片生成 - 数据库版本
*/
async generateImagesV2(
model: string,
taskId: string,
prompt: string,
params: any,
refreshToken: string
): Promise<void> {
const currentServerId = process.env.SERVICE_ID || 'jimeng-free-api';
// 创建任务记录
const imageTimeout = parseInt(process.env.IMAGE_TASK_TIMEOUT || '3600');
await GenerationTask.create({
task_id: taskId,
task_type: 'image',
server_id: currentServerId,
original_params: { model, prompt, ...params },
internal_params: { refresh_token: refreshToken },
status: 'pending',
retry_count: 0,
max_retries: 3,
poll_interval: 10,
task_timeout: imageTimeout,
created_at: Math.floor(Date.now() / 1000),
updated_at: Math.floor(Date.now() / 1000)
});
logger.info(`Image task created: ${taskId} for server: ${currentServerId}`);
}
/**
* 视频生成 - 数据库版本
*/
async generateVideoV2(
taskId: string,
prompt: string,
params: any,
refreshToken: string
): Promise<void> {
const currentServerId = process.env.SERVICE_ID || 'jimeng-free-api';
const videoTimeout = parseInt(process.env.VIDEO_TASK_TIMEOUT || '86400');
await GenerationTask.create({
task_id: taskId,
task_type: 'video',
server_id: currentServerId,
original_params: { prompt, ...params },
internal_params: { refresh_token: refreshToken },
status: 'pending',
retry_count: 0,
max_retries: 3,
poll_interval: 15, // 视频轮询间隔更长
task_timeout: videoTimeout,
created_at: Math.floor(Date.now() / 1000),
updated_at: Math.floor(Date.now() / 1000)
});
logger.info(`Video task created: ${taskId} for server: ${currentServerId}`);
}
/**
* 查询任务结果
*/
async queryTaskResult(taskId: string): Promise<any> {
// 1. 先查询结果表
const result = await GenerationResult.findOne({ task_id: taskId });
if (result) {
// 找到结果,返回并清理
const response = {
created: Math.floor(Date.now() / 1000),
data: {
task_id: taskId,
url: result.tos_urls.join(','),
status: result.status === 'success' ? -1 : -2
}
};
// 删除结果记录(一次性消费)
await GenerationResult.deleteOne({ task_id: taskId });
return response;
}
// 2. 查询任务状态
const task = await GenerationTask.findOne({ task_id: taskId });
if (!task) {
return {
created: Math.floor(Date.now() / 1000),
data: { task_id: taskId, url: "", status: 0 } // 任务不存在
};
}
// 3. 根据任务状态返回
const statusMap = {
'pending': 0,
'processing': 0,
'polling': 0,
'failed': -2,
'completed': -1 // 这种情况理论上不会出现,因为completed会生成result
};
return {
created: Math.floor(Date.now() / 1000),
data: {
task_id: taskId,
url: "",
status: statusMap[task.status] || 0
}
};
}
}
3.2 兼容性切换机制
// 通过环境变量控制新旧方法
const USE_DATABASE_MODE = process.env.USE_DATABASE_MODE === 'true';
// 统一的调用入口
export async function generateImages(
model: string,
taskId: string,
prompt: string,
params: any,
refreshToken: string
) {
if (USE_DATABASE_MODE) {
return await DatabaseGenerationService.generateImagesV2(
model, taskId, prompt, params, refreshToken
);
} else {
// 调用原有方法
return await OriginalImageController.generateImages(
model, taskId, prompt, params, refreshToken
);
}
}
4. 环境变量配置
4.1 新增环境变量
# 数据库模式开关
USE_DATABASE_MODE=false # 默认false,保持兼容性
# 任务处理配置
MAX_CONCURRENT_TASKS=3 # 最大并发任务数(2核2G建议3个)
TASK_POLL_INTERVAL=5 # 任务轮询间隔(秒)
IMAGE_TASK_TIMEOUT=3600 # 图片任务超时时间(秒,默认1小时)
VIDEO_TASK_TIMEOUT=86400 # 视频任务超时时间(秒,默认24小时)
RESULT_EXPIRE_TIME=86400 # 结果过期时间(秒,默认24小时)
# 服务标识(已有)
SERVICE_ID=jimeng-api-3302 # 服务唯一标识
4.2 配置文件更新
在 ecosystem.config.json 中为每个实例配置不同的 SERVICE_ID:
{
"apps": [
{
"name": "jimeng-api-3302",
"env": {
"SERVICE_ID": "jimeng-api-3302",
"PORT": 3302
}
},
{
"name": "jimeng-api-3303",
"env": {
"SERVICE_ID": "jimeng-api-3303",
"PORT": 3303
}
}
]
}
5. 数据库操作优化
5.1 原子操作设计
// 原子更新任务状态,避免并发冲突
async function atomicUpdateTaskStatus(
taskId: string,
fromStatus: string,
toStatus: string,
updateData: any = {}
): Promise<boolean> {
const result = await GenerationTask.updateOne(
{
task_id: taskId,
status: fromStatus
},
{
status: toStatus,
updated_at: Math.floor(Date.now() / 1000),
...updateData
}
);
return result.modifiedCount > 0;
}
5.2 批量查询优化
// 批量获取待处理任务
async function getBatchPendingTasks(serverId: string, limit: number): Promise<IGenerationTask[]> {
const currentTime = Math.floor(Date.now() / 1000);
return await GenerationTask.find({
server_id: serverId,
status: 'pending'
})
.sort({ created_at: 1 }) // 先入先出
.limit(limit);
}
6. 监控和日志
6.1 任务状态监控
class TaskMonitor {
static async getTaskStats(serverId?: string): Promise<any> {
const filter = serverId ? { server_id: serverId } : {};
const stats = await GenerationTask.aggregate([
{ $match: filter },
{
$group: {
_id: "$status",
count: { $sum: 1 }
}
}
]);
return stats.reduce((acc, curr) => {
acc[curr._id] = curr.count;
return acc;
}, {});
}
static async getServerLoad(serverId: string): Promise<number> {
return await GenerationTask.countDocuments({
server_id: serverId,
status: { $in: ['processing', 'polling'] }
});
}
}
6.2 专用日志文件
// 创建专用的任务轮询日志
const TASK_POLLING_LOG_PATH = path.resolve("./logs/task_polling.log");
function taskLog(message: string) {
const timestamp = formatInTimeZone(new Date(), timeZone, "yyyy-MM-dd HH:mm:ss.SSS");
const logMessage = `[TaskPolling][${timestamp}] ${message}`;
fs.ensureDirSync(path.dirname(TASK_POLLING_LOG_PATH));
fs.appendFileSync(TASK_POLLING_LOG_PATH, logMessage + "\n");
}
7. 数据清理策略
7.1 自动清理任务
class DatabaseCleanupService {
/**
* 清理过期的结果记录
*/
static async cleanupExpiredResults(): Promise<number> {
const currentTime = Math.floor(Date.now() / 1000);
const result = await GenerationResult.deleteMany({
expires_at: { $lt: currentTime }
});
return result.deletedCount;
}
/**
* 清理超时的任务
*/
static async cleanupTimeoutTasks(): Promise<number> {
const currentTime = Math.floor(Date.now() / 1000);
// 查找超时的任务并标记为失败
const timeoutTasks = await GenerationTask.find({
status: { $in: ['processing', 'polling'] },
$expr: {
$gt: [
{ $subtract: [currentTime, '$started_at'] },
'$task_timeout'
]
}
});
// 批量更新超时任务为失败状态
const timeoutTaskIds = timeoutTasks.map(task => task.task_id);
if (timeoutTaskIds.length > 0) {
await GenerationTask.updateMany(
{ task_id: { $in: timeoutTaskIds } },
{
status: 'failed',
error_message: 'Task timeout',
updated_at: currentTime,
completed_at: currentTime
}
);
// 为超时任务创建失败结果记录
const failedResults = timeoutTasks.map(task => ({
task_id: task.task_id,
task_type: task.task_type,
server_id: task.server_id,
status: 'failed',
original_urls: [],
tos_urls: [],
metadata: {
total_files: 0,
successful_uploads: 0,
fail_reason: 'Task timeout after ' + task.task_timeout + ' seconds'
},
created_at: currentTime,
expires_at: currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400')
}));
if (failedResults.length > 0) {
await GenerationResult.insertMany(failedResults);
}
}
return timeoutTaskIds.length;
}
}
7.2 定期清理任务
集成到轮询服务中,每小时执行一次清理:
// 在TaskPollingService中添加
private cleanupCounter = 0;
private async processTasks(): Promise<void> {
// ... 现有逻辑 ...
// 每720次轮询(1小时)执行一次清理
this.cleanupCounter++;
if (this.cleanupCounter >= 720) {
await this.performCleanup();
this.cleanupCounter = 0;
}
}
8. 迁移计划
Phase 1: 准备阶段
- 创建数据模型文件
- 实现新的生成服务类
- 添加轮询服务框架
- 更新环境变量配置
Phase 2: 测试阶段
- 单元测试编写
- 本地环境测试
- 小流量灰度测试(USE_DATABASE_MODE=true)
Phase 3: 切换阶段
- 全量切换到数据库模式
- 监控系统稳定性
- 性能调优
Phase 4: 清理阶段
- 移除旧的内存缓存代码
- 清理不再使用的配置
- 文档更新
9. 风险评估和应对
9.1 主要风险
-
数据库性能瓶颈
- 风险:高频轮询可能对MongoDB造成压力
- 应对:优化索引、控制轮询频率、使用连接池
-
任务丢失风险
- 风险:服务器宕机时正在处理的任务可能丢失
- 应对:实现任务恢复机制、超时重试
-
状态不一致
- 风险:并发更新可能导致状态不一致
- 应对:使用原子操作、乐观锁
9.2 回滚方案
- 保持
USE_DATABASE_MODE=false可随时切回原有方案 - 原有代码完整保留,确保回滚路径畅通
- 数据库数据不影响原有内存缓存功能
10. 性能指标
10.1 目标指标
- 任务处理延迟: < 10秒(从pending到processing)
- 轮询响应时间: < 100ms
- 数据库查询时间: < 50ms
- 任务成功率: > 99%
- 内存使用: < 512MB(单实例)
10.2 监控指标
- 各状态任务数量统计
- 平均任务处理时间
- 服务器负载情况
- 数据库连接状态
- 错误率统计
总结
这个改造方案将现有的内存缓存架构平滑迁移到MongoDB数据库,保持了接口兼容性,支持多服务器部署,实现了任务的持久化和可靠处理。每个服务器只需要关注自己的任务,简化了架构复杂度,提高了系统的可维护性和可扩展性。
核对清单
✅ 已完善的部分
-
接口范围明确:
- ✅ 明确了新策略适用的接口:
/v1/images/generations、/v1/video/generations及对应的 query 接口 - ✅ 明确了保持现状的接口:
/v1/upload/images(同步接口,不使用缓存策略)
- ✅ 明确了新策略适用的接口:
-
超时配置完善:
- ✅ 添加了任务级别的超时配置:
task_timeout字段 - ✅ 区分了图片和视频的超时时间:图片1小时,视频24小时
- ✅ 实现了超时任务自动标记为失败的机制
- ✅ 在轮询服务中集成了超时检查逻辑
- ✅ 添加了任务级别的超时配置:
-
配置文件更新:
- ✅ 更新了
template.ecosystem.config.json,添加了所有新的环境变量 - ✅ 更新了
template.docker-compose.yml,添加了数据库模式和超时配置 - ✅ 为每个服务实例配置了独立的超时参数
- ✅ 更新了
-
数据库设计优化:
- ✅ 任务表增加了
task_timeout字段 - ✅ 超时清理逻辑会创建失败结果记录,保证query接口的一致性
- ✅ 实现了原子操作避免并发冲突
- ✅ 任务表增加了
📋 环境变量配置总览
| 变量名 | 默认值 | 说明 |
|---|---|---|
USE_DATABASE_MODE |
false |
数据库模式开关,控制新旧方法切换 |
MAX_CONCURRENT_TASKS |
3 |
最大并发任务数(2核2G服务器建议值) |
TASK_POLL_INTERVAL |
5 |
轮询间隔(秒) |
IMAGE_TASK_TIMEOUT |
3600 |
图片任务超时时间(1小时) |
VIDEO_TASK_TIMEOUT |
86400 |
视频任务超时时间(24小时) |
RESULT_EXPIRE_TIME |
86400 |
结果过期时间(24小时) |
SERVICE_ID |
- | 服务唯一标识(必须配置) |
🔄 任务状态流转确认
pending → processing → polling → completed/failed
↓ ↓ ↓
超时 超时 超时
↓ ↓ ↓
failed failed failed
🚫 不受影响的接口
/v1/upload/images- 同步上传接口,保持现有实现- 所有其他同步接口保持不变
- 现有的
ImagesTaskCache和VideoTaskCache内存缓存在USE_DATABASE_MODE=false时继续正常工作
⚠️ 注意事项
- 渐进式迁移:通过
USE_DATABASE_MODE环境变量控制,可以随时回滚 - 超时处理:超时任务会自动创建失败结果记录,确保query接口行为一致
- 服务隔离:每个服务只处理自己的任务,通过
SERVICE_ID区分 - 轮询分离:任务轮询服务(5秒间隔)与心跳服务(60秒间隔)完全独立
- 配置灵活:所有关键参数都通过环境变量配置,支持不同环境的差异化设置