缓存改造 1

This commit is contained in:
jonathang4 2025-08-27 15:00:38 +08:00
parent cadee11ab9
commit 3804c58913
9 changed files with 1168 additions and 887 deletions

View File

@ -0,0 +1,755 @@
# MongoDB架构改造方案
## 项目概述
将现有的内存缓存架构改造为基于MongoDB的数据库持久化方案支持多服务器环境下的任务处理每个Node服务只处理分配给自己的任务。
## 适用范围
本方案仅适用于以下异步生成接口,不影响其他同步接口:
### 新策略适用接口
- `/v1/images/generations` - 图片生成接口(异步)
- `/v1/video/generations` - 视频生成接口(异步)
- `/v1/images/query` - 图片任务查询接口
- `/v1/video/query` - 视频任务查询接口
### 保持现状的接口
- `/v1/upload/images` - 图片上传接口(同步,不使用缓存策略,保持现有实现)
- 其他所有同步接口保持不变
## 1. 数据表设计
### 1.1 生成任务表 (jimeng_free_generation_tasks)
```typescript
interface IGenerationTask extends Document {
task_id: string; // 任务唯一标识,主键
task_type: 'image' | 'video'; // 任务类型
server_id: string; // 分配的服务器ID外部分配对应SERVICE_ID
// 原始请求参数
original_params: {
model?: string; // 模型名称
prompt: string; // 提示词
negative_prompt?: string; // 负向提示词
width?: number; // 宽度
height?: number; // 高度
sample_strength?: number; // 采样强度
images?: Array<{ // 图片参数(视频生成用)
url: string;
width: number;
height: number;
}>;
is_pro?: boolean; // 是否专业版
duration?: number; // 时长(毫秒)
ratio?: string; // 比例
response_format?: string; // 响应格式
};
// 生成过程中的内部参数
internal_params: {
refresh_token: string; // 认证令牌
component_id?: string; // 组件ID
history_id?: string; // 即梦平台历史记录ID
mapped_model?: string; // 映射后的模型名
submit_id?: string; // 提交ID
};
// 任务状态和控制
status: 'pending' | 'processing' | 'polling' | 'completed' | 'failed';
retry_count: number; // 当前重试次数
max_retries: number; // 最大重试次数默认3次
// 轮询控制
next_poll_at?: number; // 下次轮询时间戳(用于控制轮询间隔)
poll_interval: number; // 轮询间隔默认10秒
task_timeout: number; // 任务超时时间图片1小时视频24小时
// 时间戳
created_at: number; // 创建时间戳(秒)
updated_at: number; // 更新时间戳(秒)
started_at?: number; // 开始处理时间戳(秒)
completed_at?: number; // 完成时间戳(秒)
// 错误信息
error_message?: string; // 错误描述
fail_code?: string; // 即梦平台返回的失败代码
}
```
#### 数据库索引设计
```javascript
// 复合索引 - 用于轮询查询
{ "server_id": 1, "status": 1, "next_poll_at": 1 }
// 单字段索引
{ "task_id": 1 } // 主键查询
{ "created_at": 1 } // 按创建时间清理
{ "updated_at": 1 } // 按更新时间查询
```
### 1.2 生成结果表 (jimeng_free_generation_results)
```typescript
interface IGenerationResult extends Document {
task_id: string; // 关联的任务ID主键
task_type: 'image' | 'video'; // 任务类型
server_id: string; // 处理服务器ID
// 生成结果
status: 'success' | 'failed'; // 最终状态
original_urls: string[]; // 原始URL数组即梦返回的地址
tos_urls: string[]; // TOS URL数组上传后的地址
// 处理元数据
metadata: {
generation_time?: number; // 生成耗时(毫秒)
tos_upload_time?: number; // TOS上传耗时毫秒
total_files: number; // 文件总数
successful_uploads: number; // 成功上传数量
tos_upload_errors?: string[]; // TOS上传错误信息
fail_reason?: string; // 失败原因
};
// 时间管理
created_at: number; // 创建时间戳(秒)
expires_at: number; // 过期时间戳用于自动清理默认24小时后
}
```
#### 数据库索引设计
```javascript
// 主键索引
{ "task_id": 1 }
// 过期清理索引
{ "expires_at": 1 }
// 服务器查询索引
{ "server_id": 1, "created_at": 1 }
```
## 2. 服务架构设计
### 2.1 任务处理流程
```mermaid
graph TD
A[外部请求] --> B[创建任务记录]
B --> C[返回任务ID]
C --> D[定时轮询服务]
D --> E{检查任务状态}
E -->|pending| F[开始处理任务]
E -->|processing| G[调用即梦API]
E -->|polling| H[检查即梦结果]
E -->|completed/failed| I[跳过]
F --> J[更新为processing]
J --> K[调用生成API]
K --> L{API调用结果}
L -->|成功| M[更新为polling]
L -->|失败| N[重试或标记失败]
M --> O[设置下次轮询时间]
H --> P{即梦任务完成?}
P -->|是| Q[处理结果]
P -->|否| R[更新轮询时间]
Q --> S[上传TOS]
S --> T[创建结果记录]
T --> U[标记任务完成]
V[查询接口] --> W[查找结果记录]
W --> X{结果存在?}
X -->|是| Y[返回结果并清理]
X -->|否| Z[返回进行中状态]
```
### 2.2 定时轮询服务TaskPollingService
```typescript
class TaskPollingService {
private static instance: TaskPollingService;
private currentServerId: string;
private pollInterval: NodeJS.Timeout | null = null;
private isRunning: boolean = false;
private maxConcurrentTasks: number; // 从环境变量获取默认3个
/**
* 启动轮询服务
* 注意与HeartbeatService分离使用不同的定时器
*/
async start(): Promise<void> {
this.currentServerId = process.env.SERVICE_ID || 'jimeng-free-api';
this.maxConcurrentTasks = parseInt(process.env.MAX_CONCURRENT_TASKS || '3');
// 每5秒轮询一次与心跳服务的60秒区分开
this.pollInterval = setInterval(async () => {
await this.processTasks();
}, 5000);
this.isRunning = true;
logger.info(`Task polling service started for server: ${this.currentServerId}`);
}
/**
* 主处理方法
*/
private async processTasks(): Promise<void> {
try {
const currentTime = Math.floor(Date.now() / 1000);
// 1. 处理待处理任务
await this.processPendingTasks(currentTime);
// 2. 检查轮询任务
await this.checkPollingTasks(currentTime);
// 3. 检查和处理超时任务
await this.checkTimeoutTasks(currentTime);
} catch (error) {
logger.error('Task polling error:', error);
}
}
/**
* 检查超时任务
*/
private async checkTimeoutTasks(currentTime: number): Promise<void> {
try {
const timeoutCount = await DatabaseCleanupService.cleanupTimeoutTasks();
if (timeoutCount > 0) {
logger.warn(`Marked ${timeoutCount} tasks as failed due to timeout`);
}
} catch (error) {
logger.error('Failed to check timeout tasks:', error);
}
}
}
```
### 2.3 与心跳服务的协调
- **HeartbeatService**: 每60秒发送一次心跳管理服务器在线状态
- **TaskPollingService**: 每5秒轮询一次任务处理任务状态变更
- **独立运行**: 两个服务使用不同的定时器,互不干扰
- **共享配置**: 都使用相同的SERVICE_ID标识服务器
## 3. 核心方法重构
### 3.1 新的生成方法(保持接口兼容)
```typescript
// 新的数据库驱动方法
class DatabaseGenerationService {
/**
* 图片生成 - 数据库版本
*/
async generateImagesV2(
model: string,
taskId: string,
prompt: string,
params: any,
refreshToken: string
): Promise<void> {
const currentServerId = process.env.SERVICE_ID || 'jimeng-free-api';
// 创建任务记录
const imageTimeout = parseInt(process.env.IMAGE_TASK_TIMEOUT || '3600');
await GenerationTask.create({
task_id: taskId,
task_type: 'image',
server_id: currentServerId,
original_params: { model, prompt, ...params },
internal_params: { refresh_token: refreshToken },
status: 'pending',
retry_count: 0,
max_retries: 3,
poll_interval: 10,
task_timeout: imageTimeout,
created_at: Math.floor(Date.now() / 1000),
updated_at: Math.floor(Date.now() / 1000)
});
logger.info(`Image task created: ${taskId} for server: ${currentServerId}`);
}
/**
* 视频生成 - 数据库版本
*/
async generateVideoV2(
taskId: string,
prompt: string,
params: any,
refreshToken: string
): Promise<void> {
const currentServerId = process.env.SERVICE_ID || 'jimeng-free-api';
const videoTimeout = parseInt(process.env.VIDEO_TASK_TIMEOUT || '86400');
await GenerationTask.create({
task_id: taskId,
task_type: 'video',
server_id: currentServerId,
original_params: { prompt, ...params },
internal_params: { refresh_token: refreshToken },
status: 'pending',
retry_count: 0,
max_retries: 3,
poll_interval: 15, // 视频轮询间隔更长
task_timeout: videoTimeout,
created_at: Math.floor(Date.now() / 1000),
updated_at: Math.floor(Date.now() / 1000)
});
logger.info(`Video task created: ${taskId} for server: ${currentServerId}`);
}
/**
* 查询任务结果
*/
async queryTaskResult(taskId: string): Promise<any> {
// 1. 先查询结果表
const result = await GenerationResult.findOne({ task_id: taskId });
if (result) {
// 找到结果,返回并清理
const response = {
created: Math.floor(Date.now() / 1000),
data: {
task_id: taskId,
url: result.tos_urls.join(','),
status: result.status === 'success' ? -1 : -2
}
};
// 删除结果记录(一次性消费)
await GenerationResult.deleteOne({ task_id: taskId });
return response;
}
// 2. 查询任务状态
const task = await GenerationTask.findOne({ task_id: taskId });
if (!task) {
return {
created: Math.floor(Date.now() / 1000),
data: { task_id: taskId, url: "", status: 0 } // 任务不存在
};
}
// 3. 根据任务状态返回
const statusMap = {
'pending': 0,
'processing': 0,
'polling': 0,
'failed': -2,
'completed': -1 // 这种情况理论上不会出现因为completed会生成result
};
return {
created: Math.floor(Date.now() / 1000),
data: {
task_id: taskId,
url: "",
status: statusMap[task.status] || 0
}
};
}
}
```
### 3.2 兼容性切换机制
```typescript
// 通过环境变量控制新旧方法
const USE_DATABASE_MODE = process.env.USE_DATABASE_MODE === 'true';
// 统一的调用入口
export async function generateImages(
model: string,
taskId: string,
prompt: string,
params: any,
refreshToken: string
) {
if (USE_DATABASE_MODE) {
return await DatabaseGenerationService.generateImagesV2(
model, taskId, prompt, params, refreshToken
);
} else {
// 调用原有方法
return await OriginalImageController.generateImages(
model, taskId, prompt, params, refreshToken
);
}
}
```
## 4. 环境变量配置
### 4.1 新增环境变量
```bash
# 数据库模式开关
USE_DATABASE_MODE=false # 默认false保持兼容性
# 任务处理配置
MAX_CONCURRENT_TASKS=3 # 最大并发任务数2核2G建议3个
TASK_POLL_INTERVAL=5 # 任务轮询间隔(秒)
IMAGE_TASK_TIMEOUT=3600 # 图片任务超时时间默认1小时
VIDEO_TASK_TIMEOUT=86400 # 视频任务超时时间默认24小时
RESULT_EXPIRE_TIME=86400 # 结果过期时间默认24小时
# 服务标识(已有)
SERVICE_ID=jimeng-api-3302 # 服务唯一标识
```
### 4.2 配置文件更新
`ecosystem.config.json` 中为每个实例配置不同的 `SERVICE_ID`
```json
{
"apps": [
{
"name": "jimeng-api-3302",
"env": {
"SERVICE_ID": "jimeng-api-3302",
"PORT": 3302
}
},
{
"name": "jimeng-api-3303",
"env": {
"SERVICE_ID": "jimeng-api-3303",
"PORT": 3303
}
}
]
}
```
## 5. 数据库操作优化
### 5.1 原子操作设计
```typescript
// 原子更新任务状态,避免并发冲突
async function atomicUpdateTaskStatus(
taskId: string,
fromStatus: string,
toStatus: string,
updateData: any = {}
): Promise<boolean> {
const result = await GenerationTask.updateOne(
{
task_id: taskId,
status: fromStatus
},
{
status: toStatus,
updated_at: Math.floor(Date.now() / 1000),
...updateData
}
);
return result.modifiedCount > 0;
}
```
### 5.2 批量查询优化
```typescript
// 批量获取待处理任务
async function getBatchPendingTasks(serverId: string, limit: number): Promise<IGenerationTask[]> {
const currentTime = Math.floor(Date.now() / 1000);
return await GenerationTask.find({
server_id: serverId,
status: 'pending'
})
.sort({ created_at: 1 }) // 先入先出
.limit(limit);
}
```
## 6. 监控和日志
### 6.1 任务状态监控
```typescript
class TaskMonitor {
static async getTaskStats(serverId?: string): Promise<any> {
const filter = serverId ? { server_id: serverId } : {};
const stats = await GenerationTask.aggregate([
{ $match: filter },
{
$group: {
_id: "$status",
count: { $sum: 1 }
}
}
]);
return stats.reduce((acc, curr) => {
acc[curr._id] = curr.count;
return acc;
}, {});
}
static async getServerLoad(serverId: string): Promise<number> {
return await GenerationTask.countDocuments({
server_id: serverId,
status: { $in: ['processing', 'polling'] }
});
}
}
```
### 6.2 专用日志文件
```typescript
// 创建专用的任务轮询日志
const TASK_POLLING_LOG_PATH = path.resolve("./logs/task_polling.log");
function taskLog(message: string) {
const timestamp = formatInTimeZone(new Date(), timeZone, "yyyy-MM-dd HH:mm:ss.SSS");
const logMessage = `[TaskPolling][${timestamp}] ${message}`;
fs.ensureDirSync(path.dirname(TASK_POLLING_LOG_PATH));
fs.appendFileSync(TASK_POLLING_LOG_PATH, logMessage + "\n");
}
```
## 7. 数据清理策略
### 7.1 自动清理任务
```typescript
class DatabaseCleanupService {
/**
* 清理过期的结果记录
*/
static async cleanupExpiredResults(): Promise<number> {
const currentTime = Math.floor(Date.now() / 1000);
const result = await GenerationResult.deleteMany({
expires_at: { $lt: currentTime }
});
return result.deletedCount;
}
/**
* 清理超时的任务
*/
static async cleanupTimeoutTasks(): Promise<number> {
const currentTime = Math.floor(Date.now() / 1000);
// 查找超时的任务并标记为失败
const timeoutTasks = await GenerationTask.find({
status: { $in: ['processing', 'polling'] },
$expr: {
$gt: [
{ $subtract: [currentTime, '$started_at'] },
'$task_timeout'
]
}
});
// 批量更新超时任务为失败状态
const timeoutTaskIds = timeoutTasks.map(task => task.task_id);
if (timeoutTaskIds.length > 0) {
await GenerationTask.updateMany(
{ task_id: { $in: timeoutTaskIds } },
{
status: 'failed',
error_message: 'Task timeout',
updated_at: currentTime,
completed_at: currentTime
}
);
// 为超时任务创建失败结果记录
const failedResults = timeoutTasks.map(task => ({
task_id: task.task_id,
task_type: task.task_type,
server_id: task.server_id,
status: 'failed',
original_urls: [],
tos_urls: [],
metadata: {
total_files: 0,
successful_uploads: 0,
fail_reason: 'Task timeout after ' + task.task_timeout + ' seconds'
},
created_at: currentTime,
expires_at: currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400')
}));
if (failedResults.length > 0) {
await GenerationResult.insertMany(failedResults);
}
}
return timeoutTaskIds.length;
}
}
```
### 7.2 定期清理任务
集成到轮询服务中,每小时执行一次清理:
```typescript
// 在TaskPollingService中添加
private cleanupCounter = 0;
private async processTasks(): Promise<void> {
// ... 现有逻辑 ...
// 每720次轮询1小时执行一次清理
this.cleanupCounter++;
if (this.cleanupCounter >= 720) {
await this.performCleanup();
this.cleanupCounter = 0;
}
}
```
## 8. 迁移计划
### Phase 1: 准备阶段
- [ ] 创建数据模型文件
- [ ] 实现新的生成服务类
- [ ] 添加轮询服务框架
- [ ] 更新环境变量配置
### Phase 2: 测试阶段
- [ ] 单元测试编写
- [ ] 本地环境测试
- [ ] 小流量灰度测试USE_DATABASE_MODE=true
### Phase 3: 切换阶段
- [ ] 全量切换到数据库模式
- [ ] 监控系统稳定性
- [ ] 性能调优
### Phase 4: 清理阶段
- [ ] 移除旧的内存缓存代码
- [ ] 清理不再使用的配置
- [ ] 文档更新
## 9. 风险评估和应对
### 9.1 主要风险
1. **数据库性能瓶颈**
- 风险高频轮询可能对MongoDB造成压力
- 应对:优化索引、控制轮询频率、使用连接池
2. **任务丢失风险**
- 风险:服务器宕机时正在处理的任务可能丢失
- 应对:实现任务恢复机制、超时重试
3. **状态不一致**
- 风险:并发更新可能导致状态不一致
- 应对:使用原子操作、乐观锁
### 9.2 回滚方案
- 保持 `USE_DATABASE_MODE=false` 可随时切回原有方案
- 原有代码完整保留,确保回滚路径畅通
- 数据库数据不影响原有内存缓存功能
## 10. 性能指标
### 10.1 目标指标
- **任务处理延迟**: < 10秒从pending到processing
- **轮询响应时间**: < 100ms
- **数据库查询时间**: < 50ms
- **任务成功率**: > 99%
- **内存使用**: < 512MB单实例
### 10.2 监控指标
- 各状态任务数量统计
- 平均任务处理时间
- 服务器负载情况
- 数据库连接状态
- 错误率统计
---
## 总结
这个改造方案将现有的内存缓存架构平滑迁移到MongoDB数据库保持了接口兼容性支持多服务器部署实现了任务的持久化和可靠处理。每个服务器只需要关注自己的任务简化了架构复杂度提高了系统的可维护性和可扩展性。
## 核对清单
### ✅ 已完善的部分
1. **接口范围明确**
- ✅ 明确了新策略适用的接口:`/v1/images/generations``/v1/video/generations` 及对应的 query 接口
- ✅ 明确了保持现状的接口:`/v1/upload/images`(同步接口,不使用缓存策略)
2. **超时配置完善**
- ✅ 添加了任务级别的超时配置:`task_timeout` 字段
- ✅ 区分了图片和视频的超时时间图片1小时视频24小时
- ✅ 实现了超时任务自动标记为失败的机制
- ✅ 在轮询服务中集成了超时检查逻辑
3. **配置文件更新**
- ✅ 更新了 `template.ecosystem.config.json`,添加了所有新的环境变量
- ✅ 更新了 `template.docker-compose.yml`,添加了数据库模式和超时配置
- ✅ 为每个服务实例配置了独立的超时参数
4. **数据库设计优化**
- ✅ 任务表增加了 `task_timeout` 字段
- ✅ 超时清理逻辑会创建失败结果记录保证query接口的一致性
- ✅ 实现了原子操作避免并发冲突
### 📋 环境变量配置总览
| 变量名 | 默认值 | 说明 |
|--------|--------|------|
| `USE_DATABASE_MODE` | `false` | 数据库模式开关,控制新旧方法切换 |
| `MAX_CONCURRENT_TASKS` | `3` | 最大并发任务数2核2G服务器建议值 |
| `TASK_POLL_INTERVAL` | `5` | 轮询间隔(秒) |
| `IMAGE_TASK_TIMEOUT` | `3600` | 图片任务超时时间1小时 |
| `VIDEO_TASK_TIMEOUT` | `86400` | 视频任务超时时间24小时 |
| `RESULT_EXPIRE_TIME` | `86400` | 结果过期时间24小时 |
| `SERVICE_ID` | - | 服务唯一标识(必须配置) |
### 🔄 任务状态流转确认
```
pending → processing → polling → completed/failed
↓ ↓ ↓
超时 超时 超时
↓ ↓ ↓
failed failed failed
```
### 🚫 不受影响的接口
- `/v1/upload/images` - 同步上传接口,保持现有实现
- 所有其他同步接口保持不变
- 现有的 `ImagesTaskCache``VideoTaskCache` 内存缓存在 `USE_DATABASE_MODE=false` 时继续正常工作
### ⚠️ 注意事项
1. **渐进式迁移**:通过 `USE_DATABASE_MODE` 环境变量控制,可以随时回滚
2. **超时处理**超时任务会自动创建失败结果记录确保query接口行为一致
3. **服务隔离**:每个服务只处理自己的任务,通过 `SERVICE_ID` 区分
4. **轮询分离**任务轮询服务5秒间隔与心跳服务60秒间隔完全独立
5. **配置灵活**:所有关键参数都通过环境变量配置,支持不同环境的差异化设置

