# -*- coding: utf-8 -*-
"""
Stress test script.

Exercises TaskQueueManager under extreme conditions (high load, long-running
operation, memory pressure) and reports on its performance and stability.
"""

import os
import sys
import time
import threading
import multiprocessing
from contextlib import contextmanager
from datetime import datetime, timedelta
from unittest.mock import Mock, patch
from concurrent.futures import ThreadPoolExecutor, as_completed
import random
import gc
import tracemalloc

# Add this script's directory to the import path so that the sibling
# task_queue_manager module can be imported.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Set test environment variables before importing task_queue_manager
# (presumably they are read at import/construction time -- confirm).
os.environ['ARK_API_KEY'] = 'test_api_key_for_testing'
os.environ['ARK_BASE_URL'] = 'https://test.api.com'

from task_queue_manager import TaskQueueManager


class StressTestSuite:
    """Stress test suite for TaskQueueManager.

    Each ``test_*`` method builds a manager against a mocked video service,
    drives it with a synthetic workload, and records a pass/fail result plus
    metrics via :meth:`log_result`.  Test methods never raise: any exception
    is recorded as a failed result.
    """

    def __init__(self):
        # List of result dicts appended by log_result().
        self.results = []
        # Baseline memory (MB) captured by run_stress_tests(); None until then.
        self.start_memory = None
        # Highest traced peak (MB) observed by get_memory_usage().
        self.peak_memory = 0

    @contextmanager
    def _patched_manager(self, max_running_tasks, max_completed_cache_size,
                         completed_cache_ttl_hours=None):
        """Yield a TaskQueueManager wired to a mocked video service.

        The ``task_queue_manager.get_video_service`` patch stays active for
        the whole ``with`` body, exactly as in a hand-written ``with patch``.

        Args:
            max_running_tasks: Forwarded to the TaskQueueManager constructor.
            max_completed_cache_size: Assigned to the manager attribute of
                the same name after construction.
            completed_cache_ttl_hours: If not None, assigned to the manager
                attribute of the same name.
        """
        mock_service = Mock()
        mock_service.get_all_tasks.return_value = []
        with patch('task_queue_manager.get_video_service',
                   return_value=mock_service):
            manager = TaskQueueManager(max_running_tasks=max_running_tasks,
                                       update_interval=1)
            manager.max_completed_cache_size = max_completed_cache_size
            if completed_cache_ttl_hours is not None:
                manager.completed_cache_ttl_hours = completed_cache_ttl_hours
            yield manager

    def log_result(self, test_name, success, details, metrics=None):
        """Record a test result and print a one-line summary with metrics.

        Args:
            test_name: Display name of the test.
            success: Whether the test passed.
            details: Short human-readable description of the outcome.
            metrics: Optional mapping of metric name to value; stored as an
                empty dict when omitted.
        """
        result = {
            'test_name': test_name,
            'success': success,
            'details': details,
            'metrics': metrics or {},
            'timestamp': datetime.now().isoformat()
        }
        self.results.append(result)

        status = "✓" if success else "✗"
        print(f"{status} {test_name}: {details}")
        if metrics:
            for key, value in metrics.items():
                print(f" {key}: {value}")

    def get_memory_usage(self):
        """Return the current memory usage in MB.

        Uses tracemalloc when tracing is active.  Otherwise returns a rough
        estimate derived from the number of GC-tracked objects.

        Note: ``tracemalloc.get_traced_memory()`` does not raise when tracing
        is off -- it returns ``(0, 0)`` -- so the tracing state is checked
        explicitly instead of relying on an exception handler.
        """
        if tracemalloc.is_tracing():
            current, peak = tracemalloc.get_traced_memory()
            self.peak_memory = max(self.peak_memory, peak / 1024 / 1024)
            return current / 1024 / 1024  # bytes -> MB
        # tracemalloc is not running: very rough estimate.
        return len(gc.get_objects()) * 0.001

    def test_high_volume_operations(self):
        """High-volume test: insert 5000 tasks, run 1000 lookups, then clean up.

        Passes when insertion takes < 30 s, the average lookup takes < 1 ms
        and cleanup takes < 10 s.
        """
        print("\n=== 高容量操作测试 ===")

        try:
            with self._patched_manager(100, 10000) as manager:
                start_time = time.time()
                start_memory = self.get_memory_usage()

                # Insert a large number of tasks directly into the caches.
                task_count = 5000
                for i in range(task_count):
                    task = {
                        'task_id': f'volume_task_{i}',
                        'status': random.choice(['succeeded', 'failed', 'running']),
                        'content': f'high volume task {i}' * 10,  # inflate payload
                        'params': {'index': i, 'data': list(range(100))},
                        'created_at': datetime.now().isoformat(),
                        'cache_time': datetime.now().isoformat()
                    }

                    if task['status'] == 'running':
                        manager.running_tasks_cache[task['task_id']] = task
                    else:
                        manager.completed_tasks_cache[task['task_id']] = task

                add_time = time.time() - start_time
                # Memory right after insertion (current usage, not a true peak).
                peak_memory = self.get_memory_usage()
                memory_increase = peak_memory - start_memory

                # Lookup performance.
                query_start = time.time()
                query_count = 1000
                for _ in range(query_count):
                    task_id = f'volume_task_{random.randint(0, task_count-1)}'
                    manager.get_task_by_id(task_id)

                query_time = time.time() - query_start
                avg_query_time = query_time / query_count * 1000  # ms

                # Cleanup performance.
                cleanup_start = time.time()
                manager._cleanup_completed_tasks()
                cleanup_time = time.time() - cleanup_start

                final_memory = self.get_memory_usage()

                success = (add_time < 30 and avg_query_time < 1 and cleanup_time < 10)

                metrics = {
                    '任务数量': task_count,
                    '添加耗时': f'{add_time:.2f}s',
                    '平均查询时间': f'{avg_query_time:.3f}ms',
                    '清理耗时': f'{cleanup_time:.2f}s',
                    '内存增长': f'{memory_increase:.1f}MB',
                    '最终内存': f'{final_memory:.1f}MB'
                }

                self.log_result("高容量操作", success, f"处理{task_count}个任务", metrics)

        except Exception as e:
            self.log_result("高容量操作", False, f"异常: {str(e)}")

    def test_concurrent_stress(self):
        """Concurrency test: 20 threads each perform 100 random operations.

        Each operation is a random add / query / cleanup against a shared
        manager.  Passes when the error rate is < 5% and throughput exceeds
        100 successful operations per second.
        """
        print("\n=== 并发压力测试 ===")

        try:
            with self._patched_manager(50, 5000) as manager:
                start_time = time.time()
                start_memory = self.get_memory_usage()

                def worker_thread(worker_id, operations_per_worker):
                    """Run random operations; return success and error counts."""
                    operations = 0
                    errors = 0

                    for i in range(operations_per_worker):
                        try:
                            # Pick a random operation.
                            operation = random.choice(['add', 'query', 'cleanup'])

                            if operation == 'add':
                                task = {
                                    'task_id': f'stress_task_{worker_id}_{i}',
                                    'status': random.choice(['succeeded', 'failed']),
                                    'content': f'stress task {worker_id}_{i}',
                                    'created_at': datetime.now().isoformat(),
                                    'cache_time': datetime.now().isoformat()
                                }
                                manager.completed_tasks_cache[task['task_id']] = task

                            elif operation == 'query':
                                # Look up a random (possibly absent) task.
                                task_id = f'stress_task_{random.randint(0, 9)}_{random.randint(0, 99)}'
                                manager.get_task_by_id(task_id)

                            elif operation == 'cleanup':
                                manager._cleanup_completed_tasks()

                            operations += 1

                        except Exception:
                            # Deliberately counted rather than raised: the
                            # error rate is the metric under test.
                            errors += 1

                        # Small random delay between operations.
                        time.sleep(random.uniform(0.001, 0.01))

                    return {'operations': operations, 'errors': errors}

                # Launch the worker threads.
                thread_count = 20
                operations_per_worker = 100

                with ThreadPoolExecutor(max_workers=thread_count) as executor:
                    futures = [
                        executor.submit(worker_thread, i, operations_per_worker)
                        for i in range(thread_count)
                    ]

                    # Collect results as workers finish.
                    total_operations = 0
                    total_errors = 0

                    for future in as_completed(futures):
                        result = future.result()
                        total_operations += result['operations']
                        total_errors += result['errors']

                total_time = time.time() - start_time
                peak_memory = self.get_memory_usage()
                memory_increase = peak_memory - start_memory

                operations_per_second = total_operations / total_time
                # Error rate is measured against all attempts (successes plus
                # failures); otherwise a run in which every operation fails
                # would be reported as 0% errors.
                total_attempts = total_operations + total_errors
                error_rate = (total_errors / total_attempts * 100) if total_attempts > 0 else 0

                success = (error_rate < 5 and operations_per_second > 100)

                metrics = {
                    '并发线程数': thread_count,
                    '总操作数': total_operations,
                    '错误数': total_errors,
                    '错误率': f'{error_rate:.2f}%',
                    '操作/秒': f'{operations_per_second:.1f}',
                    '总耗时': f'{total_time:.2f}s',
                    '内存增长': f'{memory_increase:.1f}MB'
                }

                self.log_result("并发压力", success, f"{thread_count}个线程并发操作", metrics)

        except Exception as e:
            self.log_result("并发压力", False, f"异常: {str(e)}")

    def test_long_running_stability(self):
        """Stability test: repeat add/query/cleanup cycles for about 30 seconds.

        Passes when the final memory growth is below 80% of the peak growth
        (i.e. memory came back down) and the completed-task cache ends within
        its configured size limit.
        """
        print("\n=== 长时间运行稳定性测试 ===")

        try:
            # 0.01 hours == 36 seconds.
            with self._patched_manager(10, 100, completed_cache_ttl_hours=0.01) as manager:
                start_time = time.time()
                start_memory = self.get_memory_usage()

                # Simulated long run, compressed to 30 seconds.
                test_duration = 30  # seconds
                cycle_count = 0
                max_memory = start_memory

                while time.time() - start_time < test_duration:
                    cycle_count += 1

                    # Add a batch of tasks.
                    for i in range(20):
                        task = {
                            'task_id': f'longrun_task_{cycle_count}_{i}',
                            'status': 'succeeded',
                            'content': f'long running task {cycle_count}_{i}',
                            'created_at': datetime.now().isoformat(),
                            'cache_time': datetime.now().isoformat()
                        }
                        manager.completed_tasks_cache[task['task_id']] = task

                    # Random lookups among the most recent cycles.
                    for _ in range(10):
                        task_id = f'longrun_task_{random.randint(max(1, cycle_count-5), cycle_count)}_{random.randint(0, 19)}'
                        manager.get_task_by_id(task_id)

                    # Periodic cleanup every fifth cycle.
                    if cycle_count % 5 == 0:
                        manager._cleanup_completed_tasks()
                        gc.collect()  # force garbage collection

                    # Track the highest memory reading seen.
                    current_memory = self.get_memory_usage()
                    max_memory = max(max_memory, current_memory)

                    time.sleep(0.5)

                total_time = time.time() - start_time
                final_memory = self.get_memory_usage()
                memory_increase = final_memory - start_memory
                peak_memory_increase = max_memory - start_memory

                # Memory counts as stable when it did not keep growing:
                # the final growth must be clearly below the peak growth.
                memory_stable = memory_increase < peak_memory_increase * 0.8
                cache_size_reasonable = len(manager.completed_tasks_cache) <= manager.max_completed_cache_size

                success = memory_stable and cache_size_reasonable

                metrics = {
                    '运行时长': f'{total_time:.1f}s',
                    '循环次数': cycle_count,
                    '最终缓存大小': len(manager.completed_tasks_cache),
                    '缓存限制': manager.max_completed_cache_size,
                    '内存增长': f'{memory_increase:.1f}MB',
                    '峰值内存增长': f'{peak_memory_increase:.1f}MB',
                    '内存稳定': '是' if memory_stable else '否'
                }

                self.log_result("长时间运行稳定性", success, f"运行{total_time:.1f}秒,{cycle_count}个周期", metrics)

        except Exception as e:
            self.log_result("长时间运行稳定性", False, f"异常: {str(e)}")

    def test_memory_pressure(self):
        """Memory-pressure test: cache 2000 large task objects, then clean up.

        Passes when memory growth after insertion stays below 300 MB and the
        final cleanup recovers more than half of that growth.
        """
        print("\n=== 内存压力测试 ===")

        try:
            with self._patched_manager(10, 1000) as manager:
                start_memory = self.get_memory_usage()

                # Create many tasks carrying large payloads.
                large_data = 'x' * 10000  # 10 KB string
                task_count = 2000

                start_time = time.time()

                for i in range(task_count):
                    task = {
                        'task_id': f'memory_task_{i}',
                        'status': 'succeeded',
                        'content': large_data,
                        'params': {
                            'large_list': list(range(1000)),
                            'large_dict': {f'key_{j}': f'value_{j}' * 100 for j in range(100)}
                        },
                        'created_at': datetime.now().isoformat(),
                        'cache_time': datetime.now().isoformat()
                    }
                    manager.completed_tasks_cache[task['task_id']] = task

                    # Check memory every 100 tasks.
                    if i % 100 == 0:
                        current_memory = self.get_memory_usage()
                        if current_memory - start_memory > 500:  # grew by > 500 MB
                            # Trigger an early cleanup.
                            manager._cleanup_completed_tasks()
                            gc.collect()

                add_time = time.time() - start_time
                # Memory right after insertion (current usage, not a true peak).
                peak_memory = self.get_memory_usage()

                # Forced cleanup.
                cleanup_start = time.time()
                manager._cleanup_completed_tasks()
                gc.collect()
                cleanup_time = time.time() - cleanup_start

                final_memory = self.get_memory_usage()
                memory_increase = peak_memory - start_memory
                memory_recovered = peak_memory - final_memory

                # Was memory kept under control?
                memory_managed = memory_increase < 300  # growth below 300 MB
                cleanup_effective = memory_recovered > memory_increase * 0.5  # > 50% recovered

                success = memory_managed and cleanup_effective

                metrics = {
                    '任务数量': task_count,
                    '添加耗时': f'{add_time:.2f}s',
                    '清理耗时': f'{cleanup_time:.2f}s',
                    '内存增长': f'{memory_increase:.1f}MB',
                    '内存回收': f'{memory_recovered:.1f}MB',
                    '最终缓存大小': len(manager.completed_tasks_cache)
                }

                self.log_result("内存压力", success, f"处理{task_count}个大对象任务", metrics)

        except Exception as e:
            self.log_result("内存压力", False, f"异常: {str(e)}")

    def test_extreme_cleanup_scenarios(self):
        """Cleanup test: 200 hour-old tasks plus 100 fresh ones, limit of 50.

        Passes when one cleanup call brings the completed-task cache within
        its size limit, finishes in under 5 seconds and removes at least one
        entry.
        """
        print("\n=== 极端清理场景测试 ===")

        try:
            # 0.001 hours == 3.6 seconds.
            with self._patched_manager(10, 50, completed_cache_ttl_hours=0.001) as manager:
                # Scenario 1: many tasks stamped one hour in the past.
                old_time = datetime.now() - timedelta(hours=1)
                for i in range(200):
                    task = {
                        'task_id': f'old_task_{i}',
                        'status': 'succeeded',
                        'content': f'old task {i}',
                        'created_at': old_time.isoformat(),
                        'cache_time': old_time.isoformat()
                    }
                    manager.completed_tasks_cache[task['task_id']] = task

                # Scenario 2: fresh tasks mixed in with the old ones.
                for i in range(100):
                    task = {
                        'task_id': f'new_task_{i}',
                        'status': 'succeeded',
                        'content': f'new task {i}',
                        'created_at': datetime.now().isoformat(),
                        'cache_time': datetime.now().isoformat()
                    }
                    manager.completed_tasks_cache[task['task_id']] = task

                before_cleanup = len(manager.completed_tasks_cache)

                # Run the cleanup.
                cleanup_start = time.time()
                manager._cleanup_completed_tasks()
                cleanup_time = time.time() - cleanup_start

                after_cleanup = len(manager.completed_tasks_cache)

                # Evaluate the cleanup.
                cleanup_effective = after_cleanup <= manager.max_completed_cache_size
                cleanup_fast = cleanup_time < 5.0
                old_tasks_removed = after_cleanup < before_cleanup

                success = cleanup_effective and cleanup_fast and old_tasks_removed

                metrics = {
                    '清理前任务数': before_cleanup,
                    '清理后任务数': after_cleanup,
                    '清理耗时': f'{cleanup_time:.3f}s',
                    '清理比例': f'{(before_cleanup - after_cleanup) / before_cleanup * 100:.1f}%'
                }

                self.log_result("极端清理场景", success, f"从{before_cleanup}个任务清理到{after_cleanup}个", metrics)

        except Exception as e:
            self.log_result("极端清理场景", False, f"异常: {str(e)}")

    def run_stress_tests(self):
        """Run every stress test and return the report dict.

        Memory tracing is started for the duration of the run.  It is stopped
        again afterwards only if this call started it, so a caller that was
        already tracing is left undisturbed.

        Returns:
            The dict produced by :meth:`generate_stress_report`.
        """
        print("开始压力测试...\n")

        # Start memory tracing unless the caller already did.
        started_tracing = not tracemalloc.is_tracing()
        if started_tracing:
            tracemalloc.start()

        try:
            self.start_memory = self.get_memory_usage()
            print(f"初始内存使用: {self.start_memory:.1f}MB\n")

            self.test_high_volume_operations()
            self.test_concurrent_stress()
            self.test_long_running_stability()
            self.test_memory_pressure()
            self.test_extreme_cleanup_scenarios()

            # Build the report while tracing is still active so the final
            # memory reading comes from tracemalloc.
            return self.generate_stress_report()
        finally:
            if started_tracing:
                tracemalloc.stop()

    def generate_stress_report(self):
        """Print the stress test report and return a summary dict.

        If no baseline memory was recorded (``run_stress_tests`` has not been
        called), the current reading is used as the baseline, so the reported
        memory change is zero instead of raising ``TypeError``.

        Returns:
            A dict with keys ``total_tests``, ``passed_tests``,
            ``success_rate``, ``grade``, ``assessment`` and ``memory_change``.
        """
        print("\n" + "="*60)
        print("压力测试报告")
        print("="*60)

        total_tests = len(self.results)
        passed_tests = sum(1 for r in self.results if r['success'])
        failed_tests = total_tests - passed_tests
        success_rate = (passed_tests / total_tests * 100) if total_tests > 0 else 0

        print(f"\n📊 测试统计:")
        print(f" 总测试数: {total_tests}")
        print(f" 通过测试: {passed_tests}")
        print(f" 失败测试: {failed_tests}")
        print(f" 成功率: {success_rate:.1f}%")

        print(f"\n📋 详细结果:")
        for result in self.results:
            status = "✓" if result['success'] else "✗"
            print(f" {status} {result['test_name']}: {result['details']}")
            if result['metrics']:
                for key, value in result['metrics'].items():
                    print(f" {key}: {value}")

        # Performance grade derived from the pass rate.
        if success_rate >= 90:
            grade = "优秀"
            assessment = "系统在高负载下表现优秀,具有良好的性能和稳定性。"
        elif success_rate >= 75:
            grade = "良好"
            assessment = "系统在压力测试中表现良好,但在某些极端情况下可能需要优化。"
        elif success_rate >= 60:
            grade = "一般"
            assessment = "系统基本能够处理压力场景,但存在性能瓶颈,需要优化。"
        else:
            grade = "较差"
            assessment = "系统在高负载下表现不佳,存在严重的性能或稳定性问题。"

        print(f"\n🎯 性能评级: {grade}")
        print(f"\n📝 评估结论: {assessment}")

        final_memory = self.get_memory_usage()
        # Fall back to the current reading when no baseline was captured.
        start_memory = self.start_memory if self.start_memory is not None else final_memory
        memory_change = final_memory - start_memory
        print(f"\n💾 内存使用: 初始{start_memory:.1f}MB → 最终{final_memory:.1f}MB (变化{memory_change:+.1f}MB)")

        return {
            'total_tests': total_tests,
            'passed_tests': passed_tests,
            'success_rate': success_rate,
            'grade': grade,
            'assessment': assessment,
            'memory_change': memory_change
        }


def main():
    """Entry point: run the full stress test suite and return its report."""
    test_suite = StressTestSuite()
    report = test_suite.run_stress_tests()
    return report


if __name__ == "__main__":
    main()