commit 99ae97162c372aad15b05a49c86074dafc7a1686 Author: qiaoyirui0819 <3160533978@qq.com> Date: Mon Oct 20 14:58:36 2025 +0800 Initial commit: 排行榜服务端项目 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..05acbec --- /dev/null +++ b/.gitignore @@ -0,0 +1,59 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Logs +*.log +logs/ +miniprogram_api.log + +# Data files +douyin_cdp_play_vv_*.json +douyin_cdp_play_vv_*.txt + +# Chrome profiles and drivers +# 注意:Chrome profile 包含大量缓存文件,不应加入Git +scripts/config/chrome_profile/ +drivers/* +!drivers/chromedriver.exe + +# Rankings config directory +handlers/Rankings/config/ + +# Environment variables +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.vscode/ +.idea/ +*.swp +*.swo + +# OS +.DS_Store +Thumbs.db \ No newline at end of file diff --git a/Timer_worker.py b/Timer_worker.py new file mode 100644 index 0000000..05d0eb9 --- /dev/null +++ b/Timer_worker.py @@ -0,0 +1,363 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +抖音播放量自动抓取定时器 - 跨平台版本 + +功能: +- 每晚自动执行抖音播放量抓取任务 +- 数据抓取完成后自动生成各类榜单(播放量榜、增长榜、新晋榜、热门趋势榜) +- 支持Windows、macOS、Linux +- 自动保存数据到MongoDB + +使用方法: +- 正常模式:python Timer_worker.py(启动定时器) +- 测试模式:python Timer_worker.py --test(立即执行一次) +- 单次执行:python Timer_worker.py --once(立即执行一次并退出) +- 仅生成榜单:python Timer_worker.py --ranking-only(仅生成榜单,不抓取数据) +""" + +import schedule +import time +import sys +import os +import logging +import argparse +from pathlib import Path +from datetime import datetime, date +import config + +# 添加项目路径到 Python 路径 +sys.path.append(os.path.join(os.path.dirname(__file__), 'handlers', 'Rankings')) +from rank_data_scraper import DouyinPlayVVScraper + + + +# 配置日志的函数 +def setup_logging(): + """设置日志配置""" + # 确保logs目录存在 + import os + script_dir = os.path.dirname(os.path.abspath(__file__)) + logs_dir = os.path.join(script_dir, 'handlers', 'Rankings', 'logs') + os.makedirs(logs_dir, exist_ok=True) + + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler(os.path.join(logs_dir, 'scheduler.log'), encoding='utf-8'), + logging.StreamHandler() + ] + ) + +class DouyinAutoScheduler: + def __init__(self): + self.is_running = False + + def run_douyin_scraper(self): + """执行抖音播放量抓取任务""" + try: + logging.info("🚀 开始执行抖音播放量抓取任务...") + + # 设置环境变量,确保自动模式 + os.environ['AUTO_CONTINUE'] = '1' + + # 直接创建并运行 DouyinPlayVVScraper 实例 + scraper = DouyinPlayVVScraper( + start_url="https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation", + auto_continue=True, + duration_s=60 + ) + + logging.info("📁 开始执行抓取任务...") + scraper.run() + + logging.info("✅ 抖音播放量抓取任务执行成功") + + # 数据抓取完成后,自动生成当日榜单 + self.generate_daily_rankings() + + except Exception as e: + logging.error(f"💥 执行任务时发生异常: {e}") + import traceback + logging.error(f"详细错误信息: {traceback.format_exc()}") + + def generate_daily_rankings(self): + """生成每日榜单数据(基于时间轴对比)""" + try: + from database import db + from datetime import timedelta + + # 获取集合 + douyin_collection = db['Rankings_list'] # 使用真实抓取的数据 + rankings_collection = db['Ranking_storage'] + + today = date.today() + yesterday = today - timedelta(days=1) + today_str = today.strftime('%Y-%m-%d') + yesterday_str = yesterday.strftime('%Y-%m-%d') + + logging.info(f"📅 正在生成 {today_str} 的榜单(对比 {yesterday_str})...") + + # 删除当天已有的榜单数据 + 
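As a database-free sketch of what this function computes — deduplicate today's rows per `mix_name` keeping the highest `play_vv`, diff against yesterday's snapshot, then rank by the day-over-day change — the following is a minimal reduction; all sample names and numbers are fabricated:

```python
# Minimal reduction of generate_daily_rankings' core comparison; no MongoDB needed.
def build_growth_ranking(today_rows, yesterday_map):
    # Deduplicate by mix_name, keeping the row with the highest play_vv
    unique = {}
    for row in today_rows:
        name = row.get("mix_name", "")
        if name and (name not in unique or row.get("play_vv", 0) > unique[name].get("play_vv", 0)):
            unique[name] = row

    # Diff each surviving row against yesterday's snapshot
    ranked = []
    for name, row in unique.items():
        prev = yesterday_map.get(name)
        change = row.get("play_vv", 0) - prev["play_vv"] if prev else 0
        rate = round(change / prev["play_vv"] * 100, 2) if prev and prev["play_vv"] > 0 else 0
        ranked.append({
            "title": name,
            "play_vv": row.get("play_vv", 0),
            "play_vv_change": change,
            "play_vv_change_rate": rate,
            "is_new": prev is None,
        })

    # Larger day-over-day growth ranks higher
    ranked.sort(key=lambda x: x["play_vv_change"], reverse=True)
    for i, item in enumerate(ranked, 1):
        item["rank"] = i
    return ranked

if __name__ == "__main__":
    today = [
        {"mix_name": "剧A", "play_vv": 120},
        {"mix_name": "剧A", "play_vv": 150},  # duplicate; higher play_vv wins
        {"mix_name": "剧B", "play_vv": 80},
    ]
    yesterday = {"剧A": {"play_vv": 100}}
    for item in build_growth_ranking(today, yesterday):
        print(item)  # 剧A: change=50, rank=1; 剧B: is_new, rank=2
```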
rankings_collection.delete_many({"date": today_str}) + logging.info(f"🗑️ 已清理 {today_str} 的旧榜单数据") + + # 获取今天和昨天的榜单数据进行对比 + try: + logging.info("🔄 正在生成时间轴对比榜单...") + + # 获取今天的数据,按短剧名称去重,只保留播放量最高的 + today_videos_raw = list(douyin_collection.find({}).sort("play_vv", -1)) + + # 按短剧名称去重,每个短剧只保留播放量最高的一条 + unique_videos = {} + for video in today_videos_raw: + mix_name = video.get("mix_name", "") + if mix_name and (mix_name not in unique_videos or video.get("play_vv", 0) > unique_videos[mix_name].get("play_vv", 0)): + unique_videos[mix_name] = video + + today_videos = list(unique_videos.values()) + + logging.info(f"📊 今日数据去重后:{len(today_videos)} 个独特短剧(原始数据:{len(today_videos_raw)} 条)") + + # 获取昨天的榜单数据(如果存在),取最新的计算结果 + yesterday_ranking = rankings_collection.find_one({ + "date": yesterday_str, + "type": "comprehensive" + }, sort=[("calculation_sequence", -1)]) + + yesterday_data = {} + if yesterday_ranking and "data" in yesterday_ranking: + # 将昨天的数据转换为字典,以短剧名称为键 + for item in yesterday_ranking["data"]: + title = item.get("title", "") + if title: + yesterday_data[title] = { + "rank": item.get("rank", 0), + "play_vv": item.get("play_vv", 0), + "video_id": item.get("video_id", "") + } + logging.info(f"📊 找到昨天的榜单数据,共 {len(yesterday_data)} 个短剧") + else: + logging.info("📊 未找到昨天的榜单数据,将作为首次生成") + + if today_videos: + # 先计算所有视频的播放量差值 + videos_with_growth = [] + for video in today_videos: + video_id = str(video.get("_id", "")) + current_play_vv = video.get("play_vv", 0) + + # 计算与昨天的对比数据 + play_vv_change = 0 + play_vv_change_rate = 0 + is_new = True + + mix_name = video.get("mix_name", "") + if mix_name in yesterday_data: + is_new = False + yesterday_play_vv = yesterday_data[mix_name]["play_vv"] + + # 计算播放量变化 + play_vv_change = current_play_vv - yesterday_play_vv + if yesterday_play_vv > 0: + play_vv_change_rate = round((play_vv_change / yesterday_play_vv) * 100, 2) + + # 创建包含增长数据的视频项 + video_with_growth = { + "video": video, + "play_vv_change": play_vv_change, + "play_vv_change_rate": play_vv_change_rate, + "is_new": is_new, + "yesterday_data": yesterday_data.get(mix_name, {}) + } + videos_with_growth.append(video_with_growth) + + # 按播放量差值降序排序(差值越大排名越靠前) + videos_with_growth.sort(key=lambda x: x["play_vv_change"], reverse=True) + + comprehensive_ranking = { + "date": today_str, + "type": "comprehensive", + "name": "播放量增长榜单", + "description": f"基于 {yesterday_str} 和 {today_str} 播放量差值排序的榜单(差值越大排名越靠前)", + "comparison_date": yesterday_str, + "total_videos": len(videos_with_growth), + "data": [] + } + + # 生成排序后的榜单数据 + for i, item in enumerate(videos_with_growth, 1): + video = item["video"] + video_id = str(video.get("_id", "")) + current_play_vv = video.get("play_vv", 0) + mix_name = video.get("mix_name", "") + + # 计算排名变化(基于昨天的排名) + rank_change = 0 + if not item["is_new"] and item["yesterday_data"]: + yesterday_rank = item["yesterday_data"].get("rank", 0) + rank_change = yesterday_rank - i + + ranking_item = { + "rank": i, + "title": mix_name, + "play_vv": current_play_vv, + "author": video.get("author", ""), + "video_id": video_id, + "video_url": video.get("video_url", ""), + "cover_image_url": video.get("cover_image_url", ""), + "playcount_str": video.get("playcount", ""), + # 时间轴对比数据 + "timeline_data": { + "is_new": item["is_new"], + "rank_change": rank_change, + "play_vv_change": item["play_vv_change"], + "play_vv_change_rate": item["play_vv_change_rate"], + "yesterday_rank": item["yesterday_data"].get("rank", 0) if not item["is_new"] else 0, + "yesterday_play_vv": item["yesterday_data"].get("play_vv", 0) if not 
item["is_new"] else 0 + } + } + + comprehensive_ranking["data"].append(ranking_item) + + # 为每次计算添加唯一的时间戳,确保数据唯一性 + current_timestamp = datetime.now() + comprehensive_ranking["created_at"] = current_timestamp + comprehensive_ranking["calculation_id"] = f"{today_str}_{current_timestamp.strftime('%H%M%S')}" + + # 检查今天已有多少次计算 + existing_count = rankings_collection.count_documents({ + "date": today_str, + "type": "comprehensive" + }) + + comprehensive_ranking["calculation_sequence"] = existing_count + 1 + + # 总是插入新的榜单记录,保留所有历史计算数据 + rankings_collection.insert_one(comprehensive_ranking) + logging.info(f"📝 创建了新的今日榜单数据(第{existing_count + 1}次计算,包含最新差值)") + logging.info(f"🔖 计算ID: {comprehensive_ranking['calculation_id']}") + + # 统计信息 + new_count = sum(1 for item in comprehensive_ranking["data"] if item["timeline_data"]["is_new"]) + logging.info(f"✅ 时间轴对比榜单生成成功") + logging.info(f"📊 总计 {len(comprehensive_ranking['data'])} 条记录") + logging.info(f"🆕 新上榜 {new_count} 条") + logging.info(f"🔄 对比基准日期: {yesterday_str}") + + return True + else: + logging.warning("⚠️ 榜单生成失败:无今日数据") + return False + + except Exception as e: + logging.error(f"💥 生成时间轴对比榜单时发生异常: {e}") + import traceback + logging.error(f"详细错误信息: {traceback.format_exc()}") + return False + + except Exception as e: + logging.error(f"💥 生成榜单时发生异常: {e}") + import traceback + logging.error(f"详细错误信息: {traceback.format_exc()}") + + + + def setup_schedule(self): + """设置定时任务""" + # 从配置文件读取执行时间 + scheduler_time = config.SCHEDULER_TIME + schedule.every().day.at(scheduler_time).do(self.run_douyin_scraper) + + logging.info(f"⏰ 定时器已设置:每晚{scheduler_time}执行抖音播放量抓取") + + def show_next_run(self): + """显示下次执行时间""" + jobs = schedule.get_jobs() + if jobs: + next_run = jobs[0].next_run + logging.info(f"⏰ 下次执行时间: {next_run}") + + def run_once(self): + """立即执行一次""" + logging.info("🔧 立即执行模式...") + self.run_douyin_scraper() + + def run_test(self): + """测试模式 - 立即执行一次""" + logging.info("🧪 测试模式 - 立即执行抖音播放量抓取任务...") + self.run_douyin_scraper() + + def run_ranking_only(self): + """仅生成榜单(不抓取数据)""" + logging.info("📊 仅生成榜单模式...") + self.generate_daily_rankings() + + def start_scheduler(self): + """启动定时器""" + self.is_running = True + logging.info("🚀 抖音播放量自动抓取定时器已启动") + logging.info(f"⏰ 执行时间:每天{config.SCHEDULER_TIME}执行抖音播放量抓取") + logging.info("📁 目标脚本:rank_data_scraper.py") + logging.info("💾 数据保存:MongoDB") + logging.info("⏹️ 按 Ctrl+C 停止定时器") + + try: + while self.is_running: + schedule.run_pending() + time.sleep(1) + + # 每分钟显示一次状态 + if int(time.time()) % 60 == 0: + self.show_next_run() + + except KeyboardInterrupt: + logging.info("\n⏹️ 定时器已停止") + self.is_running = False + +def main(): + """主函数""" + import argparse + + try: + parser = argparse.ArgumentParser(description='抖音播放量自动抓取定时器') + parser.add_argument('--test', action='store_true', help='测试模式 - 立即执行一次') + parser.add_argument('--once', action='store_true', help='立即执行一次并退出') + parser.add_argument('--ranking-only', action='store_true', help='仅生成榜单(不抓取数据)') + + args = parser.parse_args() + + # 设置日志配置 + setup_logging() + + print("正在初始化定时器...") + scheduler = DouyinAutoScheduler() + + if args.test: + print("执行测试模式...") + scheduler.run_test() + elif args.once: + print("执行单次模式...") + scheduler.run_once() + elif args.ranking_only: + print("执行榜单生成模式...") + scheduler.run_ranking_only() + else: + print("启动定时器模式...") + scheduler.setup_schedule() + scheduler.start_scheduler() + + print("程序执行完成") + + except Exception as e: + print(f"程序执行出错: {e}") + import traceback + traceback.print_exc() + return 1 + + return 0 + +if __name__ == '__main__': + 
sys.exit(main())
\ No newline at end of file
diff --git a/app.py b/app.py
new file mode 100644
index 0000000..6499f93
--- /dev/null
+++ b/app.py
@@ -0,0 +1,32 @@
+from flask import Flask, jsonify
+from flask_cors import CORS
+import logging
+import os
+
+app = Flask(__name__)
+CORS(app)  # 允许跨域访问
+
+# 配置日志
+# 确保logs目录存在
+logs_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'handlers', 'Rankings', 'logs')
+os.makedirs(logs_dir, exist_ok=True)
+
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.FileHandler(os.path.join(logs_dir, 'app.log'), encoding='utf-8'),
+        logging.StreamHandler()
+    ]
+)
+
+# 导入并注册蓝图
+from routers.rank_api_routes import rank_bp
+app.register_blueprint(rank_bp)
+
+
+if __name__ == '__main__':
+    print("启动主程序服务...")
+    print("服务地址: http://localhost:5001")
+
+    app.run(host='0.0.0.0', port=5001, debug=True)
\ No newline at end of file
diff --git a/config.py b/config.py
new file mode 100644
index 0000000..035616d
--- /dev/null
+++ b/config.py
@@ -0,0 +1,20 @@
+import os
+import importlib
+
+# 数据库配置
+MONGO_URI = "mongodb://localhost:27017"
+# MONGO_URI = "mongodb://mongouser:Jdei2243afN@172.16.0.6:27017,172.16.0.4:27017/test?replicaSet=cmgo-r6qkaern_0&authSource=admin"
+MONGO_DB_NAME = "Rankings"
+
+# 应用配置
+APP_ENV = os.getenv('APP_ENV', 'development')
+DEBUG = APP_ENV == 'development'
+
+# 日志配置
+LOG_LEVEL = 'INFO'
+LOG_DIR = 'logs'
+
+# 定时器配置
+SCHEDULER_TIME = "00:00"  # 定时器执行时间,格式为 HH:MM(24小时制,取值范围 00:00-23:59;schedule 库不接受 "24:00",每晚零点应写作 "00:00")
+
+print(f"Successfully loaded configuration for environment: {APP_ENV}")
\ No newline at end of file
diff --git a/database.py b/database.py
new file mode 100644
index 0000000..098307c
--- /dev/null
+++ b/database.py
@@ -0,0 +1,22 @@
+from pymongo import MongoClient
+import config
+# from mongo_listeners import all_event_listeners  # 导入监听器(暂时注释掉,因为文件不存在)
+
+MONGO_URI = config.MONGO_URI
+DB_NAME = config.MONGO_DB_NAME
+
+# 创建MongoDB客户端连接
+try:
+    # 实例化MongoClient时传入事件监听器
+    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)  # 设置5秒超时
+    db = client[DB_NAME]
+    # 主动检查连接状态
+    client.admin.command('ping')
+    success_message = f"\033[92m成功连接到MongoDB: {DB_NAME}\033[0m"
+    print(success_message)
+except Exception as e:
+    error_message = f"\033[91m数据库连接失败: {MONGO_URI},请检查MongoDB服务是否已启动。\033[0m"
+    print(error_message)
+    # 连接失败时显式置空,避免导入方抛出 NameError;依赖方(如抓取脚本)可据此降级为仅本地保存
+    client = None
+    db = None
\ No newline at end of file
diff --git a/docs/API接口文档.md b/docs/API接口文档.md
new file mode 100644
index 0000000..a4ab59d
--- /dev/null
+++ b/docs/API接口文档.md
@@ -0,0 +1,511 @@
+# 抖音播放量数据API接口文档
+
+## 概述
+
+本API服务提供抖音播放量数据的查询、搜索、统计等功能,专为小程序优化设计。
+
+**基础信息**
+- 服务地址:`http://localhost:5001`
+- 数据源:MongoDB数据库
+- 数据更新:每天由定时任务自动更新(执行时间由 `config.SCHEDULER_TIME` 配置,默认 00:00)
+- 响应格式:JSON
+
+## 通用响应格式
+
+所有API接口都遵循以下响应格式:
+
+```json
+{
+  "success": true,
+  "data": {},
+  "message": "操作成功",
+  "update_time": "2025-10-17 15:30:00"
+}
+```
+
+**字段说明**
+- `success`: 请求是否成功
+- `data`: 返回的数据内容
+- `message`: 操作结果描述
+- `update_time`: 数据更新时间
+
+## 数据模型
+
+### 合集数据项
+```json
+{
+  "_id": "674f1234567890abcdef",
+  "batch_time": "2025-10-17 15:30:00",
+  "mix_name": "合集名称",
+  "video_url": "https://www.douyin.com/video/xxx",
+  "playcount": "1.2亿",
+  "play_vv": 120000000,
+  "request_id": "request_xxx",
+  "rank": 1,
+  "cover_image_url": "https://p3.douyinpic.com/xxx",
+  "cover_backup_urls": ["url1", "url2"]
+}
+```
+
+### 分页信息
+```json
+{
+  "pagination": {
+    "page": 1,
+    "limit": 20,
+    "total": 100,
+    "pages": 5,
+    "has_next": true,
+    "has_prev": false
+  }
+}
+```
+
+## API接口列表
+
+### 1. 
首页信息 + +**接口地址** +``` +GET / +``` + +**功能描述** +获取API服务的基本信息和所有可用接口列表 + +**请求参数** +无 + +**响应示例** +```json +{ + "success": true, + "data": { + "name": "抖音播放量数据API服务", + "version": "2.0", + "description": "主程序服务 - 整合小程序API功能", + "endpoints": { + "/api/rank/videos": "获取视频列表 (支持分页和排序)", + "/api/rank/top": "获取热门视频榜单", + "/api/rank/search": "搜索视频", + "/api/rank/detail": "获取视频详情", + "/api/rank/stats": "获取统计信息", + "/api/rank/health": "健康检查", + "/api/rank/rankings": "获取榜单列表" + }, + "features": [ + "分页支持", + "多种排序方式", + "搜索功能", + "详情查看", + "统计分析", + "小程序优化" + ] + } +} +``` + +### 2. 获取视频列表 + +**接口地址** +``` +GET /api/rank/videos +``` + +**功能描述** +获取视频合集列表,支持分页和排序 + +**请求参数** +| 参数名 | 类型 | 必填 | 默认值 | 说明 | +|--------|------|------|--------|------| +| page | int | 否 | 1 | 页码 | +| limit | int | 否 | 20 | 每页数量 | +| sort | string | 否 | playcount | 排序方式:playcount(播放量), growth(增长量) | +| start_date | string | 否 | 昨天 | 增长计算开始日期(YYYY-MM-DD) | +| end_date | string | 否 | 今天 | 增长计算结束日期(YYYY-MM-DD) | + +**排序说明** +- `playcount`: 按当前播放量从高到低排序 +- `growth`: 按播放量增长差值从大到小排序 + - 计算公式:增长值 = 结束日期播放量 - 开始日期播放量 + - 只显示增长为正数的合集 + - 排序规则:增长差值越大,排名越靠前 + +**使用示例** +``` +# 按播放量排序(当前播放量从高到低) +GET /api/rank/videos?page=1&limit=20&sort=playcount + +# 按增长量排序(播放量差值从大到小,默认昨天到今天的增长) +GET /api/rank/videos?page=1&limit=20&sort=growth + +# 按自定义日期范围的增长排序(播放量差值从大到小) +GET /api/rank/videos?page=1&limit=20&sort=growth&start_date=2025-10-16&end_date=2025-10-17 +``` + +**响应示例** + +播放量排序响应: +```json +{ + "success": true, + "data": [ + { + "_id": "674f1234567890abcdef", + "batch_time": "2025-10-17 15:30:00", + "mix_name": "热门合集1", + "video_url": "https://www.douyin.com/video/xxx", + "playcount": "1.2亿", + "play_vv": 120000000, + "request_id": "request_xxx", + "rank": 1, + "cover_image_url": "https://p3.douyinpic.com/xxx", + "cover_backup_urls": ["url1", "url2"] + } + ], + "pagination": { + "page": 1, + "limit": 20, + "total": 100, + "pages": 5, + "has_next": true, + "has_prev": false + }, + "sort_by": "playcount", + "update_time": "2025-10-17 15:30:00" +} +``` + +增长榜排序响应(包含额外的growth字段): +```json +{ + "success": true, + "data": [ + { + "_id": "674f1234567890abcdef", + "batch_time": "2025-10-17 15:30:00", + "mix_name": "热门合集1", + "video_url": "https://www.douyin.com/video/xxx", + "playcount": "1.2亿", + "play_vv": 120000000, + "request_id": "request_xxx", + "rank": 1, + "growth": 5000000, + "start_date": "2025-10-16", + "end_date": "2025-10-17", + "cover_image_url": "https://p3.douyinpic.com/xxx", + "cover_backup_urls": ["url1", "url2"] + } + ], + "pagination": { + "page": 1, + "limit": 20, + "total": 50, + "pages": 3, + "has_next": true, + "has_prev": false + }, + "sort_by": "growth", + "date_range": { + "start_date": "2025-10-16", + "end_date": "2025-10-17" + }, + "update_time": "2025-10-17 15:30:00" +} +``` + +### 3. 获取热门榜单 + +**接口地址** +``` +GET /api/rank/top +``` + +**功能描述** +获取热门视频榜单(按播放量排序) + +**请求参数** +| 参数名 | 类型 | 必填 | 默认值 | 说明 | +|--------|------|------|--------|------| +| limit | int | 否 | 10 | 返回数量 | + +**使用示例** +``` +GET /api/rank/top?limit=10 +``` + +**响应示例** +```json +{ + "success": true, + "data": [ + { + "_id": "674f1234567890abcdef", + "batch_time": "2025-10-17 15:30:00", + "mix_name": "热门合集1", + "video_url": "https://www.douyin.com/video/xxx", + "playcount": "1.2亿", + "play_vv": 120000000, + "request_id": "request_xxx", + "rank": 1, + "cover_image_url": "https://p3.douyinpic.com/xxx", + "cover_backup_urls": ["url1", "url2"] + } + ], + "total": 10, + "update_time": "2025-10-17 15:30:00" +} +``` + +### 4. 
搜索视频 + +**接口地址** +``` +GET /api/rank/search +``` + +**功能描述** +根据关键词搜索视频合集 + +**请求参数** +| 参数名 | 类型 | 必填 | 默认值 | 说明 | +|--------|------|------|--------|------| +| q | string | 是 | - | 搜索关键词 | +| page | int | 否 | 1 | 页码 | +| limit | int | 否 | 10 | 每页数量 | + +**使用示例** +``` +GET /api/rank/search?q=关键词&page=1&limit=10 +``` + +**响应示例** +```json +{ + "success": true, + "data": [ + { + "_id": "674f1234567890abcdef", + "batch_time": "2025-10-17 15:30:00", + "mix_name": "包含关键词的合集", + "video_url": "https://www.douyin.com/video/xxx", + "playcount": "1.2亿", + "play_vv": 120000000, + "request_id": "request_xxx", + "rank": 1, + "cover_image_url": "https://p3.douyinpic.com/xxx", + "cover_backup_urls": ["url1", "url2"] + } + ], + "keyword": "关键词", + "pagination": { + "page": 1, + "limit": 10, + "total": 15, + "pages": 2, + "has_next": true, + "has_prev": false + }, + "update_time": "2025-10-17 15:30:00" +} +``` + +### 5. 获取视频详情 + +**接口地址** +``` +GET /api/rank/detail +``` + +**功能描述** +获取指定合集的详细信息 + +**请求参数** +| 参数名 | 类型 | 必填 | 默认值 | 说明 | +|--------|------|------|--------|------| +| id | string | 是 | - | 合集ID(支持ObjectId、合集名称、request_id) | + +**使用示例** +``` +GET /api/rank/detail?id=674f1234567890abcdef +``` + +**响应示例** +```json +{ + "success": true, + "data": { + "_id": "674f1234567890abcdef", + "batch_time": "2025-10-17 15:30:00", + "mix_name": "合集名称", + "video_url": "https://www.douyin.com/video/xxx", + "playcount": "1.2亿", + "play_vv": 120000000, + "request_id": "request_xxx", + "rank": 1, + "cover_image_url": "https://p3.douyinpic.com/xxx", + "cover_backup_urls": ["url1", "url2"] + }, + "update_time": "2025-10-17 15:30:00" +} +``` + +### 6. 获取统计信息 + +**接口地址** +``` +GET /api/rank/stats +``` + +**功能描述** +获取系统统计信息 + +**请求参数** +无 + +**响应示例** +```json +{ + "success": true, + "data": { + "total_mixes": 1000, + "total_playcount": 5000000000, + "avg_playcount": 5000000, + "max_playcount": 200000000, + "min_playcount": 1000, + "categories": [ + { + "name": "超热门", + "min": 100000000, + "count": 5 + }, + { + "name": "热门", + "min": 50000000, + "max": 99999999, + "count": 20 + }, + { + "name": "中等", + "min": 10000000, + "max": 49999999, + "count": 150 + }, + { + "name": "一般", + "min": 0, + "max": 9999999, + "count": 825 + } + ], + "latest_update": "2025-10-17 15:30:00" + }, + "update_time": "2025-10-17 15:30:00" +} +``` + +### 7. 健康检查 + +**接口地址** +``` +GET /api/rank/health +``` + +**功能描述** +检查服务状态和数据库连接 + +**请求参数** +无 + +**响应示例** +```json +{ + "success": true, + "message": "服务正常", + "data": { + "database": "连接正常", + "total_records": 1000, + "timestamp": "2025-10-17 15:30:00" + } +} +``` + +## 错误处理 + +### 通用错误格式 +```json +{ + "success": false, + "message": "错误描述", + "update_time": "2025-10-17 15:30:00" +} +``` + +### 常见错误 +- `数据库连接失败`:MongoDB连接异常 +- `未找到合集信息`:查询的合集不存在 +- `请提供搜索关键词`:搜索接口缺少关键词参数 +- `获取数据失败`:数据查询异常 + +## 小程序使用建议 + +### 1. 分页加载 +推荐使用分页加载,避免一次性加载过多数据: +```javascript +// 小程序端示例 +wx.request({ + url: 'http://localhost:5001/api/rank/videos', + data: { + page: 1, + limit: 20, + sort: 'playcount' + }, + success: (res) => { + if (res.data.success) { + this.setData({ + videos: res.data.data, + hasNext: res.data.pagination.has_next + }) + } + } +}) +``` + +### 2. 搜索优化 +- 使用防抖处理搜索请求 +- 显示搜索进度和结果数量 +- 提供搜索建议 + +### 3. 图片加载 +- 优先使用 `cover_image_url` +- 备用 `cover_backup_urls` 作为备选 +- 添加图片加载失败处理 + +### 4. 
数据更新
+- 注意 `update_time` 字段,判断数据新鲜度
+- 合理使用缓存策略
+- 定期检查服务健康状态
+
+## 部署说明
+
+### 启动服务
+```bash
+cd C:\Users\EDY\Desktop\rank_backend
+python app.py
+```
+
+### 服务信息
+- 端口:5001
+- 数据库:MongoDB (localhost:27017)
+- 数据更新:每天由定时任务自动执行(时间由 `config.SCHEDULER_TIME` 配置,默认 00:00)
+
+### 注意事项
+- 确保MongoDB服务已启动
+- 确保网络连接正常
+- 小程序端需要配置合法域名(生产环境)
+
+---
+
+**文档版本**:v2.0
+**最后更新**:2025-10-17
+**维护者**:系统自动生成
\ No newline at end of file
diff --git a/docs/README.md b/docs/README.md
new file mode 100644
index 0000000..3358197
--- /dev/null
+++ b/docs/README.md
@@ -0,0 +1,101 @@
+# 排名系统(Rankings)说明大纲
+
+## 1. 项目概览
+- 提供抖音收藏合集真实播放量数据采集与API服务
+- 抓取脚本写入 MongoDB;API 按播放量与增长榜返回数据
+
+## 2. 目录速览(关键)
+- `handlers/Rankings/rank_data_scraper.py` 数据抓取脚本(Selenium+CDP)
+- `routers/rank_api_routes.py` 小程序 API 数据访问/逻辑模块(由 `app.py` 调用,不独立运行)
+- `app.py` 主服务入口(Flask应用,注册所有 API 路由)
+- `Timer_worker.py` 定时任务,每日自动运行抓取
+
+### 项目结构(简版)
+```
+项目根/
+├── app.py                 # 主服务入口(5001)
+├── Timer_worker.py        # 定时抓取任务
+├── config.py              # 全局配置
+├── database.py            # 数据库封装
+├── docs/                  # 项目文档
+│   ├── README.md          # 项目说明文档
+│   ├── API接口文档.md     # API接口说明
+│   └── requirements.txt   # 依赖包列表
+├── routers/
+│   └── rank_api_routes.py # 小程序API逻辑模块
+└── handlers/
+    └── Rankings/
+        ├── rank_data_scraper.py # 抓取脚本(Selenium+CDP)
+        └── drivers/             # 浏览器驱动等
+            └── chromedriver.exe # Chrome驱动程序
+```
+- 核心数据表:`Rankings/Rankings_list`
+- 日志示例:`handlers/Rankings/logs/douyin_scraper.log`
+
+## 3. 服务与端口
+- 单一服务:`app.py`(默认端口 `5001`,包含小程序 API 路由)
+
+## 4. 一键启动
+- 启动主服务:
+  ```bash
+  python app.py
+  ```
+- 启动定时任务(执行时间由 `config.SCHEDULER_TIME` 配置,默认每天 00:00 自动抓取):
+  ```bash
+  python Timer_worker.py
+  ```
+
+## 5. 使用步骤(首次登录与日常)
+- 安装依赖:
+  ```bash
+  pip install -r docs/requirements.txt
+  ```
+- 第一次使用(登录抖音):
+  - 运行抓取脚本:`python handlers/Rankings/rank_data_scraper.py`
+  - 弹出 Chrome 后,完成抖音登录(扫码/账号均可)。
+  - 登录完成后,回到终端提示界面按回车继续抓取。
+  - 后续运行会复用已登录的浏览器配置,免重复登录。
+
+- 日常流程:
+  - 抓取:`python handlers/Rankings/rank_data_scraper.py`
+  - 服务:`python app.py`(端口 `5001`)
+  - 定时:`python Timer_worker.py`(按 `config.SCHEDULER_TIME` 每日自动执行,默认 00:00)
+
+- 验证数据:
+  - MongoDB:数据库 `Rankings`,集合 `Rankings_list`
+  - API 检查:
+    - `http://localhost:5001/api/rank/health`
+    - `http://localhost:5001/api/rank/videos?page=1&limit=20&sort=playcount`
+    - 增长榜:`http://localhost:5001/api/rank/videos?sort=growth&page=1&limit=20`
+
+## 6. 数据抓取流程(简版)
+- 复用已登录的 Chrome 配置,滚动/刷新触发请求
+- 通过 CDP 捕获响应,解析 `play_vv` 与 SSR 数据
+- 按合集聚合视频,写入 MongoDB 指定集合
+
+## 7. 数据库与集合
+- 数据库:`Rankings`
+- 集合:`Rankings_list`
+- 连接:`mongodb://localhost:27017`(在 `config.py` 的 `MONGO_URI` 中配置)
+
+## 8. API 功能摘要
+- 视频列表(分页、按播放量/时间排序,仅当日最新数据)
+- 增长榜(按指定日期区间对比增长量,分页返回)
+
+## 9. 配置项
+- `config.py`:`MONGO_URI`(默认 `mongodb://localhost:27017`)、`MONGO_DB_NAME`(默认 `Rankings`)、`SCHEDULER_TIME`(默认 `00:00`)
+- 环境变量 `MONGO_COLLECTION`:抓取脚本写入的集合名,默认 `Rankings_list`
+- 环境变量 `OVERRIDE_CHROMEDRIVER`:强制指定 chromedriver 路径
+- 环境变量 `APP_ENV`:运行环境,默认 `development`
+
+## 10. 快速排错
+- MongoDB 连接失败:抓取脚本将仅保存本地文件日志
+- ChromeDriver 配置:`handlers/Rankings/drivers/chromedriver.exe`
+- 日志位置:`handlers/Rankings/logs/`(运行时自动创建)
+
+## 11. 
你需要知道的 +- 当前架构下使用单一服务端口 `5001`;`routers/rank_api_routes.py` 提供逻辑模块,由 `app.py` 注册路由并统一对外服务。 +- 抓取脚本与 API 使用同一集合,数据结构一致 +- 小程序 API 专注返回易用字段(封面、播放量、时间、链接) +- 可直接在现有数据上新增排序或过滤,保持接口向后兼容 +- ChromeDriver 已配置本地版本,避免网络下载问题 \ No newline at end of file diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 0000000..fd746aa --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1,20 @@ +# Web抓取相关 +selenium>=4.15.0 +chromedriver-autoinstaller>=0.6.0 +webdriver-manager>=4.0.0 + +# Web服务框架 +flask>=2.3.0 +flask-cors>=4.0.0 + +# 数据库 +pymongo>=4.5.0 + +# 定时任务 +schedule>=1.2.0 + +# 系统工具 +psutil>=5.9.0 + +# 数据处理 +pathlib2>=2.3.7; python_version<"3.4" \ No newline at end of file diff --git a/handlers/Rankings/drivers/chromedriver-win32.zip b/handlers/Rankings/drivers/chromedriver-win32.zip new file mode 100644 index 0000000..337f702 Binary files /dev/null and b/handlers/Rankings/drivers/chromedriver-win32.zip differ diff --git a/handlers/Rankings/drivers/chromedriver.exe b/handlers/Rankings/drivers/chromedriver.exe new file mode 100644 index 0000000..9225a08 Binary files /dev/null and b/handlers/Rankings/drivers/chromedriver.exe differ diff --git a/handlers/Rankings/rank_data_scraper.py b/handlers/Rankings/rank_data_scraper.py new file mode 100644 index 0000000..d3d8312 --- /dev/null +++ b/handlers/Rankings/rank_data_scraper.py @@ -0,0 +1,787 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Selenium + Chrome DevTools Protocol 抓取抖音收藏合集真实播放量(play_vv) + +核心能力: +- 启用CDP网络事件,获取响应体并解析play_vv +- 复用本地Chrome用户数据,绕过登录障碍 +- 自动滚动与刷新触发更多API请求 +- 同时解析页面中的SSR数据(window._SSR_HYDRATED_DATA/RENDER_DATA) + +使用方法: +1) 默认复用 `config/chrome_profile` 下的已登录Chrome配置。 +2) 若仍需登录,请在弹出的Chrome中完成登录后回到终端按回车。 +3) 程序会滚动和刷新,自动收集网络数据并提取play_vv。 +""" + +import json +import re +import subprocess +import time +import logging +import os +import shutil +from datetime import datetime + +from selenium import webdriver +import os +from selenium.webdriver.chrome.service import Service +from selenium.webdriver.chrome.options import Options +# 保留导入但默认不使用webdriver_manager,避免网络下载卡顿 +from webdriver_manager.chrome import ChromeDriverManager # noqa: F401 +import chromedriver_autoinstaller +import sys +import os +# 添加项目根目录到 Python 路径 +sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..')) +from database import db + + +# 配置日志 +# 确保logs目录存在 +import os +script_dir = os.path.dirname(os.path.abspath(__file__)) +logs_dir = os.path.join(script_dir, 'logs') +os.makedirs(logs_dir, exist_ok=True) + +logging.basicConfig( + level=logging.INFO, + format='[%(levelname)s] %(message)s', + handlers=[ + logging.FileHandler(os.path.join(logs_dir, 'douyin_scraper.log'), encoding='utf-8'), + logging.StreamHandler() + ] +) + + +class DouyinPlayVVScraper: + def __init__(self, start_url: str = None, auto_continue: bool = False, duration_s: int = 60): + self.start_url = start_url or "https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation" + self.auto_continue = auto_continue + self.duration_s = duration_s + self.driver = None + self.play_vv_items = [] # list of dicts: {play_vv, formatted, url, request_id, mix_name, watched_item} + self.captured_responses = [] + self.db = None + self.collection = None + self._cleanup_old_profiles() + self._setup_mongodb() + + def _setup_mongodb(self): + """设置MongoDB连接""" + try: + # 使用 database.py 中的连接 + self.db = db + + # 设置集合 + mongo_collection = os.environ.get('MONGO_COLLECTION', 'Rankings_list') + self.collection = self.db[mongo_collection] + + 
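As a quick way to eyeball what the scraper wrote (assuming the defaults from `config.py`: local MongoDB, database `Rankings`, collection `Rankings_list`), a short pymongo query:

```python
# Print the ten highest-play_vv mixes from the scraper's target collection.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017", serverSelectionTimeoutMS=5000)
coll = client["Rankings"]["Rankings_list"]
for doc in coll.find({}, {"_id": 0, "mix_name": 1, "play_vv": 1}).sort("play_vv", -1).limit(10):
    print(doc)
```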
logging.info(f'MongoDB连接成功,使用数据库: {self.db.name},集合: {mongo_collection}') + + except Exception as e: + logging.warning(f'MongoDB设置出错: {e}') + self.db = None + self.collection = None + + def _cleanup_old_profiles(self): + """清理超过一天的旧临时Chrome配置文件""" + try: + script_dir = os.path.dirname(os.path.abspath(__file__)) + profile_base_dir = os.path.join(script_dir, 'config', 'chrome_profile') + if not os.path.exists(profile_base_dir): + return + + current_time = time.time() + one_day_ago = current_time - 24 * 60 * 60 # 24小时前 + + for item in os.listdir(profile_base_dir): + if item.startswith('run_'): + item_path = os.path.join(profile_base_dir, item) + if os.path.isdir(item_path): + try: + # 提取时间戳 + timestamp = int(item.split('_')[1]) + if timestamp < one_day_ago: + shutil.rmtree(item_path, ignore_errors=True) + logging.info(f'清理旧配置文件: {item}') + except (ValueError, IndexError): + # 如果无法解析时间戳,跳过 + continue + except Exception as e: + logging.warning(f'清理旧配置文件时出错: {e}') + + def _cleanup_chrome_processes(self): + """清理可能占用配置文件的Chrome进程""" + try: + import subprocess + import psutil + + # 获取当前配置文件路径 + script_dir = os.path.dirname(os.path.abspath(__file__)) + profile_dir = os.path.join(script_dir, 'config', 'chrome_profile', 'douyin_persistent') + + # 查找使用该配置文件的Chrome进程 + killed_processes = [] + for proc in psutil.process_iter(['pid', 'name', 'cmdline']): + try: + if proc.info['name'] and 'chrome' in proc.info['name'].lower(): + cmdline = proc.info['cmdline'] + if cmdline and any(profile_dir in arg for arg in cmdline): + proc.terminate() + killed_processes.append(proc.info['pid']) + logging.info(f'终止占用配置文件的Chrome进程: PID {proc.info["pid"]}') + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + continue + + # 等待进程终止 + if killed_processes: + time.sleep(2) + + return len(killed_processes) > 0 + + except ImportError: + # 如果没有psutil,使用系统命令 + try: + result = subprocess.run(['taskkill', '/f', '/im', 'chrome.exe'], + capture_output=True, text=True, timeout=10) + if result.returncode == 0: + logging.info('使用taskkill清理Chrome进程') + time.sleep(2) + return True + except Exception as e: + logging.warning(f'清理Chrome进程失败: {e}') + return False + except Exception as e: + logging.warning(f'清理Chrome进程时出错: {e}') + return False + + def setup_driver(self): + logging.info('初始化Chrome WebDriver (启用CDP网络日志)') + + # 清理可能占用配置文件的Chrome进程 + self._cleanup_chrome_processes() + + chrome_options = Options() + chrome_options.add_argument('--no-sandbox') + chrome_options.add_argument('--disable-dev-shm-usage') + chrome_options.add_argument('--disable-blink-features=AutomationControlled') + chrome_options.add_experimental_option('excludeSwitches', ['enable-automation']) + chrome_options.add_experimental_option('useAutomationExtension', False) + chrome_options.add_argument('--disable-extensions') + chrome_options.add_argument('--remote-allow-origins=*') + chrome_options.add_argument('--remote-debugging-port=0') + chrome_options.add_argument('--start-maximized') + chrome_options.add_argument('--lang=zh-CN') + # 使用固定的Chrome配置文件目录以保持登录状态 + script_dir = os.path.dirname(os.path.abspath(__file__)) + profile_dir = os.path.join(script_dir, 'config', 'chrome_profile', 'douyin_persistent') + os.makedirs(profile_dir, exist_ok=True) + chrome_options.add_argument(f'--user-data-dir={profile_dir}') + logging.info(f'使用持久化Chrome配置文件: {profile_dir}') + # 明确设置Chrome二进制路径(32位Chrome常见安装位置) + possible_chrome_bins = [ + r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe", + r"C:\Program Files\Google\Chrome\Application\chrome.exe" + 
] + for bin_path in possible_chrome_bins: + if os.path.exists(bin_path): + chrome_options.binary_location = bin_path + logging.info(f'使用Chrome二进制路径: {bin_path}') + break + # 性能日志(Network事件) + chrome_options.set_capability('goog:loggingPrefs', {'performance': 'ALL'}) + + # 仅使用本地或PATH中的chromedriver,避免网络下载依赖 + driver_ready = False + candidates = [] + # 可通过环境变量强制覆盖驱动路径 + env_override = os.environ.get('OVERRIDE_CHROMEDRIVER') + if env_override: + candidates.append(env_override) + logging.info(f'检测到环境变量 OVERRIDE_CHROMEDRIVER,优先使用: {env_override}') + # 脚本所在目录的drivers路径(优先) + script_dir = os.path.dirname(os.path.abspath(__file__)) + script_driver_path = os.path.join(script_dir, 'drivers', 'chromedriver.exe') + candidates.append(script_driver_path) + logging.info(f'优先尝试脚本目录路径: {script_driver_path}') + + # 项目根目录的drivers路径 + user_driver_path = os.path.join(os.getcwd(), 'drivers', 'chromedriver.exe') + candidates.append(user_driver_path) + logging.info(f'尝试项目根目录路径: {user_driver_path}') + + # 项目根目录 + candidates.append(os.path.join(os.getcwd(), 'chromedriver.exe')) + # 其他可能目录 + candidates.append(os.path.join(os.getcwd(), 'drivers', 'chromedriver')) + # PATH 中的chromedriver + which_path = shutil.which('chromedriver') + if which_path: + candidates.append(which_path) + + if not driver_ready: + for p in candidates: + try: + if p and os.path.exists(p): + logging.info(f'尝试使用chromedriver: {p}') + service = Service(p) + self.driver = webdriver.Chrome(service=service, options=chrome_options) + driver_ready = True + logging.info(f'使用chromedriver启动成功: {p}') + try: + caps = self.driver.capabilities + browser_ver = caps.get('browserVersion') or caps.get('version') + cdver = caps.get('chrome', {}).get('chromedriverVersion') + logging.info(f'Chrome版本: {browser_ver}, ChromeDriver版本: {cdver}') + except Exception: + pass + break + else: + logging.info(f'候选路径不存在: {p}') + except Exception as e: + logging.warning(f'尝试使用 {p} 启动失败: {e}') + + if not driver_ready: + # 最终回退:使用webdriver-manager(可能需要网络) + try: + service = Service(ChromeDriverManager().install()) + self.driver = webdriver.Chrome(service=service, options=chrome_options) + driver_ready = True + logging.info('使用webdriver-manager成功启动ChromeDriver') + except Exception as e: + raise RuntimeError('未能启动ChromeDriver。请手动下载匹配版本的chromedriver到项目根目录或PATH,或检查网络以允许webdriver-manager下载。错误: ' + str(e)) + + # 反检测 + try: + self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})") + except Exception: + pass + + # 启用CDP Network + try: + self.driver.execute_cdp_cmd('Network.enable', {}) + logging.info('已启用CDP Network') + except Exception as e: + logging.warning(f'启用CDP Network失败: {e}') + + def navigate(self): + logging.info(f'导航到: {self.start_url}') + self.driver.get(self.start_url) + time.sleep(8) # 增加页面加载等待时间 + + def ensure_login(self): + """确保用户已登录并导航到收藏合集页面""" + logging.info("检测登录状态和页面位置...") + + # 首先检查是否已经登录并在正确页面 + if self._check_login_and_page(): + logging.info("检测到已登录且在收藏合集页面,跳过手动确认") + return + + # 如果未登录或不在正确页面,进行手动登录流程 + logging.info("请在弹出的浏览器中手动完成登录。") + + if self.auto_continue: + logging.info('自动继续模式,跳过手动等待...') + time.sleep(5) + return + + logging.info("进入手动登录确认循环...") + while True: + # 要求用户输入特定文本确认 + logging.info("等待用户输入确认...") + user_input = input("请在浏览器中完成登录,并导航到【我的】→【收藏】→【合集】页面。操作完成后,请在此处输入 'ok' 并按回车: ") + + if user_input.strip().lower() != 'ok': + logging.warning("请输入 'ok' 确认您已完成登录并导航到【我的】→【收藏】→【合集】页面。") + continue + + logging.info("用户已确认,检查当前页面...") + + try: + current_url = self.driver.current_url + logging.info(f"当前页面URL: 
{current_url}") + if ("douyin.com/user/self" in current_url and + ("favorite_collection" in current_url or "compilation" in current_url)): + logging.info(f"已确认您位于收藏合集列表页面: {current_url}") + logging.info("脚本将继续执行...") + break + else: + # 用户确认了,但页面不正确,继续循环等待 + logging.warning(f"检测到当前页面 ({current_url}) 并非收藏合集列表页面。请确保已导航至【我的】→【收藏】→【合集】页面。") + + except Exception as e: + if "browser has been closed" in str(e) or "no such window" in str(e) or "target window already closed" in str(e): + logging.error("浏览器窗口已关闭,脚本无法继续。") + raise RuntimeError("浏览器窗口已关闭") + logging.warning(f"检测URL时出错: {e}。请重试。") + time.sleep(1) + + def _check_login_and_page(self, timeout: int = 600) -> bool: + """检查是否已登录并在正确页面""" + try: + current_url = self.driver.current_url + logging.info(f"当前页面URL: {current_url}") + + # 检查是否在收藏合集页面 + if ("douyin.com/user/self" in current_url and + ("favorite_collection" in current_url or "compilation" in current_url)): + # 进一步检查登录状态 + return self._detect_login_status(timeout) + else: + # 如果不在正确页面,尝试导航到收藏合集页面 + if self._detect_login_status(timeout): + logging.info("已登录但不在收藏合集页面,自动导航...") + self.driver.get(self.start_url) + time.sleep(3) + return True + return False + except Exception as e: + logging.warning(f"检查登录状态时出错: {e}") + return False + + def _detect_login_status(self, timeout: int = 600) -> bool: + """自动检测是否已登录""" + try: + start = time.time() + while time.time() - start < timeout: + time.sleep(2) + # 检查登录状态的多个选择器 + selectors = [ + '[data-e2e="user-avatar"]', + '.user-avatar', + '[class*="avatar"]', + '[class*="Avatar"]' + ] + + for selector in selectors: + try: + elements = self.driver.find_elements("css selector", selector) + if elements: + logging.info("检测到用户头像,确认已登录") + return True + except Exception: + continue + + # 检查是否有登录按钮(表示未登录) + login_selectors = [ + '[data-e2e="login-button"]', + 'button[class*="login"]', + 'a[href*="login"]' + ] + + for selector in login_selectors: + try: + elements = self.driver.find_elements("css selector", selector) + if elements: + logging.info("检测到登录按钮,用户未登录") + return False + except Exception: + continue + + logging.info("登录状态检测超时,假设未登录") + return False + except Exception as e: + logging.warning(f"登录状态检测出错: {e}") + return False + + def trigger_loading(self): + logging.info('触发数据加载:滚动 + 刷新') + # 滚动触发懒加载 + for i in range(8): + self.driver.execute_script(f'window.scrollTo(0, {i * 900});') + time.sleep(1.2) + # 刷新触发新请求 + self.driver.refresh() + time.sleep(4) + for i in range(6): + self.driver.execute_script(f'window.scrollTo(0, {i * 1200});') + time.sleep(1.3) + + def format_count(self, n: int) -> str: + if n >= 100_000_000: + return f"{n/100_000_000:.1f}亿" + if n >= 10_000: + return f"{n/10_000:.1f}万" + return str(n) + + + + def parse_play_vv_from_text(self, text: str, source_url: str, request_id: str = None): + """解析文本中的play_vv、mix_name和watched_item信息""" + try: + # 尝试解析JSON数据 + if text.strip().startswith('{') or text.strip().startswith('['): + try: + data = json.loads(text) + self._extract_from_json_data(data, source_url, request_id) + return + except json.JSONDecodeError: + pass + + # 如果不是JSON,使用正则表达式查找 + self._extract_from_text_regex(text, source_url, request_id) + + except Exception as e: + logging.warning(f'解析文本数据时出错: {e}') + + def _extract_from_json_data(self, data, source_url: str, request_id: str = None): + """从JSON数据中递归提取合集信息""" + def extract_mix_info(obj, path=""): + if isinstance(obj, dict): + # 检查是否包含合集信息 + if 'mix_id' in obj and 'statis' in obj: + mix_id = obj.get('mix_id', '') + mix_name = obj.get('mix_name', '') + statis = obj.get('statis', {}) 
+ + # 调试:输出包含mix_id的完整对象结构(仅输出前3个) + if len(self.play_vv_items) < 3: + logging.info(f"=== 调试:合集对象结构 ===") + logging.info(f"完整对象键: {list(obj.keys())}") + # 查找可能的视频相关字段 + for key, value in obj.items(): + if 'aweme' in key.lower() or 'video' in key.lower() or 'item' in key.lower() or 'ids' in key.lower(): + logging.info(f"可能的视频字段 {key}: {type(value)} - {str(value)[:200]}") + + # 特别检查ids字段 + if 'ids' in obj: + ids_value = obj['ids'] + logging.info(f"ids字段详细信息: {type(ids_value)} - {ids_value}") + if isinstance(ids_value, list) and len(ids_value) > 0: + logging.info(f"ids列表长度: {len(ids_value)}") + logging.info(f"第一个ID: {ids_value[0]}") + if len(ids_value) > 1: + logging.info(f"第二个ID: {ids_value[1]}") + + if isinstance(statis, dict) and 'play_vv' in statis: + play_vv = statis.get('play_vv') + if isinstance(play_vv, (int, str)) and str(play_vv).isdigit(): + vv = int(play_vv) + # 构建合集链接 + video_url = f"https://www.douyin.com/collection/{mix_id}" if mix_id else "" + + # 提取合集封面图片URL - 直接存储完整的图片链接 + cover_image_url = "" + cover_image_backup_urls = [] # 备用链接列表 + + # 查找封面图片字段,优先获取完整的URL链接 + if 'cover' in obj: + cover = obj['cover'] + if isinstance(cover, dict) and 'url_list' in cover and cover['url_list']: + # 主链接 + cover_image_url = cover['url_list'][0] + # 备用链接 + cover_image_backup_urls = cover['url_list'][1:] if len(cover['url_list']) > 1 else [] + elif isinstance(cover, str): + cover_image_url = cover + elif 'cover_url' in obj: + cover_url = obj['cover_url'] + if isinstance(cover_url, dict) and 'url_list' in cover_url and cover_url['url_list']: + cover_image_url = cover_url['url_list'][0] + cover_image_backup_urls = cover_url['url_list'][1:] if len(cover_url['url_list']) > 1 else [] + elif isinstance(cover_url, str): + cover_image_url = cover_url + elif 'image' in obj: + image = obj['image'] + if isinstance(image, dict) and 'url_list' in image and image['url_list']: + cover_image_url = image['url_list'][0] + cover_image_backup_urls = image['url_list'][1:] if len(image['url_list']) > 1 else [] + elif isinstance(image, str): + cover_image_url = image + elif 'pic' in obj: + pic = obj['pic'] + if isinstance(pic, dict) and 'url_list' in pic and pic['url_list']: + cover_image_url = pic['url_list'][0] + cover_image_backup_urls = pic['url_list'][1:] if len(pic['url_list']) > 1 else [] + elif isinstance(pic, str): + cover_image_url = pic + + self.play_vv_items.append({ + 'play_vv': vv, + 'formatted': self.format_count(vv), + 'url': source_url, + 'request_id': request_id, + 'mix_name': mix_name, + 'video_url': video_url, # 合集链接 + 'mix_id': mix_id, # 合集ID + 'cover_image_url': cover_image_url, # 合集封面图片主链接(完整URL) + 'cover_backup_urls': cover_image_backup_urls, # 封面图片备用链接列表 + 'timestamp': datetime.now().isoformat() + }) + logging.info(f'提取到合集: {mix_name} (ID: {mix_id}) - {vv:,} 播放量') + + # 递归搜索子对象 + for key, value in obj.items(): + if isinstance(value, (dict, list)): + extract_mix_info(value, f"{path}.{key}" if path else key) + + elif isinstance(obj, list): + for i, item in enumerate(obj): + if isinstance(item, (dict, list)): + extract_mix_info(item, f"{path}[{i}]" if path else f"[{i}]") + + extract_mix_info(data) + + def _extract_from_text_regex(self, text: str, source_url: str, request_id: str = None): + """使用正则表达式从文本中提取信息""" + # 查找包含完整合集信息的JSON片段 + mix_pattern = r'\{[^{}]*"mix_id"\s*:\s*"([^"]*)"[^{}]*"mix_name"\s*:\s*"([^"]*)"[^{}]*"statis"\s*:\s*\{[^{}]*"play_vv"\s*:\s*(\d+)[^{}]*\}[^{}]*\}' + + for match in re.finditer(mix_pattern, text): + try: + mix_id = match.group(1) + mix_name = match.group(2) + vv = 
int(match.group(3)) + + # 构建合集链接 + video_url = f"https://www.douyin.com/collection/{mix_id}" if mix_id else "" + + self.play_vv_items.append({ + 'play_vv': vv, + 'formatted': self.format_count(vv), + 'url': source_url, + 'request_id': request_id, + 'mix_name': mix_name, + 'video_url': video_url, # 合集链接 + 'mix_id': mix_id, # 合集ID + 'timestamp': datetime.now().isoformat() + }) + logging.info(f'正则提取到合集: {mix_name} (ID: {mix_id}) - {vv:,} 播放量') + except Exception: + continue + + # 兜底:查找单独的play_vv值 + for match in re.findall(r'"play_vv"\s*:\s*(\d+)', text): + try: + vv = int(match) + # 检查是否已经存在相同的play_vv + if not any(item['play_vv'] == vv for item in self.play_vv_items): + self.play_vv_items.append({ + 'play_vv': vv, + 'formatted': self.format_count(vv), + 'url': source_url, + 'request_id': request_id, + 'mix_name': '', # 未知合集名称 + 'video_url': '', # 未知链接 + 'mix_id': '', # 未知mix_id + 'timestamp': datetime.now().isoformat() + }) + except Exception: + continue + + def collect_network_bodies(self, duration_s: int = None): + if duration_s is None: + duration_s = self.duration_s + logging.info(f'开始收集网络响应体,持续 {duration_s}s') + start = time.time() + known_request_ids = set() + + # 目标关键词(收藏/合集/视频) + url_keywords = ['aweme', 'mix', 'collection', 'favorite', 'note', 'api'] + + last_progress = 0 + while time.time() - start < duration_s: + try: + logs = self.driver.get_log('performance') + except Exception as e: + logging.warning(f'获取性能日志失败: {e}') + time.sleep(1) + continue + + for entry in logs: + try: + message = json.loads(entry['message'])['message'] + except Exception: + continue + + method = message.get('method') + params = message.get('params', {}) + + # 记录请求URL + if method == 'Network.requestWillBeSent': + req_id = params.get('requestId') + url = params.get('request', {}).get('url', '') + if any(k in url for k in url_keywords): + self.captured_responses.append({'requestId': req_id, 'url': url, 'type': 'request'}) + + # 响应到达,尝试获取响应体 + if method == 'Network.responseReceived': + req_id = params.get('requestId') + url = params.get('response', {}).get('url', '') + type_ = params.get('type') # XHR, Fetch, Document + if req_id and req_id not in known_request_ids: + known_request_ids.add(req_id) + # 仅处理XHR/Fetch + if type_ in ('XHR', 'Fetch') and any(k in url for k in url_keywords): + try: + body_obj = self.driver.execute_cdp_cmd('Network.getResponseBody', {'requestId': req_id}) + body_text = body_obj.get('body', '') + # 可能是base64编码 + if body_obj.get('base64Encoded'): + try: + import base64 + body_text = base64.b64decode(body_text).decode('utf-8', errors='ignore') + except Exception: + pass + + # 解析play_vv + self.parse_play_vv_from_text(body_text, url, req_id) + except Exception: + # 某些响应不可获取或过大 + pass + elapsed = int(time.time() - start) + if elapsed - last_progress >= 5: + last_progress = elapsed + logging.info(f'进度: {elapsed}/{duration_s}, 目标数量: {len(self.play_vv_items)}') + time.sleep(0.8) + + logging.info(f'网络收集完成,共发现 {len(self.play_vv_items)} 个目标') + + + def parse_ssr_data(self): + logging.info('尝试解析页面SSR数据') + # 尝试直接从window对象获取 + keys = ['_SSR_HYDRATED_DATA', 'RENDER_DATA'] + for key in keys: + try: + data = self.driver.execute_script(f'return window.{key}') + if data: + text = json.dumps(data, ensure_ascii=False) + self.parse_play_vv_from_text(text, f'page_{key}', None) + logging.info(f'从 {key} 中解析完成') + except Exception: + continue + + # 兜底:从page_source中正则查找 + try: + page_source = self.driver.page_source + self.parse_play_vv_from_text(page_source, 'page_source', None) + # 同时尝试识别statis结构中的play_vv + 
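The `statis` regex used in the fallback below, exercised on a fabricated page fragment:

```python
import re

fragment = '{"statis":{"collect_vv":10,"play_vv":123456,"digg_count":5}}'
pattern = r'"statis"\s*:\s*\{[^}]*"play_vv"\s*:\s*(\d+)[^}]*\}'
print([int(m) for m in re.findall(pattern, fragment)])  # [123456]
```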
for m in re.findall(r'"statis"\s*:\s*\{[^}]*"play_vv"\s*:\s*(\d+)[^}]*\}', page_source): + try: + vv = int(m) + # 检查是否已经存在相同的play_vv + if not any(item['play_vv'] == vv for item in self.play_vv_items): + self.play_vv_items.append({ + 'play_vv': vv, + 'formatted': self.format_count(vv), + 'url': 'page_source_statis', + 'request_id': None, + 'mix_name': '', # 从statis中无法获取合集名称 + 'video_url': '', # 从statis中无法获取链接 + 'timestamp': datetime.now().isoformat() + }) + except Exception: + pass + except Exception: + pass + + def dedupe(self): + # 去重按play_vv数值 + unique = [] + seen = set() + for item in self.play_vv_items: + vv = item['play_vv'] + if vv not in seen: + unique.append(item) + seen.add(vv) + self.play_vv_items = unique + + def save_results(self): + # 保存到MongoDB + self.save_to_mongodb() + + logging.info('结果已保存到MongoDB') + + def save_to_mongodb(self): + """将数据保存到MongoDB""" + if self.collection is None: + logging.warning('MongoDB未连接,跳过数据库保存') + return + + if not self.play_vv_items: + logging.info('没有数据需要保存到MongoDB') + return + + try: + batch_time = datetime.now() + documents = [] + + for item in self.play_vv_items: + # 保留用户要求的7个字段 + cover_image_url作为合集封面图片完整链接 + doc = { + 'batch_time': batch_time, + 'mix_name': item.get('mix_name', ''), + 'video_url': item.get('video_url', ''), + 'playcount': item.get('formatted', ''), + 'play_vv': item.get('play_vv', 0), + 'request_id': item.get('request_id', ''), + 'rank': 0, # 临时设置,后面会重新计算 + 'cover_image_url': item.get('cover_image_url', ''), # 合集封面图片主链接(完整URL) + 'cover_backup_urls': item.get('cover_backup_urls', []) # 封面图片备用链接列表 + } + documents.append(doc) + + # 按播放量降序排序并添加排名 + documents.sort(key=lambda x: x['play_vv'], reverse=True) + for i, doc in enumerate(documents, 1): + doc['rank'] = i + + # 批量插入 + result = self.collection.insert_many(documents) + logging.info(f'成功保存 {len(result.inserted_ids)} 条记录到MongoDB') + + # 输出统计信息 + total_play_vv = sum(doc['play_vv'] for doc in documents) + max_play_vv = max(doc['play_vv'] for doc in documents) if documents else 0 + + logging.info(f'MongoDB保存统计: 总播放量={total_play_vv:,}, 最高播放量={max_play_vv:,}') + logging.info(f'保存的字段: batch_time, mix_name, video_url, playcount, play_vv, request_id, rank, cover_image_url, cover_backup_urls') + + # 统计封面图片提取情况 + cover_count = sum(1 for doc in documents if doc.get('cover_image_url')) + backup_count = sum(1 for doc in documents if doc.get('cover_backup_urls')) + logging.info(f'封面图片统计: {cover_count}/{len(documents)} 个合集有主封面链接, {backup_count} 个合集有备用链接') + + except Exception as e: + logging.error(f'保存到MongoDB时出错: {e}') + + def run(self): + try: + self.setup_driver() + self.navigate() + self.ensure_login() + self.trigger_loading() + self.collect_network_bodies() + self.parse_ssr_data() + self.dedupe() + self.save_results() + logging.info('完成,play_vv数量: %d', len(self.play_vv_items)) + finally: + if self.driver: + try: + self.driver.quit() + except Exception: + pass + + +if __name__ == '__main__': + import argparse + parser = argparse.ArgumentParser(description='Selenium+CDP 抖音play_vv抓取器') + parser.add_argument('--url', default='https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation', help='收藏合集列表页面URL') + parser.add_argument('--auto', action='store_true', help='自动继续,跳过回车等待') + parser.add_argument('--duration', type=int, default=60, help='网络响应收集时长(秒)') + parser.add_argument('--driver', help='覆盖chromedriver路径') + args = parser.parse_args() + + if args.driver: + os.environ['OVERRIDE_CHROMEDRIVER'] = args.driver + if args.auto: + os.environ['AUTO_CONTINUE'] = '1' + + 
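Besides this CLI entry point, the scraper can be driven from another module, which is what `Timer_worker.py` does; a minimal sketch (URL and duration match the defaults used there):

```python
import os
import sys

# rank_data_scraper.py lives in handlers/Rankings/, so put that on sys.path first
sys.path.append(os.path.join(os.path.dirname(__file__), "handlers", "Rankings"))
from rank_data_scraper import DouyinPlayVVScraper

scraper = DouyinPlayVVScraper(
    start_url="https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation",
    auto_continue=True,  # skip the interactive login confirmation
    duration_s=60,       # seconds spent harvesting CDP response bodies
)
scraper.run()
```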
print('=== Selenium+CDP 抖音play_vv抓取器 ===') + scraper = DouyinPlayVVScraper(args.url, auto_continue=args.auto, duration_s=args.duration) + scraper.run() \ No newline at end of file diff --git a/routers/rank_api_routes.py b/routers/rank_api_routes.py new file mode 100644 index 0000000..ecbe162 --- /dev/null +++ b/routers/rank_api_routes.py @@ -0,0 +1,846 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +小程序专用抖音播放量数据API服务器 +优化的数据格式和接口设计,专为小程序使用 +""" + +from flask import Blueprint, request, jsonify +from datetime import datetime, timedelta +import logging +import re +from database import db + +# 创建蓝图 +rank_bp = Blueprint('rank', __name__, url_prefix='/api/rank') + +# 获取数据库集合 +collection = db['Rankings_list'] +daily_rankings_collection = db['Ranking_storage'] # 榜单存储表 + +def format_playcount(playcount_str): + """格式化播放量字符串为数字""" + if not playcount_str: + return 0 + + try: + if isinstance(playcount_str, (int, float)): + return int(playcount_str) + + playcount_str = str(playcount_str).strip() + + # 处理亿、万等单位 + if "亿" in playcount_str: + num = float(re.findall(r'[\d.]+', playcount_str)[0]) + return int(num * 100000000) + elif "万" in playcount_str: + num = float(re.findall(r'[\d.]+', playcount_str)[0]) + return int(num * 10000) + else: + # 尝试直接转换数字 + return int(float(playcount_str)) + except: + return 0 + +def format_cover_url(cover_data): + """格式化封面图片URL""" + if not cover_data: + return "" + + if isinstance(cover_data, str): + return cover_data + elif isinstance(cover_data, dict) and 'url_list' in cover_data: + return cover_data['url_list'][0] if cover_data['url_list'] else "" + else: + return "" + +def format_time(time_obj): + """格式化时间""" + if not time_obj: + return "" + + if isinstance(time_obj, datetime): + return time_obj.strftime("%Y-%m-%d %H:%M:%S") + else: + return str(time_obj) + +def sort_ranking_data(ranking_data, sort_by, sort_order='desc'): + """ + 对榜单数据进行动态排序 + + Args: + ranking_data: 榜单数据列表 + sort_by: 排序字段 (play_vv_change, play_vv_change_rate, play_vv, rank) + sort_order: 排序顺序 (asc, desc) + + Returns: + 排序后的榜单数据 + """ + try: + # 定义排序键函数 + def get_sort_key(item): + if sort_by == 'play_vv_change': + # 按播放量差值排序 + timeline_data = item.get('timeline_data', {}) + return timeline_data.get('play_vv_change', 0) + elif sort_by == 'play_vv_change_rate': + # 按播放量变化率排序 + timeline_data = item.get('timeline_data', {}) + return timeline_data.get('play_vv_change_rate', 0) + elif sort_by == 'play_vv': + # 按当前播放量排序 + return item.get('play_vv', 0) + elif sort_by == 'rank': + # 按排名排序 + return item.get('rank', 999999) + else: + # 默认按排名排序 + return item.get('rank', 999999) + + # 执行排序 + reverse = (sort_order == 'desc') + + # 对于排名字段,降序实际上是升序(排名越小越好) + if sort_by == 'rank': + reverse = (sort_order == 'asc') + + sorted_data = sorted(ranking_data, key=get_sort_key, reverse=reverse) + + # 重新分配排名 + for i, item in enumerate(sorted_data, 1): + item['current_sort_rank'] = i + + return sorted_data + + except Exception as e: + logging.error(f"排序榜单数据失败: {e}") + # 如果排序失败,返回原始数据 + return ranking_data + +def format_mix_item(doc): + """格式化合集数据项 - 完全按照数据库原始字段返回""" + return { + "_id": str(doc.get("_id", "")), + "batch_time": format_time(doc.get("batch_time")), + "mix_name": doc.get("mix_name", ""), + "video_url": doc.get("video_url", ""), + "playcount": doc.get("playcount", ""), + "play_vv": doc.get("play_vv", 0), + "request_id": doc.get("request_id", ""), + "rank": doc.get("rank", 0), + "cover_image_url": doc.get("cover_image_url", ""), + "cover_backup_urls": doc.get("cover_backup_urls", []) + } + +def 
get_mix_list(page=1, limit=20, sort_by="playcount"): + """获取合集列表(分页)""" + try: + # 计算跳过的数量 + skip = (page - 1) * limit + + # 设置排序字段 + if sort_by == "growth": + # 按增长排序需要特殊处理 + return get_growth_mixes(page, limit) + else: + sort_field = "play_vv" if sort_by == "playcount" else "batch_time" + sort_order = -1 # 降序 + + # 获取今天的日期 + today = datetime.now().date() + + # 只查询今天的数据 + query_condition = { + "batch_time": { + "$gte": datetime(today.year, today.month, today.day), + "$lt": datetime(today.year, today.month, today.day) + timedelta(days=1) + } + } + + # 查询数据并按短剧名称分组,取每个短剧的最新记录 + pipeline = [ + {"$match": query_condition}, + {"$sort": {"batch_time": -1}}, # 按时间倒序 + {"$group": { + "_id": "$mix_name", # 按短剧名称分组 + "latest_doc": {"$first": "$$ROOT"} # 取每个分组的第一条记录(最新记录) + }}, + {"$replaceRoot": {"newRoot": "$latest_doc"}}, + {"$sort": {sort_field: sort_order}}, + {"$skip": skip}, + {"$limit": limit} + ] + + docs = list(collection.aggregate(pipeline)) + + # 获取总数 + total_pipeline = [ + {"$match": query_condition}, + {"$sort": {"batch_time": -1}}, + {"$group": {"_id": "$mix_name"}}, + {"$count": "total"} + ] + total_result = list(collection.aggregate(total_pipeline)) + total = total_result[0]["total"] if total_result else 0 + + # 格式化数据 + mix_list = [] + for doc in docs: + item = format_mix_item(doc) + mix_list.append(item) + + return { + "success": True, + "data": mix_list, + "pagination": { + "page": page, + "limit": limit, + "total": total, + "pages": (total + limit - 1) // limit, + "has_next": page * limit < total, + "has_prev": page > 1 + }, + "sort_by": sort_by, + "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + } + + except Exception as e: + logging.error(f"获取合集列表失败: {e}") + return {"success": False, "message": f"获取数据失败: {str(e)}"} + +def get_growth_mixes(page=1, limit=20, start_date=None, end_date=None): + """获取按播放量增长排序的合集列表 - 优先从定时器生成的数据中读取""" + try: + # 计算跳过的数量 + skip = (page - 1) * limit + + # 如果没有提供日期,默认使用今天和昨天 + if not start_date or not end_date: + end_date = datetime.now().date() + start_date = end_date - timedelta(days=1) + else: + # 转换字符串日期为datetime对象 + if isinstance(start_date, str): + start_date = datetime.strptime(start_date, "%Y-%m-%d").date() + if isinstance(end_date, str): + end_date = datetime.strptime(end_date, "%Y-%m-%d").date() + + end_date_str = end_date.strftime("%Y-%m-%d") + start_date_str = start_date.strftime("%Y-%m-%d") + + # 优先尝试从定时器生成的增长榜数据中读取 + try: + growth_ranking = daily_rankings_collection.find_one({ + "date": end_date_str, + "type": "growth", + "start_date": start_date_str, + "end_date": end_date_str + }, sort=[("calculation_sequence", -1)]) # 获取最新的计算结果 + + if growth_ranking and "data" in growth_ranking: + logging.info(f"📈 从定时器生成的增长榜数据中读取 {end_date_str} 的增长榜") + + # 获取预先计算好的增长榜数据 + growth_data = growth_ranking["data"] + + # 分页处理 + total = len(growth_data) + paginated_data = growth_data[skip:skip + limit] + + return { + "success": True, + "data": paginated_data, + "pagination": { + "page": page, + "limit": limit, + "total": total, + "pages": (total + limit - 1) // limit, + "has_next": page * limit < total, + "has_prev": page > 1 + }, + "sort_by": "growth", + "date_range": { + "start_date": start_date_str, + "end_date": end_date_str + }, + "data_source": "timer_generated", # 标识数据来源 + "update_time": growth_ranking.get("created_at", datetime.now()).strftime("%Y-%m-%d %H:%M:%S") if isinstance(growth_ranking.get("created_at"), datetime) else str(growth_ranking.get("created_at", "")) + } + except Exception as e: + logging.warning(f"从定时器数据读取增长榜失败,将使用动态计算: 
{e}") + + # 如果定时器数据不存在或读取失败,回退到动态计算 + logging.info(f"📊 动态计算 {start_date_str} 到 {end_date_str} 的增长榜") + + # 查询结束日期的数据 + end_cursor = collection.find({ + "batch_time": { + "$gte": datetime(end_date.year, end_date.month, end_date.day), + "$lt": datetime(end_date.year, end_date.month, end_date.day) + timedelta(days=1) + } + }) + end_data = list(end_cursor) + + # 查询开始日期的数据 + start_cursor = collection.find({ + "batch_time": { + "$gte": datetime(start_date.year, start_date.month, start_date.day), + "$lt": datetime(start_date.year, start_date.month, start_date.day) + timedelta(days=1) + } + }) + start_data = list(start_cursor) + + # 创建字典以便快速查找 + end_dict = {item["mix_name"]: item for item in end_data} + start_dict = {item["mix_name"]: item for item in start_data} + + # 计算增长数据 + growth_data = [] + for mix_name, end_item in end_dict.items(): + if mix_name in start_dict: + start_item = start_dict[mix_name] + growth = end_item.get("play_vv", 0) - start_item.get("play_vv", 0) + + # 只保留增长为正的数据 + if growth > 0: + item = format_mix_item(end_item) + item["growth"] = growth + item["start_date"] = start_date_str + item["end_date"] = end_date_str + growth_data.append(item) + else: + # 如果开始日期没有数据,但结束日期有,也认为是新增长 + item = format_mix_item(end_item) + item["growth"] = end_item.get("play_vv", 0) + item["start_date"] = start_date_str + item["end_date"] = end_date_str + growth_data.append(item) + + # 按增长值降序排序 + growth_data.sort(key=lambda x: x.get("growth", 0), reverse=True) + + # 分页处理 + total = len(growth_data) + paginated_data = growth_data[skip:skip + limit] + + # 添加排名 + for i, item in enumerate(paginated_data): + item["rank"] = skip + i + 1 + + return { + "success": True, + "data": paginated_data, + "pagination": { + "page": page, + "limit": limit, + "total": total, + "pages": (total + limit - 1) // limit, + "has_next": page * limit < total, + "has_prev": page > 1 + }, + "sort_by": "growth", + "date_range": { + "start_date": start_date_str, + "end_date": end_date_str + }, + "data_source": "dynamic_calculation", # 标识数据来源 + "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + } + + except Exception as e: + logging.error(f"获取增长合集列表失败: {e}") + # 如果增长计算失败,返回按播放量排序的数据作为备选 + return get_mix_list(page, limit, "playcount") + +def get_top_mixes(limit=10): + """获取热门合集(TOP榜单)""" + try: + # 按播放量排序获取热门合集 + cursor = collection.find().sort("play_vv", -1).limit(limit) + docs = list(cursor) + + if not docs: + return {"success": False, "message": "暂无数据"} + + # 格式化数据 + top_list = [] + for doc in docs: + item = format_mix_item(doc) + top_list.append(item) + + return { + "success": True, + "data": top_list, + "total": len(top_list), + "update_time": format_time(docs[0].get("batch_time")) if docs else "" + } + + except Exception as e: + logging.error(f"获取热门合集失败: {e}") + return {"success": False, "message": f"获取数据失败: {str(e)}"} + +def search_mixes(keyword, page=1, limit=10): + """搜索合集""" + try: + if not keyword: + return {"success": False, "message": "请提供搜索关键词"} + + # 计算跳过的数量 + skip = (page - 1) * limit + + # 构建搜索条件(模糊匹配合集名称) + search_condition = { + "mix_name": {"$regex": keyword, "$options": "i"} + } + + # 查询数据 + cursor = collection.find(search_condition).sort("play_vv", -1).skip(skip).limit(limit) + docs = list(cursor) + + # 获取搜索结果总数 + total = collection.count_documents(search_condition) + + # 格式化数据 + search_results = [] + for doc in docs: + item = format_mix_item(doc) + search_results.append(item) + + return { + "success": True, + "data": search_results, + "keyword": keyword, + "pagination": { + "page": page, + "limit": 
+
+def get_mix_detail(mix_id):
+    """Get the details of a single mix."""
+    try:
+        from bson import ObjectId
+
+        # Try an ObjectId lookup first
+        try:
+            doc = collection.find_one({"_id": ObjectId(mix_id)})
+        except Exception:
+            # Fall back to other identifying fields when the id is not a valid ObjectId
+            doc = collection.find_one({
+                "$or": [
+                    {"mix_name": mix_id},
+                    {"request_id": mix_id}
+                ]
+            })
+
+        if not doc:
+            return {"success": False, "message": "Mix not found"}
+
+        # Format the detail - return only the raw database fields
+        detail = format_mix_item(doc)
+
+        return {
+            "success": True,
+            "data": detail,
+            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        }
+
+    except Exception as e:
+        logging.error(f"Failed to fetch mix detail: {e}")
+        return {"success": False, "message": f"Failed to fetch detail: {str(e)}"}
+
+def get_statistics():
+    """Get aggregate statistics."""
+    try:
+        # Basic counts
+        total_mixes = collection.count_documents({})
+
+        if total_mixes == 0:
+            return {"success": False, "message": "No data available"}
+
+        # Play-count statistics
+        pipeline = [
+            {
+                "$group": {
+                    "_id": None,
+                    "total_playcount": {"$sum": "$play_vv"},
+                    "avg_playcount": {"$avg": "$play_vv"},
+                    "max_playcount": {"$max": "$play_vv"},
+                    "min_playcount": {"$min": "$play_vv"}
+                }
+            }
+        ]
+
+        stats_result = list(collection.aggregate(pipeline))
+        stats = stats_result[0] if stats_result else {}
+
+        # Most recent update time (a cursor is always truthy, so fetch a single
+        # document with find_one instead of testing the cursor itself)
+        latest_doc = collection.find_one(sort=[("batch_time", -1)])
+        latest_time = format_time(latest_doc.get("batch_time")) if latest_doc else ""
+
+        # Popularity buckets by play-count range
+        categories = [
+            {"name": "Ultra-hot", "min": 100000000, "count": 0},                # 100M+
+            {"name": "Hot", "min": 50000000, "max": 99999999, "count": 0},      # 50M-100M
+            {"name": "Medium", "min": 10000000, "max": 49999999, "count": 0},   # 10M-50M
+            {"name": "Average", "min": 0, "max": 9999999, "count": 0}           # below 10M
+        ]
+
+        for category in categories:
+            if "max" in category:
+                count = collection.count_documents({
+                    "play_vv": {"$gte": category["min"], "$lte": category["max"]}
+                })
+            else:
+                count = collection.count_documents({
+                    "play_vv": {"$gte": category["min"]}
+                })
+            category["count"] = count
+
+        return {
+            "success": True,
+            "data": {
+                "total_mixes": total_mixes,
+                "total_playcount": stats.get("total_playcount", 0),
+                "avg_playcount": int(stats.get("avg_playcount", 0)),
+                "max_playcount": stats.get("max_playcount", 0),
+                "min_playcount": stats.get("min_playcount", 0),
+                "categories": categories,
+                "latest_update": latest_time
+            },
+            "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        }
+
+    except Exception as e:
+        logging.error(f"Failed to fetch statistics: {e}")
+        return {"success": False, "message": f"Failed to fetch statistics: {str(e)}"}
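+
+# --- Editor's note (illustrative alternative, not part of the original commit) ---
+# The four count_documents() round trips above could be collapsed into a
+# single $bucket aggregation using the same play_vv boundaries; a sketch:
+#
+#     buckets = list(collection.aggregate([
+#         {"$bucket": {
+#             "groupBy": "$play_vv",
+#             # below 10M / 10M-50M / 50M-100M / 100M+
+#             "boundaries": [0, 10000000, 50000000, 100000000, float("inf")],
+#             "default": "other",
+#             "output": {"count": {"$sum": 1}}
+#         }}
+#     ]))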
"""搜索合集 - 兼容app.py调用""" + keyword = request.args.get('q', '') + page = int(request.args.get('page', 1)) + limit = int(request.args.get('limit', 10)) + result = search_mixes(keyword, page, limit) + return jsonify(result) + +@rank_bp.route('/detail') +def get_detail(): + """获取合集详情 - 兼容app.py调用""" + mix_id = request.args.get('id', '') + result = get_mix_detail(mix_id) + return jsonify(result) + +@rank_bp.route('/stats') +def get_stats(): + """获取统计信息 - 兼容app.py调用""" + result = get_statistics() + return jsonify(result) + +@rank_bp.route('/health') +def health_check(): + """健康检查 - 兼容app.py调用""" + try: + from database import client + + # 检查数据库连接 + if not client: + return jsonify({"success": False, "message": "数据库未连接"}) + + # 测试数据库连接 + client.admin.command('ping') + + # 获取数据统计 + total_count = collection.count_documents({}) + + return jsonify({ + "success": True, + "message": "服务正常", + "data": { + "database": "连接正常", + "total_records": total_count, + "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + } + }) + except Exception as e: + logging.error(f"健康检查失败: {e}") + return jsonify({"success": False, "message": f"服务异常: {str(e)}"}) + + +# ==================== 榜单查询API接口 ==================== + +@rank_bp.route('/rankings') +def get_rankings(): + """获取榜单列表 - 支持按日期和类型查询,支持动态排序""" + try: + # 获取查询参数 + date = request.args.get('date') # 日期,格式:YYYY-MM-DD + ranking_type = request.args.get('type') # 榜单类型:playcount, growth, newcomer + sort_by = request.args.get('sort_by', 'default') # 排序方式:default, play_vv_change, play_vv_change_rate, play_vv + sort_order = request.args.get('sort_order', 'desc') # 排序顺序:asc, desc + page = int(request.args.get('page', 1)) + limit = int(request.args.get('limit', 50)) + + # 构建查询条件 + query = {} + if date: + query['date'] = date + if ranking_type: + query['ranking_type'] = ranking_type + + # 如果没有指定日期,默认获取最新日期的榜单 + if not date: + latest_ranking = daily_rankings_collection.find_one( + {}, sort=[('date', -1)] + ) + if latest_ranking: + query['date'] = latest_ranking['date'] + + # 查询榜单 + rankings = list(daily_rankings_collection.find(query).sort('generated_at', -1)) + + if not rankings: + return jsonify({ + "success": True, + "message": "暂无榜单数据", + "data": { + "rankings": [], + "total": 0, + "page": page, + "limit": limit + } + }) + + # 格式化返回数据 + formatted_rankings = [] + for ranking in rankings: + ranking_data = ranking.get('data', []) + + # 动态排序逻辑 + if sort_by != 'default' and ranking_data: + ranking_data = sort_ranking_data(ranking_data, sort_by, sort_order) + + # 分页处理榜单数据 + start_idx = (page - 1) * limit + end_idx = start_idx + limit + paginated_data = ranking_data[start_idx:end_idx] + + formatted_rankings.append({ + "date": ranking.get('date'), + "ranking_type": ranking.get('ranking_type'), + "ranking_name": ranking.get('ranking_name'), + "description": ranking.get('description'), + "data": paginated_data, + "total_count": len(ranking_data), + "current_page_count": len(paginated_data), + "generated_at": format_time(ranking.get('generated_at')), + "version": ranking.get('version', '1.0'), + "sort_info": { + "sort_by": sort_by, + "sort_order": sort_order + } + }) + + return jsonify({ + "success": True, + "message": "获取榜单成功", + "data": { + "rankings": formatted_rankings, + "total": len(formatted_rankings), + "page": page, + "limit": limit, + "sort_by": sort_by, + "sort_order": sort_order + } + }) + + except Exception as e: + logging.error(f"获取榜单失败: {e}") + return jsonify({"success": False, "message": f"获取榜单失败: {str(e)}"}) + + +@rank_bp.route('/rankings/dates') +def 
+
+
+# ==================== Ranking query API ====================
+
+@rank_bp.route('/rankings')
+def get_rankings():
+    """List rankings - filter by date and type, with optional dynamic sorting."""
+    try:
+        # Query parameters
+        date_str = request.args.get('date')                    # date, format: YYYY-MM-DD
+        ranking_type = request.args.get('type')                # ranking type: playcount, growth, newcomer
+        sort_by = request.args.get('sort_by', 'default')       # default, play_vv_change, play_vv_change_rate, play_vv
+        sort_order = request.args.get('sort_order', 'desc')    # asc, desc
+        page = int(request.args.get('page', 1))
+        limit = int(request.args.get('limit', 50))
+
+        # Build the query
+        query = {}
+        if date_str:
+            query['date'] = date_str
+        if ranking_type:
+            query['ranking_type'] = ranking_type
+
+        # Without an explicit date, default to the most recent ranking date
+        if not date_str:
+            latest_ranking = daily_rankings_collection.find_one(
+                {}, sort=[('date', -1)]
+            )
+            if latest_ranking:
+                query['date'] = latest_ranking['date']
+
+        # Fetch the rankings
+        rankings = list(daily_rankings_collection.find(query).sort('generated_at', -1))
+
+        if not rankings:
+            return jsonify({
+                "success": True,
+                "message": "No ranking data available",
+                "data": {
+                    "rankings": [],
+                    "total": 0,
+                    "page": page,
+                    "limit": limit
+                }
+            })
+
+        # Format the response
+        formatted_rankings = []
+        for ranking in rankings:
+            ranking_data = ranking.get('data', [])
+
+            # Optional dynamic sorting
+            if sort_by != 'default' and ranking_data:
+                ranking_data = sort_ranking_data(ranking_data, sort_by, sort_order)
+
+            # Paginate within each ranking's entries
+            start_idx = (page - 1) * limit
+            end_idx = start_idx + limit
+            paginated_data = ranking_data[start_idx:end_idx]
+
+            formatted_rankings.append({
+                "date": ranking.get('date'),
+                "ranking_type": ranking.get('ranking_type'),
+                "ranking_name": ranking.get('ranking_name'),
+                "description": ranking.get('description'),
+                "data": paginated_data,
+                "total_count": len(ranking_data),
+                "current_page_count": len(paginated_data),
+                "generated_at": format_time(ranking.get('generated_at')),
+                "version": ranking.get('version', '1.0'),
+                "sort_info": {
+                    "sort_by": sort_by,
+                    "sort_order": sort_order
+                }
+            })
+
+        return jsonify({
+            "success": True,
+            "message": "Rankings fetched successfully",
+            "data": {
+                "rankings": formatted_rankings,
+                "total": len(formatted_rankings),
+                "page": page,
+                "limit": limit,
+                "sort_by": sort_by,
+                "sort_order": sort_order
+            }
+        })
+
+    except Exception as e:
+        logging.error(f"Failed to fetch rankings: {e}")
+        return jsonify({"success": False, "message": f"Failed to fetch rankings: {str(e)}"})
+
+
+@rank_bp.route('/rankings/dates')
+def get_ranking_dates():
+    """List the dates that have rankings."""
+    try:
+        # Distinct dates, newest first; ISO YYYY-MM-DD strings sort
+        # lexicographically in chronological order
+        dates = daily_rankings_collection.distinct('date')
+        dates.sort(reverse=True)
+
+        return jsonify({
+            "success": True,
+            "message": "Date list fetched successfully",
+            "data": {
+                "dates": dates,
+                "total": len(dates)
+            }
+        })
+
+    except Exception as e:
+        logging.error(f"Failed to fetch date list: {e}")
+        return jsonify({"success": False, "message": f"Failed to fetch date list: {str(e)}"})
+
+
+@rank_bp.route('/rankings/types')
+def get_ranking_types():
+    """List the supported ranking types."""
+    try:
+        # Distinct ranking types
+        types = daily_rankings_collection.distinct('ranking_type')
+
+        # Human-readable descriptions
+        type_descriptions = {
+            'playcount': 'Play-count ranking - sorted by play count',
+            'growth': 'Growth ranking - fastest-growing play counts',
+            'newcomer': 'Newcomer ranking - newly listed titles'
+        }
+
+        formatted_types = []
+        for type_name in types:
+            formatted_types.append({
+                "type": type_name,
+                "description": type_descriptions.get(type_name, type_name)
+            })
+
+        return jsonify({
+            "success": True,
+            "message": "Ranking types fetched successfully",
+            "data": {
+                "types": formatted_types,
+                "total": len(formatted_types)
+            }
+        })
+
+    except Exception as e:
+        logging.error(f"Failed to fetch ranking types: {e}")
+        return jsonify({"success": False, "message": f"Failed to fetch ranking types: {str(e)}"})
+
+
+@rank_bp.route('/rankings/latest')
+def get_latest_rankings():
+    """Get the latest rankings across all types."""
+    try:
+        # Most recent ranking date
+        latest_ranking = daily_rankings_collection.find_one(
+            {}, sort=[('date', -1)]
+        )
+
+        if not latest_ranking:
+            return jsonify({
+                "success": True,
+                "message": "No ranking data available",
+                "data": {
+                    "date": None,
+                    "rankings": []
+                }
+            })
+
+        latest_date = latest_ranking['date']
+
+        # All rankings for that date
+        rankings = list(daily_rankings_collection.find({
+            'date': latest_date
+        }).sort('ranking_type', 1))
+
+        formatted_rankings = []
+        for ranking in rankings:
+            # Return only the first 20 entries as a preview
+            ranking_data = ranking.get('data', [])[:20]
+
+            formatted_rankings.append({
+                "ranking_type": ranking.get('ranking_type'),
+                "ranking_name": ranking.get('ranking_name'),
+                "description": ranking.get('description'),
+                "data": ranking_data,
+                "total_count": ranking.get('total_count', 0),
+                "preview_count": len(ranking_data)
+            })
+
+        return jsonify({
+            "success": True,
+            "message": "Latest rankings fetched successfully",
+            "data": {
+                "date": latest_date,
+                "rankings": formatted_rankings,
+                "total_types": len(formatted_rankings)
+            }
+        })
+
+    except Exception as e:
+        logging.error(f"Failed to fetch latest rankings: {e}")
+        return jsonify({"success": False, "message": f"Failed to fetch latest rankings: {str(e)}"})
+
+
+@rank_bp.route('/rankings/stats')
+def get_rankings_stats():
+    """Get summary statistics over the stored rankings."""
+    try:
+        # Total number of ranking documents
+        total_rankings = daily_rankings_collection.count_documents({})
+
+        # Number of distinct dates
+        total_dates = len(daily_rankings_collection.distinct('date'))
+
+        # Number of distinct ranking types
+        total_types = len(daily_rankings_collection.distinct('ranking_type'))
+
+        # Newest and oldest dates
+        latest_ranking = daily_rankings_collection.find_one({}, sort=[('date', -1)])
+        earliest_ranking = daily_rankings_collection.find_one({}, sort=[('date', 1)])
+
+        latest_date = latest_ranking['date'] if latest_ranking else None
+        earliest_date = earliest_ranking['date'] if earliest_ranking else None
+
+        return jsonify({
+            "success": True,
+            "message": "Ranking statistics fetched successfully",
+            "data": {
+                "total_rankings": total_rankings,
+                "total_dates": total_dates,
+                "total_types": total_types,
+                "latest_date": latest_date,
+                "earliest_date": earliest_date,
+                "date_range": f"{earliest_date} to {latest_date}" if earliest_date and latest_date else "No data"
+            }
+        })
+
+    except Exception as e:
+        logging.error(f"Failed to fetch ranking statistics: {e}")
+        return jsonify({"success": False, "message": f"Failed to fetch ranking statistics: {str(e)}"})
\ No newline at end of file