View File

@ -3,6 +3,8 @@ import path from 'path';
// import { format as dateFormat } from 'date-fns'; // import { format as dateFormat } from 'date-fns';
const timeZone = 'Asia/Shanghai'; // Beijing Time const timeZone = 'Asia/Shanghai'; // Beijing Time
import { formatInTimeZone } from 'date-fns-tz'; import { formatInTimeZone } from 'date-fns-tz';
import TOSService from '@/lib/tos/tos-service.js';
import logger from '@/lib/logger.js';
const LOG_PATH = path.resolve("./logs/images_task_cache.log"); const LOG_PATH = path.resolve("./logs/images_task_cache.log");
@ -23,11 +25,15 @@ export class ImagesTaskCache {
private static instance: ImagesTaskCache; private static instance: ImagesTaskCache;
private taskCache: Map<string, number|string>; private taskCache: Map<string, number|string>;
private tosProcessedTasks: Set<string>; // 记录已处理TOS上传的任务 private tosProcessedTasks: Set<string>; // 记录已处理TOS上传的任务
private cleanupInterval: NodeJS.Timeout | null = null;
private constructor() { private constructor() {
this.taskCache = new Map<string, number|string>(); this.taskCache = new Map<string, number|string>();
this.tosProcessedTasks = new Set<string>(); this.tosProcessedTasks = new Set<string>();
cacheLog("ImagesTaskCache initialized"); cacheLog("ImagesTaskCache initialized");
// 启动定时清理任务每30分钟
this.startPeriodicCleanup();
} }
public static getInstance(): ImagesTaskCache { public static getInstance(): ImagesTaskCache {
@ -37,47 +43,181 @@ export class ImagesTaskCache {
return ImagesTaskCache.instance; return ImagesTaskCache.instance;
} }
/**
*
*/
private startPeriodicCleanup(): void {
// 每30分钟清理一次过期任务
this.cleanupInterval = setInterval(() => {
this.clearExpiredTasks();
}, 30 * 60 * 1000); // 30分钟
cacheLog("Periodic cleanup started for ImagesTaskCache");
}
/**
*
*/
public stopPeriodicCleanup(): void {
if (this.cleanupInterval) {
clearInterval(this.cleanupInterval);
this.cleanupInterval = null;
cacheLog("Periodic cleanup stopped for ImagesTaskCache");
}
}
public startTask(taskId: string): void { public startTask(taskId: string): void {
const startTime = Math.floor(Date.now() / 1000); // Current time in seconds const startTime = Math.floor(Date.now() / 1000); // Current time in seconds
this.taskCache.set(taskId, startTime); this.taskCache.set(taskId, startTime);
cacheLog(`Task started: ${taskId} at ${startTime}`); cacheLog(`Task started: ${taskId} at ${startTime}`);
} }
public finishTask(taskId: string, status: -1 | -2 | -3, url:string = ''): void { /**
if (this.taskCache.has(taskId)) { * URL上传到TOS
this.taskCache.set(taskId, status); * @param imageUrls URL数组
* @returns TOS URL数组
*/
private async uploadImagesToTOS(imageUrls: string[]): Promise<string[]> {
const tosUrls: string[] = [];
for (const imageUrl of imageUrls) {
try {
// 从URL获取文件名
const fileName = `image-${Date.now()}-${Math.random().toString(36).substr(2, 9)}.webp`;
// 上传到TOS
const tosUrl = await TOSService.uploadFromUrl(imageUrl, `images/${fileName}`);
tosUrls.push(tosUrl);
logger.info(`图片上传到TOS成功: ${imageUrl} -> ${tosUrl}`);
} catch (error) {
logger.error(`图片上传到TOS失败: ${imageUrl}`, error);
// 如果上传失败保留原URL
tosUrls.push(imageUrl);
}
}
return tosUrls;
}
public async finishTask(taskId: string, status: -1 | -2 | -3, url: string = ''): Promise<void> {
if (!this.taskCache.has(taskId)) {
cacheLog(`Attempted to finish non-existent task: ${taskId}`);
return;
}
let finalUrl = url;
let statusMessage = ''; let statusMessage = '';
switch (status) { switch (status) {
case -1: case -1: {
{
statusMessage = 'successfully'; statusMessage = 'successfully';
if (url) { if (url) {
this.taskCache.set(taskId, url); try {
// 任务成功完成时自动上传到TOS
cacheLog(`开始上传图片到TOS: ${taskId}`);
const imageUrls = url.split(',');
const tosUrls = await this.uploadImagesToTOS(imageUrls);
finalUrl = tosUrls.join(',');
this.tosProcessedTasks.add(taskId);
cacheLog(`Task ${taskId} TOS上传完成存储TOS地址: ${finalUrl}`);
} catch (error) {
logger.error(`TOS上传失败使用原始URL: ${taskId}`, error);
finalUrl = url; // 保留原始URL
cacheLog(`Task ${taskId} TOS上传失败使用原始URL`);
} }
} }
break; break;
}
case -2: statusMessage = 'failed'; break; case -2: statusMessage = 'failed'; break;
case -3: statusMessage = 'timed out'; break; case -3: statusMessage = 'timed out'; break;
} }
// 存储最终URLTOS地址或原始URL
this.taskCache.set(taskId, finalUrl || status);
cacheLog(`Task ${taskId} finished ${statusMessage} (status: ${status})`); cacheLog(`Task ${taskId} finished ${statusMessage} (status: ${status})`);
} else {
cacheLog(`Attempted to finish non-existent task: ${taskId}`);
}
} }
public getTaskStatus(taskId: string): number | string | undefined { public getTaskStatus(taskId: string): number | string | undefined {
return this.taskCache.get(taskId); return this.taskCache.get(taskId);
} }
/**
* TOS上传
*/
public isTosProcessed(taskId: string): boolean { public isTosProcessed(taskId: string): boolean {
return this.tosProcessedTasks.has(taskId); return this.tosProcessedTasks.has(taskId);
} }
/**
* TOS上传
*/
public markTosProcessed(taskId: string): void { public markTosProcessed(taskId: string): void {
this.tosProcessedTasks.add(taskId); this.tosProcessedTasks.add(taskId);
cacheLog(`Task ${taskId} marked as TOS processed`); cacheLog(`Task ${taskId} marked as TOS processed`);
} }
/**
*
* @param taskId ID
* @returns undefined
*/
public getTaskResultAndClear(taskId: string): number | string | undefined {
const result = this.taskCache.get(taskId);
if (result && typeof result === 'string') {
// 只有当任务完成时返回字符串URL才清除缓存
this.taskCache.delete(taskId);
this.tosProcessedTasks.delete(taskId);
cacheLog(`Task ${taskId} result retrieved and cache cleared`);
}
return result;
}
/**
* 1
*
*/
public clearExpiredTasks(): void {
const now = Math.floor(Date.now() / 1000);
const expiredTime = 3600; // 1尊时
let clearCount = 0;
for (const [taskId, status] of this.taskCache.entries()) {
if (typeof status === 'number' && status > 0) {
// 这是一个时间戳,检查是否过期
if (now - status > expiredTime) {
this.taskCache.delete(taskId);
this.tosProcessedTasks.delete(taskId);
clearCount++;
}
}
}
if (clearCount > 0) {
cacheLog(`Cleared ${clearCount} expired tasks`);
}
}
/**
*
*/
public getCacheStats(): { totalTasks: number, completedTasks: number, pendingTasks: number } {
let completedTasks = 0;
let pendingTasks = 0;
for (const [, status] of this.taskCache.entries()) {
if (typeof status === 'string') {
completedTasks++;
} else if (typeof status === 'number' && status > 0) {
pendingTasks++;
}
}
return {
totalTasks: this.taskCache.size,
completedTasks,
pendingTasks
};
}
public getPendingTasks(): string[] { public getPendingTasks(): string[] {
const pendingTasks: string[] = []; const pendingTasks: string[] = [];
for (const [taskId, status] of this.taskCache.entries()) { for (const [taskId, status] of this.taskCache.entries()) {
@ -95,6 +235,13 @@ export class ImagesTaskCache {
} else { } else {
cacheLog("No pending tasks at shutdown."); cacheLog("No pending tasks at shutdown.");
} }
// 关闭时停止定时清理并进行最终清理
this.stopPeriodicCleanup();
this.clearExpiredTasks();
const stats = this.getCacheStats();
cacheLog(`Final cache stats - Total: ${stats.totalTasks}, Completed: ${stats.completedTasks}, Pending: ${stats.pendingTasks}`);
} }
} }

