hs-video-api/test_stress.py
2025-06-07 00:28:35 +08:00

532 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Stress-test script.
Exercises TaskQueueManager under extreme conditions (high load, long-running
operation, etc.) to check its performance and stability.
"""
import os
import sys
import time
import threading  # NOTE(review): appears unused in this file's visible code
import multiprocessing  # NOTE(review): appears unused in this file's visible code
from datetime import datetime, timedelta
from unittest.mock import Mock, patch
from concurrent.futures import ThreadPoolExecutor, as_completed
import random
import gc
import tracemalloc
# Put this script's directory (treated as the project root) on the import path
# so that `task_queue_manager` below can be imported.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# Set dummy credentials for the test run. These are assigned before the project
# import below; presumably task_queue_manager (or something it imports) reads
# them at import time — confirm.
os.environ['ARK_API_KEY'] = 'test_api_key_for_testing'
os.environ['ARK_BASE_URL'] = 'https://test.api.com'
from task_queue_manager import TaskQueueManager
class StressTestSuite:
    """Stress tests for TaskQueueManager under high load and long runs.

    Each ``test_*`` method builds its own TaskQueueManager while
    ``task_queue_manager.get_video_service`` is patched to return a Mock,
    drives the manager directly through its cache dicts and methods, and
    records one pass/fail entry via :meth:`log_result`. The test methods never
    raise: any exception is caught and recorded as a failure.
    """

    # Markers printed next to each result. In the original file both branches
    # of the pass/fail conditional were empty strings, so results could not be
    # told apart in the output.
    PASS_MARK = "✅"
    FAIL_MARK = "❌"

    def __init__(self):
        self.results = []  # one dict per log_result() call
        self.start_memory = None  # MB; baseline captured by run_stress_tests()
        self.peak_memory = 0  # NOTE(review): not read or updated anywhere in this class

    def log_result(self, test_name, success, details, metrics=None):
        """Record one test result and print it.

        Args:
            test_name: Display name of the test.
            success: Whether the test passed.
            details: Short human-readable summary.
            metrics: Optional mapping of metric name -> value. Each pair is
                printed on its own line and stored with the result.
        """
        result = {
            'test_name': test_name,
            'success': success,
            'details': details,
            'metrics': metrics or {},
            'timestamp': datetime.now().isoformat(),
        }
        self.results.append(result)
        status = self.PASS_MARK if success else self.FAIL_MARK
        print(f"{status} {test_name}: {details}")
        if metrics:
            for key, value in metrics.items():
                print(f" {key}: {value}")

    def get_memory_usage(self):
        """Return the current memory usage in MB.

        When tracemalloc is tracing, this is the size of the traced Python
        allocations. Otherwise it falls back to a rough estimate of 0.001 MB
        per gc-tracked object. ``tracemalloc.get_traced_memory()`` returns
        (0, 0) rather than raising when tracing is off, so the fallback has
        to be selected explicitly.
        """
        if tracemalloc.is_tracing():
            current, _peak = tracemalloc.get_traced_memory()
            return current / 1024 / 1024  # bytes -> MB
        # Rough estimate when tracemalloc has not been started
        return len(gc.get_objects()) * 0.001

    @staticmethod
    def _error_rate(errors, successes):
        """Return errors as a percentage of all attempts (errors + successes), or 0.0 if none."""
        attempts = errors + successes
        return errors / attempts * 100 if attempts else 0.0

    @staticmethod
    def _is_memory_stable(memory_increase, peak_memory_increase, ratio=0.8):
        """Decide whether memory stayed stable during a run.

        Memory counts as stable if it never rose above the starting level
        (``peak_memory_increase <= 0``). It also counts as stable if the final
        increase is below ``ratio`` times the peak increase, meaning memory
        was released after peaking.
        """
        if peak_memory_increase <= 0:
            return True
        return memory_increase < peak_memory_increase * ratio

    def test_high_volume_operations(self):
        """Fill the caches with 5000 tasks, then time lookups and cleanup.

        Passes when:
        - adding takes under 30 s;
        - the average ``get_task_by_id`` call takes under 1 ms;
        - one ``_cleanup_completed_tasks`` call takes under 10 s.
        """
        print("\n=== 高容量操作测试 ===")
        try:
            mock_service = Mock()
            mock_service.get_all_tasks.return_value = []
            with patch('task_queue_manager.get_video_service', return_value=mock_service):
                manager = TaskQueueManager(max_running_tasks=100, update_interval=1)
                manager.max_completed_cache_size = 10000
                start_time = time.perf_counter()
                start_memory = self.get_memory_usage()

                # Insert many tasks directly into the manager's caches
                task_count = 5000
                for i in range(task_count):
                    task = {
                        'task_id': f'volume_task_{i}',
                        'status': random.choice(['succeeded', 'failed', 'running']),
                        'content': f'high volume task {i}' * 10,  # inflate payload size
                        'params': {'index': i, 'data': list(range(100))},
                        'created_at': datetime.now().isoformat(),
                        'cache_time': datetime.now().isoformat(),
                    }
                    if task['status'] == 'running':
                        manager.running_tasks_cache[task['task_id']] = task
                    else:
                        manager.completed_tasks_cache[task['task_id']] = task
                add_time = time.perf_counter() - start_time
                peak_memory = self.get_memory_usage()
                memory_increase = peak_memory - start_memory

                # Lookup performance over random existing ids
                query_start = time.perf_counter()
                query_count = 1000
                for _ in range(query_count):
                    task_id = f'volume_task_{random.randint(0, task_count - 1)}'
                    manager.get_task_by_id(task_id)
                query_time = time.perf_counter() - query_start
                avg_query_time = query_time / query_count * 1000  # ms

                # Cleanup performance
                cleanup_start = time.perf_counter()
                manager._cleanup_completed_tasks()
                cleanup_time = time.perf_counter() - cleanup_start
                final_memory = self.get_memory_usage()

                success = add_time < 30 and avg_query_time < 1 and cleanup_time < 10
                metrics = {
                    '任务数量': task_count,
                    '添加耗时': f'{add_time:.2f}s',
                    '平均查询时间': f'{avg_query_time:.3f}ms',
                    '清理耗时': f'{cleanup_time:.2f}s',
                    '内存增长': f'{memory_increase:.1f}MB',
                    '最终内存': f'{final_memory:.1f}MB',
                }
                self.log_result("高容量操作", success,
                                f"处理{task_count}个任务", metrics)
        except Exception as e:
            self.log_result("高容量操作", False, f"异常: {str(e)}")

    def test_concurrent_stress(self):
        """Run 20 threads doing random add/query/cleanup operations at once.

        Passes when the error rate (errors / attempts) is below 5% and
        throughput exceeds 100 successful operations per second.
        """
        print("\n=== 并发压力测试 ===")
        try:
            mock_service = Mock()
            mock_service.get_all_tasks.return_value = []
            with patch('task_queue_manager.get_video_service', return_value=mock_service):
                manager = TaskQueueManager(max_running_tasks=50, update_interval=1)
                manager.max_completed_cache_size = 5000
                start_time = time.perf_counter()
                start_memory = self.get_memory_usage()

                thread_count = 20
                operations_per_worker = 100

                def worker_thread(worker_id, operations_per_worker):
                    """Perform random operations and return success/error counts."""
                    operations = 0
                    errors = 0
                    for i in range(operations_per_worker):
                        try:
                            operation = random.choice(['add', 'query', 'cleanup'])
                            if operation == 'add':
                                task = {
                                    'task_id': f'stress_task_{worker_id}_{i}',
                                    'status': random.choice(['succeeded', 'failed']),
                                    'content': f'stress task {worker_id}_{i}',
                                    'created_at': datetime.now().isoformat(),
                                    'cache_time': datetime.now().isoformat(),
                                }
                                manager.completed_tasks_cache[task['task_id']] = task
                            elif operation == 'query':
                                # Probe the id space of *all* workers (the original
                                # only covered workers 0-9 out of 20).
                                task_id = (f'stress_task_{random.randint(0, thread_count - 1)}'
                                           f'_{random.randint(0, operations_per_worker - 1)}')
                                manager.get_task_by_id(task_id)
                            elif operation == 'cleanup':
                                manager._cleanup_completed_tasks()
                            operations += 1
                        except Exception:
                            # Any exception counts towards the error rate instead of aborting the worker.
                            errors += 1
                        # Random delay to interleave threads
                        time.sleep(random.uniform(0.001, 0.01))
                    return {'operations': operations, 'errors': errors}

                total_operations = 0
                total_errors = 0
                with ThreadPoolExecutor(max_workers=thread_count) as executor:
                    futures = [
                        executor.submit(worker_thread, i, operations_per_worker)
                        for i in range(thread_count)
                    ]
                    for future in as_completed(futures):
                        result = future.result()
                        total_operations += result['operations']
                        total_errors += result['errors']

                total_time = time.perf_counter() - start_time
                peak_memory = self.get_memory_usage()
                memory_increase = peak_memory - start_memory
                operations_per_second = total_operations / total_time if total_time > 0 else 0.0
                # Errors relative to all attempts, not just the successful ones
                error_rate = self._error_rate(total_errors, total_operations)

                success = error_rate < 5 and operations_per_second > 100
                metrics = {
                    '并发线程数': thread_count,
                    '总操作数': total_operations,
                    '错误数': total_errors,
                    '错误率': f'{error_rate:.2f}%',
                    '操作/秒': f'{operations_per_second:.1f}',
                    '总耗时': f'{total_time:.2f}s',
                    '内存增长': f'{memory_increase:.1f}MB',
                }
                self.log_result("并发压力", success,
                                f"{thread_count}个线程并发操作", metrics)
        except Exception as e:
            self.log_result("并发压力", False, f"异常: {str(e)}")

    def test_long_running_stability(self, test_duration=30):
        """Simulate a long run, compressed into ``test_duration`` seconds.

        Each cycle (about 0.5 s):
        - adds 20 completed tasks;
        - performs 10 lookups of recently added ids;
        - on every 5th cycle, calls the manager's cleanup and ``gc.collect()``.

        Passes when memory is judged stable by :meth:`_is_memory_stable` and
        the completed cache holds no more than ``max_completed_cache_size``
        entries.

        Args:
            test_duration: Wall-clock seconds to keep cycling (default 30).
        """
        print("\n=== 长时间运行稳定性测试 ===")
        try:
            mock_service = Mock()
            mock_service.get_all_tasks.return_value = []
            with patch('task_queue_manager.get_video_service', return_value=mock_service):
                manager = TaskQueueManager(max_running_tasks=10, update_interval=1)
                manager.max_completed_cache_size = 100
                manager.completed_cache_ttl_hours = 0.01  # 0.01 h = 36 s
                start_time = time.perf_counter()
                start_memory = self.get_memory_usage()
                cycle_count = 0
                max_memory = start_memory

                while time.perf_counter() - start_time < test_duration:
                    cycle_count += 1
                    # Add a batch of tasks
                    for i in range(20):
                        task = {
                            'task_id': f'longrun_task_{cycle_count}_{i}',
                            'status': 'succeeded',
                            'content': f'long running task {cycle_count}_{i}',
                            'created_at': datetime.now().isoformat(),
                            'cache_time': datetime.now().isoformat(),
                        }
                        manager.completed_tasks_cache[task['task_id']] = task
                    # Random lookups among the last few cycles' ids
                    for _ in range(10):
                        task_id = f'longrun_task_{random.randint(max(1, cycle_count - 5), cycle_count)}_{random.randint(0, 19)}'
                        manager.get_task_by_id(task_id)
                    # Periodic cleanup plus forced garbage collection
                    if cycle_count % 5 == 0:
                        manager._cleanup_completed_tasks()
                        gc.collect()
                    # Track the highest memory reading seen
                    max_memory = max(max_memory, self.get_memory_usage())
                    time.sleep(0.5)

                total_time = time.perf_counter() - start_time
                final_memory = self.get_memory_usage()
                memory_increase = final_memory - start_memory
                peak_memory_increase = max_memory - start_memory
                memory_stable = self._is_memory_stable(memory_increase, peak_memory_increase)
                cache_size_reasonable = len(manager.completed_tasks_cache) <= manager.max_completed_cache_size
                success = memory_stable and cache_size_reasonable
                metrics = {
                    '运行时长': f'{total_time:.1f}s',
                    '循环次数': cycle_count,
                    '最终缓存大小': len(manager.completed_tasks_cache),
                    '缓存限制': manager.max_completed_cache_size,
                    '内存增长': f'{memory_increase:.1f}MB',
                    '峰值内存增长': f'{peak_memory_increase:.1f}MB',
                    '内存稳定': self.PASS_MARK if memory_stable else self.FAIL_MARK,
                }
                self.log_result("长时间运行稳定性", success,
                                f"运行{total_time:.1f}秒,{cycle_count}个周期", metrics)
        except Exception as e:
            self.log_result("长时间运行稳定性", False, f"异常: {str(e)}")

    def test_memory_pressure(self):
        """Insert 2000 tasks carrying large payloads and check memory handling.

        Passes when both of the following hold:
        - memory grows by less than 300 MB between the start and the end of
          insertion;
        - a final cleanup plus ``gc.collect()`` recovers more than half of
          that growth.
        """
        print("\n=== 内存压力测试 ===")
        try:
            mock_service = Mock()
            mock_service.get_all_tasks.return_value = []
            with patch('task_queue_manager.get_video_service', return_value=mock_service):
                manager = TaskQueueManager(max_running_tasks=10, update_interval=1)
                manager.max_completed_cache_size = 1000
                start_memory = self.get_memory_usage()

                large_data = 'x' * 10000  # 10 KB string, shared by every task
                task_count = 2000
                start_time = time.perf_counter()
                for i in range(task_count):
                    task = {
                        'task_id': f'memory_task_{i}',
                        'status': 'succeeded',
                        'content': large_data,
                        'params': {
                            'large_list': list(range(1000)),
                            'large_dict': {f'key_{j}': f'value_{j}' * 100 for j in range(100)},
                        },
                        'created_at': datetime.now().isoformat(),
                        'cache_time': datetime.now().isoformat(),
                    }
                    manager.completed_tasks_cache[task['task_id']] = task
                    # Every 100 tasks, trigger cleanup if growth exceeds 500 MB
                    if i % 100 == 0:
                        current_memory = self.get_memory_usage()
                        if current_memory - start_memory > 500:
                            manager._cleanup_completed_tasks()
                            gc.collect()
                add_time = time.perf_counter() - start_time
                peak_memory = self.get_memory_usage()

                # Forced cleanup
                cleanup_start = time.perf_counter()
                manager._cleanup_completed_tasks()
                gc.collect()
                cleanup_time = time.perf_counter() - cleanup_start
                final_memory = self.get_memory_usage()

                memory_increase = peak_memory - start_memory
                memory_recovered = peak_memory - final_memory
                memory_managed = memory_increase < 300  # growth under 300 MB
                cleanup_effective = memory_recovered > memory_increase * 0.5  # recovered over 50%
                success = memory_managed and cleanup_effective
                metrics = {
                    '任务数量': task_count,
                    '添加耗时': f'{add_time:.2f}s',
                    '清理耗时': f'{cleanup_time:.2f}s',
                    '内存增长': f'{memory_increase:.1f}MB',
                    '内存回收': f'{memory_recovered:.1f}MB',
                    '最终缓存大小': len(manager.completed_tasks_cache),
                }
                self.log_result("内存压力", success,
                                f"处理{task_count}个大对象任务", metrics)
        except Exception as e:
            self.log_result("内存压力", False, f"异常: {str(e)}")

    def test_extreme_cleanup_scenarios(self):
        """Clean up a cache holding 200 one-hour-old tasks plus 100 fresh tasks.

        Passes when a single cleanup call does all of the following:
        - brings the cache to at most ``max_completed_cache_size`` (50)
          entries;
        - finishes in under 5 s;
        - removes at least one entry.
        """
        print("\n=== 极端清理场景测试 ===")
        try:
            mock_service = Mock()
            mock_service.get_all_tasks.return_value = []
            with patch('task_queue_manager.get_video_service', return_value=mock_service):
                manager = TaskQueueManager(max_running_tasks=10, update_interval=1)
                manager.max_completed_cache_size = 50
                manager.completed_cache_ttl_hours = 0.001  # 0.001 h = 3.6 s

                # Scenario 1: many stale tasks
                old_time = datetime.now() - timedelta(hours=1)
                for i in range(200):
                    task = {
                        'task_id': f'old_task_{i}',
                        'status': 'succeeded',
                        'content': f'old task {i}',
                        'created_at': old_time.isoformat(),
                        'cache_time': old_time.isoformat(),
                    }
                    manager.completed_tasks_cache[task['task_id']] = task
                # Scenario 2: fresh tasks mixed in
                for i in range(100):
                    task = {
                        'task_id': f'new_task_{i}',
                        'status': 'succeeded',
                        'content': f'new task {i}',
                        'created_at': datetime.now().isoformat(),
                        'cache_time': datetime.now().isoformat(),
                    }
                    manager.completed_tasks_cache[task['task_id']] = task
                before_cleanup = len(manager.completed_tasks_cache)

                cleanup_start = time.perf_counter()
                manager._cleanup_completed_tasks()
                cleanup_time = time.perf_counter() - cleanup_start
                after_cleanup = len(manager.completed_tasks_cache)

                cleanup_effective = after_cleanup <= manager.max_completed_cache_size
                cleanup_fast = cleanup_time < 5.0
                old_tasks_removed = after_cleanup < before_cleanup
                success = cleanup_effective and cleanup_fast and old_tasks_removed
                # Guard the percentage in case the cache was already empty
                removed_pct = ((before_cleanup - after_cleanup) / before_cleanup * 100
                               if before_cleanup else 0.0)
                metrics = {
                    '清理前任务数': before_cleanup,
                    '清理后任务数': after_cleanup,
                    '清理耗时': f'{cleanup_time:.3f}s',
                    '清理比例': f'{removed_pct:.1f}%',
                }
                self.log_result("极端清理场景", success,
                                f"{before_cleanup}个任务清理到{after_cleanup}", metrics)
        except Exception as e:
            self.log_result("极端清理场景", False, f"异常: {str(e)}")

    def run_stress_tests(self):
        """Run every stress test under tracemalloc and return the report dict.

        Starts tracemalloc if it is not already tracing, and stops it again
        afterwards (only if this method started it). The return value is the
        dict produced by :meth:`generate_stress_report`.
        """
        print("开始压力测试...\n")
        started_tracing = not tracemalloc.is_tracing()
        if started_tracing:
            tracemalloc.start()
        try:
            self.start_memory = self.get_memory_usage()
            print(f"初始内存使用: {self.start_memory:.1f}MB\n")
            self.test_high_volume_operations()
            self.test_concurrent_stress()
            self.test_long_running_stability()
            self.test_memory_pressure()
            self.test_extreme_cleanup_scenarios()
            return self.generate_stress_report()
        finally:
            if started_tracing:
                tracemalloc.stop()

    def generate_stress_report(self):
        """Print a summary of all recorded results and return it as a dict.

        Returns:
            A dict with these keys:

            - ``total_tests``: number of recorded results.
            - ``passed_tests``: number of passing results.
            - ``success_rate``: passing share, in percent.
            - ``grade``: overall rating.
            - ``assessment``: one-sentence conclusion.
            - ``memory_change``: current usage minus ``self.start_memory``,
              in MB; 0 when no baseline was recorded.
        """
        print("\n" + "=" * 60)
        print("压力测试报告")
        print("=" * 60)
        total_tests = len(self.results)
        passed_tests = sum(1 for r in self.results if r['success'])
        failed_tests = total_tests - passed_tests
        success_rate = (passed_tests / total_tests * 100) if total_tests > 0 else 0

        print("\n📊 测试统计:")
        print(f" 总测试数: {total_tests}")
        print(f" 通过测试: {passed_tests}")
        print(f" 失败测试: {failed_tests}")
        print(f" 成功率: {success_rate:.1f}%")

        print("\n📋 详细结果:")
        for result in self.results:
            status = self.PASS_MARK if result['success'] else self.FAIL_MARK
            print(f" {status} {result['test_name']}: {result['details']}")
            if result['metrics']:
                for key, value in result['metrics'].items():
                    print(f" {key}: {value}")

        # Overall rating from the pass rate
        if success_rate >= 90:
            grade = "优秀"
            assessment = "系统在高负载下表现优秀,具有良好的性能和稳定性。"
        elif success_rate >= 75:
            grade = "良好"
            assessment = "系统在压力测试中表现良好,但在某些极端情况下可能需要优化。"
        elif success_rate >= 60:
            grade = "一般"
            assessment = "系统基本能够处理压力场景,但存在性能瓶颈,需要优化。"
        else:
            grade = "较差"
            assessment = "系统在高负载下表现不佳,存在严重的性能或稳定性问题。"
        print(f"\n🎯 性能评级: {grade}")
        print(f"\n📝 评估结论: {assessment}")

        final_memory = self.get_memory_usage()
        # Without a recorded baseline (run_stress_tests not called), report no change
        start_memory = final_memory if self.start_memory is None else self.start_memory
        memory_change = final_memory - start_memory
        print(f"\n💾 内存使用: 初始{start_memory:.1f}MB → 最终{final_memory:.1f}MB (变化{memory_change:+.1f}MB)")
        return {
            'total_tests': total_tests,
            'passed_tests': passed_tests,
            'success_rate': success_rate,
            'grade': grade,
            'assessment': assessment,
            'memory_change': memory_change,
        }
def main():
    """Entry point: run the complete stress-test suite and return its summary report."""
    return StressTestSuite().run_stress_tests()
if __name__ == "__main__":
    # Executed as a script: run the suite (the returned report is discarded).
    main()