数据库模式

This commit is contained in:
jonathang4 2025-08-27 15:20:38 +08:00
parent 3804c58913
commit 997e50c6e0
7 changed files with 1666 additions and 67 deletions

View File

@ -5,18 +5,28 @@ import { generateImages } from "@/api/controllers/images.ts";
import { tokenSplit } from "@/api/controllers/core.ts"; import { tokenSplit } from "@/api/controllers/core.ts";
import util from "@/lib/util.ts"; import util from "@/lib/util.ts";
import { ImagesTaskCache } from '@/api/ImagesTaskCache.ts'; import { ImagesTaskCache } from '@/api/ImagesTaskCache.ts';
import DatabaseGenerationService from '@/lib/services/DatabaseGenerationService.js';
// 通过环境变量控制新旧方法
const USE_DATABASE_MODE = process.env.USE_DATABASE_MODE === 'true';
export default { export default {
prefix: "/v1/images", prefix: "/v1/images",
get: { get: {
"/query": async (request: Request) => { "/query": async (request: Request) => {
const imagesTaskCache = ImagesTaskCache.getInstance();
request request
.validate("query.task_id", _.isString) // 从 query 中校验 .validate("query.task_id", _.isString); // 从 query 中校验
const { const {
task_id, task_id,
} = request.query; // 从 query 中获取 } = request.query; // 从 query 中获取
if (USE_DATABASE_MODE) {
// 使用新的数据库方法
return await DatabaseGenerationService.queryTaskResult(task_id);
} else {
// 使用原有的内存缓存方法
const imagesTaskCache = ImagesTaskCache.getInstance();
// 使用新的方法获取任务结果并清理缓存 // 使用新的方法获取任务结果并清理缓存
let res = imagesTaskCache.getTaskResultAndClear(task_id); let res = imagesTaskCache.getTaskResultAndClear(task_id);
@ -34,6 +44,7 @@ export default {
data: { task_id, url: "", status: res || 0 }, data: { task_id, url: "", status: res || 0 },
}; };
} }
}
}, },
}, },
@ -64,29 +75,29 @@ export default {
response_format, response_format,
} = request.body; } = request.body;
const responseFormat = _.defaultTo(response_format, "url"); const responseFormat = _.defaultTo(response_format, "url");
if (USE_DATABASE_MODE) {
// 使用新的数据库方法
await DatabaseGenerationService.generateImagesV2('jimeng-3.0', task_id, prompt, {
width,
height,
sampleStrength: 0.5,
negativePrompt: "",
response_format: responseFormat
}, token);
} else {
// 使用原有方法(不等待结果)
generateImages('jimeng-3.0', task_id, prompt, { generateImages('jimeng-3.0', task_id, prompt, {
width, width,
height, height,
sampleStrength:0.5, sampleStrength: 0.5,
negativePrompt:"", negativePrompt: "",
}, token); }, token);
// let data = []; }
// if (responseFormat == "b64_json") {
// data = (
// await Promise.all(imageUrls.map((url) => util.fetchFileBASE64(url)))
// ).map((b64) => ({ b64_json: b64 }));
// } else {
// data = imageUrls.map((url) => ({
// url,
// }));
// }
// return {
// created: util.unixTimestamp(),
// data,
// };
return { return {
created: util.unixTimestamp(), created: util.unixTimestamp(),
data:'success', data: 'success',
}; };
}, },
}, },

View File

@ -5,18 +5,28 @@ import { generateVideo } from "@/api/controllers/video.ts";
import { tokenSplit } from "@/api/controllers/core.ts"; import { tokenSplit } from "@/api/controllers/core.ts";
import util from "@/lib/util.ts"; import util from "@/lib/util.ts";
import { VideoTaskCache } from '@/api/VideoTaskCache.ts'; import { VideoTaskCache } from '@/api/VideoTaskCache.ts';
import databaseService from '@/lib/services/DatabaseGenerationService.js';
// 环境变量控制新旧方法切换
const USE_DATABASE_MODE = process.env.USE_DATABASE_MODE === 'true';
export default { export default {
prefix: "/v1/video", prefix: "/v1/video",
get: { get: {
"/query": async (request: Request) => { "/query": async (request: Request) => {
const videoTaskCache = VideoTaskCache.getInstance();
request request
.validate("query.task_id", _.isString) // 从 query 中校验 .validate("query.task_id", _.isString) // 从 query 中校验
const { const {
task_id, task_id,
} = request.query; // 从 query 中获取 } = request.query; // 从 query 中获取
if (USE_DATABASE_MODE) {
// 使用数据库模式
return await databaseService.queryTaskResult(task_id);
} else {
// 使用原有的内存缓存模式
const videoTaskCache = VideoTaskCache.getInstance();
// 使用新的方法获取任务结果并清理缓存 // 使用新的方法获取任务结果并清理缓存
let res = videoTaskCache.getTaskResultAndClear(task_id); let res = videoTaskCache.getTaskResultAndClear(task_id);
@ -34,6 +44,7 @@ export default {
data: { task_id, url: "", status: res || 0 }, data: { task_id, url: "", status: res || 0 },
}; };
} }
}
}, },
}, },
@ -47,6 +58,7 @@ export default {
.validate("body.duration", v => _.isUndefined(v) || _.isFinite(v)) .validate("body.duration", v => _.isUndefined(v) || _.isFinite(v))
.validate("body.ratio", v => _.isUndefined(v) || _.isString(v)) .validate("body.ratio", v => _.isUndefined(v) || _.isString(v))
.validate("headers.authorization", _.isString); .validate("headers.authorization", _.isString);
// refresh_token切分 // refresh_token切分
const tokens = tokenSplit(request.headers.authorization); const tokens = tokenSplit(request.headers.authorization);
// 随机挑选一个refresh_token // 随机挑选一个refresh_token
@ -59,21 +71,30 @@ export default {
duration, duration,
ratio, ratio,
} = request.body; } = request.body;
// const imageUrls = await generateVideo(model, task_id, prompt, {
//不等结果 直接返回 if (USE_DATABASE_MODE) {
generateVideo(task_id, prompt, { // 使用数据库模式
images:images, await databaseService.generateVideoV2(task_id, prompt, {
isPro:is_pro, images: images,
duration:duration*1000, isPro: is_pro,
duration: duration ? duration * 1000 : undefined,
ratio, ratio,
}, token); }, token);
// let data = []; } else {
// data = imageUrls.map((url) => ({ // 使用原有的内存缓存模式
// url, // const imageUrls = await generateVideo(model, task_id, prompt, {
// })); // 不等结果 直接返回
generateVideo(task_id, prompt, {
images: images,
isPro: is_pro,
duration: duration * 1000,
ratio,
}, token);
}
return { return {
created: util.unixTimestamp(), created: util.unixTimestamp(),
data:'success', data: 'success',
}; };
}, },
}, },