View File

@ -3,6 +3,8 @@ import path from 'path';
// import { format as dateFormat } from 'date-fns'; // import { format as dateFormat } from 'date-fns';
const timeZone = 'Asia/Shanghai'; // Beijing Time const timeZone = 'Asia/Shanghai'; // Beijing Time
import { formatInTimeZone } from 'date-fns-tz'; import { formatInTimeZone } from 'date-fns-tz';
import TOSService from '@/lib/tos/tos-service.js';
import logger from '@/lib/logger.js';
const LOG_PATH = path.resolve("./logs/video_task_cache.log"); const LOG_PATH = path.resolve("./logs/video_task_cache.log");
@ -23,11 +25,15 @@ export class VideoTaskCache {
private static instance: VideoTaskCache; private static instance: VideoTaskCache;
private taskCache: Map<string, number|string>; private taskCache: Map<string, number|string>;
private tosProcessedTasks: Set<string>; // 记录已处理TOS上传的任务 private tosProcessedTasks: Set<string>; // 记录已处理TOS上传的任务
private cleanupInterval: NodeJS.Timeout | null = null;
private constructor() { private constructor() {
this.taskCache = new Map<string, number|string>(); this.taskCache = new Map<string, number|string>();
this.tosProcessedTasks = new Set<string>(); this.tosProcessedTasks = new Set<string>();
cacheLog("VideoTaskCache initialized"); cacheLog("VideoTaskCache initialized");
// 启动定时清理任务每30分钟
this.startPeriodicCleanup();
} }
public static getInstance(): VideoTaskCache { public static getInstance(): VideoTaskCache {
@ -37,47 +43,181 @@ export class VideoTaskCache {
return VideoTaskCache.instance; return VideoTaskCache.instance;
} }
/**
*
*/
private startPeriodicCleanup(): void {
// 每30分钟清理一次过期任务
this.cleanupInterval = setInterval(() => {
this.clearExpiredTasks();
}, 30 * 60 * 1000); // 30分钟
cacheLog("Periodic cleanup started for VideoTaskCache");
}
/**
*
*/
public stopPeriodicCleanup(): void {
if (this.cleanupInterval) {
clearInterval(this.cleanupInterval);
this.cleanupInterval = null;
cacheLog("Periodic cleanup stopped for VideoTaskCache");
}
}
public startTask(taskId: string): void { public startTask(taskId: string): void {
const startTime = Math.floor(Date.now() / 1000); // Current time in seconds const startTime = Math.floor(Date.now() / 1000); // Current time in seconds
this.taskCache.set(taskId, startTime); this.taskCache.set(taskId, startTime);
cacheLog(`Task started: ${taskId} at ${startTime}`); cacheLog(`Task started: ${taskId} at ${startTime}`);
} }
public finishTask(taskId: string, status: -1 | -2 | -3, url:string = ''): void { /**
if (this.taskCache.has(taskId)) { * URL上传到TOS
this.taskCache.set(taskId, status); * @param videoUrls URL数组
* @returns TOS URL数组
*/
private async uploadVideosToTOS(videoUrls: string[]): Promise<string[]> {
const tosUrls: string[] = [];
for (const videoUrl of videoUrls) {
try {
// 从URL获取文件名
const fileName = `video-${Date.now()}-${Math.random().toString(36).substr(2, 9)}.mp4`;
// 上传到TOS
const tosUrl = await TOSService.uploadFromUrl(videoUrl, `videos/${fileName}`);
tosUrls.push(tosUrl);
logger.info(`视频上传到TOS成功: ${videoUrl} -> ${tosUrl}`);
} catch (error) {
logger.error(`视频上传到TOS失败: ${videoUrl}`, error);
// 如果上传失败保留原URL
tosUrls.push(videoUrl);
}
}
return tosUrls;
}
public async finishTask(taskId: string, status: -1 | -2 | -3, url: string = ''): Promise<void> {
if (!this.taskCache.has(taskId)) {
cacheLog(`Attempted to finish non-existent task: ${taskId}`);
return;
}
let finalUrl = url;
let statusMessage = ''; let statusMessage = '';
switch (status) { switch (status) {
case -1: case -1: {
{
statusMessage = 'successfully'; statusMessage = 'successfully';
if (url) { if (url) {
this.taskCache.set(taskId, url); try {
// 任务成功完成时自动上传到TOS
cacheLog(`开始上传视频到TOS: ${taskId}`);
const videoUrls = url.split(',');
const tosUrls = await this.uploadVideosToTOS(videoUrls);
finalUrl = tosUrls.join(',');
this.tosProcessedTasks.add(taskId);
cacheLog(`Task ${taskId} TOS上传完成存储TOS地址: ${finalUrl}`);
} catch (error) {
logger.error(`TOS上传失败使用原始URL: ${taskId}`, error);
finalUrl = url; // 保留原始URL
cacheLog(`Task ${taskId} TOS上传失败使用原始URL`);
} }
} }
break; break;
}
case -2: statusMessage = 'failed'; break; case -2: statusMessage = 'failed'; break;
case -3: statusMessage = 'timed out'; break; case -3: statusMessage = 'timed out'; break;
} }
// 存储最终URLTOS地址或原始URL
this.taskCache.set(taskId, finalUrl || status);
cacheLog(`Task ${taskId} finished ${statusMessage} (status: ${status})`); cacheLog(`Task ${taskId} finished ${statusMessage} (status: ${status})`);
} else {
cacheLog(`Attempted to finish non-existent task: ${taskId}`);
}
} }
public getTaskStatus(taskId: string): number | string | undefined { public getTaskStatus(taskId: string): number | string | undefined {
return this.taskCache.get(taskId); return this.taskCache.get(taskId);
} }
/**
* TOS上传
*/
public isTosProcessed(taskId: string): boolean { public isTosProcessed(taskId: string): boolean {
return this.tosProcessedTasks.has(taskId); return this.tosProcessedTasks.has(taskId);
} }
/**
* TOS上传
*/
public markTosProcessed(taskId: string): void { public markTosProcessed(taskId: string): void {
this.tosProcessedTasks.add(taskId); this.tosProcessedTasks.add(taskId);
cacheLog(`Task ${taskId} marked as TOS processed`); cacheLog(`Task ${taskId} marked as TOS processed`);
} }
/**
*
* @param taskId ID
* @returns undefined
*/
public getTaskResultAndClear(taskId: string): number | string | undefined {
const result = this.taskCache.get(taskId);
if (result && typeof result === 'string') {
// 只有当任务完成时返回字符串URL才清除缓存
this.taskCache.delete(taskId);
this.tosProcessedTasks.delete(taskId);
cacheLog(`Task ${taskId} result retrieved and cache cleared`);
}
return result;
}
/**
* 1
*
*/
public clearExpiredTasks(): void {
const now = Math.floor(Date.now() / 1000);
const expiredTime = 3600; // 1小时
let clearCount = 0;
for (const [taskId, status] of this.taskCache.entries()) {
if (typeof status === 'number' && status > 0) {
// 这是一个时间戳,检查是否过期
if (now - status > expiredTime) {
this.taskCache.delete(taskId);
this.tosProcessedTasks.delete(taskId);
clearCount++;
}
}
}
if (clearCount > 0) {
cacheLog(`Cleared ${clearCount} expired tasks`);
}
}
/**
*
*/
public getCacheStats(): { totalTasks: number, completedTasks: number, pendingTasks: number } {
let completedTasks = 0;
let pendingTasks = 0;
for (const [, status] of this.taskCache.entries()) {
if (typeof status === 'string') {
completedTasks++;
} else if (typeof status === 'number' && status > 0) {
pendingTasks++;
}
}
return {
totalTasks: this.taskCache.size,
completedTasks,
pendingTasks
};
}
public getPendingTasks(): string[] { public getPendingTasks(): string[] {
const pendingTasks: string[] = []; const pendingTasks: string[] = [];
for (const [taskId, status] of this.taskCache.entries()) { for (const [taskId, status] of this.taskCache.entries()) {
@ -95,6 +235,13 @@ export class VideoTaskCache {
} else { } else {
cacheLog("No pending tasks at shutdown."); cacheLog("No pending tasks at shutdown.");
} }
// 关闭时停止定时清理并进行最终清理
this.stopPeriodicCleanup();
this.clearExpiredTasks();
const stats = this.getCacheStats();
cacheLog(`Final cache stats - Total: ${stats.totalTasks}, Completed: ${stats.completedTasks}, Pending: ${stats.pendingTasks}`);
} }
} }

