rank_backend/backend/routers/rank_api_routes.py
Qyir 2a32b2a8c0 1.添加判断代码,启动定时器时不调用主代码的某几个函数,确保定时器正常计算播放量差值
2.新增功能:获取点赞,收藏,转发数量+评论内容列表(不完整,正在继续优化)
3.增加数据库文件夹,当启动定时器时存储到Ranking_storage_list中,
按照Ranking_storage_list中的数据进行计算播放量差值,计算结果存入Ranking_storage中
单独运行rank_data_scraper.py的时候存入Rankings_list

原因:
Rankings_list里面存储的数据结构较多
Ranking_storage_list里面存储的主要是播放量
Rankings_list里面存入的是播放量差值
2025-10-23 10:04:44 +08:00

825 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
小程序专用抖音播放量数据API服务器
优化的数据格式和接口设计,专为小程序使用
"""
from flask import Blueprint, request, jsonify
from datetime import datetime, timedelta
import logging
import re
from database import db
# Create the Flask blueprint for all /api/rank endpoints
rank_bp = Blueprint('rank', __name__, url_prefix='/api/rank')
# Obtain the MongoDB collections used by this module
collection = db['Rankings_list']  # raw per-scrape mix records
daily_rankings_collection = db['Ranking_storage']  # precomputed daily ranking storage
def format_playcount(playcount_str):
    """Convert a raw play-count value to an integer.

    Args:
        playcount_str: raw play count; may already be an int/float, a plain
            numeric string, or a Chinese-formatted string using the units
            亿 (1e8) or 万 (1e4), e.g. "1.2亿", "3500万".

    Returns:
        The play count as an int, or 0 when the value is empty or
        unparseable.
    """
    if not playcount_str:
        return 0
    try:
        if isinstance(playcount_str, (int, float)):
            return int(playcount_str)
        playcount_str = str(playcount_str).strip()
        # Handle Chinese magnitude units.
        # BUG FIX: the second branch previously tested `"" in playcount_str`
        # (always True) because the 万 character was lost in an encoding
        # scrub, so every non-亿 value was multiplied by 10000. Restored.
        if "亿" in playcount_str:
            num = float(re.findall(r'[\d.]+', playcount_str)[0])
            return int(num * 100000000)
        elif "万" in playcount_str:
            num = float(re.findall(r'[\d.]+', playcount_str)[0])
            return int(num * 10000)
        else:
            # Plain number, possibly with a decimal point.
            return int(float(playcount_str))
    except (ValueError, IndexError, TypeError):
        # Unparseable input degrades to 0 rather than raising.
        return 0
def format_cover_url(cover_data):
    """Normalize cover-image data into a single URL string.

    Accepts a plain URL string or a dict carrying a 'url_list' key
    (Douyin image payload); any other input yields an empty string.
    """
    if isinstance(cover_data, str):
        return cover_data
    if isinstance(cover_data, dict) and 'url_list' in cover_data:
        urls = cover_data['url_list']
        return urls[0] if urls else ""
    return ""
def format_time(time_obj):
    """Render a timestamp as 'YYYY-MM-DD HH:MM:SS'.

    datetime objects are formatted; other truthy values are stringified;
    falsy values become the empty string.
    """
    if not time_obj:
        return ""
    return (time_obj.strftime("%Y-%m-%d %H:%M:%S")
            if isinstance(time_obj, datetime)
            else str(time_obj))
def sort_ranking_data(ranking_data, sort_by, sort_order='desc'):
"""
对榜单数据进行动态排序
Args:
ranking_data: 榜单数据列表
sort_by: 排序字段 (play_vv_change, play_vv_change_rate, play_vv, rank)
sort_order: 排序顺序 (asc, desc)
Returns:
排序后的榜单数据
"""
try:
# 定义排序键函数
def get_sort_key(item):
if sort_by == 'play_vv_change':
# 按播放量差值排序
timeline_data = item.get('timeline_data', {})
return timeline_data.get('play_vv_change', 0)
elif sort_by == 'play_vv_change_rate':
# 按播放量变化率排序
timeline_data = item.get('timeline_data', {})
return timeline_data.get('play_vv_change_rate', 0)
elif sort_by == 'play_vv':
# 按当前播放量排序
return item.get('play_vv', 0)
elif sort_by == 'rank':
# 按排名排序
return item.get('rank', 999999)
else:
# 默认按排名排序
return item.get('rank', 999999)
# 执行排序
reverse = (sort_order == 'desc')
# 对于排名字段,降序实际上是升序(排名越小越好)
if sort_by == 'rank':
reverse = (sort_order == 'asc')
sorted_data = sorted(ranking_data, key=get_sort_key, reverse=reverse)
# 重新分配排名
for i, item in enumerate(sorted_data, 1):
item['current_sort_rank'] = i
return sorted_data
except Exception as e:
logging.error(f"排序榜单数据失败: {e}")
# 如果排序失败,返回原始数据
return ranking_data
def format_mix_item(doc):
    """Map a raw Mongo document to the API payload, field for field.

    Returns exactly the database's own fields (no derived values),
    with _id stringified and batch_time rendered via format_time.
    """
    get = doc.get
    return {
        "_id": str(get("_id", "")),
        "batch_time": format_time(get("batch_time")),
        "mix_name": get("mix_name", ""),
        "video_url": get("video_url", ""),
        "playcount": get("playcount", ""),
        "play_vv": get("play_vv", 0),
        "request_id": get("request_id", ""),
        "rank": get("rank", 0),
        "cover_image_url": get("cover_image_url", ""),
        # Extended metadata fields added later.
        "series_author": get("series_author", ""),
        "desc": get("desc", ""),
        "updated_to_episode": get("updated_to_episode", 0),
        "cover_backup_urls": get("cover_backup_urls", []),
        "mix_id": get("mix_id", ""),
        "episode_video_ids": get("episode_video_ids", []),
        "episode_details": get("episode_details", []),
    }
def get_mix_list(page=1, limit=20, sort_by="playcount"):
    """Fetch a paginated list of today's mixes, one latest record per mix.

    Args:
        page: 1-based page number.
        limit: page size.
        sort_by: "playcount" (sort by play_vv), "growth" (delegates to
            get_growth_mixes), anything else sorts by batch_time.

    Returns:
        dict with success flag, formatted data list, pagination block and
        update_time; on error, success=False with a message.
    """
    try:
        # Number of documents to skip for this page.
        skip = (page - 1) * limit
        # Choose the sort field; growth needs precomputed data.
        if sort_by == "growth":
            return get_growth_mixes(page, limit)
        else:
            sort_field = "play_vv" if sort_by == "playcount" else "batch_time"
            sort_order = -1  # descending
        # Restrict the query to today's batch window [00:00, 24:00).
        today = datetime.now().date()
        query_condition = {
            "batch_time": {
                "$gte": datetime(today.year, today.month, today.day),
                "$lt": datetime(today.year, today.month, today.day) + timedelta(days=1)
            }
        }
        # Group by mix name and keep each mix's newest record only,
        # then sort/paginate the deduplicated set.
        pipeline = [
            {"$match": query_condition},
            {"$sort": {"batch_time": -1}},  # newest first so $first picks it
            {"$group": {
                "_id": "$mix_name",  # one bucket per mix name
                "latest_doc": {"$first": "$$ROOT"}  # the newest record
            }},
            {"$replaceRoot": {"newRoot": "$latest_doc"}},
            {"$sort": {sort_field: sort_order}},
            {"$skip": skip},
            {"$limit": limit}
        ]
        docs = list(collection.aggregate(pipeline))
        # Count distinct mixes today for the pagination totals.
        total_pipeline = [
            {"$match": query_condition},
            {"$sort": {"batch_time": -1}},
            {"$group": {"_id": "$mix_name"}},
            {"$count": "total"}
        ]
        total_result = list(collection.aggregate(total_pipeline))
        total = total_result[0]["total"] if total_result else 0
        # Shape each document into the API payload.
        mix_list = []
        for doc in docs:
            item = format_mix_item(doc)
            mix_list.append(item)
        return {
            "success": True,
            "data": mix_list,
            "pagination": {
                "page": page,
                "limit": limit,
                "total": total,
                "pages": (total + limit - 1) // limit,
                "has_next": page * limit < total,
                "has_prev": page > 1
            },
            "sort_by": sort_by,
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logging.error(f"获取合集列表失败: {e}")
        return {"success": False, "message": f"获取数据失败: {str(e)}"}
def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None):
    """Paginated growth ranking, read ONLY from precomputed Ranking_storage.

    Args:
        page: 1-based page number.
        limit: page size.
        start_date/end_date: date window as "YYYY-MM-DD" strings or date
            objects; defaults to yesterday..today when either is missing.

    Returns:
        dict with success flag, paginated data, pagination block, the
        requested date range and data_source; data is empty (success=True)
        when no precomputed ranking exists for end_date, and success=False
        on exceptions.
    """
    try:
        # Number of entries to skip for this page.
        skip = (page - 1) * limit
        # Default window: yesterday through today.
        if not start_date or not end_date:
            end_date = datetime.now().date()
            start_date = end_date - timedelta(days=1)
        else:
            # Accept string dates and normalize to date objects.
            if isinstance(start_date, str):
                start_date = datetime.strptime(start_date, "%Y-%m-%d").date()
            if isinstance(end_date, str):
                end_date = datetime.strptime(end_date, "%Y-%m-%d").date()
        end_date_str = end_date.strftime("%Y-%m-%d")
        start_date_str = start_date.strftime("%Y-%m-%d")
        # Prefer the 'comprehensive' document (it embeds growth data),
        # taking the most recent calculation run for that date.
        # NOTE(review): these documents key on "type" while other queries
        # in this file use "ranking_type" — presumably two storage
        # schemas; verify against the writer in the scheduler.
        growth_ranking = daily_rankings_collection.find_one({
            "date": end_date_str,
            "type": "comprehensive"
        }, sort=[("calculation_sequence", -1)])
        if not growth_ranking or "data" not in growth_ranking:
            # Fall back to a dedicated 'growth' ranking document.
            growth_ranking = daily_rankings_collection.find_one({
                "date": end_date_str,
                "type": "growth"
            }, sort=[("calculation_sequence", -1)])
        if growth_ranking and "data" in growth_ranking:
            logging.info(f"📈 从Ranking_storage读取 {end_date_str} 的增长榜数据")
            # Pre-computed growth ranking entries.
            growth_data = growth_ranking["data"]
            # 'comprehensive' documents are not pre-sorted by growth, so
            # order them by the play-count delta, descending.
            if growth_ranking.get("type") == "comprehensive":
                growth_data = sorted(growth_data,
                    key=lambda x: x.get("timeline_data", {}).get("play_vv_change", 0),
                    reverse=True)
            # Slice out the requested page.
            total = len(growth_data)
            paginated_data = growth_data[skip:skip + limit]
            # Renumber ranks globally across pages (1-based).
            for i, item in enumerate(paginated_data):
                item["rank"] = skip + i + 1
            return {
                "success": True,
                "data": paginated_data,
                "pagination": {
                    "page": page,
                    "limit": limit,
                    "total": total,
                    "pages": (total + limit - 1) // limit,
                    "has_next": page * limit < total,
                    "has_prev": page > 1
                },
                "sort_by": "growth",
                "date_range": {
                    "start_date": start_date_str,
                    "end_date": end_date_str
                },
                "data_source": "ranking_storage",  # marks the data origin
                "update_time": growth_ranking.get("created_at", datetime.now()).strftime("%Y-%m-%d %H:%M:%S") if isinstance(growth_ranking.get("created_at"), datetime) else str(growth_ranking.get("created_at", ""))
            }
        else:
            # Nothing precomputed for this date: return an empty,
            # successful result rather than recomputing on the fly.
            logging.warning(f"Ranking_storage中未找到 {end_date_str} 的增长榜数据")
            return {
                "success": True,
                "message": f"暂无 {end_date_str} 的增长榜数据,请等待定时任务生成",
                "data": [],
                "pagination": {
                    "page": page,
                    "limit": limit,
                    "total": 0,
                    "pages": 0,
                    "has_next": False,
                    "has_prev": False
                },
                "sort_by": "growth",
                "date_range": {
                    "start_date": start_date_str,
                    "end_date": end_date_str
                },
                "data_source": "ranking_storage",
                "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
    except Exception as e:
        logging.error(f"获取增长合集列表失败: {e}")
        # Deliberately no fallback to play-count sorting; surface the error.
        return {
            "success": False,
            "message": f"获取增长榜数据失败: {str(e)}",
            "data": [],
            "pagination": {
                "page": page,
                "limit": limit,
                "total": 0,
                "pages": 0,
                "has_next": False,
                "has_prev": False
            },
            "sort_by": "growth",
            "data_source": "ranking_storage"
        }
def get_top_mixes(limit=10):
    """Return the top-`limit` mixes ranked by current play count (play_vv)."""
    try:
        # Highest play counts first.
        docs = list(collection.find().sort("play_vv", -1).limit(limit))
        if not docs:
            return {"success": False, "message": "暂无数据"}
        top_list = [format_mix_item(doc) for doc in docs]
        return {
            "success": True,
            "data": top_list,
            "total": len(top_list),
            "update_time": format_time(docs[0].get("batch_time")) if docs else ""
        }
    except Exception as e:
        logging.error(f"获取热门合集失败: {e}")
        return {"success": False, "message": f"获取数据失败: {str(e)}"}
def search_mixes(keyword, page=1, limit=10):
    """Case-insensitive substring search over mix names, paginated.

    Results are ordered by play count, descending.
    """
    try:
        if not keyword:
            return {"success": False, "message": "请提供搜索关键词"}
        skip = (page - 1) * limit
        # Fuzzy (regex, case-insensitive) match on the mix name.
        search_condition = {"mix_name": {"$regex": keyword, "$options": "i"}}
        docs = list(
            collection.find(search_condition)
            .sort("play_vv", -1)
            .skip(skip)
            .limit(limit)
        )
        total = collection.count_documents(search_condition)
        search_results = [format_mix_item(doc) for doc in docs]
        return {
            "success": True,
            "data": search_results,
            "keyword": keyword,
            "pagination": {
                "page": page,
                "limit": limit,
                "total": total,
                "pages": (total + limit - 1) // limit,
                "has_next": page * limit < total,
                "has_prev": page > 1
            },
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logging.error(f"搜索合集失败: {e}")
        return {"success": False, "message": f"搜索失败: {str(e)}"}
def get_mix_detail(mix_id):
    """Look up one mix by ObjectId, falling back to mix_name/request_id.

    Returns the database's raw fields only (via format_mix_item).
    """
    try:
        from bson import ObjectId
        try:
            doc = collection.find_one({"_id": ObjectId(mix_id)})
        except:
            # Not a valid ObjectId — fall back to business keys.
            doc = collection.find_one({
                "$or": [
                    {"mix_name": mix_id},
                    {"request_id": mix_id}
                ]
            })
        if not doc:
            return {"success": False, "message": "未找到合集信息"}
        return {
            "success": True,
            "data": format_mix_item(doc),
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logging.error(f"获取合集详情失败: {e}")
        return {"success": False, "message": f"获取详情失败: {str(e)}"}
def get_statistics():
    """Aggregate overall statistics for the mix collection.

    Returns:
        dict with success flag and a data payload containing the total mix
        count, play-count aggregates (sum/avg/max/min), popularity-bucket
        counts and the latest batch time; success=False with a message on
        error or when the collection is empty.
    """
    try:
        total_mixes = collection.count_documents({})
        if total_mixes == 0:
            return {"success": False, "message": "暂无数据"}
        # Play-count aggregates over the whole collection.
        pipeline = [
            {
                "$group": {
                    "_id": None,
                    "total_playcount": {"$sum": "$play_vv"},
                    "avg_playcount": {"$avg": "$play_vv"},
                    "max_playcount": {"$max": "$play_vv"},
                    "min_playcount": {"$min": "$play_vv"}
                }
            }
        ]
        stats_result = list(collection.aggregate(pipeline))
        stats = stats_result[0] if stats_result else {}
        # Latest update time.
        # BUG FIX: the original truth-tested the pymongo cursor itself
        # ('if latest_doc:'), which is always truthy; materialize the
        # one-document cursor once and test the list instead.
        latest_list = list(collection.find().sort("batch_time", -1).limit(1))
        latest_time = format_time(latest_list[0].get("batch_time")) if latest_list else ""
        # Popularity buckets by play-count range.
        categories = [
            {"name": "超热门", "min": 100000000, "count": 0},  # 100M+
            {"name": "热门", "min": 50000000, "max": 99999999, "count": 0},  # 50M-100M
            {"name": "中等", "min": 10000000, "max": 49999999, "count": 0},  # 10M-50M
            {"name": "一般", "min": 0, "max": 9999999, "count": 0}  # below 10M
        ]
        for category in categories:
            # Top bucket is open-ended (no "max" key).
            if "max" in category:
                count = collection.count_documents({
                    "play_vv": {"$gte": category["min"], "$lte": category["max"]}
                })
            else:
                count = collection.count_documents({
                    "play_vv": {"$gte": category["min"]}
                })
            category["count"] = count
        return {
            "success": True,
            "data": {
                "total_mixes": total_mixes,
                "total_playcount": stats.get("total_playcount", 0),
                "avg_playcount": int(stats.get("avg_playcount", 0)),
                "max_playcount": stats.get("max_playcount", 0),
                "min_playcount": stats.get("min_playcount", 0),
                "categories": categories,
                "latest_update": latest_time
            },
            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
    except Exception as e:
        logging.error(f"获取统计信息失败: {e}")
        return {"success": False, "message": f"获取统计失败: {str(e)}"}
# 路由定义
@rank_bp.route('/videos')
def get_videos():
    """List mixes; growth sorting delegates to the precomputed growth ranking."""
    page = int(request.args.get('page', 1))
    limit = int(request.args.get('limit', 20))
    sort_by = request.args.get('sort', 'playcount')
    if sort_by != 'growth':
        return jsonify(get_mix_list(page, limit, sort_by))
    # Growth mode accepts an optional explicit date window.
    start_date = request.args.get('start_date')
    end_date = request.args.get('end_date')
    return jsonify(get_growth_mixes(page, limit, start_date, end_date))
@rank_bp.route('/top')
def get_top():
    """Top mixes by play count (default 10)."""
    return jsonify(get_top_mixes(int(request.args.get('limit', 10))))
@rank_bp.route('/search')
def search():
    """Search mixes by keyword (query param 'q') with pagination."""
    keyword = request.args.get('q', '')
    page = int(request.args.get('page', 1))
    per_page = int(request.args.get('limit', 10))
    return jsonify(search_mixes(keyword, page, per_page))
@rank_bp.route('/detail')
def get_detail():
    """Detail for one mix, looked up by ObjectId/name/request_id."""
    return jsonify(get_mix_detail(request.args.get('id', '')))
@rank_bp.route('/stats')
def get_stats():
    """Overall collection statistics."""
    return jsonify(get_statistics())
@rank_bp.route('/health')
def health_check():
    """Health probe: verifies MongoDB connectivity and reports record count."""
    try:
        from database import client
        # Bail out early if the client was never initialized.
        if not client:
            return jsonify({"success": False, "message": "数据库未连接"})
        # Round-trip ping to confirm the connection is alive.
        client.admin.command('ping')
        # Basic data volume for the status payload.
        total_count = collection.count_documents({})
        return jsonify({
            "success": True,
            "message": "服务正常",
            "data": {
                "database": "连接正常",
                "total_records": total_count,
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
        })
    except Exception as e:
        logging.error(f"健康检查失败: {e}")
        return jsonify({"success": False, "message": f"服务异常: {str(e)}"})
# ==================== 榜单查询API接口 ====================
@rank_bp.route('/rankings')
def get_rankings():
    """Query stored rankings by date/type with optional dynamic re-sorting.

    Query params:
        date: "YYYY-MM-DD"; defaults to the latest stored date.
        type: ranking_type filter (e.g. playcount, growth, newcomer).
        sort_by: default | play_vv_change | play_vv_change_rate | play_vv | rank.
        sort_order: asc | desc.
        page/limit: pagination applied within each ranking's data list.
    """
    try:
        # Read query parameters.
        date = request.args.get('date')
        ranking_type = request.args.get('type')
        sort_by = request.args.get('sort_by', 'default')
        sort_order = request.args.get('sort_order', 'desc')
        page = int(request.args.get('page', 1))
        limit = int(request.args.get('limit', 50))
        # Build the Mongo filter.
        query = {}
        if date:
            query['date'] = date
        if ranking_type:
            query['ranking_type'] = ranking_type
        # Without an explicit date, pin the query to the newest date.
        if not date:
            latest_ranking = daily_rankings_collection.find_one(
                {}, sort=[('date', -1)]
            )
            if latest_ranking:
                query['date'] = latest_ranking['date']
        # Fetch matching ranking documents, newest generation first.
        rankings = list(daily_rankings_collection.find(query).sort('generated_at', -1))
        if not rankings:
            return jsonify({
                "success": True,
                "message": "暂无榜单数据",
                "data": {
                    "rankings": [],
                    "total": 0,
                    "page": page,
                    "limit": limit
                }
            })
        # Shape each ranking document for the response.
        formatted_rankings = []
        for ranking in rankings:
            ranking_data = ranking.get('data', [])
            # Optional dynamic re-sort of the stored order.
            if sort_by != 'default' and ranking_data:
                ranking_data = sort_ranking_data(ranking_data, sort_by, sort_order)
            # Page within this ranking's entry list.
            start_idx = (page - 1) * limit
            end_idx = start_idx + limit
            paginated_data = ranking_data[start_idx:end_idx]
            formatted_rankings.append({
                "date": ranking.get('date'),
                "ranking_type": ranking.get('ranking_type'),
                "ranking_name": ranking.get('ranking_name'),
                "description": ranking.get('description'),
                "data": paginated_data,
                "total_count": len(ranking_data),
                "current_page_count": len(paginated_data),
                "generated_at": format_time(ranking.get('generated_at')),
                "version": ranking.get('version', '1.0'),
                "sort_info": {
                    "sort_by": sort_by,
                    "sort_order": sort_order
                }
            })
        return jsonify({
            "success": True,
            "message": "获取榜单成功",
            "data": {
                "rankings": formatted_rankings,
                "total": len(formatted_rankings),
                "page": page,
                "limit": limit,
                "sort_by": sort_by,
                "sort_order": sort_order
            }
        })
    except Exception as e:
        logging.error(f"获取榜单失败: {e}")
        return jsonify({"success": False, "message": f"获取榜单失败: {str(e)}"})
@rank_bp.route('/rankings/dates')
def get_ranking_dates():
    """List every date for which a stored ranking exists, newest first."""
    try:
        available_dates = daily_rankings_collection.distinct('date')
        available_dates.sort(reverse=True)  # newest date first
        return jsonify({
            "success": True,
            "message": "获取日期列表成功",
            "data": {
                "dates": available_dates,
                "total": len(available_dates)
            }
        })
    except Exception as e:
        logging.error(f"获取日期列表失败: {e}")
        return jsonify({"success": False, "message": f"获取日期列表失败: {str(e)}"})
@rank_bp.route('/rankings/types')
def get_ranking_types():
    """List the distinct ranking types present, with human descriptions."""
    try:
        # Known type descriptions; unknown types fall back to their name.
        type_descriptions = {
            'playcount': '播放量榜 - 按播放量排序',
            'growth': '增长榜 - 播放量增长最快',
            'newcomer': '新晋榜 - 新上榜内容'
        }
        formatted_types = [
            {
                "type": type_name,
                "description": type_descriptions.get(type_name, type_name)
            }
            for type_name in daily_rankings_collection.distinct('ranking_type')
        ]
        return jsonify({
            "success": True,
            "message": "获取榜单类型成功",
            "data": {
                "types": formatted_types,
                "total": len(formatted_types)
            }
        })
    except Exception as e:
        logging.error(f"获取榜单类型失败: {e}")
        return jsonify({"success": False, "message": f"获取榜单类型失败: {str(e)}"})
@rank_bp.route('/rankings/latest')
def get_latest_rankings():
    """Return a 20-entry preview of every ranking type for the newest date."""
    try:
        # Find the most recent ranking date on record.
        latest_ranking = daily_rankings_collection.find_one(
            {}, sort=[('date', -1)]
        )
        if not latest_ranking:
            return jsonify({
                "success": True,
                "message": "暂无榜单数据",
                "data": {
                    "date": None,
                    "rankings": []
                }
            })
        latest_date = latest_ranking['date']
        # Every ranking stored for that date, ordered by type name.
        rankings = list(daily_rankings_collection.find({
            'date': latest_date
        }).sort('ranking_type', 1))
        formatted_rankings = []
        for ranking in rankings:
            # Preview only: cap at the first 20 entries.
            ranking_data = ranking.get('data', [])[:20]
            formatted_rankings.append({
                "ranking_type": ranking.get('ranking_type'),
                "ranking_name": ranking.get('ranking_name'),
                "description": ranking.get('description'),
                "data": ranking_data,
                "total_count": ranking.get('total_count', 0),
                "preview_count": len(ranking_data)
            })
        return jsonify({
            "success": True,
            "message": "获取最新榜单成功",
            "data": {
                "date": latest_date,
                "rankings": formatted_rankings,
                "total_types": len(formatted_rankings)
            }
        })
    except Exception as e:
        logging.error(f"获取最新榜单失败: {e}")
        return jsonify({"success": False, "message": f"获取最新榜单失败: {str(e)}"})
@rank_bp.route('/rankings/stats')
def get_rankings_stats():
    """Summary statistics over the stored rankings collection.

    Returns counts of ranking documents, distinct dates and types, the
    earliest/latest ranking dates, and a human-readable date range.
    """
    try:
        # Document and distinct-value counts.
        total_rankings = daily_rankings_collection.count_documents({})
        total_dates = len(daily_rankings_collection.distinct('date'))
        total_types = len(daily_rankings_collection.distinct('ranking_type'))
        # Earliest and latest ranking dates on record.
        latest_ranking = daily_rankings_collection.find_one({}, sort=[('date', -1)])
        earliest_ranking = daily_rankings_collection.find_one({}, sort=[('date', 1)])
        latest_date = latest_ranking['date'] if latest_ranking else None
        earliest_date = earliest_ranking['date'] if earliest_ranking else None
        return jsonify({
            "success": True,
            "message": "获取榜单统计成功",
            "data": {
                "total_rankings": total_rankings,
                "total_dates": total_dates,
                "total_types": total_types,
                "latest_date": latest_date,
                "earliest_date": earliest_date,
                # BUG FIX: the range string previously concatenated the two
                # dates with no separator (the 至 character was lost in an
                # encoding scrub); restore the separator.
                "date_range": f"{earliest_date} 至 {latest_date}" if earliest_date and latest_date else "暂无数据"
            }
        })
    except Exception as e:
        logging.error(f"获取榜单统计失败: {e}")
        return jsonify({"success": False, "message": f"获取榜单统计失败: {str(e)}"})