hs-video-api/test_cache_persistence.py
2025-06-07 00:28:35 +08:00

311 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
缓存和持久化机制测试脚本
测试新的分层缓存、持久化和清理机制
"""
import os
import sys
import time
import json
from datetime import datetime, timedelta
from unittest.mock import Mock, patch
# 添加项目根目录到路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# 设置测试环境变量
os.environ['ARK_API_KEY'] = 'test_api_key_for_testing'
os.environ['ARK_BASE_URL'] = 'https://test.api.com'
from task_queue_manager import TaskQueueManager
from video_service import get_video_service
def test_persistence_mechanism():
"""测试持久化机制"""
print("=== 测试持久化机制 ===")
# 创建测试用的持久化文件
test_persistence_file = "test_persistence.json"
# 清理之前的测试文件
if os.path.exists(test_persistence_file):
os.remove(test_persistence_file)
# 创建mock video service
mock_video_service = Mock()
mock_video_service.get_all_tasks.return_value = []
# 使用patch来替换get_video_service
with patch('task_queue_manager.get_video_service', return_value=mock_video_service):
# 创建队列管理器
queue_manager = TaskQueueManager(
max_running_tasks=2,
update_interval=1,
persistence_file=test_persistence_file
)
# 设置缓存参数
queue_manager.max_completed_cache_size = 10
queue_manager.completed_cache_ttl_hours = 1
# 模拟添加等待任务
waiting_tasks = [
{
'task_id': 'wait_001',
'content': 'test waiting task 1',
'params': {'test': True},
'created_at': datetime.now().isoformat()
},
{
'task_id': 'wait_002',
'content': 'test waiting task 2',
'params': {'test': True},
'created_at': datetime.now().isoformat()
}
]
for task in waiting_tasks:
queue_manager.waiting_queue.append(task)
# 保存持久化数据
queue_manager._save_persistence_data()
print(f"已保存 {len(waiting_tasks)} 个等待任务到持久化文件")
# 验证文件是否存在
if os.path.exists(test_persistence_file):
with open(test_persistence_file, 'r', encoding='utf-8') as f:
saved_data = json.load(f)
print(f"持久化文件内容: {saved_data}")
# 清空内存中的等待队列
queue_manager.waiting_queue.clear()
print("已清空内存中的等待队列")
# 从持久化文件恢复
queue_manager._load_persistence_data()
print(f"从持久化文件恢复了 {len(queue_manager.waiting_queue)} 个等待任务")
# 验证恢复的数据
for i, task in enumerate(queue_manager.waiting_queue):
print(f"恢复的任务 {i+1}: {task['task_id']} - {task['content']}")
# 清理测试文件
if os.path.exists(test_persistence_file):
os.remove(test_persistence_file)
print("已清理测试持久化文件")
print("持久化机制测试完成\n")
def test_cache_mechanism():
"""测试缓存机制"""
print("=== 测试缓存机制 ===")
# 创建mock video service
mock_video_service = Mock()
mock_video_service.get_all_tasks.return_value = []
# 使用patch来替换get_video_service
with patch('task_queue_manager.get_video_service', return_value=mock_video_service):
# 创建队列管理器
queue_manager = TaskQueueManager(
max_running_tasks=3,
update_interval=1
)
# 设置缓存参数
queue_manager.max_completed_cache_size = 5
queue_manager.completed_cache_ttl_hours = 0.003 # 约10秒TTL用于测试
# 模拟运行中任务
running_tasks = [
{
'task_id': 'run_001',
'status': 'running',
'content': 'running task 1',
'params': {'test': True},
'created_at': datetime.now().isoformat(),
'cache_time': datetime.now().isoformat()
},
{
'task_id': 'run_002',
'status': 'running',
'content': 'running task 2',
'params': {'test': True},
'created_at': datetime.now().isoformat(),
'cache_time': datetime.now().isoformat()
}
]
# 添加到运行中缓存
for task in running_tasks:
queue_manager.running_tasks_cache[task['task_id']] = task
print(f"添加了 {len(running_tasks)} 个运行中任务到缓存")
# 模拟已完成任务
completed_tasks = [
{
'task_id': 'comp_001',
'status': 'succeeded',
'content': 'completed task 1',
'params': {'test': True},
'created_at': (datetime.now() - timedelta(minutes=5)).isoformat(),
'cache_time': datetime.now().isoformat()
},
{
'task_id': 'comp_002',
'status': 'succeeded',
'content': 'completed task 2',
'params': {'test': True},
'created_at': (datetime.now() - timedelta(minutes=3)).isoformat(),
'cache_time': datetime.now().isoformat()
}
]
# 添加到已完成缓存
for task in completed_tasks:
queue_manager.completed_tasks_cache[task['task_id']] = task
print(f"添加了 {len(completed_tasks)} 个已完成任务到缓存")
# 测试缓存查询
print("\n=== 测试缓存查询 ===")
# 查询运行中任务
task = queue_manager.get_task_from_cache('run_001')
if task:
print(f"从缓存获取运行中任务: {task['task_id']} - {task['status']}")
# 查询已完成任务
task = queue_manager.get_task_from_cache('comp_001')
if task:
print(f"从缓存获取已完成任务: {task['task_id']} - {task['status']}")
# 查询不存在的任务
task = queue_manager.get_task_from_cache('not_exist')
if not task:
print("正确处理了不存在的任务查询")
# 测试缓存状态
status = queue_manager.get_queue_status()
print(f"\n缓存状态:")
print(f"运行中任务数: {status['running_tasks_count']}")
print(f"已完成任务数: {status['completed_tasks_count']}")
print(f"等待队列数: {status['waiting_queue_count']}")
print("缓存机制测试完成\n")
def test_cleanup_mechanism():
"""测试清理机制"""
print("=== 测试清理机制 ===")
# 创建mock video service
mock_video_service = Mock()
mock_video_service.get_all_tasks.return_value = []
# 使用patch来替换get_video_service
with patch('task_queue_manager.get_video_service', return_value=mock_video_service):
# 创建队列管理器设置较短的TTL用于测试
queue_manager = TaskQueueManager(
max_running_tasks=3,
update_interval=1
)
# 设置缓存参数
queue_manager.max_completed_cache_size = 3 # 设置较小的缓存大小
queue_manager.completed_cache_ttl_hours = 0.0006 # 约2秒TTL用于测试
# 添加多个已完成任务
old_tasks = []
for i in range(5):
task = {
'task_id': f'old_task_{i}',
'status': 'succeeded',
'content': f'old task {i}',
'params': {'test': True},
'created_at': (datetime.now() - timedelta(hours=2)).isoformat(),
'cache_time': (datetime.now() - timedelta(hours=1)).isoformat()
}
old_tasks.append(task)
queue_manager.completed_tasks_cache[task['task_id']] = task
print(f"添加了 {len(old_tasks)} 个旧的已完成任务")
# 检查清理前的状态
status_before = queue_manager.get_queue_status()
print(f"清理前已完成任务数: {status_before['completed_tasks_count']}")
# 等待一段时间让任务过期
print("等待任务过期...")
time.sleep(3)
# 手动触发清理
queue_manager._cleanup_completed_tasks()
# 检查清理后的状态
status_after = queue_manager.get_queue_status()
print(f"清理后已完成任务数: {status_after['completed_tasks_count']}")
# 验证清理效果
if status_after['completed_tasks_count'] < status_before['completed_tasks_count']:
print("[OK] 清理机制正常工作")
else:
print("[ERROR] 清理机制可能存在问题")
# 测试数量限制清理
print("\n=== 测试数量限制清理 ===")
# 添加更多任务超过限制
for i in range(5):
task = {
'task_id': f'new_task_{i}',
'status': 'succeeded',
'content': f'new task {i}',
'params': {'test': True},
'created_at': datetime.now().isoformat(),
'cache_time': datetime.now().isoformat()
}
queue_manager.completed_tasks_cache[task['task_id']] = task
print(f"添加了5个新任务当前已完成任务数: {len(queue_manager.completed_tasks_cache)}")
# 触发清理
queue_manager._cleanup_completed_tasks()
final_status = queue_manager.get_queue_status()
print(f"最终已完成任务数: {final_status['completed_tasks_count']}")
print(f"缓存大小限制: {queue_manager.max_completed_cache_size}")
if final_status['completed_tasks_count'] <= queue_manager.max_completed_cache_size:
print("[OK] 数量限制清理正常工作")
else:
print("[ERROR] 数量限制清理可能存在问题")
print("清理机制测试完成\n")
def main():
"""主测试函数"""
print("开始测试缓存和持久化机制...\n")
try:
# 测试持久化机制
test_persistence_mechanism()
# 测试缓存机制
test_cache_mechanism()
# 测试清理机制
test_cleanup_mechanism()
print("所有测试完成!")
except Exception as e:
print(f"测试过程中发生错误: {str(e)}")
import traceback
traceback.print_exc()
if __name__ == '__main__':
main()