1. 添加后台管理页面,使用网址 http://localhost:5174/admin 进入后台管理页面。 2. 剧种分类完成:用户在后台管理页面选择类型之后会一直显示。 3. 两个脚本运行时可以同时启动两个浏览器页面,互不影响(原因:必须等管理数据库有了数据之后,前端点赞才可以显示;但主代码运行慢,为了不浪费时间,每次定时器运行时都会通过视频 ID 同步短剧的详细信息)。前端可以稳定地显示数据。
2016 lines
87 KiB
Python
2016 lines
87 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
小程序专用抖音播放量数据API服务器
|
||
优化的数据格式和接口设计,专为小程序使用
|
||
"""
|
||
|
||
from flask import Blueprint, request, jsonify
|
||
from datetime import datetime, timedelta
|
||
import logging
|
||
import re
|
||
from database import db
|
||
|
||
# Blueprint carrying every /api/rank endpoint in this module.
rank_bp = Blueprint('rank', __name__, url_prefix='/api/rank')

# Database collections (db comes from the project-local `database` module).
collection = db['Ranking_storage']  # primary data source: per-day ranking docs carrying a `data` array
rankings_management_collection = db['Rankings_management']  # management DB, source for field synchronisation
||
def format_playcount(playcount_str):
    """Normalise a play-count value to a plain integer.

    Accepts ints/floats (returned as int), numeric strings, and Chinese
    unit-suffixed strings such as "1.5亿" (x1e8) or "3万" (x1e4).

    Args:
        playcount_str: raw play-count value from the crawler/DB.

    Returns:
        int play count; 0 for empty or unparseable input.
    """
    if not playcount_str:
        return 0

    try:
        if isinstance(playcount_str, (int, float)):
            return int(playcount_str)

        playcount_str = str(playcount_str).strip()

        # Chinese magnitude suffixes: 亿 = 10^8, 万 = 10^4.
        if "亿" in playcount_str:
            num = float(re.findall(r'[\d.]+', playcount_str)[0])
            return int(num * 100000000)
        elif "万" in playcount_str:
            num = float(re.findall(r'[\d.]+', playcount_str)[0])
            return int(num * 10000)
        else:
            # Plain numeric string (possibly with a decimal part).
            return int(float(playcount_str))
    except (ValueError, IndexError, TypeError):
        # FIX: was a bare `except:` which also swallowed SystemExit /
        # KeyboardInterrupt. ValueError covers bad floats, IndexError covers
        # a suffix with no digits (re.findall(...)[0]), TypeError covers
        # odd input types.
        return 0
|
||
|
||
def format_time(time_obj):
    """Render a timestamp as 'YYYY-MM-DD HH:MM:SS'.

    Falsy input yields "", datetime objects are formatted, anything else is
    returned via str().
    """
    if not time_obj:
        return ""
    if isinstance(time_obj, datetime):
        return time_obj.strftime("%Y-%m-%d %H:%M:%S")
    return str(time_obj)
|
||
|
||
def parse_date_string(date_str):
    """Parse a 'YYYY-MM-DD' string into a date.

    Non-string input is passed through unchanged (callers may already hold a
    date object); an unparseable string logs a warning and yields None.
    """
    if not isinstance(date_str, str):
        return date_str
    try:
        return datetime.strptime(date_str, '%Y-%m-%d').date()
    except (ValueError, TypeError):
        logging.warning(f"无法解析日期字符串: {date_str}")
        return None
|
||
|
||
def find_management_data(query, target_date=None):
    """Look up a Rankings_management document, preferring the mix_id key.

    Args:
        query: dict of candidate match fields; may contain mix_id, mix_name, ...
        target_date: optional date (or 'YYYY-MM-DD' string) restricting the
            match to documents created/updated on that day.

    Returns:
        The matched document, or None when nothing matches or an error occurs.
    """

    def _add_same_day_filter(mongo_query, day):
        """Mutate mongo_query to require created_at OR last_updated within `day`."""
        if isinstance(day, str):
            day = parse_date_string(day)
        if not day:
            # Unparseable date: leave the query unfiltered (matches old behavior).
            return
        start_of_day = datetime.combine(day, datetime.min.time())
        end_of_day = datetime.combine(day, datetime.max.time())
        mongo_query.update({
            "$or": [
                {"created_at": {"$gte": start_of_day, "$lte": end_of_day}},
                {"last_updated": {"$gte": start_of_day, "$lte": end_of_day}}
            ]
        })

    try:
        # Primary path: mix_id is the stable key, use it when available.
        if 'mix_id' in query and query['mix_id']:
            mix_id_query = {"mix_id": query['mix_id']}

            if target_date:
                _add_same_day_filter(mix_id_query, target_date)

            result = rankings_management_collection.find_one(mix_id_query)
            if result:
                logging.info(f"通过mix_id找到管理数据: {query['mix_id']}")
                return result

        # Fallback: retry with the remaining fields (e.g. mix_name) when the
        # mix_id lookup missed or no mix_id was supplied.
        fallback_query = {k: v for k, v in query.items() if k != 'mix_id'}

        if target_date and fallback_query:
            _add_same_day_filter(fallback_query, target_date)

        if fallback_query:
            result = rankings_management_collection.find_one(fallback_query)
            if result:
                logging.info(f"通过备用查询找到管理数据: {fallback_query}")
                return result

        logging.warning(f"未找到匹配的管理数据: {query}")
        return None

    except Exception as e:
        logging.error(f"查询管理数据时出错: {e}")
        return None
|
||
|
||
def sort_ranking_data(ranking_data, sort_by, sort_order='desc'):
    """Sort a ranking list by the requested metric and re-number it.

    Args:
        ranking_data: list of ranking item dicts.
        sort_by: one of play_vv_change, play_vv_change_rate, play_vv, rank
            (anything else falls back to rank).
        sort_order: 'asc' or 'desc'.

    Returns:
        A new sorted list whose items carry `current_sort_rank`; on any error
        the input list is returned untouched.
    """
    # Dispatch table replaces the if/elif chain; unknown keys sort by rank.
    key_getters = {
        'play_vv_change': lambda it: it.get('timeline_data', {}).get('play_vv_change', 0),
        'play_vv_change_rate': lambda it: it.get('timeline_data', {}).get('play_vv_change_rate', 0),
        'play_vv': lambda it: it.get('play_vv', 0),
        'rank': lambda it: it.get('rank', 999999),
    }
    key_fn = key_getters.get(sort_by, key_getters['rank'])

    try:
        # For rank, "desc" means best-first, i.e. ascending rank numbers.
        if sort_by == 'rank':
            descending = (sort_order == 'asc')
        else:
            descending = (sort_order == 'desc')

        ordered = sorted(ranking_data, key=key_fn, reverse=descending)

        # Stamp the 1-based position under the current ordering.
        for position, entry in enumerate(ordered, start=1):
            entry['current_sort_rank'] = position

        return ordered

    except Exception as e:
        logging.error(f"排序榜单数据失败: {e}")
        return ranking_data
|
||
|
||
|
||
|
||
def format_interaction_count(count):
    """Format an interaction count into a short human-readable string.

    >=1e8 becomes "N.N亿", >=1e4 becomes "N.N万", otherwise the plain number.

    Args:
        count: anything int() accepts.

    Returns:
        Formatted string; "0" when the value cannot be converted.
    """
    try:
        count = int(count)
        if count >= 100000000:  # 1亿+
            return f"{count / 100000000:.1f}亿"
        elif count >= 10000:  # 1万+
            return f"{count / 10000:.1f}万"
        else:
            return str(count)
    except (ValueError, TypeError):
        # FIX: was a bare `except:`; only conversion failures are expected here.
        return "0"
|
||
|
||
def format_mix_item(doc, target_date=None):
    """Project one mix document onto the API response schema.

    Exposes the raw database fields verbatim and derives like/comment totals
    from episode_details. `target_date` is accepted for interface
    compatibility but is not used.
    """
    episodes = doc.get("episode_details", [])

    # Aggregate interactions across all episodes (0 when the list is empty).
    total_likes = 0
    total_comments = 0
    if episodes:
        total_likes = sum(ep.get("likes", 0) for ep in episodes)
        total_comments = sum(len(ep.get("comments", [])) for ep in episodes)

    payload = {
        "_id": str(doc.get("_id", "")),
        "batch_time": format_time(doc.get("batch_time")),
        "mix_name": doc.get("mix_name", ""),
        "video_url": doc.get("video_url", ""),
        "playcount": doc.get("playcount", ""),
        "play_vv": doc.get("play_vv", 0),
        "request_id": doc.get("request_id", ""),
        "rank": doc.get("rank", 0),
        "cover_image_url": doc.get("cover_image_url", ""),
        # Fields synced from the management database.
        "series_author": doc.get("series_author", ""),
        "Manufacturing_Field": doc.get("Manufacturing_Field", ""),
        "Copyright_field": doc.get("Copyright_field", ""),
        "desc": doc.get("desc", ""),
        "updated_to_episode": doc.get("updated_to_episode", 0),
        "cover_backup_urls": doc.get("cover_backup_urls", []),
        "mix_id": doc.get("mix_id", ""),
        "episode_video_ids": doc.get("episode_video_ids", []),
        "episode_details": doc.get("episode_details", []),
        # Derived interaction totals.
        "total_likes": total_likes,
        "total_likes_formatted": format_interaction_count(total_likes),
        "total_comments": total_comments,
        "total_comments_formatted": format_interaction_count(total_comments),
        # Play-count change history.
        "timeline_data": doc.get("timeline_data", []),
    }
    return payload
|
||
|
||
def get_mix_list(page=1, limit=20, sort_by="playcount", classification_type=None):
    """Paginated mix list read from the `data` array of today's Ranking_storage doc.

    Args:
        page, limit: pagination controls.
        sort_by: "playcount" (default) or "growth" (delegates to get_growth_mixes).
        classification_type: optional 'novel' | 'anime' | 'drama' filter.

    Returns:
        dict payload with data, pagination and data-source metadata; on error
        a {"success": False, ...} dict.
    """
    try:
        skip = (page - 1) * limit

        if sort_by == "growth":
            # Growth ordering lives in a separate, date-aware code path.
            return get_growth_mixes(page, limit)
        else:
            # Today's ranking doc is the data source for all non-growth sorts.
            today = datetime.now().date()
            today_str = today.strftime("%Y-%m-%d")

            # Latest ranking doc for today that carries play-count data.
            ranking_doc = collection.find_one({
                "date": today_str,
                "type": {"$in": ["comprehensive", "playcount"]}
            }, sort=[("calculation_sequence", -1)])  # newest calculation wins

            if not ranking_doc or "data" not in ranking_doc:
                # Nothing generated for today yet — return an empty page.
                logging.warning(f"Ranking_storage中未找到 {today_str} 的数据")
                return {
                    "success": True,
                    "message": f"暂无 {today_str} 的数据,请等待定时任务生成",
                    "data": [],
                    "pagination": {
                        "page": page,
                        "limit": limit,
                        "total": 0,
                        "pages": 0,
                        "has_next": False,
                        "has_prev": False
                    },
                    "sort_by": sort_by,
                    "data_source": "ranking_storage",
                    "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                }

            mix_data = ranking_doc.get("data", [])

            # Optional category filter: keep items whose mix_id appears in the
            # category's id list on the item itself.
            if classification_type:
                filtered_data = []
                classification_field_map = {
                    'novel': 'Novel_IDs',
                    'anime': 'Anime_IDs',
                    'drama': 'Drama_IDs'
                }

                if classification_type in classification_field_map:
                    field_name = classification_field_map[classification_type]

                    for item in mix_data:
                        mix_id = item.get('mix_id')
                        if mix_id:
                            classification_ids = item.get(field_name, [])
                            if isinstance(classification_ids, list) and mix_id in classification_ids:
                                filtered_data.append(item)

                # NOTE(review): an unknown classification_type empties the
                # result (filtered_data stays []) — confirm that is intended.
                mix_data = filtered_data
                logging.info(f"分类筛选 {classification_type}: 筛选出 {len(mix_data)} 条数据")

            # In-memory sort by current play count.
            if sort_by == "playcount":
                mix_data = sorted(mix_data, key=lambda x: x.get("play_vv", 0), reverse=True)

            total = len(mix_data)
            paginated_data = mix_data[skip:skip + limit]

            # Stamp page-global ranks and format each item.
            formatted_data = []
            for i, item in enumerate(paginated_data):
                item["rank"] = skip + i + 1
                # Legacy records store the name under "title".
                if "mix_name" not in item and "title" in item:
                    item["mix_name"] = item["title"]

                formatted_item = format_mix_item(item)
                formatted_data.append(formatted_item)

            return {
                "success": True,
                "data": formatted_data,
                "pagination": {
                    "page": page,
                    "limit": limit,
                    "total": total,
                    "pages": (total + limit - 1) // limit,
                    "has_next": page * limit < total,
                    "has_prev": page > 1
                },
                "sort_by": sort_by,
                "data_source": "ranking_storage",
                "update_time": ranking_doc.get("created_at", datetime.now()).strftime("%Y-%m-%d %H:%M:%S") if isinstance(ranking_doc.get("created_at"), datetime) else str(ranking_doc.get("created_at", ""))
            }

    except Exception as e:
        logging.error(f"获取合集列表失败: {e}")
        return {"success": False, "message": f"获取数据失败: {str(e)}"}
|
||
|
||
def get_yesterday_classification_data(mix_name, field_name):
    """Fetch yesterday's classification id list for a mix, if any.

    Args:
        mix_name: series title used as the lookup key.
        field_name: classification field (Novel_IDs, Anime_IDs, Drama_IDs).

    Returns:
        The non-empty id list from yesterday's Ranking_storage entry, or None.
    """
    try:
        yesterday_str = (datetime.now().date() - timedelta(days=1)).strftime("%Y-%m-%d")

        # Yesterday's storage doc whose data array mentions this mix.
        doc = collection.find_one({
            "date": yesterday_str,
            "data.mix_name": mix_name
        })
        if not doc:
            return None

        # Scan the embedded array for the matching entry with usable ids.
        for entry in doc.get("data", []):
            if entry.get("mix_name") != mix_name:
                continue
            ids = entry.get(field_name, [])
            if isinstance(ids, list) and ids:
                logging.info(f"从昨天数据获取到分类信息: {mix_name} -> {field_name}: {ids}")
                return ids

        return None
    except Exception as e:
        logging.error(f"获取昨天分类数据失败: {e}")
        return None
|
||
|
||
def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None, classification_type=None):
    """Growth-sorted mix list read from Ranking_storage for one target date.

    Args:
        page, limit: pagination controls.
        start_date, end_date: 'YYYY-MM-DD' strings or date objects; end_date
            wins when both are given, today is used when neither is.
        classification_type: optional 'novel' | 'anime' | 'drama' filter.

    Returns:
        dict payload with data, pagination, date_range and data-source
        metadata; {"success": False, ...} on error.
    """
    try:
        skip = (page - 1) * limit

        # Resolve a single query date; the frontend's value is used verbatim.
        if start_date and end_date:
            # end_date takes priority when both bounds are supplied.
            if isinstance(end_date, str):
                target_date = end_date
            else:
                target_date = end_date.strftime("%Y-%m-%d")
        elif end_date:
            if isinstance(end_date, str):
                target_date = end_date
            else:
                target_date = end_date.strftime("%Y-%m-%d")
        elif start_date:
            if isinstance(start_date, str):
                target_date = start_date
            else:
                target_date = start_date.strftime("%Y-%m-%d")
        else:
            # No date supplied: default to today.
            target_date = datetime.now().date().strftime("%Y-%m-%d")

        logging.info(f"📅 查询日期: {target_date}")

        # Auto-sync: if the day's Ranking_storage rows lack management fields,
        # pull them in before serving. (sync_ranking_storage_fields is defined
        # elsewhere in this module — not visible in this chunk.)
        sample_item = collection.find_one({
            "date": target_date,
            "mix_name": {"$exists": True}
        })

        if sample_item:
            missing_manufacturing = sample_item.get('Manufacturing_Field') is None
            missing_copyright = sample_item.get('Copyright_field') is None

            if missing_manufacturing or missing_copyright:
                logging.info(f"检测到 {target_date} 的Ranking_storage数据缺少字段信息,开始自动同步...")
                sync_result = sync_ranking_storage_fields(target_date, force_update=False)
                if sync_result["success"]:
                    logging.info(f"自动同步完成: {sync_result['stats']}")
                else:
                    logging.warning(f"自动同步失败: {sync_result['message']}")

        # Prefer the 'comprehensive' ranking (carries growth data) ...
        growth_ranking = collection.find_one({
            "date": target_date,
            "type": "comprehensive"
        }, sort=[("calculation_sequence", -1)])  # newest calculation wins

        if not growth_ranking or "data" not in growth_ranking:
            # ... falling back to a dedicated 'growth' ranking document.
            growth_ranking = collection.find_one({
                "date": target_date,
                "type": "growth"
            }, sort=[("calculation_sequence", -1)])

        if growth_ranking and "data" in growth_ranking:
            logging.info(f"📈 从Ranking_storage读取 {target_date} 的增长榜数据")

            growth_data = growth_ranking["data"]

            # 'comprehensive' docs are not pre-sorted by growth; order here.
            if growth_ranking.get("type") == "comprehensive":
                growth_data = sorted(growth_data,
                                     key=lambda x: x.get("timeline_data", {}).get("play_vv_change", 0),
                                     reverse=True)

            # Optional category filter; yesterday's snapshot is the fallback
            # source of classification ids when today's item has none.
            if classification_type:
                classification_field_map = {
                    "novel": "Novel_IDs",
                    "anime": "Anime_IDs",
                    "drama": "Drama_IDs"
                }

                if classification_type in classification_field_map:
                    field_name = classification_field_map[classification_type]
                    filtered_data = []

                    for item in growth_data:
                        mix_name = item.get("mix_name", "")
                        mix_id = item.get("mix_id", "")

                        current_classification_ids = item.get(field_name, [])

                        if isinstance(current_classification_ids, list) and current_classification_ids:
                            # Today's record already carries classification ids.
                            if mix_id and mix_id in current_classification_ids:
                                filtered_data.append(item)
                            elif not mix_id and mix_name:
                                # No mix_id — trust the presence of the list.
                                filtered_data.append(item)
                        else:
                            # No ids today — try yesterday's snapshot.
                            if mix_name:
                                yesterday_classification_ids = get_yesterday_classification_data(mix_name, field_name)
                                if yesterday_classification_ids:
                                    if mix_id and mix_id in yesterday_classification_ids:
                                        filtered_data.append(item)
                                    elif not mix_id:
                                        filtered_data.append(item)
                                        logging.info(f"使用昨天分类数据: {mix_name} -> {field_name}")

                    growth_data = filtered_data

            total = len(growth_data)
            paginated_data = growth_data[skip:skip + limit]

            # Enrich the current page with synced fields; rank is page-global.
            for i, item in enumerate(paginated_data):
                item["rank"] = skip + i + 1
                # Keep mix_name as-is; do not overwrite with an empty title.
                mix_name = item.get("mix_name", "")

                if mix_name:

                    # Fast path: read the fields already synced into
                    # Ranking_storage for this date.
                    ranking_storage_item = collection.find_one({
                        "date": target_date,
                        "mix_name": mix_name
                    })

                    if ranking_storage_item:
                        item.update({
                            "Manufacturing_Field": ranking_storage_item.get("Manufacturing_Field", ""),
                            "Copyright_field": ranking_storage_item.get("Copyright_field", ""),
                            "series_author": ranking_storage_item.get("series_author", item.get("series_author", "")),
                            "video_id": ranking_storage_item.get("video_id", item.get("video_id", "")),
                            "video_url": ranking_storage_item.get("video_url", item.get("video_url", "")),
                            # Keep the cover/play-count values coming from the
                            # ranking calculation on the item itself.
                            "cover_image_url": item.get("cover_image_url", ranking_storage_item.get("cover_image_url", "")),
                            "play_vv": item.get("play_vv", ranking_storage_item.get("play_vv", 0)),
                            "playcount_str": item.get("playcount_str", ranking_storage_item.get("playcount_str", "0"))
                        })
                        logging.info(f"从Ranking_storage获取到同步字段: {mix_name}")
                    else:
                        # Fallback: no synced record — query the management DB.
                        logging.warning(f"Ranking_storage中未找到 {mix_name} 的记录,回退到原有查询逻辑")

                        # NOTE(review): is_historical_date is computed but never
                        # used below — confirm whether it was meant to gate the
                        # management lookup.
                        today = datetime.now().date()
                        try:
                            target_date_obj = datetime.strptime(target_date, "%Y-%m-%d").date()
                            is_historical_date = target_date_obj < today
                        except:
                            is_historical_date = False

                        management_doc = None

                        # Single source for the fallback: Rankings_management.
                        management_doc = rankings_management_collection.find_one({"mix_name": mix_name})

                        if management_doc:
                            item.update({
                                "Manufacturing_Field": management_doc.get("Manufacturing_Field", ""),
                                "Copyright_field": management_doc.get("Copyright_field", ""),
                                "series_author": management_doc.get("series_author", item.get("series_author", "")),
                                "video_id": management_doc.get("video_id", item.get("video_id", "")),
                                "video_url": management_doc.get("video_url", item.get("video_url", "")),
                                "cover_image_url": item.get("cover_image_url", management_doc.get("cover_image_url", "")),
                                "play_vv": item.get("play_vv", management_doc.get("play_vv", 0)),
                                "playcount_str": item.get("playcount_str", management_doc.get("playcount_str", "0"))
                            })
                        else:
                            # Last resort: empty defaults keep the schema stable.
                            item.update({
                                "Manufacturing_Field": "",
                                "Copyright_field": "",
                                "series_author": item.get("series_author", ""),
                                "video_id": item.get("video_id", ""),
                                "video_url": item.get("video_url", ""),
                                "cover_image_url": item.get("cover_image_url", ""),
                                "play_vv": item.get("play_vv", 0),
                                "playcount_str": item.get("playcount_str", "0")
                            })
                else:
                    item["Manufacturing_Field"] = ""
                    item["Copyright_field"] = ""

            # Format every item, computing like/comment totals.
            formatted_data = []
            for item in paginated_data:
                formatted_item = format_mix_item(item)
                formatted_data.append(formatted_item)

            return {
                "success": True,
                "data": formatted_data,
                "pagination": {
                    "page": page,
                    "limit": limit,
                    "total": total,
                    "pages": (total + limit - 1) // limit,
                    "has_next": page * limit < total,
                    "has_prev": page > 1
                },
                "sort_by": "growth",
                "date_range": {
                    "start_date": target_date,
                    "end_date": target_date
                },
                "data_source": "ranking_storage",  # marks the data origin
                "update_time": growth_ranking.get("created_at", datetime.now()).strftime("%Y-%m-%d %H:%M:%S") if isinstance(growth_ranking.get("created_at"), datetime) else str(growth_ranking.get("created_at", ""))
            }
        else:
            # No pre-computed growth ranking for this date yet.
            logging.warning(f"Ranking_storage中未找到 {target_date} 的增长榜数据")
            return {
                "success": True,
                "message": f"暂无 {target_date} 的增长榜数据,请等待定时任务生成",
                "data": [],
                "pagination": {
                    "page": page,
                    "limit": limit,
                    "total": 0,
                    "pages": 0,
                    "has_next": False,
                    "has_prev": False
                },
                "sort_by": "growth",
                "date_range": {
                    "start_date": target_date,
                    "end_date": target_date
                },
                "data_source": "ranking_storage",
                "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }

    except Exception as e:
        logging.error(f"获取增长合集列表失败: {e}")
        # Deliberately no fallback to play-count ordering: surface the error.
        return {
            "success": False,
            "message": f"获取增长榜数据失败: {str(e)}",
            "data": [],
            "pagination": {
                "page": page,
                "limit": limit,
                "total": 0,
                "pages": 0,
                "has_next": False,
                "has_prev": False
            },
            "sort_by": "growth",
            "data_source": "ranking_storage"
        }
|
||
|
||
def get_top_mixes(limit=10):
    """Return the top-N mixes ordered by play_vv (descending)."""
    try:
        docs = list(collection.find().sort("play_vv", -1).limit(limit))
        if not docs:
            return {"success": False, "message": "暂无数据"}

        # Shape each document via the shared formatter.
        top_list = [format_mix_item(doc) for doc in docs]

        return {
            "success": True,
            "data": top_list,
            "total": len(top_list),
            "update_time": format_time(docs[0].get("batch_time")) if docs else ""
        }
    except Exception as e:
        logging.error(f"获取热门合集失败: {e}")
        return {"success": False, "message": f"获取数据失败: {str(e)}"}
|
||
|
||
def search_mixes(keyword, page=1, limit=10):
    """Fuzzy-search mixes by name (case-insensitive regex), best play_vv first.

    Args:
        keyword: search term; empty input short-circuits with an error payload.
        page, limit: pagination controls.

    Returns:
        dict payload with results, keyword echo and pagination metadata.
    """
    try:
        if not keyword:
            return {"success": False, "message": "请提供搜索关键词"}

        skip = (page - 1) * limit

        # Case-insensitive substring match on the mix name.
        search_condition = {
            "mix_name": {"$regex": keyword, "$options": "i"}
        }

        docs = list(
            collection.find(search_condition)
            .sort("play_vv", -1)
            .skip(skip)
            .limit(limit)
        )

        # Total matches across all pages.
        total = collection.count_documents(search_condition)

        search_results = [format_mix_item(doc) for doc in docs]

        return {
            "success": True,
            "data": search_results,
            "keyword": keyword,
            "pagination": {
                "page": page,
                "limit": limit,
                "total": total,
                "pages": (total + limit - 1) // limit,
                "has_next": page * limit < total,
                "has_prev": page > 1
            },
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }

    except Exception as e:
        logging.error(f"搜索合集失败: {e}")
        return {"success": False, "message": f"搜索失败: {str(e)}"}
|
||
|
||
def get_mix_detail(mix_id):
    """Fetch one mix document by ObjectId, falling back to name / request id.

    Args:
        mix_id: an ObjectId hex string, or a mix_name / request_id value.

    Returns:
        dict payload with the formatted document, or an error message.
    """
    try:
        from bson import ObjectId

        try:
            # Primary path: treat mix_id as an ObjectId hex string.
            doc = collection.find_one({"_id": ObjectId(mix_id)})
        except Exception:
            # FIX: was a bare `except:` which also swallowed SystemExit /
            # KeyboardInterrupt. An invalid ObjectId (or non-hex input) drops
            # to a name / request-id lookup instead.
            doc = collection.find_one({
                "$or": [
                    {"mix_name": mix_id},
                    {"request_id": mix_id}
                ]
            })

        if not doc:
            return {"success": False, "message": "未找到合集信息"}

        # Only raw DB fields are exposed, via the shared formatter.
        detail = format_mix_item(doc)

        return {
            "success": True,
            "data": detail,
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }

    except Exception as e:
        logging.error(f"获取合集详情失败: {e}")
        return {"success": False, "message": f"获取详情失败: {str(e)}"}
|
||
|
||
def get_statistics():
    """Aggregate corpus statistics: totals, play_vv aggregates, popularity buckets.

    Returns:
        dict payload with total_mixes, play_vv sum/avg/max/min, per-bucket
        counts and the latest batch_time; an error payload on failure.
    """
    try:
        total_mixes = collection.count_documents({})

        if total_mixes == 0:
            return {"success": False, "message": "暂无数据"}

        # One server-side pass for the play_vv aggregates.
        pipeline = [
            {
                "$group": {
                    "_id": None,
                    "total_playcount": {"$sum": "$play_vv"},
                    "avg_playcount": {"$avg": "$play_vv"},
                    "max_playcount": {"$max": "$play_vv"},
                    "min_playcount": {"$min": "$play_vv"}
                }
            }
        ]

        stats_result = list(collection.aggregate(pipeline))
        stats = stats_result[0] if stats_result else {}

        # FIX: the previous version tested a PyMongo Cursor for truthiness,
        # which is always true (dead check); find_one states the intent and
        # avoids materialising a cursor.
        latest_doc = collection.find_one({}, sort=[("batch_time", -1)])
        latest_time = format_time(latest_doc.get("batch_time")) if latest_doc else ""

        # Popularity buckets by play_vv range.
        categories = [
            {"name": "超热门", "min": 100000000, "count": 0},  # 1亿+
            {"name": "热门", "min": 50000000, "max": 99999999, "count": 0},  # 5000万-1亿
            {"name": "中等", "min": 10000000, "max": 49999999, "count": 0},  # 1000万-5000万
            {"name": "一般", "min": 0, "max": 9999999, "count": 0}  # 1000万以下
        ]

        for category in categories:
            if "max" in category:
                count = collection.count_documents({
                    "play_vv": {"$gte": category["min"], "$lte": category["max"]}
                })
            else:
                count = collection.count_documents({
                    "play_vv": {"$gte": category["min"]}
                })
            category["count"] = count

        return {
            "success": True,
            "data": {
                "total_mixes": total_mixes,
                "total_playcount": stats.get("total_playcount", 0),
                # `$avg` yields null when no doc has play_vv; `or 0` keeps
                # int() from raising on None.
                "avg_playcount": int(stats.get("avg_playcount") or 0),
                "max_playcount": stats.get("max_playcount", 0),
                "min_playcount": stats.get("min_playcount", 0),
                "categories": categories,
                "latest_update": latest_time
            },
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }

    except Exception as e:
        logging.error(f"获取统计信息失败: {e}")
        return {"success": False, "message": f"获取统计失败: {str(e)}"}
|
||
|
||
# 路由定义
|
||
@rank_bp.route('/growth_mixes')
def get_growth_mixes_route():
    """HTTP wrapper for get_growth_mixes.

    Query string: page, limit, start_date/end_date ('YYYY-MM-DD'),
    classification_type ('novel' | 'anime' | 'drama').
    """
    page = int(request.args.get('page', 1))
    limit = int(request.args.get('limit', 20))
    start_date = request.args.get('start_date')  # optional 'YYYY-MM-DD'
    end_date = request.args.get('end_date')  # optional 'YYYY-MM-DD'
    classification_type = request.args.get('classification_type')

    result = get_growth_mixes(page, limit, start_date, end_date, classification_type)
    return jsonify(result)
|
||
|
||
@rank_bp.route('/videos')
def get_videos():
    """Mix-list endpoint kept for app.py compatibility; supports category filter.

    `sort=growth` routes to the growth ranking (with optional date range);
    anything else goes through the standard list.
    """
    page = int(request.args.get('page', 1))
    limit = int(request.args.get('limit', 20))
    sort_by = request.args.get('sort', 'playcount')
    classification_type = request.args.get('classification_type')  # optional category filter

    if sort_by == 'growth':
        start_date = request.args.get('start_date')
        end_date = request.args.get('end_date')
        result = get_growth_mixes(page, limit, start_date, end_date, classification_type)
    else:
        result = get_mix_list(page, limit, sort_by, classification_type)

    return jsonify(result)
|
||
|
||
@rank_bp.route('/top')
def get_top():
    """Hot-list endpoint kept for app.py compatibility."""
    top_limit = int(request.args.get('limit', 10))
    return jsonify(get_top_mixes(top_limit))
|
||
|
||
@rank_bp.route('/search')
def search():
    """Search endpoint kept for app.py compatibility."""
    params = request.args
    keyword = params.get('q', '')
    page = int(params.get('page', 1))
    limit = int(params.get('limit', 10))
    return jsonify(search_mixes(keyword, page, limit))
|
||
|
||
@rank_bp.route('/detail')
def get_detail():
    """Detail endpoint kept for app.py compatibility."""
    requested_id = request.args.get('id', '')
    return jsonify(get_mix_detail(requested_id))
|
||
|
||
@rank_bp.route('/stats')
def get_stats():
    """Statistics endpoint kept for app.py compatibility."""
    return jsonify(get_statistics())
|
||
|
||
@rank_bp.route('/health')
def health_check():
    """Health probe kept for app.py compatibility: pings Mongo, reports record count."""
    try:
        from database import client

        # Guard against a missing client object before attempting the ping.
        if not client:
            return jsonify({"success": False, "message": "数据库未连接"})

        # A round-trip ping proves the connection is alive, not just constructed.
        client.admin.command('ping')

        total_count = collection.count_documents({})

        return jsonify({
            "success": True,
            "message": "服务正常",
            "data": {
                "database": "连接正常",
                "total_records": total_count,
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
        })
    except Exception as e:
        logging.error(f"健康检查失败: {e}")
        return jsonify({"success": False, "message": f"服务异常: {str(e)}"})
|
||
|
||
|
||
# ==================== 榜单查询API接口 ====================
|
||
|
||
@rank_bp.route('/rankings')
def get_rankings():
    """Ranking list endpoint: filter by date/type, dynamic per-field sorting.

    Query string: date ('YYYY-MM-DD'), type (playcount/growth/newcomer),
    sort_by (default/play_vv_change/play_vv_change_rate/play_vv/rank),
    sort_order (asc/desc), page, limit.
    """
    try:
        date = request.args.get('date')  # 'YYYY-MM-DD'
        ranking_type = request.args.get('type')  # playcount, growth, newcomer
        sort_by = request.args.get('sort_by', 'default')
        sort_order = request.args.get('sort_order', 'desc')
        page = int(request.args.get('page', 1))
        limit = int(request.args.get('limit', 50))

        # Build the Mongo filter from whatever was supplied.
        query = {}
        if date:
            query['date'] = date
        if ranking_type:
            query['ranking_type'] = ranking_type

        # No date given: pin the query to the most recent ranking date.
        if not date:
            latest_ranking = collection.find_one(
                {}, sort=[('date', -1)]
            )
            if latest_ranking:
                query['date'] = latest_ranking['date']

        rankings = list(collection.find(query).sort('generated_at', -1))

        if not rankings:
            return jsonify({
                "success": True,
                "message": "暂无榜单数据",
                "data": {
                    "rankings": [],
                    "total": 0,
                    "page": page,
                    "limit": limit
                }
            })

        formatted_rankings = []
        for ranking in rankings:
            ranking_data = ranking.get('data', [])

            # Optional in-memory re-sort before pagination.
            if sort_by != 'default' and ranking_data:
                ranking_data = sort_ranking_data(ranking_data, sort_by, sort_order)

            # Paginate the embedded data array, not the ranking documents.
            start_idx = (page - 1) * limit
            end_idx = start_idx + limit
            paginated_data = ranking_data[start_idx:end_idx]

            formatted_rankings.append({
                "date": ranking.get('date'),
                "ranking_type": ranking.get('ranking_type'),
                "ranking_name": ranking.get('ranking_name'),
                "description": ranking.get('description'),
                "data": paginated_data,
                "total_count": len(ranking_data),
                "current_page_count": len(paginated_data),
                "generated_at": format_time(ranking.get('generated_at')),
                "version": ranking.get('version', '1.0'),
                "sort_info": {
                    "sort_by": sort_by,
                    "sort_order": sort_order
                }
            })

        return jsonify({
            "success": True,
            "message": "获取榜单成功",
            "data": {
                "rankings": formatted_rankings,
                "total": len(formatted_rankings),
                "page": page,
                "limit": limit,
                "sort_by": sort_by,
                "sort_order": sort_order
            }
        })

    except Exception as e:
        logging.error(f"获取榜单失败: {e}")
        return jsonify({"success": False, "message": f"获取榜单失败: {str(e)}"})
|
||
|
||
|
||
@rank_bp.route('/rankings/dates')
def get_ranking_dates():
    """Return every distinct ranking date, newest first."""
    try:
        available = sorted(collection.distinct('date'), reverse=True)

        return jsonify({
            "success": True,
            "message": "获取日期列表成功",
            "data": {
                "dates": available,
                "total": len(available)
            }
        })

    except Exception as e:
        logging.error(f"获取日期列表失败: {e}")
        return jsonify({"success": False, "message": f"获取日期列表失败: {str(e)}"})
|
||
|
||
|
||
@rank_bp.route('/rankings/types')
def get_ranking_types():
    """Return the distinct ranking types present in storage, with descriptions."""
    try:
        types = collection.distinct('ranking_type')

        # Human-readable labels; unknown types fall back to their own name.
        type_descriptions = {
            'playcount': '播放量榜 - 按播放量排序',
            'growth': '增长榜 - 播放量增长最快',
            'newcomer': '新晋榜 - 新上榜内容'
        }

        formatted_types = [
            {
                "type": type_name,
                "description": type_descriptions.get(type_name, type_name)
            }
            for type_name in types
        ]

        return jsonify({
            "success": True,
            "message": "获取榜单类型成功",
            "data": {
                "types": formatted_types,
                "total": len(formatted_types)
            }
        })

    except Exception as e:
        logging.error(f"获取榜单类型失败: {e}")
        return jsonify({"success": False, "message": f"获取榜单类型失败: {str(e)}"})
|
||
|
||
|
||
@rank_bp.route('/rankings/latest')
def get_latest_rankings():
    """Return every ranking type for the most recent date, trimmed to 20 items each."""
    try:
        # Find the newest date in storage.
        latest_ranking = collection.find_one(
            {}, sort=[('date', -1)]
        )

        if not latest_ranking:
            return jsonify({
                "success": True,
                "message": "暂无榜单数据",
                "data": {
                    "date": None,
                    "rankings": []
                }
            })

        latest_date = latest_ranking['date']

        # All ranking docs for that date, ordered by type for a stable layout.
        rankings = list(collection.find({
            'date': latest_date
        }).sort('ranking_type', 1))

        formatted_rankings = []
        for ranking in rankings:
            # Preview only: cap each ranking at its first 20 entries.
            ranking_data = ranking.get('data', [])[:20]

            formatted_rankings.append({
                "ranking_type": ranking.get('ranking_type'),
                "ranking_name": ranking.get('ranking_name'),
                "description": ranking.get('description'),
                "data": ranking_data,
                "total_count": ranking.get('total_count', 0),
                "preview_count": len(ranking_data)
            })

        return jsonify({
            "success": True,
            "message": "获取最新榜单成功",
            "data": {
                "date": latest_date,
                "rankings": formatted_rankings,
                "total_types": len(formatted_rankings)
            }
        })

    except Exception as e:
        logging.error(f"获取最新榜单失败: {e}")
        return jsonify({"success": False, "message": f"获取最新榜单失败: {str(e)}"})
|
||
|
||
|
||
@rank_bp.route('/rankings/stats')
def get_rankings_stats():
    """Summarise stored ranking data: document counts and the covered date span."""
    try:
        total_rankings = collection.count_documents({})
        total_dates = len(collection.distinct('date'))
        total_types = len(collection.distinct('ranking_type'))

        # Newest / oldest documents bound the date range.
        newest = collection.find_one({}, sort=[('date', -1)])
        oldest = collection.find_one({}, sort=[('date', 1)])

        latest_date = newest['date'] if newest else None
        earliest_date = oldest['date'] if oldest else None

        if earliest_date and latest_date:
            date_range = f"{earliest_date} 至 {latest_date}"
        else:
            date_range = "暂无数据"

        return jsonify({
            "success": True,
            "message": "获取榜单统计成功",
            "data": {
                "total_rankings": total_rankings,
                "total_dates": total_dates,
                "total_types": total_types,
                "latest_date": latest_date,
                "earliest_date": earliest_date,
                "date_range": date_range
            }
        })

    except Exception as e:
        logging.error(f"获取榜单统计失败: {e}")
        return jsonify({"success": False, "message": f"获取榜单统计失败: {str(e)}"})
|
||
|
||
|
||
@rank_bp.route('/update_drama_info', methods=['POST'])
def update_drama_info():
    """Update drama details and mirror the change into both collections.

    Writes the submitted fields to Rankings_management and to the matching
    entries inside Ranking_storage's ``data`` arrays.  Fields a user edits
    that must survive later automatic syncs additionally get a lock flag
    recorded under ``field_lock_status``.
    """
    try:
        payload = request.get_json()

        # mix_name is the lookup key and therefore mandatory.
        if not payload or 'mix_name' not in payload:
            return jsonify({"success": False, "message": "缺少必需参数 mix_name"})

        mix_name = payload['mix_name']
        # Optional: used to decide whether this write targets today's data.
        target_date = payload.get('target_date')

        # (field, needs_lock) in the order they are reported back to the
        # caller.  Lockable fields become user-owned once edited here.
        editable_fields = (
            ('title', False),
            ('series_author', False),
            ('Manufacturing_Field', True),
            ('Copyright_field', True),
            ('desc', False),
            ('play_vv', False),
            ('cover_image_url', False),
            ('cover_backup_urls', False),
            ('timeline_data', False),
            ('Novel_IDs', True),
            ('Anime_IDs', True),
            ('Drama_IDs', True),
        )

        update_fields = {}
        field_lock_updates = {}
        for field, needs_lock in editable_fields:
            if field in payload:
                update_fields[field] = payload[field]
                if needs_lock:
                    field_lock_updates[f'field_lock_status.{field}_locked'] = True

        if not update_fields:
            return jsonify({"success": False, "message": "没有提供需要更新的字段"})

        # Treat the request as "today" data unless an older date was given.
        today = datetime.now().date().strftime('%Y-%m-%d')
        is_today_data = target_date == today if target_date else True

        updated_count = 0

        # Bail out early when the drama is unknown to the management DB.
        if not rankings_management_collection.find_one({"mix_name": mix_name}):
            return jsonify({
                "success": False,
                "message": f"未找到短剧: {mix_name}"
            })

        # 1. Rankings_management: plain fields plus the lock flags.
        mgmt_update_data = {**update_fields, **field_lock_updates}
        result_mgmt = rankings_management_collection.update_many(
            {"mix_name": mix_name},
            {"$set": mgmt_update_data}
        )

        # 2. Ranking_storage: same values addressed through the data array
        #    via the positional operator.
        storage_update_data = {
            f"data.$.{key}": value
            for key, value in {**update_fields, **field_lock_updates}.items()
        }
        result_storage = collection.update_many(
            {"data.mix_name": mix_name},
            {"$set": storage_update_data}
        )

        updated_count = result_mgmt.modified_count + result_storage.modified_count
        matched_count = result_mgmt.matched_count + result_storage.matched_count

        # Human-readable names of the fields that just became user-locked.
        locked_fields = [
            key.replace('field_lock_status.', '').replace('_locked', '')
            for key in field_lock_updates
        ]

        logging.info(f"数据更新: Rankings_management(匹配:{result_mgmt.matched_count}, 修改:{result_mgmt.modified_count}), Ranking_storage(匹配:{result_storage.matched_count}, 修改:{result_storage.modified_count})")
        if locked_fields:
            logging.info(f"字段锁定状态更新: {', '.join(locked_fields)} 已被标记为用户锁定")

        # Matching anything counts as success, even if nothing changed.
        if matched_count > 0:
            message = f"成功处理短剧 {mix_name} 的信息"
            if updated_count > 0:
                message += f",已更新 {updated_count} 条记录"
            else:
                message += ",数据无变化"

            return jsonify({
                "success": True,
                "message": message,
                "data": {
                    "mix_name": mix_name,
                    "updated_fields": list(update_fields.keys()),
                    "updated_count": updated_count,
                    "matched_count": matched_count,
                    "is_today_data": is_today_data
                }
            })
        else:
            return jsonify({
                "success": False,
                "message": f"未找到短剧 {mix_name} 的相关数据"
            })

    except Exception as e:
        logging.error(f"更新短剧信息失败: {e}")
        return jsonify({"success": False, "message": f"更新短剧信息失败: {str(e)}"})
|
||
|
||
|
||
@rank_bp.route('/update_content_classification', methods=['POST'])
def update_content_classification():
    """Add or remove a drama's mix_id in one of the three classification lists.

    Body params: mix_name (required), classification_type (required:
    'novel' | 'anime' | 'drama'), action ('add' | 'remove', default 'add'),
    exclusive (bool, default True — in exclusive mode an 'add' first strips
    the mix_id from the other two categories so a drama belongs to at most
    one).  Writes go to both Rankings_management and the Ranking_storage
    ``data`` arrays; the response includes the post-update membership map.
    """
    try:
        data = request.get_json()

        # Both the drama name and the target category are mandatory.
        if not data or 'mix_name' not in data or 'classification_type' not in data:
            return jsonify({"success": False, "message": "缺少必需参数 mix_name 或 classification_type"})

        mix_name = data['mix_name']
        classification_type = data['classification_type']  # 'novel', 'anime', 'drama'
        action = data.get('action', 'add')  # 'add' or 'remove'
        # Exclusive mode (default) guarantees one category per drama.
        exclusive = data.get('exclusive', True)

        # Reject unknown categories up front.
        valid_types = ['novel', 'anime', 'drama']
        if classification_type not in valid_types:
            return jsonify({"success": False, "message": f"无效的分类类型,支持的类型: {valid_types}"})

        # Map a category name to the DB array field holding its members.
        field_mapping = {
            'novel': 'Novel_IDs',
            'anime': 'Anime_IDs',
            'drama': 'Drama_IDs'
        }
        field_name = field_mapping[classification_type]

        # Resolve the drama's mix_id from Rankings_management, restricted to
        # records created or updated today (full-day window).
        today = datetime.now().date()
        start_of_day = datetime.combine(today, datetime.min.time())
        end_of_day = datetime.combine(today, datetime.max.time())

        mgmt_doc = rankings_management_collection.find_one({
            "mix_name": mix_name,
            "$or": [
                {"created_at": {"$gte": start_of_day, "$lte": end_of_day}},
                {"last_updated": {"$gte": start_of_day, "$lte": end_of_day}}
            ]
        })
        if not mgmt_doc:
            return jsonify({"success": False, "message": f"未找到短剧: {mix_name}"})

        mix_id = mgmt_doc.get('mix_id')
        if not mix_id:
            return jsonify({"success": False, "message": f"短剧 {mix_name} 缺少 mix_id"})

        updated_count = 0

        # Apply the requested operation.
        if action == 'add':
            # In exclusive mode, first remove the id from the other categories.
            # NOTE: removed_from_other_categories only exists on this path; the
            # later reference is guarded by short-circuiting on `exclusive`.
            if exclusive:
                # The two fields that are NOT the target category.
                other_fields = [f for f in field_mapping.values() if f != field_name]

                # Category names the id was actually pulled from (for the message).
                removed_from_other_categories = []

                # 1. Strip the id from the other categories in Rankings_management.
                for other_field in other_fields:
                    result = rankings_management_collection.update_many(
                        {"mix_name": mix_name, other_field: mix_id},
                        {"$pull": {other_field: mix_id}}
                    )
                    if result.modified_count > 0:
                        # Reverse-map the field name back to its category name.
                        for cat_type, field in field_mapping.items():
                            if field == other_field:
                                removed_from_other_categories.append(cat_type)
                                break

                # 2. Mirror the removal into the Ranking_storage data arrays.
                for other_field in other_fields:
                    collection.update_many(
                        {"data.mix_name": mix_name},
                        {"$pull": {f"data.$.{other_field}": mix_id}}
                    )

                if removed_from_other_categories:
                    logging.info(f"互斥模式:已将短剧 {mix_name} 从 {', '.join(removed_from_other_categories)} 分类中移除")
                else:
                    logging.info(f"互斥模式:短剧 {mix_name} 未在其他分类中,无需移除")

            # Add to the target category ($addToSet prevents duplicates).
            # 1. Rankings_management.
            result_mgmt = rankings_management_collection.update_many(
                {"mix_name": mix_name},
                {"$addToSet": {field_name: mix_id}}
            )

            # 2. Ranking_storage data arrays (positional operator).
            result_storage = collection.update_many(
                {"data.mix_name": mix_name},
                {"$addToSet": {f"data.$.{field_name}": mix_id}}
            )

            updated_count = result_mgmt.modified_count + result_storage.modified_count
            message = f"成功将短剧 {mix_name} 添加到 {classification_type} 分类"
            if exclusive and removed_from_other_categories:
                message += f"(已自动从 {', '.join(removed_from_other_categories)} 分类中移除)"

        elif action == 'remove':
            # Pull the id from the target category in both databases.
            # 1. Rankings_management.
            result_mgmt = rankings_management_collection.update_many(
                {"mix_name": mix_name},
                {"$pull": {field_name: mix_id}}
            )

            # 2. Ranking_storage data arrays.
            result_storage = collection.update_many(
                {"data.mix_name": mix_name},
                {"$pull": {f"data.$.{field_name}": mix_id}}
            )

            updated_count = result_mgmt.modified_count + result_storage.modified_count
            message = f"成功将短剧 {mix_name} 从 {classification_type} 分类中移除"

        else:
            return jsonify({"success": False, "message": "无效的操作类型,支持 'add' 或 'remove'"})

        logging.info(f"分类更新: {message}, Rankings_management({result_mgmt.modified_count}), Ranking_storage({result_storage.modified_count})")

        # Re-read the management record (same today-window) to report the
        # drama's membership in each category after the update.
        updated_mgmt_doc = rankings_management_collection.find_one({
            "mix_name": mix_name,
            "$or": [
                {"created_at": {"$gte": start_of_day, "$lte": end_of_day}},
                {"last_updated": {"$gte": start_of_day, "$lte": end_of_day}}
            ]
        })
        classification_status = {
            'novel': mix_id in updated_mgmt_doc.get('Novel_IDs', []) if updated_mgmt_doc else False,
            'anime': mix_id in updated_mgmt_doc.get('Anime_IDs', []) if updated_mgmt_doc else False,
            'drama': mix_id in updated_mgmt_doc.get('Drama_IDs', []) if updated_mgmt_doc else False
        }

        return jsonify({
            "success": True,
            "message": message,
            "data": {
                "mix_name": mix_name,
                "mix_id": mix_id,
                "classification_type": classification_type,
                "field_name": field_name,
                "action": action,
                "updated_count": updated_count,
                "classification_status": classification_status
            }
        })

    except Exception as e:
        logging.error(f"更新内容分类失败: {e}")
        return jsonify({"success": False, "message": f"更新内容分类失败: {str(e)}"})
|
||
|
||
|
||
@rank_bp.route('/get_content_classification', methods=['GET'])
def get_content_classification():
    """Report which classification lists (novel/anime/drama) contain a drama."""
    try:
        mix_name = request.args.get('mix_name')

        if not mix_name:
            return jsonify({"success": False, "message": "缺少必需参数 mix_name"})

        # Look the drama up in the management database.
        record = rankings_management_collection.find_one({"mix_name": mix_name})
        if not record:
            return jsonify({"success": False, "message": f"未找到短剧: {mix_name}"})

        mix_id = record.get('mix_id')
        if not mix_id:
            return jsonify({"success": False, "message": f"短剧 {mix_name} 缺少 mix_id"})

        # Membership is decided by whether mix_id sits in each ID list.
        id_lists = {
            'Novel_IDs': record.get('Novel_IDs', []),
            'Anime_IDs': record.get('Anime_IDs', []),
            'Drama_IDs': record.get('Drama_IDs', [])
        }
        classification_status = {
            'novel': mix_id in id_lists['Novel_IDs'],
            'anime': mix_id in id_lists['Anime_IDs'],
            'drama': mix_id in id_lists['Drama_IDs']
        }

        return jsonify({
            "success": True,
            "message": f"获取短剧 {mix_name} 分类状态成功",
            "data": {
                "mix_name": mix_name,
                "mix_id": mix_id,
                "classification_status": classification_status,
                "classification_details": id_lists
            }
        })

    except Exception as e:
        logging.error(f"获取内容分类状态失败: {e}")
        return jsonify({"success": False, "message": f"获取内容分类状态失败: {str(e)}"})
|
||
|
||
|
||
def validate_and_fix_classification_exclusivity():
    """
    Validate and repair classification exclusivity across both collections.

    Every drama must belong to at most one of the three classification
    lists (Novel_IDs, Anime_IDs, Drama_IDs).  When a mix_id appears in
    more than one list, the conflict is resolved by keeping exactly one
    classification with priority drama > anime > novel; the losing lists
    are rewritten without the mix_id in both Rankings_management and the
    Ranking_storage ``data`` arrays.

    Returns:
        dict: on success {"success": True, "message", "data"} where data
        holds total_checked / conflicts_found / conflicts_fixed counts;
        on failure {"success": False, "message"}.
    """
    try:
        # Field names keyed by category, in the order membership is checked.
        field_of = {
            'novel': 'Novel_IDs',
            'anime': 'Anime_IDs',
            'drama': 'Drama_IDs',
        }

        all_docs = list(rankings_management_collection.find({}))

        fixed_count = 0
        conflict_count = 0

        for doc in all_docs:
            mix_name = doc.get('mix_name', '')
            mix_id = doc.get('mix_id')

            # Without a mix_id membership cannot be checked at all.
            if not mix_id:
                continue

            # Category -> current member list for this document.
            id_lists = {cat: doc.get(field, []) for cat, field in field_of.items()}
            classifications = [cat for cat, ids in id_lists.items() if mix_id in ids]

            # Only documents listed in two or more categories need fixing.
            if len(classifications) <= 1:
                continue

            conflict_count += 1
            logging.warning(f"发现分类冲突: {mix_name} 同时属于 {classifications}")

            # Resolve with fixed priority: drama > anime > novel.
            for keep_classification in ('drama', 'anime', 'novel'):
                if keep_classification in classifications:
                    break

            # Keep the winner's list untouched; strip mix_id from the rest.
            # (Avoid shadowing the builtin `id` in the comprehension.)
            update_fields = {}
            for cat, field in field_of.items():
                if cat == keep_classification:
                    update_fields[field] = id_lists[cat]
                else:
                    update_fields[field] = [item for item in id_lists[cat] if item != mix_id]

            # mix_id is guaranteed non-empty here (checked above), so both
            # collections are addressed by it directly; the old mix_name
            # fallback branches were unreachable dead code.
            rankings_management_collection.update_one(
                {"mix_id": mix_id},
                {"$set": update_fields}
            )
            collection.update_many(
                {"data.mix_id": mix_id},
                {"$set": {
                    "data.$.Novel_IDs": update_fields['Novel_IDs'],
                    "data.$.Anime_IDs": update_fields['Anime_IDs'],
                    "data.$.Drama_IDs": update_fields['Drama_IDs']
                }}
            )

            fixed_count += 1
            logging.info(f"修复分类冲突: {mix_name} 保留为 {keep_classification} 分类")

        return {
            "success": True,
            "message": f"分类互斥性验证完成",
            "data": {
                "total_checked": len(all_docs),
                "conflicts_found": conflict_count,
                "conflicts_fixed": fixed_count
            }
        }

    except Exception as e:
        logging.error(f"验证分类互斥性失败: {e}")
        return {
            "success": False,
            "message": f"验证分类互斥性失败: {str(e)}"
        }
|
||
|
||
|
||
def sync_ranking_storage_fields(target_date=None, force_update=False, max_retries=3, retry_delay=60):
    """
    Synchronise field values inside Ranking_storage documents.

    For every item in each Ranking_storage ``data`` array on the target
    date, empty/missing fields are filled from the matching
    Rankings_management record (looked up via find_management_data,
    preferring mix_id, then mix_name, then title).  User-locked fields
    (per-item ``field_lock_status``) and user-set classification lists
    are never overwritten, and classification exclusivity is enforced
    when a category is filled from source data.

    Args:
        target_date: 'YYYY-MM-DD' date to sync; defaults to today.
        force_update: overwrite fields even when they already hold a value.
        max_retries: kept for API compatibility — the current
            implementation does not retry.
        retry_delay: kept for API compatibility (seconds); unused.

    Returns:
        dict: {"success", "message"} plus a "stats" section on success.
    """
    try:
        # Resolve the target date (string form is what the DB stores).
        if target_date is None:
            target_date_obj = datetime.now().date()
            target_date = target_date_obj.strftime('%Y-%m-%d')
        else:
            target_date_obj = datetime.strptime(target_date, '%Y-%m-%d').date()

        # Load every Ranking_storage document for that date.
        ranking_storage_query = {"date": target_date}
        ranking_storage_items = list(collection.find(ranking_storage_query))

        if not ranking_storage_items:
            return {
                "success": False,
                "message": f"未找到日期 {target_date} 的Ranking_storage数据"
            }

        # Counters reported back in "stats".
        total_items = len(ranking_storage_items)
        updated_items = 0
        skipped_items = 0
        error_items = 0
        retry_count = 0  # retained in stats; the retry loop was removed
        pending_items = []  # likewise retained; always empty now

        # Sync logic: rewrite the data array of each document item by item.
        for ranking_doc in ranking_storage_items:
            try:
                data_array = ranking_doc.get('data', [])
                if not data_array:
                    logging.warning(f"Ranking_storage文档没有data数组: {ranking_doc.get('_id')}")
                    skipped_items += 1
                    continue

                # Tracks whether anything in this document changed.
                doc_updated = False
                updated_data_array = []

                # Walk every item of the data array.
                for data_item in data_array:
                    try:
                        mix_name = data_item.get('mix_name', '').strip()

                        # Drop records whose mix_name is empty or "null" —
                        # they are not appended to updated_data_array at all.
                        if not mix_name or mix_name == "" or mix_name.lower() == "null":
                            logging.warning(f"跳过空的或无效的mix_name记录: {data_item.get('_id', 'unknown')}")
                            continue

                        # Prefer mix_id for the management lookup (more exact).
                        source_data = None
                        mix_id = data_item.get('mix_id')

                        query_conditions = {}
                        if mix_id:
                            query_conditions['mix_id'] = mix_id
                        if mix_name:
                            query_conditions['mix_name'] = mix_name

                        # Primary lookup through the shared helper.
                        if query_conditions:
                            source_data = find_management_data(query_conditions, target_date)

                        # Fallback: try matching the item's title as a mix_name.
                        if not source_data:
                            title = data_item.get('title')
                            if title and title.strip():
                                title_query = {"mix_name": title.strip()}
                                source_data = find_management_data(title_query, target_date)
                                if source_data:
                                    logging.info(f"通过title找到数据: {title} -> {source_data.get('mix_name', 'N/A')}")

                        # If found and the item's mix_name was empty, repair it
                        # from the source (unreachable in practice given the
                        # earlier empty-name skip, but kept as-is).
                        if source_data and not mix_name:
                            mix_name = source_data.get('mix_name', '').strip()
                            if mix_name:
                                data_item['mix_name'] = mix_name
                                logging.info(f"修复空的mix_name: {data_item.get('title', 'N/A')} -> {mix_name}")
                            else:
                                logging.warning(f"源数据中的mix_name也为空,跳过此记录")
                                continue  # invalid record, drop it

                        # No source data: decide whether to keep the item.
                        if not source_data:
                            logging.warning(f"无法找到对应的源数据: mix_name={mix_name}, mix_id={data_item.get('mix_id')}, title={data_item.get('title')}")

                            # Lock flags live on the item itself, not the document.
                            field_lock_status = data_item.get('field_lock_status', {})
                            has_locked_fields = any([
                                field_lock_status.get('Manufacturing_Field_locked', False),
                                field_lock_status.get('Copyright_field_locked', False),
                                field_lock_status.get('Novel_IDs_locked', False),
                                field_lock_status.get('Anime_IDs_locked', False),
                                field_lock_status.get('Drama_IDs_locked', False)
                            ])

                            # User data = any lock flag or any manually-set value.
                            has_user_data = has_locked_fields or any([
                                data_item.get('Manufacturing_Field'),
                                data_item.get('Copyright_field'),
                                data_item.get('Novel_IDs'),
                                data_item.get('Anime_IDs'),
                                data_item.get('Drama_IDs')
                            ])

                            if has_locked_fields:
                                # Locked fields survive even without source data.
                                logging.info(f"保持锁定字段不变: {mix_name} (无源数据但有锁定字段)")
                                updated_data_array.append(data_item)
                            elif has_user_data:
                                logging.info(f"保持用户设置的数据: {mix_name} (无源数据但有用户数据)")
                                updated_data_array.append(data_item)
                            else:
                                # Keep the record only if the name is valid.
                                if mix_name and mix_name.strip():
                                    updated_data_array.append(data_item)
                            continue

                        # Current values of every field the sync may fill —
                        # mirrors the Rankings_management field set.
                        fields_to_check = {
                            # Basic identifiers / bookkeeping
                            'batch_id': data_item.get('batch_id'),
                            'batch_time': data_item.get('batch_time'),
                            'item_sequence': data_item.get('item_sequence'),
                            'mix_id': data_item.get('mix_id'),
                            'playcount': data_item.get('playcount'),
                            'request_id': data_item.get('request_id'),
                            # Cover-related fields
                            'cover_image_url_original': data_item.get('cover_image_url_original'),
                            'cover_upload_success': data_item.get('cover_upload_success'),
                            'cover_backup_urls': data_item.get('cover_backup_urls'),
                            # Content fields
                            'desc': data_item.get('desc'),
                            'series_author': data_item.get('series_author'),
                            'updated_to_episode': data_item.get('updated_to_episode'),
                            'episode_video_ids': data_item.get('episode_video_ids'),
                            'episode_details': data_item.get('episode_details'),
                            # Status fields
                            'data_status': data_item.get('data_status'),
                            'realtime_saved': data_item.get('realtime_saved'),
                            'created_at': data_item.get('created_at'),
                            'last_updated': data_item.get('last_updated'),
                            'Manufacturing_Field': data_item.get('Manufacturing_Field'),
                            'Copyright_field': data_item.get('Copyright_field'),
                            # Content classification fields
                            'Novel_IDs': data_item.get('Novel_IDs', []),
                            'Anime_IDs': data_item.get('Anime_IDs', []),
                            'Drama_IDs': data_item.get('Drama_IDs', []),
                        }

                        # Per-item lock flags (again: item-level, not doc-level).
                        field_lock_status = data_item.get('field_lock_status', {})
                        manufacturing_locked = field_lock_status.get('Manufacturing_Field_locked', False)
                        copyright_locked = field_lock_status.get('Copyright_field_locked', False)
                        novel_ids_locked = field_lock_status.get('Novel_IDs_locked', False)
                        anime_ids_locked = field_lock_status.get('Anime_IDs_locked', False)
                        drama_ids_locked = field_lock_status.get('Drama_IDs_locked', False)

                        # First pass: does ANY unlocked field need filling?
                        needs_update = False
                        for field_name, field_value in fields_to_check.items():
                            # Locked fields never trigger (or receive) an update.
                            if field_name == 'Manufacturing_Field' and manufacturing_locked:
                                logging.info(f"[字段锁定] 跳过Manufacturing_Field更新: {mix_name} (已锁定)")
                                continue
                            elif field_name == 'Copyright_field' and copyright_locked:
                                logging.info(f"[字段锁定] 跳过Copyright_field更新: {mix_name} (已锁定)")
                                continue
                            elif field_name == 'Novel_IDs' and novel_ids_locked:
                                logging.info(f"[字段锁定] 跳过Novel_IDs更新: {mix_name} (已锁定)")
                                continue
                            elif field_name == 'Anime_IDs' and anime_ids_locked:
                                logging.info(f"[字段锁定] 跳过Anime_IDs更新: {mix_name} (已锁定)")
                                continue
                            elif field_name == 'Drama_IDs' and drama_ids_locked:
                                logging.info(f"[字段锁定] 跳过Drama_IDs更新: {mix_name} (已锁定)")
                                continue

                            # Array fields count as "missing" when None or empty list.
                            if field_name in ['cover_backup_urls', 'episode_video_ids', 'episode_details', 'Novel_IDs', 'Anime_IDs', 'Drama_IDs']:
                                if force_update or field_value is None or (isinstance(field_value, list) and len(field_value) == 0):
                                    needs_update = True
                                    break
                            # Scalar fields: missing when None, '' or 0.
                            elif force_update or field_value is None or field_value == '' or field_value == 0:
                                needs_update = True
                                break

                        if not needs_update:
                            updated_data_array.append(data_item)
                            continue

                        # Second pass: copy values from the source record.
                        item_updated = False
                        for field_name, current_value in fields_to_check.items():
                            # Locked fields are protected from being overwritten.
                            if field_name == 'Manufacturing_Field' and manufacturing_locked:
                                logging.info(f"[字段锁定] 保护Manufacturing_Field不被覆盖: {mix_name}")
                                continue
                            elif field_name == 'Copyright_field' and copyright_locked:
                                logging.info(f"[字段锁定] 保护Copyright_field不被覆盖: {mix_name}")
                                continue
                            elif field_name == 'Novel_IDs' and novel_ids_locked:
                                logging.info(f"[字段锁定] 保护Novel_IDs不被覆盖: {mix_name}")
                                continue
                            elif field_name == 'Anime_IDs' and anime_ids_locked:
                                logging.info(f"[字段锁定] 保护Anime_IDs不被覆盖: {mix_name}")
                                continue
                            elif field_name == 'Drama_IDs' and drama_ids_locked:
                                logging.info(f"[字段锁定] 保护Drama_IDs不被覆盖: {mix_name}")
                                continue

                            # Same missing-value rules as the first pass.
                            should_update = False
                            if field_name in ['cover_backup_urls', 'episode_video_ids', 'episode_details', 'Novel_IDs', 'Anime_IDs', 'Drama_IDs']:
                                should_update = force_update or current_value is None or (isinstance(current_value, list) and len(current_value) == 0)
                            else:
                                should_update = force_update or current_value is None or current_value == '' or current_value == 0

                            if should_update:
                                if field_name == 'episode_details':
                                    # Copied verbatim from the source record.
                                    data_item[field_name] = source_data.get(field_name, [])
                                    item_updated = True
                                elif field_name == 'cover_backup_urls':
                                    # Coerce non-list source values to an empty list.
                                    cover_backup_urls = source_data.get(field_name, [])
                                    if not isinstance(cover_backup_urls, list):
                                        cover_backup_urls = []
                                    data_item[field_name] = cover_backup_urls
                                    item_updated = True
                                elif field_name == 'episode_video_ids':
                                    # Coerce non-list source values to an empty list.
                                    episode_video_ids = source_data.get(field_name, [])
                                    if not isinstance(episode_video_ids, list):
                                        episode_video_ids = []
                                    data_item[field_name] = episode_video_ids
                                    item_updated = True
                                elif field_name in ['Novel_IDs', 'Anime_IDs', 'Drama_IDs']:
                                    # Classification fields: coerce to list and
                                    # enforce exclusivity when filled from source.
                                    classification_ids = source_data.get(field_name, [])
                                    if not isinstance(classification_ids, list):
                                        classification_ids = []

                                    # Only overwrite when the source has a value;
                                    # otherwise user-set classifications survive.
                                    if classification_ids:
                                        # Clear the two sibling categories —
                                        # but never a locked one.
                                        if field_name == 'Novel_IDs':
                                            if not anime_ids_locked:
                                                data_item['Anime_IDs'] = []
                                            if not drama_ids_locked:
                                                data_item['Drama_IDs'] = []
                                        elif field_name == 'Anime_IDs':
                                            if not novel_ids_locked:
                                                data_item['Novel_IDs'] = []
                                            if not drama_ids_locked:
                                                data_item['Drama_IDs'] = []
                                        elif field_name == 'Drama_IDs':
                                            if not novel_ids_locked:
                                                data_item['Novel_IDs'] = []
                                            if not anime_ids_locked:
                                                data_item['Anime_IDs'] = []

                                        data_item[field_name] = classification_ids
                                        item_updated = True
                                    else:
                                        # Source empty: keep any user-set value.
                                        current_classification = data_item.get(field_name, [])
                                        if current_classification and isinstance(current_classification, list) and len(current_classification) > 0:
                                            # User already classified it — leave as-is.
                                            logging.info(f"[分类保护] 保留用户设置的 {field_name}: {mix_name}")
                                        else:
                                            # Neither side has a value: normalise to [].
                                            data_item[field_name] = []
                                            item_updated = True
                                else:
                                    # Scalar fields: copy straight from source
                                    # ('' when the source also lacks the field).
                                    source_value = source_data.get(field_name, '')
                                    data_item[field_name] = source_value
                                    item_updated = True

                        # Fields never touched by this sync (not in
                        # fields_to_check): rank, play_vv, video info, cover
                        # URL, playcount string and timeline_data all survive.
                        protected_fields = ['rank', 'play_vv', 'video_id', 'video_url', 'cover_image_url', 'playcount_str', 'timeline_data']

                        if item_updated:
                            doc_updated = True
                            logging.info(f"✅ 成功同步data项目字段: {mix_name}")

                        updated_data_array.append(data_item)

                    except Exception as e:
                        logging.error(f"同步data项目失败 {data_item.get('mix_name', 'N/A')}: {e}")
                        # On per-item failure keep the original item untouched.
                        updated_data_array.append(data_item)
                        continue

                # Persist the rewritten array only when something changed.
                if doc_updated:
                    collection.update_one(
                        {"_id": ranking_doc["_id"]},
                        {"$set": {"data": updated_data_array}}
                    )
                    updated_items += 1
                    logging.info(f"✅ 成功更新Ranking_storage文档的data数组: {ranking_doc.get('date', 'N/A')}")
                else:
                    skipped_items += 1

            except Exception as e:
                logging.error(f"同步Ranking_storage文档失败 {ranking_doc.get('_id')}: {e}")
                error_items += 1
                continue

        # The rewritten sync handles the data array directly; the old retry
        # mechanism is no longer needed (retry_count stays 0).

        return {
            "success": True,
            "message": f"同步完成(重试 {retry_count} 次)",
            "stats": {
                "target_date": target_date,
                "total_items": total_items,
                "updated_items": updated_items,
                "skipped_items": skipped_items,
                "error_items": error_items,
                "retry_count": retry_count,
                "pending_items_final": len(pending_items),
                "data_source": "Rankings_management"
            }
        }

    except Exception as e:
        logging.error(f"同步Ranking_storage字段失败: {e}")
        return {
            "success": False,
            "message": f"同步失败: {str(e)}"
        }
|
||
|
||
|
||
@rank_bp.route('/sync_ranking_fields', methods=['POST'])
def sync_ranking_fields():
    """HTTP wrapper around sync_ranking_storage_fields()."""
    try:
        body = request.get_json() or {}

        result = sync_ranking_storage_fields(
            body.get('target_date'),
            body.get('force_update', False)
        )

        # Propagate a 400 when the sync itself reports failure.
        if not result["success"]:
            return jsonify(result), 400
        return jsonify(result)

    except Exception as e:
        logging.error(f"同步API调用失败: {e}")
        return jsonify({
            "success": False,
            "message": f"API调用失败: {str(e)}"
        }), 500
|
||
|
||
|
||
@rank_bp.route('/validate_classification_exclusivity', methods=['POST'])
def validate_classification_exclusivity_api():
    """HTTP wrapper around validate_and_fix_classification_exclusivity().

    Ensures each drama belongs to at most one of the classification lists
    (Novel_IDs, Anime_IDs, Drama_IDs) and reports the repair statistics.
    """
    try:
        outcome = validate_and_fix_classification_exclusivity()

        # Mirror the helper's own success flag as the HTTP status.
        if not outcome["success"]:
            return jsonify(outcome), 400
        return jsonify(outcome)

    except Exception as e:
        logging.error(f"验证分类互斥性API失败: {e}")
        return jsonify({
            "success": False,
            "message": f"验证分类互斥性失败: {str(e)}"
        }), 500