上传文件自动重试机制

This commit is contained in:
jonathang4 2025-08-28 18:45:30 +08:00
parent 99fb1876f5
commit 497cc19484
9 changed files with 315 additions and 37 deletions

View File

@ -6,6 +6,9 @@ import { formatInTimeZone } from 'date-fns-tz';
import TOSService from '@/lib/tos/tos-service.js';
import logger from '@/lib/logger.js';
// 环境变量控制新旧方法切换
const USE_DATABASE_MODE = process.env.USE_DATABASE_MODE === 'true';
const LOG_PATH = path.resolve("./logs/images_task_cache.log");
function cacheLog(value: string, color?: string) {
@ -110,7 +113,8 @@ export class ImagesTaskCache {
switch (status) {
case -1: {
statusMessage = 'successfully';
if (url) {
if (url && !USE_DATABASE_MODE) {
// 只在非数据库模式下执行TOS上传避免与TaskPollingService重复上传
try {
// 任务成功完成时自动上传到TOS
cacheLog(`开始上传图片到TOS: ${taskId}`);
@ -124,6 +128,10 @@ export class ImagesTaskCache {
finalUrl = url; // 保留原始URL
cacheLog(`Task ${taskId} TOS上传失败使用原始URL`);
}
} else if (url) {
// 数据库模式下直接使用原始URLTOS上传由TaskPollingService处理
finalUrl = url;
cacheLog(`Task ${taskId} 数据库模式下完成TOS上传由TaskPollingService处理`);
}
break;
}

View File

@ -6,6 +6,9 @@ import { formatInTimeZone } from 'date-fns-tz';
import TOSService from '@/lib/tos/tos-service.js';
import logger from '@/lib/logger.js';
// 环境变量控制新旧方法切换
const USE_DATABASE_MODE = process.env.USE_DATABASE_MODE === 'true';
const LOG_PATH = path.resolve("./logs/video_task_cache.log");
function cacheLog(value: string, color?: string) {
@ -110,7 +113,8 @@ export class VideoTaskCache {
switch (status) {
case -1: {
statusMessage = 'successfully';
if (url) {
if (url && !USE_DATABASE_MODE) {
// 只在非数据库模式下执行TOS上传避免与TaskPollingService重复上传
try {
// 任务成功完成时自动上传到TOS
cacheLog(`开始上传视频到TOS: ${taskId}`);
@ -124,6 +128,10 @@ export class VideoTaskCache {
finalUrl = url; // 保留原始URL
cacheLog(`Task ${taskId} TOS上传失败使用原始URL`);
}
} else if (url) {
// 数据库模式下直接使用原始URLTOS上传由TaskPollingService处理
finalUrl = url;
cacheLog(`Task ${taskId} 数据库模式下完成TOS上传由TaskPollingService处理`);
}
break;
}

View File

@ -17,6 +17,7 @@ export interface IGenerationResult {
tos_upload_time?: number; // TOS上传耗时毫秒
total_files: number; // 文件总数
successful_uploads: number; // 成功上传数量
upload_retry_count?: number; // 上传重试次数
tos_upload_errors?: string[]; // TOS上传错误信息
fail_reason?: string; // 失败原因
};

View File

@ -35,9 +35,11 @@ export interface IGenerationTask {
};
// 任务状态和控制
status: 'pending' | 'processing' | 'polling' | 'completed' | 'failed';
status: 'pending' | 'processing' | 'polling' | 'processing_success' | 'uploading' | 'completed' | 'failed';
retry_count: number; // 当前重试次数
max_retries: number; // 最大重试次数默认3次
upload_retry_count: number; // TOS上传重试次数
max_upload_retries: number; // 最大TOS上传重试次数默认5次
// 轮询控制
next_poll_at?: number; // 下次轮询时间戳(用于控制轮询间隔)
@ -69,6 +71,8 @@ export class GenerationTask {
status: taskData.status || 'pending',
retry_count: taskData.retry_count || 0,
max_retries: taskData.max_retries || 3,
upload_retry_count: taskData.upload_retry_count || 0,
max_upload_retries: taskData.max_upload_retries || 5,
poll_interval: taskData.poll_interval || 10
};

View File

@ -74,6 +74,8 @@ export class DatabaseGenerationService {
status: 'pending',
retry_count: 0,
max_retries: 3,
upload_retry_count: 0,
max_upload_retries: 5,
poll_interval: 10,
task_timeout: imageTimeout,
created_at: Math.floor(Date.now() / 1000),
@ -139,6 +141,8 @@ export class DatabaseGenerationService {
status: 'pending',
retry_count: 0,
max_retries: 3,
upload_retry_count: 0,
max_upload_retries: 5,
poll_interval: 15, // 视频轮询间隔更长
task_timeout: videoTimeout,
created_at: Math.floor(Date.now() / 1000),
@ -212,6 +216,7 @@ export class DatabaseGenerationService {
'pending': 0,
'processing': 0,
'polling': 0,
'processing_success': 0, // 正在处理成功结果,仍需等待
'failed': -2,
'completed': -1 // 这种情况理论上不会出现因为completed会生成result
};

View File

@ -112,7 +112,7 @@ export class NeDBCleanupService {
// 删除超过 7 天的已完成任务
const deletedCount = await GenerationTask.deleteMany({
status: { $in: ['completed', 'failed', 'cancelled'] },
status: { $in: ['completed', 'failed', 'cancelled', 'processing_success'] },
created_at: { $lt: sevenDaysAgo }
});

View File

@ -180,10 +180,10 @@ export class TaskPollingService {
*/
private async checkPollingTasks(currentTime: number): Promise<void> {
try {
// 获取需要轮询的任务
// 获取需要轮询的任务(排除正在处理成功状态的任务)
const pollingTasks = await GenerationTask.find({
server_id: this.currentServerId,
status: 'polling',
status: 'polling', // 只处理polling状态的任务不处理processing_success状态
$or: [
{ next_poll_at: { $exists: false } },
{ next_poll_at: { $lte: currentTime } }
@ -197,11 +197,92 @@ export class TaskPollingService {
if (pollingTasks.length > 0) {
taskLog(`Polled ${pollingTasks.length} tasks for results`);
}
// 查找需要重试上传的任务
const uploadingTasks = await GenerationTask.find({
server_id: this.currentServerId,
status: 'uploading'
});
for (const task of uploadingTasks) {
await this.retryTOSUpload(task, currentTime);
}
if (uploadingTasks.length > 0) {
taskLog(`Retried upload for ${uploadingTasks.length} tasks`);
}
} catch (error) {
taskLog(`Failed to check polling tasks: ${error.message}`);
}
}
/**
* TOS上传
*/
private async retryTOSUpload(task: IGenerationTask, currentTime: number): Promise<void> {
try {
// 检查是否需要重试(避免过于频繁的重试)
const timeSinceLastUpdate = currentTime - task.updated_at;
const minRetryInterval = 30; // 最少等待30秒再重试
if (timeSinceLastUpdate < minRetryInterval) {
return; // 还未到重试时间
}
taskLog(`Retrying TOS upload for task ${task.task_id}, attempt ${(task.upload_retry_count || 0) + 1}`);
// 从processing_success状态重新获取原始URLs
// 这里需要重新获取生成的URLs因为我们没有在任务中存储它们
// 为了简化我们可以从internal_params中获取history_id然后重新查询
const historyId = task.internal_params.history_id;
if (!historyId) {
throw new Error('Missing history_id for upload retry');
}
// 重新获取生成结果
const result = await request("post", "/mweb/v1/get_history_by_ids", task.internal_params.refresh_token, {
data: {
history_ids: [historyId],
image_info: task.task_type === 'image' ? this.getImageInfo() : undefined,
http_common_info: {
aid: Number(process.env.DEFAULT_ASSISTANT_ID || "513695"),
},
},
});
if (!result[historyId] || !result[historyId].item_list) {
throw new Error('Failed to retrieve generation result for upload retry');
}
const { item_list } = result[historyId];
let originalUrls: string[] = [];
if (task.task_type === 'image') {
originalUrls = item_list.map((item) => {
if (item?.image?.large_images?.[0]?.image_url) {
return item.image.large_images[0].image_url;
}
return item?.common_attr?.cover_url || null;
}).filter(url => url !== null);
} else {
originalUrls = item_list.map((item) => {
return item?.video?.transcoded_video?.origin?.video_url || null;
}).filter(url => url !== null);
}
if (originalUrls.length === 0) {
throw new Error('No valid URLs found for upload retry');
}
// 执行上传
await this.performTOSUpload(task, originalUrls, currentTime);
} catch (error) {
taskLog(`Failed to retry TOS upload for task ${task.task_id}: ${error.message}`);
await this.handleUploadFailure(task, error.message, currentTime);
}
}
/**
*
*/
@ -347,8 +428,64 @@ export class TaskPollingService {
taskLog(`Task ${task.task_id} generated ${originalUrls.length} files`);
// 上传到TOS
const tosUrls = await this.uploadToTOS(originalUrls, task.task_type, task.task_id);
// 先标记任务为处理中,防止重复处理
const updateResult = await GenerationTask.updateOne(
{ task_id: task.task_id, status: 'polling' },
{
$set: {
status: 'processing_success',
updated_at: currentTime
}
}
);
if (updateResult === 0) {
taskLog(`Task ${task.task_id} already being processed by another instance`);
return; // 任务已被其他进程处理
}
// 开始上传到TOS
await this.startTOSUpload(task, originalUrls, currentTime);
} catch (error) {
taskLog(`Failed to handle success for task ${task.task_id}: ${error.message}`);
await this.markTaskFailed(task.task_id, error.message);
}
}
/**
* TOS上传流程
*/
private async startTOSUpload(task: IGenerationTask, originalUrls: string[], currentTime: number): Promise<void> {
try {
// 标记任务为上传中状态
await GenerationTask.updateOne(
{ task_id: task.task_id },
{
$set: {
status: 'uploading',
upload_retry_count: 0,
updated_at: currentTime
}
}
);
// 执行上传
await this.performTOSUpload(task, originalUrls, currentTime);
} catch (error) {
taskLog(`Failed to start TOS upload for task ${task.task_id}: ${error.message}`);
await this.handleUploadFailure(task, error.message, currentTime);
}
}
/**
* TOS上传
*/
private async performTOSUpload(task: IGenerationTask, originalUrls: string[], currentTime: number): Promise<void> {
try {
// 上传到TOS带重试机制
const tosUrls = await this.uploadToTOSWithRetry(originalUrls, task.task_type, task.task_id);
// 创建结果记录
const expireTime = currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400');
@ -364,6 +501,7 @@ export class TaskPollingService {
generation_time: (currentTime - (task.started_at || task.created_at)) * 1000,
total_files: originalUrls.length,
successful_uploads: tosUrls.length,
upload_retry_count: task.upload_retry_count || 0,
tos_upload_errors: tosUrls.length < originalUrls.length ? ['Some uploads failed'] : undefined
},
created_at: currentTime,
@ -387,11 +525,107 @@ export class TaskPollingService {
taskLog(`Task ${task.task_id} completed successfully`);
} catch (error) {
taskLog(`Failed to handle success for task ${task.task_id}: ${error.message}`);
await this.markTaskFailed(task.task_id, error.message);
taskLog(`TOS upload failed for task ${task.task_id}: ${error.message}`);
await this.handleUploadFailure(task, error.message, currentTime);
}
}
/**
*
*/
private async handleUploadFailure(task: IGenerationTask, errorMessage: string, currentTime: number): Promise<void> {
const currentRetryCount = task.upload_retry_count || 0;
const maxRetries = task.max_upload_retries || 5;
if (currentRetryCount >= maxRetries) {
// 重试次数已达上限,标记任务失败
taskLog(`Task ${task.task_id} upload failed after ${maxRetries} retries`);
try {
// 创建失败结果记录
const expireTime = currentTime + parseInt(process.env.RESULT_EXPIRE_TIME || '86400');
await GenerationResult.create({
task_id: task.task_id,
task_type: task.task_type,
server_id: task.server_id,
status: 'failed',
original_urls: [],
tos_urls: [],
metadata: {
total_files: 0,
successful_uploads: 0,
upload_retry_count: currentRetryCount,
fail_reason: `TOS upload failed after ${maxRetries} retries: ${errorMessage}`
},
created_at: currentTime,
expires_at: expireTime,
is_read: false,
read_count: 0
});
// 标记任务失败
await GenerationTask.updateOne(
{ task_id: task.task_id },
{
$set: {
status: 'failed',
error_message: `Upload failed after ${maxRetries} retries: ${errorMessage}`,
completed_at: currentTime,
updated_at: currentTime
}
}
);
} catch (cleanupError) {
taskLog(`Failed to cleanup task ${task.task_id} after upload failure: ${cleanupError.message}`);
}
} else {
// 增加重试计数,保持上传中状态
await GenerationTask.updateOne(
{ task_id: task.task_id },
{
$set: {
upload_retry_count: currentRetryCount + 1,
updated_at: currentTime
}
}
);
taskLog(`Task ${task.task_id} upload retry ${currentRetryCount + 1}/${maxRetries} scheduled`);
}
}
/**
* TOS
*/
private async uploadToTOSWithRetry(urls: string[], taskType: 'image' | 'video', task_id: string): Promise<string[]> {
const tosUrls: string[] = [];
for (const url of urls) {
try {
const fileName = taskType === 'image'
? `image-${Date.now()}-${Math.random().toString(36).substr(2, 9)}.png`
: `video-${Date.now()}-${Math.random().toString(36).substr(2, 9)}.mp4`;
const folder = taskType === 'image' ? 'images' : 'videos';
const tosUrl = await TOSService.uploadFromUrl(
url,
`jimeng_free/${folder}/${task_id}/${fileName}`,
{ maxRetries: 3 } // TOS客户端内部重试3次
);
tosUrls.push(tosUrl);
taskLog(`${taskType} uploaded to TOS: ${url} -> ${tosUrl}`);
} catch (error) {
taskLog(`${taskType} upload to TOS failed: ${url} - ${error.message}`);
throw error; // 抛出错误,让上层处理重试逻辑
}
}
return tosUrls;
}
/**
*
*/

View File

@ -189,45 +189,63 @@ export class TOSClientWrapper {
}
/**
* URL下载文件并上传到TOS
* URL下载文件并上传到TOS
*/
async uploadFromUrl(
url: string,
objectKey: string,
options: UploadOptions & { timeout?: number } = {}
options: UploadOptions & { timeout?: number; maxRetries?: number } = {}
): Promise<string> {
const { headers = {}, timeout = 30000, returnUrl = true } = options;
const { headers = {}, timeout = 30000, returnUrl = true, maxRetries = 3 } = options;
if (!url.startsWith('http://') && !url.startsWith('https://')) {
throw new Error('URL必须以http://或https://开头');
}
try {
// 下载文件
const response = await axios.get(url, {
responseType: 'arraybuffer',
timeout
});
let lastError: Error;
// 获取内容类型
let contentType = response.headers['content-type'] || '';
if (!contentType) {
contentType = mimeLookup(url) || 'application/octet-stream';
}
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
// 下载文件
const response = await axios.get(url, {
responseType: 'arraybuffer',
timeout
});
// 上传到TOS
return this.uploadBytes(
Buffer.from(response.data),
objectKey,
contentType,
{ headers, returnUrl }
);
} catch (error) {
if (error.code === 'ECONNABORTED' || error.code === 'ETIMEDOUT') {
throw new Error(`下载网络文件超时: ${url}`);
// 获取内容类型
let contentType = response.headers['content-type'] || '';
if (!contentType) {
contentType = mimeLookup(url) || 'application/octet-stream';
}
// 上传到TOS
return this.uploadBytes(
Buffer.from(response.data),
objectKey,
contentType,
{ headers, returnUrl }
);
} catch (error) {
lastError = error;
const isTimeout = error.code === 'ECONNABORTED' || error.code === 'ETIMEDOUT';
const isNetworkError = error.code === 'ENOTFOUND' || error.code === 'ECONNRESET' || error.code === 'ECONNREFUSED';
// 如果是最后一次尝试,或者是非网络/超时错误,直接抛出
if (attempt === maxRetries || (!isTimeout && !isNetworkError)) {
if (isTimeout) {
throw new Error(`下载网络文件超时(重试${maxRetries}次后失败): ${url}`);
}
throw new Error(`上传网络文件到TOS失败重试${maxRetries}次后失败): ${error.message}`);
}
// 等待一段时间后重试(指数退避)
const delay = Math.min(1000 * Math.pow(2, attempt - 1), 10000); // 最大10秒
await new Promise(resolve => setTimeout(resolve, delay));
}
throw new Error(`上传网络文件到TOS失败: ${error.message}`);
}
throw lastError;
}
/**

View File

@ -57,11 +57,11 @@ export class TOSService {
/**
* URL上传文件
*/
static async uploadFromUrl(url: string, targetPath?: string): Promise<string> {
static async uploadFromUrl(url: string, targetPath?: string, options?: any): Promise<string> {
try {
const fileName = path.basename(new URL(url).pathname) || 'downloaded-file';
const objectKey = targetPath || `downloads/${Date.now()}-${fileName}`;
return await getTOSClient().uploadFromUrl(url, objectKey);
return await getTOSClient().uploadFromUrl(url, objectKey, options);
} catch (error) {
console.error('从URL上传文件失败:', error.message);
throw error;