#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 小程序专用抖音播放量数据API服务器 优化的数据格式和接口设计,专为小程序使用 """ from flask import Blueprint, request, jsonify from datetime import datetime, timedelta import logging import re import uuid from werkzeug.utils import secure_filename from database import db from handlers.Rankings.tos_client import oss_client # 创建蓝图 rank_bp = Blueprint('rank', __name__, url_prefix='/api/rank') # 获取数据库集合 collection = db['Ranking_storage'] # 主要数据源:榜单存储表(包含data数组) rankings_management_collection = db['Rankings_management'] # 管理数据库(字段同步源) claim_applications_collection = db['Claim_Applications'] # 认领申请集合 def format_playcount(playcount_str): """格式化播放量字符串为数字""" if not playcount_str: return 0 try: if isinstance(playcount_str, (int, float)): return int(playcount_str) playcount_str = str(playcount_str).strip() # 处理亿、万等单位 if "亿" in playcount_str: num = float(re.findall(r'[\d.]+', playcount_str)[0]) return int(num * 100000000) elif "万" in playcount_str: num = float(re.findall(r'[\d.]+', playcount_str)[0]) return int(num * 10000) else: # 尝试直接转换数字 return int(float(playcount_str)) except: return 0 def format_time(time_obj): """格式化时间""" if not time_obj: return "" if isinstance(time_obj, datetime): return time_obj.strftime("%Y-%m-%d %H:%M:%S") else: return str(time_obj) def parse_date_string(date_str): """通用日期解析函数""" try: if isinstance(date_str, str): return datetime.strptime(date_str, '%Y-%m-%d').date() return date_str except (ValueError, TypeError): logging.warning(f"无法解析日期字符串: {date_str}") return None def find_management_data(query, target_date=None): """ 通用的管理数据查询函数,优先使用mix_id进行查询 Args: query: 查询条件字典,可以包含mix_id, mix_name等字段 target_date: 目标日期(已不用于管理库过滤,保留参数兼容) Returns: 查询到的文档或None """ try: # 如果查询条件中有mix_id,优先使用mix_id查询 if 'mix_id' in query and query['mix_id']: mix_id_query = {"mix_id": query['mix_id']} result = rankings_management_collection.find_one(mix_id_query) if result: logging.info(f"通过mix_id找到管理数据: {query['mix_id']}") return result # 
如果通过mix_id没找到,或者没有mix_id,尝试其他查询条件 fallback_query = {k: v for k, v in query.items() if k != 'mix_id'} if fallback_query: result = rankings_management_collection.find_one(fallback_query) if result: logging.info(f"通过备用查询找到管理数据: {fallback_query}") return result logging.warning(f"未找到匹配的管理数据: {query}") return None except Exception as e: logging.error(f"查询管理数据时出错: {e}") return None def sort_ranking_data(ranking_data, sort_by, sort_order='desc'): """ 对榜单数据进行动态排序 Args: ranking_data: 榜单数据列表 sort_by: 排序字段 (play_vv_change, play_vv_change_rate, play_vv, rank) sort_order: 排序顺序 (asc, desc) Returns: 排序后的榜单数据 """ try: # 定义排序键函数 def get_sort_key(item): if sort_by == 'play_vv_change': # 按播放量差值排序 timeline_data = item.get('timeline_data', {}) return timeline_data.get('play_vv_change', 0) elif sort_by == 'play_vv_change_rate': # 按播放量变化率排序 timeline_data = item.get('timeline_data', {}) return timeline_data.get('play_vv_change_rate', 0) elif sort_by == 'play_vv': # 按当前播放量排序 return item.get('play_vv', 0) elif sort_by == 'rank': # 按排名排序 return item.get('rank', 999999) else: # 默认按排名排序 return item.get('rank', 999999) # 执行排序 reverse = (sort_order == 'desc') # 对于排名字段,降序实际上是升序(排名越小越好) if sort_by == 'rank': reverse = (sort_order == 'asc') sorted_data = sorted(ranking_data, key=get_sort_key, reverse=reverse) # 重新分配排名 for i, item in enumerate(sorted_data, 1): item['current_sort_rank'] = i return sorted_data except Exception as e: logging.error(f"排序榜单数据失败: {e}") # 如果排序失败,返回原始数据 return ranking_data def format_interaction_count(count): """格式化互动数量为易读格式""" try: count = int(count) if count >= 100000000: # 1亿+ return f"{count / 100000000:.1f}亿" elif count >= 10000: # 1万+ return f"{count / 10000:.1f}万" else: return str(count) except: return "0" def format_mix_item(doc, target_date=None): """格式化合集数据项 - 完全按照数据库原始字段返回""" mix_name = doc.get("mix_name", "") or doc.get("title", "") # 计算总点赞数 episode_details = doc.get("episode_details", []) total_likes = 0 total_comments = 0 if episode_details: for episode in 
episode_details: total_likes += episode.get("likes", 0) total_comments += len(episode.get("comments", [])) # 格式化总点赞数 total_likes_formatted = format_interaction_count(total_likes) total_comments_formatted = format_interaction_count(total_comments) return { "_id": str(doc.get("_id", "")), "batch_time": format_time(doc.get("batch_time")), "mix_name": mix_name, "title": mix_name, "video_url": doc.get("video_url", ""), "playcount": doc.get("playcount", ""), "play_vv": doc.get("play_vv", 0), "request_id": doc.get("request_id", ""), "rank": doc.get("rank", 0), "cover_image_url": doc.get("cover_image_url", ""), # 基础字段 "series_author": doc.get("series_author", ""), "Manufacturing_Field": doc.get("Manufacturing_Field", ""), "Copyright_field": doc.get("Copyright_field", ""), "classification_type": doc.get("classification_type", ""), # 新增:类型/元素 "release_date": doc.get("release_date", ""), # 新增:上线日期 "desc": doc.get("desc", ""), "updated_to_episode": doc.get("updated_to_episode", 0), "cover_backup_urls": doc.get("cover_backup_urls", []), "mix_id": doc.get("mix_id", ""), "episode_video_ids": doc.get("episode_video_ids", []), "episode_details": doc.get("episode_details", []), # 点赞和评论总数 "total_likes": total_likes, "total_likes_formatted": total_likes_formatted, "total_comments": total_comments, "total_comments_formatted": total_comments_formatted, # 播放量变化数据 "timeline_data": doc.get("timeline_data", []), # 评论总结 "comments_summary": doc.get("comments_summary", ""), } def get_mix_list(page=1, limit=20, sort_by="playcount", classification_type=None): """获取合集列表(分页)- 从Ranking_storage的data数组中获取数据,支持分类筛选""" try: # 计算跳过的数量 skip = (page - 1) * limit # 设置排序字段 if sort_by == "growth": # 按增长排序需要特殊处理 return get_growth_mixes(page, limit) else: # 获取今天的日期 today = datetime.now().date() today_str = today.strftime("%Y-%m-%d") # 从Ranking_storage中获取今天的数据 ranking_doc = collection.find_one({ "date": today_str, "type": {"$in": ["comprehensive", "playcount"]} # 查找包含播放量数据的榜单 }, sort=[("calculation_sequence", 
-1)]) # 获取最新的计算结果 if not ranking_doc or "data" not in ranking_doc: # 如果没有找到今天的数据,返回空结果 logging.warning(f"Ranking_storage中未找到 {today_str} 的数据") return { "success": True, "message": f"暂无 {today_str} 的数据,请等待定时任务生成", "data": [], "pagination": { "page": page, "limit": limit, "total": 0, "pages": 0, "has_next": False, "has_prev": False }, "sort_by": sort_by, "data_source": "ranking_storage", "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } # 获取data数组中的数据 mix_data = ranking_doc.get("data", []) # 分类筛选逻辑 if classification_type: filtered_data = [] classification_field_map = { 'novel': 'Novel_IDs', 'anime': 'Anime_IDs', 'drama': 'Drama_IDs' } if classification_type in classification_field_map: field_name = classification_field_map[classification_type] for item in mix_data: mix_id = item.get('mix_id') if mix_id: # 检查该mix_id是否在对应的分类字段中 classification_ids = item.get(field_name, []) if isinstance(classification_ids, list) and mix_id in classification_ids: filtered_data.append(item) mix_data = filtered_data logging.info(f"分类筛选 {classification_type}: 筛选出 {len(mix_data)} 条数据") # 按播放量排序(如果需要) if sort_by == "playcount": mix_data = sorted(mix_data, key=lambda x: x.get("play_vv", 0), reverse=True) # 分页处理 total = len(mix_data) paginated_data = mix_data[skip:skip + limit] # 为分页数据添加排名并格式化 formatted_data = [] for i, item in enumerate(paginated_data): item["rank"] = skip + i + 1 # 确保mix_name字段存在 if "mix_name" not in item and "title" in item: item["mix_name"] = item["title"] # 使用format_mix_item函数格式化数据,包括计算总点赞数 formatted_item = format_mix_item(item) formatted_data.append(formatted_item) return { "success": True, "data": formatted_data, "pagination": { "page": page, "limit": limit, "total": total, "pages": (total + limit - 1) // limit, "has_next": page * limit < total, "has_prev": page > 1 }, "sort_by": sort_by, "data_source": "ranking_storage", "update_time": ranking_doc.get("created_at", datetime.now()).strftime("%Y-%m-%d %H:%M:%S") if isinstance(ranking_doc.get("created_at"), 
datetime) else str(ranking_doc.get("created_at", "")) } except Exception as e: logging.error(f"获取合集列表失败: {e}") return {"success": False, "message": f"获取数据失败: {str(e)}"} def get_yesterday_classification_data(mix_name, field_name): """ 获取昨天的分类数据 Args: mix_name: 短剧名称 field_name: 分类字段名 (Novel_IDs, Anime_IDs, Drama_IDs) Returns: 昨天的分类数据列表或None """ try: # 获取昨天的日期 yesterday = datetime.now().date() - timedelta(days=1) yesterday_str = yesterday.strftime("%Y-%m-%d") # 从Ranking_storage查询昨天的数据 yesterday_doc = collection.find_one({ "date": yesterday_str, "data.mix_name": mix_name }) if yesterday_doc: # 在data数组中查找对应的项目 for data_item in yesterday_doc.get("data", []): if data_item.get("mix_name") == mix_name: classification_ids = data_item.get(field_name, []) if isinstance(classification_ids, list) and classification_ids: logging.info(f"从昨天数据获取到分类信息: {mix_name} -> {field_name}: {classification_ids}") return classification_ids return None except Exception as e: logging.error(f"获取昨天分类数据失败: {e}") return None def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None, classification_type=None): """获取按播放量增长排序的合集列表 - 直接从Ranking_storage读取对应日期的数据""" try: # 计算跳过的数量 skip = (page - 1) * limit # 简化日期处理:直接使用前端传来的日期 if start_date and end_date: # 如果前端提供了日期,直接使用(优先使用end_date作为查询日期) if isinstance(end_date, str): target_date = end_date else: target_date = end_date.strftime("%Y-%m-%d") elif end_date: # 如果只提供了end_date,使用end_date if isinstance(end_date, str): target_date = end_date else: target_date = end_date.strftime("%Y-%m-%d") elif start_date: # 如果只提供了start_date,使用start_date if isinstance(start_date, str): target_date = start_date else: target_date = start_date.strftime("%Y-%m-%d") else: # 如果没有提供日期,默认使用今天 target_date = datetime.now().date().strftime("%Y-%m-%d") logging.info(f"📅 查询日期: {target_date}") # 检查并自动同步Ranking_storage字段信息 # 检查是否需要同步字段信息 sample_item = collection.find_one({ "date": target_date, "mix_name": {"$exists": True} }) if sample_item: # 检查是否缺少关键字段 missing_manufacturing = 
sample_item.get('Manufacturing_Field') is None missing_copyright = sample_item.get('Copyright_field') is None if missing_manufacturing or missing_copyright: logging.info(f"检测到 {target_date} 的Ranking_storage数据缺少字段信息,开始自动同步...") sync_result = sync_ranking_storage_fields(target_date, force_update=False) if sync_result["success"]: logging.info(f"自动同步完成: {sync_result['stats']}") else: logging.warning(f"自动同步失败: {sync_result['message']}") # 从Ranking_storage读取预计算的增长榜数据 growth_ranking = collection.find_one({ "date": target_date, "type": "comprehensive" # 使用comprehensive类型,包含增长数据 }, sort=[("calculation_sequence", -1)]) # 获取最新的计算结果 if not growth_ranking or "data" not in growth_ranking: # 如果没有找到comprehensive类型,尝试查找growth类型 growth_ranking = collection.find_one({ "date": target_date, "type": "growth" }, sort=[("calculation_sequence", -1)]) if growth_ranking and "data" in growth_ranking: logging.info(f"📈 从Ranking_storage读取 {target_date} 的增长榜数据") # 获取预先计算好的增长榜数据 growth_data = growth_ranking["data"] # 如果是comprehensive类型,需要按增长值排序 if growth_ranking.get("type") == "comprehensive": # 按timeline_data中的play_vv_change排序 growth_data = sorted(growth_data, key=lambda x: x.get("timeline_data", {}).get("play_vv_change", 0), reverse=True) # 根据分类类型筛选数据 if classification_type: classification_field_map = { "novel": "Novel_IDs", "anime": "Anime_IDs", "drama": "Drama_IDs" } if classification_type in classification_field_map: field_name = classification_field_map[classification_type] filtered_data = [] for item in growth_data: mix_name = item.get("mix_name", "") mix_id = item.get("mix_id", "") # 检查当前数据是否有分类信息 current_classification_ids = item.get(field_name, []) # 如果当前数据有分类信息,直接使用 if isinstance(current_classification_ids, list) and current_classification_ids: if mix_id and mix_id in current_classification_ids: filtered_data.append(item) elif not mix_id and mix_name: # 如果没有mix_id但有mix_name,检查分类字段是否包含该短剧 filtered_data.append(item) else: # 如果当前数据没有分类信息,尝试从昨天数据获取 if mix_name: yesterday_classification_ids 
= get_yesterday_classification_data(mix_name, field_name) if yesterday_classification_ids: # 使用昨天的分类数据 if mix_id and mix_id in yesterday_classification_ids: filtered_data.append(item) elif not mix_id: # 如果没有mix_id,直接使用昨天的分类数据 filtered_data.append(item) logging.info(f"使用昨天分类数据: {mix_name} -> {field_name}") growth_data = filtered_data # 分页处理 total = len(growth_data) paginated_data = growth_data[skip:skip + limit] # 为分页数据添加排名和补充完整字段信息 for i, item in enumerate(paginated_data): item["rank"] = skip + i + 1 # 修复:使用mix_name字段,不要用空的title覆盖它 mix_name = item.get("mix_name", "") if mix_name: # 优化:直接从Ranking_storage中获取已同步的字段信息 # 查找对应日期的Ranking_storage记录 ranking_storage_item = collection.find_one({ "date": target_date, "mix_name": mix_name }) if ranking_storage_item: # 直接使用Ranking_storage中已同步的字段 item.update({ "Manufacturing_Field": ranking_storage_item.get("Manufacturing_Field", ""), "Copyright_field": ranking_storage_item.get("Copyright_field", ""), "series_author": ranking_storage_item.get("series_author", item.get("series_author", "")), "video_id": ranking_storage_item.get("video_id", item.get("video_id", "")), "video_url": ranking_storage_item.get("video_url", item.get("video_url", "")), # 保持当前item中的封面和播放量数据(来自榜单计算) "cover_image_url": item.get("cover_image_url", ranking_storage_item.get("cover_image_url", "")), "play_vv": item.get("play_vv", ranking_storage_item.get("play_vv", 0)), "playcount_str": item.get("playcount_str", ranking_storage_item.get("playcount_str", "0")) }) logging.info(f"从Ranking_storage获取到同步字段: {mix_name}") else: # 如果Ranking_storage中没有对应记录,回退到原有逻辑 logging.warning(f"Ranking_storage中未找到 {mix_name} 的记录,回退到原有查询逻辑") # 根据查询日期判断数据源 today = datetime.now().date() # 将target_date字符串转换为日期对象进行比较 try: target_date_obj = datetime.strptime(target_date, "%Y-%m-%d").date() is_historical_date = target_date_obj < today except: is_historical_date = False management_doc = None # 统一从Rankings_management获取数据 management_doc = rankings_management_collection.find_one({"mix_name": 
mix_name}) if management_doc: item.update({ "Manufacturing_Field": management_doc.get("Manufacturing_Field", ""), "Copyright_field": management_doc.get("Copyright_field", ""), "series_author": management_doc.get("series_author", item.get("series_author", "")), "video_id": management_doc.get("video_id", item.get("video_id", "")), "video_url": management_doc.get("video_url", item.get("video_url", "")), "cover_image_url": item.get("cover_image_url", management_doc.get("cover_image_url", "")), "play_vv": item.get("play_vv", management_doc.get("play_vv", 0)), "playcount_str": item.get("playcount_str", management_doc.get("playcount_str", "0")) }) else: # 设置默认值 item.update({ "Manufacturing_Field": "", "Copyright_field": "", "series_author": item.get("series_author", ""), "video_id": item.get("video_id", ""), "video_url": item.get("video_url", ""), "cover_image_url": item.get("cover_image_url", ""), "play_vv": item.get("play_vv", 0), "playcount_str": item.get("playcount_str", "0") }) else: item["Manufacturing_Field"] = "" item["Copyright_field"] = "" # 使用format_mix_item函数格式化所有数据,包括计算总点赞数 formatted_data = [] for item in paginated_data: formatted_item = format_mix_item(item) formatted_data.append(formatted_item) return { "success": True, "data": formatted_data, "pagination": { "page": page, "limit": limit, "total": total, "pages": (total + limit - 1) // limit, "has_next": page * limit < total, "has_prev": page > 1 }, "sort_by": "growth", "date_range": { "start_date": target_date, "end_date": target_date }, "data_source": "ranking_storage", # 标识数据来源 "update_time": growth_ranking.get("created_at", datetime.now()).strftime("%Y-%m-%d %H:%M:%S") if isinstance(growth_ranking.get("created_at"), datetime) else str(growth_ranking.get("created_at", "")) } else: # 如果Ranking_storage中没有数据,返回空结果 logging.warning(f"Ranking_storage中未找到 {target_date} 的增长榜数据") return { "success": True, "message": f"暂无 {target_date} 的增长榜数据,请等待定时任务生成", "data": [], "pagination": { "page": page, "limit": limit, 
"total": 0, "pages": 0, "has_next": False, "has_prev": False }, "sort_by": "growth", "date_range": { "start_date": target_date, "end_date": target_date }, "data_source": "ranking_storage", "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } except Exception as e: logging.error(f"获取增长合集列表失败: {e}") # 返回错误信息,不再回退到播放量排序 return { "success": False, "message": f"获取增长榜数据失败: {str(e)}", "data": [], "pagination": { "page": page, "limit": limit, "total": 0, "pages": 0, "has_next": False, "has_prev": False }, "sort_by": "growth", "data_source": "ranking_storage" } def get_top_mixes(limit=10): """获取热门合集(TOP榜单)""" try: # 按播放量排序获取热门合集 cursor = collection.find().sort("play_vv", -1).limit(limit) docs = list(cursor) if not docs: return {"success": False, "message": "暂无数据"} # 格式化数据 top_list = [] for doc in docs: item = format_mix_item(doc) top_list.append(item) return { "success": True, "data": top_list, "total": len(top_list), "update_time": format_time(docs[0].get("batch_time")) if docs else "" } except Exception as e: logging.error(f"获取热门合集失败: {e}") return {"success": False, "message": f"获取数据失败: {str(e)}"} def search_mixes(keyword, page=1, limit=10): """搜索合集""" try: if not keyword: return {"success": False, "message": "请提供搜索关键词"} # 计算跳过的数量 skip = (page - 1) * limit # 构建搜索条件(模糊匹配合集名称) search_condition = { "mix_name": {"$regex": keyword, "$options": "i"} } # 查询数据 cursor = collection.find(search_condition).sort("play_vv", -1).skip(skip).limit(limit) docs = list(cursor) # 获取搜索结果总数 total = collection.count_documents(search_condition) # 格式化数据 search_results = [] for doc in docs: item = format_mix_item(doc) search_results.append(item) return { "success": True, "data": search_results, "keyword": keyword, "pagination": { "page": page, "limit": limit, "total": total, "pages": (total + limit - 1) // limit, "has_next": page * limit < total, "has_prev": page > 1 }, "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } except Exception as e: logging.error(f"搜索合集失败: {e}") return 
{"success": False, "message": f"搜索失败: {str(e)}"} def get_mix_detail(mix_id): """获取合集详情""" try: from bson import ObjectId # 尝试通过ObjectId查找 try: doc = collection.find_one({"_id": ObjectId(mix_id)}) except: # 如果ObjectId无效,尝试其他字段 doc = collection.find_one({ "$or": [ {"mix_name": mix_id}, {"request_id": mix_id} ] }) if not doc: return {"success": False, "message": "未找到合集信息"} # 格式化详细信息 - 只返回数据库原始字段 detail = format_mix_item(doc) return { "success": True, "data": detail, "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } except Exception as e: logging.error(f"获取合集详情失败: {e}") return {"success": False, "message": f"获取详情失败: {str(e)}"} def get_statistics(): """获取统计信息""" try: # 基本统计 total_mixes = collection.count_documents({}) if total_mixes == 0: return {"success": False, "message": "暂无数据"} # 播放量统计 pipeline = [ { "$group": { "_id": None, "total_playcount": {"$sum": "$play_vv"}, "avg_playcount": {"$avg": "$play_vv"}, "max_playcount": {"$max": "$play_vv"}, "min_playcount": {"$min": "$play_vv"} } } ] stats_result = list(collection.aggregate(pipeline)) stats = stats_result[0] if stats_result else {} # 获取最新更新时间 latest_doc = collection.find().sort("batch_time", -1).limit(1) latest_time = "" if latest_doc: latest_list = list(latest_doc) if latest_list: latest_time = format_time(latest_list[0].get("batch_time")) # 热门分类统计(按播放量区间) categories = [ {"name": "超热门", "min": 100000000, "count": 0}, # 1亿+ {"name": "热门", "min": 50000000, "max": 99999999, "count": 0}, # 5000万-1亿 {"name": "中等", "min": 10000000, "max": 49999999, "count": 0}, # 1000万-5000万 {"name": "一般", "min": 0, "max": 9999999, "count": 0} # 1000万以下 ] for category in categories: if "max" in category: count = collection.count_documents({ "play_vv": {"$gte": category["min"], "$lte": category["max"]} }) else: count = collection.count_documents({ "play_vv": {"$gte": category["min"]} }) category["count"] = count return { "success": True, "data": { "total_mixes": total_mixes, "total_playcount": stats.get("total_playcount", 
0), "avg_playcount": int(stats.get("avg_playcount", 0)), "max_playcount": stats.get("max_playcount", 0), "min_playcount": stats.get("min_playcount", 0), "categories": categories, "latest_update": latest_time }, "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } except Exception as e: logging.error(f"获取统计信息失败: {e}") return {"success": False, "message": f"获取统计失败: {str(e)}"} # 路由定义 @rank_bp.route('/growth_mixes') def get_growth_mixes_route(): """获取增长榜合集列表""" page = int(request.args.get('page', 1)) limit = int(request.args.get('limit', 20)) start_date = request.args.get('start_date') end_date = request.args.get('end_date') classification_type = request.args.get('classification_type') result = get_growth_mixes(page, limit, start_date, end_date, classification_type) return jsonify(result) @rank_bp.route('/videos') def get_videos(): """获取合集列表 - 兼容app.py调用,支持分类筛选""" page = int(request.args.get('page', 1)) limit = int(request.args.get('limit', 20)) sort_by = request.args.get('sort', 'playcount') classification_type = request.args.get('classification_type') # 新增分类筛选参数 if sort_by == 'growth': start_date = request.args.get('start_date') end_date = request.args.get('end_date') result = get_growth_mixes(page, limit, start_date, end_date, classification_type) else: result = get_mix_list(page, limit, sort_by, classification_type) return jsonify(result) @rank_bp.route('/top') def get_top(): """获取热门榜单 - 兼容app.py调用""" limit = int(request.args.get('limit', 10)) result = get_top_mixes(limit) return jsonify(result) @rank_bp.route('/search') def search(): """搜索合集 - 兼容app.py调用""" keyword = request.args.get('q', '') page = int(request.args.get('page', 1)) limit = int(request.args.get('limit', 10)) result = search_mixes(keyword, page, limit) return jsonify(result) @rank_bp.route('/detail') def get_detail(): """获取合集详情 - 兼容app.py调用""" mix_id = request.args.get('id', '') result = get_mix_detail(mix_id) return jsonify(result) @rank_bp.route('/stats') def get_stats(): """获取统计信息 - 
兼容app.py调用""" result = get_statistics() return jsonify(result) @rank_bp.route('/health') def health_check(): """健康检查 - 兼容app.py调用""" try: from database import client # 检查数据库连接 if not client: return jsonify({"success": False, "message": "数据库未连接"}) # 测试数据库连接 client.admin.command('ping') # 获取数据统计 total_count = collection.count_documents({}) return jsonify({ "success": True, "message": "服务正常", "data": { "database": "连接正常", "total_records": total_count, "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } }) except Exception as e: logging.error(f"健康检查失败: {e}") return jsonify({"success": False, "message": f"服务异常: {str(e)}"}) # ==================== 榜单查询API接口 ==================== @rank_bp.route('/rankings') def get_rankings(): """获取榜单列表 - 支持按日期和类型查询,支持动态排序""" try: # 获取查询参数 date = request.args.get('date') # 日期,格式:YYYY-MM-DD ranking_type = request.args.get('type') # 榜单类型:playcount, growth, newcomer sort_by = request.args.get('sort_by', 'default') # 排序方式:default, play_vv_change, play_vv_change_rate, play_vv sort_order = request.args.get('sort_order', 'desc') # 排序顺序:asc, desc page = int(request.args.get('page', 1)) limit = int(request.args.get('limit', 50)) # 构建查询条件 query = {} if date: query['date'] = date if ranking_type: query['ranking_type'] = ranking_type # 如果没有指定日期,默认获取最新日期的榜单 if not date: latest_ranking = collection.find_one( {}, sort=[('date', -1)] ) if latest_ranking: query['date'] = latest_ranking['date'] # 查询榜单 rankings = list(collection.find(query).sort('generated_at', -1)) if not rankings: return jsonify({ "success": True, "message": "暂无榜单数据", "data": { "rankings": [], "total": 0, "page": page, "limit": limit } }) # 格式化返回数据 formatted_rankings = [] for ranking in rankings: ranking_data = ranking.get('data', []) # 动态排序逻辑 if sort_by != 'default' and ranking_data: ranking_data = sort_ranking_data(ranking_data, sort_by, sort_order) # 分页处理榜单数据 start_idx = (page - 1) * limit end_idx = start_idx + limit paginated_data = ranking_data[start_idx:end_idx] 
formatted_rankings.append({ "date": ranking.get('date'), "ranking_type": ranking.get('ranking_type'), "ranking_name": ranking.get('ranking_name'), "description": ranking.get('description'), "data": paginated_data, "total_count": len(ranking_data), "current_page_count": len(paginated_data), "generated_at": format_time(ranking.get('generated_at')), "version": ranking.get('version', '1.0'), "sort_info": { "sort_by": sort_by, "sort_order": sort_order } }) return jsonify({ "success": True, "message": "获取榜单成功", "data": { "rankings": formatted_rankings, "total": len(formatted_rankings), "page": page, "limit": limit, "sort_by": sort_by, "sort_order": sort_order } }) except Exception as e: logging.error(f"获取榜单失败: {e}") return jsonify({"success": False, "message": f"获取榜单失败: {str(e)}"}) @rank_bp.route('/rankings/dates') def get_ranking_dates(): """获取可用的榜单日期列表""" try: # 获取所有不重复的日期 dates = collection.distinct('date') dates.sort(reverse=True) # 按日期倒序排列 return jsonify({ "success": True, "message": "获取日期列表成功", "data": { "dates": dates, "total": len(dates) } }) except Exception as e: logging.error(f"获取日期列表失败: {e}") return jsonify({"success": False, "message": f"获取日期列表失败: {str(e)}"}) @rank_bp.route('/rankings/types') def get_ranking_types(): """获取支持的榜单类型""" try: # 获取所有不重复的榜单类型 types = collection.distinct('ranking_type') # 添加类型说明 type_descriptions = { 'playcount': '播放量榜 - 按播放量排序', 'growth': '增长榜 - 播放量增长最快', 'newcomer': '新晋榜 - 新上榜内容' } formatted_types = [] for type_name in types: formatted_types.append({ "type": type_name, "description": type_descriptions.get(type_name, type_name) }) return jsonify({ "success": True, "message": "获取榜单类型成功", "data": { "types": formatted_types, "total": len(formatted_types) } }) except Exception as e: logging.error(f"获取榜单类型失败: {e}") return jsonify({"success": False, "message": f"获取榜单类型失败: {str(e)}"}) @rank_bp.route('/rankings/latest') def get_latest_rankings(): """获取最新的所有类型榜单""" try: # 获取最新日期 latest_ranking = collection.find_one( {}, sort=[('date', -1)] 
) if not latest_ranking: return jsonify({ "success": True, "message": "暂无榜单数据", "data": { "date": None, "rankings": [] } }) latest_date = latest_ranking['date'] # 获取该日期的所有榜单 rankings = list(collection.find({ 'date': latest_date }).sort('ranking_type', 1)) formatted_rankings = [] for ranking in rankings: # 只返回前20条数据 ranking_data = ranking.get('data', [])[:20] formatted_rankings.append({ "ranking_type": ranking.get('ranking_type'), "ranking_name": ranking.get('ranking_name'), "description": ranking.get('description'), "data": ranking_data, "total_count": ranking.get('total_count', 0), "preview_count": len(ranking_data) }) return jsonify({ "success": True, "message": "获取最新榜单成功", "data": { "date": latest_date, "rankings": formatted_rankings, "total_types": len(formatted_rankings) } }) except Exception as e: logging.error(f"获取最新榜单失败: {e}") return jsonify({"success": False, "message": f"获取最新榜单失败: {str(e)}"}) @rank_bp.route('/rankings/stats') def get_rankings_stats(): """获取榜单统计信息""" try: # 统计总榜单数 total_rankings = collection.count_documents({}) # 统计日期数量 total_dates = len(collection.distinct('date')) # 统计榜单类型数量 total_types = len(collection.distinct('ranking_type')) # 获取最新和最早日期 latest_ranking = collection.find_one({}, sort=[('date', -1)]) earliest_ranking = collection.find_one({}, sort=[('date', 1)]) latest_date = latest_ranking['date'] if latest_ranking else None earliest_date = earliest_ranking['date'] if earliest_ranking else None return jsonify({ "success": True, "message": "获取榜单统计成功", "data": { "total_rankings": total_rankings, "total_dates": total_dates, "total_types": total_types, "latest_date": latest_date, "earliest_date": earliest_date, "date_range": f"{earliest_date} 至 {latest_date}" if earliest_date and latest_date else "暂无数据" } }) except Exception as e: logging.error(f"获取榜单统计失败: {e}") return jsonify({"success": False, "message": f"获取榜单统计失败: {str(e)}"}) @rank_bp.route('/update_drama_info', methods=['POST']) def update_drama_info(): """更新短剧信息(支持双向同步)""" try: data 
= request.get_json() # 验证必需参数 if not data or 'mix_name' not in data: return jsonify({"success": False, "message": "缺少必需参数 mix_name"}) mix_name = data['mix_name'] target_date = data.get('target_date') # 可选参数,用于判断是否为今日数据 # 准备更新字段 update_fields = {} field_lock_updates = {} # 检查并添加需要更新的字段 if 'title' in data: update_fields['title'] = data['title'] if 'series_author' in data: update_fields['series_author'] = data['series_author'] if 'Manufacturing_Field' in data: update_fields['Manufacturing_Field'] = data['Manufacturing_Field'] # 标记制作方字段已被用户锁定 field_lock_updates['field_lock_status.Manufacturing_Field_locked'] = True if 'Copyright_field' in data: update_fields['Copyright_field'] = data['Copyright_field'] # 标记版权方字段已被用户锁定 field_lock_updates['field_lock_status.Copyright_field_locked'] = True if 'classification_type' in data: update_fields['classification_type'] = data['classification_type'] # 标记类型/元素字段已被用户锁定 field_lock_updates['field_lock_status.classification_type_locked'] = True if 'release_date' in data: update_fields['release_date'] = data['release_date'] # 标记上线日期字段已被用户锁定 field_lock_updates['field_lock_status.release_date_locked'] = True if 'desc' in data: update_fields['desc'] = data['desc'] if 'play_vv' in data: update_fields['play_vv'] = data['play_vv'] if 'cover_image_url' in data: update_fields['cover_image_url'] = data['cover_image_url'] if 'cover_backup_urls' in data: update_fields['cover_backup_urls'] = data['cover_backup_urls'] if 'timeline_data' in data: update_fields['timeline_data'] = data['timeline_data'] if 'comments_summary' in data: update_fields['comments_summary'] = data['comments_summary'] # 检查分类字段的锁定状态 if 'Novel_IDs' in data: update_fields['Novel_IDs'] = data['Novel_IDs'] field_lock_updates['field_lock_status.Novel_IDs_locked'] = True if 'Anime_IDs' in data: update_fields['Anime_IDs'] = data['Anime_IDs'] field_lock_updates['field_lock_status.Anime_IDs_locked'] = True if 'Drama_IDs' in data: update_fields['Drama_IDs'] = data['Drama_IDs'] 
field_lock_updates['field_lock_status.Drama_IDs_locked'] = True if not update_fields: return jsonify({"success": False, "message": "没有提供需要更新的字段"}) # 获取今天的日期 today = datetime.now().date().strftime('%Y-%m-%d') is_today_data = target_date == today if target_date else True updated_count = 0 # 首先检查短剧是否存在 existing_drama = rankings_management_collection.find_one({"mix_name": mix_name}) if not existing_drama: return jsonify({ "success": False, "message": f"未找到短剧: {mix_name}" }) # 1. 更新Rankings_management数据库 mgmt_update_data = update_fields.copy() mgmt_update_data.update(field_lock_updates) # 添加锁定状态更新 result_mgmt = rankings_management_collection.update_many( {"mix_name": mix_name}, {"$set": mgmt_update_data} ) # 2. 更新Ranking_storage数据库中的data数组 storage_update_data = {f"data.$.{field}": value for field, value in update_fields.items()} # 为Ranking_storage也添加锁定状态更新 for field, value in field_lock_updates.items(): storage_update_data[f"data.$.{field}"] = value result_storage = collection.update_many( {"data.mix_name": mix_name}, {"$set": storage_update_data} ) updated_count = result_mgmt.modified_count + result_storage.modified_count matched_count = result_mgmt.matched_count + result_storage.matched_count # 记录锁定状态更新 locked_fields = [] if field_lock_updates: for field_key in field_lock_updates.keys(): field_name = field_key.replace('field_lock_status.', '').replace('_locked', '') locked_fields.append(field_name) logging.info(f"数据更新: Rankings_management(匹配:{result_mgmt.matched_count}, 修改:{result_mgmt.modified_count}), Ranking_storage(匹配:{result_storage.matched_count}, 修改:{result_storage.modified_count})") if locked_fields: logging.info(f"字段锁定状态更新: {', '.join(locked_fields)} 已被标记为用户锁定") # 只要找到了数据就算成功,不管是否有修改 if matched_count > 0: message = f"成功处理短剧 {mix_name} 的信息" if updated_count > 0: message += f",已更新 {updated_count} 条记录" else: message += ",数据无变化" return jsonify({ "success": True, "message": message, "data": { "mix_name": mix_name, "updated_fields": list(update_fields.keys()), 
"updated_count": updated_count, "matched_count": matched_count, "is_today_data": is_today_data } }) else: return jsonify({ "success": False, "message": f"未找到短剧 {mix_name} 的相关数据" }) except Exception as e: logging.error(f"更新短剧信息失败: {e}") return jsonify({"success": False, "message": f"更新短剧信息失败: {str(e)}"}) @rank_bp.route('/update_content_classification', methods=['POST']) def update_content_classification(): """更新内容分类(支持将短剧ID添加到对应分类字段中)""" try: data = request.get_json() # 验证必需参数(支持 mix_id 或 mix_name 任一) if not data or ('mix_id' not in data and 'mix_name' not in data) or 'classification_type' not in data: return jsonify({"success": False, "message": "缺少必需参数:需要 mix_id 或 mix_name,以及 classification_type"}) mix_id_param = data.get('mix_id') mix_name = data.get('mix_name') classification_type = data['classification_type'] # 'novel', 'anime', 'drama' action = data.get('action', 'add') # 'add' 或 'remove' exclusive = data.get('exclusive', True) # 默认启用互斥模式,确保每个短剧只能属于一个分类 # 验证分类类型 valid_types = ['novel', 'anime', 'drama'] if classification_type not in valid_types: return jsonify({"success": False, "message": f"无效的分类类型,支持的类型: {valid_types}"}) # 映射分类类型到字段名 field_mapping = { 'novel': 'Novel_IDs', 'anime': 'Anime_IDs', 'drama': 'Drama_IDs' } field_name = field_mapping[classification_type] # 首先从Rankings_management获取短剧的mix_id,使用今天的日期 today = datetime.now().date() start_of_day = datetime.combine(today, datetime.min.time()) end_of_day = datetime.combine(today, datetime.max.time()) mgmt_doc = rankings_management_collection.find_one({ "mix_name": mix_name }) if not mgmt_doc: return jsonify({"success": False, "message": f"未找到短剧:{mix_name or mix_id_param}"}) mix_id = mgmt_doc.get('mix_id') if not mix_id: return jsonify({"success": False, "message": f"短剧 {mix_name or '[未知名称]'} 缺少 mix_id"}) updated_count = 0 # 根据操作类型更新数据 if action == 'add': # 如果启用互斥模式,先移除其他分类 if exclusive: # 获取其他分类字段名 other_fields = [f for f in field_mapping.values() if f != field_name] # 记录移除操作的结果 
removed_from_other_categories = [] # 1. 从Rankings_management中移除其他分类 for other_field in other_fields: result = rankings_management_collection.update_many( {"mix_id": mix_id, other_field: mix_id}, {"$pull": {other_field: mix_id}} ) if result.modified_count > 0: # 找到对应的分类名称 for cat_type, field in field_mapping.items(): if field == other_field: removed_from_other_categories.append(cat_type) break # 2. 从Ranking_storage中移除其他分类 for other_field in other_fields: collection.update_many( {"data.mix_name": mix_name}, {"$pull": {f"data.$.{other_field}": mix_id}} ) if removed_from_other_categories: logging.info(f"互斥模式:已将短剧 {mix_name} 从 {', '.join(removed_from_other_categories)} 分类中移除") else: logging.info(f"互斥模式:短剧 {mix_name} 未在其他分类中,无需移除") # 添加到分类字段(使用$addToSet避免重复) # 1. 更新Rankings_management数据库 result_mgmt = rankings_management_collection.update_many( {"mix_id": mix_id}, {"$addToSet": {field_name: mix_id}} ) # 2. 更新Ranking_storage数据库中的data数组 result_storage = collection.update_many( {"data.mix_name": mix_name}, {"$addToSet": {f"data.$.{field_name}": mix_id}} ) updated_count = result_mgmt.modified_count + result_storage.modified_count message = f"成功将短剧 {mix_name} 添加到 {classification_type} 分类" if exclusive and removed_from_other_categories: message += f"(已自动从 {', '.join(removed_from_other_categories)} 分类中移除)" elif action == 'remove': # 从分类字段中移除 # 1. 更新Rankings_management数据库 result_mgmt = rankings_management_collection.update_many( {"mix_id": mix_id}, {"$pull": {field_name: mix_id}} ) # 2. 
@rank_bp.route('/get_content_classification', methods=['GET'])
def get_content_classification():
    """获取短剧的分类状态 (lookup prefers mix_id, falls back to mix_name)."""
    try:
        mix_id_param = request.args.get('mix_id')
        mix_name = request.args.get('mix_name')

        if not mix_id_param and not mix_name:
            return jsonify({"success": False, "message": "缺少必需参数:需要 mix_id 或 mix_name"})

        # Resolve the management document; the helper prefers mix_id.
        mgmt_doc = find_management_data({'mix_id': mix_id_param, 'mix_name': mix_name})
        if not mgmt_doc:
            return jsonify({"success": False, "message": f"未找到短剧:{mix_name or mix_id_param}"})

        mix_id = mgmt_doc.get('mix_id')
        if not mix_id:
            return jsonify({"success": False, "message": f"短剧 {mix_name or '[未知名称]'} 缺少 mix_id"})

        # Raw classification arrays plus derived membership flags.
        novel_ids = mgmt_doc.get('Novel_IDs', [])
        anime_ids = mgmt_doc.get('Anime_IDs', [])
        drama_ids = mgmt_doc.get('Drama_IDs', [])
        classification_status = {
            'novel': mix_id in novel_ids,
            'anime': mix_id in anime_ids,
            'drama': mix_id in drama_ids
        }

        resolved_name = mgmt_doc.get('mix_name', mix_name)
        return jsonify({
            "success": True,
            "message": f"获取短剧 {resolved_name} 分类状态成功",
            "data": {
                "mix_name": resolved_name,
                "mix_id": mix_id,
                "classification_status": classification_status,
                "classification_details": {
                    "Novel_IDs": novel_ids,
                    "Anime_IDs": anime_ids,
                    "Drama_IDs": drama_ids
                }
            }
        })
    except Exception as e:
        logging.error(f"获取内容分类状态失败: {e}")
        return jsonify({"success": False, "message": f"获取内容分类状态失败: {str(e)}"})
def validate_and_fix_classification_exclusivity():
    """
    验证和修复数据库中的分类互斥性
    确保每个短剧只属于一个分类(Novel_IDs、Anime_IDs、Drama_IDs)

    Conflict resolution keeps exactly one category with priority
    drama > anime > novel and strips the mix_id from the others.

    Returns:
        dict: 修复结果统计 (total_checked / conflicts_found / conflicts_fixed)
    """
    try:
        all_docs = list(rankings_management_collection.find({}))
        fixed_count = 0
        conflict_count = 0

        for doc in all_docs:
            mix_name = doc.get('mix_name', '')
            mix_id = doc.get('mix_id')
            if not mix_id:
                # Without an id we cannot test membership in the ID arrays.
                continue

            novel_ids = doc.get('Novel_IDs', [])
            anime_ids = doc.get('Anime_IDs', [])
            drama_ids = doc.get('Drama_IDs', [])

            # Count in how many categories this mix_id appears.
            classifications = []
            if mix_id in novel_ids:
                classifications.append('novel')
            if mix_id in anime_ids:
                classifications.append('anime')
            if mix_id in drama_ids:
                classifications.append('drama')

            if len(classifications) > 1:
                conflict_count += 1
                logging.warning(f"发现分类冲突: {mix_name} 同时属于 {classifications}")

                # Priority: drama > anime > novel.
                if 'drama' in classifications:
                    keep_classification = 'drama'
                elif 'anime' in classifications:
                    keep_classification = 'anime'
                else:
                    keep_classification = 'novel'

                # Keep the chosen category's array untouched; strip mix_id from
                # the other two. ('cid' avoids shadowing the builtin 'id'.)
                update_fields = {
                    'Novel_IDs': novel_ids if keep_classification == 'novel'
                    else [cid for cid in novel_ids if cid != mix_id],
                    'Anime_IDs': anime_ids if keep_classification == 'anime'
                    else [cid for cid in anime_ids if cid != mix_id],
                    'Drama_IDs': drama_ids if keep_classification == 'drama'
                    else [cid for cid in drama_ids if cid != mix_id],
                }

                # mix_id is guaranteed non-empty here (checked above), so the
                # old mix_name fallback branches were unreachable and removed.
                rankings_management_collection.update_one(
                    {"mix_id": mix_id},
                    {"$set": update_fields}
                )
                collection.update_many(
                    {"data.mix_id": mix_id},
                    {"$set": {
                        "data.$.Novel_IDs": update_fields['Novel_IDs'],
                        "data.$.Anime_IDs": update_fields['Anime_IDs'],
                        "data.$.Drama_IDs": update_fields['Drama_IDs']
                    }}
                )
                fixed_count += 1
                logging.info(f"修复分类冲突: {mix_name} 保留为 {keep_classification} 分类")

        return {
            "success": True,
            "message": "分类互斥性验证完成",
            "data": {
                "total_checked": len(all_docs),
                "conflicts_found": conflict_count,
                "conflicts_fixed": fixed_count
            }
        }
    except Exception as e:
        logging.error(f"验证分类互斥性失败: {e}")
        return {
            "success": False,
            "message": f"验证分类互斥性失败: {str(e)}"
        }
def sync_ranking_storage_fields(target_date=None, force_update=False, max_retries=3, retry_delay=60):
    """
    Synchronise field values inside Ranking_storage's per-day ``data`` arrays.

    Values are copied from Rankings_management (the management source of truth)
    into every matching item of Ranking_storage for the given day. User-locked
    fields (``field_lock_status.*_locked``) and user-entered classification data
    are preserved and never overwritten.

    Args:
        target_date: target day as 'YYYY-MM-DD'; defaults to today.
        force_update: overwrite fields even when the target already has a value.
        max_retries: retained for API compatibility; the current per-item loop
            performs no retries (see note before the return value).
        retry_delay: retained for API compatibility (seconds).

    Returns:
        dict: success flag, message, and per-run statistics.
    """
    try:
        # Resolve the target date; the string form drives the storage query.
        if target_date is None:
            target_date_obj = datetime.now().date()
            target_date = target_date_obj.strftime('%Y-%m-%d')
        else:
            target_date_obj = datetime.strptime(target_date, '%Y-%m-%d').date()

        # Load every Ranking_storage document for that day.
        ranking_storage_query = {"date": target_date}
        ranking_storage_items = list(collection.find(ranking_storage_query))
        if not ranking_storage_items:
            return {
                "success": False,
                "message": f"未找到日期 {target_date} 的Ranking_storage数据"
            }

        # Run statistics.
        total_items = len(ranking_storage_items)
        updated_items = 0
        skipped_items = 0
        error_items = 0
        retry_count = 0  # retry counter (only reported in the stats payload)
        pending_items = []  # items that would need a retry (unused by this loop)

        # Synchronise document by document, rewriting each 'data' array in place.
        for ranking_doc in ranking_storage_items:
            try:
                data_array = ranking_doc.get('data', [])
                if not data_array:
                    logging.warning(f"Ranking_storage文档没有data数组: {ranking_doc.get('_id')}")
                    skipped_items += 1
                    continue

                doc_updated = False
                updated_data_array = []

                # Walk every item in the data array.
                for data_item in data_array:
                    try:
                        mix_name = data_item.get('mix_name', '').strip()

                        # Drop records with an empty or invalid mix_name entirely
                        # (they are not appended to updated_data_array).
                        if not mix_name or mix_name == "" or mix_name.lower() == "null":
                            logging.warning(f"跳过空的或无效的mix_name记录: {data_item.get('_id', 'unknown')}")
                            continue

                        # Locate the management-side source row, preferring mix_id.
                        source_data = None
                        mix_id = data_item.get('mix_id')

                        query_conditions = {}
                        if mix_id:
                            query_conditions['mix_id'] = mix_id
                        if mix_name:
                            query_conditions['mix_name'] = mix_name

                        if query_conditions:
                            source_data = find_management_data(query_conditions, target_date)

                        # Fall back to matching the display title against mix_name.
                        if not source_data:
                            title = data_item.get('title')
                            if title and title.strip():
                                title_query = {"mix_name": title.strip()}
                                source_data = find_management_data(title_query, target_date)
                                if source_data:
                                    logging.info(f"通过title找到数据: {title} -> {source_data.get('mix_name', 'N/A')}")

                        # Repair an empty mix_name from the source row when possible.
                        # NOTE(review): mix_name is non-empty at this point (empty
                        # names are skipped above), so this branch looks unreachable
                        # — kept as-is, confirm before removing.
                        if source_data and not mix_name:
                            mix_name = source_data.get('mix_name', '').strip()
                            if mix_name:
                                data_item['mix_name'] = mix_name
                                logging.info(f"修复空的mix_name: {data_item.get('title', 'N/A')} -> {mix_name}")
                            else:
                                logging.warning(f"源数据中的mix_name也为空,跳过此记录")
                                continue

                        # No source row found: keep the item untouched when it
                        # carries user-locked fields or user-entered data.
                        if not source_data:
                            logging.warning(f"无法找到对应的源数据: mix_name={mix_name}, mix_id={data_item.get('mix_id')}, title={data_item.get('title')}")
                            # Lock flags live on the data item itself.
                            field_lock_status = data_item.get('field_lock_status', {})
                            has_locked_fields = any([
                                field_lock_status.get('Manufacturing_Field_locked', False),
                                field_lock_status.get('Copyright_field_locked', False),
                                field_lock_status.get('classification_type_locked', False),
                                field_lock_status.get('release_date_locked', False),
                                field_lock_status.get('Novel_IDs_locked', False),
                                field_lock_status.get('Anime_IDs_locked', False),
                                field_lock_status.get('Drama_IDs_locked', False)
                            ])
                            # User data = any locked field, or any manually filled value.
                            has_user_data = has_locked_fields or any([
                                data_item.get('Manufacturing_Field'),
                                data_item.get('Copyright_field'),
                                data_item.get('classification_type'),
                                data_item.get('release_date'),
                                data_item.get('Novel_IDs'),
                                data_item.get('Anime_IDs'),
                                data_item.get('Drama_IDs')
                            ])
                            if has_locked_fields:
                                logging.info(f"保持锁定字段不变: {mix_name} (无源数据但有锁定字段)")
                                updated_data_array.append(data_item)
                            elif has_user_data:
                                logging.info(f"保持用户设置的数据: {mix_name} (无源数据但有用户数据)")
                                updated_data_array.append(data_item)
                            else:
                                # Only keep records whose mix_name is still valid.
                                if mix_name and mix_name.strip():
                                    updated_data_array.append(data_item)
                            continue

                        # Candidate fields to sync. Values mirror the CURRENT item
                        # so the "is it missing?" checks below inspect the target,
                        # not the source (the dict values themselves are unused).
                        fields_to_check = {
                            # 基础字段 / base fields
                            'batch_id': data_item.get('batch_id'),
                            'batch_time': data_item.get('batch_time'),
                            'item_sequence': data_item.get('item_sequence'),
                            'mix_id': data_item.get('mix_id'),
                            'playcount': data_item.get('playcount'),
                            'request_id': data_item.get('request_id'),
                            # cover-related fields
                            'cover_image_url_original': data_item.get('cover_image_url_original'),
                            'cover_upload_success': data_item.get('cover_upload_success'),
                            'cover_backup_urls': data_item.get('cover_backup_urls'),
                            # content fields
                            'desc': data_item.get('desc'),
                            'series_author': data_item.get('series_author'),
                            'updated_to_episode': data_item.get('updated_to_episode'),
                            'episode_video_ids': data_item.get('episode_video_ids'),
                            'episode_details': data_item.get('episode_details'),
                            # status fields
                            'data_status': data_item.get('data_status'),
                            'realtime_saved': data_item.get('realtime_saved'),
                            'created_at': data_item.get('created_at'),
                            'last_updated': data_item.get('last_updated'),
                            'Manufacturing_Field': data_item.get('Manufacturing_Field'),
                            'Copyright_field': data_item.get('Copyright_field'),
                            'classification_type': data_item.get('classification_type', ''),  # type/genre
                            'release_date': data_item.get('release_date', ''),  # release date
                            # content-classification fields
                            'Novel_IDs': data_item.get('Novel_IDs', []),
                            'Anime_IDs': data_item.get('Anime_IDs', []),
                            'Drama_IDs': data_item.get('Drama_IDs', []),
                            # comments summary
                            'comments_summary': data_item.get('comments_summary', ''),
                        }

                        # Lock flags come from the data item (not the document).
                        field_lock_status = data_item.get('field_lock_status', {})
                        manufacturing_locked = field_lock_status.get('Manufacturing_Field_locked', False)
                        copyright_locked = field_lock_status.get('Copyright_field_locked', False)
                        novel_ids_locked = field_lock_status.get('Novel_IDs_locked', False)
                        anime_ids_locked = field_lock_status.get('Anime_IDs_locked', False)
                        drama_ids_locked = field_lock_status.get('Drama_IDs_locked', False)

                        # First pass: does anything need updating at all?
                        needs_update = False
                        for field_name, source_field_value in fields_to_check.items():
                            # Locked fields never trigger an update.
                            if field_name == 'Manufacturing_Field' and manufacturing_locked:
                                continue
                            elif field_name == 'Copyright_field' and copyright_locked:
                                continue
                            elif field_name == 'Novel_IDs' and novel_ids_locked:
                                continue
                            elif field_name == 'Anime_IDs' and anime_ids_locked:
                                continue
                            elif field_name == 'Drama_IDs' and drama_ids_locked:
                                continue
                            # Inspect the TARGET item's value, not the source's.
                            current_value = data_item.get(field_name)
                            # Array fields count as missing when they are empty lists.
                            if field_name in ['cover_backup_urls', 'episode_video_ids', 'episode_details', 'Novel_IDs', 'Anime_IDs', 'Drama_IDs']:
                                if force_update or current_value is None or (isinstance(current_value, list) and len(current_value) == 0):
                                    needs_update = True
                                    break
                            # Scalar fields count as missing when None or ''.
                            elif force_update or current_value is None or current_value == '':
                                needs_update = True
                                break

                        if not needs_update:
                            updated_data_array.append(data_item)
                            continue

                        # Second pass: copy values from the source row into data_item.
                        item_updated = False
                        for field_name, source_field_value in fields_to_check.items():
                            # Locked fields are protected from being overwritten.
                            if field_name == 'Manufacturing_Field' and manufacturing_locked:
                                logging.info(f"[字段锁定] 保护Manufacturing_Field不被覆盖: {mix_name}")
                                continue
                            elif field_name == 'Copyright_field' and copyright_locked:
                                logging.info(f"[字段锁定] 保护Copyright_field不被覆盖: {mix_name}")
                                continue
                            elif field_name == 'Novel_IDs' and novel_ids_locked:
                                logging.info(f"[字段锁定] 保护Novel_IDs不被覆盖: {mix_name}")
                                continue
                            elif field_name == 'Anime_IDs' and anime_ids_locked:
                                logging.info(f"[字段锁定] 保护Anime_IDs不被覆盖: {mix_name}")
                                continue
                            elif field_name == 'Drama_IDs' and drama_ids_locked:
                                logging.info(f"[字段锁定] 保护Drama_IDs不被覆盖: {mix_name}")
                                continue

                            # Same missing-value rules as the first pass.
                            current_value = data_item.get(field_name)
                            should_update = False
                            if field_name in ['cover_backup_urls', 'episode_video_ids', 'episode_details', 'Novel_IDs', 'Anime_IDs', 'Drama_IDs']:
                                should_update = force_update or current_value is None or (isinstance(current_value, list) and len(current_value) == 0)
                            else:
                                should_update = force_update or current_value is None or current_value == ''

                            if should_update:
                                if field_name == 'episode_details':
                                    # Copied verbatim from the source row.
                                    data_item[field_name] = source_data.get(field_name, [])
                                    item_updated = True
                                elif field_name == 'cover_backup_urls':
                                    # Coerce to a list.
                                    cover_backup_urls = source_data.get(field_name, [])
                                    if not isinstance(cover_backup_urls, list):
                                        cover_backup_urls = []
                                    data_item[field_name] = cover_backup_urls
                                    item_updated = True
                                elif field_name == 'episode_video_ids':
                                    # Coerce to a list.
                                    episode_video_ids = source_data.get(field_name, [])
                                    if not isinstance(episode_video_ids, list):
                                        episode_video_ids = []
                                    data_item[field_name] = episode_video_ids
                                    item_updated = True
                                elif field_name in ['Novel_IDs', 'Anime_IDs', 'Drama_IDs']:
                                    # Classification fields: list-coerced, mutually
                                    # exclusive, and only updated when the source
                                    # actually has a value (user settings win otherwise).
                                    classification_ids = source_data.get(field_name, [])
                                    if not isinstance(classification_ids, list):
                                        classification_ids = []
                                    if classification_ids:
                                        # Enforce exclusivity, but never clear a
                                        # locked sibling category.
                                        if field_name == 'Novel_IDs':
                                            if not anime_ids_locked:
                                                data_item['Anime_IDs'] = []
                                            if not drama_ids_locked:
                                                data_item['Drama_IDs'] = []
                                        elif field_name == 'Anime_IDs':
                                            if not novel_ids_locked:
                                                data_item['Novel_IDs'] = []
                                            if not drama_ids_locked:
                                                data_item['Drama_IDs'] = []
                                        elif field_name == 'Drama_IDs':
                                            if not novel_ids_locked:
                                                data_item['Novel_IDs'] = []
                                            if not anime_ids_locked:
                                                data_item['Anime_IDs'] = []
                                        data_item[field_name] = classification_ids
                                        item_updated = True
                                    else:
                                        # Source empty: keep a user-set value intact.
                                        current_classification = data_item.get(field_name, [])
                                        if current_classification and isinstance(current_classification, list) and len(current_classification) > 0:
                                            logging.info(f"[分类保护] 保留用户设置的 {field_name}: {mix_name}")
                                        else:
                                            # Neither side has a value: normalise to [].
                                            data_item[field_name] = []
                                            item_updated = True
                                elif field_name == 'comments_summary':
                                    # Only update when the source has a summary so an
                                    # existing summary is never wiped out.
                                    source_value = source_data.get(field_name, '')
                                    if source_value:
                                        data_item[field_name] = source_value
                                        item_updated = True
                                        logging.info(f"[评论总结] 更新评论总结: {mix_name}")
                                    else:
                                        logging.debug(f"[评论总结] 保留现有评论总结: {mix_name}")
                                else:
                                    # All other fields: plain copy from the source row.
                                    source_value = source_data.get(field_name, '')
                                    data_item[field_name] = source_value
                                    item_updated = True

                        # Computed/ranking fields are never touched because they are
                        # not in fields_to_check; this list just documents them.
                        protected_fields = ['rank', 'play_vv', 'video_id', 'video_url', 'cover_image_url', 'playcount_str', 'timeline_data']

                        if item_updated:
                            doc_updated = True
                            logging.info(f"✅ 成功同步data项目字段: {mix_name}")

                        updated_data_array.append(data_item)
                    except Exception as e:
                        # Per-item failure: keep the original item and move on.
                        logging.error(f"同步data项目失败 {data_item.get('mix_name', 'N/A')}: {e}")
                        updated_data_array.append(data_item)
                        continue

                # Persist the rewritten data array only when something changed.
                if doc_updated:
                    collection.update_one(
                        {"_id": ranking_doc["_id"]},
                        {"$set": {"data": updated_data_array}}
                    )
                    updated_items += 1
                    logging.info(f"✅ 成功更新Ranking_storage文档的data数组: {ranking_doc.get('date', 'N/A')}")
                else:
                    skipped_items += 1
            except Exception as e:
                logging.error(f"同步Ranking_storage文档失败 {ranking_doc.get('_id')}: {e}")
                error_items += 1
                continue

        # The loop above processes data arrays directly; no retry pass is needed.
        return {
            "success": True,
            "message": f"同步完成(重试 {retry_count} 次)",
            "stats": {
                "target_date": target_date,
                "total_items": total_items,
                "updated_items": updated_items,
                "skipped_items": skipped_items,
                "error_items": error_items,
                "retry_count": retry_count,
                "pending_items_final": len(pending_items),
                "data_source": "Rankings_management"
            }
        }
    except Exception as e:
        logging.error(f"同步Ranking_storage字段失败: {e}")
        return {
            "success": False,
            "message": f"同步失败: {str(e)}"
        }
@rank_bp.route('/sync_ranking_fields', methods=['POST'])
def sync_ranking_fields():
    """
    API端点:同步Ranking_storage字段
    """
    try:
        payload = request.get_json() or {}
        # Delegate the actual work to the sync helper.
        result = sync_ranking_storage_fields(
            payload.get('target_date'),
            payload.get('force_update', False),
        )
        if result["success"]:
            return jsonify(result)
        return jsonify(result), 400
    except Exception as e:
        logging.error(f"同步API调用失败: {e}")
        return jsonify({
            "success": False,
            "message": f"API调用失败: {str(e)}"
        }), 500


@rank_bp.route('/validate_classification_exclusivity', methods=['POST'])
def validate_classification_exclusivity_api():
    """
    API端点:验证和修复分类互斥性
    确保每个短剧只属于一个分类(Novel_IDs、Anime_IDs、Drama_IDs)
    """
    try:
        result = validate_and_fix_classification_exclusivity()
        if result["success"]:
            return jsonify(result)
        return jsonify(result), 400
    except Exception as e:
        logging.error(f"验证分类互斥性API失败: {e}")
        return jsonify({
            "success": False,
            "message": f"验证分类互斥性失败: {str(e)}"
        }), 500
"message": f"验证分类互斥性失败: {str(e)}" }), 500 @rank_bp.route('/get_comments_summary', methods=['GET']) def get_comments_summary(): """获取短剧的评论总结(优先使用 mix_id)""" try: mix_id = request.args.get('mix_id') mix_name = request.args.get('mix_name') date_str = request.args.get('date') if not mix_id and not mix_name: return jsonify({"success": False, "message": "缺少必需参数 mix_id 或 mix_name"}) if not date_str: from datetime import date date_str = date.today().strftime('%Y-%m-%d') # 从 Ranking_storage 获取榜单数据 ranking_doc = collection.find_one({ "date": date_str, "type": "comprehensive" }, sort=[("created_at", -1)]) if not ranking_doc: return jsonify({ "success": False, "message": f"未找到 {date_str} 的榜单数据" }) # 在 data 数组中查找短剧(优先使用 mix_id) data_items = ranking_doc.get("data", []) drama_item = None for item in data_items: # 优先使用 mix_id 匹配 if mix_id and item.get("mix_id") == mix_id: drama_item = item break # 备用:使用 mix_name 匹配 elif mix_name and item.get("mix_name") == mix_name: drama_item = item # 继续查找,看是否有 mix_id 匹配的 if not drama_item: return jsonify({ "success": False, "message": f"未找到短剧: {mix_name or mix_id}" }) comments_summary = drama_item.get("comments_summary", "") if not comments_summary: return jsonify({ "success": False, "message": "该短剧暂无评论总结" }) return jsonify({ "success": True, "data": { "mix_id": drama_item.get("mix_id"), "mix_name": drama_item.get("mix_name"), "date": date_str, "comments_summary": comments_summary } }) except Exception as e: logging.error(f"获取评论总结失败: {e}") return jsonify({ "success": False, "message": f"获取评论总结失败: {str(e)}" }), 500 @rank_bp.route('/clear_comments_summary', methods=['POST']) def clear_comments_summary(): """清空短剧的评论总结(优先使用 mix_id)""" try: data = request.get_json() mix_id = data.get('mix_id') mix_name = data.get('mix_name') date_str = data.get('date') if not mix_id and not mix_name: return jsonify({"success": False, "message": "缺少必需参数 mix_id 或 mix_name"}) if not date_str: from datetime import date date_str = date.today().strftime('%Y-%m-%d') # 从 
@rank_bp.route('/clear_comments_summary', methods=['POST'])
def clear_comments_summary():
    """清空短剧的评论总结(优先使用 mix_id)"""
    try:
        # FIX: request.get_json() returns None (or raises) when the body is
        # missing or not JSON, which previously crashed on .get() and surfaced
        # as a generic 500. Tolerate that and fall through to parameter checks.
        data = request.get_json(silent=True) or {}
        mix_id = data.get('mix_id')
        mix_name = data.get('mix_name')
        date_str = data.get('date')

        if not mix_id and not mix_name:
            return jsonify({"success": False, "message": "缺少必需参数 mix_id 或 mix_name"})

        if not date_str:
            from datetime import date
            date_str = date.today().strftime('%Y-%m-%d')

        # Latest comprehensive ranking snapshot for the day.
        ranking_doc = collection.find_one({
            "date": date_str,
            "type": "comprehensive"
        }, sort=[("created_at", -1)])
        if not ranking_doc:
            return jsonify({
                "success": False,
                "message": f"未找到 {date_str} 的榜单数据"
            })

        # Resolve the target drama inside the data array (mix_id match wins;
        # a mix_name match is kept while scanning continues).
        data_items = ranking_doc.get("data", [])
        target_mix_id = None
        target_mix_name = None
        for item in data_items:
            if mix_id and item.get("mix_id") == mix_id:
                target_mix_id = item.get("mix_id")
                target_mix_name = item.get("mix_name")
                break
            elif mix_name and item.get("mix_name") == mix_name:
                target_mix_id = item.get("mix_id")
                target_mix_name = item.get("mix_name")

        if not target_mix_id and not target_mix_name:
            return jsonify({
                "success": False,
                "message": f"未找到短剧: {mix_name or mix_id}"
            })

        # Clear the summary inside every matching storage document.
        if target_mix_id:
            result = collection.update_many(
                {
                    "date": date_str,
                    "type": "comprehensive",
                    "data.mix_id": target_mix_id
                },
                {"$set": {"data.$[elem].comments_summary": ""}},
                array_filters=[{"elem.mix_id": target_mix_id}]
            )
        else:
            # Fallback: match by mix_name.
            result = collection.update_many(
                {
                    "date": date_str,
                    "type": "comprehensive",
                    "data.mix_name": target_mix_name
                },
                {"$set": {"data.$[elem].comments_summary": ""}},
                array_filters=[{"elem.mix_name": target_mix_name}]
            )

        # Mirror the wipe into Rankings_management.
        management_result = None
        if target_mix_id:
            management_result = rankings_management_collection.update_one(
                {"mix_id": target_mix_id},
                {"$set": {"comments_summary": ""}}
            )
        elif target_mix_name:
            management_result = rankings_management_collection.update_one(
                {"mix_name": target_mix_name},
                {"$set": {"comments_summary": ""}}
            )

        if result.modified_count > 0 or (management_result and management_result.modified_count > 0):
            return jsonify({
                "success": True,
                "message": f"已清空短剧 {target_mix_name} 的评论总结(Ranking_storage: {result.modified_count}, Rankings_management: {management_result.modified_count if management_result else 0})",
                "modified_count": result.modified_count
            })
        else:
            return jsonify({
                "success": False,
                "message": "未找到需要清空的评论总结"
            })
    except Exception as e:
        logging.error(f"清空评论总结失败: {e}")
        return jsonify({
            "success": False,
            "message": f"清空评论总结失败: {str(e)}"
        }), 500
@rank_bp.route('/drama/<drama_id>')
def get_drama_detail_by_id(drama_id):
    """
    根据短剧ID获取详细信息(用于详情页)
    支持通过 mix_id 或 _id 查询

    FIX: the route previously read '/drama/' with no URL variable, while the
    view function requires `drama_id` — Flask would fail to supply the argument
    at dispatch time. Restored the `<drama_id>` placeholder.
    """
    try:
        # Optional date parameter; defaults to today.
        date_str = request.args.get('date')
        if not date_str:
            date_str = datetime.now().date().strftime("%Y-%m-%d")

        # First, look in the latest comprehensive Ranking_storage snapshot.
        ranking_doc = collection.find_one({
            "date": date_str,
            "type": "comprehensive"
        }, sort=[("calculation_sequence", -1)])

        drama_data = None
        if ranking_doc and "data" in ranking_doc:
            for item in ranking_doc.get("data", []):
                if item.get("mix_id") == drama_id or str(item.get("_id")) == drama_id:
                    drama_data = item
                    break

        # Fallback: query Rankings_management by mix_id, then by ObjectId.
        if not drama_data:
            from bson import ObjectId
            try:
                mgmt_doc = rankings_management_collection.find_one({"mix_id": drama_id})
                if not mgmt_doc:
                    mgmt_doc = rankings_management_collection.find_one({"_id": ObjectId(drama_id)})
                if mgmt_doc:
                    drama_data = mgmt_doc
            except Exception:
                # drama_id may not be a valid ObjectId string; treat as not found.
                pass

        if not drama_data:
            return jsonify({
                "success": False,
                "message": f"未找到短剧: {drama_id}"
            })

        # format_mix_item already surfaces every field of interest.
        formatted_data = format_mix_item(drama_data, date_str)
        return jsonify({
            "success": True,
            "data": formatted_data,
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        })
    except Exception as e:
        logging.error(f"获取短剧详情失败: {e}")
        return jsonify({
            "success": False,
            "message": f"获取短剧详情失败: {str(e)}"
        })
def upload_certification_file(file):
    """
    上传认领证明文件到TOS

    Args:
        file: 上传的文件对象 (werkzeug FileStorage)

    Returns:
        str: TOS永久链接URL

    Raises:
        ValueError: 文件类型不支持或文件大小超过限制
    """
    try:
        # Sanitised filename drives the extension check.
        filename = secure_filename(file.filename)
        file_extension = ''
        if '.' in filename:
            file_extension = '.' + filename.rsplit('.', 1)[1].lower()

        # Whitelist of accepted types.
        allowed_image_extensions = ['.jpg', '.jpeg', '.png', '.gif']
        allowed_doc_extensions = ['.pdf', '.doc', '.docx']
        if file_extension not in allowed_image_extensions + allowed_doc_extensions:
            raise ValueError(f"不支持的文件类型: {file_extension}")

        # Measure size by seeking to EOF, then rewind for the actual upload.
        file.seek(0, 2)
        file_size = file.tell()
        file.seek(0)

        max_size = 10 * 1024 * 1024  # 10MB for images
        if file_extension in allowed_doc_extensions:
            max_size = 20 * 1024 * 1024  # 20MB for documents
        if file_size > max_size:
            raise ValueError(f"文件大小超过限制: {file_size / 1024 / 1024:.2f}MB")

        # Randomised object key avoids collisions and filename leakage.
        random_filename = f"{uuid.uuid4().hex}{file_extension}"
        object_key = f"media/rank/Certification/{random_filename}"

        tos_url = oss_client.upload_bytes(
            data=file.read(),
            object_key=object_key,
            content_type=file.content_type or 'application/octet-stream',
            return_url=True
        )

        # FIX: the success log previously printed the literal "(unknown)"
        # instead of the uploaded file's name.
        logging.info(f"文件上传成功: {filename} -> {tos_url}")
        return tos_url
    except Exception as e:
        logging.error(f"文件上传失败: {str(e)}")
        raise
@rank_bp.route('/claim', methods=['POST'])
def submit_claim():
    """
    提交认领申请(新版本:上传文件到TOS并创建待审核申请)
    """
    try:
        # Form fields.
        drama_id = request.form.get('drama_id')
        field_type = request.form.get('field_type')  # 'copyright' | 'manufacturing'
        company_name = request.form.get('company_name')
        description = request.form.get('description', '')

        # Required-field and enum validation.
        if not all([drama_id, field_type, company_name]):
            return jsonify({"success": False, "message": "缺少必填字段"}), 400
        if field_type not in ['copyright', 'manufacturing']:
            return jsonify({"success": False, "message": "无效的字段类型"}), 400

        # Resolve the drama being claimed.
        drama_info = rankings_management_collection.find_one({"mix_id": drama_id})
        if not drama_info:
            return jsonify({"success": False, "message": "未找到对应的短剧"}), 404
        drama_name = drama_info.get('mix_name', '未知短剧')

        # Upload every provided proof file to TOS.
        tos_file_urls = []
        for upload in request.files.getlist('files'):
            if not upload or not upload.filename:
                continue
            try:
                tos_file_urls.append(upload_certification_file(upload))
            except ValueError as ve:
                return jsonify({"success": False, "message": str(ve)}), 400
            except Exception as e:
                return jsonify({"success": False, "message": f"文件上传失败: {str(e)}"}), 500

        if not tos_file_urls:
            return jsonify({"success": False, "message": "请至少上传一个证明文件"}), 400

        # A still-pending application for the same drama+field is replaced
        # (the old record is deleted; its TOS files are kept).
        existing_application = claim_applications_collection.find_one({
            "drama_id": drama_id,
            "field_type": field_type,
            "status": "pending"
        })
        if existing_application:
            claim_applications_collection.delete_one({"_id": existing_application["_id"]})
            logging.info(f"删除旧的待审核申请: {existing_application.get('application_id')}")

        # Create the new application record.
        application_id = str(uuid.uuid4())
        claim_applications_collection.insert_one({
            "application_id": application_id,
            "drama_id": drama_id,
            "drama_name": drama_name,
            "field_type": field_type,
            "company_name": company_name,
            "description": description,
            "tos_file_urls": tos_file_urls,
            "status": "pending",
            "submit_time": datetime.now(),
            "review_time": None,
            "reviewer": None,
            "reject_reason": None
        })
        logging.info(f"认领申请创建成功: application_id={application_id}, drama_id={drama_id}, field_type={field_type}")

        return jsonify({
            "success": True,
            "message": "认领申请提交成功,等待管理员审核",
            "data": {
                "application_id": application_id,
                "drama_id": drama_id,
                "field_type": field_type,
                "company_name": company_name,
                "file_count": len(tos_file_urls)
            }
        })
    except Exception as e:
        logging.error(f"提交认领申请失败: {e}")
        return jsonify({
            "success": False,
            "message": f"提交认领申请失败: {str(e)}"
        }), 500
# 获取申请列表
@rank_bp.route('/claim/applications', methods=['GET'])
def get_claim_applications():
    """
    获取认领申请列表
    支持筛选和分页
    """
    try:
        # Query parameters.
        status = request.args.get('status', 'all')  # all/pending/approved/rejected
        page = int(request.args.get('page', 1))
        limit = int(request.args.get('limit', 20))

        # 'all' means no status filter.
        query = {} if status == 'all' else {'status': status}
        total = claim_applications_collection.count_documents(query)

        # Newest submissions first, paginated.
        cursor = (claim_applications_collection.find(query)
                  .sort('submit_time', -1)
                  .skip((page - 1) * limit)
                  .limit(limit))

        status_labels = {"pending": "待审核", "approved": "已通过", "rejected": "已拒绝"}
        formatted_applications = []
        for app in cursor:
            submit_time = app.get('submit_time')
            formatted_applications.append({
                "application_id": app.get('application_id'),
                "drama_id": app.get('drama_id'),
                "drama_name": app.get('drama_name'),
                "field_type": app.get('field_type'),
                "field_type_label": "版权方" if app.get('field_type') == 'copyright' else "承制方",
                "company_name": app.get('company_name'),
                "status": app.get('status'),
                "status_label": status_labels.get(app.get('status'), "未知"),
                "submit_time": submit_time.strftime("%Y-%m-%d %H:%M:%S") if submit_time else "",
                "file_count": len(app.get('tos_file_urls', []))
            })

        return jsonify({
            "success": True,
            "data": formatted_applications,
            "pagination": {
                "page": page,
                "limit": limit,
                "total": total,
                "pages": (total + limit - 1) // limit
            }
        })
    except Exception as e:
        logging.error(f"获取申请列表失败: {e}")
        return jsonify({
            "success": False,
            "message": f"获取申请列表失败: {str(e)}"
        }), 500
"已通过", "rejected": "已拒绝" }.get(application.get('status'), "未知"), "submit_time": application.get('submit_time').strftime("%Y-%m-%d %H:%M:%S") if application.get('submit_time') else "", "review_time": application.get('review_time').strftime("%Y-%m-%d %H:%M:%S") if application.get('review_time') else None, "reviewer": application.get('reviewer'), "reject_reason": application.get('reject_reason') } return jsonify({ "success": True, "data": formatted_data }) except Exception as e: logging.error(f"获取申请详情失败: {e}") return jsonify({ "success": False, "message": f"获取申请详情失败: {str(e)}" }), 500 # 审核申请 @rank_bp.route('/claim/review', methods=['POST']) def review_claim_application(): """ 审核认领申请 """ try: data = request.get_json() application_id = data.get('application_id') action = data.get('action') # 'approve' 或 'reject' reject_reason = data.get('reject_reason', '') reviewer = data.get('reviewer', 'admin') # 审核人 # 验证参数 if not application_id or not action: return jsonify({ "success": False, "message": "缺少必填参数" }), 400 if action not in ['approve', 'reject']: return jsonify({ "success": False, "message": "无效的操作类型" }), 400 if action == 'reject' and not reject_reason: return jsonify({ "success": False, "message": "拒绝时必须填写理由" }), 400 # 查找申请 application = claim_applications_collection.find_one({"application_id": application_id}) if not application: return jsonify({ "success": False, "message": "申请不存在" }), 404 if application.get('status') != 'pending': return jsonify({ "success": False, "message": "该申请已经被审核过了" }), 400 # 执行审核操作 if action == 'approve': # 通过:更新短剧字段并锁定 drama_id = application.get('drama_id') field_type = application.get('field_type') company_name = application.get('company_name') description = application.get('description', '') tos_file_urls = application.get('tos_file_urls', []) field_name = 'Copyright_field' if field_type == 'copyright' else 'Manufacturing_field' # 更新 Rankings_management 数据库 update_data = { field_name: company_name, f"{field_name}_claim_description": 
# 审核申请
@rank_bp.route('/claim/review', methods=['POST'])
def review_claim_application():
    """
    审核认领申请

    approve: 将公司名写入短剧的版权方/承制方字段并锁定;
    reject: 仅更新申请状态并记录拒绝理由.
    """
    try:
        data = request.get_json()
        application_id = data.get('application_id')
        action = data.get('action')  # 'approve' 或 'reject'
        reject_reason = data.get('reject_reason', '')
        reviewer = data.get('reviewer', 'admin')  # 审核人

        # Parameter validation.
        if not application_id or not action:
            return jsonify({"success": False, "message": "缺少必填参数"}), 400
        if action not in ['approve', 'reject']:
            return jsonify({"success": False, "message": "无效的操作类型"}), 400
        if action == 'reject' and not reject_reason:
            return jsonify({"success": False, "message": "拒绝时必须填写理由"}), 400

        application = claim_applications_collection.find_one({"application_id": application_id})
        if not application:
            return jsonify({"success": False, "message": "申请不存在"}), 404
        if application.get('status') != 'pending':
            return jsonify({"success": False, "message": "该申请已经被审核过了"}), 400

        if action == 'approve':
            # Approve: write the claimed company into the drama record and lock it.
            drama_id = application.get('drama_id')
            field_type = application.get('field_type')
            company_name = application.get('company_name')
            description = application.get('description', '')
            tos_file_urls = application.get('tos_file_urls', [])

            # FIX: use 'Manufacturing_Field' (capital F) — the lowercase
            # 'Manufacturing_field' used previously does not match the field
            # name used everywhere else in this module, so approvals wrote to
            # a different key than the sync/update/read paths use.
            field_name = 'Copyright_field' if field_type == 'copyright' else 'Manufacturing_Field'

            update_data = {
                field_name: company_name,
                f"{field_name}_claim_description": description,
                f"{field_name}_claim_images": tos_file_urls,
                f"{field_name}_claim_time": datetime.now(),
                "last_updated": datetime.now()
            }
            # FIX: lock keys need the '_locked' suffix — the sync logic reads
            # e.g. field_lock_status.Manufacturing_Field_locked, so the old
            # suffix-less keys never actually protected the approved fields.
            lock_status_update = {
                f"field_lock_status.{field_name}_locked": True,
                f"field_lock_status.{field_name}_claim_description_locked": True,
                f"field_lock_status.{field_name}_claim_images_locked": True,
                f"field_lock_status.{field_name}_claim_time_locked": True
            }
            update_data.update(lock_status_update)

            # Update the management database.
            rankings_management_collection.update_one(
                {"mix_id": drama_id},
                {"$set": update_data}
            )

            # Mirror the change into Ranking_storage's data arrays.
            ranking_storage_update = {
                f"data.$[elem].{field_name}": company_name,
                f"data.$[elem].{field_name}_claim_description": description,
                f"data.$[elem].{field_name}_claim_images": tos_file_urls,
                f"data.$[elem].{field_name}_claim_time": datetime.now(),
                f"data.$[elem].field_lock_status.{field_name}_locked": True,
                f"data.$[elem].field_lock_status.{field_name}_claim_description_locked": True,
                f"data.$[elem].field_lock_status.{field_name}_claim_images_locked": True,
                f"data.$[elem].field_lock_status.{field_name}_claim_time_locked": True
            }
            collection.update_many(
                {"data.mix_id": drama_id},
                {"$set": ranking_storage_update},
                array_filters=[{"elem.mix_id": drama_id}]
            )

            # Mark the application approved.
            claim_applications_collection.update_one(
                {"application_id": application_id},
                {"$set": {
                    "status": "approved",
                    "review_time": datetime.now(),
                    "reviewer": reviewer
                }}
            )
            logging.info(f"认领申请审核通过: application_id={application_id}, drama_id={drama_id}")
            return jsonify({
                "success": True,
                "message": "申请已通过,短剧信息已更新"
            })
        else:  # reject
            # Reject: only the application record changes.
            claim_applications_collection.update_one(
                {"application_id": application_id},
                {"$set": {
                    "status": "rejected",
                    "review_time": datetime.now(),
                    "reviewer": reviewer,
                    "reject_reason": reject_reason
                }}
            )
            logging.info(f"认领申请已拒绝: application_id={application_id}, reason={reject_reason}")
            return jsonify({
                "success": True,
                "message": "申请已拒绝"
            })
    except Exception as e:
        logging.error(f"审核申请失败: {e}")
        return jsonify({
            "success": False,
            "message": f"审核申请失败: {str(e)}"
        }), 500
@rank_bp.route('/claim/pending-count', methods=['GET'])
def get_pending_claim_count():
    """Return how many claim applications are still pending review.

    Returns:
        JSON {"success": True, "count": <int>} on success, or an error
        payload with HTTP 500 when the query fails.
    """
    try:
        count = claim_applications_collection.count_documents({"status": "pending"})
        return jsonify({
            "success": True,
            "count": count
        })
    except Exception as e:
        logging.error(f"获取待审核数量失败: {e}")
        return jsonify({
            "success": False,
            "message": f"获取待审核数量失败: {str(e)}"
        }), 500


# ==================== Article APIs ====================

# MongoDB collection backing the article endpoints
articles_collection = db['articles']


def _int_arg(name, default):
    """Read a query-string argument as int; return *default* when missing or malformed.

    Guards the routes against a 500 from ``int()`` raising ValueError on
    input such as ``?page=abc``.
    """
    try:
        return int(request.args.get(name, default))
    except (TypeError, ValueError):
        return default


def format_article_item(doc):
    """Project a raw article document into the JSON-safe dict used by the APIs.

    Args:
        doc: article document (dict-like) fetched from MongoDB.

    Returns:
        dict with stringified ``_id``, display fields, formatted
        ``created_at``, the ``likes`` list and a derived ``likes_count``.
    """
    likes = doc.get("likes", [])
    return {
        "_id": str(doc.get("_id", "")),
        "title": doc.get("title", ""),
        "author_id": doc.get("author_id", ""),
        "cover_image": doc.get("cover_image", ""),
        "status": doc.get("status", ""),
        "summary": doc.get("summary", ""),
        "created_at": format_time(doc.get("created_at")),
        "likes": likes,
        "likes_count": len(likes)
    }


def get_article_list_data(page=1, limit=20, sort_by="created_at", status=None):
    """Return one page of articles plus pagination metadata.

    Args:
        page: 1-based page number.
        limit: page size.
        sort_by: sort field; only "created_at" and "title" are accepted,
            anything else falls back to "created_at". Always descending.
        status: optional exact-match status filter.
    """
    try:
        skip = (page - 1) * limit

        query_condition = {}
        if status:
            query_condition["status"] = status

        # Whitelist the sort field so arbitrary client input cannot be used.
        sort_field = sort_by if sort_by in ["created_at", "title"] else "created_at"

        cursor = (articles_collection.find(query_condition)
                  .sort(sort_field, -1)
                  .skip(skip)
                  .limit(limit))
        article_list = [format_article_item(doc) for doc in cursor]
        total = articles_collection.count_documents(query_condition)

        return {
            "success": True,
            "data": article_list,
            "pagination": {
                "page": page,
                "limit": limit,
                "total": total,
                "pages": (total + limit - 1) // limit,
                "has_next": page * limit < total,
                "has_prev": page > 1
            },
            "sort_by": sort_by,
            "status_filter": status,
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logging.error(f"获取文章列表失败: {e}")
        return {"success": False, "message": f"获取数据失败: {str(e)}"}


def search_articles_data(keyword, page=1, limit=10):
    """Case-insensitive keyword search over title/content/summary, paginated."""
    try:
        if not keyword:
            return {"success": False, "message": "请提供搜索关键词"}

        skip = (page - 1) * limit

        # Escape the user-supplied keyword so it is matched literally;
        # feeding raw input into $regex allows regex injection and
        # pathological (catastrophic-backtracking) patterns.
        pattern = re.escape(keyword)
        search_condition = {
            "$or": [
                {"title": {"$regex": pattern, "$options": "i"}},
                {"content": {"$regex": pattern, "$options": "i"}},
                {"summary": {"$regex": pattern, "$options": "i"}}
            ]
        }

        cursor = (articles_collection.find(search_condition)
                  .sort("created_at", -1)
                  .skip(skip)
                  .limit(limit))
        search_results = [format_article_item(doc) for doc in cursor]
        total = articles_collection.count_documents(search_condition)

        return {
            "success": True,
            "data": search_results,
            "keyword": keyword,
            "pagination": {
                "page": page,
                "limit": limit,
                "total": total,
                "pages": (total + limit - 1) // limit,
                "has_next": page * limit < total,
                "has_prev": page > 1
            },
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logging.error(f"搜索文章失败: {e}")
        return {"success": False, "message": f"搜索失败: {str(e)}"}


def get_article_detail_data(article_id):
    """Fetch a single article.

    Tries to interpret *article_id* as an ObjectId first; when that fails
    (e.g. the string is not a valid ObjectId) falls back to an exact match
    on title or author_id.
    """
    try:
        from bson import ObjectId
        try:
            doc = articles_collection.find_one({"_id": ObjectId(article_id)})
        except Exception:  # narrowed from a bare except: invalid ObjectId string
            doc = articles_collection.find_one({
                "$or": [
                    {"title": article_id},
                    {"author_id": article_id}
                ]
            })

        if not doc:
            return {"success": False, "message": "未找到文章信息"}

        return {
            "success": True,
            "data": format_article_item(doc),
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logging.error(f"获取文章详情失败: {e}")
        return {"success": False, "message": f"获取详情失败: {str(e)}"}


def get_article_statistics():
    """Aggregate stats: total count, per-status counts, latest creation time."""
    try:
        total_articles = articles_collection.count_documents({})
        if total_articles == 0:
            return {"success": False, "message": "暂无数据"}

        status_stats = [
            {"status": status, "count": articles_collection.count_documents({"status": status})}
            for status in ["draft", "published", "archived"]
        ]

        # find_one with a sort replaces the original find().sort().limit(1):
        # the old `if latest_doc:` tested a Cursor object, which is always
        # truthy, so the guard never actually did anything.
        latest_doc = articles_collection.find_one(sort=[("created_at", -1)])
        latest_time = format_time(latest_doc.get("created_at")) if latest_doc else ""

        return {
            "success": True,
            "data": {
                "total_articles": total_articles,
                "status_stats": status_stats,
                "latest_update": latest_time
            },
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logging.error(f"获取统计信息失败: {e}")
        return {"success": False, "message": f"获取统计失败: {str(e)}"}


# Article routes
@rank_bp.route('/article/list')
def get_articles_route():
    """GET /article/list — paginated article list."""
    page = _int_arg('page', 1)
    limit = _int_arg('limit', 20)
    sort_by = request.args.get('sort', 'created_at')
    status = request.args.get('status')
    return jsonify(get_article_list_data(page, limit, sort_by, status))


@rank_bp.route('/article/search')
def search_articles_route():
    """GET /article/search — keyword search (q, page, limit)."""
    keyword = request.args.get('q', '')
    page = _int_arg('page', 1)
    limit = _int_arg('limit', 10)
    return jsonify(search_articles_data(keyword, page, limit))


@rank_bp.route('/article/detail')
def get_article_detail_route():
    """GET /article/detail — single article by id."""
    return jsonify(get_article_detail_data(request.args.get('id', '')))


@rank_bp.route('/article/stats')
def get_article_stats_route():
    """GET /article/stats — aggregate statistics."""
    return jsonify(get_article_statistics())


@rank_bp.route('/article/health')
def article_health_check():
    """GET /article/health — liveness probe that also checks DB connectivity."""
    try:
        total_records = articles_collection.count_documents({})
        return jsonify({
            "success": True,
            "message": "服务正常",
            "data": {
                "database": "连接正常",
                "total_records": total_records,
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
        })
    except Exception as e:
        return jsonify({
            "success": False,
            "message": f"服务异常: {str(e)}",
            "data": {
                "database": "连接失败",
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
        })