// Task polling service (TypeScript).
import fs from 'fs-extra';
|
||
import path from 'path';
|
||
import { formatInTimeZone } from 'date-fns-tz';
|
||
import GenerationTask, { IGenerationTask } from '@/lib/database/models/GenerationTask.js';
|
||
import GenerationResult, { IGenerationResult } from '@/lib/database/models/GenerationResult.js';
|
||
import NeDBManager from '@/lib/database/nedb.js';
|
||
import logger from '@/lib/logger.js';
|
||
import TOSService from '@/lib/tos/tos-service.js';
|
||
import { generateImages as originalGenerateImages } from '@/api/controllers/images.js';
|
||
import { generateVideo as originalGenerateVideo } from '@/api/controllers/video.js';
|
||
import { request, image4Options_0302, image3Options, image4Options } from '@/api/controllers/core.js';
|
||
import EX from "@/api/consts/exceptions.ts";
|
||
|
||
const timeZone = 'Asia/Shanghai';
|
||
const TASK_POLLING_LOG_PATH = path.resolve("./logs/task_polling.log");
|
||
|
||
/**
 * Writes one timestamped line to the task-polling log file and mirrors the
 * same line to the shared logger.
 *
 * The timestamp is rendered in the module's configured time zone. Any error
 * raised while formatting, writing, or logging is reported through
 * `console.error` and is not rethrown.
 *
 * @param message Text to append after the `[TaskPolling][timestamp]` prefix.
 */
