import fs from 'fs-extra';
import path from 'path';
import { formatInTimeZone } from 'date-fns-tz';
import GenerationTask, { IGenerationTask } from '@/lib/database/models/GenerationTask.js';
import GenerationResult, { IGenerationResult } from '@/lib/database/models/GenerationResult.js';
import NeDBManager from '@/lib/database/nedb.js';
import logger from '@/lib/logger.js';
import TOSService from '@/lib/tos/tos-service.js';
import { generateImages as originalGenerateImages } from '@/api/controllers/images.js';
import { generateVideo as originalGenerateVideo } from '@/api/controllers/video.js';
import { request, image4Options_0302, image3Options, image4Options } from '@/api/controllers/core.js';
import EX from "@/api/consts/exceptions.ts";

const timeZone = 'Asia/Shanghai';
const TASK_POLLING_LOG_PATH = path.resolve("./logs/task_polling.log");

/**
 * Appends a timestamped line to the task-polling log file and echoes it
 * through the shared logger. Logging failures are reported on stderr and
 * never propagate to the caller.
 */
function taskLog(message: string) {
  try {
    const timestamp = formatInTimeZone(new Date(), timeZone, "yyyy-MM-dd HH:mm:ss.SSS");
    const logMessage = `[TaskPolling][${timestamp}] ${message}`;
    fs.ensureDirSync(path.dirname(TASK_POLLING_LOG_PATH));
    fs.appendFileSync(TASK_POLLING_LOG_PATH, logMessage + "\n");
    // Also echo to the console logger.
    logger.info(logMessage);
  } catch (err) {
    console.error("TaskPolling log write error:", err);
  }
}

/**
 * Reads an integer environment variable.
 *
 * @param name     Environment variable name.
 * @param fallback Value used when the variable is unset, not a number, or below `min`.
 * @param min      Smallest accepted value (defaults to no lower bound).
 */
function readIntEnv(name: string, fallback: number, min: number = Number.NEGATIVE_INFINITY): number {
  const parsed = Number.parseInt(process.env[name] || '', 10);
  return Number.isNaN(parsed) || parsed < min ? fallback : parsed;
}

/** Lifetime of a result record in seconds (RESULT_EXPIRE_TIME, default 86400). */
function resultExpireSeconds(): number {
  return readIntEnv('RESULT_EXPIRE_TIME', 86400);
}

/** Extracts a printable message from whatever value was thrown. */
function errMsg(error: unknown): string {
  if (error instanceof Error) {
    return error.message;
  }
  if (typeof error === 'object' && error !== null && 'message' in error) {
    return String((error as { message: unknown }).message);
  }
  return String(error);
}

/**
 * Picks the download URL of one generated image item: the first entry of
 * `image.large_images` when present, otherwise the first `image_url` found in
 * any list-valued field of `image`, otherwise `common_attr.cover_url`.
 * Returns null when nothing usable is found.
 */
function pickImageUrl(item: any): string | null {
  const image = item?.image;
  const largeImages = image?.large_images;
  let url: string | null | undefined = null;

  if (largeImages && largeImages.length) {
    url = largeImages[0]?.image_url;
  } else {
    for (const candidate of Object.values<any>(image ?? {})) {
      if (candidate && candidate.length > 0 && candidate[0]?.image_url) {
        url = candidate[0].image_url;
        break;
      }
    }
  }

  return url || item?.common_attr?.cover_url || null;
}

/**
 * Picks the download URL of one generated video item: the first
 * `video_url` found among the entries of `video.transcoded_video`.
 * Returns null when nothing usable is found.
 */
function pickVideoUrl(item: any): string | null {
  const transcoded = item?.video?.transcoded_video || {};
  for (const variant of Object.values<any>(transcoded)) {
    if (variant && variant.video_url) {
      return variant.video_url;
    }
  }
  return null;
}

/**
 * Maps a history `item_list` to the list of non-null source URLs for the
 * given task type ('image' uses the image rules, anything else the video rules).
 */
function extractResultUrls(taskType: string, itemList: any[]): string[] {
  const pick = taskType === 'image' ? pickImageUrl : pickVideoUrl;
  return itemList.map(pick).filter((url): url is string => url !== null);
}

/**
 * Singleton background worker that drives generation tasks stored in the
 * database through their lifecycle:
 * pending -> processing -> polling -> processing_success -> uploading -> completed/failed.
 *
 * Only tasks whose `server_id` matches this server (SERVICE_ID) are handled.
 */
export class TaskPollingService {
  private static instance: TaskPollingService;

  private currentServerId: string;
  private pollInterval: NodeJS.Timeout | null = null;
  private isRunning: boolean = false;
  /** True while a processTasks() pass is in flight; prevents overlapping passes. */
  private isProcessing: boolean = false;
  /** Signal handlers are attached once per process, not once per start(). */
  private signalHandlersRegistered: boolean = false;
  private maxConcurrentTasks: number;
  private cleanupCounter = 0;