View File

@ -258,16 +258,16 @@ export async function generateImages(
}); });
const validImageUrls = imageUrls.filter(url => url !== null); const validImageUrls = imageUrls.filter(url => url !== null);
if (validImageUrls.length > 0) { if (validImageUrls.length > 0) {
imagesTaskCache.finishTask(task_id, -1, validImageUrls.join(",")); // Success await imagesTaskCache.finishTask(task_id, -1, validImageUrls.join(",")); // Success
} else { } else {
// If no valid URLs but no explicit error thrown earlier, consider it a failure. // If no valid URLs but no explicit error thrown earlier, consider it a failure.
// This could happen if item_list is empty or items don't have video_url. // This could happen if item_list is empty or items don't have video_url.
imagesTaskCache.finishTask(task_id, -2); // Failure await imagesTaskCache.finishTask(task_id, -2); // Failure
throw new APIException(EX.API_IMAGE_GENERATION_FAILED, "图片生成未返回有效链接"); throw new APIException(EX.API_IMAGE_GENERATION_FAILED, "图片生成未返回有效链接");
} }
return validImageUrls; return validImageUrls;
}catch (error) { }catch (error) {
imagesTaskCache.finishTask(task_id, -2); // Failure due to exception await imagesTaskCache.finishTask(task_id, -2); // Failure due to exception
throw error; // Re-throw the error to be handled by the caller throw error; // Re-throw the error to be handled by the caller
} }
} }

