755 lines
23 KiB
Markdown
755 lines
23 KiB
Markdown
# MongoDB架构改造方案
|
||
|
||
## 项目概述
|
||
|
||
将现有的内存缓存架构改造为基于MongoDB的数据库持久化方案,支持多服务器环境下的任务处理,每个Node服务只处理分配给自己的任务。
|
||
|
||
## 适用范围
|
||
|
||
本方案仅适用于以下异步生成接口,不影响其他同步接口:
|
||
|
||
### 新策略适用接口
|
||
- `/v1/images/generations` - 图片生成接口(异步)
|
||
- `/v1/video/generations` - 视频生成接口(异步)
|
||
- `/v1/images/query` - 图片任务查询接口
|
||
- `/v1/video/query` - 视频任务查询接口
|
||
|
||
### 保持现状的接口
|
||
- `/v1/upload/images` - 图片上传接口(同步,不使用缓存策略,保持现有实现)
|
||
- 其他所有同步接口保持不变
|
||
|
||
## 1. 数据表设计
|
||
|
||
### 1.1 生成任务表 (jimeng_free_generation_tasks)
|
||
|
||
```typescript
|
||
interface IGenerationTask extends Document {
|
||
task_id: string; // 任务唯一标识,主键
|
||
task_type: 'image' | 'video'; // 任务类型
|
||
server_id: string; // 分配的服务器ID(外部分配,对应SERVICE_ID)
|
||
|
||
// 原始请求参数
|
||
original_params: {
|
||
model?: string; // 模型名称
|
||
prompt: string; // 提示词
|
||
negative_prompt?: string; // 负向提示词
|
||
width?: number; // 宽度
|
||
height?: number; // 高度
|
||
sample_strength?: number; // 采样强度
|
||
images?: Array<{ // 图片参数(视频生成用)
|
||
url: string;
|
||
width: number;
|
||
height: number;
|
||
}>;
|
||
is_pro?: boolean; // 是否专业版
|
||
duration?: number; // 时长(毫秒)
|
||
ratio?: string; // 比例
|
||
response_format?: string; // 响应格式
|
||
};
|
||
|
||
// 生成过程中的内部参数
|
||
internal_params: {
|
||
refresh_token: string; // 认证令牌
|
||
component_id?: string; // 组件ID
|
||
history_id?: string; // 即梦平台历史记录ID
|
||
mapped_model?: string; // 映射后的模型名
|
||
submit_id?: string; // 提交ID
|
||
};
|
||
|
||
// 任务状态和控制
|
||
status: 'pending' | 'processing' | 'polling' | 'completed' | 'failed';
|
||
retry_count: number; // 当前重试次数
|
||
max_retries: number; // 最大重试次数(默认3次)
|
||
|
||
// 轮询控制
|
||
next_poll_at?: number; // 下次轮询时间戳(用于控制轮询间隔)
|
||
poll_interval: number; // 轮询间隔(秒,默认10秒)
|
||
task_timeout: number; // 任务超时时间(秒,图片1小时,视频24小时)
|
||
|
||
// 时间戳
|
||
created_at: number; // 创建时间戳(秒)
|
||
updated_at: number; // 更新时间戳(秒)
|
||
started_at?: number; // 开始处理时间戳(秒)
|
||
completed_at?: number; // 完成时间戳(秒)
|
||
|
||
// 错误信息
|
||
error_message?: string; // 错误描述
|
||
fail_code?: string; // 即梦平台返回的失败代码
|
||
}
|
||
```
|
||
|
||
#### 数据库索引设计
|
||
```javascript
|
||
// 复合索引 - 用于轮询查询
|
||
{ "server_id": 1, "status": 1, "next_poll_at": 1 }
|
||
|
||
// 单字段索引
|
||
{ "task_id": 1 } // 主键查询
|
||
{ "created_at": 1 } // 按创建时间清理
|
||
{ "updated_at": 1 } // 按更新时间查询
|
||
```
|
||
|
||
### 1.2 生成结果表 (jimeng_free_generation_results)
|
||
|
||
```typescript
|
||
interface IGenerationResult extends Document {
|
||
task_id: string; // 关联的任务ID,主键
|
||
task_type: 'image' | 'video'; // 任务类型
|
||
server_id: string; // 处理服务器ID
|
||
|
||
// 生成结果
|
||
status: 'success' | 'failed'; // 最终状态
|
||
original_urls: string[]; // 原始URL数组(即梦返回的地址)
|
||
tos_urls: string[]; // TOS URL数组(上传后的地址)
|
||
|
||
// 处理元数据
|
||
metadata: {
|
||
generation_time?: number; // 生成耗时(毫秒)
|
||
tos_upload_time?: number; // TOS上传耗时(毫秒)
|
||
total_files: number; // 文件总数
|
||
successful_uploads: number; // 成功上传数量
|
||
tos_upload_errors?: string[]; // TOS上传错误信息
|
||
fail_reason?: string; // 失败原因
|
||
};
|
||
|
||
// 时间管理
|
||
created_at: number; // 创建时间戳(秒)
|
||
expires_at: number; // 过期时间戳(用于自动清理,默认24小时后)
|
||
}
|
||
```
|
||
|
||
#### 数据库索引设计
|
||
```javascript
|
||
// 主键索引
|
||
{ "task_id": 1 }
|
||
|
||
// 过期清理索引
|
||
{ "expires_at": 1 }
|
||
|
||
// 服务器查询索引
|
||
{ "server_id": 1, "created_at": 1 }
|
||
```
|
||
|
||
## 2. 服务架构设计
|
||
|
||
### 2.1 任务处理流程
|
||
|
||
```mermaid
|
||
graph TD
|
||
A[外部请求] --> B[创建任务记录]
|
||
B --> C[返回任务ID]
|
||
C --> D[定时轮询服务]
|
||
D --> E{检查任务状态}
|
||
E -->|pending| F[开始处理任务]
|
||
E -->|processing| G[调用即梦API]
|
||
E -->|polling| H[检查即梦结果]
|
||
E -->|completed/failed| I[跳过]
|
||
|
||
F --> J[更新为processing]
|
||
J --> K[调用生成API]
|
||
K --> L{API调用结果}
|
||
L -->|成功| M[更新为polling]
|
||
L -->|失败| N[重试或标记失败]
|
||
|
||
M --> O[设置下次轮询时间]
|
||
H --> P{即梦任务完成?}
|
||
P -->|是| Q[处理结果]
|
||
P -->|否| R[更新轮询时间]
|
||
|
||
Q --> S[上传TOS]
|
||
S --> T[创建结果记录]
|
||
T --> U[标记任务完成]
|
||
|
||
V[查询接口] --> W[查找结果记录]
|
||
W --> X{结果存在?}
|
||
X -->|是| Y[返回结果并清理]
|
||
X -->|否| Z[返回进行中状态]
|
||
```
|
||
|
||
### 2.2 定时轮询服务(TaskPollingService)
|
||
|
||
```typescript
|
||
class TaskPollingService {
|
||
private static instance: TaskPollingService;
|
||
private currentServerId: string;
|
||
private pollInterval: NodeJS.Timeout | null = null;
|
||
private isRunning: boolean = false;
|
||
private maxConcurrentTasks: number; // 从环境变量获取,默认3个
|
||
|
||
/**
|
||
* 启动轮询服务
|
||
* 注意:与HeartbeatService分离,使用不同的定时器
|
||
*/
|
||
async start(): Promise<void> {
|
||
this.currentServerId = process.env.SERVICE_ID || 'jimeng-free-api';
|
||
this.maxConcurrentTasks = parseInt(process.env.MAX_CONCURRENT_TASKS || '3');
|
||
|
||
// 每5秒轮询一次(与心跳服务的60秒区分开)
|
||
this.pollInterval = setInterval(async () => {
|
||
await this.processTasks();
|
||
}, 5000);
|
||
|
||
this.isRunning = true;
|
||
logger.info(`Task polling service started for server: ${this.currentServerId}`);
|
||
}
|
||
|
||
/**
|
||
* 主处理方法
|
||
*/
|
||
private async processTasks(): Promise<void> {
|
||
try {
|
||
const currentTime = Math.floor(Date.now() / 1000);
|
||
|
||
// 1. 处理待处理任务
|
||
await this.processPendingTasks(currentTime);
|
||
|
||
// 2. 检查轮询任务
|
||
await this.checkPollingTasks(currentTime);
|
||
|
||
// 3. 检查和处理超时任务
|
||
await this.checkTimeoutTasks(currentTime);
|
||
|
||
} catch (error) {
|
||
logger.error('Task polling error:', error);
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 检查超时任务
|
||
*/
|
||
private async checkTimeoutTasks(currentTime: number): Promise<void> {
|
||
try {
|
||
const timeoutCount = await DatabaseCleanupService.cleanupTimeoutTasks();
|
||
if (timeoutCount > 0) {
|
||
logger.warn(`Marked ${timeoutCount} tasks as failed due to timeout`);
|
||
}
|
||
} catch (error) {
|
||
logger.error('Failed to check timeout tasks:', error);
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
### 2.3 与心跳服务的协调
|
||
|
||
- **HeartbeatService**: 每60秒发送一次心跳,管理服务器在线状态
|
||
- **TaskPollingService**: 每5秒轮询一次任务,处理任务状态变更
|
||
- **独立运行**: 两个服务使用不同的定时器,互不干扰
|
||
- **共享配置**: 都使用相同的SERVICE_ID标识服务器
|
||
|
||
## 3. 核心方法重构
|
||
|
||
### 3.1 新的生成方法(保持接口兼容)
|
||
|
||
```typescript
|
||
// 新的数据库驱动方法
|
||
class DatabaseGenerationService {
|
||
|
||
/**
|
||
* 图片生成 - 数据库版本
|
||
*/
|
||
async generateImagesV2(
|
||
model: string,
|
||
taskId: string,
|
||
prompt: string,
|
||
params: any,
|
||
refreshToken: string
|
||
): Promise<void> {
|
||
const currentServerId = process.env.SERVICE_ID || 'jimeng-free-api';
|
||
|
||
// 创建任务记录
|
||
const imageTimeout = parseInt(process.env.IMAGE_TASK_TIMEOUT || '3600');
|
||
|
||
await GenerationTask.create({
|
||
task_id: taskId,
|
||
task_type: 'image',
|
||
server_id: currentServerId,
|
||
original_params: { model, prompt, ...params },
|
||
internal_params: { refresh_token: refreshToken },
|
||
status: 'pending',
|
||
retry_count: 0,
|
||
max_retries: 3,
|
||
poll_interval: 10,
|
||
task_timeout: imageTimeout,
|
||
created_at: Math.floor(Date.now() / 1000),
|
||
updated_at: Math.floor(Date.now() / 1000)
|
||
});
|
||
|
||
logger.info(`Image task created: ${taskId} for server: ${currentServerId}`);
|
||
}
|
||
|
||
/**
|
||
* 视频生成 - 数据库版本
|
||
*/
|
||
async generateVideoV2(
|
||
taskId: string,
|
||
prompt: string,
|
||
params: any,
|
||
refreshToken: string
|
||
): Promise<void> {
|
||
const currentServerId = process.env.SERVICE_ID || 'jimeng-free-api';
|
||
|
||
const videoTimeout = parseInt(process.env.VIDEO_TASK_TIMEOUT || '86400');
|
||
|
||
await GenerationTask.create({
|
||
task_id: taskId,
|
||
task_type: 'video',
|
||
server_id: currentServerId,
|
||
original_params: { prompt, ...params },
|
||
internal_params: { refresh_token: refreshToken },
|
||
status: 'pending',
|
||
retry_count: 0,
|
||
max_retries: 3,
|
||
poll_interval: 15, // 视频轮询间隔更长
|
||
task_timeout: videoTimeout,
|
||
created_at: Math.floor(Date.now() / 1000),
|
||
updated_at: Math.floor(Date.now() / 1000)
|
||
});
|
||
|
||
logger.info(`Video task created: ${taskId} for server: ${currentServerId}`);
|
||
}
|
||
|
||
/**
|
||
* 查询任务结果
|
||
*/
|
||
async queryTaskResult(taskId: string): Promise<any> {
|
||
// 1. 先查询结果表
|
||
const result = await GenerationResult.findOne({ task_id: taskId });
|
||
|
||
if (result) {
|
||
// 找到结果,返回并清理
|
||
const response = {
|
||
created: Math.floor(Date.now() / 1000),
|
||
data: {
|
||
task_id: taskId,
|
||
url: result.tos_urls.join(','),
|
||
status: result.status === 'success' ? -1 : -2
|
||
}
|
||
};
|
||
|
||
// 删除结果记录(一次性消费)
|
||
await GenerationResult.deleteOne({ task_id: taskId });
|
||
|
||
return response;
|
||
}
|
||
|
||
// 2. 查询任务状态
|
||
const task = await GenerationTask.findOne({ task_id: taskId });
|
||
|
||
if (!task) {
|
||
return {
|
||
created: Math.floor(Date.now() / 1000),
|
||
data: { task_id: taskId, url: "", status: 0 } // 任务不存在
|
||
};
|
||
}
|
||
|
||
// 3. 根据任务状态返回
|
||
const statusMap = {
|
||
'pending': 0,
|
||
'processing': 0,
|
||
'polling': 0,
|
||
'failed': -2,
|
||
'completed': -1 // 这种情况理论上不会出现,因为completed会生成result
|
||
};
|
||
|
||
return {
|
||
created: Math.floor(Date.now() / 1000),
|
||
data: {
|
||
task_id: taskId,
|
||
url: "",
|
||
status: statusMap[task.status] || 0
|
||
}
|
||
};
|
||
}
|
||
}
|
||
```
|
||
|
||
### 3.2 兼容性切换机制
|
||
|
||
```typescript
|
||
// 通过环境变量控制新旧方法
|
||
const USE_DATABASE_MODE = process.env.USE_DATABASE_MODE === 'true';
|
||
|
||
// 统一的调用入口
|
||
export async function generateImages(
|
||
model: string,
|
||
taskId: string,
|
||
prompt: string,
|
||
params: any,
|
||
refreshToken: string
|
||
) {
|
||
if (USE_DATABASE_MODE) {
|
||
return await DatabaseGenerationService.generateImagesV2(
|
||
model, taskId, prompt, params, refreshToken
|
||
);
|
||
} else {
|
||
// 调用原有方法
|
||
return await OriginalImageController.generateImages(
|
||
model, taskId, prompt, params, refreshToken
|
||
);
|
||
}
|
||
}
|
||
```
|
||
|
||
## 4. 环境变量配置
|
||
|
||
### 4.1 新增环境变量
|
||
|
||
```bash
|
||
# 数据库模式开关
|
||
USE_DATABASE_MODE=false # 默认false,保持兼容性
|
||
|
||
# 任务处理配置
|
||
MAX_CONCURRENT_TASKS=3 # 最大并发任务数(2核2G建议3个)
|
||
TASK_POLL_INTERVAL=5 # 任务轮询间隔(秒)
|
||
IMAGE_TASK_TIMEOUT=3600 # 图片任务超时时间(秒,默认1小时)
|
||
VIDEO_TASK_TIMEOUT=86400 # 视频任务超时时间(秒,默认24小时)
|
||
RESULT_EXPIRE_TIME=86400 # 结果过期时间(秒,默认24小时)
|
||
|
||
# 服务标识(已有)
|
||
SERVICE_ID=jimeng-api-3302 # 服务唯一标识
|
||
```
|
||
|
||
### 4.2 配置文件更新
|
||
|
||
在 `ecosystem.config.json` 中为每个实例配置不同的 `SERVICE_ID`:
|
||
|
||
```json
|
||
{
|
||
"apps": [
|
||
{
|
||
"name": "jimeng-api-3302",
|
||
"env": {
|
||
"SERVICE_ID": "jimeng-api-3302",
|
||
"PORT": 3302
|
||
}
|
||
},
|
||
{
|
||
"name": "jimeng-api-3303",
|
||
"env": {
|
||
"SERVICE_ID": "jimeng-api-3303",
|
||
"PORT": 3303
|
||
}
|
||
}
|
||
]
|
||
}
|
||
```
|
||
|
||
## 5. 数据库操作优化
|
||
|
||
### 5.1 原子操作设计
|
||
|
||
```typescript
|
||
// 原子更新任务状态,避免并发冲突
|
||
async function atomicUpdateTaskStatus(
|
||
taskId: string,
|
||
fromStatus: string,
|
||
toStatus: string,
|
||
updateData: any = {}
|
||
): Promise<boolean> {
|
||
const result = await GenerationTask.updateOne(
|
||
{
|
||
task_id: taskId,
|
||
status: fromStatus
|
||
},
|
||
{
|
||
status: toStatus,
|
||
updated_at: Math.floor(Date.now() / 1000),
|
||
...updateData
|
||
}
|
||
);
|
||
|
||
return result.modifiedCount > 0;
|
||
}
|
||
```
|
||
|
||
### 5.2 批量查询优化
|
||
|
||
```typescript
|
||
// 批量获取待处理任务
|
||
async function getBatchPendingTasks(serverId: string, limit: number): Promise<IGenerationTask[]> {
|
||
const currentTime = Math.floor(Date.now() / 1000);
|
||
|
||
return await GenerationTask.find({
|
||
server_id: serverId,
|
||
status: 'pending'
|
||
})
|
||
.sort({ created_at: 1 }) // 先入先出
|
||
.limit(limit);
|
||
}
|
||
```
|
||
|
||
## 6. 监控和日志
|
||
|
||
### 6.1 任务状态监控
|
||
|
||
```typescript
|
||
class TaskMonitor {
|
||
static async getTaskStats(serverId?: string): Promise<any> {
|
||
const filter = serverId ? { server_id: serverId } : {};
|
||
|
||
const stats = await GenerationTask.aggregate([
|
||
{ $match: filter },
|
||
{
|
||
$group: {
|
||
_id: "$status",
|
||
count: { $sum: 1 }
|
||
}
|
||
}
|
||
]);
|
||
|
||
return stats.reduce((acc, curr) => {
|
||
acc[curr._id] = curr.count;
|
||
return acc;
|
||
}, {});
|
||
}
|
||
|
||
static async getServerLoad(serverId: string): Promise<number> {
|
||
return await GenerationTask.countDocuments({
|
||
server_id: serverId,
|
||
status: { $in: ['processing', 'polling'] }
|
||
});
|
||
}
|
||
}
|
||
```
|
||
|
||
### 6.2 专用日志文件
|
||
|
||
```typescript
|
||
// 创建专用的任务轮询日志
|
||
const TASK_POLLING_LOG_PATH = path.resolve("./logs/task_polling.log");
|
||
|
||
function taskLog(message: string) {
|
||
const timestamp = formatInTimeZone(new Date(), timeZone, "yyyy-MM-dd HH:mm:ss.SSS");
|
||
const logMessage = `[TaskPolling][${timestamp}] ${message}`;
|
||
|
||
fs.ensureDirSync(path.dirname(TASK_POLLING_LOG_PATH));
|
||
fs.appendFileSync(TASK_POLLING_LOG_PATH, logMessage + "\n");
|
||
}
|
||
```
|
||
|
||
## 7. 数据清理策略
|
||
|
||
### 7.1 自动清理任务
|
||
|
||
```typescript
|
||
class DatabaseCleanupService {
|
||
/**
|
||
* 清理过期的结果记录
|
||
*/
|
||
static async cleanupExpiredResults(): Promise<number> {
|
||
const currentTime = Math.floor(Date.now() / 1000);
|
||
|
||
const result = await GenerationResult.deleteMany({
|
||
expires_at: { $lt: currentTime }
|
||
});
|
||
|
||
return result.deletedCount;
|
||
}
|
||
|
||
/**
|
||
* 清理超时的任务
|
||
*/
|
||
static async cleanupTimeoutTasks(): Promise<number> {
|
||
const currentTime = Math.floor(Date.now() / 1000);
|
||
|
||
// 查找超时的任务并标记为失败
|
||
const timeoutTasks = await GenerationTask.find({
|
||
status: { $in: ['processing', 'polling'] },
|
||
$expr: {
|
||
$gt: [
|
||
{ $subtract: [currentTime, '$started_at'] },
|
||
'$task_timeout'
|
||
]
|
||
}
|
||
});
|
||
|
||
// 批量更新超时任务为失败状态
|
||
const timeoutTaskIds = timeoutTasks.map(task => task.task_id);
|
||
|
||
if (timeoutTaskIds.length > 0) {
|
||
await GenerationTask.updateMany(
|
||
{ task_id: { $in: timeoutTaskIds } },
|
||
{
|
||
status: 'failed',
|
||
error_message: 'Task timeout',
|
||
updated_at: currentTime,
|
||
completed_at: currentTime
|
||
}
|
||
);
|
||
|
||
// 为超时任务创建失败结果记录
|
||
const failedResults = timeoutTasks.map(task => ({
|
||
task_id: task.task_id,
|
||
task_type: task.task_type,
|
||
server_id: task.server_id,
|
||
status: 'failed',
|
||
original_urls: [],
|
||
tos_urls: [],
|
||
metadata: {
|
||
total_files: 0,
|
||
successful_uploads: 0,
|
||
fail_reason: 'Task timeout after ' + task.task_timeout + ' seconds'
|
||
},
|
||
created_at: currentTime,
|
||
expires_at: currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400')
|
||
}));
|
||
|
||
if (failedResults.length > 0) {
|
||
await GenerationResult.insertMany(failedResults);
|
||
}
|
||
}
|
||
|
||
return timeoutTaskIds.length;
|
||
}
|
||
}
|
||
```
|
||
|
||
### 7.2 定期清理任务
|
||
|
||
集成到轮询服务中,每小时执行一次清理:
|
||
|
||
```typescript
|
||
// 在TaskPollingService中添加
|
||
private cleanupCounter = 0;
|
||
|
||
private async processTasks(): Promise<void> {
|
||
// ... 现有逻辑 ...
|
||
|
||
// 每720次轮询(1小时)执行一次清理
|
||
this.cleanupCounter++;
|
||
if (this.cleanupCounter >= 720) {
|
||
await this.performCleanup();
|
||
this.cleanupCounter = 0;
|
||
}
|
||
}
|
||
```
|
||
|
||
## 8. 迁移计划
|
||
|
||
### Phase 1: 准备阶段
|
||
- [ ] 创建数据模型文件
|
||
- [ ] 实现新的生成服务类
|
||
- [ ] 添加轮询服务框架
|
||
- [ ] 更新环境变量配置
|
||
|
||
### Phase 2: 测试阶段
|
||
- [ ] 单元测试编写
|
||
- [ ] 本地环境测试
|
||
- [ ] 小流量灰度测试(USE_DATABASE_MODE=true)
|
||
|
||
### Phase 3: 切换阶段
|
||
- [ ] 全量切换到数据库模式
|
||
- [ ] 监控系统稳定性
|
||
- [ ] 性能调优
|
||
|
||
### Phase 4: 清理阶段
|
||
- [ ] 移除旧的内存缓存代码
|
||
- [ ] 清理不再使用的配置
|
||
- [ ] 文档更新
|
||
|
||
## 9. 风险评估和应对
|
||
|
||
### 9.1 主要风险
|
||
|
||
1. **数据库性能瓶颈**
|
||
- 风险:高频轮询可能对MongoDB造成压力
|
||
- 应对:优化索引、控制轮询频率、使用连接池
|
||
|
||
2. **任务丢失风险**
|
||
- 风险:服务器宕机时正在处理的任务可能丢失
|
||
- 应对:实现任务恢复机制、超时重试
|
||
|
||
3. **状态不一致**
|
||
- 风险:并发更新可能导致状态不一致
|
||
- 应对:使用原子操作、乐观锁
|
||
|
||
### 9.2 回滚方案
|
||
|
||
- 保持 `USE_DATABASE_MODE=false` 可随时切回原有方案
|
||
- 原有代码完整保留,确保回滚路径畅通
|
||
- 数据库数据不影响原有内存缓存功能
|
||
|
||
## 10. 性能指标
|
||
|
||
### 10.1 目标指标
|
||
|
||
- **任务处理延迟**: < 10秒(从pending到processing)
|
||
- **轮询响应时间**: < 100ms
|
||
- **数据库查询时间**: < 50ms
|
||
- **任务成功率**: > 99%
|
||
- **内存使用**: < 512MB(单实例)
|
||
|
||
### 10.2 监控指标
|
||
|
||
- 各状态任务数量统计
|
||
- 平均任务处理时间
|
||
- 服务器负载情况
|
||
- 数据库连接状态
|
||
- 错误率统计
|
||
|
||
---
|
||
|
||
## 总结
|
||
|
||
这个改造方案将现有的内存缓存架构平滑迁移到MongoDB数据库,保持了接口兼容性,支持多服务器部署,实现了任务的持久化和可靠处理。每个服务器只需要关注自己的任务,简化了架构复杂度,提高了系统的可维护性和可扩展性。
|
||
|
||
## 核对清单
|
||
|
||
### ✅ 已完善的部分
|
||
|
||
1. **接口范围明确**:
|
||
- ✅ 明确了新策略适用的接口:`/v1/images/generations`、`/v1/video/generations` 及对应的 query 接口
|
||
- ✅ 明确了保持现状的接口:`/v1/upload/images`(同步接口,不使用缓存策略)
|
||
|
||
2. **超时配置完善**:
|
||
- ✅ 添加了任务级别的超时配置:`task_timeout` 字段
|
||
- ✅ 区分了图片和视频的超时时间:图片1小时,视频24小时
|
||
- ✅ 实现了超时任务自动标记为失败的机制
|
||
- ✅ 在轮询服务中集成了超时检查逻辑
|
||
|
||
3. **配置文件更新**:
|
||
- ✅ 更新了 `template.ecosystem.config.json`,添加了所有新的环境变量
|
||
- ✅ 更新了 `template.docker-compose.yml`,添加了数据库模式和超时配置
|
||
- ✅ 为每个服务实例配置了独立的超时参数
|
||
|
||
4. **数据库设计优化**:
|
||
- ✅ 任务表增加了 `task_timeout` 字段
|
||
- ✅ 超时清理逻辑会创建失败结果记录,保证query接口的一致性
|
||
- ✅ 实现了原子操作避免并发冲突
|
||
|
||
### 📋 环境变量配置总览
|
||
|
||
| 变量名 | 默认值 | 说明 |
|
||
|--------|--------|------|
|
||
| `USE_DATABASE_MODE` | `false` | 数据库模式开关,控制新旧方法切换 |
|
||
| `MAX_CONCURRENT_TASKS` | `3` | 最大并发任务数(2核2G服务器建议值) |
|
||
| `TASK_POLL_INTERVAL` | `5` | 轮询间隔(秒) |
|
||
| `IMAGE_TASK_TIMEOUT` | `3600` | 图片任务超时时间(1小时) |
|
||
| `VIDEO_TASK_TIMEOUT` | `86400` | 视频任务超时时间(24小时) |
|
||
| `RESULT_EXPIRE_TIME` | `86400` | 结果过期时间(24小时) |
|
||
| `SERVICE_ID` | - | 服务唯一标识(必须配置) |
|
||
|
||
### 🔄 任务状态流转确认
|
||
|
||
```
|
||
pending → processing → polling → completed/failed
|
||
↓ ↓ ↓
|
||
超时 超时 超时
|
||
↓ ↓ ↓
|
||
failed failed failed
|
||
```
|
||
|
||
### 🚫 不受影响的接口
|
||
|
||
- `/v1/upload/images` - 同步上传接口,保持现有实现
|
||
- 所有其他同步接口保持不变
|
||
- 现有的 `ImagesTaskCache` 和 `VideoTaskCache` 内存缓存在 `USE_DATABASE_MODE=false` 时继续正常工作
|
||
|
||
### ⚠️ 注意事项
|
||
|
||
1. **渐进式迁移**:通过 `USE_DATABASE_MODE` 环境变量控制,可以随时回滚
|
||
2. **超时处理**:超时任务会自动创建失败结果记录,确保query接口行为一致
|
||
3. **服务隔离**:每个服务只处理自己的任务,通过 `SERVICE_ID` 区分
|
||
4. **轮询分离**:任务轮询服务(5秒间隔)与心跳服务(60秒间隔)完全独立
|
||
5. **配置灵活**:所有关键参数都通过环境变量配置,支持不同环境的差异化设置 |