  private constructor() {
    this.currentServerId = process.env.SERVICE_ID || 'jimeng-free-api';
    this.maxConcurrentTasks = readIntEnv('MAX_CONCURRENT_TASKS', 3);
    taskLog("TaskPollingService initialized");
  }

  /** Returns the process-wide service instance, creating it on first use. */
  public static getInstance(): TaskPollingService {
    if (!TaskPollingService.instance) {
      TaskPollingService.instance = new TaskPollingService();
    }
    return TaskPollingService.instance;
  }

  /**
   * Starts the polling loop.
   *
   * Initializes the NeDB database, schedules processTasks() every
   * TASK_POLL_INTERVAL seconds (default 10, minimum 1) and attaches
   * SIGINT/SIGTERM handlers. Calling it while already running is a no-op.
   *
   * @throws Re-throws any initialization error after logging it.
   */
  public async start(): Promise<void> {
    if (this.isRunning) {
      taskLog('Task polling service is already running');
      return;
    }

    // Claim the flag before the first await so a concurrent start() cannot
    // create a second interval.
    this.isRunning = true;

    try {
      // Make sure the NeDB database is initialized.
      await NeDBManager.initialize();
      taskLog('NeDB database initialized for task polling service');

      if (!this.isRunning) {
        // stop() was called while the database was initializing.
        return;
      }

      // An invalid or non-positive interval would make setInterval fire every 1ms.
      const pollIntervalMs = readIntEnv('TASK_POLL_INTERVAL', 10, 1) * 1000;

      this.pollInterval = setInterval(() => {
        void this.processTasks();
      }, pollIntervalMs);

      taskLog(`Task polling service started for server: ${this.currentServerId}, interval: ${pollIntervalMs}ms`);

      // Listen for process termination signals.
      if (!this.signalHandlersRegistered) {
        process.on('SIGINT', () => { void this.gracefulShutdown(); });
        process.on('SIGTERM', () => { void this.gracefulShutdown(); });
        this.signalHandlersRegistered = true;
      }
    } catch (error) {
      this.isRunning = false;
      taskLog(`Failed to start task polling service: ${errMsg(error)}`);
      throw error;
    }
  }

  /** Stops the polling loop. A pass that is already in flight is not interrupted. */
  public async stop(): Promise<void> {
    if (!this.isRunning) {
      return;
    }

    if (this.pollInterval) {
      clearInterval(this.pollInterval);
      this.pollInterval = null;
    }

    this.isRunning = false;
    taskLog('Task polling service stopped');
  }

  /**
   * One polling pass: start pending tasks, poll running ones, fail timed-out
   * ones, and run the periodic cleanup every 360 passes (about one hour at
   * the default 10-second interval). Never rejects.
   */
  private async processTasks(): Promise<void> {
    if (this.isProcessing) {
      // The previous pass is still running (e.g. a slow upload); skip this tick
      // so the same task is never handled by two passes at once.
      return;
    }
    this.isProcessing = true;

    try {
      const currentTime = Math.floor(Date.now() / 1000);

      // 1. Start pending tasks.
      await this.processPendingTasks(currentTime);

      // 2. Poll running tasks and retry uploads.
      await this.checkPollingTasks(currentTime);

      // 3. Detect and fail timed-out tasks.
      await this.checkTimeoutTasks(currentTime);

      // 4. Periodic cleanup.
      this.cleanupCounter++;
      if (this.cleanupCounter >= 360) {
        await this.performCleanup();
        this.cleanupCounter = 0;
      }
    } catch (error) {
      taskLog(`Task polling error: ${errMsg(error)}`);
    } finally {
      this.isProcessing = false;
    }
  }

  /** Marks timed-out tasks as failed and logs how many were affected. */
  private async checkTimeoutTasks(currentTime: number): Promise<void> {
    try {
      const timeoutCount = await DatabaseCleanupService.cleanupTimeoutTasks();
      if (timeoutCount > 0) {
        taskLog(`Marked ${timeoutCount} tasks as failed due to timeout`);
      }
    } catch (error) {
      taskLog(`Failed to check timeout tasks: ${errMsg(error)}`);
    }
  }

  /**
   * Starts as many pending tasks (oldest first) as there are free slots
   * under maxConcurrentTasks.
   */
  private async processPendingTasks(currentTime: number): Promise<void> {
    try {
      // Current load of this server.
      const currentLoad = await this.getCurrentServerLoad();
      if (currentLoad >= this.maxConcurrentTasks) {
        return; // Server is fully loaded.
      }

      const availableSlots = this.maxConcurrentTasks - currentLoad;

      const allTasks = await GenerationTask.find({
        server_id: this.currentServerId,
        status: 'pending'
      });

      // Sort and limit in memory: first in, first out.
      const pendingTasks = [...allTasks]
        .sort((a, b) => a.created_at - b.created_at)
        .slice(0, availableSlots);

      for (const task of pendingTasks) {
        await this.startTask(task, currentTime);
      }

      if (pendingTasks.length > 0) {
        taskLog(`Started ${pendingTasks.length} pending tasks`);
      }
    } catch (error) {
      taskLog(`Failed to process pending tasks: ${errMsg(error)}`);
    }
  }

