# 缓存和持久化机制说明 ## 概述 本项目实现了完善的任务缓存和持久化机制,确保服务重启后能够恢复任务状态,并提供高效的任务管理功能。 ## 主要特性 ### 1. 分层缓存设计 - **运行中任务缓存** (`running_tasks_cache`): 存储正在执行的任务(状态为 `running` 或 `queued`) - **已完成任务缓存** (`completed_tasks_cache`): 存储已完成的任务(状态为 `succeeded`、`failed`、`cancelled`) - **等待队列** (`waiting_queue`): 存储等待执行的任务请求 ### 2. 持久化机制 - **文件持久化**: 等待队列数据保存到本地JSON文件 - **自动保存**: 等待队列变化时自动保存 - **启动恢复**: 服务启动时自动从持久化文件恢复等待队列 ### 3. 智能清理策略 - **按时间清理**: 自动清理超过TTL的已完成任务 - **按数量限制**: 保留最新的N个已完成任务 - **定期执行**: 每10次状态更新执行一次清理 ### 4. SDK数据恢复 - **启动时恢复**: 从SDK重新加载运行中和已完成的任务 - **状态同步**: 定时从SDK更新任务状态 - **缓存更新**: SDK数据变化时自动更新本地缓存 ## 配置参数 ### TaskQueueManager 初始化参数 ```python queue_manager = TaskQueueManager( max_running_tasks=5, # 最大运行任务数 update_interval=5, # 状态更新间隔(秒) persistence_file="task_queue_persistence.json" # 持久化文件路径 ) ``` ### 缓存配置 ```python # 在 __init__ 方法中设置 self.max_completed_cache_size = 100 # 最多保留100个已完成任务 self.completed_cache_ttl_hours = 24 # 已完成任务缓存保留24小时 ``` ## 工作流程 ### 1. 服务启动流程 ``` 1. 创建TaskQueueManager实例 2. 调用start()方法 3. 从持久化文件恢复等待队列 (_load_persistence_data) 4. 从SDK恢复任务缓存 (_load_initial_tasks) 5. 启动状态更新线程 (_start_update_thread) ``` ### 2. 任务创建流程 ``` 1. 检查是否可以创建新任务 (can_create_task) 2. 调用SDK创建任务 3. 根据队列状态决定: - 直接加入运行中缓存 - 或加入等待队列并持久化 4. 返回任务信息和队列状态 ``` ### 3. 状态更新流程 ``` 1. 定时查询运行中任务状态 2. 更新任务缓存时间戳 3. 将完成的任务从运行中缓存移到已完成缓存 4. 处理等待队列,将等待任务移到运行中 5. 定期清理过期的已完成任务 6. 定期保存持久化数据 ``` ### 4. 服务停止流程 ``` 1. 调用stop()方法 2. 保存等待队列到持久化文件 3. 停止更新线程 4. 清理资源 ``` ## API接口 ### 获取队列状态 ```http GET /api/video/queue/status ``` 返回详细的队列状态信息: ```json { "success": true, "data": { "running_tasks_count": 3, "completed_tasks_count": 25, "waiting_queue_count": 2, "total_cache_count": 30, "status_counts": { "running": 2, "queued": 1, "succeeded": 20, "failed": 3, "cancelled": 2, "waiting": 2 }, "max_running_tasks": 5, "max_completed_cache_size": 100, "completed_cache_ttl_hours": 24, "running_task_ids": ["task1", "task2", "task3"], "completed_task_ids": ["task4", "task5", ...], "waiting_task_ids": ["task6", "task7"], "persistence_file": "task_queue_persistence.json" } } ``` ### 查询任务结果 ```http GET /api/video/result/ ``` 优先从缓存查询,缓存未命中时调用SDK: 1. 检查运行中任务缓存 2. 检查已完成任务缓存 3. 检查等待队列 4. 调用SDK查询(如果缓存都未命中) ## 持久化文件格式 ```json { "waiting_queue": [ { "task_id": "task_123", "status": "waiting", "content": { "image_url": "...", "prompt": "..." }, "cache_time": "2024-01-01T12:00:00", "created_at": "2024-01-01T12:00:00" } ], "timestamp": "2024-01-01T12:00:00" } ``` ## 监控和调试 ### 日志级别 - `INFO`: 重要操作(启动、停止、任务状态变化) - `DEBUG`: 详细操作(任务恢复、缓存更新) - `WARNING`: 异常情况(任务不存在、清理失败) - `ERROR`: 错误情况(API调用失败、持久化失败) ### 关键日志示例 ``` 启动任务队列管理器 从持久化文件恢复了 3 个等待任务 缓存恢复完成: 2 个运行中任务, 15 个已完成任务 任务状态更新: 1 个任务完成 清理了 5 个已完成任务的缓存 保存了 2 个等待任务到持久化文件 ``` ## 测试 运行测试脚本验证缓存和持久化机制: ```bash python test_cache_persistence.py ``` 测试内容包括: - 持久化机制测试 - 缓存机制测试 - 清理机制测试 ## 性能优化 ### 1. 缓存策略 - 运行中任务优先级最高,实时更新 - 已完成任务按时间和数量双重限制 - 等待队列持久化保证数据安全 ### 2. 更新频率 - 状态更新:每5秒一次 - 缓存清理:每10次更新一次 - 持久化保存:每20次更新一次或队列变化时 ### 3. 内存管理 - 自动清理过期任务 - 限制缓存大小 - 避免内存泄漏 ## 故障恢复 ### 服务意外重启 1. **等待队列恢复**: 从持久化文件完全恢复 2. **运行中任务恢复**: 从SDK重新加载状态 3. **已完成任务恢复**: 从SDK加载最近的任务 ### 持久化文件损坏 1. 服务正常启动,但等待队列为空 2. 记录错误日志 3. 继续正常服务,新任务正常排队 ### SDK服务异常 1. 缓存继续提供查询服务 2. 记录API调用失败日志 3. 定时重试恢复连接 ## 最佳实践 1. **定期备份**: 备份持久化文件 2. **监控日志**: 关注缓存清理和恢复日志 3. **合理配置**: 根据业务量调整缓存大小和TTL 4. **性能监控**: 监控缓存命中率和队列长度 5. **故障演练**: 定期测试服务重启恢复能力