import fs from 'fs-extra';
import path from 'path';
// import { format as dateFormat } from 'date-fns';
import { formatInTimeZone } from 'date-fns-tz';
import TOSService from '@/lib/tos/tos-service.js';
import logger from '@/lib/logger.js';

/** IANA time zone used for log timestamps (Beijing Time). */
const timeZone = 'Asia/Shanghai';

/** Log file location, resolved against the process working directory. */
const LOG_PATH = path.resolve("./logs/video_task_cache.log");

/** How often the background sweep runs: every 30 minutes. */
const CLEANUP_INTERVAL_MS = 30 * 60 * 1000;

/** Age, in seconds, after which a cache entry is evicted: 1 hour. */
const TASK_EXPIRY_SECONDS = 3600;

/** Current wall-clock time as whole seconds since the Unix epoch. */
function nowInSeconds(): number {
  return Math.floor(Date.now() / 1000);
}

/**
 * Appends one timestamped line to the cache log file.
 *
 * Logging is best-effort: any error raised while formatting or writing is
 * reported on stderr and never propagated to the caller.
 *
 * @param value Message text (without prefix or trailing newline).
 * @param color Currently unused; kept so existing call sites that pass a
 *   color name keep compiling (console output is disabled).
 */
function cacheLog(value: string, color?: string): void {
  try {
    const head = `[VideoTaskCache][${formatInTimeZone(new Date(), timeZone, "yyyy-MM-dd HH:mm:ss.SSS")}] `;
    const line = head + value;
    // console.log(color ? line[color] : line);
    fs.ensureDirSync(path.dirname(LOG_PATH));
    fs.appendFileSync(LOG_PATH, line + "\n");
  } catch (err) {
    console.error("VideoTaskCache log write error:", err);
  }
}

/**
 * Process-wide, in-memory registry of video generation tasks.
 *
 * Each task id maps to one of:
 * - a positive number: the start time in epoch seconds (task still pending);
 * - a negative number (-1 / -2 / -3): a terminal status stored when no URL
 *   is available;
 * - a non-empty string: the comma-separated result URL(s) of a finished task.
 */
export class VideoTaskCache {
  private static instance: VideoTaskCache;

  /** Task id -> pending start timestamp, terminal status code, or result URL(s). */
  private readonly taskCache = new Map<string, number | string>();

  /** Ids of tasks whose TOS upload pass has already run. */
  private readonly tosProcessedTasks = new Set<string>();

  /**
   * Task id -> finish time in epoch seconds. Finished entries no longer carry
   * a timestamp in `taskCache`, so this is what lets the sweep age them out.
   */
  private readonly finishedAt = new Map<string, number>();

  private cleanupInterval: NodeJS.Timeout | null = null;

  private constructor() {
    cacheLog("VideoTaskCache initialized");
    // Start the periodic sweep (every 30 minutes).
    this.startPeriodicCleanup();
  }

  /** Returns the shared instance, creating it on first use. */
  public static getInstance(): VideoTaskCache {
    if (!VideoTaskCache.instance) {
      VideoTaskCache.instance = new VideoTaskCache();
    }
    return VideoTaskCache.instance;
  }

  /** Starts the periodic sweep that evicts expired tasks. */
  private startPeriodicCleanup(): void {
    this.cleanupInterval = setInterval(() => {
      this.clearExpiredTasks();
    }, CLEANUP_INTERVAL_MS);
    // The singleton is created at import time; without unref() this timer
    // alone would keep any process that imports the module from exiting.
    this.cleanupInterval.unref();
    cacheLog("Periodic cleanup started for VideoTaskCache");
  }

  /** Stops the periodic sweep. Does nothing if it is not running. */
  public stopPeriodicCleanup(): void {
    if (this.cleanupInterval) {
      clearInterval(this.cleanupInterval);
      this.cleanupInterval = null;
      cacheLog("Periodic cleanup stopped for VideoTaskCache");
    }
  }