  /**
   * Polls every 'polling' task that is due, then retries every task still in
   * the 'uploading' state.
   */
  private async checkPollingTasks(currentTime: number): Promise<void> {
    try {
      // Only 'polling' tasks are selected; 'processing_success' tasks are skipped.
      const pollingTasks = await GenerationTask.find({
        server_id: this.currentServerId,
        status: 'polling',
        $or: [
          { next_poll_at: { $exists: false } },
          { next_poll_at: { $lte: currentTime } }
        ]
      });

      for (const task of pollingTasks) {
        await this.pollTaskResult(task, currentTime);
      }

      if (pollingTasks.length > 0) {
        taskLog(`Polled ${pollingTasks.length} tasks for results`);
      }

      // Tasks whose upload needs to be retried.
      const uploadingTasks = await GenerationTask.find({
        server_id: this.currentServerId,
        status: 'uploading'
      });

      for (const task of uploadingTasks) {
        await this.retryTOSUpload(task, currentTime);
      }

      if (uploadingTasks.length > 0) {
        taskLog(`Retried upload for ${uploadingTasks.length} tasks`);
      }
    } catch (error) {
      taskLog(`Failed to check polling tasks: ${errMsg(error)}`);
    }
  }

  /** Queries the history endpoint for the given history id using the task's refresh token. */
  private async fetchHistoryRecord(task: IGenerationTask, historyId: string): Promise<any> {
    return request("post", "/mweb/v1/get_history_by_ids", task.internal_params.refresh_token, {
      data: {
        history_ids: [historyId],
        image_info: task.task_type === 'image' ? this.getImageInfo() : undefined,
        http_common_info: {
          aid: Number(process.env.DEFAULT_ASSISTANT_ID || "513695"),
        },
      },
    });
  }

  /**
   * Retries the TOS upload of a task stuck in 'uploading'.
   *
   * The source URLs are not stored on the task, so they are fetched again
   * through the stored history_id. A retry is attempted at most once every
   * 60 seconds (measured from the task's updated_at).
   */
  private async retryTOSUpload(task: IGenerationTask, currentTime: number): Promise<void> {
    try {
      // Avoid retrying too frequently.
      const minRetryInterval = 60;
      const timeSinceLastUpdate = currentTime - task.updated_at;
      if (timeSinceLastUpdate < minRetryInterval) {
        return; // Not yet time to retry.
      }

      taskLog(`Retrying TOS upload for task ${task.task_id}, attempt ${(task.upload_retry_count || 0) + 1}`);

      const historyId = task.internal_params.history_id;
      if (!historyId) {
        throw new Error('Missing history_id for upload retry');
      }

      // Fetch the generation result again.
      const result = await this.fetchHistoryRecord(task, historyId);
      const itemList = result?.[historyId]?.item_list;
      if (!itemList) {
        throw new Error('Failed to retrieve generation result for upload retry');
      }

      const originalUrls = extractResultUrls(task.task_type, itemList);
      if (originalUrls.length === 0) {
        throw new Error('No valid URLs found for upload retry');
      }

      await this.performTOSUpload(task, originalUrls, currentTime);
    } catch (error) {
      taskLog(`Failed to retry TOS upload for task ${task.task_id}: ${errMsg(error)}`);
      await this.handleUploadFailure(task, errMsg(error), currentTime);
    }
  }

  /**
   * Claims a pending task, submits the generation request and moves the task
   * to 'polling' with the returned history id. Any failure marks the task failed.
   */
  private async startTask(task: IGenerationTask, currentTime: number): Promise<void> {
    try {
      // Conditional update: only succeeds while the task is still 'pending'.
      const updateResult = await GenerationTask.updateOne(
        { task_id: task.task_id, status: 'pending' },
        {
          $set: {
            status: 'processing',
            started_at: currentTime,
            updated_at: currentTime
          }
        }
      );

      if (updateResult === 0) {
        return; // Already picked up by another process.
      }

      taskLog(`Starting task: ${task.task_id} (${task.task_type})`);

      // Submit the generation request and obtain the history id.
      const historyId: string = task.task_type === 'image'
        ? await this.callImageGeneration(task)
        : await this.callVideoGeneration(task);

      // Move the task to the polling state.
      await GenerationTask.updateOne(
        { task_id: task.task_id },
        {
          $set: {
            status: 'polling',
            'internal_params.history_id': historyId,
            next_poll_at: currentTime + task.poll_interval,
            updated_at: currentTime
          }
        }
      );

      taskLog(`Task ${task.task_id} started successfully, history_id: ${historyId}`);
    } catch (error) {
      taskLog(`Failed to start task ${task.task_id}: ${errMsg(error)}`);
      await this.markTaskFailed(task.task_id, errMsg(error));
    }
  }

