榜单列表数据库储存

This commit is contained in:
qiaoyirui0819 2025-10-21 16:17:52 +08:00
parent 2c0f891a89
commit bd58c1dc8f
3 changed files with 592 additions and 28 deletions

View File

@ -4,9 +4,16 @@
抖音播放量自动抓取定时器 - 跨平台版本 抖音播放量自动抓取定时器 - 跨平台版本
功能 功能
- 每晚24:00自动执行抖音播放量抓取任务 - 每晚自动执行抖音播放量抓取任务
- 数据抓取完成后自动生类榜单
- 支持WindowsmacOSLinux - 支持WindowsmacOSLinux
- 自动保存数据到MongoDB - 自动保存数据到MongoDB
使用方法
- 正常模式python Timer_worker.py启动定时器
- 测试模式python Timer_worker.py --test立即执行一次
- 单次执行python Timer_worker.py --once立即执行一次并退出
- 仅生成榜单python Timer_worker.py --ranking-only仅生成榜单不抓取数据
""" """
import schedule import schedule
@ -14,14 +21,17 @@ import time
import sys import sys
import os import os
import logging import logging
import argparse
from pathlib import Path from pathlib import Path
from datetime import datetime from datetime import datetime, date
import config import config
# 添加项目路径到 Python 路径 # 添加项目路径到 Python 路径
sys.path.append(os.path.join(os.path.dirname(__file__), 'handlers', 'Rankings')) sys.path.append(os.path.join(os.path.dirname(__file__), 'handlers', 'Rankings'))
from rank_data_scraper import DouyinPlayVVScraper from rank_data_scraper import DouyinPlayVVScraper
# 配置日志的函数 # 配置日志的函数
def setup_logging(): def setup_logging():
"""设置日志配置""" """设置日志配置"""
@ -64,11 +74,197 @@ class DouyinAutoScheduler:
logging.info("✅ 抖音播放量抓取任务执行成功") logging.info("✅ 抖音播放量抓取任务执行成功")
# 数据抓取完成后,自动生成当日榜单
self.generate_daily_rankings()
except Exception as e: except Exception as e:
logging.error(f"💥 执行任务时发生异常: {e}") logging.error(f"💥 执行任务时发生异常: {e}")
import traceback import traceback
logging.error(f"详细错误信息: {traceback.format_exc()}") logging.error(f"详细错误信息: {traceback.format_exc()}")
def generate_daily_rankings(self):
    """生成每日榜单数据(基于时间轴对比)

    Build today's "comprehensive" ranking from the raw scraped rows in
    ``Rankings_list``: dedupe per drama title (keeping the highest play
    count), diff each drama's play count against yesterday's stored
    ranking, sort by that delta (largest growth first) and insert a new
    stamped document into ``Ranking_storage``.

    Returns:
        bool: True when a ranking document was written, False on any
        failure or when there is no data for today.
    """
    try:
        # Local import: keeps this module importable even when the
        # database configuration is unavailable (e.g. --test dry runs).
        from database import db
        from datetime import timedelta

        douyin_collection = db['Rankings_list']      # raw scraped rows
        rankings_collection = db['Ranking_storage']  # generated rankings

        today = date.today()
        yesterday = today - timedelta(days=1)
        today_str = today.strftime('%Y-%m-%d')
        yesterday_str = yesterday.strftime('%Y-%m-%d')
        logging.info(f"📅 正在生成 {today_str} 的榜单(对比 {yesterday_str}...")

        # NOTE(review): deleting today's documents means the
        # `calculation_sequence` computed below always restarts at 1,
        # which contradicts "keep every historical calculation" — confirm
        # which behaviour is intended.
        rankings_collection.delete_many({"date": today_str})
        logging.info(f"🗑️ 已清理 {today_str} 的旧榜单数据")

        logging.info("🔄 正在生成时间轴对比榜单...")

        # Dedupe: keep only the highest-play_vv row per drama (mix_name).
        today_videos_raw = list(douyin_collection.find({}).sort("play_vv", -1))
        unique_videos = {}
        for video in today_videos_raw:
            mix_name = video.get("mix_name", "")
            if mix_name and (mix_name not in unique_videos
                             or video.get("play_vv", 0) > unique_videos[mix_name].get("play_vv", 0)):
                unique_videos[mix_name] = video
        today_videos = list(unique_videos.values())
        logging.info(f"📊 今日数据去重后:{len(today_videos)} 个独特短剧(原始数据:{len(today_videos_raw)} 条)")

        # Nothing scraped today: bail out before touching yesterday's data.
        if not today_videos:
            logging.warning("⚠️ 榜单生成失败:无今日数据")
            return False

        # Load yesterday's latest "comprehensive" calculation, keyed by title.
        yesterday_ranking = rankings_collection.find_one({
            "date": yesterday_str,
            "type": "comprehensive"
        }, sort=[("calculation_sequence", -1)])
        yesterday_data = {}
        if yesterday_ranking and "data" in yesterday_ranking:
            for item in yesterday_ranking["data"]:
                title = item.get("title", "")
                if title:
                    yesterday_data[title] = {
                        "rank": item.get("rank", 0),
                        "play_vv": item.get("play_vv", 0),
                        "video_id": item.get("video_id", "")
                    }
            logging.info(f"📊 找到昨天的榜单数据,共 {len(yesterday_data)} 个短剧")
        else:
            logging.info("📊 未找到昨天的榜单数据,将作为首次生成")

        # Compute per-drama growth versus yesterday.
        videos_with_growth = []
        for video in today_videos:
            current_play_vv = video.get("play_vv", 0)
            mix_name = video.get("mix_name", "")
            play_vv_change = 0
            play_vv_change_rate = 0
            is_new = mix_name not in yesterday_data
            if not is_new:
                yesterday_play_vv = yesterday_data[mix_name]["play_vv"]
                play_vv_change = current_play_vv - yesterday_play_vv
                if yesterday_play_vv > 0:
                    play_vv_change_rate = round((play_vv_change / yesterday_play_vv) * 100, 2)
            videos_with_growth.append({
                "video": video,
                "play_vv_change": play_vv_change,
                "play_vv_change_rate": play_vv_change_rate,
                "is_new": is_new,
                "yesterday_data": yesterday_data.get(mix_name, {})
            })

        # Rank by absolute play-count delta, biggest growth first.
        videos_with_growth.sort(key=lambda x: x["play_vv_change"], reverse=True)

        comprehensive_ranking = {
            "date": today_str,
            "type": "comprehensive",
            "name": "播放量增长榜单",
            "description": f"基于 {yesterday_str}{today_str} 播放量差值排序的榜单(差值越大排名越靠前)",
            "comparison_date": yesterday_str,
            "total_videos": len(videos_with_growth),
            "data": []
        }

        for i, item in enumerate(videos_with_growth, 1):
            video = item["video"]
            mix_name = video.get("mix_name", "")
            # Positive rank_change means the drama moved up since yesterday.
            rank_change = 0
            if not item["is_new"] and item["yesterday_data"]:
                rank_change = item["yesterday_data"].get("rank", 0) - i
            comprehensive_ranking["data"].append({
                "rank": i,
                "title": mix_name,
                "play_vv": video.get("play_vv", 0),
                "author": video.get("author", ""),
                "video_id": str(video.get("_id", "")),
                "video_url": video.get("video_url", ""),
                "cover_image_url": video.get("cover_image_url", ""),
                "playcount_str": video.get("playcount", ""),
                # Day-over-day comparison block consumed by the rankings API.
                "timeline_data": {
                    "is_new": item["is_new"],
                    "rank_change": rank_change,
                    "play_vv_change": item["play_vv_change"],
                    "play_vv_change_rate": item["play_vv_change_rate"],
                    "yesterday_rank": item["yesterday_data"].get("rank", 0) if not item["is_new"] else 0,
                    "yesterday_play_vv": item["yesterday_data"].get("play_vv", 0) if not item["is_new"] else 0
                }
            })

        # Stamp this calculation so repeated runs stay distinguishable.
        current_timestamp = datetime.now()
        comprehensive_ranking["created_at"] = current_timestamp
        comprehensive_ranking["calculation_id"] = f"{today_str}_{current_timestamp.strftime('%H%M%S')}"
        existing_count = rankings_collection.count_documents({
            "date": today_str,
            "type": "comprehensive"
        })
        comprehensive_ranking["calculation_sequence"] = existing_count + 1

        rankings_collection.insert_one(comprehensive_ranking)
        logging.info(f"📝 创建了新的今日榜单数据(第{existing_count + 1}次计算,包含最新差值)")
        logging.info(f"🔖 计算ID: {comprehensive_ranking['calculation_id']}")
        new_count = sum(1 for item in comprehensive_ranking["data"] if item["timeline_data"]["is_new"])
        logging.info(f"✅ 时间轴对比榜单生成成功")
        logging.info(f"📊 总计 {len(comprehensive_ranking['data'])} 条记录")
        logging.info(f"🆕 新上榜 {new_count}")
        logging.info(f"🔄 对比基准日期: {yesterday_str}")
        return True
    except Exception as e:
        # Single top-level handler (the original had a duplicated nested
        # try/except with a second `import traceback`); always return a
        # bool so callers get a consistent result type.
        logging.error(f"💥 生成榜单时发生异常: {e}")
        import traceback
        logging.error(f"详细错误信息: {traceback.format_exc()}")
        return False
