2026-01-25 19:27:44 +08:00

284 lines
8.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
剧集内容管理 API 路由
提供剧集内容的 CRUD、审核和导出功能
"""
from fastapi import APIRouter, Depends, HTTPException, status, Query
from typing import List, Optional
from pydantic import BaseModel
from datetime import datetime
from app.models.episode_content import (
EpisodeContent,
EpisodeContentCreate,
EpisodeContentUpdate,
EpisodeContentReview,
ExportFormat,
ExportRequest
)
from app.utils.logger import get_logger
logger = get_logger(__name__)
router = APIRouter(prefix="/episodes", tags=["剧集内容管理"])
# 简单的内存存储(实际应该使用 MongoDB
# TODO: 替换为真实的数据库存储
_episode_contents: dict = {}
_content_id_counter = 0
def _generate_content_id(project_id: str, episode_number: int) -> str:
"""生成内容 ID"""
global _content_id_counter
_content_id_counter += 1
return f"ep_{project_id}_{episode_number:03d}_{_content_id_counter:03d}"
@router.get("/{project_id}", response_model=List[EpisodeContent])
async def list_episode_contents(
project_id: str,
status: Optional[str] = None,
episode_number: Optional[int] = None
):
"""
列出项目的所有剧集内容
- **project_id**: 项目 ID
- **status**: 筛选状态
- **episode_number**: 筛选集数
"""
contents = []
for content_id, content in _episode_contents.items():
if content.get('project_id') != project_id:
continue
if status and content.get('status') != status:
continue
if episode_number is not None and content.get('episode_number') != episode_number:
continue
contents.append(EpisodeContent(**content))
# 按集数排序
contents.sort(key=lambda x: x.episode_number)
return contents
@router.get("/{project_id}/{episode_number}", response_model=EpisodeContent)
async def get_episode_content(
project_id: str,
episode_number: int
):
"""
获取指定集数的内容
"""
for content in _episode_contents.values():
if content.get('project_id') == project_id and content.get('episode_number') == episode_number:
return EpisodeContent(**content)
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"内容不存在: 项目 {project_id}, 第 {episode_number}"
)
@router.post("/", response_model=EpisodeContent, status_code=status.HTTP_201_CREATED)
async def create_episode_content(data: EpisodeContentCreate):
"""
创建新的剧集内容
通常由 Agent 自动调用,保存生成的内容
"""
global _content_id_counter
# 生成 ID
content_id = _generate_content_id(data.project_id, data.episode_number)
# 创建内容对象
content = {
"id": content_id,
"project_id": data.project_id,
"episode_number": data.episode_number,
"title": data.title,
"content": data.content,
"status": "draft",
"quality_score": None,
"review_issues": [],
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
"created_by": "system",
"generation_params": data.generation_params,
"version": 1,
"parent_version": None
}
_episode_contents[content_id] = content
logger.info(f"创建剧集内容: {content_id}")
return EpisodeContent(**content)
@router.put("/{content_id}", response_model=EpisodeContent)
async def update_episode_content(
content_id: str,
data: EpisodeContentUpdate
):
"""
更新剧集内容
支持用户编辑生成后的内容
"""
if content_id not in _episode_contents:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"内容不存在: {content_id}"
)
content = _episode_contents[content_id].copy()
# 更新字段
if data.title is not None:
content['title'] = data.title
if data.content is not None:
content['content'] = data.content
# 增加版本号
content['version'] = content.get('version', 1) + 1
if data.status is not None:
content['status'] = data.status
content['updated_at'] = datetime.now().isoformat()
_episode_contents[content_id] = content
logger.info(f"更新剧集内容: {content_id}")
return EpisodeContent(**content)
@router.post("/{content_id}/review", response_model=EpisodeContent)
async def review_episode_content(
content_id: str,
review: EpisodeContentReview
):
"""
审核剧集内容
通过或拒绝生成的内容
"""
if content_id not in _episode_contents:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"内容不存在: {content_id}"
)
content = _episode_contents[content_id].copy()
# 更新状态
content['status'] = 'approved' if review.approved else 'rejected'
content['updated_at'] = datetime.now().isoformat()
if review.comment:
content['review_comment'] = review.comment
_episode_contents[content_id] = content
logger.info(f"审核剧集内容: {content_id}, 结果: {content['status']}")
return EpisodeContent(**content)
@router.post("/{project_id}/export")
async def export_episode_contents(
project_id: str,
request: ExportRequest
):
"""
导出剧集内容
支持多种格式Markdown, TXT, PDF, DOCX
"""
# 获取内容
contents = []
for content in _episode_contents.values():
if content.get('project_id') != project_id:
continue
if request.episode_numbers and content.get('episode_number') not in request.episode_numbers:
continue
contents.append(content)
if not contents:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"没有找到可导出的内容"
)
# 按格式生成导出内容
if request.format == ExportFormat.MARKDOWN or request.format == ExportFormat.TXT:
# 文本格式Markdown/TXT
output_lines = []
output_lines.append(f"# 剧集内容导出\n")
output_lines.append(f"导出时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
output_lines.append(f"总集数: {len(contents)}\n")
output_lines.append("---\n\n")
for content in sorted(contents, key=lambda x: x.get('episode_number', 0)):
output_lines.append(f"## 第 {content.get('episode_number')}")
if content.get('title'):
output_lines.append(f"### {content['title']}")
output_lines.append("")
output_lines.append(content['content'])
output_lines.append("\n\n---\n\n")
return {
"success": True,
"format": request.format,
"content": "\n".join(output_lines),
"filename": f"episodes_{project_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
}
elif request.format == ExportFormat.PDF:
# PDF 格式(需要额外库支持,这里返回 Markdown 建议前端转换)
return {
"success": True,
"format": request.format,
"message": "PDF 导出功能需要前端渲染后转换",
"markdown_content": "\n\n".join([c['content'] for c in contents])
}
elif request.format == ExportFormat.DOCX:
# DOCX 格式(需要 python-docx 库)
return {
"success": True,
"format": request.format,
"message": "DOCX 导出功能需要后端安装 python-docx 库",
"markdown_content": "\n\n".join([c['content'] for c in contents])
}
@router.delete("/{content_id}")
async def delete_episode_content(content_id: str):
"""
删除剧集内容
"""
if content_id not in _episode_contents:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"内容不存在: {content_id}"
)
del _episode_contents[content_id]
logger.info(f"删除剧集内容: {content_id}")
return {"success": True, "message": "删除成功"}