  /**
   * Checks the remote generation status of one task and either reschedules
   * the next poll, hands the result to the success path, or records a failure.
   */
  private async pollTaskResult(task: IGenerationTask, currentTime: number): Promise<void> {
    try {
      const historyId = task.internal_params.history_id;
      if (!historyId) {
        throw new Error('Missing history_id for polling');
      }

      taskLog(`Polling task result: ${task.task_id}, history_id: ${historyId}`);

      const result = await this.fetchHistoryRecord(task, historyId);
      const record = result[historyId];

      // Status 20 is treated as "still generating"; a missing record is handled the same way.
      if (!record || record.status === 20) {
        await GenerationTask.updateOne(
          { task_id: task.task_id },
          {
            $set: {
              next_poll_at: currentTime + task.poll_interval,
              updated_at: currentTime
            }
          }
        );
        return;
      }

      const { status, fail_code: failCode, item_list } = record;

      if (status === 10 || (status !== 30 && item_list && item_list.length > 0)) {
        // Generation finished.
        await this.handleGenerationSuccess(task, item_list, currentTime);
        return;
      }

      // Generation failed; fail codes 2038 and 2041 are reported as content-filtered.
      const code = String(failCode);
      const reason = code === '2038' || code === '2041'
        ? EX.API_CONTENT_FILTERED[1] as string
        : EX.API_IMAGE_GENERATION_FAILED[1] as string;
      await this.handleGenerationFailure(task, reason, failCode, currentTime);
    } catch (error) {
      taskLog(`Failed to poll task ${task.task_id}: ${errMsg(error)}`);
      // Polling itself failed: back off and count the retry.
      await this.handlePollingError(task, error, currentTime);
    }
  }

  /**
   * Handles a finished generation: extracts the result URLs, claims the task
   * by moving it from 'polling' to 'processing_success', then starts the
   * TOS upload. A task with no usable URL is marked failed.
   */
  private async handleGenerationSuccess(task: IGenerationTask, itemList: any[], currentTime: number): Promise<void> {
    try {
      const originalUrls = extractResultUrls(task.task_type, itemList);
      if (originalUrls.length === 0) {
        throw new Error('No valid URLs generated');
      }

      taskLog(`Task ${task.task_id} generated ${originalUrls.length} files`);

      // Claim the task first so the result is not processed twice.
      const updateResult = await GenerationTask.updateOne(
        { task_id: task.task_id, status: 'polling' },
        {
          $set: {
            status: 'processing_success',
            updated_at: currentTime
          }
        }
      );

      if (updateResult === 0) {
        taskLog(`Task ${task.task_id} already being processed by another instance`);
        return;
      }

      await this.startTOSUpload(task, originalUrls, currentTime);
    } catch (error) {
      taskLog(`Failed to handle success for task ${task.task_id}: ${errMsg(error)}`);
      await this.markTaskFailed(task.task_id, errMsg(error));
    }
  }

  /** Moves the task to 'uploading' (retry count reset to 0) and performs the first upload attempt. */
  private async startTOSUpload(task: IGenerationTask, originalUrls: string[], currentTime: number): Promise<void> {
    try {
      await GenerationTask.updateOne(
        { task_id: task.task_id },
        {
          $set: {
            status: 'uploading',
            upload_retry_count: 0,
            updated_at: currentTime
          }
        }
      );

      await this.performTOSUpload(task, originalUrls, currentTime);
    } catch (error) {
      taskLog(`Failed to start TOS upload for task ${task.task_id}: ${errMsg(error)}`);
      await this.handleUploadFailure(task, errMsg(error), currentTime);
    }
  }

  /**
   * Uploads all source URLs to TOS, stores a 'success' result record and
   * marks the task 'completed'. Any error is routed to handleUploadFailure
   * and not re-thrown.
   */
  private async performTOSUpload(task: IGenerationTask, originalUrls: string[], currentTime: number): Promise<void> {
    try {
      const tosUrls = await this.uploadToTOSWithRetry(originalUrls, task.task_type, task.task_id);

      // Create the result record.
      await GenerationResult.create({
        task_id: task.task_id,
        task_type: task.task_type,
        server_id: task.server_id,
        status: 'success',
        original_urls: originalUrls,
        tos_urls: tosUrls,
        metadata: {
          generation_time: (currentTime - (task.started_at || task.created_at)) * 1000,
          total_files: originalUrls.length,
          successful_uploads: tosUrls.length,
          upload_retry_count: task.upload_retry_count || 0,
          tos_upload_errors: tosUrls.length < originalUrls.length ? ['Some uploads failed'] : undefined
        },
        created_at: currentTime,
        expires_at: currentTime + resultExpireSeconds(),
        is_read: false,
        read_count: 0
      });

      // Mark the task as completed.
      await GenerationTask.updateOne(
        { task_id: task.task_id },
        {
          $set: {
            status: 'completed',
            completed_at: currentTime,
            updated_at: currentTime
          }
        }
      );

      taskLog(`Task ${task.task_id} completed successfully`);
    } catch (error) {
      taskLog(`TOS upload failed for task ${task.task_id}: ${errMsg(error)}`);
      await this.handleUploadFailure(task, errMsg(error), currentTime);
    }
  }