View File

@ -8,6 +8,7 @@ import routes from "@/api/routes/index.ts";
import logger from "@/lib/logger.ts"; import logger from "@/lib/logger.ts";
import mongoDBManager from "@/lib/database/mongodb.ts"; import mongoDBManager from "@/lib/database/mongodb.ts";
import heartbeatService from "@/lib/services/HeartbeatService.ts"; import heartbeatService from "@/lib/services/HeartbeatService.ts";
import taskPollingService from "@/lib/services/TaskPollingService.js";
const startupTime = performance.now(); const startupTime = performance.now();
@ -41,6 +42,17 @@ const startupTime = performance.now();
} }
} }
// 启动任务轮询服务(仅在数据库模式下)
const useDatabaseMode = process.env.USE_DATABASE_MODE === 'true';
if (useDatabaseMode && mongoDBManager.isMongoConnected()) {
try {
await taskPollingService.start();
logger.success("Task polling service started");
} catch (error) {
logger.warn("Failed to start task polling service:", error.message);
}
}
config.service.bindAddress && config.service.bindAddress &&
logger.success("Service bind address:", config.service.bindAddress); logger.success("Service bind address:", config.service.bindAddress);
})() })()

View File

@ -0,0 +1,90 @@
import mongoose, { Schema, Document } from 'mongoose';
// 生成结果表数据模型
export interface IGenerationResult extends Document {
task_id: string; // 关联的任务ID主键
task_type: 'image' | 'video'; // 任务类型
server_id: string; // 处理服务器ID
// 生成结果
status: 'success' | 'failed'; // 最终状态
original_urls: string[]; // 原始URL数组即梦返回的地址
tos_urls: string[]; // TOS URL数组上传后的地址
// 处理元数据
metadata: {
generation_time?: number; // 生成耗时(毫秒)
tos_upload_time?: number; // TOS上传耗时毫秒
total_files: number; // 文件总数
successful_uploads: number; // 成功上传数量
tos_upload_errors?: string[]; // TOS上传错误信息
fail_reason?: string; // 失败原因
};
// 时间管理
created_at: number; // 创建时间戳(秒)
expires_at: number; // 过期时间戳用于自动清理默认24小时后
}
const GenerationResultSchema: Schema = new Schema({
task_id: {
type: String,
required: true,
unique: true,
index: true
},
task_type: {
type: String,
required: true,
enum: ['image', 'video']
},
server_id: {
type: String,
required: true,
index: true
},
status: {
type: String,
required: true,
enum: ['success', 'failed']
},
original_urls: {
type: [String],
default: []
},
tos_urls: {
type: [String],
default: []
},
metadata: {
generation_time: Number,
tos_upload_time: Number,
total_files: { type: Number, required: true },
successful_uploads: { type: Number, required: true },
tos_upload_errors: [String],
fail_reason: String
},
created_at: {
type: Number,
default: () => Math.floor(Date.now() / 1000),
index: true
},
expires_at: {
type: Number,
required: true,
index: true
}
}, {
collection: 'jimeng_free_generation_results',
timestamps: false // 使用自定义时间戳
});
// 创建索引
GenerationResultSchema.index({ task_id: 1 });
GenerationResultSchema.index({ expires_at: 1 }); // 用于过期清理
GenerationResultSchema.index({ server_id: 1, created_at: 1 });
// 设置TTL索引自动清理过期记录
GenerationResultSchema.index({ expires_at: 1 }, { expireAfterSeconds: 0 });
export default mongoose.model<IGenerationResult>('GenerationResult', GenerationResultSchema);

View File