def setup_schedule(self): def setup_schedule(self):
"""设置定时任务""" """设置定时任务"""
# 从配置文件读取执行时间 # 从配置文件读取执行时间
@ -94,6 +290,11 @@ class DouyinAutoScheduler:
logging.info("🧪 测试模式 - 立即执行抖音播放量抓取任务...") logging.info("🧪 测试模式 - 立即执行抖音播放量抓取任务...")
self.run_douyin_scraper() self.run_douyin_scraper()
def run_ranking_only(self):
    """Generate today's rankings without running the scraper first.

    Thin delegate used by the ``--ranking-only`` CLI mode.
    """
    logging.info("📊 仅生成榜单模式...")
    self.generate_daily_rankings()
def start_scheduler(self): def start_scheduler(self):
"""启动定时器""" """启动定时器"""
self.is_running = True self.is_running = True
@ -120,24 +321,43 @@ def main():
"""主函数""" """主函数"""
import argparse import argparse
try:
parser = argparse.ArgumentParser(description='抖音播放量自动抓取定时器') parser = argparse.ArgumentParser(description='抖音播放量自动抓取定时器')
parser.add_argument('--test', action='store_true', help='测试模式 - 立即执行一次') parser.add_argument('--test', action='store_true', help='测试模式 - 立即执行一次')
parser.add_argument('--once', action='store_true', help='立即执行一次并退出') parser.add_argument('--once', action='store_true', help='立即执行一次并退出')
parser.add_argument('--ranking-only', action='store_true', help='仅生成榜单(不抓取数据)')
args = parser.parse_args() args = parser.parse_args()
# 设置日志配置 # 设置日志配置
setup_logging() setup_logging()
print("正在初始化定时器...")
scheduler = DouyinAutoScheduler() scheduler = DouyinAutoScheduler()
if args.test: if args.test:
print("执行测试模式...")
scheduler.run_test() scheduler.run_test()
elif args.once: elif args.once:
print("执行单次模式...")
scheduler.run_once() scheduler.run_once()
elif args.ranking_only:
print("执行榜单生成模式...")
scheduler.run_ranking_only()
else: else:
print("启动定时器模式...")
scheduler.setup_schedule() scheduler.setup_schedule()
scheduler.start_scheduler() scheduler.start_scheduler()
print("程序执行完成")
except Exception as e:
print(f"程序执行出错: {e}")
import traceback
traceback.print_exc()
return 1
return 0
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -4,7 +4,7 @@ import importlib
# 数据库配置 # 数据库配置
MONGO_URI = "mongodb://localhost:27017" MONGO_URI = "mongodb://localhost:27017"
# MONGO_URI = "mongodb://mongouser:Jdei2243afN@172.16.0.6:27017,172.16.0.4:27017/test?replicaSet=cmgo-r6qkaern_0&authSource=admin" # MONGO_URI = "mongodb://mongouser:Jdei2243afN@172.16.0.6:27017,172.16.0.4:27017/test?replicaSet=cmgo-r6qkaern_0&authSource=admin"
MONGO_DB_NAME = "kemeng_media" MONGO_DB_NAME = "Rankings"
# 应用配置 # 应用配置
APP_ENV = os.getenv('APP_ENV', 'development') APP_ENV = os.getenv('APP_ENV', 'development')
@ -15,6 +15,6 @@ LOG_LEVEL = 'INFO'
LOG_DIR = 'logs' LOG_DIR = 'logs'
# 定时器配置 # 定时器配置
SCHEDULER_TIME = "00:01" # 定时器执行时间,格式为 HH:MM (24小时制) SCHEDULER_TIME = "00:00" # 定时器执行时间,格式为 HH:MM (24小时制;schedule 库的 .at() 仅接受 00-23 的小时,"24:00" 会抛出 ScheduleValueError,午夜应写作 "00:00")
print(f"Successfully loaded configuration for environment: {APP_ENV}") print(f"Successfully loaded configuration for environment: {APP_ENV}")

