diff --git a/src/api/VideoTaskCache.ts b/src/api/VideoTaskCache.ts new file mode 100644 index 0000000..694c71c --- /dev/null +++ b/src/api/VideoTaskCache.ts @@ -0,0 +1,90 @@ +import fs from 'fs-extra'; +import path from 'path'; +import { format as dateFormat } from 'date-fns'; + +const LOG_PATH = path.resolve("./logs/video_task_cache.log"); + +function cacheLog(value: string, color?: string) { + try { + const head = `[VideoTaskCache][${dateFormat(new Date(), "yyyy-MM-dd HH:mm:ss.SSS")}] `; + value = head + value; + console.log(color ? value[color] : value); + fs.ensureDirSync(path.dirname(LOG_PATH)); + fs.appendFileSync(LOG_PATH, value + "\n"); + } + catch(err) { + console.error("VideoTaskCache log write error:", err); + } +} + +export class VideoTaskCache { + private static instance: VideoTaskCache; + private taskCache: Map; + + private constructor() { + this.taskCache = new Map(); + cacheLog("VideoTaskCache initialized"); + } + + public static getInstance(): VideoTaskCache { + if (!VideoTaskCache.instance) { + VideoTaskCache.instance = new VideoTaskCache(); + } + return VideoTaskCache.instance; + } + + public startTask(taskId: string): void { + const startTime = Math.floor(Date.now() / 1000); // Current time in seconds + this.taskCache.set(taskId, startTime); + cacheLog(`Task started: ${taskId} at ${startTime}`); + } + + public finishTask(taskId: string, status: -1 | -2 | -3, url:string = ''): void { + if (this.taskCache.has(taskId)) { + this.taskCache.set(taskId, status); + let statusMessage = ''; + switch (status) { + case -1: + { + statusMessage = 'successfully'; + if (url) { + this.taskCache.set(taskId, url); + } + } + break; + case -2: statusMessage = 'failed'; break; + case -3: statusMessage = 'timed out'; break; + } + cacheLog(`Task ${taskId} finished ${statusMessage} (status: ${status})`); + } else { + cacheLog(`Attempted to finish non-existent task: ${taskId}`); + } + } + + public getTaskStatus(taskId: string): number | string | undefined { + return this.taskCache.get(taskId); + } + + public getPendingTasks(): string[] { + const pendingTasks: string[] = []; + for (const [taskId, status] of this.taskCache.entries()) { + if (typeof status == 'number' && status > 0) { + pendingTasks.push(taskId); + } + } + return pendingTasks; + } + + public logPendingTasksOnShutdown(): void { + const pendingTasks = this.getPendingTasks(); + if (pendingTasks.length > 0) { + cacheLog(`Pending tasks at shutdown: ${pendingTasks.join(', ')}`, 'yellow'); + } else { + cacheLog("No pending tasks at shutdown."); + } + } +} + +// Initialize the singleton instance when the module is loaded. +// This ensures it's ready when the service starts. +VideoTaskCache.getInstance(); \ No newline at end of file diff --git a/src/api/controllers/video.ts b/src/api/controllers/video.ts index f634afa..5ce7b03 100644 --- a/src/api/controllers/video.ts +++ b/src/api/controllers/video.ts @@ -5,6 +5,7 @@ import EX from "@/api/consts/exceptions.ts"; import util from "@/lib/util.ts"; import { getCredit, receiveCredit, request } from "./core.ts"; import logger from "@/lib/logger.ts"; +import { VideoTaskCache } from '@/api/VideoTaskCache.ts'; const DEFAULT_ASSISTANT_ID = "513695"; export const DEFAULT_MODEL = "jimeng-v-3.0"; @@ -20,6 +21,7 @@ export function getModel(model: string) { export async function generateVideo( _model: string, + task_id: string, prompt: string, { width = 512, @@ -34,10 +36,14 @@ export async function generateVideo( }, refreshToken: string ) { - if(!imgURL){ - throw new APIException(EX.API_REQUEST_PARAMS_INVALID); - return; - } + const videoTaskCache = VideoTaskCache.getInstance(); + videoTaskCache.startTask(task_id); + + try { + if(!imgURL){ + throw new APIException(EX.API_REQUEST_PARAMS_INVALID); + return; + } const model = getModel(_model); logger.info(`使用模型: ${_model} : ${model} 参考图片尺寸: ${width}x${height} 图片地址 ${imgURL} 持续时间: ${duration} 提示词: ${prompt}`); @@ -188,12 +194,30 @@ export async function generateVideo( else throw new APIException(EX.API_IMAGE_GENERATION_FAILED); } - return item_list.map((item) => { + // Assuming success if status is not 30 (failed) and not 20 (pending) + // and item_list is populated. + // A more robust check might be needed depending on actual API behavior for success. + const videoUrls = item_list.map((item) => { if(!item?.video?.transcoded_video?.origin?.video_url) - // return item?.common_attr?.cover_url || null; return null; return item.video.transcoded_video.origin.video_url; }); + + // Filter out nulls and check if any valid URL was generated + const validVideoUrls = videoUrls.filter(url => url !== null); + if (validVideoUrls.length > 0) { + videoTaskCache.finishTask(task_id, -1, validVideoUrls.join(",")); // Success + } else { + // If no valid URLs but no explicit error thrown earlier, consider it a failure. + // This could happen if item_list is empty or items don't have video_url. + videoTaskCache.finishTask(task_id, -2); // Failure + throw new APIException(EX.API_IMAGE_GENERATION_FAILED, "视频生成未返回有效链接"); + } + return validVideoUrls; +} catch (error) { + videoTaskCache.finishTask(task_id, -2); // Failure due to exception + throw error; // Re-throw the error to be handled by the caller +} } export default { diff --git a/src/api/routes/video.ts b/src/api/routes/video.ts index d7794ba..7b2c54d 100644 --- a/src/api/routes/video.ts +++ b/src/api/routes/video.ts @@ -4,14 +4,39 @@ import Request from "@/lib/request/Request.ts"; import { generateVideo } from "@/api/controllers/video.ts"; import { tokenSplit } from "@/api/controllers/core.ts"; import util from "@/lib/util.ts"; +import { VideoTaskCache } from '@/api/VideoTaskCache.ts'; export default { prefix: "/v1/video", + get: { + "/query": async (request: Request) => { + const videoTaskCache = VideoTaskCache.getInstance(); + request + .validate("query.task_id", _.isString) // 从 query 中校验 + const { + task_id, + } = request.query; // 从 query 中获取 + let res = videoTaskCache.getTaskStatus(task_id); + console.log("查询任务状态", task_id, 'res:',res); + if(typeof res === 'string'){ + return { + created: util.unixTimestamp(), + data:{task_id, url:res, status:-1}, + }; + }else{ + return { + created: util.unixTimestamp(), + data:{task_id, url:"", status:res||0}, + }; + } + }, + }, post: { "/generations": async (request: Request) => { request .validate("body.model", v => _.isUndefined(v) || _.isString(v)) + .validate("body.task_id", _.isString) .validate("body.prompt", _.isString) .validate("body.image", v => _.isUndefined(v) || _.isString(v)) .validate("body.width", v => _.isUndefined(v) || _.isFinite(v)) @@ -23,6 +48,7 @@ export default { // 随机挑选一个refresh_token const token = _.sample(tokens); const { + task_id, model, prompt, width, @@ -30,19 +56,21 @@ export default { image, duration, } = request.body; - const imageUrls = await generateVideo(model, prompt, { + // const imageUrls = await generateVideo(model, task_id, prompt, { + //不等结果 直接返回 + generateVideo(model, task_id, prompt, { width, height, imgURL:image, duration:duration*1000, }, token); - let data = []; - data = imageUrls.map((url) => ({ - url, - })); + // let data = []; + // data = imageUrls.map((url) => ({ + // url, + // })); return { created: util.unixTimestamp(), - data, + data:'success', }; }, }, diff --git a/src/daemon.ts b/src/daemon.ts index 0c5fe69..c64d325 100644 --- a/src/daemon.ts +++ b/src/daemon.ts @@ -9,6 +9,7 @@ import { spawn } from 'child_process'; import fs from 'fs-extra'; import { format as dateFormat } from 'date-fns'; import 'colors'; +import { VideoTaskCache } from '@/api/VideoTaskCache.ts'; const CRASH_RESTART_LIMIT = 600; //进程崩溃重启次数限制 const CRASH_RESTART_DELAY = 5000; //进程崩溃重启延迟 @@ -34,6 +35,9 @@ function daemonLog(value, color?: string) { daemonLog(`daemon pid: ${process.pid}`); +// Ensure VideoTaskCache is initialized when daemon starts +VideoTaskCache.getInstance(); + function createProcess() { const childProcess = spawn("node", ["index.js", ...process.argv.slice(2)]); //启动子进程 childProcess.stdout.pipe(process.stdout, { end: false }); //将子进程输出管道到当前进程输出 @@ -62,6 +66,7 @@ function createProcess() { } process.on("exit", code => { + VideoTaskCache.getInstance().logPendingTasksOnShutdown(); if(code === 0) daemonLog("daemon process exited"); else if(code === 2) @@ -71,11 +76,13 @@ process.on("exit", code => { process.on("SIGTERM", () => { daemonLog("received kill signal", "yellow"); currentProcess && currentProcess.kill("SIGINT"); + VideoTaskCache.getInstance().logPendingTasksOnShutdown(); process.exit(2); }); //kill退出守护进程 process.on("SIGINT", () => { currentProcess && currentProcess.kill("SIGINT"); + VideoTaskCache.getInstance().logPendingTasksOnShutdown(); process.exit(0); }); //主动退出守护进程