#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Douyin play-count data API server for the mini-program.
Data formats and endpoints are optimized specifically for mini-program use.
"""

from flask import Blueprint, request, jsonify
from datetime import datetime, timedelta
import logging
import re

from database import db

# Create the blueprint
rank_bp = Blueprint('rank', __name__, url_prefix='/api/rank')

# Database collections
collection = db['Rankings_list']
daily_rankings_collection = db['Ranking_storage']  # stored (pre-generated) rankings


def format_playcount(playcount_str):
    """Convert a play-count string into an integer."""
    if not playcount_str:
        return 0

    try:
        if isinstance(playcount_str, (int, float)):
            return int(playcount_str)

        playcount_str = str(playcount_str).strip()

        # Handle Chinese units: 亿 (100 million) and 万 (10 thousand)
        if "亿" in playcount_str:
            num = float(re.findall(r'[\d.]+', playcount_str)[0])
            return int(num * 100000000)
        elif "万" in playcount_str:
            num = float(re.findall(r'[\d.]+', playcount_str)[0])
            return int(num * 10000)
        else:
            # Fall back to a direct numeric conversion
            return int(float(playcount_str))
    except Exception:
        return 0


def format_cover_url(cover_data):
    """Normalize the cover image URL."""
    if not cover_data:
        return ""

    if isinstance(cover_data, str):
        return cover_data
    elif isinstance(cover_data, dict) and 'url_list' in cover_data:
        return cover_data['url_list'][0] if cover_data['url_list'] else ""
    else:
        return ""


def format_time(time_obj):
    """Format a timestamp as a string."""
    if not time_obj:
        return ""

    if isinstance(time_obj, datetime):
        return time_obj.strftime("%Y-%m-%d %H:%M:%S")
    else:
        return str(time_obj)


def sort_ranking_data(ranking_data, sort_by, sort_order='desc'):
    """
    Dynamically sort ranking data.

    Args:
        ranking_data: list of ranking entries
        sort_by: sort field (play_vv_change, play_vv_change_rate, play_vv, rank)
        sort_order: sort order (asc, desc)

    Returns:
        The sorted ranking data.
    """
    try:
        # Sort-key function
        def get_sort_key(item):
            if sort_by == 'play_vv_change':
                # Sort by play-count delta
                timeline_data = item.get('timeline_data', {})
                return timeline_data.get('play_vv_change', 0)
            elif sort_by == 'play_vv_change_rate':
                # Sort by play-count growth rate
                timeline_data = item.get('timeline_data', {})
                return timeline_data.get('play_vv_change_rate', 0)
            elif sort_by == 'play_vv':
                # Sort by current play count
                return item.get('play_vv', 0)
            elif sort_by == 'rank':
                # Sort by rank
                return item.get('rank', 999999)
            else:
                # Default: sort by rank
                return item.get('rank', 999999)

        # Perform the sort
        reverse = (sort_order == 'desc')
        # For the rank field, "descending" actually means ascending (a smaller rank is better)
        if sort_by == 'rank':
            reverse = (sort_order == 'asc')

        sorted_data = sorted(ranking_data, key=get_sort_key, reverse=reverse)

        # Re-assign ranks after sorting
        for i, item in enumerate(sorted_data, 1):
            item['current_sort_rank'] = i

        return sorted_data
    except Exception as e:
        logging.error(f"排序榜单数据失败: {e}")
        # Return the original data if sorting fails
        return ranking_data


def format_mix_item(doc):
    """Format a mix (collection) item, returning the raw database fields as-is."""
    return {
        "_id": str(doc.get("_id", "")),
        "batch_time": format_time(doc.get("batch_time")),
        "mix_name": doc.get("mix_name", ""),
        "video_url": doc.get("video_url", ""),
        "playcount": doc.get("playcount", ""),
        "play_vv": doc.get("play_vv", 0),
        "request_id": doc.get("request_id", ""),
        "rank": doc.get("rank", 0),
        "cover_image_url": doc.get("cover_image_url", ""),
        "cover_backup_urls": doc.get("cover_backup_urls", [])
    }


def get_mix_list(page=1, limit=20, sort_by="playcount"):
    """Fetch the paginated list of mixes (collections)."""
    try:
        # Number of documents to skip
        skip = (page - 1) * limit

        # Choose the sort field
        if sort_by == "growth":
            # Growth sorting needs special handling
            return get_growth_mixes(page, limit)
        else:
            sort_field = "play_vv" if sort_by == "playcount" else "batch_time"
            sort_order = -1  # descending

            # Today's date
            today = datetime.now().date()

            # Only query today's data
            query_condition = {
                "batch_time": {
                    "$gte": datetime(today.year, today.month, today.day),
                    "$lt": datetime(today.year, today.month, today.day) + timedelta(days=1)
                }
            }

            # Group by drama (mix) name and keep the latest record for each one
            pipeline = [
                {"$match": query_condition},
                {"$sort": {"batch_time": -1}},  # newest first
                {"$group": {
                    "_id": "$mix_name",                  # group by mix name
                    "latest_doc": {"$first": "$$ROOT"}   # first doc per group = latest record
                }},
                {"$replaceRoot": {"newRoot": "$latest_doc"}},
                {"$sort": {sort_field: sort_order}},
                {"$skip": skip},
                {"$limit": limit}
            ]

            docs = list(collection.aggregate(pipeline))

            # Total number of distinct mixes (for pagination)
            total_pipeline = [
                {"$match": query_condition},
                {"$sort": {"batch_time": -1}},
                {"$group": {"_id": "$mix_name"}},
                {"$count": "total"}
            ]
            total_result = list(collection.aggregate(total_pipeline))
            total = total_result[0]["total"] if total_result else 0

            # Format the documents
            mix_list = []
            for doc in docs:
                item = format_mix_item(doc)
                mix_list.append(item)

            return {
                "success": True,
                "data": mix_list,
                "pagination": {
                    "page": page,
                    "limit": limit,
                    "total": total,
                    "pages": (total + limit - 1) // limit,
                    "has_next": page * limit < total,
                    "has_prev": page > 1
                },
                "sort_by": sort_by,
                "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
    except Exception as e:
        logging.error(f"获取合集列表失败: {e}")
        return {"success": False, "message": f"获取数据失败: {str(e)}"}


def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None):
    """Growth-sorted mix list; prefers rankings pre-generated by the scheduled job."""
    try:
        # Number of documents to skip
        skip = (page - 1) * limit

        # Default to yesterday..today when no dates are provided
        if not start_date or not end_date:
            end_date = datetime.now().date()
            start_date = end_date - timedelta(days=1)
        else:
            # Convert string dates to date objects
            if isinstance(start_date, str):
                start_date = datetime.strptime(start_date, "%Y-%m-%d").date()
            if isinstance(end_date, str):
                end_date = datetime.strptime(end_date, "%Y-%m-%d").date()

        end_date_str = end_date.strftime("%Y-%m-%d")
        start_date_str = start_date.strftime("%Y-%m-%d")

        # First try the growth ranking pre-generated by the scheduled job
        try:
            growth_ranking = daily_rankings_collection.find_one({
                "date": end_date_str,
                "type": "growth",
                "start_date": start_date_str,
                "end_date": end_date_str
            }, sort=[("calculation_sequence", -1)])  # latest calculation result

            if growth_ranking and "data" in growth_ranking:
                logging.info(f"📈 从定时器生成的增长榜数据中读取 {end_date_str} 的增长榜")

                # Pre-computed growth ranking data
                growth_data = growth_ranking["data"]

                # Pagination
                total = len(growth_data)
                paginated_data = growth_data[skip:skip + limit]

                return {
                    "success": True,
                    "data": paginated_data,
                    "pagination": {
                        "page": page,
                        "limit": limit,
                        "total": total,
                        "pages": (total + limit - 1) // limit,
                        "has_next": page * limit < total,
                        "has_prev": page > 1
                    },
                    "sort_by": "growth",
                    "date_range": {
                        "start_date": start_date_str,
                        "end_date": end_date_str
                    },
                    "data_source": "timer_generated",  # marks where the data came from
                    "update_time": growth_ranking.get("created_at", datetime.now()).strftime("%Y-%m-%d %H:%M:%S")
                    if isinstance(growth_ranking.get("created_at"), datetime)
                    else str(growth_ranking.get("created_at", ""))
                }
        except Exception as e:
            logging.warning(f"从定时器数据读取增长榜失败,将使用动态计算: {e}")

        # Fall back to a dynamic calculation when the pre-generated data is missing or unreadable
        logging.info(f"📊 动态计算 {start_date_str} 到 {end_date_str} 的增长榜")

        # Query data for the end date
        end_cursor = collection.find({
            "batch_time": {
                "$gte": datetime(end_date.year, end_date.month, end_date.day),
                "$lt": datetime(end_date.year, end_date.month, end_date.day) + timedelta(days=1)
            }
        })
        end_data = list(end_cursor)

        # Query data for the start date
        start_cursor = collection.find({
            "batch_time": {
                "$gte": datetime(start_date.year, start_date.month, start_date.day),
                "$lt": datetime(start_date.year, start_date.month, start_date.day) + timedelta(days=1)
            }
        })
        start_data = list(start_cursor)

        # Build lookup dictionaries keyed by mix name
        end_dict = {item["mix_name"]: item for item in end_data}
        start_dict = {item["mix_name"]: item for item in start_data}

        # Compute growth per mix
        growth_data = []
        for mix_name, end_item in end_dict.items():
            if mix_name in start_dict:
                start_item = start_dict[mix_name]
                growth = end_item.get("play_vv", 0) - start_item.get("play_vv", 0)
                # Keep only mixes whose play count actually grew
                if growth > 0:
                    item = format_mix_item(end_item)
                    item["growth"] = growth
                    item["start_date"] = start_date_str
                    item["end_date"] = end_date_str
                    growth_data.append(item)
            else:
                # No record on the start date but one on the end date: treat it as new growth
                item = format_mix_item(end_item)
                item["growth"] = end_item.get("play_vv", 0)
                item["start_date"] = start_date_str
                item["end_date"] = end_date_str
                growth_data.append(item)

        # Sort by growth value, descending
        growth_data.sort(key=lambda x: x.get("growth", 0), reverse=True)

        # Pagination
        total = len(growth_data)
        paginated_data = growth_data[skip:skip + limit]

        # Assign ranks to the current page
        for i, item in enumerate(paginated_data):
            item["rank"] = skip + i + 1

        return {
            "success": True,
            "data": paginated_data,
            "pagination": {
                "page": page,
                "limit": limit,
                "total": total,
                "pages": (total + limit - 1) // limit,
                "has_next": page * limit < total,
                "has_prev": page > 1
            },
            "sort_by": "growth",
            "date_range": {
                "start_date": start_date_str,
                "end_date": end_date_str
            },
            "data_source": "dynamic_calculation",  # marks where the data came from
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logging.error(f"获取增长合集列表失败: {e}")
        # Fall back to the play-count ordering if the growth calculation fails
        return get_mix_list(page, limit, "playcount")


def get_top_mixes(limit=10):
    """Fetch the hottest mixes (TOP list)."""
    try:
        # Hottest mixes by play count
        cursor = collection.find().sort("play_vv", -1).limit(limit)
        docs = list(cursor)

        if not docs:
            return {"success": False, "message": "暂无数据"}

        # Format the documents
        top_list = []
        for doc in docs:
            item = format_mix_item(doc)
            top_list.append(item)

        return {
            "success": True,
            "data": top_list,
            "total": len(top_list),
            "update_time": format_time(docs[0].get("batch_time")) if docs else ""
        }
    except Exception as e:
        logging.error(f"获取热门合集失败: {e}")
        return {"success": False, "message": f"获取数据失败: {str(e)}"}


def search_mixes(keyword, page=1, limit=10):
    """Search mixes by keyword."""
    try:
        if not keyword:
            return {"success": False, "message": "请提供搜索关键词"}

        # Number of documents to skip
        skip = (page - 1) * limit

        # Search condition: fuzzy match on the mix name
        search_condition = {
            "mix_name": {"$regex": keyword, "$options": "i"}
        }

        # Query
        cursor = collection.find(search_condition).sort("play_vv", -1).skip(skip).limit(limit)
        docs = list(cursor)

        # Total number of matches
        total = collection.count_documents(search_condition)

        # Format the documents
        search_results = []
        for doc in docs:
            item = format_mix_item(doc)
            search_results.append(item)

        return {
            "success": True,
            "data": search_results,
            "keyword": keyword,
            "pagination": {
                "page": page,
                "limit": limit,
                "total": total,
                "pages": (total + limit - 1) // limit,
                "has_next": page * limit < total,
                "has_prev": page > 1
            },
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logging.error(f"搜索合集失败: {e}")
        return {"success": False, "message": f"搜索失败: {str(e)}"}


def get_mix_detail(mix_id):
    """Fetch the detail of a single mix."""
    try:
        from bson import ObjectId

        # Try looking up by ObjectId first
        try:
            doc = collection.find_one({"_id": ObjectId(mix_id)})
        except Exception:
            # If the ObjectId is invalid, fall back to other fields
            doc = collection.find_one({
                "$or": [
                    {"mix_name": mix_id},
                    {"request_id": mix_id}
                ]
            })

        if not doc:
            return {"success": False, "message": "未找到合集信息"}

        # The detail view returns only the raw database fields
        detail = format_mix_item(doc)

        return {
            "success": True,
            "data": detail,
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logging.error(f"获取合集详情失败: {e}")
        return {"success": False, "message": f"获取详情失败: {str(e)}"}


def get_statistics():
    """Fetch aggregate statistics."""
    try:
        # Basic counts
        total_mixes = collection.count_documents({})

        if total_mixes == 0:
            return {"success": False, "message": "暂无数据"}

        # Play-count statistics
        pipeline = [
            {
                "$group": {
                    "_id": None,
                    "total_playcount": {"$sum": "$play_vv"},
                    "avg_playcount": {"$avg": "$play_vv"},
                    "max_playcount": {"$max": "$play_vv"},
                    "min_playcount": {"$min": "$play_vv"}
                }
            }
        ]
        stats_result = list(collection.aggregate(pipeline))
        stats = stats_result[0] if stats_result else {}

        # Latest update time
        latest_doc = collection.find().sort("batch_time", -1).limit(1)
        latest_time = ""
        if latest_doc:
            latest_list = list(latest_doc)
            if latest_list:
                latest_time = format_time(latest_list[0].get("batch_time"))

        # Popularity buckets by play-count range
        categories = [
            {"name": "超热门", "min": 100000000, "count": 0},                 # 100M+
            {"name": "热门", "min": 50000000, "max": 99999999, "count": 0},   # 50M-100M
            {"name": "中等", "min": 10000000, "max": 49999999, "count": 0},   # 10M-50M
            {"name": "一般", "min": 0, "max": 9999999, "count": 0}            # under 10M
        ]

        for category in categories:
            if "max" in category:
                count = collection.count_documents({
                    "play_vv": {"$gte": category["min"], "$lte": category["max"]}
                })
            else:
                count = collection.count_documents({
                    "play_vv": {"$gte": category["min"]}
                })
            category["count"] = count

        return {
            "success": True,
            "data": {
                "total_mixes": total_mixes,
                "total_playcount": stats.get("total_playcount", 0),
                "avg_playcount": int(stats.get("avg_playcount", 0)),
                "max_playcount": stats.get("max_playcount", 0),
                "min_playcount": stats.get("min_playcount", 0),
                "categories": categories,
                "latest_update": latest_time
            },
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logging.error(f"获取统计信息失败: {e}")
        return {"success": False, "message": f"获取统计失败: {str(e)}"}


# Route definitions

@rank_bp.route('/videos')
def get_videos():
    """Mix list endpoint (compatible with calls from app.py)."""
    page = int(request.args.get('page', 1))
    limit = int(request.args.get('limit', 20))
    sort_by = request.args.get('sort', 'playcount')

    if sort_by == 'growth':
        start_date = request.args.get('start_date')
        end_date = request.args.get('end_date')
        result = get_growth_mixes(page, limit, start_date, end_date)
    else:
        result = get_mix_list(page, limit, sort_by)

    return jsonify(result)


@rank_bp.route('/top')
def get_top():
    """Top-list endpoint (compatible with calls from app.py)."""
    limit = int(request.args.get('limit', 10))
    result = get_top_mixes(limit)
    return jsonify(result)


@rank_bp.route('/search')
def search():
    """Search endpoint (compatible with calls from app.py)."""
    keyword = request.args.get('q', '')
    page = int(request.args.get('page', 1))
    limit = int(request.args.get('limit', 10))
    result = search_mixes(keyword, page, limit)
    return jsonify(result)


@rank_bp.route('/detail')
def get_detail():
    """Mix detail endpoint (compatible with calls from app.py)."""
    mix_id = request.args.get('id', '')
    result = get_mix_detail(mix_id)
    return jsonify(result)


@rank_bp.route('/stats')
def get_stats():
    """Statistics endpoint (compatible with calls from app.py)."""
    result = get_statistics()
    return jsonify(result)


@rank_bp.route('/health')
def health_check():
    """Health-check endpoint (compatible with calls from app.py)."""
    try:
        from database import client

        # Check that the database client exists
        if not client:
            return jsonify({"success": False, "message": "数据库未连接"})

        # Test the database connection
        client.admin.command('ping')

        # Basic record count
        total_count = collection.count_documents({})

        return jsonify({
            "success": True,
            "message": "服务正常",
            "data": {
                "database": "连接正常",
                "total_records": total_count,
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
        })
    except Exception as e:
        logging.error(f"健康检查失败: {e}")
        return jsonify({"success": False, "message": f"服务异常: {str(e)}"})


# ==================== Ranking query API endpoints ====================

@rank_bp.route('/rankings')
def get_rankings():
    """Ranking list endpoint; supports filtering by date/type plus dynamic sorting."""
    try:
        # Query parameters
        date = request.args.get('date')          # date, format: YYYY-MM-DD
        ranking_type = request.args.get('type')  # ranking type: playcount, growth, newcomer
        sort_by = request.args.get('sort_by', 'default')  # sort field: default, play_vv_change, play_vv_change_rate, play_vv
        sort_order = request.args.get('sort_order', 'desc')  # sort order: asc, desc
        page = int(request.args.get('page', 1))
        limit = int(request.args.get('limit', 50))

        # Build the query
        query = {}
        if date:
            query['date'] = date
        if ranking_type:
            query['ranking_type'] = ranking_type

        # When no date is given, default to the most recent ranking date
        if not date:
            latest_ranking = daily_rankings_collection.find_one(
                {},
                sort=[('date', -1)]
            )
            if latest_ranking:
                query['date'] = latest_ranking['date']

        # Query the rankings
        rankings = list(daily_rankings_collection.find(query).sort('generated_at', -1))

        if not rankings:
            return jsonify({
                "success": True,
                "message": "暂无榜单数据",
                "data": {
                    "rankings": [],
                    "total": 0,
                    "page": page,
                    "limit": limit
                }
            })

        # Format the response
        formatted_rankings = []
        for ranking in rankings:
            ranking_data = ranking.get('data', [])

            # Dynamic sorting
            if sort_by != 'default' and ranking_data:
                ranking_data = sort_ranking_data(ranking_data, sort_by, sort_order)

            # Paginate each ranking's data
            start_idx = (page - 1) * limit
            end_idx = start_idx + limit
            paginated_data = ranking_data[start_idx:end_idx]

            formatted_rankings.append({
                "date": ranking.get('date'),
                "ranking_type": ranking.get('ranking_type'),
                "ranking_name": ranking.get('ranking_name'),
                "description": ranking.get('description'),
                "data": paginated_data,
                "total_count": len(ranking_data),
                "current_page_count": len(paginated_data),
                "generated_at": format_time(ranking.get('generated_at')),
                "version": ranking.get('version', '1.0'),
                "sort_info": {
                    "sort_by": sort_by,
                    "sort_order": sort_order
                }
            })

        return jsonify({
            "success": True,
            "message": "获取榜单成功",
            "data": {
                "rankings": formatted_rankings,
                "total": len(formatted_rankings),
                "page": page,
                "limit": limit,
                "sort_by": sort_by,
                "sort_order": sort_order
            }
        })
    except Exception as e:
        logging.error(f"获取榜单失败: {e}")
        return jsonify({"success": False, "message": f"获取榜单失败: {str(e)}"})


@rank_bp.route('/rankings/dates')
def get_ranking_dates():
    """List the dates for which rankings are available."""
    try:
        # All distinct dates
        dates = daily_rankings_collection.distinct('date')
        dates.sort(reverse=True)  # newest first

        return jsonify({
            "success": True,
            "message": "获取日期列表成功",
            "data": {
                "dates": dates,
                "total": len(dates)
            }
        })
    except Exception as e:
        logging.error(f"获取日期列表失败: {e}")
        return jsonify({"success": False, "message": f"获取日期列表失败: {str(e)}"})


@rank_bp.route('/rankings/types')
def get_ranking_types():
    """List the supported ranking types."""
    try:
        # All distinct ranking types
        types = daily_rankings_collection.distinct('ranking_type')

        # Human-readable descriptions per type
        type_descriptions = {
            'playcount': '播放量榜 - 按播放量排序',
            'growth': '增长榜 - 播放量增长最快',
            'newcomer': '新晋榜 - 新上榜内容'
        }

        formatted_types = []
        for type_name in types:
            formatted_types.append({
                "type": type_name,
                "description": type_descriptions.get(type_name, type_name)
            })

        return jsonify({
            "success": True,
            "message": "获取榜单类型成功",
            "data": {
                "types": formatted_types,
                "total": len(formatted_types)
            }
        })
    except Exception as e:
        logging.error(f"获取榜单类型失败: {e}")
        return jsonify({"success": False, "message": f"获取榜单类型失败: {str(e)}"})


@rank_bp.route('/rankings/latest')
def get_latest_rankings():
    """Fetch the latest rankings of every type."""
    try:
        # Most recent ranking date
        latest_ranking = daily_rankings_collection.find_one(
            {},
            sort=[('date', -1)]
        )

        if not latest_ranking:
            return jsonify({
                "success": True,
                "message": "暂无榜单数据",
                "data": {
                    "date": None,
                    "rankings": []
                }
            })

        latest_date = latest_ranking['date']

        # All rankings for that date
        rankings = list(daily_rankings_collection.find({
            'date': latest_date
        }).sort('ranking_type', 1))

        formatted_rankings = []
        for ranking in rankings:
            # Return only the first 20 entries as a preview
            ranking_data = ranking.get('data', [])[:20]

            formatted_rankings.append({
                "ranking_type": ranking.get('ranking_type'),
                "ranking_name": ranking.get('ranking_name'),
                "description": ranking.get('description'),
                "data": ranking_data,
                "total_count": ranking.get('total_count', 0),
                "preview_count": len(ranking_data)
            })

        return jsonify({
            "success": True,
            "message": "获取最新榜单成功",
            "data": {
                "date": latest_date,
                "rankings": formatted_rankings,
                "total_types": len(formatted_rankings)
            }
        })
    except Exception as e:
        logging.error(f"获取最新榜单失败: {e}")
        return jsonify({"success": False, "message": f"获取最新榜单失败: {str(e)}"})


@rank_bp.route('/rankings/stats')
def get_rankings_stats():
    """Fetch summary statistics about the stored rankings."""
    try:
        # Total number of ranking documents
        total_rankings = daily_rankings_collection.count_documents({})

        # Number of distinct dates
        total_dates = len(daily_rankings_collection.distinct('date'))

        # Number of distinct ranking types
        total_types = len(daily_rankings_collection.distinct('ranking_type'))

        # Latest and earliest dates
        latest_ranking = daily_rankings_collection.find_one({}, sort=[('date', -1)])
        earliest_ranking = daily_rankings_collection.find_one({}, sort=[('date', 1)])

        latest_date = latest_ranking['date'] if latest_ranking else None
        earliest_date = earliest_ranking['date'] if earliest_ranking else None

        return jsonify({
            "success": True,
            "message": "获取榜单统计成功",
            "data": {
                "total_rankings": total_rankings,
                "total_dates": total_dates,
                "total_types": total_types,
                "latest_date": latest_date,
                "earliest_date": earliest_date,
                "date_range": f"{earliest_date} 至 {latest_date}" if earliest_date and latest_date else "暂无数据"
            }
        })
    except Exception as e:
        logging.error(f"获取榜单统计失败: {e}")
        return jsonify({"success": False, "message": f"获取榜单统计失败: {str(e)}"})
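

if __name__ == '__main__':
    # Local-run sketch (assumption): in normal deployment this blueprint is
    # registered by app.py, which is not part of this file. Running the module
    # directly mounts the blueprint on a throwaway Flask app so the /api/rank
    # endpoints can be exercised locally; the host and port below are
    # illustrative values, not taken from the project configuration.
    from flask import Flask

    _app = Flask(__name__)
    _app.register_blueprint(rank_bp)  # serves the endpoints under /api/rank/...
    _app.run(host='127.0.0.1', port=5000, debug=True)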