@ -0,0 +1,152 @@
import mongoose, { Schema, Document } from 'mongoose';
// 生成任务表数据模型
export interface IGenerationTask extends Document {
task_id: string; // 任务唯一标识,主键
task_type: 'image' | 'video'; // 任务类型
server_id: string; // 分配的服务器ID外部分配对应SERVICE_ID
// 原始请求参数
original_params: {
model?: string; // 模型名称
prompt: string; // 提示词
negative_prompt?: string; // 负向提示词
width?: number; // 宽度
height?: number; // 高度
sample_strength?: number; // 采样强度
images?: Array<{ // 图片参数(视频生成用)
url: string;
width: number;
height: number;
}>;
is_pro?: boolean; // 是否专业版
duration?: number; // 时长(毫秒)
ratio?: string; // 比例
response_format?: string; // 响应格式
};
// 生成过程中的内部参数
internal_params: {
refresh_token: string; // 认证令牌
component_id?: string; // 组件ID
history_id?: string; // 即梦平台历史记录ID
mapped_model?: string; // 映射后的模型名
submit_id?: string; // 提交ID
};
// 任务状态和控制
status: 'pending' | 'processing' | 'polling' | 'completed' | 'failed';
retry_count: number; // 当前重试次数
max_retries: number; // 最大重试次数默认3次
// 轮询控制
next_poll_at?: number; // 下次轮询时间戳(用于控制轮询间隔)
poll_interval: number; // 轮询间隔默认10秒
task_timeout: number; // 任务超时时间图片1小时视频24小时
// 时间戳
created_at: number; // 创建时间戳(秒)
updated_at: number; // 更新时间戳(秒)
started_at?: number; // 开始处理时间戳(秒)
completed_at?: number; // 完成时间戳(秒)
// 错误信息
error_message?: string; // 错误描述
fail_code?: string; // 即梦平台返回的失败代码
}
const GenerationTaskSchema: Schema = new Schema({
task_id: {
type: String,
required: true,
unique: true,
index: true
},
task_type: {
type: String,
required: true,
enum: ['image', 'video']
},
server_id: {
type: String,
required: true,
index: true
},
original_params: {
model: String,
prompt: { type: String, required: true },
negative_prompt: String,
width: Number,
height: Number,
sample_strength: Number,
images: [{
url: String,
width: Number,
height: Number
}],
is_pro: Boolean,
duration: Number,
ratio: String,
response_format: String
},
internal_params: {
refresh_token: { type: String, required: true },
component_id: String,
history_id: String,
mapped_model: String,
submit_id: String
},
status: {
type: String,
required: true,
enum: ['pending', 'processing', 'polling', 'completed', 'failed'],
default: 'pending',
index: true
},
retry_count: {
type: Number,
default: 0
},
max_retries: {
type: Number,
default: 3
},
next_poll_at: {
type: Number,
index: true
},
poll_interval: {
type: Number,
default: 10
},
task_timeout: {
type: Number,
required: true
},
created_at: {
type: Number,
default: () => Math.floor(Date.now() / 1000),
index: true
},
updated_at: {
type: Number,
default: () => Math.floor(Date.now() / 1000)
},
started_at: Number,
completed_at: Number,
error_message: String,
fail_code: String
}, {
collection: 'jimeng_free_generation_tasks',
timestamps: false // 使用自定义时间戳
});
// 创建复合索引 - 用于轮询查询
GenerationTaskSchema.index({ server_id: 1, status: 1, next_poll_at: 1 });
// 创建其他索引
GenerationTaskSchema.index({ task_id: 1 });
GenerationTaskSchema.index({ created_at: 1 });
GenerationTaskSchema.index({ updated_at: 1 });
export default mongoose.model<IGenerationTask>('GenerationTask', GenerationTaskSchema);

View File

