rank_backend/backend/routers/rank_api_routes.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
小程序专用抖音播放量数据API服务器
优化的数据格式和接口设计,专为小程序使用
"""
from flask import Blueprint, request, jsonify
from datetime import datetime, timedelta
import logging
import re
from database import db
# Create the blueprint
rank_bp = Blueprint('rank', __name__, url_prefix='/api/rank')
# Database collections
collection = db['Ranking_storage']  # primary data source: ranking storage; each document carries a "data" array
rankings_management_collection = db['Rankings_management']  # management collection (source of record for synced fields)
daily_rankings_collection = db['Ranking_storage']  # ranking storage table (an alias of `collection` above)
def format_playcount(playcount_str):
    """Convert a formatted play-count string into an integer."""
    if not playcount_str:
        return 0
    try:
        if isinstance(playcount_str, (int, float)):
            return int(playcount_str)
        playcount_str = str(playcount_str).strip()
        # Handle units such as 亿 (100 million) and 万 (10 thousand)
        if "亿" in playcount_str:
            num = float(re.findall(r'[\d.]+', playcount_str)[0])
            return int(num * 100000000)
        elif "万" in playcount_str:
            num = float(re.findall(r'[\d.]+', playcount_str)[0])
            return int(num * 10000)
        else:
            # Try to convert the value directly
            return int(float(playcount_str))
    except (ValueError, IndexError, TypeError):
        return 0
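# Illustrative examples (not executed at import time); values assume the unit
# characters survive intact in the scraped strings:
#
#     format_playcount("2.3亿")  # -> 230000000
#     format_playcount("1.5万")  # -> 15000
#     format_playcount("12345")  # -> 12345
#     format_playcount(None)     # -> 0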
def format_cover_url(cover_data):
"""格式化封面图片URL"""
if not cover_data:
return ""
if isinstance(cover_data, str):
return cover_data
elif isinstance(cover_data, dict) and 'url_list' in cover_data:
return cover_data['url_list'][0] if cover_data['url_list'] else ""
else:
return ""
def format_time(time_obj):
"""格式化时间"""
if not time_obj:
return ""
if isinstance(time_obj, datetime):
return time_obj.strftime("%Y-%m-%d %H:%M:%S")
else:
return str(time_obj)
def parse_date_string(date_str):
"""通用日期解析函数"""
try:
if isinstance(date_str, str):
return datetime.strptime(date_str, '%Y-%m-%d').date()
return date_str
except (ValueError, TypeError):
logging.warning(f"无法解析日期字符串: {date_str}")
return None
def find_management_data(query, target_date=None):
"""
通用的管理数据查询函数优先使用mix_id进行查询
Args:
query: 查询条件字典可以包含mix_id, mix_name等字段
target_date: 目标日期,用于日期过滤
Returns:
查询到的文档或None
"""
try:
        # If the query includes a mix_id, prefer the mix_id lookup
if 'mix_id' in query and query['mix_id']:
mix_id_query = {"mix_id": query['mix_id']}
            # Apply the date filter when target_date is provided
if target_date:
if isinstance(target_date, str):
target_date = parse_date_string(target_date)
if target_date:
start_of_day = datetime.combine(target_date, datetime.min.time())
end_of_day = datetime.combine(target_date, datetime.max.time())
mix_id_query.update({
"$or": [
{"created_at": {"$gte": start_of_day, "$lte": end_of_day}},
{"last_updated": {"$gte": start_of_day, "$lte": end_of_day}}
]
})
result = rankings_management_collection.find_one(mix_id_query)
if result:
logging.info(f"通过mix_id找到管理数据: {query['mix_id']}")
return result
        # mix_id absent or matched nothing: fall back to the remaining conditions
        fallback_query = {k: v for k, v in query.items() if k != 'mix_id'}
        # Apply the date filter when target_date is provided
if target_date and fallback_query:
if isinstance(target_date, str):
target_date = parse_date_string(target_date)
if target_date:
start_of_day = datetime.combine(target_date, datetime.min.time())
end_of_day = datetime.combine(target_date, datetime.max.time())
fallback_query.update({
"$or": [
{"created_at": {"$gte": start_of_day, "$lte": end_of_day}},
{"last_updated": {"$gte": start_of_day, "$lte": end_of_day}}
]
})
if fallback_query:
result = rankings_management_collection.find_one(fallback_query)
if result:
logging.info(f"通过备用查询找到管理数据: {fallback_query}")
return result
logging.warning(f"未找到匹配的管理数据: {query}")
return None
except Exception as e:
logging.error(f"查询管理数据时出错: {e}")
return None
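# Usage sketch (illustrative): look up today's management record for a mix,
# preferring the mix_id over the name. The mix_id value here is hypothetical.
#
#     doc = find_management_data(
#         {"mix_id": "7412345678901234567", "mix_name": "某短剧"},
#         target_date="2025-11-06",
#     )
#     if doc:
#         print(doc.get("Manufacturing_Field"))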
def sort_ranking_data(ranking_data, sort_by, sort_order='desc'):
    """
    Dynamically sort ranking data.
    Args:
        ranking_data: list of ranking entries
        sort_by: sort field (play_vv_change, play_vv_change_rate, play_vv, rank)
        sort_order: sort direction (asc, desc)
    Returns:
        The sorted ranking data
    """
    try:
        # Key function for sorting
        def get_sort_key(item):
            if sort_by == 'play_vv_change':
                # Sort by absolute play-count change
                timeline_data = item.get('timeline_data', {})
                return timeline_data.get('play_vv_change', 0)
            elif sort_by == 'play_vv_change_rate':
                # Sort by play-count change rate
                timeline_data = item.get('timeline_data', {})
                return timeline_data.get('play_vv_change_rate', 0)
            elif sort_by == 'play_vv':
                # Sort by current play count
                return item.get('play_vv', 0)
            elif sort_by == 'rank':
                # Sort by rank
                return item.get('rank', 999999)
            else:
                # Default: sort by rank
                return item.get('rank', 999999)
        # Perform the sort
        reverse = (sort_order == 'desc')
        # For the rank field "descending" actually means ascending (a smaller rank is better)
        if sort_by == 'rank':
            reverse = (sort_order == 'asc')
        sorted_data = sorted(ranking_data, key=get_sort_key, reverse=reverse)
        # Re-assign ranks after sorting
        for i, item in enumerate(sorted_data, 1):
            item['current_sort_rank'] = i
        return sorted_data
    except Exception as e:
        logging.error(f"排序榜单数据失败: {e}")
        # On failure, return the data unsorted
        return ranking_data
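# Usage sketch (illustrative): sort by absolute play-count growth, descending.
# Entries read play_vv_change out of timeline_data, defaulting to 0 when missing.
#
#     rows = [
#         {"rank": 2, "timeline_data": {"play_vv_change": 500}},
#         {"rank": 1, "timeline_data": {"play_vv_change": 1200}},
#     ]
#     top = sort_ranking_data(rows, "play_vv_change", "desc")
#     # top[0]["current_sort_rank"] == 1; top[0]["timeline_data"]["play_vv_change"] == 1200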
def parse_formatted_count(formatted_str):
    """Parse a formatted count string such as "1.2万" or "374W" into an integer."""
    try:
        if not formatted_str or formatted_str == "0":
            return 0
        formatted_str = str(formatted_str).strip()
        # Handle 万 / W units (both mean 10 thousand)
        if "万" in formatted_str or "W" in formatted_str:
            # Extract the numeric part (re is imported at module level)
            numbers = re.findall(r'[\d.]+', formatted_str)
            if numbers:
                num = float(numbers[0])
                return int(num * 10000)
        elif "亿" in formatted_str:
            numbers = re.findall(r'[\d.]+', formatted_str)
            if numbers:
                num = float(numbers[0])
                return int(num * 100000000)
        else:
            # Try to convert the value directly
            return int(float(formatted_str))
    except (ValueError, TypeError):
        return 0
    # A unit was present but no numeric part could be extracted
    return 0
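# Illustrative examples:
#
#     parse_formatted_count("1.2万")  # -> 12000
#     parse_formatted_count("374W")   # -> 3740000
#     parse_formatted_count("3.5亿")  # -> 350000000
#     parse_formatted_count("")       # -> 0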
def format_interaction_count(count):
    """Format an interaction count into a human-readable string."""
    try:
        count = int(count)
        if count >= 100000000:  # 100 million and above
            return f"{count / 100000000:.1f}亿"
        elif count >= 10000:  # 10 thousand and above
            return f"{count / 10000:.1f}万"
        else:
            return str(count)
    except (ValueError, TypeError):
        return "0"
def format_mix_item(doc, target_date=None):
    """Format a mix (collection) entry, returning the raw database fields unchanged."""
    mix_name = doc.get("mix_name", "")
    # Aggregate likes and comments across all episodes
    episode_details = doc.get("episode_details", [])
    total_likes = 0
    total_comments = 0
    if episode_details:
        for episode in episode_details:
            total_likes += episode.get("likes", 0)
            total_comments += len(episode.get("comments", []))
    # Human-readable totals
    total_likes_formatted = format_interaction_count(total_likes)
    total_comments_formatted = format_interaction_count(total_comments)
    return {
        "_id": str(doc.get("_id", "")),
        "batch_time": format_time(doc.get("batch_time")),
        "mix_name": mix_name,
        "video_url": doc.get("video_url", ""),
        "playcount": doc.get("playcount", ""),
        "play_vv": doc.get("play_vv", 0),
        "request_id": doc.get("request_id", ""),
        "rank": doc.get("rank", 0),
        "cover_image_url": doc.get("cover_image_url", ""),
        # Extended fields
        "series_author": doc.get("series_author", ""),
        "Manufacturing_Field": doc.get("Manufacturing_Field", ""),
        "Copyright_field": doc.get("Copyright_field", ""),
        "desc": doc.get("desc", ""),
        "updated_to_episode": doc.get("updated_to_episode", 0),
        "cover_backup_urls": doc.get("cover_backup_urls", []),
        "mix_id": doc.get("mix_id", ""),
        "episode_video_ids": doc.get("episode_video_ids", []),
        "episode_details": doc.get("episode_details", []),
        # Aggregated like/comment totals
        "total_likes": total_likes,
        "total_likes_formatted": total_likes_formatted,
        "total_comments": total_comments,
        "total_comments_formatted": total_comments_formatted,
        # Play-count change data
        "timeline_data": doc.get("timeline_data", []),
    }
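# Shape sketch (illustrative): given a stored document with two episodes,
#
#     doc = {"mix_name": "某短剧",
#            "episode_details": [{"likes": 12000, "comments": [{}, {}]},
#                                {"likes": 8000, "comments": [{}]}]}
#     item = format_mix_item(doc)
#     # item["total_likes"] == 20000, item["total_likes_formatted"] == "2.0万"
#     # item["total_comments"] == 3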
def get_mix_list(page=1, limit=20, sort_by="playcount", classification_type=None):
    """Fetch the paginated mix list from the "data" array in Ranking_storage; supports category filtering."""
    try:
        # Number of entries to skip
        skip = (page - 1) * limit
        # Pick the sort strategy
        if sort_by == "growth":
            # Growth sorting needs special handling
            return get_growth_mixes(page, limit)
        else:
            # Today's date
            today = datetime.now().date()
            today_str = today.strftime("%Y-%m-%d")
            # Load today's data from Ranking_storage
            ranking_doc = collection.find_one({
                "date": today_str,
                "type": {"$in": ["comprehensive", "playcount"]}  # rankings that carry play-count data
            }, sort=[("calculation_sequence", -1)])  # most recent calculation run
            if not ranking_doc or "data" not in ranking_doc:
                # No data for today yet; return an empty result
                logging.warning(f"Ranking_storage中未找到 {today_str} 的数据")
                return {
                    "success": True,
                    "message": f"暂无 {today_str} 的数据,请等待定时任务生成",
                    "data": [],
                    "pagination": {
                        "page": page,
                        "limit": limit,
                        "total": 0,
                        "pages": 0,
                        "has_next": False,
                        "has_prev": False
                    },
                    "sort_by": sort_by,
                    "data_source": "ranking_storage",
                    "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                }
            # Entries inside the "data" array
            mix_data = ranking_doc.get("data", [])
            # Category filtering
            if classification_type:
                filtered_data = []
                classification_field_map = {
                    'novel': 'Novel_IDs',
                    'anime': 'Anime_IDs',
                    'drama': 'Drama_IDs'
                }
                if classification_type in classification_field_map:
                    field_name = classification_field_map[classification_type]
                    for item in mix_data:
                        mix_id = item.get('mix_id')
                        if mix_id:
                            # Check whether this mix_id appears in the matching category field
                            classification_ids = item.get(field_name, [])
                            if isinstance(classification_ids, list) and mix_id in classification_ids:
                                filtered_data.append(item)
                    mix_data = filtered_data
                    logging.info(f"分类筛选 {classification_type}: 筛选出 {len(mix_data)} 条数据")
            # Sort by play count if requested
            if sort_by == "playcount":
                mix_data = sorted(mix_data, key=lambda x: x.get("play_vv", 0), reverse=True)
            # Pagination
            total = len(mix_data)
            paginated_data = mix_data[skip:skip + limit]
            # Assign ranks to the current page and format each entry
            formatted_data = []
            for i, item in enumerate(paginated_data):
                item["rank"] = skip + i + 1
                # Make sure mix_name exists
                if "mix_name" not in item and "title" in item:
                    item["mix_name"] = item["title"]
                # format_mix_item also computes the like/comment totals
                formatted_item = format_mix_item(item)
                formatted_data.append(formatted_item)
            return {
                "success": True,
                "data": formatted_data,
                "pagination": {
                    "page": page,
                    "limit": limit,
                    "total": total,
                    "pages": (total + limit - 1) // limit,  # ceiling division
                    "has_next": page * limit < total,
                    "has_prev": page > 1
                },
                "sort_by": sort_by,
                "data_source": "ranking_storage",
                "update_time": ranking_doc.get("created_at", datetime.now()).strftime("%Y-%m-%d %H:%M:%S") if isinstance(ranking_doc.get("created_at"), datetime) else str(ranking_doc.get("created_at", ""))
            }
    except Exception as e:
        logging.error(f"获取合集列表失败: {e}")
        return {"success": False, "message": f"获取数据失败: {str(e)}"}
def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None, classification_type=None):
    """Fetch mixes sorted by play-count growth, reading that date's data straight from Ranking_storage."""
    try:
        # Number of entries to skip
        skip = (page - 1) * limit
        # Simplified date handling: use the dates passed in by the frontend directly
        if start_date and end_date:
            # Both dates provided: prefer end_date as the query date
            if isinstance(end_date, str):
                target_date = end_date
            else:
                target_date = end_date.strftime("%Y-%m-%d")
        elif end_date:
            # Only end_date provided
            if isinstance(end_date, str):
                target_date = end_date
            else:
                target_date = end_date.strftime("%Y-%m-%d")
        elif start_date:
            # Only start_date provided
            if isinstance(start_date, str):
                target_date = start_date
            else:
                target_date = start_date.strftime("%Y-%m-%d")
        else:
            # No date provided: default to today
            target_date = datetime.now().date().strftime("%Y-%m-%d")
        logging.info(f"📅 查询日期: {target_date}")
        # Check whether the Ranking_storage field info needs an automatic sync
        sample_item = daily_rankings_collection.find_one({
            "date": target_date,
            "mix_name": {"$exists": True}
        })
        if sample_item:
            # Look for missing key fields
            missing_manufacturing = sample_item.get('Manufacturing_Field') is None
            missing_copyright = sample_item.get('Copyright_field') is None
            if missing_manufacturing or missing_copyright:
                logging.info(f"检测到 {target_date} 的Ranking_storage数据缺少字段信息开始自动同步...")
                sync_result = sync_ranking_storage_fields(target_date, force_update=False)
                if sync_result["success"]:
                    logging.info(f"自动同步完成: {sync_result['stats']}")
                else:
                    logging.warning(f"自动同步失败: {sync_result['message']}")
        # Read the precomputed growth ranking from Ranking_storage
        growth_ranking = daily_rankings_collection.find_one({
            "date": target_date,
            "type": "comprehensive"  # the comprehensive ranking carries growth data
        }, sort=[("calculation_sequence", -1)])  # most recent calculation run
        if not growth_ranking or "data" not in growth_ranking:
            # No comprehensive ranking found; fall back to the growth type
            growth_ranking = daily_rankings_collection.find_one({
                "date": target_date,
                "type": "growth"
            }, sort=[("calculation_sequence", -1)])
        if growth_ranking and "data" in growth_ranking:
            logging.info(f"📈 从Ranking_storage读取 {target_date} 的增长榜数据")
            # The growth ranking is already precomputed
            growth_data = growth_ranking["data"]
            # comprehensive rankings still need to be sorted by growth
            if growth_ranking.get("type") == "comprehensive":
                # Sort by play_vv_change inside timeline_data
                growth_data = sorted(growth_data,
                                     key=lambda x: x.get("timeline_data", {}).get("play_vv_change", 0),
                                     reverse=True)
            # Filter by category if requested
            if classification_type:
                classification_field_map = {
                    "novel": "Novel_IDs",
                    "anime": "Anime_IDs",
                    "drama": "Drama_IDs"
                }
                if classification_type in classification_field_map:
                    field_name = classification_field_map[classification_type]
                    filtered_data = []
                    for item in growth_data:
                        mix_id = item.get("mix_id", "")
                        if mix_id:
                            # Look up the Rankings_management record for the category info
                            management_item = rankings_management_collection.find_one({"mix_id": mix_id})
                            if management_item:
                                classification_ids = management_item.get(field_name, [])
                                if isinstance(classification_ids, list) and mix_id in classification_ids:
                                    filtered_data.append(item)
                    growth_data = filtered_data
            # Pagination
            total = len(growth_data)
            paginated_data = growth_data[skip:skip + limit]
            # Assign ranks and backfill the remaining fields for the current page
            for i, item in enumerate(paginated_data):
                item["rank"] = skip + i + 1
                # Fix: key off mix_name; do not let an empty title overwrite it
                mix_name = item.get("mix_name", "")
                if mix_name:
                    # Optimization: read the already-synced fields straight from the
                    # Ranking_storage record for the matching date
                    ranking_storage_item = daily_rankings_collection.find_one({
                        "date": target_date,
                        "mix_name": mix_name
                    })
                    if ranking_storage_item:
                        # Use the fields already synced into Ranking_storage
                        item.update({
                            "Manufacturing_Field": ranking_storage_item.get("Manufacturing_Field", ""),
                            "Copyright_field": ranking_storage_item.get("Copyright_field", ""),
                            "series_author": ranking_storage_item.get("series_author", item.get("series_author", "")),
                            "video_id": ranking_storage_item.get("video_id", item.get("video_id", "")),
                            "video_url": ranking_storage_item.get("video_url", item.get("video_url", "")),
                            # Keep the cover and play-count values computed for the ranking itself
                            "cover_image_url": item.get("cover_image_url", ranking_storage_item.get("cover_image_url", "")),
                            "play_vv": item.get("play_vv", ranking_storage_item.get("play_vv", 0)),
                            "playcount_str": item.get("playcount_str", ranking_storage_item.get("playcount_str", "0"))
                        })
                        logging.info(f"从Ranking_storage获取到同步字段: {mix_name}")
                    else:
                        # No matching Ranking_storage record; fall back to the old lookup
                        logging.warning(f"Ranking_storage中未找到 {mix_name} 的记录,回退到原有查询逻辑")
                        # Classify the query date as historical or not (currently informational only)
                        today = datetime.now().date()
                        # Compare against target_date parsed back into a date object
                        try:
                            target_date_obj = datetime.strptime(target_date, "%Y-%m-%d").date()
                            is_historical_date = target_date_obj < today
                        except (ValueError, TypeError):
                            is_historical_date = False
                        # Always read from Rankings_management
                        management_doc = rankings_management_collection.find_one({"mix_name": mix_name})
                        if management_doc:
                            item.update({
                                "Manufacturing_Field": management_doc.get("Manufacturing_Field", ""),
                                "Copyright_field": management_doc.get("Copyright_field", ""),
                                "series_author": management_doc.get("series_author", item.get("series_author", "")),
                                "video_id": management_doc.get("video_id", item.get("video_id", "")),
                                "video_url": management_doc.get("video_url", item.get("video_url", "")),
                                "cover_image_url": item.get("cover_image_url", management_doc.get("cover_image_url", "")),
                                "play_vv": item.get("play_vv", management_doc.get("play_vv", 0)),
                                "playcount_str": item.get("playcount_str", management_doc.get("playcount_str", "0"))
                            })
                        else:
                            # Fall back to defaults
                            item.update({
                                "Manufacturing_Field": "",
                                "Copyright_field": "",
                                "series_author": item.get("series_author", ""),
                                "video_id": item.get("video_id", ""),
                                "video_url": item.get("video_url", ""),
                                "cover_image_url": item.get("cover_image_url", ""),
                                "play_vv": item.get("play_vv", 0),
                                "playcount_str": item.get("playcount_str", "0")
                            })
                else:
                    item["Manufacturing_Field"] = ""
                    item["Copyright_field"] = ""
            # format_mix_item also computes the like/comment totals
            formatted_data = []
            for item in paginated_data:
                formatted_item = format_mix_item(item)
                formatted_data.append(formatted_item)
            return {
                "success": True,
                "data": formatted_data,
                "pagination": {
                    "page": page,
                    "limit": limit,
                    "total": total,
                    "pages": (total + limit - 1) // limit,
                    "has_next": page * limit < total,
                    "has_prev": page > 1
                },
                "sort_by": "growth",
                "date_range": {
                    "start_date": target_date,
                    "end_date": target_date
                },
                "data_source": "ranking_storage",  # data provenance marker
                "update_time": growth_ranking.get("created_at", datetime.now()).strftime("%Y-%m-%d %H:%M:%S") if isinstance(growth_ranking.get("created_at"), datetime) else str(growth_ranking.get("created_at", ""))
            }
        else:
            # Nothing in Ranking_storage for this date; return an empty result
            logging.warning(f"Ranking_storage中未找到 {target_date} 的增长榜数据")
            return {
                "success": True,
                "message": f"暂无 {target_date} 的增长榜数据,请等待定时任务生成",
                "data": [],
                "pagination": {
                    "page": page,
                    "limit": limit,
                    "total": 0,
                    "pages": 0,
                    "has_next": False,
                    "has_prev": False
                },
                "sort_by": "growth",
                "date_range": {
                    "start_date": target_date,
                    "end_date": target_date
                },
                "data_source": "ranking_storage",
                "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
    except Exception as e:
        logging.error(f"获取增长合集列表失败: {e}")
        # Return the error; this no longer falls back to play-count sorting
        return {
            "success": False,
            "message": f"获取增长榜数据失败: {str(e)}",
            "data": [],
            "pagination": {
                "page": page,
                "limit": limit,
                "total": 0,
                "pages": 0,
                "has_next": False,
                "has_prev": False
            },
            "sort_by": "growth",
            "data_source": "ranking_storage"
        }
def get_top_mixes(limit=10):
    """Fetch the TOP list of popular mixes."""
    try:
        # Highest play counts first
        cursor = collection.find().sort("play_vv", -1).limit(limit)
        docs = list(cursor)
        if not docs:
            return {"success": False, "message": "暂无数据"}
        # Format entries
        top_list = []
        for doc in docs:
            item = format_mix_item(doc)
            top_list.append(item)
        return {
            "success": True,
            "data": top_list,
            "total": len(top_list),
            "update_time": format_time(docs[0].get("batch_time")) if docs else ""
        }
    except Exception as e:
        logging.error(f"获取热门合集失败: {e}")
        return {"success": False, "message": f"获取数据失败: {str(e)}"}
def search_mixes(keyword, page=1, limit=10):
    """Search mixes by name."""
    try:
        if not keyword:
            return {"success": False, "message": "请提供搜索关键词"}
        # Number of entries to skip
        skip = (page - 1) * limit
        # Case-insensitive fuzzy match on the mix name
        search_condition = {
            "mix_name": {"$regex": keyword, "$options": "i"}
        }
        # Run the query
        cursor = collection.find(search_condition).sort("play_vv", -1).skip(skip).limit(limit)
        docs = list(cursor)
        # Total number of matches
        total = collection.count_documents(search_condition)
        # Format entries
        search_results = []
        for doc in docs:
            item = format_mix_item(doc)
            search_results.append(item)
        return {
            "success": True,
            "data": search_results,
            "keyword": keyword,
            "pagination": {
                "page": page,
                "limit": limit,
                "total": total,
                "pages": (total + limit - 1) // limit,
                "has_next": page * limit < total,
                "has_prev": page > 1
            },
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logging.error(f"搜索合集失败: {e}")
        return {"success": False, "message": f"搜索失败: {str(e)}"}
def get_mix_detail(mix_id):
    """Fetch the detail record of a single mix."""
    try:
        from bson import ObjectId
        # Try an ObjectId lookup first
        try:
            doc = collection.find_one({"_id": ObjectId(mix_id)})
        except Exception:
            # Not a valid ObjectId; try the other identifying fields
            doc = collection.find_one({
                "$or": [
                    {"mix_name": mix_id},
                    {"request_id": mix_id}
                ]
            })
        if not doc:
            return {"success": False, "message": "未找到合集信息"}
        # Format the detail record; only raw database fields are returned
        detail = format_mix_item(doc)
        return {
            "success": True,
            "data": detail,
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logging.error(f"获取合集详情失败: {e}")
        return {"success": False, "message": f"获取详情失败: {str(e)}"}
def get_statistics():
    """Fetch aggregate statistics."""
    try:
        # Basic counts
        total_mixes = collection.count_documents({})
        if total_mixes == 0:
            return {"success": False, "message": "暂无数据"}
        # Play-count aggregates
        pipeline = [
            {
                "$group": {
                    "_id": None,
                    "total_playcount": {"$sum": "$play_vv"},
                    "avg_playcount": {"$avg": "$play_vv"},
                    "max_playcount": {"$max": "$play_vv"},
                    "min_playcount": {"$min": "$play_vv"}
                }
            }
        ]
        stats_result = list(collection.aggregate(pipeline))
        stats = stats_result[0] if stats_result else {}
        # Most recent update time (find_one avoids the always-truthy cursor check)
        latest_doc = collection.find_one({}, sort=[("batch_time", -1)])
        latest_time = format_time(latest_doc.get("batch_time")) if latest_doc else ""
        # Popularity buckets by play-count range
        categories = [
            {"name": "超热门", "min": 100000000, "count": 0},                 # 100M+
            {"name": "热门", "min": 50000000, "max": 99999999, "count": 0},  # 50M-100M
            {"name": "中等", "min": 10000000, "max": 49999999, "count": 0},  # 10M-50M
            {"name": "一般", "min": 0, "max": 9999999, "count": 0}           # below 10M
        ]
        for category in categories:
            if "max" in category:
                count = collection.count_documents({
                    "play_vv": {"$gte": category["min"], "$lte": category["max"]}
                })
            else:
                count = collection.count_documents({
                    "play_vv": {"$gte": category["min"]}
                })
            category["count"] = count
        return {
            "success": True,
            "data": {
                "total_mixes": total_mixes,
                "total_playcount": stats.get("total_playcount", 0),
                "avg_playcount": int(stats.get("avg_playcount", 0)),
                "max_playcount": stats.get("max_playcount", 0),
                "min_playcount": stats.get("min_playcount", 0),
                "categories": categories,
                "latest_update": latest_time
            },
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logging.error(f"获取统计信息失败: {e}")
        return {"success": False, "message": f"获取统计失败: {str(e)}"}
# Route definitions
@rank_bp.route('/growth_mixes')
def get_growth_mixes_route():
    """Fetch the growth ranking of mixes."""
    page = int(request.args.get('page', 1))
    limit = int(request.args.get('limit', 20))
    start_date = request.args.get('start_date')
    end_date = request.args.get('end_date')
    classification_type = request.args.get('classification_type')
    result = get_growth_mixes(page, limit, start_date, end_date, classification_type)
    return jsonify(result)
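# Example request (illustrative query values; the /api/rank prefix comes from the blueprint):
#
#     GET /api/rank/growth_mixes?page=1&limit=20&start_date=2025-11-05&end_date=2025-11-06&classification_type=drama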
@rank_bp.route('/videos')
def get_videos():
    """Fetch the mix list (kept compatible with app.py); supports category filtering."""
    page = int(request.args.get('page', 1))
    limit = int(request.args.get('limit', 20))
    sort_by = request.args.get('sort', 'playcount')
    classification_type = request.args.get('classification_type')  # category filter parameter
    if sort_by == 'growth':
        start_date = request.args.get('start_date')
        end_date = request.args.get('end_date')
        result = get_growth_mixes(page, limit, start_date, end_date, classification_type)
    else:
        result = get_mix_list(page, limit, sort_by, classification_type)
    return jsonify(result)
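# Example requests (illustrative):
#
#     GET /api/rank/videos?page=1&limit=20&sort=playcount
#     GET /api/rank/videos?sort=growth&end_date=2025-11-06&classification_type=novel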
@rank_bp.route('/top')
def get_top():
    """Fetch the top ranking (kept compatible with app.py)."""
    limit = int(request.args.get('limit', 10))
    result = get_top_mixes(limit)
    return jsonify(result)
@rank_bp.route('/search')
def search():
    """Search mixes (kept compatible with app.py)."""
    keyword = request.args.get('q', '')
    page = int(request.args.get('page', 1))
    limit = int(request.args.get('limit', 10))
    result = search_mixes(keyword, page, limit)
    return jsonify(result)
@rank_bp.route('/detail')
def get_detail():
    """Fetch mix details (kept compatible with app.py)."""
    mix_id = request.args.get('id', '')
    result = get_mix_detail(mix_id)
    return jsonify(result)
@rank_bp.route('/stats')
def get_stats():
    """Fetch statistics (kept compatible with app.py)."""
    result = get_statistics()
    return jsonify(result)
@rank_bp.route('/health')
def health_check():
    """Health check (kept compatible with app.py)."""
    try:
        from database import client
        # Make sure the database client exists
        if not client:
            return jsonify({"success": False, "message": "数据库未连接"})
        # Probe the connection
        client.admin.command('ping')
        # Basic data count
        total_count = collection.count_documents({})
        return jsonify({
            "success": True,
            "message": "服务正常",
            "data": {
                "database": "连接正常",
                "total_records": total_count,
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
        })
    except Exception as e:
        logging.error(f"健康检查失败: {e}")
        return jsonify({"success": False, "message": f"服务异常: {str(e)}"})
# ==================== Ranking query API endpoints ====================
@rank_bp.route('/rankings')
def get_rankings():
    """Fetch ranking lists, filterable by date and type, with dynamic sorting."""
    try:
        # Query parameters
        date = request.args.get('date')  # date, formatted YYYY-MM-DD
        ranking_type = request.args.get('type')  # ranking type: playcount, growth, newcomer
        sort_by = request.args.get('sort_by', 'default')  # sort field: default, play_vv_change, play_vv_change_rate, play_vv
        sort_order = request.args.get('sort_order', 'desc')  # sort direction: asc, desc
        page = int(request.args.get('page', 1))
        limit = int(request.args.get('limit', 50))
        # Build the query
        query = {}
        if date:
            query['date'] = date
        if ranking_type:
            query['ranking_type'] = ranking_type
        # Without an explicit date, default to the most recent ranking date
        if not date:
            latest_ranking = daily_rankings_collection.find_one(
                {}, sort=[('date', -1)]
            )
            if latest_ranking:
                query['date'] = latest_ranking['date']
        # Run the query
        rankings = list(daily_rankings_collection.find(query).sort('generated_at', -1))
        if not rankings:
            return jsonify({
                "success": True,
                "message": "暂无榜单数据",
                "data": {
                    "rankings": [],
                    "total": 0,
                    "page": page,
                    "limit": limit
                }
            })
        # Format the response
        formatted_rankings = []
        for ranking in rankings:
            ranking_data = ranking.get('data', [])
            # Dynamic sorting
            if sort_by != 'default' and ranking_data:
                ranking_data = sort_ranking_data(ranking_data, sort_by, sort_order)
            # Paginate the ranking entries
            start_idx = (page - 1) * limit
            end_idx = start_idx + limit
            paginated_data = ranking_data[start_idx:end_idx]
            formatted_rankings.append({
                "date": ranking.get('date'),
                "ranking_type": ranking.get('ranking_type'),
                "ranking_name": ranking.get('ranking_name'),
                "description": ranking.get('description'),
                "data": paginated_data,
                "total_count": len(ranking_data),
                "current_page_count": len(paginated_data),
                "generated_at": format_time(ranking.get('generated_at')),
                "version": ranking.get('version', '1.0'),
                "sort_info": {
                    "sort_by": sort_by,
                    "sort_order": sort_order
                }
            })
        return jsonify({
            "success": True,
            "message": "获取榜单成功",
            "data": {
                "rankings": formatted_rankings,
                "total": len(formatted_rankings),
                "page": page,
                "limit": limit,
                "sort_by": sort_by,
                "sort_order": sort_order
            }
        })
    except Exception as e:
        logging.error(f"获取榜单失败: {e}")
        return jsonify({"success": False, "message": f"获取榜单失败: {str(e)}"})
@rank_bp.route('/rankings/dates')
def get_ranking_dates():
    """Fetch the list of dates that have rankings."""
    try:
        # Distinct dates, newest first
        dates = daily_rankings_collection.distinct('date')
        dates.sort(reverse=True)
        return jsonify({
            "success": True,
            "message": "获取日期列表成功",
            "data": {
                "dates": dates,
                "total": len(dates)
            }
        })
    except Exception as e:
        logging.error(f"获取日期列表失败: {e}")
        return jsonify({"success": False, "message": f"获取日期列表失败: {str(e)}"})
@rank_bp.route('/rankings/types')
def get_ranking_types():
    """Fetch the supported ranking types."""
    try:
        # Distinct ranking types
        types = daily_rankings_collection.distinct('ranking_type')
        # Human-readable descriptions
        type_descriptions = {
            'playcount': '播放量榜 - 按播放量排序',
            'growth': '增长榜 - 播放量增长最快',
            'newcomer': '新晋榜 - 新上榜内容'
        }
        formatted_types = []
        for type_name in types:
            formatted_types.append({
                "type": type_name,
                "description": type_descriptions.get(type_name, type_name)
            })
        return jsonify({
            "success": True,
            "message": "获取榜单类型成功",
            "data": {
                "types": formatted_types,
                "total": len(formatted_types)
            }
        })
    except Exception as e:
        logging.error(f"获取榜单类型失败: {e}")
        return jsonify({"success": False, "message": f"获取榜单类型失败: {str(e)}"})
@rank_bp.route('/rankings/latest')
def get_latest_rankings():
    """Fetch the most recent rankings of every type."""
    try:
        # Most recent date
        latest_ranking = daily_rankings_collection.find_one(
            {}, sort=[('date', -1)]
        )
        if not latest_ranking:
            return jsonify({
                "success": True,
                "message": "暂无榜单数据",
                "data": {
                    "date": None,
                    "rankings": []
                }
            })
        latest_date = latest_ranking['date']
        # All rankings for that date
        rankings = list(daily_rankings_collection.find({
            'date': latest_date
        }).sort('ranking_type', 1))
        formatted_rankings = []
        for ranking in rankings:
            # Return only the first 20 entries as a preview
            ranking_data = ranking.get('data', [])[:20]
            formatted_rankings.append({
                "ranking_type": ranking.get('ranking_type'),
                "ranking_name": ranking.get('ranking_name'),
                "description": ranking.get('description'),
                "data": ranking_data,
                "total_count": ranking.get('total_count', 0),
                "preview_count": len(ranking_data)
            })
        return jsonify({
            "success": True,
            "message": "获取最新榜单成功",
            "data": {
                "date": latest_date,
                "rankings": formatted_rankings,
                "total_types": len(formatted_rankings)
            }
        })
    except Exception as e:
        logging.error(f"获取最新榜单失败: {e}")
        return jsonify({"success": False, "message": f"获取最新榜单失败: {str(e)}"})
@rank_bp.route('/rankings/stats')
def get_rankings_stats():
    """Fetch ranking statistics."""
    try:
        # Total number of ranking documents
        total_rankings = daily_rankings_collection.count_documents({})
        # Number of distinct dates
        total_dates = len(daily_rankings_collection.distinct('date'))
        # Number of distinct ranking types
        total_types = len(daily_rankings_collection.distinct('ranking_type'))
        # Newest and oldest dates
        latest_ranking = daily_rankings_collection.find_one({}, sort=[('date', -1)])
        earliest_ranking = daily_rankings_collection.find_one({}, sort=[('date', 1)])
        latest_date = latest_ranking['date'] if latest_ranking else None
        earliest_date = earliest_ranking['date'] if earliest_ranking else None
        return jsonify({
            "success": True,
            "message": "获取榜单统计成功",
            "data": {
                "total_rankings": total_rankings,
                "total_dates": total_dates,
                "total_types": total_types,
                "latest_date": latest_date,
                "earliest_date": earliest_date,
"date_range": f"{earliest_date}{latest_date}" if earliest_date and latest_date else "暂无数据"
}
})
except Exception as e:
logging.error(f"获取榜单统计失败: {e}")
return jsonify({"success": False, "message": f"获取榜单统计失败: {str(e)}"})
@rank_bp.route('/update_drama_info', methods=['POST'])
def update_drama_info():
    """Update drama info (with two-way sync into both collections)."""
    try:
        data = request.get_json()
        # Validate required parameters
        if not data or 'mix_name' not in data:
            return jsonify({"success": False, "message": "缺少必需参数 mix_name"})
        mix_name = data['mix_name']
        target_date = data.get('target_date')  # optional; used to decide whether this is today's data
        # Collect the fields to update
        update_fields = {}
        field_lock_updates = {}
        # Copy over whichever fields were provided
        if 'title' in data:
            update_fields['title'] = data['title']
        if 'series_author' in data:
            update_fields['series_author'] = data['series_author']
        if 'Manufacturing_Field' in data:
            update_fields['Manufacturing_Field'] = data['Manufacturing_Field']
            # Mark the producer field as user-locked
            field_lock_updates['field_lock_status.Manufacturing_Field_locked'] = True
        if 'Copyright_field' in data:
            update_fields['Copyright_field'] = data['Copyright_field']
            # Mark the copyright field as user-locked
            field_lock_updates['field_lock_status.Copyright_field_locked'] = True
        if 'desc' in data:
            update_fields['desc'] = data['desc']
        if 'play_vv' in data:
            update_fields['play_vv'] = data['play_vv']
        if 'cover_image_url' in data:
            update_fields['cover_image_url'] = data['cover_image_url']
        if 'cover_backup_urls' in data:
            update_fields['cover_backup_urls'] = data['cover_backup_urls']
        if 'timeline_data' in data:
            update_fields['timeline_data'] = data['timeline_data']
        # Classification fields are also locked once set by the user
        if 'Novel_IDs' in data:
            update_fields['Novel_IDs'] = data['Novel_IDs']
            field_lock_updates['field_lock_status.Novel_IDs_locked'] = True
        if 'Anime_IDs' in data:
            update_fields['Anime_IDs'] = data['Anime_IDs']
            field_lock_updates['field_lock_status.Anime_IDs_locked'] = True
        if 'Drama_IDs' in data:
            update_fields['Drama_IDs'] = data['Drama_IDs']
            field_lock_updates['field_lock_status.Drama_IDs_locked'] = True
        if not update_fields:
            return jsonify({"success": False, "message": "没有提供需要更新的字段"})
        # Today's date
        today = datetime.now().date().strftime('%Y-%m-%d')
        is_today_data = target_date == today if target_date else True
        updated_count = 0
        # Make sure the drama exists first
        existing_drama = rankings_management_collection.find_one({"mix_name": mix_name})
        if not existing_drama:
            return jsonify({
                "success": False,
                "message": f"未找到短剧: {mix_name}"
            })
        # 1. Update Rankings_management
        mgmt_update_data = update_fields.copy()
        mgmt_update_data.update(field_lock_updates)  # include the lock-status updates
        result_mgmt = rankings_management_collection.update_many(
            {"mix_name": mix_name},
            {"$set": mgmt_update_data}
        )
        # 2. Update the matching entry inside each Ranking_storage "data" array
        # (the positional $ operator targets the first matching array element per document)
        storage_update_data = {f"data.$.{field}": value for field, value in update_fields.items()}
        # Mirror the lock-status updates into Ranking_storage as well
        for field, value in field_lock_updates.items():
            storage_update_data[f"data.$.{field}"] = value
        result_storage = collection.update_many(
            {"data.mix_name": mix_name},
            {"$set": storage_update_data}
        )
        updated_count = result_mgmt.modified_count + result_storage.modified_count
        matched_count = result_mgmt.matched_count + result_storage.matched_count
        # Record which fields were locked
        locked_fields = []
        if field_lock_updates:
            for field_key in field_lock_updates.keys():
                field_name = field_key.replace('field_lock_status.', '').replace('_locked', '')
                locked_fields.append(field_name)
        logging.info(f"数据更新: Rankings_management(匹配:{result_mgmt.matched_count}, 修改:{result_mgmt.modified_count}), Ranking_storage(匹配:{result_storage.matched_count}, 修改:{result_storage.modified_count})")
        if locked_fields:
            logging.info(f"字段锁定状态更新: {', '.join(locked_fields)} 已被标记为用户锁定")
        # Finding the record counts as success, whether or not anything changed
        if matched_count > 0:
            message = f"成功处理短剧 {mix_name} 的信息"
            if updated_count > 0:
                message += f",已更新 {updated_count} 条记录"
            else:
                message += ",数据无变化"
            return jsonify({
                "success": True,
                "message": message,
                "data": {
                    "mix_name": mix_name,
                    "updated_fields": list(update_fields.keys()),
                    "updated_count": updated_count,
                    "matched_count": matched_count,
                    "is_today_data": is_today_data
                }
            })
        else:
            return jsonify({
                "success": False,
                "message": f"未找到短剧 {mix_name} 的相关数据"
            })
    except Exception as e:
        logging.error(f"更新短剧信息失败: {e}")
        return jsonify({"success": False, "message": f"更新短剧信息失败: {str(e)}"})
@rank_bp.route('/update_content_classification', methods=['POST'])
def update_content_classification():
    """Update a drama's content classification by adding/removing its ID in the category fields."""
    try:
        data = request.get_json()
        # Validate required parameters
        if not data or 'mix_name' not in data or 'classification_type' not in data:
            return jsonify({"success": False, "message": "缺少必需参数 mix_name 或 classification_type"})
        mix_name = data['mix_name']
        classification_type = data['classification_type']  # 'novel', 'anime', 'drama'
        action = data.get('action', 'add')  # 'add' or 'remove'
        exclusive = data.get('exclusive', True)  # exclusive mode by default: each drama belongs to exactly one category
        # Validate the classification type
        valid_types = ['novel', 'anime', 'drama']
        if classification_type not in valid_types:
            return jsonify({"success": False, "message": f"无效的分类类型,支持的类型: {valid_types}"})
        # Map classification types to field names
        field_mapping = {
            'novel': 'Novel_IDs',
            'anime': 'Anime_IDs',
            'drama': 'Drama_IDs'
        }
        field_name = field_mapping[classification_type]
        # First fetch the drama's mix_id from Rankings_management, scoped to today
        today = datetime.now().date()
        start_of_day = datetime.combine(today, datetime.min.time())
        end_of_day = datetime.combine(today, datetime.max.time())
        mgmt_doc = rankings_management_collection.find_one({
            "mix_name": mix_name,
            "$or": [
                {"created_at": {"$gte": start_of_day, "$lte": end_of_day}},
                {"last_updated": {"$gte": start_of_day, "$lte": end_of_day}}
            ]
        })
        if not mgmt_doc:
            return jsonify({"success": False, "message": f"未找到短剧: {mix_name}"})
        mix_id = mgmt_doc.get('mix_id')
        if not mix_id:
            return jsonify({"success": False, "message": f"短剧 {mix_name} 缺少 mix_id"})
        updated_count = 0
        # Apply the requested action
        if action == 'add':
            # In exclusive mode, remove the drama from the other categories first
            if exclusive:
                # The other category fields
                other_fields = [f for f in field_mapping.values() if f != field_name]
                # Track which categories it was removed from
                removed_from_other_categories = []
                # 1. Remove from the other categories in Rankings_management
                for other_field in other_fields:
                    result = rankings_management_collection.update_many(
                        {"mix_name": mix_name, other_field: mix_id},
                        {"$pull": {other_field: mix_id}}
                    )
                    if result.modified_count > 0:
                        # Map the field back to its category name
                        for cat_type, field in field_mapping.items():
                            if field == other_field:
                                removed_from_other_categories.append(cat_type)
                                break
                # 2. Remove from the other categories in Ranking_storage
                for other_field in other_fields:
                    collection.update_many(
                        {"data.mix_name": mix_name},
                        {"$pull": {f"data.$.{other_field}": mix_id}}
                    )
                if removed_from_other_categories:
                    logging.info(f"互斥模式:已将短剧 {mix_name}从 {', '.join(removed_from_other_categories)} 分类中移除")
                else:
                    logging.info(f"互斥模式:短剧 {mix_name} 未在其他分类中,无需移除")
            # Add to the target category ($addToSet avoids duplicates)
            # 1. Update Rankings_management
            result_mgmt = rankings_management_collection.update_many(
                {"mix_name": mix_name},
                {"$addToSet": {field_name: mix_id}}
            )
            # 2. Update the "data" arrays in Ranking_storage
            result_storage = collection.update_many(
                {"data.mix_name": mix_name},
                {"$addToSet": {f"data.$.{field_name}": mix_id}}
            )
            updated_count = result_mgmt.modified_count + result_storage.modified_count
            message = f"成功将短剧 {mix_name} 添加到 {classification_type} 分类"
            if exclusive and removed_from_other_categories:
                message += f"(已自动从 {', '.join(removed_from_other_categories)} 分类中移除)"
        elif action == 'remove':
            # Remove from the target category
            # 1. Update Rankings_management
            result_mgmt = rankings_management_collection.update_many(
                {"mix_name": mix_name},
                {"$pull": {field_name: mix_id}}
            )
            # 2. Update the "data" arrays in Ranking_storage
            result_storage = collection.update_many(
                {"data.mix_name": mix_name},
                {"$pull": {f"data.$.{field_name}": mix_id}}
            )
            updated_count = result_mgmt.modified_count + result_storage.modified_count
            message = f"成功将短剧 {mix_name}从 {classification_type} 分类中移除"
        else:
            return jsonify({"success": False, "message": "无效的操作类型,支持 'add' 或 'remove'"})
        logging.info(f"分类更新: {message}, Rankings_management({result_mgmt.modified_count}), Ranking_storage({result_storage.modified_count})")
        # Fetch the updated classification status, again scoped to today
        updated_mgmt_doc = rankings_management_collection.find_one({
            "mix_name": mix_name,
            "$or": [
                {"created_at": {"$gte": start_of_day, "$lte": end_of_day}},
                {"last_updated": {"$gte": start_of_day, "$lte": end_of_day}}
            ]
        })
        classification_status = {
            'novel': mix_id in updated_mgmt_doc.get('Novel_IDs', []) if updated_mgmt_doc else False,
            'anime': mix_id in updated_mgmt_doc.get('Anime_IDs', []) if updated_mgmt_doc else False,
            'drama': mix_id in updated_mgmt_doc.get('Drama_IDs', []) if updated_mgmt_doc else False
        }
        return jsonify({
            "success": True,
            "message": message,
            "data": {
                "mix_name": mix_name,
                "mix_id": mix_id,
                "classification_type": classification_type,
                "field_name": field_name,
                "action": action,
                "updated_count": updated_count,
                "classification_status": classification_status
            }
        })
    except Exception as e:
        logging.error(f"更新内容分类失败: {e}")
        return jsonify({"success": False, "message": f"更新内容分类失败: {str(e)}"})
@rank_bp.route('/get_content_classification', methods=['GET'])
def get_content_classification():
    """Fetch a drama's classification status."""
    try:
        mix_name = request.args.get('mix_name')
        if not mix_name:
            return jsonify({"success": False, "message": "缺少必需参数 mix_name"})
        # Load the drama from Rankings_management
        mgmt_doc = rankings_management_collection.find_one({"mix_name": mix_name})
        if not mgmt_doc:
            return jsonify({"success": False, "message": f"未找到短剧: {mix_name}"})
        mix_id = mgmt_doc.get('mix_id')
        if not mix_id:
            return jsonify({"success": False, "message": f"短剧 {mix_name} 缺少 mix_id"})
        # Check which categories the drama belongs to
        novel_ids = mgmt_doc.get('Novel_IDs', [])
        anime_ids = mgmt_doc.get('Anime_IDs', [])
        drama_ids = mgmt_doc.get('Drama_IDs', [])
        classification_status = {
            'novel': mix_id in novel_ids,
            'anime': mix_id in anime_ids,
            'drama': mix_id in drama_ids
        }
        return jsonify({
            "success": True,
            "message": f"获取短剧 {mix_name} 分类状态成功",
            "data": {
                "mix_name": mix_name,
                "mix_id": mix_id,
                "classification_status": classification_status,
                "classification_details": {
                    "Novel_IDs": novel_ids,
                    "Anime_IDs": anime_ids,
                    "Drama_IDs": drama_ids
                }
            }
        })
    except Exception as e:
        logging.error(f"获取内容分类状态失败: {e}")
        return jsonify({"success": False, "message": f"获取内容分类状态失败: {str(e)}"})
def validate_and_fix_classification_exclusivity():
    """
    Validate and repair classification exclusivity across the database, ensuring
    each drama belongs to exactly one of Novel_IDs, Anime_IDs and Drama_IDs.
    Returns:
        dict: repair statistics
    """
    try:
        # Load every Rankings_management document
        all_docs = list(rankings_management_collection.find({}))
        fixed_count = 0
        conflict_count = 0
        for doc in all_docs:
            mix_name = doc.get('mix_name', '')
            mix_id = doc.get('mix_id')
            if not mix_id:
                continue
            # Classification fields
            novel_ids = doc.get('Novel_IDs', [])
            anime_ids = doc.get('Anime_IDs', [])
            drama_ids = doc.get('Drama_IDs', [])
            # Count how many categories this mix_id appears in
            classifications = []
            if mix_id in novel_ids:
                classifications.append('novel')
            if mix_id in anime_ids:
                classifications.append('anime')
            if mix_id in drama_ids:
                classifications.append('drama')
            # More than one category means a conflict that needs repair
            if len(classifications) > 1:
                conflict_count += 1
                logging.warning(f"发现分类冲突: {mix_name} 同时属于 {classifications}")
                # Keep exactly one category and remove the others,
                # with priority drama > anime > novel
                if 'drama' in classifications:
                    keep_classification = 'drama'
                elif 'anime' in classifications:
                    keep_classification = 'anime'
                else:
                    keep_classification = 'novel'
                # Build the update
                update_fields = {}
                if keep_classification == 'novel':
                    update_fields['Novel_IDs'] = novel_ids
                    update_fields['Anime_IDs'] = [id for id in anime_ids if id != mix_id]
                    update_fields['Drama_IDs'] = [id for id in drama_ids if id != mix_id]
                elif keep_classification == 'anime':
                    update_fields['Novel_IDs'] = [id for id in novel_ids if id != mix_id]
                    update_fields['Anime_IDs'] = anime_ids
                    update_fields['Drama_IDs'] = [id for id in drama_ids if id != mix_id]
                elif keep_classification == 'drama':
                    update_fields['Novel_IDs'] = [id for id in novel_ids if id != mix_id]
                    update_fields['Anime_IDs'] = [id for id in anime_ids if id != mix_id]
                    update_fields['Drama_IDs'] = drama_ids
                # Update Rankings_management by mix_id (mix_id is guaranteed non-empty
                # here, since documents without one were skipped above)
                rankings_management_collection.update_one(
                    {"mix_id": mix_id},
                    {"$set": update_fields}
                )
                # Update Ranking_storage by mix_id
                collection.update_many(
                    {"data.mix_id": mix_id},
                    {"$set": {
                        "data.$.Novel_IDs": update_fields['Novel_IDs'],
                        "data.$.Anime_IDs": update_fields['Anime_IDs'],
                        "data.$.Drama_IDs": update_fields['Drama_IDs']
                    }}
                )
                fixed_count += 1
                logging.info(f"修复分类冲突: {mix_name} 保留为 {keep_classification} 分类")
        return {
            "success": True,
            "message": f"分类互斥性验证完成",
            "data": {
                "total_checked": len(all_docs),
                "conflicts_found": conflict_count,
                "conflicts_fixed": fixed_count
            }
        }
    except Exception as e:
        logging.error(f"验证分类互斥性失败: {e}")
        return {
            "success": False,
            "message": f"验证分类互斥性失败: {str(e)}"
        }
def sync_ranking_storage_fields(target_date=None, force_update=False, max_retries=3, retry_delay=60):
    """
    Sync field values into Ranking_storage.
    Field values are read from Rankings_management (the source of record) and
    written into the matching Ranking_storage entries.
    Args:
        target_date: target date as 'YYYY-MM-DD'; defaults to today
        force_update: overwrite fields that already have values (default False)
        max_retries: maximum retry count (default 3; currently unused, kept for API compatibility)
        retry_delay: delay between retries in seconds (default 60; currently unused)
    Returns:
        dict: sync result statistics
    """
    try:
        # Resolve the target date
        if target_date is None:
            target_date_obj = datetime.now().date()
            target_date = target_date_obj.strftime('%Y-%m-%d')
        else:
            target_date_obj = datetime.strptime(target_date, '%Y-%m-%d').date()
        # Load the Ranking_storage documents for that date
        ranking_storage_query = {"date": target_date}
        ranking_storage_items = list(daily_rankings_collection.find(ranking_storage_query))
        if not ranking_storage_items:
            return {
                "success": False,
                "message": f"未找到日期 {target_date} 的Ranking_storage数据"
            }
        # Statistics
        total_items = len(ranking_storage_items)
        updated_items = 0
        skipped_items = 0
        error_items = 0
        retry_count = 0  # retry counter (the rewritten logic below no longer retries)
        pending_items = []  # items that would need a retry
        # 🔄 Rewritten sync logic: update every entry inside each document's "data" array
        for ranking_doc in ranking_storage_items:
            try:
                # The "data" array
                data_array = ranking_doc.get('data', [])
                if not data_array:
                    logging.warning(f"Ranking_storage文档没有data数组: {ranking_doc.get('_id')}")
                    skipped_items += 1
                    continue
                # Track whether anything in this document changed
                doc_updated = False
                updated_data_array = []
                # Walk the entries of the "data" array
                for data_item in data_array:
                    try:
                        mix_name = data_item.get('mix_name', '').strip()
                        # 🚫 Skip invalid entries: mix_name must be non-empty
                        if not mix_name or mix_name == "" or mix_name.lower() == "null":
                            logging.warning(f"跳过空的或无效的mix_name记录: {data_item.get('_id', 'unknown')}")
                            continue  # drop the entry; do not add it to updated_data_array
                        # 🔧 Prefer mix_id lookups for accuracy
                        source_data = None
                        mix_id = data_item.get('mix_id')
                        # Build the query conditions (mix_id first)
                        query_conditions = {}
                        if mix_id:
                            query_conditions['mix_id'] = mix_id
                        if mix_name:
                            query_conditions['mix_name'] = mix_name
                        # Look up the management record via find_management_data
                        if query_conditions:
                            source_data = find_management_data(query_conditions, target_date)
                        # Still nothing: try matching by title
                        if not source_data:
                            title = data_item.get('title')
                            if title and title.strip():
                                title_query = {"mix_name": title.strip()}
                                source_data = find_management_data(title_query, target_date)
                                if source_data:
                                    logging.info(f"通过title找到数据: {title} -> {source_data.get('mix_name', 'N/A')}")
                        # If a source record was found, repair an empty mix_name
                        if source_data and not mix_name:
                            mix_name = source_data.get('mix_name', '').strip()
                            if mix_name:
                                data_item['mix_name'] = mix_name
                                logging.info(f"修复空的mix_name: {data_item.get('title', 'N/A')} -> {mix_name}")
                            else:
                                logging.warning(f"源数据中的mix_name也为空跳过此记录")
                                continue  # skip invalid entries
                        # No source record at all: decide whether locked fields protect the entry
                        if not source_data:
                            logging.warning(f"无法找到对应的源数据: mix_name={mix_name}, mix_id={data_item.get('mix_id')}, title={data_item.get('title')}")
                            # If the document has locked fields, keep the entry unchanged
                            field_lock_status = ranking_doc.get('field_lock_status', {})
                            has_locked_fields = any([
                                field_lock_status.get('Manufacturing_Field_locked', False),
                                field_lock_status.get('Copyright_field_locked', False),
                                field_lock_status.get('Novel_IDs_locked', False),
                                field_lock_status.get('Anime_IDs_locked', False),
                                field_lock_status.get('Drama_IDs_locked', False)
                            ])
                            if has_locked_fields:
                                logging.info(f"保持锁定字段不变: {mix_name} (无源数据但有锁定字段)")
                                updated_data_array.append(data_item)
                            else:
                                # Keep the entry only when mix_name is valid and nothing is locked
                                if mix_name and mix_name.strip():
                                    updated_data_array.append(data_item)
                            continue
                        # Fields to consider for updating -- covers all Rankings_management fields
                        fields_to_check = {
                            # Basic fields
                            'batch_id': data_item.get('batch_id'),
                            'batch_time': data_item.get('batch_time'),
                            'item_sequence': data_item.get('item_sequence'),
                            'mix_id': data_item.get('mix_id'),
                            'playcount': data_item.get('playcount'),
                            'request_id': data_item.get('request_id'),
                            # Cover-related fields
                            'cover_image_url_original': data_item.get('cover_image_url_original'),
                            'cover_upload_success': data_item.get('cover_upload_success'),
                            'cover_backup_urls': data_item.get('cover_backup_urls'),
                            # Content fields
                            'desc': data_item.get('desc'),
                            'series_author': data_item.get('series_author'),
                            'updated_to_episode': data_item.get('updated_to_episode'),
                            'episode_video_ids': data_item.get('episode_video_ids'),
                            'episode_details': data_item.get('episode_details'),
                            # Status fields
                            'data_status': data_item.get('data_status'),
                            'realtime_saved': data_item.get('realtime_saved'),
                            'created_at': data_item.get('created_at'),
                            'last_updated': data_item.get('last_updated'),
                            'Manufacturing_Field': data_item.get('Manufacturing_Field'),
                            'Copyright_field': data_item.get('Copyright_field'),
                            # Content classification fields
                            'Novel_IDs': data_item.get('Novel_IDs', []),
                            'Anime_IDs': data_item.get('Anime_IDs', []),
                            'Drama_IDs': data_item.get('Drama_IDs', []),
                        }
                        # 🔒 Read the field-lock flags
                        field_lock_status = ranking_doc.get('field_lock_status', {})
                        manufacturing_locked = field_lock_status.get('Manufacturing_Field_locked', False)
                        copyright_locked = field_lock_status.get('Copyright_field_locked', False)
                        novel_ids_locked = field_lock_status.get('Novel_IDs_locked', False)
                        anime_ids_locked = field_lock_status.get('Anime_IDs_locked', False)
                        drama_ids_locked = field_lock_status.get('Drama_IDs_locked', False)
                        # Determine whether anything needs updating
                        needs_update = False
                        for field_name, field_value in fields_to_check.items():
                            # 🔒 Locked fields are never updated
                            if field_name == 'Manufacturing_Field' and manufacturing_locked:
                                logging.info(f"[字段锁定] 跳过Manufacturing_Field更新: {mix_name} (已锁定)")
                                continue
                            elif field_name == 'Copyright_field' and copyright_locked:
                                logging.info(f"[字段锁定] 跳过Copyright_field更新: {mix_name} (已锁定)")
                                continue
                            elif field_name == 'Novel_IDs' and novel_ids_locked:
                                logging.info(f"[字段锁定] 跳过Novel_IDs更新: {mix_name} (已锁定)")
                                continue
                            elif field_name == 'Anime_IDs' and anime_ids_locked:
                                logging.info(f"[字段锁定] 跳过Anime_IDs更新: {mix_name} (已锁定)")
                                continue
                            elif field_name == 'Drama_IDs' and drama_ids_locked:
                                logging.info(f"[字段锁定] 跳过Drama_IDs更新: {mix_name} (已锁定)")
                                continue
                            # Array fields need an update when empty
                            if field_name in ['cover_backup_urls', 'episode_video_ids', 'episode_details', 'Novel_IDs', 'Anime_IDs', 'Drama_IDs']:
                                if force_update or field_value is None or (isinstance(field_value, list) and len(field_value) == 0):
                                    needs_update = True
                                    break
                            # Scalar fields need an update when missing, empty, or zero
                            elif force_update or field_value is None or field_value == '' or field_value == 0:
                                needs_update = True
                                break
                        if not needs_update:
                            updated_data_array.append(data_item)
                            continue
                        # Pull field values from the source record into data_item
                        item_updated = False
                        for field_name, current_value in fields_to_check.items():
                            # 🔒 Locked fields are protected from being overwritten
                            if field_name == 'Manufacturing_Field' and manufacturing_locked:
                                logging.info(f"[字段锁定] 保护Manufacturing_Field不被覆盖: {mix_name}")
                                continue
                            elif field_name == 'Copyright_field' and copyright_locked:
                                logging.info(f"[字段锁定] 保护Copyright_field不被覆盖: {mix_name}")
                                continue
                            elif field_name == 'Novel_IDs' and novel_ids_locked:
                                logging.info(f"[字段锁定] 保护Novel_IDs不被覆盖: {mix_name}")
                                continue
                            elif field_name == 'Anime_IDs' and anime_ids_locked:
                                logging.info(f"[字段锁定] 保护Anime_IDs不被覆盖: {mix_name}")
                                continue
                            elif field_name == 'Drama_IDs' and drama_ids_locked:
                                logging.info(f"[字段锁定] 保护Drama_IDs不被覆盖: {mix_name}")
                                continue
                            # Array fields update when empty; scalars when missing/empty/zero
                            should_update = False
                            if field_name in ['cover_backup_urls', 'episode_video_ids', 'episode_details', 'Novel_IDs', 'Anime_IDs', 'Drama_IDs']:
                                should_update = force_update or current_value is None or (isinstance(current_value, list) and len(current_value) == 0)
                            else:
                                should_update = force_update or current_value is None or current_value == '' or current_value == 0
                            if should_update:
                                if field_name == 'episode_details':
                                    # episode_details is copied straight from the source record
                                    data_item[field_name] = source_data.get(field_name, [])
                                    item_updated = True
                                elif field_name == 'cover_backup_urls':
                                    # cover_backup_urls must stay a list
                                    cover_backup_urls = source_data.get(field_name, [])
                                    if not isinstance(cover_backup_urls, list):
                                        cover_backup_urls = []
                                    data_item[field_name] = cover_backup_urls
                                    item_updated = True
                                elif field_name == 'episode_video_ids':
                                    # episode_video_ids must stay a list
                                    episode_video_ids = source_data.get(field_name, [])
                                    if not isinstance(episode_video_ids, list):
                                        episode_video_ids = []
                                    data_item[field_name] = episode_video_ids
                                    item_updated = True
                                elif field_name in ['Novel_IDs', 'Anime_IDs', 'Drama_IDs']:
                                    # Classification fields must stay lists and remain mutually exclusive
                                    classification_ids = source_data.get(field_name, [])
                                    if not isinstance(classification_ids, list):
                                        classification_ids = []
                                    # Enforce exclusivity: a non-empty category clears the other two
                                    if classification_ids:
                                        if field_name == 'Novel_IDs':
                                            data_item['Anime_IDs'] = []
                                            data_item['Drama_IDs'] = []
                                        elif field_name == 'Anime_IDs':
                                            data_item['Novel_IDs'] = []
                                            data_item['Drama_IDs'] = []
                                        elif field_name == 'Drama_IDs':
                                            data_item['Novel_IDs'] = []
                                            data_item['Anime_IDs'] = []
                                    data_item[field_name] = classification_ids
                                    item_updated = True
                                else:
                                    # Everything else comes straight from the source record
                                    source_value = source_data.get(field_name, '')
                                    data_item[field_name] = source_value
                                    item_updated = True
                        # 🔒 Protected fields: key computed values such as the play-count deltas
                        # are never overwritten -- timeline_data in particular must survive.
                        # These stay safe because they are not listed in fields_to_check:
                        # rank, play_vv, video_id, video_url, cover_image_url, playcount_str, timeline_data
                        if item_updated:
                            doc_updated = True
                            logging.info(f"✅ 成功同步data项目字段: {mix_name}")
                        updated_data_array.append(data_item)
                    except Exception as e:
                        logging.error(f"同步data项目失败 {data_item.get('mix_name', 'N/A')}: {e}")
                        # Keep the entry unchanged on error
                        updated_data_array.append(data_item)
                        continue
                # If anything changed, write back the whole "data" array
                if doc_updated:
                    daily_rankings_collection.update_one(
                        {"_id": ranking_doc["_id"]},
                        {"$set": {"data": updated_data_array}}
                    )
                    updated_items += 1
                    logging.info(f"✅ 成功更新Ranking_storage文档的data数组: {ranking_doc.get('date', 'N/A')}")
                else:
                    skipped_items += 1
            except Exception as e:
                logging.error(f"同步Ranking_storage文档失败 {ranking_doc.get('_id')}: {e}")
                error_items += 1
                continue
        # The rewritten logic works on the data array directly, so no retries are needed
        return {
            "success": True,
            "message": f"同步完成(重试 {retry_count} 次)",
            "stats": {
                "target_date": target_date,
                "total_items": total_items,
                "updated_items": updated_items,
                "skipped_items": skipped_items,
                "error_items": error_items,
                "retry_count": retry_count,
                "pending_items_final": len(pending_items),
                "data_source": "Rankings_management"
            }
        }
    except Exception as e:
        logging.error(f"同步Ranking_storage字段失败: {e}")
        return {
            "success": False,
            "message": f"同步失败: {str(e)}"
        }
@rank_bp.route('/sync_ranking_fields', methods=['POST'])
def sync_ranking_fields():
    """
    API endpoint: sync Ranking_storage fields.
    """
    try:
        data = request.get_json() or {}
        target_date = data.get('target_date')
        force_update = data.get('force_update', False)
        result = sync_ranking_storage_fields(target_date, force_update)
        if result["success"]:
            return jsonify(result)
        else:
            return jsonify(result), 400
    except Exception as e:
        logging.error(f"同步API调用失败: {e}")
        return jsonify({
            "success": False,
            "message": f"API调用失败: {str(e)}"
        }), 500
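# Example payload (illustrative): re-sync today's storage fields without
# overwriting values that are already present:
#
#     POST /api/rank/sync_ranking_fields
#     {"target_date": "2025-11-06", "force_update": false}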
@rank_bp.route('/validate_classification_exclusivity', methods=['POST'])
def validate_classification_exclusivity_api():
    """
    API endpoint: validate and repair classification exclusivity, ensuring each
    drama belongs to exactly one of Novel_IDs, Anime_IDs and Drama_IDs.
    """
    try:
        result = validate_and_fix_classification_exclusivity()
        if result["success"]:
            return jsonify(result)
        else:
            return jsonify(result), 400
    except Exception as e:
        logging.error(f"验证分类互斥性API失败: {e}")
        return jsonify({
            "success": False,
            "message": f"验证分类互斥性失败: {str(e)}"
        }), 500