diff --git a/src/api/routes/images.ts b/src/api/routes/images.ts index 252cf13..a66f617 100644 --- a/src/api/routes/images.ts +++ b/src/api/routes/images.ts @@ -5,34 +5,45 @@ import { generateImages } from "@/api/controllers/images.ts"; import { tokenSplit } from "@/api/controllers/core.ts"; import util from "@/lib/util.ts"; import { ImagesTaskCache } from '@/api/ImagesTaskCache.ts'; +import DatabaseGenerationService from '@/lib/services/DatabaseGenerationService.js'; + +// 通过环境变量控制新旧方法 +const USE_DATABASE_MODE = process.env.USE_DATABASE_MODE === 'true'; export default { prefix: "/v1/images", get: { "/query": async (request: Request) => { - const imagesTaskCache = ImagesTaskCache.getInstance(); request - .validate("query.task_id", _.isString) // 从 query 中校验 + .validate("query.task_id", _.isString); // 从 query 中校验 const { task_id, } = request.query; // 从 query 中获取 - // 使用新的方法获取任务结果并清理缓存 - let res = imagesTaskCache.getTaskResultAndClear(task_id); - - if (typeof res === 'string') { - // 任务已完成,返回TOS地址(已经在finishTask中处理过) - return { - created: util.unixTimestamp(), - data: { task_id, url: res, status: -1 }, - }; + if (USE_DATABASE_MODE) { + // 使用新的数据库方法 + return await DatabaseGenerationService.queryTaskResult(task_id); } else { - // 任务进行中或失败,不清理缓存 - res = imagesTaskCache.getTaskStatus(task_id); - return { - created: util.unixTimestamp(), - data: { task_id, url: "", status: res || 0 }, - }; + // 使用原有的内存缓存方法 + const imagesTaskCache = ImagesTaskCache.getInstance(); + + // 使用新的方法获取任务结果并清理缓存 + let res = imagesTaskCache.getTaskResultAndClear(task_id); + + if (typeof res === 'string') { + // 任务已完成,返回TOS地址(已经在finishTask中处理过) + return { + created: util.unixTimestamp(), + data: { task_id, url: res, status: -1 }, + }; + } else { + // 任务进行中或失败,不清理缓存 + res = imagesTaskCache.getTaskStatus(task_id); + return { + created: util.unixTimestamp(), + data: { task_id, url: "", status: res || 0 }, + }; + } } }, }, @@ -64,29 +75,29 @@ export default { response_format, } = request.body; const responseFormat = _.defaultTo(response_format, "url"); - generateImages('jimeng-3.0', task_id, prompt, { - width, - height, - sampleStrength:0.5, - negativePrompt:"", - }, token); - // let data = []; - // if (responseFormat == "b64_json") { - // data = ( - // await Promise.all(imageUrls.map((url) => util.fetchFileBASE64(url))) - // ).map((b64) => ({ b64_json: b64 })); - // } else { - // data = imageUrls.map((url) => ({ - // url, - // })); - // } - // return { - // created: util.unixTimestamp(), - // data, - // }; + + if (USE_DATABASE_MODE) { + // 使用新的数据库方法 + await DatabaseGenerationService.generateImagesV2('jimeng-3.0', task_id, prompt, { + width, + height, + sampleStrength: 0.5, + negativePrompt: "", + response_format: responseFormat + }, token); + } else { + // 使用原有方法(不等待结果) + generateImages('jimeng-3.0', task_id, prompt, { + width, + height, + sampleStrength: 0.5, + negativePrompt: "", + }, token); + } + return { created: util.unixTimestamp(), - data:'success', + data: 'success', }; }, }, diff --git a/src/api/routes/video.ts b/src/api/routes/video.ts index aecc1cb..0a2adad 100644 --- a/src/api/routes/video.ts +++ b/src/api/routes/video.ts @@ -5,34 +5,45 @@ import { generateVideo } from "@/api/controllers/video.ts"; import { tokenSplit } from "@/api/controllers/core.ts"; import util from "@/lib/util.ts"; import { VideoTaskCache } from '@/api/VideoTaskCache.ts'; +import databaseService from '@/lib/services/DatabaseGenerationService.js'; + +// 环境变量控制新旧方法切换 +const USE_DATABASE_MODE = process.env.USE_DATABASE_MODE === 'true'; export default { prefix: "/v1/video", get: { "/query": async (request: Request) => { - const videoTaskCache = VideoTaskCache.getInstance(); request .validate("query.task_id", _.isString) // 从 query 中校验 const { task_id, } = request.query; // 从 query 中获取 - // 使用新的方法获取任务结果并清理缓存 - let res = videoTaskCache.getTaskResultAndClear(task_id); - - if (typeof res === 'string') { - // 任务已完成,返回TOS地址(已经在finishTask中处理过) - return { - created: util.unixTimestamp(), - data: { task_id, url: res, status: -1 }, - }; + if (USE_DATABASE_MODE) { + // 使用数据库模式 + return await databaseService.queryTaskResult(task_id); } else { - // 任务进行中或失败,不清理缓存 - res = videoTaskCache.getTaskStatus(task_id); - return { - created: util.unixTimestamp(), - data: { task_id, url: "", status: res || 0 }, - }; + // 使用原有的内存缓存模式 + const videoTaskCache = VideoTaskCache.getInstance(); + + // 使用新的方法获取任务结果并清理缓存 + let res = videoTaskCache.getTaskResultAndClear(task_id); + + if (typeof res === 'string') { + // 任务已完成,返回TOS地址(已经在finishTask中处理过) + return { + created: util.unixTimestamp(), + data: { task_id, url: res, status: -1 }, + }; + } else { + // 任务进行中或失败,不清理缓存 + res = videoTaskCache.getTaskStatus(task_id); + return { + created: util.unixTimestamp(), + data: { task_id, url: "", status: res || 0 }, + }; + } } }, }, @@ -47,6 +58,7 @@ export default { .validate("body.duration", v => _.isUndefined(v) || _.isFinite(v)) .validate("body.ratio", v => _.isUndefined(v) || _.isString(v)) .validate("headers.authorization", _.isString); + // refresh_token切分 const tokens = tokenSplit(request.headers.authorization); // 随机挑选一个refresh_token @@ -59,21 +71,30 @@ export default { duration, ratio, } = request.body; - // const imageUrls = await generateVideo(model, task_id, prompt, { - //不等结果 直接返回 - generateVideo(task_id, prompt, { - images:images, - isPro:is_pro, - duration:duration*1000, - ratio, - }, token); - // let data = []; - // data = imageUrls.map((url) => ({ - // url, - // })); + + if (USE_DATABASE_MODE) { + // 使用数据库模式 + await databaseService.generateVideoV2(task_id, prompt, { + images: images, + isPro: is_pro, + duration: duration ? duration * 1000 : undefined, + ratio, + }, token); + } else { + // 使用原有的内存缓存模式 + // const imageUrls = await generateVideo(model, task_id, prompt, { + // 不等结果 直接返回 + generateVideo(task_id, prompt, { + images: images, + isPro: is_pro, + duration: duration * 1000, + ratio, + }, token); + } + return { created: util.unixTimestamp(), - data:'success', + data: 'success', }; }, }, diff --git a/src/index.ts b/src/index.ts index 8f38768..92a2e25 100644 --- a/src/index.ts +++ b/src/index.ts @@ -8,6 +8,7 @@ import routes from "@/api/routes/index.ts"; import logger from "@/lib/logger.ts"; import mongoDBManager from "@/lib/database/mongodb.ts"; import heartbeatService from "@/lib/services/HeartbeatService.ts"; +import taskPollingService from "@/lib/services/TaskPollingService.js"; const startupTime = performance.now(); @@ -41,6 +42,17 @@ const startupTime = performance.now(); } } + // 启动任务轮询服务(仅在数据库模式下) + const useDatabaseMode = process.env.USE_DATABASE_MODE === 'true'; + if (useDatabaseMode && mongoDBManager.isMongoConnected()) { + try { + await taskPollingService.start(); + logger.success("Task polling service started"); + } catch (error) { + logger.warn("Failed to start task polling service:", error.message); + } + } + config.service.bindAddress && logger.success("Service bind address:", config.service.bindAddress); })() diff --git a/src/lib/database/models/GenerationResult.ts b/src/lib/database/models/GenerationResult.ts new file mode 100644 index 0000000..92684a0 --- /dev/null +++ b/src/lib/database/models/GenerationResult.ts @@ -0,0 +1,90 @@ +import mongoose, { Schema, Document } from 'mongoose'; + +// 生成结果表数据模型 +export interface IGenerationResult extends Document { + task_id: string; // 关联的任务ID,主键 + task_type: 'image' | 'video'; // 任务类型 + server_id: string; // 处理服务器ID + + // 生成结果 + status: 'success' | 'failed'; // 最终状态 + original_urls: string[]; // 原始URL数组(即梦返回的地址) + tos_urls: string[]; // TOS URL数组(上传后的地址) + + // 处理元数据 + metadata: { + generation_time?: number; // 生成耗时(毫秒) + tos_upload_time?: number; // TOS上传耗时(毫秒) + total_files: number; // 文件总数 + successful_uploads: number; // 成功上传数量 + tos_upload_errors?: string[]; // TOS上传错误信息 + fail_reason?: string; // 失败原因 + }; + + // 时间管理 + created_at: number; // 创建时间戳(秒) + expires_at: number; // 过期时间戳(用于自动清理,默认24小时后) +} + +const GenerationResultSchema: Schema = new Schema({ + task_id: { + type: String, + required: true, + unique: true, + index: true + }, + task_type: { + type: String, + required: true, + enum: ['image', 'video'] + }, + server_id: { + type: String, + required: true, + index: true + }, + status: { + type: String, + required: true, + enum: ['success', 'failed'] + }, + original_urls: { + type: [String], + default: [] + }, + tos_urls: { + type: [String], + default: [] + }, + metadata: { + generation_time: Number, + tos_upload_time: Number, + total_files: { type: Number, required: true }, + successful_uploads: { type: Number, required: true }, + tos_upload_errors: [String], + fail_reason: String + }, + created_at: { + type: Number, + default: () => Math.floor(Date.now() / 1000), + index: true + }, + expires_at: { + type: Number, + required: true, + index: true + } +}, { + collection: 'jimeng_free_generation_results', + timestamps: false // 使用自定义时间戳 +}); + +// 创建索引 +GenerationResultSchema.index({ task_id: 1 }); +GenerationResultSchema.index({ expires_at: 1 }); // 用于过期清理 +GenerationResultSchema.index({ server_id: 1, created_at: 1 }); + +// 设置TTL索引,自动清理过期记录 +GenerationResultSchema.index({ expires_at: 1 }, { expireAfterSeconds: 0 }); + +export default mongoose.model('GenerationResult', GenerationResultSchema); \ No newline at end of file diff --git a/src/lib/database/models/GenerationTask.ts b/src/lib/database/models/GenerationTask.ts new file mode 100644 index 0000000..a51f013 --- /dev/null +++ b/src/lib/database/models/GenerationTask.ts @@ -0,0 +1,152 @@ +import mongoose, { Schema, Document } from 'mongoose'; + +// 生成任务表数据模型 +export interface IGenerationTask extends Document { + task_id: string; // 任务唯一标识,主键 + task_type: 'image' | 'video'; // 任务类型 + server_id: string; // 分配的服务器ID(外部分配,对应SERVICE_ID) + + // 原始请求参数 + original_params: { + model?: string; // 模型名称 + prompt: string; // 提示词 + negative_prompt?: string; // 负向提示词 + width?: number; // 宽度 + height?: number; // 高度 + sample_strength?: number; // 采样强度 + images?: Array<{ // 图片参数(视频生成用) + url: string; + width: number; + height: number; + }>; + is_pro?: boolean; // 是否专业版 + duration?: number; // 时长(毫秒) + ratio?: string; // 比例 + response_format?: string; // 响应格式 + }; + + // 生成过程中的内部参数 + internal_params: { + refresh_token: string; // 认证令牌 + component_id?: string; // 组件ID + history_id?: string; // 即梦平台历史记录ID + mapped_model?: string; // 映射后的模型名 + submit_id?: string; // 提交ID + }; + + // 任务状态和控制 + status: 'pending' | 'processing' | 'polling' | 'completed' | 'failed'; + retry_count: number; // 当前重试次数 + max_retries: number; // 最大重试次数(默认3次) + + // 轮询控制 + next_poll_at?: number; // 下次轮询时间戳(用于控制轮询间隔) + poll_interval: number; // 轮询间隔(秒,默认10秒) + task_timeout: number; // 任务超时时间(秒,图片1小时,视频24小时) + + // 时间戳 + created_at: number; // 创建时间戳(秒) + updated_at: number; // 更新时间戳(秒) + started_at?: number; // 开始处理时间戳(秒) + completed_at?: number; // 完成时间戳(秒) + + // 错误信息 + error_message?: string; // 错误描述 + fail_code?: string; // 即梦平台返回的失败代码 +} + +const GenerationTaskSchema: Schema = new Schema({ + task_id: { + type: String, + required: true, + unique: true, + index: true + }, + task_type: { + type: String, + required: true, + enum: ['image', 'video'] + }, + server_id: { + type: String, + required: true, + index: true + }, + original_params: { + model: String, + prompt: { type: String, required: true }, + negative_prompt: String, + width: Number, + height: Number, + sample_strength: Number, + images: [{ + url: String, + width: Number, + height: Number + }], + is_pro: Boolean, + duration: Number, + ratio: String, + response_format: String + }, + internal_params: { + refresh_token: { type: String, required: true }, + component_id: String, + history_id: String, + mapped_model: String, + submit_id: String + }, + status: { + type: String, + required: true, + enum: ['pending', 'processing', 'polling', 'completed', 'failed'], + default: 'pending', + index: true + }, + retry_count: { + type: Number, + default: 0 + }, + max_retries: { + type: Number, + default: 3 + }, + next_poll_at: { + type: Number, + index: true + }, + poll_interval: { + type: Number, + default: 10 + }, + task_timeout: { + type: Number, + required: true + }, + created_at: { + type: Number, + default: () => Math.floor(Date.now() / 1000), + index: true + }, + updated_at: { + type: Number, + default: () => Math.floor(Date.now() / 1000) + }, + started_at: Number, + completed_at: Number, + error_message: String, + fail_code: String +}, { + collection: 'jimeng_free_generation_tasks', + timestamps: false // 使用自定义时间戳 +}); + +// 创建复合索引 - 用于轮询查询 +GenerationTaskSchema.index({ server_id: 1, status: 1, next_poll_at: 1 }); + +// 创建其他索引 +GenerationTaskSchema.index({ task_id: 1 }); +GenerationTaskSchema.index({ created_at: 1 }); +GenerationTaskSchema.index({ updated_at: 1 }); + +export default mongoose.model('GenerationTask', GenerationTaskSchema); \ No newline at end of file diff --git a/src/lib/services/DatabaseGenerationService.ts b/src/lib/services/DatabaseGenerationService.ts new file mode 100644 index 0000000..e8d851e --- /dev/null +++ b/src/lib/services/DatabaseGenerationService.ts @@ -0,0 +1,380 @@ +import GenerationTask, { IGenerationTask } from '@/lib/database/models/GenerationTask.js'; +import GenerationResult, { IGenerationResult } from '@/lib/database/models/GenerationResult.js'; +import logger from '@/lib/logger.js'; + +/** + * 数据库驱动的生成服务 + * 新的生成方法,支持异步任务处理和结果持久化 + */ +export class DatabaseGenerationService { + private static instance: DatabaseGenerationService; + private currentServerId: string; + + private constructor() { + this.currentServerId = process.env.SERVICE_ID || 'jimeng-free-api'; + } + + public static getInstance(): DatabaseGenerationService { + if (!DatabaseGenerationService.instance) { + DatabaseGenerationService.instance = new DatabaseGenerationService(); + } + return DatabaseGenerationService.instance; + } + + /** + * 图片生成 - 数据库版本 + * 创建任务记录,由轮询服务异步处理 + */ + async generateImagesV2( + model: string, + taskId: string, + prompt: string, + params: { + width?: number; + height?: number; + sampleStrength?: number; + negativePrompt?: string; + response_format?: string; + }, + refreshToken: string + ): Promise { + try { + const currentServerId = this.currentServerId; + const imageTimeout = parseInt(process.env.IMAGE_TASK_TIMEOUT || '3600'); + + // 检查任务是否已存在 + const existingTask = await GenerationTask.findOne({ task_id: taskId }); + if (existingTask) { + logger.warn(`Task ${taskId} already exists, skipping creation`); + return; + } + + // 创建任务记录 + await GenerationTask.create({ + task_id: taskId, + task_type: 'image', + server_id: currentServerId, + original_params: { + model, + prompt, + width: params.width || 1024, + height: params.height || 1024, + sample_strength: params.sampleStrength || 0.5, + negative_prompt: params.negativePrompt || "", + response_format: params.response_format + }, + internal_params: { + refresh_token: refreshToken + }, + status: 'pending', + retry_count: 0, + max_retries: 3, + poll_interval: 10, + task_timeout: imageTimeout, + created_at: Math.floor(Date.now() / 1000), + updated_at: Math.floor(Date.now() / 1000) + }); + + logger.info(`Image task created: ${taskId} for server: ${currentServerId}`); + + } catch (error) { + logger.error(`Failed to create image task ${taskId}:`, error); + throw error; + } + } + + /** + * 视频生成 - 数据库版本 + * 创建任务记录,由轮询服务异步处理 + */ + async generateVideoV2( + taskId: string, + prompt: string, + params: { + images?: Array<{ + url: string; + width: number; + height: number; + }>; + isPro?: boolean; + duration?: number; + ratio?: string; + }, + refreshToken: string + ): Promise { + try { + const currentServerId = this.currentServerId; + const videoTimeout = parseInt(process.env.VIDEO_TASK_TIMEOUT || '86400'); + + // 检查任务是否已存在 + const existingTask = await GenerationTask.findOne({ task_id: taskId }); + if (existingTask) { + logger.warn(`Task ${taskId} already exists, skipping creation`); + return; + } + + // 创建任务记录 + await GenerationTask.create({ + task_id: taskId, + task_type: 'video', + server_id: currentServerId, + original_params: { + prompt, + images: params.images || [], + is_pro: params.isPro || false, + duration: params.duration || 5000, + ratio: params.ratio || '9:16' + }, + internal_params: { + refresh_token: refreshToken + }, + status: 'pending', + retry_count: 0, + max_retries: 3, + poll_interval: 15, // 视频轮询间隔更长 + task_timeout: videoTimeout, + created_at: Math.floor(Date.now() / 1000), + updated_at: Math.floor(Date.now() / 1000) + }); + + logger.info(`Video task created: ${taskId} for server: ${currentServerId}`); + + } catch (error) { + logger.error(`Failed to create video task ${taskId}:`, error); + throw error; + } + } + + /** + * 查询任务结果 + * 从结果表查询,如果存在则返回并清理,否则查询任务状态 + */ + async queryTaskResult(taskId: string): Promise { + try { + // 1. 先查询结果表 + const result = await GenerationResult.findOne({ task_id: taskId }); + + if (result) { + // 找到结果,返回并清理 + const response = { + created: Math.floor(Date.now() / 1000), + data: { + task_id: taskId, + url: result.tos_urls.join(','), + status: result.status === 'success' ? -1 : -2 + } + }; + + // 删除结果记录(一次性消费) + await GenerationResult.deleteOne({ task_id: taskId }); + + logger.info(`Task result retrieved and cleaned: ${taskId}, status: ${result.status}`); + return response; + } + + // 2. 查询任务状态 + const task = await GenerationTask.findOne({ task_id: taskId }); + + if (!task) { + return { + created: Math.floor(Date.now() / 1000), + data: { task_id: taskId, url: "", status: 0 } // 任务不存在 + }; + } + + // 3. 根据任务状态返回 + const statusMap = { + 'pending': 0, + 'processing': 0, + 'polling': 0, + 'failed': -2, + 'completed': -1 // 这种情况理论上不会出现,因为completed会生成result + }; + + const responseStatus = statusMap[task.status] || 0; + + logger.debug(`Task status queried: ${taskId}, status: ${task.status} -> ${responseStatus}`); + + return { + created: Math.floor(Date.now() / 1000), + data: { + task_id: taskId, + url: "", + status: responseStatus + } + }; + + } catch (error) { + logger.error(`Failed to query task result ${taskId}:`, error); + // 发生错误时返回任务不存在状态 + return { + created: Math.floor(Date.now() / 1000), + data: { task_id: taskId, url: "", status: 0 } + }; + } + } + + /** + * 获取任务统计信息 + */ + async getTaskStats(serverId?: string): Promise { + try { + const filter = serverId ? { server_id: serverId } : {}; + + const stats = await GenerationTask.aggregate([ + { $match: filter }, + { + $group: { + _id: "$status", + count: { $sum: 1 } + } + } + ]); + + const result = stats.reduce((acc, curr) => { + acc[curr._id] = curr.count; + return acc; + }, {} as Record); + + // 添加服务器负载信息 + if (serverId) { + result['server_load'] = await this.getServerLoad(serverId); + } + + return result; + } catch (error) { + logger.error('Failed to get task stats:', error); + return {}; + } + } + + /** + * 获取服务器负载 + */ + async getServerLoad(serverId: string): Promise { + try { + return await GenerationTask.countDocuments({ + server_id: serverId, + status: { $in: ['processing', 'polling'] } + }); + } catch (error) { + logger.error(`Failed to get server load for ${serverId}:`, error); + return 0; + } + } + + /** + * 获取任务详情 + */ + async getTaskDetail(taskId: string): Promise { + try { + return await GenerationTask.findOne({ task_id: taskId }); + } catch (error) { + logger.error(`Failed to get task detail ${taskId}:`, error); + return null; + } + } + + /** + * 取消任务 + */ + async cancelTask(taskId: string): Promise { + try { + const result = await GenerationTask.updateOne( + { + task_id: taskId, + status: { $in: ['pending', 'processing', 'polling'] } + }, + { + status: 'failed', + error_message: 'Task cancelled by user', + completed_at: Math.floor(Date.now() / 1000), + updated_at: Math.floor(Date.now() / 1000) + } + ); + + const cancelled = result.modifiedCount > 0; + + if (cancelled) { + logger.info(`Task cancelled: ${taskId}`); + + // 创建取消结果记录 + const currentTime = Math.floor(Date.now() / 1000); + const expireTime = currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400'); + + const task = await GenerationTask.findOne({ task_id: taskId }); + if (task) { + await GenerationResult.create({ + task_id: taskId, + task_type: task.task_type, + server_id: task.server_id, + status: 'failed', + original_urls: [], + tos_urls: [], + metadata: { + total_files: 0, + successful_uploads: 0, + fail_reason: 'Task cancelled by user' + }, + created_at: currentTime, + expires_at: expireTime + }); + } + } + + return cancelled; + } catch (error) { + logger.error(`Failed to cancel task ${taskId}:`, error); + return false; + } + } + + /** + * 清理过期任务和结果 + */ + async cleanupExpiredData(): Promise<{ tasks: number; results: number }> { + try { + const currentTime = Math.floor(Date.now() / 1000); + const taskTimeout = parseInt(process.env.IMAGE_TASK_TIMEOUT || '3600'); // 使用较短的作为默认值 + const cutoffTime = currentTime - taskTimeout * 2; // 清理超过2倍超时时间的任务 + + // 清理过期任务 + const taskResult = await GenerationTask.deleteMany({ + status: { $in: ['completed', 'failed'] }, + completed_at: { $lt: cutoffTime } + }); + + // 清理过期结果(由TTL索引自动处理,这里手动清理作为备份) + const resultResult = await GenerationResult.deleteMany({ + expires_at: { $lt: currentTime } + }); + + const cleanupStats = { + tasks: taskResult.deletedCount || 0, + results: resultResult.deletedCount || 0 + }; + + if (cleanupStats.tasks > 0 || cleanupStats.results > 0) { + logger.info(`Cleanup completed - tasks: ${cleanupStats.tasks}, results: ${cleanupStats.results}`); + } + + return cleanupStats; + } catch (error) { + logger.error('Failed to cleanup expired data:', error); + return { tasks: 0, results: 0 }; + } + } + + /** + * 获取服务信息 + */ + getServiceInfo() { + return { + serverId: this.currentServerId, + type: 'DatabaseGenerationService', + version: '1.0.0' + }; + } +} + +export default DatabaseGenerationService.getInstance(); \ No newline at end of file diff --git a/src/lib/services/TaskPollingService.ts b/src/lib/services/TaskPollingService.ts new file mode 100644 index 0000000..9cf7134 --- /dev/null +++ b/src/lib/services/TaskPollingService.ts @@ -0,0 +1,933 @@ +import fs from 'fs-extra'; +import path from 'path'; +import { formatInTimeZone } from 'date-fns-tz'; +import GenerationTask, { IGenerationTask } from '@/lib/database/models/GenerationTask.js'; +import GenerationResult, { IGenerationResult } from '@/lib/database/models/GenerationResult.js'; +import mongoDBManager from '@/lib/database/mongodb.js'; +import logger from '@/lib/logger.js'; +import TOSService from '@/lib/tos/tos-service.js'; +import { generateImages as originalGenerateImages } from '@/api/controllers/images.js'; +import { generateVideo as originalGenerateVideo } from '@/api/controllers/video.js'; +import { request } from '@/api/controllers/core.js'; + +const timeZone = 'Asia/Shanghai'; +const TASK_POLLING_LOG_PATH = path.resolve("./logs/task_polling.log"); + +function taskLog(message: string) { + try { + const timestamp = formatInTimeZone(new Date(), timeZone, "yyyy-MM-dd HH:mm:ss.SSS"); + const logMessage = `[TaskPolling][${timestamp}] ${message}`; + + fs.ensureDirSync(path.dirname(TASK_POLLING_LOG_PATH)); + fs.appendFileSync(TASK_POLLING_LOG_PATH, logMessage + "\n"); + + // 同时输出到控制台 + logger.info(logMessage); + } catch (err) { + console.error("TaskPolling log write error:", err); + } +} + +export class TaskPollingService { + private static instance: TaskPollingService; + private currentServerId: string; + private pollInterval: NodeJS.Timeout | null = null; + private isRunning: boolean = false; + private maxConcurrentTasks: number; + private cleanupCounter = 0; + + private constructor() { + this.currentServerId = process.env.SERVICE_ID || 'jimeng-free-api'; + this.maxConcurrentTasks = parseInt(process.env.MAX_CONCURRENT_TASKS || '3'); + taskLog("TaskPollingService initialized"); + } + + public static getInstance(): TaskPollingService { + if (!TaskPollingService.instance) { + TaskPollingService.instance = new TaskPollingService(); + } + return TaskPollingService.instance; + } + + /** + * 启动轮询服务 + * 注意:与HeartbeatService分离,使用不同的定时器 + */ + public async start(): Promise { + if (this.isRunning) { + taskLog('Task polling service is already running'); + return; + } + + try { + // 确保MongoDB连接可用 + if (!mongoDBManager.isMongoConnected()) { + taskLog('MongoDB not connected, skipping task polling service'); + return; + } + + const pollIntervalMs = parseInt(process.env.TASK_POLL_INTERVAL || '5') * 1000; + + // 每5秒轮询一次(与心跳服务的60秒区分开) + this.pollInterval = setInterval(async () => { + await this.processTasks(); + }, pollIntervalMs); + + this.isRunning = true; + taskLog(`Task polling service started for server: ${this.currentServerId}, interval: ${pollIntervalMs}ms`); + + // 监听进程退出事件 + process.on('SIGINT', () => this.gracefulShutdown()); + process.on('SIGTERM', () => this.gracefulShutdown()); + + } catch (error) { + taskLog(`Failed to start task polling service: ${error.message}`); + throw error; + } + } + + public async stop(): Promise { + if (!this.isRunning) { + return; + } + + if (this.pollInterval) { + clearInterval(this.pollInterval); + this.pollInterval = null; + } + + this.isRunning = false; + taskLog('Task polling service stopped'); + } + + /** + * 主处理方法 + */ + private async processTasks(): Promise { + try { + const currentTime = Math.floor(Date.now() / 1000); + + // 1. 处理待处理任务 + await this.processPendingTasks(currentTime); + + // 2. 检查轮询任务 + await this.checkPollingTasks(currentTime); + + // 3. 检查和处理超时任务 + await this.checkTimeoutTasks(currentTime); + + // 4. 定期清理(每720次轮询,约1小时) + this.cleanupCounter++; + if (this.cleanupCounter >= 720) { + await this.performCleanup(); + this.cleanupCounter = 0; + } + + } catch (error) { + taskLog(`Task polling error: ${error.message}`); + } + } + + /** + * 检查超时任务 + */ + private async checkTimeoutTasks(currentTime: number): Promise { + try { + const timeoutCount = await DatabaseCleanupService.cleanupTimeoutTasks(); + if (timeoutCount > 0) { + taskLog(`Marked ${timeoutCount} tasks as failed due to timeout`); + } + } catch (error) { + taskLog(`Failed to check timeout tasks: ${error.message}`); + } + } + + /** + * 处理待处理任务 + */ + private async processPendingTasks(currentTime: number): Promise { + try { + // 获取当前服务器负载 + const currentLoad = await this.getCurrentServerLoad(); + if (currentLoad >= this.maxConcurrentTasks) { + return; // 服务器已满载 + } + + const availableSlots = this.maxConcurrentTasks - currentLoad; + + // 获取待处理任务 + const pendingTasks = await GenerationTask.find({ + server_id: this.currentServerId, + status: 'pending' + }) + .sort({ created_at: 1 }) // 先入先出 + .limit(availableSlots); + + for (const task of pendingTasks) { + await this.startTask(task, currentTime); + } + + if (pendingTasks.length > 0) { + taskLog(`Started ${pendingTasks.length} pending tasks`); + } + } catch (error) { + taskLog(`Failed to process pending tasks: ${error.message}`); + } + } + + /** + * 检查轮询任务 + */ + private async checkPollingTasks(currentTime: number): Promise { + try { + // 获取需要轮询的任务 + const pollingTasks = await GenerationTask.find({ + server_id: this.currentServerId, + status: 'polling', + $or: [ + { next_poll_at: { $exists: false } }, + { next_poll_at: { $lte: currentTime } } + ] + }); + + for (const task of pollingTasks) { + await this.pollTaskResult(task, currentTime); + } + + if (pollingTasks.length > 0) { + taskLog(`Polled ${pollingTasks.length} tasks for results`); + } + } catch (error) { + taskLog(`Failed to check polling tasks: ${error.message}`); + } + } + + /** + * 开始处理任务 + */ + private async startTask(task: IGenerationTask, currentTime: number): Promise { + try { + // 原子更新任务状态为processing + const updateResult = await GenerationTask.updateOne( + { task_id: task.task_id, status: 'pending' }, + { + status: 'processing', + started_at: currentTime, + updated_at: currentTime + } + ); + + if (updateResult.modifiedCount === 0) { + return; // 任务已被其他进程处理 + } + + taskLog(`Starting task: ${task.task_id} (${task.task_type})`); + + // 调用原有生成方法获取historyId + let historyId: string; + + if (task.task_type === 'image') { + historyId = await this.callImageGeneration(task); + } else { + historyId = await this.callVideoGeneration(task); + } + + // 更新任务为轮询状态 + await GenerationTask.updateOne( + { task_id: task.task_id }, + { + status: 'polling', + 'internal_params.history_id': historyId, + next_poll_at: currentTime + task.poll_interval, + updated_at: currentTime + } + ); + + taskLog(`Task ${task.task_id} started successfully, history_id: ${historyId}`); + + } catch (error) { + taskLog(`Failed to start task ${task.task_id}: ${error.message}`); + await this.markTaskFailed(task.task_id, error.message); + } + } + + /** + * 轮询任务结果 + */ + private async pollTaskResult(task: IGenerationTask, currentTime: number): Promise { + try { + const historyId = task.internal_params.history_id; + if (!historyId) { + throw new Error('Missing history_id for polling'); + } + + taskLog(`Polling task result: ${task.task_id}, history_id: ${historyId}`); + + // 调用即梦API检查结果 + const result = await request("post", "/mweb/v1/get_history_by_ids", task.internal_params.refresh_token, { + data: { + history_ids: [historyId], + image_info: task.task_type === 'image' ? this.getImageInfo() : undefined, + http_common_info: { + aid: Number(process.env.DEFAULT_ASSISTANT_ID || "513695"), + }, + }, + }); + + if (!result[historyId]) { + // 结果不存在,更新下次轮询时间 + await GenerationTask.updateOne( + { task_id: task.task_id }, + { + next_poll_at: currentTime + task.poll_interval, + updated_at: currentTime + } + ); + return; + } + + const { status, fail_code: failCode, item_list } = result[historyId]; + + if (status === 20) { + // 仍在生成中,更新下次轮询时间 + await GenerationTask.updateOne( + { task_id: task.task_id }, + { + next_poll_at: currentTime + task.poll_interval, + updated_at: currentTime + } + ); + } else if (status === 10 || (status !== 30 && item_list && item_list.length > 0)) { + // 生成完成 + await this.handleGenerationSuccess(task, item_list, currentTime); + } else { + // 生成失败 + const errorMessage = failCode === '2038' ? 'Content filtered' : 'Generation failed'; + await this.handleGenerationFailure(task, errorMessage, failCode, currentTime); + } + + } catch (error) { + taskLog(`Failed to poll task ${task.task_id}: ${error.message}`); + // 轮询出错,更新下次轮询时间,增加重试计数 + await this.handlePollingError(task, error, currentTime); + } + } + + /** + * 处理生成成功 + */ + private async handleGenerationSuccess(task: IGenerationTask, itemList: any[], currentTime: number): Promise { + try { + let originalUrls: string[] = []; + + if (task.task_type === 'image') { + originalUrls = itemList.map((item) => { + if (item?.image?.large_images?.[0]?.image_url) { + return item.image.large_images[0].image_url; + } + return item?.common_attr?.cover_url || null; + }).filter(url => url !== null); + } else { + originalUrls = itemList.map((item) => { + return item?.video?.transcoded_video?.origin?.video_url || null; + }).filter(url => url !== null); + } + + if (originalUrls.length === 0) { + throw new Error('No valid URLs generated'); + } + + taskLog(`Task ${task.task_id} generated ${originalUrls.length} files`); + + // 上传到TOS + const tosUrls = await this.uploadToTOS(originalUrls, task.task_type); + + // 创建结果记录 + const expireTime = currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400'); + + await GenerationResult.create({ + task_id: task.task_id, + task_type: task.task_type, + server_id: task.server_id, + status: 'success', + original_urls: originalUrls, + tos_urls: tosUrls, + metadata: { + generation_time: (currentTime - (task.started_at || task.created_at)) * 1000, + total_files: originalUrls.length, + successful_uploads: tosUrls.length, + tos_upload_errors: tosUrls.length < originalUrls.length ? ['Some uploads failed'] : undefined + }, + created_at: currentTime, + expires_at: expireTime + }); + + // 标记任务完成 + await GenerationTask.updateOne( + { task_id: task.task_id }, + { + status: 'completed', + completed_at: currentTime, + updated_at: currentTime + } + ); + + taskLog(`Task ${task.task_id} completed successfully`); + + } catch (error) { + taskLog(`Failed to handle success for task ${task.task_id}: ${error.message}`); + await this.markTaskFailed(task.task_id, error.message); + } + } + + /** + * 处理生成失败 + */ + private async handleGenerationFailure(task: IGenerationTask, errorMessage: string, failCode?: string, currentTime?: number): Promise { + const now = currentTime || Math.floor(Date.now() / 1000); + + try { + // 创建失败结果记录 + const expireTime = now + parseInt(process.env.RESULT_EXPIRE_TIME || '86400'); + + await GenerationResult.create({ + task_id: task.task_id, + task_type: task.task_type, + server_id: task.server_id, + status: 'failed', + original_urls: [], + tos_urls: [], + metadata: { + total_files: 0, + successful_uploads: 0, + fail_reason: errorMessage + }, + created_at: now, + expires_at: expireTime + }); + + // 标记任务失败 + await GenerationTask.updateOne( + { task_id: task.task_id }, + { + status: 'failed', + error_message: errorMessage, + fail_code: failCode, + completed_at: now, + updated_at: now + } + ); + + taskLog(`Task ${task.task_id} failed: ${errorMessage}`); + + } catch (error) { + taskLog(`Failed to handle failure for task ${task.task_id}: ${error.message}`); + } + } + + /** + * 处理轮询错误 + */ + private async handlePollingError(task: IGenerationTask, error: Error, currentTime: number): Promise { + const newRetryCount = task.retry_count + 1; + + if (newRetryCount >= task.max_retries) { + await this.markTaskFailed(task.task_id, `Polling failed after ${task.max_retries} retries: ${error.message}`); + } else { + // 增加重试计数,延长轮询间隔 + const nextPollDelay = task.poll_interval * Math.pow(2, newRetryCount); // 指数退避 + + await GenerationTask.updateOne( + { task_id: task.task_id }, + { + retry_count: newRetryCount, + next_poll_at: currentTime + nextPollDelay, + updated_at: currentTime + } + ); + } + } + + /** + * 标记任务失败 + */ + private async markTaskFailed(taskId: string, errorMessage: string): Promise { + const currentTime = Math.floor(Date.now() / 1000); + + try { + const task = await GenerationTask.findOne({ task_id: taskId }); + if (task) { + await this.handleGenerationFailure(task, errorMessage, undefined, currentTime); + } + } catch (error) { + taskLog(`Failed to mark task ${taskId} as failed: ${error.message}`); + } + } + + /** + * 调用图片生成API + */ + private async callImageGeneration(task: IGenerationTask): Promise { + const { model, prompt, width = 1024, height = 1024, sample_strength = 0.5, negative_prompt = "" } = task.original_params; + const refreshToken = task.internal_params.refresh_token; + + // 映射模型 + const MODEL_MAP = { + "jimeng-3.1": "high_aes_general_v30l_art_fangzhou:general_v3.0_18b", + "jimeng-3.0": "high_aes_general_v30l:general_v3.0_18b", + "jimeng-2.1": "high_aes_general_v21_L:general_v2.1_L", + "jimeng-2.0-pro": "high_aes_general_v20_L:general_v2.0_L", + "jimeng-2.0": "high_aes_general_v20:general_v2.0", + "jimeng-1.4": "high_aes_general_v14:general_v1.4", + "jimeng-xl-pro": "text2img_xl_sft", + }; + const mappedModel = MODEL_MAP[model] || MODEL_MAP["jimeng-3.0"]; + + const componentId = this.generateUUID(); + const submitId = this.generateUUID(); + + const { aigc_data } = await request( + "post", + "/mweb/v1/aigc_draft/generate", + refreshToken, + { + params: { + babi_param: encodeURIComponent( + JSON.stringify({ + scenario: "image_video_generation", + feature_key: "aigc_to_image", + feature_entrance: "to_image", + feature_entrance_detail: "to_image-" + mappedModel, + }) + ), + }, + data: { + extend: { + root_model: mappedModel, + template_id: "", + }, + submit_id: submitId, + metrics_extra: JSON.stringify({ + templateId: "", + generateCount: 1, + promptSource: "custom", + templateSource: "", + lastRequestId: "", + originRequestId: "", + }), + draft_content: JSON.stringify({ + type: "draft", + id: this.generateUUID(), + min_version: "3.0.2", + is_from_tsn: true, + version: "3.0.2", + main_component_id: componentId, + component_list: [ + { + type: "image_base_component", + id: componentId, + min_version: "3.0.2", + generate_type: "generate", + aigc_mode: "workbench", + abilities: { + type: "", + id: this.generateUUID(), + generate: { + type: "", + id: this.generateUUID(), + core_param: { + type: "", + id: this.generateUUID(), + model: mappedModel, + prompt, + negative_prompt, + seed: Math.floor(Math.random() * 100000000) + 2500000000, + sample_strength, + image_ratio: 1, + large_image_info: { + type: "", + id: this.generateUUID(), + height, + width, + resolution_type: "1k", + }, + }, + history_option: { + type: "", + id: this.generateUUID(), + }, + }, + }, + }, + ], + }), + http_common_info: { + aid: Number(process.env.DEFAULT_ASSISTANT_ID || "513695"), + }, + }, + } + ); + + const historyId = aigc_data.history_record_id; + if (!historyId) { + throw new Error('Failed to get history_record_id from image generation API'); + } + + taskLog(`Image generation started: ${task.task_id}, history_id: ${historyId}`); + return historyId; + } + + /** + * 调用视频生成API + */ + private async callVideoGeneration(task: IGenerationTask): Promise { + const { prompt, images = [], is_pro = false, duration = 5000, ratio = '9:16' } = task.original_params; + const refreshToken = task.internal_params.refresh_token; + + // 映射模型 + const MODEL_MAP = { + "jimeng-v-3.0": "dreamina_ic_generate_video_model_vgfm_3.0", + "jimeng-v-3.0-pro": "dreamina_ic_generate_video_model_vgfm_3.0_pro", + }; + + let _model = "jimeng-v-3.0"; + if (is_pro) { + _model = "jimeng-v-3.0-pro"; + } + const mappedModel = MODEL_MAP[_model] || MODEL_MAP["jimeng-v-3.0"]; + + // 构建视频生成输入 + let video_gen_inputs: any = { + type: "", + id: this.generateUUID(), + min_version: "1.0.0", + prompt: prompt, + video_mode: 2, + fps: 24, + duration_ms: duration, + }; + + if (images && images.length >= 1) { + // 首帧 + video_gen_inputs.first_frame_image = { + type: "image", + id: this.generateUUID(), + source_from: "upload", + platform_type: 1, + name: "", + image_uri: images[0].url, + width: images[0].width, + height: images[0].height, + format: "", + uri: images[0].url, + }; + } + + if (images && images.length >= 2) { + // 尾帧 + video_gen_inputs.end_frame_image = { + type: "image", + id: this.generateUUID(), + source_from: "upload", + platform_type: 1, + name: "", + image_uri: images[1].url, + width: images[1].width, + height: images[1].height, + format: "", + uri: images[1].url, + }; + } + + const componentId = this.generateUUID(); + const originSubmitId = this.generateUUID(); + + const { aigc_data } = await request( + "post", + "/mweb/v1/aigc_draft/generate", + refreshToken, + { + params: { + babi_param: encodeURIComponent( + JSON.stringify({ + scenario: "image_video_generation", + feature_key: "text_to_video", + feature_entrance: "to_video", + feature_entrance_detail: "to_image-text_to_video", + }) + ), + }, + data: { + extend: { + m_video_commerce_info: { + resource_id: "generate_video", + resource_id_type: "str", + resource_sub_type: "aigc", + benefit_type: "basic_video_operation_vgfm_v_three" + }, + root_model: mappedModel, + template_id: "", + history_option: {}, + }, + submit_id: this.generateUUID(), + metrics_extra: JSON.stringify({ + promptSource: "custom", + originSubmitId: originSubmitId, + isDefaultSeed: 1, + originTemplateId: "", + imageNameMapping: {}, + }), + draft_content: JSON.stringify({ + type: "draft", + id: this.generateUUID(), + min_version: "3.0.5", + min_features: [], + is_from_tsn: true, + version: "3.2.2", + main_component_id: componentId, + component_list: [ + { + type: "video_base_component", + id: componentId, + min_version: "1.0.0", + generate_type: "gen_video", + aigc_mode: "workbench", + metadata: { + type: "", + id: this.generateUUID(), + created_platform: 3, + created_platform_version: "", + created_time_in_ms: Date.now(), + created_did: "", + }, + abilities: { + type: "", + id: this.generateUUID(), + gen_video: { + type: "", + id: this.generateUUID(), + text_to_video_params: { + type: "", + id: this.generateUUID(), + video_gen_inputs: [video_gen_inputs], + video_aspect_ratio: ratio, + seed: Math.floor(Math.random() * 100000000) + 2500000000, + model_req_key: mappedModel, + }, + video_task_extra: { + promptSource: "custom", + originSubmitId: originSubmitId, + isDefaultSeed: 1, + originTemplateId: "", + imageNameMapping: {}, + } + }, + }, + process_type: 1, + }, + ], + }), + http_common_info: { + aid: Number(process.env.DEFAULT_ASSISTANT_ID || "513695"), + }, + }, + } + ); + + const historyId = aigc_data.history_record_id; + if (!historyId) { + throw new Error('Failed to get history_record_id from video generation API'); + } + + taskLog(`Video generation started: ${task.task_id}, history_id: ${historyId}`); + return historyId; + } + + /** + * 上传文件到TOS + */ + private async uploadToTOS(urls: string[], taskType: 'image' | 'video'): Promise { + const tosUrls: string[] = []; + + for (const url of urls) { + try { + const fileName = taskType === 'image' + ? `image-${Date.now()}-${Math.random().toString(36).substr(2, 9)}.webp` + : `video-${Date.now()}-${Math.random().toString(36).substr(2, 9)}.mp4`; + + const folder = taskType === 'image' ? 'images' : 'videos'; + const tosUrl = await TOSService.uploadFromUrl(url, `${folder}/${fileName}`); + tosUrls.push(tosUrl); + + taskLog(`${taskType} uploaded to TOS: ${url} -> ${tosUrl}`); + } catch (error) { + taskLog(`${taskType} upload to TOS failed: ${url} - ${error.message}`); + // 如果上传失败,保留原URL + tosUrls.push(url); + } + } + + return tosUrls; + } + + /** + * 获取当前服务器负载 + */ + private async getCurrentServerLoad(): Promise { + return await GenerationTask.countDocuments({ + server_id: this.currentServerId, + status: { $in: ['processing', 'polling'] } + }); + } + + /** + * 获取图片查询信息 + */ + private getImageInfo() { + return { + width: 2048, + height: 2048, + format: "webp", + image_scene_list: [ + { + scene: "normal", + width: 2400, + height: 2400, + uniq_key: "2400", + format: "webp", + }, + { + scene: "normal", + width: 1080, + height: 1080, + uniq_key: "1080", + format: "webp", + }, + { + scene: "normal", + width: 720, + height: 720, + uniq_key: "720", + format: "webp", + }, + ], + }; + } + + /** + * 执行清理任务 + */ + private async performCleanup(): Promise { + try { + const expiredResults = await DatabaseCleanupService.cleanupExpiredResults(); + const timeoutTasks = await DatabaseCleanupService.cleanupTimeoutTasks(); + + taskLog(`Cleanup completed - expired results: ${expiredResults}, timeout tasks: ${timeoutTasks}`); + } catch (error) { + taskLog(`Cleanup failed: ${error.message}`); + } + } + + /** + * 优雅停机 + */ + private async gracefulShutdown(): Promise { + taskLog('Received shutdown signal, stopping task polling service...'); + await this.stop(); + } + + public getServiceInfo() { + return { + serverId: this.currentServerId, + isRunning: this.isRunning, + maxConcurrentTasks: this.maxConcurrentTasks + }; + } + + /** + * 生成UUID + */ + private generateUUID(): string { + return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) { + const r = Math.random() * 16 | 0; + const v = c === 'x' ? r : (r & 0x3 | 0x8); + return v.toString(16); + }); + } +} + +/** + * 数据库清理服务 + */ +export class DatabaseCleanupService { + /** + * 清理过期的结果记录 + */ + static async cleanupExpiredResults(): Promise { + const currentTime = Math.floor(Date.now() / 1000); + + const result = await GenerationResult.deleteMany({ + expires_at: { $lt: currentTime } + }); + + return result.deletedCount || 0; + } + + /** + * 清理超时的任务 + */ + static async cleanupTimeoutTasks(): Promise { + const currentTime = Math.floor(Date.now() / 1000); + + // 查找超时的任务并标记为失败 + const timeoutTasks = await GenerationTask.find({ + status: { $in: ['processing', 'polling'] }, + $expr: { + $gt: [ + { $subtract: [currentTime, '$started_at'] }, + '$task_timeout' + ] + } + }); + + // 批量更新超时任务为失败状态 + const timeoutTaskIds = timeoutTasks.map(task => task.task_id); + + if (timeoutTaskIds.length > 0) { + await GenerationTask.updateMany( + { task_id: { $in: timeoutTaskIds } }, + { + status: 'failed', + error_message: 'Task timeout', + updated_at: currentTime, + completed_at: currentTime + } + ); + + // 为超时任务创建失败结果记录 + const failedResults = timeoutTasks.map(task => ({ + task_id: task.task_id, + task_type: task.task_type, + server_id: task.server_id, + status: 'failed', + original_urls: [], + tos_urls: [], + metadata: { + total_files: 0, + successful_uploads: 0, + fail_reason: 'Task timeout after ' + task.task_timeout + ' seconds' + }, + created_at: currentTime, + expires_at: currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400') + })); + + if (failedResults.length > 0) { + await GenerationResult.insertMany(failedResults); + } + } + + return timeoutTaskIds.length; + } +} + +export default TaskPollingService.getInstance(); \ No newline at end of file