jimeng-free-api/src/lib/services/TaskPollingService.ts
2025-12-13 10:27:09 +08:00

1242 lines
46 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import fs from 'fs-extra';
import path from 'path';
import { formatInTimeZone } from 'date-fns-tz';
import GenerationTask, { IGenerationTask } from '@/lib/database/models/GenerationTask.js';
import GenerationResult, { IGenerationResult } from '@/lib/database/models/GenerationResult.js';
import NeDBManager from '@/lib/database/nedb.js';
import logger from '@/lib/logger.js';
import TOSService from '@/lib/tos/tos-service.js';
import { generateImages as originalGenerateImages } from '@/api/controllers/images.js';
import { generateVideo as originalGenerateVideo } from '@/api/controllers/video.js';
import { request } from '@/api/controllers/core.js';
import EX from "@/api/consts/exceptions.ts";
const timeZone = 'Asia/Shanghai';
const TASK_POLLING_LOG_PATH = path.resolve("./logs/task_polling.log");
function taskLog(message: string) {
try {
const timestamp = formatInTimeZone(new Date(), timeZone, "yyyy-MM-dd HH:mm:ss.SSS");
const logMessage = `[TaskPolling][${timestamp}] ${message}`;
fs.ensureDirSync(path.dirname(TASK_POLLING_LOG_PATH));
fs.appendFileSync(TASK_POLLING_LOG_PATH, logMessage + "\n");
// 同时输出到控制台
logger.info(logMessage);
} catch (err) {
console.error("TaskPolling log write error:", err);
}
}
export class TaskPollingService {
private static instance: TaskPollingService;
private currentServerId: string;
private pollInterval: NodeJS.Timeout | null = null;
private isRunning: boolean = false;
private maxConcurrentTasks: number;
private cleanupCounter = 0;
private constructor() {
this.currentServerId = process.env.SERVICE_ID || 'jimeng-free-api';
this.maxConcurrentTasks = parseInt(process.env.MAX_CONCURRENT_TASKS || '3');
taskLog("TaskPollingService initialized");
}
public static getInstance(): TaskPollingService {
if (!TaskPollingService.instance) {
TaskPollingService.instance = new TaskPollingService();
}
return TaskPollingService.instance;
}
/**
* 启动轮询服务
*/
public async start(): Promise<void> {
if (this.isRunning) {
taskLog('Task polling service is already running');
return;
}
try {
// 确保NeDB数据库初始化
await NeDBManager.initialize();
taskLog('NeDB database initialized for task polling service');
const pollIntervalMs = parseInt(process.env.TASK_POLL_INTERVAL || '10') * 1000;
// 每10秒轮询一次优化性能减少频繁查询
this.pollInterval = setInterval(async () => {
await this.processTasks();
}, pollIntervalMs);
this.isRunning = true;
taskLog(`Task polling service started for server: ${this.currentServerId}, interval: ${pollIntervalMs}ms`);
// 监听进程退出事件
process.on('SIGINT', () => this.gracefulShutdown());
process.on('SIGTERM', () => this.gracefulShutdown());
} catch (error) {
taskLog(`Failed to start task polling service: ${error.message}`);
throw error;
}
}
public async stop(): Promise<void> {
if (!this.isRunning) {
return;
}
if (this.pollInterval) {
clearInterval(this.pollInterval);
this.pollInterval = null;
}
this.isRunning = false;
taskLog('Task polling service stopped');
}
/**
* 主处理方法
*/
private async processTasks(): Promise<void> {
try {
const currentTime = Math.floor(Date.now() / 1000);
// 1. 处理待处理任务
await this.processPendingTasks(currentTime);
// 2. 检查轮询任务
await this.checkPollingTasks(currentTime);
// 3. 检查和处理超时任务
await this.checkTimeoutTasks(currentTime);
// 4. 定期清理每360次轮询约1小时基于10秒间隔
this.cleanupCounter++;
if (this.cleanupCounter >= 360) {
await this.performCleanup();
this.cleanupCounter = 0;
}
} catch (error) {
taskLog(`Task polling error: ${error.message}`);
}
}
/**
* 检查超时任务
*/
private async checkTimeoutTasks(currentTime: number): Promise<void> {
try {
const timeoutCount = await DatabaseCleanupService.cleanupTimeoutTasks();
if (timeoutCount > 0) {
taskLog(`Marked ${timeoutCount} tasks as failed due to timeout`);
}
} catch (error) {
taskLog(`Failed to check timeout tasks: ${error.message}`);
}
}
/**
* 处理待处理任务
*/
private async processPendingTasks(currentTime: number): Promise<void> {
try {
// 获取当前服务器负载
const currentLoad = await this.getCurrentServerLoad();
if (currentLoad >= this.maxConcurrentTasks) {
return; // 服务器已满载
}
const availableSlots = this.maxConcurrentTasks - currentLoad;
// 获取待处理任务
const allTasks = await GenerationTask.find({
server_id: this.currentServerId,
status: 'pending'
});
// 手动排序并限制数量
const pendingTasks = allTasks
.sort((a, b) => a.created_at - b.created_at) // 先入先出
.slice(0, availableSlots);
for (const task of pendingTasks) {
await this.startTask(task, currentTime);
}
if (pendingTasks.length > 0) {
taskLog(`Started ${pendingTasks.length} pending tasks`);
}
} catch (error) {
taskLog(`Failed to process pending tasks: ${error.message}`);
}
}
/**
* 检查轮询任务
*/
private async checkPollingTasks(currentTime: number): Promise<void> {
try {
// 获取需要轮询的任务(排除正在处理成功状态的任务)
const pollingTasks = await GenerationTask.find({
server_id: this.currentServerId,
status: 'polling', // 只处理polling状态的任务不处理processing_success状态
$or: [
{ next_poll_at: { $exists: false } },
{ next_poll_at: { $lte: currentTime } }
]
});
for (const task of pollingTasks) {
await this.pollTaskResult(task, currentTime);
}
if (pollingTasks.length > 0) {
taskLog(`Polled ${pollingTasks.length} tasks for results`);
}
// 查找需要重试上传的任务
const uploadingTasks = await GenerationTask.find({
server_id: this.currentServerId,
status: 'uploading'
});
for (const task of uploadingTasks) {
await this.retryTOSUpload(task, currentTime);
}
if (uploadingTasks.length > 0) {
taskLog(`Retried upload for ${uploadingTasks.length} tasks`);
}
} catch (error) {
taskLog(`Failed to check polling tasks: ${error.message}`);
}
}
/**
* 重试TOS上传
*/
private async retryTOSUpload(task: IGenerationTask, currentTime: number): Promise<void> {
try {
// 检查是否需要重试(避免过于频繁的重试)
const timeSinceLastUpdate = currentTime - task.updated_at;
const minRetryInterval = 60; // 最少等待60秒再重试减少频繁重试
if (timeSinceLastUpdate < minRetryInterval) {
return; // 还未到重试时间
}
taskLog(`Retrying TOS upload for task ${task.task_id}, attempt ${(task.upload_retry_count || 0) + 1}`);
// 从processing_success状态重新获取原始URLs
// 这里需要重新获取生成的URLs因为我们没有在任务中存储它们
// 为了简化我们可以从internal_params中获取history_id然后重新查询
const historyId = task.internal_params.history_id;
if (!historyId) {
throw new Error('Missing history_id for upload retry');
}
// 重新获取生成结果
const result = await request("post", "/mweb/v1/get_history_by_ids", task.internal_params.refresh_token, {
data: {
history_ids: [historyId],
image_info: task.task_type === 'image' ? this.getImageInfo() : undefined,
http_common_info: {
aid: Number(process.env.DEFAULT_ASSISTANT_ID || "513695"),
},
},
});
if (!result[historyId] || !result[historyId].item_list) {
throw new Error('Failed to retrieve generation result for upload retry');
}
const { item_list } = result[historyId];
let originalUrls: string[] = [];
if (task.task_type === 'image') {
originalUrls = item_list.map((item) => {
let res_url = null;
let idata = item?.image;
let idata2 = item?.image?.large_images;
if(!idata2 || !idata2.length){
for(let key in idata){
if(idata[key] && idata[key].length && idata[key].length > 0){
if(idata[key][0].image_url){
res_url = idata[key][0].image_url;
break;
}
}
}
}else{
res_url = idata2[0].image_url;
}
if(res_url) return res_url;
return item?.common_attr?.cover_url || null;
}).filter(url => url !== null);
} else {
originalUrls = item_list.map((item) => {
let res_url = null;
let vdata = item?.video?.transcoded_video||{};
for(let key in vdata){
if(vdata[key] && vdata[key].video_url){
res_url = vdata[key].video_url;
break;
}
}
return res_url || null;
}).filter(url => url !== null);
}
if (originalUrls.length === 0) {
throw new Error('No valid URLs found for upload retry');
}
// 执行上传
await this.performTOSUpload(task, originalUrls, currentTime);
} catch (error) {
taskLog(`Failed to retry TOS upload for task ${task.task_id}: ${error.message}`);
await this.handleUploadFailure(task, error.message, currentTime);
}
}
/**
* 开始处理任务
*/
private async startTask(task: IGenerationTask, currentTime: number): Promise<void> {
try {
// 原子更新任务状态为processing
const updateResult = await GenerationTask.updateOne(
{ task_id: task.task_id, status: 'pending' },
{
$set: {
status: 'processing',
started_at: currentTime,
updated_at: currentTime
}
}
);
if (updateResult === 0) {
return; // 任务已被其他进程处理
}
taskLog(`Starting task: ${task.task_id} (${task.task_type})`);
// 调用原有生成方法获取historyId
let historyId: string;
if (task.task_type === 'image') {
historyId = await this.callImageGeneration(task);
} else {
historyId = await this.callVideoGeneration(task);
}
// 更新任务为轮询状态
await GenerationTask.updateOne(
{ task_id: task.task_id },
{
$set: {
status: 'polling',
'internal_params.history_id': historyId,
next_poll_at: currentTime + task.poll_interval,
updated_at: currentTime
}
}
);
taskLog(`Task ${task.task_id} started successfully, history_id: ${historyId}`);
} catch (error) {
taskLog(`Failed to start task ${task.task_id}: ${error.message}`);
await this.markTaskFailed(task.task_id, error.message);
}
}
/**
* 轮询任务结果
*/
private async pollTaskResult(task: IGenerationTask, currentTime: number): Promise<void> {
try {
const historyId = task.internal_params.history_id;
if (!historyId) {
throw new Error('Missing history_id for polling');
}
taskLog(`Polling task result: ${task.task_id}, history_id: ${historyId}`);
// 调用即梦API检查结果
const result = await request("post", "/mweb/v1/get_history_by_ids", task.internal_params.refresh_token, {
data: {
history_ids: [historyId],
image_info: task.task_type === 'image' ? this.getImageInfo() : undefined,
http_common_info: {
aid: Number(process.env.DEFAULT_ASSISTANT_ID || "513695"),
},
},
});
if (!result[historyId]) {
// 结果不存在,更新下次轮询时间
await GenerationTask.updateOne(
{ task_id: task.task_id },
{
$set: {
next_poll_at: currentTime + task.poll_interval,
updated_at: currentTime
}
}
);
return;
}
const { status, fail_code: failCode, item_list } = result[historyId];
if (status === 20) {
// 仍在生成中,更新下次轮询时间
await GenerationTask.updateOne(
{ task_id: task.task_id },
{
$set: {
next_poll_at: currentTime + task.poll_interval,
updated_at: currentTime
}
}
);
} else if (status === 10 || (status !== 30 && item_list && item_list.length > 0)) {
// 生成完成
await this.handleGenerationSuccess(task, item_list, currentTime);
} else {
// 生成失败
if(failCode == '2038' || failCode == '2041'){
await this.handleGenerationFailure(task, EX.API_CONTENT_FILTERED[1] as string, failCode, currentTime);
}else{
await this.handleGenerationFailure(task, EX.API_IMAGE_GENERATION_FAILED[1] as string, failCode, currentTime);
}
}
} catch (error) {
taskLog(`Failed to poll task ${task.task_id}: ${error.message}`);
// 轮询出错,更新下次轮询时间,增加重试计数
await this.handlePollingError(task, error, currentTime);
}
}
/**
* 处理生成成功
*/
private async handleGenerationSuccess(task: IGenerationTask, itemList: any[], currentTime: number): Promise<void> {
try {
let originalUrls: string[] = [];
if (task.task_type === 'image') {
originalUrls = itemList.map((item) => {
let res_url = null;
let idata = item?.image;
let idata2 = item?.image?.large_images;
if(!idata2 || !idata2.length){
for(let key in idata){
if(idata[key] && idata[key].length && idata[key].length > 0){
if(idata[key][0].image_url){
res_url = idata[key][0].image_url;
break;
}
}
}
}else{
res_url = idata2[0].image_url;
}
if(res_url) return res_url;
return item?.common_attr?.cover_url || null;
}).filter(url => url !== null);
} else {
originalUrls = itemList.map((item) => {
let res_url = null;
let vdata = item?.video?.transcoded_video||{};
for(let key in vdata){
if(vdata[key] && vdata[key].video_url){
res_url = vdata[key].video_url;
break;
}
}
return res_url || null;
}).filter(url => url !== null);
}
if (originalUrls.length === 0) {
throw new Error('No valid URLs generated');
}
taskLog(`Task ${task.task_id} generated ${originalUrls.length} files`);
// 先标记任务为处理中,防止重复处理
const updateResult = await GenerationTask.updateOne(
{ task_id: task.task_id, status: 'polling' },
{
$set: {
status: 'processing_success',
updated_at: currentTime
}
}
);
if (updateResult === 0) {
taskLog(`Task ${task.task_id} already being processed by another instance`);
return; // 任务已被其他进程处理
}
// 开始上传到TOS
await this.startTOSUpload(task, originalUrls, currentTime);
} catch (error) {
taskLog(`Failed to handle success for task ${task.task_id}: ${error.message}`);
await this.markTaskFailed(task.task_id, error.message);
}
}
/**
* 开始TOS上传流程
*/
private async startTOSUpload(task: IGenerationTask, originalUrls: string[], currentTime: number): Promise<void> {
try {
// 标记任务为上传中状态
await GenerationTask.updateOne(
{ task_id: task.task_id },
{
$set: {
status: 'uploading',
upload_retry_count: 0,
updated_at: currentTime
}
}
);
// 执行上传
await this.performTOSUpload(task, originalUrls, currentTime);
} catch (error) {
taskLog(`Failed to start TOS upload for task ${task.task_id}: ${error.message}`);
await this.handleUploadFailure(task, error.message, currentTime);
}
}
/**
* 执行TOS上传
*/
private async performTOSUpload(task: IGenerationTask, originalUrls: string[], currentTime: number): Promise<void> {
try {
// 上传到TOS带重试机制
const tosUrls = await this.uploadToTOSWithRetry(originalUrls, task.task_type, task.task_id);
// 创建结果记录
const expireTime = currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400');
await GenerationResult.create({
task_id: task.task_id,
task_type: task.task_type,
server_id: task.server_id,
status: 'success',
original_urls: originalUrls,
tos_urls: tosUrls,
metadata: {
generation_time: (currentTime - (task.started_at || task.created_at)) * 1000,
total_files: originalUrls.length,
successful_uploads: tosUrls.length,
upload_retry_count: task.upload_retry_count || 0,
tos_upload_errors: tosUrls.length < originalUrls.length ? ['Some uploads failed'] : undefined
},
created_at: currentTime,
expires_at: expireTime,
is_read: false,
read_count: 0
});
// 标记任务完成
await GenerationTask.updateOne(
{ task_id: task.task_id },
{
$set: {
status: 'completed',
completed_at: currentTime,
updated_at: currentTime
}
}
);
taskLog(`Task ${task.task_id} completed successfully`);
} catch (error) {
taskLog(`TOS upload failed for task ${task.task_id}: ${error.message}`);
await this.handleUploadFailure(task, error.message, currentTime);
}
}
/**
* 处理上传失败
*/
private async handleUploadFailure(task: IGenerationTask, errorMessage: string, currentTime: number): Promise<void> {
const currentRetryCount = task.upload_retry_count || 0;
const maxRetries = task.max_upload_retries || 5;
if (currentRetryCount >= maxRetries) {
// 重试次数已达上限,标记任务失败
taskLog(`Task ${task.task_id} upload failed after ${maxRetries} retries`);
try {
// 创建失败结果记录
const expireTime = currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400');
await GenerationResult.create({
task_id: task.task_id,
task_type: task.task_type,
server_id: task.server_id,
status: 'failed',
original_urls: [],
tos_urls: [],
metadata: {
total_files: 0,
successful_uploads: 0,
upload_retry_count: currentRetryCount,
fail_reason: `TOS upload failed after ${maxRetries} retries: ${errorMessage}`
},
created_at: currentTime,
expires_at: expireTime,
is_read: false,
read_count: 0
});
// 标记任务失败
await GenerationTask.updateOne(
{ task_id: task.task_id },
{
$set: {
status: 'failed',
error_message: `Upload failed after ${maxRetries} retries: ${errorMessage}`,
completed_at: currentTime,
updated_at: currentTime
}
}
);
} catch (cleanupError) {
taskLog(`Failed to cleanup task ${task.task_id} after upload failure: ${cleanupError.message}`);
}
} else {
// 增加重试计数,保持上传中状态
await GenerationTask.updateOne(
{ task_id: task.task_id },
{
$set: {
upload_retry_count: currentRetryCount + 1,
updated_at: currentTime
}
}
);
taskLog(`Task ${task.task_id} upload retry ${currentRetryCount + 1}/${maxRetries} scheduled`);
}
}
/**
* 上传文件到TOS带重试机制
*/
private async uploadToTOSWithRetry(urls: string[], taskType: 'image' | 'video', task_id: string): Promise<string[]> {
const tosUrls: string[] = [];
for (const url of urls) {
try {
const fileName = taskType === 'image'
? `image-${Date.now()}-${Math.random().toString(36).substr(2, 9)}.png`
: `video-${Date.now()}-${Math.random().toString(36).substr(2, 9)}.mp4`;
const folder = taskType === 'image' ? 'images' : 'videos';
const tosUrl = await TOSService.uploadFromUrl(
url,
`jimeng_free/${folder}/${task_id}/${fileName}`,
{ maxRetries: 5, timeout: 120000 } // TOS客户端内部重试5次超时2分钟
);
tosUrls.push(tosUrl);
taskLog(`${taskType} uploaded to TOS: ${url} -> ${tosUrl}`);
} catch (error) {
taskLog(`${taskType} upload to TOS failed: ${url} - ${error.message}`);
throw error; // 抛出错误,让上层处理重试逻辑
}
}
return tosUrls;
}
/**
* 处理生成失败
*/
private async handleGenerationFailure(task: IGenerationTask, errorMessage: string, failCode?: string, currentTime?: number): Promise<void> {
const now = currentTime || Math.floor(Date.now() / 1000);
try {
// 创建失败结果记录
const expireTime = now + parseInt(process.env.RESULT_EXPIRE_TIME || '86400');
await GenerationResult.create({
task_id: task.task_id,
task_type: task.task_type,
server_id: task.server_id,
status: 'failed',
original_urls: [],
tos_urls: [],
metadata: {
total_files: 0,
successful_uploads: 0,
fail_reason: errorMessage
},
created_at: now,
expires_at: expireTime,
is_read: false,
read_count: 0
});
// 标记任务失败
await GenerationTask.updateOne(
{ task_id: task.task_id },
{
$set: {
status: 'failed',
error_message: errorMessage,
fail_code: failCode,
completed_at: now,
updated_at: now
}
}
);
taskLog(`Task ${task.task_id} failed: ${errorMessage}`);
} catch (error) {
taskLog(`Failed to handle failure for task ${task.task_id}: ${error.message}`);
}
}
/**
* 处理轮询错误
*/
private async handlePollingError(task: IGenerationTask, error: Error, currentTime: number): Promise<void> {
const newRetryCount = task.retry_count + 1;
if (newRetryCount >= task.max_retries) {
await this.markTaskFailed(task.task_id, `Polling failed after ${task.max_retries} retries: ${error.message}`);
} else {
// 增加重试计数,延长轮询间隔
const nextPollDelay = task.poll_interval * Math.pow(2, newRetryCount); // 指数退避
await GenerationTask.updateOne(
{ task_id: task.task_id },
{
$set: {
retry_count: newRetryCount,
next_poll_at: currentTime + nextPollDelay,
updated_at: currentTime
}
}
);
}
}
/**
* 标记任务失败
*/
private async markTaskFailed(taskId: string, errorMessage: string): Promise<void> {
const currentTime = Math.floor(Date.now() / 1000);
try {
const task = await GenerationTask.findOne({ task_id: taskId });
if (task) {
await this.handleGenerationFailure(task, errorMessage, undefined, currentTime);
}
} catch (error) {
taskLog(`Failed to mark task ${taskId} as failed: ${error.message}`);
}
}
/**
* 调用图片生成API
*/
private async callImageGeneration(task: IGenerationTask): Promise<string> {
const { model, prompt, width = 1024, height = 1024, sample_strength = 0.5, negative_prompt = "" } = task.original_params;
const refreshToken = task.internal_params.refresh_token;
// 映射模型
const MODEL_MAP = {
"jimeng-4.5": "high_aes_general_v40l",
"jimeng-4.1": "high_aes_general_v41",
"jimeng-4.0": "high_aes_general_v40",
"jimeng-3.1": "high_aes_general_v30l_art_fangzhou:general_v3.0_18b",
"jimeng-3.0": "high_aes_general_v30l:general_v3.0_18b",
"jimeng-2.1": "high_aes_general_v21_L:general_v2.1_L",
"jimeng-2.0-pro": "high_aes_general_v20_L:general_v2.0_L",
"jimeng-2.0": "high_aes_general_v20:general_v2.0",
"jimeng-1.4": "high_aes_general_v14:general_v1.4",
"jimeng-xl-pro": "text2img_xl_sft",
};
const mappedModel = MODEL_MAP[model] || MODEL_MAP["jimeng-3.0"];
// 4.x 模型特殊参数
const isV4 = model.startsWith('jimeng-4');
// const currentVersion = isV4 ? '3.3.7' : "3.0.2";
const currentVersion = "3.0.2";
const resolutionType = isV4 ? '2k' : '1k';
const componentId = this.generateUUID();
const submitId = this.generateUUID();
const { aigc_data } = await request(
"post",
"/mweb/v1/aigc_draft/generate",
refreshToken,
{
params: {
babi_param: encodeURIComponent(
JSON.stringify({
scenario: "image_video_generation",
feature_key: "aigc_to_image",
feature_entrance: "to_image",
feature_entrance_detail: "to_image-" + mappedModel,
})
),
},
data: {
extend: {
root_model: mappedModel,
template_id: "",
},
submit_id: submitId,
metrics_extra: JSON.stringify({
templateId: "",
generateCount: 1,
promptSource: "custom",
templateSource: "",
lastRequestId: "",
originRequestId: "",
}),
draft_content: JSON.stringify({
type: "draft",
id: this.generateUUID(),
min_version: "3.0.2",
is_from_tsn: true,
version: currentVersion,
main_component_id: componentId,
component_list: [
{
type: "image_base_component",
id: componentId,
min_version: "3.0.2",
generate_type: "generate",
aigc_mode: "workbench",
abilities: {
type: "",
id: this.generateUUID(),
generate: {
type: "",
id: this.generateUUID(),
core_param: {
type: "",
id: this.generateUUID(),
model: mappedModel,
prompt,
negative_prompt,
seed: Math.floor(Math.random() * 100000000) + 2500000000,
sample_strength,
image_ratio: 1,
large_image_info: {
type: "",
id: this.generateUUID(),
height,
width,
resolution_type: resolutionType,
},
},
history_option: {
type: "",
id: this.generateUUID(),
},
},
},
},
],
}),
http_common_info: {
aid: Number(process.env.DEFAULT_ASSISTANT_ID || "513695"),
},
},
}
);
const historyId = aigc_data.history_record_id;
if (!historyId) {
throw new Error('Failed to get history_record_id from image generation API');
}
taskLog(`Image generation started: ${task.task_id}, history_id: ${historyId}`);
return historyId;
}
/**
* 调用视频生成API
*/
private async callVideoGeneration(task: IGenerationTask): Promise<string> {
const { prompt, images = [], is_pro = false, duration = 5000, ratio = '9:16' } = task.original_params;
const refreshToken = task.internal_params.refresh_token;
// 映射模型
const MODEL_MAP = {
"jimeng-v-3.0": "dreamina_ic_generate_video_model_vgfm_3.0",
"jimeng-v-3.0-pro": "dreamina_ic_generate_video_model_vgfm_3.0_pro",
};
let _model = "jimeng-v-3.0";
if (is_pro) {
_model = "jimeng-v-3.0-pro";
}
const mappedModel = MODEL_MAP[_model] || MODEL_MAP["jimeng-v-3.0"];
// 构建视频生成输入
let video_gen_inputs: any = {
type: "",
id: this.generateUUID(),
min_version: "1.0.0",
prompt: prompt,
video_mode: 2,
fps: 24,
duration_ms: duration,
};
if (images && images.length >= 1) {
// 首帧
video_gen_inputs.first_frame_image = {
type: "image",
id: this.generateUUID(),
source_from: "upload",
platform_type: 1,
name: "",
image_uri: images[0].url,
width: images[0].width,
height: images[0].height,
format: "",
uri: images[0].url,
};
}
if (images && images.length >= 2) {
// 尾帧
video_gen_inputs.end_frame_image = {
type: "image",
id: this.generateUUID(),
source_from: "upload",
platform_type: 1,
name: "",
image_uri: images[1].url,
width: images[1].width,
height: images[1].height,
format: "",
uri: images[1].url,
};
}
const componentId = this.generateUUID();
const originSubmitId = this.generateUUID();
const { aigc_data } = await request(
"post",
"/mweb/v1/aigc_draft/generate",
refreshToken,
{
params: {
babi_param: encodeURIComponent(
JSON.stringify({
scenario: "image_video_generation",
feature_key: "text_to_video",
feature_entrance: "to_video",
feature_entrance_detail: "to_image-text_to_video",
})
),
},
data: {
extend: {
m_video_commerce_info: {
resource_id: "generate_video",
resource_id_type: "str",
resource_sub_type: "aigc",
benefit_type: "basic_video_operation_vgfm_v_three"
},
root_model: mappedModel,
template_id: "",
history_option: {},
},
submit_id: this.generateUUID(),
metrics_extra: JSON.stringify({
promptSource: "custom",
originSubmitId: originSubmitId,
isDefaultSeed: 1,
originTemplateId: "",
imageNameMapping: {},
}),
draft_content: JSON.stringify({
type: "draft",
id: this.generateUUID(),
min_version: "3.0.5",
min_features: [],
is_from_tsn: true,
version: "3.2.2",
main_component_id: componentId,
component_list: [
{
type: "video_base_component",
id: componentId,
min_version: "1.0.0",
generate_type: "gen_video",
aigc_mode: "workbench",
metadata: {
type: "",
id: this.generateUUID(),
created_platform: 3,
created_platform_version: "",
created_time_in_ms: Date.now(),
created_did: "",
},
abilities: {
type: "",
id: this.generateUUID(),
gen_video: {
type: "",
id: this.generateUUID(),
text_to_video_params: {
type: "",
id: this.generateUUID(),
video_gen_inputs: [video_gen_inputs],
video_aspect_ratio: ratio,
seed: Math.floor(Math.random() * 100000000) + 2500000000,
model_req_key: mappedModel,
},
video_task_extra: {
promptSource: "custom",
originSubmitId: originSubmitId,
isDefaultSeed: 1,
originTemplateId: "",
imageNameMapping: {},
}
},
},
process_type: 1,
},
],
}),
http_common_info: {
aid: Number(process.env.DEFAULT_ASSISTANT_ID || "513695"),
},
},
}
);
const historyId = aigc_data.history_record_id;
if (!historyId) {
throw new Error('Failed to get history_record_id from video generation API');
}
taskLog(`Video generation started: ${task.task_id}, history_id: ${historyId}`);
return historyId;
}
/**
* 上传文件到TOS
*/
private async uploadToTOS(urls: string[], taskType: 'image' | 'video', task_id: string): Promise<string[]> {
const tosUrls: string[] = [];
for (const url of urls) {
try {
const fileName = taskType === 'image'
? `image-${Date.now()}-${Math.random().toString(36).substr(2, 9)}.png`
: `video-${Date.now()}-${Math.random().toString(36).substr(2, 9)}.mp4`;
const folder = taskType === 'image' ? 'images' : 'videos';
const tosUrl = await TOSService.uploadFromUrl(url, `jimeng_free/${folder}/${task_id}/${fileName}`);
tosUrls.push(tosUrl);
taskLog(`${taskType} uploaded to TOS: ${url} -> ${tosUrl}`);
} catch (error) {
taskLog(`${taskType} upload to TOS failed: ${url} - ${error.message}`);
// 如果上传失败保留原URL
tosUrls.push(url);
}
}
return tosUrls;
}
/**
* 获取当前服务器负载
*/
private async getCurrentServerLoad(): Promise<number> {
return await GenerationTask.countDocuments({
server_id: this.currentServerId,
status: { $in: ['processing', 'polling'] }
});
}
/**
* 获取图片查询信息
*/
private getImageInfo() {
return {
width: 2048,
height: 2048,
format: "webp",
image_scene_list: [
{
scene: "normal",
width: 2400,
height: 2400,
uniq_key: "2400",
format: "webp",
},
{
scene: "normal",
width: 1080,
height: 1080,
uniq_key: "1080",
format: "webp",
},
{
scene: "normal",
width: 720,
height: 720,
uniq_key: "720",
format: "webp",
},
],
};
}
/**
* 执行清理任务
*/
private async performCleanup(): Promise<void> {
try {
const expiredResults = await DatabaseCleanupService.cleanupExpiredResults();
const timeoutTasks = await DatabaseCleanupService.cleanupTimeoutTasks();
taskLog(`Cleanup completed - expired results: ${expiredResults}, timeout tasks: ${timeoutTasks}`);
} catch (error) {
taskLog(`Cleanup failed: ${error.message}`);
}
}
/**
* 优雅停机
*/
private async gracefulShutdown(): Promise<void> {
taskLog('Received shutdown signal, stopping task polling service...');
await this.stop();
}
public getServiceInfo() {
return {
serverId: this.currentServerId,
isRunning: this.isRunning,
maxConcurrentTasks: this.maxConcurrentTasks
};
}
/**
* 生成UUID
*/
private generateUUID(): string {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
const r = Math.random() * 16 | 0;
const v = c === 'x' ? r : (r & 0x3 | 0x8);
return v.toString(16);
});
}
}
/**
* 数据库清理服务
*/
export class DatabaseCleanupService {
/**
* 清理过期的结果记录
*/
static async cleanupExpiredResults(): Promise<number> {
const currentTime = Math.floor(Date.now() / 1000);
const result = await GenerationResult.deleteMany({
expires_at: { $lt: currentTime }
});
return result || 0;
}
/**
* 清理超时的任务
*/
static async cleanupTimeoutTasks(): Promise<number> {
const currentTime = Math.floor(Date.now() / 1000);
// 查找所有正在处理或轮询的任务
const activeTasks = await GenerationTask.find({
status: { $in: ['processing', 'polling'] }
});
// 在内存中过滤超时任务NeDB 不支持 $expr
const timeoutTasks = activeTasks.filter(task => {
const elapsedTime = currentTime - (task.started_at || task.created_at);
return elapsedTime > (task.task_timeout || 3600); // 默认1小时超时
});
// 批量更新超时任务为失败状态
const timeoutTaskIds = timeoutTasks.map(task => task.task_id);
if (timeoutTaskIds.length > 0) {
await GenerationTask.updateMany(
{ task_id: { $in: timeoutTaskIds } },
{
status: 'failed',
error_message: 'Task timeout',
updated_at: currentTime,
completed_at: currentTime
}
);
// 为超时任务创建失败结果记录
const failedResults = timeoutTasks.map(task => ({
task_id: task.task_id,
task_type: task.task_type,
server_id: task.server_id,
status: 'failed' as const,
original_urls: [],
tos_urls: [],
metadata: {
total_files: 0,
successful_uploads: 0,
fail_reason: 'Task timeout after ' + task.task_timeout + ' seconds'
},
created_at: currentTime,
expires_at: currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400'),
is_read: false,
read_count: 0
}));
if (failedResults.length > 0) {
await GenerationResult.insertMany(failedResults);
}
}
return timeoutTaskIds.length;
}
}
export default TaskPollingService.getInstance();