  /**
   * Records a failed upload attempt. Below the retry limit
   * (task.max_upload_retries, default 5) the retry counter is incremented and
   * the task stays in 'uploading'; at the limit a 'failed' result is stored
   * and the task is marked failed.
   */
  private async handleUploadFailure(task: IGenerationTask, errorMessage: string, currentTime: number): Promise<void> {
    const currentRetryCount = task.upload_retry_count || 0;
    const maxRetries = task.max_upload_retries || 5;

    if (currentRetryCount < maxRetries) {
      // Count the retry and keep the task in the uploading state.
      await GenerationTask.updateOne(
        { task_id: task.task_id },
        {
          $set: {
            upload_retry_count: currentRetryCount + 1,
            updated_at: currentTime
          }
        }
      );

      taskLog(`Task ${task.task_id} upload retry ${currentRetryCount + 1}/${maxRetries} scheduled`);
      return;
    }

    // Retry limit reached: give up on this task.
    taskLog(`Task ${task.task_id} upload failed after ${maxRetries} retries`);

    try {
      await GenerationResult.create({
        task_id: task.task_id,
        task_type: task.task_type,
        server_id: task.server_id,
        status: 'failed',
        original_urls: [],
        tos_urls: [],
        metadata: {
          total_files: 0,
          successful_uploads: 0,
          upload_retry_count: currentRetryCount,
          fail_reason: `TOS upload failed after ${maxRetries} retries: ${errorMessage}`
        },
        created_at: currentTime,
        expires_at: currentTime + resultExpireSeconds(),
        is_read: false,
        read_count: 0
      });

      await GenerationTask.updateOne(
        { task_id: task.task_id },
        {
          $set: {
            status: 'failed',
            error_message: `Upload failed after ${maxRetries} retries: ${errorMessage}`,
            completed_at: currentTime,
            updated_at: currentTime
          }
        }
      );
    } catch (cleanupError) {
      taskLog(`Failed to cleanup task ${task.task_id} after upload failure: ${errMsg(cleanupError)}`);
    }
  }

  /**
   * Uploads every URL to TOS under jimeng_free/<images|videos>/<task_id>/.
   *
   * @returns The TOS URLs, in the same order as `urls`.
   * @throws The first upload error, so the caller can schedule a retry.
   */
  private async uploadToTOSWithRetry(urls: string[], taskType: 'image' | 'video', task_id: string): Promise<string[]> {
    const tosUrls: string[] = [];
    const folder = taskType === 'image' ? 'images' : 'videos';
    const extension = taskType === 'image' ? 'png' : 'mp4';

    for (const url of urls) {
      try {
        const fileName = `${taskType === 'image' ? 'image' : 'video'}-${Date.now()}-${Math.random().toString(36).slice(2, 11)}.${extension}`;

        const tosUrl = await TOSService.uploadFromUrl(
          url,
          `jimeng_free/${folder}/${task_id}/${fileName}`,
          { maxRetries: 5, timeout: 120000 } // Options passed to the TOS service: 5 retries, 2-minute timeout.
        );

        tosUrls.push(tosUrl);
        taskLog(`${taskType} uploaded to TOS: ${url} -> ${tosUrl}`);
      } catch (error) {
        taskLog(`${taskType} upload to TOS failed: ${url} - ${errMsg(error)}`);
        throw error; // Let the caller handle the retry logic.
      }
    }

    return tosUrls;
  }

  /**
   * Stores a 'failed' result record and marks the task failed.
   * Errors while doing so are logged and swallowed.
   *
   * @param currentTime Unix time in seconds; defaults to now.
   */
  private async handleGenerationFailure(task: IGenerationTask, errorMessage: string, failCode?: string, currentTime?: number): Promise<void> {
    const now = currentTime ?? Math.floor(Date.now() / 1000);

    try {
      await GenerationResult.create({
        task_id: task.task_id,
        task_type: task.task_type,
        server_id: task.server_id,
        status: 'failed',
        original_urls: [],
        tos_urls: [],
        metadata: {
          total_files: 0,
          successful_uploads: 0,
          fail_reason: errorMessage
        },
        created_at: now,
        expires_at: now + resultExpireSeconds(),
        is_read: false,
        read_count: 0
      });

      await GenerationTask.updateOne(
        { task_id: task.task_id },
        {
          $set: {
            status: 'failed',
            error_message: errorMessage,
            fail_code: failCode,
            completed_at: now,
            updated_at: now
          }
        }
      );

      taskLog(`Task ${task.task_id} failed: ${errorMessage}`);
    } catch (error) {
      taskLog(`Failed to handle failure for task ${task.task_id}: ${errMsg(error)}`);
    }
  }

