#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 缓存和持久化机制测试脚本 测试新的分层缓存、持久化和清理机制 """ import os import sys import time import json from datetime import datetime, timedelta from unittest.mock import Mock, patch # 添加项目根目录到路径 sys.path.append(os.path.dirname(os.path.abspath(__file__))) # 设置测试环境变量 os.environ['ARK_API_KEY'] = 'test_api_key_for_testing' os.environ['ARK_BASE_URL'] = 'https://test.api.com' from task_queue_manager import TaskQueueManager from video_service import get_video_service def test_persistence_mechanism(): """测试持久化机制""" print("=== 测试持久化机制 ===") # 创建测试用的持久化文件 test_persistence_file = "test_persistence.json" # 清理之前的测试文件 if os.path.exists(test_persistence_file): os.remove(test_persistence_file) # 创建mock video service mock_video_service = Mock() mock_video_service.get_all_tasks.return_value = [] # 使用patch来替换get_video_service with patch('task_queue_manager.get_video_service', return_value=mock_video_service): # 创建队列管理器 queue_manager = TaskQueueManager( max_running_tasks=2, update_interval=1, persistence_file=test_persistence_file ) # 设置缓存参数 queue_manager.max_completed_cache_size = 10 queue_manager.completed_cache_ttl_hours = 1 # 模拟添加等待任务 waiting_tasks = [ { 'task_id': 'wait_001', 'content': 'test waiting task 1', 'params': {'test': True}, 'created_at': datetime.now().isoformat() }, { 'task_id': 'wait_002', 'content': 'test waiting task 2', 'params': {'test': True}, 'created_at': datetime.now().isoformat() } ] for task in waiting_tasks: queue_manager.waiting_queue.append(task) # 保存持久化数据 queue_manager._save_persistence_data() print(f"已保存 {len(waiting_tasks)} 个等待任务到持久化文件") # 验证文件是否存在 if os.path.exists(test_persistence_file): with open(test_persistence_file, 'r', encoding='utf-8') as f: saved_data = json.load(f) print(f"持久化文件内容: {saved_data}") # 清空内存中的等待队列 queue_manager.waiting_queue.clear() print("已清空内存中的等待队列") # 从持久化文件恢复 queue_manager._load_persistence_data() print(f"从持久化文件恢复了 {len(queue_manager.waiting_queue)} 个等待任务") # 验证恢复的数据 for i, task in enumerate(queue_manager.waiting_queue): print(f"恢复的任务 {i+1}: {task['task_id']} - {task['content']}") # 清理测试文件 if os.path.exists(test_persistence_file): os.remove(test_persistence_file) print("已清理测试持久化文件") print("持久化机制测试完成\n") def test_cache_mechanism(): """测试缓存机制""" print("=== 测试缓存机制 ===") # 创建mock video service mock_video_service = Mock() mock_video_service.get_all_tasks.return_value = [] # 使用patch来替换get_video_service with patch('task_queue_manager.get_video_service', return_value=mock_video_service): # 创建队列管理器 queue_manager = TaskQueueManager( max_running_tasks=3, update_interval=1 ) # 设置缓存参数 queue_manager.max_completed_cache_size = 5 queue_manager.completed_cache_ttl_hours = 0.003 # 约10秒TTL用于测试 # 模拟运行中任务 running_tasks = [ { 'task_id': 'run_001', 'status': 'running', 'content': 'running task 1', 'params': {'test': True}, 'created_at': datetime.now().isoformat(), 'cache_time': datetime.now().isoformat() }, { 'task_id': 'run_002', 'status': 'running', 'content': 'running task 2', 'params': {'test': True}, 'created_at': datetime.now().isoformat(), 'cache_time': datetime.now().isoformat() } ] # 添加到运行中缓存 for task in running_tasks: queue_manager.running_tasks_cache[task['task_id']] = task print(f"添加了 {len(running_tasks)} 个运行中任务到缓存") # 模拟已完成任务 completed_tasks = [ { 'task_id': 'comp_001', 'status': 'succeeded', 'content': 'completed task 1', 'params': {'test': True}, 'created_at': (datetime.now() - timedelta(minutes=5)).isoformat(), 'cache_time': datetime.now().isoformat() }, { 'task_id': 'comp_002', 'status': 'succeeded', 'content': 'completed task 2', 'params': {'test': True}, 'created_at': (datetime.now() - timedelta(minutes=3)).isoformat(), 'cache_time': datetime.now().isoformat() } ] # 添加到已完成缓存 for task in completed_tasks: queue_manager.completed_tasks_cache[task['task_id']] = task print(f"添加了 {len(completed_tasks)} 个已完成任务到缓存") # 测试缓存查询 print("\n=== 测试缓存查询 ===") # 查询运行中任务 task = queue_manager.get_task_from_cache('run_001') if task: print(f"从缓存获取运行中任务: {task['task_id']} - {task['status']}") # 查询已完成任务 task = queue_manager.get_task_from_cache('comp_001') if task: print(f"从缓存获取已完成任务: {task['task_id']} - {task['status']}") # 查询不存在的任务 task = queue_manager.get_task_from_cache('not_exist') if not task: print("正确处理了不存在的任务查询") # 测试缓存状态 status = queue_manager.get_queue_status() print(f"\n缓存状态:") print(f"运行中任务数: {status['running_tasks_count']}") print(f"已完成任务数: {status['completed_tasks_count']}") print(f"等待队列数: {status['waiting_queue_count']}") print("缓存机制测试完成\n") def test_cleanup_mechanism(): """测试清理机制""" print("=== 测试清理机制 ===") # 创建mock video service mock_video_service = Mock() mock_video_service.get_all_tasks.return_value = [] # 使用patch来替换get_video_service with patch('task_queue_manager.get_video_service', return_value=mock_video_service): # 创建队列管理器,设置较短的TTL用于测试 queue_manager = TaskQueueManager( max_running_tasks=3, update_interval=1 ) # 设置缓存参数 queue_manager.max_completed_cache_size = 3 # 设置较小的缓存大小 queue_manager.completed_cache_ttl_hours = 0.0006 # 约2秒TTL用于测试 # 添加多个已完成任务 old_tasks = [] for i in range(5): task = { 'task_id': f'old_task_{i}', 'status': 'succeeded', 'content': f'old task {i}', 'params': {'test': True}, 'created_at': (datetime.now() - timedelta(hours=2)).isoformat(), 'cache_time': (datetime.now() - timedelta(hours=1)).isoformat() } old_tasks.append(task) queue_manager.completed_tasks_cache[task['task_id']] = task print(f"添加了 {len(old_tasks)} 个旧的已完成任务") # 检查清理前的状态 status_before = queue_manager.get_queue_status() print(f"清理前已完成任务数: {status_before['completed_tasks_count']}") # 等待一段时间让任务过期 print("等待任务过期...") time.sleep(3) # 手动触发清理 queue_manager._cleanup_completed_tasks() # 检查清理后的状态 status_after = queue_manager.get_queue_status() print(f"清理后已完成任务数: {status_after['completed_tasks_count']}") # 验证清理效果 if status_after['completed_tasks_count'] < status_before['completed_tasks_count']: print("[OK] 清理机制正常工作") else: print("[ERROR] 清理机制可能存在问题") # 测试数量限制清理 print("\n=== 测试数量限制清理 ===") # 添加更多任务超过限制 for i in range(5): task = { 'task_id': f'new_task_{i}', 'status': 'succeeded', 'content': f'new task {i}', 'params': {'test': True}, 'created_at': datetime.now().isoformat(), 'cache_time': datetime.now().isoformat() } queue_manager.completed_tasks_cache[task['task_id']] = task print(f"添加了5个新任务,当前已完成任务数: {len(queue_manager.completed_tasks_cache)}") # 触发清理 queue_manager._cleanup_completed_tasks() final_status = queue_manager.get_queue_status() print(f"最终已完成任务数: {final_status['completed_tasks_count']}") print(f"缓存大小限制: {queue_manager.max_completed_cache_size}") if final_status['completed_tasks_count'] <= queue_manager.max_completed_cache_size: print("[OK] 数量限制清理正常工作") else: print("[ERROR] 数量限制清理可能存在问题") print("清理机制测试完成\n") def main(): """主测试函数""" print("开始测试缓存和持久化机制...\n") try: # 测试持久化机制 test_persistence_mechanism() # 测试缓存机制 test_cache_mechanism() # 测试清理机制 test_cleanup_mechanism() print("所有测试完成!") except Exception as e: print(f"测试过程中发生错误: {str(e)}") import traceback traceback.print_exc() if __name__ == '__main__': main()