Initial commit: rankings server project

This commit is contained in:
qiaoyirui0819 2025-10-20 14:58:36 +08:00
commit 99ae97162c
12 changed files with 2757 additions and 0 deletions

59
.gitignore vendored Normal file
View File

@ -0,0 +1,59 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# Logs
*.log
logs/
miniprogram_api.log
# Data files
douyin_cdp_play_vv_*.json
douyin_cdp_play_vv_*.txt
# Chrome profiles and drivers
# Note: the Chrome profile contains a large amount of cache files and should not be committed to Git
scripts/config/chrome_profile/
drivers/*
!drivers/chromedriver.exe
# Rankings config directory
handlers/Rankings/config/
# Environment variables
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# IDE
.vscode/
.idea/
*.swp
*.swo
# OS
.DS_Store
Thumbs.db

363
Timer_worker.py Normal file
View File

@ -0,0 +1,363 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
抖音播放量自动抓取定时器 - 跨平台版本
功能
- 每晚自动执行抖音播放量抓取任务
- 数据抓取完成后自动生成各类榜单播放量榜增长榜新晋榜热门趋势榜
- 支持WindowsmacOSLinux
- 自动保存数据到MongoDB
使用方法
- 正常模式python Timer_worker.py启动定时器
- 测试模式python Timer_worker.py --test立即执行一次
- 单次执行python Timer_worker.py --once立即执行一次并退出
- 仅生成榜单python Timer_worker.py --ranking-only仅生成榜单不抓取数据
"""
import schedule
import time
import sys
import os
import logging
import argparse
from pathlib import Path
from datetime import datetime, date
import config
# Add the project path to the Python path
sys.path.append(os.path.join(os.path.dirname(__file__), 'handlers', 'Rankings'))
from rank_data_scraper import DouyinPlayVVScraper
# Logging setup
def setup_logging():
"""Configure logging"""
# Make sure the logs directory exists (os is already imported at module level)
script_dir = os.path.dirname(os.path.abspath(__file__))
logs_dir = os.path.join(script_dir, 'handlers', 'Rankings', 'logs')
os.makedirs(logs_dir, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(os.path.join(logs_dir, 'scheduler.log'), encoding='utf-8'),
logging.StreamHandler()
]
)
class DouyinAutoScheduler:
def __init__(self):
self.is_running = False
def run_douyin_scraper(self):
"""Run the Douyin play-count scraping task"""
try:
logging.info("🚀 Starting the Douyin play-count scraping task...")
# Set an environment variable to force automatic mode
os.environ['AUTO_CONTINUE'] = '1'
# Create and run a DouyinPlayVVScraper instance directly
scraper = DouyinPlayVVScraper(
start_url="https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation",
auto_continue=True,
duration_s=60
)
logging.info("📁 Running the scraping task...")
scraper.run()
logging.info("✅ Douyin play-count scraping task completed successfully")
# After the scrape completes, generate today's rankings
self.generate_daily_rankings()
except Exception as e:
logging.error(f"💥 Exception while running the task: {e}")
import traceback
logging.error(f"Traceback: {traceback.format_exc()}")
def generate_daily_rankings(self):
"""Generate the daily rankings (based on day-over-day comparison)"""
try:
from database import db
from datetime import timedelta
# Get the collections
douyin_collection = db['Rankings_list']  # real scraped data
rankings_collection = db['Ranking_storage']
today = date.today()
yesterday = today - timedelta(days=1)
today_str = today.strftime('%Y-%m-%d')
yesterday_str = yesterday.strftime('%Y-%m-%d')
logging.info(f"📅 Generating rankings for {today_str} (compared against {yesterday_str})...")
# Remove any rankings already stored for today
rankings_collection.delete_many({"date": today_str})
logging.info(f"🗑️ Cleared old ranking data for {today_str}")
# Fetch today's and yesterday's ranking data for comparison
try:
logging.info("🔄 Generating the day-over-day comparison ranking...")
# Fetch today's data sorted by play count, descending
today_videos_raw = list(douyin_collection.find({}).sort("play_vv", -1))
# Deduplicate by series (mix) name, keeping only the record with the highest play count
unique_videos = {}
for video in today_videos_raw:
mix_name = video.get("mix_name", "")
if mix_name and (mix_name not in unique_videos or video.get("play_vv", 0) > unique_videos[mix_name].get("play_vv", 0)):
unique_videos[mix_name] = video
today_videos = list(unique_videos.values())
logging.info(f"📊 Today's data after dedup: {len(today_videos)} unique series (from {len(today_videos_raw)} raw records)")
# Fetch yesterday's ranking (if present), taking the latest calculation
yesterday_ranking = rankings_collection.find_one({
"date": yesterday_str,
"type": "comprehensive"
}, sort=[("calculation_sequence", -1)])
yesterday_data = {}
if yesterday_ranking and "data" in yesterday_ranking:
# Convert yesterday's data into a dict keyed by series name
for item in yesterday_ranking["data"]:
title = item.get("title", "")
if title:
yesterday_data[title] = {
"rank": item.get("rank", 0),
"play_vv": item.get("play_vv", 0),
"video_id": item.get("video_id", "")
}
logging.info(f"📊 Found yesterday's ranking data: {len(yesterday_data)} series")
else:
logging.info("📊 No ranking data found for yesterday; treating this as the first run")
if today_videos:
# First compute the play-count delta for every video
videos_with_growth = []
for video in today_videos:
video_id = str(video.get("_id", ""))
current_play_vv = video.get("play_vv", 0)
# Compute the comparison against yesterday
play_vv_change = 0
play_vv_change_rate = 0
is_new = True
mix_name = video.get("mix_name", "")
if mix_name in yesterday_data:
is_new = False
yesterday_play_vv = yesterday_data[mix_name]["play_vv"]
# Compute the play-count change
play_vv_change = current_play_vv - yesterday_play_vv
if yesterday_play_vv > 0:
play_vv_change_rate = round((play_vv_change / yesterday_play_vv) * 100, 2)
# Build an item that includes the growth data
video_with_growth = {
"video": video,
"play_vv_change": play_vv_change,
"play_vv_change_rate": play_vv_change_rate,
"is_new": is_new,
"yesterday_data": yesterday_data.get(mix_name, {})
}
videos_with_growth.append(video_with_growth)
# Sort by play-count delta, descending (bigger delta ranks higher)
videos_with_growth.sort(key=lambda x: x["play_vv_change"], reverse=True)
comprehensive_ranking = {
"date": today_str,
"type": "comprehensive",
"name": "Play-count growth ranking",
"description": f"Ranking sorted by the play-count delta from {yesterday_str} to {today_str} (bigger delta ranks higher)",
"comparison_date": yesterday_str,
"total_videos": len(videos_with_growth),
"data": []
}
# Build the sorted ranking data
for i, item in enumerate(videos_with_growth, 1):
video = item["video"]
video_id = str(video.get("_id", ""))
current_play_vv = video.get("play_vv", 0)
mix_name = video.get("mix_name", "")
# Compute the rank change (relative to yesterday's rank)
rank_change = 0
if not item["is_new"] and item["yesterday_data"]:
yesterday_rank = item["yesterday_data"].get("rank", 0)
rank_change = yesterday_rank - i
ranking_item = {
"rank": i,
"title": mix_name,
"play_vv": current_play_vv,
"author": video.get("author", ""),
"video_id": video_id,
"video_url": video.get("video_url", ""),
"cover_image_url": video.get("cover_image_url", ""),
"playcount_str": video.get("playcount", ""),
# Day-over-day comparison data
"timeline_data": {
"is_new": item["is_new"],
"rank_change": rank_change,
"play_vv_change": item["play_vv_change"],
"play_vv_change_rate": item["play_vv_change_rate"],
"yesterday_rank": item["yesterday_data"].get("rank", 0) if not item["is_new"] else 0,
"yesterday_play_vv": item["yesterday_data"].get("play_vv", 0) if not item["is_new"] else 0
}
}
comprehensive_ranking["data"].append(ranking_item)
# Tag each calculation with a unique timestamp so records stay distinct
current_timestamp = datetime.now()
comprehensive_ranking["created_at"] = current_timestamp
comprehensive_ranking["calculation_id"] = f"{today_str}_{current_timestamp.strftime('%H%M%S')}"
# Count how many calculations already exist for today
existing_count = rankings_collection.count_documents({
"date": today_str,
"type": "comprehensive"
})
comprehensive_ranking["calculation_sequence"] = existing_count + 1
# Always insert a new ranking record so all historical calculations are kept
rankings_collection.insert_one(comprehensive_ranking)
logging.info(f"📝 Created today's ranking data (calculation #{existing_count + 1}, with the latest deltas)")
logging.info(f"🔖 Calculation ID: {comprehensive_ranking['calculation_id']}")
# Summary statistics
new_count = sum(1 for item in comprehensive_ranking["data"] if item["timeline_data"]["is_new"])
logging.info("✅ Day-over-day comparison ranking generated successfully")
logging.info(f"📊 {len(comprehensive_ranking['data'])} records in total")
logging.info(f"🆕 {new_count} new entries")
logging.info(f"🔄 Comparison baseline date: {yesterday_str}")
return True
else:
logging.warning("⚠️ Ranking generation failed: no data for today")
return False
except Exception as e:
logging.error(f"💥 Exception while generating the comparison ranking: {e}")
import traceback
logging.error(f"Traceback: {traceback.format_exc()}")
return False
except Exception as e:
logging.error(f"💥 Exception while generating rankings: {e}")
import traceback
logging.error(f"Traceback: {traceback.format_exc()}")
def setup_schedule(self):
"""Set up the scheduled job"""
# Read the run time from the config file
scheduler_time = config.SCHEDULER_TIME
schedule.every().day.at(scheduler_time).do(self.run_douyin_scraper)
logging.info(f"⏰ Scheduler set: the Douyin play-count scrape runs daily at {scheduler_time}")
def show_next_run(self):
"""Log the next run time"""
jobs = schedule.get_jobs()
if jobs:
next_run = jobs[0].next_run
logging.info(f"⏰ Next run: {next_run}")
def run_once(self):
"""Run once immediately"""
logging.info("🔧 One-shot mode...")
self.run_douyin_scraper()
def run_test(self):
"""Test mode - run once immediately"""
logging.info("🧪 Test mode - running the Douyin play-count scrape now...")
self.run_douyin_scraper()
def run_ranking_only(self):
"""Generate rankings only (no scraping)"""
logging.info("📊 Rankings-only mode...")
self.generate_daily_rankings()
def start_scheduler(self):
"""Start the scheduler loop"""
self.is_running = True
logging.info("🚀 Douyin play-count auto-scrape scheduler started")
logging.info(f"⏰ Schedule: the Douyin play-count scrape runs daily at {config.SCHEDULER_TIME}")
logging.info("📁 Target script: rank_data_scraper.py")
logging.info("💾 Data store: MongoDB")
logging.info("⏹️ Press Ctrl+C to stop the scheduler")
try:
while self.is_running:
schedule.run_pending()
time.sleep(1)
# Log the status once a minute
if int(time.time()) % 60 == 0:
self.show_next_run()
except KeyboardInterrupt:
logging.info("\n⏹️ Scheduler stopped")
self.is_running = False
def main():
"""Entry point"""
try:
parser = argparse.ArgumentParser(description='Douyin play-count auto-scrape scheduler')
parser.add_argument('--test', action='store_true', help='Test mode - run once immediately')
parser.add_argument('--once', action='store_true', help='Run once immediately and exit')
parser.add_argument('--ranking-only', action='store_true', help='Generate rankings only (no scraping)')
args = parser.parse_args()
# Configure logging
setup_logging()
print("Initializing the scheduler...")
scheduler = DouyinAutoScheduler()
if args.test:
print("Running in test mode...")
scheduler.run_test()
elif args.once:
print("Running in one-shot mode...")
scheduler.run_once()
elif args.ranking_only:
print("Running in rankings-only mode...")
scheduler.run_ranking_only()
else:
print("Starting scheduler mode...")
scheduler.setup_schedule()
scheduler.start_scheduler()
print("Done")
except Exception as e:
print(f"Program error: {e}")
import traceback
traceback.print_exc()
return 1
return 0
if __name__ == '__main__':
sys.exit(main())

32
app.py Normal file
View File

@ -0,0 +1,32 @@
from flask import Flask, jsonify
from flask_cors import CORS
import logging
import os
app = Flask(__name__)
CORS(app)  # allow cross-origin requests
# Configure logging
# Make sure the logs directory exists
logs_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'handlers', 'Rankings', 'logs')
os.makedirs(logs_dir, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(os.path.join(logs_dir, 'app.log'), encoding='utf-8'),
logging.StreamHandler()
]
)
# Import and register the blueprint
from routers.rank_api_routes import rank_bp
app.register_blueprint(rank_bp)
if __name__ == '__main__':
print("Starting the main service...")
print("Service URL: http://localhost:5001")
app.run(host='0.0.0.0', port=5001, debug=True)

20
config.py Normal file
View File

@ -0,0 +1,20 @@
import os
import importlib
# Database configuration
MONGO_URI = "mongodb://localhost:27017"
# MONGO_URI = "mongodb://mongouser:Jdei2243afN@172.16.0.6:27017,172.16.0.4:27017/test?replicaSet=cmgo-r6qkaern_0&authSource=admin"
MONGO_DB_NAME = "Rankings"
# Application configuration
APP_ENV = os.getenv('APP_ENV', 'development')
DEBUG = APP_ENV == 'development'
# Logging configuration
LOG_LEVEL = 'INFO'
LOG_DIR = 'logs'
# Scheduler configuration
SCHEDULER_TIME = "00:00"  # Scheduler run time in HH:MM (24-hour clock); the schedule library accepts 00:00-23:59, so midnight is "00:00", not "24:00"
print(f"Successfully loaded configuration for environment: {APP_ENV}")

19
database.py Normal file
View File

@ -0,0 +1,19 @@
from pymongo import MongoClient
import config
# from mongo_listeners import all_event_listeners  # listeners disabled for now (the module does not exist)
MONGO_URI = config.MONGO_URI
DB_NAME = config.MONGO_DB_NAME
# Create the MongoDB client connection
try:
client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)  # 5-second server-selection timeout
db = client[DB_NAME]
# Actively verify the connection
client.admin.command('ping')
success_message = f"\033[92mSuccessfully connected to MongoDB: {DB_NAME}\033[0m"
print(success_message)
except Exception as e:
error_message = f"\033[91mDatabase connection failed: {MONGO_URI}. Error: {e}. Please check that the MongoDB service is running.\033[0m"
print(error_message)

510
docs/API接口文档.md Normal file
View File

@ -0,0 +1,510 @@
# Douyin Play-Count Data API Reference
## Overview
This API service provides querying, searching, and statistics over Douyin play-count data, designed and optimized for the Mini Program client.
**Basics**
- Base URL: `http://localhost:5001`
- Data source: MongoDB
- Data refresh: automatic, daily at the time set by `SCHEDULER_TIME` in `config.py`
- Response format: JSON
## Common Response Format
All API endpoints follow this response format:
```json
{
"success": true,
"data": {},
"message": "操作成功",
"update_time": "2025-10-17 15:30:00"
}
```
**Fields**
- `success`: whether the request succeeded
- `data`: the returned payload
- `message`: a description of the result
- `update_time`: when the data was last updated (a client-side sketch follows)
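As a reference, a minimal Python client for this envelope might look like the following. This is a sketch only; the `fetch` helper and the local base URL are assumptions, not part of the service:

```python
# Minimal sketch of a client unwrapping the common response envelope.
# Assumes the service is running locally on port 5001.
import requests

def fetch(path, **params):
    resp = requests.get(f"http://localhost:5001{path}", params=params, timeout=10)
    resp.raise_for_status()
    payload = resp.json()
    if not payload.get("success"):
        raise RuntimeError(payload.get("message", "request failed"))
    return payload

top = fetch("/api/rank/top", limit=5)
print(top["update_time"], len(top["data"]))
```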
## Data Model
### Collection item
```json
{
"_id": "674f1234567890abcdef",
"batch_time": "2025-10-17 15:30:00",
"mix_name": "合集名称",
"video_url": "https://www.douyin.com/video/xxx",
"playcount": "1.2亿",
"play_vv": 120000000,
"request_id": "request_xxx",
"rank": 1,
"cover_image_url": "https://p3.douyinpic.com/xxx",
"cover_backup_urls": ["url1", "url2"]
}
```
### Pagination
```json
{
"pagination": {
"page": 1,
"limit": 20,
"total": 100,
"pages": 5,
"has_next": true,
"has_prev": false
}
}
```
## Endpoints
### 1. Service info
**URL**
```
GET /
```
**Description**
Returns basic info about the API service and the list of available endpoints
**Parameters**
None
**Example response**
```json
{
"success": true,
"data": {
"name": "抖音播放量数据API服务",
"version": "2.0",
"description": "主程序服务 - 整合小程序API功能",
"endpoints": {
"/api/rank/videos": "获取视频列表 (支持分页和排序)",
"/api/rank/top": "获取热门视频榜单",
"/api/rank/search": "搜索视频",
"/api/rank/detail": "获取视频详情",
"/api/rank/stats": "获取统计信息",
"/api/rank/health": "健康检查",
"/api/rank/rankings": "获取榜单列表"
},
"features": [
"分页支持",
"多种排序方式",
"搜索功能",
"详情查看",
"统计分析",
"小程序优化"
]
}
}
```
### 2. Get the video list
**URL**
```
GET /api/rank/videos
```
**Description**
Returns the collection list, with pagination and sorting
**Parameters**
| Name | Type | Required | Default | Description |
|--------|------|------|--------|------|
| page | int | No | 1 | Page number |
| limit | int | No | 20 | Items per page |
| sort | string | No | playcount | Sort order: playcount (play count), growth (growth) |
| start_date | string | No | yesterday | Start date for the growth calculation (YYYY-MM-DD) |
| end_date | string | No | today | End date for the growth calculation (YYYY-MM-DD) |
**Sorting**
- `playcount`: sort by current play count, highest first
- `growth`: sort by play-count growth, largest first
- Formula: growth = play count on the end date - play count on the start date
- Only collections with positive growth are returned
- The larger the growth delta, the higher the rank
**Examples**
```
# Sort by play count (current play count, highest first)
GET /api/rank/videos?page=1&limit=20&sort=playcount
# Sort by growth (play-count delta, largest first; defaults to yesterday-to-today)
GET /api/rank/videos?page=1&limit=20&sort=growth
# Sort by growth over a custom date range (play-count delta, largest first)
GET /api/rank/videos?page=1&limit=20&sort=growth&start_date=2025-10-16&end_date=2025-10-17
```
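The same endpoint can be driven from Python. This illustrative sketch pages through the growth ranking using the `pagination.has_next` flag; the `iter_growth` helper is an assumption, not part of the repo:

```python
# Illustrative pagination over the growth ranking; not part of the service code.
import requests

def iter_growth(start_date, end_date, limit=20):
    page = 1
    while True:
        body = requests.get(
            "http://localhost:5001/api/rank/videos",
            params={"sort": "growth", "page": page, "limit": limit,
                    "start_date": start_date, "end_date": end_date},
            timeout=10,
        ).json()
        if not body.get("success"):
            break
        yield from body.get("data", [])
        if not body.get("pagination", {}).get("has_next"):
            break
        page += 1

for item in iter_growth("2025-10-16", "2025-10-17"):
    print(item.get("rank"), item.get("mix_name"), item.get("growth"))
```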
**Example response**
Play-count sort response:
```json
{
"success": true,
"data": [
{
"_id": "674f1234567890abcdef",
"batch_time": "2025-10-17 15:30:00",
"mix_name": "热门合集1",
"video_url": "https://www.douyin.com/video/xxx",
"playcount": "1.2亿",
"play_vv": 120000000,
"request_id": "request_xxx",
"rank": 1,
"cover_image_url": "https://p3.douyinpic.com/xxx",
"cover_backup_urls": ["url1", "url2"]
}
],
"pagination": {
"page": 1,
"limit": 20,
"total": 100,
"pages": 5,
"has_next": true,
"has_prev": false
},
"sort_by": "playcount",
"update_time": "2025-10-17 15:30:00"
}
```
Growth sort response (includes the extra growth fields):
```json
{
"success": true,
"data": [
{
"_id": "674f1234567890abcdef",
"batch_time": "2025-10-17 15:30:00",
"mix_name": "热门合集1",
"video_url": "https://www.douyin.com/video/xxx",
"playcount": "1.2亿",
"play_vv": 120000000,
"request_id": "request_xxx",
"rank": 1,
"growth": 5000000,
"start_date": "2025-10-16",
"end_date": "2025-10-17",
"cover_image_url": "https://p3.douyinpic.com/xxx",
"cover_backup_urls": ["url1", "url2"]
}
],
"pagination": {
"page": 1,
"limit": 20,
"total": 50,
"pages": 3,
"has_next": true,
"has_prev": false
},
"sort_by": "growth",
"date_range": {
"start_date": "2025-10-16",
"end_date": "2025-10-17"
},
"update_time": "2025-10-17 15:30:00"
}
```
### 3. Get the popular ranking
**URL**
```
GET /api/rank/top
```
**Description**
Returns the popular videos ranking (sorted by play count)
**Parameters**
| Name | Type | Required | Default | Description |
|--------|------|------|--------|------|
| limit | int | No | 10 | Number of items to return |
**Example**
```
GET /api/rank/top?limit=10
```
**Example response**
```json
{
"success": true,
"data": [
{
"_id": "674f1234567890abcdef",
"batch_time": "2025-10-17 15:30:00",
"mix_name": "热门合集1",
"video_url": "https://www.douyin.com/video/xxx",
"playcount": "1.2亿",
"play_vv": 120000000,
"request_id": "request_xxx",
"rank": 1,
"cover_image_url": "https://p3.douyinpic.com/xxx",
"cover_backup_urls": ["url1", "url2"]
}
],
"total": 10,
"update_time": "2025-10-17 15:30:00"
}
```
### 4. Search videos
**URL**
```
GET /api/rank/search
```
**Description**
Searches collections by keyword
**Parameters**
| Name | Type | Required | Default | Description |
|--------|------|------|--------|------|
| q | string | Yes | - | Search keyword |
| page | int | No | 1 | Page number |
| limit | int | No | 10 | Items per page |
**Example**
```
GET /api/rank/search?q=keyword&page=1&limit=10
```
**Example response**
```json
{
"success": true,
"data": [
{
"_id": "674f1234567890abcdef",
"batch_time": "2025-10-17 15:30:00",
"mix_name": "包含关键词的合集",
"video_url": "https://www.douyin.com/video/xxx",
"playcount": "1.2亿",
"play_vv": 120000000,
"request_id": "request_xxx",
"rank": 1,
"cover_image_url": "https://p3.douyinpic.com/xxx",
"cover_backup_urls": ["url1", "url2"]
}
],
"keyword": "关键词",
"pagination": {
"page": 1,
"limit": 10,
"total": 15,
"pages": 2,
"has_next": true,
"has_prev": false
},
"update_time": "2025-10-17 15:30:00"
}
```
### 5. Get video details
**URL**
```
GET /api/rank/detail
```
**Description**
Returns detailed information for the given collection
**Parameters**
| Name | Type | Required | Default | Description |
|--------|------|------|--------|------|
| id | string | Yes | - | Collection ID (accepts ObjectId, collection name, or request_id) |
**Example**
```
GET /api/rank/detail?id=674f1234567890abcdef
```
**Example response**
```json
{
"success": true,
"data": {
"_id": "674f1234567890abcdef",
"batch_time": "2025-10-17 15:30:00",
"mix_name": "合集名称",
"video_url": "https://www.douyin.com/video/xxx",
"playcount": "1.2亿",
"play_vv": 120000000,
"request_id": "request_xxx",
"rank": 1,
"cover_image_url": "https://p3.douyinpic.com/xxx",
"cover_backup_urls": ["url1", "url2"]
},
"update_time": "2025-10-17 15:30:00"
}
```
### 6. Get statistics
**URL**
```
GET /api/rank/stats
```
**Description**
Returns system-wide statistics
**Parameters**
None
**Example response**
```json
{
"success": true,
"data": {
"total_mixes": 1000,
"total_playcount": 5000000000,
"avg_playcount": 5000000,
"max_playcount": 200000000,
"min_playcount": 1000,
"categories": [
{
"name": "超热门",
"min": 100000000,
"count": 5
},
{
"name": "热门",
"min": 50000000,
"max": 99999999,
"count": 20
},
{
"name": "中等",
"min": 10000000,
"max": 49999999,
"count": 150
},
{
"name": "一般",
"min": 0,
"max": 9999999,
"count": 825
}
],
"latest_update": "2025-10-17 15:30:00"
},
"update_time": "2025-10-17 15:30:00"
}
```
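For a quick look at these buckets, a throwaway Python script could print the distribution (illustrative only, not part of the repo):

```python
# Illustrative use of /api/rank/stats to print the category distribution.
import requests

stats = requests.get("http://localhost:5001/api/rank/stats", timeout=10).json()
if stats.get("success"):
    data = stats["data"]
    print(f"collections: {data['total_mixes']}, total plays: {data['total_playcount']:,}")
    for cat in data.get("categories", []):
        print(f"  {cat['name']}: {cat['count']}")
```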
### 7. Health check
**URL**
```
GET /api/rank/health
```
**Description**
Checks the service status and database connection
**Parameters**
None
**Example response**
```json
{
"success": true,
"message": "Service healthy",
"data": {
"database": "connected",
"total_records": 1000,
"timestamp": "2025-10-17 15:30:00"
}
}
```
## Error Handling
### Error format
```json
{
"success": false,
"message": "Error description",
"update_time": "2025-10-17 15:30:00"
}
```
### Common errors
- `Database connection failed`: MongoDB connection error
- `Collection not found`: the requested collection does not exist
- `Please provide a search keyword`: the search endpoint was called without a keyword
- `Failed to fetch data`: data query error
## Mini Program Usage Tips
### 1. Paginated loading
Load data in pages instead of fetching everything at once:
```javascript
// Mini Program example
wx.request({
url: 'http://localhost:5001/api/rank/videos',
data: {
page: 1,
limit: 20,
sort: 'playcount'
},
success: (res) => {
if (res.data.success) {
this.setData({
videos: res.data.data,
hasNext: res.data.pagination.has_next
})
}
}
})
```
### 2. Search
- Debounce search requests
- Show search progress and result counts
- Offer search suggestions
### 3. Image loading
- Prefer `cover_image_url`
- Fall back to `cover_backup_urls`
- Handle image load failures
### 4. Data freshness
- Use the `update_time` field to judge data freshness (see the sketch below)
- Apply a sensible caching strategy
- Periodically check service health
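A possible freshness check, sketched in Python. The poll still costs a request; it only avoids reprocessing unchanged data, and the helper name is illustrative:

```python
# Illustrative freshness check: act only when update_time changes.
import requests

def refresh_if_stale(last_seen_update_time):
    """Return (new_data, update_time); new_data is None when nothing changed."""
    resp = requests.get("http://localhost:5001/api/rank/videos",
                        params={"page": 1, "limit": 20}, timeout=10).json()
    if resp.get("success") and resp.get("update_time") != last_seen_update_time:
        return resp["data"], resp["update_time"]
    return None, last_seen_update_time
```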
## Deployment
### Starting the service
```bash
cd C:\Users\EDY\Desktop\rank_backend
python app.py
```
### Service info
- Port: 5001
- Database: MongoDB (localhost:27017)
- Data refresh: runs automatically every day at `SCHEDULER_TIME` (see `config.py`)
### Notes
- Make sure the MongoDB service is running
- Make sure the network connection is available
- The Mini Program client must configure an allowed domain (production)
---
**Version**: v2.0
**Last updated**: 2025-10-17 | **Maintainer**: auto-generated

101
docs/README.md Normal file
View File

@ -0,0 +1,101 @@
# Rankings System Overview
## 1. Project Overview
- Collects real play-count data for Douyin favorite collections and serves it via an API
- The scraper writes to MongoDB; the API returns data by play count and by growth ranking
## 2. Directory Guide (key files)
- `handlers/Rankings/rank_data_scraper.py` scraping script (Selenium + CDP)
- `routers/rank_api_routes.py` Mini Program API data-access/logic module (invoked by `app.py`; not run standalone)
- `app.py` main service entry point (Flask app; registers all API routes)
- `Timer_worker.py` scheduled task; runs the scraper automatically every day
### Project structure (abridged)
```
project root/
├── app.py                 # main service entry point (port 5001)
├── Timer_worker.py        # scheduled scraping task
├── config.py              # global configuration
├── database.py            # database wrapper
├── docs/                  # project documentation
│   ├── README.md          # project overview
│   ├── API接口文档.md      # API reference
│   └── requirements.txt   # dependency list
├── routers/
│   └── rank_api_routes.py # Mini Program API logic module
└── handlers/
    └── Rankings/
        ├── rank_data_scraper.py # scraping script (Selenium + CDP)
        └── drivers/             # browser drivers, etc.
            └── chromedriver.exe # Chrome driver
```
- Core data store: database `Rankings`, collection `Rankings_list`
- Log example: `handlers/Rankings/logs/douyin_scraper.log`
## 3. Service and Port
- Single service: `app.py` (default port `5001`, includes the Mini Program API routes)
## 4. Quick Start
- Start the main service:
```bash
python app.py
```
- Start the scheduled task (scrapes daily at `SCHEDULER_TIME` from `config.py`; see the note below):
```bash
python Timer_worker.py
```
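The run time comes from `SCHEDULER_TIME` in `config.py`. Note that the `schedule` library only accepts `HH:MM` values between `00:00` and `23:59`, so midnight must be written as `00:00`; a minimal sketch:

```python
# Sketch: how Timer_worker.py wires SCHEDULER_TIME into schedule.
import schedule

SCHEDULER_TIME = "00:00"  # midnight; "24:00" raises an error in schedule.every().day.at()
schedule.every().day.at(SCHEDULER_TIME).do(lambda: print("run the scrape"))
```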
## 5. Usage (first login and day-to-day)
- Install dependencies:
```bash
pip install -r docs/requirements.txt
```
- First run (log in to Douyin):
- Run the scraper: `python handlers/Rankings/rank_data_scraper.py`
- When Chrome opens, complete the Douyin login (QR code or credentials).
- After logging in, return to the terminal prompt and press Enter to continue the scrape.
- Later runs reuse the logged-in browser profile, so no repeated login is needed.
- Day-to-day workflow:
- Scrape: `python handlers/Rankings/rank_data_scraper.py`
- Serve: `python app.py` (port `5001`)
- Schedule: `python Timer_worker.py` (runs daily at `SCHEDULER_TIME`)
- Verify the data (a pymongo sketch follows this list):
- MongoDB: database `Rankings`, collection `Rankings_list`
- API checks:
- `http://localhost:5001/api/rank/health`
- `http://localhost:5001/api/rank/videos?page=1&limit=20&sort=playcount`
- Growth ranking: `http://localhost:5001/api/rank/videos?sort=growth&page=1&limit=20`
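To verify directly against MongoDB rather than through the API, a one-off pymongo check like this works (illustrative; assumes the default local connection and the schema written by the scraper):

```python
# Illustrative check that today's scrape landed in MongoDB.
from datetime import datetime, timedelta
from pymongo import MongoClient

db = MongoClient("mongodb://localhost:27017", serverSelectionTimeoutMS=5000)["Rankings"]
start = datetime.combine(datetime.now().date(), datetime.min.time())
query = {"batch_time": {"$gte": start, "$lt": start + timedelta(days=1)}}
print("records scraped today:", db["Rankings_list"].count_documents(query))
for doc in db["Rankings_list"].find(query).sort("play_vv", -1).limit(3):
    print(doc["rank"], doc["mix_name"], doc["playcount"])
```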
## 6. Scraping Flow (abridged)
- Reuses the logged-in Chrome profile; scrolling and refreshing trigger requests
- Captures responses via CDP, parsing `play_vv` and the SSR data (a condensed sketch follows)
- Aggregates videos by collection and writes them to the target MongoDB collection
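The capture step in `rank_data_scraper.py` boils down to reading Chrome performance logs and pulling response bodies over CDP. A condensed sketch, assuming the driver was created with the `goog:loggingPrefs` performance capability and `Network.enable` already issued:

```python
# Minimal sketch of the CDP capture loop used by rank_data_scraper.py:
# read performance log entries, fetch XHR/Fetch bodies, regex out play_vv.
import json
import re

def collect_play_vv(driver):
    found = []
    for entry in driver.get_log('performance'):
        msg = json.loads(entry['message'])['message']
        if msg.get('method') != 'Network.responseReceived':
            continue
        params = msg.get('params', {})
        if params.get('type') not in ('XHR', 'Fetch'):
            continue
        try:
            body = driver.execute_cdp_cmd('Network.getResponseBody',
                                          {'requestId': params['requestId']})
            found += [int(m) for m in re.findall(r'"play_vv"\s*:\s*(\d+)',
                                                 body.get('body', ''))]
        except Exception:
            pass  # some bodies are unavailable or too large
    return found
```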
## 7. Database and Collections
- Database: `Rankings`
- Collection: `Rankings_list`
- Connection: `mongodb://localhost:27017/` (can be overridden via environment variables)
## 8. API Feature Summary
- Video list (pagination; sorted by play count/time; only the latest data for the current day)
- Growth ranking (growth over a given date range, paginated)
## 9. Configuration (environment variables)
- `MONGO_HOST` default `localhost`
- `MONGO_PORT` default `27017`
- `MONGO_DB` default `Rankings`
- `MONGO_COLLECTION` default `Rankings_list`
## 10. Troubleshooting
- MongoDB connection failure: the scraper will only write local file logs
- ChromeDriver location: `handlers/Rankings/drivers/chromedriver.exe`
- Log location: `handlers/Rankings/logs/` (created automatically at runtime)
## 11. Things to Know
- The current architecture exposes a single service on port `5001`; `routers/rank_api_routes.py` provides the logic module, and `app.py` registers the routes and serves them.
- The scraper and the API use the same collection and the same data schema
- The Mini Program API focuses on easy-to-use fields (cover, play count, time, link)
- New sort orders or filters can be added on top of the existing data while keeping the API backward compatible
- ChromeDriver ships with a local copy to avoid network download issues

20
docs/requirements.txt Normal file
View File

@ -0,0 +1,20 @@
# Web scraping
selenium>=4.15.0
chromedriver-autoinstaller>=0.6.0
webdriver-manager>=4.0.0
# Web framework
flask>=2.3.0
flask-cors>=4.0.0
# Database
pymongo>=4.5.0
# Scheduling
schedule>=1.2.0
# System utilities
psutil>=5.9.0
# Data handling
pathlib2>=2.3.7; python_version<"3.4"

Binary file not shown.

Binary file not shown.

787
handlers/Rankings/rank_data_scraper.py Normal file
View File

@ -0,0 +1,787 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Selenium + Chrome DevTools Protocol 抓取抖音收藏合集真实播放量(play_vv)
核心能力
- 启用CDP网络事件获取响应体并解析play_vv
- 复用本地Chrome用户数据绕过登录障碍
- 自动滚动与刷新触发更多API请求
- 同时解析页面中的SSR数据(window._SSR_HYDRATED_DATA/RENDER_DATA)
使用方法
1) 默认复用 `config/chrome_profile` 下的已登录Chrome配置
2) 若仍需登录请在弹出的Chrome中完成登录后回到终端按回车
3) 程序会滚动和刷新自动收集网络数据并提取play_vv
"""
import json
import re
import subprocess
import time
import logging
import os
import shutil
import sys
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
# Keep the import, but avoid using webdriver_manager by default (network downloads can stall)
from webdriver_manager.chrome import ChromeDriverManager  # noqa: F401
import chromedriver_autoinstaller
# Add the project root to the Python path
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
from database import db
# Configure logging
# Make sure the logs directory exists
script_dir = os.path.dirname(os.path.abspath(__file__))
logs_dir = os.path.join(script_dir, 'logs')
os.makedirs(logs_dir, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='[%(levelname)s] %(message)s',
handlers=[
logging.FileHandler(os.path.join(logs_dir, 'douyin_scraper.log'), encoding='utf-8'),
logging.StreamHandler()
]
)
class DouyinPlayVVScraper:
def __init__(self, start_url: str = None, auto_continue: bool = False, duration_s: int = 60):
self.start_url = start_url or "https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation"
self.auto_continue = auto_continue
self.duration_s = duration_s
self.driver = None
self.play_vv_items = [] # list of dicts: {play_vv, formatted, url, request_id, mix_name, watched_item}
self.captured_responses = []
self.db = None
self.collection = None
self._cleanup_old_profiles()
self._setup_mongodb()
def _setup_mongodb(self):
"""Set up the MongoDB connection"""
try:
# Reuse the connection from database.py
self.db = db
# Select the collection
mongo_collection = os.environ.get('MONGO_COLLECTION', 'Rankings_list')
self.collection = self.db[mongo_collection]
logging.info(f'MongoDB connected; using database: {self.db.name}, collection: {mongo_collection}')
except Exception as e:
logging.warning(f'MongoDB setup error: {e}')
self.db = None
self.collection = None
def _cleanup_old_profiles(self):
"""Remove temporary Chrome profiles older than one day"""
try:
script_dir = os.path.dirname(os.path.abspath(__file__))
profile_base_dir = os.path.join(script_dir, 'config', 'chrome_profile')
if not os.path.exists(profile_base_dir):
return
current_time = time.time()
one_day_ago = current_time - 24 * 60 * 60  # 24 hours ago
for item in os.listdir(profile_base_dir):
if item.startswith('run_'):
item_path = os.path.join(profile_base_dir, item)
if os.path.isdir(item_path):
try:
# Parse the timestamp from the directory name
timestamp = int(item.split('_')[1])
if timestamp < one_day_ago:
shutil.rmtree(item_path, ignore_errors=True)
logging.info(f'Removed old profile: {item}')
except (ValueError, IndexError):
# Skip if the timestamp cannot be parsed
continue
except Exception as e:
logging.warning(f'Error while cleaning old profiles: {e}')
def _cleanup_chrome_processes(self):
"""Terminate Chrome processes that may be holding the profile directory"""
try:
import subprocess
import psutil
# Path of the profile directory in use
script_dir = os.path.dirname(os.path.abspath(__file__))
profile_dir = os.path.join(script_dir, 'config', 'chrome_profile', 'douyin_persistent')
# Find Chrome processes using this profile
killed_processes = []
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
try:
if proc.info['name'] and 'chrome' in proc.info['name'].lower():
cmdline = proc.info['cmdline']
if cmdline and any(profile_dir in arg for arg in cmdline):
proc.terminate()
killed_processes.append(proc.info['pid'])
logging.info(f'Terminated Chrome process holding the profile: PID {proc.info["pid"]}')
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
# Wait for the processes to exit
if killed_processes:
time.sleep(2)
return len(killed_processes) > 0
except ImportError:
# Fall back to a system command if psutil is unavailable
try:
result = subprocess.run(['taskkill', '/f', '/im', 'chrome.exe'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
logging.info('Cleaned up Chrome processes with taskkill')
time.sleep(2)
return True
except Exception as e:
logging.warning(f'Failed to clean up Chrome processes: {e}')
return False
except Exception as e:
logging.warning(f'Error while cleaning up Chrome processes: {e}')
return False
def setup_driver(self):
logging.info('Initializing Chrome WebDriver (CDP network logging enabled)')
# Terminate Chrome processes that may be holding the profile
self._cleanup_chrome_processes()
chrome_options = Options()
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--disable-blink-features=AutomationControlled')
chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
chrome_options.add_experimental_option('useAutomationExtension', False)
chrome_options.add_argument('--disable-extensions')
chrome_options.add_argument('--remote-allow-origins=*')
chrome_options.add_argument('--remote-debugging-port=0')
chrome_options.add_argument('--start-maximized')
chrome_options.add_argument('--lang=zh-CN')
# Use a fixed Chrome profile directory so the login state persists
script_dir = os.path.dirname(os.path.abspath(__file__))
profile_dir = os.path.join(script_dir, 'config', 'chrome_profile', 'douyin_persistent')
os.makedirs(profile_dir, exist_ok=True)
chrome_options.add_argument(f'--user-data-dir={profile_dir}')
logging.info(f'Using persistent Chrome profile: {profile_dir}')
# Set the Chrome binary path explicitly (common install locations)
possible_chrome_bins = [
r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe",
r"C:\Program Files\Google\Chrome\Application\chrome.exe"
]
for bin_path in possible_chrome_bins:
if os.path.exists(bin_path):
chrome_options.binary_location = bin_path
logging.info(f'Using Chrome binary: {bin_path}')
break
# Performance logs (Network events)
chrome_options.set_capability('goog:loggingPrefs', {'performance': 'ALL'})
# Only use a local or PATH chromedriver, to avoid depending on network downloads
driver_ready = False
candidates = []
# The driver path can be overridden via an environment variable
env_override = os.environ.get('OVERRIDE_CHROMEDRIVER')
if env_override:
candidates.append(env_override)
logging.info(f'OVERRIDE_CHROMEDRIVER detected; trying it first: {env_override}')
# The drivers directory next to this script takes priority
script_dir = os.path.dirname(os.path.abspath(__file__))
script_driver_path = os.path.join(script_dir, 'drivers', 'chromedriver.exe')
candidates.append(script_driver_path)
logging.info(f'Trying the script-directory path first: {script_driver_path}')
# The drivers directory under the project root
user_driver_path = os.path.join(os.getcwd(), 'drivers', 'chromedriver.exe')
candidates.append(user_driver_path)
logging.info(f'Trying the project-root path: {user_driver_path}')
# The project root itself
candidates.append(os.path.join(os.getcwd(), 'chromedriver.exe'))
# Other possible directories
candidates.append(os.path.join(os.getcwd(), 'drivers', 'chromedriver'))
# chromedriver on PATH
which_path = shutil.which('chromedriver')
if which_path:
candidates.append(which_path)
if not driver_ready:
for p in candidates:
try:
if p and os.path.exists(p):
logging.info(f'Trying chromedriver: {p}')
service = Service(p)
self.driver = webdriver.Chrome(service=service, options=chrome_options)
driver_ready = True
logging.info(f'Started successfully with chromedriver: {p}')
try:
caps = self.driver.capabilities
browser_ver = caps.get('browserVersion') or caps.get('version')
cdver = caps.get('chrome', {}).get('chromedriverVersion')
logging.info(f'Chrome version: {browser_ver}, ChromeDriver version: {cdver}')
except Exception:
pass
break
else:
logging.info(f'Candidate path does not exist: {p}')
except Exception as e:
logging.warning(f'Failed to start with {p}: {e}')
if not driver_ready:
# Final fallback: webdriver-manager (may require network access)
try:
service = Service(ChromeDriverManager().install())
self.driver = webdriver.Chrome(service=service, options=chrome_options)
driver_ready = True
logging.info('Started ChromeDriver via webdriver-manager')
except Exception as e:
raise RuntimeError('Could not start ChromeDriver. Download a matching chromedriver into the project root or PATH, or allow webdriver-manager network access. Error: ' + str(e))
# Anti-detection
try:
self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
except Exception:
pass
# Enable CDP Network
try:
self.driver.execute_cdp_cmd('Network.enable', {})
logging.info('CDP Network enabled')
except Exception as e:
logging.warning(f'Failed to enable CDP Network: {e}')
def navigate(self):
logging.info(f'Navigating to: {self.start_url}')
self.driver.get(self.start_url)
time.sleep(8)  # extra wait for the page to load
def ensure_login(self):
"""Make sure the user is logged in and on the favorite-collections page"""
logging.info("Checking login state and page location...")
# First check whether we are already logged in and on the right page
if self._check_login_and_page():
logging.info("Already logged in and on the favorite-collections page; skipping manual confirmation")
return
# Otherwise fall back to the manual login flow
logging.info("Please complete the login manually in the browser window.")
if self.auto_continue:
logging.info('Auto-continue mode; skipping the manual wait...')
time.sleep(5)
return
logging.info("Entering the manual login confirmation loop...")
while True:
# Require the user to type a specific confirmation
logging.info("Waiting for user confirmation...")
user_input = input("Complete the login in the browser and navigate to [Me] -> [Favorites] -> [Collections]. When done, type 'ok' here and press Enter: ")
if user_input.strip().lower() != 'ok':
logging.warning("Please type 'ok' to confirm you have logged in and navigated to [Me] -> [Favorites] -> [Collections].")
continue
logging.info("User confirmed; checking the current page...")
try:
current_url = self.driver.current_url
logging.info(f"Current page URL: {current_url}")
if ("douyin.com/user/self" in current_url and
("favorite_collection" in current_url or "compilation" in current_url)):
logging.info(f"Confirmed you are on the favorite-collections list page: {current_url}")
logging.info("Continuing...")
break
else:
# The user confirmed, but the page is wrong; keep waiting
logging.warning(f"The current page ({current_url}) is not the favorite-collections list page. Please navigate to [Me] -> [Favorites] -> [Collections].")
except Exception as e:
if "browser has been closed" in str(e) or "no such window" in str(e) or "target window already closed" in str(e):
logging.error("The browser window was closed; the script cannot continue.")
raise RuntimeError("Browser window was closed")
logging.warning(f"Error while checking the URL: {e}. Please retry.")
time.sleep(1)
def _check_login_and_page(self, timeout: int = 600) -> bool:
"""Check whether we are logged in and on the right page"""
try:
current_url = self.driver.current_url
logging.info(f"Current page URL: {current_url}")
# Check whether we are on the favorite-collections page
if ("douyin.com/user/self" in current_url and
("favorite_collection" in current_url or "compilation" in current_url)):
# Then check the login state
return self._detect_login_status(timeout)
else:
# If not on the right page, try navigating to the favorite-collections page
if self._detect_login_status(timeout):
logging.info("Logged in but not on the favorite-collections page; navigating automatically...")
self.driver.get(self.start_url)
time.sleep(3)
return True
return False
except Exception as e:
logging.warning(f"Error while checking the login state: {e}")
return False
def _detect_login_status(self, timeout: int = 600) -> bool:
"""Automatically detect whether the user is logged in"""
try:
start = time.time()
while time.time() - start < timeout:
time.sleep(2)
# Selectors that indicate a logged-in state
selectors = [
'[data-e2e="user-avatar"]',
'.user-avatar',
'[class*="avatar"]',
'[class*="Avatar"]'
]
for selector in selectors:
try:
elements = self.driver.find_elements("css selector", selector)
if elements:
logging.info("User avatar detected; confirmed logged in")
return True
except Exception:
continue
# Selectors for a login button (indicates not logged in)
login_selectors = [
'[data-e2e="login-button"]',
'button[class*="login"]',
'a[href*="login"]'
]
for selector in login_selectors:
try:
elements = self.driver.find_elements("css selector", selector)
if elements:
logging.info("Login button detected; user is not logged in")
return False
except Exception:
continue
logging.info("Login detection timed out; assuming not logged in")
return False
except Exception as e:
logging.warning(f"Error during login detection: {e}")
return False
def trigger_loading(self):
logging.info('Triggering data loads: scroll + refresh')
# Scroll to trigger lazy loading
for i in range(8):
self.driver.execute_script(f'window.scrollTo(0, {i * 900});')
time.sleep(1.2)
# Refresh to trigger fresh requests
self.driver.refresh()
time.sleep(4)
for i in range(6):
self.driver.execute_script(f'window.scrollTo(0, {i * 1200});')
time.sleep(1.3)
def format_count(self, n: int) -> str:
if n >= 100_000_000:
return f"{n/100_000_000:.1f}亿"
if n >= 10_000:
return f"{n/10_000:.1f}万"
return str(n)
def parse_play_vv_from_text(self, text: str, source_url: str, request_id: str = None):
"""Parse play_vv, mix_name, and related info out of a text blob"""
try:
# Try parsing as JSON first
if text.strip().startswith('{') or text.strip().startswith('['):
try:
data = json.loads(text)
self._extract_from_json_data(data, source_url, request_id)
return
except json.JSONDecodeError:
pass
# Not JSON: fall back to a regex search
self._extract_from_text_regex(text, source_url, request_id)
except Exception as e:
logging.warning(f'Error while parsing text data: {e}')
def _extract_from_json_data(self, data, source_url: str, request_id: str = None):
"""Recursively extract collection info from JSON data"""
def extract_mix_info(obj, path=""):
if isinstance(obj, dict):
# Check whether this object holds collection info
if 'mix_id' in obj and 'statis' in obj:
mix_id = obj.get('mix_id', '')
mix_name = obj.get('mix_name', '')
statis = obj.get('statis', {})
# Debug: dump the structure of objects containing mix_id (first 3 only)
if len(self.play_vv_items) < 3:
logging.info("=== Debug: collection object structure ===")
logging.info(f"Object keys: {list(obj.keys())}")
# Look for fields that might reference videos
for key, value in obj.items():
if 'aweme' in key.lower() or 'video' in key.lower() or 'item' in key.lower() or 'ids' in key.lower():
logging.info(f"Possible video field {key}: {type(value)} - {str(value)[:200]}")
# Inspect the ids field specifically
if 'ids' in obj:
ids_value = obj['ids']
logging.info(f"ids field details: {type(ids_value)} - {ids_value}")
if isinstance(ids_value, list) and len(ids_value) > 0:
logging.info(f"ids list length: {len(ids_value)}")
logging.info(f"First ID: {ids_value[0]}")
if len(ids_value) > 1:
logging.info(f"Second ID: {ids_value[1]}")
if isinstance(statis, dict) and 'play_vv' in statis:
play_vv = statis.get('play_vv')
if isinstance(play_vv, (int, str)) and str(play_vv).isdigit():
vv = int(play_vv)
# Build the collection URL
video_url = f"https://www.douyin.com/collection/{mix_id}" if mix_id else ""
# Extract the collection cover image URL - store the complete link directly
cover_image_url = ""
cover_image_backup_urls = []  # backup link list
# Look for cover image fields, preferring complete URLs
if 'cover' in obj:
cover = obj['cover']
if isinstance(cover, dict) and 'url_list' in cover and cover['url_list']:
# Primary link
cover_image_url = cover['url_list'][0]
# Backup links
cover_image_backup_urls = cover['url_list'][1:] if len(cover['url_list']) > 1 else []
elif isinstance(cover, str):
cover_image_url = cover
elif 'cover_url' in obj:
cover_url = obj['cover_url']
if isinstance(cover_url, dict) and 'url_list' in cover_url and cover_url['url_list']:
cover_image_url = cover_url['url_list'][0]
cover_image_backup_urls = cover_url['url_list'][1:] if len(cover_url['url_list']) > 1 else []
elif isinstance(cover_url, str):
cover_image_url = cover_url
elif 'image' in obj:
image = obj['image']
if isinstance(image, dict) and 'url_list' in image and image['url_list']:
cover_image_url = image['url_list'][0]
cover_image_backup_urls = image['url_list'][1:] if len(image['url_list']) > 1 else []
elif isinstance(image, str):
cover_image_url = image
elif 'pic' in obj:
pic = obj['pic']
if isinstance(pic, dict) and 'url_list' in pic and pic['url_list']:
cover_image_url = pic['url_list'][0]
cover_image_backup_urls = pic['url_list'][1:] if len(pic['url_list']) > 1 else []
elif isinstance(pic, str):
cover_image_url = pic
self.play_vv_items.append({
'play_vv': vv,
'formatted': self.format_count(vv),
'url': source_url,
'request_id': request_id,
'mix_name': mix_name,
'video_url': video_url,  # collection link
'mix_id': mix_id,  # collection ID
'cover_image_url': cover_image_url,  # primary cover image URL (complete link)
'cover_backup_urls': cover_image_backup_urls,  # backup cover image URLs
'timestamp': datetime.now().isoformat()
})
logging.info(f'Extracted collection: {mix_name} (ID: {mix_id}) - {vv:,} plays')
# Recurse into child objects
for key, value in obj.items():
if isinstance(value, (dict, list)):
extract_mix_info(value, f"{path}.{key}" if path else key)
elif isinstance(obj, list):
for i, item in enumerate(obj):
if isinstance(item, (dict, list)):
extract_mix_info(item, f"{path}[{i}]" if path else f"[{i}]")
extract_mix_info(data)
def _extract_from_text_regex(self, text: str, source_url: str, request_id: str = None):
"""Extract info from text using regular expressions"""
# Look for JSON fragments containing complete collection info
mix_pattern = r'\{[^{}]*"mix_id"\s*:\s*"([^"]*)"[^{}]*"mix_name"\s*:\s*"([^"]*)"[^{}]*"statis"\s*:\s*\{[^{}]*"play_vv"\s*:\s*(\d+)[^{}]*\}[^{}]*\}'
for match in re.finditer(mix_pattern, text):
try:
mix_id = match.group(1)
mix_name = match.group(2)
vv = int(match.group(3))
# Build the collection URL
video_url = f"https://www.douyin.com/collection/{mix_id}" if mix_id else ""
self.play_vv_items.append({
'play_vv': vv,
'formatted': self.format_count(vv),
'url': source_url,
'request_id': request_id,
'mix_name': mix_name,
'video_url': video_url,  # collection link
'mix_id': mix_id,  # collection ID
'timestamp': datetime.now().isoformat()
})
logging.info(f'Regex-extracted collection: {mix_name} (ID: {mix_id}) - {vv:,} plays')
except Exception:
continue
# Fallback: look for bare play_vv values
for match in re.findall(r'"play_vv"\s*:\s*(\d+)', text):
try:
vv = int(match)
# Skip if the same play_vv already exists
if not any(item['play_vv'] == vv for item in self.play_vv_items):
self.play_vv_items.append({
'play_vv': vv,
'formatted': self.format_count(vv),
'url': source_url,
'request_id': request_id,
'mix_name': '',  # collection name unknown
'video_url': '',  # link unknown
'mix_id': '',  # mix_id unknown
'timestamp': datetime.now().isoformat()
})
except Exception:
continue
def collect_network_bodies(self, duration_s: int = None):
if duration_s is None:
duration_s = self.duration_s
logging.info(f'Collecting network response bodies for {duration_s}s')
start = time.time()
known_request_ids = set()
# Target keywords (favorites/collections/videos)
url_keywords = ['aweme', 'mix', 'collection', 'favorite', 'note', 'api']
last_progress = 0
while time.time() - start < duration_s:
try:
logs = self.driver.get_log('performance')
except Exception as e:
logging.warning(f'Failed to fetch performance logs: {e}')
time.sleep(1)
continue
for entry in logs:
try:
message = json.loads(entry['message'])['message']
except Exception:
continue
method = message.get('method')
params = message.get('params', {})
# Record request URLs
if method == 'Network.requestWillBeSent':
req_id = params.get('requestId')
url = params.get('request', {}).get('url', '')
if any(k in url for k in url_keywords):
self.captured_responses.append({'requestId': req_id, 'url': url, 'type': 'request'})
# A response arrived; try to fetch the response body
if method == 'Network.responseReceived':
req_id = params.get('requestId')
url = params.get('response', {}).get('url', '')
type_ = params.get('type')  # XHR, Fetch, Document
if req_id and req_id not in known_request_ids:
known_request_ids.add(req_id)
# Only process XHR/Fetch
if type_ in ('XHR', 'Fetch') and any(k in url for k in url_keywords):
try:
body_obj = self.driver.execute_cdp_cmd('Network.getResponseBody', {'requestId': req_id})
body_text = body_obj.get('body', '')
# The body may be base64-encoded
if body_obj.get('base64Encoded'):
try:
import base64
body_text = base64.b64decode(body_text).decode('utf-8', errors='ignore')
except Exception:
pass
# Parse play_vv
self.parse_play_vv_from_text(body_text, url, req_id)
except Exception:
# Some responses are unavailable or too large
pass
elapsed = int(time.time() - start)
if elapsed - last_progress >= 5:
last_progress = elapsed
logging.info(f'Progress: {elapsed}/{duration_s}, targets found: {len(self.play_vv_items)}')
time.sleep(0.8)
logging.info(f'Network collection finished; found {len(self.play_vv_items)} targets')
def parse_ssr_data(self):
logging.info('Trying to parse the page SSR data')
# Try reading directly from the window object
keys = ['_SSR_HYDRATED_DATA', 'RENDER_DATA']
for key in keys:
try:
data = self.driver.execute_script(f'return window.{key}')
if data:
text = json.dumps(data, ensure_ascii=False)
self.parse_play_vv_from_text(text, f'page_{key}', None)
logging.info(f'Finished parsing from {key}')
except Exception:
continue
# Fallback: regex search over page_source
try:
page_source = self.driver.page_source
self.parse_play_vv_from_text(page_source, 'page_source', None)
# Also look for play_vv inside statis structures
for m in re.findall(r'"statis"\s*:\s*\{[^}]*"play_vv"\s*:\s*(\d+)[^}]*\}', page_source):
try:
vv = int(m)
# Skip if the same play_vv already exists
if not any(item['play_vv'] == vv for item in self.play_vv_items):
self.play_vv_items.append({
'play_vv': vv,
'formatted': self.format_count(vv),
'url': 'page_source_statis',
'request_id': None,
'mix_name': '',  # collection name unavailable from statis
'video_url': '',  # link unavailable from statis
'timestamp': datetime.now().isoformat()
})
except Exception:
pass
except Exception:
pass
def dedupe(self):
# Deduplicate by play_vv value
unique = []
seen = set()
for item in self.play_vv_items:
vv = item['play_vv']
if vv not in seen:
unique.append(item)
seen.add(vv)
self.play_vv_items = unique
def save_results(self):
# Save to MongoDB
self.save_to_mongodb()
logging.info('Results saved to MongoDB')
def save_to_mongodb(self):
"""Save the data to MongoDB"""
if self.collection is None:
logging.warning('MongoDB not connected; skipping the database save')
return
if not self.play_vv_items:
logging.info('No data to save to MongoDB')
return
try:
batch_time = datetime.now()
documents = []
for item in self.play_vv_items:
# Keep the 7 requested fields, plus cover_image_url as the complete cover image link
doc = {
'batch_time': batch_time,
'mix_name': item.get('mix_name', ''),
'video_url': item.get('video_url', ''),
'playcount': item.get('formatted', ''),
'play_vv': item.get('play_vv', 0),
'request_id': item.get('request_id', ''),
'rank': 0,  # placeholder; recomputed below
'cover_image_url': item.get('cover_image_url', ''),  # primary cover image URL (complete link)
'cover_backup_urls': item.get('cover_backup_urls', [])  # backup cover image URLs
}
documents.append(doc)
# Sort by play count descending and assign ranks
documents.sort(key=lambda x: x['play_vv'], reverse=True)
for i, doc in enumerate(documents, 1):
doc['rank'] = i
# Bulk insert
result = self.collection.insert_many(documents)
logging.info(f'Saved {len(result.inserted_ids)} records to MongoDB')
# Summary statistics
total_play_vv = sum(doc['play_vv'] for doc in documents)
max_play_vv = max(doc['play_vv'] for doc in documents) if documents else 0
logging.info(f'MongoDB save stats: total plays={total_play_vv:,}, max plays={max_play_vv:,}')
logging.info('Saved fields: batch_time, mix_name, video_url, playcount, play_vv, request_id, rank, cover_image_url, cover_backup_urls')
# Cover image extraction stats
cover_count = sum(1 for doc in documents if doc.get('cover_image_url'))
backup_count = sum(1 for doc in documents if doc.get('cover_backup_urls'))
logging.info(f'Cover image stats: {cover_count}/{len(documents)} collections have a primary cover link, {backup_count} have backup links')
except Exception as e:
logging.error(f'Error while saving to MongoDB: {e}')
def run(self):
try:
self.setup_driver()
self.navigate()
self.ensure_login()
self.trigger_loading()
self.collect_network_bodies()
self.parse_ssr_data()
self.dedupe()
self.save_results()
logging.info('Done. play_vv count: %d', len(self.play_vv_items))
finally:
if self.driver:
try:
self.driver.quit()
except Exception:
pass
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='Selenium+CDP Douyin play_vv scraper')
parser.add_argument('--url', default='https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation', help='URL of the favorite-collections list page')
parser.add_argument('--auto', action='store_true', help='Auto-continue; skip the Enter-to-continue wait')
parser.add_argument('--duration', type=int, default=60, help='Network collection duration (seconds)')
parser.add_argument('--driver', help='Override the chromedriver path')
args = parser.parse_args()
if args.driver:
os.environ['OVERRIDE_CHROMEDRIVER'] = args.driver
if args.auto:
os.environ['AUTO_CONTINUE'] = '1'
print('=== Selenium+CDP Douyin play_vv scraper ===')
scraper = DouyinPlayVVScraper(args.url, auto_continue=args.auto, duration_s=args.duration)
scraper.run()

846
routers/rank_api_routes.py Normal file
View File

@ -0,0 +1,846 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
小程序专用抖音播放量数据API服务器
优化的数据格式和接口设计专为小程序使用
"""
from flask import Blueprint, request, jsonify
from datetime import datetime, timedelta
import logging
import re
from database import db
# Create the blueprint
rank_bp = Blueprint('rank', __name__, url_prefix='/api/rank')
# Get the database collections
collection = db['Rankings_list']
daily_rankings_collection = db['Ranking_storage']  # ranking storage collection
def format_playcount(playcount_str):
"""Convert a formatted play-count string into a number"""
if not playcount_str:
return 0
try:
if isinstance(playcount_str, (int, float)):
return int(playcount_str)
playcount_str = str(playcount_str).strip()
# Handle the 亿 (hundred million) and 万 (ten thousand) units
if "亿" in playcount_str:
num = float(re.findall(r'[\d.]+', playcount_str)[0])
return int(num * 100000000)
elif "万" in playcount_str:
num = float(re.findall(r'[\d.]+', playcount_str)[0])
return int(num * 10000)
else:
# Fall back to a direct numeric conversion
return int(float(playcount_str))
except Exception:
return 0
def format_cover_url(cover_data):
"""格式化封面图片URL"""
if not cover_data:
return ""
if isinstance(cover_data, str):
return cover_data
elif isinstance(cover_data, dict) and 'url_list' in cover_data:
return cover_data['url_list'][0] if cover_data['url_list'] else ""
else:
return ""
def format_time(time_obj):
"""格式化时间"""
if not time_obj:
return ""
if isinstance(time_obj, datetime):
return time_obj.strftime("%Y-%m-%d %H:%M:%S")
else:
return str(time_obj)
def sort_ranking_data(ranking_data, sort_by, sort_order='desc'):
"""
对榜单数据进行动态排序
Args:
ranking_data: 榜单数据列表
sort_by: 排序字段 (play_vv_change, play_vv_change_rate, play_vv, rank)
sort_order: 排序顺序 (asc, desc)
Returns:
排序后的榜单数据
"""
try:
# 定义排序键函数
def get_sort_key(item):
if sort_by == 'play_vv_change':
# 按播放量差值排序
timeline_data = item.get('timeline_data', {})
return timeline_data.get('play_vv_change', 0)
elif sort_by == 'play_vv_change_rate':
# 按播放量变化率排序
timeline_data = item.get('timeline_data', {})
return timeline_data.get('play_vv_change_rate', 0)
elif sort_by == 'play_vv':
# 按当前播放量排序
return item.get('play_vv', 0)
elif sort_by == 'rank':
# 按排名排序
return item.get('rank', 999999)
else:
# 默认按排名排序
return item.get('rank', 999999)
# 执行排序
reverse = (sort_order == 'desc')
# 对于排名字段,降序实际上是升序(排名越小越好)
if sort_by == 'rank':
reverse = (sort_order == 'asc')
sorted_data = sorted(ranking_data, key=get_sort_key, reverse=reverse)
# 重新分配排名
for i, item in enumerate(sorted_data, 1):
item['current_sort_rank'] = i
return sorted_data
except Exception as e:
logging.error(f"排序榜单数据失败: {e}")
# 如果排序失败,返回原始数据
return ranking_data
def format_mix_item(doc):
"""格式化合集数据项 - 完全按照数据库原始字段返回"""
return {
"_id": str(doc.get("_id", "")),
"batch_time": format_time(doc.get("batch_time")),
"mix_name": doc.get("mix_name", ""),
"video_url": doc.get("video_url", ""),
"playcount": doc.get("playcount", ""),
"play_vv": doc.get("play_vv", 0),
"request_id": doc.get("request_id", ""),
"rank": doc.get("rank", 0),
"cover_image_url": doc.get("cover_image_url", ""),
"cover_backup_urls": doc.get("cover_backup_urls", [])
}
def get_mix_list(page=1, limit=20, sort_by="playcount"):
"""Get the collection list (paginated)"""
try:
# Compute the number of documents to skip
skip = (page - 1) * limit
# Choose the sort field
if sort_by == "growth":
# Growth sorting needs special handling
return get_growth_mixes(page, limit)
else:
sort_field = "play_vv" if sort_by == "playcount" else "batch_time"
sort_order = -1  # descending
# Today's date
today = datetime.now().date()
# Only query today's data
query_condition = {
"batch_time": {
"$gte": datetime(today.year, today.month, today.day),
"$lt": datetime(today.year, today.month, today.day) + timedelta(days=1)
}
}
# Query the data grouped by series name, keeping the latest record per series
pipeline = [
{"$match": query_condition},
{"$sort": {"batch_time": -1}},  # newest first
{"$group": {
"_id": "$mix_name",  # group by series name
"latest_doc": {"$first": "$$ROOT"}  # first record per group (the latest)
}},
{"$replaceRoot": {"newRoot": "$latest_doc"}},
{"$sort": {sort_field: sort_order}},
{"$skip": skip},
{"$limit": limit}
]
docs = list(collection.aggregate(pipeline))
# Total count
total_pipeline = [
{"$match": query_condition},
{"$sort": {"batch_time": -1}},
{"$group": {"_id": "$mix_name"}},
{"$count": "total"}
]
total_result = list(collection.aggregate(total_pipeline))
total = total_result[0]["total"] if total_result else 0
# Format the data
mix_list = []
for doc in docs:
item = format_mix_item(doc)
mix_list.append(item)
return {
"success": True,
"data": mix_list,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"pages": (total + limit - 1) // limit,
"has_next": page * limit < total,
"has_prev": page > 1
},
"sort_by": sort_by,
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
except Exception as e:
logging.error(f"Failed to fetch the collection list: {e}")
return {"success": False, "message": f"Failed to fetch data: {str(e)}"}
def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None):
"""Get the collection list sorted by play-count growth - prefers data pre-generated by the scheduler"""
try:
# Compute the number of documents to skip
skip = (page - 1) * limit
# Default to today and yesterday when no dates are given
if not start_date or not end_date:
end_date = datetime.now().date()
start_date = end_date - timedelta(days=1)
else:
# Convert string dates to date objects
if isinstance(start_date, str):
start_date = datetime.strptime(start_date, "%Y-%m-%d").date()
if isinstance(end_date, str):
end_date = datetime.strptime(end_date, "%Y-%m-%d").date()
end_date_str = end_date.strftime("%Y-%m-%d")
start_date_str = start_date.strftime("%Y-%m-%d")
# First try the growth ranking pre-generated by the scheduler
try:
growth_ranking = daily_rankings_collection.find_one({
"date": end_date_str,
"type": "growth",
"start_date": start_date_str,
"end_date": end_date_str
}, sort=[("calculation_sequence", -1)])  # latest calculation
if growth_ranking and "data" in growth_ranking:
logging.info(f"📈 Serving the growth ranking for {end_date_str} from scheduler-generated data")
# Use the pre-computed growth ranking data
growth_data = growth_ranking["data"]
# Pagination
total = len(growth_data)
paginated_data = growth_data[skip:skip + limit]
return {
"success": True,
"data": paginated_data,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"pages": (total + limit - 1) // limit,
"has_next": page * limit < total,
"has_prev": page > 1
},
"sort_by": "growth",
"date_range": {
"start_date": start_date_str,
"end_date": end_date_str
},
"data_source": "timer_generated",  # marks the data source
"update_time": growth_ranking.get("created_at", datetime.now()).strftime("%Y-%m-%d %H:%M:%S") if isinstance(growth_ranking.get("created_at"), datetime) else str(growth_ranking.get("created_at", ""))
}
except Exception as e:
logging.warning(f"Failed to read the growth ranking from scheduler data; falling back to dynamic calculation: {e}")
# If the scheduler data is missing or unreadable, fall back to dynamic calculation
logging.info(f"📊 Dynamically computing the growth ranking from {start_date_str} to {end_date_str}")
# Query data for the end date
end_cursor = collection.find({
"batch_time": {
"$gte": datetime(end_date.year, end_date.month, end_date.day),
"$lt": datetime(end_date.year, end_date.month, end_date.day) + timedelta(days=1)
}
})
end_data = list(end_cursor)
# Query data for the start date
start_cursor = collection.find({
"batch_time": {
"$gte": datetime(start_date.year, start_date.month, start_date.day),
"$lt": datetime(start_date.year, start_date.month, start_date.day) + timedelta(days=1)
}
})
start_data = list(start_cursor)
# Build dicts for fast lookup
end_dict = {item["mix_name"]: item for item in end_data}
start_dict = {item["mix_name"]: item for item in start_data}
# Compute the growth data
growth_data = []
for mix_name, end_item in end_dict.items():
if mix_name in start_dict:
start_item = start_dict[mix_name]
growth = end_item.get("play_vv", 0) - start_item.get("play_vv", 0)
# Keep only entries with positive growth
if growth > 0:
item = format_mix_item(end_item)
item["growth"] = growth
item["start_date"] = start_date_str
item["end_date"] = end_date_str
growth_data.append(item)
else:
# If the start date has no data but the end date does, count it as new growth
item = format_mix_item(end_item)
item["growth"] = end_item.get("play_vv", 0)
item["start_date"] = start_date_str
item["end_date"] = end_date_str
growth_data.append(item)
# Sort by growth, descending
growth_data.sort(key=lambda x: x.get("growth", 0), reverse=True)
# Pagination
total = len(growth_data)
paginated_data = growth_data[skip:skip + limit]
# Assign ranks
for i, item in enumerate(paginated_data):
item["rank"] = skip + i + 1
return {
"success": True,
"data": paginated_data,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"pages": (total + limit - 1) // limit,
"has_next": page * limit < total,
"has_prev": page > 1
},
"sort_by": "growth",
"date_range": {
"start_date": start_date_str,
"end_date": end_date_str
},
"data_source": "dynamic_calculation",  # marks the data source
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
except Exception as e:
logging.error(f"Failed to fetch the growth collection list: {e}")
# If the growth calculation fails, fall back to play-count sorting
return get_mix_list(page, limit, "playcount")
def get_top_mixes(limit=10):
"""Get the popular collections TOP ranking"""
try:
# Popular collections sorted by play count
cursor = collection.find().sort("play_vv", -1).limit(limit)
docs = list(cursor)
if not docs:
return {"success": False, "message": "No data available"}
# Format the data
top_list = []
for doc in docs:
item = format_mix_item(doc)
top_list.append(item)
return {
"success": True,
"data": top_list,
"total": len(top_list),
"update_time": format_time(docs[0].get("batch_time")) if docs else ""
}
except Exception as e:
logging.error(f"Failed to fetch popular collections: {e}")
return {"success": False, "message": f"Failed to fetch data: {str(e)}"}
def search_mixes(keyword, page=1, limit=10):
"""Search collections"""
try:
if not keyword:
return {"success": False, "message": "Please provide a search keyword"}
# Compute the number of documents to skip
skip = (page - 1) * limit
# Build the search condition (fuzzy match on the collection name)
search_condition = {
"mix_name": {"$regex": keyword, "$options": "i"}
}
# Query the data
cursor = collection.find(search_condition).sort("play_vv", -1).skip(skip).limit(limit)
docs = list(cursor)
# Total number of matches
total = collection.count_documents(search_condition)
# Format the data
search_results = []
for doc in docs:
item = format_mix_item(doc)
search_results.append(item)
return {
"success": True,
"data": search_results,
"keyword": keyword,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"pages": (total + limit - 1) // limit,
"has_next": page * limit < total,
"has_prev": page > 1
},
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
except Exception as e:
logging.error(f"Failed to search collections: {e}")
return {"success": False, "message": f"Search failed: {str(e)}"}
def get_mix_detail(mix_id):
"""Get collection details"""
try:
from bson import ObjectId
# Try looking up by ObjectId first
try:
doc = collection.find_one({"_id": ObjectId(mix_id)})
except Exception:
# If the ObjectId is invalid, try other fields
doc = collection.find_one({
"$or": [
{"mix_name": mix_id},
{"request_id": mix_id}
]
})
if not doc:
return {"success": False, "message": "Collection not found"}
# Format the details - return only the raw database fields
detail = format_mix_item(doc)
return {
"success": True,
"data": detail,
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
except Exception as e:
logging.error(f"Failed to fetch collection details: {e}")
return {"success": False, "message": f"Failed to fetch details: {str(e)}"}
def get_statistics():
"""Get statistics"""
try:
# Basic counts
total_mixes = collection.count_documents({})
if total_mixes == 0:
return {"success": False, "message": "No data available"}
# Play-count statistics
pipeline = [
{
"$group": {
"_id": None,
"total_playcount": {"$sum": "$play_vv"},
"avg_playcount": {"$avg": "$play_vv"},
"max_playcount": {"$max": "$play_vv"},
"min_playcount": {"$min": "$play_vv"}
}
}
]
stats_result = list(collection.aggregate(pipeline))
stats = stats_result[0] if stats_result else {}
# Latest update time
latest_doc = collection.find().sort("batch_time", -1).limit(1)
latest_time = ""
if latest_doc:
latest_list = list(latest_doc)
if latest_list:
latest_time = format_time(latest_list[0].get("batch_time"))
# Popularity buckets (by play-count range)
categories = [
{"name": "Ultra popular", "min": 100000000, "count": 0},  # 100M+
{"name": "Popular", "min": 50000000, "max": 99999999, "count": 0},  # 50M-100M
{"name": "Medium", "min": 10000000, "max": 49999999, "count": 0},  # 10M-50M
{"name": "Average", "min": 0, "max": 9999999, "count": 0}  # below 10M
]
for category in categories:
if "max" in category:
count = collection.count_documents({
"play_vv": {"$gte": category["min"], "$lte": category["max"]}
})
else:
count = collection.count_documents({
"play_vv": {"$gte": category["min"]}
})
category["count"] = count
return {
"success": True,
"data": {
"total_mixes": total_mixes,
"total_playcount": stats.get("total_playcount", 0),
"avg_playcount": int(stats.get("avg_playcount", 0)),
"max_playcount": stats.get("max_playcount", 0),
"min_playcount": stats.get("min_playcount", 0),
"categories": categories,
"latest_update": latest_time
},
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
except Exception as e:
logging.error(f"Failed to fetch statistics: {e}")
return {"success": False, "message": f"Failed to fetch statistics: {str(e)}"}
# Route definitions
@rank_bp.route('/videos')
def get_videos():
"""Get the collection list - called from app.py"""
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 20))
sort_by = request.args.get('sort', 'playcount')
if sort_by == 'growth':
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
result = get_growth_mixes(page, limit, start_date, end_date)
else:
result = get_mix_list(page, limit, sort_by)
return jsonify(result)
@rank_bp.route('/top')
def get_top():
"""Get the popular ranking - called from app.py"""
limit = int(request.args.get('limit', 10))
result = get_top_mixes(limit)
return jsonify(result)
@rank_bp.route('/search')
def search():
"""Search collections - called from app.py"""
keyword = request.args.get('q', '')
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 10))
result = search_mixes(keyword, page, limit)
return jsonify(result)
@rank_bp.route('/detail')
def get_detail():
"""Get collection details - called from app.py"""
mix_id = request.args.get('id', '')
result = get_mix_detail(mix_id)
return jsonify(result)
@rank_bp.route('/stats')
def get_stats():
"""Get statistics - called from app.py"""
result = get_statistics()
return jsonify(result)
@rank_bp.route('/health')
def health_check():
"""Health check - called from app.py"""
try:
from database import client
# Check the database connection
if not client:
return jsonify({"success": False, "message": "Database not connected"})
# Test the database connection
client.admin.command('ping')
# Fetch data counts
total_count = collection.count_documents({})
return jsonify({
"success": True,
"message": "Service healthy",
"data": {
"database": "connected",
"total_records": total_count,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
})
except Exception as e:
logging.error(f"Health check failed: {e}")
return jsonify({"success": False, "message": f"Service error: {str(e)}"})
# ==================== Ranking query API ====================
@rank_bp.route('/rankings')
def get_rankings():
"""Get ranking lists - supports filtering by date and type, plus dynamic sorting"""
try:
# Query parameters
date = request.args.get('date')  # date in YYYY-MM-DD format
ranking_type = request.args.get('type')  # ranking type: playcount, growth, newcomer
sort_by = request.args.get('sort_by', 'default')  # sort mode: default, play_vv_change, play_vv_change_rate, play_vv
sort_order = request.args.get('sort_order', 'desc')  # sort order: asc, desc
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 50))
# Build the query
query = {}
if date:
query['date'] = date
if ranking_type:
query['ranking_type'] = ranking_type
# If no date is given, default to the latest ranking date
if not date:
latest_ranking = daily_rankings_collection.find_one(
{}, sort=[('date', -1)]
)
if latest_ranking:
query['date'] = latest_ranking['date']
# Query the rankings
rankings = list(daily_rankings_collection.find(query).sort('generated_at', -1))
if not rankings:
return jsonify({
"success": True,
"message": "No ranking data available",
"data": {
"rankings": [],
"total": 0,
"page": page,
"limit": limit
}
})
# Format the response
formatted_rankings = []
for ranking in rankings:
ranking_data = ranking.get('data', [])
# Dynamic sorting
if sort_by != 'default' and ranking_data:
ranking_data = sort_ranking_data(ranking_data, sort_by, sort_order)
# Paginate the ranking data
start_idx = (page - 1) * limit
end_idx = start_idx + limit
paginated_data = ranking_data[start_idx:end_idx]
formatted_rankings.append({
"date": ranking.get('date'),
"ranking_type": ranking.get('ranking_type'),
"ranking_name": ranking.get('ranking_name'),
"description": ranking.get('description'),
"data": paginated_data,
"total_count": len(ranking_data),
"current_page_count": len(paginated_data),
"generated_at": format_time(ranking.get('generated_at')),
"version": ranking.get('version', '1.0'),
"sort_info": {
"sort_by": sort_by,
"sort_order": sort_order
}
})
return jsonify({
"success": True,
"message": "Rankings fetched successfully",
"data": {
"rankings": formatted_rankings,
"total": len(formatted_rankings),
"page": page,
"limit": limit,
"sort_by": sort_by,
"sort_order": sort_order
}
})
except Exception as e:
logging.error(f"Failed to fetch rankings: {e}")
return jsonify({"success": False, "message": f"Failed to fetch rankings: {str(e)}"})
@rank_bp.route('/rankings/dates')
def get_ranking_dates():
"""Get the list of available ranking dates"""
try:
# All distinct dates
dates = daily_rankings_collection.distinct('date')
dates.sort(reverse=True)  # newest first
return jsonify({
"success": True,
"message": "Date list fetched successfully",
"data": {
"dates": dates,
"total": len(dates)
}
})
except Exception as e:
logging.error(f"Failed to fetch the date list: {e}")
return jsonify({"success": False, "message": f"Failed to fetch the date list: {str(e)}"})
@rank_bp.route('/rankings/types')
def get_ranking_types():
"""Get the supported ranking types"""
try:
# All distinct ranking types
types = daily_rankings_collection.distinct('ranking_type')
# Attach type descriptions
type_descriptions = {
'playcount': 'Play-count ranking - sorted by play count',
'growth': 'Growth ranking - fastest play-count growth',
'newcomer': 'Newcomer ranking - newly listed content'
}
formatted_types = []
for type_name in types:
formatted_types.append({
"type": type_name,
"description": type_descriptions.get(type_name, type_name)
})
return jsonify({
"success": True,
"message": "Ranking types fetched successfully",
"data": {
"types": formatted_types,
"total": len(formatted_types)
}
})
except Exception as e:
logging.error(f"Failed to fetch ranking types: {e}")
return jsonify({"success": False, "message": f"Failed to fetch ranking types: {str(e)}"})
@rank_bp.route('/rankings/latest')
def get_latest_rankings():
"""Get the latest rankings of every type"""
try:
# Latest date
latest_ranking = daily_rankings_collection.find_one(
{}, sort=[('date', -1)]
)
if not latest_ranking:
return jsonify({
"success": True,
"message": "No ranking data available",
"data": {
"date": None,
"rankings": []
}
})
latest_date = latest_ranking['date']
# All rankings for that date
rankings = list(daily_rankings_collection.find({
'date': latest_date
}).sort('ranking_type', 1))
formatted_rankings = []
for ranking in rankings:
# Return only the first 20 entries
ranking_data = ranking.get('data', [])[:20]
formatted_rankings.append({
"ranking_type": ranking.get('ranking_type'),
"ranking_name": ranking.get('ranking_name'),
"description": ranking.get('description'),
"data": ranking_data,
"total_count": ranking.get('total_count', 0),
"preview_count": len(ranking_data)
})
return jsonify({
"success": True,
"message": "Latest rankings fetched successfully",
"data": {
"date": latest_date,
"rankings": formatted_rankings,
"total_types": len(formatted_rankings)
}
})
except Exception as e:
logging.error(f"Failed to fetch the latest rankings: {e}")
return jsonify({"success": False, "message": f"Failed to fetch the latest rankings: {str(e)}"})
@rank_bp.route('/rankings/stats')
def get_rankings_stats():
"""Get ranking statistics"""
try:
# Total number of ranking documents
total_rankings = daily_rankings_collection.count_documents({})
# Number of distinct dates
total_dates = len(daily_rankings_collection.distinct('date'))
# Number of distinct ranking types
total_types = len(daily_rankings_collection.distinct('ranking_type'))
# Latest and earliest dates
latest_ranking = daily_rankings_collection.find_one({}, sort=[('date', -1)])
earliest_ranking = daily_rankings_collection.find_one({}, sort=[('date', 1)])
latest_date = latest_ranking['date'] if latest_ranking else None
earliest_date = earliest_ranking['date'] if earliest_ranking else None
return jsonify({
"success": True,
"message": "Ranking statistics fetched successfully",
"data": {
"total_rankings": total_rankings,
"total_dates": total_dates,
"total_types": total_types,
"latest_date": latest_date,
"earliest_date": earliest_date,
"date_range": f"{earliest_date} to {latest_date}" if earliest_date and latest_date else "No data"
}
})
except Exception as e:
logging.error(f"Failed to fetch ranking statistics: {e}")
return jsonify({"success": False, "message": f"Failed to fetch ranking statistics: {str(e)}"})