View File

@ -16,6 +16,7 @@ rank_bp = Blueprint('rank', __name__, url_prefix='/api/rank')
# 获取数据库集合 # 获取数据库集合
collection = db['Rankings_list'] collection = db['Rankings_list']
daily_rankings_collection = db['Ranking_storage'] # 榜单存储表
def format_playcount(playcount_str): def format_playcount(playcount_str):
"""格式化播放量字符串为数字""" """格式化播放量字符串为数字"""
@ -63,6 +64,59 @@ def format_time(time_obj):
else: else:
return str(time_obj) return str(time_obj)
def sort_ranking_data(ranking_data, sort_by, sort_order='desc'):
    """Sort a ranking's entry list by the requested metric.

    Args:
        ranking_data: list of ranking entry dicts.
        sort_by: one of ``play_vv_change``, ``play_vv_change_rate``,
            ``play_vv``, ``rank`` (anything else falls back to ``rank``).
        sort_order: ``asc`` or ``desc``.

    Returns:
        A newly sorted list whose entries carry an added
        ``current_sort_rank`` position; on any error the input list is
        returned unchanged.
    """
    try:
        # Dispatch table instead of an if/elif chain; unknown keys fall
        # back to sorting by the stored rank.
        key_funcs = {
            'play_vv_change': lambda e: e.get('timeline_data', {}).get('play_vv_change', 0),
            'play_vv_change_rate': lambda e: e.get('timeline_data', {}).get('play_vv_change_rate', 0),
            'play_vv': lambda e: e.get('play_vv', 0),
        }
        key_fn = key_funcs.get(sort_by, lambda e: e.get('rank', 999999))

        # For the rank field "desc" means best-first, i.e. ascending rank
        # numbers, so the reverse flag is inverted there.
        if sort_by == 'rank':
            descending = (sort_order == 'asc')
        else:
            descending = (sort_order == 'desc')

        ordered = sorted(ranking_data, key=key_fn, reverse=descending)

        # Re-number positions under the chosen ordering (mutates entries).
        for position, entry in enumerate(ordered, 1):
            entry['current_sort_rank'] = position
        return ordered
    except Exception as e:
        logging.error(f"排序榜单数据失败: {e}")
        # Best-effort: hand back the unsorted data rather than failing.
        return ranking_data