function taskLog(message: string) {
  try {
    const stamp = formatInTimeZone(new Date(), timeZone, "yyyy-MM-dd HH:mm:ss.SSS");
    const line = "[TaskPolling][" + stamp + "] " + message;

    // Make sure the log directory exists before appending.
    const logDirectory = path.dirname(TASK_POLLING_LOG_PATH);
    fs.ensureDirSync(logDirectory);
    fs.appendFileSync(TASK_POLLING_LOG_PATH, `${line}\n`);

    // Also echo the line through the shared logger.
    logger.info(line);
  } catch (writeError) {
    console.error("TaskPolling log write error:", writeError);
  }
}
|
||
|
||
export class TaskPollingService {
|
||
private static instance: TaskPollingService;
|
||
private currentServerId: string;
|
||
private pollInterval: NodeJS.Timeout | null = null;
|
||
private isRunning: boolean = false;
|
||
private maxConcurrentTasks: number;
|
||
private cleanupCounter = 0;
|
||
|
||
/**
 * Reads the service configuration from the environment.
 *
 * - `SERVICE_ID` selects which tasks belong to this server
 *   (default `'jimeng-free-api'`).
 * - `MAX_CONCURRENT_TASKS` is the load ceiling compared against the current
 *   server load before pending tasks are started (default 3).
 *
 * Private: the shared instance is obtained through `getInstance()`.
 */
private constructor() {
  this.currentServerId = process.env.SERVICE_ID || 'jimeng-free-api';

  const defaultLimit = 3;
  const rawLimit = process.env.MAX_CONCURRENT_TASKS || String(defaultLimit);
  const parsedLimit = Number.parseInt(rawLimit, 10);

  // parseInt returns NaN for a non-numeric value. Comparisons against NaN are
  // always false and the derived slot count would be NaN, so no pending task
  // would ever be started. Fall back to the default instead of failing silently.
  const limitIsInvalid = Number.isNaN(parsedLimit);
  this.maxConcurrentTasks = limitIsInvalid ? defaultLimit : parsedLimit;
  if (limitIsInvalid) {
    taskLog(`Invalid MAX_CONCURRENT_TASKS value "${rawLimit}", falling back to ${defaultLimit}`);
  }

  taskLog("TaskPollingService initialized");
}
|
||
|
||
/**
 * Returns the shared TaskPollingService, creating it on first use.
 */
public static getInstance(): TaskPollingService {
  const existing = TaskPollingService.instance;
  if (existing) {
    return existing;
  }

  const created = new TaskPollingService();
  TaskPollingService.instance = created;
  return created;
}
|
||
|
||
/**
 * Starts the polling service.
 *
 * Initializes the NeDB database, then schedules `processTasks` every
 * `TASK_POLL_INTERVAL` seconds (default 10) and registers SIGINT/SIGTERM
 * handlers that trigger a graceful shutdown. Calling it while the service is
 * already running only logs a notice.
 *
 * @throws Rethrows any error raised during initialization after logging it.
 */
public async start(): Promise<void> {
  if (this.isRunning) {
    taskLog('Task polling service is already running');
    return;
  }

  try {
    // Make sure the NeDB database is initialized before the first tick.
    await NeDBManager.initialize();
    taskLog('NeDB database initialized for task polling service');

    // A non-numeric or non-positive interval would turn into a NaN/zero timer
    // delay, which the runtime treats as "fire as fast as possible".
    const defaultIntervalSeconds = 10;
    const configuredSeconds = Number.parseInt(process.env.TASK_POLL_INTERVAL || String(defaultIntervalSeconds), 10);
    const intervalSeconds =
      Number.isFinite(configuredSeconds) && configuredSeconds > 0 ? configuredSeconds : defaultIntervalSeconds;
    const pollIntervalMs = intervalSeconds * 1000;

    // setInterval does not wait for an async callback to settle. A tick that
    // is still running (for example during a slow upload) must not be joined
    // by a second one, otherwise the same task can be processed twice.
    let tickInProgress = false;
    this.pollInterval = setInterval(async () => {
      if (tickInProgress) {
        return;
      }
      tickInProgress = true;
      try {
        await this.processTasks();
      } finally {
        tickInProgress = false;
      }
    }, pollIntervalMs);

    this.isRunning = true;
    taskLog(`Task polling service started for server: ${this.currentServerId}, interval: ${pollIntervalMs}ms`);

    // Stop polling when the process is asked to exit.
    process.on('SIGINT', () => this.gracefulShutdown());
    process.on('SIGTERM', () => this.gracefulShutdown());

  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    taskLog(`Failed to start task polling service: ${reason}`);
    throw error;
  }
}
|
||
|
||
/**
 * Stops the polling loop. Does nothing when the service is not running.
 */
public async stop(): Promise<void> {
  if (this.isRunning) {
    const timer = this.pollInterval;
    if (timer) {
      clearInterval(timer);
      this.pollInterval = null;
    }

    this.isRunning = false;
    taskLog('Task polling service stopped');
  }
}
|
||
|
||
/**
 * One polling tick: starts pending tasks, polls tasks awaiting results,
 * handles timed-out tasks, and runs the periodic cleanup on every 360th tick
 * (about one hour at the default 10-second interval).
 *
 * Errors are logged and never propagate to the caller.
 */
private async processTasks(): Promise<void> {
  try {
    const nowSeconds = Math.floor(Date.now() / 1000);

    // 1. Start pending tasks.
    await this.processPendingTasks(nowSeconds);
    // 2. Poll tasks that are waiting for a result.
    await this.checkPollingTasks(nowSeconds);
    // 3. Detect and fail timed-out tasks.
    await this.checkTimeoutTasks(nowSeconds);

    // 4. Periodic cleanup.
    this.cleanupCounter += 1;
    if (this.cleanupCounter < 360) {
      return;
    }
    await this.performCleanup();
    this.cleanupCounter = 0;
  } catch (error) {
    taskLog(`Task polling error: ${error.message}`);
  }
}
|
||
|
||
/**
 * Delegates timeout handling to DatabaseCleanupService and logs how many
 * tasks were marked as failed. Errors are logged and swallowed.
 *
 * @param currentTime Current Unix time in seconds (not used by this method).
 */
private async checkTimeoutTasks(currentTime: number): Promise<void> {
  try {
    const expiredCount = await DatabaseCleanupService.cleanupTimeoutTasks();
    const anyExpired = expiredCount > 0;
    if (anyExpired) {
      taskLog(`Marked ${expiredCount} tasks as failed due to timeout`);
    }
  } catch (error) {
    taskLog(`Failed to check timeout tasks: ${error.message}`);
  }
}
|
||
|
||
/**
 * Starts as many pending tasks as the server has free slots for, oldest
 * first. Errors are logged and swallowed.
 *
 * @param currentTime Current Unix time in seconds, forwarded to `startTask`.
 */
private async processPendingTasks(currentTime: number): Promise<void> {
  try {
    // Nothing to do when the server is already at capacity.
    const activeCount = await this.getCurrentServerLoad();
    if (activeCount >= this.maxConcurrentTasks) {
      return;
    }
    const freeSlots = this.maxConcurrentTasks - activeCount;

    // Fetch every pending task that belongs to this server.
    const candidates = await GenerationTask.find({
      server_id: this.currentServerId,
      status: 'pending'
    });

    // Sort manually (first in, first out) and keep only what fits.
    candidates.sort((left, right) => left.created_at - right.created_at);
    const batch = candidates.slice(0, freeSlots);

    for (let index = 0; index < batch.length; index++) {
      await this.startTask(batch[index], currentTime);
    }

    if (batch.length > 0) {
      taskLog(`Started ${batch.length} pending tasks`);
    }
  } catch (error) {
    taskLog(`Failed to process pending tasks: ${error.message}`);
  }
}
|
||
|
||
/**
 * Polls every due task in `polling` state for its generation result, then
 * retries the upload of every task in `uploading` state. Errors are logged
 * and swallowed.
 *
 * @param currentTime Current Unix time in seconds.
 */
private async checkPollingTasks(currentTime: number): Promise<void> {
  try {
    const serverId = this.currentServerId;

    // Only tasks in `polling` state are selected (not `processing_success`),
    // and only when they have no `next_poll_at` or it is already due.
    const dueTasks = await GenerationTask.find({
      server_id: serverId,
      status: 'polling',
      $or: [
        { next_poll_at: { $exists: false } },
        { next_poll_at: { $lte: currentTime } }
      ]
    });

    for (let index = 0; index < dueTasks.length; index++) {
      await this.pollTaskResult(dueTasks[index], currentTime);
    }
    if (dueTasks.length > 0) {
      taskLog(`Polled ${dueTasks.length} tasks for results`);
    }

    // Tasks whose upload still has to be retried.
    const stuckUploads = await GenerationTask.find({
      server_id: serverId,
      status: 'uploading'
    });

    for (let index = 0; index < stuckUploads.length; index++) {
      await this.retryTOSUpload(stuckUploads[index], currentTime);
    }
    if (stuckUploads.length > 0) {
      taskLog(`Retried upload for ${stuckUploads.length} tasks`);
    }
  } catch (error) {
    taskLog(`Failed to check polling tasks: ${error.message}`);
  }
}
|
||
|
||
/**
 * Retries the TOS upload of a task that is in `uploading` state.
 *
 * The generated URLs are not stored on the task, so they are fetched again
 * from the history endpoint using the task's `history_id` before the upload
 * is attempted. A retry is skipped when the task was updated less than 60
 * seconds ago. Any failure is logged and handed to `handleUploadFailure`.
 *
 * @param task Task whose upload should be retried.
 * @param currentTime Current Unix time in seconds.
 */
private async retryTOSUpload(task: IGenerationTask, currentTime: number): Promise<void> {
  // Picks the URL of one generated image, falling back to the item's cover URL.
  const pickImageUrl = (item: any) => {
    const imageData = item?.image;
    const largeImages = imageData?.large_images;
    let found = null;

    if (largeImages && largeImages.length) {
      found = largeImages[0].image_url;
    } else {
      // No large image: take the first list-like field that carries an image_url.
      for (const key in imageData) {
        const entry = imageData[key];
        if (entry && entry.length && entry.length > 0) {
          if (entry[0].image_url) {
            found = entry[0].image_url;
            break;
          }
        }
      }
    }

    if (found) {
      return found;
    }
    return item?.common_attr?.cover_url || null;
  };

  // Picks the first transcoded variant of a video that carries a video_url.
  const pickVideoUrl = (item: any) => {
    const variants = item?.video?.transcoded_video || {};
    for (const key in variants) {
      if (variants[key] && variants[key].video_url) {
        return variants[key].video_url;
      }
    }
    return null;
  };

  try {
    // Wait at least 60 seconds between attempts to avoid hammering the upload.
    const minRetryInterval = 60;
    const secondsSinceUpdate = currentTime - task.updated_at;
    if (secondsSinceUpdate < minRetryInterval) {
      return;
    }

    taskLog(`Retrying TOS upload for task ${task.task_id}, attempt ${(task.upload_retry_count || 0) + 1}`);

    const historyId = task.internal_params.history_id;
    if (!historyId) {
      throw new Error('Missing history_id for upload retry');
    }

    // Fetch the generation result again.
    const isImageTask = task.task_type === 'image';
    const result = await request("post", "/mweb/v1/get_history_by_ids", task.internal_params.refresh_token, {
      data: {
        history_ids: [historyId],
        image_info: isImageTask ? this.getImageInfo() : undefined,
        http_common_info: {
          aid: Number(process.env.DEFAULT_ASSISTANT_ID || "513695"),
        },
      },
    });

    const record = result[historyId];
    if (!record || !record.item_list) {
      throw new Error('Failed to retrieve generation result for upload retry');
    }

    const picker = isImageTask ? pickImageUrl : pickVideoUrl;
    const originalUrls: string[] = record.item_list
      .map((item) => picker(item))
      .filter(url => url !== null);

    if (originalUrls.length === 0) {
      throw new Error('No valid URLs found for upload retry');
    }

    // Run the upload.
    await this.performTOSUpload(task, originalUrls, currentTime);

  } catch (error) {
    taskLog(`Failed to retry TOS upload for task ${task.task_id}: ${error.message}`);
    await this.handleUploadFailure(task, error.message, currentTime);
  }
}
|
||
|
||
/**
 * Claims a pending task and submits it for generation.
 *
 * The task is first moved from `pending` to `processing` with a conditional
 * update; when that update reports 0 the method returns without doing
 * anything else. After the generation call returns a history id, the task is
 * moved to `polling`. Any failure is logged and the task is marked as failed.
 *
 * @param task Pending task to start.
 * @param currentTime Current Unix time in seconds.
 */
private async startTask(task: IGenerationTask, currentTime: number): Promise<void> {
  const taskId = task.task_id;

  try {
    // Conditional update: only succeeds while the task is still pending.
    const claimed = await GenerationTask.updateOne(
      { task_id: taskId, status: 'pending' },
      {
        $set: {
          status: 'processing',
          started_at: currentTime,
          updated_at: currentTime
        }
      }
    );
    if (claimed === 0) {
      // The task was already picked up elsewhere.
      return;
    }

    taskLog(`Starting task: ${taskId} (${task.task_type})`);

    // Submit the generation request and obtain its history id.
    const historyId: string = task.task_type === 'image'
      ? await this.callImageGeneration(task)
      : await this.callVideoGeneration(task);

    // Switch the task to polling state.
    const pollingFields = {
      status: 'polling',
      'internal_params.history_id': historyId,
      next_poll_at: currentTime + task.poll_interval,
      updated_at: currentTime
    };
    await GenerationTask.updateOne({ task_id: taskId }, { $set: pollingFields });

    taskLog(`Task ${taskId} started successfully, history_id: ${historyId}`);

  } catch (error) {
    taskLog(`Failed to start task ${taskId}: ${error.message}`);
    await this.markTaskFailed(taskId, error.message);
  }
}
|
||
|
||
/**
 * Queries the history endpoint for a task's generation result and acts on it.
 *
 * - No record yet, or status 20 (still generating): schedule the next poll.
 * - Status 10, or any status other than 30 with a non-empty item list:
 *   treat as success.
 * - Anything else: record a failure; fail codes 2038 and 2041 are reported
 *   as content-filtered.
 *
 * Errors are logged and passed to `handlePollingError`.
 *
 * @param task Task in polling state.
 * @param currentTime Current Unix time in seconds.
 */
private async pollTaskResult(task: IGenerationTask, currentTime: number): Promise<void> {
  // Pushes the next poll back by one poll interval.
  const scheduleNextPoll = () => GenerationTask.updateOne(
    { task_id: task.task_id },
    {
      $set: {
        next_poll_at: currentTime + task.poll_interval,
        updated_at: currentTime
      }
    }
  );

  try {
    const historyId = task.internal_params.history_id;
    if (!historyId) {
      throw new Error('Missing history_id for polling');
    }

    taskLog(`Polling task result: ${task.task_id}, history_id: ${historyId}`);

    // Ask the upstream API for the current state of the generation.
    const result = await request("post", "/mweb/v1/get_history_by_ids", task.internal_params.refresh_token, {
      data: {
        history_ids: [historyId],
        image_info: task.task_type === 'image' ? this.getImageInfo() : undefined,
        http_common_info: {
          aid: Number(process.env.DEFAULT_ASSISTANT_ID || "513695"),
        },
      },
    });

    const record = result[historyId];
    if (!record) {
      // No result yet.
      await scheduleNextPoll();
      return;
    }

    const { status, fail_code: failCode, item_list } = record;

    if (status === 20) {
      // Still generating.
      await scheduleNextPoll();
      return;
    }

    const finished = status === 10 || (status !== 30 && item_list && item_list.length > 0);
    if (finished) {
      await this.handleGenerationSuccess(task, item_list, currentTime);
      return;
    }

    // Generation failed.
    const contentFiltered = failCode == '2038' || failCode == '2041';
    const reason = contentFiltered
      ? EX.API_CONTENT_FILTERED[1] as string
      : EX.API_IMAGE_GENERATION_FAILED[1] as string;
    await this.handleGenerationFailure(task, reason, failCode, currentTime);

  } catch (error) {
    taskLog(`Failed to poll task ${task.task_id}: ${error.message}`);
    // Reschedule with a higher retry count.
    await this.handlePollingError(task, error, currentTime);
  }
}
|
||
|
||
/**
 * Handles a finished generation: extracts the result URLs, moves the task
 * from `polling` to `processing_success`, and starts the TOS upload.
 *
 * When no URL can be extracted, or any step throws, the task is marked as
 * failed. When the conditional status update reports 0 the method returns
 * without uploading.
 *
 * @param task Task whose generation finished.
 * @param itemList Items returned by the history endpoint.
 * @param currentTime Current Unix time in seconds.
 */
private async handleGenerationSuccess(task: IGenerationTask, itemList: any[], currentTime: number): Promise<void> {
  // Picks the URL of one generated image, falling back to the item's cover URL.
  const pickImageUrl = (item: any) => {
    const imageData = item?.image;
    const largeImages = imageData?.large_images;
    let found = null;

    if (largeImages && largeImages.length) {
      found = largeImages[0].image_url;
    } else {
      // No large image: take the first list-like field that carries an image_url.
      for (const key in imageData) {
        const entry = imageData[key];
        if (entry && entry.length && entry.length > 0) {
          if (entry[0].image_url) {
            found = entry[0].image_url;
            break;
          }
        }
      }
    }

    if (found) {
      return found;
    }
    return item?.common_attr?.cover_url || null;
  };

  // Picks the first transcoded variant of a video that carries a video_url.
  const pickVideoUrl = (item: any) => {
    const variants = item?.video?.transcoded_video || {};
    for (const key in variants) {
      if (variants[key] && variants[key].video_url) {
        return variants[key].video_url;
      }
    }
    return null;
  };

  try {
    const picker = task.task_type === 'image' ? pickImageUrl : pickVideoUrl;
    const originalUrls: string[] = itemList
      .map((item) => picker(item))
      .filter(url => url !== null);

    if (originalUrls.length === 0) {
      throw new Error('No valid URLs generated');
    }

    taskLog(`Task ${task.task_id} generated ${originalUrls.length} files`);

    // Mark the task as being handled first so it is not processed twice.
    const claimed = await GenerationTask.updateOne(
      { task_id: task.task_id, status: 'polling' },
      { $set: { status: 'processing_success', updated_at: currentTime } }
    );
    if (claimed === 0) {
      taskLog(`Task ${task.task_id} already being processed by another instance`);
      return;
    }

    // Hand over to the upload stage.
    await this.startTOSUpload(task, originalUrls, currentTime);

  } catch (error) {
    taskLog(`Failed to handle success for task ${task.task_id}: ${error.message}`);
    await this.markTaskFailed(task.task_id, error.message);
  }
}
|
||
|
||
/**
 * Moves the task to `uploading` state with a fresh retry counter and runs
 * the first upload attempt. Failures are logged and handed to
 * `handleUploadFailure`.
 *
 * @param task Task whose results should be uploaded.
 * @param originalUrls Source URLs of the generated files.
 * @param currentTime Current Unix time in seconds.
 */
private async startTOSUpload(task: IGenerationTask, originalUrls: string[], currentTime: number): Promise<void> {
  try {
    // Flag the task as uploading.
    const uploadingFields = {
      status: 'uploading',
      upload_retry_count: 0,
      updated_at: currentTime
    };
    await GenerationTask.updateOne({ task_id: task.task_id }, { $set: uploadingFields });

    // Run the upload.
    await this.performTOSUpload(task, originalUrls, currentTime);

  } catch (error) {
    taskLog(`Failed to start TOS upload for task ${task.task_id}: ${error.message}`);
    await this.handleUploadFailure(task, error.message, currentTime);
  }
}
|
||
|
||
/**
 * Uploads the generated files to TOS, stores a success result record, and
 * marks the task as completed. Any failure is logged and handed to
 * `handleUploadFailure`.
 *
 * @param task Task being completed.
 * @param originalUrls Source URLs of the generated files.
 * @param currentTime Current Unix time in seconds.
 */
private async performTOSUpload(task: IGenerationTask, originalUrls: string[], currentTime: number): Promise<void> {
  try {
    // Upload with the retrying uploader.
    const tosUrls = await this.uploadToTOSWithRetry(originalUrls, task.task_type, task.task_id);

    // Build the result record.
    const expireTime = currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400');
    const startedAt = task.started_at || task.created_at;
    const someUploadsMissing = tosUrls.length < originalUrls.length;

    const metadata = {
      generation_time: (currentTime - startedAt) * 1000,
      total_files: originalUrls.length,
      successful_uploads: tosUrls.length,
      upload_retry_count: task.upload_retry_count || 0,
      tos_upload_errors: someUploadsMissing ? ['Some uploads failed'] : undefined
    };

    await GenerationResult.create({
      task_id: task.task_id,
      task_type: task.task_type,
      server_id: task.server_id,
      status: 'success',
      original_urls: originalUrls,
      tos_urls: tosUrls,
      metadata,
      created_at: currentTime,
      expires_at: expireTime,
      is_read: false,
      read_count: 0
    });

    // Mark the task as completed.
    const completedFields = {
      status: 'completed',
      completed_at: currentTime,
      updated_at: currentTime
    };
    await GenerationTask.updateOne({ task_id: task.task_id }, { $set: completedFields });

    taskLog(`Task ${task.task_id} completed successfully`);

  } catch (error) {
    taskLog(`TOS upload failed for task ${task.task_id}: ${error.message}`);
    await this.handleUploadFailure(task, error.message, currentTime);
  }
}
|
||
|
||
/**
 * Reacts to a failed upload attempt.
 *
 * While retries remain, the task's retry counter is incremented and the task
 * stays in its current state. Once the counter has reached the maximum
 * (`max_upload_retries`, default 5), a failed result record is stored and the
 * task is marked as failed; errors during that final bookkeeping are logged
 * and swallowed.
 *
 * @param task Task whose upload failed.
 * @param errorMessage Description of the upload error.
 * @param currentTime Current Unix time in seconds.
 */
private async handleUploadFailure(task: IGenerationTask, errorMessage: string, currentTime: number): Promise<void> {
  const attemptsSoFar = task.upload_retry_count || 0;
  const maxRetries = task.max_upload_retries || 5;
  const retriesExhausted = attemptsSoFar >= maxRetries;

  if (!retriesExhausted) {
    // Bump the retry counter; the task remains in uploading state.
    const nextAttempt = attemptsSoFar + 1;
    await GenerationTask.updateOne(
      { task_id: task.task_id },
      { $set: { upload_retry_count: nextAttempt, updated_at: currentTime } }
    );

    taskLog(`Task ${task.task_id} upload retry ${nextAttempt}/${maxRetries} scheduled`);
    return;
  }

  // Retry budget used up: give up on the task.
  taskLog(`Task ${task.task_id} upload failed after ${maxRetries} retries`);

  try {
    // Store a failed result record.
    const expireTime = currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400');

    await GenerationResult.create({
      task_id: task.task_id,
      task_type: task.task_type,
      server_id: task.server_id,
      status: 'failed',
      original_urls: [],
      tos_urls: [],
      metadata: {
        total_files: 0,
        successful_uploads: 0,
        upload_retry_count: attemptsSoFar,
        fail_reason: `TOS upload failed after ${maxRetries} retries: ${errorMessage}`
      },
      created_at: currentTime,
      expires_at: expireTime,
      is_read: false,
      read_count: 0
    });

    // Mark the task as failed.
    const failedFields = {
      status: 'failed',
      error_message: `Upload failed after ${maxRetries} retries: ${errorMessage}`,
      completed_at: currentTime,
      updated_at: currentTime
    };
    await GenerationTask.updateOne({ task_id: task.task_id }, { $set: failedFields });

  } catch (cleanupError) {
    taskLog(`Failed to cleanup task ${task.task_id} after upload failure: ${cleanupError.message}`);
  }
}
|
||
|
||
/**
 * Uploads each URL to TOS, one after another, and returns the resulting TOS
 * URLs in the same order.
 *
 * Each upload is asked to retry up to 5 times with a 2-minute timeout. The
 * first upload that still fails is logged and rethrown so the caller can
 * apply its own retry handling.
 *
 * @param urls Source URLs to upload.
 * @param taskType Selects the file name prefix, extension, and target folder.
 * @param task_id Task id used as a path segment of the object key.
 * @returns TOS URLs of the uploaded files.
 */
private async uploadToTOSWithRetry(urls: string[], taskType: 'image' | 'video', task_id: string): Promise<string[]> {
  const isImage = taskType === 'image';
  const uploaded: string[] = [];

  for (let index = 0; index < urls.length; index++) {
    const sourceUrl = urls[index];
    try {
      // Timestamp plus a random base-36 suffix keeps object names distinct.
      const prefix = isImage ? 'image' : 'video';
      const extension = isImage ? 'png' : 'mp4';
      const fileName = `${prefix}-${Date.now()}-${Math.random().toString(36).slice(2, 11)}.${extension}`;

      const folder = isImage ? 'images' : 'videos';
      const objectKey = `jimeng_free/${folder}/${task_id}/${fileName}`;
      // The TOS client retries internally up to 5 times, with a 2-minute timeout.
      const uploadOptions = { maxRetries: 5, timeout: 120000 };

      const tosUrl = await TOSService.uploadFromUrl(sourceUrl, objectKey, uploadOptions);
      uploaded.push(tosUrl);

      taskLog(`${taskType} uploaded to TOS: ${sourceUrl} -> ${tosUrl}`);
    } catch (error) {
      taskLog(`${taskType} upload to TOS failed: ${sourceUrl} - ${error.message}`);
      // Rethrow so the caller decides whether to retry.
      throw error;
    }
  }

  return uploaded;
}
|
||
|
||
/**
 * Records a failed generation: stores a failed result record and marks the
 * task as failed. Errors during this bookkeeping are logged and swallowed.
 *
 * @param task Task that failed.
 * @param errorMessage Reason stored on the task and in the result record.
 * @param failCode Optional fail code stored on the task.
 * @param currentTime Unix time in seconds; the current time is used when it
 *   is omitted or 0.
 */
private async handleGenerationFailure(task: IGenerationTask, errorMessage: string, failCode?: string, currentTime?: number): Promise<void> {
  const timestamp = currentTime ? currentTime : Math.floor(Date.now() / 1000);

  try {
    // Store a failed result record.
    const lifetimeSeconds = parseInt(process.env.RESULT_EXPIRE_TIME || '86400');

    await GenerationResult.create({
      task_id: task.task_id,
      task_type: task.task_type,
      server_id: task.server_id,
      status: 'failed',
      original_urls: [],
      tos_urls: [],
      metadata: {
        total_files: 0,
        successful_uploads: 0,
        fail_reason: errorMessage
      },
      created_at: timestamp,
      expires_at: timestamp + lifetimeSeconds,
      is_read: false,
      read_count: 0
    });

    // Mark the task as failed.
    const failedFields = {
      status: 'failed',
      error_message: errorMessage,
      fail_code: failCode,
      completed_at: timestamp,
      updated_at: timestamp
    };
    await GenerationTask.updateOne({ task_id: task.task_id }, { $set: failedFields });

    taskLog(`Task ${task.task_id} failed: ${errorMessage}`);

  } catch (error) {
    taskLog(`Failed to handle failure for task ${task.task_id}: ${error.message}`);
  }
}
|
||
|
||
/**
 * Handles an error raised while polling a task.
 *
 * The task is marked as failed once the incremented retry count reaches
 * `max_retries`; otherwise the count is stored and the next poll is delayed
 * by `poll_interval * 2^retryCount` seconds (exponential backoff).
 *
 * @param task Task whose poll failed.
 * @param error Error raised by the poll.
 * @param currentTime Current Unix time in seconds.
 */
private async handlePollingError(task: IGenerationTask, error: Error, currentTime: number): Promise<void> {
  const attempts = task.retry_count + 1;

  if (attempts >= task.max_retries) {
    await this.markTaskFailed(task.task_id, `Polling failed after ${task.max_retries} retries: ${error.message}`);
    return;
  }

  // Store the new retry count and back off exponentially.
  const backoffSeconds = task.poll_interval * (2 ** attempts);
  const retryFields = {
    retry_count: attempts,
    next_poll_at: currentTime + backoffSeconds,
    updated_at: currentTime
  };
  await GenerationTask.updateOne({ task_id: task.task_id }, { $set: retryFields });
}
|
||
|
||
/**
 * Looks the task up by id and, when it exists, records it as failed with the
 * given message. Lookup errors are logged and swallowed.
 *
 * @param taskId Id of the task to fail.
 * @param errorMessage Reason for the failure.
 */
private async markTaskFailed(taskId: string, errorMessage: string): Promise<void> {
  const failedAt = Math.floor(Date.now() / 1000);

  try {
    const found = await GenerationTask.findOne({ task_id: taskId });
    if (!found) {
      return;
    }
    await this.handleGenerationFailure(found, errorMessage, undefined, failedAt);
  } catch (error) {
    taskLog(`Failed to mark task ${taskId} as failed: ${error.message}`);
  }
}
|
||
|
||
/**
 * Submits an image generation request and returns the history record id
 * reported by the generate endpoint.
 *
 * Options are read from `task.original_params` (defaults: 1024x1024,
 * `sample_strength` 0.5, empty negative prompt, 4 images). Unknown or missing
 * model names fall back to the `jimeng-3.0` mapping. Model names starting
 * with `jimeng-4` use the v4 request builder; everything else uses the v3
 * builder.
 *
 * @param task Task to submit; `internal_params.refresh_token` authenticates
 *   the request.
 * @returns The `history_record_id` of the submitted generation.
 * @throws Error when the response carries no `history_record_id`; errors
 *   raised by `request` propagate unchanged.
 */
private async callImageGeneration(task: IGenerationTask): Promise<string> {
  const {
    model,
    prompt,
    width = 1024,
    height = 1024,
    sample_strength = 0.5,
    negative_prompt = "",
    generate_count = 4,
  } = task.original_params;
  const refreshToken = task.internal_params.refresh_token;

  // Public model name -> upstream model key.
  const MODEL_MAP = {
    "jimeng-4.1": "high_aes_general_v41",
    "jimeng-3.1": "high_aes_general_v30l_art_fangzhou:general_v3.0_18b",
    "jimeng-3.0": "high_aes_general_v30l:general_v3.0_18b",
    "jimeng-2.1": "high_aes_general_v21_L:general_v2.1_L",
    "jimeng-2.0-pro": "high_aes_general_v20_L:general_v2.0_L",
    "jimeng-2.0": "high_aes_general_v20:general_v2.0",
    "jimeng-1.4": "high_aes_general_v14:general_v1.4",
    "jimeng-xl-pro": "text2img_xl_sft",
  };
  const mappedModel = MODEL_MAP[model] || MODEL_MAP["jimeng-3.0"];

  const componentId = this.generateUUID();

  // `model` has no default, so guard before calling a string method on it:
  // a task without a model must fall back to the 3.0 mapping above instead of
  // crashing with a TypeError.
  const isModel4 = typeof model === "string" && model.startsWith("jimeng-4");

  // Build only the options object that is actually sent.
  const options: any = isModel4
    ? image4Options_0302(
        mappedModel,
        componentId,
        prompt,
        sample_strength,
        height,
        width,
        negative_prompt,
        '1k',
        generate_count,
      )
    : image3Options(
        mappedModel,
        componentId,
        prompt,
        sample_strength,
        height,
        width,
        negative_prompt,
        '1k',
        generate_count,
      );

  logger.info("生成图片 发起请求", options);
  const { aigc_data } = await request(
    "post",
    "/mweb/v1/aigc_draft/generate",
    refreshToken,
    options,
  );

  // Optional chaining turns a response without `aigc_data` into the
  // descriptive error below rather than an opaque TypeError.
  const historyId = aigc_data?.history_record_id;
  if (!historyId) {
    throw new Error('Failed to get history_record_id from image generation API');
  }

  taskLog(`Image generation started: ${task.task_id}, history_id: ${historyId}`);
  return historyId;
}
|
||
|
||
/**
 * Submits a video generation request and returns the history record id
 * reported by the generate endpoint.
 *
 * Options are read from `task.original_params` (defaults: no images,
 * non-pro model, `duration` 5000, ratio `9:16`). The first entry of `images`
 * is sent as the first frame and the second entry, when present, as the end
 * frame.
 *
 * @param task Task to submit; `internal_params.refresh_token` authenticates
 *   the request.
 * @returns The `history_record_id` of the submitted generation.
 * @throws Error when the response carries no `history_record_id`; errors
 *   raised by `request` propagate unchanged.
 */
private async callVideoGeneration(task: IGenerationTask): Promise<string> {
  const { prompt, images = [], is_pro = false, duration = 5000, ratio = '9:16' } = task.original_params;
  const refreshToken = task.internal_params.refresh_token;

  // Upstream model key: the pro flag selects between the two 3.0 variants.
  const mappedModel = is_pro
    ? "dreamina_ic_generate_video_model_vgfm_3.0_pro"
    : "dreamina_ic_generate_video_model_vgfm_3.0";

  // Describes one uploaded image as a frame reference for the request.
  const buildFrame = (image: any) => ({
    type: "image",
    id: this.generateUUID(),
    source_from: "upload",
    platform_type: 1,
    name: "",
    image_uri: image.url,
    width: image.width,
    height: image.height,
    format: "",
    uri: image.url,
  });

  // Build the video generation input.
  const video_gen_inputs: any = {
    type: "",
    id: this.generateUUID(),
    min_version: "1.0.0",
    prompt: prompt,
    video_mode: 2,
    fps: 24,
    duration_ms: duration,
  };
  if (images && images.length >= 1) {
    // First frame.
    video_gen_inputs.first_frame_image = buildFrame(images[0]);
  }
  if (images && images.length >= 2) {
    // End frame.
    video_gen_inputs.end_frame_image = buildFrame(images[1]);
  }

  const componentId = this.generateUUID();
  const originSubmitId = this.generateUUID();

  const { aigc_data } = await request(
    "post",
    "/mweb/v1/aigc_draft/generate",
    refreshToken,
    {
      params: {
        babi_param: encodeURIComponent(
          JSON.stringify({
            scenario: "image_video_generation",
            feature_key: "text_to_video",
            feature_entrance: "to_video",
            feature_entrance_detail: "to_image-text_to_video",
          })
        ),
      },
      data: {
        extend: {
          m_video_commerce_info: {
            resource_id: "generate_video",
            resource_id_type: "str",
            resource_sub_type: "aigc",
            benefit_type: "basic_video_operation_vgfm_v_three"
          },
          root_model: mappedModel,
          template_id: "",
          history_option: {},
        },
        submit_id: this.generateUUID(),
        metrics_extra: JSON.stringify({
          promptSource: "custom",
          originSubmitId: originSubmitId,
          isDefaultSeed: 1,
          originTemplateId: "",
          imageNameMapping: {},
        }),
        draft_content: JSON.stringify({
          type: "draft",
          id: this.generateUUID(),
          min_version: "3.0.5",
          min_features: [],
          is_from_tsn: true,
          version: "3.2.2",
          main_component_id: componentId,
          component_list: [
            {
              type: "video_base_component",
              id: componentId,
              min_version: "1.0.0",
              generate_type: "gen_video",
              aigc_mode: "workbench",
              metadata: {
                type: "",
                id: this.generateUUID(),
                created_platform: 3,
                created_platform_version: "",
                created_time_in_ms: Date.now(),
                created_did: "",
              },
              abilities: {
                type: "",
                id: this.generateUUID(),
                gen_video: {
                  type: "",
                  id: this.generateUUID(),
                  text_to_video_params: {
                    type: "",
                    id: this.generateUUID(),
                    video_gen_inputs: [video_gen_inputs],
                    video_aspect_ratio: ratio,
                    seed: Math.floor(Math.random() * 100000000) + 2500000000,
                    model_req_key: mappedModel,
                  },
                  video_task_extra: {
                    promptSource: "custom",
                    originSubmitId: originSubmitId,
                    isDefaultSeed: 1,
                    originTemplateId: "",
                    imageNameMapping: {},
                  }
                },
              },
              process_type: 1,
            },
          ],
        }),
        http_common_info: {
          aid: Number(process.env.DEFAULT_ASSISTANT_ID || "513695"),
        },
      },
    }
  );

  // Optional chaining turns a response without `aigc_data` into the
  // descriptive error below rather than an opaque TypeError.
  const historyId = aigc_data?.history_record_id;
  if (!historyId) {
    throw new Error('Failed to get history_record_id from video generation API');
  }

  taskLog(`Video generation started: ${task.task_id}, history_id: ${historyId}`);
  return historyId;
}
|
||
|
||
/**
 * Uploads each URL to TOS, one after another, and returns one URL per input
 * in the same order. When an upload fails the error is logged and the
 * original URL is kept in its place.
 *
 * @param urls Source URLs to upload.
 * @param taskType Selects the file name prefix, extension, and target folder.
 * @param task_id Task id used as a path segment of the object key.
 * @returns TOS URLs, or the original URL for entries whose upload failed.
 */
private async uploadToTOS(urls: string[], taskType: 'image' | 'video', task_id: string): Promise<string[]> {
  const isImage = taskType === 'image';
  const collected: string[] = [];

  for (let index = 0; index < urls.length; index++) {
    const sourceUrl = urls[index];
    try {
      // Timestamp plus a random base-36 suffix keeps object names distinct.
      const prefix = isImage ? 'image' : 'video';
      const extension = isImage ? 'png' : 'mp4';
      const fileName = `${prefix}-${Date.now()}-${Math.random().toString(36).slice(2, 11)}.${extension}`;

      const folder = isImage ? 'images' : 'videos';
      const objectKey = `jimeng_free/${folder}/${task_id}/${fileName}`;

      const tosUrl = await TOSService.uploadFromUrl(sourceUrl, objectKey);
      collected.push(tosUrl);

      taskLog(`${taskType} uploaded to TOS: ${sourceUrl} -> ${tosUrl}`);
    } catch (error) {
      taskLog(`${taskType} upload to TOS failed: ${sourceUrl} - ${error.message}`);
      // Keep the original URL when the upload fails.
      collected.push(sourceUrl);
    }
  }

  return collected;
}
|
||
|
||
/**
 * Counts this server's tasks that are in `processing` or `polling` state.
 *
 * @returns Number of matching tasks.
 */
private async getCurrentServerLoad(): Promise<number> {
  const activeStatuses = ['processing', 'polling'];
  const activeCount = await GenerationTask.countDocuments({
    server_id: this.currentServerId,
    status: { $in: activeStatuses }
  });
  return activeCount;
}
|
||
|
||
/**
 * Builds the `image_info` payload sent with history queries for image tasks:
 * a 2048x2048 webp request plus three square "normal" scenes (2400, 1080,
 * 720).
 */
private getImageInfo() {
  const sceneSizes = [2400, 1080, 720];
  const image_scene_list = sceneSizes.map((size) => ({
    scene: "normal",
    width: size,
    height: size,
    uniq_key: String(size),
    format: "webp",
  }));

  return {
    width: 2048,
    height: 2048,
    format: "webp",
    image_scene_list,
  };
}
|
||
|
||
/**
 * Runs the periodic cleanup: removes expired results, then fails timed-out
 * tasks, and logs both counts. Errors are logged and swallowed.
 */
private async performCleanup(): Promise<void> {
  try {
    const removedResults = await DatabaseCleanupService.cleanupExpiredResults();
    const failedTasks = await DatabaseCleanupService.cleanupTimeoutTasks();

    const summary = `Cleanup completed - expired results: ${removedResults}, timeout tasks: ${failedTasks}`;
    taskLog(summary);
  } catch (error) {
    taskLog(`Cleanup failed: ${error.message}`);
  }
}
|
||
|
||
/**
 * Shutdown hook: logs the shutdown request and stops the polling loop.
 */
private async gracefulShutdown(): Promise<void> {
  const notice = 'Received shutdown signal, stopping task polling service...';
  taskLog(notice);
  await this.stop();
}
|
||
|
||
/**
 * Returns a snapshot of the service configuration and running state.
 */
public getServiceInfo() {
  const { currentServerId: serverId, isRunning, maxConcurrentTasks } = this;
  return { serverId, isRunning, maxConcurrentTasks };
}
|
||
|
||
/**
 * Generates a random identifier in UUID version 4 layout using
 * `Math.random`.
 *
 * @returns A string shaped like `xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx`, where
 *   each `x` is a random hex digit and `y` is one of 8, 9, a, b.
 */
private generateUUID(): string {
  const layout = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx';

  return layout
    .split('')
    .map((symbol) => {
      // Dashes and the fixed version digit are copied through unchanged.
      if (symbol !== 'x' && symbol !== 'y') {
        return symbol;
      }
      const nibble = Math.floor(Math.random() * 16);
      // 'y' positions keep only the low two bits and force the high bit.
      const digit = symbol === 'x' ? nibble : ((nibble & 0x3) | 0x8);
      return digit.toString(16);
    })
    .join('');
}
|
||
}
|
||
|
||
/**
 * Database cleanup helpers: removal of expired result records and failing of
 * tasks that exceeded their timeout.
 */