  /**
   * Registers a task as pending, stamped with the current time in seconds.
   *
   * @param taskId Task identifier.
   */
  public startTask(taskId: string): void {
    const startTime = nowInSeconds();
    this.taskCache.set(taskId, startTime);
    // A reused id must not inherit the finish time of its previous run.
    this.finishedAt.delete(taskId);
    cacheLog(`Task started: ${taskId} at ${startTime}`);
  }

  /**
   * Uploads each video URL to TOS, one after another.
   *
   * A failed upload is logged and the original URL is kept in its position,
   * so the result always has the same length and order as the input.
   *
   * @param videoUrls Source video URLs.
   * @returns One URL per input: the TOS URL, or the original on failure.
   */
  private async uploadVideosToTOS(videoUrls: string[]): Promise<string[]> {
    const tosUrls: string[] = [];

    for (const videoUrl of videoUrls) {
      try {
        // The object name is generated (timestamp + random suffix); it is
        // not derived from the source URL.
        const fileName = `video-${Date.now()}-${Math.random().toString(36).slice(2, 11)}.mp4`;

        const tosUrl = await TOSService.uploadFromUrl(videoUrl, `jimeng_free/videos/${fileName}`);
        tosUrls.push(tosUrl);

        logger.info(`视频上传到TOS成功: ${videoUrl} -> ${tosUrl}`);
      } catch (error) {
        logger.error(`视频上传到TOS失败: ${videoUrl}`, error);
        // Fall back to the original URL when the upload fails.
        tosUrls.push(videoUrl);
      }
    }

    return tosUrls;
  }

  /**
   * Records the terminal state of a task.
   *
   * For status -1 with a non-empty `url`, the comma-separated URLs are
   * uploaded to TOS first and the resulting list is stored. If no URL is
   * available, the status code itself is stored. Unknown task ids are logged
   * and ignored.
   *
   * @param taskId Task identifier previously passed to `startTask`.
   * @param status -1 = finished successfully, -2 = failed, -3 = timed out.
   * @param url Comma-separated result URL(s); defaults to an empty string.
   */
  public async finishTask(taskId: string, status: -1 | -2 | -3, url: string = ''): Promise<void> {
    if (!this.taskCache.has(taskId)) {
      cacheLog(`Attempted to finish non-existent task: ${taskId}`);
      return;
    }

    let finalUrl = url;
    let statusMessage = '';

    switch (status) {
      case -1: {
        statusMessage = 'successfully';
        if (url) {
          try {
            // On success, push the videos to TOS before storing the result.
            cacheLog(`开始上传视频到TOS: ${taskId}`);
            const videoUrls = url.split(',');
            const tosUrls = await this.uploadVideosToTOS(videoUrls);
            finalUrl = tosUrls.join(',');
            // Marked once the upload pass has run, even if individual
            // uploads fell back to their original URL.
            this.tosProcessedTasks.add(taskId);
            cacheLog(`Task ${taskId} TOS上传完成,存储TOS地址: ${finalUrl}`);
          } catch (error) {
            logger.error(`TOS上传失败,使用原始URL: ${taskId}`, error);
            finalUrl = url; // keep the original URL
            cacheLog(`Task ${taskId} TOS上传失败,使用原始URL`);
          }
        }
        break;
      }
      case -2:
        statusMessage = 'failed';
        break;
      case -3:
        statusMessage = 'timed out';
        break;
    }

    // Store the final URL (TOS or original); with no URL, store the status code.
    this.taskCache.set(taskId, finalUrl || status);
    // Remember when the task finished so the sweep can expire it later.
    this.finishedAt.set(taskId, nowInSeconds());
    cacheLog(`Task ${taskId} finished ${statusMessage} (status: ${status})`);
  }

  /**
   * Returns the raw cache value for a task without modifying the cache.
   *
   * @returns Start timestamp, status code, result URL(s), or `undefined`
   *   if the task is not cached.
   */
  public getTaskStatus(taskId: string): number | string | undefined {
    return this.taskCache.get(taskId);
  }

  /** Whether the task has been through the TOS upload pass (kept for compatibility). */
  public isTosProcessed(taskId: string): boolean {
    return this.tosProcessedTasks.has(taskId);
  }

