This commit is contained in:
jonathang4 2025-06-07 00:28:35 +08:00
parent 5fdeec479e
commit a89593ae17
21 changed files with 4255 additions and 16 deletions

40
.env.dev Normal file
View File

@ -0,0 +1,40 @@
# ========================================
# DEV 本地开发环境配置
# ========================================
# 火山引擎API配置
# SECURITY WARNING(review): real-looking API credentials are committed below.
# Rotate these keys and keep them out of version control (.env files should be gitignored).
ARK_API_KEY=0d5189b3-9a03-4393-81be-8c1ba1e97cbb
VOLC_ACCESSKEY=AKLTYjQyYmE1ZDAwZTY5NGZiOWI3ODZkZDhhOWE4MzVjODE
VOLC_SECRETKEY=WlRKa05EbGhZVEUyTXpjNU5ESmpPRGt5T0RJNFl6QmhPR0pqTVRjMVpUWQ==
# Flask配置
FLASK_ENV=development
SECRET_KEY=dev_secret_key_here
# 服务配置
HOST=127.0.0.1
PORT=5000
# 日志配置
LOG_LEVEL=DEBUG
LOG_FILE=logs/dev_video_api.log
# 队列管理配置
# NOTE(review): inline " # ..." comments after values are only stripped by
# newer python-dotenv releases — verify the installed version parses these
# numeric values correctly (int() on a value with a trailing comment fails).
QUEUE_MAX_RUNNING_TASKS=5 # 最大并发运行任务数
QUEUE_UPDATE_INTERVAL=5 # 任务状态更新间隔(秒)
QUEUE_PERSISTENCE_FILE=task_queue_persistence.json # 队列持久化文件
QUEUE_MAX_COMPLETED_CACHE_SIZE=100 # 最大已完成任务缓存数量
QUEUE_COMPLETED_CACHE_TTL_HOURS=24 # 已完成任务缓存保留时间(小时)
# 视频生成配置
VIDEO_MODEL=doubao-seedance-1-0-lite-i2v-250428
VIDEO_MAX_DURATION=10
VIDEO_DEFAULT_DURATION=5
VIDEO_DEFAULT_ASPECT_RATIO=16:9
VIDEO_DEFAULT_RESOLUTION=1280x720
# 开发环境特殊配置
# 启用调试模式
DEBUG=true
# 热重载
FLASK_DEBUG=true

40
.env.dev-server Normal file
View File

@ -0,0 +1,40 @@
# ========================================
# DEV-SERVER 测试服环境配置
# ========================================
# 火山引擎API配置
# SECURITY WARNING(review): real-looking API credentials are committed below.
# Rotate these keys and keep them out of version control (.env files should be gitignored).
ARK_API_KEY=0d5189b3-9a03-4393-81be-8c1ba1e97cbb
VOLC_ACCESSKEY=AKLTYjQyYmE1ZDAwZTY5NGZiOWI3ODZkZDhhOWE4MzVjODE
VOLC_SECRETKEY=WlRKa05EbGhZVEUyTXpjNU5ESmpPRGt5T0RJNFl6QmhPR0pqTVRjMVpUWQ==
# Flask配置
FLASK_ENV=testing
SECRET_KEY=test_secret_key_here
# 服务配置
HOST=0.0.0.0
PORT=5001
# 日志配置
LOG_LEVEL=INFO
LOG_FILE=logs/test_video_api.log
# 队列管理配置
QUEUE_MAX_RUNNING_TASKS=5 # 最大并发运行任务数
QUEUE_UPDATE_INTERVAL=5 # 任务状态更新间隔(秒)
QUEUE_PERSISTENCE_FILE=task_queue_persistence.json # 队列持久化文件
QUEUE_MAX_COMPLETED_CACHE_SIZE=200 # 最大已完成任务缓存数量
QUEUE_COMPLETED_CACHE_TTL_HOURS=24 # 已完成任务缓存保留时间(小时)
# 视频生成配置
VIDEO_MODEL=doubao-seedance-1-0-lite-i2v-250428
VIDEO_MAX_DURATION=15
VIDEO_DEFAULT_DURATION=8
VIDEO_DEFAULT_ASPECT_RATIO=16:9
VIDEO_DEFAULT_RESOLUTION=1920x1080
# 测试环境特殊配置
# 关闭调试模式
DEBUG=false
# 测试模式
TESTING=true

98
.env.example Normal file
View File

@ -0,0 +1,98 @@
# ========================================
# 环境配置示例文件
# 请根据部署环境选择对应的配置并复制到 .env 文件中
# ========================================
# ========================================
# DEV 本地开发环境配置
# ========================================
# 火山引擎API配置
# Flask配置
# FLASK_ENV=development
# SECRET_KEY=dev_secret_key_here
# 服务配置
# HOST=127.0.0.1
# PORT=5000
# 日志配置
# LOG_LEVEL=DEBUG
# 队列管理配置
# QUEUE_MAX_RUNNING_TASKS=5 # 最大并发运行任务数
# QUEUE_UPDATE_INTERVAL=5 # 任务状态更新间隔(秒)
# QUEUE_PERSISTENCE_FILE=task_queue_persistence.json # 队列持久化文件
# QUEUE_MAX_COMPLETED_CACHE_SIZE=100 # 最大已完成任务缓存数量
# QUEUE_COMPLETED_CACHE_TTL_HOURS=24 # 已完成任务缓存保留时间(小时)
# LOG_FILE=logs/dev_video_api.log
# 视频生成配置
# VIDEO_MODEL=doubao-seedance-1-0-lite-i2v-250428
# VIDEO_MAX_DURATION=10
# VIDEO_DEFAULT_DURATION=5
# VIDEO_DEFAULT_ASPECT_RATIO=16:9
# VIDEO_DEFAULT_RESOLUTION=1280x720
# ========================================
# DEV-SERVER 测试服环境配置
# ========================================
# 火山引擎API配置
# ARK_API_KEY=your_test_ark_api_key_here
# Flask配置
# FLASK_ENV=testing
# SECRET_KEY=test_secret_key_here
# 服务配置
# HOST=0.0.0.0
# PORT=5001
# 日志配置
# LOG_LEVEL=INFO
# LOG_FILE=logs/test_video_api.log
# 视频生成配置
# VIDEO_MODEL=doubao-seedance-1-0-lite-i2v-250428
# VIDEO_MAX_DURATION=15
# VIDEO_DEFAULT_DURATION=8
# VIDEO_DEFAULT_ASPECT_RATIO=16:9
# VIDEO_DEFAULT_RESOLUTION=1920x1080
# ========================================
# PRO 正式服环境配置
# ========================================
# 火山引擎API配置
# ARK_API_KEY=your_prod_ark_api_key_here
# Flask配置
# FLASK_ENV=production
# SECRET_KEY=prod_secret_key_here_use_strong_key
# 服务配置
# HOST=0.0.0.0
# PORT=5000
# 日志配置
# LOG_LEVEL=WARNING
# LOG_FILE=logs/prod_video_api.log
# 视频生成配置
# VIDEO_MODEL=doubao-seedance-1-0-lite-i2v-250428
# VIDEO_MAX_DURATION=30
# VIDEO_DEFAULT_DURATION=10
# VIDEO_DEFAULT_ASPECT_RATIO=16:9
# VIDEO_DEFAULT_RESOLUTION=1920x1080
# ========================================
# 使用说明
# ========================================
# 1. 根据你的部署环境,选择对应的配置段落
# 2. 取消注释(删除行首的 # 符号)需要的配置项
# 3. 将配置复制到项目根目录的 .env 文件中
# 4. 根据实际情况修改配置值
#
# 环境说明:
# - DEV: 本地开发环境,调试模式,详细日志
# - DEV-SERVER: 测试服环境,用于功能测试
# - PRO: 正式生产环境,性能优化,错误日志

42
.env.pro Normal file
View File

@ -0,0 +1,42 @@
# ========================================
# PRO 正式服环境配置
# ========================================
# 火山引擎API配置
# SECURITY WARNING(review): real-looking production API credentials are committed below.
# Rotate these keys and keep them out of version control (.env files should be gitignored).
ARK_API_KEY=0d5189b3-9a03-4393-81be-8c1ba1e97cbb
VOLC_ACCESSKEY=AKLTYjQyYmE1ZDAwZTY5NGZiOWI3ODZkZDhhOWE4MzVjODE
VOLC_SECRETKEY=WlRKa05EbGhZVEUyTXpjNU5ESmpPRGt5T0RJNFl6QmhPR0pqTVRjMVpUWQ==
# Flask配置
FLASK_ENV=production
SECRET_KEY=prod_secret_key_here_use_strong_key
# 服务配置
HOST=0.0.0.0
PORT=5000
# 日志配置
LOG_LEVEL=WARNING
LOG_FILE=logs/prod_video_api.log
# 队列管理配置
QUEUE_MAX_RUNNING_TASKS=5 # 最大并发运行任务数
QUEUE_UPDATE_INTERVAL=3 # 任务状态更新间隔(秒)
QUEUE_PERSISTENCE_FILE=task_queue_persistence.json # 队列持久化文件
QUEUE_MAX_COMPLETED_CACHE_SIZE=500 # 最大已完成任务缓存数量
QUEUE_COMPLETED_CACHE_TTL_HOURS=24 # 已完成任务缓存保留时间(小时)
# 视频生成配置
VIDEO_MODEL=doubao-seedance-1-0-lite-i2v-250428
VIDEO_MAX_DURATION=30
VIDEO_DEFAULT_DURATION=10
VIDEO_DEFAULT_ASPECT_RATIO=16:9
VIDEO_DEFAULT_RESOLUTION=1920x1080
# 生产环境特殊配置
# 关闭调试模式
DEBUG=false
# 关闭测试模式
TESTING=false
# 启用性能优化
OPTIMIZE=true

51
.gitignore vendored
View File

@ -1,4 +1,3 @@
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
@ -95,12 +94,6 @@ ipython_config.py
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
@ -113,10 +106,8 @@ ipython_config.py
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
@ -163,8 +154,40 @@ cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# be added to the global gitignore or merged into this project gitignore. For a PyCharm
# project, it is recommended to include the following files:
# .idea/
# *.iml
# *.ipr
# *.iws
.idea/
*.iml
*.ipr
*.iws
# Visual Studio Code
.vscode/
# Project specific
logs/
uploads/
*.log
.DS_Store
Thumbs.db
# Secret files
.env.local
.env.production
secrets/
*.key
*.pem
*.crt
# Temporary files
*.tmp
*.temp
*.swp
*.swo
*~
task_queue_persistence.json

245
CACHE_PERSISTENCE_README.md Normal file
View File

