1339 lines
56 KiB
Python
1339 lines
56 KiB
Python
import os
|
||
import json
|
||
import random
|
||
import time
|
||
from typing import Dict, Iterable, List, Optional, Any, cast
|
||
from datetime import datetime
|
||
from volcenginesdkarkruntime import Ark
|
||
from volcengine.visual.VisualService import VisualService
|
||
from volcengine.vod.VodService import VodService
|
||
from volcengine.util.Functions import Function
|
||
from volcengine.vod.models.request.request_vod_pb2 import VodUploadMediaRequest
|
||
from volcengine.vod.models.request.request_vod_pb2 import VodUrlUploadRequest
|
||
import base64
|
||
|
||
from volcenginesdkarkruntime.types.chat import ChatCompletionToolParam
|
||
|
||
from config import API_CONFIG
|
||
|
||
class HuoshanAPI:
|
||
def __init__(self, type:int=0):
|
||
self.type = type
|
||
# 从环境变量获取模型ID
|
||
self.video_pro_model_id:str = API_CONFIG['huoshan']['model']['jimeng_video3.0_pro'] #即梦3.0 pro 首帧视频生成模型
|
||
self.video_model_id:str = API_CONFIG['huoshan']['model']['jimeng_video3.0'] #即梦3.0 首帧视频生成模型
|
||
self.video_t2v_model_id:str = API_CONFIG['huoshan']['model']['jimeng_video3.0_t2v'] #即梦3.0 文生视频模型
|
||
self.txt_to_image_model_id:str = API_CONFIG['huoshan']['model']['jimeng_image3.0'] #文字生成图片模型
|
||
self.doubao_seed_1_6_model_id:str = API_CONFIG['huoshan']['model']['doubao_seed_1.6'] #豆包1.6 模型
|
||
|
||
# 火山引擎ark 视频、图片生成
|
||
if self.type == 0 or self.type == 1:
|
||
self.client = Ark(
|
||
api_key=API_CONFIG['huoshan']['ark_api_key']
|
||
)
|
||
# 初始化 VisualService 图片处理
|
||
elif self.type == 0 or self.type == 2:
|
||
self.visual = VisualService()
|
||
self.visual.set_ak(API_CONFIG['huoshan']['AccessKey'])
|
||
self.visual.set_sk(API_CONFIG['huoshan']['SecretKey'])
|
||
# 初始化 VisualService 视频处理 工作流
|
||
elif self.type == 0 or self.type == 3:
|
||
self.vodClient = VodService('cn-north-1') # 默认 华北 'cn-north-1'
|
||
self.vodClient.set_ak(API_CONFIG['huoshan']['AccessKey'])
|
||
self.vodClient.set_sk(API_CONFIG['huoshan']['SecretKey'])
|
||
|
||
|
||
def create_txt_to_image_generation_task(self, prompt, parameters=None) -> Dict[str, Any]:
|
||
"""
|
||
创建文字生成图片任务
|
||
同步返回结果
|
||
Args:
|
||
prompt: 提示词
|
||
parameters: 额外参数(可选)
|
||
"response_format": "url",
|
||
"size": "1024x1024",
|
||
"seed": -1,
|
||
"guidance_scale": 2.5,
|
||
"watermark": False
|
||
|
||
Returns:
|
||
status: 接口执行结果 success, error
|
||
list: 图片URL集合
|
||
message: 错误信息
|
||
"""
|
||
try:
|
||
model = self.txt_to_image_model_id
|
||
# 构建符合官方API格式的content数组
|
||
prompt_text = prompt
|
||
|
||
default_parameters = {
|
||
"response_format": "url",
|
||
"size": "1024x1024",
|
||
"seed": -1,
|
||
"guidance_scale": 2.5,
|
||
"watermark": False
|
||
}
|
||
|
||
if parameters:
|
||
default_parameters.update({
|
||
key: parameters[key] for key in default_parameters.keys() & parameters.keys()
|
||
})
|
||
|
||
response_format = default_parameters["response_format"]
|
||
size = default_parameters["size"]
|
||
seed = default_parameters["seed"]
|
||
guidance_scale = default_parameters["guidance_scale"]
|
||
watermark = default_parameters["watermark"]
|
||
|
||
print(f'创建文字生成图片任务 generate :{model} {prompt_text} {response_format} {size} ')
|
||
# 使用官方SDK创建任务
|
||
create_result = self.client.images.generate(
|
||
model=model,
|
||
prompt=prompt_text,
|
||
response_format=response_format,
|
||
size=size,
|
||
seed=seed,
|
||
guidance_scale=guidance_scale,
|
||
watermark=watermark,
|
||
)
|
||
print(f'创建文字生成图片任务 返回:{create_result} ')
|
||
if create_result.data and len(create_result.data) > 0:
|
||
imgs = []
|
||
for item in create_result.data:
|
||
imgs.append(item.url)
|
||
return {'status': 'success', 'list': imgs}
|
||
else:
|
||
error_message = "接口异常"
|
||
if create_result:
|
||
if create_result.error:
|
||
error_message = f'{create_result.error.code}: {create_result.error.message}'
|
||
return {'status': 'error', 'message':f'接口异常: {error_message}' }
|
||
except Exception as e:
|
||
return {
|
||
'status': 'error',
|
||
'message': f'请求异常: {str(e)}'
|
||
}
|
||
|
||
def create_video_generation_task(self, content, parameters=None, callback_url=None) -> Dict[str, Any]:
|
||
"""
|
||
创建视频生成任务
|
||
首帧 尾帧可选
|
||
Args:
|
||
content: 请求内容,格式为 {'model':'', 'image_url': str, 'prompt': str, 'tail_img_url': str}
|
||
callback_url: 回调URL(可选)
|
||
parameters: 额外参数(可选)
|
||
|
||
Returns:
|
||
status: 接口执行结果 success, error
|
||
task_id: 任务ID
|
||
message: 错误信息
|
||
"""
|
||
try:
|
||
if 'model' in content:
|
||
model = content['model']
|
||
else:
|
||
model = self.video_model_id
|
||
# 构建符合官方API格式的content数组
|
||
api_content = []
|
||
|
||
# 添加文本提示词
|
||
if 'prompt' in content:
|
||
prompt_text = content['prompt']
|
||
|
||
# 如果parameters中有参数,将其追加到prompt中
|
||
if parameters:
|
||
param_parts = []
|
||
for key, value in parameters.items():
|
||
if key == 'duration':
|
||
param_parts.append(f"--dur {value}")
|
||
elif key == 'ratio':
|
||
param_parts.append(f"--rt {value}")
|
||
elif key == 'resolution':
|
||
param_parts.append(f"--rs {value}")
|
||
elif key == 'framepersecond':
|
||
param_parts.append(f"--fps {value}")
|
||
elif key == 'watermark':
|
||
param_parts.append(f"--wm {value}")
|
||
elif key == 'seed':
|
||
param_parts.append(f"--seed {value}")
|
||
elif key == 'camerafixed':
|
||
param_parts.append(f"--cf {value}")
|
||
|
||
if param_parts:
|
||
prompt_text += " " + " ".join(param_parts)
|
||
|
||
api_content.append({
|
||
"type": "text",
|
||
"text": prompt_text
|
||
})
|
||
hasTail = False
|
||
# 尾帧
|
||
if 'tail_img_url' in content:
|
||
hasTail = True
|
||
api_content.append({
|
||
"type": "image_url",
|
||
"image_url": {
|
||
"url": content['tail_img_url']
|
||
},
|
||
"role":"last_frame",
|
||
})
|
||
|
||
# 添加图片URL
|
||
if 'image_url' in content:
|
||
first_frame = {
|
||
"type": "image_url",
|
||
"image_url": {
|
||
"url": content['image_url']
|
||
}
|
||
}
|
||
if hasTail:
|
||
first_frame['role'] = "first_frame"
|
||
|
||
api_content.append(first_frame)
|
||
print(f"model: {model}")
|
||
print(f"api_content: {api_content}")
|
||
# 使用官方SDK创建任务
|
||
create_result = self.client.content_generation.tasks.create(
|
||
model=model,
|
||
content=api_content,
|
||
callback_url=callback_url
|
||
)
|
||
task_id = create_result.id
|
||
return {'status': 'success', 'task_id':task_id }
|
||
except Exception as e:
|
||
return {
|
||
'status': 'error',
|
||
'message': f'请求异常: {str(e)}'
|
||
}
|
||
|
||
def create_video_lite_generation_task(self, content, parameters=None, callback_url=None) -> Dict[str, Any]:
|
||
"""
|
||
创建视频生成任务 未使用
|
||
首尾帧
|
||
Args:
|
||
content: 请求内容,格式为 {'first_frame': str, 'last_frame': str, 'prompt': str}
|
||
callback_url: 回调URL(可选)
|
||
parameters: 额外参数(可选)
|
||
|
||
Returns:
|
||
status: 接口执行结果 success, error
|
||
task_id: 任务ID
|
||
message: 错误信息
|
||
"""
|
||
try:
|
||
if 'model' in content:
|
||
model = content['model']
|
||
else:
|
||
model = self.video_model_id
|
||
# 构建符合官方API格式的content数组
|
||
api_content = []
|
||
|
||
# 添加文本提示词
|
||
if 'prompt' in content:
|
||
prompt_text = content['prompt']
|
||
|
||
# 如果parameters中有参数,将其追加到prompt中
|
||
if parameters:
|
||
param_parts = []
|
||
for key, value in parameters.items():
|
||
if key == 'duration':
|
||
param_parts.append(f"--dur {value}")
|
||
elif key == 'ratio':
|
||
param_parts.append(f"--rt {value}")
|
||
elif key == 'resolution':
|
||
param_parts.append(f"--rs {value}")
|
||
elif key == 'framepersecond':
|
||
param_parts.append(f"--fps {value}")
|
||
elif key == 'watermark':
|
||
param_parts.append(f"--wm {value}")
|
||
elif key == 'seed':
|
||
param_parts.append(f"--seed {value}")
|
||
elif key == 'camerafixed':
|
||
param_parts.append(f"--cf {value}")
|
||
|
||
if param_parts:
|
||
prompt_text += " " + " ".join(param_parts)
|
||
|
||
api_content.append({
|
||
"type": "text",
|
||
"text": prompt_text
|
||
})
|
||
|
||
# 首帧
|
||
if 'first_frame' in content:
|
||
api_content.append({
|
||
"type": "image_url",
|
||
"image_url": {
|
||
"url": content['first_frame']
|
||
},
|
||
"role":"first_frame",
|
||
})
|
||
# 尾帧
|
||
if 'last_frame' in content:
|
||
api_content.append({
|
||
"type": "image_url",
|
||
"image_url": {
|
||
"url": content['last_frame']
|
||
},
|
||
"role":"last_frame",
|
||
})
|
||
# print(f"model: {model}")
|
||
# print(f"api_content: {api_content}")
|
||
# 使用官方SDK创建任务
|
||
create_result = self.client.content_generation.tasks.create(
|
||
model=model,
|
||
content=api_content,
|
||
callback_url=callback_url
|
||
)
|
||
task_id = create_result.id
|
||
return {'status': 'success', 'task_id':task_id }
|
||
except Exception as e:
|
||
return {
|
||
'status': 'error',
|
||
'message': f'请求异常: {str(e)}'
|
||
}
|
||
|
||
def create_video_txt_generation_task(self, prompt, model=None, parameters=None, callback_url=None) -> Dict[str, Any]:
|
||
"""
|
||
创建视频生成任务
|
||
文生视频
|
||
Args:
|
||
prompt: 提示词
|
||
callback_url: 回调URL(可选)
|
||
parameters: 额外参数(可选)
|
||
|
||
Returns:
|
||
status: 接口执行结果 success, error
|
||
task_id: 任务ID
|
||
message: 错误信息
|
||
"""
|
||
try:
|
||
# 构建符合官方API格式的content数组
|
||
api_content = []
|
||
if model is None:
|
||
model = self.txt_to_image_model_id
|
||
# 添加文本提示词
|
||
if prompt:
|
||
prompt_text = prompt
|
||
|
||
# 如果parameters中有参数,将其追加到prompt中
|
||
if parameters:
|
||
param_parts = []
|
||
for key, value in parameters.items():
|
||
if key == 'duration':
|
||
param_parts.append(f"--dur {value}")
|
||
elif key == 'ratio':
|
||
param_parts.append(f"--rt {value}")
|
||
elif key == 'resolution':
|
||
param_parts.append(f"--rs {value}")
|
||
elif key == 'framepersecond':
|
||
param_parts.append(f"--fps {value}")
|
||
elif key == 'watermark':
|
||
param_parts.append(f"--wm {value}")
|
||
elif key == 'seed':
|
||
param_parts.append(f"--seed {value}")
|
||
elif key == 'camerafixed':
|
||
param_parts.append(f"--cf {value}")
|
||
|
||
if param_parts:
|
||
prompt_text += " " + " ".join(param_parts)
|
||
|
||
api_content.append({
|
||
"type": "text",
|
||
"text": prompt_text
|
||
})
|
||
|
||
# 使用官方SDK创建任务
|
||
create_result = self.client.content_generation.tasks.create(
|
||
model=model,
|
||
content=api_content,
|
||
callback_url=callback_url
|
||
)
|
||
task_id = create_result.id
|
||
return {'status': 'success', 'task_id':task_id }
|
||
except Exception as e:
|
||
return {
|
||
'status': 'error',
|
||
'message': f'请求异常: {str(e)}'
|
||
}
|
||
|
||
def get_task_status(self, task_id: str) -> Dict[str, Any]:
|
||
"""
|
||
查询任务状态
|
||
|
||
Args:
|
||
task_id: 任务ID
|
||
|
||
Returns:
|
||
status: 任务状态 completed, error, pending
|
||
video_url: 视频URL
|
||
message: 错误信息
|
||
"""
|
||
status = 'pending'
|
||
message = ''
|
||
video_url = ''
|
||
try:
|
||
result = self.client.content_generation.tasks.get(
|
||
task_id=task_id,
|
||
)
|
||
# res_status的状态:succeeded, failed, running, cancelled, queued
|
||
res_status = result.status
|
||
print(f"get_task_status: {res_status}")
|
||
if res_status == 'succeeded':
|
||
if result.content and result.content.video_url:
|
||
video_url = result.content.video_url
|
||
status = 'completed'
|
||
else:
|
||
status = 'error'
|
||
video_url = ''
|
||
message = f'视频URL为空 {task_id}'
|
||
elif res_status =='failed':
|
||
status = 'error'
|
||
errorCode = ''
|
||
errorMessage = ''
|
||
if result.error:
|
||
errorCode = result.error.code
|
||
errorMessage = result.error.message
|
||
|
||
message = f'任务执行失败 {task_id} result.error.code:{errorCode} result.error.message :{errorMessage}'
|
||
elif res_status =='cancelled':
|
||
status = 'error'
|
||
message = f'任务已被取消 {task_id}'
|
||
else:
|
||
status = 'pending'
|
||
message = f'任务正在执行中 {task_id}'
|
||
|
||
return {'status': status, 'video_url': video_url, 'message': message}
|
||
|
||
except Exception as e:
|
||
status = 'error'
|
||
error_str = str(e)
|
||
# 检查是否是资源未找到的错误
|
||
if 'ResourceNotFound' in error_str or '404' in error_str:
|
||
return {
|
||
'status': status,
|
||
'message': f'指定的任务资源未找到 {task_id}'
|
||
}
|
||
else:
|
||
return {
|
||
'status': status,
|
||
'message': f'查询异常: {str(e)}'
|
||
}
|
||
|
||
def get_task_list(self, limit=20, offset=0) -> Dict[str, Any]:
|
||
"""
|
||
获取任务列表
|
||
|
||
Args:
|
||
limit: 每页数量
|
||
offset: 偏移量
|
||
|
||
Returns:
|
||
包含任务列表的字典
|
||
"""
|
||
try:
|
||
# 将limit/offset转换为page_num/page_size
|
||
page_num = (offset // limit) + 1 if limit > 0 else 1
|
||
page_size = limit
|
||
|
||
result = self.client.content_generation.tasks.list(
|
||
page_num=page_num,
|
||
page_size=page_size
|
||
)
|
||
|
||
# 将ContentGenerationTask对象转换为字典格式
|
||
tasks_data = []
|
||
if hasattr(result, 'items') and result.items:
|
||
for task in result.items:
|
||
task_dict = {
|
||
'id': getattr(task, 'id', ''),
|
||
'task_id': getattr(task, 'id', ''), # 兼容性字段
|
||
'status': getattr(task, 'status', ''),
|
||
'model': getattr(task, 'model', ''),
|
||
'created_at': getattr(task, 'created_at', ''),
|
||
'updated_at': getattr(task, 'updated_at', ''),
|
||
'error': getattr(task, 'error', None),
|
||
}
|
||
# 添加content字段
|
||
if hasattr(task, 'content') and task.content:
|
||
task_dict['content'] = {
|
||
'video_url': getattr(task.content, 'video_url', '')
|
||
}
|
||
else:
|
||
task_dict['content'] = None
|
||
|
||
# 添加usage字段
|
||
if hasattr(task, 'usage') and task.usage:
|
||
task_dict['usage'] = {
|
||
'completion_tokens': getattr(task.usage, 'completion_tokens', 0),
|
||
'total_tokens': getattr(task.usage, 'total_tokens', 0)
|
||
}
|
||
else:
|
||
task_dict['usage'] = None
|
||
tasks_data.append(task_dict)
|
||
|
||
return {'success': True, 'data': {
|
||
'tasks': tasks_data,
|
||
'total': getattr(result, 'total', 0),
|
||
'page_num': page_num,
|
||
'page_size': page_size,
|
||
'limit': limit,
|
||
'offset': offset
|
||
}}
|
||
|
||
except Exception as e:
|
||
return {
|
||
'success': False,
|
||
'error': f'获取列表异常: {str(e)}'
|
||
}
|
||
|
||
def delete_task(self, task_id: str) -> Dict[str, Any]:
|
||
"""
|
||
删除任务
|
||
|
||
Args:
|
||
task_id: 任务ID
|
||
|
||
Returns:
|
||
删除结果
|
||
"""
|
||
try:
|
||
self.client.content_generation.tasks.delete(
|
||
task_id=task_id
|
||
)
|
||
return {'success': True}
|
||
|
||
except Exception as e:
|
||
return {
|
||
'success': False,
|
||
'error': f'删除异常: {str(e)}'
|
||
}
|
||
|
||
def get_chat_response(
|
||
self,
|
||
messages: List[Dict],
|
||
model: Optional[str] = None,
|
||
tools: Optional[List[ChatCompletionToolParam]] = None,
|
||
temperature: float = 0.6
|
||
) -> Dict[str, Any]:
|
||
"""同步获取聊天响应,支持工具调用。"""
|
||
|
||
try:
|
||
if model is None:
|
||
model = self.doubao_seed_1_6_model_id
|
||
if not messages or len(messages)==0:
|
||
raise ValueError(f"火山引擎API调用失败: 消息不能为空")
|
||
completion = self.client.chat.completions.create(
|
||
model=model,
|
||
messages=messages, # type: ignore
|
||
max_tokens=16384, # 16K
|
||
temperature=temperature,
|
||
timeout=600,
|
||
thinking={
|
||
"type": "disabled", # 不使用深度思考能力
|
||
# "type": "enabled", # 使用深度思考能力
|
||
# "type": "auto", # 模型自行判断是否使用深度思考能力
|
||
},
|
||
tools=tools # 传入 tools 参数
|
||
)
|
||
return completion.model_dump() # type: ignore 使用 model_dump() 转换为字典
|
||
except Exception as e:
|
||
raise ValueError(f"火山引擎API调用失败: {str(e)}")
|
||
|
||
def get_chat_response_stream(
|
||
self,
|
||
messages: List[Dict],
|
||
model: Optional[str] = None,
|
||
tools: Optional[List[ChatCompletionToolParam]] = None,
|
||
temperature: float = 0.6
|
||
) -> Iterable[str]:
|
||
"""流式获取聊天响应,支持工具调用。"""
|
||
|
||
try:
|
||
if model is None:
|
||
model = self.doubao_seed_1_6_model_id
|
||
|
||
completion = self.client.chat.completions.create(
|
||
model=model,
|
||
messages=messages, # type: ignore
|
||
temperature=temperature,
|
||
max_tokens=16384, # 16K
|
||
timeout=600,
|
||
stream=True,
|
||
thinking={
|
||
"type": "disabled", # 不使用深度思考能力
|
||
# "type": "enabled", # 使用深度思考能力
|
||
# "type": "auto", # 模型自行判断是否使用深度思考能力
|
||
},
|
||
tools=tools # 传入 tools 参数
|
||
)
|
||
for chunk in completion:
|
||
if chunk.choices and chunk.choices[0].delta.content is not None: # type: ignore
|
||
yield chunk.choices[0].delta.content # type: ignore
|
||
except Exception as e:
|
||
import traceback
|
||
traceback.print_exc()
|
||
raise ValueError(f"火山引擎API流式调用失败: {str(e)}")
|
||
|
||
def analyze_image(self, image_url: str, prompt: str = "请描述这张图片的内容", model: Optional[str] = None, detail: str = "high") -> Dict[str, Any]:
|
||
"""
|
||
图片理解功能 - 使用豆包视觉理解模型分析图片内容
|
||
|
||
Args:
|
||
image_url: 图片URL地址
|
||
prompt: 对图片的提问或要求,默认为描述图片内容
|
||
model: 模型名称,默认使用配置的视觉理解模型
|
||
detail: 图片理解精度,"low"为低精度,"high"为高精度,默认为高精度
|
||
|
||
Returns:
|
||
Dict包含:
|
||
- status: 'success' 或 'error'
|
||
- content: 模型分析结果文本
|
||
- message: 错误信息(如果有)
|
||
"""
|
||
try:
|
||
if model is None:
|
||
model = self.doubao_seed_1_6_model_id
|
||
|
||
# 构建消息内容
|
||
messages:Any = [
|
||
{
|
||
"role": "user",
|
||
"content": [
|
||
{
|
||
"type": "image_url",
|
||
"image_url": {
|
||
"url": image_url,
|
||
"detail": detail
|
||
}
|
||
},
|
||
{
|
||
"type": "text",
|
||
"text": prompt
|
||
}
|
||
]
|
||
}
|
||
]
|
||
|
||
# 调用聊天完成API
|
||
response = self.client.chat.completions.create(
|
||
model=model,
|
||
messages=messages,
|
||
max_tokens=16384, # 16K
|
||
temperature=0.6,
|
||
timeout=600
|
||
)
|
||
|
||
content = response.choices[0].message.content # pyright: ignore
|
||
|
||
return {
|
||
'status': 'success',
|
||
'content': content
|
||
}
|
||
|
||
except Exception as e:
|
||
return {
|
||
'status': 'error',
|
||
'message': f'图片理解API调用异常: {str(e)}'
|
||
}
|
||
|
||
def analyze_image_with_base64(self, image_base64: str, prompt: str = "请描述这张图片的内容", model: Optional[str] = None, detail: str = "high") -> Dict[str, Any]:
|
||
"""
|
||
图片理解功能 - 使用Base64编码的图片进行分析
|
||
|
||
Args:
|
||
image_base64: Base64编码的图片数据(需要包含data:image/jpeg;base64,前缀)
|
||
prompt: 对图片的提问或要求,默认为描述图片内容
|
||
model: 模型名称,默认使用配置的视觉理解模型
|
||
detail: 图片理解精度,"low"为低精度,"high"为高精度,默认为高精度
|
||
|
||
Returns:
|
||
Dict包含:
|
||
- status: 'success' 或 'error'
|
||
- content: 模型分析结果文本
|
||
- message: 错误信息(如果有)
|
||
"""
|
||
try:
|
||
if model is None:
|
||
model = self.doubao_seed_1_6_model_id
|
||
|
||
# 确保Base64数据包含正确的前缀
|
||
if not image_base64.startswith('data:image/'):
|
||
# 如果没有前缀,添加默认的JPEG前缀
|
||
image_base64 = f"data:image/jpeg;base64,{image_base64}"
|
||
|
||
# 构建消息内容
|
||
messages:Any = [
|
||
{
|
||
"role": "user",
|
||
"content": [
|
||
{
|
||
"type": "image_url",
|
||
"image_url": {
|
||
"url": image_base64,
|
||
"detail": detail
|
||
}
|
||
},
|
||
{
|
||
"type": "text",
|
||
"text": prompt
|
||
}
|
||
]
|
||
}
|
||
]
|
||
|
||
# 调用聊天完成API
|
||
response = self.client.chat.completions.create(
|
||
model=model,
|
||
messages=messages,
|
||
max_tokens=16384, # 16K
|
||
temperature=0.6,
|
||
timeout=600
|
||
)
|
||
|
||
content = response.choices[0].message.content # pyright: ignore
|
||
|
||
return {
|
||
'status': 'success',
|
||
'content': content
|
||
}
|
||
|
||
except Exception as e:
|
||
return {
|
||
'status': 'error',
|
||
'message': f'图片理解API调用异常: {str(e)}'
|
||
}
|
||
|
||
def super_resolution_v3(
|
||
self,
|
||
image_url: Optional[str] = None,
|
||
image_base64: Optional[str] = None,
|
||
model_quality: str = "MQ",
|
||
result_format: int = 0,
|
||
jpg_quality: int = 95,
|
||
return_url: bool = True,
|
||
) -> dict:
|
||
"""
|
||
图片超清功能
|
||
同步接口
|
||
图片超分辨率V3(超清)接口,异步任务提交,返回task_id
|
||
|
||
返回结果:
|
||
{
|
||
"status": "success",
|
||
"message": "",
|
||
"result": ["url1"]
|
||
}
|
||
|
||
错误结果:
|
||
{
|
||
"status": "error",
|
||
"message": "Error message; code: 10000"
|
||
}
|
||
"""
|
||
try:
|
||
body = {
|
||
"req_key": "lens_nnsr2_pic_common",
|
||
"model_quality": model_quality,
|
||
"result_format": result_format,
|
||
"jpg_quality": jpg_quality,
|
||
"return_url": return_url,
|
||
}
|
||
if image_url:
|
||
body["image_urls"] = [image_url]
|
||
elif image_base64:
|
||
body["binary_data_base64"] = [image_base64]
|
||
else:
|
||
return {"status": "error", "message": "必须提供 image_url 或 image_base64"}
|
||
|
||
resp = self.visual.cv_process(body)
|
||
if resp["message"] == 'Success':
|
||
urls = resp["data"]["image_urls"]
|
||
return {"status": "success", "result": urls}
|
||
else:
|
||
return {"status": "error", "message": f"{resp['message']}; code: {resp['code']}"}
|
||
except Exception as e:
|
||
import traceback
|
||
traceback.print_exc()
|
||
return {"status": "error", "message": str(e)}
|
||
|
||
def inpainting_remove(
|
||
self,
|
||
image_url: Optional[str] = None,
|
||
image_base64: Optional[str] = None,
|
||
mask_url: Optional[str] = None,
|
||
mask_base64: Optional[str] = None,
|
||
) -> dict:
|
||
"""
|
||
图片消除功能
|
||
同步接口
|
||
inpainting涂抹消除接口,异步任务提交,返回task_id
|
||
"""
|
||
try:
|
||
body = {
|
||
"req_key": "i2i_inpainting",
|
||
# "binary_data_base64": [image_url],
|
||
# "image_urls": [image_url],
|
||
"return_url": True,
|
||
# "steps":30,# 可选 采样步数,生成图像的精细程度,越大效果可能更好,但相应的耗时会剧增 默认值:30
|
||
# "strength":0.8,# 可选 float 取值范围(0.1,1.0),越小越接近原图,越大越接近文本控制,如果设成0就和原图一模一样 默认值:0.8
|
||
# "scale":7,# 可选 float 取值范围[1, 20],影响文本描述的程度 默认值:7
|
||
# "seed":0,# 可选 float 随机种子,作为确定扩散初始状态的基础,非负数(-1表示随机种子)。若随机种子为相同正整数且其他参数均一致,则生成图片极大概率效果一致 默认值:0
|
||
# "dilate_size":15,# 可选 mask膨胀半径,默认值15;传给算法做消除的mask应该包裹整个物体,一般用户涂抹区域都会大于物体,但如果提供的mask是通过分割算法获得一般会紧贴物体,请适当增加dilate_size(默认15),不然可能由于漏抠部分要消除的物体,导致无法消除/生成新的物体的情况。
|
||
# "quality":"M",# 可选 质量参数,默认为M H,质量最高,速度稍慢,M,质量中等,速度一般,L,质量较低,速度最快
|
||
# "logo_info":None,# 可选 水印信息
|
||
}
|
||
if image_url and mask_url:
|
||
body["image_urls"] = [image_url, mask_url]
|
||
elif image_base64 and mask_base64:
|
||
body["binary_data_base64"] = [image_base64, mask_base64]
|
||
else:
|
||
return {"status": "error", "message": "必须提供 image_url 或 image_base64"}
|
||
|
||
resp = self.visual.cv_process(body)
|
||
if resp["message"] == 'Success':
|
||
urls = resp["data"]["image_urls"]
|
||
return {"status": "success", "result": urls}
|
||
else:
|
||
return {"status": "error", "message": f"{resp['message']}; code: {resp['code']}"}
|
||
except Exception as e:
|
||
return {"status": "error", "message": str(e)}
|
||
|
||
def inpainting_edit(
|
||
self,
|
||
image_url: Optional[str] = None,
|
||
image_base64: Optional[str] = None,
|
||
mask_url: Optional[str] = None,
|
||
mask_base64: Optional[str] = None,
|
||
prompt: Optional[str] = None,
|
||
) -> dict:
|
||
"""
|
||
图片 局部重绘功能
|
||
同步接口
|
||
inpainting涂抹编辑接口,异步任务提交,返回task_id
|
||
"""
|
||
try:
|
||
body = {
|
||
"req_key": "i2i_inpainting_edit",
|
||
"custom_prompt": prompt,
|
||
"return_url": True,
|
||
# "steps":25,# 可选 采样步数,生成图像的精细程度,越大效果可能更好,但相应的耗时会剧增 默认值:25
|
||
# "scale":5,# 可选 float 取值范围[1, 20],影响文本描述的程度 默认值:5
|
||
# "seed":-1,# 可选 float 随机种子,作为确定扩散初始状态的基础,非负数(-1表示随机种子)。若随机种子为相同正整数且其他参数均一致,则生成图片极大概率效果一致 默认值:-1
|
||
# "logo_info":None,# 可选 水印信息
|
||
}
|
||
if image_url and mask_url:
|
||
body["image_urls"] = [image_url, mask_url]
|
||
elif image_base64 and mask_base64:
|
||
body["binary_data_base64"] = [image_base64, mask_base64]
|
||
else:
|
||
return {"status": "error", "message": "必须提供 image_url 或 image_base64"}
|
||
|
||
resp = self.visual.cv_process(body)
|
||
if resp["message"] == 'Success':
|
||
urls = resp["data"]["image_urls"]
|
||
return {"status": "success", "result": urls}
|
||
else:
|
||
return {"status": "error", "message": f"{resp['message']}; code: {resp['code']}"}
|
||
except Exception as e:
|
||
return {"status": "error", "message": str(e)}
|
||
|
||
def outpainting_expand_mask(
|
||
self,
|
||
image_url: Optional[str] = None,
|
||
image_base64: Optional[str] = None,
|
||
mask_url: Optional[str] = None,
|
||
mask_base64: Optional[str] = None,
|
||
prompt: Optional[str] = None,
|
||
) -> dict:
|
||
"""
|
||
图片扩图功能 使用mask遮罩扩图
|
||
同步接口
|
||
outpainting智能扩图接口,异步任务提交,返回task_id
|
||
"""
|
||
try:
|
||
|
||
body = {
|
||
"req_key": "i2i_outpainting",
|
||
"custom_prompt": prompt,
|
||
"return_url": True,
|
||
# "top":scale,# 可选 取值范围:(0,1],向上扩展比例,暂定最大扩展单边1倍 默认值:0.1
|
||
# "bottom":scale,# 可选 取值范围:(0,1],向下扩展比例,暂定最大扩展单边1倍 默认值:0.1
|
||
# "left":scale,# 可选 取值范围:(0,1],向左扩展比例,暂定最大扩展单边1倍 默认值:0.1
|
||
# "right":scale,# 可选 取值范围:(0,1],向右扩展比例,暂定最大扩展单边1倍 默认值:0.1
|
||
"max_height":4096, # 最大输出高度 默认值:1920,在扩图处理后resize到指定尺寸进行兜底
|
||
"max_width":4096, # 最大输出宽度 默认值:1920,在扩图处理后resize到指定尺寸进行兜底
|
||
# "steps":30,# 可选 采样步数,生成图像的精细程度,越大效果可能更好,但相应的耗时会剧增 默认值:30
|
||
# "strength":0.8,# 可选 float 取值范围(0.1,1.0),越小越接近原图,越大越接近文本控制,如果设成0就和原图一模一样 默认值:0.8
|
||
# "scale":7,# 可选 float 取值范围[1, 20],影响文本描述的程度 默认值:7
|
||
# "seed":-1,# 可选 float 随机种子,作为确定扩散初始状态的基础,非负数(-1表示随机种子)。若随机种子为相同正整数且其他参数均一致,则生成图片极大概率效果一致 默认值:0
|
||
# "logo_info":None,# 可选 水印信息
|
||
}
|
||
if image_url and mask_url:
|
||
body["image_urls"] = [image_url, mask_url]
|
||
elif image_base64 and mask_base64:
|
||
body["binary_data_base64"] = [image_base64, mask_base64]
|
||
else:
|
||
return {"status": "error", "message": "必须提供 image_url 或 image_base64"}
|
||
print(f'扩图接口提交参数{body}')
|
||
resp = self.visual.cv_process( body)
|
||
print(f'扩图接口返回{resp}')
|
||
if resp["message"] == 'Success':
|
||
urls = resp["data"]["image_urls"]
|
||
return {"status": "success", "result": urls}
|
||
else:
|
||
return {"status": "error", "message": f"{resp['message']}; code: {resp['code']}"}
|
||
except Exception as e:
|
||
return {"status": "error", "message": str(e)}
|
||
|
||
|
||
def outpainting_expand_scale(
|
||
self,
|
||
image_url: Optional[str] = None,
|
||
image_base64: Optional[str] = None,
|
||
prompt: Optional[str] = None,
|
||
scale: Optional[float] = None,
|
||
) -> dict:
|
||
"""
|
||
图片扩图功能 等比扩大
|
||
同步接口
|
||
outpainting智能扩图接口,异步任务提交,返回task_id
|
||
"""
|
||
try:
|
||
if scale is None:
|
||
return {"status": "error", "message": "scale 不能为空"}
|
||
|
||
# 转换 scale 为 float 类型
|
||
try:
|
||
scale = float(scale)
|
||
except (ValueError, TypeError):
|
||
return {"status": "error", "message": "scale 必须是有效的数字"}
|
||
|
||
if scale <= 0 or scale > 1:
|
||
return {"status": "error", "message": "scale 取值范围:(0,1]"}
|
||
|
||
body = {
|
||
"req_key": "i2i_outpainting",
|
||
"custom_prompt": prompt,
|
||
"return_url": True,
|
||
"top":scale,# 可选 取值范围:(0,1],向上扩展比例,暂定最大扩展单边1倍 默认值:0.1
|
||
"bottom":scale,# 可选 取值范围:(0,1],向下扩展比例,暂定最大扩展单边1倍 默认值:0.1
|
||
"left":scale,# 可选 取值范围:(0,1],向左扩展比例,暂定最大扩展单边1倍 默认值:0.1
|
||
"right":scale,# 可选 取值范围:(0,1],向右扩展比例,暂定最大扩展单边1倍 默认值:0.1
|
||
# "max_height":1920, # 最大输出高度 默认值:1920,在扩图处理后resize到指定尺寸进行兜底
|
||
# "max_width":1920, # 最大输出宽度 默认值:1920,在扩图处理后resize到指定尺寸进行兜底
|
||
# "steps":30,# 可选 采样步数,生成图像的精细程度,越大效果可能更好,但相应的耗时会剧增 默认值:30
|
||
# "strength":0.8,# 可选 float 取值范围(0.1,1.0),越小越接近原图,越大越接近文本控制,如果设成0就和原图一模一样 默认值:0.8
|
||
# "scale":7,# 可选 float 取值范围[1, 20],影响文本描述的程度 默认值:7
|
||
# "seed":0,# 可选 float 随机种子,作为确定扩散初始状态的基础,非负数(-1表示随机种子)。若随机种子为相同正整数且其他参数均一致,则生成图片极大概率效果一致 默认值:0
|
||
# "logo_info":None,# 可选 水印信息
|
||
}
|
||
if image_url:
|
||
body["image_urls"] = [image_url]
|
||
elif image_base64:
|
||
body["binary_data_base64"] = [image_base64]
|
||
else:
|
||
return {"status": "error", "message": "必须提供 image_url 或 image_base64"}
|
||
|
||
resp = self.visual.cv_process( body)
|
||
# print(f'扩图接口返回{resp}')
|
||
if resp["message"] == 'Success':
|
||
urls = resp["data"]["image_urls"]
|
||
return {"status": "success", "result": urls}
|
||
else:
|
||
return {"status": "error", "message": f"{resp['message']}; code: {resp['code']}"}
|
||
except Exception as e:
|
||
return {"status": "error", "message": str(e)}
|
||
|
||
|
||
def video_change_lips_submit(
|
||
self,
|
||
voice_url: str,
|
||
video_url: str,
|
||
templ_start_seconds: float = 0,
|
||
) -> dict:
|
||
"""
|
||
视频改口型
|
||
同步接口
|
||
视频改口型接口,异步任务提交,返回task_id
|
||
"""
|
||
try:
|
||
body = {
|
||
"req_key": "realman_change_lips", # Lite模式
|
||
# "req_key": "realman_change_lips_basic_chimera", # Basic模式
|
||
"url": video_url,
|
||
"pure_audio_url": voice_url,
|
||
"templ_start_seconds": templ_start_seconds,
|
||
}
|
||
print(f'视频改口型 提交参数{body}')
|
||
resp = self.visual.cv_submit_task( body)
|
||
print(f'视频改口型 返回{resp}')
|
||
if resp["message"] == 'Success':
|
||
task_id = resp["data"]["task_id"]
|
||
return {"status": "success", "job_id": task_id}
|
||
else:
|
||
return {"status": "error", "message": f"{resp['message']}; code: {resp['code']}"}
|
||
except Exception as e:
|
||
return {"status": "error", "message": str(e)}
|
||
|
||
|
||
def video_change_lips_result(self, task_id: str) -> dict:
|
||
"""
|
||
查询视频改口型任务结果
|
||
"""
|
||
result = self._get_visual_task(req_key="realman_change_lips_basic_chimera", task_id=task_id)
|
||
# print(f'视频改口型 查询结果{result}')
|
||
if result["status"] == "success":
|
||
return {"status": "success", "result": result.get('result', {}).get('url', '')}
|
||
else:
|
||
return result
|
||
|
||
|
||
def _get_visual_task(
|
||
self,
|
||
req_key: str,
|
||
task_id: str,
|
||
) -> dict:
|
||
"""
|
||
查询火山 visual 任务结果
|
||
"""
|
||
try:
|
||
body = {
|
||
"req_key": req_key,
|
||
"task_id": task_id,
|
||
}
|
||
|
||
resp = self.visual.cv_get_result( body)
|
||
code = resp["code"]
|
||
data = resp["data"]
|
||
status = data["status"]
|
||
print(f'对口型 查询任务结果 返回{resp}')
|
||
if code == 10000: # 成功
|
||
if status == 'in_queue' or status == 'generating':
|
||
return {"status": "pending", "message": f"任务执行中"}
|
||
elif status == 'done':
|
||
resp_data = data["resp_data"]
|
||
if isinstance(resp_data, (dict, list)):
|
||
# 是对象,直接返回
|
||
return {"status": "success", "result": resp_data}
|
||
elif isinstance(resp_data, str):
|
||
# 是字符串,判断是不是json字符串
|
||
try:
|
||
obj = json.loads(resp_data)
|
||
# 能loads说明是json字符串,转成对象返回
|
||
return {"status": "success", "result": obj}
|
||
except Exception:
|
||
# 不是json字符串,直接返回原字符串
|
||
return {"status": "success", "result": resp_data}
|
||
else:
|
||
# 其它类型,直接返回
|
||
return {"status": "success", "result": resp_data}
|
||
else:
|
||
return {"status": "error", "message": f"任务执行失败 status:{status}"}
|
||
else:
|
||
return {"status": "error", "message": f"查询请求失败 code:{code}"}
|
||
except Exception as e:
|
||
return {"status": "error", "message": f"查询任务结果 异常{str(e)}"}
|
||
|
||
|
||
def check_visual_task(self, visual_task_id: str, fn_name: str) -> dict:
|
||
"""
|
||
查询任务结果
|
||
根据fn_name 调用不同的查询方法
|
||
"""
|
||
try:
|
||
if fn_name == 'video_change_lips_submit':
|
||
return self.video_change_lips_result(visual_task_id)
|
||
else:
|
||
return {"status": "error", "message": f"不支持的火山 visual 任务: {fn_name}"}
|
||
except Exception as e:
|
||
return {"status": "error", "message": f"查询任务结果 异常{str(e)}"}
|
||
|
||
|
||
def _workflow_video_url_upload(self,
|
||
video_url: str,
|
||
workflow_id: str,
|
||
file_path: str,
|
||
task_id: str,
|
||
):
|
||
try:
|
||
space_name = API_CONFIG['huoshan']['workflow_space_name']
|
||
req = VodUrlUploadRequest()
|
||
req.SpaceName = space_name # pyright: ignore
|
||
url_set = req.URLSets.add() # pyright: ignore
|
||
url_set.SourceUrl = video_url
|
||
# url_set.Templates = [
|
||
# {"TemplateIds":[workflow_id]}
|
||
# ]
|
||
url_set.TemplateId = workflow_id
|
||
url_set.FileName = f"{file_path}/upload/{task_id}.mp4"
|
||
url_set.CallbackArgs = json.dumps({'server_task_id': task_id})
|
||
# customUrlHeaders = {'server_task_id': task_id}
|
||
# url_set.CustomURLHeaders.update(**customUrlHeaders)
|
||
resp = self.vodClient.upload_media_by_url(req)
|
||
print(f'上传视频到火山视频点播 工作流 返回{resp}')
|
||
|
||
# 检查是否有错误 - 检查 Error 对象中的 Code 属性
|
||
# 成功的响应中,Error对象可能存在但Code为空
|
||
error = getattr(resp.ResponseMetadata, 'Error', None)
|
||
if error and getattr(error, 'Code', None):
|
||
return {"status": "error", "message": f"上传视频到火山视频点播 工作流 失败: {error}"}
|
||
else:
|
||
# 成功情况下获取 JobId - 使用属性访问
|
||
if hasattr(resp, 'Result') and hasattr(resp.Result, 'Data'):
|
||
job_id = ''
|
||
if hasattr(resp.Result.Data, 'JobId'):
|
||
job_id = resp.Result.Data.JobId
|
||
return {"status": "success", "upload_job_id": job_id or ''}
|
||
else:
|
||
return {"status": "error", "message": "响应格式异常,无法获取 JobId"}
|
||
except Exception as e:
|
||
import traceback
|
||
traceback.print_exc()
|
||
return {"status": "error", "message": f"异常{str(e)}"}
|
||
|
||
def get_workflow_result(self, task_doc: dict):
|
||
"""
|
||
查询火山视频点播 工作流 结果
|
||
同步接口
|
||
完成返回 {"status": "success", "result": "store_uri"}
|
||
失败返回 {"status": "error", "message": "message"}
|
||
执行中返回 {"status": "pending", "message": "任务执行中"}
|
||
"""
|
||
try:
|
||
if task_doc:
|
||
volcengine_workflow_data = task_doc.get('volcengine_workflow', {})
|
||
# uploading, working, completed, failed
|
||
status = volcengine_workflow_data.get('status', '')
|
||
if status == 'completed':
|
||
store_uri = volcengine_workflow_data.get('store_uri', '')
|
||
file_name = volcengine_workflow_data.get('file_name', '')
|
||
if store_uri:
|
||
return {"status": "success", "result": store_uri, "file_name": file_name}
|
||
else:
|
||
return {"status": "error", "message": f"返回无store_uri"}
|
||
elif status == 'failed':
|
||
end_type = volcengine_workflow_data.get('end_type', '')
|
||
error_message = volcengine_workflow_data.get('error_message', '')
|
||
error_code = volcengine_workflow_data.get('error_code', '')
|
||
return {"status": "error", "message": f"任务执行失败 end_type:{end_type} code:{error_code} message:{error_message} "}
|
||
else:
|
||
return {"status": "pending", "message": f"任务执行中 status:{status}"}
|
||
else:
|
||
return {"status": "pending", "message": f"任务执行中"}
|
||
except Exception as e:
|
||
import traceback
|
||
traceback.print_exc()
|
||
return {"status": "error", "message": f"异常{str(e)}"}
|
||
|
||
def video_frame_upsample(
|
||
self,
|
||
task_id: str,
|
||
video_url: str,
|
||
) -> dict:
|
||
"""
|
||
视频补帧
|
||
同步接口
|
||
完成返回 {"status": "success", "result": "store_uri"}
|
||
失败返回 {"status": "error", "message": "message"}
|
||
执行中返回 {"status": "pending", "message": "任务执行中"}
|
||
"""
|
||
workflow_id = API_CONFIG['huoshan']['workflow_id']['video_frame_upsample']
|
||
file_path = API_CONFIG['huoshan']['workflow_file_path']
|
||
|
||
try:
|
||
return self._workflow_video_url_upload(
|
||
video_url=video_url,
|
||
workflow_id=workflow_id,
|
||
file_path=f'{file_path}/video/frame_upsample',
|
||
task_id=task_id,
|
||
)
|
||
except Exception as e:
|
||
return {"status": "error", "message": f"video_frame_upsample 异常 {str(e)}"}
|
||
|
||
def video_super_resolution(
|
||
self,
|
||
task_id: str,
|
||
video_url: str,
|
||
) -> dict:
|
||
"""
|
||
视频超清
|
||
同步接口
|
||
完成返回 {"status": "success", "result": "store_uri"}
|
||
失败返回 {"status": "error", "message": "message"}
|
||
执行中返回 {"status": "pending", "message": "任务执行中"}
|
||
"""
|
||
# 工作流id
|
||
workflow_id = API_CONFIG['huoshan']['workflow_id']['video_super_resolution']
|
||
file_path = API_CONFIG['huoshan']['workflow_file_path']
|
||
|
||
try:
|
||
return self._workflow_video_url_upload(
|
||
video_url=video_url,
|
||
workflow_id=workflow_id,
|
||
file_path=f'{file_path}/video/super_resolution',
|
||
task_id=task_id,
|
||
)
|
||
except Exception as e:
|
||
return {"status": "error", "message": f"video_super_resolution 异常 {str(e)}"}
|
||
|
||
def text_to_speech(
|
||
self,
|
||
text: str,
|
||
speaker: str = "zh_female_wanqudashu_moon_bigtts",
|
||
emotion: str = "",
|
||
output_format: str = "mp3",
|
||
sample_rate: int = 24000,
|
||
app_id: Optional[str] = None,
|
||
access_key: Optional[str] = None,
|
||
resource_id: Optional[str] = None,
|
||
emotion_scale: Optional[float] = None,
|
||
speech_rate: Optional[int] = None,
|
||
loudness_rate: Optional[int] = None,
|
||
# uid: str = "12345"
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
文字转语音功能
|
||
豆包语音 大模型 不包括自定义音色
|
||
|
||
Args:
|
||
text: 要转换的文本内容
|
||
speaker: 语音合成的音色,默认为"zh_female_wanqudashu_moon_bigtts"
|
||
output_format: 输出音频格式,默认为"mp3"
|
||
sample_rate: 采样率,默认为24000
|
||
app_id: 应用ID,如果不提供则从配置中获取
|
||
access_key: 访问密钥,如果不提供则从配置中获取
|
||
resource_id: 资源ID,如果不提供则从配置中获取
|
||
emotion_scale: 情感强度,范围1~5,不设置时默认值为4。
|
||
speech_rate: 语速,取值范围[-50,100],100代表2.0倍速,-50代表0.5倍数
|
||
loudness_rate: 音量,取值范围[-50,100],100代表2.0倍音量,-50代表0.5倍音量(mix音色暂不支持)
|
||
Returns:
|
||
Dict包含:
|
||
- status: 'success' 或 'error'
|
||
- audio_data: 音频数据的字节数组(成功时)
|
||
- audio_size: 音频数据大小(成功时)
|
||
- message: 错误信息(如果有)
|
||
"""
|
||
try:
|
||
# 从配置中获取参数,如果没有传入的话
|
||
final_app_id = API_CONFIG.get('huoshan', {}).get('doubao_voice', {}).get('AppID', '')
|
||
final_access_key = API_CONFIG.get('huoshan', {}).get('doubao_voice', {}).get('AccessToken', '')
|
||
final_resource_id = API_CONFIG.get('huoshan', {}).get('doubao_voice', {}).get('resourceID', '')
|
||
|
||
if not all([final_app_id, final_access_key, final_resource_id]):
|
||
return {
|
||
'status': 'error',
|
||
'message': 'TTS配置参数不完整,需要app_id、access_key和resource_id'
|
||
}
|
||
|
||
# 请求地址
|
||
url = "https://openspeech.bytedance.com/api/v3/tts/unidirectional"
|
||
|
||
# 请求头
|
||
headers = {
|
||
"X-Api-App-Id": final_app_id,
|
||
"X-Api-Access-Key": final_access_key,
|
||
"X-Api-Resource-Id": final_resource_id,
|
||
"X-Api-App-Key": "aGjiRDfUWi",
|
||
"Content-Type": "application/json",
|
||
"Connection": "keep-alive"
|
||
}
|
||
|
||
# 附加参数
|
||
additions = {
|
||
"disable_markdown_filter": True,
|
||
"enable_language_detector": True,
|
||
"enable_latex_tn": True,
|
||
"disable_default_bit_rate": True,
|
||
"max_length_to_filter_parenthesis": 0,
|
||
}
|
||
|
||
# 请求负载
|
||
payload = {
|
||
# "user": {"uid": uid},
|
||
"req_params": {
|
||
"text": text,
|
||
"speaker": speaker,
|
||
"additions": json.dumps(additions),
|
||
"audio_params": {
|
||
"format": output_format,
|
||
"sample_rate": sample_rate,
|
||
"emotion":emotion,
|
||
"emotion_scale":emotion_scale,
|
||
"speech_rate":speech_rate,
|
||
"loudness_rate":loudness_rate
|
||
},
|
||
}
|
||
}
|
||
|
||
# 发送请求
|
||
import requests
|
||
session = requests.Session()
|
||
# print(f'TTS 发送请求 payload: {payload}')
|
||
response = session.post(url, headers=headers, json=payload, stream=True, timeout=600)
|
||
# print(f'TTS 返回 response: {response}')
|
||
|
||
# 用于存储音频数据
|
||
audio_data = bytearray()
|
||
total_audio_size = 0
|
||
|
||
try:
|
||
for chunk in response.iter_lines(decode_unicode=True):
|
||
if not chunk:
|
||
continue
|
||
|
||
try:
|
||
data = json.loads(chunk)
|
||
|
||
# 检查是否有错误
|
||
if data.get("code", 0) > 0:
|
||
if data.get("code", 0) == 20000000:
|
||
# 正常结束
|
||
break
|
||
else:
|
||
# 错误情况
|
||
error_msg = data.get("message", "TTS请求失败")
|
||
return {
|
||
'status': 'error',
|
||
'message': f'TTS API返回错误: {error_msg} (code: {data.get("code", 0)})'
|
||
}
|
||
|
||
# 处理音频数据
|
||
if data.get("code", 0) == 0 and "data" in data and data["data"]:
|
||
chunk_audio = base64.b64decode(data["data"])
|
||
audio_size = len(chunk_audio)
|
||
total_audio_size += audio_size
|
||
audio_data.extend(chunk_audio)
|
||
|
||
except json.JSONDecodeError:
|
||
continue
|
||
|
||
if audio_data:
|
||
return {
|
||
'status': 'success',
|
||
'audio_data': bytes(audio_data),
|
||
'audio_size': total_audio_size
|
||
}
|
||
else:
|
||
return {
|
||
'status': 'error',
|
||
'message': '未收到音频数据'
|
||
}
|
||
|
||
finally:
|
||
response.close()
|
||
session.close()
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
return {
|
||
'status': 'error',
|
||
'message': f'TTS请求异常: {str(e)}'
|
||
}
|
||
except Exception as e:
|
||
return {
|
||
'status': 'error',
|
||
'message': f'TTS处理异常: {str(e)}'
|
||
}
|
||
|
||
|