def format_mix_item(doc): def format_mix_item(doc):
"""格式化合集数据项 - 完全按照数据库原始字段返回""" """格式化合集数据项 - 完全按照数据库原始字段返回"""
return { return {
@ -155,7 +209,7 @@ def get_mix_list(page=1, limit=20, sort_by="playcount"):
return {"success": False, "message": f"获取数据失败: {str(e)}"} return {"success": False, "message": f"获取数据失败: {str(e)}"}
def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None): def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None):
"""获取按播放量增长排序的合集列表""" """获取按播放量增长排序的合集列表 - 优先从定时器生成的数据中读取"""
try: try:
# 计算跳过的数量 # 计算跳过的数量
skip = (page - 1) * limit skip = (page - 1) * limit
@ -171,6 +225,53 @@ def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None):
if isinstance(end_date, str): if isinstance(end_date, str):
end_date = datetime.strptime(end_date, "%Y-%m-%d").date() end_date = datetime.strptime(end_date, "%Y-%m-%d").date()
end_date_str = end_date.strftime("%Y-%m-%d")
start_date_str = start_date.strftime("%Y-%m-%d")
# 优先尝试从定时器生成的增长榜数据中读取
try:
growth_ranking = daily_rankings_collection.find_one({
"date": end_date_str,
"type": "growth",
"start_date": start_date_str,
"end_date": end_date_str
}, sort=[("calculation_sequence", -1)]) # 获取最新的计算结果
if growth_ranking and "data" in growth_ranking:
logging.info(f"📈 从定时器生成的增长榜数据中读取 {end_date_str} 的增长榜")
# 获取预先计算好的增长榜数据
growth_data = growth_ranking["data"]
# 分页处理
total = len(growth_data)
paginated_data = growth_data[skip:skip + limit]
return {
"success": True,
"data": paginated_data,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"pages": (total + limit - 1) // limit,
"has_next": page * limit < total,
"has_prev": page > 1
},
"sort_by": "growth",
"date_range": {
"start_date": start_date_str,
"end_date": end_date_str
},
"data_source": "timer_generated", # 标识数据来源
"update_time": growth_ranking.get("created_at", datetime.now()).strftime("%Y-%m-%d %H:%M:%S") if isinstance(growth_ranking.get("created_at"), datetime) else str(growth_ranking.get("created_at", ""))
}
except Exception as e:
logging.warning(f"从定时器数据读取增长榜失败,将使用动态计算: {e}")
# 如果定时器数据不存在或读取失败,回退到动态计算
logging.info(f"📊 动态计算 {start_date_str}{end_date_str} 的增长榜")
# 查询结束日期的数据 # 查询结束日期的数据
end_cursor = collection.find({ end_cursor = collection.find({
"batch_time": { "batch_time": {
@ -204,15 +305,15 @@ def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None):
if growth > 0: if growth > 0:
item = format_mix_item(end_item) item = format_mix_item(end_item)
item["growth"] = growth item["growth"] = growth
item["start_date"] = start_date.strftime("%Y-%m-%d") item["start_date"] = start_date_str
item["end_date"] = end_date.strftime("%Y-%m-%d") item["end_date"] = end_date_str
growth_data.append(item) growth_data.append(item)
else: else:
# 如果开始日期没有数据,但结束日期有,也认为是新增长 # 如果开始日期没有数据,但结束日期有,也认为是新增长
item = format_mix_item(end_item) item = format_mix_item(end_item)
item["growth"] = end_item.get("play_vv", 0) item["growth"] = end_item.get("play_vv", 0)
item["start_date"] = start_date.strftime("%Y-%m-%d") item["start_date"] = start_date_str
item["end_date"] = end_date.strftime("%Y-%m-%d") item["end_date"] = end_date_str
growth_data.append(item) growth_data.append(item)
# 按增长值降序排序 # 按增长值降序排序
@ -239,9 +340,10 @@ def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None):
}, },
"sort_by": "growth", "sort_by": "growth",
"date_range": { "date_range": {
"start_date": start_date.strftime("%Y-%m-%d"), "start_date": start_date_str,
"end_date": end_date.strftime("%Y-%m-%d") "end_date": end_date_str
}, },
"data_source": "dynamic_calculation", # 标识数据来源
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
} }
@ -500,3 +602,245 @@ def health_check():
except Exception as e: except Exception as e:
logging.error(f"健康检查失败: {e}") logging.error(f"健康检查失败: {e}")
return jsonify({"success": False, "message": f"服务异常: {str(e)}"}) return jsonify({"success": False, "message": f"服务异常: {str(e)}"})
# ==================== 榜单查询API接口 ====================
@rank_bp.route('/rankings')
def get_rankings():
    """获取榜单列表 - filter by date/type with optional dynamic re-sorting.

    Query params:
        date       -- ranking date, YYYY-MM-DD (defaults to newest date)
        type       -- ranking type filter (e.g. comprehensive, growth)
        sort_by    -- default | play_vv_change | play_vv_change_rate | play_vv | rank
        sort_order -- asc | desc (default desc)
        page/limit -- pagination applied inside each ranking's data list

    Returns a JSON envelope; errors are reported as {"success": False}.
    """
    try:
        # Avoid shadowing the `date` name used elsewhere in this module.
        date_param = request.args.get('date')
        ranking_type = request.args.get('type')
        sort_by = request.args.get('sort_by', 'default')
        sort_order = request.args.get('sort_order', 'desc')
        page = int(request.args.get('page', 1))
        limit = int(request.args.get('limit', 50))

        query = {}
        if date_param:
            query['date'] = date_param
        if ranking_type:
            # The timer stores the type under "type"; older documents used
            # "ranking_type" — match either so neither generation is dropped.
            query['$or'] = [{'ranking_type': ranking_type}, {'type': ranking_type}]

        # No explicit date: default to the newest date present in storage.
        if not date_param:
            latest_ranking = daily_rankings_collection.find_one(
                {}, sort=[('date', -1)]
            )
            if latest_ranking:
                query['date'] = latest_ranking['date']

        # Newest calculation first; fall back to the timer's "created_at"
        # stamp for documents that lack "generated_at".
        rankings = list(daily_rankings_collection.find(query)
                        .sort([('generated_at', -1), ('created_at', -1)]))

        if not rankings:
            return jsonify({
                "success": True,
                "message": "暂无榜单数据",
                "data": {
                    "rankings": [],
                    "total": 0,
                    "page": page,
                    "limit": limit
                }
            })

        formatted_rankings = []
        for ranking in rankings:
            ranking_data = ranking.get('data', [])

            # Re-sort on demand; "default" keeps the stored order.
            if sort_by != 'default' and ranking_data:
                ranking_data = sort_ranking_data(ranking_data, sort_by, sort_order)

            # Paginate within this ranking's entry list.
            start_idx = (page - 1) * limit
            paginated_data = ranking_data[start_idx:start_idx + limit]

            formatted_rankings.append({
                "date": ranking.get('date'),
                # Fall back to the timer-generated field names.
                "ranking_type": ranking.get('ranking_type') or ranking.get('type'),
                "ranking_name": ranking.get('ranking_name') or ranking.get('name'),
                "description": ranking.get('description'),
                "data": paginated_data,
                "total_count": len(ranking_data),
                "current_page_count": len(paginated_data),
                "generated_at": format_time(ranking.get('generated_at') or ranking.get('created_at')),
                "version": ranking.get('version', '1.0'),
                "sort_info": {
                    "sort_by": sort_by,
                    "sort_order": sort_order
                }
            })

        return jsonify({
            "success": True,
            "message": "获取榜单成功",
            "data": {
                "rankings": formatted_rankings,
                "total": len(formatted_rankings),
                "page": page,
                "limit": limit,
                "sort_by": sort_by,
                "sort_order": sort_order
            }
        })
    except Exception as e:
        logging.error(f"获取榜单失败: {e}")
        return jsonify({"success": False, "message": f"获取榜单失败: {str(e)}"})
