hs-video-api/test_robustness.py
2025-06-07 00:28:35 +08:00

555 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
核心逻辑健壮性测试脚本
全面测试TaskQueueManager的各种边界情况、异常处理和并发场景
"""
import os
import sys
import time
import json
import threading
import tempfile
from datetime import datetime, timedelta
from unittest.mock import Mock, patch, MagicMock
from concurrent.futures import ThreadPoolExecutor
import random
# 添加项目根目录到路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# 设置测试环境变量
os.environ['ARK_API_KEY'] = 'test_api_key_for_testing'
os.environ['ARK_BASE_URL'] = 'https://test.api.com'
from task_queue_manager import TaskQueueManager
from video_service import get_video_service
class RobustnessTestSuite:
"""健壮性测试套件"""
def __init__(self):
self.test_results = []
self.temp_files = []
def log_test_result(self, test_name, passed, details=""):
"""记录测试结果"""
result = {
'test_name': test_name,
'passed': passed,
'details': details,
'timestamp': datetime.now().isoformat()
}
self.test_results.append(result)
status = "" if passed else ""
print(f"{status} {test_name}: {details}")
def cleanup(self):
"""清理测试文件"""
for temp_file in self.temp_files:
if os.path.exists(temp_file):
try:
os.remove(temp_file)
except:
pass
def test_boundary_conditions(self):
"""测试边界条件"""
print("\n=== 边界条件测试 ===")
# 测试1: 零任务处理
try:
mock_service = Mock()
mock_service.get_all_tasks.return_value = []
with patch('task_queue_manager.get_video_service', return_value=mock_service):
manager = TaskQueueManager(max_running_tasks=1, update_interval=1)
status = manager.get_queue_status()
passed = (status['running_tasks_count'] == 0 and
status['completed_tasks_count'] == 0 and
status['waiting_queue_count'] == 0)
self.log_test_result("零任务处理", passed, f"状态: {status}")
except Exception as e:
self.log_test_result("零任务处理", False, f"异常: {str(e)}")
# 测试2: 最大缓存容量边界
try:
mock_service = Mock()
mock_service.get_all_tasks.return_value = []
with patch('task_queue_manager.get_video_service', return_value=mock_service):
manager = TaskQueueManager(max_running_tasks=1, update_interval=1)
manager.max_completed_cache_size = 2
# 添加超过限制的任务
for i in range(5):
task = {
'task_id': f'boundary_task_{i}',
'status': 'succeeded',
'content': f'boundary task {i}',
'created_at': datetime.now().isoformat(),
'cache_time': datetime.now().isoformat()
}
manager.completed_tasks_cache[task['task_id']] = task
# 触发清理
manager._cleanup_completed_tasks()
final_count = len(manager.completed_tasks_cache)
passed = final_count <= manager.max_completed_cache_size
self.log_test_result("最大缓存容量边界", passed,
f"最终缓存数量: {final_count}, 限制: {manager.max_completed_cache_size}")
except Exception as e:
self.log_test_result("最大缓存容量边界", False, f"异常: {str(e)}")
# 测试3: 极短TTL测试
try:
mock_service = Mock()
mock_service.get_all_tasks.return_value = []
with patch('task_queue_manager.get_video_service', return_value=mock_service):
manager = TaskQueueManager(max_running_tasks=1, update_interval=1)
manager.completed_cache_ttl_hours = 0.0001 # 约0.36秒
# 添加任务
task = {
'task_id': 'ttl_test_task',
'status': 'succeeded',
'content': 'ttl test task',
'created_at': datetime.now().isoformat(),
'cache_time': (datetime.now() - timedelta(seconds=1)).isoformat()
}
manager.completed_tasks_cache[task['task_id']] = task
# 等待过期
time.sleep(0.5)
manager._cleanup_completed_tasks()
passed = len(manager.completed_tasks_cache) == 0
self.log_test_result("极短TTL测试", passed,
f"清理后缓存数量: {len(manager.completed_tasks_cache)}")
except Exception as e:
self.log_test_result("极短TTL测试", False, f"异常: {str(e)}")
def test_exception_handling(self):
"""测试异常处理"""
print("\n=== 异常处理测试 ===")
# 测试1: 持久化文件权限错误
try:
mock_service = Mock()
mock_service.get_all_tasks.return_value = []
# 创建一个无效的文件路径
invalid_path = "/invalid/path/persistence.json"
with patch('task_queue_manager.get_video_service', return_value=mock_service):
manager = TaskQueueManager(
max_running_tasks=1,
update_interval=1,
persistence_file=invalid_path
)
# 尝试保存数据(应该优雅处理错误)
manager.waiting_queue.append({'task_id': 'test', 'content': 'test'})
manager._save_persistence_data() # 不应该崩溃
self.log_test_result("持久化文件权限错误", True, "优雅处理了文件权限错误")
except Exception as e:
self.log_test_result("持久化文件权限错误", False, f"未能优雅处理: {str(e)}")
# 测试2: 损坏的持久化文件
try:
temp_file = tempfile.mktemp(suffix='.json')
self.temp_files.append(temp_file)
# 创建损坏的JSON文件
with open(temp_file, 'w') as f:
f.write('{invalid json content')
mock_service = Mock()
mock_service.get_all_tasks.return_value = []
with patch('task_queue_manager.get_video_service', return_value=mock_service):
manager = TaskQueueManager(
max_running_tasks=1,
update_interval=1,
persistence_file=temp_file
)
# 尝试加载损坏的文件(应该优雅处理)
manager._load_persistence_data() # 不应该崩溃
self.log_test_result("损坏的持久化文件", True, "优雅处理了损坏的JSON文件")
except Exception as e:
self.log_test_result("损坏的持久化文件", False, f"未能优雅处理: {str(e)}")
# 测试3: API服务异常
try:
mock_service = Mock()
mock_service.get_all_tasks.side_effect = Exception("API服务不可用")
with patch('task_queue_manager.get_video_service', return_value=mock_service):
manager = TaskQueueManager(max_running_tasks=1, update_interval=1)
# 尝试更新状态应该优雅处理API异常
manager._update_task_statuses() # 不应该崩溃
self.log_test_result("API服务异常", True, "优雅处理了API服务异常")
except Exception as e:
self.log_test_result("API服务异常", False, f"未能优雅处理: {str(e)}")
def test_concurrent_operations(self):
"""测试并发操作"""
print("\n=== 并发操作测试 ===")
# 测试1: 并发缓存操作
try:
mock_service = Mock()
mock_service.get_all_tasks.return_value = []
with patch('task_queue_manager.get_video_service', return_value=mock_service):
manager = TaskQueueManager(max_running_tasks=10, update_interval=1)
manager.max_completed_cache_size = 100
def add_tasks(thread_id, count):
"""并发添加任务"""
for i in range(count):
task = {
'task_id': f'concurrent_task_{thread_id}_{i}',
'status': 'succeeded',
'content': f'concurrent task {thread_id}_{i}',
'created_at': datetime.now().isoformat(),
'cache_time': datetime.now().isoformat()
}
manager.completed_tasks_cache[task['task_id']] = task
time.sleep(0.001) # 模拟处理时间
# 启动多个线程并发添加任务
threads = []
for i in range(5):
thread = threading.Thread(target=add_tasks, args=(i, 10))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
final_count = len(manager.completed_tasks_cache)
passed = final_count == 50 # 5个线程 × 10个任务
self.log_test_result("并发缓存操作", passed,
f"预期50个任务实际{final_count}个任务")
except Exception as e:
self.log_test_result("并发缓存操作", False, f"异常: {str(e)}")
# 测试2: 并发清理操作
try:
mock_service = Mock()
mock_service.get_all_tasks.return_value = []
with patch('task_queue_manager.get_video_service', return_value=mock_service):
manager = TaskQueueManager(max_running_tasks=1, update_interval=1)
manager.max_completed_cache_size = 10
# 添加大量任务
for i in range(50):
task = {
'task_id': f'cleanup_task_{i}',
'status': 'succeeded',
'content': f'cleanup task {i}',
'created_at': datetime.now().isoformat(),
'cache_time': datetime.now().isoformat()
}
manager.completed_tasks_cache[task['task_id']] = task
# 并发执行清理
def cleanup_worker():
manager._cleanup_completed_tasks()
threads = []
for i in range(3):
thread = threading.Thread(target=cleanup_worker)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
final_count = len(manager.completed_tasks_cache)
passed = final_count <= manager.max_completed_cache_size
self.log_test_result("并发清理操作", passed,
f"清理后{final_count}个任务,限制{manager.max_completed_cache_size}")
except Exception as e:
self.log_test_result("并发清理操作", False, f"异常: {str(e)}")
def test_data_integrity(self):
"""测试数据完整性"""
print("\n=== 数据完整性测试 ===")
# 测试1: 任务ID唯一性
try:
mock_service = Mock()
mock_service.get_all_tasks.return_value = []
with patch('task_queue_manager.get_video_service', return_value=mock_service):
manager = TaskQueueManager(max_running_tasks=1, update_interval=1)
# 添加重复ID的任务
task1 = {
'task_id': 'duplicate_id',
'status': 'succeeded',
'content': 'task 1',
'created_at': datetime.now().isoformat(),
'cache_time': datetime.now().isoformat()
}
task2 = {
'task_id': 'duplicate_id',
'status': 'succeeded',
'content': 'task 2',
'created_at': datetime.now().isoformat(),
'cache_time': datetime.now().isoformat()
}
manager.completed_tasks_cache[task1['task_id']] = task1
manager.completed_tasks_cache[task2['task_id']] = task2
# 检查是否只保留了一个任务(后者覆盖前者)
cached_task = manager.completed_tasks_cache.get('duplicate_id')
passed = (len(manager.completed_tasks_cache) == 1 and
cached_task['content'] == 'task 2')
self.log_test_result("任务ID唯一性", passed,
f"缓存中任务数: {len(manager.completed_tasks_cache)}")
except Exception as e:
self.log_test_result("任务ID唯一性", False, f"异常: {str(e)}")
# 测试2: 持久化数据一致性
try:
temp_file = tempfile.mktemp(suffix='.json')
self.temp_files.append(temp_file)
mock_service = Mock()
mock_service.get_all_tasks.return_value = []
with patch('task_queue_manager.get_video_service', return_value=mock_service):
# 第一个管理器实例
manager1 = TaskQueueManager(
max_running_tasks=1,
update_interval=1,
persistence_file=temp_file
)
# 添加等待任务
original_tasks = [
{'task_id': 'persist_1', 'content': 'test 1'},
{'task_id': 'persist_2', 'content': 'test 2'}
]
for task in original_tasks:
manager1.waiting_queue.append(task)
manager1._save_persistence_data()
# 第二个管理器实例加载数据
manager2 = TaskQueueManager(
max_running_tasks=1,
update_interval=1,
persistence_file=temp_file
)
manager2._load_persistence_data()
# 检查数据一致性
loaded_tasks = manager2.waiting_queue
passed = (len(loaded_tasks) == len(original_tasks) and
all(task['task_id'] in [t['task_id'] for t in original_tasks]
for task in loaded_tasks))
self.log_test_result("持久化数据一致性", passed,
f"原始{len(original_tasks)}个,加载{len(loaded_tasks)}")
except Exception as e:
self.log_test_result("持久化数据一致性", False, f"异常: {str(e)}")
def test_performance_stress(self):
"""测试性能压力"""
print("\n=== 性能压力测试 ===")
# 测试1: 大量任务处理性能
try:
mock_service = Mock()
mock_service.get_all_tasks.return_value = []
with patch('task_queue_manager.get_video_service', return_value=mock_service):
manager = TaskQueueManager(max_running_tasks=1, update_interval=1)
manager.max_completed_cache_size = 1000
start_time = time.time()
# 添加大量任务
for i in range(500):
task = {
'task_id': f'perf_task_{i}',
'status': 'succeeded',
'content': f'performance task {i}',
'created_at': datetime.now().isoformat(),
'cache_time': datetime.now().isoformat()
}
manager.completed_tasks_cache[task['task_id']] = task
add_time = time.time() - start_time
# 测试查询性能
query_start = time.time()
for i in range(100):
task_id = f'perf_task_{random.randint(0, 499)}'
manager.get_task_by_id(task_id)
query_time = time.time() - query_start
# 测试清理性能
cleanup_start = time.time()
manager._cleanup_completed_tasks()
cleanup_time = time.time() - cleanup_start
passed = (add_time < 5.0 and query_time < 1.0 and cleanup_time < 2.0)
self.log_test_result("大量任务处理性能", passed,
f"添加:{add_time:.2f}s, 查询:{query_time:.2f}s, 清理:{cleanup_time:.2f}s")
except Exception as e:
self.log_test_result("大量任务处理性能", False, f"异常: {str(e)}")
def test_memory_management(self):
"""测试内存管理"""
print("\n=== 内存管理测试 ===")
# 测试1: 内存泄漏检测
try:
mock_service = Mock()
mock_service.get_all_tasks.return_value = []
with patch('task_queue_manager.get_video_service', return_value=mock_service):
manager = TaskQueueManager(max_running_tasks=1, update_interval=1)
manager.max_completed_cache_size = 10
# 循环添加和清理任务
for cycle in range(10):
# 添加任务
for i in range(20):
task = {
'task_id': f'memory_task_{cycle}_{i}',
'status': 'succeeded',
'content': f'memory test task {cycle}_{i}',
'created_at': datetime.now().isoformat(),
'cache_time': datetime.now().isoformat()
}
manager.completed_tasks_cache[task['task_id']] = task
# 清理
manager._cleanup_completed_tasks()
# 检查最终内存使用
final_cache_size = len(manager.completed_tasks_cache)
passed = final_cache_size <= manager.max_completed_cache_size
self.log_test_result("内存泄漏检测", passed,
f"最终缓存大小: {final_cache_size}")
except Exception as e:
self.log_test_result("内存泄漏检测", False, f"异常: {str(e)}")
def run_all_tests(self):
"""运行所有测试"""
print("开始核心逻辑健壮性测试...\n")
try:
self.test_boundary_conditions()
self.test_exception_handling()
self.test_concurrent_operations()
self.test_data_integrity()
self.test_performance_stress()
self.test_memory_management()
finally:
self.cleanup()
return self.generate_assessment()
def generate_assessment(self):
"""生成评估报告"""
print("\n" + "="*50)
print("核心逻辑健壮性评估报告")
print("="*50)
total_tests = len(self.test_results)
passed_tests = sum(1 for result in self.test_results if result['passed'])
failed_tests = total_tests - passed_tests
success_rate = (passed_tests / total_tests * 100) if total_tests > 0 else 0
print(f"\n📊 测试统计:")
print(f" 总测试数: {total_tests}")
print(f" 通过测试: {passed_tests}")
print(f" 失败测试: {failed_tests}")
print(f" 成功率: {success_rate:.1f}%")
print(f"\n📋 详细结果:")
for result in self.test_results:
status = "" if result['passed'] else ""
print(f" {status} {result['test_name']}: {result['details']}")
# 健壮性评级
if success_rate >= 95:
grade = "A+ (优秀)"
assessment = "系统具有极高的健壮性,能够很好地处理各种边界情况和异常场景。"
elif success_rate >= 85:
grade = "A (良好)"
assessment = "系统健壮性良好,大部分场景下表现稳定,少数边界情况需要优化。"
elif success_rate >= 70:
grade = "B (一般)"
assessment = "系统基本健壮,但在某些异常处理和边界情况下存在问题,需要改进。"
elif success_rate >= 50:
grade = "C (较差)"
assessment = "系统健壮性较差,存在较多问题,需要重点优化异常处理和边界情况。"
else:
grade = "D (差)"
assessment = "系统健壮性差,存在严重问题,需要全面重构和优化。"
print(f"\n🎯 健壮性评级: {grade}")
print(f"\n📝 评估结论:")
print(f" {assessment}")
# 改进建议
print(f"\n💡 改进建议:")
if failed_tests > 0:
print(" 1. 重点关注失败的测试用例,分析根本原因")
print(" 2. 加强异常处理机制,确保系统在各种异常情况下的稳定性")
print(" 3. 优化并发控制,防止竞态条件和数据不一致")
print(" 4. 完善边界条件处理,确保极端情况下的正确行为")
else:
print(" 1. 继续保持当前的高质量代码标准")
print(" 2. 定期进行健壮性测试,确保新功能不影响系统稳定性")
print(" 3. 考虑增加更多的压力测试和性能监控")
return {
'total_tests': total_tests,
'passed_tests': passed_tests,
'failed_tests': failed_tests,
'success_rate': success_rate,
'grade': grade,
'assessment': assessment,
'test_results': self.test_results
}
def main():
"""主函数"""
test_suite = RobustnessTestSuite()
assessment = test_suite.run_all_tests()
return assessment
if __name__ == "__main__":
main()