export class DatabaseCleanupService {
  /**
   * Deletes every result record whose `expires_at` lies in the past.
   *
   * @returns The value reported by `deleteMany`, or 0 when it is falsy.
   */
  static async cleanupExpiredResults(): Promise<number> {
    const currentTime = Math.floor(Date.now() / 1000);

    const result = await GenerationResult.deleteMany({
      expires_at: { $lt: currentTime }
    });

    return result || 0;
  }

  /**
   * Marks every `processing` or `polling` task that has run longer than its
   * timeout as failed and stores a failed result record for each of them.
   *
   * The elapsed time is measured from `started_at` (or `created_at` when the
   * task has no start time) and compared against `task_timeout`, defaulting
   * to 3600 seconds.
   *
   * @returns Number of tasks that were found to be timed out.
   */
  static async cleanupTimeoutTasks(): Promise<number> {
    const currentTime = Math.floor(Date.now() / 1000);

    // Load all active tasks; the timeout check happens in memory because
    // NeDB does not support $expr.
    const activeTasks = await GenerationTask.find({
      status: { $in: ['processing', 'polling'] }
    });

    const timeoutTasks = activeTasks.filter(task => {
      const elapsedTime = currentTime - (task.started_at || task.created_at);
      return elapsedTime > (task.task_timeout || 3600); // default: one hour
    });

    const timeoutTaskIds = timeoutTasks.map(task => task.task_id);

    if (timeoutTaskIds.length > 0) {
      // Use the $set modifier, like every other update in this file. NeDB
      // treats an update without modifiers as a replacement of the whole
      // document, which would drop task_id, server_id and all other fields.
      await GenerationTask.updateMany(
        { task_id: { $in: timeoutTaskIds } },
        {
          $set: {
            status: 'failed',
            error_message: 'Task timeout',
            updated_at: currentTime,
            completed_at: currentTime
          }
        }
      );

      // Store a failed result record for every timed-out task.
      const failedResults = timeoutTasks.map(task => ({
        task_id: task.task_id,
        task_type: task.task_type,
        server_id: task.server_id,
        status: 'failed' as const,
        original_urls: [],
        tos_urls: [],
        metadata: {
          total_files: 0,
          successful_uploads: 0,
          // Report the timeout that was actually applied, including the default.
          fail_reason: 'Task timeout after ' + (task.task_timeout || 3600) + ' seconds'
        },
        created_at: currentTime,
        expires_at: currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400'),
        is_read: false,
        read_count: 0
      }));

      if (failedResults.length > 0) {
        await GenerationResult.insertMany(failedResults);
      }
    }

    return timeoutTaskIds.length;
  }
}
|
||
|
||
export default TaskPollingService.getInstance(); |