  /**
   * Handles an error raised while polling: fails the task once
   * task.max_retries is reached, otherwise increments retry_count and delays
   * the next poll with exponential backoff (poll_interval * 2^retries).
   */
  private async handlePollingError(task: IGenerationTask, error: unknown, currentTime: number): Promise<void> {
    // A missing retry_count would otherwise turn every derived value into NaN.
    const newRetryCount = (task.retry_count || 0) + 1;

    if (newRetryCount >= task.max_retries) {
      await this.markTaskFailed(task.task_id, `Polling failed after ${task.max_retries} retries: ${errMsg(error)}`);
      return;
    }

    const nextPollDelay = task.poll_interval * Math.pow(2, newRetryCount);

    await GenerationTask.updateOne(
      { task_id: task.task_id },
      {
        $set: {
          retry_count: newRetryCount,
          next_poll_at: currentTime + nextPollDelay,
          updated_at: currentTime
        }
      }
    );
  }

  /** Reloads the task by id and records it as failed with the given message. */
  private async markTaskFailed(taskId: string, errorMessage: string): Promise<void> {
    const currentTime = Math.floor(Date.now() / 1000);

    try {
      const task = await GenerationTask.findOne({ task_id: taskId });
      if (task) {
        await this.handleGenerationFailure(task, errorMessage, undefined, currentTime);
      }
    } catch (error) {
      taskLog(`Failed to mark task ${taskId} as failed: ${errMsg(error)}`);
    }
  }

  /**
   * Submits an image generation request.
   *
   * @returns The history_record_id used for later polling.
   * @throws When the API response carries no history_record_id.
   */
  private async callImageGeneration(task: IGenerationTask): Promise<string> {
    const {
      model,
      prompt,
      width = 1024,
      height = 1024,
      sample_strength = 0.5,
      negative_prompt = "",
      generate_count = 4
    } = task.original_params;
    const refreshToken = task.internal_params.refresh_token;

    // Public model name -> internal model key.
    const MODEL_MAP: Record<string, string> = {
      "jimeng-4.1": "high_aes_general_v41",
      "jimeng-3.1": "high_aes_general_v30l_art_fangzhou:general_v3.0_18b",
      "jimeng-3.0": "high_aes_general_v30l:general_v3.0_18b",
      "jimeng-2.1": "high_aes_general_v21_L:general_v2.1_L",
      "jimeng-2.0-pro": "high_aes_general_v20_L:general_v2.0_L",
      "jimeng-2.0": "high_aes_general_v20:general_v2.0",
      "jimeng-1.4": "high_aes_general_v14:general_v1.4",
      "jimeng-xl-pro": "text2img_xl_sft",
    };
    const mappedModel = MODEL_MAP[model] || MODEL_MAP["jimeng-3.0"];
    const componentId = this.generateUUID();

    // A missing model falls back to jimeng-3.0 above, so it must not crash here.
    const isModel4 = typeof model === 'string' && model.startsWith("jimeng-4");

    // The request payload is built by the option builders imported from core.js;
    // jimeng-4.x models use the 0302 builder at '2k', all others the v3 builder at '1k'.
    const buildOptions = isModel4 ? image4Options_0302 : image3Options;
    const options: any = buildOptions(
      mappedModel,
      componentId,
      prompt,
      sample_strength,
      height,
      width,
      negative_prompt,
      isModel4 ? '2k' : '1k',
      generate_count,
    );

    taskLog(`生成图片 发起请求 isModel4 ${isModel4}`);

    const response = await request(
      "post",
      "/mweb/v1/aigc_draft/generate",
      refreshToken,
      options,
    );

    const historyId = response?.aigc_data?.history_record_id;
    if (!historyId) {
      throw new Error('Failed to get history_record_id from image generation API');
    }

    taskLog(`Image generation started: ${task.task_id}, history_id: ${historyId}`);
    return historyId;
  }

