Compare commits

..

No commits in common. "79c991ee642519923e0d9d1435e78b568fcd4bb0" and "99ae97162c372aad15b05a49c86074dafc7a1686" have entirely different histories.

12 changed files with 1187 additions and 652 deletions

3
.gitignore vendored
View File

@ -36,6 +36,9 @@ scripts/config/chrome_profile/
drivers/* drivers/*
!drivers/chromedriver.exe !drivers/chromedriver.exe
# Rankings config directory
handlers/Rankings/config/
# Environment variables # Environment variables
.env .env
.venv .venv

View File

@ -4,19 +4,33 @@
抖音播放量自动抓取定时器 - 跨平台版本 抖音播放量自动抓取定时器 - 跨平台版本
功能 功能
- 每晚24:00自动执行抖音播放量抓取任务 - 每晚自动执行抖音播放量抓取任务
- 数据抓取完成后自动生成各类榜单播放量榜增长榜新晋榜热门趋势榜
- 支持WindowsmacOSLinux - 支持WindowsmacOSLinux
- 自动保存数据到MongoDB - 自动保存数据到MongoDB
使用方法
- 正常模式python Timer_worker.py启动定时器
- 测试模式python Timer_worker.py --test立即执行一次
- 单次执行python Timer_worker.py --once立即执行一次并退出
- 仅生成榜单python Timer_worker.py --ranking-only仅生成榜单不抓取数据
""" """
import schedule import schedule
import time import time
import subprocess
import sys import sys
import os import os
import logging import logging
import argparse
from pathlib import Path from pathlib import Path
from datetime import datetime from datetime import datetime, date
import config
# 添加项目路径到 Python 路径
sys.path.append(os.path.join(os.path.dirname(__file__), 'handlers', 'Rankings'))
from rank_data_scraper import DouyinPlayVVScraper
# 配置日志的函数 # 配置日志的函数
def setup_logging(): def setup_logging():
@ -48,43 +62,216 @@ class DouyinAutoScheduler:
# 设置环境变量,确保自动模式 # 设置环境变量,确保自动模式
os.environ['AUTO_CONTINUE'] = '1' os.environ['AUTO_CONTINUE'] = '1'
# 构建脚本路径 - 指向Rankings目录中的脚本 # 直接创建并运行 DouyinPlayVVScraper 实例
script_path = Path(__file__).parent / 'handlers' / 'Rankings' / 'rank_data_scraper.py' scraper = DouyinPlayVVScraper(
start_url="https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation",
auto_continue=True,
duration_s=60
)
if not script_path.exists(): logging.info("📁 开始执行抓取任务...")
logging.error(f"❌ 脚本文件不存在: {script_path}") scraper.run()
return
logging.info(f"📁 执行脚本: {script_path}")
# 使用subprocess执行脚本
result = subprocess.run([
sys.executable,
str(script_path),
'--auto',
'--duration', '60'
], capture_output=True, text=True, encoding='utf-8', errors='ignore')
if result.returncode == 0:
logging.info("✅ 抖音播放量抓取任务执行成功") logging.info("✅ 抖音播放量抓取任务执行成功")
if result.stdout:
logging.info(f"📄 输出: {result.stdout.strip()}") # 数据抓取完成后,自动生成当日榜单
else: self.generate_daily_rankings()
logging.error(f"❌ 任务执行失败,返回码: {result.returncode}")
if result.stderr:
logging.error(f"💥 错误信息: {result.stderr.strip()}")
if result.stdout:
logging.info(f"📄 输出: {result.stdout.strip()}")
except Exception as e: except Exception as e:
logging.error(f"💥 执行任务时发生异常: {e}") logging.error(f"💥 执行任务时发生异常: {e}")
import traceback
logging.error(f"详细错误信息: {traceback.format_exc()}")
def generate_daily_rankings(self):
"""生成每日榜单数据(基于时间轴对比)"""
try:
from database import db
from datetime import timedelta
# 获取集合
douyin_collection = db['Rankings_list'] # 使用真实抓取的数据
rankings_collection = db['Ranking_storage']
today = date.today()
yesterday = today - timedelta(days=1)
today_str = today.strftime('%Y-%m-%d')
yesterday_str = yesterday.strftime('%Y-%m-%d')
logging.info(f"📅 正在生成 {today_str} 的榜单(对比 {yesterday_str}...")
# 删除当天已有的榜单数据
rankings_collection.delete_many({"date": today_str})
logging.info(f"🗑️ 已清理 {today_str} 的旧榜单数据")
# 获取今天和昨天的榜单数据进行对比
try:
logging.info("🔄 正在生成时间轴对比榜单...")
# 获取今天的数据,按短剧名称去重,只保留播放量最高的
today_videos_raw = list(douyin_collection.find({}).sort("play_vv", -1))
# 按短剧名称去重,每个短剧只保留播放量最高的一条
unique_videos = {}
for video in today_videos_raw:
mix_name = video.get("mix_name", "")
if mix_name and (mix_name not in unique_videos or video.get("play_vv", 0) > unique_videos[mix_name].get("play_vv", 0)):
unique_videos[mix_name] = video
today_videos = list(unique_videos.values())
logging.info(f"📊 今日数据去重后:{len(today_videos)} 个独特短剧(原始数据:{len(today_videos_raw)} 条)")
# 获取昨天的榜单数据(如果存在),取最新的计算结果
yesterday_ranking = rankings_collection.find_one({
"date": yesterday_str,
"type": "comprehensive"
}, sort=[("calculation_sequence", -1)])
yesterday_data = {}
if yesterday_ranking and "data" in yesterday_ranking:
# 将昨天的数据转换为字典,以短剧名称为键
for item in yesterday_ranking["data"]:
title = item.get("title", "")
if title:
yesterday_data[title] = {
"rank": item.get("rank", 0),
"play_vv": item.get("play_vv", 0),
"video_id": item.get("video_id", "")
}
logging.info(f"📊 找到昨天的榜单数据,共 {len(yesterday_data)} 个短剧")
else:
logging.info("📊 未找到昨天的榜单数据,将作为首次生成")
if today_videos:
# 先计算所有视频的播放量差值
videos_with_growth = []
for video in today_videos:
video_id = str(video.get("_id", ""))
current_play_vv = video.get("play_vv", 0)
# 计算与昨天的对比数据
play_vv_change = 0
play_vv_change_rate = 0
is_new = True
mix_name = video.get("mix_name", "")
if mix_name in yesterday_data:
is_new = False
yesterday_play_vv = yesterday_data[mix_name]["play_vv"]
# 计算播放量变化
play_vv_change = current_play_vv - yesterday_play_vv
if yesterday_play_vv > 0:
play_vv_change_rate = round((play_vv_change / yesterday_play_vv) * 100, 2)
# 创建包含增长数据的视频项
video_with_growth = {
"video": video,
"play_vv_change": play_vv_change,
"play_vv_change_rate": play_vv_change_rate,
"is_new": is_new,
"yesterday_data": yesterday_data.get(mix_name, {})
}
videos_with_growth.append(video_with_growth)
# 按播放量差值降序排序(差值越大排名越靠前)
videos_with_growth.sort(key=lambda x: x["play_vv_change"], reverse=True)
comprehensive_ranking = {
"date": today_str,
"type": "comprehensive",
"name": "播放量增长榜单",
"description": f"基于 {yesterday_str}{today_str} 播放量差值排序的榜单(差值越大排名越靠前)",
"comparison_date": yesterday_str,
"total_videos": len(videos_with_growth),
"data": []
}
# 生成排序后的榜单数据
for i, item in enumerate(videos_with_growth, 1):
video = item["video"]
video_id = str(video.get("_id", ""))
current_play_vv = video.get("play_vv", 0)
mix_name = video.get("mix_name", "")
# 计算排名变化(基于昨天的排名)
rank_change = 0
if not item["is_new"] and item["yesterday_data"]:
yesterday_rank = item["yesterday_data"].get("rank", 0)
rank_change = yesterday_rank - i
ranking_item = {
"rank": i,
"title": mix_name,
"play_vv": current_play_vv,
"author": video.get("author", ""),
"video_id": video_id,
"video_url": video.get("video_url", ""),
"cover_image_url": video.get("cover_image_url", ""),
"playcount_str": video.get("playcount", ""),
# 时间轴对比数据
"timeline_data": {
"is_new": item["is_new"],
"rank_change": rank_change,
"play_vv_change": item["play_vv_change"],
"play_vv_change_rate": item["play_vv_change_rate"],
"yesterday_rank": item["yesterday_data"].get("rank", 0) if not item["is_new"] else 0,
"yesterday_play_vv": item["yesterday_data"].get("play_vv", 0) if not item["is_new"] else 0
}
}
comprehensive_ranking["data"].append(ranking_item)
# 为每次计算添加唯一的时间戳,确保数据唯一性
current_timestamp = datetime.now()
comprehensive_ranking["created_at"] = current_timestamp
comprehensive_ranking["calculation_id"] = f"{today_str}_{current_timestamp.strftime('%H%M%S')}"
# 检查今天已有多少次计算
existing_count = rankings_collection.count_documents({
"date": today_str,
"type": "comprehensive"
})
comprehensive_ranking["calculation_sequence"] = existing_count + 1
# 总是插入新的榜单记录,保留所有历史计算数据
rankings_collection.insert_one(comprehensive_ranking)
logging.info(f"📝 创建了新的今日榜单数据(第{existing_count + 1}次计算,包含最新差值)")
logging.info(f"🔖 计算ID: {comprehensive_ranking['calculation_id']}")
# 统计信息
new_count = sum(1 for item in comprehensive_ranking["data"] if item["timeline_data"]["is_new"])
logging.info(f"✅ 时间轴对比榜单生成成功")
logging.info(f"📊 总计 {len(comprehensive_ranking['data'])} 条记录")
logging.info(f"🆕 新上榜 {new_count}")
logging.info(f"🔄 对比基准日期: {yesterday_str}")
return True
else:
logging.warning("⚠️ 榜单生成失败:无今日数据")
return False
except Exception as e:
logging.error(f"💥 生成时间轴对比榜单时发生异常: {e}")
import traceback
logging.error(f"详细错误信息: {traceback.format_exc()}")
return False
except Exception as e:
logging.error(f"💥 生成榜单时发生异常: {e}")
import traceback
logging.error(f"详细错误信息: {traceback.format_exc()}")
def setup_schedule(self): def setup_schedule(self):
"""设置定时任务""" """设置定时任务"""
# 主执行时间每晚24:00午夜 # 从配置文件读取执行时间
schedule.every().day.at("00:00").do(self.run_douyin_scraper) scheduler_time = config.SCHEDULER_TIME
schedule.every().day.at(scheduler_time).do(self.run_douyin_scraper)
logging.info("⏰ 定时器已设置每晚24:00执行抖音播放量抓取") logging.info(f"⏰ 定时器已设置:每晚{scheduler_time}执行抖音播放量抓取")
def show_next_run(self): def show_next_run(self):
"""显示下次执行时间""" """显示下次执行时间"""
@ -103,17 +290,20 @@ class DouyinAutoScheduler:
logging.info("🧪 测试模式 - 立即执行抖音播放量抓取任务...") logging.info("🧪 测试模式 - 立即执行抖音播放量抓取任务...")
self.run_douyin_scraper() self.run_douyin_scraper()
def run_ranking_only(self):
"""仅生成榜单(不抓取数据)"""
logging.info("📊 仅生成榜单模式...")
self.generate_daily_rankings()
def start_scheduler(self): def start_scheduler(self):
"""启动定时器""" """启动定时器"""
self.is_running = True self.is_running = True
logging.info("🚀 抖音播放量自动抓取定时器已启动") logging.info("🚀 抖音播放量自动抓取定时器已启动")
logging.info("⏰ 执行时间每晚24:00") logging.info(f"⏰ 执行时间:每天{config.SCHEDULER_TIME}执行抖音播放量抓取")
logging.info("📁 目标脚本rank_data_scraper.py") logging.info("📁 目标脚本rank_data_scraper.py")
logging.info("💾 数据保存MongoDB") logging.info("💾 数据保存MongoDB")
logging.info("⏹️ 按 Ctrl+C 停止定时器") logging.info("⏹️ 按 Ctrl+C 停止定时器")
self.show_next_run()
try: try:
while self.is_running: while self.is_running:
schedule.run_pending() schedule.run_pending()
@ -131,24 +321,43 @@ def main():
"""主函数""" """主函数"""
import argparse import argparse
try:
parser = argparse.ArgumentParser(description='抖音播放量自动抓取定时器') parser = argparse.ArgumentParser(description='抖音播放量自动抓取定时器')
parser.add_argument('--test', action='store_true', help='测试模式 - 立即执行一次') parser.add_argument('--test', action='store_true', help='测试模式 - 立即执行一次')
parser.add_argument('--once', action='store_true', help='立即执行一次并退出') parser.add_argument('--once', action='store_true', help='立即执行一次并退出')
parser.add_argument('--ranking-only', action='store_true', help='仅生成榜单(不抓取数据)')
args = parser.parse_args() args = parser.parse_args()
# 设置日志配置 # 设置日志配置
setup_logging() setup_logging()
print("正在初始化定时器...")
scheduler = DouyinAutoScheduler() scheduler = DouyinAutoScheduler()
if args.test: if args.test:
print("执行测试模式...")
scheduler.run_test() scheduler.run_test()
elif args.once: elif args.once:
print("执行单次模式...")
scheduler.run_once() scheduler.run_once()
elif args.ranking_only:
print("执行榜单生成模式...")
scheduler.run_ranking_only()
else: else:
print("启动定时器模式...")
scheduler.setup_schedule() scheduler.setup_schedule()
scheduler.start_scheduler() scheduler.start_scheduler()
print("程序执行完成")
except Exception as e:
print(f"程序执行出错: {e}")
import traceback
traceback.print_exc()
return 1
return 0
if __name__ == '__main__': if __name__ == '__main__':
main() main()

73
app.py
View File

@ -1,4 +1,4 @@
from flask import Flask from flask import Flask, jsonify
from flask_cors import CORS from flask_cors import CORS
import logging import logging
import os import os
@ -20,74 +20,13 @@ logging.basicConfig(
] ]
) )
# 导入路由 # 导入并注册蓝图
from routers.rank_api_routes import api from routers.rank_api_routes import rank_bp
app.register_blueprint(rank_bp)
# 注册路由
@app.route('/')
def index():
"""API首页"""
from flask import jsonify
return jsonify({
"name": "抖音播放量数据API服务",
"version": "2.0",
"description": "主程序服务 - 整合小程序API功能",
"endpoints": {
"/api/videos": "获取视频列表 (支持分页和排序)",
"/api/top": "获取热门视频榜单",
"/api/search": "搜索视频",
"/api/detail": "获取视频详情",
"/api/stats": "获取统计信息",
"/api/health": "健康检查"
},
"features": [
"分页支持",
"多种排序方式",
"搜索功能",
"详情查看",
"统计分析",
"小程序优化"
]
})
# 注册小程序API路由
@app.route('/api/videos')
def get_videos():
return api.get_videos()
@app.route('/api/top')
def get_top():
return api.get_top()
@app.route('/api/search')
def search():
return api.search()
@app.route('/api/detail')
def get_detail():
return api.get_detail()
@app.route('/api/stats')
def get_stats():
return api.get_stats()
@app.route('/api/health')
def health_check():
return api.health_check()
if __name__ == '__main__': if __name__ == '__main__':
print("启动主程序服务...") print("启动主程序服务...")
print("服务地址: http://localhost:5000") print("服务地址: http://localhost:5001")
print("API接口列表:")
print(" - GET / 显示API信息")
print(" - GET /api/videos?page=1&limit=20&sort=playcount 获取视频列表(总播放量排序)")
print(" - GET /api/videos?page=1&limit=20&sort=growth 获取视频列表(增长排序,默认昨天到今天的差值)")
print(" - GET /api/videos?page=1&limit=20&sort=growth&start_date=2025-10-16&end_date=2025-10-17 获取视频列表(自定义日期范围增长排序)")
print(" - GET /api/top?limit=10 获取热门榜单")
print(" - GET /api/search?q=关键词&page=1&limit=10 搜索视频")
print(" - GET /api/detail?id=视频ID 获取视频详情")
print(" - GET /api/stats 获取统计信息")
print(" - GET /api/health 健康检查")
print("专为小程序优化:分页、搜索、详情、统计、增长排序、自定义日期范围")
app.run(host='0.0.0.0', port=5000, debug=True) app.run(host='0.0.0.0', port=5001, debug=True)

View File

@ -3,6 +3,7 @@ import importlib
# 数据库配置 # 数据库配置
MONGO_URI = "mongodb://localhost:27017" MONGO_URI = "mongodb://localhost:27017"
# MONGO_URI = "mongodb://mongouser:Jdei2243afN@172.16.0.6:27017,172.16.0.4:27017/test?replicaSet=cmgo-r6qkaern_0&authSource=admin"
MONGO_DB_NAME = "Rankings" MONGO_DB_NAME = "Rankings"
# 应用配置 # 应用配置
@ -13,4 +14,7 @@ DEBUG = APP_ENV == 'development'
LOG_LEVEL = 'INFO' LOG_LEVEL = 'INFO'
LOG_DIR = 'logs' LOG_DIR = 'logs'
# 定时器配置
SCHEDULER_TIME = "24:00" # 定时器执行时间,格式为 HH:MM (24小时制)
print(f"Successfully loaded configuration for environment: {APP_ENV}") print(f"Successfully loaded configuration for environment: {APP_ENV}")

View File

@ -5,9 +5,9 @@
本API服务提供抖音播放量数据的查询、搜索、统计等功能专为小程序优化设计。 本API服务提供抖音播放量数据的查询、搜索、统计等功能专为小程序优化设计。
**基础信息** **基础信息**
- 服务地址:`http://localhost:5000` - 服务地址:`http://localhost:5001`
- 数据源MongoDB数据库 - 数据源MongoDB数据库
- 数据更新:每晚24:00自动更新 - 数据更新:每日14:23自动更新
- 响应格式JSON - 响应格式JSON
## 通用响应格式 ## 通用响应格式
@ -85,12 +85,13 @@ GET /
"version": "2.0", "version": "2.0",
"description": "主程序服务 - 整合小程序API功能", "description": "主程序服务 - 整合小程序API功能",
"endpoints": { "endpoints": {
"/api/videos": "获取视频列表 (支持分页和排序)", "/api/rank/videos": "获取视频列表 (支持分页和排序)",
"/api/top": "获取热门视频榜单", "/api/rank/top": "获取热门视频榜单",
"/api/search": "搜索视频", "/api/rank/search": "搜索视频",
"/api/detail": "获取视频详情", "/api/rank/detail": "获取视频详情",
"/api/stats": "获取统计信息", "/api/rank/stats": "获取统计信息",
"/api/health": "健康检查" "/api/rank/health": "健康检查",
"/api/rank/rankings": "获取榜单列表"
}, },
"features": [ "features": [
"分页支持", "分页支持",
@ -108,34 +109,43 @@ GET /
**接口地址** **接口地址**
``` ```
GET /api/videos GET /api/rank/videos
``` ```
**功能描述** **功能描述**
获取分页的视频合集列表,支持多种排序方式 获取视频合集列表,支持分页和排序
**请求参数** **请求参数**
| 参数名 | 类型 | 必填 | 默认值 | 说明 | | 参数名 | 类型 | 必填 | 默认值 | 说明 |
|--------|------|------|--------|------| |--------|------|------|--------|------|
| page | int | 否 | 1 | 页码 | | page | int | 否 | 1 | 页码 |
| limit | int | 否 | 20 | 每页数量 | | limit | int | 否 | 20 | 每页数量 |
| sort | string | 否 | playcount | 排序方式playcount(播放量) / growth(增长量) | | sort | string | 否 | playcount | 排序方式playcount(播放量), growth(增长量) |
| start_date | string | 否 | 昨天 | 增长计算开始日期(格式: YYYY-MM-DD) | | start_date | string | 否 | 昨天 | 增长计算开始日期(YYYY-MM-DD) |
| end_date | string | 否 | 今天 | 增长计算结束日期(格式: YYYY-MM-DD) | | end_date | string | 否 | 今天 | 增长计算结束日期(YYYY-MM-DD) |
**排序说明**
- `playcount`: 按当前播放量从高到低排序
- `growth`: 按播放量增长差值从大到小排序
- 计算公式:增长值 = 结束日期播放量 - 开始日期播放量
- 只显示增长为正数的合集
- 排序规则:增长差值越大,排名越靠前
**使用示例** **使用示例**
``` ```
# 按播放量排序 # 按播放量排序(当前播放量从高到低)
GET /api/videos?page=1&limit=20&sort=playcount GET /api/rank/videos?page=1&limit=20&sort=playcount
# 按增长量排序(默认昨天到今天的增长) # 按增长量排序(播放量差值从大到小,默认昨天到今天的增长)
GET /api/videos?page=1&limit=20&sort=growth GET /api/rank/videos?page=1&limit=20&sort=growth
# 按自定义日期范围的增长排序 # 按自定义日期范围的增长排序(播放量差值从大到小)
GET /api/videos?page=1&limit=20&sort=growth&start_date=2025-10-16&end_date=2025-10-17 GET /api/rank/videos?page=1&limit=20&sort=growth&start_date=2025-10-16&end_date=2025-10-17
``` ```
**响应示例** **响应示例**
播放量排序响应:
```json ```json
{ {
"success": true, "success": true,
@ -166,11 +176,49 @@ GET /api/videos?page=1&limit=20&sort=growth&start_date=2025-10-16&end_date=2025-
} }
``` ```
增长榜排序响应包含额外的growth字段
```json
{
"success": true,
"data": [
{
"_id": "674f1234567890abcdef",
"batch_time": "2025-10-17 15:30:00",
"mix_name": "热门合集1",
"video_url": "https://www.douyin.com/video/xxx",
"playcount": "1.2亿",
"play_vv": 120000000,
"request_id": "request_xxx",
"rank": 1,
"growth": 5000000,
"start_date": "2025-10-16",
"end_date": "2025-10-17",
"cover_image_url": "https://p3.douyinpic.com/xxx",
"cover_backup_urls": ["url1", "url2"]
}
],
"pagination": {
"page": 1,
"limit": 20,
"total": 50,
"pages": 3,
"has_next": true,
"has_prev": false
},
"sort_by": "growth",
"date_range": {
"start_date": "2025-10-16",
"end_date": "2025-10-17"
},
"update_time": "2025-10-17 15:30:00"
}
```
### 3. 获取热门榜单 ### 3. 获取热门榜单
**接口地址** **接口地址**
``` ```
GET /api/top GET /api/rank/top
``` ```
**功能描述** **功能描述**
@ -183,7 +231,7 @@ GET /api/top
**使用示例** **使用示例**
``` ```
GET /api/top?limit=10 GET /api/rank/top?limit=10
``` ```
**响应示例** **响应示例**
@ -213,7 +261,7 @@ GET /api/top?limit=10
**接口地址** **接口地址**
``` ```
GET /api/search GET /api/rank/search
``` ```
**功能描述** **功能描述**
@ -228,7 +276,7 @@ GET /api/search
**使用示例** **使用示例**
``` ```
GET /api/search?q=关键词&page=1&limit=10 GET /api/rank/search?q=关键词&page=1&limit=10
``` ```
**响应示例** **响应示例**
@ -266,7 +314,7 @@ GET /api/search?q=关键词&page=1&limit=10
**接口地址** **接口地址**
``` ```
GET /api/detail GET /api/rank/detail
``` ```
**功能描述** **功能描述**
@ -279,7 +327,7 @@ GET /api/detail
**使用示例** **使用示例**
``` ```
GET /api/detail?id=674f1234567890abcdef GET /api/rank/detail?id=674f1234567890abcdef
``` ```
**响应示例** **响应示例**
@ -306,7 +354,7 @@ GET /api/detail?id=674f1234567890abcdef
**接口地址** **接口地址**
``` ```
GET /api/stats GET /api/rank/stats
``` ```
**功能描述** **功能描述**
@ -360,7 +408,7 @@ GET /api/stats
**接口地址** **接口地址**
``` ```
GET /api/health GET /api/rank/health
``` ```
**功能描述** **功能描述**
@ -406,7 +454,7 @@ GET /api/health
```javascript ```javascript
// 小程序端示例 // 小程序端示例
wx.request({ wx.request({
url: 'http://localhost:5000/api/videos', url: 'http://localhost:5001/api/rank/videos',
data: { data: {
page: 1, page: 1,
limit: 20, limit: 20,

View File

@ -13,29 +13,27 @@
### 项目结构(简版) ### 项目结构(简版)
``` ```
项目根/ 项目根/
├── app.py # 主服务入口5000 ├── app.py # 主服务入口5001
├── Timer_worker.py # 定时抓取任务 ├── Timer_worker.py # 定时抓取任务
├── config.py # 全局配置 ├── config.py # 全局配置
├── database.py # 数据库封装 ├── database.py # 数据库封装
├── docs/ # 项目文档
│ ├── README.md # 项目说明文档
│ ├── API接口文档.md # API接口说明
│ └── requirements.txt # 依赖包列表
├── routers/ ├── routers/
│ └── rank_api_routes.py # 小程序API逻辑模块 │ └── rank_api_routes.py # 小程序API逻辑模块
├── handlers/ └── handlers/
│ └── Rankings/ └── Rankings/
│ ├── rank_data_scraper.py # 抓取脚本Selenium+CDP ├── rank_data_scraper.py # 抓取脚本Selenium+CDP
│ ├── config/ └── drivers/ # 浏览器驱动等
│ │ └── chrome_profile/ └── chromedriver.exe # Chrome驱动程序
│ │ └── douyin_persistent/ # 持久化Chrome用户目录登录态
│ ├── data/ # 数据导出/缓存(可选)
│ ├── docs/ # 使用说明与文档
│ ├── drivers/ # 浏览器驱动等
│ └── logs/ # 运行日志
└── 项目启动说明.md
``` ```
- 核心数据表:`Rankings/Rankings_list` - 核心数据表:`Rankings/Rankings_list`
- 日志示例:`handlers/Rankings/logs/douyin_scraper.log` - 日志示例:`handlers/Rankings/logs/douyin_scraper.log`
## 3. 服务与端口 ## 3. 服务与端口
- 单一服务:`app.py`(默认端口 `5000`,包含小程序 API 路由) - 单一服务:`app.py`(默认端口 `5001`,包含小程序 API 路由)
## 4. 一键启动 ## 4. 一键启动
- 启动主服务: - 启动主服务:
@ -50,7 +48,7 @@
## 5. 使用步骤(首次登录与日常) ## 5. 使用步骤(首次登录与日常)
- 安装依赖: - 安装依赖:
```bash ```bash
pip install -r handlers/Rankings/docs/requirements.txt pip install -r docs/requirements.txt
``` ```
- 第一次使用(登录抖音): - 第一次使用(登录抖音):
- 运行抓取脚本:`python handlers/Rankings/rank_data_scraper.py` - 运行抓取脚本:`python handlers/Rankings/rank_data_scraper.py`
@ -60,15 +58,15 @@
- 日常流程: - 日常流程:
- 抓取:`python handlers/Rankings/rank_data_scraper.py` - 抓取:`python handlers/Rankings/rank_data_scraper.py`
- 服务:`python app.py`(端口 `5000` - 服务:`python app.py`(端口 `5001`
- 定时:`python Timer_worker.py`(每日 9:35 自动执行) - 定时:`python Timer_worker.py`(每日 14:23 自动执行)
- 验证数据: - 验证数据:
- MongoDB数据库 `Rankings`,集合 `Rankings_list` - MongoDB数据库 `Rankings`,集合 `Rankings_list`
- API 检查: - API 检查:
- `http://localhost:5000/api/health` - `http://localhost:5001/api/rank/health`
- `http://localhost:5000/api/videos?page=1&limit=20&sort_by=playcount` - `http://localhost:5001/api/rank/videos?page=1&limit=20&sort=playcount`
- 增长榜:`http://localhost:5000/api/videos?sort_by=growth&page=1&limit=20` - 增长榜:`http://localhost:5001/api/rank/videos?sort=growth&page=1&limit=20`
## 6. 数据抓取流程(简版) ## 6. 数据抓取流程(简版)
- 复用已登录的 Chrome 配置,滚动/刷新触发请求 - 复用已登录的 Chrome 配置,滚动/刷新触发请求
@ -92,11 +90,12 @@
## 10. 快速排错 ## 10. 快速排错
- MongoDB 连接失败:抓取脚本将仅保存本地文件日志 - MongoDB 连接失败:抓取脚本将仅保存本地文件日志
- Chrome 配置:`handlers/Rankings/config/chrome_profile/` - ChromeDriver 配置:`handlers/Rankings/drivers/chromedriver.exe`
- 日志位置:`handlers/Rankings/logs/` - 日志位置:`handlers/Rankings/logs/`(运行时自动创建)
## 11. 你需要知道的 ## 11. 你需要知道的
- 当前架构下没有独立的 `5001` 端口`routers/rank_api_routes.py` 提供逻辑模块,由 `app.py` 注册路由并统一对外服务`5000` - 当前架构下使用单一服务端口 `5001``routers/rank_api_routes.py` 提供逻辑模块,由 `app.py` 注册路由并统一对外服务。
- 抓取脚本与 API 使用同一集合,数据结构一致 - 抓取脚本与 API 使用同一集合,数据结构一致
- 小程序 API 专注返回易用字段(封面、播放量、时间、链接) - 小程序 API 专注返回易用字段(封面、播放量、时间、链接)
- 可直接在现有数据上新增排序或过滤,保持接口向后兼容 - 可直接在现有数据上新增排序或过滤,保持接口向后兼容
- ChromeDriver 已配置本地版本,避免网络下载问题

20
docs/requirements.txt Normal file
View File

@ -0,0 +1,20 @@
# Web抓取相关
selenium>=4.15.0
chromedriver-autoinstaller>=0.6.0
webdriver-manager>=4.0.0
# Web服务框架
flask>=2.3.0
flask-cors>=4.0.0
# 数据库
pymongo>=4.5.0
# 定时任务
schedule>=1.2.0
# 系统工具
psutil>=5.9.0
# 数据处理
pathlib2>=2.3.7; python_version<"3.4"

View File

@ -1,8 +0,0 @@
selenium>=4.15.0
schedule>=1.2.0
pymongo>=4.5.0
flask>=2.3.0
flask-cors>=4.0.0
chromedriver-autoinstaller>=0.6.0
webdriver-manager>=4.0.0
psutil>=5.9.0

Binary file not shown.

View File

@ -31,8 +31,11 @@ from selenium.webdriver.chrome.options import Options
# 保留导入但默认不使用webdriver_manager避免网络下载卡顿 # 保留导入但默认不使用webdriver_manager避免网络下载卡顿
from webdriver_manager.chrome import ChromeDriverManager # noqa: F401 from webdriver_manager.chrome import ChromeDriverManager # noqa: F401
import chromedriver_autoinstaller import chromedriver_autoinstaller
from pymongo import MongoClient import sys
from pymongo.errors import ConnectionFailure import os
# 添加项目根目录到 Python 路径
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
from database import db
# 配置日志 # 配置日志
@ -60,7 +63,6 @@ class DouyinPlayVVScraper:
self.driver = None self.driver = None
self.play_vv_items = [] # list of dicts: {play_vv, formatted, url, request_id, mix_name, watched_item} self.play_vv_items = [] # list of dicts: {play_vv, formatted, url, request_id, mix_name, watched_item}
self.captured_responses = [] self.captured_responses = []
self.mongo_client = None
self.db = None self.db = None
self.collection = None self.collection = None
self._cleanup_old_profiles() self._cleanup_old_profiles()
@ -69,33 +71,17 @@ class DouyinPlayVVScraper:
def _setup_mongodb(self): def _setup_mongodb(self):
"""设置MongoDB连接""" """设置MongoDB连接"""
try: try:
# MongoDB连接配置 # 使用 database.py 中的连接
mongo_host = os.environ.get('MONGO_HOST', 'localhost') self.db = db
mongo_port = int(os.environ.get('MONGO_PORT', 27017))
mongo_db = os.environ.get('MONGO_DB', 'Rankings') # 设置集合
mongo_collection = os.environ.get('MONGO_COLLECTION', 'Rankings_list') mongo_collection = os.environ.get('MONGO_COLLECTION', 'Rankings_list')
# 创建MongoDB连接
self.mongo_client = MongoClient(mongo_host, mongo_port, serverSelectionTimeoutMS=5000)
# 测试连接
self.mongo_client.admin.command('ping')
# 设置数据库和集合
self.db = self.mongo_client[mongo_db]
self.collection = self.db[mongo_collection] self.collection = self.db[mongo_collection]
logging.info(f'MongoDB连接成功: {mongo_host}:{mongo_port}/{mongo_db}.{mongo_collection}') logging.info(f'MongoDB连接成功使用数据库: {self.db.name},集合: {mongo_collection}')
except ConnectionFailure as e:
logging.warning(f'MongoDB连接失败: {e}')
logging.info('将仅保存到本地文件')
self.mongo_client = None
self.db = None
self.collection = None
except Exception as e: except Exception as e:
logging.warning(f'MongoDB设置出错: {e}') logging.warning(f'MongoDB设置出错: {e}')
self.mongo_client = None
self.db = None self.db = None
self.collection = None self.collection = None
@ -649,10 +635,10 @@ class DouyinPlayVVScraper:
elapsed = int(time.time() - start) elapsed = int(time.time() - start)
if elapsed - last_progress >= 5: if elapsed - last_progress >= 5:
last_progress = elapsed last_progress = elapsed
logging.info(f'进度: {elapsed}/{duration_s}s, 已发现play_vv候选 {len(self.play_vv_items)}') logging.info(f'进度: {elapsed}/{duration_s}, 目标数量: {len(self.play_vv_items)}')
time.sleep(0.8) time.sleep(0.8)
logging.info(f'网络收集完成,共发现 {len(self.play_vv_items)}play_vv候选') logging.info(f'网络收集完成,共发现 {len(self.play_vv_items)}目标')
def parse_ssr_data(self): def parse_ssr_data(self):
@ -712,7 +698,7 @@ class DouyinPlayVVScraper:
def save_to_mongodb(self): def save_to_mongodb(self):
"""将数据保存到MongoDB""" """将数据保存到MongoDB"""
if self.mongo_client is None or self.collection is None: if self.collection is None:
logging.warning('MongoDB未连接跳过数据库保存') logging.warning('MongoDB未连接跳过数据库保存')
return return
@ -797,6 +783,5 @@ if __name__ == '__main__':
os.environ['AUTO_CONTINUE'] = '1' os.environ['AUTO_CONTINUE'] = '1'
print('=== Selenium+CDP 抖音play_vv抓取器 ===') print('=== Selenium+CDP 抖音play_vv抓取器 ===')
print('将复用本地Chrome配置并抓取网络响应中的play_vv')
scraper = DouyinPlayVVScraper(args.url, auto_continue=args.auto, duration_s=args.duration) scraper = DouyinPlayVVScraper(args.url, auto_continue=args.auto, duration_s=args.duration)
scraper.run() scraper.run()

View File

@ -5,34 +5,20 @@
优化的数据格式和接口设计专为小程序使用 优化的数据格式和接口设计专为小程序使用
""" """
from pymongo import MongoClient from flask import Blueprint, request, jsonify
from datetime import datetime, timedelta from datetime import datetime, timedelta
import logging import logging
import re import re
from database import db
class MiniprogramAPI: # 创建蓝图
def __init__(self): rank_bp = Blueprint('rank', __name__, url_prefix='/api/rank')
self.client = None
self.db = None
self.collection = None
self.connect_mongodb()
def connect_mongodb(self): # 获取数据库集合
"""连接MongoDB数据库""" collection = db['Rankings_list']
try: daily_rankings_collection = db['Ranking_storage'] # 榜单存储表
self.client = MongoClient('mongodb://localhost:27017/')
# 测试连接
self.client.admin.command('ping')
# 使用数据库与集合
self.db = self.client['Rankings']
self.collection = self.db['Rankings_list']
logging.info("MongoDB连接成功")
return True
except Exception as e:
logging.error(f"MongoDB连接失败: {e}")
return False
def format_playcount(self, playcount_str): def format_playcount(playcount_str):
"""格式化播放量字符串为数字""" """格式化播放量字符串为数字"""
if not playcount_str: if not playcount_str:
return 0 return 0
@ -56,7 +42,7 @@ class MiniprogramAPI:
except: except:
return 0 return 0
def format_cover_url(self, cover_data): def format_cover_url(cover_data):
"""格式化封面图片URL""" """格式化封面图片URL"""
if not cover_data: if not cover_data:
return "" return ""
@ -68,7 +54,7 @@ class MiniprogramAPI:
else: else:
return "" return ""
def format_time(self, time_obj): def format_time(time_obj):
"""格式化时间""" """格式化时间"""
if not time_obj: if not time_obj:
return "" return ""
@ -78,11 +64,64 @@ class MiniprogramAPI:
else: else:
return str(time_obj) return str(time_obj)
def format_mix_item(self, doc): def sort_ranking_data(ranking_data, sort_by, sort_order='desc'):
"""
对榜单数据进行动态排序
Args:
ranking_data: 榜单数据列表
sort_by: 排序字段 (play_vv_change, play_vv_change_rate, play_vv, rank)
sort_order: 排序顺序 (asc, desc)
Returns:
排序后的榜单数据
"""
try:
# 定义排序键函数
def get_sort_key(item):
if sort_by == 'play_vv_change':
# 按播放量差值排序
timeline_data = item.get('timeline_data', {})
return timeline_data.get('play_vv_change', 0)
elif sort_by == 'play_vv_change_rate':
# 按播放量变化率排序
timeline_data = item.get('timeline_data', {})
return timeline_data.get('play_vv_change_rate', 0)
elif sort_by == 'play_vv':
# 按当前播放量排序
return item.get('play_vv', 0)
elif sort_by == 'rank':
# 按排名排序
return item.get('rank', 999999)
else:
# 默认按排名排序
return item.get('rank', 999999)
# 执行排序
reverse = (sort_order == 'desc')
# 对于排名字段,降序实际上是升序(排名越小越好)
if sort_by == 'rank':
reverse = (sort_order == 'asc')
sorted_data = sorted(ranking_data, key=get_sort_key, reverse=reverse)
# 重新分配排名
for i, item in enumerate(sorted_data, 1):
item['current_sort_rank'] = i
return sorted_data
except Exception as e:
logging.error(f"排序榜单数据失败: {e}")
# 如果排序失败,返回原始数据
return ranking_data
def format_mix_item(doc):
"""格式化合集数据项 - 完全按照数据库原始字段返回""" """格式化合集数据项 - 完全按照数据库原始字段返回"""
return { return {
"_id": str(doc.get("_id", "")), "_id": str(doc.get("_id", "")),
"batch_time": self.format_time(doc.get("batch_time")), "batch_time": format_time(doc.get("batch_time")),
"mix_name": doc.get("mix_name", ""), "mix_name": doc.get("mix_name", ""),
"video_url": doc.get("video_url", ""), "video_url": doc.get("video_url", ""),
"playcount": doc.get("playcount", ""), "playcount": doc.get("playcount", ""),
@ -93,7 +132,7 @@ class MiniprogramAPI:
"cover_backup_urls": doc.get("cover_backup_urls", []) "cover_backup_urls": doc.get("cover_backup_urls", [])
} }
def get_mix_list(self, page=1, limit=20, sort_by="playcount"): def get_mix_list(page=1, limit=20, sort_by="playcount"):
"""获取合集列表(分页)""" """获取合集列表(分页)"""
try: try:
# 计算跳过的数量 # 计算跳过的数量
@ -102,7 +141,7 @@ class MiniprogramAPI:
# 设置排序字段 # 设置排序字段
if sort_by == "growth": if sort_by == "growth":
# 按增长排序需要特殊处理 # 按增长排序需要特殊处理
return self.get_growth_mixes(page, limit) return get_growth_mixes(page, limit)
else: else:
sort_field = "play_vv" if sort_by == "playcount" else "batch_time" sort_field = "play_vv" if sort_by == "playcount" else "batch_time"
sort_order = -1 # 降序 sort_order = -1 # 降序
@ -132,7 +171,7 @@ class MiniprogramAPI:
{"$limit": limit} {"$limit": limit}
] ]
docs = list(self.collection.aggregate(pipeline)) docs = list(collection.aggregate(pipeline))
# 获取总数 # 获取总数
total_pipeline = [ total_pipeline = [
@ -141,13 +180,13 @@ class MiniprogramAPI:
{"$group": {"_id": "$mix_name"}}, {"$group": {"_id": "$mix_name"}},
{"$count": "total"} {"$count": "total"}
] ]
total_result = list(self.collection.aggregate(total_pipeline)) total_result = list(collection.aggregate(total_pipeline))
total = total_result[0]["total"] if total_result else 0 total = total_result[0]["total"] if total_result else 0
# 格式化数据 # 格式化数据
mix_list = [] mix_list = []
for doc in docs: for doc in docs:
item = self.format_mix_item(doc) item = format_mix_item(doc)
mix_list.append(item) mix_list.append(item)
return { return {
@ -169,8 +208,8 @@ class MiniprogramAPI:
logging.error(f"获取合集列表失败: {e}") logging.error(f"获取合集列表失败: {e}")
return {"success": False, "message": f"获取数据失败: {str(e)}"} return {"success": False, "message": f"获取数据失败: {str(e)}"}
def get_growth_mixes(self, page=1, limit=20, start_date=None, end_date=None): def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None):
"""获取按播放量增长排序的合集列表""" """获取按播放量增长排序的合集列表 - 优先从定时器生成的数据中读取"""
try: try:
# 计算跳过的数量 # 计算跳过的数量
skip = (page - 1) * limit skip = (page - 1) * limit
@ -186,8 +225,55 @@ class MiniprogramAPI:
if isinstance(end_date, str): if isinstance(end_date, str):
end_date = datetime.strptime(end_date, "%Y-%m-%d").date() end_date = datetime.strptime(end_date, "%Y-%m-%d").date()
end_date_str = end_date.strftime("%Y-%m-%d")
start_date_str = start_date.strftime("%Y-%m-%d")
# 优先尝试从定时器生成的增长榜数据中读取
try:
growth_ranking = daily_rankings_collection.find_one({
"date": end_date_str,
"type": "growth",
"start_date": start_date_str,
"end_date": end_date_str
}, sort=[("calculation_sequence", -1)]) # 获取最新的计算结果
if growth_ranking and "data" in growth_ranking:
logging.info(f"📈 从定时器生成的增长榜数据中读取 {end_date_str} 的增长榜")
# 获取预先计算好的增长榜数据
growth_data = growth_ranking["data"]
# 分页处理
total = len(growth_data)
paginated_data = growth_data[skip:skip + limit]
return {
"success": True,
"data": paginated_data,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"pages": (total + limit - 1) // limit,
"has_next": page * limit < total,
"has_prev": page > 1
},
"sort_by": "growth",
"date_range": {
"start_date": start_date_str,
"end_date": end_date_str
},
"data_source": "timer_generated", # 标识数据来源
"update_time": growth_ranking.get("created_at", datetime.now()).strftime("%Y-%m-%d %H:%M:%S") if isinstance(growth_ranking.get("created_at"), datetime) else str(growth_ranking.get("created_at", ""))
}
except Exception as e:
logging.warning(f"从定时器数据读取增长榜失败,将使用动态计算: {e}")
# 如果定时器数据不存在或读取失败,回退到动态计算
logging.info(f"📊 动态计算 {start_date_str}{end_date_str} 的增长榜")
# 查询结束日期的数据 # 查询结束日期的数据
end_cursor = self.collection.find({ end_cursor = collection.find({
"batch_time": { "batch_time": {
"$gte": datetime(end_date.year, end_date.month, end_date.day), "$gte": datetime(end_date.year, end_date.month, end_date.day),
"$lt": datetime(end_date.year, end_date.month, end_date.day) + timedelta(days=1) "$lt": datetime(end_date.year, end_date.month, end_date.day) + timedelta(days=1)
@ -196,7 +282,7 @@ class MiniprogramAPI:
end_data = list(end_cursor) end_data = list(end_cursor)
# 查询开始日期的数据 # 查询开始日期的数据
start_cursor = self.collection.find({ start_cursor = collection.find({
"batch_time": { "batch_time": {
"$gte": datetime(start_date.year, start_date.month, start_date.day), "$gte": datetime(start_date.year, start_date.month, start_date.day),
"$lt": datetime(start_date.year, start_date.month, start_date.day) + timedelta(days=1) "$lt": datetime(start_date.year, start_date.month, start_date.day) + timedelta(days=1)
@ -217,17 +303,17 @@ class MiniprogramAPI:
# 只保留增长为正的数据 # 只保留增长为正的数据
if growth > 0: if growth > 0:
item = self.format_mix_item(end_item) item = format_mix_item(end_item)
item["growth"] = growth item["growth"] = growth
item["start_date"] = start_date.strftime("%Y-%m-%d") item["start_date"] = start_date_str
item["end_date"] = end_date.strftime("%Y-%m-%d") item["end_date"] = end_date_str
growth_data.append(item) growth_data.append(item)
else: else:
# 如果开始日期没有数据,但结束日期有,也认为是新增长 # 如果开始日期没有数据,但结束日期有,也认为是新增长
item = self.format_mix_item(end_item) item = format_mix_item(end_item)
item["growth"] = end_item.get("play_vv", 0) item["growth"] = end_item.get("play_vv", 0)
item["start_date"] = start_date.strftime("%Y-%m-%d") item["start_date"] = start_date_str
item["end_date"] = end_date.strftime("%Y-%m-%d") item["end_date"] = end_date_str
growth_data.append(item) growth_data.append(item)
# 按增长值降序排序 # 按增长值降序排序
@ -254,22 +340,23 @@ class MiniprogramAPI:
}, },
"sort_by": "growth", "sort_by": "growth",
"date_range": { "date_range": {
"start_date": start_date.strftime("%Y-%m-%d"), "start_date": start_date_str,
"end_date": end_date.strftime("%Y-%m-%d") "end_date": end_date_str
}, },
"data_source": "dynamic_calculation", # 标识数据来源
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
} }
except Exception as e: except Exception as e:
logging.error(f"获取增长合集列表失败: {e}") logging.error(f"获取增长合集列表失败: {e}")
# 如果增长计算失败,返回按播放量排序的数据作为备选 # 如果增长计算失败,返回按播放量排序的数据作为备选
return self.get_mix_list(page, limit, "playcount") return get_mix_list(page, limit, "playcount")
def get_top_mixes(self, limit=10): def get_top_mixes(limit=10):
"""获取热门合集TOP榜单""" """获取热门合集TOP榜单"""
try: try:
# 按播放量排序获取热门合集 # 按播放量排序获取热门合集
cursor = self.collection.find().sort("play_vv", -1).limit(limit) cursor = collection.find().sort("play_vv", -1).limit(limit)
docs = list(cursor) docs = list(cursor)
if not docs: if not docs:
@ -278,21 +365,21 @@ class MiniprogramAPI:
# 格式化数据 # 格式化数据
top_list = [] top_list = []
for doc in docs: for doc in docs:
item = self.format_mix_item(doc) item = format_mix_item(doc)
top_list.append(item) top_list.append(item)
return { return {
"success": True, "success": True,
"data": top_list, "data": top_list,
"total": len(top_list), "total": len(top_list),
"update_time": self.format_time(docs[0].get("batch_time")) if docs else "" "update_time": format_time(docs[0].get("batch_time")) if docs else ""
} }
except Exception as e: except Exception as e:
logging.error(f"获取热门合集失败: {e}") logging.error(f"获取热门合集失败: {e}")
return {"success": False, "message": f"获取数据失败: {str(e)}"} return {"success": False, "message": f"获取数据失败: {str(e)}"}
def search_mixes(self, keyword, page=1, limit=10): def search_mixes(keyword, page=1, limit=10):
"""搜索合集""" """搜索合集"""
try: try:
if not keyword: if not keyword:
@ -307,16 +394,16 @@ class MiniprogramAPI:
} }
# 查询数据 # 查询数据
cursor = self.collection.find(search_condition).sort("play_vv", -1).skip(skip).limit(limit) cursor = collection.find(search_condition).sort("play_vv", -1).skip(skip).limit(limit)
docs = list(cursor) docs = list(cursor)
# 获取搜索结果总数 # 获取搜索结果总数
total = self.collection.count_documents(search_condition) total = collection.count_documents(search_condition)
# 格式化数据 # 格式化数据
search_results = [] search_results = []
for doc in docs: for doc in docs:
item = self.format_mix_item(doc) item = format_mix_item(doc)
search_results.append(item) search_results.append(item)
return { return {
@ -338,17 +425,17 @@ class MiniprogramAPI:
logging.error(f"搜索合集失败: {e}") logging.error(f"搜索合集失败: {e}")
return {"success": False, "message": f"搜索失败: {str(e)}"} return {"success": False, "message": f"搜索失败: {str(e)}"}
def get_mix_detail(self, mix_id): def get_mix_detail(mix_id):
"""获取合集详情""" """获取合集详情"""
try: try:
from bson import ObjectId from bson import ObjectId
# 尝试通过ObjectId查找 # 尝试通过ObjectId查找
try: try:
doc = self.collection.find_one({"_id": ObjectId(mix_id)}) doc = collection.find_one({"_id": ObjectId(mix_id)})
except: except:
# 如果ObjectId无效尝试其他字段 # 如果ObjectId无效尝试其他字段
doc = self.collection.find_one({ doc = collection.find_one({
"$or": [ "$or": [
{"mix_name": mix_id}, {"mix_name": mix_id},
{"request_id": mix_id} {"request_id": mix_id}
@ -359,7 +446,7 @@ class MiniprogramAPI:
return {"success": False, "message": "未找到合集信息"} return {"success": False, "message": "未找到合集信息"}
# 格式化详细信息 - 只返回数据库原始字段 # 格式化详细信息 - 只返回数据库原始字段
detail = self.format_mix_item(doc) detail = format_mix_item(doc)
return { return {
"success": True, "success": True,
@ -371,11 +458,11 @@ class MiniprogramAPI:
logging.error(f"获取合集详情失败: {e}") logging.error(f"获取合集详情失败: {e}")
return {"success": False, "message": f"获取详情失败: {str(e)}"} return {"success": False, "message": f"获取详情失败: {str(e)}"}
def get_statistics(self): def get_statistics():
"""获取统计信息""" """获取统计信息"""
try: try:
# 基本统计 # 基本统计
total_mixes = self.collection.count_documents({}) total_mixes = collection.count_documents({})
if total_mixes == 0: if total_mixes == 0:
return {"success": False, "message": "暂无数据"} return {"success": False, "message": "暂无数据"}
@ -393,16 +480,16 @@ class MiniprogramAPI:
} }
] ]
stats_result = list(self.collection.aggregate(pipeline)) stats_result = list(collection.aggregate(pipeline))
stats = stats_result[0] if stats_result else {} stats = stats_result[0] if stats_result else {}
# 获取最新更新时间 # 获取最新更新时间
latest_doc = self.collection.find().sort("batch_time", -1).limit(1) latest_doc = collection.find().sort("batch_time", -1).limit(1)
latest_time = "" latest_time = ""
if latest_doc: if latest_doc:
latest_list = list(latest_doc) latest_list = list(latest_doc)
if latest_list: if latest_list:
latest_time = self.format_time(latest_list[0].get("batch_time")) latest_time = format_time(latest_list[0].get("batch_time"))
# 热门分类统计(按播放量区间) # 热门分类统计(按播放量区间)
categories = [ categories = [
@ -414,11 +501,11 @@ class MiniprogramAPI:
for category in categories: for category in categories:
if "max" in category: if "max" in category:
count = self.collection.count_documents({ count = collection.count_documents({
"play_vv": {"$gte": category["min"], "$lte": category["max"]} "play_vv": {"$gte": category["min"], "$lte": category["max"]}
}) })
else: else:
count = self.collection.count_documents({ count = collection.count_documents({
"play_vv": {"$gte": category["min"]} "play_vv": {"$gte": category["min"]}
}) })
category["count"] = count category["count"] = count
@ -441,10 +528,10 @@ class MiniprogramAPI:
logging.error(f"获取统计信息失败: {e}") logging.error(f"获取统计信息失败: {e}")
return {"success": False, "message": f"获取统计失败: {str(e)}"} return {"success": False, "message": f"获取统计失败: {str(e)}"}
def get_videos(self): # 路由定义
@rank_bp.route('/videos')
def get_videos():
"""获取合集列表 - 兼容app.py调用""" """获取合集列表 - 兼容app.py调用"""
from flask import request
page = int(request.args.get('page', 1)) page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 20)) limit = int(request.args.get('limit', 20))
sort_by = request.args.get('sort', 'playcount') sort_by = request.args.get('sort', 'playcount')
@ -452,48 +539,58 @@ class MiniprogramAPI:
if sort_by == 'growth': if sort_by == 'growth':
start_date = request.args.get('start_date') start_date = request.args.get('start_date')
end_date = request.args.get('end_date') end_date = request.args.get('end_date')
return self.get_growth_mixes(page, limit, start_date, end_date) result = get_growth_mixes(page, limit, start_date, end_date)
else: else:
return self.get_mix_list(page, limit, sort_by) result = get_mix_list(page, limit, sort_by)
def get_top(self): return jsonify(result)
@rank_bp.route('/top')
def get_top():
"""获取热门榜单 - 兼容app.py调用""" """获取热门榜单 - 兼容app.py调用"""
from flask import request
limit = int(request.args.get('limit', 10)) limit = int(request.args.get('limit', 10))
return self.get_top_mixes(limit) result = get_top_mixes(limit)
return jsonify(result)
def search(self): @rank_bp.route('/search')
def search():
"""搜索合集 - 兼容app.py调用""" """搜索合集 - 兼容app.py调用"""
from flask import request
keyword = request.args.get('q', '') keyword = request.args.get('q', '')
page = int(request.args.get('page', 1)) page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 10)) limit = int(request.args.get('limit', 10))
return self.search_mixes(keyword, page, limit) result = search_mixes(keyword, page, limit)
return jsonify(result)
def get_detail(self): @rank_bp.route('/detail')
def get_detail():
"""获取合集详情 - 兼容app.py调用""" """获取合集详情 - 兼容app.py调用"""
from flask import request
mix_id = request.args.get('id', '') mix_id = request.args.get('id', '')
return self.get_mix_detail(mix_id) result = get_mix_detail(mix_id)
return jsonify(result)
def get_stats(self): @rank_bp.route('/stats')
def get_stats():
"""获取统计信息 - 兼容app.py调用""" """获取统计信息 - 兼容app.py调用"""
return self.get_statistics() result = get_statistics()
return jsonify(result)
def health_check(self): @rank_bp.route('/health')
def health_check():
"""健康检查 - 兼容app.py调用""" """健康检查 - 兼容app.py调用"""
try: try:
from database import client
# 检查数据库连接 # 检查数据库连接
if not self.client: if not client:
return {"success": False, "message": "数据库未连接"} return jsonify({"success": False, "message": "数据库未连接"})
# 测试数据库连接 # 测试数据库连接
self.client.admin.command('ping') client.admin.command('ping')
# 获取数据统计 # 获取数据统计
total_count = self.collection.count_documents({}) total_count = collection.count_documents({})
return { return jsonify({
"success": True, "success": True,
"message": "服务正常", "message": "服务正常",
"data": { "data": {
@ -501,10 +598,249 @@ class MiniprogramAPI:
"total_records": total_count, "total_records": total_count,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
} }
} })
except Exception as e: except Exception as e:
logging.error(f"健康检查失败: {e}") logging.error(f"健康检查失败: {e}")
return {"success": False, "message": f"服务异常: {str(e)}"} return jsonify({"success": False, "message": f"服务异常: {str(e)}"})
# 创建API实例
api = MiniprogramAPI() # ==================== 榜单查询API接口 ====================
@rank_bp.route('/rankings')
def get_rankings():
"""获取榜单列表 - 支持按日期和类型查询,支持动态排序"""
try:
# 获取查询参数
date = request.args.get('date') # 日期格式YYYY-MM-DD
ranking_type = request.args.get('type') # 榜单类型playcount, growth, newcomer
sort_by = request.args.get('sort_by', 'default') # 排序方式default, play_vv_change, play_vv_change_rate, play_vv
sort_order = request.args.get('sort_order', 'desc') # 排序顺序asc, desc
page = int(request.args.get('page', 1))
limit = int(request.args.get('limit', 50))
# 构建查询条件
query = {}
if date:
query['date'] = date
if ranking_type:
query['ranking_type'] = ranking_type
# 如果没有指定日期,默认获取最新日期的榜单
if not date:
latest_ranking = daily_rankings_collection.find_one(
{}, sort=[('date', -1)]
)
if latest_ranking:
query['date'] = latest_ranking['date']
# 查询榜单
rankings = list(daily_rankings_collection.find(query).sort('generated_at', -1))
if not rankings:
return jsonify({
"success": True,
"message": "暂无榜单数据",
"data": {
"rankings": [],
"total": 0,
"page": page,
"limit": limit
}
})
# 格式化返回数据
formatted_rankings = []
for ranking in rankings:
ranking_data = ranking.get('data', [])
# 动态排序逻辑
if sort_by != 'default' and ranking_data:
ranking_data = sort_ranking_data(ranking_data, sort_by, sort_order)
# 分页处理榜单数据
start_idx = (page - 1) * limit
end_idx = start_idx + limit
paginated_data = ranking_data[start_idx:end_idx]
formatted_rankings.append({
"date": ranking.get('date'),
"ranking_type": ranking.get('ranking_type'),
"ranking_name": ranking.get('ranking_name'),
"description": ranking.get('description'),
"data": paginated_data,
"total_count": len(ranking_data),
"current_page_count": len(paginated_data),
"generated_at": format_time(ranking.get('generated_at')),
"version": ranking.get('version', '1.0'),
"sort_info": {
"sort_by": sort_by,
"sort_order": sort_order
}
})
return jsonify({
"success": True,
"message": "获取榜单成功",
"data": {
"rankings": formatted_rankings,
"total": len(formatted_rankings),
"page": page,
"limit": limit,
"sort_by": sort_by,
"sort_order": sort_order
}
})
except Exception as e:
logging.error(f"获取榜单失败: {e}")
return jsonify({"success": False, "message": f"获取榜单失败: {str(e)}"})
@rank_bp.route('/rankings/dates')
def get_ranking_dates():
"""获取可用的榜单日期列表"""
try:
# 获取所有不重复的日期
dates = daily_rankings_collection.distinct('date')
dates.sort(reverse=True) # 按日期倒序排列
return jsonify({
"success": True,
"message": "获取日期列表成功",
"data": {
"dates": dates,
"total": len(dates)
}
})
except Exception as e:
logging.error(f"获取日期列表失败: {e}")
return jsonify({"success": False, "message": f"获取日期列表失败: {str(e)}"})
@rank_bp.route('/rankings/types')
def get_ranking_types():
"""获取支持的榜单类型"""
try:
# 获取所有不重复的榜单类型
types = daily_rankings_collection.distinct('ranking_type')
# 添加类型说明
type_descriptions = {
'playcount': '播放量榜 - 按播放量排序',
'growth': '增长榜 - 播放量增长最快',
'newcomer': '新晋榜 - 新上榜内容'
}
formatted_types = []
for type_name in types:
formatted_types.append({
"type": type_name,
"description": type_descriptions.get(type_name, type_name)
})
return jsonify({
"success": True,
"message": "获取榜单类型成功",
"data": {
"types": formatted_types,
"total": len(formatted_types)
}
})
except Exception as e:
logging.error(f"获取榜单类型失败: {e}")
return jsonify({"success": False, "message": f"获取榜单类型失败: {str(e)}"})
@rank_bp.route('/rankings/latest')
def get_latest_rankings():
"""获取最新的所有类型榜单"""
try:
# 获取最新日期
latest_ranking = daily_rankings_collection.find_one(
{}, sort=[('date', -1)]
)
if not latest_ranking:
return jsonify({
"success": True,
"message": "暂无榜单数据",
"data": {
"date": None,
"rankings": []
}
})
latest_date = latest_ranking['date']
# 获取该日期的所有榜单
rankings = list(daily_rankings_collection.find({
'date': latest_date
}).sort('ranking_type', 1))
formatted_rankings = []
for ranking in rankings:
# 只返回前20条数据
ranking_data = ranking.get('data', [])[:20]
formatted_rankings.append({
"ranking_type": ranking.get('ranking_type'),
"ranking_name": ranking.get('ranking_name'),
"description": ranking.get('description'),
"data": ranking_data,
"total_count": ranking.get('total_count', 0),
"preview_count": len(ranking_data)
})
return jsonify({
"success": True,
"message": "获取最新榜单成功",
"data": {
"date": latest_date,
"rankings": formatted_rankings,
"total_types": len(formatted_rankings)
}
})
except Exception as e:
logging.error(f"获取最新榜单失败: {e}")
return jsonify({"success": False, "message": f"获取最新榜单失败: {str(e)}"})
@rank_bp.route('/rankings/stats')
def get_rankings_stats():
"""获取榜单统计信息"""
try:
# 统计总榜单数
total_rankings = daily_rankings_collection.count_documents({})
# 统计日期数量
total_dates = len(daily_rankings_collection.distinct('date'))
# 统计榜单类型数量
total_types = len(daily_rankings_collection.distinct('ranking_type'))
# 获取最新和最早日期
latest_ranking = daily_rankings_collection.find_one({}, sort=[('date', -1)])
earliest_ranking = daily_rankings_collection.find_one({}, sort=[('date', 1)])
latest_date = latest_ranking['date'] if latest_ranking else None
earliest_date = earliest_ranking['date'] if earliest_ranking else None
return jsonify({
"success": True,
"message": "获取榜单统计成功",
"data": {
"total_rankings": total_rankings,
"total_dates": total_dates,
"total_types": total_types,
"latest_date": latest_date,
"earliest_date": earliest_date,
"date_range": f"{earliest_date}{latest_date}" if earliest_date and latest_date else "暂无数据"
}
})
except Exception as e:
logging.error(f"获取榜单统计失败: {e}")
return jsonify({"success": False, "message": f"获取榜单统计失败: {str(e)}"})