From 497cc194846c131fb41e85d23b361a2897e77116 Mon Sep 17 00:00:00 2001 From: jonathang4 Date: Thu, 28 Aug 2025 18:45:30 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6=E8=87=AA?= =?UTF-8?q?=E5=8A=A8=E9=87=8D=E8=AF=95=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/api/ImagesTaskCache.ts | 10 +- src/api/VideoTaskCache.ts | 10 +- src/lib/database/models/GenerationResult.ts | 1 + src/lib/database/models/GenerationTask.ts | 6 +- src/lib/services/DatabaseGenerationService.ts | 5 + src/lib/services/NeDBCleanupService.ts | 2 +- src/lib/services/TaskPollingService.ts | 246 +++++++++++++++++- src/lib/tos/tos-client.ts | 68 +++-- src/lib/tos/tos-service.ts | 4 +- 9 files changed, 315 insertions(+), 37 deletions(-) diff --git a/src/api/ImagesTaskCache.ts b/src/api/ImagesTaskCache.ts index e37d011..b4fb03a 100644 --- a/src/api/ImagesTaskCache.ts +++ b/src/api/ImagesTaskCache.ts @@ -6,6 +6,9 @@ import { formatInTimeZone } from 'date-fns-tz'; import TOSService from '@/lib/tos/tos-service.js'; import logger from '@/lib/logger.js'; +// 环境变量控制新旧方法切换 +const USE_DATABASE_MODE = process.env.USE_DATABASE_MODE === 'true'; + const LOG_PATH = path.resolve("./logs/images_task_cache.log"); function cacheLog(value: string, color?: string) { @@ -110,7 +113,8 @@ export class ImagesTaskCache { switch (status) { case -1: { statusMessage = 'successfully'; - if (url) { + if (url && !USE_DATABASE_MODE) { + // 只在非数据库模式下执行TOS上传,避免与TaskPollingService重复上传 try { // 任务成功完成时,自动上传到TOS cacheLog(`开始上传图片到TOS: ${taskId}`); @@ -124,6 +128,10 @@ export class ImagesTaskCache { finalUrl = url; // 保留原始URL cacheLog(`Task ${taskId} TOS上传失败,使用原始URL`); } + } else if (url) { + // 数据库模式下,直接使用原始URL,TOS上传由TaskPollingService处理 + finalUrl = url; + cacheLog(`Task ${taskId} 数据库模式下完成,TOS上传由TaskPollingService处理`); } break; } diff --git a/src/api/VideoTaskCache.ts b/src/api/VideoTaskCache.ts index 9e0f71d..cf1aedd 100644 --- a/src/api/VideoTaskCache.ts +++ b/src/api/VideoTaskCache.ts @@ -6,6 +6,9 @@ import { formatInTimeZone } from 'date-fns-tz'; import TOSService from '@/lib/tos/tos-service.js'; import logger from '@/lib/logger.js'; +// 环境变量控制新旧方法切换 +const USE_DATABASE_MODE = process.env.USE_DATABASE_MODE === 'true'; + const LOG_PATH = path.resolve("./logs/video_task_cache.log"); function cacheLog(value: string, color?: string) { @@ -110,7 +113,8 @@ export class VideoTaskCache { switch (status) { case -1: { statusMessage = 'successfully'; - if (url) { + if (url && !USE_DATABASE_MODE) { + // 只在非数据库模式下执行TOS上传,避免与TaskPollingService重复上传 try { // 任务成功完成时,自动上传到TOS cacheLog(`开始上传视频到TOS: ${taskId}`); @@ -124,6 +128,10 @@ export class VideoTaskCache { finalUrl = url; // 保留原始URL cacheLog(`Task ${taskId} TOS上传失败,使用原始URL`); } + } else if (url) { + // 数据库模式下,直接使用原始URL,TOS上传由TaskPollingService处理 + finalUrl = url; + cacheLog(`Task ${taskId} 数据库模式下完成,TOS上传由TaskPollingService处理`); } break; } diff --git a/src/lib/database/models/GenerationResult.ts b/src/lib/database/models/GenerationResult.ts index 95bcd2b..c6d5a14 100644 --- a/src/lib/database/models/GenerationResult.ts +++ b/src/lib/database/models/GenerationResult.ts @@ -17,6 +17,7 @@ export interface IGenerationResult { tos_upload_time?: number; // TOS上传耗时(毫秒) total_files: number; // 文件总数 successful_uploads: number; // 成功上传数量 + upload_retry_count?: number; // 上传重试次数 tos_upload_errors?: string[]; // TOS上传错误信息 fail_reason?: string; // 失败原因 }; diff --git a/src/lib/database/models/GenerationTask.ts b/src/lib/database/models/GenerationTask.ts index b477a36..e1295a0 100644 --- a/src/lib/database/models/GenerationTask.ts +++ b/src/lib/database/models/GenerationTask.ts @@ -35,9 +35,11 @@ export interface IGenerationTask { }; // 任务状态和控制 - status: 'pending' | 'processing' | 'polling' | 'completed' | 'failed'; + status: 'pending' | 'processing' | 'polling' | 'processing_success' | 'uploading' | 'completed' | 'failed'; retry_count: number; // 当前重试次数 max_retries: number; // 最大重试次数(默认3次) + upload_retry_count: number; // TOS上传重试次数 + max_upload_retries: number; // 最大TOS上传重试次数(默认5次) // 轮询控制 next_poll_at?: number; // 下次轮询时间戳(用于控制轮询间隔) @@ -69,6 +71,8 @@ export class GenerationTask { status: taskData.status || 'pending', retry_count: taskData.retry_count || 0, max_retries: taskData.max_retries || 3, + upload_retry_count: taskData.upload_retry_count || 0, + max_upload_retries: taskData.max_upload_retries || 5, poll_interval: taskData.poll_interval || 10 }; diff --git a/src/lib/services/DatabaseGenerationService.ts b/src/lib/services/DatabaseGenerationService.ts index 6e7c822..b6b671b 100644 --- a/src/lib/services/DatabaseGenerationService.ts +++ b/src/lib/services/DatabaseGenerationService.ts @@ -74,6 +74,8 @@ export class DatabaseGenerationService { status: 'pending', retry_count: 0, max_retries: 3, + upload_retry_count: 0, + max_upload_retries: 5, poll_interval: 10, task_timeout: imageTimeout, created_at: Math.floor(Date.now() / 1000), @@ -139,6 +141,8 @@ export class DatabaseGenerationService { status: 'pending', retry_count: 0, max_retries: 3, + upload_retry_count: 0, + max_upload_retries: 5, poll_interval: 15, // 视频轮询间隔更长 task_timeout: videoTimeout, created_at: Math.floor(Date.now() / 1000), @@ -212,6 +216,7 @@ export class DatabaseGenerationService { 'pending': 0, 'processing': 0, 'polling': 0, + 'processing_success': 0, // 正在处理成功结果,仍需等待 'failed': -2, 'completed': -1 // 这种情况理论上不会出现,因为completed会生成result }; diff --git a/src/lib/services/NeDBCleanupService.ts b/src/lib/services/NeDBCleanupService.ts index caf4cc8..412e478 100644 --- a/src/lib/services/NeDBCleanupService.ts +++ b/src/lib/services/NeDBCleanupService.ts @@ -112,7 +112,7 @@ export class NeDBCleanupService { // 删除超过 7 天的已完成任务 const deletedCount = await GenerationTask.deleteMany({ - status: { $in: ['completed', 'failed', 'cancelled'] }, + status: { $in: ['completed', 'failed', 'cancelled', 'processing_success'] }, created_at: { $lt: sevenDaysAgo } }); diff --git a/src/lib/services/TaskPollingService.ts b/src/lib/services/TaskPollingService.ts index 6276686..f2269bf 100644 --- a/src/lib/services/TaskPollingService.ts +++ b/src/lib/services/TaskPollingService.ts @@ -180,10 +180,10 @@ export class TaskPollingService { */ private async checkPollingTasks(currentTime: number): Promise { try { - // 获取需要轮询的任务 + // 获取需要轮询的任务(排除正在处理成功状态的任务) const pollingTasks = await GenerationTask.find({ server_id: this.currentServerId, - status: 'polling', + status: 'polling', // 只处理polling状态的任务,不处理processing_success状态 $or: [ { next_poll_at: { $exists: false } }, { next_poll_at: { $lte: currentTime } } @@ -197,11 +197,92 @@ export class TaskPollingService { if (pollingTasks.length > 0) { taskLog(`Polled ${pollingTasks.length} tasks for results`); } + + // 查找需要重试上传的任务 + const uploadingTasks = await GenerationTask.find({ + server_id: this.currentServerId, + status: 'uploading' + }); + + for (const task of uploadingTasks) { + await this.retryTOSUpload(task, currentTime); + } + + if (uploadingTasks.length > 0) { + taskLog(`Retried upload for ${uploadingTasks.length} tasks`); + } } catch (error) { taskLog(`Failed to check polling tasks: ${error.message}`); } } + /** + * 重试TOS上传 + */ + private async retryTOSUpload(task: IGenerationTask, currentTime: number): Promise { + try { + // 检查是否需要重试(避免过于频繁的重试) + const timeSinceLastUpdate = currentTime - task.updated_at; + const minRetryInterval = 30; // 最少等待30秒再重试 + + if (timeSinceLastUpdate < minRetryInterval) { + return; // 还未到重试时间 + } + + taskLog(`Retrying TOS upload for task ${task.task_id}, attempt ${(task.upload_retry_count || 0) + 1}`); + + // 从processing_success状态重新获取原始URLs + // 这里需要重新获取生成的URLs,因为我们没有在任务中存储它们 + // 为了简化,我们可以从internal_params中获取history_id,然后重新查询 + const historyId = task.internal_params.history_id; + if (!historyId) { + throw new Error('Missing history_id for upload retry'); + } + + // 重新获取生成结果 + const result = await request("post", "/mweb/v1/get_history_by_ids", task.internal_params.refresh_token, { + data: { + history_ids: [historyId], + image_info: task.task_type === 'image' ? this.getImageInfo() : undefined, + http_common_info: { + aid: Number(process.env.DEFAULT_ASSISTANT_ID || "513695"), + }, + }, + }); + + if (!result[historyId] || !result[historyId].item_list) { + throw new Error('Failed to retrieve generation result for upload retry'); + } + + const { item_list } = result[historyId]; + let originalUrls: string[] = []; + + if (task.task_type === 'image') { + originalUrls = item_list.map((item) => { + if (item?.image?.large_images?.[0]?.image_url) { + return item.image.large_images[0].image_url; + } + return item?.common_attr?.cover_url || null; + }).filter(url => url !== null); + } else { + originalUrls = item_list.map((item) => { + return item?.video?.transcoded_video?.origin?.video_url || null; + }).filter(url => url !== null); + } + + if (originalUrls.length === 0) { + throw new Error('No valid URLs found for upload retry'); + } + + // 执行上传 + await this.performTOSUpload(task, originalUrls, currentTime); + + } catch (error) { + taskLog(`Failed to retry TOS upload for task ${task.task_id}: ${error.message}`); + await this.handleUploadFailure(task, error.message, currentTime); + } + } + /** * 开始处理任务 */ @@ -347,8 +428,64 @@ export class TaskPollingService { taskLog(`Task ${task.task_id} generated ${originalUrls.length} files`); - // 上传到TOS - const tosUrls = await this.uploadToTOS(originalUrls, task.task_type, task.task_id); + // 先标记任务为处理中,防止重复处理 + const updateResult = await GenerationTask.updateOne( + { task_id: task.task_id, status: 'polling' }, + { + $set: { + status: 'processing_success', + updated_at: currentTime + } + } + ); + + if (updateResult === 0) { + taskLog(`Task ${task.task_id} already being processed by another instance`); + return; // 任务已被其他进程处理 + } + + // 开始上传到TOS + await this.startTOSUpload(task, originalUrls, currentTime); + + } catch (error) { + taskLog(`Failed to handle success for task ${task.task_id}: ${error.message}`); + await this.markTaskFailed(task.task_id, error.message); + } + } + + /** + * 开始TOS上传流程 + */ + private async startTOSUpload(task: IGenerationTask, originalUrls: string[], currentTime: number): Promise { + try { + // 标记任务为上传中状态 + await GenerationTask.updateOne( + { task_id: task.task_id }, + { + $set: { + status: 'uploading', + upload_retry_count: 0, + updated_at: currentTime + } + } + ); + + // 执行上传 + await this.performTOSUpload(task, originalUrls, currentTime); + + } catch (error) { + taskLog(`Failed to start TOS upload for task ${task.task_id}: ${error.message}`); + await this.handleUploadFailure(task, error.message, currentTime); + } + } + + /** + * 执行TOS上传 + */ + private async performTOSUpload(task: IGenerationTask, originalUrls: string[], currentTime: number): Promise { + try { + // 上传到TOS(带重试机制) + const tosUrls = await this.uploadToTOSWithRetry(originalUrls, task.task_type, task.task_id); // 创建结果记录 const expireTime = currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400'); @@ -364,6 +501,7 @@ export class TaskPollingService { generation_time: (currentTime - (task.started_at || task.created_at)) * 1000, total_files: originalUrls.length, successful_uploads: tosUrls.length, + upload_retry_count: task.upload_retry_count || 0, tos_upload_errors: tosUrls.length < originalUrls.length ? ['Some uploads failed'] : undefined }, created_at: currentTime, @@ -387,11 +525,107 @@ export class TaskPollingService { taskLog(`Task ${task.task_id} completed successfully`); } catch (error) { - taskLog(`Failed to handle success for task ${task.task_id}: ${error.message}`); - await this.markTaskFailed(task.task_id, error.message); + taskLog(`TOS upload failed for task ${task.task_id}: ${error.message}`); + await this.handleUploadFailure(task, error.message, currentTime); } } + /** + * 处理上传失败 + */ + private async handleUploadFailure(task: IGenerationTask, errorMessage: string, currentTime: number): Promise { + const currentRetryCount = task.upload_retry_count || 0; + const maxRetries = task.max_upload_retries || 5; + + if (currentRetryCount >= maxRetries) { + // 重试次数已达上限,标记任务失败 + taskLog(`Task ${task.task_id} upload failed after ${maxRetries} retries`); + + try { + // 创建失败结果记录 + const expireTime = currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400'); + + await GenerationResult.create({ + task_id: task.task_id, + task_type: task.task_type, + server_id: task.server_id, + status: 'failed', + original_urls: [], + tos_urls: [], + metadata: { + total_files: 0, + successful_uploads: 0, + upload_retry_count: currentRetryCount, + fail_reason: `TOS upload failed after ${maxRetries} retries: ${errorMessage}` + }, + created_at: currentTime, + expires_at: expireTime, + is_read: false, + read_count: 0 + }); + + // 标记任务失败 + await GenerationTask.updateOne( + { task_id: task.task_id }, + { + $set: { + status: 'failed', + error_message: `Upload failed after ${maxRetries} retries: ${errorMessage}`, + completed_at: currentTime, + updated_at: currentTime + } + } + ); + + } catch (cleanupError) { + taskLog(`Failed to cleanup task ${task.task_id} after upload failure: ${cleanupError.message}`); + } + } else { + // 增加重试计数,保持上传中状态 + await GenerationTask.updateOne( + { task_id: task.task_id }, + { + $set: { + upload_retry_count: currentRetryCount + 1, + updated_at: currentTime + } + } + ); + + taskLog(`Task ${task.task_id} upload retry ${currentRetryCount + 1}/${maxRetries} scheduled`); + } + } + + /** + * 上传文件到TOS(带重试机制) + */ + private async uploadToTOSWithRetry(urls: string[], taskType: 'image' | 'video', task_id: string): Promise { + const tosUrls: string[] = []; + + for (const url of urls) { + try { + const fileName = taskType === 'image' + ? `image-${Date.now()}-${Math.random().toString(36).substr(2, 9)}.png` + : `video-${Date.now()}-${Math.random().toString(36).substr(2, 9)}.mp4`; + + const folder = taskType === 'image' ? 'images' : 'videos'; + const tosUrl = await TOSService.uploadFromUrl( + url, + `jimeng_free/${folder}/${task_id}/${fileName}`, + { maxRetries: 3 } // TOS客户端内部重试3次 + ); + tosUrls.push(tosUrl); + + taskLog(`${taskType} uploaded to TOS: ${url} -> ${tosUrl}`); + } catch (error) { + taskLog(`${taskType} upload to TOS failed: ${url} - ${error.message}`); + throw error; // 抛出错误,让上层处理重试逻辑 + } + } + + return tosUrls; + } + /** * 处理生成失败 */ diff --git a/src/lib/tos/tos-client.ts b/src/lib/tos/tos-client.ts index 434ae80..65ec603 100644 --- a/src/lib/tos/tos-client.ts +++ b/src/lib/tos/tos-client.ts @@ -189,45 +189,63 @@ export class TOSClientWrapper { } /** - * 从网络URL下载文件并上传到TOS + * 从网络URL下载文件并上传到TOS(带重试机制) */ async uploadFromUrl( url: string, objectKey: string, - options: UploadOptions & { timeout?: number } = {} + options: UploadOptions & { timeout?: number; maxRetries?: number } = {} ): Promise { - const { headers = {}, timeout = 30000, returnUrl = true } = options; + const { headers = {}, timeout = 30000, returnUrl = true, maxRetries = 3 } = options; if (!url.startsWith('http://') && !url.startsWith('https://')) { throw new Error('URL必须以http://或https://开头'); } - try { - // 下载文件 - const response = await axios.get(url, { - responseType: 'arraybuffer', - timeout - }); + let lastError: Error; + + for (let attempt = 1; attempt <= maxRetries; attempt++) { + try { + // 下载文件 + const response = await axios.get(url, { + responseType: 'arraybuffer', + timeout + }); - // 获取内容类型 - let contentType = response.headers['content-type'] || ''; - if (!contentType) { - contentType = mimeLookup(url) || 'application/octet-stream'; - } + // 获取内容类型 + let contentType = response.headers['content-type'] || ''; + if (!contentType) { + contentType = mimeLookup(url) || 'application/octet-stream'; + } - // 上传到TOS - return this.uploadBytes( - Buffer.from(response.data), - objectKey, - contentType, - { headers, returnUrl } - ); - } catch (error) { - if (error.code === 'ECONNABORTED' || error.code === 'ETIMEDOUT') { - throw new Error(`下载网络文件超时: ${url}`); + // 上传到TOS + return this.uploadBytes( + Buffer.from(response.data), + objectKey, + contentType, + { headers, returnUrl } + ); + } catch (error) { + lastError = error; + + const isTimeout = error.code === 'ECONNABORTED' || error.code === 'ETIMEDOUT'; + const isNetworkError = error.code === 'ENOTFOUND' || error.code === 'ECONNRESET' || error.code === 'ECONNREFUSED'; + + // 如果是最后一次尝试,或者是非网络/超时错误,直接抛出 + if (attempt === maxRetries || (!isTimeout && !isNetworkError)) { + if (isTimeout) { + throw new Error(`下载网络文件超时(重试${maxRetries}次后失败): ${url}`); + } + throw new Error(`上传网络文件到TOS失败(重试${maxRetries}次后失败): ${error.message}`); + } + + // 等待一段时间后重试(指数退避) + const delay = Math.min(1000 * Math.pow(2, attempt - 1), 10000); // 最大10秒 + await new Promise(resolve => setTimeout(resolve, delay)); } - throw new Error(`上传网络文件到TOS失败: ${error.message}`); } + + throw lastError; } /** diff --git a/src/lib/tos/tos-service.ts b/src/lib/tos/tos-service.ts index 77d6f50..f261081 100644 --- a/src/lib/tos/tos-service.ts +++ b/src/lib/tos/tos-service.ts @@ -57,11 +57,11 @@ export class TOSService { /** * 从URL上传文件 */ - static async uploadFromUrl(url: string, targetPath?: string): Promise { + static async uploadFromUrl(url: string, targetPath?: string, options?: any): Promise { try { const fileName = path.basename(new URL(url).pathname) || 'downloaded-file'; const objectKey = targetPath || `downloads/${Date.now()}-${fileName}`; - return await getTOSClient().uploadFromUrl(url, objectKey); + return await getTOSClient().uploadFromUrl(url, objectKey, options); } catch (error) { console.error('从URL上传文件失败:', error.message); throw error;