View File

@ -244,691 +244,20 @@ export async function generateVideo(
// Filter out nulls and check if any valid URL was generated // Filter out nulls and check if any valid URL was generated
const validVideoUrls = videoUrls.filter(url => url !== null); const validVideoUrls = videoUrls.filter(url => url !== null);
if (validVideoUrls.length > 0) { if (validVideoUrls.length > 0) {
videoTaskCache.finishTask(task_id, -1, validVideoUrls.join(",")); // Success await videoTaskCache.finishTask(task_id, -1, validVideoUrls.join(",")); // Success
} else { } else {
// If no valid URLs but no explicit error thrown earlier, consider it a failure. // If no valid URLs but no explicit error thrown earlier, consider it a failure.
// This could happen if item_list is empty or items don't have video_url. // This could happen if item_list is empty or items don't have video_url.
videoTaskCache.finishTask(task_id, -2); // Failure await videoTaskCache.finishTask(task_id, -2); // Failure
throw new APIException(EX.API_VIDEO_GENERATION_FAILED, "视频生成未返回有效链接"); throw new APIException(EX.API_VIDEO_GENERATION_FAILED, "视频生成未返回有效链接");
} }
return validVideoUrls; return validVideoUrls;
} catch (error) { } catch (error) {
videoTaskCache.finishTask(task_id, -2); // Failure due to exception await videoTaskCache.finishTask(task_id, -2); // Failure due to exception
throw error; // Re-throw the error to be handled by the caller
}
}
//视频 提升分辨率
export async function upgradeVideoResolution(
task_id: string,
{
targetVideoId = "",
targetHistoryId = "",
targetSubmitId = "",
originHistoryId = "",
originComponentList = []
}: {
targetVideoId: string,
targetHistoryId: string,
targetSubmitId: string,
originHistoryId: string,
originComponentList: any[],
},
refreshToken: string
) {
const videoTaskCache = VideoTaskCache.getInstance();
videoTaskCache.startTask(task_id);
try {
const { totalCredit } = await getCredit(refreshToken);
if (totalCredit <= 0)
await receiveCredit(refreshToken);
const componentId = util.uuid();
const originSubmitId = util.uuid();
//生成视频返回的 historyId item_list[0].video.video_id
let video_id = targetVideoId;
//生成视频返回的 historyId
let pre_historyId = targetHistoryId;
let origin_historyId = originHistoryId;
//生成视频返回的 historyId submit_id
let previewSubmitId = targetSubmitId;
//生成视频返回的 historyId draft_content.component_list[0]
let origin_component_list_item:any = originComponentList.find((a)=>{
return a.process_type == 1;
});
originComponentList.sort((a,b)=>{
return b.process_type - a.process_type;
})
let pro_component_list_item:any = originComponentList[0];
let origin_video_gen_inputs = origin_component_list_item.abilities.gen_video.text_to_video_params.video_gen_inputs[0];
let prompt = origin_video_gen_inputs.prompt;
let first_frame_image = origin_video_gen_inputs.first_frame_image;
let width = first_frame_image.width;
let height = first_frame_image.height;
let duration = origin_video_gen_inputs.duration_ms;
let metrics_extra = JSON.stringify({
promptSource: "upscale",
//生成视频返回的 historyId 19680709245698
originId:origin_historyId,
originSubmitId: originSubmitId,
//返回的 task.first_frame_image 信息
coverInfo: {
width: first_frame_image.width,
height: first_frame_image.height,
format: "",
imageUri: first_frame_image.image_uri,
imageUrl:first_frame_image.uri,
smartCropLoc: null,
coverUrlMap: {},
},
generateTimes: 0,
isDefaultSeed: 1,
previewId: pre_historyId,
//生成视频用的submit_id
previewSubmitId: previewSubmitId,
imageNameMapping: {},
});
const { aigc_data } = await request(
"post",
"/mweb/v1/aigc_draft/generate",
refreshToken,
{
params: {
babi_param: encodeURIComponent(
JSON.stringify({
scenario: "image_video_generation",
feature_key: "text_to_video",
feature_entrance: "to_video",
feature_entrance_detail: "to_image-text_to_video",
})
),
},
data: {
extend: {
m_video_commerce_info: {
resource_id: "generate_video",
resource_id_type: "str",
resource_sub_type: "aigc",
benefit_type: "video_upscale"
},
root_model: pro_component_list_item.abilities.gen_video.text_to_video_params.model_req_key,
template_id: "",
history_option: {},
},
submit_id: util.uuid(),
metrics_extra: metrics_extra,
draft_content: JSON.stringify({
type: "draft",
id: util.uuid(),
min_version: DRAFT_VERSION,
min_features: [],
is_from_tsn: true,
version: "3.2.2",
main_component_id: componentId,
//上一步生成视频任务返回的 historyId 中 draft_content的内容作为第一项
component_list: [
...originComponentList,
{
type: "video_base_component",
id: componentId,
min_version: DRAFT_V_VERSION,
//上一步生成视频任务返回的 historyId 中 draft_content的内容的id
parent_id: pro_component_list_item.id,
metadata: {
type: "",
id: util.uuid(),
created_platform: 3,
created_platform_version: "",
created_time_in_ms: Date.now(),
created_did: "",
},
generate_type: "gen_video",
aigc_mode: "workbench",
abilities: {
type: "",
id: util.uuid(),
gen_video:{
type: "",
id: util.uuid(),
text_to_video_params:{
type: "",
id: util.uuid(),
video_gen_inputs:[
{
type: "",
id: util.uuid(),
min_version: DRAFT_V_VERSION,
prompt: prompt,
first_frame_image:{
type: "image",
id: util.uuid(),
source_from: "upload",
platform_type: 1,
name: "",
image_uri: first_frame_image.image_uri,
width: first_frame_image.width,
height: first_frame_image.height,
format: "",
uri: first_frame_image.image_uri,
},
lens_motion_type: "",
video_mode:2,
//上一步生成视频任务返回的 historyId 中 的video_id
vid: video_id,
fps:24,
duration_ms:duration,
v2v_opt: {
type: "",
id: util.uuid(),
min_version: "3.1.0",
super_resolution: {
type: "",
id: util.uuid(),
enable: true,
target_width: width*2,
target_height: height*2,
origin_width: width,
origin_height: height,
},
},
//上一步生成视频任务返回的 historyId
origin_history_id: pre_historyId,
}
],
video_aspect_ratio:pro_component_list_item.abilities.gen_video.text_to_video_params.video_aspect_ratio,
model_req_key: pro_component_list_item.abilities.gen_video.text_to_video_params.model_req_key,
},
scene: "super_resolution",
//上面生成的 metrics_extra
video_task_extra:metrics_extra,
video_ref_params: {
type: "",
id: util.uuid(),
generate_type: 0,
item_id: (7512653500000000000 + Date.now()),
origin_history_id: pre_historyId,
},
},
},
process_type:pro_component_list_item.process_type+1,
},
],
}),
},
}
);
const historyId = aigc_data.history_record_id;
if (!historyId)
throw new APIException(EX.API_VIDEO_GENERATION_FAILED, "高清 记录ID不存在");
let status = 20, failCode, item_list = [];
//https://jimeng.jianying.com/mweb/v1/get_history_by_ids?
//
let emptyCount = 30;
while (status === 20) {
await new Promise((resolve) => setTimeout(resolve, 1000));
const result = await request("post", "/mweb/v1/get_history_by_ids", refreshToken, {
data: {
history_ids: [historyId],
http_common_info: {
aid: Number(DEFAULT_ASSISTANT_ID),
},
},
});
if (!result[historyId]){
logger.warn(`高清 记录ID不存在 ${historyId} 重试次数: ${emptyCount} res: ${JSON.stringify(result)}`);
emptyCount--;
if(emptyCount<=0){
throw new APIException(EX.API_HISTORY_EMPTY, "高清 记录不存在: " + JSON.stringify(result));
}else{
status = 20;
continue;
}
}
status = result[historyId].status;
failCode = result[historyId].fail_code;
item_list = result[historyId].item_list;
}
if (status === 30) {
if (failCode === '2038')
throw new APIException(EX.API_CONTENT_FILTERED);
else
throw new APIException(EX.API_VIDEO_GENERATION_FAILED);
}
// Assuming success if status is not 30 (failed) and not 20 (pending)
// and item_list is populated.
// A more robust check might be needed depending on actual API behavior for success.
const videoUrls = item_list.map((item) => {
if(!item?.video?.transcoded_video?.origin?.video_url)
return null;
return item.video.transcoded_video.origin.video_url;
});
// Filter out nulls and check if any valid URL was generated
const validVideoUrls = videoUrls.filter(url => url !== null);
if (validVideoUrls.length > 0) {
videoTaskCache.finishTask(task_id, -1, validVideoUrls.join(",")); // Success
} else {
// If no valid URLs but no explicit error thrown earlier, consider it a failure.
// This could happen if item_list is empty or items don't have video_url.
videoTaskCache.finishTask(task_id, -2); // Failure
throw new APIException(EX.API_VIDEO_GENERATION_FAILED, "高清 视频生成未返回有效链接");
}
return validVideoUrls;
} catch (error) {
videoTaskCache.finishTask(task_id, -2); // Failure due to exception
throw error; // Re-throw the error to be handled by the caller
}
}
//视频 补帧 (未完成)
export async function upgradeVideoFrame(
_model: string,
task_id: string,
prompt: string,
{
width = 512,
height = 512,
imgURL = "",
duration = 5000,
}: {
width: number;
height: number;
imgURL: string;
duration: number;
},
refreshToken: string
) {
const videoTaskCache = VideoTaskCache.getInstance();
videoTaskCache.startTask(task_id);
try {
if(!imgURL){
throw new APIException(EX.API_REQUEST_PARAMS_INVALID);
return;
}
const model = getModel(_model);
logger.info(`使用模型: ${_model} ${model} 参考图片尺寸: ${width}x${height} 图片地址 ${imgURL} 持续时间: ${duration} 提示词: ${prompt}`);
const { totalCredit } = await getCredit(refreshToken);
if (totalCredit <= 0)
await receiveCredit(refreshToken);
const componentId = util.uuid();
const originSubmitId = util.uuid();
const { aigc_data } = await request(
"post",
"/mweb/v1/aigc_draft/generate",
refreshToken,
{
params: {
babi_param: encodeURIComponent(
JSON.stringify({
scenario: "image_video_generation",
feature_key: "text_to_video",
feature_entrance: "to_video",
feature_entrance_detail: "to_image-text_to_video",
})
),
},
data: {
extend: {
m_video_commerce_info: {
resource_id: "generate_video",
resource_id_type: "str",
resource_sub_type: "aigc",
benefit_type: "basic_video_operation_vgfm_v_three"
},
root_model: model,
template_id: "",
history_option: {},
},
submit_id: util.uuid(),
metrics_extra: JSON.stringify({
promptSource: "custom",
originSubmitId: originSubmitId,
isDefaultSeed: 1,
originTemplateId: "",
imageNameMapping: {},
}),
draft_content: JSON.stringify({
type: "draft",
id: util.uuid(),
min_version: DRAFT_VERSION,
min_features: [],
is_from_tsn: true,
version: "3.2.2",
main_component_id: componentId,
component_list: [
{
type: "video_base_component",
id: componentId,
min_version: DRAFT_V_VERSION,
generate_type: "gen_video",
aigc_mode: "workbench",
metadata: {
type: "",
id: util.uuid(),
created_platform: 3,
created_platform_version: "",
created_time_in_ms: Date.now(),
created_did: "",
},
abilities: {
type: "",
id: util.uuid(),
gen_video:{
type: "",
id: util.uuid(),
text_to_video_params:{
type: "",
id: util.uuid(),
video_gen_inputs:[
{
type: "",
id: util.uuid(),
min_version: DRAFT_V_VERSION,
prompt: prompt,
first_frame_image:{
type: "image",
id: util.uuid(),
source_from: "upload",
platform_type: 1,
name: "",
image_uri: imgURL,
width: width,
height: height,
format: "",
uri: imgURL,
},
video_mode:2,
fps:24,
duration_ms:duration,
}
],
video_aspect_ratio:"9:16",
seed: Math.floor(Math.random() * 100000000) + 2500000000,
model_req_key: model,
},
video_task_extra:{
promptSource: "custom",
originSubmitId: originSubmitId,
isDefaultSeed: 1,
originTemplateId: "",
imageNameMapping: {},
}
},
},
process_type:1,
},
],
}),
http_common_info: {
aid: Number(DEFAULT_ASSISTANT_ID),
},
},
}
);
const historyId = aigc_data.history_record_id;
if (!historyId)
throw new APIException(EX.API_VIDEO_GENERATION_FAILED, "记录ID不存在");
let status = 20, failCode, item_list = [];
//https://jimeng.jianying.com/mweb/v1/get_history_by_ids?
//
while (status === 20) {
await new Promise((resolve) => setTimeout(resolve, 1000));
const result = await request("post", "/mweb/v1/get_history_by_ids", refreshToken, {
data: {
history_ids: [historyId],
http_common_info: {
aid: Number(DEFAULT_ASSISTANT_ID),
},
},
});
if (!result[historyId])
throw new APIException(EX.API_HISTORY_EMPTY, "记录不存在");
status = result[historyId].status;
failCode = result[historyId].fail_code;
item_list = result[historyId].item_list;
}
if (status === 30) {
if (failCode === '2038')
throw new APIException(EX.API_CONTENT_FILTERED);
else
throw new APIException(EX.API_VIDEO_GENERATION_FAILED);
}
// Assuming success if status is not 30 (failed) and not 20 (pending)
// and item_list is populated.
// A more robust check might be needed depending on actual API behavior for success.
const videoUrls = item_list.map((item) => {
if(!item?.video?.transcoded_video?.origin?.video_url)
return null;
return item.video.transcoded_video.origin.video_url;
});
// Filter out nulls and check if any valid URL was generated
const validVideoUrls = videoUrls.filter(url => url !== null);
if (validVideoUrls.length > 0) {
videoTaskCache.finishTask(task_id, -1, validVideoUrls.join(",")); // Success
} else {
// If no valid URLs but no explicit error thrown earlier, consider it a failure.
// This could happen if item_list is empty or items don't have video_url.
videoTaskCache.finishTask(task_id, -2); // Failure
throw new APIException(EX.API_VIDEO_GENERATION_FAILED, "视频生成未返回有效链接");
}
return validVideoUrls;
} catch (error) {
videoTaskCache.finishTask(task_id, -2); // Failure due to exception
throw error; // Re-throw the error to be handled by the caller
}
}
//视频 生成音效 (未完成)
export async function generateVideoSound(
_model: string,
task_id: string,
prompt: string,
{
width = 512,
height = 512,
imgURL = "",
duration = 5000,
}: {
width: number;
height: number;
imgURL: string;
duration: number;
},
refreshToken: string
) {
const videoTaskCache = VideoTaskCache.getInstance();
videoTaskCache.startTask(task_id);
try {
if(!imgURL){
throw new APIException(EX.API_REQUEST_PARAMS_INVALID);
return;
}
const model = getModel(_model);
logger.info(`使用模型: ${_model} ${model} 参考图片尺寸: ${width}x${height} 图片地址 ${imgURL} 持续时间: ${duration} 提示词: ${prompt}`);
const { totalCredit } = await getCredit(refreshToken);
if (totalCredit <= 0)
await receiveCredit(refreshToken);
const componentId = util.uuid();
const originSubmitId = util.uuid();
const { aigc_data } = await request(
"post",
"/mweb/v1/aigc_draft/generate",
refreshToken,
{
params: {
babi_param: encodeURIComponent(
JSON.stringify({
scenario: "image_video_generation",
feature_key: "text_to_video",
feature_entrance: "to_video",
feature_entrance_detail: "to_image-text_to_video",
})
),
},
data: {
extend: {
m_video_commerce_info: {
resource_id: "generate_video",
resource_id_type: "str",
resource_sub_type: "aigc",
benefit_type: "basic_video_operation_vgfm_v_three"
},
root_model: model,
template_id: "",
history_option: {},
},
submit_id: util.uuid(),
metrics_extra: JSON.stringify({
promptSource: "custom",
originSubmitId: originSubmitId,
isDefaultSeed: 1,
originTemplateId: "",
imageNameMapping: {},
}),
draft_content: JSON.stringify({
type: "draft",
id: util.uuid(),
min_version: DRAFT_VERSION,
min_features: [],
is_from_tsn: true,
version: "3.2.2",
main_component_id: componentId,
component_list: [
{
type: "video_base_component",
id: componentId,
min_version: DRAFT_V_VERSION,
generate_type: "gen_video",
aigc_mode: "workbench",
metadata: {
type: "",
id: util.uuid(),
created_platform: 3,
created_platform_version: "",
created_time_in_ms: Date.now(),
created_did: "",
},
abilities: {
type: "",
id: util.uuid(),
gen_video:{
type: "",
id: util.uuid(),
text_to_video_params:{
type: "",
id: util.uuid(),
video_gen_inputs:[
{
type: "",
id: util.uuid(),
min_version: DRAFT_V_VERSION,
prompt: prompt,
first_frame_image:{
type: "image",
id: util.uuid(),
source_from: "upload",
platform_type: 1,
name: "",
image_uri: imgURL,
width: width,
height: height,
format: "",
uri: imgURL,
},
video_mode:2,
fps:24,
duration_ms:duration,
}
],
video_aspect_ratio:"9:16",
seed: Math.floor(Math.random() * 100000000) + 2500000000,
model_req_key: model,
},
video_task_extra:{
promptSource: "custom",
originSubmitId: originSubmitId,
isDefaultSeed: 1,
originTemplateId: "",
imageNameMapping: {},
}
},
},
process_type:1,
},
],
}),
http_common_info: {
aid: Number(DEFAULT_ASSISTANT_ID),
},
},
}
);
const historyId = aigc_data.history_record_id;
if (!historyId)
throw new APIException(EX.API_VIDEO_GENERATION_FAILED, "记录ID不存在");
let status = 20, failCode, item_list = [];
//https://jimeng.jianying.com/mweb/v1/get_history_by_ids?
//
while (status === 20) {
await new Promise((resolve) => setTimeout(resolve, 1000));
const result = await request("post", "/mweb/v1/get_history_by_ids", refreshToken, {
data: {
history_ids: [historyId],
http_common_info: {
aid: Number(DEFAULT_ASSISTANT_ID),
},
},
});
if (!result[historyId])
throw new APIException(EX.API_HISTORY_EMPTY, "记录不存在");
status = result[historyId].status;
failCode = result[historyId].fail_code;
item_list = result[historyId].item_list;
}
if (status === 30) {
if (failCode === '2038')
throw new APIException(EX.API_CONTENT_FILTERED);
else
throw new APIException(EX.API_VIDEO_GENERATION_FAILED);
}
// Assuming success if status is not 30 (failed) and not 20 (pending)
// and item_list is populated.
// A more robust check might be needed depending on actual API behavior for success.
const videoUrls = item_list.map((item) => {
if(!item?.video?.transcoded_video?.origin?.video_url)
return null;
return item.video.transcoded_video.origin.video_url;
});
// Filter out nulls and check if any valid URL was generated
const validVideoUrls = videoUrls.filter(url => url !== null);
if (validVideoUrls.length > 0) {
videoTaskCache.finishTask(task_id, -1, validVideoUrls.join(",")); // Success
} else {
// If no valid URLs but no explicit error thrown earlier, consider it a failure.
// This could happen if item_list is empty or items don't have video_url.
videoTaskCache.finishTask(task_id, -2); // Failure
throw new APIException(EX.API_VIDEO_GENERATION_FAILED, "视频生成未返回有效链接");
}
return validVideoUrls;
} catch (error) {
videoTaskCache.finishTask(task_id, -2); // Failure due to exception
throw error; // Re-throw the error to be handled by the caller throw error; // Re-throw the error to be handled by the caller
} }
} }
export default { export default {
generateVideo, generateVideo,
upgradeVideoResolution,
// upgradeVideoFrame,
// generateVideoSound,
}; };