  /**
   * Submits a video generation request. The first entry of `images`, when
   * present, is sent as the first frame and the second as the end frame.
   *
   * @returns The history_record_id used for later polling.
   * @throws When the API response carries no history_record_id.
   */
  private async callVideoGeneration(task: IGenerationTask): Promise<string> {
    const {
      prompt,
      images = [],
      is_pro = false,
      duration = 5000,
      ratio = '9:16'
    } = task.original_params;
    const refreshToken = task.internal_params.refresh_token;

    // Public model name -> internal model key.
    const MODEL_MAP = {
      "jimeng-v-3.0": "dreamina_ic_generate_video_model_vgfm_3.0",
      "jimeng-v-3.0-pro": "dreamina_ic_generate_video_model_vgfm_3.0_pro",
    };
    const mappedModel = is_pro ? MODEL_MAP["jimeng-v-3.0-pro"] : MODEL_MAP["jimeng-v-3.0"];

    // Describes one uploaded frame image in the shape the API expects.
    const buildFrameImage = (image: any) => ({
      type: "image",
      id: this.generateUUID(),
      source_from: "upload",
      platform_type: 1,
      name: "",
      image_uri: image.url,
      width: image.width,
      height: image.height,
      format: "",
      uri: image.url,
    });

    // Build the video generation input.
    const video_gen_inputs: any = {
      type: "",
      id: this.generateUUID(),
      min_version: "1.0.0",
      prompt: prompt,
      video_mode: 2,
      fps: 24,
      duration_ms: duration,
    };

    if (images && images.length >= 1) {
      // First frame.
      video_gen_inputs.first_frame_image = buildFrameImage(images[0]);
    }

    if (images && images.length >= 2) {
      // End frame.
      video_gen_inputs.end_frame_image = buildFrameImage(images[1]);
    }

    const componentId = this.generateUUID();
    const originSubmitId = this.generateUUID();

    const response = await request(
      "post",
      "/mweb/v1/aigc_draft/generate",
      refreshToken,
      {
        params: {
          babi_param: encodeURIComponent(
            JSON.stringify({
              scenario: "image_video_generation",
              feature_key: "text_to_video",
              feature_entrance: "to_video",
              feature_entrance_detail: "to_image-text_to_video",
            })
          ),
        },
        data: {
          extend: {
            m_video_commerce_info: {
              resource_id: "generate_video",
              resource_id_type: "str",
              resource_sub_type: "aigc",
              benefit_type: "basic_video_operation_vgfm_v_three"
            },
            root_model: mappedModel,
            template_id: "",
            history_option: {},
          },
          submit_id: this.generateUUID(),
          metrics_extra: JSON.stringify({
            promptSource: "custom",
            originSubmitId: originSubmitId,
            isDefaultSeed: 1,
            originTemplateId: "",
            imageNameMapping: {},
          }),
          draft_content: JSON.stringify({
            type: "draft",
            id: this.generateUUID(),
            min_version: "3.0.5",
            min_features: [],
            is_from_tsn: true,
            version: "3.2.2",
            main_component_id: componentId,
            component_list: [
              {
                type: "video_base_component",
                id: componentId,
                min_version: "1.0.0",
                generate_type: "gen_video",
                aigc_mode: "workbench",
                metadata: {
                  type: "",
                  id: this.generateUUID(),
                  created_platform: 3,
                  created_platform_version: "",
                  created_time_in_ms: Date.now(),
                  created_did: "",
                },
                abilities: {
                  type: "",
                  id: this.generateUUID(),
                  gen_video: {
                    type: "",
                    id: this.generateUUID(),
                    text_to_video_params: {
                      type: "",
                      id: this.generateUUID(),
                      video_gen_inputs: [video_gen_inputs],
                      video_aspect_ratio: ratio,
                      seed: Math.floor(Math.random() * 100000000) + 2500000000,
                      model_req_key: mappedModel,
                    },
                    video_task_extra: {
                      promptSource: "custom",
                      originSubmitId: originSubmitId,
                      isDefaultSeed: 1,
                      originTemplateId: "",
                      imageNameMapping: {},
                    }
                  },
                },
                process_type: 1,
              },
            ],
          }),
          http_common_info: {
            aid: Number(process.env.DEFAULT_ASSISTANT_ID || "513695"),
          },
        },
      }
    );

    const historyId = response?.aigc_data?.history_record_id;
    if (!historyId) {
      throw new Error('Failed to get history_record_id from video generation API');
    }

    taskLog(`Video generation started: ${task.task_id}, history_id: ${historyId}`);
    return historyId;
  }

  /**
   * Best-effort upload variant: a URL whose upload fails is kept as-is in the
   * returned list instead of aborting. Not called anywhere in this file.
   */
  private async uploadToTOS(urls: string[], taskType: 'image' | 'video', task_id: string): Promise<string[]> {
    const tosUrls: string[] = [];
    const folder = taskType === 'image' ? 'images' : 'videos';
    const extension = taskType === 'image' ? 'png' : 'mp4';

    for (const url of urls) {
      try {
        const fileName = `${taskType === 'image' ? 'image' : 'video'}-${Date.now()}-${Math.random().toString(36).slice(2, 11)}.${extension}`;

        const tosUrl = await TOSService.uploadFromUrl(url, `jimeng_free/${folder}/${task_id}/${fileName}`);
        tosUrls.push(tosUrl);

        taskLog(`${taskType} uploaded to TOS: ${url} -> ${tosUrl}`);
      } catch (error) {
        taskLog(`${taskType} upload to TOS failed: ${url} - ${errMsg(error)}`);
        // Keep the original URL when the upload fails.
        tosUrls.push(url);
      }
    }

    return tosUrls;
  }