@ -0,0 +1,380 @@
import GenerationTask, { IGenerationTask } from '@/lib/database/models/GenerationTask.js';
import GenerationResult, { IGenerationResult } from '@/lib/database/models/GenerationResult.js';
import logger from '@/lib/logger.js';
/**
*
*
*/
export class DatabaseGenerationService {
private static instance: DatabaseGenerationService;
private currentServerId: string;
private constructor() {
this.currentServerId = process.env.SERVICE_ID || 'jimeng-free-api';
}
public static getInstance(): DatabaseGenerationService {
if (!DatabaseGenerationService.instance) {
DatabaseGenerationService.instance = new DatabaseGenerationService();
}
return DatabaseGenerationService.instance;
}
/**
* -
*
*/
async generateImagesV2(
model: string,
taskId: string,
prompt: string,
params: {
width?: number;
height?: number;
sampleStrength?: number;
negativePrompt?: string;
response_format?: string;
},
refreshToken: string
): Promise<void> {
try {
const currentServerId = this.currentServerId;
const imageTimeout = parseInt(process.env.IMAGE_TASK_TIMEOUT || '3600');
// 检查任务是否已存在
const existingTask = await GenerationTask.findOne({ task_id: taskId });
if (existingTask) {
logger.warn(`Task ${taskId} already exists, skipping creation`);
return;
}
// 创建任务记录
await GenerationTask.create({
task_id: taskId,
task_type: 'image',
server_id: currentServerId,
original_params: {
model,
prompt,
width: params.width || 1024,
height: params.height || 1024,
sample_strength: params.sampleStrength || 0.5,
negative_prompt: params.negativePrompt || "",
response_format: params.response_format
},
internal_params: {
refresh_token: refreshToken
},
status: 'pending',
retry_count: 0,
max_retries: 3,
poll_interval: 10,
task_timeout: imageTimeout,
created_at: Math.floor(Date.now() / 1000),
updated_at: Math.floor(Date.now() / 1000)
});
logger.info(`Image task created: ${taskId} for server: ${currentServerId}`);
} catch (error) {
logger.error(`Failed to create image task ${taskId}:`, error);
throw error;
}
}
/**
* -
*
*/
async generateVideoV2(
taskId: string,
prompt: string,
params: {
images?: Array<{
url: string;
width: number;
height: number;
}>;
isPro?: boolean;
duration?: number;
ratio?: string;
},
refreshToken: string
): Promise<void> {
try {
const currentServerId = this.currentServerId;
const videoTimeout = parseInt(process.env.VIDEO_TASK_TIMEOUT || '86400');
// 检查任务是否已存在
const existingTask = await GenerationTask.findOne({ task_id: taskId });
if (existingTask) {
logger.warn(`Task ${taskId} already exists, skipping creation`);
return;
}
// 创建任务记录
await GenerationTask.create({
task_id: taskId,
task_type: 'video',
server_id: currentServerId,
original_params: {
prompt,
images: params.images || [],
is_pro: params.isPro || false,
duration: params.duration || 5000,
ratio: params.ratio || '9:16'
},
internal_params: {
refresh_token: refreshToken
},
status: 'pending',
retry_count: 0,
max_retries: 3,
poll_interval: 15, // 视频轮询间隔更长
task_timeout: videoTimeout,
created_at: Math.floor(Date.now() / 1000),
updated_at: Math.floor(Date.now() / 1000)
});
logger.info(`Video task created: ${taskId} for server: ${currentServerId}`);
} catch (error) {
logger.error(`Failed to create video task ${taskId}:`, error);
throw error;
}
}
/**
*
*
*/
async queryTaskResult(taskId: string): Promise<any> {
try {
// 1. 先查询结果表
const result = await GenerationResult.findOne({ task_id: taskId });
if (result) {
// 找到结果,返回并清理
const response = {
created: Math.floor(Date.now() / 1000),
data: {
task_id: taskId,
url: result.tos_urls.join(','),
status: result.status === 'success' ? -1 : -2
}
};
// 删除结果记录(一次性消费)
await GenerationResult.deleteOne({ task_id: taskId });
logger.info(`Task result retrieved and cleaned: ${taskId}, status: ${result.status}`);
return response;
}
// 2. 查询任务状态
const task = await GenerationTask.findOne({ task_id: taskId });
if (!task) {
return {
created: Math.floor(Date.now() / 1000),
data: { task_id: taskId, url: "", status: 0 } // 任务不存在
};
}
// 3. 根据任务状态返回
const statusMap = {
'pending': 0,
'processing': 0,
'polling': 0,
'failed': -2,
'completed': -1 // 这种情况理论上不会出现因为completed会生成result
};
const responseStatus = statusMap[task.status] || 0;
logger.debug(`Task status queried: ${taskId}, status: ${task.status} -> ${responseStatus}`);
return {
created: Math.floor(Date.now() / 1000),
data: {
task_id: taskId,
url: "",
status: responseStatus
}
};
} catch (error) {
logger.error(`Failed to query task result ${taskId}:`, error);
// 发生错误时返回任务不存在状态
return {
created: Math.floor(Date.now() / 1000),
data: { task_id: taskId, url: "", status: 0 }
};
}
}
/**
*
*/
async getTaskStats(serverId?: string): Promise<any> {
try {
const filter = serverId ? { server_id: serverId } : {};
const stats = await GenerationTask.aggregate([
{ $match: filter },
{
$group: {
_id: "$status",
count: { $sum: 1 }
}
}
]);
const result = stats.reduce((acc, curr) => {
acc[curr._id] = curr.count;
return acc;
}, {} as Record<string, number>);
// 添加服务器负载信息
if (serverId) {
result['server_load'] = await this.getServerLoad(serverId);
}
return result;
} catch (error) {
logger.error('Failed to get task stats:', error);
return {};
}
}
/**
*
*/
async getServerLoad(serverId: string): Promise<number> {
try {
return await GenerationTask.countDocuments({
server_id: serverId,
status: { $in: ['processing', 'polling'] }
});
} catch (error) {
logger.error(`Failed to get server load for ${serverId}:`, error);
return 0;
}
}
/**
*
*/
async getTaskDetail(taskId: string): Promise<IGenerationTask | null> {
try {
return await GenerationTask.findOne({ task_id: taskId });
} catch (error) {
logger.error(`Failed to get task detail ${taskId}:`, error);
return null;
}
}
/**
*
*/
async cancelTask(taskId: string): Promise<boolean> {
try {
const result = await GenerationTask.updateOne(
{
task_id: taskId,
status: { $in: ['pending', 'processing', 'polling'] }
},
{
status: 'failed',
error_message: 'Task cancelled by user',
completed_at: Math.floor(Date.now() / 1000),
updated_at: Math.floor(Date.now() / 1000)
}
);
const cancelled = result.modifiedCount > 0;
if (cancelled) {
logger.info(`Task cancelled: ${taskId}`);
// 创建取消结果记录
const currentTime = Math.floor(Date.now() / 1000);
const expireTime = currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400');
const task = await GenerationTask.findOne({ task_id: taskId });
if (task) {
await GenerationResult.create({
task_id: taskId,
task_type: task.task_type,
server_id: task.server_id,
status: 'failed',
original_urls: [],
tos_urls: [],
metadata: {
total_files: 0,
successful_uploads: 0,
fail_reason: 'Task cancelled by user'
},
created_at: currentTime,
expires_at: expireTime
});
}
}
return cancelled;
} catch (error) {
logger.error(`Failed to cancel task ${taskId}:`, error);
return false;
}
}
/**
*
*/
async cleanupExpiredData(): Promise<{ tasks: number; results: number }> {
try {
const currentTime = Math.floor(Date.now() / 1000);
const taskTimeout = parseInt(process.env.IMAGE_TASK_TIMEOUT || '3600'); // 使用较短的作为默认值
const cutoffTime = currentTime - taskTimeout * 2; // 清理超过2倍超时时间的任务
// 清理过期任务
const taskResult = await GenerationTask.deleteMany({
status: { $in: ['completed', 'failed'] },
completed_at: { $lt: cutoffTime }
});
// 清理过期结果由TTL索引自动处理这里手动清理作为备份
const resultResult = await GenerationResult.deleteMany({
expires_at: { $lt: currentTime }
});
const cleanupStats = {
tasks: taskResult.deletedCount || 0,
results: resultResult.deletedCount || 0
};
if (cleanupStats.tasks > 0 || cleanupStats.results > 0) {
logger.info(`Cleanup completed - tasks: ${cleanupStats.tasks}, results: ${cleanupStats.results}`);
}
return cleanupStats;
} catch (error) {
logger.error('Failed to cleanup expired data:', error);
return { tasks: 0, results: 0 };
}
}
/**
*
*/
getServiceInfo() {
return {
serverId: this.currentServerId,
type: 'DatabaseGenerationService',
version: '1.0.0'
};
}
}
export default DatabaseGenerationService.getInstance();

View File

