commit 53160420d12f43d4e9b29f551d236508ae9fd447 Author: Qyir <13521889462@163.com> Date: Fri Oct 17 10:48:52 2025 +0800 Initial commit: Douyin play count tracking system Features: - Douyin play count scraper using Selenium + Chrome DevTools Protocol - Automated scheduler for daily data collection - MongoDB data storage - Mini-program API server - Data analysis and visualization tools 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ebb1f33 --- /dev/null +++ b/.gitignore @@ -0,0 +1,54 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Logs +*.log +logs/ +miniprogram_api.log + +# Data files +douyin_cdp_play_vv_*.json +douyin_cdp_play_vv_*.txt + +# Chrome profiles and drivers +drivers/ +config/ + +# Environment variables +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.vscode/ +.idea/ +*.swp +*.swo + +# OS +.DS_Store +Thumbs.db \ No newline at end of file diff --git a/README_定时器.md b/README_定时器.md new file mode 100644 index 0000000..3ebe141 --- /dev/null +++ b/README_定时器.md @@ -0,0 +1,133 @@ +# 抖音播放量自动抓取定时器 - 跨平台版本 + +## 🎯 概述 + +这是一个跨平台的Python定时器,可以在任何电脑上运行,自动在每晚22:00执行抖音播放量抓取任务。 + +## 📋 支持平台 + +- ✅ **Windows** (Windows 10/11) +- ✅ **macOS** (Intel/Apple Silicon) +- ✅ **Linux** (Ubuntu/CentOS等) + +## 🚀 快速开始 + +### 1. 安装依赖 +```bash +pip install -r config/requirements.txt +pip install schedule +``` + +### 2. 启动定时器 +```bash +# 启动定时器 +python douyin_auto_scheduler.py + +# 测试模式(立即执行一次) +python douyin_auto_scheduler.py --test + +# 只执行一次 +python douyin_auto_scheduler.py --once + +# 显示帮助 +python douyin_auto_scheduler.py --help +``` + +### 3. 
停止定时器 +按 `Ctrl+C` 停止定时器 + +## 📁 文件说明 + +- `douyin_auto_scheduler.py` - 主定时器文件 +- `start_scheduler.py` - 简化启动脚本 +- `scripts/douyin_selenium_cdp_play_vv.py` - 抓取脚本 +- `config/requirements.txt` - Python依赖 + +## ⚙️ 配置说明 + +### 执行时间 +- **主执行时间**: 每晚 22:00 +- **备用执行时间**: 每晚 22:05 + +### 日志文件 +- 定时器日志: `logs/scheduler.log` +- 抓取任务日志: `logs/auto_update_YYYYMMDD.log` + +## 🔧 使用方法 + +### macOS/Linux 后台运行 +```bash +# 使用nohup在后台运行 +nohup python douyin_auto_scheduler.py > scheduler.log 2>&1 & + +# 查看进程 +ps aux | grep douyin_auto_scheduler + +# 停止进程 +pkill -f douyin_auto_scheduler +``` + +### Windows 后台运行 +```bash +# 使用start命令在后台运行 +start /B python douyin_auto_scheduler.py + +# 查看任务管理器中的Python进程 +``` + +### 首次使用 +1. 运行测试模式确保环境正常 +2. 手动登录抖音账号保存登录状态 +3. 启动定时器 + +## 🎯 功能特点 + +- ✅ **跨平台兼容** - 支持所有主流操作系统 +- ✅ **自动执行** - 每晚22:00自动运行 +- ✅ **状态监控** - 实时显示下次执行时间 +- ✅ **错误恢复** - 自动重试机制 +- ✅ **日志记录** - 详细的执行日志 +- ✅ **数据库保存** - 自动保存到MongoDB + +## 📞 故障排除 + +### 常见问题 + +1. **Python依赖问题** + ```bash + pip install --upgrade pip + pip install -r config/requirements.txt + pip install schedule + ``` + +2. **Chrome浏览器问题** + - 确保Chrome浏览器已安装 + - 确保chromedriver可用 + +3. **MongoDB连接问题** + - 确保MongoDB服务运行 + - 检查连接配置 + +4. **网络连接问题** + - 确保网络连接正常 + - 检查防火墙设置 + +### 日志检查 +```bash +# 查看定时器日志 +tail -f logs/scheduler.log + +# 查看抓取任务日志 +ls -la logs/auto_update_*.log +``` + +## 💡 提示 + +- 首次使用建议先运行测试模式 +- 确保计算机在22:00处于开机状态 +- 定期检查日志文件确认任务正常执行 +- 如需修改执行时间,编辑 `douyin_auto_scheduler.py` 中的时间设置 + +## 🎉 完成设置 + +设置完成后,系统将在每晚22:00自动执行,无需手动操作! \ No newline at end of file diff --git a/docs/API接口文档.md b/docs/API接口文档.md new file mode 100644 index 0000000..12056b1 --- /dev/null +++ b/docs/API接口文档.md @@ -0,0 +1,216 @@ +# 抖音播放量数据API接口文档 + +## 🚀 服务器信息 + +- **服务器地址**: `http://localhost:5000` 或 `http://你的服务器IP:5000` +- **协议**: HTTP +- **数据格式**: JSON +- **编码**: UTF-8 +- **跨域支持**: 已配置CORS,支持小程序调用 + +## 📋 API接口列表 + +### 1. 
获取最新播放量数据 + +**接口地址**: `GET /api/latest` + +**功能说明**: 获取最新一批的抖音剧本播放量数据,按排名排序 + +**请求参数**: +- `limit` (可选): 返回数据条数,默认50条 + +**请求示例**: +``` +GET /api/latest?limit=10 +``` + +**返回数据格式**: +```json +{ + "success": true, + "data": [ + { + "rank": 1, + "script_name": "九尾狐男妖爱上我更新37集", + "playcount": "2.1亿", + "playcount_number": 210000000.0, + "update_time": "2025-10-15 18:39:29" + } + ], + "total": 35, + "update_time": "2025-10-15 18:39:29" +} +``` + +### 2. 搜索剧本 + +**接口地址**: `GET /api/search` + +**功能说明**: 根据剧本名称进行模糊搜索 + +**请求参数**: +- `name` (必需): 搜索关键词 + +**请求示例**: +``` +GET /api/search?name=九尾狐 +``` + +**返回数据格式**: +```json +{ + "success": true, + "data": [ + { + "rank": 1, + "script_name": "九尾狐男妖爱上我更新37集", + "playcount": "2.1亿", + "playcount_number": 210000000.0, + "update_time": "2025-10-15 18:39:29" + } + ], + "total": 1, + "search_keyword": "九尾狐" +} +``` + +### 3. 获取热门剧本 + +**接口地址**: `GET /api/top` + +**功能说明**: 获取播放量最高的剧本列表 + +**请求参数**: +- `limit` (可选): 返回数据条数,默认10条 + +**请求示例**: +``` +GET /api/top?limit=5 +``` + +**返回数据格式**: +```json +{ + "success": true, + "data": [ + { + "rank": 1, + "script_name": "九尾狐男妖爱上我更新37集", + "playcount": "2.1亿", + "playcount_number": 210000000.0, + "update_time": "2025-10-15 18:39:29" + } + ], + "total": 5 +} +``` + +### 4. 
服务器状态 + +**接口地址**: `GET /api/status` + +**功能说明**: 获取API服务器和数据库状态 + +**请求示例**: +``` +GET /api/status +``` + +**返回数据格式**: +```json +{ + "success": true, + "mongodb_status": "连接正常", + "total_records": 35, + "latest_update": "2025-10-15 18:39:29", + "server_time": "2025-10-15 18:54:45" +} +``` + +## 📱 小程序调用示例 + +### 微信小程序示例代码 + +```javascript +// 获取最新数据 +wx.request({ + url: 'http://你的服务器IP:5000/api/latest', + method: 'GET', + data: { + limit: 20 + }, + success: function(res) { + console.log('获取数据成功:', res.data); + if (res.data.success) { + // 处理数据 + const scripts = res.data.data; + // 更新页面数据 + } + }, + fail: function(err) { + console.error('请求失败:', err); + } +}); + +// 搜索剧本 +wx.request({ + url: 'http://你的服务器IP:5000/api/search', + method: 'GET', + data: { + name: '九尾狐' + }, + success: function(res) { + if (res.data.success) { + const searchResults = res.data.data; + // 显示搜索结果 + } + } +}); +``` + +## 🔄 数据更新机制 + +1. **自动更新**: 每天24:00自动运行抓取脚本 +2. **实时同步**: 抓取完成后,API立即返回最新数据 +3. **排序规则**: 数据按播放量自动排序,最高播放量排第一 + +## 📊 数据字段说明 + +| 字段名 | 类型 | 说明 | +|--------|------|------| +| rank | number | 排名(1为最高) | +| script_name | string | 剧本名称 | +| playcount | string | 播放量文本(如"2.1亿") | +| playcount_number | number | 播放量数值(用于排序) | +| update_time | string | 更新时间 | + +## ⚠️ 注意事项 + +1. **服务器地址**: 请将`localhost`替换为实际的服务器IP地址 +2. **端口配置**: 默认端口5000,可在服务器代码中修改 +3. **数据更新**: 数据每天更新一次,建议小程序缓存数据 +4. **错误处理**: 请在小程序中添加网络错误处理逻辑 + +## 🛠️ 启动API服务器 + +```bash +# 安装依赖 +pip install -r requirements.txt + +# 启动服务器 +python douyin_api_server.py +``` + +服务器启动后会显示: +``` +🚀 启动抖音播放量API服务器... +📡 API地址: http://localhost:5000 +``` + +## 📞 技术支持 + +如有问题,请检查: +1. MongoDB是否正常运行 +2. 服务器端口是否被占用 +3. 网络连接是否正常 +4. 
数据是否已更新 \ No newline at end of file diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 0000000..4804a00 --- /dev/null +++ b/docs/README.md @@ -0,0 +1,51 @@ +# 抖音合集数据抓取工具 + +这是一个用于抓取抖音合集播放数据的Python脚本。 + +## 功能特点 + +- 使用Selenium处理动态加载的内容 +- 多种数据提取策略(页面源码、DOM元素、网络请求) +- 反爬虫机制规避 +- 错误处理和重试机制 +- 数据保存为JSON格式 + +## 安装依赖 + +```bash +pip3 install -r requirements.txt +``` + +## 使用方法 + +1. 确保已安装Chrome浏览器 +2. 运行脚本: + +```bash +python3 douyin_scraper.py +``` + +## 注意事项 + +1. **法律合规**: 请确保您的使用符合抖音的服务条款和相关法律法规 +2. **频率控制**: 避免过于频繁的请求,以免被反爬虫机制阻止 +3. **数据使用**: 抓取的数据仅供学习和研究使用,请勿用于商业用途 +4. **Chrome驱动**: 脚本会自动管理Chrome驱动,但请确保Chrome浏览器已安装 + +## 可能遇到的问题 + +1. **ChromeDriver问题**: 如果遇到驱动问题,请确保Chrome浏览器版本与ChromeDriver版本匹配 +2. **反爬虫限制**: 抖音有较强的反爬虫机制,可能需要调整请求频率或使用代理 +3. **页面结构变化**: 抖音页面结构可能会更新,导致数据提取失败,需要相应调整代码 + +## 输出数据格式 + +抓取的数据将保存为 `douyin_collection_data.json` 文件,包含: +- 合集标题 +- 播放数据 +- 视频列表信息 +- 统计数据 + +## 免责声明 + +本工具仅供学习和研究使用。使用者需要遵守相关法律法规和平台服务条款,作者不承担任何法律责任。 \ No newline at end of file diff --git a/docs/定时任务配置说明.md b/docs/定时任务配置说明.md new file mode 100644 index 0000000..a3172e6 --- /dev/null +++ b/docs/定时任务配置说明.md @@ -0,0 +1,190 @@ +# 抖音播放量自动更新定时任务配置说明 + +## 概述 +本文档详细说明如何配置Windows任务计划程序,实现每天晚上24:00(凌晨0点)自动运行抖音播放量抓取脚本。 + +## 前置条件 + +### 1. 确保MongoDB服务已安装并配置为自动启动 +- 下载并安装MongoDB Community Server +- 配置MongoDB服务为自动启动: + ```cmd + sc config MongoDB start= auto + ``` + +### 2. 确保Python环境正确 +- Python 3.7+ 已安装 +- 所需依赖包已安装: + ```cmd + pip install selenium pymongo webdriver-manager + ``` + +### 3. 浏览器登录状态保持 +- **重要**:首次运行前,需要手动运行脚本并登录抖音账号 +- 登录后,浏览器会保存登录状态到 `chrome_profile` 目录 +- 自动模式将使用已保存的登录状态 + +## 配置步骤 + +### 步骤1:创建日志目录 +在脚本目录下创建 `logs` 文件夹: +```cmd +mkdir c:\Users\EDY\Desktop\test\logs +``` + +### 步骤2:测试批处理脚本 +手动运行批处理脚本测试: +```cmd +cd c:\Users\EDY\Desktop\test +auto_update_douyin.bat +``` + +### 步骤3:打开任务计划程序 +1. 按 `Win + R`,输入 `taskschd.msc`,回车 +2. 或者在开始菜单搜索"任务计划程序" + +### 步骤4:创建基本任务 +1. 在右侧操作面板点击"创建基本任务" +2. 输入任务名称:`抖音播放量自动更新` +3. 
输入描述:`每天凌晨0点自动抓取抖音收藏合集播放量数据` + +### 步骤5:设置触发器 +1. 选择"每天" +2. 设置开始时间:`22:00:00`(晚上10点) +3. 设置开始日期:选择明天的日期 +4. 重复间隔:`每 1 天` + +### 步骤6:设置操作 +1. 选择"启动程序" +2. 程序或脚本:`c:\Users\EDY\Desktop\test\auto_update_douyin.bat` +3. 添加参数:`--silent` +4. 起始于:`c:\Users\EDY\Desktop\test` + +### 步骤7:高级设置 +1. 勾选"如果任务失败,重新启动" +2. 设置重新启动间隔:`5分钟` +3. 设置重新启动次数:`3次` +4. 勾选"如果请求后任务还在运行,强行将其停止" +5. 设置停止任务时间:`2小时` + +### 步骤8:条件设置 +1. 取消勾选"只有在计算机使用交流电源时才启动此任务" +2. 勾选"唤醒计算机运行此任务" +3. 勾选"如果任务运行时间超过以下时间,则停止任务:2小时" + +## 文件结构 +``` +c:\Users\EDY\Desktop\test\ +├── douyin_playcount_scraper.py # 主抓取脚本 +├── auto_update_douyin.bat # 批处理启动脚本 +├── query_mongodb_data.py # 数据查询脚本 +├── chrome_profile\ # 浏览器配置文件目录 +├── logs\ # 日志文件目录 +│ ├── auto_update_20241215.log # 每日日志文件 +│ └── ... +├── douyin_collection_data.json # JSON数据文件 +├── douyin_collection_playcounts.txt # TXT格式数据文件 +└── 定时任务配置说明.md # 本说明文档 +``` + +## 日志文件说明 + +### 日志位置 +- 批处理日志:`logs\auto_update_YYYYMMDD.log` +- Python程序日志:同一文件,包含详细的执行信息 + +### 日志内容 +- 任务开始和结束时间 +- MongoDB连接状态 +- 数据抓取进度 +- 文件保存结果 +- 错误信息(如有) + +## 数据存储说明 + +### MongoDB数据结构 +```json +{ + "batch_id": "20241215_000000_abc123", + "batch_time": "2024-12-15 00:00:00", + "name": "剧本名称", + "playcount": "1.2万", + "playcount_raw": 12000 +} +``` + +### 数据特点 +- 每次运行生成新的批次ID +- 最新数据排在前面(按时间倒序) +- 历史数据完整保留 +- 支持按剧本名称搜索历史数据 + +## 故障排除 + +### 常见问题 + +#### 1. MongoDB连接失败 +**症状**:日志显示"MongoDB连接失败" +**解决方案**: +```cmd +# 检查MongoDB服务状态 +sc query MongoDB + +# 启动MongoDB服务 +net start MongoDB +``` + +#### 2. 浏览器登录失效 +**症状**:抓取失败,提示需要登录 +**解决方案**: +1. 手动运行脚本:`python douyin_playcount_scraper.py` +2. 重新登录抖音账号 +3. 登录状态会自动保存到 `chrome_profile` 目录 + +#### 3. Python环境问题 +**症状**:批处理脚本报错"Python未安装" +**解决方案**: +1. 确认Python已安装:`python --version` +2. 确认Python在系统PATH中 +3. 重新安装依赖:`pip install -r requirements.txt` + +#### 4. 权限问题 +**症状**:任务计划程序无法执行 +**解决方案**: +1. 以管理员身份运行任务计划程序 +2. 设置任务以最高权限运行 +3. 确保用户账户有足够权限 + +### 监控和维护 + +#### 查看任务执行历史 +1. 打开任务计划程序 +2. 找到"抖音播放量自动更新"任务 +3. 
查看"历史记录"选项卡 + +#### 查看数据抓取结果 +运行查询脚本: +```cmd +python query_mongodb_data.py +``` + +#### 手动测试 +定期手动运行测试: +```cmd +python douyin_playcount_scraper.py --auto-mode +``` + +## 注意事项 + +1. **网络连接**:确保计算机在凌晨0点时有稳定的网络连接 +2. **电源管理**:建议设置计算机不要在夜间自动休眠 +3. **防火墙**:确保防火墙不会阻止浏览器和MongoDB的网络访问 +4. **磁盘空间**:定期清理旧的日志文件,避免占用过多磁盘空间 +5. **数据备份**:建议定期备份MongoDB数据库 + +## 联系支持 +如遇到问题,请检查: +1. 日志文件中的错误信息 +2. 任务计划程序的执行历史 +3. MongoDB服务状态 +4. 网络连接状态 \ No newline at end of file diff --git a/docs/需求.md b/docs/需求.md new file mode 100644 index 0000000..212f61b --- /dev/null +++ b/docs/需求.md @@ -0,0 +1,33 @@ +现在这是我的项目概览: +项目概览 + +- 目标:抓取抖音“合集”和“单个视频”的数据,用于分析。 +- 技术: selenium + webdriver-manager ,少量 requests ;支持无头模式与手动登录。 +- 能力:从页面源码 JSON、DOM 选择器等多源提取数据,含反爬规避与调试截图。 +脚本与输出 + +- douyin_scraper.py :抓取合集数据(标题、播放/点赞统计、视频列表);输出 douyin_collection_data.json 。 +- douyin_scraper_enhanced.py :合集增强版(手动登录、页面状态分析、截图);输出 douyin_collection_data_enhanced.json 。 +- douyin_video_scraper.py :抓取单视频互动数据(点赞/评论/分享/收藏/播放等)、基本信息与评论采样;输出 douyin_video_data.json ,保存页面截图。 +运行与依赖 + +- 依赖: requirements.txt ( selenium 、 requests 、 webdriver-manager )。 +- 运行示例: python3 douyin_scraper.py 、交互式增强版/视频版可选择手动登录。 +注意事项 + +- 合规与频控:遵守平台条款与法律,控制抓取频率。 +- 结构变更:抖音页面结构可能更新,需调整选择器/解析逻辑。 +- 环境匹配:确保本机 Chrome 与 ChromeDriver 版本兼容。 + +需求: +根据链接https://www.douyin.com/video/7556193043586600201 +后面的7556193043586600201是可以替换的,获取到所有的短剧集数视频列表 +7556193043586600201这个就是集数,每一集的这个都不一样 +进行爬取短剧的点赞数,评论数,收藏数,全部评论内容 + +总体来说就是输入视频的链接也就是这个https://www.douyin.com/video/7556193043586600201,然后就可以自动的获取到该系列所有的视频列表,有了列表之后只需要遍历列表,对每个视频进行单独的爬取,之后就可以获取到该视频的点赞数、收藏数、评论数、评论内容了,获取的视频列表一定要保存起来 + +但是现在有一个简单的要求,就是可以输入一个视频的链接,然后只对这一集来进行爬取点赞数、收藏数、评论数、评论内容 + +流程: +输入视频链接->自动识别短剧集数列表->通过列表抓取所有集数短剧视频->获取该系列每个视频的点赞数、收藏数、评论数、评论内容->打印出来即可 \ No newline at end of file diff --git a/scripts/check_mongodb.py b/scripts/check_mongodb.py new file mode 100644 index 0000000..847cb0f --- /dev/null +++ b/scripts/check_mongodb.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" 
+检查MongoDB数据保存状态 +""" + +from pymongo import MongoClient +from datetime import datetime +import sys + +def check_mongodb(): + """检查MongoDB连接和数据""" + try: + # 使用与主脚本相同的连接参数 + client = MongoClient('localhost', 27017, serverSelectionTimeoutMS=5000) + + # 测试连接 + client.admin.command('ping') + print("MongoDB连接成功") + + # 检查数据库和集合 + db = client['douyin_data'] + collection = db['play_vv_records'] + + total_count = collection.count_documents({}) + print(f"总记录数: {total_count}") + + # 检查今天的数据 + today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + today_count = collection.count_documents({'batch_time': {'$gte': today_start}}) + print(f"今天的数据记录数: {today_count}") + + # 显示最新5条记录(按时间倒序排列) + print("\n最新5条记录(按时间倒序排列):") + print("-" * 60) + for doc in collection.find().sort('batch_time', -1).limit(5): + print(f"合集名称: {doc.get('mix_name', '未知')}") + print(f"播放量: {doc.get('play_vv', 0):,} ({doc.get('playcount', '')})") + print(f"合集链接: {doc.get('video_url', '')}") + print(f"保存时间: {doc.get('batch_time', '')}") + print(f"视频ID数: {len(doc.get('aweme_ids', []))}") + print(f"封面图片: {'有' if doc.get('cover_image_url') else '无'}") + print("-" * 60) + + # 显示字段结构 + if total_count > 0: + sample = collection.find_one() + print(f"\n文档字段结构:") + for key in sample.keys(): + print(f" - {key}: {type(sample[key]).__name__}") + + except Exception as e: + print(f"检查MongoDB时出错: {e}") + return False + + return True + +if __name__ == '__main__': + print("=== MongoDB数据检查 ===") + success = check_mongodb() + if success: + print("\n检查完成") + else: + print("\n检查失败") + sys.exit(1) \ No newline at end of file diff --git a/scripts/douyin_auto_scheduler.py b/scripts/douyin_auto_scheduler.py new file mode 100644 index 0000000..85d1f42 --- /dev/null +++ b/scripts/douyin_auto_scheduler.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +抖音播放量自动抓取定时器 - 跨平台版本 + +功能: +- 每天上午9:35自动执行抖音播放量抓取任务 +- 支持Windows、macOS、Linux +- 自动保存数据到MongoDB +""" + +import schedule +import time 
+import subprocess +import sys +import os +import logging +from pathlib import Path +from datetime import datetime + +# 配置日志的函数 +def setup_logging(): + """设置日志配置""" + # 确保logs目录存在 + os.makedirs('../logs', exist_ok=True) + + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('../logs/scheduler.log', encoding='utf-8'), + logging.StreamHandler() + ] + ) + +class DouyinAutoScheduler: + def __init__(self): + self.is_running = False + + def run_douyin_scraper(self): + """执行抖音播放量抓取任务""" + try: + logging.info("🚀 开始执行抖音播放量抓取任务...") + + # 设置环境变量,确保自动模式 + os.environ['AUTO_CONTINUE'] = '1' + + # 构建脚本路径 - 现在在同一目录下 + script_path = Path(__file__).parent / 'douyin_selenium_cdp_play_vv.py' + + if not script_path.exists(): + logging.error(f"❌ 脚本文件不存在: {script_path}") + return + + logging.info(f"📁 执行脚本: {script_path}") + + # 使用subprocess执行脚本 + result = subprocess.run([ + sys.executable, + str(script_path), + '--auto', + '--duration', '60' + ], capture_output=True, text=True, encoding='utf-8', errors='ignore') + + if result.returncode == 0: + logging.info("✅ 抖音播放量抓取任务执行成功") + if result.stdout: + logging.info(f"📄 输出: {result.stdout.strip()}") + else: + logging.error(f"❌ 任务执行失败,返回码: {result.returncode}") + if result.stderr: + logging.error(f"💥 错误信息: {result.stderr.strip()}") + if result.stdout: + logging.info(f"📄 输出: {result.stdout.strip()}") + + except Exception as e: + logging.error(f"💥 执行任务时发生异常: {e}") + + def setup_schedule(self): + """设置定时任务""" + # 主执行时间:每天上午9:35 + schedule.every().day.at("09:35").do(self.run_douyin_scraper) + + logging.info("⏰ 定时器已设置:每天上午9:35执行抖音播放量抓取") + + def show_next_run(self): + """显示下次执行时间""" + jobs = schedule.get_jobs() + if jobs: + next_run = jobs[0].next_run + logging.info(f"⏰ 下次执行时间: {next_run}") + + def run_once(self): + """立即执行一次""" + logging.info("🔧 立即执行模式...") + self.run_douyin_scraper() + + def run_test(self): + """测试模式 - 立即执行一次""" + logging.info("🧪 测试模式 - 
立即执行抖音播放量抓取任务...") + self.run_douyin_scraper() + + def start_scheduler(self): + """启动定时器""" + self.is_running = True + logging.info("🚀 抖音播放量自动抓取定时器已启动") + logging.info("⏰ 执行时间:每天上午9:35") + logging.info("📁 目标脚本:douyin_selenium_cdp_play_vv.py") + logging.info("💾 数据保存:MongoDB") + logging.info("⏹️ 按 Ctrl+C 停止定时器") + + self.show_next_run() + + try: + while self.is_running: + schedule.run_pending() + time.sleep(1) + + # 每分钟显示一次状态 + if int(time.time()) % 60 == 0: + self.show_next_run() + + except KeyboardInterrupt: + logging.info("\n⏹️ 定时器已停止") + self.is_running = False + +def main(): + """主函数""" + import argparse + + parser = argparse.ArgumentParser(description='抖音播放量自动抓取定时器') + parser.add_argument('--test', action='store_true', help='测试模式 - 立即执行一次') + parser.add_argument('--once', action='store_true', help='立即执行一次并退出') + + args = parser.parse_args() + + # 设置日志配置 + setup_logging() + + scheduler = DouyinAutoScheduler() + + if args.test: + scheduler.run_test() + elif args.once: + scheduler.run_once() + else: + scheduler.setup_schedule() + scheduler.start_scheduler() + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/scripts/douyin_selenium_cdp_play_vv.py b/scripts/douyin_selenium_cdp_play_vv.py new file mode 100644 index 0000000..99ec123 --- /dev/null +++ b/scripts/douyin_selenium_cdp_play_vv.py @@ -0,0 +1,1052 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Selenium + Chrome DevTools Protocol 抓取抖音收藏合集真实播放量(play_vv) + +核心能力: +- 启用CDP网络事件,获取响应体并解析play_vv +- 复用本地Chrome用户数据,绕过登录障碍 +- 自动滚动与刷新触发更多API请求 +- 同时解析页面中的SSR数据(window._SSR_HYDRATED_DATA/RENDER_DATA) + +使用方法: +1) 默认复用 `config/chrome_profile` 下的已登录Chrome配置。 +2) 若仍需登录,请在弹出的Chrome中完成登录后回到终端按回车。 +3) 程序会滚动和刷新,自动收集网络数据并提取play_vv。 +""" + +import json +import re +import subprocess +import time +import logging +import os +import shutil +from datetime import datetime + +from selenium import webdriver +import os +from selenium.webdriver.chrome.service import Service +from 
selenium.webdriver.chrome.options import Options +# 保留导入但默认不使用webdriver_manager,避免网络下载卡顿 +from webdriver_manager.chrome import ChromeDriverManager # noqa: F401 +import chromedriver_autoinstaller +from pymongo import MongoClient +from pymongo.errors import ConnectionFailure + + +logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s') + + +class DouyinPlayVVScraper: + def __init__(self, start_url: str = None, auto_continue: bool = False, duration_s: int = 60): + self.start_url = start_url or "https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation" + self.auto_continue = auto_continue + self.duration_s = duration_s + self.driver = None + self.play_vv_items = [] # list of dicts: {play_vv, formatted, url, request_id, mix_name, watched_item} + self.captured_responses = [] + self.collected_aweme_ids = [] # 收集到的视频ID列表 + self.mix_aweme_mapping = {} # 合集ID到视频ID列表的映射 + self.mongo_client = None + self.db = None + self.collection = None + self._cleanup_old_profiles() + self._setup_mongodb() + + def _setup_mongodb(self): + """设置MongoDB连接""" + try: + # MongoDB连接配置 + mongo_host = os.environ.get('MONGO_HOST', 'localhost') + mongo_port = int(os.environ.get('MONGO_PORT', 27017)) + mongo_db = os.environ.get('MONGO_DB', 'douyin_data') + mongo_collection = os.environ.get('MONGO_COLLECTION', 'play_vv_records') + + # 创建MongoDB连接 + self.mongo_client = MongoClient(mongo_host, mongo_port, serverSelectionTimeoutMS=5000) + + # 测试连接 + self.mongo_client.admin.command('ping') + + # 设置数据库和集合 + self.db = self.mongo_client[mongo_db] + self.collection = self.db[mongo_collection] + + logging.info(f'MongoDB连接成功: {mongo_host}:{mongo_port}/{mongo_db}.{mongo_collection}') + + except ConnectionFailure as e: + logging.warning(f'MongoDB连接失败: {e}') + logging.info('将仅保存到本地文件') + self.mongo_client = None + self.db = None + self.collection = None + except Exception as e: + logging.warning(f'MongoDB设置出错: {e}') + self.mongo_client = None + self.db = None + 
self.collection = None + + def _cleanup_old_profiles(self): + """清理超过一天的旧临时Chrome配置文件""" + try: + profile_base_dir = os.path.abspath(os.path.join('.', 'config', 'chrome_profile')) + if not os.path.exists(profile_base_dir): + return + + current_time = time.time() + one_day_ago = current_time - 24 * 60 * 60 # 24小时前 + + for item in os.listdir(profile_base_dir): + if item.startswith('run_'): + item_path = os.path.join(profile_base_dir, item) + if os.path.isdir(item_path): + try: + # 提取时间戳 + timestamp = int(item.split('_')[1]) + if timestamp < one_day_ago: + shutil.rmtree(item_path, ignore_errors=True) + logging.info(f'清理旧配置文件: {item}') + except (ValueError, IndexError): + # 如果无法解析时间戳,跳过 + continue + except Exception as e: + logging.warning(f'清理旧配置文件时出错: {e}') + + def _cleanup_chrome_processes(self): + """清理可能占用配置文件的Chrome进程""" + try: + import subprocess + import psutil + + # 获取当前配置文件路径 + profile_dir = os.path.abspath(os.path.join('.', 'config', 'chrome_profile', 'douyin_persistent')) + + # 查找使用该配置文件的Chrome进程 + killed_processes = [] + for proc in psutil.process_iter(['pid', 'name', 'cmdline']): + try: + if proc.info['name'] and 'chrome' in proc.info['name'].lower(): + cmdline = proc.info['cmdline'] + if cmdline and any(profile_dir in arg for arg in cmdline): + proc.terminate() + killed_processes.append(proc.info['pid']) + logging.info(f'终止占用配置文件的Chrome进程: PID {proc.info["pid"]}') + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + continue + + # 等待进程终止 + if killed_processes: + time.sleep(2) + + return len(killed_processes) > 0 + + except ImportError: + # 如果没有psutil,使用系统命令 + try: + result = subprocess.run(['taskkill', '/f', '/im', 'chrome.exe'], + capture_output=True, text=True, timeout=10) + if result.returncode == 0: + logging.info('使用taskkill清理Chrome进程') + time.sleep(2) + return True + except Exception as e: + logging.warning(f'清理Chrome进程失败: {e}') + return False + except Exception as e: + logging.warning(f'清理Chrome进程时出错: {e}') + return False 
+ + def setup_driver(self): + logging.info('初始化Chrome WebDriver (启用CDP网络日志)') + + # 清理可能占用配置文件的Chrome进程 + self._cleanup_chrome_processes() + + chrome_options = Options() + chrome_options.add_argument('--no-sandbox') + chrome_options.add_argument('--disable-dev-shm-usage') + chrome_options.add_argument('--disable-blink-features=AutomationControlled') + chrome_options.add_experimental_option('excludeSwitches', ['enable-automation']) + chrome_options.add_experimental_option('useAutomationExtension', False) + chrome_options.add_argument('--disable-extensions') + chrome_options.add_argument('--remote-allow-origins=*') + chrome_options.add_argument('--remote-debugging-port=0') + chrome_options.add_argument('--start-maximized') + chrome_options.add_argument('--lang=zh-CN') + # 使用固定的Chrome配置文件目录以保持登录状态 + profile_dir = os.path.abspath(os.path.join('.', 'config', 'chrome_profile', 'douyin_persistent')) + os.makedirs(profile_dir, exist_ok=True) + chrome_options.add_argument(f'--user-data-dir={profile_dir}') + logging.info(f'使用持久化Chrome配置文件: {profile_dir}') + # 明确设置Chrome二进制路径(32位Chrome常见安装位置) + possible_chrome_bins = [ + r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe", + r"C:\Program Files\Google\Chrome\Application\chrome.exe" + ] + for bin_path in possible_chrome_bins: + if os.path.exists(bin_path): + chrome_options.binary_location = bin_path + logging.info(f'使用Chrome二进制路径: {bin_path}') + break + # 性能日志(Network事件) + chrome_options.set_capability('goog:loggingPrefs', {'performance': 'ALL'}) + + # 仅使用本地或PATH中的chromedriver,避免网络下载依赖 + driver_ready = False + candidates = [] + # 可通过环境变量强制覆盖驱动路径 + env_override = os.environ.get('OVERRIDE_CHROMEDRIVER') + if env_override: + candidates.append(env_override) + logging.info(f'检测到环境变量 OVERRIDE_CHROMEDRIVER,优先使用: {env_override}') + # 优先使用用户提供的路径 + user_driver_path = os.path.join(os.getcwd(), 'drivers', 'chromedriver.exe') + candidates.append(user_driver_path) + logging.info(f'优先尝试用户提供路径: {user_driver_path}') + # 项目根目录 + 
candidates.append(os.path.join(os.getcwd(), 'chromedriver.exe')) + # 其他可能目录 + candidates.append(os.path.join(os.getcwd(), 'drivers', 'chromedriver')) + # PATH 中的chromedriver + which_path = shutil.which('chromedriver') + if which_path: + candidates.append(which_path) + + if not driver_ready: + for p in candidates: + try: + if p and os.path.exists(p): + logging.info(f'尝试使用chromedriver: {p}') + service = Service(p) + self.driver = webdriver.Chrome(service=service, options=chrome_options) + driver_ready = True + logging.info(f'使用chromedriver启动成功: {p}') + try: + caps = self.driver.capabilities + browser_ver = caps.get('browserVersion') or caps.get('version') + cdver = caps.get('chrome', {}).get('chromedriverVersion') + logging.info(f'Chrome版本: {browser_ver}, ChromeDriver版本: {cdver}') + except Exception: + pass + break + else: + logging.info(f'候选路径不存在: {p}') + except Exception as e: + logging.warning(f'尝试使用 {p} 启动失败: {e}') + + if not driver_ready: + # 最终回退:使用webdriver-manager(可能需要网络) + try: + service = Service(ChromeDriverManager().install()) + self.driver = webdriver.Chrome(service=service, options=chrome_options) + driver_ready = True + logging.info('使用webdriver-manager成功启动ChromeDriver') + except Exception as e: + raise RuntimeError('未能启动ChromeDriver。请手动下载匹配版本的chromedriver到项目根目录或PATH,或检查网络以允许webdriver-manager下载。错误: ' + str(e)) + + # 反检测 + try: + self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})") + except Exception: + pass + + # 启用CDP Network + try: + self.driver.execute_cdp_cmd('Network.enable', {}) + logging.info('已启用CDP Network') + except Exception as e: + logging.warning(f'启用CDP Network失败: {e}') + + def navigate(self): + logging.info(f'导航到: {self.start_url}') + self.driver.get(self.start_url) + time.sleep(3) + + def ensure_login(self): + """确保用户已登录并导航到收藏合集页面""" + logging.info("检测登录状态和页面位置...") + + # 首先检查是否已经登录并在正确页面 + if self._check_login_and_page(): + logging.info("检测到已登录且在收藏合集页面,跳过手动确认") + return + + # 
如果未登录或不在正确页面,进行手动登录流程 + logging.info("请在弹出的浏览器中手动完成登录。") + + if self.auto_continue: + logging.info('自动继续模式,跳过手动等待...') + time.sleep(5) + return + + logging.info("进入手动登录确认循环...") + while True: + # 要求用户输入特定文本确认 + logging.info("等待用户输入确认...") + user_input = input("请在浏览器中完成登录,并导航到【我的】→【收藏】→【合集】页面。操作完成后,请在此处输入 'ok' 并按回车: ") + + if user_input.strip().lower() != 'ok': + logging.warning("请输入 'ok' 确认您已完成登录并导航到【我的】→【收藏】→【合集】页面。") + continue + + logging.info("用户已确认,检查当前页面...") + + try: + current_url = self.driver.current_url + logging.info(f"当前页面URL: {current_url}") + if ("douyin.com/user/self" in current_url and + ("favorite_collection" in current_url or "compilation" in current_url)): + logging.info(f"已确认您位于收藏合集列表页面: {current_url}") + logging.info("脚本将继续执行...") + break + else: + # 用户确认了,但页面不正确,继续循环等待 + logging.warning(f"检测到当前页面 ({current_url}) 并非收藏合集列表页面。请确保已导航至【我的】→【收藏】→【合集】页面。") + + except Exception as e: + if "browser has been closed" in str(e) or "no such window" in str(e) or "target window already closed" in str(e): + logging.error("浏览器窗口已关闭,脚本无法继续。") + raise RuntimeError("浏览器窗口已关闭") + logging.warning(f"检测URL时出错: {e}。请重试。") + time.sleep(1) + + def _check_login_and_page(self, timeout: int = 30) -> bool: + """检查是否已登录并在正确页面""" + try: + current_url = self.driver.current_url + logging.info(f"当前页面URL: {current_url}") + + # 检查是否在收藏合集页面 + if ("douyin.com/user/self" in current_url and + ("favorite_collection" in current_url or "compilation" in current_url)): + # 进一步检查登录状态 + return self._detect_login_status(timeout) + else: + # 如果不在正确页面,尝试导航到收藏合集页面 + if self._detect_login_status(timeout): + logging.info("已登录但不在收藏合集页面,自动导航...") + self.driver.get(self.start_url) + time.sleep(3) + return True + return False + except Exception as e: + logging.warning(f"检查登录状态时出错: {e}") + return False + + def _detect_login_status(self, timeout: int = 30) -> bool: + """自动检测是否已登录""" + try: + start = time.time() + while time.time() - start < timeout: + time.sleep(2) + # 检查登录状态的多个选择器 + selectors = [ + 
'[data-e2e="user-avatar"]', + '.user-avatar', + '[class*="avatar"]', + '[class*="Avatar"]' + ] + + for selector in selectors: + try: + elements = self.driver.find_elements("css selector", selector) + if elements: + logging.info("检测到用户头像,确认已登录") + return True + except Exception: + continue + + # 检查是否有登录按钮(表示未登录) + login_selectors = [ + '[data-e2e="login-button"]', + 'button[class*="login"]', + 'a[href*="login"]' + ] + + for selector in login_selectors: + try: + elements = self.driver.find_elements("css selector", selector) + if elements: + logging.info("检测到登录按钮,用户未登录") + return False + except Exception: + continue + + logging.info("登录状态检测超时,假设未登录") + return False + except Exception as e: + logging.warning(f"登录状态检测出错: {e}") + return False + + def trigger_loading(self): + logging.info('触发数据加载:滚动 + 刷新') + # 滚动触发懒加载 + for i in range(8): + self.driver.execute_script(f'window.scrollTo(0, {i * 900});') + time.sleep(1.2) + # 刷新触发新请求 + self.driver.refresh() + time.sleep(4) + for i in range(6): + self.driver.execute_script(f'window.scrollTo(0, {i * 1200});') + time.sleep(1.3) + + def format_count(self, n: int) -> str: + if n >= 100_000_000: + return f"{n/100_000_000:.1f}亿" + if n >= 10_000: + return f"{n/10_000:.1f}万" + return str(n) + + def _trigger_mix_aweme_api(self, mix_id: str): + """主动触发/aweme/v1/web/mix/aweme/ API调用来获取合集中的视频列表""" + try: + if not self.driver: + logging.warning('WebDriver不可用,无法触发API调用') + return + + logging.info(f'主动触发mix/aweme API调用,获取合集 {mix_id} 的视频列表') + + # 构建API URL + api_url = f"https://www.douyin.com/aweme/v1/web/mix/aweme/?mix_id={mix_id}&count=20&cursor=0" + + # 使用JavaScript发起fetch请求并直接处理响应 + js_code = f""" + (async function() {{ + try {{ + const response = await fetch('{api_url}', {{ + method: 'GET', + credentials: 'include', + headers: {{ + 'Accept': 'application/json', + 'User-Agent': navigator.userAgent + }} + }}); + + if (response.ok) {{ + const data = await response.json(); + console.log('Mix aweme API response for {mix_id}:', data); + + 
                        // 提取aweme_id列表
                        let awemeIds = [];
                        if (data && data.aweme_list && Array.isArray(data.aweme_list)) {{
                            awemeIds = data.aweme_list.map(aweme => aweme.aweme_id).filter(id => id);
                        }} else if (data && data.data && Array.isArray(data.data)) {{
                            awemeIds = data.data.map(aweme => aweme.aweme_id).filter(id => id);
                        }}

                        // 将结果存储到window对象中,供Python读取
                        if (!window.mixAwemeResults) {{
                            window.mixAwemeResults = {{}};
                        }}
                        window.mixAwemeResults['{mix_id}'] = {{
                            aweme_ids: awemeIds,
                            total_count: awemeIds.length,
                            raw_data: data
                        }};

                        console.log('Extracted aweme_ids for {mix_id}:', awemeIds);
                        return awemeIds;
                    }} else {{
                        console.error('Mix aweme API failed for {mix_id}:', response.status);
                        return [];
                    }}
                }} catch (error) {{
                    console.error('Mix aweme API error for {mix_id}:', error);
                    return [];
                }}
            }})();
            """

            # Execute the JavaScript
            result = self.driver.execute_script(js_code)

            # Give the async fetch a moment to complete, then read the result
            time.sleep(2)

            # Read the result back from the window object
            try:
                js_get_result = f"""
                return window.mixAwemeResults && window.mixAwemeResults['{mix_id}']
                    ? window.mixAwemeResults['{mix_id}']
                    : null;
                """
                stored_result = self.driver.execute_script(js_get_result)

                if stored_result and stored_result.get('aweme_ids'):
                    aweme_ids = stored_result['aweme_ids']
                    logging.info(f'成功获取合集 {mix_id} 的 {len(aweme_ids)} 个视频ID: {aweme_ids[:5]}...')

                    # Accumulate aweme_ids on the instance (lazily created attribute)
                    if not hasattr(self, 'collected_aweme_ids'):
                        self.collected_aweme_ids = []

                    # Store the aweme_ids for this specific mix_id
                    if not hasattr(self, 'mix_aweme_mapping'):
                        self.mix_aweme_mapping = {}
                    self.mix_aweme_mapping[mix_id] = aweme_ids

                    # Also extend the global collected list
                    self.collected_aweme_ids.extend(aweme_ids)

                    logging.info(f'已将 {len(aweme_ids)} 个视频ID添加到合集 {mix_id}')
                else:
                    logging.warning(f'未能获取合集 {mix_id} 的视频ID')

            except Exception as e:
                logging.warning(f'读取JavaScript结果失败: {e}')

            logging.info(f'已完成mix/aweme API调用,mix_id: {mix_id}')

        except Exception as e:
            logging.warning(f'触发mix/aweme API调用失败: {e}')

    def parse_mix_aweme_response(self, text: str, source_url: str, request_id: str = None):
        """Parse a compilation video-list API response; extract each video's aweme_id.

        The extracted ids are accumulated on `self.collected_aweme_ids` for later
        association with compilation entries.
        """
        try:
            if not text.strip():
                return

            # Try to parse the response as JSON
            try:
                data = json.loads(text)
            except json.JSONDecodeError:
                logging.warning(f'mix/aweme API响应不是有效JSON: {source_url}')
                return

            # Look for aweme_list or a similar list of videos
            aweme_list = None
            if isinstance(data, dict):
                # Common response layouts
                for key in ['aweme_list', 'data', 'awemes', 'items']:
                    if key in data and isinstance(data[key], list):
                        aweme_list = data[key]
                        break

                # Not found directly: fall back to a recursive search
                if aweme_list is None:
                    aweme_list = self._find_aweme_list_recursive(data)

            if aweme_list and isinstance(aweme_list, list):
                logging.info(f'从mix/aweme API找到 {len(aweme_list)} 个视频')

                # Collect all aweme_ids for later association with compilation data
                aweme_ids = []
                for aweme in aweme_list:
                    if isinstance(aweme, dict):
                        aweme_id = aweme.get('aweme_id', '')
                        if aweme_id:
                            aweme_ids.append(aweme_id)

                            # Get the video title
                            desc = aweme.get('desc', '')
                            if not desc:
                                # Try to build a title from other fields
                                text_extra = aweme.get('text_extra', [])
                                if text_extra and isinstance(text_extra, list):
                                    desc = ' '.join([item.get('hashtag_name', '') for item in text_extra if isinstance(item, dict)])

                            logging.info(f'找到视频ID: {aweme_id} - {desc[:50]}...')

                # Store the ids on the instance for use by other methods
                if not hasattr(self, 'collected_aweme_ids'):
                    self.collected_aweme_ids = []
                self.collected_aweme_ids.extend(aweme_ids)

                logging.info(f'累计收集到 {len(self.collected_aweme_ids)} 个视频ID')
            else:
                logging.warning(f'mix/aweme API响应中未找到视频列表: {source_url}')

        except Exception as e:
            logging.warning(f'解析mix/aweme API响应时出错: {e}')

    def _find_aweme_list_recursive(self, obj, max_depth=3, current_depth=0):
        """Depth-limited recursive search of a JSON structure for an aweme list."""
        if current_depth >= max_depth:
            return None

        if isinstance(obj, dict):
            for key, value in obj.items():
                if 'aweme' in key.lower() and isinstance(value, list):
                    # Check the list actually holds aweme objects
                    if value and isinstance(value[0], dict) and 'aweme_id' in value[0]:
                        return value

                if isinstance(value, (dict, list)):
                    result = self._find_aweme_list_recursive(value, max_depth, current_depth + 1)
                    if result:
                        return result

        elif isinstance(obj, list):
            for item in obj:
                if isinstance(item, (dict, list)):
                    result = self._find_aweme_list_recursive(item, max_depth, current_depth + 1)
                    if result:
                        return result

        return None

    def parse_play_vv_from_text(self, text: str, source_url: str, request_id: str = None):
        """Parse play_vv, mix_name and watched_item info out of a raw response body."""
        try:
            # Try structured JSON first
            if text.strip().startswith('{') or text.strip().startswith('['):
                try:
                    data = json.loads(text)
                    self._extract_from_json_data(data, source_url, request_id)
                    return
                except json.JSONDecodeError:
                    pass

            # Not JSON: fall back to regex scanning
            self._extract_from_text_regex(text, source_url, request_id)

        except Exception as e:
            logging.warning(f'解析文本数据时出错: {e}')

    def _extract_from_json_data(self, data, source_url: str, request_id: str = None):
        """Recursively extract compilation info from parsed JSON data."""
        def extract_mix_info(obj, path=""):
            if isinstance(obj, dict):
                # Does this dict describe a compilation (mix)?
                if 'mix_id' in obj and 'statis' in obj:
                    mix_id = obj.get('mix_id', '')
                    mix_name = obj.get('mix_name', '')
                    statis = obj.get('statis', {})

                    # Debug: dump the full object structure (first 3 compilations only)
                    if len(self.play_vv_items) < 3:
                        logging.info(f"=== 调试:合集对象结构 ===")
                        logging.info(f"完整对象键: {list(obj.keys())}")
                        # Look for fields that might hold the video list
                        for key, value in obj.items():
                            if 'aweme' in key.lower() or 'video' in key.lower() or 'item' in key.lower() or 'ids' in key.lower():
                                logging.info(f"可能的视频字段 {key}: {type(value)} - {str(value)[:200]}")

                        # Specifically inspect the ids field
                        if 'ids' in obj:
                            ids_value = obj['ids']
                            logging.info(f"ids字段详细信息: {type(ids_value)} - {ids_value}")
                            if isinstance(ids_value, list) and len(ids_value) > 0:
                                logging.info(f"ids列表长度: {len(ids_value)}")
                                logging.info(f"第一个ID: {ids_value[0]}")
                                if len(ids_value) > 1:
                                    logging.info(f"第二个ID: {ids_value[1]}")

                    if isinstance(statis, dict) and 'play_vv' in statis:
                        play_vv = statis.get('play_vv')
                        if isinstance(play_vv, (int, str)) and str(play_vv).isdigit():
                            vv = int(play_vv)
                            # Build the compilation URL
                            video_url = f"https://www.douyin.com/collection/{mix_id}" if mix_id else ""

                            # Look up the aweme_id list collected for this compilation
                            mix_aweme_mapping = getattr(self, 'mix_aweme_mapping', {})
                            aweme_ids = mix_aweme_mapping.get(mix_id, [])

                            # Extract the cover image URL; store the full link directly
                            cover_image_url = ""
                            cover_image_backup_urls = []  # backup link list

                            # Check the known cover fields, preferring complete URL lists
                            if 'cover' in obj:
                                cover = obj['cover']
                                if isinstance(cover, dict) and 'url_list' in cover and cover['url_list']:
                                    # Primary link
                                    cover_image_url = cover['url_list'][0]
                                    # Backup links
                                    cover_image_backup_urls = cover['url_list'][1:] if len(cover['url_list']) > 1 else []
                                elif isinstance(cover, str):
                                    cover_image_url = cover
                            elif 'cover_url' in obj:
                                cover_url = obj['cover_url']
                                if isinstance(cover_url, dict) and 'url_list' in cover_url and cover_url['url_list']:
                                    cover_image_url = cover_url['url_list'][0]
                                    cover_image_backup_urls = cover_url['url_list'][1:] if len(cover_url['url_list']) > 1 else []
                                elif isinstance(cover_url, str):
                                    cover_image_url = cover_url
                            elif 'image' in obj:
                                image = obj['image']
                                if isinstance(image, dict) and 'url_list' in image and image['url_list']:
                                    cover_image_url = image['url_list'][0]
                                    cover_image_backup_urls = image['url_list'][1:] if len(image['url_list']) > 1 else []
                                elif isinstance(image, str):
                                    cover_image_url = image
                            elif 'pic' in obj:
                                pic = obj['pic']
                                if isinstance(pic, dict) and 'url_list' in pic and pic['url_list']:
                                    cover_image_url = pic['url_list'][0]
                                    cover_image_backup_urls = pic['url_list'][1:] if len(pic['url_list']) > 1 else []
                                elif isinstance(pic, str):
                                    cover_image_url = pic

                            self.play_vv_items.append({
                                'play_vv': vv,
                                'formatted': self.format_count(vv),
                                'url': source_url,
                                'request_id': request_id,
                                'mix_name': mix_name,
                                'video_url': video_url,  # compilation link
                                'mix_id': mix_id,  # compilation id
                                'aweme_ids': aweme_ids.copy() if aweme_ids else [],  # video ids contained in this compilation
                                'cover_image_url': cover_image_url,  # primary cover image URL (full link)
                                'cover_backup_urls': cover_image_backup_urls,  # backup cover image URLs
                                'timestamp': datetime.now().isoformat()
                            })
                            logging.info(f'提取到合集: {mix_name} (ID: {mix_id}, 包含{len(aweme_ids)}个视频) - {vv:,} 播放量')

                            # If no aweme_ids yet, proactively fetch the compilation's video list
                            if not aweme_ids and mix_id:
                                self._trigger_mix_aweme_api(mix_id)

                # Recurse into child objects
                for key, value in obj.items():
                    if isinstance(value, (dict, list)):
                        extract_mix_info(value, f"{path}.{key}" if path else key)

            elif isinstance(obj, list):
                for i, item in enumerate(obj):
                    if isinstance(item, (dict, list)):
                        extract_mix_info(item, f"{path}[{i}]" if path else f"[{i}]")

        extract_mix_info(data)

    def _extract_from_text_regex(self, text: str, source_url: str, request_id: str = None):
        """Extract compilation info from raw text using regular expressions."""
        # Look for JSON fragments carrying complete compilation info
        mix_pattern = r'\{[^{}]*"mix_id"\s*:\s*"([^"]*)"[^{}]*"mix_name"\s*:\s*"([^"]*)"[^{}]*"statis"\s*:\s*\{[^{}]*"play_vv"\s*:\s*(\d+)[^{}]*\}[^{}]*\}'

        for match in re.finditer(mix_pattern, text):
            try:
                mix_id = match.group(1)
                mix_name = match.group(2)
                vv = int(match.group(3))

                # Build the compilation URL
                video_url = f"https://www.douyin.com/collection/{mix_id}" if mix_id else ""

                # Look up the aweme_id list collected for this compilation
                mix_aweme_mapping = getattr(self, 'mix_aweme_mapping', {})
                aweme_ids = mix_aweme_mapping.get(mix_id, [])

                self.play_vv_items.append({
                    'play_vv': vv,
                    'formatted': self.format_count(vv),
                    'url': source_url,
                    'request_id': request_id,
                    'mix_name': mix_name,
                    'video_url': video_url,  # compilation link
                    'mix_id': mix_id,  # compilation id
                    'aweme_ids': aweme_ids.copy() if aweme_ids else [],  # video ids contained in this compilation
                    'timestamp': datetime.now().isoformat()
                })
                logging.info(f'正则提取到合集: {mix_name} (ID: {mix_id}, 包含{len(aweme_ids)}个视频) - {vv:,} 播放量')

                # If no aweme_ids yet, proactively fetch the compilation's video list
                if not aweme_ids and mix_id:
                    self._trigger_mix_aweme_api(mix_id)
            except Exception:
                continue

        # Fallback: scan for bare play_vv values
        for match in re.findall(r'"play_vv"\s*:\s*(\d+)', text):
            try:
                vv = int(match)
                # Skip play_vv values we already recorded
                if not any(item['play_vv'] == vv for item in self.play_vv_items):
                    # Use whatever aweme ids were collected so far
                    aweme_ids = getattr(self, 'collected_aweme_ids', [])

                    self.play_vv_items.append({
                        'play_vv': vv,
                        'formatted': self.format_count(vv),
                        'url': source_url,
                        'request_id': request_id,
                        'mix_name': '',  # compilation name unknown
                        'video_url': '',  # link unknown
                        'mix_id': '',  # mix_id unknown
                        'aweme_ids': aweme_ids.copy() if aweme_ids else [],  # collected video ids
                        'timestamp': datetime.now().isoformat()
                    })
            except Exception:
                continue

    def collect_network_bodies(self, duration_s: int = None):
        """Poll CDP performance logs for `duration_s` seconds and harvest response bodies.

        Response bodies matching the keyword filter are routed to the
        mix/aweme parser or the general play_vv parser.
        """
        if duration_s is None:
            duration_s = self.duration_s
        logging.info(f'开始收集网络响应体,持续 {duration_s}s')
        start = time.time()
        known_request_ids = set()

        # Keywords of interest (favorites / compilations / videos)
        url_keywords = ['aweme', 'mix', 'collection', 'favorite', 'note', 'api']

        last_progress = 0
        while time.time() - start < duration_s:
            try:
                logs = self.driver.get_log('performance')
            except Exception as e:
                logging.warning(f'获取性能日志失败: {e}')
                time.sleep(1)
                continue

            for entry in logs:
                try:
                    message = json.loads(entry['message'])['message']
                except Exception:
                    continue

                method = message.get('method')
                params = message.get('params', {})

                # Record request URLs
                if method == 'Network.requestWillBeSent':
                    req_id = params.get('requestId')
                    url = params.get('request', {}).get('url', '')
                    if any(k in url for k in url_keywords):
                        self.captured_responses.append({'requestId': req_id, 'url': url, 'type': 'request'})

                # Response arrived: try to fetch its body
                if method == 'Network.responseReceived':
                    req_id = params.get('requestId')
                    url = params.get('response', {}).get('url', '')
                    type_ = params.get('type')  # XHR, Fetch, Document
                    if req_id and req_id not in known_request_ids:
                        known_request_ids.add(req_id)
                        # Only process XHR/Fetch responses
                        if type_ in ('XHR', 'Fetch') and any(k in url for k in url_keywords):
                            try:
                                body_obj = self.driver.execute_cdp_cmd('Network.getResponseBody', {'requestId': req_id})
                                body_text = body_obj.get('body', '')
                                # Body may be base64-encoded
                                if body_obj.get('base64Encoded'):
                                    try:
                                        import base64
                                        body_text = base64.b64decode(body_text).decode('utf-8', errors='ignore')
                                    except Exception:
                                        pass

                                # Special-case the mix/aweme API: compilation video list
                                if '/aweme/v1/web/mix/aweme/' in url:
                                    self.parse_mix_aweme_response(body_text, url, req_id)
                                else:
                                    # Parse play_vv
                                    self.parse_play_vv_from_text(body_text, url, req_id)
                            except Exception:
                                # Some bodies are unavailable or too large to fetch
                                pass
            elapsed = int(time.time() - start)
            if elapsed - last_progress >= 5:
                last_progress = elapsed
                logging.info(f'进度: {elapsed}/{duration_s}s, 已发现play_vv候选 {len(self.play_vv_items)}')
            time.sleep(0.8)

        logging.info(f'网络收集完成,共发现 {len(self.play_vv_items)} 个play_vv候选')

        # Refresh the aweme_ids field of every collected entry
        self._update_aweme_ids_for_existing_items()

    def _update_aweme_ids_for_existing_items(self):
        """Backfill aweme_ids on entries recorded before the mapping was available."""
        if not hasattr(self, 'mix_aweme_mapping') or not self.mix_aweme_mapping:
            logging.info('没有mix_aweme_mapping数据,跳过aweme_ids更新')
            return
        updated_count = 0
        for item in self.play_vv_items:
            mix_id = item.get('mix_id')
            if mix_id and mix_id in self.mix_aweme_mapping:
                aweme_ids = self.mix_aweme_mapping[mix_id]
                if aweme_ids and len(aweme_ids) > 0:
                    item['aweme_ids'] = aweme_ids.copy()
                    updated_count += 1
                    logging.info(f'更新合集 {item.get("mix_name", "未知")} (ID: {mix_id}) 的aweme_ids,包含 {len(aweme_ids)} 个视频')

        logging.info(f'已更新 {updated_count} 个条目的aweme_ids字段')

    def parse_ssr_data(self):
        """Extract play_vv data from the page's server-side-rendered state."""
        logging.info('尝试解析页面SSR数据')
        # Try reading hydration data straight off the window object
        keys = ['_SSR_HYDRATED_DATA', 'RENDER_DATA']
        for key in keys:
            try:
                data = self.driver.execute_script(f'return window.{key}')
                if data:
                    text = json.dumps(data, ensure_ascii=False)
                    self.parse_play_vv_from_text(text, f'page_{key}', None)
                    logging.info(f'从 {key} 中解析完成')
            except Exception:
                continue

        # Fallback: regex-scan the raw page source
        try:
            page_source = self.driver.page_source
            self.parse_play_vv_from_text(page_source, 'page_source', None)
            # Also recognize play_vv inside statis structures
            for m in re.findall(r'"statis"\s*:\s*\{[^}]*"play_vv"\s*:\s*(\d+)[^}]*\}', page_source):
                try:
                    vv = int(m)
                    # Skip play_vv values we already recorded
                    if not any(item['play_vv'] == vv for item in self.play_vv_items):
                        self.play_vv_items.append({
                            'play_vv': vv,
                            'formatted': self.format_count(vv),
                            'url': 'page_source_statis',
                            'request_id': None,
                            'mix_name': '',  # compilation name unavailable from statis
                            'video_url': '',  # link unavailable from statis
                            'timestamp': datetime.now().isoformat()
                        })
                except Exception:
                    pass
        except Exception:
            pass

    def dedupe(self):
        """Drop duplicate entries, keyed by the numeric play_vv value."""
        unique = []
        seen = set()
        for item in self.play_vv_items:
            vv = item['play_vv']
            if vv not in seen:
                unique.append(item)
                seen.add(vv)
        self.play_vv_items = unique

    def save_results(self):
        """Write the collected items to timestamped JSON/TXT files and to MongoDB."""
        ts = datetime.now().strftime('%Y%m%d_%H%M%S')
        json_file = f'douyin_cdp_play_vv_{ts}.json'
        txt_file = f'douyin_cdp_play_vv_{ts}.txt'

        # Save to a JSON file
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump({
                'timestamp': ts,
                'start_url': self.start_url,
                'play_vv_items': self.play_vv_items,
                'captured_count': len(self.play_vv_items)
            }, f, ensure_ascii=False, indent=2)

        # Save to a human-readable TXT file
        with open(txt_file, 'w', encoding='utf-8') as f:
            f.write('抖音收藏合集真实播放量(play_vv) - Selenium+CDP\n')
            f.write('=' * 60 + '\n\n')
            if self.play_vv_items:
                sorted_items = sorted(self.play_vv_items, key=lambda x: x['play_vv'], reverse=True)
                f.write(f"✅ 提取到 {len(sorted_items)} 个唯一play_vv数值\n\n")
                for i, item in enumerate(sorted_items, 1):
                    mix_info = f" - {item.get('mix_name', '未知合集')}" if item.get('mix_name') else ""
                    video_info = f" (链接: {item.get('video_url', '未知')})" if item.get('video_url') else ""
                    f.write(f"{i}. play_vv: {item['play_vv']:,} ({item['formatted']}){mix_info}{video_info}\n")
                    f.write(f"   来源: {item['url']}\n\n")
                total = sum(x['play_vv'] for x in sorted_items)
                f.write(f"📊 总播放量: {total:,}\n")
                f.write(f"📈 最高播放量: {sorted_items[0]['play_vv']:,} ({sorted_items[0]['formatted']})\n")
            else:
                f.write('❌ 未能提取到play_vv数值\n')
                f.write('可能原因:\n')
                f.write('- 仍需登录或权限受限\n')
                f.write('- API响应体不可读取或被加密\n')
                f.write('- 页面结构或接口策略发生变更\n')

        # Save to MongoDB
        self.save_to_mongodb()

        logging.info('结果已保存: %s, %s', json_file, txt_file)

    def save_to_mongodb(self):
        """Persist the collected items to MongoDB; no-op when not connected or empty."""
        if self.mongo_client is None or self.collection is None:
            logging.warning('MongoDB未连接,跳过数据库保存')
            return

        if not self.play_vv_items:
            logging.info('没有数据需要保存到MongoDB')
            return

        try:
            batch_time = datetime.now()
            documents = []

            for item in self.play_vv_items:
                # Keep the 7 requested fields + aweme_ids (episode ids) + cover image links
                doc = {
                    'batch_time': batch_time,
                    'mix_name': item.get('mix_name', ''),
                    'video_url': item.get('video_url', ''),
                    'playcount': item.get('formatted', ''),
                    'play_vv': item.get('play_vv', 0),
                    'request_id': item.get('request_id', ''),
                    'rank': 0,  # placeholder, recomputed below
                    'aweme_ids': item.get('aweme_ids', []),  # episode id list
                    'cover_image_url': item.get('cover_image_url', ''),  # primary cover image URL (full link)
                    'cover_backup_urls': item.get('cover_backup_urls', [])  # backup cover image URL list
                }
                documents.append(doc)

            # Sort by play count descending and assign ranks
            documents.sort(key=lambda x: x['play_vv'], reverse=True)
            for i, doc in enumerate(documents, 1):
                doc['rank'] = i

            # Bulk insert
            result = self.collection.insert_many(documents)
            logging.info(f'成功保存 {len(result.inserted_ids)} 条记录到MongoDB')

            # Log summary statistics
            total_play_vv = sum(doc['play_vv'] for doc in documents)
            max_play_vv = max(doc['play_vv'] for doc in documents) if documents else 0

            logging.info(f'MongoDB保存统计: 总播放量={total_play_vv:,}, 最高播放量={max_play_vv:,}')
            logging.info(f'保存的字段: batch_time, mix_name, video_url, playcount, play_vv, request_id, rank, aweme_ids, cover_image_url, cover_backup_urls')

            # Cover image extraction statistics
            cover_count = sum(1 for doc in documents if doc.get('cover_image_url'))
            backup_count = sum(1 for doc in documents if doc.get('cover_backup_urls'))
            logging.info(f'封面图片统计: {cover_count}/{len(documents)} 个合集有主封面链接, {backup_count} 个合集有备用链接')

            # aweme_ids statistics
            total_episodes = sum(len(doc.get('aweme_ids', [])) for doc in documents)
            logging.info(f'短剧集数统计: 总共收集到 {total_episodes} 集视频ID')

        except Exception as e:
            logging.error(f'保存到MongoDB时出错: {e}')

    def run(self):
        """Full scrape pipeline: drive the browser, collect, parse, dedupe, save.

        Always quits the WebDriver on exit, even when a step raises.
        """
        try:
            self.setup_driver()
            self.navigate()
            self.ensure_login()
            self.trigger_loading()
            self.collect_network_bodies()
            self.parse_ssr_data()
            self.dedupe()
            self.save_results()
            logging.info('完成,play_vv数量: %d', len(self.play_vv_items))
        finally:
            if self.driver:
                try:
                    self.driver.quit()
                except Exception:
                    pass


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='Selenium+CDP 抖音play_vv抓取器')
    parser.add_argument('--url', default='https://www.douyin.com/user/self?showTab=favorite_collection&showSubTab=compilation', help='收藏合集列表页面URL')
    parser.add_argument('--auto', action='store_true', help='自动继续,跳过回车等待')
    parser.add_argument('--duration', type=int, default=60, help='网络响应收集时长(秒)')
    parser.add_argument('--driver', help='覆盖chromedriver路径')
    args = parser.parse_args()

    # Pass overrides to the scraper via environment variables
    if args.driver:
        os.environ['OVERRIDE_CHROMEDRIVER'] = args.driver
    if args.auto:
        os.environ['AUTO_CONTINUE'] = '1'

    print('=== Selenium+CDP 抖音play_vv抓取器 ===')
    print('将复用本地Chrome配置并抓取网络响应中的play_vv')
    scraper = DouyinPlayVVScraper(args.url, auto_continue=args.auto, duration_s=args.duration)
    scraper.run()