  /** Marks the task as having been through the TOS upload pass (kept for compatibility). */
  public markTosProcessed(taskId: string): void {
    this.tosProcessedTasks.add(taskId);
    cacheLog(`Task ${taskId} marked as TOS processed`);
  }

  /**
   * Returns the task result and, when it is a result URL, releases the entry.
   *
   * Numeric values (pending timestamps and status codes) are returned but
   * left in the cache; those entries are removed by `clearExpiredTasks`.
   *
   * @param taskId Task identifier.
   * @returns The cached value, or `undefined` if the task is not cached.
   */
  public getTaskResultAndClear(taskId: string): number | string | undefined {
    const result = this.taskCache.get(taskId);

    if (typeof result === 'string' && result.length > 0) {
      // Only a delivered URL result releases the cache entry.
      this.taskCache.delete(taskId);
      this.tosProcessedTasks.delete(taskId);
      this.finishedAt.delete(taskId);
      cacheLog(`Task ${taskId} result retrieved and cache cleared`);
    }

    return result;
  }

  /**
   * Evicts entries older than one hour so the cache cannot grow without bound.
   *
   * Pending tasks are aged from their start timestamp; finished tasks are
   * aged from the time `finishTask` stored their result. Expiring finished
   * tasks matters because results that are never retrieved, and status-code
   * entries, would otherwise stay in memory forever.
   */
  public clearExpiredTasks(): void {
    const now = nowInSeconds();
    let clearCount = 0;

    // Deleting the current key while iterating a Map is safe in JavaScript.
    for (const [taskId, status] of this.taskCache.entries()) {
      const isPending = typeof status === 'number' && status > 0;
      const referenceTime = isPending ? status : this.finishedAt.get(taskId);

      if (referenceTime !== undefined && now - referenceTime > TASK_EXPIRY_SECONDS) {
        this.taskCache.delete(taskId);
        this.tosProcessedTasks.delete(taskId);
        this.finishedAt.delete(taskId);
        clearCount++;
      }
    }

    if (clearCount > 0) {
      cacheLog(`Cleared ${clearCount} expired tasks`);
    }
  }

  /**
   * Summarises the cache contents.
   *
   * `completedTasks` counts entries holding a URL string and `pendingTasks`
   * counts entries holding a positive timestamp. Entries holding a negative
   * status code are included in `totalTasks` only.
   */
  public getCacheStats(): { totalTasks: number, completedTasks: number, pendingTasks: number } {
    let completedTasks = 0;
    let pendingTasks = 0;

    for (const status of this.taskCache.values()) {
      if (typeof status === 'string') {
        completedTasks++;
      } else if (status > 0) {
        pendingTasks++;
      }
    }

    return {
      totalTasks: this.taskCache.size,
      completedTasks,
      pendingTasks
    };
  }

  /** Returns the ids of all tasks that are still pending. */
  public getPendingTasks(): string[] {
    const pendingTasks: string[] = [];
    for (const [taskId, status] of this.taskCache.entries()) {
      if (typeof status === 'number' && status > 0) {
        pendingTasks.push(taskId);
      }
    }
    return pendingTasks;
  }

  /**
   * Shutdown hook: logs the pending task ids, stops the periodic sweep,
   * runs one final sweep and logs the resulting cache statistics.
   */
  public logPendingTasksOnShutdown(): void {
    const pendingTasks = this.getPendingTasks();
    if (pendingTasks.length > 0) {
      cacheLog(`Pending tasks at shutdown: ${pendingTasks.join(', ')}`, 'yellow');
    } else {
      cacheLog("No pending tasks at shutdown.");
    }

    // Stop the timer and do a last cleanup pass before reporting.
    this.stopPeriodicCleanup();
    this.clearExpiredTasks();

    const stats = this.getCacheStats();
    cacheLog(`Final cache stats - Total: ${stats.totalTasks}, Completed: ${stats.completedTasks}, Pending: ${stats.pendingTasks}`);
  }
}

// Initialize the singleton instance when the module is loaded.
// This ensures it's ready when the service starts.
VideoTaskCache.getInstance();