diff --git a/MONGODB_ARCHITECTURE_PLAN.md b/MONGODB_ARCHITECTURE_PLAN.md new file mode 100644 index 0000000..bd87874 --- /dev/null +++ b/MONGODB_ARCHITECTURE_PLAN.md @@ -0,0 +1,755 @@ +# MongoDB架构改造方案 + +## 项目概述 + +将现有的内存缓存架构改造为基于MongoDB的数据库持久化方案,支持多服务器环境下的任务处理,每个Node服务只处理分配给自己的任务。 + +## 适用范围 + +本方案仅适用于以下异步生成接口,不影响其他同步接口: + +### 新策略适用接口 +- `/v1/images/generations` - 图片生成接口(异步) +- `/v1/video/generations` - 视频生成接口(异步) +- `/v1/images/query` - 图片任务查询接口 +- `/v1/video/query` - 视频任务查询接口 + +### 保持现状的接口 +- `/v1/upload/images` - 图片上传接口(同步,不使用缓存策略,保持现有实现) +- 其他所有同步接口保持不变 + +## 1. 数据表设计 + +### 1.1 生成任务表 (jimeng_free_generation_tasks) + +```typescript +interface IGenerationTask extends Document { + task_id: string; // 任务唯一标识,主键 + task_type: 'image' | 'video'; // 任务类型 + server_id: string; // 分配的服务器ID(外部分配,对应SERVICE_ID) + + // 原始请求参数 + original_params: { + model?: string; // 模型名称 + prompt: string; // 提示词 + negative_prompt?: string; // 负向提示词 + width?: number; // 宽度 + height?: number; // 高度 + sample_strength?: number; // 采样强度 + images?: Array<{ // 图片参数(视频生成用) + url: string; + width: number; + height: number; + }>; + is_pro?: boolean; // 是否专业版 + duration?: number; // 时长(毫秒) + ratio?: string; // 比例 + response_format?: string; // 响应格式 + }; + + // 生成过程中的内部参数 + internal_params: { + refresh_token: string; // 认证令牌 + component_id?: string; // 组件ID + history_id?: string; // 即梦平台历史记录ID + mapped_model?: string; // 映射后的模型名 + submit_id?: string; // 提交ID + }; + + // 任务状态和控制 + status: 'pending' | 'processing' | 'polling' | 'completed' | 'failed'; + retry_count: number; // 当前重试次数 + max_retries: number; // 最大重试次数(默认3次) + + // 轮询控制 + next_poll_at?: number; // 下次轮询时间戳(用于控制轮询间隔) + poll_interval: number; // 轮询间隔(秒,默认10秒) + task_timeout: number; // 任务超时时间(秒,图片1小时,视频24小时) + + // 时间戳 + created_at: number; // 创建时间戳(秒) + updated_at: number; // 更新时间戳(秒) + started_at?: number; // 开始处理时间戳(秒) + completed_at?: number; // 完成时间戳(秒) + + // 错误信息 + error_message?: string; // 错误描述 + fail_code?: string; // 即梦平台返回的失败代码 +} +``` + +#### 数据库索引设计 +```javascript +// 复合索引 - 用于轮询查询 +{ "server_id": 1, "status": 1, "next_poll_at": 1 } + +// 单字段索引 +{ "task_id": 1 } // 主键查询 +{ "created_at": 1 } // 按创建时间清理 +{ "updated_at": 1 } // 按更新时间查询 +``` + +### 1.2 生成结果表 (jimeng_free_generation_results) + +```typescript +interface IGenerationResult extends Document { + task_id: string; // 关联的任务ID,主键 + task_type: 'image' | 'video'; // 任务类型 + server_id: string; // 处理服务器ID + + // 生成结果 + status: 'success' | 'failed'; // 最终状态 + original_urls: string[]; // 原始URL数组(即梦返回的地址) + tos_urls: string[]; // TOS URL数组(上传后的地址) + + // 处理元数据 + metadata: { + generation_time?: number; // 生成耗时(毫秒) + tos_upload_time?: number; // TOS上传耗时(毫秒) + total_files: number; // 文件总数 + successful_uploads: number; // 成功上传数量 + tos_upload_errors?: string[]; // TOS上传错误信息 + fail_reason?: string; // 失败原因 + }; + + // 时间管理 + created_at: number; // 创建时间戳(秒) + expires_at: number; // 过期时间戳(用于自动清理,默认24小时后) +} +``` + +#### 数据库索引设计 +```javascript +// 主键索引 +{ "task_id": 1 } + +// 过期清理索引 +{ "expires_at": 1 } + +// 服务器查询索引 +{ "server_id": 1, "created_at": 1 } +``` + +## 2. 服务架构设计 + +### 2.1 任务处理流程 + +```mermaid +graph TD + A[外部请求] --> B[创建任务记录] + B --> C[返回任务ID] + C --> D[定时轮询服务] + D --> E{检查任务状态} + E -->|pending| F[开始处理任务] + E -->|processing| G[调用即梦API] + E -->|polling| H[检查即梦结果] + E -->|completed/failed| I[跳过] + + F --> J[更新为processing] + J --> K[调用生成API] + K --> L{API调用结果} + L -->|成功| M[更新为polling] + L -->|失败| N[重试或标记失败] + + M --> O[设置下次轮询时间] + H --> P{即梦任务完成?} + P -->|是| Q[处理结果] + P -->|否| R[更新轮询时间] + + Q --> S[上传TOS] + S --> T[创建结果记录] + T --> U[标记任务完成] + + V[查询接口] --> W[查找结果记录] + W --> X{结果存在?} + X -->|是| Y[返回结果并清理] + X -->|否| Z[返回进行中状态] +``` + +### 2.2 定时轮询服务(TaskPollingService) + +```typescript +class TaskPollingService { + private static instance: TaskPollingService; + private currentServerId: string; + private pollInterval: NodeJS.Timeout | null = null; + private isRunning: boolean = false; + private maxConcurrentTasks: number; // 从环境变量获取,默认3个 + + /** + * 启动轮询服务 + * 注意:与HeartbeatService分离,使用不同的定时器 + */ + async start(): Promise { + this.currentServerId = process.env.SERVICE_ID || 'jimeng-free-api'; + this.maxConcurrentTasks = parseInt(process.env.MAX_CONCURRENT_TASKS || '3'); + + // 每5秒轮询一次(与心跳服务的60秒区分开) + this.pollInterval = setInterval(async () => { + await this.processTasks(); + }, 5000); + + this.isRunning = true; + logger.info(`Task polling service started for server: ${this.currentServerId}`); + } + + /** + * 主处理方法 + */ + private async processTasks(): Promise { + try { + const currentTime = Math.floor(Date.now() / 1000); + + // 1. 处理待处理任务 + await this.processPendingTasks(currentTime); + + // 2. 检查轮询任务 + await this.checkPollingTasks(currentTime); + + // 3. 检查和处理超时任务 + await this.checkTimeoutTasks(currentTime); + + } catch (error) { + logger.error('Task polling error:', error); + } + } + + /** + * 检查超时任务 + */ + private async checkTimeoutTasks(currentTime: number): Promise { + try { + const timeoutCount = await DatabaseCleanupService.cleanupTimeoutTasks(); + if (timeoutCount > 0) { + logger.warn(`Marked ${timeoutCount} tasks as failed due to timeout`); + } + } catch (error) { + logger.error('Failed to check timeout tasks:', error); + } + } +} +``` + +### 2.3 与心跳服务的协调 + +- **HeartbeatService**: 每60秒发送一次心跳,管理服务器在线状态 +- **TaskPollingService**: 每5秒轮询一次任务,处理任务状态变更 +- **独立运行**: 两个服务使用不同的定时器,互不干扰 +- **共享配置**: 都使用相同的SERVICE_ID标识服务器 + +## 3. 核心方法重构 + +### 3.1 新的生成方法(保持接口兼容) + +```typescript +// 新的数据库驱动方法 +class DatabaseGenerationService { + + /** + * 图片生成 - 数据库版本 + */ + async generateImagesV2( + model: string, + taskId: string, + prompt: string, + params: any, + refreshToken: string + ): Promise { + const currentServerId = process.env.SERVICE_ID || 'jimeng-free-api'; + + // 创建任务记录 + const imageTimeout = parseInt(process.env.IMAGE_TASK_TIMEOUT || '3600'); + + await GenerationTask.create({ + task_id: taskId, + task_type: 'image', + server_id: currentServerId, + original_params: { model, prompt, ...params }, + internal_params: { refresh_token: refreshToken }, + status: 'pending', + retry_count: 0, + max_retries: 3, + poll_interval: 10, + task_timeout: imageTimeout, + created_at: Math.floor(Date.now() / 1000), + updated_at: Math.floor(Date.now() / 1000) + }); + + logger.info(`Image task created: ${taskId} for server: ${currentServerId}`); + } + + /** + * 视频生成 - 数据库版本 + */ + async generateVideoV2( + taskId: string, + prompt: string, + params: any, + refreshToken: string + ): Promise { + const currentServerId = process.env.SERVICE_ID || 'jimeng-free-api'; + + const videoTimeout = parseInt(process.env.VIDEO_TASK_TIMEOUT || '86400'); + + await GenerationTask.create({ + task_id: taskId, + task_type: 'video', + server_id: currentServerId, + original_params: { prompt, ...params }, + internal_params: { refresh_token: refreshToken }, + status: 'pending', + retry_count: 0, + max_retries: 3, + poll_interval: 15, // 视频轮询间隔更长 + task_timeout: videoTimeout, + created_at: Math.floor(Date.now() / 1000), + updated_at: Math.floor(Date.now() / 1000) + }); + + logger.info(`Video task created: ${taskId} for server: ${currentServerId}`); + } + + /** + * 查询任务结果 + */ + async queryTaskResult(taskId: string): Promise { + // 1. 先查询结果表 + const result = await GenerationResult.findOne({ task_id: taskId }); + + if (result) { + // 找到结果,返回并清理 + const response = { + created: Math.floor(Date.now() / 1000), + data: { + task_id: taskId, + url: result.tos_urls.join(','), + status: result.status === 'success' ? -1 : -2 + } + }; + + // 删除结果记录(一次性消费) + await GenerationResult.deleteOne({ task_id: taskId }); + + return response; + } + + // 2. 查询任务状态 + const task = await GenerationTask.findOne({ task_id: taskId }); + + if (!task) { + return { + created: Math.floor(Date.now() / 1000), + data: { task_id: taskId, url: "", status: 0 } // 任务不存在 + }; + } + + // 3. 根据任务状态返回 + const statusMap = { + 'pending': 0, + 'processing': 0, + 'polling': 0, + 'failed': -2, + 'completed': -1 // 这种情况理论上不会出现,因为completed会生成result + }; + + return { + created: Math.floor(Date.now() / 1000), + data: { + task_id: taskId, + url: "", + status: statusMap[task.status] || 0 + } + }; + } +} +``` + +### 3.2 兼容性切换机制 + +```typescript +// 通过环境变量控制新旧方法 +const USE_DATABASE_MODE = process.env.USE_DATABASE_MODE === 'true'; + +// 统一的调用入口 +export async function generateImages( + model: string, + taskId: string, + prompt: string, + params: any, + refreshToken: string +) { + if (USE_DATABASE_MODE) { + return await DatabaseGenerationService.generateImagesV2( + model, taskId, prompt, params, refreshToken + ); + } else { + // 调用原有方法 + return await OriginalImageController.generateImages( + model, taskId, prompt, params, refreshToken + ); + } +} +``` + +## 4. 环境变量配置 + +### 4.1 新增环境变量 + +```bash +# 数据库模式开关 +USE_DATABASE_MODE=false # 默认false,保持兼容性 + +# 任务处理配置 +MAX_CONCURRENT_TASKS=3 # 最大并发任务数(2核2G建议3个) +TASK_POLL_INTERVAL=5 # 任务轮询间隔(秒) +IMAGE_TASK_TIMEOUT=3600 # 图片任务超时时间(秒,默认1小时) +VIDEO_TASK_TIMEOUT=86400 # 视频任务超时时间(秒,默认24小时) +RESULT_EXPIRE_TIME=86400 # 结果过期时间(秒,默认24小时) + +# 服务标识(已有) +SERVICE_ID=jimeng-api-3302 # 服务唯一标识 +``` + +### 4.2 配置文件更新 + +在 `ecosystem.config.json` 中为每个实例配置不同的 `SERVICE_ID`: + +```json +{ + "apps": [ + { + "name": "jimeng-api-3302", + "env": { + "SERVICE_ID": "jimeng-api-3302", + "PORT": 3302 + } + }, + { + "name": "jimeng-api-3303", + "env": { + "SERVICE_ID": "jimeng-api-3303", + "PORT": 3303 + } + } + ] +} +``` + +## 5. 数据库操作优化 + +### 5.1 原子操作设计 + +```typescript +// 原子更新任务状态,避免并发冲突 +async function atomicUpdateTaskStatus( + taskId: string, + fromStatus: string, + toStatus: string, + updateData: any = {} +): Promise { + const result = await GenerationTask.updateOne( + { + task_id: taskId, + status: fromStatus + }, + { + status: toStatus, + updated_at: Math.floor(Date.now() / 1000), + ...updateData + } + ); + + return result.modifiedCount > 0; +} +``` + +### 5.2 批量查询优化 + +```typescript +// 批量获取待处理任务 +async function getBatchPendingTasks(serverId: string, limit: number): Promise { + const currentTime = Math.floor(Date.now() / 1000); + + return await GenerationTask.find({ + server_id: serverId, + status: 'pending' + }) + .sort({ created_at: 1 }) // 先入先出 + .limit(limit); +} +``` + +## 6. 监控和日志 + +### 6.1 任务状态监控 + +```typescript +class TaskMonitor { + static async getTaskStats(serverId?: string): Promise { + const filter = serverId ? { server_id: serverId } : {}; + + const stats = await GenerationTask.aggregate([ + { $match: filter }, + { + $group: { + _id: "$status", + count: { $sum: 1 } + } + } + ]); + + return stats.reduce((acc, curr) => { + acc[curr._id] = curr.count; + return acc; + }, {}); + } + + static async getServerLoad(serverId: string): Promise { + return await GenerationTask.countDocuments({ + server_id: serverId, + status: { $in: ['processing', 'polling'] } + }); + } +} +``` + +### 6.2 专用日志文件 + +```typescript +// 创建专用的任务轮询日志 +const TASK_POLLING_LOG_PATH = path.resolve("./logs/task_polling.log"); + +function taskLog(message: string) { + const timestamp = formatInTimeZone(new Date(), timeZone, "yyyy-MM-dd HH:mm:ss.SSS"); + const logMessage = `[TaskPolling][${timestamp}] ${message}`; + + fs.ensureDirSync(path.dirname(TASK_POLLING_LOG_PATH)); + fs.appendFileSync(TASK_POLLING_LOG_PATH, logMessage + "\n"); +} +``` + +## 7. 数据清理策略 + +### 7.1 自动清理任务 + +```typescript +class DatabaseCleanupService { + /** + * 清理过期的结果记录 + */ + static async cleanupExpiredResults(): Promise { + const currentTime = Math.floor(Date.now() / 1000); + + const result = await GenerationResult.deleteMany({ + expires_at: { $lt: currentTime } + }); + + return result.deletedCount; + } + + /** + * 清理超时的任务 + */ + static async cleanupTimeoutTasks(): Promise { + const currentTime = Math.floor(Date.now() / 1000); + + // 查找超时的任务并标记为失败 + const timeoutTasks = await GenerationTask.find({ + status: { $in: ['processing', 'polling'] }, + $expr: { + $gt: [ + { $subtract: [currentTime, '$started_at'] }, + '$task_timeout' + ] + } + }); + + // 批量更新超时任务为失败状态 + const timeoutTaskIds = timeoutTasks.map(task => task.task_id); + + if (timeoutTaskIds.length > 0) { + await GenerationTask.updateMany( + { task_id: { $in: timeoutTaskIds } }, + { + status: 'failed', + error_message: 'Task timeout', + updated_at: currentTime, + completed_at: currentTime + } + ); + + // 为超时任务创建失败结果记录 + const failedResults = timeoutTasks.map(task => ({ + task_id: task.task_id, + task_type: task.task_type, + server_id: task.server_id, + status: 'failed', + original_urls: [], + tos_urls: [], + metadata: { + total_files: 0, + successful_uploads: 0, + fail_reason: 'Task timeout after ' + task.task_timeout + ' seconds' + }, + created_at: currentTime, + expires_at: currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400') + })); + + if (failedResults.length > 0) { + await GenerationResult.insertMany(failedResults); + } + } + + return timeoutTaskIds.length; + } +} +``` + +### 7.2 定期清理任务 + +集成到轮询服务中,每小时执行一次清理: + +```typescript +// 在TaskPollingService中添加 +private cleanupCounter = 0; + +private async processTasks(): Promise { + // ... 现有逻辑 ... + + // 每720次轮询(1小时)执行一次清理 + this.cleanupCounter++; + if (this.cleanupCounter >= 720) { + await this.performCleanup(); + this.cleanupCounter = 0; + } +} +``` + +## 8. 迁移计划 + +### Phase 1: 准备阶段 +- [ ] 创建数据模型文件 +- [ ] 实现新的生成服务类 +- [ ] 添加轮询服务框架 +- [ ] 更新环境变量配置 + +### Phase 2: 测试阶段 +- [ ] 单元测试编写 +- [ ] 本地环境测试 +- [ ] 小流量灰度测试(USE_DATABASE_MODE=true) + +### Phase 3: 切换阶段 +- [ ] 全量切换到数据库模式 +- [ ] 监控系统稳定性 +- [ ] 性能调优 + +### Phase 4: 清理阶段 +- [ ] 移除旧的内存缓存代码 +- [ ] 清理不再使用的配置 +- [ ] 文档更新 + +## 9. 风险评估和应对 + +### 9.1 主要风险 + +1. **数据库性能瓶颈** + - 风险:高频轮询可能对MongoDB造成压力 + - 应对:优化索引、控制轮询频率、使用连接池 + +2. **任务丢失风险** + - 风险:服务器宕机时正在处理的任务可能丢失 + - 应对:实现任务恢复机制、超时重试 + +3. **状态不一致** + - 风险:并发更新可能导致状态不一致 + - 应对:使用原子操作、乐观锁 + +### 9.2 回滚方案 + +- 保持 `USE_DATABASE_MODE=false` 可随时切回原有方案 +- 原有代码完整保留,确保回滚路径畅通 +- 数据库数据不影响原有内存缓存功能 + +## 10. 性能指标 + +### 10.1 目标指标 + +- **任务处理延迟**: < 10秒(从pending到processing) +- **轮询响应时间**: < 100ms +- **数据库查询时间**: < 50ms +- **任务成功率**: > 99% +- **内存使用**: < 512MB(单实例) + +### 10.2 监控指标 + +- 各状态任务数量统计 +- 平均任务处理时间 +- 服务器负载情况 +- 数据库连接状态 +- 错误率统计 + +--- + +## 总结 + +这个改造方案将现有的内存缓存架构平滑迁移到MongoDB数据库,保持了接口兼容性,支持多服务器部署,实现了任务的持久化和可靠处理。每个服务器只需要关注自己的任务,简化了架构复杂度,提高了系统的可维护性和可扩展性。 + +## 核对清单 + +### ✅ 已完善的部分 + +1. **接口范围明确**: + - ✅ 明确了新策略适用的接口:`/v1/images/generations`、`/v1/video/generations` 及对应的 query 接口 + - ✅ 明确了保持现状的接口:`/v1/upload/images`(同步接口,不使用缓存策略) + +2. **超时配置完善**: + - ✅ 添加了任务级别的超时配置:`task_timeout` 字段 + - ✅ 区分了图片和视频的超时时间:图片1小时,视频24小时 + - ✅ 实现了超时任务自动标记为失败的机制 + - ✅ 在轮询服务中集成了超时检查逻辑 + +3. **配置文件更新**: + - ✅ 更新了 `template.ecosystem.config.json`,添加了所有新的环境变量 + - ✅ 更新了 `template.docker-compose.yml`,添加了数据库模式和超时配置 + - ✅ 为每个服务实例配置了独立的超时参数 + +4. **数据库设计优化**: + - ✅ 任务表增加了 `task_timeout` 字段 + - ✅ 超时清理逻辑会创建失败结果记录,保证query接口的一致性 + - ✅ 实现了原子操作避免并发冲突 + +### 📋 环境变量配置总览 + +| 变量名 | 默认值 | 说明 | +|--------|--------|------| +| `USE_DATABASE_MODE` | `false` | 数据库模式开关,控制新旧方法切换 | +| `MAX_CONCURRENT_TASKS` | `3` | 最大并发任务数(2核2G服务器建议值) | +| `TASK_POLL_INTERVAL` | `5` | 轮询间隔(秒) | +| `IMAGE_TASK_TIMEOUT` | `3600` | 图片任务超时时间(1小时) | +| `VIDEO_TASK_TIMEOUT` | `86400` | 视频任务超时时间(24小时) | +| `RESULT_EXPIRE_TIME` | `86400` | 结果过期时间(24小时) | +| `SERVICE_ID` | - | 服务唯一标识(必须配置) | + +### 🔄 任务状态流转确认 + +``` +pending → processing → polling → completed/failed + ↓ ↓ ↓ + 超时 超时 超时 + ↓ ↓ ↓ + failed failed failed +``` + +### 🚫 不受影响的接口 + +- `/v1/upload/images` - 同步上传接口,保持现有实现 +- 所有其他同步接口保持不变 +- 现有的 `ImagesTaskCache` 和 `VideoTaskCache` 内存缓存在 `USE_DATABASE_MODE=false` 时继续正常工作 + +### ⚠️ 注意事项 + +1. **渐进式迁移**:通过 `USE_DATABASE_MODE` 环境变量控制,可以随时回滚 +2. **超时处理**:超时任务会自动创建失败结果记录,确保query接口行为一致 +3. **服务隔离**:每个服务只处理自己的任务,通过 `SERVICE_ID` 区分 +4. **轮询分离**:任务轮询服务(5秒间隔)与心跳服务(60秒间隔)完全独立 +5. **配置灵活**:所有关键参数都通过环境变量配置,支持不同环境的差异化设置 \ No newline at end of file diff --git a/src/api/ImagesTaskCache.ts b/src/api/ImagesTaskCache.ts index 85329ed..0507c92 100644 --- a/src/api/ImagesTaskCache.ts +++ b/src/api/ImagesTaskCache.ts @@ -3,6 +3,8 @@ import path from 'path'; // import { format as dateFormat } from 'date-fns'; const timeZone = 'Asia/Shanghai'; // Beijing Time import { formatInTimeZone } from 'date-fns-tz'; +import TOSService from '@/lib/tos/tos-service.js'; +import logger from '@/lib/logger.js'; const LOG_PATH = path.resolve("./logs/images_task_cache.log"); @@ -23,11 +25,15 @@ export class ImagesTaskCache { private static instance: ImagesTaskCache; private taskCache: Map; private tosProcessedTasks: Set; // 记录已处理TOS上传的任务 + private cleanupInterval: NodeJS.Timeout | null = null; private constructor() { this.taskCache = new Map(); this.tosProcessedTasks = new Set(); cacheLog("ImagesTaskCache initialized"); + + // 启动定时清理任务(每30分钟) + this.startPeriodicCleanup(); } public static getInstance(): ImagesTaskCache { @@ -37,47 +43,181 @@ export class ImagesTaskCache { return ImagesTaskCache.instance; } + /** + * 启动定期清理 + */ + private startPeriodicCleanup(): void { + // 每30分钟清理一次过期任务 + this.cleanupInterval = setInterval(() => { + this.clearExpiredTasks(); + }, 30 * 60 * 1000); // 30分钟 + + cacheLog("Periodic cleanup started for ImagesTaskCache"); + } + + /** + * 停止定期清理 + */ + public stopPeriodicCleanup(): void { + if (this.cleanupInterval) { + clearInterval(this.cleanupInterval); + this.cleanupInterval = null; + cacheLog("Periodic cleanup stopped for ImagesTaskCache"); + } + } + public startTask(taskId: string): void { const startTime = Math.floor(Date.now() / 1000); // Current time in seconds this.taskCache.set(taskId, startTime); cacheLog(`Task started: ${taskId} at ${startTime}`); } - public finishTask(taskId: string, status: -1 | -2 | -3, url:string = ''): void { - if (this.taskCache.has(taskId)) { - this.taskCache.set(taskId, status); - let statusMessage = ''; - switch (status) { - case -1: - { - statusMessage = 'successfully'; - if (url) { - this.taskCache.set(taskId, url); - } - } - break; - case -2: statusMessage = 'failed'; break; - case -3: statusMessage = 'timed out'; break; + /** + * 处理图片URL上传到TOS + * @param imageUrls 图片URL数组 + * @returns TOS URL数组 + */ + private async uploadImagesToTOS(imageUrls: string[]): Promise { + const tosUrls: string[] = []; + + for (const imageUrl of imageUrls) { + try { + // 从URL获取文件名 + const fileName = `image-${Date.now()}-${Math.random().toString(36).substr(2, 9)}.webp`; + // 上传到TOS + const tosUrl = await TOSService.uploadFromUrl(imageUrl, `images/${fileName}`); + tosUrls.push(tosUrl); + logger.info(`图片上传到TOS成功: ${imageUrl} -> ${tosUrl}`); + } catch (error) { + logger.error(`图片上传到TOS失败: ${imageUrl}`, error); + // 如果上传失败,保留原URL + tosUrls.push(imageUrl); } - cacheLog(`Task ${taskId} finished ${statusMessage} (status: ${status})`); - } else { - cacheLog(`Attempted to finish non-existent task: ${taskId}`); } + + return tosUrls; + } + + public async finishTask(taskId: string, status: -1 | -2 | -3, url: string = ''): Promise { + if (!this.taskCache.has(taskId)) { + cacheLog(`Attempted to finish non-existent task: ${taskId}`); + return; + } + + let finalUrl = url; + let statusMessage = ''; + + switch (status) { + case -1: { + statusMessage = 'successfully'; + if (url) { + try { + // 任务成功完成时,自动上传到TOS + cacheLog(`开始上传图片到TOS: ${taskId}`); + const imageUrls = url.split(','); + const tosUrls = await this.uploadImagesToTOS(imageUrls); + finalUrl = tosUrls.join(','); + this.tosProcessedTasks.add(taskId); + cacheLog(`Task ${taskId} TOS上传完成,存储TOS地址: ${finalUrl}`); + } catch (error) { + logger.error(`TOS上传失败,使用原始URL: ${taskId}`, error); + finalUrl = url; // 保留原始URL + cacheLog(`Task ${taskId} TOS上传失败,使用原始URL`); + } + } + break; + } + case -2: statusMessage = 'failed'; break; + case -3: statusMessage = 'timed out'; break; + } + + // 存储最终URL(TOS地址或原始URL) + this.taskCache.set(taskId, finalUrl || status); + cacheLog(`Task ${taskId} finished ${statusMessage} (status: ${status})`); } public getTaskStatus(taskId: string): number | string | undefined { return this.taskCache.get(taskId); } + /** + * 检查任务是否已处理TOS上传(兼容性保持) + */ public isTosProcessed(taskId: string): boolean { return this.tosProcessedTasks.has(taskId); } + /** + * 标记任务为已处理TOS上传(兼容性保持) + */ public markTosProcessed(taskId: string): void { this.tosProcessedTasks.add(taskId); cacheLog(`Task ${taskId} marked as TOS processed`); } + /** + * 获取任务结果并释放缓存 + * @param taskId 任务ID + * @returns 任务结果,如果不存在返回undefined + */ + public getTaskResultAndClear(taskId: string): number | string | undefined { + const result = this.taskCache.get(taskId); + if (result && typeof result === 'string') { + // 只有当任务完成时(返回字符串URL)才清除缓存 + this.taskCache.delete(taskId); + this.tosProcessedTasks.delete(taskId); + cacheLog(`Task ${taskId} result retrieved and cache cleared`); + } + return result; + } + + /** + * 清理过期任务(超过1小时的任务) + * 防止在低配置服务器上内存泄漏 + */ + public clearExpiredTasks(): void { + const now = Math.floor(Date.now() / 1000); + const expiredTime = 3600; // 1尊时 + let clearCount = 0; + + for (const [taskId, status] of this.taskCache.entries()) { + if (typeof status === 'number' && status > 0) { + // 这是一个时间戳,检查是否过期 + if (now - status > expiredTime) { + this.taskCache.delete(taskId); + this.tosProcessedTasks.delete(taskId); + clearCount++; + } + } + } + + if (clearCount > 0) { + cacheLog(`Cleared ${clearCount} expired tasks`); + } + } + + /** + * 获取缓存统计信息 + */ + public getCacheStats(): { totalTasks: number, completedTasks: number, pendingTasks: number } { + let completedTasks = 0; + let pendingTasks = 0; + + for (const [, status] of this.taskCache.entries()) { + if (typeof status === 'string') { + completedTasks++; + } else if (typeof status === 'number' && status > 0) { + pendingTasks++; + } + } + + return { + totalTasks: this.taskCache.size, + completedTasks, + pendingTasks + }; + } + public getPendingTasks(): string[] { const pendingTasks: string[] = []; for (const [taskId, status] of this.taskCache.entries()) { @@ -95,6 +235,13 @@ export class ImagesTaskCache { } else { cacheLog("No pending tasks at shutdown."); } + + // 关闭时停止定时清理并进行最终清理 + this.stopPeriodicCleanup(); + this.clearExpiredTasks(); + + const stats = this.getCacheStats(); + cacheLog(`Final cache stats - Total: ${stats.totalTasks}, Completed: ${stats.completedTasks}, Pending: ${stats.pendingTasks}`); } } diff --git a/src/api/VideoTaskCache.ts b/src/api/VideoTaskCache.ts index cd44ddc..b5e0bdb 100644 --- a/src/api/VideoTaskCache.ts +++ b/src/api/VideoTaskCache.ts @@ -3,6 +3,8 @@ import path from 'path'; // import { format as dateFormat } from 'date-fns'; const timeZone = 'Asia/Shanghai'; // Beijing Time import { formatInTimeZone } from 'date-fns-tz'; +import TOSService from '@/lib/tos/tos-service.js'; +import logger from '@/lib/logger.js'; const LOG_PATH = path.resolve("./logs/video_task_cache.log"); @@ -23,11 +25,15 @@ export class VideoTaskCache { private static instance: VideoTaskCache; private taskCache: Map; private tosProcessedTasks: Set; // 记录已处理TOS上传的任务 + private cleanupInterval: NodeJS.Timeout | null = null; private constructor() { this.taskCache = new Map(); this.tosProcessedTasks = new Set(); cacheLog("VideoTaskCache initialized"); + + // 启动定时清理任务(每30分钟) + this.startPeriodicCleanup(); } public static getInstance(): VideoTaskCache { @@ -37,47 +43,181 @@ export class VideoTaskCache { return VideoTaskCache.instance; } + /** + * 启动定期清理 + */ + private startPeriodicCleanup(): void { + // 每30分钟清理一次过期任务 + this.cleanupInterval = setInterval(() => { + this.clearExpiredTasks(); + }, 30 * 60 * 1000); // 30分钟 + + cacheLog("Periodic cleanup started for VideoTaskCache"); + } + + /** + * 停止定期清理 + */ + public stopPeriodicCleanup(): void { + if (this.cleanupInterval) { + clearInterval(this.cleanupInterval); + this.cleanupInterval = null; + cacheLog("Periodic cleanup stopped for VideoTaskCache"); + } + } + public startTask(taskId: string): void { const startTime = Math.floor(Date.now() / 1000); // Current time in seconds this.taskCache.set(taskId, startTime); cacheLog(`Task started: ${taskId} at ${startTime}`); } - public finishTask(taskId: string, status: -1 | -2 | -3, url:string = ''): void { - if (this.taskCache.has(taskId)) { - this.taskCache.set(taskId, status); - let statusMessage = ''; - switch (status) { - case -1: - { - statusMessage = 'successfully'; - if (url) { - this.taskCache.set(taskId, url); - } - } - break; - case -2: statusMessage = 'failed'; break; - case -3: statusMessage = 'timed out'; break; + /** + * 处理视频URL上传到TOS + * @param videoUrls 视频URL数组 + * @returns TOS URL数组 + */ + private async uploadVideosToTOS(videoUrls: string[]): Promise { + const tosUrls: string[] = []; + + for (const videoUrl of videoUrls) { + try { + // 从URL获取文件名 + const fileName = `video-${Date.now()}-${Math.random().toString(36).substr(2, 9)}.mp4`; + // 上传到TOS + const tosUrl = await TOSService.uploadFromUrl(videoUrl, `videos/${fileName}`); + tosUrls.push(tosUrl); + logger.info(`视频上传到TOS成功: ${videoUrl} -> ${tosUrl}`); + } catch (error) { + logger.error(`视频上传到TOS失败: ${videoUrl}`, error); + // 如果上传失败,保留原URL + tosUrls.push(videoUrl); } - cacheLog(`Task ${taskId} finished ${statusMessage} (status: ${status})`); - } else { - cacheLog(`Attempted to finish non-existent task: ${taskId}`); } + + return tosUrls; + } + + public async finishTask(taskId: string, status: -1 | -2 | -3, url: string = ''): Promise { + if (!this.taskCache.has(taskId)) { + cacheLog(`Attempted to finish non-existent task: ${taskId}`); + return; + } + + let finalUrl = url; + let statusMessage = ''; + + switch (status) { + case -1: { + statusMessage = 'successfully'; + if (url) { + try { + // 任务成功完成时,自动上传到TOS + cacheLog(`开始上传视频到TOS: ${taskId}`); + const videoUrls = url.split(','); + const tosUrls = await this.uploadVideosToTOS(videoUrls); + finalUrl = tosUrls.join(','); + this.tosProcessedTasks.add(taskId); + cacheLog(`Task ${taskId} TOS上传完成,存储TOS地址: ${finalUrl}`); + } catch (error) { + logger.error(`TOS上传失败,使用原始URL: ${taskId}`, error); + finalUrl = url; // 保留原始URL + cacheLog(`Task ${taskId} TOS上传失败,使用原始URL`); + } + } + break; + } + case -2: statusMessage = 'failed'; break; + case -3: statusMessage = 'timed out'; break; + } + + // 存储最终URL(TOS地址或原始URL) + this.taskCache.set(taskId, finalUrl || status); + cacheLog(`Task ${taskId} finished ${statusMessage} (status: ${status})`); } public getTaskStatus(taskId: string): number | string | undefined { return this.taskCache.get(taskId); } + /** + * 检查任务是否已处理TOS上传(兼容性保持) + */ public isTosProcessed(taskId: string): boolean { return this.tosProcessedTasks.has(taskId); } + /** + * 标记任务为已处理TOS上传(兼容性保持) + */ public markTosProcessed(taskId: string): void { this.tosProcessedTasks.add(taskId); cacheLog(`Task ${taskId} marked as TOS processed`); } + /** + * 获取任务结果并释放缓存 + * @param taskId 任务ID + * @returns 任务结果,如果不存在返回undefined + */ + public getTaskResultAndClear(taskId: string): number | string | undefined { + const result = this.taskCache.get(taskId); + if (result && typeof result === 'string') { + // 只有当任务完成时(返回字符串URL)才清除缓存 + this.taskCache.delete(taskId); + this.tosProcessedTasks.delete(taskId); + cacheLog(`Task ${taskId} result retrieved and cache cleared`); + } + return result; + } + + /** + * 清理过期任务(超过1小时的任务) + * 防止在低配置服务器上内存泄漏 + */ + public clearExpiredTasks(): void { + const now = Math.floor(Date.now() / 1000); + const expiredTime = 3600; // 1小时 + let clearCount = 0; + + for (const [taskId, status] of this.taskCache.entries()) { + if (typeof status === 'number' && status > 0) { + // 这是一个时间戳,检查是否过期 + if (now - status > expiredTime) { + this.taskCache.delete(taskId); + this.tosProcessedTasks.delete(taskId); + clearCount++; + } + } + } + + if (clearCount > 0) { + cacheLog(`Cleared ${clearCount} expired tasks`); + } + } + + /** + * 获取缓存统计信息 + */ + public getCacheStats(): { totalTasks: number, completedTasks: number, pendingTasks: number } { + let completedTasks = 0; + let pendingTasks = 0; + + for (const [, status] of this.taskCache.entries()) { + if (typeof status === 'string') { + completedTasks++; + } else if (typeof status === 'number' && status > 0) { + pendingTasks++; + } + } + + return { + totalTasks: this.taskCache.size, + completedTasks, + pendingTasks + }; + } + public getPendingTasks(): string[] { const pendingTasks: string[] = []; for (const [taskId, status] of this.taskCache.entries()) { @@ -95,6 +235,13 @@ export class VideoTaskCache { } else { cacheLog("No pending tasks at shutdown."); } + + // 关闭时停止定时清理并进行最终清理 + this.stopPeriodicCleanup(); + this.clearExpiredTasks(); + + const stats = this.getCacheStats(); + cacheLog(`Final cache stats - Total: ${stats.totalTasks}, Completed: ${stats.completedTasks}, Pending: ${stats.pendingTasks}`); } } diff --git a/src/api/controllers/images.ts b/src/api/controllers/images.ts index 7e3f56b..59b224c 100644 --- a/src/api/controllers/images.ts +++ b/src/api/controllers/images.ts @@ -258,16 +258,16 @@ export async function generateImages( }); const validImageUrls = imageUrls.filter(url => url !== null); if (validImageUrls.length > 0) { - imagesTaskCache.finishTask(task_id, -1, validImageUrls.join(",")); // Success + await imagesTaskCache.finishTask(task_id, -1, validImageUrls.join(",")); // Success } else { // If no valid URLs but no explicit error thrown earlier, consider it a failure. // This could happen if item_list is empty or items don't have video_url. - imagesTaskCache.finishTask(task_id, -2); // Failure + await imagesTaskCache.finishTask(task_id, -2); // Failure throw new APIException(EX.API_IMAGE_GENERATION_FAILED, "图片生成未返回有效链接"); } return validImageUrls; }catch (error) { - imagesTaskCache.finishTask(task_id, -2); // Failure due to exception + await imagesTaskCache.finishTask(task_id, -2); // Failure due to exception throw error; // Re-throw the error to be handled by the caller } } diff --git a/src/api/controllers/video.ts b/src/api/controllers/video.ts index ee4a2a5..821657e 100644 --- a/src/api/controllers/video.ts +++ b/src/api/controllers/video.ts @@ -244,691 +244,20 @@ export async function generateVideo( // Filter out nulls and check if any valid URL was generated const validVideoUrls = videoUrls.filter(url => url !== null); if (validVideoUrls.length > 0) { - videoTaskCache.finishTask(task_id, -1, validVideoUrls.join(",")); // Success + await videoTaskCache.finishTask(task_id, -1, validVideoUrls.join(",")); // Success } else { // If no valid URLs but no explicit error thrown earlier, consider it a failure. // This could happen if item_list is empty or items don't have video_url. - videoTaskCache.finishTask(task_id, -2); // Failure + await videoTaskCache.finishTask(task_id, -2); // Failure throw new APIException(EX.API_VIDEO_GENERATION_FAILED, "视频生成未返回有效链接"); } return validVideoUrls; } catch (error) { - videoTaskCache.finishTask(task_id, -2); // Failure due to exception - throw error; // Re-throw the error to be handled by the caller -} -} -//视频 提升分辨率 -export async function upgradeVideoResolution( - task_id: string, - { - targetVideoId = "", - targetHistoryId = "", - targetSubmitId = "", - originHistoryId = "", - originComponentList = [] - }: { - targetVideoId: string, - targetHistoryId: string, - targetSubmitId: string, - originHistoryId: string, - originComponentList: any[], - }, - refreshToken: string -) { - const videoTaskCache = VideoTaskCache.getInstance(); - videoTaskCache.startTask(task_id); - - try { - - const { totalCredit } = await getCredit(refreshToken); - if (totalCredit <= 0) - await receiveCredit(refreshToken); - - const componentId = util.uuid(); - const originSubmitId = util.uuid(); - //生成视频返回的 historyId item_list[0].video.video_id - let video_id = targetVideoId; - //生成视频返回的 historyId - let pre_historyId = targetHistoryId; - let origin_historyId = originHistoryId; - //生成视频返回的 historyId submit_id - let previewSubmitId = targetSubmitId; - //生成视频返回的 historyId draft_content.component_list[0] - - let origin_component_list_item:any = originComponentList.find((a)=>{ - return a.process_type == 1; - }); - originComponentList.sort((a,b)=>{ - return b.process_type - a.process_type; - }) - let pro_component_list_item:any = originComponentList[0]; - let origin_video_gen_inputs = origin_component_list_item.abilities.gen_video.text_to_video_params.video_gen_inputs[0]; - let prompt = origin_video_gen_inputs.prompt; - let first_frame_image = origin_video_gen_inputs.first_frame_image; - let width = first_frame_image.width; - let height = first_frame_image.height; - let duration = origin_video_gen_inputs.duration_ms; - let metrics_extra = JSON.stringify({ - promptSource: "upscale", - //生成视频返回的 historyId 19680709245698 - originId:origin_historyId, - originSubmitId: originSubmitId, - //返回的 task.first_frame_image 信息 - coverInfo: { - width: first_frame_image.width, - height: first_frame_image.height, - format: "", - imageUri: first_frame_image.image_uri, - imageUrl:first_frame_image.uri, - smartCropLoc: null, - coverUrlMap: {}, - }, - generateTimes: 0, - isDefaultSeed: 1, - previewId: pre_historyId, - //生成视频用的submit_id - previewSubmitId: previewSubmitId, - imageNameMapping: {}, - }); - const { aigc_data } = await request( - "post", - "/mweb/v1/aigc_draft/generate", - refreshToken, - { - params: { - babi_param: encodeURIComponent( - JSON.stringify({ - scenario: "image_video_generation", - feature_key: "text_to_video", - feature_entrance: "to_video", - feature_entrance_detail: "to_image-text_to_video", - }) - ), - }, - data: { - extend: { - m_video_commerce_info: { - resource_id: "generate_video", - resource_id_type: "str", - resource_sub_type: "aigc", - benefit_type: "video_upscale" - }, - root_model: pro_component_list_item.abilities.gen_video.text_to_video_params.model_req_key, - template_id: "", - history_option: {}, - }, - submit_id: util.uuid(), - metrics_extra: metrics_extra, - draft_content: JSON.stringify({ - type: "draft", - id: util.uuid(), - min_version: DRAFT_VERSION, - min_features: [], - is_from_tsn: true, - version: "3.2.2", - main_component_id: componentId, - //上一步生成视频任务返回的 historyId 中 draft_content的内容作为第一项 - component_list: [ - ...originComponentList, - { - type: "video_base_component", - id: componentId, - min_version: DRAFT_V_VERSION, - //上一步生成视频任务返回的 historyId 中 draft_content的内容的id - parent_id: pro_component_list_item.id, - metadata: { - type: "", - id: util.uuid(), - created_platform: 3, - created_platform_version: "", - created_time_in_ms: Date.now(), - created_did: "", - }, - generate_type: "gen_video", - aigc_mode: "workbench", - abilities: { - type: "", - id: util.uuid(), - gen_video:{ - type: "", - id: util.uuid(), - text_to_video_params:{ - type: "", - id: util.uuid(), - video_gen_inputs:[ - { - type: "", - id: util.uuid(), - min_version: DRAFT_V_VERSION, - prompt: prompt, - first_frame_image:{ - type: "image", - id: util.uuid(), - source_from: "upload", - platform_type: 1, - name: "", - image_uri: first_frame_image.image_uri, - width: first_frame_image.width, - height: first_frame_image.height, - format: "", - uri: first_frame_image.image_uri, - }, - lens_motion_type: "", - video_mode:2, - //上一步生成视频任务返回的 historyId 中 的video_id - vid: video_id, - fps:24, - duration_ms:duration, - v2v_opt: { - type: "", - id: util.uuid(), - min_version: "3.1.0", - super_resolution: { - type: "", - id: util.uuid(), - enable: true, - target_width: width*2, - target_height: height*2, - origin_width: width, - origin_height: height, - }, - }, - //上一步生成视频任务返回的 historyId - origin_history_id: pre_historyId, - } - ], - video_aspect_ratio:pro_component_list_item.abilities.gen_video.text_to_video_params.video_aspect_ratio, - model_req_key: pro_component_list_item.abilities.gen_video.text_to_video_params.model_req_key, - }, - scene: "super_resolution", - //上面生成的 metrics_extra - video_task_extra:metrics_extra, - video_ref_params: { - type: "", - id: util.uuid(), - generate_type: 0, - item_id: (7512653500000000000 + Date.now()), - origin_history_id: pre_historyId, - }, - }, - }, - process_type:pro_component_list_item.process_type+1, - }, - ], - }), - }, - } - ); - const historyId = aigc_data.history_record_id; - if (!historyId) - throw new APIException(EX.API_VIDEO_GENERATION_FAILED, "高清 记录ID不存在"); - let status = 20, failCode, item_list = []; - //https://jimeng.jianying.com/mweb/v1/get_history_by_ids? - // - let emptyCount = 30; - while (status === 20) { - await new Promise((resolve) => setTimeout(resolve, 1000)); - const result = await request("post", "/mweb/v1/get_history_by_ids", refreshToken, { - data: { - history_ids: [historyId], - http_common_info: { - aid: Number(DEFAULT_ASSISTANT_ID), - }, - }, - }); - if (!result[historyId]){ - logger.warn(`高清 记录ID不存在: ${historyId} 重试次数: ${emptyCount} res: ${JSON.stringify(result)}`); - emptyCount--; - if(emptyCount<=0){ - throw new APIException(EX.API_HISTORY_EMPTY, "高清 记录不存在: " + JSON.stringify(result)); - }else{ - status = 20; - continue; - } - } - status = result[historyId].status; - failCode = result[historyId].fail_code; - item_list = result[historyId].item_list; - } - if (status === 30) { - if (failCode === '2038') - throw new APIException(EX.API_CONTENT_FILTERED); - else - throw new APIException(EX.API_VIDEO_GENERATION_FAILED); - } - // Assuming success if status is not 30 (failed) and not 20 (pending) - // and item_list is populated. - // A more robust check might be needed depending on actual API behavior for success. - const videoUrls = item_list.map((item) => { - if(!item?.video?.transcoded_video?.origin?.video_url) - return null; - return item.video.transcoded_video.origin.video_url; - }); - - // Filter out nulls and check if any valid URL was generated - const validVideoUrls = videoUrls.filter(url => url !== null); - if (validVideoUrls.length > 0) { - videoTaskCache.finishTask(task_id, -1, validVideoUrls.join(",")); // Success - } else { - // If no valid URLs but no explicit error thrown earlier, consider it a failure. - // This could happen if item_list is empty or items don't have video_url. - videoTaskCache.finishTask(task_id, -2); // Failure - throw new APIException(EX.API_VIDEO_GENERATION_FAILED, "高清 视频生成未返回有效链接"); - } - return validVideoUrls; -} catch (error) { - videoTaskCache.finishTask(task_id, -2); // Failure due to exception - throw error; // Re-throw the error to be handled by the caller -} -} - -//视频 补帧 (未完成) -export async function upgradeVideoFrame( - _model: string, - task_id: string, - prompt: string, - { - width = 512, - height = 512, - imgURL = "", - duration = 5000, - }: { - width: number; - height: number; - imgURL: string; - duration: number; - }, - refreshToken: string -) { - const videoTaskCache = VideoTaskCache.getInstance(); - videoTaskCache.startTask(task_id); - - try { - if(!imgURL){ - throw new APIException(EX.API_REQUEST_PARAMS_INVALID); - return; - } - const model = getModel(_model); - logger.info(`使用模型: ${_model} : ${model} 参考图片尺寸: ${width}x${height} 图片地址 ${imgURL} 持续时间: ${duration} 提示词: ${prompt}`); - - const { totalCredit } = await getCredit(refreshToken); - if (totalCredit <= 0) - await receiveCredit(refreshToken); - - const componentId = util.uuid(); - const originSubmitId = util.uuid(); - const { aigc_data } = await request( - "post", - "/mweb/v1/aigc_draft/generate", - refreshToken, - { - params: { - babi_param: encodeURIComponent( - JSON.stringify({ - scenario: "image_video_generation", - feature_key: "text_to_video", - feature_entrance: "to_video", - feature_entrance_detail: "to_image-text_to_video", - }) - ), - }, - data: { - extend: { - m_video_commerce_info: { - resource_id: "generate_video", - resource_id_type: "str", - resource_sub_type: "aigc", - benefit_type: "basic_video_operation_vgfm_v_three" - }, - root_model: model, - template_id: "", - history_option: {}, - }, - submit_id: util.uuid(), - metrics_extra: JSON.stringify({ - promptSource: "custom", - originSubmitId: originSubmitId, - isDefaultSeed: 1, - originTemplateId: "", - imageNameMapping: {}, - }), - draft_content: JSON.stringify({ - type: "draft", - id: util.uuid(), - min_version: DRAFT_VERSION, - min_features: [], - is_from_tsn: true, - version: "3.2.2", - main_component_id: componentId, - component_list: [ - { - type: "video_base_component", - id: componentId, - min_version: DRAFT_V_VERSION, - generate_type: "gen_video", - aigc_mode: "workbench", - metadata: { - type: "", - id: util.uuid(), - created_platform: 3, - created_platform_version: "", - created_time_in_ms: Date.now(), - created_did: "", - }, - abilities: { - type: "", - id: util.uuid(), - gen_video:{ - type: "", - id: util.uuid(), - text_to_video_params:{ - type: "", - id: util.uuid(), - video_gen_inputs:[ - { - type: "", - id: util.uuid(), - min_version: DRAFT_V_VERSION, - prompt: prompt, - first_frame_image:{ - type: "image", - id: util.uuid(), - source_from: "upload", - platform_type: 1, - name: "", - image_uri: imgURL, - width: width, - height: height, - format: "", - uri: imgURL, - }, - video_mode:2, - fps:24, - duration_ms:duration, - } - ], - video_aspect_ratio:"9:16", - seed: Math.floor(Math.random() * 100000000) + 2500000000, - model_req_key: model, - }, - video_task_extra:{ - promptSource: "custom", - originSubmitId: originSubmitId, - isDefaultSeed: 1, - originTemplateId: "", - imageNameMapping: {}, - } - }, - }, - process_type:1, - }, - ], - }), - http_common_info: { - aid: Number(DEFAULT_ASSISTANT_ID), - }, - }, - } - ); - const historyId = aigc_data.history_record_id; - if (!historyId) - throw new APIException(EX.API_VIDEO_GENERATION_FAILED, "记录ID不存在"); - let status = 20, failCode, item_list = []; - //https://jimeng.jianying.com/mweb/v1/get_history_by_ids? - // - while (status === 20) { - await new Promise((resolve) => setTimeout(resolve, 1000)); - const result = await request("post", "/mweb/v1/get_history_by_ids", refreshToken, { - data: { - history_ids: [historyId], - http_common_info: { - aid: Number(DEFAULT_ASSISTANT_ID), - }, - }, - }); - if (!result[historyId]) - throw new APIException(EX.API_HISTORY_EMPTY, "记录不存在"); - status = result[historyId].status; - failCode = result[historyId].fail_code; - item_list = result[historyId].item_list; - } - if (status === 30) { - if (failCode === '2038') - throw new APIException(EX.API_CONTENT_FILTERED); - else - throw new APIException(EX.API_VIDEO_GENERATION_FAILED); - } - // Assuming success if status is not 30 (failed) and not 20 (pending) - // and item_list is populated. - // A more robust check might be needed depending on actual API behavior for success. - const videoUrls = item_list.map((item) => { - if(!item?.video?.transcoded_video?.origin?.video_url) - return null; - return item.video.transcoded_video.origin.video_url; - }); - - // Filter out nulls and check if any valid URL was generated - const validVideoUrls = videoUrls.filter(url => url !== null); - if (validVideoUrls.length > 0) { - videoTaskCache.finishTask(task_id, -1, validVideoUrls.join(",")); // Success - } else { - // If no valid URLs but no explicit error thrown earlier, consider it a failure. - // This could happen if item_list is empty or items don't have video_url. - videoTaskCache.finishTask(task_id, -2); // Failure - throw new APIException(EX.API_VIDEO_GENERATION_FAILED, "视频生成未返回有效链接"); - } - return validVideoUrls; -} catch (error) { - videoTaskCache.finishTask(task_id, -2); // Failure due to exception - throw error; // Re-throw the error to be handled by the caller -} -} - -//视频 生成音效 (未完成) -export async function generateVideoSound( - _model: string, - task_id: string, - prompt: string, - { - width = 512, - height = 512, - imgURL = "", - duration = 5000, - }: { - width: number; - height: number; - imgURL: string; - duration: number; - }, - refreshToken: string -) { - const videoTaskCache = VideoTaskCache.getInstance(); - videoTaskCache.startTask(task_id); - - try { - if(!imgURL){ - throw new APIException(EX.API_REQUEST_PARAMS_INVALID); - return; - } - const model = getModel(_model); - logger.info(`使用模型: ${_model} : ${model} 参考图片尺寸: ${width}x${height} 图片地址 ${imgURL} 持续时间: ${duration} 提示词: ${prompt}`); - - const { totalCredit } = await getCredit(refreshToken); - if (totalCredit <= 0) - await receiveCredit(refreshToken); - - const componentId = util.uuid(); - const originSubmitId = util.uuid(); - const { aigc_data } = await request( - "post", - "/mweb/v1/aigc_draft/generate", - refreshToken, - { - params: { - babi_param: encodeURIComponent( - JSON.stringify({ - scenario: "image_video_generation", - feature_key: "text_to_video", - feature_entrance: "to_video", - feature_entrance_detail: "to_image-text_to_video", - }) - ), - }, - data: { - extend: { - m_video_commerce_info: { - resource_id: "generate_video", - resource_id_type: "str", - resource_sub_type: "aigc", - benefit_type: "basic_video_operation_vgfm_v_three" - }, - root_model: model, - template_id: "", - history_option: {}, - }, - submit_id: util.uuid(), - metrics_extra: JSON.stringify({ - promptSource: "custom", - originSubmitId: originSubmitId, - isDefaultSeed: 1, - originTemplateId: "", - imageNameMapping: {}, - }), - draft_content: JSON.stringify({ - type: "draft", - id: util.uuid(), - min_version: DRAFT_VERSION, - min_features: [], - is_from_tsn: true, - version: "3.2.2", - main_component_id: componentId, - component_list: [ - { - type: "video_base_component", - id: componentId, - min_version: DRAFT_V_VERSION, - generate_type: "gen_video", - aigc_mode: "workbench", - metadata: { - type: "", - id: util.uuid(), - created_platform: 3, - created_platform_version: "", - created_time_in_ms: Date.now(), - created_did: "", - }, - abilities: { - type: "", - id: util.uuid(), - gen_video:{ - type: "", - id: util.uuid(), - text_to_video_params:{ - type: "", - id: util.uuid(), - video_gen_inputs:[ - { - type: "", - id: util.uuid(), - min_version: DRAFT_V_VERSION, - prompt: prompt, - first_frame_image:{ - type: "image", - id: util.uuid(), - source_from: "upload", - platform_type: 1, - name: "", - image_uri: imgURL, - width: width, - height: height, - format: "", - uri: imgURL, - }, - video_mode:2, - fps:24, - duration_ms:duration, - } - ], - video_aspect_ratio:"9:16", - seed: Math.floor(Math.random() * 100000000) + 2500000000, - model_req_key: model, - }, - video_task_extra:{ - promptSource: "custom", - originSubmitId: originSubmitId, - isDefaultSeed: 1, - originTemplateId: "", - imageNameMapping: {}, - } - }, - }, - process_type:1, - }, - ], - }), - http_common_info: { - aid: Number(DEFAULT_ASSISTANT_ID), - }, - }, - } - ); - const historyId = aigc_data.history_record_id; - if (!historyId) - throw new APIException(EX.API_VIDEO_GENERATION_FAILED, "记录ID不存在"); - let status = 20, failCode, item_list = []; - //https://jimeng.jianying.com/mweb/v1/get_history_by_ids? - // - while (status === 20) { - await new Promise((resolve) => setTimeout(resolve, 1000)); - const result = await request("post", "/mweb/v1/get_history_by_ids", refreshToken, { - data: { - history_ids: [historyId], - http_common_info: { - aid: Number(DEFAULT_ASSISTANT_ID), - }, - }, - }); - if (!result[historyId]) - throw new APIException(EX.API_HISTORY_EMPTY, "记录不存在"); - status = result[historyId].status; - failCode = result[historyId].fail_code; - item_list = result[historyId].item_list; - } - if (status === 30) { - if (failCode === '2038') - throw new APIException(EX.API_CONTENT_FILTERED); - else - throw new APIException(EX.API_VIDEO_GENERATION_FAILED); - } - // Assuming success if status is not 30 (failed) and not 20 (pending) - // and item_list is populated. - // A more robust check might be needed depending on actual API behavior for success. - const videoUrls = item_list.map((item) => { - if(!item?.video?.transcoded_video?.origin?.video_url) - return null; - return item.video.transcoded_video.origin.video_url; - }); - - // Filter out nulls and check if any valid URL was generated - const validVideoUrls = videoUrls.filter(url => url !== null); - if (validVideoUrls.length > 0) { - videoTaskCache.finishTask(task_id, -1, validVideoUrls.join(",")); // Success - } else { - // If no valid URLs but no explicit error thrown earlier, consider it a failure. - // This could happen if item_list is empty or items don't have video_url. - videoTaskCache.finishTask(task_id, -2); // Failure - throw new APIException(EX.API_VIDEO_GENERATION_FAILED, "视频生成未返回有效链接"); - } - return validVideoUrls; -} catch (error) { - videoTaskCache.finishTask(task_id, -2); // Failure due to exception + await videoTaskCache.finishTask(task_id, -2); // Failure due to exception throw error; // Re-throw the error to be handled by the caller } } export default { generateVideo, - upgradeVideoResolution, - // upgradeVideoFrame, - // generateVideoSound, }; diff --git a/src/api/routes/images.ts b/src/api/routes/images.ts index 6a349f2..252cf13 100644 --- a/src/api/routes/images.ts +++ b/src/api/routes/images.ts @@ -5,34 +5,6 @@ import { generateImages } from "@/api/controllers/images.ts"; import { tokenSplit } from "@/api/controllers/core.ts"; import util from "@/lib/util.ts"; import { ImagesTaskCache } from '@/api/ImagesTaskCache.ts'; -import TOSService from "@/lib/tos/tos-service.ts"; -import logger from "@/lib/logger.ts"; - -/** - * 处理图片URL上传到TOS - * @param imageUrls 图片URL数组 - * @returns TOS URL数组 - */ -async function uploadImagesToTOS(imageUrls: string[]): Promise { - const tosUrls: string[] = []; - - for (const imageUrl of imageUrls) { - try { - // 从URL获取文件名 - const fileName = `image-${Date.now()}-${Math.random().toString(36).substr(2, 9)}.webp`; - // 上传到TOS - const tosUrl = await TOSService.uploadFromUrl(imageUrl, `images/${fileName}`); - tosUrls.push(tosUrl); - logger.info(`图片上传到TOS成功: ${imageUrl} -> ${tosUrl}`); - } catch (error) { - logger.error(`图片上传到TOS失败: ${imageUrl}`, error); - // 如果上传失败,保留原URL - tosUrls.push(imageUrl); - } - } - - return tosUrls; -} export default { prefix: "/v1/images", @@ -44,44 +16,22 @@ export default { const { task_id, } = request.query; // 从 query 中获取 - let res = imagesTaskCache.getTaskStatus(task_id); - // console.log("查询任务状态", task_id, 'res:',res); - if(typeof res === 'string'){ - // 任务已完成,检查是否已处理TOS上传 - if (!imagesTaskCache.isTosProcessed(task_id)) { - // 尚未处理TOS上传,处理图片URL上传到TOS - try { - const imageUrls = res.split(','); - const tosUrls = await uploadImagesToTOS(imageUrls); - const tosUrlsString = tosUrls.join(','); - - // 更新缓存中TOS URL并标记为已处理 - imagesTaskCache.finishTask(task_id, -1, tosUrlsString); - imagesTaskCache.markTosProcessed(task_id); - - return { - created: util.unixTimestamp(), - data:{task_id, url: tosUrlsString, status:-1}, - }; - } catch (error) { - logger.error(`处理图片TOS上传失败: ${task_id}`, error); - // 如果上传失败,返回原始URL - return { - created: util.unixTimestamp(), - data:{task_id, url:res, status:-1}, - }; - } - } else { - // 已处理TOS上传,直接返回缓存的TOS URL - return { - created: util.unixTimestamp(), - data:{task_id, url:res, status:-1}, - }; - } - }else{ + + // 使用新的方法获取任务结果并清理缓存 + let res = imagesTaskCache.getTaskResultAndClear(task_id); + + if (typeof res === 'string') { + // 任务已完成,返回TOS地址(已经在finishTask中处理过) return { created: util.unixTimestamp(), - data:{task_id, url:"", status:res||0}, + data: { task_id, url: res, status: -1 }, + }; + } else { + // 任务进行中或失败,不清理缓存 + res = imagesTaskCache.getTaskStatus(task_id); + return { + created: util.unixTimestamp(), + data: { task_id, url: "", status: res || 0 }, }; } }, diff --git a/src/api/routes/video.ts b/src/api/routes/video.ts index 6ffe46a..aecc1cb 100644 --- a/src/api/routes/video.ts +++ b/src/api/routes/video.ts @@ -1,38 +1,10 @@ import _ from "lodash"; import Request from "@/lib/request/Request.ts"; -import { generateVideo, upgradeVideoResolution } from "@/api/controllers/video.ts"; +import { generateVideo } from "@/api/controllers/video.ts"; import { tokenSplit } from "@/api/controllers/core.ts"; import util from "@/lib/util.ts"; import { VideoTaskCache } from '@/api/VideoTaskCache.ts'; -import TOSService from "@/lib/tos/tos-service.ts"; -import logger from "@/lib/logger.ts"; - -/** - * 处理视频URL上传到TOS - * @param videoUrls 视频URL数组 - * @returns TOS URL数组 - */ -async function uploadVideosToTOS(videoUrls: string[]): Promise { - const tosUrls: string[] = []; - - for (const videoUrl of videoUrls) { - try { - // 从URL获取文件名 - const fileName = `video-${Date.now()}-${Math.random().toString(36).substr(2, 9)}.mp4`; - // 上传到TOS - const tosUrl = await TOSService.uploadFromUrl(videoUrl, `videos/${fileName}`); - tosUrls.push(tosUrl); - logger.info(`视频上传到TOS成功: ${videoUrl} -> ${tosUrl}`); - } catch (error) { - logger.error(`视频上传到TOS失败: ${videoUrl}`, error); - // 如果上传失败,保留原URL - tosUrls.push(videoUrl); - } - } - - return tosUrls; -} export default { prefix: "/v1/video", @@ -44,44 +16,22 @@ export default { const { task_id, } = request.query; // 从 query 中获取 - let res = videoTaskCache.getTaskStatus(task_id); - // console.log("查询任务状态", task_id, 'res:',res); - if(typeof res === 'string'){ - // 任务已完成,检查是否已处理TOS上传 - if (!videoTaskCache.isTosProcessed(task_id)) { - // 尚未处理TOS上传,处理视频URL上传到TOS - try { - const videoUrls = res.split(','); - const tosUrls = await uploadVideosToTOS(videoUrls); - const tosUrlsString = tosUrls.join(','); - - // 更新缓存中TOS URL并标记为已处理 - videoTaskCache.finishTask(task_id, -1, tosUrlsString); - videoTaskCache.markTosProcessed(task_id); - - return { - created: util.unixTimestamp(), - data:{task_id, url: tosUrlsString, status:-1}, - }; - } catch (error) { - logger.error(`处理视频TOS上传失败: ${task_id}`, error); - // 如果上传失败,返回原始URL - return { - created: util.unixTimestamp(), - data:{task_id, url:res, status:-1}, - }; - } - } else { - // 已处理TOS上传,直接返回缓存的TOS URL - return { - created: util.unixTimestamp(), - data:{task_id, url:res, status:-1}, - }; - } - }else{ + + // 使用新的方法获取任务结果并清理缓存 + let res = videoTaskCache.getTaskResultAndClear(task_id); + + if (typeof res === 'string') { + // 任务已完成,返回TOS地址(已经在finishTask中处理过) return { created: util.unixTimestamp(), - data:{task_id, url:"", status:res||0}, + data: { task_id, url: res, status: -1 }, + }; + } else { + // 任务进行中或失败,不清理缓存 + res = videoTaskCache.getTaskStatus(task_id); + return { + created: util.unixTimestamp(), + data: { task_id, url: "", status: res || 0 }, }; } }, @@ -126,44 +76,5 @@ export default { data:'success', }; }, - "/upscale": async (request: Request) => { - request - .validate("body.task_id", _.isString) - .validate("body.targetVideoId", _.isString) - .validate("body.targetHistoryId", _.isString) - .validate("body.targetSubmitId", _.isString) - .validate("body.originHistoryId", _.isString) - .validate("body.components", _.isString) - .validate("headers.authorization", _.isString); - // refresh_token切分 必须和generations使用同一个token - const tokens = tokenSplit(request.headers.authorization); - // 取第一个 必须和generations使用同一个token - const token = tokens[0]; - const { - task_id, - targetVideoId, - targetHistoryId, - targetSubmitId, - originHistoryId, - components, - } = request.body; - const originComponentList = JSON.parse(components); - //不等结果 直接返回 - upgradeVideoResolution(task_id, { - targetVideoId, - targetHistoryId, - targetSubmitId, - originHistoryId, - originComponentList - }, token); - // let data = []; - // data = imageUrls.map((url) => ({ - // url, - // })); - return { - created: util.unixTimestamp(), - data:'success', - }; - }, }, }; diff --git a/template.docker-compose.yml b/template.docker-compose.yml index 5444c3d..76fe12f 100644 --- a/template.docker-compose.yml +++ b/template.docker-compose.yml @@ -23,6 +23,12 @@ services: - TOS_ENDPOINT=${TOS_ENDPOINT:-tos-cn-beijing.volces.com} - HEARTBEAT_ENABLED=${HEARTBEAT_ENABLED:-true} - HEARTBEAT_INTERVAL=${HEARTBEAT_INTERVAL:-30} + - USE_DATABASE_MODE=${USE_DATABASE_MODE:-false} + - MAX_CONCURRENT_TASKS=${MAX_CONCURRENT_TASKS:-3} + - TASK_POLL_INTERVAL=${TASK_POLL_INTERVAL:-5} + - IMAGE_TASK_TIMEOUT=${IMAGE_TASK_TIMEOUT:-3600} + - VIDEO_TASK_TIMEOUT=${VIDEO_TASK_TIMEOUT:-86400} + - RESULT_EXPIRE_TIME=${RESULT_EXPIRE_TIME:-86400} ports: - "${API_PORT:-3302}:3302" volumes: diff --git a/template.ecosystem.config.json b/template.ecosystem.config.json index 8133f84..67a53ae 100644 --- a/template.ecosystem.config.json +++ b/template.ecosystem.config.json @@ -21,7 +21,13 @@ "TOS_REGION": "cn-beijing", "TOS_ENDPOINT": "tos-cn-beijing.volces.com", "HEARTBEAT_ENABLED": true, - "HEARTBEAT_INTERVAL": 30 + "HEARTBEAT_INTERVAL": 30, + "USE_DATABASE_MODE": false, + "MAX_CONCURRENT_TASKS": 3, + "TASK_POLL_INTERVAL": 5, + "IMAGE_TASK_TIMEOUT": 3600, + "VIDEO_TASK_TIMEOUT": 86400, + "RESULT_EXPIRE_TIME": 86400 }, "env_production": { "NODE_ENV": "production", @@ -37,7 +43,13 @@ "TOS_REGION": "cn-beijing", "TOS_ENDPOINT": "tos-cn-beijing.volces.com", "HEARTBEAT_ENABLED": true, - "HEARTBEAT_INTERVAL": 30 + "HEARTBEAT_INTERVAL": 30, + "USE_DATABASE_MODE": false, + "MAX_CONCURRENT_TASKS": 3, + "TASK_POLL_INTERVAL": 5, + "IMAGE_TASK_TIMEOUT": 3600, + "VIDEO_TASK_TIMEOUT": 86400, + "RESULT_EXPIRE_TIME": 86400 }, "log_file": "./logs/combined-3302.log", "out_file": "./logs/out-3302.log", @@ -69,7 +81,13 @@ "TOS_REGION": "cn-beijing", "TOS_ENDPOINT": "tos-cn-beijing.volces.com", "HEARTBEAT_ENABLED": true, - "HEARTBEAT_INTERVAL": 30 + "HEARTBEAT_INTERVAL": 30, + "USE_DATABASE_MODE": false, + "MAX_CONCURRENT_TASKS": 3, + "TASK_POLL_INTERVAL": 5, + "IMAGE_TASK_TIMEOUT": 3600, + "VIDEO_TASK_TIMEOUT": 86400, + "RESULT_EXPIRE_TIME": 86400 }, "env_production": { "NODE_ENV": "production", @@ -85,7 +103,13 @@ "TOS_REGION": "cn-beijing", "TOS_ENDPOINT": "tos-cn-beijing.volces.com", "HEARTBEAT_ENABLED": true, - "HEARTBEAT_INTERVAL": 30 + "HEARTBEAT_INTERVAL": 30, + "USE_DATABASE_MODE": false, + "MAX_CONCURRENT_TASKS": 3, + "TASK_POLL_INTERVAL": 5, + "IMAGE_TASK_TIMEOUT": 3600, + "VIDEO_TASK_TIMEOUT": 86400, + "RESULT_EXPIRE_TIME": 86400 }, "log_file": "./logs/combined-3303.log", "out_file": "./logs/out-3303.log", @@ -117,7 +141,13 @@ "TOS_REGION": "cn-beijing", "TOS_ENDPOINT": "tos-cn-beijing.volces.com", "HEARTBEAT_ENABLED": true, - "HEARTBEAT_INTERVAL": 30 + "HEARTBEAT_INTERVAL": 30, + "USE_DATABASE_MODE": false, + "MAX_CONCURRENT_TASKS": 3, + "TASK_POLL_INTERVAL": 5, + "IMAGE_TASK_TIMEOUT": 3600, + "VIDEO_TASK_TIMEOUT": 86400, + "RESULT_EXPIRE_TIME": 86400 }, "env_production": { "NODE_ENV": "production", @@ -133,7 +163,13 @@ "TOS_REGION": "cn-beijing", "TOS_ENDPOINT": "tos-cn-beijing.volces.com", "HEARTBEAT_ENABLED": true, - "HEARTBEAT_INTERVAL": 30 + "HEARTBEAT_INTERVAL": 30, + "USE_DATABASE_MODE": false, + "MAX_CONCURRENT_TASKS": 3, + "TASK_POLL_INTERVAL": 5, + "IMAGE_TASK_TIMEOUT": 3600, + "VIDEO_TASK_TIMEOUT": 86400, + "RESULT_EXPIRE_TIME": 86400 }, "log_file": "./logs/combined-3304.log", "out_file": "./logs/out-3304.log",