@rank_bp.route('/rankings/dates')
def get_ranking_dates():
    """Return every distinct ranking date in storage, newest first."""
    try:
        available_dates = sorted(
            daily_rankings_collection.distinct('date'),
            reverse=True  # newest date first
        )
        return jsonify({
            "success": True,
            "message": "获取日期列表成功",
            "data": {
                "dates": available_dates,
                "total": len(available_dates)
            }
        })
    except Exception as e:
        logging.error(f"获取日期列表失败: {e}")
        return jsonify({"success": False, "message": f"获取日期列表失败: {str(e)}"})
@rank_bp.route('/rankings/types')
def get_ranking_types():
    """Return the distinct ranking types present in storage, with labels."""
    try:
        # Human-readable labels for the known types; unknown types fall
        # back to echoing their own name.
        type_descriptions = {
            'playcount': '播放量榜 - 按播放量排序',
            'growth': '增长榜 - 播放量增长最快',
            'newcomer': '新晋榜 - 新上榜内容'
        }
        stored_types = daily_rankings_collection.distinct('ranking_type')
        formatted_types = [
            {
                "type": type_name,
                "description": type_descriptions.get(type_name, type_name)
            }
            for type_name in stored_types
        ]
        return jsonify({
            "success": True,
            "message": "获取榜单类型成功",
            "data": {
                "types": formatted_types,
                "total": len(formatted_types)
            }
        })
    except Exception as e:
        logging.error(f"获取榜单类型失败: {e}")
        return jsonify({"success": False, "message": f"获取榜单类型失败: {str(e)}"})