diff --git a/scripts/miniprogram_api_server.py b/scripts/miniprogram_api_server.py
new file mode 100644
index 0000000..e835816
--- /dev/null
+++ b/scripts/miniprogram_api_server.py
@@ -0,0 +1,593 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Miniprogram-dedicated Douyin play-count data API server.
Optimized data format and endpoint design for miniprogram clients.
"""

from flask import Flask, jsonify, request
from flask_cors import CORS
from pymongo import MongoClient
from datetime import datetime, timedelta
import logging
import os
import re

# Logging configuration: file + console
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('miniprogram_api.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)

app = Flask(__name__)
CORS(app)  # allow cross-origin access so the miniprogram can call the API

class MiniprogramAPI:
    """Data-access layer over the MongoDB play_vv_records collection."""

    def __init__(self):
        # MongoDB handles; populated by connect_mongodb()
        self.client = None
        self.db = None
        self.collection = None
        self.connect_mongodb()

    def connect_mongodb(self):
        """Connect to the local MongoDB instance; returns True on success."""
        try:
            self.client = MongoClient('mongodb://localhost:27017/')
            # Verify the connection
            self.client.admin.command('ping')
            # Select database and collection
            self.db = self.client['douyin_data']
            self.collection = self.db['play_vv_records']
            logging.info("MongoDB连接成功")
            return True
        except Exception as e:
            logging.error(f"MongoDB连接失败: {e}")
            return False

    def format_playcount(self, playcount_str):
        """Convert a display play-count string to a number."""
        if not playcount_str:
            return 0

        try:
            if isinstance(playcount_str, (int, float)):
                return int(playcount_str)

            playcount_str = str(playcount_str).strip()

            # Handle the 亿 / 万 units
def format_playcount(self, playcount_str):
    """Convert a display play-count value (e.g. '1.5亿', '2.3万', '123', 42) to an int.

    Accepts ints/floats (truncated to int) and strings using the Chinese units
    亿 (1e8) and 万 (1e4). Returns 0 for empty or unparseable input.
    """
    if not playcount_str:
        return 0

    try:
        if isinstance(playcount_str, (int, float)):
            return int(playcount_str)

        playcount_str = str(playcount_str).strip()

        # Handle the 亿 (1e8) and 万 (1e4) units
        if "亿" in playcount_str:
            num = float(re.findall(r'[\d.]+', playcount_str)[0])
            return int(num * 100000000)
        elif "万" in playcount_str:
            num = float(re.findall(r'[\d.]+', playcount_str)[0])
            return int(num * 10000)
        else:
            # Plain numeric string
            return int(float(playcount_str))
    except (ValueError, IndexError):
        # ValueError: non-numeric text; IndexError: no digits before the unit.
        # Was a bare `except:` which also swallowed SystemExit/KeyboardInterrupt.
        return 0

def format_cover_url(self, cover_data):
    """Normalize a cover field (plain string or {'url_list': [...]}) to one URL string."""
    if not cover_data:
        return ""

    if isinstance(cover_data, str):
        return cover_data
    elif isinstance(cover_data, dict) and 'url_list' in cover_data:
        return cover_data['url_list'][0] if cover_data['url_list'] else ""
    else:
        return ""

def format_time(self, time_obj):
    """Render a datetime as 'YYYY-MM-DD HH:MM:SS'; other values pass through as str."""
    if not time_obj:
        return ""

    if isinstance(time_obj, datetime):
        return time_obj.strftime("%Y-%m-%d %H:%M:%S")
    else:
        return str(time_obj)

def format_video_item(self, doc):
    """Format one video document for API output - raw database fields only."""
    return {
        "_id": str(doc.get("_id", "")),
        "batch_time": self.format_time(doc.get("batch_time")),
        "mix_name": doc.get("mix_name", ""),
        "video_url": doc.get("video_url", ""),
        "playcount": doc.get("playcount", ""),
        "play_vv": doc.get("play_vv", 0),
        "request_id": doc.get("request_id", ""),
        "rank": doc.get("rank", 0),
        "aweme_ids": doc.get("aweme_ids", []),
        "cover_image_url": doc.get("cover_image_url", ""),
        "cover_backup_urls": doc.get("cover_backup_urls", [])
    }
                query_condition},
                {"$sort": {"batch_time": -1}},  # newest first
                {"$group": {
                    "_id": "$mix_name",  # group by series name
                    "latest_doc": {"$first": "$$ROOT"}  # keep each group's newest record
                }},
                {"$replaceRoot": {"newRoot": "$latest_doc"}},
                {"$sort": {sort_field: sort_order}},
                {"$skip": skip},
                {"$limit": limit}
            ]

            docs = list(self.collection.aggregate(pipeline))

            # Total number of distinct series today
            total_pipeline = [
                {"$match": query_condition},
                {"$sort": {"batch_time": -1}},
                {"$group": {"_id": "$mix_name"}},
                {"$count": "total"}
            ]
            total_result = list(self.collection.aggregate(total_pipeline))
            total = total_result[0]["total"] if total_result else 0

            # Format the documents for the API response
            video_list = []
            for doc in docs:
                item = self.format_video_item(doc)
                video_list.append(item)

            return {
                "success": True,
                "data": video_list,
                "pagination": {
                    "page": page,
                    "limit": limit,
                    "total": total,
                    "pages": (total + limit - 1) // limit,
                    "has_next": page * limit < total,
                    "has_prev": page > 1
                },
                "sort_by": sort_by,
                "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }

        except Exception as e:
            logging.error(f"获取视频列表失败: {e}")
            return {"success": False, "message": f"获取数据失败: {str(e)}"}

    def get_growth_videos(self, page=1, limit=20, start_date=None, end_date=None):
        """Return videos sorted by play-count growth between two dates.

        Defaults to yesterday -> today when no dates are given; falls back to
        the play-count-ordered list if the growth computation fails.
        """
        try:
            # Pagination offset
            skip = (page - 1) * limit

            # Default window: yesterday to today
            if not start_date or not end_date:
                end_date = datetime.now().date()
                start_date = end_date - timedelta(days=1)
            else:
                # Accept ISO date strings
                if isinstance(start_date, str):
                    start_date = datetime.strptime(start_date, "%Y-%m-%d").date()
                if isinstance(end_date, str):
                    end_date = datetime.strptime(end_date, "%Y-%m-%d").date()

            # Records from the end date
            end_cursor = self.collection.find({
                "batch_time": {
                    "$gte": datetime(end_date.year, end_date.month, end_date.day),
                    "$lt": datetime(end_date.year, end_date.month, end_date.day) + timedelta(days=1)
                }
            })
            end_data = list(end_cursor)

            # Records from the start date
            start_cursor = self.collection.find({
                "batch_time": {
                    "$gte": datetime(start_date.year, start_date.month, start_date.day),
                    "$lt": datetime(start_date.year, start_date.month, start_date.day) + timedelta(days=1)
                }
            })
            start_data = list(start_cursor)

            # Index by series name for fast lookup
            end_dict = {item["mix_name"]: item for item in end_data}
            start_dict = {item["mix_name"]: item for item in start_data}

            # Compute growth per series
            growth_data = []
            for mix_name, end_item in end_dict.items():
                if mix_name in start_dict:
                    start_item = start_dict[mix_name]
                    growth = end_item.get("play_vv", 0) - start_item.get("play_vv", 0)

                    # Keep only positive growth
                    if growth > 0:
                        item = self.format_video_item(end_item)
                        item["growth"] = growth
                        item["start_date"] = start_date.strftime("%Y-%m-%d")
                        item["end_date"] = end_date.strftime("%Y-%m-%d")
                        growth_data.append(item)
                else:
                    # Present only on the end date: count the whole value as growth
                    item = self.format_video_item(end_item)
                    item["growth"] = end_item.get("play_vv", 0)
                    item["start_date"] = start_date.strftime("%Y-%m-%d")
                    item["end_date"] = end_date.strftime("%Y-%m-%d")
                    growth_data.append(item)

            # Sort by growth, descending
            growth_data.sort(key=lambda x: x.get("growth", 0), reverse=True)

            # Paginate
            total = len(growth_data)
            paginated_data = growth_data[skip:skip + limit]

            # Assign ranks within the full growth ordering
            for i, item in enumerate(paginated_data):
                item["rank"] = skip + i + 1

            return {
                "success": True,
                "data": paginated_data,
                "pagination": {
                    "page": page,
                    "limit": limit,
                    "total": total,
                    "pages": (total + limit - 1) // limit,
                    "has_next": page * limit < total,
                    "has_prev": page > 1
                },
                "sort_by": "growth",
                "date_range": {
                    "start_date": start_date.strftime("%Y-%m-%d"),
                    "end_date": end_date.strftime("%Y-%m-%d")
                },
                "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }

        except Exception as e:
            logging.error(f"获取增长视频列表失败: {e}")
            # Fall back to play-count ordering when growth computation fails
            return self.get_video_list(page, limit, "playcount")

    def get_top_videos(self, limit=10):
        """Return the top-N videos by play count (TOP list)."""
        try:
            # Sort by play count to get the hottest videos
            cursor = self.collection.find().sort("play_vv", -1).limit(limit)
            docs = list(cursor)

            if not docs:
                return {"success": False, "message": "暂无数据"}

            # Format the documents
            top_list = []
            for doc in docs:
                item = self.format_video_item(doc)
                top_list.append(item)

            return {
                "success": True,
                "data": top_list,
                "total": len(top_list),
                "update_time": self.format_time(docs[0].get("batch_time")) if docs else ""
            }

        except Exception as e:
            logging.error(f"获取热门视频失败: {e}")
            return {"success": False, "message": f"获取数据失败: {str(e)}"}

    def search_videos(self, keyword, page=1, limit=10):
        """Search series by case-insensitive name substring, paginated."""
        try:
            if not keyword:
                return {"success": False, "message": "请提供搜索关键词"}

            # Pagination offset
            skip = (page - 1) * limit

            # Fuzzy match on the compilation name
            search_condition = {
                "mix_name": {"$regex": keyword, "$options": "i"}
            }

            # Query the matching documents
            cursor = self.collection.find(search_condition).sort("play_vv", -1).skip(skip).limit(limit)
            docs = list(cursor)

            # Total number of matches
            total = self.collection.count_documents(search_condition)

            # Format the documents
            search_results = []
            for doc in docs:
                item = self.format_video_item(doc)
                search_results.append(item)

            return {
                "success": True,
                "data": search_results,
                "keyword": keyword,
                "pagination": {
                    "page": page,
                    "limit": limit,
                    "total": total,
                    "pages": (total + limit - 1) // limit,
                    "has_next": page * limit < total,
                    "has_prev": page > 1
                },
                "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }

        except Exception as e:
            logging.error(f"搜索视频失败: {e}")
            return {"success": False, "message": f"搜索失败: {str(e)}"}

    def get_video_detail(self, video_id):
        """Look up one video by ObjectId, falling back to mix_name / request_id."""
        try:
            from bson import ObjectId

            # Try an ObjectId lookup first
            try:
                doc = self.collection.find_one({"_id": ObjectId(video_id)})
            except:
                # Invalid ObjectId: try other identifying fields
                doc = self.collection.find_one({
                    "$or": [
                        {"mix_name": video_id},
                        {"request_id": video_id}
                    ]
                })

            if not doc:
                return {"success": False, "message": "未找到视频信息"}
格式化详细信息 - 只返回数据库原始字段 + detail = self.format_video_item(doc) + + return { + "success": True, + "data": detail, + "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + } + + except Exception as e: + logging.error(f"获取视频详情失败: {e}") + return {"success": False, "message": f"获取详情失败: {str(e)}"} + + def get_statistics(self): + """获取统计信息""" + try: + # 基本统计 + total_videos = self.collection.count_documents({}) + + if total_videos == 0: + return {"success": False, "message": "暂无数据"} + + # 播放量统计 + pipeline = [ + { + "$group": { + "_id": None, + "total_playcount": {"$sum": "$play_vv"}, + "avg_playcount": {"$avg": "$play_vv"}, + "max_playcount": {"$max": "$play_vv"}, + "min_playcount": {"$min": "$play_vv"} + } + } + ] + + stats_result = list(self.collection.aggregate(pipeline)) + stats = stats_result[0] if stats_result else {} + + # 获取最新更新时间 + latest_doc = self.collection.find().sort("batch_time", -1).limit(1) + latest_time = "" + if latest_doc: + latest_list = list(latest_doc) + if latest_list: + latest_time = self.format_time(latest_list[0].get("batch_time")) + + # 热门分类统计(按播放量区间) + categories = [ + {"name": "超热门", "min": 100000000, "count": 0}, # 1亿+ + {"name": "热门", "min": 50000000, "max": 99999999, "count": 0}, # 5000万-1亿 + {"name": "中等", "min": 10000000, "max": 49999999, "count": 0}, # 1000万-5000万 + {"name": "一般", "min": 0, "max": 9999999, "count": 0} # 1000万以下 + ] + + for category in categories: + if "max" in category: + count = self.collection.count_documents({ + "play_vv": {"$gte": category["min"], "$lte": category["max"]} + }) + else: + count = self.collection.count_documents({ + "play_vv": {"$gte": category["min"]} + }) + category["count"] = count + + return { + "success": True, + "data": { + "total_videos": total_videos, + "total_playcount": stats.get("total_playcount", 0), + "avg_playcount": int(stats.get("avg_playcount", 0)), + "max_playcount": stats.get("max_playcount", 0), + "min_playcount": stats.get("min_playcount", 0), + "categories": categories, + 
"latest_update": latest_time + }, + "update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + } + + except Exception as e: + logging.error(f"获取统计信息失败: {e}") + return {"success": False, "message": f"获取统计失败: {str(e)}"} + +# 创建API实例 +api = MiniprogramAPI() + +# API路由定义 +@app.route('/') +def index(): + """API首页""" + return jsonify({ + "name": "小程序抖音播放量数据API", + "version": "2.0", + "description": "专为小程序优化的抖音播放量数据接口", + "endpoints": { + "/api/videos": "获取视频列表 (支持分页和排序)", + "/api/top": "获取热门视频榜单", + "/api/search": "搜索视频", + "/api/detail": "获取视频详情", + "/api/stats": "获取统计信息", + "/api/health": "健康检查" + }, + "features": [ + "分页支持", + "多种排序方式", + "搜索功能", + "详情查看", + "统计分析", + "小程序优化" + ] + }) + +@app.route('/api/videos') +def get_videos(): + """获取视频列表""" + page = request.args.get('page', 1, type=int) + limit = request.args.get('limit', 20, type=int) + sort_by = request.args.get('sort', 'playcount') # playcount, time, 或 growth + start_date = request.args.get('start_date', None) + end_date = request.args.get('end_date', None) + + # 限制参数范围 + page = max(1, page) + limit = min(50, max(1, limit)) # 限制每页最多50条 + + if sort_by == "growth": + # 增长排序需要特殊处理,支持日期参数 + result = api.get_growth_videos(page, limit, start_date, end_date) + else: + result = api.get_video_list(page, limit, sort_by) + + return jsonify(result) + +@app.route('/api/top') +def get_top(): + """获取热门视频榜单""" + limit = request.args.get('limit', 10, type=int) + limit = min(50, max(1, limit)) # 限制最多50条 + + result = api.get_top_videos(limit) + return jsonify(result) + +@app.route('/api/search') +def search(): + """搜索视频""" + keyword = request.args.get('q', '').strip() + page = request.args.get('page', 1, type=int) + limit = request.args.get('limit', 10, type=int) + + # 限制参数范围 + page = max(1, page) + limit = min(30, max(1, limit)) # 搜索结果限制每页最多30条 + + result = api.search_videos(keyword, page, limit) + return jsonify(result) + +@app.route('/api/detail') +def get_detail(): + """获取视频详情""" + video_id = request.args.get('id', 
@app.route('/api/health')
def health_check():
    """健康检查 — report MongoDB connectivity and the current record count."""
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    try:
        # A ping round-trip proves the MongoDB connection is alive before
        # we claim to be healthy.
        api.client.admin.command('ping')
        record_count = api.collection.count_documents({})
    except Exception as e:
        # Any failure (connection, auth, query) is reported as unhealthy.
        return jsonify({
            "success": False,
            "status": "unhealthy",
            "mongodb": "disconnected",
            "error": str(e),
            "server_time": now
        })

    return jsonify({
        "success": True,
        "status": "healthy",
        "mongodb": "connected",
        "total_records": record_count,
        "server_time": now,
        "api_version": "2.0"
    })
def analyze_document_schema(document):
    """分析文档结构 — infer a ``{field: {type, example}}`` summary from one document.

    Long string examples are truncated to 50 chars, lists and dicts are
    sampled so the output stays readable.

    Args:
        document: a single MongoDB document (dict), or None/empty.

    Returns:
        dict: per-field type info, or ``{}`` when the document is falsy.
    """
    if not document:
        return {}

    schema = {}
    for key, value in document.items():
        if key == '_id':
            schema[key] = {'type': 'ObjectId', 'example': str(value)}
        elif isinstance(value, str):
            schema[key] = {'type': 'string', 'example': value[:50] + '...' if len(value) > 50 else value}
        # BUG FIX: bool must be checked BEFORE int — in Python bool is a
        # subclass of int, so the original int-first ordering classified
        # True/False as 'integer' and made the boolean branch unreachable.
        elif isinstance(value, bool):
            schema[key] = {'type': 'boolean', 'example': value}
        elif isinstance(value, int):
            schema[key] = {'type': 'integer', 'example': value}
        elif isinstance(value, float):
            schema[key] = {'type': 'float', 'example': value}
        elif isinstance(value, datetime):
            schema[key] = {'type': 'datetime', 'example': value.strftime('%Y-%m-%d %H:%M:%S')}
        elif isinstance(value, list):
            schema[key] = {
                'type': 'array',
                'length': len(value),
                'example': value[:3] if len(value) <= 3 else value[:3] + ['...']
            }
        elif isinstance(value, dict):
            schema[key] = {
                'type': 'object',
                'keys': list(value.keys())[:5],
                'example': {k: v for k, v in list(value.items())[:2]}
            }
        else:
            # Fallback: unknown types are reported by class name with a
            # truncated string representation.
            schema[key] = {'type': type(value).__name__, 'example': str(value)[:50]}

    return schema
def display_statistics(client, db_name='douyin_data', collection_name='play_vv_records'):
    """Print summary statistics for one collection to stdout.

    Shows the document count, the time range (first time-like field found),
    play-count aggregates (first numeric play field found) and a top-5 list
    sorted by play_vv. Errors are caught and printed, never raised.

    Args:
        client: a connected MongoClient instance.
        db_name: database to inspect.
        collection_name: collection to inspect.
    """
    try:
        db = client[db_name]
        collection = db[collection_name]

        print(f"\n📊 统计信息 ({db_name}.{collection_name})")
        print("-" * 50)

        # Basic document count.
        total_count = collection.count_documents({})
        print(f"📈 总文档数: {total_count:,}")

        if total_count == 0:
            print("⚠️ 集合为空,无法显示统计信息")
            return

        # Time-range statistics: probe candidate fields in priority order and
        # use the first one that exists in any document.
        # NOTE(review): strftime below assumes the stored values are datetime
        # objects, not strings — confirm against the writer script.
        time_fields = ['batch_time', 'created_at', 'timestamp']
        for field in time_fields:
            if collection.find_one({field: {'$exists': True}}):
                pipeline = [
                    {'$group': {
                        '_id': None,
                        'min_time': {'$min': f'${field}'},
                        'max_time': {'$max': f'${field}'}
                    }}
                ]
                result = list(collection.aggregate(pipeline))
                if result:
                    min_time = result[0]['min_time']
                    max_time = result[0]['max_time']
                    print(f"📅 时间范围 ({field}):")
                    print(f"   最早: {min_time.strftime('%Y-%m-%d %H:%M:%S')}")
                    print(f"   最新: {max_time.strftime('%Y-%m-%d %H:%M:%S')}")
                break

        # Play-count statistics: same first-existing-field strategy, but the
        # field must also hold a numeric value ($type: 'number').
        playcount_fields = ['play_vv', 'playcount', 'play_count', 'views']
        for field in playcount_fields:
            if collection.find_one({field: {'$exists': True, '$type': 'number'}}):
                pipeline = [
                    {'$group': {
                        '_id': None,
                        'total_plays': {'$sum': f'${field}'},
                        'avg_plays': {'$avg': f'${field}'},
                        'max_plays': {'$max': f'${field}'},
                        'min_plays': {'$min': f'${field}'}
                    }}
                ]
                result = list(collection.aggregate(pipeline))
                if result:
                    stats = result[0]
                    print(f"🎬 播放量统计 ({field}):")
                    print(f"   总播放量: {stats['total_plays']:,}")
                    print(f"   平均播放量: {stats['avg_plays']:,.0f}")
                    print(f"   最高播放量: {stats['max_plays']:,}")
                    print(f"   最低播放量: {stats['min_plays']:,}")
                break

        # Top-5 documents by play_vv, only when the collection stores mix_name.
        if collection.find_one({'mix_name': {'$exists': True}}):
            print(f"\n🔥 热门内容 (按播放量排序):")
            pipeline = [
                {'$match': {'play_vv': {'$exists': True, '$type': 'number'}}},
                {'$sort': {'play_vv': -1}},
                {'$limit': 5},
                {'$project': {'mix_name': 1, 'play_vv': 1, 'batch_time': 1}}
            ]
            top_content = list(collection.aggregate(pipeline))
            for i, content in enumerate(top_content, 1):
                name = content.get('mix_name', '未知')
                plays = content.get('play_vv', 0)
                # Falls back to "now" when batch_time is missing — display only.
                time_str = content.get('batch_time', datetime.now()).strftime('%m-%d %H:%M')
                print(f"  {i}. {name}: {plays:,} ({time_str})")

    except Exception as e:
        print(f"❌ 获取统计信息失败: {e}")
print(f"{spaces}📅 {key}: {value.strftime('%Y-%m-%d %H:%M:%S')}") + elif isinstance(value, str): + display_value = value[:100] + "..." if len(value) > 100 else value + print(f"{spaces}📝 {key}: {display_value}") + elif isinstance(value, (int, float)): + if key in ['playcount', 'play_count', 'views', 'play_vv']: + print(f"{spaces}📊 {key}: {value:,}") + else: + print(f"{spaces}🔢 {key}: {value}") + elif isinstance(value, list): + print(f"{spaces}📋 {key}: [{len(value)} 项]") + if len(value) > 0 and len(value) <= 3: + for item in value[:3]: + item_str = str(item)[:50] + "..." if len(str(item)) > 50 else str(item) + print(f"{spaces} - {item_str}") + elif len(value) > 3: + for item in value[:2]: + item_str = str(item)[:50] + "..." if len(str(item)) > 50 else str(item) + print(f"{spaces} - {item_str}") + print(f"{spaces} ... 还有 {len(value)-2} 项") + elif isinstance(value, dict): + print(f"{spaces}📦 {key}: {{对象}}") + if len(value) <= 3: + for k, v in value.items(): + v_str = str(v)[:50] + "..." if len(str(v)) > 50 else str(v) + print(f"{spaces} {k}: {v_str}") + else: + for k, v in list(value.items())[:2]: + v_str = str(v)[:50] + "..." if len(str(v)) > 50 else str(v) + print(f"{spaces} {k}: {v_str}") + print(f"{spaces} ... 
def connect_mongodb():
    """连接MongoDB — return ``(client, collection)`` or ``(None, None)`` on failure.

    Connects to the local MongoDB with a 5 s server-selection timeout and
    verifies the connection with a ping before handing out the collection.
    """
    try:
        client = MongoClient('mongodb://localhost:27017/', serverSelectionTimeoutMS=5000)
        # Fail fast if the server is unreachable.
        client.admin.command('ping')
        # BUG FIX: the database name was written as 'douyin——data' with
        # full-width dashes, which does not match the 'douyin_data' database
        # every other script in this repo reads/writes, so this tool always
        # queried an empty database.
        # NOTE(review): the other viewers read collection 'play_vv_records';
        # confirm 'playcounts' is the intended (legacy) collection here.
        db = client['douyin_data']
        collection = db['playcounts']
        print("MongoDB连接成功")
        return client, collection
    except ConnectionFailure:
        print("MongoDB连接失败,请确保MongoDB服务已启动")
        return None, None
    except Exception as e:
        print(f"MongoDB连接出错: {e}")
        return None, None
def query_by_name(collection, name_keyword):
    """根据剧本名称关键词查询 — case-insensitive substring match on the name field."""
    try:
        # $options "i" makes the regex match case-insensitive.
        regex_filter = {"name": {"$regex": name_keyword, "$options": "i"}}
        # Newest batches first.
        matches = list(collection.find(regex_filter).sort("batch_time", -1))

        if not matches:
            print(f"未找到包含'{name_keyword}'的剧本")
            return

        print(f"\n===== 包含'{name_keyword}'的剧本 =====")
        for doc in matches:
            captured_at = doc['batch_time'].strftime("%Y-%m-%d %H:%M:%S")
            print(f"剧本: {doc['name']}")
            print(f"播放量: {doc['playcount']}")
            print(f"抓取时间: {captured_at}")
            print(f"批次ID: {doc['batch_id']}")
            print("-" * 30)

    except Exception as e:
        print(f"查询失败: {e}")
def view_latest_data(limit=20):
    """查看最新数据 — print the newest ``limit`` records, sorted by batch_time desc."""
    try:
        mongo = MongoClient('localhost', 27017)
        records = mongo['douyin_data']['play_vv_records']

        print("=== 抖音播放量最新数据 ===")
        print(f"显示最新 {limit} 条记录(按时间倒序排列)")
        print("=" * 80)

        # Newest first, capped at `limit`.
        newest = list(records.find().sort('batch_time', -1).limit(limit))
        if not newest:
            print("没有找到数据")
            return

        for index, record in enumerate(newest, 1):
            print(f"\n记录 #{index}")
            print("-" * 50)
            print(f"合集名称: {record.get('mix_name', '未知')}")
            print(f"播放量: {record.get('play_vv', 0):,} ({record.get('playcount', '')})")
            print(f"合集链接: {record.get('video_url', '')}")
            print(f"保存时间: {record.get('batch_time', '')}")
            print(f"视频ID数: {len(record.get('aweme_ids', []))}")
            print(f"封面图片: {'有' if record.get('cover_image_url') else '无'}")

        # Summary footer: total count plus how many records landed today
        # ("today" starts at local midnight).
        total = records.count_documents({})
        midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        today = records.count_documents({'batch_time': {'$gte': midnight}})

        print(f"\n" + "=" * 80)
        print(f"统计信息:")
        print(f"- 总记录数: {total}")
        print(f"- 今天记录数: {today}")
        print(f"- 最新记录时间: {newest[0].get('batch_time')}")

    except Exception as e:
        print(f"查看数据时出错: {e}")