修改数据库存储的问题,优化时出现的小问题

This commit is contained in:
qiaoyirui0819 2025-11-04 21:32:40 +08:00
parent 4057620cf4
commit 39239c3e85
2 changed files with 176 additions and 91 deletions

View File

@ -343,13 +343,38 @@ class DouyinAutoScheduler:
management_data = rankings_management_collection.find_one({ management_data = rankings_management_collection.find_one({
"mix_name": mix_name, "mix_name": mix_name,
"$or": [ "$or": [
{"created_at": {"$gte": datetime.strptime(today_str, '%Y-%m-%d'), {"created_at": {"$gte": datetime.strptime(today_str, '%Y-%m-%d'),
"$lt": datetime.strptime(today_str, '%Y-%m-%d') + timedelta(days=1)}}, "$lt": datetime.strptime(today_str, '%Y-%m-%d') + timedelta(days=1)}},
{"last_updated": {"$gte": datetime.strptime(today_str, '%Y-%m-%d'), {"last_updated": {"$gte": datetime.strptime(today_str, '%Y-%m-%d'),
"$lt": datetime.strptime(today_str, '%Y-%m-%d') + timedelta(days=1)}} "$lt": datetime.strptime(today_str, '%Y-%m-%d') + timedelta(days=1)}}
] ]
}) })
# 🔑 如果今天没有数据,查询昨天的 Rankings_management仅用于获取分类字段和锁定状态
classification_data = None
if not management_data:
# 查询昨天的 Rankings_management
yesterday_start = datetime.strptime(yesterday_str, '%Y-%m-%d')
yesterday_end = yesterday_start + timedelta(days=1)
classification_data = rankings_management_collection.find_one({
"mix_name": mix_name,
"$or": [
{"created_at": {"$gte": yesterday_start, "$lt": yesterday_end}},
{"last_updated": {"$gte": yesterday_start, "$lt": yesterday_end}}
]
})
if classification_data:
novel_ids = classification_data.get('Novel_IDs', [])
anime_ids = classification_data.get('Anime_IDs', [])
drama_ids = classification_data.get('Drama_IDs', [])
logging.info(f"📋 今天没有数据,从昨天的 Rankings_management 获取分类: {mix_name}")
logging.info(f" - Novel_IDs: {novel_ids}")
logging.info(f" - Anime_IDs: {anime_ids}")
logging.info(f" - Drama_IDs: {drama_ids}")
logging.info(f" - last_updated: {classification_data.get('last_updated')}")
else:
logging.warning(f"⚠️ 今天和昨天都没有数据: {mix_name}")
ranking_item = { ranking_item = {
# 🎯 核心榜单字段 # 🎯 核心榜单字段
"rank": rank, # 使用排名计数器 "rank": rank, # 使用排名计数器
@ -380,11 +405,29 @@ class DouyinAutoScheduler:
"realtime_saved": management_data.get("realtime_saved", True) if management_data else True, "realtime_saved": management_data.get("realtime_saved", True) if management_data else True,
"created_at": management_data.get("created_at") if management_data else None, "created_at": management_data.get("created_at") if management_data else None,
"last_updated": management_data.get("last_updated") if management_data else None, "last_updated": management_data.get("last_updated") if management_data else None,
# 🔑 分类字段:区分今天数据和历史数据
# - 如果今天有数据:从今天的数据获取所有字段
# - 如果今天没有数据:只从历史记录获取分类字段和锁定状态,其他字段为空
"Manufacturing_Field": management_data.get("Manufacturing_Field", "") if management_data else "", "Manufacturing_Field": management_data.get("Manufacturing_Field", "") if management_data else "",
"Copyright_field": management_data.get("Copyright_field", "") if management_data else "", "Copyright_field": management_data.get("Copyright_field", "") if management_data else "",
"Novel_IDs": management_data.get("Novel_IDs", []) if management_data else [], "Novel_IDs": (
"Anime_IDs": management_data.get("Anime_IDs", []) if management_data else [], management_data.get("Novel_IDs", []) if management_data
"Drama_IDs": management_data.get("Drama_IDs", []) if management_data else [], else (classification_data.get("Novel_IDs", []) if classification_data else [])
),
"Anime_IDs": (
management_data.get("Anime_IDs", []) if management_data
else (classification_data.get("Anime_IDs", []) if classification_data else [])
),
"Drama_IDs": (
management_data.get("Drama_IDs", []) if management_data
else (classification_data.get("Drama_IDs", []) if classification_data else [])
),
# 🔒 锁定状态:同样区分今天数据和历史数据
"field_lock_status": (
management_data.get("field_lock_status", {}) if management_data
else (classification_data.get("field_lock_status", {}) if classification_data else {})
),
# 📊 时间轴对比数据(重要:包含播放量差值) # 📊 时间轴对比数据(重要:包含播放量差值)
"timeline_data": { "timeline_data": {

View File

@ -17,7 +17,6 @@ rank_bp = Blueprint('rank', __name__, url_prefix='/api/rank')
# 获取数据库集合 # 获取数据库集合
collection = db['Ranking_storage'] # 主要数据源榜单存储表包含data数组 collection = db['Ranking_storage'] # 主要数据源榜单存储表包含data数组
rankings_management_collection = db['Rankings_management'] # 管理数据库(字段同步源) rankings_management_collection = db['Rankings_management'] # 管理数据库(字段同步源)
daily_rankings_collection = db['Ranking_storage'] # 榜单存储表
def format_playcount(playcount_str): def format_playcount(playcount_str):
"""格式化播放量字符串为数字""" """格式化播放量字符串为数字"""
@ -43,18 +42,6 @@ def format_playcount(playcount_str):
except: except:
return 0 return 0
def format_cover_url(cover_data):
"""格式化封面图片URL"""
if not cover_data:
return ""
if isinstance(cover_data, str):
return cover_data
elif isinstance(cover_data, dict) and 'url_list' in cover_data:
return cover_data['url_list'][0] if cover_data['url_list'] else ""
else:
return ""
def format_time(time_obj): def format_time(time_obj):
"""格式化时间""" """格式化时间"""
if not time_obj: if not time_obj:
@ -195,35 +182,6 @@ def sort_ranking_data(ranking_data, sort_by, sort_order='desc'):
def parse_formatted_count(formatted_str):
"""解析格式化的数字字符串(如"1.2万""374W"等)"""
try:
if not formatted_str or formatted_str == "0":
return 0
formatted_str = str(formatted_str).strip()
# 处理万、W等单位
if "" in formatted_str or "W" in formatted_str:
# 提取数字部分
import re
numbers = re.findall(r'[\d.]+', formatted_str)
if numbers:
num = float(numbers[0])
return int(num * 10000)
elif "亿" in formatted_str:
numbers = re.findall(r'[\d.]+', formatted_str)
if numbers:
num = float(numbers[0])
return int(num * 100000000)
else:
# 尝试直接转换为数字
return int(float(formatted_str))
except:
return 0
return 0
def format_interaction_count(count): def format_interaction_count(count):
"""格式化互动数量为易读格式""" """格式化互动数量为易读格式"""
try: try:
@ -392,6 +350,42 @@ def get_mix_list(page=1, limit=20, sort_by="playcount", classification_type=None
logging.error(f"获取合集列表失败: {e}") logging.error(f"获取合集列表失败: {e}")
return {"success": False, "message": f"获取数据失败: {str(e)}"} return {"success": False, "message": f"获取数据失败: {str(e)}"}
def get_yesterday_classification_data(mix_name, field_name):
"""
获取昨天的分类数据
Args:
mix_name: 短剧名称
field_name: 分类字段名 (Novel_IDs, Anime_IDs, Drama_IDs)
Returns:
昨天的分类数据列表或None
"""
try:
# 获取昨天的日期
yesterday = datetime.now().date() - timedelta(days=1)
yesterday_str = yesterday.strftime("%Y-%m-%d")
# 从Ranking_storage查询昨天的数据
yesterday_doc = collection.find_one({
"date": yesterday_str,
"data.mix_name": mix_name
})
if yesterday_doc:
# 在data数组中查找对应的项目
for data_item in yesterday_doc.get("data", []):
if data_item.get("mix_name") == mix_name:
classification_ids = data_item.get(field_name, [])
if isinstance(classification_ids, list) and classification_ids:
logging.info(f"从昨天数据获取到分类信息: {mix_name} -> {field_name}: {classification_ids}")
return classification_ids
return None
except Exception as e:
logging.error(f"获取昨天分类数据失败: {e}")
return None
def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None, classification_type=None): def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None, classification_type=None):
"""获取按播放量增长排序的合集列表 - 直接从Ranking_storage读取对应日期的数据""" """获取按播放量增长排序的合集列表 - 直接从Ranking_storage读取对应日期的数据"""
try: try:
@ -425,7 +419,7 @@ def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None, classific
# 检查并自动同步Ranking_storage字段信息 # 检查并自动同步Ranking_storage字段信息
# 检查是否需要同步字段信息 # 检查是否需要同步字段信息
sample_item = daily_rankings_collection.find_one({ sample_item = collection.find_one({
"date": target_date, "date": target_date,
"mix_name": {"$exists": True} "mix_name": {"$exists": True}
}) })
@ -444,14 +438,14 @@ def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None, classific
logging.warning(f"自动同步失败: {sync_result['message']}") logging.warning(f"自动同步失败: {sync_result['message']}")
# 从Ranking_storage读取预计算的增长榜数据 # 从Ranking_storage读取预计算的增长榜数据
growth_ranking = daily_rankings_collection.find_one({ growth_ranking = collection.find_one({
"date": target_date, "date": target_date,
"type": "comprehensive" # 使用comprehensive类型包含增长数据 "type": "comprehensive" # 使用comprehensive类型包含增长数据
}, sort=[("calculation_sequence", -1)]) # 获取最新的计算结果 }, sort=[("calculation_sequence", -1)]) # 获取最新的计算结果
if not growth_ranking or "data" not in growth_ranking: if not growth_ranking or "data" not in growth_ranking:
# 如果没有找到comprehensive类型尝试查找growth类型 # 如果没有找到comprehensive类型尝试查找growth类型
growth_ranking = daily_rankings_collection.find_one({ growth_ranking = collection.find_one({
"date": target_date, "date": target_date,
"type": "growth" "type": "growth"
}, sort=[("calculation_sequence", -1)]) }, sort=[("calculation_sequence", -1)])
@ -473,24 +467,41 @@ def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None, classific
if classification_type: if classification_type:
classification_field_map = { classification_field_map = {
"novel": "Novel_IDs", "novel": "Novel_IDs",
"anime": "Anime_IDs", "anime": "Anime_IDs",
"drama": "Drama_IDs" "drama": "Drama_IDs"
} }
if classification_type in classification_field_map: if classification_type in classification_field_map:
field_name = classification_field_map[classification_type] field_name = classification_field_map[classification_type]
filtered_data = [] filtered_data = []
for item in growth_data: for item in growth_data:
mix_name = item.get("mix_name", "")
mix_id = item.get("mix_id", "") mix_id = item.get("mix_id", "")
if mix_id:
# 查找对应的Rankings_management记录获取分类信息 # 检查当前数据是否有分类信息
management_item = rankings_management_collection.find_one({"mix_id": mix_id}) current_classification_ids = item.get(field_name, [])
if management_item:
classification_ids = management_item.get(field_name, []) # 如果当前数据有分类信息,直接使用
if isinstance(classification_ids, list) and mix_id in classification_ids: if isinstance(current_classification_ids, list) and current_classification_ids:
filtered_data.append(item) if mix_id and mix_id in current_classification_ids:
filtered_data.append(item)
elif not mix_id and mix_name:
# 如果没有mix_id但有mix_name检查分类字段是否包含该短剧
filtered_data.append(item)
else:
# 如果当前数据没有分类信息,尝试从昨天数据获取
if mix_name:
yesterday_classification_ids = get_yesterday_classification_data(mix_name, field_name)
if yesterday_classification_ids:
# 使用昨天的分类数据
if mix_id and mix_id in yesterday_classification_ids:
filtered_data.append(item)
elif not mix_id:
# 如果没有mix_id直接使用昨天的分类数据
filtered_data.append(item)
logging.info(f"使用昨天分类数据: {mix_name} -> {field_name}")
growth_data = filtered_data growth_data = filtered_data
# 分页处理 # 分页处理
@ -507,7 +518,7 @@ def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None, classific
# 优化直接从Ranking_storage中获取已同步的字段信息 # 优化直接从Ranking_storage中获取已同步的字段信息
# 查找对应日期的Ranking_storage记录 # 查找对应日期的Ranking_storage记录
ranking_storage_item = daily_rankings_collection.find_one({ ranking_storage_item = collection.find_one({
"date": target_date, "date": target_date,
"mix_name": mix_name "mix_name": mix_name
}) })
@ -927,14 +938,14 @@ def get_rankings():
# 如果没有指定日期,默认获取最新日期的榜单 # 如果没有指定日期,默认获取最新日期的榜单
if not date: if not date:
latest_ranking = daily_rankings_collection.find_one( latest_ranking = collection.find_one(
{}, sort=[('date', -1)] {}, sort=[('date', -1)]
) )
if latest_ranking: if latest_ranking:
query['date'] = latest_ranking['date'] query['date'] = latest_ranking['date']
# 查询榜单 # 查询榜单
rankings = list(daily_rankings_collection.find(query).sort('generated_at', -1)) rankings = list(collection.find(query).sort('generated_at', -1))
if not rankings: if not rankings:
return jsonify({ return jsonify({
@ -1001,7 +1012,7 @@ def get_ranking_dates():
"""获取可用的榜单日期列表""" """获取可用的榜单日期列表"""
try: try:
# 获取所有不重复的日期 # 获取所有不重复的日期
dates = daily_rankings_collection.distinct('date') dates = collection.distinct('date')
dates.sort(reverse=True) # 按日期倒序排列 dates.sort(reverse=True) # 按日期倒序排列
return jsonify({ return jsonify({
@ -1023,7 +1034,7 @@ def get_ranking_types():
"""获取支持的榜单类型""" """获取支持的榜单类型"""
try: try:
# 获取所有不重复的榜单类型 # 获取所有不重复的榜单类型
types = daily_rankings_collection.distinct('ranking_type') types = collection.distinct('ranking_type')
# 添加类型说明 # 添加类型说明
type_descriptions = { type_descriptions = {
@ -1058,7 +1069,7 @@ def get_latest_rankings():
"""获取最新的所有类型榜单""" """获取最新的所有类型榜单"""
try: try:
# 获取最新日期 # 获取最新日期
latest_ranking = daily_rankings_collection.find_one( latest_ranking = collection.find_one(
{}, sort=[('date', -1)] {}, sort=[('date', -1)]
) )
@ -1075,7 +1086,7 @@ def get_latest_rankings():
latest_date = latest_ranking['date'] latest_date = latest_ranking['date']
# 获取该日期的所有榜单 # 获取该日期的所有榜单
rankings = list(daily_rankings_collection.find({ rankings = list(collection.find({
'date': latest_date 'date': latest_date
}).sort('ranking_type', 1)) }).sort('ranking_type', 1))
@ -1113,17 +1124,17 @@ def get_rankings_stats():
"""获取榜单统计信息""" """获取榜单统计信息"""
try: try:
# 统计总榜单数 # 统计总榜单数
total_rankings = daily_rankings_collection.count_documents({}) total_rankings = collection.count_documents({})
# 统计日期数量 # 统计日期数量
total_dates = len(daily_rankings_collection.distinct('date')) total_dates = len(collection.distinct('date'))
# 统计榜单类型数量 # 统计榜单类型数量
total_types = len(daily_rankings_collection.distinct('ranking_type')) total_types = len(collection.distinct('ranking_type'))
# 获取最新和最早日期 # 获取最新和最早日期
latest_ranking = daily_rankings_collection.find_one({}, sort=[('date', -1)]) latest_ranking = collection.find_one({}, sort=[('date', -1)])
earliest_ranking = daily_rankings_collection.find_one({}, sort=[('date', 1)]) earliest_ranking = collection.find_one({}, sort=[('date', 1)])
latest_date = latest_ranking['date'] if latest_ranking else None latest_date = latest_ranking['date'] if latest_ranking else None
earliest_date = earliest_ranking['date'] if earliest_ranking else None earliest_date = earliest_ranking['date'] if earliest_ranking else None
@ -1628,7 +1639,7 @@ def sync_ranking_storage_fields(target_date=None, force_update=False, max_retrie
# 获取Ranking_storage中指定日期的数据 # 获取Ranking_storage中指定日期的数据
ranking_storage_query = {"date": target_date} ranking_storage_query = {"date": target_date}
ranking_storage_items = list(daily_rankings_collection.find(ranking_storage_query)) ranking_storage_items = list(collection.find(ranking_storage_query))
if not ranking_storage_items: if not ranking_storage_items:
return { return {
@ -1706,8 +1717,8 @@ def sync_ranking_storage_fields(target_date=None, force_update=False, max_retrie
if not source_data: if not source_data:
logging.warning(f"无法找到对应的源数据: mix_name={mix_name}, mix_id={data_item.get('mix_id')}, title={data_item.get('title')}") logging.warning(f"无法找到对应的源数据: mix_name={mix_name}, mix_id={data_item.get('mix_id')}, title={data_item.get('title')}")
# 检查是否有锁定字段,如果有锁定字段,保持原数据不变 # 检查是否有锁定字段,如果有锁定字段,保持原数据不变(从 data_item 获取)
field_lock_status = ranking_doc.get('field_lock_status', {}) field_lock_status = data_item.get('field_lock_status', {})
has_locked_fields = any([ has_locked_fields = any([
field_lock_status.get('Manufacturing_Field_locked', False), field_lock_status.get('Manufacturing_Field_locked', False),
field_lock_status.get('Copyright_field_locked', False), field_lock_status.get('Copyright_field_locked', False),
@ -1716,11 +1727,23 @@ def sync_ranking_storage_fields(target_date=None, force_update=False, max_retrie
field_lock_status.get('Drama_IDs_locked', False) field_lock_status.get('Drama_IDs_locked', False)
]) ])
# 检查是否有用户设置的数据(锁定字段或分类数据)
has_user_data = has_locked_fields or any([
data_item.get('Manufacturing_Field'),
data_item.get('Copyright_field'),
data_item.get('Novel_IDs'),
data_item.get('Anime_IDs'),
data_item.get('Drama_IDs')
])
if has_locked_fields: if has_locked_fields:
logging.info(f"保持锁定字段不变: {mix_name} (无源数据但有锁定字段)") logging.info(f"保持锁定字段不变: {mix_name} (无源数据但有锁定字段)")
updated_data_array.append(data_item) updated_data_array.append(data_item)
elif has_user_data:
logging.info(f"保持用户设置的数据: {mix_name} (无源数据但有用户数据)")
updated_data_array.append(data_item)
else: else:
# 只有当mix_name有效且没有锁定字段时才保留记录 # 只有当mix_name有效时才保留记录
if mix_name and mix_name.strip(): if mix_name and mix_name.strip():
updated_data_array.append(data_item) updated_data_array.append(data_item)
continue continue
@ -1758,8 +1781,8 @@ def sync_ranking_storage_fields(target_date=None, force_update=False, max_retrie
# 计算字段 # 计算字段
} }
# 🔒 检查字段锁定状态 # 🔒 检查字段锁定状态(从 data_item 获取,而不是 ranking_doc
field_lock_status = ranking_doc.get('field_lock_status', {}) field_lock_status = data_item.get('field_lock_status', {})
manufacturing_locked = field_lock_status.get('Manufacturing_Field_locked', False) manufacturing_locked = field_lock_status.get('Manufacturing_Field_locked', False)
copyright_locked = field_lock_status.get('Copyright_field_locked', False) copyright_locked = field_lock_status.get('Copyright_field_locked', False)
novel_ids_locked = field_lock_status.get('Novel_IDs_locked', False) novel_ids_locked = field_lock_status.get('Novel_IDs_locked', False)
@ -1852,20 +1875,39 @@ def sync_ranking_storage_fields(target_date=None, force_update=False, max_retrie
if not isinstance(classification_ids, list): if not isinstance(classification_ids, list):
classification_ids = [] classification_ids = []
# 确保分类互斥性:如果当前字段有值,清空其他分类字段 # 🔑 关键修复:只有当源数据有值时才更新,否则保留用户设置
if classification_ids: if classification_ids:
# 源数据有值,更新分类字段
# 确保分类互斥性:如果当前字段有值,清空其他分类字段(但要检查锁定状态)
if field_name == 'Novel_IDs': if field_name == 'Novel_IDs':
data_item['Anime_IDs'] = [] # 只有在其他字段未锁定时才清空
data_item['Drama_IDs'] = [] if not anime_ids_locked:
data_item['Anime_IDs'] = []
if not drama_ids_locked:
data_item['Drama_IDs'] = []
elif field_name == 'Anime_IDs': elif field_name == 'Anime_IDs':
data_item['Novel_IDs'] = [] if not novel_ids_locked:
data_item['Drama_IDs'] = [] data_item['Novel_IDs'] = []
if not drama_ids_locked:
data_item['Drama_IDs'] = []
elif field_name == 'Drama_IDs': elif field_name == 'Drama_IDs':
data_item['Novel_IDs'] = [] if not novel_ids_locked:
data_item['Anime_IDs'] = [] data_item['Novel_IDs'] = []
if not anime_ids_locked:
data_item[field_name] = classification_ids data_item['Anime_IDs'] = []
item_updated = True
data_item[field_name] = classification_ids
item_updated = True
else:
# 源数据为空,检查当前是否有用户设置的值
current_classification = data_item.get(field_name, [])
if current_classification and isinstance(current_classification, list) and len(current_classification) > 0:
# 用户已设置分类,保留不变
logging.info(f"[分类保护] 保留用户设置的 {field_name}: {mix_name}")
else:
# 当前也没有值,设置为空数组
data_item[field_name] = []
item_updated = True
else: else:
# 对于其他字段,直接从源数据获取 # 对于其他字段,直接从源数据获取
source_value = source_data.get(field_name, '') source_value = source_data.get(field_name, '')
@ -1892,7 +1934,7 @@ def sync_ranking_storage_fields(target_date=None, force_update=False, max_retrie
# 如果有任何项目被更新更新整个文档的data数组 # 如果有任何项目被更新更新整个文档的data数组
if doc_updated: if doc_updated:
daily_rankings_collection.update_one( collection.update_one(
{"_id": ranking_doc["_id"]}, {"_id": ranking_doc["_id"]},
{"$set": {"data": updated_data_array}} {"$set": {"data": updated_data_array}}
) )