  /** Number of this server's tasks currently in 'processing' or 'polling'. */
  private async getCurrentServerLoad(): Promise<number> {
    return await GenerationTask.countDocuments({
      server_id: this.currentServerId,
      status: { $in: ['processing', 'polling'] }
    });
  }

  /** The `image_info` descriptor sent with history queries for image tasks. */
  private getImageInfo() {
    const scene = (size: number) => ({
      scene: "normal",
      width: size,
      height: size,
      uniq_key: String(size),
      format: "webp",
    });

    return {
      width: 2048,
      height: 2048,
      format: "webp",
      image_scene_list: [scene(2400), scene(1080), scene(720)],
    };
  }

  /** Periodic cleanup: deletes expired results and fails timed-out tasks. */
  private async performCleanup(): Promise<void> {
    try {
      const expiredResults = await DatabaseCleanupService.cleanupExpiredResults();
      const timeoutTasks = await DatabaseCleanupService.cleanupTimeoutTasks();

      taskLog(`Cleanup completed - expired results: ${expiredResults}, timeout tasks: ${timeoutTasks}`);
    } catch (error) {
      taskLog(`Cleanup failed: ${errMsg(error)}`);
    }
  }

  /** Signal handler: stops the polling loop. */
  private async gracefulShutdown(): Promise<void> {
    taskLog('Received shutdown signal, stopping task polling service...');
    await this.stop();
  }

  /** Snapshot of the service configuration and running state. */
  public getServiceInfo() {
    return {
      serverId: this.currentServerId,
      isRunning: this.isRunning,
      maxConcurrentTasks: this.maxConcurrentTasks
    };
  }

  /** Generates a random version-4-style UUID string using Math.random. */
  private generateUUID(): string {
    return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (c) => {
      const r = Math.random() * 16 | 0;
      const v = c === 'x' ? r : (r & 0x3 | 0x8);
      return v.toString(16);
    });
  }
}

/**
 * Database housekeeping helpers used by the polling service.
 */
export class DatabaseCleanupService {
  /**
   * Deletes result records whose expires_at lies in the past.
   *
   * @returns The value reported by deleteMany, or 0 when it is falsy.
   */
  static async cleanupExpiredResults(): Promise<number> {
    const currentTime = Math.floor(Date.now() / 1000);

    const result = await GenerationResult.deleteMany({
      expires_at: { $lt: currentTime }
    });

    return result || 0;
  }

  /**
   * Fails every 'processing' or 'polling' task (of any server) that has been
   * running longer than its task_timeout (default 3600 seconds) and stores a
   * 'failed' result record for each of them.
   *
   * @returns The number of tasks that were timed out.
   */
  static async cleanupTimeoutTasks(): Promise<number> {
    const DEFAULT_TIMEOUT_SECONDS = 3600;
    const currentTime = Math.floor(Date.now() / 1000);

    const activeTasks = await GenerationTask.find({
      status: { $in: ['processing', 'polling'] }
    });

    // Filter in memory (NeDB does not support $expr).
    const timeoutTasks = activeTasks.filter(task => {
      const elapsedTime = currentTime - (task.started_at || task.created_at);
      return elapsedTime > (task.task_timeout || DEFAULT_TIMEOUT_SECONDS);
    });

    if (timeoutTasks.length === 0) {
      return 0;
    }

    const timeoutTaskIds = timeoutTasks.map(task => task.task_id);

    // Use $set like every other update in this file: without a modifier NeDB
    // replaces the whole document instead of patching these fields.
    await GenerationTask.updateMany(
      { task_id: { $in: timeoutTaskIds } },
      {
        $set: {
          status: 'failed',
          error_message: 'Task timeout',
          updated_at: currentTime,
          completed_at: currentTime
        }
      }
    );

    // Create a failed result record for every timed-out task.
    const expiresAt = currentTime + resultExpireSeconds();
    const failedResults = timeoutTasks.map(task => ({
      task_id: task.task_id,
      task_type: task.task_type,
      server_id: task.server_id,
      status: 'failed' as const,
      original_urls: [],
      tos_urls: [],
      metadata: {
        total_files: 0,
        successful_uploads: 0,
        // Report the timeout that was actually applied, including the default.
        fail_reason: 'Task timeout after ' + (task.task_timeout || DEFAULT_TIMEOUT_SECONDS) + ' seconds'
      },
      created_at: currentTime,
      expires_at: expiresAt,
      is_read: false,
      read_count: 0
    }));

    await GenerationResult.insertMany(failedResults);

    return timeoutTaskIds.length;
  }
}

export default TaskPollingService.getInstance();