View File

@ -5,34 +5,6 @@ import { generateImages } from "@/api/controllers/images.ts";
import { tokenSplit } from "@/api/controllers/core.ts"; import { tokenSplit } from "@/api/controllers/core.ts";
import util from "@/lib/util.ts"; import util from "@/lib/util.ts";
import { ImagesTaskCache } from '@/api/ImagesTaskCache.ts'; import { ImagesTaskCache } from '@/api/ImagesTaskCache.ts';
import TOSService from "@/lib/tos/tos-service.ts";
import logger from "@/lib/logger.ts";
/**
* URL上传到TOS
* @param imageUrls URL数组
* @returns TOS URL数组
*/
async function uploadImagesToTOS(imageUrls: string[]): Promise<string[]> {
const tosUrls: string[] = [];
for (const imageUrl of imageUrls) {
try {
// 从URL获取文件名
const fileName = `image-${Date.now()}-${Math.random().toString(36).substr(2, 9)}.webp`;
// 上传到TOS
const tosUrl = await TOSService.uploadFromUrl(imageUrl, `images/${fileName}`);
tosUrls.push(tosUrl);
logger.info(`图片上传到TOS成功: ${imageUrl} -> ${tosUrl}`);
} catch (error) {
logger.error(`图片上传到TOS失败: ${imageUrl}`, error);
// 如果上传失败保留原URL
tosUrls.push(imageUrl);
}
}
return tosUrls;
}
export default { export default {
prefix: "/v1/images", prefix: "/v1/images",
@ -44,41 +16,19 @@ export default {
const { const {
task_id, task_id,
} = request.query; // 从 query 中获取 } = request.query; // 从 query 中获取
let res = imagesTaskCache.getTaskStatus(task_id);
// console.log("查询任务状态", task_id, 'res:',res); // 使用新的方法获取任务结果并清理缓存
let res = imagesTaskCache.getTaskResultAndClear(task_id);
if (typeof res === 'string') { if (typeof res === 'string') {
// 任务已完成检查是否已处理TOS上传 // 任务已完成返回TOS地址已经在finishTask中处理过
if (!imagesTaskCache.isTosProcessed(task_id)) {
// 尚未处理TOS上传处理图片URL上传到TOS
try {
const imageUrls = res.split(',');
const tosUrls = await uploadImagesToTOS(imageUrls);
const tosUrlsString = tosUrls.join(',');
// 更新缓存中TOS URL并标记为已处理
imagesTaskCache.finishTask(task_id, -1, tosUrlsString);
imagesTaskCache.markTosProcessed(task_id);
return {
created: util.unixTimestamp(),
data:{task_id, url: tosUrlsString, status:-1},
};
} catch (error) {
logger.error(`处理图片TOS上传失败: ${task_id}`, error);
// 如果上传失败返回原始URL
return { return {
created: util.unixTimestamp(), created: util.unixTimestamp(),
data: { task_id, url: res, status: -1 }, data: { task_id, url: res, status: -1 },
}; };
}
} else {
// 已处理TOS上传直接返回缓存的TOS URL
return {
created: util.unixTimestamp(),
data:{task_id, url:res, status:-1},
};
}
} else { } else {
// 任务进行中或失败,不清理缓存
res = imagesTaskCache.getTaskStatus(task_id);
return { return {
created: util.unixTimestamp(), created: util.unixTimestamp(),
data: { task_id, url: "", status: res || 0 }, data: { task_id, url: "", status: res || 0 },

View File

@ -1,38 +1,10 @@
import _ from "lodash"; import _ from "lodash";
import Request from "@/lib/request/Request.ts"; import Request from "@/lib/request/Request.ts";
import { generateVideo, upgradeVideoResolution } from "@/api/controllers/video.ts"; import { generateVideo } from "@/api/controllers/video.ts";
import { tokenSplit } from "@/api/controllers/core.ts"; import { tokenSplit } from "@/api/controllers/core.ts";
import util from "@/lib/util.ts"; import util from "@/lib/util.ts";
import { VideoTaskCache } from '@/api/VideoTaskCache.ts'; import { VideoTaskCache } from '@/api/VideoTaskCache.ts';
import TOSService from "@/lib/tos/tos-service.ts";
import logger from "@/lib/logger.ts";
/**
* URL上传到TOS
* @param videoUrls URL数组
* @returns TOS URL数组
*/
async function uploadVideosToTOS(videoUrls: string[]): Promise<string[]> {
const tosUrls: string[] = [];
for (const videoUrl of videoUrls) {
try {
// 从URL获取文件名
const fileName = `video-${Date.now()}-${Math.random().toString(36).substr(2, 9)}.mp4`;
// 上传到TOS
const tosUrl = await TOSService.uploadFromUrl(videoUrl, `videos/${fileName}`);
tosUrls.push(tosUrl);
logger.info(`视频上传到TOS成功: ${videoUrl} -> ${tosUrl}`);
} catch (error) {
logger.error(`视频上传到TOS失败: ${videoUrl}`, error);
// 如果上传失败保留原URL
tosUrls.push(videoUrl);
}
}
return tosUrls;
}
export default { export default {
prefix: "/v1/video", prefix: "/v1/video",
@ -44,41 +16,19 @@ export default {
const { const {
task_id, task_id,
} = request.query; // 从 query 中获取 } = request.query; // 从 query 中获取
let res = videoTaskCache.getTaskStatus(task_id);
// console.log("查询任务状态", task_id, 'res:',res); // 使用新的方法获取任务结果并清理缓存
let res = videoTaskCache.getTaskResultAndClear(task_id);
if (typeof res === 'string') { if (typeof res === 'string') {
// 任务已完成检查是否已处理TOS上传 // 任务已完成返回TOS地址已经在finishTask中处理过
if (!videoTaskCache.isTosProcessed(task_id)) {
// 尚未处理TOS上传处理视频URL上传到TOS
try {
const videoUrls = res.split(',');
const tosUrls = await uploadVideosToTOS(videoUrls);
const tosUrlsString = tosUrls.join(',');
// 更新缓存中TOS URL并标记为已处理
videoTaskCache.finishTask(task_id, -1, tosUrlsString);
videoTaskCache.markTosProcessed(task_id);
return {
created: util.unixTimestamp(),
data:{task_id, url: tosUrlsString, status:-1},
};
} catch (error) {
logger.error(`处理视频TOS上传失败: ${task_id}`, error);
// 如果上传失败返回原始URL
return { return {
created: util.unixTimestamp(), created: util.unixTimestamp(),
data: { task_id, url: res, status: -1 }, data: { task_id, url: res, status: -1 },
}; };
}
} else {
// 已处理TOS上传直接返回缓存的TOS URL
return {
created: util.unixTimestamp(),
data:{task_id, url:res, status:-1},
};
}
} else { } else {
// 任务进行中或失败,不清理缓存
res = videoTaskCache.getTaskStatus(task_id);
return { return {
created: util.unixTimestamp(), created: util.unixTimestamp(),
data: { task_id, url: "", status: res || 0 }, data: { task_id, url: "", status: res || 0 },
@ -126,44 +76,5 @@ export default {
data:'success', data:'success',
}; };
}, },
"/upscale": async (request: Request) => {
request
.validate("body.task_id", _.isString)
.validate("body.targetVideoId", _.isString)
.validate("body.targetHistoryId", _.isString)
.validate("body.targetSubmitId", _.isString)
.validate("body.originHistoryId", _.isString)
.validate("body.components", _.isString)
.validate("headers.authorization", _.isString);
// refresh_token切分 必须和generations使用同一个token
const tokens = tokenSplit(request.headers.authorization);
// 取第一个 必须和generations使用同一个token
const token = tokens[0];
const {
task_id,
targetVideoId,
targetHistoryId,
targetSubmitId,
originHistoryId,
components,
} = request.body;
const originComponentList = JSON.parse(components);
//不等结果 直接返回
upgradeVideoResolution(task_id, {
targetVideoId,
targetHistoryId,
targetSubmitId,
originHistoryId,
originComponentList
}, token);
// let data = [];
// data = imageUrls.map((url) => ({
// url,
// }));
return {
created: util.unixTimestamp(),
data:'success',
};
},
}, },
}; };

View File

@ -23,6 +23,12 @@ services:
- TOS_ENDPOINT=${TOS_ENDPOINT:-tos-cn-beijing.volces.com} - TOS_ENDPOINT=${TOS_ENDPOINT:-tos-cn-beijing.volces.com}
- HEARTBEAT_ENABLED=${HEARTBEAT_ENABLED:-true} - HEARTBEAT_ENABLED=${HEARTBEAT_ENABLED:-true}
- HEARTBEAT_INTERVAL=${HEARTBEAT_INTERVAL:-30} - HEARTBEAT_INTERVAL=${HEARTBEAT_INTERVAL:-30}
- USE_DATABASE_MODE=${USE_DATABASE_MODE:-false}
- MAX_CONCURRENT_TASKS=${MAX_CONCURRENT_TASKS:-3}
- TASK_POLL_INTERVAL=${TASK_POLL_INTERVAL:-5}
- IMAGE_TASK_TIMEOUT=${IMAGE_TASK_TIMEOUT:-3600}
- VIDEO_TASK_TIMEOUT=${VIDEO_TASK_TIMEOUT:-86400}
- RESULT_EXPIRE_TIME=${RESULT_EXPIRE_TIME:-86400}
ports: ports:
- "${API_PORT:-3302}:3302" - "${API_PORT:-3302}:3302"
volumes: volumes:

View File

@ -21,7 +21,13 @@
"TOS_REGION": "cn-beijing", "TOS_REGION": "cn-beijing",
"TOS_ENDPOINT": "tos-cn-beijing.volces.com", "TOS_ENDPOINT": "tos-cn-beijing.volces.com",
"HEARTBEAT_ENABLED": true, "HEARTBEAT_ENABLED": true,
"HEARTBEAT_INTERVAL": 30 "HEARTBEAT_INTERVAL": 30,
"USE_DATABASE_MODE": false,
"MAX_CONCURRENT_TASKS": 3,
"TASK_POLL_INTERVAL": 5,
"IMAGE_TASK_TIMEOUT": 3600,
"VIDEO_TASK_TIMEOUT": 86400,
"RESULT_EXPIRE_TIME": 86400
}, },
"env_production": { "env_production": {
"NODE_ENV": "production", "NODE_ENV": "production",
@ -37,7 +43,13 @@
"TOS_REGION": "cn-beijing", "TOS_REGION": "cn-beijing",
"TOS_ENDPOINT": "tos-cn-beijing.volces.com", "TOS_ENDPOINT": "tos-cn-beijing.volces.com",
"HEARTBEAT_ENABLED": true, "HEARTBEAT_ENABLED": true,
"HEARTBEAT_INTERVAL": 30 "HEARTBEAT_INTERVAL": 30,
"USE_DATABASE_MODE": false,
"MAX_CONCURRENT_TASKS": 3,
"TASK_POLL_INTERVAL": 5,
"IMAGE_TASK_TIMEOUT": 3600,
"VIDEO_TASK_TIMEOUT": 86400,
"RESULT_EXPIRE_TIME": 86400
}, },
"log_file": "./logs/combined-3302.log", "log_file": "./logs/combined-3302.log",
"out_file": "./logs/out-3302.log", "out_file": "./logs/out-3302.log",
@ -69,7 +81,13 @@
"TOS_REGION": "cn-beijing", "TOS_REGION": "cn-beijing",
"TOS_ENDPOINT": "tos-cn-beijing.volces.com", "TOS_ENDPOINT": "tos-cn-beijing.volces.com",
"HEARTBEAT_ENABLED": true, "HEARTBEAT_ENABLED": true,
"HEARTBEAT_INTERVAL": 30 "HEARTBEAT_INTERVAL": 30,
"USE_DATABASE_MODE": false,
"MAX_CONCURRENT_TASKS": 3,
"TASK_POLL_INTERVAL": 5,
"IMAGE_TASK_TIMEOUT": 3600,
"VIDEO_TASK_TIMEOUT": 86400,
"RESULT_EXPIRE_TIME": 86400
}, },
"env_production": { "env_production": {
"NODE_ENV": "production", "NODE_ENV": "production",
@ -85,7 +103,13 @@
"TOS_REGION": "cn-beijing", "TOS_REGION": "cn-beijing",
"TOS_ENDPOINT": "tos-cn-beijing.volces.com", "TOS_ENDPOINT": "tos-cn-beijing.volces.com",
"HEARTBEAT_ENABLED": true, "HEARTBEAT_ENABLED": true,
"HEARTBEAT_INTERVAL": 30 "HEARTBEAT_INTERVAL": 30,
"USE_DATABASE_MODE": false,
"MAX_CONCURRENT_TASKS": 3,
"TASK_POLL_INTERVAL": 5,
"IMAGE_TASK_TIMEOUT": 3600,
"VIDEO_TASK_TIMEOUT": 86400,
"RESULT_EXPIRE_TIME": 86400
}, },
"log_file": "./logs/combined-3303.log", "log_file": "./logs/combined-3303.log",
"out_file": "./logs/out-3303.log", "out_file": "./logs/out-3303.log",
@ -117,7 +141,13 @@
"TOS_REGION": "cn-beijing", "TOS_REGION": "cn-beijing",
"TOS_ENDPOINT": "tos-cn-beijing.volces.com", "TOS_ENDPOINT": "tos-cn-beijing.volces.com",
"HEARTBEAT_ENABLED": true, "HEARTBEAT_ENABLED": true,
"HEARTBEAT_INTERVAL": 30 "HEARTBEAT_INTERVAL": 30,
"USE_DATABASE_MODE": false,
"MAX_CONCURRENT_TASKS": 3,
"TASK_POLL_INTERVAL": 5,
"IMAGE_TASK_TIMEOUT": 3600,
"VIDEO_TASK_TIMEOUT": 86400,
"RESULT_EXPIRE_TIME": 86400
}, },
"env_production": { "env_production": {
"NODE_ENV": "production", "NODE_ENV": "production",
@ -133,7 +163,13 @@
"TOS_REGION": "cn-beijing", "TOS_REGION": "cn-beijing",
"TOS_ENDPOINT": "tos-cn-beijing.volces.com", "TOS_ENDPOINT": "tos-cn-beijing.volces.com",
"HEARTBEAT_ENABLED": true, "HEARTBEAT_ENABLED": true,
"HEARTBEAT_INTERVAL": 30 "HEARTBEAT_INTERVAL": 30,
"USE_DATABASE_MODE": false,
"MAX_CONCURRENT_TASKS": 3,
"TASK_POLL_INTERVAL": 5,
"IMAGE_TASK_TIMEOUT": 3600,
"VIDEO_TASK_TIMEOUT": 86400,
"RESULT_EXPIRE_TIME": 86400
}, },
"log_file": "./logs/combined-3304.log", "log_file": "./logs/combined-3304.log",
"out_file": "./logs/out-3304.log", "out_file": "./logs/out-3304.log",