@ -0,0 +1,245 @@
# 缓存和持久化机制说明
## 概述
本项目实现了完善的任务缓存和持久化机制,确保服务重启后能够恢复任务状态,并提供高效的任务管理功能。
## 主要特性
### 1. 分层缓存设计
- **运行中任务缓存** (`running_tasks_cache`): 存储正在执行的任务(状态为 `running``queued`
- **已完成任务缓存** (`completed_tasks_cache`): 存储已完成的任务(状态为 `succeeded``failed``cancelled`
- **等待队列** (`waiting_queue`): 存储等待执行的任务请求
### 2. 持久化机制
- **文件持久化**: 等待队列数据保存到本地JSON文件
- **自动保存**: 等待队列变化时自动保存
- **启动恢复**: 服务启动时自动从持久化文件恢复等待队列
### 3. 智能清理策略
- **按时间清理**: 自动清理超过TTL的已完成任务
- **按数量限制**: 保留最新的N个已完成任务
- **定期执行**: 每10次状态更新执行一次清理
### 4. SDK数据恢复
- **启动时恢复**: 从SDK重新加载运行中和已完成的任务
- **状态同步**: 定时从SDK更新任务状态
- **缓存更新**: SDK数据变化时自动更新本地缓存
## 配置参数
### TaskQueueManager 初始化参数
```python
queue_manager = TaskQueueManager(
max_running_tasks=5, # 最大运行任务数
update_interval=5, # 状态更新间隔(秒)
persistence_file="task_queue_persistence.json" # 持久化文件路径
)
```
### 缓存配置
```python
# 在 __init__ 方法中设置
self.max_completed_cache_size = 100 # 最多保留100个已完成任务
self.completed_cache_ttl_hours = 24 # 已完成任务缓存保留24小时
```
## 工作流程
### 1. 服务启动流程
```
1. 创建TaskQueueManager实例
2. 调用start()方法
3. 从持久化文件恢复等待队列 (_load_persistence_data)
4. 从SDK恢复任务缓存 (_load_initial_tasks)
5. 启动状态更新线程 (_start_update_thread)
```
### 2. 任务创建流程
```
1. 检查是否可以创建新任务 (can_create_task)
2. 调用SDK创建任务
3. 根据队列状态决定:
- 直接加入运行中缓存
- 或加入等待队列并持久化
4. 返回任务信息和队列状态
```
### 3. 状态更新流程
```
1. 定时查询运行中任务状态
2. 更新任务缓存时间戳
3. 将完成的任务从运行中缓存移到已完成缓存
4. 处理等待队列,将等待任务移到运行中
5. 定期清理过期的已完成任务
6. 定期保存持久化数据
```
### 4. 服务停止流程
```
1. 调用stop()方法
2. 保存等待队列到持久化文件
3. 停止更新线程
4. 清理资源
```
## API接口
### 获取队列状态
```http
GET /api/video/queue/status
```
返回详细的队列状态信息:
```json
{
"success": true,
"data": {
"running_tasks_count": 3,
"completed_tasks_count": 25,
"waiting_queue_count": 2,
"total_cache_count": 30,
"status_counts": {
"running": 2,
"queued": 1,
"succeeded": 20,
"failed": 3,
"cancelled": 2,
"waiting": 2
},
"max_running_tasks": 5,
"max_completed_cache_size": 100,
"completed_cache_ttl_hours": 24,
"running_task_ids": ["task1", "task2", "task3"],
"completed_task_ids": ["task4", "task5", ...],
"waiting_task_ids": ["task6", "task7"],
"persistence_file": "task_queue_persistence.json"
}
}
```
### 查询任务结果
```http
GET /api/video/result/<task_id>
```
优先从缓存查询,缓存未命中时调用SDK:
1. 检查运行中任务缓存
2. 检查已完成任务缓存
3. 检查等待队列
4. 调用SDK查询如果缓存都未命中
## 持久化文件格式
```json
{
"waiting_queue": [
{
"task_id": "task_123",
"status": "waiting",
"content": {
"image_url": "...",
"prompt": "..."
},
"cache_time": "2024-01-01T12:00:00",
"created_at": "2024-01-01T12:00:00"
}
],
"timestamp": "2024-01-01T12:00:00"
}
```
## 监控和调试
### 日志级别
- `INFO`: 重要操作(启动、停止、任务状态变化)
- `DEBUG`: 详细操作(任务恢复、缓存更新)
- `WARNING`: 异常情况(任务不存在、清理失败)
- `ERROR`: 错误情况API调用失败、持久化失败
### 关键日志示例
```
启动任务队列管理器
从持久化文件恢复了 3 个等待任务
缓存恢复完成: 2 个运行中任务, 15 个已完成任务
任务状态更新: 1 个任务完成
清理了 5 个已完成任务的缓存
保存了 2 个等待任务到持久化文件
```
## 测试
运行测试脚本验证缓存和持久化机制:
```bash
python test_cache_persistence.py
```
测试内容包括:
- 持久化机制测试
- 缓存机制测试
- 清理机制测试
## 性能优化
### 1. 缓存策略
- 运行中任务优先级最高,实时更新
- 已完成任务按时间和数量双重限制
- 等待队列持久化保证数据安全
### 2. 更新频率
- 状态更新每5秒一次
- 缓存清理每10次更新一次
- 持久化保存每20次更新一次或队列变化时
### 3. 内存管理
- 自动清理过期任务
- 限制缓存大小
- 避免内存泄漏
## 故障恢复
### 服务意外重启
1. **等待队列恢复**: 从持久化文件完全恢复
2. **运行中任务恢复**: 从SDK重新加载状态
3. **已完成任务恢复**: 从SDK加载最近的任务
### 持久化文件损坏
1. 服务正常启动,但等待队列为空
2. 记录错误日志
3. 继续正常服务,新任务正常排队
### SDK服务异常
1. 缓存继续提供查询服务
2. 记录API调用失败日志
3. 定时重试恢复连接
## 最佳实践
1. **定期备份**: 备份持久化文件
2. **监控日志**: 关注缓存清理和恢复日志
3. **合理配置**: 根据业务量调整缓存大小和TTL
4. **性能监控**: 监控缓存命中率和队列长度
5. **故障演练**: 定期测试服务重启恢复能力

39
Dockerfile Normal file
View File

@ -0,0 +1,39 @@
FROM python:3.9-slim

# Work from /app inside the image.
WORKDIR /app

# Runtime environment defaults.
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV FLASK_APP=app.py
ENV FLASK_ENV=production

# System dependencies: gcc for building Python wheels, curl for the
# HEALTHCHECK below.
# Fix: the HEALTHCHECK invokes curl, but curl was never installed in the
# original image, so every health probe failed and the container was
# permanently reported unhealthy.
RUN apt-get update && apt-get install -y \
    gcc \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy and install Python dependencies first to maximise layer caching.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code.
COPY . .

# Run as an unprivileged user.
RUN useradd --create-home --shell /bin/bash app \
    && chown -R app:app /app
USER app

# Flask/gunicorn listen port.
EXPOSE 5000

# Container-level health probe against the app's /health endpoint.
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:5000/health || exit 1

# Production entry point.
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "4", "--timeout", "120", "app:app"]

91
QUEUE_CONFIG_README.md Normal file
View File

@ -0,0 +1,91 @@
# 队列管理配置说明
本文档说明了视频生成API服务中队列管理相关的配置参数。
## 配置参数
### 基础队列配置
| 参数名 | 环境变量 | 默认值 | 说明 |
|--------|----------|--------|------|
| 最大运行任务数 | `QUEUE_MAX_RUNNING_TASKS` | 5 | 同时运行的最大任务数量 |
| 状态更新间隔 | `QUEUE_UPDATE_INTERVAL` | 5 | 任务状态检查更新间隔(秒) |
| 持久化文件 | `QUEUE_PERSISTENCE_FILE` | task_queue_persistence.json | 队列数据持久化文件路径 |
### 缓存管理配置
| 参数名 | 环境变量 | 默认值 | 说明 |
|--------|----------|--------|------|
| 已完成任务缓存大小 | `QUEUE_MAX_COMPLETED_CACHE_SIZE` | 100 | 最多保留的已完成任务数量 |
| 缓存保留时间 | `QUEUE_COMPLETED_CACHE_TTL_HOURS` | 24 | 已完成任务在缓存中保留的小时数 |
## 环境配置建议
### 开发环境 (.env.dev)
```bash
# 队列管理配置
QUEUE_MAX_RUNNING_TASKS=5
QUEUE_UPDATE_INTERVAL=5
QUEUE_PERSISTENCE_FILE=task_queue_persistence.json
QUEUE_MAX_COMPLETED_CACHE_SIZE=100
QUEUE_COMPLETED_CACHE_TTL_HOURS=24
```
### 测试环境 (.env.dev-server)
```bash
# 队列管理配置
QUEUE_MAX_RUNNING_TASKS=8
QUEUE_UPDATE_INTERVAL=3
QUEUE_PERSISTENCE_FILE=task_queue_persistence.json
QUEUE_MAX_COMPLETED_CACHE_SIZE=200
QUEUE_COMPLETED_CACHE_TTL_HOURS=24
```
### 生产环境 (.env.pro)
```bash
# 队列管理配置
QUEUE_MAX_RUNNING_TASKS=10
QUEUE_UPDATE_INTERVAL=3
QUEUE_PERSISTENCE_FILE=task_queue_persistence.json
QUEUE_MAX_COMPLETED_CACHE_SIZE=500
QUEUE_COMPLETED_CACHE_TTL_HOURS=48
```
## 配置调优建议
### QUEUE_MAX_RUNNING_TASKS最大运行任务数
- **开发环境**: 5个任务适合本地开发测试
- **测试环境**: 8个任务模拟中等负载
- **生产环境**: 10个任务根据服务器性能和API限制调整
### QUEUE_UPDATE_INTERVAL状态更新间隔
- **开发环境**: 5秒便于调试观察
- **测试/生产环境**: 3秒提高响应速度
- **注意**: 间隔太短会增加API调用频率太长会影响用户体验
### QUEUE_MAX_COMPLETED_CACHE_SIZE缓存大小
- **开发环境**: 100个任务满足基本需求
- **测试环境**: 200个任务支持更多测试场景
- **生产环境**: 500个任务支持高并发场景
### QUEUE_COMPLETED_CACHE_TTL_HOURS缓存保留时间
- **开发/测试环境**: 24小时便于调试和测试
- **生产环境**: 48小时给用户更长的查询窗口
## 性能影响
1. **QUEUE_MAX_RUNNING_TASKS**: 影响并发处理能力和资源消耗
2. **QUEUE_UPDATE_INTERVAL**: 影响状态更新及时性和API调用频率
3. **QUEUE_MAX_COMPLETED_CACHE_SIZE**: 影响内存使用和查询性能
4. **QUEUE_COMPLETED_CACHE_TTL_HOURS**: 影响存储空间和数据一致性
## 监控建议
建议监控以下指标:
- 运行中任务数量
- 等待队列长度
- 任务完成率
- 平均处理时间
- 缓存命中率
通过这些指标可以判断当前配置是否合适,并进行相应调整。

522
README.md
View File

@ -1,3 +1,521 @@
# hs-video-api
# 视频生成API服务
火山引擎 API服务
基于Flask的视频生成API服务集成火山引擎豆包视频生成功能提供异步视频生成服务。
## 功能特性
- 🎬 集成火山引擎豆包视频生成API
- 🔄 异步视频生成任务处理与队列管理
- 📊 实时任务状态查询
- 🎯 支持文本+图片的多模态输入
- 📁 支持图片文件上传和URL输入
- 🔧 RESTful API设计
- ⚡ 完整的错误处理和日志记录
- 💾 任务队列持久化和缓存管理
- 🔧 灵活的环境配置管理
## 快速开始
### 1. 安装依赖
```bash
pip install -r requirements.txt
```
### 2. 配置环境变量
根据部署环境选择对应的配置文件:
- 开发环境:`.env.dev`
- 测试环境:`.env.dev-server`
- 生产环境:`.env.pro`
或者复制示例文件进行自定义配置:
```bash
cp .env.example .env
```
设置环境变量:
```bash
# Windows
set APP_ENV=dev
# Linux/Mac
export APP_ENV=dev
```
### 3. 启动服务
```bash
python app.py
```
服务将在配置的地址和端口启动(默认 `http://localhost:5000`)。
## API文档
### 视频生成API接口
#### 创建视频生成任务
**POST** `/api/video/create`
支持两种请求方式:
##### 方式1JSON请求图片URL
请求头:
```
Content-Type: application/json
```
请求体:
```json
{
"prompt": "一只可爱的小猫在花园里玩耍",
"image_url": "https://example.com/cat.jpg",
"duration": 5,
"callback_url": "https://your-callback-url.com/webhook"
}
```
##### 方式2文件上传图片文件
请求头:
```
Content-Type: multipart/form-data
```
表单数据:
- `image_file`: 图片文件
- `prompt`: 文本描述
- `duration`: 视频时长(可选)
- `callback_url`: 回调URL可选
响应:
```json
{
"success": true,
"task_id": "task-uuid-here",
"message": "任务创建成功"
}
```
#### 查询任务状态
**GET** `/api/video/status/<task_id>`
响应:
```json
{
"success": true,
"data": {
"task_id": "task-uuid",
"status": "running",
"message": "任务正在处理中,请稍后查询"
}
}
```
#### 查询任务结果
**GET** `/api/video/result/<task_id>`
响应(成功):
```json
{
"success": true,
"data": {
"task_id": "task-uuid",
"status": "succeeded",
"video_url": "https://example.com/generated_video.mp4",
"created_at": "2024-01-01T00:00:00",
"updated_at": "2024-01-01T00:05:00"
}
}
```
#### 获取任务列表
**GET** `/api/video/tasks`
响应:
```json
{
"success": true,
"data": {
"tasks": [
{
"task_id": "task-uuid-1",
"status": "succeeded",
"created_at": "2024-01-01T00:00:00"
}
],
"total": 1
}
}
```
#### 取消/删除任务
**DELETE** `/api/video/cancel/<task_id>`
响应:
```json
{
"success": true,
"message": "任务已取消"
}
```
#### 查询队列状态
**GET** `/api/video/queue/status`
响应:
```json
{
"success": true,
"data": {
"running_tasks_count": 2,
"completed_tasks_count": 10,
"waiting_queue_count": 0,
"total_cache_count": 12
}
}
```
## 使用示例
### cURL示例
#### 创建视频生成任务JSON方式
```bash
curl -X POST http://localhost:5000/api/video/create \
-H "Content-Type: application/json" \
-d '{
"prompt": "一只可爱的小猫在花园里玩耍",
"image_url": "https://example.com/cat.jpg",
"duration": 5,
"callback_url": "https://your-callback-url.com/webhook"
}'
```
#### 创建视频生成任务(文件上传方式)
```bash
curl -X POST http://localhost:5000/api/video/create \
-F "image_file=@/path/to/your/image.jpg" \
-F "prompt=一只可爱的小猫在花园里玩耍" \
-F "duration=5"
```
#### 查询任务状态
```bash
curl -X GET http://localhost:5000/api/video/status/task-uuid
```
#### 查询任务结果
```bash
curl -X GET http://localhost:5000/api/video/result/task-uuid
```
#### 获取任务列表
```bash
curl -X GET http://localhost:5000/api/video/tasks
```
#### 查询队列状态
```bash
curl -X GET http://localhost:5000/api/video/queue/status
```
### Python示例
```python
import requests
import json
import time
# 配置
API_BASE_URL = "http://localhost:5000"
# 创建视频生成任务JSON方式
def create_video_task_json(prompt, image_url=None, duration=5, callback_url=None):
url = f"{API_BASE_URL}/api/video/create"
data = {
"prompt": prompt,
"duration": duration
}
if image_url:
data["image_url"] = image_url
if callback_url:
data["callback_url"] = callback_url
response = requests.post(url, json=data)
return response.json()
# 创建视频生成任务(文件上传方式)
def create_video_task_file(prompt, image_file_path, duration=5):
url = f"{API_BASE_URL}/api/video/create"
with open(image_file_path, 'rb') as f:
files = {'image_file': f}
data = {
'prompt': prompt,
'duration': str(duration)
}
response = requests.post(url, files=files, data=data)
return response.json()
# 查询任务状态
def get_task_status(task_id):
url = f"{API_BASE_URL}/api/video/status/{task_id}"
response = requests.get(url)
return response.json()
# 查询任务结果
def get_task_result(task_id):
url = f"{API_BASE_URL}/api/video/result/{task_id}"
response = requests.get(url)
return response.json()
# 获取任务列表
def get_task_list():
url = f"{API_BASE_URL}/api/video/tasks"
response = requests.get(url)
return response.json()
# 查询队列状态
def get_queue_status():
url = f"{API_BASE_URL}/api/video/queue/status"
response = requests.get(url)
return response.json()
# 使用示例
if __name__ == "__main__":
# 创建任务JSON方式
result = create_video_task_json(
prompt="一只可爱的小猫在花园里玩耍",
image_url="https://example.com/cat.jpg",
duration=5
)
print("创建任务结果:", result)
if result.get("success") and "task_id" in result:
task_id = result["task_id"]
# 轮询查询任务状态
while True:
status = get_task_status(task_id)
print(f"任务状态: {status}")
if status.get("data", {}).get("status") in ["succeeded", "failed"]:
# 获取最终结果
result = get_task_result(task_id)
print(f"任务结果: {result}")
break
time.sleep(5) # 等待5秒后再次查询
# 查询队列状态
queue_status = get_queue_status()
print("队列状态:", queue_status)
```
## 测试
运行测试脚本:
```bash
# 设置API密钥
export ARK_API_KEY=your_api_key
# 运行测试
python test_doubao_api.py
```
## 任务状态说明
- **pending**: 任务已创建,等待处理
- **running**: 任务正在处理中
- **succeeded**: 任务处理成功,视频生成完成
- **failed**: 任务处理失败
- **cancelled**: 任务已被取消
## 配置说明
### 环境变量配置
项目支持多环境配置,通过 `APP_ENV` 环境变量指定:
- `dev`: 开发环境(使用 `.env.dev`
- `dev-server`: 测试环境(使用 `.env.dev-server`
- `pro`: 生产环境(使用 `.env.pro`
### 队列管理配置
详细的队列配置说明请参考 `QUEUE_CONFIG_README.md` 文件。
主要配置参数:
- `QUEUE_MAX_RUNNING_TASKS`: 最大并发运行任务数
- `QUEUE_UPDATE_INTERVAL`: 任务状态更新间隔(秒)
- `QUEUE_PERSISTENCE_FILE`: 队列持久化文件路径
- `QUEUE_MAX_COMPLETED_CACHE_SIZE`: 最大已完成任务缓存数量
- `QUEUE_COMPLETED_CACHE_TTL_HOURS`: 已完成任务缓存保留时间(小时)
### 使用注意事项
- 该模型主要用于图生视频需要提供图片URL作为输入
- 如需使用文生视频功能,可以将模型改为 `doubao-seedance-1-0-lite-t2v-250428`
- 视频生成为异步过程,通常需要等待较长时间
## 项目结构
```
hs-video-api/
├── app.py # Flask应用主文件
├── config.py # 配置管理
├── routes.py # API路由定义
├── video_service.py # 视频生成服务
├── task_queue_manager.py # 任务队列管理
├── requirements.txt # 依赖包列表
├── .env.dev # 开发环境配置
├── .env.dev-server # 测试环境配置
├── .env.pro # 生产环境配置
├── .env.example # 环境变量示例
├── Dockerfile # Docker配置
├── README.md # 项目说明
├── QUEUE_CONFIG_README.md # 队列配置说明
└── tests/ # 测试文件目录
├── test_cache_persistence.py
├── test_robustness.py
└── test_stress.py
```
## Docker部署
### 构建镜像
```bash
docker build -t hs-video-api .
```
### 运行容器
```bash
docker run -d \
--name hs-video-api \
-p 5000:5000 \
-e APP_ENV=pro \
hs-video-api
```
### 使用docker-compose
创建 `docker-compose.yml`
```yaml
version: '3.8'
services:
hs-video-api:
build: .
ports:
- "5000:5000"
environment:
- APP_ENV=pro
volumes:
- ./queue_data:/app/queue_data # 队列持久化数据
restart: unless-stopped
```
启动服务:
```bash
docker-compose up -d
```
## 错误处理
### 常见错误码
- `400`: 请求参数错误
- `404`: 任务不存在
- `405`: 请求方法不允许
- `413`: 上传文件过大
- `500`: 服务器内部错误
### 错误响应格式
```json
{
"success": false,
"error": "参数验证失败",
"message": "prompt字段为必填项"
}
```
### 成功响应格式
```json
{
"success": true,
"data": {
// 具体数据内容
},
"message": "操作成功"
}
```
## 开发和测试
### 运行测试
项目包含多种测试用例:
```bash
# 缓存持久化测试
python tests/test_cache_persistence.py
# 鲁棒性测试
python tests/test_robustness.py
# 压力测试
python tests/test_stress.py
```
### 日志查看
应用日志会输出到控制台和日志文件(如果配置了的话)。可以通过调整 `LOG_LEVEL` 环境变量来控制日志级别。
## 性能优化建议
1. **队列配置优化**:根据服务器性能调整 `QUEUE_MAX_RUNNING_TASKS`
2. **缓存管理**:合理设置 `QUEUE_MAX_COMPLETED_CACHE_SIZE``QUEUE_COMPLETED_CACHE_TTL_HOURS`
3. **文件上传**对于大文件上传建议使用CDN或对象存储
4. **监控告警**:建议监控队列状态和任务处理时间
## 注意事项
1. **API密钥安全**确保火山引擎API密钥的安全性不要在代码中硬编码
2. **文件存储**:上传的图片文件会临时存储,建议定期清理
3. **队列持久化**:队列数据会持久化到文件,确保有足够的磁盘空间
4. **并发限制**根据API配额和服务器性能合理设置并发数
5. **错误重试**:建议在客户端实现适当的重试机制
## 许可证
MIT License
---
**注意**: 本项目仅供学习和研究使用请遵守火山引擎API的使用条款和限制。

54
app.py Normal file
View File

@ -0,0 +1,54 @@
from flask import Flask
from routes import api_bp
from config import config, APP_ENV
from task_queue_manager import init_queue_manager
import os
import logging
from logging.handlers import RotatingFileHandler
# Ensure the log directory exists before the file handler opens it.
log_dir = 'logs'
if not os.path.exists(log_dir):
    os.makedirs(log_dir)

# Root logging: rotating file (10MB x 5 backups) plus console output.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(name)s %(message)s',
    handlers=[
        # File handler - rotating log file
        RotatingFileHandler(
            os.path.join(log_dir, 'app.log'),
            maxBytes=10*1024*1024,  # 10MB per file
            backupCount=5
        ),
        # Console handler
        logging.StreamHandler()
    ]
)

app = Flask(__name__)

# Fix: APP_ENV uses short names ('dev', 'pro', 'dev-server' — see
# env_file_map in config.py), but the `config` dict is keyed by long names,
# so e.g. APP_ENV=pro silently fell back to the default development config
# (DEBUG=True) in production.  Normalise the alias before the lookup.
_ENV_ALIASES = {
    'dev': 'development',
    'prod': 'production',
    'pro': 'production',
    'test': 'testing',
    'dev-server': 'testing',
}
app.config.from_object(config.get(_ENV_ALIASES.get(APP_ENV, APP_ENV), config['default'])())

# Register the REST endpoints blueprint.
app.register_blueprint(api_bp)

# Start the background task queue manager.
init_queue_manager()

if __name__ == '__main__':
    print("视频生成API服务启动中...")
    print("核心API接口:")
    print("1. 创建视频任务: POST /api/video/create")
    print("2. 查询任务状态: GET /api/video/result/<task_id>")
    print("3. 查询任务列表: GET /api/video/tasks")
    print("4. 取消/删除任务: DELETE /api/video/cancel/<task_id>")
    print("5. 健康检查: GET /health")
    # app.run(host='0.0.0.0', port=5000, debug=True, threaded=True, use_reloader=True)
    app.run(host='0.0.0.0', port=5000, debug=False, threaded=True, use_reloader=False)

43
assessment_report.json Normal file
View File

@ -0,0 +1,43 @@
{
"assessment_time": "2025-06-06T23:40:14.511369",
"report": {
"overall_success_rate": 100.0,
"overall_grade": "A+ (优秀)",
"overall_assessment": "系统具有优秀的健壮性和性能,可以安全地部署到生产环境。",
"recommendations": [
"继续保持高质量的代码标准",
"定期进行回归测试",
"监控生产环境性能指标"
],
"risks": [],
"test_results": {
"basic_functionality": {
"success": true,
"test_count": 4,
"error_count": 0,
"details": "基础缓存、持久化和清理机制测试",
"output": "4396'}\n已清空内存中的等待队列\n从持久化文件恢复了 2 个等待任务\n恢复的任务 1: wait_001 - test waiting task 1\n恢复的任务 2: wait_002 - test waiting task 2\n已清理测试持久化文件\n持久化机制测试完成\n\n=== 测试缓存机制 ===\n添加了 2 个运行中任务到缓存\n添加了 2 个已完成任务到缓存\n\n=== 测试缓存查询 ===\n从缓存获取运行中任务: run_001 - running\n从缓存获取已完成任务: comp_001 - succeeded\n正确处理了不存在的任务查询\n\n缓存状态:\n运行中任务数: 2\n已完成任务数: 2\n等待队列数: 0\n缓存机制测试完成\n\n=== 测试清理机制 ===\n添加了 5 个旧的已完成任务\n清理前已完成任务数: 5\n等待任务过期...\n清理后已完成任务数: 0\n[OK] 清理机制正常工作\n\n=== 测试数量限制清理 ===\n添加了5个新任务当前已完成任务数: 5\n最终已完成任务数: 3\n缓存大小限制: 3\n[OK] 数量限制清理正常工作\n清理机制测试完成\n\n所有测试完成\n"
},
"robustness": {
"success": true,
"success_rate": 100.0,
"total_tests": 12,
"passed_tests": 12,
"failed_tests": 0,
"grade": "A+ (优秀)",
"assessment": "系统具有极高的健壮性,能够很好地处理各种边界情况和异常场景。",
"details": "边界条件、异常处理、并发操作、数据完整性测试"
},
"stress_performance": {
"success": true,
"success_rate": 60.0,
"total_tests": 5,
"passed_tests": 3,
"grade": "一般",
"assessment": "系统基本能够处理压力场景,但存在性能瓶颈,需要优化。",
"memory_change": 0.12450313568115234,
"details": "高容量操作、并发压力、长时间运行、内存管理测试"
}
}
}
}

351
comprehensive_assessment.py Normal file
View File

@ -0,0 +1,351 @@
# -*- coding: utf-8 -*-
"""
综合评估报告
汇总所有测试结果生成TaskQueueManager核心逻辑健壮性的综合评估
"""
import os
import sys
import subprocess
import json
from datetime import datetime
# Add the project root to sys.path so sibling test modules import cleanly.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# Dummy credentials so config/SDK code can be imported without real keys.
os.environ['ARK_API_KEY'] = 'test_api_key_for_testing'
os.environ['ARK_BASE_URL'] = 'https://test.api.com'
class ComprehensiveAssessment:
    """Aggregate the basic, robustness and stress test suites into a single
    health report for TaskQueueManager, print it, and persist it as JSON."""

    def __init__(self):
        # One slot per test category; filled in by the run_* methods.
        self.test_results = {
            'basic_functionality': None,
            'robustness': None,
            'stress_performance': None
        }
        self.assessment_time = datetime.now()

    def run_basic_tests(self):
        """Run the basic functionality suite in a subprocess and record the outcome."""
        print("=== 运行基础功能测试 ===")
        try:
            result = subprocess.run(
                [sys.executable, 'test_cache_persistence.py'],
                capture_output=True,
                text=True,
                cwd=os.path.dirname(os.path.abspath(__file__))
            )
            # Success requires both a clean exit code and the suite's final marker.
            output = result.stdout
            success = result.returncode == 0 and "所有测试完成" in output
            # Crude metrics scraped from the suite's stdout.
            test_count = output.count("测试完成")
            # NOTE(review): the glyph counted here was lost in the original
            # source (it rendered as an empty string, which would make count()
            # return len(output)+1); "✗" matches the failure marker used by the
            # print statements in this module — confirm against the original.
            error_count = output.count("✗")
            self.test_results['basic_functionality'] = {
                'success': success,
                'test_count': test_count,
                'error_count': error_count,
                'details': '基础缓存、持久化和清理机制测试',
                'output': output[-500:] if len(output) > 500 else output  # keep only the tail
            }
            print(f"✓ 基础功能测试完成 - 成功: {success}, 测试数: {test_count}, 错误数: {error_count}")
        except Exception as e:
            self.test_results['basic_functionality'] = {
                'success': False,
                'error': str(e),
                'details': '基础功能测试执行失败'
            }
            print(f"✗ 基础功能测试失败: {str(e)}")

    def run_robustness_tests(self):
        """Run the robustness suite in-process and record its assessment."""
        print("\n=== 运行健壮性测试 ===")
        try:
            # Imported lazily so a missing test module only fails this category.
            from test_robustness import RobustnessTestSuite
            test_suite = RobustnessTestSuite()
            assessment = test_suite.run_all_tests()
            self.test_results['robustness'] = {
                'success': assessment['success_rate'] >= 80,  # pass bar: 80%
                'success_rate': assessment['success_rate'],
                'total_tests': assessment['total_tests'],
                'passed_tests': assessment['passed_tests'],
                'failed_tests': assessment['failed_tests'],
                'grade': assessment['grade'],
                'assessment': assessment['assessment'],
                'details': '边界条件、异常处理、并发操作、数据完整性测试'
            }
            print(f"✓ 健壮性测试完成 - 成功率: {assessment['success_rate']:.1f}%, 评级: {assessment['grade']}")
        except Exception as e:
            self.test_results['robustness'] = {
                'success': False,
                'error': str(e),
                'details': '健壮性测试执行失败'
            }
            print(f"✗ 健壮性测试失败: {str(e)}")

    def run_stress_tests(self):
        """Run the stress/performance suite in-process and record its report."""
        print("\n=== 运行压力测试 ===")
        try:
            # Imported lazily so a missing test module only fails this category.
            from test_stress import StressTestSuite
            test_suite = StressTestSuite()
            report = test_suite.run_stress_tests()
            self.test_results['stress_performance'] = {
                'success': report['success_rate'] >= 60,  # lower bar than robustness
                'success_rate': report['success_rate'],
                'total_tests': report['total_tests'],
                'passed_tests': report['passed_tests'],
                'grade': report['grade'],
                'assessment': report['assessment'],
                'memory_change': report['memory_change'],
                'details': '高容量操作、并发压力、长时间运行、内存管理测试'
            }
            print(f"✓ 压力测试完成 - 成功率: {report['success_rate']:.1f}%, 评级: {report['grade']}")
        except Exception as e:
            self.test_results['stress_performance'] = {
                'success': False,
                'error': str(e),
                'details': '压力测试执行失败'
            }
            print(f"✗ 压力测试失败: {str(e)}")

    def generate_comprehensive_report(self):
        """Print the combined report to stdout and return it as a dict.

        The overall grade is derived from the fraction of test categories
        that both ran and passed.
        """
        print("\n" + "="*80)
        print("TaskQueueManager 核心逻辑健壮性综合评估报告")
        print("="*80)
        print(f"评估时间: {self.assessment_time.strftime('%Y-%m-%d %H:%M:%S')}")
        # 1. Overview: count the categories that ran and how many passed.
        print("\n📊 测试概览")
        print("-" * 40)
        total_categories = 0
        passed_categories = 0
        for category, result in self.test_results.items():
            if result:
                total_categories += 1
                if result.get('success', False):
                    passed_categories += 1
                # NOTE(review): both status glyphs were lost in the original
                # source (empty strings); ✓/✗ restored to match the other
                # prints in this module — confirm against the original intent.
                status = "✓" if result.get('success', False) else "✗"
                category_name = {
                    'basic_functionality': '基础功能',
                    'robustness': '健壮性',
                    'stress_performance': '压力性能'
                }.get(category, category)
                print(f"{status} {category_name}: {result.get('details', 'N/A')}")
                if 'success_rate' in result:
                    print(f" 成功率: {result['success_rate']:.1f}%")
                if 'grade' in result:
                    print(f" 评级: {result['grade']}")
        overall_success_rate = (passed_categories / total_categories * 100) if total_categories > 0 else 0
        # 2. Per-category breakdown.
        print("\n📋 详细分析")
        print("-" * 40)
        # Basic functionality
        basic = self.test_results.get('basic_functionality')
        if basic:
            print(f"\n🔧 基础功能测试:")
            if basic.get('success'):
                print(f" ✓ 所有核心功能正常工作")
                print(f" ✓ 缓存机制运行正常")
                print(f" ✓ 持久化机制运行正常")
                print(f" ✓ 清理机制运行正常")
            else:
                print(f" ✗ 基础功能存在问题")
                if 'error' in basic:
                    print(f" 错误: {basic['error']}")
        # Robustness
        robustness = self.test_results.get('robustness')
        if robustness:
            print(f"\n🛡️ 健壮性测试:")
            if robustness.get('success'):
                print(f" ✓ 边界条件处理良好")
                print(f" ✓ 异常处理机制完善")
                print(f" ✓ 并发操作安全")
                print(f" ✓ 数据完整性保证")
            else:
                print(f" ⚠️ 健壮性需要改进")
                if 'failed_tests' in robustness and robustness['failed_tests'] > 0:
                    print(f" 失败测试数: {robustness['failed_tests']}")
        # Stress / performance
        stress = self.test_results.get('stress_performance')
        if stress:
            print(f"\n⚡ 压力性能测试:")
            if stress.get('success'):
                print(f" ✓ 高负载处理能力良好")
                print(f" ✓ 内存管理有效")
                print(f" ✓ 长时间运行稳定")
            else:
                print(f" ⚠️ 性能需要优化")
            # Memory delta is reported for both outcomes when present.
            if 'memory_change' in stress:
                print(f" 内存变化: {stress['memory_change']:+.1f}MB")
        # 3. Overall grade from the fraction of categories that passed.
        print("\n🎯 综合评级")
        print("-" * 40)
        if overall_success_rate >= 90:
            overall_grade = "A+ (优秀)"
            overall_assessment = "系统具有优秀的健壮性和性能,可以安全地部署到生产环境。"
            recommendations = [
                "继续保持高质量的代码标准",
                "定期进行回归测试",
                "监控生产环境性能指标"
            ]
        elif overall_success_rate >= 75:
            overall_grade = "A (良好)"
            overall_assessment = "系统整体表现良好,具有较好的健壮性,适合生产环境使用。"
            recommendations = [
                "重点优化失败的测试用例",
                "加强异常处理机制",
                "定期进行性能调优"
            ]
        elif overall_success_rate >= 60:
            overall_grade = "B (一般)"
            overall_assessment = "系统基本可用,但存在一些问题需要解决后再部署到生产环境。"
            recommendations = [
                "修复所有失败的测试用例",
                "重点改进健壮性和异常处理",
                "进行性能优化",
                "增加监控和日志"
            ]
        elif overall_success_rate >= 40:
            overall_grade = "C (较差)"
            overall_assessment = "系统存在较多问题,需要大量改进工作。"
            recommendations = [
                "全面重构异常处理机制",
                "优化核心算法和数据结构",
                "加强并发控制",
                "进行全面的代码审查"
            ]
        else:
            overall_grade = "D (差)"
            overall_assessment = "系统存在严重问题,不建议在当前状态下使用。"
            recommendations = [
                "重新设计核心架构",
                "重写关键组件",
                "建立完善的测试体系",
                "进行全面的质量保证"
            ]
        print(f"总体成功率: {overall_success_rate:.1f}%")
        print(f"综合评级: {overall_grade}")
        print(f"\n📝 评估结论:")
        print(f"{overall_assessment}")
        # 4. Recommendations for the chosen grade tier.
        print("\n💡 改进建议")
        print("-" * 40)
        for i, recommendation in enumerate(recommendations, 1):
            print(f"{i}. {recommendation}")
        # 5. Key metric summary.
        print("\n📈 技术指标总结")
        print("-" * 40)
        if robustness and 'success_rate' in robustness:
            print(f"健壮性成功率: {robustness['success_rate']:.1f}%")
        if stress and 'success_rate' in stress:
            print(f"压力测试成功率: {stress['success_rate']:.1f}%")
        if basic and basic.get('success'):
            print(f"基础功能: 正常")
        # 6. Risk assessment derived from per-category thresholds.
        print("\n⚠️ 风险评估")
        print("-" * 40)
        risks = []
        if not basic or not basic.get('success'):
            risks.append("高风险: 基础功能不稳定")
        if robustness and robustness.get('success_rate', 0) < 80:
            risks.append("中风险: 异常处理能力不足")
        if stress and stress.get('success_rate', 0) < 60:
            risks.append("中风险: 高负载性能问题")
        if not risks:
            print("✓ 未发现重大风险")
        else:
            for risk in risks:
                print(f"⚠️ {risk}")
        return {
            'overall_success_rate': overall_success_rate,
            'overall_grade': overall_grade,
            'overall_assessment': overall_assessment,
            'recommendations': recommendations,
            'risks': risks,
            'test_results': self.test_results
        }

    def save_report(self, report, filename="assessment_report.json"):
        """Persist *report* (plus a timestamp) as UTF-8 JSON at *filename*."""
        report_data = {
            'assessment_time': self.assessment_time.isoformat(),
            'report': report
        }
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(report_data, f, ensure_ascii=False, indent=2)
            # Fix: the original printed a literal placeholder instead of the
            # actual output path.
            print(f"\n📄 评估报告已保存到: {filename}")
        except Exception as e:
            print(f"\n❌ 保存报告失败: {str(e)}")

    def run_comprehensive_assessment(self):
        """Run all three suites, print the combined report, save and return it."""
        print("开始TaskQueueManager核心逻辑健壮性综合评估...\n")
        # Run all test categories.
        self.run_basic_tests()
        self.run_robustness_tests()
        self.run_stress_tests()
        # Build, persist and return the combined report.
        report = self.generate_comprehensive_report()
        self.save_report(report)
        return report
def main():
    """Entry point: build an assessor and run the full evaluation."""
    runner = ComprehensiveAssessment()
    report = runner.run_comprehensive_assessment()
    return report


if __name__ == "__main__":
    main()

85
config.py Normal file
View File

@ -0,0 +1,85 @@
import os
from datetime import timedelta
from dotenv import load_dotenv
# Resolve the deployment environment from APP_ENV; defaults to 'dev'.
APP_ENV = os.getenv('APP_ENV', 'dev').lower()
# Map each APP_ENV alias to the dotenv file that should be loaded for it.
# NOTE(review): no .env.test file exists in this repository, so the
# 'test'/'testing' aliases will fall through to system environment
# variables — confirm this is intended.
env_file_map = {
    'dev': '.env.dev',
    'development': '.env.dev',
    'prod': '.env.pro',
    'production': '.env.pro',
    'test': '.env.test',
    'testing': '.env.test',
    'dev-server': '.env.dev-server'
}
env_file = env_file_map.get(APP_ENV, '.env.dev')
if os.path.exists(env_file):
    load_dotenv(env_file)
    print(f"Successfully loaded configuration from {env_file} for environment: {APP_ENV}")
else:
    # Fall back to whatever is already in the process environment.
    print(f"Warning: Environment file {env_file} not found, using system environment variables")
class Config:
    """Base configuration shared by all environments."""
    # Flask settings
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-secret-key-change-in-production'
    # API settings
    API_VERSION = 'v1'
    API_PREFIX = '/api'
    # Volcengine (Ark) API credentials
    ARK_API_KEY = os.environ.get('ARK_API_KEY', '')
    # Video generation settings
    VIDEO_MODEL = os.environ.get('VIDEO_MODEL', 'doubao-seedance-1-0-lite-i2v-250428')  # doubao-seedance-1.0-lite image-to-video model
    VIDEO_MAX_DURATION = int(os.environ.get('VIDEO_MAX_DURATION', '10'))  # max duration (seconds)
    VIDEO_DEFAULT_DURATION = int(os.environ.get('VIDEO_DEFAULT_DURATION', '5'))  # default duration (seconds)
    VIDEO_DEFAULT_ASPECT_RATIO = os.environ.get('VIDEO_DEFAULT_ASPECT_RATIO', '16:9')  # default aspect ratio
    VIDEO_DEFAULT_RESOLUTION = os.environ.get('VIDEO_DEFAULT_RESOLUTION', '1280x720')  # default resolution
    # NOTE(review): the constants below overlap with the VIDEO_* settings
    # above but carry different values (10/60 vs 5/10) and are not
    # env-configurable — confirm which set is authoritative.
    DEFAULT_VIDEO_DURATION = 10  # default video duration (seconds)
    MAX_VIDEO_DURATION = 60  # max video duration (seconds)
    DEFAULT_RESOLUTION = '1080p'
    SUPPORTED_RESOLUTIONS = ['720p', '1080p', '4k']
    DEFAULT_STYLE = 'realistic'
    SUPPORTED_STYLES = ['realistic', 'cartoon', 'anime', 'abstract']
    # Task settings
    TASK_TIMEOUT = timedelta(minutes=10)  # per-task timeout
    MAX_CONCURRENT_TASKS = 10  # max concurrent tasks
    # File storage settings
    UPLOAD_FOLDER = os.environ.get('UPLOAD_FOLDER') or 'uploads'
    MAX_CONTENT_LENGTH = 16 * 1024 * 1024  # 16MB
    # Logging settings
    LOG_LEVEL = os.environ.get('LOG_LEVEL') or 'INFO'
    LOG_FILE = os.environ.get('LOG_FILE') or 'app.log'
class DevelopmentConfig(Config):
    """Development environment: debug on, verbose logging."""
    DEBUG = True
    LOG_LEVEL = 'DEBUG'
class ProductionConfig(Config):
    """Production environment: debug off, warnings-and-up logging only."""
    DEBUG = False
    LOG_LEVEL = 'WARNING'
class TestingConfig(Config):
    """Testing environment: Flask TESTING mode with debug enabled."""
    TESTING = True
    DEBUG = True
# Environment-name -> config-class registry; 'default' falls back to development.
config = {
    'development': DevelopmentConfig,
    'production': ProductionConfig,
    'testing': TestingConfig,
    'default': DevelopmentConfig
}

60
docker-compose.yml Normal file
View File

@ -0,0 +1,60 @@
version: '3.8'

services:
  video-api:
    build: .
    container_name: hs-video-api
    ports:
      - "5000:5000"
    environment:
      - FLASK_ENV=production
      - SECRET_KEY=your-secret-key-here  # NOTE(review): replace placeholder; inject real secret via env/secret store
    volumes:
      - ./logs:/app/logs
      - ./uploads:/app/uploads
    restart: unless-stopped
    healthcheck:
      # Probes the Flask /health endpoint defined in routes.py
      test: ["CMD", "curl", "-f", "http://localhost:5000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s
    networks:
      - video-api-network

  # Optional: add Redis for task-state persistence
  # redis:
  #   image: redis:7-alpine
  #   container_name: hs-video-redis
  #   ports:
  #     - "6379:6379"
  #   volumes:
  #     - redis_data:/data
  #   restart: unless-stopped
  #   networks:
  #     - video-api-network

  # Optional: add an Nginx reverse proxy
  # nginx:
  #   image: nginx:alpine
  #   container_name: hs-video-nginx
  #   ports:
  #     - "80:80"
  #     - "443:443"
  #   volumes:
  #     - ./nginx.conf:/etc/nginx/nginx.conf:ro
  #     - ./ssl:/etc/nginx/ssl:ro
  #   depends_on:
  #     - video-api
  #   restart: unless-stopped
  #   networks:
  #     - video-api-network

networks:
  video-api-network:
    driver: bridge

volumes:
  # redis_data:
  # NOTE(review): logs/uploads are declared as named volumes but the service
  # mounts ./logs and ./uploads as bind mounts — these declarations look unused.
  logs:
  uploads:

6
requirements.txt Normal file
View File

@ -0,0 +1,6 @@
Flask==2.3.3
Flask-CORS==4.0.0
requests==2.31.0
gunicorn==21.2.0
volcengine-python-sdk[ark]==1.0.98
python-dotenv==1.0.0

330
routes.py Normal file
View File

@ -0,0 +1,330 @@
from flask import Blueprint, request, jsonify
import os
from datetime import datetime
from video_service import get_video_service
from task_queue_manager import get_queue_manager
# Blueprint that groups all video-generation API endpoints.
api_bp = Blueprint('api', __name__)
@api_bp.route('/api/video/create', methods=['POST'])
def create_video_task():
    """Create a video-generation task.

    Accepts either multipart/form-data with an uploaded `image_file`
    (converted to a base64 data URL) or application/json with an
    `image_url`. The task starts immediately when the queue has a free
    slot, otherwise it is created and placed in the waiting queue.
    """
    content_type = request.content_type

    if content_type and 'multipart/form-data' in content_type:
        # --- file-upload mode ---
        if 'image_file' not in request.files:
            return jsonify({
                'success': False,
                'error': '缺少必需的参数: image_file'
            }), 400
        image_file = request.files['image_file']
        if image_file.filename == '':
            return jsonify({
                'success': False,
                'error': '未选择文件'
            }), 400
        # Read the image and embed it as a base64 data URL.
        import base64
        image_data = image_file.read()
        image_base64 = base64.b64encode(image_data).decode('utf-8')
        # Bug fix: derive a valid MIME subtype from the file extension —
        # lowercase it and map 'jpg' to the registered 'jpeg' subtype.
        # The old code used the raw extension and could emit invalid
        # types such as "image/JPG" or "image/jpg".
        ext = image_file.filename.split('.')[-1].lower()
        subtype = 'jpeg' if ext == 'jpg' else ext
        image_url = f"data:image/{subtype};base64,{image_base64}"
        # Remaining parameters come from the form fields.
        prompt = request.form.get('prompt', '')
        duration = request.form.get('duration')
        callback_url = request.form.get('callback_url')
    elif content_type and 'application/json' in content_type:
        # --- JSON mode ---
        data = request.get_json()
        if not data:
            return jsonify({
                'success': False,
                'error': '无效的JSON数据'
            }), 400
        image_url = data.get('image_url')
        prompt = data.get('prompt', '')
        duration = data.get('duration')
        callback_url = data.get('callback_url')
        if not image_url:
            return jsonify({
                'success': False,
                'error': '缺少必需的参数: image_url'
            }), 400
    else:
        return jsonify({
            'success': False,
            'error': '不支持的请求类型请使用multipart/form-data或application/json'
        }), 400

    # prompt is mandatory in both modes.
    if not prompt:
        return jsonify({
            'success': False,
            'error': '缺少必需的参数: prompt'
        }), 400

    # Payload for the generation service.
    content = {
        'image_url': image_url,
        'prompt': prompt
    }
    parameters = {}
    if duration is not None and duration != '':
        try:
            parameters['duration'] = float(duration)
        except (ValueError, TypeError):
            return jsonify({
                'success': False,
                'error': 'duration参数必须是数字'
            }), 400

    try:
        queue_manager = get_queue_manager()
        # Decide up-front whether the task can start right away; the
        # creation call itself is identical either way (this replaces the
        # original's two copy-pasted branches).
        run_now = queue_manager.can_create_task()
        video_service = get_video_service()
        result = video_service.create_video_generation_task(content, callback_url, parameters)
        if not result['success']:
            return jsonify({
                'success': False,
                'error': result['error']
            }), 500
        task_data = result['data']
        task_data['status'] = 'running' if run_now else 'waiting'
        queue_manager.add_task_to_queue(task_data)
        response = {
            'success': True,
            'data': task_data,
            'queue_status': task_data['status']
        }
        if not run_now:
            response['message'] = '任务已创建并加入等待队列,将在有空闲位置时开始执行'
        return jsonify(response)
    except Exception as e:
        return jsonify({
            'success': False,
            'error': f'创建任务失败: {str(e)}'
        }), 500
@api_bp.route('/api/video/status/<task_id>', methods=['GET'])
def get_task_status(task_id):
    """Return the provider-side status of a single generation task."""
    outcome = get_video_service().get_task_status(task_id)
    # Guard clause: surface service errors as HTTP 500.
    if not outcome['success']:
        return jsonify({'success': False, 'error': outcome['error']}), 500
    return jsonify({'success': True, 'data': outcome['data']})
@api_bp.route('/api/video/result/<task_id>', methods=['GET'])
def get_task_result(task_id):
    """Return the final result of a task, preferring the in-memory cache
    and falling back to an SDK status query when the task is unknown."""
    try:
        # Cache first: avoids an SDK round-trip for recently tracked tasks.
        task_data = get_queue_manager().get_task_from_cache(task_id)
        if task_data:
            print(f"从缓存获取结果: {task_id}")
        else:
            print(f"缓存中没有调用SDK查询: {task_id}")
            lookup = get_video_service().get_task_status(task_id)
            if not lookup['success']:
                return jsonify({'success': False, 'error': lookup['error']}), 500
            task_data = lookup['data']

        status = task_data['status']
        if status == 'succeeded':
            # Completed successfully: expose the video URL plus timestamps.
            content = task_data.get('content')
            return jsonify({
                'success': True,
                'data': {
                    'task_id': task_data['task_id'],
                    'status': status,
                    'video_url': content.get('video_url') if content else None,
                    'created_at': task_data.get('created_at'),
                    'updated_at': task_data.get('updated_at')
                }
            })
        if status == 'failed':
            return jsonify({
                'success': False,
                'error': f"任务失败: {task_data.get('error', '未知错误')}"
            }), 500
        if status == 'not_found':
            return jsonify({
                'success': False,
                'error': '任务不存在或已被删除'
            }), 404
        # Any other status means the task is still in progress.
        return jsonify({
            'success': True,
            'data': {
                'task_id': task_data['task_id'],
                'status': status,
                'message': '任务正在处理中,请稍后查询'
            }
        })
    except Exception as e:
        return jsonify({'success': False, 'error': f'查询异常: {str(e)}'}), 500
@api_bp.route('/api/video/tasks', methods=['GET'])
def get_task_list():
    """List generation tasks with simple limit/offset pagination."""
    limit = request.args.get('limit', 20, type=int)
    offset = request.args.get('offset', 0, type=int)
    try:
        # Validate pagination parameters before touching the service.
        if not 1 <= limit <= 100:
            return jsonify({'success': False, 'error': 'limit必须在1-100之间'}), 400
        if offset < 0:
            return jsonify({'success': False, 'error': 'offset必须大于等于0'}), 400
        # Forward the query to the Volcengine-backed service.
        outcome = get_video_service().get_task_list(limit, offset)
        if outcome['success']:
            return jsonify({'success': True, 'data': outcome['data']})
        return jsonify({'success': False, 'error': outcome['error']}), 500
    except Exception as e:
        return jsonify({'success': False, 'error': f'获取任务列表失败: {str(e)}'}), 500
@api_bp.route('/api/video/cancel/<task_id>', methods=['DELETE'])
def cancel_task(task_id):
    """Delete a task upstream and evict it from the local queue caches."""
    try:
        outcome = get_video_service().delete_task(task_id)
        if not outcome['success']:
            return jsonify({'success': False, 'error': outcome['error']}), 500
        # Keep the local caches consistent with the upstream deletion.
        get_queue_manager().remove_task_from_cache(task_id)
        return jsonify({'success': True, 'message': '任务删除成功'})
    except Exception as e:
        return jsonify({'success': False, 'error': f'删除任务失败: {str(e)}'}), 500
@api_bp.route('/api/video/queue/status', methods=['GET'])
def get_queue_status():
    """Expose a snapshot of the internal task-queue manager state."""
    try:
        snapshot = get_queue_manager().get_queue_status()
        return jsonify({'success': True, 'data': snapshot})
    except Exception as e:
        return jsonify({'success': False, 'error': f'获取队列状态失败: {str(e)}'}), 500
@api_bp.route('/health', methods=['GET'])
def health_check():
    """Liveness probe used by container health checks."""
    payload = {
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
        'service': 'video-generation-api'
    }
    return jsonify(payload)

478
task_queue_manager.py Normal file
View File

@ -0,0 +1,478 @@
import threading
import time
from datetime import datetime, timedelta
from collections import deque
from typing import Dict, List, Any, Optional
import json
import os
from video_service import get_video_service
import logging
logger = logging.getLogger(__name__)
class TaskQueueManager:
    """Thread-safe manager for video-generation tasks.

    Maintains three collections guarded by one re-entrant lock:
      - running_tasks_cache: tasks currently executing upstream
      - completed_tasks_cache: recently finished tasks (bounded + TTL-pruned)
      - waiting_queue: FIFO of tasks waiting for a free running slot
    A daemon thread periodically refreshes task statuses and promotes
    waiting tasks; the waiting queue is persisted to a JSON file.
    """

    def __init__(self, max_running_tasks: int = None, update_interval: int = None, persistence_file: str = None):
        """
        Initialize the task queue manager.

        Args:
            max_running_tasks: max concurrently running tasks
                (default: env QUEUE_MAX_RUNNING_TASKS, then 5).
            update_interval: seconds between status refreshes
                (default: env QUEUE_UPDATE_INTERVAL, then 5).
            persistence_file: waiting-queue persistence file path
                (default: env QUEUE_PERSISTENCE_FILE).
        """
        # Fall back to environment variables, then hard defaults.
        self.max_running_tasks = max_running_tasks or int(os.environ.get('QUEUE_MAX_RUNNING_TASKS', '5'))
        self.update_interval = update_interval or int(os.environ.get('QUEUE_UPDATE_INTERVAL', '5'))
        self.persistence_file = persistence_file or os.environ.get('QUEUE_PERSISTENCE_FILE', 'task_queue_persistence.json')
        # Running task cache (task_id -> task_data).
        self.running_tasks_cache: Dict[str, Dict[str, Any]] = {}
        # Completed task cache (task_id -> task_data), keeps only recent tasks.
        self.completed_tasks_cache: Dict[str, Dict[str, Any]] = {}
        # FIFO waiting queue of pending task requests.
        self.waiting_queue: deque = deque()
        # Bug fix: use an RLock instead of a Lock. add_task_to_queue() and
        # remove_task_from_cache() call _save_persistence_data() while already
        # holding this lock, and that method acquires it again — with a
        # non-reentrant Lock this deadlocks the caller.
        self._lock = threading.RLock()
        # Background refresh thread and its stop signal.
        self._update_thread = None
        self._stop_event = threading.Event()
        # Shared video service (singleton factory; tests patch get_video_service).
        self.video_service = get_video_service()
        # Completed-cache retention limits (env-configurable).
        self.max_completed_cache_size = int(os.environ.get('QUEUE_MAX_COMPLETED_CACHE_SIZE', '100'))  # max completed tasks kept
        self.completed_cache_ttl_hours = int(os.environ.get('QUEUE_COMPLETED_CACHE_TTL_HOURS', '24'))  # completed-task TTL (hours)

    def start(self):
        """Start the manager: restore state, then launch the refresh thread."""
        logger.info("启动任务队列管理器")
        # Restore the waiting queue from the persistence file.
        self._load_persistence_data()
        # Rebuild the caches from the SDK's task list.
        self._load_initial_tasks()
        # Launch the periodic status-update thread.
        self._start_update_thread()

    def stop(self):
        """Stop the manager, persisting the waiting queue first."""
        logger.info("停止任务队列管理器")
        # Save the waiting queue before shutting down.
        self._save_persistence_data()
        self._stop_event.set()
        if self._update_thread and self._update_thread.is_alive():
            # Bounded join: the loop wakes at most every update_interval
            # seconds, so this cannot hang shutdown indefinitely.
            self._update_thread.join(timeout=self.update_interval + 5)

    def _load_persistence_data(self):
        """Restore the waiting queue from the persistence file, if present."""
        try:
            if os.path.exists(self.persistence_file):
                with open(self.persistence_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                with self._lock:
                    # Rebuild the FIFO from the saved list.
                    waiting_tasks = data.get('waiting_queue', [])
                    self.waiting_queue = deque(waiting_tasks)
                logger.info(f"从持久化文件恢复了 {len(self.waiting_queue)} 个等待任务")
            else:
                logger.info("持久化文件不存在,跳过恢复")
        except Exception as e:
            logger.error(f"加载持久化数据异常: {str(e)}")

    def _save_persistence_data(self):
        """Write the waiting queue to the persistence file.

        Safe to call while the lock is held (the lock is re-entrant).
        """
        try:
            with self._lock:
                data = {
                    'waiting_queue': list(self.waiting_queue),
                    'timestamp': datetime.now().isoformat()
                }
                with open(self.persistence_file, 'w', encoding='utf-8') as f:
                    json.dump(data, f, ensure_ascii=False, indent=2)
                logger.info(f"保存了 {len(self.waiting_queue)} 个等待任务到持久化文件")
        except Exception as e:
            logger.error(f"保存持久化数据异常: {str(e)}")

    def _load_initial_tasks(self):
        """Rebuild the running/completed caches from the SDK's task list."""
        try:
            logger.info("开始从SDK恢复任务缓存...")
            # Page through the upstream task list.
            offset = 0
            limit = 50
            running_tasks_loaded = 0
            completed_tasks_loaded = 0
            page_count = 0
            while True:
                result = self.video_service.get_task_list(limit=limit, offset=offset)
                if not result['success']:
                    logger.error(f"加载任务失败(offset={offset}): {result['error']}")
                    break
                tasks = result['data']['tasks']
                if not tasks:
                    break
                with self._lock:
                    for task in tasks:
                        task_id = task['task_id']
                        task['cache_time'] = datetime.now().isoformat()
                        if task['status'] in ['running', 'queued']:
                            # Still in flight: track it in the running cache.
                            self.running_tasks_cache[task_id] = task
                            running_tasks_loaded += 1
                            logger.debug(f"恢复运行中任务: {task_id}")
                        else:
                            # Finished task: keep only up to the cache limit.
                            if completed_tasks_loaded < self.max_completed_cache_size:
                                self.completed_tasks_cache[task_id] = task
                                completed_tasks_loaded += 1
                                logger.debug(f"恢复已完成任务: {task_id}")
                # Stop early once both caches are sufficiently populated.
                if completed_tasks_loaded >= self.max_completed_cache_size and running_tasks_loaded > 0:
                    break
                offset += limit
                page_count += 1
                # Safety valve: never page more than 10 times.
                if page_count >= 10:
                    break
            logger.info(f"缓存恢复完成: {running_tasks_loaded} 个运行中任务, {completed_tasks_loaded} 个已完成任务")
        except Exception as e:
            logger.error(f"恢复任务缓存异常: {str(e)}")

    def _start_update_thread(self):
        """Spawn the daemon thread that runs the periodic update loop."""
        self._update_thread = threading.Thread(target=self._update_loop, daemon=True)
        self._update_thread.start()

    def _update_loop(self):
        """Periodically refresh task statuses until stop() is signalled."""
        logger.info(f"任务状态更新线程已启动,更新间隔: {self.update_interval}")
        while not self._stop_event.is_set():
            try:
                # Snapshot the running count for before/after logging.
                running_count = len(self.running_tasks_cache)
                if running_count > 0:
                    logger.info(f"开始更新任务状态,当前运行中任务数: {running_count}")
                self._update_task_statuses()
                self._process_waiting_queue()
                if running_count > 0:
                    new_running_count = len(self.running_tasks_cache)
                    logger.info(f"任务状态更新完成,运行中任务数: {new_running_count}")
            except Exception as e:
                logger.error(f"更新任务状态异常: {str(e)}")
            # Sleep, but wake immediately if stop() is called.
            self._stop_event.wait(self.update_interval)

    def _update_task_statuses(self):
        """Query the latest status of every running task and reclassify it."""
        try:
            # Snapshot under the lock; query outside it so slow SDK calls
            # don't block other queue operations.
            tasks_to_update = []
            with self._lock:
                tasks_to_update = list(self.running_tasks_cache.values())
            if len(tasks_to_update) == 0:
                return
            logger.info(f"开始查询 {len(tasks_to_update)} 个运行中任务的状态")
            for task in tasks_to_update:
                try:
                    task_id = task.get('task_id') or task.get('id')
                    if not task_id:
                        continue
                    logger.info(f"查询任务 {task_id} 的状态")
                    # Bug fix: reuse the shared service obtained via
                    # get_video_service() in __init__ instead of constructing
                    # a fresh VideoGenerationService, which bypassed the
                    # singleton (and any test patching of the factory).
                    status_result = self.video_service.get_task_status(task_id)
                    if status_result['success']:
                        updated_task = status_result['data']
                        task_status = updated_task.get('status', 'running')
                        logger.info(f"任务 {task_id} 当前状态: {task_status}")
                        with self._lock:
                            if task_id in self.running_tasks_cache:
                                self.running_tasks_cache[task_id].update(updated_task)
                                # Known statuses: succeeded, failed, running,
                                # cancelled, queued. Only queued/running count
                                # as "still in flight"; anything else moves the
                                # task to the completed cache.
                                if task_status not in ['queued', 'running']:
                                    completed_task = self.running_tasks_cache.pop(task_id)
                                    completed_task['completed_at'] = datetime.now().isoformat()
                                    self.completed_tasks_cache[task_id] = completed_task
                                    logger.info(f"任务 {task_id} 状态更新为: {task_status},已移动到完成缓存")
                                else:
                                    logger.info(f"任务 {task_id} 仍在运行中,状态: {task_status}")
                    else:
                        logger.warning(f"查询任务 {task_id} 状态失败: {status_result.get('error', '未知错误')}")
                except Exception as e:
                    logger.error(f"更新任务 {task_id} 状态时发生错误: {str(e)}")
                    continue
            # Prune the completed cache after reclassification.
            self._cleanup_completed_tasks()
        except Exception as e:
            logger.error(f"更新任务状态时发生错误: {str(e)}")

    def _cleanup_completed_tasks(self):
        """Prune the completed-task cache by TTL and by size limit."""
        try:
            with self._lock:
                cache_size = len(self.completed_tasks_cache)
                if cache_size == 0:
                    return
                current_time = datetime.now()
                cutoff_time = current_time - timedelta(hours=self.completed_cache_ttl_hours)
                # Collect expired/invalid entries; keep valid ones with their
                # parsed timestamps so size-based pruning can sort them.
                tasks_to_remove = set()
                valid_tasks = []
                for task_id, task_data in self.completed_tasks_cache.items():
                    cache_time_str = task_data.get('cache_time')
                    if cache_time_str:
                        try:
                            cache_time = datetime.fromisoformat(cache_time_str)
                            if cache_time < cutoff_time:
                                tasks_to_remove.add(task_id)
                            else:
                                valid_tasks.append((task_id, task_data, cache_time))
                        except (ValueError, TypeError):
                            # Unparseable timestamp: drop the entry.
                            tasks_to_remove.add(task_id)
                    else:
                        # No cache timestamp at all: drop the entry.
                        tasks_to_remove.add(task_id)
                # Enforce the size cap: keep only the newest entries.
                if len(valid_tasks) > self.max_completed_cache_size:
                    valid_tasks.sort(key=lambda x: x[2], reverse=True)
                    for task_id, _, _ in valid_tasks[self.max_completed_cache_size:]:
                        tasks_to_remove.add(task_id)
                # Batch-delete everything marked for removal.
                if tasks_to_remove:
                    for task_id in tasks_to_remove:
                        self.completed_tasks_cache.pop(task_id, None)
                    logger.info(f"清理了 {len(tasks_to_remove)} 个已完成任务,当前缓存大小: {len(self.completed_tasks_cache)}")
        except Exception as e:
            logger.error(f"清理已完成任务缓存时发生错误: {str(e)}")
            # A cleanup failure must never take the service down.

    def _process_waiting_queue(self):
        """Promote waiting tasks into free running slots (FIFO order)."""
        try:
            tasks_moved = 0
            with self._lock:
                while len(self.running_tasks_cache) < self.max_running_tasks and self.waiting_queue:
                    waiting_task = self.waiting_queue.popleft()
                    task_id = waiting_task['task_id']
                    waiting_task['cache_time'] = datetime.now().isoformat()
                    self.running_tasks_cache[task_id] = waiting_task
                    tasks_moved += 1
                    logger.info(f"等待任务开始执行: {task_id}")
            # Persist only when the waiting queue actually changed.
            if tasks_moved > 0:
                self._save_persistence_data()
        except Exception as e:
            logger.error(f"处理等待队列时发生错误: {str(e)}")
            # Exceptions here must not kill the update loop.

    def can_create_task(self) -> bool:
        """Return True if a new task may be created (running + waiting < cap)."""
        with self._lock:
            total_tasks = len(self.running_tasks_cache) + len(self.waiting_queue)
            # Allow at most 5 tasks beyond the running limit to queue up.
            max_total_tasks = self.max_running_tasks + 5
            return total_tasks < max_total_tasks

    def add_task_to_queue(self, task_data: Dict[str, Any]) -> bool:
        """Add a task to the running cache or, when full, the waiting queue.

        Args:
            task_data: task dict; must contain 'task_id'.

        Returns:
            True if the task went straight into the running cache,
            False if it was appended to the waiting queue.
        """
        task_id = task_data['task_id']
        task_data['cache_time'] = datetime.now().isoformat()
        with self._lock:
            if len(self.running_tasks_cache) < self.max_running_tasks:
                self.running_tasks_cache[task_id] = task_data
                logger.info(f"任务直接加入运行队列: {task_id}")
                return True
            else:
                self.waiting_queue.append(task_data)
                logger.info(f"任务加入等待队列: {task_id}, 等待队列长度: {len(self.waiting_queue)}")
                # Persist the changed waiting queue (re-entrant lock makes
                # this nested acquisition safe).
                self._save_persistence_data()
                return False

    def get_task_from_cache(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Look a task up in running cache, then completed cache, then waiting queue."""
        with self._lock:
            if task_id in self.running_tasks_cache:
                return self.running_tasks_cache[task_id]
            if task_id in self.completed_tasks_cache:
                return self.completed_tasks_cache[task_id]
            for task in self.waiting_queue:
                if task['task_id'] == task_id:
                    return task
            return None

    def get_task_by_id(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Compatibility alias for get_task_from_cache()."""
        return self.get_task_from_cache(task_id)

    def remove_task_from_cache(self, task_id: str):
        """Remove a task from every cache and the waiting queue."""
        with self._lock:
            removed = False
            if task_id in self.running_tasks_cache:
                del self.running_tasks_cache[task_id]
                logger.info(f"从运行中缓存删除任务: {task_id}")
                removed = True
            if task_id in self.completed_tasks_cache:
                del self.completed_tasks_cache[task_id]
                logger.info(f"从已完成缓存删除任务: {task_id}")
                removed = True
            # Rebuild the waiting queue without the target task.
            original_length = len(self.waiting_queue)
            self.waiting_queue = deque([task for task in self.waiting_queue if task['task_id'] != task_id])
            if len(self.waiting_queue) < original_length:
                logger.info(f"从等待队列删除任务: {task_id}")
                removed = True
            # Persist the (possibly changed) waiting queue; safe while the
            # re-entrant lock is held.
            self._save_persistence_data()
            if not removed:
                logger.warning(f"任务 {task_id} 不在任何缓存中")

    def get_queue_status(self) -> Dict[str, Any]:
        """Return a snapshot of cache sizes, per-status counts and task IDs."""
        with self._lock:
            # Tally task statuses across all three collections.
            status_counts = {}
            for task in self.running_tasks_cache.values():
                status = task['status']
                status_counts[status] = status_counts.get(status, 0) + 1
            for task in self.completed_tasks_cache.values():
                status = task['status']
                status_counts[status] = status_counts.get(status, 0) + 1
            for task in self.waiting_queue:
                status = task.get('status', 'waiting')
                status_counts[status] = status_counts.get(status, 0) + 1
            total_cache_count = len(self.running_tasks_cache) + len(self.completed_tasks_cache) + len(self.waiting_queue)
            return {
                'running_tasks_count': len(self.running_tasks_cache),
                'completed_tasks_count': len(self.completed_tasks_cache),
                'waiting_queue_count': len(self.waiting_queue),
                'total_cache_count': total_cache_count,
                'status_counts': status_counts,
                'max_running_tasks': self.max_running_tasks,
                'max_completed_cache_size': self.max_completed_cache_size,
                'completed_cache_ttl_hours': self.completed_cache_ttl_hours,
                'running_task_ids': list(self.running_tasks_cache.keys()),
                'completed_task_ids': list(self.completed_tasks_cache.keys()),
                'waiting_task_ids': [task['task_id'] for task in self.waiting_queue],
                'persistence_file': self.persistence_file
            }
# Global singleton instance of the queue manager.
_queue_manager = None


def get_queue_manager() -> TaskQueueManager:
    """Return the process-wide TaskQueueManager, creating it lazily (singleton)."""
    global _queue_manager
    if _queue_manager is None:
        _queue_manager = TaskQueueManager()
    return _queue_manager
def init_queue_manager():
    """Create (if needed) and start the global queue manager; return it."""
    queue_manager = get_queue_manager()
    queue_manager.start()
    return queue_manager

311
test_cache_persistence.py Normal file
View File

@ -0,0 +1,311 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
缓存和持久化机制测试脚本
测试新的分层缓存持久化和清理机制
"""
import os
import sys
import time
import json
from datetime import datetime, timedelta
from unittest.mock import Mock, patch
# Make the project root importable when this script is run directly.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Stub API credentials so modules that read them import cleanly in tests.
os.environ['ARK_API_KEY'] = 'test_api_key_for_testing'
os.environ['ARK_BASE_URL'] = 'https://test.api.com'
from task_queue_manager import TaskQueueManager
from video_service import get_video_service
def test_persistence_mechanism():
    """Exercise saving and restoring the waiting queue via the persistence file."""
    print("=== 测试持久化机制 ===")
    # Dedicated persistence file for this test run.
    test_persistence_file = "test_persistence.json"
    # Remove leftovers from any previous run.
    if os.path.exists(test_persistence_file):
        os.remove(test_persistence_file)
    # Mock the video service so no real SDK calls are made.
    mock_video_service = Mock()
    mock_video_service.get_all_tasks.return_value = []
    # Patch the factory the manager uses to obtain its service.
    with patch('task_queue_manager.get_video_service', return_value=mock_video_service):
        # Manager pointed at the test persistence file.
        queue_manager = TaskQueueManager(
            max_running_tasks=2,
            update_interval=1,
            persistence_file=test_persistence_file
        )
        # Cache tuning for the test.
        queue_manager.max_completed_cache_size = 10
        queue_manager.completed_cache_ttl_hours = 1
        # Two synthetic waiting tasks.
        waiting_tasks = [
            {
                'task_id': 'wait_001',
                'content': 'test waiting task 1',
                'params': {'test': True},
                'created_at': datetime.now().isoformat()
            },
            {
                'task_id': 'wait_002',
                'content': 'test waiting task 2',
                'params': {'test': True},
                'created_at': datetime.now().isoformat()
            }
        ]
        for task in waiting_tasks:
            queue_manager.waiting_queue.append(task)
        # Write the queue to disk.
        queue_manager._save_persistence_data()
        print(f"已保存 {len(waiting_tasks)} 个等待任务到持久化文件")
        # Confirm the file exists and inspect its contents.
        if os.path.exists(test_persistence_file):
            with open(test_persistence_file, 'r', encoding='utf-8') as f:
                saved_data = json.load(f)
            print(f"持久化文件内容: {saved_data}")
        # Drop the in-memory queue, then restore it from disk.
        queue_manager.waiting_queue.clear()
        print("已清空内存中的等待队列")
        queue_manager._load_persistence_data()
        print(f"从持久化文件恢复了 {len(queue_manager.waiting_queue)} 个等待任务")
        # Show what was restored.
        for i, task in enumerate(queue_manager.waiting_queue):
            print(f"恢复的任务 {i+1}: {task['task_id']} - {task['content']}")
    # Remove the test persistence file.
    if os.path.exists(test_persistence_file):
        os.remove(test_persistence_file)
        print("已清理测试持久化文件")
    print("持久化机制测试完成\n")
def test_cache_mechanism():
    """Exercise the running/completed caches, lookups and the status snapshot."""
    print("=== 测试缓存机制 ===")
    # Mock the video service so no real SDK calls are made.
    mock_video_service = Mock()
    mock_video_service.get_all_tasks.return_value = []
    # Patch the factory the manager uses to obtain its service.
    with patch('task_queue_manager.get_video_service', return_value=mock_video_service):
        queue_manager = TaskQueueManager(
            max_running_tasks=3,
            update_interval=1
        )
        # Small cache and a ~10s TTL so behaviour is observable in-test.
        queue_manager.max_completed_cache_size = 5
        queue_manager.completed_cache_ttl_hours = 0.003  # ~10s TTL for testing
        # Synthetic running tasks.
        running_tasks = [
            {
                'task_id': 'run_001',
                'status': 'running',
                'content': 'running task 1',
                'params': {'test': True},
                'created_at': datetime.now().isoformat(),
                'cache_time': datetime.now().isoformat()
            },
            {
                'task_id': 'run_002',
                'status': 'running',
                'content': 'running task 2',
                'params': {'test': True},
                'created_at': datetime.now().isoformat(),
                'cache_time': datetime.now().isoformat()
            }
        ]
        # Seed the running cache directly.
        for task in running_tasks:
            queue_manager.running_tasks_cache[task['task_id']] = task
        print(f"添加了 {len(running_tasks)} 个运行中任务到缓存")
        # Synthetic completed tasks.
        completed_tasks = [
            {
                'task_id': 'comp_001',
                'status': 'succeeded',
                'content': 'completed task 1',
                'params': {'test': True},
                'created_at': (datetime.now() - timedelta(minutes=5)).isoformat(),
                'cache_time': datetime.now().isoformat()
            },
            {
                'task_id': 'comp_002',
                'status': 'succeeded',
                'content': 'completed task 2',
                'params': {'test': True},
                'created_at': (datetime.now() - timedelta(minutes=3)).isoformat(),
                'cache_time': datetime.now().isoformat()
            }
        ]
        # Seed the completed cache directly.
        for task in completed_tasks:
            queue_manager.completed_tasks_cache[task['task_id']] = task
        print(f"添加了 {len(completed_tasks)} 个已完成任务到缓存")
        # Lookups against both caches plus a miss.
        print("\n=== 测试缓存查询 ===")
        task = queue_manager.get_task_from_cache('run_001')
        if task:
            print(f"从缓存获取运行中任务: {task['task_id']} - {task['status']}")
        task = queue_manager.get_task_from_cache('comp_001')
        if task:
            print(f"从缓存获取已完成任务: {task['task_id']} - {task['status']}")
        task = queue_manager.get_task_from_cache('not_exist')
        if not task:
            print("正确处理了不存在的任务查询")
        # Snapshot of the cache counters.
        status = queue_manager.get_queue_status()
        print(f"\n缓存状态:")
        print(f"运行中任务数: {status['running_tasks_count']}")
        print(f"已完成任务数: {status['completed_tasks_count']}")
        print(f"等待队列数: {status['waiting_queue_count']}")
    print("缓存机制测试完成\n")
def test_cleanup_mechanism():
    """Exercise TTL-based and size-limit-based pruning of the completed cache."""
    print("=== 测试清理机制 ===")
    # Mock the video service so no real SDK calls are made.
    mock_video_service = Mock()
    mock_video_service.get_all_tasks.return_value = []
    # Patch the factory the manager uses to obtain its service.
    with patch('task_queue_manager.get_video_service', return_value=mock_video_service):
        # Manager with a very short TTL so expiry happens within the test.
        queue_manager = TaskQueueManager(
            max_running_tasks=3,
            update_interval=1
        )
        # Small cache + ~2s TTL for observable pruning.
        queue_manager.max_completed_cache_size = 3  # small cache limit
        queue_manager.completed_cache_ttl_hours = 0.0006  # ~2s TTL for testing
        # Seed several already-expired completed tasks.
        old_tasks = []
        for i in range(5):
            task = {
                'task_id': f'old_task_{i}',
                'status': 'succeeded',
                'content': f'old task {i}',
                'params': {'test': True},
                'created_at': (datetime.now() - timedelta(hours=2)).isoformat(),
                'cache_time': (datetime.now() - timedelta(hours=1)).isoformat()
            }
            old_tasks.append(task)
            queue_manager.completed_tasks_cache[task['task_id']] = task
        print(f"添加了 {len(old_tasks)} 个旧的已完成任务")
        # Count before pruning.
        status_before = queue_manager.get_queue_status()
        print(f"清理前已完成任务数: {status_before['completed_tasks_count']}")
        # Let the TTL elapse.
        print("等待任务过期...")
        time.sleep(3)
        # Trigger pruning manually.
        queue_manager._cleanup_completed_tasks()
        # Count after pruning.
        status_after = queue_manager.get_queue_status()
        print(f"清理后已完成任务数: {status_after['completed_tasks_count']}")
        # Verify TTL pruning removed something.
        if status_after['completed_tasks_count'] < status_before['completed_tasks_count']:
            print("[OK] 清理机制正常工作")
        else:
            print("[ERROR] 清理机制可能存在问题")
        # Now verify the size-limit path.
        print("\n=== 测试数量限制清理 ===")
        # Add more fresh tasks than the cache limit allows.
        for i in range(5):
            task = {
                'task_id': f'new_task_{i}',
                'status': 'succeeded',
                'content': f'new task {i}',
                'params': {'test': True},
                'created_at': datetime.now().isoformat(),
                'cache_time': datetime.now().isoformat()
            }
            queue_manager.completed_tasks_cache[task['task_id']] = task
        print(f"添加了5个新任务当前已完成任务数: {len(queue_manager.completed_tasks_cache)}")
        # Trigger pruning again.
        queue_manager._cleanup_completed_tasks()
        final_status = queue_manager.get_queue_status()
        print(f"最终已完成任务数: {final_status['completed_tasks_count']}")
        print(f"缓存大小限制: {queue_manager.max_completed_cache_size}")
        if final_status['completed_tasks_count'] <= queue_manager.max_completed_cache_size:
            print("[OK] 数量限制清理正常工作")
        else:
            print("[ERROR] 数量限制清理可能存在问题")
    print("清理机制测试完成\n")
def main():
    """Run all cache/persistence test scenarios in sequence."""
    print("开始测试缓存和持久化机制...\n")
    try:
        # Fixed scenario order: persistence, cache, cleanup.
        for scenario in (test_persistence_mechanism,
                         test_cache_mechanism,
                         test_cleanup_mechanism):
            scenario()
        print("所有测试完成!")
    except Exception as e:
        print(f"测试过程中发生错误: {str(e)}")
        import traceback
        traceback.print_exc()


if __name__ == '__main__':
    main()

555
test_robustness.py Normal file
View File

@ -0,0 +1,555 @@
# -*- coding: utf-8 -*-
"""
核心逻辑健壮性测试脚本
全面测试TaskQueueManager的各种边界情况异常处理和并发场景
"""
import os
import sys
import time
import json
import threading
import tempfile
from datetime import datetime, timedelta
from unittest.mock import Mock, patch, MagicMock
from concurrent.futures import ThreadPoolExecutor
import random
# Make the project root importable when this script is run directly.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Stub API credentials so modules that read them import cleanly in tests.
os.environ['ARK_API_KEY'] = 'test_api_key_for_testing'
os.environ['ARK_BASE_URL'] = 'https://test.api.com'
from task_queue_manager import TaskQueueManager
from video_service import get_video_service
class RobustnessTestSuite:
"""健壮性测试套件"""
def __init__(self):
    """Initialize an empty result log and temp-file registry."""
    self.test_results = []  # per-test result dicts appended by log_test_result()
    self.temp_files = []  # paths removed later by cleanup()
def log_test_result(self, test_name, passed, details=""):
    """Append a timestamped result record for one test and echo it to stdout."""
    self.test_results.append({
        'test_name': test_name,
        'passed': passed,
        'details': details,
        'timestamp': datetime.now().isoformat()
    })
    # NOTE(review): both markers are empty strings in the source — the
    # pass/fail glyphs (likely ✓/✗) appear to have been lost; confirm.
    marker = "" if passed else ""
    print(f"{marker} {test_name}: {details}")
def cleanup(self):
    """Delete every recorded temp file, ignoring any deletion failure."""
    for path in self.temp_files:
        if not os.path.exists(path):
            continue
        try:
            os.remove(path)
        except:  # best-effort cleanup: swallow any removal error
            pass
def test_boundary_conditions(self):
    """Boundary tests: empty manager, cache-size cap, and a near-zero TTL."""
    print("\n=== 边界条件测试 ===")
    # Test 1: a freshly built manager must report all counters at zero.
    try:
        mock_service = Mock()
        mock_service.get_all_tasks.return_value = []
        with patch('task_queue_manager.get_video_service', return_value=mock_service):
            manager = TaskQueueManager(max_running_tasks=1, update_interval=1)
            status = manager.get_queue_status()
            passed = (status['running_tasks_count'] == 0 and
                      status['completed_tasks_count'] == 0 and
                      status['waiting_queue_count'] == 0)
            self.log_test_result("零任务处理", passed, f"状态: {status}")
    except Exception as e:
        self.log_test_result("零任务处理", False, f"异常: {str(e)}")
    # Test 2: overfilling the completed cache must be trimmed to the cap.
    try:
        mock_service = Mock()
        mock_service.get_all_tasks.return_value = []
        with patch('task_queue_manager.get_video_service', return_value=mock_service):
            manager = TaskQueueManager(max_running_tasks=1, update_interval=1)
            manager.max_completed_cache_size = 2
            # Insert more tasks than the cap allows.
            for i in range(5):
                task = {
                    'task_id': f'boundary_task_{i}',
                    'status': 'succeeded',
                    'content': f'boundary task {i}',
                    'created_at': datetime.now().isoformat(),
                    'cache_time': datetime.now().isoformat()
                }
                manager.completed_tasks_cache[task['task_id']] = task
            # Trigger pruning.
            manager._cleanup_completed_tasks()
            final_count = len(manager.completed_tasks_cache)
            passed = final_count <= manager.max_completed_cache_size
            self.log_test_result("最大缓存容量边界", passed,
                                 f"最终缓存数量: {final_count}, 限制: {manager.max_completed_cache_size}")
    except Exception as e:
        self.log_test_result("最大缓存容量边界", False, f"异常: {str(e)}")
    # Test 3: a sub-second TTL must expire an already-old entry.
    try:
        mock_service = Mock()
        mock_service.get_all_tasks.return_value = []
        with patch('task_queue_manager.get_video_service', return_value=mock_service):
            manager = TaskQueueManager(max_running_tasks=1, update_interval=1)
            manager.completed_cache_ttl_hours = 0.0001  # ~0.36 seconds
            # Entry whose cache_time is already 1s in the past.
            task = {
                'task_id': 'ttl_test_task',
                'status': 'succeeded',
                'content': 'ttl test task',
                'created_at': datetime.now().isoformat(),
                'cache_time': (datetime.now() - timedelta(seconds=1)).isoformat()
            }
            manager.completed_tasks_cache[task['task_id']] = task
            # Let the TTL elapse, then prune.
            time.sleep(0.5)
            manager._cleanup_completed_tasks()
            passed = len(manager.completed_tasks_cache) == 0
            self.log_test_result("极短TTL测试", passed,
                                 f"清理后缓存数量: {len(manager.completed_tasks_cache)}")
    except Exception as e:
        self.log_test_result("极短TTL测试", False, f"异常: {str(e)}")
def test_exception_handling(self):
    """Exercise error paths: unwritable persistence path, corrupt JSON, failing API."""
    print("\n=== 异常处理测试 ===")
    # Test 1: persistence file cannot be written (parent directory does not exist).
    try:
        mock_service = Mock()
        mock_service.get_all_tasks.return_value = []
        # A path whose parent directory is invalid on any machine.
        invalid_path = "/invalid/path/persistence.json"
        with patch('task_queue_manager.get_video_service', return_value=mock_service):
            manager = TaskQueueManager(
                max_running_tasks=1,
                update_interval=1,
                persistence_file=invalid_path
            )
            # Saving should be handled gracefully, not raise.
            manager.waiting_queue.append({'task_id': 'test', 'content': 'test'})
            manager._save_persistence_data()  # must not crash
            self.log_test_result("持久化文件权限错误", True, "优雅处理了文件权限错误")
    except Exception as e:
        self.log_test_result("持久化文件权限错误", False, f"未能优雅处理: {str(e)}")
    # Test 2: corrupted persistence file contents.
    try:
        # NOTE(review): tempfile.mktemp is deprecated and race-prone;
        # consider tempfile.mkstemp instead.
        temp_file = tempfile.mktemp(suffix='.json')
        self.temp_files.append(temp_file)
        # Write deliberately invalid JSON.
        with open(temp_file, 'w') as f:
            f.write('{invalid json content')
        mock_service = Mock()
        mock_service.get_all_tasks.return_value = []
        with patch('task_queue_manager.get_video_service', return_value=mock_service):
            manager = TaskQueueManager(
                max_running_tasks=1,
                update_interval=1,
                persistence_file=temp_file
            )
            # Loading the corrupt file should be handled gracefully.
            manager._load_persistence_data()  # must not crash
            self.log_test_result("损坏的持久化文件", True, "优雅处理了损坏的JSON文件")
    except Exception as e:
        self.log_test_result("损坏的持久化文件", False, f"未能优雅处理: {str(e)}")
    # Test 3: the video service raises on every call.
    try:
        mock_service = Mock()
        mock_service.get_all_tasks.side_effect = Exception("API服务不可用")
        with patch('task_queue_manager.get_video_service', return_value=mock_service):
            manager = TaskQueueManager(max_running_tasks=1, update_interval=1)
            # Status refresh should swallow the API failure.
            manager._update_task_statuses()  # must not crash
            self.log_test_result("API服务异常", True, "优雅处理了API服务异常")
    except Exception as e:
        self.log_test_result("API服务异常", False, f"未能优雅处理: {str(e)}")
def test_concurrent_operations(self):
    """Exercise the completed-task cache under concurrent writers and concurrent cleanup."""
    print("\n=== 并发操作测试 ===")
    # Test 1: several threads inserting into the completed-task cache at once.
    try:
        mock_service = Mock()
        mock_service.get_all_tasks.return_value = []
        with patch('task_queue_manager.get_video_service', return_value=mock_service):
            manager = TaskQueueManager(max_running_tasks=10, update_interval=1)
            manager.max_completed_cache_size = 100
            def add_tasks(thread_id, count):
                """Insert `count` finished tasks tagged with this thread's id."""
                for i in range(count):
                    task = {
                        'task_id': f'concurrent_task_{thread_id}_{i}',
                        'status': 'succeeded',
                        'content': f'concurrent task {thread_id}_{i}',
                        'created_at': datetime.now().isoformat(),
                        'cache_time': datetime.now().isoformat()
                    }
                    manager.completed_tasks_cache[task['task_id']] = task
                    time.sleep(0.001)  # simulate per-task processing time
            # Launch the writer threads.
            threads = []
            for i in range(5):
                thread = threading.Thread(target=add_tasks, args=(i, 10))
                threads.append(thread)
                thread.start()
            # Wait for all writers to finish.
            for thread in threads:
                thread.join()
            final_count = len(manager.completed_tasks_cache)
            # Every task id is distinct, so nothing should be lost.
            passed = final_count == 50  # 5 threads x 10 tasks each
            self.log_test_result("并发缓存操作", passed,
                                 f"预期50个任务实际{final_count}个任务")
    except Exception as e:
        self.log_test_result("并发缓存操作", False, f"异常: {str(e)}")
    # Test 2: multiple threads running cleanup simultaneously.
    try:
        mock_service = Mock()
        mock_service.get_all_tasks.return_value = []
        with patch('task_queue_manager.get_video_service', return_value=mock_service):
            manager = TaskQueueManager(max_running_tasks=1, update_interval=1)
            manager.max_completed_cache_size = 10
            # Overfill the cache well past its limit.
            for i in range(50):
                task = {
                    'task_id': f'cleanup_task_{i}',
                    'status': 'succeeded',
                    'content': f'cleanup task {i}',
                    'created_at': datetime.now().isoformat(),
                    'cache_time': datetime.now().isoformat()
                }
                manager.completed_tasks_cache[task['task_id']] = task
            # Run cleanup from several threads concurrently.
            def cleanup_worker():
                manager._cleanup_completed_tasks()
            threads = []
            for i in range(3):
                thread = threading.Thread(target=cleanup_worker)
                threads.append(thread)
                thread.start()
            for thread in threads:
                thread.join()
            final_count = len(manager.completed_tasks_cache)
            # The cap must hold no matter how the cleanups interleave.
            passed = final_count <= manager.max_completed_cache_size
            self.log_test_result("并发清理操作", passed,
                                 f"清理后{final_count}个任务,限制{manager.max_completed_cache_size}")
    except Exception as e:
        self.log_test_result("并发清理操作", False, f"异常: {str(e)}")
def test_data_integrity(self):
    """Verify cache key uniqueness and persistence round-trip consistency."""
    print("\n=== 数据完整性测试 ===")
    # Test 1: inserting two tasks with the same id keeps only the latest.
    try:
        mock_service = Mock()
        mock_service.get_all_tasks.return_value = []
        with patch('task_queue_manager.get_video_service', return_value=mock_service):
            manager = TaskQueueManager(max_running_tasks=1, update_interval=1)
            # Two tasks sharing a single task_id.
            task1 = {
                'task_id': 'duplicate_id',
                'status': 'succeeded',
                'content': 'task 1',
                'created_at': datetime.now().isoformat(),
                'cache_time': datetime.now().isoformat()
            }
            task2 = {
                'task_id': 'duplicate_id',
                'status': 'succeeded',
                'content': 'task 2',
                'created_at': datetime.now().isoformat(),
                'cache_time': datetime.now().isoformat()
            }
            manager.completed_tasks_cache[task1['task_id']] = task1
            manager.completed_tasks_cache[task2['task_id']] = task2
            # The second write must overwrite the first (dict semantics).
            cached_task = manager.completed_tasks_cache.get('duplicate_id')
            passed = (len(manager.completed_tasks_cache) == 1 and
                      cached_task['content'] == 'task 2')
            self.log_test_result("任务ID唯一性", passed,
                                 f"缓存中任务数: {len(manager.completed_tasks_cache)}")
    except Exception as e:
        self.log_test_result("任务ID唯一性", False, f"异常: {str(e)}")
    # Test 2: data saved by one manager instance loads intact in another.
    try:
        # NOTE(review): tempfile.mktemp is deprecated and race-prone;
        # consider tempfile.mkstemp instead.
        temp_file = tempfile.mktemp(suffix='.json')
        self.temp_files.append(temp_file)
        mock_service = Mock()
        mock_service.get_all_tasks.return_value = []
        with patch('task_queue_manager.get_video_service', return_value=mock_service):
            # First manager instance writes the queue to disk.
            manager1 = TaskQueueManager(
                max_running_tasks=1,
                update_interval=1,
                persistence_file=temp_file
            )
            # Enqueue two waiting tasks.
            original_tasks = [
                {'task_id': 'persist_1', 'content': 'test 1'},
                {'task_id': 'persist_2', 'content': 'test 2'}
            ]
            for task in original_tasks:
                manager1.waiting_queue.append(task)
            manager1._save_persistence_data()
            # Second manager instance reads the same file back.
            manager2 = TaskQueueManager(
                max_running_tasks=1,
                update_interval=1,
                persistence_file=temp_file
            )
            manager2._load_persistence_data()
            # Every saved task id must survive the round trip.
            loaded_tasks = manager2.waiting_queue
            passed = (len(loaded_tasks) == len(original_tasks) and
                      all(task['task_id'] in [t['task_id'] for t in original_tasks]
                          for task in loaded_tasks))
            self.log_test_result("持久化数据一致性", passed,
                                 f"原始{len(original_tasks)}个,加载{len(loaded_tasks)}")
    except Exception as e:
        self.log_test_result("持久化数据一致性", False, f"异常: {str(e)}")
def test_performance_stress(self):
    """Check add/query/cleanup timings stay within coarse regression thresholds."""
    print("\n=== 性能压力测试 ===")
    # Test 1: bulk insert, random lookups, then one full cleanup pass.
    try:
        mock_service = Mock()
        mock_service.get_all_tasks.return_value = []
        with patch('task_queue_manager.get_video_service', return_value=mock_service):
            manager = TaskQueueManager(max_running_tasks=1, update_interval=1)
            manager.max_completed_cache_size = 1000
            start_time = time.time()
            # Insert 500 completed tasks.
            for i in range(500):
                task = {
                    'task_id': f'perf_task_{i}',
                    'status': 'succeeded',
                    'content': f'performance task {i}',
                    'created_at': datetime.now().isoformat(),
                    'cache_time': datetime.now().isoformat()
                }
                manager.completed_tasks_cache[task['task_id']] = task
            add_time = time.time() - start_time
            # 100 lookups of random existing ids.
            query_start = time.time()
            for i in range(100):
                task_id = f'perf_task_{random.randint(0, 499)}'
                manager.get_task_by_id(task_id)
            query_time = time.time() - query_start
            # Cost of one cleanup pass at this size.
            cleanup_start = time.time()
            manager._cleanup_completed_tasks()
            cleanup_time = time.time() - cleanup_start
            # Thresholds are generous; they only flag gross regressions.
            passed = (add_time < 5.0 and query_time < 1.0 and cleanup_time < 2.0)
            self.log_test_result("大量任务处理性能", passed,
                                 f"添加:{add_time:.2f}s, 查询:{query_time:.2f}s, 清理:{cleanup_time:.2f}s")
    except Exception as e:
        self.log_test_result("大量任务处理性能", False, f"异常: {str(e)}")
def test_memory_management(self):
    """Check the completed-task cache does not grow past its configured cap."""
    print("\n=== 内存管理测试 ===")
    # Test 1: repeated fill/cleanup cycles must not leak cache entries.
    try:
        mock_service = Mock()
        mock_service.get_all_tasks.return_value = []
        with patch('task_queue_manager.get_video_service', return_value=mock_service):
            manager = TaskQueueManager(max_running_tasks=1, update_interval=1)
            manager.max_completed_cache_size = 10
            # Each cycle adds 20 entries (twice the cap), then cleans up.
            for cycle in range(10):
                # Add a batch of tasks.
                for i in range(20):
                    task = {
                        'task_id': f'memory_task_{cycle}_{i}',
                        'status': 'succeeded',
                        'content': f'memory test task {cycle}_{i}',
                        'created_at': datetime.now().isoformat(),
                        'cache_time': datetime.now().isoformat()
                    }
                    manager.completed_tasks_cache[task['task_id']] = task
                # Cleanup should bring the cache back under its cap.
                manager._cleanup_completed_tasks()
            # After the final cycle the cache must respect the cap.
            final_cache_size = len(manager.completed_tasks_cache)
            passed = final_cache_size <= manager.max_completed_cache_size
            self.log_test_result("内存泄漏检测", passed,
                                 f"最终缓存大小: {final_cache_size}")
    except Exception as e:
        self.log_test_result("内存泄漏检测", False, f"异常: {str(e)}")
def run_all_tests(self):
    """Execute every robustness test group, then build the final report.

    cleanup() is guaranteed to run even if a test group raises.
    """
    print("开始核心逻辑健壮性测试...\n")
    test_groups = (
        self.test_boundary_conditions,
        self.test_exception_handling,
        self.test_concurrent_operations,
        self.test_data_integrity,
        self.test_performance_stress,
        self.test_memory_management,
    )
    try:
        for run_group in test_groups:
            run_group()
    finally:
        # Always release temp files/resources, even on failure.
        self.cleanup()
    return self.generate_assessment()
def generate_assessment(self):
    """Summarize recorded test results into a robustness assessment report.

    Prints statistics, per-test details, a letter grade derived from the
    pass rate, and improvement suggestions; returns the same data as a dict.
    """
    print("\n" + "="*50)
    print("核心逻辑健壮性评估报告")
    print("="*50)
    total_tests = len(self.test_results)
    passed_tests = sum(1 for result in self.test_results if result['passed'])
    failed_tests = total_tests - passed_tests
    success_rate = passed_tests / total_tests * 100 if total_tests > 0 else 0
    print(f"\n📊 测试统计:")
    print(f" 总测试数: {total_tests}")
    print(f" 通过测试: {passed_tests}")
    print(f" 失败测试: {failed_tests}")
    print(f" 成功率: {success_rate:.1f}%")
    print(f"\n📋 详细结果:")
    for result in self.test_results:
        marker = "" if result['passed'] else ""
        print(f" {marker} {result['test_name']}: {result['details']}")
    # Grade thresholds, checked best-to-worst; the first match wins.
    grade_table = (
        (95, "A+ (优秀)", "系统具有极高的健壮性,能够很好地处理各种边界情况和异常场景。"),
        (85, "A (良好)", "系统健壮性良好,大部分场景下表现稳定,少数边界情况需要优化。"),
        (70, "B (一般)", "系统基本健壮,但在某些异常处理和边界情况下存在问题,需要改进。"),
        (50, "C (较差)", "系统健壮性较差,存在较多问题,需要重点优化异常处理和边界情况。"),
    )
    for threshold, grade, assessment in grade_table:
        if success_rate >= threshold:
            break
    else:
        grade = "D (差)"
        assessment = "系统健壮性差,存在严重问题,需要全面重构和优化。"
    print(f"\n🎯 健壮性评级: {grade}")
    print(f"\n📝 评估结论:")
    print(f" {assessment}")
    # Suggestions depend only on whether anything failed.
    print(f"\n💡 改进建议:")
    if failed_tests > 0:
        print(" 1. 重点关注失败的测试用例,分析根本原因")
        print(" 2. 加强异常处理机制,确保系统在各种异常情况下的稳定性")
        print(" 3. 优化并发控制,防止竞态条件和数据不一致")
        print(" 4. 完善边界条件处理,确保极端情况下的正确行为")
    else:
        print(" 1. 继续保持当前的高质量代码标准")
        print(" 2. 定期进行健壮性测试,确保新功能不影响系统稳定性")
        print(" 3. 考虑增加更多的压力测试和性能监控")
    return {
        'total_tests': total_tests,
        'passed_tests': passed_tests,
        'failed_tests': failed_tests,
        'success_rate': success_rate,
        'grade': grade,
        'assessment': assessment,
        'test_results': self.test_results
    }
def main():
    """Entry point: run the full robustness suite and return its report."""
    suite = RobustnessTestSuite()
    return suite.run_all_tests()


if __name__ == "__main__":
    main()

532
test_stress.py Normal file
View File

@ -0,0 +1,532 @@
# -*- coding: utf-8 -*-
"""
Stress test script.

Exercises TaskQueueManager under extreme conditions (high load, long
runtimes, memory pressure) to evaluate its performance and stability.
"""
import os
import sys
import time
import threading
import multiprocessing
from datetime import datetime, timedelta
from unittest.mock import Mock, patch
from concurrent.futures import ThreadPoolExecutor, as_completed
import random
import gc
import tracemalloc

# Make the project root importable when running this file directly.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Dummy credentials so the service module can be imported without real keys.
os.environ['ARK_API_KEY'] = 'test_api_key_for_testing'
os.environ['ARK_BASE_URL'] = 'https://test.api.com'

from task_queue_manager import TaskQueueManager
class StressTestSuite:
"""压力测试套件"""
def __init__(self):
self.results = []
self.start_memory = None
self.peak_memory = 0
def log_result(self, test_name, success, details, metrics=None):
"""记录测试结果"""
result = {
'test_name': test_name,
'success': success,
'details': details,
'metrics': metrics or {},
'timestamp': datetime.now().isoformat()
}
self.results.append(result)
status = "" if success else ""
print(f"{status} {test_name}: {details}")
if metrics:
for key, value in metrics.items():
print(f" {key}: {value}")
def get_memory_usage(self):
"""获取当前内存使用量(MB)"""
try:
# 使用tracemalloc获取内存使用情况
current, peak = tracemalloc.get_traced_memory()
return current / 1024 / 1024 # 转换为MB
except:
# 如果tracemalloc未启动返回一个估算值
return len(gc.get_objects()) * 0.001 # 粗略估算
def test_high_volume_operations(self):
    """Measure add/query/cleanup timing and memory with thousands of tasks."""
    print("\n=== 高容量操作测试 ===")
    try:
        mock_service = Mock()
        mock_service.get_all_tasks.return_value = []
        with patch('task_queue_manager.get_video_service', return_value=mock_service):
            manager = TaskQueueManager(max_running_tasks=100, update_interval=1)
            manager.max_completed_cache_size = 10000
            start_time = time.time()
            start_memory = self.get_memory_usage()
            # Insert a large number of tasks with padded payloads.
            task_count = 5000
            for i in range(task_count):
                task = {
                    'task_id': f'volume_task_{i}',
                    'status': random.choice(['succeeded', 'failed', 'running']),
                    'content': f'high volume task {i}' * 10,  # inflate the payload
                    'params': {'index': i, 'data': list(range(100))},
                    'created_at': datetime.now().isoformat(),
                    'cache_time': datetime.now().isoformat()
                }
                # Route to the matching cache by status.
                if task['status'] == 'running':
                    manager.running_tasks_cache[task['task_id']] = task
                else:
                    manager.completed_tasks_cache[task['task_id']] = task
            add_time = time.time() - start_time
            peak_memory = self.get_memory_usage()
            memory_increase = peak_memory - start_memory
            # Random-lookup throughput.
            query_start = time.time()
            query_count = 1000
            for _ in range(query_count):
                task_id = f'volume_task_{random.randint(0, task_count-1)}'
                manager.get_task_by_id(task_id)
            query_time = time.time() - query_start
            avg_query_time = query_time / query_count * 1000  # ms per lookup
            # Cleanup cost at this scale.
            cleanup_start = time.time()
            manager._cleanup_completed_tasks()
            cleanup_time = time.time() - cleanup_start
            final_memory = self.get_memory_usage()
            success = (add_time < 30 and avg_query_time < 1 and cleanup_time < 10)
            metrics = {
                '任务数量': task_count,
                '添加耗时': f'{add_time:.2f}s',
                '平均查询时间': f'{avg_query_time:.3f}ms',
                '清理耗时': f'{cleanup_time:.2f}s',
                '内存增长': f'{memory_increase:.1f}MB',
                '最终内存': f'{final_memory:.1f}MB'
            }
            self.log_result("高容量操作", success,
                            f"处理{task_count}个任务", metrics)
    except Exception as e:
        self.log_result("高容量操作", False, f"异常: {str(e)}")
def test_concurrent_stress(self):
    """Hammer the manager with mixed add/query/cleanup ops from many threads."""
    print("\n=== 并发压力测试 ===")
    try:
        mock_service = Mock()
        mock_service.get_all_tasks.return_value = []
        with patch('task_queue_manager.get_video_service', return_value=mock_service):
            manager = TaskQueueManager(max_running_tasks=50, update_interval=1)
            manager.max_completed_cache_size = 5000
            start_time = time.time()
            start_memory = self.get_memory_usage()
            def worker_thread(worker_id, operations_per_worker):
                """Perform random add/query/cleanup operations; count errors."""
                operations = 0
                errors = 0
                for i in range(operations_per_worker):
                    try:
                        # Pick one of the three operation kinds at random.
                        operation = random.choice(['add', 'query', 'cleanup'])
                        if operation == 'add':
                            task = {
                                'task_id': f'stress_task_{worker_id}_{i}',
                                'status': random.choice(['succeeded', 'failed']),
                                'content': f'stress task {worker_id}_{i}',
                                'created_at': datetime.now().isoformat(),
                                'cache_time': datetime.now().isoformat()
                            }
                            manager.completed_tasks_cache[task['task_id']] = task
                        elif operation == 'query':
                            # Look up a random (possibly absent) task id.
                            task_id = f'stress_task_{random.randint(0, 9)}_{random.randint(0, 99)}'
                            manager.get_task_by_id(task_id)
                        elif operation == 'cleanup':
                            manager._cleanup_completed_tasks()
                        operations += 1
                    except Exception:
                        errors += 1
                    # Random small delay to interleave the threads.
                    time.sleep(random.uniform(0.001, 0.01))
                return {'operations': operations, 'errors': errors}
            # Fan the workers out over a thread pool.
            thread_count = 20
            operations_per_worker = 100
            with ThreadPoolExecutor(max_workers=thread_count) as executor:
                futures = []
                for i in range(thread_count):
                    future = executor.submit(worker_thread, i, operations_per_worker)
                    futures.append(future)
                # Aggregate operation/error counts as workers finish.
                total_operations = 0
                total_errors = 0
                for future in as_completed(futures):
                    result = future.result()
                    total_operations += result['operations']
                    total_errors += result['errors']
            total_time = time.time() - start_time
            peak_memory = self.get_memory_usage()
            memory_increase = peak_memory - start_memory
            operations_per_second = total_operations / total_time
            error_rate = (total_errors / total_operations * 100) if total_operations > 0 else 0
            success = (error_rate < 5 and operations_per_second > 100)
            metrics = {
                '并发线程数': thread_count,
                '总操作数': total_operations,
                '错误数': total_errors,
                '错误率': f'{error_rate:.2f}%',
                '操作/秒': f'{operations_per_second:.1f}',
                '总耗时': f'{total_time:.2f}s',
                '内存增长': f'{memory_increase:.1f}MB'
            }
            self.log_result("并发压力", success,
                            f"{thread_count}个线程并发操作", metrics)
    except Exception as e:
        self.log_result("并发压力", False, f"异常: {str(e)}")
def test_long_running_stability(self):
    """Run repeated add/query/cleanup cycles for ~30s and watch memory stability."""
    print("\n=== 长时间运行稳定性测试 ===")
    try:
        mock_service = Mock()
        mock_service.get_all_tasks.return_value = []
        with patch('task_queue_manager.get_video_service', return_value=mock_service):
            manager = TaskQueueManager(max_running_tasks=10, update_interval=1)
            manager.max_completed_cache_size = 100
            manager.completed_cache_ttl_hours = 0.01  # ~36 second TTL
            start_time = time.time()
            start_memory = self.get_memory_usage()
            # "Long" run compressed into 30 seconds of wall time.
            test_duration = 30  # seconds
            cycle_count = 0
            max_memory = start_memory
            while time.time() - start_time < test_duration:
                cycle_count += 1
                # Add a batch of tasks.
                for i in range(20):
                    task = {
                        'task_id': f'longrun_task_{cycle_count}_{i}',
                        'status': 'succeeded',
                        'content': f'long running task {cycle_count}_{i}',
                        'created_at': datetime.now().isoformat(),
                        'cache_time': datetime.now().isoformat()
                    }
                    manager.completed_tasks_cache[task['task_id']] = task
                # Query random tasks from the last few cycles.
                for _ in range(10):
                    task_id = f'longrun_task_{random.randint(max(1, cycle_count-5), cycle_count)}_{random.randint(0, 19)}'
                    manager.get_task_by_id(task_id)
                # Periodic cleanup every 5 cycles.
                if cycle_count % 5 == 0:
                    manager._cleanup_completed_tasks()
                    gc.collect()  # force a GC pass so memory readings settle
                # Track the memory high-water mark.
                current_memory = self.get_memory_usage()
                max_memory = max(max_memory, current_memory)
                time.sleep(0.5)
            total_time = time.time() - start_time
            final_memory = self.get_memory_usage()
            memory_increase = final_memory - start_memory
            peak_memory_increase = max_memory - start_memory
            # Memory counts as stable when the final level sits well below the peak
            # (i.e. no monotonic growth).
            memory_stable = memory_increase < peak_memory_increase * 0.8
            cache_size_reasonable = len(manager.completed_tasks_cache) <= manager.max_completed_cache_size
            success = memory_stable and cache_size_reasonable
            metrics = {
                '运行时长': f'{total_time:.1f}s',
                '循环次数': cycle_count,
                '最终缓存大小': len(manager.completed_tasks_cache),
                '缓存限制': manager.max_completed_cache_size,
                '内存增长': f'{memory_increase:.1f}MB',
                '峰值内存增长': f'{peak_memory_increase:.1f}MB',
                '内存稳定': '' if memory_stable else ''
            }
            self.log_result("长时间运行稳定性", success,
                            f"运行{total_time:.1f}秒,{cycle_count}个周期", metrics)
    except Exception as e:
        self.log_result("长时间运行稳定性", False, f"异常: {str(e)}")
def test_memory_pressure(self):
    """Fill the cache with large payloads and verify cleanup reclaims memory."""
    print("\n=== 内存压力测试 ===")
    try:
        mock_service = Mock()
        mock_service.get_all_tasks.return_value = []
        with patch('task_queue_manager.get_video_service', return_value=mock_service):
            manager = TaskQueueManager(max_running_tasks=10, update_interval=1)
            manager.max_completed_cache_size = 1000
            start_memory = self.get_memory_usage()
            # Each task carries a ~10KB string plus large list/dict params.
            large_data = 'x' * 10000  # 10KB string
            task_count = 2000
            start_time = time.time()
            for i in range(task_count):
                task = {
                    'task_id': f'memory_task_{i}',
                    'status': 'succeeded',
                    'content': large_data,
                    'params': {
                        'large_list': list(range(1000)),
                        'large_dict': {f'key_{j}': f'value_{j}' * 100 for j in range(100)}
                    },
                    'created_at': datetime.now().isoformat(),
                    'cache_time': datetime.now().isoformat()
                }
                manager.completed_tasks_cache[task['task_id']] = task
                # Every 100 tasks: check memory and clean up on excessive growth.
                if i % 100 == 0:
                    current_memory = self.get_memory_usage()
                    if current_memory - start_memory > 500:  # > 500MB growth
                        # Force an early cleanup to relieve pressure.
                        manager._cleanup_completed_tasks()
                        gc.collect()
            add_time = time.time() - start_time
            peak_memory = self.get_memory_usage()
            # Final forced cleanup.
            cleanup_start = time.time()
            manager._cleanup_completed_tasks()
            gc.collect()
            cleanup_time = time.time() - cleanup_start
            final_memory = self.get_memory_usage()
            memory_increase = peak_memory - start_memory
            memory_recovered = peak_memory - final_memory
            # Growth must stay bounded and cleanup must reclaim most of it.
            memory_managed = memory_increase < 300  # growth stays under 300MB
            cleanup_effective = memory_recovered > memory_increase * 0.5  # >50% reclaimed
            success = memory_managed and cleanup_effective
            metrics = {
                '任务数量': task_count,
                '添加耗时': f'{add_time:.2f}s',
                '清理耗时': f'{cleanup_time:.2f}s',
                '内存增长': f'{memory_increase:.1f}MB',
                '内存回收': f'{memory_recovered:.1f}MB',
                '最终缓存大小': len(manager.completed_tasks_cache)
            }
            self.log_result("内存压力", success,
                            f"处理{task_count}个大对象任务", metrics)
    except Exception as e:
        self.log_result("内存压力", False, f"异常: {str(e)}")
def test_extreme_cleanup_scenarios(self):
    """Verify cleanup handles a cache dominated by expired entries."""
    print("\n=== 极端清理场景测试 ===")
    try:
        mock_service = Mock()
        mock_service.get_all_tasks.return_value = []
        with patch('task_queue_manager.get_video_service', return_value=mock_service):
            manager = TaskQueueManager(max_running_tasks=10, update_interval=1)
            manager.max_completed_cache_size = 50
            manager.completed_cache_ttl_hours = 0.001  # ~3.6 second TTL
            start_time = time.time()
            # Scenario 1: many entries whose cache_time is long past the TTL.
            old_time = datetime.now() - timedelta(hours=1)
            for i in range(200):
                task = {
                    'task_id': f'old_task_{i}',
                    'status': 'succeeded',
                    'content': f'old task {i}',
                    'created_at': old_time.isoformat(),
                    'cache_time': old_time.isoformat()
                }
                manager.completed_tasks_cache[task['task_id']] = task
            # Scenario 2: fresh entries mixed in with the stale ones.
            for i in range(100):
                task = {
                    'task_id': f'new_task_{i}',
                    'status': 'succeeded',
                    'content': f'new task {i}',
                    'created_at': datetime.now().isoformat(),
                    'cache_time': datetime.now().isoformat()
                }
                manager.completed_tasks_cache[task['task_id']] = task
            before_cleanup = len(manager.completed_tasks_cache)
            # Single cleanup pass over the mixed cache.
            cleanup_start = time.time()
            manager._cleanup_completed_tasks()
            cleanup_time = time.time() - cleanup_start
            after_cleanup = len(manager.completed_tasks_cache)
            # Cleanup must enforce the cap, finish quickly, and drop stale rows.
            cleanup_effective = after_cleanup <= manager.max_completed_cache_size
            cleanup_fast = cleanup_time < 5.0
            old_tasks_removed = after_cleanup < before_cleanup
            success = cleanup_effective and cleanup_fast and old_tasks_removed
            metrics = {
                '清理前任务数': before_cleanup,
                '清理后任务数': after_cleanup,
                '清理耗时': f'{cleanup_time:.3f}s',
                '清理比例': f'{(before_cleanup - after_cleanup) / before_cleanup * 100:.1f}%'
            }
            self.log_result("极端清理场景", success,
                            f"{before_cleanup}个任务清理到{after_cleanup}", metrics)
    except Exception as e:
        self.log_result("极端清理场景", False, f"异常: {str(e)}")
def run_stress_tests(self):
    """Run every stress scenario in order and return the final report.

    Starts tracemalloc first so memory measurements are meaningful.
    """
    print("开始压力测试...\n")
    # Begin allocation tracing before taking the baseline reading.
    tracemalloc.start()
    self.start_memory = self.get_memory_usage()
    print(f"初始内存使用: {self.start_memory:.1f}MB\n")
    scenarios = (
        self.test_high_volume_operations,
        self.test_concurrent_stress,
        self.test_long_running_stability,
        self.test_memory_pressure,
        self.test_extreme_cleanup_scenarios,
    )
    for scenario in scenarios:
        scenario()
    return self.generate_stress_report()
def generate_stress_report(self):
"""生成压力测试报告"""
print("\n" + "="*60)
print("压力测试报告")
print("="*60)
total_tests = len(self.results)
passed_tests = sum(1 for r in self.results if r['success'])
failed_tests = total_tests - passed_tests
success_rate = (passed_tests / total_tests * 100) if total_tests > 0 else 0
print(f"\n📊 测试统计:")
print(f" 总测试数: {total_tests}")
print(f" 通过测试: {passed_tests}")
print(f" 失败测试: {failed_tests}")
print(f" 成功率: {success_rate:.1f}%")
print(f"\n📋 详细结果:")
for result in self.results:
status = "" if result['success'] else ""
print(f" {status} {result['test_name']}: {result['details']}")
if result['metrics']:
for key, value in result['metrics'].items():
print(f" {key}: {value}")
# 性能评级
if success_rate >= 90:
grade = "优秀"
assessment = "系统在高负载下表现优秀,具有良好的性能和稳定性。"
elif success_rate >= 75:
grade = "良好"
assessment = "系统在压力测试中表现良好,但在某些极端情况下可能需要优化。"
elif success_rate >= 60:
grade = "一般"
assessment = "系统基本能够处理压力场景,但存在性能瓶颈,需要优化。"
else:
grade = "较差"
assessment = "系统在高负载下表现不佳,存在严重的性能或稳定性问题。"
print(f"\n🎯 性能评级: {grade}")
print(f"\n📝 评估结论: {assessment}")
final_memory = self.get_memory_usage()
memory_change = final_memory - self.start_memory
print(f"\n💾 内存使用: 初始{self.start_memory:.1f}MB → 最终{final_memory:.1f}MB (变化{memory_change:+.1f}MB)")
return {
'total_tests': total_tests,
'passed_tests': passed_tests,
'success_rate': success_rate,
'grade': grade,
'assessment': assessment,
'memory_change': memory_change
}
def main():
    """Entry point: execute the stress suite and return its report."""
    return StressTestSuite().run_stress_tests()


if __name__ == "__main__":
    main()

298
video_service.py Normal file
View File

@ -0,0 +1,298 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Video generation service module.

Wraps the Volcengine Doubao video-generation API via the official Ark SDK.
"""
import os
import json
import time
from typing import Dict, List, Optional, Any
from datetime import datetime

from volcenginesdkarkruntime import Ark
class VideoGenerationService:
"""视频生成服务类"""
def __init__(self):
    """
    Initialize the video generation service.

    Raises:
        ValueError: If the ARK_API_KEY environment variable is not set.
    """
    # The Ark client authenticates with the API key from the environment.
    api_key = os.environ.get('ARK_API_KEY', '')
    if not api_key:
        raise ValueError("ARK_API_KEY环境变量未设置")
    self.client = Ark(
        api_key=api_key
    )
    # Model ID for video generation, taken from the environment.
    # NOTE(review): VIDEO_MODEL is not validated here, so self.model_id may
    # be None if the variable is unset -- confirm callers tolerate that.
    self.model_id = os.environ.get("VIDEO_MODEL")
def create_video_generation_task(self, content, callback_url=None, parameters=None) -> Dict[str, Any]:
"""
创建视频生成任务
Args:
content: 请求内容格式为 {'image_url': str, 'prompt': str}
callback_url: 回调URL可选
parameters: 额外参数可选
Returns:
包含任务信息的字典
"""
try:
model = self.model_id
# 构建符合官方API格式的content数组
api_content = []
# 添加文本提示词
if 'prompt' in content:
prompt_text = content['prompt']
# 如果parameters中有参数将其追加到prompt中
if parameters:
param_parts = []
for key, value in parameters.items():
if key == 'duration':
param_parts.append(f"--dur {value}")
elif key == 'ratio':
param_parts.append(f"--rt {value}")
elif key == 'resolution':
param_parts.append(f"--rs {value}")
elif key == 'framepersecond':
param_parts.append(f"--fps {value}")
elif key == 'watermark':
param_parts.append(f"--wm {value}")
elif key == 'seed':
param_parts.append(f"--seed {value}")
elif key == 'camerafixed':
param_parts.append(f"--cf {value}")
if param_parts:
prompt_text += " " + " ".join(param_parts)
api_content.append({
"type": "text",
"text": prompt_text
})
# 添加图片URL
if 'image_url' in content:
api_content.append({
"type": "image_url",
"image_url": {
"url": content['image_url']
}
})
print(f"api_content: {api_content}")
# 使用官方SDK创建任务
create_result = self.client.content_generation.tasks.create(
model=model,
content=api_content
)
task_id = create_result.id
return {'success': True, 'data':{'task_id':task_id}}
except Exception as e:
return {
'success': False,
'error': f'请求异常: {str(e)}'
}
def get_task_status(self, task_id: str) -> Dict[str, Any]:
"""
查询任务状态
Args:
task_id: 任务ID
Returns:
包含任务状态信息的字典
"""
try:
result = self.client.content_generation.tasks.get(
task_id=task_id,
)
print(result)
# 构建返回数据,匹配实际的 ContentGenerationTask 对象结构
task_data = {
'id': result.id,
'task_id': result.id, # 保持兼容性
'model': result.model,
'status': result.status,
'error': result.error,
'content': {
'video_url': result.content.video_url
} if hasattr(result, 'content') and result.content else None,
'usage': {
'completion_tokens': result.usage.completion_tokens,
'total_tokens': result.usage.total_tokens
} if hasattr(result, 'usage') and result.usage else None,
'created_at': result.created_at,
'updated_at': result.updated_at
}
return {'success': True, 'data': task_data}
except Exception as e:
error_str = str(e)
# 检查是否是资源未找到的错误
if 'ResourceNotFound' in error_str or '404' in error_str:
return {
'success': True,
'data': {
'id': task_id,
'task_id': task_id,
'status': 'not_found',
'error': 'ResourceNotFound',
'message': '指定的任务资源未找到',
'model': None,
'content': None,
'usage': None,
'created_at': None,
'updated_at': None
}
}
else:
return {
'success': False,
'error': f'查询异常: {str(e)}'
}
def get_task_list(self, limit=20, offset=0) -> Dict[str, Any]:
"""
获取任务列表
Args:
limit: 每页数量
offset: 偏移量
Returns:
包含任务列表的字典
"""
try:
# 将limit/offset转换为page_num/page_size
page_num = (offset // limit) + 1 if limit > 0 else 1
page_size = limit
result = self.client.content_generation.tasks.list(
page_num=page_num,
page_size=page_size
)
# 将ContentGenerationTask对象转换为字典格式
tasks_data = []
if hasattr(result, 'items') and result.items:
for task in result.items:
task_dict = {
'id': getattr(task, 'id', ''),
'task_id': getattr(task, 'id', ''), # 兼容性字段
'status': getattr(task, 'status', ''),
'model': getattr(task, 'model', ''),
'created_at': getattr(task, 'created_at', ''),
'updated_at': getattr(task, 'updated_at', ''),
'error': getattr(task, 'error', None),
}
# 添加content字段
if hasattr(task, 'content') and task.content:
task_dict['content'] = {
'video_url': getattr(task.content, 'video_url', '')
}
else:
task_dict['content'] = None
# 添加usage字段
if hasattr(task, 'usage') and task.usage:
task_dict['usage'] = {
'completion_tokens': getattr(task.usage, 'completion_tokens', 0),
'total_tokens': getattr(task.usage, 'total_tokens', 0)
}
else:
task_dict['usage'] = None
tasks_data.append(task_dict)
return {'success': True, 'data': {
'tasks': tasks_data,
'total': getattr(result, 'total', 0),
'page_num': page_num,
'page_size': page_size,
'limit': limit,
'offset': offset
}}
except Exception as e:
return {
'success': False,
'error': f'获取列表异常: {str(e)}'
}
def delete_task(self, task_id: str) -> Dict[str, Any]:
"""
删除任务
Args:
task_id: 任务ID
Returns:
删除结果
"""
try:
self.client.content_generation.tasks.delete(
task_id=task_id
)
return {'success': True}
except Exception as e:
return {
'success': False,
'error': f'删除异常: {str(e)}'
}
def wait_for_completion(self, task_id: str, max_wait_time: int = 300, check_interval: int = 5) -> Dict[str, Any]:
"""
等待任务完成
Args:
task_id: 任务ID
max_wait_time: 最大等待时间
check_interval: 检查间隔
Returns:
最终的任务状态信息
"""
start_time = time.time()
while time.time() - start_time < max_wait_time:
status_result = self.get_task_status(task_id)
if not status_result["success"]:
return status_result
status = status_result.get("data", {}).get("status")
if status not in ["queued", "running"]:
return status_result
time.sleep(check_interval)
return {
"success": False,
"error": "任务等待超时",
"task_id": task_id
}
# Global singleton instance; created lazily by get_video_service().
video_service = None


def get_video_service() -> VideoGenerationService:
    """
    Return the shared VideoGenerationService instance (lazy singleton).

    NOTE(review): not thread-safe -- two threads racing through the None
    check could each construct an instance; confirm single-threaded startup.
    """
    global video_service
    if video_service is None:
        video_service = VideoGenerationService()
    return video_service