@rank_bp.route('/rankings/latest')
def get_latest_rankings():
    """获取最新的所有类型榜单 - top-20 preview per type for the newest date.

    Returns a JSON envelope with every ranking type stored for the most
    recent date, each truncated to its first 20 entries.
    """
    try:
        # Find the most recent date that has any ranking stored.
        latest_ranking = daily_rankings_collection.find_one(
            {}, sort=[('date', -1)]
        )
        if not latest_ranking:
            return jsonify({
                "success": True,
                "message": "暂无榜单数据",
                "data": {
                    "date": None,
                    "rankings": []
                }
            })

        latest_date = latest_ranking['date']
        rankings = list(daily_rankings_collection.find({
            'date': latest_date
        }).sort('ranking_type', 1))

        formatted_rankings = []
        for ranking in rankings:
            # Cap the payload at 20 entries per ranking type.
            preview = ranking.get('data', [])[:20]
            formatted_rankings.append({
                # Fall back to the timer-generated field names
                # ("type"/"name"/"total_videos") so scheduler-produced
                # documents are not rendered with null metadata.
                "ranking_type": ranking.get('ranking_type') or ranking.get('type'),
                "ranking_name": ranking.get('ranking_name') or ranking.get('name'),
                "description": ranking.get('description'),
                "data": preview,
                "total_count": ranking.get('total_count', ranking.get('total_videos', 0)),
                "preview_count": len(preview)
            })

        return jsonify({
            "success": True,
            "message": "获取最新榜单成功",
            "data": {
                "date": latest_date,
                "rankings": formatted_rankings,
                "total_types": len(formatted_rankings)
            }
        })
    except Exception as e:
        logging.error(f"获取最新榜单失败: {e}")
        return jsonify({"success": False, "message": f"获取最新榜单失败: {str(e)}"})
@rank_bp.route('/rankings/stats')
def get_rankings_stats():
    """Summarise ranking storage: document/date/type counts and date span."""
    try:
        total_rankings = daily_rankings_collection.count_documents({})
        distinct_dates = daily_rankings_collection.distinct('date')
        distinct_types = daily_rankings_collection.distinct('ranking_type')

        # Newest and oldest documents bracket the covered date range.
        newest = daily_rankings_collection.find_one({}, sort=[('date', -1)])
        oldest = daily_rankings_collection.find_one({}, sort=[('date', 1)])
        latest_date = newest['date'] if newest else None
        earliest_date = oldest['date'] if oldest else None

        if earliest_date and latest_date:
            date_range = f"{earliest_date}{latest_date}"
        else:
            date_range = "暂无数据"

        return jsonify({
            "success": True,
            "message": "获取榜单统计成功",
            "data": {
                "total_rankings": total_rankings,
                "total_dates": len(distinct_dates),
                "total_types": len(distinct_types),
                "latest_date": latest_date,
                "earliest_date": earliest_date,
                "date_range": date_range
            }
        })
    except Exception as e:
        logging.error(f"获取榜单统计失败: {e}")
        return jsonify({"success": False, "message": f"获取榜单统计失败: {str(e)}"})