@ -0,0 +1,933 @@
import fs from 'fs-extra';
import path from 'path';
import { formatInTimeZone } from 'date-fns-tz';
import GenerationTask, { IGenerationTask } from '@/lib/database/models/GenerationTask.js';
import GenerationResult, { IGenerationResult } from '@/lib/database/models/GenerationResult.js';
import mongoDBManager from '@/lib/database/mongodb.js';
import logger from '@/lib/logger.js';
import TOSService from '@/lib/tos/tos-service.js';
import { generateImages as originalGenerateImages } from '@/api/controllers/images.js';
import { generateVideo as originalGenerateVideo } from '@/api/controllers/video.js';
import { request } from '@/api/controllers/core.js';
const timeZone = 'Asia/Shanghai';
const TASK_POLLING_LOG_PATH = path.resolve("./logs/task_polling.log");
function taskLog(message: string) {
try {
const timestamp = formatInTimeZone(new Date(), timeZone, "yyyy-MM-dd HH:mm:ss.SSS");
const logMessage = `[TaskPolling][${timestamp}] ${message}`;
fs.ensureDirSync(path.dirname(TASK_POLLING_LOG_PATH));
fs.appendFileSync(TASK_POLLING_LOG_PATH, logMessage + "\n");
// 同时输出到控制台
logger.info(logMessage);
} catch (err) {
console.error("TaskPolling log write error:", err);
}
}
export class TaskPollingService {
private static instance: TaskPollingService;
private currentServerId: string;
private pollInterval: NodeJS.Timeout | null = null;
private isRunning: boolean = false;
private maxConcurrentTasks: number;
private cleanupCounter = 0;
private constructor() {
this.currentServerId = process.env.SERVICE_ID || 'jimeng-free-api';
this.maxConcurrentTasks = parseInt(process.env.MAX_CONCURRENT_TASKS || '3');
taskLog("TaskPollingService initialized");
}
public static getInstance(): TaskPollingService {
if (!TaskPollingService.instance) {
TaskPollingService.instance = new TaskPollingService();
}
return TaskPollingService.instance;
}
/**
*
* HeartbeatService分离使
*/
public async start(): Promise<void> {
if (this.isRunning) {
taskLog('Task polling service is already running');
return;
}
try {
// 确保MongoDB连接可用
if (!mongoDBManager.isMongoConnected()) {
taskLog('MongoDB not connected, skipping task polling service');
return;
}
const pollIntervalMs = parseInt(process.env.TASK_POLL_INTERVAL || '5') * 1000;
// 每5秒轮询一次与心跳服务的60秒区分开
this.pollInterval = setInterval(async () => {
await this.processTasks();
}, pollIntervalMs);
this.isRunning = true;
taskLog(`Task polling service started for server: ${this.currentServerId}, interval: ${pollIntervalMs}ms`);
// 监听进程退出事件
process.on('SIGINT', () => this.gracefulShutdown());
process.on('SIGTERM', () => this.gracefulShutdown());
} catch (error) {
taskLog(`Failed to start task polling service: ${error.message}`);
throw error;
}
}
public async stop(): Promise<void> {
if (!this.isRunning) {
return;
}
if (this.pollInterval) {
clearInterval(this.pollInterval);
this.pollInterval = null;
}
this.isRunning = false;
taskLog('Task polling service stopped');
}
/**
*
*/
private async processTasks(): Promise<void> {
try {
const currentTime = Math.floor(Date.now() / 1000);
// 1. 处理待处理任务
await this.processPendingTasks(currentTime);
// 2. 检查轮询任务
await this.checkPollingTasks(currentTime);
// 3. 检查和处理超时任务
await this.checkTimeoutTasks(currentTime);
// 4. 定期清理每720次轮询约1小时
this.cleanupCounter++;
if (this.cleanupCounter >= 720) {
await this.performCleanup();
this.cleanupCounter = 0;
}
} catch (error) {
taskLog(`Task polling error: ${error.message}`);
}
}
/**
*
*/
private async checkTimeoutTasks(currentTime: number): Promise<void> {
try {
const timeoutCount = await DatabaseCleanupService.cleanupTimeoutTasks();
if (timeoutCount > 0) {
taskLog(`Marked ${timeoutCount} tasks as failed due to timeout`);
}
} catch (error) {
taskLog(`Failed to check timeout tasks: ${error.message}`);
}
}
/**
*
*/
private async processPendingTasks(currentTime: number): Promise<void> {
try {
// 获取当前服务器负载
const currentLoad = await this.getCurrentServerLoad();
if (currentLoad >= this.maxConcurrentTasks) {
return; // 服务器已满载
}
const availableSlots = this.maxConcurrentTasks - currentLoad;
// 获取待处理任务
const pendingTasks = await GenerationTask.find({
server_id: this.currentServerId,
status: 'pending'
})
.sort({ created_at: 1 }) // 先入先出
.limit(availableSlots);
for (const task of pendingTasks) {
await this.startTask(task, currentTime);
}
if (pendingTasks.length > 0) {
taskLog(`Started ${pendingTasks.length} pending tasks`);
}
} catch (error) {
taskLog(`Failed to process pending tasks: ${error.message}`);
}
}
/**
*
*/
private async checkPollingTasks(currentTime: number): Promise<void> {
try {
// 获取需要轮询的任务
const pollingTasks = await GenerationTask.find({
server_id: this.currentServerId,
status: 'polling',
$or: [
{ next_poll_at: { $exists: false } },
{ next_poll_at: { $lte: currentTime } }
]
});
for (const task of pollingTasks) {
await this.pollTaskResult(task, currentTime);
}
if (pollingTasks.length > 0) {
taskLog(`Polled ${pollingTasks.length} tasks for results`);
}
} catch (error) {
taskLog(`Failed to check polling tasks: ${error.message}`);
}
}
/**
*
*/
private async startTask(task: IGenerationTask, currentTime: number): Promise<void> {
try {
// 原子更新任务状态为processing
const updateResult = await GenerationTask.updateOne(
{ task_id: task.task_id, status: 'pending' },
{
status: 'processing',
started_at: currentTime,
updated_at: currentTime
}
);
if (updateResult.modifiedCount === 0) {
return; // 任务已被其他进程处理
}
taskLog(`Starting task: ${task.task_id} (${task.task_type})`);
// 调用原有生成方法获取historyId
let historyId: string;
if (task.task_type === 'image') {
historyId = await this.callImageGeneration(task);
} else {
historyId = await this.callVideoGeneration(task);
}
// 更新任务为轮询状态
await GenerationTask.updateOne(
{ task_id: task.task_id },
{
status: 'polling',
'internal_params.history_id': historyId,
next_poll_at: currentTime + task.poll_interval,
updated_at: currentTime
}
);
taskLog(`Task ${task.task_id} started successfully, history_id: ${historyId}`);
} catch (error) {
taskLog(`Failed to start task ${task.task_id}: ${error.message}`);
await this.markTaskFailed(task.task_id, error.message);
}
}
/**
*
*/
private async pollTaskResult(task: IGenerationTask, currentTime: number): Promise<void> {
try {
const historyId = task.internal_params.history_id;
if (!historyId) {
throw new Error('Missing history_id for polling');
}
taskLog(`Polling task result: ${task.task_id}, history_id: ${historyId}`);
// 调用即梦API检查结果
const result = await request("post", "/mweb/v1/get_history_by_ids", task.internal_params.refresh_token, {
data: {
history_ids: [historyId],
image_info: task.task_type === 'image' ? this.getImageInfo() : undefined,
http_common_info: {
aid: Number(process.env.DEFAULT_ASSISTANT_ID || "513695"),
},
},
});
if (!result[historyId]) {
// 结果不存在,更新下次轮询时间
await GenerationTask.updateOne(
{ task_id: task.task_id },
{
next_poll_at: currentTime + task.poll_interval,
updated_at: currentTime
}
);
return;
}
const { status, fail_code: failCode, item_list } = result[historyId];
if (status === 20) {
// 仍在生成中,更新下次轮询时间
await GenerationTask.updateOne(
{ task_id: task.task_id },
{
next_poll_at: currentTime + task.poll_interval,
updated_at: currentTime
}
);
} else if (status === 10 || (status !== 30 && item_list && item_list.length > 0)) {
// 生成完成
await this.handleGenerationSuccess(task, item_list, currentTime);
} else {
// 生成失败
const errorMessage = failCode === '2038' ? 'Content filtered' : 'Generation failed';
await this.handleGenerationFailure(task, errorMessage, failCode, currentTime);
}
} catch (error) {
taskLog(`Failed to poll task ${task.task_id}: ${error.message}`);
// 轮询出错,更新下次轮询时间,增加重试计数
await this.handlePollingError(task, error, currentTime);
}
}
/**
*
*/
private async handleGenerationSuccess(task: IGenerationTask, itemList: any[], currentTime: number): Promise<void> {
try {
let originalUrls: string[] = [];
if (task.task_type === 'image') {
originalUrls = itemList.map((item) => {
if (item?.image?.large_images?.[0]?.image_url) {
return item.image.large_images[0].image_url;
}
return item?.common_attr?.cover_url || null;
}).filter(url => url !== null);
} else {
originalUrls = itemList.map((item) => {
return item?.video?.transcoded_video?.origin?.video_url || null;
}).filter(url => url !== null);
}
if (originalUrls.length === 0) {
throw new Error('No valid URLs generated');
}
taskLog(`Task ${task.task_id} generated ${originalUrls.length} files`);
// 上传到TOS
const tosUrls = await this.uploadToTOS(originalUrls, task.task_type);
// 创建结果记录
const expireTime = currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400');
await GenerationResult.create({
task_id: task.task_id,
task_type: task.task_type,
server_id: task.server_id,
status: 'success',
original_urls: originalUrls,
tos_urls: tosUrls,
metadata: {
generation_time: (currentTime - (task.started_at || task.created_at)) * 1000,
total_files: originalUrls.length,
successful_uploads: tosUrls.length,
tos_upload_errors: tosUrls.length < originalUrls.length ? ['Some uploads failed'] : undefined
},
created_at: currentTime,
expires_at: expireTime
});
// 标记任务完成
await GenerationTask.updateOne(
{ task_id: task.task_id },
{
status: 'completed',
completed_at: currentTime,
updated_at: currentTime
}
);
taskLog(`Task ${task.task_id} completed successfully`);
} catch (error) {
taskLog(`Failed to handle success for task ${task.task_id}: ${error.message}`);
await this.markTaskFailed(task.task_id, error.message);
}
}
/**
*
*/
private async handleGenerationFailure(task: IGenerationTask, errorMessage: string, failCode?: string, currentTime?: number): Promise<void> {
const now = currentTime || Math.floor(Date.now() / 1000);
try {
// 创建失败结果记录
const expireTime = now + parseInt(process.env.RESULT_EXPIRE_TIME || '86400');
await GenerationResult.create({
task_id: task.task_id,
task_type: task.task_type,
server_id: task.server_id,
status: 'failed',
original_urls: [],
tos_urls: [],
metadata: {
total_files: 0,
successful_uploads: 0,
fail_reason: errorMessage
},
created_at: now,
expires_at: expireTime
});
// 标记任务失败
await GenerationTask.updateOne(
{ task_id: task.task_id },
{
status: 'failed',
error_message: errorMessage,
fail_code: failCode,
completed_at: now,
updated_at: now
}
);
taskLog(`Task ${task.task_id} failed: ${errorMessage}`);
} catch (error) {
taskLog(`Failed to handle failure for task ${task.task_id}: ${error.message}`);
}
}
/**
*
*/
private async handlePollingError(task: IGenerationTask, error: Error, currentTime: number): Promise<void> {
const newRetryCount = task.retry_count + 1;
if (newRetryCount >= task.max_retries) {
await this.markTaskFailed(task.task_id, `Polling failed after ${task.max_retries} retries: ${error.message}`);
} else {
// 增加重试计数,延长轮询间隔
const nextPollDelay = task.poll_interval * Math.pow(2, newRetryCount); // 指数退避
await GenerationTask.updateOne(
{ task_id: task.task_id },
{
retry_count: newRetryCount,
next_poll_at: currentTime + nextPollDelay,
updated_at: currentTime
}
);
}
}
/**
*
*/
private async markTaskFailed(taskId: string, errorMessage: string): Promise<void> {
const currentTime = Math.floor(Date.now() / 1000);
try {
const task = await GenerationTask.findOne({ task_id: taskId });
if (task) {
await this.handleGenerationFailure(task, errorMessage, undefined, currentTime);
}
} catch (error) {
taskLog(`Failed to mark task ${taskId} as failed: ${error.message}`);
}
}
/**
* API
*/
private async callImageGeneration(task: IGenerationTask): Promise<string> {
const { model, prompt, width = 1024, height = 1024, sample_strength = 0.5, negative_prompt = "" } = task.original_params;
const refreshToken = task.internal_params.refresh_token;
// 映射模型
const MODEL_MAP = {
"jimeng-3.1": "high_aes_general_v30l_art_fangzhou:general_v3.0_18b",
"jimeng-3.0": "high_aes_general_v30l:general_v3.0_18b",
"jimeng-2.1": "high_aes_general_v21_L:general_v2.1_L",
"jimeng-2.0-pro": "high_aes_general_v20_L:general_v2.0_L",
"jimeng-2.0": "high_aes_general_v20:general_v2.0",
"jimeng-1.4": "high_aes_general_v14:general_v1.4",
"jimeng-xl-pro": "text2img_xl_sft",
};
const mappedModel = MODEL_MAP[model] || MODEL_MAP["jimeng-3.0"];
const componentId = this.generateUUID();
const submitId = this.generateUUID();
const { aigc_data } = await request(
"post",
"/mweb/v1/aigc_draft/generate",
refreshToken,
{
params: {
babi_param: encodeURIComponent(
JSON.stringify({
scenario: "image_video_generation",
feature_key: "aigc_to_image",
feature_entrance: "to_image",
feature_entrance_detail: "to_image-" + mappedModel,
})
),
},
data: {
extend: {
root_model: mappedModel,
template_id: "",
},
submit_id: submitId,
metrics_extra: JSON.stringify({
templateId: "",
generateCount: 1,
promptSource: "custom",
templateSource: "",
lastRequestId: "",
originRequestId: "",
}),
draft_content: JSON.stringify({
type: "draft",
id: this.generateUUID(),
min_version: "3.0.2",
is_from_tsn: true,
version: "3.0.2",
main_component_id: componentId,
component_list: [
{
type: "image_base_component",
id: componentId,
min_version: "3.0.2",
generate_type: "generate",
aigc_mode: "workbench",
abilities: {
type: "",
id: this.generateUUID(),
generate: {
type: "",
id: this.generateUUID(),
core_param: {
type: "",
id: this.generateUUID(),
model: mappedModel,
prompt,
negative_prompt,
seed: Math.floor(Math.random() * 100000000) + 2500000000,
sample_strength,
image_ratio: 1,
large_image_info: {
type: "",
id: this.generateUUID(),
height,
width,
resolution_type: "1k",
},
},
history_option: {
type: "",
id: this.generateUUID(),
},
},
},
},
],
}),
http_common_info: {
aid: Number(process.env.DEFAULT_ASSISTANT_ID || "513695"),
},
},
}
);
const historyId = aigc_data.history_record_id;
if (!historyId) {
throw new Error('Failed to get history_record_id from image generation API');
}
taskLog(`Image generation started: ${task.task_id}, history_id: ${historyId}`);
return historyId;
}
/**
* API
*/
private async callVideoGeneration(task: IGenerationTask): Promise<string> {
const { prompt, images = [], is_pro = false, duration = 5000, ratio = '9:16' } = task.original_params;
const refreshToken = task.internal_params.refresh_token;
// 映射模型
const MODEL_MAP = {
"jimeng-v-3.0": "dreamina_ic_generate_video_model_vgfm_3.0",
"jimeng-v-3.0-pro": "dreamina_ic_generate_video_model_vgfm_3.0_pro",
};
let _model = "jimeng-v-3.0";
if (is_pro) {
_model = "jimeng-v-3.0-pro";
}
const mappedModel = MODEL_MAP[_model] || MODEL_MAP["jimeng-v-3.0"];
// 构建视频生成输入
let video_gen_inputs: any = {
type: "",
id: this.generateUUID(),
min_version: "1.0.0",
prompt: prompt,
video_mode: 2,
fps: 24,
duration_ms: duration,
};
if (images && images.length >= 1) {
// 首帧
video_gen_inputs.first_frame_image = {
type: "image",
id: this.generateUUID(),
source_from: "upload",
platform_type: 1,
name: "",
image_uri: images[0].url,
width: images[0].width,
height: images[0].height,
format: "",
uri: images[0].url,
};
}
if (images && images.length >= 2) {
// 尾帧
video_gen_inputs.end_frame_image = {
type: "image",
id: this.generateUUID(),
source_from: "upload",
platform_type: 1,
name: "",
image_uri: images[1].url,
width: images[1].width,
height: images[1].height,
format: "",
uri: images[1].url,
};
}
const componentId = this.generateUUID();
const originSubmitId = this.generateUUID();
const { aigc_data } = await request(
"post",
"/mweb/v1/aigc_draft/generate",
refreshToken,
{
params: {
babi_param: encodeURIComponent(
JSON.stringify({
scenario: "image_video_generation",
feature_key: "text_to_video",
feature_entrance: "to_video",
feature_entrance_detail: "to_image-text_to_video",
})
),
},
data: {
extend: {
m_video_commerce_info: {
resource_id: "generate_video",
resource_id_type: "str",
resource_sub_type: "aigc",
benefit_type: "basic_video_operation_vgfm_v_three"
},
root_model: mappedModel,
template_id: "",
history_option: {},
},
submit_id: this.generateUUID(),
metrics_extra: JSON.stringify({
promptSource: "custom",
originSubmitId: originSubmitId,
isDefaultSeed: 1,
originTemplateId: "",
imageNameMapping: {},
}),
draft_content: JSON.stringify({
type: "draft",
id: this.generateUUID(),
min_version: "3.0.5",
min_features: [],
is_from_tsn: true,
version: "3.2.2",
main_component_id: componentId,
component_list: [
{
type: "video_base_component",
id: componentId,
min_version: "1.0.0",
generate_type: "gen_video",
aigc_mode: "workbench",
metadata: {
type: "",
id: this.generateUUID(),
created_platform: 3,
created_platform_version: "",
created_time_in_ms: Date.now(),
created_did: "",
},
abilities: {
type: "",
id: this.generateUUID(),
gen_video: {
type: "",
id: this.generateUUID(),
text_to_video_params: {
type: "",
id: this.generateUUID(),
video_gen_inputs: [video_gen_inputs],
video_aspect_ratio: ratio,
seed: Math.floor(Math.random() * 100000000) + 2500000000,
model_req_key: mappedModel,
},
video_task_extra: {
promptSource: "custom",
originSubmitId: originSubmitId,
isDefaultSeed: 1,
originTemplateId: "",
imageNameMapping: {},
}
},
},
process_type: 1,
},
],
}),
http_common_info: {
aid: Number(process.env.DEFAULT_ASSISTANT_ID || "513695"),
},
},
}
);
const historyId = aigc_data.history_record_id;
if (!historyId) {
throw new Error('Failed to get history_record_id from video generation API');
}
taskLog(`Video generation started: ${task.task_id}, history_id: ${historyId}`);
return historyId;
}
/**
* TOS
*/
private async uploadToTOS(urls: string[], taskType: 'image' | 'video'): Promise<string[]> {
const tosUrls: string[] = [];
for (const url of urls) {
try {
const fileName = taskType === 'image'
? `image-${Date.now()}-${Math.random().toString(36).substr(2, 9)}.webp`
: `video-${Date.now()}-${Math.random().toString(36).substr(2, 9)}.mp4`;
const folder = taskType === 'image' ? 'images' : 'videos';
const tosUrl = await TOSService.uploadFromUrl(url, `${folder}/${fileName}`);
tosUrls.push(tosUrl);
taskLog(`${taskType} uploaded to TOS: ${url} -> ${tosUrl}`);
} catch (error) {
taskLog(`${taskType} upload to TOS failed: ${url} - ${error.message}`);
// 如果上传失败保留原URL
tosUrls.push(url);
}
}
return tosUrls;
}
/**
*
*/
private async getCurrentServerLoad(): Promise<number> {
return await GenerationTask.countDocuments({
server_id: this.currentServerId,
status: { $in: ['processing', 'polling'] }
});
}
/**
*
*/
private getImageInfo() {
return {
width: 2048,
height: 2048,
format: "webp",
image_scene_list: [
{
scene: "normal",
width: 2400,
height: 2400,
uniq_key: "2400",
format: "webp",
},
{
scene: "normal",
width: 1080,
height: 1080,
uniq_key: "1080",
format: "webp",
},
{
scene: "normal",
width: 720,
height: 720,
uniq_key: "720",
format: "webp",
},
],
};
}
/**
*
*/
private async performCleanup(): Promise<void> {
try {
const expiredResults = await DatabaseCleanupService.cleanupExpiredResults();
const timeoutTasks = await DatabaseCleanupService.cleanupTimeoutTasks();
taskLog(`Cleanup completed - expired results: ${expiredResults}, timeout tasks: ${timeoutTasks}`);
} catch (error) {
taskLog(`Cleanup failed: ${error.message}`);
}
}
/**
*
*/
private async gracefulShutdown(): Promise<void> {
taskLog('Received shutdown signal, stopping task polling service...');
await this.stop();
}
public getServiceInfo() {
return {
serverId: this.currentServerId,
isRunning: this.isRunning,
maxConcurrentTasks: this.maxConcurrentTasks
};
}
/**
* UUID
*/
private generateUUID(): string {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
const r = Math.random() * 16 | 0;
const v = c === 'x' ? r : (r & 0x3 | 0x8);
return v.toString(16);
});
}
}
/**
*
*/
export class DatabaseCleanupService {
/**
*
*/
static async cleanupExpiredResults(): Promise<number> {
const currentTime = Math.floor(Date.now() / 1000);
const result = await GenerationResult.deleteMany({
expires_at: { $lt: currentTime }
});
return result.deletedCount || 0;
}
/**
*
*/
static async cleanupTimeoutTasks(): Promise<number> {
const currentTime = Math.floor(Date.now() / 1000);
// 查找超时的任务并标记为失败
const timeoutTasks = await GenerationTask.find({
status: { $in: ['processing', 'polling'] },
$expr: {
$gt: [
{ $subtract: [currentTime, '$started_at'] },
'$task_timeout'
]
}
});
// 批量更新超时任务为失败状态
const timeoutTaskIds = timeoutTasks.map(task => task.task_id);
if (timeoutTaskIds.length > 0) {
await GenerationTask.updateMany(
{ task_id: { $in: timeoutTaskIds } },
{
status: 'failed',
error_message: 'Task timeout',
updated_at: currentTime,
completed_at: currentTime
}
);
// 为超时任务创建失败结果记录
const failedResults = timeoutTasks.map(task => ({
task_id: task.task_id,
task_type: task.task_type,
server_id: task.server_id,
status: 'failed',
original_urls: [],
tos_urls: [],
metadata: {
total_files: 0,
successful_uploads: 0,
fail_reason: 'Task timeout after ' + task.task_timeout + ' seconds'
},
created_at: currentTime,
expires_at: currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400')
}));
if (failedResults.length > 0) {
await GenerationResult.insertMany(failedResults);
}
}
return timeoutTaskIds.length;
}
}
export default TaskPollingService.getInstance();